# -*- coding: utf-8 -*-
from ..dataset.baseproduct import BaseProduct
from ..dataset.classes import Classes
from ..dataset.product import Product
from ..dataset.serializable import serialize
from ..dataset.deserialize import deserialize, Class_Look_Up
from .urn import Urn, parseUrn, parse_poolurl, makeUrn
from .versionable import Versionable
from .taggable import Taggable
from . import dicthk
from .definable import Definable
from ..utils.common import (fullname, lls, trbk, pathjoin,
logging_ERROR,
logging_WARNING,
logging_INFO,
logging_DEBUG
)
from .productref import ProductRef
from .query import AbstractQuery, MetaQuery, StorageQuery
from collections import OrderedDict, ChainMap
from functools import lru_cache
import logging
import filelock
import getpass
import functools
import os
import sys
import builtins
# Pick the URL parser that matches the running interpreter's major version.
PY3 = sys.version_info[0] >= 3
if PY3:
    from urllib.parse import urlparse
else:
    from urlparse import urlparse

# Module-level logger.
logger = logging.getLogger(__name__)

# Directory under which lock files are placed; it is specific to the
# user running this process.
Lock_Path_Base = '/tmp/fdi_locks_' + getpass.getuser()

# Lock time-out.
locktout = 10
def makeLockpath(direc, op='w'):
    """ Returns the appropriate path to put a lock file.

    Creates the lock base directory (``Lock_Path_Base``) if it does not
    exist, requesting mode ``0o777`` so that other fdi users can use it.
    The mode actually applied is still subject to the process umask.

    :direc: path-like string; every '/' in it is replaced by '_' to form
        the lock file name.
    :op: 'r' for the read lock (no reading); any other value, e.g. 'w',
        for the write lock (no writing).
    :return: the lock path, ending in '.read' or '.write'.
    """
    # exist_ok=True closes the check-then-create race: two processes that
    # start together no longer fail with FileExistsError.
    os.makedirs(Lock_Path_Base, mode=0o777, exist_ok=True)
    lp = pathjoin(Lock_Path_Base, direc.replace('/', '_'))
    return lp + ('.read' if op == 'r' else '.write')
