1#!/usr/bin/env python3.4 2# 3# Copyright 2017 - Google 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import json 18import queue 19import statistics 20import time 21 22from acts import asserts 23from acts.test_utils.wifi import wifi_test_utils as wutils 24from acts.test_utils.wifi.rtt import rtt_const as rconsts 25 26# arbitrary timeout for events 27EVENT_TIMEOUT = 15 28 29 30def decorate_event(event_name, id): 31 return '%s_%d' % (event_name, id) 32 33 34def wait_for_event(ad, event_name, timeout=EVENT_TIMEOUT): 35 """Wait for the specified event or timeout. 36 37 Args: 38 ad: The android device 39 event_name: The event to wait on 40 timeout: Number of seconds to wait 41 Returns: 42 The event (if available) 43 """ 44 prefix = '' 45 if hasattr(ad, 'pretty_name'): 46 prefix = '[%s] ' % ad.pretty_name 47 try: 48 event = ad.ed.pop_event(event_name, timeout) 49 ad.log.info('%s%s: %s', prefix, event_name, event['data']) 50 return event 51 except queue.Empty: 52 ad.log.info('%sTimed out while waiting for %s', prefix, event_name) 53 asserts.fail(event_name) 54 55 56def fail_on_event(ad, event_name, timeout=EVENT_TIMEOUT): 57 """Wait for a timeout period and looks for the specified event - fails if it 58 is observed. 59 60 Args: 61 ad: The android device 62 event_name: The event to wait for (and fail on its appearance) 63 """ 64 prefix = '' 65 if hasattr(ad, 'pretty_name'): 66 prefix = '[%s] ' % ad.pretty_name 67 try: 68 event = ad.ed.pop_event(event_name, timeout) 69 ad.log.info('%sReceived unwanted %s: %s', prefix, event_name, 70 event['data']) 71 asserts.fail(event_name, extras=event) 72 except queue.Empty: 73 ad.log.info('%s%s not seen (as expected)', prefix, event_name) 74 return 75 76 77def get_rtt_capabilities(ad): 78 """Get the Wi-Fi RTT capabilities from the specified device. The 79 capabilities are a dictionary keyed by rtt_const.CAP_* keys. 80 81 Args: 82 ad: the Android device 83 Returns: the capability dictionary. 84 """ 85 return json.loads(ad.adb.shell('cmd wifirtt get_capabilities')) 86 87 88def config_privilege_override(dut, override_to_no_privilege): 89 """Configure the device to override the permission check and to disallow any 90 privileged RTT operations, e.g. disallow one-sided RTT to Responders (APs) 91 which do not support IEEE 802.11mc. 92 93 Args: 94 dut: Device to configure. 95 override_to_no_privilege: True to indicate no privileged ops, False for 96 default (which will allow privileged ops). 97 """ 98 dut.adb.shell("cmd wifirtt set override_assume_no_privilege %d" % 99 (1 if override_to_no_privilege else 0)) 100 101 102def get_rtt_constrained_results(scanned_networks, support_rtt): 103 """Filter the input list and only return those networks which either support 104 or do not support RTT (IEEE 802.11mc.) 105 106 Args: 107 scanned_networks: A list of networks from scan results. 108 support_rtt: True - only return those APs which support RTT, False - only 109 return those APs which do not support RTT. 110 111 Returns: a sub-set of the scanned_networks per support_rtt constraint. 112 """ 113 matching_networks = [] 114 for network in scanned_networks: 115 if support_rtt: 116 if (rconsts.SCAN_RESULT_KEY_RTT_RESPONDER in network 117 and network[rconsts.SCAN_RESULT_KEY_RTT_RESPONDER]): 118 matching_networks.append(network) 119 else: 120 if (rconsts.SCAN_RESULT_KEY_RTT_RESPONDER not in network 121 or not network[rconsts.SCAN_RESULT_KEY_RTT_RESPONDER]): 122 matching_networks.append(network) 123 124 return matching_networks 125 126 127def scan_networks(dut): 128 """Perform a scan and return scan results. 129 130 Args: 131 dut: Device under test. 132 133 Returns: an array of scan results. 134 """ 135 wutils.start_wifi_connection_scan(dut) 136 return dut.droid.wifiGetScanResults() 137 138 139def scan_with_rtt_support_constraint(dut, support_rtt, repeat=0): 140 """Perform a scan and return scan results of APs: only those that support or 141 do not support RTT (IEEE 802.11mc) - per the support_rtt parameter. 142 143 Args: 144 dut: Device under test. 145 support_rtt: True - only return those APs which support RTT, False - only 146 return those APs which do not support RTT. 147 repeat: Re-scan this many times to find an RTT supporting network. 148 149 Returns: an array of scan results. 150 """ 151 for i in range(repeat + 1): 152 scan_results = scan_networks(dut) 153 aps = get_rtt_constrained_results(scan_results, support_rtt) 154 if len(aps) != 0: 155 return aps 156 157 return [] 158 159 160def select_best_scan_results(scans, select_count, lowest_rssi=-80): 161 """Select the strongest 'select_count' scans in the input list based on 162 highest RSSI. Exclude all very weak signals, even if results in a shorter 163 list. 164 165 Args: 166 scans: List of scan results. 167 select_count: An integer specifying how many scans to return at most. 168 lowest_rssi: The lowest RSSI to accept into the output. 169 Returns: a list of the strongest 'select_count' scan results from the scans 170 list. 171 """ 172 173 def takeRssi(element): 174 return element['level'] 175 176 result = [] 177 scans.sort(key=takeRssi, reverse=True) 178 for scan in scans: 179 if len(result) == select_count: 180 break 181 if scan['level'] < lowest_rssi: 182 break # rest are lower since we're sorted 183 result.append(scan) 184 185 return result 186 187 188def validate_ap_result(scan_result, range_result): 189 """Validate the range results: 190 - Successful if AP (per scan result) support 802.11mc (allowed to fail 191 otherwise) 192 - MAC of result matches the BSSID 193 194 Args: 195 scan_result: Scan result for the AP 196 range_result: Range result returned by the RTT API 197 """ 198 asserts.assert_equal( 199 scan_result[wutils.WifiEnums.BSSID_KEY], 200 range_result[rconsts.EVENT_CB_RANGING_KEY_MAC_AS_STRING_BSSID], 201 'MAC/BSSID mismatch') 202 if (rconsts.SCAN_RESULT_KEY_RTT_RESPONDER in scan_result 203 and scan_result[rconsts.SCAN_RESULT_KEY_RTT_RESPONDER]): 204 asserts.assert_true( 205 range_result[rconsts.EVENT_CB_RANGING_KEY_STATUS] == 206 rconsts.EVENT_CB_RANGING_STATUS_SUCCESS, 207 'Ranging failed for an AP which supports 802.11mc!') 208 209 210def validate_ap_results(scan_results, range_results): 211 """Validate an array of ranging results against the scan results used to 212 trigger the range. The assumption is that the results are returned in the 213 same order as the request (which were the scan results). 214 215 Args: 216 scan_results: Scans results used to trigger the range request 217 range_results: Range results returned by the RTT API 218 """ 219 asserts.assert_equal( 220 len(scan_results), len(range_results), 221 'Mismatch in length of scan results and range results') 222 223 # sort first based on BSSID/MAC 224 scan_results.sort(key=lambda x: x[wutils.WifiEnums.BSSID_KEY]) 225 range_results.sort( 226 key=lambda x: x[rconsts.EVENT_CB_RANGING_KEY_MAC_AS_STRING_BSSID]) 227 228 for i in range(len(scan_results)): 229 validate_ap_result(scan_results[i], range_results[i]) 230 231 232def validate_aware_mac_result(range_result, mac, description): 233 """Validate the range result for an Aware peer specified with a MAC address: 234 - Correct MAC address. 235 236 The MAC addresses may contain ":" (which are ignored for the comparison) and 237 may be in any case (which is ignored for the comparison). 238 239 Args: 240 range_result: Range result returned by the RTT API 241 mac: MAC address of the peer 242 description: Additional content to print on failure 243 """ 244 mac1 = mac.replace(':', '').lower() 245 mac2 = range_result[rconsts.EVENT_CB_RANGING_KEY_MAC_AS_STRING].replace( 246 ':', '').lower() 247 asserts.assert_equal(mac1, mac2, '%s: MAC mismatch' % description) 248 249 250def validate_aware_peer_id_result(range_result, peer_id, description): 251 """Validate the range result for An Aware peer specified with a Peer ID: 252 - Correct Peer ID 253 - MAC address information not available 254 255 Args: 256 range_result: Range result returned by the RTT API 257 peer_id: Peer ID of the peer 258 description: Additional content to print on failure 259 """ 260 asserts.assert_equal(peer_id, 261 range_result[rconsts.EVENT_CB_RANGING_KEY_PEER_ID], 262 '%s: Peer Id mismatch' % description) 263 asserts.assert_false(rconsts.EVENT_CB_RANGING_KEY_MAC in range_result, 264 '%s: MAC Address not empty!' % description) 265 266 267def extract_stats(results, 268 range_reference_mm, 269 range_margin_mm, 270 min_rssi, 271 reference_lci=[], 272 reference_lcr=[], 273 summary_only=False): 274 """Extract statistics from a list of RTT results. Returns a dictionary 275 with results: 276 - num_results (success or fails) 277 - num_success_results 278 - num_no_results (e.g. timeout) 279 - num_failures 280 - num_range_out_of_margin (only for successes) 281 - num_invalid_rssi (only for successes) 282 - distances: extracted list of distances 283 - distance_std_devs: extracted list of distance standard-deviations 284 - rssis: extracted list of RSSI 285 - distance_mean 286 - distance_std_dev (based on distance - ignoring the individual std-devs) 287 - rssi_mean 288 - rssi_std_dev 289 - status_codes 290 - lcis: extracted list of all of the individual LCI 291 - lcrs: extracted list of all of the individual LCR 292 - any_lci_mismatch: True/False - checks if all LCI results are identical to 293 the reference LCI. 294 - any_lcr_mismatch: True/False - checks if all LCR results are identical to 295 the reference LCR. 296 - num_attempted_measurements: extracted list of all of the individual 297 number of attempted measurements. 298 - num_successful_measurements: extracted list of all of the individual 299 number of successful measurements. 300 - invalid_num_attempted: True/False - checks if number of attempted 301 measurements is non-zero for successful results. 302 - invalid_num_successful: True/False - checks if number of successful 303 measurements is non-zero for successful results. 304 305 Args: 306 results: List of RTT results. 307 range_reference_mm: Reference value for the distance (in mm) 308 range_margin_mm: Acceptable absolute margin for distance (in mm) 309 min_rssi: Acceptable minimum RSSI value. 310 reference_lci, reference_lcr: Reference values for LCI and LCR. 311 summary_only: Only include summary keys (reduce size). 312 313 Returns: A dictionary of stats. 314 """ 315 stats = {} 316 stats['num_results'] = 0 317 stats['num_success_results'] = 0 318 stats['num_no_results'] = 0 319 stats['num_failures'] = 0 320 stats['num_range_out_of_margin'] = 0 321 stats['num_invalid_rssi'] = 0 322 stats['any_lci_mismatch'] = False 323 stats['any_lcr_mismatch'] = False 324 stats['invalid_num_attempted'] = False 325 stats['invalid_num_successful'] = False 326 327 range_max_mm = range_reference_mm + range_margin_mm 328 range_min_mm = range_reference_mm - range_margin_mm 329 330 distances = [] 331 distance_std_devs = [] 332 rssis = [] 333 num_attempted_measurements = [] 334 num_successful_measurements = [] 335 status_codes = [] 336 lcis = [] 337 lcrs = [] 338 339 for i in range(len(results)): 340 result = results[i] 341 342 if result is None: # None -> timeout waiting for RTT result 343 stats['num_no_results'] = stats['num_no_results'] + 1 344 continue 345 stats['num_results'] = stats['num_results'] + 1 346 347 status_codes.append(result[rconsts.EVENT_CB_RANGING_KEY_STATUS]) 348 if status_codes[-1] != rconsts.EVENT_CB_RANGING_STATUS_SUCCESS: 349 stats['num_failures'] = stats['num_failures'] + 1 350 continue 351 stats['num_success_results'] = stats['num_success_results'] + 1 352 353 distance_mm = result[rconsts.EVENT_CB_RANGING_KEY_DISTANCE_MM] 354 distances.append(distance_mm) 355 if not range_min_mm <= distance_mm <= range_max_mm: 356 stats[ 357 'num_range_out_of_margin'] = stats['num_range_out_of_margin'] + 1 358 distance_std_devs.append( 359 result[rconsts.EVENT_CB_RANGING_KEY_DISTANCE_STD_DEV_MM]) 360 361 rssi = result[rconsts.EVENT_CB_RANGING_KEY_RSSI] 362 rssis.append(rssi) 363 if not min_rssi <= rssi <= 0: 364 stats['num_invalid_rssi'] = stats['num_invalid_rssi'] + 1 365 366 num_attempted = result[ 367 rconsts.EVENT_CB_RANGING_KEY_NUM_ATTEMPTED_MEASUREMENTS] 368 num_attempted_measurements.append(num_attempted) 369 if num_attempted == 0: 370 stats['invalid_num_attempted'] = True 371 372 num_successful = result[ 373 rconsts.EVENT_CB_RANGING_KEY_NUM_SUCCESSFUL_MEASUREMENTS] 374 num_successful_measurements.append(num_successful) 375 if num_successful == 0: 376 stats['invalid_num_successful'] = True 377 378 lcis.append(result[rconsts.EVENT_CB_RANGING_KEY_LCI]) 379 if (result[rconsts.EVENT_CB_RANGING_KEY_LCI] != reference_lci): 380 stats['any_lci_mismatch'] = True 381 lcrs.append(result[rconsts.EVENT_CB_RANGING_KEY_LCR]) 382 if (result[rconsts.EVENT_CB_RANGING_KEY_LCR] != reference_lcr): 383 stats['any_lcr_mismatch'] = True 384 385 if len(distances) > 0: 386 stats['distance_mean'] = statistics.mean(distances) 387 if len(distances) > 1: 388 stats['distance_std_dev'] = statistics.stdev(distances) 389 if len(rssis) > 0: 390 stats['rssi_mean'] = statistics.mean(rssis) 391 if len(rssis) > 1: 392 stats['rssi_std_dev'] = statistics.stdev(rssis) 393 if not summary_only: 394 stats['distances'] = distances 395 stats['distance_std_devs'] = distance_std_devs 396 stats['rssis'] = rssis 397 stats['num_attempted_measurements'] = num_attempted_measurements 398 stats['num_successful_measurements'] = num_successful_measurements 399 stats['status_codes'] = status_codes 400 stats['lcis'] = lcis 401 stats['lcrs'] = lcrs 402 403 return stats 404 405 406def run_ranging(dut, 407 aps, 408 iter_count, 409 time_between_iterations, 410 target_run_time_sec=0): 411 """Executing ranging to the set of APs. 412 413 Will execute a minimum of 'iter_count' iterations. Will continue to run 414 until execution time (just) exceeds 'target_run_time_sec'. 415 416 Args: 417 dut: Device under test 418 aps: A list of APs (Access Points) to range to. 419 iter_count: (Minimum) Number of measurements to perform. 420 time_between_iterations: Number of seconds to wait between iterations. 421 target_run_time_sec: The target run time in seconds. 422 423 Returns: a list of the events containing the RTT results (or None for a 424 failed measurement). 425 """ 426 max_peers = dut.droid.wifiRttMaxPeersInRequest() 427 428 asserts.assert_true(len(aps) > 0, "Need at least one AP!") 429 if len(aps) > max_peers: 430 aps = aps[0:max_peers] 431 432 events = {} # need to keep track per BSSID! 433 for ap in aps: 434 events[ap["BSSID"]] = [] 435 436 start_clock = time.time() 437 iterations_done = 0 438 run_time = 0 439 while iterations_done < iter_count or (target_run_time_sec != 0 440 and run_time < target_run_time_sec): 441 if iterations_done != 0 and time_between_iterations != 0: 442 time.sleep(time_between_iterations) 443 444 id = dut.droid.wifiRttStartRangingToAccessPoints(aps) 445 try: 446 event = dut.ed.pop_event( 447 decorate_event(rconsts.EVENT_CB_RANGING_ON_RESULT, id), 448 EVENT_TIMEOUT) 449 range_results = event["data"][rconsts.EVENT_CB_RANGING_KEY_RESULTS] 450 asserts.assert_equal( 451 len(aps), len(range_results), 452 'Mismatch in length of scan results and range results') 453 for result in range_results: 454 bssid = result[rconsts.EVENT_CB_RANGING_KEY_MAC_AS_STRING] 455 asserts.assert_true( 456 bssid in events, 457 "Result BSSID %s not in requested AP!?" % bssid) 458 asserts.assert_equal( 459 len(events[bssid]), iterations_done, 460 "Duplicate results for BSSID %s!?" % bssid) 461 events[bssid].append(result) 462 except queue.Empty: 463 for ap in aps: 464 events[ap["BSSID"]].append(None) 465 466 iterations_done = iterations_done + 1 467 run_time = time.time() - start_clock 468 469 return events 470 471 472def analyze_results(all_aps_events, 473 rtt_reference_distance_mm, 474 distance_margin_mm, 475 min_expected_rssi, 476 lci_reference, 477 lcr_reference, 478 summary_only=False): 479 """Verifies the results of the RTT experiment. 480 481 Args: 482 all_aps_events: Dictionary of APs, each a list of RTT result events. 483 rtt_reference_distance_mm: Expected distance to the AP (source of truth). 484 distance_margin_mm: Accepted error marging in distance measurement. 485 min_expected_rssi: Minimum acceptable RSSI value 486 lci_reference, lcr_reference: Expected LCI/LCR values (arrays of bytes). 487 summary_only: Only include summary keys (reduce size). 488 """ 489 all_stats = {} 490 for bssid, events in all_aps_events.items(): 491 stats = extract_stats(events, rtt_reference_distance_mm, 492 distance_margin_mm, min_expected_rssi, 493 lci_reference, lcr_reference, summary_only) 494 all_stats[bssid] = stats 495 return all_stats 496