# Copyright (c) 2013 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import logging import time from autotest_lib.client.bin import test from autotest_lib.client.common_lib import error from autotest_lib.client.common_lib.cros import site_eap_certs from autotest_lib.client.common_lib.cros import virtual_ethernet_pair from autotest_lib.client.cros import hostapd_server from autotest_lib.client.cros import shill_temporary_profile from autotest_lib.client.cros.networking import shill_proxy class network_8021xWiredAuthentication(test.test): """The 802.1x EAP wired authentication class. Runs hostapd on one side of an ethernet pair, and shill on the other. Configures the Ethernet service with 802.1x credentials and ensures that when shill detects an EAP authenticator, it is successful in using its credentials to gain access. """ INTERFACE_NAME = 'pseudoethernet0' AUTHENTICATION_FLAG = 'EapAuthenticationCompleted' TEST_PROFILE_NAME = 'test1x' AUTHENTICATION_TIMEOUT = 10 version = 1 def get_device(self, interface_name): """Finds the corresponding Device object for an ethernet interface with the name |interface_name|. @param interface_name string The name of the interface to check. @return DBus interface object representing the associated device. """ device = self._shill_proxy.find_object('Device', {'Name': interface_name}) if device is None: raise error.TestFail('Device was not found.') return device def get_authenticated_flag(self, interface_name): """Checks whether |interface_name| has successfully negotiated 802.1x. @param interface_name string The name of the interface to check. @return True if the authenticated flag is set, False otherwise. """ device = self.get_device(interface_name) device_properties = device.GetProperties(utf8_strings=True) logging.info('Device properties are %r', device_properties) return shill_proxy.ShillProxy.dbus2primitive( device_properties[self.AUTHENTICATION_FLAG]) def wait_for_authentication(self, interface_name): """Wait for |interface_name| to get to enter authentication state. @param interface_name string The name of the interface to check. """ device = self.get_device(interface_name) result = self._shill_proxy.wait_for_property_in( device, self.AUTHENTICATION_FLAG, (True,), self.AUTHENTICATION_TIMEOUT) (successful, _, _) = result return successful def configure_credentials(self, interface_name): """Adds authentication properties to the Ethernet EAP service. @param interface_name string The name of the associated interface """ service = self._shill_proxy.configure_service({ 'Type': 'etherneteap', 'EAP.EAP': hostapd_server.HostapdServer.EAP_TYPE, 'EAP.InnerEAP': 'auth=%s' % hostapd_server.HostapdServer.EAP_PHASE2, 'EAP.Identity': hostapd_server.HostapdServer.EAP_USERNAME, 'EAP.Password': hostapd_server.HostapdServer.EAP_PASSWORD, 'EAP.CACertPEM': [ site_eap_certs.ca_cert_1 ] }) def run_once(self): """Test main loop.""" self._shill_proxy = shill_proxy.ShillProxy() manager = self._shill_proxy.manager with shill_temporary_profile.ShillTemporaryProfile( manager, profile_name=self.TEST_PROFILE_NAME): with virtual_ethernet_pair.VirtualEthernetPair( peer_interface_name=self.INTERFACE_NAME, peer_interface_ip=None) as ethernet_pair: if not ethernet_pair.is_healthy: raise error.TestFail('Virtual ethernet pair failed.') if self.get_authenticated_flag(self.INTERFACE_NAME): raise error.TestFail('Authentication flag already set.') with hostapd_server.HostapdServer( interface=ethernet_pair.interface_name) as hostapd: # Wait for hostapd to initialize. time.sleep(1) if not hostapd.running(): raise error.TestFail('hostapd process exited.') self.configure_credentials(self.INTERFACE_NAME) hostapd.send_eap_packets() if not self.wait_for_authentication(self.INTERFACE_NAME): raise error.TestFail('Authentication did not complete.') client_mac_address = ethernet_pair.peer_interface_mac if not hostapd.client_has_authenticated(client_mac_address): raise error.TestFail('Server does not agree that ' 'client is authenticated') if hostapd.client_has_logged_off(client_mac_address): raise error.TestFail('Client has already logged off') # Since the EAP credentials are associated with the # top-most profile, popping it should cause the client # to immediately log-off. manager.PopProfile(self.TEST_PROFILE_NAME) if self.get_authenticated_flag(self.INTERFACE_NAME): raise error.TestFail('Client is still authenticated.') if not hostapd.client_has_logged_off(client_mac_address): raise error.TestFail('Client did not log off') # Re-pushing the profile should make the EAP credentials # available again, and should cause the client to # re-authenticate. manager.PushProfile(self.TEST_PROFILE_NAME) if not self.wait_for_authentication(self.INTERFACE_NAME): raise error.TestFail('Re-authentication did not ' 'complete.')