Coverage for src/km3modules/tests/test_common.py: 99%

439 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-08 03:14 +0000

1# Filename: test_common.py 

2# pylint: disable=locally-disabled,C0111,R0904,C0103 

3 

4import sqlite3 

5import tempfile 

6 

7import km3pipe as kp 

8from km3pipe.dataclasses import Table 

9from km3modules.common import ( 

10 Siphon, 

11 Delete, 

12 Keep, 

13 Dump, 

14 StatusBar, 

15 TickTock, 

16 MemoryObserver, 

17 BlobIndexer, 

18 LocalDBService, 

19 Observer, 

20 MultiFilePump, 

21 FilePump, 

22) 

23from km3pipe.testing import TestCase, MagicMock 

24from km3pipe.tools import istype 

25 

26__author__ = "Tamas Gal" 

27__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration." 

28__credits__ = [] 

29__license__ = "MIT" 

30__maintainer__ = "Tamas Gal" 

31__email__ = "tgal@km3net.de" 

32__status__ = "Development" 

33 

34 

class InfinitePump(kp.Module):
    """A pump which endlessly emits blobs tagged with a running index."""

    def configure(self):
        # Number of blobs processed so far; the first blob is tagged with 1.
        self.i = 0

    def process(self, blob):
        """Advance the counter and store it in the blob under ``"i"``."""
        self.i = self.i + 1
        blob["i"] = self.i
        return blob

45 

46 

