Coverage for src/km3modules/tests/test_io.py: 100%

126 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-03-29 03:15 +0000

1#!/usr/bin/env python3 

2import tempfile 

3import unittest 

4 

5from km3net_testdata import data_path 

6 

7import km3pipe as kp 

8import km3modules as km 

9import numpy as np 

10import km3io 

11import awkward as ak 

12 

13 

class TestOfflineHeaderTabulator(unittest.TestCase):
    """Round-trip test for ``km.io.OfflineHeaderTabulator`` via HDF5."""

    def test_module(self):
        """Write the tabulated header to HDF5 and read it back.

        The first pipeline pumps ``offline/numucc.root`` through the
        tabulator into an HDF5 file; the second pipeline reads that file
        and attaches an ``Observer`` configured with ``count=10`` and
        ``required_keys=["RawHeader"]``.
        """
        # The context manager guarantees that the temporary file is closed
        # (and, because of delete=True, removed) even if a pipeline raises,
        # instead of leaving the cleanup to garbage collection.
        with tempfile.NamedTemporaryFile(delete=True) as outfile:
            pipe = kp.Pipeline()
            pipe.attach(kp.io.OfflinePump, filename=data_path("offline/numucc.root"))
            pipe.attach(km.io.OfflineHeaderTabulator)
            pipe.attach(kp.io.HDF5Sink, filename=outfile.name)
            pipe.drain()

            pipe = kp.Pipeline()
            pipe.attach(kp.io.HDF5Pump, filename=outfile.name)
            pipe.attach(km.common.Observer, count=10, required_keys=["RawHeader"])
            pipe.drain()

28 

29 

class TestEventInfoTabulator(unittest.TestCase):
    """Round-trip test for ``km.io.EventInfoTabulator`` via HDF5."""

    def test_module(self):
        """Tabulate the event info of 10 events and verify it after re-reading.

        The first pipeline is drained for 10 cycles and writes the tabulated
        event info to an HDF5 file. The second pipeline reads that file,
        attaches an ``Observer`` configured with ``count=10`` and
        ``required_keys=["EventInfo"]`` and runs ``CheckW2listContents`` on
        every blob.
        """
        # The context manager guarantees that the temporary file is closed
        # (and, because of delete=True, removed) even if a pipeline raises,
        # instead of leaving the cleanup to garbage collection.
        with tempfile.NamedTemporaryFile(delete=True) as outfile:
            pipe = kp.Pipeline()
            pipe.attach(
                kp.io.OfflinePump,
                filename=data_path(
                    "offline/mcv6.0.gsg_muon_highE-CC_50-500GeV.km3sim.jterbr00008357.jorcarec.aanet.905.root"
                ),
            )
            pipe.attach(km.io.EventInfoTabulator)
            pipe.attach(kp.io.HDF5Sink, filename=outfile.name)
            pipe.drain(10)

            pipe = kp.Pipeline()
            pipe.attach(kp.io.HDF5Pump, filename=outfile.name)
            pipe.attach(km.common.Observer, count=10, required_keys=["EventInfo"])
            pipe.attach(CheckW2listContents)
            pipe.drain()

50 

51 

class TestHitsTabulator(unittest.TestCase):
    """Round-trip tests for ``km.io.HitsTabulator`` via HDF5."""

    def test_offline_hits(self):
        """Tabulate with ``kind="offline"`` and require ``Hits`` on re-read."""
        # The context manager guarantees that the temporary file is closed
        # (and, because of delete=True, removed) even if a pipeline raises,
        # instead of leaving the cleanup to garbage collection.
        with tempfile.NamedTemporaryFile(delete=True) as outfile:
            pipe = kp.Pipeline()
            pipe.attach(kp.io.OfflinePump, filename=data_path("offline/numucc.root"))
            pipe.attach(km.io.HitsTabulator, kind="offline")
            pipe.attach(kp.io.HDF5Sink, filename=outfile.name)
            pipe.drain()

            pipe = kp.Pipeline()
            pipe.attach(kp.io.HDF5Pump, filename=outfile.name)
            pipe.attach(km.common.Observer, count=10, required_keys=["Hits"])
            pipe.drain()

    def test_mc_hits(self):
        """Tabulate with ``kind="mc"`` and require ``McHits`` on re-read."""
        # Same temporary-file handling as in test_offline_hits: closed and
        # removed deterministically when the block is left.
        with tempfile.NamedTemporaryFile(delete=True) as outfile:
            pipe = kp.Pipeline()
            pipe.attach(kp.io.OfflinePump, filename=data_path("offline/numucc.root"))
            pipe.attach(km.io.HitsTabulator, kind="mc")
            pipe.attach(kp.io.HDF5Sink, filename=outfile.name)
            pipe.drain()

            pipe = kp.Pipeline()
            pipe.attach(kp.io.HDF5Pump, filename=outfile.name)
            pipe.attach(km.common.Observer, count=10, required_keys=["McHits"])
            pipe.drain()

