Coverage for src/km3pipe/io/evt.py: 86%
282 statements
coverage.py v7.5.1, created at 2024-05-08 03:14 +0000
1# Filename: evt.py
2# pylint: disable=C0103,R0903
3"""
4Pumps for the EVT simulation dataformat.
6"""
8from collections import defaultdict
9import sys
11import numpy as np
13from thepipe import Module, Blob
14from km3pipe.dataclasses import Table
15from km3pipe.logger import get_logger
16from km3pipe.tools import split
18log = get_logger(__name__) # pylint: disable=C0103
20__author__ = "Tamas Gal"
21__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration."
22__credits__ = []
23__license__ = "MIT"
24__maintainer__ = "Tamas Gal, Moritz Lotze"
25__email__ = "tgal@km3net.de"
26__status__ = "Development"
29def try_decode_string(text):
30 """Decode string to ASCII if possible"""
31 try:
32 return text.decode("ascii")
33 except AttributeError:
34 return text
37class EvtPump(Module): # pylint: disable=R0902
38 """Provides a pump for EVT-files.
40 Parameters
41 ----------
42 filename: str
43 The file to read the events from.
44 parsers: list of str or callables
45 The parsers to apply for each blob (e.g. parsers=['km3sim', a_parser])
46 You can also pass your own function, which takes a single argument
47 `blob` and mutates it. `str` values will be looked up in the
48 `kp.io.evt.EVT_PARSERS` dictionary and ignored if not found.
49 If `parsers='auto'`, the `EvtPump` will try to find the appropriate
50 parsers, which is the default behaviour. [default: 'auto']
51 cache_enabled: bool
52 If enabled, a cache of the event indices is created when loading
53 the file. Enable it if you want to jump around and inspect the
54 events non-consecutively. [default: False]
55 basename: str
56 The common part of the filenames if you want to process multiple
57 files e.g. file1.evt, file2.evt and file3.evt. During processing,
58 the files will be concatenated behind the scenes.
59 You need to specify `index_start` and `index_stop`
60 (1 and 3 in this example).
61 suffix: str
62 A string to append to each filename (before ".evt"), when basename
63 is given. [default: '']
64 index_start: int
65 The starting index if you process multiple files at once. [default: 1]
66 index_stop: int
67 The last index if you process multiple files at once. [default: 1]
68 n_digits: int or None
69 The number of digits for indexing multiple files. [default: None]
70 `None` means no leading zeros.
71 exclude_tags: list of strings
72 The tags in the EVT file which should be ignored (e.g. if they
73 cause parse errors).
75 """
77 def configure(self):
78 self.filename = self.get("filename", default=None)
79 self.filenames = self.get("filenames", default=[])
80 parsers = self.get("parsers", default="auto")
81 self.cache_enabled = self.get("cache_enabled", default=False)
82 self.basename = self.get("basename", default=None)
83 self.suffix = self.get("suffix", default="")
84 self.index_start = self.get("index_start", default=1)
85 if self.filenames:
86 self.filename = self.filenames[0]
87 self.index_stop = len(self.filenames)
88 else:
89 self.index_stop = self.get("index_stop", default=1)
90 self.n_digits = self.get("n_digits", default=None)
91 self.exclude_tags = self.get("exclude_tags")
92 if self.exclude_tags is None:
93 self.exclude_tags = []
95 self.raw_header = None
96 self.event_offsets = []
97 self.index = 0
98 self.whole_file_cached = False
99 self.parsers = []
100 self._auto_parse = False
102 if not self.filename and not self.filenames and not self.basename:
103 print("No file- or basename(s) defined!")
105 if parsers:
106 if parsers == "auto":
107 self.cprint("Automatic tag parsing enabled.")
108 self._auto_parse = True
109 else:
110 if isinstance(parsers, str):
111 # expects a list(str)
112 parsers = [parsers]
113 self._register_parsers(parsers)
115 self.file_index = int(self.index_start)
116 if self.filenames:
117 self.filename = self.filenames[self.file_index - 1]
118 elif self.basename:
119 self.log.info(
120 "Got a basename ({}), constructing the first "
121 "filename.".format(self.basename)
122 )
123 file_index = self._get_file_index_str()
125 self.filename = "{}{}{}.evt".format(self.basename, file_index, self.suffix)
126 self.log.info("Constructed filename: {}".format(self.filename))
128 if self.filename:
129 self.cprint("Opening {0}".format(self.filename))
130 self.blob_file = self.open_file(self.filename)
131 self.prepare_blobs()
133 def _register_parsers(self, parsers):
134 self.log.info("Found parsers {}".format(parsers))
135 for parser in parsers:
136 if callable(parser):
137 self.parsers.append(parser)
138 continue
140 if parser in EVT_PARSERS.keys():
141 self.parsers.append(EVT_PARSERS[parser])
142 else:
143 self.log.warning("Parser '{}' not found, ignoring...".format(parser))
145 def _reset(self):
146 """Clear the cache."""
147 self.log.info("Clearing the cache, resetting event offsets")
148 self.raw_header = None
149 self.event_offsets = []
150 self.index = 0
152 def _get_file_index_str(self):
153 """Create a string out of the current file_index"""
154 file_index = str(self.file_index)
155 if self.n_digits is not None:
156 file_index = file_index.zfill(self.n_digits)
157 return file_index
159 def prepare_blobs(self):
160 """Populate the blobs"""
161 self.raw_header = self.extract_header()
162 if self.cache_enabled:
163 self._cache_offsets()
165 def extract_header(self):
166 """Create a dictionary with the EVT header information"""
167 self.log.info("Extracting the header")
168 raw_header = self.raw_header = defaultdict(list)
169 first_line = self.blob_file.readline()
170 first_line = try_decode_string(first_line)
171 self.blob_file.seek(0, 0)
172 if not first_line.startswith(str("start_run")):
173 self.log.warning("No header found.")
174 return raw_header
175 for line in iter(self.blob_file.readline, ""):
176 line = try_decode_string(line)
177 line = line.strip()
178 try:
179 tag, value = str(line).split(":")
180 except ValueError:
181 continue
182 raw_header[tag].append(str(value).split())
183 if line.startswith(str("end_event:")):
184 self._record_offset()
185 if self._auto_parse and "physics" in raw_header:
186 parsers = [p[0].lower() for p in raw_header["physics"]]
187 self._register_parsers(parsers)
188 return raw_header
189 raise ValueError("Incomplete header, no 'end_event' tag found!")
191 def get_blob(self, index):
192 """Return a blob with the event at the given index"""
193 self.log.info("Retrieving blob #{}".format(index))
194 if index > len(self.event_offsets) - 1:
195 self.log.info("Index not in cache, caching offsets")
196 self._cache_offsets(index, verbose=False)
197 self.blob_file.seek(self.event_offsets[index], 0)
198 blob = self._create_blob()
199 if blob is None:
200 self.log.info("Empty blob created...")
201 raise IndexError
202 else:
203 self.log.debug("Applying parsers...")
204 for parser in self.parsers:
205 parser(blob)
206 self.log.debug("Returning the blob")
207 return blob
209 def process(self, blob=None):
210 """Pump the next blob to the modules"""
211 try:
212 blob = self.get_blob(self.index)
214 except IndexError:
215 self.log.info("Got an IndexError, trying the next file")
216 if (self.basename or self.filenames) and self.file_index < self.index_stop:
217 self.file_index += 1
218 self.log.info("Now at file_index={}".format(self.file_index))
219 self._reset()
220 self.blob_file.close()
221 self.log.info("Resetting blob index to 0")
222 self.index = 0
223 file_index = self._get_file_index_str()
224 if self.filenames:
225 self.filename = self.filenames[self.file_index - 1]
226 elif self.basename:
227 self.filename = "{}{}{}.evt".format(
228 self.basename, file_index, self.suffix
229 )
230 self.log.info("Next filename: {}".format(self.filename))
231 self.cprint("Opening {0}".format(self.filename))
232 self.blob_file = self.open_file(self.filename)
233 self.prepare_blobs()
234 try:
235 blob = self.get_blob(self.index)
236 except IndexError:
237 self.log.warning("No blob found in file {}".format(self.filename))
238 else:
239 return blob
240 self.log.info("No files left, terminating the pipeline")
241 raise StopIteration
243 self.index += 1
244 return blob
246 def _cache_offsets(self, up_to_index=None, verbose=True):
247 """Cache all event offsets."""
248 if not up_to_index:
249 if verbose:
250 self.cprint("Caching event file offsets, this may take a bit.")
251 self.blob_file.seek(0, 0)
252 self.event_offsets = []
253 if not self.raw_header:
254 self.event_offsets.append(0)
255 else:
256 self.blob_file.seek(self.event_offsets[-1], 0)
257 for line in iter(self.blob_file.readline, ""):
258 line = try_decode_string(line)
259 if line.startswith("end_event:"):
260 self._record_offset()
261 if len(self.event_offsets) % 100 == 0:
262 if verbose:
263 print(".", end="")
264 sys.stdout.flush()
265 if up_to_index and len(self.event_offsets) >= up_to_index + 1:
266 return
267 self.event_offsets.pop() # get rid of the last entry
268 if not up_to_index:
269 self.whole_file_cached = True
270 self.cprint("\n{0} events indexed.".format(len(self.event_offsets)))
272 def _record_offset(self):
273 """Stores the current file pointer position"""
274 offset = self.blob_file.tell()
275 self.event_offsets.append(offset)
277 def _create_blob(self):
278 """Parse the next event from the current file position"""
279 blob = None
280 for line in self.blob_file:
281 line = try_decode_string(line)
282 line = line.strip()
283 if line == "":
284 self.log.info("Ignoring empty line...")
285 continue
286 if line.startswith("end_event:") and blob:
287 blob["raw_header"] = self.raw_header
288 return blob
289 try:
290 tag, values = line.split(":")
291 except ValueError:
292 self.log.warning("Ignoring corrupt line: {}".format(line))
293 continue
294 try:
295 values = tuple(split(values.strip(), callback=float))
296 except ValueError:
297 self.log.info("Empty value: {}".format(values))
298 if line.startswith("start_event:"):
299 blob = Blob()
300 blob[tag] = tuple(int(v) for v in values)
301 continue
302 if tag not in blob:
303 blob[tag] = []
304 blob[tag].append(values)
306 def __len__(self):
307 if not self.whole_file_cached:
308 self._cache_offsets()
309 return len(self.event_offsets)
311 def __iter__(self):
312 return self
314 def __next__(self):
315 try:
316 blob = self.get_blob(self.index)
317 except IndexError:
318 self.index = 0
319 raise StopIteration
320 self.index += 1
321 return blob
323 def __getitem__(self, index):
324 if isinstance(index, int):
325 return self.get_blob(index)
326 elif isinstance(index, slice):
327 return self._slice_generator(index)
328 else:
329 raise TypeError("index must be int or slice")
331 def _slice_generator(self, index):
332 """A simple slice generator for iterations"""
333 start, stop, step = index.indices(len(self))
334 for i in range(start, stop, step):
335 yield self.get_blob(i)
337 def finish(self):
338 if self.blob_file:
339 self.blob_file.close()
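# Usage sketch (not part of the original module): EvtPump can be driven
# standalone via the dunder methods defined above, assuming that direct
# instantiation configures the pump as it does inside a km3pipe pipeline.
# The filename below is purely illustrative.
def _example_evtpump_usage():  # pragma: no cover
    pump = EvtPump(filename="numu_cc.evt", parsers="auto", cache_enabled=True)
    n_events = len(pump)      # triggers offset caching via __len__
    first_blob = pump[0]      # random access via __getitem__ / get_blob
    for blob in pump:         # sequential access via __iter__ / __next__
        raw_hits = blob.get("hit", [])  # raw tag values: tuples of floats
    pump.finish()             # closes the underlying EVT file
    return n_events, first_blob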
342class Parser(object):
343 """Standard parser to create numpy times from EVT raw data.
345 The `tag_description` is a dict of tuples. The key is the target blob-key,
346 the value is tuple of "target blob-key" and "numpy dtype".
348 """
350 def __init__(self, tag_description):
351 self.tag_description = tag_description
353 def __call__(self, blob):
354 """Iterate through the blob-keys and add the parsed data to the blob"""
355 for key in list(blob.keys()):
356 if key in self.tag_description.keys():
357 data = blob[key]
358 out_key, dtype = self.tag_description[key]
359 arr = np.array([d[: len(dtype)] for d in data], dtype)
360 tab = Table(arr, name=out_key)
361 blob[out_key] = tab
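# Example sketch (not part of the original module): a custom tag description
# for a hypothetical "my_hit" EVT tag, mapped to a "MyHits" table. The tag
# name, fields and values are made up for illustration.
def _example_custom_parser():  # pragma: no cover
    my_tags = {
        "my_hit": ["MyHits", [("id", "<i4"), ("time", "f4"), ("pe", "f4")]],
    }
    parse_my_hits = Parser(my_tags)
    blob = Blob()
    blob["my_hit"] = [(1.0, 12.5, 3.0), (2.0, 14.1, 1.0)]  # raw EVT values
    parse_my_hits(blob)      # adds blob["MyHits"] as a km3pipe Table
    return blob["MyHits"]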
364KM3SIM_TAGS = {
365 "hit": [
366 "KM3SimHits",
367 [
368 ("id", "f4"),
369 ("pmt_id", "<i4"),
370 ("pe", "f4"),
371 ("time", "f4"),
372 ("type", "f4"),
373 ("n_photons", "f4"),
374 ("track_in", "f4"),
375 ("c_time", "f4"),
376 ("unknown", "f4"),
377 ],
378 ],
379}
381GSEAGEN_TAGS = {
382 "neutrino": [
383 "Neutrinos",
384 [
385 ("id", "<i4"),
386 ("pos_x", "f4"),
387 ("pos_y", "f4"),
388 ("pos_z", "f4"),
389 ("dir_x", "f4"),
390 ("dir_y", "f4"),
391 ("dir_z", "f4"),
392 ("energy", "f4"),
393 ("time", "f4"),
394 ("bjorken_x", "f4"),
395 ("bjorken_y", "f4"),
396 ("scattering_type", "<i4"),
397 ("pdg_id", "<i4"),
398 ("interaction_type", "<i4"),
399 ],
400 ],
401 "track_in": [
402 "TrackIns",
403 [
404 ("id", "<i4"),
405 ("pos_x", "f4"),
406 ("pos_y", "f4"),
407 ("pos_z", "f4"),
408 ("dir_x", "f4"),
409 ("dir_y", "f4"),
410 ("dir_z", "f4"),
411 ("energy", "f4"),
412 ("time", "f4"),
413 ("geant_id", "f4"),
414 ],
415 ],
416 "primary_lepton": [
417 "PrimaryLeptons",
418 [
419 ("id", "<i4"),
420 ("pos_x", "f4"),
421 ("pos_y", "f4"),
422 ("pos_z", "f4"),
423 ("dir_x", "f4"),
424 ("dir_y", "f4"),
425 ("dir_z", "f4"),
426 ("energy", "f4"),
427 ("time", "f4"),
428 ("geant_id", "f4"),
429 ],
430 ],
431}
433KM3_TAGS = {
434 "neutrino": [
435 "Neutrinos",
436 [
437 ("id", "<i4"),
438 ("pos_x", "f4"),
439 ("pos_y", "f4"),
440 ("pos_z", "f4"),
441 ("dir_x", "f4"),
442 ("dir_y", "f4"),
443 ("dir_z", "f4"),
444 ("energy", "f4"),
445 ("time", "f4"),
446 ("bjorken_x", "f4"),
447 ("bjorken_y", "f4"),
448 ("scattering_type", "<i4"),
449 ("pdg_id", "<i4"),
450 ("interaction_type", "<i4"),
451 ],
452 ],
453 "track_in": [
454 "TrackIns",
455 [
456 ("id", "<i4"),
457 ("pos_x", "f4"),
458 ("pos_y", "f4"),
459 ("pos_z", "f4"),
460 ("dir_x", "f4"),
461 ("dir_y", "f4"),
462 ("dir_z", "f4"),
463 ("energy", "f4"),
464 ("time", "f4"),
465 ("type", "f4"),
466 ("something", "<i4"),
467 ],
468 ],
469 "hit_raw": [
470 "Hits",
471 [
472 ("id", "<i4"),
473 ("pmt_id", "<i4"),
474 ("npe", "<i4"),
475 ("time", "f4"),
476 ],
477 ],
478}
481def parse_corant(blob):
482 """Creates new blob entries for the given blob keys"""
484 if "track_seamuon" in blob.keys():
486 muon = blob["track_seamuon"]
488 blob["Muon"] = Table(
489 {
490 "id": np.array(muon)[:, 0].astype(int),
491 "pos_x": np.array(muon)[:, 1],
492 "pos_y": np.array(muon)[:, 2],
493 "pos_z": np.array(muon)[:, 3],
494 "dir_x": np.array(muon)[:, 4],
495 "dir_y": np.array(muon)[:, 5],
496 "dir_z": np.array(muon)[:, 6],
497 "energy": np.array(muon)[:, 7],
498 "time": np.array(muon)[:, 8],
499 "particle_id": np.array(muon)[:, 9].astype(int),
500 "is_charm": np.array(muon)[:, 10].astype(int),
501 "mother_pid": np.array(muon)[:, 11].astype(int),
502 "grandmother_pid": np.array(muon)[:, 11].astype(int),
503 },
504 h5loc="muon",
505 )
507 blob["MuonMultiplicity"] = Table(
508 {"muon_multiplicity": len(np.array(muon)[:, 6])}, h5loc="muon_multiplicity"
509 )
511 if "track_seaneutrino" in blob.keys():
513 nu = blob["track_seaneutrino"]
515 blob["Neutrino"] = Table(
516 {
517 "id": np.array(nu)[:, 0].astype(int),
518 "pos_x": np.array(nu)[:, 1],
519 "pos_y": np.array(nu)[:, 2],
520 "pos_z": np.array(nu)[:, 3],
521 "dir_x": np.array(nu)[:, 4],
522 "dir_y": np.array(nu)[:, 5],
523 "dir_z": np.array(nu)[:, 6],
524 "energy": np.array(nu)[:, 7],
525 "time": np.array(nu)[:, 8],
526 "particle_id": np.array(nu)[:, 9].astype(int),
527 "is_charm": np.array(nu)[:, 10].astype(int),
528 "mother_pid": np.array(nu)[:, 11].astype(int),
529 "grandmother_pid": np.array(nu)[:, 11].astype(int),
530 },
531 h5loc="nu",
532 )
533 blob["NeutrinoMultiplicity"] = Table(
534 {
535 "total": len(np.array(nu)[:, 6]),
536 "nue": len(np.array(nu)[:, 6][np.array(nu)[:, 9] == 66]),
537 "anue": len(np.array(nu)[:, 6][np.array(nu)[:, 9] == 67]),
538 "numu": len(np.array(nu)[:, 6][np.array(nu)[:, 9] == 68]),
539 "anumu": len(np.array(nu)[:, 6][np.array(nu)[:, 9] == 69]),
540 },
541 h5loc="nu_multiplicity",
542 )
544 if ("track_seamuon" or "track_seaneutrino") in blob.keys():
546 blob["Weights"] = Table(
547 {
548 "w1": blob["weights"][0][0],
549 "w2": blob["weights"][0][1],
550 "w3": blob["weights"][0][2],
551 },
552 h5loc="weights",
553 )
555 if "track_primary" in blob.keys():
557 primary = blob["track_primary"]
559 blob["Primary"] = Table(
560 {
561 "id": np.array(primary)[:, 0].astype(int),
562 "pos_x": np.array(primary)[:, 1],
563 "pos_y": np.array(primary)[:, 2],
564 "pos_z": np.array(primary)[:, 3],
565 "dir_x": np.array(primary)[:, 4],
566 "dir_y": np.array(primary)[:, 5],
567 "dir_z": np.array(primary)[:, 6],
568 "energy": np.array(primary)[:, 7],
569 "time": np.array(primary)[:, 8],
570 "particle_id": np.array(primary)[:, 9].astype(int),
571 },
572 h5loc="primary",
573 )
575 return blob
578def parse_propa(blob):
579 """Creates new blob entries for the given blob keys"""
581 if "track_in" in blob.keys():
583 muon = blob["track_in"]
585 blob["Muon"] = Table(
586 {
587 "id": np.array(muon)[:, 0].astype(int),
588 "pos_x": np.array(muon)[:, 1],
589 "pos_y": np.array(muon)[:, 2],
590 "pos_z": np.array(muon)[:, 3],
591 "dir_x": np.array(muon)[:, 4],
592 "dir_y": np.array(muon)[:, 5],
593 "dir_z": np.array(muon)[:, 6],
594 "energy": np.array(muon)[:, 7],
595 "time": np.array(muon)[:, 8],
596 "particle_id": np.array(muon)[:, 9].astype(int),
597 "is_charm": np.array(muon)[:, 10].astype(int),
598 "mother_pid": np.array(muon)[:, 11].astype(int),
599 "grandmother_pid": np.array(muon)[:, 11].astype(int),
600 },
601 h5loc="muon",
602 )
604 blob["MuonMultiplicity"] = Table(
605 {"muon_multiplicity": len(np.array(muon)[:, 6])}, h5loc="muon_multiplicity"
606 )
608 if "neutrino" in blob.keys():
610 nu = blob["neutrino"]
612 blob["Neutrino"] = Table(
613 {
614 "id": np.array(nu)[:, 0].astype(int),
615 "pos_x": np.array(nu)[:, 1],
616 "pos_y": np.array(nu)[:, 2],
617 "pos_z": np.array(nu)[:, 3],
618 "dir_x": np.array(nu)[:, 4],
619 "dir_y": np.array(nu)[:, 5],
620 "dir_z": np.array(nu)[:, 6],
621 "energy": np.array(nu)[:, 7],
622 "time": np.array(nu)[:, 8],
623 "particle_id": np.array(nu)[:, 9].astype(int),
624 "is_charm": np.array(nu)[:, 10].astype(int),
625 "mother_pid": np.array(nu)[:, 11].astype(int),
626 "grandmother_pid": np.array(nu)[:, 11].astype(int),
627 },
628 h5loc="nu",
629 )
630 blob["NeutrinoMultiplicity"] = Table(
631 {
632 "total": len(np.array(nu)[:, 6]),
633 "nue": len(np.array(nu)[:, 6][np.array(nu)[:, 9] == 12]),
634 "anue": len(np.array(nu)[:, 6][np.array(nu)[:, 9] == -12]),
635 "numu": len(np.array(nu)[:, 6][np.array(nu)[:, 9] == 14]),
636 "anumu": len(np.array(nu)[:, 6][np.array(nu)[:, 9] == -14]),
637 },
638 h5loc="nu_multiplicity",
639 )
641 if ("track_in" or "neutrino") in blob.keys():
643 blob["Weights"] = Table(
644 {
645 "w1": blob["weights"][0][0],
646 "w2": blob["weights"][0][1],
647 "w3": blob["weights"][0][2],
648 },
649 h5loc="weights",
650 )
652 if "track_primary" in blob.keys():
654 primary = blob["track_primary"]
656 blob["Primary"] = Table(
657 {
658 "id": np.array(primary)[:, 0].astype(int),
659 "pos_x": np.array(primary)[:, 1],
660 "pos_y": np.array(primary)[:, 2],
661 "pos_z": np.array(primary)[:, 3],
662 "dir_x": np.array(primary)[:, 4],
663 "dir_y": np.array(primary)[:, 5],
664 "dir_z": np.array(primary)[:, 6],
665 "energy": np.array(primary)[:, 7],
666 "time": np.array(primary)[:, 8],
667 "particle_id": np.array(primary)[:, 9].astype(int),
668 },
669 h5loc="primary",
670 )
672 return blob
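# Example sketch (not part of the original module): the minimal blob content
# that parse_propa expects. All numeric values are placeholders.
def _example_parse_propa():  # pragma: no cover
    blob = Blob()
    blob["track_in"] = [
        # id, x, y, z, dx, dy, dz, energy, time, pid, is_charm, mother_pid
        (1, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 100.0, 0.0, 13, 0, 211),
    ]
    blob["weights"] = [(1.0, 2.0, 3.0)]
    parse_propa(blob)        # adds "Muon", "MuonMultiplicity" and "Weights"
    return blob["Muon"], blob["Weights"]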
675EVT_PARSERS = {
676 "km3sim": Parser(KM3SIM_TAGS),
677 "gseagen": Parser(GSEAGEN_TAGS),
678 "km3": Parser(KM3_TAGS),
679 "propa": parse_propa,
680 "corant": parse_corant,
681}
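# Parser selection sketch (illustrative, not part of the module): string
# entries are looked up in EVT_PARSERS, callables are used as-is, and with
# parsers="auto" the lower-cased first word of each "physics:" header line
# selects the matching EVT_PARSERS entry. The filename below is made up.
def _example_parser_selection():  # pragma: no cover
    def count_hits(blob):                  # a custom callable parser
        blob["NHits"] = len(blob.get("hit", []))

    return EvtPump(
        filename="numu_cc.evt",
        parsers=["km3sim", count_hits],    # registered name plus a callable
    )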