Coverage for src/km3pipe/tools.py: 57%
221 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-03-29 03:15 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-03-29 03:15 +0000
1# Filename: tools.py
2# pylint: disable=C0103
3"""
4Some unsorted, frequently used logic.
6"""
7import base64
8import collections
9from collections.abc import Mapping
10from datetime import datetime, timedelta
11import functools
12import os
13import re
14import socket
15import subprocess
16import sys
17import smtplib
18import getpass
20import numpy as np
22__author__ = "Tamas Gal and Moritz Lotze"
23__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration."
24__credits__ = ["Konstantin Lepa <konstantin.lepa@gmail.com> for termcolor"]
25__license__ = "MIT"
26__maintainer__ = "Tamas Gal and Moritz Lotze"
27__email__ = "tgal@km3net.de"
28__status__ = "Development"
# xrootd endpoint (scheme, host and port) used as the default server prefix.
XROOTD_BASE = "root://ccxroot:1999"

# Lightweight record with a `path` and a `size` field, as produced by `ifiles`.
File = collections.namedtuple("File", ["path", "size"])
def ifiles(irods_path):
    """Return a list of File instances for the given iRODS path (recursively).

    The File instances offer `.path` and `.size` attributes.

    The listing is taken from the output of ``ils -lr``. Lines which are
    neither a collection header nor a complete file entry (blank lines,
    truncated entries, entries with a non-numeric size column) are skipped.

    Parameters
    ----------
    irods_path: str
        The iRODS path to list.

    Returns
    -------
    list of File
        One entry per distinct file path; an empty list if `iexists`
        reports that the path does not exist.
    """
    if not iexists(irods_path):
        return []
    # Argument list instead of a shell string, so that the path is handed to
    # `ils` verbatim and never interpreted by a shell.
    raw_output = subprocess.check_output(["ils", "-lr", irods_path])
    filenames = {}
    base = irods_path
    for line in raw_output.splitlines():
        fields = line.decode("ascii").split()
        # "<collection>:" introduces the entries of a collection
        if len(fields) == 1 and fields[0].endswith(":"):
            base = fields[0][:-1]  # remove trailing ':'
            continue
        # "C- <collection>" announces a sub-collection
        if len(fields) == 2 and fields[0] == "C-":
            base = fields[1]
            continue
        try:
            fsize = int(fields[3])
            fname = fields[6]
        except (IndexError, ValueError):
            # Not a file entry in the expected column layout
            continue
        fpath = os.path.join(base, fname)
        # Keyed by path, so a path listed several times is reported once
        # (the last listed entry wins).
        filenames[fpath] = File(path=fpath, size=fsize)
    return list(filenames.values())
def iexists(irods_path):
    """Return True if the iRODS path exists, otherwise False.

    The check runs ``ils <irods_path>`` and inspects its exit status. The path
    is passed as a single argument (no shell involved), so whitespace or shell
    metacharacters in it cannot alter the command.

    Parameters
    ----------
    irods_path: str
        The iRODS path to check.

    Returns
    -------
    bool
        False if ``ils`` exits with a non-zero status or cannot be started.
    """
    try:
        subprocess.check_output(["ils", irods_path], stderr=subprocess.DEVNULL)
    except (subprocess.CalledProcessError, OSError):
        # OSError covers a missing `ils` executable, which the former
        # shell-based call also reported as "does not exist".
        return False
    return True
def isize(irods_path):
    """Return the size of the file as listed last by ``ils -l``.

    The size is read from the fourth column of the last line printed by
    ``ils -l <irods_path>``.

    Parameters
    ----------
    irods_path: str
        The iRODS path of the file.

    Returns
    -------
    int

    Raises
    ------
    IOError
        If ``ils`` fails, cannot be started, or its output does not contain
        an integer in the expected column.
    """
    error_message = "File not found or an iRODS error occured."
    try:
        # No shell: the path is passed verbatim as one argument.
        raw_output = subprocess.check_output(["ils", "-l", irods_path])
    except (subprocess.CalledProcessError, OSError) as err:
        raise IOError(error_message) from err
    try:
        last_line = raw_output.decode("ascii").splitlines()[-1]
        return int(last_line.split()[3])
    except (IndexError, ValueError) as err:
        # Empty output, too few columns, non-ASCII or non-numeric size
        raise IOError(error_message) from err
