• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#!/usr/bin/env python3
2#
3#   Copyright 2020 - The Android Open Source Project
4#
5#   Licensed under the Apache License, Version 2.0 (the "License");
6#   you may not use this file except in compliance with the License.
7#   You may obtain a copy of the License at
8#
9#       http://www.apache.org/licenses/LICENSE-2.0
10#
11#   Unless required by applicable law or agreed to in writing, software
12#   distributed under the License is distributed on an "AS IS" BASIS,
13#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14#   See the License for the specific language governing permissions and
15#   limitations under the License.
16
17from google.protobuf import empty_pb2 as empty_proto
18from blueberry.tests.gd.cert.event_stream import EventStream
19from blueberry.tests.gd.cert.event_stream import FilteringEventStream
20from blueberry.tests.gd.cert.event_stream import IEventStream
21from blueberry.tests.gd.cert.closable import Closable
22from blueberry.tests.gd.cert.closable import safeClose
23from blueberry.tests.gd.cert.captures import HciCaptures
24from blueberry.tests.gd.cert.truth import assertThat
25from bluetooth_packets_python3.hci_packets import WriteScanEnableBuilder
26from bluetooth_packets_python3.hci_packets import ScanEnable
27from bluetooth_packets_python3.hci_packets import AclBuilder
28from bluetooth_packets_python3 import RawBuilder
29from bluetooth_packets_python3.hci_packets import BroadcastFlag
30from bluetooth_packets_python3.hci_packets import PacketBoundaryFlag
31from bluetooth_packets_python3 import hci_packets
32from blueberry.tests.gd.cert.matchers import HciMatchers
33from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingParametersLegacyBuilder
34from bluetooth_packets_python3.hci_packets import LegacyAdvertisingEventProperties
35from bluetooth_packets_python3.hci_packets import PeerAddressType
36from bluetooth_packets_python3.hci_packets import AdvertisingFilterPolicy
37from bluetooth_packets_python3.hci_packets import LeSetAdvertisingSetRandomAddressBuilder
38from bluetooth_packets_python3.hci_packets import GapData
39from bluetooth_packets_python3.hci_packets import GapDataType
40from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingDataBuilder
41from bluetooth_packets_python3.hci_packets import Operation
42from bluetooth_packets_python3.hci_packets import OwnAddressType
43from bluetooth_packets_python3.hci_packets import Enable
44from bluetooth_packets_python3.hci_packets import FragmentPreference
45from bluetooth_packets_python3.hci_packets import LeSetExtendedScanResponseDataBuilder
46from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingEnableBuilder
47from bluetooth_packets_python3.hci_packets import EnabledSet
48from bluetooth_packets_python3.hci_packets import OpCode
49from blueberry.facade import common_pb2 as common
50
51
class PyHalAclConnection(IEventStream):
    """One ACL connection, identified by its handle, on top of a device's HAL facade.

    Outgoing data is wrapped in an ACL packet carrying this connection's handle
    and pushed through ``device.hal.SendAcl``. Incoming data is exposed through
    the event queue of ``our_acl_stream``.
    """

    def __init__(self, handle, acl_stream, device):
        """Bind the connection to its handle, the shared ACL stream and the device.

        Args:
            handle: connection handle; converted with ``int()``.
            acl_stream: event stream of incoming ACL data to wrap.
            device: object whose ``hal`` attribute exposes ``SendAcl``.
        """
        self.handle = int(handle)
        self.device = device
        # NOTE(review): the filter argument is None, so presumably no per-handle
        # filtering is applied to the wrapped stream -- confirm against
        # FilteringEventStream.
        self.our_acl_stream = FilteringEventStream(acl_stream, None)

    def send(self, pb_flag, b_flag, data):
        """Send ``data`` as a single ACL packet on this connection.

        Args:
            pb_flag: packet boundary flag for the ACL header.
            b_flag: broadcast flag for the ACL header.
            data: payload handed to ``RawBuilder``.
        """
        packet = AclBuilder(self.handle, pb_flag, b_flag, RawBuilder(data))
        serialized = bytes(packet.Serialize())
        message = common.Data(payload=serialized)
        self.device.hal.SendAcl(message)

    def send_first(self, data):
        """Send ``data`` (converted with ``bytes()``) as a first, non-flushable, point-to-point packet."""
        payload = bytes(data)
        self.send(
            PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE,
            BroadcastFlag.POINT_TO_POINT,
            payload,
        )

    def get_event_queue(self):
        """Return the event queue of the wrapped ACL stream."""
        stream = self.our_acl_stream
        return stream.get_event_queue()