80 

81 

class TestMCTracksTabulator(unittest.TestCase):
    """Round-trip test for ``km.io.MCTracksTabulator`` via HDF5."""

    def test_module(self):
        """Write the tabulated MC tracks to HDF5 and read them back.

        The second pipeline attaches an ``Observer`` configured with
        ``count=10`` and ``required_keys=["McTracks"]``.
        """
        # The context manager guarantees that the temporary file is closed
        # (and, because of delete=True, removed) even if a pipeline raises,
        # instead of leaving the cleanup to garbage collection.
        with tempfile.NamedTemporaryFile(delete=True) as outfile:
            pipe = kp.Pipeline()
            pipe.attach(kp.io.OfflinePump, filename=data_path("offline/numucc.root"))
            pipe.attach(km.io.MCTracksTabulator)
            pipe.attach(kp.io.HDF5Sink, filename=outfile.name)
            pipe.drain()

            pipe = kp.Pipeline()
            pipe.attach(kp.io.HDF5Pump, filename=outfile.name)
            pipe.attach(km.common.Observer, count=10, required_keys=["McTracks"])
            pipe.drain()

96 

97 

class TestRecoTracksTabulator(unittest.TestCase):
    """Round-trip test for ``km.io.RecoTracksTabulator`` via HDF5."""

    def test_module(self):
        """Tabulate the reco tracks of 5 events and verify them after re-reading.

        The first pipeline runs the tabulator with ``best_tracks=True`` for
        5 cycles. The second pipeline reads the HDF5 file, attaches one
        ``Observer`` each for the keys ``Tracks``, ``RecStages`` and
        ``BestJmuon`` (all with ``count=5``) and runs ``CheckRecoContents``
        on every blob.
        """
        # The context manager guarantees that the temporary file is closed
        # (and, because of delete=True, removed) even if a pipeline raises,
        # instead of leaving the cleanup to garbage collection.
        with tempfile.NamedTemporaryFile(delete=True) as outfile:
            pipe = kp.Pipeline()
            pipe.attach(
                kp.io.OfflinePump,
                filename=data_path(
                    "offline/mcv6.0.gsg_muon_highE-CC_50-500GeV.km3sim.jterbr00008357.jorcarec.aanet.905.root"
                ),
            )
            pipe.attach(km.io.RecoTracksTabulator, best_tracks=True)
            pipe.attach(kp.io.HDF5Sink, filename=outfile.name)
            pipe.drain(5)

            pipe = kp.Pipeline()
            pipe.attach(kp.io.HDF5Pump, filename=outfile.name)
            pipe.attach(km.common.Observer, count=5, required_keys=["Tracks"])
            pipe.attach(km.common.Observer, count=5, required_keys=["RecStages"])
            pipe.attach(km.common.Observer, count=5, required_keys=["BestJmuon"])
            pipe.attach(CheckRecoContents)
            pipe.drain()

120 

121 

