Coverage for src/km3pipe/tools.py: 57%

221 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-03-29 03:15 +0000

1# Filename: tools.py 

2# pylint: disable=C0103 

3""" 

4Some unsorted, frequently used logic. 

5 

6""" 

7import base64 

8import collections 

9from collections.abc import Mapping 

10from datetime import datetime, timedelta 

11import functools 

12import os 

13import re 

14import socket 

15import subprocess 

16import sys 

17import smtplib 

18import getpass 

19 

20import numpy as np 

21 

22__author__ = "Tamas Gal and Moritz Lotze" 

23__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration." 

24__credits__ = ["Konstantin Lepa <konstantin.lepa@gmail.com> for termcolor"] 

25__license__ = "MIT" 

26__maintainer__ = "Tamas Gal and Moritz Lotze" 

27__email__ = "tgal@km3net.de" 

28__status__ = "Development" 

29 

# Scheme, host and port prefix used for xrootd URLs.
XROOTD_BASE = "root://ccxroot:1999"

# Minimal record describing a remote file: its full path and its size.
File = collections.namedtuple("File", "path size")

33 

34 

def ifiles(irods_path):
    """Return a list of File instances for the given iRODS path (recursively).

    The listing is obtained by running ``ils -lr`` in a shell. Each returned
    ``File`` instance offers ``.path`` and ``.size`` attributes.

    Parameters
    ----------
    irods_path: str
        The iRODS collection or file to list.

    Returns
    -------
    list of File
        One entry per distinct file path; an empty list if `irods_path`
        does not exist. Lines of the listing which cannot be parsed as a
        file entry are skipped.
    """
    if not iexists(irods_path):
        return []
    # NOTE(review): the path is interpolated into a shell command unquoted;
    # do not pass untrusted input to this function.
    raw_output = subprocess.check_output("ils -lr {0}".format(irods_path), shell=True)
    filenames = {}
    base = irods_path
    for line in raw_output.splitlines():
        split_line = line.decode("ascii").strip().split()
        if len(split_line) == 1 and split_line[0].endswith(":"):
            base = split_line[0][:-1]  # remove trailing ':'
            continue
        if len(split_line) == 2 and split_line[0] == "C-":
            # sub-collection entry: following files live below this path
            base = split_line[1]
            continue
        try:
            fsize = int(split_line[3])
            fname = split_line[6]
        except (IndexError, ValueError):
            # Not a regular file entry (blank line or unexpected column
            # layout). Skip it instead of dropping into a debugger.
            continue
        fpath = os.path.join(base, fname)
        # keyed by path, so a later line for the same file replaces the earlier
        filenames[fpath] = File(path=fpath, size=fsize)
    return list(filenames.values())

63 

64 

def iexists(irods_path):
    """Return True if the iRODS path exists, otherwise False.

    Existence is determined by the exit status of ``ils <irods_path>``,
    which is run in a shell; a non-zero exit status means "does not exist".
    """
    command = "ils {}".format(irods_path)
    try:
        subprocess.check_output(command, shell=True, stderr=subprocess.PIPE)
    except subprocess.CalledProcessError:
        return False
    return True

76 

77 

def isize(irods_path):
    """Return the size in bytes of the most recent version of the file.

    The size is taken from the fourth column of the last line printed by
    ``ils -l``.

    Raises
    ------
    IOError
        If that output cannot be decoded or interpreted as an integer.
    """
    command = "ils -l {} | tail -n1 |awk '{{print $4}}'".format(irods_path)
    listing = subprocess.check_output(command, shell=True)
    try:
        # decoding stays inside the try: a decode error is a ValueError too
        size_text = listing.decode("ascii").strip()
        return int(size_text)
    except ValueError:
        raise IOError("File not found or an iRODS error occured.")

87 

88 

def xrdsize(xrootd_path):
    """Return the size in bytes of a file on an xrootd server.

    The URL is split into the server part (``root://host:port``) and the
    file path, and ``xrdfs <server> stat <path>`` is queried for the size.

    Parameters
    ----------
    xrootd_path: str
        URL of the form ``root://host:port/path/to/file``.

    Raises
    ------
    ValueError
        If `xrootd_path` does not have the expected form.
    IOError
        If no integer size can be extracted from the ``xrdfs`` output.
    """
    match = re.search(r"(root://.*:[0-9]*)(/.*)", xrootd_path)
    if match is None:
        # fail with a clear message instead of an AttributeError on None
        raise ValueError(
            "Not a valid xrootd URL (expected root://host:port/path): "
            "{!r}".format(xrootd_path)
        )
    base, path = match.groups()

    raw_output = subprocess.check_output(
        "xrdfs {} stat {} | grep Size | awk '{{print $2}}'".format(base, path),
        shell=True,
    )
    try:
        return int(raw_output.decode("ascii").strip())
    except ValueError:
        raise IOError("File not found or an xrootd error occured.")

