#!/usr/bin/env python3
#
#   Copyright 2021 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.

import queue
import logging

from google.protobuf import empty_pb2 as empty_proto

from bluetooth_packets_python3 import hci_packets
from blueberry.facade.hci import le_advertising_manager_facade_pb2 as le_advertising_facade
from blueberry.facade.hci import le_initiator_address_facade_pb2 as le_initiator_address_facade
from blueberry.facade import common_pb2 as common
from blueberry.tests.gd.cert.truth import assertThat
from blueberry.tests.gd_sl4a.lib import gd_sl4a_base_test
from blueberry.tests.gd_sl4a.lib.bt_constants import ble_scan_settings_modes, ble_address_types, scan_result, ble_scan_settings_phys, ble_scan_settings_callback_types
from blueberry.tests.gd_sl4a.lib.ble_lib import generate_ble_scan_objects

from mobly import test_runner


class LeAdvancedScanningTest(gd_sl4a_base_test.GdSl4aBaseTestClass):
    """Scan-filter tests: the cert device advertises and the SL4A DUT scans with a filter."""

    # Complete local name carried by every advertisement built in this class.
    _CERT_DEVICE_NAME = 'Im_The_CERT!'

    # Event name awaited by the PendingIntent scans. It is hard coded because the
    # ScanCallback index keeps iterating when tests run consecutively, while the
    # PendingIntent path hooks into the ScanCallback and uses just the 1 for the index.
    _PENDING_INTENT_EVENT_NAME = "BleScan1onScanResults"

    def setup_class(self):
        super().setup_class(cert_module='HCI_INTERFACES')
        self.default_timeout = 60  # seconds

    def setup_test(self):
        super().setup_test()

    def teardown_test(self):
        super().teardown_test()

    def __get_test_irk(self):
        """Return the fixed 16-byte test IRK (bytes 0x01 through 0x10)."""
        return bytes(range(0x01, 0x11))

    # ------------------------------------------------------------------
    # Cert side: privacy policy helpers
    # ------------------------------------------------------------------

    def _set_cert_privacy_policy_with_random_address(self, random_address):
        """Make the cert use `random_address` (a str) as its static random address."""
        private_policy = le_initiator_address_facade.PrivacyPolicy(
            address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS,
            address_with_type=common.BluetoothAddressWithType(
                address=common.BluetoothAddress(address=bytes(random_address, encoding='utf8')),
                type=common.RANDOM_DEVICE_ADDRESS))
        self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy)

    def _set_cert_privacy_policy_with_random_address_but_advertise_resolvable(self, irk):
        """Set a resolvable-address policy on the cert with a fixed random address and `irk`.

        Returns:
            The random address as an upper case string.
        """
        random_address_bytes = "DD:34:02:05:5C:EE".encode()
        private_policy = le_initiator_address_facade.PrivacyPolicy(
            address_policy=le_initiator_address_facade.AddressPolicy.USE_RESOLVABLE_ADDRESS,
            address_with_type=common.BluetoothAddressWithType(
                address=common.BluetoothAddress(address=random_address_bytes), type=common.RANDOM_DEVICE_ADDRESS),
            rotation_irk=irk)
        self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy)
        # Bluetooth MAC address must be upper case
        return random_address_bytes.decode('utf-8').upper()

    def _set_cert_privacy_policy_with_public_address(self):
        """Set a public-address policy on the cert using the address its controller reports.

        Returns:
            The public address as an upper case string.
        """
        public_address_bytes = self.cert.hci_controller.GetMacAddress(empty_proto.Empty()).address
        private_policy = le_initiator_address_facade.PrivacyPolicy(
            address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS,
            address_with_type=common.BluetoothAddressWithType(
                address=common.BluetoothAddress(address=public_address_bytes), type=common.PUBLIC_DEVICE_ADDRESS))
        self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy)
        # Bluetooth MAC address must be upper case
        return public_address_bytes.decode('utf-8').upper()

    def _set_cert_privacy_policy_with_public_address_but_advertise_resolvable(self, irk):
        """Set a resolvable-address policy on the cert with its public address and `irk`.

        Returns:
            The public address as an upper case string.
        """
        public_address_bytes = self.cert.hci_controller.GetMacAddress(empty_proto.Empty()).address
        private_policy = le_initiator_address_facade.PrivacyPolicy(
            address_policy=le_initiator_address_facade.AddressPolicy.USE_RESOLVABLE_ADDRESS,
            address_with_type=common.BluetoothAddressWithType(
                address=common.BluetoothAddress(address=public_address_bytes), type=common.PUBLIC_DEVICE_ADDRESS),
            rotation_irk=irk)
        self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy)
        # Bluetooth MAC address must be upper case
        return public_address_bytes.decode('utf-8').upper()

    # ------------------------------------------------------------------
    # Cert side: advertiser helpers
    # ------------------------------------------------------------------

    def _build_name_advertising_config(self, own_address_type, interval_min=512, interval_max=768, **extra_fields):
        """Build an ADV_IND AdvertisingConfig whose only payload is the cert device name.

        Args:
            own_address_type: value for the config's `own_address_type` field.
            interval_min: value for the config's `interval_min` field.
            interval_max: value for the config's `interval_max` field.
            **extra_fields: additional AdvertisingConfig fields (e.g. `tx_power`).
        """
        gap_name = hci_packets.GapData()
        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
        gap_name.data = list(bytes(self._CERT_DEVICE_NAME, encoding='utf8'))
        gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize()))
        return le_advertising_facade.AdvertisingConfig(
            advertisement=[gap_data],
            interval_min=interval_min,
            interval_max=interval_max,
            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
            own_address_type=own_address_type,
            channel_map=7,
            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES,
            **extra_fields)

    def _create_advertiser(self, config):
        """Create an advertiser from `config` via CreateAdvertiser and return the response."""
        request = le_advertising_facade.CreateAdvertiserRequest(config=config)
        logging.info("Creating advertiser")
        create_response = self.cert.hci_le_advertising_manager.CreateAdvertiser(request)
        logging.info("Created advertiser")
        return create_response

    def _create_rpa_advertiser(self, legacy_pdus, interval_min=512, interval_max=768):
        """Create the connectable extended advertiser shared by the RPA tests; return the response."""
        config = self._build_name_advertising_config(common.USE_RANDOM_DEVICE_ADDRESS, interval_min, interval_max)
        extended_config = le_advertising_facade.ExtendedAdvertisingConfig(
            include_tx_power=True,
            connectable=True,
            legacy_pdus=legacy_pdus,
            advertising_config=config,
            secondary_advertising_phy=ble_scan_settings_phys["1m"])
        request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config)
        pdu_kind = "Legacy" if legacy_pdus else "Extended"
        logging.info("Creating %s PDU advertiser...", pdu_kind)
        create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request)
        logging.info("%s PDU advertiser created.", pdu_kind)
        return create_response

    def __advertise_rpa_random_policy(self, legacy_pdus, irk, interval_min=512, interval_max=768):
        """Set the random-address resolvable policy, then advertise.

        Returns:
            Tuple of (random address string, advertiser creation response).
        """
        logging.info("Getting random address")
        address = self._set_cert_privacy_policy_with_random_address_but_advertise_resolvable(irk)
        logging.info("Done %s", address)

        # Setup cert side to advertise
        return (address, self._create_rpa_advertiser(legacy_pdus, interval_min, interval_max))

    def _advertise_rpa_random_legacy_pdu(self, irk):
        return self.__advertise_rpa_random_policy(True, irk)

    def _advertise_rpa_random_extended_pdu(self, irk):
        return self.__advertise_rpa_random_policy(False, irk)

    def __advertise_rpa_public_policy(self, legacy_pdus, irk):
        """Set the public-address resolvable policy, then advertise.

        Returns:
            Tuple of (public address string, advertiser creation response).
        """
        logging.info("Getting public address")
        address = self._set_cert_privacy_policy_with_public_address_but_advertise_resolvable(irk)
        logging.info("Done %s", address)

        # Setup cert side to advertise
        return (address, self._create_rpa_advertiser(legacy_pdus))

    def _advertise_rpa_public_legacy_pdu(self, irk):
        return self.__advertise_rpa_public_policy(True, irk)

    def _advertise_rpa_public_extended_pdu(self, irk):
        return self.__advertise_rpa_public_policy(False, irk)

    def _stop_advertising(self, advertiser_id):
        logging.info("Stop advertising")
        remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=advertiser_id)
        self.cert.hci_le_advertising_manager.RemoveAdvertiser(remove_request)
        logging.info("Stopped advertising")

    # ------------------------------------------------------------------
    # DUT side: scan helpers
    # ------------------------------------------------------------------

    def _wait_for_scan_result_event(self, expected_event_name):
        """Wait up to `self.default_timeout` for one `expected_event_name` event.

        Returns:
            True if an event was popped, False if the wait timed out (queue.Empty).
        """
        try:
            # Verify if there is scan result
            event_info = self.dut.ed.pop_event(expected_event_name, self.default_timeout)
        except queue.Empty:
            logging.error("Could not find initial advertisement.")
            return False
        # Print out scan result
        mac_address = event_info['data']['Result']['deviceInfo']['address']
        logging.info("Filter advertisement with address %s", mac_address)
        return True

    def _stop_scanning(self, scan_callback):
        logging.info("Stop scanning")
        self.dut.sl4a.bleStopBleScan(scan_callback)
        logging.info("Stopped scanning")

    def _prepare_scan(self, scan_mode='low_latency', legacy=True, callback_type=None):
        """Apply scan settings on the DUT and return (filter_list, scan_settings, scan_callback).

        Args:
            scan_mode: key into `ble_scan_settings_modes`.
            legacy: when False, `bleSetScanSettingsLegacy(False)` is issued.
            callback_type: optional key into `ble_scan_settings_callback_types`.
        """
        sl4a = self.dut.sl4a
        sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes[scan_mode])
        if not legacy:
            sl4a.bleSetScanSettingsLegacy(False)
        if callback_type is not None:
            sl4a.bleSetScanSettingsCallbackType(ble_scan_settings_callback_types[callback_type])
        return generate_ble_scan_objects(sl4a)

    def _start_scan(self, filter_list, scan_settings, scan_callback, pending_intent=False):
        """Build the pending scan filter, start scanning and return the event name to await."""
        self.dut.sl4a.bleBuildScanFilter(filter_list)
        if pending_intent:
            self.dut.sl4a.bleStartBleScanPendingIntent(filter_list, scan_settings)
            expected_event_name = self._PENDING_INTENT_EVENT_NAME
        else:
            self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
            expected_event_name = scan_result.format(scan_callback)
        logging.info("Started scanning")
        return expected_event_name

    def _begin_irk_filtered_scan(self,
                                 identity_address,
                                 address_kind,
                                 *,
                                 legacy=True,
                                 pending_intent=False,
                                 scan_mode='low_latency',
                                 callback_type=None):
        """Start a DUT scan filtered on identity address + address type + test IRK.

        Args:
            identity_address: address string handed to the scan filter.
            address_kind: key into `ble_address_types` ("public" or "random").
            legacy: forwarded to `_prepare_scan`.
            pending_intent: deliver results through a PendingIntent instead of a ScanCallback.
            scan_mode: forwarded to `_prepare_scan`.
            callback_type: forwarded to `_prepare_scan`.

        Returns:
            Tuple of (expected_event_name, filter_list, scan_settings, scan_callback).
        """
        addr_type = ble_address_types[address_kind]
        irk = self.__get_test_irk().decode("utf-8")
        logging.info("Start scanning for %s_ADDRESS %s with address type %d and IRK %s", address_kind.upper(),
                     identity_address, addr_type, irk)

        # Setup SL4A DUT side to scan
        filter_list, scan_settings, scan_callback = self._prepare_scan(scan_mode, legacy, callback_type)

        # Start scanning on SL4A DUT side
        self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(identity_address, int(addr_type), irk)
        expected_event_name = self._start_scan(filter_list, scan_settings, scan_callback, pending_intent)
        return expected_event_name, filter_list, scan_settings, scan_callback

    def _await_result_then_cleanup(self, expected_event_name, scan_callback, advertiser_id):
        """Wait for one scan result, then always stop the scan and the advertiser.

        Cleanup runs in a `finally` so an unexpected error while reading the event
        does not leave the scan or the advertiser running.
        """
        try:
            return self._wait_for_scan_result_event(expected_event_name)
        finally:
            # Test over
            # NOTE(review): PendingIntent scans are also stopped through the
            # ScanCallback handle, as in the original tests - confirm this is intended.
            self._stop_scanning(scan_callback)
            self._stop_advertising(advertiser_id)

    def _scan_with_irk_filter_and_verify(self, identity_address, address_kind, advertiser_id, **scan_options):
        """Run one IRK-filtered scan, clean up, and assert that a result arrived.

        `scan_options` are the keyword-only options of `_begin_irk_filtered_scan`.
        """
        expected_event_name, _, _, scan_callback = self._begin_irk_filtered_scan(identity_address, address_kind,
                                                                                 **scan_options)
        got_result = self._await_result_then_cleanup(expected_event_name, scan_callback, advertiser_id)
        assertThat(got_result).isTrue()

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    def test_scan_filter_device_public_address_with_irk_legacy_pdu(self):
        """
        The cert side will advertise a RRPA generated from the test IRK using Legacy PDU

        DUT will scan for the device using the Identity Address + Address Type + IRK

        Results received via ScanCallback
        """
        public_address, create_response = self._advertise_rpa_public_legacy_pdu(self.__get_test_irk())
        self._scan_with_irk_filter_and_verify(public_address, "public", create_response.advertiser_id)

    def test_scan_filter_device_public_address_with_irk_legacy_pdu_pending_intent(self):
        """
        The cert side will advertise a RRPA generated from the test IRK using Legacy PDU

        DUT will scan for the device using the Identity Address + Address Type + IRK

        Results received via PendingIntent
        """
        public_address, create_response = self._advertise_rpa_public_legacy_pdu(self.__get_test_irk())
        self._scan_with_irk_filter_and_verify(
            public_address, "public", create_response.advertiser_id, pending_intent=True)

    def test_scan_filter_device_public_address_with_irk_extended_pdu(self):
        """
        The cert side will advertise a RRPA generated from the test IRK using Extended PDU

        DUT will scan for the device using the Identity Address + Address Type + IRK

        Results received via ScanCallback
        """
        public_address, create_response = self._advertise_rpa_public_extended_pdu(self.__get_test_irk())
        self._scan_with_irk_filter_and_verify(public_address, "public", create_response.advertiser_id, legacy=False)

    def test_scan_filter_device_name_legacy_pdu(self):
        """
        The cert side will advertise using PUBLIC address and device name data on legacy PDU

        DUT will scan for the device using the Device Name

        Results received via ScanCallback
        """
        # Use public address on cert side
        logging.info("Setting public address")
        public_address = self._set_cert_privacy_policy_with_public_address()
        logging.info("Set public address")

        # Setup cert side to advertise
        config = self._build_name_advertising_config(common.USE_PUBLIC_DEVICE_ADDRESS, tx_power=20)
        create_response = self._create_advertiser(config)

        # Setup SL4A DUT side to scan
        logging.info("Start scanning with public address %s", public_address)
        filter_list, scan_settings, scan_callback = self._prepare_scan()

        # Start scanning on SL4A DUT side
        self.dut.sl4a.bleSetScanFilterDeviceName(self._CERT_DEVICE_NAME)
        expected_event_name = self._start_scan(filter_list, scan_settings, scan_callback)

        # Wait for results, then clean up
        got_result = self._await_result_then_cleanup(expected_event_name, scan_callback,
                                                     create_response.advertiser_id)

        assertThat(got_result).isTrue()

    def test_scan_filter_device_random_address_legacy_pdu(self):
        """
        The cert side will advertise using RANDOM STATIC address with legacy PDU

        DUT will scan for the device using the RANDOM STATIC Address of the advertising device

        Results received via ScanCallback
        """
        # Use random address on cert side
        logging.info("Setting random address")
        random_address = 'D0:05:04:03:02:01'
        self._set_cert_privacy_policy_with_random_address(random_address)
        logging.info("Set random address")

        # Setup cert side to advertise
        config = self._build_name_advertising_config(common.USE_RANDOM_DEVICE_ADDRESS)
        create_response = self._create_advertiser(config)

        # Setup SL4A DUT side to scan
        addr_type = ble_address_types["random"]
        logging.info("Start scanning for RANDOM_ADDRESS %s with address type %d", random_address, addr_type)
        filter_list, scan_settings, scan_callback = self._prepare_scan()

        # Start scanning on SL4A DUT side
        self.dut.sl4a.bleSetScanFilterDeviceAddressAndType(random_address, int(addr_type))
        expected_event_name = self._start_scan(filter_list, scan_settings, scan_callback)

        # Wait for results, then clean up
        got_result = self._await_result_then_cleanup(expected_event_name, scan_callback,
                                                     create_response.advertiser_id)

        assertThat(got_result).isTrue()

    def test_scan_filter_device_public_address_extended_pdu(self):
        """
        The cert side will advertise using PUBLIC address with Extended PDU

        DUT will scan for the device using the PUBLIC Address of the advertising device

        Results received via ScanCallback
        """
        # Use public address on cert side
        logging.info("Setting public address")
        public_address = self._set_cert_privacy_policy_with_public_address()
        logging.info("Set public address")

        # Setup cert side to advertise
        config = self._build_name_advertising_config(common.USE_PUBLIC_DEVICE_ADDRESS)
        extended_config = le_advertising_facade.ExtendedAdvertisingConfig(
            advertising_config=config, secondary_advertising_phy=ble_scan_settings_phys["1m"])
        request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config)
        logging.info("Creating advertiser")
        create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request)
        logging.info("Created advertiser")

        # Setup SL4A DUT side to scan
        addr_type = ble_address_types["public"]
        logging.info("Start scanning for PUBLIC_ADDRESS %s with address type %d", public_address, addr_type)
        filter_list, scan_settings, scan_callback = self._prepare_scan(legacy=False)

        # Start scanning on SL4A DUT side
        self.dut.sl4a.bleSetScanFilterDeviceAddressAndType(public_address, int(addr_type))
        expected_event_name = self._start_scan(filter_list, scan_settings, scan_callback)

        # Wait for results, then clean up
        got_result = self._await_result_then_cleanup(expected_event_name, scan_callback,
                                                     create_response.advertiser_id)

        assertThat(got_result).isTrue()

    def test_scan_filter_device_public_address_with_irk_extended_pdu_pending_intent(self):
        """
        The cert side will advertise using RRPA with Extended PDU

        DUT will scan for the device using the pre-shared PUBLIC ADDRESS of the advertising
        device + IRK

        Results received via PendingIntent
        """
        public_address, create_response = self._advertise_rpa_public_extended_pdu(self.__get_test_irk())
        self._scan_with_irk_filter_and_verify(
            public_address, "public", create_response.advertiser_id, legacy=False, pending_intent=True)

    def test_scan_filter_device_random_address_with_irk_extended_pdu(self):
        """
        The cert side will advertise using RRPA with Extended PDU

        DUT will scan for the device using the pre-shared RANDOM STATIC ADDRESS of the advertising
        device + IRK

        Results received via ScanCallback
        """
        random_address, create_response = self._advertise_rpa_random_extended_pdu(self.__get_test_irk())
        self._scan_with_irk_filter_and_verify(random_address, "random", create_response.advertiser_id, legacy=False)

    def test_scan_filter_device_random_address_with_irk_extended_pdu_pending_intent(self):
        """
        The cert side will advertise using RRPA with Extended PDU

        DUT will scan for the device using the pre-shared RANDOM STATIC ADDRESS of the advertising
        device + IRK

        Results received via PendingIntent
        """
        random_address, create_response = self._advertise_rpa_random_extended_pdu(self.__get_test_irk())
        self._scan_with_irk_filter_and_verify(
            random_address, "random", create_response.advertiser_id, legacy=False, pending_intent=True)

    def test_scan_filter_device_random_address_with_irk_extended_pdu_scan_twice(self):
        """
        The cert side will advertise using RRPA with Extended PDU

        DUT will scan for the device using the pre-shared RANDOM STATIC ADDRESS of the advertising
        device + IRK

        DUT will stop scanning, then start scanning again for results

        Results received via ScanCallback; both scans must produce a result
        """
        random_address, create_response = self._advertise_rpa_random_extended_pdu(self.__get_test_irk())

        expected_event_name, filter_list, scan_settings, scan_callback = self._begin_irk_filtered_scan(
            random_address, "random", legacy=False)

        # Wait for results of the first scan
        got_first_result = self._wait_for_scan_result_event(expected_event_name)

        # First scan over
        self._stop_scanning(scan_callback)

        # Start scanning on SL4A DUT side
        # NOTE(review): results of the first scan that are still queued under the
        # same event name could satisfy the second wait - confirm whether the
        # event queue should be cleared here.
        self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
        logging.info("Started scanning...again")

        # Wait for results of the second scan, then clean up
        got_second_result = self._await_result_then_cleanup(expected_event_name, scan_callback,
                                                            create_response.advertiser_id)

        # The first result used to be overwritten without being checked.
        assertThat(got_first_result).isTrue()
        assertThat(got_second_result).isTrue()

    def test_scan_filter_device_random_address_with_irk_extended_pdu_pending_intent_128_640(self):
        """
        The CERT side will advertise an RPA derived from the IRK.

        The DUT (SL4A) side will scan for a RPA with matching IRK.

        Adjust the scan intervals to Digital Carkey specific timings
        (advertising interval_min=128 / interval_max=640, 'ambient_discovery' scan mode).
        """
        random_address, create_response = self.__advertise_rpa_random_policy(
            False, self.__get_test_irk(), interval_min=128, interval_max=640)
        self._scan_with_irk_filter_and_verify(
            random_address,
            "random",
            create_response.advertiser_id,
            legacy=False,
            pending_intent=True,
            scan_mode='ambient_discovery')

    def test_scan_filter_lost_random_address_with_irk(self):
        """
        The cert side will advertise using RRPA with Extended PDU

        DUT will scan for the device using the RANDOM ADDRESS + IRK with the
        'found_and_lost' callback type

        The advertiser is stopped after the first event and a second event is awaited
        """
        random_address, create_response = self._advertise_rpa_random_extended_pdu(self.__get_test_irk())

        expected_event_name, _, _, scan_callback = self._begin_irk_filtered_scan(
            random_address, "random", legacy=False, callback_type='found_and_lost')

        got_found_result = False
        got_lost_result = False
        advertiser_running = True
        try:
            # Wait for found event to ensure scanning is started before stopping the advertiser
            # to trigger lost event, else the lost event might not be caught by the test
            got_found_result = self._wait_for_scan_result_event(expected_event_name)
            if got_found_result:
                # Cleared before the call so a failing stop is not retried below.
                advertiser_running = False
                self._stop_advertising(create_response.advertiser_id)

                # Wait for lost event
                got_lost_result = self._wait_for_scan_result_event(expected_event_name)
        finally:
            # Test over: clean up before asserting so a failure leaves nothing running
            if advertiser_running:
                self._stop_advertising(create_response.advertiser_id)
            self._stop_scanning(scan_callback)

        assertThat(got_found_result).isTrue()
        assertThat(got_lost_result).isTrue()


if __name__ == '__main__':
    test_runner.main()