class TestKeep(TestCase):
    """Tests for ``Keep``: retaining blob entries by key and/or ``h5loc``.

    Every test builds a three-stage pipeline (pump -> ``Keep`` -> observer)
    and drains it for five cycles; the observer asserts which entries are
    still present in the blob after ``Keep`` has run.
    """

    def test_keep_a_single_key(self):
        """A single key passed as a plain string is the only entry kept."""

        class APump(kp.Module):
            def process(self, blob):
                blob["a"] = "a"
                blob["b"] = "b"
                blob["c"] = "c"
                blob["d"] = "d"
                return blob

        class Observer(kp.Module):
            def process(self, blob):
                assert "a" not in blob
                assert "b" not in blob
                assert "c" not in blob
                assert "d" == blob["d"]
                return blob

        pipe = kp.Pipeline()
        pipe.attach(APump)
        pipe.attach(Keep, keys="d")
        pipe.attach(Observer)
        pipe.drain(5)

    def test_keep_multiple_keys(self):
        """Every key in a list of keys is kept, everything else is dropped."""

        class APump(kp.Module):
            def process(self, blob):
                blob["a"] = "a"
                blob["b"] = "b"
                blob["c"] = "c"
                blob["d"] = "d"
                return blob

        class Observer(kp.Module):
            def process(self, blob):
                assert "a" not in blob
                assert "b" == blob["b"]
                assert "c" not in blob
                assert "d" == blob["d"]
                return blob

        pipe = kp.Pipeline()
        pipe.attach(APump)
        pipe.attach(Keep, keys=["b", "d"])
        pipe.attach(Observer)
        pipe.drain(5)

    def test_hdf5_keep_group_wo_subgroup(self):
        """Only the table whose ``h5loc`` is listed in ``h5locs`` survives."""

        class APump(kp.Module):
            def process(self, blob):
                blob["A"] = kp.Table(
                    {"foo": [1, 2, 3], "bar": [4, 5, 6]}, h5loc="/foobar"
                )
                blob["B"] = kp.Table(
                    {"a": [1.1, 2.1, 3.1], "b": [4.2, 5.2, 6.2]}, h5loc="/ab"
                )
                return blob

        class Observer(kp.Module):
            def process(self, blob):
                assert "A" in blob
                assert "/foobar" == blob["A"].h5loc
                assert "B" not in blob
                return blob

        pipe = kp.Pipeline()
        pipe.attach(APump)
        pipe.attach(Keep, h5locs=["/foobar"])
        pipe.attach(Observer)
        pipe.drain(5)

    def test_hdf5_keep_group_w_subgroup(self):
        """Only the table whose ``h5loc`` is listed in ``h5locs`` survives.

        NOTE(review): this body is identical to
        ``test_hdf5_keep_group_wo_subgroup`` and no table here lives in a
        subgroup -- confirm whether a nested ``h5loc`` was intended.
        """

        class APump(kp.Module):
            def process(self, blob):
                blob["A"] = kp.Table(
                    {"foo": [1, 2, 3], "bar": [4, 5, 6]}, h5loc="/foobar"
                )
                blob["B"] = kp.Table(
                    {"a": [1.1, 2.1, 3.1], "b": [4.2, 5.2, 6.2]}, h5loc="/ab"
                )
                return blob

        class Observer(kp.Module):
            def process(self, blob):
                assert "A" in blob
                assert "/foobar" == blob["A"].h5loc
                assert "B" not in blob
                return blob

        pipe = kp.Pipeline()
        pipe.attach(APump)
        pipe.attach(Keep, h5locs=["/foobar"])
        pipe.attach(Observer)
        pipe.drain(5)

    def test_key_hdf5_group_individual(self):
        """``keys`` and ``h5locs`` selecting different entries keep both."""

        class APump(kp.Module):
            def process(self, blob):
                blob["A"] = kp.Table(
                    {"foo": [1, 2, 3], "bar": [4, 5, 6]}, h5loc="/foobar"
                )
                blob["B"] = kp.Table(
                    {"a": [1.1, 2.1, 3.1], "b": [4.2, 5.2, 6.2]}, h5loc="/ab"
                )
                return blob

        class Observer(kp.Module):
            def process(self, blob):
                assert "A" in blob
                assert "/foobar" == blob["A"].h5loc
                assert "B" in blob
                assert "/ab" == blob["B"].h5loc
                return blob

        pipe = kp.Pipeline()
        pipe.attach(APump)
        pipe.attach(Keep, keys=["B"], h5locs=["/foobar"])
        pipe.attach(Observer)
        pipe.drain(5)

    def test_key_hdf5_group_parallel(self):
        """``keys`` and ``h5locs`` selecting the same entry keep only it."""

        class APump(kp.Module):
            def process(self, blob):
                blob["A"] = kp.Table(
                    {"foo": [1, 2, 3], "bar": [4, 5, 6]}, h5loc="/foobar"
                )
                blob["B"] = kp.Table(
                    {"a": [1.1, 2.1, 3.1], "b": [4.2, 5.2, 6.2]}, h5loc="/ab"
                )
                return blob

        class Observer(kp.Module):
            def process(self, blob):
                assert "A" in blob
                assert "/foobar" == blob["A"].h5loc
                assert "B" not in blob
                return blob

        pipe = kp.Pipeline()
        pipe.attach(APump)
        pipe.attach(Keep, keys=["A"], h5locs=["/foobar"])
        pipe.attach(Observer)
        pipe.drain(5)

    def test_major_hdf5_group(self):
        """A parent group in ``h5locs`` keeps the tables stored below it."""

        class APump(kp.Module):
            def process(self, blob):
                blob["A"] = kp.Table(
                    {"foo": [1, 2, 3], "bar": [4, 5, 6]}, h5loc="/foobar/a"
                )
                blob["B"] = kp.Table(
                    {"a": [1.1, 2.1, 3.1], "b": [4.2, 5.2, 6.2]}, h5loc="/foobar/b"
                )
                return blob

        class Observer(kp.Module):
            def process(self, blob):
                assert "A" in blob
                assert "/foobar/a" == blob["A"].h5loc
                assert "B" in blob
                assert "/foobar/b" == blob["B"].h5loc
                return blob

        pipe = kp.Pipeline()
        pipe.attach(APump)
        pipe.attach(Keep, h5locs=["/foobar"])
        pipe.attach(Observer)
        pipe.drain(5)

    def test_major_hdf5_group_nested(self):
        """Tables at any depth below the kept group stay; siblings are dropped."""

        class APump(kp.Module):
            def process(self, blob):
                blob["A"] = kp.Table({"a": 0}, h5loc="/foo/bar/a")
                blob["B"] = kp.Table({"b": 1}, h5loc="/foo/bar/baz/b")
                blob["C"] = kp.Table({"c": 2}, h5loc="/foo/bar/baz/fjord/c")
                return blob

        class Observer(kp.Module):
            def process(self, blob):
                assert "A" not in blob
                assert "B" in blob
                assert "C" in blob
                return blob

        pipe = kp.Pipeline()
        pipe.attach(APump)
        pipe.attach(Keep, h5locs=["/foo/bar/baz"])
        pipe.attach(Observer)
        pipe.drain(5)

