Coverage for src/km3pipe/io/tests/test_daq.py: 100%
206 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-16 03:15 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-16 03:15 +0000
1# Filename: test_daq.py
2# pylint: disable=C0111,R0904,C0103
3"""
4Tests for KM3NeT binary formats readout.
6"""
7import io
8from os.path import dirname, join
9import struct
11import numpy as np
13from km3pipe.testing import TestCase, data_path
14from km3pipe.io.daq import (
15 DAQPump,
16 DAQPreamble,
17 DAQHeader,
18 DAQSummaryslice,
19 DAQProcessor,
20 DMMonitor,
21 RePump,
22 TMCHRepump,
23 TimesliceParser,
24)
class TestRePump(TestCase):
    """Exercise RePump on the ``daq/IO_EVT.dat`` sample file."""

    def test_process(self):
        # Successive process() calls must return blobs whose "RawBytes"
        # entries have these lengths, in this order.
        pump = RePump(filename=data_path("daq/IO_EVT.dat"))
        for expected_size in (5724, 1950, 516):
            blob = pump.process({})
            assert expected_size == len(blob["RawBytes"])
class TestDAQPumpWithLegacyFiles(TestCase):
    """Exercise DAQPump with ``legacy=True`` on the legacy sample files."""

    @staticmethod
    def _legacy_pump(relative_path):
        """Return a DAQPump opened in legacy mode on the given sample file."""
        return DAQPump(filename=data_path(relative_path), legacy=True)

    def test_init_with_filename(self):
        # Construction alone must not raise.
        self._legacy_pump("daq/IO_SUM_legacy.dat")

    def test_frame_positions_in_io_sum(self):
        positions = self._legacy_pump("daq/IO_SUM_legacy.dat").frame_positions
        assert 81 == len(positions)
        self.assertListEqual([0, 656, 1312], positions[:3])
        self.assertListEqual([50973, 51629, 52285], positions[-3:])

    def test_frame_positions_in_io_evt(self):
        positions = self._legacy_pump("daq/IO_EVT_legacy.dat").frame_positions
        assert 38 == len(positions)
        self.assertListEqual([0, 570, 986], positions[:3])
        self.assertListEqual([13694, 14016, 14360], positions[-3:])

    def test_blob_in_io_sum(self):
        first_blob = self._legacy_pump("daq/IO_SUM_legacy.dat").next_blob()
        for key in ("DAQSummaryslice", "DAQPreamble", "DAQHeader"):
            assert key in first_blob.keys()
        assert 16 == first_blob["DAQSummaryslice"].n_summary_frames

    def test_blob_in_io_evt(self):
        first_blob = self._legacy_pump("daq/IO_EVT_legacy.dat").next_blob()
        for key in ("DAQEvent", "DAQPreamble", "DAQHeader"):
            assert key in first_blob.keys()
        daq_event = first_blob["DAQEvent"]
        assert 13 == daq_event.n_triggered_hits
        assert 28 == daq_event.n_snapshot_hits

    def test_blob_iteration(self):
        # Only checks that iterating over the whole file does not raise.
        pump = self._legacy_pump("daq/IO_EVT_legacy.dat")
        for _ in pump:
            pass

    def test_get_item(self):
        # Index-based access; expectations are for the blob at index 4.
        pump = self._legacy_pump("daq/IO_EVT_legacy.dat")
        daq_event = pump[4]["DAQEvent"]
        assert 6 == daq_event.n_triggered_hits
        assert 17 == daq_event.n_snapshot_hits
