Coverage for src/km3pipe/io/tests/test_hdf5.py: 98%
900 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 03:14 +0000
1#!/usr/bin/env python
2"""Tests for HDF5 stuff"""
3from collections import OrderedDict, defaultdict
4import tempfile
5from os.path import join, dirname
7import numpy as np
8import tables as tb
9import km3io
11from km3pipe import Blob, Module, Pipeline, version
12from km3pipe.dataclasses import Table, NDArray
13from km3pipe.io.hdf5 import (
14 HDF5Pump,
15 HDF5Sink,
16 HDF5Header,
17 header2table,
18 FORMAT_VERSION,
19)
20from km3pipe.testing import TestCase, data_path
class Skipper(Module):
    """Drop the blobs whose 0-based iteration index is listed in ``indices``.

    Used by the sink tests to create "holes" in the blob stream.
    """

    def configure(self):
        self.skip_indices = self.require("indices")
        self.index = 0

    def process(self, blob):
        current = self.index
        self.index += 1
        if current in self.skip_indices:
            print("skipping")
            return
        print(blob)
        return blob
class TestH5Pump(TestCase):
    """Tests for reading HDF5 files with ``HDF5Pump``."""

    def setUp(self):
        # Sample MC production file shipped with the test data.
        self.fname = data_path(
            "hdf5/mcv5.40.mupage_10G.sirene.jterbr00006060.962.root.h5"
        )

    def test_init_sets_filename_if_no_keyword_arg_is_passed(self):
        p = HDF5Pump(filename=self.fname)
        self.assertEqual(self.fname, p.filename)
        p.finish()

    def test_standalone(self):
        # The pump works as a plain iterator outside of a pipeline.
        pump = HDF5Pump(filename=self.fname)
        next(pump)
        pump.finish()

    def test_pipe(self):
        """Check per-event lengths of Hits/McHits/McTracks through a pipeline."""

        class Observer(Module):
            def configure(self):
                self.dump = defaultdict(list)

            def process(self, blob):
                for key, data in blob.items():
                    if key == "Header":
                        self.dump["headers"].append(data)
                    else:
                        # Record only the number of entries per event.
                        self.dump[key].append(len(data))
                return blob

            def finish(self):
                return self.dump

        p = Pipeline()
        p.attach(HDF5Pump, filename=self.fname)
        p.attach(Observer)
        results = p.drain()["Observer"]
        # Reference values correspond to the contents of the test file.
        self.assertListEqual(
            [147, 110, 70, 62, 59, 199, 130, 92, 296, 128], results["Hits"]
        )
        self.assertListEqual(
            [315, 164, 100, 111, 123, 527, 359, 117, 984, 263], results["McHits"]
        )
        self.assertListEqual([1, 1, 1, 1, 1, 3, 2, 1, 2, 1], results["McTracks"])

    def test_event_info_is_not_empty(self):
        self.fname = data_path("hdf5/test_event_info.h5")

        class Printer(Module):
            def process(self, blob):
                assert blob["EventInfo"].size != 0
                return blob

        p = Pipeline()
        p.attach(HDF5Pump, filename=self.fname)
        p.attach(Printer)
        p.drain()

    def test_event_info_has_correct_group_id(self):
        self.fname = data_path("hdf5/test_event_info.h5")

        class Printer(Module):
            def configure(self):
                self.index = 0

            def process(self, blob):
                # group_id is expected to follow the event order, starting at 0.
                assert blob["EventInfo"][0].group_id == self.index
                self.index += 1
                return blob

        p = Pipeline()
        p.attach(HDF5Pump, filename=self.fname)
        p.attach(Printer)
        p.drain()

    def test_get_blob(self):
        # Random access to single events via ``pump[index]``.
        fname = data_path("hdf5/test_event_info.h5")
        pump = HDF5Pump(filename=fname)
        assert 44 == len(pump[0]["McTracks"])
        assert 3 == len(pump[1]["McTracks"])
        assert 179 == len(pump[2]["McTracks"])
        assert 55 == len(pump[3]["McTracks"])
        pump.finish()
