Coverage for src/km3modules/tests/test_common.py: 99%
439 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 03:14 +0000
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 03:14 +0000
1# Filename: test_common.py
2# pylint: disable=locally-disabled,C0111,R0904,C0103
4import sqlite3
5import tempfile
7import km3pipe as kp
8from km3pipe.dataclasses import Table
9from km3modules.common import (
10 Siphon,
11 Delete,
12 Keep,
13 Dump,
14 StatusBar,
15 TickTock,
16 MemoryObserver,
17 BlobIndexer,
18 LocalDBService,
19 Observer,
20 MultiFilePump,
21 FilePump,
22)
23from km3pipe.testing import TestCase, MagicMock
24from km3pipe.tools import istype
26__author__ = "Tamas Gal"
27__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration."
28__credits__ = []
29__license__ = "MIT"
30__maintainer__ = "Tamas Gal"
31__email__ = "tgal@km3net.de"
32__status__ = "Development"
class InfinitePump(kp.Module):
    """Pump that tags every blob it is handed with a running index under "i".

    The counter starts at zero and is incremented before being written, so
    the first blob carries ``i == 1``.
    """

    def configure(self):
        self.i = 0

    def process(self, blob):
        current = self.i + 1
        self.i = current
        blob["i"] = current
        return blob
class TestKeep(TestCase):
    """Tests for ``Keep``: selecting blob entries by key and/or by h5loc."""

    @staticmethod
    def _letter_pump():
        """Return a pump class storing "a".."d" under equally named keys."""

        class APump(kp.Module):
            def process(self, blob):
                blob["a"] = "a"
                blob["b"] = "b"
                blob["c"] = "c"
                blob["d"] = "d"
                return blob

        return APump

    @staticmethod
    def _table_pump(h5loc_a="/foobar", h5loc_b="/ab"):
        """Return a pump class storing tables "A" and "B" at the given h5locs."""

        class APump(kp.Module):
            def process(self, blob):
                blob["A"] = kp.Table(
                    {"foo": [1, 2, 3], "bar": [4, 5, 6]}, h5loc=h5loc_a
                )
                blob["B"] = kp.Table(
                    {"a": [1.1, 2.1, 3.1], "b": [4.2, 5.2, 6.2]}, h5loc=h5loc_b
                )
                return blob

        return APump

    @staticmethod
    def _drain(pump, observer, **keep_options):
        """Drain pump -> Keep(**keep_options) -> observer for five cycles."""
        pipe = kp.Pipeline()
        pipe.attach(pump)
        pipe.attach(Keep, **keep_options)
        pipe.attach(observer)
        pipe.drain(5)

    def test_keep_a_single_key(self):
        """A single key given as a plain string is the only one kept."""

        class Observer(kp.Module):
            def process(self, blob):
                assert "a" not in blob
                assert "b" not in blob
                assert "c" not in blob
                assert "d" == blob["d"]
                return blob

        self._drain(self._letter_pump(), Observer, keys="d")

    def test_keep_multiple_keys(self):
        """Every key of a list is kept, all others are removed."""

        class Observer(kp.Module):
            def process(self, blob):
                assert "a" not in blob
                assert "b" == blob["b"]
                assert "c" not in blob
                assert "d" == blob["d"]
                return blob

        self._drain(self._letter_pump(), Observer, keys=["b", "d"])

    def test_hdf5_keep_group_wo_subgroup(self):
        """A table located exactly at the kept h5loc survives, the other not."""

        class Observer(kp.Module):
            def process(self, blob):
                assert "A" in blob
                assert "/foobar" == blob["A"].h5loc
                assert "B" not in blob
                return blob

        self._drain(self._table_pump(), Observer, h5locs=["/foobar"])

    def test_hdf5_keep_group_w_subgroup(self):
        """A table one level below the kept group survives, the other not."""
        # This test used to be an exact copy of the "wo_subgroup" case; table
        # "A" now really lives in a subgroup of the kept location.

        class Observer(kp.Module):
            def process(self, blob):
                assert "A" in blob
                assert "/foobar/sub" == blob["A"].h5loc
                assert "B" not in blob
                return blob

        self._drain(
            self._table_pump(h5loc_a="/foobar/sub"), Observer, h5locs=["/foobar"]
        )

    def test_key_hdf5_group_individual(self):
        """``keys`` and ``h5locs`` may each select a different entry."""

        class Observer(kp.Module):
            def process(self, blob):
                assert "A" in blob
                assert "/foobar" == blob["A"].h5loc
                assert "B" in blob
                assert "/ab" == blob["B"].h5loc
                return blob

        self._drain(self._table_pump(), Observer, keys=["B"], h5locs=["/foobar"])

    def test_key_hdf5_group_parallel(self):
        """``keys`` and ``h5locs`` may both select the very same entry."""

        class Observer(kp.Module):
            def process(self, blob):
                assert "A" in blob
                assert "/foobar" == blob["A"].h5loc
                assert "B" not in blob
                return blob

        self._drain(self._table_pump(), Observer, keys=["A"], h5locs=["/foobar"])

    def test_major_hdf5_group(self):
        """Keeping a parent group keeps all tables stored directly below it."""

        class Observer(kp.Module):
            def process(self, blob):
                assert "A" in blob
                assert "/foobar/a" == blob["A"].h5loc
                assert "B" in blob
                assert "/foobar/b" == blob["B"].h5loc
                return blob

        self._drain(
            self._table_pump(h5loc_a="/foobar/a", h5loc_b="/foobar/b"),
            Observer,
            h5locs=["/foobar"],
        )

    def test_major_hdf5_group_nested(self):
        """Only tables at or below the kept nested group remain in the blob."""

        class APump(kp.Module):
            def process(self, blob):
                blob["A"] = kp.Table({"a": 0}, h5loc="/foo/bar/a")
                blob["B"] = kp.Table({"b": 1}, h5loc="/foo/bar/baz/b")
                blob["C"] = kp.Table({"c": 2}, h5loc="/foo/bar/baz/fjord/c")
                return blob

        class Observer(kp.Module):
            def process(self, blob):
                assert "A" not in blob
                assert "B" in blob
                assert "C" in blob
                return blob

        self._drain(APump, Observer, h5locs=["/foo/bar/baz"])
