Coverage for src/km3pipe/hardware.py: 92%

355 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-08 03:14 +0000

1# Filename: hardware.py 

2# pylint: disable=locally-disabled 

3""" 

4Classes representing KM3NeT hardware. 

5 

6""" 

7from collections import OrderedDict, defaultdict 

8from io import StringIO 

9 

10import km3db 

11import numpy as np 

12 

13from .dataclasses import Table 

14from .tools import unpack_nfirst, split 

15from .math import intersect_3d, qrot_yaw # , ignored 

16 

17from .logger import get_logger, get_printer 

18 

19log = get_logger(__name__) # pylint: disable=C0103 

20 

21__author__ = "Tamas Gal" 

22__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration." 

23__credits__ = [] 

24__license__ = "MIT" 

25__maintainer__ = "Tamas Gal" 

26__email__ = "tgal@km3net.de" 

27__status__ = "Development" 

28 

29 

30class Detector(object): 

31 """A KM3NeT detector. 

32 

33 Parameters 

34 ---------- 

35 filename: str, optional 

36 Name of .detx file with detector definition. 

37 det_id: int, optional 

38 .detx ID of detector (when retrieving from database). 

39 t0set: optional 

40 t0set (when retrieving from database). 

41 """ 

42 

43 max_supported_version = 5 

44 

45 def __init__(self, filename=None, det_id=None, t0set=0, string=None): 

46 self._det_file = None 

47 self.det_id = None 

48 self.n_doms = None 

49 self.dus = [] 

50 self.n_pmts_per_dom = None 

51 self.doms = OrderedDict() 

52 self.pmts = None # a Table 

53 self.version = None 

54 self.valid_from = None 

55 self.valid_until = None 

56 self.utm_info = None 

57 self._comments = [] 

58 self._dom_ids = [] 

59 self._pmt_index_by_omkey = OrderedDict() 

60 self._pmt_index_by_pmt_id = OrderedDict() 

61 self._current_du = None 

62 self._com = None 

63 

64 self._dom_positions = None 

65 self._dom_table = None 

66 self._pmt_angles = None 

67 self._xy_positions = None 

68 self.reset_caches() 

69 

70 self.cprint = get_printer(self.__class__.__name__) 

71 

72 if string: 

73 self._init_from_string(string) 

74 

75 if filename: 

76 self._init_from_file(filename) 

77 

78 if det_id is not None: 

79 self.cprint( 

80 "Retrieving DETX with detector ID {0} " 

81 "from the database...".format(det_id) 

82 ) 

83 detx = km3db.tools.detx(det_id, tcal=t0set) 

84 self._det_file = StringIO(detx) 

85 self._parse() 

86 if self.n_doms < 1: 

87 log.error("No data found for detector ID %s." % det_id) 

88 

89 def _init_from_string(self, string): 

90 # TODO this is ugly, refactor me please 

91 self._det_file = StringIO(string) 

92 self._extract_comments() 

93 self._parse() 

94 self._det_file.close() 

95 

96 def _init_from_file(self, filename): 

97 """Create detector from detx file.""" 

98 if not filename.endswith("detx"): 

99 raise NotImplementedError("Only the detx format is supported.") 

100 self._open_file(filename) 

101 self._extract_comments() 

102 self._parse() 

103 self._det_file.close() 

104 

105 def _open_file(self, filename): 

106 """Create the file handler""" 

107 self._det_file = open(filename, "r") 

108 

109 def _readline(self, ignore_comments=True): 

110 """The next line of the DETX file, optionally ignores comments""" 

111 while True: 

112 line = self._det_file.readline() 

113 if line == "": 

114 return line # To conform the EOF behaviour of .readline() 

115 line = line.strip() 

116 if line == "": 

117 continue # white-space-only line 

118 if line.startswith("#"): 

119 if not ignore_comments: 

120 return line 

121 else: 

122 return line 

123 

124 def _extract_comments(self): 

125 """Retrieve all comments from the file""" 

126 self._det_file.seek(0, 0) 

127 for line in self._det_file.readlines(): 

128 line = line.strip() 

129 if line.startswith("#"): 

