#!/usr/bin/python3.4
#
# Copyright 2018 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import queue
import time

from acts import asserts
from acts.test_utils.wifi.aware import aware_const as aconsts
from acts.test_utils.wifi.aware import aware_test_utils as autils
from acts.test_utils.wifi.aware.AwareBaseTest import AwareBaseTest
from acts.test_utils.wifi.rtt import rtt_const as rconsts
from acts.test_utils.wifi.rtt import rtt_test_utils as rutils
from acts.test_utils.wifi.rtt.RttBaseTest import RttBaseTest


class StressRangeAwareTest(AwareBaseTest, RttBaseTest):
    """Test class for stress testing of RTT ranging to Wi-Fi Aware peers."""
    SERVICE_NAME = "GoogleTestServiceXY"

    def __init__(self, controllers):
        # Both parents are initialized explicitly (rather than through
        # super()) so that each one receives the controllers.
        AwareBaseTest.__init__(self, controllers)
        RttBaseTest.__init__(self, controllers)

    def setup_test(self):
        """Manual setup here due to multiple inheritance: explicitly execute
        the setup method from both parents."""
        AwareBaseTest.setup_test(self)
        RttBaseTest.setup_test(self)

    def teardown_test(self):
        """Manual teardown here due to multiple inheritance: explicitly
        execute the teardown method from both parents."""
        AwareBaseTest.teardown_test(self)
        RttBaseTest.teardown_test(self)

    ##########################################################################

    def run_rtt_discovery(self, init_dut, resp_mac=None, resp_peer_id=None):
        """Perform single RTT measurement, using Aware, from the Initiator DUT
        to a Responder. The RTT Responder can be specified using its MAC
        address (obtained using out-of-band discovery) or its Peer ID (using
        Aware discovery). If both are given, the MAC address is used.

        Args:
          init_dut: RTT Initiator device
          resp_mac: MAC address of the RTT Responder device
          resp_peer_id: Peer ID of the RTT Responder device

        Returns:
          The first entry of the results list carried by the ranging result
          event, or None if no such event arrived within
          rutils.EVENT_TIMEOUT.
        """
        asserts.assert_true(
            resp_mac is not None or resp_peer_id is not None,
            "One of the Responder specifications (MAC or Peer ID)"
            " must be provided!")

        use_mac = resp_mac is not None
        if use_mac:
            ranging_id = init_dut.droid.wifiRttStartRangingToAwarePeerMac(
                resp_mac)
        else:
            ranging_id = init_dut.droid.wifiRttStartRangingToAwarePeerId(
                resp_peer_id)

        # Only the wait for the event is guarded: queue.Empty here means the
        # ranging result timed out, which is reported to the caller as None.
        try:
            event = init_dut.ed.pop_event(
                rutils.decorate_event(rconsts.EVENT_CB_RANGING_ON_RESULT,
                                      ranging_id), rutils.EVENT_TIMEOUT)
        except queue.Empty:
            return None

        result = event["data"][rconsts.EVENT_CB_RANGING_KEY_RESULTS][0]
        if use_mac:
            rutils.validate_aware_mac_result(result, resp_mac, "DUT")
        else:
            rutils.validate_aware_peer_id_result(result, resp_peer_id, "DUT")
        return result

    def test_stress_rtt_ib_discovery_set(self):
        """Perform a set of RTT measurements, using in-band (Aware) discovery,
        and switching Initiator and Responder roles repeatedly.

        Stress test: repeat ranging operations. Verify rate of success and
        stability of results.
        """
        p_dut = self.android_devices[0]
        s_dut = self.android_devices[1]

        # Only the peer IDs are needed for ranging; the session and discovery
        # IDs returned alongside them are not used by this test.
        (_, _, _, _, peer_id_on_sub,
         peer_id_on_pub) = autils.create_discovery_pair(
             p_dut,
             s_dut,
             p_config=autils.add_ranging_to_pub(
                 autils.create_discovery_config(
                     self.SERVICE_NAME, aconsts.PUBLISH_TYPE_UNSOLICITED),
                 True),
             s_config=autils.add_ranging_to_pub(
                 autils.create_discovery_config(
                     self.SERVICE_NAME, aconsts.SUBSCRIBE_TYPE_PASSIVE),
                 True),
             device_startup_offset=self.device_startup_offset,
             msg_id=self.get_next_msg_id())

        results = []
        start_clock = time.time()
        iterations_done = 0
        run_time = 0
        # Keep going until the minimum iteration count is reached and, when a
        # target run time is configured (non-zero), until it has elapsed too.
        while iterations_done < self.stress_test_min_iteration_count or (
                self.stress_test_target_run_time_sec != 0
                and run_time < self.stress_test_target_run_time_sec):
            # One iteration ranges in both directions: publisher as the
            # Initiator, then subscriber as the Initiator.
            results.append(
                self.run_rtt_discovery(p_dut, resp_peer_id=peer_id_on_pub))
            results.append(
                self.run_rtt_discovery(s_dut, resp_peer_id=peer_id_on_sub))

            iterations_done += 1
            run_time = time.time() - start_clock

        stats = rutils.extract_stats(
            results,
            self.rtt_reference_distance_mm,
            self.rtt_reference_distance_margin_mm,
            self.rtt_min_expected_rssi_dbm,
            summary_only=True)
        self.log.debug("Stats: %s", stats)
        asserts.assert_true(
            stats['num_no_results'] == 0,
            "Missing (timed-out) results",
            extras=stats)
        asserts.assert_false(
            stats['any_lci_mismatch'], "LCI mismatch", extras=stats)
        asserts.assert_false(
            stats['any_lcr_mismatch'], "LCR mismatch", extras=stats)
        asserts.assert_equal(
            stats['num_invalid_rssi'], 0, "Invalid RSSI", extras=stats)
        asserts.assert_true(
            stats['num_failures'] <=
            self.rtt_max_failure_rate_two_sided_rtt_percentage *
            stats['num_results'] / 100,
            "Failure rate is too high",
            extras=stats)
        asserts.assert_true(
            stats['num_range_out_of_margin'] <=
            self.rtt_max_margin_exceeded_rate_two_sided_rtt_percentage *
            stats['num_success_results'] / 100,
            "Results exceeding error margin rate is too high",
            extras=stats)