#!/usr/bin/env python
# Filename: jpp.py
# pylint: disable=
"""
Pump for ControlHost data received from a JLigier server.
"""
from thepipe import Module, Blob
from km3pipe.controlhost import Client
from km3pipe.time import Cuckoo
from km3pipe.logger import get_logger
import threading
import socket
import time
import numpy as np
from collections import deque
from queue import Queue, Empty
# Module metadata.
__author__ = "Tamas Gal"
__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration."
__credits__ = []
__license__ = "MIT"
__maintainer__ = "Tamas Gal"
__email__ = "tgal@km3net.de"
__status__ = "Development"

# Module-level logger used by everything defined in this file.
log = get_logger(__name__)  # pylint: disable=C0103
class CHPump(Module):
    """A pump for ControlHost data.

    ``_run`` (meant to be executed in the daemon thread created by
    ``_start_thread``) receives messages from the ControlHost client and
    buffers them in ``self.queue``; ``process`` hands them out one per call.

    The methods below read the following attributes, which are not assigned
    anywhere in this class body and are presumably set up by configuration
    code elsewhere: ``host``, ``port``, ``tags``, ``subscription_mode``,
    ``timeout``, ``max_queue``, ``key_for_prefix``, ``key_for_data``,
    ``queue``, ``idle_dt``, ``loop_cycle``, ``message_count``,
    ``cuckoo_warn`` and ``performance_warn``.
    """

    def _start_thread(self):
        """Start ``_run`` in a daemon thread stored in ``self.thread``."""
        log.debug("Starting and demonising thread.")
        self.thread = threading.Thread(target=self._run, args=())
        # Daemon thread: it must not keep the interpreter alive on exit.
        self.thread.daemon = True
        self.thread.start()

    def _init_controlhost(self):
        """Set up the controlhost connection.

        Creates a new ``Client`` for ``self.host``/``self.port``, connects it
        and subscribes to every comma-separated tag in ``self.tags`` using
        ``self.subscription_mode``.
        """
        log.debug("Connecting to JLigier")
        self.client = Client(self.host, self.port)
        self.client._connect()
        log.debug("Subscribing to tags: %s", self.tags)
        for tag in self.tags.split(","):
            self.client.subscribe(tag.strip(), mode=self.subscription_mode)
        log.debug("Controlhost initialisation done.")

    def _reconnect(self):
        """Wait 30 seconds, then try once to re-create the connection.

        A ``socket.error`` raised while reconnecting is logged and swallowed
        so that the receive loop can simply try again on its next cycle.
        """
        time.sleep(30)
        try:
            log.debug("Reinitialising new CH connection.")
            self._init_controlhost()
        except socket.error:
            log.error("Failed to connect to host.")

    def _run(self):
        """Receive messages in a loop and buffer them in ``self.queue``.

        The loop only ends when the client raises ``BufferError``. An
        ``EOFError`` or an empty payload triggers a reconnection attempt,
        after which the loop starts over without enqueueing anything.
        """
        log.debug("Entering the main loop.")
        while True:
            self.loop_cycle += 1
            self._set_idle_timer()
            try:
                prefix, data = self.client.get_message()
            except EOFError:
                log.warning("EOF from Ligier, trying again in 30 seconds...")
                self._reconnect()
                continue
            except BufferError:
                log.error("Buffer error in Ligier stream, aborting...")
                break

            self._add_idle_dt()
            self.message_count += 1
            self.performance_warn()

            if not data:
                log.critical(
                    "No data received, connection died.\n"
                    + "Trying to reconnect in 30 seconds."
                )
                self._reconnect()
                continue

            # Sample the queue size right before enqueueing: a value taken
            # before the blocking get_message() call may be stale by now and
            # would drop data although the consumer has already caught up.
            if self.queue.qsize() > self.max_queue:
                self.cuckoo_warn(
                    "Maximum queue size ({0}) reached, "
                    "dropping data.".format(self.max_queue)
                )
            else:
                self.queue.put((prefix, data))
        log.debug("Quitting the main loop.")

    def process(self, blob):
        """Wait for the next packet and put it in the blob.

        Blocks for at most ``self.timeout`` seconds on ``self.queue``. The
        prefix is stored under ``self.key_for_prefix`` and the payload under
        ``self.key_for_data``; the same blob is returned.

        Raises
        ------
        StopIteration
            If no item arrives within ``self.timeout`` seconds.
        """
        log.debug("Waiting for queue items.")
        try:
            prefix, data = self.queue.get(timeout=self.timeout)
        except Empty:
            log.warning("ControlHost timeout (%d s) reached", self.timeout)
            raise StopIteration("ControlHost timeout reached.")
        log.debug("Got %d bytes from queue.", len(data))
        blob[self.key_for_prefix] = prefix
        blob[self.key_for_data] = data
        return blob

    def _set_idle_timer(self):
        """Remember the current time as the start of an idle period."""
        self.idle_timer = time.time()

    def _add_idle_dt(self):
        """Append the time elapsed since ``_set_idle_timer`` to ``idle_dt``."""
        now = time.time()
        self.idle_dt.append(now - self.idle_timer)

    def finish(self):
        """Clean up the JLigier controlhost connection."""
        log.debug("Disconnecting from JLigier.")
        try:
            self.client.socket.shutdown(socket.SHUT_RDWR)
        except socket.error:
            # shutdown() fails on a socket that is no longer connected; that
            # must not prevent the client from being disconnected below.
            log.debug("Socket shutdown failed, connection already closed.")
        self.client._disconnect()

    def __iter__(self):
        """Return the pump itself; it is its own iterator."""
        return self

    def __next__(self):
        """Return a fresh ``Blob`` filled by ``process``."""
        return self.process(Blob())

    def next(self):
        """Alias of ``__next__``."""
        return self.__next__()
def CHTagger(blob):
    """Flag the blob with the tag of its ControlHost prefix.

    Reads ``blob["CHPrefix"].tag`` (the key is hard-coded), converts it to a
    string and sets ``blob[<that string>] = True``.

    Parameters
    ----------
    blob : mapping
        Must contain a ``"CHPrefix"`` entry exposing a ``tag`` attribute.

    Returns
    -------
    The same blob object, modified in place.
    """
    tag = str(blob["CHPrefix"].tag)
    blob[tag] = True
    return blob