• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3.4
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import re
18import sys
19import random
20import time
21
22import acts.controllers.packet_capture as packet_capture
23import acts.signals as signals
24import acts_contrib.test_utils.wifi.rpm_controller_utils as rutils
25import acts_contrib.test_utils.wifi.wifi_datastore_utils as dutils
26import acts_contrib.test_utils.wifi.wifi_test_utils as wutils
27
28from acts import asserts
29from acts.base_test import BaseTestClass
30from acts.controllers.ap_lib import hostapd_constants
31from acts.test_decorators import test_tracker_info
32from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest
33
# Module-level shorthand for the WifiEnums definitions in wifi_test_utils.
WifiEnums = wutils.WifiEnums

# Seconds slept after forgetting a network, before the next connect attempt
# (see run_connect_disconnect).
WAIT_BEFORE_CONNECTION = 1
# Number of bands an AP exposes; interop_base_test uses these to decide
# whether the AP can be released after the current test.
SINGLE_BAND = 1
DUAL_BAND = 2

# NOTE(review): TIMEOUT is not referenced in the visible code of this file.
TIMEOUT = 60
# Prefix prepended to an SSID to form the generated test case name.
TEST = 'test_'
# NOTE(review): PING_ADDR is not referenced in the visible code of this file;
# run_ping pings the default gateway instead.
PING_ADDR = 'www.google.com'

# Arguments passed to wutils.send_link_probes(); presumably the number of
# probes and the delay in seconds between them - confirm against wutils.
NUM_LINK_PROBES = 3
PROBE_DELAY_SEC = 1
46
47
48class WifiChaosTest(WifiBaseTest):
49    """ Tests for wifi IOT
50
51        Test Bed Requirement:
52          * One Android device
53          * Wi-Fi IOT networks visible to the device
54    """
55
    def __init__(self, configs):
        """Initialize the test class and generate the per-SSID test cases.

        Args:
            configs: Test configuration object, passed through unchanged to
                BaseTestClass.__init__.
        """
        # NOTE(review): this calls BaseTestClass.__init__ directly instead of
        # super().__init__, so any __init__ defined on WifiBaseTest is
        # bypassed - confirm this is intended.
        BaseTestClass.__init__(self, configs)
        # Test methods are created dynamically, so they must exist before the
        # framework looks up the tests to run.
        self.generate_interop_tests()
