Coverage for src/km3pipe/physics.py: 82%
90 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 03:14 +0000
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 03:14 +0000
1# Filename: physics.py
2# pylint: disable=locally-disabled
3"""
4Cherenkov photon parameters.
6"""
8import km3io
9import numba
10import numpy as np
12import awkward as ak
13from numba import njit
15from thepipe import Module
16from km3pipe.dataclasses import Table
17from km3pipe.hardware import Detector
18from km3pipe.logger import get_logger
19from km3pipe.constants import (
20 C_WATER,
21 SIN_CHERENKOV,
22 TAN_CHERENKOV,
23 V_LIGHT_WATER,
24 C_LIGHT,
25)
26import km3pipe.math
27import km3pipe.extras
29__author__ = "Zineb ALY"
30__copyright__ = "Copyright 2020, Tamas Gal and the KM3NeT collaboration."
31__credits__ = []
32__license__ = "MIT"
33__maintainer__ = "Zineb ALY"
34__email__ = "zaly@km3net.de"
35__status__ = "Development"
37log = get_logger(__name__)
40def cherenkov(calib_hits, track):
41 """Compute parameters of Cherenkov photons emitted from a track and hitting a PMT.
42 calib_hits is the table of calibrated hits of the track.
44 Parameters
45 ----------
46 calib_hits : kp Table or a DataFrame or a numpy recarray or a dict.
47 Table of calibrated hits with the following parameters:
48 - pos_x.
49 - pos_y.
50 - pos_z.
51 - dir_x.
52 - dir_y.
53 - dir_z.
54 track : awkward.Array, DataFrame, numpy.recarray, km3pipe.Table or a dict.
55 One track with the following parameters:
56 - pos_x.
57 - pos_y.
58 - pos_z.
59 - dir_x.
60 - dir_y.
61 - dir_z.
62 - t.
64 Returns
65 -------
66 ndarray
67 a structured array of the physics parameters of Cherenkov photons:
68 - d_photon_closest: the closest distance between the PMT and the track
69 (it is perpendicular to the track).
70 - d_photon: distance traveled by the photon from the track to the PMT.
71 - d_track: distance along the track where the photon was emitted.
72 - t_photon: time of photon travel in [s].
73 - cos_photon_PMT: cos angle of impact of photon with respect to the PMT direction:
74 - dir_x_photon, dir_y_photon, dir_z_photon: photon directions.
75 """
77 pd = km3pipe.extras.pandas()
79 if isinstance(calib_hits, (dict, pd.DataFrame, np.ndarray, Table, ak.Record)):
80 calib_pos = np.array(
81 [calib_hits["pos_x"], calib_hits["pos_y"], calib_hits["pos_z"]]
82 ).T
83 calib_dir = np.array(
84 [calib_hits["dir_x"], calib_hits["dir_y"], calib_hits["dir_z"]]
85 ).T
87 if isinstance(track, (dict, pd.core.series.Series, pd.DataFrame)):
88 track_pos = np.array([track["pos_x"], track["pos_y"], track["pos_z"]]).T
89 track_dir = np.array([track["dir_x"], track["dir_y"], track["dir_z"]]).T
90 track_t = track["t"]
92 if isinstance(track, (Table, np.ndarray)):
93 track_pos = np.array([track["pos_x"], track["pos_y"], track["pos_z"]]).reshape(
94 3,
95 )
96 track_dir = np.array([track["dir_x"], track["dir_y"], track["dir_z"]]).reshape(
97 3,
98 )
99 track_t = track["t"][0]
101 if isinstance(track, (ak.Array, ak.Record, km3io.rootio.Branch)):
102 track_pos = np.array([track.pos_x, track.pos_y, track.pos_z]).T
103 track_dir = np.array([track.dir_x, track.dir_y, track.dir_z]).T
104 track_t = track.t
106 out = _cherenkov(calib_pos, calib_dir, track_pos, track_dir, track_t)
108 return out.view(
109 dtype=[
110 ("d_photon_closest", "<f8"),
111 ("d_photon", "<f8"),
112 ("d_track", "<f8"),
113 ("t_photon", "<f8"),
114 ("cos_photon_PMT", "<f8"),
115 ("dir_x_photon", "<f8"),
116 ("dir_y_photon", "<f8"),
117 ("dir_z_photon", "<f8"),
118 ]
119 ).reshape(
120 len(out),
121 )
124@njit
125def _cherenkov(calib_pos, calib_dir, track_pos, track_dir, track_t):
126 """calculate Cherenkov photons parameters"""
127 rows = len(calib_pos)
128 out = np.zeros((rows, 8))
130 for i in range(rows):
131 # (vector PMT) - (vector track position)
132 V = calib_pos[i] - track_pos
133 L = np.sum(V * track_dir)
134 out[i, 0] = np.sqrt(np.sum(V * V) - L * L) # d_photon_closest
135 out[i, 1] = out[i][0] / SIN_CHERENKOV # d_photon
136 out[i, 2] = L - out[i][0] / TAN_CHERENKOV # d_track
137 out[i, 3] = (
138 track_t + out[i][2] / C_LIGHT + out[i][1] / V_LIGHT_WATER
139 ) # t_photon
140 V_photon = V - (out[i][2] * track_dir) # photon position
141 norm = np.sqrt(np.sum(V_photon * V_photon))
142 out[i, 5:8] = V_photon / norm # photon direction
144 # cos angle of impact of photon with respect to the PMT direction
145 out[i, 4] = np.sum(out[i][5:8] * calib_dir[i])
146 return out
149def _get_closest(track_pos, track_dir, meanDU_pos, meanDU_dir):
150 """calculate the distance of closest approach"""
151 # direction track to the mean DU, assumes vertical DU:
152 cross = np.cross(track_dir, meanDU_dir)
153 dir_to_DU = cross / np.linalg.norm(cross, axis=0) # normalized vector
155 # vector track position to mean DU position, perpendicular to track.
156 V = track_pos - meanDU_pos
157 d_closest = abs(np.dot(V, dir_to_DU))
159 # find z-coordinate of the point of closest approach
160 # 1) - vector track position to mean DU position, assuming vertical DU.
161 V[2] = 0.0
162 V_norm = np.linalg.norm(V, axis=0)
164 # 2) distance from the track origin to point of closest approach along the track
165 d = np.sqrt(V_norm**2 - d_closest**2)
166 # norm of the track direction along (x,y)
167 xy_norm = np.linalg.norm(track_dir[0:2], axis=0)
169 # 3) track_pos_Z + distance (along z axis) to closest approach along the track
170 z_closest = track_pos[2] + d * (track_dir[2] / xy_norm)
172 return d_closest, z_closest
175def get_closest(track, du_pos):
176 """calculate the distance of closest approach (d_closest) and its coordinate along the z axis (z_closest).
177 These calculations aLWAYS assume vertical DU.
179 Parameters
180 ----------
181 track : awkward.Array, DataFrame, numpy.recarray, km3pipe.Table or a dict.
182 One track with the following parameters:
183 - pos_x.
184 - pos_y.
185 - pos_z.
186 - dir_x.
187 - dir_y.
188 - dir_z.
189 du_pos : a DataFrame or a numpy recarray or a km3pipe Table or a dict.
190 du_pos vector with the following information:
191 - pos_x.
192 - pos_y.
193 - pos_z.
195 Returns
196 -------
197 tuple
198 (d_closest, z_closest).
199 """
200 pd = km3pipe.extras.pandas()
202 if isinstance(
203 du_pos, (dict, pd.core.series.Series, pd.DataFrame, Table, np.ndarray)
204 ):
205 meanDU_pos = np.array(
206 [du_pos["pos_x"], du_pos["pos_y"], du_pos["pos_z"]]
207 ).reshape(
208 3,
209 )
210 meanDU_dir = np.array([0, 0, 1]) # assumes vertical DU
212 if isinstance(
213 track, (dict, pd.core.series.Series, pd.DataFrame, Table, np.ndarray)
214 ):
215 track_pos = np.array([track["pos_x"], track["pos_y"], track["pos_z"]]).reshape(
216 3,
217 )
218 track_dir = np.array([track["dir_x"], track["dir_y"], track["dir_z"]]).reshape(
219 3,
220 )
222 if isinstance(track, ak.Array):
223 track_pos = np.array([track.pos_x, track.pos_y, track.pos_z])
224 track_dir = np.array([track.dir_x, track.dir_y, track.dir_z])
226 return _get_closest(track_pos, track_dir, meanDU_pos, meanDU_dir)
229def cut4d(point4d, tmin, tmax, rmin, rmax, items, c_water=C_WATER):
230 """Select items with a certain time residual and
231 within a certain radius around a given 4D point.
233 Parameters
234 -----------
235 point4d: array of shape(x, y, z, t)
236 central point of the selection
237 tmin: float
238 minimum tres of the sphere selection
239 tmax: float
240 maximum tres of the sphere selection
241 rmin: float
242 minimum radius of the sphere selection
243 rmax: float
244 maximum radius of the sphere selection
245 items: iterable with pos_[xyz]-attributed items
246 the items to cut on
248 Returns
249 --------
250 iterable with pos_[xyz]-attributed items
251 items which survived the cut.
252 """
253 point_array = point4d
255 if all(hasattr(point4d, "pos_" + q) for q in "xyz"):
256 point_array = np.array(
257 [point4d[0].pos_x, point4d[0].pos_y, point4d[0].pos_z, point4d[0].t]
258 )
260 dt = items.time - point_array[3]
262 items_pos = np.array([items.pos_x, items.pos_y, items.pos_z]).T
264 distances = km3pipe.math.dist(
265 np.array([point_array[0], point_array[1], point_array[2]]), items_pos, axis=1
266 )
268 tres = dt - (distances / C_WATER)
270 mask = (tres >= tmin) & (tres <= tmax) & (distances >= rmin) & (distances <= rmax)
271 selected_items = items[mask]
273 return selected_items