1#!/usr/bin/env python3 2# 3# Copyright 2020 - The Android Open Source Project 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17from google.protobuf import empty_pb2 as empty_proto 18from cert.event_stream import EventStream 19from cert.event_stream import FilteringEventStream 20from cert.event_stream import IEventStream 21from cert.closable import Closable 22from cert.closable import safeClose 23from cert.captures import HciCaptures 24from cert.truth import assertThat 25from bluetooth_packets_python3.hci_packets import WriteScanEnableBuilder 26from bluetooth_packets_python3.hci_packets import ScanEnable 27from bluetooth_packets_python3.hci_packets import AclBuilder 28from bluetooth_packets_python3 import RawBuilder 29from bluetooth_packets_python3.hci_packets import BroadcastFlag 30from bluetooth_packets_python3.hci_packets import PacketBoundaryFlag 31from bluetooth_packets_python3 import hci_packets 32from cert.matchers import HciMatchers 33from bluetooth_packets_python3.hci_packets import FilterDuplicates 34from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingLegacyParametersBuilder 35from bluetooth_packets_python3.hci_packets import LegacyAdvertisingProperties 36from bluetooth_packets_python3.hci_packets import PeerAddressType 37from bluetooth_packets_python3.hci_packets import AdvertisingFilterPolicy 38from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingRandomAddressBuilder 39from bluetooth_packets_python3.hci_packets import GapData 40from bluetooth_packets_python3.hci_packets import GapDataType 41from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingDataBuilder 42from bluetooth_packets_python3.hci_packets import Operation 43from bluetooth_packets_python3.hci_packets import OwnAddressType 44from bluetooth_packets_python3.hci_packets import LeScanningFilterPolicy 45from bluetooth_packets_python3.hci_packets import Enable 46from bluetooth_packets_python3.hci_packets import FragmentPreference 47from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingScanResponseBuilder 48from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingEnableBuilder 49from bluetooth_packets_python3.hci_packets import LeSetExtendedScanEnableBuilder 50from bluetooth_packets_python3.hci_packets import EnabledSet 51from bluetooth_packets_python3.hci_packets import OpCode 52from facade import common_pb2 as common 53 54 55class PyHalAclConnection(IEventStream): 56 57 def __init__(self, handle, acl_stream, device): 58 self.handle = int(handle) 59 self.device = device 60 self.our_acl_stream = FilteringEventStream(acl_stream, None) 61 62 def send(self, pb_flag, b_flag, data): 63 acl = AclBuilder(self.handle, pb_flag, b_flag, RawBuilder(data)) 64 self.device.hal.SendAcl(common.Data(payload=bytes(acl.Serialize()))) 65 66 def send_first(self, data): 67 self.send(PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, BroadcastFlag.POINT_TO_POINT, bytes(data)) 68 69 def get_event_queue(self): 70 return self.our_acl_stream.get_event_queue() 71 72 73class PyHalAdvertisement(object): 74 75 def __init__(self, handle, py_hal): 76 self.handle = handle 77 self.py_hal = py_hal 78 79 def set_data(self, complete_name): 80 data = GapData() 81 data.data_type = GapDataType.COMPLETE_LOCAL_NAME 82 data.data = list(bytes(complete_name)) 83 self.py_hal.send_hci_command( 84 LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT, 85 FragmentPreference.CONTROLLER_SHOULD_NOT, [data])) 86 self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_DATA) 87 88 def set_scan_response(self, shortened_name): 89 data = GapData() 90 data.data_type = GapDataType.SHORTENED_LOCAL_NAME 91 data.data = list(bytes(shortened_name)) 92 self.py_hal.send_hci_command( 93 LeSetExtendedAdvertisingScanResponseBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT, 94 FragmentPreference.CONTROLLER_SHOULD_NOT, [data])) 95 self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_SCAN_RESPONSE) 96 97 def start(self): 98 enabled_set = EnabledSet() 99 enabled_set.advertising_handle = self.handle 100 enabled_set.duration = 0 101 enabled_set.max_extended_advertising_events = 0 102 self.py_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(Enable.ENABLED, [enabled_set])) 103 self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE) 104 105 def stop(self): 106 enabled_set = EnabledSet() 107 enabled_set.advertising_handle = self.handle 108 enabled_set.duration = 0 109 enabled_set.max_extended_advertising_events = 0 110 self.py_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(Enable.DISABLED, [enabled_set])) 111 self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE) 112 113 114class PyHal(Closable): 115 116 def __init__(self, device): 117 self.device = device 118 119 self.hci_event_stream = EventStream(self.device.hal.StreamEvents(empty_proto.Empty())) 120 self.acl_stream = EventStream(self.device.hal.StreamAcl(empty_proto.Empty())) 121 122 # We don't deal with SCO for now 123 124 def close(self): 125 safeClose(self.hci_event_stream) 126 safeClose(self.acl_stream) 127 128 def get_hci_event_stream(self): 129 return self.hci_event_stream 130 131 def wait_for_complete(self, opcode): 132 assertThat(self.hci_event_stream).emits(HciMatchers.CommandComplete(opcode)) 133 134 def wait_for_status(self, opcode): 135 assertThat(self.hci_event_stream).emits(HciMatchers.CommandStatus(opcode)) 136 137 def get_acl_stream(self): 138 return self.acl_stream 139 140 def send_hci_command(self, command): 141 self.device.hal.SendCommand(common.Data(payload=bytes(command.Serialize()))) 142 143 def send_acl(self, handle, pb_flag, b_flag, data): 144 acl = AclBuilder(handle, pb_flag, b_flag, RawBuilder(data)) 145 self.device.hal.SendAcl(common.Data(payload=bytes(acl.Serialize()))) 146 147 def send_acl_first(self, handle, data): 148 self.send_acl(handle, PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, BroadcastFlag.POINT_TO_POINT, data) 149 150 def read_own_address(self): 151 self.send_hci_command(hci_packets.ReadBdAddrBuilder()) 152 read_bd_addr = HciCaptures.ReadBdAddrCompleteCapture() 153 assertThat(self.hci_event_stream).emits(read_bd_addr) 154 return read_bd_addr.get().GetBdAddr() 155 156 def set_random_le_address(self, addr): 157 self.send_hci_command(hci_packets.LeSetRandomAddressBuilder(addr)) 158 self.wait_for_complete(OpCode.LE_SET_RANDOM_ADDRESS) 159 160 def set_scan_parameters(self): 161 phy_scan_params = hci_packets.PhyScanParameters() 162 phy_scan_params.le_scan_interval = 6553 163 phy_scan_params.le_scan_window = 6553 164 phy_scan_params.le_scan_type = hci_packets.LeScanType.ACTIVE 165 166 self.send_hci_command( 167 hci_packets.LeSetExtendedScanParametersBuilder(hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, 168 hci_packets.LeScanningFilterPolicy.ACCEPT_ALL, 1, 169 [phy_scan_params])) 170 self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_PARAMETERS) 171 172 def start_scanning(self): 173 self.send_hci_command( 174 hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.ENABLED, 175 hci_packets.FilterDuplicates.DISABLED, 0, 0)) 176 self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_ENABLE) 177 178 def stop_scanning(self): 179 self.send_hci_command( 180 hci_packets.LeSetExtendedScanEnableBuilder(hci_packets.Enable.DISABLED, 181 hci_packets.FilterDuplicates.DISABLED, 0, 0)) 182 self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_ENABLE) 183 184 def reset(self): 185 self.send_hci_command(hci_packets.ResetBuilder()) 186 self.wait_for_complete(OpCode.RESET) 187 188 def enable_inquiry_and_page_scan(self): 189 self.send_hci_command(WriteScanEnableBuilder(ScanEnable.INQUIRY_AND_PAGE_SCAN)) 190 191 def initiate_connection(self, remote_addr): 192 self.send_hci_command( 193 hci_packets.CreateConnectionBuilder( 194 remote_addr if isinstance(remote_addr, str) else remote_addr.decode('utf-8'), 195 0xcc18, # Packet Type 196 hci_packets.PageScanRepetitionMode.R1, 197 0x0, 198 hci_packets.ClockOffsetValid.INVALID, 199 hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH)) 200 201 def accept_connection(self): 202 connection_request = HciCaptures.ConnectionRequestCapture() 203 assertThat(self.hci_event_stream).emits(connection_request) 204 205 self.send_hci_command( 206 hci_packets.AcceptConnectionRequestBuilder(connection_request.get().GetBdAddr(), 207 hci_packets.AcceptConnectionRequestRole.REMAIN_PERIPHERAL)) 208 return self.complete_connection() 209 210 def complete_connection(self): 211 connection_complete = HciCaptures.ConnectionCompleteCapture() 212 assertThat(self.hci_event_stream).emits(connection_complete) 213 214 handle = connection_complete.get().GetConnectionHandle() 215 return PyHalAclConnection(handle, self.acl_stream, self.device) 216 217 def initiate_le_connection(self, remote_addr): 218 phy_scan_params = hci_packets.LeCreateConnPhyScanParameters() 219 phy_scan_params.scan_interval = 0x60 220 phy_scan_params.scan_window = 0x30 221 phy_scan_params.conn_interval_min = 0x18 222 phy_scan_params.conn_interval_max = 0x28 223 phy_scan_params.conn_latency = 0 224 phy_scan_params.supervision_timeout = 0x1f4 225 phy_scan_params.min_ce_length = 0 226 phy_scan_params.max_ce_length = 0 227 self.send_hci_command( 228 hci_packets.LeExtendedCreateConnectionBuilder( 229 hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, 230 hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, remote_addr, 1, [phy_scan_params])) 231 self.wait_for_status(OpCode.LE_EXTENDED_CREATE_CONNECTION) 232 233 def add_to_connect_list(self, remote_addr): 234 self.send_hci_command( 235 hci_packets.LeAddDeviceToConnectListBuilder(hci_packets.ConnectListAddressType.RANDOM, remote_addr)) 236 237 def initiate_le_connection_by_connect_list(self, remote_addr): 238 phy_scan_params = hci_packets.LeCreateConnPhyScanParameters() 239 phy_scan_params.scan_interval = 0x60 240 phy_scan_params.scan_window = 0x30 241 phy_scan_params.conn_interval_min = 0x18 242 phy_scan_params.conn_interval_max = 0x28 243 phy_scan_params.conn_latency = 0 244 phy_scan_params.supervision_timeout = 0x1f4 245 phy_scan_params.min_ce_length = 0 246 phy_scan_params.max_ce_length = 0 247 self.send_hci_command( 248 hci_packets.LeExtendedCreateConnectionBuilder( 249 hci_packets.InitiatorFilterPolicy.USE_CONNECT_LIST, hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS, 250 hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, remote_addr, 1, [phy_scan_params])) 251 252 def complete_le_connection(self): 253 connection_complete = HciCaptures.LeConnectionCompleteCapture() 254 assertThat(self.hci_event_stream).emits(connection_complete) 255 256 handle = connection_complete.get().GetConnectionHandle() 257 return PyHalAclConnection(handle, self.acl_stream, self.device) 258 259 def create_advertisement(self, 260 handle, 261 own_address, 262 properties=LegacyAdvertisingProperties.ADV_IND, 263 min_interval=400, 264 max_interval=450, 265 channel_map=7, 266 own_address_type=OwnAddressType.RANDOM_DEVICE_ADDRESS, 267 peer_address_type=PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS, 268 peer_address='00:00:00:00:00:00', 269 filter_policy=AdvertisingFilterPolicy.ALL_DEVICES, 270 tx_power=0xF8, 271 sid=1, 272 scan_request_notification=Enable.DISABLED): 273 274 self.send_hci_command( 275 LeSetExtendedAdvertisingLegacyParametersBuilder(handle, properties, min_interval, max_interval, channel_map, 276 own_address_type, peer_address_type, peer_address, 277 filter_policy, tx_power, sid, scan_request_notification)) 278 self.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_PARAMETERS) 279 280 self.send_hci_command(LeSetExtendedAdvertisingRandomAddressBuilder(handle, own_address)) 281 self.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_RANDOM_ADDRESS) 282 283 return PyHalAdvertisement(handle, self) 284