• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
2# Use of this source code is governed by a BSD-style license that can be
3# found in the LICENSE file.
4
5import logging
6
7from autotest_lib.client.common_lib import error
8from autotest_lib.server.cros.network import attenuator
9from autotest_lib.server.cros.network import attenuator_hosts
10
11from chromite.lib import timeout_util
12
# Per-host table of fixed path losses, re-exported from attenuator_hosts.
# AttenuatorController looks a host up by the first label of its hostname;
# the resulting entry is indexed by attenuator number and then by frequency
# to obtain the fixed attenuation subtracted from a requested total.
HOST_TO_FIXED_ATTENUATIONS = attenuator_hosts.HOST_FIXED_ATTENUATIONS
14
15
class AttenuatorController(object):
    """Represents a minicircuits variable attenuator.

    This device is used to vary the attenuation between a router and a client.
    This allows us to measure throughput as a function of signal strength and
    test some roaming situations.  The throughput vs signal strength tests
    are referred to rate vs range (RvR) tests in places.

    The total attenuation seen on a line is the sum of a fixed, frequency
    dependent path loss (looked up in HOST_TO_FIXED_ATTENUATIONS) and the
    variable attenuation programmed into the device by this class.

    """

    @property
    def supported_attenuators(self):
        """@return iterable of int attenuators supported on this host."""
        return self._fixed_attenuations.keys()


    def __init__(self, hostname):
        """Construct a AttenuatorController.

        Opens a connection to the attenuator and resets the variable
        attenuation of every supported attenuator to 0.

        @param hostname: Hostname representing minicircuits attenuator.

        @raises error.TestError if the first label of hostname has no entry
                in HOST_TO_FIXED_ATTENUATIONS.

        """
        self.hostname = hostname
        super(AttenuatorController, self).__init__()
        # The fixed attenuation table is keyed by the short host name, i.e.
        # everything before the first dot.
        part = hostname.split('.', 1)[0]
        if part not in HOST_TO_FIXED_ATTENUATIONS:
            raise error.TestError('Unexpected RvR host name %r.' % hostname)
        self._fixed_attenuations = HOST_TO_FIXED_ATTENUATIONS[part]
        num_atten = len(self.supported_attenuators)

        self._attenuator = attenuator.Attenuator(hostname, num_atten)
        self.set_variable_attenuation(0)


    def _affected_attenuators(self, attenuator_num):
        """Resolve an optional attenuator number into the ones to change.

        @param attenuator_num: int attenuator to change, or None for all.
        @returns iterable of int attenuator numbers.

        """
        if attenuator_num is None:
            return self.supported_attenuators
        return [attenuator_num]


    def _approximate_frequency(self, attenuator_num, freq):
        """Finds an approximate frequency to freq.

        In case freq is not present in self._fixed_attenuations, we use a value
        from a nearby channel as an approximation.

        @param attenuator_num: attenuator in question on the remote host.  Each
                attenuator has a different fixed path loss per frequency.
        @param freq: int frequency in MHz.
        @returns int approximate frequency from self._fixed_attenuations, or
                None if no frequency is defined for attenuator_num.

        """
        defined_freqs = self._fixed_attenuations[attenuator_num]
        approx_freq = None
        if defined_freqs:
            # min() keeps the first of several equally close frequencies,
            # matching a strict "closer than the best so far" scan.
            approx_freq = min(defined_freqs,
                              key=lambda defined: abs(defined - freq))

        logging.debug('Approximating attenuation for frequency %d with '
                      'constants for frequency %d.', freq, approx_freq)
        return approx_freq


    def close(self):
        """Close variable attenuator connection."""
        self._attenuator.close()


    def set_total_attenuation(self, atten_db, frequency_mhz,
                              attenuator_num=None):
        """Set the total attenuation on one or all attenuators.

        The variable attenuation programmed on each attenuator is atten_db
        minus that attenuator's fixed loss at the nearest defined frequency.

        @param atten_db: int level of attenuation in dB.  This must be
                higher than the fixed attenuation level of the affected
                attenuators.
        @param frequency_mhz: int frequency for which to calculate the
                total attenuation.  The fixed component of attenuation
                varies with frequency.
        @param attenuator_num: int attenuator to change, or None to
                set all variable attenuators.

        """
        for atten in self._affected_attenuators(attenuator_num):
            freq_to_fixed_loss = self._fixed_attenuations[atten]
            approx_freq = self._approximate_frequency(atten,
                                                      frequency_mhz)
            variable_atten_db = atten_db - freq_to_fixed_loss[approx_freq]
            self.set_variable_attenuation(variable_atten_db,
                                          attenuator_num=atten)


    def _set_and_verify_atten(self, atten, atten_db):
        """Program one attenuator and read the value back to confirm it.

        @param atten: int attenuator to change.
        @param atten_db: int level of attenuation in dB.

        @raises error.TestError if the value read back differs from atten_db.

        """
        self._attenuator.set_atten(atten, atten_db)
        if int(self._attenuator.get_atten(atten)) != atten_db:
            raise error.TestError('Attenuation did not set as expected '
                                  'on attenuator %d' % atten)


    def set_variable_attenuation(self, atten_db, attenuator_num=None):
        """Set the variable attenuation on one or all attenuators.

        Each attenuator is programmed and read back.  On failure the
        connection is reopened and the operation is retried once.

        @param atten_db: int non-negative level of attenuation in dB.
        @param attenuator_num: int attenuator to change, or None to
                set all variable attenuators.

        @raises error.TestError if the retry fails as well.

        """
        for atten in self._affected_attenuators(attenuator_num):
            try:
                self._set_and_verify_atten(atten, atten_db)
            except error.TestError:
                # Retry exactly once on a fresh connection; a second failure
                # propagates to the caller.
                self._attenuator.reopen(self.hostname)
                self._set_and_verify_atten(atten, atten_db)
            logging.info('%ddb attenuation set successfully on attenautor %d',
                         atten_db, atten)


    def get_minimal_total_attenuation(self):
        """Get attenuator's maximum fixed attenuation value.

        This is pulled from the current attenuator's lines and becomes the
        minimal total attenuation when stepping through attenuation levels.

        @return maximum starting attenuation value, or 0 if the host has no
                attenuators defined.

        """
        max_atten = 0
        # values() exists on both Python 2 and Python 3 dictionaries, unlike
        # iterkeys().
        for freq_to_fixed_loss in self._fixed_attenuations.values():
            max_atten = max(max(freq_to_fixed_loss.values()), max_atten)
        return max_atten


    def set_signal_level(self, client_context, requested_sig_level,
            min_sig_level_allowed=-85, tolerance_percent=3, timeout=240):
        """Set wifi signal to desired level by changing attenuation.

        Steps the variable attenuation of all attenuators by 1 dB at a time,
        reassociating the client and re-reading its signal level after every
        step, until the level is within tolerance of the requested one.

        @param client_context: Client context object.
        @param requested_sig_level: Negative int value in dBm for wifi signal
                level to be set.
        @param min_sig_level_allowed: Minimum signal level allowed; this is to
                ensure that we don't set a signal that is too weak and DUT can
                not associate.
        @param tolerance_percent: Percentage to be used to calculate the desired
                range for the wifi signal level.
        @param timeout: Limit handed to timeout_util.Timeout for the whole
                adjustment loop (presumably seconds; confirm against
                chromite).

        @raises error.TestError if no signal is detected, if the requested
                level is outside [min_sig_level_allowed, starting level], or
                if the level could not be reached.

        """
        atten_db = 0
        starting_sig_level = client_context.wifi_signal_level
        if not starting_sig_level:
            raise error.TestError("No signal detected.")
        if not (min_sig_level_allowed <= requested_sig_level <=
                starting_sig_level):
            raise error.TestError("Requested signal level (%d) is either "
                                  "higher than current signal level (%r) with "
                                  "0db attenuation or lower than minimum "
                                  "signal level (%d) allowed." %
                                  (requested_sig_level,
                                  starting_sig_level,
                                  min_sig_level_allowed))

        try:
            with timeout_util.Timeout(timeout):
                while True:
                    client_context.reassociate(timeout_seconds=1)
                    current_sig_level = client_context.wifi_signal_level
                    logging.info("Current signal level %r", current_sig_level)
                    if not current_sig_level:
                        raise error.TestError("No signal detected.")
                    if self.signal_in_range(requested_sig_level,
                                            current_sig_level,
                                            tolerance_percent):
                        logging.info("Signal level set to %r.",
                                     current_sig_level)
                        break
                    # Adjust first and apply afterwards so that the value on
                    # the hardware always equals atten_db; applying the stale
                    # value would make the first step after an overshoot move
                    # in the wrong direction.
                    if current_sig_level > requested_sig_level:
                        atten_db += 1
                        self.set_variable_attenuation(atten_db)
                    elif current_sig_level < requested_sig_level:
                        # Variable attenuation must stay non-negative.
                        atten_db = max(atten_db - 1, 0)
                        self.set_variable_attenuation(atten_db)
        except (timeout_util.TimeoutError, error.TestError,
                error.TestFail) as e:
            raise error.TestError("Not able to set wifi signal to requested "
                                  "level. \n%s" % e)


    def signal_in_range(self, req_sig_level, curr_sig_level, tolerance_percent):
        """Check if wifi signal is within the threshold of requested signal.

        @param req_sig_level: Negative int value in dBm for wifi signal
                level to be set.
        @param curr_sig_level: Current wifi signal level seen by the DUT.
        @param tolerance_percent: Percentage to be used to calculate the desired
                range for the wifi signal level.

        @returns True if wifi signal is in the desired range.
        """
        # For a negative requested level the margin is negative as well, so
        # adding it gives the lower bound and subtracting it the upper bound.
        margin = req_sig_level * tolerance_percent / 100
        return req_sig_level + margin <= curr_sig_level <= req_sig_level - margin