#!/usr/bin/env python3
#
#   Copyright 2021 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import random
import re
import time

from acts import asserts
from acts import utils
from acts.controllers.access_point import setup_ap, AccessPoint
from acts.controllers.ap_lib import dhcp_config
from acts.controllers.ap_lib import hostapd_constants
from acts.controllers.ap_lib.hostapd_security import Security
from acts.controllers.ap_lib.hostapd_utils import generate_random_password
from acts.controllers.utils_lib.commands import ip
from acts_contrib.test_utils.abstract_devices.wlan_device import create_wlan_device
from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest
31
32
class Dhcpv4InteropFixture(WifiBaseTest):
    """Test helpers for validating DHCPv4 Interop

    Test Bed Requirement:
    * One Android device or Fuchsia device
    * One Access Point
    """
    access_point: AccessPoint

    def setup_class(self):
        """Selects the DUT and makes sure no AP is running before the tests.

        The DUT is taken from the device list named by the optional 'dut'
        user param ('fuchsia_devices' or 'android_devices').

        Raises:
            ValueError, if the 'dut' user param has any other value.
        """
        super().setup_class()
        if 'dut' in self.user_params:
            if self.user_params['dut'] == 'fuchsia_devices':
                self.dut = create_wlan_device(self.fuchsia_devices[0])
            elif self.user_params['dut'] == 'android_devices':
                self.dut = create_wlan_device(self.android_devices[0])
            else:
                raise ValueError('Invalid DUT specified in config. (%s)' %
                                 self.user_params['dut'])
        else:
            # Default is an android device, just like the other tests
            self.dut = create_wlan_device(self.android_devices[0])

        self.access_point = self.access_points[0]
        self.access_point.stop_all_aps()

    def setup_test(self):
        """Wakes any android devices and turns wifi on for the DUT."""
        if hasattr(self, "android_devices"):
            for ad in self.android_devices:
                ad.droid.wakeLockAcquireBright()
                ad.droid.wakeUpNow()
        self.dut.wifi_toggle_state(True)

    def teardown_test(self):
        """Releases wake locks, resets the DUT's wifi and stops all APs."""
        if hasattr(self, "android_devices"):
            for ad in self.android_devices:
                ad.droid.wakeLockRelease()
                ad.droid.goToSleepNow()
        self.dut.turn_location_off_and_scan_toggle_off()
        self.dut.disconnect()
        self.dut.reset_wifi()
        self.access_point.stop_all_aps()

    def connect(self, ap_params):
        """Associates the DUT with the AP, failing the test if it cannot.

        Args:
            ap_params: dictionary as returned by setup_ap(); the 'ssid',
                'password' and 'target_security' entries are used.
        """
        asserts.assert_true(
            self.dut.associate(ap_params['ssid'],
                               target_pwd=ap_params['password'],
                               target_security=ap_params['target_security']),
            'Failed to connect.')

    def setup_ap(self):
        """Generates a hostapd config and sets up the AP with that config.
        Does not run a DHCP server.

        Raises:
            Exception, if the AP does not end up with exactly one SSID and
            exactly one configured subnet.

        Returns: A dictionary of information about the AP.
        """
        ssid = utils.rand_ascii_str(20)
        security_mode = hostapd_constants.WPA2_STRING
        security_profile = Security(
            security_mode=security_mode,
            password=generate_random_password(length=20),
            wpa_cipher='CCMP',
            wpa2_cipher='CCMP')
        password = security_profile.password
        target_security = hostapd_constants.SECURITY_STRING_TO_DEFAULT_TARGET_SECURITY.get(
            security_mode)

        ap_ids = setup_ap(access_point=self.access_point,
                          profile_name='whirlwind',
                          mode=hostapd_constants.MODE_11N_MIXED,
                          channel=hostapd_constants.AP_DEFAULT_CHANNEL_5G,
                          n_capabilities=[],
                          ac_capabilities=[],
                          force_wmm=True,
                          ssid=ssid,
                          security=security_profile,
                          password=password)

        # Both results are indexed with [0] below, so an empty result has to
        # be rejected here as well instead of surfacing as a bare IndexError.
        if len(ap_ids) != 1:
            raise Exception(
                f'Expected only one SSID on AP (found {len(ap_ids)})')

        configured_subnets = self.access_point.get_configured_subnets()
        if len(configured_subnets) != 1:
            raise Exception('Expected only one subnet on AP '
                            f'(found {len(configured_subnets)})')
        router_ip = configured_subnets[0].router
        network = configured_subnets[0].network

        # Each test starts the DHCP server itself with its own configuration.
        self.access_point.stop_dhcp()

        return {
            'ssid': ssid,
            'password': password,
            'target_security': target_security,
            'ip': router_ip,
            'network': network,
            'id': ap_ids[0],
        }

    def device_can_ping(self, dest_ip):
        """Checks if the DUT can ping the given address.

        Returns: True if can ping, False otherwise"""
        self.log.info('Attempting to ping %s...' % dest_ip)
        ping_result = self.dut.can_ping(dest_ip, count=2)
        if ping_result:
            self.log.info('Success pinging: %s' % dest_ip)
        else:
            self.log.info('Failure pinging: %s' % dest_ip)
        return ping_result

    def get_device_ipv4_addr(self, interface=None, timeout=20):
        """Checks if device has an ipv4 private address. Sleeps 1 second between
        retries.

        Args:
            interface: string, name of interface from which to get ipv4 address.
                Defaults to the DUT's wlan client test interface.
            timeout: number of seconds to keep retrying before giving up.

        Raises:
            ConnectionError, if DUT does not have an ipv4 address after all
            timeout.

        Returns:
            The device's IP address

        """
        self.log.debug('Fetching updated WLAN interface list')
        if interface is None:
            interface = self.dut.device.wlan_client_test_interface_name
        self.log.info(
            'Checking if DUT has received an ipv4 addr on iface %s. Will retry for %s '
            'seconds.' % (interface, timeout))
        # A monotonic clock keeps the deadline immune to wall-clock changes
        # on the test host.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            ip_addrs = self.dut.device.get_interface_ip_addresses(interface)

            if len(ip_addrs['ipv4_private']) > 0:
                device_ip = ip_addrs['ipv4_private'][0]
                self.log.info('DUT has an ipv4 address: %s' % device_ip)
                return device_ip

            self.log.debug(
                'DUT does not yet have an ipv4 address...retrying in 1 '
                'second.')
            time.sleep(1)

        raise ConnectionError('DUT failed to get an ipv4 address.')

    def run_test_case_expect_dhcp_success(self, settings):
        """Starts the AP and DHCP server, and validates that the client
        connects and obtains an address.

        Args:
            settings: a dictionary containing:
                dhcp_parameters: a dictionary of DHCP parameters
                dhcp_options: a dictionary of DHCP options
        """
        ap_params = self.setup_ap()
        subnet_conf = dhcp_config.Subnet(
            subnet=ap_params['network'],
            router=ap_params['ip'],
            additional_parameters=settings['dhcp_parameters'],
            additional_options=settings['dhcp_options'])
        dhcp_conf = dhcp_config.DhcpConfig(subnets=[subnet_conf])

        self.log.debug('DHCP Configuration:\n' +
                       dhcp_conf.render_config_file() + "\n")

        # Remember the log lines that already exist so that only the lines
        # produced by this test case are inspected below.
        logs_before = set(self.access_point.get_dhcp_logs().split('\n'))
        self.access_point.start_dhcp(dhcp_conf=dhcp_conf)
        self.connect(ap_params=ap_params)

        # Typical log lines look like:
        # dhcpd[26695]: DHCPDISCOVER from f8:0f:f9:3d:ce:d1 via wlan1
        # dhcpd[26695]: DHCPOFFER on 192.168.9.2 to f8:0f:f9:3d:ce:d1 via wlan1
        # dhcpd[26695]: DHCPREQUEST for 192.168.9.2 (192.168.9.1) from f8:0f:f9:3d:ce:d1 via wlan1
        # dhcpd[26695]: DHCPACK on 192.168.9.2 to f8:0f:f9:3d:ce:d1 via wlan1

        try:
            device_ip = self.get_device_ipv4_addr()
        except ConnectionError:
            # Fetch the logs here: nothing has read them yet at this point,
            # and they are the main clue as to why no address was assigned.
            self.log.warning(self.access_point.get_dhcp_logs())
            asserts.fail('DUT failed to get an IP address')

        # Get updates to DHCP logs. Whole lines are compared so that an old
        # line can never clip part of a new one.
        dhcp_logs = '\n'.join(
            line for line in self.access_point.get_dhcp_logs().split('\n')
            if line not in logs_before)

        expected_string = 'DHCPDISCOVER from'
        asserts.assert_equal(
            dhcp_logs.count(expected_string), 1,
            f'Incorrect count of DHCP Discovers ("{expected_string}") in logs:\n'
            + dhcp_logs + "\n")

        expected_string = f'DHCPOFFER on {device_ip}'
        asserts.assert_equal(
            dhcp_logs.count(expected_string), 1,
            f'Incorrect count of DHCP Offers ("{expected_string}") in logs:\n'
            + dhcp_logs + "\n")

        expected_string = f'DHCPREQUEST for {device_ip}'
        asserts.assert_true(
            dhcp_logs.count(expected_string) >= 1,
            f'Incorrect count of DHCP Requests ("{expected_string}") in logs: '
            + dhcp_logs + "\n")

        expected_string = f'DHCPACK on {device_ip}'
        asserts.assert_true(
            dhcp_logs.count(expected_string) >= 1,
            f'Incorrect count of DHCP Acks ("{expected_string}") in logs: ' +
            dhcp_logs + "\n")

        asserts.assert_true(self.device_can_ping(ap_params['ip']),
                            f'DUT failed to ping router at {ap_params["ip"]}')
