1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import logging 19import asyncio 20import itertools 21import random 22import struct 23from bumble.colors import color 24from bumble.core import ( 25 BT_CENTRAL_ROLE, 26 BT_PERIPHERAL_ROLE, 27 BT_LE_TRANSPORT, 28 BT_BR_EDR_TRANSPORT, 29) 30 31from bumble.hci import ( 32 HCI_ACL_DATA_PACKET, 33 HCI_COMMAND_DISALLOWED_ERROR, 34 HCI_COMMAND_PACKET, 35 HCI_COMMAND_STATUS_PENDING, 36 HCI_CONNECTION_TIMEOUT_ERROR, 37 HCI_CONTROLLER_BUSY_ERROR, 38 HCI_EVENT_PACKET, 39 HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR, 40 HCI_LE_1M_PHY, 41 HCI_SUCCESS, 42 HCI_UNKNOWN_HCI_COMMAND_ERROR, 43 HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, 44 HCI_VERSION_BLUETOOTH_CORE_5_0, 45 Address, 46 HCI_AclDataPacket, 47 HCI_AclDataPacketAssembler, 48 HCI_Command_Complete_Event, 49 HCI_Command_Status_Event, 50 HCI_Connection_Complete_Event, 51 HCI_Connection_Request_Event, 52 HCI_Disconnection_Complete_Event, 53 HCI_Encryption_Change_Event, 54 HCI_LE_Advertising_Report_Event, 55 HCI_LE_Connection_Complete_Event, 56 HCI_LE_Read_Remote_Features_Complete_Event, 57 HCI_Number_Of_Completed_Packets_Event, 58 HCI_Packet, 59 HCI_Role_Change_Event, 60) 61from typing import Optional, Union, Dict 62 63 64# 
# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)


# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------
class DataObject:
    pass


# -----------------------------------------------------------------------------
class Connection:
    '''
    State kept by a Controller for one connection to a peer.

    ACL packets received from the host are fed to an HCI_AclDataPacketAssembler;
    the PDUs it produces are handed to the link (if any) for delivery to the peer.
    '''

    def __init__(self, controller, handle, role, peer_address, link, transport):
        self.controller = controller
        self.handle = handle
        self.role = role
        self.peer_address = peer_address
        self.link = link
        self.assembler = HCI_AclDataPacketAssembler(self.on_acl_pdu)
        self.transport = transport

    def on_hci_acl_data_packet(self, packet):
        '''
        Process one ACL packet sent by the host on this connection.
        '''
        self.assembler.feed_packet(packet)

        # Report the packet as completed right away (one packet per event)
        self.controller.send_hci_packet(
            HCI_Number_Of_Completed_Packets_Event([(self.handle, 1)])
        )

    def on_acl_pdu(self, data):
        '''
        Called by the assembler with a PDU; forwards it to the link, if there is one.
        '''
        if self.link:
            self.link.send_acl_data(
                self.controller, self.peer_address, self.transport, data
            )