236 

237 

class TestDelete(TestCase):
    """Tests for ``Delete``: removing one or several keys from each blob."""

    def test_delete_a_single_key(self):
        """The key passed via ``key`` is removed; the other entries remain."""

        class APump(kp.Module):
            def process(self, blob):
                for name in ("a", "b", "c"):
                    blob[name] = name
                return blob

        class Observer(kp.Module):
            def process(self, blob):
                assert "a" == blob["a"]
                assert "b" not in blob
                assert "c" == blob["c"]
                return blob

        pipeline = kp.Pipeline()
        pipeline.attach(APump)
        pipeline.attach(Delete, key="b")
        pipeline.attach(Observer)
        pipeline.drain(5)

    def test_delete_multiple_keys(self):
        """All keys listed via ``keys`` are removed; the rest remains."""

        class APump(kp.Module):
            def process(self, blob):
                for name in ("a", "b", "c"):
                    blob[name] = name
                return blob

        class Observer(kp.Module):
            def process(self, blob):
                assert "a" not in blob
                assert "b" not in blob
                assert "c" == blob["c"]
                return blob

        pipeline = kp.Pipeline()
        pipeline.attach(APump)
        pipeline.attach(Delete, keys=["a", "b"])
        pipeline.attach(Observer)
        pipeline.drain(5)

280 

281 

class TestSiphon(TestCase):
    """Tests for ``Siphon``, with and without the ``flush`` option.

    Each test counts how many blobs reach a module attached after the
    ``Siphon`` and checks that count when the pipeline finishes.
    """

    @staticmethod
    def _counting_observer(expected_calls):
        """Return an ``Observer`` module class asserting its call count.

        The returned module records every ``process`` call on a mock and,
        in ``finish``, asserts it was called ``expected_calls`` times.
        """

        class Observer(kp.Module):
            def configure(self):
                self.mock = MagicMock()

            def process(self, blob):
                self.mock()
                return blob

            def finish(self):
                assert self.mock.call_count == expected_calls

        return Observer

    def test_siphon(self):
        """``volume=10`` over 17 cycles lets 7 blobs reach the observer."""
        pipeline = kp.Pipeline()
        pipeline.attach(InfinitePump)
        pipeline.attach(Siphon, volume=10)
        pipeline.attach(self._counting_observer(7))
        pipeline.drain(17)

    def test_siphon_with_flush(self):
        """``volume=10, flush=True`` over 21 cycles lets 1 blob through."""
        pipeline = kp.Pipeline()
        pipeline.attach(InfinitePump)
        pipeline.attach(Siphon, volume=10, flush=True)
        pipeline.attach(self._counting_observer(1))
        pipeline.drain(21)

    def test_siphon_with_flush_2(self):
        """``volume=10, flush=True`` over 22 cycles lets 2 blobs through."""
        pipeline = kp.Pipeline()
        pipeline.attach(InfinitePump)
        pipeline.attach(Siphon, volume=10, flush=True)
        pipeline.attach(self._counting_observer(2))
        pipeline.drain(22)

336 

337 

class TestDump(TestCase):
    """Smoke tests for ``Dump``; they only check that draining does not raise."""

    def test_dump(self):
        """Run ``Dump`` with its default options for three cycles."""

        def add_something(blob):
            blob["a"] = 1
            return blob

        pipeline = kp.Pipeline()
        pipeline.attach(InfinitePump)
        pipeline.attach(add_something)
        pipeline.attach(Dump)
        pipeline.drain(3)

    def test_dump_a_key(self):
        """Run ``Dump`` restricted to a single key via ``key``."""

        def add_something(blob):
            blob["a"] = 1
            return blob

        pipeline = kp.Pipeline()
        pipeline.attach(InfinitePump)
        pipeline.attach(add_something)
        pipeline.attach(Dump, key="a")
        pipeline.drain(3)

    def test_dump_multiple_keys(self):
        """Run ``Dump`` restricted to several keys via ``keys``."""

        def add_something(blob):
            blob["a"] = 1
            blob["b"] = 2
            return blob

        pipeline = kp.Pipeline()
        pipeline.attach(InfinitePump)
        pipeline.attach(add_something)
        pipeline.attach(Dump, keys=["a", "b"])
        pipeline.drain(3)

    def test_dump_full(self):
        """Run ``Dump`` with ``full=True``."""

        def add_something(blob):
            blob["a"] = 1
            blob["b"] = 2
            return blob

        pipeline = kp.Pipeline()
        pipeline.attach(InfinitePump)
        pipeline.attach(add_something)
        pipeline.attach(Dump, full=True)
        pipeline.drain(3)

