• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import logging
18
19from blueberry.tests.gd.cert.captures import HalCaptures, HciCaptures
20from blueberry.tests.gd.cert.matchers import HciMatchers
21from blueberry.tests.gd.cert.py_hal import PyHal
22from blueberry.tests.gd.cert.py_hci import PyHci
23from blueberry.tests.gd.cert.truth import assertThat
24from blueberry.tests.gd.cert import gd_base_test
25from blueberry.facade import common_pb2 as common
26from mobly import test_runner
27
28import hci_packets as hci
29from blueberry.utils import bluetooth
30
31
32class DirectHciTest(gd_base_test.GdBaseTestClass):
33
34    def setup_class(self):
35        gd_base_test.GdBaseTestClass.setup_class(self, dut_module='HCI', cert_module='HAL')
36
37    def setup_test(self):
38        gd_base_test.GdBaseTestClass.setup_test(self)
39        self.dut_hci = PyHci(self.dut, acl_streaming=True)
40        self.cert_hal = PyHal(self.cert)
41        self.cert_hal.send_hci_command(hci.Reset())
42
43    def teardown_test(self):
44        self.dut_hci.close()
45        self.cert_hal.close()
46        gd_base_test.GdBaseTestClass.teardown_test(self)
47
48    def enqueue_acl_data(self, handle, pb_flag, b_flag, data):
49        acl = hci.Acl(handle=handle, packet_boundary_flag=pb_flag, broadcast_flag=b_flag, payload=data)
50        self.dut.hci.SendAcl(common.Data(payload=acl.serialize()))
51
52    def test_local_hci_cmd_and_event(self):
53        # Loopback mode responds with ACL and SCO connection complete
54        self.dut_hci.register_for_events(hci.EventCode.LOOPBACK_COMMAND)
55        self.dut_hci.send_command(hci.WriteLoopbackMode(loopback_mode=hci.LoopbackMode.ENABLE_LOCAL))
56
57        self.dut_hci.send_command(hci.ReadLocalName())
58        assertThat(self.dut_hci.get_event_stream()).emits(HciMatchers.LoopbackOf(hci.ReadLocalName().serialize()))
59
60    def test_inquiry_from_dut(self):
61        self.dut_hci.register_for_events(hci.EventCode.INQUIRY_RESULT)
62
63        self.cert_hal.enable_inquiry_and_page_scan()
64        self.dut_hci.send_command(hci.Inquiry(lap=hci.Lap(lap=0x33), inquiry_length=0x30, num_responses=0xff))
65        assertThat(self.dut_hci.get_event_stream()).emits(HciMatchers.EventWithCode(hci.EventCode.INQUIRY_RESULT))
66
67    def test_le_ad_scan_cert_advertises(self):
68        self.dut_hci.register_for_le_events(hci.SubeventCode.EXTENDED_ADVERTISING_REPORT,
69                                            hci.SubeventCode.ADVERTISING_REPORT)
70
71        # DUT Scans
72        self.dut_hci.send_command(hci.LeSetRandomAddress(random_address=bluetooth.Address('0D:05:04:03:02:01')))
73
74        self.dut_hci.send_command(
75            hci.LeSetExtendedScanParameters(own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS,
76                                            scanning_filter_policy=hci.LeScanningFilterPolicy.ACCEPT_ALL,
77                                            scanning_phys=1,
78                                            parameters=[
79                                                hci.PhyScanParameters(le_scan_type=hci.LeScanType.ACTIVE,
80                                                                      le_scan_interval=6553,
81                                                                      le_scan_window=6553)
82                                            ]))
83
84        self.dut_hci.send_command(
85            hci.LeSetExtendedScanEnable(enable=hci.Enable.ENABLED,
86                                        filter_duplicates=hci.FilterDuplicates.DISABLED,
87                                        duration=0,
88                                        period=0))
89
90        # CERT Advertises
91        advertising_handle = 0
92        self.cert_hal.send_hci_command(
93            hci.LeSetExtendedAdvertisingParametersLegacy(
94                advertising_handle=advertising_handle,
95                legacy_advertising_event_properties=hci.LegacyAdvertisingEventProperties.ADV_IND,
96                primary_advertising_interval_min=512,
97                primary_advertising_interval_max=768,
98                primary_advertising_channel_map=7,
99                own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS,
100                peer_address_type=hci.PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
101                peer_address=bluetooth.Address('A6:A5:A4:A3:A2:A1'),
102                advertising_filter_policy=hci.AdvertisingFilterPolicy.ALL_DEVICES,
103                advertising_tx_power=0xF7,
104                advertising_sid=1,
105                scan_request_notification_enable=hci.Enable.DISABLED))
106
107        self.cert_hal.send_hci_command(
108            hci.LeSetAdvertisingSetRandomAddress(advertising_handle=advertising_handle,
109                                                 random_address=bluetooth.Address('0C:05:04:03:02:01')))
110
111        self.cert_hal.send_hci_command(
112            hci.LeSetExtendedAdvertisingData(
113                advertising_handle=advertising_handle,
114                operation=hci.Operation.COMPLETE_ADVERTISEMENT,
115                fragment_preference=hci.FragmentPreference.CONTROLLER_SHOULD_NOT,
116                advertising_data=[hci.GapData(data_type=hci.GapDataType.COMPLETE_LOCAL_NAME, data=list(b'Im_A_Cert'))]))
117
118        self.cert_hal.send_hci_command(
119            hci.LeSetExtendedScanResponseData(
120                advertising_handle=advertising_handle,
121                operation=hci.Operation.COMPLETE_ADVERTISEMENT,
122                fragment_preference=hci.FragmentPreference.CONTROLLER_SHOULD_NOT,
123                scan_response_data=[hci.GapData(data_type=hci.GapDataType.SHORTENED_LOCAL_NAME, data=list(b'Im_A_C'))]))
124
125        self.cert_hal.send_hci_command(
126            hci.LeSetExtendedAdvertisingEnable(enable=hci.Enable.ENABLED,
127                                               enabled_sets=[
128                                                   hci.EnabledSet(advertising_handle=advertising_handle,
129                                                                  duration=0,
130                                                                  max_extended_advertising_events=0)
131                                               ]))
132
133        assertThat(self.dut_hci.get_le_event_stream()).emits(lambda packet: b'Im_A_Cert' in packet.payload)
134
135        self.cert_hal.send_hci_command(
136            hci.LeSetExtendedAdvertisingEnable(enable=hci.Enable.DISABLED,
137                                               enabled_sets=[
138                                                   hci.EnabledSet(advertising_handle=advertising_handle,
139                                                                  duration=0,
140                                                                  max_extended_advertising_events=0)
141                                               ]))
142
143        self.dut_hci.send_command(hci.LeSetExtendedScanEnable(enable=hci.Enable.DISABLED))
144
145    def _verify_le_connection_complete(self):
146        cert_conn_complete_capture = HalCaptures.LeConnectionCompleteCapture()
147        assertThat(self.cert_hal.get_hci_event_stream()).emits(cert_conn_complete_capture)
148        cert_handle = cert_conn_complete_capture.get().connection_handle
149
150        dut_conn_complete_capture = HciCaptures.LeConnectionCompleteCapture()
151        assertThat(self.dut_hci.get_le_event_stream()).emits(dut_conn_complete_capture)
152        dut_handle = dut_conn_complete_capture.get().connection_handle
153
154        return (dut_handle, cert_handle)
155
156    @staticmethod
157    def _create_phy_scan_params():
158        return hci.LeCreateConnPhyScanParameters(scan_interval=0x60,
159                                                 scan_window=0x30,
160                                                 conn_interval_min=0x18,
161                                                 conn_interval_max=0x28,
162                                                 conn_latency=0,
163                                                 supervision_timeout=0x1f4,
164                                                 min_ce_length=0,
165                                                 max_ce_length=0)
166
167    def test_le_connection_dut_advertises(self):
168        self.dut_hci.register_for_le_events(hci.SubeventCode.CONNECTION_COMPLETE,
169                                            hci.SubeventCode.ADVERTISING_SET_TERMINATED,
170                                            hci.SubeventCode.READ_REMOTE_FEATURES_COMPLETE)
171        # Cert Connects
172        self.cert_hal.unmask_event(hci.EventCode.LE_META_EVENT)
173        self.cert_hal.send_hci_command(hci.LeSetRandomAddress(random_address=bluetooth.Address('0C:05:04:03:02:01')))
174        self.cert_hal.send_hci_command(
175            hci.LeExtendedCreateConnection(initiator_filter_policy=hci.InitiatorFilterPolicy.USE_PEER_ADDRESS,
176                                           own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS,
177                                           peer_address_type=hci.AddressType.RANDOM_DEVICE_ADDRESS,
178                                           peer_address=bluetooth.Address('0D:05:04:03:02:01'),
179                                           initiating_phys=1,
180                                           phy_scan_parameters=[self._create_phy_scan_params()]))
181
182        advertisement = self.dut_hci.create_advertisement(0, '0D:05:04:03:02:01')
183        advertisement.set_data(b'Im_The_DUT')
184        advertisement.set_scan_response(b'Im_The_D')
185        advertisement.start()
186
187        (dut_handle, cert_handle) = self._verify_le_connection_complete()
188
189        self.dut_hci.send_command(hci.LeReadRemoteFeatures(connection_handle=dut_handle))
190        assertThat(self.dut_hci.get_le_event_stream()).emits(lambda packet: packet.payload[0] == int(
191            hci.EventCode.LE_META_EVENT) and packet.payload[2] == int(hci.SubeventCode.READ_REMOTE_FEATURES_COMPLETE))
192
193        # Send ACL Data
194        self.enqueue_acl_data(dut_handle, hci.PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
195                              hci.BroadcastFlag.POINT_TO_POINT, bytes(b'Just SomeAclData'))
196        self.cert_hal.send_acl_first(cert_handle, bytes(b'Just SomeMoreAclData'))
197
198        assertThat(self.cert_hal.get_acl_stream()).emits(
199            lambda packet: logging.debug(packet.payload) or b'SomeAclData' in packet.payload)
200        assertThat(self.dut_hci.get_raw_acl_stream()).emits(
201            lambda packet: logging.debug(packet.payload) or b'SomeMoreAclData' in packet.payload)
202
203    def test_le_filter_accept_list_connection_cert_advertises(self):
204        # DUT Connects
205        self.dut_hci.send_command(hci.LeSetRandomAddress(random_address=bluetooth.Address('0D:05:04:03:02:01')))
206        self.dut_hci.send_command(
207            hci.LeAddDeviceToFilterAcceptList(address_type=hci.FilterAcceptListAddressType.RANDOM,
208                                              address=bluetooth.Address('0C:05:04:03:02:01')))
209        self.dut_hci.send_command(
210            hci.LeExtendedCreateConnection(initiator_filter_policy=hci.InitiatorFilterPolicy.USE_FILTER_ACCEPT_LIST,
211                                           own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS,
212                                           initiating_phys=1,
213                                           phy_scan_parameters=[self._create_phy_scan_params()]))
214
215        self.cert_hal.unmask_event(hci.EventCode.LE_META_EVENT)
216        advertisement = self.cert_hal.create_advertisement(1,
217                                                           '0C:05:04:03:02:01',
218                                                           min_interval=512,
219                                                           max_interval=768,
220                                                           peer_address='A6:A5:A4:A3:A2:A1',
221                                                           tx_power=0x7f,
222                                                           sid=0)
223        advertisement.set_data(b'Im_A_Cert')
224        advertisement.start()
225
226        # LeConnectionComplete
227        self._verify_le_connection_complete()
228
229    def test_le_filter_accept_list_connection_cert_advertises_legacy(self):
230        # DUT Connects
231        self.dut_hci.send_command(hci.LeSetRandomAddress(random_address=bluetooth.Address('0D:05:04:03:02:01')))
232        self.dut_hci.send_command(
233            hci.LeAddDeviceToFilterAcceptList(address_type=hci.FilterAcceptListAddressType.RANDOM,
234                                              address=bluetooth.Address('0C:05:04:03:02:01')))
235        self.dut_hci.send_command(
236            hci.LeExtendedCreateConnection(initiator_filter_policy=hci.InitiatorFilterPolicy.USE_FILTER_ACCEPT_LIST,
237                                           own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS,
238                                           initiating_phys=1,
239                                           phy_scan_parameters=[self._create_phy_scan_params()]))
240
241        self.cert_hal.unmask_event(hci.EventCode.LE_META_EVENT)
242        self.cert_hal.send_hci_command(hci.LeSetRandomAddress(random_address=bluetooth.Address('0C:05:04:03:02:01')))
243
244        advertisement = self.cert_hal.create_legacy_advertisement(min_interval=512,
245                                                                  max_interval=768,
246                                                                  peer_address='A6:A5:A4:A3:A2:A1')
247        advertisement.set_data(b'Im_A_Cert')
248        advertisement.start()
249
250        # LeConnectionComplete
251        self._verify_le_connection_complete()
252
253    def test_le_ad_scan_cert_advertises_legacy(self):
254        self.dut_hci.register_for_le_events(hci.SubeventCode.EXTENDED_ADVERTISING_REPORT,
255                                            hci.SubeventCode.ADVERTISING_REPORT)
256
257        # DUT Scans
258        self.dut_hci.send_command(hci.LeSetRandomAddress(random_address=bluetooth.Address('0D:05:04:03:02:01')))
259
260        self.dut_hci.send_command(
261            hci.LeSetExtendedScanParameters(own_address_type=hci.OwnAddressType.RANDOM_DEVICE_ADDRESS,
262                                            scanning_filter_policy=hci.LeScanningFilterPolicy.ACCEPT_ALL,
263                                            scanning_phys=1,
264                                            parameters=[
265                                                hci.PhyScanParameters(le_scan_type=hci.LeScanType.ACTIVE,
266                                                                      le_scan_interval=6553,
267                                                                      le_scan_window=6553)
268                                            ]))
269
270        self.dut_hci.send_command(
271            hci.LeSetExtendedScanEnable(enable=hci.Enable.ENABLED,
272                                        filter_duplicates=hci.FilterDuplicates.DISABLED,
273                                        duration=0,
274                                        period=0))
275
276        self.cert_hal.unmask_event(hci.EventCode.LE_META_EVENT)
277        self.cert_hal.send_hci_command(hci.LeSetRandomAddress(random_address=bluetooth.Address('0C:05:04:03:02:01')))
278
279        advertisement = self.cert_hal.create_legacy_advertisement(min_interval=512,
280                                                                  max_interval=768,
281                                                                  peer_address='A6:A5:A4:A3:A2:A1')
282        advertisement.set_data(b'Im_A_Cert')
283        advertisement.start()
284
285        assertThat(self.dut_hci.get_le_event_stream()).emits(
286            HciMatchers.LeAdvertisement(address='0C:05:04:03:02:01', data=b'Im_A_Cert'))
287
288    def test_connection_dut_connects(self):
289        self.dut_hci.send_command(hci.WritePageTimeout(page_timeout=0x4000))
290
291        self.cert_hal.enable_inquiry_and_page_scan()
292        address = self.cert_hal.read_own_address()
293
294        self.dut_hci.initiate_connection(address)
295        cert_acl = self.cert_hal.accept_connection()
296        dut_acl = self.dut_hci.complete_connection()
297
298        # Send ACL Data
299        dut_acl.send_first(b'Just SomeAclData')
300        cert_acl.send_first(b'Just SomeMoreAclData')
301
302        assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload)
303        assertThat(self.dut_hci.get_raw_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.payload)
304
305    def test_connection_cert_connects(self):
306        self.cert_hal.send_hci_command(hci.WritePageTimeout(page_timeout=0x4000))
307
308        self.dut_hci.enable_inquiry_and_page_scan()
309        address = self.dut_hci.read_own_address()
310
311        self.cert_hal.initiate_connection(address)
312        dut_acl = self.dut_hci.accept_connection()
313        cert_acl = self.cert_hal.complete_connection()
314
315        # Send ACL Data
316        dut_acl.send_first(b'This is just SomeAclData')
317        cert_acl.send_first(b'This is just SomeMoreAclData')
318
319        assertThat(self.cert_hal.get_acl_stream()).emits(lambda packet: b'SomeAclData' in packet.payload)
320        assertThat(self.dut_hci.get_raw_acl_stream()).emits(lambda packet: b'SomeMoreAclData' in packet.payload)
321
322
323if __name__ == '__main__':
324    test_runner.main()
325