Coverage for src/km3modules/hardware.py: 39%

145 statements  

« prev     ^ index     » next       coverage.py v7.5.1, created at 2024-05-08 03:14 +0000

1# Filename: hardware.py 

2# -*- coding: utf-8 -*- 

3# pylint: disable=locally-disabled 

4""" 

5A collection of controllers and hardware related stuff. 

6 

7""" 

8 

9import time 

10import os 

11 

12import km3pipe as kp 

13 

14__author__ = "Jonas Reubelt and Tamas Gal" 

15__email__ = "jreubelt@km3net.de" 

16__status__ = "Development" 

17 

18log = kp.logger.get_logger(__name__) # pylint: disable=C0103 

19 

20 

21class PhidgetsController(kp.Module): 

22 def configure(self): 

23 from Phidgets.Devices.Stepper import Stepper 

24 from Phidgets.Devices.Encoder import Encoder 

25 

26 self.current_limit = self.get("current_limit") or 2.5 

27 self.motor_id = self.get("motor_id") or 0 

28 self.stepper = Stepper() 

29 self.encoder = Encoder() 

30 self.setup(self.motor_id) 

31 

32 def setup(self, motor_id): 

33 self.stepper.openPhidget() 

34 self.stepper.waitForAttach(10000) 

35 

36 self.encoder.openPhidget() 

37 self.encoder.waitForAttach(10000) 

38 

39 self.stepper.setVelocityLimit(motor_id, 1000) 

40 self.stepper.setAcceleration(motor_id, 5000) 

41 self.stepper.setCurrentLimit(motor_id, 2.5) 

42 

43 self.e = 13250.0 

44 self.s = 70500.0 

45 

46 self._stepper_dest = 0 

47 self._encoder_dest = 0 

48 

49 self.reset_positions() 

50 

51 def drive_to_angle(self, ang, motor_id=0, relative=False): 

52 stepper_dest = self._stepper_dest = self.raw_stepper_position(ang) 

53 self._encoder_dest = self.raw_encoder_position(ang) 

54 

55 if relative: 

56 self.reset_positions() 

57 

58 self.wake_up() 

59 time.sleep(0.1) 

60 

61 self.stepper_target_pos = stepper_dest 

62 self.wait_for_stepper() 

63 self.log_offset() 

64 

65 while abs(self.offset) > 1: 

66 self.log_offset() 

67 stepper_offset = round(self.offset / self.e * self.s) 

68 log.debug("Correcting stepper by {0}".format(stepper_offset)) 

69 log.debug("Stepper target pos: {0}".format(self.stepper_target_pos)) 

70 log.debug("Stepper pos: {0}".format(self.stepper_pos)) 

71 self.stepper_target_pos = self.stepper_pos + stepper_offset 

72 self.wait_for_stepper() 

73 

74 self.log_positions() 

75 self.stand_by() 

76 

77 def wait_for_stepper(self): 

78 while self.stepper_pos != self._stepper_dest: 

79 time.sleep(0.1) 

80 self.log_positions() 

81 

82 def log_offset(self): 

83 log.debug("Difference (encoder): {0}".format(self.offset)) 

84 

85 @property 

86 def offset(self): 

87 return self._encoder_dest - self.encoder_pos 

88 

89 @property 

90 def stepper_target_pos(self): 

91 return self.stepper.getTargetPosition(self.motor_id) 

92 

93 @stepper_target_pos.setter 

94 def stepper_target_pos(self, val): 

95 self._stepper_dest = int(val) 

96 self.stepper.setTargetPosition(self.motor_id, int(val)) 

97 

98 @property 

99 def stepper_pos(self): 

100 return self.stepper.getCurrentPosition(self.motor_id) 

101 

102 @stepper_pos.setter 

103 def stepper_pos(self, val): 

104 self.stepper.setCurrentPosition(self.motor_id, int(val)) 

105 

106 @property 

107 def encoder_pos(self): 

108 return self.encoder.getPosition(self.motor_id) 

109 

110 def raw_stepper_position(self, angle): 

111 return round(angle * self.s / 360) 

112 

113 def raw_encoder_position(self, angle): 

114 return round(angle * self.e / 360) 

115 

116 def wake_up(self, motor_id=0): 

117 self.stepper.setEngaged(motor_id, 1) 

118 

119 def stand_by(self, motor_id=0): 

120 self.stepper.setEngaged(motor_id, 0) 

121 

122 def reset_positions(self, motor_id=0): 

123 self.stepper.setCurrentPosition(motor_id, 0) 

124 self.encoder.setPosition(motor_id, 0) 

125 

126 def log_positions(self, motor_id=0): 

127 log.info( 

128 "Stepper position: {0}\nEncoder position:{1}".format( 

129 self.stepper_pos / self.s * 360, self.encoder_pos / self.e * 360 

130 ) 

131 ) 

132 

133 

134class USBTMC(object): 

135 "USB TMC communicator" 

136 

137 def __init__(self, path): 

138 self.device = os.open(path, os.O_RDWR) 

139 

140 def write(self, msg): 

141 os.write(self.device, msg) 

142 

143 def read(self, size=1000): 

144 return os.read(self.device, size) 

145 

146 @property 

147 def name(self): 

148 self.write(b"*IDN?") 

149 return self.read() 

150 

151 def reset(self): 

152 self.write(b"*RST") 

153 

154 

155class Agilent33220A(object): 

156 """Controller for the Arbitrary Waveform Generator""" 

157 

158 def __init__(self, path): 

159 self.tmc = USBTMC(path) 

160 self._output = False 

161 self._amplitude = None 

162 self._frequency = None 

163 self._mode = None 

164 

165 @property 

166 def output(self): 

167 return self._output 

168 

169 @output.setter 

170 def output(self, value): 

171 self.tmc.write("OUTP {0}".format("ON" if value else "OFF").encode()) 

172 self._output = value 

173 

174 @property 

175 def amplitude(self): 

176 return self._amplitude 

177 

178 @amplitude.setter 

179 def amplitude(self, val): 

180 low, high = val 

181 diff = high - low 

182 offset = diff / 2 

183 self.tmc.write("VOLT:OFFS {0}".format(offset).encode()) 

184 self.tmc.write("VOLT {0}".format(diff).encode()) 

185 self._amplitude = val 

186 self._mode = None 

187 

188 @property 

189 def frequency(self): 

190 return self._frequency 

191 

192 @frequency.setter 

193 def frequency(self, val): 

194 self.tmc.write("FREQ {0}".format(val).encode()) 

195 self._frequency = val 

196 

197 @property 

198 def mode(self): 

199 return self._mode 

200 

201 @mode.setter 

202 def mode(self, val): 

203 valid_modes = ("sin", "squ", "ramp", "puls", "nois", "dc", "user") 

204 if val not in valid_modes: 

205 print( 

206 "Not a valid mode: '{0}'. Valid modes are: {1}".format(val, valid_modes) 

207 ) 

208 return 

209 self.tmc.write("FUNC {0}".format(val.upper()).encode()) 

210 self._mode = val