Source code for fdi.pal.poolmanager


# -*- coding: utf-8 -*-
import logging
from weakref import WeakValueDictionary, getweakrefcount
import getpass

from ..utils.getconfig import getConfig
from ..utils.common import lls
from .urn import parse_poolurl
from ..pal.httppool import HttpPool

from requests.exceptions import ConnectionError
import requests

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# logger.debug('level %d' %  (logger.getEffectiveLevel()))

# Configuration as returned by getConfig() with no arguments; it is indexed
# by key further down in this module (e.g. pc['username'], pc['password']).
pc = getConfig()

# Pool name that PoolManager.getPool() maps to 'mem:///' + name when no
# poolurl is supplied.
DEFAULT_MEM_POOL = 'defaultmem'
# Default local pool name: fixed prefix + this module's name + current user.
DEFAULT_POOL = 'fdi_pool_' + __name__ + getpass.getuser()
# Names that PoolManager.getPool() refuses to register (raises ValueError).
# NOTE(review): presumably these collide with server URL path segments -- confirm.
Invalid_Pool_Names = ['pools', 'urn', 'URN', 'api']


def _auth_summary(auth):
    """Return a printable description of ``auth`` with the password masked.

    Credentials must never end up in log files or exception messages, so
    only the user name (if the object has one) is shown.
    """
    if not auth:
        return '<no authorization>'
    return '<' + str(getattr(auth, 'username', '')) + ' ******>'


def remoteRegister(pool):
    """ Register a client-side pool on its remote server.

    The pool's own ``auth`` and ``client`` attributes are used for the
    request. Note that a new http/csdb pool gets remoteRegistered before
    it is locally registered.

    Parameter
    ---------
    pool : HttpClientPool, PublicClientPool
        Pool object to be registered on the remote server.

    Returns
    -------
    tuple or None
        ``(res, msg)`` for ``HttpClientPool`` and ``PublicClientPool``
        instances; ``None`` for any other kind of pool (nothing is done).

    Raises
    ------
    ConnectionError
        Re-raised after logging if the HTTP server cannot be reached.
    RuntimeError
        If the HTTP server answers ``'FAILED'``.
    ServerError
        Re-raised after logging if the csdb server rejects the request.
    """

    from ..pal import httpclientpool, publicclientpool

    if issubclass(pool.__class__, httpclientpool.HttpClientPool):
        # HttpClientPool. Init the remote pool. If it exists, load it.
        poolurl = pool._poolurl
        logger.debug('Register %s on the server', poolurl)
        if poolurl.endswith('/'):
            poolurl = poolurl[:-1]
        from ..pns.fdi_requests import put_on_server
        try:
            res, msg = put_on_server(
                'urn:::0', poolurl, 'register_pool',
                auth=pool.auth, client=pool.client)
        except ConnectionError as e:
            logger.error(poolurl + ' ' + str(e))
            raise
        if res == 'FAILED':
            # The password is masked on purpose: exception text is routinely
            # logged and shown to users.
            raise RuntimeError(
                'Registering ' + poolurl + ' failed with auth ' +
                _auth_summary(pool.auth) + ' , ' + msg)
        return res, msg
    elif issubclass(pool.__class__, publicclientpool.PublicClientPool):
        from ..pns.fdi_requests import ServerError
        # Register csdb pool. If existing, load. If it does not exist,
        # create and initialize.
        poolurl = pool._poolurl
        try:
            pool.createPool2()
            pool.getToken()
            pool.client.headers.update({'X-AUTH-TOKEN': pool.token})
            msg = 'New pool made.'
            pool.poolInfo = pool.getPoolInfo()
            res = 'OK'
        except ServerError as e:
            logger.error(
                'Registering ' + poolurl + ' failed with auth ' +
                _auth_summary(getattr(pool, 'auth', None)) + ' , ' + str(e))
            raise
        return res, msg
    else:
        # Not a remote client pool: nothing to register.
        return
