Source code for fdi.dataset.serializable

# -*- coding: utf-8 -*-

# from ..utils.common import fullname

import array
import binascii
import gzip
# from .odict import ODict
import logging
import json
import copy
import codecs
import urllib
from collections.abc import Collection, Mapping
from functools import lru_cache
import sys
import datetime
if sys.version_info[0] >= 3:  # + 0.1 * sys.version_info[1] >= 3.3:
    PY3 = True
    strset = (str, bytes)
else:
    PY3 = False
    strset = (str, unicode)

# create logger
logger = logging.getLogger(__name__)
# logger.debug('level %d' %  (logger.getEffectiveLevel()))


class SerializableEncoderAll(json.JSONEncoder):
    """ Can encode parameters, products, etc. so that they can be
    recovered with deserialize().

    Python 3 treats string and unicode as unicode, encoded with utf-8,
    and byte blocks as bytes, encoded with utf-8. Python 2 treats string
    as str and unicode as unicode, encoded with utf-8, and byte blocks
    as str, encoded with utf-8.
    """
    def default(self, obj):
        """
        Parameters
        ----------

        Returns
        -------
        """
        # logger.debug
        # print('&&&& %s %s' % (str(obj.__class__), str(obj)))
        if PY3:
            if issubclass(obj.__class__, bytes):
                return dict(code=codecs.encode(obj, 'hex'), _STID='bytes')
            elif issubclass(obj.__class__, array.array):
                return dict(code=str(binascii.b2a_hex(obj), encoding='ascii'),
                            _STID='array.array_' + obj.typecode)
        if not PY3 and issubclass(obj.__class__, str):
            return dict(code=codecs.encode(obj, 'hex'), _STID='bytes')
        if obj is Ellipsis:
            return {'obj': '...', '_STID': 'ellipsis'}
        # print(obj.__getstate__())
        if issubclass(obj.__class__, Serializable):
            return obj.__getstate__()
        print('%%%' + str(obj.__class__))
        return
        # Let the base class default method raise the TypeError
        d = json.JSONEncoder.default(self, obj)
        print('encoded d=' + d)
        return d
    # https://stackoverflow.com/a/63455796/13472124
    base = (str, int, float, bool, type(None))

    def _preprocess(self, obj):
        """ This only works on the first level of nested objects.

        Parameters
        ----------

        Returns
        -------
        """
        oc = obj.__class__
        ocn = type(obj).__name__
        # print('%%%*****prepro ' + ocn)
        # pdb.set_trace()
        # if issubclass(oc, self.base):
        #     # mainly to process string, which is a collection (below)
        #     return obj
        # elif 0 and issubclass(oc, (Serializable, bytes)):
        #     if issubclass(oc, dict):
        #         # if it has both __getstate__ and Mapping, insert _STID into a copy
        #         o = copy.copy(obj)
        #         o['_STID'] = obj._STID
        #         return o
        #     return obj
        # elif isinstance(obj, list):
        #     return obj
        # elif issubclass(oc, (Mapping)):
        #     # if all((issubclass(k.__class__, self.base) for k in obj)):
        #     if True:
        #         # JSONEncoder can handle the keys
        #         if isinstance(obj, dict):
        #             return obj
        #         else:
        #             return {'obj': dict(obj), '_STID': ocn}
        #     else:
        #         # This handles the top-level dict keys
        #         return {'obj': [(k, v) for k, v in obj.items()], '_STID': ocn}
        if issubclass(oc, (Collection)):
            return {'obj': list(obj), '_STID': ocn}
        # elif obj is Ellipsis:
        #     return {'obj': '...', '_STID': ocn}
        else:
            return obj
    def iterencode(self, obj, **kwds):
        """
        Parameters
        ----------

        Returns
        -------
        """
        return super().iterencode(self._preprocess(obj), **kwds)
GZIP = False
""" Use ```gzip``` (and ```Base64``` if needed) to compress. """

SCHEMA = False
""" Output a JSON schema instead of a JSON serialization. """
class SerializableEncoder(json.JSONEncoder):
    """ Can encode parameters, products, etc. so that they can be
    recovered with deserialize().

    Python 3 treats string and unicode as unicode, encoded with utf-8,
    and byte blocks as bytes, encoded with utf-8. Python 2 treats string
    as str and unicode as unicode, encoded with utf-8, and byte blocks
    as str, encoded with utf-8.
    """
    def default(self, obj):
        """
        Parameters
        ----------

        Returns
        -------
        """
        try:
            # print('%%%' + str(obj.__class__))
            # Let the base class default method raise the TypeError
            d = json.JSONEncoder.default(self, obj)
            # print('d=' + d)
        except TypeError as err:
            try:
                # logger.debug
                # print('&&&& %s %s' % (str(obj.__class__), str(obj)))
                oc = obj.__class__
                if PY3:
                    if issubclass(oc, (datetime.datetime)):
                        if SCHEMA:
                            return '{"$ref": "%s"}' % oc.__name__
                        from ..dataset.finetime import FineTime
                        return dict(
                            code=FineTime.datetimeToFineTime(obj),
                            _STID=oc.__name__ + ',tai')
                    if issubclass(oc, (bytes, bytearray)):
                        if SCHEMA:
                            return '{"$ref": "bytes"}'
                        if GZIP:
                            r = dict(code=binascii.b2a_base64(
                                gzip.compress(obj, 5)).decode('ascii'),
                                _STID=oc.__name__ + ',gz,b64')
                        else:
                            r = dict(code=obj.hex(), _STID=oc.__name__)
                        return r
                    elif issubclass(oc, array.array):
                        if SCHEMA:
                            return '{"$ref": "%s"}' % oc.__name__
                        if GZIP:
                            r = dict(code=binascii.b2a_base64(
                                gzip.compress(obj, 5)).decode('ascii'),
                                _STID='a.array_%s,gz,b64' % obj.typecode)
                        else:
                            r = dict(code=str(codecs.encode(obj, 'hex'),
                                              encoding='ascii'),
                                     _STID='a.array_' + obj.typecode)
                        return r
                if not PY3 and issubclass(oc, str):
                    # return dict(code=codec.encode(obj, 'hex'), _STID='bytes')
                    assert False, lls(obj, 50)
                    if GZIP:
                        if SCHEMA:
                            return '{"$ref": "%s"}' % 'bytes'
                        return dict(code=gzip.compress(obj, 5),
                                    _STID='bytes,gz')
                    else:
                        return obj
                if obj is Ellipsis:
                    if SCHEMA:
                        return '{"$ref": "%s"}' % oc.__name__
                    return {'obj': '...', '_STID': 'ellipsis'}
                if issubclass(oc, type):
                    if SCHEMA:
                        return '{"$ref": "%s"}' % oc.__name__
                    return {'obj': obj.__name__, '_STID': 'dtype'}
                if hasattr(obj, 'serializable'):
                    if SCHEMA:
                        return '{%s}' % obj.schema()
                    try:
                        typ = obj.type
                    except (LookupError, AttributeError):
                        typ = None
                    if typ in ['image/svg']:
                        ser = obj.serializable()[ATTR + 'data']
                        ser = urllib.parse.quote(ser)
                    # print(obj.serializable())
                    return obj.serializable()
                try:
                    return dict(obj)
                except Exception:
                    return list(obj)
            except Exception as e:
                print('Serialization failed. ' + str(e))
                raise
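
# Illustrative sketch (comment only, not part of the module): with GZIP and
# SCHEMA left at their defaults (False), SerializableEncoder wraps non-JSON
# types in a dict that carries a Serialization Type ID under '_STID', e.g.
#
#     json.dumps(b'\x00\xff', cls=SerializableEncoder)
#     # -> '{"code": "00ff", "_STID": "bytes"}'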
# obj = json.loads(jstring)
def serialize(o, cls=None, **kwds):
    """ Return JSON using the special encoder SerializableEncoder.

    Parameters
    ----------

    Returns
    -------
    """
    if not cls:
        cls = SerializableEncoder
    return json.dumps(o, cls=cls, allow_nan=True, **kwds)
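
# Usage sketch (illustrative only): serialize() accepts anything json.dumps
# can take, plus the extra types handled by SerializableEncoder above.
#
#     serialize({'name': 'x', 'payload': b'\x01\x02'})
#     # -> '{"name": "x", "payload": {"code": "0102", "_STID": "bytes"}}'
#
# The counterpart deserialize(), defined elsewhere in fdi, is expected to
# rebuild the original objects from the '_STID' tags.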
@lru_cache(maxsize=256)
def get_schema_with_classname(cls_name, store=None):
    if store is None:
        store = makeSchemeStore()
    for sch in store:
        if cls_name == 'array':
            n = 'a_array'
        else:
            n = cls_name
        if sch.endswith('/%s' % n):
            return sch, store[sch]
    # did not find.
    return None, None
ATTR = '_ATTR_'
LEN_ATTR = len(ATTR)
class Serializable():
    """ mh: Can be serialized. Has a _STID instance property to show
    its class information.
    """
    def __init__(self, *args, **kwds):
        """
        Parameters
        ----------

        Returns
        -------
        """
        super().__init__(*args, **kwds)
        sc = self.__class__
        # print('@@@ ' + sc.__name__, str(issubclass(sc, dict)))
        self._STID = sc.__name__
    def serialized(self, indent=None):
        """
        Parameters
        ----------

        Returns
        -------
        """
        return serialize(self, indent=indent)
    def __repr__(self):
        co = ', '.join(str(k) + '=' +
                       ('"' + v + '"' if issubclass(v.__class__, str)
                        else str(v))
                       for k, v in self.__getstate__().items())
        return self.__class__.__name__ + '(' + co + ')'

    def __getstate__(self):
        """ Returns an ordered dict that has all state info of this object.

        Subclasses should override this function.

        Parameters
        ----------

        Returns
        -------
        """
        raise NotImplementedError()

    def __setstate__(self, state):
        """
        Parameters
        ----------

        Returns
        -------
        """
        for name in state.keys():
            if name.startswith(ATTR):
                k2 = name[LEN_ATTR:]
                self.__setattr__(k2, state[name])
            elif name == '_STID':
                pass
            elif hasattr(self, '__setitem__'):
                self[name] = state[name]
            else:
                self.__setattr__(name, state[name])

    def __reduce_ex__(self, protocol):
        """
        Parameters
        ----------

        Returns
        -------
        """
        def func():
            return self.__class__()
        args = tuple()
        state = self.__getstate__()
        return func, args, state

    def __reduce__(self):
        """
        Parameters
        ----------

        Returns
        -------
        """
        return self.__reduce_ex__(4)
    def serializable(self):
        """ Can be encoded with SerializableEncoder.

        Returns
        -------
        dict
            The state variables plus the Serialization Type ID, with
            ```_STID``` as its key.
        """
        s = copy.copy(self.__getstate__())
        # make sure _STID is the last, for pools to ID data.
        if '_STID' in s:
            del s['_STID']
        s.update({'_STID': self._STID})
        return s
    def schema(self):
        """ Get the schema definition using the FDI standard schema set
        in `FDI_SCHEMA_STORE`. Override in subclasses to add more schemas.
        """
        sid, sch = get_schema_with_classname(self.__class__.__name__)
        return sch
    def yaml(self, *args, **kwds):
        """ Get a YAML representation. """
        from ..utils.ydump import ydump, yinit
        yinit()
        return ydump(self, *args, **kwds)
    def tree(self, *args, **kwds):
        """ Get a directory-tree-like representation. """
        from ..utils.tree import tree
        return '\n'.join(tree(self, *args, **kwds))
    def fits(self, *args, **kwds):
        """ Get a FITS representation. """
        from ..utils.tofits import toFits, FITS_INSTALLED
        if not FITS_INSTALLED:
            raise NotImplementedError(
                'Astropy not installed. Include SCI in extra-dependency '
                'when installing FDI.')
        return toFits(self, *args, **kwds)
    def html(self, extra=False, param_widths=-1, **kwds):
        """ Get an HTML representation. """
        return self.toString(level=0,
                             tablefmt='unsafehtml',
                             tablefmt1='unsafehtml',
                             tablefmt2='unsafehtml',
                             extra=extra,
                             param_widths=param_widths,
                             **kwds)
    def jsonPath(self, expr, val='simple', sep='/', indent=None,
                 *args, **kwds):
        """ Apply a JSONPath-style query ``expr`` to this object's data. """
        from ..utils.jsonpath import jsonPath
        return jsonPath(self.data, expr=expr, val=val, indent=indent,
                        *args, **kwds)
    def fetch(self, paths, exe=['is'], not_quoted=True):
        from ..utils.fetch import fetch
        return fetch(paths, self, re='', exe=exe, not_quoted=not_quoted)
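

# Minimal usage sketch (illustration only; the class below is hypothetical and
# not part of fdi): a subclass normally only overrides __getstate__(), after
# which serializable(), serialized() and __setstate__() work unchanged.
if __name__ == '__main__':
    from collections import OrderedDict

    class _Point(Serializable):
        """ Toy Serializable subclass used only for this demonstration. """

        def __init__(self, x=0, y=0, **kwds):
            super().__init__(**kwds)
            self.x, self.y = x, y

        def __getstate__(self):
            # expose the state; serializable() keeps _STID as the last key
            return OrderedDict(x=self.x, y=self.y, _STID=self._STID)

    print(_Point(1, 2).serialized())
    # expected: {"x": 1, "y": 2, "_STID": "_Point"}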