Pipeline Workflow
km3pipe is a lightweight framework that tries to give you a loose structure and workflow for data analysis. It uses the simple yet powerful module system of the thepipe Python package, which allows you to organise and reuse code.
In this section, a few examples show how to work with the pipeline. It is nevertheless recommended to also check the documentation of thepipe, since its features may change in the future.
The main structure is a Pipeline, which is meant to hold everything together. The building blocks are simply called Modules and are either basic Python functions or instances of the class Module. Both Pipeline and Module are simply imported from the thepipe package and can also be used directly from there.
To set up a workflow, you first create a pipeline and attach the modules to it. To fire up the analysis chain, call .drain() on your pipeline and let the flow go.
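As a rough mental model of how a pipeline can treat both kinds of building blocks alike, the toy sketch below wraps plain functions in a module-like object and instantiates module classes with the keyword arguments given to attach(). The names `ToyModule`, `FunctionModule` and `ToyPipeline` are hypothetical; this is not thepipe's implementation.

```python
class ToyModule:
    """Hypothetical stand-in for kp.Module: stores keyword parameters."""

    def __init__(self, **parameters):
        self.parameters = parameters

    def process(self, blob):
        return blob


class FunctionModule(ToyModule):
    """Wraps a plain function so it can be used like a module."""

    def __init__(self, func):
        super().__init__()
        self.func = func

    def process(self, blob):
        return self.func(blob)


class ToyPipeline:
    """Hypothetical, heavily simplified pipeline."""

    def __init__(self):
        self.modules = []

    def attach(self, module_factory, **kwargs):
        # Classes are instantiated with the given parameters,
        # plain functions are wrapped (toy behaviour, not thepipe's code)
        if isinstance(module_factory, type):
            module = module_factory(**kwargs)
        else:
            module = FunctionModule(module_factory)
        self.modules.append(module)
        return module

    def run_once(self, blob):
        # Hand the blob from one module to the next
        for module in self.modules:
            blob = module.process(blob)
        return blob


class AddOffset(ToyModule):
    def process(self, blob):
        blob["value"] += self.parameters.get("offset", 0)
        return blob


def double(blob):
    blob["value"] *= 2
    return blob


pipe = ToyPipeline()
pipe.attach(AddOffset, offset=3)
pipe.attach(double)
result = pipe.run_once({"value": 1})
print(result)  # {'value': 8}
```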
Draining Procedure
Nothing happens until the pipe.drain() method is called.
Called without arguments, it cycles through the iterations until the attached pump stops it. Alternatively, you can pass a number specifying the maximum number of iterations, e.g. pipe.drain(23).
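A minimal sketch of what the optional cycle limit means, assuming a single pump at the start of the module list: the loop ends either when the pump raises StopIteration or once `cycles` iterations are done, whichever comes first. `CountingPump` and `drain` are hypothetical stand-ins, not km3pipe code.

```python
import itertools


class CountingPump:
    """Hypothetical pump yielding a fixed number of blobs."""

    def __init__(self, n_blobs):
        self.blobs = ({"nr": i} for i in range(n_blobs))

    def process(self, blob):
        # next() raises StopIteration once the data is exhausted
        return next(self.blobs)


def drain(modules, cycles=None):
    """Run cycles until the pump is exhausted or `cycles` is reached."""
    counter = itertools.count() if cycles is None else range(cycles)
    n_done = 0
    for _ in counter:
        blob = {}
        try:
            for module in modules:
                blob = module.process(blob)
        except StopIteration:
            break
        n_done += 1
    return n_done


print(drain([CountingPump(5)]))     # 5: the pump stops the loop
print(drain([CountingPump(5)], 3))  # 3: limited by the cycle count
```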
Another way to stop the pipeline before the pump finishes is to press CTRL+C, which completes the current cycle, gracefully finishes every module and properly closes all file handlers.
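Completing the current cycle instead of aborting mid-cycle is commonly achieved by installing a SIGINT handler that only sets a flag, which the loop checks between cycles. The sketch below shows that general pattern; it is an assumption about the approach, not thepipe's code, and `GracefulDrain` and `Recorder` are hypothetical. CTRL+C is simulated by calling the handler directly.

```python
import signal
import threading


class GracefulDrain:
    """Hypothetical drain loop that stops cleanly after CTRL+C."""

    def __init__(self, modules):
        self.modules = modules
        self.stop_requested = False

    def request_stop(self, signum=None, frame=None):
        # Signal handler: only set a flag, the loop checks it between cycles
        self.stop_requested = True

    def drain(self):
        # signal.signal() may only be called from the main thread
        in_main_thread = threading.current_thread() is threading.main_thread()
        previous = None
        if in_main_thread:
            previous = signal.signal(signal.SIGINT, self.request_stop)
        n_cycles = 0
        try:
            while not self.stop_requested:
                blob = {}
                try:
                    for module in self.modules:
                        blob = module.process(blob)
                except StopIteration:
                    break
                n_cycles += 1
        finally:
            if in_main_thread:
                signal.signal(
                    signal.SIGINT,
                    previous if previous is not None else signal.SIG_DFL,
                )
            # Always give every module the chance to clean up
            for module in self.modules:
                module.finish()
        return n_cycles


class Recorder:
    """Counts processed blobs and simulates CTRL+C during cycle 2."""

    def __init__(self):
        self.controller = None
        self.processed = 0
        self.finished = False

    def process(self, blob):
        self.processed += 1
        if self.processed == 2:
            self.controller.request_stop()  # as if CTRL+C was pressed
        return blob

    def finish(self):
        self.finished = True


recorder = Recorder()
pipeline = GracefulDrain([recorder])
recorder.controller = pipeline
n_cycles = pipeline.drain()
print(n_cycles, recorder.finished)  # 2 True
```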
The stages a pipeline goes through when pipe.drain() is executed are the following:
1. Call .configure() on every attached module, one by one.
2. Call .prepare() on every attached module, one by one.
3. Start the actual iteration and call .process(blob) on the first module.
4. Take its output (a Blob()) and call the next attached module with that Blob instance.
5. When the cycle has reached the last module, start again from the beginning.
6. Stop as soon as (one of) the pump(s) raises a StopIteration.
7. Call .finish() on every module, one by one.
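The order of these stages can be traced with a toy implementation that logs every call. This sketch assumes a single pump at the front of the module list and uses hypothetical names (`TracingModule`, `ListPump`, `run_stages`); it is not the actual thepipe code.

```python
class TracingModule:
    """Hypothetical module that records every stage it goes through."""

    def __init__(self, name, log):
        self.name = name
        self.log = log

    def configure(self):
        self.log.append(f"{self.name}.configure")

    def prepare(self):
        self.log.append(f"{self.name}.prepare")

    def process(self, blob):
        self.log.append(f"{self.name}.process")
        return blob

    def finish(self):
        self.log.append(f"{self.name}.finish")


class ListPump(TracingModule):
    """Pump that emits the given blobs, then raises StopIteration."""

    def __init__(self, name, log, data):
        super().__init__(name, log)
        self.blobs = iter(data)

    def process(self, blob):
        self.log.append(f"{self.name}.process")
        return next(self.blobs)


def run_stages(modules):
    for module in modules:          # configure every module
        module.configure()
    for module in modules:          # prepare every module
        module.prepare()
    while True:                     # cycle through the modules
        blob = {}
        try:
            for module in modules:
                blob = module.process(blob)
        except StopIteration:       # the pump signals the end
            break
    for module in modules:          # finish every module
        module.finish()


log = []
run_stages([ListPump("pump", log, [{"nr": 1}]), TracingModule("foo", log)])
print(log)
```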
Pipeline Example
The following script demonstrates the module system of KM3Pipe. The DummyPump acts as a dummy data generator in this case. The other modules modify the data and pass it on to the next module in the pipeline.
#!/usr/bin/env python

__author__ = "tamasgal"

import km3pipe as kp


class DummyPump(kp.Module):
    """A pump demonstration with a dummy list as data."""

    def configure(self):
        self.data = [{"nr": 1}, {"nr": 2}]
        self.blobs = self.blob_generator()

    def process(self, blob):
        # raises StopIteration when the data is exhausted, ending the pipeline
        return next(self.blobs)

    def blob_generator(self):
        """Create a blob generator."""
        for blob in self.data:
            yield blob


class Foo(kp.Module):
    """A dummy module with optional and required parameters"""

    def configure(self):
        self.foo = self.get("foo", default="default_foo")  # optional
        self.bar = self.get("bar", default=23)  # optional
        self.baz = self.require("baz")  # required
        self.i = 0

    def process(self, blob):
        print("This is the current blob: " + str(blob))
        self.i += 1
        blob["foo_entry"] = self.foo
        return blob

    def finish(self):
        print("My process() method was called {} times.".format(self.i))


def moo(blob):
    """A simple function to attach"""
    blob["moo_entry"] = 42
    return blob


class PrintBlob(kp.Module):
    def process(self, blob):
        print(blob)
        return blob


pipe = kp.Pipeline()
pipe.attach(DummyPump, "the_pump")
pipe.attach(Foo, bar="dummybar", baz=69)
pipe.attach(moo)
pipe.attach(PrintBlob)
pipe.drain()
Running the script prints the following:
Pipeline and module initialisation took 0.000s (CPU 0.000s).
This is the current blob: {'nr': 1}
{'nr': 1, 'foo_entry': 'default_foo', 'moo_entry': 42}
This is the current blob: {'nr': 2}
{'nr': 2, 'foo_entry': 'default_foo', 'moo_entry': 42}
My process() method was called 2 times.
============================================================
2 cycles drained in 0.000553s (CPU 0.000525s). Memory peak: 154.08 MB
wall mean: 0.000058s medi: 0.000058s min: 0.000055s max: 0.000062s std: 0.000004s
CPU mean: 0.000059s medi: 0.000059s min: 0.000056s max: 0.000062s std: 0.000003s
Modules
A module is a configurable building block which can be attached to a pipeline. It has a process() method, which is called with the current data ("blob") in every pipeline cycle. This piece of data can be analysed, manipulated and finally returned to hand it over to the next module in the pipeline.
Instance variables can be initialised within the configure() method.
User-defined parameters are accessible via self.get() or self.require() calls inside the configure() method. Both return the value passed when attaching the module. If no value was passed, self.get() returns its default (None unless set via the default keyword), whereas self.require() raises an error.
This makes it easy to define default values, as seen in the example below.
import km3pipe as kp


class Foo(kp.Module):
    """A dummy module with optional and required parameters"""

    def configure(self):
        self.foo = self.get("foo", default="default_foo")  # optional
        self.bar = self.get("bar", default=23)  # optional
        self.baz = self.require("baz")  # required
        self.i = 0

    def process(self, blob):
        print("This is the current blob: " + str(blob))
        self.i += 1
        blob["foo_entry"] = self.foo
        return blob

    def finish(self):
        print("My process() method was called {} times.".format(self.i))
To override the default parameters, set the desired values when attaching the module to the pipeline. Always pass the class itself rather than an instance, since the pipeline's attach() method takes care of the initialisation:
pipe.attach(Foo, bar='dummybar', baz=69)
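To make the get()/require() semantics concrete, here is a standalone stand-in class. `ParameterModule` is hypothetical: it assumes that parameters arrive as keyword arguments (as with the attach() call above), that configure() runs on initialisation, and that a missing required parameter raises a TypeError; the exception type used by thepipe may differ.

```python
class ParameterModule:
    """Hypothetical stand-in showing get()/require() semantics."""

    def __init__(self, **parameters):
        self._parameters = parameters
        self.configure()

    def get(self, name, default=None):
        # Optional parameter: fall back to the default (None by default)
        return self._parameters.get(name, default)

    def require(self, name):
        # Required parameter: refuse to continue without it
        if name not in self._parameters:
            raise TypeError(
                f"{type(self).__name__} requires the parameter '{name}'"
            )
        return self._parameters[name]

    def configure(self):
        pass


class Foo(ParameterModule):
    def configure(self):
        self.foo = self.get("foo", default="default_foo")  # optional
        self.bar = self.get("bar", default=23)  # optional
        self.baz = self.require("baz")  # required


foo = Foo(bar="dummybar", baz=69)
print(foo.foo, foo.bar, foo.baz)  # default_foo dummybar 69

try:
    Foo(bar="dummybar")  # "baz" is missing
except TypeError as error:
    print(error)
```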
A module can optionally implement the following methods:
configure()
prepare()
process()
finish()
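Since all four methods are optional, one way to picture this is a base class providing do-nothing defaults, so that a subclass overrides only what it needs. `BaseModule`, `Counter` and `Tagger` are hypothetical, and the pass-through default for process() is an assumption rather than documented km3pipe behaviour.

```python
class BaseModule:
    """Hypothetical stand-in for kp.Module with no-op defaults."""

    def configure(self):
        pass

    def prepare(self):
        pass

    def process(self, blob):
        # Assumption: an unimplemented process() passes the blob through
        return blob

    def finish(self):
        pass


class Counter(BaseModule):
    """Implements configure(), process() and finish(), but not prepare()."""

    def configure(self):
        self.n_blobs = 0

    def process(self, blob):
        self.n_blobs += 1
        return blob

    def finish(self):
        print(f"Counted {self.n_blobs} blobs")


class Tagger(BaseModule):
    """Implements only process()."""

    def process(self, blob):
        blob["tagged"] = True
        return blob


modules = [Counter(), Tagger()]
for module in modules:
    module.configure()
for module in modules:
    module.prepare()
results = []
for blob in [{"nr": 1}, {"nr": 2}]:
    for module in modules:
        blob = module.process(blob)
    results.append(blob)
for module in modules:
    module.finish()  # prints "Counted 2 blobs" for the Counter
```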
Pumps / Sinks
Pumps and sinks are special kinds of Module and are usually the first and last modules attached to a pipeline. They are responsible for continuously reading data from, or writing data to, files or socket streams in each iteration.
Note that they still simply derive from Module.
The configure() method should be used to set up the file or socket handler, and the finish() method has to close it. The actual data is passed via the process() method. A data chunk is internally called a Blob (a fancier dict) and contains whatever the pump extracts in each iteration. It can be a single event or the contents of a whole file.
To end the data pumping, the pump has to raise a StopIteration exception. One elegant way to implement this in Python is to use a generator: calling next() on an exhausted generator raises StopIteration automatically.
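Combining these responsibilities, a file-reading pump might look like the following sketch: configure() opens the file and creates a generator, process() calls next() on it, and finish() closes the handler. `LinePump` and its one-blob-per-line format are hypothetical. To stay runnable on its own it does not derive from kp.Module; a real pump would, and would presumably read the filename via self.require() in configure().

```python
import os
import tempfile


class LinePump:
    """Hypothetical pump: one blob per line of a text file."""

    def __init__(self, filename):
        self.filename = filename
        self.configure()

    def configure(self):
        # Set up the file handler and a generator over its lines
        self.fobj = open(self.filename)
        self.blobs = self.blob_generator()

    def blob_generator(self):
        for line_nr, line in enumerate(self.fobj):
            yield {"line_nr": line_nr, "content": line.rstrip("\n")}

    def process(self, blob):
        # Raises StopIteration once the generator is exhausted
        return next(self.blobs)

    def finish(self):
        # Close the file handler
        self.fobj.close()


with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "events.txt")
    with open(path, "w") as f:
        f.write("event A\nevent B\n")

    pump = LinePump(path)
    blobs = []
    while True:
        try:
            blobs.append(pump.process({}))
        except StopIteration:
            break
    pump.finish()

print(blobs)
```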
The following example shows a very basic pump, which simply initialises a list of dictionaries and passes one blob after another to the next module in the pipeline on each process() call.
import km3pipe as kp


class DummyPump(kp.Module):
    """A pump demonstration with a dummy list as data."""

    def configure(self):
        self.data = [{"nr": 1}, {"nr": 2}]
        self.blobs = self.blob_generator()

    def process(self, blob):
        # raises StopIteration when the data is exhausted, ending the pipeline
        return next(self.blobs)

    def blob_generator(self):
        """Create a blob generator."""
        for blob in self.data:
            yield blob
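A sink works the other way round: it opens its output in configure(), writes each incoming blob in process() and closes the handler in finish(). The hypothetical `JSONLinesSink` below writes one JSON object per line; the class, its `filename` argument and the output format are illustrative assumptions and not part of km3pipe.

```python
import json
import os
import tempfile


class JSONLinesSink:
    """Hypothetical sink writing each blob as one JSON line."""

    def __init__(self, filename):
        self.filename = filename
        self.configure()

    def configure(self):
        # Set up the output file handler
        self.fobj = open(self.filename, "w")

    def process(self, blob):
        self.fobj.write(json.dumps(blob) + "\n")
        return blob  # pass the blob on, in case other modules follow

    def finish(self):
        # Close the output file handler
        self.fobj.close()


with tempfile.TemporaryDirectory() as tmpdir:
    path = os.path.join(tmpdir, "out.jsonl")
    sink = JSONLinesSink(path)
    for blob in [{"nr": 1}, {"nr": 2}]:
        sink.process(blob)
    sink.finish()
    with open(path) as f:
        lines = f.read().splitlines()

print(lines)  # ['{"nr": 1}', '{"nr": 2}']
```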
Logging and Printing
Every module inheriting from the Module class has a fancy logger and a printer available to produce output that is unique to the module (the colour code is generated from a hash of the module name).
Inside any method of the module, use self.log to access the logger, which comes with the usual functions such as self.log.debug(), self.log.info(), self.log.warning(), self.log.error() and self.log.critical().
The self.cprint function can be used to print messages that are colour coded with the same colours as the logger.
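The idea of a stable per-module colour derived from a hash of the module name can be sketched as follows. The choice of MD5, the ANSI 256-colour escape codes and the `colour_for`/`cprint` helpers are assumptions for illustration; km3pipe's actual scheme may differ. A deterministic hash is used because Python's built-in hash() of strings is randomised per process, which would change the colours on every run.

```python
import hashlib


def colour_for(name):
    """Map a module name to one of the 256 ANSI colour codes."""
    digest = hashlib.md5(name.encode("utf-8")).hexdigest()
    return int(digest, 16) % 256


def cprint(name, message):
    """Print a message in the colour derived from the module name."""
    code = colour_for(name)
    # \033[38;5;<n>m sets the foreground colour, \033[0m resets it
    print(f"\033[38;5;{code}m{name}: {message}\033[0m")


cprint("Foo", "processing blob")
cprint("PrintBlob", "processing blob")
print(colour_for("Foo") == colour_for("Foo"))  # True: stable across calls
```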
Configuring the Pipeline using Configuration Files
The pipeline and all the attached modules can be configured with a TOML-formatted file located in the current working directory (where the script launching the pipeline is invoked). The default filename is pipeline.toml, but a different filename can be chosen when creating the Pipeline instance using Pipeline(configfile='your_desired_filename.toml').
TOML does not support variables, but a section called [VARIABLES] can be defined, and its entries will be substituted when used in other (module) sections.
Here is an example of the file:
[VARIABLES]  # optional section to define variables
TIMEOUT = 123
DEBUG = false

[StatusBar]
every = 1000

[AModule]
some_parameter = "foo"
other_parameter = 23
a_list = [1, 2, 3, 23]
sleep = "TIMEOUT"  # this will be replaced with the value in [VARIABLES]
debug = "DEBUG"  # this one as well

[BModule]
bar = 42
sleep = "TIMEOUT"  # this will be replaced with the value in [VARIABLES]
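The substitution step can be sketched on the dictionary a TOML parser (such as Python's tomllib, available from Python 3.11) would return for the file above. `substitute_variables` is hypothetical, and it assumes that only string values exactly matching a variable name are replaced and that the [VARIABLES] section is dropped afterwards.

```python
def substitute_variables(config):
    """Replace string values matching a [VARIABLES] entry with its value."""
    variables = config.get("VARIABLES", {})
    resolved = {}
    for section, parameters in config.items():
        if section == "VARIABLES":
            continue  # the variables themselves are not a module section
        resolved[section] = {
            key: variables.get(value, value) if isinstance(value, str) else value
            for key, value in parameters.items()
        }
    return resolved


# What a TOML parser returns for the example file
config = {
    "VARIABLES": {"TIMEOUT": 123, "DEBUG": False},
    "StatusBar": {"every": 1000},
    "AModule": {
        "some_parameter": "foo",
        "other_parameter": 23,
        "a_list": [1, 2, 3, 23],
        "sleep": "TIMEOUT",
        "debug": "DEBUG",
    },
    "BModule": {"bar": 42, "sleep": "TIMEOUT"},
}

resolved = substitute_variables(config)
print(resolved["AModule"]["sleep"], resolved["AModule"]["debug"])  # 123 False
print(resolved["BModule"])  # {'bar': 42, 'sleep': 123}
```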