# -*- coding: utf-8 -*-
from . import productref
from .poolmanager import PoolManager
from .productpool import ProductPool, makeLockpath
from .urn import Urn
from ..dataset.odict import ODict
import filelock
from weakref import finalize
import logging
# create logger
logger = logging.getLogger(__name__)
# logger.debug('level %d' % (logger.getEffectiveLevel()))
[文档]class ProductStorage(object):
""" Logical store created from a pool or a poolURL.
Every instanciation with the same pool will result in a new instance of ProdStorage.
"""
[文档] def __init__(self, pool=None, poolurl=None, **kwds):
""" Gets the storage "control pannel" for pool with specifed name.
pool: if is a string will be taken as a poolname. if is a pool object will be registered with its name,
poolurl: is sent to the PoolManager with poolname to get the pool object.
"""
if issubclass(pool.__class__, str) and ':' in pool:
raise TypeError(
'First argument must be a poolname or a pool object, not ' + str(pool))
super(ProductStorage, self).__init__()
self._pools = ODict() # dict of poolname - poolobj pairs
self.register(pool=pool, poolurl=poolurl, **kwds)
[文档] def register(self, poolname=None, poolurl=None, pool=None, **kwds):
""" Registers the given pools to the storage.
"""
if issubclass(pool.__class__, str) and poolname is None:
pool, poolname = poolname, pool
with filelock.FileLock(makeLockpath('ProdStorage', 'w')), \
filelock.FileLock(makeLockpath('ProdStorage', 'r')):
if pool and issubclass(pool.__class__, ProductPool):
_p = PoolManager.getPool(pool=pool, **kwds)
elif poolurl is None and poolname is None:
# quietly return for no-arg construction case
return
else:
if poolname is not None and not issubclass(poolname.__class__, str):
raise TypeError('Poolname must be a string, not ' +
poolname.__class__.__name__)
if poolurl is not None and not issubclass(poolurl.__class__, str):
raise TypeError('Poolurl must be a string, not ' +
poolurl.__class__.__name__)
_p = PoolManager.getPool(
poolname=poolname, poolurl=poolurl, **kwds)
self._pools[_p._poolname] = _p
logger.debug('registered pool %s -> %s.' %
(str(pool), str(self._pools)))
[文档] def unregister(self, pool=None, **kwds):
""" Unregisters the given pools to the storage.
"""
with filelock.FileLock(makeLockpath('ProdStorage', 'w')):
if issubclass(pool.__class__, ProductPool):
poolname = pool.getId()
else:
poolname = pool
if PoolManager.isLoaded(poolname):
# remove frpm pool manager
res = PoolManager.remove(poolname) # TODO i dentify self
# do this after del above
del self._pools[poolname]
logger.debug('unregistered pool %s -> %s.' %
(str(pool), str(self._pools)))
else:
logger.info('Pool %s is not registered.' % poolname)
return
[文档] def unregisterAll(self):
PoolManager.removeAll()
self._pools.clear()
[文档] def load(self, urnortag):
""" Loads a product with a URN or a list of products with a tag, from the (writeable) pool.
It always creates new ProductRefs.
:return: productref if there is only one. A ```list``` of ```ProductRefs```.
urnortag: urn or tag
"""
poolname = self.getWritablePool()
def runner(urnortag):
if issubclass(urnortag.__class__, list):
ulist = list(map(runner, urnortag))
return ulist
else:
if issubclass(urnortag.__class__, str):
if len(urnortag) > 3 and urnortag[0:4].lower() == 'urn:':
urns = urnortag
else:
urns = self.getUrnFromTag(urnortag)
ret = []
for x in urns:
pr = productref.ProductRef(
urn=x, poolname=poolname)
ret.append(pr)
return ret
elif issubclass(urnortag.__class__, Urn):
urns = urnortag.urn
else:
raise ValueError(
'must provide urn, urnobj, tags, or lists of them')
return productref.ProductRef(urn=urns, poolname=poolname)
ls = runner(urnortag=urnortag)
# return a list only when more than one refs
return ls # if len(ls) > 1 else ls[0]
[文档] def save(self, product, tag=None, poolname=None, geturnobjs=False, **kwds):
""" saves to the writable pool if it has been registered.
product: can be one or a list of prpoducts.
poolName: if the named pool is not registered, registers and saves.
geturnobjs: mh: returns UrnObjs if geturnobjs is True.
kwds: options passed to json.dump() for localpools.
Returns: one or a list of productref with storage info.
"""
if poolname is None:
if len(self._pools) > 0:
poolname = self.getWritablePool()
else:
raise ValueError('no pool registered')
elif poolname not in self._pools:
self.register(poolname)
desc = [x.description for x in product] if issubclass(
product.__class__, list) else product.description
logger.debug('saving product:' + str(desc) +
' to pool ' + str(poolname) + ' with tag ' + str(tag))
try:
ret = self._pools[poolname].saveProduct(
product, tag=tag, geturnobjs=geturnobjs,
**kwds)
except Exception as e:
logger.error('unable to save to the writable pool.')
raise
return ret
[文档] def remove(self, urn):
""" removes product of urn from the writeable pool
"""
poolname = self.getWritablePool()
logger.debug('removing product:' + str(urn) +
' from pool ' + str(poolname))
try:
self._pools[poolname].remove(urn)
except Exception as e:
logger.error('unable to remove from the writable pool.')
raise e
[文档] def accept(self, visitor):
""" Hook for adding functionality to object
through visitor pattern."""
visitor.visit(self)
[文档] def getHead(self, ref):
""" Returns the latest version of a given product, belonging
to the first pool where the same track id is found.
"""
raise NotImplementedError()
[文档] def getPools(self):
""" Returns the set of ProductPools registered.
mh: in a list of poolnames
"""
return list(self._pools.keys())
[文档] def getPool(self, poolname):
""" mh: returns the pool object from poolname
"""
if poolname not in self._pools:
msg = 'pool ' + poolname + ' not found'
logger.error(msg)
raise NameError(msg)
return self._pools[poolname]
[文档] def getWritablePool(self):
""" returns the poolname of the first pool, which is the only writeable pool.
"""
return self.getPools()[0]
[文档] def getProductClasses(self, poolname):
""" Yields all Product classes found in this pool.
"""
return self._pools[poolname].getProductClasses()
[文档] def getUrnFromTag(self, tag):
""" Get the URN belonging to the writable pool that is associated
to a given tag.
"""
return self._pools[self.getWritablePool()].getUrn(tag)
[文档] def wipePool(self):
""" Clear all data and meta data of the writable pool.
"""
list(self._pools.values())[0].removeAll()
[文档] def select(self, query, previous=None):
""" Returns a list of URNs to products that match the specified query.
Parameters:
query - the query object
previous - results to be refined
Returns:
the set of return eferences to products matching the supplied query.
"""
ret = []
# search all registered pools
for poolnm, pool in self._pools.items():
ret += pool.select(query, previous)
return ret
def __getstate__(self):
""" Can be encoded with serializableEncoder """
return OrderedDict(writablePool=self.getWritablePool())
def __repr__(self):
return self.__class__.__name__ + '( pool= ' + str(self._pools if hasattr(self, '_pools') else None) + ' )'