Coverage for src/km3pipe/io/ch.py: 42%

133 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-08 03:14 +0000

1#!/usr/bin/env python 

2# Filename: jpp.py 

3# pylint: disable= 

4""" 

5Pump for the jpp file read through aanet interface. 

6 

7""" 

8 

9from thepipe import Module, Blob 

10from km3pipe.controlhost import Client 

11from km3pipe.time import Cuckoo 

12from km3pipe.logger import get_logger 

13import threading 

14import socket 

15import time 

16import numpy as np 

17from collections import deque 

18from queue import Queue, Empty 

19 

20__author__ = "Tamas Gal" 

21__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration." 

22__credits__ = [] 

23__license__ = "MIT" 

24__maintainer__ = "Tamas Gal" 

25__email__ = "tgal@km3net.de" 

26__status__ = "Development" 

27 

28log = get_logger(__name__) # pylint: disable=C0103 

29 

30 

31class CHPump(Module): 

32 """A pump for ControlHost data.""" 

33 

34 def configure(self): 

35 self.host = self.get("host") or "127.0.0.1" 

36 self.port = self.get("port") or 5553 

37 self.tags = self.get("tags") or "MSG" 

38 self.timeout = self.get("timeout") or 60 * 60 * 24 

39 self.max_queue = self.get("max_queue") or 50 

40 self.key_for_data = self.get("key_for_data") or "CHData" 

41 self.key_for_prefix = self.get("key_for_prefix") or "CHPrefix" 

42 self.subscription_mode = self.get("subscription_mode", default="any") 

43 self.show_statistics = self.get("show_statistics", default=False) 

44 self.statistics_interval = self.get("statistics_interval", default=30) 

45 self.cuckoo_warn = Cuckoo(60 * 5, log.warning) 

46 self.performance_warn = Cuckoo( 

47 self.statistics_interval, self.show_performance_statistics 

48 ) 

49 

50 self.idle_dt = deque(maxlen=1000) 

51 self.idle_timer = time.time() 

52 self.message_count = 0 

53 

54 self.loop_cycle = 0 

55 self.queue = Queue() 

56 self.client = None 

57 self.thread = None 

58 

59 if self.subscription_mode == "all": 

60 self.log.warning( 

61 "You subscribed to the ligier in 'all'-mode! " 

62 "If you are too slow with data processing, " 

63 "you will block other clients. " 

64 "If you don't understand this message " 

65 "and are running this code on a DAQ machine, " 

66 "consult a DAQ expert now and stop this script." 

67 ) 

68 

69 print( 

70 "Connecting to {0} on port {1}\n" 

71 "Subscribed tags: {2}\n" 

72 "Connection timeout: {3}s\n" 

73 "Maximum queue size for incoming data: {4}".format( 

74 self.host, self.port, self.tags, self.timeout, self.max_queue 

75 ) 

76 ) 

77 

78 self._init_controlhost() 

79 self._start_thread() 

80 

81 def _start_thread(self): 

82 log.debug("Starting and demonising thread.") 

83 self.thread = threading.Thread(target=self._run, args=()) 

84 self.thread.daemon = True 

85 self.thread.start() 

86 

87 def _init_controlhost(self): 

88 """Set up the controlhost connection""" 

89 log.debug("Connecting to JLigier") 

90 self.client = Client(self.host, self.port) 

91 self.client._connect() 

92 log.debug("Subscribing to tags: %s", self.tags) 

93 for tag in self.tags.split(","): 

94 self.client.subscribe(tag.strip(), mode=self.subscription_mode) 

95 log.debug("Controlhost initialisation done.") 

96 

97 def _run(self): 

98 log.debug("Entering the main loop.") 

99 while True: 

100 current_qsize = self.queue.qsize() 

101 self.loop_cycle += 1 

102 self._set_idle_timer() 

103 try: 

104 prefix, data = self.client.get_message() 

105 except EOFError: 

106 log.warning("EOF from Ligier, trying again in 30 seconds...") 

107 time.sleep(30) 

108 try: 

109 log.debug("Reinitialising new CH connection.") 

110 self._init_controlhost() 

111 except socket.error: 

112 log.error("Failed to connect to host.") 

113 continue 

114 except BufferError: 

115 log.error("Buffer error in Ligier stream, aborting...") 

116 break 

117 else: 

118 self._add_idle_dt() 

119 self.message_count += 1 

120 self.performance_warn() 

121 # log.debug("%d bytes received from network.", len(data)) 

122 if not data: 

123 log.critical( 

124 "No data received, connection died.\n" 

125 + "Trying to reconnect in 30 seconds." 

126 ) 

127 time.sleep(30) 

128 try: 

129 log.debug("Reinitialising new CH connection.") 

130 self._init_controlhost() 

131 except socket.error: 

132 log.error("Failed to connect to host.") 

133 continue 

134 if current_qsize > self.max_queue: 

135 self.cuckoo_warn( 

136 "Maximum queue size ({0}) reached, " 

137 "dropping data.".format(self.max_queue) 

138 ) 

139 else: 

140 self.queue.put((prefix, data)) 

141 log.debug("Quitting the main loop.") 

142 

143 def process(self, blob): 

144 """Wait for the next packet and put it in the blob""" 

145 try: 

146 log.debug("Waiting for queue items.") 

147 prefix, data = self.queue.get(timeout=self.timeout) 

148 log.debug("Got %d bytes from queue.", len(data)) 

149 except Empty: 

150 log.warning("ControlHost timeout (%d s) reached", self.timeout) 

151 raise StopIteration("ControlHost timeout reached.") 

152 blob[self.key_for_prefix] = prefix 

153 blob[self.key_for_data] = data 

154 return blob 

155 

156 def show_performance_statistics(self): 

157 if not self.show_statistics: 

158 return 

159 dt = np.median(self.idle_dt) 

160 current_qsize = self.queue.qsize() 

161 log_func = self.cprint 

162 if dt < 0 or current_qsize > 0: 

163 log_func = self.log.warning 

164 log_func( 

165 "Message rate: {0:.1f} Hz, median idle time per message: " 

166 "{1:.3f} us (current queue size: {2})".format( 

167 self.message_count / self.statistics_interval, dt * 1e6, current_qsize 

168 ) 

169 ) 

170 self.message_count = 0 

171 

172 def _set_idle_timer(self): 

173 self.idle_timer = time.time() 

174 

175 def _add_idle_dt(self): 

176 now = time.time() 

177 self.idle_dt.append(now - self.idle_timer) 

178 

179 def finish(self): 

180 """Clean up the JLigier controlhost connection""" 

181 log.debug("Disconnecting from JLigier.") 

182 self.client.socket.shutdown(socket.SHUT_RDWR) 

183 self.client._disconnect() 

184 

185 def __iter__(self): 

186 return self 

187 

188 def __next__(self): 

189 return self.process(Blob()) 

190 

191 def next(self): 

192 return self.__next__() 

193 

194 

195def CHTagger(blob): 

196 tag = str(blob["CHPrefix"].tag) 

197 blob[tag] = True 

198 return blob