# -*- coding: utf-8 -*-
from .serializable import Serializable
from .eq import DeepEqual
from .copyable import Copyable
# from .metadata import ParameterTypes
from ..utils import leapseconds
import datetime
from string import ascii_uppercase
from collections import OrderedDict
import logging
# create logger
logger = logging.getLogger(__name__)
# logger.debug('level %d' % (logger.getEffectiveLevel()))
utcobj = datetime.timezone.utc
[docs]class FineTime(Copyable, DeepEqual, Serializable):
""" Atomic time (SI seconds) elapsed since the TAI epoch
of 1 January 1958 UT2. The resolution is one microsecond and the
allowable range is: epoch + /-290, 000 years approximately.
This has the following advantages, compared with the standard class:
* It has better resolution(microseconds);
* ime differences are correct across leap seconds
* It is immutable unless its TAI is 0..
"""
""" The starting date in UTC """
EPOCH = datetime.datetime(1958, 1, 1, 0, 0, 0, tzinfo=utcobj)
""" number of TAI units in a second """
RESOLUTION = 1000000 # microseconds
""" The earliest time when valid leapsecond is used."""
UTC_LOW_LIMIT = datetime.datetime(1972, 1, 1, 0, 0, 0, tzinfo=utcobj)
UTC_LOW_LIMIT_TIMESTAMP = UTC_LOW_LIMIT.timestamp()
""" Format used when not specified."""
DEFAULT_FORMAT = '%Y-%m-%dT%H:%M:%S.%f' # ISO
DEFAULT_FORMAT_SECOND = '%Y-%m-%dT%H:%M:%S'
RETURNFMT = '%s.%06d'
""" """
TIMESPEC = 'microseconds'
[docs] def __init__(self, date=None, format=None, **kwds):
""" Initiate with a UTC date or an integer TAI.
:date: time to be set to. Acceptable types:
* `int` for TAI,
* `float`, `double` for UNIX time-stamp,
* `datetime.datetime`,
* `string` for ISO format date-time,
* bytes-like classes that can get string by calling its `decode(encoding='utf-8')`
:format: ISO-8601 or some variation of it. Default is
`DEFAULT_FORMAT` and `DEFAULT_FORMAT_SECOND`.
"""
self.format = FineTime.DEFAULT_FORMAT if format is None else format
self.setTime(date)
# logger.debug('date= %s TAI = %d' % (str(date), self.tai))
super().__init__(**kwds)
@property
def time(self):
return self.getTime()
@time.setter
def time(self, time):
self.setTime(time)
[docs] def getTime(self):
""" Returns the time related to this object."""
return self.tai
[docs] def setTime(self, time):
""" Sets the time of this object.
If an integer is given, it will be taken as the TAI.
'0' and b'0' are taken as TAI=0.
If a float is given, it will be taken as the `time` time stamp (UTC).
If a datetime object or a string code is given, the timezone will be set to UTC.
A FineTime instance is immutable except when TAI == 0. Violation gets a TypeError.
"""
# setting to an impossible value
setTai = ...
if time is None:
setTai = None
elif issubclass(time.__class__, int):
setTai = time
elif issubclass(time.__class__, float):
if time < self.UTC_LOW_LIMIT_TIMESTAMP:
logger.warn(
'Timestamp before %s not defined yet.' % str(self.UTC_LOW_LIMIT_TIMESTAMP))
d = datetime.datetime.fromtimestamp(time, tz=utcobj)
setTai = self.datetimeToFineTime(d)
elif issubclass(time.__class__, datetime.datetime):
if time.tzinfo is None:
d = time.replace(tzinfo=utcobj)
else:
d = time
setTai = self.datetimeToFineTime(d)
elif issubclass(time.__class__, (str, bytes)):
t = time.strip()
if issubclass(t.__class__, bytes):
t = t.decode('ascii')
fmt = self.format
if fmt:
try:
d = datetime.datetime.strptime(t, fmt)
d1 = d.replace(tzinfo=utcobj)
setTai = self.datetimeToFineTime(d1)
except ValueError:
pass
else:
msg = ('%s must be an integer,a datetime object,'
'or a string or bytes or float, but its type is %s.' % (
str(time), type(time).__name__))
raise TypeError(msg)
if setTai is ...:
# Now t has a date-time string in it.
msg = '%s is not an integer or `datetime`' % t
fmt = FineTime.DEFAULT_FORMAT
try:
d = datetime.datetime.strptime(t, fmt)
except ValueError:
msg += '\n%s does not match %s.' % (t, fmt)
fmt = FineTime.DEFAULT_FORMAT_SECOND
try:
d = datetime.datetime.strptime(t, fmt)
except ValueError:
msg += '\n%s does not match %s.' % (t, fmt)
if ' ' in t:
fmt = FineTime.DEFAULT_FORMAT + ' %Z'
try:
d = datetime.datetime.strptime(t, fmt)
gotit = 2
except ValueError:
msg += '\n%s does not match %s.' % (t, fmt)
fmt = FineTime.DEFAULT_FORMAT + ' %z'
try:
d = datetime.datetime.strptime(t, fmt)
gotit = 2
except ValueError:
msg += '\n%s does not match %s.' % (t, fmt)
raise ValueError(msg)
logger.warning('Time zone %s assumed for %s' %
(t.rsplit(' ')[1], t))
else:
# No ' ' in time string
gotit = False
if t.endswith('Z'):
fmt = FineTime.DEFAULT_FORMAT + 'Z'
try:
d = datetime.datetime.strptime(t, fmt).replace(
tzinfo=utcobj)
gotit = 2
except ValueError:
msg += '\n%s does not match %s.' % (t, fmt)
fmt = FineTime.DEFAULT_FORMAT_SECOND + 'Z'
try:
d = datetime.datetime.strptime(t, fmt).replace(
tzinfo=utcobj)
gotit = 2
except ValueError:
msg += '\n%s does not match %s.' % (t, fmt)
pass
if not gotit:
# not ending with 'Z' or not match fmt+'Z'
fmt = FineTime.DEFAULT_FORMAT + '%Z'
try:
d = datetime.datetime.strptime(t, fmt)
except ValueError:
msg += '\n%s does not match %s.' % (t, fmt)
fmt = FineTime.DEFAULT_FORMAT + '%z'
try:
d = datetime.datetime.strptime(t, fmt)
except ValueError:
msg += '\n%s does not match %s.' % (t, fmt)
raise TypeError(msg)
if gotit != 2:
logger.warning('Time zone %s taken for %s' %
(str(d.tzname()), t))
d1 = d.replace(tzinfo=utcobj) if d.tzinfo != utcobj else d
setTai = self.datetimeToFineTime(d1)
# setTai has a value
try:
if setTai and self.tai:
raise TypeError(
'FineTime objects with non-zero TAI are immutable.')
except AttributeError:
# self.tai not exists
pass
self.tai = setTai
[docs] def microsecondsSinceEPOCH(self):
""" Return the rounded integer number of microseconds since the epoch: 1 Jan 1958. """
return int(self.tai * self.RESOLUTION / FineTime.RESOLUTION+0.5)
[docs] def subtract(self, time):
""" Subract the specified time and return the difference
in microseconds. """
return self.tai - time.tai
[docs] @ classmethod
def datetimeToFineTime(cls, dtm):
"""DateTime to FineTime conversion.
Return given Python Datetime in FineTime to the precision of
the input. Rounded to the last digit. Unit is decided by
RESOLUTION.
Parameters
----------
cls : class
dtm : DateTime
Must be time-zone aware.
Returns
-------
FineTime
converted.
"""
if dtm < cls.UTC_LOW_LIMIT:
logger.warning(
'UTC before %s not defined yet.' % str(cls.UTC_LOW_LIMIT))
leapsec = leapseconds.dTAI_UTC_from_utc(dtm)
sec = cls.RESOLUTION * ((dtm - cls.EPOCH + leapsec).total_seconds())
return int(sec+0.5)
[docs] def toDatetime(self, tai=None):
""" Return given FineTime as a Python Datetime.
tai: if not given or given as `None`, return this object's time as a Python Datetime.
"""
if tai is None:
tai = self.tai
if tai is None:
return None
tai_time = datetime.timedelta(seconds=(float(tai) / FineTime.RESOLUTION)) \
+ FineTime.EPOCH
# leapseconds is offset-native
leapsec = leapseconds.dTAI_UTC_from_tai(tai_time)
return tai_time - leapsec
# HCSS compatibility
toDate = toDatetime
[docs] def isoutc(self, format='%Y-%m-%dT%H:%M:%S.%f'):
""" Returns a String representation of this objet in ISO format without timezone. sub-second set to TIMESPEC.
If `tai is None` return `''`.
;format: time format. default '%Y-%m-%dT%H:%M:%S' prints like 2019-02-17T12:43:04.577000 """
if self.tai is None:
return 'Unknown'
dt = self.toDatetime(self.tai)
return dt.strftime(format)
[docs] def toString(self, level=0, width=0, **kwds):
""" Returns a String representation of this object according to self.format.
level: 0 prints like 2019-02-17T12:43:04.577000 TAI(...)
width: if non-zero, insert newline to break simplified output into shorter lines. For level=0 it is ``` #TODO ```
"""
tais = str(self.tai) if hasattr(
self, 'tai') and self.tai is not None else 'Unknown'
fmt = self.format
if level == 0:
if width:
tstr = self.isoutc(format=fmt) + '\n' + tais
s = tstr
else:
tstr = self.isoutc(format=fmt) + ' TAI(%s)' % tais
s = tstr + ' format=' + self.format
elif level == 1:
if width:
tstr = self.isoutc(format=fmt) + '\n%s' % tais
else:
tstr = self.isoutc(format=fmt) + ' (%s)' % tais
s = tstr
elif level == 2:
if width:
s = self.__class__.__name__ + '(' + \
self.isoutc(format=fmt).replace('T', '\n') + ')'
else:
s = self.__class__.__name__ + '(' + \
self.isoutc(format=fmt) + ')'
else:
s = tais
return s
string = toString
txt = toString
def __repr__(self):
return self.toString(level=2)
def __bool__(self):
""" `True` if `tai > 0`.
For `if` etc
"""
return bool(self.tai)
def __int__(self):
return self.tai
__index__ = __int__
def __add__(self, obj):
""" can add an integer as a TAI directly and return a new instance."""
oc = obj.__class__
sc = self.__class__
if issubclass(oc, int):
return sc(self.tai+obj, format=self.format)
else:
raise TypeError(
f'{sc.__name__} cannot add/minus {oc.__name__} {obj}')
def __sub__(self, obj):
""" can minus an integer as a TAI directly and return a new instance,
or subtract another FineTime instance and returns TAI difference in microseconds.
"""
oc = obj.__class__
sc = self.__class__
if issubclass(oc, int):
return sc(self.tai-obj, format=self.format)
elif issubclass(oc, sc):
return self.tai - obj.tai
else:
raise TypeError(f'{sc.__name__} cannot minus {oc.__name__} {obj}')
def __iadd__(self, obj):
""" can add an integer as a TAI directly to self like ```v += 3```."""
oc = obj.__class__
sc = self.__class__
if issubclass(oc, int):
self.tai += obj
else:
raise TypeError(f'{sc.__name__} cannot add/minus {oc} {obj}')
def __isub__(self, obj):
""" can subtract an integer as a TAI directly from self like ```v -= 3```."""
oc = obj.__class__
sc = self.__class__
if issubclass(oc, int):
self.tai -= obj
else:
raise TypeError(f'{sc.__name__} cannot add/minus {oc} {obj}')
def __eq__(self, obj, **kwds):
""" fast comparison using TAI. """
if obj is None or not hasattr(self, 'tai') or not hasattr(obj, 'tai'):
return False
if id(self) == id(obj):
return True
if type(self) != type(obj):
return False
return self.tai == obj.tai
def __lt__(self, obj):
""" can compare TAI directly """
if 1:
# if type(obj).__name__ in ParameterTypes.values():
return self.tai < obj
else:
return super(FineTime, self).__lt__(obj)
def __gt__(self, obj):
""" can compare TAI directly """
if 1:
# if type(obj).__name__ in ParameterTypes.values():
return self.tai > obj
else:
return super(FineTime, self).__gt__(obj)
def __le__(self, obj):
""" can compare TAI directly """
if 1:
# if type(obj).__name__ in ParameterTypes.values():
return self.tai <= obj
else:
return super(FineTime, self).__le__(obj)
def __ge__(self, obj):
""" can compare TAI directly """
if 1:
# if type(obj).__name__ in ParameterTypes.values():
return self.tai >= obj
else:
return super(FineTime, self).__ge__(obj)
def __getstate__(self):
""" Can be encoded with serializableEncoder """
return OrderedDict(tai=self.tai,
format=self.format)
[docs]class FineTime1(FineTime):
""" Same as FineTime but Epoch is 2017-1-1 0 UTC and unit is millisecond"""
EPOCH = datetime.datetime(2017, 1, 1, 0, 0, 0, tzinfo=utcobj)
RESOLUTION = 1000 # millisecond
RETURNFMT = '%s.%03d'
TIMESPEC = 'milliseconds'
TAI_AT_EPOCH = FineTime.datetimeToFineTime(EPOCH)
def __init__(self, *args, **kwds):
self.relative_res = FineTime.RESOLUTION / float(self.RESOLUTION)
super().__init__(*args, **kwds)
[docs] @ classmethod
def datetimeToFineTime(cls, dtm):
sec = (FineTime.datetimeToFineTime(dtm) -
cls.TAI_AT_EPOCH) / FineTime.RESOLUTION
# for subclasses with a different epoch
return int(sec * cls.RESOLUTION + 0.5)
[docs] def toDatetime(self, tai=None):
if tai is None:
tai = self.tai
return super().toDatetime(tai * self.relative_res + self.TAI_AT_EPOCH)