Coverage for src/km3pipe/io/tests/test_evt.py: 95%
231 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 03:15 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 03:15 +0000
1# Filename: test_evt.py
2# pylint: disable=locally-disabled,C0111,R0904,C0301,C0103,W0212
4from io import StringIO
5from os.path import join, dirname
7import numpy as np
9from km3pipe.testing import TestCase, skip, data_path
10from km3pipe.io.evt import EvtPump, EVT_PARSERS
12__author__ = "Tamas Gal"
13__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration."
14__credits__ = []
15__license__ = "MIT"
16__maintainer__ = "Tamas Gal"
17__email__ = "tgal@km3net.de"
18__status__ = "Development"
class TestEvtPump(TestCase):
    """Exercise ``EvtPump`` against small in-memory EVT snippets.

    ``setUp`` creates a pump with an empty parser list and points its
    ``blob_file`` at a ``StringIO`` holding a run header followed by three
    events (event numbers 12, 13 and 14).
    """

    def setUp(self):
        # NOTE(review): the tests below assert absolute character offsets
        # (161/306/773 and 0/145/612) into these fixtures, so whitespace
        # inside each literal is significant. As transcribed here the header
        # block sums to 159 characters rather than 161, which suggests runs
        # of spaces were collapsed somewhere upstream -- confirm the literals
        # against the canonical file before relying on them.
        header_lines = (
            "start_run: 1",
            "cut_nu: 0.100E+03 0.100E+09-0.100E+01 0.100E+01",
            "spectrum: -1.40",
            "physics: gSeaGen 4.1 180126 165142",
            "physics: GENIE 2.10.2 180126 165142",
            "end_event:",
        )
        # The three events are shared by the "with header" and
        # "without header" fixtures so the two cannot drift apart.
        event_lines = (
            "start_event: 12 1",
            "track_in: 1 -389.951 213.427 516 -0.204562 -0.60399 -0.770293"
            + " 9.092 0 5 40.998",
            "hit: 1 44675 1 1170.59 5 2 1 1170.59",
            "end_event:",
            "start_event: 13 1",
            "track_in: 1 272.695 -105.613 516 -0.425451 -0.370522 -0.825654"
            + " 2431.47 0 5 -1380",
            "track_in: 2 272.348 -106.292 516 -0.425451 -0.370522 -0.825654"
            + " 24670.7 1.33 5 -1484",
            "track_in: 3 279.47 -134.999 516 -0.425451 -0.370522 -0.825654"
            + " 164.586 26.7 5 601.939",
            "hit: 1 20140 1 1140.06 5 1 1 1140.06",
            "hit: 2 20159 1 1177.14 5 1 1 1177.14",
            "hit: 3 20164 1 1178.19 5 1 1 1178.19",
            "hit: 4 20170 1 1177.23 5 1 1 1177.23",
            "hit: 5 20171 2 1177.25 5 1 2 1177.25",
            "end_event:",
            "start_event: 14 1",
            "track_in: 1 40.256 -639.888 516 0.185998 0.476123 -0.859483"
            + " 10016.1 0 5 -1668",
            "hit: 1 33788 1 2202.81 5 1 1 2202.81",
            "hit: 2 33801 1 2248.95 5 1 1 2248.95",
            "hit: 3 33814 1 2249.2 5 1 1 2249.2",
            "end_event:",
        )
        self.valid_evt_header = "\n".join(header_lines + event_lines)
        self.no_evt_header = "\n".join(event_lines)
        # Only referenced by the disabled test further down.
        self.corrupt_evt_header = "foo"
        self.corrupt_line = "\n".join(
            ("start_event: 1 1", "corrupt line", "end_event:")
        )

        self.pump = EvtPump(parsers=[])
        self.pump.blob_file = StringIO(self.valid_evt_header)

    def tearDown(self):
        # Closes whichever file object the pump holds at the end of the test.
        self.pump.blob_file.close()

    def test_parse_header(self):
        # Header values come back as lists of string-token lists.
        raw_header = self.pump.extract_header()
        self.assertEqual([["1"]], raw_header["start_run"])
        self.assertAlmostEqual(-1.4, float(raw_header["spectrum"][0][0]))
        self.assertAlmostEqual(1, float(raw_header["cut_nu"][0][2]))

    def test_header_entries_with_same_tag_are_put_in_lists(self):
        raw_header = self.pump.extract_header()
        # Lengths are integers, so compare them exactly.
        self.assertEqual(2, len(raw_header["physics"]))
        self.assertEqual(1, len(raw_header["spectrum"]))
        assert "gSeaGen" == raw_header["physics"][0][0]
        assert "GENIE" == raw_header["physics"][1][0]

    # def test_incomplete_header_raises_value_error(self):
    #     temp_file = StringIO(self.corrupt_evt_header)
    #     pump = EvtPump()
    #     pump.blob_file = temp_file
    #     with self.assertRaises(ValueError):
    #         pump.extract_header()
    #     temp_file.close()

    def test_record_offset_saves_correct_offset(self):
        self.pump.blob_file = StringIO("a" * 42)
        offsets = [1, 4, 9, 12, 23]
        for offset in offsets:
            # Rewind first, then move to the absolute position to record.
            self.pump.blob_file.seek(0, 0)
            self.pump.blob_file.seek(offset, 0)
            self.pump._record_offset()
        self.assertListEqual(offsets, self.pump.event_offsets)

    def test_event_offset_is_at_first_event_after_parsing_header(self):
        self.pump.extract_header()
        self.assertEqual(161, self.pump.event_offsets[0])

    def test_rebuild_offsets(self):
        self.pump.extract_header()
        self.pump._cache_offsets()
        self.assertListEqual([161, 306, 773], self.pump.event_offsets)

    def test_rebuild_offsets_without_header(self):
        self.pump.blob_file = StringIO(self.no_evt_header)
        self.pump.extract_header()
        self.pump._cache_offsets()
        self.assertListEqual([0, 145, 612], self.pump.event_offsets)

    def test_cache_enabled_triggers_rebuild_offsets(self):
        self.pump.cache_enabled = True
        self.pump.prepare_blobs()
        # All three events are indexed up front.
        self.assertEqual(3, len(self.pump.event_offsets))

    def test_cache_disabled_doesnt_trigger_cache_offsets(self):
        self.pump.cache_enabled = False
        self.pump.prepare_blobs()
        # Only one offset is known when caching is off.
        self.assertEqual(1, len(self.pump.event_offsets))

    def test_get_blob_triggers_cache_offsets_if_cache_disabled(self):
        "...and asking for not indexed event"
        self.pump.cache_enabled = False
        self.pump.prepare_blobs()
        self.assertEqual(1, len(self.pump.event_offsets))
        blob = self.pump.get_blob(2)
        self.assertTupleEqual((14, 1), blob["start_event"])
        self.assertEqual(3, len(self.pump.event_offsets))

    def test_get_blob_raises_index_error_for_wrong_index(self):
        self.pump.prepare_blobs()
        with self.assertRaises(IndexError):
            self.pump.get_blob(23)

    def test_get_blob_returns_correct_event_information(self):
        self.pump.prepare_blobs()
        blob = self.pump.get_blob(0)
        # assertIn reports both operands on failure, unlike assertTrue.
        self.assertIn("raw_header", blob)
        self.assertEqual([["1"]], blob["raw_header"]["start_run"])
        self.assertTupleEqual((12, 1), blob["start_event"])
        # TODO: all the other stuff like hit, track etc.
        assert "hit" in blob
        assert "track_in" in blob
        assert np.allclose(
            [[1.0, 44675.0, 1.0, 1170.59, 5.0, 2.0, 1.0, 1170.59]], blob["hit"]
        )
        blob = self.pump.get_blob(1)
        assert 5 == len(blob["hit"])
        assert np.allclose(
            [3.0, 20164.0, 1.0, 1178.19, 5.0, 1.0, 1.0, 1178.19], blob["hit"][2]
        )

    def test_get_blob_returns_correct_events(self):
        self.pump.prepare_blobs()
        # Deliberately requested out of order (0, 2, 1).
        blob = self.pump.get_blob(0)
        self.assertTupleEqual((12, 1), blob["start_event"])
        blob = self.pump.get_blob(2)
        self.assertTupleEqual((14, 1), blob["start_event"])
        blob = self.pump.get_blob(1)
        self.assertTupleEqual((13, 1), blob["start_event"])

    def test_process_returns_correct_blobs(self):
        self.pump.prepare_blobs()
        blob = self.pump.process()
        self.assertTupleEqual((12, 1), blob["start_event"])
        blob = self.pump.process()
        self.assertTupleEqual((13, 1), blob["start_event"])
        blob = self.pump.process()
        self.assertTupleEqual((14, 1), blob["start_event"])

    def test_process_raises_stop_iteration_if_eof_reached(self):
        self.pump.prepare_blobs()
        # Consume the three available events ...
        self.pump.process()
        self.pump.process()
        self.pump.process()
        # ... so the fourth call has nothing left to return.
        with self.assertRaises(StopIteration):
            self.pump.process()

    def test_pump_acts_as_iterator(self):
        self.pump.prepare_blobs()
        event_numbers = [blob["start_event"][0] for blob in self.pump]
        self.assertListEqual([12, 13, 14], event_numbers)

    def test_pump_has_len(self):
        self.pump.prepare_blobs()
        self.assertEqual(3, len(self.pump))

    def test_pump_get_item_returns_first_for_index_zero(self):
        self.pump.prepare_blobs()
        first_blob = self.pump[0]
        self.assertEqual(12, first_blob["start_event"][0])

    def test_pump_get_item_returns_correct_blob_for_index(self):
        self.pump.prepare_blobs()
        blob = self.pump[1]
        self.assertEqual(13, blob["start_event"][0])

    def test_pump_slice_generator(self):
        self.pump.prepare_blobs()
        # The open-ended slice is only requested, never consumed: its result
        # is overwritten on the next line.
        blobs = self.pump[:]
        blobs = list(self.pump[1:3])
        self.assertEqual(2, len(blobs))
        self.assertEqual((13, 1), blobs[0]["start_event"])

    def test_create_blob_entry_for_line_ignores_corrupt_line(self):
        # No explicit assertion: the test passes as long as nothing raises.
        self.pump.blob_file = StringIO(self.corrupt_line)
        self.pump.extract_header()
        self.pump.prepare_blobs()
        self.pump.get_blob(0)

    def test_parsers_are_ignored_if_not_valid(self):
        self.pump = EvtPump(parsers=["a", "b"])
        self.pump.blob_file = StringIO(self.valid_evt_header)
        assert "a" not in self.pump.parsers
        assert "b" not in self.pump.parsers

    def test_parsers_are_added(self):
        self.pump = EvtPump(parsers=["km3sim"])
        self.pump.blob_file = StringIO(self.valid_evt_header)
        assert EVT_PARSERS["km3sim"] in self.pump.parsers

    def test_custom_parser(self):
        def a_parser(blob):
            blob["foo"] = 23

        self.pump = EvtPump(parsers=[a_parser])
        self.pump.blob_file = StringIO(self.valid_evt_header)
        self.pump.extract_header()
        self.pump.prepare_blobs()
        blob = self.pump[0]

        # The callable passed as a parser has left its mark on the blob.
        assert 23 == blob["foo"]

    def test_auto_parser_finds_all_physics_parsers(self):
        self.pump = EvtPump(parsers="auto")
        self.pump.blob_file = StringIO(self.valid_evt_header)
        self.pump.extract_header()
        assert EVT_PARSERS["gseagen"] in self.pump.parsers
