1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18from __future__ import annotations 19import asyncio 20import collections 21import dataclasses 22import logging 23import struct 24 25from typing import ( 26 Any, 27 Awaitable, 28 Callable, 29 Deque, 30 Dict, 31 Optional, 32 Set, 33 cast, 34 TYPE_CHECKING, 35) 36 37from bumble.colors import color 38from bumble.l2cap import L2CAP_PDU 39from bumble.snoop import Snooper 40from bumble import drivers 41from bumble import hci 42from bumble.core import ( 43 BT_BR_EDR_TRANSPORT, 44 BT_LE_TRANSPORT, 45 ConnectionPHY, 46 ConnectionParameters, 47) 48from bumble.utils import AbortableEventEmitter 49from bumble.transport.common import TransportLostError 50 51if TYPE_CHECKING: 52 from .transport.common import TransportSink, TransportSource 53 54 55# ----------------------------------------------------------------------------- 56# Logging 57# ----------------------------------------------------------------------------- 58logger = logging.getLogger(__name__) 59 60 61# ----------------------------------------------------------------------------- 62class AclPacketQueue: 63 max_packet_size: int 64 65 def __init__( 66 self, 67 max_packet_size: int, 68 max_in_flight: int, 69 send: Callable[[hci.HCI_Packet], None], 70 ) -> None: 71 self.max_packet_size = max_packet_size 72 self.max_in_flight = max_in_flight 73 self.in_flight = 0 74 self.send = send 75 self.packets: Deque[hci.HCI_AclDataPacket] = collections.deque() 76 77 def enqueue(self, packet: hci.HCI_AclDataPacket) -> None: 78 self.packets.appendleft(packet) 79 self.check_queue() 80 81 if self.packets: 82 logger.debug( 83 f'{self.in_flight} ACL packets in flight, ' 84 f'{len(self.packets)} in queue' 85 ) 86 87 def check_queue(self) -> None: 88 while self.packets and self.in_flight < self.max_in_flight: 89 packet = self.packets.pop() 90 self.send(packet) 91 self.in_flight += 1 92 93 def on_packets_completed(self, packet_count: int) -> None: 94 if packet_count > self.in_flight: 95 logger.warning( 96 color( 97 '!!! {packet_count} completed but only ' 98 f'{self.in_flight} in flight' 99 ) 100 ) 101 packet_count = self.in_flight 102 103 self.in_flight -= packet_count 104 self.check_queue() 105 106 107# ----------------------------------------------------------------------------- 108class Connection: 109 def __init__( 110 self, host: Host, handle: int, peer_address: hci.Address, transport: int 111 ): 112 self.host = host 113 self.handle = handle 114 self.peer_address = peer_address 115 self.assembler = hci.HCI_AclDataPacketAssembler(self.on_acl_pdu) 116 self.transport = transport 117 acl_packet_queue: Optional[AclPacketQueue] = ( 118 host.le_acl_packet_queue 119 if transport == BT_LE_TRANSPORT 120 else host.acl_packet_queue 121 ) 122 assert acl_packet_queue 123 self.acl_packet_queue = acl_packet_queue 124 125 def on_hci_acl_data_packet(self, packet: hci.HCI_AclDataPacket) -> None: 126 self.assembler.feed_packet(packet) 127 128 def on_acl_pdu(self, pdu: bytes) -> None: 129 l2cap_pdu = L2CAP_PDU.from_bytes(pdu) 130 self.host.on_l2cap_pdu(self, l2cap_pdu.cid, l2cap_pdu.payload) 131 132 133# ----------------------------------------------------------------------------- 134@dataclasses.dataclass 135class ScoLink: 136 peer_address: hci.Address 137 handle: int 138 139 140# ----------------------------------------------------------------------------- 141@dataclasses.dataclass 142class CisLink: 143 peer_address: hci.Address 144 handle: int 145 146 147# ----------------------------------------------------------------------------- 148class Host(AbortableEventEmitter): 149 connections: Dict[int, Connection] 150 cis_links: Dict[int, CisLink] 151 sco_links: Dict[int, ScoLink] 152 acl_packet_queue: Optional[AclPacketQueue] = None 153 le_acl_packet_queue: Optional[AclPacketQueue] = None 154 hci_sink: Optional[TransportSink] = None 155 hci_metadata: Dict[str, Any] 156 long_term_key_provider: Optional[ 157 Callable[[int, bytes, int], Awaitable[Optional[bytes]]] 158 ] 159 link_key_provider: Optional[Callable[[hci.Address], Awaitable[Optional[bytes]]]] 160 161 def __init__( 162 self, 163 controller_source: Optional[TransportSource] = None, 164 controller_sink: Optional[TransportSink] = None, 165 ) -> None: 166 super().__init__() 167 168 self.hci_metadata = {} 169 self.ready = False # True when we can accept incoming packets 170 self.connections = {} # Connections, by connection handle 171 self.cis_links = {} # CIS links, by connection handle 172 self.sco_links = {} # SCO links, by connection handle 173 self.pending_command = None 174 self.pending_response = None 175 self.number_of_supported_advertising_sets = 0 176 self.maximum_advertising_data_length = 31 177 self.local_version = None 178 self.local_supported_commands = 0 179 self.local_le_features = 0 180 self.local_lmp_features = hci.LmpFeatureMask(0) # Classic LMP features 181 self.suggested_max_tx_octets = 251 # Max allowed 182 self.suggested_max_tx_time = 2120 # Max allowed 183 self.command_semaphore = asyncio.Semaphore(1) 184 self.long_term_key_provider = None 185 self.link_key_provider = None 186 self.pairing_io_capability_provider = None # Classic only 187 self.snooper: Optional[Snooper] = None 188 189 # Connect to the source and sink if specified 190 if controller_source: 191 self.set_packet_source(controller_source) 192 if controller_sink: 193 self.set_packet_sink(controller_sink) 194 195 def find_connection_by_bd_addr( 196 self, 197 bd_addr: hci.Address, 198 transport: Optional[int] = None, 199 check_address_type: bool = False, 200 ) -> Optional[Connection]: 201 for connection in self.connections.values(): 202 if connection.peer_address.to_bytes() == bd_addr.to_bytes(): 203 if ( 204 check_address_type 205 and connection.peer_address.address_type != bd_addr.address_type 206 ): 207 continue 208 if transport is None or connection.transport == transport: 209 return connection 210 211 return None 212 213 async def flush(self) -> None: 214 # Make sure no command is pending 215 await self.command_semaphore.acquire() 216 217 # Flush current host state, then release command semaphore 218 self.emit('flush') 219 self.command_semaphore.release() 220 221 async def reset(self, driver_factory=drivers.get_driver_for_host): 222 if self.ready: 223 self.ready = False 224 await self.flush() 225 226 # Instantiate and init a driver for the host if needed. 227 # NOTE: we don't keep a reference to the driver here, because we don't 228 # currently have a need for the driver later on. But if the driver interface 229 # evolves, it may be required, then, to store a reference to the driver in 230 # an object property. 231 reset_needed = True 232 if driver_factory is not None: 233 if driver := await driver_factory(self): 234 await driver.init_controller() 235 reset_needed = False 236 237 # Send a reset command unless a driver has already done so. 238 if reset_needed: 239 await self.send_command(hci.HCI_Reset_Command(), check_result=True) 240 self.ready = True 241 242 response = await self.send_command( 243 hci.HCI_Read_Local_Supported_Commands_Command(), check_result=True 244 ) 245 self.local_supported_commands = int.from_bytes( 246 response.return_parameters.supported_commands, 'little' 247 ) 248 249 if self.supports_command(hci.HCI_LE_READ_LOCAL_SUPPORTED_FEATURES_COMMAND): 250 response = await self.send_command( 251 hci.HCI_LE_Read_Local_Supported_Features_Command(), check_result=True 252 ) 253 self.local_le_features = struct.unpack( 254 '<Q', response.return_parameters.le_features 255 )[0] 256 257 if self.supports_command(hci.HCI_READ_LOCAL_VERSION_INFORMATION_COMMAND): 258 response = await self.send_command( 259 hci.HCI_Read_Local_Version_Information_Command(), check_result=True 260 ) 261 self.local_version = response.return_parameters 262 263 if self.supports_command(hci.HCI_READ_LOCAL_EXTENDED_FEATURES_COMMAND): 264 max_page_number = 0 265 page_number = 0 266 lmp_features = 0 267 while page_number <= max_page_number: 268 response = await self.send_command( 269 hci.HCI_Read_Local_Extended_Features_Command( 270 page_number=page_number 271 ), 272 check_result=True, 273 ) 274 lmp_features |= int.from_bytes( 275 response.return_parameters.extended_lmp_features, 'little' 276 ) << (64 * page_number) 277 max_page_number = response.return_parameters.maximum_page_number 278 page_number += 1 279 self.local_lmp_features = hci.LmpFeatureMask(lmp_features) 280 281 elif self.supports_command(hci.HCI_READ_LOCAL_SUPPORTED_FEATURES_COMMAND): 282 response = await self.send_command( 283 hci.HCI_Read_Local_Supported_Features_Command(), check_result=True 284 ) 285 self.local_lmp_features = hci.LmpFeatureMask( 286 int.from_bytes(response.return_parameters.lmp_features, 'little') 287 ) 288 289 await self.send_command( 290 hci.HCI_Set_Event_Mask_Command( 291 event_mask=hci.HCI_Set_Event_Mask_Command.mask( 292 [ 293 hci.HCI_INQUIRY_COMPLETE_EVENT, 294 hci.HCI_INQUIRY_RESULT_EVENT, 295 hci.HCI_CONNECTION_COMPLETE_EVENT, 296 hci.HCI_CONNECTION_REQUEST_EVENT, 297 hci.HCI_DISCONNECTION_COMPLETE_EVENT, 298 hci.HCI_AUTHENTICATION_COMPLETE_EVENT, 299 hci.HCI_REMOTE_NAME_REQUEST_COMPLETE_EVENT, 300 hci.HCI_ENCRYPTION_CHANGE_EVENT, 301 hci.HCI_CHANGE_CONNECTION_LINK_KEY_COMPLETE_EVENT, 302 hci.HCI_LINK_KEY_TYPE_CHANGED_EVENT, 303 hci.HCI_READ_REMOTE_SUPPORTED_FEATURES_COMPLETE_EVENT, 304 hci.HCI_READ_REMOTE_VERSION_INFORMATION_COMPLETE_EVENT, 305 hci.HCI_QOS_SETUP_COMPLETE_EVENT, 306 hci.HCI_HARDWARE_ERROR_EVENT, 307 hci.HCI_FLUSH_OCCURRED_EVENT, 308 hci.HCI_ROLE_CHANGE_EVENT, 309 hci.HCI_MODE_CHANGE_EVENT, 310 hci.HCI_RETURN_LINK_KEYS_EVENT, 311 hci.HCI_PIN_CODE_REQUEST_EVENT, 312 hci.HCI_LINK_KEY_REQUEST_EVENT, 313 hci.HCI_LINK_KEY_NOTIFICATION_EVENT, 314 hci.HCI_LOOPBACK_COMMAND_EVENT, 315 hci.HCI_DATA_BUFFER_OVERFLOW_EVENT, 316 hci.HCI_MAX_SLOTS_CHANGE_EVENT, 317 hci.HCI_READ_CLOCK_OFFSET_COMPLETE_EVENT, 318 hci.HCI_CONNECTION_PACKET_TYPE_CHANGED_EVENT, 319 hci.HCI_QOS_VIOLATION_EVENT, 320 hci.HCI_PAGE_SCAN_REPETITION_MODE_CHANGE_EVENT, 321 hci.HCI_FLOW_SPECIFICATION_COMPLETE_EVENT, 322 hci.HCI_INQUIRY_RESULT_WITH_RSSI_EVENT, 323 hci.HCI_READ_REMOTE_EXTENDED_FEATURES_COMPLETE_EVENT, 324 hci.HCI_SYNCHRONOUS_CONNECTION_COMPLETE_EVENT, 325 hci.HCI_SYNCHRONOUS_CONNECTION_CHANGED_EVENT, 326 hci.HCI_SNIFF_SUBRATING_EVENT, 327 hci.HCI_EXTENDED_INQUIRY_RESULT_EVENT, 328 hci.HCI_ENCRYPTION_KEY_REFRESH_COMPLETE_EVENT, 329 hci.HCI_IO_CAPABILITY_REQUEST_EVENT, 330 hci.HCI_IO_CAPABILITY_RESPONSE_EVENT, 331 hci.HCI_USER_CONFIRMATION_REQUEST_EVENT, 332 hci.HCI_USER_PASSKEY_REQUEST_EVENT, 333 hci.HCI_REMOTE_OOB_DATA_REQUEST_EVENT, 334 hci.HCI_SIMPLE_PAIRING_COMPLETE_EVENT, 335 hci.HCI_LINK_SUPERVISION_TIMEOUT_CHANGED_EVENT, 336 hci.HCI_ENHANCED_FLUSH_COMPLETE_EVENT, 337 hci.HCI_USER_PASSKEY_NOTIFICATION_EVENT, 338 hci.HCI_KEYPRESS_NOTIFICATION_EVENT, 339 hci.HCI_REMOTE_HOST_SUPPORTED_FEATURES_NOTIFICATION_EVENT, 340 hci.HCI_LE_META_EVENT, 341 ] 342 ) 343 ) 344 ) 345 346 if ( 347 self.local_version is not None 348 and self.local_version.hci_version <= hci.HCI_VERSION_BLUETOOTH_CORE_4_0 349 ): 350 # Some older controllers don't like event masks with bits they don't 351 # understand 352 le_event_mask = bytes.fromhex('1F00000000000000') 353 else: 354 le_event_mask = hci.HCI_LE_Set_Event_Mask_Command.mask( 355 [ 356 hci.HCI_LE_CONNECTION_COMPLETE_EVENT, 357 hci.HCI_LE_ADVERTISING_REPORT_EVENT, 358 hci.HCI_LE_CONNECTION_UPDATE_COMPLETE_EVENT, 359 hci.HCI_LE_READ_REMOTE_FEATURES_COMPLETE_EVENT, 360 hci.HCI_LE_LONG_TERM_KEY_REQUEST_EVENT, 361 hci.HCI_LE_REMOTE_CONNECTION_PARAMETER_REQUEST_EVENT, 362 hci.HCI_LE_DATA_LENGTH_CHANGE_EVENT, 363 hci.HCI_LE_READ_LOCAL_P_256_PUBLIC_KEY_COMPLETE_EVENT, 364 hci.HCI_LE_GENERATE_DHKEY_COMPLETE_EVENT, 365 hci.HCI_LE_ENHANCED_CONNECTION_COMPLETE_EVENT, 366 hci.HCI_LE_DIRECTED_ADVERTISING_REPORT_EVENT, 367 hci.HCI_LE_PHY_UPDATE_COMPLETE_EVENT, 368 hci.HCI_LE_EXTENDED_ADVERTISING_REPORT_EVENT, 369 hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_ESTABLISHED_EVENT, 370 hci.HCI_LE_PERIODIC_ADVERTISING_REPORT_EVENT, 371 hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_LOST_EVENT, 372 hci.HCI_LE_SCAN_TIMEOUT_EVENT, 373 hci.HCI_LE_ADVERTISING_SET_TERMINATED_EVENT, 374 hci.HCI_LE_SCAN_REQUEST_RECEIVED_EVENT, 375 hci.HCI_LE_CONNECTIONLESS_IQ_REPORT_EVENT, 376 hci.HCI_LE_CONNECTION_IQ_REPORT_EVENT, 377 hci.HCI_LE_CTE_REQUEST_FAILED_EVENT, 378 hci.HCI_LE_PERIODIC_ADVERTISING_SYNC_TRANSFER_RECEIVED_EVENT, 379 hci.HCI_LE_CIS_ESTABLISHED_EVENT, 380 hci.HCI_LE_CIS_REQUEST_EVENT, 381 hci.HCI_LE_CREATE_BIG_COMPLETE_EVENT, 382 hci.HCI_LE_TERMINATE_BIG_COMPLETE_EVENT, 383 hci.HCI_LE_BIG_SYNC_ESTABLISHED_EVENT, 384 hci.HCI_LE_BIG_SYNC_LOST_EVENT, 385 hci.HCI_LE_REQUEST_PEER_SCA_COMPLETE_EVENT, 386 hci.HCI_LE_PATH_LOSS_THRESHOLD_EVENT, 387 hci.HCI_LE_TRANSMIT_POWER_REPORTING_EVENT, 388 hci.HCI_LE_BIGINFO_ADVERTISING_REPORT_EVENT, 389 hci.HCI_LE_SUBRATE_CHANGE_EVENT, 390 ] 391 ) 392 393 await self.send_command( 394 hci.HCI_LE_Set_Event_Mask_Command(le_event_mask=le_event_mask) 395 ) 396 397 if self.supports_command(hci.HCI_READ_BUFFER_SIZE_COMMAND): 398 response = await self.send_command( 399 hci.HCI_Read_Buffer_Size_Command(), check_result=True 400 ) 401 hc_acl_data_packet_length = ( 402 response.return_parameters.hc_acl_data_packet_length 403 ) 404 hc_total_num_acl_data_packets = ( 405 response.return_parameters.hc_total_num_acl_data_packets 406 ) 407 408 logger.debug( 409 'HCI ACL flow control: ' 410 f'hc_acl_data_packet_length={hc_acl_data_packet_length},' 411 f'hc_total_num_acl_data_packets={hc_total_num_acl_data_packets}' 412 ) 413 414 self.acl_packet_queue = AclPacketQueue( 415 max_packet_size=hc_acl_data_packet_length, 416 max_in_flight=hc_total_num_acl_data_packets, 417 send=self.send_hci_packet, 418 ) 419 420 hc_le_acl_data_packet_length = 0 421 hc_total_num_le_acl_data_packets = 0 422 if self.supports_command(hci.HCI_LE_READ_BUFFER_SIZE_COMMAND): 423 response = await self.send_command( 424 hci.HCI_LE_Read_Buffer_Size_Command(), check_result=True 425 ) 426 hc_le_acl_data_packet_length = ( 427 response.return_parameters.hc_le_acl_data_packet_length 428 ) 429 hc_total_num_le_acl_data_packets = ( 430 response.return_parameters.hc_total_num_le_acl_data_packets 431 ) 432 433 logger.debug( 434 'HCI LE ACL flow control: ' 435 f'hc_le_acl_data_packet_length={hc_le_acl_data_packet_length},' 436 f'hc_total_num_le_acl_data_packets={hc_total_num_le_acl_data_packets}' 437 ) 438 439 if hc_le_acl_data_packet_length == 0 or hc_total_num_le_acl_data_packets == 0: 440 # LE and Classic share the same queue 441 self.le_acl_packet_queue = self.acl_packet_queue 442 else: 443 # Create a separate queue for LE 444 self.le_acl_packet_queue = AclPacketQueue( 445 max_packet_size=hc_le_acl_data_packet_length, 446 max_in_flight=hc_total_num_le_acl_data_packets, 447 send=self.send_hci_packet, 448 ) 449 450 if self.supports_command( 451 hci.HCI_LE_READ_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND 452 ) and self.supports_command( 453 hci.HCI_LE_WRITE_SUGGESTED_DEFAULT_DATA_LENGTH_COMMAND 454 ): 455 response = await self.send_command( 456 hci.HCI_LE_Read_Suggested_Default_Data_Length_Command() 457 ) 458 suggested_max_tx_octets = response.return_parameters.suggested_max_tx_octets 459 suggested_max_tx_time = response.return_parameters.suggested_max_tx_time 460 if ( 461 suggested_max_tx_octets != self.suggested_max_tx_octets 462 or suggested_max_tx_time != self.suggested_max_tx_time 463 ): 464 await self.send_command( 465 hci.HCI_LE_Write_Suggested_Default_Data_Length_Command( 466 suggested_max_tx_octets=self.suggested_max_tx_octets, 467 suggested_max_tx_time=self.suggested_max_tx_time, 468 ) 469 ) 470 471 if self.supports_command( 472 hci.HCI_LE_READ_NUMBER_OF_SUPPORTED_ADVERTISING_SETS_COMMAND 473 ): 474 response = await self.send_command( 475 hci.HCI_LE_Read_Number_Of_Supported_Advertising_Sets_Command(), 476 check_result=True, 477 ) 478 self.number_of_supported_advertising_sets = ( 479 response.return_parameters.num_supported_advertising_sets 480 ) 481 482 if self.supports_command( 483 hci.HCI_LE_READ_MAXIMUM_ADVERTISING_DATA_LENGTH_COMMAND 484 ): 485 response = await self.send_command( 486 hci.HCI_LE_Read_Maximum_Advertising_Data_Length_Command(), 487 check_result=True, 488 ) 489 self.maximum_advertising_data_length = ( 490 response.return_parameters.max_advertising_data_length 491 ) 492 493 @property 494 def controller(self) -> Optional[TransportSink]: 495 return self.hci_sink 496 497 @controller.setter 498 def controller(self, controller) -> None: 499 self.set_packet_sink(controller) 500 if controller: 501 self.set_packet_source(controller) 502 503 def set_packet_sink(self, sink: Optional[TransportSink]) -> None: 504 self.hci_sink = sink 505 506 def set_packet_source(self, source: TransportSource) -> None: 507 source.set_packet_sink(self) 508 self.hci_metadata = getattr(source, 'metadata', self.hci_metadata) 509 510 def send_hci_packet(self, packet: hci.HCI_Packet) -> None: 511 logger.debug(f'{color("### HOST -> CONTROLLER", "blue")}: {packet}') 512 if self.snooper: 513 self.snooper.snoop(bytes(packet), Snooper.Direction.HOST_TO_CONTROLLER) 514 if self.hci_sink: 515 self.hci_sink.on_packet(bytes(packet)) 516 517 async def send_command(self, command, check_result=False): 518 # Wait until we can send (only one pending command at a time) 519 async with self.command_semaphore: 520 assert self.pending_command is None 521 assert self.pending_response is None 522 523 # Create a future value to hold the eventual response 524 self.pending_response = asyncio.get_running_loop().create_future() 525 self.pending_command = command 526 527 try: 528 self.send_hci_packet(command) 529 response = await self.pending_response 530 531 # Check the return parameters if required 532 if check_result: 533 if isinstance(response, hci.HCI_Command_Status_Event): 534 status = response.status 535 elif isinstance(response.return_parameters, int): 536 status = response.return_parameters 537 elif isinstance(response.return_parameters, bytes): 538 # return parameters first field is a one byte status code 539 status = response.return_parameters[0] 540 else: 541 status = response.return_parameters.status 542 543 if status != hci.HCI_SUCCESS: 544 logger.warning( 545 f'{command.name} failed ' 546 f'({hci.HCI_Constant.error_name(status)})' 547 ) 548 raise hci.HCI_Error(status) 549 550 return response 551 except Exception as error: 552 logger.warning( 553 f'{color("!!! Exception while sending command:", "red")} {error}' 554 ) 555 raise error 556 finally: 557 self.pending_command = None 558 self.pending_response = None 559 560 # Use this method to send a command from a task 561 def send_command_sync(self, command: hci.HCI_Command) -> None: 562 async def send_command(command: hci.HCI_Command) -> None: 563 await self.send_command(command) 564 565 asyncio.create_task(send_command(command)) 566 567 def send_l2cap_pdu(self, connection_handle: int, cid: int, pdu: bytes) -> None: 568 if not (connection := self.connections.get(connection_handle)): 569 logger.warning(f'connection 0x{connection_handle:04X} not found') 570 return 571 packet_queue = connection.acl_packet_queue 572 if packet_queue is None: 573 logger.warning( 574 f'no ACL packet queue for connection 0x{connection_handle:04X}' 575 ) 576 return 577 578 # Create a PDU 579 l2cap_pdu = bytes(L2CAP_PDU(cid, pdu)) 580 581 # Send the data to the controller via ACL packets 582 bytes_remaining = len(l2cap_pdu) 583 offset = 0 584 pb_flag = 0 585 while bytes_remaining: 586 data_total_length = min(bytes_remaining, packet_queue.max_packet_size) 587 acl_packet = hci.HCI_AclDataPacket( 588 connection_handle=connection_handle, 589 pb_flag=pb_flag, 590 bc_flag=0, 591 data_total_length=data_total_length, 592 data=l2cap_pdu[offset : offset + data_total_length], 593 ) 594 logger.debug(f'>>> ACL packet enqueue: (CID={cid}) {acl_packet}') 595 packet_queue.enqueue(acl_packet) 596 pb_flag = 1 597 offset += data_total_length 598 bytes_remaining -= data_total_length 599 600 def supports_command(self, op_code: int) -> bool: 601 return ( 602 self.local_supported_commands 603 & hci.HCI_SUPPORTED_COMMANDS_MASKS.get(op_code, 0) 604 ) != 0 605 606 @property 607 def supported_commands(self) -> Set[int]: 608 return set( 609 op_code 610 for op_code, mask in hci.HCI_SUPPORTED_COMMANDS_MASKS.items() 611 if self.local_supported_commands & mask 612 ) 613 614 def supports_le_features(self, feature: hci.LeFeatureMask) -> bool: 615 return (self.local_le_features & feature) == feature 616 617 def supports_lmp_features(self, feature: hci.LmpFeatureMask) -> bool: 618 return self.local_lmp_features & (feature) == feature 619 620 @property 621 def supported_le_features(self): 622 return [ 623 feature for feature in range(64) if self.local_le_features & (1 << feature) 624 ] 625 626 # Packet Sink protocol (packets coming from the controller via HCI) 627 def on_packet(self, packet: bytes) -> None: 628 hci_packet = hci.HCI_Packet.from_bytes(packet) 629 if self.ready or ( 630 isinstance(hci_packet, hci.HCI_Command_Complete_Event) 631 and hci_packet.command_opcode == hci.HCI_RESET_COMMAND 632 ): 633 self.on_hci_packet(hci_packet) 634 else: 635 logger.debug('reset not done, ignoring packet from controller') 636 637 def on_transport_lost(self): 638 # Called by the source when the transport has been lost. 639 if self.pending_response: 640 self.pending_response.set_exception(TransportLostError('transport lost')) 641 642 self.emit('flush') 643 644 def on_hci_packet(self, packet: hci.HCI_Packet) -> None: 645 logger.debug(f'{color("### CONTROLLER -> HOST", "green")}: {packet}') 646 647 if self.snooper: 648 self.snooper.snoop(bytes(packet), Snooper.Direction.CONTROLLER_TO_HOST) 649 650 # If the packet is a command, invoke the handler for this packet 651 if packet.hci_packet_type == hci.HCI_COMMAND_PACKET: 652 self.on_hci_command_packet(cast(hci.HCI_Command, packet)) 653 elif packet.hci_packet_type == hci.HCI_EVENT_PACKET: 654 self.on_hci_event_packet(cast(hci.HCI_Event, packet)) 655 elif packet.hci_packet_type == hci.HCI_ACL_DATA_PACKET: 656 self.on_hci_acl_data_packet(cast(hci.HCI_AclDataPacket, packet)) 657 elif packet.hci_packet_type == hci.HCI_SYNCHRONOUS_DATA_PACKET: 658 self.on_hci_sco_data_packet(cast(hci.HCI_SynchronousDataPacket, packet)) 659 elif packet.hci_packet_type == hci.HCI_ISO_DATA_PACKET: 660 self.on_hci_iso_data_packet(cast(hci.HCI_IsoDataPacket, packet)) 661 else: 662 logger.warning(f'!!! unknown packet type {packet.hci_packet_type}') 663 664 def on_hci_command_packet(self, command: hci.HCI_Command) -> None: 665 logger.warning(f'!!! unexpected command packet: {command}') 666 667 def on_hci_event_packet(self, event: hci.HCI_Event) -> None: 668 handler_name = f'on_{event.name.lower()}' 669 handler = getattr(self, handler_name, self.on_hci_event) 670 handler(event) 671 672 def on_hci_acl_data_packet(self, packet: hci.HCI_AclDataPacket) -> None: 673 # Look for the connection to which this data belongs 674 if connection := self.connections.get(packet.connection_handle): 675 connection.on_hci_acl_data_packet(packet) 676 677 def on_hci_sco_data_packet(self, packet: hci.HCI_SynchronousDataPacket) -> None: 678 # Experimental 679 self.emit('sco_packet', packet.connection_handle, packet) 680 681 def on_hci_iso_data_packet(self, packet: hci.HCI_IsoDataPacket) -> None: 682 # Experimental 683 self.emit('iso_packet', packet.connection_handle, packet) 684 685 def on_l2cap_pdu(self, connection: Connection, cid: int, pdu: bytes) -> None: 686 self.emit('l2cap_pdu', connection.handle, cid, pdu) 687 688 def on_command_processed(self, event): 689 if self.pending_response: 690 # Check that it is what we were expecting 691 if self.pending_command.op_code != event.command_opcode: 692 logger.warning( 693 '!!! command result mismatch, expected ' 694 f'0x{self.pending_command.op_code:X} but got ' 695 f'0x{event.command_opcode:X}' 696 ) 697 698 self.pending_response.set_result(event) 699 else: 700 logger.warning('!!! no pending response future to set') 701 702 ############################################################ 703 # HCI handlers 704 ############################################################ 705 def on_hci_event(self, event): 706 logger.warning(f'{color(f"--- Ignoring event {event}", "red")}') 707 708 def on_hci_command_complete_event(self, event): 709 if event.command_opcode == 0: 710 # This is used just for the Num_HCI_Command_Packets field, not related to 711 # an actual command 712 logger.debug('no-command event') 713 return 714 715 return self.on_command_processed(event) 716 717 def on_hci_command_status_event(self, event): 718 return self.on_command_processed(event) 719 720 def on_hci_number_of_completed_packets_event(self, event): 721 for connection_handle, num_completed_packets in zip( 722 event.connection_handles, event.num_completed_packets 723 ): 724 if connection := self.connections.get(connection_handle): 725 connection.acl_packet_queue.on_packets_completed(num_completed_packets) 726 elif not ( 727 self.cis_links.get(connection_handle) 728 or self.sco_links.get(connection_handle) 729 ): 730 logger.warning( 731 'received packet completion event for unknown handle ' 732 f'0x{connection_handle:04X}' 733 ) 734 735 # Classic only 736 def on_hci_connection_request_event(self, event): 737 # Notify the listeners 738 self.emit( 739 'connection_request', 740 event.bd_addr, 741 event.class_of_device, 742 event.link_type, 743 ) 744 745 def on_hci_le_connection_complete_event(self, event): 746 # Check if this is a cancellation 747 if event.status == hci.HCI_SUCCESS: 748 # Create/update the connection 749 logger.debug( 750 f'### LE CONNECTION: [0x{event.connection_handle:04X}] ' 751 f'{event.peer_address} as {hci.HCI_Constant.role_name(event.role)}' 752 ) 753 754 connection = self.connections.get(event.connection_handle) 755 if connection is None: 756 connection = Connection( 757 self, 758 event.connection_handle, 759 event.peer_address, 760 BT_LE_TRANSPORT, 761 ) 762 self.connections[event.connection_handle] = connection 763 764 # Notify the client 765 connection_parameters = ConnectionParameters( 766 event.connection_interval, 767 event.peripheral_latency, 768 event.supervision_timeout, 769 ) 770 self.emit( 771 'connection', 772 event.connection_handle, 773 BT_LE_TRANSPORT, 774 event.peer_address, 775 event.role, 776 connection_parameters, 777 ) 778 else: 779 logger.debug(f'### CONNECTION FAILED: {event.status}') 780 781 # Notify the listeners 782 self.emit( 783 'connection_failure', BT_LE_TRANSPORT, event.peer_address, event.status 784 ) 785 786 def on_hci_le_enhanced_connection_complete_event(self, event): 787 # Just use the same implementation as for the non-enhanced event for now 788 self.on_hci_le_connection_complete_event(event) 789 790 def on_hci_connection_complete_event(self, event): 791 if event.status == hci.HCI_SUCCESS: 792 # Create/update the connection 793 logger.debug( 794 f'### BR/EDR CONNECTION: [0x{event.connection_handle:04X}] ' 795 f'{event.bd_addr}' 796 ) 797 798 connection = self.connections.get(event.connection_handle) 799 if connection is None: 800 connection = Connection( 801 self, 802 event.connection_handle, 803 event.bd_addr, 804 BT_BR_EDR_TRANSPORT, 805 ) 806 self.connections[event.connection_handle] = connection 807 808 # Notify the client 809 self.emit( 810 'connection', 811 event.connection_handle, 812 BT_BR_EDR_TRANSPORT, 813 event.bd_addr, 814 None, 815 None, 816 ) 817 else: 818 logger.debug(f'### BR/EDR CONNECTION FAILED: {event.status}') 819 820 # Notify the client 821 self.emit( 822 'connection_failure', BT_BR_EDR_TRANSPORT, event.bd_addr, event.status 823 ) 824 825 def on_hci_disconnection_complete_event(self, event): 826 # Find the connection 827 handle = event.connection_handle 828 if ( 829 connection := ( 830 self.connections.get(handle) 831 or self.cis_links.get(handle) 832 or self.sco_links.get(handle) 833 ) 834 ) is None: 835 logger.warning('!!! DISCONNECTION COMPLETE: unknown handle') 836 return 837 838 if event.status == hci.HCI_SUCCESS: 839 logger.debug( 840 f'### DISCONNECTION: [0x{handle:04X}] ' 841 f'{connection.peer_address} ' 842 f'reason={event.reason}' 843 ) 844 845 # Notify the listeners 846 self.emit('disconnection', handle, event.reason) 847 848 # Remove the handle reference 849 _ = ( 850 self.connections.pop(handle, 0) 851 or self.cis_links.pop(handle, 0) 852 or self.sco_links.pop(handle, 0) 853 ) 854 else: 855 logger.debug(f'### DISCONNECTION FAILED: {event.status}') 856 857 # Notify the listeners 858 self.emit('disconnection_failure', handle, event.status) 859 860 def on_hci_le_connection_update_complete_event(self, event): 861 if (connection := self.connections.get(event.connection_handle)) is None: 862 logger.warning('!!! CONNECTION PARAMETERS UPDATE COMPLETE: unknown handle') 863 return 864 865 # Notify the client 866 if event.status == hci.HCI_SUCCESS: 867 connection_parameters = ConnectionParameters( 868 event.connection_interval, 869 event.peripheral_latency, 870 event.supervision_timeout, 871 ) 872 self.emit( 873 'connection_parameters_update', connection.handle, connection_parameters 874 ) 875 else: 876 self.emit( 877 'connection_parameters_update_failure', connection.handle, event.status 878 ) 879 880 def on_hci_le_phy_update_complete_event(self, event): 881 if (connection := self.connections.get(event.connection_handle)) is None: 882 logger.warning('!!! CONNECTION PHY UPDATE COMPLETE: unknown handle') 883 return 884 885 # Notify the client 886 if event.status == hci.HCI_SUCCESS: 887 connection_phy = ConnectionPHY(event.tx_phy, event.rx_phy) 888 self.emit('connection_phy_update', connection.handle, connection_phy) 889 else: 890 self.emit('connection_phy_update_failure', connection.handle, event.status) 891 892 def on_hci_le_advertising_report_event(self, event): 893 for report in event.reports: 894 self.emit('advertising_report', report) 895 896 def on_hci_le_extended_advertising_report_event(self, event): 897 self.on_hci_le_advertising_report_event(event) 898 899 def on_hci_le_advertising_set_terminated_event(self, event): 900 self.emit( 901 'advertising_set_termination', 902 event.status, 903 event.advertising_handle, 904 event.connection_handle, 905 event.num_completed_extended_advertising_events, 906 ) 907 908 def on_hci_le_cis_request_event(self, event): 909 self.emit( 910 'cis_request', 911 event.acl_connection_handle, 912 event.cis_connection_handle, 913 event.cig_id, 914 event.cis_id, 915 ) 916 917 def on_hci_le_cis_established_event(self, event): 918 # The remaining parameters are unused for now. 919 if event.status == hci.HCI_SUCCESS: 920 self.cis_links[event.connection_handle] = CisLink( 921 handle=event.connection_handle, 922 peer_address=hci.Address.ANY, 923 ) 924 self.emit('cis_establishment', event.connection_handle) 925 else: 926 self.emit( 927 'cis_establishment_failure', event.connection_handle, event.status 928 ) 929 930 def on_hci_le_remote_connection_parameter_request_event(self, event): 931 if event.connection_handle not in self.connections: 932 logger.warning('!!! REMOTE CONNECTION PARAMETER REQUEST: unknown handle') 933 return 934 935 # For now, just accept everything 936 # TODO: delegate the decision 937 self.send_command_sync( 938 hci.HCI_LE_Remote_Connection_Parameter_Request_Reply_Command( 939 connection_handle=event.connection_handle, 940 interval_min=event.interval_min, 941 interval_max=event.interval_max, 942 max_latency=event.max_latency, 943 timeout=event.timeout, 944 min_ce_length=0, 945 max_ce_length=0, 946 ) 947 ) 948 949 def on_hci_le_long_term_key_request_event(self, event): 950 if (connection := self.connections.get(event.connection_handle)) is None: 951 logger.warning('!!! LE LONG TERM KEY REQUEST: unknown handle') 952 return 953 954 async def send_long_term_key(): 955 if self.long_term_key_provider is None: 956 logger.debug('no long term key provider') 957 long_term_key = None 958 else: 959 long_term_key = await self.abort_on( 960 'flush', 961 # pylint: disable-next=not-callable 962 self.long_term_key_provider( 963 connection.handle, 964 event.random_number, 965 event.encryption_diversifier, 966 ), 967 ) 968 if long_term_key: 969 response = hci.HCI_LE_Long_Term_Key_Request_Reply_Command( 970 connection_handle=event.connection_handle, 971 long_term_key=long_term_key, 972 ) 973 else: 974 response = hci.HCI_LE_Long_Term_Key_Request_Negative_Reply_Command( 975 connection_handle=event.connection_handle 976 ) 977 978 await self.send_command(response) 979 980 asyncio.create_task(send_long_term_key()) 981 982 def on_hci_synchronous_connection_complete_event(self, event): 983 if event.status == hci.HCI_SUCCESS: 984 # Create/update the connection 985 logger.debug( 986 f'### SCO CONNECTION: [0x{event.connection_handle:04X}] ' 987 f'{event.bd_addr}' 988 ) 989 990 self.sco_links[event.connection_handle] = ScoLink( 991 peer_address=event.bd_addr, 992 handle=event.connection_handle, 993 ) 994 995 # Notify the client 996 self.emit( 997 'sco_connection', 998 event.bd_addr, 999 event.connection_handle, 1000 event.link_type, 1001 ) 1002 else: 1003 logger.debug(f'### SCO CONNECTION FAILED: {event.status}') 1004 1005 # Notify the client 1006 self.emit('sco_connection_failure', event.bd_addr, event.status) 1007 1008 def on_hci_synchronous_connection_changed_event(self, event): 1009 pass 1010 1011 def on_hci_role_change_event(self, event): 1012 if event.status == hci.HCI_SUCCESS: 1013 logger.debug( 1014 f'role change for {event.bd_addr}: ' 1015 f'{hci.HCI_Constant.role_name(event.new_role)}' 1016 ) 1017 self.emit('role_change', event.bd_addr, event.new_role) 1018 else: 1019 logger.debug( 1020 f'role change for {event.bd_addr} failed: ' 1021 f'{hci.HCI_Constant.error_name(event.status)}' 1022 ) 1023 self.emit('role_change_failure', event.bd_addr, event.status) 1024 1025 def on_hci_le_data_length_change_event(self, event): 1026 self.emit( 1027 'connection_data_length_change', 1028 event.connection_handle, 1029 event.max_tx_octets, 1030 event.max_tx_time, 1031 event.max_rx_octets, 1032 event.max_rx_time, 1033 ) 1034 1035 def on_hci_authentication_complete_event(self, event): 1036 # Notify the client 1037 if event.status == hci.HCI_SUCCESS: 1038 self.emit('connection_authentication', event.connection_handle) 1039 else: 1040 self.emit( 1041 'connection_authentication_failure', 1042 event.connection_handle, 1043 event.status, 1044 ) 1045 1046 def on_hci_encryption_change_event(self, event): 1047 # Notify the client 1048 if event.status == hci.HCI_SUCCESS: 1049 self.emit( 1050 'connection_encryption_change', 1051 event.connection_handle, 1052 event.encryption_enabled, 1053 ) 1054 else: 1055 self.emit( 1056 'connection_encryption_failure', event.connection_handle, event.status 1057 ) 1058 1059 def on_hci_encryption_key_refresh_complete_event(self, event): 1060 # Notify the client 1061 if event.status == hci.HCI_SUCCESS: 1062 self.emit('connection_encryption_key_refresh', event.connection_handle) 1063 else: 1064 self.emit( 1065 'connection_encryption_key_refresh_failure', 1066 event.connection_handle, 1067 event.status, 1068 ) 1069 1070 def on_hci_link_supervision_timeout_changed_event(self, event): 1071 pass 1072 1073 def on_hci_max_slots_change_event(self, event): 1074 pass 1075 1076 def on_hci_page_scan_repetition_mode_change_event(self, event): 1077 pass 1078 1079 def on_hci_link_key_notification_event(self, event): 1080 logger.debug( 1081 f'link key for {event.bd_addr}: {event.link_key.hex()}, ' 1082 f'type={hci.HCI_Constant.link_key_type_name(event.key_type)}' 1083 ) 1084 self.emit('link_key', event.bd_addr, event.link_key, event.key_type) 1085 1086 def on_hci_simple_pairing_complete_event(self, event): 1087 logger.debug( 1088 f'simple pairing complete for {event.bd_addr}: ' 1089 f'status={hci.HCI_Constant.status_name(event.status)}' 1090 ) 1091 if event.status == hci.HCI_SUCCESS: 1092 self.emit('classic_pairing', event.bd_addr) 1093 else: 1094 self.emit('classic_pairing_failure', event.bd_addr, event.status) 1095 1096 def on_hci_pin_code_request_event(self, event): 1097 self.emit('pin_code_request', event.bd_addr) 1098 1099 def on_hci_link_key_request_event(self, event): 1100 async def send_link_key(): 1101 if self.link_key_provider is None: 1102 logger.debug('no link key provider') 1103 link_key = None 1104 else: 1105 link_key = await self.abort_on( 1106 'flush', 1107 # pylint: disable-next=not-callable 1108 self.link_key_provider(event.bd_addr), 1109 ) 1110 if link_key: 1111 response = hci.HCI_Link_Key_Request_Reply_Command( 1112 bd_addr=event.bd_addr, link_key=link_key 1113 ) 1114 else: 1115 response = hci.HCI_Link_Key_Request_Negative_Reply_Command( 1116 bd_addr=event.bd_addr 1117 ) 1118 1119 await self.send_command(response) 1120 1121 asyncio.create_task(send_link_key()) 1122 1123 def on_hci_io_capability_request_event(self, event): 1124 self.emit('authentication_io_capability_request', event.bd_addr) 1125 1126 def on_hci_io_capability_response_event(self, event): 1127 self.emit( 1128 'authentication_io_capability_response', 1129 event.bd_addr, 1130 event.io_capability, 1131 event.authentication_requirements, 1132 ) 1133 1134 def on_hci_user_confirmation_request_event(self, event): 1135 self.emit( 1136 'authentication_user_confirmation_request', 1137 event.bd_addr, 1138 event.numeric_value, 1139 ) 1140 1141 def on_hci_user_passkey_request_event(self, event): 1142 self.emit('authentication_user_passkey_request', event.bd_addr) 1143 1144 def on_hci_user_passkey_notification_event(self, event): 1145 self.emit( 1146 'authentication_user_passkey_notification', event.bd_addr, event.passkey 1147 ) 1148 1149 def on_hci_inquiry_complete_event(self, _event): 1150 self.emit('inquiry_complete') 1151 1152 def on_hci_inquiry_result_with_rssi_event(self, event): 1153 for response in event.responses: 1154 self.emit( 1155 'inquiry_result', 1156 response.bd_addr, 1157 response.class_of_device, 1158 b'', 1159 response.rssi, 1160 ) 1161 1162 def on_hci_extended_inquiry_result_event(self, event): 1163 self.emit( 1164 'inquiry_result', 1165 event.bd_addr, 1166 event.class_of_device, 1167 event.extended_inquiry_response, 1168 event.rssi, 1169 ) 1170 1171 def on_hci_remote_name_request_complete_event(self, event): 1172 if event.status != hci.HCI_SUCCESS: 1173 self.emit('remote_name_failure', event.bd_addr, event.status) 1174 else: 1175 utf8_name = event.remote_name 1176 terminator = utf8_name.find(0) 1177 if terminator >= 0: 1178 utf8_name = utf8_name[0:terminator] 1179 1180 self.emit('remote_name', event.bd_addr, utf8_name) 1181 1182 def on_hci_remote_host_supported_features_notification_event(self, event): 1183 self.emit( 1184 'remote_host_supported_features', 1185 event.bd_addr, 1186 event.host_supported_features, 1187 ) 1188 1189 def on_hci_le_read_remote_features_complete_event(self, event): 1190 if event.status != hci.HCI_SUCCESS: 1191 self.emit( 1192 'le_remote_features_failure', event.connection_handle, event.status 1193 ) 1194 else: 1195 self.emit( 1196 'le_remote_features', 1197 event.connection_handle, 1198 int.from_bytes(event.le_features, 'little'), 1199 ) 1200