• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2021 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import queue
18import logging
19
20from google.protobuf import empty_pb2 as empty_proto
21
22from bluetooth_packets_python3 import hci_packets
23from blueberry.facade.hci import le_advertising_manager_facade_pb2 as le_advertising_facade
24from blueberry.facade.hci import le_initiator_address_facade_pb2 as le_initiator_address_facade
25from blueberry.facade import common_pb2 as common
26from blueberry.tests.gd.cert.truth import assertThat
27from blueberry.tests.gd_sl4a.lib import gd_sl4a_base_test
28from blueberry.tests.gd_sl4a.lib.bt_constants import ble_scan_settings_modes, ble_address_types, scan_result, ble_scan_settings_phys, ble_scan_settings_callback_types
29from blueberry.tests.gd_sl4a.lib.ble_lib import generate_ble_scan_objects
30
31from mobly import test_runner
32
33
34class LeAdvancedScanningTest(gd_sl4a_base_test.GdSl4aBaseTestClass):
35
36    def setup_class(self):
37        super().setup_class(cert_module='HCI_INTERFACES')
38        self.default_timeout = 60  # seconds
39
40    def setup_test(self):
41        super().setup_test()
42
43    def teardown_test(self):
44        super().teardown_test()
45
46    def __get_test_irk(self):
47        return bytes(
48            bytearray([0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10]))
49
50    def _set_cert_privacy_policy_with_random_address(self, random_address):
51        private_policy = le_initiator_address_facade.PrivacyPolicy(
52            address_policy=le_initiator_address_facade.AddressPolicy.USE_STATIC_ADDRESS,
53            address_with_type=common.BluetoothAddressWithType(
54                address=common.BluetoothAddress(address=bytes(random_address, encoding='utf8')),
55                type=common.RANDOM_DEVICE_ADDRESS))
56        self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy)
57
58    def _set_cert_privacy_policy_with_random_address_but_advertise_resolvable(self, irk):
59        random_address_bytes = "DD:34:02:05:5C:EE".encode()
60        private_policy = le_initiator_address_facade.PrivacyPolicy(
61            address_policy=le_initiator_address_facade.AddressPolicy.USE_RESOLVABLE_ADDRESS,
62            address_with_type=common.BluetoothAddressWithType(
63                address=common.BluetoothAddress(address=random_address_bytes), type=common.RANDOM_DEVICE_ADDRESS),
64            rotation_irk=irk)
65        self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy)
66        # Bluetooth MAC address must be upper case
67        return random_address_bytes.decode('utf-8').upper()
68
69    def __advertise_rpa_random_policy(self, legacy_pdus, irk):
70        DEVICE_NAME = 'Im_The_CERT!'
71        logging.info("Getting public address")
72        ADDRESS = self._set_cert_privacy_policy_with_random_address_but_advertise_resolvable(irk)
73        logging.info("Done %s" % ADDRESS)
74
75        # Setup cert side to advertise
76        gap_name = hci_packets.GapData()
77        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
78        gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8'))
79        gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize()))
80        config = le_advertising_facade.AdvertisingConfig(
81            advertisement=[gap_data],
82            interval_min=512,
83            interval_max=768,
84            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
85            own_address_type=common.USE_RANDOM_DEVICE_ADDRESS,
86            channel_map=7,
87            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
88        extended_config = le_advertising_facade.ExtendedAdvertisingConfig(
89            include_tx_power=True,
90            connectable=True,
91            legacy_pdus=legacy_pdus,
92            advertising_config=config,
93            secondary_advertising_phy=ble_scan_settings_phys["1m"])
94        request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config)
95        logging.info("Creating %s PDU advertiser..." % ("Legacy" if legacy_pdus else "Extended"))
96        create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request)
97        logging.info("%s PDU advertiser created." % ("Legacy" if legacy_pdus else "Extended"))
98        return (ADDRESS, create_response)
99
100    def _advertise_rpa_random_legacy_pdu(self, irk):
101        return self.__advertise_rpa_random_policy(True, irk)
102
103    def _advertise_rpa_random_extended_pdu(self, irk):
104        return self.__advertise_rpa_random_policy(False, irk)
105
106    def _set_cert_privacy_policy_with_public_address(self):
107        public_address_bytes = self.cert.hci_controller.GetMacAddress(empty_proto.Empty()).address
108        private_policy = le_initiator_address_facade.PrivacyPolicy(
109            address_policy=le_initiator_address_facade.AddressPolicy.USE_PUBLIC_ADDRESS,
110            address_with_type=common.BluetoothAddressWithType(
111                address=common.BluetoothAddress(address=public_address_bytes), type=common.PUBLIC_DEVICE_ADDRESS))
112        self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy)
113        # Bluetooth MAC address must be upper case
114        return public_address_bytes.decode('utf-8').upper()
115
116    def _set_cert_privacy_policy_with_public_address_but_advertise_resolvable(self, irk):
117        public_address_bytes = self.cert.hci_controller.GetMacAddress(empty_proto.Empty()).address
118        private_policy = le_initiator_address_facade.PrivacyPolicy(
119            address_policy=le_initiator_address_facade.AddressPolicy.USE_RESOLVABLE_ADDRESS,
120            address_with_type=common.BluetoothAddressWithType(
121                address=common.BluetoothAddress(address=public_address_bytes), type=common.PUBLIC_DEVICE_ADDRESS),
122            rotation_irk=irk)
123        self.cert.hci_le_initiator_address.SetPrivacyPolicyForInitiatorAddress(private_policy)
124        # Bluetooth MAC address must be upper case
125        return public_address_bytes.decode('utf-8').upper()
126
127    def __advertise_rpa_public_policy(self, legacy_pdus, irk):
128        DEVICE_NAME = 'Im_The_CERT!'
129        logging.info("Getting public address")
130        ADDRESS = self._set_cert_privacy_policy_with_public_address_but_advertise_resolvable(irk)
131        logging.info("Done %s" % ADDRESS)
132
133        # Setup cert side to advertise
134        gap_name = hci_packets.GapData()
135        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
136        gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8'))
137        gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize()))
138        config = le_advertising_facade.AdvertisingConfig(
139            advertisement=[gap_data],
140            interval_min=512,
141            interval_max=768,
142            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
143            own_address_type=common.USE_RANDOM_DEVICE_ADDRESS,
144            channel_map=7,
145            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
146        extended_config = le_advertising_facade.ExtendedAdvertisingConfig(
147            include_tx_power=True,
148            connectable=True,
149            legacy_pdus=legacy_pdus,
150            advertising_config=config,
151            secondary_advertising_phy=ble_scan_settings_phys["1m"])
152        request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config)
153        logging.info("Creating %s PDU advertiser..." % ("Legacy" if legacy_pdus else "Extended"))
154        create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request)
155        logging.info("%s PDU advertiser created." % ("Legacy" if legacy_pdus else "Extended"))
156        return (ADDRESS, create_response)
157
158    def _advertise_rpa_public_legacy_pdu(self, irk):
159        return self.__advertise_rpa_public_policy(True, irk)
160
161    def _advertise_rpa_public_extended_pdu(self, irk):
162        return self.__advertise_rpa_public_policy(False, irk)
163
164    def _wait_for_scan_result_event(self, expected_event_name):
165        try:
166            # Verify if there is scan result
167            event_info = self.dut.ed.pop_event(expected_event_name, self.default_timeout)
168            # Print out scan result
169            mac_address = event_info['data']['Result']['deviceInfo']['address']
170            logging.info("Filter advertisement with address {}".format(mac_address))
171            return True
172        except queue.Empty as error:
173            logging.error("Could not find initial advertisement.")
174            return False
175
176    def _stop_advertising(self, advertiser_id):
177        logging.info("Stop advertising")
178        remove_request = le_advertising_facade.RemoveAdvertiserRequest(advertiser_id=advertiser_id)
179        self.cert.hci_le_advertising_manager.RemoveAdvertiser(remove_request)
180        logging.info("Stopped advertising")
181
182    def _stop_scanning(self, scan_callback):
183        logging.info("Stop scanning")
184        self.dut.sl4a.bleStopBleScan(scan_callback)
185        logging.info("Stopped scanning")
186
187    def test_scan_filter_device_public_address_with_irk_legacy_pdu(self):
188        """
189        The cert side will advertise a RRPA generated from the test IRK using Legacy PDU
190
191        DUT will scan for the device using the Identity Address + Address Type + IRK
192
193        Results received via ScanCallback
194        """
195        PUBLIC_ADDRESS, create_response = self._advertise_rpa_public_legacy_pdu(self.__get_test_irk())
196        addr_type = ble_address_types["public"]
197        logging.info("Start scanning for PUBLIC_ADDRESS %s with address type %d and IRK %s" %
198                     (PUBLIC_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8")))
199
200        # Setup SL4A DUT side to scan
201        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
202        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
203        expected_event_name = scan_result.format(scan_callback)
204
205        # Start scanning on SL4A DUT side
206        self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(PUBLIC_ADDRESS, int(addr_type),
207                                                              self.__get_test_irk().decode("utf-8"))
208        self.dut.sl4a.bleBuildScanFilter(filter_list)
209        self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
210        logging.info("Started scanning")
211
212        # Wait for results
213        got_result = self._wait_for_scan_result_event(expected_event_name)
214
215        # Test over
216        self._stop_scanning(scan_callback)
217        self._stop_advertising(create_response.advertiser_id)
218
219        assertThat(got_result).isTrue()
220
221    def test_scan_filter_device_public_address_with_irk_legacy_pdu_pending_intent(self):
222        """
223        The cert side will advertise a RRPA generated from the test IRK using Legacy PDU
224
225        DUT will scan for the device using the Identity Address + Address Type + IRK
226
227        Results received via PendingIntent
228        """
229        PUBLIC_ADDRESS, create_response = self._advertise_rpa_public_legacy_pdu(self.__get_test_irk())
230        addr_type = ble_address_types["public"]
231        logging.info("Start scanning for PUBLIC_ADDRESS %s with address type %d and IRK %s" %
232                     (PUBLIC_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8")))
233
234        # Setup SL4A DUT side to scan
235        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
236        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
237        # The event name needs to be set to this otherwise the index iterates from the scancallbacks
238        # being run consecutively.  This is a PendingIntent callback but it hooks into the
239        # ScanCallback and uses just the 1 for the index.
240        expected_event_name = "BleScan1onScanResults"
241
242        # Start scanning on SL4A DUT side
243        self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(PUBLIC_ADDRESS, int(addr_type),
244                                                              self.__get_test_irk().decode("utf-8"))
245        self.dut.sl4a.bleBuildScanFilter(filter_list)
246        self.dut.sl4a.bleStartBleScanPendingIntent(filter_list, scan_settings)
247        logging.info("Started scanning")
248
249        # Wait for results
250        got_result = self._wait_for_scan_result_event(expected_event_name)
251
252        # Test over
253        self._stop_scanning(scan_callback)
254        self._stop_advertising(create_response.advertiser_id)
255
256        assertThat(got_result).isTrue()
257
258    def test_scan_filter_device_public_address_with_irk_extended_pdu(self):
259        """
260        The cert side will advertise a RRPA generated from the test IRK using Extended PDU
261
262        DUT will scan for the device using the Identity Address + Address Type + IRK
263
264        Results received via PendingIntent
265        """
266        PUBLIC_ADDRESS, create_response = self._advertise_rpa_public_extended_pdu(self.__get_test_irk())
267        addr_type = ble_address_types["public"]
268        logging.info("Start scanning for PUBLIC_ADDRESS %s with address type %d and IRK %s" %
269                     (PUBLIC_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8")))
270
271        # Setup SL4A DUT side to scan
272        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
273        self.dut.sl4a.bleSetScanSettingsLegacy(False)
274        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
275        expected_event_name = scan_result.format(scan_callback)
276
277        # Start scanning on SL4A DUT side
278        self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(PUBLIC_ADDRESS, int(addr_type),
279                                                              self.__get_test_irk().decode("utf-8"))
280        self.dut.sl4a.bleBuildScanFilter(filter_list)
281        self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
282        logging.info("Started scanning")
283
284        # Wait for results
285        got_result = self._wait_for_scan_result_event(expected_event_name)
286
287        # Test over
288        self._stop_scanning(scan_callback)
289        self._stop_advertising(create_response.advertiser_id)
290
291        assertThat(got_result).isTrue()
292
293    def test_scan_filter_device_name_legacy_pdu(self):
294        """
295        The cert side will advertise using PUBLIC address and device name data on legacy PDU
296
297        DUT will scan for the device using the Device Name
298
299        Results received via ScanCallback
300        """
301        # Use public address on cert side
302        logging.info("Setting public address")
303        DEVICE_NAME = 'Im_The_CERT!'
304        public_address = self._set_cert_privacy_policy_with_public_address()
305        logging.info("Set public address")
306
307        # Setup cert side to advertise
308        gap_name = hci_packets.GapData()
309        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
310        gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8'))
311        gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize()))
312        config = le_advertising_facade.AdvertisingConfig(
313            advertisement=[gap_data],
314            interval_min=512,
315            interval_max=768,
316            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
317            own_address_type=common.USE_PUBLIC_DEVICE_ADDRESS,
318            channel_map=7,
319            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES,
320            tx_power=20)
321        request = le_advertising_facade.CreateAdvertiserRequest(config=config)
322        logging.info("Creating advertiser")
323        create_response = self.cert.hci_le_advertising_manager.CreateAdvertiser(request)
324        logging.info("Created advertiser")
325
326        # Setup SL4A DUT side to scan
327        logging.info("Start scanning with public address %s" % public_address)
328        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
329        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
330        expected_event_name = scan_result.format(scan_callback)
331
332        # Start scanning on SL4A DUT side
333        self.dut.sl4a.bleSetScanFilterDeviceName(DEVICE_NAME)
334        self.dut.sl4a.bleBuildScanFilter(filter_list)
335        self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
336        logging.info("Started scanning")
337
338        # Wait for results
339        got_result = self._wait_for_scan_result_event(expected_event_name)
340
341        # Test over
342        self._stop_scanning(scan_callback)
343        self._stop_advertising(create_response.advertiser_id)
344
345        assertThat(got_result).isTrue()
346
347    def test_scan_filter_device_random_address_legacy_pdu(self):
348        """
349        The cert side will advertise using RANDOM STATIC address with legacy PDU
350
351        DUT will scan for the device using the RANDOM STATIC Address of the advertising device
352
353        Results received via ScanCallback
354        """
355        # Use random address on cert side
356        logging.info("Setting random address")
357        RANDOM_ADDRESS = 'D0:05:04:03:02:01'
358        DEVICE_NAME = 'Im_The_CERT!'
359        self._set_cert_privacy_policy_with_random_address(RANDOM_ADDRESS)
360        logging.info("Set random address")
361
362        # Setup cert side to advertise
363        gap_name = hci_packets.GapData()
364        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
365        gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8'))
366        gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize()))
367        config = le_advertising_facade.AdvertisingConfig(
368            advertisement=[gap_data],
369            interval_min=512,
370            interval_max=768,
371            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
372            own_address_type=common.USE_RANDOM_DEVICE_ADDRESS,
373            channel_map=7,
374            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
375        request = le_advertising_facade.CreateAdvertiserRequest(config=config)
376        logging.info("Creating advertiser")
377        create_response = self.cert.hci_le_advertising_manager.CreateAdvertiser(request)
378        logging.info("Created advertiser")
379
380        # Setup SL4A DUT side to scan
381        addr_type = ble_address_types["random"]
382        logging.info("Start scanning for RANDOM_ADDRESS %s with address type %d" % (RANDOM_ADDRESS, addr_type))
383        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
384        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
385        expected_event_name = scan_result.format(scan_callback)
386
387        # Start scanning on SL4A DUT side
388        self.dut.sl4a.bleSetScanFilterDeviceAddressAndType(RANDOM_ADDRESS, int(addr_type))
389        self.dut.sl4a.bleBuildScanFilter(filter_list)
390        self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
391        logging.info("Started scanning")
392
393        # Wait for results
394        got_result = self._wait_for_scan_result_event(expected_event_name)
395
396        # Test over
397        self._stop_scanning(scan_callback)
398        self._stop_advertising(create_response.advertiser_id)
399
400        assertThat(got_result).isTrue()
401
402    def test_scan_filter_device_public_address_extended_pdu(self):
403        """
404        The cert side will advertise using PUBLIC address with Extended PDU
405
406        DUT will scan for the device using the PUBLIC Address of the advertising device
407
408        Results received via ScanCallback
409        """
410        # Use public address on cert side
411        logging.info("Setting public address")
412        DEVICE_NAME = 'Im_The_CERT!'
413        public_address = self._set_cert_privacy_policy_with_public_address()
414        logging.info("Set public address")
415
416        # Setup cert side to advertise
417        gap_name = hci_packets.GapData()
418        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
419        gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8'))
420        gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize()))
421        config = le_advertising_facade.AdvertisingConfig(
422            advertisement=[gap_data],
423            interval_min=512,
424            interval_max=768,
425            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
426            own_address_type=common.USE_PUBLIC_DEVICE_ADDRESS,
427            channel_map=7,
428            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
429        extended_config = le_advertising_facade.ExtendedAdvertisingConfig(
430            advertising_config=config, secondary_advertising_phy=ble_scan_settings_phys["1m"])
431        request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config)
432        logging.info("Creating advertiser")
433        create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request)
434        logging.info("Created advertiser")
435
436        # Setup SL4A DUT side to scan
437        addr_type = ble_address_types["public"]
438        logging.info("Start scanning for PUBLIC_ADDRESS %s with address type %d" % (public_address, addr_type))
439        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
440        self.dut.sl4a.bleSetScanSettingsLegacy(False)
441        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
442        expected_event_name = scan_result.format(scan_callback)
443
444        # Start scanning on SL4A DUT side
445        self.dut.sl4a.bleSetScanFilterDeviceAddressAndType(public_address, int(addr_type))
446        self.dut.sl4a.bleBuildScanFilter(filter_list)
447        self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
448        logging.info("Started scanning")
449
450        # Wait for results
451        got_result = self._wait_for_scan_result_event(expected_event_name)
452
453        # Test over
454        self._stop_scanning(scan_callback)
455        self._stop_advertising(create_response.advertiser_id)
456
457        assertThat(got_result).isTrue()
458
459    def test_scan_filter_device_public_address_with_irk_extended_pdu_pending_intent(self):
460        """
461        The cert side will advertise using RRPA with Extended PDU
462
463        DUT will scan for the device using the pre-shared PUBLIC ADDRESS of the advertising
464        device + IRK
465
466        Results received via PendingIntent
467        """
468        PUBLIC_ADDRESS, create_response = self._advertise_rpa_public_extended_pdu(self.__get_test_irk())
469
470        # Setup SL4A DUT side to scan
471        addr_type = ble_address_types["public"]
472        logging.info("Start scanning for PUBLIC_ADDRESS %s with address type %d and IRK %s" %
473                     (PUBLIC_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8")))
474        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
475        self.dut.sl4a.bleSetScanSettingsLegacy(False)
476        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
477        # Hard code here since callback index iterates and will cause this to fail if ran
478        # Second as the impl in SL4A sends this since its a single callback for broadcast.
479        expected_event_name = "BleScan1onScanResults"
480
481        # Start scanning on SL4A DUT side
482        self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(PUBLIC_ADDRESS, int(addr_type),
483                                                              self.__get_test_irk().decode("utf-8"))
484        self.dut.sl4a.bleBuildScanFilter(filter_list)
485        self.dut.sl4a.bleStartBleScanPendingIntent(filter_list, scan_settings)
486        logging.info("Started scanning")
487
488        # Wait for results
489        got_result = self._wait_for_scan_result_event(expected_event_name)
490
491        # Test over
492        self._stop_scanning(scan_callback)
493        self._stop_advertising(create_response.advertiser_id)
494
495        assertThat(got_result).isTrue()
496
497    def test_scan_filter_device_random_address_with_irk_extended_pdu(self):
498        """
499        The cert side will advertise using RRPA with Extended PDU
500
501        DUT will scan for the device using the pre-shared RANDOM STATIC ADDRESS of the advertising
502        device + IRK
503
504        Results received via ScanCallback
505        """
506        RANDOM_ADDRESS, create_response = self._advertise_rpa_random_extended_pdu(self.__get_test_irk())
507
508        # Setup SL4A DUT side to scan
509        addr_type = ble_address_types["random"]
510        logging.info("Start scanning for RANDOM_ADDRESS %s with address type %d and IRK %s" %
511                     (RANDOM_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8")))
512        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
513        self.dut.sl4a.bleSetScanSettingsLegacy(False)
514        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
515        expected_event_name = scan_result.format(scan_callback)
516
517        # Setup SL4A DUT filter
518        self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(RANDOM_ADDRESS, int(addr_type),
519                                                              self.__get_test_irk().decode("utf-8"))
520        self.dut.sl4a.bleBuildScanFilter(filter_list)
521
522        # Start scanning on SL4A DUT side
523        self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
524        logging.info("Started scanning")
525
526        # Wait for results
527        got_result = self._wait_for_scan_result_event(expected_event_name)
528
529        # Test over
530        self._stop_scanning(scan_callback)
531        self._stop_advertising(create_response.advertiser_id)
532
533        assertThat(got_result).isTrue()
534
535    def test_scan_filter_device_random_address_with_irk_extended_pdu_pending_intent(self):
536        """
537        The cert side will advertise using RRPA with Extended PDU
538
539        DUT will scan for the device using the pre-shared RANDOM STATIC ADDRESS of the advertising
540        device + IRK
541
542        Results received via PendingIntent
543        """
544        RANDOM_ADDRESS, create_response = self._advertise_rpa_random_extended_pdu(self.__get_test_irk())
545
546        # Setup SL4A DUT side to scan
547        addr_type = ble_address_types["random"]
548        logging.info("Start scanning for RANDOM_ADDRESS %s with address type %d and IRK %s" %
549                     (RANDOM_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8")))
550        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
551        self.dut.sl4a.bleSetScanSettingsLegacy(False)
552        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
553        # Hard code here since callback index iterates and will cause this to fail if ran
554        # Second as the impl in SL4A sends this since its a single callback for broadcast.
555        expected_event_name = "BleScan1onScanResults"
556
557        # Setup SL4A DUT filter
558        self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(RANDOM_ADDRESS, int(addr_type),
559                                                              self.__get_test_irk().decode("utf-8"))
560        self.dut.sl4a.bleBuildScanFilter(filter_list)
561
562        # Start scanning on SL4A DUT side
563        self.dut.sl4a.bleStartBleScanPendingIntent(filter_list, scan_settings)
564        logging.info("Started scanning")
565
566        # Wait for results
567        got_result = self._wait_for_scan_result_event(expected_event_name)
568
569        # Test over
570        self._stop_scanning(scan_callback)
571        self._stop_advertising(create_response.advertiser_id)
572
573        assertThat(got_result).isTrue()
574
575    def test_scan_filter_device_random_address_with_irk_extended_pdu_scan_twice(self):
576        """
577        The cert side will advertise using RRPA with Extended PDU
578
579        DUT will scan for the device using the pre-shared RANDOM STATIC ADDRESS of the advertising
580        device + IRK
581
582        DUT will stop scanning, then start scanning again for results
583
584        Results received via ScanCallback
585        """
586        RANDOM_ADDRESS, create_response = self._advertise_rpa_random_extended_pdu(self.__get_test_irk())
587
588        # Setup SL4A DUT side to scan
589        addr_type = ble_address_types["random"]
590        logging.info("Start scanning for RANDOM_ADDRESS %s with address type %d and IRK %s" %
591                     (RANDOM_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8")))
592        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
593        self.dut.sl4a.bleSetScanSettingsLegacy(False)
594        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
595        expected_event_name = scan_result.format(scan_callback)
596
597        # Start scanning on SL4A DUT side
598        self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(RANDOM_ADDRESS, int(addr_type),
599                                                              self.__get_test_irk().decode("utf-8"))
600        self.dut.sl4a.bleBuildScanFilter(filter_list)
601        self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
602        logging.info("Started scanning")
603
604        # Wait for results
605        got_result = self._wait_for_scan_result_event(expected_event_name)
606
607        # Test over
608        self._stop_scanning(scan_callback)
609
610        # Start scanning on SL4A DUT side
611        self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
612        logging.info("Started scanning...again")
613
614        # Wait for results
615        got_result = self._wait_for_scan_result_event(expected_event_name)
616
617        # Test over
618        self._stop_scanning(scan_callback)
619        self._stop_advertising(create_response.advertiser_id)
620
621        assertThat(got_result).isTrue()
622
623    def test_scan_filter_device_random_address_with_irk_extended_pdu_pending_intent_128_640(self):
624        """
625        The CERT side will advertise an RPA derived from the IRK.
626
627        The DUT (SL4A) side will scan for a RPA with matching IRK.
628
629        Adjust the scan intervals to Digital Carkey specific timings.
630        """
631        DEVICE_NAME = 'Im_The_CERT!'
632        logging.info("Getting public address")
633        RANDOM_ADDRESS = self._set_cert_privacy_policy_with_random_address_but_advertise_resolvable(
634            self.__get_test_irk())
635        logging.info("Done %s" % RANDOM_ADDRESS)
636
637        legacy_pdus = False
638
639        # Setup cert side to advertise
640        gap_name = hci_packets.GapData()
641        gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
642        gap_name.data = list(bytes(DEVICE_NAME, encoding='utf8'))
643        gap_data = le_advertising_facade.GapDataMsg(data=bytes(gap_name.Serialize()))
644        config = le_advertising_facade.AdvertisingConfig(
645            advertisement=[gap_data],
646            interval_min=128,
647            interval_max=640,
648            advertising_type=le_advertising_facade.AdvertisingEventType.ADV_IND,
649            own_address_type=common.USE_RANDOM_DEVICE_ADDRESS,
650            channel_map=7,
651            filter_policy=le_advertising_facade.AdvertisingFilterPolicy.ALL_DEVICES)
652        extended_config = le_advertising_facade.ExtendedAdvertisingConfig(
653            include_tx_power=True,
654            connectable=True,
655            legacy_pdus=legacy_pdus,
656            advertising_config=config,
657            secondary_advertising_phy=ble_scan_settings_phys["1m"])
658        request = le_advertising_facade.ExtendedCreateAdvertiserRequest(config=extended_config)
659        logging.info("Creating %s PDU advertiser..." % ("Legacy" if legacy_pdus else "Extended"))
660        create_response = self.cert.hci_le_advertising_manager.ExtendedCreateAdvertiser(request)
661        logging.info("%s PDU advertiser created." % ("Legacy" if legacy_pdus else "Extended"))
662
663        # Setup SL4A DUT side to scan
664        addr_type = ble_address_types["random"]
665        logging.info("Start scanning for RANDOM_ADDRESS %s with address type %d and IRK %s" %
666                     (RANDOM_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8")))
667        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['ambient_discovery'])
668        self.dut.sl4a.bleSetScanSettingsLegacy(False)
669        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
670        # Hard code here since callback index iterates and will cause this to fail if ran
671        # Second as the impl in SL4A sends this since its a single callback for broadcast.
672        expected_event_name = "BleScan1onScanResults"
673
674        # Start scanning on SL4A DUT side
675        self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(RANDOM_ADDRESS, int(addr_type),
676                                                              self.__get_test_irk().decode("utf-8"))
677        self.dut.sl4a.bleBuildScanFilter(filter_list)
678        self.dut.sl4a.bleStartBleScanPendingIntent(filter_list, scan_settings)
679        logging.info("Started scanning")
680
681        # Wait for results
682        got_result = self._wait_for_scan_result_event(expected_event_name)
683
684        # Test over
685        self._stop_scanning(scan_callback)
686        self._stop_advertising(create_response.advertiser_id)
687
688        assertThat(got_result).isTrue()
689
690    def test_scan_filter_lost_random_address_with_irk(self):
691        RANDOM_ADDRESS, create_response = self._advertise_rpa_random_extended_pdu(self.__get_test_irk())
692
693        # Setup SL4A DUT side to scan
694        addr_type = ble_address_types["random"]
695        logging.info("Start scanning for RANDOM_ADDRESS %s with address type %d and IRK %s" %
696                     (RANDOM_ADDRESS, addr_type, self.__get_test_irk().decode("utf-8")))
697        self.dut.sl4a.bleSetScanSettingsScanMode(ble_scan_settings_modes['low_latency'])
698        self.dut.sl4a.bleSetScanSettingsLegacy(False)
699        self.dut.sl4a.bleSetScanSettingsCallbackType(ble_scan_settings_callback_types['found_and_lost'])
700        filter_list, scan_settings, scan_callback = generate_ble_scan_objects(self.dut.sl4a)
701        expected_event_name = scan_result.format(scan_callback)
702
703        # Start scanning on SL4A DUT side
704        self.dut.sl4a.bleSetScanFilterDeviceAddressTypeAndIrk(RANDOM_ADDRESS, int(addr_type),
705                                                              self.__get_test_irk().decode("utf-8"))
706        self.dut.sl4a.bleBuildScanFilter(filter_list)
707        self.dut.sl4a.bleStartBleScan(filter_list, scan_settings, scan_callback)
708        logging.info("Started scanning")
709
710        # Wait for found event to ensure scanning is started before stopping the advertiser
711        # to trigger lost event, else the lost event might not be caught by the test
712        got_found_result = self._wait_for_scan_result_event(expected_event_name)
713        assertThat(got_found_result).isTrue()
714
715        self._stop_advertising(create_response.advertiser_id)
716
717        # Wait for lost event
718        got_lost_result = self._wait_for_scan_result_event(expected_event_name)
719        assertThat(got_lost_result).isTrue()
720
721        # Test over
722        self._stop_scanning(scan_callback)
723
724
725if __name__ == '__main__':
726    test_runner.main()
727