Coverage for src/km3pipe/io/tests/test_daq.py: 100%

206 statements  

coverage.py v7.4.4, created at 2024-04-16 03:15 +0000

# Filename: test_daq.py
# pylint: disable=C0111,R0904,C0103
"""
Tests for KM3NeT binary format readout.

"""
import io
from os.path import dirname, join
import struct

import numpy as np

from km3pipe.testing import TestCase, data_path
from km3pipe.io.daq import (
    DAQPump,
    DAQPreamble,
    DAQHeader,
    DAQSummaryslice,
    DAQProcessor,
    DMMonitor,
    RePump,
    TMCHRepump,
    TimesliceParser,
)


class TestRePump(TestCase):
    def test_process(self):
        pump = RePump(filename=data_path("daq/IO_EVT.dat"))
        blob = pump.process({})
        assert 5724 == len(blob["RawBytes"])
        blob = pump.process({})
        assert 1950 == len(blob["RawBytes"])
        blob = pump.process({})
        assert 516 == len(blob["RawBytes"])

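# RePump serves the raw bytes of a DAQ dump file frame by frame under the
# "RawBytes" key, as the three calls above show. A minimal usage sketch,
# assuming a hypothetical dump file "dump.dat":
#
#     repump = RePump(filename="dump.dat")
#     blob = repump.process({})
#     raw_frame = blob["RawBytes"]  # bytes of the next frame in the file
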

class TestDAQPumpWithLegacyFiles(TestCase):
    def test_init_with_filename(self):
        DAQPump(filename=data_path("daq/IO_SUM_legacy.dat"), legacy=True)

    def test_frame_positions_in_io_sum(self):
        p = DAQPump(filename=data_path("daq/IO_SUM_legacy.dat"), legacy=True)
        assert 81 == len(p.frame_positions)
        self.assertListEqual([0, 656, 1312], p.frame_positions[:3])
        self.assertListEqual([50973, 51629, 52285], p.frame_positions[-3:])

    def test_frame_positions_in_io_evt(self):
        p = DAQPump(filename=data_path("daq/IO_EVT_legacy.dat"), legacy=True)
        assert 38 == len(p.frame_positions)
        self.assertListEqual([0, 570, 986], p.frame_positions[:3])
        self.assertListEqual([13694, 14016, 14360], p.frame_positions[-3:])

    def test_blob_in_io_sum(self):
        p = DAQPump(filename=data_path("daq/IO_SUM_legacy.dat"), legacy=True)
        blob = p.next_blob()
        assert "DAQSummaryslice" in blob.keys()
        assert "DAQPreamble" in blob.keys()
        assert "DAQHeader" in blob.keys()
        assert 16 == blob["DAQSummaryslice"].n_summary_frames

    def test_blob_in_io_evt(self):
        p = DAQPump(filename=data_path("daq/IO_EVT_legacy.dat"), legacy=True)
        blob = p.next_blob()
        assert "DAQEvent" in blob.keys()
        assert "DAQPreamble" in blob.keys()
        assert "DAQHeader" in blob.keys()
        event = blob["DAQEvent"]
        assert 13 == event.n_triggered_hits
        assert 28 == event.n_snapshot_hits

    def test_blob_iteration(self):
        p = DAQPump(filename=data_path("daq/IO_EVT_legacy.dat"), legacy=True)
        for blob in p:
            pass

    def test_get_item(self):
        p = DAQPump(filename=data_path("daq/IO_EVT_legacy.dat"), legacy=True)
        blob = p[4]
        event = blob["DAQEvent"]
        assert 6 == event.n_triggered_hits
        assert 17 == event.n_snapshot_hits


class TestDAQPump(TestCase):
    def test_init_with_filename(self):
        DAQPump(filename=data_path("daq/IO_SUM.dat"))

    def test_frame_positions_in_io_sum(self):
        p = DAQPump(filename=data_path("daq/IO_SUM.dat"))
        assert 3 == len(p.frame_positions)
        self.assertListEqual([0, 1849, 5678], p.frame_positions)

    def test_frame_positions_in_io_evt(self):
        p = DAQPump(filename=data_path("daq/IO_EVT.dat"))
        assert 3 == len(p.frame_positions)
        self.assertListEqual([0, 5724, 7674], p.frame_positions)

    def test_blob_in_io_sum(self):
        p = DAQPump(filename=data_path("daq/IO_SUM.dat"))
        blob = p.next_blob()
        assert "DAQSummaryslice" in blob.keys()
        assert "DAQPreamble" in blob.keys()
        assert "DAQHeader" in blob.keys()
        assert 33 == blob["DAQSummaryslice"].n_summary_frames

    def test_blob_in_io_evt(self):
        p = DAQPump(filename=data_path("daq/IO_EVT.dat"))
        blob = p.next_blob()
        assert "DAQEvent" in blob.keys()
        assert "DAQPreamble" in blob.keys()
        assert "DAQHeader" in blob.keys()
        event = blob["DAQEvent"]
        assert 182 == event.n_triggered_hits
        assert 239 == event.n_snapshot_hits

    def test_blob_iteration(self):
        p = DAQPump(filename=data_path("daq/IO_EVT.dat"))
        for blob in p:
            pass

    def test_get_item(self):
        p = DAQPump(filename=data_path("daq/IO_EVT.dat"))
        blob = p[0]
        event = blob["DAQEvent"]
        assert 182 == event.n_triggered_hits
        assert 239 == event.n_snapshot_hits
        assert np.allclose(
            [806451572, 806455814, 806455814, 806483369, 806483369],
            event.triggered_hits.dom_id[:5],
        )
        assert np.allclose([23, 14, 24, 6, 11], event.triggered_hits.channel_id[:5])
        assert np.allclose(
            [40380598, 40380623, 40380551, 40380835, 40380920],
            event.triggered_hits.time[:5],
        )
        assert np.allclose([4, 4, 4, 4, 22], event.triggered_hits.trigger_mask[:5])

        assert np.allclose(
            [806451572, 806455814, 806455814, 806483369, 806483369],
            event.triggered_hits.dom_id[:5],
        )
        assert np.allclose([23, 17, 14, 24, 6], event.snapshot_hits.channel_id[:5])
        assert np.allclose(
            [40380598, 40380623, 40380551, 40380835, 40380920],
            event.triggered_hits.time[:5],
        )

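# A minimal standalone usage sketch for DAQPump, based only on the behaviour
# exercised above (random access, iteration and next_blob); "dump.dat" and
# handle_event are hypothetical:
#
#     pump = DAQPump(filename="dump.dat")
#     first = pump[0]              # blob with DAQPreamble/DAQHeader/DAQEvent
#     for blob in pump:            # iterate over all frames in the file
#         handle_event(blob.get("DAQEvent"))
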

class TestDAQProcessor(TestCase):
    def test_events(self):
        dp = DAQProcessor()

        filename = data_path("daq/IO_EVT.dat")
        with open(filename, "rb") as fobj:
            data = fobj.read()
        blob = {}
        blob["CHData"] = data
        blob["CHPrefix"] = lambda x: x  # quick and dirty thing to attach fields to
        blob["CHPrefix"].tag = "IO_EVT"

        blob = dp(blob)

        assert "Hits" in blob
        hits = blob["Hits"]
        assert 182 == sum(hits.triggered)
        assert 239 == len(hits)

    def test_events_legacy(self):
        dp = DAQProcessor(legacy=True)

        filename = data_path("daq/IO_EVT_legacy.dat")
        with open(filename, "rb") as fobj:
            data = fobj.read()
        blob = {}
        blob["CHData"] = data
        blob["CHPrefix"] = lambda x: x  # quick and dirty thing to attach fields to
        blob["CHPrefix"].tag = "IO_EVT"

        blob = dp(blob)

        assert "Hits" in blob
        hits = blob["Hits"]
        assert 13 == sum(hits.triggered)
        assert 28 == len(hits)

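# Sketch of feeding ControlHost-style data to DAQProcessor outside a pipeline,
# mirroring the tests above: the CHPrefix object only needs a ``tag``
# attribute, and SimpleNamespace is used here purely for illustration:
#
#     from types import SimpleNamespace
#     blob = {"CHData": data, "CHPrefix": SimpleNamespace(tag="IO_EVT")}
#     hits = DAQProcessor()(blob)["Hits"]
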

class TestTMCHRepump(TestCase):
    def test_reading_version_2(self):
        repump = TMCHRepump(filename=data_path("daq/IO_MONIT.dat"))
        packets = [p["TMCHData"] for p in repump]

        p1 = packets[0]
        p2 = packets[5]

        assert 86 == p1.run
        assert 0 == p1.udp_sequence_number
        assert 541 == p1.utc_seconds
        assert 500000000 == p1.nanoseconds
        assert 806472270 == p1.dom_id
        assert 2 == p1.version
        self.assertAlmostEqual(199.05982971191406, p1.yaw)
        self.assertAlmostEqual(0.5397617816925049, p1.pitch)
        self.assertAlmostEqual(-0.2243121862411499, p1.roll)
        self.assertAlmostEqual(32.35, p1.temp)
        self.assertAlmostEqual(16.77, p1.humidity)
        assert np.allclose(np.full(31, 0), p1.pmt_rates)
        assert np.allclose([0.00708725, 0.00213623, -0.86456668], p1.A)
        assert np.allclose([-0.2621212, 0.02363636, 0.1430303], p1.H)
        assert np.allclose([-2.87721825, -0.83284622, -0.28969574], p1.G)

        assert 86 == p2.run
        assert 0 == p2.udp_sequence_number
        assert 542 == p2.utc_seconds
        assert 0 == p2.nanoseconds
        assert 806472270 == p2.dom_id
        assert 2 == p2.version
        self.assertAlmostEqual(199.05982971191406, p2.yaw)
        self.assertAlmostEqual(0.5397617816925049, p2.pitch)
        self.assertAlmostEqual(-0.2243121862411499, p2.roll)
        self.assertAlmostEqual(32.35, p2.temp)
        self.assertAlmostEqual(16.77, p2.humidity)
        assert np.allclose(np.full(31, 0), p2.pmt_rates)
        assert np.allclose([0.00708725, 0.00213623, -0.86456668], p2.A)
        assert np.allclose([-0.2621212, 0.02363636, 0.1430303], p2.H)
        assert np.allclose([-2.87721825, -0.83284622, -0.28969574], p2.G)

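# TMCHRepump yields one "TMCHData" monitoring packet per blob, carrying
# slow-control fields such as run, dom_id, yaw/pitch/roll, temperature,
# humidity and PMT rates (see the assertions above). A sketch with a
# hypothetical monitoring dump:
#
#     for blob in TMCHRepump(filename="monitoring.dat"):
#         packet = blob["TMCHData"]
#         print(packet.dom_id, packet.temp)
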

class TestDMMonitor(TestCase):
    def test_init(self):
        dmm = DMMonitor("a")
        assert "http://a:1302/mon/" == dmm._url

    def test_available_parameters(self):
        dmm = DMMonitor("a")
        dmm._available_parameters = ["b", "c"]
        self.assertListEqual(["b", "c"], dmm.available_parameters)

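# DMMonitor builds its monitoring URL from the host name (port 1302 and the
# /mon/ path, as asserted above). A hypothetical session against a live
# Detector Manager host might look like:
#
#     dmm = DMMonitor("192.168.0.120")
#     print(dmm.available_parameters)
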

class TestTimesliceParserLegacy(TestCase):
    def test_l0(self):
        with open(data_path("daq/IO_TSL0_legacy.dat"), "rb") as fobj:
            ts_info, ts_frameinfos, ts_hits = TimesliceParser(
                legacy=True
            )._parse_timeslice(fobj)
        assert 200 == len(ts_hits)
        assert 25 == ts_hits[0].tot
        assert np.all(ts_hits.channel_id < 31)
        assert 808447031 == ts_hits[23].dom_id
        assert 232 == ts_info[0].frame_index

    def test_l1(self):
        with open(data_path("daq/IO_TSL1_legacy.dat"), "rb") as fobj:
            ts_info, ts_frameinfos, ts_hits = TimesliceParser(
                legacy=True
            )._parse_timeslice(fobj)
        assert 0 == len(ts_hits)
        assert 4873 == ts_info[0].frame_index

    def test_l2(self):
        with open(data_path("daq/IO_TSL2_legacy.dat"), "rb") as fobj:
            ts_info, ts_frameinfos, ts_hits = TimesliceParser(
                legacy=True
            )._parse_timeslice(fobj)
        assert 0 == len(ts_hits)
        assert 4872 == ts_info[0].frame_index


class TestTimesliceParser(TestCase):
    def test_sn(self):
        fobj = open(data_path("daq/IO_TSSN.dat"), "rb")

        ts_info, ts_frameinfos, ts_hits = TimesliceParser()._parse_timeslice(fobj)
        assert 0 == len(ts_hits)
        assert 198 == ts_info[0].frame_index

        ts_info, ts_frameinfos, ts_hits = TimesliceParser()._parse_timeslice(fobj)
        assert 4 == len(ts_hits)
        assert np.all(ts_hits.channel_id < 31)
        assert 218 == ts_info[0].frame_index

        ts_info, ts_frameinfos, ts_hits = TimesliceParser()._parse_timeslice(fobj)
        assert 54 == len(ts_hits)
        assert np.all(ts_hits.channel_id < 31)
        assert 238 == ts_info[0].frame_index

        fobj.close()
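
# _parse_timeslice consumes one timeslice from an open binary stream and
# returns (ts_info, ts_frameinfos, ts_hits); repeated calls walk through the
# file, as the three consecutive parses in test_sn above show.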