class TestH5Sink(TestCase):
    """Tests for writing blobs to HDF5 files with ``HDF5Sink``."""

    def setUp(self):
        self.fname = data_path(
            "hdf5/mcv5.40.mupage_10G.sirene.jterbr00006060.962.root.h5"
        )
        # In-memory HDF5 file (H5FD_CORE driver, no backing store) used
        # as the sink target so nothing is written to disk.
        self.fobj = tempfile.NamedTemporaryFile(delete=True)
        self.out = tb.open_file(
            self.fobj.name, "w", driver="H5FD_CORE", driver_core_backing_store=0
        )

    def tearDown(self):
        self.out.close()
        self.fobj.close()

    def test_pipe(self):
        """Pump a real file and write everything into the in-memory sink."""
        p = Pipeline()
        p.attach(HDF5Pump, filename=self.fname)
        p.attach(HDF5Sink, h5file=self.out)
        p.drain()

    def test_h5info(self):
        """The sink records km3pipe/pytables versions and the format version."""
        fobj = tempfile.NamedTemporaryFile(delete=True)
        fname = fobj.name

        class DummyPump(Module):
            def process(self, blob):
                return Blob()

        pipe = Pipeline()
        pipe.attach(DummyPump)
        pipe.attach(HDF5Sink, filename=fname)
        pipe.drain(5)

        with tb.open_file(fname, "r") as h5file:
            assert version == h5file.root._v_attrs.km3pipe.decode()
            assert tb.__version__ == h5file.root._v_attrs.pytables.decode()
            assert FORMAT_VERSION == h5file.root._v_attrs.format_version

        fobj.close()

    def test_filtered_writing(self):
        """``keys=[...]`` restricts which blob keys each sink writes."""
        fobjs = []
        for i in range(3):
            fobj = tempfile.NamedTemporaryFile(delete=True)
            fobjs.append(fobj)

        fobj_all = tempfile.NamedTemporaryFile(delete=True)

        class DummyPump(Module):
            def configure(self):
                self.i = 0

            def process(self, blob):
                blob["A"] = Table({"a": self.i}, name="A", h5loc="tab_a")
                blob["B"] = Table({"b": self.i}, name="B", h5loc="tab_b")
                blob["C"] = Table({"c": self.i}, name="C", h5loc="tab_c")
                self.i += 1
                return blob

        keys = "ABC"

        pipe = Pipeline()
        pipe.attach(DummyPump)
        # One sink per key, plus one unfiltered sink that takes everything.
        for fobj, key in zip(fobjs, keys):
            pipe.attach(HDF5Sink, filename=fobj.name, keys=[key])
        pipe.attach(HDF5Sink, filename=fobj_all.name)
        pipe.drain(5)

        # Each filtered file contains only its own table ...
        for fobj, key in zip(fobjs, keys):
            with tb.File(fobj.name, "r") as f:
                assert "/tab_" + key.lower() in f
                for _key in set(keys) - set(key):
                    assert "/tab_" + _key.lower() not in f

        # ... while the unfiltered file contains all of them.
        for key in keys:
            with tb.File(fobj_all.name, "r") as f:
                assert "/tab_" + key.lower() in f

        for fobj in fobjs:
            fobj.close()
        fobj_all.close()

    def test_filtered_writing_of_multiple_keys(self):
        """A single sink can keep several keys at once."""
        fobj = tempfile.NamedTemporaryFile(delete=True)

        class DummyPump(Module):
            def configure(self):
                self.i = 0

            def process(self, blob):
                blob["A"] = Table({"a": self.i}, name="A", h5loc="tab_a")
                blob["B"] = Table({"b": self.i}, name="B", h5loc="tab_b")
                blob["C"] = Table({"c": self.i}, name="C", h5loc="tab_c")
                self.i += 1
                return blob

        keys = ["A", "B"]

        pipe = Pipeline()
        pipe.attach(DummyPump)
        pipe.attach(HDF5Sink, filename=fobj.name, keys=keys)
        pipe.drain(5)

        with tb.File(fobj.name, "r") as f:
            assert "/tab_a" in f
            assert "/tab_b" in f
            assert "/tab_c" not in f

        fobj.close()

    def test_write_table_service(self):
        """Tables can be written through the ``write_table`` service, too."""
        fobj = tempfile.NamedTemporaryFile(delete=True)

        class Foo(Module):
            def prepare(self):
                self.services["write_table"](Table({"a": 1}, name="A", h5loc="tab_a"))

        pipe = Pipeline()
        pipe.attach(Foo)
        pipe.attach(HDF5Sink, filename=fobj.name)
        pipe.drain(5)

        with tb.File(fobj.name, "r") as f:
            assert "/tab_a" in f

        fobj.close()
class TestNDArrayHandling(TestCase):
    """Tests for writing and reading ``NDArray`` blob entries.

    Note: the original file defined ``test_writing_of_n_dim_arrays`` twice
    with identical bodies; the second definition shadowed the first, so the
    test silently ran only once.  The duplicate has been removed.
    """

    def test_writing_of_n_dim_arrays_with_defaults(self):
        """An NDArray without h5loc/title ends up in /misc with an index table."""
        fobj = tempfile.NamedTemporaryFile(delete=True)
        fname = fobj.name

        arr = np.array([[[1, 2], [3, 4]], [[5, 6], [7, 8]]])

        class DummyPump(Module):
            def process(self, blob):
                blob["foo"] = NDArray(arr)
                return blob

        pipe = Pipeline()
        pipe.attach(DummyPump)
        pipe.attach(HDF5Sink, filename=fname)
        pipe.drain(3)

        with tb.File(fname) as f:
            foo = f.get_node("/misc")
            assert 3 == foo[0, 1, 0]
            assert 4 == foo[0, 1, 1]
            assert "Unnamed NDArray" == foo.title
            # Per-event offsets/lengths: 3 events with 2 entries each.
            indices = f.get_node("/misc_indices")
            self.assertTupleEqual((0, 2, 4), tuple(indices.cols.index[:]))
            self.assertTupleEqual((2, 2, 2), tuple(indices.cols.n_items[:]))

        fobj.close()

    def test_writing_of_n_dim_arrays(self):
        """An NDArray with explicit ``h5loc`` and ``title`` is stored there."""
        fobj = tempfile.NamedTemporaryFile(delete=True)
        fname = fobj.name

        arr = np.array([[[1, 2], [3, 4]], [[5, 6], [7, 8]]])

        class DummyPump(Module):
            def configure(self):
                self.index = 0

            def process(self, blob):
                blob["foo"] = NDArray(arr + self.index * 10, h5loc="/foo", title="Yep")
                self.index += 1
                return blob

        pipe = Pipeline()
        pipe.attach(DummyPump)
        pipe.attach(HDF5Sink, filename=fname)
        pipe.drain(3)

        with tb.File(fname) as f:
            foo = f.get_node("/foo")
            assert 3 == foo[0, 1, 0]
            assert 4 == foo[0, 1, 1]
            assert "Yep" == foo.title

        fobj.close()

    def test_writing_of_n_dim_arrays_in_nested_group(self):
        """Intermediate groups in a nested ``h5loc`` are created on demand."""
        fobj = tempfile.NamedTemporaryFile(delete=True)
        fname = fobj.name

        arr = np.array([[[1, 2], [3, 4]], [[5, 6], [7, 8]]])

        class DummyPump(Module):
            def configure(self):
                self.index = 0

            def process(self, blob):
                blob["foo"] = NDArray(arr + self.index * 10, h5loc="/foo/bar/baz")
                self.index += 1
                return blob

        pipe = Pipeline()
        pipe.attach(DummyPump)
        pipe.attach(HDF5Sink, filename=fname)
        pipe.drain(3)

        with tb.File(fname) as f:
            foo = f.get_node("/foo/bar/baz")
            print(foo)
            assert 3 == foo[0, 1, 0]
            assert 4 == foo[0, 1, 1]

        fobj.close()

    def test_reading_of_n_dim_arrays(self):
        """Round-trip: NDArrays written by the sink are restored by the pump."""
        fobj = tempfile.NamedTemporaryFile(delete=True)
        fname = fobj.name

        arr = np.array([[[1, 2], [3, 4]], [[5, 6], [7, 8]]])

        class DummyPump(Module):
            def configure(self):
                self.index = 0

            def process(self, blob):
                blob["Foo"] = NDArray(arr + self.index * 10, h5loc="/foo", title="Yep")
                self.index += 1
                return blob

        pipe = Pipeline()
        pipe.attach(DummyPump)
        pipe.attach(HDF5Sink, filename=fname)
        pipe.drain(3)

        class Observer(Module):
            def configure(self):
                self.index = 0

            def process(self, blob):
                assert "Foo" in blob
                foo = blob["Foo"]
                print(self.index)
                # Each event's array is the base array shifted by index * 10.
                assert self.index * 10 + 1 == foo[0, 0, 0]
                assert self.index * 10 + 8 == foo[1, 1, 1]
                assert self.index * 10 + 3 == foo[0, 1, 0]
                assert self.index * 10 + 6 == foo[1, 0, 1]
                self.index += 1
                return blob

        pipe = Pipeline()
        pipe.attach(HDF5Pump, filename=fname)
        pipe.attach(Observer)
        pipe.drain()

        fobj.close()