247
248
class Dhcpv4InteropFixtureTest(Dhcpv4InteropFixture):
    """Tests which validate the behavior of the Dhcpv4InteropFixture.

    In theory, these are more similar to unit tests than ACTS tests, but
    since they interact with hardware (specifically, the AP), we have to
    write and run them like the rest of the ACTS tests."""

    def _expect_dhcp_start_failure(self, **subnet_kwargs):
        """Sets up the AP and asserts that the DHCP server fails to start.

        Args:
            subnet_kwargs: extra keyword arguments for dhcp_config.Subnet,
                on top of the AP's own network and router address.
        """
        ap_info = self.setup_ap()
        bad_subnet = dhcp_config.Subnet(subnet=ap_info['network'],
                                        router=ap_info['ip'],
                                        **subnet_kwargs)
        bad_config = dhcp_config.DhcpConfig(subnets=[bad_subnet])
        with asserts.assert_raises_regex(Exception, r'failed to start'):
            self.access_point.start_dhcp(dhcp_conf=bad_config)

    def test_invalid_options_not_accepted(self):
        """Ensures the DHCP server doesn't accept invalid options"""
        self._expect_dhcp_start_failure(additional_options={'foo': 'bar'})

    def test_invalid_parameters_not_accepted(self):
        """Ensures the DHCP server doesn't accept invalid parameters"""
        self._expect_dhcp_start_failure(additional_parameters={'foo': 'bar'})

    def test_no_dhcp_server_started(self):
        """Validates that the test fixture does not start a DHCP server."""
        ap_info = self.setup_ap()
        self.connect(ap_params=ap_info)
        # With no DHCP server running the DUT must time out without a lease.
        with asserts.assert_raises(ConnectionError):
            self.get_device_ipv4_addr()
