# -*- coding: utf-8 -*-
from .serializable import serialize, ATTR, LEN_ATTR
from .odict import ODict
from .classes import Classes
from ..utils.common import lls, trbk, guess_value
import logging
import json
import codecs
import binascii
import array
import mmap
from collections import ChainMap
import builtins
import urllib
from collections import UserDict
from collections.abc import MutableMapping as MM, MutableSequence as MS, MutableSet as MSe
import sys
if sys.version_info[0] >= 3: # + 0.1 * sys.version_info[1] >= 3.3:
PY3 = True
strset = (str, bytes, bytearray)
else:
PY3 = False
strset = (str, unicode)
# create logger
logger = logging.getLogger(__name__)
# logger.debug('level %d' % (logger.getEffectiveLevel()))
''' Note: this has to be in a different file where other interface
classes are defined to avoid circular dependency (such as ,
Serializable.
'''
[文档]def constructSerializable(obj, lookup=None, debug=False):
""" mh: reconstruct object from the output of jason.loads().
Recursively goes into nested class instances that are not
encoded by default by JSONEncoder, instantiate and fill in
variables.
Objects to be deserialized must have their classes loaded.
_STID cannot have module names in it (e.g. dataset.Product)
or locals()[classname] or globals()[classname] will not work.
Parameters
----------
Returns
-------
"""
global indent
indent += 1
spaces = ' ' * indent
classname = obj.__class__.__name__
if debug:
print(spaces + '===OBJECT %s ===' % lls(obj, 150))
if not hasattr(obj, '__iter__') or issubclass(obj.__class__, strset):
if debug:
print(spaces + 'Find non-iter <%s>' % classname)
indent -= 1
return obj
# process list first
if isinstance(obj, list):
if debug:
print(spaces + 'Find list <%s>' % classname)
inst = []
# loop i to preserve order
for i in range(len(obj)):
x = obj[i]
xc = x.__class__
if debug:
print(spaces + 'looping through list %d <%s>' %
(i, xc.__name__))
if issubclass(xc, (list, dict)):
des = constructSerializable(x, lookup=lookup, debug=debug)
else:
des = x
inst.append(des)
if debug:
print(spaces + 'Done with list <%s>' % (classname))
indent -= 1
return inst
if not '_STID' in obj:
""" This object is supported by JSON encoder """
if debug:
print(spaces + 'Find non-_STID. <%s>' % classname)
inst = obj
else:
ostid = obj['_STID']
# check escape case
if ostid.startswith('0'):
# escape
inst = obj
# classname = ostid[1:]
# inst['_STID'] = classname
if debug:
print(spaces + 'Find _STID <%s>. Take as <%s>.' %
(ostid, classname))
indent -= 1
return inst
else:
classname = ostid
if debug:
print(spaces + 'Find _STID <%s>.' % (ostid))
# process types wrapped in a dict
if PY3:
if classname == 'bytes':
inst = codecs.decode(obj['code'], 'hex')
if debug:
print(spaces + 'Instanciate hex')
indent -= 1
return inst
elif classname.startswith('array.array'):
tcode = classname.rsplit('_', 1)[1]
inst = array.array(tcode, binascii.a2b_hex(obj['code']))
if debug:
print(spaces + 'Instanciate array.array')
indent -= 1
return inst
if classname in lookup:
# Now we have a blank instance.
inst = lookup[classname]()
if debug:
print(spaces + 'Instanciate custom obj <%s>' % classname)
elif classname == 'ellipsis':
if debug:
print(spaces + 'Instanciate Ellipsis')
indent -= 1
return Ellipsis
elif classname in lookup and 'obj' in obj:
o = constructSerializable(obj['obj'], lookup=lookup, debug=debug)
inst = lookup[classname](o)
if debug:
print(spaces + 'Instanciate defined %s' % obj['obj'])
indent -= 1
return inst
elif classname == 'dtype':
if debug:
print(spaces + 'Instanciate type %s' % obj['obj'])
inst = lookup[obj['obj']]
indent -= 1
return inst
else:
raise ValueError('Class %s is not known.' % classname)
if debug:
print(spaces + 'Go through properties of instance')
for (k, v) in obj.items():
""" loop through all key-value pairs. """
if k == '_STID':
continue
# deserialize v
# should be object_pairs_hook in the following if... line
if issubclass(v.__class__, (dict, list)):
if debug:
print(spaces + '[%s]value(dict/list) <%s>: %s' %
(k, v.__class__.__qualname__,
lls(list(iter(v)), 70)))
desv = constructSerializable(v, lookup=lookup, debug=debug)
else:
if debug:
print(spaces + '[%s]value(simple) <%s>: %s' %
(k, v.__class__.__name__, lls(v, 70)))
if 1:
desv = v
else:
if isinstance(v, str) or isinstance(v, bytes):
try:
desv = int(v)
except ValueError:
desv = v
# set k with desv
icn = inst.__class__.__name__
dcn = desv.__class__.__name__
if issubclass(inst.__class__, (MM)): # should be object_pairs_hook
if k.startswith(ATTR):
k2 = k[LEN_ATTR:]
setattr(inst, k2, desv)
if debug:
print(spaces + 'Set attrbute to dict/usrd <%s>.%s = %s <%s>' %
(icn, str(k2), lls(desv, 70), dcn))
else:
inst[k] = desv
if debug:
print(spaces + 'Set member to dict/usrd <%s>[%s] = %s <%s>' %
(icn, str(k), lls(desv, 70), dcn))
else:
if k.startswith(ATTR):
k2 = k[LEN_ATTR:]
setattr(inst, k2, desv)
if debug:
print(spaces + 'set attribute to non-dict <%s>.%s = %s <%s>' %
(icn, str(k2), lls(desv, 70), dcn))
else:
setattr(inst, k, desv)
if debug:
print(spaces + 'set attribute to non-dict <%s>.%s = %s <%s>' %
(icn, str(k), lls(desv, 70), dcn))
indent -= 1
return inst
[文档]class IntDecoder(json.JSONDecoder):
""" adapted from https://stackoverflow.com/questions/45068797/how-to-convert-string-int-json-into-real-int-with-json-loads
modified to also convert keys in dictionaries.
"""
[文档] def decode(self, s):
"""
Parameters
----------
Returns
-------
"""
# result = super(Decoder, self).decode(s) for Python 2.x
result = super(IntDecoder, self).decode(s)
return self._decode(result)
def _decode(self, o):
"""
Parameters
----------
Returns
-------
"""
if isinstance(o, str) or isinstance(o, bytes):
try:
return int(o)
except ValueError:
return o
elif isinstance(o, dict):
return dict({self._decode(k): self._decode(v) for k, v in o.items()})
elif isinstance(o, list):
return [self._decode(v) for v in o]
else:
return o
[文档]class IntDecoderOD(IntDecoder):
def _decode(self, o):
""" Uses ODict
Parameters
----------
Returns
-------
"""
if isinstance(o, str) or isinstance(o, bytes):
try:
return int(o)
except ValueError:
return o
elif isinstance(o, dict):
return ODict({self._decode(k): self._decode(v) for k, v in o.items()})
elif isinstance(o, list):
return [self._decode(v) for v in o]
else:
return o
[文档]def deserialize(js, lookup=None, debug=False, usedict=True):
""" Loads classes with _STID from the results of serialize.
if usedict is True dict insted of ODict will be used.
Parameters
----------
Returns
-------
"""
lookup = Class_Look_Up
if not isinstance(js, strset) or len(js) == 0:
return None
# debug = False # True if issubclass(obj.__class__, list) else False
try:
if usedict:
obj = json.loads(js) # , cls=IntDecoder)
else:
# , cls=IntDecoderOD)
obj = json.loads(js, object_pairs_hook=ODict)
except json.decoder.JSONDecodeError as e:
msg = '\nBad string to decode as JSON=====>\n%s\n<======\n%s' % (
lls(js, 500), str(e))
logging.error(msg)
obj = msg
if debug:
# print('load-str ' + str(o) + ' class ' + str(o.__class__))
print('-------- json loads returns: --------\n' + str(obj))
global indent
indent = -1
return constructSerializable(obj, lookup=lookup, debug=debug)
Class_Look_Up = ChainMap(Classes._classes, globals(), vars(builtins))
Serialize_Args_Sep = '__'
SAS_Avatar = '~'
[文档]def encode_str(a0):
""" quote to remove general url offenders then use a mostly harmless str to substitute Serialize_Args_Sep.
"""
return urllib.parse.quote(a0).replace(Serialize_Args_Sep, SAS_Avatar)
[文档]def serialize_args(*args, not_quoted=False, **kwds):
"""
Serialize all positional and keywords arguements as they would appear in a function call.
Arguements are assumed to have been placed in the same order of a valid function/method call. They are scanned from left to right from `args[i]` i = 0, 1,... to `kwds[j]` j = 0, 1, ...
* Scan args from i=0. if is of args[i] is of `bool`, `int`, `float` types, convert with `str`, if `str()`, convert with `encode_str()`, if `bytes` or `bytearray' types, with ```0x```+`hex()`, save to the convered-list, and move on to the next element.
* else if finding a segment not of any of the above types,
** put this and the rest of ```args``` as the ```value``` in ```{'apiargs':value}```,
** and append `kwds` key-val pairs after this pair,
** serialize the disctionary with `serialize()` and encode_str()
** append the result to the converted-list.
** break from the args scan loop.
* if args scan loop reaches its end, if `kwds` is not empty, serialize it with `serialize()` and encode_str(),
or scanning reaches the end of args.
* append the result to the converted-list.
* join the converted-list with `Serialize_Args_Sep`.
* return the result string
"""
noseriargs = []
i = 0
# print('AR ', args, ' KW ', kwds)
# from ..pal.query import AbstractQuery
# if len(args) and issubclass(args[0].__class__, AbstractQuery):
# __import__('pdb').set_trace()
for i, a0 in enumerate(args):
# a string or number or boolean
a0c = a0.__class__
if a0 is None or issubclass(a0c, (bool, int, float)):
noseriargs.append(str(a0))
elif issubclass(a0c, (str)):
noseriargs.append(a0 if not_quoted else encode_str(a0))
elif issubclass(a0c, (bytes, bytearray)):
noseriargs.append('0x'+a0.hex())
else:
seri = serialize(dict(apiargs=args[i:], **kwds))
noseriargs.append(seri if not_quoted else encode_str(seri))
break
else:
# loop ended w/ break
if kwds:
seri = serialize(kwds)
noseriargs.append(seri if not_quoted else encode_str(seri))
# print(noseriargs)
despaced = Serialize_Args_Sep.join(noseriargs)
return despaced
[文档]def decode_str(a0):
"""
"""
return urllib.parse.unquote(a0.replace(SAS_Avatar, Serialize_Args_Sep))
[文档]def deserialize_args(all_args, not_quoted=False, first_string=True, serialize_out=False):
""" parse the command path to get positional and keywords arguments.
1. if `not_quoted` is `True`, split everythine to the left of first `{` with `Serialize_Args_Sep` append the part startin from the `{`. `mark='{'`
2. else after splitting all_args with `Serialize_Args_Sep`: `mark='%7B%22'` (`quote('{')`)
Scan from left. if all_args[i] not start with `mark`
Conversion rules:
|all_args[i]| converted to |
| else | convert (case insensitive) and move on to the next segment |
| string not starting with ```'0x'``` | `quote` |
* else `decode_str()` if ```not_quoted==False``` else only substitute SAS_Avatar with Serialize_Args_Sep. Then `deserialize()` this segment to become ```{'apiargs':list, 'foo':bar ...}```, append value of ```apiargs``` to the converted-list above, remove the ```apiargs```-```val``` pair.
* return 200 as the reurn code followed by the converted-list and the deserialized ```dict```.
:all_args: a list of path segments for the args list.
:first_string: Do not try to change the type of the first arg (assumed to be the function/method name).
"""
args, kwds = [], {}
if not_quoted:
mark = '{'
ar = all_args.split(mark, 1)
qulist = ar[0].split(Serialize_Args_Sep)
if len(ar) > 1:
if len(qulist):
# the last ',' was for mark so should be removed,
qulist = qulist[:-1]
qulist.append(mark + ar[1])
else:
mark = '%7B%22'
qulist = all_args.split(Serialize_Args_Sep)
# print(qulist)
for a0 in qulist:
if not a0.startswith(mark):
# guess and change type
# a string, bytes or number or boolean
# if int(a0l.lstrip('+-').split('0x',1)[-1].isnumeric():
# this covers '-/+0x34'
if first_string:
# do not try to change type
arg = decode_str(a0)
first_string = False # do not come back here
else:
arg = guess_value(a0, last=decode_str)
args.append(arg)
# print(args)
else:
# quoted serialized dict
readable = a0.replace(
SAS_Avatar, Serialize_Args_Sep) if not_quoted else decode_str(a0)
dese = deserialize(readable)
if 'apiargs' in dese:
args += dese['apiargs']
del dese['apiargs']
kwds = dese
break
return 200, args, kwds