class TestH5SinkSkippedBlobs(TestCase):
    """The sink must keep group IDs consistent when blobs are skipped."""

    def test_skipped_blob_with_tables(self):
        fobj = tempfile.NamedTemporaryFile(delete=True)
        fname = fobj.name

        class DummyPump(Module):
            def configure(self):
                self.index = 0

            def process(self, blob):
                # Event i contributes i+1 rows (a = 0..i) tagged with i.
                blob["Tab"] = Table(
                    {"a": np.arange(self.index + 1), "i": self.index}, h5loc="/tab"
                )
                self.index += 1
                return blob

        pipe = Pipeline()
        pipe.attach(DummyPump)
        pipe.attach(Skipper, indices=[2])  # drop the third blob
        pipe.attach(HDF5Sink, filename=fname)
        pipe.drain(5)

        with tb.File(fname) as f:
            a = f.get_node("/tab")[:]["a"]
            i = f.get_node("/tab")[:]["i"]
            group_id = f.get_node("/tab")[:]["group_id"]
            # Event 2 is missing; group_ids are re-numbered contiguously.
            assert np.allclose([0, 1, 1, 3, 3, 3, 3, 4, 4, 4, 4, 4], i)
            assert np.allclose([0, 0, 1, 0, 1, 2, 3, 0, 1, 2, 3, 4], a)
            assert np.allclose([0, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 3], group_id)

        fobj.close()

    def test_skipped_blob_with_ndarray(self):
        fobj = tempfile.NamedTemporaryFile(delete=True)
        fname = fobj.name

        class DummyPump(Module):
            def configure(self):
                self.index = 0

            def process(self, blob):
                blob["Arr"] = NDArray(np.arange(self.index + 1), h5loc="/arr")
                self.index += 1
                return blob

        pipe = Pipeline()
        pipe.attach(DummyPump)
        pipe.attach(Skipper, indices=[2])
        pipe.attach(HDF5Sink, filename=fname)
        pipe.drain(5)

        with tb.File(fname) as f:
            a = f.get_node("/arr")[:]
            index_table = f.get_node("/arr_indices")[:]
            # Data from events 0, 1, 3, 4; event 2 was skipped.
            assert np.allclose([0, 0, 1, 0, 1, 2, 3, 0, 1, 2, 3, 4], a)
            assert np.allclose([0, 1, 3, 7], index_table["index"])
            assert np.allclose([1, 2, 4, 5], index_table["n_items"])

        fobj.close()

    def test_skipped_blob_with_tables_and_ndarrays(self):
        fobj = tempfile.NamedTemporaryFile(delete=True)
        fname = fobj.name

        class DummyPump(Module):
            def configure(self):
                self.index = 0

            def process(self, blob):
                blob["Arr"] = NDArray(np.arange(self.index + 1), h5loc="/arr")
                blob["Tab"] = Table(
                    {"a": np.arange(self.index + 1), "i": self.index}, h5loc="/tab"
                )
                self.index += 1
                return blob

        pipe = Pipeline()
        pipe.attach(DummyPump)
        pipe.attach(Skipper, indices=[2])
        pipe.attach(HDF5Sink, filename=fname)
        pipe.drain(5)

        with tb.File(fname) as f:
            tab_a = f.get_node("/tab")[:]["a"]
            tab_i = f.get_node("/tab")[:]["i"]
            group_id = f.get_node("/tab")[:]["group_id"]

            arr = f.get_node("/arr")[:]
            index_table = f.get_node("/arr_indices")[:]

            # Read to ensure the node exists; the values are not checked here.
            group_info = f.get_node("/group_info")[:]

            assert np.allclose([0, 1, 1, 3, 3, 3, 3, 4, 4, 4, 4, 4], tab_i)
            assert np.allclose([0, 0, 1, 0, 1, 2, 3, 0, 1, 2, 3, 4], tab_a)
            assert np.allclose([0, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 3], group_id)

            assert np.allclose([0, 0, 1, 0, 1, 2, 3, 0, 1, 2, 3, 4], arr)
            assert np.allclose([0, 1, 3, 7], index_table["index"])
            assert np.allclose([1, 2, 4, 5], index_table["n_items"])

        fobj.close()

    def test_skipped_blob_with_tables_and_ndarrays_first_and_last(self):
        fobj = tempfile.NamedTemporaryFile(delete=True)
        fname = fobj.name

        class DummyPump(Module):
            def configure(self):
                self.index = 0

            def process(self, blob):
                blob["Arr"] = NDArray(np.arange(self.index + 1), h5loc="/arr")
                blob["Tab"] = Table(
                    {"a": np.arange(self.index + 1), "i": self.index}, h5loc="/tab"
                )
                self.index += 1
                return blob

        pipe = Pipeline()
        pipe.attach(DummyPump)
        pipe.attach(Skipper, indices=[0, 4])  # drop first and last blob
        pipe.attach(HDF5Sink, filename=fname)
        pipe.drain(5)

        with tb.File(fname) as f:
            tab_a = f.get_node("/tab")[:]["a"]
            tab_i = f.get_node("/tab")[:]["i"]
            group_id = f.get_node("/tab")[:]["group_id"]

            arr = f.get_node("/arr")[:]
            index_table = f.get_node("/arr_indices")[:]

            # Read to ensure the node exists; the values are not checked here.
            group_info = f.get_node("/group_info")[:]

            # Only events 1-3 survive; group_ids re-numbered from 0.
            assert np.allclose([1, 1, 2, 2, 2, 3, 3, 3, 3], tab_i)
            assert np.allclose([0, 1, 0, 1, 2, 0, 1, 2, 3], tab_a)
            assert np.allclose([0, 0, 1, 1, 1, 2, 2, 2, 2], group_id)

            assert np.allclose([0, 1, 0, 1, 2, 0, 1, 2, 3], arr)
            assert np.allclose([0, 2, 5], index_table["index"])
            assert np.allclose([2, 3, 4], index_table["n_items"])

        fobj.close()