def xrdsize(xrootd_path):
    """Return the size of the file as reported by ``xrdfs <server> stat``.

    Parameters
    ----------
    xrootd_path: str
        Full xrootd URL of the form ``root://<host>:<port>/<path>``.

    Returns
    -------
    int
        The integer found in the second column of the ``Size`` line.

    Raises
    ------
    ValueError
        If `xrootd_path` does not have the expected URL form.
    IOError
        If ``xrdfs`` fails, cannot be started, or prints no usable size.
    """
    match = re.search(r"(root://.*:[0-9]*)(/.*)", xrootd_path)
    if match is None:
        raise ValueError("Not a valid xrootd path: {!r}".format(xrootd_path))
    base, path = match.groups()

    error_message = "File not found or an xrootd error occured."
    try:
        # No shell: server and path are passed verbatim as arguments.
        raw_output = subprocess.check_output(["xrdfs", base, "stat", path])
    except (subprocess.CalledProcessError, OSError) as err:
        raise IOError(error_message) from err

    for line in raw_output.decode("ascii", errors="replace").splitlines():
        if "Size" not in line:
            continue
        try:
            return int(line.split()[1])
        except (IndexError, ValueError):
            # e.g. a path line which merely contains the word "Size"
            continue
    raise IOError(error_message)
def xrootd_path(det_id, run_id):
    """Return the xrootd path of a data file.

    Parameters
    ----------
    det_id: int
        Detector ID, zero-padded to eight digits in the path.
    run_id: int
        Run ID, zero-padded to eight digits in the file name. Files are
        grouped in sub-directories named ``run_id // 1000``.

    Returns
    -------
    str
    """
    base = "root://ccxroot:1999//hpss/in2p3.fr/group/km3net/data/raw/sea"
    suffix = "KM3NeT_{:08d}/{}/KM3NeT_{:08d}_{:08d}.root".format(
        det_id, run_id // 1000, det_id, run_id
    )
    # URLs always use forward slashes, independent of the local platform.
    return base + "/" + suffix
def token_urlsafe(nbytes=32):
    """Return a random URL-safe text string, in Base64 encoding.

    This is taken and slightly modified from the Python 3.6 stdlib.

    The string has *nbytes* random bytes. If *nbytes* is ``None``
    or not supplied, a reasonable default (32) is used.

    >>> token_urlsafe(16)  #doctest:+SKIP
    'Drmhze6EPcv0fN_81Bj-nA'

    """
    if nbytes is None:
        nbytes = 32
    tok = os.urandom(nbytes)
    # The Base64 padding carries no information and is not URL friendly.
    return base64.urlsafe_b64encode(tok).rstrip(b"=").decode("ascii")
def prettyln(text, fill="-", align="^", prefix="[ ", suffix=" ]", length=69):
    """Print `text` framed by `prefix`/`suffix` and padded to `length`.

    The framed text is aligned according to `align` and the remaining width
    is filled with the `fill` character. Longer texts are printed unchanged.
    """
    framed = f"{prefix}{text}{suffix}"
    spec = f"{fill}{align}{length}"
    print(format(framed, spec))
def irods_path(det_id, run_id):
    """Build the iRODS file path for a detector (O)ID and a run ID.

    Both IDs are zero-padded to eight digits; the files are grouped in
    sub-directories named ``run_id // 1000``.
    """
    data_path = "/in2p3/km3net/data/raw/sea"
    run_group = run_id // 1000
    template = "/KM3NeT_{0:08}/{2}/KM3NeT_{0:08}_{1:08}.root"
    return data_path + template.format(det_id, run_id, run_group)
def unpack_nfirst(seq, nfirst, callback=None):
    """Yield the first `nfirst` items of `seq`, then the rest as a tuple.

    Each of the leading items is passed through `callback` (identity if not
    given). If `seq` is exhausted early, ``None`` is used for missing items.

    >>> a, b, c, rest = unpack_nfirst((1, 2, 3, 4, 5), 3)
    >>> a, b, c
    (1, 2, 3)
    >>> rest
    (4, 5)

    """
    transform = (lambda item: item) if callback is None else callback
    remaining = iter(seq)
    for _ in range(nfirst):
        head = next(remaining, None)
        yield transform(head)
    # Whatever has not been consumed yet
    yield tuple(remaining)
