Coverage for src/km3pipe/physics.py: 82%

90 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-08 03:14 +0000

1# Filename: physics.py 

2# pylint: disable=locally-disabled 

3""" 

4Cherenkov photon parameters. 

5 

6""" 

7 

8import km3io 

9import numba 

10import numpy as np 

11 

12import awkward as ak 

13from numba import njit 

14 

15from thepipe import Module 

16from km3pipe.dataclasses import Table 

17from km3pipe.hardware import Detector 

18from km3pipe.logger import get_logger 

19from km3pipe.constants import ( 

20 C_WATER, 

21 SIN_CHERENKOV, 

22 TAN_CHERENKOV, 

23 V_LIGHT_WATER, 

24 C_LIGHT, 

25) 

26import km3pipe.math 

27import km3pipe.extras 

28 

29__author__ = "Zineb ALY" 

30__copyright__ = "Copyright 2020, Tamas Gal and the KM3NeT collaboration." 

31__credits__ = [] 

32__license__ = "MIT" 

33__maintainer__ = "Zineb ALY" 

34__email__ = "zaly@km3net.de" 

35__status__ = "Development" 

36 

37log = get_logger(__name__) 

38 

39 

40def cherenkov(calib_hits, track): 

41 """Compute parameters of Cherenkov photons emitted from a track and hitting a PMT. 

42 calib_hits is the table of calibrated hits of the track. 

43 

44 Parameters 

45 ---------- 

46 calib_hits : kp Table or a DataFrame or a numpy recarray or a dict. 

47 Table of calibrated hits with the following parameters: 

48 - pos_x. 

49 - pos_y. 

50 - pos_z. 

51 - dir_x. 

52 - dir_y. 

53 - dir_z. 

54 track : awkward.Array, DataFrame, numpy.recarray, km3pipe.Table or a dict. 

55 One track with the following parameters: 

56 - pos_x. 

57 - pos_y. 

58 - pos_z. 

59 - dir_x. 

60 - dir_y. 

61 - dir_z. 

62 - t. 

63 

64 Returns 

65 ------- 

66 ndarray 

67 a structured array of the physics parameters of Cherenkov photons: 

68 - d_photon_closest: the closest distance between the PMT and the track 

69 (it is perpendicular to the track). 

70 - d_photon: distance traveled by the photon from the track to the PMT. 

71 - d_track: distance along the track where the photon was emitted. 

72 - t_photon: time of photon travel in [s]. 

73 - cos_photon_PMT: cos angle of impact of photon with respect to the PMT direction: 

74 - dir_x_photon, dir_y_photon, dir_z_photon: photon directions. 

75 """ 

76 

77 pd = km3pipe.extras.pandas() 

78 

79 if isinstance(calib_hits, (dict, pd.DataFrame, np.ndarray, Table, ak.Record)): 

80 calib_pos = np.array( 

81 [calib_hits["pos_x"], calib_hits["pos_y"], calib_hits["pos_z"]] 

82 ).T 

83 calib_dir = np.array( 

84 [calib_hits["dir_x"], calib_hits["dir_y"], calib_hits["dir_z"]] 

85 ).T 

86 

87 if isinstance(track, (dict, pd.core.series.Series, pd.DataFrame)): 

88 track_pos = np.array([track["pos_x"], track["pos_y"], track["pos_z"]]).T 

89 track_dir = np.array([track["dir_x"], track["dir_y"], track["dir_z"]]).T 

90 track_t = track["t"] 

91 

92 if isinstance(track, (Table, np.ndarray)): 

93 track_pos = np.array([track["pos_x"], track["pos_y"], track["pos_z"]]).reshape( 

94 3, 

95 ) 

96 track_dir = np.array([track["dir_x"], track["dir_y"], track["dir_z"]]).reshape( 

97 3, 

98 ) 

99 track_t = track["t"][0] 

100 

101 if isinstance(track, (ak.Array, ak.Record, km3io.rootio.Branch)): 

102 track_pos = np.array([track.pos_x, track.pos_y, track.pos_z]).T 

103 track_dir = np.array([track.dir_x, track.dir_y, track.dir_z]).T 

104 track_t = track.t 

105 

106 out = _cherenkov(calib_pos, calib_dir, track_pos, track_dir, track_t) 

107 

108 return out.view( 

109 dtype=[ 

110 ("d_photon_closest", "<f8"), 

111 ("d_photon", "<f8"), 

112 ("d_track", "<f8"), 

113 ("t_photon", "<f8"), 

114 ("cos_photon_PMT", "<f8"), 

115 ("dir_x_photon", "<f8"), 

116 ("dir_y_photon", "<f8"), 

117 ("dir_z_photon", "<f8"), 

118 ] 

119 ).reshape( 

120 len(out), 

121 ) 

122 

123 

124@njit 

125def _cherenkov(calib_pos, calib_dir, track_pos, track_dir, track_t): 

126 """calculate Cherenkov photons parameters""" 

127 rows = len(calib_pos) 

128 out = np.zeros((rows, 8)) 

129 

130 for i in range(rows): 

131 # (vector PMT) - (vector track position) 

132 V = calib_pos[i] - track_pos 

133 L = np.sum(V * track_dir) 

134 out[i, 0] = np.sqrt(np.sum(V * V) - L * L) # d_photon_closest 

135 out[i, 1] = out[i][0] / SIN_CHERENKOV # d_photon 

136 out[i, 2] = L - out[i][0] / TAN_CHERENKOV # d_track 

