• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3.4
2#
3#   Copyright 2018 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import re
18import random
19import time
20
21import acts.controllers.packet_capture as packet_capture
22import acts.signals as signals
23import acts_contrib.test_utils.wifi.rpm_controller_utils as rutils
24import acts_contrib.test_utils.wifi.wifi_datastore_utils as dutils
25import acts_contrib.test_utils.wifi.wifi_test_utils as wutils
26
27from acts import asserts
28from acts.base_test import BaseTestClass
29from acts.controllers.ap_lib import hostapd_constants
30from acts.test_decorators import test_tracker_info
31from acts_contrib.test_utils.wifi.WifiBaseTest import WifiBaseTest
32
# Short alias for the enum/constant namespace exposed by wifi_test_utils.
WifiEnums = wutils.WifiEnums

# Seconds to sleep after forgetting a network, before the next connection
# attempt in run_connect_disconnect.
WAIT_BEFORE_CONNECTION = 1
# Markers for how many bands an AP exposes; interop_base_test uses them to
# decide whether the AP can be released after the current test.
SINGLE_BAND = 1
DUAL_BAND = 2

# NOTE(review): TIMEOUT and PING_ADDR are not referenced in the visible part
# of this file -- confirm whether they are still needed.
TIMEOUT = 60
# Prefix prepended to an SSID to form its test case name.
TEST = 'test_'
PING_ADDR = 'www.google.com'

# Passed as the second and third arguments to wutils.send_link_probes;
# presumably the probe count and the delay between probes in seconds.
NUM_LINK_PROBES = 3
PROBE_DELAY_SEC = 1
45
46
47class WifiChaosTest(WifiBaseTest):
48    """ Tests for wifi IOT
49
50        Test Bed Requirement:
51          * One Android device
52          * Wi-Fi IOT networks visible to the device
53    """
54
    def __init__(self, configs):
        """Initialize the test class and generate the per-SSID test cases.

        Args:
            configs: Test configuration, forwarded unchanged to
                BaseTestClass.__init__.
        """
        # NOTE(review): this calls BaseTestClass.__init__ directly rather than
        # super().__init__(), so any __init__ defined by WifiBaseTest is
        # bypassed -- confirm this is intentional.
        BaseTestClass.__init__(self, configs)
        # Creates the dynamically named test_* attributes and the randomized
        # self.tests list.
        self.generate_interop_tests()
