#!/usr/bin/env python3
#
# Copyright 2021 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import random
import time
import re

from acts import asserts
from acts import utils
from acts.controllers.access_point import setup_ap, AccessPoint
from acts.controllers.ap_lib import dhcp_config
from acts.controllers.ap_lib import hostapd_constants
from acts.controllers.ap_lib.hostapd_security import Security
from acts.controllers.ap_lib.hostapd_utils import generate_random_password
from acts.controllers.utils_lib.commands import ip
from acts_contrib.test_utils.abstract_devices.wlan_device import create_wlan_device
from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest


class Dhcpv4InteropFixture(WifiBaseTest):
    """Test helpers for validating DHCPv4 Interop

    Test Bed Requirement:
    * One Android device or Fuchsia device
    * One Access Point
    """
    access_point: AccessPoint

    def setup_class(self):
        """Picks the DUT named by the 'dut' user param and stops all APs.

        Raises:
            ValueError, if the 'dut' user param is present but is neither
            'fuchsia_devices' nor 'android_devices'.
        """
        super().setup_class()
        if 'dut' in self.user_params:
            if self.user_params['dut'] == 'fuchsia_devices':
                self.dut = create_wlan_device(self.fuchsia_devices[0])
            elif self.user_params['dut'] == 'android_devices':
                self.dut = create_wlan_device(self.android_devices[0])
            else:
                raise ValueError('Invalid DUT specified in config. (%s)' %
                                 self.user_params['dut'])
        else:
            # Default is an android device, just like the other tests
            self.dut = create_wlan_device(self.android_devices[0])

        self.access_point = self.access_points[0]
        self.access_point.stop_all_aps()

    def setup_test(self):
        """Wakes any Android devices and turns WiFi on for the DUT."""
        if hasattr(self, "android_devices"):
            for ad in self.android_devices:
                ad.droid.wakeLockAcquireBright()
                ad.droid.wakeUpNow()
        self.dut.wifi_toggle_state(True)

    def teardown_test(self):
        """Puts Android devices to sleep, resets DUT WiFi and stops all APs."""
        if hasattr(self, "android_devices"):
            for ad in self.android_devices:
                ad.droid.wakeLockRelease()
                ad.droid.goToSleepNow()
        self.dut.turn_location_off_and_scan_toggle_off()
        self.dut.disconnect()
        self.dut.reset_wifi()
        self.access_point.stop_all_aps()

    def connect(self, ap_params):
        """Associates the DUT to the AP, failing the test if it cannot.

        Args:
            ap_params: dictionary as returned by setup_ap(); the 'ssid',
                'password' and 'target_security' keys are read.
        """
        asserts.assert_true(
            self.dut.associate(ap_params['ssid'],
                               target_pwd=ap_params['password'],
                               target_security=ap_params['target_security']),
            'Failed to connect.')

    def setup_ap(self):
        """Generates a hostapd config and sets up the AP with that config.
        Does not run a DHCP server.

        Raises:
            Exception, if the AP does not end up with exactly one SSID and
            exactly one configured subnet.

        Returns: A dictionary of information about the AP.
        """
        ssid = utils.rand_ascii_str(20)
        security_mode = hostapd_constants.WPA2_STRING
        security_profile = Security(
            security_mode=security_mode,
            password=generate_random_password(length=20),
            wpa_cipher='CCMP',
            wpa2_cipher='CCMP')
        password = security_profile.password
        target_security = hostapd_constants.SECURITY_STRING_TO_DEFAULT_TARGET_SECURITY.get(
            security_mode)

        ap_ids = setup_ap(access_point=self.access_point,
                          profile_name='whirlwind',
                          mode=hostapd_constants.MODE_11N_MIXED,
                          channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
                          n_capabilities=[],
                          ac_capabilities=[],
                          force_wmm=True,
                          ssid=ssid,
                          security=security_profile,
                          password=password)

        # Both lists are indexed at [0] below, so an empty list must be
        # rejected here too rather than surfacing later as a bare IndexError.
        if len(ap_ids) != 1:
            raise Exception("Expected only one SSID on AP")

        configured_subnets = self.access_point.get_configured_subnets()
        if len(configured_subnets) != 1:
            raise Exception("Expected only one subnet on AP")
        router_ip = configured_subnets[0].router
        network = configured_subnets[0].network

        # Each test decides for itself whether (and how) to run a DHCP server.
        self.access_point.stop_dhcp()

        return {
            'ssid': ssid,
            'password': password,
            'target_security': target_security,
            'ip': router_ip,
            'network': network,
            'id': ap_ids[0],
        }

    def device_can_ping(self, dest_ip):
        """Checks if the DUT can ping the given address.

        Returns: True if can ping, False otherwise"""
        self.log.info('Attempting to ping %s...' % dest_ip)
        ping_result = self.dut.can_ping(dest_ip, count=2)
        if ping_result:
            self.log.info('Success pinging: %s' % dest_ip)
        else:
            self.log.info('Failure pinging: %s' % dest_ip)
        return ping_result

    def get_device_ipv4_addr(self, interface=None, timeout=20):
        """Checks if device has an ipv4 private address. Sleeps 1 second between
        retries.

        Args:
            interface: string, name of interface from which to get ipv4 address.
                Defaults to the DUT's WLAN client test interface.
            timeout: number of seconds to keep retrying before giving up.

        Raises:
            ConnectionError, if DUT does not have an ipv4 address after all
            timeout.

        Returns:
            The device's IP address

        """
        self.log.debug('Fetching updated WLAN interface list')
        if interface is None:
            interface = self.dut.device.wlan_client_test_interface_name
        self.log.info(
            'Checking if DUT has received an ipv4 addr on iface %s. Will retry for %s '
            'seconds.' % (interface, timeout))
        # Keep the caller's timeout intact and measure against a monotonic
        # clock so wall-clock adjustments cannot stretch or shrink the wait.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            ip_addrs = self.dut.device.get_interface_ip_addresses(interface)
            private_addrs = ip_addrs['ipv4_private']

            if len(private_addrs) > 0:
                ip_addr = private_addrs[0]
                self.log.info('DUT has an ipv4 address: %s' % ip_addr)
                return ip_addr

            self.log.debug(
                'DUT does not yet have an ipv4 address...retrying in 1 '
                'second.')
            time.sleep(1)

        raise ConnectionError('DUT failed to get an ipv4 address.')

    def run_test_case_expect_dhcp_success(self, settings):
        """Starts the AP and DHCP server, and validates that the client
        connects and obtains an address.

        Args:
            settings: a dictionary containing:
                dhcp_parameters: a dictionary of DHCP parameters
                dhcp_options: a dictionary of DHCP options
        """
        ap_params = self.setup_ap()
        subnet_conf = dhcp_config.Subnet(
            subnet=ap_params['network'],
            router=ap_params['ip'],
            additional_parameters=settings['dhcp_parameters'],
            additional_options=settings['dhcp_options'])
        dhcp_conf = dhcp_config.DhcpConfig(subnets=[subnet_conf])

        self.log.debug('DHCP Configuration:\n' +
                       dhcp_conf.render_config_file() + "\n")

        dhcp_logs_before = self.access_point.get_dhcp_logs().split('\n')
        self.access_point.start_dhcp(dhcp_conf=dhcp_conf)
        self.connect(ap_params=ap_params)

        # Typical log lines look like:
        # dhcpd[26695]: DHCPDISCOVER from f8:0f:f9:3d:ce:d1 via wlan1
        # dhcpd[26695]: DHCPOFFER on 192.168.9.2 to f8:0f:f9:3d:ce:d1 via wlan1
        # dhcpd[26695]: DHCPREQUEST for 192.168.9.2 (192.168.9.1) from f8:0f:f9:3d:ce:d1 via wlan1
        # dhcpd[26695]: DHCPACK on 192.168.9.2 to f8:0f:f9:3d:ce:d1 via wlan1

        try:
            ip_addr = self.get_device_ipv4_addr()
        except ConnectionError:
            # The logs have not been read yet at this point, so fetch them
            # here; referencing a not-yet-assigned local would raise
            # UnboundLocalError and mask the real failure.
            self.log.warn(self.access_point.get_dhcp_logs())
            asserts.fail('DUT failed to get an IP address')

        # Get updates to DHCP logs
        dhcp_logs = self.access_point.get_dhcp_logs()
        for line in dhcp_logs_before:
            dhcp_logs = dhcp_logs.replace(line, '')

        expected_string = 'DHCPDISCOVER from'
        asserts.assert_equal(
            dhcp_logs.count(expected_string), 1,
            f'Incorrect count of DHCP Discovers ("{expected_string}") in logs:\n'
            + dhcp_logs + "\n")

        expected_string = f'DHCPOFFER on {ip_addr}'
        asserts.assert_equal(
            dhcp_logs.count(expected_string), 1,
            f'Incorrect count of DHCP Offers ("{expected_string}") in logs:\n'
            + dhcp_logs + "\n")

        expected_string = f'DHCPREQUEST for {ip_addr}'
        asserts.assert_true(
            dhcp_logs.count(expected_string) >= 1,
            f'Incorrect count of DHCP Requests ("{expected_string}") in logs: '
            + dhcp_logs + "\n")

        expected_string = f'DHCPACK on {ip_addr}'
        asserts.assert_true(
            dhcp_logs.count(expected_string) >= 1,
            f'Incorrect count of DHCP Acks ("{expected_string}") in logs: ' +
            dhcp_logs + "\n")

        asserts.assert_true(self.device_can_ping(ap_params['ip']),
                            f'DUT failed to ping router at {ap_params["ip"]}')


