1# Copyright (C) 2024 The Android Open Source Project 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15import random 16import sys 17import time 18import logging 19from typing import Set 20from lib import cs 21from lib import ranging_base_test 22from lib import rssi 23from lib import rtt 24from lib import utils 25from lib import uwb 26from lib.session import RangingSession 27from lib.params import * 28from lib.ranging_decorator import * 29from mobly import asserts 30from mobly import config_parser 31from mobly import signals 32from mobly import suite_runner 33from mobly.controllers import android_device 34from android.platform.test.annotations import ApiTest 35 36 37_TEST_CASES = [ 38 "test_one_to_one_uwb_ranging_unicast_static_sts", 39 "test_one_to_one_uwb_ranging_multicast_provisioned_sts", 40 "test_one_to_one_uwb_ranging_unicast_provisioned_sts", 41 "test_one_to_one_uwb_ranging_disable_range_data_ntf", 42 "test_one_to_one_wifi_rtt_ranging", 43 "test_one_to_one_wifi_periodic_rtt_ranging", 44 "test_one_to_one_ble_rssi_ranging", 45 "test_one_to_one_ble_cs_ranging", 46 "test_one_to_one_uwb_ranging_with_oob", 47 "test_one_to_one_ble_cs_ranging_with_oob", 48 "test_uwb_ranging_measurement_limit", 49 "test_ble_rssi_ranging_measurement_limit", 50 "test_one_to_one_wifi_rtt_ranging_with_oob", 51 "test_one_to_one_ble_rssi_ranging_with_oob", 52 "test_oob_responder_persists_until_explicitly_stopped", 53] 54 55 56SERVICE_UUID = "0000fffb-0000-1000-8000-00805f9b34fc" 57 
class RangingManagerTest(ranging_base_test.RangingBaseTest):
  """Two-device tests for the ranging APIs (UWB, Wi-Fi RTT, BLE RSSI, BLE CS).

  Attributes:
    android_devices: list of android device objects.
    devices: RangingDecorator wrappers around `android_devices`; exactly two
      are required because they are unpacked into initiator and responder.
    initiator: first wrapped device.
    responder: second wrapped device.
  """

  def __init__(self, configs: config_parser.TestRunConfig):
    """Init method for the test class.

    Args:
      configs: A config_parser.TestRunConfig object.
    """
    super().__init__(configs)
    self.tests = _TEST_CASES

  def _is_cuttlefish_device(self, ad: android_device.AndroidDevice) -> bool:
    """Returns True when the device's ro.product.name contains "cf_x86"."""
    product_name = ad.adb.getprop("ro.product.name")
    return "cf_x86" in product_name

  def setup_class(self):
    """Wraps both devices and prepares radios shared by all tests."""
    super().setup_class()
    self.devices = [RangingDecorator(ad) for ad in self.android_devices]
    # Raises ValueError unless the testbed provides exactly two devices.
    self.initiator, self.responder = self.devices

    for device in self.devices:
      utils.set_airplane_mode(device.ad, state=False)
      time.sleep(1)
      if device.is_ranging_technology_supported(RangingTechnology.UWB):
        utils.initialize_uwb_country_code(device.ad)
        utils.request_hw_idle_vote(device.ad, True)

    self.initiator.uwb_address = [1, 2]
    self.responder.uwb_address = [3, 4]

  def teardown_class(self):
    """Releases the UWB idle vote and re-enables Wi-Fi / Bluetooth."""
    super().teardown_class()
    for device in self.devices:
      if device.is_ranging_technology_supported(RangingTechnology.UWB):
        utils.request_hw_idle_vote(device.ad, False)
      if device.is_ranging_technology_supported(RangingTechnology.WIFI_RTT):
        utils.set_wifi_state_and_verify(device.ad, True)
      if (
          device.is_ranging_technology_supported(RangingTechnology.BLE_CS)
          or device.is_ranging_technology_supported(RangingTechnology.BLE_RSSI)
      ):
        utils.set_bt_state_and_verify(device.ad, True)

  def setup_test(self):
    """Re-enables UWB (tests may disable it) and resets per-test BLE state."""
    super().setup_test()
    for device in self.devices:
      if device.is_ranging_technology_supported(RangingTechnology.UWB):
        utils.set_uwb_state_and_verify(device.ad, state=True)
      utils.set_snippet_foreground_state(device.ad, isForeground=True)
      utils.set_screen_state(device.ad, on=True)
    # Populated by _ble_connect() / _ble_bond(); None means "no BLE link".
    self.initiator.bt_addr = None
    self.responder.bt_addr = None

  def teardown_test(self):
    """Clears ranging sessions on both devices after every test."""
    super().teardown_test()
    for device in self.devices:
      device.clear_ranging_sessions()

  ### Helpers ###

  def _start_mutual_ranging_and_assert_started(
      self,
      session_handle: str,
      initiator_preference: RangingPreference,
      responder_preference: RangingPreference,
      technologies: Set[RangingTechnology],
  ):
    """Starts one-to-one ranging session between initiator and responder.

    Args:
      session_handle: handle identifying the ranging session on both devices.
      initiator_preference: preference used to start the initiator session.
      responder_preference: preference used to start the responder session, or
        None to start and verify the initiator side only.
      technologies: technologies over which data from the peer must arrive.
    """
    check_responder = responder_preference is not None

    self.initiator.start_ranging_and_assert_opened(
        session_handle, initiator_preference
    )
    if check_responder:
      self.responder.start_ranging_and_assert_opened(
          session_handle, responder_preference
      )

    asserts.assert_true(
        self.initiator.verify_received_data_from_peer_using_technologies(
            session_handle,
            self.responder.id,
            technologies,
        ),
        "Initiator did not find responder",
    )
    if check_responder:
      asserts.assert_true(
          self.responder.verify_received_data_from_peer_using_technologies(
              session_handle,
              self.initiator.id,
              technologies,
          ),
          "Responder did not find initiator",
      )

  def _enable_bt(self):
    """Turns Bluetooth on for both devices and verifies the state."""
    utils.set_bt_state_and_verify(self.initiator.ad, True)
    utils.set_bt_state_and_verify(self.responder.ad, True)

  def _disable_bt(self):
    """Turns Bluetooth off for both devices and verifies the state."""
    utils.set_bt_state_and_verify(self.initiator.ad, False)
    utils.set_bt_state_and_verify(self.responder.ad, False)

  def _reset_wifi_state(self):
    """Calls utils.reset_wifi_state on both devices."""
    utils.reset_wifi_state(self.initiator.ad)
    utils.reset_wifi_state(self.responder.ad)

  def _disable_wifi(self):
    """Turns Wi-Fi off for both devices and verifies the state."""
    utils.set_wifi_state_and_verify(self.initiator.ad, False)
    utils.set_wifi_state_and_verify(self.responder.ad, False)

  def _ble_connect(self):
    """Create BLE GATT connection between initiator and responder.

    On success `responder.bt_addr` holds the value returned by the initiator's
    connectGatt call and `initiator.bt_addr` holds the first device reported
    as connected by the responder.
    """
    # Start and advertise regular server
    self.responder.ad.bluetooth.createAndAdvertiseServer(SERVICE_UUID)
    # Connect to the advertisement
    self.responder.bt_addr = self.initiator.ad.bluetooth.connectGatt(SERVICE_UUID)
    asserts.assert_true(self.responder.bt_addr, "Server not connected")
    connected_devices = self.responder.ad.bluetooth.getConnectedDevices()
    asserts.assert_true(connected_devices, "No clients found connected to server")
    self.initiator.bt_addr = connected_devices[0]

  def _ble_disconnect(self):
    """Disconnects the GATT link if one was made; failures are only logged."""
    if (
        self.responder.bt_addr
        and self.initiator.ad.bluetooth.disconnectGatt(SERVICE_UUID) is False
    ):
      logging.error("Server did not disconnect %s", self.initiator.bt_addr)

  def _ble_bond(self):
    """Create BLE GATT connection and bonding between initiator and responder.

    Sets `responder.bt_addr` / `initiator.bt_addr` the same way as
    _ble_connect(), but pairs using OOB data generated on the responder.
    """
    # Start and advertise regular server
    self.responder.ad.bluetooth.createAndAdvertiseServer(SERVICE_UUID)
    oob_data = self.responder.ad.bluetooth.generateServerLocalOobData()
    asserts.assert_true(oob_data, "OOB data not generated")
    # Connect to the advertisement using OOB data generated on responder.
    self.responder.bt_addr = self.initiator.ad.bluetooth.createBondOob(
        SERVICE_UUID, oob_data
    )
    asserts.assert_true(self.responder.bt_addr, "Server not bonded")
    connected_devices = self.responder.ad.bluetooth.getConnectedDevices()
    asserts.assert_true(connected_devices, "No clients found connected to server")
    self.initiator.bt_addr = connected_devices[0]

  def _ble_unbond(self):
    """Removes the bond on both sides if present; failures are only logged."""
    if (
        self.responder.bt_addr
        and self.initiator.ad.bluetooth.removeBond(self.responder.bt_addr) is False
    ):
      logging.error("Server not unbonded %s", self.responder.bt_addr)
    if (
        self.initiator.bt_addr
        and self.responder.ad.bluetooth.removeBond(self.initiator.bt_addr) is False
    ):
      logging.error("Client not unbonded %s", self.initiator.bt_addr)

  ### Test Cases ###

  def _test_one_to_one_uwb_ranging(self, config_id: int):
    """Verifies uwb ranging with peer device, devices range for 10 seconds.

    Args:
      config_id: uwb.ConfigId value applied to both the initiator and the
        responder UWB parameters.
    """
    SESSION_HANDLE = str(uuid4())
    UWB_SESSION_ID = 5
    TECHNOLOGIES = {RangingTechnology.UWB}

    asserts.skip_if(
        not self.responder.is_ranging_technology_supported(RangingTechnology.UWB),
        "UWB not supported by responder",
    )
    asserts.skip_if(
        not self.initiator.is_ranging_technology_supported(RangingTechnology.UWB),
        "UWB not supported by initiator",
    )

    # The requested config_id must be used here; hard-coding one value would
    # make every caller exercise the same UWB configuration.
    initiator_preference = RangingPreference(
        device_role=DeviceRole.INITIATOR,
        ranging_params=RawInitiatorRangingParams(
            peer_params=[
                DeviceParams(
                    peer_id=self.responder.id,
                    uwb_params=uwb.UwbRangingParams(
                        session_id=UWB_SESSION_ID,
                        config_id=config_id,
                        device_address=self.initiator.uwb_address,
                        peer_address=self.responder.uwb_address,
                    ),
                )
            ],
        ),
    )

    responder_preference = RangingPreference(
        device_role=DeviceRole.RESPONDER,
        ranging_params=RawResponderRangingParams(
            peer_params=DeviceParams(
                peer_id=self.initiator.id,
                uwb_params=uwb.UwbRangingParams(
                    session_id=UWB_SESSION_ID,
                    config_id=config_id,
                    device_address=self.responder.uwb_address,
                    peer_address=self.initiator.uwb_address,
                ),
            ),
        ),
    )

    self._start_mutual_ranging_and_assert_started(
        SESSION_HANDLE,
        initiator_preference,
        responder_preference,
        TECHNOLOGIES,
    )

    time.sleep(10)

    # Check again after the wait to confirm data is still being received.
    asserts.assert_true(
        self.initiator.verify_received_data_from_peer_using_technologies(
            SESSION_HANDLE, self.responder.id, TECHNOLOGIES
        ),
        "Initiator did not find responder",
    )
    asserts.assert_true(
        self.responder.verify_received_data_from_peer_using_technologies(
            SESSION_HANDLE,
            self.initiator.id,
            TECHNOLOGIES,
        ),
        "Responder did not find initiator",
    )

    self.initiator.stop_ranging_and_assert_closed(SESSION_HANDLE)
    self.responder.stop_ranging_and_assert_closed(SESSION_HANDLE)

  @ApiTest(apis=[
      'android.ranging.RangingData#getDistance',
      'android.ranging.RangingData#getAzimuth',
      'android.ranging.RangingData#getElevation',
      'android.ranging.RangingData#getRangingTechnology',
      'android.ranging.RangingData#getRssi',
      'android.ranging.RangingData#hasRssi',
      'android.ranging.RangingData#getTimestampMillis',
      'android.ranging.RangingMeasurement#getMeasurement',
      'android.ranging.RangingMeasurement#getConfidence',
      'android.ranging.RangingSession.Callback#onOpened()',
      'android.ranging.RangingSession.Callback#onOpenFailed(int)',
      'android.ranging.RangingSession.Callback#onClosed(int)',
      'android.ranging.RangingSession.Callback#onResults(android.ranging.RangingDevice, android.ranging.RangingData)',
      'android.ranging.RangingSession.Callback#onStarted(android.ranging.RangingDevice, int)',
      'android.ranging.RangingSession.Callback#onStopped(android.ranging.RangingDevice, int)',
      'android.os.Parcel#writeBlob(byte[])',
  ])
  def test_one_to_one_uwb_ranging_unicast_static_sts(self):
    """Verifies uwb ranging with peer device using unicast static sts"""
    self._test_one_to_one_uwb_ranging(uwb.ConfigId.UNICAST_DS_TWR)

  def test_one_to_one_uwb_ranging_multicast_provisioned_sts(self):
    """Verifies uwb ranging with peer device using multicast provisioned sts"""
    self._test_one_to_one_uwb_ranging(uwb.ConfigId.PROVISIONED_MULTICAST_DS_TWR)

  def test_one_to_one_uwb_ranging_unicast_provisioned_sts(self):
    """Verifies uwb ranging with peer device using unicast provisioned sts"""
    self._test_one_to_one_uwb_ranging(uwb.ConfigId.PROVISIONED_UNICAST_DS_TWR)

  def test_one_to_one_uwb_ranging_disable_range_data_ntf(self):
    """Verifies device does not receive range data after disabling range data notifications"""
    SESSION_HANDLE = str(uuid4())
    UWB_SESSION_ID = 5
    asserts.skip_if(
        not self.responder.is_ranging_technology_supported(RangingTechnology.UWB),
        "UWB not supported by responder",
    )
    asserts.skip_if(
        not self.initiator.is_ranging_technology_supported(RangingTechnology.UWB),
        "UWB not supported by initiator",
    )
    initiator_preference = RangingPreference(
        device_role=DeviceRole.INITIATOR,
        ranging_params=RawInitiatorRangingParams(
            peer_params=[
                DeviceParams(
                    peer_id=self.responder.id,
                    uwb_params=uwb.UwbRangingParams(
                        session_id=UWB_SESSION_ID,
                        config_id=uwb.ConfigId.MULTICAST_DS_TWR,
                        device_address=self.initiator.uwb_address,
                        peer_address=self.responder.uwb_address,
                    ),
                )
            ],
        ),
        enable_range_data_notifications=False,
    )

    responder_preference = RangingPreference(
        device_role=DeviceRole.RESPONDER,
        ranging_params=RawResponderRangingParams(
            peer_params=DeviceParams(
                peer_id=self.initiator.id,
                uwb_params=uwb.UwbRangingParams(
                    session_id=UWB_SESSION_ID,
                    config_id=uwb.ConfigId.MULTICAST_DS_TWR,
                    device_address=self.responder.uwb_address,
                    peer_address=self.initiator.uwb_address,
                ),
            ),
        ),
        enable_range_data_notifications=True,
    )

    self.initiator.start_ranging_and_assert_opened(
        SESSION_HANDLE, initiator_preference
    )
    self.responder.start_ranging_and_assert_opened(
        SESSION_HANDLE, responder_preference
    )

    asserts.assert_false(
        self.initiator.verify_received_data_from_peer(
            SESSION_HANDLE, self.responder.id
        ),
        "Initiator found responder but initiator has range data"
        " notifications disabled",
    )
    asserts.assert_true(
        self.responder.verify_received_data_from_peer(
            SESSION_HANDLE, self.initiator.id
        ),
        "Responder did not find initiator but responder has range data"
        " notifications enabled",
    )

    self.initiator.stop_ranging_and_assert_closed(SESSION_HANDLE)
    self.responder.stop_ranging_and_assert_closed(SESSION_HANDLE)

  def test_uwb_ranging_measurement_limit(self):
    """Verifies device does not receive range data after measurement limit"""
    SESSION_HANDLE = str(uuid4())
    UWB_SESSION_ID = 5
    asserts.skip_if(
        not self.responder.is_ranging_technology_supported(RangingTechnology.UWB),
        "UWB not supported by responder",
    )
    asserts.skip_if(
        not self.initiator.is_ranging_technology_supported(RangingTechnology.UWB),
        "UWB not supported by initiator",
    )

    asserts.skip_if(
        self.initiator.ad.uwb.getSpecificationInfo()["fira"]["uci_version"] < 2,
        "Measurements limit is supported from Fira 2.0",
    )
    asserts.skip_if(
        self.responder.ad.uwb.getSpecificationInfo()["fira"]["uci_version"] < 2,
        "Measurements limit in supported from Fira 2.0",
    )

    initiator_preference = RangingPreference(
        device_role=DeviceRole.INITIATOR,
        ranging_params=RawInitiatorRangingParams(
            peer_params=[
                DeviceParams(
                    peer_id=self.responder.id,
                    uwb_params=uwb.UwbRangingParams(
                        session_id=UWB_SESSION_ID,
                        config_id=uwb.ConfigId.MULTICAST_DS_TWR,
                        device_address=self.initiator.uwb_address,
                        peer_address=self.responder.uwb_address,
                    ),
                )
            ],
        ),
        measurement_limit=2,
    )

    responder_preference = RangingPreference(
        device_role=DeviceRole.RESPONDER,
        ranging_params=RawResponderRangingParams(
            peer_params=DeviceParams(
                peer_id=self.initiator.id,
                uwb_params=uwb.UwbRangingParams(
                    session_id=UWB_SESSION_ID,
                    config_id=uwb.ConfigId.MULTICAST_DS_TWR,
                    device_address=self.responder.uwb_address,
                    peer_address=self.initiator.uwb_address,
                ),
            ),
        ),
        measurement_limit=2,
    )

    self.initiator.start_ranging_and_assert_opened(
        SESSION_HANDLE, initiator_preference
    )
    self.responder.start_ranging_and_assert_opened(
        SESSION_HANDLE, responder_preference
    )

    time.sleep(2)

    # No explicit stop: the test expects a close event on each side by itself.
    self.initiator.assert_close_ranging_event_received(SESSION_HANDLE)
    self.responder.assert_close_ranging_event_received(SESSION_HANDLE)

  def test_ble_rssi_ranging_measurement_limit(self):
    """Verifies ble rssi ranging with measurement limit."""
    asserts.skip_if(self._is_cuttlefish_device(self.initiator.ad),
                    "Skipping BLE RSSI test on Cuttlefish")
    SESSION_HANDLE = str(uuid4())

    asserts.skip_if(
        not self.responder.is_ranging_technology_supported(RangingTechnology.BLE_RSSI),
        "BLE RSSI not supported by responder",
    )
    asserts.skip_if(
        not self.initiator.is_ranging_technology_supported(RangingTechnology.BLE_RSSI),
        "BLE RSSI not supported by initiator",
    )
    self._enable_bt()

    # A failed BLE link setup is treated as an environment problem: skip
    # rather than fail.
    try:
      self._ble_connect()
    except Exception as e:
      asserts.skip("Failed to create ble connection", str(e))

    try:
      initiator_preference = RangingPreference(
          device_role=DeviceRole.INITIATOR,
          ranging_params=RawInitiatorRangingParams(
              peer_params=[
                  DeviceParams(
                      peer_id=self.responder.id,
                      rssi_params=rssi.BleRssiRangingParams(
                          peer_address=self.responder.bt_addr,
                          ranging_update_rate=rssi.RangingUpdateRate.FREQUENT,
                      ),
                  )
              ],
          ),
          measurement_limit=4,
      )
      self.initiator.start_ranging_and_assert_opened(
          SESSION_HANDLE, initiator_preference
      )
      self.initiator.assert_close_ranging_event_received(SESSION_HANDLE)
    finally:
      self._ble_disconnect()

  def test_one_to_one_wifi_rtt_ranging(self):
    """Verifies wifi rtt ranging with peer device, devices range for 10 seconds."""
    asserts.skip_if(self._is_cuttlefish_device(self.initiator.ad),
                    "Skipping WiFi RTT test on Cuttlefish")
    SESSION_HANDLE = str(uuid4())
    TECHNOLOGIES = {RangingTechnology.WIFI_RTT}

    asserts.skip_if(
        not self.responder.is_ranging_technology_supported(RangingTechnology.WIFI_RTT),
        "Wifi nan rtt not supported by responder",
    )
    asserts.skip_if(
        not self.initiator.is_ranging_technology_supported(RangingTechnology.WIFI_RTT),
        "Wifi nan rtt not supported by initiator",
    )
    # TODO(rpius): Remove this once the technology is stable.
    self._reset_wifi_state()
    test_service_name = "test_service_name" + str(random.randint(1, 100))
    initiator_preference = RangingPreference(
        device_role=DeviceRole.INITIATOR,
        ranging_params=RawInitiatorRangingParams(
            peer_params=[
                DeviceParams(
                    peer_id=self.responder.id,
                    rtt_params=rtt.RttRangingParams(
                        service_name=test_service_name,
                    ),
                )
            ],
        ),
        enable_range_data_notifications=True,
    )

    responder_preference = RangingPreference(
        device_role=DeviceRole.RESPONDER,
        ranging_params=RawResponderRangingParams(
            peer_params=DeviceParams(
                peer_id=self.initiator.id,
                rtt_params=rtt.RttRangingParams(
                    service_name=test_service_name,
                ),
            ),
        ),
        enable_range_data_notifications=False,
    )

    # Should be able to call _start_mutual_ranging_and_assert_started once we get consistent data.
    self.initiator.start_ranging_and_assert_opened(
        SESSION_HANDLE, initiator_preference
    )
    self.responder.start_ranging_and_assert_opened(
        SESSION_HANDLE, responder_preference
    )

    time.sleep(10)
    asserts.assert_true(
        self.initiator.verify_received_data_from_peer_using_technologies(
            SESSION_HANDLE, self.responder.id, TECHNOLOGIES
        ),
        "Initiator did not find responder",
    )

    # Enable when this is supported.
    # asserts.assert_true(
    #     self.responder.verify_received_data_from_peer_using_technologies(
    #         SESSION_HANDLE, self.initiator.id, TECHNOLOGIES
    #     ),
    #     "Responder did not find initiator",
    # )

    self.initiator.stop_ranging_and_assert_closed(SESSION_HANDLE)
    self.responder.stop_ranging_and_assert_closed(SESSION_HANDLE)

  def test_one_to_one_wifi_periodic_rtt_ranging(self):
    """Verifies wifi periodic rtt ranging with peer device, devices range for 10 seconds."""
    asserts.skip_if(self._is_cuttlefish_device(self.initiator.ad),
                    "Skipping WiFi periodic RTT test on Cuttlefish")
    SESSION_HANDLE = str(uuid4())
    TECHNOLOGIES = {RangingTechnology.WIFI_RTT}

    asserts.skip_if(
        not self.responder.is_ranging_technology_supported(RangingTechnology.WIFI_RTT),
        "Wifi nan rtt not supported by responder",
    )
    asserts.skip_if(
        not self.responder.ad.ranging.hasPeriodicRangingHwFeature(),
        "Wifi nan periodic rtt not supported by responder",
    )
    asserts.skip_if(
        not self.initiator.is_ranging_technology_supported(RangingTechnology.WIFI_RTT),
        "Wifi nan rtt not supported by initiator",
    )
    asserts.skip_if(
        not self.initiator.ad.ranging.hasPeriodicRangingHwFeature(),
        "Wifi nan periodic rtt not supported by initiator",
    )
    # TODO(rpius): Remove this once the technology is stable.
    self._reset_wifi_state()

    test_service_name = "test_periodic_service_name" + str(random.randint(1, 100))
    initiator_preference = RangingPreference(
        device_role=DeviceRole.INITIATOR,
        ranging_params=RawInitiatorRangingParams(
            peer_params=[
                DeviceParams(
                    peer_id=self.responder.id,
                    rtt_params=rtt.RttRangingParams(
                        service_name=test_service_name,
                        enable_periodic_ranging_hw_feature=True,
                    ),
                )
            ],
        ),
        enable_range_data_notifications=True,
    )

    responder_preference = RangingPreference(
        device_role=DeviceRole.RESPONDER,
        ranging_params=RawResponderRangingParams(
            peer_params=DeviceParams(
                peer_id=self.initiator.id,
                rtt_params=rtt.RttRangingParams(
                    service_name=test_service_name,
                    enable_periodic_ranging_hw_feature=True,
                ),
            ),
        ),
        enable_range_data_notifications=False,
    )

    # Should be able to call _start_mutual_ranging_and_assert_started once we get consistent data.
    self.initiator.start_ranging_and_assert_opened(
        SESSION_HANDLE, initiator_preference
    )
    self.responder.start_ranging_and_assert_opened(
        SESSION_HANDLE, responder_preference
    )

    time.sleep(10)
    asserts.assert_true(
        self.initiator.verify_received_data_from_peer_using_technologies(
            SESSION_HANDLE, self.responder.id, TECHNOLOGIES
        ),
        "Initiator did not find responder",
    )

    asserts.assert_true(
        self.responder.verify_received_data_from_peer_using_technologies(
            SESSION_HANDLE, self.initiator.id, TECHNOLOGIES
        ),
        "Responder did not find initiator",
    )

    self.initiator.stop_ranging_and_assert_closed(SESSION_HANDLE)
    self.responder.stop_ranging_and_assert_closed(SESSION_HANDLE)

  @ApiTest(apis=[
      'android.bluetooth.le.DistanceMeasurementSession#stopSession',
      'android.content.AttributionSource#checkCallingUid',
      'java.util#copyOf(byte[], int)',
      'java.util#copyOfRange(byte[], int, int)',
  ])
  def test_one_to_one_ble_rssi_ranging(self):
    """Verifies ble rssi ranging with peer device, devices range for 10 seconds."""
    asserts.skip_if(self._is_cuttlefish_device(self.initiator.ad),
                    "Skipping BLE RSSI test on Cuttlefish")
    SESSION_HANDLE = str(uuid4())
    TECHNOLOGIES = {RangingTechnology.BLE_RSSI}

    asserts.skip_if(
        not self.responder.is_ranging_technology_supported(RangingTechnology.BLE_RSSI),
        "BLE RSSI not supported by responder",
    )
    asserts.skip_if(
        not self.initiator.is_ranging_technology_supported(RangingTechnology.BLE_RSSI),
        "BLE RSSI not supported by initiator",
    )
    self._enable_bt()

    try:
      self._ble_connect()
    except Exception as e:
      asserts.skip("Failed to create ble connection", str(e))

    try:
      initiator_preference = RangingPreference(
          device_role=DeviceRole.INITIATOR,
          ranging_params=RawInitiatorRangingParams(
              peer_params=[
                  DeviceParams(
                      peer_id=self.responder.id,
                      rssi_params=rssi.BleRssiRangingParams(
                          peer_address=self.responder.bt_addr,
                      ),
                  )
              ],
          ),
      )

      responder_preference = RangingPreference(
          device_role=DeviceRole.RESPONDER,
          ranging_params=RawResponderRangingParams(
              peer_params=DeviceParams(
                  peer_id=self.initiator.id,
                  rssi_params=rssi.BleRssiRangingParams(
                      peer_address=self.initiator.bt_addr,
                  ),
              ),
          ),
      )

      self._start_mutual_ranging_and_assert_started(
          SESSION_HANDLE,
          initiator_preference,
          responder_preference,
          TECHNOLOGIES,
      )

      time.sleep(10)

      asserts.assert_true(
          self.initiator.verify_received_data_from_peer_using_technologies(
              SESSION_HANDLE,
              self.responder.id,
              TECHNOLOGIES
          ),
          "Initiator did not find responder",
      )
      asserts.assert_true(
          self.responder.verify_received_data_from_peer_using_technologies(
              SESSION_HANDLE,
              self.initiator.id,
              TECHNOLOGIES,
          ),
          "Responder did not find initiator",
      )
    finally:
      # Nested finally: the GATT link is torn down even when a stop
      # assertion fails, so later tests do not inherit the connection.
      try:
        self.initiator.stop_ranging_and_assert_closed(SESSION_HANDLE)
        self.responder.stop_ranging_and_assert_closed(SESSION_HANDLE)
      finally:
        self._ble_disconnect()

  @ApiTest(apis=[
      'android.bluetooth.le.DistanceMeasurementSession#stopSession',
      'android.bluetooth.le.DistanceMeasurementParams#getMaxDurationSeconds',
  ])
  def test_one_to_one_ble_cs_ranging(self):
    """
    Verifies cs ranging with peer device, devices range for 10 seconds.
    This test is only one way since we don't test if responder also can simultaneously get the data.
    """
    asserts.skip_if(self._is_cuttlefish_device(self.initiator.ad),
                    "Skipping BLE CS test on Cuttlefish")
    SESSION_HANDLE = str(uuid4())
    TECHNOLOGIES = {RangingTechnology.BLE_CS}

    asserts.skip_if(
        not self.responder.is_ranging_technology_supported(RangingTechnology.BLE_CS),
        "BLE_CS not supported by responder",
    )
    asserts.skip_if(
        not self.initiator.is_ranging_technology_supported(RangingTechnology.BLE_CS),
        "BLE CS not supported by initiator",
    )
    self._enable_bt()

    try:
      self._ble_bond()
    except Exception as e:
      asserts.skip("Failed to create ble bond", str(e))

    try:
      initiator_preference = RangingPreference(
          device_role=DeviceRole.INITIATOR,
          ranging_params=RawInitiatorRangingParams(
              peer_params=[
                  DeviceParams(
                      peer_id=self.responder.id,
                      cs_params=cs.CsRangingParams(
                          peer_address=self.responder.bt_addr,
                      ),
                  )
              ],
          ),
      )

      # None as responder preference: only the initiator side is started
      # and verified.
      self._start_mutual_ranging_and_assert_started(
          SESSION_HANDLE,
          initiator_preference,
          None,
          TECHNOLOGIES,
      )

      time.sleep(10)

      asserts.assert_true(
          self.initiator.verify_received_data_from_peer_using_technologies(
              SESSION_HANDLE,
              self.responder.id,
              TECHNOLOGIES
          ),
          "Initiator did not find responder",
      )
    finally:
      # Nested finally: the bond is removed even when the stop assertion fails.
      try:
        self.initiator.stop_ranging_and_assert_closed(SESSION_HANDLE)
      finally:
        self._ble_unbond()

  @ApiTest(apis=[
      'android.ranging.oob.TransportHandle#sendData(byte[])',
      'android.ranging.oob.TransportHandle#registerReceiveCallback(java.util.concurrent.Executor, android.ranging.oob.TransportHandle.ReceiveCallback)',
      'android.ranging.oob.TransportHandle.ReceiveCallback#onSendFailed()',
  ])
  def test_one_to_one_uwb_ranging_with_oob(self):
    """Verifies OOB-configured ranging in HIGH_ACCURACY mode on UWB-capable devices."""
    asserts.skip_if(
        not self.responder.is_ranging_technology_supported(RangingTechnology.UWB),
        "UWB not supported by responder",
    )
    asserts.skip_if(
        not self.initiator.is_ranging_technology_supported(RangingTechnology.UWB),
        "UWB not supported by initiator",
    )

    initiator_preference = RangingPreference(
        device_role=DeviceRole.INITIATOR,
        ranging_params=OobInitiatorRangingParams(
            peer_ids=[self.responder.id],
            ranging_mode=RangingMode.HIGH_ACCURACY,
        ),
    )

    responder_preference = RangingPreference(
        device_role=DeviceRole.RESPONDER,
        ranging_params=OobResponderRangingParams(peer_id=self.initiator.id),
    )

    session = RangingSession()
    session.set_initiator(self.initiator, initiator_preference)
    session.add_responder(self.responder, responder_preference)

    session.start_and_assert_opened()
    session.assert_received_data()
    session.stop_and_assert_closed()

  def test_one_to_one_ble_cs_ranging_with_oob(self):
    """Verifies OOB ranging falls back to BLE CS when UWB is turned off."""
    asserts.skip_if(
        not self.responder.is_ranging_technology_supported(RangingTechnology.BLE_CS),
        "BLE_CS not supported by responder",
    )
    asserts.skip_if(
        not self.initiator.is_ranging_technology_supported(RangingTechnology.BLE_CS),
        "BLE_CS not supported by initiator",
    )

    # UWB is switched back on by setup_test() before the next test.
    if self.initiator.is_ranging_technology_supported(RangingTechnology.UWB):
      utils.set_uwb_state_and_verify(self.initiator.ad, state=False)
    if self.responder.is_ranging_technology_supported(RangingTechnology.UWB):
      utils.set_uwb_state_and_verify(self.responder.ad, state=False)

    initiator_preference = RangingPreference(
        device_role=DeviceRole.INITIATOR,
        ranging_params=OobInitiatorRangingParams(
            peer_ids=[self.responder.id],
            # HIGH_ACCURACY_PREFERRED mode with UWB disabled should fallback to CS
            ranging_mode=RangingMode.HIGH_ACCURACY_PREFERRED
        ),
    )

    responder_preference = RangingPreference(
        device_role=DeviceRole.RESPONDER,
        ranging_params=OobResponderRangingParams(peer_id=self.initiator.id),
    )

    session = RangingSession()
    session.set_initiator(self.initiator, initiator_preference)
    session.add_responder(self.responder, responder_preference)

    self._enable_bt()

    try:
      self._ble_bond()
    except Exception as e:
      asserts.skip("Failed to create ble bond", str(e))

    try:
      session.start_and_assert_opened(check_responders=False)
      session.assert_received_data(
          technologies=[RangingTechnology.BLE_CS], check_responders=False
      )
    finally:
      # Nested finally: the bond is removed even when the stop assertion fails.
      try:
        session.stop_and_assert_closed(check_responders=False)
      finally:
        self._ble_unbond()

  def test_one_to_one_wifi_rtt_ranging_with_oob(self):
    """Verifies OOB ranging falls back to Wi-Fi RTT when UWB and BT are off."""
    asserts.skip_if(
        not self.responder.is_ranging_technology_supported(RangingTechnology.WIFI_RTT),
        "WIFI_RTT not supported by responder",
    )
    asserts.skip_if(
        not self.initiator.is_ranging_technology_supported(RangingTechnology.WIFI_RTT),
        "WIFI_RTT not supported by initiator",
    )

    if self.initiator.is_ranging_technology_supported(RangingTechnology.UWB):
      utils.set_uwb_state_and_verify(self.initiator.ad, state=False)
    if self.responder.is_ranging_technology_supported(RangingTechnology.UWB):
      utils.set_uwb_state_and_verify(self.responder.ad, state=False)

    # Bluetooth is only turned off when both devices could otherwise use CS.
    if (
        self.initiator.is_ranging_technology_supported(RangingTechnology.BLE_CS)
        and self.responder.is_ranging_technology_supported(RangingTechnology.BLE_CS)
    ):
      self._disable_bt()

    self._reset_wifi_state()

    initiator_preference = RangingPreference(
        device_role=DeviceRole.INITIATOR,
        ranging_params=OobInitiatorRangingParams(
            peer_ids=[self.responder.id],
            # HIGH_ACCURACY_PREFERRED mode with UWB and CS disabled should fallback to RTT
            ranging_mode=RangingMode.HIGH_ACCURACY_PREFERRED
        ),
        enable_range_data_notifications=True,
    )

    responder_preference = RangingPreference(
        device_role=DeviceRole.RESPONDER,
        ranging_params=OobResponderRangingParams(peer_id=self.initiator.id),
        enable_range_data_notifications=False,
    )

    session = RangingSession()
    session.set_initiator(self.initiator, initiator_preference)
    session.add_responder(self.responder, responder_preference)

    # NOTE(review): the session is not stopped explicitly here; teardown_test
    # calls clear_ranging_sessions() on both devices -- confirm that is the
    # intended cleanup path.
    session.start_and_assert_opened(check_responders=False)
    session.assert_received_data(
        technologies=[RangingTechnology.WIFI_RTT], check_responders=False
    )

  def test_one_to_one_ble_rssi_ranging_with_oob(self):
    """Verifies OOB ranging in AUTO mode uses BLE RSSI.

    Skipped if BLE CS is supported by both devices, or if either device
    lacks BLE RSSI support.
    """
    asserts.skip_if(
        self.initiator.is_ranging_technology_supported(RangingTechnology.BLE_CS)
        and self.responder.is_ranging_technology_supported(RangingTechnology.BLE_CS),
        "BLE_CS is supported, skip running BLE_RSSI tests",
    )

    # Skip when BLE RSSI is missing on either side (the condition must be
    # negated to match the skip message).
    asserts.skip_if(
        not self.initiator.is_ranging_technology_supported(RangingTechnology.BLE_RSSI)
        or not self.responder.is_ranging_technology_supported(RangingTechnology.BLE_RSSI),
        "BLE_RSSI is not supported",
    )

    if self.initiator.is_ranging_technology_supported(RangingTechnology.UWB):
      utils.set_uwb_state_and_verify(self.initiator.ad, state=False)
    if self.responder.is_ranging_technology_supported(RangingTechnology.UWB):
      utils.set_uwb_state_and_verify(self.responder.ad, state=False)

    if (
        self.initiator.is_ranging_technology_supported(RangingTechnology.WIFI_RTT)
        and self.responder.is_ranging_technology_supported(RangingTechnology.WIFI_RTT)
    ):
      self._disable_wifi()

    self._enable_bt()

    try:
      self._ble_connect()
    except Exception as e:
      asserts.skip("Failed to create ble connection", str(e))

    try:
      initiator_preference = RangingPreference(
          device_role=DeviceRole.INITIATOR,
          ranging_params=OobInitiatorRangingParams(
              peer_ids=[self.responder.id],
              ranging_mode=RangingMode.AUTO
          ),
      )

      responder_preference = RangingPreference(
          device_role=DeviceRole.RESPONDER,
          ranging_params=OobResponderRangingParams(peer_id=self.initiator.id),
      )

      session = RangingSession()
      session.set_initiator(self.initiator, initiator_preference)
      session.add_responder(self.responder, responder_preference)

      session.start_and_assert_opened(check_responders=False)
      session.assert_received_data(
          technologies=[RangingTechnology.BLE_RSSI], check_responders=False
      )
    finally:
      self._ble_disconnect()

  def test_oob_responder_persists_until_explicitly_stopped(self):
    """Verifies a responder left running can serve a second initiator start."""
    asserts.skip_if(
        not self.responder.is_ranging_technology_supported(RangingTechnology.UWB),
        "UWB not supported by responder",
    )
    asserts.skip_if(
        not self.initiator.is_ranging_technology_supported(RangingTechnology.UWB),
        "UWB not supported by initiator",
    )

    initiator_preference = RangingPreference(
        device_role=DeviceRole.INITIATOR,
        ranging_params=OobInitiatorRangingParams(
            peer_ids=[self.responder.id],
            ranging_mode=RangingMode.HIGH_ACCURACY,
        ),
    )

    responder_preference = RangingPreference(
        device_role=DeviceRole.RESPONDER,
        ranging_params=OobResponderRangingParams(peer_id=self.initiator.id),
    )

    session = RangingSession()
    session.set_initiator(self.initiator, initiator_preference)
    session.add_responder(self.responder, responder_preference)

    session.start_and_assert_opened()
    session.assert_received_data()
    # Stop the initiator only; the responder is deliberately left running.
    session.stop_and_assert_closed(stop_responders=False, check_responders=False)

    time.sleep(1)

    # Restart the initiator only and expect data from the still-running
    # responder.
    session.start_and_assert_opened(start_responders=False, check_responders=False)
    session.assert_received_data()
    session.stop_and_assert_closed()


if __name__ == "__main__":
  # Drop everything up to and including "--" so that only the arguments
  # after it are seen by the suite runner.
  if "--" in sys.argv:
    index = sys.argv.index("--")
    sys.argv = sys.argv[:1] + sys.argv[index + 1 :]
  suite_runner.run_suite([RangingManagerTest])