class TestDelete(TestCase):
    """Tests for ``Delete``: removing one or several keys from each blob."""

    def test_delete_a_single_key(self):
        """``key="b"`` removes "b" and leaves "a" and "c" untouched."""

        class APump(kp.Module):
            def process(self, blob):
                for letter in ("a", "b", "c"):
                    blob[letter] = letter
                return blob

        class Observer(kp.Module):
            def process(self, blob):
                assert blob["a"] == "a"
                assert "b" not in blob
                assert blob["c"] == "c"
                return blob

        pipeline = kp.Pipeline()
        pipeline.attach(APump)
        pipeline.attach(Delete, key="b")
        pipeline.attach(Observer)
        pipeline.drain(5)

    def test_delete_multiple_keys(self):
        """``keys=["a", "b"]`` removes both and leaves "c" untouched."""

        class APump(kp.Module):
            def process(self, blob):
                for letter in ("a", "b", "c"):
                    blob[letter] = letter
                return blob

        class Observer(kp.Module):
            def process(self, blob):
                for removed in ("a", "b"):
                    assert removed not in blob
                assert blob["c"] == "c"
                return blob

        pipeline = kp.Pipeline()
        pipeline.attach(APump)
        pipeline.attach(Delete, keys=["a", "b"])
        pipeline.attach(Observer)
        pipeline.drain(5)
class TestSiphon(TestCase):
    """Tests for ``Siphon``: counts how many blobs reach a downstream module."""

    @staticmethod
    def _drain_and_count(expected_calls, cycles, **siphon_options):
        """Drain InfinitePump -> Siphon -> counting observer for ``cycles``.

        The observer asserts in ``finish`` that it was called exactly
        ``expected_calls`` times.
        """

        class Observer(kp.Module):
            def configure(self):
                self.mock = MagicMock()

            def process(self, blob):
                self.mock()
                return blob

            def finish(self):
                assert self.mock.call_count == expected_calls

        pipeline = kp.Pipeline()
        pipeline.attach(InfinitePump)
        pipeline.attach(Siphon, **siphon_options)
        pipeline.attach(Observer)
        pipeline.drain(cycles)

    def test_siphon(self):
        """volume=10 and 17 cycles: 7 blobs are expected downstream."""
        self._drain_and_count(7, 17, volume=10)

    def test_siphon_with_flush(self):
        """volume=10 with flush and 21 cycles: 1 blob is expected downstream."""
        self._drain_and_count(1, 21, volume=10, flush=True)

    def test_siphon_with_flush_2(self):
        """volume=10 with flush and 22 cycles: 2 blobs are expected downstream."""
        self._drain_and_count(2, 22, volume=10, flush=True)