class Dhcpv4InteropFixtureTest(Dhcpv4InteropFixture):
    """Tests which validate the behavior of the Dhcpv4InteropFixture.

    In theory, these are more similar to unit tests than ACTS tests, but
    since they interact with hardware (specifically, the AP), we have to
    write and run them like the rest of the ACTS tests."""

    def test_invalid_options_not_accepted(self):
        """Ensures the DHCP server doesn't accept invalid options"""
        ap_params = self.setup_ap()
        subnet_conf = dhcp_config.Subnet(subnet=ap_params['network'],
                                         router=ap_params['ip'],
                                         additional_options={'foo': 'bar'})
        dhcp_conf = dhcp_config.DhcpConfig(subnets=[subnet_conf])
        with asserts.assert_raises_regex(Exception, r'failed to start'):
            self.access_point.start_dhcp(dhcp_conf=dhcp_conf)

    def test_invalid_parameters_not_accepted(self):
        """Ensures the DHCP server doesn't accept invalid parameters"""
        ap_params = self.setup_ap()
        subnet_conf = dhcp_config.Subnet(subnet=ap_params['network'],
                                         router=ap_params['ip'],
                                         additional_parameters={'foo': 'bar'})
        dhcp_conf = dhcp_config.DhcpConfig(subnets=[subnet_conf])
        with asserts.assert_raises_regex(Exception, r'failed to start'):
            self.access_point.start_dhcp(dhcp_conf=dhcp_conf)

    def test_no_dhcp_server_started(self):
        """Validates that the test fixture does not start a DHCP server."""
        ap_params = self.setup_ap()
        self.connect(ap_params=ap_params)
        with asserts.assert_raises(ConnectionError):
            self.get_device_ipv4_addr()