68
69
class PyHalAdvertisement(object):
    """Controls one extended advertising set, identified by its handle, through a PyHal.

    Every public method sends one HCI command via ``py_hal.send_hci_command`` and
    then calls ``py_hal.wait_for_complete`` for the matching opcode.
    """

    def __init__(self, handle, py_hal):
        """Remember the advertising handle and the PyHal used to send commands.

        Args:
            handle: advertising handle of the set to control.
            py_hal: object providing ``send_hci_command`` and ``wait_for_complete``.
        """
        self.handle = handle
        self.py_hal = py_hal

    @staticmethod
    def _encode_name(name):
        """Return ``name`` as a list of byte values.

        ``str`` names are encoded as UTF-8, because ``bytes(str)`` without an
        encoding raises ``TypeError``. Any other value goes through ``bytes()``.
        """
        raw = name.encode('utf-8') if isinstance(name, str) else bytes(name)
        return list(raw)

    @classmethod
    def _name_entry(cls, data_type, name):
        """Build a GapData entry of ``data_type`` whose payload is ``name``."""
        entry = GapData()
        entry.data_type = data_type
        entry.data = cls._encode_name(name)
        return entry

    def set_data(self, complete_name):
        """Set the advertising data to a single Complete Local Name entry.

        Args:
            complete_name: the name, as ``str`` or anything ``bytes()`` accepts.
        """
        data = self._name_entry(GapDataType.COMPLETE_LOCAL_NAME, complete_name)
        self.py_hal.send_hci_command(
            LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))
        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_DATA)

    def set_scan_response(self, shortened_name):
        """Set the scan response data to a single Shortened Local Name entry.

        Args:
            shortened_name: the name, as ``str`` or anything ``bytes()`` accepts.
        """
        data = self._name_entry(GapDataType.SHORTENED_LOCAL_NAME, shortened_name)
        self.py_hal.send_hci_command(
            LeSetExtendedScanResponseDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                 FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))
        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_RESPONSE_DATA)

    def _set_enabled(self, enable):
        """Send LE Set Extended Advertising Enable with ``enable`` for this set only."""
        enabled_set = EnabledSet()
        enabled_set.advertising_handle = self.handle
        # NOTE(review): 0 for both fields presumably means "no limit" in the HCI
        # command definition -- confirm against the Core spec.
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.py_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(enable, [enabled_set]))
        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE)

    def start(self):
        """Enable advertising on this set."""
        self._set_enabled(Enable.ENABLED)

    def stop(self):
        """Disable advertising on this set."""
        self._set_enabled(Enable.DISABLED)