class TestH5SinkConsistency(TestCase):
    """Group-ID bookkeeping and singleton handling of ``HDF5Sink``."""

    def test_h5_consistency_for_tables_without_group_id(self):
        """The sink assigns sequential group IDs when none are provided."""
        fobj = tempfile.NamedTemporaryFile(delete=True)
        fname = fobj.name

        class DummyPump(Module):
            def configure(self):
                self.count = 0

            def process(self, blob):
                self.count += 10
                tab = Table({"a": self.count, "b": 1}, h5loc="tab")
                return Blob({"tab": tab})

        pipe = Pipeline()
        pipe.attach(DummyPump)
        pipe.attach(HDF5Sink, filename=fname)
        pipe.drain(5)

        with tb.File(fname) as f:
            a = f.get_node("/tab")[:]["a"]
            b = f.get_node("/tab")[:]["b"]
            group_id = f.get_node("/tab")[:]["group_id"]
        assert np.allclose([10, 20, 30, 40, 50], a)
        assert np.allclose([1, 1, 1, 1, 1], b)
        assert np.allclose([0, 1, 2, 3, 4], group_id)
        fobj.close()

    def test_h5_consistency_for_tables_without_group_id_and_multiple_keys(self):
        """Both tables must receive the same sequential group IDs."""
        fobj = tempfile.NamedTemporaryFile(delete=True)
        fname = fobj.name

        class DummyPump(Module):
            def configure(self):
                self.count = 0

            def process(self, blob):
                self.count += 10
                tab1 = Table({"a": self.count, "b": 1}, h5loc="tab1")
                tab2 = Table({"c": self.count + 1, "d": 2}, h5loc="tab2")
                return Blob({"tab1": tab1, "tab2": tab2})

        pipe = Pipeline()
        pipe.attach(DummyPump)
        pipe.attach(HDF5Sink, filename=fname)
        pipe.drain(5)

        with tb.File(fname) as f:
            a = f.get_node("/tab1")[:]["a"]
            b = f.get_node("/tab1")[:]["b"]
            c = f.get_node("/tab2")[:]["c"]
            d = f.get_node("/tab2")[:]["d"]
            group_id_1 = f.get_node("/tab1")[:]["group_id"]
            # BUGFIX: previously read from "/tab1" (copy-paste error), so
            # the group IDs of the second table were never actually checked.
            group_id_2 = f.get_node("/tab2")[:]["group_id"]
        assert np.allclose([10, 20, 30, 40, 50], a)
        assert np.allclose([1, 1, 1, 1, 1], b)
        assert np.allclose([0, 1, 2, 3, 4], group_id_1)
        assert np.allclose([11, 21, 31, 41, 51], c)
        assert np.allclose([2, 2, 2, 2, 2], d)
        assert np.allclose([0, 1, 2, 3, 4], group_id_2)
        fobj.close()

    def test_h5_consistency_for_tables_with_custom_group_id(self):
        """With ``reset_group_id=False`` user-supplied group IDs are kept."""
        fobj = tempfile.NamedTemporaryFile(delete=True)
        fname = fobj.name

        class DummyPump(Module):
            def process(self, blob):
                tab = Table({"group_id": 2}, h5loc="tab")
                return Blob({"tab": tab})

        pipe = Pipeline()
        pipe.attach(DummyPump)
        pipe.attach(HDF5Sink, filename=fname, reset_group_id=False)
        pipe.drain(5)

        with tb.File(fname) as f:
            group_id = f.get_node("/tab")[:]["group_id"]

        assert np.allclose([2, 2, 2, 2, 2], group_id)

        fobj.close()

    def test_h5_singletons(self):
        """An ``h5singleton`` table is written only once despite 5 cycles."""
        fobj = tempfile.NamedTemporaryFile(delete=True)
        fname = fobj.name

        class DummyPump(Module):
            def process(self, blob):
                tab = Table({"a": 2}, h5loc="tab", h5singleton=True)
                return Blob({"tab": tab})

        pipe = Pipeline()
        pipe.attach(DummyPump)
        pipe.attach(HDF5Sink, filename=fname)
        pipe.drain(5)

        with tb.File(fname) as f:
            a = f.get_node("/tab")[:]["a"]

        assert len(a) == 1

        fobj.close()

    def test_h5_singletons_reading(self):
        """A singleton table is restored into every blob on readout."""
        fobj = tempfile.NamedTemporaryFile(delete=True)
        fname = fobj.name

        class DummyPump(Module):
            def process(self, blob):
                tab = Table({"a": 2}, h5loc="tab", h5singleton=True)
                return Blob({"Tab": tab})

        pipe = Pipeline()
        pipe.attach(DummyPump)
        pipe.attach(HDF5Sink, filename=fname)
        pipe.drain(5)

        class Observer(Module):
            def process(self, blob):
                print(blob)
                assert "Tab" in blob
                print(blob["Tab"])
                assert len(blob["Tab"]) == 1
                assert blob["Tab"].a[0] == 2
                return blob

        pipe = Pipeline()
        pipe.attach(HDF5Pump, filename=fname)
        pipe.attach(Observer)
        pipe.drain()

        fobj.close()
