Source code for km3io.online

import binascii
from collections import namedtuple
import os
import uproot
import uproot3
import numpy as np

import numba as nb

[docs] TIMESLICE_FRAME_BASKET_CACHE_SIZE = 523 * 1024**2 # [byte]
[docs] SUMMARYSLICE_FRAME_BASKET_CACHE_SIZE = 523 * 1024**2 # [byte]
[docs] BASKET_CACHE_SIZE = 110 * 1024**2
[docs] BASKET_CACHE = uproot3.cache.ThreadSafeArrayCache(BASKET_CACHE_SIZE)
# Parameters for PMT rate conversions, since the rates in summary slices are # stored as a single byte to save space. The values from 0-255 can be decoded # using the `get_rate(value)` function, which will yield the actual rate # in Hz.
[docs] MINIMAL_RATE_HZ = 2.0e3
[docs] MAXIMAL_RATE_HZ = 2.0e6
[docs] RATE_FACTOR = np.log(MAXIMAL_RATE_HZ / MINIMAL_RATE_HZ) / 255
[docs] CHANNEL_BITS_TEMPLATE = np.zeros(31, dtype=bool)
[docs] BranchConfiguration = namedtuple( field_names=["branch_address", "interpretation"], typename="BranchConfiguration" )
[docs] class SummarysliceReader: """ A reader for summaryslices which are loaded as chunks given by step_size. To be used as an iterator (`for chunks in SummarysliceReader(...): ...`) """
[docs] TREE_ADDR = "KM3NET_SUMMARYSLICE/KM3NET_SUMMARYSLICE"
_subbranches = [ BranchConfiguration( "KM3NETDAQ::JDAQSummarysliceHeader", uproot.interpretation.numerical.AsDtype( [ (" cnt", "u4"), (" vers", "u2"), (" cnt2", "u4"), (" vers2", "u2"), (" cnt3", "u4"), (" vers3", "u2"), ("detector_id", ">i4"), ("run", ">i4"), ("frame_index", ">i4"), (" cnt4", "u4"), (" vers4", "u2"), ("UTC_seconds", ">u4"), ("UTC_16nanosecondcycles", ">u4"), ] ), ), BranchConfiguration( "vector<KM3NETDAQ::JDAQSummaryFrame>", uproot.interpretation.jagged.AsJagged( uproot.interpretation.numerical.AsDtype( [ ("dom_id", ">i4"), ("dq_status", ">u4"), ("hrv", ">u4"), ("fifo", ">u4"), ("status3", ">u4"), ("status4", ">u4"), ] + [(f"ch{c}", "u1") for c in range(31)] ), header_bytes=10, ), ), ] def __init__(self, fobj, step_size=1000): if isinstance(fobj, str): self._fobj = uproot.open(fobj) else: self._fobj = fobj self._step_size = step_size self._branch = self._fobj[self.TREE_ADDR] self.ChunksConstructor = namedtuple( field_names=["headers", "slices"], typename="SummarysliceChunk" ) def _chunks_generator(self): for chunk in self._branch.iterate( dict(self._subbranches), step_size=self._step_size ): yield self.ChunksConstructor( *[getattr(chunk, bc.branch_address) for bc in self._subbranches] ) def __getitem__(self, idx): if idx >= len(self) or idx < -len(self): raise IndexError("Chunk index out of range") s = self._step_size if idx < 0: idx = len(self) + idx chunk = self._branch.arrays( dict(self._subbranches), entry_start=idx * s, entry_stop=(idx + 1) * s ) return self.ChunksConstructor( *[getattr(chunk, bc.branch_address) for bc in self._subbranches] ) def __iter__(self): self._chunks = self._chunks_generator() return self def __next__(self): return next(self._chunks) def __len__(self): return int(np.ceil(self._branch.num_entries / self._step_size)) def __repr__(self): step_size = self._step_size n_items = self._branch.num_entries cls_name = self.__class__.__name__ n_chunks = len(self) return ( f"<{cls_name} {n_items} items, step_size={step_size} " f"({n_chunks} chunk{'' if n_chunks == 1 else 's'})>" )
@nb.vectorize( [nb.int32(nb.int8), nb.int32(nb.int16), nb.int32(nb.int32), nb.int32(nb.int64)] )
[docs] def get_rate(value): # pragma: no cover """Return the rate in Hz from the short int value""" if value == 0: return 0 else: return MINIMAL_RATE_HZ * np.exp(value * RATE_FACTOR)
@nb.guvectorize( "void(i8, b1[:], b1[:])", "(), (n) -> (n)", target="parallel", nopython=True )
[docs] def unpack_bits(value, bits_template, out): # pragma: no cover """Return a boolean array for a value's bit representation. This function also accepts arrays as input, the output shape will be NxM where N is the number of input values and M the length of the ``bits_template`` array, which is just a dummy array, due to the weird signature system of numba. Parameters ---------- value: int or np.array(int) with shape (N,) The binary value of containing the bit information bits_template: np.array() with shape (M,) The template for the output array, the only important is its shape Returns ------- np.array(bool) either with shape (M,) or (N, M) """ for i in range(bits_template.shape[0]): out[30 - i] = value & (1 << i) > 0
[docs] def get_channel_flags(value): """Returns the hrv/fifo flags for the PMT channels (hrv/fifo) Parameters ---------- value : int32 The integer value to be parsed. """ channel_bits = np.bitwise_and(value, 0x7FFFFFFF) flags = unpack_bits(channel_bits, CHANNEL_BITS_TEMPLATE) return np.flip(flags, axis=-1)
[docs] def get_number_udp_packets(value): """Returns the number of received UDP packets (dq_status) Parameters ---------- value : int32 The integer value to be parsed. """ return np.bitwise_and(value, 0x7FFF)
[docs] def get_udp_max_sequence_number(value): """Returns the maximum sequence number of the received UDP packets (dq_status) Parameters ---------- value : int32 The integer value to be parsed. """ return np.right_shift(value, 16)
[docs] def has_udp_trailer(value): """Returns the UDP Trailer flag (fifo) Parameters ---------- value : int32 The integer value to be parsed. """ return np.any(np.bitwise_and(value, np.left_shift(1, 31)))
[docs] class OnlineReader: """Reader for online ROOT files""" def __init__(self, filename): self._fobj = uproot3.open(filename) self._filename = filename self._events = None self._timeslices = None self._summaryslices = None self._uuid = binascii.hexlify(self._fobj._context.uuid).decode("ascii") @property
[docs] def uuid(self): return self._uuid
[docs] def close(self): self._fobj.close()
def __enter__(self): return self def __exit__(self, *args): self.close() @property
[docs] def events(self): if self._events is None: tree = self._fobj["KM3NET_EVENT"] headers = tree["KM3NETDAQ::JDAQEventHeader"].array( uproot3.interpret(tree["KM3NETDAQ::JDAQEventHeader"], cntvers=True) ) snapshot_hits = tree["snapshotHits"].array( uproot3.asjagged( uproot3.astable( uproot3.asdtype( [ ("dom_id", ">i4"), ("channel_id", "u1"), ("time", "<u4"), ("tot", "u1"), ] ) ), skipbytes=10, ) ) triggered_hits = tree["triggeredHits"].array( uproot3.asjagged( uproot3.astable( uproot3.asdtype( [ ("dom_id", ">i4"), ("channel_id", "u1"), ("time", "<u4"), ("tot", "u1"), (" cnt", "u4"), (" vers", "u2"), ("trigger_mask", ">u8"), ] ) ), skipbytes=10, ) ) self._events = OnlineEvents(headers, snapshot_hits, triggered_hits) return self._events
@property
[docs] def timeslices(self): if self._timeslices is None: self._timeslices = Timeslices(self._fobj) return self._timeslices
@property
[docs] def summaryslices(self): if self._summaryslices is None: self._summaryslices = SummarysliceReader( uproot.open(self._filename) ) # TODO: remove when using uproot4 return self._summaryslices
[docs] class Timeslices: """A simple wrapper for timeslices""" def __init__(self, fobj): self._fobj = fobj self._timeslices = {} self._read_streams() def _read_streams(self): """Read the L0, L1, L2 and SN streams if available""" streams = set( s.split(b"KM3NET_TIMESLICE_")[1].split(b";")[0] for s in self._fobj.keys() if b"KM3NET_TIMESLICE_" in s ) for stream in streams: tree = self._fobj[b"KM3NET_TIMESLICE_" + stream][ b"KM3NETDAQ::JDAQTimeslice" ] headers = tree[b"KM3NETDAQ::JDAQTimesliceHeader"][b"KM3NETDAQ::JDAQHeader"][ b"KM3NETDAQ::JDAQChronometer" ] if len(headers) == 0: continue superframes = tree[b"vector<KM3NETDAQ::JDAQSuperFrame>"] hits_dtype = np.dtype([("pmt", "u1"), ("tdc", "<u4"), ("tot", "u1")]) hits_buffer = superframes[ b"vector<KM3NETDAQ::JDAQSuperFrame>.buffer" ].lazyarray( uproot3.asjagged( uproot3.astable(uproot3.asdtype(hits_dtype)), skipbytes=6 ), basketcache=uproot3.cache.ThreadSafeArrayCache( TIMESLICE_FRAME_BASKET_CACHE_SIZE ), ) self._timeslices[stream.decode("ascii")] = ( headers, superframes, hits_buffer, ) setattr( self, stream.decode("ascii"), TimesliceStream(headers, superframes, hits_buffer), )
[docs] def stream(self, stream, idx): ts = self._timeslices[stream] return Timeslice(*ts, idx, stream)
def __str__(self): return "Available timeslice streams: {}".format( ", ".join(s for s in self._timeslices.keys()) ) def __repr__(self): return str(self)
[docs] class TimesliceStream: def __init__(self, headers, superframes, hits_buffer): # self.headers = headers.lazyarray( # uproot3.asjagged(uproot3.astable( # uproot3.asdtype( # np.dtype([('a', 'i4'), ('b', 'i4'), ('c', 'i4'), # ('d', 'i4'), ('e', 'i4')]))), # skipbytes=6), # basketcache=uproot3.cache.ThreadSafeArrayCache( # TIMESLICE_FRAME_BASKET_CACHE_SIZE)) self.headers = headers self.superframes = superframes self._hits_buffer = hits_buffer
# def frames(self): # n_hits = self._superframe[ # b'vector<KM3NETDAQ::JDAQSuperFrame>.numberOfHits'].lazyarray( # basketcache=BASKET_CACHE)[self._idx] # module_ids = self._superframe[ # b'vector<KM3NETDAQ::JDAQSuperFrame>.id'].lazyarray(basketcache=BASKET_CACHE)[self._idx] # idx = 0 # for module_id, n_hits in zip(module_ids, n_hits): # self._frames[module_id] = hits_buffer[idx:idx + n_hits] # idx += n_hits
[docs] class Timeslice: """A wrapper for a timeslice""" def __init__(self, header, superframe, hits_buffer, idx, stream): self.header = header self._frames = {} self._superframe = superframe self._hits_buffer = hits_buffer self._idx = idx self._stream = stream self._n_frames = None @property
[docs] def frames(self): if not self._frames: self._read_frames() return self._frames
def _read_frames(self): """Populate a dictionary of frames with the module ID as key""" hits_buffer = self._hits_buffer[self._idx] n_hits = self._superframe[ b"vector<KM3NETDAQ::JDAQSuperFrame>.numberOfHits" ].lazyarray(basketcache=BASKET_CACHE)[self._idx] try: module_ids = self._superframe[ b"vector<KM3NETDAQ::JDAQSuperFrame>.id" ].lazyarray(basketcache=BASKET_CACHE)[self._idx] except KeyError: module_ids = ( self._superframe[ b"vector<KM3NETDAQ::JDAQSuperFrame>.KM3NETDAQ::JDAQModuleIdentifier" ] .lazyarray( uproot3.asjagged( uproot3.astable(uproot3.asdtype([("dom_id", ">i4")])) ), basketcache=BASKET_CACHE, )[self._idx] .dom_id ) idx = 0 for module_id, n_hits in zip(module_ids, n_hits): self._frames[module_id] = hits_buffer[idx : idx + n_hits] idx += n_hits def __len__(self): if self._n_frames is None: self._n_frames = len( self._superframe[b"vector<KM3NETDAQ::JDAQSuperFrame>.id"].lazyarray( basketcache=BASKET_CACHE )[self._idx] ) return self._n_frames def __str__(self): return "{} timeslice with {} frames.".format(self._stream, len(self)) def __repr__(self): return "<{}: {} entries>".format(self.__class__.__name__, len(self.header))
[docs] class OnlineEvents: """A simple wrapper for online events""" def __init__(self, headers, snapshot_hits, triggered_hits): self.headers = headers self.snapshot_hits = snapshot_hits self.triggered_hits = triggered_hits def __getitem__(self, item): return OnlineEvent( self.headers[item], self.snapshot_hits[item], self.triggered_hits[item] ) def __len__(self): return len(self.headers) def __str__(self): return "Number of events: {}".format(len(self.headers)) def __repr__(self): return str(self)
[docs] class OnlineEvent: """A wrapper for a online event""" def __init__(self, header, snapshot_hits, triggered_hits): self.header = header self.snapshot_hits = snapshot_hits self.triggered_hits = triggered_hits def __str__(self): return "Online event with {} snapshot hits and {} triggered hits".format( len(self.snapshot_hits), len(self.triggered_hits) ) def __repr__(self): return str(self)