class ProductPool(Definable, Taggable, Versionable):
    """ A mechanism that can store and retrieve Products.

    A product pool should not be used directly by users. The general user
    should access data in a ProductPool through a ProductStorage instance.

    When implementing a ProductPool, the following rules need to be applied:

    1. Pools must guarantee that a Product saved via the pool
       saveProduct(Product) method is stored persistently, and that method
       returns a unique identifier (URN). If it is not possible to save a
       Product, an IOException shall be raised.
    2. A saved Product can be retrieved using the loadProduct(Urn) method,
       using as the argument the same URN that was assigned to that Product
       in the earlier saveProduct(Product) call. No other Product shall be
       retrievable by that same URN. If this is not possible, an IOException
       or GeneralSecurityException is raised.
    3. Pools should not implement functionality currently implemented in the
       core package. Specifically, it should not address functionality
       provided in the Context abstract class, and it should not implement
       versioning/cloning support.
    """

    def __init__(self, poolname='', poolurl='', **kwds):
        """
        Creates and initializes a productpool.

        * poolname: name of the pool. NOTE: `setPoolurl` is called right
          after `setPoolname` and assigns `_poolname` from the result of
          `parse_poolurl(poolurl)`, so that result is what ends up stored.
        * poolurl: needed to initialize.
        """
        super().__init__(**kwds)
        self.setPoolname(poolname)
        # may trigger setup(), see setPoolurl()
        self.setPoolurl(poolurl)
        self._poolmanager = None
        self.ignore_error_when_delete = False

    class ParametersIncommpleteError(Exception):
        """ Exception type for incomplete parameters (name kept as is
        for backward compatibility). """

    # correctly spelled alias of the above
    ParametersIncompleteError = ParametersIncommpleteError

    def setup(self):
        """ Sets up internal machinery of this Pool,
        but only if self._poolname and self._poolurl are present,
        and other pre-requisites are met.

        Subclasses should implement own setup(), and
        make sure that self._poolname and self._poolurl are present with ``

        if <pre-requisite not met>: return True
        if super().setup(): return True

        # super().setup() has done its things by now.
        <do setup>
        return False
        ``
        returns: True if `self._poolurl` is missing or empty (set-up not
        done), False otherwise.
        """
        if not hasattr(self, '_poolurl') or not self._poolurl:
            return True
        return False

    @property
    def poolname(self):
        """ for property getter
        """
        return self.getPoolname()

    @poolname.setter
    def poolname(self, poolname):
        """ for property setter
        """
        self.setPoolname(poolname)

    def getPoolname(self):
        """ Gets the poolname of this pool as an Object. """
        return self._poolname

    def setPoolname(self, poolname):
        """ Replaces the current poolname of this pool.
        """
        self._poolname = poolname

    @property
    def poolurl(self):
        """ for property getter
        """
        return self.getPoolurl()

    @poolurl.setter
    def poolurl(self, poolurl):
        """ for property setter
        """
        self.setPoolurl(poolurl)

    def getPoolurl(self):
        """
        Gets the pool URL of this pool.
        """
        return self._poolurl

    def setPoolurl(self, poolurl):
        """ Replaces the current poolurl of this pool.

        The url is decomposed with `parse_poolurl` into poolpath, scheme,
        place, poolname, username and password, which are all stored on
        this instance. `setup()` is called only if there was no (or an
        empty) poolurl before this call.
        """
        was_unset = (not hasattr(self, '_poolurl') or not self._poolurl)
        self._poolpath, self._scheme, self._place, \
            self._poolname, self._username, self._password = \
            parse_poolurl(poolurl)
        self._poolurl = poolurl
        # call setup only if poolurl was None
        if was_unset:
            self.setup()

    def accept(self, visitor):
        """ Hook for adding functionality to object
        through visitor pattern."""
        visitor.visit(self)

    def getPoolManager(self):
        """ Gets the pool manager recorded for this pool.
        """
        return self._poolmanager

    def setPoolManager(self, pm):
        """ Records the pool manager for this pool.
        """
        self._poolmanager = pm

    def dereference(self, ref):
        """
        Decrement the reference count of a ProductRef.

        XXX TODO
        """
        raise NotImplementedError

    def exists(self, urn):
        """
        Determines the existence of a product with specified URN.
        """
        raise NotImplementedError

    def getDefinition(self):
        """
        Returns pool definition info which contains pool type and other pool specific configuration parameters
        """
        return super().getDefinition()

    def getId(self):
        """
        Gets the identifier of this pool.
        """
        return self._poolname

    def getPlace(self):
        """
        Gets the place of this pool.
        """
        return self._place

    def getProductClasses(self):
        """
        Returns all Product classes found in this pool.
        mh: returns an iterator.
        """
        raise NotImplementedError

    def getReferenceCount(self, ref):
        """
        Returns the reference count of a ProductRef.
        """
        raise NotImplementedError

    def getScheme(self):
        """
        Gets the scheme of this pool.
        """
        return self._scheme

    def getUrnId(self):
        """
        Get the identifier of this pool used to build URN, usually it's same as id returned by getId().
        """
        return self.getId()

    @staticmethod
    def vectorize(*p):
        """ Makes every argument a list of one common length.

        Lists and tuples with a non-zero length are kept as they are;
        everything else (including empty sequences) is treated as a scalar
        and repeated to the length of the longest sequence, or put in a
        one-element list if there is no sequence at all.
        ::

          vectorize(9, [8,7,6]) -> ([9, 9, 9], [8, 7, 6], True)

        :return: tuple of the vectorized arguments followed by a `bool`
            telling if any argument was a non-empty list or tuple.
        :raises ValueError: if the sequences have different sizes, or if
            called with no argument at all.
        """
        lens = [len(v) if isinstance(v, (list, tuple)) else 0 for v in p]
        sz = max(lens)
        alist = any(lens)
        # sizes that are neither the longest nor scalar are inconsistent
        odd_sizes = set(lens) - {sz, 0}
        if odd_sizes:
            raise ValueError(
                f'Some args have different sizes {odd_sizes}.')
        if sz == 0:
            # force scalar to vector
            sz = 1
        res = [q if n else ([q] * sz) for q, n in zip(p, lens)]
        res.append(alist)
        return tuple(res)

    def isAlive(self):
        """
        Test if the pool is capable of responding to commands.
        """
        return True

    def isEmpty(self):
        """
        Determines if the pool is empty.
        """
        raise NotImplementedError

    def schematicSave(self, products, tag=None, geturnobjs=False, serialize_in=True, serialize_out=False, asyn=False, **kwds):
        """ to be implemented by subclasses to do the scheme-specific saving
        """
        raise NotImplementedError

    def saveProduct(self, product, tag=None, geturnobjs=False, serialize_in=True, serialize_out=False, asyn=False, **kwds):
        """
        Saves specified product(s) and returns the designated ProductRefs or URNs.

        Saves a product or a list of products to the pool, possibly under the
        supplied tag(s), and returns the reference (or a list of references if
        the input is a list of products), or Urns if geturnobjs is True.

        The saving itself is delegated to `schematicSave`. Unless the
        input is a string (or a list of strings), the `_urn` attribute of
        every saved product is then set from the result.

        See pal document for pool structure.

        Parameters
        ----------
        product : BaseProduct, list
            Product or a list of them or '[ size1, prd, size2, prd2, ...]'.
        tag : str, list
            If given a tag, all products will be having this tag.
            If a list of tags is given and its length equals that of
            `product`, each tag is given to the product at the same
            index in the `product` list. A tag list of a different
            length is applied as a whole to every product (this is how
            `ManagedPool.schematicSave` treats it).
        serialize_out : bool
            if `True` returns contents in serialized form.
        serialize_in : bool
            If set, product input is serialized.
        """
        res = self.schematicSave(product, tag=tag,
                                 geturnobjs=geturnobjs,
                                 serialize_in=serialize_in,
                                 serialize_out=serialize_out,
                                 asyn=asyn, **kwds)
        # An empty list is not "a list of strings"; it must not be indexed.
        if isinstance(product, str) or (
                isinstance(product, list) and product
                and isinstance(product[0], str)):
            # p is urn string from server-side LocalPool
            return res
        if isinstance(res, list):
            for p, u in zip(product, res):
                p._urn = u if geturnobjs else u.getUrnObj()
        else:
            product._urn = res if geturnobjs else res.getUrnObj()
        return res

    def loadDescriptors(self, urn):
        """
        Loads the descriptors belonging to specified URN.
        """
        raise NotImplementedError

    def schematicLoad(self, resourcetype, index,
                      start=None, end=None, serialize_out=False):
        """ to be implemented by subclasses to do the scheme-specific loading
        """
        raise NotImplementedError

    def loadProduct(self, urn, serialize_out=False, asyn=False):
        """
        Loads a Product belonging to specified URN.

        serialize_out: if True returns contents in serialized form.

        :raises ValueError: if the pool named in `urn` is not this pool.
        """
        poolname, resource, index = parseUrn(urn)
        if poolname != self._poolname:
            raise ValueError('wrong pool: ' + poolname +
                             ' . This is ' + self._poolname)
        ret = self.schematicLoad(
            resourcetype=resource, index=index, serialize_out=serialize_out)
        # An empty list is not "a list of strings"; it must not be indexed.
        if isinstance(ret, str) or (
                isinstance(ret, list) and ret
                and isinstance(ret[0], str)):
            # ret is a urn string from server-side LocalPool
            return ret
        if isinstance(ret, list):
            logger.warning('TODO: unexpected')
            # Pair products with urns one to one when several urns were
            # given; a single urn string must not be iterated by character.
            urns = urn if isinstance(urn, (list, tuple)) \
                else [urn] * len(ret)
            for x, u in zip(ret, urns):
                x._urn = u
        else:
            ret._urn = urn
        return ret

    @property
    def count(self):
        """ for property getter
        """
        return self.getCount()

    @count.setter
    def count(self, count):
        """ for property setter
        """
        raise ValueError('Pool.count is read-only.')

    def getCount(self, typename=None):
        """
        Return the number of URNs for the product type.
        """
        raise NotImplementedError

    def reference(self, ref):
        """
        Increment the reference count of a ProductRef.
        """
        raise NotImplementedError

    def schematicRemove(self, urn=None, resourcetype=None, index=None, asyn=False, **kwds):
        """ to be implemented by subclasses to do the scheme-specific removing
        """
        raise NotImplementedError

    def remove(self, urn=None, resourcetype=None, index=None, ignore_error=False, asyn=False, **kwds):
        """
        Removes a Product belonging to specified URN or a pair of data type and serial number.

        `ignore_error` is recorded in `self.ignore_error_when_delete`
        before the removal is delegated to `schematicRemove`.
        """
        self.ignore_error_when_delete = ignore_error
        return self.schematicRemove(
            urn, resourcetype=resourcetype, index=index, asyn=asyn, **kwds)

    def schematicWipe(self, asyn=False, **kwds):
        """ to be implemented by subclasses to do the scheme-specific wiping.

        Accepts the arguments that `removeAll` passes on.
        """
        raise NotImplementedError

    def removeAll(self, ignore_error=False, asyn=False, **kwds):
        """
        Remove all pool data (self, products) and all pool meta data (self, descriptors, indices, etc.).

        `ignore_error` is recorded in `self.ignore_error_when_delete`
        before the wiping is delegated to `schematicWipe`.
        """
        self.ignore_error_when_delete = ignore_error
        return self.schematicWipe(asyn=asyn, **kwds)

    def saveDescriptors(self, urn, desc):
        """
        Save/Update descriptors in pool.
        """
        raise NotImplementedError

    def schematicSelect(self, query, previous=None):
        """
        to be implemented by subclasses to do the scheme-specific querying.
        """
        raise NotImplementedError

    def select(self, query, variable='m', ptype=Product,
               previous=None):
        """Returns a list of references to products that match the specified query.

        Parameters
        ----------
        query : str, StorageQuery
            the 'where' query string to make a query object. A
            `StorageQuery` instance is passed to `schematicSelect` as is.
        variable : str
            name of the dummy variable in the query string.
            if `variable` is 'm', query goes via `MetaQuery(ptype, query)` ; else by `AbstractQuery(ptype, variable, query)` .
        ptype : class
            The class object whose instances are to be queried. Or
            fragment of the name of such classes.
        previous : list or str
            of urns, possibly from previous search. or a string of comma-separated urns, e.g. `'urn:a:foo:12,urn:b:bar:9'`

        Returns
        -------
        list
            whatever `schematicSelect` returns for the query.

        Raises
        ------
        ValueError
            if `ptype` is a string that matches no product class name.
        """
        if isinstance(previous, str):
            previous = previous.split(',')
        if isinstance(query, StorageQuery):
            return self.schematicSelect(query, previous)
        if isinstance(ptype, str):
            # first registered product class whose name contains `ptype`
            for cn, cls in Classes.mapping.items():
                if ptype in cn and issubclass(cls, BaseProduct):
                    break
            else:
                raise ValueError(ptype + ' is not a product type.')
            ptype = cls
        if variable == 'm':
            q = MetaQuery(ptype, where=query)
        else:
            q = AbstractQuery(ptype, where=query, variable=variable)
        return self.schematicSelect(q, previous)

    def qm(self, qw, prod='BaseProduct', urns=None):
        """ short-hand method for `select(qw, variable='m', ptype=prod, previous=urns)`.

        example:

        ..code:
              curl http://foo.edu:23456/data/pool/api/qm__m["age"]>66 and m["name"]=="Bob"'
        """
        return self.select(qw, variable='m', ptype=prod, previous=urns)

    def __repr__(self):
        co = ', '.join(str(k) + '=' + lls(v, 40)
                       for k, v in self.__getstate__().items())
        return '<' + self.__class__.__name__ + ' ' + co + '>'

    def __getstate__(self):
        """ returns an odict that has all state info of this object.
        Subclasses should override this function.
        """
        return OrderedDict(
            poolurl=getattr(self, '_poolurl', None),
        )
