Coverage for src/km3pipe/io/clb.py: 94%
66 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-19 03:14 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-19 03:14 +0000
1# Filename: clb.py
2"""
3Pumps for the CLB data formats.
5"""
7import struct
8from struct import unpack
10import numpy as np
12from thepipe import Blob, Module
13from km3pipe.dataclasses import Table
14from km3pipe.sys import ignored
16__author__ = "Tamas Gal"
17__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration."
18__credits__ = []
19__license__ = "MIT"
20__maintainer__ = "Tamas Gal"
21__email__ = "tgal@km3net.de"
22__status__ = "Development"
class CLBPump(Module):
    """A pump for binary CLB files.

    On configuration the whole file is scanned once to index the start
    position of every packet; blobs are then extracted sequentially (via
    ``process`` or iteration) or by index (``pump[i]``).

    Each packet is expected to consist of a 4-byte little-endian length
    prefix, a 28-byte big-endian header and a payload of PMT hit records
    (see ``pmt_dt``).

    Parameters
    ----------
    file: str
        filename or file-like object.

    """

    # Layout of a single PMT hit record inside the packet payload.
    pmt_dt = np.dtype([("channel_id", np.uint8), ("time", ">i"), ("tot", np.uint8)])

    def configure(self):
        """Open the input, index all packets and set up the blob generator."""
        self.file = self.require("file")
        if isinstance(self.file, str):
            self.file = open(self.file, "rb")
        self._packet_positions = []

        self._determine_packet_positions()

        self.blobs = self.blob_generator()

    def _determine_packet_positions(self):
        """Record the file pointer position of each frame"""
        self.cprint("Scanning UDP packets...")
        self.file.seek(0, 0)
        # A short read at the end of the file makes ``unpack`` raise
        # ``struct.error``, which terminates the scan.
        with ignored(struct.error):
            while True:
                pointer_position = self.file.tell()
                length = unpack("<i", self.file.read(4))[0]
                if length < 0:
                    # A negative length would seek backwards and could loop
                    # forever on corrupt data; stop indexing here instead.
                    break
                self._packet_positions.append(pointer_position)
                self.file.seek(length, 1)
        self.file.seek(0, 0)

    def __len__(self):
        """Return the number of indexed packets."""
        return len(self._packet_positions)

    def seek_to_packet(self, index):
        """Move file pointer to the packet with given index."""
        pointer_position = self._packet_positions[index]
        self.file.seek(pointer_position, 0)

    def blob_generator(self):
        """Generate next blob in file"""
        for _ in range(len(self)):
            try:
                blob = self.extract_blob()
            except StopIteration:
                # A StopIteration escaping a generator body is converted to
                # RuntimeError (PEP 479), so end the generator explicitly.
                return
            yield blob

    def extract_blob(self):
        """Read one packet at the current file position and return its blob.

        Returns
        -------
        Blob
            With the keys ``"PacketInfo"`` (header fields) and ``"Hits"``
            (PMT hit records).

        Raises
        ------
        StopIteration
            If the 4-byte length prefix cannot be read (end of file).
        struct.error
            If the packet header is truncated.
        """
        try:
            length = unpack("<i", self.file.read(4))[0]
        except struct.error:
            raise StopIteration

        # Header: 4 chars followed by six 32-bit big-endian integers.
        header_format = ">4siiIIiI"
        header_size = struct.calcsize(header_format)
        (
            data_type,
            run,
            udp_sequence,
            timestamp,
            ns_ticks,
            dom_id,
            dom_status,
        ) = unpack(header_format, self.file.read(header_size))

        blob = Blob()

        blob["PacketInfo"] = Table(
            {
                "data_type": data_type.decode(),
                "run": run,
                "udp_sequence": udp_sequence,
                "timestamp": timestamp,
                "ns_ticks": ns_ticks,
                "dom_id": dom_id,
                "dom_status": dom_status,
            },
            h5loc="/packet_info",
            split_h5=True,
            name="UDP Packet Info",
        )

        # Consume the whole remaining payload so that the file pointer ends up
        # exactly at the next packet, even if the payload is not a multiple of
        # the hit record size. Reading through ``file.read`` (instead of
        # ``np.fromfile``) also works for in-memory file-like objects.
        remaining_length = max(length - header_size, 0)
        raw = self.file.read(remaining_length)
        usable = len(raw) - len(raw) % self.pmt_dt.itemsize
        # ``frombuffer`` yields a read-only view; copy to get an owned array.
        pmt_data = np.frombuffer(raw[:usable], dtype=self.pmt_dt).copy()

        blob["Hits"] = Table(pmt_data, h5loc="/hits", split_h5=True)
        return blob

    def __getitem__(self, index):
        """Return blob at given index."""
        self.seek_to_packet(index)
        return self.extract_blob()

    def process(self, blob):
        """Return the next blob from the file; the incoming blob is ignored.

        Raises ``StopIteration`` once all indexed packets have been emitted.
        """
        return next(self.blobs)

    def __iter__(self):
        """Rewind the file and restart the blob generator."""
        self.file.seek(0, 0)
        self.blobs = self.blob_generator()
        return self

    def __next__(self):
        """Return the next blob of the current iteration."""
        return next(self.blobs)

    def finish(self):
        """Clean everything up"""
        self.file.close()