Coverage for src/km3pipe/io/ch.py: 42%
133 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 03:14 +0000
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 03:14 +0000
1#!/usr/bin/env python
2# Filename: ch.py
3# pylint: disable=
4"""
5Pump for ControlHost data received through the km3pipe ControlHost client.
7"""
9from thepipe import Module, Blob
10from km3pipe.controlhost import Client
11from km3pipe.time import Cuckoo
12from km3pipe.logger import get_logger
13import threading
14import socket
15import time
16import numpy as np
17from collections import deque
18from queue import Queue, Empty
20__author__ = "Tamas Gal"
21__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration."
22__credits__ = []
23__license__ = "MIT"
24__maintainer__ = "Tamas Gal"
25__email__ = "tgal@km3net.de"
26__status__ = "Development"
28log = get_logger(__name__) # pylint: disable=C0103
class CHPump(Module):
    """A pump for ControlHost data.

    ``configure`` connects a ControlHost ``Client``, subscribes to the
    requested tags and starts a daemon thread (``_run``) which receives
    messages and buffers them in ``self.queue``.  ``process`` takes one
    buffered message per call and stores it in the blob.

    Parameters (read with ``self.get`` in ``configure``)
    ----------------------------------------------------
    host: str, default "127.0.0.1"
    port: int, default 5553
    tags: str, default "MSG"
        Comma separated list of tags to subscribe to.
    timeout: int, default 60 * 60 * 24
        Seconds ``process`` waits for a message before stopping.
    max_queue: int, default 50
        Queue size above which newly received messages are dropped.
    key_for_data: str, default "CHData"
        Blob key under which the message payload is stored.
    key_for_prefix: str, default "CHPrefix"
        Blob key under which the message prefix is stored.
    subscription_mode: str, default "any"
        Passed as ``mode`` to ``Client.subscribe``.
    show_statistics: bool, default False
    statistics_interval: int, default 30
        Interval handed to the ``Cuckoo`` wrapping the statistics output.

    Note that the options read as ``self.get(...) or default`` fall back to
    the default for every falsy value (e.g. ``timeout=0``), not only when
    the option is missing.
    """

    def configure(self):
        """Read the options, connect to the host and start the receiver."""
        self.host = self.get("host") or "127.0.0.1"
        self.port = self.get("port") or 5553
        self.tags = self.get("tags") or "MSG"
        self.timeout = self.get("timeout") or 60 * 60 * 24
        self.max_queue = self.get("max_queue") or 50
        self.key_for_data = self.get("key_for_data") or "CHData"
        self.key_for_prefix = self.get("key_for_prefix") or "CHPrefix"
        self.subscription_mode = self.get("subscription_mode", default="any")
        self.show_statistics = self.get("show_statistics", default=False)
        self.statistics_interval = self.get("statistics_interval", default=30)
        # Both Cuckoos are invoked from the receiver loop on (potentially)
        # every message; presumably Cuckoo limits how often the wrapped
        # callback actually fires -- see km3pipe.time.Cuckoo.
        self.cuckoo_warn = Cuckoo(60 * 5, log.warning)
        self.performance_warn = Cuckoo(
            self.statistics_interval, self.show_performance_statistics
        )

        # Most recent waiting times for a message (at most 1000 samples).
        self.idle_dt = deque(maxlen=1000)
        self.idle_timer = time.time()
        # Messages received since the statistics were last shown.
        self.message_count = 0

        self.loop_cycle = 0
        self.queue = Queue()
        self.client = None
        self.thread = None

        if self.subscription_mode == "all":
            self.log.warning(
                "You subscribed to the ligier in 'all'-mode! "
                "If you are too slow with data processing, "
                "you will block other clients. "
                "If you don't understand this message "
                "and are running this code on a DAQ machine, "
                "consult a DAQ expert now and stop this script."
            )

        print(
            "Connecting to {0} on port {1}\n"
            "Subscribed tags: {2}\n"
            "Connection timeout: {3}s\n"
            "Maximum queue size for incoming data: {4}".format(
                self.host, self.port, self.tags, self.timeout, self.max_queue
            )
        )

        self._init_controlhost()
        self._start_thread()

    def _start_thread(self):
        """Run ``_run`` in a daemon thread."""
        log.debug("Starting and demonising thread.")
        self.thread = threading.Thread(target=self._run, args=())
        # Daemon thread: it must not keep the interpreter alive on exit.
        self.thread.daemon = True
        self.thread.start()

    def _init_controlhost(self):
        """Set up the controlhost connection"""
        log.debug("Connecting to JLigier")
        self.client = Client(self.host, self.port)
        self.client._connect()
        log.debug("Subscribing to tags: %s", self.tags)
        for tag in self.tags.split(","):
            self.client.subscribe(tag.strip(), mode=self.subscription_mode)
        log.debug("Controlhost initialisation done.")

    def _reconnect(self, delay=30):
        """Wait ``delay`` seconds, then try to open a fresh connection.

        A failing attempt is only logged; the receiver loop carries on and
        runs into the next error/reconnect cycle on its own.
        """
        time.sleep(delay)
        try:
            log.debug("Reinitialising new CH connection.")
            self._init_controlhost()
        except socket.error:
            log.error("Failed to connect to host.")

    def _run(self):
        """Receiver loop: fetch messages and put them into the queue.

        Runs until a ``BufferError`` is raised by the client.  On
        ``EOFError`` or an empty payload a reconnect is attempted after
        30 seconds.  Messages arriving while the queue holds more than
        ``max_queue`` items are dropped.
        """
        log.debug("Entering the main loop.")
        while True:
            self.loop_cycle += 1
            self._set_idle_timer()
            try:
                prefix, data = self.client.get_message()
            except EOFError:
                log.warning("EOF from Ligier, trying again in 30 seconds...")
                self._reconnect()
                continue
            except BufferError:
                log.error("Buffer error in Ligier stream, aborting...")
                break

            self._add_idle_dt()
            self.message_count += 1
            self.performance_warn()

            if not data:
                log.critical(
                    "No data received, connection died.\n"
                    + "Trying to reconnect in 30 seconds."
                )
                self._reconnect()
                continue

            # Look at the queue size *now*: get_message() may have blocked
            # for a long time, so a size sampled before the call would be
            # stale and could drop data although the consumer caught up.
            if self.queue.qsize() > self.max_queue:
                self.cuckoo_warn(
                    "Maximum queue size ({0}) reached, "
                    "dropping data.".format(self.max_queue)
                )
            else:
                self.queue.put((prefix, data))
        log.debug("Quitting the main loop.")

    def process(self, blob):
        """Wait for the next packet and put it in the blob

        Raises ``StopIteration`` if no packet shows up in the queue within
        ``self.timeout`` seconds.
        """
        try:
            log.debug("Waiting for queue items.")
            prefix, data = self.queue.get(timeout=self.timeout)
            log.debug("Got %d bytes from queue.", len(data))
        except Empty:
            log.warning("ControlHost timeout (%d s) reached", self.timeout)
            raise StopIteration("ControlHost timeout reached.")
        blob[self.key_for_prefix] = prefix
        blob[self.key_for_data] = data
        return blob

    def show_performance_statistics(self):
        """Report message rate, median idle time and queue size.

        Does nothing unless ``show_statistics`` is set.  The message
        counter is reset after every report.
        """
        if not self.show_statistics:
            return
        dt = np.median(self.idle_dt)
        current_qsize = self.queue.qsize()
        log_func = self.cprint
        # Escalate to a warning when messages pile up in the queue.
        # NOTE(review): the idle times are differences of time.time()
        # values, so dt < 0 looks reachable only if the clock is set
        # back -- confirm the intent of that condition.
        if dt < 0 or current_qsize > 0:
            log_func = self.log.warning
        # The rate assumes that statistics_interval seconds have passed
        # since the previous report.
        log_func(
            "Message rate: {0:.1f} Hz, median idle time per message: "
            "{1:.3f} us (current queue size: {2})".format(
                self.message_count / self.statistics_interval, dt * 1e6, current_qsize
            )
        )
        self.message_count = 0

    def _set_idle_timer(self):
        """Remember when waiting for the next message started."""
        self.idle_timer = time.time()

    def _add_idle_dt(self):
        """Record the time spent waiting since ``_set_idle_timer``."""
        now = time.time()
        self.idle_dt.append(now - self.idle_timer)

    def finish(self):
        """Clean up the JLigier controlhost connection"""
        log.debug("Disconnecting from JLigier.")
        client = self.client
        if client is None:
            # No connection was ever set up, nothing to clean up.
            return
        sock = getattr(client, "socket", None)
        if sock is not None:
            try:
                sock.shutdown(socket.SHUT_RDWR)
            except socket.error:
                # The peer may already have dropped the connection; a
                # failing shutdown must not prevent the disconnect below.
                log.debug("Socket was already shut down.")
        client._disconnect()

    def __iter__(self):
        return self

    def __next__(self):
        """Return a fresh blob filled with the next message."""
        return self.process(Blob())

    def next(self):
        """Alias of ``__next__``."""
        return self.__next__()
def CHTagger(blob):
    """Flag the blob with the tag of its ControlHost prefix.

    The ``tag`` attribute of ``blob["CHPrefix"]`` is converted to a string
    and used as a new blob key whose value is ``True``.  The same blob is
    returned.
    """
    prefix = blob["CHPrefix"]
    blob[str(prefix.tag)] = True
    return blob