# -----------------------------------------------------------------------------
class Controller:
    '''
    Virtual controller: receives HCI packets from a host, dispatches commands to
    `on_<command name>` handlers, and exchanges connection/advertising/ACL
    notifications with an optional link object.
    '''

    def __init__(
        self,
        name,
        host_source=None,
        host_sink=None,
        link=None,
        public_address: Optional[Union[bytes, str, Address]] = None,
    ):
        '''
        name:           name used as a prefix in log messages.
        host_source:    optional packet source; this controller registers itself as
                        its packet sink.
        host_sink:      optional host object that receives the packets sent by this
                        controller.
        link:           optional link object; this controller is added to it.
        public_address: optional public address. Values that are not already an
                        Address are wrapped as a public device address.
        '''
        self.name = name
        self.hci_sink = None
        self.link = link

        self.central_connections: Dict[
            Address, Connection
        ] = {}  # Connections where this controller is the central
        self.peripheral_connections: Dict[
            Address, Connection
        ] = {}  # Connections where this controller is the peripheral
        self.classic_connections: Dict[
            Address, Connection
        ] = {}  # Connections in BR/EDR

        self.hci_version = HCI_VERSION_BLUETOOTH_CORE_5_0
        self.hci_revision = 0
        self.lmp_version = HCI_VERSION_BLUETOOTH_CORE_5_0
        self.lmp_subversion = 0
        self.lmp_features = bytes.fromhex(
            '0000000060000000'
        )  # BR/EDR Not Supported, LE Supported (Controller)
        self.manufacturer_name = 0xFFFF
        self.hc_le_data_packet_length = 27
        self.hc_total_num_le_data_packets = 64
        self.event_mask = 0
        self.event_mask_page_2 = 0
        self.supported_commands = bytes.fromhex(
            '2000800000c000000000e40000002822000000000000040000f7ffff7f000000'
            '30f0f9ff01008004000000000000000000000000000000000000000000000000'
        )
        self.le_event_mask = 0
        self.advertising_parameters = None
        self.le_features = bytes.fromhex('ff49010000000000')
        self.le_states = bytes.fromhex('ffff3fffff030000')
        self.advertising_channel_tx_power = 0
        self.filter_accept_list_size = 8
        self.filter_duplicates = False
        self.resolving_list_size = 8
        self.supported_max_tx_octets = 27
        self.supported_max_tx_time = 10000  # microseconds
        self.supported_max_rx_octets = 27
        self.supported_max_rx_time = 10000  # microseconds
        self.suggested_max_tx_octets = 27
        self.suggested_max_tx_time = 0x0148  # microseconds
        self.default_phy = bytes([0, 0, 0])
        self.le_scan_type = 0
        self.le_scan_interval = 0x10
        self.le_scan_window = 0x10
        self.le_scan_enable = 0
        self.le_scan_own_address_type = Address.RANDOM_DEVICE_ADDRESS
        self.le_scanning_filter_policy = 0
        self.le_scan_response_data = None
        self.le_address_resolution = False
        self.le_rpa_timeout = 0
        self.sync_flow_control = False
        self.local_name = 'Bumble'

        self.advertising_interval = 2000  # Fixed for now
        self.advertising_data = None
        self.advertising_timer_handle = None

        self._random_address = Address('00:00:00:00:00:00')
        if isinstance(public_address, Address):
            self._public_address = public_address
        elif public_address is not None:
            self._public_address = Address(
                public_address, Address.PUBLIC_DEVICE_ADDRESS
            )
        else:
            self._public_address = Address('00:00:00:00:00:00')

        # Set the source and sink interfaces
        if host_source:
            host_source.set_packet_sink(self)
        self.host = host_sink

        # Add this controller to the link if specified
        if link:
            link.add_controller(self)

    @property
    def host(self):
        return self.hci_sink

    @host.setter
    def host(self, host):
        '''
        Sets the host (sink) for this controller, and set this controller as the
        controller (sink) for the host
        '''
        self.set_packet_sink(host)
        if host:
            host.controller = self

    def set_packet_sink(self, sink):
        '''
        Method from the Packet Source interface
        '''
        self.hci_sink = sink

    @property
    def public_address(self):
        return self._public_address

    @public_address.setter
    def public_address(self, address):
        # Same conversion as in __init__: a raw str/bytes value is wrapped as a
        # *public* device address, so that both ways of setting the public address
        # produce the same kind of Address object.
        if isinstance(address, (bytes, str)):
            address = Address(address, Address.PUBLIC_DEVICE_ADDRESS)
        self._public_address = address

    @property
    def random_address(self):
        return self._random_address

    @random_address.setter
    def random_address(self, address):
        if isinstance(address, str):
            address = Address(address)
        self._random_address = address
        logger.debug(f'new random address: {address}')

        # Let the link know that this controller's address has changed
        if self.link:
            self.link.on_address_changed(self)

    # Packet Sink protocol (packets coming from the host via HCI)
    def on_packet(self, packet):
        self.on_hci_packet(HCI_Packet.from_bytes(packet))

    def on_hci_packet(self, packet):
        '''
        Dispatch a parsed HCI packet coming from the host, based on its packet type.
        '''
        logger.debug(
            f'{color("<<<", "blue")} [{self.name}] '
            f'{color("HOST -> CONTROLLER", "blue")}: {packet}'
        )

        # If the packet is a command, invoke the handler for this packet
        if packet.hci_packet_type == HCI_COMMAND_PACKET:
            self.on_hci_command_packet(packet)
        elif packet.hci_packet_type == HCI_EVENT_PACKET:
            self.on_hci_event_packet(packet)
        elif packet.hci_packet_type == HCI_ACL_DATA_PACKET:
            self.on_hci_acl_data_packet(packet)
        else:
            logger.warning(f'!!! unknown packet type {packet.hci_packet_type}')

    def on_hci_command_packet(self, command):
        '''
        Invoke the `on_<command name>` handler for a command (or the generic
        `on_hci_command` fallback when there is no such handler).

        When the handler returns bytes, they are sent back to the host as the return
        parameters of a Command Complete event. Handlers that return anything else
        are responsible for sending their own events.
        '''
        handler_name = f'on_{command.name.lower()}'
        handler = getattr(self, handler_name, self.on_hci_command)
        result = handler(command)
        if isinstance(result, bytes):
            self.send_hci_packet(
                HCI_Command_Complete_Event(
                    num_hci_command_packets=1,
                    command_opcode=command.op_code,
                    return_parameters=result,
                )
            )

    def on_hci_event_packet(self, _event):
        logger.warning('!!! unexpected event packet')

    def on_hci_acl_data_packet(self, packet):
        # Look for the connection to which this data belongs
        connection = self.find_connection_by_handle(packet.connection_handle)
        if connection is None:
            logger.warning(
                f'!!! no connection for handle 0x{packet.connection_handle:04X}'
            )
            return

        # Pass the packet to the connection
        connection.on_hci_acl_data_packet(packet)

    def send_hci_packet(self, packet):
        '''
        Send a packet to the host, if one is attached (the packet is dropped
        otherwise).
        '''
        logger.debug(
            f'{color(">>>", "green")} [{self.name}] '
            f'{color("CONTROLLER -> HOST", "green")}: {packet}'
        )
        if self.host:
            self.host.on_packet(packet.to_bytes())

    # This method allow the controller to emulate the same API as a transport source
    async def wait_for_termination(self):
        # For now, just wait forever
        await asyncio.get_running_loop().create_future()

    ############################################################
    # Link connections
    ############################################################
    def allocate_connection_handle(self):
        '''
        Return a connection handle that is not used by any current connection
        (LE central, LE peripheral or classic).

        Returns 0 when handle 0 is free. The value returned is not necessarily the
        smallest free handle.
        '''
        handle = 0
        max_handle = 0
        for connection in itertools.chain(
            self.central_connections.values(),
            self.peripheral_connections.values(),
            self.classic_connections.values(),
        ):
            max_handle = max(max_handle, connection.handle)
            if connection.handle == handle:
                # Already used, continue searching after the current max
                handle = max_handle + 1
        return handle

    def find_le_connection_by_address(self, address):
        return self.central_connections.get(address) or self.peripheral_connections.get(
            address
        )

    def find_classic_connection_by_address(self, address):
        return self.classic_connections.get(address)

    def find_connection_by_handle(self, handle):
        '''
        Return the connection (of any kind) with the given handle, or None.
        '''
        for connection in itertools.chain(
            self.central_connections.values(),
            self.peripheral_connections.values(),
            self.classic_connections.values(),
        ):
            if connection.handle == handle:
                return connection
        return None

    def find_central_connection_by_handle(self, handle):
        '''
        Return the connection with the given handle among those where this
        controller is the central, or None.
        '''
        for connection in self.central_connections.values():
            if connection.handle == handle:
                return connection
        return None

    def find_classic_connection_by_handle(self, handle):
        '''
        Return the classic connection with the given handle, or None.
        '''
        for connection in self.classic_connections.values():
            if connection.handle == handle:
                return connection
        return None

    def on_link_central_connected(self, central_address):
        '''
        Called when an incoming connection occurs from a central on the link
        '''

        # Allocate (or reuse) a connection handle
        peer_address = central_address
        peer_address_type = central_address.address_type
        connection = self.peripheral_connections.get(peer_address)
        if connection is None:
            connection_handle = self.allocate_connection_handle()
            connection = Connection(
                self,
                connection_handle,
                BT_PERIPHERAL_ROLE,
                peer_address,
                self.link,
                BT_LE_TRANSPORT,
            )
            self.peripheral_connections[peer_address] = connection
            logger.debug(f'New PERIPHERAL connection handle: 0x{connection_handle:04X}')

        # Then say that the connection has completed
        self.send_hci_packet(
            HCI_LE_Connection_Complete_Event(
                status=HCI_SUCCESS,
                connection_handle=connection.handle,
                role=connection.role,
                peer_address_type=peer_address_type,
                peer_address=peer_address,
                connection_interval=10,  # FIXME
                peripheral_latency=0,  # FIXME
                supervision_timeout=10,  # FIXME
                central_clock_accuracy=7,  # FIXME
            )
        )

    def on_link_central_disconnected(self, peer_address, reason):
        '''
        Called when an active disconnection occurs from a peer
        '''

        # Send a disconnection complete event
        if connection := self.peripheral_connections.get(peer_address):
            self.send_hci_packet(
                HCI_Disconnection_Complete_Event(
                    status=HCI_SUCCESS,
                    connection_handle=connection.handle,
                    reason=reason,
                )
            )

            # Remove the connection
            del self.peripheral_connections[peer_address]
        else:
            logger.warning(f'!!! No peripheral connection found for {peer_address}')

    def on_link_peripheral_connection_complete(
        self, le_create_connection_command, status
    ):
        '''
        Called by the link when a connection has been made or has failed to be made
        '''

        if status == HCI_SUCCESS:
            # Allocate (or reuse) a connection handle
            peer_address = le_create_connection_command.peer_address
            connection = self.central_connections.get(peer_address)
            if connection is None:
                connection_handle = self.allocate_connection_handle()
                connection = Connection(
                    self,
                    connection_handle,
                    BT_CENTRAL_ROLE,
                    peer_address,
                    self.link,
                    BT_LE_TRANSPORT,
                )
                self.central_connections[peer_address] = connection
                logger.debug(
                    f'New CENTRAL connection handle: 0x{connection_handle:04X}'
                )
        else:
            connection = None

        # Say that the connection has completed
        self.send_hci_packet(
            # pylint: disable=line-too-long
            HCI_LE_Connection_Complete_Event(
                status=status,
                connection_handle=connection.handle if connection else 0,
                role=BT_CENTRAL_ROLE,
                peer_address_type=le_create_connection_command.peer_address_type,
                peer_address=le_create_connection_command.peer_address,
                connection_interval=le_create_connection_command.connection_interval_min,
                peripheral_latency=le_create_connection_command.max_latency,
                supervision_timeout=le_create_connection_command.supervision_timeout,
                central_clock_accuracy=0,
            )
        )

    def on_link_peripheral_disconnection_complete(self, disconnection_command, status):
        '''
        Called when a disconnection has been completed
        '''

        # Send a disconnection complete event
        self.send_hci_packet(
            HCI_Disconnection_Complete_Event(
                status=status,
                connection_handle=disconnection_command.connection_handle,
                reason=disconnection_command.reason,
            )
        )

        # Remove the connection
        if connection := self.find_central_connection_by_handle(
            disconnection_command.connection_handle
        ):
            logger.debug(f'CENTRAL Connection removed: {connection}')
            del self.central_connections[connection.peer_address]

    def on_link_peripheral_disconnected(self, peer_address):
        '''
        Called when a connection to a peripheral is broken
        '''

        # Send a disconnection complete event
        if connection := self.central_connections.get(peer_address):
            self.send_hci_packet(
                HCI_Disconnection_Complete_Event(
                    status=HCI_SUCCESS,
                    connection_handle=connection.handle,
                    reason=HCI_CONNECTION_TIMEOUT_ERROR,
                )
            )

            # Remove the connection
            del self.central_connections[peer_address]
        else:
            logger.warning(f'!!! No central connection found for {peer_address}')

    def on_link_encrypted(self, peer_address, _rand, _ediv, _ltk):
        # For now, just setup the encryption without asking the host
        if connection := self.find_le_connection_by_address(peer_address):
            self.send_hci_packet(
                HCI_Encryption_Change_Event(
                    status=0, connection_handle=connection.handle, encryption_enabled=1
                )
            )

    def on_link_acl_data(self, sender_address, transport, data):
        '''
        Called with ACL data sent by a peer; the data is forwarded to the host as a
        single ACL packet on the matching connection.
        '''
        # Look for the connection to which this data belongs
        if transport == BT_LE_TRANSPORT:
            connection = self.find_le_connection_by_address(sender_address)
        else:
            connection = self.find_classic_connection_by_address(sender_address)
        if connection is None:
            logger.warning(f'!!! no connection for {sender_address}')
            return

        # Send the data to the host
        # TODO: should fragment
        acl_packet = HCI_AclDataPacket(connection.handle, 2, 0, len(data), data)
        self.send_hci_packet(acl_packet)

    def on_link_advertising_data(self, sender_address, data):
        '''
        Called with advertising data sent by a peer; reported to the host only while
        scanning is enabled.
        '''
        # Ignore if we're not scanning
        if self.le_scan_enable == 0:
            return

        # Send a scan report
        report = HCI_LE_Advertising_Report_Event.Report(
            HCI_LE_Advertising_Report_Event.Report.FIELDS,
            event_type=HCI_LE_Advertising_Report_Event.ADV_IND,
            address_type=sender_address.address_type,
            address=sender_address,
            data=data,
            rssi=-50,
        )
        self.send_hci_packet(HCI_LE_Advertising_Report_Event([report]))

        # Simulate a scan response
        # (the same advertising data is reported again as the scan response payload)
        report = HCI_LE_Advertising_Report_Event.Report(
            HCI_LE_Advertising_Report_Event.Report.FIELDS,
            event_type=HCI_LE_Advertising_Report_Event.SCAN_RSP,
            address_type=sender_address.address_type,
            address=sender_address,
            data=data,
            rssi=-50,
        )
        self.send_hci_packet(HCI_LE_Advertising_Report_Event([report]))

    ############################################################
    # Classic link connections
