1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import collections 6import logging 7import re 8import time 9 10from autotest_lib.client.common_lib import error 11from autotest_lib.client.common_lib import utils 12from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes 13 14 15# Used to represent stations we parse out of scan results. 16Station = collections.namedtuple('Station', 17 ['bssid', 'frequency', 'signal', 'ssid']) 18 19class WpaCliProxy(object): 20 """Interacts with a DUT through wpa_cli rather than shill.""" 21 22 SCANNING_INTERVAL_SECONDS = 5 23 POLLING_INTERVAL_SECONDS = 0.5 24 # From wpa_supplicant.c:wpa_supplicant_state_txt() 25 WPA_SUPPLICANT_ASSOCIATING_STATES = ( 26 'AUTHENTICATING', 27 'ASSOCIATING', 28 'ASSOCIATED', 29 '4WAY_HANDSHAKE', 30 'GROUP_HANDSHAKE') 31 WPA_SUPPLICANT_ASSOCIATED_STATES = ( 32 'COMPLETED',) 33 ANDROID_CMD_FORMAT = '/system/bin/wpa_cli IFNAME={0[ifname]} {0[cmd]}' 34 BRILLO_CMD_FORMAT = 'su system /system/bin/wpa_cli -i{0[ifname]} -p/data/misc/wifi/sockets {0[cmd]}' 35 CROS_CMD_FORMAT = ('su wpa -s /bin/bash ' 36 '-c "/usr/bin/wpa_cli -i {0[ifname]} {0[cmd]}"') 37 CAST_CMD_FORMAT = '/system/bin/wpa_cli -i {0[ifname]} {0[cmd]}' 38 39 40 def __init__(self, host, wifi_if): 41 self._host = host 42 self._wifi_if = wifi_if 43 self._created_networks = {} 44 # TODO(wiley) Hardcoding this IFNAME prefix makes some big assumptions. 45 # we'll need to discover this parameter as it becomes more 46 # generally useful. 47 if host.get_os_type() == 'android': 48 self._wpa_cli_cmd_format = self.ANDROID_CMD_FORMAT 49 elif host.get_os_type() == 'brillo': 50 self._wpa_cli_cmd_format = self.BRILLO_CMD_FORMAT 51 elif host.get_os_type() == 'cros': 52 self._wpa_cli_cmd_format = self.CROS_CMD_FORMAT 53 elif host.get_os_type() == 'cast_os': 54 self._wpa_cli_cmd_format = self.CAST_CMD_FORMAT 55 56 57 def _add_network(self, ssid): 58 """ 59 Add a wpa_supplicant network for ssid. 60 61 @param ssid string: name of network to add. 62 @return int network id of added network. 63 64 """ 65 add_result = self.run_wpa_cli_cmd('add_network', check_result=False) 66 network_id = int(add_result.stdout.splitlines()[-1]) 67 self.run_wpa_cli_cmd('set_network %d ssid \\"%s\\"' % 68 (network_id, ssid)) 69 self._created_networks[ssid] = network_id 70 logging.debug('Added network %s=%d', ssid, network_id) 71 return network_id 72 73 74 def run_wpa_cli_cmd(self, command, check_result=True): 75 """ 76 Run a wpa_cli command and optionally check the result. 77 78 Note: if you're using this function to do things like initiating scans, 79 consider initating those through Shill instead, to avoid collisions. 80 81 @param command string: suffix of a command to be prefixed with 82 an appropriate wpa_cli for this host. 83 @param check_result bool: True iff we want to check that the 84 command comes back with an 'OK' response. 85 @return result object returned by host.run. 86 87 """ 88 cmd = self._wpa_cli_cmd_format.format( 89 {'ifname' : self._wifi_if, 'cmd' : command}) 90 result = self._host.run(cmd) 91 if check_result and not result.stdout.strip().endswith('OK'): 92 raise error.TestFail('wpa_cli command failed: %s' % command) 93 94 return result 95 96 97 def _get_status_dict(self): 98 """ 99 Gets the status output for a WiFi interface. 100 101 Get the output of wpa_cli status. This summarizes what wpa_supplicant 102 is doing with respect to the WiFi interface. 103 104 Example output: 105 106 Using interface 'wlan0' 107 wpa_state=INACTIVE 108 p2p_device_address=32:76:6f:f2:a6:c4 109 address=30:76:6f:f2:a6:c4 110 111 @return dict of key/value pairs parsed from output using = as divider. 112 113 """ 114 status_result = self.run_wpa_cli_cmd('status', check_result=False) 115 return dict([line.strip().split('=', 1) 116 for line in status_result.stdout.splitlines() 117 if line.find('=') > 0]) 118 119 120 def _is_associating_or_associated(self): 121 """@return True if the DUT is assocating or associated with a BSS.""" 122 state = self._get_status_dict().get('wpa_state', None) 123 return state in (self.WPA_SUPPLICANT_ASSOCIATING_STATES + 124 self.WPA_SUPPLICANT_ASSOCIATED_STATES) 125 126 127 def _is_associated(self, ssid): 128 """ 129 Check if the DUT is associated to a given SSID. 130 131 @param ssid string: SSID of the network we're concerned about. 132 @return True if we're associated with the specified SSID. 133 134 """ 135 status_dict = self._get_status_dict() 136 return (status_dict.get('ssid', None) == ssid and 137 status_dict.get('wpa_state', None) in 138 self.WPA_SUPPLICANT_ASSOCIATED_STATES) 139 140 141 def _is_connected(self, ssid): 142 """ 143 Check that we're connected to |ssid| and have an IP address. 144 145 @param ssid string: SSID of the network we're concerned about. 146 @return True if we have an IP and we're associated with |ssid|. 147 148 """ 149 status_dict = self._get_status_dict() 150 return (status_dict.get('ssid', None) == ssid and 151 status_dict.get('ip_address', None)) 152 153 154 def clean_profiles(self): 155 """Remove state associated with past networks we've connected to.""" 156 # list_networks output looks like: 157 # Using interface 'wlan0'^M 158 # network id / ssid / bssid / flags^M 159 # 0 SimpleConnect_jstja_ch1 any [DISABLED]^M 160 # 1 SimpleConnect_gjji2_ch6 any [DISABLED]^M 161 # 2 SimpleConnect_xe9d1_ch11 any [DISABLED]^M 162 list_networks_result = self.run_wpa_cli_cmd( 163 'list_networks', check_result=False) 164 start_parsing = False 165 for line in list_networks_result.stdout.splitlines(): 166 if not start_parsing: 167 if line.startswith('network id'): 168 start_parsing = True 169 continue 170 171 network_id = int(line.split()[0]) 172 self.run_wpa_cli_cmd('remove_network %d' % network_id) 173 self._created_networks = {} 174 175 176 def create_profile(self, _): 177 """ 178 This is a no op, since we don't have profiles. 179 180 @param _ ignored. 181 182 """ 183 logging.info('Skipping create_profile on %s', self.__class__.__name__) 184 185 186 def pop_profile(self, _): 187 """ 188 This is a no op, since we don't have profiles. 189 190 @param _ ignored. 191 192 """ 193 logging.info('Skipping pop_profile on %s', self.__class__.__name__) 194 195 196 def push_profile(self, _): 197 """ 198 This is a no op, since we don't have profiles. 199 200 @param _ ignored. 201 202 """ 203 logging.info('Skipping push_profile on %s', self.__class__.__name__) 204 205 206 def remove_profile(self, _): 207 """ 208 This is a no op, since we don't have profiles. 209 210 @param _ ignored. 211 212 """ 213 logging.info('Skipping remove_profile on %s', self.__class__.__name__) 214 215 216 def init_test_network_state(self): 217 """Create a clean slate for tests with respect to remembered networks. 218 219 For wpa_cli hosts, this means removing all remembered networks. 220 221 @return True iff operation succeeded, False otherwise. 222 223 """ 224 self.clean_profiles() 225 return True 226 227 228 def connect_wifi(self, assoc_params): 229 """ 230 Connect to the WiFi network described by AssociationParameters. 231 232 @param assoc_params AssociationParameters object. 233 @return serialized AssociationResult object. 234 235 """ 236 logging.debug('connect_wifi()') 237 # Ouptut should look like: 238 # Using interface 'wlan0' 239 # 0 240 assoc_result = xmlrpc_datatypes.AssociationResult() 241 network_id = self._add_network(assoc_params.ssid) 242 if assoc_params.is_hidden: 243 self.run_wpa_cli_cmd('set_network %d %s %s' % 244 (network_id, 'scan_ssid', '1')) 245 246 sec_config = assoc_params.security_config 247 for field, value in sec_config.get_wpa_cli_properties().iteritems(): 248 self.run_wpa_cli_cmd('set_network %d %s %s' % 249 (network_id, field, value)) 250 self.run_wpa_cli_cmd('select_network %d' % network_id) 251 252 # Wait for an appropriate BSS to appear in scan results. 253 scan_results_pattern = '\t'.join(['([0-9a-f:]{17})', # BSSID 254 '([0-9]+)', # Frequency 255 '(-[0-9]+)', # Signal level 256 '(.*)', # Encryption types 257 '(.*)']) # SSID 258 last_scan_time = -1.0 259 start_time = time.time() 260 while time.time() - start_time < assoc_params.discovery_timeout: 261 assoc_result.discovery_time = time.time() - start_time 262 if self._is_associating_or_associated(): 263 # Internally, wpa_supplicant writes its scan_results response 264 # to a 4kb buffer. When there are many BSS's, the buffer fills 265 # up, and we'll never see the BSS we care about in some cases. 266 break 267 268 scan_result = self.run_wpa_cli_cmd('scan_results', 269 check_result=False) 270 found_stations = [] 271 for line in scan_result.stdout.strip().splitlines(): 272 match = re.match(scan_results_pattern, line) 273 if match is None: 274 continue 275 found_stations.append( 276 Station(bssid=match.group(1), frequency=match.group(2), 277 signal=match.group(3), ssid=match.group(5))) 278 logging.debug('Found stations: %r', 279 [station.ssid for station in found_stations]) 280 if [station for station in found_stations 281 if station.ssid == assoc_params.ssid]: 282 break 283 284 if time.time() - last_scan_time > self.SCANNING_INTERVAL_SECONDS: 285 # Sometimes this might fail with a FAIL-BUSY if the previous 286 # scan hasn't finished. 287 scan_result = self.run_wpa_cli_cmd('scan', check_result=False) 288 if scan_result.stdout.strip().endswith('OK'): 289 last_scan_time = time.time() 290 time.sleep(self.POLLING_INTERVAL_SECONDS) 291 else: 292 assoc_result.failure_reason = 'Discovery timed out' 293 return assoc_result.serialize() 294 295 # Wait on association to finish. 296 start_time = time.time() 297 success = utils.poll_for_condition( 298 condition=lambda: self._is_associated(assoc_params.ssid), 299 timeout=assoc_params.association_timeout, 300 sleep_interval=self.POLLING_INTERVAL_SECONDS, 301 desc='Wait on association to finish') 302 assoc_result.association_time = time.time() - start_time 303 if not success: 304 assoc_result.failure_reason = 'Association timed out' 305 return assoc_result.serialize() 306 307 # Then wait for ip configuration to finish. 308 start_time = time.time() 309 success = utils.poll_for_condition( 310 condition=lambda: self._is_connected(assoc_params.ssid), 311 timeout=assoc_params.configuration_timeout, 312 sleep_interval=self.POLLING_INTERVAL_SECONDS, 313 desc='Wait for ip configuration to finish') 314 assoc_result.configuration_time = time.time() - start_time 315 if not success: 316 assoc_result.failure_reason = 'DHCP negotiation timed out' 317 return assoc_result.serialize() 318 319 assoc_result.success = True 320 logging.info('Connected to %s', assoc_params.ssid) 321 return assoc_result.serialize() 322 323 324 def disconnect(self, ssid): 325 """ 326 Disconnect from a WiFi network named |ssid|. 327 328 @param ssid string: name of network to disable in wpa_supplicant. 329 330 """ 331 logging.debug('disconnect()') 332 if ssid not in self._created_networks: 333 return False 334 self.run_wpa_cli_cmd('disable_network %d' % 335 self._created_networks[ssid]) 336 return True 337 338 339 def delete_entries_for_ssid(self, ssid): 340 """Delete a profile entry. 341 342 @param ssid string of WiFi service for which to delete entries. 343 @return True on success, False otherwise. 344 """ 345 return self.disconnect(ssid) 346 347 348 def set_device_enabled(self, wifi_interface, enabled): 349 """Enable or disable the WiFi device. 350 351 @param wifi_interface: string name of interface being modified. 352 @param enabled: boolean; true if this device should be enabled, 353 false if this device should be disabled. 354 @return True if it worked; false, otherwise 355 356 """ 357 return False 358 359 360 def sync_time_to(self, epoch_seconds): 361 """ 362 Sync time on the DUT to |epoch_seconds| from the epoch. 363 364 @param epoch_seconds float: number of seconds since the epoch. 365 366 """ 367 # This will claim to fail, but will work anyway. 368 self._host.run('date -u %f' % epoch_seconds, ignore_status=True) 369