Source code for

# Filename:
# pylint: disable=R0903
Pumps for the DAQ data formats.


from collections import namedtuple
from io import BytesIO
import json
import math
import struct
from struct import unpack
import time
import pprint
from urllib.request import urlopen, URLError

import numpy as np

from thepipe import Module, Blob
from km3pipe.dataclasses import Table
from km3pipe.sys import ignored
from km3pipe.logger import get_logger, get_printer

__author__ = "Tamas Gal"
__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration."
__credits__ = []
__license__ = "MIT"
__maintainer__ = "Tamas Gal"
__email__ = ""
__status__ = "Development"

[docs] log = get_logger(__name__) # pylint: disable=C0103
[docs] DATA_TYPES = { 101: "DAQSuperFrame", 201: "DAQSummaryFrame", # Using the same class for all timeslices since they are structurally # identical (until now) 1001: "DAQTimeslice", # Type of erroneous timeslice data 1002: "DAQTimeslice", # L0 1003: "DAQTimeslice", # L1 1004: "DAQTimeslice", # L2 1005: "DAQTimeslice", # SN 2001: "DAQSummaryslice", 10001: "DAQEvent", }
[docs] MINIMAL_RATE_HZ = 2.0e3
[docs] MAXIMAL_RATE_HZ = 2.0e6
[docs] class TimesliceParser(Module): """Preliminary parser for DAQTimeslice"""
[docs] def configure(self): self.legacy = self.get("legacy", default=False)
def _get_raw_data(self, blob): if "CHPrefix" in blob: if not str(blob["CHPrefix"].tag).startswith("IO_TS"):"Not an IO_TS* blob") return blob return BytesIO(blob["CHData"]) if "FileIO" in blob: return blob["FileIO"] if "RawBytes" in blob: return BytesIO(blob["RawBytes"])
[docs] def process(self, blob): data = self._get_raw_data(blob) if data is None: return blob try: ts_info, ts_frameinfos, ts_hits = self._parse_timeslice(data) except struct.error: log.error("Could not parse Timeslice") log.error(blob.keys()) else: blob["TSHits"] = ts_hits blob["TimesliceInfo"] = ts_info blob["TimesliceFrameInfos"] = ts_frameinfos return blob
def _parse_timeslice(self, data): tsl_size, datatype = unpack("<ii", if not self.legacy: version = unpack("<h",[0] if version != 1: raise ValueError( "Unsupported DAQTimeslice version ({}) or legacy DAQ. " "Make sure Jpp v13+ is used or pass 'legacy=True' " "to {}.".format(version, self.__class__.__name__) ) det_id, run, sqnr = unpack("<iii", timestamp, ns_ticks, n_frames = unpack("<iii", ts_info = Table.from_template( { "frame_index": sqnr, "slice_id": 0, "timestamp": timestamp, "nanoseconds": ns_ticks * 16, "n_frames": n_frames, }, "TimesliceInfo", ) ts_frameinfos = {} _dom_ids = [] _channel_ids = [] _times = [] _tots = [] for _ in range(n_frames): frame_size, datatype = unpack("<ii", det_id, run, sqnr = unpack("<iii", timestamp, ns_ticks, dom_id = unpack("<iii", dataqueue_status = unpack("<i",[0] dom_status = unpack("<iiii", * 4)) n_hits = unpack("<i",[0] ts_frameinfos[dom_id] = Table.from_template( { "det_id": det_id, "run_id": run, "sqnr": sqnr, "timestamp": timestamp, "nanoseconds": ns_ticks * 16, "dom_id": dom_id, "dataqueue_status": dataqueue_status, "dom_status": dom_status, "n_hits": n_hits, }, "TimesliceFrameInfo", ) for j in range(n_hits): hit = unpack("!BlB", _dom_ids.append(dom_id) _channel_ids.append(hit[0]) _times.append(hit[1]) _tots.append(hit[2]) ts_hits = Table( { "channel_id": np.array(_channel_ids), "dom_id": np.array(_dom_ids), "time": np.array(_times), "tot": np.array(_tots), }, name="TimesliceHits", h5loc="/timeslice_hits", split_h5=True, ) return ts_info, ts_frameinfos, ts_hits
[docs] class RePump(Module): """A pump for binary DAQ files. This pump can be used to replay raw dumps e.g. created with the ``daqsample`` tool. It creates the same structures as the ```` and thus suited to test online processing pipelines with offline files. """
[docs] def configure(self): self.filename = self.require("filename") self.fobj = open(self.filename, "rb")
[docs] def process(self, blob): try: length, data_type = unpack("<ii",, 1) except struct.error: raise StopIteration data = blob["RawBytes"] = data return blob
[docs] def finish(self): self.fobj.close()
[docs] class DAQPump(Module): """A pump for binary DAQ files. Deprecated!"""
[docs] def configure(self): self.filename = self.require("filename") self.legacy = self.get("legacy", default=False) self.frame_positions = [] self.index = 0 self.blob_file = self.open_file(self.filename) self.determine_frame_positions()
[docs] def next_blob(self): """Get the next frame from file""" blob_file = self.blob_file try: preamble = DAQPreamble(file_obj=blob_file) except struct.error: raise StopIteration try: data_type = DATA_TYPES[preamble.data_type] except KeyError: log.error("Unknown datatype: {0}".format(preamble.data_type)) data_type = "Unknown" blob = Blob() blob[data_type] = None blob["DAQPreamble"] = preamble if data_type == "DAQSummaryslice": daq_frame = DAQSummaryslice(blob_file, legacy=self.legacy) blob[data_type] = daq_frame blob["DAQHeader"] = daq_frame.header elif data_type == "DAQEvent": daq_frame = DAQEvent(blob_file, legacy=self.legacy) blob[data_type] = daq_frame blob["DAQHeader"] = daq_frame.header else: log.warning( "Skipping DAQ frame with data type code '{0}'.".format( preamble.data_type ) ) - DAQPreamble.size, 1) return blob
[docs] def seek_to_frame(self, index): """Move file pointer to the frame with given index.""" pointer_position = self.frame_positions[index], 0)
[docs] def get_blob(self, index): """Return blob at given index.""" self.seek_to_frame(index) return self.next_blob()
[docs] def determine_frame_positions(self): """Record the file pointer position of each frame""", 0) with ignored(struct.error): while True: pointer_position = self.blob_file.tell() length = struct.unpack("<i",[0] - 4, 1) self.frame_positions.append(pointer_position), 0)"Found {0} frames.".format(len(self.frame_positions)))
[docs] def process(self, blob): """Pump the next blob to the modules""" return self.next_blob()
[docs] def finish(self): """Clean everything up""" self.blob_file.close()
def __len__(self): if not self.frame_positions: self.determine_frame_positions() return len(self.frame_positions) def __iter__(self): return self def __next__(self): try: blob = self.get_blob(self.index) except IndexError: self.index = 0 raise StopIteration self.index += 1 return blob def __getitem__(self, index): if isinstance(index, int): return self.get_blob(index) elif isinstance(index, slice): return self._slice_generator(index) else: raise TypeError("index must be int or slice") def _slice_generator(self, index): """A simple slice generator for iterations""" start, stop, step = index.indices(len(self)) for i in range(start, stop, step): yield self.get_blob(i)
[docs] class DAQProcessor(Module):
[docs] def configure(self): self.legacy = self.get("legacy", default=False) self.index = 0 self.event_id = 0
[docs] def process(self, blob): tag = str(blob["CHPrefix"].tag) data = blob["CHData"] processor = None if tag == "IO_EVT": processor = self.process_event if tag == "IO_SUM": processor = self.process_summaryslice if tag == "IO_OLINE": processor = self.process_online_reco if processor is None: self.log.error("Unsupported tag: %s", tag) return try: processor(data, blob) except (struct.error, ValueError) as e: self.log.error("Corrupt data received. Skipping...\n" "Error: %s", e) return return blob
[docs] def process_event(self, data, blob): data_io = BytesIO(data) preamble = DAQPreamble(file_obj=data_io) # noqa event = DAQEvent(file_obj=data_io, legacy=self.legacy) header = event.header event_info = Table.from_template( { "det_id": header.det_id, # 'frame_index': self.index, # header.time_slice, "frame_index": header.time_slice, "livetime_sec": 0, "mc_id": 0, "mc_t": 0, "n_events_gen": 0, "n_files_gen": 0, "overlays": event.overlays, "trigger_counter": event.trigger_counter, "trigger_mask": event.trigger_mask, "utc_nanoseconds": header.ticks * 16, "utc_seconds": header.time_stamp, "weight_w1": 0, "weight_w2": 0, "weight_w3": 0, # MC weights "run_id":, # run id "group_id": self.event_id, }, "EventInfo", ) blob["EventInfo"] = event_info self.event_id += 1 self.index += 1 hits = event.snapshot_hits n_hits = event.n_snapshot_hits if n_hits == 0: self.log.warning("No hits found in event.") return # This might not be needed triggereds = np.zeros(n_hits) triggered_map = {} for thit in event.triggered_hits: # TODO: switch to thit.trigger_mask instead of True! triggered_map[(thit.dom_id, thit.channel_id, thit.time, thit.tot)] = True for idx, hit in enumerate(hits): triggereds[idx] = tuple(hit) in triggered_map hit_series = Table.from_template( { "channel_id": hits.channel_id, "dom_id": hits.dom_id, "time": hits.time, "tot": hits.tot, "triggered": triggereds, # TODO: switch to trigger_mask! "group_id": self.event_id, }, "Hits", ) blob["Hits"] = hit_series
[docs] def process_summaryslice(self, data, blob): data_io = BytesIO(data) preamble = DAQPreamble(file_obj=data_io) # noqa summaryslice = DAQSummaryslice(file_obj=data_io, legacy=self.legacy) blob["RawSummaryslice"] = summaryslice
[docs] def process_online_reco(self, data, blob): data_io = BytesIO(data) preamble = DAQPreamble(file_obj=data_io) # noqa _data = unpack("<iiiQI", + 4 + 4 + 8 + 4)) det_id, run_id, frame_index, trigger_counter, utc_seconds = _data shower_reco = unpack("9d", * 8)) shower_meta = unpack("3i", track_reco = unpack("9d", * 8)) track_meta = unpack("3i", print( "Shower: x/y/z/dx/dy/dz/E/Q/t (type/status/ndf): ", shower_reco, shower_meta ) print("Track: x/y/z/dx/dy/dz/E/Q/t (type/status/ndf): ", track_reco, track_meta) blob["ReconstructionInfo"] = Table( { "det_id": det_id, "run_id": run_id, "frame_index": frame_index, "trigger_counter": trigger_counter, "utc_seconds": utc_seconds, }, h5loc="reco", split_h5=True, name="Reconstructions", ) args = track_reco + track_meta blob["RecoTrack"] = RecoTrack(*args) args = shower_reco + shower_meta blob["RecoShower"] = RecoShower(*args)
[docs] RecoTrack = namedtuple("RecoTrack", "x y z dx dy dz E Q t type status ndf")
[docs] RecoShower = namedtuple("RecoShower", "x y z dx dy dz E Q t type status ndf")
[docs] class DAQPreamble(object): """Wrapper for the JDAQPreamble binary format. Parameters ---------- byte_data : bytes (optional) The binary file, where the file pointer is at the beginning of the header. file_obj : file (optional) The binary file, where the file pointer is at the beginning of the header. Attributes ---------- size : int The size of the original DAQ byte representation. data_type : int The data type of the following frame. The coding is stored in the ``DATA_TYPES``. """
[docs] size = 8
def __init__(self, byte_data=None, file_obj=None): self.length = None self.data_type = None if byte_data: self._parse_byte_data(byte_data) if file_obj: self._parse_file(file_obj) def _parse_byte_data(self, byte_data): """Extract the values from byte string.""" self.length, self.data_type = unpack("<ii", byte_data[: self.size]) def _parse_file(self, file_obj): """Directly read from file handler. Note that this will move the file pointer. """ byte_data = self._parse_byte_data(byte_data) def __repr__(self): description = "Length: {0}\nDataType: {1}".format(self.length, self.data_type) return description
[docs] class DAQHeader(object): """Wrapper for the JDAQHeader binary format. Parameters ---------- byte_data : bytes (optional) The binary file, where the file pointer is at the beginning of the header. file_obj : file (optional) The binary file, where the file pointer is at the beginning of the header. Attributes ---------- size : int The size of the original DAQ byte representation. """
[docs] size = 20
def __init__(self, byte_data=None, file_obj=None): = None self.time_slice = None self.time_stamp = None self.wr_status = None if byte_data: self._parse_byte_data(byte_data) if file_obj: self._parse_file(file_obj) def _parse_byte_data(self, byte_data): """Extract the values from byte string.""" chunks = unpack("<iiiii", byte_data[: self.size]) det_id, run, time_slice, time_stamp, ticks = chunks self.det_id = det_id = run self.time_slice = time_slice # most significant bit is the WR status self.wr_status = time_stamp & 0x80000000 # masking the most significant bit, which is the WR status self.time_stamp = time_stamp & 0x7FFFFFFF self.ticks = ticks def _parse_file(self, file_obj): """Directly read from file handler. Note: This will move the file pointer. """ byte_data = self._parse_byte_data(byte_data) def __repr__(self): description = "Run: {0}\nTime slice: {1}\nTime stamp: {2} ({3})".format(, self.time_slice, self.time_stamp, self.ticks ) return description
[docs] class DAQSummaryslice(object): """Wrapper for the JDAQSummarySlice binary format. Parameters ---------- file_obj : file (optional) The binary file, where the file pointer is at the beginning of the header. Attributes ---------- n_summary_frames : int The number of summary frames. summary_frames : dict The PMT rates for each DOM. The key is the DOM identifier and the corresponding value is a sorted list of PMT rates in [Hz]. dom_rates : dict The overall DOM rate for each DOM. """ def __init__(self, file_obj, legacy=False): if not legacy: version = unpack("<h",[0] if version != 6: raise ValueError( "Unsupported {} version ({}) or legacy DAQ. " "Make sure Jpp v13+ is used or pass 'legacy=True' " "to the init.".format(self.__class__.__name__, version) ) self.header = DAQHeader(file_obj=file_obj) self.n_summary_frames = unpack("<i",[0] self.summary_frames = {} self.dq_status = {} self.dom_status = {} self.dom_rates = {} self._parse_summary_frames(file_obj) def _parse_summary_frames(self, file_obj): """Iterate through the byte data and fill the summary_frames""" for _ in range(self.n_summary_frames): dom_id = unpack("<i",[0] dq_status = # probably dom status? # noqa dom_status = unpack("<iiii", raw_rates = unpack("b" * 31, pmt_rates = [self._get_rate(value) for value in raw_rates] self.summary_frames[dom_id] = pmt_rates self.dq_status[dom_id] = dq_status self.dom_status[dom_id] = dom_status self.dom_rates[dom_id] = np.sum(pmt_rates) def _get_rate(self, value): """Return the rate in Hz from the short int value""" if value == 0: return 0 else: return MINIMAL_RATE_HZ * math.exp(value * self._get_factor()) def _get_factor(self): return math.log(MAXIMAL_RATE_HZ / MINIMAL_RATE_HZ) / 255
[docs] class DAQEvent(object): """Wrapper for the JDAQEvent binary format. Parameters ---------- file_obj : file The binary file, where the file pointer is at the beginning of the header. Attributes ---------- trigger_counter : int Incremental identifier of the occurred trigger. trigger_mask : int The trigger type(s) satisfied. overlays : int Number of merged events. n_triggered_hits : int Number of hits satisfying the trigger conditions. n_snapshot_hits : int Number of snapshot hits. triggered_hits : np.recarray Array of triggered hits (fields: dom_id, pmt_id, tdc_time, tot, trigger_mask) snapshot_hits : np.recarray A list of snapshot hits (fields: dom_id, pmt_id, tdc_time, tot) """
[docs] triggered_hits_dt = np.dtype( [ ("dom_id", "<i"), ("channel_id", np.uint8), ("time", ">I"), ("tot", np.uint8), ("trigger_mask", "<Q"), ] )
# Numba chokes on big endian and the time is big, so we need to reinterpret
[docs] triggered_hits_dt_final = np.dtype( [ ("dom_id", "<i"), ("channel_id", np.uint8), ("time", "<I"), ("tot", np.uint8), ("trigger_mask", "<Q"), ] )
[docs] snapshot_hits_dt = np.dtype( [ ("dom_id", "<i"), ("channel_id", np.uint8), ("time", ">I"), ("tot", np.uint8), ] )
# Numba chokes on big endian and the time is big, so we need to reinterpret
[docs] snapshot_hits_dt_final = np.dtype( [ ("dom_id", "<i"), ("channel_id", np.uint8), ("time", "<I"), ("tot", np.uint8), ] )
def __init__(self, file_obj, legacy=False): if not legacy: version = unpack("<h",[0] if version != 4: raise ValueError( "Unsupported {} version ({}) or legacy DAQ. " "Make sure Jpp v13+ is used or pass 'legacy=True' " "to the init.".format(self.__class__.__name__, version) ) self.header = DAQHeader(file_obj=file_obj) self.trigger_counter = unpack("<Q",[0] self.trigger_mask = unpack("<Q",[0] self.overlays = unpack("<i",[0] self.n_triggered_hits = unpack("<i",[0] self.triggered_hits = self._parse_triggered_hits(file_obj) self.n_snapshot_hits = unpack("<i",[0] self.snapshot_hits = self._parse_snapshot_hits(file_obj) def _parse_triggered_hits(self, file_obj): """Parse and store triggered hits.""" raw_data = self.triggered_hits_dt.itemsize * self.n_triggered_hits ) arr = np.frombuffer(raw_data, self.triggered_hits_dt).view(np.recarray) return arr.astype(self.triggered_hits_dt_final) def _parse_snapshot_hits(self, file_obj): """Parse and store snapshot hits.""" raw_data = * self.n_snapshot_hits) arr = np.frombuffer(raw_data, self.snapshot_hits_dt).view(np.recarray) return arr.astype(self.snapshot_hits_dt_final) def __repr__(self): string = "\n".join( ( " Number of triggered hits: " + str(self.n_triggered_hits), " Number of snapshot hits: " + str(self.n_snapshot_hits), ) ) string += "\nTriggered hits:\n" string += pprint.pformat(self.triggered_hits) string += "\nSnapshot hits:\n" string += pprint.pformat(self.snapshot_hits) return string
[docs] class TMCHData(object): """Monitoring Channel data.""" def __init__(self, file_obj, version=None): f = file_obj data_type = if data_type != b"TMCH": raise ValueError("Invalid datatype: {0}".format(data_type)) = unpack(">I",[0] self.udp_sequence_number = unpack(">I",[0] self.utc_seconds = unpack(">I",[0] self.nanoseconds = unpack(">I",[0] * 16 self.dom_id = unpack(">I",[0] self.dom_status_0 = unpack(">I",[0] self.dom_status_1 = unpack(">I",[0] self.dom_status_2 = unpack(">I",[0] self.dom_status_3 = unpack(">I",[0] self.pmt_rates = [r * 10.0 for r in unpack(">" + 31 * "I", * 4))] self.hrvbmp = unpack(">I",[0] self.flags = unpack(">I",[0] # flags: # bit 0: AHRS valid if version is None: # bit 3-1: structure version # 000 - 1, 001 - 2, 010 - unused, 011 - 3 self.version = int(bin((self.flags >> 1) & 7), 2) + 1 else: self.version = version self.yaw, self.pitch, self.roll = unpack(">fff", self.A = unpack(">fff", # AHRS: Ax, Ay, Az self.G = unpack(">fff", # AHRS: Gx, Gy, Gz self.H = unpack(">fff", # AHRS: Hx, Hy, Hz self.temp = unpack(">H",[0] / 100.0 self.humidity = unpack(">H",[0] / 100.0 self.tdcfull = unpack(">I",[0] self.aesfull = unpack(">I",[0] self.flushc = unpack(">I",[0] if self.version >= 2: self.ts_duration_ms = unpack(">I",[0] if self.version >= 3: self.tdc_supertime_fifo_size = unpack(">H",[0] self.aes_supertime_fifo_size = unpack(">H",[0] def __str__(self): return str(vars(self)) def __repr__(self): return self.__str__()
[docs] class TMCHRepump(Module): """Takes a IO_MONIT raw dump and replays it."""
[docs] def configure(self): filename = self.require("filename") self.format_version = self.get("format_version", default=None) self.fobj = open(filename, "rb") self.blobs = self.blob_generator()
[docs] def process(self, blob): return next(self.blobs)
[docs] def blob_generator(self): while True: blob = Blob() datatype = if len(datatype) == 0: return if datatype == b"TMCH":, 1) blob["TMCHData"] = TMCHData(self.fobj, version=self.format_version) yield blob
[docs] def finish(self): self.fobj.close()
def __iter__(self): return self def __next__(self): return next(self.blobs)
[docs] class DMMonitor(object): """A class which provides access to the Detector Manager parameters. Examples -------- >>> import km3pipe as kp >>> dmm ='', base='clb/outparams') >>> session = dmm.start_session('test', ['wr_mu/1/0','wr_mu/1/1']) >>> for values in session: print(values) """ def __init__(self, host, port=1302, base=""): self._host = host self._port = port self._base = base self._url = "http://{}:{}/mon/{}".format(self._host, self._port, self._base) self._available_parameters = [] self.log = get_logger(self.__class__.__name__) self.cprint = get_printer(self.__class__.__name__) @property
[docs] def available_parameters(self): if not self._available_parameters: self._get_available_parameters() return self._available_parameters
def _get_available_parameters(self): self._available_parameters = json.loads(urlopen(self._url).read())
[docs] def get(self, path): return json.loads( urlopen( "http://{}:{}/mon/{}/{}".format( self._host, self._port, self._base, path ) ).read() )
[docs] def start_session(self, name, paths, interval=10): self.cprint("Starting session '{}'".format(name)) ret = urlopen( "http://{}:{}/monshortdef?name={}&paths={}".format( self._host, self._port, name, ",".join(["/mon/{}/{}".format(self._base, p) for p in paths]), ) ).read() if ret != b"OK": self.log.error("Could not start session") return [] return self._session(name, interval)
def _session(self, name, interval): url = "http://{}:{}/monshort/{}".format(self._host, self._port, name) while True: try: yield json.loads(urlopen(url).read()) except URLError as e: self.log.error( "Error when trying to connect to the DM: %s\n" "Retry in %d seconds..." % (e, interval) ) time.sleep(interval)