# Copyright (c) 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Interface to control RF Switch.

Helper module to control RF Switch. Common commands to control the relays
are made available as methods that can be called.

Please refer to go/rf-switch for more info on the switch.
"""
import contextlib
import logging

from autotest_lib.server.cros.network.rf_switch import scpi


class RfSwitchException(Exception):
    """Exception for RfSwitch Errors."""


class RfSwitch(scpi.Scpi):
    """RF Switch Controller."""

    _ALL_AP_RELAYS = 'k1_1:k1_24'  # Each AP uses 6 relays
    _ALL_CLIENT_RELAYS = 'k1_25:k1_48'  # Each client uses 6 relays
    _MIN_ENCLOSURE = 1  # Starting enclosure number
    _MAX_ENCLOSURE = 4  # Last enclosure number
    _ALL_ENCLOSURE = 0  # All the enclosures
    _RELAYS_PER_ENCLOSURE = 6  # Number of relays in an enclosure

    # Each AP enclosure uses 6 relays and a R relay to set attenuation
    _AP_ATTENUATOR_RELAYS = {
        1: ['k1_49', 'k1_50', 'k1_51', 'k1_52', 'k1_53', 'k1_54', 'R1_9'],
        2: ['k1_55', 'k1_56', 'k1_57', 'k1_58', 'k1_59', 'k1_60', 'R1_10'],
        3: ['k1_61', 'k1_62', 'k1_63', 'k1_64', 'k1_65', 'k1_66', 'R1_11'],
        4: ['k1_67', 'k1_68', 'k1_69', 'k1_70', 'k1_71', 'k1_72', 'R1_12'],
    }
    # Shorter version
    _AP_ATTENUATOR_RELAYS_SHORT = {
        1: 'k1_49:k1_54,R1_9',
        2: 'k1_55:k1_60,R1_10',
        3: 'k1_61:k1_66,R1_11',
        4: 'k1_67:k1_72,R1_12',
    }

    _CMD_CLOSE_RELAYS = 'ROUT:CLOS'
    _CMD_CHECK_RELAYS_CLOSED = 'ROUT:CLOS?'
    _CMD_OPEN_RELAYS = 'ROUT:OPEN'
    _CMD_OPEN_ALL_RELAYS = 'ROUT:OPEN:ALL'
    _CMD_SET_VERIFY = 'ROUT:CHAN:VER'
    _CMD_GET_VERIFY = 'ROUT:CHAN:VER?'
    _CMD_SET_VERIFY_INVERTED = 'ROUT:CHAN:VER:POL'
    _CMD_GET_VERIFY_INVERTED = 'ROUT:CHAN:VER:POL?'
    _CMD_GET_VERIFY_STATE = 'ROUT:CHAN:VER:POS:STAT?'
    _CMD_CHECK_BUSY = 'ROUT:MOD:BUSY?'
    _CMD_WAIT = 'ROUT:MOD:WAIT'

    def __init__(self, host, port=scpi.Scpi.SCPI_PORT):
        """Controller for RF Switch.

        @param host: Hostname or IP address of RF Switch
        @param port: Int SCPI port number (default 5025)

        """
        scpi.Scpi.__init__(self, host, port)

    def _query_relay_flags(self, cmd, relays):
        """Send a per-relay query and parse the reply into booleans.

        The reply is expected to be a comma separated list of integers, one
        per relay, which is what every query in this class parses.

        @param cmd: string SCPI query command
        @param relays: relays to query (, to separate and : for range)

        @returns tuple of booleans, one per value in the reply

        """
        reply = self.query('%s (@%s)\n' % (cmd, relays))
        return tuple(bool(int(x)) for x in reply.strip().split(','))

    def send_cmd_check_error(self, cmd):
        """Send command to switch and check for any error.

        @param cmd: string cmd to send to switch

        @raises RfSwitchException: if the error query returns a non-zero code

        """
        self.write(cmd)
        code, error = self.error_query()
        if code:
            # Format the message here; passing the values as extra exception
            # args would leave the '%s' placeholders unexpanded.
            raise RfSwitchException(
                    'Error on command: "%s" - code: %s, message: %s' %
                    (cmd, code, error))
        self.wait()

    def close_relays(self, relays):
        """Close relays.

        @param relays: relays to close (, to separate and : for range)

        """
        self.send_cmd_check_error('%s (@%s)\n'
                                  % (self._CMD_CLOSE_RELAYS, relays))

    def relays_closed(self, relays):
        """Get open/closed status of relays.

        @param relays: relays to check (, to separate and : for range)

        @returns tuple of relay status, status is true if a relay is closed.

        """
        return self._query_relay_flags(self._CMD_CHECK_RELAYS_CLOSED, relays)

    def open_relays(self, relays):
        """Open relays.

        @param relays: string relays to open (, to separate and : for range)

        """
        self.send_cmd_check_error('%s (@%s)\n' % (
                self._CMD_OPEN_RELAYS, relays))

    def open_all_relays(self):
        """Open all relays.

        This will open all relays including the attenuator, which will set it
        to Max. Please remember to set your attenuation to right level after
        this call.

        """
        self.send_cmd_check_error('%s\n' % self._CMD_OPEN_ALL_RELAYS)

    def set_verify(self, relays, on):
        """Configure Close? to return indicator state instead of driven state.

        @param relays: relays to verify (, to separate and : for range)
        @param on: truthy to turn verify on, falsy to turn it off (the value
                   is sent to the switch as 1 or 0)

        """
        self.send_cmd_check_error('%s %s,(@%s)\n' % (
                self._CMD_SET_VERIFY, int(bool(on)), relays))

    def get_verify(self, relays):
        """Get the verify mode of relays.

        @param relays: relays to verify (, to separate and : for range)
        @returns tuple of verify mode for relays

        """
        return self._query_relay_flags(self._CMD_GET_VERIFY, relays)

    def set_verify_inverted(self, relays, inverted):
        """Set the polarity confidence of relay to be inverted.

        @param relays: relays to set (, to separate and : for range)
        @param inverted: Boolean True if INV, False for NORM

        """
        self.send_cmd_check_error('%s %s,(@%s)\n' % (
                self._CMD_SET_VERIFY_INVERTED,
                'INV' if inverted else 'NORM', relays))

    def get_verify_inverted(self, relays):
        """Get the confidence polarity. True is inverted, False is as-is.

        @param relays: relays to get (, to separate and : for range)
        @returns tuple of status where True is inverted and False is as-is

        """
        return self._query_relay_flags(self._CMD_GET_VERIFY_INVERTED, relays)

    def get_verify_state(self, relays):
        """If verify set get driven state, else confidence state.

        @param relays: relays to get (, to separate and : for range)

        @returns tuple of verify status for given relays

        """
        return self._query_relay_flags(self._CMD_GET_VERIFY_STATE, relays)

    @property
    def busy(self):
        """Check relays are still settling.

        @returns Boolean True if relays are settling, False if not.

        """
        return bool(int(self.query('%s\n' % self._CMD_CHECK_BUSY).strip()))

    def wait(self):
        """Wait for relays to debounce."""
        self.write('%s\n' % self._CMD_WAIT)

    def get_attenuation(self, ap_enclosure):
        """Get Attenuation for an AP enclosure.

        Attenuation is set by turning on/off the relays. Each relay specifies
        a bit in the binary value of attenuation. Find the relay status and
        build the binary value, then find the decimal value from it.

        @param ap_enclosure: Int 1/2/3/4 (AP enclosure index)

        @returns attenuation value in int

        @raises ValueError: on bad ap_enclosure value

        """
        if (ap_enclosure < self._MIN_ENCLOSURE or
                ap_enclosure > self._MAX_ENCLOSURE):
            raise ValueError(
                    'Invalid AP enclosure number: %s.' % ap_enclosure)

        a_status = self.relays_closed(
                self._AP_ATTENUATOR_RELAYS_SHORT[ap_enclosure])
        status = a_status[::-1]  # reverse for Endian transform
        logging.debug('attenuator status: %s', status)

        # build the binary value: a closed relay is a 0 bit, an open relay
        # is a 1 bit (the inverse mapping is used by set_attenuation).
        binary = ''.join('0' if x else '1' for x in status)
        return int(binary, 2)

    def get_all_attenuation(self):
        """Get attenuation value for all AP enclosures.

        @returns tuple of attenuation value for each enclosure

        """
        return tuple(
                self.get_attenuation(x)
                for x in range(self._MIN_ENCLOSURE, self._MAX_ENCLOSURE + 1))

    def set_attenuation(self, ap_enclosure, attenuation):
        """Set attenuation for an AP enclosure.

        @param ap_enclosure: Int 0,1,2,3,4 AP enclosure number. 0 for all
        @param attenuation: Int Attenuation value to set. Must fit in one bit
                            per attenuator relay (0-127 for 7 relays).

        @raises ValueError: on bad ap_enclosure or attenuation value

        """
        if ap_enclosure == self._ALL_ENCLOSURE:
            # set attenuation on all
            for x in range(self._MIN_ENCLOSURE, self._MAX_ENCLOSURE + 1):
                self.set_attenuation(x, attenuation)
            return

        if (ap_enclosure < self._MIN_ENCLOSURE or
                ap_enclosure > self._MAX_ENCLOSURE):
            raise ValueError('Bad AP enclosure value: %s' % ap_enclosure)

        attenuator_relays = self._AP_ATTENUATOR_RELAYS[ap_enclosure]
        num_bits = len(attenuator_relays)
        max_attenuation = (1 << num_bits) - 1
        # Reject values that do not fit before touching any relay; a wider
        # value would misalign bits and relays.
        if not 0 <= attenuation <= max_attenuation:
            raise ValueError('Bad attenuation value: %s (expected 0-%s)' %
                             (attenuation, max_attenuation))

        # convert attenuation decimal value to binary, most significant
        # bit first, zero padded to one bit per relay
        bin_array = [
                int(x) for x in '{0:0{1}b}'.format(attenuation, num_bits)]
        # For endian
        reverse_bits = attenuator_relays[::-1]

        # determine which relays should be closed: a 0 bit means closed
        relays_to_close = [
                relay for relay, bit in zip(reverse_bits, bin_array)
                if not bit]

        # open all relay for attenuator & then close the ones we need
        relays = ','.join(attenuator_relays)
        self.open_relays(relays)
        logging.debug('Attenuator relays opened: %s', relays)
        # When every bit is 1 there is nothing to close; skip the command
        # rather than send an empty channel list.
        if relays_to_close:
            relays = ','.join(relays_to_close)
            self.close_relays(relays)
            logging.debug('Attenuator relays closed: %s', relays)

    def get_ap_connections(self):
        """Get a list of AP to client connections.

        @returns tuple of dict of connections ({'AP': 1, 'Client': 1}, ...)

        """
        # Get the closed status for relays connected to AP enclosures.
        ap_connections = self.relays_closed(self._ALL_AP_RELAYS)

        # Find out the connections
        connections = []
        for index, closed in enumerate(ap_connections):
            # if the relay is closed, there is a connection
            if closed:
                # divmod keeps both numbers as ints on Python 2 and 3
                ap, client = divmod(index, self._RELAYS_PER_ENCLOSURE)
                connections.append({'AP': ap + 1, 'Client': client + 1})
        return tuple(connections)

    def get_client_connections(self):
        """Get a list of client to AP connections.

        @returns tuple of dict of connections ({'AP': 1, 'Client': 1}, ...)

        """
        # Get the closed status for relays connected to client enclosures.
        c_connections = self.relays_closed(self._ALL_CLIENT_RELAYS)

        # Find out the connections
        connections = []
        for index, closed in enumerate(c_connections):
            # if the relay is closed, there is a connection
            if closed:
                # divmod keeps both numbers as ints on Python 2 and 3
                client, ap = divmod(index, self._RELAYS_PER_ENCLOSURE)
                connections.append({'AP': ap + 1, 'Client': client + 1})
        return tuple(connections)

    def connect_ap_client(self, ap, client):
        """Connect AP and a Client enclosure.

        @param ap: Int 1/2/3/4 AP enclosure index
        @param client: Int 1/2/3/4 Client enclosure index

        @raises ValueError: when ap / client value is not 1/2/3/4

        """
        if ap < self._MIN_ENCLOSURE or ap > self._MAX_ENCLOSURE:
            raise ValueError(
                    'AP enclosure should be 1/2/3/4. Given: %s' % ap)
        if client < self._MIN_ENCLOSURE or client > self._MAX_ENCLOSURE:
            raise ValueError('Client enclosure should be 1/2/3/4. Given: %s' %
                             client)

        ap = int(ap)
        client = int(client)
        # AP side relay: each AP enclosure owns a run of relays starting at
        # k1_1, offset by the client number.
        ap_relay = (ap - 1) * self._RELAYS_PER_ENCLOSURE + client
        # Client side relay: client runs start after the AP enclosures' runs,
        # offset by the AP number.
        client_relay = ((client - 1 + self._MAX_ENCLOSURE) *
                        self._RELAYS_PER_ENCLOSURE) + ap
        relays = 'k1_%s,k1_%s' % (ap_relay, client_relay)
        logging.debug('Relays to close: %s', relays)
        self.close_relays(relays)


@contextlib.contextmanager
def RfSwitchSession(host, port=scpi.Scpi.SCPI_PORT):
    """Start a RF Switch session and close it when done.

    @param host: Hostname or IP address of RF Switch
    @param port: Int SCPI port number (default 5025)

    @yields: an instance of RfSwitch
    """
    session = RfSwitch(host, port)
    try:
        yield session
    finally:
        session.close()