# Lint as: python2, python3 # Copyright 2019 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. from __future__ import absolute_import from __future__ import division from __future__ import print_function import collections import logging import operator import re from six.moves import map from six.moves import range from autotest_lib.client.common_lib import error from autotest_lib.client.common_lib import utils from autotest_lib.client.common_lib.cros.network import iw_event_logger # These must mirror the values in 'iw list' output. CHAN_FLAG_DISABLED = 'disabled' CHAN_FLAG_NO_IR = 'no IR' CHAN_FLAG_PASSIVE_SCAN = 'passive scan' CHAN_FLAG_RADAR_DETECT = 'radar detection' DEV_MODE_AP = 'AP' DEV_MODE_IBSS = 'IBSS' DEV_MODE_MONITOR = 'monitor' DEV_MODE_MESH_POINT = 'mesh point' DEV_MODE_STATION = 'managed' SUPPORTED_DEV_MODES = (DEV_MODE_AP, DEV_MODE_IBSS, DEV_MODE_MONITOR, DEV_MODE_MESH_POINT, DEV_MODE_STATION) class _PrintableWidth: """Printable width constant objects used by packet_capturer.""" def __init__(self, name): self._name = name def __repr__(self): return '\'%s\'' % self._name def __str__(self): return self._name WIDTH_HT20 = _PrintableWidth('HT20') WIDTH_HT40_PLUS = _PrintableWidth('HT40+') WIDTH_HT40_MINUS = _PrintableWidth('HT40-') WIDTH_VHT80 = _PrintableWidth('VHT80') WIDTH_VHT160 = _PrintableWidth('VHT160') WIDTH_VHT80_80 = _PrintableWidth('VHT80+80') VHT160_CENTER_CHANNELS = ('50','114') SECURITY_OPEN = 'open' SECURITY_WEP = 'wep' SECURITY_WPA = 'wpa' SECURITY_WPA2 = 'wpa2' # Mixed mode security is WPA2/WPA SECURITY_MIXED = 'mixed' # Table of lookups between the output of item 'secondary channel offset:' from # iw scan to constants. HT_TABLE = {'no secondary': WIDTH_HT20, 'above': WIDTH_HT40_PLUS, 'below': WIDTH_HT40_MINUS} IwBand = collections.namedtuple( 'Band', ['num', 'frequencies', 'frequency_flags', 'mcs_indices']) IwBss = collections.namedtuple('IwBss', ['bss', 'frequency', 'ssid', 'security', 'width', 'signal']) IwNetDev = collections.namedtuple('IwNetDev', ['phy', 'if_name', 'if_type']) IwTimedScan = collections.namedtuple('IwTimedScan', ['time', 'bss_list']) # The fields for IwPhy are as follows: # name: string name of the phy, such as "phy0" # bands: list of IwBand objects. # modes: List of strings containing interface modes supported, such as "AP". # commands: List of strings containing nl80211 commands supported, such as # "authenticate". # features: List of strings containing nl80211 features supported, such as # "T-DLS". # max_scan_ssids: Maximum number of SSIDs which can be scanned at once. IwPhy = collections.namedtuple( 'Phy', ['name', 'bands', 'modes', 'commands', 'features', 'max_scan_ssids', 'avail_tx_antennas', 'avail_rx_antennas', 'supports_setting_antenna_mask', 'support_vht']) DEFAULT_COMMAND_IW = 'iw' # Redirect stderr to stdout on Cros since adb commands cannot distinguish them # on Brillo. IW_TIME_COMMAND_FORMAT = '(time -p %s) 2>&1' IW_TIME_COMMAND_OUTPUT_START = 'real' IW_LINK_KEY_BEACON_INTERVAL = 'beacon int' IW_LINK_KEY_DTIM_PERIOD = 'dtim period' IW_LINK_KEY_FREQUENCY = 'freq' IW_LINK_KEY_SIGNAL = 'signal' IW_LINK_KEY_RX_BITRATE = 'rx bitrate' IW_LINK_KEY_RX_DROPS = 'rx drop misc' IW_LINK_KEY_RX_PACKETS = 'rx packets' IW_LINK_KEY_TX_BITRATE = 'tx bitrate' IW_LINK_KEY_TX_FAILURES = 'tx failed' IW_LINK_KEY_TX_PACKETS = 'tx packets' IW_LINK_KEY_TX_RETRIES = 'tx retries' IW_LOCAL_EVENT_LOG_FILE = './debug/iw_event_%d.log' # Strings from iw/util.c describing supported HE features HE_MAC_PLUS_HTC_HE = '+HTC HE Supported' HE_MAC_TWT_REQUESTER = 'TWT Requester' HE_MAC_TWT_RESPONDER = 'TWT Responder' HE_MAC_DYNAMIC_BA_FRAGMENTATION = 'Dynamic BA Fragementation Level' HE_MAC_MAX_MSDUS = 'Maximum number of MSDUS Fragments' HE_MAC_MIN_PAYLOAD_128 = 'Minimum Payload size of 128 bytes' HE_MAC_TRIGGER_FRAME_PADDING = 'Trigger Frame MAC Padding Duration' HE_MAC_MULTI_TID_AGGREGATION = 'Multi-TID Aggregation Support' HE_MAC_ALL_ACK = 'All Ack' HE_MAC_TRS = 'TRS' HE_MAC_BSR = 'BSR' HE_MAC_TWT_BROADCAST = 'Broadcast TWT' HE_MAC_32_BIT_BA_BITMAP = '32-bit BA Bitmap' HE_MAC_MU_CASCADING = 'MU Cascading' HE_MAC_ACK_AGGREGATION = 'Ack-Enabled Aggregation' HE_MAC_OM_CONTROL = 'OM Control' HE_MAC_OFDMA_RA = 'OFDMA RA' HE_MAC_MAX_AMPDU_LENGTH_EXPONENT = 'Maximum A-MPDU Length Exponent' HE_MAC_AMSDU_FRAGMENTATION = 'A-MSDU Fragmentation' HE_MAC_FLEXIBLE_TWT = 'Flexible TWT Scheduling' HE_MAC_RX_CONTROL_FRAME_TO_MULTIBSS = 'RX Control Frame to MultiBSS' HE_MAC_BSRP_BQRP_AMPDU_AGGREGATION = 'BSRP BQRP A-MPDU Aggregation' HE_MAC_QTP = 'QTP' HE_MAC_BQR = 'BQR' HE_MAC_SRP_RESPONDER_ROLE = 'SRP Responder Role' HE_MAC_NDP_FEEDBACK_REPORT = 'NDP Feedback Report' HE_MAC_OPS = 'OPS' HE_MAC_AMSDU_IN_AMPDU = 'A-MSDU in A-MPDU' HE_MAC_MULTI_TID_AGGREGATION_TX = 'Multi-TID Aggregation TX' HE_MAC_SUBCHANNEL_SELECTIVE = 'HE Subchannel Selective Transmission' HE_MAC_UL_2X966_TONE_RU = 'UL 2x996-Tone RU' HE_MAC_OM_CONTROL_DISABLE_RX = 'OM Control UL MU Data Disable RX' HE_PHY_24HE40 = 'HE40/2.4GHz' HE_PHY_5HE40_80 = 'HE40/HE80/5GHz' HE_PHY_5HE160 = 'HE160/5GHz' HE_PHY_5HE160_80_80 = 'HE160/HE80+80/5GHz' HE_PHY_242_TONE_RU_24 = '242 tone RUs/2.4GHz' HE_PHY_242_TONE_RU_5 = '242 tone RUs/5GHz' HE_PHY_PUNCTURED_PREAMBLE_RX = 'Punctured Preamble RX' HE_PHY_DEVICE_CLASS = 'Device Class' HE_PHY_LDPC_CODING_IN_PAYLOAD = 'LDPC Coding in Payload' HE_PHY_HE_SU_PPDU_1X_HE_LTF_08_GI = 'HE SU PPDU with 1x HE-LTF and 0.8us GI' HE_PHY_HE_MIDAMBLE_RX_MAX_NSTS = 'Midamble Rx Max NSTS' HE_PHY_NDP_4X_HE_LTF_32_GI = 'NDP with 4x HE-LTF and 3.2us GI' HE_PHY_STBC_TX_LEQ_80 = 'STBC Tx <= 80MHz' HE_PHY_STBC_RX_LEQ_80 = 'STBC Rx <= 80MHz' HE_PHY_DOPPLER_TX = 'Doppler Tx' HE_PHY_DOPPLER_RX = 'Doppler Rx' HE_PHY_FULL_BAND_UL_MU_MIMO = 'Full Bandwidth UL MU-MIMO' HE_PHY_PART_BAND_UL_MU_MIMO = 'Partial Bandwidth UL MU-MIMO' HE_PHY_DCM_MAX_CONSTELLATION = 'DCM Max Constellation' HE_PHY_DCM_MAX_NSS_TX = 'DCM Max NSS Tx' HE_PHY_DCM_MAX_CONSTELLATION_RX = 'DCM Max Constellation Rx' HE_PHY_DCM_MAX_NSS_RX = 'DCM Max NSS Rx' HE_PHY_RX_MU_PPDU_FROM_NON_AP = 'Rx HE MU PPDU from Non-AP STA' HE_PHY_SU_BEAMFORMER = 'SU Beamformer' HE_PHY_SU_BEAMFORMEE = 'SU Beamformee' HE_PHY_MU_BEAMFORMER = 'MU Beamformer' HE_PHY_BEAMFORMEE_STS_LEQ_80 = 'Beamformee STS <= 80Mhz' HE_PHY_BEAMFORMEE_STS_GT_80 = 'Beamformee STS > 80Mhz' HE_PHY_SOUNDING_DIMENSIONS_LEQ_80 = 'Sounding Dimensions <= 80Mhz' HE_PHY_SOUNDING_DIMENSIONS_GT_80 = 'Sounding Dimensions > 80Mhz' HE_PHY_NG_EQ_16_SU_FB = 'Ng = 16 SU Feedback' HE_PHY_NG_EQ_16_MU_FB = 'Ng = 16 MU Feedback' HE_PHY_CODEBOOK_SIZE_SU_FB = 'Codebook Size SU Feedback' HE_PHY_CODEBOOK_SIZE_MU_FB = 'Codebook Size MU Feedback' HE_PHY_TRIGGERED_SU_BEAMFORMING_FB = 'Triggered SU Beamforming Feedback' HE_PHY_TRIGGERED_MU_BEAMFORMING_FB = 'Triggered MU Beamforming Feedback' HE_PHY_TRIGGERED_CQI_FB = 'Triggered CQI Feedback' HE_PHY_PART_BAND_EXT_RANGE = 'Partial Bandwidth Extended Range' HE_PHY_PART_BAND_DL_MU_MIMO = 'Partial Bandwidth DL MU-MIMO' HE_PHY_PPE_THRESHOLD = 'PPE Threshold Present' HE_PHY_SRP_SR = 'SRP-based SR' HE_PHY_POWER_BOOST_FACTOR_AR = 'Power Boost Factor ar' HE_PHY_SU_PPDU_4X_HE_LTF_08_GI = 'HE SU PPDU & HE PPDU 4x HE-LTF 0.8us GI' HE_PHY_MAX_NC = 'Max NC' HE_PHY_STBC_TX_GT_80 = 'STBC Tx > 80MHz' HE_PHY_STBC_RX_GT_80 = 'STBC Rx > 80MHz' HE_PHY_ER_SU_PPDU_4X_HE_LTF_08_GI = 'HE ER SU PPDU 4x HE-LTF 0.8us GI' HE_PHY_20_IN_44_PPDU_24 = '20MHz in 40MHz HE PPDU 2.4GHz' HE_PHY_20_IN_160_80_80 = '20MHz in 160/80+80MHz HE PPDU' HE_PHY_80_IN_160_80_80 = '80MHz in 160/80+80MHz HE PPDU' HE_PHY_ER_SU_PPDU_1X_HE_LTF_08_GI = 'HE ER SU PPDU 1x HE-LTF 0.8us GI' HE_PHY_MIDAMBLE_RX_2X_AND_1X_HE_LTF = 'Midamble Rx 2x & 1x HE-LTF' HE_PHY_DCM_MAX_BW = 'DCM Max BW' HE_PHY_LONGER_THAN_16HE_OFDM_SYM = 'Longer Than 16HE SIG-B OFDM Symbols' HE_PHY_NON_TRIGGERED_CQI_FB = 'Non-Triggered CQI Feedback' HE_PHY_TX_1024_QAM = 'TX 1024-QAM' HE_PHY_RX_1024_QAM = 'RX 1024-QAM' HE_PHY_RX_FULL_BW_SU_USING_MU_COMPRESSION_SIGB = \ 'RX Full BW SU Using HE MU PPDU with Compression SIGB' HE_PHY_RX_FULL_BW_SU_USING_MU_NON_COMPRESSION_SIGB = \ 'RX Full BW SU Using HE MU PPDU with Non-Compression SIGB' def _get_all_link_keys(link_information): """Parses link or station dump output for link key value pairs. Link or station dump information is in the format below: Connected to 74:e5:43:10:4f:c0 (on wlan0) SSID: PMKSACaching_4m9p5_ch1 freq: 5220 RX: 5370 bytes (37 packets) TX: 3604 bytes (15 packets) signal: -59 dBm tx bitrate: 13.0 MBit/s MCS 1 bss flags: short-slot-time dtim period: 5 beacon int: 100 @param link_information: string containing the raw link or station dump information as reported by iw. Note that this parsing assumes a single entry, in the case of multiple entries (e.g. listing stations from an AP, or listing mesh peers), the entries must be split on a per peer/client basis before this parsing operation. @return a dictionary containing all the link key/value pairs. """ link_key_value_pairs = {} keyval_regex = re.compile(r'^\s+(.*):\s+(.*)$') for link_key in link_information.splitlines()[1:]: match = re.search(keyval_regex, link_key) if match: # Station dumps can contain blank lines. link_key_value_pairs[match.group(1)] = match.group(2) return link_key_value_pairs def _extract_bssid(link_information, interface_name, station_dump=False): """Get the BSSID that |interface_name| is associated with. See doc for _get_all_link_keys() for expected format of the station or link information entry. @param link_information: string containing the raw link or station dump information as reported by iw. Note that this parsing assumes a single entry, in the case of multiple entries (e.g. listing stations from an AP or listing mesh peers), the entries must be split on a per peer/client basis before this parsing operation. @param interface_name: string name of interface (e.g. 'wlan0'). @param station_dump: boolean indicator of whether the link information is from a 'station dump' query. If False, it is assumed the string is from a 'link' query. @return string bssid of the current association, or None if no matching association information is found. """ # We're looking for a line like this when parsing the output of a 'link' # query: # Connected to 04:f0:21:03:7d:bb (on wlan0) # We're looking for a line like this when parsing the output of a # 'station dump' query: # Station 04:f0:21:03:7d:bb (on mesh-5000mhz) identifier = 'Station' if station_dump else 'Connected to' search_re = r'%s ([0-9a-fA-F:]{17}) \(on %s\)' % (identifier, interface_name) match = re.match(search_re, link_information) if match is None: return None return match.group(1) class IwRunner(object): """Defines an interface to the 'iw' command.""" def __init__(self, remote_host=None, command_iw=DEFAULT_COMMAND_IW): self._run = utils.run self._host = remote_host if remote_host: self._run = remote_host.run self._command_iw = command_iw self._log_id = 0 def _parse_scan_results(self, output): """Parse the output of the 'scan' and 'scan dump' commands. Here is an example of what a single network would look like for the input parameter. Some fields have been removed in this example: BSS 00:11:22:33:44:55(on wlan0) freq: 2447 beacon interval: 100 TUs signal: -46.00 dBm Information elements from Probe Response frame: SSID: my_open_network Extended supported rates: 24.0 36.0 48.0 54.0 HT capabilities: Capabilities: 0x0c HT20 HT operation: * primary channel: 8 * secondary channel offset: no secondary * STA channel width: 20 MHz RSN: * Version: 1 * Group cipher: CCMP * Pairwise ciphers: CCMP * Authentication suites: PSK * Capabilities: 1-PTKSA-RC 1-GTKSA-RC (0x0000) @param output: string command output. @returns a list of IwBss namedtuples; None if the scan fails """ bss = None frequency = None ssid = None ht = None vht = None signal = None security = None supported_securities = [] bss_list = [] # TODO(crbug.com/1032892): The parsing logic here wasn't really designed # for the presence of multiple information elements like HT, VHT, and # (eventually) HE. We should eventually update it to check that we are # in the right section (e.g., verify the '* channel width' match is a # match in the VHT section and not a different section). Also, we should # probably add in VHT20, and VHT40 whenever we finish this bug. for line in output.splitlines(): line = line.strip() bss_match = re.match('BSS ([0-9a-f:]+)', line) if bss_match: if bss != None: security = self.determine_security(supported_securities) iwbss = IwBss(bss, frequency, ssid, security, vht if vht else ht, signal) bss_list.append(iwbss) bss = frequency = ssid = security = ht = vht = None supported_securities = [] bss = bss_match.group(1) if line.startswith('freq:'): frequency = int(line.split()[1]) if line.startswith('signal:'): signal = float(line.split()[1]) if line.startswith('SSID: '): _, ssid = line.split(': ', 1) if line.startswith('* secondary channel offset'): ht = HT_TABLE[line.split(':')[1].strip()] # Checking for the VHT channel width based on IEEE 802.11-2016 # Table 9-252. if line.startswith('* channel width:'): chan_width_subfield = line.split(':')[1].strip()[0] if chan_width_subfield == '1': vht = WIDTH_VHT80 # 2 and 3 are deprecated but are included here for older APs. if chan_width_subfield == '2': vht = WIDTH_VHT160 if chan_width_subfield == '3': vht = WIDTH_VHT80_80 if line.startswith('* center freq segment 2:'): center_chan_two = line.split(':')[1].strip() if vht == WIDTH_VHT80: if center_chan_two in VHT160_CENTER_CHANNELS: vht = WIDTH_VHT160 elif center_chan_two != '0': vht = WIDTH_VHT80_80 if line.startswith('WPA'): supported_securities.append(SECURITY_WPA) if line.startswith('RSN'): supported_securities.append(SECURITY_WPA2) security = self.determine_security(supported_securities) bss_list.append(IwBss(bss, frequency, ssid, security, vht if vht else ht, signal)) return bss_list def _parse_scan_time(self, output): """ Parse the scan time in seconds from the output of the 'time -p "scan"' command. 'time -p' Command output format is below: real 0.01 user 0.01 sys 0.00 @param output: string command output. @returns float time in seconds. """ output_lines = output.splitlines() for line_num, line in enumerate(output_lines): line = line.strip() if (line.startswith(IW_TIME_COMMAND_OUTPUT_START) and output_lines[line_num + 1].startswith('user') and output_lines[line_num + 2].startswith('sys')): return float(line.split()[1]) raise error.TestFail('Could not parse scan time.') def add_interface(self, phy, interface, interface_type): """ Add an interface to a WiFi PHY. @param phy: string name of PHY to add an interface to. @param interface: string name of interface to add. @param interface_type: string type of interface to add (e.g. 'monitor'). """ self._run('%s phy %s interface add %s type %s' % (self._command_iw, phy, interface, interface_type)) def disconnect_station(self, interface): """ Disconnect a STA from a network. @param interface: string name of interface to disconnect. """ self._run('%s dev %s disconnect' % (self._command_iw, interface)) def get_current_bssid(self, interface_name): """Get the BSSID that |interface_name| is associated with. @param interface_name: string name of interface (e.g. 'wlan0'). @return string bssid of our current association, or None. """ result = self._run('%s dev %s link' % (self._command_iw, interface_name), ignore_status=True) if result.exit_status: # See comment in get_link_value. return None return _extract_bssid(result.stdout, interface_name) def get_interface(self, interface_name): """Get full information about an interface given an interface name. @param interface_name: string name of interface (e.g. 'wlan0'). @return IwNetDev tuple. """ matching_interfaces = [iw_if for iw_if in self.list_interfaces() if iw_if.if_name == interface_name] if len(matching_interfaces) != 1: raise error.TestFail('Could not find interface named %s' % interface_name) return matching_interfaces[0] def get_link_value(self, interface, iw_link_key): """Get the value of a link property for |interface|. Checks the link using iw, and parses the result to return a link key. @param iw_link_key: string one of IW_LINK_KEY_* defined above. @param interface: string desired value of iw link property. @return string containing the corresponding link property value, None if there was a parsing error or the iw command failed. """ result = self._run('%s dev %s link' % (self._command_iw, interface), ignore_status=True) if result.exit_status: # When roaming, there is a period of time for mac80211 based drivers # when the driver is 'associated' with an SSID but not a particular # BSS. This causes iw to return an error code (-2) when attempting # to retrieve information specific to the BSS. This does not happen # in mwifiex drivers. return None actual_value = _get_all_link_keys(result.stdout).get(iw_link_key) if actual_value is not None: logging.info('Found iw link key %s with value %s.', iw_link_key, actual_value) return actual_value def get_station_dump(self, interface): """Gets information about connected peers. Returns information about the currently connected peers. When the host is in station mode, it returns a single entry, with information about the link to the AP it is currently connected to. If the host is in mesh or AP mode, it can return multiple entries, one for each connected station, or mesh peer. @param interface: string name of interface to get peer information from. @return a list of dictionaries with link information about each connected peer (ordered by peer mac address). """ result = self._run('%s dev %s station dump' % (self._command_iw, interface)) parts = re.split(r'^Station ', result.stdout, flags=re.MULTILINE)[1:] peer_list_raw = ['Station ' + x for x in parts] parsed_peer_info = [] for peer in peer_list_raw: peer_link_keys = _get_all_link_keys(peer) rssi_str = peer_link_keys.get(IW_LINK_KEY_SIGNAL, '0') rssi_int = int(rssi_str.split()[0]) tx_bitrate = peer_link_keys.get(IW_LINK_KEY_TX_BITRATE, '0') tx_failures = int(peer_link_keys.get(IW_LINK_KEY_TX_FAILURES, 0)) tx_packets = int(peer_link_keys.get(IW_LINK_KEY_TX_PACKETS, 0)) tx_retries = int(peer_link_keys.get(IW_LINK_KEY_TX_RETRIES, 0)) rx_bitrate = peer_link_keys.get(IW_LINK_KEY_RX_BITRATE, '0') rx_drops = int(peer_link_keys.get(IW_LINK_KEY_RX_DROPS, 0)) rx_packets = int(peer_link_keys.get(IW_LINK_KEY_RX_PACKETS, 0)) mac = _extract_bssid(link_information=peer, interface_name=interface, station_dump=True) # If any of these are missing, they will be None peer_info = {'rssi_int': rssi_int, 'rssi_str': rssi_str, 'tx_bitrate': tx_bitrate, 'tx_failures': tx_failures, 'tx_packets': tx_packets, 'tx_retries': tx_retries, 'rx_bitrate': rx_bitrate, 'rx_drops': rx_drops, 'rx_packets': rx_packets, 'mac': mac} # don't evaluate if tx_packets 0 if tx_packets: peer_info['tx_retry_rate'] = tx_retries / float(tx_packets) peer_info['tx_failure_rate'] = tx_failures / float(tx_packets) # don't evaluate if rx_packets is 0 if rx_packets: peer_info['rx_drop_rate'] = rx_drops / float(rx_packets) parsed_peer_info.append(peer_info) return sorted(parsed_peer_info, key=operator.itemgetter('mac')) def get_operating_mode(self, interface): """Gets the operating mode for |interface|. @param interface: string name of interface to get peer information about. @return string one of DEV_MODE_* defined above, or None if no mode is found, or if an unsupported mode is found. """ ret = self._run('%s dev %s info' % (self._command_iw, interface)) mode_regex = r'^\s*type (.*)$' match = re.search(mode_regex, ret.stdout, re.MULTILINE) if match: operating_mode = match.group(1) if operating_mode in SUPPORTED_DEV_MODES: return operating_mode logging.warning( 'Unsupported operating mode %s found for interface: %s. ' 'Supported modes: %s', operating_mode, interface, SUPPORTED_DEV_MODES) return None def get_radio_config(self, interface): """Gets the channel information of a specfic interface using iw. @param interface: string name of interface to get radio information from. @return dictionary containing the channel information. """ channel_config = {} ret = self._run('%s dev %s info' % (self._command_iw, interface)) channel_config_regex = (r'^\s*channel ([0-9]+) \(([0-9]+) MHz\), ' 'width: ([2,4,8]0) MHz, center1: ([0-9]+) MHz') match = re.search(channel_config_regex, ret.stdout, re.MULTILINE) if match: channel_config['number'] = int(match.group(1)) channel_config['freq'] = int(match.group(2)) channel_config['width'] = int(match.group(3)) channel_config['center1_freq'] = int(match.group(4)) return channel_config def ibss_join(self, interface, ssid, frequency): """ Join a WiFi interface to an IBSS. @param interface: string name of interface to join to the IBSS. @param ssid: string SSID of IBSS to join. @param frequency: int frequency of IBSS in Mhz. """ self._run('%s dev %s ibss join %s %d' % (self._command_iw, interface, ssid, frequency)) def ibss_leave(self, interface): """ Leave an IBSS. @param interface: string name of interface to remove from the IBSS. """ self._run('%s dev %s ibss leave' % (self._command_iw, interface)) def list_interfaces(self, desired_if_type=None): """List WiFi related interfaces on this system. @param desired_if_type: string type of interface to filter our returned list of interfaces for (e.g. 'managed'). @return list of IwNetDev tuples. """ # Parse output in the following format: # # $ adb shell iw dev # phy#0 # Unnamed/non-netdev interface # wdev 0x2 # addr aa:bb:cc:dd:ee:ff # type P2P-device # Interface wlan0 # ifindex 4 # wdev 0x1 # addr aa:bb:cc:dd:ee:ff # ssid Whatever # type managed output = self._run('%s dev' % self._command_iw).stdout interfaces = [] phy = None if_name = None if_type = None for line in output.splitlines(): m = re.match('phy#([0-9]+)', line) if m: phy = 'phy%d' % int(m.group(1)) if_name = None if_type = None continue if not phy: continue m = re.match('[\s]*Interface (.*)', line) if m: if_name = m.group(1) continue if not if_name: continue # Common values for type are 'managed', 'monitor', and 'IBSS'. m = re.match('[\s]*type ([a-zA-Z]+)', line) if m: if_type = m.group(1) interfaces.append(IwNetDev(phy=phy, if_name=if_name, if_type=if_type)) # One phy may have many interfaces, so don't reset it. if_name = None if desired_if_type: interfaces = [interface for interface in interfaces if interface.if_type == desired_if_type] return interfaces def list_phys(self): """ List WiFi PHYs on the given host. @return list of IwPhy tuples. """ output = self._run('%s list' % self._command_iw).stdout pending_phy_name = None current_band = None current_section = None all_phys = [] def add_pending_phy(): """Add the pending phy into |all_phys|.""" bands = tuple(IwBand(band.num, tuple(band.frequencies), dict(band.frequency_flags), tuple(band.mcs_indices)) for band in pending_phy_bands) new_phy = IwPhy(pending_phy_name, bands, tuple(pending_phy_modes), tuple(pending_phy_commands), tuple(pending_phy_features), pending_phy_max_scan_ssids, pending_phy_tx_antennas, pending_phy_rx_antennas, pending_phy_tx_antennas and pending_phy_rx_antennas, pending_phy_support_vht) all_phys.append(new_phy) for line in output.splitlines(): match_phy = re.search('Wiphy (.*)', line) if match_phy: if pending_phy_name: add_pending_phy() pending_phy_name = match_phy.group(1) pending_phy_bands = [] pending_phy_modes = [] pending_phy_commands = [] pending_phy_features = [] pending_phy_max_scan_ssids = None pending_phy_tx_antennas = 0 pending_phy_rx_antennas = 0 pending_phy_support_vht = False continue match_section = re.match('\s*(\w.*):\s*$', line) if match_section: current_section = match_section.group(1) match_band = re.match('Band (\d+)', current_section) if match_band: current_band = IwBand(num=int(match_band.group(1)), frequencies=[], frequency_flags={}, mcs_indices=[]) pending_phy_bands.append(current_band) continue # Check for max_scan_ssids. This isn't a section, but it # also isn't within a section. match_max_scan_ssids = re.match('\s*max # scan SSIDs: (\d+)', line) if match_max_scan_ssids and pending_phy_name: pending_phy_max_scan_ssids = int( match_max_scan_ssids.group(1)) continue if (current_section == 'Supported interface modes' and pending_phy_name): mode_match = re.search('\* (\w+)', line) if mode_match: pending_phy_modes.append(mode_match.group(1)) continue if current_section == 'Supported commands' and pending_phy_name: command_match = re.search('\* (\w+)', line) if command_match: pending_phy_commands.append(command_match.group(1)) continue if (current_section is not None and current_section.startswith('VHT Capabilities') and pending_phy_name): pending_phy_support_vht = True continue match_avail_antennas = re.match('\s*Available Antennas: TX (\S+)' ' RX (\S+)', line) if match_avail_antennas and pending_phy_name: pending_phy_tx_antennas = int( match_avail_antennas.group(1), 16) pending_phy_rx_antennas = int( match_avail_antennas.group(2), 16) continue match_device_support = re.match('\s*Device supports (.*)\.', line) if match_device_support and pending_phy_name: pending_phy_features.append(match_device_support.group(1)) continue if not all([current_band, pending_phy_name, line.startswith('\t')]): continue # E.g. # * 2412 MHz [1] (20.0 dBm) # * 2467 MHz [12] (20.0 dBm) (passive scan) # * 2472 MHz [13] (disabled) # * 5260 MHz [52] (19.0 dBm) (no IR, radar detection) match_chan_info = re.search( r'(?P\d+) MHz' r' (?P\[\d+\])' r'(?: \((?P[0-9.]+ dBm)\))?' r'(?: \((?P[a-zA-Z, ]+)\))?', line) if match_chan_info: frequency = int(match_chan_info.group('frequency')) current_band.frequencies.append(frequency) flags_string = match_chan_info.group('flags') if flags_string: current_band.frequency_flags[frequency] = frozenset( flags_string.split(',')) else: # Populate the dict with an empty set, to make # things uniform for client code. current_band.frequency_flags[frequency] = frozenset() continue # re_mcs needs to match something like: # HT TX/RX MCS rate indexes supported: 0-15, 32 if re.search('HT TX/RX MCS rate indexes supported: ', line): rate_string = line.split(':')[1].strip() for piece in rate_string.split(','): if piece.find('-') > 0: # Must be a range like ' 0-15' begin, end = piece.split('-') for index in range(int(begin), int(end) + 1): current_band.mcs_indices.append(index) else: # Must be a single rate like '32 ' current_band.mcs_indices.append(int(piece)) if pending_phy_name: add_pending_phy() return all_phys def remove_interface(self, interface, ignore_status=False): """ Remove a WiFi interface from a PHY. @param interface: string name of interface (e.g. mon0) @param ignore_status: boolean True iff we should ignore failures to remove the interface. """ self._run('%s dev %s del' % (self._command_iw, interface), ignore_status=ignore_status) def determine_security(self, supported_securities): """Determines security from the given list of supported securities. @param supported_securities: list of supported securities from scan """ if not supported_securities: security = SECURITY_OPEN elif len(supported_securities) == 1: security = supported_securities[0] else: security = SECURITY_MIXED return security def scan(self, interface, frequencies=(), ssids=()): """Performs a scan. @param interface: the interface to run the iw command against @param frequencies: list of int frequencies in Mhz to scan. @param ssids: list of string SSIDs to send probe requests for. @returns a list of IwBss namedtuples; None if the scan fails """ scan_result = self.timed_scan(interface, frequencies, ssids) if scan_result is None: return None return scan_result.bss_list def timed_scan(self, interface, frequencies=(), ssids=()): """Performs a timed scan. @param interface: the interface to run the iw command against @param frequencies: list of int frequencies in Mhz to scan. @param ssids: list of string SSIDs to send probe requests for. @returns a IwTimedScan namedtuple; None if the scan fails """ freq_param = '' if frequencies: freq_param = ' freq %s' % ' '.join(map(str, frequencies)) ssid_param = '' if ssids: ssid_param = ' ssid "%s"' % '" "'.join(ssids) iw_command = '%s dev %s scan%s%s' % (self._command_iw, interface, freq_param, ssid_param) command = IW_TIME_COMMAND_FORMAT % iw_command scan = self._run(command, ignore_status=True) if scan.exit_status != 0: # The device was busy logging.debug('scan exit_status: %d', scan.exit_status) return None if not scan.stdout: raise error.TestFail('Missing scan parse time') if scan.stdout.startswith(IW_TIME_COMMAND_OUTPUT_START): logging.debug('Empty scan result') bss_list = [] else: bss_list = self._parse_scan_results(scan.stdout) scan_time = self._parse_scan_time(scan.stdout) return IwTimedScan(scan_time, bss_list) def scan_dump(self, interface): """Dump the contents of the scan cache. Note that this does not trigger a scan. Instead, it returns the kernel's idea of what BSS's are currently visible. @param interface: the interface to run the iw command against @returns a list of IwBss namedtuples; None if the scan fails """ result = self._run('%s dev %s scan dump' % (self._command_iw, interface)) return self._parse_scan_results(result.stdout) def set_tx_power(self, interface, power): """ Set the transmission power for an interface. @param interface: string name of interface to set Tx power on. @param power: string power parameter. (e.g. 'auto'). """ self._run('%s dev %s set txpower %s' % (self._command_iw, interface, power)) def set_freq(self, interface, freq): """ Set the frequency for an interface. @param interface: string name of interface to set frequency on. @param freq: int frequency """ self._run('%s dev %s set freq %d' % (self._command_iw, interface, freq)) def set_regulatory_domain(self, domain_string): """ Set the regulatory domain of the current machine. Note that the regulatory change happens asynchronously to the exit of this function. @param domain_string: string regulatory domain name (e.g. 'US'). """ self._run('%s reg set %s' % (self._command_iw, domain_string)) def get_regulatory_domain(self, wiphy=None): """ Get the regulatory domain of the current machine. @param wiphy: string; if provided, check for the phy-specific domain, rather than the global one. @returns a string containing the 2-letter regulatory domain name (e.g. 'US'). """ cmd = self._command_iw if wiphy: cmd += ' phy ' + wiphy cmd += ' reg get' output = self._run(cmd).stdout m = re.search('^country (..):', output, re.MULTILINE) if not m: return None return m.group(1) def is_regulatory_self_managed(self): """ Determine if any WiFi device on the system manages its own regulatory info (NL80211_ATTR_WIPHY_SELF_MANAGED_REG). @returns True if self-managed, False otherwise. """ output = self._run('%s reg get' % self._command_iw).stdout m = re.search('^phy#.*\(self-managed\)', output, re.MULTILINE) return not m is None def wait_for_scan_result(self, interface, bsses=(), ssids=(), timeout_seconds=30, wait_for_all=False): """Returns a list of IWBSS objects for given list of bsses or ssids. This method will scan for a given timeout and return all of the networks that have a matching ssid or bss. If wait_for_all is true and all networks are not found within the given timeout an empty list will be returned. @param interface: which interface to run iw against @param bsses: a list of BSS strings @param ssids: a list of ssid strings @param timeout_seconds: the amount of time to wait in seconds @param wait_for_all: True to wait for all listed bsses or ssids; False to return if any of the networks were found @returns a list of IwBss collections that contain the given bss or ssid; if the scan is empty or returns an error code None is returned. """ logging.info('Performing a scan with a max timeout of %d seconds.', timeout_seconds) # If the in-progress scan takes more than 30 seconds to # complete it will most likely never complete; abort. # See crbug.com/309148 scan_results = list() try: scan_results = utils.poll_for_condition( condition=lambda: self.scan(interface), timeout=timeout_seconds, sleep_interval=5, # to allow in-progress scans to complete desc='Timed out getting IWBSSes that match desired') except utils.TimeoutError as e: pass if not scan_results: # empty list or None return None # get all IWBSSes from the scan that match any of the desired # ssids or bsses passed in matching_iwbsses = [iwbss for iwbss in scan_results if iwbss.ssid in ssids or iwbss.bss in bsses] if wait_for_all: found_bsses = [iwbss.bss for iwbss in matching_iwbsses] found_ssids = [iwbss.ssid for iwbss in matching_iwbsses] # if an expected bss or ssid was not found, and it was required # by the caller that all expected be found, return empty list if any(bss not in found_bsses for bss in bsses) or any( ssid not in found_ssids for ssid in ssids): return list() return list(matching_iwbsses) def set_antenna_bitmap(self, phy, tx_bitmap, rx_bitmap): """Set antenna chain mask on given phy (radio). This function will set the antennas allowed to use for TX and RX on the |phy| based on the |tx_bitmap| and |rx_bitmap|. This command is only allowed when the interfaces on the phy are down. @param phy: phy name @param tx_bitmap: bitmap of allowed antennas to use for TX @param rx_bitmap: bitmap of allowed antennas to use for RX """ command = '%s phy %s set antenna %d %d' % (self._command_iw, phy, tx_bitmap, rx_bitmap) self._run(command) def get_event_logger(self): """Create and return a IwEventLogger object. @returns a IwEventLogger object. """ local_file = IW_LOCAL_EVENT_LOG_FILE % (self._log_id) self._log_id += 1 return iw_event_logger.IwEventLogger(self._host, self._command_iw, local_file) def vht_supported(self): """Returns True if VHT is supported; False otherwise.""" result = self._run('%s list' % self._command_iw).stdout if 'VHT Capabilities' in result: return True return False def he_supported(self): """Returns True if HE (802.11ax) is supported; False otherwise.""" result = self._run('%s list' % self._command_iw).stdout if 'HE MAC Capabilities' in result: return True return False def frequency_supported(self, frequency): """Returns True if the given frequency is supported; False otherwise. @param frequency: int Wifi frequency to check if it is supported by DUT. """ phys = self.list_phys() for phy in phys: for band in phy.bands: if frequency in band.frequencies: return True return False def get_fragmentation_threshold(self, phy): """Returns the fragmentation threshold for |phy|. @param phy: phy name """ ret = self._run('%s phy %s info' % (self._command_iw, phy)) frag_regex = r'^\s+Fragmentation threshold:\s+([0-9]+)$' match = re.search(frag_regex, ret.stdout, re.MULTILINE) if match: return int(match.group(1)) return None def get_info(self, phy=None): """ Returns the output of 'iw phy info' for |phy|, or 'iw list' if no phy specified. @param phy: optional string giving the name of the phy @return string stdout of the command run """ if phy and phy not in [iw_phy.name for iw_phy in self.list_phys()]: logging.info('iw could not find phy %s', phy) return None if phy: out = self._run('%s phy %s info' % (self._command_iw, phy)).stdout else: out = self._run('%s list' % self._command_iw).stdout if 'Wiphy' in out: return out return None