Pipeline Workflow

km3pipe is a lightweight framework which gives you a loose structure and workflow for data analysis. It builds on the simple yet powerful module system of the thepipe Python package, which allows you to organise and reuse code.

In this section, a few examples show how to work with the pipeline. It is recommended to also check the documentation of thepipe, since its features may change in the future.

The main structure is a Pipeline, which is meant to hold everything together. The building blocks are simply called Modules and are either plain Python functions or instances of the class Module. These classes are imported from the thepipe package and can also be used directly from there.
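
For instance, if you want to use the underlying classes directly instead of going through km3pipe, the import could look like this (a sketch of the direct thepipe usage mentioned above):

from thepipe import Pipeline, Module, Blob  # the classes km3pipe builds on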

To set up a workflow, you first create a pipeline and attach the modules to it; to fire up the analysis chain, you call .drain() on your pipeline and let the flow go.
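
A minimal sketch of this pattern, using the DummyPump and Foo modules defined further below in this section:

import km3pipe as kp

pipe = kp.Pipeline()        # holds everything together
pipe.attach(DummyPump)      # a pump (data source) is usually attached first
pipe.attach(Foo, baz=69)    # further modules, configured on attach
pipe.drain()                # fire up the analysis chain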

Draining Procedure

Everything stands still until the pipe.drain() method is called. This method can be called without arguments, in which case it cycles through the iterations until the attached pump stops it. One can, however, also pass a number which sets the maximum number of iterations, like pipe.drain(23). Another way to stop the pipeline before the pump finishes is pressing CTRL+C, which will finish the current cycle, gracefully finish up every module and close all the file handlers properly.
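
For example:

pipe.drain()    # cycle until the attached pump raises StopIteration
pipe.drain(23)  # stop after at most 23 cycles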

The stages a pipeline goes through when pipe.drain() is executed are the following (a minimal module implementing all of these hooks is sketched after this list):

  • Call .configure() on every attached module one-by-one

  • Call .prepare() on every attached module one-by-one

  • Start with the actual iteration and call .process(blob) on the first module

  • Take its output (a Blob()) and call the next attached module with that Blob instance

  • When the cycle reaches the last module, start from the beginning

  • Stop whenever (one of) the pumps raises a StopIteration

  • Call .finish() on every module one-by-one
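
The following is a minimal sketch of a module implementing all of these hooks (the class name LifecycleDemo and the print statements are made up here purely to illustrate the call order):

import km3pipe as kp


class LifecycleDemo(kp.Module):
    """Illustrates the order in which the pipeline calls the hooks."""

    def configure(self):
        print("configure(): called once, before the first cycle")

    def prepare(self):
        print("prepare(): called once, after all modules are configured")

    def process(self, blob):
        print("process(): called once per cycle with the current blob")
        return blob  # hand the blob over to the next module

    def finish(self):
        print("finish(): called once, after the last cycle")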

Pipeline Example

The following script demonstrates the module system of KM3Pipe. The DummyPump acts as a dummy data generator; the other modules modify the data and pass it on to the next module in the pipeline.

#!/usr/bin/env python

__author__ = "tamasgal"

import km3pipe as kp


class DummyPump(kp.Module):
    """A pump demonstration with a dummy list as data."""

    def configure(self):
        self.data = [{"nr": 1}, {"nr": 2}]
        self.blobs = self.blob_generator()

    def process(self, blob):
        return next(self.blobs)

    def blob_generator(self):
        """Create a blob generator."""
        for blob in self.data:
            yield blob


class Foo(kp.Module):
    """A dummy module with optional and required parameters"""

    def configure(self):
        self.foo = self.get("foo", default="default_foo")  # optional
        self.bar = self.get("bar", default=23)  # optional
        self.baz = self.require("baz")  # required
        self.i = 0

    def process(self, blob):
        print("This is the current blob: " + str(blob))
        self.i += 1
        blob["foo_entry"] = self.foo
        return blob

    def finish(self):
        print("My process() method was called {} times.".format(self.i))


def moo(blob):
    """A simple function to attach"""
    blob["moo_entry"] = 42
    return blob


class PrintBlob(kp.Module):
    def process(self, blob):
        print(blob)
        return blob


pipe = kp.Pipeline()
pipe.attach(DummyPump, "the_pump")
pipe.attach(Foo, bar="dummybar", baz=69)
pipe.attach(moo)
pipe.attach(PrintBlob)
pipe.drain()

This will print the following:

Pipeline and module initialisation took 0.000s (CPU 0.000s).
This is the current blob: {'nr': 1}
{'nr': 1, 'foo_entry': 'default_foo', 'moo_entry': 42}
This is the current blob: {'nr': 2}
{'nr': 2, 'foo_entry': 'default_foo', 'moo_entry': 42}
My process() method was called 2 times.
============================================================
2 cycles drained in 0.000553s (CPU 0.000525s). Memory peak: 154.08 MB
  wall  mean: 0.000058s  medi: 0.000058s  min: 0.000055s  max: 0.000062s  std: 0.000004s
  CPU   mean: 0.000059s  medi: 0.000059s  min: 0.000056s  max: 0.000062s  std: 0.000003s

Modules

A module is a configurable building block which can be attached to a pipeline. It has a process() method, which is called every time with the current data (“blob”) in the pipeline cycle. This piece of data can be analysed, manipulated and finally returned to allow the handover to the next module in the pipeline system.

Instance variables can be initialised within the configure() method. User-defined parameters are accessible via self.get() or self.require() calls inside the configure() method. Both return the passed value; self.get() falls back to the value given via default (which itself defaults to None) if the parameter was not passed. This allows an easy way to define default values, as seen in the example below.

class Foo(kp.Module):
    """A dummy module with optional and required parameters"""

    def configure(self):
        self.foo = self.get("foo", default="default_foo")  # optional
        self.bar = self.get("bar", default=23)  # optional
        self.baz = self.require("baz")  # required
        self.i = 0

    def process(self, blob):
        print("This is the current blob: " + str(blob))
        self.i += 1
        blob["foo_entry"] = self.foo
        return blob

    def finish(self):
        print("My process() method was called {} times.".format(self.i))

To override the default parameters, the desired values can be set when attaching the module to the pipeline. Always pass the class itself, since the attach() method of the pipeline takes care of the instantiation:

pipe.attach(Foo, bar='dummybar', baz=69)

Modules can optionally implement the following methods:

configure()
prepare()
process()
finish()

Pumps / Sinks

Pumps and sinks are special kinds of Module and are usually the first and last modules to be attached to a pipeline. They are responsible for continuously reading data from and writing data to files or socket streams, one chunk per iteration. Note that they still simply derive from Module.

The configure() method should be used to set up the file or socket handler, and finish() has to close it. The actual data is passed on via the process() method. A data chunk is internally called a Blob (a fancier dict) and contains whatever the pump extracts in each iteration. It can be a single event or the contents of a whole file.

To end the data pumping, the pump has to raise a StopIteration exception. One elegant way to implement this in Python is using a generator.

The following example shows a very basic pump, which simply initialises a list of dictionaries and hands over one blob after another on each process() call to the next module in the pipeline.

class DummyPump(kp.Module):
    """A pump demonstration with a dummy list as data."""

    def configure(self):
        self.data = [{"nr": 1}, {"nr": 2}]
        self.blobs = self.blob_generator()

    def process(self, blob):
        return next(self.blobs)

    def blob_generator(self):
        """Create a blob generator."""
        for blob in self.data:
            yield blob
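
As a slightly more realistic sketch (not part of KM3Pipe itself), the same pattern can be used to read a text file line by line; the class name, the filename parameter and the blob layout are assumptions made for this illustration. The file handle is opened in configure(), the generator ends (and thus raises StopIteration) when the file is exhausted, and finish() closes the handle:

import km3pipe as kp


class TextFilePump(kp.Module):
    """A sketch of a pump which reads one line per cycle from a text file."""

    def configure(self):
        self.filename = self.require("filename")  # required parameter
        self.fobj = open(self.filename)           # set up the file handler
        self.blobs = self.blob_generator()

    def process(self, blob):
        return next(self.blobs)  # raises StopIteration at the end of the file

    def blob_generator(self):
        """Yield one blob per line of the file."""
        for line in self.fobj:
            yield {"line": line.rstrip("\n")}

    def finish(self):
        self.fobj.close()  # close the file handler properly

Such a pump would be attached like any other module, e.g. pipe.attach(TextFilePump, filename="some_data.txt").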

Logging and Printing

Every module inheriting from the Module class has a fancy logger and a printer available to produce output which is uniquely colour coded per module (the colour is generated from a hash of the module name).

Inside any method of the module, use self.log to access the logger, which comes with the usual functions like self.log.debug(), self.log.info(), self.log.warning(), self.log.error() or self.log.critical().

The self.cprint function can be used to print messages which are colour coded with the same colours used for the logger.
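
A short sketch of how this might look inside a module (the module name and the messages are made up for illustration):

import km3pipe as kp


class NoisyModule(kp.Module):
    """Demonstrates the per-module logger and colourised printer."""

    def configure(self):
        self.log.info("Setting up")             # goes through the module's logger
        self.cprint("Hello from NoisyModule")   # colour coded like the logger

    def process(self, blob):
        self.log.debug("Processing a blob")
        if "nr" not in blob:
            self.log.warning("No 'nr' entry found in the blob")
        return blob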

Configuring the Pipeline using Configuration Files

The pipeline and all the attached modules can be configured by a TOML formatted file sitting in the current working directory (where the initial script is invoked to launch the pipeline). The default filename is pipeline.toml, but a different filename can be chosen when creating the Pipeline instance using Pipeline(configfile='your_desired_filename.toml').

TOML does not support variables, but a section called [VARIABLES] can be defined; its entries will be substituted wherever they are referenced in other (module) sections.

Here is an example of the file:

[VARIABLES]  # optional section to define variables
TIMEOUT = 123
DEBUG = false

[StatusBar]
every = 1000

[AModule]
some_parameter = "foo"
other_parameter = 23
a_list = [1, 2, 3, 23]
sleep = "TIMEOUT"  # this will be replaced with the value in [VARIABLES]
debug = "DEBUG"    # ...and so will this

[BModule]
bar = 42
sleep = "TIMEOUT"  # this will be replaced with the value in [VARIABLES]
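
With such a file in place, the values end up as module parameters. A module like AModule (shown here as a sketch, assuming the TOML entries are handed to the module just like keyword arguments passed to pipe.attach()) can then pick them up in configure():

import km3pipe as kp


class AModule(kp.Module):
    """A sketch of a module picking up its parameters from pipeline.toml."""

    def configure(self):
        self.some_parameter = self.require("some_parameter")           # "foo"
        self.other_parameter = self.get("other_parameter", default=0)  # 23
        self.a_list = self.get("a_list", default=[])                   # [1, 2, 3, 23]
        self.sleep = self.get("sleep", default=1)                      # 123, via TIMEOUT


pipe = kp.Pipeline()   # picks up pipeline.toml from the current working directory
pipe.attach(AModule)   # no need to pass the parameters here again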