282
283
class Dhcpv4InteropBasicTest(Dhcpv4InteropFixture):
    """DhcpV4 tests which validate basic DHCP client/server interactions."""

    def test_basic_dhcp_assignment(self):
        """Validates address assignment with no extra options or parameters."""
        plain_settings = {'dhcp_options': {}, 'dhcp_parameters': {}}
        self.run_test_case_expect_dhcp_success(settings=plain_settings)

    def test_pool_allows_unknown_clients(self):
        """Validates address assignment when unknown clients are allowed."""
        allow_settings = {
            'dhcp_options': {},
            'dhcp_parameters': {
                'allow': 'unknown-clients'
            },
        }
        self.run_test_case_expect_dhcp_success(settings=allow_settings)

    def test_pool_disallows_unknown_clients(self):
        """Validates that no address is assigned when unknown clients are
        denied, and that the server logs the refusal."""
        ap_info = self.setup_ap()
        deny_subnet = dhcp_config.Subnet(
            subnet=ap_info['network'],
            router=ap_info['ip'],
            additional_parameters={'deny': 'unknown-clients'})
        self.access_point.start_dhcp(
            dhcp_conf=dhcp_config.DhcpConfig(subnets=[deny_subnet]))

        self.connect(ap_params=ap_info)
        with asserts.assert_raises(ConnectionError):
            self.get_device_ipv4_addr()

        server_logs = self.access_point.get_dhcp_logs()
        refusal = re.search(r'DHCPDISCOVER from .*no free leases',
                            server_logs)
        asserts.assert_true(
            refusal,
            "Did not find expected message in dhcp logs: " + server_logs +
            "\n")

    def test_lease_renewal(self):
        """Validates that a client renews their DHCP lease."""
        lease_seconds = 30
        ap_info = self.setup_ap()
        lease_subnet = dhcp_config.Subnet(subnet=ap_info['network'],
                                          router=ap_info['ip'])
        self.access_point.start_dhcp(
            dhcp_conf=dhcp_config.DhcpConfig(subnets=[lease_subnet],
                                             default_lease_time=lease_seconds,
                                             max_lease_time=lease_seconds))
        self.connect(ap_params=ap_info)
        device_ip = self.get_device_ipv4_addr()

        logs_at_start = self.access_point.get_dhcp_logs()
        # Wait slightly longer than one full lease.
        wait_seconds = lease_seconds + 3
        self.log.info(f'Sleeping {wait_seconds}s to await DHCP renewal')
        time.sleep(wait_seconds)

        # Keep only what was logged while sleeping.
        renewal_logs = self.access_point.get_dhcp_logs().replace(
            logs_at_start, '')
        # Fuchsia renews at LEASE_TIME / 2, so there should be at least 2 DHCPREQUESTs in logs.
        # The log lines look like:
        # INFO dhcpd[17385]: DHCPREQUEST for 192.168.9.2 from f8:0f:f9:3d:ce:d1 via wlan1
        # INFO dhcpd[17385]: DHCPACK on 192.168.9.2 to f8:0f:f9:3d:ce:d1 via wlan1
        renewal_marker = f'DHCPREQUEST for {device_ip}'
        renewal_count = renewal_logs.count(renewal_marker)
        asserts.assert_true(
            renewal_count >= 2,
            f'Not enough DHCP renewals ("{renewal_marker}") in logs: ' +
            renewal_logs + "\n")
