Coverage for src/km3pipe/io/offline.py: 88%

34 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-08 03:14 +0000

1#!/usr/bin/env python3 

2 

3from thepipe import Module, Blob 

4from ..dataclasses import Table 

5from .hdf5 import HDF5Header 

6from thepipe import Provenance 

7 

8import km3io 

9import numpy as np 

10from collections import defaultdict 

11 

12 

13class OfflinePump(Module): 

14 def configure(self): 

15 self._filename = self.get("filename") 

16 step_size = self.get("step_size", default=2000) 

17 

18 self._reader = km3io.OfflineReader(self._filename, step_size=step_size) 

19 self.header = self._reader.header 

20 self.blobs = self._blob_generator() 

21 

22 Provenance().record_input( 

23 self._filename, uuid=str(self._reader.uuid), comment="OfflinePump input" 

24 ) 

25 

26 self.expose(self.header, "offline_header") 

27 

28 def process(self, blob=None): 

29 return next(self.blobs) 

30 

31 def finish(self): 

32 self._reader.close() 

33 

34 def __iter__(self): 

35 return self 

36 

37 def __next__(self): 

38 return next(self.blobs) 

39 

40 def __getitem__(self, item): 

41 if not isinstance(item, int): 

42 raise TypeError("Only integer indices are supported.") 

43 return Blob({"event": self._reader[item], "header": self.header}) 

44 

45 def get_number_of_blobs(self): 

46 return len(self._reader) 

47 

48 def _blob_generator(self): 

49 for event in self._reader: 

50 blob = Blob({"event": event, "header": self.header}) 

51 yield blob