class CheckRecoContents(kp.Module):
    """Compare tabulated reco tracks in each blob with the original ROOT file.

    The module opens the original offline file with ``km3io.OfflineReader``
    and, for every processed blob, compares ``dir_z`` and ``JGANDALF_CHI2``
    of the ``BestJmuon`` and ``Tracks`` entries against the values read
    directly from the event at the current running index.
    """

    def configure(self):
        """Reset the event counter and open the reference file."""
        # use this to count through the single events
        self.event_idx = 0

        # get the original file to compare to
        filename = data_path(
            "offline/mcv6.0.gsg_muon_highE-CC_50-500GeV.km3sim.jterbr00008357.jorcarec.aanet.905.root"
        )
        self.f = km3io.OfflineReader(filename)

    def process(self, blob):
        """Assert that the blob matches the reference event and return it.

        Raises
        ------
        AssertionError
            If any compared value differs from the reference file.
        """
        # first, get some extracted values from the h5 file

        # best track
        jmuon_dir_z = blob["BestJmuon"].dir_z  # a reco parameter
        jmuon_jgandalf_chi2 = blob["BestJmuon"].JGANDALF_CHI2  # a fitinf parameter

        # all tracks
        tracks_dir_z = blob["Tracks"].dir_z
        tracks_jgandalf_chi2 = blob["Tracks"].JGANDALF_CHI2

        # then, get the values from the original file

        # all tracks
        all_tracks_raw = self.f.events[self.event_idx].tracks
        all_tracks_raw_dir_z = all_tracks_raw.dir_z
        all_tracks_raw_fitinf = self._preprocess_fitinf(all_tracks_raw.fitinf)
        all_tracks_raw_jgandalf_chi2 = all_tracks_raw_fitinf[
            :, km3io.definitions.fitparameters.JGANDALF_CHI2
        ]

        # best tracks
        best_tracks_raw = km3io.tools.best_jmuon(all_tracks_raw)
        best_tracks_raw_dir_z = best_tracks_raw.dir_z
        best_tracks_raw_jgandalf_chi2 = best_tracks_raw.fitinf[
            km3io.definitions.fitparameters.JGANDALF_CHI2
        ]

        # and finally compare
        assert np.allclose(best_tracks_raw_dir_z, jmuon_dir_z)
        assert np.allclose(best_tracks_raw_jgandalf_chi2, jmuon_jgandalf_chi2)

        # since an ak array wise assertion is not possible, do this element wise
        for i, raw_dir_z in enumerate(all_tracks_raw_dir_z):
            assert np.allclose(raw_dir_z, tracks_dir_z[i])

            # equal_nan=True accepts a NaN only when BOTH sides are NaN, so a
            # value that is missing on just one side is reported as a
            # mismatch instead of being silently skipped.
            assert np.allclose(
                all_tracks_raw_jgandalf_chi2[i],
                tracks_jgandalf_chi2[i],
                equal_nan=True,
            )

        self.event_idx += 1

        return blob

    def _preprocess_fitinf(self, fitinf):
        """Return ``fitinf`` as a rectangular float32 array padded with NaN.

        Every row is padded to ``max(fitparameters.values()) + 1`` entries,
        so that each fit parameter index can be used as a column index.
        """
        n_columns = max(km3io.definitions.fitparameters.values()) + 1
        fitinf_array = np.ma.filled(
            ak.to_numpy(ak.pad_none(fitinf, target=n_columns, axis=-1)),
            fill_value=np.nan,
        ).astype("float32")
        return fitinf_array

192 

193 

class CheckW2listContents(kp.Module):
    """Compare two tabulated ``w2list`` entries with the original ROOT file.

    For every processed blob, the first ``W2LIST_GSEAGEN_BY`` and
    ``W2LIST_GSEAGEN_CC`` values of ``EventInfo`` are checked for equality
    against the ``w2list`` of the event at the current running index in the
    reference file.
    """

    def configure(self):
        """Reset the event counter and open the reference file."""
        # running index of the event that the next blob is compared with
        self.event_idx = 0

        # the file the tabulated values originally came from
        reference_path = data_path(
            "offline/mcv6.0.gsg_muon_highE-CC_50-500GeV.km3sim.jterbr00008357.jorcarec.aanet.905.root"
        )
        self.f = km3io.OfflineReader(reference_path)

    def process(self, blob):
        """Assert that the blob matches the reference event and return it."""
        # values as they arrive in the blob
        event_info = blob["EventInfo"]
        observed_by = event_info.W2LIST_GSEAGEN_BY[0]
        observed_cc = event_info.W2LIST_GSEAGEN_CC[0]

        # values as stored in the reference file
        reference_w2list = self.f.events[self.event_idx].w2list
        w2list_index = km3io.definitions.w2list_gseagen
        expected_by = reference_w2list[w2list_index["W2LIST_GSEAGEN_BY"]]
        expected_cc = reference_w2list[w2list_index["W2LIST_GSEAGEN_CC"]]

        assert observed_by == expected_by
        assert observed_cc == expected_cc

        # advance to the next reference event for the following blob
        self.event_idx += 1
        return blob