384 

385 

class TestStatusbar(TestCase):
    """Smoke test for ``StatusBar``."""

    def test_statusbar(self):
        """Drain five cycles with ``StatusBar(every=2)``; nothing may raise."""
        pipeline = kp.Pipeline()
        pipeline.attach(InfinitePump)
        pipeline.attach(StatusBar, every=2)
        pipeline.drain(5)

392 

393 

class TestTickTock(TestCase):
    """Smoke test for ``TickTock``."""

    def test_ticktock(self):
        """Drain five cycles with ``TickTock`` attached; nothing may raise."""
        pipeline = kp.Pipeline()
        pipeline.attach(InfinitePump)
        pipeline.attach(TickTock)
        pipeline.drain(5)

400 

401 

class TestMemoryObserver(TestCase):
    """Smoke test for ``MemoryObserver``."""

    def test_memory_observer(self):
        """Drain five cycles with ``MemoryObserver`` attached; nothing may raise."""
        pipeline = kp.Pipeline()
        pipeline.attach(InfinitePump)
        pipeline.attach(MemoryObserver)
        pipeline.drain(5)

408 

409 

class TestBlobIndexer(TestCase):
    """Tests for ``BlobIndexer``."""

    def test_blob_indexer(self):
        """``blob["blob_index"]`` starts at 0 and grows by one per blob."""

        class Observer(kp.Module):
            def configure(self):
                # Index expected on the next incoming blob.
                self.index = 0

            def process(self, blob):
                assert blob["blob_index"] == self.index
                self.index = self.index + 1
                return blob

        pipeline = kp.Pipeline()
        pipeline.attach(InfinitePump)
        pipeline.attach(BlobIndexer)
        pipeline.attach(Observer)
        pipeline.drain(4)

426 

427 

class TestLocalDBService(TestCase):
    """Tests for ``LocalDBService`` using a temporary database file.

    Each test opens its temporary file in a ``with`` block so the handle is
    closed (and the file removed, since ``delete=True``) deterministically
    when the test ends, rather than whenever the object is garbage collected.
    """

    def test_create_table(self):
        """A freshly created table is reported by ``table_exists``."""
        with tempfile.NamedTemporaryFile(delete=True) as fobj:
            dbs = LocalDBService(filename=fobj.name)
            dbs.create_table("foo", ["a", "b"], ["INT", "TEXT"])
            assert dbs.table_exists("foo")

    def test_create_table_does_not_overwrite_by_default(self):
        """Creating an existing table again raises ``OperationalError``."""
        with tempfile.NamedTemporaryFile(delete=True) as fobj:
            dbs = LocalDBService(filename=fobj.name)
            dbs.create_table("foo", ["a", "b"], ["INT", "TEXT"])
            with self.assertRaises(sqlite3.OperationalError):
                dbs.create_table("foo", ["a", "b"], ["INT", "TEXT"])

    def test_create_table_allows_overwrite(self):
        """With ``overwrite=True`` re-creating a table does not raise."""
        with tempfile.NamedTemporaryFile(delete=True) as fobj:
            dbs = LocalDBService(filename=fobj.name)
            dbs.create_table("foo", ["a", "b"], ["INT", "TEXT"])
            dbs.create_table("foo", ["a", "b"], ["INT", "TEXT"], overwrite=True)

    def test_insert_row(self):
        """Inserted rows are read back in insertion order with their values."""
        with tempfile.NamedTemporaryFile(delete=True) as fobj:
            dbs = LocalDBService(filename=fobj.name)
            dbs.create_table("foo", ["a", "b"], ["INT", "TEXT"])

            dbs.insert_row("foo", ["a", "b"], (23, "42"))
            dbs.insert_row("foo", ["a", "b"], (5, "hello"))

            # Read back through the service's own connection.
            cur = dbs.connection.cursor()
            cur.execute("SELECT * FROM foo")
            data = cur.fetchall()
            assert 2 == len(data)
            assert 23 == data[0][0]
            assert "42" == data[0][1]
            assert 5 == data[1][0]
            assert "hello" == data[1][1]

