1# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. 2# Use of this source code is governed by a BSD-style license that can be 3# found in the LICENSE file. 4 5import dbus 6import logging 7import time 8 9from autotest_lib.client.common_lib import utils 10from autotest_lib.client.cros.networking import shill_proxy 11 12 13class WifiProxy(shill_proxy.ShillProxy): 14 """Wrapper around shill dbus interface used by wifi tests.""" 15 16 17 def set_logging_for_wifi_test(self): 18 """Set the logging in shill for a test of wifi technology. 19 20 Set the log level to |ShillProxy.LOG_LEVEL_FOR_TEST| and the log scopes 21 to the ones defined in |ShillProxy.LOG_SCOPES_FOR_TEST| for 22 |ShillProxy.TECHNOLOGY_WIFI|. 23 24 """ 25 self.set_logging_for_test(self.TECHNOLOGY_WIFI) 26 27 28 def remove_all_wifi_entries(self): 29 """Iterate over all pushed profiles and remove WiFi entries.""" 30 profiles = self.get_profiles() 31 for profile in profiles: 32 profile_properties = profile.GetProperties(utf8_strings=True) 33 entries = profile_properties[self.PROFILE_PROPERTY_ENTRIES] 34 for entry_id in entries: 35 try: 36 entry = profile.GetEntry(entry_id) 37 except dbus.exceptions.DBusException as e: 38 logging.error('Unable to retrieve entry %s:%r', entry_id, e) 39 continue 40 if entry[self.ENTRY_FIELD_TYPE] == 'wifi': 41 profile.DeleteEntry(entry_id) 42 43 44 def configure_wifi_service(self, ssid, security, security_parameters=None, 45 save_credentials=True, station_type=None, 46 hidden_network=False, guid=None, 47 autoconnect=None): 48 """Configure a WiFi service. 49 50 @param ssid string name of network to connect to. 51 @param security string type of security used in network (e.g. psk) 52 @param security_parameters dict of service property/value pairs that 53 make up the credentials and settings for the given security 54 type (e.g. the passphrase for psk security). 55 @param save_credentials bool True if we should save EAP credentials. 56 @param station_type string one of SUPPORTED_WIFI_STATION_TYPES. 57 @param hidden_network bool True when the SSID is not broadcasted. 58 @param guid string unique identifier for network. 59 @param autoconnect bool or None. None indicates that this should not 60 be set one way or the other, while a boolean indicates a desired 61 value. 62 63 """ 64 # |mode| is derived from the station type we're attempting to join. It 65 # does not refer to the 802.11x (802.11a/b/g/n) type. It refers to a 66 # shill connection mode. 67 mode = self.SUPPORTED_WIFI_STATION_TYPES[station_type] 68 69 if security_parameters is None: 70 security_parameters = {} 71 72 config_params = {self.SERVICE_PROPERTY_TYPE: 'wifi', 73 self.SERVICE_PROPERTY_HIDDEN: hidden_network, 74 self.SERVICE_PROPERTY_SSID: ssid, 75 self.SERVICE_PROPERTY_SECURITY_CLASS: security, 76 self.SERVICE_PROPERTY_MODE: mode} 77 if autoconnect is not None: 78 config_params[self.SERVICE_PROPERTY_AUTOCONNECT] = autoconnect 79 config_params.update(security_parameters) 80 if guid is not None: 81 config_params[self.SERVICE_PROPERTY_GUID] = guid 82 try: 83 self.configure_service(config_params) 84 except dbus.exceptions.DBusException as e: 85 logging.error('Caught an error while configuring a WiFi ' 86 'service: %r', e) 87 return False 88 89 logging.info('Configured service: %s', ssid) 90 return True 91 92 93 def connect_to_wifi_network(self, 94 ssid, 95 security, 96 security_parameters, 97 save_credentials, 98 station_type=None, 99 hidden_network=False, 100 guid=None, 101 autoconnect=None, 102 discovery_timeout_seconds=15, 103 association_timeout_seconds=15, 104 configuration_timeout_seconds=15): 105 """ 106 Connect to a WiFi network with the given association parameters. 107 108 @param ssid string name of network to connect to. 109 @param security string type of security used in network (e.g. psk) 110 @param security_parameters dict of service property/value pairs that 111 make up the credentials and settings for the given security 112 type (e.g. the passphrase for psk security). 113 @param save_credentials bool True if we should save EAP credentials. 114 @param station_type string one of SUPPORTED_WIFI_STATION_TYPES. 115 @param hidden_network bool True when the SSID is not broadcasted. 116 @param guid string unique identifier for network. 117 @param discovery_timeout_seconds float timeout for service discovery. 118 @param association_timeout_seconds float timeout for service 119 association. 120 @param configuration_timeout_seconds float timeout for DHCP 121 negotiations. 122 @param autoconnect: bool or None. None indicates that this should not 123 be set one way or the other, while a boolean indicates a desired 124 value. 125 @return (successful, discovery_time, association_time, 126 configuration_time, reason) 127 where successful is True iff the operation succeeded, *_time is 128 the time spent waiting for each transition, and reason is a string 129 which may contain a meaningful description of failures. 130 131 """ 132 logging.info('Attempting to connect to %s', ssid) 133 start_time = time.time() 134 discovery_time = -1.0 135 association_time = -1.0 136 configuration_time = -1.0 137 if station_type not in self.SUPPORTED_WIFI_STATION_TYPES: 138 return (False, discovery_time, association_time, 139 configuration_time, 140 'FAIL(Invalid station type specified.)') 141 142 # |mode| is derived from the station type we're attempting to join. It 143 # does not refer to the 802.11x (802.11a/b/g/n) type. It refers to a 144 # shill connection mode. 145 mode = self.SUPPORTED_WIFI_STATION_TYPES[station_type] 146 147 if hidden_network: 148 logging.info('Configuring %s as a hidden network.', ssid) 149 if not self.configure_wifi_service( 150 ssid, security, save_credentials=save_credentials, 151 station_type=station_type, hidden_network=True, 152 autoconnect=autoconnect): 153 return (False, discovery_time, association_time, 154 configuration_time, 155 'FAIL(Failed to configure hidden SSID)') 156 157 logging.info('Configured hidden service: %s', ssid) 158 159 160 logging.info('Discovering...') 161 discovery_params = {self.SERVICE_PROPERTY_TYPE: 'wifi', 162 self.SERVICE_PROPERTY_NAME: ssid, 163 self.SERVICE_PROPERTY_SECURITY_CLASS: security, 164 self.SERVICE_PROPERTY_MODE: mode} 165 while time.time() - start_time < discovery_timeout_seconds: 166 discovery_time = time.time() - start_time 167 service_object = self.find_matching_service(discovery_params) 168 if service_object: 169 try: 170 service_properties = service_object.GetProperties( 171 utf8_strings=True) 172 except dbus.exceptions.DBusException: 173 # This usually means the service handle has become invalid. 174 # Which is sort of like not getting a handle back from 175 # find_matching_service in the first place. 176 continue 177 strength = self.dbus2primitive( 178 service_properties[self.SERVICE_PROPERTY_STRENGTH]) 179 if strength > 0: 180 logging.info('Discovered service: %s. Strength: %r.', 181 ssid, strength) 182 break 183 184 # This is spammy, but shill handles that for us. 185 self.manager.RequestScan('wifi') 186 time.sleep(self.POLLING_INTERVAL_SECONDS) 187 else: 188 return (False, discovery_time, association_time, 189 configuration_time, 'FAIL(Discovery timed out)') 190 191 # At this point, we know |service| is in the service list. Attempt 192 # to connect it, and watch the states roll by. 193 logging.info('Connecting...') 194 try: 195 for service_property, value in security_parameters.iteritems(): 196 service_object.SetProperty(service_property, value) 197 if guid is not None: 198 service_object.SetProperty(self.SERVICE_PROPERTY_GUID, guid) 199 if autoconnect is not None: 200 service_object.SetProperty(self.SERVICE_PROPERTY_AUTOCONNECT, 201 autoconnect) 202 service_object.Connect() 203 logging.info('Called connect on service') 204 except dbus.exceptions.DBusException, e: 205 logging.error('Caught an error while trying to connect: %s', 206 e.get_dbus_message()) 207 return (False, discovery_time, association_time, 208 configuration_time, 'FAIL(Failed to call connect)') 209 210 logging.info('Associating...') 211 result = self.wait_for_property_in( 212 service_object, 213 self.SERVICE_PROPERTY_STATE, 214 self.SERVICE_CONNECTED_STATES + ['configuration'], 215 association_timeout_seconds) 216 (successful, _, association_time) = result 217 if not successful: 218 return (False, discovery_time, association_time, 219 configuration_time, 'FAIL(Association timed out)') 220 221 logging.info('Associated with service: %s', ssid) 222 223 logging.info('Configuring...') 224 result = self.wait_for_property_in( 225 service_object, 226 self.SERVICE_PROPERTY_STATE, 227 self.SERVICE_CONNECTED_STATES, 228 configuration_timeout_seconds) 229 (successful, _, configuration_time) = result 230 if not successful: 231 return (False, discovery_time, association_time, 232 configuration_time, 'FAIL(Configuration timed out)') 233 234 logging.info('Configured service: %s', ssid) 235 236 # Great success! 237 logging.info('Connected to WiFi service.') 238 return (True, discovery_time, association_time, configuration_time, 239 'SUCCESS(Connection successful)') 240 241 242 def disconnect_from_wifi_network(self, ssid, timeout=None): 243 """Disconnect from the specified WiFi network. 244 245 Method will succeed if it observes the specified network in the idle 246 state after calling Disconnect. 247 248 @param ssid string name of network to disconnect. 249 @param timeout float number of seconds to wait for idle. 250 @return tuple(success, duration, reason) where: 251 success is a bool (True on success). 252 duration is a float number of seconds the operation took. 253 reason is a string containing an informative error on failure. 254 255 """ 256 if timeout is None: 257 timeout = self.SERVICE_DISCONNECT_TIMEOUT 258 service_description = {self.SERVICE_PROPERTY_TYPE: 'wifi', 259 self.SERVICE_PROPERTY_NAME: ssid} 260 service = self.find_matching_service(service_description) 261 if service is None: 262 return (False, 263 0.0, 264 'Failed to disconnect from %s, service not found.' % ssid) 265 266 service.Disconnect() 267 result = self.wait_for_property_in(service, 268 self.SERVICE_PROPERTY_STATE, 269 ('idle',), 270 timeout) 271 (successful, final_state, duration) = result 272 message = 'Success.' 273 if not successful: 274 message = ('Failed to disconnect from %s, ' 275 'timed out in state: %s.' % (ssid, final_state)) 276 return (successful, duration, message) 277 278 279 def configure_bgscan(self, interface, method=None, short_interval=None, 280 long_interval=None, signal=None): 281 """Configures bgscan parameters for shill and wpa_supplicant. 282 283 @param interface string name of interface to configure (e.g. 'mlan0'). 284 @param method string bgscan method (e.g. 'none'). 285 @param short_interval int short scanning interval. 286 @param long_interval int normal scanning interval. 287 @param signal int signal threshold. 288 289 """ 290 device = self.find_object('Device', {'Name': interface}) 291 if device is None: 292 logging.error('No device found with name: %s', interface) 293 return False 294 295 attributes = {'ScanInterval': (dbus.UInt16, long_interval), 296 'BgscanMethod': (dbus.String, method), 297 'BgscanShortInterval': (dbus.UInt16, short_interval), 298 'BgscanSignalThreshold': (dbus.Int32, signal)} 299 for k, (type_cast, value) in attributes.iteritems(): 300 if value is None: 301 continue 302 303 # 'default' is defined in: 304 # client/common_lib/cros/network/xmlrpc_datatypes.py 305 # but we don't have access to that file here. 306 if value == 'default': 307 device.ClearProperty(k) 308 else: 309 device.SetProperty(k, type_cast(value)) 310 return True 311 312 313 def get_active_wifi_SSIDs(self): 314 """@return list of string SSIDs with at least one BSS we've scanned.""" 315 properties = self.manager.GetProperties(utf8_strings=True) 316 services = [self.get_dbus_object(self.DBUS_TYPE_SERVICE, path) 317 for path in properties[self.MANAGER_PROPERTY_SERVICES]] 318 wifi_services = [] 319 for service in services: 320 try: 321 service_properties = self.dbus2primitive(service.GetProperties( 322 utf8_strings=True)) 323 except dbus.exceptions.DBusException: 324 pass # Probably the service disappeared before GetProperties(). 325 logging.debug('Considering service with properties: %r', 326 service_properties) 327 service_type = service_properties[self.SERVICE_PROPERTY_TYPE] 328 strength = service_properties[self.SERVICE_PROPERTY_STRENGTH] 329 if service_type == 'wifi' and strength > 0: 330 # Note that this may cause terrible things if the SSID 331 # is not a valid ASCII string. 332 ssid = service_properties[self.SERVICE_PROPERTY_HEX_SSID] 333 logging.info('Found active WiFi service: %s', ssid) 334 wifi_services.append(ssid.decode('hex')) 335 return wifi_services 336 337 338 def wait_for_service_states(self, ssid, states, timeout_seconds): 339 """Wait for a service (ssid) to achieve one of a number of states. 340 341 @param ssid string name of network for whose state we're waiting. 342 @param states tuple states for which to wait. 343 @param timeout_seconds seconds to wait for property to be achieved 344 @return tuple(successful, final_value, duration) 345 where successful is True iff we saw one of |states|, final_value 346 is the final state we saw, and duration is how long we waited to 347 see that value. 348 349 """ 350 discovery_params = {self.SERVICE_PROPERTY_TYPE: 'wifi', 351 self.SERVICE_PROPERTY_NAME: ssid} 352 start_time = time.time() 353 try: 354 # Find a matching service in any state (only_visible=False) to 355 # make it possible to detect the state of services that are not 356 # visible because they're not in a connected state. 357 service_object = utils.poll_for_condition( 358 condition=lambda: self.find_matching_service( 359 discovery_params, only_visible=False), 360 timeout=timeout_seconds, 361 sleep_interval=self.POLLING_INTERVAL_SECONDS, 362 desc='Find a matching service to the discovery params') 363 364 return self.wait_for_property_in( 365 service_object, 366 self.SERVICE_PROPERTY_STATE, 367 states, 368 timeout_seconds - (time.time() - start_time)) 369 370 # poll_for_condition timed out 371 except utils.TimeoutError: 372 logging.error('Timed out waiting for %s states', ssid) 373 return False, 'unknown', timeout_seconds 374