Coverage for src/km3pipe/controlhost.py: 43%

175 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-08 03:14 +0000

1# Filename: controlhost.py 

2""" 

3A set of classes and tools which use the ControlHost protocol. 

4 

5""" 

6import socket 

7import struct 

8import time 

9 

10from .logger import get_logger 

11 

12__author__ = "Tamas Gal" 

13__copyright__ = "Copyright 2016, Tamas Gal and the KM3NeT collaboration." 

14__credits__ = [] 

15__license__ = "MIT" 

16__maintainer__ = "Tamas Gal" 

17__email__ = "tgal@km3net.de" 

18__status__ = "Development" 

19 

20log = get_logger(__name__) 

21 

22BUFFER_SIZE = 1024 

23 

24 

class Client(object):
    """The ControlHost client.

    Talks to a JLigier over a TCP socket: it manages tag subscriptions,
    sends tagged messages and receives messages for the subscribed tags.
    The connection is opened lazily by the first method that needs it, or
    explicitly when the client is used as a context manager.
    """

    _valid_modes = ["any", "all"]

    def __init__(self, host, port=5553):
        """Store the connection parameters; no connection is made yet.

        Parameters
        ----------
        host : str
            Host name or address passed to ``socket.connect``.
        port : int
            TCP port passed to ``socket.connect``.
        """
        self.host = host
        self.port = port
        self.socket = None
        # Full subscription strings (mode flag + tag names) sent to the server.
        self.tags = []
        # Individual tag names accepted by ``get_message``.
        self.valid_tags = []

    def subscribe(self, tag, mode="any"):
        """Subscribe to ``tag`` and push the subscription list to the server.

        Parameters
        ----------
        tag : str
            One tag name, or several separated by whitespace.
        mode : str
            Either "any" (sent as flag "w") or "all" (sent as flag "a").

        Raises
        ------
        ValueError
            If ``mode`` is not one of the valid modes.
        """
        if mode not in self._valid_modes:
            raise ValueError(
                "Possible subscription modes are: {}".format(
                    ", ".join(self._valid_modes)
                )
            )
        log.info("Subscribing to %s in mode %s", tag, mode)
        full_tag = self._full_tag(tag, mode)
        if full_tag not in self.tags:
            self.tags.append(full_tag)
        for t in tag.split():
            if t not in self.valid_tags:
                self.valid_tags.append(t)
        self._update_subscriptions()

    def unsubscribe(self, tag, mode="any"):
        """Remove a subscription made with the same ``tag`` and ``mode``.

        Does nothing if no such subscription exists.  Otherwise the
        subscription is dropped, tag names that are no longer used by any
        remaining subscription stop being accepted, and the updated
        subscription list is pushed to the server.
        """
        full_tag = self._full_tag(tag, mode)
        if full_tag not in self.tags:
            return
        self.tags.remove(full_tag)
        self._prune_valid_tags(tag)
        self._update_subscriptions()

    def _prune_valid_tags(self, tag):
        """Drop the names in ``tag`` from ``valid_tags`` unless still in use."""
        still_subscribed = set()
        for full_tag in self.tags:
            # A full tag looks like " w NAME1 NAME2"; the first token is
            # the mode flag, the rest are tag names.
            still_subscribed.update(full_tag.split()[1:])
        for t in tag.split():
            if t in self.valid_tags and t not in still_subscribed:
                self.valid_tags.remove(t)

    def _full_tag(self, tag, mode):
        """Return ``tag`` prefixed with the mode flag (" w " or " a ")."""
        mode_flag = " {} ".format("w" if mode == "any" else "a")
        return mode_flag + tag

    def _update_subscriptions(self):
        """Send the complete subscription list, connecting first if needed."""
        log.debug("Subscribing to tags: %s", self.tags)
        if not self.socket:
            self._connect()
        tags = "".join(self.tags).encode("ascii")
        # sendall() is used because send() may transmit only part of the data.
        self.socket.sendall(Message(b"_Subscri", tags).data)
        self.socket.sendall(Message(b"_Always").data)

    def put_message(self, tag, data):
        """Send data to the ligier with a given tag"""
        if not self.socket:
            self._connect()
        msg = Message(tag, data)
        # sendall() is used because send() may transmit only part of the data.
        self.socket.sendall(msg.data)

    def get_message(self):
        """Block until a message with a subscribed tag has been received.

        If the prefix cannot be read or parsed, or carries a tag that is
        not in ``valid_tags``, the client reconnects, re-subscribes and
        tries again.

        Returns
        -------
        tuple
            ``(prefix, message)`` where ``prefix`` is a ``Prefix`` and
            ``message`` is the payload as bytes.

        Raises
        ------
        EOFError
            If the connection is closed while receiving.
        BufferError
            If receiving from the socket fails with an ``OSError``.
        """
        while True:
            log.info(" Waiting for control host Prefix")
            if self.socket is None:
                log.error("Lost socket connection, reconnecting...")
                self._reconnect()
                continue
            try:
                data = self._recv(Prefix.SIZE)
                timestamp = time.time()
                log.info(" raw prefix data received: '%s'", data)
                if data == b"":
                    raise EOFError
                prefix = Prefix(data=data, timestamp=timestamp)
            except (UnicodeDecodeError, OSError, struct.error):
                log.error("Failed to construct Prefix, reconnecting.")
                self._reconnect()
                continue

            try:
                prefix_tag = str(prefix.tag)
            except UnicodeDecodeError:
                log.error("The tag could not be decoded. Reconnecting.")
                self._reconnect()
                continue

            if prefix_tag not in self.valid_tags:
                log.error(
                    "Invalid tag '%s' received, ignoring the message \n"
                    "and reconnecting.\n"
                    " -> valid tags are: %s",
                    prefix_tag,
                    self.valid_tags,
                )
                self._reconnect()
                continue
            break

        log.info(" got a Prefix with %d bytes.", prefix.length)
        message = self._recv(prefix.length)
        log.info(" ------ returning message with %d bytes", len(message))
        return prefix, message

    def _connect(self):
        """Connect to JLigier"""
        log.debug("Connecting to JLigier")

        s = socket.socket()
        try:
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            s.connect((self.host, self.port))
        except OSError:
            # Do not leak the file descriptor when the connection fails.
            s.close()
            raise
        self.socket = s

    def _disconnect(self):
        """Close the socket"""
        log.debug("Disconnecting from JLigier")
        if self.socket:
            try:
                self.socket.close()
            finally:
                # Forget the closed socket so that the lazy-connect checks
                # and the "lost connection" branch of get_message trigger.
                self.socket = None

    def _reconnect(self):
        """Reconnect to JLigier and subscribe to the tags."""
        log.debug("Reconnecting to JLigier...")
        self._disconnect()
        self._connect()
        self._update_subscriptions()

    def _recv(self, size):
        """Receive the exact amount of bytes from the socket.

        This is needed since socket.recv(size) may return less than
        size, according to the specification: 'size' is the maximum
        number of bytes returned.

        Raises
        ------
        EOFError
            If the connection is closed before ``size`` bytes arrived.
        BufferError
            If ``socket.recv`` fails with an ``OSError``.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            try:
                chunk = self.socket.recv(min(BUFFER_SIZE, remaining))
            except OSError as e:
                log.error("Failed to receive controlhost message: %s", e)
                raise BufferError from e
            if not chunk:
                # recv() returns b"" once the peer has closed the
                # connection; looping on would never terminate.
                raise EOFError("Connection closed while receiving data")
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def __enter__(self):
        """Connect and return the client itself."""
        self._connect()
        return self

    def __exit__(self, exit_type, value, traceback):
        """Close the connection when leaving the ``with`` block."""
        self._disconnect()

173 

174 

class Message(object):
    """A ControlHost message: a ``Prefix`` followed by the payload bytes."""

    def __init__(self, tag, message=b""):
        """Build the message and its prefix.

        ``tag`` and ``message`` are encoded if they offer an ``encode``
        method (e.g. ``str``); anything else is used as given.
        """
        payload = self._encoded(message)
        self.prefix = Prefix(self._encoded(tag), len(payload))
        self.message = payload

    @staticmethod
    def _encoded(value):
        """Return ``value.encode()`` or ``value`` itself if not encodable."""
        try:
            return value.encode()
        except AttributeError:
            return value

    @property
    def data(self):
        """The wire representation: prefix bytes followed by the payload."""
        header = self.prefix.data
        return header + self.message

193 

194 

class Tag(object):
    """Represents the tag in a ControlHost Prefix.

    The tag is always stored as exactly ``SIZE`` bytes; shorter values
    are right-padded with NUL bytes.
    """

    SIZE = 8

    def __init__(self, data=None):
        """Create a tag from ``data`` (bytes-like, ``str`` or ``None``)."""
        self._data = b""
        self.data = data

    @property
    def data(self):
        """The byte data"""
        return self._data

    @data.setter
    def data(self, value):
        """Set the byte data and fill up the bytes to fit the size.

        ``str`` values are encoded as UTF-8, matching the decoding done
        in ``__str__``.  Empty values and ``None`` give an all-NUL tag.

        Raises
        ------
        ValueError
            If the (encoded) value is longer than ``SIZE`` bytes.
        """
        if not value:
            value = b""
        elif isinstance(value, str):
            value = value.encode("UTF-8")
        if len(value) > self.SIZE:
            raise ValueError("The maximum tag size is {}".format(self.SIZE))
        # bytes() makes a private copy, so a caller's bytearray is never
        # modified by the padding.
        self._data = bytes(value).ljust(self.SIZE, b"\x00")

    def __str__(self):
        """Return the tag decoded as UTF-8 with the NUL padding stripped."""
        return self.data.decode(encoding="UTF-8").strip("\x00")

    def __len__(self):
        """Return the number of stored bytes (``SIZE`` after padding)."""
        return len(self._data)

225 

226 

class Prefix(object):
    """The prefix of a ControlHost message.

    On the wire it is the tag bytes, followed by the payload length
    packed as a big-endian 32-bit signed integer and four NUL bytes.
    """

    SIZE = 16

    def __init__(self, tag=None, length=None, data=None, timestamp=None):
        """Create a prefix from raw ``data`` or from ``tag`` and ``length``.

        Non-empty ``data`` takes precedence and is parsed into tag and
        length.  ``timestamp`` defaults to the current time.
        """
        if not data:
            self.tag = Tag(tag)
            self.length = length
        else:
            self.data = data
        self.timestamp = time.time() if timestamp is None else timestamp

    @property
    def data(self):
        """The raw bytes of the prefix."""
        tag_bytes = self.tag.data
        packed_length = struct.pack(">i", self.length)
        padding = b"\x00" * 4
        return tag_bytes + packed_length + padding

    @data.setter
    def data(self, value):
        """Parse tag and length from the raw bytes in ``value``."""
        tag_end = Tag.SIZE
        length_end = Tag.SIZE + 4
        self.tag = Tag(data=value[:tag_end])
        (self.length,) = struct.unpack(">i", value[tag_end:length_end])

    def __str__(self):
        """Return a human-readable summary with tag and payload length."""
        template = "ControlHost Prefix with tag '{0}' ({1} bytes of data)"
        return template.format(self.tag, self.length)