58
59    def randomize_testcases(self):
60        """Randomize the list of hosts and build a random order of tests,
61           based on SSIDs, keeping all the relevant bands of an AP together.
62
63        """
64        temp_tests = list()
65        hosts = self.user_params['interop_host']
66
67        random.shuffle(hosts)
68
69        for host in hosts:
70            ssid_2g = None
71            ssid_5g = None
72            info = dutils.show_device(host)
73
74            # Based on the information in datastore, add to test list if
75            # AP has 2G band.
76            if 'ssid_2g' in info:
77                ssid_2g = info['ssid_2g']
78                temp_tests.append(TEST + ssid_2g)
79
80            # Based on the information in datastore, add to test list if
81            # AP has 5G band.
82            if 'ssid_5g' in info:
83                ssid_5g = info['ssid_5g']
84                temp_tests.append(TEST + ssid_5g)
85
86        self.tests = temp_tests
87
88    def generate_interop_testcase(self, base_test, testcase_name, ssid_dict):
89        """Generates a single test case from the given data.
90
91        Args:
92            base_test: The base test case function to run.
93            testcase_name: The name of the test case.
94            ssid_dict: The information about the network under test.
95        """
96        ssid = testcase_name
97        test_tracker_uuid = ssid_dict[testcase_name]['uuid']
98        hostname = ssid_dict[testcase_name]['host']
99        if not testcase_name.startswith('test_'):
100            testcase_name = 'test_%s' % testcase_name
101        test_case = test_tracker_info(
102            uuid=test_tracker_uuid)(lambda: base_test(ssid, hostname))
103        setattr(self, testcase_name, test_case)
104        self.tests.append(testcase_name)
105
106    def generate_interop_tests(self):
107        for ssid_dict in self.user_params['interop_ssid']:
108            testcase_name = list(ssid_dict)[0]
109            self.generate_interop_testcase(self.interop_base_test,
110                                           testcase_name, ssid_dict)
111        self.randomize_testcases()
112
113    def setup_class(self):
114        super().setup_class()
115        self.dut = self.android_devices[0]
116        self.admin = 'admin' + str(random.randint(1000001, 12345678))
117        wutils.wifi_test_device_init(self.dut)
118        # Set country code explicitly to "US".
119        wutils.set_wifi_country_code(self.dut, wutils.WifiEnums.CountryCode.US)
120
121        asserts.assert_true(
122            self.lock_pcap(),
123            "Could not lock a Packet Capture. Aborting Interop test.")
124
125        wutils.wifi_toggle_state(self.dut, True)
126
127    def lock_pcap(self):
128        """Lock a Packet Capturere to use for the test."""
129
130        # Get list of devices from the datastore.
131        locked_pcap = False
132        devices = dutils.get_devices()
133
134        for device in devices:
135
136            device_name = device['hostname']
137            device_type = device['ap_label']
138            if device_type == 'PCAP' and not device['lock_status']:
139                if dutils.lock_device(device_name, self.admin):
140                    self.pcap_host = device_name
141                    host = device['ip_address']
142                    self.log.info("Locked Packet Capture device: %s" %
143                                  device_name)
144                    locked_pcap = True
145                    break
146                else:
147                    self.log.warning("Failed to lock %s PCAP." % device_name)
148
149        if not locked_pcap:
150            return False
151
152        pcap_config = {'ssh_config': {'user': 'root'}}
153        pcap_config['ssh_config']['host'] = host
154
155        self.pcap = packet_capture.PacketCapture(pcap_config)
156        return True
157
158    def setup_test(self):
159        super().setup_test()
160        self.dut.droid.wakeLockAcquireBright()
161        self.dut.droid.wakeUpNow()
162
163    def on_pass(self, test_name, begin_time):
164        wutils.stop_pcap(self.pcap, self.pcap_procs, True)
165
166    def on_fail(self, test_name, begin_time):
167        # Sleep to ensure all failed packets are captured.
168        time.sleep(5)
169        wutils.stop_pcap(self.pcap, self.pcap_procs, False)
170        super().on_fail(test_name, begin_time)
171
172    def teardown_test(self):
173        super().teardown_test()
174        self.dut.droid.wakeLockRelease()
175        self.dut.droid.goToSleepNow()
176
177    def teardown_class(self):
178        # Unlock the PCAP.
179        if not dutils.unlock_device(self.pcap_host):
180            self.log.warning("Failed to unlock %s PCAP. Check in datastore.")
181
182    """Helper Functions"""
183
184    def scan_and_connect_by_id(self, network, net_id):
185        """Scan for network and connect using network id.
186
187        Args:
188            net_id: Integer specifying the network id of the network.
189
190        """
191        ssid = network[WifiEnums.SSID_KEY]
192        wutils.start_wifi_connection_scan_and_ensure_network_found(
193            self.dut, ssid)
194        wutils.wifi_connect_by_id(self.dut, net_id)
195
196    def run_ping(self, sec):
197        """Run ping for given number of seconds.
198
199        Args:
200            sec: Time in seconds to run teh ping traffic.
201
202        """
203        self.log.info("Finding Gateway...")
204        route_response = self.dut.adb.shell("ip route get 8.8.8.8")
205        gateway_ip = re.search('via (.*) dev', str(route_response)).group(1)
206        self.log.info("Gateway IP = %s" % gateway_ip)
207        self.log.info("Running ping for %d seconds" % sec)
208        result = self.dut.adb.shell("ping -w %d %s" % (sec, gateway_ip),
209                                    timeout=sec + 1)
210        self.log.debug("Ping Result = %s" % result)
211        if "100% packet loss" in result:
212            raise signals.TestFailure("100% packet loss during ping")
213
214    def send_link_probes(self, network):
215        """
216        Send link probes, and verify that the device and AP did not crash.
217        Also verify that at least one link probe succeeded.
218
219        Steps:
220        1. Send a few link probes.
221        2. Ensure that the device and AP did not crash (by checking that the
222           device remains connected to the expected network).
223        """
224        results = wutils.send_link_probes(self.dut, NUM_LINK_PROBES,
225                                          PROBE_DELAY_SEC)
226
227        self.log.info("Link Probe results: %s" % (results, ))
228
229        wifi_info = self.dut.droid.wifiGetConnectionInfo()
230        expected = network[WifiEnums.SSID_KEY]
231        actual = wifi_info[WifiEnums.SSID_KEY]
232        asserts.assert_equal(
233            expected, actual,
234            "Device did not remain connected after sending link probes!")
235
236    def unlock_and_turn_off_ap(self, hostname, rpm_port, rpm_ip):
237        """UNlock the AP in datastore and turn off the AP.
238
239        Args:
240            hostname: Hostname of the AP.
241            rpm_port: Port number on the RPM for the AP.
242            rpm_ip: RPM's IP address.
243
244        """
245        # Un-Lock AP in datastore.
246        self.log.debug("Un-lock AP in datastore")
247        if not dutils.unlock_device(hostname):
248            self.log.warning("Failed to unlock %s AP. Check AP in datastore.")
249        # Turn OFF AP from the RPM port.
250        rutils.turn_off_ap(rpm_port, rpm_ip)
251
252    def run_connect_disconnect(self, network, hostname, rpm_port, rpm_ip,
253                               release_ap):
254        """Run connect/disconnect to a given network in loop.
255
256           Args:
257               network: Dict, network information.
258               hostname: Hostanme of the AP to connect to.
259               rpm_port: Port number on the RPM for the AP.
260               rpm_ip: Port number on the RPM for the AP.
261               release_ap: Flag to determine if we should turn off the AP yet.
262
263           Raises: TestFailure if the network connection fails.
264
265        """
266        for attempt in range(5):
267            try:
268                begin_time = time.time()
269                ssid = network[WifiEnums.SSID_KEY]
270                net_id = self.dut.droid.wifiAddNetwork(network)
271                asserts.assert_true(net_id != -1,
272                                    "Add network %s failed" % network)
273                self.log.info("Connecting to %s" % ssid)
274                self.scan_and_connect_by_id(network, net_id)
275                self.run_ping(10)
276                # TODO(b/133369482): uncomment once bug is resolved
277                # self.send_link_probes(network)
278                wutils.wifi_forget_network(self.dut, ssid)
279                time.sleep(WAIT_BEFORE_CONNECTION)
280            except Exception as e:
281                self.log.error("Connection to %s network failed on the %d "
282                               "attempt with exception %s." %
283                               (ssid, attempt, e))
284                # TODO:(bmahadev) Uncomment after scan issue is fixed.
285                # self.dut.take_bug_report(ssid, begin_time)
286                # self.dut.cat_adb_log(ssid, begin_time)
287                if release_ap:
288                    self.unlock_and_turn_off_ap(hostname, rpm_port, rpm_ip)
289                raise signals.TestFailure("Failed to connect to %s" % ssid)
290
291    def get_band_and_chan(self, ssid):
292        """Get the band and channel information from SSID.
293
294        Args:
295            ssid: SSID of the network.
296
297        """
298        ssid_info = ssid.split('_')
299        self.band = ssid_info[-1]
300        for item in ssid_info:
301            # Skip over the router model part.
302            if 'ch' in item and item != ssid_info[0]:
303                self.chan = re.search(r'(\d+)', item).group(0)
304                return
305        raise signals.TestFailure("Channel information not found in SSID.")
306
307    def interop_base_test(self, ssid, hostname):
308        """Base test for all the connect-disconnect interop tests.
309
310        Args:
311            ssid: string, SSID of the network to connect to.
312            hostname: string, hostname of the AP.
313
314        Steps:
315            1. Lock AP in datstore.
316            2. Turn on AP on the rpm switch.
317            3. Run connect-disconnect in loop.
318            4. Turn off AP on the rpm switch.
319            5. Unlock AP in datastore.
320
321        """
322        network = {}
323        network['password'] = 'password'
324        network['SSID'] = ssid
325        release_ap = False
326        wutils.reset_wifi(self.dut)
327
328        # Lock AP in datastore.
329        self.log.info("Lock AP in datastore")
330
331        ap_info = dutils.show_device(hostname)
332
333        # If AP is locked by a different test admin, then we skip.
334        if ap_info['lock_status'] and ap_info['locked_by'] != self.admin:
335            raise signals.TestSkip("AP %s is locked, skipping test" % hostname)
336
337        if not dutils.lock_device(hostname, self.admin):
338            self.log.warning("Failed to lock %s AP. Unlock AP in datastore"
339                             " and try again.")
340            raise signals.TestFailure("Failed to lock AP")
341
342        band = SINGLE_BAND
343        if ('ssid_2g' in ap_info) and ('ssid_5g' in ap_info):
344            band = DUAL_BAND
345        if (band == SINGLE_BAND) or (band == DUAL_BAND and '5G' in ssid):
346            release_ap = True
347
348        # Get AP RPM attributes and Turn ON AP.
349        rpm_ip = ap_info['rpm_ip']
350        rpm_port = ap_info['rpm_port']
351
352        rutils.turn_on_ap(self.pcap, ssid, rpm_port, rpm_ip=rpm_ip)
353        self.log.info("Finished turning ON AP.")
354        # Experimental. Some APs take upto a min to come online.
355        time.sleep(60)
356
357        self.get_band_and_chan(ssid)
358        self.pcap.configure_monitor_mode(self.band, self.chan)
359        self.pcap_procs = wutils.start_pcap(self.pcap, self.band.lower(),
360                                            self.test_name)
361        self.run_connect_disconnect(network, hostname, rpm_port, rpm_ip,
362                                    release_ap)
363
364        # Un-lock only if it's a single band AP or we are running the last band.
365        if release_ap:
366            self.unlock_and_turn_off_ap(hostname, rpm_port, rpm_ip)
367