# Copyright (c) 2014 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import logging from autotest_lib.client.common_lib import error from autotest_lib.server.cros.network import attenuator from autotest_lib.server.cros.network import attenuator_hosts from chromite.lib import timeout_util HOST_TO_FIXED_ATTENUATIONS = attenuator_hosts.HOST_FIXED_ATTENUATIONS class AttenuatorController(object): """Represents a minicircuits variable attenuator. This device is used to vary the attenuation between a router and a client. This allows us to measure throughput as a function of signal strength and test some roaming situations. The throughput vs signal strength tests are referred to rate vs range (RvR) tests in places. """ @property def supported_attenuators(self): """@return iterable of int attenuators supported on this host.""" return self._fixed_attenuations.keys() def __init__(self, hostname): """Construct a AttenuatorController. @param hostname: Hostname representing minicircuits attenuator. """ self.hostname = hostname super(AttenuatorController, self).__init__() part = hostname.split('.', 1)[0] if part not in HOST_TO_FIXED_ATTENUATIONS.keys(): raise error.TestError('Unexpected RvR host name %r.' % hostname) self._fixed_attenuations = HOST_TO_FIXED_ATTENUATIONS[part] num_atten = len(self.supported_attenuators) self._attenuator = attenuator.Attenuator(hostname, num_atten) self.set_variable_attenuation(0) def _approximate_frequency(self, attenuator_num, freq): """Finds an approximate frequency to freq. In case freq is not present in self._fixed_attenuations, we use a value from a nearby channel as an approximation. @param attenuator_num: attenuator in question on the remote host. Each attenuator has a different fixed path loss per frequency. @param freq: int frequency in MHz. @returns int approximate frequency from self._fixed_attenuations. """ old_offset = None approx_freq = None for defined_freq in self._fixed_attenuations[attenuator_num].keys(): new_offset = abs(defined_freq - freq) if old_offset is None or new_offset < old_offset: old_offset = new_offset approx_freq = defined_freq logging.debug('Approximating attenuation for frequency %d with ' 'constants for frequency %d.', freq, approx_freq) return approx_freq def close(self): """Close variable attenuator connection.""" self._attenuator.close() def set_total_attenuation(self, atten_db, frequency_mhz, attenuator_num=None): """Set the total attenuation on one or all attenuators. @param atten_db: int level of attenuation in dB. This must be higher than the fixed attenuation level of the affected attenuators. @param frequency_mhz: int frequency for which to calculate the total attenuation. The fixed component of attenuation varies with frequency. @param attenuator_num: int attenuator to change, or None to set all variable attenuators. """ affected_attenuators = self.supported_attenuators if attenuator_num is not None: affected_attenuators = [attenuator_num] for atten in affected_attenuators: freq_to_fixed_loss = self._fixed_attenuations[atten] approx_freq = self._approximate_frequency(atten, frequency_mhz) variable_atten_db = atten_db - freq_to_fixed_loss[approx_freq] self.set_variable_attenuation(variable_atten_db, attenuator_num=atten) def set_variable_attenuation(self, atten_db, attenuator_num=None): """Set the variable attenuation on one or all attenuators. @param atten_db: int non-negative level of attenuation in dB. @param attenuator_num: int attenuator to change, or None to set all variable attenuators. """ affected_attenuators = self.supported_attenuators if attenuator_num is not None: affected_attenuators = [attenuator_num] for atten in affected_attenuators: try: self._attenuator.set_atten(atten, atten_db) if int(self._attenuator.get_atten(atten)) != atten_db: raise error.TestError('Attenuation did not set as expected ' 'on attenuator %d' % atten) except error.TestError: self._attenuator.reopen(self.hostname) self._attenuator.set_atten(atten, atten_db) if int(self._attenuator.get_atten(atten)) != atten_db: raise error.TestError('Attenuation did not set as expected ' 'on attenuator %d' % atten) logging.info('%ddb attenuation set successfully on attenautor %d', atten_db, atten) def get_minimal_total_attenuation(self): """Get attenuator's maximum fixed attenuation value. This is pulled from the current attenuator's lines and becomes the minimal total attenuation when stepping through attenuation levels. @return maximum starting attenuation value """ max_atten = 0 for atten_num in self._fixed_attenuations.iterkeys(): atten_values = self._fixed_attenuations[atten_num].values() max_atten = max(max(atten_values), max_atten) return max_atten def set_signal_level(self, client_context, requested_sig_level, min_sig_level_allowed=-85, tolerance_percent=3, timeout=240): """Set wifi signal to desired level by changing attenuation. @param client_context: Client context object. @param requested_sig_level: Negative int value in dBm for wifi signal level to be set. @param min_sig_level_allowed: Minimum signal level allowed; this is to ensure that we don't set a signal that is too weak and DUT can not associate. @param tolerance_percent: Percentage to be used to calculate the desired range for the wifi signal level. """ atten_db = 0 starting_sig_level = client_context.wifi_signal_level if not starting_sig_level: raise error.TestError("No signal detected.") if not (min_sig_level_allowed <= requested_sig_level <= starting_sig_level): raise error.TestError("Requested signal level (%d) is either " "higher than current signal level (%r) with " "0db attenuation or lower than minimum " "signal level (%d) allowed." % (requested_sig_level, starting_sig_level, min_sig_level_allowed)) try: with timeout_util.Timeout(timeout): while True: client_context.reassociate(timeout_seconds=1) current_sig_level = client_context.wifi_signal_level logging.info("Current signal level %r", current_sig_level) if not current_sig_level: raise error.TestError("No signal detected.") if self.signal_in_range(requested_sig_level, current_sig_level, tolerance_percent): logging.info("Signal level set to %r.", current_sig_level) break if current_sig_level > requested_sig_level: self.set_variable_attenuation(atten_db) atten_db +=1 if current_sig_level < requested_sig_level: self.set_variable_attenuation(atten_db) atten_db -= 1 except (timeout_util.TimeoutError, error.TestError, error.TestFail) as e: raise error.TestError("Not able to set wifi signal to requested " "level. \n%s" % e) def signal_in_range(self, req_sig_level, curr_sig_level, tolerance_percent): """Check if wifi signal is within the threshold of requested signal. @param req_sig_level: Negative int value in dBm for wifi signal level to be set. @param curr_sig_level: Current wifi signal level seen by the DUT. @param tolerance_percent: Percentage to be used to calculate the desired range for the wifi signal level. @returns True if wifi signal is in the desired range. """ min_sig = req_sig_level + (req_sig_level * tolerance_percent / 100) max_sig = req_sig_level - (req_sig_level * tolerance_percent / 100) if min_sig <= curr_sig_level <= max_sig: return True return False