1#!/usr/bin/env python3.4 2# 3# Copyright 2018 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import re 18import random 19import time 20 21import acts.controllers.packet_capture as packet_capture 22import acts.signals as signals 23import acts_contrib.test_utils.wifi.rpm_controller_utils as rutils 24import acts_contrib.test_utils.wifi.wifi_datastore_utils as dutils 25import acts_contrib.test_utils.wifi.wifi_test_utils as wutils 26 27from acts import asserts 28from acts.base_test import BaseTestClass 29from acts.controllers.ap_lib import hostapd_constants 30from acts.test_decorators import test_tracker_info 31from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest 32 33WifiEnums = wutils.WifiEnums 34 35WAIT_BEFORE_CONNECTION = 1 36SINGLE_BAND = 1 37DUAL_BAND = 2 38 39TIMEOUT = 60 40TEST = 'test_' 41PING_ADDR = 'www.google.com' 42 43NUM_LINK_PROBES = 3 44PROBE_DELAY_SEC = 1 45 46 47class WifiChaosTest(WifiBaseTest): 48 """ Tests for wifi IOT 49 50 Test Bed Requirement: 51 * One Android device 52 * Wi-Fi IOT networks visible to the device 53 """ 54 55 def __init__(self, configs): 56 BaseTestClass.__init__(self, configs) 57 self.generate_interop_tests() 58 59 def randomize_testcases(self): 60 """Randomize the list of hosts and build a random order of tests, 61 based on SSIDs, keeping all the relevant bands of an AP together. 62 63 """ 64 temp_tests = list() 65 hosts = self.user_params['interop_host'] 66 67 random.shuffle(hosts) 68 69 for host in hosts: 70 ssid_2g = None 71 ssid_5g = None 72 info = dutils.show_device(host) 73 74 # Based on the information in datastore, add to test list if 75 # AP has 2G band. 76 if 'ssid_2g' in info: 77 ssid_2g = info['ssid_2g'] 78 temp_tests.append(TEST + ssid_2g) 79 80 # Based on the information in datastore, add to test list if 81 # AP has 5G band. 82 if 'ssid_5g' in info: 83 ssid_5g = info['ssid_5g'] 84 temp_tests.append(TEST + ssid_5g) 85 86 self.tests = temp_tests 87 88 def generate_interop_testcase(self, base_test, testcase_name, ssid_dict): 89 """Generates a single test case from the given data. 90 91 Args: 92 base_test: The base test case function to run. 93 testcase_name: The name of the test case. 94 ssid_dict: The information about the network under test. 95 """ 96 ssid = testcase_name 97 test_tracker_uuid = ssid_dict[testcase_name]['uuid'] 98 hostname = ssid_dict[testcase_name]['host'] 99 if not testcase_name.startswith('test_'): 100 testcase_name = 'test_%s' % testcase_name 101 test_case = test_tracker_info( 102 uuid=test_tracker_uuid)(lambda: base_test(ssid, hostname)) 103 setattr(self, testcase_name, test_case) 104 self.tests.append(testcase_name) 105 106 def generate_interop_tests(self): 107 for ssid_dict in self.user_params['interop_ssid']: 108 testcase_name = list(ssid_dict)[0] 109 self.generate_interop_testcase(self.interop_base_test, 110 testcase_name, ssid_dict) 111 self.randomize_testcases() 112 113 def setup_class(self): 114 super().setup_class() 115 self.dut = self.android_devices[0] 116 self.admin = 'admin' + str(random.randint(1000001, 12345678)) 117 wutils.wifi_test_device_init(self.dut) 118 # Set country code explicitly to "US". 119 wutils.set_wifi_country_code(self.dut, wutils.WifiEnums.CountryCode.US) 120 121 asserts.assert_true( 122 self.lock_pcap(), 123 "Could not lock a Packet Capture. Aborting Interop test.") 124 125 wutils.wifi_toggle_state(self.dut, True) 126 127 def lock_pcap(self): 128 """Lock a Packet Capturere to use for the test.""" 129 130 # Get list of devices from the datastore. 131 locked_pcap = False 132 devices = dutils.get_devices() 133 134 for device in devices: 135 136 device_name = device['hostname'] 137 device_type = device['ap_label'] 138 if device_type == 'PCAP' and not device['lock_status']: 139 if dutils.lock_device(device_name, self.admin): 140 self.pcap_host = device_name 141 host = device['ip_address'] 142 self.log.info("Locked Packet Capture device: %s" % 143 device_name) 144 locked_pcap = True 145 break 146 else: 147 self.log.warning("Failed to lock %s PCAP." % device_name) 148 149 if not locked_pcap: 150 return False 151 152 pcap_config = {'ssh_config': {'user': 'root'}} 153 pcap_config['ssh_config']['host'] = host 154 155 self.pcap = packet_capture.PacketCapture(pcap_config) 156 return True 157 158 def setup_test(self): 159 super().setup_test() 160 self.dut.droid.wakeLockAcquireBright() 161 self.dut.droid.wakeUpNow() 162 163 def on_pass(self, test_name, begin_time): 164 wutils.stop_pcap(self.pcap, self.pcap_procs, True) 165 166 def on_fail(self, test_name, begin_time): 167 # Sleep to ensure all failed packets are captured. 168 time.sleep(5) 169 wutils.stop_pcap(self.pcap, self.pcap_procs, False) 170 super().on_fail(test_name, begin_time) 171 172 def teardown_test(self): 173 super().teardown_test() 174 self.dut.droid.wakeLockRelease() 175 self.dut.droid.goToSleepNow() 176 177 def teardown_class(self): 178 # Unlock the PCAP. 179 if not dutils.unlock_device(self.pcap_host): 180 self.log.warning("Failed to unlock %s PCAP. Check in datastore.") 181 182 """Helper Functions""" 183 184 def scan_and_connect_by_id(self, network, net_id): 185 """Scan for network and connect using network id. 186 187 Args: 188 net_id: Integer specifying the network id of the network. 189 190 """ 191 ssid = network[WifiEnums.SSID_KEY] 192 wutils.start_wifi_connection_scan_and_ensure_network_found( 193 self.dut, ssid) 194 wutils.wifi_connect_by_id(self.dut, net_id) 195 196 def run_ping(self, sec): 197 """Run ping for given number of seconds. 198 199 Args: 200 sec: Time in seconds to run teh ping traffic. 201 202 """ 203 self.log.info("Finding Gateway...") 204 route_response = self.dut.adb.shell("ip route get 8.8.8.8") 205 gateway_ip = re.search('via (.*) dev', str(route_response)).group(1) 206 self.log.info("Gateway IP = %s" % gateway_ip) 207 self.log.info("Running ping for %d seconds" % sec) 208 result = self.dut.adb.shell("ping -w %d %s" % (sec, gateway_ip), 209 timeout=sec + 1) 210 self.log.debug("Ping Result = %s" % result) 211 if "100% packet loss" in result: 212 raise signals.TestFailure("100% packet loss during ping") 213 214 def send_link_probes(self, network): 215 """ 216 Send link probes, and verify that the device and AP did not crash. 217 Also verify that at least one link probe succeeded. 218 219 Steps: 220 1. Send a few link probes. 221 2. Ensure that the device and AP did not crash (by checking that the 222 device remains connected to the expected network). 223 """ 224 results = wutils.send_link_probes(self.dut, NUM_LINK_PROBES, 225 PROBE_DELAY_SEC) 226 227 self.log.info("Link Probe results: %s" % (results, )) 228 229 wifi_info = self.dut.droid.wifiGetConnectionInfo() 230 expected = network[WifiEnums.SSID_KEY] 231 actual = wifi_info[WifiEnums.SSID_KEY] 232 asserts.assert_equal( 233 expected, actual, 234 "Device did not remain connected after sending link probes!") 235 236 def unlock_and_turn_off_ap(self, hostname, rpm_port, rpm_ip): 237 """UNlock the AP in datastore and turn off the AP. 238 239 Args: 240 hostname: Hostname of the AP. 241 rpm_port: Port number on the RPM for the AP. 242 rpm_ip: RPM's IP address. 243 244 """ 245 # Un-Lock AP in datastore. 246 self.log.debug("Un-lock AP in datastore") 247 if not dutils.unlock_device(hostname): 248 self.log.warning("Failed to unlock %s AP. Check AP in datastore.") 249 # Turn OFF AP from the RPM port. 250 rutils.turn_off_ap(rpm_port, rpm_ip) 251 252 def run_connect_disconnect(self, network, hostname, rpm_port, rpm_ip, 253 release_ap): 254 """Run connect/disconnect to a given network in loop. 255 256 Args: 257 network: Dict, network information. 258 hostname: Hostanme of the AP to connect to. 259 rpm_port: Port number on the RPM for the AP. 260 rpm_ip: Port number on the RPM for the AP. 261 release_ap: Flag to determine if we should turn off the AP yet. 262 263 Raises: TestFailure if the network connection fails. 264 265 """ 266 for attempt in range(5): 267 try: 268 begin_time = time.time() 269 ssid = network[WifiEnums.SSID_KEY] 270 net_id = self.dut.droid.wifiAddNetwork(network) 271 asserts.assert_true(net_id != -1, 272 "Add network %s failed" % network) 273 self.log.info("Connecting to %s" % ssid) 274 self.scan_and_connect_by_id(network, net_id) 275 self.run_ping(10) 276 # TODO(b/133369482): uncomment once bug is resolved 277 # self.send_link_probes(network) 278 wutils.wifi_forget_network(self.dut, ssid) 279 time.sleep(WAIT_BEFORE_CONNECTION) 280 except Exception as e: 281 self.log.error("Connection to %s network failed on the %d " 282 "attempt with exception %s." % 283 (ssid, attempt, e)) 284 # TODO:(bmahadev) Uncomment after scan issue is fixed. 285 # self.dut.take_bug_report(ssid, begin_time) 286 # self.dut.cat_adb_log(ssid, begin_time) 287 if release_ap: 288 self.unlock_and_turn_off_ap(hostname, rpm_port, rpm_ip) 289 raise signals.TestFailure("Failed to connect to %s" % ssid) 290 291 def get_band_and_chan(self, ssid): 292 """Get the band and channel information from SSID. 293 294 Args: 295 ssid: SSID of the network. 296 297 """ 298 ssid_info = ssid.split('_') 299 self.band = ssid_info[-1] 300 for item in ssid_info: 301 # Skip over the router model part. 302 if 'ch' in item and item != ssid_info[0]: 303 self.chan = re.search(r'(\d+)', item).group(0) 304 return 305 raise signals.TestFailure("Channel information not found in SSID.") 306 307 def interop_base_test(self, ssid, hostname): 308 """Base test for all the connect-disconnect interop tests. 309 310 Args: 311 ssid: string, SSID of the network to connect to. 312 hostname: string, hostname of the AP. 313 314 Steps: 315 1. Lock AP in datstore. 316 2. Turn on AP on the rpm switch. 317 3. Run connect-disconnect in loop. 318 4. Turn off AP on the rpm switch. 319 5. Unlock AP in datastore. 320 321 """ 322 network = {} 323 network['password'] = 'password' 324 network['SSID'] = ssid 325 release_ap = False 326 wutils.reset_wifi(self.dut) 327 328 # Lock AP in datastore. 329 self.log.info("Lock AP in datastore") 330 331 ap_info = dutils.show_device(hostname) 332 333 # If AP is locked by a different test admin, then we skip. 334 if ap_info['lock_status'] and ap_info['locked_by'] != self.admin: 335 raise signals.TestSkip("AP %s is locked, skipping test" % hostname) 336 337 if not dutils.lock_device(hostname, self.admin): 338 self.log.warning("Failed to lock %s AP. Unlock AP in datastore" 339 " and try again.") 340 raise signals.TestFailure("Failed to lock AP") 341 342 band = SINGLE_BAND 343 if ('ssid_2g' in ap_info) and ('ssid_5g' in ap_info): 344 band = DUAL_BAND 345 if (band == SINGLE_BAND) or (band == DUAL_BAND and '5G' in ssid): 346 release_ap = True 347 348 # Get AP RPM attributes and Turn ON AP. 349 rpm_ip = ap_info['rpm_ip'] 350 rpm_port = ap_info['rpm_port'] 351 352 rutils.turn_on_ap(self.pcap, ssid, rpm_port, rpm_ip=rpm_ip) 353 self.log.info("Finished turning ON AP.") 354 # Experimental. Some APs take upto a min to come online. 355 time.sleep(60) 356 357 self.get_band_and_chan(ssid) 358 self.pcap.configure_monitor_mode(self.band, self.chan) 359 self.pcap_procs = wutils.start_pcap(self.pcap, self.band.lower(), 360 self.test_name) 361 self.run_connect_disconnect(network, hostname, rpm_port, rpm_ip, 362 release_ap) 363 364 # Un-lock only if it's a single band AP or we are running the last band. 365 if release_ap: 366 self.unlock_and_turn_off_ap(hostname, rpm_port, rpm_ip) 367