def remoteUnregister(poolurl, auth=None, client=None):
    """ Unregister the pool at ``poolurl`` from its remote server.

    The pool is looked up by URL among the pools currently held by
    ``PoolManager``; no pool object is passed in by the caller.

    Parameter
    ---------
    poolurl : str
        URL of the pool. URLs not starting with ``http`` are ignored.
    auth : object
        Authorization for the request. Defaults to that of the matching pool.
    client : object
        Client/session for the request. Defaults to that of the matching pool.

    Returns
    -------
    int
        ``0`` on success, ``1`` if ``poolurl`` is not a remote URL, ``2`` if
        the server reported failure but the matching pool has a true
        ``ignore_error_when_delete`` attribute.

    Raises
    ------
    NameError
        If no suitable registered pool has this ``poolurl``.
    ValueError
        If unregistering failed and the failure is not to be ignored.
    """

    is_remote = poolurl.lower().startswith('http')
    if not is_remote:
        logger.warning('Ignored: %s not for a remote pool.' % poolurl)
        return 1

    logger.debug('unregister %s on the server', poolurl)

    # Find the first registered pool with this URL, skipping HttpPool
    # instances.
    candidates = (
        registered for registered in PoolManager._GlobalPoolList.values()
        if not issubclass(registered.__class__, HttpPool)
        and poolurl == registered._poolurl
    )
    matched = next(candidates, None)
    if matched is None:
        raise NameError(
            f'Remote Unregistering failed. {poolurl} not registered or not suitable.')

    # Fall back on the pool's own credentials and session.
    if client is None:
        client = matched.client
    if auth is None:
        auth = matched.auth

    from ..pns.fdi_requests import delete_from_server

    try:
        res, msg = delete_from_server(
            'urn:::0', poolurl, 'unregister_pool', auth=auth, client=client)
    except ConnectionError as e:
        # An unreachable server is treated like a server-side failure.
        res, msg = 'FAILED', str(e)

    if res != 'FAILED':
        return 0

    msg = f'Unregistering {poolurl} failed. {msg}'
    if not getattr(matched, 'ignore_error_when_delete', False):
        raise ValueError(msg)
    logger.info('Ignored: ' + msg)
    return 2
