• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#  Copyright (C) 2024 The Android Open Source Project
2#
3#  Licensed under the Apache License, Version 2.0 (the "License");
4#  you may not use this file except in compliance with the License.
5#  You may obtain a copy of the License at
6#
7#       http://www.apache.org/licenses/LICENSE-2.0
8#
9#  Unless required by applicable law or agreed to in writing, software
10#  distributed under the License is distributed on an "AS IS" BASIS,
11#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#  See the License for the specific language governing permissions and
13#  limitations under the License.
14
15import random
16import sys
17import time
18import logging
19from typing import Set
20from lib import cs
21from lib import ranging_base_test
22from lib import rssi
23from lib import rtt
24from lib import utils
25from lib import uwb
26from lib.session import RangingSession
27from lib.params import *
28from lib.ranging_decorator import *
29from mobly import asserts
30from mobly import config_parser
31from mobly import signals
32from mobly import suite_runner
33from mobly.controllers import android_device
34from android.platform.test.annotations import ApiTest
35
36
37_TEST_CASES = [
38    "test_one_to_one_uwb_ranging_unicast_static_sts",
39    "test_one_to_one_uwb_ranging_multicast_provisioned_sts",
40    "test_one_to_one_uwb_ranging_unicast_provisioned_sts",
41    "test_one_to_one_uwb_ranging_disable_range_data_ntf",
42    "test_one_to_one_wifi_rtt_ranging",
43    "test_one_to_one_wifi_periodic_rtt_ranging",
44    "test_one_to_one_ble_rssi_ranging",
45    "test_one_to_one_ble_cs_ranging",
46    "test_one_to_one_uwb_ranging_with_oob",
47    "test_one_to_one_ble_cs_ranging_with_oob",
48    "test_uwb_ranging_measurement_limit",
49    "test_ble_rssi_ranging_measurement_limit",
50    "test_one_to_one_wifi_rtt_ranging_with_oob",
51    "test_one_to_one_ble_rssi_ranging_with_oob",
52    "test_oob_responder_persists_until_explicitly_stopped",
53]
54
55
56SERVICE_UUID = "0000fffb-0000-1000-8000-00805f9b34fc"
57
58class RangingManagerTest(ranging_base_test.RangingBaseTest):
59  """Tests for UWB Ranging APIs.
60
61  Attributes:
62
63  android_devices: list of android device objects.
64  """
65
66  def __init__(self, configs: config_parser.TestRunConfig):
67    """Init method for the test class.
68
69    Args:
70
71    configs: A config_parser.TestRunConfig object.
72    """
73    super().__init__(configs)
74    self.tests = _TEST_CASES
75
76  def _is_cuttlefish_device(self, ad: android_device.AndroidDevice) -> bool:
77    product_name = ad.adb.getprop("ro.product.name")
78    return "cf_x86" in product_name
79
80  def setup_class(self):
81    super().setup_class()
82    self.devices = [RangingDecorator(ad) for ad in self.android_devices]
83    self.initiator, self.responder = self.devices
84
85    for device in self.devices:
86      utils.set_airplane_mode(device.ad, state=False)
87      time.sleep(1)
88      if device.is_ranging_technology_supported(RangingTechnology.UWB):
89        utils.initialize_uwb_country_code(device.ad)
90        utils.request_hw_idle_vote(device.ad, True)
91
92    self.initiator.uwb_address = [1, 2]
93    self.responder.uwb_address = [3, 4]
94
95  def teardown_class(self):
96      super().teardown_class()
97      for device in self.devices:
98        if device.is_ranging_technology_supported(RangingTechnology.UWB):
99            utils.request_hw_idle_vote(device.ad, False)
100        if device.is_ranging_technology_supported(RangingTechnology.WIFI_RTT):
101            utils.set_wifi_state_and_verify(device.ad, True)
102        if device.is_ranging_technology_supported(RangingTechnology.BLE_CS) or \
103            device.is_ranging_technology_supported(RangingTechnology.BLE_RSSI):
104            utils.set_bt_state_and_verify(device.ad, True)
105
106  def setup_test(self):
107    super().setup_test()
108    for device in self.devices:
109      if device.is_ranging_technology_supported(RangingTechnology.UWB):
110        utils.set_uwb_state_and_verify(device.ad, state=True)
111        utils.set_snippet_foreground_state(device.ad, isForeground=True)
112      utils.set_screen_state(device.ad, on=True)
113    self.initiator.bt_addr = None
114    self.responder.bt_addr = None
115
116  def teardown_test(self):
117    super().teardown_test()
118    for device in self.devices:
119      device.clear_ranging_sessions()
120
121  ### Helpers ###
122
123  def _start_mutual_ranging_and_assert_started(
124      self,
125      session_handle: str,
126      initiator_preference: RangingPreference,
127      responder_preference: RangingPreference,
128      technologies: Set[RangingTechnology],
129  ):
130    """Starts one-to-one ranging session between initiator and responder.
131
132    Args:
133        session_id: id to use for the ranging session.
134    """
135    self.initiator.start_ranging_and_assert_opened(
136        session_handle, initiator_preference
137    )
138    if responder_preference is not None:
139        self.responder.start_ranging_and_assert_opened(
140            session_handle, responder_preference
141        )
142
143    asserts.assert_true(
144        self.initiator.verify_received_data_from_peer_using_technologies(
145            session_handle,
146            self.responder.id,
147            technologies,
148        ),
149        f"Initiator did not find responder",
150    )
151    if responder_preference is not None:
152        asserts.assert_true(
153            self.responder.verify_received_data_from_peer_using_technologies(
154                session_handle,
155                self.initiator.id,
156                technologies,
157            ),
158            f"Responder did not find initiator",
159        )
160
161  def _enable_bt(self):
162    utils.set_bt_state_and_verify(self.initiator.ad, True)
163    utils.set_bt_state_and_verify(self.responder.ad, True)
164
165  def _disable_bt(self):
166    utils.set_bt_state_and_verify(self.initiator.ad, False)
167    utils.set_bt_state_and_verify(self.responder.ad, False)
168
169  def _reset_wifi_state(self):
170    utils.reset_wifi_state(self.initiator.ad)
171    utils.reset_wifi_state(self.responder.ad)
172
173  def _disable_wifi(self):
174      utils.set_wifi_state_and_verify(self.initiator.ad, False)
175      utils.set_wifi_state_and_verify(self.responder.ad, False)
176
177  def _ble_connect(self):
178    """Create BLE GATT connection between initiator and responder.
179
180    """
181    # Start and advertise regular server
182    self.responder.ad.bluetooth.createAndAdvertiseServer(SERVICE_UUID)
183    # Connect to the advertisement
184    self.responder.bt_addr = self.initiator.ad.bluetooth.connectGatt(SERVICE_UUID)
185    asserts.assert_true(self.responder.bt_addr, "Server not connected")
186    connected_devices = self.responder.ad.bluetooth.getConnectedDevices()
187    asserts.assert_true(connected_devices, "No clients found connected to server")
188    self.initiator.bt_addr = connected_devices[0]
189
190  def _ble_disconnect(self):
191    if self.responder.bt_addr and self.initiator.ad.bluetooth.disconnectGatt(SERVICE_UUID) is False:
192            logging.error("Server did not disconnect %s", self.initiator.bt_addr)
193
194  def _ble_bond(self):
195    """Create BLE GATT connection and bonding between initiator and responder.
196
197    """
198    # Start and advertise regular server
199    self.responder.ad.bluetooth.createAndAdvertiseServer(SERVICE_UUID)
200    oob_data = self.responder.ad.bluetooth.generateServerLocalOobData()
201    asserts.assert_true(oob_data, "OOB data not generated")
202    # Connect to the advertisement using OOB data generated on responder.
203    self.responder.bt_addr = self.initiator.ad.bluetooth.createBondOob(SERVICE_UUID, oob_data)
204    asserts.assert_true(self.responder.bt_addr, "Server not bonded")
205    connected_devices = self.responder.ad.bluetooth.getConnectedDevices()
206    asserts.assert_true(connected_devices, "No clients found connected to server")
207    self.initiator.bt_addr = connected_devices[0]
208
209  def _ble_unbond(self):
210    if self.responder.bt_addr and self.initiator.ad.bluetooth.removeBond(self.responder.bt_addr) is False:
211        logging.error("Server not unbonded %s", self.responder.bt_addr)
212    if self.initiator.bt_addr and self.responder.ad.bluetooth.removeBond(self.initiator.bt_addr) is False:
213        logging.error("Client not unbonded %s", self.initiator.bt_addr)
214
215  ### Test Cases ###
216
217  def _test_one_to_one_uwb_ranging(self, config_id: int):
218      """Verifies uwb ranging with peer device, devices range for 10 seconds."""
219      SESSION_HANDLE = str(uuid4())
220      UWB_SESSION_ID = 5
221      TECHNOLOGIES = {RangingTechnology.UWB}
222
223      asserts.skip_if(
224          not self.responder.is_ranging_technology_supported(RangingTechnology.UWB),
225          f"UWB not supported by responder",
226      )
227      asserts.skip_if(
228          not self.initiator.is_ranging_technology_supported(RangingTechnology.UWB),
229          f"UWB not supported by initiator",
230      )
231
232      initiator_preference = RangingPreference(
233          device_role=DeviceRole.INITIATOR,
234          ranging_params=RawInitiatorRangingParams(
235              peer_params=[
236                  DeviceParams(
237                      peer_id=self.responder.id,
238                      uwb_params=uwb.UwbRangingParams(
239                          session_id=UWB_SESSION_ID,
240                          config_id=uwb.ConfigId.UNICAST_DS_TWR,
241                          device_address=self.initiator.uwb_address,
242                          peer_address=self.responder.uwb_address,
243                      ),
244                  )
245              ],
246          ),
247      )
248
249      responder_preference = RangingPreference(
250          device_role=DeviceRole.RESPONDER,
251          ranging_params=RawResponderRangingParams(
252              peer_params=DeviceParams(
253                  peer_id=self.initiator.id,
254                  uwb_params=uwb.UwbRangingParams(
255                      session_id=UWB_SESSION_ID,
256                      config_id=uwb.ConfigId.UNICAST_DS_TWR,
257                      device_address=self.responder.uwb_address,
258                      peer_address=self.initiator.uwb_address,
259                  ),
260              ),
261          ),
262      )
263
264      self._start_mutual_ranging_and_assert_started(
265          SESSION_HANDLE,
266          initiator_preference,
267          responder_preference,
268          TECHNOLOGIES,
269      )
270
271      time.sleep(10)
272
273      asserts.assert_true(
274          self.initiator.verify_received_data_from_peer_using_technologies(
275              SESSION_HANDLE, self.responder.id, TECHNOLOGIES
276          ),
277          "Initiator did not find responder",
278      )
279      asserts.assert_true(
280          self.responder.verify_received_data_from_peer_using_technologies(
281              SESSION_HANDLE,
282              self.initiator.id,
283              TECHNOLOGIES,
284          ),
285          "Responder did not find initiator",
286      )
287
288      self.initiator.stop_ranging_and_assert_closed(SESSION_HANDLE)
289      self.responder.stop_ranging_and_assert_closed(SESSION_HANDLE)
290
291
292  @ApiTest(apis=[
293    'android.ranging.RangingData#getDistance',
294    'android.ranging.RangingData#getAzimuth',
295    'android.ranging.RangingData#getElevation',
296    'android.ranging.RangingData#getRangingTechnology',
297    'android.ranging.RangingData#getRssi',
298    'android.ranging.RangingData#hasRssi',
299    'android.ranging.RangingData#getTimestampMillis',
300    'android.ranging.RangingMeasurement#getMeasurement',
301    'android.ranging.RangingMeasurement#getConfidence',
302    'android.ranging.RangingSession.Callback#onOpened()',
303    'android.ranging.RangingSession.Callback#onOpenFailed(int)',
304    'android.ranging.RangingSession.Callback#onClosed(int)',
305    'android.ranging.RangingSession.Callback#onResults(android.ranging.RangingDevice, android.ranging.RangingData)',
306    'android.ranging.RangingSession.Callback#onStarted(android.ranging.RangingDevice, int)',
307    'android.ranging.RangingSession.Callback#onStopped(android.ranging.RangingDevice, int)',
308    'android.os.Parcel#writeBlob(byte[])',
309  ])
310  def test_one_to_one_uwb_ranging_unicast_static_sts(self):
311    """Verifies uwb ranging with peer device using unicast static sts"""
312    self._test_one_to_one_uwb_ranging(uwb.ConfigId.UNICAST_DS_TWR)
313
314  def test_one_to_one_uwb_ranging_multicast_provisioned_sts(self):
315    """Verifies uwb ranging with peer device using multicast provisioned sts"""
316    self._test_one_to_one_uwb_ranging(uwb.ConfigId.PROVISIONED_MULTICAST_DS_TWR)
317
318  def test_one_to_one_uwb_ranging_unicast_provisioned_sts(self):
319      """Verifies uwb ranging with peer device using unicast provisioned sts"""
320      self._test_one_to_one_uwb_ranging(uwb.ConfigId.PROVISIONED_UNICAST_DS_TWR)
321
322  def test_one_to_one_uwb_ranging_disable_range_data_ntf(self):
323    """Verifies device does not receive range data after disabling range data notifications"""
324    SESSION_HANDLE = str(uuid4())
325    UWB_SESSION_ID = 5
326    asserts.skip_if(
327        not self.responder.is_ranging_technology_supported(RangingTechnology.UWB),
328        f"UWB not supported by responder",
329    )
330    asserts.skip_if(
331        not self.initiator.is_ranging_technology_supported(RangingTechnology.UWB),
332        f"UWB not supported by initiator",
333    )
334    initiator_preference = RangingPreference(
335        device_role=DeviceRole.INITIATOR,
336        ranging_params=RawInitiatorRangingParams(
337            peer_params=[
338                DeviceParams(
339                    peer_id=self.responder.id,
340                    uwb_params=uwb.UwbRangingParams(
341                        session_id=UWB_SESSION_ID,
342                        config_id=uwb.ConfigId.MULTICAST_DS_TWR,
343                        device_address=self.initiator.uwb_address,
344                        peer_address=self.responder.uwb_address,
345                    ),
346                )
347            ],
348        ),
349        enable_range_data_notifications=False,
350    )
351
352    responder_preference = RangingPreference(
353        device_role=DeviceRole.RESPONDER,
354        ranging_params=RawResponderRangingParams(
355            peer_params=DeviceParams(
356                peer_id=self.initiator.id,
357                uwb_params=uwb.UwbRangingParams(
358                    session_id=UWB_SESSION_ID,
359                    config_id=uwb.ConfigId.MULTICAST_DS_TWR,
360                    device_address=self.responder.uwb_address,
361                    peer_address=self.initiator.uwb_address,
362                ),
363            ),
364        ),
365        enable_range_data_notifications=True,
366    )
367
368    self.initiator.start_ranging_and_assert_opened(
369        SESSION_HANDLE, initiator_preference
370    )
371    self.responder.start_ranging_and_assert_opened(
372        SESSION_HANDLE, responder_preference
373    )
374
375    asserts.assert_false(
376        self.initiator.verify_received_data_from_peer(
377            SESSION_HANDLE, self.responder.id
378        ),
379        "Initiator found responder but initiator has range data"
380        " notifications disabled",
381    )
382    asserts.assert_true(
383        self.responder.verify_received_data_from_peer(
384            SESSION_HANDLE, self.initiator.id
385        ),
386        "Responder did not find initiator but responder has range data"
387        " notifications enabled",
388    )
389
390    self.initiator.stop_ranging_and_assert_closed(SESSION_HANDLE)
391    self.responder.stop_ranging_and_assert_closed(SESSION_HANDLE)
392
393  def test_uwb_ranging_measurement_limit(self):
394      """Verifies device does not receive range data after measurement limit"""
395      SESSION_HANDLE = str(uuid4())
396      UWB_SESSION_ID = 5
397      asserts.skip_if(
398          not self.responder.is_ranging_technology_supported(RangingTechnology.UWB),
399          f"UWB not supported by responder",
400      )
401      asserts.skip_if(
402          not self.initiator.is_ranging_technology_supported(RangingTechnology.UWB),
403          f"UWB not supported by initiator",
404      )
405
406      asserts.skip_if(self.initiator.ad.uwb.getSpecificationInfo()["fira"]["uci_version"] < 2,
407          f"Measurements limit is supported from Fira 2.0",
408      )
409      asserts.skip_if(self.responder.ad.uwb.getSpecificationInfo()["fira"]["uci_version"] < 2,
410         f"Measurements limit in supported from Fira 2.0",
411      )
412
413      initiator_preference = RangingPreference(
414          device_role=DeviceRole.INITIATOR,
415          ranging_params=RawInitiatorRangingParams(
416              peer_params=[
417                  DeviceParams(
418                      peer_id=self.responder.id,
419                      uwb_params=uwb.UwbRangingParams(
420                          session_id=UWB_SESSION_ID,
421                          config_id=uwb.ConfigId.MULTICAST_DS_TWR,
422                          device_address=self.initiator.uwb_address,
423                          peer_address=self.responder.uwb_address,
424                      ),
425                  )
426              ],
427          ),
428          measurement_limit=2,
429      )
430
431      responder_preference = RangingPreference(
432          device_role=DeviceRole.RESPONDER,
433          ranging_params=RawResponderRangingParams(
434              peer_params=DeviceParams(
435                  peer_id=self.initiator.id,
436                  uwb_params=uwb.UwbRangingParams(
437                      session_id=UWB_SESSION_ID,
438                      config_id=uwb.ConfigId.MULTICAST_DS_TWR,
439                      device_address=self.responder.uwb_address,
440                      peer_address=self.initiator.uwb_address,
441                  ),
442              ),
443          ),
444          measurement_limit=2,
445      )
446
447      self.initiator.start_ranging_and_assert_opened(
448          SESSION_HANDLE, initiator_preference
449      )
450      self.responder.start_ranging_and_assert_opened(
451          SESSION_HANDLE, responder_preference
452      )
453
454      time.sleep(2)
455
456      self.initiator.assert_close_ranging_event_received(SESSION_HANDLE)
457      self.responder.assert_close_ranging_event_received(SESSION_HANDLE)
458
459  def test_ble_rssi_ranging_measurement_limit(self):
460      """Verifies ble rssi ranging with measurement limit."""
461      asserts.skip_if(self._is_cuttlefish_device(self.initiator.ad),
462                      "Skipping BLE RSSI test on Cuttlefish")
463      SESSION_HANDLE = str(uuid4())
464
465      asserts.skip_if(
466          not self.responder.is_ranging_technology_supported(RangingTechnology.BLE_RSSI),
467          f"BLE RSSI not supported by responder",
468      )
469      asserts.skip_if(
470          not self.initiator.is_ranging_technology_supported(RangingTechnology.BLE_RSSI),
471          f"BLE RSSI not supported by initiator",
472      )
473      self._enable_bt()
474
475      try:
476          self._ble_connect()
477      except Exception as e:
478          asserts.skip("Failed to create ble connection", str(e))
479
480      try:
481          initiator_preference = RangingPreference(
482              device_role=DeviceRole.INITIATOR,
483              ranging_params=RawInitiatorRangingParams(
484                  peer_params=[
485                      DeviceParams(
486                          peer_id=self.responder.id,
487                          rssi_params=rssi.BleRssiRangingParams(
488                              peer_address=self.responder.bt_addr,
489                              ranging_update_rate=rssi.RangingUpdateRate.FREQUENT,
490                          ),
491                      )
492                  ],
493              ),
494              measurement_limit=4,
495          )
496          self.initiator.start_ranging_and_assert_opened(
497              SESSION_HANDLE, initiator_preference
498          )
499          self.initiator.assert_close_ranging_event_received(SESSION_HANDLE)
500      finally:
501          self._ble_disconnect()
502
503  def test_one_to_one_wifi_rtt_ranging(self):
504    """Verifies wifi rtt ranging with peer device, devices range for 10 seconds."""
505    asserts.skip_if(self._is_cuttlefish_device(self.initiator.ad),
506                    "Skipping WiFi RTT test on Cuttlefish")
507    SESSION_HANDLE = str(uuid4())
508    TECHNOLOGIES = {RangingTechnology.WIFI_RTT}
509
510    asserts.skip_if(
511        not self.responder.is_ranging_technology_supported(RangingTechnology.WIFI_RTT),
512        f"Wifi nan rtt not supported by responder",
513    )
514    asserts.skip_if(
515        not self.initiator.is_ranging_technology_supported(RangingTechnology.WIFI_RTT),
516        f"Wifi nan rtt not supported by initiator",
517    )
518    # TODO(rpius): Remove this once the technology is stable.
519    self._reset_wifi_state()
520    test_service_name = "test_service_name" + str(random.randint(1,100))
521    initiator_preference = RangingPreference(
522        device_role=DeviceRole.INITIATOR,
523        ranging_params=RawInitiatorRangingParams(
524            peer_params=[
525                DeviceParams(
526                    peer_id=self.responder.id,
527                    rtt_params=rtt.RttRangingParams(
528                        service_name=test_service_name,
529                    ),
530                )
531            ],
532        ),
533        enable_range_data_notifications=True,
534    )
535
536    responder_preference = RangingPreference(
537        device_role=DeviceRole.RESPONDER,
538        ranging_params=RawResponderRangingParams(
539            peer_params=DeviceParams(
540                peer_id=self.initiator.id,
541                rtt_params=rtt.RttRangingParams(
542                    service_name=test_service_name,
543                ),
544            ),
545        ),
546        enable_range_data_notifications=False,
547    )
548
549    # Should be able to call _start_mutual_ranging_and_assert_started once we get consistent data.
550    self.initiator.start_ranging_and_assert_opened(
551        SESSION_HANDLE, initiator_preference
552    )
553    self.responder.start_ranging_and_assert_opened(
554        SESSION_HANDLE, responder_preference
555    )
556
557    time.sleep(10)
558    asserts.assert_true(
559        self.initiator.verify_received_data_from_peer_using_technologies(
560            SESSION_HANDLE, self.responder.id, TECHNOLOGIES
561        ),
562        "Initiator did not find responder",
563    )
564
565    # Enable when this is supported.
566    # asserts.assert_true(
567    #    self.responder.verify_received_data_from_peer_using_technologies(
568    #        SESSION_HANDLE, self.initiator.id, TECHNOLOGIES
569    #    ),
570    #    "Responder did not find initiator",
571    #)
572
573    self.initiator.stop_ranging_and_assert_closed(SESSION_HANDLE)
574    self.responder.stop_ranging_and_assert_closed(SESSION_HANDLE)
575
576  def test_one_to_one_wifi_periodic_rtt_ranging(self):
577    """Verifies wifi periodic rtt ranging with peer device, devices range for 10 seconds."""
578    asserts.skip_if(self._is_cuttlefish_device(self.initiator.ad),
579                    "Skipping WiFi periodic RTT test on Cuttlefish")
580    SESSION_HANDLE = str(uuid4())
581    TECHNOLOGIES = {RangingTechnology.WIFI_RTT}
582
583    asserts.skip_if(
584        not self.responder.is_ranging_technology_supported(RangingTechnology.WIFI_RTT),
585        f"Wifi nan rtt not supported by responder",
586    )
587    asserts.skip_if(
588        not self.responder.ad.ranging.hasPeriodicRangingHwFeature(),
589        f"Wifi nan periodic rtt not supported by responder",
590    )
591    asserts.skip_if(
592        not self.initiator.is_ranging_technology_supported(RangingTechnology.WIFI_RTT),
593        f"Wifi nan rtt not supported by initiator",
594    )
595    asserts.skip_if(
596        not self.initiator.ad.ranging.hasPeriodicRangingHwFeature(),
597        f"Wifi nan periodic rtt not supported by initiator",
598    )
599    # TODO(rpius): Remove this once the technology is stable.
600    self._reset_wifi_state()
601
602    test_service_name = "test_periodic_service_name" + str(random.randint(1,100))
603    initiator_preference = RangingPreference(
604        device_role=DeviceRole.INITIATOR,
605        ranging_params=RawInitiatorRangingParams(
606            peer_params=[
607                DeviceParams(
608                    peer_id=self.responder.id,
609                    rtt_params=rtt.RttRangingParams(
610                        service_name=test_service_name,
611                        enable_periodic_ranging_hw_feature=True,
612                    ),
613                )
614            ],
615        ),
616        enable_range_data_notifications=True,
617    )
618
619    responder_preference = RangingPreference(
620        device_role=DeviceRole.RESPONDER,
621        ranging_params=RawResponderRangingParams(
622            peer_params=DeviceParams(
623                peer_id=self.initiator.id,
624                rtt_params=rtt.RttRangingParams(
625                    service_name=test_service_name,
626                    enable_periodic_ranging_hw_feature=True,
627                ),
628            ),
629        ),
630        enable_range_data_notifications=False,
631    )
632
633    # Should be able to call _start_mutual_ranging_and_assert_started once we get consistent data.
634    self.initiator.start_ranging_and_assert_opened(
635        SESSION_HANDLE, initiator_preference
636    )
637    self.responder.start_ranging_and_assert_opened(
638        SESSION_HANDLE, responder_preference
639    )
640
641    time.sleep(10)
642    asserts.assert_true(
643        self.initiator.verify_received_data_from_peer_using_technologies(
644            SESSION_HANDLE, self.responder.id, TECHNOLOGIES
645        ),
646        "Initiator did not find responder",
647    )
648
649    asserts.assert_true(
650        self.responder.verify_received_data_from_peer_using_technologies(
651            SESSION_HANDLE, self.initiator.id, TECHNOLOGIES
652        ),
653        "Responder did not find initiator",
654    )
655
656    self.initiator.stop_ranging_and_assert_closed(SESSION_HANDLE)
657    self.responder.stop_ranging_and_assert_closed(SESSION_HANDLE)
658
659  @ApiTest(apis=[
660      'android.bluetooth.le.DistanceMeasurementSession#stopSession',
661      'android.content.AttributionSource#checkCallingUid',
662      'java.util#copyOf(byte[], int)',
663      'java.util#copyOfRange(byte[], int, int)',
664  ])
665  def test_one_to_one_ble_rssi_ranging(self):
666    """Verifies cs ranging with peer device, devices range for 10 seconds."""
667    asserts.skip_if(self._is_cuttlefish_device(self.initiator.ad),
668                    "Skipping BLE RSSI test on Cuttlefish")
669    SESSION_HANDLE = str(uuid4())
670    TECHNOLOGIES = {RangingTechnology.BLE_RSSI}
671
672    asserts.skip_if(
673        not self.responder.is_ranging_technology_supported(RangingTechnology.BLE_RSSI),
674        f"BLE RSSI not supported by responder",
675    )
676    asserts.skip_if(
677        not self.initiator.is_ranging_technology_supported(RangingTechnology.BLE_RSSI),
678        f"BLE RSSI not supported by initiator",
679    )
680    self._enable_bt()
681
682    try:
683        self._ble_connect()
684    except Exception as e:
685        asserts.skip("Failed to create ble connection", str(e))
686
687    try:
688      initiator_preference = RangingPreference(
689          device_role=DeviceRole.INITIATOR,
690          ranging_params=RawInitiatorRangingParams(
691              peer_params=[
692                  DeviceParams(
693                      peer_id=self.responder.id,
694                      rssi_params=rssi.BleRssiRangingParams(
695                      peer_address=self.responder.bt_addr,
696                      ),
697                  )
698              ],
699          ),
700      )
701
702      responder_preference = RangingPreference(
703          device_role=DeviceRole.RESPONDER,
704          ranging_params=RawResponderRangingParams(
705              peer_params=DeviceParams(
706                  peer_id=self.initiator.id,
707                  rssi_params=rssi.BleRssiRangingParams(
708                  peer_address=self.initiator.bt_addr,
709                  ),
710              ),
711          ),
712      )
713
714      self._start_mutual_ranging_and_assert_started(
715          SESSION_HANDLE,
716          initiator_preference,
717          responder_preference,
718          TECHNOLOGIES,
719      )
720
721      time.sleep(10)
722
723      asserts.assert_true(
724          self.initiator.verify_received_data_from_peer_using_technologies(
725              SESSION_HANDLE,
726              self.responder.id,
727              TECHNOLOGIES
728          ),
729          "Initiator did not find responder",
730      )
731      asserts.assert_true(
732          self.responder.verify_received_data_from_peer_using_technologies(
733              SESSION_HANDLE,
734              self.initiator.id,
735              TECHNOLOGIES,
736          ),
737          "Responder did not find initiator",
738      )
739    finally:
740      self.initiator.stop_ranging_and_assert_closed(SESSION_HANDLE)
741      self.responder.stop_ranging_and_assert_closed(SESSION_HANDLE)
742
743      self._ble_disconnect()
744
745  @ApiTest(apis=[
746      'android.bluetooth.le.DistanceMeasurementSession#stopSession',
747      'android.bluetooth.le.DistanceMeasurementParams#getMaxDurationSeconds',
748  ])
749  def test_one_to_one_ble_cs_ranging(self):
750    """
751    Verifies cs ranging with peer device, devices range for 10 seconds.
752    This test is only one way since we don't test if responder also can simultaneously get the data.
753    """
754    asserts.skip_if(self._is_cuttlefish_device(self.initiator.ad),
755                    "Skipping BLE CS test on Cuttlefish")
756    SESSION_HANDLE = str(uuid4())
757    TECHNOLOGIES = {RangingTechnology.BLE_CS}
758
759    asserts.skip_if(
760        not self.responder.is_ranging_technology_supported(RangingTechnology.BLE_CS),
761        f"BLE_CS not supported by responder",
762    )
763    asserts.skip_if(
764        not self.initiator.is_ranging_technology_supported(RangingTechnology.BLE_CS),
765        f"BLE CS not supported by initiator",
766    )
767    self._enable_bt()
768
769    try:
770        self._ble_bond()
771    except Exception as e:
772        asserts.skip("Failed to create ble bond", str(e))
773
774    try:
775      initiator_preference = RangingPreference(
776          device_role=DeviceRole.INITIATOR,
777          ranging_params=RawInitiatorRangingParams(
778              peer_params=[
779                  DeviceParams(
780                      peer_id=self.responder.id,
781                      cs_params=cs.CsRangingParams(
782                        peer_address=self.responder.bt_addr,
783                      ),
784                  )
785              ],
786          ),
787      )
788
789      self._start_mutual_ranging_and_assert_started(
790          SESSION_HANDLE,
791          initiator_preference,
792          None,
793          TECHNOLOGIES,
794      )
795
796      time.sleep(10)
797
798      asserts.assert_true(
799          self.initiator.verify_received_data_from_peer_using_technologies(
800              SESSION_HANDLE,
801              self.responder.id,
802              TECHNOLOGIES
803          ),
804          "Initiator did not find responder",
805      )
806    finally:
807      self.initiator.stop_ranging_and_assert_closed(SESSION_HANDLE)
808
809      self._ble_unbond()
810
811  @ApiTest(apis=[
812    'android.ranging.oob.TransportHandle#sendData(byte[])',
813    'android.ranging.oob.TransportHandle#registerReceiveCallback(java.util.concurrent.Executor, android.ranging.oob.TransportHandle.ReceiveCallback)',
814    'android.ranging.oob.TransportHandle.ReceiveCallback#onSendFailed()',
815  ])
816  def test_one_to_one_uwb_ranging_with_oob(self):
817    asserts.skip_if(
818        not self.responder.is_ranging_technology_supported(RangingTechnology.UWB),
819        f"UWB not supported by responder",
820    )
821    asserts.skip_if(
822        not self.initiator.is_ranging_technology_supported(RangingTechnology.UWB),
823        f"UWB not supported by initiator",
824    )
825
826    initiator_preference = RangingPreference(
827        device_role=DeviceRole.INITIATOR,
828        ranging_params=OobInitiatorRangingParams(peer_ids=[self.responder.id], ranging_mode=RangingMode.HIGH_ACCURACY),
829    )
830
831    responder_preference = RangingPreference(
832        device_role=DeviceRole.RESPONDER,
833        ranging_params=OobResponderRangingParams(peer_id=self.initiator.id),
834    )
835
836    session = RangingSession()
837    session.set_initiator(self.initiator, initiator_preference)
838    session.add_responder(self.responder, responder_preference)
839
840    session.start_and_assert_opened()
841    session.assert_received_data()
842    session.stop_and_assert_closed()
843
844  def test_one_to_one_ble_cs_ranging_with_oob(self):
845    asserts.skip_if(
846        not self.responder.is_ranging_technology_supported(RangingTechnology.BLE_CS),
847        f"BLE_CS not supported by responder",
848    )
849    asserts.skip_if(
850        not self.initiator.is_ranging_technology_supported(RangingTechnology.BLE_CS),
851        f"BLE_CS not supported by initiator",
852    )
853
854    if self.initiator.is_ranging_technology_supported(RangingTechnology.UWB):
855      utils.set_uwb_state_and_verify(self.initiator.ad, state=False)
856    if self.responder.is_ranging_technology_supported(RangingTechnology.UWB):
857      utils.set_uwb_state_and_verify(self.responder.ad, state=False)
858
859    initiator_preference = RangingPreference(
860        device_role=DeviceRole.INITIATOR,
861        ranging_params=OobInitiatorRangingParams(
862          peer_ids=[self.responder.id],
863          # HIGH_ACCURACY_PREFERRED mode with UWB disabled should fallback to CS
864          ranging_mode=RangingMode.HIGH_ACCURACY_PREFERRED
865        ),
866    )
867
868    responder_preference = RangingPreference(
869        device_role=DeviceRole.RESPONDER,
870        ranging_params=OobResponderRangingParams(peer_id=self.initiator.id),
871    )
872
873    session = RangingSession()
874    session.set_initiator(self.initiator, initiator_preference)
875    session.add_responder(self.responder, responder_preference)
876
877    self._enable_bt()
878
879    try:
880        self._ble_bond()
881    except Exception as e:
882        asserts.skip("Failed to create ble bond", str(e))
883
884    try:
885      session.start_and_assert_opened(check_responders=False)
886      session.assert_received_data(technologies=[RangingTechnology.BLE_CS], check_responders=False)
887    finally:
888      session.stop_and_assert_closed(check_responders=False)
889      self._ble_unbond()
890
891  def test_one_to_one_wifi_rtt_ranging_with_oob(self):
892      asserts.skip_if(
893          not self.responder.is_ranging_technology_supported(RangingTechnology.WIFI_RTT),
894          f"WIFI_RTT not supported by responder",
895      )
896      asserts.skip_if(
897          not self.initiator.is_ranging_technology_supported(RangingTechnology.WIFI_RTT),
898          f"WIFI_RTT not supported by initiator",
899      )
900
901      if self.initiator.is_ranging_technology_supported(RangingTechnology.UWB):
902          utils.set_uwb_state_and_verify(self.initiator.ad, state=False)
903      if self.responder.is_ranging_technology_supported(RangingTechnology.UWB):
904          utils.set_uwb_state_and_verify(self.responder.ad, state=False)
905
906      if self.initiator.is_ranging_technology_supported(RangingTechnology.BLE_CS) and \
907             self.responder.is_ranging_technology_supported(RangingTechnology.BLE_CS):
908          self._disable_bt()
909
910      self._reset_wifi_state()
911
912      initiator_preference = RangingPreference(
913          device_role=DeviceRole.INITIATOR,
914          ranging_params=OobInitiatorRangingParams(
915              peer_ids=[self.responder.id],
916              # HIGH_ACCURACY_PREFERRED mode with UWB and CS disabled should fallback to RTT
917              ranging_mode=RangingMode.HIGH_ACCURACY_PREFERRED
918          ),
919          enable_range_data_notifications=True,
920      )
921
922      responder_preference = RangingPreference(
923          device_role=DeviceRole.RESPONDER,
924          ranging_params=OobResponderRangingParams(peer_id=self.initiator.id),
925          enable_range_data_notifications=False,
926      )
927
928      session = RangingSession()
929      session.set_initiator(self.initiator, initiator_preference)
930      session.add_responder(self.responder, responder_preference)
931
932      session.start_and_assert_opened(check_responders=False)
933      session.assert_received_data(technologies=[RangingTechnology.WIFI_RTT], check_responders=False)
934
935  def test_one_to_one_ble_rssi_ranging_with_oob(self):
936
937    """ Skip if BLE CS is supported by both devices. """
938    asserts.skip_if(
939        self.initiator.is_ranging_technology_supported(RangingTechnology.BLE_CS) and
940        self.responder.is_ranging_technology_supported(RangingTechnology.BLE_CS),
941        f"BLE_CS is supported, skip running BLE_RSSI tests",
942    )
943
944    asserts.skip_if(
945        self.initiator.is_ranging_technology_supported(RangingTechnology.BLE_RSSI) or
946        self.responder.is_ranging_technology_supported(RangingTechnology.BLE_RSSI),
947        f"BLE_RSSI is not supported",
948        )
949
950    if self.initiator.is_ranging_technology_supported(RangingTechnology.UWB):
951        utils.set_uwb_state_and_verify(self.initiator.ad, state=False)
952    if self.responder.is_ranging_technology_supported(RangingTechnology.UWB):
953        utils.set_uwb_state_and_verify(self.responder.ad, state=False)
954
955    if self.initiator.is_ranging_technology_supported(RangingTechnology.WIFI_RTT) and \
956        self.responder.is_ranging_technology_supported(RangingTechnology.WIFI_RTT):
957      self._disable_wifi()
958
959    self._enable_bt()
960
961    try:
962        self._ble_connect()
963    except Exception as e:
964        asserts.skip("Failed to create ble connection", str(e))
965
966    try:
967      initiator_preference = RangingPreference(
968          device_role=DeviceRole.INITIATOR,
969          ranging_params=OobInitiatorRangingParams(
970            peer_ids=[self.responder.id],
971            ranging_mode=RangingMode.AUTO
972          ),
973      )
974
975      responder_preference = RangingPreference(
976          device_role=DeviceRole.RESPONDER,
977          ranging_params=OobResponderRangingParams(peer_id=self.initiator.id),
978      )
979
980      session = RangingSession()
981      session.set_initiator(self.initiator, initiator_preference)
982      session.add_responder(self.responder, responder_preference)
983
984      session.start_and_assert_opened(check_responders=False)
985      session.assert_received_data(technologies=[RangingTechnology.BLE_RSSI], check_responders=False)
986
987    finally:
988        self._ble_disconnect()
989
990
991  def test_oob_responder_persists_until_explicitly_stopped(self):
992    asserts.skip_if(
993        not self.responder.is_ranging_technology_supported(RangingTechnology.UWB),
994        f"UWB not supported by responder",
995    )
996    asserts.skip_if(
997        not self.initiator.is_ranging_technology_supported(RangingTechnology.UWB),
998        f"UWB not supported by initiator",
999    )
1000
1001    initiator_preference = RangingPreference(
1002        device_role=DeviceRole.INITIATOR,
1003        ranging_params=OobInitiatorRangingParams(peer_ids=[self.responder.id], ranging_mode=RangingMode.HIGH_ACCURACY),
1004    )
1005
1006    responder_preference = RangingPreference(
1007        device_role=DeviceRole.RESPONDER,
1008        ranging_params=OobResponderRangingParams(peer_id=self.initiator.id),
1009    )
1010
1011    session = RangingSession()
1012    session.set_initiator(self.initiator, initiator_preference)
1013    session.add_responder(self.responder, responder_preference)
1014
1015    session.start_and_assert_opened()
1016    session.assert_received_data()
1017    session.stop_and_assert_closed(stop_responders=False, check_responders=False)
1018
1019    time.sleep(1)
1020
1021    session.start_and_assert_opened(start_responders=False, check_responders=False)
1022    session.assert_received_data()
1023    session.stop_and_assert_closed()
1024
1025if __name__ == "__main__":
1026  if "--" in sys.argv:
1027    index = sys.argv.index("--")
1028    sys.argv = sys.argv[:1] + sys.argv[index + 1 :]
1029  suite_runner.run_suite([RangingManagerTest])
1030