# # Copyright 2021 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import time from acts import asserts from acts.controllers.openwrt_ap import MOBLY_CONTROLLER_CONFIG_NAME as OPENWRT from acts.test_decorators import test_tracker_info from acts_contrib.test_utils.net.net_test_utils import start_tcpdump from acts_contrib.test_utils.net.net_test_utils import stop_tcpdump from acts_contrib.test_utils.wifi import wifi_test_utils as wutils from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest from scapy.all import rdpcap, DHCP, IPv6 from scapy.layers.inet6 import ICMPv6ND_NA as NA WLAN = "wlan0" PING_ADDR = "google.com" RAPID_COMMIT_OPTION = (80, b'') DEFAULT_IPV6_ALLROUTERS = "ff02::2" class DhcpTest(WifiBaseTest): """DHCP related test for Android.""" def setup_class(self): self.dut = self.android_devices[0] wutils.wifi_test_device_init(self.dut) req_params = [] opt_param = ["wifi_network", "configure_OpenWrt"] self.unpack_userparams( req_param_names=req_params, opt_param_names=opt_param) asserts.assert_true(OPENWRT in self.user_params, "OpenWrtAP is not in testbed.") self.openwrt = self.access_points[0] if hasattr(self, "configure_OpenWrt") and self.configure_OpenWrt == "skip": self.dut.log.info("Skip configure Wifi interface due to config setup.") else: self.configure_openwrt_ap_and_start(wpa_network=True) self.wifi_network = self.openwrt.get_wifi_network() self.openwrt.network_setting.setup_ipv6_bridge() asserts.assert_true(self.openwrt.verify_wifi_status(), "OpenWrt Wifi interface is not ready.") def teardown_class(self): """Reset wifi and stop tcpdump cleanly.""" wutils.reset_wifi(self.dut) self.openwrt.network_setting.clear_tcpdump() def teardown_test(self): """Reset wifi to make sure DUT tears down cleanly.""" wutils.reset_wifi(self.dut) def _verify_ping(self, option="", dest=PING_ADDR): try: out = self.dut.adb.shell("ping%s -c1 %s" % (option, dest)) return "100%" not in out except Exception as e: self.dut.log.debug(e) return False def _verify_device_address(self, ipv4=True, ipv6=True, timeout=15): """Verify device get assign address on wireless interface.""" current_time = time.time() while time.time() < current_time + timeout: try: if ipv4: ipv4_addr = self.dut.droid.connectivityGetIPv4Addresses(WLAN)[0] self.dut.log.info("ipv4_address is %s" % ipv4_addr) if ipv6: ipv6_addr = self.dut.droid.connectivityGetIPv6Addresses(WLAN)[0] self.dut.log.info("ipv6_address is %s" % ipv6_addr) return True except: time.sleep(1) return False def verify_dhcp_packet(self, packets, support_rapid_commit): for pkt in packets: if pkt.haslayer(DHCP): # Remove dhcp discover checking since rapid commit enable by default(aosp/2943087). if pkt[DHCP].options[0][1] == 2: asserts.assert_true(not support_rapid_commit, "Should not find DHCP OFFER when RAPID_COMMIT_OPTION supported.") elif pkt[DHCP].options[0][1] == 3: asserts.assert_true(not support_rapid_commit, "Should not find DHCP REQUEST when RAPID_COMMIT_OPTION supported.") elif pkt[DHCP].options[0][1] == 5: send_option = RAPID_COMMIT_OPTION in pkt[DHCP].options asserts.assert_true(send_option == support_rapid_commit, "Unexpected result in DHCP ACK.") def verify_gratuitous_na(self, packets): ipv6localaddress = self.dut.droid.connectivityGetLinkLocalIpv6Address(WLAN).strip("%wlan0") self.dut.log.info("Device local address : %s" % ipv6localaddress) ipv6globaladdress = sorted(self.dut.droid.connectivityGetIPv6Addresses(WLAN)) self.dut.log.info("Device global address : %s" % ipv6globaladdress) target_address = [] for pkt in packets: if pkt.haslayer(NA) and pkt.haslayer(IPv6) and pkt[IPv6].src == ipv6localaddress\ and pkt[IPv6].dst == DEFAULT_IPV6_ALLROUTERS: # broadcast global address target_address.append(pkt.tgt) self.dut.log.info("Broadcast target address : %s" % target_address) asserts.assert_equal(ipv6globaladdress, sorted(target_address), "Target address from NA is not match to device ipv6 address.") @test_tracker_info(uuid="01148659-6a3d-4a74-88b6-04b19c4acaaa") def test_ipv4_ipv6_network(self): """Verify device can get both ipv4 ipv6 address.""" wutils.connect_to_wifi_network(self.dut, self.wifi_network) asserts.assert_true(self._verify_device_address(), "Fail to get ipv4/ipv6 address.") asserts.assert_true(self._verify_ping(), "Fail to ping on ipv4.") asserts.assert_true(self._verify_ping("6"), "Fail to ping on ipv6.") @test_tracker_info(uuid="d3f37ba7-504e-48fc-95be-6eca9a148e4a") def test_ipv6_only_prefer_option(self): """Verify DUT can only get ipv6 address and ping out.""" self.openwrt.network_setting.add_ipv6_prefer_option() wutils.connect_to_wifi_network(self.dut, self.wifi_network) asserts.assert_true(self._verify_device_address(ipv4=False), "Fail to get ipv6 address.") asserts.assert_false(self._verify_ping(), "Should not ping on success on ipv4.") asserts.assert_true(self._verify_ping("6"), "Fail to ping on ipv6.") self.openwrt.network_setting.remove_ipv6_prefer_option() @test_tracker_info(uuid="a16f2a3c-e3ca-4fca-b3ee-bccb5cf34bab") def test_dhcp_rapid_commit(self): """Verify DUT can run with rapid commit on IPv4.""" self.dut.adb.shell("device_config put connectivity dhcp_rapid_commit_version 1") self.openwrt.network_setting.add_dhcp_rapid_commit() remote_pcap_path = self.openwrt.network_setting.start_tcpdump(self.test_name) wutils.connect_to_wifi_network(self.dut, self.wifi_network) local_pcap_path = self.openwrt.network_setting.stop_tcpdump( remote_pcap_path, self.dut.device_log_path) self.dut.log.info("pcap file path : %s" % local_pcap_path) packets = rdpcap(local_pcap_path) self.verify_dhcp_packet(packets, True) self.openwrt.network_setting.remove_dhcp_rapid_commit() @test_tracker_info(uuid="cddb3d33-e5ef-4efd-8ae5-1325010a05c8") def test_dhcp_4_way_handshake(self): """Verify DUT can run with rapid commit on IPv4.""" self.dut.adb.shell("device_config put connectivity dhcp_rapid_commit_version 0") remote_pcap_path = self.openwrt.network_setting.start_tcpdump(self.test_name) wutils.connect_to_wifi_network(self.dut, self.wifi_network) local_pcap_path = self.openwrt.network_setting.stop_tcpdump( remote_pcap_path, self.dut.device_log_path) self.dut.log.info("pcap file path : %s" % local_pcap_path) packets = rdpcap(local_pcap_path) self.verify_dhcp_packet(packets, False) @test_tracker_info(uuid="69fd9619-db35-406a-96e2-8425f8f5e8bd") def test_gratuitous_na(self): """Verify DUT will send NA after ipv6 address set.""" self.dut.adb.shell("device_config put connectivity ipclient_gratuitous_na_version 1") remote_pcap_path = self.openwrt.network_setting.start_tcpdump(self.test_name) self.tcpdump_pid = start_tcpdump(self.dut, self.test_name) wutils.connect_to_wifi_network(self.dut, self.wifi_network) local_pcap_path = self.openwrt.network_setting.stop_tcpdump( remote_pcap_path, self.dut.device_log_path) stop_tcpdump(self.dut, self.tcpdump_pid, self.test_name) self.dut.log.info("pcap file path : %s" % local_pcap_path) packets = rdpcap(local_pcap_path) self.verify_gratuitous_na(packets)