class TestDump(TestCase):
    """Smoke tests for ``Dump``: pipelines are drained, no output is checked."""

    @staticmethod
    def _drain_with_dump(filler, **dump_options):
        """Drain InfinitePump -> filler -> Dump(**dump_options) for 3 cycles."""
        pipeline = kp.Pipeline()
        pipeline.attach(InfinitePump)
        pipeline.attach(filler)
        pipeline.attach(Dump, **dump_options)
        pipeline.drain(3)

    def test_dump(self):
        """``Dump`` without any options."""

        def add_something(blob):
            blob["a"] = 1
            return blob

        self._drain_with_dump(add_something)

    def test_dump_a_key(self):
        """``Dump`` restricted to one key via ``key``."""

        def add_something(blob):
            blob["a"] = 1
            return blob

        self._drain_with_dump(add_something, key="a")

    def test_dump_multiple_keys(self):
        """``Dump`` restricted to several keys via ``keys``."""

        def add_something(blob):
            for name, value in (("a", 1), ("b", 2)):
                blob[name] = value
            return blob

        self._drain_with_dump(add_something, keys=["a", "b"])

    def test_dump_full(self):
        """``Dump`` with ``full=True``."""

        def add_something(blob):
            for name, value in (("a", 1), ("b", 2)):
                blob[name] = value
            return blob

        self._drain_with_dump(add_something, full=True)
class TestStatusbar(TestCase):
    def test_statusbar(self):
        """Smoke test: drain five cycles through ``StatusBar(every=2)``."""
        pipeline = kp.Pipeline()
        pipeline.attach(InfinitePump)
        status_options = {"every": 2}
        pipeline.attach(StatusBar, **status_options)
        pipeline.drain(5)
class TestTickTock(TestCase):
    def test_ticktock(self):
        """Smoke test: drain five cycles through ``TickTock``."""
        pipeline = kp.Pipeline()
        for stage in (InfinitePump, TickTock):
            pipeline.attach(stage)
        pipeline.drain(5)
class TestMemoryObserver(TestCase):
    def test_memory_observer(self):
        """Smoke test: drain five cycles through ``MemoryObserver``."""
        pipeline = kp.Pipeline()
        for stage in (InfinitePump, MemoryObserver):
            pipeline.attach(stage)
        pipeline.drain(5)
class TestBlobIndexer(TestCase):
    def test_blob_indexer(self):
        """Each blob's "blob_index" must match a counter starting at zero."""

        class Observer(kp.Module):
            def configure(self):
                self.index = 0

            def process(self, blob):
                expected = self.index
                assert blob["blob_index"] == expected
                self.index = expected + 1
                return blob

        pipeline = kp.Pipeline()
        for stage in (InfinitePump, BlobIndexer, Observer):
            pipeline.attach(stage)
        pipeline.drain(4)
class TestLocalDBService(TestCase):
    """Tests for ``LocalDBService`` using a throw-away database file.

    Every test opens its temporary file in a ``with`` block so the handle is
    closed even when an assertion fails, instead of being left open.
    """

    def test_create_table(self):
        """A freshly created table is reported by ``table_exists``."""
        with tempfile.NamedTemporaryFile(delete=True) as fobj:
            dbs = LocalDBService(filename=fobj.name)
            dbs.create_table("foo", ["a", "b"], ["INT", "TEXT"])
            assert dbs.table_exists("foo")

    def test_create_table_does_not_overwrite_by_default(self):
        """Creating an existing table again raises ``OperationalError``."""
        with tempfile.NamedTemporaryFile(delete=True) as fobj:
            dbs = LocalDBService(filename=fobj.name)
            dbs.create_table("foo", ["a", "b"], ["INT", "TEXT"])
            with self.assertRaises(sqlite3.OperationalError):
                dbs.create_table("foo", ["a", "b"], ["INT", "TEXT"])

    def test_create_table_allows_overwrite(self):
        """With ``overwrite=True`` re-creating a table does not raise."""
        with tempfile.NamedTemporaryFile(delete=True) as fobj:
            dbs = LocalDBService(filename=fobj.name)
            dbs.create_table("foo", ["a", "b"], ["INT", "TEXT"])
            dbs.create_table("foo", ["a", "b"], ["INT", "TEXT"], overwrite=True)
            # Besides not raising, the table must still be there afterwards.
            assert dbs.table_exists("foo")

    def test_insert_row(self):
        """Inserted rows are read back in insertion order with their values."""
        with tempfile.NamedTemporaryFile(delete=True) as fobj:
            dbs = LocalDBService(filename=fobj.name)
            dbs.create_table("foo", ["a", "b"], ["INT", "TEXT"])

            dbs.insert_row("foo", ["a", "b"], (23, "42"))
            dbs.insert_row("foo", ["a", "b"], (5, "hello"))

            cur = dbs.connection.cursor()
            cur.execute("SELECT * FROM foo")
            data = cur.fetchall()
            assert 2 == len(data)
            assert 23 == data[0][0]
            assert "42" == data[0][1]
            assert 5 == data[1][0]
            assert "hello" == data[1][1]
