# -*- coding: utf-8 -*-
from .serializable import serialize, ATTR, LEN_ATTR
from .classes import Classes
from ..utils.common import lls
from ..dataset.metadata import guess_value
from ..dataset.finetime import FineTime
import logging
import json
import gzip
import binascii
import array
import urllib
from .odict import ODict
from collections.abc import MutableMapping as MM
import sys
if sys.version_info[0] >= 3: # + 0.1 * sys.version_info[1] >= 3.3:
PY3 = True
strset = (str, bytes, bytearray)
else:
PY3 = False
strset = (str, unicode)
# create logger
logger = logging.getLogger(__name__)
# logger.debug('level %d' % (logger.getEffectiveLevel()))
''' Note: this has to be in a different file where other interface
classes are defined to avoid circular dependency (such as ,
Serializable.
'''
[docs]def constructSerializable(obj, lookup=None, int_key=False, debug=False):
""" mh: reconstruct object from the output of jason.loads().
Recursively goes into nested class instances that are not
encoded by default by JSONEncoder, instantiate and fill in
variables.
Objects to be deserialized must have their classes loaded.
_STID cannot have module names in it (e.g. dataset.Product)
or locals()[classname] or globals()[classname] will not work.
Parameters
----------
:useint: is a key in key-value pair has only digits characters the key will be substituted by `int(key)`.
Returns
-------
"""
global indent
indent += 1
spaces = ' ' * indent
classname = obj.__class__.__name__
if debug:
print(spaces + '===OBJECT %s ===' % lls(obj, 150))
if not hasattr(obj, '__iter__') or issubclass(obj.__class__, strset):
if debug:
print(spaces + 'Find non-iter <%s>' % classname)
indent -= 1
return obj
# process list first
if isinstance(obj, list):
if debug:
print(spaces + 'Find list <%s>' % classname)
inst = []
# loop i to preserve order
for i in range(len(obj)):
x = obj[i]
xc = x.__class__
if debug:
print(spaces + 'looping through list %d <%s>' %
(i, xc.__name__))
if issubclass(xc, (list, dict)):
des = constructSerializable(
x, lookup=lookup, int_key=int_key, debug=debug)
else:
des = x
inst.append(des)
if debug:
print(spaces + 'Done with list <%s>' % (classname))
indent -= 1
return inst
if not '_STID' in obj:
""" This object is supported by JSON encoder """
if debug:
print(spaces + 'Find non-_STID. <%s>' % classname)
inst = obj
else:
ostid = obj['_STID']
# check escape case
if ostid.startswith('0'):
# escape
inst = obj
# classname = ostid[1:]
# inst['_STID'] = classname
if debug:
print(spaces + 'Find _STID <%s>. Take as <%s>.' %
(ostid, classname))
indent -= 1
return inst
else:
classname = ostid
if debug:
print(spaces + 'Find _STID <%s>.' % (ostid))
# process types wrapped in a dict
if PY3:
if classname == 'datetime,tai':
inst = FineTime(obj['code']).toDatetime()
if debug:
print(spaces + 'Instanciate datetime')
indent -= 1
return inst
if classname == 'bytes':
inst = bytes.fromhex(obj['code'])
# inst = codecs.decode(obj['code'], 'hex')
if debug:
print(spaces + 'Instanciate bytes')
indent -= 1
return inst
elif classname == 'bytes,gz' or classname == 'bytes,gz,b64':
inst = gzip.decompress(binascii.a2b_base64(obj['code']))
if debug:
print(spaces + 'Instanciate hex_gz')
indent -= 1
return inst
elif classname == 'bytearray':
inst = bytearray.fromhex(obj['code'])
if debug:
print(spaces + 'Instanciate bytearray')
indent -= 1
return inst
elif classname == 'bytearray,gz' or classname == 'bytearray,gz,b64':
inst = bytearray(gzip.decompress(
binascii.a2b_base64(obj['code'])))
if debug:
print(spaces + 'Instanciate bytearray_gz')
indent -= 1
return inst
elif classname.startswith('a.array'):
# format is "...a.array_I,..."
array_t, tcode = tuple(classname.split(',', 1)[0].split('_'))
if array_t == 'a.array':
inst = array.array(tcode, binascii.a2b_hex(obj['code']))
if debug:
print(spaces + 'Instanciate array.array')
indent -= 1
return inst
elif array_t == 'a.array,gz,b64':
# gzip-base64
inst = array.array(tcode, gzip.decompress(
binascii.a2b_base64(obj['code'])))
if debug:
print(spaces + 'Instanciate array.array gzip base64')
indent -= 1
return inst
if classname in lookup:
# Now we have a blank instance.
inst = lookup[classname]()
if debug:
print(spaces + 'Instanciate custom obj <%s>' % classname)
elif classname == 'ellipsis':
if debug:
print(spaces + 'Instanciate Ellipsis')
indent -= 1
return Ellipsis
elif classname in lookup and 'obj' in obj:
o = constructSerializable(
obj['obj'], lookup=lookup, int_key=int_key, debug=debug)
inst = lookup[classname](o)
obj = inst
try:
typ = obj.type
except (LookupError, AttributeError) as e:
typ = None
if typ in ['image/svg']:
ser = obj[ATTR+'data']
ser = urllib.parse.unquote(ser)
if debug:
print(spaces + 'Instanciate defined %s' % obj['obj'])
indent -= 1
return inst
elif classname == 'dtype':
if debug:
print(spaces + 'Instanciate type %s' % obj['obj'])
inst = lookup[obj['obj']]
if inst is None:
__import__('pdb').set_trace()
indent -= 1
return inst
else:
raise ValueError('Class %s is not known.' % classname)
if debug:
print(spaces + 'Go through properties of instance')
# we might change key during iteration so save the original keys
for k in list(obj.keys()):
""" loop through all key-value pairs. """
v = obj[k]
if k == '_STID':
continue
# deserialize v
# should be object_pairs_hook in the following if... line
if issubclass(v.__class__, (dict, list)):
if debug:
print(spaces + '[%s]value(dict/list) <%s>: %s' %
(k, v.__class__.__qualname__,
lls(list(iter(v)), 70)))
desv = constructSerializable(
v, lookup=lookup, int_key=int_key, debug=debug)
else:
if debug:
print(spaces + '[%s]value(simple) <%s>: %s' %
(str(k), v.__class__.__name__, lls(v, 70)))
if 1:
desv = v
else:
if isinstance(v, str) or isinstance(v, bytes):
try:
desv = int(v)
except ValueError:
desv = v
# set k with desv
icn = inst.__class__.__name__
dcn = desv.__class__.__name__
if issubclass(inst.__class__, (MM)): # should be object_pairs_hook
# set attributes. JSON doesn't support attributes to all objects so we have to use the `ATTR` work-around. Because `inst` could actually could have a key starting with `ATTR`, we set the attributes only when inst is not `dict` or when '_STID' is a member.
if issubclass(k.__class__, str) and k.startswith(ATTR) and (not isinstance(inst, dict) or '_STID' in inst):
k2 = k[LEN_ATTR:]
setattr(inst, k2, desv)
if debug:
print(spaces + 'Set attrbute to dict/usrd <%s>.%s = %s <%s>' %
(icn, str(k2), lls(desv, 70), dcn))
else:
# convert key to int if required
if int_key and k.isdigit():
# obj == inst so delete old entry if to change key
inst.pop(k)
inst[int(k)] = desv
else:
inst[k] = desv
if debug:
print(spaces + 'Set member to dict/usrd <%s>[%s] = %s <%s>' %
(icn, str(k), lls(desv, 70), dcn))
else:
if k.startswith(ATTR):
k2 = k[LEN_ATTR:]
setattr(inst, k2, desv)
if debug:
print(spaces + 'set attribute to non-dict <%s>.%s = %s <%s>' %
(icn, str(k2), lls(desv, 70), dcn))
else:
setattr(inst, k, desv)
if debug:
print(spaces + 'set attribute to non-dict <%s>.%s = %s <%s>' %
(icn, str(k), lls(desv, 70), dcn))
indent -= 1
return inst
[docs]class IntDecoder(json.JSONDecoder):
""" adapted from https://stackoverflow.com/questions/45068797/how-to-convert-string-int-json-into-real-int-with-json-loads
modified to also convert keys in dictionaries.
"""
[docs] def decode(self, s):
"""
Parameters
----------
Returns
-------
"""
# result = super(Decoder, self).decode(s) for Python 2.x
result = super(IntDecoder, self).decode(s)
return self._decode(result)
def _decode(self, o):
"""
Parameters
----------
Returns
-------
"""
if isinstance(o, str) or isinstance(o, bytes):
try:
return int(o)
except ValueError:
return o
elif isinstance(o, dict):
return dict({self._decode(k): self._decode(v) for k, v in o.items()})
elif isinstance(o, list):
return [self._decode(v) for v in o]
else:
return o
[docs]class IntDecoderOD(IntDecoder):
def _decode(self, o):
""" Uses ODict
Parameters
----------
Returns
-------
"""
if isinstance(o, str) or isinstance(o, bytes):
try:
return int(o)
except ValueError:
return o
elif isinstance(o, dict):
return ODict({self._decode(k): self._decode(v) for k, v in o.items()})
elif isinstance(o, list):
return [self._decode(v) for v in o]
else:
return o
Class_Look_Up = Classes.mapping
[docs]def deserialize(js, lookup=None, debug=False, usedict=True, int_key=False, verbose=False):
""" Create (Load) live object from string, bytes that are generated by serialization.
Parameters
----------
js : str, bytes.
JSON string or bytes, made with `json.load`.
lookup : mapping
A `globals` or `locals` -like mapping that gives class object if the class name is given.
debug : bool
A if set `True`, print out step-by-step report of how a JSON document is deserialized into an object. Default `False`.
usedict: bool
If is `True` `dict` insted of `ODict` will be used during deserialization. Default `True`.
init_key: bool
If set, dictionary keys that are integers in string form, e.g. '1', are casted to integers. Default `False`.
verbose: bool
Give even more info.
Returns
-------
object
Deserialized object.
"""
if lookup is None:
lookup = Class_Look_Up
if not isinstance(js, strset) or len(js) == 0:
return None
# debug = False # True if issubclass(obj.__class__, list) else False
try:
if usedict:
obj = json.loads(js) # , cls=IntDecoder)
else:
# , cls=IntDecoderOD)
obj = json.loads(js, object_pairs_hook=ODict)
except json.decoder.JSONDecodeError as e:
msg = 'Bad JSON====>\n%s\n<====\n%s' % (
lls(js, 500), str(e))
logging.error(msg)
obj = msg
if debug:
# print('load-str ' + str(o) + ' class ' + str(o.__class__))
print('-------- json loads returns: --------\n' + str(obj))
global indent
indent = -1
return constructSerializable(obj, lookup=lookup, int_key=int_key, debug=debug)
Serialize_Args_Sep = '__'
SAS_Avatar = '~'
[docs]def encode_str(a0):
""" quote to remove general url offenders then use a mostly harmless str to substitute Serialize_Args_Sep.
"""
return urllib.parse.quote(a0).replace(Serialize_Args_Sep, SAS_Avatar)
[docs]def serialize_args(*args, not_quoted=False, **kwds):
"""
Serialize all positional and keywords arguements as they would appear in a function call.
Arguements are assumed to have been placed in the same order of a valid function/method call. They are scanned from left to right from `args[i]` i = 0, 1,... to `kwds[j]` j = 0, 1, ...
* Scan args from i=0. if is of args[i] is of `bool`, `int`, `float` types, convert with `str`, if `str()`, convert with `encode_str()`, if `bytes` or `bytearray' types, with ```0x```+`hex()`, save to the convered-list, and move on to the next element.
* else if finding a segment not of any of the above types,
** put this and the rest of ```args``` as the ```value``` in ```{'apiargs':value}```,
** and append `kwds` key-val pairs after this pair,
** serialize the disctionary with `serialize()` and encode_str()
** append the result to the converted-list.
** break from the args scan loop.
* if args scan loop reaches its end, if `kwds` is not empty, serialize it with `serialize()` and encode_str(),
or scanning reaches the end of args.
* append the result to the converted-list.
* join the converted-list with `Serialize_Args_Sep`.
* return the result string
"""
noseriargs = []
i = 0
# print('AR ', args, ' KW ', kwds)
# from ..pal.query import AbstractQuery
# if len(args) and issubclass(args[0].__class__, AbstractQuery):
# __import__('pdb').set_trace()
for i, a0 in enumerate(args):
# a string or number or boolean
a0c = a0.__class__
if a0 is None or issubclass(a0c, (bool, int, float)):
noseriargs.append(str(a0))
elif issubclass(a0c, (str)):
noseriargs.append(a0 if not_quoted else encode_str(a0))
elif issubclass(a0c, (bytes, bytearray)):
noseriargs.append('0x'+a0.hex())
else:
seri = serialize(dict(apiargs=args[i:], **kwds))
noseriargs.append(seri if not_quoted else encode_str(seri))
break
else:
# loop ended w/ break
if kwds:
seri = serialize(kwds)
noseriargs.append(seri if not_quoted else encode_str(seri))
# print(noseriargs)
despaced = Serialize_Args_Sep.join(noseriargs)
return despaced
[docs]def decode_str(a0):
"""
"""
return urllib.parse.unquote(a0.replace(SAS_Avatar, Serialize_Args_Sep))
[docs]def deserialize_args(all_args, not_quoted=False, first_string=True, serialize_out=False):
""" parse the command path to get positional and keywords arguments.
1. if `not_quoted` is `True`, split everythine to the left of first `{` with `Serialize_Args_Sep` append the part startin from the `{`. `mark='{'`
2. else after splitting all_args with `Serialize_Args_Sep`: `mark='%7B%22'` (`quote('{')`)
Scan from left. if all_args[i] not start with `mark`
Conversion rules:
|all_args[i]| converted to |
| else | convert (case insensitive) and move on to the next segment |
| string not starting with ```'0x'``` | `quote` |
* else `decode_str()` if ```not_quoted==False``` else only substitute SAS_Avatar with Serialize_Args_Sep. Then `deserialize()` this segment to become ```{'apiargs':list, 'foo':bar ...}```, append value of ```apiargs``` to the converted-list above, remove the ```apiargs```-```val``` pair.
* return 200 as the reurn code followed by the converted-list and the deserialized ```dict```.
:all_args: a list of path segments for the args list.
:first_string: Do not try to change the type of the first arg (assumed to be the function/method name).
"""
args, kwds = [], {}
if not_quoted:
mark = '{'
ar = all_args.split(mark, 1)
qulist = ar[0].split(Serialize_Args_Sep)
if len(ar) > 1:
if len(qulist):
# the last ',' was for mark so should be removed,
qulist = qulist[:-1]
qulist.append(mark + ar[1])
else:
mark = '%7B%22'
qulist = all_args.split(Serialize_Args_Sep)
# print(qulist)
for a0 in qulist:
if not a0.startswith(mark):
# guess and change type
# a string, bytes or number or boolean
# if int(a0l.lstrip('+-').split('0x',1)[-1].isnumeric():
# this covers '-/+0x34'
if first_string:
# do not try to change type
arg = decode_str(a0)
first_string = False # do not come back here
else:
arg = guess_value(a0, last=decode_str)
args.append(arg)
# print(args)
else:
# quoted serialized dict
readable = a0.replace(
SAS_Avatar, Serialize_Args_Sep) if not_quoted else decode_str(a0)
dese = deserialize(readable)
if 'apiargs' in dese:
args += dese['apiargs']
del dese['apiargs']
kwds = dese
break
logger.debug('args %s KWDS %s' % (str(args), str(kwds)))
return 200, args, kwds