Coverage for src/km3pipe/io/daq.py: 76%

442 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-19 03:14 +0000

1# Filename: daq.py 

2# pylint: disable=R0903 

3""" 

4Pumps for the DAQ data formats. 

5 

6""" 

7 

8from collections import namedtuple 

9from io import BytesIO 

10import json 

11import math 

12import struct 

13from struct import unpack 

14import time 

15import pprint 

16from urllib.request import urlopen, URLError 

17 

18import numpy as np 

19 

20from thepipe import Module, Blob 

21from km3pipe.dataclasses import Table 

22from km3pipe.sys import ignored 

23from km3pipe.logger import get_logger, get_printer 

24 

25__author__ = "Tamas Gal" 

26__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration." 

27__credits__ = [] 

28__license__ = "MIT" 

29__maintainer__ = "Tamas Gal" 

30__email__ = "tgal@km3net.de" 

31__status__ = "Development" 

32 

33log = get_logger(__name__) # pylint: disable=C0103 

34 

35DATA_TYPES = { 

36 101: "DAQSuperFrame", 

37 201: "DAQSummaryFrame", 

38 # Using the same class for all timeslices since they are structurally 

39 # identical (until now) 

40 1001: "DAQTimeslice", # Type of erroneous timeslice data 

41 1002: "DAQTimeslice", # L0 

42 1003: "DAQTimeslice", # L1 

43 1004: "DAQTimeslice", # L2 

44 1005: "DAQTimeslice", # SN 

45 2001: "DAQSummaryslice", 

46 10001: "DAQEvent", 

47} 

48MINIMAL_RATE_HZ = 2.0e3 

49MAXIMAL_RATE_HZ = 2.0e6 

50 

51 

52class TimesliceParser(Module): 

53 """Preliminary parser for DAQTimeslice""" 

54 

55 def configure(self): 

56 self.legacy = self.get("legacy", default=False) 

57 

58 def _get_raw_data(self, blob): 

59 if "CHPrefix" in blob: 

60 if not str(blob["CHPrefix"].tag).startswith("IO_TS"): 

61 log.info("Not an IO_TS* blob") 

62 return blob 

63 return BytesIO(blob["CHData"]) 

64 if "FileIO" in blob: 

65 return blob["FileIO"] 

66 if "RawBytes" in blob: 

67 return BytesIO(blob["RawBytes"]) 

68 

69 def process(self, blob): 

70 data = self._get_raw_data(blob) 

71 if data is None: 

72 return blob 

73 try: 

74 ts_info, ts_frameinfos, ts_hits = self._parse_timeslice(data) 

75 except struct.error: 

76 log.error("Could not parse Timeslice") 

77 log.error(blob.keys()) 

78 else: 

79 blob["TSHits"] = ts_hits 

80 blob["TimesliceInfo"] = ts_info 

81 blob["TimesliceFrameInfos"] = ts_frameinfos 

82 return blob 

83 

84 def _parse_timeslice(self, data): 

85 tsl_size, datatype = unpack("<ii", data.read(8)) 

86 if not self.legacy: 

87 version = unpack("<h", data.read(2))[0] 

88 if version != 1: 

89 raise ValueError( 

90 "Unsupported DAQTimeslice version ({}) or legacy DAQ. " 

91 "Make sure Jpp v13+ is used or pass 'legacy=True' " 

92 "to {}.".format(version, self.__class__.__name__) 

93 ) 

94 det_id, run, sqnr = unpack("<iii", data.read(12)) 

95 timestamp, ns_ticks, n_frames = unpack("<iii", data.read(12)) 

96 

97 ts_info = Table.from_template( 

98 { 

99 "frame_index": sqnr, 

100 "slice_id": 0, 

101 "timestamp": timestamp, 

102 "nanoseconds": ns_ticks * 16, 

103 "n_frames": n_frames, 

104 }, 

105 "TimesliceInfo", 

106 ) 

107 ts_frameinfos = {} 

108 

109 _dom_ids = [] 

110 _channel_ids = [] 

111 _times = [] 

