# Filename: daq.py
# pylint: disable=R0903
"""
Pumps for the DAQ data formats.

"""
from collections import namedtuple
from io import BytesIO
import json
import math
import struct
from struct import unpack
import time
import pprint
from urllib.request import urlopen
from urllib.error import URLError

import numpy as np

from thepipe import Module, Blob
from km3pipe.dataclasses import Table
from km3pipe.sys import ignored
from km3pipe.logger import get_logger, get_printer

__author__ = "Tamas Gal"
__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration."
__credits__ = []
__license__ = "MIT"
__maintainer__ = "Tamas Gal"
__email__ = "tgal@km3net.de"
__status__ = "Development"

log = get_logger(__name__)  # pylint: disable=C0103

DATA_TYPES = {
    101: "DAQSuperFrame",
    201: "DAQSummaryFrame",
    # Using the same class for all timeslices since they are structurally
    # identical (until now)
    1001: "DAQTimeslice",  # Type of erroneous timeslice data
    1002: "DAQTimeslice",  # L0
    1003: "DAQTimeslice",  # L1
    1004: "DAQTimeslice",  # L2
    1005: "DAQTimeslice",  # SN
    2001: "DAQSummaryslice",
    10001: "DAQEvent",
}

MINIMAL_RATE_HZ = 2.0e3
MAXIMAL_RATE_HZ = 2.0e6
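

# Sketch: how a frame type code read from a preamble maps to a class name
# via DATA_TYPES; the byte values below are made up for illustration.
def _example_lookup_data_type():
    preamble = struct.pack("<ii", 24, 2001)  # hypothetical frame: 24 bytes, type 2001
    _, data_type = unpack("<ii", preamble)
    return DATA_TYPES.get(data_type, "Unknown")  # -> "DAQSummaryslice"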


class TimesliceParser(Module):
    """Preliminary parser for DAQTimeslice"""

    def configure(self):
        self.legacy = self.get("legacy", default=False)

    def _get_raw_data(self, blob):
        if "CHPrefix" in blob:
            if not str(blob["CHPrefix"].tag).startswith("IO_TS"):
                log.info("Not an IO_TS* blob")
                return None
            return BytesIO(blob["CHData"])
        if "FileIO" in blob:
            return blob["FileIO"]
        if "RawBytes" in blob:
            return BytesIO(blob["RawBytes"])

    def process(self, blob):
        data = self._get_raw_data(blob)
        if data is None:
            return blob
        try:
            ts_info, ts_frameinfos, ts_hits = self._parse_timeslice(data)
        except struct.error:
            log.error("Could not parse Timeslice")
            log.error(blob.keys())
        else:
            blob["TSHits"] = ts_hits
            blob["TimesliceInfo"] = ts_info
            blob["TimesliceFrameInfos"] = ts_frameinfos
        return blob

    def _parse_timeslice(self, data):
        tsl_size, datatype = unpack("<ii", data.read(8))
        if not self.legacy:
            version = unpack("<h", data.read(2))[0]
            if version != 1:
                raise ValueError(
                    "Unsupported DAQTimeslice version ({}) or legacy DAQ. "
                    "Make sure Jpp v13+ is used or pass 'legacy=True' "
                    "to {}.".format(version, self.__class__.__name__)
                )
        det_id, run, sqnr = unpack("<iii", data.read(12))
        timestamp, ns_ticks, n_frames = unpack("<iii", data.read(12))

        ts_info = Table.from_template(
            {
                "frame_index": sqnr,
                "slice_id": 0,
                "timestamp": timestamp,
                "nanoseconds": ns_ticks * 16,
                "n_frames": n_frames,
            },
            "TimesliceInfo",
        )
        ts_frameinfos = {}

        _dom_ids = []
        _channel_ids = []
        _times = []
        _tots = []
        for _ in range(n_frames):
            frame_size, datatype = unpack("<ii", data.read(8))
            det_id, run, sqnr = unpack("<iii", data.read(12))
            timestamp, ns_ticks, dom_id = unpack("<iii", data.read(12))
            dataqueue_status = unpack("<i", data.read(4))[0]
            dom_status = unpack("<iiii", data.read(4 * 4))
            n_hits = unpack("<i", data.read(4))[0]
            ts_frameinfos[dom_id] = Table.from_template(
                {
                    "det_id": det_id,
                    "run_id": run,
                    "sqnr": sqnr,
                    "timestamp": timestamp,
                    "nanoseconds": ns_ticks * 16,
                    "dom_id": dom_id,
                    "dataqueue_status": dataqueue_status,
                    "dom_status": dom_status,
                    "n_hits": n_hits,
                },
                "TimesliceFrameInfo",
            )
            for _ in range(n_hits):
                hit = unpack("!BlB", data.read(6))
                _dom_ids.append(dom_id)
                _channel_ids.append(hit[0])
                _times.append(hit[1])
                _tots.append(hit[2])

        ts_hits = Table(
            {
                "channel_id": np.array(_channel_ids),
                "dom_id": np.array(_dom_ids),
                "time": np.array(_times),
                "tot": np.array(_tots),
            },
            name="TimesliceHits",
            h5loc="/timeslice_hits",
            split_h5=True,
        )
        return ts_info, ts_frameinfos, ts_hits
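

# Usage sketch for TimesliceParser: replay a raw dump of IO_TS* frames
# through a pipeline (the dump filename is hypothetical; RePump is
# defined below).
def _example_timeslice_pipeline():
    from thepipe import Pipeline

    pipe = Pipeline()
    pipe.attach(RePump, filename="timeslices.dat")  # hypothetical dump file
    pipe.attach(TimesliceParser)
    pipe.drain(10)  # process the first 10 frames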


class RePump(Module):
    """A pump for binary DAQ files.

    This pump can be used to replay raw dumps, e.g. created with the
    ``daqsample`` tool. It creates the same structures as the
    ``kp.io.ch.CHPump`` and is thus suited to testing online processing
    pipelines with offline files.

    """

    def configure(self):
        self.filename = self.require("filename")
        self.fobj = open(self.filename, "rb")

    def process(self, blob):
        try:
            length, data_type = unpack("<ii", self.fobj.read(8))
            self.fobj.seek(-8, 1)
        except struct.error:
            raise StopIteration
        data = self.fobj.read(length)
        blob["RawBytes"] = data
        return blob

    def finish(self):
        self.fobj.close()
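

# Sketch of the peek-and-rewind pattern used in RePump.process: read the
# 8-byte preamble, rewind, then read the whole frame (synthetic bytes).
def _example_peek_preamble():
    frame = struct.pack("<ii", 12, 2001) + b"\x2a\x00\x00\x00"  # 12-byte frame
    fobj = BytesIO(frame)
    length, data_type = unpack("<ii", fobj.read(8))
    fobj.seek(-8, 1)  # rewind so the frame is read in full, preamble included
    data = fobj.read(length)
    assert len(data) == length == 12
    return data_type  # -> 2001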


class DAQPump(Module):
    """A pump for binary DAQ files. Deprecated!"""

    def configure(self):
        self.filename = self.require("filename")
        self.legacy = self.get("legacy", default=False)
        self.frame_positions = []
        self.index = 0

        self.blob_file = self.open_file(self.filename)
        self.determine_frame_positions()

    def next_blob(self):
        """Get the next frame from file"""
        blob_file = self.blob_file
        try:
            preamble = DAQPreamble(file_obj=blob_file)
        except struct.error:
            raise StopIteration

        try:
            data_type = DATA_TYPES[preamble.data_type]
        except KeyError:
            log.error("Unknown datatype: {0}".format(preamble.data_type))
            data_type = "Unknown"

        blob = Blob()
        blob[data_type] = None
        blob["DAQPreamble"] = preamble

        if data_type == "DAQSummaryslice":
            daq_frame = DAQSummaryslice(blob_file, legacy=self.legacy)
            blob[data_type] = daq_frame
            blob["DAQHeader"] = daq_frame.header
        elif data_type == "DAQEvent":
            daq_frame = DAQEvent(blob_file, legacy=self.legacy)
            blob[data_type] = daq_frame
            blob["DAQHeader"] = daq_frame.header
        else:
            log.warning(
                "Skipping DAQ frame with data type code '{0}'.".format(
                    preamble.data_type
                )
            )
            blob_file.seek(preamble.length - DAQPreamble.size, 1)

        return blob

    def seek_to_frame(self, index):
        """Move file pointer to the frame with given index."""
        pointer_position = self.frame_positions[index]
        self.blob_file.seek(pointer_position, 0)

    def get_blob(self, index):
        """Return blob at given index."""
        self.seek_to_frame(index)
        return self.next_blob()

    def determine_frame_positions(self):
        """Record the file pointer position of each frame"""
        self.blob_file.seek(0, 0)
        with ignored(struct.error):
            while True:
                pointer_position = self.blob_file.tell()
                length = struct.unpack("<i", self.blob_file.read(4))[0]
                self.blob_file.seek(length - 4, 1)
                self.frame_positions.append(pointer_position)
        self.blob_file.seek(0, 0)
        log.info("Found {0} frames.".format(len(self.frame_positions)))

    def process(self, blob):
        """Pump the next blob to the modules"""
        return self.next_blob()

    def finish(self):
        """Clean everything up"""
        self.blob_file.close()

    def __len__(self):
        if not self.frame_positions:
            self.determine_frame_positions()
        return len(self.frame_positions)

    def __iter__(self):
        return self

    def __next__(self):
        try:
            blob = self.get_blob(self.index)
        except IndexError:
            self.index = 0
            raise StopIteration
        self.index += 1
        return blob

    def __getitem__(self, index):
        if isinstance(index, int):
            return self.get_blob(index)
        elif isinstance(index, slice):
            return self._slice_generator(index)
        else:
            raise TypeError("index must be int or slice")

    def _slice_generator(self, index):
        """A simple slice generator for iterations"""
        start, stop, step = index.indices(len(self))
        for i in range(start, stop, step):
            yield self.get_blob(i)
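

# Usage sketch for DAQPump's random access interface (the filename is
# hypothetical): __getitem__ supports both integer indices and slices.
def _example_daqpump_random_access():
    pump = DAQPump(filename="run.dat")  # hypothetical binary DAQ file
    first_blob = pump[0]  # seek to and parse the first frame
    for blob in pump[1:10:2]:  # every second frame of the next nine
        print(blob["DAQPreamble"])
    return first_blob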


class DAQProcessor(Module):
    def configure(self):
        self.legacy = self.get("legacy", default=False)
        self.index = 0
        self.event_id = 0

    def process(self, blob):
        tag = str(blob["CHPrefix"].tag)
        data = blob["CHData"]

        processor = None

        if tag == "IO_EVT":
            processor = self.process_event
        if tag == "IO_SUM":
            processor = self.process_summaryslice
        if tag == "IO_OLINE":
            processor = self.process_online_reco

        if processor is None:
            self.log.error("Unsupported tag: %s", tag)
            return

        try:
            processor(data, blob)
        except (struct.error, ValueError) as e:
            self.log.error("Corrupt data received. Skipping...\nError: %s", e)
            return

        return blob

    def process_event(self, data, blob):
        data_io = BytesIO(data)
        preamble = DAQPreamble(file_obj=data_io)  # noqa
        event = DAQEvent(file_obj=data_io, legacy=self.legacy)
        header = event.header

        # use the same group_id for EventInfo and Hits
        group_id = self.event_id
        self.event_id += 1
        self.index += 1

        event_info = Table.from_template(
            {
                "det_id": header.det_id,
                "frame_index": header.time_slice,
                "livetime_sec": 0,
                "mc_id": 0,
                "mc_t": 0,
                "n_events_gen": 0,
                "n_files_gen": 0,
                "overlays": event.overlays,
                "trigger_counter": event.trigger_counter,
                "trigger_mask": event.trigger_mask,
                "utc_nanoseconds": header.ticks * 16,
                "utc_seconds": header.time_stamp,
                "weight_w1": 0,
                "weight_w2": 0,
                "weight_w3": 0,  # MC weights
                "run_id": header.run,
                "group_id": group_id,
            },
            "EventInfo",
        )
        blob["EventInfo"] = event_info

        hits = event.snapshot_hits
        n_hits = event.n_snapshot_hits
        if n_hits == 0:
            self.log.warning("No hits found in event.")
            return

        # This might not be needed
        triggereds = np.zeros(n_hits)
        triggered_map = {}
        for thit in event.triggered_hits:
            # TODO: switch to thit.trigger_mask instead of True!
            triggered_map[(thit.dom_id, thit.channel_id, thit.time, thit.tot)] = True
        for idx, hit in enumerate(hits):
            triggereds[idx] = tuple(hit) in triggered_map

        hit_series = Table.from_template(
            {
                "channel_id": hits.channel_id,
                "dom_id": hits.dom_id,
                "time": hits.time,
                "tot": hits.tot,
                "triggered": triggereds,  # TODO: switch to trigger_mask!
                "group_id": group_id,
            },
            "Hits",
        )

        blob["Hits"] = hit_series

    def process_summaryslice(self, data, blob):
        data_io = BytesIO(data)
        preamble = DAQPreamble(file_obj=data_io)  # noqa
        summaryslice = DAQSummaryslice(file_obj=data_io, legacy=self.legacy)
        blob["RawSummaryslice"] = summaryslice

    def process_online_reco(self, data, blob):
        data_io = BytesIO(data)
        preamble = DAQPreamble(file_obj=data_io)  # noqa
        _data = unpack("<iiiQI", data_io.read(4 + 4 + 4 + 8 + 4))
        det_id, run_id, frame_index, trigger_counter, utc_seconds = _data
        shower_reco = unpack("9d", data_io.read(9 * 8))
        shower_meta = unpack("3i", data_io.read(12))
        track_reco = unpack("9d", data_io.read(9 * 8))
        track_meta = unpack("3i", data_io.read(12))
        print(
            "Shower: x/y/z/dx/dy/dz/E/Q/t (type/status/ndf): ", shower_reco, shower_meta
        )
        print("Track: x/y/z/dx/dy/dz/E/Q/t (type/status/ndf): ", track_reco, track_meta)
        blob["ReconstructionInfo"] = Table(
            {
                "det_id": det_id,
                "run_id": run_id,
                "frame_index": frame_index,
                "trigger_counter": trigger_counter,
                "utc_seconds": utc_seconds,
            },
            h5loc="reco",
            split_h5=True,
            name="Reconstructions",
        )
        args = track_reco + track_meta
        blob["RecoTrack"] = RecoTrack(*args)
        args = shower_reco + shower_meta
        blob["RecoShower"] = RecoShower(*args)


RecoTrack = namedtuple("RecoTrack", "x y z dx dy dz E Q t type status ndf")
RecoShower = namedtuple("RecoShower", "x y z dx dy dz E Q t type status ndf")
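

# Sketch: the online reconstruction results unpack directly into these
# namedtuples; the values below are made up for illustration.
def _example_reco_track():
    track = RecoTrack(0.0, 0.0, 100.0, 0.0, 0.0, -1.0, 1e5, 0.5, 0.0, 1, 0, 42)
    return track.E, track.ndf  # fields are accessible by name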


class DAQPreamble(object):
    """Wrapper for the JDAQPreamble binary format.

    Parameters
    ----------
    byte_data : bytes (optional)
        The raw bytes of the preamble.
    file_obj : file (optional)
        The binary file, where the file pointer is at the beginning of the
        preamble.

    Attributes
    ----------
    size : int
        The size of the original DAQ byte representation.
    data_type : int
        The data type of the following frame. The coding is stored in
        ``DATA_TYPES``.

    """

    size = 8

    def __init__(self, byte_data=None, file_obj=None):
        self.length = None
        self.data_type = None
        if byte_data:
            self._parse_byte_data(byte_data)
        if file_obj:
            self._parse_file(file_obj)

    def _parse_byte_data(self, byte_data):
        """Extract the values from byte string."""
        self.length, self.data_type = unpack("<ii", byte_data[: self.size])

    def _parse_file(self, file_obj):
        """Directly read from file handler.

        Note that this will move the file pointer.

        """
        byte_data = file_obj.read(self.size)
        self._parse_byte_data(byte_data)

    def __repr__(self):
        description = "Length: {0}\nDataType: {1}".format(self.length, self.data_type)
        return description
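

# Sketch: round-trip an 8-byte preamble through both DAQPreamble entry
# points (synthetic values).
def _example_preamble_roundtrip():
    byte_data = struct.pack("<ii", 16, 10001)  # hypothetical: 16 bytes, DAQEvent
    assert DAQPreamble(byte_data=byte_data).data_type == 10001
    assert DAQPreamble(file_obj=BytesIO(byte_data)).length == 16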


class DAQHeader(object):
    """Wrapper for the JDAQHeader binary format.

    Parameters
    ----------
    byte_data : bytes (optional)
        The raw bytes of the header.
    file_obj : file (optional)
        The binary file, where the file pointer is at the beginning of the
        header.

    Attributes
    ----------
    size : int
        The size of the original DAQ byte representation.

    """

    size = 20

    def __init__(self, byte_data=None, file_obj=None):
        self.run = None
        self.time_slice = None
        self.time_stamp = None
        self.wr_status = None
        if byte_data:
            self._parse_byte_data(byte_data)
        if file_obj:
            self._parse_file(file_obj)

    def _parse_byte_data(self, byte_data):
        """Extract the values from byte string."""
        chunks = unpack("<iiiii", byte_data[: self.size])
        det_id, run, time_slice, time_stamp, ticks = chunks
        self.det_id = det_id
        self.run = run
        self.time_slice = time_slice
        # the most significant bit of the timestamp is the WR status
        self.wr_status = time_stamp & 0x80000000
        # masking the most significant bit yields the UTC seconds
        self.time_stamp = time_stamp & 0x7FFFFFFF
        self.ticks = ticks

    def _parse_file(self, file_obj):
        """Directly read from file handler.

        Note:
            This will move the file pointer.

        """
        byte_data = file_obj.read(self.size)
        self._parse_byte_data(byte_data)

    def __repr__(self):
        description = "Run: {0}\nTime slice: {1}\nTime stamp: {2} ({3})".format(
            self.run, self.time_slice, self.time_stamp, self.ticks
        )
        return description
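

# Sketch of the White Rabbit status handling in DAQHeader: the MSB of the
# raw timestamp word carries the WR status and is masked off to recover
# the UTC seconds (values made up).
def _example_wr_status_masking():
    raw = 0x80000000 | 1681873200  # WR bit set, hypothetical UTC seconds
    wr_status = raw & 0x80000000  # nonzero -> WR is synchronized
    time_stamp = raw & 0x7FFFFFFF  # -> 1681873200
    return wr_status, time_stamp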


class DAQSummaryslice(object):
    """Wrapper for the JDAQSummaryslice binary format.

    Parameters
    ----------
    file_obj : file (optional)
        The binary file, where the file pointer is at the beginning of the
        header.

    Attributes
    ----------
    n_summary_frames : int
        The number of summary frames.
    summary_frames : dict
        The PMT rates for each DOM. The key is the DOM identifier and the
        corresponding value is a sorted list of PMT rates in [Hz].
    dom_rates : dict
        The overall DOM rate for each DOM.

    """

    def __init__(self, file_obj, legacy=False):
        if not legacy:
            version = unpack("<h", file_obj.read(2))[0]
            if version != 6:
                raise ValueError(
                    "Unsupported {} version ({}) or legacy DAQ. "
                    "Make sure Jpp v13+ is used or pass 'legacy=True' "
                    "to the init.".format(self.__class__.__name__, version)
                )
        self.header = DAQHeader(file_obj=file_obj)
        self.n_summary_frames = unpack("<i", file_obj.read(4))[0]
        self.summary_frames = {}
        self.dq_status = {}
        self.dom_status = {}
        self.dom_rates = {}

        self._parse_summary_frames(file_obj)

    def _parse_summary_frames(self, file_obj):
        """Iterate through the byte data and fill the summary_frames"""
        for _ in range(self.n_summary_frames):
            dom_id = unpack("<i", file_obj.read(4))[0]
            dq_status = file_obj.read(4)  # probably dom status?  # noqa
            dom_status = unpack("<iiii", file_obj.read(16))
            raw_rates = unpack("b" * 31, file_obj.read(31))
            pmt_rates = [self._get_rate(value) for value in raw_rates]
            self.summary_frames[dom_id] = pmt_rates
            self.dq_status[dom_id] = dq_status
            self.dom_status[dom_id] = dom_status
            self.dom_rates[dom_id] = np.sum(pmt_rates)

    def _get_rate(self, value):
        """Return the rate in Hz decoded from the single byte value"""
        if value == 0:
            return 0
        else:
            return MINIMAL_RATE_HZ * math.exp(value * self._get_factor())

    def _get_factor(self):
        return math.log(MAXIMAL_RATE_HZ / MINIMAL_RATE_HZ) / 255
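

# Sketch of the logarithmic rate compression decoded by
# DAQSummaryslice._get_rate: a byte value v maps to
# MINIMAL_RATE_HZ * exp(v * log(MAXIMAL_RATE_HZ / MINIMAL_RATE_HZ) / 255),
# so the encodable range spans 2 kHz to 2 MHz. The encoder below is an
# assumption, written simply as the inverse of that decoder.
def _example_rate_compression_roundtrip():
    factor = math.log(MAXIMAL_RATE_HZ / MINIMAL_RATE_HZ) / 255
    rate = 123e3  # hypothetical 123 kHz PMT rate
    value = round(math.log(rate / MINIMAL_RATE_HZ) / factor)  # encode to one byte
    decoded = MINIMAL_RATE_HZ * math.exp(value * factor)  # decode as _get_rate does
    assert abs(decoded - rate) / rate < 0.02  # within one quantization step
    return value, decoded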


class DAQEvent(object):
    """Wrapper for the JDAQEvent binary format.

    Parameters
    ----------
    file_obj : file
        The binary file, where the file pointer is at the beginning of the
        header.

    Attributes
    ----------
    trigger_counter : int
        Incremental identifier of the occurred trigger.
    trigger_mask : int
        The trigger type(s) satisfied.
    overlays : int
        Number of merged events.
    n_triggered_hits : int
        Number of hits satisfying the trigger conditions.
    n_snapshot_hits : int
        Number of snapshot hits.
    triggered_hits : np.recarray
        Array of triggered hits (fields: dom_id, channel_id, time, tot,
        trigger_mask)
    snapshot_hits : np.recarray
        Array of snapshot hits (fields: dom_id, channel_id, time, tot)

    """

    triggered_hits_dt = np.dtype(
        [
            ("dom_id", "<i"),
            ("channel_id", np.uint8),
            ("time", ">I"),
            ("tot", np.uint8),
            ("trigger_mask", "<Q"),
        ]
    )
    # Numba chokes on big endian and the time is big endian, so we need to
    # reinterpret
    triggered_hits_dt_final = np.dtype(
        [
            ("dom_id", "<i"),
            ("channel_id", np.uint8),
            ("time", "<I"),
            ("tot", np.uint8),
            ("trigger_mask", "<Q"),
        ]
    )
    snapshot_hits_dt = np.dtype(
        [
            ("dom_id", "<i"),
            ("channel_id", np.uint8),
            ("time", ">I"),
            ("tot", np.uint8),
        ]
    )
    # Numba chokes on big endian and the time is big endian, so we need to
    # reinterpret
    snapshot_hits_dt_final = np.dtype(
        [
            ("dom_id", "<i"),
            ("channel_id", np.uint8),
            ("time", "<I"),
            ("tot", np.uint8),
        ]
    )

    def __init__(self, file_obj, legacy=False):
        if not legacy:
            version = unpack("<h", file_obj.read(2))[0]
            if version != 4:
                raise ValueError(
                    "Unsupported {} version ({}) or legacy DAQ. "
                    "Make sure Jpp v13+ is used or pass 'legacy=True' "
                    "to the init.".format(self.__class__.__name__, version)
                )
        self.header = DAQHeader(file_obj=file_obj)
        self.trigger_counter = unpack("<Q", file_obj.read(8))[0]
        self.trigger_mask = unpack("<Q", file_obj.read(8))[0]
        self.overlays = unpack("<i", file_obj.read(4))[0]

        self.n_triggered_hits = unpack("<i", file_obj.read(4))[0]
        self.triggered_hits = self._parse_triggered_hits(file_obj)

        self.n_snapshot_hits = unpack("<i", file_obj.read(4))[0]
        self.snapshot_hits = self._parse_snapshot_hits(file_obj)

    def _parse_triggered_hits(self, file_obj):
        """Parse and store triggered hits."""
        raw_data = file_obj.read(
            self.triggered_hits_dt.itemsize * self.n_triggered_hits
        )
        arr = np.frombuffer(raw_data, self.triggered_hits_dt).view(np.recarray)
        return arr.astype(self.triggered_hits_dt_final)

    def _parse_snapshot_hits(self, file_obj):
        """Parse and store snapshot hits."""
        raw_data = file_obj.read(self.snapshot_hits_dt.itemsize * self.n_snapshot_hits)
        arr = np.frombuffer(raw_data, self.snapshot_hits_dt).view(np.recarray)
        return arr.astype(self.snapshot_hits_dt_final)

    def __repr__(self):
        string = "\n".join(
            (
                " Number of triggered hits: " + str(self.n_triggered_hits),
                " Number of snapshot hits: " + str(self.n_snapshot_hits),
            )
        )
        string += "\nTriggered hits:\n"
        string += pprint.pformat(self.triggered_hits)
        string += "\nSnapshot hits:\n"
        string += pprint.pformat(self.snapshot_hits)
        return string
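

# Sketch of why DAQEvent keeps two dtypes per hit array: the hit times
# arrive big endian, and astype() byte-swaps them into little endian so
# that downstream numba code can digest the array (synthetic value).
def _example_endianness_reinterpretation():
    raw = struct.pack(">I", 0x01020304)  # one big endian hit time
    big_endian = np.frombuffer(raw, np.dtype(">I"))
    little_endian = big_endian.astype(np.dtype("<I"))
    assert int(big_endian[0]) == int(little_endian[0]) == 0x01020304
    return little_endian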


class TMCHData(object):
    """Monitoring Channel data."""

    def __init__(self, file_obj, version=None):
        f = file_obj

        data_type = f.read(4)
        if data_type != b"TMCH":
            raise ValueError("Invalid datatype: {0}".format(data_type))

        self.run = unpack(">I", f.read(4))[0]
        self.udp_sequence_number = unpack(">I", f.read(4))[0]
        self.utc_seconds = unpack(">I", f.read(4))[0]
        self.nanoseconds = unpack(">I", f.read(4))[0] * 16
        self.dom_id = unpack(">I", f.read(4))[0]
        self.dom_status_0 = unpack(">I", f.read(4))[0]
        self.dom_status_1 = unpack(">I", f.read(4))[0]
        self.dom_status_2 = unpack(">I", f.read(4))[0]
        self.dom_status_3 = unpack(">I", f.read(4))[0]
        self.pmt_rates = [r * 10.0 for r in unpack(">" + 31 * "I", f.read(31 * 4))]
        self.hrvbmp = unpack(">I", f.read(4))[0]
        self.flags = unpack(">I", f.read(4))[0]
        # flags:
        # bit 0: AHRS valid
        if version is None:
            # bits 1-3: structure version
            # 000 - 1, 001 - 2, 010 - unused, 011 - 3
            self.version = ((self.flags >> 1) & 7) + 1
        else:
            self.version = version
        self.yaw, self.pitch, self.roll = unpack(">fff", f.read(12))
        self.A = unpack(">fff", f.read(12))  # AHRS: Ax, Ay, Az
        self.G = unpack(">fff", f.read(12))  # AHRS: Gx, Gy, Gz
        self.H = unpack(">fff", f.read(12))  # AHRS: Hx, Hy, Hz
        self.temp = unpack(">H", f.read(2))[0] / 100.0
        self.humidity = unpack(">H", f.read(2))[0] / 100.0
        self.tdcfull = unpack(">I", f.read(4))[0]
        self.aesfull = unpack(">I", f.read(4))[0]
        self.flushc = unpack(">I", f.read(4))[0]

        if self.version >= 2:
            self.ts_duration_ms = unpack(">I", f.read(4))[0]
        if self.version >= 3:
            self.tdc_supertime_fifo_size = unpack(">H", f.read(2))[0]
            self.aes_supertime_fifo_size = unpack(">H", f.read(2))[0]

    def __str__(self):
        return str(vars(self))

    def __repr__(self):
        return self.__str__()
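

# Sketch of the structure version decoding in TMCHData: bits 1-3 of the
# flags word hold the version code, to which 1 is added (flag bits 001
# therefore yield version 2; the flags value is made up).
def _example_tmch_version_from_flags():
    flags = 0b0010  # synthetic flags word: AHRS bit clear, version bits 001
    version = ((flags >> 1) & 7) + 1
    return version  # -> 2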


class TMCHRepump(Module):
    """Takes an IO_MONIT raw dump and replays it."""

    def configure(self):
        filename = self.require("filename")
        self.format_version = self.get("format_version", default=None)
        self.fobj = open(filename, "rb")
        self.blobs = self.blob_generator()

    def process(self, blob):
        return next(self.blobs)

    def blob_generator(self):
        while True:
            blob = Blob()
            datatype = self.fobj.read(4)
            if len(datatype) == 0:
                return
            if datatype == b"TMCH":
                self.fobj.seek(-4, 1)
                blob["TMCHData"] = TMCHData(self.fobj, version=self.format_version)
                yield blob

    def finish(self):
        self.fobj.close()

    def __iter__(self):
        return self

    def __next__(self):
        return next(self.blobs)
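

# Usage sketch for TMCHRepump (the dump filename is hypothetical): the
# pump is iterable, so the monitoring data can be consumed directly.
def _example_tmch_repump():
    repump = TMCHRepump(filename="monitoring_dump.dat")  # hypothetical dump
    for blob in repump:
        print(blob["TMCHData"].dom_id, blob["TMCHData"].pmt_rates[:3])
    repump.finish()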


class DMMonitor(object):
    """A class which provides access to the Detector Manager parameters.

    Examples
    --------
    >>> import km3pipe as kp
    >>> dmm = kp.io.daq.DMMonitor('192.168.0.120', base='clb/outparams')
    >>> session = dmm.start_session('test', ['wr_mu/1/0', 'wr_mu/1/1'])
    >>> for values in session:
    ...     print(values)

    """

    def __init__(self, host, port=1302, base=""):
        self._host = host
        self._port = port
        self._base = base
        self._url = "http://{}:{}/mon/{}".format(self._host, self._port, self._base)
        self._available_parameters = []
        self.log = get_logger(self.__class__.__name__)
        self.cprint = get_printer(self.__class__.__name__)

    @property
    def available_parameters(self):
        if not self._available_parameters:
            self._get_available_parameters()
        return self._available_parameters

    def _get_available_parameters(self):
        self._available_parameters = json.loads(urlopen(self._url).read())

    def get(self, path):
        return json.loads(
            urlopen(
                "http://{}:{}/mon/{}/{}".format(
                    self._host, self._port, self._base, path
                )
            ).read()
        )

    def start_session(self, name, paths, interval=10):
        self.cprint("Starting session '{}'".format(name))
        ret = urlopen(
            "http://{}:{}/monshortdef?name={}&paths={}".format(
                self._host,
                self._port,
                name,
                ",".join(["/mon/{}/{}".format(self._base, p) for p in paths]),
            )
        ).read()
        if ret != b"OK":
            self.log.error("Could not start session")
            return []
        return self._session(name, interval)

    def _session(self, name, interval):
        url = "http://{}:{}/monshort/{}".format(self._host, self._port, name)
        while True:
            try:
                yield json.loads(urlopen(url).read())
            except URLError as e:
                self.log.error(
                    "Error when trying to connect to the DM: %s\n"
                    "Retry in %d seconds..." % (e, interval)
                )
                time.sleep(interval)
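

# Usage sketch for one-off DMMonitor queries (host and parameter path are
# hypothetical): get() fetches a single monitoring value via HTTP.
def _example_dm_monitor_query():
    dmm = DMMonitor("192.168.0.120", base="clb/outparams")
    print(dmm.available_parameters)  # all parameters below the base path
    return dmm.get("wr_mu/1/0")  # one specific parameter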