348
349
class Dhcpv4DuplicateAddressTest(Dhcpv4InteropFixture):
    """DhcpV4 tests which validate client behavior when the offered address
    is already in use on the network."""

    def setup_test(self):
        """Sets up the AP and starts tracking extra addresses added to it."""
        super().setup_test()
        self.extra_addresses = []
        self.ap_params = self.setup_ap()
        self.ap_ip_cmd = ip.LinuxIpCommand(self.access_point.ssh)

    def teardown_test(self):
        """Runs the base teardown, then removes any leftover address aliases.

        The aliases are removed in a `finally` so they do not stay on the AP
        when the base teardown raises.
        """
        try:
            super().teardown_test()
        finally:
            # setup_test() may have failed before the list was created.
            leftover_addresses = getattr(self, 'extra_addresses', [])
            # Popping empties the list, so a later teardown cannot try to
            # remove the same alias twice.
            while leftover_addresses:
                self.ap_ip_cmd.remove_ipv4_address(self.ap_params['id'],
                                                   leftover_addresses.pop())

    def test_duplicate_address_assignment(self):
        """It's possible for a DHCP server to assign an address that already exists on the network.
        DHCP clients are expected to perform a "gratuitous ARP" of the to-be-assigned address, and
        refuse to assign that address. Clients should also recover by asking for a different
        address.
        """
        # Modify subnet to hold fewer addresses.
        # A '/29' has 8 addresses (6 usable excluding router / broadcast)
        subnet = next(self.ap_params['network'].subnets(new_prefix=29))
        subnet_conf = dhcp_config.Subnet(
            subnet=subnet,
            router=self.ap_params['ip'],
            # When the DHCP server is considering dynamically allocating an IP address to a client,
            # it first sends an ICMP Echo request (a ping) to the address being assigned. It waits
            # for a second, and if no ICMP Echo response has been heard, it assigns the address.
            # If a response is heard, the lease is abandoned, and the server does not respond to
            # the client.
            # The ping-check configuration parameter can be used to control checking - if its value
            # is false, no ping check is done.
            additional_parameters={'ping-check': 'false'})
        dhcp_conf = dhcp_config.DhcpConfig(subnets=[subnet_conf])
        self.access_point.start_dhcp(dhcp_conf=dhcp_conf)

        # Add each of the usable IPs as an alias for the router's interface, such that the router
        # will respond to any pings on it.
        for host_address in subnet.hosts():
            self.ap_ip_cmd.add_ipv4_address(self.ap_params['id'],
                                            host_address)
            # Ensure we remove the address in self.teardown_test() even if the test fails
            self.extra_addresses.append(host_address)

        self.connect(ap_params=self.ap_params)
        with asserts.assert_raises(ConnectionError):
            self.get_device_ipv4_addr()

        # Per spec, the flow should be:
        # Discover -> Offer -> Request -> Ack -> client optionally performs DAD
        dhcp_logs = self.access_point.get_dhcp_logs()
        for expected_message in [
                r'DHCPDISCOVER from \S+',
                r'DHCPOFFER on [0-9.]+ to \S+',
                r'DHCPREQUEST for [0-9.]+',
                r'DHCPACK on [0-9.]+',
                r'DHCPDECLINE of [0-9.]+ from \S+ via .*: abandoned',
                r'Abandoning IP address [0-9.]+: declined',
        ]:
            asserts.assert_true(
                re.search(expected_message, dhcp_logs),
                f'Did not find expected message ({expected_message}) in dhcp logs: {dhcp_logs}'
                + "\n")

        # Remove each of the IP aliases.
        # Note: this also removes the router's address (e.g. 192.168.1.1), so pinging the
        # router after this will not work.
        while self.extra_addresses:
            self.ap_ip_cmd.remove_ipv4_address(self.ap_params['id'],
                                               self.extra_addresses.pop())

        # Now, we should get an address successfully
        device_ip = self.get_device_ipv4_addr()
        dhcp_logs = self.access_point.get_dhcp_logs()

        expected_string = f'DHCPREQUEST for {device_ip}'
        asserts.assert_true(
            dhcp_logs.count(expected_string) >= 1,
            f'Incorrect count of DHCP Requests ("{expected_string}") in logs: '
            + dhcp_logs + "\n")

        expected_string = f'DHCPACK on {device_ip}'
        asserts.assert_true(
            dhcp_logs.count(expected_string) >= 1,
            f'Incorrect count of DHCP Acks ("{expected_string}") in logs: ' +
            dhcp_logs + "\n")
