Coverage for src/km3pipe/io/offline.py: 88%
34 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 03:14 +0000
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 03:14 +0000
1#!/usr/bin/env python3
3from thepipe import Module, Blob
4from ..dataclasses import Table
5from .hdf5 import HDF5Header
6from thepipe import Provenance
8import km3io
9import numpy as np
10from collections import defaultdict
13class OfflinePump(Module):
14 def configure(self):
15 self._filename = self.get("filename")
16 step_size = self.get("step_size", default=2000)
18 self._reader = km3io.OfflineReader(self._filename, step_size=step_size)
19 self.header = self._reader.header
20 self.blobs = self._blob_generator()
22 Provenance().record_input(
23 self._filename, uuid=str(self._reader.uuid), comment="OfflinePump input"
24 )
26 self.expose(self.header, "offline_header")
28 def process(self, blob=None):
29 return next(self.blobs)
31 def finish(self):
32 self._reader.close()
34 def __iter__(self):
35 return self
37 def __next__(self):
38 return next(self.blobs)
40 def __getitem__(self, item):
41 if not isinstance(item, int):
42 raise TypeError("Only integer indices are supported.")
43 return Blob({"event": self._reader[item], "header": self.header})
45 def get_number_of_blobs(self):
46 return len(self._reader)
48 def _blob_generator(self):
49 for event in self._reader:
50 blob = Blob({"event": event, "header": self.header})
51 yield blob