Coverage for src/km3modules/ahrs.py: 19%

125 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-08 03:14 +0000

1# Filename: ahrs.py 

2# -*- coding: utf-8 -*- 

3# pylint: disable=locally-disabled 

4""" 

5AHRS calibration. 

6 

7""" 

8 

9import io 

10from collections import defaultdict 

11import time 

12import xml.etree.ElementTree as ET 

13 

14import km3db 

15 

16import numpy as np 

17from numpy import cos, sin, arctan2 

18import km3pipe as kp 

19from km3pipe.tools import timed_cache 

20from km3pipe.io.daq import TMCHData 

21 

22__author__ = "Tamas Gal" 

23__email__ = "tgal@km3net.de" 

24__status__ = "Development" 

25 

26log = kp.logger.get_logger(__name__) # pylint: disable=C0103 

27 

28# log.setLevel("DEBUG") 

29 

30 

31class AHRSCalibrator(kp.Module): 

32 """Calculates AHRS yaw, pitch and roll from median A and H of an interval. 

33 

34 Parameters 

35 ---------- 

36 det_id: int 

37 The detector ID, e.g. 29) 

38 

39 Other Parameters 

40 ---------------- 

41 interval: int (accumulation interval in [sec], default: 10s) 

42 

43 Notes 

44 ----- 

45 Writes 'AHRSCalibration' in the blob with: 

46 dict: key=dom_id, value=tuple: (timestamp, du, floor, yaw, pitch, roll) 

47 

48 """ 

49 

50 def configure(self): 

51 det_id = self.require("det_id") 

52 self.interval = self.get("interval") or 10 # in sec 

53 self.A = defaultdict(list) 

54 self.H = defaultdict(list) 

55 self.detector = kp.hardware.Detector(det_id=det_id) 

56 self.clbmap = km3db.CLBMap(det_id) 

57 self.timestamp = time.time() 

58 

59 def process(self, blob): 

60 tmch_data = TMCHData(io.BytesIO(blob["CHData"])) 

61 dom_id = tmch_data.dom_id 

62 try: 

63 du, floor, _ = self.detector.doms[dom_id] 

64 except KeyError: # base CLB 

65 return blob 

66 

67 self.A[dom_id].append(tmch_data.A) 

68 self.H[dom_id].append(tmch_data.H) 

69 

70 if time.time() - self.timestamp > self.interval: 

71 self.timestamp = time.time() 

72 calib = self.calibrate() 

73 blob["AHRSCalibration"] = calib 

74 

75 return blob 

76 

77 def calibrate(self): 

78 """Calculate yaw, pitch and roll from the median of A and H. 

79 

80 After successful calibration, the `self.A` and `self.H` are reset. 

81 DOMs with missing AHRS pre-calibration data are skipped. 

82 

83 Returns 

84 ------- 

85 dict: key=dom_id, value=tuple: (timestamp, du, floor, yaw, pitch, roll) 

86 

87 """ 

88 now = time.time() 

89 dom_ids = self.A.keys() 

90 print("Calibrating AHRS from median A and H for {} DOMs.".format(len(dom_ids))) 

91 calibrations = {} 

92 for dom_id in dom_ids: 

93 print("Calibrating DOM ID {}".format(dom_id)) 

94 clb_upi = self.clbmap.doms_ids[dom_id].clb_upi 

95 ahrs_calib = get_latest_ahrs_calibration(clb_upi) 

96 if ahrs_calib is None: 

97 log.warning("AHRS calibration missing for '{}'".format(dom_id)) 

98 continue 

99 du, floor, _ = self.detector.doms[dom_id] 

100 A = np.median(self.A[dom_id], axis=0) 

101 H = np.median(self.H[dom_id], axis=0) 

102 

103 cyaw, cpitch, croll = fit_ahrs(A, H, *ahrs_calib) 

104 calibrations[dom_id] = (now, du, floor, cyaw, cpitch, croll) 

105 self.A = defaultdict(list) 

106 self.H = defaultdict(list) 

107 return calibrations 

108 

109 

110def fit_ahrs(A, H, Aoff, Arot, Hoff, Hrot): 

111 """Calculate yaw, pitch and roll for given A/H and calibration set. 

112 

113 Author: Vladimir Kulikovskiy 

114 

115 Parameters 

116 ---------- 

117 A: list, tuple or numpy.array of shape (3,) 

118 H: list, tuple or numpy.array of shape (3,) 

119 Aoff: numpy.array of shape(3,) 

120 Arot: numpy.array of shape(3, 3) 

121 Hoff: numpy.array of shape(3,) 

122 Hrot: numpy.array of shape(3, 3) 

123 

124 Returns 

125 ------- 

126 yaw, pitch, roll 

127 

128 """ 

129 Acal = np.dot(A - Aoff, Arot) 

130 Hcal = np.dot(H - Hoff, Hrot) 

131 

132 # invert axis for DOM upside down 

133 for i in (1, 2): 

134 Acal[i] = -Acal[i] 

135 Hcal[i] = -Hcal[i] 

136 

137 roll = arctan2(-Acal[1], -Acal[2]) 