def split(string, callback=None, sep=None):
    """Split `string` at `sep` and optionally apply `callback` to each part.

    >>> string = "1 2 3 4"
    >>> parts = split(string, int)
    >>> parts
    [1, 2, 3, 4]

    """
    parts = string.split(sep)
    if callback is None:
        return parts
    return list(map(callback, parts))
def namedtuple_with_defaults(typename, field_names, default_values=()):
    """Create a namedtuple with default values

    Parameters
    ----------
    typename: str
        Name of the new tuple class.
    field_names: str or sequence of str
        Field names, as accepted by `collections.namedtuple`.
    default_values: sequence or mapping, optional
        Defaults by position (sequence) or by field name (mapping).
        Fields without an explicit default fall back to ``None``.

    Examples
    --------
    >>> Node = namedtuple_with_defaults('Node', 'val left right')
    >>> Node()
    Node(val=None, left=None, right=None)
    >>> Node = namedtuple_with_defaults('Node', 'val left right', [1, 2, 3])
    >>> Node()
    Node(val=1, left=2, right=3)
    >>> Node = namedtuple_with_defaults('Node', 'val left right', {'right':7})
    >>> Node()
    Node(val=None, left=None, right=7)
    >>> Node(4)
    Node(val=4, left=None, right=7)

    """
    the_tuple = collections.namedtuple(typename, field_names)
    # First make every field optional (None) ...
    the_tuple.__new__.__defaults__ = (None,) * len(the_tuple._fields)
    # ... so that a prototype can be built from a partial set of defaults.
    if isinstance(default_values, Mapping):
        prototype = the_tuple(**default_values)
    else:
        prototype = the_tuple(*default_values)
    # The prototype's values become the final defaults.
    the_tuple.__new__.__defaults__ = tuple(prototype)
    return the_tuple
def remain_file_pointer(function):
    """Remain the file pointer position after calling the decorated function

    This decorator assumes that the last positional argument is the file
    handler. Its position is restored even if the decorated function raises.

    """

    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        """Wrap the function and remain its parameters and return values"""
        file_obj = args[-1]
        old_position = file_obj.tell()
        try:
            return function(*args, **kwargs)
        finally:
            # Seek back to the absolute position (whence=0)
            file_obj.seek(old_position, 0)

    return wrapper
def itervalues(d):
    """Return an iterator over the values of the mapping `d`."""
    values = d.values()
    return iter(values)
def iteritems(d):
    """Return an iterator over the ``(key, value)`` pairs of the mapping `d`."""
    pairs = d.items()
    return iter(pairs)
def decamelise(text):
    """Turn a CamelCase string into its lower_and_underscore form."""
    # Separate a capitalised word from whatever precedes it ...
    partially_split = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", text)
    # ... then split the remaining lower/digit-to-upper transitions.
    fully_split = re.sub("([a-z0-9])([A-Z])", r"\1_\2", partially_split)
    return fully_split.lower()
def camelise(text, capital_first=True):
    """Convert lower_underscore to CamelCase.

    Parameters
    ----------
    text: str
        The underscore separated text.
    capital_first: bool [default: True]
        If False, the first non-empty part is lower-cased instead of
        capitalised (``fooBar`` instead of ``FooBar``).

    Returns
    -------
    str
        Empty parts (from leading, trailing or doubled underscores) are kept
        as a single underscore each.
    """

    def transforms():
        """Yield the transformation for each successive non-empty part."""
        if not capital_first:
            yield str.lower
        while True:
            yield str.capitalize

    transform = transforms()
    return "".join(
        next(transform)(part) if part else "_" for part in text.split("_")
    )
# ANSI SGR codes for text attributes (codes 3 and 6 are intentionally unused).
ATTRIBUTES = {
    "bold": 1,
    "dark": 2,
    "underline": 4,
    "blink": 5,
    "reverse": 7,
    "concealed": 8,
}

# Regular expression matching the escape sequence of any attribute above.
ATTRIBUTES_RE = r"\033\[(?:%s)m" % "|".join(["%d" % v for v in ATTRIBUTES.values()])

