Coverage for src/km3pipe/hardware.py: 92%
355 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 03:14 +0000
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 03:14 +0000
1# Filename: hardware.py
2# pylint: disable=locally-disabled
3"""
4Classes representing KM3NeT hardware.
6"""
7from collections import OrderedDict, defaultdict
8from io import StringIO
10import km3db
11import numpy as np
13from .dataclasses import Table
14from .tools import unpack_nfirst, split
15from .math import intersect_3d, qrot_yaw # , ignored
17from .logger import get_logger, get_printer
19log = get_logger(__name__) # pylint: disable=C0103
21__author__ = "Tamas Gal"
22__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration."
23__credits__ = []
24__license__ = "MIT"
25__maintainer__ = "Tamas Gal"
26__email__ = "tgal@km3net.de"
27__status__ = "Development"
30class Detector(object):
31 """A KM3NeT detector.
33 Parameters
34 ----------
35 filename: str, optional
36 Name of .detx file with detector definition.
37 det_id: int, optional
38 .detx ID of detector (when retrieving from database).
39 t0set: optional
40 t0set (when retrieving from database).
41 """
43 max_supported_version = 5
45 def __init__(self, filename=None, det_id=None, t0set=0, string=None):
46 self._det_file = None
47 self.det_id = None
48 self.n_doms = None
49 self.dus = []
50 self.n_pmts_per_dom = None
51 self.doms = OrderedDict()
52 self.pmts = None # a Table
53 self.version = None
54 self.valid_from = None
55 self.valid_until = None
56 self.utm_info = None
57 self._comments = []
58 self._dom_ids = []
59 self._pmt_index_by_omkey = OrderedDict()
60 self._pmt_index_by_pmt_id = OrderedDict()
61 self._current_du = None
62 self._com = None
64 self._dom_positions = None
65 self._dom_table = None
66 self._pmt_angles = None
67 self._xy_positions = None
68 self.reset_caches()
70 self.cprint = get_printer(self.__class__.__name__)
72 if string:
73 self._init_from_string(string)
75 if filename:
76 self._init_from_file(filename)
78 if det_id is not None:
79 self.cprint(
80 "Retrieving DETX with detector ID {0} "
81 "from the database...".format(det_id)
82 )
83 detx = km3db.tools.detx(det_id, tcal=t0set)
84 self._det_file = StringIO(detx)
85 self._parse()
86 if self.n_doms < 1:
87 log.error("No data found for detector ID %s." % det_id)
89 def _init_from_string(self, string):
90 # TODO this is ugly, refactor me please
91 self._det_file = StringIO(string)
92 self._extract_comments()
93 self._parse()
94 self._det_file.close()
96 def _init_from_file(self, filename):
97 """Create detector from detx file."""
98 if not filename.endswith("detx"):
99 raise NotImplementedError("Only the detx format is supported.")
100 self._open_file(filename)
101 self._extract_comments()
102 self._parse()
103 self._det_file.close()
105 def _open_file(self, filename):
106 """Create the file handler"""
107 self._det_file = open(filename, "r")
109 def _readline(self, ignore_comments=True):
110 """The next line of the DETX file, optionally ignores comments"""
111 while True:
112 line = self._det_file.readline()
113 if line == "":
114 return line # To conform the EOF behaviour of .readline()
115 line = line.strip()
116 if line == "":
117 continue # white-space-only line
118 if line.startswith("#"):
119 if not ignore_comments:
120 return line
121 else:
122 return line
124 def _extract_comments(self):
125 """Retrieve all comments from the file"""
126 self._det_file.seek(0, 0)
127 for line in self._det_file.readlines():
128 line = line.strip()
129 if line.startswith("#"):
130 self.add_comment(line[1:])
132 def _parse_header(self):
133 """Extract information from the header of the detector file"""
134 self.cprint("Parsing the DETX header")
135 self._det_file.seek(0, 0)
136 first_line = self._readline()
137 try: # backward compatibility workaround
138 self.det_id, self.n_doms = split(first_line, int)
139 self.version = 1
140 except ValueError:
141 det_id, self.version = first_line.split()
142 self.det_id = int(det_id)
143 self.version = int(self.version.lower().split("v")[1])
144 if self.version > self.max_supported_version:
145 raise NotImplementedError(
146 "DETX version {} not supported yet".format(self.version)
147 )
148 validity = self._readline().strip()
149 self.valid_from, self.valid_until = split(validity, float)
150 raw_utm_info = self._readline().strip().split()
151 try:
152 self.utm_info = UTMInfo(*raw_utm_info[1:])
153 except TypeError:
154 log.warning("Missing UTM information.")
155 n_doms = self._readline()
156 self.n_doms = int(n_doms)
158 # pylint: disable=C0103
159 def _parse(self):
160 """Extract dom information from detector file"""
161 self._parse_header()
162 self.cprint("Reading PMT information...")
163 pmts = defaultdict(list)
164 pmt_index = 0
165 while True:
166 line = self._readline()
168 if line == "": # readline semantics, signaling EOF
169 self.cprint("Done.")
170 break
172 if self.version <= 3:
173 dom_id, du, floor, n_pmts = split(line, int)
174 if self.version == 4:
175 dom_id, du, floor, rest = unpack_nfirst(split(line), 3, int)
176 x, y, z, q0, qx, qy, qz, t0, rest = unpack_nfirst(rest, 8, float)
177 n_pmts, rest = unpack_nfirst(rest, 1, int)
178 if rest:
179 log.warning("Unexpected DOM values: {0}".format(rest))
180 if self.version == 5:
181 dom_id, du, floor, rest = unpack_nfirst(split(line), 3, int)
182 x, y, z, q0, qx, qy, qz, t0, rest = unpack_nfirst(rest, 8, float)
183 component_status, n_pmts, rest = unpack_nfirst(rest, 2, int)
184 if rest:
185 log.warning("Unexpected DOM values: {0}".format(rest))
187 if du != self._current_du:
188 log.debug("Next DU, resetting floor to 1.")
189 self._current_du = du
190 self.dus.append(du)
191 self._current_floor = 1
192 if du == 1 and floor == -1:
193 log.warning(
194 "Floor ID is -1 (Jpp conversion bug), "
195 "using our own floor ID!"
196 )
197 else:
198 self._current_floor += 1
200 if floor == -1:
201 log.debug("Setting floor ID to our own ID")
202 floor = self._current_floor
204 if self.version <= 3:
205 self.doms[dom_id] = (du, floor, n_pmts)
206 if self.version == 4:
207 self.doms[dom_id] = (du, floor, n_pmts, x, y, z, q0, qx, qy, qz, t0)
208 self._dom_positions[dom_id] = np.array([x, y, z])
209 if self.version == 5:
210 self.doms[dom_id] = (
211 du,
212 floor,
213 n_pmts,
214 x,
215 y,
216 z,
217 q0,
218 qx,
219 qy,
220 qz,
221 component_status,
222 t0,
223 )
224 self._dom_positions[dom_id] = np.array([x, y, z])
226 if self.n_pmts_per_dom is None:
227 self.n_pmts_per_dom = n_pmts
229 if self.n_pmts_per_dom != n_pmts:
230 log.warning(
231 "DOMs with different number of PMTs are "
232 "detected, this can cause some unexpected "
233 "behaviour."
234 )
236 for i in range(n_pmts):
237 raw_pmt_info = self._readline()
238 pmt_info = raw_pmt_info.split()
239 pmt_id, x, y, z, rest = unpack_nfirst(pmt_info, 4)
240 dx, dy, dz, t0, rest = unpack_nfirst(rest, 4)
241 pmt_id = int(pmt_id)
242 omkey = (du, floor, i)
243 pmts["pmt_id"].append(int(pmt_id))
244 pmts["pos_x"].append(float(x))
245 pmts["pos_y"].append(float(y))
246 pmts["pos_z"].append(float(z))
247 pmts["dir_x"].append(float(dx))
248 pmts["dir_y"].append(float(dy))
249 pmts["dir_z"].append(float(dz))
250 pmts["t0"].append(float(t0))
251 pmts["du"].append(int(du))
252 pmts["floor"].append(int(floor))
253 pmts["channel_id"].append(int(i))
254 pmts["dom_id"].append(int(dom_id))
255 if self.version in (3, 4, 5) and rest:
256 status, rest = unpack_nfirst(rest, 1)
257 pmts["status"].append(int(status))
258 if rest:
259 log.warning("Unexpected PMT values: {0}".format(rest))
260 self._pmt_index_by_omkey[omkey] = pmt_index
261 self._pmt_index_by_pmt_id[pmt_id] = pmt_index
262 pmt_index += 1
264 self.pmts = Table(pmts, name="PMT")
266 def reset_caches(self):
267 log.debug("Resetting caches.")
268 self._dom_positions = OrderedDict()
269 self._dom_table = None
270 self._xy_positions = []
271 self._pmt_angles = []
272 self._com = None
274 def add_comment(self, comment):
275 """Add a comment which will be prefixed with a '#'"""
276 self._comments.append(comment)
278 @property
279 def comments(self):
280 return self._comments
282 @property
283 def dom_ids(self):
284 if not self._dom_ids:
285 self._dom_ids = list(self.doms.keys())
286 return self._dom_ids
288 @property
289 def dom_positions(self):
290 """The positions of the DOMs, calculated from PMT directions."""
291 if not self._dom_positions:
292 for dom_id in self.dom_ids:
293 mask = self.pmts.dom_id == dom_id
294 pmt_pos = self.pmts[mask].pos
295 pmt_dir = self.pmts[mask].dir
296 centre = intersect_3d(pmt_pos, pmt_pos - pmt_dir * 10)
297 self._dom_positions[dom_id] = centre
298 return self._dom_positions
300 @property
301 def dom_table(self):
302 """A `Table` containing DOM attributes"""
303 if self._dom_table is None:
304 data = defaultdict(list)
305 for dom_id, (du, floor, _) in self.doms.items():
306 data["dom_id"].append(dom_id)
307 data["du"].append(du)
308 data["floor"].append(floor)
309 dom_position = self.dom_positions[dom_id]
310 data["pos_x"].append(dom_position[0])
311 data["pos_y"].append(dom_position[1])
312 data["pos_z"].append(dom_position[2])
313 self._dom_table = Table(data, name="DOMs", h5loc="/dom_table")
314 return self._dom_table
316 @property
317 def com(self):
318 """Center of mass, calculated from the mean of the PMT positions"""
319 if self._com is None:
320 self._com = np.mean(self.pmts.pos, axis=0)
321 return self._com
323 @property
324 def xy_positions(self):
325 """XY positions of the DUs, given by the DOMs on floor 1."""
326 if self._xy_positions is None or len(self._xy_positions) == 0:
327 xy_pos = []
328 for dom_id, pos in self.dom_positions.items():
329 if self.domid2floor(dom_id) == 1:
330 xy_pos.append(np.array([pos[0], pos[1]]))
331 self._xy_positions = np.array(xy_pos)
332 return self._xy_positions
334 def translate_detector(self, vector):
335 """Translate the detector by a given vector"""
336 vector = np.array(vector, dtype=float)
337 self.pmts.pos_x += vector[0]
338 self.pmts.pos_y += vector[1]
339 self.pmts.pos_z += vector[2]
340 self.reset_caches()
342 def rotate_dom_by_yaw(self, dom_id, heading, centre_point=None):
343 """Rotate a DOM by a given (yaw) heading."""
344 pmts = self.pmts[self.pmts.dom_id == dom_id]
345 if centre_point is None:
346 centre_point = self.dom_positions[dom_id]
348 for pmt in pmts:
349 pmt_pos = np.array([pmt.pos_x, pmt.pos_y, pmt.pos_z])
350 pmt_dir = np.array([pmt.dir_x, pmt.dir_y, pmt.dir_z])
351 pmt_radius = np.linalg.norm(centre_point - pmt_pos)
352 index = self._pmt_index_by_pmt_id[pmt.pmt_id]
353 pmt_ref = self.pmts[index]
355 dir_rot = qrot_yaw([pmt.dir_x, pmt.dir_y, pmt.dir_z], heading)
356 pos_rot = pmt_pos - pmt_dir * pmt_radius + dir_rot * pmt_radius
358 pmt_ref.dir_x = dir_rot[0]
359 pmt_ref.dir_y = dir_rot[1]
360 pmt_ref.dir_z = dir_rot[2]
361 pmt_ref.pos_x = pos_rot[0]
362 pmt_ref.pos_y = pos_rot[1]
363 pmt_ref.pos_z = pos_rot[2]
364 self.reset_caches()
366 def rotate_du_by_yaw(self, du, heading):
367 """Rotate all DOMs on DU by a given (yaw) heading."""
368 mask = self.pmts.du == du
369 dom_ids = np.unique(self.pmts.dom_id[mask])
370 for dom_id in dom_ids:
371 self.rotate_dom_by_yaw(dom_id, heading)
372 self.reset_caches()
374 def rescale(self, factor, origin=(0, 0, 0)):
375 """Stretch or shrink detector (DOM positions) by a given factor."""
376 pmts = self.pmts
377 for dom_id in self.dom_ids:
378 mask = pmts.dom_id == dom_id
379 pos_x = pmts[mask].pos_x
380 pos_y = pmts[mask].pos_y
381 pos_z = pmts[mask].pos_z
382 pmts.pos_x[mask] = (pos_x - origin[0]) * factor
383 pmts.pos_y[mask] = (pos_y - origin[1]) * factor
384 pmts.pos_z[mask] = (pos_z - origin[2]) * factor
385 self.reset_caches()
387 @property
388 def pmt_angles(self):
389 """A list of PMT directions sorted by PMT channel, on DU-1, floor-1"""
390 if self._pmt_angles == []:
391 mask = (self.pmts.du == 1) & (self.pmts.floor == 1)
392 self._pmt_angles = self.pmts.dir[mask]
393 return self._pmt_angles
395 @property
396 def ascii(self):
397 """The ascii representation of the detector"""
398 comments = ""
399 if self.version == 3:
400 for comment in self.comments:
401 if not comment.startswith(" "):
402 comment = " " + comment
403 comments += "#" + comment + "\n"
405 if self.version == 1:
406 header = "{det.det_id} {det.n_doms}".format(det=self)
407 else:
408 header = "{det.det_id} v{det.version}".format(det=self)
409 header += "\n{0} {1}".format(self.valid_from, self.valid_until)
410 header += "\n" + str(self.utm_info) + "\n"
411 header += str(self.n_doms)
413 doms = ""
414 for dom_id, (line, floor, n_pmts) in self.doms.items():
415 doms += "{0} {1} {2} {3}\n".format(dom_id, line, floor, n_pmts)
416 for channel_id in range(n_pmts):
417 pmt_idx = self._pmt_index_by_omkey[(line, floor, channel_id)]
418 pmt = self.pmts[pmt_idx]
419 doms += " {0} {1} {2} {3} {4} {5} {6} {7}".format(
420 pmt.pmt_id,
421 pmt.pos_x,
422 pmt.pos_y,
423 pmt.pos_z,
424 pmt.dir_x,
425 pmt.dir_y,
426 pmt.dir_z,
427 pmt.t0,
428 )
429 if self.version == 3:
430 doms += " {0}".format(pmt.status)
431 doms += "\n"
432 return comments + header + "\n" + doms
434 def write(self, filename):
435 """Save detx file."""
436 with open(filename, "w") as f:
437 f.write(self.ascii)
438 self.cprint("Detector file saved as '{0}'".format(filename))
440 def pmt_with_id(self, pmt_id):
441 """Get PMT with global pmt_id"""
442 try:
443 return self.pmts[self._pmt_index_by_pmt_id[pmt_id]]
444 except KeyError:
445 raise KeyError("No PMT found for ID: {0}".format(pmt_id))
447 def get_pmt(self, dom_id, channel_id):
448 """Return PMT with DOM ID and DAQ channel ID"""
449 dom = self.doms[dom_id]
450 du = dom[0]
451 floor = dom[1]
452 pmt = self.pmts[self._pmt_index_by_omkey[(du, floor, channel_id)]]
453 return pmt
455 def pmtid2omkey(self, pmt_id):
456 return self._pmts_by_id[int(pmt_id)].omkey
458 def domid2floor(self, dom_id):
459 return self.doms[dom_id][1]
461 @property
462 def n_dus(self):
463 return len(self.dus)
465 def __str__(self):
466 return "Detector id: '{0}', n_doms: {1}, dus: {2}".format(
467 self.det_id, self.n_doms, self.dus
468 )
470 def __repr__(self):
471 return self.__str__()
474class UTMInfo(object):
475 """The UTM information"""
477 def __init__(self, ellipsoid, grid, easting, northing, z):
478 self.ellipsoid = ellipsoid
479 self.grid = grid
480 self.easting = float(easting)
481 self.northing = float(northing)
482 self.z = float(z)
484 def __str__(self):
485 return "UTM {} {} {} {} {}".format(
486 self.ellipsoid, self.grid, self.easting, self.northing, self.z
487 )
489 def __repr__(self):
490 return "UTMInfo: {}".format(self)
493class PMT(object):
494 """Represents a photomultiplier.
496 Parameters
497 ----------
498 id: int
499 pos: 3-float-tuple (x, y, z)
500 dir: 3-float-tuple (x, y, z)
501 t0: int
502 channel_id: int
503 omkey: int
504 """
506 def __init__(self, id, pos, dir, t0, channel_id, omkey):
507 self.id = id
508 self.pos = pos
509 self.dir = dir
510 self.t0 = t0
511 self.channel_id = channel_id
512 self.omkey = omkey
514 def __str__(self):
515 return "PMT id:{0}, pos: {1}, dir: dir{2}, t0: {3}, DAQ channel: {4}".format(
516 self.id, self.pos, self.dir, self.t0, self.channel_id
517 )
520# PMT DAQ channel IDs ordered from top to bottom
521ORDERED_PMT_IDS = [
522 28,
523 23,
524 22,
525 21,
526 27,
527 29,
528 20,
529 30,
530 26,
531 25,
532 19,
533 24,
534 13,
535 7,
536 1,
537 14,
538 18,
539 12,
540 6,
541 2,
542 11,
543 8,
544 0,
545 15,
546 4,
547 3,
548 5,
549 17,
550 10,
551 9,
552 16,
553]