class TestHDF5PumpConsistency(TestCase):
    """Round-trip tests: what the sink writes, the pump must restore.

    Fixes over the original: the redundant chained comparison
    ``assert 1 == blob["Tab"]["b"] == 1`` is simplified, and the two
    sparse tests now close their temporary file objects like every other
    test in this file.
    """

    def test_hdf5_readout(self):
        fobj = tempfile.NamedTemporaryFile(delete=True)
        fname = fobj.name

        class DummyPump(Module):
            def configure(self):
                self.count = 0

            def process(self, blob):
                self.count += 1
                tab = Table({"a": self.count * 10, "b": 1}, h5loc="tab")
                tab2 = Table({"a": np.arange(self.count)}, h5loc="tab2")
                blob["Tab"] = tab
                blob["Tab2"] = tab2
                return blob

        pipe = Pipeline()
        pipe.attach(DummyPump)
        pipe.attach(HDF5Sink, filename=fname)
        pipe.drain(5)

        class BlobTester(Module):
            def configure(self):
                self.index = 0

            def process(self, blob):
                self.index += 1
                assert "GroupInfo" in blob
                assert "Tab" in blob
                print(self.index)
                print(blob["Tab"])
                print(blob["Tab"]["a"])
                assert self.index - 1 == blob["GroupInfo"].group_id
                assert self.index * 10 == blob["Tab"]["a"]
                assert 1 == blob["Tab"]["b"]
                assert np.allclose(np.arange(self.index), blob["Tab2"]["a"])
                return blob

        pipe = Pipeline()
        pipe.attach(HDF5Pump, filename=fname)
        pipe.attach(BlobTester)
        pipe.drain()

        fobj.close()

    def test_hdf5_readout_split_tables(self):
        """Tables written with ``split_h5=True`` must round-trip as well."""
        fobj = tempfile.NamedTemporaryFile(delete=True)
        fname = fobj.name

        class DummyPump(Module):
            def configure(self):
                self.count = 0

            def process(self, blob):
                self.count += 1
                tab = Table({"a": self.count * 10, "b": 1}, h5loc="/tab", split_h5=True)
                blob["Tab"] = tab
                return blob

        pipe = Pipeline()
        pipe.attach(DummyPump)
        pipe.attach(HDF5Sink, filename=fname)
        pipe.drain(5)

        class BlobTester(Module):
            def configure(self):
                self.index = 0

            def process(self, blob):
                self.index += 1
                assert "GroupInfo" in blob
                assert "Tab" in blob
                assert self.index - 1 == blob["GroupInfo"].group_id
                assert self.index * 10 == blob["Tab"]["a"]
                assert 1 == blob["Tab"]["b"]
                return blob

        pipe = Pipeline()
        pipe.attach(HDF5Pump, filename=fname)
        pipe.attach(BlobTester)
        pipe.drain()

        fobj.close()

    def test_hdf5_readout_split_tables_in_same_group(self):
        """Two split tables sharing a parent group stay independent."""
        fobj = tempfile.NamedTemporaryFile(delete=True)
        fname = fobj.name

        class DummyPump(Module):
            def configure(self):
                self.count = 0

            def process(self, blob):
                self.count += 1
                tab_a = Table(
                    {
                        "a": self.count * 10,
                    },
                    h5loc="/tabs/tab_a",
                    split_h5=True,
                )
                tab_b = Table(
                    {
                        "b": self.count * 100,
                    },
                    h5loc="/tabs/tab_b",
                    split_h5=True,
                )
                blob["TabA"] = tab_a
                blob["TabB"] = tab_b
                return blob

        pipe = Pipeline()
        pipe.attach(DummyPump)
        pipe.attach(HDF5Sink, filename=fname)
        pipe.drain(5)

        class BlobTester(Module):
            def configure(self):
                self.index = 0

            def process(self, blob):
                self.index += 1
                assert "GroupInfo" in blob
                assert "TabA" in blob
                assert "TabB" in blob
                assert self.index - 1 == blob["GroupInfo"].group_id
                assert self.index * 10 == blob["TabA"]["a"]
                assert self.index * 100 == blob["TabB"]["b"]
                return blob

        pipe = Pipeline()
        pipe.attach(HDF5Pump, filename=fname)
        pipe.attach(BlobTester)
        pipe.drain()

        fobj.close()

    def test_sparse_table(self):
        """A table present in only one of 10 events reappears only there."""
        fobj = tempfile.NamedTemporaryFile(delete=True)
        fname = fobj.name

        class Dummy(Module):
            def configure(self):
                self.i = 0

            def process(self, blob):
                self.i += 1

                if self.i == 5:
                    blob["Tab"] = Table({"a": 23}, h5loc="/tab")
                return blob

        pipe = Pipeline()
        pipe.attach(Dummy)
        pipe.attach(HDF5Sink, filename=fname)
        pipe.drain(10)

        class Observer(Module):
            def configure(self):
                self.i = 0

            def process(self, blob):
                self.i += 1

                if self.i == 5:
                    assert 23 == blob["Tab"].a[0]
                else:
                    assert "Tab" not in blob

                return blob

        pipe = Pipeline()
        pipe.attach(HDF5Pump, filename=fname)
        pipe.attach(Observer)
        pipe.drain()

        # BUGFIX: the original leaked the temporary file handle here.
        fobj.close()

    def test_sparse_ndarray(self):
        """A sparse NDArray is restored as an empty array in other events."""
        fobj = tempfile.NamedTemporaryFile(delete=True)
        fname = fobj.name

        class Dummy(Module):
            def configure(self):
                self.i = 0

            def process(self, blob):
                self.i += 1

                if self.i == 5:
                    blob["Arr"] = NDArray([1, 2, 3], h5loc="/arr")
                return blob

        pipe = Pipeline()
        pipe.attach(Dummy)
        pipe.attach(HDF5Sink, filename=fname)
        pipe.drain(10)

        class Observer(Module):
            def configure(self):
                self.i = 0

            def process(self, blob):
                self.i += 1

                print(blob)
                if self.i == 5:
                    assert 6 == np.sum(blob["Arr"])
                else:
                    assert len(blob["Arr"]) == 0

                return blob

        pipe = Pipeline()
        pipe.attach(HDF5Pump, filename=fname)
        pipe.attach(Observer)
        pipe.drain()

        # BUGFIX: the original leaked the temporary file handle here.
        fobj.close()