class Dhcpv4InteropBasicTest(Dhcpv4InteropFixture):
    """DhcpV4 tests which validate basic DHCP client/server interactions."""

    def test_basic_dhcp_assignment(self):
        """Validates address assignment with no extra options or parameters."""
        self.run_test_case_expect_dhcp_success(settings={
            'dhcp_options': {},
            'dhcp_parameters': {}
        })

    def test_pool_allows_unknown_clients(self):
        """Validates address assignment with 'allow unknown-clients' set."""
        self.run_test_case_expect_dhcp_success(settings={
            'dhcp_options': {},
            'dhcp_parameters': {
                'allow': 'unknown-clients'
            }
        })

    def test_pool_disallows_unknown_clients(self):
        """Validates that no address is handed out with 'deny unknown-clients'."""
        ap_params = self.setup_ap()
        subnet_conf = dhcp_config.Subnet(
            subnet=ap_params['network'],
            router=ap_params['ip'],
            additional_parameters={'deny': 'unknown-clients'})
        dhcp_conf = dhcp_config.DhcpConfig(subnets=[subnet_conf])
        self.access_point.start_dhcp(dhcp_conf=dhcp_conf)

        self.connect(ap_params=ap_params)
        with asserts.assert_raises(ConnectionError):
            self.get_device_ipv4_addr()

        dhcp_logs = self.access_point.get_dhcp_logs()
        asserts.assert_true(
            re.search(r'DHCPDISCOVER from .*no free leases', dhcp_logs),
            "Did not find expected message in dhcp logs: " + dhcp_logs + "\n")

    def test_lease_renewal(self):
        """Validates that a client renews their DHCP lease."""
        LEASE_TIME = 30
        ap_params = self.setup_ap()
        subnet_conf = dhcp_config.Subnet(subnet=ap_params['network'],
                                         router=ap_params['ip'])
        dhcp_conf = dhcp_config.DhcpConfig(subnets=[subnet_conf],
                                           default_lease_time=LEASE_TIME,
                                           max_lease_time=LEASE_TIME)
        self.access_point.start_dhcp(dhcp_conf=dhcp_conf)
        self.connect(ap_params=ap_params)
        ip_addr = self.get_device_ipv4_addr()

        dhcp_logs_before = self.access_point.get_dhcp_logs()
        SLEEP_TIME = LEASE_TIME + 3
        self.log.info(f'Sleeping {SLEEP_TIME}s to await DHCP renewal')
        time.sleep(SLEEP_TIME)

        dhcp_logs_after = self.access_point.get_dhcp_logs()
        dhcp_logs = dhcp_logs_after.replace(dhcp_logs_before, '')
        # Fuchsia renews at LEASE_TIME / 2, so there should be at least 2 DHCPREQUESTs in logs.
        # The log lines look like:
        # INFO dhcpd[17385]: DHCPREQUEST for 192.168.9.2 from f8:0f:f9:3d:ce:d1 via wlan1
        # INFO dhcpd[17385]: DHCPACK on 192.168.9.2 to f8:0f:f9:3d:ce:d1 via wlan1
        expected_string = f'DHCPREQUEST for {ip_addr}'
        asserts.assert_true(
            dhcp_logs.count(expected_string) >= 2,
            f'Not enough DHCP renewals ("{expected_string}") in logs: ' +
            dhcp_logs + "\n")


