# Lint as: python2, python3
# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging

from autotest_lib.client.common_lib import error
from autotest_lib.server.cros.network import attenuator
from autotest_lib.server.cros.network import attenuator_hosts
from autotest_lib.utils.frozen_chromite.lib import timeout_util

HOST_TO_FIXED_ATTENUATIONS = attenuator_hosts.HOST_FIXED_ATTENUATIONS
# Fake entry to deal with attenuator not added to attenuator_hosts.py file
FAKE_HOST = HOST_TO_FIXED_ATTENUATIONS['fake-atten-host']


class AttenuatorController(object):
    """Represents a minicircuits variable attenuator.

    This device is used to vary the attenuation between a router and a client.
    This allows us to measure throughput as a function of signal strength and
    test some roaming situations.  The throughput vs signal strength tests
    are referred to as rate vs range (RvR) tests in places.

    Fixed attenuations should be recorded in attenuator_hosts.py, else
    TestError will be raised when fixed attenuations are accessed.

    """

    @property
    def supported_attenuators(self):
        """@return list of attenuators supported on this host."""
        return list(self._fixed_attenuations)


    def __init__(self, hostname):
        """Construct a AttenuatorController.

        Looks up the fixed attenuation table for the host (falling back to
        the fake entry when the host is unknown), opens the attenuator and
        resets the variable attenuation to 0 dB.

        @param hostname: Hostname representing minicircuits attenuator.

        """
        self.hostname = hostname
        super(AttenuatorController, self).__init__()
        # The lookup key is everything before the first '.cros'.
        part = hostname.split('.cros', 1)[0]
        if part not in HOST_TO_FIXED_ATTENUATIONS:
            logging.debug('Attenuator %s not found in attenuator_host list',
                          part)
            # Unknown hosts still get a usable controller; methods that need
            # the real fixed attenuations fail later via _fail_if_fake().
            self._fixed_attenuations = FAKE_HOST
        else:
            self._fixed_attenuations = HOST_TO_FIXED_ATTENUATIONS[part]
        num_atten = len(self.supported_attenuators)
        self._attenuator = attenuator.Attenuator(hostname, num_atten)
        self.set_variable_attenuation(0)


    def _approximate_frequency(self, attenuator_num, freq):
        """Finds an approximate frequency to freq.

        In case freq is not present in self._fixed_attenuations, we use a value
        from a nearby channel as an approximation.

        @param attenuator_num: attenuator in question on the remote host.  Each
                attenuator has a different fixed path loss per frequency.
        @param freq: int frequency in MHz.
        @returns int approximate frequency from self._fixed_attenuations, or
                None when no frequency is defined for the attenuator.
        @raises TestError if attenuator is not in attenuator_hosts.py

        """
        self._fail_if_fake()
        defined_freqs = list(self._fixed_attenuations[attenuator_num])
        approx_freq = None
        if defined_freqs:
            # min() returns the first of equally-close candidates, i.e. the
            # earliest defined frequency wins a tie.
            approx_freq = min(defined_freqs,
                              key=lambda defined: abs(defined - freq))

        logging.debug('Approximating attenuation for frequency %d with '
                      'constants for frequency %d.', freq, approx_freq)
        return approx_freq


    def close(self):
        """Close variable attenuator connection."""
        self._attenuator.close()


    def set_total_attenuation(self, atten_db, frequency_mhz,
                              attenuator_num=None):
        """Set the total attenuation on one or all attenuators.

        The variable component is computed as the requested total minus the
        fixed loss recorded for the closest defined frequency.

        @param atten_db: int level of attenuation in dB.  This must be
                higher than the fixed attenuation level of the affected
                attenuators.
        @param frequency_mhz: int frequency for which to calculate the
                total attenuation.  The fixed component of attenuation
                varies with frequency.
        @param attenuator_num: int attenuator to change, or None to
                set all variable attenuators.
        @raises TestError if attenuator is not in attenuator_hosts.py

        """
        self._fail_if_fake()
        if attenuator_num is None:
            affected_attenuators = self.supported_attenuators
        else:
            affected_attenuators = [attenuator_num]
        for atten in affected_attenuators:
            freq_to_fixed_loss = self._fixed_attenuations[atten]
            approx_freq = self._approximate_frequency(atten, frequency_mhz)
            variable_atten_db = atten_db - freq_to_fixed_loss[approx_freq]
            self.set_variable_attenuation(variable_atten_db,
                                          attenuator_num=atten)


    def _set_and_verify(self, atten, atten_db):
        """Set one attenuator and confirm the value read back matches.

        @param atten: attenuator to change.
        @param atten_db: level of variable attenuation in dB to apply.
        @raises TestError if the value read back differs from atten_db.

        """
        self._attenuator.set_atten(atten, atten_db)
        if int(self._attenuator.get_atten(atten)) != atten_db:
            raise error.TestError('Attenuation did not set as expected '
                                  'on attenuator %d' % atten)


    def set_variable_attenuation(self, atten_db, attenuator_num=None):
        """Set the variable attenuation on one or all attenuators.

        Each attenuator is set and read back; on a TestError the connection
        is reopened and the operation is retried once.

        @param atten_db: int non-negative level of attenuation in dB.
        @param attenuator_num: int attenuator to change, or None to
                set all variable attenuators.
        @raises TestError if the retry also fails.

        """
        if attenuator_num is None:
            affected_attenuators = self.supported_attenuators
        else:
            affected_attenuators = [attenuator_num]
        for atten in affected_attenuators:
            try:
                self._set_and_verify(atten, atten_db)
            except error.TestError:
                # One retry over a fresh connection before giving up.
                self._attenuator.reopen(self.hostname)
                self._set_and_verify(atten, atten_db)
            logging.info('%ddb attenuation set successfully on attenautor %d',
                         atten_db, atten)


    def get_minimal_total_attenuation(self):
        """Get attenuator's maximum fixed attenuation value.

        This is pulled from the current attenuator's lines and becomes the
        minimal total attenuation when stepping through attenuation levels.

        @return maximum starting attenuation value
        @raises TestError if attenuator is not in attenuator_hosts.py

        """
        self._fail_if_fake()
        max_atten = 0
        for freq_to_fixed_loss in self._fixed_attenuations.values():
            max_atten = max(max(freq_to_fixed_loss.values()), max_atten)
        return max_atten


    def set_signal_level(self, client_context, requested_sig_level,
            min_sig_level_allowed=-85, tolerance_percent=3, timeout=240):
        """Set wifi signal to desired level by changing attenuation.

        Repeatedly reassociates the client, reads its signal level and steps
        the variable attenuation by 1 dB until the level is within tolerance
        of the requested one.

        @param client_context: Client context object.
        @param requested_sig_level: Negative int value in dBm for wifi signal
                level to be set.
        @param min_sig_level_allowed: Minimum signal level allowed; this is to
                ensure that we don't set a signal that is too weak and DUT can
                not associate.
        @param tolerance_percent: Percentage to be used to calculate the desired
                range for the wifi signal level.
        @param timeout: Time limit handed to timeout_util.Timeout for the
                whole adjustment loop (presumably seconds).
        @raises TestError if no signal is detected, the requested level is
                outside the allowed range, or the level cannot be reached.

        """
        atten_db = 0
        starting_sig_level = client_context.wifi_signal_level
        if not starting_sig_level:
            raise error.TestError("No signal detected.")
        if not (min_sig_level_allowed <= requested_sig_level <=
                starting_sig_level):
            raise error.TestError("Requested signal level (%d) is either "
                                  "higher than current signal level (%r) with "
                                  "0db attenuation or lower than minimum "
                                  "signal level (%d) allowed." %
                                  (requested_sig_level,
                                  starting_sig_level,
                                  min_sig_level_allowed))

        try:
            with timeout_util.Timeout(timeout):
                while True:
                    client_context.reassociate(timeout_seconds=1)
                    current_sig_level = client_context.wifi_signal_level
                    logging.info("Current signal level %r", current_sig_level)
                    if not current_sig_level:
                        raise error.TestError("No signal detected.")
                    if self.signal_in_range(requested_sig_level,
                            current_sig_level, tolerance_percent):
                        logging.info("Signal level set to %r.",
                                     current_sig_level)
                        break
                    if current_sig_level > requested_sig_level:
                        # Signal too strong: apply, then step attenuation up.
                        self.set_variable_attenuation(atten_db)
                        atten_db += 1
                    elif current_sig_level < requested_sig_level:
                        # Signal too weak: apply, then step attenuation down.
                        # Never go below 0 dB; set_variable_attenuation only
                        # accepts non-negative attenuation.
                        self.set_variable_attenuation(atten_db)
                        atten_db = max(atten_db - 1, 0)
        except (timeout_util.TimeoutError, error.TestError,
                error.TestFail) as e:
            raise error.TestError("Not able to set wifi signal to requested "
                                  "level. \n%s" % e)


    def signal_in_range(self, req_sig_level, curr_sig_level, tolerance_percent):
        """Check if wifi signal is within the threshold of requested signal.

        The accepted window is req_sig_level plus or minus tolerance_percent
        percent of its magnitude, bounds included.

        @param req_sig_level: Negative int value in dBm for wifi signal
                level to be set.
        @param curr_sig_level: Current wifi signal level seen by the DUT.
        @param tolerance_percent: Percentage to be used to calculate the desired
                range for the wifi signal level.
        @returns True if wifi signal is in the desired range.

        """
        # 100.0 forces true division under Python 2 as well as Python 3, and
        # abs() keeps the window well-formed whatever the sign of the level.
        margin = abs(req_sig_level * tolerance_percent) / 100.0
        min_sig = req_sig_level - margin
        max_sig = req_sig_level + margin
        return min_sig <= curr_sig_level <= max_sig


    def _fail_if_fake(self):
        """ Raises test error if this attenuator is missing

        If an attenuator is missing, we use a fake entry. This function
        will fail the test if the current attenuator is fake.

        @raises TestError if the fixed attenuations equal the fake entry.

        """
        if self._fixed_attenuations == FAKE_HOST:
            raise error.TestError(
                    'Attenuator %r not found in attenuator_hosts.py' %
                    self.hostname)