#
#   Copyright 2018 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

from acts import asserts
from acts.test_decorators import test_tracker_info
from acts_contrib.test_utils.net.net_test_utils import start_tcpdump
from acts_contrib.test_utils.net.net_test_utils import stop_tcpdump
from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest
from acts_contrib.test_utils.wifi import wifi_test_utils as wutils

from scapy.all import ICMPv6ND_RA
from scapy.all import rdpcap
from scapy.all import Scapy_Exception

import acts.base_test
import acts_contrib.test_utils.wifi.wifi_test_utils as wutils

import copy
import os
import random
import time

WifiEnums = wutils.WifiEnums

# Script installed on the access point together with scapy (see setup_class).
RA_SCRIPT = 'sendra.py'
# File on the DUT that is grepped for the Icmp6InType134 counter.
PROC_NET_SNMP6 = '/proc/net/snmp6'
# Duplicate RAs are sent for LIFETIME / LIFETIME_FRACTION seconds, one every
# INTERVAL seconds (the test assumes the original RA has a 180s lifetime).
LIFETIME_FRACTION = 6
LIFETIME = 180
INTERVAL = 2
# Interface on the DUT that tcpdump captures on.
WLAN0 = "wlan0"


class ApfCountersTest(WifiBaseTest):
    """Tests that check how the DUT counts/filters IPv6 Router Advertisements.

    RAs are sent from the first access point towards the DUT's MAC address
    and the Icmp6InType134 counter in /proc/net/snmp6 on the DUT is compared
    before and after.
    """

    def __init__(self, controllers):
        super().__init__(controllers)
        self.tests = ("test_IPv6_RA_packets",
                      "test_IPv6_RA_with_RTT", )

    def setup_class(self):
        """Initialise the DUT, the reference networks and scapy on the AP."""
        super().setup_class()
        self.dut = self.android_devices[0]
        wutils.wifi_test_device_init(self.dut)
        req_params = ["scapy"]
        opt_param = ["reference_networks"]

        self.unpack_userparams(
            req_param_names=req_params, opt_param_names=opt_param)

        if "AccessPoint" in self.user_params:
            self.legacy_configure_ap_and_start()

        asserts.assert_true(
            len(self.reference_networks) > 0,
            "Need at least one reference network with psk.")
        wutils.wifi_toggle_state(self.dut, True)

        self.wpapsk_2g = self.reference_networks[0]["2g"]
        self.wpapsk_5g = self.reference_networks[0]["5g"]

        # install scapy
        current_dir = os.path.dirname(os.path.realpath(__file__))
        send_ra = os.path.join(current_dir, RA_SCRIPT)
        self.access_points[0].install_scapy(self.scapy[0], send_ra)
        self.tcpdump_pid = None

    def setup_test(self):
        """Start tcpdump on the DUT, except for the RTT test.

        The RTT test starts (and stops) its own capture inside the test body.
        """
        if 'RTT' not in self.test_name:
            self.tcpdump_pid = start_tcpdump(self.dut, self.test_name, WLAN0)

    def teardown_test(self):
        """Stop the tcpdump capture started in setup_test."""
        if 'RTT' not in self.test_name:
            stop_tcpdump(self.dut, self.tcpdump_pid, self.test_name)

    def on_fail(self, test_name, begin_time):
        """Collect a bug report and the adb log for a failed test."""
        self.dut.take_bug_report(test_name, begin_time)
        self.dut.cat_adb_log(test_name, begin_time)

    def teardown_class(self):
        """Drop generated networks, clean up scapy and reset the DUT."""
        if "AccessPoint" in self.user_params:
            # pop() instead of del: a missing key must not raise KeyError and
            # prevent the scapy cleanup and wifi reset below from running.
            self.user_params.pop("reference_networks", None)
        self.access_points[0].cleanup_scapy()
        wutils.reset_wifi(self.dut)
        # test_IPv6_RA_packets sets this setting to 0; set it to 7 here.
        self.dut.adb.shell("settings put global stay_on_while_plugged_in 7")

    # Helper methods

    def _get_icmp6intype134(self):
        """ Get ICMP6 Type 134 packet count on the DUT

        Returns:
            Number of ICMP6 type 134 packets, or 0 when the counter is not
            present in /proc/net/snmp6.
        """
        # "|| true" keeps the shell command successful when grep finds nothing.
        cmd = "grep Icmp6InType134 %s || true" % PROC_NET_SNMP6
        output = self.dut.adb.shell(cmd)
        if not output:
            return 0
        # The count is the last whitespace-separated token of the grep output.
        ra_count = int(output.split()[-1])
        self.dut.log.info("RA Count %s:" % ra_count)
        return ra_count

    def _get_rtt_list_from_tcpdump(self, pcap_file):
        """ Get the re-trans timer of each RA pkt in a list

        Args:
            pcap_file: tcpdump file from the DUT

        Returns:
            List with the retranstimer value (as int) of every RA packet found
            in the capture; an empty list if the file is not a valid pcap.
        """
        try:
            packets = rdpcap(pcap_file)
        except Scapy_Exception:
            self.log.error("Not a valid pcap file")
            return []

        return [int(pkt[ICMPv6ND_RA].retranstimer)
                for pkt in packets if ICMPv6ND_RA in pkt]

    # Tests

    @test_tracker_info(uuid="bc8d3f27-582a-464a-be30-556f07b77ee1")
    def test_IPv6_RA_packets(self):
        """ Test if the device filters the IPv6 packets

        Steps:
          1. Send a RA packet to DUT. DUT should accept this
          2. Send duplicate RA packets. The RA packets should be filtered
             for the next 30 seconds (1/6th of RA lifetime)
          3. The next RA packets should be accepted
        """
        # connect to the network and get mac address of the dut
        ap = self.access_points[0]
        wifi_network = copy.deepcopy(self.wpapsk_5g)
        wifi_network['meteredOverride'] = 1
        wutils.connect_to_wifi_network(self.dut, wifi_network)
        mac_addr = self.dut.droid.wifiGetConnectionInfo()['mac_address']
        self.log.info("mac_addr %s" % mac_addr)
        time.sleep(30)  # wait 30 sec before sending RAs

        # get the current ra count
        ra_count = self._get_icmp6intype134()

        # Start scapy to send RA to the phone's MAC
        ap.send_ra('wlan1', mac_addr, 0, 1)

        # get the latest ra count
        ra_count_latest = self._get_icmp6intype134()
        asserts.assert_true(ra_count_latest == ra_count + 1,
                            "Device dropped the first RA in sequence")
        self.dut.adb.shell("settings put global stay_on_while_plugged_in 0")
        self.dut.droid.goToSleepNow()

        # Generate and send 'x' number of duplicate RAs, for 1/6th of the
        # lifetime of the original RA. Test assumes that the original RA has a
        # lifetime of 180s. Hence, all RAs received within the next 30s of the
        # original RA should be filtered.
        ra_count = ra_count_latest
        # Floor division keeps the packet count an int (15) rather than the
        # float 15.0 that true division produces on Python 3.
        count = LIFETIME // LIFETIME_FRACTION // INTERVAL
        ap.send_ra('wlan1', mac_addr, interval=INTERVAL, count=count)

        # Fail the test unless at least 90% of the duplicate RAs were dropped.
        ra_count_latest = self._get_icmp6intype134()
        pkt_loss = count - (ra_count_latest - ra_count)
        percentage_loss = float(pkt_loss) / count * 100
        asserts.assert_true(percentage_loss >= 90, "Device did not filter "
                            "duplicate RAs correctly. %d Percent of duplicate"
                            " RAs were accepted" % (100 - percentage_loss))

        # Any new RA after this should be accepted. Re-baseline on the count
        # observed after the duplicate burst: up to 10% of the duplicates may
        # legitimately have been accepted above, and comparing against the
        # pre-burst count would then fail this check spuriously.
        ra_count = ra_count_latest
        ap.send_ra('wlan1', mac_addr, interval=INTERVAL, count=1)
        ra_count_latest = self._get_icmp6intype134()
        asserts.assert_true(ra_count_latest == ra_count + 1,
                            "Device did not accept new RA after 1/6th time "
                            "interval. Device dropped a valid RA in sequence.")

    @test_tracker_info(uuid="d2a0aff0-048c-475f-9bba-d90d8d9ebae3")
    def test_IPv6_RA_with_RTT(self):
        """Test if the device filters IPv6 RA packets with different re-trans time

        Steps:
          1. Get the current RA count
          2. Send 400 packets with different re-trans time
          3. Verify that RA count increased by at least 400
          4. Verify internet connectivity
        """
        pkt_num = 400
        # random.sample() draws without replacement, so every value is unique.
        rtt_list = random.sample(range(10, 10000), pkt_num)
        self.log.info("RTT List: %s" % rtt_list)

        # connect to the network and get mac address of the dut
        ap = self.access_points[0]
        wutils.connect_to_wifi_network(self.dut, self.wpapsk_5g)
        mac_addr = self.dut.droid.wifiGetConnectionInfo()['mac_address']
        self.log.info("mac_addr %s" % mac_addr)
        time.sleep(30)  # wait 30 sec before sending RAs

        # get the current ra count
        ra_count = self._get_icmp6intype134()

        # start tcpdump on the device
        tcpdump_pid = start_tcpdump(self.dut, self.test_name, WLAN0)

        # send RA with different re-trans time
        for rtt in rtt_list:
            ap.send_ra('wlan1', mac_addr, 0, 1, rtt=rtt)

        # stop tcpdump and pull file
        time.sleep(60)
        pcap_file = stop_tcpdump(self.dut, tcpdump_pid, self.test_name)

        # get the new RA count
        new_ra_count = self._get_icmp6intype134()
        asserts.assert_true(new_ra_count >= ra_count + pkt_num,
                            "Device did not accept all RAs")

        # verify the RA pkts RTT match
        tcpdump_rtt_list = self._get_rtt_list_from_tcpdump(pcap_file)
        asserts.assert_true(set(rtt_list).issubset(set(tcpdump_rtt_list)),
                            "RA packets didn't match with tcpdump")

        # verify if internet connectivity works after sending RA packets
        wutils.validate_connection(self.dut)