464 

465 

class TestObserver(TestCase):
    """Tests for the ``Observer`` module imported from ``km3modules.common``."""

    def test_observer(self):
        """Matching cycle count and a present required key pass silently."""

        class Dummy(kp.Module):
            def process(self, blob):
                blob["a"] = 1
                return blob

        pipeline = kp.Pipeline()
        pipeline.attach(Dummy)
        pipeline.attach(Observer, count=5, required_keys="a")
        pipeline.drain(5)

    def test_observer_raises_when_count_wrong(self):
        """Draining 2 cycles while ``count=5`` is expected raises."""

        class Dummy(kp.Module):
            def process(self, blob):
                return blob

        pipeline = kp.Pipeline()
        pipeline.attach(Dummy)
        pipeline.attach(Observer, count=5)

        with self.assertRaises(AssertionError):
            pipeline.drain(2)

    def test_observer_raises_when_key_is_missing(self):
        """A required key that is absent from the blob raises."""

        class Dummy(kp.Module):
            def process(self, blob):
                blob["a"] = 1
                return blob

        pipeline = kp.Pipeline()
        pipeline.attach(Dummy)
        pipeline.attach(Observer, required_keys=["b"])

        with self.assertRaises(AssertionError):
            pipeline.drain(2)

502 

503 

class TestMultiFilePump(TestCase):
    """Tests for ``MultiFilePump`` wrapping a dummy per-file pump."""

    def test_iteration(self):
        """Three files with five blobs each yield fifteen blobs in file order.

        The observer also checks that the first ``group_id`` of each blob's
        table counts up from 0 over the whole run.
        """

        class DummyPump(kp.Module):
            def configure(self):
                self.idx = 0
                self.max_iterations = self.get("max_iterations", default=5)
                self.blobs = self.blob_generator()
                # Extra options given through ``kwargs`` must arrive here.
                assert 23 == self.get("foo")
                assert "narf" == self.get("bar")

            def process(self, blob):
                return next(self)

            def blob_generator(self):
                # NOTE(review): ``self.idx`` is never advanced, so every blob
                # carries the same "index" -- confirm this is intended.
                for _ in range(self.max_iterations):
                    yield kp.Blob({"index": self.idx, "tab": kp.Table({"a": 1})})

            def finish(self):
                return self.idx

            def __iter__(self):
                return self

            def __next__(self):
                return next(self.blobs)

        filenames = ["a", "b", "c"]
        max_iterations = 5
        total_iterations = max_iterations * len(filenames)

        # The nested module has its own ``self``; keep a handle on the test
        # case to use its assertion helpers.
        test_case = self

        class Observer(kp.Module):
            def configure(self):
                self.count = 0
                self.filenames = []
                self.group_id = []

            def process(self, blob):
                self.count += 1
                self.filenames.append(blob["filename"])
                self.group_id.append(blob["tab"].group_id[0])
                return blob

            def finish(self):
                assert self.count == total_iterations
                expected_names = "".join(f * max_iterations for f in filenames)
                assert expected_names == "".join(self.filenames)
                expected_ids = list(range(total_iterations))
                test_case.assertListEqual(expected_ids, self.group_id)

        pipeline = kp.Pipeline()
        pipeline.attach(
            MultiFilePump,
            pump=DummyPump,
            filenames=filenames,
            max_iterations=max_iterations,
            kwargs={"foo": 23, "bar": "narf"},
        )
        pipeline.attach(Observer)
        pipeline.drain()

565 

566 

class TestFilePump(TestCase):
    """Tests for ``FilePump``."""

    def test_iteration(self):
        """One blob per filename is produced, in the order given."""
        filenames = ["a", "b", "c"]

        # The nested module has its own ``self``; keep a handle on the test
        # case to use its assertion helpers.
        test_case = self

        class Observer(kp.Module):
            def configure(self):
                self.count = 0
                self.filenames = []

            def process(self, blob):
                self.count += 1
                self.filenames.append(blob["filename"])
                return blob

            def finish(self):
                assert self.count == len(filenames)
                test_case.assertListEqual(filenames, self.filenames)

        pipeline = kp.Pipeline()
        pipeline.attach(FilePump, filenames=filenames)
        pipeline.attach(Observer)
        pipeline.drain()