• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
16
17from google.protobuf import empty_pb2 as empty_proto
18from cert.event_stream import EventStream
19from cert.event_stream import FilteringEventStream
20from cert.event_stream import IEventStream
21from cert.closable import Closable
22from cert.closable import safeClose
23from cert.captures import HciCaptures
24from cert.truth import assertThat
25from bluetooth_packets_python3.hci_packets import WriteScanEnableBuilder
26from bluetooth_packets_python3.hci_packets import ScanEnable
27from bluetooth_packets_python3.hci_packets import AclBuilder
28from bluetooth_packets_python3 import RawBuilder
29from bluetooth_packets_python3.hci_packets import BroadcastFlag
30from bluetooth_packets_python3.hci_packets import PacketBoundaryFlag
31from bluetooth_packets_python3 import hci_packets
32from cert.matchers import HciMatchers
33from bluetooth_packets_python3.hci_packets import FilterDuplicates
34from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingLegacyParametersBuilder
35from bluetooth_packets_python3.hci_packets import LegacyAdvertisingProperties
36from bluetooth_packets_python3.hci_packets import PeerAddressType
37from bluetooth_packets_python3.hci_packets import AdvertisingFilterPolicy
38from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingRandomAddressBuilder
39from bluetooth_packets_python3.hci_packets import GapData
40from bluetooth_packets_python3.hci_packets import GapDataType
41from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingDataBuilder
42from bluetooth_packets_python3.hci_packets import Operation
43from bluetooth_packets_python3.hci_packets import OwnAddressType
44from bluetooth_packets_python3.hci_packets import LeScanningFilterPolicy
45from bluetooth_packets_python3.hci_packets import Enable
46from bluetooth_packets_python3.hci_packets import FragmentPreference
47from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingScanResponseBuilder
48from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingEnableBuilder
49from bluetooth_packets_python3.hci_packets import LeSetExtendedScanEnableBuilder
50from bluetooth_packets_python3.hci_packets import EnabledSet
51from bluetooth_packets_python3.hci_packets import OpCode
52from facade import common_pb2 as common
53
54
class PyHalAclConnection(IEventStream):
    """One ACL connection: readable as an event stream, writable through the HAL.

    Received packets are exposed via a FilteringEventStream built on top of the
    ACL stream handed to the constructor; outgoing packets are serialized with
    AclBuilder and pushed through ``device.hal.SendAcl``.
    """

    def __init__(self, handle, acl_stream, device):
        """Bind a connection handle to an ACL stream and a device.

        Args:
            handle: connection handle; stored after conversion with int().
            acl_stream: stream wrapped by this connection's FilteringEventStream.
            device: object whose ``hal.SendAcl`` is used by send().
        """
        self.handle = int(handle)
        self.device = device
        # None is passed as the filter argument.
        # NOTE(review): presumably this means "no filtering" (every ACL packet
        # is visible here, whatever its handle) -- confirm in FilteringEventStream.
        self.our_acl_stream = FilteringEventStream(acl_stream, None)

    def send(self, pb_flag, b_flag, data):
        """Wrap `data` in an ACL packet for this handle and send it via the HAL.

        Args:
            pb_flag: packet boundary flag handed to AclBuilder.
            b_flag: broadcast flag handed to AclBuilder.
            data: payload handed to RawBuilder.
        """
        packet = AclBuilder(self.handle, pb_flag, b_flag, RawBuilder(data))
        serialized = bytes(packet.Serialize())
        self.device.hal.SendAcl(common.Data(payload=serialized))

    def send_first(self, data):
        """Send `data` (converted with bytes()) as a first, non-automatically-flushable, point-to-point packet."""
        boundary = PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE
        broadcast = BroadcastFlag.POINT_TO_POINT
        self.send(boundary, broadcast, bytes(data))

    def get_event_queue(self):
        """Return the event queue of the underlying filtering stream."""
        stream = self.our_acl_stream
        return stream.get_event_queue()