112 _tots = [] 

113 for _ in range(n_frames): 

114 frame_size, datatype = unpack("<ii", data.read(8)) 

115 det_id, run, sqnr = unpack("<iii", data.read(12)) 

116 timestamp, ns_ticks, dom_id = unpack("<iii", data.read(12)) 

117 dataqueue_status = unpack("<i", data.read(4))[0] 

118 dom_status = unpack("<iiii", data.read(4 * 4)) 

119 n_hits = unpack("<i", data.read(4))[0] 

120 ts_frameinfos[dom_id] = Table.from_template( 

121 { 

122 "det_id": det_id, 

123 "run_id": run, 

124 "sqnr": sqnr, 

125 "timestamp": timestamp, 

126 "nanoseconds": ns_ticks * 16, 

127 "dom_id": dom_id, 

128 "dataqueue_status": dataqueue_status, 

129 "dom_status": dom_status, 

130 "n_hits": n_hits, 

131 }, 

132 "TimesliceFrameInfo", 

133 ) 

134 for j in range(n_hits): 

135 hit = unpack("!BlB", data.read(6)) 

136 _dom_ids.append(dom_id) 

137 _channel_ids.append(hit[0]) 

138 _times.append(hit[1]) 

139 _tots.append(hit[2]) 

140 

141 ts_hits = Table( 

142 { 

143 "channel_id": np.array(_channel_ids), 

144 "dom_id": np.array(_dom_ids), 

145 "time": np.array(_times), 

146 "tot": np.array(_tots), 

147 }, 

148 name="TimesliceHits", 

149 h5loc="/timeslice_hits", 

150 split_h5=True, 

151 ) 

152 return ts_info, ts_frameinfos, ts_hits 

153 

154 

155class RePump(Module): 

156 """A pump for binary DAQ files. 

157 

158 

159 This pump can be used to replay raw dumps e.g. created with the ``daqsample`` 

160 tool. It creates the same structures as the ``kp.io.ch.CHPump`` and thus 

161 suited to test online processing pipelines with offline files. 

162 

163 """ 

164 

165 def configure(self): 

166 self.filename = self.require("filename") 

167 self.fobj = open(self.filename, "rb") 

168 

169 def process(self, blob): 

170 try: 

171 length, data_type = unpack("<ii", self.fobj.read(8)) 

172 self.fobj.seek(-8, 1) 

173 except struct.error: 

174 raise StopIteration 

175 data = self.fobj.read(length) 

176 blob["RawBytes"] = data 

177 return blob 

178 

179 def finish(self): 

180 self.fobj.close() 

181 

182 

183class DAQPump(Module): 

184 """A pump for binary DAQ files. Deprecated!""" 

185 

186 def configure(self): 

187 self.filename = self.require("filename") 

188 self.legacy = self.get("legacy", default=False) 

189 self.frame_positions = [] 

190 self.index = 0 

191 

192 self.blob_file = self.open_file(self.filename) 

193 self.determine_frame_positions() 

194 

195 def next_blob(self): 

196 """Get the next frame from file""" 

197 blob_file = self.blob_file 

198 try: 

199 preamble = DAQPreamble(file_obj=blob_file) 

200 except struct.error: 

201 raise StopIteration 

202 

203 try: 

204 data_type = DATA_TYPES[preamble.data_type] 

205 except KeyError: 

206 log.error("Unknown datatype: {0}".format(preamble.data_type)) 

207 data_type = "Unknown" 

208 

209 blob = Blob() 

210 blob[data_type] = None 

211 blob["DAQPreamble"] = preamble 

212 

213 if data_type == "DAQSummaryslice": 

214 daq_frame = DAQSummaryslice(blob_file, legacy=self.legacy) 

215 blob[data_type] = daq_frame 

216 blob["DAQHeader"] = daq_frame.header 

217 elif data_type == "DAQEvent": 

218 daq_frame = DAQEvent(blob_file, legacy=self.legacy) 

219 blob[data_type] = daq_frame 

220 blob["DAQHeader"] = daq_frame.header 

221 else: 

