#!/usr/bin/env python3
#
# Copyright 2021 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time

from acts import logger
from acts import signals
from acts import utils

# Seconds to sleep between successive polls of the PHY country codes.
TIME_TO_SLEEP_BETWEEN_RETRIES = 1
# Total seconds to keep polling before set_country_code() gives up.
TIME_TO_WAIT_FOR_COUNTRY_CODE = 10


class WlanControllerError(signals.ControllerError):
    """Raised when a wlan or netstack query made by WlanController fails."""
    pass


class WlanController:
    """Contains methods related to wlan core, to be used in FuchsiaDevice object"""

    def __init__(self, fuchsia_device):
        """Stores the device and creates a logger tagged with its ip.

        Args:
            fuchsia_device: the FuchsiaDevice object this controller
                operates on; its ip, wlan_lib, netstack_lib and
                regulatory_region_lib attributes are used.
        """
        self.device = fuchsia_device
        self.log = logger.create_tagged_trace_logger(
            'WlanController for FuchsiaDevice | %s' % self.device.ip)

    # TODO(70501): Wrap wlan_lib functions and setup from FuchsiaDevice here
    # (similar to how WlanPolicyController does it) to prevent FuchsiaDevice
    # from growing too large.
    def _configure_wlan(self):
        """Placeholder; currently does nothing (see TODO above)."""
        pass

    def _deconfigure_wlan(self):
        """Placeholder; currently does nothing (see TODO above)."""
        pass

    def get_wlan_client_interface_id(self):
        """ Returns the wlan interface id of the first found wlan client
        interface.

        Returns:
            The id of the first interface whose queried role is 'client'
            (compared case-insensitively), or None if there is none.

        Raises:
            WlanControllerError - failure to list or query wlan interfaces
        """
        # Retrieve wlan ifaces
        response = self.device.wlan_lib.wlanGetIfaceIdList()
        if response.get('error'):
            raise WlanControllerError('Failed to get WLAN iface ids: %s' %
                                      response['error'])

        # If iface has role 'client', return id
        iface_ids = response.get('result', [])
        for iface_id in iface_ids:
            query_response = self.device.wlan_lib.wlanQueryInterface(iface_id)
            if query_response.get('error'):
                raise WlanControllerError(
                    'Failed to query wlan iface id %s: %s' %
                    (iface_id, query_response['error']))

            if query_response['result'].get('role').lower() == 'client':
                return iface_id

        return None

    def get_wlan_interface_mac_addr_from_id(self, iface_id):
        """ Retrieves the mac address of a wlan iface, using the wlan iface
        id.

        Args:
            iface_id: int, wlan iface id

        Returns:
            string, mac address of wlan iface

        Raises:
            WlanControllerError - failure to query the wlan interface
        """
        query_response = self.device.wlan_lib.wlanQueryInterface(iface_id)
        if query_response.get('error'):
            raise WlanControllerError('Failed to query wlan iface id %s: %s' %
                                      (iface_id, query_response['error']))
        return utils.mac_address_list_to_str(
            query_response['result'].get('mac_addr'))

    def get_wlan_interface_name(self, mac_addr=None):
        """ Retrieves name (netstack) of wlan interface using the mac address. If
        mac address is not provided, returns the name of the first found wlan
        client (as opposed to AP) interface.

        Args:
            mac_addr: optional, string or list of decimal octets representing
                the mac addr of the wlan interface. e.g. "44:07:0b:50:c1:ef" or
                [68, 7, 11, 80, 193, 239]

        Returns:
            string, name of wlan interface, or None if no netstack interface
            has a matching mac address

        Raises:
            WlanControllerError - failure to query wlan interfaces, init
                netstack_lib, or list network interfaces
        """
        # Default to first found client wlan interface
        if not mac_addr:
            client_iface_id = self.get_wlan_client_interface_id()
            mac_addr = self.get_wlan_interface_mac_addr_from_id(
                client_iface_id)

        # Convert mac addr to list, for comparison
        if isinstance(mac_addr, str):
            mac_addr = utils.mac_address_str_to_list(mac_addr)

        err = self.device.netstack_lib.init().get('error')
        if err:
            raise WlanControllerError('Failed to init netstack_lib: %s' % err)

        # Retrieve net ifaces
        response = self.device.netstack_lib.netstackListInterfaces()
        if response.get('error'):
            raise WlanControllerError(
                'Failed to get network interfaces list: %s' %
                response['error'])

        # Find iface with matching mac addr, and return name
        for iface_info in response['result']:
            if iface_info['mac'] == mac_addr:
                return iface_info['name']
        return None

    def set_country_code(self, country_code):
        """Sets country code through the regulatory region service and waits
        for the code to be applied to WLAN PHY.

        Every PHY is polled until all of them report the requested code, or
        until TIME_TO_WAIT_FOR_COUNTRY_CODE seconds have elapsed.

        Args:
            country_code: string, the 2 character country code to set

        Raises:
            EnvironmentError - failure to get/set regulatory region
            ConnectionError - failure to query PHYs
        """
        self.log.info('Setting DUT country code to %s' % country_code)
        country_code_response = self.device.regulatory_region_lib.setRegion(
            country_code)
        if country_code_response.get('error'):
            raise EnvironmentError(
                'Failed to set country code (%s) on DUT. Error: %s' %
                (country_code, country_code_response['error']))

        self.log.info('Verifying DUT country code was correctly set to %s.' %
                      country_code)
        phy_ids_response = self.device.wlan_lib.wlanPhyIdList()
        if phy_ids_response.get('error'):
            # The message has a single placeholder, so only the error is
            # passed; the 1-tuple keeps formatting safe for any error value.
            raise ConnectionError('Failed to get phy ids from DUT. Error: %s' %
                                  (phy_ids_response['error'], ))

        # Use a monotonic clock so system clock adjustments cannot shorten or
        # lengthen the polling window.
        end_time = time.monotonic() + TIME_TO_WAIT_FOR_COUNTRY_CODE
        while time.monotonic() < end_time:
            for phy_id in phy_ids_response['result']:
                get_country_response = self.device.wlan_lib.wlanGetCountry(
                    phy_id)
                if get_country_response.get('error'):
                    raise ConnectionError(
                        'Failed to query PHY ID (%s) for country. Error: %s' %
                        (phy_id, get_country_response['error']))

                # The result is a sequence of character codes; turn it into
                # a string for comparison.
                set_code = ''.join(
                    chr(ascii_char)
                    for ascii_char in get_country_response['result'])
                if set_code != country_code:
                    self.log.debug(
                        'PHY (id: %s) has incorrect country code set. '
                        'Expected: %s, Got: %s' %
                        (phy_id, country_code, set_code))
                    # Stop checking the remaining PHYs and retry after a
                    # sleep.
                    break
            else:
                # for/else: no PHY mismatched, so the code has been applied.
                self.log.info('All PHYs have expected country code (%s)' %
                              country_code)
                break
            time.sleep(TIME_TO_SLEEP_BETWEEN_RETRIES)
        else:
            # while/else: the deadline passed without a fully matching poll.
            raise EnvironmentError('Failed to set DUT country code to %s.' %
                                   country_code)