Coverage for src/km3pipe/io/online.py: 75%

134 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-08 03:14 +0000

1#!/usr/bin/env python 

2# Filename: online.py 

3# pylint: disable= 

4""" 

5Pump for the online file format  

6(the file format formerly known as JPP) 

7 

8""" 

9 

10import numpy as np 

11import km3io 

12 

13from thepipe import Module, Blob 

14from km3pipe.dataclasses import Table 

15from km3pipe.logger import get_logger 

16 

# Module-level logger, named after this module.
log = get_logger(__name__)  # pylint: disable=C0103

# Package metadata.
__author__ = "Tamas Gal"
__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration."
__credits__ = [
    "Thomas Heid",
    "Giuliano Maggi",
    "Moritz Lotze",
    "Johannes Schumann",
]
__license__ = "MIT"
__maintainer__ = "Tamas Gal"
__email__ = "tgal@km3net.de"
__status__ = "Development"

26 

27 

class EventPump(Module):
    """A pump for DAQ (triggered) events in online files.

    Parameters
    ----------
    filename: str
        Name of the file to open.
    index: int, optional
        Initial value of the ``event_index`` counter (default: 0). Iteration
        always starts at the first event regardless of this value.

    """

    def configure(self):
        self.event_index = self.get("index") or 0
        self.filename = self.require("filename")

        # NOTE(review): the filename is handed to km3io as bytes here, while
        # TimeslicePump passes the str unchanged -- confirm both are accepted.
        self.event_reader = km3io.OnlineReader(self.filename.encode())
        # The generator is lazy, so it may be created before n_events is set.
        self.blobs = self.blob_generator()
        self.n_events = len(self.event_reader.events)
        self._current_blob = Blob()

    def blob_generator(self):
        """Yield the blob of each event in file order; stop early on IndexError."""
        for i in range(self.n_events):
            try:
                yield self.extract_event(i)
            except IndexError:
                return

    def __getitem__(self, index):
        """Return event ``index`` in a fresh :class:`Blob`.

        Negative indices count from the end.

        Raises
        ------
        IndexError
            If ``index`` is outside ``[-n_events, n_events)``.
        """
        if index < 0:
            index += self.n_events
        if not 0 <= index < self.n_events:
            raise IndexError(
                "event index out of range (n_events={})".format(self.n_events)
            )
        # A fresh blob per access, so earlier results are not overwritten.
        return self.extract_event(index, blob=Blob())

    def _get_trigger_mask(self, snapshot_hits, triggered_hits):
        """Return the trigger mask of every snapshot hit.

        Snapshot and triggered hits are matched on (time, channel_id, dom_id).
        Matched snapshot hits get the ``trigger_mask`` of their triggered hit;
        all others get 0. The result is a float array of length
        ``len(snapshot_hits)``.
        """
        trg_mask = np.zeros(len(snapshot_hits))
        if len(snapshot_hits) == 0 or len(triggered_hits) == 0:
            return trg_mask
        s = np.array(
            [snapshot_hits.time, snapshot_hits.channel_id, snapshot_hits.dom_id]
        ).transpose()
        t = np.array(
            [triggered_hits.time, triggered_hits.channel_id, triggered_hits.dom_id]
        ).transpose()
        # (n_triggered, n_snapshot) matrix: True where all three keys agree.
        matches = np.all(s == t[:, None], axis=2)
        trg_idx, snap_idx = np.nonzero(matches)
        # Assign via the matched pairs so unmatched or duplicated hits cannot
        # cause a shape mismatch between indices and trigger masks.
        trg_mask[snap_idx] = np.asarray(triggered_hits.trigger_mask)[trg_idx]
        return trg_mask

    def extract_event(self, event_number, blob=None):
        """Read event ``event_number`` into ``blob`` and return it.

        Stores an ``EventInfo`` table and a ``Hits`` table (snapshot hits with
        a ``triggered`` column holding each hit's trigger mask, 0 if not
        triggered). MC-only EventInfo fields are filled with 0 / NaN.

        Parameters
        ----------
        event_number: int
            Index of the event in the file.
        blob: Blob, optional
            Blob to fill; defaults to the pump's current blob.
        """
        if blob is None:
            blob = self._current_blob
        r = self.event_reader
        hits = r.events.snapshot_hits[event_number]
        trg_hits = r.events.triggered_hits[event_number]
        raw_event_info = r.events.headers[event_number]

        trigger_mask = self._get_trigger_mask(hits, trg_hits)
        hit_series = Table(
            {
                "channel_id": hits.channel_id,
                "dom_id": hits.dom_id,
                "time": hits.time,
                "tot": hits.tot,
                "triggered": trigger_mask,
            },
            name="Hits",
            h5loc="/hits",
            split_h5=True,
        )

        event_info = Table(
            {
                "det_id": raw_event_info["detector_id"],
                "frame_index": raw_event_info["frame_index"],
                "livetime_sec": 0,
                "mc_id": 0,
                "mc_t": 0,
                "n_events_gen": 0,
                "n_files_gen": 0,
                "overlays": raw_event_info["overlays"],
                "trigger_counter": raw_event_info["trigger_counter"],
                "trigger_mask": raw_event_info["trigger_mask"],
                # Header stores 16 ns cycles; convert to nanoseconds.
                "utc_nanoseconds": raw_event_info["UTC_16nanosecondcycles"] * 16.0,
                "utc_seconds": raw_event_info["UTC_seconds"],
                "weight_w1": np.nan,
                "weight_w2": np.nan,
                "weight_w3": np.nan,
                "run_id": raw_event_info["run"],
            },
            name="EventInfo",
            h5loc="/event_info",
        )

        # Counts extracted events; not read elsewhere in this class.
        self.event_index += 1
        blob["EventInfo"] = event_info
        blob["Hits"] = hit_series
        return blob

    def process(self, blob):
        """Fill the pipeline's ``blob`` with the next event."""
        self._current_blob = blob
        return next(self.blobs)

    def __iter__(self):
        return self

    def __next__(self):
        # Start from a fresh blob so previously yielded blobs stay intact.
        self._current_blob = Blob()
        self._current_blob = next(self.blobs)
        return self._current_blob