222 log.warning( 

223 "Skipping DAQ frame with data type code '{0}'.".format( 

224 preamble.data_type 

225 ) 

226 ) 

227 blob_file.seek(preamble.length - DAQPreamble.size, 1) 

228 

229 return blob 

230 

231 def seek_to_frame(self, index): 

232 """Move file pointer to the frame with given index.""" 

233 pointer_position = self.frame_positions[index] 

234 self.blob_file.seek(pointer_position, 0) 

235 

236 def get_blob(self, index): 

237 """Return blob at given index.""" 

238 self.seek_to_frame(index) 

239 return self.next_blob() 

240 

241 def determine_frame_positions(self): 

242 """Record the file pointer position of each frame""" 

243 self.blob_file.seek(0, 0) 

244 with ignored(struct.error): 

245 while True: 

246 pointer_position = self.blob_file.tell() 

247 length = struct.unpack("<i", self.blob_file.read(4))[0] 

248 self.blob_file.seek(length - 4, 1) 

249 self.frame_positions.append(pointer_position) 

250 self.blob_file.seek(0, 0) 

251 log.info("Found {0} frames.".format(len(self.frame_positions))) 

252 

253 def process(self, blob): 

254 """Pump the next blob to the modules""" 

255 return self.next_blob() 

256 

257 def finish(self): 

258 """Clean everything up""" 

259 self.blob_file.close() 

260 

261 def __len__(self): 

262 if not self.frame_positions: 

263 self.determine_frame_positions() 

264 return len(self.frame_positions) 

265 

266 def __iter__(self): 

267 return self 

268 

269 def __next__(self): 

270 try: 

271 blob = self.get_blob(self.index) 

272 except IndexError: 

273 self.index = 0 

274 raise StopIteration 

275 self.index += 1 

276 return blob 

277 

278 def __getitem__(self, index): 

279 if isinstance(index, int): 

280 return self.get_blob(index) 

281 elif isinstance(index, slice): 

282 return self._slice_generator(index) 

283 else: 

284 raise TypeError("index must be int or slice") 

285 

286 def _slice_generator(self, index): 

287 """A simple slice generator for iterations""" 

288 start, stop, step = index.indices(len(self)) 

289 for i in range(start, stop, step): 

290 yield self.get_blob(i) 

291 

292 

293class DAQProcessor(Module): 

294 def configure(self): 

295 self.legacy = self.get("legacy", default=False) 

296 self.index = 0 

297 self.event_id = 0 

298 

299 def process(self, blob): 

300 tag = str(blob["CHPrefix"].tag) 

301 data = blob["CHData"] 

302 

303 processor = None 

304 

305 if tag == "IO_EVT": 

306 processor = self.process_event 

307 if tag == "IO_SUM": 

308 processor = self.process_summaryslice 

309 if tag == "IO_OLINE": 

310 processor = self.process_online_reco 

311 

312 if processor is None: 

313 self.log.error("Unsupported tag: %s", tag) 

314 return 

315 

316 try: 

317 processor(data, blob) 

318 except (struct.error, ValueError) as e: 

319 self.log.error("Corrupt data received. Skipping...\n" "Error: %s", e) 

320 return 

321 

322 return blob 

323 

324 def process_event(self, data, blob): 

325 data_io = BytesIO(data) 

326 preamble = DAQPreamble(file_obj=data_io) # noqa 

327 event = DAQEvent(file_obj=data_io, legacy=self.legacy) 

328 header = event.header 

329 

330 event_info = Table.from_template( 

331 { 

332 "det_id": header.det_id, 

333 # 'frame_index': self.index, # header.time_slice, 

334 "frame_index": header.time_slice, 

335 "livetime_sec": 0, 

336 "mc_id": 0, 

337 "mc_t": 0, 

338 "n_events_gen": 0, 

339 "n_files_gen": 0, 

340 "overlays": event.overlays, 

341 "trigger_counter": event.trigger_counter, 

342 "trigger_mask": event.trigger_mask, 

343 "utc_nanoseconds": header.ticks * 16, 

344 "utc_seconds": header.time_stamp, 

345 "weight_w1": 0, 

346 "weight_w2": 0, 

347 "weight_w3": 0, # MC weights 

348 "run_id": header.run, # run id 

349 "group_id": self.event_id, 

350 }, 

351 "EventInfo", 

352 ) 