class TestHDF5Shuffle(TestCase):
    """Tests for the pump's ``shuffle`` / ``reset_index`` options."""

    def test_shuffle_without_reset_index(self):
        """With ``reset_index=False`` the original group IDs are kept."""
        fobj = tempfile.NamedTemporaryFile(delete=True)
        fname = fobj.name

        class DummyPump(Module):
            def configure(self):
                self.i = 0

            def process(self, blob):
                blob["Tab"] = Table({"a": self.i}, h5loc="/tab")
                blob["SplitTab"] = Table(
                    {"b": self.i}, h5loc="/split_tab", split_h5=True
                )
                blob["Arr"] = NDArray(np.arange(self.i + 1), h5loc="/arr")
                self.i += 1
                return blob

        pipe = Pipeline()
        pipe.attach(DummyPump)
        pipe.attach(HDF5Sink, filename=fname)
        pipe.drain(5)

        shuffled_group_ids = [2, 1, 0, 3, 4]

        # Deterministic in-place "shuffle" so the expected order is known.
        def shuffle(x):
            for i in range(len(x)):
                x[i] = shuffled_group_ids[i]

        class Observer(Module):
            def configure(self):
                self.group_ids_tab = []
                self.group_ids_split_tab = []
                self.group_ids_arr = []
                self.a = []
                self.b = []
                self.arr_len = []

            def process(self, blob):
                # All containers in a blob must agree on the group ID.
                group_id_tab = blob["Tab"].group_id[0]
                group_id_split_tab = blob["SplitTab"].group_id[0]
                group_id_arr = blob["Arr"].group_id
                assert blob["GroupInfo"].group_id[0] == group_id_tab
                assert blob["GroupInfo"].group_id[0] == group_id_split_tab
                assert blob["GroupInfo"].group_id[0] == group_id_arr
                self.group_ids_tab.append(blob["Tab"].group_id[0])
                self.group_ids_split_tab.append(blob["SplitTab"].group_id[0])
                self.group_ids_arr.append(blob["Arr"].group_id)
                self.a.append(blob["Tab"].a[0])
                self.b.append(blob["SplitTab"].b[0])
                self.arr_len.append(len(blob["Arr"]) - 1)
                return blob

            def finish(self):
                return {
                    "group_ids_tab": self.group_ids_tab,
                    "group_ids_split_tab": self.group_ids_split_tab,
                    "group_ids_arr": self.group_ids_arr,
                    "a": self.a,
                    "b": self.b,
                    "arr_len": self.arr_len,
                }

        pipe = Pipeline()
        pipe.attach(
            HDF5Pump,
            filename=fname,
            shuffle=True,
            shuffle_function=shuffle,
            reset_index=False,
        )
        pipe.attach(Observer)
        results = pipe.drain()

        self.assertListEqual(results["Observer"]["group_ids_tab"], shuffled_group_ids)
        self.assertListEqual(
            results["Observer"]["group_ids_split_tab"], shuffled_group_ids
        )
        self.assertListEqual(results["Observer"]["group_ids_arr"], shuffled_group_ids)
        self.assertListEqual(results["Observer"]["a"], shuffled_group_ids)
        self.assertListEqual(results["Observer"]["b"], shuffled_group_ids)
        # A small hack: event i was written with an array of length i + 1,
        # so (len - 1) recovers the original group ID and must therefore
        # match the shuffled group IDs after readout.
        self.assertListEqual(results["Observer"]["arr_len"], shuffled_group_ids)

        fobj.close()

    def test_shuffle_with_reset_index(self):
        """With ``reset_index=True`` group IDs are renumbered 0..n-1."""
        fobj = tempfile.NamedTemporaryFile(delete=True)
        fname = fobj.name

        class DummyPump(Module):
            def configure(self):
                self.i = 0

            def process(self, blob):
                blob["Tab"] = Table({"a": self.i}, h5loc="/tab")
                blob["SplitTab"] = Table(
                    {"b": self.i}, h5loc="/split_tab", split_h5=True
                )
                blob["Arr"] = NDArray(np.arange(self.i + 1), h5loc="/arr")
                self.i += 1
                return blob

        pipe = Pipeline()
        pipe.attach(DummyPump)
        pipe.attach(HDF5Sink, filename=fname)
        pipe.drain(5)

        shuffled_group_ids = [2, 1, 0, 3, 4]

        # Deterministic in-place "shuffle" so the expected order is known.
        def shuffle(x):
            for i in range(len(x)):
                x[i] = shuffled_group_ids[i]

        class Observer(Module):
            def configure(self):
                self.group_ids_tab = []
                self.group_ids_split_tab = []
                self.group_ids_arr = []
                self.a = []
                self.b = []
                self.arr_len = []

            def process(self, blob):
                group_id_tab = blob["Tab"].group_id[0]
                group_id_split_tab = blob["SplitTab"].group_id[0]
                group_id_arr = blob["Arr"].group_id
                assert blob["GroupInfo"].group_id[0] == group_id_tab
                assert blob["GroupInfo"].group_id[0] == group_id_split_tab
                assert blob["GroupInfo"].group_id[0] == group_id_arr
                self.group_ids_tab.append(blob["Tab"].group_id[0])
                self.group_ids_split_tab.append(blob["SplitTab"].group_id[0])
                self.group_ids_arr.append(blob["Arr"].group_id)
                self.a.append(blob["Tab"].a[0])
                self.b.append(blob["SplitTab"].b[0])
                self.arr_len.append(len(blob["Arr"]) - 1)
                return blob

            def finish(self):
                return {
                    "group_ids_tab": self.group_ids_tab,
                    "group_ids_split_tab": self.group_ids_split_tab,
                    "group_ids_arr": self.group_ids_arr,
                    "a": self.a,
                    "b": self.b,
                    "arr_len": self.arr_len,
                }

        pipe = Pipeline()
        pipe.attach(
            HDF5Pump,
            filename=fname,
            shuffle=True,
            shuffle_function=shuffle,
            reset_index=True,
        )
        pipe.attach(Observer)
        results = pipe.drain()

        # Group IDs are reset to the readout order ...
        self.assertListEqual(results["Observer"]["group_ids_tab"], [0, 1, 2, 3, 4])
        self.assertListEqual(
            results["Observer"]["group_ids_split_tab"], [0, 1, 2, 3, 4]
        )
        self.assertListEqual(results["Observer"]["group_ids_arr"], [0, 1, 2, 3, 4])
        # ... while the payloads still come in shuffled order.
        self.assertListEqual(results["Observer"]["a"], shuffled_group_ids)
        self.assertListEqual(results["Observer"]["b"], shuffled_group_ids)
        # A small hack: event i was written with an array of length i + 1,
        # so (len - 1) recovers the original group ID and must therefore
        # match the shuffled group IDs after readout.
        self.assertListEqual(results["Observer"]["arr_len"], shuffled_group_ids)

        fobj.close()