class Dhcpv4DuplicateAddressTest(Dhcpv4InteropFixture):
    """DhcpV4 tests which validate client behavior when an offered address
    is already in use on the network."""

    def setup_test(self):
        """Sets up the AP and the bookkeeping for temporary IP aliases."""
        super().setup_test()
        # Addresses added to the AP interface during the test; whatever is
        # still listed here is removed again in teardown_test().
        self.extra_addresses = []
        self.ap_params = self.setup_ap()
        self.ap_ip_cmd = ip.LinuxIpCommand(self.access_point.ssh)

    def teardown_test(self):
        """Runs the fixture teardown, then removes any leftover IP aliases."""
        super().teardown_test()
        # Loop variable deliberately not called `ip`, which is the imported
        # command module used in setup_test().
        for addr in self.extra_addresses:
            self.ap_ip_cmd.remove_ipv4_address(self.ap_params['id'], addr)

    def test_duplicate_address_assignment(self):
        """It's possible for a DHCP server to assign an address that already exists on the network.
        DHCP clients are expected to perform a "gratuitous ARP" of the to-be-assigned address, and
        refuse to assign that address. Clients should also recover by asking for a different
        address.
        """
        # Modify subnet to hold fewer addresses.
        # A '/29' has 8 addresses (6 usable excluding router / broadcast)
        subnet = next(self.ap_params['network'].subnets(new_prefix=29))
        subnet_conf = dhcp_config.Subnet(
            subnet=subnet,
            router=self.ap_params['ip'],
            # When the DHCP server is considering dynamically allocating an IP address to a client,
            # it first sends an ICMP Echo request (a ping) to the address being assigned. It waits
            # for a second, and if no ICMP Echo response has been heard, it assigns the address.
            # If a response is heard, the lease is abandoned, and the server does not respond to
            # the client.
            # The ping-check configuration parameter can be used to control checking - if its value
            # is false, no ping check is done.
            additional_parameters={'ping-check': 'false'})
        dhcp_conf = dhcp_config.DhcpConfig(subnets=[subnet_conf])
        self.access_point.start_dhcp(dhcp_conf=dhcp_conf)

        # Add each of the usable IPs as an alias for the router's interface, such that the router
        # will respond to any pings on it.
        for addr in subnet.hosts():
            self.ap_ip_cmd.add_ipv4_address(self.ap_params['id'], addr)
            # Ensure we remove the address in self.teardown_test() even if the test fails
            self.extra_addresses.append(addr)

        self.connect(ap_params=self.ap_params)
        with asserts.assert_raises(ConnectionError):
            self.get_device_ipv4_addr()

        # Per spec, the flow should be:
        # Discover -> Offer -> Request -> Ack -> client optionally performs DAD
        dhcp_logs = self.access_point.get_dhcp_logs()
        for expected_message in [
                r'DHCPDISCOVER from \S+',
                r'DHCPOFFER on [0-9.]+ to \S+',
                r'DHCPREQUEST for [0-9.]+',
                r'DHCPACK on [0-9.]+',
                r'DHCPDECLINE of [0-9.]+ from \S+ via .*: abandoned',
                r'Abandoning IP address [0-9.]+: declined',
        ]:
            asserts.assert_true(
                re.search(expected_message, dhcp_logs),
                f'Did not find expected message ({expected_message}) in dhcp logs: {dhcp_logs}'
                + "\n")

        # Remove each of the IP aliases.
        # Note: this also removes the router's address (e.g. 192.168.1.1), so pinging the
        # router after this will not work.
        while self.extra_addresses:
            self.ap_ip_cmd.remove_ipv4_address(self.ap_params['id'],
                                               self.extra_addresses.pop())

        # Now, we should get an address successfully
        ip_addr = self.get_device_ipv4_addr()
        dhcp_logs = self.access_point.get_dhcp_logs()

        expected_string = f'DHCPREQUEST for {ip_addr}'
        asserts.assert_true(
            dhcp_logs.count(expected_string) >= 1,
            f'Incorrect count of DHCP Requests ("{expected_string}") in logs: '
            + dhcp_logs + "\n")

        expected_string = f'DHCPACK on {ip_addr}'
        asserts.assert_true(
            dhcp_logs.count(expected_string) >= 1,
            f'Incorrect count of DHCP Acks ("{expected_string}") in logs: ' +
            dhcp_logs + "\n")


