Source code for fdi.utils.common

# -*- coding: utf-8 -*-

from .masked import masked
from .ydump import ydump
from .. import dataset

import hashlib
import array
import builtins
import traceback
import pprint
import textwrap
import copy
import fnmatch
import os
from pathlib import Path
import pwd
import logging
from functools import lru_cache
from itertools import zip_longest, chain
import collections
from collections.abc import Sequence, Mapping
import sys
if sys.version_info[0] >= 3:  # + 0.1 * sys.version_info[1] >= 3.3:
    PY3 = True
else:
    PY3 = False


# create logger
logger = logging.getLogger(__name__)
# logger.debug('level %d' %  (logger.getEffectiveLevel()))

logging_ERROR = logging.ERROR
logging_WARNING = logging.WARNING
logging_INFO = logging.INFO
logging_DEBUG = logging.DEBUG


[docs]def str2md5(string): return hashlib.md5(string.encode('utf8')).hexdigest()
[docs]def trbk(e): """ trace back """ ls = [x for x in traceback.extract_tb(e.__traceback__).format()] if hasattr( e, '__traceback__') else [''] return '\n'.join(ls) + ' ' + \ (e.child_traceback if hasattr(e, 'child_traceback') else '')
[docs]def trbk2(e): tb = traceback.TracebackException.from_exception(e) return ''.join(tb.stack.format())
[docs]def bstr(x, length=0, tostr=True, quote="'", yaml=False, html=False, **kwds): """ returns the best string representation. if the object is a string, return single-quoted; if has toString(), use it; else returns str(). Length limited by lls(lls) """ is_str = issubclass(x.__class__, str) if PY3 else issubclass( x.__class__, (str, unicode)) if is_str: # is a string (or unicode if not python3) r = quote + x + quote elif tostr and hasattr(x, 'toString') and not issubclass(x.__class__, type): # has toString() r = x.toString(**kwds) elif issubclass(x.__class__, (bytes, bytearray, memoryview)): r = x.hex() else: r = ydump(x) if yaml else str(x) if html: r = '<pre>%s</pre>' % r return lls(r, length=length)
[docs]def lls(s, length=80): """ length-limited string. Returns the str if len <= length or length <=3. Returns 'begin...end' if not. """ st = str(s) if len(st) <= length or length <= 3: return st else: l = int(0.8*(length-3)) return '%s...%s' % (st[:l], st[3 + l - length:])
[docs]def wls(st, width=15, fill=None, linebreak='\n', unprintable='#'): """ generates a string comtaining width-limited strings separated with '\n'. Identifies Line-breaks with `str.splitlines` https://docs.python.org/3.6/library/stdtypes.html#str.splitlines Removes trailing line-breaks. :st: input string. If not a string, ```str(st)``` is used. :width: if > 0 returns the str with `linebreak` inserted every width chars max. Default width is 15. A CJK characters occupies 2 in widths. :linebreak: line-break character. default `\n` :unprintable: substitute unprintable characters with this, only active if wide or unprintable characters are found. default is '#'. """ if not issubclass(st.__class__, str): st = str(st) if len(st) == 0: return st # wrap text. if width > 0: lines = [] for s in st.splitlines(): s = textwrap.wrap(s, width=width, replace_whitespace=False, drop_whitespace=False) lines.extend(s) return linebreak.join(lines) return linebreak.join(st.splitlines()) # wrap text. This is obsolete for tabulate since tabulate 0.8.10 if len(st.encode('utf8')) == len(st) and fill is None: lines = [] for s in st.splitlines(): if width > 0: s = textwrap.wrap(s, width=width, replace_whitespace=False, drop_whitespace=False) lines.extend(s) else: lines.append(s) lines.append(s) return linebreak.join(lines) else: # string has CJK characters return linebreak.join(st.splitlines())
# TODO: fix width breaking for CJK # return wcw_wls(st, width=width, fill=fill, # linebreak=linebreak, unprintable=unprintable)
[docs]def mstr(obj, level=0, excpt=None, indent=4, depth=0, **kwds): """ Makes a presentation string at a detail level. 'tablefmt' is needed to be passed in recursive calls under some conditions it is used. """ excp = ['_STID', 'data', '_sets'] if excpt: excp.extend(excpt) ind = ' '*indent if level == 0: if not hasattr(obj, 'items'): return bstr(obj, level=level, **kwds) if issubclass(obj.__class__, dataset.metadata.MetaData): return obj.toString(level=level, **kwds) s = ['%s= {%s}' % (mstr(k, level=level, excpt=excp, indent=indent, depth=depth+1, quote='', **kwds), mstr(v, level=level, excpt=excp, indent=indent, depth=depth+1, **kwds)) for k, v in obj.items() if k not in excp] if len(''.join(s)) < 70: sep = ', ' else: sep = ',\n' + ind*depth if depth > 0: s[0] = '\n' + ind*depth + s[0] return sep.join(s) elif level == 1: if not hasattr(obj, 'items'): # returns value of value if possible. limit to 40 char obj = obj.getValue() if hasattr(obj, 'getValue') else obj return bstr(obj, length=80, level=level, **kwds) if issubclass(obj.__class__, dataset.metadata.MetaData): return obj.toString(level=level, **kwds) + '\n' else: pat = '%s= {%s}' if depth == 0 else '%s= %s' data = obj s = [pat % (mstr(k, level=level, excpt=excp, indent=indent, depth=depth+1, quote='', **kwds), mstr(v, level=level, excpt=excp, indent=indent, depth=depth+1, **kwds)) for k, v in data.items() if k not in excp] sep = ',\n' if depth == 0 else ', ' return sep.join(s) else: if not hasattr(obj, 'items'): return mstr(obj, level=1, **kwds) s = ['%s' % (mstr(k, level=level, excpt=excp, quote='', **kwds)) for k, v in obj.items() if k not in excp] return ', '.join(s)
[docs]def binhexstring(val, typ_, width=0, v=None, p=None, level=0, **kwds): """ returns val in binary, hex, or string according to typ_. val; list of validity descriptor entries. typ_: parameter type in ``DataTypes``. """ if typ_ == 'hex': func = hex elif typ_ == 'binary': func = bin else: func = str breakline = True if not issubclass(val.__class__, list): return func(val) if v == '_valid' and p: validity = p.validate(val) lst = [] # number of bits of mask highest = 0 masks = [] for t in val: if v == '_valid': # val is for '_valid' [[], [], []..] rule, name = t[0], t[1] if issubclass(rule.__class__, (tuple, list)): # range or binary with mask. (1,95) (0B011, 011) if rule[0] < rule[1]: # not binary masked seg = "(%s, %s): %s" % (func(rule[0]), func(rule[1]), name) else: # binary masked. validity is a list of tuple/lists # validity[mask] is (val, state, mask height, mask width) mask, valid_val = rule[0], rule[1] masked_val, mask_height, mask_width = masked( p._value, mask) masks.append( (mask, format(valid_val, '#0%db' % (mask_width+2)), name)) if mask_height > highest: highest = mask_height seg = None elif issubclass(rule.__class__, str): seg = "'%s': %s" % (rule, name) else: seg = "%s: %s" % (func(rule), name) if seg: lst.append(seg) else: # val is a 1+ dimension array lst.append(lls(t, 19)) if len(lst) > 8: lst.append('... tot. %d in dim1' % len(val)) break if highest > 0: # like '110000: 0b10 name1', '001111: 0b0110 name2'] fmt = '0%db' % (highest) lst += [format(i[0], fmt) + ' ' + i[1] + ': ' + i[2] for i in masks] if width and breakline: return '\n'.join(lst) else: return '[%s]' % ', '.join(lst)
""" Must be lowercased """ Ommitted_Valid_Rule_Names = ['valid', 'default', '', 'range']
[docs]def attrstr(p, v, missingval='', ftime=False, state=True, width=1, **kwds): """ generic string representation of an attribute of a parameter or dataset. p: parameter object. v: name of parameter attribute. '_valid', '_type', '_default', '_value' (for Parameter) or '_data' (dataset) missingval: string used when the parameter does not have the attribute. ftime: True means that attribute value will be FineTime if _type is 'finetime'. state: The state validity of the parameter is returned in place of value, if the state is not in Ommitted_Valid_Rule_Names -- 'valid', 'range', '' or 'default'. """ ts = getattr(p, '_type') if hasattr(p, '_type') else missingval if ts is None: ts = 'None' # try: # except (KeyError, AttributeError): # return missingval if not hasattr(p, v): return missingval val = getattr(p, v) if val is None: return 'None' if v in ['_type', 'description', '_unit', '_typecode']: return val if v == '_default': if ts.startswith('finetime'): vs = val.toString(width=width, **kwds) else: # for default and value/data, print list horizontally width = 0 vs = binhexstring(val, ts, width=width, **kwds) elif v == '_valid': if ts.startswith('finetime'): # print('***', v, ts) vs = binhexstring(val, 'string', width=width, v=v, **kwds) else: vs = binhexstring(val, ts, width=width, v=v, p=p, **kwds) else: # v is '_value/data' if ts.startswith('finetime'): if state: vv, vdesc = p.validate(val) if vdesc.lower() not in Ommitted_Valid_Rule_Names: vs = '%s (%s)' % ( vdesc, val.toString(width=width, **kwds)) else: vs = val.toString(width=width, **kwds) else: vs = val.toString(width=width, **kwds) elif not state or not hasattr(p, 'validate'): # for value/data, print list horizontally width = 0 vs = binhexstring(val, ts, width=width, v=v, **kwds) elif hasattr(p, 'validate'): # v is _value/data of parameter of non-finetime to be displayed with state validity = p.validate(val) if issubclass(validity.__class__, tuple): # not binary masked vv, vdesc = validity if vdesc.lower() not in Ommitted_Valid_Rule_Names: vs = '%s (%s)' % ( vdesc, binhexstring(val, ts, v=v, **kwds)) else: vs = binhexstring(val, ts, v=v, **kwds) else: # binary masked. validity is a list of tuple/lists # validity is (val, state, mask height, mask width) sep = '\n' if width else ', ' vs = sep.join(r[1] if r[1] == 'Invalid' else '%s (%s)' % (r[1], format(r[0], '#0%db' % (r[3]+2))) for r in validity) return vs
[docs]def attrstr1(p, v, missingval='', ftime=False, state=True, width=1, **kwds): """ generic string representation of an attribute of a parameter or dataset. p: parameter object. v: name of parameter attribute. '_valid', '_type', '_default', '_value' (for Parameter) or '_data' (dataset) missingval: string used when the parameter does not have the attribute. ftime: True means that attribute value will be FineTime if _type is 'finetime'. state: The state validity of the parameter is returned in place of value, if the state is not in Ommitted_Valid_Rule_Names -- 'valid', 'range', '' or 'default'. """ ts = getattr(p, '_type') if hasattr(p, '_type') else missingval if ts is None: ts = missingval if hasattr(p, v): val = getattr(p, v) if val is None: return missingval val_cls = val.__class__ # from ..dataset.finetime import FineTime # if issubclass(val_cls, FineTime): if ftime: # v is '_valid', '_default' or '_value/data' if ts.startswith('finetime'): # print('***', v, ts) if v == '_valid': s = binhexstring(val, 'string', v=v, **kwds) elif v == '_default': s = val.toString(width=width, **kwds) elif state: vv, vdesc = p.validate(val) if vdesc.lower() not in Ommitted_Valid_Rule_Names: s = '%s (%s)' % ( vdesc, val.toString(width=width, **kwds)) else: s = val.toString(width=width, **kwds) else: s = val.toString(width=width, **kwds) vs = s elif not state or v == '_valid' or v == '_default' or not hasattr(p, 'validate'): if v != '_valid': # for default and value/data, print list horizontally width = 0 vs = binhexstring(val, ts, width=width, v=v, **kwds) elif hasattr(p, 'validate'): # v is _value/data of parameter of non-finetime to be displayed with state validity = p.validate(val) if issubclass(validity.__class__, tuple): # not binary masked vv, vdesc = validity if vdesc.lower() not in Ommitted_Valid_Rule_Names: vs = '%s (%s)' % ( vdesc, binhexstring(val, ts, v=v, **kwds)) else: vs = binhexstring(val, ts, v=v, **kwds) else: # binary masked. validity is a list of tuple/lists # validity is (val, state, mask height, mask width) sep = '\n' if width else ', ' vs = sep.join('%s (%s)' % (r[1], format(r[0], '#0%db' % r[3])) for r in validity) else: # must be string vs = val else: vs = missingval return vs
[docs]def exprstrs(param, v='_value', extra=False, **kwds): """ Generates a set of strings for param.toString(). :param: Parameter or xDstaset. :extra: Whether to include less often used attributes such as ```fits_keyword```. """ if issubclass(param.__class__, dataset.metadata.Parameter): extra_attrs = copy.copy(param._all_attrs) elif issubclass(param.__class__, (dataset.arraydataset.ArrayDataset, dataset.tabledataset.TableDataset, dataset.unstructureddataset.UnstructuredDataset)): # if v['default'] else '') extra_attrs = dict((n, v['default']) for n, v in param.zInfo['metadata'].items()) else: extra_attrs = {} ts = attrstr(param, '_type', **kwds) if 'typ_' in extra_attrs: extra_attrs.pop('typ_', '') else: # Dataset extra_attrs.pop('type', '') vs = attrstr(param, v, ftime=True, **kwds) extra_attrs.pop('value', '') fs = attrstr(param, '_default', ftime=True, **kwds) extra_attrs.pop('default', '') ds = attrstr(param, 'description', **kwds) extra_attrs.pop('description') gs = attrstr(param, '_valid', ftime=True, **kwds) extra_attrs.pop('valid', '') us = attrstr(param, '_unit', **kwds) extra_attrs.pop('unit', '') cs = attrstr(param, '_typecode', **kwds) extra_attrs.pop('typecode', '') return (vs, us, ts, ds, fs, gs, cs, extra_attrs)
[docs]def pathjoin(*p): """ join path segments with given separater (default '/'). Useful when '\\' is needed. """ sep = '/' r = sep.join(p).replace(sep+sep, sep) # print(p, r) return r
bldins = str.__class__.__module__
[docs]def fullname(obj): """ full class name with module name. https://stackoverflow.com/a/2020083/13472124 """ t = type(obj) if not isinstance(obj, type) else obj module = t.__module__ if module is None or module == bldins: return t.__name__ # Avoid reporting __builtin__ else: return module + '.' + t.__name__
[docs]def getObjectbyId(idn, lgbv): """ lgb is from deserializing caller's globals().values() locals().values() and built-ins """ v = lgbv for obj in v: if id(obj) == idn: return obj raise ValueError("Object not found by id %d." % (idn))
[docs]def grouper(iterable, n, fillvalue=None): "Collect data into fixed-length chunks or blocks" # python 3.6 doc # grouper('ABCDEFG', 3, 'x') --> ABC DEF Gxx" args = [iter(iterable)] * n return zip_longest(*args, fillvalue=fillvalue)
[docs]def t2l(v): """ convert tuples to lists in nested data structures """ # print(v) if issubclass(v.__class__, (list, tuple)): y = [t2l(x) if issubclass( x.__class__, tuple) else x for x in v] # print('== ', y) return y return v
[docs]def l2t(v): """ convert lists to tuples in nested data structures """ # print(v) if issubclass(v.__class__, (list, tuple)): y = tuple(l2t(x) if issubclass( x.__class__, list) else x for x in v) # print('== ', y) return y return v
[docs]def ld2tk(v): """ convert lists, to tuples and dicts to frozensets in nested data structures array.array is converted to (typecode, itemsize, size, ld2tk(0th element)) """ # print(v) if issubclass(v.__class__, (list, tuple)): y = tuple(ld2tk(x) for x in v) # elif : # issubclass(v.__class__, (list)): # if len(v) > 128 and issubclass(v[0].__class__, (Sequence)): # y = (type(v[0]), len(v), ld2tk(v[0])) # else: # y = tuple(ld2tk(x) for x in v) elif issubclass(v.__class__, (array.array)): y = (v.typecode, v.itemsize, len(v), len(v[0]) if issubclass( v[0].__class__, Sequence) else ld2tk(v[0])) elif issubclass(v.__class__, (dict)): # print('== ', y) y = frozenset((ld2tk(k), ld2tk(v)) for k, v in v.items()) elif issubclass(v.__class__, (set)): # print('== ', y) y = frozenset(ld2tk(x) for x in v) else: y = v return y
[docs]class UserOrGroupNotFoundError(BaseException): pass
[docs]def getUidGid(username): """ returns the UID and GID of the named user. return: -1 if not available """ try: uid = pwd.getpwnam(username).pw_uid except KeyError as e: msg = 'Cannot get UserID for ' + username + \ '. check config. ' + str(e) + trbk(e) logger.error(msg) uid = -1 # UserOrGroupNotFoundError(msg).with_traceback(sys.exc_info()[2]) raise # do if platform supports. try: gid = pwd.getpwnam(username).pw_gid except KeyError as e: msg = 'Cannot get GroupID for ' + username + \ '. check config. ' + str(e) + trbk(e) gid = -1 logger.error(msg) raise return uid, gid
[docs]def findShape(data, element_seq=(str)): """ Shape of list/dict of list/dict. :element_seq: treat elements of these sequence types as scalars. """ if data is None: return None shape = [] d = data while d is not None: if issubclass(d.__class__, element_seq): d = None else: try: shape.append(len(d)) d = list(d.values())[0] if issubclass( d.__class__, Mapping) else d[0] except (TypeError, IndexError, KeyError) as e: d = None return shape
[docs]def find_all_files(datadir, verbose=False, include=None, exclude=None, not_if=None, absdir=False): """ returns a list of names of all files in `datadir`. :name: of starting directory or a list of file name strings to filter. :include: only if a file name has any of these sub-strings. format is as if used in `glob(include)`. :exclude: only if a file name has not any of these sub-strings. Empty strings are removed. :not_if: a function that returns true if given a name of unwanted file. default is None, (which excludes directories when `datadir` is a string, and disabled if `datadir` is a `list`. :absdir: Set to True to return absolute_paths. """ allf = [] if not include: include = '*' isadir = issubclass(datadir.__class__, str) if isadir: if not_if is None: not_if = os.path.isdir inc = Path(datadir).glob(include) inc = list(str(f) for f in inc) else: inc = fnmatch.filter(datadir, include) # print("find", len(inc)) if exclude is None: exclude = [] ab = os.path.abspath if absdir else lambda x: x allf = list(ab(f) for f in inc if not any( e in f for e in exclude if e != '') and not (isadir and not_if(f))) # for root, dirs, files in os.walk(datadir): # if verbose: # print("In ", root, "...", end=" ") # print("find", len(files), "non-dir files", end=' ') # print("and", len(dirs), "dirs") # allf += [os.path.join(root, f) # for f in files if ok(f, include, exclude)] if verbose: print('Find %d files total.' % len(allf)) return allf
########### old code grave yard ############ """ https://stackoverflow.com/a/2718268 LHan = [[0x2E80, 0x2E99], # Han # So [26] CJK RADICAL REPEAT, CJK RADICAL RAP # Han # So [89] CJK RADICAL CHOKE, CJK RADICAL C-SIMPLIFIED TURTLE [0x2E9B, 0x2EF3], [0x2F00, 0x2FD5], # Han # So [214] KANGXI RADICAL ONE, KANGXI RADICAL FLUTE 0x3005, # Han # Lm IDEOGRAPHIC ITERATION MARK 0x3007, # Han # Nl IDEOGRAPHIC NUMBER ZERO # Han # Nl [9] HANGZHOU NUMERAL ONE, HANGZHOU NUMERAL NINE [0x3021, 0x3029], # Han # Nl [3] HANGZHOU NUMERAL TEN, HANGZHOU NUMERAL THIRTY [0x3038, 0x303A], 0x303B, # Han # Lm VERTICAL IDEOGRAPHIC ITERATION MARK # Han # Lo [6582] CJK UNIFIED IDEOGRAPH-3400, CJK UNIFIED IDEOGRAPH-4DB5 [0x3400, 0x4DB5], # Han # Lo [20932] CJK UNIFIED IDEOGRAPH-4E00, CJK UNIFIED IDEOGRAPH-9FC3 [0x4E00, 0x9FC3], # Han # Lo [302] CJK COMPATIBILITY IDEOGRAPH-F900, CJK COMPATIBILITY IDEOGRAPH-FA2D [0xF900, 0xFA2D], # Han # Lo [59] CJK COMPATIBILITY IDEOGRAPH-FA30, CJK COMPATIBILITY IDEOGRAPH-FA6A [0xFA30, 0xFA6A], # Han # Lo [106] CJK COMPATIBILITY IDEOGRAPH-FA70, CJK COMPATIBILITY IDEOGRAPH-FAD9 [0xFA70, 0xFAD9], # Han # Lo [42711] CJK UNIFIED IDEOGRAPH-20000, CJK UNIFIED IDEOGRAPH-2A6D6 [0x20000, 0x2A6D6], [0x2F800, 0x2FA1D]] # Han # Lo [542] CJK COMPATIBILITY IDEOGRAPH-2F800, CJK COMPATIBILITY IDEOGRAPH-2FA1D """ # @lru_cache(maxsize=128) # def wcw(char): # # cached width function # from ..dataset.metadata import wcwidth # return wcwidth.wcwidth(char) # def wcw_wls(st, width=15, fill=None, linebreak='\n', unprintable='#'): # # for CJK this is obsolete with the CJK code in `wls` for tabulate # line = [] # for s in st.splitlines(): # lens = len(s) # # starting index for current line based on the last line # lasti = 0 # # display length starting from the beginning of the last line. # l = 0 # for i, c in enumerate(s): # w = wcw(c) # l0 = l # if w == -1: # # change unprintable # # ref https://wcwidth.readthedocs.io/en/latest/api.html # c = unprintable # w = wcw(c) # l += w # else: # l += w # # print(i, c, l, lasti, s) # if l == width: # line.append(c) # line.append(linebreak) # lasti, l = i+1, 0 # elif l > width: # if width < 2: # # print wide characters even they are too wide for width==1 # line.append(c) # line.append(linebreak) # lasti = i+1 # l = 0 # else: # # set line pointer to this char # if fill: # line.append((width-l0) * fill) # line.append(linebreak) # line.append(c) # lasti = i # l = w # else: # line.append(c) # if len(line) == 0 or line[-1] != '\n': # if fill: # line.append((width-l) * fill) # line.append(linebreak) # # print('*****', line) # end = len(linebreak) # return ''.join(line[:-end])