# ANSI SGR codes for background colours (40-47).
HIGHLIGHTS = {
    "on_grey": 40,
    "on_red": 41,
    "on_green": 42,
    "on_yellow": 43,
    "on_blue": 44,
    "on_magenta": 45,
    "on_cyan": 46,
    "on_white": 47,
}

HIGHLIGHTS_RE = r"\033\[(?:%s)m" % "|".join(["%d" % v for v in HIGHLIGHTS.values()])

# ANSI SGR codes for foreground colours (30-37).
COLORS = {
    "grey": 30,
    "red": 31,
    "green": 32,
    "yellow": 33,
    "blue": 34,
    "magenta": 35,
    "cyan": 36,
    "white": 37,
}

COLORS_RE = r"\033\[(?:%s)m" % "|".join(["%d" % v for v in COLORS.values()])

# The reset sequence must contain the actual ESC character, hence no raw
# string here; the regular expression below is raw because `re` interprets
# the octal escape itself.
RESET = "\033[0m"
RESET_RE = r"\033\[0m"
def colored(text, color=None, on_color=None, attrs=None, ansi_code=None):
    """Colorize text, while stripping nested ANSI color sequences.

    Author:  Konstantin Lepa <konstantin.lepa@gmail.com> / termcolor

    Available text colors:
        grey, red, green, yellow, blue, magenta, cyan, white.
    Available text highlights:
        on_grey, on_red, on_green, on_yellow, on_blue, on_magenta, on_cyan,
        on_white.
    Available attributes:
        bold, dark, underline, blink, reverse, concealed.

    If `ansi_code` is given, it is used as a 256-colour foreground code and
    all other styling arguments are ignored. If the environment variable
    ``ANSI_COLORS_DISABLED`` is set, `text` is returned unchanged.

    Example:
        colored('Hello, World!', 'red', 'on_grey', ['bold', 'blink'])
        colored('Hello, World!', 'green')
    """
    if os.getenv("ANSI_COLORS_DISABLED") is not None:
        return text

    if ansi_code is not None:
        return "\033[38;5;{}m{}\033[0m".format(ansi_code, text)

    fmt_str = "\033[%dm%s"

    def strip_nested(pattern, value):
        """Remove already applied sequences of the same kind from `value`."""
        return re.sub(pattern + "(.*?)" + RESET_RE, r"\1", value)

    if color is not None:
        text = strip_nested(COLORS_RE, text)
        text = fmt_str % (COLORS[color], text)

    if on_color is not None:
        text = strip_nested(HIGHLIGHTS_RE, text)
        text = fmt_str % (HIGHLIGHTS[on_color], text)

    if attrs is not None:
        text = strip_nested(ATTRIBUTES_RE, text)
        for attr in attrs:
            text = fmt_str % (ATTRIBUTES[attr], text)

    return text + RESET
def cprint(text, color=None, on_color=None, attrs=None, **kwargs):
    """Print colorize text.

    Author:  Konstantin Lepa <konstantin.lepa@gmail.com> / termcolor

    It accepts arguments of print function: any additional keyword
    arguments (e.g. ``end``, ``file``, ``flush``) are forwarded to `print`.
    """
    print(colored(text, color, on_color, attrs), **kwargs)
def issorted(arr):
    """Check if array is sorted (non-decreasing along the last axis).

    Neighbouring elements are compared directly instead of inspecting the
    sign of their difference, since differences wrap around for unsigned
    integer arrays and would make every such array appear sorted.

    Parameters
    ----------
    arr: array-like

    Returns
    -------
    numpy.bool_
    """
    values = np.asarray(arr)
    return np.all(values[..., 1:] >= values[..., :-1])
def lstrip(text):
    """Strip the leading whitespace of every line of a multiline string.

    Leading blank lines are removed as well, since the whole text is
    left-stripped before it is split into lines.
    """
    stripped_lines = [line.lstrip() for line in text.lstrip().split("\n")]
    return "\n".join(stripped_lines)
def chunks(l, n):
    """Yield consecutive slices of `l`, each holding at most `n` items."""
    yield from (l[start : start + n] for start in range(0, len(l), n))
def is_coherent(seq):
    """Tell whether `seq` is a gapless run of consecutive integers.

    The sequence is compared to ``range(seq[0], seq[-1] + 1)``.

    Adapted from https://stackoverflow.com/questions/18131741/python-find-out-whether-a-list-of-integers-is-coherent

    ```
    is_coherent([1, 2, 3, 4, 5]) -> True
    is_coherent([1, 3, 4, 5]) -> False
    ```
    """
    first, last = seq[0], seq[-1]
    expected = range(first, int(last + 1))
    return np.array_equal(seq, expected)