130 self.add_comment(line[1:]) 

131 

132 def _parse_header(self): 

133 """Extract information from the header of the detector file""" 

134 self.cprint("Parsing the DETX header") 

135 self._det_file.seek(0, 0) 

136 first_line = self._readline() 

137 try: # backward compatibility workaround 

138 self.det_id, self.n_doms = split(first_line, int) 

139 self.version = 1 

140 except ValueError: 

141 det_id, self.version = first_line.split() 

142 self.det_id = int(det_id) 

143 self.version = int(self.version.lower().split("v")[1]) 

144 if self.version > self.max_supported_version: 

145 raise NotImplementedError( 

146 "DETX version {} not supported yet".format(self.version) 

147 ) 

148 validity = self._readline().strip() 

149 self.valid_from, self.valid_until = split(validity, float) 

150 raw_utm_info = self._readline().strip().split() 

151 try: 

152 self.utm_info = UTMInfo(*raw_utm_info[1:]) 

153 except TypeError: 

154 log.warning("Missing UTM information.") 

155 n_doms = self._readline() 

156 self.n_doms = int(n_doms) 

157 

158 # pylint: disable=C0103 

159 def _parse(self): 

160 """Extract dom information from detector file""" 

161 self._parse_header() 

162 self.cprint("Reading PMT information...") 

163 pmts = defaultdict(list) 

164 pmt_index = 0 

165 while True: 

166 line = self._readline() 

167 

168 if line == "": # readline semantics, signaling EOF 

169 self.cprint("Done.") 

170 break 

171 

172 if self.version <= 3: 

173 dom_id, du, floor, n_pmts = split(line, int) 

174 if self.version == 4: 

175 dom_id, du, floor, rest = unpack_nfirst(split(line), 3, int) 

176 x, y, z, q0, qx, qy, qz, t0, rest = unpack_nfirst(rest, 8, float) 

177 n_pmts, rest = unpack_nfirst(rest, 1, int) 

178 if rest: 

179 log.warning("Unexpected DOM values: {0}".format(rest)) 

180 if self.version == 5: 

181 dom_id, du, floor, rest = unpack_nfirst(split(line), 3, int) 

182 x, y, z, q0, qx, qy, qz, t0, rest = unpack_nfirst(rest, 8, float) 

183 component_status, n_pmts, rest = unpack_nfirst(rest, 2, int) 

184 if rest: 

185 log.warning("Unexpected DOM values: {0}".format(rest)) 

186 

187 if du != self._current_du: 

188 log.debug("Next DU, resetting floor to 1.") 

189 self._current_du = du 

190 self.dus.append(du) 

191 self._current_floor = 1 

192 if du == 1 and floor == -1: 

193 log.warning( 

194 "Floor ID is -1 (Jpp conversion bug), " 

195 "using our own floor ID!" 

196 ) 

197 else: 

198 self._current_floor += 1 

199 

200 if floor == -1: 

201 log.debug("Setting floor ID to our own ID") 

202 floor = self._current_floor 

203 

204 if self.version <= 3: 

205 self.doms[dom_id] = (du, floor, n_pmts) 

206 if self.version == 4: 

207 self.doms[dom_id] = (du, floor, n_pmts, x, y, z, q0, qx, qy, qz, t0) 

208 self._dom_positions[dom_id] = np.array([x, y, z]) 

209 if self.version == 5: 

210 self.doms[dom_id] = ( 

211 du, 

212 floor, 

213 n_pmts, 

214 x, 

215 y, 

216 z, 

217 q0, 

218 qx, 

219 qy, 

220 qz, 

221 component_status, 

222 t0, 

223 ) 

224 self._dom_positions[dom_id] = np.array([x, y, z]) 

225 

226 if self.n_pmts_per_dom is None: 

227 self.n_pmts_per_dom = n_pmts 

228 

229 if self.n_pmts_per_dom != n_pmts: 

230 log.warning( 

231 "DOMs with different number of PMTs are " 

232 "detected, this can cause some unexpected " 

233 "behaviour." 

234 ) 

235 

236 for i in range(n_pmts): 

237 raw_pmt_info = self._readline() 

