Coverage for src/km3modules/ahrs.py: 19%
125 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 03:14 +0000
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 03:14 +0000
1# Filename: ahrs.py
2# -*- coding: utf-8 -*-
3# pylint: disable=locally-disabled
4"""
5AHRS calibration.
7"""
9import io
10from collections import defaultdict
11import time
12import xml.etree.ElementTree as ET
14import km3db
16import numpy as np
17from numpy import cos, sin, arctan2
18import km3pipe as kp
19from km3pipe.tools import timed_cache
20from km3pipe.io.daq import TMCHData
22__author__ = "Tamas Gal"
23__email__ = "tgal@km3net.de"
24__status__ = "Development"
26log = kp.logger.get_logger(__name__) # pylint: disable=C0103
28# log.setLevel("DEBUG")
31class AHRSCalibrator(kp.Module):
32 """Calculates AHRS yaw, pitch and roll from median A and H of an interval.
34 Parameters
35 ----------
36 det_id: int
37 The detector ID, e.g. 29)
39 Other Parameters
40 ----------------
41 interval: int (accumulation interval in [sec], default: 10s)
43 Notes
44 -----
45 Writes 'AHRSCalibration' in the blob with:
46 dict: key=dom_id, value=tuple: (timestamp, du, floor, yaw, pitch, roll)
48 """
50 def configure(self):
51 det_id = self.require("det_id")
52 self.interval = self.get("interval") or 10 # in sec
53 self.A = defaultdict(list)
54 self.H = defaultdict(list)
55 self.detector = kp.hardware.Detector(det_id=det_id)
56 self.clbmap = km3db.CLBMap(det_id)
57 self.timestamp = time.time()
59 def process(self, blob):
60 tmch_data = TMCHData(io.BytesIO(blob["CHData"]))
61 dom_id = tmch_data.dom_id
62 try:
63 du, floor, _ = self.detector.doms[dom_id]
64 except KeyError: # base CLB
65 return blob
67 self.A[dom_id].append(tmch_data.A)
68 self.H[dom_id].append(tmch_data.H)
70 if time.time() - self.timestamp > self.interval:
71 self.timestamp = time.time()
72 calib = self.calibrate()
73 blob["AHRSCalibration"] = calib
75 return blob
77 def calibrate(self):
78 """Calculate yaw, pitch and roll from the median of A and H.
80 After successful calibration, the `self.A` and `self.H` are reset.
81 DOMs with missing AHRS pre-calibration data are skipped.
83 Returns
84 -------
85 dict: key=dom_id, value=tuple: (timestamp, du, floor, yaw, pitch, roll)
87 """
88 now = time.time()
89 dom_ids = self.A.keys()
90 print("Calibrating AHRS from median A and H for {} DOMs.".format(len(dom_ids)))
91 calibrations = {}
92 for dom_id in dom_ids:
93 print("Calibrating DOM ID {}".format(dom_id))
94 clb_upi = self.clbmap.doms_ids[dom_id].clb_upi
95 ahrs_calib = get_latest_ahrs_calibration(clb_upi)
96 if ahrs_calib is None:
97 log.warning("AHRS calibration missing for '{}'".format(dom_id))
98 continue
99 du, floor, _ = self.detector.doms[dom_id]
100 A = np.median(self.A[dom_id], axis=0)
101 H = np.median(self.H[dom_id], axis=0)
103 cyaw, cpitch, croll = fit_ahrs(A, H, *ahrs_calib)
104 calibrations[dom_id] = (now, du, floor, cyaw, cpitch, croll)
105 self.A = defaultdict(list)
106 self.H = defaultdict(list)
107 return calibrations
110def fit_ahrs(A, H, Aoff, Arot, Hoff, Hrot):
111 """Calculate yaw, pitch and roll for given A/H and calibration set.
113 Author: Vladimir Kulikovskiy
115 Parameters
116 ----------
117 A: list, tuple or numpy.array of shape (3,)
118 H: list, tuple or numpy.array of shape (3,)
119 Aoff: numpy.array of shape(3,)
120 Arot: numpy.array of shape(3, 3)
121 Hoff: numpy.array of shape(3,)
122 Hrot: numpy.array of shape(3, 3)
124 Returns
125 -------
126 yaw, pitch, roll
128 """
129 Acal = np.dot(A - Aoff, Arot)
130 Hcal = np.dot(H - Hoff, Hrot)
132 # invert axis for DOM upside down
133 for i in (1, 2):
134 Acal[i] = -Acal[i]
135 Hcal[i] = -Hcal[i]
137 roll = arctan2(-Acal[1], -Acal[2])
138 pitch = arctan2(Acal[0], np.sqrt(Acal[1] * Acal[1] + Acal[2] * Acal[2]))
139 yaw = arctan2(
140 Hcal[2] * sin(roll) - Hcal[1] * cos(roll),
141 sum(
142 (
143 Hcal[0] * cos(pitch),
144 Hcal[1] * sin(pitch) * sin(roll),
145 Hcal[2] * sin(pitch) * cos(roll),
146 )
147 ),
148 )
150 yaw = np.degrees(yaw)
151 while yaw < 0:
152 yaw += 360
153 # yaw = (yaw + magnetic_declination + 360 ) % 360
154 roll = np.degrees(roll)
155 pitch = np.degrees(pitch)
156 return yaw, pitch, roll
159@timed_cache(hours=1, maxsize=None, typed=False)
160def get_latest_ahrs_calibration(clb_upi, max_version=3):
161 """Retrieve the latest AHRS calibration data for a given CLB
163 Parameters
164 ----------
165 clb_upi: str
166 max_version: int, maximum version to check, optional
168 Returns
169 -------
170 Aoff: numpy.array with shape(3,)
171 Arot: numpy.array with shape(3,3)
172 Hoff: numpy.array with shape(3,)
173 Hrot: numpy.array with shape(3,3)
175 or None if no calibration found.
177 """
178 ahrs_upi = km3db.tools.clbupi2compassupi(clb_upi)
180 db = km3db.DBManager()
182 datasets = []
183 for version in range(max_version, 0, -1):
184 for n in range(1, 100):
185 log.debug("Iteration #{} to get the calib data".format(n))
186 url = (
187 "show_product_test.htm?upi={0}&"
188 "testtype=AHRS-CALIBRATION-v{1}&n={2}&out=xml".format(
189 ahrs_upi, version, n
190 )
191 )
192 log.debug("AHRS calib DB URL: {}".format(url))
193 _raw_data = db.get(url).replace("\n", "")
194 log.debug("What I got back as AHRS calib: {}".format(_raw_data))
195 if len(_raw_data) == 0:
196 break
197 try:
198 xroot = ET.parse(io.StringIO(_raw_data)).getroot()
199 except ET.ParseError:
200 continue
201 else:
202 datasets.append(xroot)
204 if len(datasets) == 0:
205 return None
207 latest_dataset = _get_latest_dataset(datasets)
208 return _extract_calibration(latest_dataset)
211def _get_latest_dataset(datasets):
212 """Find the latest valid AHRS calibration dataset"""
213 return sorted(datasets, key=lambda d: d.findall(".//EndTime")[0].text)[-1]
216def _extract_calibration(xroot):
217 """Extract AHRS calibration information from XML root.
219 Parameters
220 ----------
221 xroot: XML root
224 Returns
225 -------
226 Aoff: numpy.array with shape(3,)
227 Arot: numpy.array with shape(3,3)
228 Hoff: numpy.array with shape(3,)
229 Hrot: numpy.array with shape(3,3)
231 """
232 names = [c.text for c in xroot.findall(".//Name")]
233 val = [[i.text for i in c] for c in xroot.findall(".//Values")]
235 # The fields has to be reindeced, these are the index mappings
236 col_ic = [int(v) for v in val[names.index("AHRS_Matrix_Column")]]
237 try:
238 row_ic = [int(v) for v in val[names.index("AHRS_Matrix_Row")]]
239 except ValueError:
240 row_ic = [2, 2, 2, 1, 1, 1, 0, 0, 0]
241 try:
242 vec_ic = [int(v) for v in val[names.index("AHRS_Vector_Index")]]
243 except ValueError:
244 vec_ic = [2, 1, 0]
246 Aoff_ix = names.index("AHRS_Acceleration_Offset")
247 Arot_ix = names.index("AHRS_Acceleration_Rotation")
248 Hrot_ix = names.index("AHRS_Magnetic_Rotation")
250 Aoff = np.array(val[Aoff_ix])[vec_ic].astype(float)
251 Arot = (
252 np.array(val[Arot_ix]).reshape(3, 3)[col_ic, row_ic].reshape(3, 3).astype(float)
253 )
254 Hrot = (
255 np.array(val[Hrot_ix]).reshape(3, 3)[col_ic, row_ic].reshape(3, 3).astype(float)
256 )
258 Hoff = []
259 for q in "XYZ":
260 values = []
261 for t in ("Min", "Max"):
262 ix = names.index("AHRS_Magnetic_{}{}".format(q, t))
263 values.append(float(val[ix][0]))
264 Hoff.append(sum(values) / 2.0)
265 Hoff = np.array(Hoff)
267 return Aoff, Arot, Hoff, Hrot