class TestHDF5Header(TestCase):
    """Tests for ``HDF5Header`` construction and attribute access."""

    def setUp(self):
        # Plain dicts preserve insertion order on Python 3.7+, so the
        # previous OrderedDict-based fixture is no longer needed.
        self.hdict = {
            "param_a": {"field_a_1": "1", "field_a_2": "2"},
            "param_b": {"field_b_1": "a"},
            "param_c": {"field_c_1": 23},
            "param_d": {"param_d_0": 1, "param_d_1": 2, "param_d_2": 3},
            "param_e": {"param_e_2": 3, "param_e_0": 1, "param_e_1": 2},
            # "param+invalid.attribute": {"a": 1, "b": 2, "c": 3}
        }

    def test_init(self):
        HDF5Header({})

    def test_header_behaves_like_a_dict(self):
        h = HDF5Header(self.hdict)
        self.assertListEqual(list(h.keys()), list(self.hdict.keys()))
        assert 5 == len(h.items())
        assert 5 == len(h.values())

    def test_header(self):
        # Parameters and their fields are exposed as nested attributes.
        header = HDF5Header(self.hdict)
        assert "1" == header.param_a.field_a_1
        assert "2" == header.param_a.field_a_2
        assert "a" == header.param_b.field_b_1
        assert 23 == header.param_c.field_c_1

    def test_header_getitem(self):
        # ... and also via item access.
        header = HDF5Header(self.hdict)
        print(header["param_a"])
        assert "1" == header["param_a"].field_a_1
        assert "2" == header["param_a"].field_a_2
        assert "a" == header["param_b"].field_b_1
        assert 23 == header["param_c"].field_c_1

    def test_header_with_vectors(self):
        # Fields named "<param>_0", "<param>_1", ... collapse to a tuple.
        header = HDF5Header(self.hdict)
        self.assertTupleEqual((1, 2, 3), header.param_d)

    def test_header_with_scrumbled_vectors(self):
        # Vector components are ordered by their numeric suffix, not by
        # insertion order.
        header = HDF5Header(self.hdict)
        self.assertTupleEqual((1, 2, 3), header.param_e)

    # def test_header_with_scalars(self):
    #     header = HDF5Header(self.hdict)
    #     assert 4 == header.param_e
    #     assert 5.6 == header.param_f
    #
    # def test_scientific_notation(self):
    #     header = HDF5Header(self.hdict)
    #     assert 7e+08 == header.param_g

    def test_header_from_table(self):
        # Round-trip dict -> table -> header; numeric strings become floats.
        table = header2table(self.hdict)
        header = HDF5Header.from_table(table)
        print(header)
        assert 1.0 == header.param_a.field_a_1
        assert 2.0 == header.param_a.field_a_2
        assert "a" == header.param_b.field_b_1
        assert 23 == header.param_c.field_c_1
        self.assertTupleEqual((1, 2, 3), header.param_d)

    def test_header_from_hdf5_file(self):
        header = HDF5Header.from_hdf5(data_path("hdf5/raw_header.h5"))
        assert "MUSIC" == header.propag[0]
        assert "seawater" == header.propag[1]
        assert 3450 == header.seabottom[0]
        self.assertAlmostEqual(12.1, header.livetime.numberOfSeconds, places=3)
        self.assertAlmostEqual(0.09, header.livetime.errorOfSeconds, places=3)
        assert 0 == header.coord_origin.x
        assert 0 == header.coord_origin.y
        assert 0 == header.coord_origin.z
        self.assertTupleEqual((0, 0, 0), header.coord_origin)

    def test_header_from_hdf5_file_with_invalid_identifier_names_in_header(self):
        # Parameters like "drays+z" are not valid Python identifiers and
        # are therefore only reachable via item access.
        header = HDF5Header.from_hdf5(data_path("hdf5/geamon.h5"))
        assert 1.0 == header["drays+z"][0]
        assert 68.5 == header["drays+z"][1]

    def test_header_from_table_with_bytes(self):
        # aanet-style headers store everything as whitespace-separated bytes.
        table = Table(
            {
                "dtype": [b"f4 a2", b"f4"],
                "field_names": [b"a b", b"c"],
                "field_values": [b"1.2 ab", b"3.4"],
                "parameter": [b"foo", b"bar"],
            }
        )
        header = HDF5Header.from_aanet(table)
        self.assertAlmostEqual(1.2, header.foo.a, places=2)
        assert "ab" == header.foo.b
        self.assertAlmostEqual(3.4, header.bar.c, places=2)

    def test_header_from_km3io(self):
        head = {
            "a": "1 2 3",
            "b+c": "4 5 6",
            "c": "foo",
            "d": "7",
            "e+f": "bar",
        }

        header = HDF5Header.from_km3io(km3io.offline.Header(head))

        # Whitespace-separated numeric strings become indexable vectors.
        assert 1 == header["a"][0]
        assert 2 == header["a"][1]
        assert 3 == header["a"][2]
        assert 1 == header.a[0]
        assert 2 == header.a[1]
        assert 3 == header.a[2]
        assert 4 == header["b+c"][0]
        assert 5 == header["b+c"][1]
        assert 6 == header["b+c"][2]
        assert "foo" == header.c
        assert "foo" == header["c"]
        assert 7 == header.d
        assert 7 == header["d"]
        assert "bar" == header["e+f"]

    def test_header_fails_when_no_info_in_file(self):
        with self.assertRaises(tb.NoSuchNodeError):
            HDF5Header.from_hdf5(data_path("hdf5/basic_analysis_sample.h5"))