59
60    def randomize_testcases(self):
61        """Randomize the list of hosts and build a random order of tests,
62           based on SSIDs, keeping all the relevant bands of an AP together.
63
64        """
65        temp_tests = list()
66        hosts = self.user_params['interop_host']
67
68        random.shuffle(hosts)
69
70        for host in hosts:
71            ssid_2g = None
72            ssid_5g = None
73            info = dutils.show_device(host)
74
75            # Based on the information in datastore, add to test list if
76            # AP has 2G band.
77            if 'ssid_2g' in info:
78                ssid_2g = info['ssid_2g']
79                temp_tests.append(TEST + ssid_2g)
80
81            # Based on the information in datastore, add to test list if
82            # AP has 5G band.
83            if 'ssid_5g' in info:
84                ssid_5g = info['ssid_5g']
85                temp_tests.append(TEST + ssid_5g)
86
87        self.tests = temp_tests
88
89    def generate_interop_testcase(self, base_test, testcase_name, ssid_dict):
90        """Generates a single test case from the given data.
91
92        Args:
93            base_test: The base test case function to run.
94            testcase_name: The name of the test case.
95            ssid_dict: The information about the network under test.
96        """
97        ssid = testcase_name
98        test_tracker_uuid = ssid_dict[testcase_name]['uuid']
99        hostname = ssid_dict[testcase_name]['host']
100        if not testcase_name.startswith('test_'):
101            testcase_name = 'test_%s' % testcase_name
102        test_case = test_tracker_info(uuid=test_tracker_uuid)(
103            lambda: base_test(ssid, hostname))
104        setattr(self, testcase_name, test_case)
105        self.tests.append(testcase_name)
106
107    def generate_interop_tests(self):
108        for ssid_dict in self.user_params['interop_ssid']:
109            testcase_name = list(ssid_dict)[0]
110            self.generate_interop_testcase(self.interop_base_test,
111                                           testcase_name, ssid_dict)
112        self.randomize_testcases()
113
114    def setup_class(self):
115        super().setup_class()
116        self.dut = self.android_devices[0]
117        self.admin = 'admin' + str(random.randint(1000001, 12345678))
118        wutils.wifi_test_device_init(self.dut)
119        # Set country code explicitly to "US".
120        wutils.set_wifi_country_code(self.dut, wutils.WifiEnums.CountryCode.US)
121
122        asserts.assert_true(
123            self.lock_pcap(),
124            "Could not lock a Packet Capture. Aborting Interop test.")
125
126        wutils.wifi_toggle_state(self.dut, True)
127
128    def lock_pcap(self):
129        """Lock a Packet Capturere to use for the test."""
130
131        # Get list of devices from the datastore.
132        locked_pcap = False
133        devices = dutils.get_devices()
134
135        for device in devices:
136
137            device_name = device['hostname']
138            device_type = device['ap_label']
139            if device_type == 'PCAP' and not device['lock_status']:
140                if dutils.lock_device(device_name, self.admin):
141                    self.pcap_host = device_name
142                    host = device['ip_address']
143                    self.log.info("Locked Packet Capture device: %s" % device_name)
144                    locked_pcap = True
145                    break
146                else:
147                    self.log.warning("Failed to lock %s PCAP." % device_name)
148
149        if not locked_pcap:
150            return False
151
152        pcap_config = {'ssh_config':{'user':'root'} }
153        pcap_config['ssh_config']['host'] = host
154
155        self.pcap = packet_capture.PacketCapture(pcap_config)
156        return True
157
158    def setup_test(self):
159        super().setup_test()
160        self.dut.droid.wakeLockAcquireBright()
161        self.dut.droid.wakeUpNow()
162
163    def on_pass(self, test_name, begin_time):
164        wutils.stop_pcap(self.pcap, self.pcap_procs, True)
165
166    def on_fail(self, test_name, begin_time):
167        # Sleep to ensure all failed packets are captured.
168        time.sleep(5)
169        wutils.stop_pcap(self.pcap, self.pcap_procs, False)
170        super().on_fail(test_name, begin_time)
171
172    def teardown_test(self):
173        super().teardown_test()
174        self.dut.droid.wakeLockRelease()
175        self.dut.droid.goToSleepNow()
176
177    def teardown_class(self):
178        # Unlock the PCAP.
179        if not dutils.unlock_device(self.pcap_host):
180            self.log.warning("Failed to unlock %s PCAP. Check in datastore.")
181
182
183    """Helper Functions"""
184
185    def scan_and_connect_by_id(self, network, net_id):
186        """Scan for network and connect using network id.
187
188        Args:
189            net_id: Integer specifying the network id of the network.
190
191        """
192        ssid = network[WifiEnums.SSID_KEY]
193        wutils.start_wifi_connection_scan_and_ensure_network_found(self.dut,
194                                                                   ssid)
195        wutils.wifi_connect_by_id(self.dut, net_id)
196
197    def run_ping(self, sec):
198        """Run ping for given number of seconds.
199
200        Args:
201            sec: Time in seconds to run teh ping traffic.
202
203        """
204        self.log.info("Finding Gateway...")
205        route_response = self.dut.adb.shell("ip route get 8.8.8.8")
206        gateway_ip = re.search('via (.*) dev', str(route_response)).group(1)
207        self.log.info("Gateway IP = %s" % gateway_ip)
208        self.log.info("Running ping for %d seconds" % sec)
209        result = self.dut.adb.shell("ping -w %d %s" % (sec, gateway_ip),
210                                    timeout=sec + 1)
211        self.log.debug("Ping Result = %s" % result)
212        if "100% packet loss" in result:
213            raise signals.TestFailure("100% packet loss during ping")
214
215    def send_link_probes(self, network):
216        """
217        Send link probes, and verify that the device and AP did not crash.
218        Also verify that at least one link probe succeeded.
219
220        Steps:
221        1. Send a few link probes.
222        2. Ensure that the device and AP did not crash (by checking that the
223           device remains connected to the expected network).
224        """
225        results = wutils.send_link_probes(
226            self.dut, NUM_LINK_PROBES, PROBE_DELAY_SEC)
227
228        self.log.info("Link Probe results: %s" % (results,))
229
230        wifi_info = self.dut.droid.wifiGetConnectionInfo()
231        expected = network[WifiEnums.SSID_KEY]
232        actual = wifi_info[WifiEnums.SSID_KEY]
233        asserts.assert_equal(
234            expected, actual,
235            "Device did not remain connected after sending link probes!")
236
237    def unlock_and_turn_off_ap(self, hostname, rpm_port, rpm_ip):
238        """UNlock the AP in datastore and turn off the AP.
239
240        Args:
241            hostname: Hostname of the AP.
242            rpm_port: Port number on the RPM for the AP.
243            rpm_ip: RPM's IP address.
244
245        """
246        # Un-Lock AP in datastore.
247        self.log.debug("Un-lock AP in datastore")
248        if not dutils.unlock_device(hostname):
249            self.log.warning("Failed to unlock %s AP. Check AP in datastore.")
250        # Turn OFF AP from the RPM port.
251        rutils.turn_off_ap(rpm_port, rpm_ip)
252
253    def run_connect_disconnect(self, network, hostname, rpm_port, rpm_ip,
254                               release_ap):
255        """Run connect/disconnect to a given network in loop.
256
257           Args:
258               network: Dict, network information.
259               hostname: Hostanme of the AP to connect to.
260               rpm_port: Port number on the RPM for the AP.
261               rpm_ip: Port number on the RPM for the AP.
262               release_ap: Flag to determine if we should turn off the AP yet.
263
264           Raises: TestFailure if the network connection fails.
265
266        """
267        for attempt in range(5):
268            try:
269                begin_time = time.time()
270                ssid = network[WifiEnums.SSID_KEY]
271                net_id = self.dut.droid.wifiAddNetwork(network)
272                asserts.assert_true(net_id != -1, "Add network %s failed" % network)
273                self.log.info("Connecting to %s" % ssid)
274                self.scan_and_connect_by_id(network, net_id)
275                self.run_ping(10)
276                # TODO(b/133369482): uncomment once bug is resolved
277                # self.send_link_probes(network)
278                wutils.wifi_forget_network(self.dut, ssid)
279                time.sleep(WAIT_BEFORE_CONNECTION)
280            except Exception as e:
281                self.log.error("Connection to %s network failed on the %d "
282                               "attempt with exception %s." % (ssid, attempt, e))
283                # TODO:(bmahadev) Uncomment after scan issue is fixed.
284                # self.dut.take_bug_report(ssid, begin_time)
285                # self.dut.cat_adb_log(ssid, begin_time)
286                if release_ap:
287                    self.unlock_and_turn_off_ap(hostname, rpm_port, rpm_ip)
288                raise signals.TestFailure("Failed to connect to %s" % ssid)
289
290    def get_band_and_chan(self, ssid):
291        """Get the band and channel information from SSID.
292
293        Args:
294            ssid: SSID of the network.
295
296        """
297        ssid_info = ssid.split('_')
298        self.band = ssid_info[-1]
299        for item in ssid_info:
300            # Skip over the router model part.
301            if 'ch' in item and item != ssid_info[0]:
302                self.chan = re.search(r'(\d+)',item).group(0)
303                return
304        raise signals.TestFailure("Channel information not found in SSID.")
305
306    def interop_base_test(self, ssid, hostname):
307        """Base test for all the connect-disconnect interop tests.
308
309        Args:
310            ssid: string, SSID of the network to connect to.
311            hostname: string, hostname of the AP.
312
313        Steps:
314            1. Lock AP in datstore.
315            2. Turn on AP on the rpm switch.
316            3. Run connect-disconnect in loop.
317            4. Turn off AP on the rpm switch.
318            5. Unlock AP in datastore.
319
320        """
321        network = {}
322        network['password'] = 'password'
323        network['SSID'] = ssid
324        release_ap = False
325        wutils.reset_wifi(self.dut)
326
327        # Lock AP in datastore.
328        self.log.info("Lock AP in datastore")
329
330        ap_info = dutils.show_device(hostname)
331
332        # If AP is locked by a different test admin, then we skip.
333        if ap_info['lock_status'] and ap_info['locked_by'] != self.admin:
334            raise signals.TestSkip("AP %s is locked, skipping test" % hostname)
335
336        if not dutils.lock_device(hostname, self.admin):
337            self.log.warning("Failed to lock %s AP. Unlock AP in datastore"
338                             " and try again.")
339            raise signals.TestFailure("Failed to lock AP")
340
341        band = SINGLE_BAND
342        if ('ssid_2g' in ap_info) and ('ssid_5g' in ap_info):
343            band = DUAL_BAND
344        if (band == SINGLE_BAND) or (
345                band == DUAL_BAND and '5G' in ssid):
346            release_ap = True
347
348        # Get AP RPM attributes and Turn ON AP.
349        rpm_ip = ap_info['rpm_ip']
350        rpm_port = ap_info['rpm_port']
351
352        rutils.turn_on_ap(self.pcap, ssid, rpm_port, rpm_ip=rpm_ip)
353        self.log.info("Finished turning ON AP.")
354        # Experimental. Some APs take upto a min to come online.
355        time.sleep(60)
356
357        self.get_band_and_chan(ssid)
358        self.pcap.configure_monitor_mode(self.band, self.chan)
359        self.pcap_procs = wutils.start_pcap(
360                self.pcap, self.band.lower(), self.test_name)
361        self.run_connect_disconnect(network, hostname, rpm_port, rpm_ip,
362                                    release_ap)
363
364        # Un-lock only if it's a single band AP or we are running the last band.
365        if release_ap:
366            self.unlock_and_turn_off_ap(hostname, rpm_port, rpm_ip)
367