353 blob["EventInfo"] = event_info 

354 

355 self.event_id += 1 

356 self.index += 1 

357 

358 hits = event.snapshot_hits 

359 n_hits = event.n_snapshot_hits 

360 if n_hits == 0: 

361 self.log.warning("No hits found in event.") 

362 return 

363 

364 # This might not be needed 

365 triggereds = np.zeros(n_hits) 

366 triggered_map = {} 

367 for thit in event.triggered_hits: 

368 # TODO: switch to thit.trigger_mask instead of True! 

369 triggered_map[(thit.dom_id, thit.channel_id, thit.time, thit.tot)] = True 

370 for idx, hit in enumerate(hits): 

371 triggereds[idx] = tuple(hit) in triggered_map 

372 

373 hit_series = Table.from_template( 

374 { 

375 "channel_id": hits.channel_id, 

376 "dom_id": hits.dom_id, 

377 "time": hits.time, 

378 "tot": hits.tot, 

379 "triggered": triggereds, # TODO: switch to trigger_mask! 

380 "group_id": self.event_id, 

381 }, 

382 "Hits", 

383 ) 

384 

385 blob["Hits"] = hit_series 

386 

387 def process_summaryslice(self, data, blob): 

388 data_io = BytesIO(data) 

389 preamble = DAQPreamble(file_obj=data_io) # noqa 

390 summaryslice = DAQSummaryslice(file_obj=data_io, legacy=self.legacy) 

391 blob["RawSummaryslice"] = summaryslice 

392 

393 def process_online_reco(self, data, blob): 

394 data_io = BytesIO(data) 

395 preamble = DAQPreamble(file_obj=data_io) # noqa 

396 _data = unpack("<iiiQI", data_io.read(4 + 4 + 4 + 8 + 4)) 

397 det_id, run_id, frame_index, trigger_counter, utc_seconds = _data 

398 shower_reco = unpack("9d", data_io.read(9 * 8)) 

399 shower_meta = unpack("3i", data_io.read(12)) 

400 track_reco = unpack("9d", data_io.read(9 * 8)) 

401 track_meta = unpack("3i", data_io.read(12)) 

402 print( 

403 "Shower: x/y/z/dx/dy/dz/E/Q/t (type/status/ndf): ", shower_reco, shower_meta 

404 ) 

405 print("Track: x/y/z/dx/dy/dz/E/Q/t (type/status/ndf): ", track_reco, track_meta) 

406 blob["ReconstructionInfo"] = Table( 

407 { 

408 "det_id": det_id, 

409 "run_id": run_id, 

410 "frame_index": frame_index, 

411 "trigger_counter": trigger_counter, 

412 "utc_seconds": utc_seconds, 

413 }, 

414 h5loc="reco", 

415 split_h5=True, 

416 name="Reconstructions", 

417 ) 

418 args = track_reco + track_meta 

419 blob["RecoTrack"] = RecoTrack(*args) 

420 args = shower_reco + shower_meta 

421 blob["RecoShower"] = RecoShower(*args) 

422 

423 

424RecoTrack = namedtuple("RecoTrack", "x y z dx dy dz E Q t type status ndf") 

425RecoShower = namedtuple("RecoShower", "x y z dx dy dz E Q t type status ndf") 

426 

427 

428class DAQPreamble(object): 

429 """Wrapper for the JDAQPreamble binary format. 

430 

431 Parameters 

432 ---------- 

433 byte_data : bytes (optional) 

434 The binary file, where the file pointer is at the beginning of the header. 

435 file_obj : file (optional) 

436 The binary file, where the file pointer is at the beginning of the header. 

437 

438 Attributes 

439 ---------- 

440 size : int 

441 The size of the original DAQ byte representation. 

442 data_type : int 

443 The data type of the following frame. The coding is stored in the 

444 ``DATA_TYPES``. 

445 

446 """ 