############################################################ 539 540 def on_classic_connection_request(self, peer_address, link_type): 541 self.send_hci_packet( 542 HCI_Connection_Request_Event( 543 bd_addr=peer_address, 544 class_of_device=0, 545 link_type=link_type, 546 ) 547 ) 548 549 def on_classic_connection_complete(self, peer_address, status): 550 if status == HCI_SUCCESS: 551 # Allocate (or reuse) a connection handle 552 peer_address = peer_address 553 connection = self.classic_connections.get(peer_address) 554 if connection is None: 555 connection_handle = self.allocate_connection_handle() 556 connection = Connection( 557 controller=self, 558 handle=connection_handle, 559 # Role doesn't matter in Classic because they are managed by HCI_Role_Change and HCI_Role_Discovery 560 role=BT_CENTRAL_ROLE, 561 peer_address=peer_address, 562 link=self.link, 563 transport=BT_BR_EDR_TRANSPORT, 564 ) 565 self.classic_connections[peer_address] = connection 566 logger.debug( 567 f'New CLASSIC connection handle: 0x{connection_handle:04X}' 568 ) 569 else: 570 connection_handle = connection.handle 571 self.send_hci_packet( 572 HCI_Connection_Complete_Event( 573 status=status, 574 connection_handle=connection_handle, 575 bd_addr=peer_address, 576 encryption_enabled=False, 577 link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE, 578 ) 579 ) 580 else: 581 connection = None 582 self.send_hci_packet( 583 HCI_Connection_Complete_Event( 584 status=status, 585 connection_handle=0, 586 bd_addr=peer_address, 587 encryption_enabled=False, 588 link_type=HCI_Connection_Complete_Event.ACL_LINK_TYPE, 589 ) 590 ) 591 592 def on_classic_disconnected(self, peer_address, reason): 593 # Send a disconnection complete event 594 if connection := self.classic_connections.get(peer_address): 595 self.send_hci_packet( 596 HCI_Disconnection_Complete_Event( 597 status=HCI_SUCCESS, 598 connection_handle=connection.handle, 599 reason=reason, 600 ) 601 ) 602 603 # Remove the connection 604 del 
self.classic_connections[peer_address] 605 else: 606 logger.warning(f'!!! No classic connection found for {peer_address}') 607 608 def on_classic_role_change(self, peer_address, new_role): 609 self.send_hci_packet( 610 HCI_Role_Change_Event( 611 status=HCI_SUCCESS, 612 bd_addr=peer_address, 613 new_role=new_role, 614 ) 615 ) 616 617 ############################################################ 618 # Advertising support 619 ############################################################ 620 def on_advertising_timer_fired(self): 621 self.send_advertising_data() 622 self.advertising_timer_handle = asyncio.get_running_loop().call_later( 623 self.advertising_interval / 1000.0, self.on_advertising_timer_fired 624 ) 625 626 def start_advertising(self): 627 # Stop any ongoing advertising before we start again 628 self.stop_advertising() 629 630 # Advertise now 631 self.advertising_timer_handle = asyncio.get_running_loop().call_soon( 632 self.on_advertising_timer_fired 633 ) 634 635 def stop_advertising(self): 636 if self.advertising_timer_handle is not None: 637 self.advertising_timer_handle.cancel() 638 self.advertising_timer_handle = None 639 640 def send_advertising_data(self): 641 if self.link and self.advertising_data: 642 self.link.send_advertising_data(self.random_address, self.advertising_data) 643 644 @property 645 def is_advertising(self): 646 return self.advertising_timer_handle is not None 647 648 ############################################################ 649 # HCI handlers 650 ############################################################ 651 def on_hci_command(self, command): 652 logger.warning(color(f'--- Unsupported command {command}', 'red')) 653 return bytes([HCI_UNKNOWN_HCI_COMMAND_ERROR]) 654 655 def on_hci_create_connection_command(self, command): 656 ''' 657 See Bluetooth spec Vol 2, Part E - 7.1.5 Create Connection command 658 ''' 659 660 if self.link is None: 661 return 662 logger.debug(f'Connection request to {command.bd_addr}') 663 664 # Check that we 
don't already have a pending connection 665 if self.link.get_pending_connection(): 666 self.send_hci_packet( 667 HCI_Command_Status_Event( 668 status=HCI_CONTROLLER_BUSY_ERROR, 669 num_hci_command_packets=1, 670 command_opcode=command.op_code, 671 ) 672 ) 673 return 674 675 self.link.classic_connect(self, command.bd_addr) 676 677 # Say that the connection is pending 678 self.send_hci_packet( 679 HCI_Command_Status_Event( 680 status=HCI_COMMAND_STATUS_PENDING, 681 num_hci_command_packets=1, 682 command_opcode=command.op_code, 683 ) 684 ) 685 686 def on_hci_disconnect_command(self, command): 687 ''' 688 See Bluetooth spec Vol 2, Part E - 7.1.6 Disconnect Command 689 ''' 690 # First, say that the disconnection is pending 691 self.send_hci_packet( 692 HCI_Command_Status_Event( 693 status=HCI_COMMAND_STATUS_PENDING, 694 num_hci_command_packets=1, 695 command_opcode=command.op_code, 696 ) 697 ) 698 699 # Notify the link of the disconnection 700 handle = command.connection_handle 701 if connection := self.find_central_connection_by_handle(handle): 702 if self.link: 703 self.link.disconnect( 704 self.random_address, connection.peer_address, command 705 ) 706 else: 707 # Remove the connection 708 del self.central_connections[connection.peer_address] 709 elif connection := self.find_classic_connection_by_handle(handle): 710 if self.link: 711 self.link.classic_disconnect( 712 self, 713 connection.peer_address, 714 HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, 715 ) 716 else: 717 # Remove the connection 718 del self.classic_connections[connection.peer_address] 719 720 def on_hci_accept_connection_request_command(self, command): 721 ''' 722 See Bluetooth spec Vol 2, Part E - 7.1.8 Accept Connection Request command 723 ''' 724 725 if self.link is None: 726 return 727 self.send_hci_packet( 728 HCI_Command_Status_Event( 729 status=HCI_SUCCESS, 730 num_hci_command_packets=1, 731 command_opcode=command.op_code, 732 ) 733 ) 734 self.link.classic_accept_connection(self, 
command.bd_addr, command.role) 735 736 def on_hci_switch_role_command(self, command): 737 ''' 738 See Bluetooth spec Vol 2, Part E - 7.2.8 Switch Role command 739 ''' 740 741 if self.link is None: 742 return 743 self.send_hci_packet( 744 HCI_Command_Status_Event( 745 status=HCI_SUCCESS, 746 num_hci_command_packets=1, 747 command_opcode=command.op_code, 748 ) 749 ) 750 self.link.classic_switch_role(self, command.bd_addr, command.role) 751 752 def on_hci_set_event_mask_command(self, command): 753 ''' 754 See Bluetooth spec Vol 2, Part E - 7.3.1 Set Event Mask Command 755 ''' 756 self.event_mask = command.event_mask 757 return bytes([HCI_SUCCESS]) 758 759 def on_hci_reset_command(self, _command): 760 ''' 761 See Bluetooth spec Vol 2, Part E - 7.3.2 Reset Command 762 ''' 763 # TODO: cleanup what needs to be reset 764 return bytes([HCI_SUCCESS]) 765 766 def on_hci_write_local_name_command(self, command): 767 ''' 768 See Bluetooth spec Vol 2, Part E - 7.3.11 Write Local Name Command 769 ''' 770 local_name = command.local_name 771 if len(local_name): 772 try: 773 first_null = local_name.find(0) 774 if first_null >= 0: 775 local_name = local_name[:first_null] 776 self.local_name = str(local_name, 'utf-8') 777 except UnicodeDecodeError: 778 pass 779 return bytes([HCI_SUCCESS]) 780 781 def on_hci_read_local_name_command(self, _command): 782 ''' 783 See Bluetooth spec Vol 2, Part E - 7.3.12 Read Local Name Command 784 ''' 785 local_name = bytes(self.local_name, 'utf-8')[:248] 786 if len(local_name) < 248: 787 local_name = local_name + bytes(248 - len(local_name)) 788 789 return bytes([HCI_SUCCESS]) + local_name 790 791 def on_hci_read_class_of_device_command(self, _command): 792 ''' 793 See Bluetooth spec Vol 2, Part E - 7.3.25 Read Class of Device Command 794 ''' 795 return bytes([HCI_SUCCESS, 0, 0, 0]) 796 797 def on_hci_write_class_of_device_command(self, _command): 798 ''' 799 See Bluetooth spec Vol 2, Part E - 7.3.26 Write Class of Device Command 800 ''' 801 return 
bytes([HCI_SUCCESS]) 802 803 def on_hci_read_synchronous_flow_control_enable_command(self, _command): 804 ''' 805 See Bluetooth spec Vol 2, Part E - 7.3.36 Read Synchronous Flow Control Enable 806 Command 807 ''' 808 if self.sync_flow_control: 809 ret = 1 810 else: 811 ret = 0 812 return bytes([HCI_SUCCESS, ret]) 813 814 def on_hci_write_synchronous_flow_control_enable_command(self, command): 815 ''' 816 See Bluetooth spec Vol 2, Part E - 7.3.37 Write Synchronous Flow Control Enable 817 Command 818 ''' 819 ret = HCI_SUCCESS 820 if command.synchronous_flow_control_enable == 1: 821 self.sync_flow_control = True 822 elif command.synchronous_flow_control_enable == 0: 823 self.sync_flow_control = False 824 else: 825 ret = HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR 826 return bytes([ret]) 827 828 def on_hci_write_extended_inquiry_response_command(self, _command): 829 ''' 830 See Bluetooth spec Vol 2, Part E - 7.3.59 Write Simple Pairing Mode Command 831 ''' 832 return bytes([HCI_SUCCESS]) 833 834 def on_hci_write_simple_pairing_mode_command(self, _command): 835 ''' 836 See Bluetooth spec Vol 2, Part E - 7.3.59 Write Simple Pairing Mode Command 837 ''' 838 return bytes([HCI_SUCCESS]) 839 840 def on_hci_set_event_mask_page_2_command(self, command): 841 ''' 842 See Bluetooth spec Vol 2, Part E - 7.3.69 Set Event Mask Page 2 Command 843 ''' 844 self.event_mask_page_2 = command.event_mask_page_2 845 return bytes([HCI_SUCCESS]) 846 847 def on_hci_read_le_host_support_command(self, _command): 848 ''' 849 See Bluetooth spec Vol 2, Part E - 7.3.78 Write LE Host Support Command 850 ''' 851 return bytes([HCI_SUCCESS, 1, 0]) 852 853 def on_hci_write_le_host_support_command(self, _command): 854 ''' 855 See Bluetooth spec Vol 2, Part E - 7.3.79 Write LE Host Support Command 856 ''' 857 # TODO / Just ignore for now 858 return bytes([HCI_SUCCESS]) 859 860 def on_hci_write_authenticated_payload_timeout_command(self, command): 861 ''' 862 See Bluetooth spec Vol 2, Part E - 7.3.94 Write 
Authenticated Payload Timeout 863 Command 864 ''' 865 # TODO 866 return struct.pack('<BH', HCI_SUCCESS, command.connection_handle) 867 868 def on_hci_read_local_version_information_command(self, _command): 869 ''' 870 See Bluetooth spec Vol 2, Part E - 7.4.1 Read Local Version Information Command 871 ''' 872 return struct.pack( 873 '<BBHBHH', 874 HCI_SUCCESS, 875 self.hci_version, 876 self.hci_revision, 877 self.lmp_version, 878 self.manufacturer_name, 879 self.lmp_subversion, 880 ) 881 882 def on_hci_read_local_supported_commands_command(self, _command): 883 ''' 884 See Bluetooth spec Vol 2, Part E - 7.4.2 Read Local Supported Commands Command 885 ''' 886 return bytes([HCI_SUCCESS]) + self.supported_commands 887 888 def on_hci_read_local_supported_features_command(self, _command): 889 ''' 890 See Bluetooth spec Vol 2, Part E - 7.4.3 Read Local Supported Features Command 891 ''' 892 return bytes([HCI_SUCCESS]) + self.lmp_features 893 894 def on_hci_read_bd_addr_command(self, _command): 895 ''' 896 See Bluetooth spec Vol 2, Part E - 7.4.6 Read BD_ADDR Command 897 ''' 898 bd_addr = ( 899 self._public_address.to_bytes() 900 if self._public_address is not None 901 else bytes(6) 902 ) 903 return bytes([HCI_SUCCESS]) + bd_addr 904 905 def on_hci_le_set_event_mask_command(self, command): 906 ''' 907 See Bluetooth spec Vol 2, Part E - 7.8.1 LE Set Event Mask Command 908 ''' 909 self.le_event_mask = command.le_event_mask 910 return bytes([HCI_SUCCESS]) 911 912 def on_hci_le_read_buffer_size_command(self, _command): 913 ''' 914 See Bluetooth spec Vol 2, Part E - 7.8.2 LE Read Buffer Size Command 915 ''' 916 return struct.pack( 917 '<BHB', 918 HCI_SUCCESS, 919 self.hc_le_data_packet_length, 920 self.hc_total_num_le_data_packets, 921 ) 922 923 def on_hci_le_read_local_supported_features_command(self, _command): 924 ''' 925 See Bluetooth spec Vol 2, Part E - 7.8.3 LE Read Local Supported Features 926 Command 927 ''' 928 return bytes([HCI_SUCCESS]) + self.le_features 929 930 def 
def on_hci_le_set_random_address_command(self, command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.4 LE Set Random Address Command

    Stores `command.random_address` as this controller's random address and
    returns a one-byte HCI_SUCCESS status.
    '''
    self.random_address = command.random_address
    return bytes([HCI_SUCCESS])


