Coverage for src/km3pipe/io/online.py: 75%
134 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 03:14 +0000
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 03:14 +0000
1#!/usr/bin/env python
2# Filename: online.py
3# pylint: disable=
4"""
5Pump for the online file format
6(the file format formerly known as formerly as JPP)
8"""
10import numpy as np
11import km3io
13from thepipe import Module, Blob
14from km3pipe.dataclasses import Table
15from km3pipe.logger import get_logger
17log = get_logger(__name__) # pylint: disable=C0103
19__author__ = "Tamas Gal"
20__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration."
21__credits__ = ["Thomas Heid", "Giuliano Maggi", "Moritz Lotze", "Johannes Schumann"]
22__license__ = "MIT"
23__maintainer__ = "Tamas Gal"
24__email__ = "tgal@km3net.de"
25__status__ = "Development"
28class EventPump(Module):
29 """A pump for DAQ (triggered) events in online files.
31 Parameters
32 ----------
33 filename: str
34 Name of the file to open.
36 """
38 def configure(self):
39 self.event_index = self.get("index") or 0
40 self.filename = self.require("filename")
42 self.event_reader = km3io.OnlineReader(self.filename.encode())
43 self.blobs = self.blob_generator()
44 self.n_events = len(self.event_reader.events)
45 self._current_blob = Blob()
47 def blob_generator(self):
48 for i in range(self.n_events):
49 try:
50 yield self.extract_event(i)
51 except IndexError:
52 return
54 def __getitem__(self, index):
55 if index >= self.n_events:
56 raise IndexError
57 return self.extract_event(index)
59 def _get_trigger_mask(self, snapshot_hits, triggered_hits):
60 trg_mask = np.zeros(len(snapshot_hits))
61 s = np.array(
62 [snapshot_hits.time, snapshot_hits.channel_id, snapshot_hits.dom_id]
63 ).transpose()
64 t = np.array(
65 [triggered_hits.time, triggered_hits.channel_id, triggered_hits.dom_id]
66 ).transpose()
67 cmp_mask = s == t[:, None]
68 trg_map = np.argwhere(np.all(cmp_mask, axis=2))[:, 1]
69 trg_mask[trg_map] = triggered_hits.trigger_mask
70 return trg_mask
72 def extract_event(self, event_number):
73 blob = self._current_blob
74 r = self.event_reader
75 hits = r.events.snapshot_hits[event_number]
76 trg_hits = r.events.triggered_hits[event_number]
77 raw_event_info = r.events.headers[event_number]
79 trigger_mask = self._get_trigger_mask(hits, trg_hits)
80 hit_series = Table(
81 {
82 "channel_id": hits.channel_id,
83 "dom_id": hits.dom_id,
84 "time": hits.time,
85 "tot": hits.tot,
86 "triggered": trigger_mask,
87 },
88 name="Hits",
89 h5loc="/hits",
90 split_h5=True,
91 )
93 event_info = Table(
94 {
95 "det_id": raw_event_info["detector_id"],
96 "frame_index": raw_event_info["frame_index"],
97 "livetime_sec": 0,
98 "mc_id": 0,
99 "mc_t": 0,
100 "n_events_gen": 0,
101 "n_files_gen": 0,
102 "overlays": raw_event_info["overlays"],
103 "trigger_counter": raw_event_info["trigger_counter"],
104 "trigger_mask": raw_event_info["trigger_mask"],
105 "utc_nanoseconds": raw_event_info["UTC_16nanosecondcycles"] * 16.0,
106 "utc_seconds": raw_event_info["UTC_seconds"],
107 "weight_w1": np.nan,
108 "weight_w2": np.nan,
109 "weight_w3": np.nan,
110 "run_id": raw_event_info["run"],
111 },
112 name="EventInfo",
113 h5loc="/event_info",
114 )
116 self.event_index += 1
117 blob["EventInfo"] = event_info
118 blob["Hits"] = hit_series
119 return blob
121 def process(self, blob):
122 self._current_blob = blob
123 return next(self.blobs)
125 def __iter__(self):
126 return self
128 def __next__(self):
129 self._current_blob = next(self.blobs)
130 return self._current_blob
133class TimeslicePump(Module):
134 """A pump to read and extract timeslices. Currently only hits are read.
136 Parameters
137 -------------------
138 filename: str
139 stream: str ('L0', 'L1', 'L2', 'SN') default: 'L1'
141 """
143 def configure(self):
144 fname = self.require("filename")
145 self.stream = self.get("stream", default="L1")
146 self.blobs = self.timeslice_generator()
147 self.r = km3io.OnlineReader(fname)
148 self.timeslice_info = self.create_timeslice_info()
149 self.n_timeslices = len(self.timeslice_info)
151 self._current_blob = Blob()
152 self._hits_blob_key = "{}Hits".format(self.stream if self.stream else "TS")
154 def create_timeslice_info(self):
155 header = self.r.timeslices.stream(self.stream, 0).header
156 slice_ids = header["frame_index"].array()
157 timestamps = header["timeslice_start.UTC_seconds"].array()
158 number_of_slices = len(slice_ids)
159 nanoseconds = header["timeslice_start.UTC_16nanosecondcycles"].array() * 16
160 timeslice_info = Table.from_template(
161 {
162 "frame_index": slice_ids,
163 "slice_id": range(number_of_slices),
164 "timestamp": timestamps,
165 "nanoseconds": nanoseconds,
166 "n_frames": np.zeros(len(slice_ids)),
167 },
168 "TimesliceInfo",
169 )
170 return timeslice_info
172 def process(self, blob):
173 self._current_blob = blob
174 return next(self.blobs)
176 def timeslice_generator(self):
177 """Uses slice ID as iterator"""
178 slice_id = 0
179 while slice_id < self.n_timeslices:
180 blob = self.get_blob(slice_id)
181 yield blob
182 slice_id += 1
184 def get_blob(self, index):
185 """Index is slice ID"""
186 blob = self._current_blob
187 hits = self._extract_hits(index)
188 hits.group_id = index
189 blob["TimesliceInfo"] = self.timeslice_info[index : index + 1]
190 blob["TimesliceInfo"]["n_frames"] = self._extract_number_of_frames(index)
191 blob[self._hits_blob_key] = hits
192 return blob
194 def _extract_number_of_frames(self, index):
195 timeslice = self.r.timeslices.stream(self.stream, index)
196 return len(timeslice.frames)
198 def _extract_hits(self, index):
199 timeslice = self.r.timeslices.stream(self.stream, index)
200 raw_hits = {
201 "dom_id": [],
202 "channel_id": [],
203 "time": [],
204 "tot": [],
205 "group_id": [],
206 }
208 for dom_id, frame in timeslice.frames.items():
209 raw_hits["channel_id"].extend(frame.pmt)
210 raw_hits["time"].extend(frame.tdc)
211 raw_hits["tot"].extend(frame.tot)
212 raw_hits["dom_id"].extend(len(frame.pmt) * [dom_id])
213 raw_hits["group_id"].extend(len(frame.pmt) * [0])
215 hits = Table.from_template(raw_hits, "TimesliceHits")
216 return hits
218 def get_by_frame_index(self, frame_index):
219 blob = Blob()
220 ts_info = self.timeslice_info[self.timeslice_info.frame_index == frame_index][0]
221 slice_id = ts_info.slice_id
222 hits = self._extract_hits(slice_id)
223 blob[self._hits_blob_key] = hits
224 return blob
226 def __len__(self):
227 return self.n_timeslices
229 def __iter__(self):
230 return self
232 def __next__(self):
233 self._current_blob = next(self.blobs)
234 return self._current_blob
236 def __getitem__(self, index):
237 if isinstance(index, int):
238 return self.get_blob(index)
239 elif isinstance(index, slice):
240 return self._slice_generator(index)
241 else:
242 raise TypeError("index must be int or slice")
244 def _slice_generator(self, index):
245 """A simple slice generator for iterations"""
246 start, stop, step = index.indices(len(self))
247 for i in range(start, stop, step):
248 yield self.get_blob(i)