447 

448 size = 8 

449 

450 def __init__(self, byte_data=None, file_obj=None): 

451 self.length = None 

452 self.data_type = None 

453 if byte_data: 

454 self._parse_byte_data(byte_data) 

455 if file_obj: 

456 self._parse_file(file_obj) 

457 

458 def _parse_byte_data(self, byte_data): 

459 """Extract the values from byte string.""" 

460 self.length, self.data_type = unpack("<ii", byte_data[: self.size]) 

461 

462 def _parse_file(self, file_obj): 

463 """Directly read from file handler. 

464 

465 Note that this will move the file pointer. 

466 

467 """ 

468 byte_data = file_obj.read(self.size) 

469 self._parse_byte_data(byte_data) 

470 

471 def __repr__(self): 

472 description = "Length: {0}\nDataType: {1}".format(self.length, self.data_type) 

473 return description 

474 

475 

476class DAQHeader(object): 

477 """Wrapper for the JDAQHeader binary format. 

478 

479 Parameters 

480 ---------- 

481 byte_data : bytes (optional) 

482 The binary file, where the file pointer is at the beginning of the header. 

483 file_obj : file (optional) 

484 The binary file, where the file pointer is at the beginning of the header. 

485 

486 Attributes 

487 ---------- 

488 size : int 

489 The size of the original DAQ byte representation. 

490 

491 """ 

492 

493 size = 20 

494 

495 def __init__(self, byte_data=None, file_obj=None): 

496 self.run = None 

497 self.time_slice = None 

498 self.time_stamp = None 

499 self.wr_status = None 

500 if byte_data: 

501 self._parse_byte_data(byte_data) 

502 if file_obj: 

503 self._parse_file(file_obj) 

504 

505 def _parse_byte_data(self, byte_data): 

506 """Extract the values from byte string.""" 

507 chunks = unpack("<iiiii", byte_data[: self.size]) 

508 det_id, run, time_slice, time_stamp, ticks = chunks 

509 self.det_id = det_id 

510 self.run = run 

511 self.time_slice = time_slice 

512 # most significant bit is the WR status 

513 self.wr_status = time_stamp & 0x80000000 

514 # masking the most significant bit, which is the WR status 

515 self.time_stamp = time_stamp & 0x7FFFFFFF 

516 self.ticks = ticks 

517 

518 def _parse_file(self, file_obj): 

519 """Directly read from file handler. 

520 

521 Note: 

522 This will move the file pointer. 

523 

524 """ 

525 byte_data = file_obj.read(self.size) 

526 self._parse_byte_data(byte_data) 

527 

528 def __repr__(self): 

529 description = "Run: {0}\nTime slice: {1}\nTime stamp: {2} ({3})".format( 

530 self.run, self.time_slice, self.time_stamp, self.ticks 

531 ) 

532 return description 

533 

534 

535class DAQSummaryslice(object): 

536 """Wrapper for the JDAQSummarySlice binary format. 

537 

538 Parameters 

539 ---------- 

540 file_obj : file (optional) 

541 The binary file, where the file pointer is at the beginning of the header. 

542 

543 Attributes 

544 ---------- 

545 n_summary_frames : int 

546 The number of summary frames. 

547 summary_frames : dict 

548 The PMT rates for each DOM. The key is the DOM identifier and the 

549 corresponding value is a sorted list of PMT rates in [Hz]. 

550 dom_rates : dict 

551 The overall DOM rate for each DOM. 

552 

553 """ 

554 

555 def __init__(self, file_obj, legacy=False): 

556 if not legacy: 

557 version = unpack("<h", file_obj.read(2))[0] 

558 if version != 6: 

559 raise ValueError( 

560 "Unsupported {} version ({}) or legacy DAQ. " 

561 "Make sure Jpp v13+ is used or pass 'legacy=True' " 

562 "to the init.".format(self.__class__.__name__, version) 

563 ) 

564 self.header = DAQHeader(file_obj=file_obj) 

