Coverage for src/km3pipe/io/tests/test_evt.py: 95%

231 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-23 03:15 +0000

1# Filename: test_evt.py 

2# pylint: disable=locally-disabled,C0111,R0904,C0301,C0103,W0212 

3 

4from io import StringIO 

5from os.path import join, dirname 

6 

7import numpy as np 

8 

9from km3pipe.testing import TestCase, skip, data_path 

10from km3pipe.io.evt import EvtPump, EVT_PARSERS 

11 

12__author__ = "Tamas Gal" 

13__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration." 

14__credits__ = [] 

15__license__ = "MIT" 

16__maintainer__ = "Tamas Gal" 

17__email__ = "tgal@km3net.de" 

18__status__ = "Development" 

19 

20 

class TestEvtPump(TestCase):
    """Exercise ``EvtPump`` against small in-memory EVT documents.

    Every test starts from a parser-less pump whose ``blob_file`` is a
    ``StringIO`` holding ``self.valid_evt_header``.  Tests that need other
    input go through ``_use_buffer`` / ``_use_pump`` so the buffer they
    replace is closed instead of being silently dropped.
    """

    def setUp(self):
        """Build the EVT text fixtures and the default pump."""
        header_lines = (
            "start_run: 1",
            "cut_nu: 0.100E+03 0.100E+09-0.100E+01 0.100E+01",
            "spectrum: -1.40",
            "physics: gSeaGen 4.1 180126 165142",
            "physics: GENIE 2.10.2 180126 165142",
            "end_event:",
        )
        # The three events are shared by the fixture with and without header.
        event_lines = (
            "start_event: 12 1",
            "track_in: 1 -389.951 213.427 516 -0.204562 -0.60399 -0.770293"
            + " 9.092 0 5 40.998",
            "hit: 1 44675 1 1170.59 5 2 1 1170.59",
            "end_event:",
            "start_event: 13 1",
            "track_in: 1 272.695 -105.613 516 -0.425451 -0.370522 -0.825654"
            + " 2431.47 0 5 -1380",
            "track_in: 2 272.348 -106.292 516 -0.425451 -0.370522 -0.825654"
            + " 24670.7 1.33 5 -1484",
            "track_in: 3 279.47 -134.999 516 -0.425451 -0.370522 -0.825654"
            + " 164.586 26.7 5 601.939",
            "hit: 1 20140 1 1140.06 5 1 1 1140.06",
            "hit: 2 20159 1 1177.14 5 1 1 1177.14",
            "hit: 3 20164 1 1178.19 5 1 1 1178.19",
            "hit: 4 20170 1 1177.23 5 1 1 1177.23",
            "hit: 5 20171 2 1177.25 5 1 2 1177.25",
            "end_event:",
            "start_event: 14 1",
            "track_in: 1 40.256 -639.888 516 0.185998 0.476123 -0.859483"
            + " 10016.1 0 5 -1668",
            "hit: 1 33788 1 2202.81 5 1 1 2202.81",
            "hit: 2 33801 1 2248.95 5 1 1 2248.95",
            "hit: 3 33814 1 2249.2 5 1 1 2249.2",
            "end_event:",
        )
        # NOTE(review): the offset assertions further down (161, 306, 773 and
        # 0, 145, 612) are character positions inside these strings, so the
        # exact whitespace in the literals matters.  Counted as written here,
        # the header block spans 159 characters rather than 161 -- whitespace
        # may have been collapsed in this rendering of the file; confirm
        # against the authoritative copy before relying on the offsets.
        self.valid_evt_header = "\n".join(header_lines + event_lines)
        self.no_evt_header = "\n".join(event_lines)
        self.corrupt_evt_header = "foo"
        self.corrupt_line = "\n".join(
            ("start_event: 1 1", "corrupt line", "end_event:")
        )

        self.pump = EvtPump(parsers=[])
        self.pump.blob_file = StringIO(self.valid_evt_header)

    def tearDown(self):
        """Close the buffer of whichever pump the test ended up with."""
        self.pump.blob_file.close()

    def _use_buffer(self, content):
        """Point the current pump at a fresh ``StringIO`` holding *content*.

        The buffer being replaced is closed first so it is not leaked.
        """
        self.pump.blob_file.close()
        self.pump.blob_file = StringIO(content)

    def _use_pump(self, parsers):
        """Replace ``self.pump`` with ``EvtPump(parsers=parsers)``.

        The new pump reads ``self.valid_evt_header``; the buffer of the pump
        being replaced is closed first so it is not leaked.
        """
        self.pump.blob_file.close()
        self.pump = EvtPump(parsers=parsers)
        self.pump.blob_file = StringIO(self.valid_evt_header)

    def test_parse_header(self):
        raw_header = self.pump.extract_header()
        self.assertEqual([["1"]], raw_header["start_run"])
        self.assertAlmostEqual(-1.4, float(raw_header["spectrum"][0][0]))
        self.assertAlmostEqual(1, float(raw_header["cut_nu"][0][2]))

    def test_header_entries_with_same_tag_are_put_in_lists(self):
        raw_header = self.pump.extract_header()
        # Lengths are integers: compare them exactly, not approximately.
        self.assertEqual(2, len(raw_header["physics"]))
        self.assertEqual(1, len(raw_header["spectrum"]))
        assert "gSeaGen" == raw_header["physics"][0][0]
        assert "GENIE" == raw_header["physics"][1][0]

    # def test_incomplete_header_raises_value_error(self):
    #     temp_file = StringIO(self.corrupt_evt_header)
    #     pump = EvtPump()
    #     pump.blob_file = temp_file
    #     with self.assertRaises(ValueError):
    #         pump.extract_header()
    #     temp_file.close()

    def test_record_offset_saves_correct_offset(self):
        self._use_buffer("a" * 42)
        offsets = [1, 4, 9, 12, 23]
        for offset in offsets:
            self.pump.blob_file.seek(0, 0)
            self.pump.blob_file.seek(offset, 0)
            self.pump._record_offset()
        self.assertListEqual(offsets, self.pump.event_offsets)

    def test_event_offset_is_at_first_event_after_parsing_header(self):
        self.pump.extract_header()
        self.assertEqual(161, self.pump.event_offsets[0])

    def test_rebuild_offsets(self):
        self.pump.extract_header()
        self.pump._cache_offsets()
        self.assertListEqual([161, 306, 773], self.pump.event_offsets)

    def test_rebuild_offsets_without_header(self):
        self._use_buffer(self.no_evt_header)
        self.pump.extract_header()
        self.pump._cache_offsets()
        self.assertListEqual([0, 145, 612], self.pump.event_offsets)

    def test_cache_enabled_triggers_rebuild_offsets(self):
        self.pump.cache_enabled = True
        self.pump.prepare_blobs()
        self.assertEqual(3, len(self.pump.event_offsets))

    def test_cache_disabled_doesnt_trigger_cache_offsets(self):
        self.pump.cache_enabled = False
        self.pump.prepare_blobs()
        self.assertEqual(1, len(self.pump.event_offsets))

    def test_get_blob_triggers_cache_offsets_if_cache_disabled(self):
        "...and asking for not indexed event"
        self.pump.cache_enabled = False
        self.pump.prepare_blobs()
        self.assertEqual(1, len(self.pump.event_offsets))
        blob = self.pump.get_blob(2)
        self.assertTupleEqual((14, 1), blob["start_event"])
        self.assertEqual(3, len(self.pump.event_offsets))

    def test_get_blob_raises_index_error_for_wrong_index(self):
        self.pump.prepare_blobs()
        with self.assertRaises(IndexError):
            self.pump.get_blob(23)

    def test_get_blob_returns_correct_event_information(self):
        self.pump.prepare_blobs()
        blob = self.pump.get_blob(0)
        self.assertIn("raw_header", blob)
        self.assertEqual([["1"]], blob["raw_header"]["start_run"])
        self.assertTupleEqual((12, 1), blob["start_event"])
        # TODO: all the other stuff like hit, track etc.
        assert "hit" in blob
        assert "track_in" in blob
        assert np.allclose(
            [[1.0, 44675.0, 1.0, 1170.59, 5.0, 2.0, 1.0, 1170.59]], blob["hit"]
        )
        blob = self.pump.get_blob(1)
        assert 5 == len(blob["hit"])
        assert np.allclose(
            [3.0, 20164.0, 1.0, 1178.19, 5.0, 1.0, 1.0, 1178.19], blob["hit"][2]
        )

    def test_get_blob_returns_correct_events(self):
        # Deliberately requested out of order (0, 2, 1).
        self.pump.prepare_blobs()
        blob = self.pump.get_blob(0)
        self.assertTupleEqual((12, 1), blob["start_event"])
        blob = self.pump.get_blob(2)
        self.assertTupleEqual((14, 1), blob["start_event"])
        blob = self.pump.get_blob(1)
        self.assertTupleEqual((13, 1), blob["start_event"])

    def test_process_returns_correct_blobs(self):
        self.pump.prepare_blobs()
        blob = self.pump.process()
        self.assertTupleEqual((12, 1), blob["start_event"])
        blob = self.pump.process()
        self.assertTupleEqual((13, 1), blob["start_event"])
        blob = self.pump.process()
        self.assertTupleEqual((14, 1), blob["start_event"])

    def test_process_raises_stop_iteration_if_eof_reached(self):
        self.pump.prepare_blobs()
        # The fixture holds three events; the fourth call must signal the end.
        self.pump.process()
        self.pump.process()
        self.pump.process()
        with self.assertRaises(StopIteration):
            self.pump.process()

    def test_pump_acts_as_iterator(self):
        self.pump.prepare_blobs()
        event_numbers = []
        for blob in self.pump:
            event_numbers.append(blob["start_event"][0])
        self.assertListEqual([12, 13, 14], event_numbers)

    def test_pump_has_len(self):
        self.pump.prepare_blobs()
        self.assertEqual(3, len(self.pump))

    def test_pump_get_item_returns_first_for_index_zero(self):
        self.pump.prepare_blobs()
        first_blob = self.pump[0]
        self.assertEqual(12, first_blob["start_event"][0])

    def test_pump_get_item_returns_correct_blob_for_index(self):
        self.pump.prepare_blobs()
        blob = self.pump[1]
        self.assertEqual(13, blob["start_event"][0])

    def test_pump_slice_generator(self):
        self.pump.prepare_blobs()
        # The full slice is only requested, never consumed: this just checks
        # that asking for it does not raise.
        blobs = self.pump[:]
        blobs = list(self.pump[1:3])
        self.assertEqual(2, len(blobs))
        self.assertEqual((13, 1), blobs[0]["start_event"])

    def test_create_blob_entry_for_line_ignores_corrupt_line(self):
        # No assertion on purpose: passing means nothing below raised.
        self._use_buffer(self.corrupt_line)
        self.pump.extract_header()
        self.pump.prepare_blobs()
        self.pump.get_blob(0)

    def test_parsers_are_ignored_if_not_valid(self):
        self._use_pump(["a", "b"])
        assert "a" not in self.pump.parsers
        assert "b" not in self.pump.parsers

    def test_parsers_are_added(self):
        self._use_pump(["km3sim"])
        assert EVT_PARSERS["km3sim"] in self.pump.parsers

    def test_custom_parser(self):
        def a_parser(blob):
            blob["foo"] = 23

        self._use_pump([a_parser])
        self.pump.extract_header()
        self.pump.prepare_blobs()
        blob = self.pump[0]

        assert 23 == blob["foo"]

    def test_auto_parser_finds_all_physics_parsers(self):
        self._use_pump("auto")
        self.pump.extract_header()
        assert EVT_PARSERS["gseagen"] in self.pump.parsers