435
436
class Dhcpv4InteropCombinatorialOptionsTest(Dhcpv4InteropFixture):
    """DhcpV4 tests which validate combinations of DHCP options."""
    OPT_NUM_DOMAIN_SEARCH = 119
    OPT_NUM_DOMAIN_NAME = 15

    def setup_generated_tests(self):
        """Generates one DHCP success test case per entry in DHCP_OPTIONS."""
        self._generate_dhcp_options()

        test_args = []
        for option_lists in self.DHCP_OPTIONS.values():
            for option_list in option_lists:
                # 'test_name' only names the generated test. The test logic
                # hands 'dhcp_options' to the DHCP server configuration as
                # additional options, so the name is carried next to the
                # options rather than inside them.
                dhcp_options = {
                    option: value
                    for option, value in option_list.items()
                    if option != 'test_name'
                }
                test_args.append(({
                    'test_name': option_list['test_name'],
                    'dhcp_options': dhcp_options,
                    'dhcp_parameters': {}
                }, ))

        self.generate_tests(test_logic=self.run_test_case_expect_dhcp_success,
                            name_func=self.generate_test_name,
                            arg_sets=test_args)

    def generate_test_name(self, settings):
        """Returns the name of the generated test for the given settings.

        Args:
            settings: dictionary of test settings. The name is read from
                its 'test_name' entry, falling back to the 'test_name'
                entry inside 'dhcp_options' for settings built the old way.
        """
        if 'test_name' in settings:
            return settings['test_name']
        return settings["dhcp_options"]["test_name"]

    def _generate_dhcp_options(self):
        """Populates self.DHCP_OPTIONS, a dictionary mapping a test group
        name to a list of DHCP option dictionaries. Each option dictionary
        also carries a 'test_name' entry naming its generated test."""
        self.DHCP_OPTIONS = {
            'domain-name-tests': [{
                'domain-name': '"example.invalid"',
                'dhcp-parameter-request-list': self.OPT_NUM_DOMAIN_NAME,
                'test_name': "test_domain_name_invalid_tld"
            }, {
                'domain-name': '"example.test"',
                'dhcp-parameter-request-list': self.OPT_NUM_DOMAIN_NAME,
                'test_name': "test_domain_name_valid_tld"
            }],
            'domain-search-tests': [{
                'domain-search': '"example.invalid"',
                'dhcp-parameter-request-list': self.OPT_NUM_DOMAIN_SEARCH,
                'test_name': "test_domain_search_invalid_tld"
            }, {
                'domain-search': '"example.test"',
                'dhcp-parameter-request-list': self.OPT_NUM_DOMAIN_SEARCH,
                'test_name': "test_domain_search_valid_tld"
            }]
        }

        # The RFC limits DHCP payloads to 576 bytes unless the client signals it can handle larger
        # payloads, which it does by sending DHCP option 57, "Maximum DHCP Message Size". Despite
        # being able to accept larger payloads, clients typically don't advertise this.
        # The test verifies that the client accepts a large message split across multiple ethernet
        # frames.
        # The test is created by sending many bytes of options through the domain-search
        # option, which is of unbounded length (though is compressed per RFC1035 section 4.1.4).
        typical_ethernet_mtu = 1500
        self.DHCP_OPTIONS['max-message-size-tests'] = []

        long_dns_setting = ', '.join(
            f'"ns{num}.example"'
            for num in random.sample(range(100_000, 1_000_000), 250))
        # RFC1035 compression means any shared suffix ('.example' in this case) will
        # be deduplicated. Calculate approximate length by removing that suffix.
        long_dns_setting_len = len(
            long_dns_setting.replace(', ', '').replace('"', '').replace(
                '.example', '').encode('utf-8'))
        asserts.assert_true(
            long_dns_setting_len > typical_ethernet_mtu,
            "Expected to generate message greater than ethernet mtu")
        self.DHCP_OPTIONS['max-message-size-tests'].append({
            'dhcp-max-message-size': long_dns_setting_len * 2,
            'domain-search': long_dns_setting,
            'dhcp-parameter-request-list': self.OPT_NUM_DOMAIN_SEARCH,
            'test_name': "test_max_sized_message",
        })
516