def on_hci_le_set_advertising_parameters_command(self, command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.5 LE Set Advertising Parameters Command

    Keeps the whole command object as the current advertising parameters and
    returns a one-byte HCI_SUCCESS status.
    '''
    self.advertising_parameters = command
    return bytes([HCI_SUCCESS])


def on_hci_le_read_advertising_physical_channel_tx_power_command(self, _command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.6 LE Read Advertising Physical Channel
    Tx Power Command

    Returns two bytes: HCI_SUCCESS followed by
    `self.advertising_channel_tx_power`.
    '''
    return bytes([HCI_SUCCESS, self.advertising_channel_tx_power])


def on_hci_le_set_advertising_data_command(self, command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.7 LE Set Advertising Data Command

    Stores `command.advertising_data` and returns a one-byte HCI_SUCCESS
    status.
    '''
    self.advertising_data = command.advertising_data
    return bytes([HCI_SUCCESS])


def on_hci_le_set_scan_response_data_command(self, command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.8 LE Set Scan Response Data Command

    Stores `command.scan_response_data` and returns a one-byte HCI_SUCCESS
    status.
    '''
    self.le_scan_response_data = command.scan_response_data
    return bytes([HCI_SUCCESS])


def on_hci_le_set_advertising_enable_command(self, command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.9 LE Set Advertising Enable Command

    Starts advertising when `command.advertising_enable` is truthy, stops it
    otherwise, and returns a one-byte HCI_SUCCESS status.
    '''
    if command.advertising_enable:
        self.start_advertising()
    else:
        self.stop_advertising()

    return bytes([HCI_SUCCESS])


def on_hci_le_set_scan_parameters_command(self, command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.10 LE Set Scan Parameters Command

    Copies the scan type, interval, window, own address type and filter policy
    from the command onto the controller and returns a one-byte HCI_SUCCESS
    status.
    '''
    self.le_scan_type = command.le_scan_type
    self.le_scan_interval = command.le_scan_interval
    self.le_scan_window = command.le_scan_window
    self.le_scan_own_address_type = command.own_address_type
    self.le_scanning_filter_policy = command.scanning_filter_policy
    return bytes([HCI_SUCCESS])


def on_hci_le_set_scan_enable_command(self, command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.11 LE Set Scan Enable Command

    Records the scan-enable and filter-duplicates flags and returns a one-byte
    HCI_SUCCESS status.
    '''
    self.le_scan_enable = command.le_scan_enable
    self.filter_duplicates = command.filter_duplicates
    return bytes([HCI_SUCCESS])


def on_hci_le_create_connection_command(self, command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.12 LE Create Connection Command

    Asks the link to connect to the peer described by the command and reports
    the outcome with a Command Status event (pending, or "command disallowed"
    when the link already has a pending connection). Returns None.
    '''

    # NOTE(review): without a link nothing is sent back for this command;
    # confirm that callers do not wait for a status event in that case.
    if not self.link:
        return

    logger.debug(f'Connection request to {command.peer_address}')

    # Check that we don't already have a pending connection
    if self.link.get_pending_connection():
        self.send_hci_packet(
            HCI_Command_Status_Event(
                status=HCI_COMMAND_DISALLOWED_ERROR,
                num_hci_command_packets=1,
                command_opcode=command.op_code,
            )
        )
        return

    # Initiate the connection
    self.link.connect(self.random_address, command)

    # Say that the connection is pending
    self.send_hci_packet(
        HCI_Command_Status_Event(
            status=HCI_COMMAND_STATUS_PENDING,
            num_hci_command_packets=1,
            command_opcode=command.op_code,
        )
    )


def on_hci_le_create_connection_cancel_command(self, _command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.13 LE Create Connection Cancel Command

    No state is changed; returns a one-byte HCI_SUCCESS status.
    '''
    return bytes([HCI_SUCCESS])


def on_hci_le_read_filter_accept_list_size_command(self, _command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.14 LE Read Filter Accept List Size
    Command

    Returns two bytes: HCI_SUCCESS followed by `self.filter_accept_list_size`.
    '''
    return bytes([HCI_SUCCESS, self.filter_accept_list_size])


def on_hci_le_clear_filter_accept_list_command(self, _command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.15 LE Clear Filter Accept List Command

    No state is changed; returns a one-byte HCI_SUCCESS status.
    '''
    return bytes([HCI_SUCCESS])


def on_hci_le_add_device_to_filter_accept_list_command(self, _command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.16 LE Add Device To Filter Accept List
    Command

    No state is changed; returns a one-byte HCI_SUCCESS status.
    '''
    return bytes([HCI_SUCCESS])


def on_hci_le_remove_device_from_filter_accept_list_command(self, _command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.17 LE Remove Device From Filter Accept
    List Command

    No state is changed; returns a one-byte HCI_SUCCESS status.
    '''
    return bytes([HCI_SUCCESS])


def on_hci_le_read_remote_features_command(self, command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.21 LE Read Remote Features Command

    Sends a pending Command Status event, then an LE Read Remote Features
    Complete event carrying a fixed feature mask. Returns None.
    '''

    # First, say that the command is pending
    self.send_hci_packet(
        HCI_Command_Status_Event(
            status=HCI_COMMAND_STATUS_PENDING,
            num_hci_command_packets=1,
            command_opcode=command.op_code,
        )
    )

    # Then send the remote features, echoing the handle the host asked about
    # so that the event can be matched to the right connection
    self.send_hci_packet(
        HCI_LE_Read_Remote_Features_Complete_Event(
            status=HCI_SUCCESS,
            connection_handle=command.connection_handle,
            le_features=bytes.fromhex('dd40000000000000'),
        )
    )


def on_hci_le_rand_command(self, _command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.23 LE Rand Command

    Returns nine bytes: HCI_SUCCESS followed by a 64-bit random number encoded
    in little-endian order.
    '''
    # getrandbits(64) always fits in 8 bytes, and '<Q' keeps the encoding
    # independent of the host's native byte order
    return bytes([HCI_SUCCESS]) + struct.pack('<Q', random.getrandbits(64))


def on_hci_le_enable_encryption_command(self, command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.24 LE Enable Encryption Command

    Looks up the central connection for `command.connection_handle`. When
    there is none, returns a one-byte "invalid parameters" status. Otherwise
    notifies the link that the connection is encrypted, sends a pending
    Command Status event and returns None.
    '''

    # Check the parameters
    if not (
        connection := self.find_central_connection_by_handle(
            command.connection_handle
        )
    ):
        logger.warning('connection not found')
        return bytes([HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR])

    # Notify that the connection is now encrypted
    # NOTE(review): assumes self.link is set whenever a connection exists
    self.link.on_connection_encrypted(
        self.random_address,
        connection.peer_address,
        command.random_number,
        command.encrypted_diversifier,
        command.long_term_key,
    )

    self.send_hci_packet(
        HCI_Command_Status_Event(
            status=HCI_COMMAND_STATUS_PENDING,
            num_hci_command_packets=1,
            command_opcode=command.op_code,
        )
    )

    return None


def on_hci_le_read_supported_states_command(self, _command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.27 LE Read Supported States Command

    Returns HCI_SUCCESS followed by the bytes of `self.le_states`.
    '''
    return bytes([HCI_SUCCESS]) + self.le_states


def on_hci_le_read_suggested_default_data_length_command(self, _command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.34 LE Read Suggested Default Data Length
    Command

    Returns HCI_SUCCESS followed by the suggested max TX octets and max TX
    time, each as a little-endian 16-bit value.
    '''
    return struct.pack(
        '<BHH',
        HCI_SUCCESS,
        self.suggested_max_tx_octets,
        self.suggested_max_tx_time,
    )


def on_hci_le_write_suggested_default_data_length_command(self, command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.35 LE Write Suggested Default Data Length
    Command

    Decodes two little-endian 16-bit values from the first four bytes of
    `command.parameters` into the suggested max TX octets and max TX time, and
    returns a one-byte HCI_SUCCESS status.
    '''
    self.suggested_max_tx_octets, self.suggested_max_tx_time = struct.unpack(
        '<HH', command.parameters[:4]
    )
    return bytes([HCI_SUCCESS])


def on_hci_le_read_local_p_256_public_key_command(self, _command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.36 LE Read P-256 Public Key Command

    Placeholder: no key is generated yet; returns a one-byte HCI_SUCCESS
    status.
    '''
    # TODO create key and send HCI_LE_Read_Local_P-256_Public_Key_Complete event
    return bytes([HCI_SUCCESS])


def on_hci_le_add_device_to_resolving_list_command(self, _command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.38 LE Add Device To Resolving List
    Command

    No state is changed; returns a one-byte HCI_SUCCESS status.
    '''
    return bytes([HCI_SUCCESS])


def on_hci_le_clear_resolving_list_command(self, _command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.40 LE Clear Resolving List Command

    No state is changed; returns a one-byte HCI_SUCCESS status.
    '''
    return bytes([HCI_SUCCESS])


def on_hci_le_read_resolving_list_size_command(self, _command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.41 LE Read Resolving List Size Command

    Returns two bytes: HCI_SUCCESS followed by `self.resolving_list_size`.
    '''
    return bytes([HCI_SUCCESS, self.resolving_list_size])


def on_hci_le_set_address_resolution_enable_command(self, command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.44 LE Set Address Resolution Enable
    Command

    Sets `self.le_address_resolution` to True for a value of 1 and to False
    for a value of 0. Any other value leaves the state untouched and yields an
    "invalid parameters" status. Returns the one-byte status.
    '''
    ret = HCI_SUCCESS
    if command.address_resolution_enable == 1:
        self.le_address_resolution = True
    elif command.address_resolution_enable == 0:
        self.le_address_resolution = False
    else:
        ret = HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR
    return bytes([ret])


def on_hci_le_set_resolvable_private_address_timeout_command(self, command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.45 LE Set Resolvable Private Address
    Timeout Command

    Stores `command.rpa_timeout` and returns a one-byte HCI_SUCCESS status.
    '''
    self.le_rpa_timeout = command.rpa_timeout
    return bytes([HCI_SUCCESS])


def on_hci_le_read_maximum_data_length_command(self, _command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.46 LE Read Maximum Data Length Command

    Returns HCI_SUCCESS followed by the supported max TX octets, TX time,
    RX octets and RX time, each as a little-endian 16-bit value.
    '''
    return struct.pack(
        '<BHHHH',
        HCI_SUCCESS,
        self.supported_max_tx_octets,
        self.supported_max_tx_time,
        self.supported_max_rx_octets,
        self.supported_max_rx_time,
    )


def on_hci_le_read_phy_command(self, command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.47 LE Read PHY Command

    Returns HCI_SUCCESS, the connection handle from the command (little-endian
    16-bit) and HCI_LE_1M_PHY twice, i.e. the same PHY value in both PHY
    fields.
    '''
    return struct.pack(
        '<BHBB',
        HCI_SUCCESS,
        command.connection_handle,
        HCI_LE_1M_PHY,
        HCI_LE_1M_PHY,
    )


def on_hci_le_set_default_phy_command(self, command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.48 LE Set Default PHY Command

    Stores the `all_phys`, `tx_phys` and `rx_phys` fields of the command in
    `self.default_phy` and returns a one-byte HCI_SUCCESS status.
    '''
    self.default_phy = {
        'all_phys': command.all_phys,
        'tx_phys': command.tx_phys,
        'rx_phys': command.rx_phys,
    }
    return bytes([HCI_SUCCESS])


def on_hci_le_read_transmit_power_command(self, _command):
    '''
    See Bluetooth spec Vol 2, Part E - 7.8.74 LE Read Transmit Power Command

    Returns three bytes: HCI_SUCCESS followed by two zero bytes.
    '''
    return struct.pack('<BBB', HCI_SUCCESS, 0, 0)