# -*- coding: utf-8 -*-
from .productpool import ManagedPool, PoolNotFoundError, MetaData_Json_Start, MetaData_Json_End
from ..utils.common import pathjoin, trbk, find_all_files
from .dicthk import HKDBS
from .urn import makeUrn, Urn, parseUrn
from ..dataset.deserialize import deserialize
import tarfile
import filelock
from functools import lru_cache
import sys
import shutil
import mmap
import io
import os
from os import path as op
import logging
# create logger
logger = logging.getLogger(__name__)
# logger.debug('level %d' % (logger.getEffectiveLevel()))
if sys.version_info[0] >= 3: # + 0.1 * sys.version_info[1] >= 3.3:
PY3 = True
strset = str
from urllib.parse import urlparse, quote, unquote
else:
PY3 = False
strset = (str, unicode)
    from urlparse import urlparse
    from urllib import quote, unquote
def wipeLocal(path):
    """
    does the scheme-specific remove-all.
    A new, empty directory will be created at `path`.
    """
# logger.debug()
if path == '/':
        raise ValueError('Do not remove root directory.')
try:
if op.exists(path):
shutil.rmtree(path)
os.makedirs(path)
except Exception as e:
msg = 'remove-mkdir failed. exc: %s trbk: %s.' % (str(e), trbk(e))
logger.error(msg)
raise e
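# A minimal usage sketch of wipeLocal (illustrative only; '/tmp/demo_pool' is
# a made-up path). The call removes whatever is at the path and leaves a
# fresh, empty directory behind:
#
#     wipeLocal('/tmp/demo_pool')
#     assert op.isdir('/tmp/demo_pool') and not os.listdir('/tmp/demo_pool')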
class LocalPool(ManagedPool):
    """ The pool saves all products on the local computer.

    :makenew: when the pool does not exist, make a new one (```True```; default) or raise `PoolNotFoundError` (```False```).
    """

    def __init__(self, makenew=True, **kwds):
        """ Creates the file structure if there isn't one; if there is, reads and populates the house-keeping records. Creates persistent files if they do not exist.
"""
# print(__name__ + str(kwds))
        self._makenew = makenew  # must precede setup() in super
super().__init__(**kwds)
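    # Construction sketch (illustrative; the keyword names are consumed by
    # ManagedPool, and 'demo' / 'file:///tmp/data/demo' are made-up values):
    #
    #     pool = LocalPool(poolname='demo', poolurl='file:///tmp/data/demo')
    #
    # With makenew=False the same call raises PoolNotFoundError when the pool
    # directory does not already exist.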
    def setup(self):
        """ Sets up LocalPool internals.
        Make sure that self._poolname and self._poolurl are present.
"""
if super().setup():
return True
real_poolpath = self.transformpath(self._poolname)
if not op.exists(real_poolpath):
if self._makenew:
os.makedirs(real_poolpath)
else:
raise PoolNotFoundError('poolname: %r poolurl: %r real_poolpath: %r' % (
self._poolname, self._poolurl, real_poolpath))
self._files = {}
self._atimes = {}
self._cached_files = {}
c, t, u, dTypes, dTags = tuple(self.readHK().values())
logger.debug('created ' + self.__class__.__name__ + ' ' + self._poolname +
' at ' + real_poolpath + ' HK read.')
self._classes.update(c)
self._tags.update(t)
self._urns.update(u)
# new ####
if len(self._dTypes) or len(self._dTags):
raise ValueError('self._dTypes or self._dTags not empty %s %s' % (
str(self._dTypes), str(self._dTags)))
self._dTypes.update(dTypes)
self._dTags.update(dTags)
# /new ###
if any(not op.exists(op.join(real_poolpath, hk+'.jsn'))
for hk in HKDBS):
self.writeHK(real_poolpath)
return False
    def readmmap(self, filename, start=None, end=None, close=False, check_time=False):
        """ Reads the file at `filename` via mmap and returns its contents as a string.

        start, end: byte offsets of the slice to read; the whole file is read when `start` is None.
        check_time: if True and the file has not changed since the last read/write, return None.
        """
fp = op.abspath(filename)
if check_time:
sr = os.stat(fp)
if fp in self._atimes and (sr.st_mtime_ns <= self._atimes[fp]):
                # the file has not changed since the last time we read/wrote it.
return None
try:
if 1: # if fp not in self._files or self._files[fp] is None:
file_obj = open(fp, mode="r+", encoding="utf-8")
mmap_obj = mmap.mmap(
file_obj.fileno(), length=0, access=mmap.ACCESS_READ)
fo = mmap_obj
else:
fo = self._files[fp]
if start is None:
js = fo.read()
else:
fo.seek(start)
js = fo.read(end - start)
except Exception as e:
msg = 'Error in HK reading. file: %s. exc: %s trbk: %s.' % (
fp, str(e), trbk(e))
            logger.error(msg)
raise NameError(msg)
if 1: # close:
fo.close()
if fp in self._files:
del self._files[fp]
else:
self._files[fp] = fo
if check_time:
# save the mtime as the self atime
self._atimes[fp] = sr.st_mtime_ns
return js.decode('ascii')
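    # readmmap sketch (illustrative; the path is made up): a plain call
    # returns the whole file as a string, byte offsets return only that
    # slice, and with check_time=True an unchanged file yields None so the
    # cached copy can be reused.
    #
    #     js = pool.readmmap('/tmp/data/demo/urns.jsn', check_time=True)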
    def readHK(self, hktype=None, serialize_out=False, all_versions=True):
        """
        Loads and returns the housekeeping data, or an empty `dict` if not found.
        hktype: one of the mappings listed in `dicthk.HKDBS`.
        serialize_out: if True, return the serialized form. Default is False.
        """
if hktype is None:
# some new ####
hks = HKDBS
else:
hks = [hktype]
fp0 = self.transformpath(self._poolname)
hk = {}
for hkdata in hks:
fp = op.abspath(pathjoin(fp0, hkdata + '.jsn'))
if op.exists(fp) and (all_versions or ('dT' in fp)):
js = self.readmmap(fp, check_time=True)
if js:
if serialize_out:
r = js
else:
from ..dataset.deserialize import deserialize
r = deserialize(js, int_key=True)
self._cached_files[fp] = js
else:
                    # the file has not changed since the last time we read/wrote it.
r = self._cached_files[fp] if serialize_out else \
self.__getattribute__('_' + hkdata)
else:
if serialize_out:
r = '{}' # '{"_STID":"ODict"}'
else:
from ..dataset.odict import ODict
r = {} # ODict()
hk[hkdata] = r
logger.debug('HK read from ' + fp0)
if serialize_out:
return '{%s}' % ', '.join(('"%s": %s' % (k, v) for k, v in hk.items())) if hktype is None else hk[hktype]
else:
return hk if hktype is None else hk[hktype]
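    # Housekeeping read sketch (illustrative): with no arguments readHK()
    # returns a dict keyed by the names in dicthk.HKDBS; passing one of those
    # names returns just that mapping.
    #
    #     hk = pool.readHK()          # e.g. {'classes': ..., 'tags': ..., 'urns': ..., ...}
    #     urns = pool.readHK('urns')  # only the URN mapping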
    def writeJsonmmap(self, fp, data, serialize_in=True, serialize_out=False, close=False, check_time=False, meta_location=False, **kwds):
        """ Writes `data` as JSON to the mmap file at `fp` and registers the file.
        The file is left open unless `close` is set.
        data: to be serialized and saved.
        serialize_out: if True, returns contents in serialized form.
        :check_time: check that the file has not been written to since we last read/wrote it. Default `False`.
        :meta_location: return the start and end offsets of the metadata in the data JSON. Default `False`.
        :return:
            the number of bytes written. If `meta_location` is ```True```, also the int start and end offsets of the metadata in the serialized data.
        """
from ..dataset.serializable import serialize
js = serialize(data, **kwds) if serialize_in else data
# start = end = None
if meta_location:
            # locate the metadata markers in the serialized JSON;
            # str.find(sub, start) already returns an offset from the
            # beginning of the string, so no further adjustment is needed.
            start = js.find(MetaData_Json_Start, 0)
            end = js.find(MetaData_Json_End, start)
            start += len(MetaData_Json_Start)
            end += len(MetaData_Json_End)
fp = op.abspath(fp)
if 1: # fp not in self._files or self._files[fp] is None:
file_obj = open(fp, mode="w+", encoding="utf-8")
# with mmap.mmap(file_obj.fileno(), length=0, access=mmap.ACCESS_WRITE) as mmap_obj:
else:
file_obj = self._files[fp]
file_obj.seek(0)
# file_obj.resize(len(js))
file_obj.truncate(0)
file_obj.write(js)
# file_obj.flush()
close = 1
if close:
file_obj.close()
if fp in self._files:
del self._files[fp]
else:
self._files[fp] = file_obj
if check_time:
# save the mtime as the self atime
sr = os.stat(fp)
os.utime(fp, ns=(sr.st_atime_ns, sr.st_mtime_ns))
self._atimes[fp] = sr.st_mtime_ns
self._cached_files[fp] = js
l = len(js)
logger.debug('JSON saved to: %s %d bytes' % (fp, l))
if meta_location:
return l, start, end
else:
return l
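    # Write-and-locate sketch (illustrative; '/tmp/data/demo/p_0' is a
    # made-up path): with meta_location=True the byte count comes back
    # together with the offsets of the metadata inside the serialized JSON.
    #
    #     nbytes, start, end = pool.writeJsonmmap(
    #         '/tmp/data/demo/p_0', product, serialize_in=True,
    #         meta_location=True)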
    def writeHK(self, fp0=None, all_versions=True):
        """ Saves the housekeeping data to disk.
        """
if fp0 is None:
fp0 = self._poolpath + '/' + self._poolname
l = 0
for hkdata in HKDBS:
            if not all_versions and 'dT' not in hkdata:
continue
fp = pathjoin(fp0, hkdata + '.jsn')
l += self.writeJsonmmap(fp, self.__getattribute__('_' + hkdata),
check_time=True)
logger.debug('=== '+str(self._dTypes))
return l
# self._urns[u]['meta'] = mt
    def getCacheInfo(self):
info = super().getCacheInfo()
for i in ['getMetaByUrnJson', 'transformpath']:
info[i] = getattr(self, i).cache_info()
return info
    def doSave(self, resourcetype, index, data, tags=None, serialize_in=True, **kwds):
"""
does the media-specific saving.
index: int
"""
fp0 = self.transformpath(self._poolname)
fp = pathjoin(fp0, quote(resourcetype) + '_' + str(index))
try:
# t0 = time.time()
l, start, end = self.writeJsonmmap(
fp, data, serialize_in=serialize_in, close=True,
meta_location=True, **kwds)
urn = makeUrn(self._poolname, resourcetype, index)
self.setMetaByUrn(start, end, urn)
l += self.writeHK(fp0)
# print('tl %.8f %9d' % (time.time()-t0, l))
logger.debug('HK written')
except IOError as e:
logger.error('Save failed. exc: %s trbk: %s.' % (str(e), trbk(e)))
raise e # needed for undoing HK changes
return l, start, end
    def doLoad(self, resourcetype, index, start=None, end=None, serialize_out=False):
"""
does the action of loading.
serialize_out: if True returns contents in serialized form.
"""
indexstr = str(index)
pp = self.transformpath(self._poolname) + '/' + \
resourcetype + '_' + indexstr
js = self.readmmap(pp, start=start, end=end, close=True)
if serialize_out:
r = js
else:
from ..dataset.deserialize import deserialize
r = deserialize(js)
return r
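    # Loading sketch (illustrative; the resourcetype string is a made-up
    # example of a product class name): doLoad returns the deserialized
    # product stored as <resourcetype>_<index> under the pool directory, or
    # the raw JSON string when serialize_out=True.
    #
    #     prod = pool.doLoad('fdi.dataset.product.Product', 0)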
    def doRemove(self, resourcetype, index, **kwds):
        """
        does the action of removing products, given as parallel sequences of resource types and indices, from the pool.
"""
fp0 = self.transformpath(self._poolname)
fp1 = [op.abspath(pathjoin(fp0, quote(r) + f'_{i}'))
for r, i in zip(resourcetype, index)]
res = []
for fp in fp1:
try:
if fp in self._files:
if self._files[fp]:
self._files[fp].flush()
self._files[fp].close()
del self._files[fp]
os.unlink(fp)
self.writeHK(fp0)
res.append(0)
except RuntimeError as e:
msg = f'Remove failed. exc: {e} trbk: {trbk(e)}'
logger.debug(msg)
if self.ignore_error_when_delete:
res.append(None)
continue
else:
raise
return res
    def doWipe(self):
"""
does the action of remove-all
"""
for n, f in self._files.items():
if f:
f.flush()
f.close()
self._files.clear()
self._atimes.clear()
self._cached_files.clear()
# will leave a newly made pool dir
wipeLocal(self.transformpath(self._poolname))
return 0
    def getHead(self, ref):
""" Returns the latest version of a given product, belonging
to the first pool where the same track id is found.
"""
        raise NotImplementedError()
    def backup(self):
        """ Makes a gzipped tar image of the pool directory and returns it as a byte string. """
fp0 = self.transformpath(self._poolname)
logger.info('Making a gz tar file of %s for pool %s.' %
(fp0, self._poolname))
with filelock.FileLock(self.lockpath('r')):
# Save unsaved changes
self.writeHK(fp0)
with io.BytesIO() as iob:
with tarfile.open(None, 'w|gz', iob) as tf:
                    tf.add(fp0, arcname='.')
file_image = iob.getvalue()
return file_image
    def restore(self, tar):
        """ Untars the input byte string into this pool and returns the list of restored files. """
with filelock.FileLock(self.lockpath('w')):
fp0 = self.transformpath(self._poolname)
self.doWipe()
with io.BytesIO(tar) as iob:
with tarfile.open(None, 'r|gz', iob) as tf:
tf.extractall(fp0)
allf = find_all_files(fp0)
# read into memory
self._classes, self._tags, self._urns, self._dTypes, self._dTags = tuple(
self.readHK().values())
logger.info('Restored from a gz tar file to %s for pool %s. %d files.' %
(fp0, self._poolname, len(allf)))
return allf
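    # Backup / restore round trip, sketched (illustrative):
    #
    #     image = pool.backup()        # bytes of a gzipped tar of the pool dir
    #     files = pool.restore(image)  # wipes the pool and re-extracts it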