class TestEvtFilePump(TestCase):
    """Smoke test: open the ``evt/example_numuNC.evt`` sample from disk."""

    def setUp(self):
        sample = data_path("evt/example_numuNC.evt")
        self.fname = sample

    def test_pipe(self):
        # Passes if one blob can be pulled and the pump finished without
        # raising; the blob contents are not inspected.
        evt_pump = EvtPump(filename=self.fname)
        next(evt_pump)
        evt_pump.finish()
class TestCorsika(TestCase):
    """Smoke test: open the ``evt/example_corant_propa.evt`` sample."""

    def setUp(self):
        sample = data_path("evt/example_corant_propa.evt")
        self.fname = sample

    def test_pipe(self):
        # Default parsers; only checks that the first blob can be read and
        # the pump finished without an exception.
        evt_pump = EvtPump(filename=self.fname)
        next(evt_pump)
        evt_pump.finish()
class TestPropa(TestCase):
    """Read ``evt/example_corant_propa.evt`` with the ``propa`` parser."""

    def setUp(self):
        self.fname = data_path("evt/example_corant_propa.evt")
        # The same sample twice, used by the ``filenames=`` test.
        self.fnames = [self.fname] * 2

    def test_pipe(self):
        pump = EvtPump(filename=self.fname, parsers=["propa"])
        assert EVT_PARSERS["propa"] in pump.parsers
        blob = next(pump)
        assert "start_event" in blob
        assert "track_primary" in blob
        assert "track_in" in blob
        pump.finish()

    def test_filenames(self):
        # Same expectations as test_pipe, but through ``filenames=``.
        pump = EvtPump(filenames=self.fnames, parsers=["propa"])
        assert EVT_PARSERS["propa"] in pump.parsers
        blob = next(pump)
        assert "start_event" in blob
        assert "track_primary" in blob
        assert "track_in" in blob
        pump.finish()

    @skip
    def test_auto_parser(self):
        # Currently skipped: relies on the default parser selection picking
        # up "propa" without being told to.
        pump = EvtPump(filename=self.fname)
        assert EVT_PARSERS["propa"] in pump.parsers
        blob = next(pump)
        assert "start_event" in blob
        assert "track_primary" in blob
        assert "Muon" in blob
        assert "MuonMultiplicity" in blob
        assert "Neutrino" in blob
        assert "NeutrinoMultiplicity" in blob
        assert "Weights" in blob
        assert "Primary" in blob
        pump.finish()
