Coverage for src/km3pipe/time.py: 81%
70 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 03:14 +0000
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 03:14 +0000
1# Filename: time.py
2# pylint: disable=C0103
3"""
4Manipulating time and so...
6"""
7from datetime import datetime
8import numpy as np
9import time
10from timeit import default_timer as timer
# Module-level metadata (authorship, licensing and maintenance status).
__author__ = "Tamas Gal and Moritz Lotze"
__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration."
__credits__ = []
__license__ = "MIT"
__maintainer__ = "Tamas Gal and Moritz Lotze"
__email__ = "tgal@km3net.de"
__status__ = "Development"
def total_seconds(td):
    """Return the full duration of the timedelta ``td`` expressed in seconds.

    This simply delegates to ``td.total_seconds()``.
    """
    duration = td.total_seconds()
    return duration
class Timer(object):
    """A very simple, accurate and easy to use timer context.

    Wall-clock time is measured with ``timeit.default_timer`` and CPU time
    with ``time.process_time``. The timer can be driven manually through
    ``start()``/``stop()`` or used as a context manager::

        with Timer("Loading") as t:
            ...
        t.seconds  # elapsed wall-clock time of the block

    Parameters
    ----------
    message : str
        Prefix of the log line passed to ``callback``.
    precision : int
        Number of decimal places used when formatting the durations.
    callback : callable or None
        Called with the formatted log line on ``stop()``; pass ``None`` to
        disable logging.
    """

    def __init__(self, message="It", precision=3, callback=print):
        self.message = message
        self.precision = precision
        self.callback = callback

    def __enter__(self):
        self.start()
        # Return the timer so that ``with Timer() as t:`` binds a usable
        # object instead of ``None``.
        return self

    def __exit__(self, exit_type, value, traceback):
        # Returns None, so exceptions raised inside the block propagate.
        self.stop()

    def start(self):
        """Record the wall-clock and CPU start marks."""
        self.__start = timer()
        self.__start_cpu = time.process_time()

    def stop(self):
        """Record the end marks, log if a callback is set, return seconds."""
        self.__finish = timer()
        self.__finish_cpu = time.process_time()
        if self.callback is not None:
            self.log()
        return self.seconds

    @property
    def seconds(self):
        """Wall-clock seconds between the last ``start()`` and ``stop()``."""
        return self.__finish - self.__start

    @property
    def cpu_seconds(self):
        """CPU seconds between the last ``start()`` and ``stop()``."""
        return self.__finish_cpu - self.__start_cpu

    def log(self):
        """Pass the formatted timing summary to the callback."""
        self.callback(
            "{0} took {1:.{3}f}s (CPU {2:.{3}f}s).".format(
                self.message, self.seconds, self.cpu_seconds, self.precision
            )
        )
class Cuckoo(object):
    """Callback wrapper which fires at most once per configured interval."""

    def __init__(self, interval=0, callback=print):
        """Store the interval (in seconds) and the callback to invoke."""
        self.timestamp = None
        self.callback = callback
        self.interval = interval

    def __call__(self, *args, **kwargs):
        """Calling the instance is a shortcut for ``msg``."""
        self.msg(*args, **kwargs)

    def msg(self, *args, **kwargs):
        """Invoke the callback unless the interval has not elapsed yet."""
        already_fired = self.timestamp is not None
        if already_fired and not self._interval_reached():
            return
        self.callback(*args, **kwargs)
        self.reset()

    def reset(self):
        """Set the timestamp to the current local time."""
        self.timestamp = datetime.now()

    def _interval_reached(self):
        """Tell whether more than ``interval`` seconds passed since reset."""
        elapsed = datetime.now() - self.timestamp
        return elapsed.total_seconds() > self.interval
def tai_timestamp(timestamp=None):
    """Return the TAI timestamp for a UNIX timestamp (default: now).

    Parameters
    ----------
    timestamp : int or float, optional
        Seconds since the UNIX epoch (UTC). When omitted, ``time.time()``
        is used.

    Returns
    -------
    int or float
        ``timestamp`` plus the TAI-UTC offset valid on that UTC date:
        10 seconds as of 1972-01-01 plus one second for every later entry
        of the leap second table that is already in effect. Timestamps
        before 1972 are returned unchanged.
    """
    if timestamp is None:
        timestamp = time.time()
    # (year, month, day) of the timestamp in UTC.
    today = tuple(time.gmtime(timestamp)[:3])
    if today[0] < 1972:
        return timestamp
    # Dates from which a new TAI-UTC offset applies. The first entry marks
    # the initial 10 s offset, every following entry adds one more second.
    # NOTE(review): the table ends in 2017 and must be extended by hand
    # when a new leap second is announced.
    leap_seconds = (
        (1972, 1, 1),
        (1972, 7, 1),
        (1973, 1, 1),
        (1974, 1, 1),
        (1975, 1, 1),
        (1976, 1, 1),
        (1977, 1, 1),
        (1978, 1, 1),
        (1979, 1, 1),
        (1980, 1, 1),
        (1981, 7, 1),
        (1982, 7, 1),
        (1983, 7, 1),
        (1985, 7, 1),
        (1988, 1, 1),
        (1990, 1, 1),
        (1991, 1, 1),
        (1992, 7, 1),
        (1993, 7, 1),
        (1994, 7, 1),
        (1996, 1, 1),
        (1997, 7, 1),
        (1999, 1, 1),
        (2006, 1, 1),
        (2009, 1, 1),
        (2012, 7, 1),
        (2015, 7, 1),
        (2017, 1, 1),
    )
    # An entry is in effect on its own date already, hence ``<=``; the
    # previous ``>=`` early-exit was one second short on exactly those days.
    in_effect = sum(1 for leap_date in leap_seconds if leap_date <= today)
    # ``in_effect`` is at least 1 here because the year is >= 1972.
    return timestamp + 10 + (in_effect - 1)
def np_to_datetime(intime):
    """Convert numpy/pandas datetime64 to list[datetime].

    Parameters
    ----------
    intime : scalar or array-like of datetime64
        A single value is promoted to a one-element array via
        ``np.atleast_1d``.

    Returns
    -------
    list[datetime]
        Naive ``datetime`` objects expressed in UTC, one per input value.
    """
    # Function-scope import: the module only imports ``datetime`` itself.
    from datetime import timezone

    nptime = np.atleast_1d(intime)
    epoch = np.datetime64("1970-01-01T00:00:00")
    # Seconds since the UNIX epoch as floats.
    seconds = (nptime - epoch) / np.timedelta64(1, "s")
    # ``datetime.utcfromtimestamp`` is deprecated since Python 3.12; build
    # an aware UTC datetime and strip the tzinfo to get the same naive value.
    return [
        datetime.fromtimestamp(t, tz=timezone.utc).replace(tzinfo=None)
        for t in seconds
    ]