238 pmt_info = raw_pmt_info.split() 

239 pmt_id, x, y, z, rest = unpack_nfirst(pmt_info, 4) 

240 dx, dy, dz, t0, rest = unpack_nfirst(rest, 4) 

241 pmt_id = int(pmt_id) 

242 omkey = (du, floor, i) 

243 pmts["pmt_id"].append(int(pmt_id)) 

244 pmts["pos_x"].append(float(x)) 

245 pmts["pos_y"].append(float(y)) 

246 pmts["pos_z"].append(float(z)) 

247 pmts["dir_x"].append(float(dx)) 

248 pmts["dir_y"].append(float(dy)) 

249 pmts["dir_z"].append(float(dz)) 

250 pmts["t0"].append(float(t0)) 

251 pmts["du"].append(int(du)) 

252 pmts["floor"].append(int(floor)) 

253 pmts["channel_id"].append(int(i)) 

254 pmts["dom_id"].append(int(dom_id)) 

255 if self.version in (3, 4, 5) and rest: 

256 status, rest = unpack_nfirst(rest, 1) 

257 pmts["status"].append(int(status)) 

258 if rest: 

259 log.warning("Unexpected PMT values: {0}".format(rest)) 

260 self._pmt_index_by_omkey[omkey] = pmt_index 

261 self._pmt_index_by_pmt_id[pmt_id] = pmt_index 

262 pmt_index += 1 

263 

264 self.pmts = Table(pmts, name="PMT") 

265 

266 def reset_caches(self): 

267 log.debug("Resetting caches.") 

268 self._dom_positions = OrderedDict() 

269 self._dom_table = None 

270 self._xy_positions = [] 

271 self._pmt_angles = [] 

272 self._com = None 

273 

274 def add_comment(self, comment): 

275 """Add a comment which will be prefixed with a '#'""" 

276 self._comments.append(comment) 

277 

278 @property 

279 def comments(self): 

280 return self._comments 

281 

282 @property 

283 def dom_ids(self): 

284 if not self._dom_ids: 

285 self._dom_ids = list(self.doms.keys()) 

286 return self._dom_ids 

287 

288 @property 

289 def dom_positions(self): 

290 """The positions of the DOMs, calculated from PMT directions.""" 

291 if not self._dom_positions: 

292 for dom_id in self.dom_ids: 

293 mask = self.pmts.dom_id == dom_id 

294 pmt_pos = self.pmts[mask].pos 

295 pmt_dir = self.pmts[mask].dir 

296 centre = intersect_3d(pmt_pos, pmt_pos - pmt_dir * 10) 

297 self._dom_positions[dom_id] = centre 

298 return self._dom_positions 

299 

300 @property 

301 def dom_table(self): 

302 """A `Table` containing DOM attributes""" 

303 if self._dom_table is None: 

304 data = defaultdict(list) 

305 for dom_id, (du, floor, _) in self.doms.items(): 

306 data["dom_id"].append(dom_id) 

307 data["du"].append(du) 

308 data["floor"].append(floor) 

309 dom_position = self.dom_positions[dom_id] 

310 data["pos_x"].append(dom_position[0]) 

311 data["pos_y"].append(dom_position[1]) 

312 data["pos_z"].append(dom_position[2]) 

313 self._dom_table = Table(data, name="DOMs", h5loc="/dom_table") 

314 return self._dom_table 

315 

316 @property 

317 def com(self): 

318 """Center of mass, calculated from the mean of the PMT positions""" 

319 if self._com is None: 

320 self._com = np.mean(self.pmts.pos, axis=0) 

321 return self._com 

322 

323 @property 

324 def xy_positions(self): 

325 """XY positions of the DUs, given by the DOMs on floor 1.""" 

326 if self._xy_positions is None or len(self._xy_positions) == 0: 

327 xy_pos = [] 

328 for dom_id, pos in self.dom_positions.items(): 

329 if self.domid2floor(dom_id) == 1: 

330 xy_pos.append(np.array([pos[0], pos[1]])) 

331 self._xy_positions = np.array(xy_pos) 

332 return self._xy_positions 

333 

334 def translate_detector(self, vector): 