101 

102 

def xrootd_path(det_id, run_id):
    """Return the xrootd path of a raw data file.

    Parameters
    ----------
    det_id: int
        Detector ID, zero-padded to eight digits in the path.
    run_id: int
        Run ID, zero-padded to eight digits in the file name. Runs are
        grouped into sub-folders named after ``run_id // 1000``.

    Returns
    -------
    str
        The full ``root://`` URL of the file.
    """
    base = "root://ccxroot:1999//hpss/in2p3.fr/group/km3net/data/raw/sea"
    # Integer floor division keeps the folder exact for arbitrarily large
    # run IDs (float division would round).
    suffix = "KM3NeT_{:08d}/{}/KM3NeT_{:08d}_{:08d}.root".format(
        det_id, run_id // 1000, det_id, run_id
    )
    # A URL is always joined with '/', independent of the local OS separator.
    return base + "/" + suffix

110 

111 

def token_urlsafe(nbytes=32):
    """Return a random URL-safe text string, in Base64 encoding.

    This is taken and slightly modified from the Python 3.6 stdlib.

    The string has *nbytes* random bytes. If *nbytes* is ``None``
    or not supplied, a default of 32 bytes is used. Trailing ``=``
    padding is stripped from the result.

    >>> token_urlsafe(16)  #doctest:+SKIP
    'Drmhze6EPcv0fN_81Bj-nA'

    """
    if nbytes is None:
        # honour the documented contract; os.urandom(None) would raise
        nbytes = 32
    tok = os.urandom(nbytes)
    return base64.urlsafe_b64encode(tok).rstrip(b"=").decode("ascii")

126 

127 

def prettyln(text, fill="-", align="^", prefix="[ ", suffix=" ]", length=69):
    """Print `text`, framed by `prefix`/`suffix`, padded to `length` characters.

    The framed text is aligned according to `align` (a format-spec alignment
    character) and the remaining width is filled with `fill`.
    """
    framed = "{}{}{}".format(prefix, text, suffix)
    spec = "{}{}{}".format(fill, align, length)
    print(format(framed, spec))

134 

135 

