• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17import os
18import sys
19import logging
20
21from cert.gd_base_test_facade_only import GdFacadeOnlyBaseTestClass
22from cert.event_callback_stream import EventCallbackStream
23from cert.event_asserts import EventAsserts
24from google.protobuf import empty_pb2 as empty_proto
25from facade import rootservice_pb2 as facade_rootservice
26from hci.facade import acl_manager_facade_pb2 as acl_manager_facade
27from neighbor.facade import facade_pb2 as neighbor_facade
28from hci.facade import controller_facade_pb2 as controller_facade
29from hci.facade import facade_pb2 as hci_facade
30import bluetooth_packets_python3 as bt_packets
31from bluetooth_packets_python3 import hci_packets
32
33
34class AclManagerTest(GdFacadeOnlyBaseTestClass):
35
36    def setup_test(self):
37        self.device_under_test.rootservice.StartStack(
38            facade_rootservice.StartStackRequest(
39                module_under_test=facade_rootservice.BluetoothModule.Value(
40                    'HCI_INTERFACES'),))
41        self.cert_device.rootservice.StartStack(
42            facade_rootservice.StartStackRequest(
43                module_under_test=facade_rootservice.BluetoothModule.Value(
44                    'HCI'),))
45
46        self.device_under_test.wait_channel_ready()
47        self.cert_device.wait_channel_ready()
48
49    def teardown_test(self):
50        self.device_under_test.rootservice.StopStack(
51            facade_rootservice.StopStackRequest())
52        self.cert_device.rootservice.StopStack(
53            facade_rootservice.StopStackRequest())
54
55    def register_for_event(self, event_code):
56        msg = hci_facade.EventCodeMsg(code=int(event_code))
57        self.cert_device.hci.RegisterEventHandler(msg)
58
59    def enqueue_hci_command(self, command, expect_complete):
60        cmd_bytes = bytes(command.Serialize())
61        cmd = hci_facade.CommandMsg(command=cmd_bytes)
62        if (expect_complete):
63            self.cert_device.hci.EnqueueCommandWithComplete(cmd)
64        else:
65            self.cert_device.hci.EnqueueCommandWithStatus(cmd)
66
67    def enqueue_acl_data(self, handle, pb_flag, b_flag, acl):
68        acl_msg = hci_facade.AclMsg(
69            handle=int(handle),
70            packet_boundary_flag=int(pb_flag),
71            broadcast_flag=int(b_flag),
72            data=acl)
73        self.cert_device.hci.SendAclData(acl_msg)
74
75    def test_dut_connects(self):
76        self.register_for_event(hci_packets.EventCode.CONNECTION_REQUEST)
77        self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE)
78        self.register_for_event(
79            hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
80        with EventCallbackStream(self.cert_device.hci.FetchEvents(empty_proto.Empty())) as cert_hci_event_stream, \
81            EventCallbackStream(self.cert_device.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \
82            EventCallbackStream(self.device_under_test.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:
83
84            cert_hci_event_asserts = EventAsserts(cert_hci_event_stream)
85            acl_data_asserts = EventAsserts(acl_data_stream)
86            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
87
88            # CERT Enables scans and gets its address
89            self.enqueue_hci_command(
90                hci_packets.WriteScanEnableBuilder(
91                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True)
92
93            cert_address = None
94
95            def get_address_from_complete(packet):
96                packet_bytes = packet.event
97                if b'\x0e\x0a\x01\x09\x10' in packet_bytes:
98                    nonlocal cert_address
99                    addr_view = hci_packets.ReadBdAddrCompleteView(
100                        hci_packets.CommandCompleteView(
101                            hci_packets.EventPacketView(
102                                bt_packets.PacketViewLittleEndian(
103                                    list(packet_bytes)))))
104                    cert_address = addr_view.GetBdAddr()
105                    return True
106                return False
107
108            self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True)
109
110            cert_hci_event_asserts.assert_event_occurs(
111                get_address_from_complete)
112
113            with EventCallbackStream(
114                    self.device_under_test.hci_acl_manager.CreateConnection(
115                        acl_manager_facade.ConnectionMsg(
116                            address_type=int(
117                                hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS),
118                            address=bytes(cert_address,
119                                          'utf8')))) as connection_event_stream:
120
121                connection_event_asserts = EventAsserts(connection_event_stream)
122                connection_request = None
123
124                def get_connect_request(packet):
125                    if b'\x04\x0a' in packet.event:
126                        nonlocal connection_request
127                        connection_request = hci_packets.ConnectionRequestView(
128                            hci_packets.EventPacketView(
129                                bt_packets.PacketViewLittleEndian(
130                                    list(packet.event))))
131                        return True
132                    return False
133
134                # Cert Accepts
135                cert_hci_event_asserts.assert_event_occurs(get_connect_request)
136                self.enqueue_hci_command(
137                    hci_packets.AcceptConnectionRequestBuilder(
138                        connection_request.GetBdAddr(),
139                        hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE),
140                    False)
141
142                # Cert gets ConnectionComplete with a handle and sends ACL data
143                handle = 0xfff
144
145                def get_handle(packet):
146                    packet_bytes = packet.event
147                    if b'\x03\x0b\x00' in packet_bytes:
148                        nonlocal handle
149                        cc_view = hci_packets.ConnectionCompleteView(
150                            hci_packets.EventPacketView(
151                                bt_packets.PacketViewLittleEndian(
152                                    list(packet_bytes))))
153                        handle = cc_view.GetConnectionHandle()
154                        return True
155                    return False
156
157                cert_hci_event_asserts.assert_event_occurs(get_handle)
158                cert_handle = handle
159
160                self.enqueue_acl_data(
161                    cert_handle, hci_packets.PacketBoundaryFlag.
162                    FIRST_AUTOMATICALLY_FLUSHABLE,
163                    hci_packets.BroadcastFlag.POINT_TO_POINT,
164                    bytes(
165                        b'\x26\x00\x07\x00This is just SomeAclData from the Cert'
166                    ))
167
168                # DUT gets a connection complete event and sends and receives
169                handle = 0xfff
170                connection_event_asserts.assert_event_occurs(get_handle)
171
172                self.device_under_test.hci_acl_manager.SendAclData(
173                    acl_manager_facade.AclData(
174                        handle=handle,
175                        payload=bytes(
176                            b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT'
177                        )))
178
179                cert_acl_data_asserts.assert_event_occurs(
180                    lambda packet: b'SomeMoreAclData' in packet.data)
181                acl_data_asserts.assert_event_occurs(
182                    lambda packet: b'SomeAclData' in packet.payload)
183
184    def test_cert_connects(self):
185        self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE)
186        self.register_for_event(hci_packets.EventCode.ROLE_CHANGE)
187        self.register_for_event(
188            hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
189        with EventCallbackStream(self.cert_device.hci.FetchEvents(empty_proto.Empty())) as cert_hci_event_stream, \
190            EventCallbackStream(self.cert_device.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \
191            EventCallbackStream(self.device_under_test.hci_acl_manager.FetchIncomingConnection(empty_proto.Empty())) as incoming_connection_stream, \
192            EventCallbackStream(self.device_under_test.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:
193
194            cert_hci_event_asserts = EventAsserts(cert_hci_event_stream)
195            incoming_connection_asserts = EventAsserts(
196                incoming_connection_stream)
197            cert_acl_data_asserts = EventAsserts(cert_acl_data_stream)
198            acl_data_asserts = EventAsserts(acl_data_stream)
199
200            # DUT Enables scans and gets its address
201            dut_address = self.device_under_test.hci_controller.GetMacAddress(
202                empty_proto.Empty()).address
203
204            self.device_under_test.neighbor.EnablePageScan(
205                neighbor_facade.EnableMsg(enabled=True))
206
207            # Cert connects
208            self.enqueue_hci_command(
209                hci_packets.CreateConnectionBuilder(
210                    dut_address.decode('utf-8'),
211                    0xcc18,  # Packet Type
212                    hci_packets.PageScanRepetitionMode.R1,
213                    0x0,
214                    hci_packets.ClockOffsetValid.INVALID,
215                    hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH),
216                False)
217
218            conn_handle = 0xfff
219
220            def get_handle(packet):
221                packet_bytes = packet.event
222                if b'\x03\x0b\x00' in packet_bytes:
223                    nonlocal conn_handle
224                    cc_view = hci_packets.ConnectionCompleteView(
225                        hci_packets.EventPacketView(
226                            bt_packets.PacketViewLittleEndian(
227                                list(packet_bytes))))
228                    conn_handle = cc_view.GetConnectionHandle()
229                    return True
230                return False
231
232            # DUT gets a connection request
233            incoming_connection_asserts.assert_event_occurs(get_handle)
234
235            self.device_under_test.hci_acl_manager.SendAclData(
236                acl_manager_facade.AclData(
237                    handle=conn_handle,
238                    payload=bytes(
239                        b'\x29\x00\x07\x00This is just SomeMoreAclData from the DUT'
240                    )))
241
242            conn_handle = 0xfff
243
244            cert_hci_event_asserts.assert_event_occurs(get_handle)
245            cert_handle = conn_handle
246
247            self.enqueue_acl_data(
248                cert_handle,
249                hci_packets.PacketBoundaryFlag.FIRST_AUTOMATICALLY_FLUSHABLE,
250                hci_packets.BroadcastFlag.POINT_TO_POINT,
251                bytes(
252                    b'\x26\x00\x07\x00This is just SomeAclData from the Cert'))
253
254            cert_acl_data_asserts.assert_event_occurs(
255                lambda packet: b'SomeMoreAclData' in packet.data)
256            acl_data_asserts.assert_event_occurs(
257                lambda packet: b'SomeAclData' in packet.payload)
258
259    def test_recombination_l2cap_packet(self):
260        self.register_for_event(hci_packets.EventCode.CONNECTION_REQUEST)
261        self.register_for_event(hci_packets.EventCode.CONNECTION_COMPLETE)
262        self.register_for_event(
263            hci_packets.EventCode.CONNECTION_PACKET_TYPE_CHANGED)
264        with EventCallbackStream(self.cert_device.hci.FetchEvents(empty_proto.Empty())) as cert_hci_event_stream, \
265            EventCallbackStream(self.cert_device.hci.FetchAclPackets(empty_proto.Empty())) as cert_acl_data_stream, \
266            EventCallbackStream(self.device_under_test.hci_acl_manager.FetchAclData(empty_proto.Empty())) as acl_data_stream:
267
268            cert_hci_event_asserts = EventAsserts(cert_hci_event_stream)
269            acl_data_asserts = EventAsserts(acl_data_stream)
270
271            # CERT Enables scans and gets its address
272            self.enqueue_hci_command(
273                hci_packets.WriteScanEnableBuilder(
274                    hci_packets.ScanEnable.INQUIRY_AND_PAGE_SCAN), True)
275
276            cert_address = None
277
278            def get_address_from_complete(packet):
279                packet_bytes = packet.event
280                if b'\x0e\x0a\x01\x09\x10' in packet_bytes:
281                    nonlocal cert_address
282                    addr_view = hci_packets.ReadBdAddrCompleteView(
283                        hci_packets.CommandCompleteView(
284                            hci_packets.EventPacketView(
285                                bt_packets.PacketViewLittleEndian(
286                                    list(packet_bytes)))))
287                    cert_address = addr_view.GetBdAddr()
288                    return True
289                return False
290
291            self.enqueue_hci_command(hci_packets.ReadBdAddrBuilder(), True)
292
293            cert_hci_event_asserts.assert_event_occurs(
294                get_address_from_complete)
295
296            with EventCallbackStream(
297                    self.device_under_test.hci_acl_manager.CreateConnection(
298                        acl_manager_facade.ConnectionMsg(
299                            address_type=int(
300                                hci_packets.AddressType.PUBLIC_DEVICE_ADDRESS),
301                            address=bytes(cert_address,
302                                          'utf8')))) as connection_event_stream:
303
304                connection_event_asserts = EventAsserts(connection_event_stream)
305                connection_request = None
306
307                def get_connect_request(packet):
308                    if b'\x04\x0a' in packet.event:
309                        nonlocal connection_request
310                        connection_request = hci_packets.ConnectionRequestView(
311                            hci_packets.EventPacketView(
312                                bt_packets.PacketViewLittleEndian(
313                                    list(packet.event))))
314                        return True
315                    return False
316
317                # Cert Accepts
318                cert_hci_event_asserts.assert_event_occurs(get_connect_request)
319                self.enqueue_hci_command(
320                    hci_packets.AcceptConnectionRequestBuilder(
321                        connection_request.GetBdAddr(),
322                        hci_packets.AcceptConnectionRequestRole.REMAIN_SLAVE),
323                    False)
324
325                # Cert gets ConnectionComplete with a handle and sends ACL data
326                handle = 0xfff
327
328                def get_handle(packet):
329                    packet_bytes = packet.event
330                    if b'\x03\x0b\x00' in packet_bytes:
331                        nonlocal handle
332                        cc_view = hci_packets.ConnectionCompleteView(
333                            hci_packets.EventPacketView(
334                                bt_packets.PacketViewLittleEndian(
335                                    list(packet_bytes))))
336                        handle = cc_view.GetConnectionHandle()
337                        return True
338                    return False
339
340                cert_hci_event_asserts.assert_event_occurs(get_handle)
341                cert_handle = handle
342
343                self.enqueue_acl_data(
344                    cert_handle, hci_packets.PacketBoundaryFlag.
345                    FIRST_AUTOMATICALLY_FLUSHABLE,
346                    hci_packets.BroadcastFlag.POINT_TO_POINT,
347                    bytes(b'\x06\x00\x07\x00Hello'))
348                self.enqueue_acl_data(
349                    cert_handle,
350                    hci_packets.PacketBoundaryFlag.CONTINUING_FRAGMENT,
351                    hci_packets.BroadcastFlag.POINT_TO_POINT, bytes(b'!'))
352                self.enqueue_acl_data(
353                    cert_handle, hci_packets.PacketBoundaryFlag.
354                    FIRST_AUTOMATICALLY_FLUSHABLE,
355                    hci_packets.BroadcastFlag.POINT_TO_POINT,
356                    bytes(b'\xe8\x03\x07\x00' + b'Hello' * 200))
357
358                # DUT gets a connection complete event and sends and receives
359                connection_event_asserts.assert_event_occurs(get_handle)
360
361                acl_data_asserts.assert_event_occurs(
362                    lambda packet: b'Hello!' in packet.payload)
363                acl_data_asserts.assert_event_occurs(
364                    lambda packet: b'Hello' * 200 in packet.payload)
365