def zero_pad(m, n=1):
    """Surround the array `m` with `n` layers of zeros along every axis."""
    width = (n, n)  # (before, after), applied to each axis
    return np.pad(m, pad_width=width, mode="constant", constant_values=0)
def istype(obj, typename):
    """Compare the class name of `obj` with `typename`.

    Meant as a drop-in replacement for `isinstance` to avoid imports. Only
    the exact class name is compared; base classes are not considered.
    """
    class_name = type(obj).__name__
    return class_name == typename
def isnotebook():
    """Tell whether the code runs in a Jupyter notebook (or qtconsole)."""
    try:
        # `get_ipython` only exists as a global inside IPython sessions
        shell = get_ipython().__class__.__name__
    except NameError:
        return False
    # "TerminalInteractiveShell" (IPython in a terminal) and any other shell
    # type do not count as a notebook.
    return shell == "ZMQInteractiveShell"
def supports_color():
    """Tell whether coloured output can be used.

    Notebooks always qualify; otherwise stdout has to be a TTY and the
    platform must not be Windows, unless ``ANSICON`` is set.
    """
    if isnotebook():
        return True

    platform_ok = sys.platform != "win32" or "ANSICON" in os.environ
    stdout_is_tty = hasattr(sys.stdout, "isatty") and sys.stdout.isatty()

    return bool(platform_ok and stdout_is_tty)
def get_jpp_version(via_command="JPrint -v"):
    """Return the Jpp version or None if not available.

    Parameters
    ----------
    via_command: str [default: "JPrint -v"]
        Shell command whose output contains a line starting with
        ``version:``.

    Returns
    -------
    str or None
        The text between the first and the second colon of the first
        ``version:`` line, stripped of surrounding whitespace; None if the
        output contains no such line (e.g. the command is not available).
    """
    # `getoutput` never raises on a failing command, it just returns
    # whatever the shell printed.
    out = subprocess.getoutput(via_command)
    for line in out.split("\n"):
        if line.startswith("version:"):
            return line.split(":")[1].strip()
    return None
def timed_cache(**timed_cache_kwargs):
    """LRU cache decorator with timeout.

    The whole cache of the decorated function is cleared on the first call
    after the given time span has elapsed. The returned decorator can be
    applied to several functions; each gets its own cache and timer.

    Parameters
    ----------
    days: int
    seconds: int
    microseconds: int
    milliseconds: int
    minutes: int
    hours: int
    weeks: int
    maxsize: int [default: 128]
    typed: bool [default: False]
    """
    # Function-scope import: only the monotonic clock is needed, which is
    # unaffected by system clock adjustments.
    import time

    # Split the options once, on a private copy, so that neither the caller's
    # view nor later applications of the decorator lose `maxsize`/`typed`.
    options = dict(timed_cache_kwargs)
    maxsize = options.pop("maxsize", 128)
    typed = options.pop("typed", False)
    lifetime = timedelta(**options).total_seconds()

    def _wrapper(f):
        cached = functools.lru_cache(maxsize=maxsize, typed=typed)(f)
        # Already "expired", so the first call starts the timer.
        next_update = time.monotonic()

        @functools.wraps(cached)
        def _wrapped(*args, **kwargs):
            nonlocal next_update
            now = time.monotonic()
            if now >= next_update:
                cached.cache_clear()
                next_update = now + lifetime
            return cached(*args, **kwargs)

        return _wrapped

    return _wrapper
def sendmail(to, msg):
    """Send an email via the SMTP server on localhost.

    The sender address is built from the current user name and the host name.

    Parameters
    ----------
    to: str or list of str
        Recipient address(es), passed on to `smtplib.SMTP.sendmail`.
    msg: str
        The message, passed on to `smtplib.SMTP.sendmail`.
    """
    sender = "{}@{}".format(getpass.getuser(), socket.gethostname())
    # The context manager issues QUIT and closes the connection, also when
    # sending fails.
    with smtplib.SMTP("localhost") as server:
        server.sendmail(sender, to, msg)