137 out[i, 3] = ( 

138 track_t + out[i][2] / C_LIGHT + out[i][1] / V_LIGHT_WATER 

139 ) # t_photon 

140 V_photon = V - (out[i][2] * track_dir) # photon position 

141 norm = np.sqrt(np.sum(V_photon * V_photon)) 

142 out[i, 5:8] = V_photon / norm # photon direction 

143 

144 # cos angle of impact of photon with respect to the PMT direction 

145 out[i, 4] = np.sum(out[i][5:8] * calib_dir[i]) 

146 return out 

147 

148 

149def _get_closest(track_pos, track_dir, meanDU_pos, meanDU_dir): 

150 """calculate the distance of closest approach""" 

151 # direction track to the mean DU, assumes vertical DU: 

152 cross = np.cross(track_dir, meanDU_dir) 

153 dir_to_DU = cross / np.linalg.norm(cross, axis=0) # normalized vector 

154 

155 # vector track position to mean DU position, perpendicular to track. 

156 V = track_pos - meanDU_pos 

157 d_closest = abs(np.dot(V, dir_to_DU)) 

158 

159 # find z-coordinate of the point of closest approach 

160 # 1) - vector track position to mean DU position, assuming vertical DU. 

161 V[2] = 0.0 

162 V_norm = np.linalg.norm(V, axis=0) 

163 

164 # 2) distance from the track origin to point of closest approach along the track 

165 d = np.sqrt(V_norm**2 - d_closest**2) 

166 # norm of the track direction along (x,y) 

167 xy_norm = np.linalg.norm(track_dir[0:2], axis=0) 

168 

169 # 3) track_pos_Z + distance (along z axis) to closest approach along the track 

170 z_closest = track_pos[2] + d * (track_dir[2] / xy_norm) 

171 

172 return d_closest, z_closest 

173 

174 

175def get_closest(track, du_pos): 

176 """calculate the distance of closest approach (d_closest) and its coordinate along the z axis (z_closest). 

177 These calculations aLWAYS assume vertical DU. 

178 

179 Parameters 

180 ---------- 

181 track : awkward.Array, DataFrame, numpy.recarray, km3pipe.Table or a dict. 

182 One track with the following parameters: 

183 - pos_x. 

184 - pos_y. 

185 - pos_z. 

186 - dir_x. 

187 - dir_y. 

188 - dir_z. 

189 du_pos : a DataFrame or a numpy recarray or a km3pipe Table or a dict. 

190 du_pos vector with the following information: 

191 - pos_x. 

192 - pos_y. 

193 - pos_z. 

194 

195 Returns 

196 ------- 

197 tuple 

198 (d_closest, z_closest). 

199 """ 

200 pd = km3pipe.extras.pandas() 

201 

202 if isinstance( 

203 du_pos, (dict, pd.core.series.Series, pd.DataFrame, Table, np.ndarray) 

204 ): 

205 meanDU_pos = np.array( 

206 [du_pos["pos_x"], du_pos["pos_y"], du_pos["pos_z"]] 

207 ).reshape( 

208 3, 

209 ) 

210 meanDU_dir = np.array([0, 0, 1]) # assumes vertical DU 

211 

212 if isinstance( 

213 track, (dict, pd.core.series.Series, pd.DataFrame, Table, np.ndarray) 

214 ): 

215 track_pos = np.array([track["pos_x"], track["pos_y"], track["pos_z"]]).reshape( 

216 3, 

217 ) 

218 track_dir = np.array([track["dir_x"], track["dir_y"], track["dir_z"]]).reshape( 

219 3, 

220 ) 

221 

222 if isinstance(track, ak.Array): 

223 track_pos = np.array([track.pos_x, track.pos_y, track.pos_z]) 

224 track_dir = np.array([track.dir_x, track.dir_y, track.dir_z]) 

225 

226 return _get_closest(track_pos, track_dir, meanDU_pos, meanDU_dir) 

227 

228 

229def cut4d(point4d, tmin, tmax, rmin, rmax, items, c_water=C_WATER): 

230 """Select items with a certain time residual and 

231 within a certain radius around a given 4D point. 

232 

233 Parameters 

234 ----------- 

235 point4d: array of shape(x, y, z, t) 

236 central point of the selection 

237 tmin: float 

238 minimum tres of the sphere selection 

239 tmax: float 

240 maximum tres of the sphere selection 

241 rmin: float 

242 minimum radius of the sphere selection 

243 rmax: float 

244 maximum radius of the sphere selection 

245 items: iterable with pos_[xyz]-attributed items 

246 the items to cut on 

247 

248 Returns 

249 -------- 

250 iterable with pos_[xyz]-attributed items 

251 items which survived the cut. 

252 """ 

253 point_array = point4d 

254 

255 if all(hasattr(point4d, "pos_" + q) for q in "xyz"): 

256 point_array = np.array( 

257 [point4d[0].pos_x, point4d[0].pos_y, point4d[0].pos_z, point4d[0].t] 

258 ) 

259 

260 dt = items.time - point_array[3] 

261 

262 items_pos = np.array([items.pos_x, items.pos_y, items.pos_z]).T 

263 

264 distances = km3pipe.math.dist( 

265 np.array([point_array[0], point_array[1], point_array[2]]), items_pos, axis=1 

266 ) 

267 

268 tres = dt - (distances / C_WATER) 

269 

270 mask = (tres >= tmin) & (tres <= tmax) & (distances >= rmin) & (distances <= rmax) 

271 selected_items = items[mask] 

272 

273 return selected_items