# Copyright (c) 2013 The Chromium Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import collections import copy import logging import re import time from autotest_lib.client.common_lib import error from autotest_lib.client.common_lib import utils from autotest_lib.client.common_lib.cros.network import iw_event_logger # These must mirror the values in 'iw list' output. CHAN_FLAG_DISABLED = 'disabled' CHAN_FLAG_NO_IR = 'no IR' CHAN_FLAG_PASSIVE_SCAN = 'passive scan' CHAN_FLAG_RADAR_DETECT = 'radar detection' DEV_MODE_AP = 'AP' DEV_MODE_IBSS = 'IBSS' DEV_MODE_MONITOR = 'monitor' HT20 = 'HT20' HT40_ABOVE = 'HT40+' HT40_BELOW = 'HT40-' SECURITY_OPEN = 'open' SECURITY_WEP = 'wep' SECURITY_WPA = 'wpa' SECURITY_WPA2 = 'wpa2' # Mixed mode security is WPA2/WPA SECURITY_MIXED = 'mixed' # Table of lookups between the output of item 'secondary channel offset:' from # iw scan to constants. HT_TABLE = {'no secondary': HT20, 'above': HT40_ABOVE, 'below': HT40_BELOW} IwBand = collections.namedtuple( 'Band', ['num', 'frequencies', 'frequency_flags', 'mcs_indices']) IwBss = collections.namedtuple('IwBss', ['bss', 'frequency', 'ssid', 'security', 'ht', 'signal']) IwNetDev = collections.namedtuple('IwNetDev', ['phy', 'if_name', 'if_type']) IwTimedScan = collections.namedtuple('IwTimedScan', ['time', 'bss_list']) # The fields for IwPhy are as follows: # name: string name of the phy, such as "phy0" # bands: list of IwBand objects. # modes: List of strings containing interface modes supported, such as "AP". # commands: List of strings containing nl80211 commands supported, such as # "authenticate". # features: List of strings containing nl80211 features supported, such as # "T-DLS". # max_scan_ssids: Maximum number of SSIDs which can be scanned at once. IwPhy = collections.namedtuple( 'Phy', ['name', 'bands', 'modes', 'commands', 'features', 'max_scan_ssids', 'avail_tx_antennas', 'avail_rx_antennas', 'supports_setting_antenna_mask', 'support_vht']) DEFAULT_COMMAND_IW = 'iw' # Redirect stderr to stdout on Cros since adb commands cannot distinguish them # on Brillo. IW_TIME_COMMAND_FORMAT = '(time -p %s) 2>&1' IW_TIME_COMMAND_OUTPUT_START = 'real' IW_LINK_KEY_BEACON_INTERVAL = 'beacon int' IW_LINK_KEY_DTIM_PERIOD = 'dtim period' IW_LINK_KEY_FREQUENCY = 'freq' IW_LOCAL_EVENT_LOG_FILE = './debug/iw_event_%d.log' class IwRunner(object): """Defines an interface to the 'iw' command.""" def __init__(self, remote_host=None, command_iw=DEFAULT_COMMAND_IW): self._run = utils.run self._host = remote_host if remote_host: self._run = remote_host.run self._command_iw = command_iw self._log_id = 0 def _parse_scan_results(self, output): """Parse the output of the 'scan' and 'scan dump' commands. Here is an example of what a single network would look like for the input parameter. Some fields have been removed in this example: BSS 00:11:22:33:44:55(on wlan0) freq: 2447 beacon interval: 100 TUs signal: -46.00 dBm Information elements from Probe Response frame: SSID: my_open_network Extended supported rates: 24.0 36.0 48.0 54.0 HT capabilities: Capabilities: 0x0c HT20 HT operation: * primary channel: 8 * secondary channel offset: no secondary * STA channel width: 20 MHz RSN: * Version: 1 * Group cipher: CCMP * Pairwise ciphers: CCMP * Authentication suites: PSK * Capabilities: 1-PTKSA-RC 1-GTKSA-RC (0x0000) @param output: string command output. @returns a list of IwBss namedtuples; None if the scan fails """ bss = None frequency = None ssid = None ht = None signal = None security = None supported_securities = [] bss_list = [] for line in output.splitlines(): line = line.strip() bss_match = re.match('BSS ([0-9a-f:]+)', line) if bss_match: if bss != None: security = self.determine_security(supported_securities) iwbss = IwBss(bss, frequency, ssid, security, ht, signal) bss_list.append(iwbss) bss = frequency = ssid = security = ht = None supported_securities = [] bss = bss_match.group(1) if line.startswith('freq:'): frequency = int(line.split()[1]) if line.startswith('signal:'): signal = float(line.split()[1]) if line.startswith('SSID: '): _, ssid = line.split(': ', 1) if line.startswith('* secondary channel offset'): ht = HT_TABLE[line.split(':')[1].strip()] if line.startswith('WPA'): supported_securities.append(SECURITY_WPA) if line.startswith('RSN'): supported_securities.append(SECURITY_WPA2) security = self.determine_security(supported_securities) bss_list.append(IwBss(bss, frequency, ssid, security, ht, signal)) return bss_list def _parse_scan_time(self, output): """ Parse the scan time in seconds from the output of the 'time -p "scan"' command. 'time -p' Command output format is below: real 0.01 user 0.01 sys 0.00 @param output: string command output. @returns float time in seconds. """ output_lines = output.splitlines() for line_num, line in enumerate(output_lines): line = line.strip() if (line.startswith(IW_TIME_COMMAND_OUTPUT_START) and output_lines[line_num + 1].startswith('user') and output_lines[line_num + 2].startswith('sys')): return float(line.split()[1]) raise error.TestFail('Could not parse scan time.') def add_interface(self, phy, interface, interface_type): """ Add an interface to a WiFi PHY. @param phy: string name of PHY to add an interface to. @param interface: string name of interface to add. @param interface_type: string type of interface to add (e.g. 'monitor'). """ self._run('%s phy %s interface add %s type %s' % (self._command_iw, phy, interface, interface_type)) def disconnect_station(self, interface): """ Disconnect a STA from a network. @param interface: string name of interface to disconnect. """ self._run('%s dev %s disconnect' % (self._command_iw, interface)) def get_current_bssid(self, interface_name): """Get the BSSID that |interface_name| is associated with. @param interface_name: string name of interface (e.g. 'wlan0'). @return string bssid of our current association, or None. """ result = self._run('%s dev %s link' % (self._command_iw, interface_name), ignore_status=True) if result.exit_status: # See comment in get_link_value. return None # We're looking for a line like: # Connected to 04:f0:21:03:7d:bb (on wlan0) match = re.search( 'Connected to ([0-9a-fA-F:]{17}) \\(on %s\\)' % interface_name, result.stdout) if match is None: return None return match.group(1) def get_interface(self, interface_name): """Get full information about an interface given an interface name. @param interface_name: string name of interface (e.g. 'wlan0'). @return IwNetDev tuple. """ matching_interfaces = [iw_if for iw_if in self.list_interfaces() if iw_if.if_name == interface_name] if len(matching_interfaces) != 1: raise error.TestFail('Could not find interface named %s' % interface_name) return matching_interfaces[0] def get_link_value(self, interface, iw_link_key): """Get the value of a link property for |interface|. This command parses fields of iw link: #> iw dev wlan0 link Connected to 74:e5:43:10:4f:c0 (on wlan0) SSID: PMKSACaching_4m9p5_ch1 freq: 5220 RX: 5370 bytes (37 packets) TX: 3604 bytes (15 packets) signal: -59 dBm tx bitrate: 13.0 MBit/s MCS 1 bss flags: short-slot-time dtim period: 5 beacon int: 100 @param iw_link_key: string one of IW_LINK_KEY_* defined above. @param interface: string desired value of iw link property. """ result = self._run('%s dev %s link' % (self._command_iw, interface), ignore_status=True) if result.exit_status: # When roaming, there is a period of time for mac80211 based drivers # when the driver is 'associated' with an SSID but not a particular # BSS. This causes iw to return an error code (-2) when attempting # to retrieve information specific to the BSS. This does not happen # in mwifiex drivers. return None find_re = re.compile('\s*%s:\s*(.*\S)\s*$' % iw_link_key) find_results = filter(bool, map(find_re.match, result.stdout.splitlines())) if not find_results: return None actual_value = find_results[0].group(1) logging.info('Found iw link key %s with value %s.', iw_link_key, actual_value) return actual_value def ibss_join(self, interface, ssid, frequency): """ Join a WiFi interface to an IBSS. @param interface: string name of interface to join to the IBSS. @param ssid: string SSID of IBSS to join. @param frequency: int frequency of IBSS in Mhz. """ self._run('%s dev %s ibss join %s %d' % (self._command_iw, interface, ssid, frequency)) def ibss_leave(self, interface): """ Leave an IBSS. @param interface: string name of interface to remove from the IBSS. """ self._run('%s dev %s ibss leave' % (self._command_iw, interface)) def list_interfaces(self, desired_if_type=None): """List WiFi related interfaces on this system. @param desired_if_type: string type of interface to filter our returned list of interfaces for (e.g. 'managed'). @return list of IwNetDev tuples. """ # Parse output in the following format: # # $ adb shell iw dev # phy#0 # Unnamed/non-netdev interface # wdev 0x2 # addr aa:bb:cc:dd:ee:ff # type P2P-device # Interface wlan0 # ifindex 4 # wdev 0x1 # addr aa:bb:cc:dd:ee:ff # ssid Whatever # type managed output = self._run('%s dev' % self._command_iw).stdout interfaces = [] phy = None if_name = None if_type = None for line in output.splitlines(): m = re.match('phy#([0-9]+)', line) if m: phy = 'phy%d' % int(m.group(1)) if_name = None if_type = None continue if not phy: continue m = re.match('[\s]*Interface (.*)', line) if m: if_name = m.group(1) continue if not if_name: continue # Common values for type are 'managed', 'monitor', and 'IBSS'. m = re.match('[\s]*type ([a-zA-Z]+)', line) if m: if_type = m.group(1) interfaces.append(IwNetDev(phy=phy, if_name=if_name, if_type=if_type)) # One phy may have many interfaces, so don't reset it. if_name = None if desired_if_type: interfaces = [interface for interface in interfaces if interface.if_type == desired_if_type] return interfaces def list_phys(self): """ List WiFi PHYs on the given host. @return list of IwPhy tuples. """ output = self._run('%s list' % self._command_iw).stdout pending_phy_name = None current_band = None current_section = None all_phys = [] def add_pending_phy(): """Add the pending phy into |all_phys|.""" bands = tuple(IwBand(band.num, tuple(band.frequencies), dict(band.frequency_flags), tuple(band.mcs_indices)) for band in pending_phy_bands) new_phy = IwPhy(pending_phy_name, bands, tuple(pending_phy_modes), tuple(pending_phy_commands), tuple(pending_phy_features), pending_phy_max_scan_ssids, pending_phy_tx_antennas, pending_phy_rx_antennas, pending_phy_tx_antennas and pending_phy_rx_antennas, pending_phy_support_vht) all_phys.append(new_phy) for line in output.splitlines(): match_phy = re.search('Wiphy (.*)', line) if match_phy: if pending_phy_name: add_pending_phy() pending_phy_name = match_phy.group(1) pending_phy_bands = [] pending_phy_modes = [] pending_phy_commands = [] pending_phy_features = [] pending_phy_max_scan_ssids = None pending_phy_tx_antennas = 0 pending_phy_rx_antennas = 0 pending_phy_support_vht = False continue match_section = re.match('\s*(\w.*):\s*$', line) if match_section: current_section = match_section.group(1) match_band = re.match('Band (\d+)', current_section) if match_band: current_band = IwBand(num=int(match_band.group(1)), frequencies=[], frequency_flags={}, mcs_indices=[]) pending_phy_bands.append(current_band) continue # Check for max_scan_ssids. This isn't a section, but it # also isn't within a section. match_max_scan_ssids = re.match('\s*max # scan SSIDs: (\d+)', line) if match_max_scan_ssids and pending_phy_name: pending_phy_max_scan_ssids = int( match_max_scan_ssids.group(1)) continue if (current_section == 'Supported interface modes' and pending_phy_name): mode_match = re.search('\* (\w+)', line) if mode_match: pending_phy_modes.append(mode_match.group(1)) continue if current_section == 'Supported commands' and pending_phy_name: command_match = re.search('\* (\w+)', line) if command_match: pending_phy_commands.append(command_match.group(1)) continue if (current_section is not None and current_section.startswith('VHT Capabilities') and pending_phy_name): pending_phy_support_vht = True continue match_avail_antennas = re.match('\s*Available Antennas: TX (\S+)' ' RX (\S+)', line) if match_avail_antennas and pending_phy_name: pending_phy_tx_antennas = int( match_avail_antennas.group(1), 16) pending_phy_rx_antennas = int( match_avail_antennas.group(2), 16) continue match_device_support = re.match('\s*Device supports (.*)\.', line) if match_device_support and pending_phy_name: pending_phy_features.append(match_device_support.group(1)) continue if not all([current_band, pending_phy_name, line.startswith('\t')]): continue # E.g. # * 2412 MHz [1] (20.0 dBm) # * 2467 MHz [12] (20.0 dBm) (passive scan) # * 2472 MHz [13] (disabled) # * 5260 MHz [52] (19.0 dBm) (no IR, radar detection) match_chan_info = re.search( r'(?P\d+) MHz' r' (?P\[\d+\])' r'(?: \((?P[0-9.]+ dBm)\))?' r'(?: \((?P[a-zA-Z, ]+)\))?', line) if match_chan_info: frequency = int(match_chan_info.group('frequency')) current_band.frequencies.append(frequency) flags_string = match_chan_info.group('flags') if flags_string: current_band.frequency_flags[frequency] = frozenset( flags_string.split(',')) else: # Populate the dict with an empty set, to make # things uniform for client code. current_band.frequency_flags[frequency] = frozenset() continue # re_mcs needs to match something like: # HT TX/RX MCS rate indexes supported: 0-15, 32 if re.search('HT TX/RX MCS rate indexes supported: ', line): rate_string = line.split(':')[1].strip() for piece in rate_string.split(','): if piece.find('-') > 0: # Must be a range like ' 0-15' begin, end = piece.split('-') for index in range(int(begin), int(end) + 1): current_band.mcs_indices.append(index) else: # Must be a single rate like '32 ' current_band.mcs_indices.append(int(piece)) if pending_phy_name: add_pending_phy() return all_phys def remove_interface(self, interface, ignore_status=False): """ Remove a WiFi interface from a PHY. @param interface: string name of interface (e.g. mon0) @param ignore_status: boolean True iff we should ignore failures to remove the interface. """ self._run('%s dev %s del' % (self._command_iw, interface), ignore_status=ignore_status) def determine_security(self, supported_securities): """Determines security from the given list of supported securities. @param supported_securities: list of supported securities from scan """ if not supported_securities: security = SECURITY_OPEN elif len(supported_securities) == 1: security = supported_securities[0] else: security = SECURITY_MIXED return security def scan(self, interface, frequencies=(), ssids=()): """Performs a scan. @param interface: the interface to run the iw command against @param frequencies: list of int frequencies in Mhz to scan. @param ssids: list of string SSIDs to send probe requests for. @returns a list of IwBss namedtuples; None if the scan fails """ scan_result = self.timed_scan(interface, frequencies, ssids) if scan_result is None: return None return scan_result.bss_list def timed_scan(self, interface, frequencies=(), ssids=()): """Performs a timed scan. @param interface: the interface to run the iw command against @param frequencies: list of int frequencies in Mhz to scan. @param ssids: list of string SSIDs to send probe requests for. @returns a IwTimedScan namedtuple; None if the scan fails """ freq_param = '' if frequencies: freq_param = ' freq %s' % ' '.join(map(str, frequencies)) ssid_param = '' if ssids: ssid_param = ' ssid "%s"' % '" "'.join(ssids) iw_command = '%s dev %s scan%s%s' % (self._command_iw, interface, freq_param, ssid_param) command = IW_TIME_COMMAND_FORMAT % iw_command scan = self._run(command, ignore_status=True) if scan.exit_status != 0: # The device was busy logging.debug('scan exit_status: %d', scan.exit_status) return None if not scan.stdout: raise error.TestFail('Missing scan parse time') if scan.stdout.startswith(IW_TIME_COMMAND_OUTPUT_START): logging.debug('Empty scan result') bss_list = [] else: bss_list = self._parse_scan_results(scan.stdout) scan_time = self._parse_scan_time(scan.stdout) return IwTimedScan(scan_time, bss_list) def scan_dump(self, interface): """Dump the contents of the scan cache. Note that this does not trigger a scan. Instead, it returns the kernel's idea of what BSS's are currently visible. @param interface: the interface to run the iw command against @returns a list of IwBss namedtuples; None if the scan fails """ result = self._run('%s dev %s scan dump' % (self._command_iw, interface)) return self._parse_scan_results(result.stdout) def set_tx_power(self, interface, power): """ Set the transmission power for an interface. @param interface: string name of interface to set Tx power on. @param power: string power parameter. (e.g. 'auto'). """ self._run('%s dev %s set txpower %s' % (self._command_iw, interface, power)) def set_freq(self, interface, freq): """ Set the frequency for an interface. @param interface: string name of interface to set frequency on. @param freq: int frequency """ self._run('%s dev %s set freq %d' % (self._command_iw, interface, freq)) def set_regulatory_domain(self, domain_string): """ Set the regulatory domain of the current machine. Note that the regulatory change happens asynchronously to the exit of this function. @param domain_string: string regulatory domain name (e.g. 'US'). """ self._run('%s reg set %s' % (self._command_iw, domain_string)) def get_regulatory_domain(self): """ Get the regulatory domain of the current machine. @returns a string containing the 2-letter regulatory domain name (e.g. 'US'). """ output = self._run('%s reg get' % self._command_iw).stdout m = re.match('^country (..):', output) if not m: return None return m.group(1) def wait_for_scan_result(self, interface, bsses=(), ssids=(), timeout_seconds=30, wait_for_all=False): """Returns a list of IWBSS objects for given list of bsses or ssids. This method will scan for a given timeout and return all of the networks that have a matching ssid or bss. If wait_for_all is true and all networks are not found within the given timeout an empty list will be returned. @param interface: which interface to run iw against @param bsses: a list of BSS strings @param ssids: a list of ssid strings @param timeout_seconds: the amount of time to wait in seconds @param wait_for_all: True to wait for all listed bsses or ssids; False to return if any of the networks were found @returns a list of IwBss collections that contain the given bss or ssid; if the scan is empty or returns an error code None is returned. """ start_time = time.time() scan_failure_attempts = 0 logging.info('Performing a scan with a max timeout of %d seconds.', timeout_seconds) remaining_bsses = copy.copy(bsses) remaining_ssids = copy.copy(ssids) while time.time() - start_time < timeout_seconds: scan_results = self.scan(interface) if scan_results is None or len(scan_results) == 0: scan_failure_attempts += 1 # Allow in-progress scan to complete time.sleep(5) # If the in-progress scan takes more than 30 seconds to # complete it will most likely never complete; abort. # See crbug.com/309148. if scan_failure_attempts > 5: logging.error('Scan failed to run, see debug log for ' 'error code.') return None continue scan_failure_attempts = 0 matching_iwbsses = set() for iwbss in scan_results: if iwbss.bss in bsses and len(remaining_bsses) > 0: remaining_bsses.remove(iwbss.bss) matching_iwbsses.add(iwbss) if iwbss.ssid in ssids and len(remaining_ssids) > 0: remaining_ssids.remove(iwbss.ssid) matching_iwbsses.add(iwbss) if wait_for_all: if len(remaining_bsses) == 0 and len(remaining_ssids) == 0: return list(matching_iwbsses) else: if len(matching_iwbsses) > 0: return list(matching_iwbsses) if scan_failure_attempts > 0: return None # The SSID wasn't found, but the device is fine. return list() def wait_for_link(self, interface, timeout_seconds=10): """Waits until a link completes on |interface|. @param interface: which interface to run iw against. @param timeout_seconds: the amount of time to wait in seconds. @returns True if link was established before the timeout. """ start_time = time.time() while time.time() - start_time < timeout_seconds: link_results = self._run('%s dev %s link' % (self._command_iw, interface)) if 'Not connected' not in link_results.stdout: return True time.sleep(1) return False def set_antenna_bitmap(self, phy, tx_bitmap, rx_bitmap): """Set antenna chain mask on given phy (radio). This function will set the antennas allowed to use for TX and RX on the |phy| based on the |tx_bitmap| and |rx_bitmap|. This command is only allowed when the interfaces on the phy are down. @param phy: phy name @param tx_bitmap: bitmap of allowed antennas to use for TX @param rx_bitmap: bitmap of allowed antennas to use for RX """ command = '%s phy %s set antenna %d %d' % (self._command_iw, phy, tx_bitmap, rx_bitmap) self._run(command) def get_event_logger(self): """Create and return a IwEventLogger object. @returns a IwEventLogger object. """ local_file = IW_LOCAL_EVENT_LOG_FILE % (self._log_id) self._log_id += 1 return iw_event_logger.IwEventLogger(self._host, self._command_iw, local_file) def vht_supported(self): """Returns True if VHT is supported; False otherwise.""" result = self._run('%s list' % self._command_iw).stdout if 'VHT Capabilities' in result: return True return False def frequency_supported(self, frequency): """Returns True if the given frequency is supported; False otherwise. @param frequency: int Wifi frequency to check if it is supported by DUT. """ phys = self.list_phys() for phy in phys: for band in phy.bands: if frequency in band.frequencies: return True return False