Coverage for src/km3modules/tests/test_io.py: 100%
126 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-03-29 03:15 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-03-29 03:15 +0000
1#!/usr/bin/env python3
2import tempfile
3import unittest
5from km3net_testdata import data_path
7import km3pipe as kp
8import km3modules as km
9import numpy as np
10import km3io
11import awkward as ak
14class TestOfflineHeaderTabulator(unittest.TestCase):
15 def test_module(self):
16 outfile = tempfile.NamedTemporaryFile(delete=True)
18 pipe = kp.Pipeline()
19 pipe.attach(kp.io.OfflinePump, filename=data_path("offline/numucc.root"))
20 pipe.attach(km.io.OfflineHeaderTabulator)
21 pipe.attach(kp.io.HDF5Sink, filename=outfile.name)
22 pipe.drain()
24 pipe = kp.Pipeline()
25 pipe.attach(kp.io.HDF5Pump, filename=outfile.name)
26 pipe.attach(km.common.Observer, count=10, required_keys=["RawHeader"])
27 pipe.drain()
30class TestEventInfoTabulator(unittest.TestCase):
31 def test_module(self):
32 outfile = tempfile.NamedTemporaryFile(delete=True)
34 pipe = kp.Pipeline()
35 pipe.attach(
36 kp.io.OfflinePump,
37 filename=data_path(
38 "offline/mcv6.0.gsg_muon_highE-CC_50-500GeV.km3sim.jterbr00008357.jorcarec.aanet.905.root"
39 ),
40 )
41 pipe.attach(km.io.EventInfoTabulator)
42 pipe.attach(kp.io.HDF5Sink, filename=outfile.name)
43 pipe.drain(10)
45 pipe = kp.Pipeline()
46 pipe.attach(kp.io.HDF5Pump, filename=outfile.name)
47 pipe.attach(km.common.Observer, count=10, required_keys=["EventInfo"])
48 pipe.attach(CheckW2listContents)
49 pipe.drain()
52class TestHitsTabulator(unittest.TestCase):
53 def test_offline_hits(self):
54 outfile = tempfile.NamedTemporaryFile(delete=True)
56 pipe = kp.Pipeline()
57 pipe.attach(kp.io.OfflinePump, filename=data_path("offline/numucc.root"))
58 pipe.attach(km.io.HitsTabulator, kind="offline")
59 pipe.attach(kp.io.HDF5Sink, filename=outfile.name)
60 pipe.drain()
62 pipe = kp.Pipeline()
63 pipe.attach(kp.io.HDF5Pump, filename=outfile.name)
64 pipe.attach(km.common.Observer, count=10, required_keys=["Hits"])
65 pipe.drain()
67 def test_mc_hits(self):
68 outfile = tempfile.NamedTemporaryFile(delete=True)
70 pipe = kp.Pipeline()
71 pipe.attach(kp.io.OfflinePump, filename=data_path("offline/numucc.root"))
72 pipe.attach(km.io.HitsTabulator, kind="mc")
73 pipe.attach(kp.io.HDF5Sink, filename=outfile.name)
74 pipe.drain()
76 pipe = kp.Pipeline()
77 pipe.attach(kp.io.HDF5Pump, filename=outfile.name)
78 pipe.attach(km.common.Observer, count=10, required_keys=["McHits"])
79 pipe.drain()
82class TestMCTracksTabulator(unittest.TestCase):
83 def test_module(self):
84 outfile = tempfile.NamedTemporaryFile(delete=True)
86 pipe = kp.Pipeline()
87 pipe.attach(kp.io.OfflinePump, filename=data_path("offline/numucc.root"))
88 pipe.attach(km.io.MCTracksTabulator)
89 pipe.attach(kp.io.HDF5Sink, filename=outfile.name)
90 pipe.drain()
92 pipe = kp.Pipeline()
93 pipe.attach(kp.io.HDF5Pump, filename=outfile.name)
94 pipe.attach(km.common.Observer, count=10, required_keys=["McTracks"])
95 pipe.drain()
98class TestRecoTracksTabulator(unittest.TestCase):
99 def test_module(self):
100 outfile = tempfile.NamedTemporaryFile(delete=True)
102 pipe = kp.Pipeline()
103 pipe.attach(
104 kp.io.OfflinePump,
105 filename=data_path(
106 "offline/mcv6.0.gsg_muon_highE-CC_50-500GeV.km3sim.jterbr00008357.jorcarec.aanet.905.root"
107 ),
108 )
109 pipe.attach(km.io.RecoTracksTabulator, best_tracks=True)
110 pipe.attach(kp.io.HDF5Sink, filename=outfile.name)
111 pipe.drain(5)
113 pipe = kp.Pipeline()
114 pipe.attach(kp.io.HDF5Pump, filename=outfile.name)
115 pipe.attach(km.common.Observer, count=5, required_keys=["Tracks"])
116 pipe.attach(km.common.Observer, count=5, required_keys=["RecStages"])
117 pipe.attach(km.common.Observer, count=5, required_keys=["BestJmuon"])
118 pipe.attach(CheckRecoContents)
119 pipe.drain()
122class CheckRecoContents(kp.Module):
123 def configure(self):
125 # use this to count through the single events
126 self.event_idx = 0
128 # get the original file to compare to
129 filename = data_path(
130 "offline/mcv6.0.gsg_muon_highE-CC_50-500GeV.km3sim.jterbr00008357.jorcarec.aanet.905.root"
131 )
132 self.f = km3io.OfflineReader(filename)
134 def process(self, blob):
136 # first, get some extracted values form the h5 file
138 # best track
139 jmuon_dir_z = blob["BestJmuon"].dir_z # a reco parameter
140 jmuon_jgandalf_chi2 = blob["BestJmuon"].JGANDALF_CHI2 # a fitinf parameter
142 # all tracks
143 tracks_dir_z = blob["Tracks"].dir_z
144 tracks_jgandalf_chi2 = blob["Tracks"].JGANDALF_CHI2
146 # then, get the values from the original file
148 # all tracks
149 all_tracks_raw = self.f.events[self.event_idx].tracks
150 all_tracks_raw_dir_z = all_tracks_raw.dir_z
151 all_tracks_raw_fitinf = self._preprocess_fitinf(all_tracks_raw.fitinf)
152 all_tracks_raw_jgandalf_chi2 = all_tracks_raw_fitinf[
153 :, km3io.definitions.fitparameters.JGANDALF_CHI2
154 ]
156 # best tracks
157 best_tracks_raw = km3io.tools.best_jmuon(all_tracks_raw)
158 best_tracks_raw_dir_z = best_tracks_raw.dir_z
159 best_tracks_raw_jgandalf_chi2 = best_tracks_raw.fitinf[
160 km3io.definitions.fitparameters.JGANDALF_CHI2
161 ]
163 # and finally compare
164 assert np.allclose(best_tracks_raw_dir_z, jmuon_dir_z)
165 assert np.allclose(best_tracks_raw_jgandalf_chi2, jmuon_jgandalf_chi2)
167 # since an ak array wise assertion is not possible, do this element wise
168 for i in range(len(all_tracks_raw_dir_z)):
170 assert np.allclose(all_tracks_raw_dir_z[i], tracks_dir_z[i])
172 # exclude nans from the assertion as they are not detected as equal
173 if not np.isnan(all_tracks_raw_jgandalf_chi2[i]) and not np.isnan(
174 tracks_jgandalf_chi2[i]
175 ):
176 assert np.allclose(
177 all_tracks_raw_jgandalf_chi2[i], tracks_jgandalf_chi2[i]
178 )
180 self.event_idx += 1
182 return blob
184 def _preprocess_fitinf(self, fitinf):
185 # preprocess the fitinf a bit - yay!
186 n_columns = max(km3io.definitions.fitparameters.values()) + 1
187 fitinf_array = np.ma.filled(
188 ak.to_numpy(ak.pad_none(fitinf, target=n_columns, axis=-1)),
189 fill_value=np.nan,
190 ).astype("float32")
191 return fitinf_array
194class CheckW2listContents(kp.Module):
195 def configure(self):
197 # use this to count through the single events
198 self.event_idx = 0
200 # get the original file to compare to
201 filename = data_path(
202 "offline/mcv6.0.gsg_muon_highE-CC_50-500GeV.km3sim.jterbr00008357.jorcarec.aanet.905.root"
203 )
204 self.f = km3io.OfflineReader(filename)
206 def process(self, blob):
208 # extracted values
209 by = blob["EventInfo"].W2LIST_GSEAGEN_BY[0]
210 cc = blob["EventInfo"].W2LIST_GSEAGEN_CC[0]
212 # original values
213 original_by = self.f.events[self.event_idx].w2list[
214 km3io.definitions.w2list_gseagen["W2LIST_GSEAGEN_BY"]
215 ]
216 original_cc = self.f.events[self.event_idx].w2list[
217 km3io.definitions.w2list_gseagen["W2LIST_GSEAGEN_CC"]
218 ]
220 # and compare
221 assert by == original_by
222 assert cc == original_cc
224 self.event_idx += 1
226 return blob