fdi.utils.common 源代码

# -*- coding: utf-8 -*-

from .masked import masked
from .ydump import ydump
from .. import dataset

import hashlib
import array
import traceback
import pprint
import copy
import pwd
import logging
from functools import lru_cache
from itertools import zip_longest, accumulate
from collections.abc import Sequence, Mapping
import sys
if sys.version_info[0] >= 3:  # + 0.1 * sys.version_info[1] >= 3.3:
    PY3 = True
else:
    PY3 = False


# create logger
logger = logging.getLogger(__name__)
# logger.debug('level %d' %  (logger.getEffectiveLevel()))


[文档]def str2md5(string): return hashlib.md5(string.encode('utf8')).hexdigest()
[文档]def trbk(e): """ trace back """ ls = [x for x in traceback.extract_tb(e.__traceback__).format()] if hasattr( e, '__traceback__') else [''] return ' '.join(ls) + ' ' + \ (e.child_traceback if hasattr(e, 'child_traceback') else '')
[文档]def trbk2(e): tb = traceback.TracebackException.from_exception(e) return ''.join(tb.stack.format())
[文档]def bstr(x, length=0, tostr=True, quote="'", level=0, tablefmt='rst', tablefmt1='simple', tablefmt2='rst', width=0, heavy=True, yaml=False, **kwds): """ returns the best string representation. if the object is a string, return single-quoted; if has toString(), use it; else returns str(). Length limited by lls(lls) """ s = issubclass(x.__class__, str) if PY3 else issubclass( x.__class__, (str, unicode)) if s: r = quote + x + quote elif tostr and hasattr(x, 'toString') and not issubclass(x.__class__, type): r = x.toString(level=level, tablefmt=tablefmt, tablefmt1=tablefmt1, tablefmt2=tablefmt2, width=width, heavy=heavy, **kwds) elif issubclass(x.__class__, (bytes, bytearray, memoryview)): r = x.hex() else: html = tablefmt == 'html' or tablefmt2 == 'html' r = ydump(x) if yaml else str(x) if html: r = '<pre>%s</pre>' % r return lls(r, length=length)
[文档]def lls(s, length=80): """ length-limited string. Returns the str if len <= length or length <=3. Returns 'begin...end' if not. """ st = str(s) if len(st) <= length or length <= 3: return st else: l = int(0.8*(length-3)) return '%s...%s' % (st[:l], st[3 + l - length:])
""" https://stackoverflow.com/a/2718268 LHan = [[0x2E80, 0x2E99], # Han # So [26] CJK RADICAL REPEAT, CJK RADICAL RAP [0x2E9B, 0x2EF3], # Han # So [89] CJK RADICAL CHOKE, CJK RADICAL C-SIMPLIFIED TURTLE [0x2F00, 0x2FD5], # Han # So [214] KANGXI RADICAL ONE, KANGXI RADICAL FLUTE 0x3005, # Han # Lm IDEOGRAPHIC ITERATION MARK 0x3007, # Han # Nl IDEOGRAPHIC NUMBER ZERO [0x3021, 0x3029], # Han # Nl [9] HANGZHOU NUMERAL ONE, HANGZHOU NUMERAL NINE [0x3038, 0x303A], # Han # Nl [3] HANGZHOU NUMERAL TEN, HANGZHOU NUMERAL THIRTY 0x303B, # Han # Lm VERTICAL IDEOGRAPHIC ITERATION MARK [0x3400, 0x4DB5], # Han # Lo [6582] CJK UNIFIED IDEOGRAPH-3400, CJK UNIFIED IDEOGRAPH-4DB5 [0x4E00, 0x9FC3], # Han # Lo [20932] CJK UNIFIED IDEOGRAPH-4E00, CJK UNIFIED IDEOGRAPH-9FC3 [0xF900, 0xFA2D], # Han # Lo [302] CJK COMPATIBILITY IDEOGRAPH-F900, CJK COMPATIBILITY IDEOGRAPH-FA2D [0xFA30, 0xFA6A], # Han # Lo [59] CJK COMPATIBILITY IDEOGRAPH-FA30, CJK COMPATIBILITY IDEOGRAPH-FA6A [0xFA70, 0xFAD9], # Han # Lo [106] CJK COMPATIBILITY IDEOGRAPH-FA70, CJK COMPATIBILITY IDEOGRAPH-FAD9 [0x20000, 0x2A6D6], # Han # Lo [42711] CJK UNIFIED IDEOGRAPH-20000, CJK UNIFIED IDEOGRAPH-2A6D6 [0x2F800, 0x2FA1D]] # Han # Lo [542] CJK COMPATIBILITY IDEOGRAPH-2F800, CJK COMPATIBILITY IDEOGRAPH-2FA1D """
[文档]@lru_cache(maxsize=128) def wcw(char): # cached width function from ..dataset.metadata import wcwidth return wcwidth.wcwidth(char)
[文档]def wls(st, width=15, fill=None, unprintable='#'): """ generates a string comtaining width-limited strings separated with '\n'. Identifies Line-breaks with `str.splitlines` https://docs.python.org/3.6/library/stdtypes.html#str.splitlines Removes trailing line-breaks. :st: input string. If not a string, ```str(st)``` is used. :width: if > 0 returns the str with '\n' inserted every width chars. Or else return the input ``st``. Default is 15. A CJK characters occupies 2 in widths. :unprintable: substitute unprintable characters with is. default is '#'. """ if not issubclass(st.__class__, str): st = str(st) if width <= 0 or len(st) == 0: return st line = [] for s in st.splitlines(): lens = len(s) # starting index for current line based on the last line lasti = 0 # display length starting from the beginning of the last line. l = 0 for i, c in enumerate(s): w = wcw(c) l0 = l if w == -1: # change unprintable # ref https://wcwidth.readthedocs.io/en/latest/api.html c = unprintable w = wcw(c) l += w else: l += w #print(i, c, l, lasti, s) if l == width: line.append(c) line.append('\n') lasti, l = i+1, 0 elif l > width: if width < 2: # print wide characters even they are too wide for width==1 line.append(c) line.append('\n') lasti = i+1 l = 0 else: # set line pointer to this char if fill: line.append((width-l0) * fill) line.append('\n') line.append(c) lasti = i l = w else: line.append(c) if len(line) == 0 or line[-1] != '\n': if fill: line.append((width-l) * fill) line.append('\n') # print(line) return ''.join(line[:-1])
[文档]def mstr(obj, level=0, width=1, excpt=None, indent=4, depth=0, tablefmt='rst', tablefmt1='simple', tablefmt2='rst', **kwds): """ Makes a presentation string at a detail level. 'tablefmt' is needed to be passed in recursive calls under some conditions it is used. """ excp = ['_STID', 'data', '_sets'] if excpt: excp.extend(excpt) ind = ' '*indent if level == 0: if not hasattr(obj, 'items'): return bstr(obj, level=level, **kwds) if issubclass(obj.__class__, dataset.metadata.MetaData): return obj.toString(level=level, tablefmt=tablefmt, tablefmt1=tablefmt1, tablefmt2=tablefmt2, **kwds) s = ['%s= {%s}' % (mstr(k, level=level, excpt=excp, indent=indent, depth=depth+1, quote='', tablefmt=tablefmt, tablefmt1=tablefmt1, tablefmt2=tablefmt2, **kwds), mstr(v, level=level, excpt=excp, indent=indent, depth=depth+1, tablefmt=tablefmt, tablefmt1=tablefmt1, tablefmt2=tablefmt2, **kwds)) for k, v in obj.items() if k not in excp] if len(''.join(s)) < 70: sep = ', ' else: sep = ',\n' + ind*depth if depth > 0: s[0] = '\n' + ind*depth + s[0] return sep.join(s) elif level == 1: if not hasattr(obj, 'items'): # returns value of value if possible. limit to 40 char obj = obj.getValue() if hasattr(obj, 'getValue') else obj return bstr(obj, length=80, level=level, **kwds) if issubclass(obj.__class__, dataset.metadata.MetaData): return obj.toString(level=level, **kwds) + '\n' else: pat = '%s= {%s}' if depth == 0 else '%s= %s' data = obj s = [pat % (mstr(k, level=level, excpt=excp, tablefmt=tablefmt, tablefmt1=tablefmt1, tablefmt2=tablefmt2, indent=indent, depth=depth+1, quote='', **kwds), mstr(v, level=level, excpt=excp, tablefmt=tablefmt, tablefmt1=tablefmt1, tablefmt2=tablefmt2, indent=indent, depth=depth+1, **kwds)) for k, v in data.items() if k not in excp] sep = ',\n' if depth == 0 else ', ' return sep.join(s) else: if not hasattr(obj, 'items'): return mstr(obj, level=1, tablefmt=tablefmt, tablefmt1=tablefmt1, tablefmt2=tablefmt2, **kwds) s = ['%s' % (mstr(k, level=level, excpt=excp, quote='', tablefmt=tablefmt, tablefmt1=tablefmt1, tablefmt2=tablefmt2, **kwds)) for k, v in obj.items() if k not in excp] return ', '.join(s)
[文档]def binhexstring(val, typ_, width=0, v=None, p=None, level=0, **kwds): """ returns val in binary, hex, or string according to typ_. val; list of validity descriptor entries. typ_: parameter type in ``DataTypes``. """ if typ_ == 'hex': func = hex elif typ_ == 'binary': func = bin else: func = str breakline = True if not issubclass(val.__class__, list): return func(val) if v == '_valid' and p: validity = p.validate(val) lst = [] # number of bits of mask highest = 0 masks = [] for t in val: if v == '_valid': # val is for '_valid' [[], [], []..] rule, name = t[0], t[1] if issubclass(rule.__class__, (tuple, list)): # range or binary with mask. (1,95) (0B011, 011) if rule[0] < rule[1]: # not binary masked seg = "(%s, %s): %s" % (func(rule[0]), func(rule[1]), name) else: # binary masked. validity is a list of tuple/lists # validity[mask] is (val, state, mask height, mask width) mask, valid_val = rule[0], rule[1] masked_val, mask_height, mask_width = masked( p._value, mask) masks.append( (mask, format(valid_val, '#0%db' % (mask_width+2)), name)) if mask_height > highest: highest = mask_height seg = None elif issubclass(rule.__class__, str): seg = "'%s': %s" % (rule, name) else: seg = "%s: %s" % (func(rule), name) if seg: lst.append(seg) else: # val is a 1+ dimension array lst.append(lls(t, 19)) if len(lst) > 8: lst.append('... tot. %d in dim1' % len(val)) break if highest > 0: # like '110000: 0b10 name1', '001111: 0b0110 name2'] fmt = '0%db' % (highest) lst += [format(i[0], fmt) + ' ' + i[1] + ': ' + i[2] for i in masks] if width and breakline: return '\n'.join(lst) else: return '[%s]' % ', '.join(lst)
""" Must be lowercased """ Ommitted_Valid_Rule_Names = ['valid', 'default', '', 'range']
[文档]def attrstr(p, v, missingval='', ftime=False, state=True, width=1, **kwds): """ generic string representation of an attribute of a parameter or dataset. p: parameter object. v: name of parameter attribute. '_valid', '_type', '_default', '_value' (for Parameter) or '_data' (dataset) missingval: string used when the parameter does not have the attribute. ftime: True means that attribute value will be FineTime if _type is 'finetime'. state: The state validity of the parameter is returned in place of value, if the state is not in Ommitted_Valid_Rule_Names -- 'valid', 'range', '' or 'default'. """ ts = getattr(p, '_type') if hasattr(p, '_type') else missingval if ts is None: ts = 'None' # try: # except (KeyError, AttributeError): # return missingval if not hasattr(p, v): return missingval val = getattr(p, v) if val is None: return 'None' if v in ['_type', 'description', '_unit', '_typecode']: return val if v == '_default': if ts.startswith('finetime'): vs = val.toString(width=width, **kwds) else: # for default and value/data, print list horizontally width = 0 vs = binhexstring(val, ts, width=width, **kwds) elif v == '_valid': if ts.startswith('finetime'): # print('***', v, ts) vs = binhexstring(val, 'string', width=width, v=v, **kwds) else: vs = binhexstring(val, ts, width=width, v=v, p=p, **kwds) else: # v is '_value/data' if ts.startswith('finetime'): if state: vv, vdesc = p.validate(val) if vdesc.lower() not in Ommitted_Valid_Rule_Names: vs = '%s (%s)' % ( vdesc, val.toString(width=width, **kwds)) else: vs = val.toString(width=width, **kwds) else: vs = val.toString(width=width, **kwds) elif not state or not hasattr(p, 'validate'): # for value/data, print list horizontally width = 0 vs = binhexstring(val, ts, width=width, v=v, **kwds) elif hasattr(p, 'validate'): # v is _value/data of parameter of non-finetime to be displayed with state validity = p.validate(val) if issubclass(validity.__class__, tuple): # not binary masked vv, vdesc = validity if vdesc.lower() not in Ommitted_Valid_Rule_Names: vs = '%s (%s)' % ( vdesc, binhexstring(val, ts, v=v, **kwds)) else: vs = binhexstring(val, ts, v=v, **kwds) else: # binary masked. validity is a list of tuple/lists # validity is (val, state, mask height, mask width) sep = '\n' if width else ', ' vs = sep.join(r[1] if r[1] == 'Invalid' else'%s (%s)' % (r[1], format(r[0], '#0%db' % (r[3]+2))) for r in validity) return vs
[文档]def attrstr1(p, v, missingval='', ftime=False, state=True, width=1, **kwds): """ generic string representation of an attribute of a parameter or dataset. p: parameter object. v: name of parameter attribute. '_valid', '_type', '_default', '_value' (for Parameter) or '_data' (dataset) missingval: string used when the parameter does not have the attribute. ftime: True means that attribute value will be FineTime if _type is 'finetime'. state: The state validity of the parameter is returned in place of value, if the state is not in Ommitted_Valid_Rule_Names -- 'valid', 'range', '' or 'default'. """ ts = getattr(p, '_type') if hasattr(p, '_type') else missingval if ts is None: ts = 'None' if hasattr(p, v): val = getattr(p, v) if val is None: return 'None' val_cls = val.__class__ # from ..dataset.finetime import FineTime # if issubclass(val_cls, FineTime): if ftime: # v is '_valid', '_default' or '_value/data' if ts.startswith('finetime'): # print('***', v, ts) if v == '_valid': s = binhexstring(val, 'string', v=v, **kwds) elif v == '_default': s = val.toString(width=width, **kwds) elif state: vv, vdesc = p.validate(val) if vdesc.lower() not in Ommitted_Valid_Rule_Names: s = '%s (%s)' % ( vdesc, val.toString(width=width, **kwds)) else: s = val.toString(width=width, **kwds) else: s = val.toString(width=width, **kwds) vs = s elif not state or v == '_valid' or v == '_default' or not hasattr(p, 'validate'): if v != '_valid': # for default and value/data, print list horizontally width = 0 vs = binhexstring(val, ts, width=width, v=v, **kwds) elif hasattr(p, 'validate'): # v is _value/data of parameter of non-finetime to be displayed with state validity = p.validate(val) if issubclass(validity.__class__, tuple): # not binary masked vv, vdesc = validity if vdesc.lower() not in Ommitted_Valid_Rule_Names: vs = '%s (%s)' % ( vdesc, binhexstring(val, ts, v=v, **kwds)) else: vs = binhexstring(val, ts, v=v, **kwds) else: # binary masked. validity is a list of tuple/lists # validity is (val, state, mask height, mask width) sep = '\n' if width else ', ' vs = sep.join('%s (%s)' % (r[1], format(r[0], '#0%db' % r[3])) for r in validity) else: # must be string vs = val else: vs = missingval return vs
[文档]def exprstrs(param, v='_value', extra=False, **kwds): """ Generates a set of strings for param.toString(). :param: Parameeter or xDstaset. :extra: Whether to include less often used attributes suc as ```fits_keyword```. """ if issubclass(param.__class__, dataset.metadata.Parameter): extra_attrs = copy.copy(param._all_attrs) elif issubclass(param.__class__, (dataset.arraydataset.ArrayDataset, dataset.tabledataset.TableDataset, dataset.unstructureddataset.UnstrcturedDataset)): extra_attrs = dict((n, v['default']) for n, v in param.zInfo['metadata'].items()) else: extra_attrs = {} ts = attrstr(param, '_type', **kwds) if 'typ_' in extra_attrs: extra_attrs.pop('typ_', '') else: # Dataset extra_attrs.pop('type', '') vs = attrstr(param, v, ftime=True, **kwds) extra_attrs.pop('value', '') fs = attrstr(param, '_default', ftime=True, **kwds) extra_attrs.pop('default', '') ds = attrstr(param, 'description', **kwds) extra_attrs.pop('description') gs = attrstr(param, '_valid', ftime=True, **kwds) extra_attrs.pop('valid', '') us = attrstr(param, '_unit', **kwds) extra_attrs.pop('unit', '') cs = attrstr(param, '_typecode', **kwds) extra_attrs.pop('typecode', '') return (vs, us, ts, ds, fs, gs, cs, extra_attrs)
[文档]def pathjoin(*p): """ join path segments with given separater (default '/'). Useful when '\\' is needed. """ sep = '/' r = sep.join(p).replace(sep+sep, sep) # print(p, r) return r
bldins = str.__class__.__module__
[文档]def fullname(obj): """ full class name with module name. https://stackoverflow.com/a/2020083/13472124 """ t = type(obj) if not isinstance(obj, type) else obj module = t.__module__ if module is None or module == bldins: return t.__name__ # Avoid reporting __builtin__ else: return module + '.' + t.__name__
[文档]def getObjectbyId(idn, lgbv): """ lgb is from deserializing caller's globals().values() locals().values() and built-ins """ v = lgbv for obj in v: if id(obj) == idn: return obj raise ValueError("Object not found by id %d." % (idn))
[文档]def grouper(iterable, n, fillvalue=None): "Collect data into fixed-length chunks or blocks" # python 3.6 doc # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx" args = [iter(iterable)] * n return zip_longest(*args, fillvalue=fillvalue)
[文档]def t2l(v): """ convert tuples to lists in nested data structures """ # print(v) if issubclass(v.__class__, (list, tuple)): y = [t2l(x) if issubclass( x.__class__, tuple) else x for x in v] # print('== ', y) return y return v
[文档]def l2t(v): """ convert lists to tuples in nested data structures """ # print(v) if issubclass(v.__class__, (list, tuple)): y = tuple(l2t(x) if issubclass( x.__class__, list) else x for x in v) # print('== ', y) return y return v
[文档]def ld2tk(v): """ convert lists, to tuples and dicts to frozensets in nested data structures array.array is converted to (typecode, itemsize, size, ld2tk(0th element)) """ # print(v) if issubclass(v.__class__, (list, tuple)): y = tuple(ld2tk(x) for x in v) # elif : # issubclass(v.__class__, (list)): # if len(v) > 128 and issubclass(v[0].__class__, (Sequence)): # y = (type(v[0]), len(v), ld2tk(v[0])) # else: # y = tuple(ld2tk(x) for x in v) elif issubclass(v.__class__, (array.array)): y = (v.typecode, v.itemsize, len(v), len(v[0]) if issubclass( v[0].__class__, Sequence) else ld2tk(v[0])) elif issubclass(v.__class__, (dict)): # print('== ', y) y = frozenset((ld2tk(k), ld2tk(v)) for k, v in v.items()) elif issubclass(v.__class__, (set)): # print('== ', y) y = frozenset(ld2tk(x) for x in v) else: y = v return y
[文档]class UserOrGroupNotFoundError(BaseException): pass
[文档]def getUidGid(username): """ returns the UID and GID of the named user. return: -1 if not available """ try: uid = pwd.getpwnam(username).pw_uid except KeyError as e: msg = 'Cannot get UserID for ' + username + \ '. check config. ' + str(e) + trbk(e) logger.error(msg) uid = -1 # UserOrGroupNotFoundError(msg).with_traceback(sys.exc_info()[2]) raise # do if platform supports. try: gid = pwd.getpwnam(username).pw_gid except KeyError as e: msg = 'Cannot get GroupID for ' + username + \ '. check config. ' + str(e) + trbk(e) gid = -1 logger.error(msg) raise return uid, gid
[文档]def findShape(data, element_seq=(str)): """ Shape of list/dict of list/dict. :element_seq: treat elements of these sequence types as scalars. """ if data is None: return None shape = [] d = data while d is not None: if issubclass(d.__class__, element_seq): d = None else: try: shape.append(len(d)) d = list(d.values())[0] if issubclass( d.__class__, Mapping) else d[0] except (TypeError, IndexError, KeyError) as e: d = None return tuple(shape)
[文档]def guess_value(input_string, parameter=False, last=str): """ Returns guessed value from a string. | input | output | | ```'None'``` | `None` | | integer | `int()` | | float | `float()` | | ```'True'```, ```'False```` | `True`, `False` | | string starting with ```'0x'``` | `hex()` | | else | run `last`(input_string) | """ from ..dataset.numericparameter import NumericParameter, BooleanParameter from ..dataset.dateparameter import DateParameter from ..dataset.stringparameter import StringParameter from ..dataset.metadata import Parameter if input_string == 'None': res = None elif input_string == '': if parameter: return StringParameter(value=input_string) else: return input_string else: try: res = int(input_string) return NumericParameter(value=res) if parameter else res except ValueError: try: res = float(input_string) return NumericParameter(value=res) if parameter else res except ValueError: # string, bytes, bool if input_string.startswith('0x'): res = bytes.fromhex(input_string[2:]) return NumericParameter(value=res) if parameter else res elif input_string in ['True', 'False']: res = bool(input_string) return BooleanParameter(value=res) if parameter else res else: res = last(input_string) return Parameter(value=res) if parameter else res return None