class TestDAQPump(TestCase):
    """Exercise DAQPump (default, non-legacy mode) on the sample files."""

    @staticmethod
    def _pump(relative_path):
        """Return a DAQPump opened on the given sample file."""
        return DAQPump(filename=data_path(relative_path))

    def test_init_with_filename(self):
        # Construction alone must not raise.
        self._pump("daq/IO_SUM.dat")

    def test_frame_positions_in_io_sum(self):
        positions = self._pump("daq/IO_SUM.dat").frame_positions
        assert 3 == len(positions)
        self.assertListEqual([0, 1849, 5678], positions)

    def test_frame_positions_in_io_evt(self):
        positions = self._pump("daq/IO_EVT.dat").frame_positions
        assert 3 == len(positions)
        self.assertListEqual([0, 5724, 7674], positions)

    def test_blob_in_io_sum(self):
        first_blob = self._pump("daq/IO_SUM.dat").next_blob()
        for key in ("DAQSummaryslice", "DAQPreamble", "DAQHeader"):
            assert key in first_blob.keys()
        assert 33 == first_blob["DAQSummaryslice"].n_summary_frames

    def test_blob_in_io_evt(self):
        first_blob = self._pump("daq/IO_EVT.dat").next_blob()
        for key in ("DAQEvent", "DAQPreamble", "DAQHeader"):
            assert key in first_blob.keys()
        daq_event = first_blob["DAQEvent"]
        assert 182 == daq_event.n_triggered_hits
        assert 239 == daq_event.n_snapshot_hits

    def test_blob_iteration(self):
        # Only checks that iterating over the whole file does not raise.
        pump = self._pump("daq/IO_EVT.dat")
        for _ in pump:
            pass

    def test_get_item(self):
        pump = self._pump("daq/IO_EVT.dat")
        daq_event = pump[0]["DAQEvent"]
        assert 182 == daq_event.n_triggered_hits
        assert 239 == daq_event.n_snapshot_hits

        expected_dom_ids = [806451572, 806455814, 806455814, 806483369, 806483369]
        expected_times = [40380598, 40380623, 40380551, 40380835, 40380920]

        # First five triggered hits.
        assert np.allclose(expected_dom_ids, daq_event.triggered_hits.dom_id[:5])
        assert np.allclose(
            [23, 14, 24, 6, 11], daq_event.triggered_hits.channel_id[:5]
        )
        assert np.allclose(expected_times, daq_event.triggered_hits.time[:5])
        assert np.allclose(
            [4, 4, 4, 4, 22], daq_event.triggered_hits.trigger_mask[:5]
        )

        # NOTE(review): the dom_id and time checks below repeat the
        # triggered_hits assertions above verbatim, while the channel_id check
        # between them reads snapshot_hits. Presumably all three were meant to
        # inspect snapshot_hits; the expected values would then need updating
        # against the sample file -- TODO confirm.
        assert np.allclose(expected_dom_ids, daq_event.triggered_hits.dom_id[:5])
        assert np.allclose(
            [23, 17, 14, 24, 6], daq_event.snapshot_hits.channel_id[:5]
        )
        assert np.allclose(expected_times, daq_event.triggered_hits.time[:5])
class TestDAQProcessor(TestCase):
    """Feed raw event-file bytes through DAQProcessor and inspect the hits."""

    @staticmethod
    def _process_event_file(processor, relative_path):
        """Run ``processor`` on a blob built from the given sample file.

        The blob carries the file's raw bytes under "CHData" and, under
        "CHPrefix", an object whose ``tag`` attribute is "IO_EVT".
        """
        with open(data_path(relative_path), "rb") as fobj:
            raw = fobj.read()
        # A lambda is used only as a cheap object that accepts attributes.
        blob = {"CHData": raw, "CHPrefix": lambda x: x}
        blob["CHPrefix"].tag = "IO_EVT"
        return processor(blob)

    def test_events(self):
        result = self._process_event_file(DAQProcessor(), "daq/IO_EVT.dat")

        assert "Hits" in result
        hits = result["Hits"]
        assert 182 == sum(hits.triggered)
        assert 239 == len(hits)

    def test_events_legacy(self):
        result = self._process_event_file(
            DAQProcessor(legacy=True), "daq/IO_EVT_legacy.dat"
        )

        assert "Hits" in result
        hits = result["Hits"]
        assert 13 == sum(hits.triggered)
        assert 28 == len(hits)