565 self.n_summary_frames = unpack("<i", file_obj.read(4))[0] 

566 self.summary_frames = {} 

567 self.dq_status = {} 

568 self.dom_status = {} 

569 self.dom_rates = {} 

570 

571 self._parse_summary_frames(file_obj) 

572 

573 def _parse_summary_frames(self, file_obj): 

574 """Iterate through the byte data and fill the summary_frames""" 

575 for _ in range(self.n_summary_frames): 

576 dom_id = unpack("<i", file_obj.read(4))[0] 

577 dq_status = file_obj.read(4) # probably dom status? # noqa 

578 dom_status = unpack("<iiii", file_obj.read(16)) 

579 raw_rates = unpack("b" * 31, file_obj.read(31)) 

580 pmt_rates = [self._get_rate(value) for value in raw_rates] 

581 self.summary_frames[dom_id] = pmt_rates 

582 self.dq_status[dom_id] = dq_status 

583 self.dom_status[dom_id] = dom_status 

584 self.dom_rates[dom_id] = np.sum(pmt_rates) 

585 

586 def _get_rate(self, value): 

587 """Return the rate in Hz from the short int value""" 

588 if value == 0: 

589 return 0 

590 else: 

591 return MINIMAL_RATE_HZ * math.exp(value * self._get_factor()) 

592 

593 def _get_factor(self): 

594 return math.log(MAXIMAL_RATE_HZ / MINIMAL_RATE_HZ) / 255 

595 

596 

597class DAQEvent(object): 

598 """Wrapper for the JDAQEvent binary format. 

599 

600 Parameters 

601 ---------- 

602 file_obj : file 

603 The binary file, where the file pointer is at the beginning of the header. 

604 

605 

606 Attributes 

607 ---------- 

608 trigger_counter : int 

609 Incremental identifier of the occurred trigger. 

610 trigger_mask : int 

611 The trigger type(s) satisfied. 

612 overlays : int 

613 Number of merged events. 

614 n_triggered_hits : int 

615 Number of hits satisfying the trigger conditions. 

616 n_snapshot_hits : int 

617 Number of snapshot hits. 

618 triggered_hits : np.recarray 

619 Array of triggered hits (fields: dom_id, pmt_id, tdc_time, tot, trigger_mask) 

620 snapshot_hits : np.recarray 

621 A list of snapshot hits (fields: dom_id, pmt_id, tdc_time, tot) 

622 

623 """ 

624 

625 triggered_hits_dt = np.dtype( 

626 [ 

627 ("dom_id", "<i"), 

628 ("channel_id", np.uint8), 

629 ("time", ">I"), 

630 ("tot", np.uint8), 

631 ("trigger_mask", "<Q"), 

632 ] 

633 ) 

634 # Numba chokes on big endian and the time is big, so we need to reinterpret 

635 triggered_hits_dt_final = np.dtype( 

636 [ 

637 ("dom_id", "<i"), 

638 ("channel_id", np.uint8), 

639 ("time", "<I"), 

640 ("tot", np.uint8), 

641 ("trigger_mask", "<Q"), 

642 ] 

643 ) 

644 snapshot_hits_dt = np.dtype( 

645 [ 

646 ("dom_id", "<i"), 

647 ("channel_id", np.uint8), 

648 ("time", ">I"), 

649 ("tot", np.uint8), 

650 ] 

651 ) 

652 # Numba chokes on big endian and the time is big, so we need to reinterpret 

653 snapshot_hits_dt_final = np.dtype( 

654 [ 

655 ("dom_id", "<i"), 

656 ("channel_id", np.uint8), 

657 ("time", "<I"), 

658 ("tot", np.uint8), 

659 ] 

660 ) 

661 

662 def __init__(self, file_obj, legacy=False): 

663 if not legacy: 

664 version = unpack("<h", file_obj.read(2))[0] 

665 if version != 4: 

666 raise ValueError( 

667 "Unsupported {} version ({}) or legacy DAQ. " 

668 "Make sure Jpp v13+ is used or pass 'legacy=True' " 

669 "to the init.".format(self.__class__.__name__, version) 

670 ) 