###########################
class PoolNotFoundError(Exception):
    """ Exception type named for a pool that cannot be found.

    It adds nothing to `Exception`. It is not raised in this part of
    the module.
    """
def _eval(code='', m='', **kwds):
""" evaluate compiled code with given local vars. """
try:
res = eval(code)
except NameError:
res = False
logger.debug("Evaluation error: %s. Traceback %s" %
(str(e), trbk(e)))
return res
def populate_pool2(tags, ptype, sn=None, dTypes=None, dTags=None):
    """ Registers one data item in the `dTypes` / `dTags` housekeeping.

    tags : list
        The tags in a list, each handed to
        `dicthk.add_tag_datatype_sn`. `None` means no tags.
    ptype : str
        The product name / datatype / class name of the data item, new or existing.
    sn : str
        Serial number. If it is `None`, one more than the `currentSN`
        recorded in `dTypes` is used for a known `ptype`, and 0 for a
        new one.
    dTypes, dTags : dict
        Housekeeping tables to update in place. New empty ones are
        made if `None`.

    Returns
    -------
    tuple
       dTypes and dTags with updates, and the index/serial number as
       a string.
    """
    dTypes = {} if dTypes is None else dTypes
    dTags = {} if dTags is None else dTags

    # work out the serial number to use
    if ptype not in dTypes:
        serial = int(sn) if sn is not None else 0
        dTypes[ptype] = {'currentSN': serial, 'sn': {}}
    elif sn is not None:
        serial = int(sn)
    else:
        serial = dTypes[ptype]['currentSN'] + 1

    # make sure there is a record for it
    records = dTypes[ptype]['sn']
    if serial not in records:
        records[serial] = {'tags': [], 'meta': []}
    dTypes[ptype]['currentSN'] = serial

    for t in (() if tags is None else tags):
        dicthk.add_tag_datatype_sn(t, ptype, serial, dTypes, dTags)

    return dTypes, dTags, str(serial)