class TestTMCHRepump(TestCase):
    """Exercise TMCHRepump on the ``daq/IO_MONIT.dat`` sample file."""

    def _assert_packet(self, packet, utc_seconds, nanoseconds):
        """Check one packet; only the two timestamp fields vary per packet."""
        assert 86 == packet.run
        assert 0 == packet.udp_sequence_number
        assert utc_seconds == packet.utc_seconds
        assert nanoseconds == packet.nanoseconds
        assert 806472270 == packet.dom_id
        assert 2 == packet.version
        self.assertAlmostEqual(199.05982971191406, packet.yaw)
        self.assertAlmostEqual(0.5397617816925049, packet.pitch)
        self.assertAlmostEqual(-0.2243121862411499, packet.roll)
        self.assertAlmostEqual(32.35, packet.temp)
        self.assertAlmostEqual(16.77, packet.humidity)
        assert np.allclose(np.full(31, 0), packet.pmt_rates)
        assert np.allclose([0.00708725, 0.00213623, -0.86456668], packet.A)
        assert np.allclose([-0.2621212, 0.02363636, 0.1430303], packet.H)
        assert np.allclose([-2.87721825, -0.83284622, -0.28969574], packet.G)

    def test_reading_version_2(self):
        repump = TMCHRepump(filename=data_path("daq/IO_MONIT.dat"))
        packets = [blob["TMCHData"] for blob in repump]

        # Both packets are looked up before any assertion runs.
        first_packet = packets[0]
        sixth_packet = packets[5]

        self._assert_packet(first_packet, utc_seconds=541, nanoseconds=500000000)
        self._assert_packet(sixth_packet, utc_seconds=542, nanoseconds=0)
class TestDMMonitor(TestCase):
    """Checks on DMMonitor's URL and its ``available_parameters`` attribute."""

    def test_init(self):
        # Constructing with host "a" must yield this exact monitoring URL.
        monitor = DMMonitor("a")
        assert "http://a:1302/mon/" == monitor._url

    def test_available_parameters(self):
        # A pre-filled private list must be returned by the public attribute.
        monitor = DMMonitor("a")
        monitor._available_parameters = ["b", "c"]
        self.assertListEqual(["b", "c"], monitor.available_parameters)
class TestTimesliceParserLegacy(TestCase):
    """Parse the legacy L0/L1/L2 timeslice sample files."""

    def test_l0(self):
        with open(data_path("daq/IO_TSL0_legacy.dat"), "rb") as stream:
            parser = TimesliceParser(legacy=True)
            # The middle element (frame infos) is not inspected here.
            info, _, hits = parser._parse_timeslice(stream)
            assert 200 == len(hits)
            assert 25 == hits[0].tot
            assert np.all(hits.channel_id < 31)
            assert 808447031 == hits[23].dom_id
            assert 232 == info[0].frame_index

    def test_l1(self):
        with open(data_path("daq/IO_TSL1_legacy.dat"), "rb") as stream:
            parser = TimesliceParser(legacy=True)
            info, _, hits = parser._parse_timeslice(stream)
            assert 0 == len(hits)
            assert 4873 == info[0].frame_index

    def test_l2(self):
        with open(data_path("daq/IO_TSL2_legacy.dat"), "rb") as stream:
            parser = TimesliceParser(legacy=True)
            info, _, hits = parser._parse_timeslice(stream)
            assert 0 == len(hits)
            assert 4872 == info[0].frame_index
class TestTimesliceParser(TestCase):
    """Parse consecutive timeslices from the ``daq/IO_TSSN.dat`` sample file."""

    def test_sn(self):
        # The file is read three times in a row from the same handle, each
        # time with a fresh parser. The context manager guarantees the handle
        # is closed even when one of the assertions fails.
        with open(data_path("daq/IO_TSSN.dat"), "rb") as fobj:
            ts_info, ts_frameinfos, ts_hits = TimesliceParser()._parse_timeslice(fobj)
            assert 0 == len(ts_hits)
            assert 198 == ts_info[0].frame_index

            ts_info, ts_frameinfos, ts_hits = TimesliceParser()._parse_timeslice(fobj)
            assert 4 == len(ts_hits)
            assert np.all(ts_hits.channel_id < 31)
            assert 218 == ts_info[0].frame_index

            ts_info, ts_frameinfos, ts_hits = TimesliceParser()._parse_timeslice(fobj)
            assert 54 == len(ts_hits)
            assert np.all(ts_hits.channel_id < 31)
            assert 238 == ts_info[0].frame_index