class PoolManager(object):
    """
    This class provides the means to reference ProductPool objects without
    having to hard-code the type of pool. For example, it could be desired
    to easily switch from one pool type to another.

    This is done by calling the getPool() method, which will return an
    existing pool or create a new one if necessary.
    """

    _GlobalPoolList = WeakValueDictionary()
    """ Global centralized dict that returns singleton -- the same -- pool for the same ID."""

    # maps scheme to default place/poolpath
    # pc['host']+':'+str(pc['port'])+pc['baseurl']
    p = getConfig('poolurl:').strip('/').split('://')[1]
    PlacePaths = {
        'file': pc['base_local_poolpath'],
        'mem': '/',
        'http': p,
        'https': p,
        'server': pc['server_local_poolpath'],
        'csdb': pc['cloud_api_version']
    }
    # `p` was only a temporary; keep it out of the class namespace.
    del p

    @classmethod
    def getPool(cls, poolname=None, poolurl=None, pool=None, makenew=True,
                auth=None, client=None, **kwds):
        """ Returns an instance of pool according to name or path of the pool.

        Returns the pool object if the pool is registered. Creates the pool
        if it does not already exist. The same poolname always gets the same
        pool. Http and csdb client pools will be registered on the server
        side.

        Pools are held by weak reference, so a pool is kept only as long as
        a reference to it remains elsewhere.

        Parameter
        ---------
        poolname : str
            Name of the pool. Cannot be given together with `pool`.
        poolurl : str
            If given, the scheme and pool name will be derived from it. If
            not given, it is looked up in the configuration using
            `poolname`. If `poolname` is missing it is derived from
            `poolurl`.
        pool : ProductPool
            An existing pool object to be registered. `auth` and `client`
            are applied to it only where the pool has none of its own.
        makenew : bool
            Passed to `LocalPool` when a ``file`` pool is instantiated.
        auth : object
            For `remoteRegister`. Defaults to HTTP basic authorization
            built from the configured username and password.
        client : object
            For `remoteRegister`. Defaults to a new retrying session.
        kwds : dict
            Passed to pool instantiation arg-list.

        Returns
        -------
        ProductPool:
            The pool object.

        Raises
        ------
        ValueError
            If both `poolname` and `pool` are given, if neither `poolname`
            nor `poolurl` is available, if the name is in
            `Invalid_Pool_Names`, or if `poolname` disagrees with the name
            embedded in `poolurl`.
        NotImplementedError
            If the scheme of `poolurl` is not supported.
        """

        if pool:
            if poolname:
                raise ValueError(
                    'Pool name %s and pool object cannot be both given.'
                    % poolname)
            poolname, poolurl = pool._poolname, pool._poolurl
            if cls.isLoaded(poolname):
                return cls._GlobalPoolList[poolname]
            if poolurl.lower()[:4] in ('http', 'csdb'):
                if auth is not None and getattr(pool, 'auth', None) is None:
                    pool.auth = auth
                if client is not None and \
                        getattr(pool, 'client', None) is None:
                    # Use the client the caller supplied instead of
                    # silently replacing it with a fresh session.
                    pool.client = client
                res, msg = remoteRegister(pool)
            p = pool
        else:
            # quick decisions can be made knowing poolname only
            if poolname == DEFAULT_MEM_POOL:
                if not poolurl:
                    poolurl = 'mem:///' + poolname
            if poolname is not None:
                if poolname in Invalid_Pool_Names:
                    raise ValueError(
                        'Cannot register invalid pool name: ' + poolname)
                if cls.isLoaded(poolname):
                    return cls._GlobalPoolList[poolname]
            # get poolname and scheme
            if not poolurl:
                if poolname is None:
                    # Without a name there is no configuration key to look
                    # up; fail with the documented exception type rather
                    # than a TypeError from string concatenation.
                    raise ValueError(
                        'Either poolname or poolurl must be given.')
                poolurl = getConfig('poolurl:' + poolname)
            if poolurl:
                pp, schm, pl, pn, un, pw = parse_poolurl(poolurl)
            else:
                raise ValueError(
                    'A new pool %s cannot be created without a pool url. Maybe the pool needs to be registered?' % poolname)
            if poolname:
                if pn != poolname:
                    raise ValueError(
                        f'Poolname in poolurl {poolurl} is different from poolname {poolname}.')
            else:
                poolname = pn
            # now we have scheme, poolname, poolurl
            if poolname in Invalid_Pool_Names:
                raise ValueError(
                    'Cannot register invalid pool name: ' + poolname)
            if cls.isLoaded(poolname):
                return cls._GlobalPoolList[poolname]
            if schm == 'file':
                from . import localpool
                p = localpool.LocalPool(
                    poolname=poolname, poolurl=poolurl,
                    makenew=makenew, **kwds)
            elif schm == 'mem':
                from . import mempool
                p = mempool.MemPool(
                    poolname=poolname, poolurl=poolurl, **kwds)
            elif schm == 'server':
                from . import httppool
                p = httppool.HttpPool(
                    poolname=poolname, poolurl=poolurl, **kwds)
            elif schm in ('http', 'https', 'csdb'):
                if schm == 'csdb':
                    from . import publicclientpool
                    pooltype = publicclientpool.PublicClientPool
                    # Swap the leading 'csdb' for the configured scheme.
                    purl = pc['scheme'] + poolurl[4:]
                else:
                    from . import httpclientpool
                    pooltype = httpclientpool.HttpClientPool
                    purl = poolurl
                if auth is None:
                    from requests.auth import HTTPBasicAuth
                    auth = HTTPBasicAuth(pc['username'], pc['password'])
                if client is None:
                    from ..httppool.session import requests_retry_session
                    client = requests_retry_session()
                p = pooltype(poolname=poolname, poolurl=purl,
                             auth=auth, client=client, **kwds)
                res, msg = remoteRegister(p)
            else:
                raise NotImplementedError(schm + ':// is not supported')
        cls.save(poolname, p)
        logger.debug('made pool ' + lls(p, 900))
        return p

    @classmethod
    def getMap(cls):
        """
        Returns a poolname - poolobject map.
        """
        return cls._GlobalPoolList

    @classmethod
    def isLoaded(cls, poolname):
        """
        Whether an item with the given id has been loaded (cached).

        :returns: the number of weak references to the pool if it is loaded. Returns 0 if poolname is not found in _GlobalPoolList.
        """
        # A single .get() avoids the window between a membership test and a
        # subscript in which a weakly-held pool could be collected.
        loaded = cls._GlobalPoolList.get(poolname)
        if loaded is None:
            return 0
        return getweakrefcount(loaded)

    @classmethod
    def removeAll(cls, ignore_error=False):
        """ deletes all pools from the pool list, pools not wiped
        """
        # Snapshot the names: remove() mutates the dict.
        nl = list(cls._GlobalPoolList)
        for pool in nl:
            cls.remove(pool, ignore_error=ignore_error)

    @classmethod
    def save(cls, poolname, poolobj):
        """ Registers `poolobj` under `poolname` and tells the pool that
        this class is its manager.
        """
        cls._GlobalPoolList[poolname] = poolobj
        poolobj.setPoolManager(cls)

    @classmethod
    def remove(cls, poolname, ignore_error=False):
        """ Remove from list and unregister remote pools.

        :ignore_error: if true, a `KeyError` from removing an unknown name is logged instead of raised.
        :returns: ``0`` for successful removal, ``1`` for poolname not registered or referenced, still attempted to remove. ``> 1`` for the number of weakrefs the pool still has, and nothing was removed.
        """
        # number of weakrefs
        nwr = cls.isLoaded(poolname)
        if nwr == 1:
            # this is the only reference. unregister remote first.
            thepool = cls._GlobalPoolList[poolname]
            poolurl = thepool._poolurl
            from ..pal.httpclientpool import HttpClientPool
            if issubclass(thepool.__class__, HttpClientPool):
                code = remoteUnregister(poolurl)
            else:
                code = 0
        elif nwr > 1:
            # nothing needs to be done. weakref number will decrement
            # after Storage deletes ref
            return nwr
        else:
            # nwr <= 0
            code = 1
        try:
            pool = cls._GlobalPoolList.pop(poolname)
            pool.setPoolManager(None)
        except KeyError as e:
            if ignore_error:
                logger.info("Ignored: " + str(e))
            else:
                raise
        return code

    @classmethod
    def getPoolurlMap(cls):
        """
        Gives the default poolurls of PoolManager.
        """
        return cls.PlacePaths

    @classmethod
    def setPoolurlMap(cls, new):
        """
        Sets the default poolurls of PoolManager.
        """
        # Mutate in place so existing references to PlacePaths see the change.
        cls.PlacePaths.clear()
        cls.PlacePaths.update(new)

    @classmethod
    def size(cls):
        """
        Gives the number of entries in this manager.
        """
        return len(cls._GlobalPoolList)

    def items(self):
        """
        Returns map's items
        """
        return self._GlobalPoolList.items()

    def __setitem__(self, poolname, poolobj):
        """ sets value at key.
        """
        # Same bookkeeping as save(), so both routes register the manager
        # on the pool identically.
        self.save(poolname, poolobj)

    def __getitem__(self, *args, **kwargs):
        """ returns value at key.
        """
        return self._GlobalPoolList.__getitem__(*args, **kwargs)

    def __delitem__(self, poolname):
        """ removes value and its key.
        """
        self._GlobalPoolList[poolname].setPoolManager(None)
        self._GlobalPoolList.__delitem__(poolname)

    def __len__(self, *args, **kwargs):
        """ size of data
        """
        return self._GlobalPoolList.__len__(*args, **kwargs)

    def __iter__(self, *args, **kwargs):
        """ returns an iterator
        """
        return self._GlobalPoolList.__iter__(*args, **kwargs)

    def __repr__(self):
        return self.__class__.__name__ + '(' + str(self._GlobalPoolList) + ')'