#!/usr/bin/env python3
#
#   Copyright 2020 - The Android Open Source Project
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
"""Python helpers that drive a device's HAL facade with raw HCI commands and ACL data."""

from google.protobuf import empty_pb2 as empty_proto
from blueberry.tests.gd.cert.event_stream import EventStream
from blueberry.tests.gd.cert.event_stream import FilteringEventStream
from blueberry.tests.gd.cert.event_stream import IEventStream
from blueberry.tests.gd.cert.closable import Closable
from blueberry.tests.gd.cert.closable import safeClose
from blueberry.tests.gd.cert.captures import HciCaptures
from blueberry.tests.gd.cert.truth import assertThat
from bluetooth_packets_python3.hci_packets import WriteScanEnableBuilder
from bluetooth_packets_python3.hci_packets import ScanEnable
from bluetooth_packets_python3.hci_packets import AclBuilder
from bluetooth_packets_python3 import RawBuilder
from bluetooth_packets_python3.hci_packets import BroadcastFlag
from bluetooth_packets_python3.hci_packets import PacketBoundaryFlag
from bluetooth_packets_python3 import hci_packets
from blueberry.tests.gd.cert.matchers import HciMatchers
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingParametersLegacyBuilder
from bluetooth_packets_python3.hci_packets import LegacyAdvertisingEventProperties
from bluetooth_packets_python3.hci_packets import PeerAddressType
from bluetooth_packets_python3.hci_packets import AdvertisingFilterPolicy
from bluetooth_packets_python3.hci_packets import LeSetAdvertisingSetRandomAddressBuilder
from bluetooth_packets_python3.hci_packets import GapData
from bluetooth_packets_python3.hci_packets import GapDataType
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingDataBuilder
from bluetooth_packets_python3.hci_packets import Operation
from bluetooth_packets_python3.hci_packets import OwnAddressType
from bluetooth_packets_python3.hci_packets import Enable
from bluetooth_packets_python3.hci_packets import FragmentPreference
from bluetooth_packets_python3.hci_packets import LeSetExtendedScanResponseDataBuilder
from bluetooth_packets_python3.hci_packets import LeSetExtendedAdvertisingEnableBuilder
from bluetooth_packets_python3.hci_packets import EnabledSet
from bluetooth_packets_python3.hci_packets import OpCode
from blueberry.facade import common_pb2 as common


def _name_to_octets(name):
    """Return *name* as a list of integer octets suitable for ``GapData.data``.

    A ``str`` is encoded as UTF-8 first, because ``bytes(str)`` without an
    encoding raises ``TypeError``. Any other value is passed through
    ``bytes()`` exactly as before.
    """
    if isinstance(name, str):
        name = name.encode('utf-8')
    return list(bytes(name))


class PyHalAclConnection(IEventStream):
    """An ACL connection, identified by its handle, that sends through ``device.hal``."""

    def __init__(self, handle, acl_stream, device):
        """Remember the connection handle and wrap the shared ACL stream.

        Args:
            handle: connection handle; coerced with ``int()``.
            acl_stream: the event stream carrying incoming ACL data.
            device: object exposing the ``hal`` facade used for sending.
        """
        self.handle = int(handle)
        self.device = device
        # NOTE(review): the filter argument is None, so presumably incoming ACL
        # packets are not narrowed to this handle - confirm against
        # FilteringEventStream.
        self.our_acl_stream = FilteringEventStream(acl_stream, None)

    def send(self, pb_flag, b_flag, data):
        """Build an ACL packet for this handle around *data* and send it via the HAL."""
        acl = AclBuilder(self.handle, pb_flag, b_flag, RawBuilder(data))
        self.device.hal.SendAcl(common.Data(payload=bytes(acl.Serialize())))

    def send_first(self, data):
        """Send *data* as a first, non-automatically-flushable, point-to-point fragment."""
        self.send(PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, BroadcastFlag.POINT_TO_POINT, bytes(data))

    def get_event_queue(self):
        """Return the event queue of the wrapped ACL stream."""
        return self.our_acl_stream.get_event_queue()


