• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2019 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from datetime import timedelta
18
19from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass
20from cert.event_callback_stream import EventCallbackStream
21from cert.event_asserts import EventAsserts
22from google.protobuf import empty_pb2
23from facade import rootservice_pb2 as facade_rootservice_pb2
24from hal import facade_pb2 as hal_facade_pb2
25from bluetooth_packets_python3 import hci_packets
26import bluetooth_packets_python3 as bt_packets
27
28
29class SimpleHalTest(GdFacadeOnlyBaseTestClass):
30
31    def send_cert_hci_command(self, command):
32        self.cert_device.hal.SendHciCommand(
33            hal_facade_pb2.HciCommandPacket(payload=bytes(command.Serialize())))
34
35    def send_cert_acl_data(self, handle, pb_flag, b_flag, acl):
36        lower = handle & 0xff
37        upper = (handle >> 8) & 0xf
38        upper = upper | int(pb_flag) & 0x3
39        upper = upper | ((int(b_flag) & 0x3) << 2)
40        lower_length = len(acl) & 0xff
41        upper_length = (len(acl) & 0xff00) >> 8
42        concatenated = bytes([lower, upper, lower_length, upper_length] +
43                             list(acl))
44        self.cert_device.hal.SendHciAcl(
45            hal_facade_pb2.HciAclPacket(payload=concatenated))
46
47    def send_dut_hci_command(self, command):
48        self.device_under_test.hal.SendHciCommand(
49            hal_facade_pb2.HciCommandPacket(payload=bytes(command.Serialize())))
50
51    def send_dut_acl_data(self, handle, pb_flag, b_flag, acl):
52        lower = handle & 0xff
53        upper = (handle >> 8) & 0xf
54        upper = upper | int(pb_flag) & 0x3
55        upper = upper | ((int(b_flag) & 0x3) << 2)
56        lower_length = len(acl) & 0xff
57        upper_length = (len(acl) & 0xff00) >> 8
58        concatenated = bytes([lower, upper, lower_length, upper_length] +
59                             list(acl))
60        self.device_under_test.hal.SendHciAcl(
61            hal_facade_pb2.HciAclPacket(payload=concatenated))
62
63    def setup_test(self):
64        self.device_under_test.rootservice.StartStack(
65            facade_rootservice_pb2.StartStackRequest(
66                module_under_test=facade_rootservice_pb2.BluetoothModule.Value(
67                    'HAL'),))
68        self.cert_device.rootservice.StartStack(
69            facade_rootservice_pb2.StartStackRequest(
70                module_under_test=facade_rootservice_pb2.BluetoothModule.Value(
71                    'HAL'),))
72
73        self.device_under_test.wait_channel_ready()
74        self.cert_device.wait_channel_ready()
75
76        self.send_dut_hci_command(hci_packets.ResetBuilder())
77        self.send_cert_hci_command(hci_packets.ResetBuilder())
78
79    def teardown_test(self):
80        self.device_under_test.rootservice.StopStack(
81            facade_rootservice_pb2.StopStackRequest())
82        self.cert_device.rootservice.StopStack(
83            facade_rootservice_pb2.StopStackRequest())
84
85    def test_none_event(self):
86        with EventCallbackStream(
87                self.device_under_test.hal.FetchHciEvent(
88                    empty_pb2.Empty())) as hci_event_stream:
89            hci_event_asserts = EventAsserts(hci_event_stream)
90            hci_event_asserts.assert_none(timeout=timedelta(seconds=1))
91
92    def test_fetch_hci_event(self):
93        with EventCallbackStream(
94                self.device_under_test.hal.FetchHciEvent(
95                    empty_pb2.Empty())) as hci_event_stream:
96
97            hci_event_asserts = EventAsserts(hci_event_stream)
98
99            self.send_dut_hci_command(
100                hci_packets.LeAddDeviceToWhiteListBuilder(
101                    hci_packets.WhiteListAddressType.RANDOM,
102                    '0C:05:04:03:02:01'))
103            event = hci_packets.LeAddDeviceToWhiteListCompleteBuilder(
104                1, hci_packets.ErrorCode.SUCCESS)
105            hci_event_asserts.assert_event_occurs(
106                lambda packet: bytes(event.Serialize()) in packet.payload)
107
108    def test_loopback_hci_command(self):
109        with EventCallbackStream(
110                self.device_under_test.hal.FetchHciEvent(
111                    empty_pb2.Empty())) as hci_event_stream:
112
113            self.send_dut_hci_command(
114                hci_packets.WriteLoopbackModeBuilder(
115                    hci_packets.LoopbackMode.ENABLE_LOCAL))
116
117            hci_event_asserts = EventAsserts(hci_event_stream)
118
119            command = hci_packets.LeAddDeviceToWhiteListBuilder(
120                hci_packets.WhiteListAddressType.RANDOM, '0C:05:04:03:02:01')
121            self.send_dut_hci_command(command)
122            hci_event_asserts.assert_event_occurs(
123                lambda packet: bytes(command.Serialize()) in packet.payload)
124
125    def test_inquiry_from_dut(self):
126        with EventCallbackStream(
127                self.device_under_test.hal.FetchHciEvent(
128                    empty_pb2.Empty())) as hci_event_stream:
129            hci_event_asserts = EventAsserts(hci_event_stream)
130            self.send_cert_hci_command(
131                hci_packets.WriteScanEnableBuilder(
132                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN))
133            lap = hci_packets.Lap()
134            lap.lap = 0x33
135            self.send_dut_hci_command(
136                hci_packets.InquiryBuilder(lap, 0x30, 0xff))
137            hci_event_asserts.assert_event_occurs(
138                lambda packet: b'\x02\x0f' in packet.payload
139                # Expecting an HCI Event (code 0x02, length 0x0f)
140            )
141
142    def test_le_ad_scan_cert_advertises(self):
143        with EventCallbackStream(
144                self.device_under_test.hal.FetchHciEvent(
145                    empty_pb2.Empty())) as hci_event_stream:
146            hci_event_asserts = EventAsserts(hci_event_stream)
147
148            # DUT scans
149            self.send_dut_hci_command(
150                hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01'))
151            phy_scan_params = hci_packets.PhyScanParameters()
152            phy_scan_params.le_scan_interval = 6553
153            phy_scan_params.le_scan_window = 6553
154            phy_scan_params.le_scan_type = hci_packets.LeScanType.ACTIVE
155
156            self.send_dut_hci_command(
157                hci_packets.LeSetExtendedScanParametersBuilder(
158                    hci_packets.AddressType.RANDOM_DEVICE_ADDRESS,
159                    hci_packets.LeSetScanningFilterPolicy.ACCEPT_ALL, 1,
160                    [phy_scan_params]))
161            self.send_dut_hci_command(
162                hci_packets.LeSetExtendedScanEnableBuilder(
163                    hci_packets.Enable.ENABLED,
164                    hci_packets.FilterDuplicates.DISABLED, 0, 0))
165
166            # CERT Advertises
167            advertising_handle = 0
168            self.send_cert_hci_command(
169                hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
170                    advertising_handle,
171                    hci_packets.LegacyAdvertisingProperties.ADV_IND,
172                    512,
173                    768,
174                    7,
175                    hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
176                    hci_packets.PeerAddressType.
177                    PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
178                    'A6:A5:A4:A3:A2:A1',
179                    hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
180                    0x7F,
181                    0,  # SID
182                    hci_packets.Enable.DISABLED  # Scan request notification
183                ))
184
185            self.send_cert_hci_command(
186                hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(
187                    advertising_handle, '0C:05:04:03:02:01'))
188
189            gap_name = hci_packets.GapData()
190            gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
191            gap_name.data = list(bytes(b'Im_A_Cert!'))  # TODO: Fix and remove !
192
193            self.send_cert_hci_command(
194                hci_packets.LeSetExtendedAdvertisingDataBuilder(
195                    advertising_handle,
196                    hci_packets.Operation.COMPLETE_ADVERTISEMENT,
197                    hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT,
198                    [gap_name]))
199            enabled_set = hci_packets.EnabledSet()
200            enabled_set.advertising_handle = advertising_handle
201            enabled_set.duration = 0
202            enabled_set.max_extended_advertising_events = 0
203            self.send_cert_hci_command(
204                hci_packets.LeSetExtendedAdvertisingEnableBuilder(
205                    hci_packets.Enable.ENABLED, [enabled_set]))
206
207            hci_event_asserts.assert_event_occurs(
208                lambda packet: b'Im_A_Cert' in packet.payload)
209
210            # Disable Advertising
211            self.send_cert_hci_command(
212                hci_packets.LeSetExtendedAdvertisingEnableBuilder(
213                    hci_packets.Enable.DISABLED, [enabled_set]))
214
215            # Disable Scanning
216            self.send_dut_hci_command(
217                hci_packets.LeSetExtendedScanEnableBuilder(
218                    hci_packets.Enable.ENABLED,
219                    hci_packets.FilterDuplicates.DISABLED, 0, 0))
220
221    def test_le_connection_dut_advertises(self):
222        with EventCallbackStream(self.device_under_test.hal.FetchHciEvent(empty_pb2.Empty())) as hci_event_stream, \
223                EventCallbackStream(self.cert_device.hal.FetchHciEvent(empty_pb2.Empty())) as cert_hci_event_stream, \
224                EventCallbackStream(self.device_under_test.hal.FetchHciAcl(empty_pb2.Empty())) as acl_data_stream, \
225                EventCallbackStream(self.cert_device.hal.FetchHciAcl(empty_pb2.Empty())) as cert_acl_data_stream:
226
227            hci_event_asserts = EventAsserts(hci_event_stream)
228            cert_hci_event_asserts = EventAsserts(cert_hci_event_stream)
229            acl_data_asserts = EventAsserts(acl_data_stream)
230            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
231
232            # Cert Connects
233            self.send_cert_hci_command(
234                hci_packets.LeSetRandomAddressBuilder('0C:05:04:03:02:01'))
235            phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
236            phy_scan_params.scan_interval = 0x60
237            phy_scan_params.scan_window = 0x30
238            phy_scan_params.conn_interval_min = 0x18
239            phy_scan_params.conn_interval_max = 0x28
240            phy_scan_params.conn_latency = 0
241            phy_scan_params.supervision_timeout = 0x1f4
242            phy_scan_params.min_ce_length = 0
243            phy_scan_params.max_ce_length = 0
244            self.send_cert_hci_command(
245                hci_packets.LeExtendedCreateConnectionBuilder(
246                    hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS,
247                    hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
248                    hci_packets.AddressType.RANDOM_DEVICE_ADDRESS,
249                    '0D:05:04:03:02:01', 1, [phy_scan_params]))
250
251            # DUT Advertises
252            advertising_handle = 0
253            self.send_dut_hci_command(
254                hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
255                    advertising_handle,
256                    hci_packets.LegacyAdvertisingProperties.ADV_IND,
257                    400,
258                    450,
259                    7,
260                    hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
261                    hci_packets.PeerAddressType.
262                    PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
263                    '00:00:00:00:00:00',
264                    hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
265                    0xF8,
266                    1,  #SID
267                    hci_packets.Enable.DISABLED  # Scan request notification
268                ))
269
270            self.send_dut_hci_command(
271                hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(
272                    advertising_handle, '0D:05:04:03:02:01'))
273
274            gap_name = hci_packets.GapData()
275            gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
276            gap_name.data = list(
277                bytes(b'Im_The_DUT!'))  # TODO: Fix and remove !
278
279            self.send_dut_hci_command(
280                hci_packets.LeSetExtendedAdvertisingDataBuilder(
281                    advertising_handle,
282                    hci_packets.Operation.COMPLETE_ADVERTISEMENT,
283                    hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT,
284                    [gap_name]))
285
286            gap_short_name = hci_packets.GapData()
287            gap_short_name.data_type = hci_packets.GapDataType.SHORTENED_LOCAL_NAME
288            gap_short_name.data = list(bytes(b'Im_The_D'))
289
290            self.send_dut_hci_command(
291                hci_packets.LeSetExtendedAdvertisingScanResponseBuilder(
292                    advertising_handle,
293                    hci_packets.Operation.COMPLETE_ADVERTISEMENT,
294                    hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT,
295                    [gap_short_name]))
296
297            enabled_set = hci_packets.EnabledSet()
298            enabled_set.advertising_handle = advertising_handle
299            enabled_set.duration = 0
300            enabled_set.max_extended_advertising_events = 0
301            self.send_dut_hci_command(
302                hci_packets.LeSetExtendedAdvertisingEnableBuilder(
303                    hci_packets.Enable.ENABLED, [enabled_set]))
304
305            conn_handle = 0xfff
306
307            def payload_handle(packet):
308                packet_bytes = packet.payload
309                if b'\x3e\x13\x01\x00' in packet_bytes:
310                    nonlocal conn_handle
311                    cc_view = hci_packets.LeConnectionCompleteView(
312                        hci_packets.LeMetaEventView(
313                            hci_packets.EventPacketView(
314                                bt_packets.PacketViewLittleEndian(
315                                    list(packet_bytes)))))
316                    conn_handle = cc_view.GetConnectionHandle()
317                    return True
318                return False
319
320            cert_hci_event_asserts.assert_event_occurs(payload_handle)
321            cert_handle = conn_handle
322            conn_handle = 0xfff
323            hci_event_asserts.assert_event_occurs(payload_handle)
324            dut_handle = conn_handle
325
326            # Send ACL Data
327            self.send_dut_acl_data(
328                dut_handle, hci_packets.PacketBoundaryFlag.
329                FIRST_NON_AUTOMATICALLY_FLUSHABLE,
330                hci_packets.BroadcastFlag.POINT_TO_POINT,
331                bytes(b'Just SomeAclData'))
332            self.send_cert_acl_data(
333                cert_handle, hci_packets.PacketBoundaryFlag.
334                FIRST_NON_AUTOMATICALLY_FLUSHABLE,
335                hci_packets.BroadcastFlag.POINT_TO_POINT,
336                bytes(b'Just SomeMoreAclData'))
337
338            cert_acl_data_asserts.assert_event_occurs(
339                lambda packet: b'SomeAclData' in packet.payload)
340            acl_data_asserts.assert_event_occurs(
341                lambda packet: b'SomeMoreAclData' in packet.payload)
342
343    def test_le_white_list_connection_cert_advertises(self):
344        with EventCallbackStream(self.device_under_test.hal.FetchHciEvent(empty_pb2.Empty())) as hci_event_stream, \
345            EventCallbackStream(self.cert_device.hal.FetchHciEvent(empty_pb2.Empty())) as cert_hci_event_stream:
346            hci_event_asserts = EventAsserts(hci_event_stream)
347            cert_hci_event_asserts = EventAsserts(cert_hci_event_stream)
348
349            # DUT Connects
350            self.send_dut_hci_command(
351                hci_packets.LeSetRandomAddressBuilder('0D:05:04:03:02:01'))
352            self.send_dut_hci_command(
353                hci_packets.LeAddDeviceToWhiteListBuilder(
354                    hci_packets.WhiteListAddressType.RANDOM,
355                    '0C:05:04:03:02:01'))
356            phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
357            phy_scan_params.scan_interval = 0x60
358            phy_scan_params.scan_window = 0x30
359            phy_scan_params.conn_interval_min = 0x18
360            phy_scan_params.conn_interval_max = 0x28
361            phy_scan_params.conn_latency = 0
362            phy_scan_params.supervision_timeout = 0x1f4
363            phy_scan_params.min_ce_length = 0
364            phy_scan_params.max_ce_length = 0
365            self.send_dut_hci_command(
366                hci_packets.LeExtendedCreateConnectionBuilder(
367                    hci_packets.InitiatorFilterPolicy.USE_WHITE_LIST,
368                    hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
369                    hci_packets.AddressType.RANDOM_DEVICE_ADDRESS,
370                    'BA:D5:A4:A3:A2:A1', 1, [phy_scan_params]))
371
372            # CERT Advertises
373            advertising_handle = 1
374            self.send_cert_hci_command(
375                hci_packets.LeSetExtendedAdvertisingLegacyParametersBuilder(
376                    advertising_handle,
377                    hci_packets.LegacyAdvertisingProperties.ADV_IND,
378                    512,
379                    768,
380                    7,
381                    hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
382                    hci_packets.PeerAddressType.
383                    PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
384                    'A6:A5:A4:A3:A2:A1',
385                    hci_packets.AdvertisingFilterPolicy.ALL_DEVICES,
386                    0x7F,
387                    0,  # SID
388                    hci_packets.Enable.DISABLED  # Scan request notification
389                ))
390
391            self.send_cert_hci_command(
392                hci_packets.LeSetExtendedAdvertisingRandomAddressBuilder(
393                    advertising_handle, '0C:05:04:03:02:01'))
394
395            gap_name = hci_packets.GapData()
396            gap_name.data_type = hci_packets.GapDataType.COMPLETE_LOCAL_NAME
397            gap_name.data = list(bytes(b'Im_A_Cert!'))  # TODO: Fix and remove !
398
399            self.send_cert_hci_command(
400                hci_packets.LeSetExtendedAdvertisingDataBuilder(
401                    advertising_handle,
402                    hci_packets.Operation.COMPLETE_ADVERTISEMENT,
403                    hci_packets.FragmentPreference.CONTROLLER_SHOULD_NOT,
404                    [gap_name]))
405            enabled_set = hci_packets.EnabledSet()
406            enabled_set.advertising_handle = 1
407            enabled_set.duration = 0
408            enabled_set.max_extended_advertising_events = 0
409            self.send_cert_hci_command(
410                hci_packets.LeSetExtendedAdvertisingEnableBuilder(
411                    hci_packets.Enable.ENABLED, [enabled_set]))
412
413            # LeConnectionComplete
414            cert_hci_event_asserts.assert_event_occurs(
415                lambda packet: b'\x3e\x13\x01\x00' in packet.payload,
416                timeout=timedelta(seconds=20))
417            hci_event_asserts.assert_event_occurs(
418                lambda packet: b'\x3e\x13\x01\x00' in packet.payload,
419                timeout=timedelta(seconds=20))
420