273 

274 

class TestEvtFilePump(TestCase):
    """Smoke test for ``EvtPump`` opened on ``evt/example_numuNC.evt``."""

    def setUp(self):
        """Resolve the sample file through ``data_path``."""
        self.fname = data_path("evt/example_numuNC.evt")

    def test_pipe(self):
        # Passing means that opening the file, pulling the first blob and
        # finishing the pump all ran without raising.
        evt_pump = EvtPump(filename=self.fname)
        next(evt_pump)
        evt_pump.finish()

283 

284 

class TestCorsika(TestCase):
    """Smoke test for ``EvtPump`` opened on ``evt/example_corant_propa.evt``."""

    def setUp(self):
        """Resolve the sample file through ``data_path``."""
        self.fname = data_path("evt/example_corant_propa.evt")

    def test_pipe(self):
        # Passing means that opening the file, pulling the first blob and
        # finishing the pump all ran without raising.
        evt_pump = EvtPump(filename=self.fname)
        next(evt_pump)
        evt_pump.finish()

293 

294 

class TestPropa(TestCase):
    """Check the ``propa`` parser on ``evt/example_corant_propa.evt``.

    Every pump created here is finished in a ``finally`` clause, so the
    underlying file is released even when an assertion fails.
    """

    def setUp(self):
        """Resolve the sample path once and as a two-element list."""
        self.fname = data_path("evt/example_corant_propa.evt")
        # The same sample twice, to drive the ``filenames=`` constructor path.
        self.fnames = [
            data_path("evt/example_corant_propa.evt") for _ in range(2)
        ]

    def _assert_first_propa_blob(self, pump):
        """Assert the parser is registered and the first blob has the
        expected keys, then finish *pump* whether or not that succeeded."""
        try:
            assert EVT_PARSERS["propa"] in pump.parsers
            blob = next(pump)
            assert "start_event" in blob
            assert "track_primary" in blob
            assert "track_in" in blob
        finally:
            pump.finish()

    def test_pipe(self):
        pump = EvtPump(filename=self.fname, parsers=["propa"])
        self._assert_first_propa_blob(pump)

    def test_filenames(self):
        pump = EvtPump(filenames=self.fnames, parsers=["propa"])
        self._assert_first_propa_blob(pump)

    @skip
    def test_auto_parser(self):
        pump = EvtPump(filename=self.fname)
        try:
            assert EVT_PARSERS["propa"] in pump.parsers
            blob = next(pump)
            assert "start_event" in blob
            assert "track_primary" in blob
            assert "Muon" in blob
            assert "MuonMultiplicity" in blob
            assert "Neutrino" in blob
            assert "NeutrinoMultiplicity" in blob
            assert "Weights" in blob
            assert "Primary" in blob
        finally:
            pump.finish()