671 self.header = DAQHeader(file_obj=file_obj) 

672 self.trigger_counter = unpack("<Q", file_obj.read(8))[0] 

673 self.trigger_mask = unpack("<Q", file_obj.read(8))[0] 

674 self.overlays = unpack("<i", file_obj.read(4))[0] 

675 

676 self.n_triggered_hits = unpack("<i", file_obj.read(4))[0] 

677 self.triggered_hits = self._parse_triggered_hits(file_obj) 

678 

679 self.n_snapshot_hits = unpack("<i", file_obj.read(4))[0] 

680 self.snapshot_hits = self._parse_snapshot_hits(file_obj) 

681 

682 def _parse_triggered_hits(self, file_obj): 

683 """Parse and store triggered hits.""" 

684 raw_data = file_obj.read( 

685 self.triggered_hits_dt.itemsize * self.n_triggered_hits 

686 ) 

687 arr = np.frombuffer(raw_data, self.triggered_hits_dt).view(np.recarray) 

688 return arr.astype(self.triggered_hits_dt_final) 

689 

690 def _parse_snapshot_hits(self, file_obj): 

691 """Parse and store snapshot hits.""" 

692 raw_data = file_obj.read(self.snapshot_hits_dt.itemsize * self.n_snapshot_hits) 

693 arr = np.frombuffer(raw_data, self.snapshot_hits_dt).view(np.recarray) 

694 return arr.astype(self.snapshot_hits_dt_final) 

695 

696 def __repr__(self): 

697 string = "\n".join( 

698 ( 

699 " Number of triggered hits: " + str(self.n_triggered_hits), 

700 " Number of snapshot hits: " + str(self.n_snapshot_hits), 

701 ) 

702 ) 

703 string += "\nTriggered hits:\n" 

704 string += pprint.pformat(self.triggered_hits) 

705 string += "\nSnapshot hits:\n" 

706 string += pprint.pformat(self.snapshot_hits) 

707 return string 

708 

709 

710class TMCHData(object): 

711 """Monitoring Channel data.""" 

712 

713 def __init__(self, file_obj, version=None): 

714 f = file_obj 

715 

716 data_type = f.read(4) 

717 if data_type != b"TMCH": 

718 raise ValueError("Invalid datatype: {0}".format(data_type)) 

719 

720 self.run = unpack(">I", f.read(4))[0] 

721 self.udp_sequence_number = unpack(">I", f.read(4))[0] 

722 self.utc_seconds = unpack(">I", f.read(4))[0] 

723 self.nanoseconds = unpack(">I", f.read(4))[0] * 16 

724 self.dom_id = unpack(">I", f.read(4))[0] 

725 self.dom_status_0 = unpack(">I", f.read(4))[0] 

726 self.dom_status_1 = unpack(">I", f.read(4))[0] 

727 self.dom_status_2 = unpack(">I", f.read(4))[0] 

728 self.dom_status_3 = unpack(">I", f.read(4))[0] 

729 self.pmt_rates = [r * 10.0 for r in unpack(">" + 31 * "I", f.read(31 * 4))] 

730 self.hrvbmp = unpack(">I", f.read(4))[0] 

731 self.flags = unpack(">I", f.read(4))[0] 

732 # flags: 

733 # bit 0: AHRS valid 

734 if version is None: 

735 # bit 3-1: structure version 

736 # 000 - 1, 001 - 2, 010 - unused, 011 - 3 

737 self.version = int(bin((self.flags >> 1) & 7), 2) + 1 

738 else: 

739 self.version = version 

740 self.yaw, self.pitch, self.roll = unpack(">fff", f.read(12)) 

741 self.A = unpack(">fff", f.read(12)) # AHRS: Ax, Ay, Az 

742 self.G = unpack(">fff", f.read(12)) # AHRS: Gx, Gy, Gz 

743 self.H = unpack(">fff", f.read(12)) # AHRS: Hx, Hy, Hz 

744 self.temp = unpack(">H", f.read(2))[0] / 100.0 