class PyHalAdvertisement(object):
    """Handle to one extended advertising set that was configured through a ``PyHal``."""

    def __init__(self, handle, py_hal):
        """Store the advertising *handle* and the ``PyHal`` used to issue commands."""
        self.handle = handle
        self.py_hal = py_hal

    def set_data(self, complete_name):
        """Set the advertising data to a single Complete Local Name entry.

        Args:
            complete_name: the name as bytes-like data, or as ``str`` (encoded
                as UTF-8).

        Blocks until the matching Command Complete event is seen.
        """
        data = GapData()
        data.data_type = GapDataType.COMPLETE_LOCAL_NAME
        data.data = _name_to_octets(complete_name)
        self.py_hal.send_hci_command(
            LeSetExtendedAdvertisingDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))
        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_DATA)

    def set_scan_response(self, shortened_name):
        """Set the scan response data to a single Shortened Local Name entry.

        Args:
            shortened_name: the name as bytes-like data, or as ``str`` (encoded
                as UTF-8).

        Blocks until the matching Command Complete event is seen.
        """
        data = GapData()
        data.data_type = GapDataType.SHORTENED_LOCAL_NAME
        data.data = _name_to_octets(shortened_name)
        self.py_hal.send_hci_command(
            LeSetExtendedScanResponseDataBuilder(self.handle, Operation.COMPLETE_ADVERTISEMENT,
                                                 FragmentPreference.CONTROLLER_SHOULD_NOT, [data]))
        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_RESPONSE_DATA)

    def _set_enabled(self, enable):
        """Send LE Set Extended Advertising Enable for this set and wait for completion."""
        enabled_set = EnabledSet()
        enabled_set.advertising_handle = self.handle
        enabled_set.duration = 0
        enabled_set.max_extended_advertising_events = 0
        self.py_hal.send_hci_command(LeSetExtendedAdvertisingEnableBuilder(enable, [enabled_set]))
        self.py_hal.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_ENABLE)

    def start(self):
        """Enable this advertising set (duration and max events both 0)."""
        self._set_enabled(Enable.ENABLED)

    def stop(self):
        """Disable this advertising set."""
        self._set_enabled(Enable.DISABLED)


