• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3.4
2#
3# Copyright (C) 2016 The Android Open Source Project
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may not
6# use this file except in compliance with the License. You may obtain a copy of
7# the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations under
15# the License.
16
17import pprint
18import queue
19
20import acts.base_test
21import acts.test_utils.wifi.wifi_test_utils as wutils
22import acts.utils
23from acts import asserts
24from acts.controllers.android import SL4AAPIError
25
26WifiEnums = wutils.WifiEnums
27
28# Macros for RttParam keywords
29RttParam = WifiEnums.RttParam
30# Macros for RttManager
31Rtt = WifiEnums.Rtt
32RttBW = WifiEnums.RttBW
33RttPreamble = WifiEnums.RttPreamble
34RttPeerType = WifiEnums.RttPeerType
35RttType = WifiEnums.RttType
36
37ScanResult = WifiEnums.ScanResult
38RTT_MARGIN_OF_ERROR = WifiEnums.RTT_MARGIN_OF_ERROR
39
40class WifiRTTRangingError (Exception):
41     """Error in WifiScanner Rtt."""
42
43class WifiRttManagerTest(acts.base_test.BaseTestClass):
44    """Tests for wifi's RttManager APIs."""
45    tests = None
46    MAX_RTT_AP = 10
47
48    def __init__(self, controllers):
49        acts.base_test.BaseTestClass.__init__(self, controllers)
50        self.tests = (
51            "test_support_check",
52            "test_invalid_params",
53            "test_capability_check",
54            "test_rtt_ranging_single_AP_stress",
55            "test_regular_scan_then_rtt_ranging_stress",
56            "test_gscan_then_rtt_ranging_stress"
57        )
58
59    def setup_class(self):
60        self.dut = self.android_devices[0]
61        wutils.wifi_test_device_init(self.dut)
62        required_params = (
63            "support_models",
64            "stress_num",
65            "vht80_5g",
66            "actual_distance"
67        )
68        self.unpack_userparams(required_params)
69        asserts.assert_true(self.actual_distance >= 5,
70            "Actual distance should be no shorter than 5 meters.")
71        self.visible_networks = (
72            self.vht80_5g,
73        )
74        self.default_rtt_params = {
75            RttParam.request_type: RttType.TYPE_TWO_SIDED,
76            RttParam.device_type: RttPeerType.PEER_TYPE_AP,
77            RttParam.preamble: RttPreamble.PREAMBLE_HT,
78            RttParam.bandwidth: RttBW.BW_80_SUPPORT
79        }
80        # Expected capability for devices that don't support RTT.
81        rtt_cap_neg = {
82            'lcrSupported': False,
83            'bwSupported': 0,
84            'twoSided11McRttSupported': False,
85            'preambleSupported': 0,
86            'oneSidedRttSupported': False,
87            'lciSupported': False
88        }
89        rtt_cap_shamu = {
90            'lcrSupported': False,
91            'bwSupported': 0x1C,
92            'twoSided11McRttSupported': True,
93            'preambleSupported': 6,
94            'oneSidedRttSupported': False,
95            'lciSupported': False
96        }
97        rtt_cap_bullhead = {
98            'lcrSupported': True,
99            'bwSupported': 0x1C,
100            'twoSided11McRttSupported': True,
101            'preambleSupported': 7,
102            'oneSidedRttSupported': True,
103            'lciSupported': True
104        }
105        rtt_cap_angler = {
106            'lcrSupported': True,
107            'bwSupported': 0x1C,
108            'twoSided11McRttSupported': True,
109            'preambleSupported': 6,
110            'oneSidedRttSupported': False,
111            'lciSupported': True
112        }
113        self.rtt_cap_table = {
114            "hammerhead": rtt_cap_neg,
115            "shamu": rtt_cap_shamu,
116            "volantis": rtt_cap_neg,
117            "volantisg": rtt_cap_neg,
118            "bullhead": rtt_cap_bullhead,
119            "angler": rtt_cap_angler
120        }
121
122    """Helper Functions"""
123    def invalid_params_logic(self, rtt_params):
124        try:
125            self.dut.droid.wifiRttStartRanging([rtt_params])
126        except SL4AAPIError as e:
127            e_str = str(e)
128            asserts.assert_true("IllegalArgumentException" in e_str,
129                "Missing IllegalArgumentException in %s." % e_str)
130            msg = "Got expected exception with invalid param %s." % rtt_params
131            self.log.info(msg)
132
133    def get_rtt_results(self, rtt_params):
134        """Starts RTT ranging and get results.
135
136        Args:
137            rtt_params: A list of dicts each representing an RttParam.
138
139        Returns:
140            Rtt ranging results.
141        """
142        self.log.debug("Start ranging with:\n%s" % pprint.pformat(rtt_params))
143        idx = self.dut.droid.wifiRttStartRanging(rtt_params)
144        event = None
145        try:
146            event = self.dut.ed.pop_events("WifiRttRanging%d" % idx, 30)
147            if event[0]["name"].endswith("onSuccess"):
148                results = event[0]["data"]["Results"]
149                result_len = len(results)
150                param_len = len(rtt_params)
151                asserts.assert_true(result_len == param_len,
152                    "Expected %d results, got %d." % (param_len, result_len))
153                # Add acceptable margin of error to results, which will be used
154                # during result processing.
155                for i, r in enumerate(results):
156                    bw_mode = rtt_params[i][RttParam.bandwidth]
157                    r[RttParam.margin] = RTT_MARGIN_OF_ERROR[bw_mode]
158            self.log.debug(pprint.pformat(event))
159            return event
160        except queue.Empty:
161            self.log.error("Waiting for RTT event timed out.")
162            return None
163
164    def network_selector(self, network_info):
165        """Decides if a network should be used for rtt ranging.
166
167        There are a few conditions:
168        1. This network supports 80211mc.
169        2. This network's info matches certain conditions.
170
171        This is added to better control which networks to range against instead
172        of blindly use all 80211mc networks in air.
173
174        Args:
175            network_info: A dict representing a WiFi network.
176
177        Returns:
178            True if the input network should be used for ranging, False
179            otherwise.
180        """
181        target_params = {
182            "is80211McRTTResponder": True,
183            WifiEnums.BSSID_KEY: self.vht80_5g[WifiEnums.BSSID_KEY],
184        }
185        for k, v in target_params.items():
186            if k not in network_info:
187                return False
188            if type(network_info[k]) is str:
189                network_info[k] = network_info[k].lower()
190                v = v.lower()
191            if network_info[k] != v:
192                return False
193        return True
194
195    def regular_scan_for_rtt_networks(self):
196        """Scans for 11mc-capable WiFi networks using regular wifi scan.
197
198        Networks are selected based on self.network_selector.
199
200        Returns:
201            A list of networks that have RTTResponders.
202        """
203        wutils.start_wifi_connection_scan(self.dut)
204        networks = self.dut.droid.wifiGetScanResults()
205        rtt_networks = []
206        for nw in networks:
207            if self.network_selector(nw):
208                rtt_networks.append(nw)
209        return rtt_networks
210
211    def gscan_for_rtt_networks(self):
212        """Scans for 11mc-capable WiFi networks using wifi gscan.
213
214        Networks are selected based on self.network_selector.
215
216        Returns:
217            A list of networks that have RTTResponders.
218        """
219        s = {
220            "reportEvents" : WifiEnums.REPORT_EVENT_FULL_SCAN_RESULT,
221            "band": WifiEnums.WIFI_BAND_BOTH,
222            "periodInMs": 10000,
223            "numBssidsPerScan": 32
224        }
225        idx = wutils.start_wifi_single_scan(self.android_devices[0], s)["Index"]
226        self.log.info("Scan index is %d" % idx)
227        event_name = "WifiScannerScan%donFullResult" % idx
228        def condition(event):
229            nw = event["data"]["Results"][0]
230            return self.network_selector(nw)
231        rtt_networks = []
232        try:
233            for i in range(len(self.visible_networks)):
234                event = self.dut.ed.wait_for_event(event_name, condition, 30)
235                rtt_networks.append(event["data"]["Results"][0])
236            self.log.info("Waiting for gscan to finish.")
237            event_name = "WifiScannerScan%donResults" % idx
238            event = self.dut.ed.pop_event(event_name, 300)
239            total_network_cnt = len(event["data"]["Results"][0]["ScanResults"])
240            self.log.info("Found %d networks in total." % total_network_cnt)
241            self.log.debug(rtt_networks)
242            return rtt_networks
243        except queue.Empty:
244            self.log.error("Timed out waiting for gscan result.")
245
246    def process_rtt_events(self, events):
247        """Processes rtt ranging events.
248
249        Validates RTT event types.
250        Validates RTT response status and measured RTT values.
251        Enforces success rate.
252
253        Args:
254            events: A list of callback results from RTT ranging.
255        """
256        total = aborted = failure = invalid = out_of_range = 0
257        for e in events:
258            if e["name"].endswith("onAborted"):
259                aborted += 1
260            if e["name"].endswith("onFailure"):
261                failure += 1
262            if e["name"].endswith("onSuccess"):
263                results = e["data"]["Results"]
264                for r in results:
265                    total += 1
266                    # Status needs to be "success".
267                    status = r["status"]
268                    if status != Rtt.STATUS_SUCCESS:
269                        self.log.warning("Got error status %d." % status)
270                        invalid += 1
271                        continue
272                    # RTT value should be positive.
273                    value = r["rtt"]
274                    if value <= 0:
275                        self.log.warning("Got error RTT value %d." % value)
276                        invalid += 1
277                        continue
278                    # Vadlidate values in successful responses.
279                    acd = self.actual_distance
280                    margin = r[RttParam.margin]
281                    # If the distance is >= 0, check distance only.
282                    d = r["distance"] / 100.0
283                    if d > 0:
284                        # Distance should be in acceptable range.
285                        is_d_valid = (acd - margin) <= d <= acd + (margin)
286                        if not is_d_valid:
287                            self.log.warning(("Reported distance %.2fm is out of the"
288                                " acceptable range %.2f±%.2fm.") % (d, acd, margin))
289                            out_of_range += 1
290                        continue
291                    # Check if the RTT value is in range.
292                    d = (value / 2) / 1E10 * wutils.SPEED_OF_LIGHT
293                    is_rtt_valid = (acd - margin) <= d <= (acd + margin)
294                    if not is_rtt_valid:
295                        self.log.warning(
296                           ("Distance calculated from RTT value %d - %.2fm is "
297                            "out of the acceptable range %.2f±%dm.") % (
298                            value, d, acd, margin))
299                        out_of_range += 1
300                        continue
301                    # Check if the RSSI value is in range.
302                    rssi = r["rssi"]
303                    # average rssi in 0.5dB steps, e.g. 143 implies -71.5dB,
304                    # so the valid range is 0 to 200
305                    is_rssi_valid = 0 <= rssi <= 200
306                    if not is_rssi_valid:
307                        self.log.warning(("Reported RSSI %d is out of the"
308                                " acceptable range 0-200") % rssi)
309                        out_of_range += 1
310                        continue
311        self.log.info(("Processed %d RTT events. %d aborted, %s failed. Among"
312            " the %d responses in successful callbacks, %s are invalid, %s has"
313            " RTT values that are out of range.") % (
314            len(events), aborted, failure, total, invalid, out_of_range))
315        asserts.assert_true(total > 0, "No RTT response received.")
316        # Percentage of responses that are valid should be >= 90%.
317        valid_total = float(total - invalid)
318        valid_response_rate = valid_total / total
319        self.log.info("%.2f%% of the responses are valid." % (
320                      valid_response_rate * 100))
321        asserts.assert_true(valid_response_rate >= 0.9,
322                         "Valid response rate is below 90%%.")
323        # Among the valid responses, the percentage of having an in-range RTT
324        # value should be >= 67%.
325        valid_value_rate = (total - invalid - out_of_range) / valid_total
326        self.log.info("%.2f%% of valid responses have in-range RTT value" % (
327                      valid_value_rate * 100))
328        msg = "In-range response rate is below 67%%."
329        asserts.assert_true(valid_value_rate >= 0.67, msg)
330
331    def scan_then_rtt_ranging_stress_logic(self, scan_func):
332        """Test logic to scan then do rtt ranging based on the scan results.
333
334        Steps:
335        1. Start scan and get scan results.
336        2. Filter out the networks that support rtt in scan results.
337        3. Start rtt ranging against those networks that support rtt.
338        4. Repeat
339        5. Process RTT events.
340
341        Args:
342            scan_func: A function that does a wifi scan and only returns the
343                networks that support rtt in the scan results.
344
345        Returns:
346            True if rtt behaves as expected, False otherwise.
347        """
348        total = self.stress_num
349        failed = 0
350        all_results = []
351        for i in range(total):
352            self.log.info("Iteration %d" % i)
353            rtt_networks = scan_func()
354            if not rtt_networks:
355                self.log.warning("Found no rtt network, skip this iteration.")
356                failed += 1
357                continue
358            self.log.debug("Found rtt networks:%s" % rtt_networks)
359            rtt_params = []
360            for rn in rtt_networks:
361                rtt_params.append(self.rtt_config_from_scan_result(rn))
362            results = self.get_rtt_results(rtt_params)
363            if results:
364                self.log.debug(results)
365                all_results += results
366        self.process_rtt_events(all_results)
367
368    def rtt_config_from_scan_result(self, scan_result):
369        """Creates an Rtt configuration based on the scan result of a network.
370        """
371        scan_result_channel_width_to_rtt = {
372            ScanResult.CHANNEL_WIDTH_20MHZ: RttBW.BW_20_SUPPORT,
373            ScanResult.CHANNEL_WIDTH_40MHZ: RttBW.BW_40_SUPPORT,
374            ScanResult.CHANNEL_WIDTH_80MHZ: RttBW.BW_80_SUPPORT,
375            ScanResult.CHANNEL_WIDTH_160MHZ: RttBW.BW_160_SUPPORT,
376            ScanResult.CHANNEL_WIDTH_80MHZ_PLUS_MHZ: RttBW.BW_160_SUPPORT
377        }
378        p = {}
379        freq = scan_result[RttParam.frequency]
380        p[RttParam.frequency] = freq
381        p[RttParam.BSSID] = scan_result[WifiEnums.BSSID_KEY]
382        if freq > 5000:
383            p[RttParam.preamble] = RttPreamble.PREAMBLE_VHT
384        else:
385            p[RttParam.preamble] = RttPreamble.PREAMBLE_HT
386        cf0 = scan_result[RttParam.center_freq0]
387        if cf0 > 0:
388            p[RttParam.center_freq0] = cf0
389        cf1 = scan_result[RttParam.center_freq1]
390        if cf1 > 0:
391            p[RttParam.center_freq1] = cf1
392        cw = scan_result["channelWidth"]
393        p[RttParam.channel_width] = cw
394        p[RttParam.bandwidth] = scan_result_channel_width_to_rtt[cw]
395        if scan_result["is80211McRTTResponder"]:
396            p[RttParam.request_type] = RttType.TYPE_TWO_SIDED
397        else:
398            p[RttParam.request_type] = RttType.TYPE_ONE_SIDED
399        return p
400
401    """Tests"""
402    def test_invalid_params(self):
403        """Tests the sanity check function in RttManager.
404        """
405        param_list = [
406            {RttParam.device_type: 3},
407            {RttParam.device_type: 1, RttParam.request_type:3},
408            {
409                RttParam.device_type: 1,
410                RttParam.request_type:1,
411                RttParam.BSSID: None
412            },
413            {RttParam.BSSID: "xxxxxxxx", RttParam.number_burst: 1},
414            {RttParam.number_burst: 0, RttParam.num_samples_per_burst: -1},
415            {RttParam.num_samples_per_burst:32},
416            {
417                RttParam.num_samples_per_burst:5,
418                RttParam.num_retries_per_measurement_frame: -1
419            },
420            {RttParam.num_retries_per_measurement_frame: 4 },
421            {
422                RttParam.num_retries_per_measurement_frame: 2,
423                RttParam.num_retries_per_FTMR: -1
424            },
425            {RttParam.num_retries_per_FTMR: 4}
426        ]
427        for param in param_list:
428            self.invalid_params_logic(param)
429        return True
430
431    def test_support_check(self):
432        """No device supports device-to-device RTT; only shamu and volantis
433        devices support device-to-ap RTT.
434        """
435        model = acts.utils.trim_model_name(self.dut.model)
436        asserts.assert_true(not self.dut.droid.wifiIsDeviceToDeviceRttSupported(),
437            "Device to device is not supposed to be supported.")
438        if any([model in m for m in self.support_models]):
439            asserts.assert_true(self.dut.droid.wifiIsDeviceToApRttSupported(),
440                "%s should support device-to-ap RTT." % model)
441            self.log.info("%s supports device-to-ap RTT as expected." % model)
442        else:
443            asserts.assert_true(not self.dut.droid.wifiIsDeviceToApRttSupported(),
444                "%s should not support device-to-ap RTT." % model)
445            self.log.info(("%s does not support device-to-ap RTT as expected."
446                ) % model)
447            asserts.abort_class("Device %s does not support RTT, abort." % model)
448        return True
449
450    def test_capability_check(self):
451        """Checks the capabilities params are reported as expected.
452        """
453        caps = self.dut.droid.wifiRttGetCapabilities()
454        asserts.assert_true(caps, "Unable to get rtt capabilities.")
455        self.log.debug("Got rtt capabilities %s" % caps)
456        model = acts.utils.trim_model_name(self.dut.model)
457        asserts.assert_true(model in self.rtt_cap_table, "Unknown model %s" % model)
458        expected_caps = self.rtt_cap_table[model]
459        for k, v in expected_caps.items():
460            asserts.assert_true(k in caps, "%s missing in capabilities." % k)
461            asserts.assert_true(v == caps[k],
462                "Expected %s for %s, got %s." % (v, k, caps[k]))
463        return True
464
465    def test_discovery(self):
466        """Make sure all the expected 11mc BSSIDs are discovered properly, and
467        they are all reported as 802.11mc Rtt Responder.
468
469        Procedures:
470            1. Scan for wifi networks.
471
472        Expect:
473            All the RTT networks show up in scan results and their
474            "is80211McRTTResponder" is True.
475            All the non-RTT networks show up in scan results and their
476            "is80211McRTTResponder" is False.
477        """
478        wutils.start_wifi_connection_scan(self.dut)
479        scan_results = self.dut.droid.wifiGetScanResults()
480        self.log.debug(scan_results)
481        for n in visible_networks:
482            asserts.assert_true(wutils.match_networks(n, scan_results),
483                "Network %s was not discovered properly." % n)
484        return True
485
486    def test_missing_bssid(self):
487        """Start Rtt ranging with a config that does not have BSSID set.
488        Should not get onSuccess.
489        """
490        p = {}
491        p[RttParam.request_type] = RttType.TYPE_TWO_SIDED
492        p[RttParam.device_type]  = RttPeerType.PEER_TYPE_AP
493        p[RttParam.preamble]     = RttPreamble.PREAMBLE_VHT
494        p[RttParam.bandwidth]    = RttBW.BW_80_SUPPORT
495        p[RttParam.frequency] = self.vht80_5g[WifiEnums.frequency_key]
496        p[RttParam.center_freq0] = self.vht80_5g[RttParam.center_freq0]
497        results = self.get_rtt_results([p])
498        asserts.assert_true(results, "Did not get any result.")
499        self.log.info(pprint.pformat(results))
500
501    def test_rtt_ranging_single_AP_stress(self):
502        """Stress test for Rtt against one AP.
503
504        Steps:
505            1. Do RTT ranging against the self.vht80_5g BSSID.
506            2. Repeat self.stress_num times.
507            3. Verify RTT results.
508        """
509        p = {}
510        p[RttParam.request_type] = RttType.TYPE_TWO_SIDED
511        p[RttParam.device_type]  = RttPeerType.PEER_TYPE_AP
512        p[RttParam.preamble]     = RttPreamble.PREAMBLE_VHT
513        p[RttParam.bandwidth]    = RttBW.BW_80_SUPPORT
514        p[RttParam.BSSID] = self.vht80_5g[WifiEnums.BSSID_KEY]
515        p[RttParam.frequency] = self.vht80_5g[WifiEnums.frequency_key]
516        p[RttParam.center_freq0] = self.vht80_5g[RttParam.center_freq0]
517        p[RttParam.channel_width] = ScanResult.CHANNEL_WIDTH_80MHZ
518        all_results = []
519        for i in range(self.stress_num):
520            self.log.info("RTT Ranging iteration %d" % (i + 1))
521            results = self.get_rtt_results([p])
522            if results:
523                all_results += results
524            else:
525                self.log.warning("Did not get result for iteration %d." % i)
526        frate = self.process_rtt_events(all_results)
527
528    def test_regular_scan_then_rtt_ranging_stress(self):
529        """Stress test for regular scan then start rtt ranging against the RTT
530        compatible networks found by the scan.
531
532        Steps:
533            1. Start a WiFi connection scan.
534            2. Get scan results.
535            3. Find all the 11mc capable BSSIDs and choose the ones to use
536               (self.network_selector)
537            4. Do RTT ranging against the selected BSSIDs, with the info from
538               the scan results.
539            5. Repeat self.stress_num times.
540            6. Verify RTT results.
541        """
542        scan_func = self.regular_scan_for_rtt_networks
543        self.scan_then_rtt_ranging_stress_logic(scan_func)
544
545    def test_gscan_then_rtt_ranging_stress(self):
546        """Stress test for gscan then start rtt ranging against the RTT
547        compatible networks found by the scan.
548
549        Steps:
550            1. Start a WifiScanner single shot scan on all channels.
551            2. Wait for full scan results of the expected 11mc capable BSSIDs.
552            3. Wait for single shot scan to finish on all channels.
553            4. Do RTT ranging against the selected BSSIDs, with the info from
554               the scan results.
555            5. Repeat self.stress_num times.
556            6. Verify RTT results.
557        """
558        scan_func = self.gscan_for_rtt_networks
559        self.scan_then_rtt_ranging_stress_logic(scan_func)
560