138 pitch = arctan2(Acal[0], np.sqrt(Acal[1] * Acal[1] + Acal[2] * Acal[2])) 

139 yaw = arctan2( 

140 Hcal[2] * sin(roll) - Hcal[1] * cos(roll), 

141 sum( 

142 ( 

143 Hcal[0] * cos(pitch), 

144 Hcal[1] * sin(pitch) * sin(roll), 

145 Hcal[2] * sin(pitch) * cos(roll), 

146 ) 

147 ), 

148 ) 

149 

150 yaw = np.degrees(yaw) 

151 while yaw < 0: 

152 yaw += 360 

153 # yaw = (yaw + magnetic_declination + 360 ) % 360 

154 roll = np.degrees(roll) 

155 pitch = np.degrees(pitch) 

156 return yaw, pitch, roll 

157 

158 

159@timed_cache(hours=1, maxsize=None, typed=False) 

160def get_latest_ahrs_calibration(clb_upi, max_version=3): 

161 """Retrieve the latest AHRS calibration data for a given CLB 

162 

163 Parameters 

164 ---------- 

165 clb_upi: str 

166 max_version: int, maximum version to check, optional 

167 

168 Returns 

169 ------- 

170 Aoff: numpy.array with shape(3,) 

171 Arot: numpy.array with shape(3,3) 

172 Hoff: numpy.array with shape(3,) 

173 Hrot: numpy.array with shape(3,3) 

174 

175 or None if no calibration found. 

176 

177 """ 

178 ahrs_upi = km3db.tools.clbupi2compassupi(clb_upi) 

179 

180 db = km3db.DBManager() 

181 

182 datasets = [] 

183 for version in range(max_version, 0, -1): 

184 for n in range(1, 100): 

185 log.debug("Iteration #{} to get the calib data".format(n)) 

186 url = ( 

187 "show_product_test.htm?upi={0}&" 

188 "testtype=AHRS-CALIBRATION-v{1}&n={2}&out=xml".format( 

189 ahrs_upi, version, n 

190 ) 

191 ) 

192 log.debug("AHRS calib DB URL: {}".format(url)) 

193 _raw_data = db.get(url).replace("\n", "") 

194 log.debug("What I got back as AHRS calib: {}".format(_raw_data)) 

195 if len(_raw_data) == 0: 

196 break 

197 try: 

198 xroot = ET.parse(io.StringIO(_raw_data)).getroot() 

199 except ET.ParseError: 

200 continue 

201 else: 

202 datasets.append(xroot) 

203 

204 if len(datasets) == 0: 

205 return None 

206 

207 latest_dataset = _get_latest_dataset(datasets) 

208 return _extract_calibration(latest_dataset) 

209 

210 

211def _get_latest_dataset(datasets): 

212 """Find the latest valid AHRS calibration dataset""" 

213 return sorted(datasets, key=lambda d: d.findall(".//EndTime")[0].text)[-1] 

214 

215 

216def _extract_calibration(xroot): 

217 """Extract AHRS calibration information from XML root. 

218 

219 Parameters 

220 ---------- 

221 xroot: XML root 

222 

223 

224 Returns 

225 ------- 

226 Aoff: numpy.array with shape(3,) 

227 Arot: numpy.array with shape(3,3) 

228 Hoff: numpy.array with shape(3,) 

229 Hrot: numpy.array with shape(3,3) 

230 

231 """ 

232 names = [c.text for c in xroot.findall(".//Name")] 

233 val = [[i.text for i in c] for c in xroot.findall(".//Values")] 

234 

235 # The fields has to be reindeced, these are the index mappings 

236 col_ic = [int(v) for v in val[names.index("AHRS_Matrix_Column")]] 

237 try: 

238 row_ic = [int(v) for v in val[names.index("AHRS_Matrix_Row")]] 

239 except ValueError: 

240 row_ic = [2, 2, 2, 1, 1, 1, 0, 0, 0] 

241 try: 

242 vec_ic = [int(v) for v in val[names.index("AHRS_Vector_Index")]] 

243 except ValueError: 

244 vec_ic = [2, 1, 0] 

245 

246 Aoff_ix = names.index("AHRS_Acceleration_Offset") 

247 Arot_ix = names.index("AHRS_Acceleration_Rotation") 

248 Hrot_ix = names.index("AHRS_Magnetic_Rotation") 

249 

250 Aoff = np.array(val[Aoff_ix])[vec_ic].astype(float) 

251 Arot = ( 

252 np.array(val[Arot_ix]).reshape(3, 3)[col_ic, row_ic].reshape(3, 3).astype(float) 

253 ) 

254 Hrot = ( 

255 np.array(val[Hrot_ix]).reshape(3, 3)[col_ic, row_ic].reshape(3, 3).astype(float) 

256 ) 

257 

258 Hoff = [] 

259 for q in "XYZ": 

260 values = [] 

261 for t in ("Min", "Max"): 

262 ix = names.index("AHRS_Magnetic_{}{}".format(q, t)) 

263 values.append(float(val[ix][0])) 

264 Hoff.append(sum(values) / 2.0) 

265 Hoff = np.array(Hoff) 

266 

267 return Aoff, Arot, Hoff, Hrot