• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2017 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5"""Interface to control RF Switch.
6
7Helper module to control RF Switch. Common commands to control the relays
8are made available as methods that can be called.
9
Please refer to go/rf-switch for more info on the switch.
11"""
12import contextlib
13import logging
14
15from autotest_lib.server.cros.network.rf_switch import scpi
16
17
class RfSwitchException(Exception):
    """Error raised for failures reported while driving the RF switch."""
    pass
20
21
class RfSwitch(scpi.Scpi):
    """RF Switch Controller.

    Builds SCPI routing commands (ROUT:...) for the switch relays and sends
    them through write/query/error_query, which are expected to be provided
    by scpi.Scpi.  Relay arguments are strings in channel-list form: relay
    names separated by ',' with ':' denoting a range, e.g. 'k1_1:k1_6,R1_9'.
    """

    _ALL_AP_RELAYS = 'k1_1:k1_24'  # Each AP uses 6 relays
    _ALL_CLIENT_RELAYS = 'k1_25:k1_48'  # Each client uses 6 relays
    _MIN_ENCLOSURE = 1  # Starting enclosure number
    _MAX_ENCLOSURE = 4  # Last enclosure number
    _ALL_ENCLOSURE = 0  # All the enclosures
    _RELAYS_PER_ENCLOSURE = 6  # Number of relays in an enclosure

    # Each AP enclosure uses 6 relays and a R relay to set attenuation.
    # The list order is least significant bit first.
    _AP_ATTENUATOR_RELAYS = {
        1: ['k1_49', 'k1_50', 'k1_51', 'k1_52', 'k1_53', 'k1_54', 'R1_9'],
        2: ['k1_55', 'k1_56', 'k1_57', 'k1_58', 'k1_59', 'k1_60', 'R1_10'],
        3: ['k1_61', 'k1_62', 'k1_63', 'k1_64', 'k1_65', 'k1_66', 'R1_11'],
        4: ['k1_67', 'k1_68', 'k1_69', 'k1_70', 'k1_71', 'k1_72', 'R1_12'],
    }
    # Same relays as above, written as SCPI channel lists (range form).
    _AP_ATTENUATOR_RELAYS_SHORT = {
        1: 'k1_49:k1_54,R1_9',
        2: 'k1_55:k1_60,R1_10',
        3: 'k1_61:k1_66,R1_11',
        4: 'k1_67:k1_72,R1_12',
    }

    _CMD_CLOSE_RELAYS = 'ROUT:CLOS'
    _CMD_CHECK_RELAYS_CLOSED = 'ROUT:CLOS?'
    _CMD_OPEN_RELAYS = 'ROUT:OPEN'
    _CMD_OPEN_ALL_RELAYS = 'ROUT:OPEN:ALL'
    _CMD_SET_VERIFY = 'ROUT:CHAN:VER'
    _CMD_GET_VERIFY = 'ROUT:CHAN:VER?'
    _CMD_SET_VERIFY_INVERTED = 'ROUT:CHAN:VER:POL'
    _CMD_GET_VERIFY_INVERTED = 'ROUT:CHAN:VER:POL?'
    _CMD_GET_VERIFY_STATE = 'ROUT:CHAN:VER:POS:STAT?'
    _CMD_CHECK_BUSY = 'ROUT:MOD:BUSY?'
    _CMD_WAIT = 'ROUT:MOD:WAIT'

    def __init__(self, host, port=scpi.Scpi.SCPI_PORT):
        """Controller for RF Switch.

        @param host: Hostname or IP address of RF Switch
        @param port: Int SCPI port number (defaults to scpi.Scpi.SCPI_PORT)

        """
        # Explicit base-class call kept as in the original code.
        scpi.Scpi.__init__(self, host, port)

    def _query_relay_flags(self, cmd, relays):
        """Run a relay query and parse its comma separated 0/1 reply.

        @param cmd: string SCPI query command (one of the _CMD_* queries)
        @param relays: relays to query (, to separate and : for range)

        @returns tuple of booleans, one per value in the reply.

        """
        reply = self.query('%s (@%s)\n' % (cmd, relays))
        return tuple(bool(int(flag)) for flag in reply.strip().split(','))

    def send_cmd_check_error(self, cmd):
        """Send command to switch and check for any error.

        After a successful command a wait command is also sent so the relays
        can debounce.

        @param cmd: string cmd to send to switch

        @raises RfSwitchException: if error_query() returns a truthy code

        """
        self.write(cmd)
        code, error = self.error_query()
        if code:
            # Format the message here; exceptions do not interpolate
            # logging-style arguments.
            raise RfSwitchException(
                'Error on command: "%s" - code: %s, message: %s'
                % (cmd, code, error))
        self.wait()

    def close_relays(self, relays):
        """Close relays.

        @param relays: relays to close (, to separate and : for range)

        """
        self.send_cmd_check_error(
            '%s (@%s)\n' % (self._CMD_CLOSE_RELAYS, relays))

    def relays_closed(self, relays):
        """Get open/closed status of relays.

        @param relays: relays to check (, to separate and : for range)

        @returns tuple of relay status, status is true if a relay is closed.

        """
        return self._query_relay_flags(self._CMD_CHECK_RELAYS_CLOSED, relays)

    def open_relays(self, relays):
        """Open relays.

        @param relays: string relays to open (, to separate and : for range)

        """
        self.send_cmd_check_error(
            '%s (@%s)\n' % (self._CMD_OPEN_RELAYS, relays))

    def open_all_relays(self):
        """Open all relays.

        This will open all relays including the attenuator, which will set it
        to Max. Please remember to set your attenuation to right level after
        this call.

        """
        self.send_cmd_check_error('%s\n' % self._CMD_OPEN_ALL_RELAYS)

    def set_verify(self, relays, on):
        """Configure Close? to return indicator state instead of driven state.

        @param relays: relays to verify (, to separate and : for range)
        @param on: truthy to turn verify on, falsy to turn it off (sent to
                   the switch as 1 or 0)

        """
        self.send_cmd_check_error('%s %s,(@%s)\n' % (
            self._CMD_SET_VERIFY, int(bool(on)), relays))

    def get_verify(self, relays):
        """Get the verify mode of relays.

        @param relays: relays to verify (, to separate and : for range)
        @returns tuple of verify mode for relays

        """
        return self._query_relay_flags(self._CMD_GET_VERIFY, relays)

    def set_verify_inverted(self, relays, inverted):
        """Set the polarity confidence of relay to be inverted.

        @param relays: relays to set (, to separate and : for range)
        @param inverted: Boolean True if INV, False for NORM

        """
        polarity = 'INV' if inverted else 'NORM'
        self.send_cmd_check_error('%s %s,(@%s)\n' % (
            self._CMD_SET_VERIFY_INVERTED, polarity, relays))

    def get_verify_inverted(self, relays):
        """Get the confidence polarity.

        @param relays: relays to get (, to separate and : for range)
        @returns tuple of booleans, True for inverted and False for value
                 as-is

        """
        return self._query_relay_flags(self._CMD_GET_VERIFY_INVERTED, relays)

    def get_verify_state(self, relays):
        """If verify set get driven state, else confidence state.

        @param relays: relays to get (, to separate and : for range)

        @returns tuple of verify status for given relays

        """
        return self._query_relay_flags(self._CMD_GET_VERIFY_STATE, relays)

    @property
    def busy(self):
        """Check relays are still settling.

        @returns Boolean True if relays are settling, False if not.

        """
        reply = self.query('%s\n' % self._CMD_CHECK_BUSY)
        return bool(int(reply.strip()))

    def wait(self):
        """Wait for relays to debounce."""
        self.write('%s\n' % self._CMD_WAIT)

    def get_attenuation(self, ap_enclosure):
        """Get Attenuation for an AP enclosure.

        Attenuation is set by turning on/off the relays.  Each relay specifies
        a bit in the binary value of attenuation.  Find the relay status and
        build the binary value, then find the decimal value from it.

        @param ap_enclosure: Int 1/2/3/4 (AP enclosure index)

        @returns attenuation value in int

        @raises ValueError: on bad ap_enclosure value

        """
        if (ap_enclosure < self._MIN_ENCLOSURE or
                ap_enclosure > self._MAX_ENCLOSURE):
            raise ValueError(
                'Invalid AP enclosure number: %s.' % ap_enclosure)

        closed = self.relays_closed(
            self._AP_ATTENUATOR_RELAYS_SHORT[ap_enclosure])
        status = closed[::-1]  # reverse so the most significant bit is first
        logging.debug('attenuator status: %s', status)

        # A closed relay is a 0 bit and an open relay is a 1 bit.
        binary = ''.join('0' if is_closed else '1' for is_closed in status)
        return int(binary, 2)

    def get_all_attenuation(self):
        """Get attenuation value for all AP enclosures.

        @returns tuple of attenuation value for each enclosure

        """
        return tuple(
            self.get_attenuation(enclosure)
            for enclosure in range(self._MIN_ENCLOSURE,
                                   self._MAX_ENCLOSURE + 1))

    def set_attenuation(self, ap_enclosure, attenuation):
        """Set attenuation for an AP enclosure.

        All attenuator relays of the enclosure are opened first, then the
        relays whose bit is 0 in the requested value are closed.

        @param ap_enclosure: Int 0,1,2,3,4 AP enclosure number. 0 for all
        @param attenuation: Int Attenuation value to set; must fit in one
                            bit per attenuator relay (0 to 127 with the
                            current 7 relays)

        @raises ValueError: on bad ap_enclosure or attenuation value

        """
        if ap_enclosure == self._ALL_ENCLOSURE:
            # set attenuation on all
            for enclosure in range(self._MIN_ENCLOSURE,
                                   self._MAX_ENCLOSURE + 1):
                self.set_attenuation(enclosure, attenuation)
            return

        if (ap_enclosure < self._MIN_ENCLOSURE or
                ap_enclosure > self._MAX_ENCLOSURE):
            raise ValueError('Bad AP enclosure value: %s' % ap_enclosure)

        attenuator_relays = self._AP_ATTENUATOR_RELAYS[ap_enclosure]
        num_bits = len(attenuator_relays)
        max_attenuation = (1 << num_bits) - 1
        # Reject values that do not fit; extra or missing bits would
        # otherwise be mapped onto the wrong relays.
        if not 0 <= attenuation <= max_attenuation:
            raise ValueError(
                'Attenuation should be between 0 and %s. Given: %s'
                % (max_attenuation, attenuation))

        # Binary digits, most significant first, zero padded to one per relay.
        bits = '{0:0{1}b}'.format(attenuation, num_bits)
        # Relays are listed least significant first, so walk them reversed.
        relays_to_close = [
            relay for relay, bit in zip(reversed(attenuator_relays), bits)
            if bit == '0'
        ]

        # open all relay for attenuator & then close the ones we need
        relays = ','.join(attenuator_relays)
        self.open_relays(relays)
        logging.debug('Attenuator relays opened: %s', relays)
        if relays_to_close:
            # Skipped at maximum attenuation: there is nothing to close and
            # an empty channel list should not be sent to the switch.
            relays = ','.join(relays_to_close)
            self.close_relays(relays)
            logging.debug('Attenuator relays closed: %s', relays)

    def get_ap_connections(self):
        """Get a list of AP to client connections from the AP side relays.

        @returns tuple of dict of connections ({'AP': 1, 'Client': 1}, ...)

        """
        # Get the closed status for relays connected to AP enclosures.
        ap_relays = self.relays_closed(self._ALL_AP_RELAYS)

        # A closed relay means a connection.  Relays are grouped per AP, and
        # the position inside the group selects the client.  Floor division
        # keeps the enclosure numbers integers.
        return tuple(
            {'AP': (index // self._RELAYS_PER_ENCLOSURE) + 1,
             'Client': (index % self._RELAYS_PER_ENCLOSURE) + 1}
            for index, is_closed in enumerate(ap_relays) if is_closed)

    def get_client_connections(self):
        """Get a list of AP to client connections from the client side relays.

        @returns tuple of dict of connections ({'AP': 1, 'Client': 1}, ...)

        """
        # Get the closed status for relays connected to client enclosures.
        client_relays = self.relays_closed(self._ALL_CLIENT_RELAYS)

        # A closed relay means a connection.  Relays are grouped per client,
        # and the position inside the group selects the AP.  Floor division
        # keeps the enclosure numbers integers.
        return tuple(
            {'AP': (index % self._RELAYS_PER_ENCLOSURE) + 1,
             'Client': (index // self._RELAYS_PER_ENCLOSURE) + 1}
            for index, is_closed in enumerate(client_relays) if is_closed)

    def connect_ap_client(self, ap, client):
        """Connect AP and a Client enclosure.

        Closes one relay on the AP side and the matching relay on the client
        side.

        @param ap: Int 1/2/3/4 AP enclosure index
        @param client: Int 1/2/3/4 Client enclosure index

        @raises ValueError: when ap / client value is not 1/2/3/4

        """
        if ap < self._MIN_ENCLOSURE or ap > self._MAX_ENCLOSURE:
            raise ValueError(
                'AP enclosure should be 1/2/3/4. Given: %s' % ap)
        if client < self._MIN_ENCLOSURE or client > self._MAX_ENCLOSURE:
            raise ValueError(
                'Client enclosure should be 1/2/3/4. Given: %s' % client)

        ap = int(ap)
        client = int(client)
        # Enclosure groups are counted from 0, so subtract 1.  The client
        # groups come after the _MAX_ENCLOSURE AP groups.
        ap_side_relay = (ap - 1) * self._RELAYS_PER_ENCLOSURE + client
        client_side_relay = (
            (client - 1 + self._MAX_ENCLOSURE) * self._RELAYS_PER_ENCLOSURE
            + ap)
        relays = 'k1_%s,k1_%s' % (ap_side_relay, client_side_relay)
        logging.debug('Relays to close: %s', relays)
        self.close_relays(relays)
329
330
@contextlib.contextmanager
def RfSwitchSession(host, port=scpi.Scpi.SCPI_PORT):
    """Context manager that provides an RfSwitch and closes it on exit.

    @param host: Hostname or IP address of RF Switch
    @param port: Int SCPI port number (defaults to scpi.Scpi.SCPI_PORT)

    @yields: the RfSwitch instance created from host and port
    """
    # closing() calls close() on the switch when the block exits, whether
    # it exits normally or with an exception.
    with contextlib.closing(RfSwitch(host, port)) as rf_switch:
        yield rf_switch
345