71
72
class PyHalAdvertisement(object):
    """Controls one extended-advertising set through a PyHal-style object.

    Every operation sends a single HCI command with ``py_hal.send_hci_command``
    and then calls ``py_hal.wait_for_complete`` with the matching opcode.
    """

    def __init__(self, handle, py_hal):
        """Remember the advertising handle and the HAL wrapper used to send commands.

        Args:
            handle: advertising handle placed in every command this object sends.
            py_hal: object providing send_hci_command() and wait_for_complete().
        """
        self.handle = handle
        self.py_hal = py_hal

    @staticmethod
    def _name_octets(name):
        """Return `name` as a list of integer byte values.

        A str is encoded as UTF-8 first, because bytes(str) without an encoding
        raises TypeError. Any other value goes through bytes() unchanged, so
        existing bytes-like callers see the same result as before.
        """
        if isinstance(name, str):
            name = name.encode('utf-8')
        return list(bytes(name))

    def _gap_name_entry(self, data_type, name):
        """Build a GapData entry of `data_type` whose payload is `name`."""
        entry = GapData()
        entry.data_type = data_type
        entry.data = self._name_octets(name)
        return entry

    def _set_enabled(self, enable):
        """Send LE Set Extended Advertising Enable for this handle and wait for its completion.

        Args:
            enable: Enable.ENABLED or Enable.DISABLED.
        """
        enabled_set = EnabledSet()
        enabled_set.advertising_handle = self.handle
        # Both limits are sent as 0, exactly as the original start()/stop() did.
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.py_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(enable, [enabled_set]))
        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE)

    def set_data(self, complete_name):
        """Set the advertising data to a single COMPLETE_LOCAL_NAME entry.

        Args:
            complete_name: the name, as bytes-like data or as a str (sent as UTF-8).
        """
        entry = self._gap_name_entry(GapDataType.COMPLETE_LOCAL_NAME, complete_name)
        self.py_hal.send_hci_command(
            LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                FragmentPreference.CONTROLLER_SHOULD_NOT, [entry]))
        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_DATA)

    def set_scan_response(self, shortened_name):
        """Set the scan response to a single SHORTENED_LOCAL_NAME entry.

        Args:
            shortened_name: the name, as bytes-like data or as a str (sent as UTF-8).
        """
        entry = self._gap_name_entry(GapDataType.SHORTENED_LOCAL_NAME, shortened_name)
        self.py_hal.send_hci_command(
            LeSetExtendedAdvertisingScanResponseBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                        FragmentPreference.CONTROLLER_SHOULD_NOT, [entry]))
        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_SCAN_RESPONSE)

    def start(self):
        """Enable advertising on this handle."""
        self._set_enabled(Enable.ENABLED)

    def stop(self):
        """Disable advertising on this handle."""
        self._set_enabled(Enable.DISABLED)
