# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import collections
import logging
import re
import time

from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib.cros.network import xmlrpc_datatypes


# Used to represent stations we parse out of scan results.
Station = collections.namedtuple('Station',
                                 ['bssid', 'frequency', 'signal', 'ssid'])

class WpaCliProxy(object):
    """Interacts with a DUT through wpa_cli rather than shill."""

    SCANNING_INTERVAL_SECONDS = 5
    POLLING_INTERVAL_SECONDS = 0.5
    # From wpa_supplicant.c:wpa_supplicant_state_txt()
    WPA_SUPPLICANT_ASSOCIATING_STATES = (
            'AUTHENTICATING',
            'ASSOCIATING',
            'ASSOCIATED',
            '4WAY_HANDSHAKE',
            'GROUP_HANDSHAKE')
    WPA_SUPPLICANT_ASSOCIATED_STATES = (
            'COMPLETED',)
    ANDROID_CMD_FORMAT = '/system/bin/wpa_cli IFNAME={0[ifname]} {0[cmd]}'
    BRILLO_CMD_FORMAT = ('su system /system/bin/wpa_cli '
                         '-i{0[ifname]} -p/data/misc/wifi/sockets {0[cmd]}')
    CROS_CMD_FORMAT = 'su wpa -s /bin/bash -c "/usr/bin/wpa_cli {0[cmd]}"'
    CAST_CMD_FORMAT = '/system/bin/wpa_cli {0[cmd]}'

    # Maps the value returned by host.get_os_type() to the wpa_cli command
    # template used on that kind of host.
    _CMD_FORMAT_BY_OS_TYPE = {
            'android': ANDROID_CMD_FORMAT,
            'brillo': BRILLO_CMD_FORMAT,
            'cros': CROS_CMD_FORMAT,
            'cast_os': CAST_CMD_FORMAT,
    }

    # One line of `wpa_cli scan_results` output: tab separated fields.
    _SCAN_RESULT_RE = re.compile(
            '\t'.join(['([0-9a-f:]{17})',  # BSSID
                       '([0-9]+)',  # Frequency
                       '(-[0-9]+)',  # Signal level
                       '(.*)',  # Encryption types
                       '(.*)']))  # SSID


    def __init__(self, host, wifi_if):
        """
        Construct a WpaCliProxy.

        @param host host object: must provide get_os_type() and run().
        @param wifi_if string: name of the WiFi interface, substituted into
                command formats that reference {0[ifname]}.

        """
        self._host = host
        self._wifi_if = wifi_if
        # Maps SSID -> wpa_supplicant network id for networks we added.
        self._created_networks = {}
        # TODO(wiley) Hardcoding this IFNAME prefix makes some big assumptions.
        #             we'll need to discover this parameter as it becomes more
        #             generally useful.
        os_type = host.get_os_type()
        # None for an unrecognized OS type; run_wpa_cli_cmd() reports that
        # case explicitly the first time a command is attempted.
        self._wpa_cli_cmd_format = self._CMD_FORMAT_BY_OS_TYPE.get(os_type)
        if self._wpa_cli_cmd_format is None:
            logging.warning('No wpa_cli command format for OS type %r',
                            os_type)


    def _add_network(self, ssid):
        """
        Add a wpa_supplicant network for ssid.

        @param ssid string: name of network to add.
        @return int network id of added network.

        """
        # Output should look like:
        #   Using interface 'wlan0'
        #   0
        # so the new network id is on the last line.  int() raises
        # ValueError if that line is not a number.
        add_result = self.run_wpa_cli_cmd('add_network', check_result=False)
        network_id = int(add_result.stdout.splitlines()[-1])
        self.run_wpa_cli_cmd('set_network %d ssid \\"%s\\"' %
                             (network_id, ssid))
        self._created_networks[ssid] = network_id
        logging.debug('Added network %s=%d', ssid, network_id)
        return network_id


    def run_wpa_cli_cmd(self, command, check_result=True):
        """
        Run a wpa_cli command and optionally check the result.

        @param command string: suffix of a command to be prefixed with
                an appropriate wpa_cli for this host.
        @param check_result bool: True iff we want to check that the
                command comes back with an 'OK' response.
        @return result object returned by host.run.
        @raises error.TestFail if no command format is known for this host,
                or if |check_result| is set and the output does not end
                with 'OK'.

        """
        if self._wpa_cli_cmd_format is None:
            raise error.TestFail(
                    'Cannot run wpa_cli command %r: unsupported host OS type' %
                    command)

        cmd = self._wpa_cli_cmd_format.format(
                {'ifname' : self._wifi_if, 'cmd' : command})
        result = self._host.run(cmd)
        if check_result and not result.stdout.strip().endswith('OK'):
            raise error.TestFail('wpa_cli command failed: %s' % command)

        return result


    def _get_status_dict(self):
        """
        Gets the status output for a WiFi interface.

        Get the output of wpa_cli status.  This summarizes what wpa_supplicant
        is doing with respect to the WiFi interface.

        Example output:

            Using interface 'wlan0'
            wpa_state=INACTIVE
            p2p_device_address=32:76:6f:f2:a6:c4
            address=30:76:6f:f2:a6:c4

        @return dict of key/value pairs parsed from output using = as divider.

        """
        status_result = self.run_wpa_cli_cmd('status', check_result=False)
        # Only lines with an '=' past the first column are key/value pairs;
        # this skips the "Using interface" banner.
        return dict(line.strip().split('=', 1)
                    for line in status_result.stdout.splitlines()
                    if line.find('=') > 0)


    def _is_associating_or_associated(self):
        """@return True if the DUT is associating or associated with a BSS."""
        state = self._get_status_dict().get('wpa_state', None)
        return state in (self.WPA_SUPPLICANT_ASSOCIATING_STATES +
                         self.WPA_SUPPLICANT_ASSOCIATED_STATES)


    def _is_associated(self, ssid):
        """
        Check if the DUT is associated to a given SSID.

        @param ssid string: SSID of the network we're concerned about.
        @return True if we're associated with the specified SSID.

        """
        status_dict = self._get_status_dict()
        return (status_dict.get('ssid', None) == ssid and
                status_dict.get('wpa_state', None) in
                        self.WPA_SUPPLICANT_ASSOCIATED_STATES)


    def _is_connected(self, ssid):
        """
        Check that we're connected to |ssid| and have an IP address.

        @param ssid string: SSID of the network we're concerned about.
        @return True if we have an IP and we're associated with |ssid|.

        """
        status_dict = self._get_status_dict()
        # bool() so callers get the documented boolean rather than the raw
        # ip_address string (or None).
        return bool(status_dict.get('ssid', None) == ssid and
                    status_dict.get('ip_address', None))


    def _wait_until(self, value_check, timeout_seconds):
        """
        Call a function repeatedly until we time out.

        Call value_check() every POLLING_INTERVAL_SECONDS seconds
        until |timeout_seconds| have passed.  Return whether
        value_check() returned a True value and the time we spent in this
        function.

        @param value_check callable: takes no arguments; polling stops as
                soon as it returns a true value.
        @param timeout_seconds numeric: number of seconds to wait.
        @return a tuple (success, duration_seconds) where success is a boolean
                and duration is a float.

        """
        start_time = time.time()
        while time.time() - start_time < timeout_seconds:
            # Sampled before the check runs, so a successful result reports
            # the time elapsed up to the start of the successful check.
            duration = time.time() - start_time
            if value_check():
                return (True, duration)

            time.sleep(self.POLLING_INTERVAL_SECONDS)
        duration = time.time() - start_time
        return (False, duration)


    def _parse_scan_results(self, scan_output):
        """
        Parse the output of wpa_cli scan_results.

        @param scan_output string: stdout of a scan_results command.
        @return list of Station tuples, one per line that looks like a
                scan result; other lines (e.g. headers) are ignored.

        """
        stations = []
        for line in scan_output.strip().splitlines():
            match = self._SCAN_RESULT_RE.match(line)
            if match is None:
                continue
            stations.append(
                    Station(bssid=match.group(1), frequency=match.group(2),
                            signal=match.group(3), ssid=match.group(5)))
        return stations


    def clean_profiles(self):
        """Remove state associated with past networks we've connected to."""
        # list_networks output looks like:
        # Using interface 'wlan0'^M
        # network id / ssid / bssid / flags^M
        # 0    SimpleConnect_jstja_ch1 any     [DISABLED]^M
        # 1    SimpleConnect_gjji2_ch6 any     [DISABLED]^M
        # 2    SimpleConnect_xe9d1_ch11        any     [DISABLED]^M
        list_networks_result = self.run_wpa_cli_cmd(
                'list_networks', check_result=False)
        seen_header = False
        for line in list_networks_result.stdout.splitlines():
            if not seen_header:
                # Everything up to and including the column header is noise.
                seen_header = line.startswith('network id')
                continue

            fields = line.split()
            if not fields:
                # Tolerate blank lines instead of failing on fields[0].
                continue

            self.run_wpa_cli_cmd('remove_network %d' % int(fields[0]))
        self._created_networks = {}


    def create_profile(self, _):
        """
        This is a no op, since we don't have profiles.

        @param _ ignored.

        """
        logging.info('Skipping create_profile on %s', self.__class__.__name__)


    def pop_profile(self, _):
        """
        This is a no op, since we don't have profiles.

        @param _ ignored.

        """
        logging.info('Skipping pop_profile on %s', self.__class__.__name__)


    def push_profile(self, _):
        """
        This is a no op, since we don't have profiles.

        @param _ ignored.

        """
        logging.info('Skipping push_profile on %s', self.__class__.__name__)


    def remove_profile(self, _):
        """
        This is a no op, since we don't have profiles.

        @param _ ignored.

        """
        logging.info('Skipping remove_profile on %s', self.__class__.__name__)


    def init_test_network_state(self):
        """Create a clean slate for tests with respect to remembered networks.

        For wpa_cli hosts, this means removing all remembered networks.

        @return True iff operation succeeded, False otherwise.

        """
        self.clean_profiles()
        return True


    def connect_wifi(self, assoc_params):
        """
        Connect to the WiFi network described by AssociationParameters.

        Adds and selects a wpa_supplicant network for the SSID, then waits in
        turn for discovery, association and IP configuration, each bounded by
        the matching timeout in |assoc_params|.  On a timeout the result's
        failure_reason is set and the result is returned early.

        @param assoc_params AssociationParameters object.
        @return serialized AssociationResult object.

        """
        logging.debug('connect_wifi()')
        assoc_result = xmlrpc_datatypes.AssociationResult()
        network_id = self._add_network(assoc_params.ssid)
        if assoc_params.is_hidden:
            self.run_wpa_cli_cmd('set_network %d %s %s' %
                                 (network_id, 'scan_ssid', '1'))

        sec_config = assoc_params.security_config
        # items() rather than iteritems() so this works on Python 2 and 3.
        for field, value in sec_config.get_wpa_cli_properties().items():
            self.run_wpa_cli_cmd('set_network %d %s %s' %
                                 (network_id, field, value))
        self.run_wpa_cli_cmd('select_network %d' % network_id)

        # Wait for an appropriate BSS to appear in scan results.
        last_scan_time = -1.0
        start_time = time.time()
        while time.time() - start_time < assoc_params.discovery_timeout:
            assoc_result.discovery_time = time.time() - start_time
            if self._is_associating_or_associated():
                # Internally, wpa_supplicant writes its scan_results response
                # to a 4kb buffer.  When there are many BSS's, the buffer fills
                # up, and we'll never see the BSS we care about in some cases.
                break

            scan_result = self.run_wpa_cli_cmd('scan_results',
                                               check_result=False)
            found_stations = self._parse_scan_results(scan_result.stdout)
            logging.debug('Found stations: %r',
                          [station.ssid for station in found_stations])
            if any(station.ssid == assoc_params.ssid
                   for station in found_stations):
                break

            if time.time() - last_scan_time > self.SCANNING_INTERVAL_SECONDS:
                # Sometimes this might fail with a FAIL-BUSY if the previous
                # scan hasn't finished.
                scan_result = self.run_wpa_cli_cmd('scan', check_result=False)
                if scan_result.stdout.strip().endswith('OK'):
                    last_scan_time = time.time()
            time.sleep(self.POLLING_INTERVAL_SECONDS)
        else:
            # The loop ran out of time without ever hitting a break.
            assoc_result.failure_reason = 'Discovery timed out'
            return assoc_result.serialize()

        # Wait on association to finish.
        success, assoc_result.association_time = self._wait_until(
                lambda: self._is_associated(assoc_params.ssid),
                assoc_params.association_timeout)
        if not success:
            assoc_result.failure_reason = 'Association timed out'
            return assoc_result.serialize()

        # Then wait for ip configuration to finish.
        success, assoc_result.configuration_time = self._wait_until(
                lambda: self._is_connected(assoc_params.ssid),
                assoc_params.configuration_timeout)
        if not success:
            assoc_result.failure_reason = 'DHCP negotiation timed out'
            return assoc_result.serialize()

        assoc_result.success = True
        logging.info('Connected to %s', assoc_params.ssid)
        return assoc_result.serialize()


    def disconnect(self, ssid):
        """
        Disconnect from a WiFi network named |ssid|.

        @param ssid string: name of network to disable in wpa_supplicant.
        @return True if the network was disabled, False if |ssid| is not a
                network this proxy added.

        """
        logging.debug('disconnect()')
        if ssid not in self._created_networks:
            return False
        self.run_wpa_cli_cmd('disable_network %d' %
                             self._created_networks[ssid])
        return True


    def delete_entries_for_ssid(self, ssid):
        """Delete a profile entry.

        Implemented by disabling the network via disconnect().

        @param ssid string of WiFi service for which to delete entries.
        @return True on success, False otherwise.
        """
        return self.disconnect(ssid)


    def set_device_enabled(self, wifi_interface, enabled):
        """Enable or disable the WiFi device.

        Not implemented for wpa_cli hosts: always reports failure.

        @param wifi_interface: string name of interface being modified.
        @param enabled: boolean; true if this device should be enabled,
                false if this device should be disabled.
        @return True if it worked; false, otherwise

        """
        return False


    def sync_time_to(self, epoch_seconds):
        """
        Sync time on the DUT to |epoch_seconds| from the epoch.

        @param epoch_seconds float: number of seconds since the epoch.

        """
        # This will claim to fail, but will work anyway.
        self._host.run('date -u %f' % epoch_seconds, ignore_status=True)