# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import errno
import os
import subprocess

from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib.cros import site_eap_certs

class HostapdServer(object):
    """Hostapd server instance wrapped in a context manager.

    Simple interface to starting and controlling a hostapd instance.
    This can be combined with a virtual-ethernet setup to test 802.1x
    on a wired interface.

    Example usage:
        with hostapd_server.HostapdServer(interface='veth_master') as hostapd:
            hostapd.send_eap_packets()

    """
    CONFIG_TEMPLATE = """
interface=%(interface)s
driver=%(driver)s
logger_syslog=-1
logger_syslog_level=2
logger_stdout=-1
logger_stdout_level=2
dump_file=%(config_directory)s/hostapd.dump
ctrl_interface=%(config_directory)s/%(control_directory)s
ieee8021x=1
eapol_key_index_workaround=0
eap_server=1
eap_user_file=%(config_directory)s/%(user_file)s
ca_cert=%(config_directory)s/%(ca_cert)s
server_cert=%(config_directory)s/%(server_cert)s
private_key=%(config_directory)s/%(server_key)s
use_pae_group_addr=1
eap_reauth_period=10
"""
    CA_CERTIFICATE_FILE = 'ca.crt'
    CONFIG_FILE = 'hostapd.conf'
    CONTROL_DIRECTORY = 'hostapd.ctl'
    EAP_PASSWORD = 'password'
    EAP_PHASE2 = 'MSCHAPV2'
    EAP_TYPE = 'PEAP'
    EAP_USERNAME = 'test'
    HOSTAPD_EXECUTABLE = 'hostapd'
    HOSTAPD_CLIENT_EXECUTABLE = 'hostapd_cli'
    SERVER_CERTIFICATE_FILE = 'server.crt'
    SERVER_PRIVATE_KEY_FILE = 'server.key'
    USER_AUTHENTICATION_TEMPLATE = """* %(type)s
"%(username)s"\t%(phase2)s\t"%(password)s"\t[2]
"""
    USER_FILE = 'hostapd.eap_user'
    # This is the default group MAC address to which EAP challenges
    # are sent, absent any prior knowledge of a specific client on
    # the link.
    PAE_NEAREST_ADDRESS = '01:80:c2:00:00:03'

    def __init__(self,
                 interface=None,
                 driver='wired',
                 config_directory='/tmp/hostapd-test'):
        """Record the server parameters; nothing is written or started yet.

        @param interface string name of the interface written to the
                hostapd configuration.
        @param driver string hostapd driver name written to the
                configuration.
        @param config_directory string directory which receives the
                generated configuration files and the control directory.

        """
        super(HostapdServer, self).__init__()
        self._interface = interface
        self._config_directory = config_directory
        # Built with a literal '/' so that it is character-for-character the
        # path which CONFIG_TEMPLATE renders for ctrl_interface.
        self._control_directory = '%s/%s' % (self._config_directory,
                                             self.CONTROL_DIRECTORY)
        self._driver = driver
        self._process = None


    def __enter__(self):
        """Start hostapd on entry to the "with" block.

        @return this HostapdServer instance.

        """
        self.start()
        return self


    def __exit__(self, exception, value, traceback):
        """Stop hostapd on exit from the "with" block.

        Returns None, so an exception raised inside the block propagates.

        @param exception type of the exception raised in the block, if any.
        @param value the exception instance raised in the block, if any.
        @param traceback traceback of the exception, if any.

        """
        self.stop()


    def write_config(self):
        """Write out a hostapd configuration file-set based on the caller
        supplied parameters.

        The configuration directory (and any missing parent directory) is
        created if it does not exist yet.

        @return the file name of the top-level configuration file written.

        """
        # Create-then-tolerate-EEXIST instead of exists()+mkdir(): there is
        # no window in which another process can create the directory first,
        # and missing parents are created as well.
        try:
            os.makedirs(self._config_directory)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise
        config_params = {
            'ca_cert': self.CA_CERTIFICATE_FILE,
            'config_directory': self._config_directory,
            'control_directory': self.CONTROL_DIRECTORY,
            'driver': self._driver,
            'interface': self._interface,
            'server_cert': self.SERVER_CERTIFICATE_FILE,
            'server_key': self.SERVER_PRIVATE_KEY_FILE,
            'user_file': self.USER_FILE
        }
        authentication_params = {
            'password': self.EAP_PASSWORD,
            'phase2': self.EAP_PHASE2,
            'username': self.EAP_USERNAME,
            'type': self.EAP_TYPE
        }
        for filename, contents in (
                (self.CA_CERTIFICATE_FILE, site_eap_certs.ca_cert_1),
                (self.CONFIG_FILE, self.CONFIG_TEMPLATE % config_params),
                (self.SERVER_CERTIFICATE_FILE, site_eap_certs.server_cert_1),
                (self.SERVER_PRIVATE_KEY_FILE,
                 site_eap_certs.server_private_key_1),
                (self.USER_FILE,
                 self.USER_AUTHENTICATION_TEMPLATE % authentication_params)):
            # Paths use a literal '/' to stay identical to the ones the
            # configuration template refers to.
            config_file = '%s/%s' % (self._config_directory, filename)
            with open(config_file, 'w') as f:
                f.write(contents)
        return '%s/%s' % (self._config_directory, self.CONFIG_FILE)


    def start(self):
        """Start the hostapd server.

        Writes the configuration file-set and launches hostapd as a child
        process.  If this object already started a hostapd instance, that
        instance is stopped first so it is not left running untracked.

        """
        # No-op when nothing was started; otherwise avoids overwriting
        # self._process and losing track of the earlier child.
        self.stop()
        config_file = self.write_config()
        env = dict(os.environ)
        env['OPENSSL_CONF'] = '/etc/ssl/openssl.cnf.compat'
        self._process = subprocess.Popen(
                [self.HOSTAPD_EXECUTABLE, '-dd', config_file],
                env=env)


    def stop(self):
        """Stop the hostapd server.

        Does nothing if no hostapd process is being tracked.

        """
        if self._process:
            # Only signal a child that has not been reaped yet; poll()
            # reaps it as a side effect if it has already exited.
            if self._process.poll() is None:
                self._process.terminate()
                self._process.wait()
            self._process = None


    def running(self):
        """Tests whether the hostapd process is still running.

        @return True if the hostapd process is still running, False otherwise.

        """
        if not self._process:
            return False

        if self._process.poll() is not None:
            # We have essentially reaped the process, and it is no more.
            self._process = None
            return False

        return True


    def send_eap_packets(self):
        """Start sending EAP packets to the nearest neighbor."""
        self.send_command('new_sta %s' % self.PAE_NEAREST_ADDRESS)


    def get_client_mib(self, client_mac_address):
        """Get a dict representing the MIB properties for |client_mac_address|.

        The address is matched against the command output ignoring case and
        surrounding whitespace.

        @param client_mac_address string MAC address of the client.
        @return dict containing mib properties; empty if the client's
                address does not appear in the output.

        """
        # Expected output of "hostapd_cli sta <client_mac_address>":
        #
        # Selected interface 'veth_master'
        # b6:f1:39:1d:ad:10
        # dot1xPaePortNumber=0
        # dot1xPaePortProtocolVersion=2
        # [...]
        result = self.send_command('sta %s' % client_mac_address)
        # The sample output above shows the address in lower case, so compare
        # case-insensitively to accept callers that pass upper-case hex.
        wanted_address = client_mac_address.strip().lower()
        client_mib = {}
        found_client = False
        for line in result.splitlines():
            if found_client:
                # Everything after the address line is "name=value"; lines
                # without '=' are ignored.
                name, separator, value = line.partition('=')
                if separator:
                    client_mib[name] = value
            elif line.strip().lower() == wanted_address:
                found_client = True
        return client_mib


    def send_command(self, command):
        """Send a command to the hostapd instance.

        The command text is interpolated into the hostapd_cli command line
        as-is (no quoting), so it may consist of several arguments.

        @param command string containing the command to run on hostapd.
        @return string output of the command.

        """
        return utils.system_output('%s -p %s %s' %
                                   (self.HOSTAPD_CLIENT_EXECUTABLE,
                                    self._control_directory, command))


    def client_has_authenticated(self, client_mac_address):
        """Return whether |client_mac_address| has successfully authenticated.

        @param client_mac_address string MAC address of the client.
        @return True if client is authenticated.

        """
        mib = self.get_client_mib(client_mac_address)
        return mib.get('dot1xAuthAuthSuccessesWhileAuthenticating', '') == '1'


    def client_has_logged_off(self, client_mac_address):
        """Return whether |client_mac_address| has logged-off.

        @param client_mac_address string MAC address of the client.
        @return True if client has logged off.

        """
        mib = self.get_client_mib(client_mac_address)
        return mib.get('dot1xAuthAuthEapLogoffWhileAuthenticated', '') == '1'