# Filename: evt.py
# pylint: disable=C0103,R0903
"""
Pumps for the EVT simulation data format.
"""
from collections import defaultdict
import sys
import numpy as np
from thepipe import Module, Blob
from km3pipe.dataclasses import Table
from km3pipe.logger import get_logger
from km3pipe.tools import split
log = get_logger(__name__) # pylint: disable=C0103
__author__ = "Tamas Gal"
__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration."
__credits__ = []
__license__ = "MIT"
__maintainer__ = "Tamas Gal, Moritz Lotze"
__email__ = "tgal@km3net.de"
__status__ = "Development"
def try_decode_string(text):
"""Decode string to ASCII if possible"""
try:
return text.decode("ascii")
except AttributeError:
return text
class EvtPump(Module):  # pylint: disable=R0902
"""Provides a pump for EVT-files.
Parameters
----------
filename: str
The file to read the events from.
parsers: list of str or callables
The parsers to apply for each blob (e.g. parsers=['km3sim', a_parser])
You can also pass your own function, which takes a single argument
`blob` and mutates it. `str` values will be looked up in the
`kp.io.evt.EVT_PARSERS` dictionary and ignored if not found.
If `parsers='auto'`, the `EvtPump` will try to find the appropriate
parsers, which is the default behaviour. [default: 'auto']
cache_enabled: bool
If enabled, a cache of the event indices is created when loading
the file. Enable it if you want to jump around and inspect the
events non-consecutively. [default: False]
basename: str
The common part of the filenames if you want to process multiple
files e.g. file1.evt, file2.evt and file3.evt. During processing,
the files will be concatenated behind the scenes.
        You need to specify `index_start` and `index_stop`
        (1 and 3 for the example).
suffix: str
A string to append to each filename (before ".evt"), when basename
is given. [default: '']
index_start: int
The starting index if you process multiple files at once. [default: 1]
index_stop: int
The last index if you process multiple files at once. [default: 1]
n_digits: int or None
The number of digits for indexing multiple files. [default: None]
`None` means no leading zeros.
exclude_tags: list of strings
The tags in the EVT file, which should be ignored (e.g. if they
cause parse errors)
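
    Examples
    --------
    A minimal usage sketch; the filenames below are hypothetical::

        pump = EvtPump(filename="events.evt", parsers="auto", cache_enabled=True)
        blob = pump[0]          # random access via the cached offsets
        for blob in pump:       # or consume the events sequentially
            print(blob.keys())

    Files named mc_run_1.evt, mc_run_2.evt and mc_run_3.evt can be chained
    via::

        pump = EvtPump(basename="mc_run_", index_start=1, index_stop=3)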
"""
def _register_parsers(self, parsers):
self.log.info("Found parsers {}".format(parsers))
for parser in parsers:
if callable(parser):
self.parsers.append(parser)
continue
if parser in EVT_PARSERS.keys():
self.parsers.append(EVT_PARSERS[parser])
else:
self.log.warning("Parser '{}' not found, ignoring...".format(parser))
def _reset(self):
"""Clear the cache."""
self.log.info("Clearing the cache, resetting event offsets")
self.raw_header = None
self.event_offsets = []
self.index = 0
def _get_file_index_str(self):
"""Create a string out of the current file_index"""
file_index = str(self.file_index)
if self.n_digits is not None:
file_index = file_index.zfill(self.n_digits)
return file_index
def prepare_blobs(self):
"""Populate the blobs"""
self.raw_header = self.extract_header()
if self.cache_enabled:
self._cache_offsets()
def get_blob(self, index):
"""Return a blob with the event at the given index"""
self.log.info("Retrieving blob #{}".format(index))
if index > len(self.event_offsets) - 1:
self.log.info("Index not in cache, caching offsets")
self._cache_offsets(index, verbose=False)
self.blob_file.seek(self.event_offsets[index], 0)
blob = self._create_blob()
if blob is None:
self.log.info("Empty blob created...")
raise IndexError
else:
self.log.debug("Applying parsers...")
for parser in self.parsers:
parser(blob)
self.log.debug("Returning the blob")
return blob
def process(self, blob=None):
"""Pump the next blob to the modules"""
try:
blob = self.get_blob(self.index)
except IndexError:
self.log.info("Got an IndexError, trying the next file")
if (self.basename or self.filenames) and self.file_index < self.index_stop:
self.file_index += 1
self.log.info("Now at file_index={}".format(self.file_index))
self._reset()
self.blob_file.close()
self.log.info("Resetting blob index to 0")
self.index = 0
file_index = self._get_file_index_str()
if self.filenames:
self.filename = self.filenames[self.file_index - 1]
elif self.basename:
self.filename = "{}{}{}.evt".format(
self.basename, file_index, self.suffix
)
self.log.info("Next filename: {}".format(self.filename))
self.cprint("Opening {0}".format(self.filename))
self.blob_file = self.open_file(self.filename)
self.prepare_blobs()
try:
blob = self.get_blob(self.index)
except IndexError:
self.log.warning("No blob found in file {}".format(self.filename))
                else:
                    # advance past the blob we just fetched, otherwise the
                    # first event of the new file would be returned twice
                    self.index += 1
                    return blob
self.log.info("No files left, terminating the pipeline")
raise StopIteration
self.index += 1
return blob
def _cache_offsets(self, up_to_index=None, verbose=True):
"""Cache all event offsets."""
if not up_to_index:
if verbose:
self.cprint("Caching event file offsets, this may take a bit.")
self.blob_file.seek(0, 0)
self.event_offsets = []
if not self.raw_header:
self.event_offsets.append(0)
else:
self.blob_file.seek(self.event_offsets[-1], 0)
for line in iter(self.blob_file.readline, ""):
line = try_decode_string(line)
if line.startswith("end_event:"):
self._record_offset()
if len(self.event_offsets) % 100 == 0:
if verbose:
print(".", end="")
sys.stdout.flush()
if up_to_index and len(self.event_offsets) >= up_to_index + 1:
return
self.event_offsets.pop() # get rid of the last entry
if not up_to_index:
self.whole_file_cached = True
self.cprint("\n{0} events indexed.".format(len(self.event_offsets)))
def _record_offset(self):
"""Stores the current file pointer position"""
offset = self.blob_file.tell()
self.event_offsets.append(offset)
def _create_blob(self):
"""Parse the next event from the current file position"""
blob = None
for line in self.blob_file:
line = try_decode_string(line)
line = line.strip()
if line == "":
self.log.info("Ignoring empty line...")
continue
if line.startswith("end_event:") and blob:
blob["raw_header"] = self.raw_header
return blob
try:
tag, values = line.split(":")
except ValueError:
self.log.warning("Ignoring corrupt line: {}".format(line))
continue
try:
values = tuple(split(values.strip(), callback=float))
except ValueError:
self.log.info("Empty value: {}".format(values))
if line.startswith("start_event:"):
blob = Blob()
blob[tag] = tuple(int(v) for v in values)
continue
if tag not in blob:
blob[tag] = []
blob[tag].append(values)
def __len__(self):
if not self.whole_file_cached:
self._cache_offsets()
return len(self.event_offsets)
def __iter__(self):
return self
def __next__(self):
try:
blob = self.get_blob(self.index)
except IndexError:
self.index = 0
raise StopIteration
self.index += 1
return blob
def __getitem__(self, index):
if isinstance(index, int):
return self.get_blob(index)
elif isinstance(index, slice):
return self._slice_generator(index)
else:
raise TypeError("index must be int or slice")
def _slice_generator(self, index):
"""A simple slice generator for iterations"""
start, stop, step = index.indices(len(self))
for i in range(start, stop, step):
yield self.get_blob(i)
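    # Example (assuming an ``EvtPump`` instance called ``pump``): slicing
    # yields a generator of blobs instead of loading everything at once, e.g.
    #
    #     for blob in pump[10:20:2]:
    #         print(blob)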
def finish(self):
if self.blob_file:
self.blob_file.close()
class Parser(object):
"""Standard parser to create numpy times from EVT raw data.
The `tag_description` is a dict of tuples. The key is the target blob-key,
the value is tuple of "target blob-key" and "numpy dtype".
"""
def __init__(self, tag_description):
self.tag_description = tag_description
def __call__(self, blob):
"""Iterate through the blob-keys and add the parsed data to the blob"""
for key in list(blob.keys()):
if key in self.tag_description.keys():
data = blob[key]
out_key, dtype = self.tag_description[key]
arr = np.array([d[: len(dtype)] for d in data], dtype)
tab = Table(arr, name=out_key)
blob[out_key] = tab
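

# A usage sketch for ``Parser``; the tag name, dtype and sample values are
# made up for illustration:
#
#     hits_parser = Parser({"hit": ("Hits", [("id", "<f4"), ("time", "<f4")])})
#     blob = Blob({"hit": [(1.0, 21.5), (2.0, 23.1)]})
#     hits_parser(blob)       # adds blob["Hits"] as a Table named "Hits"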
def parse_corant(blob):
    """Create Table entries from the CORANT tracks in the blob"""
    if "track_seamuon" in blob:
        muon = np.array(blob["track_seamuon"])
        blob["Muon"] = Table(
            {
                "id": muon[:, 0].astype(int),
                "pos_x": muon[:, 1],
                "pos_y": muon[:, 2],
                "pos_z": muon[:, 3],
                "dir_x": muon[:, 4],
                "dir_y": muon[:, 5],
                "dir_z": muon[:, 6],
                "energy": muon[:, 7],
                "time": muon[:, 8],
                "particle_id": muon[:, 9].astype(int),
                "is_charm": muon[:, 10].astype(int),
                "mother_pid": muon[:, 11].astype(int),
                # the grandmother PID is assumed to sit in column 12
                # (column 11 is the mother PID)
                "grandmother_pid": muon[:, 12].astype(int),
            },
            h5loc="muon",
        )
        blob["MuonMultiplicity"] = Table(
            {"muon_multiplicity": len(muon)}, h5loc="muon_multiplicity"
        )
    if "track_seaneutrino" in blob:
        nu = np.array(blob["track_seaneutrino"])
        pid = nu[:, 9].astype(int)
        blob["Neutrino"] = Table(
            {
                "id": nu[:, 0].astype(int),
                "pos_x": nu[:, 1],
                "pos_y": nu[:, 2],
                "pos_z": nu[:, 3],
                "dir_x": nu[:, 4],
                "dir_y": nu[:, 5],
                "dir_z": nu[:, 6],
                "energy": nu[:, 7],
                "time": nu[:, 8],
                "particle_id": pid,
                "is_charm": nu[:, 10].astype(int),
                "mother_pid": nu[:, 11].astype(int),
                # grandmother PID assumed in column 12, see above
                "grandmother_pid": nu[:, 12].astype(int),
            },
            h5loc="nu",
        )
        # CORSIKA particle codes: 66/67 = nu_e/anti-nu_e, 68/69 = nu_mu/anti-nu_mu
        blob["NeutrinoMultiplicity"] = Table(
            {
                "total": len(nu),
                "nue": np.count_nonzero(pid == 66),
                "anue": np.count_nonzero(pid == 67),
                "numu": np.count_nonzero(pid == 68),
                "anumu": np.count_nonzero(pid == 69),
            },
            h5loc="nu_multiplicity",
        )
    if "track_seamuon" in blob or "track_seaneutrino" in blob:
        blob["Weights"] = Table(
            {
                "w1": blob["weights"][0][0],
                "w2": blob["weights"][0][1],
                "w3": blob["weights"][0][2],
            },
            h5loc="weights",
        )
    if "track_primary" in blob:
        primary = np.array(blob["track_primary"])
        blob["Primary"] = Table(
            {
                "id": primary[:, 0].astype(int),
                "pos_x": primary[:, 1],
                "pos_y": primary[:, 2],
                "pos_z": primary[:, 3],
                "dir_x": primary[:, 4],
                "dir_y": primary[:, 5],
                "dir_z": primary[:, 6],
                "energy": primary[:, 7],
                "time": primary[:, 8],
                "particle_id": primary[:, 9].astype(int),
            },
            h5loc="primary",
        )
    return blob
def parse_propa(blob):
    """Create Table entries from the PROPA tracks in the blob"""
    if "track_in" in blob:
        muon = np.array(blob["track_in"])
        blob["Muon"] = Table(
            {
                "id": muon[:, 0].astype(int),
                "pos_x": muon[:, 1],
                "pos_y": muon[:, 2],
                "pos_z": muon[:, 3],
                "dir_x": muon[:, 4],
                "dir_y": muon[:, 5],
                "dir_z": muon[:, 6],
                "energy": muon[:, 7],
                "time": muon[:, 8],
                "particle_id": muon[:, 9].astype(int),
                "is_charm": muon[:, 10].astype(int),
                "mother_pid": muon[:, 11].astype(int),
                # the grandmother PID is assumed to sit in column 12
                # (column 11 is the mother PID)
                "grandmother_pid": muon[:, 12].astype(int),
            },
            h5loc="muon",
        )
        blob["MuonMultiplicity"] = Table(
            {"muon_multiplicity": len(muon)}, h5loc="muon_multiplicity"
        )
    if "neutrino" in blob:
        nu = np.array(blob["neutrino"])
        pid = nu[:, 9].astype(int)
        blob["Neutrino"] = Table(
            {
                "id": nu[:, 0].astype(int),
                "pos_x": nu[:, 1],
                "pos_y": nu[:, 2],
                "pos_z": nu[:, 3],
                "dir_x": nu[:, 4],
                "dir_y": nu[:, 5],
                "dir_z": nu[:, 6],
                "energy": nu[:, 7],
                "time": nu[:, 8],
                "particle_id": pid,
                "is_charm": nu[:, 10].astype(int),
                "mother_pid": nu[:, 11].astype(int),
                # grandmother PID assumed in column 12, see above
                "grandmother_pid": nu[:, 12].astype(int),
            },
            h5loc="nu",
        )
        # PDG particle codes: +-12 = nu_e/anti-nu_e, +-14 = nu_mu/anti-nu_mu
        blob["NeutrinoMultiplicity"] = Table(
            {
                "total": len(nu),
                "nue": np.count_nonzero(pid == 12),
                "anue": np.count_nonzero(pid == -12),
                "numu": np.count_nonzero(pid == 14),
                "anumu": np.count_nonzero(pid == -14),
            },
            h5loc="nu_multiplicity",
        )
    if "track_in" in blob or "neutrino" in blob:
        blob["Weights"] = Table(
            {
                "w1": blob["weights"][0][0],
                "w2": blob["weights"][0][1],
                "w3": blob["weights"][0][2],
            },
            h5loc="weights",
        )
    if "track_primary" in blob:
        primary = np.array(blob["track_primary"])
        blob["Primary"] = Table(
            {
                "id": primary[:, 0].astype(int),
                "pos_x": primary[:, 1],
                "pos_y": primary[:, 2],
                "pos_z": primary[:, 3],
                "dir_x": primary[:, 4],
                "dir_y": primary[:, 5],
                "dir_z": primary[:, 6],
                "energy": primary[:, 7],
                "time": primary[:, 8],
                "particle_id": primary[:, 9].astype(int),
            },
            h5loc="primary",
        )
    return blob
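

# KM3SIM_TAGS, GSEAGEN_TAGS and KM3_TAGS are the tag-description dicts for
# the corresponding simulation formats, defined elsewhere in this module.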
EVT_PARSERS = {
"km3sim": Parser(KM3SIM_TAGS),
"gseagen": Parser(GSEAGEN_TAGS),
"km3": Parser(KM3_TAGS),
"propa": parse_propa,
"corant": parse_corant,
}