1#!/usr/bin/env python3.8 2# 3# Copyright 2024 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the 'License'); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an 'AS IS' BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17import collections 18import json 19import logging 20import numpy 21import pandas 22import os 23import re 24import subprocess 25import time 26from acts import asserts 27from acts import base_test 28from acts import context 29from acts import utils 30import acts_contrib.test_utils.bt.bt_test_utils as btutils 31from acts_contrib.test_utils.wifi import wifi_performance_test_utils as wputils 32from acts_contrib.test_utils.wifi import wifi_test_utils as wutils 33 34SHORT_SLEEP = 1 35MED_SLEEP = 5 36 37 38class UwbRangingLink(object): 39 def __init__(self, initiator, responder, uwb_ranging_params): 40 self.initiator_dut = initiator 41 self.responder_dut = responder 42 self.uwb_ranging_params = uwb_ranging_params 43 self.responder_dut.adb.shell('cmd uwb force-country-code enabled US') 44 self.initiator_dut.adb.shell('cmd uwb force-country-code enabled US') 45 self.responder_dut.adb.shell('cmd uwb enable-uwb') 46 self.initiator_dut.adb.shell('cmd uwb enable-uwb') 47 self.ANTENNA_MAPPING = {'ranging': 'none', 'patch': 'azimuth-only'} 48 49 def setup_initiator(self): 50 self.initiator_dut.adb.shell('cmd uwb start-fira-ranging-session ' 51 '-i 1 -c {} -t controller -r initiator -a 11 -d 22 -e {} -j 3600000'.format( 52 self.uwb_ranging_params['channel'], self.ANTENNA_MAPPING[self.uwb_ranging_params['initiator_antenna']])) 53 54 def clean_up_initiator(self): 55 self.initiator_dut.adb.shell('cmd uwb stop-all-ranging-sessions') 56 57 def measure_range(self, duration): 58 self.responder_dut.adb.shell('cmd uwb start-fira-ranging-session ' 59 '-i 1 -c {} -t controlee -r responder -a 22 -d 11 -e {} -j 3600000'.format( 60 self.uwb_ranging_params['channel'], self.ANTENNA_MAPPING[self.uwb_ranging_params['responder_antenna']])) 61 ranging_output = '' 62 for idx in range(int(duration/3)): 63 ranging_output = ranging_output + self.responder_dut.adb.shell('cmd uwb get-ranging-session-reports 1') 64 time.sleep(3) 65 self.responder_dut.adb.shell('cmd uwb stop-all-ranging-sessions') 66 logging.debug(ranging_output) 67 ranging_result = self.parse_ranging_result(ranging_output) 68 return ranging_result 69 70 def parse_ranging_result(self, ranging_output): 71 pattern = r"meters: ([\d.]+).*rssiDbm: (-?\d+)" 72 matches = re.findall(pattern, ranging_output) 73 74 measurements = [] 75 for match in matches: 76 meters, rssiDbm = match 77 measurements.append({"distance": float(meters), "rssi": int(rssiDbm)}) 78 distance_array = [result['distance'] for result in measurements] 79 rssi_array = [result['rssi'] for result in measurements] 80 avg_distance = numpy.mean(distance_array) 81 std_dev_distance = numpy.std(distance_array) 82 avg_rssi = numpy.mean(rssi_array) 83 std_dev_rssi = numpy.std(rssi_array) 84 result = { 85 'raw_results': measurements, 86 'compiled_results': {'distance' : distance_array, 'rssi': rssi_array}, 87 'summary': {'avg_distance': avg_distance, 'avg_rssi': avg_rssi, 88 'std_dev_distance': std_dev_distance, 'std_dev_rssi': std_dev_rssi, 89 } 90 } 91 return result 92 93class BtRangingLinkV2(object): 94 def __init__(self, initiator, responder, bt_ranging_params): 95 # self.dut1 = self.android_devices[0] 96 # self.dut2 = self.android_devices[1] 97 self.initiator = initiator 98 self.reflector = responder 99 self.bt_ranging_params = bt_ranging_params 100 utils.sync_device_time(self.initiator) 101 utils.sync_device_time(self.reflector) 102 self.setup_devices() 103 104 def setup_devices(self): 105 self.reflector.adb.shell("cmd bluetooth_manager disable") 106 time.sleep(MED_SLEEP) 107 try: 108 self.reflector.adb.shell("rm data/misc/bluetooth/logs/cs_log*") 109 except: 110 logging.info('Could not delete CS logs') 111 self.reflector.adb.shell("cmd bluetooth_manager enable") 112 self.setprop_overwrite_default(self.reflector) 113 self.restart_app(self.reflector) 114 time.sleep(SHORT_SLEEP) 115 self.app_click_reflector(self.reflector) 116 117 self.initiator.adb.shell("cmd bluetooth_manager disable") 118 time.sleep(MED_SLEEP) 119 try: 120 self.initiator.adb.shell("rm data/misc/bluetooth/logs/cs_log*") 121 except: 122 logging.info('Could not delete CS logs') 123 self.initiator.adb.shell("cmd bluetooth_manager enable") 124 self.setprop_overwrite_default(self.initiator) 125 def restart_app(self, device): 126 device.ensure_screen_on() 127 device.unlock_screen() 128 device.adb.shell("am force-stop com.android.bluetooth.channelsoundingtestapp") 129 time.sleep(2) 130 device.adb.shell( 131 "am start -n com.android.bluetooth.channelsoundingtestapp/com.android.bluetooth.channelsoundingtestapp.MainActivity") 132 133 def app_click_reflector(self, device): 134 # step 1 135 filename_re = re.compile(r'([^ ]+.xml)') 136 #st = subprocess.check_output('adb -s {0} shell uiautomator dump'.format(device.serial), shell=True).decode('utf-8') 137 st = device.adb.shell('uiautomator dump') 138 filename_device = filename_re.findall(st)[0] 139 device.adb.pull('{} tmp.xml'.format(filename_device)) 140 with open("tmp.xml", 'r') as f: 141 xml = f.read() 142 143 button_reflector = re.compile( 144 r'text="Reflector" resource-id="com.android.bluetooth.channelsoundingtestapp:id/button_reflector" class="android.widget.Button" package="com.android.bluetooth.channelsoundingtestapp" content-desc="" checkable="false" checked="false" clickable="true" enabled="true" focusable="true" focused="false" scrollable="false" long-clickable="false" password="false" selected="false" bounds="\[([0-9]*),([0-9]*)]\[([0-9]*),([0-9]*)\]"' 145 ) 146 button_reflector_cord = self.get_cord(button_reflector, xml) 147 148 print("Push [Reflector] on reflector") 149 self.push_button(device,button_reflector_cord) 150 #device.adb.shell("input tap {} {}".format(button_reflector_cord[0], button_reflector_cord[1])) 151 time.sleep(0.5) 152 153 # step 2 154 filename_re = re.compile(r'([^ ]+.xml)') 155 #st = subprocess.check_output('adb -s {0} shell uiautomator dump'.format(device.serial), shell=True).decode('utf-8') 156 st = device.adb.shell('uiautomator dump') 157 filename_device = filename_re.findall(st)[0] 158 device.adb.pull('{} tmp.xml'.format(filename_device)) 159 with open("tmp.xml", 'r') as f: 160 xml = f.read() 161 162 button_start_advertising = re.compile( 163 r'text="Start Advertising" resource-id="com.android.bluetooth.channelsoundingtestapp:id/btn_advertising" class="android.widget.Button" package="com.android.bluetooth.channelsoundingtestapp" content-desc="" checkable="false" checked="false" clickable="true" enabled="true" focusable="true" focused="false" scrollable="false" long-clickable="false" password="false" selected="false" bounds="\[([0-9]*),([0-9]*)]\[([0-9]*),([0-9]*)\]"' 164 ) 165 button_start_advertising_cord = self.get_cord(button_start_advertising, xml) 166 167 print("Push [Start Advertising] on reflector") 168 self.push_button(device, button_start_advertising_cord) 169 #device.adb.shell("input tap {} {}".format(button_start_advertising_cord[0], 170 # button_start_advertising_cord[1])) 171 time.sleep(0.5) 172 173 174 def app_click_initiator(self, device, duration=10): 175 # step 1 176 filename_re = re.compile(r'([^ ]+.xml)') 177 st = device.adb.shell('uiautomator dump') 178 #st = subprocess.check_output('adb -s {0} shell uiautomator dump'.format(device.serial), shell=True).decode('utf-8') 179 filename_device = filename_re.findall(st)[0] 180 device.adb.pull('{} tmp.xml'.format(filename_device)) 181 with open("tmp.xml", 'r') as f: 182 xml = f.read() 183 184 button_initiator = re.compile( 185 r'text="Initiator" resource-id="com.android.bluetooth.channelsoundingtestapp:id/button_initiator" class="android.widget.Button" package="com.android.bluetooth.channelsoundingtestapp" content-desc="" checkable="false" checked="false" clickable="true" enabled="true" focusable="true" focused="false" scrollable="false" long-clickable="false" password="false" selected="false" bounds="\[([0-9]*),([0-9]*)]\[([0-9]*),([0-9]*)\]"' 186 ) 187 button_initiator_cord = self.get_cord(button_initiator, xml) 188 189 print("Push [Initiator] on initiator") 190 self.push_button(device, button_initiator_cord) 191 #device.adb.shell("input tap {} {}".format(button_initiator_cord[0], button_initiator_cord[1])) 192 time.sleep(0.5) 193 194 # step 2 195 filename_re = re.compile(r'([^ ]+.xml)') 196 st = device.adb.shell('uiautomator dump') 197 #st = subprocess.check_output('adb -s {0} shell uiautomator dump'.format(device.serial), shell=True).decode('utf-8') 198 filename_device = filename_re.findall(st)[0] 199 device.adb.pull('{} tmp.xml'.format(filename_device)) 200 with open("tmp.xml", 'r') as f: 201 xml = f.read() 202 203 button_connect_gatt = re.compile( 204 r'text="Connect Gatt" resource-id="com.android.bluetooth.channelsoundingtestapp:id/btn_connect_gatt" class="android.widget.Button" package="com.android.bluetooth.channelsoundingtestapp" content-desc="" checkable="false" checked="false" clickable="true" enabled="true" focusable="true" focused="false" scrollable="false" long-clickable="false" password="false" selected="false" bounds="\[([0-9]*),([0-9]*)]\[([0-9]*),([0-9]*)\]"' 205 ) 206 button_connect_gatt_cord = self.get_cord(button_connect_gatt, xml) 207 208 button_start_distance_measurement = re.compile( 209 r'text="Start Distance Measurement" resource-id="com.android.bluetooth.channelsoundingtestapp:id/btn_cs" class="android.widget.Button" package="com.android.bluetooth.channelsoundingtestapp" content-desc="" checkable="false" checked="false" clickable="true" enabled="true" focusable="true" focused="false" scrollable="false" long-clickable="false" password="false" selected="false" bounds="\[([0-9]*),([0-9]*)]\[([0-9]*),([0-9]*)\]"' 210 ) 211 button_start_distance_measurement_cord = self.get_cord(button_start_distance_measurement, xml) 212 213 print("Push [Connect] on initiator") 214 self.push_button(device, button_connect_gatt_cord) 215 #device.adb.shell( 216 # "input tap {} {}".format(button_connect_gatt_cord[0], button_connect_gatt_cord[1])) 217 time.sleep(MED_SLEEP) 218 219 print("Push [Start Distance Measurement] on initiator") 220 self.push_button(device, button_start_distance_measurement_cord) 221 #device.adb.shell("input tap {} {}".format(button_start_distance_measurement_cord[0], 222 # button_start_distance_measurement_cord[1])) 223 time.sleep(0.5) 224 225 # step 3 226 print('wait for {} seconds'.format(duration)) 227 time.sleep(duration) 228 229 # if want infinite until button press 230 # _ = input("Press Enter to continue...") 231 232 print("Push [Stop Distance Measurement] on initiator") 233 self.push_button(device, button_start_distance_measurement_cord) 234 #device.adb.shell("input tap {} {}".format(button_start_distance_measurement_cord[0], 235 # button_start_distance_measurement_cord[1])) 236 time.sleep(0.5) 237 238 239 def get_cord(self, p, fi): 240 x1, y1, x2, y2 = p.findall(fi)[0] 241 cord = ((int(x1) + int(x2)) // 2, (int(y1) + int(y2)) // 2) 242 return cord 243 244 245 def push_button(self, device, button_cord): 246 device.ensure_screen_on() 247 time.sleep(SHORT_SLEEP) 248 device.adb.shell("input tap {0} {1}".format(button_cord[0], button_cord[1])) 249 250 251 def setprop_and_load_cal(self, device): 252 subprocess.run(["./cs_setprop.sh", device]) 253 time.sleep(SHORT_SLEEP) 254 subprocess.run(["./load_cal_P24_zero.sh", device]) 255 time.sleep(SHORT_SLEEP) 256 257 258 def setprop_overwrite_default(self, device): 259 device.adb.shell('setprop bluetooth.core.cs.channel_map 1FFFFFFFFFFFFC7FFFFC') 260 device.adb.shell('setprop bluetooth.core.cs.max_procedure_count 4') 261 device.adb.shell('setprop bluetooth.core.cs.max_subevent_len 2000000') 262 263 264 def unlock_device(self, device): 265 device.adb.shell("input keyevent 82") 266 267 268 def start_channel_sounding(self, init_device, refl_device): 269 self.app_click_reflector(refl_device) 270 self.app_click_initiator(init_device) 271 272 def parse_bt_cs_results(self, log_path): 273 measurements = [] 274 with open(log_path,'r') as log: 275 for line in log: 276 if 'resultMeters' in line: 277 range = float(line.split(' ')[1].rstrip().rstrip(',')) 278 if range == -1.976171: 279 measurements.append(float('nan')) 280 else: 281 measurements.append(range) 282 avg_distance = numpy.mean(measurements) 283 std_dev_distance = numpy.std(measurements) 284 avg_rssi = 0 285 std_dev_rssi = 0 286 result = { 287 'measurements': measurements, 288 'summary': {'avg_distance': avg_distance, 'avg_rssi': avg_rssi, 289 'std_dev_distance': std_dev_distance, 'std_dev_rssi': std_dev_rssi, 290 } 291 } 292 return result 293 294 def collect_log_and_rename(self, device, test_name): 295 files = device.get_file_names("data/misc/bluetooth/logs") 296 log_folder = context.get_current_context().get_full_output_path() + "/cs_logs" 297 os.system("mkdir {0}".format(log_folder)) 298 for filename in files: 299 if "cs_log" not in filename: 300 continue 301 log_path = context.get_current_context().get_full_output_path() + "/cs_logs/" + test_name + ".txt" 302 device.pull_files(filename, log_path) 303 return log_path 304 def do_channel_sounding(self, test_name="temp", duration=20): 305 self.initiator.adb.shell("cmd bluetooth_manager enable") 306 307 # App setup 308 self.restart_app(self.initiator) 309 time.sleep(SHORT_SLEEP) 310 311 self.app_click_initiator(self.initiator, duration) 312 313 # self.collect_log_and_rename(self.initiator, test_name=test_name) 314 log_filepath = "./{0}".format(test_name) 315 #adb("pull data/misc/bluetooth/logs {0}".format(log_filepath), device.serial) 316 #self.initiator.pull_files('data/misc/bluetooth/logs', log_filepath) 317 318 log_file_path = self.collect_log_and_rename(self.initiator, test_name=test_name) 319 result = self.parse_bt_cs_results(log_file_path) 320 321 self.initiator.adb.shell("cmd bluetooth_manager disable") 322 self.initiator.adb.shell("rm data/misc/bluetooth/logs/cs_log*") 323 return result 324 325class BtRangingLink(object): 326 def __init__(self, initiator, responder, bt_ranging_params): 327 # self.dut1 = self.android_devices[0] 328 # self.dut2 = self.android_devices[1] 329 self.initiator = initiator 330 self.reflector = responder 331 self.bt_ranging_params = bt_ranging_params 332 self.CSParameters = bt_ranging_params['CSParameters'] 333 utils.sync_device_time(self.initiator) 334 utils.sync_device_time(self.reflector) 335 self.setup_devices() 336 337 def setup_devices(self): 338 # CS setprop 339 for dut in [self.initiator, self.reflector]: 340 self.cs_setprop(dut) 341 time.sleep(SHORT_SLEEP) 342 logging.info('Loading BT cal') 343 subprocess.call([self.bt_ranging_params['calibration_file'], dut.serial]) 344 #self.load_cal(dut) 345 time.sleep(SHORT_SLEEP) 346 dut.button_cord = self.app_setup(dut) 347 time.sleep(MED_SLEEP) 348 349 def cs_setprop(self, dut): 350 logging.info("{0} setting CS prop ...".format(dut.serial)) 351 for key in self.CSParameters: 352 dut.adb.shell("setprop bluetooth.core.cs.{0} {1}".format(key, self.CSParameters[key])) 353 354 ori_mask = dut.adb.getprop("persist.bluetooth.bqr.event_mask") 355 new_mask = (int(ori_mask) | 1) 356 dut.adb.shell("setprop persist.bluetooth.bqr.event_mask {0}".format(new_mask)) 357 dut.adb.shell("setprop persist.bluetooth.bqr.min_interval_ms 500") 358 359 dut.adb.shell("touch data/misc/bluetooth/logs/cs_log_tmp.txt") 360 dut.adb.shell("cmd bluetooth_manager disable") 361 dut.adb.shell("cmd bluetooth_manager wait-for-state:STATE_OFF") 362 dut.adb.shell("rm data/misc/bluetooth/logs/cs_log*") 363 dut.adb.shell("cmd bluetooth_manager enable") 364 365 time.sleep(MED_SLEEP) 366 367 def load_cal(self, dut): 368 logging.info("{0} loading calibration file ...".format(dut.serial)) 369 with open(self.bt_ranging_params['calibration_file'], 'r') as f: 370 for line in f: 371 if not line[0] == '#': 372 dut.adb.shell("/vendor/bin/hw/hci_inject -c {0}".format(line)) 373 374 def app_setup(self, dut): 375 logging.info("{0} restarting CS App ...".format(dut.serial)) 376 dut.ensure_screen_on() 377 dut.unlock_screen() 378 self._restart_app(dut) 379 380 button_connect_gatt = re.compile( 381 r'text="Connect Gatt" resource-id="com.example.ble_test:id/btn_connect_gatt" class="android.widget.Button" package="com.example.ble_test" content-desc="" checkable="false" checked="false" clickable="true" enabled="true" focusable="true" focused="false" scrollable="false" long-clickable="false" password="false" selected="false" bounds="\[([0-9]*),([0-9]*)]\[([0-9]*),([0-9]*)\]"' 382 ) 383 button_adv = re.compile( 384 r'text="Start Adv" resource-id="com.example.ble_test:id/btn_adv" class="android.widget.Button" package="com.example.ble_test" content-desc="" checkable="false" checked="false" clickable="true" enabled="true" focusable="true" focused="false" scrollable="false" long-clickable="false" password="false" selected="false" bounds="\[([0-9]*),([0-9]*)]\[([0-9]*),([0-9]*)\]"' 385 ) 386 button_clear_log = re.compile( 387 r'text="Clear log" resource-id="com.example.ble_test:id/btn_clear_log" class="android.widget.Button" package="com.example.ble_test" content-desc="" checkable="false" checked="false" clickable="true" enabled="true" focusable="true" focused="false" scrollable="false" long-clickable="false" password="false" selected="false" bounds="\[([0-9]*),([0-9]*)]\[([0-9]*),([0-9]*)\]"' 388 ) 389 button_cs = re.compile( 390 r'text="Start CS" resource-id="com.example.ble_test:id/btn_cs" class="android.widget.Button" package="com.example.ble_test" content-desc="" checkable="false" checked="false" clickable="true" enabled="true" focusable="true" focused="false" scrollable="false" long-clickable="false" password="false" selected="false" bounds="\[([0-9]*),([0-9]*)]\[([0-9]*),([0-9]*)\]"' 391 ) 392 393 dut_xml = "" 394 filename_re = re.compile(r'([^ ]+.xml)') 395 st = subprocess.check_output('adb -s {0} shell uiautomator dump'.format(dut.serial), shell=True).decode('utf-8') 396 filename_device = filename_re.findall(st)[0] 397 dut.adb.pull("{0} tmp.xml".format(filename_device)) 398 with open("tmp.xml", 'r') as f: 399 dut_xml = f.read() 400 connect_gatt_cord = self._get_cord(button_connect_gatt, dut_xml) 401 adv_cord = self._get_cord(button_adv, dut_xml) 402 clear_log_cord = self._get_cord(button_clear_log, dut_xml) 403 cs_cord = self._get_cord(button_cs, dut_xml) 404 button_cord = {"connect_gatt": connect_gatt_cord, 405 "adv": adv_cord, 406 "clear_log": clear_log_cord, 407 "cs": cs_cord} 408 409 return button_cord 410 411 def collect_log_and_rename(self, device, test_name): 412 files = device.get_file_names("data/misc/bluetooth/logs") 413 log_folder = context.get_current_context().get_full_output_path() + "/cs_logs" 414 os.system("mkdir {0}".format(log_folder)) 415 for filename in files: 416 if "cs_log" not in filename: 417 continue 418 log_path = context.get_current_context().get_full_output_path() + "/cs_logs/" + test_name + ".txt" 419 device.pull_files(filename, log_path) 420 return log_path 421 422 def collect_bt_metric(self, tag): 423 self._get_bt_link_metrics(self.initiator, duration=5, bqr_tag='Monitoring , Handle: 0x0040', tag=tag) 424 self._get_bt_link_metrics(self.reflector, duration=5, bqr_tag='Monitoring , Handle: 0x0040', tag=tag) 425 426 427 def start_advertising(self): 428 # Start CS 429 logging.info("Push [Start Adv] on reflector") 430 self._push_button(self.reflector, self.reflector.button_cord["adv"]) 431 time.sleep(SHORT_SLEEP) 432 433 def parse_bt_cs_results(self, log_path): 434 measurements = [] 435 with open(log_path,'r') as log: 436 for line in log: 437 if 'resultMeters' in line: 438 range = float(line.split(' ')[1].rstrip()) 439 if range == -1.976171: 440 measurements.append(float('nan')) 441 else: 442 measurements.append(range) 443 avg_distance = numpy.mean(measurements) 444 std_dev_distance = numpy.std(measurements) 445 avg_rssi = 0 446 std_dev_rssi = 0 447 result = { 448 'measurements': measurements, 449 'summary': {'avg_distance': avg_distance, 'avg_rssi': avg_rssi, 450 'std_dev_distance': std_dev_distance, 'std_dev_rssi': std_dev_rssi, 451 } 452 } 453 return result 454 def do_channel_sounding(self, test_name="cs_log_temp"): 455 self.cs_setprop(self.initiator) 456 self.initiator.button_cord = self.app_setup(self.initiator) 457 458 logging.info("Push [Connect Gatt] on initiator") 459 self._push_button(self.initiator, self.initiator.button_cord["connect_gatt"]) 460 found = self._wait_for_keyword(self.initiator, "CYDBG: MTU changed to: 517", start_time=utils.get_current_epoch_time(), timeout=10) 461 time.sleep(MED_SLEEP) 462 if not found: 463 return False 464 465 logging.info("Push [Start CS] on initiator") 466 push_button_time = utils.get_current_epoch_time() 467 self._push_button(self.initiator, self.initiator.button_cord["cs"]) 468 469 keyword_finish_procedures = "CYDBG: Add Node {0} with distance".format(self.CSParameters["max_procedure_count"]) 470 timeout = self.CSParameters["max_procedure_count"] * self.CSParameters["min_procedure_interval"] * self.CSParameters["conn_interval"] * 1.25 / 1000 * 1.5 # may need to adjust according to max_procedure_count 471 self._wait_for_keyword(self.initiator, keyword_finish_procedures, push_button_time, timeout) 472 time.sleep(MED_SLEEP) 473 474 log_file_path = self.collect_log_and_rename(self.initiator, test_name=test_name) 475 result = self.parse_bt_cs_results(log_file_path) 476 #self.log.info("Total Test run time: {0} s".format((utils.get_current_epoch_time() - self.begin_time)/1000.0)) 477 return result 478 479 def _restart_app(self, device): 480 device.adb.shell("am force-stop com.example.ble_test") 481 time.sleep(2) 482 device.adb.shell("am start -n com.example.ble_test/com.example.ble_test.MainActivity") 483 484 def _restart_bt(self, device): 485 device.adb.shell("cmd bluetooth_manager disable") 486 device.adb.shell("cmd bluetooth_manager wait-for-state:STATE_OFF") 487 device.adb.shell("cmd bluetooth_manager enable") 488 489 def _get_cord(self, p, fi): 490 x1, y1, x2, y2 = p.findall(fi)[0] 491 cord = ((int(x1) + int(x2)) // 2, (int(y1) + int(y2)) // 2) 492 return cord 493 494 def _wait_for_keyword(self, dut, keyword, start_time, timeout=600): 495 wait = True 496 while wait: 497 if utils.get_current_epoch_time() - start_time > timeout * 1000: 498 logging.info("Wait for {0} timeout".format(keyword)) 499 return False 500 time.sleep(0.5) 501 result = dut.search_logcat(keyword, start_time, utils.get_current_epoch_time()) 502 if len(result) > 0: 503 wait = False 504 logging.info("Found \"{0}\" with wait time {1} s".format(keyword, ( 505 utils.get_current_epoch_time() - start_time) / 1000.0)) 506 return True 507 508 def _push_button(self, dut, button_cord): 509 dut.ensure_screen_on() 510 #dut.unlock_screen() 511 time.sleep(SHORT_SLEEP) 512 dut.adb.shell("input tap {0} {1}".format(button_cord[0], button_cord[1])) 513 514 def _read_distance_from_logcat(self, logcat_filename): 515 distance_readout = [] 516 counter_readout = [] 517 with open(logcat_filename, 'r') as f: 518 for line in f: 519 if "---- Distance" in line: 520 distance_readout.append(float(line.split()[-2])) 521 if "---- End of Procedure complete counter" in line: 522 counter_readout.append(int(line.split()[-6].split(':')[-1])) 523 524 distance_readout = numpy.array(distance_readout) 525 counter_readout = numpy.array(counter_readout) 526 527 return counter_readout, distance_readout 528 529 def _get_bt_link_metrics(self, dut, duration=5, bqr_tag='Monitoring , Handle: 0x0040', tag=''): 530 """Get bt link metrics such as rssi and tx pwls. 531 532 Returns: 533 master_metrics_list: list of metrics of central device 534 slave_metrics_list: list of metric of peripheral device 535 """ 536 537 self.raw_bt_metrics_path = os.path.join(context.get_current_context().get_full_output_path(), 538 'BT_Raw_Metrics') 539 540 # Get master rssi and power level 541 process_data_dict = btutils.get_bt_metric( 542 dut, duration=duration, bqr_tag=bqr_tag, tag=tag, log_path=self.raw_bt_metrics_path) 543 rssi_master = process_data_dict.get('rssi') 544 pwl_master = process_data_dict.get('pwlv') 545 rssi_c0_master = process_data_dict.get('rssi_c0') 546 rssi_c1_master = process_data_dict.get('rssi_c1') 547 txpw_c0_master = process_data_dict.get('txpw_c0') 548 txpw_c1_master = process_data_dict.get('txpw_c1') 549 bftx_master = process_data_dict.get('bftx') 550 divtx_master = process_data_dict.get('divtx') 551 linkquality_master = process_data_dict.get('linkquality') 552 553 condition = False 554 if condition: 555 rssi_slave = btutils.get_bt_rssi(self.bt_device, 556 tag=tag, 557 log_path=self.raw_bt_metrics_path) 558 else: 559 rssi_slave = None 560 561 master_metrics_list = [ 562 rssi_master, pwl_master, rssi_c0_master, rssi_c1_master, 563 txpw_c0_master, txpw_c1_master, bftx_master, divtx_master, linkquality_master 564 ] 565 slave_metrics_list = [rssi_slave] 566 567 # rssi, pwlv, rssi_c0, rssi_c1, txpw_c0, txpw_c1, bftx, divtx 568 569 return master_metrics_list, slave_metrics_list 570 571 572class RangingComparisonTest(base_test.BaseTestClass): 573 574 def __init__(self, controllers): 575 base_test.BaseTestClass.__init__(self, controllers) 576 577 def setup_class(self): 578 """Initializes common test hardware and parameters. 579 580 This function initializes hardware and compiles parameters that are 581 common to all tests in this class. 582 """ 583 self.duts = self.android_devices 584 req_params = [ 585 'wifi_ranging_params', 'bt_ranging_params', 'uwb_ranging_params' 586 ] 587 self.unpack_userparams(req_params, []) 588 589 self.testclass_results = {} 590 591 def teardown_test(self): 592 self.process_testcase_result() 593 594 def teardown_class(self): 595 user_input = input('Please reconnect {} and press enter when done.'.format(self.duts[-1].serial)) 596 time.sleep(MED_SLEEP) 597 self.duts[1].start_services() 598 if self.uwb_ranging_params['enabled']: 599 self.uwb_ranging_link.clean_up_initiator() 600 if self.wifi_ranging_params['11mc_config']['enabled'] or self.wifi_ranging_params['11az_config']['enabled']: 601 self.wifi_ranging_link.teardown_ranging_link() 602 for dev in self.android_devices: 603 wutils.wifi_toggle_state(dev, False) 604 dev.go_to_sleep() 605 606 def reset_remote_devices(self): 607 user_input = input('Ensure {} is connected and press enter when done.'.format(self.duts[-1].serial)) 608 for dev in self.android_devices: 609 dev.reboot() 610 # Turn Wifi On 611 for dev in self.android_devices: 612 # self.log.info('Turning on airplane mode.') 613 # try: 614 # asserts.assert_true(utils.force_airplane_mode(dev, True), 615 # 'Can not turn on airplane mode.') 616 # except: 617 # self.log.warning('Could not enable airplane mode!') 618 wutils.reset_wifi(dev) 619 wutils.wifi_toggle_state(dev, True) 620 621 if self.wifi_ranging_params['11mc_config']['enabled'] or self.wifi_ranging_params['11az_config']['enabled']: 622 self.log.info('Setting up WiFi ranging link.') 623 self.wifi_ranging_link = wputils.brcm_utils.RangingLink(self.duts[0], self.duts[1]) 624 self.wifi_ranging_link.setup_ranging_link(self.wifi_ranging_params['channel'], 625 self.wifi_ranging_params['bandwidth'], 626 self.wifi_ranging_params['associate_initiator'], 627 '11mc', 628 self.wifi_ranging_params['11az_config']['enabled']) 629 if self.uwb_ranging_params['enabled']: 630 self.log.info('Setting up UWB ranging link') 631 self.uwb_ranging_link = UwbRangingLink(self.duts[1], self.duts[0], self.uwb_ranging_params) 632 self.uwb_ranging_link.clean_up_initiator() 633 self.uwb_ranging_link.setup_initiator() 634 if self.bt_ranging_params['enabled']: 635 self.log.info('Setting up BT ranging link') 636 self.bt_ranging_link = BtRangingLinkV2(self.duts[0], self.duts[1], self.bt_ranging_params) 637 #self.bt_ranging_link.setup_devices() 638 #self.bt_ranging_link.start_advertising() 639 640 # try: 641 # self.wifi_ranging_link.teardown_ranging_link() 642 # except: 643 # self.log.warning('Could not tear down wifi link') 644 645 self.duts[1].stop_services() 646 time.sleep(MED_SLEEP) 647 user_input = input('Disconnect {}, position for test, and press enter when done.'.format(self.duts[-1].serial)) 648 649 def test_ranging(self): 650 651 self.test_results = collections.OrderedDict() 652 self.log.info('Starting ranging test.') 653 first_reset_needed = 1 654 while 1: 655 location = input('Enter location ID. Enter "Done" to stop the test: ') 656 if location == 'Done': 657 break 658 true_distance = input('Enter true distance (in meters) between devices: ') 659 if first_reset_needed or self.wifi_ranging_params['reset_devices']: 660 self.reset_remote_devices() 661 first_reset_needed = 0 662 location_result = collections.OrderedDict() 663 location_result['location_id'] = location 664 location_result['true_distance'] = true_distance 665 self.log.info('Starting ranging test at location {} with distance {}.'.format(location, true_distance)) 666 if self.wifi_ranging_params['11mc_config']['enabled']: 667 self.log.info('Starting WiFi ranging test.') 668 wifi_11mc_ranging_result = self.wifi_ranging_link.measure_range(self.wifi_ranging_params['num_measurements'], 669 self.wifi_ranging_params['channel'], 670 self.wifi_ranging_params['11mc_config']['bandwidth'], 671 self.wifi_ranging_params['11mc_config']['num_frames'], 672 ranging_method='11mc', 673 ant_diversity=0) 674 self.log.info(wifi_11mc_ranging_result['summary']) 675 location_result['wifi_11mc_ranging_result'] = wifi_11mc_ranging_result 676 # wifi_11mc_ant_diversity_ranging_result = self.wifi_ranging_link.measure_range(self.wifi_ranging_params['num_measurements'], 677 # self.wifi_ranging_params['channel'], 678 # self.wifi_ranging_params['11mc_config']['bandwidth'], 679 # self.wifi_ranging_params['11mc_config']['num_frames'], 680 # ranging_method='11mc', 681 # ant_diversity=1) 682 # self.log.info(wifi_11mc_ant_diversity_ranging_result['summary']) 683 # location_result['wifi_11mc_ant_diversity_ranging_result'] = wifi_11mc_ant_diversity_ranging_result 684 if self.wifi_ranging_params['11az_config']['enabled']: 685 wifi_11az_ranging_result = self.wifi_ranging_link.measure_range(self.wifi_ranging_params['num_measurements'], 686 self.wifi_ranging_params['channel'], 687 self.wifi_ranging_params['11az_config']['bandwidth'], 688 self.wifi_ranging_params['11az_config']['num_frames'], 689 ranging_method='11az') 690 self.log.info(wifi_11az_ranging_result['summary']) 691 location_result['wifi_11az_ranging_result'] = wifi_11az_ranging_result 692 if self.uwb_ranging_params['enabled']: 693 self.log.info('Starting UWB ranging test.') 694 location_result['uwb_ranging_result'] = self.uwb_ranging_link.measure_range( 695 duration=self.uwb_ranging_params['duration']) 696 self.log.info(location_result['uwb_ranging_result']) 697 if self.bt_ranging_params['enabled']: 698 self.log.info('Starting BT ranging test.') 699 location_result['bt_ranging_result'] = self.bt_ranging_link.do_channel_sounding(test_name=location) 700 self.log.info(location_result['bt_ranging_result']['summary']) 701 self.test_results[location] = location_result 702 results_file_path = os.path.join( 703 context.get_current_context().get_full_output_path(), 704 '{}.json'.format(self.current_test_name)) 705 with open(results_file_path, 'w') as results_file: 706 json.dump(wputils.serialize_dict(self.test_results), 707 results_file, 708 indent=4) 709 return self.test_results 710 711 712 713 def process_testcase_result(self): 714 pass