Coverage for src/km3pipe/io/clb.py: 94%

66 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-19 03:14 +0000

1# Filename: clb.py 

2""" 

3Pumps for the CLB data formats. 

4 

5""" 

6 

7import struct 

8from struct import unpack 

9 

10import numpy as np 

11 

12from thepipe import Blob, Module 

13from km3pipe.dataclasses import Table 

14from km3pipe.sys import ignored 

15 

16__author__ = "Tamas Gal" 

17__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration." 

18__credits__ = [] 

19__license__ = "MIT" 

20__maintainer__ = "Tamas Gal" 

21__email__ = "tgal@km3net.de" 

22__status__ = "Development" 

23 

24 

class CLBPump(Module):
    """A pump for binary CLB files.

    The file is read as a sequence of packets. Each packet starts with a
    little-endian 4-byte length, followed by a big-endian header of seven
    4-byte fields and a run of fixed-size hit records (see ``pmt_dt``).

    Parameters
    ----------
    file: str
        filename or file-like object.  A file-like object has to be
        opened in binary mode and support ``seek``/``tell``.

    """

    pmt_dt = np.dtype([("channel_id", np.uint8), ("time", ">i"), ("tot", np.uint8)])

    # Big-endian packet header, in order: data type tag, run, UDP sequence,
    # timestamp, ns ticks, DOM ID, DOM status (7 * 4 bytes).
    _header = struct.Struct(">4siiIIiI")

    def configure(self):
        """Open the input, index all packets and set up the blob iterator."""
        self.file = self.require("file")
        if isinstance(self.file, str):
            self.file = open(self.file, "rb")
        self._packet_positions = []

        self._determine_packet_positions()

        self.blobs = self.blob_generator()

    def _determine_packet_positions(self):
        """Record the file pointer position of each frame"""
        self.cprint("Scanning UDP packets...")
        self.file.seek(0, 0)
        # A short read at the end of the file makes ``unpack`` raise
        # ``struct.error``, which is what terminates the scan.
        with ignored(struct.error):
            while True:
                pointer_position = self.file.tell()
                length = unpack("<i", self.file.read(4))[0]
                if length < 0:
                    # A negative length would seek backwards and could
                    # loop forever on a corrupt file; stop scanning.
                    break
                self._packet_positions.append(pointer_position)
                self.file.seek(length, 1)
        self.file.seek(0, 0)

    def __len__(self):
        """Return the number of packets found while scanning the file."""
        return len(self._packet_positions)

    def seek_to_packet(self, index):
        """Move file pointer to the packet with given index."""
        pointer_position = self._packet_positions[index]
        self.file.seek(pointer_position, 0)

    def blob_generator(self):
        """Generate next blob in file"""
        for _ in range(len(self)):
            try:
                blob = self.extract_blob()
            except StopIteration:
                # ``extract_blob`` signals end-of-data with StopIteration.
                # Letting it escape a generator body would be converted
                # into RuntimeError (PEP 479), so end the generator here.
                return
            yield blob

    def extract_blob(self):
        """Read the packet at the current file position and return it as a blob.

        Returns
        -------
        Blob
            with the entries ``"PacketInfo"`` (header fields) and
            ``"Hits"`` (the hit records of the packet).

        Raises
        ------
        StopIteration
            if the length prefix of the packet cannot be read.
        struct.error
            if the packet header is truncated.

        """
        packet_start = self.file.tell()
        try:
            length = unpack("<i", self.file.read(4))[0]
        except struct.error:
            raise StopIteration

        blob = Blob()

        (
            data_type,
            run,
            udp_sequence,
            timestamp,
            ns_ticks,
            dom_id,
            dom_status,
        ) = self._header.unpack(self.file.read(self._header.size))

        blob["PacketInfo"] = Table(
            {
                "data_type": data_type.decode(),
                "run": run,
                "udp_sequence": udp_sequence,
                "timestamp": timestamp,
                "ns_ticks": ns_ticks,
                "dom_id": dom_id,
                "dom_status": dom_status,
            },
            h5loc="/packet_info",
            split_h5=True,
            name="UDP Packet Info",
        )

        itemsize = self.pmt_dt.itemsize
        # Clamp at zero: a declared length shorter than the header must not
        # turn into a negative count (which would mean "read everything").
        remaining_length = max(length - self._header.size, 0)
        count = remaining_length // itemsize

        # Plain ``read`` + ``frombuffer`` works for any binary file-like
        # object, not only for real OS-level files.  Only whole records
        # are converted in case the file ends inside the payload.
        raw = self.file.read(count * itemsize)
        pmt_data = np.frombuffer(
            raw, dtype=self.pmt_dt, count=len(raw) // itemsize
        ).copy()

        # Leave the pointer exactly where the scanner expects the next
        # packet, even if the payload is not a whole number of records.
        self.file.seek(packet_start + 4 + max(length, 0), 0)

        blob["Hits"] = Table(pmt_data, h5loc="/hits", split_h5=True)
        return blob

    def __getitem__(self, index):
        """Return blob at given index.

        This moves the file pointer, so a subsequent sequential read
        continues with the packet following ``index``.
        """
        self.seek_to_packet(index)
        return self.extract_blob()

    def process(self, blob):
        """Return the next blob; raises StopIteration when none are left."""
        return next(self.blobs)

    def __iter__(self):
        """Rewind the file and restart the iteration from the first packet."""
        self.file.seek(0, 0)
        self.blobs = self.blob_generator()
        return self

    def __next__(self):
        """Return the next blob of the current iteration."""
        return next(self.blobs)

    def finish(self):
        """Clean everything up"""
        self.file.close()