Coverage for src/km3pipe/controlhost.py: 43%
175 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 03:14 +0000
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 03:14 +0000
1# Filename: controlhost.py
2"""
3A set of classes and tools wich uses the ControlHost protocol.
5"""
6import socket
7import struct
8import time
10from .logger import get_logger
12__author__ = "Tamas Gal"
13__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration."
14__credits__ = []
15__license__ = "MIT"
16__maintainer__ = "Tamas Gal"
17__email__ = "tgal@km3net.de"
18__status__ = "Development"
20log = get_logger(__name__)
22BUFFER_SIZE = 1024
25class Client(object):
26 """The ControlHost client"""
28 _valid_modes = ["any", "all"]
30 def __init__(self, host, port=5553):
31 self.host = host
32 self.port = port
33 self.socket = None
34 self.tags = []
35 self.valid_tags = []
37 def subscribe(self, tag, mode="any"):
38 if mode not in self._valid_modes:
39 raise ValueError(
40 "Possible subscription modes are: {}".format(
41 ", ".join(self._valid_modes)
42 )
43 )
44 log.info("Subscribing to %s in mode %s", tag, mode)
45 full_tag = self._full_tag(tag, mode)
46 if full_tag not in self.tags:
47 self.tags.append(full_tag)
48 for t in tag.split():
49 if t not in self.valid_tags:
50 self.valid_tags.append(t)
51 self._update_subscriptions()
53 def unsubscribe(self, tag, mode="any"):
54 try:
55 self.tags.remove(self._full_tag(tag, mode))
56 self.valid_tags.remove(tag)
57 except ValueError:
58 pass
59 else:
60 self._update_subscriptions()
62 def _full_tag(self, tag, mode):
63 mode_flag = " {} ".format("w" if mode == "any" else "a")
64 full_tag = mode_flag + tag
65 return full_tag
67 def _update_subscriptions(self):
68 log.debug("Subscribing to tags: %s", self.tags)
69 if not self.socket:
70 self._connect()
71 tags = "".join(self.tags).encode("ascii")
72 message = Message(b"_Subscri", tags)
73 self.socket.send(message.data)
74 message = Message(b"_Always")
75 self.socket.send(message.data)
77 def put_message(self, tag, data):
78 """Send data to the ligier with a given tag"""
79 if not self.socket:
80 self._connect()
81 msg = Message(tag, data)
82 self.socket.send(msg.data)
84 def get_message(self):
85 while True:
86 log.info(" Waiting for control host Prefix")
87 if self.socket is None:
88 log.error("Lost socket connection, reconnecting...")
89 self._reconnect()
90 continue
91 try:
92 data = self._recv(Prefix.SIZE)
93 timestamp = time.time()
94 log.info(" raw prefix data received: '%s'", data)
95 if data == b"":
96 raise EOFError
97 prefix = Prefix(data=data, timestamp=timestamp)
98 except (UnicodeDecodeError, OSError, struct.error):
99 log.error("Failed to construct Prefix, reconnecting.")
100 self._reconnect()
101 continue
103 try:
104 prefix_tag = str(prefix.tag)
105 except UnicodeDecodeError:
106 log.error("The tag could not be decoded. Reconnecting.")
107 self._reconnect()
108 continue
110 if prefix_tag not in self.valid_tags:
111 log.error(
112 "Invalid tag '%s' received, ignoring the message \n"
113 "and reconnecting.\n"
114 " -> valid tags are: %s",
115 prefix_tag,
116 self.valid_tags,
117 )
118 self._reconnect()
119 continue
120 else:
121 break
123 log.info(" got a Prefix with %d bytes.", prefix.length)
124 message = self._recv(prefix.length)
125 log.info(" ------ returning message with %d bytes", len(message))
126 return prefix, message
128 def _connect(self):
129 """Connect to JLigier"""
130 log.debug("Connecting to JLigier")
132 s = socket.socket()
133 s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
134 s.connect((self.host, self.port))
135 self.socket = s
137 def _disconnect(self):
138 """Close the socket"""
139 log.debug("Disconnecting from JLigier")
140 if self.socket:
141 self.socket.close()
143 def _reconnect(self):
144 """Reconnect to JLigier and subscribe to the tags."""
145 log.debug("Reconnecting to JLigier...")
146 self._disconnect()
147 self._connect()
148 self._update_subscriptions()
150 def _recv(self, size):
151 """Receive the exact amount of bytes from the socket.
153 This is needed since socket.recv(size) may return less than
154 size, according to the specification: 'size' is the maximum
155 number of bytes returned.
156 """
157 message = b""
158 while len(message) < size:
159 buffer_size = min((BUFFER_SIZE, (size - len(message))))
160 try:
161 message += self.socket.recv(buffer_size)
162 except OSError as e:
163 log.error("Failed to receive controlhost message: %s", e)
164 raise BufferError
165 return message
167 def __enter__(self):
168 self._connect()
169 return self
171 def __exit__(self, exit_type, value, traceback):
172 self._disconnect()
175class Message(object):
176 """The representation of a ControlHost message."""
178 def __init__(self, tag, message=b""):
179 try:
180 message = message.encode()
181 except AttributeError:
182 pass
183 try:
184 tag = tag.encode()
185 except AttributeError:
186 pass
187 self.prefix = Prefix(tag, len(message))
188 self.message = message
190 @property
191 def data(self):
192 return self.prefix.data + self.message
195class Tag(object):
196 """Represents the tag in a ControlHost Prefix."""
198 SIZE = 8
200 def __init__(self, data=None):
201 self._data = b""
202 self.data = data
204 @property
205 def data(self):
206 """The byte data"""
207 return self._data
209 @data.setter
210 def data(self, value):
211 """Set the byte data and fill up the bytes to fit the size."""
212 if not value:
213 value = b""
214 if len(value) > self.SIZE:
215 raise ValueError("The maximum tag size is {}".format(self.SIZE))
216 self._data = value
217 while len(self._data) < self.SIZE:
218 self._data += b"\x00"
220 def __str__(self):
221 return self.data.decode(encoding="UTF-8").strip("\x00")
223 def __len__(self):
224 return len(self._data)
227class Prefix(object):
228 """The prefix of a ControlHost message."""
230 SIZE = 16
232 def __init__(self, tag=None, length=None, data=None, timestamp=None):
233 if data:
234 self.data = data
235 else:
236 self.tag = Tag(tag)
237 self.length = length
238 if timestamp is None:
239 self.timestamp = time.time()
240 else:
241 self.timestamp = timestamp
243 @property
244 def data(self):
245 return self.tag.data + struct.pack(">i", self.length) + b"\x00" * 4
247 @data.setter
248 def data(self, value):
249 self.tag = Tag(data=value[: Tag.SIZE])
250 self.length = struct.unpack(">i", value[Tag.SIZE : Tag.SIZE + 4])[0]
252 def __str__(self):
253 return "ControlHost Prefix with tag '{0}' ({1} bytes of data)".format(
254 self.tag, self.length
255 )