131 

132 

class TimeslicePump(Module):
    """A pump to read and extract timeslices. Currently only hits are read.

    Parameters
    ----------
    filename: str
        Name of the file to open.
    stream: str ('L0', 'L1', 'L2', 'SN') default: 'L1'
        Timeslice stream to read.

    """

    def configure(self):
        fname = self.require("filename")
        self.stream = self.get("stream", default="L1")
        # The generator is lazy, so it may be created before the reader exists.
        self.blobs = self.timeslice_generator()
        self.r = km3io.OnlineReader(fname)
        self.timeslice_info = self.create_timeslice_info()
        self.n_timeslices = len(self.timeslice_info)

        self._current_blob = Blob()
        # Blob key for the hits, e.g. "L1Hits" ("TSHits" if stream is falsy).
        self._hits_blob_key = "{}Hits".format(self.stream if self.stream else "TS")

    def create_timeslice_info(self):
        """Build a TimesliceInfo table with one row per timeslice of the stream.

        ``slice_id`` is the position of the timeslice in the file;
        ``n_frames`` is initialised to 0 and filled in by :meth:`get_blob`.
        """
        header = self.r.timeslices.stream(self.stream, 0).header
        slice_ids = header["frame_index"].array()
        timestamps = header["timeslice_start.UTC_seconds"].array()
        number_of_slices = len(slice_ids)
        # Header stores 16 ns cycles; convert to nanoseconds.
        nanoseconds = header["timeslice_start.UTC_16nanosecondcycles"].array() * 16
        timeslice_info = Table.from_template(
            {
                "frame_index": slice_ids,
                "slice_id": range(number_of_slices),
                "timestamp": timestamps,
                "nanoseconds": nanoseconds,
                "n_frames": np.zeros(len(slice_ids)),
            },
            "TimesliceInfo",
        )
        return timeslice_info

    def process(self, blob):
        """Fill the pipeline's ``blob`` with the next timeslice."""
        self._current_blob = blob
        return next(self.blobs)

    def timeslice_generator(self):
        """Uses slice ID as iterator"""
        slice_id = 0
        while slice_id < self.n_timeslices:
            blob = self.get_blob(slice_id)
            yield blob
            slice_id += 1

    def get_blob(self, index, blob=None):
        """Fill ``blob`` with the timeslice whose slice ID is ``index``.

        Stores a one-row ``TimesliceInfo`` (with ``n_frames`` set) and the hits
        under the stream-specific hits key, with ``group_id`` set to ``index``.

        Parameters
        ----------
        index: int
            Slice ID (position of the timeslice in the file).
        blob: Blob, optional
            Blob to fill; defaults to the pump's current blob.
        """
        if blob is None:
            blob = self._current_blob
        # Open the timeslice once and reuse it for both hits and frame count.
        timeslice = self.r.timeslices.stream(self.stream, index)
        hits = self._hits_from_timeslice(timeslice)
        hits.group_id = index
        blob["TimesliceInfo"] = self.timeslice_info[index : index + 1]
        blob["TimesliceInfo"]["n_frames"] = len(timeslice.frames)
        blob[self._hits_blob_key] = hits
        return blob

    def _extract_number_of_frames(self, index):
        """Return the number of frames in the timeslice with slice ID ``index``."""
        timeslice = self.r.timeslices.stream(self.stream, index)
        return len(timeslice.frames)

    def _extract_hits(self, index):
        """Return the hits of the timeslice with slice ID ``index`` (group_id 0)."""
        timeslice = self.r.timeslices.stream(self.stream, index)
        return self._hits_from_timeslice(timeslice)

    def _hits_from_timeslice(self, timeslice):
        """Flatten the per-DOM frames of ``timeslice`` into a TimesliceHits table."""
        raw_hits = {
            "dom_id": [],
            "channel_id": [],
            "time": [],
            "tot": [],
            "group_id": [],
        }

        for dom_id, frame in timeslice.frames.items():
            n_hits = len(frame.pmt)
            raw_hits["channel_id"].extend(frame.pmt)
            raw_hits["time"].extend(frame.tdc)
            raw_hits["tot"].extend(frame.tot)
            # Every hit of a frame belongs to the frame's DOM.
            raw_hits["dom_id"].extend(n_hits * [dom_id])
            raw_hits["group_id"].extend(n_hits * [0])

        return Table.from_template(raw_hits, "TimesliceHits")

    def get_by_frame_index(self, frame_index):
        """Return a new blob with the hits of the timeslice with ``frame_index``.

        Only the hits are stored (their ``group_id`` stays 0).

        Raises
        ------
        IndexError
            If no timeslice has the requested ``frame_index``.
        """
        matches = self.timeslice_info[self.timeslice_info.frame_index == frame_index]
        if len(matches) == 0:
            raise IndexError("no timeslice with frame_index {}".format(frame_index))
        ts_info = matches[0]
        slice_id = ts_info.slice_id
        blob = Blob()
        blob[self._hits_blob_key] = self._extract_hits(slice_id)
        return blob

    def __len__(self):
        return self.n_timeslices

    def __iter__(self):
        return self

    def __next__(self):
        # Start from a fresh blob so previously yielded blobs stay intact.
        self._current_blob = Blob()
        self._current_blob = next(self.blobs)
        return self._current_blob

    def __getitem__(self, index):
        """Return a timeslice blob (int index) or a generator of blobs (slice).

        Integer indices (Python or NumPy) may be negative and count from the end.

        Raises
        ------
        TypeError
            If ``index`` is neither an integer nor a slice.
        IndexError
            If an integer ``index`` is out of range.
        """
        if isinstance(index, slice):
            return self._slice_generator(index)
        if not isinstance(index, (int, np.integer)):
            raise TypeError("index must be int or slice")
        n_timeslices = len(self)
        if index < 0:
            index += n_timeslices
        if not 0 <= index < n_timeslices:
            raise IndexError(
                "timeslice index out of range (n_timeslices={})".format(n_timeslices)
            )
        # A fresh blob per access, so earlier results are not overwritten.
        return self.get_blob(int(index), blob=Blob())

    def _slice_generator(self, index):
        """Yield a fresh blob for every slice ID selected by the slice ``index``."""
        start, stop, step = index.indices(len(self))
        for i in range(start, stop, step):
            yield self.get_blob(i, blob=Blob())