335 """Translate the detector by a given vector""" 

336 vector = np.array(vector, dtype=float) 

337 self.pmts.pos_x += vector[0] 

338 self.pmts.pos_y += vector[1] 

339 self.pmts.pos_z += vector[2] 

340 self.reset_caches() 

341 

342 def rotate_dom_by_yaw(self, dom_id, heading, centre_point=None): 

343 """Rotate a DOM by a given (yaw) heading.""" 

344 pmts = self.pmts[self.pmts.dom_id == dom_id] 

345 if centre_point is None: 

346 centre_point = self.dom_positions[dom_id] 

347 

348 for pmt in pmts: 

349 pmt_pos = np.array([pmt.pos_x, pmt.pos_y, pmt.pos_z]) 

350 pmt_dir = np.array([pmt.dir_x, pmt.dir_y, pmt.dir_z]) 

351 pmt_radius = np.linalg.norm(centre_point - pmt_pos) 

352 index = self._pmt_index_by_pmt_id[pmt.pmt_id] 

353 pmt_ref = self.pmts[index] 

354 

355 dir_rot = qrot_yaw([pmt.dir_x, pmt.dir_y, pmt.dir_z], heading) 

356 pos_rot = pmt_pos - pmt_dir * pmt_radius + dir_rot * pmt_radius 

357 

358 pmt_ref.dir_x = dir_rot[0] 

359 pmt_ref.dir_y = dir_rot[1] 

360 pmt_ref.dir_z = dir_rot[2] 

361 pmt_ref.pos_x = pos_rot[0] 

362 pmt_ref.pos_y = pos_rot[1] 

363 pmt_ref.pos_z = pos_rot[2] 

364 self.reset_caches() 

365 

366 def rotate_du_by_yaw(self, du, heading): 

367 """Rotate all DOMs on DU by a given (yaw) heading.""" 

368 mask = self.pmts.du == du 

369 dom_ids = np.unique(self.pmts.dom_id[mask]) 

370 for dom_id in dom_ids: 

371 self.rotate_dom_by_yaw(dom_id, heading) 

372 self.reset_caches() 

373 

374 def rescale(self, factor, origin=(0, 0, 0)): 

375 """Stretch or shrink detector (DOM positions) by a given factor.""" 

376 pmts = self.pmts 

377 for dom_id in self.dom_ids: 

378 mask = pmts.dom_id == dom_id 

379 pos_x = pmts[mask].pos_x 

380 pos_y = pmts[mask].pos_y 

381 pos_z = pmts[mask].pos_z 

382 pmts.pos_x[mask] = (pos_x - origin[0]) * factor 

383 pmts.pos_y[mask] = (pos_y - origin[1]) * factor 

384 pmts.pos_z[mask] = (pos_z - origin[2]) * factor 

385 self.reset_caches() 

386 

387 @property 

388 def pmt_angles(self): 

389 """A list of PMT directions sorted by PMT channel, on DU-1, floor-1""" 

390 if self._pmt_angles == []: 

391 mask = (self.pmts.du == 1) & (self.pmts.floor == 1) 

392 self._pmt_angles = self.pmts.dir[mask] 

393 return self._pmt_angles 

394 

395 @property 

396 def ascii(self): 

397 """The ascii representation of the detector""" 

398 comments = "" 

399 if self.version == 3: 

400 for comment in self.comments: 

401 if not comment.startswith(" "): 

402 comment = " " + comment 

403 comments += "#" + comment + "\n" 

404 

405 if self.version == 1: 

406 header = "{det.det_id} {det.n_doms}".format(det=self) 

407 else: 

408 header = "{det.det_id} v{det.version}".format(det=self) 

409 header += "\n{0} {1}".format(self.valid_from, self.valid_until) 

410 header += "\n" + str(self.utm_info) + "\n" 

411 header += str(self.n_doms) 

412 

413 doms = "" 

414 for dom_id, (line, floor, n_pmts) in self.doms.items(): 

415 doms += "{0} {1} {2} {3}\n".format(dom_id, line, floor, n_pmts) 

416 for channel_id in range(n_pmts): 

417 pmt_idx = self._pmt_index_by_omkey[(line, floor, channel_id)] 