109
110
class PyHal(Closable):
    """Test helper that drives a device's HAL facade with raw HCI commands and ACL data.

    It opens the facade's HCI event stream and ACL stream, and keeps a local copy
    of the event mask and LE event mask so that events can be unmasked one at a
    time.
    """

    def __init__(self, device):
        """Open the HCI event and ACL streams of ``device.hal``.

        Args:
            device: object whose ``hal`` attribute exposes ``StreamEvents``,
                ``StreamAcl``, ``SendCommand`` and ``SendAcl``.
        """
        self.device = device

        self.hci_event_stream = EventStream(self.device.hal.StreamEvents(empty_proto.Empty()))
        self.acl_stream = EventStream(self.device.hal.StreamAcl(empty_proto.Empty()))

        self.event_mask = 0x1FFF_FFFF_FFFF  # Default Event Mask (Core Vol 4 [E] 7.3.1)
        self.le_event_mask = 0x0000_0000_001F  # Default LE Event Mask (Core Vol 4 [E] 7.8.1)

        # We don't deal with SCO for now

    def close(self):
        """Close both streams opened by the constructor."""
        safeClose(self.hci_event_stream)
        safeClose(self.acl_stream)

    def get_hci_event_stream(self):
        """Return the HCI event stream."""
        return self.hci_event_stream

    def wait_for_complete(self, opcode):
        """Assert that the HCI event stream emits a Command Complete for ``opcode``."""
        assertThat(self.hci_event_stream).emits(HciMatchers.CommandComplete(opcode))

    def wait_for_status(self, opcode):
        """Assert that the HCI event stream emits a Command Status for ``opcode``."""
        assertThat(self.hci_event_stream).emits(HciMatchers.CommandStatus(opcode))

    def get_acl_stream(self):
        """Return the ACL stream."""
        return self.acl_stream

    def send_hci_command(self, command):
        """Serialize ``command`` and send it through ``device.hal.SendCommand``.

        This only sends; callers that need the result follow up with
        ``wait_for_complete`` or ``wait_for_status``.
        """
        self.device.hal.SendCommand(common.Data(payload=bytes(command.Serialize())))

    def send_acl(self, handle, pb_flag, b_flag, data):
        """Send ``data`` as one ACL packet on connection ``handle``.

        Args:
            handle: connection handle for the ACL header.
            pb_flag: packet boundary flag for the ACL header.
            b_flag: broadcast flag for the ACL header.
            data: payload handed to ``RawBuilder``.
        """
        acl = AclBuilder(handle, pb_flag, b_flag, RawBuilder(data))
        self.device.hal.SendAcl(common.Data(payload=bytes(acl.Serialize())))

    def send_acl_first(self, handle, data):
        """Send ``data`` on ``handle`` as a first, non-flushable, point-to-point packet."""
        self.send_acl(handle, PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, BroadcastFlag.POINT_TO_POINT, data)

    def read_own_address(self):
        """Send Read BD_ADDR and return the address from the captured completion event."""
        self.send_hci_command(hci_packets.ReadBdAddrBuilder())
        read_bd_addr = HciCaptures.ReadBdAddrCompleteCapture()
        assertThat(self.hci_event_stream).emits(read_bd_addr)
        return read_bd_addr.get().GetBdAddr()

    def set_random_le_address(self, addr):
        """Send LE Set Random Address with ``addr`` and wait for its Command Complete."""
        self.send_hci_command(hci_packets.LeSetRandomAddressBuilder(addr))
        self.wait_for_complete(OpCode.LE_SET_RANDOM_ADDRESS)

    def set_scan_parameters(self):
        """Configure extended scanning and wait for the Command Complete.

        Uses active scanning with interval and window both set to 6553, a random
        own address and the accept-all filter policy.
        """
        phy_scan_params = hci_packets.PhyScanParameters()
        phy_scan_params.le_scan_interval = 6553
        phy_scan_params.le_scan_window = 6553
        phy_scan_params.le_scan_type = hci_packets.LeScanType.ACTIVE

        self.send_hci_command(
            hci_packets.LeSetExtendedScanParametersBuilder(hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                                                           hci_packets.LeScanningFilterPolicy.ACCEPT_ALL, 1,
                                                           [phy_scan_params]))
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_PARAMETERS)

    def unmask_event(self, *event_codes):
        """Add ``event_codes`` to the cached event mask and send Set Event Mask.

        Event code N sets bit N - 1 of the mask. The command is sent without
        waiting for its Command Complete.
        """
        for event_code in event_codes:
            self.event_mask |= 1 << (int(event_code) - 1)
        self.send_hci_command(hci_packets.SetEventMaskBuilder(self.event_mask))

    def unmask_le_event(self, *subevent_codes):
        """Add ``subevent_codes`` to the cached LE event mask and send LE Set Event Mask.

        Subevent code N sets bit N - 1 of the mask. The command is sent without
        waiting for its Command Complete.
        """
        for subevent_code in subevent_codes:
            self.le_event_mask |= 1 << (int(subevent_code) - 1)
        self.send_hci_command(hci_packets.LeSetEventMaskBuilder(self.le_event_mask))

    def start_scanning(self):
        """Enable extended scanning (duplicate filtering off) and wait for the Command Complete."""
        self.send_hci_command(
            hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED,
                                                       hci_packets.FilterDuplicates.DISABLED, 0, 0))
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_ENABLE)

    def stop_scanning(self):
        """Disable extended scanning and wait for the Command Complete."""
        self.send_hci_command(
            hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.DISABLED,
                                                       hci_packets.FilterDuplicates.DISABLED, 0, 0))
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_ENABLE)

    def reset(self):
        """Send HCI Reset and wait for its Command Complete.

        NOTE(review): the cached ``event_mask`` / ``le_event_mask`` are not put
        back to their defaults here; presumably the controller's masks are, so
        the two may disagree after a reset -- confirm whether that matters.
        """
        self.send_hci_command(hci_packets.ResetBuilder())
        self.wait_for_complete(OpCode.RESET)

    def enable_inquiry_and_page_scan(self):
        """Send Write Scan Enable with inquiry and page scan; does not wait for completion."""
        self.send_hci_command(WriteScanEnableBuilder(ScanEnable.INQUIRY_AND_PAGE_SCAN))

    def initiate_connection(self, remote_addr):
        """Send Create Connection towards ``remote_addr``; does not wait for any event.

        Args:
            remote_addr: peer address as ``str``, or as bytes that are decoded
                as UTF-8 first.
        """
        self.send_hci_command(
            hci_packets.CreateConnectionBuilder(
                remote_addr if isinstance(remote_addr, str) else remote_addr.decode('utf-8'),
                0xcc18,  # Packet Type
                hci_packets.PageScanRepetitionMode.R1,
                0x0,  # presumably the clock offset, flagged invalid by the next argument
                hci_packets.ClockOffsetValid.INVALID,
                hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH))

    def accept_connection(self):
        """Accept the next incoming connection request and return the resulting connection.

        Returns:
            PyHalAclConnection for the handle reported by Connection Complete.
        """
        connection_request = HciCaptures.ConnectionRequestCapture()
        assertThat(self.hci_event_stream).emits(connection_request)

        self.send_hci_command(
            hci_packets.AcceptConnectionRequestBuilder(connection_request.get().GetBdAddr(),
                                                       hci_packets.AcceptConnectionRequestRole.REMAIN_PERIPHERAL))
        return self.complete_connection()

    def _await_acl_connection(self, connection_complete):
        """Assert the stream emits ``connection_complete`` and wrap the reported handle."""
        assertThat(self.hci_event_stream).emits(connection_complete)

        handle = connection_complete.get().GetConnectionHandle()
        return PyHalAclConnection(handle, self.acl_stream, self.device)

    def complete_connection(self):
        """Capture Connection Complete and return a PyHalAclConnection for its handle."""
        return self._await_acl_connection(HciCaptures.ConnectionCompleteCapture())

    @staticmethod
    def _default_le_conn_phy_params():
        """Build the connection parameters shared by both LE connection initiators."""
        phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
        phy_scan_params.scan_interval = 0x60
        phy_scan_params.scan_window = 0x30
        phy_scan_params.conn_interval_min = 0x18
        phy_scan_params.conn_interval_max = 0x28
        phy_scan_params.conn_latency = 0
        phy_scan_params.supervision_timeout = 0x1f4
        phy_scan_params.min_ce_length = 0
        phy_scan_params.max_ce_length = 0
        return phy_scan_params

    def initiate_le_connection(self, remote_addr):
        """Send LE Extended Create Connection to ``remote_addr`` and wait for Command Status.

        The peer is addressed directly (random address type); the own address
        type is random as well.
        """
        phy_scan_params = self._default_le_conn_phy_params()
        self.send_hci_command(
            hci_packets.LeExtendedCreateConnectionBuilder(
                hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, remote_addr, 1, [phy_scan_params]))
        self.wait_for_status(OpCode.LE_EXTENDED_CREATE_CONNECTION)

    def add_to_filter_accept_list(self, remote_addr):
        """Send LE Add Device To Filter Accept List for the random address ``remote_addr``.

        Does not wait for the Command Complete.
        """
        self.send_hci_command(
            hci_packets.LeAddDeviceToFilterAcceptListBuilder(hci_packets.FilterAcceptListAddressType.RANDOM,
                                                             remote_addr))

    def initiate_le_connection_by_filter_accept_list(self, remote_addr):
        """Send LE Extended Create Connection using the filter accept list policy.

        ``remote_addr`` is still placed in the command. Unlike
        ``initiate_le_connection``, this does not wait for the Command Status.
        """
        phy_scan_params = self._default_le_conn_phy_params()
        self.send_hci_command(
            hci_packets.LeExtendedCreateConnectionBuilder(hci_packets.InitiatorFilterPolicy.USE_FILTER_ACCEPT_LIST,
                                                          hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                                                          hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, remote_addr, 1,
                                                          [phy_scan_params]))

    def complete_le_connection(self):
        """Capture LE Connection Complete and return a PyHalAclConnection for its handle."""
        return self._await_acl_connection(HciCaptures.LeConnectionCompleteCapture())

    def create_advertisement(self,
                             handle,
                             own_address,
                             properties=LegacyAdvertisingEventProperties.ADV_IND,
                             min_interval=400,
                             max_interval=450,
                             channel_map=7,
                             own_address_type=OwnAddressType.RANDOM_DEVICE_ADDRESS,
                             peer_address_type=PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                             peer_address='00:00:00:00:00:00',
                             filter_policy=AdvertisingFilterPolicy.ALL_DEVICES,
                             tx_power=0xF8,
                             sid=1,
                             scan_request_notification=Enable.DISABLED):
        """Configure a legacy-PDU extended advertising set and return a controller for it.

        Sends the advertising parameters, waits for that Command Complete, then
        sets the set's random address to ``own_address`` and waits again.

        Args:
            handle: advertising handle to configure.
            own_address: random address assigned to the advertising set.
            properties, min_interval, max_interval, channel_map, own_address_type,
            peer_address_type, peer_address, filter_policy, tx_power, sid,
            scan_request_notification: passed through, in this order, to
                ``LeSetExtendedAdvertisingParametersLegacyBuilder``.

        Returns:
            PyHalAdvertisement bound to ``handle`` and this PyHal.
        """

        self.send_hci_command(
            LeSetExtendedAdvertisingParametersLegacyBuilder(handle, properties, min_interval, max_interval, channel_map,
                                                            own_address_type, peer_address_type, peer_address,
                                                            filter_policy, tx_power, sid, scan_request_notification))
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_PARAMETERS)

        self.send_hci_command(LeSetAdvertisingSetRandomAddressBuilder(handle, own_address))
        self.wait_for_complete(OpCode.LE_SET_ADVERTISING_SET_RANDOM_ADDRESS)

        return PyHalAdvertisement(handle, self)
296