class TestObserver(TestCase):
    """Tests for the ``Observer`` module imported from ``km3modules.common``."""

    def test_observer(self):
        """Five cycles with key "a" present satisfy count=5, required_keys="a"."""

        class Dummy(kp.Module):
            def process(self, blob):
                blob["a"] = 1
                return blob

        pipeline = kp.Pipeline()
        pipeline.attach(Dummy)
        pipeline.attach(Observer, count=5, required_keys="a")
        pipeline.drain(5)

    def test_observer_raises_when_count_wrong(self):
        """Draining 2 cycles while count=5 is expected raises AssertionError."""

        class Dummy(kp.Module):
            def process(self, blob):
                return blob

        pipeline = kp.Pipeline()
        pipeline.attach(Dummy)
        pipeline.attach(Observer, count=5)

        self.assertRaises(AssertionError, pipeline.drain, 2)

    def test_observer_raises_when_key_is_missing(self):
        """Requiring key "b" while only "a" is set raises AssertionError."""

        class Dummy(kp.Module):
            def process(self, blob):
                blob["a"] = 1
                return blob

        pipeline = kp.Pipeline()
        pipeline.attach(Dummy)
        pipeline.attach(Observer, required_keys=["b"])

        self.assertRaises(AssertionError, pipeline.drain, 2)
class TestMultiFilePump(TestCase):
    def test_iteration(self):
        """Run ``MultiFilePump`` over three filenames with a dummy pump.

        The observer checks the total blob count, that the "filename"
        entries arrive grouped per file in the given order, and that the
        first ``group_id`` of each "tab" table counts up from zero.
        """
        filenames = ["a", "b", "c"]
        max_iterations = 5
        total_iterations = len(filenames) * max_iterations

        # Alias so the nested module can reach the TestCase assertion helpers.
        super_self = self

        class DummyPump(kp.Module):
            def configure(self):
                self.idx = 0
                self.max_iterations = self.get("max_iterations", default=5)
                self.blobs = self.blob_generator()
                # Both values are handed over through ``kwargs`` below.
                assert self.get("foo") == 23
                assert self.get("bar") == "narf"

            def process(self, blob):
                return next(self)

            def blob_generator(self):
                for _ in range(self.max_iterations):
                    content = {"index": self.idx, "tab": kp.Table({"a": 1})}
                    yield kp.Blob(content)

            def finish(self):
                return self.idx

            def __iter__(self):
                return self

            def __next__(self):
                return next(self.blobs)

        class Observer(kp.Module):
            def configure(self):
                self.count = 0
                self.filenames = []
                self.group_id = []

            def process(self, blob):
                self.count += 1
                self.filenames.append(blob["filename"])
                first_group_id = blob["tab"].group_id[0]
                self.group_id.append(first_group_id)
                return blob

            def finish(self):
                assert self.count == total_iterations
                expected_names = "".join(
                    name * max_iterations for name in filenames
                )
                seen_names = "".join(self.filenames)
                assert expected_names == seen_names
                expected_group_ids = list(range(total_iterations))
                super_self.assertListEqual(expected_group_ids, self.group_id)

        pipeline = kp.Pipeline()
        pipeline.attach(
            MultiFilePump,
            pump=DummyPump,
            filenames=filenames,
            max_iterations=max_iterations,
            kwargs={"foo": 23, "bar": "narf"},
        )
        pipeline.attach(Observer)
        pipeline.drain()
class TestFilePump(TestCase):
    def test_iteration(self):
        """``FilePump`` yields one blob per filename, in the given order."""
        filenames = ["a", "b", "c"]

        # Alias so the nested module can reach the TestCase assertion helpers.
        super_self = self

        class Observer(kp.Module):
            def configure(self):
                self.filenames = []
                self.count = 0

            def process(self, blob):
                self.count += 1
                self.filenames.append(blob["filename"])
                return blob

            def finish(self):
                assert len(filenames) == self.count
                super_self.assertListEqual(filenames, self.filenames)

        pipeline = kp.Pipeline()
        pipeline.attach(FilePump, filenames=filenames)
        pipeline.attach(Observer)
        pipeline.drain()