class TestKM3Sim(TestCase):
    """Read the ``evt/KM3Sim.evt`` sample with the ``km3sim`` parser."""

    def setUp(self):
        self.fname = data_path("evt/KM3Sim.evt")

    def _open_pump(self, parsers):
        """Create a pump on the sample file and schedule its ``finish()``.

        Registering the shutdown as a cleanup means it runs whether or not
        the test's assertions pass.
        NOTE(review): relies on ``TestCase`` providing unittest's
        ``addCleanup`` -- confirm against ``km3pipe.testing``.
        """
        pump = EvtPump(filename=self.fname, parsers=parsers)
        self.addCleanup(pump.finish)
        return pump

    def test_pipe(self):
        pump = self._open_pump(["km3sim"])
        assert EVT_PARSERS["km3sim"] in pump.parsers
        next(pump)

    def test_hits(self):
        pump = self._open_pump(["km3sim"])
        blob = pump[0]
        hits = blob["KM3SimHits"]
        assert 4 == len(hits)
        assert 195749 == hits[0].pmt_id

    def test_neutrino(self):
        pump = self._open_pump(["gseagen", "km3sim"])
        blob = pump[0]
        # The gseagen parser is applied to the blob explicitly here, in
        # addition to being passed to the pump above.
        EVT_PARSERS["gseagen"](blob)
        neutrino = blob["Neutrinos"][0]
        self.assertAlmostEqual(0.10066, neutrino.energy)
class TestParserDetection(TestCase):
    """Placeholder for tests of automatic parser detection."""

    def test_parsers_are_automatically_detected(self):
        # Not implemented yet: the body is empty, so this passes vacuously.
        pass