class PyHal(Closable):
    """Wrapper around a device's HAL facade: sends HCI commands/ACL data and exposes event streams."""

    def __init__(self, device):
        """Open the HCI event and ACL streams of *device* and initialise the cached event masks."""
        self.device = device

        self.hci_event_stream = EventStream(self.device.hal.StreamEvents(empty_proto.Empty()))
        self.acl_stream = EventStream(self.device.hal.StreamAcl(empty_proto.Empty()))

        self.event_mask = 0x1FFF_FFFF_FFFF  # Default Event Mask (Core Vol 4 [E] 7.3.1)
        self.le_event_mask = 0x0000_0000_001F  # Default LE Event Mask (Core Vol 4 [E] 7.8.1)

        # We don't deal with SCO for now

    def close(self):
        """Close both streams opened in ``__init__``."""
        safeClose(self.hci_event_stream)
        safeClose(self.acl_stream)

    def get_hci_event_stream(self):
        """Return the HCI event stream."""
        return self.hci_event_stream

    def wait_for_complete(self, opcode):
        """Assert that a Command Complete event for *opcode* is emitted on the HCI event stream."""
        assertThat(self.hci_event_stream).emits(HciMatchers.CommandComplete(opcode))

    def wait_for_status(self, opcode):
        """Assert that a Command Status event for *opcode* is emitted on the HCI event stream."""
        assertThat(self.hci_event_stream).emits(HciMatchers.CommandStatus(opcode))

    def get_acl_stream(self):
        """Return the ACL data stream."""
        return self.acl_stream

    def send_hci_command(self, command):
        """Serialize *command* (a packet builder) and send it through the HAL; does not wait for a reply."""
        self.device.hal.SendCommand(common.Data(payload=bytes(command.Serialize())))

    def send_acl(self, handle, pb_flag, b_flag, data):
        """Build an ACL packet for *handle* around *data* and send it through the HAL."""
        acl = AclBuilder(handle, pb_flag, b_flag, RawBuilder(data))
        self.device.hal.SendAcl(common.Data(payload=bytes(acl.Serialize())))

    def send_acl_first(self, handle, data):
        """Send *data* on *handle* as a first, non-automatically-flushable, point-to-point fragment."""
        self.send_acl(handle, PacketBoundaryFlag.FIRST_NON_AUTOMATICALLY_FLUSHABLE, BroadcastFlag.POINT_TO_POINT, data)

    def read_own_address(self):
        """Send Read BD_ADDR and return the address from the captured completion event."""
        self.send_hci_command(hci_packets.ReadBdAddrBuilder())
        read_bd_addr = HciCaptures.ReadBdAddrCompleteCapture()
        assertThat(self.hci_event_stream).emits(read_bd_addr)
        return read_bd_addr.get().GetBdAddr()

    def set_random_le_address(self, addr):
        """Send LE Set Random Address with *addr* and wait for Command Complete."""
        self.send_hci_command(hci_packets.LeSetRandomAddressBuilder(addr))
        self.wait_for_complete(OpCode.LE_SET_RANDOM_ADDRESS)

    def set_scan_parameters(self):
        """Configure extended scanning: active scan, interval = window = 6553, accept all, random own address."""
        phy_scan_params = hci_packets.PhyScanParameters()
        phy_scan_params.le_scan_interval = 6553
        phy_scan_params.le_scan_window = 6553
        phy_scan_params.le_scan_type = hci_packets.LeScanType.ACTIVE

        self.send_hci_command(
            hci_packets.LeSetExtendedScanParametersBuilder(hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                                                           hci_packets.LeScanningFilterPolicy.ACCEPT_ALL, 1,
                                                           [phy_scan_params]))
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_PARAMETERS)

    def unmask_event(self, *event_codes):
        """Set bit ``code - 1`` in the cached event mask for each code, then send Set Event Mask.

        The mask is cumulative across calls. The command is sent without
        waiting for its Command Complete event.
        """
        for event_code in event_codes:
            self.event_mask |= 1 << (int(event_code) - 1)
        self.send_hci_command(hci_packets.SetEventMaskBuilder(self.event_mask))

    def unmask_le_event(self, *subevent_codes):
        """Set bit ``code - 1`` in the cached LE event mask for each code, then send LE Set Event Mask.

        The mask is cumulative across calls. The command is sent without
        waiting for its Command Complete event.
        """
        for subevent_code in subevent_codes:
            self.le_event_mask |= 1 << (int(subevent_code) - 1)
        self.send_hci_command(hci_packets.LeSetEventMaskBuilder(self.le_event_mask))

    def _set_scan_enabled(self, enable):
        """Send LE Set Extended Scan Enable (duplicate filtering disabled) and wait for completion."""
        self.send_hci_command(
            hci_packets.LeSetExtendedScanEnableBuilder(enable, hci_packets.FilterDuplicates.DISABLED, 0, 0))
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_SCAN_ENABLE)

    def start_scanning(self):
        """Enable extended scanning and wait for Command Complete."""
        self._set_scan_enabled(hci_packets.Enable.ENABLED)

    def stop_scanning(self):
        """Disable extended scanning and wait for Command Complete."""
        self._set_scan_enabled(hci_packets.Enable.DISABLED)

    def reset(self):
        """Send HCI Reset and wait for Command Complete."""
        self.send_hci_command(hci_packets.ResetBuilder())
        self.wait_for_complete(OpCode.RESET)

    def enable_inquiry_and_page_scan(self):
        """Send Write Scan Enable with inquiry and page scan; does not wait for a reply."""
        self.send_hci_command(WriteScanEnableBuilder(ScanEnable.INQUIRY_AND_PAGE_SCAN))

    def initiate_connection(self, remote_addr):
        """Send Create Connection to *remote_addr*; does not wait for a reply.

        Args:
            remote_addr: the peer address as ``str``, or as bytes that are
                decoded as UTF-8.
        """
        self.send_hci_command(
            hci_packets.CreateConnectionBuilder(
                remote_addr if isinstance(remote_addr, str) else remote_addr.decode('utf-8'),
                0xcc18,  # Packet Type
                hci_packets.PageScanRepetitionMode.R1,
                0x0,
                hci_packets.ClockOffsetValid.INVALID,
                hci_packets.CreateConnectionRoleSwitch.ALLOW_ROLE_SWITCH))

    def accept_connection(self):
        """Wait for a Connection Request, accept it remaining peripheral, and return the completed connection."""
        connection_request = HciCaptures.ConnectionRequestCapture()
        assertThat(self.hci_event_stream).emits(connection_request)

        self.send_hci_command(
            hci_packets.AcceptConnectionRequestBuilder(connection_request.get().GetBdAddr(),
                                                       hci_packets.AcceptConnectionRequestRole.REMAIN_PERIPHERAL))
        return self.complete_connection()

    def _await_acl_connection(self, connection_complete):
        """Wait for *connection_complete* to capture an event and wrap its handle in a PyHalAclConnection."""
        assertThat(self.hci_event_stream).emits(connection_complete)

        handle = connection_complete.get().GetConnectionHandle()
        return PyHalAclConnection(handle, self.acl_stream, self.device)

    def complete_connection(self):
        """Wait for Connection Complete and return a ``PyHalAclConnection`` for the new handle."""
        return self._await_acl_connection(HciCaptures.ConnectionCompleteCapture())

    @staticmethod
    def _le_create_conn_phy_scan_params():
        """Build the fixed per-PHY parameters used by both LE connection-initiation methods."""
        phy_scan_params = hci_packets.LeCreateConnPhyScanParameters()
        phy_scan_params.scan_interval = 0x60
        phy_scan_params.scan_window = 0x30
        phy_scan_params.conn_interval_min = 0x18
        phy_scan_params.conn_interval_max = 0x28
        phy_scan_params.conn_latency = 0
        phy_scan_params.supervision_timeout = 0x1f4
        phy_scan_params.min_ce_length = 0
        phy_scan_params.max_ce_length = 0
        return phy_scan_params

    def _send_le_extended_create_connection(self, filter_policy, remote_addr):
        """Send LE Extended Create Connection with *filter_policy*, random own and peer address types."""
        phy_scan_params = self._le_create_conn_phy_scan_params()
        self.send_hci_command(
            hci_packets.LeExtendedCreateConnectionBuilder(filter_policy,
                                                          hci_packets.OwnAddressType.RANDOM_DEVICE_ADDRESS,
                                                          hci_packets.AddressType.RANDOM_DEVICE_ADDRESS, remote_addr, 1,
                                                          [phy_scan_params]))

    def initiate_le_connection(self, remote_addr):
        """Start an LE connection to *remote_addr* using the peer address, and wait for Command Status."""
        self._send_le_extended_create_connection(hci_packets.InitiatorFilterPolicy.USE_PEER_ADDRESS, remote_addr)
        self.wait_for_status(OpCode.LE_EXTENDED_CREATE_CONNECTION)

    def add_to_filter_accept_list(self, remote_addr):
        """Add *remote_addr* (random address type) to the filter accept list; does not wait for a reply."""
        self.send_hci_command(
            hci_packets.LeAddDeviceToFilterAcceptListBuilder(hci_packets.FilterAcceptListAddressType.RANDOM,
                                                             remote_addr))

    def initiate_le_connection_by_filter_accept_list(self, remote_addr):
        """Start an LE connection using the filter accept list policy.

        NOTE(review): unlike ``initiate_le_connection`` this does not wait for
        the Command Status event - confirm callers consume it themselves.
        """
        self._send_le_extended_create_connection(hci_packets.InitiatorFilterPolicy.USE_FILTER_ACCEPT_LIST, remote_addr)

    def complete_le_connection(self):
        """Wait for LE Connection Complete and return a ``PyHalAclConnection`` for the new handle."""
        return self._await_acl_connection(HciCaptures.LeConnectionCompleteCapture())

    def create_advertisement(self,
                             handle,
                             own_address,
                             properties=LegacyAdvertisingEventProperties.ADV_IND,
                             min_interval=400,
                             max_interval=450,
                             channel_map=7,
                             own_address_type=OwnAddressType.RANDOM_DEVICE_ADDRESS,
                             peer_address_type=PeerAddressType.PUBLIC_DEVICE_OR_IDENTITY_ADDRESS,
                             peer_address='00:00:00:00:00:00',
                             filter_policy=AdvertisingFilterPolicy.ALL_DEVICES,
                             tx_power=0xF8,
                             sid=1,
                             scan_request_notification=Enable.DISABLED):
        """Configure a legacy-PDU extended advertising set and return a handle object for it.

        Sends LE Set Extended Advertising Parameters with the given arguments,
        then sets the set's random address to *own_address*; each command is
        followed by a wait for its Command Complete event.

        Returns:
            A ``PyHalAdvertisement`` bound to *handle* and this ``PyHal``.
        """

        self.send_hci_command(
            LeSetExtendedAdvertisingParametersLegacyBuilder(handle, properties, min_interval, max_interval, channel_map,
                                                            own_address_type, peer_address_type, peer_address,
                                                            filter_policy, tx_power, sid, scan_request_notification))
        self.wait_for_complete(OpCode.LE_SET_EXTENDED_ADVERTISING_PARAMETERS)

        self.send_hci_command(LeSetAdvertisingSetRandomAddressBuilder(handle, own_address))
        self.wait_for_complete(OpCode.LE_SET_ADVERTISING_SET_RANDOM_ADDRESS)

        return PyHalAdvertisement(handle, self)