112
113
class PyHal(Closable):
    """Test-side driver for a device's HAL facade.

    Opens the HCI event stream and the ACL stream of ``device.hal`` and offers
    helpers that send HCI commands / ACL data and assert on the events that the
    HCI event stream emits in response.
    """

    def __init__(self, device):
        """Open the HCI event and ACL streams of `device`.

        Args:
            device: object exposing a ``hal`` facade with StreamEvents,
                StreamAcl, SendCommand and SendAcl.
        """
        self.device = device

        self.hci_event_stream = EventStream(self.device.hal.StreamEvents(empty_proto.Empty()))
        self.acl_stream = EventStream(self.device.hal.StreamAcl(empty_proto.Empty()))

        # We don't deal with SCO for now

    def close(self):
        """Close both streams with safeClose (HCI events first, then ACL)."""
        for stream in (self.hci_event_stream, self.acl_stream):
            safeClose(stream)

    def get_hci_event_stream(self):
        """Return the HCI event stream opened in the constructor."""
        return self.hci_event_stream

    def wait_for_complete(self, opcode):
        """Assert that the HCI event stream emits a Command Complete for `opcode`."""
        matcher = HciMatchers.CommandComplete(opcode)
        assertThat(self.hci_event_stream).emits(matcher)

    def wait_for_status(self, opcode):
        """Assert that the HCI event stream emits a Command Status for `opcode`."""
        matcher = HciMatchers.CommandStatus(opcode)
        assertThat(self.hci_event_stream).emits(matcher)

    def get_acl_stream(self):
        """Return the ACL stream opened in the constructor."""
        return self.acl_stream

    def send_hci_command(self, command):
        """Serialize `command` and send it through the HAL facade's SendCommand."""
        serialized = bytes(command.Serialize())
        self.device.hal.SendCommand(common.Data(payload=serialized))

    def send_acl(self, handle, pb_flag, b_flag, data):
        """Wrap `data` in an ACL packet for `handle` and send it through SendAcl.

        Args:
            handle: connection handle handed to AclBuilder.
            pb_flag: packet boundary flag handed to AclBuilder.
            b_flag: broadcast flag handed to AclBuilder.
            data: payload handed to RawBuilder.
        """
        packet = AclBuilder(handle, pb_flag, b_flag, RawBuilder(data))
        serialized = bytes(packet.Serialize())
        self.device.hal.SendAcl(common.Data(payload=serialized))

    def send_acl_first(self, handle, data):
        """Send `data` on `handle` as a first, non-automatically-flushable, point-to-point packet."""
        boundary = PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE
        broadcast = BroadcastFlag.POINT_TO_POINT
        self.send_acl(handle, boundary, broadcast, data)

    def read_own_address(self):
        """Send Read BD_ADDR and return the address from the captured completion event."""
        self.send_hci_command(hci_packets.ReadBdAddrBuilder())
        capture = HciCaptures.ReadBdAddrCompleteCapture()
        assertThat(self.hci_event_stream).emits(capture)
        return capture.get().GetBdAddr()

    def set_random_le_address(self, addr):
        """Send LE Set Random Address with `addr` and wait for its Command Complete."""
        self.send_hci_command(hci_packets.LeSetRandomAddressBuilder(addr))
        self.wait_for_complete(OpCode.LE_SET_RANDOM_ADDRESS)

    def set_scan_parameters(self):
        """Configure active extended scanning with fixed parameters and wait for completion.

        Uses a random own address, the ACCEPT_ALL filter policy and a single PHY
        entry whose scan interval and scan window are both 6553.
        """
        phy_params = hci_packets.PhyScanParameters()
        phy_params.le_scan_interval = 6553
        phy_params.le_scan_window = 6553
        phy_params.le_scan_type = hci_packets.LeScanType.ACTIVE

        # The literal 1 accompanies the single PHY entry.
        # NOTE(review): presumably the scanning-PHYs bitmask -- confirm against the builder.
        command = hci_packets.LeSetExtendedScanParametersBuilder(hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                                                                 hci_packets.LeScanningFilterPolicy.ACCEPT_ALL, 1,
                                                                 [phy_params])
        self.send_hci_command(command)
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_PARAMETERS)

    def _set_extended_scan_enable(self, enable):
        """Send LE Set Extended Scan Enable with `enable` and wait for its Command Complete.

        Duplicate filtering is disabled and the two trailing numeric arguments are 0.
        """
        command = hci_packets.LeSetExtendedScanEnableBuilder(enable, hci_packets.FilterDuplicates.DISABLED, 0, 0)
        self.send_hci_command(command)
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_ENABLE)

    def start_scanning(self):
        """Enable extended scanning and wait for the Command Complete."""
        self._set_extended_scan_enable(hci_packets.Enable.ENABLED)

    def stop_scanning(self):
        """Disable extended scanning and wait for the Command Complete."""
        self._set_extended_scan_enable(hci_packets.Enable.DISABLED)

    def reset(self):
        """Send HCI Reset and wait for its Command Complete."""
        self.send_hci_command(hci_packets.ResetBuilder())
        self.wait_for_complete(OpCode.RESET)

    def enable_inquiry_and_page_scan(self):
        """Send Write Scan Enable with INQUIRY_AND_PAGE_SCAN.

        NOTE(review): no wait_for_complete() follows this command; confirm that
        callers consume (or do not care about) its Command Complete event.
        """
        command = WriteScanEnableBuilder(ScanEnable.INQUIRY_AND_PAGE_SCAN)
        self.send_hci_command(command)

    def initiate_connection(self, remote_addr):
        """Send Create Connection towards `remote_addr`.

        Args:
            remote_addr: the peer address as a str; any other value is
                converted with ``.decode('utf-8')``.
        """
        if isinstance(remote_addr, str):
            address = remote_addr
        else:
            address = remote_addr.decode('utf-8')

        command = hci_packets.CreateConnectionBuilder(
            address,
            0xcc18,  # Packet Type
            hci_packets.PageScanRepetitionMode.R1,
            0x0,
            hci_packets.ClockOffsetValid.INVALID,
            hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH)
        self.send_hci_command(command)

    def accept_connection(self):
        """Accept the next incoming connection request and return the resulting connection.

        Captures a Connection Request from the HCI event stream, answers it with
        Accept Connection Request (role REMAIN_PERIPHERAL) addressed to the
        requester, then delegates to complete_connection().

        Returns:
            The PyHalAclConnection produced by complete_connection().
        """
        request = HciCaptures.ConnectionRequestCapture()
        assertThat(self.hci_event_stream).emits(request)

        requester = request.get().GetBdAddr()
        role = hci_packets.AcceptConnectionRequestRole.REMAIN_PERIPHERAL
        self.send_hci_command(hci_packets.AcceptConnectionRequestBuilder(requester, role))
        return self.complete_connection()

    def _connection_from_capture(self, capture):
        """Assert that `capture` matches an emitted HCI event and wrap its connection handle."""
        assertThat(self.hci_event_stream).emits(capture)

        handle = capture.get().GetConnectionHandle()
        return PyHalAclConnection(handle, self.acl_stream, self.device)

    def complete_connection(self):
        """Capture a Connection Complete event and return a PyHalAclConnection for its handle."""
        return self._connection_from_capture(HciCaptures.ConnectionCompleteCapture())

    def _send_le_extended_create_connection(self, filter_policy, remote_addr):
        """Send LE Extended Create Connection with this class's fixed PHY parameters.

        Args:
            filter_policy: InitiatorFilterPolicy value for the command.
            remote_addr: peer address placed in the command (peer address type
                is RANDOM_DEVICE_ADDRESS, as is the own address type).
        """
        phy_params = hci_packets.LeCreateConnPhyScanParameters()
        phy_params.scan_interval = 0x60
        phy_params.scan_window = 0x30
        phy_params.conn_interval_min = 0x18
        phy_params.conn_interval_max = 0x28
        phy_params.conn_latency = 0
        phy_params.supervision_timeout = 0x1f4
        phy_params.min_ce_length = 0
        phy_params.max_ce_length = 0

        # The literal 1 accompanies the single PHY entry.
        # NOTE(review): presumably the initiating-PHYs bitmask -- confirm against the builder.
        command = hci_packets.LeExtendedCreateConnectionBuilder(
            filter_policy, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
            hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, remote_addr, 1, [phy_params])
        self.send_hci_command(command)

    def initiate_le_connection(self, remote_addr):
        """Start an LE connection to `remote_addr` (USE_PEER_ADDRESS) and wait for the Command Status."""
        self._send_le_extended_create_connection(hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, remote_addr)
        self.wait_for_status(OpCode.LE_EXTENDED_CREATE_CONNECTION)

    def add_to_connect_list(self, remote_addr):
        """Send LE Add Device To Connect List for `remote_addr` with address type RANDOM.

        NOTE(review): no wait_for_complete() follows this command; confirm that
        callers consume (or do not care about) its Command Complete event.
        """
        address_type = hci_packets.ConnectListAddressType.RANDOM
        self.send_hci_command(hci_packets.LeAddDeviceToConnectListBuilder(address_type, remote_addr))

    def initiate_le_connection_by_connect_list(self, remote_addr):
        """Start an LE connection using the connect list (USE_CONNECT_LIST).

        `remote_addr` is still placed in the command.

        NOTE(review): unlike initiate_le_connection(), this does not call
        wait_for_status(); confirm whether callers rely on that.
        """
        self._send_le_extended_create_connection(hci_packets.InitiatorFilterPolicy.USE_CONNECT_LIST, remote_addr)

    def complete_le_connection(self):
        """Capture an LE Connection Complete event and return a PyHalAclConnection for its handle."""
        return self._connection_from_capture(HciCaptures.LeConnectionCompleteCapture())

    def create_advertisement(self,
                             handle,
                             own_address,
                             properties=LegacyAdvertisingProperties.ADV_IND,
                             min_interval=400,
                             max_interval=450,
                             channel_map=7,
                             own_address_type=OwnAddressType.RANDOM_DEVICE_ADDRESS,
                             peer_address_type=PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                             peer_address='00:00:00:00:00:00',
                             filter_policy=AdvertisingFilterPolicy.ALL_DEVICES,
                             tx_power=0xF8,
                             sid=1,
                             scan_request_notification=Enable.DISABLED):
        """Configure an extended-advertising set (legacy PDUs) and return a handle object for it.

        Sends LE Set Extended Advertising Parameters (legacy variant) built from
        the arguments, waits for its Command Complete, then sets `own_address`
        as the set's random address and waits for that Command Complete as well.

        Args:
            handle: advertising handle to configure.
            own_address: random address assigned to the advertising set.
            properties, min_interval, max_interval, channel_map,
            own_address_type, peer_address_type, peer_address, filter_policy,
            tx_power, sid, scan_request_notification: passed through, in this
                order, to LeSetExtendedAdvertisingLegacyParametersBuilder.

        Returns:
            A PyHalAdvertisement bound to `handle` and this PyHal.
        """
        parameters_command = LeSetExtendedAdvertisingLegacyParametersBuilder(
            handle, properties, min_interval, max_interval, channel_map, own_address_type, peer_address_type,
            peer_address, filter_policy, tx_power, sid, scan_request_notification)
        self.send_hci_command(parameters_command)
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_PARAMETERS)

        address_command = LeSetExtendedAdvertisingRandomAddressBuilder(handle, own_address)
        self.send_hci_command(address_command)
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_RANDOM_ADDRESS)

        return PyHalAdvertisement(handle, self)
284