Coverage for src/km3modules/hardware.py: 39%
145 statements
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 03:14 +0000
« prev ^ index » next coverage.py v7.5.1, created at 2024-05-08 03:14 +0000
1# Filename: hardware.py
2# -*- coding: utf-8 -*-
3# pylint: disable=locally-disabled
4"""
5A collection of controllers and hardware related stuff.
7"""
9import time
10import os
12import km3pipe as kp
14__author__ = "Jonas Reubelt and Tamas Gal"
15__email__ = "jreubelt@km3net.de"
16__status__ = "Development"
18log = kp.logger.get_logger(__name__) # pylint: disable=C0103
21class PhidgetsController(kp.Module):
22 def configure(self):
23 from Phidgets.Devices.Stepper import Stepper
24 from Phidgets.Devices.Encoder import Encoder
26 self.current_limit = self.get("current_limit") or 2.5
27 self.motor_id = self.get("motor_id") or 0
28 self.stepper = Stepper()
29 self.encoder = Encoder()
30 self.setup(self.motor_id)
32 def setup(self, motor_id):
33 self.stepper.openPhidget()
34 self.stepper.waitForAttach(10000)
36 self.encoder.openPhidget()
37 self.encoder.waitForAttach(10000)
39 self.stepper.setVelocityLimit(motor_id, 1000)
40 self.stepper.setAcceleration(motor_id, 5000)
41 self.stepper.setCurrentLimit(motor_id, 2.5)
43 self.e = 13250.0
44 self.s = 70500.0
46 self._stepper_dest = 0
47 self._encoder_dest = 0
49 self.reset_positions()
51 def drive_to_angle(self, ang, motor_id=0, relative=False):
52 stepper_dest = self._stepper_dest = self.raw_stepper_position(ang)
53 self._encoder_dest = self.raw_encoder_position(ang)
55 if relative:
56 self.reset_positions()
58 self.wake_up()
59 time.sleep(0.1)
61 self.stepper_target_pos = stepper_dest
62 self.wait_for_stepper()
63 self.log_offset()
65 while abs(self.offset) > 1:
66 self.log_offset()
67 stepper_offset = round(self.offset / self.e * self.s)
68 log.debug("Correcting stepper by {0}".format(stepper_offset))
69 log.debug("Stepper target pos: {0}".format(self.stepper_target_pos))
70 log.debug("Stepper pos: {0}".format(self.stepper_pos))
71 self.stepper_target_pos = self.stepper_pos + stepper_offset
72 self.wait_for_stepper()
74 self.log_positions()
75 self.stand_by()
77 def wait_for_stepper(self):
78 while self.stepper_pos != self._stepper_dest:
79 time.sleep(0.1)
80 self.log_positions()
82 def log_offset(self):
83 log.debug("Difference (encoder): {0}".format(self.offset))
85 @property
86 def offset(self):
87 return self._encoder_dest - self.encoder_pos
89 @property
90 def stepper_target_pos(self):
91 return self.stepper.getTargetPosition(self.motor_id)
93 @stepper_target_pos.setter
94 def stepper_target_pos(self, val):
95 self._stepper_dest = int(val)
96 self.stepper.setTargetPosition(self.motor_id, int(val))
98 @property
99 def stepper_pos(self):
100 return self.stepper.getCurrentPosition(self.motor_id)
102 @stepper_pos.setter
103 def stepper_pos(self, val):
104 self.stepper.setCurrentPosition(self.motor_id, int(val))
106 @property
107 def encoder_pos(self):
108 return self.encoder.getPosition(self.motor_id)
110 def raw_stepper_position(self, angle):
111 return round(angle * self.s / 360)
113 def raw_encoder_position(self, angle):
114 return round(angle * self.e / 360)
116 def wake_up(self, motor_id=0):
117 self.stepper.setEngaged(motor_id, 1)
119 def stand_by(self, motor_id=0):
120 self.stepper.setEngaged(motor_id, 0)
122 def reset_positions(self, motor_id=0):
123 self.stepper.setCurrentPosition(motor_id, 0)
124 self.encoder.setPosition(motor_id, 0)
126 def log_positions(self, motor_id=0):
127 log.info(
128 "Stepper position: {0}\nEncoder position:{1}".format(
129 self.stepper_pos / self.s * 360, self.encoder_pos / self.e * 360
130 )
131 )
134class USBTMC(object):
135 "USB TMC communicator"
137 def __init__(self, path):
138 self.device = os.open(path, os.O_RDWR)
140 def write(self, msg):
141 os.write(self.device, msg)
143 def read(self, size=1000):
144 return os.read(self.device, size)
146 @property
147 def name(self):
148 self.write(b"*IDN?")
149 return self.read()
151 def reset(self):
152 self.write(b"*RST")
155class Agilent33220A(object):
156 """Controller for the Arbitrary Waveform Generator"""
158 def __init__(self, path):
159 self.tmc = USBTMC(path)
160 self._output = False
161 self._amplitude = None
162 self._frequency = None
163 self._mode = None
165 @property
166 def output(self):
167 return self._output
169 @output.setter
170 def output(self, value):
171 self.tmc.write("OUTP {0}".format("ON" if value else "OFF").encode())
172 self._output = value
174 @property
175 def amplitude(self):
176 return self._amplitude
178 @amplitude.setter
179 def amplitude(self, val):
180 low, high = val
181 diff = high - low
182 offset = diff / 2
183 self.tmc.write("VOLT:OFFS {0}".format(offset).encode())
184 self.tmc.write("VOLT {0}".format(diff).encode())
185 self._amplitude = val
186 self._mode = None
188 @property
189 def frequency(self):
190 return self._frequency
192 @frequency.setter
193 def frequency(self, val):
194 self.tmc.write("FREQ {0}".format(val).encode())
195 self._frequency = val
197 @property
198 def mode(self):
199 return self._mode
201 @mode.setter
202 def mode(self, val):
203 valid_modes = ("sin", "squ", "ramp", "puls", "nois", "dc", "user")
204 if val not in valid_modes:
205 print(
206 "Not a valid mode: '{0}'. Valid modes are: {1}".format(val, valid_modes)
207 )
208 return
209 self.tmc.write("FUNC {0}".format(val.upper()).encode())
210 self._mode = val