class TestConvertHeaderDictToTable(TestCase):
    """Tests for ``header2table``, which flattens a header dict to a Table."""

    def setUp(self):
        hdict = {
            "param_a": {"field_a_1": "1", "field_a_2": "2"},
            "param_b": {"field_b_1": "a"},
            "param_c": {"field_c_1": 1},
        }
        self.tab = header2table(hdict)

    def test_length(self):
        # One table row per header parameter.
        assert 3 == len(self.tab)

    def test_values(self):
        tab = self.tab

        # Row order is not guaranteed, so look up rows by parameter name.
        index_a = tab.parameter.tolist().index(b"param_a")
        index_b = tab.parameter.tolist().index(b"param_b")

        assert b"param_a" == tab.parameter[index_a]
        assert b"field_a_1" in tab.field_names[index_a]
        assert b"field_a_2" in tab.field_names[index_a]
        # Field order within a row is not guaranteed either; the values
        # must simply match the order of the field names.
        if b"field_a_1 field_a_2" == tab.field_names[index_a]:
            assert b"1 2" == tab.field_values[index_a]
        else:
            assert b"2 1" == tab.field_values[index_a]
        assert b"f4 f4" == tab["dtype"][index_a]

        assert b"param_b" == tab.parameter[index_b]
        assert b"field_b_1" == tab.field_names[index_b]
        assert b"a" == tab.field_values[index_b]
        assert b"a1" == tab["dtype"][index_b]

    def test_values_are_converted_to_str(self):
        # The int value 1 from the fixture is serialized as b"1".
        index_c = self.tab.parameter.tolist().index(b"param_c")
        assert b"param_c" == self.tab.parameter[index_c]
        assert b"1" == self.tab.field_values[index_c]

    def test_conversion_returns_none_for_empty_dict(self):
        assert None is header2table(None)
        assert None is header2table({})

    def test_conversion_of_km3io_header(self):
        header = km3io.OfflineReader(data_path("offline/numucc.root")).header
        tab = header2table(header)
        print(tab)
        # All parameters known to be in the numucc.root test file.
        for p in [
            b"DAQ",
            b"PDF",
            b"can",
            b"can_user",
            b"coord_origin",
            b"cut_in",
            b"cut_nu",
            b"cut_primary",
            b"cut_seamuon",
            b"decay",
            b"detector",
            b"drawing",
            b"genhencut",
            b"genvol",
            b"kcut",
            b"livetime",
            b"model",
            b"ngen",
            b"norma",
            b"nuflux",
            b"physics",
            b"seed",
            b"simul",
            b"sourcemode",
            b"spectrum",
            b"start_run",
            b"target",
            b"usedetfile",
            b"xlat_user",
            b"xparam",
            b"zed_user",
        ]:
            assert p in tab.parameter

        # The table must also round-trip back into an HDF5Header.
        h5header = HDF5Header.from_table(tab)
        assert h5header.can.zmin == header.can.zmin

    def test_conversion_of_hdf5header(self):
        # header2table also accepts an HDF5Header instance directly.
        hdict = {
            "param_a": {"field_a_1": "1", "field_a_2": "2"},
            "param_b": {"field_b_1": "a"},
            "param_c": {"field_c_1": 1},
        }

        header = HDF5Header(hdict)
        tab = header2table(header)

        for p in [b"param_a", b"param_b", b"param_c"]:
            assert p in tab.parameter

        hdf5header_from_table = HDF5Header.from_table(tab)