class Dhcpv4InteropCombinatorialOptionsTest(Dhcpv4InteropFixture):
    """DhcpV4 tests which validate combinations of DHCP options."""
    OPT_NUM_DOMAIN_SEARCH = 119
    OPT_NUM_DOMAIN_NAME = 15

    def setup_generated_tests(self):
        """Generates one success-path test case per entry in DHCP_OPTIONS."""
        self._generate_dhcp_options()

        test_args = []
        for test in self.DHCP_OPTIONS:
            for option_list in self.DHCP_OPTIONS[test]:
                test_args.append(({
                    'dhcp_options': option_list,
                    'dhcp_parameters': {}
                }, ))

        self.generate_tests(test_logic=self.run_test_case_expect_dhcp_success,
                            name_func=self.generate_test_name,
                            arg_sets=test_args)

    def generate_test_name(self, settings):
        """Returns the 'test_name' entry stored in the case's dhcp_options."""
        return settings["dhcp_options"]["test_name"]

    def _generate_dhcp_options(self):
        """Populates self.DHCP_OPTIONS, keyed by test group name.

        Each group maps to a list of option dictionaries. Every dictionary
        carries a 'test_name' entry, which generate_test_name() reads.
        """
        self.DHCP_OPTIONS = {
            'domain-name-tests': [{
                'domain-name': '"example.invalid"',
                'dhcp-parameter-request-list': self.OPT_NUM_DOMAIN_NAME,
                'test_name': "test_domain_name_invalid_tld"
            }, {
                'domain-name': '"example.test"',
                'dhcp-parameter-request-list': self.OPT_NUM_DOMAIN_NAME,
                'test_name': "test_domain_name_valid_tld"
            }],
            'domain-search-tests': [{
                'domain-search':
                '"example.invalid"',
                'dhcp-parameter-request-list':
                self.OPT_NUM_DOMAIN_SEARCH,
                'test_name':
                "test_domain_search_invalid_tld"
            }, {
                'domain-search': '"example.test"',
                'dhcp-parameter-request-list': self.OPT_NUM_DOMAIN_SEARCH,
                'test_name': "test_domain_search_valid_tld"
            }]
        }

        # The RFC limits DHCP payloads to 576 bytes unless the client signals it can handle larger
        # payloads, which it does by sending DHCP option 57, "Maximum DHCP Message Size". Despite
        # being able to accept larger payloads, clients typically don't advertise this.
        # The test verifies that the client accepts a large message split across multiple ethernet
        # frames.
        # The test is created by sending many bytes of options through the domain-search
        # option, which is of unbounded length (though is compressed per RFC1035 section 4.1.4).
        typical_ethernet_mtu = 1500
        self.DHCP_OPTIONS['max-message-size-tests'] = []

        long_dns_setting = ', '.join(
            f'"ns{num}.example"'
            for num in random.sample(range(100_000, 1_000_000), 250))
        # RFC1035 compression means any shared suffix ('.example' in this case) will
        # be deduplicated. Calculate approximate length by removing that suffix.
        long_dns_setting_len = len(
            long_dns_setting.replace(', ', '').replace('"', '').replace(
                '.example', '').encode('utf-8'))
        asserts.assert_true(
            long_dns_setting_len > typical_ethernet_mtu,
            "Expected to generate message greater than ethernet mtu")
        self.DHCP_OPTIONS['max-message-size-tests'].append({
            'dhcp-max-message-size':
            long_dns_setting_len * 2,
            'domain-search':
            long_dns_setting,
            'dhcp-parameter-request-list':
            self.OPT_NUM_DOMAIN_SEARCH,
            'test_name':
            "test_max_sized_message",
        })