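PAL: Product Access Layer

The Product Access Layer lets data processors, data storage, and data consumers access data held in logical "pools" through lightweight product references. A data product can include a context built from references to related data. A ProductStorage interface is provided to handle saving, retrieving, and querying data in registered pools.

Rationale

In a data processing pipeline or a network of processing nodes, data products are generated within a context that may include input data, reference data, and various kinds of auxiliary data. It is often necessary to record the relevant context of a product. However, a context can be large, so it is usually impractical to carry its actual data as metadata of the product.

Once FDI data are generated, they can have a reference through which they can be accessed. Such a reference is usually smaller than a few hundred bytes, like a URL. Only data references are recorded in a product context.

This package provides the MapContext, ProductRef, Urn, ProductStorage, ProductPool, and Query classes (simplified, but with an API mostly compatible with Herschel Common Science System v15.0) for storing, retrieving, tagging, and creating contexts for data products modeled in the dataset package.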

Definitions

URN

Note

The following is from the documentation of the Urn class.

The Universal Resource Name (URN) string has the following format:

urn:<poolname>:<resourcetype>:<serialnumber>

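where

<poolname>

Also called the poolID. Its requirements are similar to those for file names and URL segments: only printable characters, excluding the space character, %, ?, *, =, and /.

<resourcetype>

Class name of the data item (usually Product).

<serialnumber>

Internal index for a certain <resourcetype>.

The poolname in a URN is a label. An example that appears in the sessions further down is urn:demopool_mh:fdi.dataset.product.Product:0, which names serial number 0 of the class fdi.dataset.product.Product in the pool labelled demopool_mh.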
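The URN layout described above can be illustrated with a small sketch. The names UrnParts and split_urn are hypothetical helpers written for this illustration and are not part of the FDI API; the sketch assumes that neither the resource type nor the serial number contains a colon.

```python
from typing import NamedTuple


class UrnParts(NamedTuple):
    """Hypothetical container for the three URN fields (not an FDI class)."""
    poolname: str
    resourcetype: str
    serialnumber: int


def split_urn(urn: str) -> UrnParts:
    """Split 'urn:<poolname>:<resourcetype>:<serialnumber>' into its fields."""
    prefix, _, rest = urn.partition(':')
    # Split from the right so the last two fields are taken first.
    fields = rest.rsplit(':', 2)
    if prefix != 'urn' or len(fields) != 3:
        raise ValueError('not a URN of the expected form: %r' % urn)
    poolname, resourcetype, serial = fields
    return UrnParts(poolname, resourcetype, int(serial))


parts = split_urn('urn:demopool_mh:fdi.dataset.product.Product:0')
print(parts.poolname)      # demopool_mh
print(parts.resourcetype)  # fdi.dataset.product.Product
print(parts.serialnumber)  # 0
```

Note that nothing in the URN says where the pool lives; that information belongs to the PoolURL.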

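A storage pool (a subclass of ProductPool) is where data items reside. A PoolURL gives practical information about a pool, such as the pool name, its location, and its access scheme. It is designed to be a local set-up detail that should be hidden from pool users. Data processing software uses URNs to refer to products without specifying the pool location. The poolID in a URN could be a LocalPool on a development laptop and an HTTPClientPool on the production cloud.

Note

The following is from parse_poolurl().

A PoolURL takes the form of a URL that precedes its poolname part: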

<scheme>://<place><poolpath>/<poolname>

<scheme>

Implementation protocol: file for LocalPool, mem for MemPool, and http or https for HttpclientPool.

<place>

IP:port, such as 192.168.5.6:8080, for the http and https schemes, or an empty string for the file and mem schemes.

<poolname>

Same as in the URN.

<poolpath>

The part between place and an optional poolhint:

  • For file or server schemes, for example, poolpath is /c:/tmp in http://localhost:9000/c:/tmp/mypool/ when the poolhint keyword argument of parse_poolurl() is not given, or is given as mypool (or myp or my …).

  • For http and https schemes, it is, for example, /v0.6/tmp in https://10.0.0.114:5000/v0.6/tmp/mypool when the poolhint keyword argument is not given, or is given as mypool (or myp or my ...). The meaning of poolpath is subject to interpretation by the server. In the preceding example the poolpath contains an API version. ProductPool.transformpath() is used to map it further. Note that trailing blanks and / are ignored, and are stripped from the output.

<username>

<password>

Examples:
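As an illustration of the PoolURL layout, the following minimal sketch splits a PoolURL into the four parts named above. It is not FDI's parse_poolurl(): the function name split_poolurl is hypothetical, the poolhint argument is not modelled, and the pool name is simply assumed to be the last path segment.

```python
from urllib.parse import urlsplit


def split_poolurl(poolurl: str) -> dict:
    """Illustrative only: split a PoolURL into scheme, place, poolpath, poolname."""
    # Trailing blanks and '/' are ignored, as described in the text.
    cleaned = poolurl.rstrip(' /')
    parts = urlsplit(cleaned)
    # Assumption: the pool name is the last segment of the URL path.
    poolpath, _, poolname = parts.path.rpartition('/')
    return {
        'scheme': parts.scheme,
        'place': parts.netloc,
        'poolpath': poolpath,
        'poolname': poolname,
    }


print(split_poolurl('file:///tmp/demopool_mh'))
print(split_poolurl('https://10.0.0.114:5000/v0.6/tmp/mypool'))
print(split_poolurl('http://localhost:9000/c:/tmp/mypool/'))
```

For the file scheme the place comes out as an empty string, which matches the description of <place> above.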

ProductRef

This class not only holds the URN of the product it references, but also records who (the _parents_) are keeping this reference.

ProductStorage

A centralized access point for saving, loading, querying, and deleting data organized in conceptual pools. Saving a product returns a ProductRef.

ProductPool

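A place where products can be saved, with a reference to the saved product generated in return. The product can later be retrieved through the reference. Pools based on different media or networking mechanisms can be implemented. Multiple pools can be registered in a ProductStorage front-end, where users do the saving, loading, querying, and so on, so that the pools collectively form a larger logical storage.

The layout of a reference LocalPool is shown in the following YAML-like schematic: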

Pool:!!dict
       _classes:!!odict
           product0_class_name:!!dict
                   currentSN:!!int #the serial number of the latest added prod to the pool
                          sn:!!list
                              - serial number of a prod
                              - serial number of a prod
                              - ...
           product1_class_name:
           ...
       _urns:!!odict
           urn0:!!odict
                   meta:!!MetaData #prod.meta
                   tags:!!list
                         - $tag
                         - $tag
                         - ...
           urn1:!!odict
           ...
       _tags:!!odict
           tag0:!!odict
                   urns:!!list
                        - $urn
                        - $urn
                        - ...
           tag1:!!odict
           ...

       urn0:!!serialized product
       urn1:!!serialized product
       ...

Example (from Quick Start)


This section shows how to create or obtain a pool.

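>>> # Standard-library modules used in these sessions. The FDI names
... # (Product, ProductStorage, PoolManager, DEFAULT_MEM_POOL, ...) are
... # assumed to be already imported, as in the Quick Start.
... import getpass
... import logging
... import os
... # Create a product and a productStorage with a pool registered
... # First disable debugging messages
... logger = logging.getLogger('')
... logger.setLevel(logging.WARNING)
... # a pool (LocalPool) for demonstration will be created here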
... demopoolname = 'demopool_' + getpass.getuser()
... demopoolpath = '/tmp/' + demopoolname
... demopoolurl = 'file://' + demopoolpath
... # clean possible data left from previous runs
... os.system('rm -rf ' + demopoolpath)
... if PoolManager.isLoaded(DEFAULT_MEM_POOL):
...     PoolManager.getPool(DEFAULT_MEM_POOL).removeAll()
... PoolManager.getPool(demopoolname, demopoolurl).removeAll()
0

Saving a product

This section shows how to store a product in a "pool" and get a reference back.

>>> # create a product and save it to a pool
... x = Product(description='save me in store')
... # add a tabledataset
... s1 = [('energy', [1, 4.4, 5.6], 'eV'), ('freq', [0, 43.2, 2E3], 'Hz')]
... x["Spectrum"] = TableDataset(data=s1)
... # create a product store
... pstore = ProductStorage(poolurl=demopoolurl)
... # see what is in it.
... pstore
ProductStorage( pool= {'demopool_mh': <LocalPool poolname=demopool_mh, poolurl=file:///tmp/demopool_mh, _classes={}, _urns={}, _tags={}>} )
>>> # save the product and get a reference back.
... prodref = pstore.save(x)
... # This gives detailed information of the product being referenced
... print(prodref)
ProductRef {urn:demopool_mh:fdi.dataset.product.Product:0
# Parents=[]
# meta=
============  ====================  ======  ========  =======  =================  ======  =====================
name          value                 unit    type      valid    default            code    description
============  ====================  ======  ========  =======  =================  ======  =====================
description   save me in store              string    None     UNKNOWN            B       Description of this p
                                                                                          roduct
type          Product                       string    None     Product            B       Product Type identifi
                                                                                          cation. Name of class
                                                                                           or CARD.
level         ALL                           string    None     ALL                B       Product level.
creator       UNKNOWN                       string    None     UNKNOWN            B       Generator of this pro
                                                                                          duct.
creationDate  1958-01-01T00:00:00.          finetime  None     1958-01-01T00:00:  Q       Creation date of this
              000000                                           00.000000                   product
              0                                                0
rootCause     UNKNOWN                       string    None     UNKNOWN            B       Reason of this run of
                                                                                           pipeline.
version       0.8                           string    None     0.8                B       Version of product
FORMATV       1.6.0.10                      string    None     1.6.0.10           B       Version of product sc
                                                                                          hema and revision
startDate     1958-01-01T00:00:00.          finetime  None     1958-01-01T00:00:  Q       Nominal start time  o
              000000                                           00.000000                  f this product.
              0                                                0
endDate       1958-01-01T00:00:00.          finetime  None     1958-01-01T00:00:  Q       Nominal end time  of
              000000                                           00.000000                  this product.
              0                                                0
instrument    UNKNOWN                       string    None     UNKNOWN            B       Instrument that gener
                                                                                          ated data of this pro
                                                                                          duct
modelName     UNKNOWN                       string    None     UNKNOWN            B       Model name of the ins
                                                                                          trument of this produ
                                                                                          ct
mission       _AGS                          string    None     _AGS               B       Name of the mission.
============  ====================  ======  ========  =======  =================  ======  =====================
MetaData-listeners = ListnerSet{}}
>>> # get the URN string
... urn = prodref.urn
... print(urn)    # urn:demopool_mh:fdi.dataset.product.Product:0
urn:demopool_mh:fdi.dataset.product.Product:0
>>> # re-create a product only using the urn
... newp = ProductRef(urn).product
... # the new and the old one are equal
... print(newp == x)   # == True
True

Context and MapContext

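A Context is a Product that holds a set of ProductRefs accessible by keys. In a MapContext the keys are strings, so it usually maps names to product references.

Example (from Quick Start)


This section shows the basic steps for storing product references in a context.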

>>> p1 = Product(description='p1')
... p2 = Product(description='p2')
... # create an empty mapcontext that can carry references with name labels
... map1 = MapContext(description='product with refs 1')
... # A ProductRef created with the syntax of a lone product argument will use a MemPool
... pref1 = ProductRef(p1)
... pref1
ProductRef(urnobj=Urn(urn="urn:defaultmem:fdi.dataset.product.Product:0", _STID="Urn"), _STID="ProductRef")
>>> # A productStorage with a LocalPool -- a pool on the disk.
... pref2 = pstore.save(p2)
... pref2.urn
'urn:demopool_mh:fdi.dataset.product.Product:1'
>>> # how many prodrefs do we have?
... map1['refs'].size()   # == 0
0
>>> # how many 'parents' do these prodrefs have before being added to a context?
... len(pref1.parents)   # == 0
0
>>> len(pref2.parents)   # == 0
0
>>> # add a ref to the context. Every productref has a name in a MapContext
... map1['refs']['spam'] = pref1
... # add the second one
... map1['refs']['egg'] = pref2
... # how many prodrefs do we have?
... map1['refs'].size()   # == 2
2
>>> # parent list of the productref object now has an entry
... len(pref2.parents)   # == 1
1
>>> pref2.parents[0] == map1
True
>>> pref1.parents[0] == map1
True
>>> # remove a ref
... del map1['refs']['spam']
... map1.refs.size()   # == 1
1
>>> # how many parents does pref1 have now?
... len(pref1.parents)   # == 0
0
>>> # add ref2 to another map
... map2 = MapContext(description='product with refs 2')
... map2.refs['also2'] = pref2
... map2['refs'].size()   # == 1
1
>>> # two parents
... len(pref2.parents)   # == 2
2
>>> pref2.parents[1] == map2
True

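Query

A ProductStorage can be queried to get a list of references to the products that satisfy the search criteria. A query can be constructed from a Python predicate expression about a product and its metadata, or from a function that returns True or False.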
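The two ways of stating a query, an expression string or a function, can be pictured with a toy selector over plain dictionaries. This is only a sketch of the idea and makes no claim about how FDI's query classes evaluate anything: the select function and the dictionary layout are hypothetical, and calling eval() on a string is shown purely for illustration and is unsafe with untrusted input.

```python
def select(items, where):
    """Return the items whose metadata satisfies `where`.

    `where` is either a callable taking the metadata mapping, or a string
    holding a Python expression in which the metadata is named `m`.
    """
    if callable(where):
        predicate = where
    else:
        code = compile(where, '<query>', 'eval')

        def predicate(m):
            # Evaluate the expression with `m` bound to the metadata.
            return eval(code, {'__builtins__': {}}, {'m': m})

    return [item for item in items if predicate(item['meta'])]


# Four toy "products", each with one metadata entry named 'extra'.
products = [{'name': 'desc %d' % i, 'meta': {'extra': 5000 + i}} for i in range(4)]

by_expr = select(products, 'm["extra"] > 5000 and m["extra"] <= 5002')
by_func = select(products, lambda m: m['extra'] % 2 == 0)
print([p['name'] for p in by_expr])  # ['desc 1', 'desc 2']
print([p['name'] for p in by_func])  # ['desc 0', 'desc 2']
```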

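Example (from Quick Start)


A ProductStorage with pools attached can be queried in Python syntax, using tags, properties stored in the metadata, or even data in the stored products.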

>>> # clean possible data left from previous runs
... poolname = 'fdi_pool_' + getpass.getuser()
... poolpath = '/tmp/' + poolname
... newpoolname = 'fdi_newpool_' + getpass.getuser()
... newpoolpath = '/tmp/' + newpoolname
... os.system('rm -rf ' + poolpath)
... os.system('rm -rf ' + newpoolpath)
... poolurl = 'file://' + poolpath
... newpoolurl = 'file://' + newpoolpath
... if PoolManager.isLoaded(DEFAULT_MEM_POOL):
...     PoolManager.getPool(DEFAULT_MEM_POOL).removeAll()
... PoolManager.getPool(poolname, poolurl).removeAll()
... PoolManager.getPool(newpoolname, newpoolurl).removeAll()
... # make a productStorage
... pstore = ProductStorage(poolurl=poolurl)
... # make another
... pstore2 = ProductStorage(poolurl=newpoolurl)
>>> # add some products to both storages. The product properties are different.
... n = 7
... for i in range(n):
...     # three counters for properties to be queried.
...     a0, a1, a2 = 'desc %d' % i, 'fatman %d' % (i*4), 5000+i
...     if i < 3:
...         # Product type
...         x = Product(description=a0, creator=a1)
...         x.meta['extra'] = Parameter(value=a2)
...     elif i < 5:
... ...
...         x.meta['time'] = Parameter(value=FineTime1(a2))
...     if i < 4:
...         # some are stored in one pool
...         r = pstore.save(x)
...     else:
...         # the rest in the other
...         r = pstore2.save(x)
...     print(r.urn)
... # Two pools, 7 products in 3 types
... # [P P P C] [C M M]
urn:fdi_pool_mh:fdi.dataset.product.Product:0
urn:fdi_pool_mh:fdi.dataset.product.Product:1
urn:fdi_pool_mh:fdi.dataset.product.Product:2
urn:fdi_pool_mh:fdi.pal.context.Context:0
urn:fdi_newpool_mh:fdi.pal.context.Context:0
urn:fdi_newpool_mh:fdi.pal.context.MapContext:0
urn:fdi_newpool_mh:fdi.pal.context.MapContext:1
>>> # register the new pool above with the 1st productStorage
... pstore.register(newpoolname)
... len(pstore.getPools())   # == 2
2
>>> # make a query on product metadata, which is the variable 'm'
... # in the query expression, i.e. ``m = product.meta; ...``
... # But '5000 < m["extra"]' does not work. see tests/test.py.
... q = MetaQuery(Product, 'm["extra"] > 5000 and m["extra"] <= 5005')
... # search all pools registered on pstore
... res = pstore.select(q)
... # we expect [#2, #3]; Context is not a subclass of Product, which is the class being searched
... len(res)   # == 2
2
>>> # see
... [r.product.description for r in res]
['desc 1', 'desc 2']
>>> def t(m):
...     # query is a function
...     import re
...     # 'creator' matches the regex pattern: 'n' + any single character + '1'
...     return re.match('.*n.1.*', m['creator'].value)
>>> q = MetaQuery(BaseProduct, t)
... res = pstore.select(q)
... # expecting [3,4]
... [r.product.creator for r in res]
['fatman 12', 'fatman 16']

Running tests

To test the PAL functionality based on local (JSON) pools and memory pools, run in the same directory:

make test2

To test the functionality based on the http client pool, run in one terminal:

make runpoolserver

Then, with the pool server still running, run in another terminal:

make testhttp

Check the output.

Design

../_images/packages_pal.png

../_images/classes_pal.png