418 pmt = self.pmts[pmt_idx] 

419 doms += " {0} {1} {2} {3} {4} {5} {6} {7}".format( 

420 pmt.pmt_id, 

421 pmt.pos_x, 

422 pmt.pos_y, 

423 pmt.pos_z, 

424 pmt.dir_x, 

425 pmt.dir_y, 

426 pmt.dir_z, 

427 pmt.t0, 

428 ) 

429 if self.version == 3: 

430 doms += " {0}".format(pmt.status) 

431 doms += "\n" 

432 return comments + header + "\n" + doms 

433 

434 def write(self, filename): 

435 """Save detx file.""" 

436 with open(filename, "w") as f: 

437 f.write(self.ascii) 

438 self.cprint("Detector file saved as '{0}'".format(filename)) 

439 

440 def pmt_with_id(self, pmt_id): 

441 """Get PMT with global pmt_id""" 

442 try: 

443 return self.pmts[self._pmt_index_by_pmt_id[pmt_id]] 

444 except KeyError: 

445 raise KeyError("No PMT found for ID: {0}".format(pmt_id)) 

446 

447 def get_pmt(self, dom_id, channel_id): 

448 """Return PMT with DOM ID and DAQ channel ID""" 

449 dom = self.doms[dom_id] 

450 du = dom[0] 

451 floor = dom[1] 

452 pmt = self.pmts[self._pmt_index_by_omkey[(du, floor, channel_id)]] 

453 return pmt 

454 

455 def pmtid2omkey(self, pmt_id): 

456 return self._pmts_by_id[int(pmt_id)].omkey 

457 

458 def domid2floor(self, dom_id): 

459 return self.doms[dom_id][1] 

460 

461 @property 

462 def n_dus(self): 

463 return len(self.dus) 

464 

465 def __str__(self): 

466 return "Detector id: '{0}', n_doms: {1}, dus: {2}".format( 

467 self.det_id, self.n_doms, self.dus 

468 ) 

469 

470 def __repr__(self): 

471 return self.__str__() 

472 

473 

474class UTMInfo(object): 

475 """The UTM information""" 

476 

477 def __init__(self, ellipsoid, grid, easting, northing, z): 

478 self.ellipsoid = ellipsoid 

479 self.grid = grid 

480 self.easting = float(easting) 

481 self.northing = float(northing) 

482 self.z = float(z) 

483 

484 def __str__(self): 

485 return "UTM {} {} {} {} {}".format( 

486 self.ellipsoid, self.grid, self.easting, self.northing, self.z 

487 ) 

488 

489 def __repr__(self): 

490 return "UTMInfo: {}".format(self) 

491 

492 

493class PMT(object): 

494 """Represents a photomultiplier. 

495 

496 Parameters 

497 ---------- 

498 id: int 

499 pos: 3-float-tuple (x, y, z) 

500 dir: 3-float-tuple (x, y, z) 

501 t0: int 

502 channel_id: int 

503 omkey: int 

504 """ 

505 

506 def __init__(self, id, pos, dir, t0, channel_id, omkey): 

507 self.id = id 

508 self.pos = pos 

509 self.dir = dir 

510 self.t0 = t0 

511 self.channel_id = channel_id 

512 self.omkey = omkey 

513 

514 def __str__(self): 

515 return "PMT id:{0}, pos: {1}, dir: dir{2}, t0: {3}, DAQ channel: {4}".format( 

516 self.id, self.pos, self.dir, self.t0, self.channel_id 

517 ) 

518 

519 

520# PMT DAQ channel IDs ordered from top to bottom 

521ORDERED_PMT_IDS = [ 

522 28, 

523 23, 

524 22, 

525 21, 

526 27, 

527 29, 

528 20, 

529 30, 

530 26, 

531 25, 

532 19, 

533 24, 

534 13, 

535 7, 

536 1, 

537 14, 

538 18, 

539 12, 

540 6, 

541 2, 

542 11, 

543 8, 

544 0, 

545 15, 

546 4, 

547 3, 

548 5, 

549 17, 

550 10, 

551 9, 

552 16, 

553]