334 

335 

class TestKM3Sim(TestCase):
    """Check the ``km3sim`` parser on ``evt/KM3Sim.evt``.

    Every pump created here is finished in a ``finally`` clause, so the
    underlying file is released even when an assertion fails.
    """

    def setUp(self):
        """Resolve the sample file through ``data_path``."""
        self.fname = data_path("evt/KM3Sim.evt")

    def test_pipe(self):
        pump = EvtPump(filename=self.fname, parsers=["km3sim"])
        try:
            assert EVT_PARSERS["km3sim"] in pump.parsers
            next(pump)
        finally:
            pump.finish()

    def test_hits(self):
        pump = EvtPump(filename=self.fname, parsers=["km3sim"])
        try:
            blob = pump[0]
            hits = blob["KM3SimHits"]
            assert 4 == len(hits)
            assert 195749 == hits[0].pmt_id
        finally:
            pump.finish()

    def test_neutrino(self):
        pump = EvtPump(filename=self.fname, parsers=["gseagen", "km3sim"])
        try:
            blob = pump[0]
            # The gseagen parser is also invoked by hand on the fetched blob.
            EVT_PARSERS["gseagen"](blob)
            neutrino = blob["Neutrinos"][0]
            self.assertAlmostEqual(0.10066, neutrino.energy)
        finally:
            pump.finish()

359 

360 

class TestParserDetection(TestCase):
    """Placeholder suite for automatic parser detection."""

    def test_parsers_are_automatically_detected(self):
        """Not implemented yet: passes without verifying anything."""
        # TODO: add real assertions for automatic parser detection.