def irods_path(det_id, run_id):
    """Generate the iRODS filepath for given detector (O)ID and run ID.

    Both IDs are zero-padded to eight characters; files are grouped into
    sub-folders named after ``run_id // 1000``.
    """
    template = "/KM3NeT_{det:08}/{folder}/KM3NeT_{det:08}_{run:08}.root"
    relative = template.format(det=det_id, run=run_id, folder=run_id // 1000)
    return "/in2p3/km3net/data/raw/sea" + relative

143 

144 

def unpack_nfirst(seq, nfirst, callback=None):
    """Unpack the first `nfirst` items of `seq` and return the rest as a tuple.

    This is a generator: it yields `nfirst` single items (each passed through
    `callback` if one is given; missing items are substituted by ``None``)
    followed by one tuple holding the remaining items.

    >>> a, b, c, rest = unpack_nfirst((1, 2, 3, 4, 5), 3)
    >>> a, b, c
    (1, 2, 3)
    >>> rest
    (4, 5)

    """
    remaining = iter(seq)
    for _ in range(nfirst):
        item = next(remaining, None)
        yield item if callback is None else callback(item)
    yield tuple(remaining)

161 

162 

def split(string, callback=None, sep=None):
    """Split the string and apply the optional callback to each part.

    `sep` is passed on to ``str.split``. Without a callback the plain list
    of parts is returned.

    >>> string = "1 2 3 4"
    >>> parts = split(string, int)
    >>> parts
    [1, 2, 3, 4]

    """
    if callback is None:
        return string.split(sep)
    return [callback(part) for part in string.split(sep)]

176 

177 

def namedtuple_with_defaults(typename, field_names, default_values=()):
    """Create a namedtuple with default values.

    Parameters
    ----------
    typename: str
        Name of the generated namedtuple class.
    field_names: str or iterable of str
        Field names, as accepted by ``collections.namedtuple``.
    default_values: sequence or mapping, optional
        Positional defaults (assigned to the leading fields) or a mapping
        from field name to default. Fields without a default get ``None``.

    Examples
    --------
    >>> Node = namedtuple_with_defaults('Node', 'val left right')
    >>> Node()
    Node(val=None, left=None, right=None)
    >>> Node = namedtuple_with_defaults('Node', 'val left right', [1, 2, 3])
    >>> Node()
    Node(val=1, left=2, right=3)
    >>> Node = namedtuple_with_defaults('Node', 'val left right', {'right':7})
    >>> Node()
    Node(val=None, left=None, right=7)
    >>> Node(4)
    Node(val=4, left=None, right=7)

    """
    the_tuple = collections.namedtuple(typename, field_names)
    # First make every field optional (None), so that a prototype can be
    # built from a partial set of defaults ...
    the_tuple.__new__.__defaults__ = (None,) * len(the_tuple._fields)
    if isinstance(default_values, Mapping):
        prototype = the_tuple(**default_values)
    else:
        prototype = the_tuple(*default_values)
    # ... then use the prototype's values as the real defaults.
    the_tuple.__new__.__defaults__ = tuple(prototype)
    return the_tuple

204 

205 

def remain_file_pointer(function):
    """Restore the file pointer position after calling the decorated function.

    This decorator assumes that the last positional argument is the file
    handler. Its position is recorded before the call and restored
    afterwards, also when the decorated function raises.

    """

    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        """Call `function` and put the file pointer back where it was."""
        file_obj = args[-1]
        old_position = file_obj.tell()
        try:
            return function(*args, **kwargs)
        finally:
            # absolute seek (whence=0) back to the recorded position
            file_obj.seek(old_position, 0)

    return wrapper

222 

223 

def itervalues(d):
    """Return an iterator over the values of the mapping `d`."""
    values = d.values()
    return iter(values)

226 

227 

def iteritems(d):
    """Return an iterator over the ``(key, value)`` pairs of the mapping `d`."""
    pairs = d.items()
    return iter(pairs)

230 

231 

def decamelise(text):
    """Convert CamelCase to lower_and_underscore."""
    # Pass 1: underscore before a capitalised word ("aWord" -> "a_Word").
    word_start = re.compile("(.)([A-Z][a-z]+)")
    # Pass 2: underscore between a lowercase letter/digit and a capital.
    lower_upper = re.compile("([a-z0-9])([A-Z])")
    partially_split = word_start.sub(r"\1_\2", text)
    return lower_upper.sub(r"\1_\2", partially_split).lower()

236 

237 

def camelise(text, capital_first=True):
    """Convert lower_underscore to CamelCase.

    Parameters
    ----------
    text: str
        The underscore-separated text.
    capital_first: bool [default: True]
        If False, the first non-empty part is lower-cased instead of
        capitalised (``fooBar`` instead of ``FooBar``).

    Empty parts (caused by leading, trailing or doubled underscores) are
    rendered as a single ``_``.
    """

    def camelcase():
        # yields the transformation for each successive non-empty part
        if not capital_first:
            yield str.lower
        while True:
            yield str.capitalize

    # The former Python 2 ``unicode`` -> bytes conversion was dropped: on
    # Python 3 it would make the ``split("_")`` below fail.
    c = camelcase()
    return "".join(next(c)(x) if x else "_" for x in text.split("_"))

251 

252 

# ANSI SGR codes of the supported text attributes. Codes 3 and 6 have no
# named attribute here.
ATTRIBUTES = {
    "bold": 1,
    "dark": 2,
    "underline": 4,
    "blink": 5,
    "reverse": 7,
    "concealed": 8,
}

# Regular expression matching any of the attribute escape sequences.
ATTRIBUTES_RE = r"\033\[(?:%s)m" % "|".join(["%d" % v for v in ATTRIBUTES.values()])

# ANSI SGR codes of the background ("highlight") colours.
HIGHLIGHTS = {
    "on_grey": 40,
    "on_red": 41,
    "on_green": 42,
    "on_yellow": 43,
    "on_blue": 44,
    "on_magenta": 45,
    "on_cyan": 46,
    "on_white": 47,
}

# Regular expression matching any of the highlight escape sequences.
HIGHLIGHTS_RE = r"\033\[(?:%s)m" % "|".join(["%d" % v for v in HIGHLIGHTS.values()])

# ANSI SGR codes of the foreground colours.
COLORS = {
    "grey": 30,
    "red": 31,
    "green": 32,
    "yellow": 33,
    "blue": 34,
    "magenta": 35,
    "cyan": 36,
    "white": 37,
}

# Regular expression matching any of the foreground colour escape sequences.
COLORS_RE = r"\033\[(?:%s)m" % "|".join(["%d" % v for v in COLORS.values()])

# The reset sequence is emitted verbatim, so it must contain the real ESC
# character (a raw string would emit the literal characters "\033").
RESET = "\033[0m"
# Regular expression matching the reset sequence (raw: `re` decodes \033).
RESET_RE = r"\033\[0m"

307 

308 

def colored(text, color=None, on_color=None, attrs=None, ansi_code=None):
    """Colorize text, while stripping nested ANSI color sequences.

    Author: Konstantin Lepa <konstantin.lepa@gmail.com> / termcolor

    If the environment variable ``ANSI_COLORS_DISABLED`` is set, `text` is
    returned unchanged. If `ansi_code` is given, it takes precedence over all
    other options and the text is wrapped in ``ESC[38;5;<ansi_code>m`` and a
    reset sequence.

    Available text colors:
        grey, red, green, yellow, blue, magenta, cyan, white.
    Available text highlights:
        on_grey, on_red, on_green, on_yellow, on_blue, on_magenta, on_cyan,
        on_white.
    Available attributes:
        bold, dark, underline, blink, reverse, concealed.
    Example:
        colored('Hello, World!', 'red', 'on_grey', ['bold', 'blink'])
        colored('Hello, World!', 'green')
    """
    if os.getenv("ANSI_COLORS_DISABLED") is not None:
        return text

    if ansi_code is not None:
        return "\033[38;5;{}m{}\033[0m".format(ansi_code, text)

    template = "\033[%dm%s"

    if color is not None:
        # drop already present colour sequences before adding the new one
        stripped = re.sub(COLORS_RE + "(.*?)" + RESET_RE, r"\1", text)
        text = template % (COLORS[color], stripped)

    if on_color is not None:
        stripped = re.sub(HIGHLIGHTS_RE + "(.*?)" + RESET_RE, r"\1", text)
        text = template % (HIGHLIGHTS[on_color], stripped)

    if attrs is not None:
        text = re.sub(ATTRIBUTES_RE + "(.*?)" + RESET_RE, r"\1", text)
        for name in attrs:
            text = template % (ATTRIBUTES[name], text)

    return text + RESET

341 

342 

def cprint(text, color=None, on_color=None, attrs=None, **kwargs):
    """Print colorized text.

    Author: Konstantin Lepa <konstantin.lepa@gmail.com> / termcolor

    `text`, `color`, `on_color` and `attrs` are handed to ``colored``. Any
    further keyword arguments (e.g. ``end``, ``file``, ``flush``) are
    forwarded to the ``print`` function.
    """
    print(colored(text, color, on_color, attrs), **kwargs)

351 

352 

def issorted(arr):
    """Check if array is sorted in non-decreasing order (along the last axis).

    Neighbouring elements are compared directly instead of testing the sign
    of ``np.diff``: differences of unsigned integers wrap around and would
    report descending data as sorted.

    Returns
    -------
    numpy.bool_
        True if no element is smaller than its predecessor. Empty and
        single-element arrays count as sorted.
    """
    values = np.asarray(arr)
    return np.all(values[..., 1:] >= values[..., :-1])

356 

357 

def lstrip(text):
    """Remove leading whitespace from each line of a multiline string.

    Leading whitespace of the whole text (including blank lines at the
    beginning) is removed first.
    """
    lines = text.lstrip().split("\n")
    return "\n".join([line.lstrip() for line in lines])

361 

362 

def chunks(l, n):
    """Yield successive n-sized chunks from l.

    The last chunk is shorter if ``len(l)`` is not a multiple of `n`.
    """
    total = len(l)
    for start in range(0, total, n):
        stop = start + n
        yield l[start:stop]

367 

368 

def is_coherent(seq):
    """Find out if a list of subsequent integers is complete.

    The sequence is compared to the range running from its first to its
    last element (inclusive).

    Adapted from https://stackoverflow.com/questions/18131741/python-find-out-whether-a-list-of-integers-is-coherent

    >>> is_coherent([1, 2, 3, 4, 5])
    True
    >>> is_coherent([1, 3, 4, 5])
    False

    """
    first = seq[0]
    last = seq[-1]
    expected = range(first, int(last + 1))
    return np.array_equal(seq, expected)

380 

381 

def zero_pad(m, n=1):
    """Pad a matrix with `n` zeros on all sides."""
    border = (n, n)  # entries added before and after, for every axis
    return np.pad(m, border, mode="constant", constant_values=[0])

385 

386 

def istype(obj, typename):
    """Check whether the class name of `obj` equals `typename`.

    Meant as a replacement for `isinstance` which avoids importing the class.
    Only the name of the exact type is compared.
    """
    actual_name = type(obj).__name__
    return actual_name == typename

390 

391 

def isnotebook():
    """Check if running within a Jupyter notebook.

    True is returned only if ``get_ipython`` is available and the class of
    the shell is named ``ZMQInteractiveShell`` (Jupyter notebook or
    qtconsole). A terminal IPython session, any other shell and plain
    Python all give False.
    """
    try:
        shell = get_ipython().__class__.__name__
    except NameError:
        # `get_ipython` is not defined outside of IPython
        return False
    return shell == "ZMQInteractiveShell"

404 

405 

def supports_color():
    """Check if the terminal supports color.

    Inside a notebook this is always True. Otherwise the platform must not
    be ``win32`` (unless ``ANSICON`` is set in the environment) and
    ``sys.stdout`` must be attached to a TTY.
    """
    if isnotebook():
        return True

    platform_ok = "ANSICON" in os.environ or sys.platform != "win32"
    stdout_is_tty = hasattr(sys.stdout, "isatty") and sys.stdout.isatty()
    return bool(platform_ok and stdout_is_tty)

417 

418 

def get_jpp_version(via_command="JPrint -v"):
    """Return the Jpp version or None if not available.

    The output of `via_command` is searched for the first line starting with
    ``version:``; the text between the first and the second colon of that
    line (stripped) is returned.
    """
    try:
        output = subprocess.getoutput(via_command)
    except AttributeError:  # legacy fallback when `getoutput` is unavailable
        try:
            output = subprocess.check_output(
                via_command.split(), stderr=subprocess.STDOUT
            )
        except OSError:
            return None

    version_lines = (ln for ln in output.split("\n") if ln.startswith("version:"))
    first_hit = next(version_lines, None)
    if first_hit is None:
        return None
    return first_hit.split(":")[1].strip()

435 

436 

def timed_cache(**timed_cache_kwargs):
    """LRU cache decorator with timeout.

    The decorated function is wrapped in ``functools.lru_cache``. The whole
    cache is cleared on the first call and whenever the given time span has
    elapsed since the last clearing.

    Parameters
    ----------
    days: int
    seconds: int
    microseconds: int
    milliseconds: int
    minutes: int
    hours: int
    weeks: int
    maxsize: int [default: 128]
    typed: bool [default: False]

    All parameters except `maxsize` and `typed` are passed on to
    ``datetime.timedelta`` and define the lifetime of the cache.
    """
    from datetime import timezone  # only needed for the aware "now" below

    # Split the options once and on a copy, so that the returned decorator
    # can be applied to several functions without losing `maxsize`/`typed`.
    options = dict(timed_cache_kwargs)
    maxsize = options.pop("maxsize", 128)
    typed = options.pop("typed", False)
    update_delta = timedelta(**options)

    def _wrapper(f):
        cached = functools.lru_cache(maxsize=maxsize, typed=typed)(f)
        # Start in the "expired" state: the first call clears the cache and
        # schedules the next expiry.
        next_update = datetime.now(timezone.utc) - update_delta

        @functools.wraps(cached)
        def _wrapped(*args, **kwargs):
            nonlocal next_update
            now = datetime.now(timezone.utc)
            if now >= next_update:
                cached.cache_clear()
                next_update = now + update_delta
            return cached(*args, **kwargs)

        return _wrapped

    return _wrapper

483 

484 

def sendmail(to, msg):
    """Send an email via the SMTP server on ``localhost``.

    Parameters
    ----------
    to: str or list of str
        Recipient address(es), passed on to ``smtplib.SMTP.sendmail``.
    msg: str or bytes
        The complete message, passed on to ``smtplib.SMTP.sendmail``.

    The sender is ``<current user>@<hostname>``.
    """
    sender = "{}@{}".format(getpass.getuser(), socket.gethostname())
    # The context manager issues QUIT and closes the connection, also when
    # sending fails.
    with smtplib.SMTP("localhost") as s:
        s.sendmail(sender, to, msg)