# Start and end text markers. Do not include leading or trailing whitespace
# in them, as it is not guaranteed to be present.
# NOTE(review): presumably these delimit a serialized MetaData JSON object;
# confirm against the code that uses them.
MetaData_Json_Start = '{"_ATTR_meta":'
MetaData_Json_End = '"_STID": "MetaData"}'
class ManagedPool(ProductPool, dicthk.DictHk):
    """ A ProductPool that manages its internal house keeping.

    The housekeeping (HK) kept on the instance is:

    * `_dTypes`: {datatype -> {'currentSN': sn, 'sn': {sn -> record}}}
    * `_dTags`: {tag -> {datatype -> [sn, ...]}}
    * the legacy `_classes`, `_tags` and `_urns` tables.
    """

    def __init__(self, **kwds):
        super().__init__(**kwds)

    def setup(self):
        """ Sets up internal machinery of this Pool,
        but only if self._poolname and self._poolurl are present,
        and other pre-requisites are met.

        Subclasses should implement own setup(), and
        make sure that self._poolname and self._poolurl are present with ``

        if <pre-requisite not met>: return True
        if super().setup(): return True

        # super().setup() has done its things by now.
        <do setup>
        return False
        ``
        returns: True if set-up was not done because the parent's
        `setup()` reported missing prerequisites, False otherwise.
        """
        if super().setup():
            return True
        self._classes = dict()
        self._dTypes = dict()
        # __getstate__() and schematicWipe() read _dTags, so it has to
        # exist even before the first readHK().
        self._dTags = dict()
        return False

    def getPoolpath(self):
        """
        Gets the poolpath of this pool.

        poolpath is usually derived from poolurl received from ``PoolManager`` during initialization.
        """
        return self._poolpath

    def lockpath(self, op='w'):
        """ Make lock path using transformed poolname as name.
        """
        return makeLockpath(self.transformpath(self._poolname), op)

    def getCacheInfo(self):
        """ Returns {'transformpath': <cache_info() of self.transformpath>}.
        """
        return {name: getattr(self, name).cache_info()
                for name in ['transformpath']}

    def _reloadHK(self):
        """ Replaces all in-memory housekeeping with what `readHK` gives. """
        (self._classes, self._tags, self._urns,
         self._dTypes, self._dTags) = tuple(self.readHK().values())

    def dereference(self, ref):
        """
        Decrement the reference count of a ProductRef.

        :raises ValueError: if the count is already 0 (or was never set).
        """
        poolname, dt, sn = parseUrn(ref.urn, int_index=True)
        record = self._dTypes[dt]['sn'][sn]
        # a record that was never referenced has no 'refcnt' yet
        if record.get('refcnt', 0) == 0:
            raise ValueError('Cannot deref below 0.')
        record['refcnt'] -= 1

    def exists(self, urn, resourcetype=None, index=None):
        """
        Determines the existence of a product with specified URN.

        Returns `False` if `get_missing` raises `KeyError`, else
        whether the urn is in the legacy `_urns` table.
        """
        try:
            urn, datatype, sn = self.get_missing(
                urn, resourcetype, index, no_check=True)
        except KeyError:
            return False
        return urn in self._urns

    def getProductClasses(self):
        """
        Returns all Product classes found in this pool.
        mh: returns an iterator.
        """
        # the legacy and the new tables are expected to agree
        assert list(self._classes.keys()) == list(self._dTypes.keys())
        return self._classes.keys()

    def getCount(self, typename=None):
        """
        Return the number of URNs for the product type.

        With no `typename` the total over all types is given; an
        unknown `typename` gives 0.
        """
        try:
            if typename:
                return len(self._dTypes[typename]['sn'])
            return sum(len(dt['sn']) for dt in self._dTypes.values())
        except KeyError:
            return 0

    def doSave(self, resourcetype, index, data, tags=None, serialize_in=True, **kwds):
        """ to be implemented by subclasses to do the action of saving
        """
        raise NotImplementedError

    def getReferenceCount(self, ref):
        """
        Returns the reference count of a ProductRef.

        A product that was never referenced has a count of 0.
        """
        # integer index, like reference() and dereference() use
        poolname, dt, sn = parseUrn(ref.urn, int_index=True)
        return self._dTypes[dt]['sn'][sn].get('refcnt', 0)

    def isEmpty(self):
        """
        Determines if the pool is empty.
        """
        return len(self._dTypes) == 0

    def loadDescriptors(self, urn, resourcetype=None, index=None):
        """
        Loads the descriptors belonging to specified URN.

        NOTE(review): after resolving the arguments with `get_missing`
        this returns the whole `_dTypes` table, not only the record of
        the given urn -- confirm that this is what callers expect.
        """
        urn, datatype, sn = self.get_missing(urn, resourcetype, index)
        return self._dTypes

    def readHK(self):
        """ Subclass should override this.

        Returns
        -------
        mapping
            Callers in this class use `.values()` of the result and
            expect 5 dicts in this order: the legacy `self._classes`,
            `self._tags`, `self._urns`, and
            `self._dTypes`, `self._dTags`
        """
        raise NotImplementedError()

    def reference(self, ref):
        """
        Increment the reference count of a ProductRef.
        """
        poolname, dt, sn = parseUrn(ref.urn, int_index=True)
        record = self._dTypes[dt]['sn'][sn]
        record['refcnt'] = record.get('refcnt', 0) + 1

    def saveOne(self, prd, tag, geturnobjs, serialize_in, serialize_out, res, **kwds):
        """
        Save one product.

        The latest HK is read first, then updated with the new classes
        and tags, and `doSave` is called to store the product.

        dTypes combines old classes and urns, is defined as::

             $class_name0:
                    'currentSN': $sn
                    'sn':
                        $sn0:
                           tags: [$tag1, $tag2, ...
                           meta: [$start, $end]
                           refcnt: $count
                        $sn1:
                           tags: [$tag1, $tag2, ...
                           meta: [$start, $end]
                           refcnt: $count

        example::

             'foo.bar.Bar':
                    'currentSN': 1
                    'sn':
                        0:
                           'tags': ['cat', 'white']
                           'meta': [123, 456]
                           'refcnt': 0
                        1:
                           'tags': ['dog', 'white']
                           'meta': [321, 765]
                           'refcnt': 0
             'foo.baz.Baz':
                    'currentSN': 34
                    'sn':
                        34:
                           'tags': ['tree', 'green']
                           'meta': [100, 654]
                           'refcnt': 1

        `dTags` differs from `tag` 1. uses dType:[sn], instead of urn, so there is no poolname anywhere, 2. simplify by removing second level dict::

             $tag_name0:
                   $class_name1:[$sn1, $sn2...]
                   $class_name2:[$sn3, ...]

        example::

             'cat': { 'foo.bar.Bar': [0] }
             'white': { 'foo.bar.Bar': [0, 1] }
             'dog': ...

        :prd: the product, or its JSON text if `serialize_in` is false.
        :tag: `None`, a string, or a list of tags.
        :res: list of results. The result of this call is appended to it.
        :serialize_out: if True returns contents in serialized form.

        Return
        ------
        Nothing. Appended to `res` is one of the following:
        `ProductRef`. `Urn` if `geturnobjs` is set (the urn string if
        `serialize_out` is set too). If `serialize_out` is set for
        `ProductRef`, or the input was JSON text, no product metadata
        is stored in the returned instance.

        :raises TypeError: if `tag` is not `None`, a `str`, or a `list`.
        """
        if serialize_in:
            pn = fullname(prd)
        else:
            # prd is json. extract prod name from the last quoted token
            # '... "_STID": "Product"}]'
            pn = fullname(Class_Look_Up[prd.rsplit('"', 2)[1]])

        # normalize tags before any lock is taken
        if tag is None:
            tags = []
        elif isinstance(tag, str):
            tags = [tag]
        elif isinstance(tag, list):
            tags = tag
        else:
            raise TypeError('Bad type for tag: %s.' %
                            tag.__class__.__name__)

        with filelock.FileLock(self.lockpath('w')), \
                filelock.FileLock(self.lockpath('r')):
            # get the latest HK
            self._reloadHK()
            self._dTypes, self._dTags, _sn = \
                populate_pool2(tags, pn, sn=None,
                               dTypes=self._dTypes, dTags=self._dTags)
            urn = makeUrn(poolname=self._poolname, typename=pn, index=_sn)
            try:
                # save prod and HK
                self.doSave(resourcetype=pn,
                            index=_sn,
                            data=prd,
                            tags=tags,
                            serialize_in=serialize_in,
                            serialize_out=serialize_out,
                            **kwds)
            except ValueError as e:
                msg = 'product ' + urn + ' saving failed.' + str(e) + trbk(e)
                logger.debug(msg)
                # undo the in-memory HK changes made above
                self._reloadHK()
                raise

        if geturnobjs:
            # the URN string if serialized output is wanted
            res.append(urn if serialize_out
                       else Urn(urn, poolurl=self._poolurl))
            return
        rf = ProductRef(urn=Urn(urn, poolurl=self._poolurl),
                        poolmanager=self._poolmanager)
        if not serialize_out and serialize_in:
            # it seems that there is no better way to set meta.
            # JSON text input has no getMeta(), so it is left out.
            rf._meta = prd.getMeta()
        res.append(rf)

    def schematicSave(self, products, tag=None, geturnobjs=False, serialize_in=True, serialize_out=False, asyn=False, **kwds):
        """ do the scheme-specific saving.

        Parameters
        ----------
        products : BaseProduct, list
            Product or a list of them or '[ size1, prd, size2, prd2, ...]'.
        tag : str, list
            If given a tag, all products will be having this tag.
            For a list of products: if a list of tags is given and its
            length equals that of `products`, each tag is given to the
            product at the same index. A tag list of a different length
            is joined with ',' (empty tags dropped) and the result is
            given to every product.
        serialize_out : bool
            if `True` returns contents in serialized form.
        serialize_in : bool
            If set, product input is serialized.
        asyn : bool
            If set, a list of products is handed to `self.asyncSave`.

        Returns
        -------
        ProductRef: Product reference.
        Urn: If `geturnobjs` is set.
        str: If `serialize_out` is set, serialized form of `ProductRef` or `URN`.
        list: `list` of the above if input is a list.

        Raises
        ------
        TypeError
            if `products` is a list while `serialize_in` is false.
        """
        res = []
        alist = isinstance(products, list)
        json_list = False
        productlist = []
        if alist:
            if isinstance(tag, list) and len(tag) != len(products):
                # make a list of tags to ','-separated tags
                tag = ','.join(t for t in tag if t)
            if isinstance(tag, str) or tag is None:
                tag = [tag] * len(products)

        if serialize_in:
            if not alist:
                self.saveOne(products, tag, geturnobjs,
                             serialize_in, serialize_out, res, **kwds)
            elif asyn:
                self.asyncSave(products, tag, geturnobjs,
                               serialize_in, serialize_out, res, **kwds)
            else:
                for prd, t in zip(products, tag):
                    # result is in res
                    self.saveOne(prd, t, geturnobjs,
                                 serialize_in, serialize_out, res, **kwds)
        else:
            if alist:
                raise TypeError('a list cannot go with False serialize-in.')
            # positions below are counted from the opening '['
            text = products.lstrip()
            json_list = text.startswith('[')
            if not json_list:
                self.saveOne(products, tag, geturnobjs,
                             serialize_in, serialize_out, res, **kwds)
            else:
                # parse '[ size1, prd, size2, prd2, ...]'
                last_end = 1
                comma = text.find(',', last_end)
                while comma > 0:
                    length = int(text[last_end: comma])
                    productlist.append(length)
                    last_end = comma + 1 + length
                    # the product text follows the ', ' after its size
                    prd = text[comma + 2: last_end + 1]
                    self.saveOne(prd, tag, geturnobjs,
                                 serialize_in, serialize_out, res, **kwds)
                    # +2 to skip the following ', '
                    last_end += 2
                    comma = text.find(',', last_end)

        if logger.isEnabledFor(logging_DEBUG):
            if not json_list and not alist:
                sz = 1
            else:
                sz = len(products) if serialize_in else len(productlist)
            logger.debug('%d product(s) generated %d %s: %s.' %
                         (sz, len(res), 'Urns ' if geturnobjs else 'prodRefs', lls(res, 200)))
        out = res if (alist or json_list) else res[0]
        return serialize(out) if serialize_out else out

    def doLoad(self, resourcetype, index, start=None, end=None, serialize_out=False):
        """ to be implemented by subclasses to do the action of loading
        """
        raise NotImplementedError

    def schematicLoad(self, resourcetype, index, start=None, end=None,
                      serialize_out=False):
        """ do the scheme-specific loading

        Calls `doLoad` while holding the write lock.
        """
        with filelock.FileLock(self.lockpath('w')):
            ret = self.doLoad(resourcetype=resourcetype,
                              index=index, start=start, end=end,
                              serialize_out=serialize_out)
        return ret

    def doRemove(self, resourcetype, index, asyn=False):
        """ to be implemented by subclasses to do the action of removing

        `schematicRemove` calls this with lists of data types and
        serial numbers and takes a `None` result as a failure.
        """
        raise NotImplementedError

    def schematicRemove(self, urn=None, resourcetype=None, index=None, asyn=False, **kwds):
        """ do the scheme-specific removing

        The HK is re-read, updated with `removeUrn`, and `doRemove` is
        called. For every `None` among the results of `doRemove`:

        * a pool whose class hierarchy has a class named
          `PublicClientPool` refreshes itself with
          `getPoolInfo(update_hk=True)`;
        * any other pool re-reads the HK, then logs a warning if
          `self.ignore_error_when_delete` is set, or raises otherwise.

        :return: what `doRemove` returned; its first item if the input
            was not a list but the result is.
        :raises RuntimeError: if a removal failed and errors are not
            to be ignored.
        """
        urn, datatype, sn = self.get_missing(
            urn, resourcetype, index, no_check=True)
        with filelock.FileLock(self.lockpath('w')), \
                filelock.FileLock(self.lockpath('r')):
            # get the latest HK
            self._reloadHK()
            datatypes, sns, alist = ProductPool.vectorize(datatype, sn)
            self.removeUrn(urn, datatype=datatype, sn=sn)
            res = self.doRemove(resourcetype=datatypes, index=sns, asyn=asyn)
            results = res if isinstance(res, (list, tuple)) else [res]
            # Subclasses are told apart by class name as they are not
            # imported by this module.
            kinds = {c.__name__ for c in type(self).__mro__}
            for i, r in enumerate(results):
                if r is not None:
                    continue
                if 'PublicClientPool' in kinds:
                    self.getPoolInfo(update_hk=True)
                    continue
                failed = urn[i] if isinstance(urn, (list, tuple)) else urn
                msg = f'product {failed} removal failed.'
                # undo the in-memory HK changes made above
                self._reloadHK()
                if getattr(self, 'ignore_error_when_delete', False):
                    logger.warning(msg)
                else:
                    raise RuntimeError(msg)
                # can only do one at a time
                break
        return res if alist else results[0]

    def doWipe(self):
        """ to be implemented by subclasses to do the action of wiping.
        """
        raise NotImplementedError

    def schematicWipe(self, asyn=False, **kwds):
        """ do the scheme-specific wiping

        Clears `_dTypes` and `_dTags`, then calls `doAsyncWipe` if
        `asyn` is set, else `doWipe`. A `ValueError` from `doWipe` is
        only logged if `self.ignore_error_when_delete` is set. Extra
        keyword arguments, which `removeAll` passes on, are accepted and
        not used.

        :return: the result of the wiping call, `None` if it failed and
            the error was ignored.
        """
        res = None
        with filelock.FileLock(self.lockpath('w')), \
                filelock.FileLock(self.lockpath('r')):
            self._dTypes.clear()
            self._dTags.clear()
            if asyn:
                res = self.doAsyncWipe()
            else:
                try:
                    res = self.doWipe()
                except ValueError as e:
                    msg = f'Wiping {self.poolname} failed. {e} traceback: {trbk(e)}'
                    if getattr(self, 'ignore_error_when_delete', False):
                        logger.warning(msg)
                    else:
                        raise
        return res

    def _datatype_refs(self, datatypes):
        """ Yields a ProductRef for every serial number in {typename: sn_list}. """
        for typename, sns in datatypes.items():
            cls = Class_Look_Up[typename.rsplit('.', 1)[-1]]
            for n in sns:
                urno = Urn(cls=cls, poolname=self._poolname, index=n)
                yield ProductRef(urn=urno, poolmanager=self._poolmanager)

    def prod_filter(self, q, cls=None, reflist=None, urnlist=None, snlist=None, datatypes=None):
        """ returns filtered collection using the query.

        q: an AbstractQuery. Its "where" part is either an expression
        string, evaluated with `_eval` while the query variable is bound
        to the product in this module's globals, or a callable that is
        given the product.

        valid inputs: cls and sn list; productref list; urn list; datatypes dict.
        The first non-empty one in the order reflist, urnlist, snlist,
        datatypes is used.

        :cls: type. data type of the products in `snlist`.
        :reflist: list of ProductRefs
        :urnlist: list of URNs
        :snlist: list of serial numbers of `cls`.
        :datatypes:  dict of {cls:sn_list}
        :return: list of the `ProductRef`s whose products pass the query;
            empty if the given input is empty.
        :raises ValueError: if none of the inputs is given.
        """
        qw = q.getWhere()
        var = q.getVariable()

        if reflist:
            refs = reflist
        elif urnlist:
            refs = (ProductRef(urn=u, poolmanager=self._poolmanager)
                    for u in urnlist)
        elif snlist or datatypes:
            if snlist:
                datatypes = {cls.__name__: snlist}
            refs = self._datatype_refs(datatypes)
        elif reflist is None and urnlist is None \
                and snlist is None and datatypes is None:
            raise ValueError('Must give a list of ProductRef or urn or sn')
        else:
            # something was given but it is empty
            return []

        code = compile(qw, 'qw.py', 'eval') if isinstance(qw, str) else None

        ret = []
        # will add query variable (e.g. 'p') to Global name space
        glbs = globals()
        not_there = object()
        saved = glbs.get(var, not_there)
        try:
            for ref in refs:
                prod = ref.getProduct()
                glbs[var] = prod
                passed = qw(prod) if code is None else _eval(code=code)
                if passed:
                    ret.append(ref)
        finally:
            # leave the Global name space as it was found
            if saved is not_there:
                glbs.pop(var, None)
            else:
                glbs[var] = saved
        return ret

    def where(self, qw, prod='BaseProduct', urns=None):
        """ Returns the urns of all products in this pool that pass `qw`.

        `qw` is made into an `AbstractQuery` with variable 'p' and is
        applied with `prod_filter` to every serial number in `_dTypes`.
        `urns` is not used.
        """
        q = AbstractQuery(prod, 'p', qw)
        datatypes = {k: list(v['sn']) for k, v in self._dTypes.items()}
        res = self.prod_filter(q, prod, datatypes=datatypes)
        return [r.urn for r in res]

    def doSelect(self, query, previous=None):
        """
        to be implemented by subclasses to do the action of querying.
        """
        raise NotImplementedError

    def schematicSelect(self, query, previous=None):
        """
        do the scheme-specific querying.

        With `previous` given, only its members that belong to this
        pool are filtered. Otherwise every data type in `_dTypes` that
        is a subclass of the query type is searched. A `MetaQuery`
        goes through `meta_filter`, other queries through `prod_filter`.

        :raises TypeError: if `query` is neither a `MetaQuery` nor an
            `AbstractQuery`.
        """
        is_MetaQ = isinstance(query, MetaQuery)
        if not is_MetaQ and not isinstance(query, AbstractQuery):
            raise TypeError('not a Query')
        lgb = Classes.mapping
        t = query.getType()
        ret = []
        if previous:
            this = (x for x in previous if x.urnobj.getPoolId()
                    == self._poolname)
            if is_MetaQ:
                ret += self.meta_filter(q=query, reflist=this)
            else:
                ret += self.prod_filter(q=query, reflist=this)
            return ret
        for cname in self._dTypes:
            cls = lgb[cname.rsplit('.', 1)[-1]]
            if not issubclass(cls, t):
                continue
            snlist = list(self._dTypes[cname]['sn'])
            if is_MetaQ:
                ret += self.meta_filter(q=query, typename=cname,
                                        snlist=snlist)
            else:
                ret += self.prod_filter(q=query, cls=cls,
                                        snlist=snlist)
        return ret

    def __repr__(self):
        def brief(v):
            # strings as they are, others by type name and size
            if isinstance(v, str):
                return v
            if hasattr(v, '__len__'):
                return f'< {v.__class__.__name__} {len(v)} >'
            return f'< {v.__class__.__name__} >'

        co = ', '.join(str(k) + '=' + brief(v)
                       for k, v in self.__getstate__().items())
        return '<' + self.__class__.__name__ + ' ' + co + '>'

    def __getstate__(self):
        """ returns an odict that has all state info of this object.
        Subclasses should override this function.

        Housekeeping tables that are not set up yet are given as
        empty dicts.
        """
        return OrderedDict(
            poolname=getattr(self, '_poolname', 'unknown'),
            poolurl=getattr(self, '_poolurl', 'unknown'),
            _dTypes=getattr(self, '_dTypes', {}),
            _dTags=getattr(self, '_dTags', {}),
        )