1#!/usr/bin/env python3.4 2# 3# Copyright 2018 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import re 18import sys 19import random 20import time 21 22import acts.controllers.packet_capture as packet_capture 23import acts.signals as signals 24import acts.test_utils.wifi.rpm_controller_utils as rutils 25import acts.test_utils.wifi.wifi_datastore_utils as dutils 26import acts.test_utils.wifi.wifi_test_utils as wutils 27 28from acts import asserts 29from acts.base_test import BaseTestClass 30from acts.controllers.ap_lib import hostapd_constants 31from acts.test_decorators import test_tracker_info 32from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest 33 34WifiEnums = wutils.WifiEnums 35 36WAIT_BEFORE_CONNECTION = 1 37SINGLE_BAND = 1 38DUAL_BAND = 2 39 40TIMEOUT = 60 41TEST = 'test_' 42PING_ADDR = 'www.google.com' 43 44NUM_LINK_PROBES = 3 45PROBE_DELAY_SEC = 1 46 47 48class WifiChaosTest(WifiBaseTest): 49 """ Tests for wifi IOT 50 51 Test Bed Requirement: 52 * One Android device 53 * Wi-Fi IOT networks visible to the device 54 """ 55 56 def __init__(self, configs): 57 BaseTestClass.__init__(self, configs) 58 self.generate_interop_tests() 59 60 def randomize_testcases(self): 61 """Randomize the list of hosts and build a random order of tests, 62 based on SSIDs, keeping all the relevant bands of an AP together. 63 64 """ 65 temp_tests = list() 66 hosts = self.user_params['interop_host'] 67 68 random.shuffle(hosts) 69 70 for host in hosts: 71 ssid_2g = None 72 ssid_5g = None 73 info = dutils.show_device(host) 74 75 # Based on the information in datastore, add to test list if 76 # AP has 2G band. 77 if 'ssid_2g' in info: 78 ssid_2g = info['ssid_2g'] 79 temp_tests.append(TEST + ssid_2g) 80 81 # Based on the information in datastore, add to test list if 82 # AP has 5G band. 83 if 'ssid_5g' in info: 84 ssid_5g = info['ssid_5g'] 85 temp_tests.append(TEST + ssid_5g) 86 87 self.tests = temp_tests 88 89 def generate_interop_testcase(self, base_test, testcase_name, ssid_dict): 90 """Generates a single test case from the given data. 91 92 Args: 93 base_test: The base test case function to run. 94 testcase_name: The name of the test case. 95 ssid_dict: The information about the network under test. 96 """ 97 ssid = testcase_name 98 test_tracker_uuid = ssid_dict[testcase_name]['uuid'] 99 hostname = ssid_dict[testcase_name]['host'] 100 if not testcase_name.startswith('test_'): 101 testcase_name = 'test_%s' % testcase_name 102 test_case = test_tracker_info(uuid=test_tracker_uuid)( 103 lambda: base_test(ssid, hostname)) 104 setattr(self, testcase_name, test_case) 105 self.tests.append(testcase_name) 106 107 def generate_interop_tests(self): 108 for ssid_dict in self.user_params['interop_ssid']: 109 testcase_name = list(ssid_dict)[0] 110 self.generate_interop_testcase(self.interop_base_test, 111 testcase_name, ssid_dict) 112 self.randomize_testcases() 113 114 def setup_class(self): 115 self.dut = self.android_devices[0] 116 self.admin = 'admin' + str(random.randint(1000001, 12345678)) 117 wutils.wifi_test_device_init(self.dut) 118 # Set country code explicitly to "US". 119 self.dut.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US) 120 121 asserts.assert_true( 122 self.lock_pcap(), 123 "Could not lock a Packet Capture. Aborting Interop test.") 124 125 wutils.wifi_toggle_state(self.dut, True) 126 127 def lock_pcap(self): 128 """Lock a Packet Capturere to use for the test.""" 129 130 # Get list of devices from the datastore. 131 locked_pcap = False 132 devices = dutils.get_devices() 133 134 for device in devices: 135 136 device_name = device['hostname'] 137 device_type = device['ap_label'] 138 if device_type == 'PCAP' and not device['lock_status']: 139 if dutils.lock_device(device_name, self.admin): 140 self.pcap_host = device_name 141 host = device['ip_address'] 142 self.log.info("Locked Packet Capture device: %s" % device_name) 143 locked_pcap = True 144 break 145 else: 146 self.log.warning("Failed to lock %s PCAP." % device_name) 147 148 if not locked_pcap: 149 return False 150 151 pcap_config = {'ssh_config':{'user':'root'} } 152 pcap_config['ssh_config']['host'] = host 153 154 self.pcap = packet_capture.PacketCapture(pcap_config) 155 return True 156 157 def setup_test(self): 158 self.dut.droid.wakeLockAcquireBright() 159 self.dut.droid.wakeUpNow() 160 161 def on_pass(self, test_name, begin_time): 162 wutils.stop_pcap(self.pcap, self.pcap_procs, True) 163 164 def on_fail(self, test_name, begin_time): 165 # Sleep to ensure all failed packets are captured. 166 time.sleep(5) 167 wutils.stop_pcap(self.pcap, self.pcap_procs, False) 168 self.dut.take_bug_report(test_name, begin_time) 169 self.dut.cat_adb_log(test_name, begin_time) 170 171 def teardown_test(self): 172 self.dut.droid.wakeLockRelease() 173 self.dut.droid.goToSleepNow() 174 175 def teardown_class(self): 176 # Unlock the PCAP. 177 if not dutils.unlock_device(self.pcap_host): 178 self.log.warning("Failed to unlock %s PCAP. Check in datastore.") 179 180 181 """Helper Functions""" 182 183 def scan_and_connect_by_id(self, network, net_id): 184 """Scan for network and connect using network id. 185 186 Args: 187 net_id: Integer specifying the network id of the network. 188 189 """ 190 ssid = network[WifiEnums.SSID_KEY] 191 wutils.start_wifi_connection_scan_and_ensure_network_found(self.dut, 192 ssid) 193 wutils.wifi_connect_by_id(self.dut, net_id) 194 195 def run_ping(self, sec): 196 """Run ping for given number of seconds. 197 198 Args: 199 sec: Time in seconds to run teh ping traffic. 200 201 """ 202 self.log.info("Running ping for %d seconds" % sec) 203 result = self.dut.adb.shell("ping -w %d %s" % (sec, PING_ADDR), 204 timeout=sec + 1) 205 self.log.debug("Ping Result = %s" % result) 206 if "100% packet loss" in result: 207 raise signals.TestFailure("100% packet loss during ping") 208 209 def send_link_probes(self, network): 210 """ 211 Send link probes, and verify that the device and AP did not crash. 212 Also verify that at least one link probe succeeded. 213 214 Steps: 215 1. Send a few link probes. 216 2. Verify that at least one link probe succeeded. 217 3. Ensure that the device and AP did not crash (by checking that the 218 device remains connected to the expected network). 219 """ 220 results = wutils.send_link_probes( 221 self.dut, NUM_LINK_PROBES, PROBE_DELAY_SEC) 222 223 asserts.assert_true(any(result.is_success for result in results), 224 "Expect at least 1 probe success: " + str(results)) 225 226 wifi_info = self.dut.droid.wifiGetConnectionInfo() 227 expected = network[WifiEnums.SSID_KEY] 228 actual = wifi_info[WifiEnums.SSID_KEY] 229 asserts.assert_equal( 230 expected, actual, 231 "Device did not remain connected after sending link probes!") 232 233 def unlock_and_turn_off_ap(self, hostname, rpm_port, rpm_ip): 234 """UNlock the AP in datastore and turn off the AP. 235 236 Args: 237 hostname: Hostname of the AP. 238 rpm_port: Port number on the RPM for the AP. 239 rpm_ip: RPM's IP address. 240 241 """ 242 # Un-Lock AP in datastore. 243 self.log.debug("Un-lock AP in datastore") 244 if not dutils.unlock_device(hostname): 245 self.log.warning("Failed to unlock %s AP. Check AP in datastore.") 246 # Turn OFF AP from the RPM port. 247 rutils.turn_off_ap(rpm_port, rpm_ip) 248 249 def run_connect_disconnect(self, network, hostname, rpm_port, rpm_ip, 250 release_ap): 251 """Run connect/disconnect to a given network in loop. 252 253 Args: 254 network: Dict, network information. 255 hostname: Hostanme of the AP to connect to. 256 rpm_port: Port number on the RPM for the AP. 257 rpm_ip: Port number on the RPM for the AP. 258 release_ap: Flag to determine if we should turn off the AP yet. 259 260 Raises: TestFailure if the network connection fails. 261 262 """ 263 for attempt in range(5): 264 try: 265 begin_time = time.time() 266 ssid = network[WifiEnums.SSID_KEY] 267 net_id = self.dut.droid.wifiAddNetwork(network) 268 asserts.assert_true(net_id != -1, "Add network %s failed" % network) 269 self.log.info("Connecting to %s" % ssid) 270 self.scan_and_connect_by_id(network, net_id) 271 self.run_ping(10) 272 self.send_link_probes(network) 273 wutils.wifi_forget_network(self.dut, ssid) 274 time.sleep(WAIT_BEFORE_CONNECTION) 275 except: 276 self.log.error("Connection to %s network failed on the %d " 277 "attempt." % (ssid, attempt)) 278 # TODO:(bmahadev) Uncomment after scan issue is fixed. 279 # self.dut.take_bug_report(ssid, begin_time) 280 # self.dut.cat_adb_log(ssid, begin_time) 281 if release_ap: 282 self.unlock_and_turn_off_ap(hostname, rpm_port, rpm_ip) 283 raise signals.TestFailure("Failed to connect to %s" % ssid) 284 285 def get_band_and_chan(self, ssid): 286 """Get the band and channel information from SSID. 287 288 Args: 289 ssid: SSID of the network. 290 291 """ 292 ssid_info = ssid.split('_') 293 self.band = ssid_info[-1] 294 for item in ssid_info: 295 # Skip over the router model part. 296 if 'ch' in item and item != ssid_info[0]: 297 self.chan = re.search(r'(\d+)',item).group(0) 298 return 299 raise signals.TestFailure("Channel information not found in SSID.") 300 301 def interop_base_test(self, ssid, hostname): 302 """Base test for all the connect-disconnect interop tests. 303 304 Args: 305 ssid: string, SSID of the network to connect to. 306 hostname: string, hostname of the AP. 307 308 Steps: 309 1. Lock AP in datstore. 310 2. Turn on AP on the rpm switch. 311 3. Run connect-disconnect in loop. 312 4. Turn off AP on the rpm switch. 313 5. Unlock AP in datastore. 314 315 """ 316 network = {} 317 network['password'] = 'password' 318 network['SSID'] = ssid 319 release_ap = False 320 wutils.reset_wifi(self.dut) 321 322 # Lock AP in datastore. 323 self.log.info("Lock AP in datastore") 324 325 ap_info = dutils.show_device(hostname) 326 327 # If AP is locked by a different test admin, then we skip. 328 if ap_info['lock_status'] and ap_info['locked_by'] != self.admin: 329 raise signals.TestSkip("AP %s is locked, skipping test" % hostname) 330 331 if not dutils.lock_device(hostname, self.admin): 332 self.log.warning("Failed to lock %s AP. Unlock AP in datastore" 333 " and try again.") 334 raise signals.TestFailure("Failed to lock AP") 335 336 band = SINGLE_BAND 337 if ('ssid_2g' in ap_info) and ('ssid_5g' in ap_info): 338 band = DUAL_BAND 339 if (band == SINGLE_BAND) or ( 340 band == DUAL_BAND and '5G' in ssid): 341 release_ap = True 342 343 # Get AP RPM attributes and Turn ON AP. 344 rpm_ip = ap_info['rpm_ip'] 345 rpm_port = ap_info['rpm_port'] 346 347 rutils.turn_on_ap(self.pcap, ssid, rpm_port, rpm_ip=rpm_ip) 348 self.log.info("Finished turning ON AP.") 349 # Experimental. Some APs take upto a min to come online. 350 time.sleep(60) 351 352 self.get_band_and_chan(ssid) 353 self.pcap.configure_monitor_mode(self.band, self.chan) 354 self.pcap_procs = wutils.start_pcap( 355 self.pcap, self.band.lower(), self.test_name) 356 self.run_connect_disconnect(network, hostname, rpm_port, rpm_ip, 357 release_ap) 358 359 # Un-lock only if it's a single band AP or we are running the last band. 360 if release_ap: 361 self.unlock_and_turn_off_ap(hostname, rpm_port, rpm_ip) 362