745 self.humidity = unpack(">H", f.read(2))[0] / 100.0 

746 self.tdcfull = unpack(">I", f.read(4))[0] 

747 self.aesfull = unpack(">I", f.read(4))[0] 

748 self.flushc = unpack(">I", f.read(4))[0] 

749 

750 if self.version >= 2: 

751 self.ts_duration_ms = unpack(">I", f.read(4))[0] 

752 if self.version >= 3: 

753 self.tdc_supertime_fifo_size = unpack(">H", f.read(2))[0] 

754 self.aes_supertime_fifo_size = unpack(">H", f.read(2))[0] 

755 

756 def __str__(self): 

757 return str(vars(self)) 

758 

759 def __repr__(self): 

760 return self.__str__() 

761 

762 

763class TMCHRepump(Module): 

764 """Takes a IO_MONIT raw dump and replays it.""" 

765 

766 def configure(self): 

767 filename = self.require("filename") 

768 self.format_version = self.get("format_version", default=None) 

769 self.fobj = open(filename, "rb") 

770 self.blobs = self.blob_generator() 

771 

772 def process(self, blob): 

773 return next(self.blobs) 

774 

775 def blob_generator(self): 

776 while True: 

777 blob = Blob() 

778 datatype = self.fobj.read(4) 

779 if len(datatype) == 0: 

780 return 

781 if datatype == b"TMCH": 

782 self.fobj.seek(-4, 1) 

783 blob["TMCHData"] = TMCHData(self.fobj, version=self.format_version) 

784 yield blob 

785 

786 def finish(self): 

787 self.fobj.close() 

788 

789 def __iter__(self): 

790 return self 

791 

792 def __next__(self): 

793 return next(self.blobs) 

794 

795 

796class DMMonitor(object): 

797 """A class which provides access to the Detector Manager parameters. 

798 

799 Examples 

800 -------- 

801 >>> import km3pipe as kp 

802 >>> dmm = kp.io.daq.DMMonitor('192.168.0.120', base='clb/outparams') 

803 >>> session = dmm.start_session('test', ['wr_mu/1/0','wr_mu/1/1']) 

804 >>> for values in session: 

805 print(values) 

806 

807 """ 

808 

809 def __init__(self, host, port=1302, base=""): 

810 self._host = host 

811 self._port = port 

812 self._base = base 

813 self._url = "http://{}:{}/mon/{}".format(self._host, self._port, self._base) 

814 self._available_parameters = [] 

815 self.log = get_logger(self.__class__.__name__) 

816 self.cprint = get_printer(self.__class__.__name__) 

817 

818 @property 

819 def available_parameters(self): 

820 if not self._available_parameters: 

821 self._get_available_parameters() 

822 return self._available_parameters 

823 

824 def _get_available_parameters(self): 

825 self._available_parameters = json.loads(urlopen(self._url).read()) 

826 

827 def get(self, path): 

828 return json.loads( 

829 urlopen( 

830 "http://{}:{}/mon/{}/{}".format( 

831 self._host, self._port, self._base, path 

832 ) 

833 ).read() 

834 ) 

835 

836 def start_session(self, name, paths, interval=10): 

837 self.cprint("Starting session '{}'".format(name)) 

838 ret = urlopen( 

839 "http://{}:{}/monshortdef?name={}&paths={}".format( 

840 self._host, 

841 self._port, 

842 name, 

843 ",".join(["/mon/{}/{}".format(self._base, p) for p in paths]), 

844 ) 

845 ).read() 

846 if ret != b"OK": 

847 self.log.error("Could not start session") 

848 return [] 

849 return self._session(name, interval) 

850 

851 def _session(self, name, interval): 

852 url = "http://{}:{}/monshort/{}".format(self._host, self._port, name) 

853 while True: 

854 try: 

855 yield json.loads(urlopen(url).read()) 

856 except URLError as e: 

857 self.log.error( 

858 "Error when trying to connect to the DM: %s\n" 

859 "Retry in %d seconds..." % (e, interval) 

860 ) 

861 time.sleep(interval)