PAL: Product Access Layer
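The Product Access Layer lets data processors, data stores and data consumers access logical "pools" of stored data through lightweight product references. A data product can include a context built from references to related data. A ProductStorage interface is provided for saving, retrieving and querying data in registered pools.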
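Rationale
In a data processing pipeline or a network of processing nodes, data products are generated within a context, which may include input data, reference data and auxiliary data of many kinds. It is often necessary to record the relevant context of a product. A context can be large, however, so keeping its actual data as metadata of the product is usually impractical.
Once FDI data are generated, they can have a reference through which they can be accessed. Such a reference is typically smaller than a few hundred bytes, like a URL. Only data references are recorded in a product's context.
This package provides the MapContext, ProductRef, Urn, ProductStorage, ProductPool and Query classes (simplified, but mostly API-compatible with Herschel Common Science System v15.0) for storing, retrieving, tagging and creating data products modeled in the dataset package.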
Definitions
URN
Note
The following is from the documentation of the Urn class.
A Universal Resource Name (URN) string has the following format:
urn:<poolname>:<resourcetype>:<serialnumber>
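where
- <poolname>
Also called the pool ID. Its requirements are similar to those for file names and URL segments: printable characters only, excluding the space, %, ?, *, = and /.
- <resourcetype>
Class name of the data item (usually Product).
- <serialnumber>
Internal index for a given <resourcetype>.
The poolname in a URN is a label. Some examples: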
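To make the format concrete, here is a minimal sketch that splits a URN string into its three fields. The helper name split_urn and the FORBIDDEN set are hypothetical and are not part of FDI's Urn class; the sample URN is the one printed in the Quick Start session later in this document.

```python
# Characters excluded from pool names in the list above
# (the space as first entry is an assumption of this sketch).
FORBIDDEN = set(' %?*=/')


def split_urn(urn):
    """Hypothetical helper: split 'urn:<poolname>:<resourcetype>:<serialnumber>'."""
    parts = urn.split(':')
    if len(parts) != 4 or parts[0] != 'urn':
        raise ValueError('not a URN: %r' % urn)
    _, poolname, resourcetype, serial = parts
    if not poolname or not poolname.isprintable() or FORBIDDEN & set(poolname):
        raise ValueError('bad pool name: %r' % poolname)
    # the serial number is an integer index within the resource type
    return poolname, resourcetype, int(serial)


print(split_urn('urn:demopool_mh:fdi.dataset.product.Product:0'))
```

Nothing in the result says where the pool is located, which is the point of treating the pool name as a label.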
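A storage pool (a subclass of ProductPool) is where data items reside. A PoolURL gives practical information about a pool, such as the pool name, its location and its access scheme. It is designed as a local setup detail that should be hidden from pool users. Data processing software uses URNs to refer to products without specifying the pool location. The poolID in a URN could be a LocalPool on a development laptop and an HTTPClientPool in a production cloud.
Note
The following is from parse_poolurl().
A PoolURL takes the form of a URL that precedes its pool name part: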
<scheme>://<place><poolpath>/<poolname>
- <scheme>
Implementation protocol, including file for LocalPool, mem for MemPool, and http, https for HttpclientPool.
- <place>
IP:port such as 192.168.5.6:8080 for http and https schemes, or an empty string for file and mem schemes.
- <poolname>
Same as in URN.
- <poolpath>
The part between place and an optional poolhint:
For file or server schemes, e.g. poolpath is /c:/tmp in http://localhost:9000/c:/tmp/mypool/ with the poolhint keyword argument of parse_poolurl() not given, or given as mypool (or myp or my ...).
For http and https schemes, it is e.g. /v0.6/tmp in https://10.0.0.114:5000/v0.6/tmp/mypool with the poolhint keyword argument not given, or given as mypool (or myp or my ...). The meaning of poolpath is subject to interpretation by the server. In the preceding example the poolpath contains an API version. ProductPool.transformpath() is used to map it further. Note that trailing blanks and / are ignored, and stripped in the output.
- <username>
- <password>
Examples:
file:///tmp/mydata for pool mydata
file:///d:/data/test2--v2 for pool test2--v2
mem:///dummy for pool dummy
https://10.0.0.114:5000/v0.6/obs for an httpclientpool obs
server:///tmp/data/0.4/test for a pool test used on a server.
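The decomposition can be tried out with the standard library alone. The sketch below is not parse_poolurl(): split_poolurl is a hypothetical helper that ignores poolhint, username and password, and simply treats the last path segment as the pool name.

```python
from urllib.parse import urlsplit


def split_poolurl(url):
    """Hypothetical helper: return (scheme, place, poolpath, poolname)."""
    # trailing blanks and '/' are ignored, as noted above
    parts = urlsplit(url.strip().rstrip('/'))
    # everything before the last '/' is the pool path, the rest the pool name
    poolpath, _, poolname = parts.path.rpartition('/')
    return parts.scheme, parts.netloc, poolpath, poolname


for u in ('file:///tmp/mydata',
          'mem:///dummy',
          'https://10.0.0.114:5000/v0.6/obs',
          'server:///tmp/data/0.4/test'):
    print(split_poolurl(u))
```

For file, mem and server URLs the place comes back as an empty string, matching the description of <place> above.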
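ProductRef
This class not only holds the URN of the product it references, but also records who (the _parents_) holds this reference.
ProductStorage
A central access point for saving, loading, querying and deleting data organized in conceptual pools. Saving data returns a ProductRef.
ProductPool
A place where products can be saved, with a reference generated for each saved product. A product can be retrieved through its reference. Pools based on different media or networking mechanisms can be implemented. Multiple pools can be registered in a ProductStorage front end, where users can save, load, query and so on, so that the pools together form a larger logical storage.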
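The idea of several pools behind one front end can be sketched without FDI. ToyPool and ToyStorage below are hypothetical stand-ins, not the real ProductPool and ProductStorage; in particular, sending every save to the first registered pool is an assumption of this sketch.

```python
class ToyPool:
    """Hypothetical in-memory pool: maps URN strings to saved objects."""

    def __init__(self, name):
        self.name = name
        self.items = {}
        self.counters = {}  # next serial number per resource type

    def save(self, obj):
        rtype = type(obj).__name__
        sn = self.counters.get(rtype, 0)
        self.counters[rtype] = sn + 1
        urn = 'urn:%s:%s:%d' % (self.name, rtype, sn)
        self.items[urn] = obj
        return urn


class ToyStorage:
    """Hypothetical front end presenting several pools as one store."""

    def __init__(self):
        self.pools = {}  # insertion-ordered; the first pool receives saves

    def register(self, pool):
        self.pools[pool.name] = pool

    def save(self, obj):
        return next(iter(self.pools.values())).save(obj)

    def load(self, urn):
        # the pool name embedded in the URN selects the pool
        return self.pools[urn.split(':')[1]].items[urn]

    def select(self, predicate):
        # search every registered pool
        return [urn for pool in self.pools.values()
                for urn, obj in pool.items.items() if predicate(obj)]


laptop, cloud = ToyPool('laptop'), ToyPool('cloud')
store = ToyStorage()
store.register(laptop)
store.register(cloud)
u1 = store.save('spectrum A')   # goes to the first registered pool
u2 = cloud.save('spectrum B')   # saved directly into the second pool
print(store.load(u2))
print(store.select(lambda s: s.startswith('spectrum')))
```

The caller only ever handles URN strings; which pool answers is decided by the front end.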
The reference LocalPool is shown in the following YAML-like schematic:
Pool:!!dict
    _classes:!!odict
        product0_class_name:!!dict
            currentSN:!!int #the serial number of the latest added prod to the pool
            sn:!!list
                - serial number of a prod
                - serial number of a prod
                - ...
        product1_class_name:
            ...
    _urns:!!odict
        urn0:!!odict
            meta:!!MetaData #prod.meta
            tags:!!list
                - $tag
                - $tag
                - ...
        urn1:!!odict
            ...
    _tags:!!odict
        tag0:!!odict
            urns:!!list
                - $urn
                - $urn
                - ...
        tag1:!!odict
            ...
    urn0:!!serialized product
    urn1:!!serialized product
    ...
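As a rough illustration of how the three index tables in the schematic relate to each other, the following sketch keeps them up to date when a product is added. new_index and add_product are hypothetical names, plain dicts stand in for MetaData, and starting currentSN at -1 is an assumption; this is not LocalPool's own code.

```python
from collections import OrderedDict


def new_index():
    """Empty index with the three tables named in the schematic."""
    return {'_classes': OrderedDict(),
            '_urns': OrderedDict(),
            '_tags': OrderedDict()}


def add_product(index, poolname, classname, meta, tags=()):
    """Hypothetical bookkeeping for one newly saved product."""
    cls = index['_classes'].setdefault(classname, {'currentSN': -1, 'sn': []})
    cls['currentSN'] += 1          # serial number of the latest added product
    sn = cls['currentSN']
    cls['sn'].append(sn)
    urn = 'urn:%s:%s:%d' % (poolname, classname, sn)
    index['_urns'][urn] = OrderedDict(meta=meta, tags=list(tags))
    for tag in tags:
        # each tag lists the URNs that carry it
        index['_tags'].setdefault(tag, OrderedDict(urns=[]))['urns'].append(urn)
    return urn


idx = new_index()
u0 = add_product(idx, 'demo', 'Product', {'description': 'p0'}, tags=['raw'])
u1 = add_product(idx, 'demo', 'Product', {'description': 'p1'}, tags=['raw', 'cal'])
print(idx['_classes']['Product'])
print(idx['_tags']['raw']['urns'])
```

Keeping tags in both _urns and _tags lets a lookup go either way: from a product to its tags, or from a tag to its products.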
Example (from Quick Start)
This section shows how to create or get a pool.
>>> # Standard-library modules used below; the FDI names (Product, ProductStorage,
... # PoolManager, DEFAULT_MEM_POOL, ...) are assumed to be imported already.
... import getpass, logging, os
... # Create a product and a productStorage with a pool registered
... # First disable debugging messages
... logger = logging.getLogger('')
... logger.setLevel(logging.WARNING)
... # a pool (LocalPool) for demonstration will be created here
... demopoolname = 'demopool_' + getpass.getuser()
... demopoolpath = '/tmp/' + demopoolname
... demopoolurl = 'file://' + demopoolpath
... # clean possible data left from previous runs
... os.system('rm -rf ' + demopoolpath)
... if PoolManager.isLoaded(DEFAULT_MEM_POOL):
...     PoolManager.getPool(DEFAULT_MEM_POOL).removeAll()
... PoolManager.getPool(demopoolname, demopoolurl).removeAll()
0
Saving a product
This section shows how to store a product in a pool and get a reference back.
>>> # create a product and save it to a pool
... x = Product(description='save me in store')
... # add a tabledataset
... s1 = [('energy', [1, 4.4, 5.6], 'eV'), ('freq', [0, 43.2, 2E3], 'Hz')]
... x["Spectrum"] = TableDataset(data=s1)
... # create a product store
... pstore = ProductStorage(poolurl=demopoolurl)
... # see what is in it.
... pstore
ProductStorage( pool= {'demopool_mh': <LocalPool poolname=demopool_mh, poolurl=file:///tmp/demopool_mh, _classes={}, _urns={}, _tags={}>} )
>>> # save the product and get a reference back.
... prodref = pstore.save(x)
... # This gives detailed information of the product being referenced
... print(prodref)
ProductRef {urn:demopool_mh:fdi.dataset.product.Product:0
# Parents=[]
# meta=
============  ====================  ======  ========  =======  =================  ======  =====================
name          value                 unit    type      valid    default            code    description
============  ====================  ======  ========  =======  =================  ======  =====================
description   save me in store              string    None     UNKNOWN            B       Description of this p
                                                                                          roduct
type          Product                       string    None     Product            B       Product Type identifi
                                                                                          cation. Name of class
                                                                                          or CARD.
level         ALL                           string    None     ALL                B       Product level.
creator       UNKNOWN                       string    None     UNKNOWN            B       Generator of this pro
                                                                                          duct.
creationDate  1958-01-01T00:00:00.          finetime  None     1958-01-01T00:00:  Q       Creation date of this
              000000                                           00.000000                  product
              0                                                0
rootCause     UNKNOWN                       string    None     UNKNOWN            B       Reason of this run of
                                                                                          pipeline.
version       0.8                           string    None     0.8                B       Version of product
FORMATV       1.6.0.10                      string    None     1.6.0.10           B       Version of product sc
                                                                                          hema and revision
startDate     1958-01-01T00:00:00.          finetime  None     1958-01-01T00:00:  Q       Nominal start time o
              000000                                           00.000000                  f this product.
              0                                                0
endDate       1958-01-01T00:00:00.          finetime  None     1958-01-01T00:00:  Q       Nominal end time of
              000000                                           00.000000                  this product.
              0                                                0
instrument    UNKNOWN                       string    None     UNKNOWN            B       Instrument that gener
                                                                                          ated data of this pro
                                                                                          duct
modelName     UNKNOWN                       string    None     UNKNOWN            B       Model name of the ins
                                                                                          trument of this produ
                                                                                          ct
mission       _AGS                          string    None     _AGS               B       Name of the mission.
============  ====================  ======  ========  =======  =================  ======  =====================
MetaData-listeners = ListnerSet{}}
>>> # get the URN string
... urn = prodref.urn
... print(urn) # urn:demopool_mh:fdi.dataset.product.Product:0
urn:demopool_mh:fdi.dataset.product.Product:0
>>> # re-create a product only using the urn
... newp = ProductRef(urn).product
... # the new and the old one are equal
... print(newp == x) # == True
True
Context and MapContext
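A Context is a Product that holds a set of ProductRefs accessible by keys. For a MapContext the keys are strings, so it typically maps names to product references.
Example (from Quick Start)
This section shows the basic steps of storing product references in a context.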
>>> p1 = Product(description='p1')
... p2 = Product(description='p2')
... # create an empty mapcontext that can carry references with name labels
... map1 = MapContext(description='product with refs 1')
... # A ProductRef created with the syntax of a lone product argument will use a MemPool
... pref1 = ProductRef(p1)
... pref1
ProductRef(urnobj=Urn(urn="urn:defaultmem:fdi.dataset.product.Product:0", _STID="Urn"), _STID="ProductRef")
>>> # A productStorage with a LocalPool -- a pool on the disk.
... pref2 = pstore.save(p2)
... pref2.urn
'urn:demopool_mh:fdi.dataset.product.Product:1'
>>> # how many prodrefs do we have?
... map1['refs'].size() # == 0
0
>>> # how many 'parents' do these prodrefs have before being added to a context?
... len(pref1.parents) # == 0
0
>>> len(pref2.parents) # == 0
0
>>> # add a ref to the context. Every productref has a name in a MapContext
... map1['refs']['spam'] = pref1
... # add the second one
... map1['refs']['egg'] = pref2
... # how many prodrefs do we have?
... map1['refs'].size() # == 2
2
>>> # parent list of the productref object now has an entry
... len(pref2.parents) # == 1
1
>>> pref2.parents[0] == map1
True
>>> pref1.parents[0] == map1
True
>>> # remove a ref
... del map1['refs']['spam']
... map1.refs.size() # == 1
1
>>> # how many parents does the removed ref have now?
... len(pref1.parents) # == 0
0
>>> # add ref2 to another map
... map2 = MapContext(description='product with refs 2')
... map2.refs['also2'] = pref2
... map2['refs'].size() # == 1
1
>>> # two parents
... len(pref2.parents) # == 2
2
>>> pref2.parents[1] == map2
True
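Query
A ProductStorage can be queried to get a list of references to the products that satisfy the search criteria. A query can be built from a Python predicate expression about a product and its metadata, or from a function that returns True or False.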
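The two query styles can be imitated with plain dictionaries. In this sketch select_meta is a hypothetical function, not MetaQuery or ProductStorage.select(); metadata are ordinary dicts bound to the name m inside the expression, and eval() is used only because the expressions here are trusted.

```python
import re


def select_meta(products, where):
    """Hypothetical: keep products whose 'meta' dict satisfies `where`.

    `where` is either a predicate function of the metadata or a Python
    expression string in which the metadata is available as `m`.
    """
    if callable(where):
        test = where
    else:
        code = compile(where, '<query>', 'eval')

        def test(m):
            # no builtins, and only `m` visible to the expression
            return eval(code, {'__builtins__': {}}, {'m': m})
    return [p for p in products if test(p['meta'])]


products = [{'meta': {'extra': 5000 + i, 'creator': 'fatman %d' % (i * 4)}}
            for i in range(7)]

by_expr = select_meta(products, 'm["extra"] > 5000 and m["extra"] <= 5005')
by_func = select_meta(products, lambda m: re.match('.*n.1.*', m['creator']))
print(len(by_expr))
print([p['meta']['creator'] for p in by_func])
```

A string expression is convenient to store or send to a server, while a function can hold arbitrary logic such as the regular expression used here.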
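Example (from Quick Start)
A ProductStorage with pools attached can be queried using Python syntax, on tags, on properties stored in the metadata, or even on the data in the stored products.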
>>> # clean possible data left from previous runs
... poolname = 'fdi_pool_' + getpass.getuser()
... poolpath = '/tmp/' + poolname
... newpoolname = 'fdi_newpool_' + getpass.getuser()
... newpoolpath = '/tmp/' + newpoolname
... os.system('rm -rf ' + poolpath)
... os.system('rm -rf ' + newpoolpath)
... poolurl = 'file://' + poolpath
... newpoolurl = 'file://' + newpoolpath
... if PoolManager.isLoaded(DEFAULT_MEM_POOL):
...     PoolManager.getPool(DEFAULT_MEM_POOL).removeAll()
... PoolManager.getPool(poolname, poolurl).removeAll()
... PoolManager.getPool(newpoolname, newpoolurl).removeAll()
... # make a productStorage
... pstore = ProductStorage(poolurl=poolurl)
... # make another
... pstore2 = ProductStorage(poolurl=newpoolurl)
>>> # add some products to both storages. The product properties are different.
... n = 7
... for i in range(n):
...     # three counters for properties to be queried.
...     a0, a1, a2 = 'desc %d' % i, 'fatman %d' % (i*4), 5000+i
...     if i < 3:
...         # Product type
...         x = Product(description=a0, creator=a1)
...         x.meta['extra'] = Parameter(value=a2)
...     elif i < 5:
...         ...
...         x.meta['time'] = Parameter(value=FineTime1(a2))
...     if i < 4:
...         # some are stored in one pool
...         r = pstore.save(x)
...     else:
...         # the others go to the second pool
...         r = pstore2.save(x)
...     print(r.urn)
... # Two pools, 7 products in 3 types
... # [P P P C] [C M M]
urn:fdi_pool_mh:fdi.dataset.product.Product:0
urn:fdi_pool_mh:fdi.dataset.product.Product:1
urn:fdi_pool_mh:fdi.dataset.product.Product:2
urn:fdi_pool_mh:fdi.pal.context.Context:0
urn:fdi_newpool_mh:fdi.pal.context.Context:0
urn:fdi_newpool_mh:fdi.pal.context.MapContext:0
urn:fdi_newpool_mh:fdi.pal.context.MapContext:1
>>> # register the new pool above to the 1st productStorage
... pstore.register(newpoolname)
... len(pstore.getPools()) # == 2
2
>>> # make a query on product metadata, which is the variable 'm'
... # in the query expression, i.e. ``m = product.meta; ...``
... # But '5000 < m["extra"]' does not work. see tests/test.py.
... q = MetaQuery(Product, 'm["extra"] > 5000 and m["extra"] <= 5005')
... # search all pools registered on pstore
... res = pstore.select(q)
... # we expect [#2, #3]. Context is not a subclass of Product, which is being searched
... len(res) # == 2
2
>>> # see
... [r.product.description for r in res]
['desc 1', 'desc 2']
>>> def t(m):
...     # the query is a function
...     import re
...     # 'creator' matches the regex pattern: 'n' + any one character + '1'
...     return re.match('.*n.1.*', m['creator'].value)
>>> q = MetaQuery(BaseProduct, t)
... res = pstore.select(q)
... # expecting [3,4]
... [r.product.creator for r in res]
['fatman 12', 'fatman 16']
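Running the tests
To test PAL functionality based on local (JSON) pools and memory pools, run in the same directory:
make test2
To test functionality based on the HTTP client pool, run in one terminal
make runpoolserver
then run
make testhttp
and check the output.
Design
Packages
Classes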