# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import dbus import logging import subprocess import time from autotest_lib.client.cros.networking import shill_proxy class WifiProxy(shill_proxy.ShillProxy): """Wrapper around shill dbus interface used by wifi tests.""" def set_logging_for_wifi_test(self): """Set the logging in shill for a test of wifi technology. Set the log level to |ShillProxy.LOG_LEVEL_FOR_TEST| and the log scopes to the ones defined in |ShillProxy.LOG_SCOPES_FOR_TEST| for |ShillProxy.TECHNOLOGY_WIFI|. """ self.set_logging_for_test(self.TECHNOLOGY_WIFI) def remove_all_wifi_entries(self): """Iterate over all pushed profiles and remove WiFi entries.""" profiles = self.get_profiles() for profile in profiles: profile_properties = profile.GetProperties(utf8_strings=True) entries = profile_properties[self.PROFILE_PROPERTY_ENTRIES] for entry_id in entries: try: entry = profile.GetEntry(entry_id) except dbus.exceptions.DBusException as e: logging.error('Unable to retrieve entry %s', entry_id) continue if entry[self.ENTRY_FIELD_TYPE] == 'wifi': profile.DeleteEntry(entry_id) def configure_wifi_service(self, ssid, security, security_parameters={}, save_credentials=True, station_type=None, hidden_network=False, guid=None, autoconnect=None): """Configure a WiFi service. @param ssid string name of network to connect to. @param security string type of security used in network (e.g. psk) @param security_parameters dict of service property/value pairs that make up the credentials and settings for the given security type (e.g. the passphrase for psk security). @param save_credentials bool True if we should save EAP credentials. @param station_type string one of SUPPORTED_WIFI_STATION_TYPES. @param hidden_network bool True when the SSID is not broadcasted. @param guid string unique identifier for network. @param autoconnect bool or None. None indicates that this should not be set one way or the other, while a boolean indicates a desired value. """ # |mode| is derived from the station type we're attempting to join. It # does not refer to the 802.11x (802.11a/b/g/n) type. It refers to a # shill connection mode. mode = self.SUPPORTED_WIFI_STATION_TYPES[station_type] config_params = {self.SERVICE_PROPERTY_TYPE: 'wifi', self.SERVICE_PROPERTY_HIDDEN: hidden_network, self.SERVICE_PROPERTY_SSID: ssid, self.SERVICE_PROPERTY_SECURITY_CLASS: security, self.SERVICE_PROPERTY_MODE: mode} if autoconnect is not None: config_params[self.SERVICE_PROPERTY_AUTOCONNECT] = autoconnect config_params.update(security_parameters) if guid is not None: config_params[self.SERVICE_PROPERTY_GUID] = guid try: self.configure_service(config_params) except dbus.exceptions.DBusException as e: logging.error('Caught an error while configuring a WiFi ' 'service: %r', e) return False logging.info('Configured service: %s', ssid) return True def connect_to_wifi_network(self, ssid, security, security_parameters, save_credentials, station_type=None, hidden_network=False, guid=None, autoconnect=None, discovery_timeout_seconds=15, association_timeout_seconds=15, configuration_timeout_seconds=15): """ Connect to a WiFi network with the given association parameters. @param ssid string name of network to connect to. @param security string type of security used in network (e.g. psk) @param security_parameters dict of service property/value pairs that make up the credentials and settings for the given security type (e.g. the passphrase for psk security). @param save_credentials bool True if we should save EAP credentials. @param station_type string one of SUPPORTED_WIFI_STATION_TYPES. @param hidden_network bool True when the SSID is not broadcasted. @param guid string unique identifier for network. @param discovery_timeout_seconds float timeout for service discovery. @param association_timeout_seconds float timeout for service association. @param configuration_timeout_seconds float timeout for DHCP negotiations. @param autoconnect: bool or None. None indicates that this should not be set one way or the other, while a boolean indicates a desired value. @return (successful, discovery_time, association_time, configuration_time, reason) where successful is True iff the operation succeeded, *_time is the time spent waiting for each transition, and reason is a string which may contain a meaningful description of failures. """ logging.info('Attempting to connect to %s', ssid) service_proxy = None start_time = time.time() discovery_time = -1.0 association_time = -1.0 configuration_time = -1.0 if station_type not in self.SUPPORTED_WIFI_STATION_TYPES: return (False, discovery_time, association_time, configuration_time, 'FAIL(Invalid station type specified.)') # |mode| is derived from the station type we're attempting to join. It # does not refer to the 802.11x (802.11a/b/g/n) type. It refers to a # shill connection mode. mode = self.SUPPORTED_WIFI_STATION_TYPES[station_type] if hidden_network: logging.info('Configuring %s as a hidden network.', ssid) if not self.configure_wifi_service( ssid, security, save_credentials=save_credentials, station_type=station_type, hidden_network=True, autoconnect=autoconnect): return (False, discovery_time, association_time, configuration_time, 'FAIL(Failed to configure hidden SSID)') logging.info('Configured hidden service: %s', ssid) logging.info('Discovering...') discovery_params = {self.SERVICE_PROPERTY_TYPE: 'wifi', self.SERVICE_PROPERTY_NAME: ssid, self.SERVICE_PROPERTY_SECURITY_CLASS: security, self.SERVICE_PROPERTY_MODE: mode} while time.time() - start_time < discovery_timeout_seconds: discovery_time = time.time() - start_time service_object = self.find_matching_service(discovery_params) if service_object: try: service_properties = service_object.GetProperties( utf8_strings=True) except dbus.exceptions.DBusException: # This usually means the service handle has become invalid. # Which is sort of like not getting a handle back from # find_matching_service in the first place. continue strength = self.dbus2primitive( service_properties[self.SERVICE_PROPERTY_STRENGTH]) if strength > 0: logging.info('Discovered service: %s. Strength: %r.', ssid, strength) break # This is spammy, but shill handles that for us. self.manager.RequestScan('wifi') time.sleep(self.POLLING_INTERVAL_SECONDS) else: return (False, discovery_time, association_time, configuration_time, 'FAIL(Discovery timed out)') # At this point, we know |service| is in the service list. Attempt # to connect it, and watch the states roll by. logging.info('Connecting...') try: for service_property, value in security_parameters.iteritems(): service_object.SetProperty(service_property, value) if guid is not None: service_object.SetProperty(self.SERVICE_PROPERTY_GUID, guid) if autoconnect is not None: service_object.SetProperty(self.SERVICE_PROPERTY_AUTOCONNECT, autoconnect) service_object.Connect() logging.info('Called connect on service') except dbus.exceptions.DBusException, e: logging.error('Caught an error while trying to connect: %s', e.get_dbus_message()) return (False, discovery_time, association_time, configuration_time, 'FAIL(Failed to call connect)') logging.info('Associating...') result = self.wait_for_property_in( service_object, self.SERVICE_PROPERTY_STATE, ('configuration', 'ready', 'portal', 'online'), association_timeout_seconds) (successful, _, association_time) = result if not successful: return (False, discovery_time, association_time, configuration_time, 'FAIL(Association timed out)') logging.info('Associated with service: %s', ssid) logging.info('Configuring...') result = self.wait_for_property_in( service_object, self.SERVICE_PROPERTY_STATE, ('ready', 'portal', 'online'), configuration_timeout_seconds) (successful, _, configuration_time) = result if not successful: return (False, discovery_time, association_time, configuration_time, 'FAIL(Configuration timed out)') logging.info('Configured service: %s', ssid) # Great success! logging.info('Connected to WiFi service.') return (True, discovery_time, association_time, configuration_time, 'SUCCESS(Connection successful)') def disconnect_from_wifi_network(self, ssid, timeout=None): """Disconnect from the specified WiFi network. Method will succeed if it observes the specified network in the idle state after calling Disconnect. @param ssid string name of network to disconnect. @param timeout float number of seconds to wait for idle. @return tuple(success, duration, reason) where: success is a bool (True on success). duration is a float number of seconds the operation took. reason is a string containing an informative error on failure. """ if timeout is None: timeout = self.SERVICE_DISCONNECT_TIMEOUT service_description = {self.SERVICE_PROPERTY_TYPE: 'wifi', self.SERVICE_PROPERTY_NAME: ssid} service = self.find_matching_service(service_description) if service is None: return (False, 0.0, 'Failed to disconnect from %s, service not found.' % ssid) service.Disconnect() result = self.wait_for_property_in(service, self.SERVICE_PROPERTY_STATE, ('idle',), timeout) (successful, final_state, duration) = result message = 'Success.' if not successful: message = ('Failed to disconnect from %s, ' 'timed out in state: %s.' % (ssid, final_state)) return (successful, duration, message) def configure_bgscan(self, interface, method=None, short_interval=None, long_interval=None, signal=None): """Configures bgscan parameters for wpa_supplicant. @param interface string name of interface to configure (e.g. 'mlan0'). @param method string bgscan method (e.g. 'none'). @param short_interval int short scanning interval. @param long_interval int normal scanning interval. @param signal int signal threshold. """ device = self.find_object('Device', {'Name': interface}) if device is None: logging.error('No device found with name: %s', interface) return False attributes = {'ScanInterval': (dbus.UInt16, long_interval), 'BgscanMethod': (dbus.String, method), 'BgscanShortInterval': (dbus.UInt16, short_interval), 'BgscanSignalThreshold': (dbus.Int32, signal)} for k, (type_cast, value) in attributes.iteritems(): if value is None: continue # 'default' is defined in: # client/common_lib/cros/network/xmlrpc_datatypes.py # but we don't have access to that file here. if value == 'default': device.ClearProperty(k) else: device.SetProperty(k, type_cast(value)) return True def get_active_wifi_SSIDs(self): """@return list of string SSIDs with at least one BSS we've scanned.""" properties = self.manager.GetProperties(utf8_strings=True) services = [self.get_dbus_object(self.DBUS_TYPE_SERVICE, path) for path in properties[self.MANAGER_PROPERTY_SERVICES]] wifi_services = [] for service in services: try: service_properties = self.dbus2primitive(service.GetProperties( utf8_strings=True)) except dbus.exceptions.DBusException as e: pass # Probably the service disappeared before GetProperties(). logging.debug('Considering service with properties: %r', service_properties) service_type = service_properties[self.SERVICE_PROPERTY_TYPE] strength = service_properties[self.SERVICE_PROPERTY_STRENGTH] if service_type == 'wifi' and strength > 0: # Note that this may cause terrible things if the SSID # is not a valid ASCII string. ssid = service_properties[self.SERVICE_PROPERTY_HEX_SSID] logging.info('Found active WiFi service: %s', ssid) wifi_services.append(ssid.decode('hex')) return wifi_services def wait_for_service_states(self, ssid, states, timeout_seconds): """Wait for a service (ssid) to achieve one of a number of states. @param ssid string name of network for whose state we're waiting. @param states tuple states for which to wait. @param timeout_seconds seconds to wait for property to be achieved @return tuple(successful, final_value, duration) where successful is True iff we saw one of |states|, final_value is the final state we saw, and duration is how long we waited to see that value. """ discovery_params = {self.SERVICE_PROPERTY_TYPE: 'wifi', self.SERVICE_PROPERTY_NAME: ssid} start_time = time.time() service_object = None while time.time() - start_time < timeout_seconds: service_object = self.find_matching_service(discovery_params) if service_object: break time.sleep(self.POLLING_INTERVAL_SECONDS) else: logging.error('Timed out waiting for %s states', ssid) return False, 'unknown', timeout_seconds return self.wait_for_property_in( service_object, self.SERVICE_PROPERTY_STATE, states, timeout_seconds - (time.time() - start_time))