1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18from __future__ import annotations 19from enum import IntEnum 20import copy 21import functools 22import json 23import asyncio 24import logging 25import secrets 26import sys 27from contextlib import ( 28 asynccontextmanager, 29 AsyncExitStack, 30 closing, 31 AbstractAsyncContextManager, 32) 33from dataclasses import dataclass, field 34from collections.abc import Iterable 35from typing import ( 36 Any, 37 Callable, 38 ClassVar, 39 Dict, 40 List, 41 Optional, 42 Tuple, 43 Type, 44 TypeVar, 45 Union, 46 cast, 47 overload, 48 TYPE_CHECKING, 49) 50from typing_extensions import Self 51 52from pyee import EventEmitter 53 54from .colors import color 55from .att import ATT_CID, ATT_DEFAULT_MTU, ATT_PDU 56from .gatt import Characteristic, Descriptor, Service 57from .hci import ( 58 HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE, 59 HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE, 60 HCI_CENTRAL_ROLE, 61 HCI_PERIPHERAL_ROLE, 62 HCI_COMMAND_STATUS_PENDING, 63 HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR, 64 HCI_DISPLAY_YES_NO_IO_CAPABILITY, 65 HCI_DISPLAY_ONLY_IO_CAPABILITY, 66 HCI_EXTENDED_INQUIRY_MODE, 67 HCI_GENERAL_INQUIRY_LAP, 68 HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR, 69 
HCI_KEYBOARD_ONLY_IO_CAPABILITY, 70 HCI_LE_1M_PHY, 71 HCI_LE_1M_PHY_BIT, 72 HCI_LE_2M_PHY, 73 HCI_LE_CODED_PHY, 74 HCI_LE_CODED_PHY_BIT, 75 HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND, 76 HCI_LE_RAND_COMMAND, 77 HCI_LE_READ_PHY_COMMAND, 78 HCI_LE_SET_PHY_COMMAND, 79 HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS, 80 HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS, 81 HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS, 82 HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS, 83 HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY, 84 HCI_R2_PAGE_SCAN_REPETITION_MODE, 85 HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, 86 HCI_SUCCESS, 87 HCI_WRITE_LE_HOST_SUPPORT_COMMAND, 88 HCI_Accept_Connection_Request_Command, 89 HCI_Authentication_Requested_Command, 90 HCI_Command_Status_Event, 91 HCI_Constant, 92 HCI_Create_Connection_Cancel_Command, 93 HCI_Create_Connection_Command, 94 HCI_Connection_Complete_Event, 95 HCI_Disconnect_Command, 96 HCI_Encryption_Change_Event, 97 HCI_Error, 98 HCI_IO_Capability_Request_Reply_Command, 99 HCI_Inquiry_Cancel_Command, 100 HCI_Inquiry_Command, 101 HCI_IsoDataPacket, 102 HCI_LE_Accept_CIS_Request_Command, 103 HCI_LE_Add_Device_To_Resolving_List_Command, 104 HCI_LE_Advertising_Report_Event, 105 HCI_LE_Clear_Resolving_List_Command, 106 HCI_LE_Connection_Update_Command, 107 HCI_LE_Create_Connection_Cancel_Command, 108 HCI_LE_Create_Connection_Command, 109 HCI_LE_Create_CIS_Command, 110 HCI_LE_Enable_Encryption_Command, 111 HCI_LE_Extended_Advertising_Report_Event, 112 HCI_LE_Extended_Create_Connection_Command, 113 HCI_LE_Rand_Command, 114 HCI_LE_Read_PHY_Command, 115 HCI_LE_Read_Remote_Features_Command, 116 HCI_LE_Reject_CIS_Request_Command, 117 HCI_LE_Remove_Advertising_Set_Command, 118 HCI_LE_Set_Address_Resolution_Enable_Command, 119 HCI_LE_Set_Advertising_Data_Command, 120 HCI_LE_Set_Advertising_Enable_Command, 121 HCI_LE_Set_Advertising_Parameters_Command, 122 HCI_LE_Set_Advertising_Set_Random_Address_Command, 
123 HCI_LE_Set_CIG_Parameters_Command, 124 HCI_LE_Set_Data_Length_Command, 125 HCI_LE_Set_Default_PHY_Command, 126 HCI_LE_Set_Extended_Scan_Enable_Command, 127 HCI_LE_Set_Extended_Scan_Parameters_Command, 128 HCI_LE_Set_Extended_Scan_Response_Data_Command, 129 HCI_LE_Set_Extended_Advertising_Data_Command, 130 HCI_LE_Set_Extended_Advertising_Enable_Command, 131 HCI_LE_Set_Extended_Advertising_Parameters_Command, 132 HCI_LE_Set_Host_Feature_Command, 133 HCI_LE_Set_Periodic_Advertising_Enable_Command, 134 HCI_LE_Set_PHY_Command, 135 HCI_LE_Set_Random_Address_Command, 136 HCI_LE_Set_Scan_Enable_Command, 137 HCI_LE_Set_Scan_Parameters_Command, 138 HCI_LE_Set_Scan_Response_Data_Command, 139 HCI_PIN_Code_Request_Reply_Command, 140 HCI_PIN_Code_Request_Negative_Reply_Command, 141 HCI_Read_BD_ADDR_Command, 142 HCI_Read_RSSI_Command, 143 HCI_Reject_Connection_Request_Command, 144 HCI_Remote_Name_Request_Command, 145 HCI_Switch_Role_Command, 146 HCI_Set_Connection_Encryption_Command, 147 HCI_StatusError, 148 HCI_SynchronousDataPacket, 149 HCI_User_Confirmation_Request_Negative_Reply_Command, 150 HCI_User_Confirmation_Request_Reply_Command, 151 HCI_User_Passkey_Request_Negative_Reply_Command, 152 HCI_User_Passkey_Request_Reply_Command, 153 HCI_Write_Class_Of_Device_Command, 154 HCI_Write_Extended_Inquiry_Response_Command, 155 HCI_Write_Inquiry_Mode_Command, 156 HCI_Write_LE_Host_Support_Command, 157 HCI_Write_Local_Name_Command, 158 HCI_Write_Scan_Enable_Command, 159 HCI_Write_Secure_Connections_Host_Support_Command, 160 HCI_Write_Simple_Pairing_Mode_Command, 161 Address, 162 OwnAddressType, 163 LeFeature, 164 LeFeatureMask, 165 Phy, 166 phy_list_to_bits, 167) 168from .host import Host 169from .gap import GenericAccessService 170from .core import ( 171 BT_BR_EDR_TRANSPORT, 172 BT_CENTRAL_ROLE, 173 BT_LE_TRANSPORT, 174 BT_PERIPHERAL_ROLE, 175 AdvertisingData, 176 ConnectionParameterUpdateError, 177 CommandTimeoutError, 178 ConnectionPHY, 179 InvalidStateError, 180) 181from 
.utils import ( 182 AsyncRunner, 183 CompositeEventEmitter, 184 EventWatcher, 185 setup_event_forwarding, 186 composite_listener, 187 deprecated, 188 experimental, 189) 190from .keys import ( 191 KeyStore, 192 PairingKeys, 193) 194from .pairing import PairingConfig 195from . import gatt_client 196from . import gatt_server 197from . import smp 198from . import sdp 199from . import l2cap 200from . import core 201 202if TYPE_CHECKING: 203 from .transport.common import TransportSource, TransportSink 204 205 206# ----------------------------------------------------------------------------- 207# Logging 208# ----------------------------------------------------------------------------- 209logger = logging.getLogger(__name__) 210 211# ----------------------------------------------------------------------------- 212# Constants 213# ----------------------------------------------------------------------------- 214# fmt: off 215# pylint: disable=line-too-long 216 217DEVICE_MIN_SCAN_INTERVAL = 25 218DEVICE_MAX_SCAN_INTERVAL = 10240 219DEVICE_MIN_SCAN_WINDOW = 25 220DEVICE_MAX_SCAN_WINDOW = 10240 221DEVICE_MIN_LE_RSSI = -127 222DEVICE_MAX_LE_RSSI = 20 223DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE = 0x00 224DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE = 0xEF 225 226DEVICE_DEFAULT_ADDRESS = '00:00:00:00:00:00' 227DEVICE_DEFAULT_ADVERTISING_INTERVAL = 1000 # ms 228DEVICE_DEFAULT_ADVERTISING_DATA = '' 229DEVICE_DEFAULT_NAME = 'Bumble' 230DEVICE_DEFAULT_INQUIRY_LENGTH = 8 # 10.24 seconds 231DEVICE_DEFAULT_CLASS_OF_DEVICE = 0 232DEVICE_DEFAULT_SCAN_RESPONSE_DATA = b'' 233DEVICE_DEFAULT_DATA_LENGTH = (27, 328, 27, 328) 234DEVICE_DEFAULT_SCAN_INTERVAL = 60 # ms 235DEVICE_DEFAULT_SCAN_WINDOW = 60 # ms 236DEVICE_DEFAULT_CONNECT_TIMEOUT = None # No timeout 237DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL = 60 # ms 238DEVICE_DEFAULT_CONNECT_SCAN_WINDOW = 60 # ms 239DEVICE_DEFAULT_CONNECTION_INTERVAL_MIN = 15 # ms 240DEVICE_DEFAULT_CONNECTION_INTERVAL_MAX = 30 # ms 
241DEVICE_DEFAULT_CONNECTION_MAX_LATENCY = 0 242DEVICE_DEFAULT_CONNECTION_SUPERVISION_TIMEOUT = 720 # ms 243DEVICE_DEFAULT_CONNECTION_MIN_CE_LENGTH = 0 # ms 244DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH = 0 # ms 245DEVICE_DEFAULT_L2CAP_COC_MTU = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU 246DEVICE_DEFAULT_L2CAP_COC_MPS = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS 247DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS 248DEVICE_DEFAULT_ADVERTISING_TX_POWER = ( 249 HCI_LE_Set_Extended_Advertising_Parameters_Command.TX_POWER_NO_PREFERENCE 250) 251 252# fmt: on 253# pylint: enable=line-too-long 254 255# As specified in 7.8.56 LE Set Extended Advertising Enable command 256DEVICE_MAX_HIGH_DUTY_CYCLE_CONNECTABLE_DIRECTED_ADVERTISING_DURATION = 1.28 257 258 259# ----------------------------------------------------------------------------- 260# Classes 261# ----------------------------------------------------------------------------- 262 263 264# ----------------------------------------------------------------------------- 265@dataclass 266class Advertisement: 267 # Attributes 268 address: Address 269 rssi: int = HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE 270 is_legacy: bool = False 271 is_anonymous: bool = False 272 is_connectable: bool = False 273 is_directed: bool = False 274 is_scannable: bool = False 275 is_scan_response: bool = False 276 is_complete: bool = True 277 is_truncated: bool = False 278 primary_phy: int = 0 279 secondary_phy: int = 0 280 tx_power: int = ( 281 HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE 282 ) 283 sid: int = 0 284 data_bytes: bytes = b'' 285 286 # Constants 287 TX_POWER_NOT_AVAILABLE: ClassVar[int] = ( 288 HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE 289 ) 290 RSSI_NOT_AVAILABLE: ClassVar[int] = ( 291 HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE 292 ) 293 294 def __post_init__(self) -> 
None: 295 self.data = AdvertisingData.from_bytes(self.data_bytes) 296 297 @classmethod 298 def from_advertising_report(cls, report) -> Optional[Advertisement]: 299 if isinstance(report, HCI_LE_Advertising_Report_Event.Report): 300 return LegacyAdvertisement.from_advertising_report(report) 301 302 if isinstance(report, HCI_LE_Extended_Advertising_Report_Event.Report): 303 return ExtendedAdvertisement.from_advertising_report(report) 304 305 return None 306 307 308# ----------------------------------------------------------------------------- 309class LegacyAdvertisement(Advertisement): 310 @classmethod 311 def from_advertising_report(cls, report): 312 return cls( 313 address=report.address, 314 rssi=report.rssi, 315 is_legacy=True, 316 is_connectable=report.event_type 317 in ( 318 HCI_LE_Advertising_Report_Event.ADV_IND, 319 HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND, 320 ), 321 is_directed=report.event_type 322 == HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND, 323 is_scannable=report.event_type 324 in ( 325 HCI_LE_Advertising_Report_Event.ADV_IND, 326 HCI_LE_Advertising_Report_Event.ADV_SCAN_IND, 327 ), 328 is_scan_response=report.event_type 329 == HCI_LE_Advertising_Report_Event.SCAN_RSP, 330 data_bytes=report.data, 331 ) 332 333 334# ----------------------------------------------------------------------------- 335class ExtendedAdvertisement(Advertisement): 336 @classmethod 337 def from_advertising_report(cls, report): 338 # fmt: off 339 # pylint: disable=line-too-long 340 return cls( 341 address = report.address, 342 rssi = report.rssi, 343 is_legacy = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.LEGACY_ADVERTISING_PDU_USED) != 0, 344 is_anonymous = report.address.address_type == HCI_LE_Extended_Advertising_Report_Event.ANONYMOUS_ADDRESS_TYPE, 345 is_connectable = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.CONNECTABLE_ADVERTISING) != 0, 346 is_directed = report.event_type & (1 << 
HCI_LE_Extended_Advertising_Report_Event.DIRECTED_ADVERTISING) != 0, 347 is_scannable = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.SCANNABLE_ADVERTISING) != 0, 348 is_scan_response = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.SCAN_RESPONSE) != 0, 349 is_complete = (report.event_type >> 5 & 3) == HCI_LE_Extended_Advertising_Report_Event.DATA_COMPLETE, 350 is_truncated = (report.event_type >> 5 & 3) == HCI_LE_Extended_Advertising_Report_Event.DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME, 351 primary_phy = report.primary_phy, 352 secondary_phy = report.secondary_phy, 353 tx_power = report.tx_power, 354 sid = report.advertising_sid, 355 data_bytes = report.data 356 ) 357 # fmt: on 358 359 360# ----------------------------------------------------------------------------- 361class AdvertisementDataAccumulator: 362 def __init__(self, passive=False): 363 self.passive = passive 364 self.last_advertisement = None 365 self.last_data = b'' 366 367 def update(self, report): 368 advertisement = Advertisement.from_advertising_report(report) 369 if advertisement is None: 370 return None 371 372 result = None 373 374 if advertisement.is_scan_response: 375 if ( 376 self.last_advertisement is not None 377 and not self.last_advertisement.is_scan_response 378 ): 379 # This is the response to a scannable advertisement 380 result = Advertisement.from_advertising_report(report) 381 result.is_connectable = self.last_advertisement.is_connectable 382 result.is_scannable = True 383 result.data = AdvertisingData.from_bytes(self.last_data + report.data) 384 self.last_data = b'' 385 else: 386 if ( 387 self.passive 388 or (not advertisement.is_scannable) 389 or ( 390 self.last_advertisement is not None 391 and not self.last_advertisement.is_scan_response 392 ) 393 ): 394 # Don't wait for a scan response 395 result = Advertisement.from_advertising_report(report) 396 397 self.last_data = report.data 398 399 self.last_advertisement = advertisement 400 401 
return result 402 403 404# ----------------------------------------------------------------------------- 405class AdvertisingType(IntEnum): 406 # fmt: off 407 # pylint: disable=line-too-long 408 UNDIRECTED_CONNECTABLE_SCANNABLE = 0x00 # Undirected, connectable, scannable 409 DIRECTED_CONNECTABLE_HIGH_DUTY = 0x01 # Directed, connectable, non-scannable 410 UNDIRECTED_SCANNABLE = 0x02 # Undirected, non-connectable, scannable 411 UNDIRECTED = 0x03 # Undirected, non-connectable, non-scannable 412 DIRECTED_CONNECTABLE_LOW_DUTY = 0x04 # Directed, connectable, non-scannable 413 # fmt: on 414 415 @property 416 def has_data(self) -> bool: 417 return self in ( 418 AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, 419 AdvertisingType.UNDIRECTED_SCANNABLE, 420 AdvertisingType.UNDIRECTED, 421 ) 422 423 @property 424 def is_connectable(self) -> bool: 425 return self in ( 426 AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, 427 AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY, 428 AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY, 429 ) 430 431 @property 432 def is_scannable(self) -> bool: 433 return self in ( 434 AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, 435 AdvertisingType.UNDIRECTED_SCANNABLE, 436 ) 437 438 @property 439 def is_directed(self) -> bool: 440 return self in ( 441 AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY, 442 AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY, 443 ) 444 445 @property 446 def is_high_duty_cycle_directed_connectable(self): 447 return self == AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY 448 449 450# ----------------------------------------------------------------------------- 451@dataclass 452class LegacyAdvertiser: 453 device: Device 454 advertising_type: AdvertisingType 455 own_address_type: OwnAddressType 456 peer_address: Address 457 auto_restart: bool 458 459 async def start(self) -> None: 460 # Set/update the advertising data if the advertising type allows it 461 if self.advertising_type.has_data: 462 await self.device.send_command( 463 
HCI_LE_Set_Advertising_Data_Command( 464 advertising_data=self.device.advertising_data 465 ), 466 check_result=True, 467 ) 468 469 # Set/update the scan response data if the advertising is scannable 470 if self.advertising_type.is_scannable: 471 await self.device.send_command( 472 HCI_LE_Set_Scan_Response_Data_Command( 473 scan_response_data=self.device.scan_response_data 474 ), 475 check_result=True, 476 ) 477 478 # Set the advertising parameters 479 await self.device.send_command( 480 HCI_LE_Set_Advertising_Parameters_Command( 481 advertising_interval_min=self.device.advertising_interval_min, 482 advertising_interval_max=self.device.advertising_interval_max, 483 advertising_type=int(self.advertising_type), 484 own_address_type=self.own_address_type, 485 peer_address_type=self.peer_address.address_type, 486 peer_address=self.peer_address, 487 advertising_channel_map=7, 488 advertising_filter_policy=0, 489 ), 490 check_result=True, 491 ) 492 493 # Enable advertising 494 await self.device.send_command( 495 HCI_LE_Set_Advertising_Enable_Command(advertising_enable=1), 496 check_result=True, 497 ) 498 499 async def stop(self) -> None: 500 # Disable advertising 501 await self.device.send_command( 502 HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0), 503 check_result=True, 504 ) 505 506 507# ----------------------------------------------------------------------------- 508@dataclass 509class AdvertisingEventProperties: 510 is_connectable: bool = True 511 is_scannable: bool = False 512 is_directed: bool = False 513 is_high_duty_cycle_directed_connectable: bool = False 514 is_legacy: bool = False 515 is_anonymous: bool = False 516 include_tx_power: bool = False 517 518 def __int__(self) -> int: 519 properties = ( 520 HCI_LE_Set_Extended_Advertising_Parameters_Command.AdvertisingProperties(0) 521 ) 522 if self.is_connectable: 523 properties |= properties.CONNECTABLE_ADVERTISING 524 if self.is_scannable: 525 properties |= properties.SCANNABLE_ADVERTISING 526 if 
self.is_directed: 527 properties |= properties.DIRECTED_ADVERTISING 528 if self.is_high_duty_cycle_directed_connectable: 529 properties |= properties.HIGH_DUTY_CYCLE_DIRECTED_CONNECTABLE_ADVERTISING 530 if self.is_legacy: 531 properties |= properties.USE_LEGACY_ADVERTISING_PDUS 532 if self.is_anonymous: 533 properties |= properties.ANONYMOUS_ADVERTISING 534 if self.include_tx_power: 535 properties |= properties.INCLUDE_TX_POWER 536 537 return int(properties) 538 539 @classmethod 540 def from_advertising_type( 541 cls: Type[AdvertisingEventProperties], 542 advertising_type: AdvertisingType, 543 ) -> AdvertisingEventProperties: 544 return cls( 545 is_connectable=advertising_type.is_connectable, 546 is_scannable=advertising_type.is_scannable, 547 is_directed=advertising_type.is_directed, 548 is_high_duty_cycle_directed_connectable=advertising_type.is_high_duty_cycle_directed_connectable, 549 is_legacy=True, 550 is_anonymous=False, 551 include_tx_power=False, 552 ) 553 554 555# ----------------------------------------------------------------------------- 556# TODO: replace with typing.TypeAlias when the code base is all Python >= 3.10 557AdvertisingChannelMap = HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap 558 559 560# ----------------------------------------------------------------------------- 561@dataclass 562class AdvertisingParameters: 563 # pylint: disable=line-too-long 564 advertising_event_properties: AdvertisingEventProperties = field( 565 default_factory=AdvertisingEventProperties 566 ) 567 primary_advertising_interval_min: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL 568 primary_advertising_interval_max: int = DEVICE_DEFAULT_ADVERTISING_INTERVAL 569 primary_advertising_channel_map: ( 570 HCI_LE_Set_Extended_Advertising_Parameters_Command.ChannelMap 571 ) = ( 572 AdvertisingChannelMap.CHANNEL_37 573 | AdvertisingChannelMap.CHANNEL_38 574 | AdvertisingChannelMap.CHANNEL_39 575 ) 576 own_address_type: OwnAddressType = OwnAddressType.RANDOM 577 
peer_address: Address = Address.ANY 578 advertising_filter_policy: int = 0 579 advertising_tx_power: int = DEVICE_DEFAULT_ADVERTISING_TX_POWER 580 primary_advertising_phy: Phy = Phy.LE_1M 581 secondary_advertising_max_skip: int = 0 582 secondary_advertising_phy: Phy = Phy.LE_1M 583 advertising_sid: int = 0 584 enable_scan_request_notifications: bool = False 585 primary_advertising_phy_options: int = 0 586 secondary_advertising_phy_options: int = 0 587 588 589# ----------------------------------------------------------------------------- 590@dataclass 591class PeriodicAdvertisingParameters: 592 # TODO implement this class 593 pass 594 595 596# ----------------------------------------------------------------------------- 597@dataclass 598class AdvertisingSet(EventEmitter): 599 device: Device 600 advertising_handle: int 601 auto_restart: bool 602 random_address: Optional[Address] 603 advertising_parameters: AdvertisingParameters 604 advertising_data: bytes 605 scan_response_data: bytes 606 periodic_advertising_parameters: Optional[PeriodicAdvertisingParameters] 607 periodic_advertising_data: bytes 608 selected_tx_power: int = 0 609 enabled: bool = False 610 611 def __post_init__(self) -> None: 612 super().__init__() 613 614 async def set_advertising_parameters( 615 self, advertising_parameters: AdvertisingParameters 616 ) -> None: 617 # Compliance check 618 if ( 619 not advertising_parameters.advertising_event_properties.is_legacy 620 and advertising_parameters.advertising_event_properties.is_connectable 621 and advertising_parameters.advertising_event_properties.is_scannable 622 ): 623 logger.warning( 624 "non-legacy extended advertising event properties may not be both " 625 "connectable and scannable" 626 ) 627 628 response = await self.device.send_command( 629 HCI_LE_Set_Extended_Advertising_Parameters_Command( 630 advertising_handle=self.advertising_handle, 631 advertising_event_properties=int( 632 advertising_parameters.advertising_event_properties 633 ), 634 
primary_advertising_interval_min=( 635 int(advertising_parameters.primary_advertising_interval_min / 0.625) 636 ), 637 primary_advertising_interval_max=( 638 int(advertising_parameters.primary_advertising_interval_min / 0.625) 639 ), 640 primary_advertising_channel_map=int( 641 advertising_parameters.primary_advertising_channel_map 642 ), 643 own_address_type=advertising_parameters.own_address_type, 644 peer_address_type=advertising_parameters.peer_address.address_type, 645 peer_address=advertising_parameters.peer_address, 646 advertising_tx_power=advertising_parameters.advertising_tx_power, 647 advertising_filter_policy=( 648 advertising_parameters.advertising_filter_policy 649 ), 650 primary_advertising_phy=advertising_parameters.primary_advertising_phy, 651 secondary_advertising_max_skip=( 652 advertising_parameters.secondary_advertising_max_skip 653 ), 654 secondary_advertising_phy=( 655 advertising_parameters.secondary_advertising_phy 656 ), 657 advertising_sid=advertising_parameters.advertising_sid, 658 scan_request_notification_enable=( 659 1 if advertising_parameters.enable_scan_request_notifications else 0 660 ), 661 ), 662 check_result=True, 663 ) 664 self.selected_tx_power = response.return_parameters.selected_tx_power 665 self.advertising_parameters = advertising_parameters 666 667 async def set_advertising_data(self, advertising_data: bytes) -> None: 668 # pylint: disable=line-too-long 669 await self.device.send_command( 670 HCI_LE_Set_Extended_Advertising_Data_Command( 671 advertising_handle=self.advertising_handle, 672 operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, 673 fragment_preference=HCI_LE_Set_Extended_Advertising_Parameters_Command.SHOULD_NOT_FRAGMENT, 674 advertising_data=advertising_data, 675 ), 676 check_result=True, 677 ) 678 self.advertising_data = advertising_data 679 680 async def set_scan_response_data(self, scan_response_data: bytes) -> None: 681 # pylint: disable=line-too-long 682 if ( 683 
scan_response_data 684 and not self.advertising_parameters.advertising_event_properties.is_scannable 685 ): 686 logger.warning( 687 "ignoring attempt to set non-empty scan response data on non-scannable " 688 "advertising set" 689 ) 690 return 691 692 await self.device.send_command( 693 HCI_LE_Set_Extended_Scan_Response_Data_Command( 694 advertising_handle=self.advertising_handle, 695 operation=HCI_LE_Set_Extended_Advertising_Data_Command.Operation.COMPLETE_DATA, 696 fragment_preference=HCI_LE_Set_Extended_Advertising_Parameters_Command.SHOULD_NOT_FRAGMENT, 697 scan_response_data=scan_response_data, 698 ), 699 check_result=True, 700 ) 701 self.scan_response_data = scan_response_data 702 703 async def set_periodic_advertising_parameters( 704 self, advertising_parameters: PeriodicAdvertisingParameters 705 ) -> None: 706 # TODO: send command 707 self.periodic_advertising_parameters = advertising_parameters 708 709 async def set_periodic_advertising_data(self, advertising_data: bytes) -> None: 710 # TODO: send command 711 self.periodic_advertising_data = advertising_data 712 713 async def set_random_address(self, random_address: Address) -> None: 714 await self.device.send_command( 715 HCI_LE_Set_Advertising_Set_Random_Address_Command( 716 advertising_handle=self.advertising_handle, 717 random_address=(random_address or self.device.random_address), 718 ), 719 check_result=True, 720 ) 721 722 async def start( 723 self, duration: float = 0.0, max_advertising_events: int = 0 724 ) -> None: 725 """ 726 Start advertising. 727 728 Args: 729 duration: How long to advertise for, in seconds. Use 0 (the default) for 730 an unlimited duration, unless this advertising set is a High Duty Cycle 731 Directed Advertisement type. 732 max_advertising_events: Maximum number of events to advertise for. Use 0 733 (the default) for an unlimited number of advertisements. 
734 """ 735 await self.device.send_command( 736 HCI_LE_Set_Extended_Advertising_Enable_Command( 737 enable=1, 738 advertising_handles=[self.advertising_handle], 739 durations=[round(duration * 100)], 740 max_extended_advertising_events=[max_advertising_events], 741 ), 742 check_result=True, 743 ) 744 self.enabled = True 745 746 self.emit('start') 747 748 async def start_periodic(self, include_adi: bool = False) -> None: 749 await self.device.send_command( 750 HCI_LE_Set_Periodic_Advertising_Enable_Command( 751 enable=1 | (2 if include_adi else 0), 752 advertising_handles=self.advertising_handle, 753 ), 754 check_result=True, 755 ) 756 757 self.emit('start_periodic') 758 759 async def stop(self) -> None: 760 await self.device.send_command( 761 HCI_LE_Set_Extended_Advertising_Enable_Command( 762 enable=0, 763 advertising_handles=[self.advertising_handle], 764 durations=[0], 765 max_extended_advertising_events=[0], 766 ), 767 check_result=True, 768 ) 769 self.enabled = False 770 771 self.emit('stop') 772 773 async def stop_periodic(self) -> None: 774 await self.device.send_command( 775 HCI_LE_Set_Periodic_Advertising_Enable_Command( 776 enable=0, 777 advertising_handles=self.advertising_handle, 778 ), 779 check_result=True, 780 ) 781 782 self.emit('stop_periodic') 783 784 async def remove(self) -> None: 785 await self.device.send_command( 786 HCI_LE_Remove_Advertising_Set_Command( 787 advertising_handle=self.advertising_handle 788 ), 789 check_result=True, 790 ) 791 del self.device.extended_advertising_sets[self.advertising_handle] 792 793 def on_termination(self, status: int) -> None: 794 self.enabled = False 795 self.emit('termination', status) 796 797 798# ----------------------------------------------------------------------------- 799class LePhyOptions: 800 # Coded PHY preference 801 ANY_CODED_PHY = 0 802 PREFER_S_2_CODED_PHY = 1 803 PREFER_S_8_CODED_PHY = 2 804 805 def __init__(self, coded_phy_preference=0): 806 self.coded_phy_preference = coded_phy_preference 
807 808 def __int__(self): 809 return self.coded_phy_preference & 3 810 811 812# ----------------------------------------------------------------------------- 813_PROXY_CLASS = TypeVar('_PROXY_CLASS', bound=gatt_client.ProfileServiceProxy) 814 815 816class Peer: 817 def __init__(self, connection: Connection) -> None: 818 self.connection = connection 819 820 # Create a GATT client for the connection 821 self.gatt_client = gatt_client.Client(connection) 822 connection.gatt_client = self.gatt_client 823 824 @property 825 def services(self) -> List[gatt_client.ServiceProxy]: 826 return self.gatt_client.services 827 828 async def request_mtu(self, mtu: int) -> int: 829 mtu = await self.gatt_client.request_mtu(mtu) 830 self.connection.emit('connection_att_mtu_update') 831 return mtu 832 833 async def discover_service( 834 self, uuid: Union[core.UUID, str] 835 ) -> List[gatt_client.ServiceProxy]: 836 return await self.gatt_client.discover_service(uuid) 837 838 async def discover_services( 839 self, uuids: Iterable[core.UUID] = () 840 ) -> List[gatt_client.ServiceProxy]: 841 return await self.gatt_client.discover_services(uuids) 842 843 async def discover_included_services( 844 self, service: gatt_client.ServiceProxy 845 ) -> List[gatt_client.ServiceProxy]: 846 return await self.gatt_client.discover_included_services(service) 847 848 async def discover_characteristics( 849 self, 850 uuids: Iterable[Union[core.UUID, str]] = (), 851 service: Optional[gatt_client.ServiceProxy] = None, 852 ) -> List[gatt_client.CharacteristicProxy]: 853 return await self.gatt_client.discover_characteristics( 854 uuids=uuids, service=service 855 ) 856 857 async def discover_descriptors( 858 self, 859 characteristic: Optional[gatt_client.CharacteristicProxy] = None, 860 start_handle: Optional[int] = None, 861 end_handle: Optional[int] = None, 862 ): 863 return await self.gatt_client.discover_descriptors( 864 characteristic, start_handle, end_handle 865 ) 866 867 async def 
discover_attributes(self) -> List[gatt_client.AttributeProxy]: 868 return await self.gatt_client.discover_attributes() 869 870 async def subscribe( 871 self, 872 characteristic: gatt_client.CharacteristicProxy, 873 subscriber: Optional[Callable[[bytes], Any]] = None, 874 prefer_notify: bool = True, 875 ) -> None: 876 return await self.gatt_client.subscribe( 877 characteristic, subscriber, prefer_notify 878 ) 879 880 async def unsubscribe( 881 self, 882 characteristic: gatt_client.CharacteristicProxy, 883 subscriber: Optional[Callable[[bytes], Any]] = None, 884 ) -> None: 885 return await self.gatt_client.unsubscribe(characteristic, subscriber) 886 887 async def read_value( 888 self, attribute: Union[int, gatt_client.AttributeProxy] 889 ) -> bytes: 890 return await self.gatt_client.read_value(attribute) 891 892 async def write_value( 893 self, 894 attribute: Union[int, gatt_client.AttributeProxy], 895 value: bytes, 896 with_response: bool = False, 897 ) -> None: 898 return await self.gatt_client.write_value(attribute, value, with_response) 899 900 async def read_characteristics_by_uuid( 901 self, uuid: core.UUID, service: Optional[gatt_client.ServiceProxy] = None 902 ) -> List[bytes]: 903 return await self.gatt_client.read_characteristics_by_uuid(uuid, service) 904 905 def get_services_by_uuid(self, uuid: core.UUID) -> List[gatt_client.ServiceProxy]: 906 return self.gatt_client.get_services_by_uuid(uuid) 907 908 def get_characteristics_by_uuid( 909 self, uuid: core.UUID, service: Optional[gatt_client.ServiceProxy] = None 910 ) -> List[gatt_client.CharacteristicProxy]: 911 return self.gatt_client.get_characteristics_by_uuid(uuid, service) 912 913 def create_service_proxy(self, proxy_class: Type[_PROXY_CLASS]) -> _PROXY_CLASS: 914 return cast(_PROXY_CLASS, proxy_class.from_client(self.gatt_client)) 915 916 async def discover_service_and_create_proxy( 917 self, proxy_class: Type[_PROXY_CLASS] 918 ) -> Optional[_PROXY_CLASS]: 919 # Discover the first matching service 
and its characteristics 920 services = await self.discover_service(proxy_class.SERVICE_CLASS.UUID) 921 if services: 922 service = services[0] 923 await service.discover_characteristics() 924 return self.create_service_proxy(proxy_class) 925 return None 926 927 async def sustain(self, timeout: Optional[float] = None) -> None: 928 await self.connection.sustain(timeout) 929 930 # [Classic only] 931 async def request_name(self) -> str: 932 return await self.connection.request_remote_name() 933 934 async def __aenter__(self): 935 await self.discover_services() 936 for service in self.services: 937 await service.discover_characteristics() 938 939 return self 940 941 async def __aexit__(self, exc_type, exc_value, traceback): 942 pass 943 944 def __str__(self) -> str: 945 return f'{self.connection.peer_address} as {self.connection.role_name}' 946 947 948# ----------------------------------------------------------------------------- 949@dataclass 950class ConnectionParametersPreferences: 951 default: ClassVar[ConnectionParametersPreferences] 952 connection_interval_min: int = DEVICE_DEFAULT_CONNECTION_INTERVAL_MIN 953 connection_interval_max: int = DEVICE_DEFAULT_CONNECTION_INTERVAL_MAX 954 max_latency: int = DEVICE_DEFAULT_CONNECTION_MAX_LATENCY 955 supervision_timeout: int = DEVICE_DEFAULT_CONNECTION_SUPERVISION_TIMEOUT 956 min_ce_length: int = DEVICE_DEFAULT_CONNECTION_MIN_CE_LENGTH 957 max_ce_length: int = DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH 958 959 960ConnectionParametersPreferences.default = ConnectionParametersPreferences() 961 962 963# ----------------------------------------------------------------------------- 964@dataclass 965class ScoLink(CompositeEventEmitter): 966 device: Device 967 acl_connection: Connection 968 handle: int 969 link_type: int 970 sink: Optional[Callable[[HCI_SynchronousDataPacket], Any]] = None 971 972 def __post_init__(self) -> None: 973 super().__init__() 974 975 async def disconnect( 976 self, reason: int = 
@dataclass
class CisLink(CompositeEventEmitter):
    """A Connected Isochronous Stream (CIS) link, carried on top of an ACL connection.

    Instances are event emitters; disconnecting is delegated to the owning
    device.
    """

    class State(IntEnum):
        # Life-cycle of the link: it starts PENDING (see the `state` default).
        PENDING = 0
        ESTABLISHED = 1

    device: Device  # Device that owns this link; used by disconnect()
    acl_connection: Connection  # Based ACL connection
    handle: int  # CIS handle assigned by Controller (in LE_Set_CIG_Parameters Complete or LE_CIS_Request events)
    cis_id: int  # CIS ID assigned by Central device
    cig_id: int  # CIG ID assigned by Central device
    state: State = State.PENDING
    # Optional callable taking an HCI_IsoDataPacket
    # (presumably invoked for incoming ISO data - confirm against the caller)
    sink: Optional[Callable[[HCI_IsoDataPacket], Any]] = None

    def __post_init__(self) -> None:
        # The dataclass-generated __init__ does not call the base class
        # initializer, so do it here.
        super().__init__()

    async def disconnect(
        self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR
    ) -> None:
        """Ask the owning device to disconnect this link with the given HCI reason."""
        await self.device.disconnect(self, reason)
# [Classic only]
@classmethod
def incomplete(cls, device, peer_address, role):
    """
    Build a placeholder connection for a peer whose HCI Connection Complete
    event has not been received yet.

    The handle, resolvable peer address, parameters and PHY are all left as
    `None`; the transport is BR/EDR and the local address is the device's
    public address. Finish the object with the `.complete` method once the
    event arrives.
    """
    not_known_yet = None
    return cls(
        device,
        not_known_yet,  # handle
        BT_BR_EDR_TRANSPORT,
        device.public_address,
        peer_address,
        not_known_yet,  # peer_resolvable_address
        role,
        not_known_yet,  # parameters
        not_known_yet,  # phy
    )
@property
def role_name(self):
    """Human-readable label for `self.role`.

    'NOT-SET' when no role is assigned, 'CENTRAL' or 'PERIPHERAL' for the two
    known role constants, and 'UNKNOWN[<role>]' for any other value.
    """
    role = self.role
    if role is None:
        return 'NOT-SET'

    known_roles = (
        (BT_CENTRAL_ROLE, 'CENTRAL'),
        (BT_PERIPHERAL_ROLE, 'PERIPHERAL'),
    )
    for value, label in known_roles:
        if role == value:
            return label
    return f'UNKNOWN[{role}]'


@property
def is_encrypted(self):
    """True when `self.encryption` holds anything other than 0."""
    not_encrypted = 0
    return self.encryption != not_encrypted


@property
def is_incomplete(self) -> bool:
    """True while no handle has been assigned to this connection."""
    handle = self.handle
    return handle is None
async def sustain(self, timeout: Optional[float] = None) -> None:
    """Idle the current task until a disconnection or until `timeout` elapses.

    Args:
        timeout: Maximum time to wait, in seconds, as passed to
            `asyncio.wait_for`. `None` waits without a time limit.

    Returns normally on a 'disconnection' event or when the timeout expires
    (the timeout is swallowed on purpose). A 'disconnection_failure' event
    sets an exception on the awaited future, so it propagates to the caller.
    """
    abort = asyncio.get_running_loop().create_future()
    self.on('disconnection', abort.set_result)
    self.on('disconnection_failure', abort.set_exception)

    try:
        # NOTE(review): `device.abort_on('flush', ...)` presumably also ends
        # the wait when the device is flushed - confirm against abort_on().
        await asyncio.wait_for(self.device.abort_on('flush', abort), timeout)
    except asyncio.TimeoutError:
        pass
    finally:
        # Always unregister the listeners: without `finally`, a failure or a
        # cancellation would leave them attached to the connection, pointing
        # at a future nobody awaits any more.
        self.remove_listener('disconnection', abort.set_result)
        self.remove_listener('disconnection_failure', abort.set_exception)
def __str__(self):
    """Return a short, log-friendly description of the connection.

    The handle is rendered as 4-digit upper-case hex. A connection that has no
    handle yet (`self.handle is None`, as produced by `incomplete()`) is
    rendered as `handle=None`; applying the `04X` format spec to `None` would
    raise a TypeError.
    """
    handle = 'None' if self.handle is None else f'0x{self.handle:04X}'
    return (
        f'Connection(handle={handle}, '
        f'role={self.role_name}, '
        f'self_address={self.self_address}, '
        f'peer_address={self.peer_address})'
    )
def load_from_dict(self, config: Dict[str, Any]) -> None:
    """Update this configuration from a dictionary.

    The dictionary is deep-copied first, so the caller's object is never
    modified. The keys 'address', 'irk', 'name', 'advertising_data' and
    'advertising_interval' get special treatment (see the comments below);
    every remaining key is assigned as an attribute, as-is.

    Args:
        config: Configuration values. 'irk' and 'advertising_data' are
            hex strings; 'address' is whatever `Address(...)` accepts.
    """
    config = copy.deepcopy(config)

    # Load simple properties
    if address := config.pop('address', None):
        self.address = Address(address)

    # Load or synthesize an IRK
    if irk := config.pop('irk', None):
        self.irk = bytes.fromhex(irk)
    elif self.address != Address(DEVICE_DEFAULT_ADDRESS):
        # Construct an IRK from the address bytes
        # NOTE: this is not secure, but will always give the same IRK for the same
        # address
        address_bytes = bytes(self.address)
        self.irk = (address_bytes * 3)[:16]
    else:
        # Fallback - when both IRK and address are not set, randomly generate an IRK.
        self.irk = secrets.token_bytes(16)

    if (name := config.pop('name', None)) is not None:
        self.name = name

    # Load advertising data. When none is given but a name is, the advertising
    # data is rebuilt so that it carries the new name.
    if advertising_data := config.pop('advertising_data', None):
        self.advertising_data = bytes.fromhex(advertising_data)
    elif name is not None:
        self.advertising_data = bytes(
            AdvertisingData(
                [(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(self.name, 'utf-8'))]
            )
        )

    # Load advertising interval (for backward compatibility)
    if advertising_interval := config.pop('advertising_interval', None):
        self.advertising_interval_min = advertising_interval
        self.advertising_interval_max = advertising_interval
        if (
            'advertising_interval_max' in config
            or 'advertising_interval_min' in config
        ):
            # The explicit min/max keys are applied by the generic loop below
            # and therefore win over `advertising_interval`.
            logger.warning(
                'Trying to set both advertising_interval and '
                'advertising_interval_min/max, advertising_interval will be '
                'ignored.'
            )

    # Load data in primitive types.
    for key, value in config.items():
        setattr(self, key, value)
# Decorator that tries to convert the first argument from a bluetooth address to a
# connection
def try_with_connection_from_address(function):
    """Wrap a method so it receives the connection matching an address, if any.

    The wrapped method is called as `function(self, connection, address, ...)`.
    `connection` is the pending connection registered for `address` when there
    is a truthy one, otherwise the first established connection whose
    `peer_address` equals `address`, otherwise `None`.
    """

    @functools.wraps(function)
    def wrapper(self, address, *args, **kwargs):
        resolved = self.pending_connections.get(address, False)
        if not resolved:
            resolved = next(
                (
                    candidate
                    for candidate in self.connections.values()
                    if candidate.peer_address == address
                ),
                None,
            )
        return function(self, resolved, address, *args, **kwargs)

    return wrapper


# Decorator that adds a method to the list of event handlers for host events.
# This assumes that the method name starts with `on_`
def host_event_handler(function):
    """Record the event name of `function` and return `function` unchanged."""
    event_name = function.__name__[3:]  # drop the first three characters (`on_`)
    device_host_event_handlers.append(event_name)
    return function


# List of host event handlers for the Device class.
# (we define this list outside the class, because referencing a class in method
# decorators is not straightforward)
device_host_event_handlers: List[str] = []
@classmethod
def with_hci(
    cls,
    name: str,
    address: Address,
    hci_source: TransportSource,
    hci_sink: TransportSink,
) -> Device:
    """
    Build a Device whose Host talks to a controller through the given HCI
    source/sink pair.
    """
    return cls(
        name=name,
        address=address,
        host=Host(controller_source=hci_source, controller_sink=hci_sink),
    )


@classmethod
def from_config_file(cls, filename: str) -> Device:
    """Build a Device from the configuration loaded from `filename`."""
    return cls(config=DeviceConfiguration.from_file(filename))
def __init__(
    self,
    name: Optional[str] = None,
    address: Optional[Address] = None,
    config: Optional[DeviceConfiguration] = None,
    host: Optional[Host] = None,
    generic_access_service: bool = True,
) -> None:
    """Create a device, initialised from `config` and the explicit overrides.

    Args:
        name: When truthy, replaces the name taken from `config`.
        address: When truthy, replaces the random address taken from
            `config`. A `str` is converted with `Address(...)`.
        config: Device configuration; a default `DeviceConfiguration()` is
            used when omitted.
        host: When truthy, assigned through the `host` property at the end of
            initialisation.
        generic_access_service: Forwarded to `add_default_services()`.

    Raises:
        Exception: If a GATT descriptor entry in `config.gatt_services` uses
            the legacy key 'permission'.
    """
    super().__init__()

    # Runtime state, independent of the configuration
    self._host = None
    self.powered_on = False
    self.auto_restart_inquiry = True
    self.command_timeout = 10  # seconds
    self.gatt_server = gatt_server.Server(self)
    self.sdp_server = sdp.Server(self)
    self.l2cap_channel_manager = l2cap.ChannelManager(
        [l2cap.L2CAP_Information_Request.EXTENDED_FEATURE_FIXED_CHANNELS]
    )
    self.advertisement_accumulators = {}  # Accumulators, by address
    self.scanning = False
    self.scanning_is_passive = False
    self.discovering = False
    self.le_connecting = False
    self.disconnecting = False
    self.connections = {}  # Connections, by connection handle
    self.pending_connections = {}  # Connections, by BD address (BR/EDR only)
    self.sco_links = {}  # ScoLinks, by connection handle (BR/EDR only)
    self.cis_links = {}  # CisLinks, by connection handle (LE only)
    self._pending_cis = {}  # (CIS_ID, CIG_ID), by CIS_handle
    self.classic_enabled = False  # Overwritten from the config below
    self.inquiry_response = None
    self.address_resolver = None
    self.classic_pending_accepts = {
        Address.ANY: []
    }  # Futures, by BD address OR [Futures] for Address.ANY

    # In Python <= 3.9 + Rust Runtime, asyncio.Lock cannot be properly initiated.
    if sys.version_info >= (3, 10):
        self._cis_lock = asyncio.Lock()
    else:
        self._cis_lock = AsyncExitStack()

    # Own address type cache
    self.connect_own_address_type = None

    # Use the initial config or a default
    config = config or DeviceConfiguration()
    self.config = config

    # Copy the configuration values onto the device. The public address starts
    # as all zeros here.
    self.public_address = Address('00:00:00:00:00:00')
    self.name = config.name
    self.random_address = config.address
    self.class_of_device = config.class_of_device
    self.keystore = None
    self.irk = config.irk
    self.le_enabled = config.le_enabled
    self.classic_enabled = config.classic_enabled
    self.le_simultaneous_enabled = config.le_simultaneous_enabled
    self.cis_enabled = config.cis_enabled
    self.classic_sc_enabled = config.classic_sc_enabled
    self.classic_ssp_enabled = config.classic_ssp_enabled
    self.classic_smp_enabled = config.classic_smp_enabled
    self.discoverable = config.discoverable
    self.connectable = config.connectable
    self.classic_accept_any = config.classic_accept_any
    self.address_resolution_offload = config.address_resolution_offload

    # Extended advertising.
    self.extended_advertising_sets: Dict[int, AdvertisingSet] = {}

    # Legacy advertising.
    # The advertising and scan response data, as well as the advertising interval
    # values are stored as properties of this object for convenience so that they
    # can be initialized from a config object, and for backward compatibility for
    # client code that may set those values directly before calling
    # start_advertising().
    self.legacy_advertising_set: Optional[AdvertisingSet] = None
    self.legacy_advertiser: Optional[LegacyAdvertiser] = None
    self.advertising_data = config.advertising_data
    self.scan_response_data = config.scan_response_data
    self.advertising_interval_min = config.advertising_interval_min
    self.advertising_interval_max = config.advertising_interval_max

    # Build the GATT services described in the config (a list of dicts) and
    # add each of them to the GATT server.
    for service in config.gatt_services:
        characteristics = []
        for characteristic in service.get("characteristics", []):
            descriptors = []
            for descriptor in characteristic.get("descriptors", []):
                # Leave this check until 5/25/2023
                if descriptor.get("permission", False):
                    raise Exception(
                        "Error parsing Device Config's GATT Services. "
                        "The key 'permission' must be renamed to 'permissions'"
                    )
                new_descriptor = Descriptor(
                    attribute_type=descriptor["descriptor_type"],
                    permissions=descriptor["permissions"],
                )
                descriptors.append(new_descriptor)
            new_characteristic = Characteristic(
                uuid=characteristic["uuid"],
                properties=Characteristic.Properties.from_string(
                    characteristic["properties"]
                ),
                permissions=characteristic["permissions"],
                descriptors=descriptors,
            )
            characteristics.append(new_characteristic)
        new_service = Service(uuid=service["uuid"], characteristics=characteristics)
        self.gatt_server.add_service(new_service)

    # If a name is passed, override the name from the config
    if name:
        self.name = name

    # If an address is passed, override the address from the config
    if address:
        # Tolerate a plain string even though the annotation says Address
        if isinstance(address, str):
            address = Address(address)
        self.random_address = address

    # Setup SMP
    self.smp_manager = smp.Manager(
        self, pairing_config_factory=lambda connection: PairingConfig()
    )

    self.l2cap_channel_manager.register_fixed_channel(smp.SMP_CID, self.on_smp_pdu)

    # Register the SDP server with the L2CAP Channel Manager
    self.sdp_server.register(self.l2cap_channel_manager)

    self.add_default_services(generic_access_service)
    self.l2cap_channel_manager.register_fixed_channel(ATT_CID, self.on_gatt_pdu)

    # Forward some events
    setup_event_forwarding(self.gatt_server, self, 'characteristic_subscription')

    # Set the initial host (goes through the `host` property setter)
    if host:
        self.host = host
@deprecated("Please use create_l2cap_server()")
def register_l2cap_server(self, psm, server) -> int:
    """Deprecated: forwards to `l2cap_channel_manager.register_server(psm, server)`.

    The decorator message points callers to `create_l2cap_server()` instead.
    """
    return self.l2cap_channel_manager.register_server(psm, server)


@deprecated("Please use create_l2cap_server()")
def register_l2cap_channel_server(
    self,
    psm,
    server,
    max_credits=DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS,
    mtu=DEVICE_DEFAULT_L2CAP_COC_MTU,
    mps=DEVICE_DEFAULT_L2CAP_COC_MPS,
):
    """Deprecated: forwards to `l2cap_channel_manager.register_le_coc_server(...)`.

    All five arguments are passed through positionally, unchanged. The
    decorator message points callers to `create_l2cap_server()` instead.
    """
    return self.l2cap_channel_manager.register_le_coc_server(
        psm, server, max_credits, mtu, mps
    )


@deprecated("Please use create_l2cap_channel()")
async def open_l2cap_channel(
    self,
    connection,
    psm,
    max_credits=DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS,
    mtu=DEVICE_DEFAULT_L2CAP_COC_MTU,
    mps=DEVICE_DEFAULT_L2CAP_COC_MPS,
):
    """Deprecated: awaits `l2cap_channel_manager.open_le_coc(...)` and returns its result.

    All five arguments are passed through positionally, unchanged. The
    decorator message points callers to `create_l2cap_channel()` instead.
    """
    return await self.l2cap_channel_manager.open_le_coc(
        connection, psm, max_credits, mtu, mps
    )
async def create_l2cap_channel(
    self,
    connection: Connection,
    spec: Union[l2cap.ClassicChannelSpec, l2cap.LeCreditBasedChannelSpec],
) -> Union[l2cap.ClassicChannel, l2cap.LeCreditBasedChannel]:
    """Open an L2CAP channel on `connection`, of the kind selected by `spec`.

    Args:
        connection: Connection on which the channel is created.
        spec: A `l2cap.ClassicChannelSpec` selects a classic channel, a
            `l2cap.LeCreditBasedChannelSpec` selects an LE credit-based one.

    Returns:
        The channel returned by the L2CAP channel manager.

    Raises:
        ValueError: If `spec` is of neither supported type.
    """
    if isinstance(spec, l2cap.ClassicChannelSpec):
        return await self.l2cap_channel_manager.create_classic_channel(
            connection=connection, spec=spec
        )
    if isinstance(spec, l2cap.LeCreditBasedChannelSpec):
        return await self.l2cap_channel_manager.create_le_credit_based_channel(
            connection=connection, spec=spec
        )

    # Fail loudly, the same way create_l2cap_server() does, rather than
    # silently returning None in place of a channel.
    raise ValueError(f'Unexpected mode {spec}')


@overload
def create_l2cap_server(
    self,
    spec: l2cap.ClassicChannelSpec,
    handler: Optional[Callable[[l2cap.ClassicChannel], Any]] = None,
) -> l2cap.ClassicChannelServer: ...


@overload
def create_l2cap_server(
    self,
    spec: l2cap.LeCreditBasedChannelSpec,
    handler: Optional[Callable[[l2cap.LeCreditBasedChannel], Any]] = None,
) -> l2cap.LeCreditBasedChannelServer: ...
async def send_command(self, command, check_result=False):
    """Send an HCI command through the host, bounded by `self.command_timeout`.

    Args:
        command: The command to send; its `name` is used in the timeout log.
        check_result: Passed through to `host.send_command()`.

    Returns:
        Whatever `host.send_command()` resolves to.

    Raises:
        CommandTimeoutError: If the host does not complete the command within
            `self.command_timeout` seconds.
    """
    pending = self.host.send_command(command, check_result)
    try:
        return await asyncio.wait_for(pending, self.command_timeout)
    except asyncio.TimeoutError as error:
        logger.warning(f'!!! Command {command.name} timed out')
        raise CommandTimeoutError() from error
async def reset(self) -> None:
    """Reset the host (delegates to `self.host.reset()`)."""
    host = self.host
    await host.reset()


async def power_off(self) -> None:
    """Flush the host and mark the device as powered off.

    Does nothing when the device is not currently powered on.
    """
    if not self.powered_on:
        return

    await self.host.flush()
    self.powered_on = False
def supports_le_phy(self, phy):
    """Tell whether a given LE PHY can be used.

    The 1M PHY is always reported as supported. The 2M and Coded PHYs are
    supported only if the matching LE feature is reported by
    `supports_le_features`.

    Raises:
        ValueError: if `phy` is not one of the 1M, 2M or Coded PHY constants.
    """
    if phy == HCI_LE_1M_PHY:
        return True

    # Feature that must be present for each optional PHY.
    required_feature = {
        HCI_LE_2M_PHY: LeFeatureMask.LE_2M_PHY,
        HCI_LE_CODED_PHY: LeFeatureMask.LE_CODED_PHY,
    }.get(phy)
    if required_feature is None:
        raise ValueError('invalid PHY')

    return self.supports_le_features(required_feature)
1994 1995 Args: 1996 advertising_type: 1997 Type of advertising events. 1998 target: 1999 Peer address for directed advertising target. 2000 (Ignored if `advertising_type` is not directed) 2001 own_address_type: 2002 Own address type to use in the advertising. 2003 auto_restart: 2004 Whether the advertisement will be restarted after disconnection. 2005 advertising_data: 2006 Raw advertising data. If None, the value of the property 2007 self.advertising_data will be used. 2008 scan_response_data: 2009 Raw scan response. If None, the value of the property 2010 self.scan_response_data will be used. 2011 advertising_interval_min: 2012 Minimum advertising interval, in milliseconds. If None, the value of the 2013 property self.advertising_interval_min will be used. 2014 advertising_interval_max: 2015 Maximum advertising interval, in milliseconds. If None, the value of the 2016 property self.advertising_interval_max will be used. 2017 """ 2018 # Update backing properties. 2019 if advertising_data is not None: 2020 self.advertising_data = advertising_data 2021 if scan_response_data is not None: 2022 self.scan_response_data = scan_response_data 2023 if advertising_interval_min is not None: 2024 self.advertising_interval_min = advertising_interval_min 2025 if advertising_interval_max is not None: 2026 self.advertising_interval_max = advertising_interval_max 2027 2028 # Decide what peer address to use 2029 if advertising_type.is_directed: 2030 if target is None: 2031 raise ValueError('directed advertising requires a target') 2032 peer_address = target 2033 else: 2034 peer_address = Address.ANY 2035 2036 # If we're already advertising, stop now because we'll be re-creating 2037 # a new advertiser or advertising set. 2038 await self.stop_advertising() 2039 assert self.legacy_advertiser is None 2040 assert self.legacy_advertising_set is None 2041 2042 if self.supports_le_extended_advertising: 2043 # Use extended advertising commands with legacy PDUs. 
async def stop_advertising(self) -> None:
    """Stop legacy advertising.

    If a legacy advertising set exists, it is stopped (when enabled), removed
    and forgotten. Otherwise, if a legacy advertiser exists, it is stopped and
    forgotten. When neither exists, this is a no-op.
    """
    if not self.legacy_advertising_set:
        # No advertising set: fall back to the legacy advertiser, if any.
        if self.legacy_advertiser:
            await self.legacy_advertiser.stop()
            self.legacy_advertiser = None
        return

    # Advertising set in use: disable it first if needed, then remove it.
    if self.legacy_advertising_set.enabled:
        await self.legacy_advertising_set.stop()
    await self.legacy_advertising_set.remove()
    self.legacy_advertising_set = None
AdvertisingSet: 2101 """ 2102 Create an advertising set. 2103 2104 This method allows the creation of advertising sets for controllers that 2105 support extended advertising. 2106 2107 Args: 2108 advertising_parameters: 2109 The parameters to use for this set. If None, default parameters are used. 2110 random_address: 2111 The random address to use (only relevant when the parameters specify that 2112 own_address_type is random). 2113 advertising_data: 2114 Initial value for the set's advertising data. 2115 scan_response_data: 2116 Initial value for the set's scan response data. 2117 periodic_advertising_parameters: 2118 The parameters to use for periodic advertising (if needed). 2119 periodic_advertising_data: 2120 Initial value for the set's periodic advertising data. 2121 auto_start: 2122 True if the set should be automatically started upon creation. 2123 auto_restart: 2124 True if the set should be automatically restated after a disconnection. 2125 2126 Returns: 2127 An AdvertisingSet instance. 2128 """ 2129 # Instantiate default values 2130 if advertising_parameters is None: 2131 advertising_parameters = AdvertisingParameters() 2132 2133 if ( 2134 not advertising_parameters.advertising_event_properties.is_legacy 2135 and advertising_data 2136 and scan_response_data 2137 ): 2138 raise ValueError( 2139 "Extended advertisements can't have both data and scan \ 2140 response data" 2141 ) 2142 2143 # Allocate a new handle 2144 try: 2145 advertising_handle = next( 2146 handle 2147 for handle in range( 2148 DEVICE_MIN_EXTENDED_ADVERTISING_SET_HANDLE, 2149 DEVICE_MAX_EXTENDED_ADVERTISING_SET_HANDLE + 1, 2150 ) 2151 if handle not in self.extended_advertising_sets 2152 ) 2153 except StopIteration as exc: 2154 raise RuntimeError("all valid advertising handles already in use") from exc 2155 2156 # Use the device's random address if a random address is needed but none was 2157 # provided. 
2158 if ( 2159 advertising_parameters.own_address_type 2160 in (OwnAddressType.RANDOM, OwnAddressType.RESOLVABLE_OR_RANDOM) 2161 and random_address is None 2162 ): 2163 random_address = self.random_address 2164 2165 # Create the object that represents the set. 2166 advertising_set = AdvertisingSet( 2167 device=self, 2168 advertising_handle=advertising_handle, 2169 auto_restart=auto_restart, 2170 random_address=random_address, 2171 advertising_parameters=advertising_parameters, 2172 advertising_data=advertising_data, 2173 scan_response_data=scan_response_data, 2174 periodic_advertising_parameters=periodic_advertising_parameters, 2175 periodic_advertising_data=periodic_advertising_data, 2176 ) 2177 2178 # Create the set in the controller. 2179 await advertising_set.set_advertising_parameters(advertising_parameters) 2180 2181 # Update the set in the controller. 2182 try: 2183 if random_address: 2184 await advertising_set.set_random_address(random_address) 2185 2186 if advertising_data: 2187 await advertising_set.set_advertising_data(advertising_data) 2188 2189 if scan_response_data: 2190 await advertising_set.set_scan_response_data(scan_response_data) 2191 2192 if periodic_advertising_parameters: 2193 # TODO: call LE Set Periodic Advertising Parameters command 2194 raise NotImplementedError('periodic advertising not yet supported') 2195 2196 if periodic_advertising_data: 2197 # TODO: call LE Set Periodic Advertising Data command 2198 raise NotImplementedError('periodic advertising not yet supported') 2199 2200 except HCI_Error as error: 2201 # Remove the advertising set so that it doesn't stay dangling in the 2202 # controller. 2203 await self.send_command( 2204 HCI_LE_Remove_Advertising_Set_Command( 2205 advertising_handle=advertising_handle 2206 ), 2207 check_result=False, 2208 ) 2209 raise error 2210 2211 # Remember the set. 2212 self.extended_advertising_sets[advertising_handle] = advertising_set 2213 2214 # Try to start the set if requested. 
async def start_scanning(
    self,
    legacy: bool = False,
    active: bool = True,
    scan_interval: int = DEVICE_DEFAULT_SCAN_INTERVAL,  # Scan interval in ms
    scan_window: int = DEVICE_DEFAULT_SCAN_WINDOW,  # Scan window in ms
    own_address_type: int = OwnAddressType.RANDOM,
    filter_duplicates: bool = False,
    scanning_phys: Optional[List[int]] = None,
) -> None:
    """Start LE scanning.

    Extended scanning commands are used unless `legacy` is True or the
    controller does not support extended advertising, in which case the
    legacy scanning commands are used.

    Args:
        legacy:
            Force the use of the legacy scanning commands.
        active:
            True for active scanning, False for passive scanning.
        scan_interval:
            Scan interval, in milliseconds.
        scan_window:
            Scan window, in milliseconds. Must not exceed `scan_interval`.
        own_address_type:
            Own address type to use while scanning.
        filter_duplicates:
            Whether the controller should filter duplicate reports.
        scanning_phys:
            PHYs to scan on (extended scanning only). If None, the 1M and
            Coded PHYs are requested. The Coded PHY is silently dropped if the
            controller does not support it.

    Raises:
        ValueError: if the interval/window are inconsistent or out of range,
            or if no usable scanning PHY remains.
    """
    # Avoid a shared mutable default for the PHY list.
    if scanning_phys is None:
        scanning_phys = [HCI_LE_1M_PHY, HCI_LE_CODED_PHY]

    # Check that the arguments are legal
    if scan_interval < scan_window:
        raise ValueError('scan_interval must be >= scan_window')
    if (
        scan_interval < DEVICE_MIN_SCAN_INTERVAL
        or scan_interval > DEVICE_MAX_SCAN_INTERVAL
    ):
        raise ValueError('scan_interval out of range')
    if scan_window < DEVICE_MIN_SCAN_WINDOW or scan_window > DEVICE_MAX_SCAN_WINDOW:
        raise ValueError('scan_window out of range')

    # Convert from milliseconds to the 0.625 ms units used by the controller.
    # The interval and the window are distinct parameters: the interval must
    # be derived from `scan_interval`, not from `scan_window`.
    scan_interval_units = int(scan_interval / 0.625)
    scan_window_units = int(scan_window / 0.625)

    # Reset the accumulators
    self.advertisement_accumulators = {}

    # Enable scanning
    if not legacy and self.supports_le_extended_advertising:
        # Set the scanning parameters
        scan_type = (
            HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING
            if active
            else HCI_LE_Set_Extended_Scan_Parameters_Command.PASSIVE_SCANNING
        )
        scanning_filter_policy = (
            HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY
        )  # TODO: support other types

        # Build the PHY bit mask; per-PHY parameter lists need one entry for
        # each PHY that ends up in the mask.
        scanning_phy_count = 0
        scanning_phys_bits = 0
        if HCI_LE_1M_PHY in scanning_phys:
            scanning_phys_bits |= 1 << HCI_LE_1M_PHY_BIT
            scanning_phy_count += 1
        if HCI_LE_CODED_PHY in scanning_phys:
            if self.supports_le_features(LeFeatureMask.LE_CODED_PHY):
                scanning_phys_bits |= 1 << HCI_LE_CODED_PHY_BIT
                scanning_phy_count += 1

        if scanning_phy_count == 0:
            raise ValueError('at least one scanning PHY must be enabled')

        await self.send_command(
            HCI_LE_Set_Extended_Scan_Parameters_Command(
                own_address_type=own_address_type,
                scanning_filter_policy=scanning_filter_policy,
                scanning_phys=scanning_phys_bits,
                scan_types=[scan_type] * scanning_phy_count,
                scan_intervals=[scan_interval_units] * scanning_phy_count,
                scan_windows=[scan_window_units] * scanning_phy_count,
            ),
            check_result=True,
        )

        # Enable scanning
        await self.send_command(
            HCI_LE_Set_Extended_Scan_Enable_Command(
                enable=1,
                filter_duplicates=1 if filter_duplicates else 0,
                duration=0,  # TODO allow other values
                period=0,  # TODO allow other values
            ),
            check_result=True,
        )
    else:
        # Set the scanning parameters
        scan_type = (
            HCI_LE_Set_Scan_Parameters_Command.ACTIVE_SCANNING
            if active
            else HCI_LE_Set_Scan_Parameters_Command.PASSIVE_SCANNING
        )
        await self.send_command(
            # pylint: disable=line-too-long
            HCI_LE_Set_Scan_Parameters_Command(
                le_scan_type=scan_type,
                le_scan_interval=scan_interval_units,
                le_scan_window=scan_window_units,
                own_address_type=own_address_type,
                scanning_filter_policy=HCI_LE_Set_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY,
            ),
            check_result=True,
        )

        # Enable scanning
        await self.send_command(
            HCI_LE_Set_Scan_Enable_Command(
                le_scan_enable=1, filter_duplicates=1 if filter_duplicates else 0
            ),
            check_result=True,
        )

    # Only record the new state once the controller accepted the commands.
    self.scanning_is_passive = not active
    self.scanning = True
async def set_scan_enable(self, inquiry_scan_enabled, page_scan_enabled):
    """Write the controller's scan-enable setting.

    The value sent is a 2-bit field: bit 0 (0x01) is set when inquiry scan is
    enabled and bit 1 (0x02) is set when page scan is enabled.

    Returns:
        The response to the HCI_Write_Scan_Enable command.
    """
    inquiry_bit = 0x01 if inquiry_scan_enabled else 0x00
    page_bit = 0x02 if page_scan_enabled else 0x00

    command = HCI_Write_Scan_Enable_Command(scan_enable=inquiry_bit | page_bit)
    return await self.send_command(command)
async def connect(
    self,
    peer_address: Union[Address, str],
    transport: int = BT_LE_TRANSPORT,
    connection_parameters_preferences: Optional[
        Dict[int, ConnectionParametersPreferences]
    ] = None,
    own_address_type: int = OwnAddressType.RANDOM,
    timeout: Optional[float] = DEVICE_DEFAULT_CONNECT_TIMEOUT,
) -> Connection:
    '''
    Request a connection to a peer.
    When transport is BLE, this method cannot be called if there is already a
    pending connection.

    peer_address:
      * an Address, or
      * a string: parsed as an address for the selected transport; if it
        cannot be parsed, it is treated as a device name and resolved with
        `find_peer_by_name`

    transport:
      BT_LE_TRANSPORT or BT_BR_EDR_TRANSPORT. If the requested transport is
      not enabled on this device, the other one is used instead.

    connection_parameters_preferences: (BLE only, ignored for BR/EDR)
      * None: use the 1M PHY with default parameters
      * map: each entry has a PHY as key and a ConnectionParametersPreferences
        object as value

    own_address_type: (BLE only)

    timeout:
      Maximum time to wait for the connection to complete (passed to
      `asyncio.wait_for`), or None to wait without a timeout. When the timeout
      expires, the connection attempt is cancelled in the controller.

    Raises:
      ValueError: invalid transport, non-public BR/EDR address, or no usable
        PHY in the preferences.
      InvalidStateError: an LE connection is already pending.
      HCI_StatusError: the controller did not accept the connect command.
      core.TimeoutError: the attempt timed out and was cancelled.
    '''

    # Check parameters
    if transport not in (BT_LE_TRANSPORT, BT_BR_EDR_TRANSPORT):
        raise ValueError('invalid transport')

    # Adjust the transport automatically if we need to
    if transport == BT_LE_TRANSPORT and not self.le_enabled:
        transport = BT_BR_EDR_TRANSPORT
    elif transport == BT_BR_EDR_TRANSPORT and not self.classic_enabled:
        transport = BT_LE_TRANSPORT

    # Check that there isn't already a pending connection
    if transport == BT_LE_TRANSPORT and self.is_le_connecting:
        raise InvalidStateError('connection already pending')

    if isinstance(peer_address, str):
        try:
            peer_address = Address.from_string_for_transport(
                peer_address, transport
            )
        except ValueError:
            # If the address is not parsable, assume it is a name instead
            logger.debug('looking for peer by name')
            peer_address = await self.find_peer_by_name(
                peer_address, transport
            )  # TODO: timeout
    else:
        # All BR/EDR addresses should be public addresses
        if (
            transport == BT_BR_EDR_TRANSPORT
            and peer_address.address_type != Address.PUBLIC_DEVICE_ADDRESS
        ):
            raise ValueError('BR/EDR addresses must be PUBLIC')

    assert isinstance(peer_address, Address)

    def on_connection(connection):
        if transport == BT_LE_TRANSPORT or (
            # match BR/EDR connection event against peer address
            connection.transport == transport
            and connection.peer_address == peer_address
        ):
            pending_connection.set_result(connection)

    def on_connection_failure(error):
        if transport == BT_LE_TRANSPORT or (
            # match BR/EDR connection failure event against peer address
            error.transport == transport
            and error.peer_address == peer_address
        ):
            pending_connection.set_exception(error)

    # Create a future so that we can wait for the connection's result
    pending_connection = asyncio.get_running_loop().create_future()
    self.on('connection', on_connection)
    self.on('connection_failure', on_connection_failure)

    try:
        # Tell the controller to connect
        if transport == BT_LE_TRANSPORT:
            if connection_parameters_preferences is None:
                connection_parameters_preferences = {
                    HCI_LE_1M_PHY: ConnectionParametersPreferences.default
                }

            # Remembered for the duration of the attempt; cleared in `finally`.
            self.connect_own_address_type = own_address_type

            if self.host.supports_command(
                HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND
            ):
                # Only keep supported PHYs
                phys = sorted(
                    set(
                        filter(
                            self.supports_le_phy,
                            connection_parameters_preferences,
                        )
                    )
                )
                if not phys:
                    raise ValueError('at least one supported PHY needed')

                phy_count = len(phys)
                initiating_phys = phy_list_to_bits(phys)

                # Per-PHY parameter lists, in the same (sorted) order as `phys`.
                connection_interval_mins = [
                    int(
                        connection_parameters_preferences[
                            phy
                        ].connection_interval_min
                        / 1.25
                    )
                    for phy in phys
                ]
                connection_interval_maxs = [
                    int(
                        connection_parameters_preferences[
                            phy
                        ].connection_interval_max
                        / 1.25
                    )
                    for phy in phys
                ]
                max_latencies = [
                    connection_parameters_preferences[phy].max_latency
                    for phy in phys
                ]
                supervision_timeouts = [
                    int(
                        connection_parameters_preferences[phy].supervision_timeout
                        / 10
                    )
                    for phy in phys
                ]
                min_ce_lengths = [
                    int(
                        connection_parameters_preferences[phy].min_ce_length / 0.625
                    )
                    for phy in phys
                ]
                max_ce_lengths = [
                    int(
                        connection_parameters_preferences[phy].max_ce_length / 0.625
                    )
                    for phy in phys
                ]

                result = await self.send_command(
                    HCI_LE_Extended_Create_Connection_Command(
                        initiator_filter_policy=0,
                        own_address_type=own_address_type,
                        peer_address_type=peer_address.address_type,
                        peer_address=peer_address,
                        initiating_phys=initiating_phys,
                        scan_intervals=(
                            int(DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL / 0.625),
                        )
                        * phy_count,
                        scan_windows=(
                            int(DEVICE_DEFAULT_CONNECT_SCAN_WINDOW / 0.625),
                        )
                        * phy_count,
                        connection_interval_mins=connection_interval_mins,
                        connection_interval_maxs=connection_interval_maxs,
                        max_latencies=max_latencies,
                        supervision_timeouts=supervision_timeouts,
                        min_ce_lengths=min_ce_lengths,
                        max_ce_lengths=max_ce_lengths,
                    )
                )
            else:
                # The non-extended command only takes one set of parameters,
                # taken from the 1M PHY entry.
                if HCI_LE_1M_PHY not in connection_parameters_preferences:
                    raise ValueError('1M PHY preferences required')

                prefs = connection_parameters_preferences[HCI_LE_1M_PHY]
                result = await self.send_command(
                    HCI_LE_Create_Connection_Command(
                        le_scan_interval=int(
                            DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL / 0.625
                        ),
                        le_scan_window=int(
                            DEVICE_DEFAULT_CONNECT_SCAN_WINDOW / 0.625
                        ),
                        initiator_filter_policy=0,
                        peer_address_type=peer_address.address_type,
                        peer_address=peer_address,
                        own_address_type=own_address_type,
                        connection_interval_min=int(
                            prefs.connection_interval_min / 1.25
                        ),
                        connection_interval_max=int(
                            prefs.connection_interval_max / 1.25
                        ),
                        max_latency=prefs.max_latency,
                        supervision_timeout=int(prefs.supervision_timeout / 10),
                        min_ce_length=int(prefs.min_ce_length / 0.625),
                        max_ce_length=int(prefs.max_ce_length / 0.625),
                    )
                )
        else:
            # Save pending connection
            self.pending_connections[peer_address] = Connection.incomplete(
                self, peer_address, BT_CENTRAL_ROLE
            )

            # TODO: allow passing other settings
            result = await self.send_command(
                HCI_Create_Connection_Command(
                    bd_addr=peer_address,
                    packet_type=0xCC18,  # FIXME: change
                    page_scan_repetition_mode=HCI_R2_PAGE_SCAN_REPETITION_MODE,
                    clock_offset=0x0000,
                    allow_role_switch=0x01,
                    reserved=0,
                )
            )

        if result.status != HCI_Command_Status_Event.PENDING:
            raise HCI_StatusError(result)

        # Wait for the connection process to complete
        if transport == BT_LE_TRANSPORT:
            self.le_connecting = True

        if timeout is None:
            return await self.abort_on('flush', pending_connection)

        try:
            # Shield the future so that a timeout does not cancel it: it is
            # awaited again below, after the cancel command has been sent.
            return await asyncio.wait_for(
                asyncio.shield(pending_connection), timeout
            )
        except asyncio.TimeoutError:
            if transport == BT_LE_TRANSPORT:
                await self.send_command(HCI_LE_Create_Connection_Cancel_Command())
            else:
                await self.send_command(
                    HCI_Create_Connection_Cancel_Command(bd_addr=peer_address)
                )

            # Wait for the outcome of the cancelled attempt. A connection
            # that completed in the meantime is still returned; a connection
            # error is reported to the caller as a timeout.
            try:
                return await self.abort_on('flush', pending_connection)
            except core.ConnectionError as error:
                raise core.TimeoutError() from error
    finally:
        self.remove_listener('connection', on_connection)
        self.remove_listener('connection_failure', on_connection_failure)
        if transport == BT_LE_TRANSPORT:
            self.le_connecting = False
            self.connect_own_address_type = None
        else:
            self.pending_connections.pop(peer_address, None)
Address.ANY: 2767 self.classic_pending_accepts[Address.ANY].remove(pending_request_fut) 2768 else: 2769 self.classic_pending_accepts.pop(peer_address) 2770 raise 2771 2772 # Result may already be a completed connection, 2773 # see `on_connection` for details 2774 if isinstance(result, Connection): 2775 return result 2776 2777 # Otherwise, result came from `on_connection_request` 2778 peer_address, _class_of_device, _link_type = result 2779 assert isinstance(peer_address, Address) 2780 2781 # Create a future so that we can wait for the connection's result 2782 pending_connection = asyncio.get_running_loop().create_future() 2783 2784 def on_connection(connection): 2785 if ( 2786 connection.transport == BT_BR_EDR_TRANSPORT 2787 and connection.peer_address == peer_address 2788 ): 2789 pending_connection.set_result(connection) 2790 2791 def on_connection_failure(error): 2792 if ( 2793 error.transport == BT_BR_EDR_TRANSPORT 2794 and error.peer_address == peer_address 2795 ): 2796 pending_connection.set_exception(error) 2797 2798 self.on('connection', on_connection) 2799 self.on('connection_failure', on_connection_failure) 2800 2801 # Save pending connection, with the Peripheral role. 2802 # Even if we requested a role switch in the HCI_Accept_Connection_Request 2803 # command, this connection is still considered Peripheral until an eventual 2804 # role change event. 
async def cancel_connection(self, peer_address=None):
    """Cancel a connection attempt that is in progress.

    With no `peer_address`, the pending LE connection (if any) is cancelled.
    With a `peer_address` (an address, or a string holding an address or a
    device name), a BR/EDR connection cancellation is requested for that peer.
    """
    if peer_address is None:
        # Low-energy: cancel ongoing connection, if there is one
        if self.is_le_connecting:
            await self.send_command(
                HCI_LE_Create_Connection_Cancel_Command(), check_result=True
            )
        return

    # BR/EDR: try to cancel to ongoing connection
    # NOTE: This API does not prevent from trying to cancel a connection which is
    # not currently being created
    if isinstance(peer_address, str):
        try:
            peer_address = Address(peer_address)
        except ValueError:
            # If the address is not parsable, assume it is a name instead
            logger.debug('looking for peer by name')
            peer_address = await self.find_peer_by_name(
                peer_address, BT_BR_EDR_TRANSPORT
            )  # TODO: timeout

    cancel_command = HCI_Create_Connection_Cancel_Command(bd_addr=peer_address)
    await self.send_command(cancel_command, check_result=True)
async def disconnect(
    self, connection: Union[Connection, ScoLink, CisLink], reason: int
) -> None:
    """Request a disconnection and wait for it to complete.

    Args:
        connection: the connection or link to disconnect.
        reason: reason code passed to the HCI_Disconnect command.

    Raises:
        HCI_StatusError: if the controller does not report the command as
            pending.
    """
    # Create a future so that we can wait for the disconnection's result
    pending_disconnection = asyncio.get_running_loop().create_future()
    connection.on('disconnection', pending_disconnection.set_result)
    connection.on('disconnection_failure', pending_disconnection.set_exception)

    # The command is sent inside the `try` so that the listeners registered
    # above are removed even when sending the command itself fails.
    try:
        # Request a disconnection
        result = await self.send_command(
            HCI_Disconnect_Command(connection_handle=connection.handle, reason=reason)
        )

        if result.status != HCI_Command_Status_Event.PENDING:
            raise HCI_StatusError(result)

        # Wait for the disconnection process to complete
        self.disconnecting = True
        return await self.abort_on('flush', pending_disconnection)
    finally:
        connection.remove_listener(
            'disconnection', pending_disconnection.set_result
        )
        connection.remove_listener(
            'disconnection_failure', pending_disconnection.set_exception
        )
        self.disconnecting = False
spec. 2928 ''' 2929 2930 if use_l2cap: 2931 if connection.role != BT_PERIPHERAL_ROLE: 2932 raise InvalidStateError( 2933 'only peripheral can update connection parameters with l2cap' 2934 ) 2935 l2cap_result = ( 2936 await self.l2cap_channel_manager.update_connection_parameters( 2937 connection, 2938 connection_interval_min, 2939 connection_interval_max, 2940 max_latency, 2941 supervision_timeout, 2942 ) 2943 ) 2944 if l2cap_result != l2cap.L2CAP_CONNECTION_PARAMETERS_ACCEPTED_RESULT: 2945 raise ConnectionParameterUpdateError(l2cap_result) 2946 2947 result = await self.send_command( 2948 HCI_LE_Connection_Update_Command( 2949 connection_handle=connection.handle, 2950 connection_interval_min=connection_interval_min, 2951 connection_interval_max=connection_interval_max, 2952 max_latency=max_latency, 2953 supervision_timeout=supervision_timeout, 2954 min_ce_length=min_ce_length, 2955 max_ce_length=max_ce_length, 2956 ) 2957 ) 2958 if result.status != HCI_Command_Status_Event.PENDING: 2959 raise HCI_StatusError(result) 2960 2961 async def get_connection_rssi(self, connection): 2962 result = await self.send_command( 2963 HCI_Read_RSSI_Command(handle=connection.handle), check_result=True 2964 ) 2965 return result.return_parameters.rssi 2966 2967 async def get_connection_phy(self, connection): 2968 result = await self.send_command( 2969 HCI_LE_Read_PHY_Command(connection_handle=connection.handle), 2970 check_result=True, 2971 ) 2972 return (result.return_parameters.tx_phy, result.return_parameters.rx_phy) 2973 2974 async def set_connection_phy( 2975 self, connection, tx_phys=None, rx_phys=None, phy_options=None 2976 ): 2977 if not self.host.supports_command(HCI_LE_SET_PHY_COMMAND): 2978 logger.warning('ignoring request, command not supported') 2979 return 2980 2981 all_phys_bits = (1 if tx_phys is None else 0) | ( 2982 (1 if rx_phys is None else 0) << 1 2983 ) 2984 2985 result = await self.send_command( 2986 HCI_LE_Set_PHY_Command( 2987 
connection_handle=connection.handle, 2988 all_phys=all_phys_bits, 2989 tx_phys=phy_list_to_bits(tx_phys), 2990 rx_phys=phy_list_to_bits(rx_phys), 2991 phy_options=0 if phy_options is None else int(phy_options), 2992 ) 2993 ) 2994 2995 if result.status != HCI_COMMAND_STATUS_PENDING: 2996 logger.warning( 2997 'HCI_LE_Set_PHY_Command failed: ' 2998 f'{HCI_Constant.error_name(result.status)}' 2999 ) 3000 raise HCI_StatusError(result) 3001 3002 async def set_default_phy(self, tx_phys=None, rx_phys=None): 3003 all_phys_bits = (1 if tx_phys is None else 0) | ( 3004 (1 if rx_phys is None else 0) << 1 3005 ) 3006 3007 return await self.send_command( 3008 HCI_LE_Set_Default_PHY_Command( 3009 all_phys=all_phys_bits, 3010 tx_phys=phy_list_to_bits(tx_phys), 3011 rx_phys=phy_list_to_bits(rx_phys), 3012 ), 3013 check_result=True, 3014 ) 3015 3016 async def find_peer_by_name(self, name, transport=BT_LE_TRANSPORT): 3017 """ 3018 Scan for a peer with a give name and return its address and transport 3019 """ 3020 3021 # Create a future to wait for an address to be found 3022 peer_address = asyncio.get_running_loop().create_future() 3023 3024 # Scan/inquire with event handlers to handle scan/inquiry results 3025 def on_peer_found(address, ad_data): 3026 local_name = ad_data.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True) 3027 if local_name is None: 3028 local_name = ad_data.get(AdvertisingData.SHORTENED_LOCAL_NAME, raw=True) 3029 if local_name is not None: 3030 if local_name.decode('utf-8') == name: 3031 peer_address.set_result(address) 3032 3033 handler = None 3034 was_scanning = self.scanning 3035 was_discovering = self.discovering 3036 try: 3037 if transport == BT_LE_TRANSPORT: 3038 event_name = 'advertisement' 3039 handler = self.on( 3040 event_name, 3041 lambda advertisement: on_peer_found( 3042 advertisement.address, advertisement.data 3043 ), 3044 ) 3045 3046 if not self.scanning: 3047 await self.start_scanning(filter_duplicates=True) 3048 3049 elif transport == 
BT_BR_EDR_TRANSPORT: 3050 event_name = 'inquiry_result' 3051 handler = self.on( 3052 event_name, 3053 lambda address, class_of_device, eir_data, rssi: on_peer_found( 3054 address, eir_data 3055 ), 3056 ) 3057 3058 if not self.discovering: 3059 await self.start_discovery() 3060 else: 3061 return None 3062 3063 return await self.abort_on('flush', peer_address) 3064 finally: 3065 if handler is not None: 3066 self.remove_listener(event_name, handler) 3067 3068 if transport == BT_LE_TRANSPORT and not was_scanning: 3069 await self.stop_scanning() 3070 elif transport == BT_BR_EDR_TRANSPORT and not was_discovering: 3071 await self.stop_discovery() 3072 3073 @property 3074 def pairing_config_factory(self) -> Callable[[Connection], PairingConfig]: 3075 return self.smp_manager.pairing_config_factory 3076 3077 @pairing_config_factory.setter 3078 def pairing_config_factory( 3079 self, pairing_config_factory: Callable[[Connection], PairingConfig] 3080 ) -> None: 3081 self.smp_manager.pairing_config_factory = pairing_config_factory 3082 3083 @property 3084 def smp_session_proxy(self) -> Type[smp.Session]: 3085 return self.smp_manager.session_proxy 3086 3087 @smp_session_proxy.setter 3088 def smp_session_proxy(self, session_proxy: Type[smp.Session]) -> None: 3089 self.smp_manager.session_proxy = session_proxy 3090 3091 async def pair(self, connection): 3092 return await self.smp_manager.pair(connection) 3093 3094 def request_pairing(self, connection): 3095 return self.smp_manager.request_pairing(connection) 3096 3097 async def get_long_term_key( 3098 self, connection_handle: int, rand: bytes, ediv: int 3099 ) -> Optional[bytes]: 3100 if (connection := self.lookup_connection(connection_handle)) is None: 3101 return None 3102 3103 # Start by looking for the key in an SMP session 3104 ltk = self.smp_manager.get_long_term_key(connection, rand, ediv) 3105 if ltk is not None: 3106 return ltk 3107 3108 # Then look for the key in the keystore 3109 if self.keystore is not None: 3110 keys = 
await self.keystore.get(str(connection.peer_address)) 3111 if keys is not None: 3112 logger.debug('found keys in the key store') 3113 if keys.ltk: 3114 return keys.ltk.value 3115 3116 if connection.role == BT_CENTRAL_ROLE and keys.ltk_central: 3117 return keys.ltk_central.value 3118 3119 if connection.role == BT_PERIPHERAL_ROLE and keys.ltk_peripheral: 3120 return keys.ltk_peripheral.value 3121 return None 3122 3123 async def get_link_key(self, address: Address) -> Optional[bytes]: 3124 if self.keystore is None: 3125 return None 3126 3127 # Look for the key in the keystore 3128 keys = await self.keystore.get(str(address)) 3129 if keys is None: 3130 logger.debug(f'no keys found for {address}') 3131 return None 3132 3133 logger.debug('found keys in the key store') 3134 if keys.link_key is None: 3135 logger.warning('no link key') 3136 return None 3137 3138 return keys.link_key.value 3139 3140 # [Classic only] 3141 async def authenticate(self, connection): 3142 # Set up event handlers 3143 pending_authentication = asyncio.get_running_loop().create_future() 3144 3145 def on_authentication(): 3146 pending_authentication.set_result(None) 3147 3148 def on_authentication_failure(error_code): 3149 pending_authentication.set_exception(HCI_Error(error_code)) 3150 3151 connection.on('connection_authentication', on_authentication) 3152 connection.on('connection_authentication_failure', on_authentication_failure) 3153 3154 # Request the authentication 3155 try: 3156 result = await self.send_command( 3157 HCI_Authentication_Requested_Command( 3158 connection_handle=connection.handle 3159 ) 3160 ) 3161 if result.status != HCI_COMMAND_STATUS_PENDING: 3162 logger.warning( 3163 'HCI_Authentication_Requested_Command failed: ' 3164 f'{HCI_Constant.error_name(result.status)}' 3165 ) 3166 raise HCI_StatusError(result) 3167 3168 # Wait for the authentication to complete 3169 await connection.abort_on('disconnection', pending_authentication) 3170 finally: 3171 
connection.remove_listener('connection_authentication', on_authentication) 3172 connection.remove_listener( 3173 'connection_authentication_failure', on_authentication_failure 3174 ) 3175 3176 async def encrypt(self, connection, enable=True): 3177 if not enable and connection.transport == BT_LE_TRANSPORT: 3178 raise ValueError('`enable` parameter is classic only.') 3179 3180 # Set up event handlers 3181 pending_encryption = asyncio.get_running_loop().create_future() 3182 3183 def on_encryption_change(): 3184 pending_encryption.set_result(None) 3185 3186 def on_encryption_failure(error_code): 3187 pending_encryption.set_exception(HCI_Error(error_code)) 3188 3189 connection.on('connection_encryption_change', on_encryption_change) 3190 connection.on('connection_encryption_failure', on_encryption_failure) 3191 3192 # Request the encryption 3193 try: 3194 if connection.transport == BT_LE_TRANSPORT: 3195 # Look for a key in the key store 3196 if self.keystore is None: 3197 raise RuntimeError('no key store') 3198 3199 keys = await self.keystore.get(str(connection.peer_address)) 3200 if keys is None: 3201 raise RuntimeError('keys not found in key store') 3202 3203 if keys.ltk is not None: 3204 ltk = keys.ltk.value 3205 rand = bytes(8) 3206 ediv = 0 3207 elif keys.ltk_central is not None: 3208 ltk = keys.ltk_central.value 3209 rand = keys.ltk_central.rand 3210 ediv = keys.ltk_central.ediv 3211 else: 3212 raise RuntimeError('no LTK found for peer') 3213 3214 if connection.role != HCI_CENTRAL_ROLE: 3215 raise InvalidStateError('only centrals can start encryption') 3216 3217 result = await self.send_command( 3218 HCI_LE_Enable_Encryption_Command( 3219 connection_handle=connection.handle, 3220 random_number=rand, 3221 encrypted_diversifier=ediv, 3222 long_term_key=ltk, 3223 ) 3224 ) 3225 3226 if result.status != HCI_COMMAND_STATUS_PENDING: 3227 logger.warning( 3228 'HCI_LE_Enable_Encryption_Command failed: ' 3229 f'{HCI_Constant.error_name(result.status)}' 3230 ) 3231 raise 
HCI_StatusError(result) 3232 else: 3233 result = await self.send_command( 3234 HCI_Set_Connection_Encryption_Command( 3235 connection_handle=connection.handle, 3236 encryption_enable=0x01 if enable else 0x00, 3237 ) 3238 ) 3239 3240 if result.status != HCI_COMMAND_STATUS_PENDING: 3241 logger.warning( 3242 'HCI_Set_Connection_Encryption_Command failed: ' 3243 f'{HCI_Constant.error_name(result.status)}' 3244 ) 3245 raise HCI_StatusError(result) 3246 3247 # Wait for the result 3248 await connection.abort_on('disconnection', pending_encryption) 3249 finally: 3250 connection.remove_listener( 3251 'connection_encryption_change', on_encryption_change 3252 ) 3253 connection.remove_listener( 3254 'connection_encryption_failure', on_encryption_failure 3255 ) 3256 3257 async def update_keys(self, address: str, keys: PairingKeys) -> None: 3258 if self.keystore is None: 3259 return 3260 3261 try: 3262 await self.keystore.update(address, keys) 3263 await self.refresh_resolving_list() 3264 except Exception as error: 3265 logger.warning(f'!!! 
error while storing keys: {error}') 3266 else: 3267 self.emit('key_store_update') 3268 3269 # [Classic only] 3270 async def switch_role(self, connection: Connection, role: int): 3271 pending_role_change = asyncio.get_running_loop().create_future() 3272 3273 def on_role_change(new_role): 3274 pending_role_change.set_result(new_role) 3275 3276 def on_role_change_failure(error_code): 3277 pending_role_change.set_exception(HCI_Error(error_code)) 3278 3279 connection.on('role_change', on_role_change) 3280 connection.on('role_change_failure', on_role_change_failure) 3281 3282 try: 3283 result = await self.send_command( 3284 HCI_Switch_Role_Command(bd_addr=connection.peer_address, role=role) 3285 ) 3286 if result.status != HCI_COMMAND_STATUS_PENDING: 3287 logger.warning( 3288 'HCI_Switch_Role_Command failed: ' 3289 f'{HCI_Constant.error_name(result.status)}' 3290 ) 3291 raise HCI_StatusError(result) 3292 await connection.abort_on('disconnection', pending_role_change) 3293 finally: 3294 connection.remove_listener('role_change', on_role_change) 3295 connection.remove_listener('role_change_failure', on_role_change_failure) 3296 3297 # [Classic only] 3298 async def request_remote_name(self, remote: Union[Address, Connection]) -> str: 3299 # Set up event handlers 3300 pending_name = asyncio.get_running_loop().create_future() 3301 3302 peer_address = remote if isinstance(remote, Address) else remote.peer_address 3303 3304 handler = self.on( 3305 'remote_name', 3306 lambda address, remote_name: ( 3307 pending_name.set_result(remote_name) 3308 if address == peer_address 3309 else None 3310 ), 3311 ) 3312 failure_handler = self.on( 3313 'remote_name_failure', 3314 lambda address, error_code: ( 3315 pending_name.set_exception(HCI_Error(error_code)) 3316 if address == peer_address 3317 else None 3318 ), 3319 ) 3320 3321 try: 3322 result = await self.send_command( 3323 HCI_Remote_Name_Request_Command( 3324 bd_addr=peer_address, 3325 
page_scan_repetition_mode=HCI_Remote_Name_Request_Command.R2, 3326 reserved=0, 3327 clock_offset=0, # TODO investigate non-0 values 3328 ) 3329 ) 3330 3331 if result.status != HCI_COMMAND_STATUS_PENDING: 3332 logger.warning( 3333 'HCI_Remote_Name_Request_Command failed: ' 3334 f'{HCI_Constant.error_name(result.status)}' 3335 ) 3336 raise HCI_StatusError(result) 3337 3338 # Wait for the result 3339 return await self.abort_on('flush', pending_name) 3340 finally: 3341 self.remove_listener('remote_name', handler) 3342 self.remove_listener('remote_name_failure', failure_handler) 3343 3344 # [LE only] 3345 @experimental('Only for testing.') 3346 async def setup_cig( 3347 self, 3348 cig_id: int, 3349 cis_id: List[int], 3350 sdu_interval: Tuple[int, int], 3351 framing: int, 3352 max_sdu: Tuple[int, int], 3353 retransmission_number: int, 3354 max_transport_latency: Tuple[int, int], 3355 ) -> List[int]: 3356 """Sends HCI_LE_Set_CIG_Parameters_Command. 3357 3358 Args: 3359 cig_id: CIG_ID. 3360 cis_id: CID ID list. 3361 sdu_interval: SDU intervals of (Central->Peripheral, Peripheral->Cental). 3362 framing: Un-framing(0) or Framing(1). 3363 max_sdu: Max SDU counts of (Central->Peripheral, Peripheral->Cental). 3364 retransmission_number: retransmission_number. 3365 max_transport_latency: Max transport latencies of 3366 (Central->Peripheral, Peripheral->Cental). 3367 3368 Returns: 3369 List of created CIS handles corresponding to the same order of [cid_id]. 
3370 """ 3371 num_cis = len(cis_id) 3372 3373 response = await self.send_command( 3374 HCI_LE_Set_CIG_Parameters_Command( 3375 cig_id=cig_id, 3376 sdu_interval_c_to_p=sdu_interval[0], 3377 sdu_interval_p_to_c=sdu_interval[1], 3378 worst_case_sca=0x00, # 251-500 ppm 3379 packing=0x00, # Sequential 3380 framing=framing, 3381 max_transport_latency_c_to_p=max_transport_latency[0], 3382 max_transport_latency_p_to_c=max_transport_latency[1], 3383 cis_id=cis_id, 3384 max_sdu_c_to_p=[max_sdu[0]] * num_cis, 3385 max_sdu_p_to_c=[max_sdu[1]] * num_cis, 3386 phy_c_to_p=[HCI_LE_2M_PHY] * num_cis, 3387 phy_p_to_c=[HCI_LE_2M_PHY] * num_cis, 3388 rtn_c_to_p=[retransmission_number] * num_cis, 3389 rtn_p_to_c=[retransmission_number] * num_cis, 3390 ), 3391 check_result=True, 3392 ) 3393 3394 # Ideally, we should manage CIG lifecycle, but they are not useful for Unicast 3395 # Server, so here it only provides a basic functionality for testing. 3396 cis_handles = response.return_parameters.connection_handle[:] 3397 for id, cis_handle in zip(cis_id, cis_handles): 3398 self._pending_cis[cis_handle] = (id, cig_id) 3399 3400 return cis_handles 3401 3402 # [LE only] 3403 @experimental('Only for testing.') 3404 async def create_cis(self, cis_acl_pairs: List[Tuple[int, int]]) -> List[CisLink]: 3405 for cis_handle, acl_handle in cis_acl_pairs: 3406 acl_connection = self.lookup_connection(acl_handle) 3407 assert acl_connection 3408 cis_id, cig_id = self._pending_cis.pop(cis_handle) 3409 self.cis_links[cis_handle] = CisLink( 3410 device=self, 3411 acl_connection=acl_connection, 3412 handle=cis_handle, 3413 cis_id=cis_id, 3414 cig_id=cig_id, 3415 ) 3416 3417 with closing(EventWatcher()) as watcher: 3418 pending_cis_establishments = { 3419 cis_handle: asyncio.get_running_loop().create_future() 3420 for cis_handle, _ in cis_acl_pairs 3421 } 3422 3423 def on_cis_establishment(cis_link: CisLink) -> None: 3424 if pending_future := pending_cis_establishments.get(cis_link.handle): 3425 
pending_future.set_result(cis_link) 3426 3427 def on_cis_establishment_failure(cis_handle: int, status: int) -> None: 3428 if pending_future := pending_cis_establishments.get(cis_handle): 3429 pending_future.set_exception(HCI_Error(status)) 3430 3431 watcher.on(self, 'cis_establishment', on_cis_establishment) 3432 watcher.on(self, 'cis_establishment_failure', on_cis_establishment_failure) 3433 await self.send_command( 3434 HCI_LE_Create_CIS_Command( 3435 cis_connection_handle=[p[0] for p in cis_acl_pairs], 3436 acl_connection_handle=[p[1] for p in cis_acl_pairs], 3437 ), 3438 check_result=True, 3439 ) 3440 3441 return await asyncio.gather(*pending_cis_establishments.values()) 3442 3443 # [LE only] 3444 @experimental('Only for testing.') 3445 async def accept_cis_request(self, handle: int) -> CisLink: 3446 """[LE Only] Accepts an incoming CIS request. 3447 3448 When the specified CIS handle is already created, this method returns the 3449 existed CIS link object immediately. 3450 3451 Args: 3452 handle: CIS handle to accept. 3453 3454 Returns: 3455 CIS link object on the given handle. 3456 """ 3457 if not (cis_link := self.cis_links.get(handle)): 3458 raise InvalidStateError(f'No pending CIS request of handle {handle}') 3459 3460 # There might be multiple ASE sharing a CIS channel. 3461 # If one of them has accepted the request, the others should just leverage it. 
3462 async with self._cis_lock: 3463 if cis_link.state == CisLink.State.ESTABLISHED: 3464 return cis_link 3465 3466 with closing(EventWatcher()) as watcher: 3467 pending_establishment = asyncio.get_running_loop().create_future() 3468 3469 def on_establishment() -> None: 3470 pending_establishment.set_result(None) 3471 3472 def on_establishment_failure(status: int) -> None: 3473 pending_establishment.set_exception(HCI_Error(status)) 3474 3475 watcher.on(cis_link, 'establishment', on_establishment) 3476 watcher.on(cis_link, 'establishment_failure', on_establishment_failure) 3477 3478 await self.send_command( 3479 HCI_LE_Accept_CIS_Request_Command(connection_handle=handle), 3480 check_result=True, 3481 ) 3482 3483 await pending_establishment 3484 return cis_link 3485 3486 # Mypy believes this is reachable when context is an ExitStack. 3487 raise InvalidStateError('Unreachable') 3488 3489 # [LE only] 3490 @experimental('Only for testing.') 3491 async def reject_cis_request( 3492 self, 3493 handle: int, 3494 reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, 3495 ) -> None: 3496 await self.send_command( 3497 HCI_LE_Reject_CIS_Request_Command(connection_handle=handle, reason=reason), 3498 check_result=True, 3499 ) 3500 3501 async def get_remote_le_features(self, connection: Connection) -> LeFeatureMask: 3502 """[LE Only] Reads remote LE supported features. 3503 3504 Args: 3505 handle: connection handle to read LE features. 3506 3507 Returns: 3508 LE features supported by the remote device. 
3509 """ 3510 with closing(EventWatcher()) as watcher: 3511 read_feature_future: asyncio.Future[LeFeatureMask] = ( 3512 asyncio.get_running_loop().create_future() 3513 ) 3514 3515 def on_le_remote_features(handle: int, features: int): 3516 if handle == connection.handle: 3517 read_feature_future.set_result(LeFeatureMask(features)) 3518 3519 def on_failure(handle: int, status: int): 3520 if handle == connection.handle: 3521 read_feature_future.set_exception(HCI_Error(status)) 3522 3523 watcher.on(self.host, 'le_remote_features', on_le_remote_features) 3524 watcher.on(self.host, 'le_remote_features_failure', on_failure) 3525 await self.send_command( 3526 HCI_LE_Read_Remote_Features_Command( 3527 connection_handle=connection.handle 3528 ), 3529 check_result=True, 3530 ) 3531 return await read_feature_future 3532 3533 @host_event_handler 3534 def on_flush(self): 3535 self.emit('flush') 3536 for _, connection in self.connections.items(): 3537 connection.emit('disconnection', 0) 3538 self.connections = {} 3539 3540 # [Classic only] 3541 @host_event_handler 3542 def on_link_key(self, bd_addr, link_key, key_type): 3543 # Store the keys in the key store 3544 if self.keystore: 3545 authenticated = key_type in ( 3546 HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_192_TYPE, 3547 HCI_AUTHENTICATED_COMBINATION_KEY_GENERATED_FROM_P_256_TYPE, 3548 ) 3549 pairing_keys = PairingKeys() 3550 pairing_keys.link_key = PairingKeys.Key( 3551 value=link_key, authenticated=authenticated 3552 ) 3553 3554 self.abort_on('flush', self.update_keys(str(bd_addr), pairing_keys)) 3555 3556 if connection := self.find_connection_by_bd_addr( 3557 bd_addr, transport=BT_BR_EDR_TRANSPORT 3558 ): 3559 connection.link_key_type = key_type 3560 3561 def add_service(self, service): 3562 self.gatt_server.add_service(service) 3563 3564 def add_services(self, services): 3565 self.gatt_server.add_services(services) 3566 3567 def add_default_services(self, generic_access_service=True): 3568 # Add a GAP Service 
if requested 3569 if generic_access_service: 3570 self.gatt_server.add_service(GenericAccessService(self.name)) 3571 3572 async def notify_subscriber(self, connection, attribute, value=None, force=False): 3573 await self.gatt_server.notify_subscriber(connection, attribute, value, force) 3574 3575 async def notify_subscribers(self, attribute, value=None, force=False): 3576 await self.gatt_server.notify_subscribers(attribute, value, force) 3577 3578 async def indicate_subscriber(self, connection, attribute, value=None, force=False): 3579 await self.gatt_server.indicate_subscriber(connection, attribute, value, force) 3580 3581 async def indicate_subscribers(self, attribute, value=None, force=False): 3582 await self.gatt_server.indicate_subscribers(attribute, value, force) 3583 3584 @host_event_handler 3585 def on_advertising_set_termination( 3586 self, 3587 status, 3588 advertising_handle, 3589 connection_handle, 3590 number_of_completed_extended_advertising_events, 3591 ): 3592 # Legacy advertising set is also one of extended advertising sets. 3593 if not ( 3594 advertising_set := self.extended_advertising_sets.get(advertising_handle) 3595 ): 3596 logger.warning(f'advertising set {advertising_handle} not found') 3597 return 3598 3599 advertising_set.on_termination(status) 3600 3601 if status != HCI_SUCCESS: 3602 logger.debug( 3603 f'advertising set {advertising_handle} ' 3604 f'terminated with status {status}' 3605 ) 3606 return 3607 3608 if not (connection := self.lookup_connection(connection_handle)): 3609 logger.warning(f'no connection for handle 0x{connection_handle:04x}') 3610 return 3611 3612 # Update the connection address. 3613 connection.self_address = ( 3614 advertising_set.random_address 3615 if advertising_set.advertising_parameters.own_address_type 3616 in (OwnAddressType.RANDOM, OwnAddressType.RESOLVABLE_OR_RANDOM) 3617 else self.public_address 3618 ) 3619 3620 # Setup auto-restart of the advertising set if needed. 
3621 if advertising_set.auto_restart: 3622 connection.once( 3623 'disconnection', 3624 lambda _: self.abort_on('flush', advertising_set.start()), 3625 ) 3626 3627 self._emit_le_connection(connection) 3628 3629 def _emit_le_connection(self, connection: Connection) -> None: 3630 # If supported, read which PHY we're connected with before 3631 # notifying listeners of the new connection. 3632 if self.host.supports_command(HCI_LE_READ_PHY_COMMAND): 3633 3634 async def read_phy(): 3635 result = await self.send_command( 3636 HCI_LE_Read_PHY_Command(connection_handle=connection.handle), 3637 check_result=True, 3638 ) 3639 connection.phy = ConnectionPHY( 3640 result.return_parameters.tx_phy, result.return_parameters.rx_phy 3641 ) 3642 # Emit an event to notify listeners of the new connection 3643 self.emit('connection', connection) 3644 3645 # Do so asynchronously to not block the current event handler 3646 connection.abort_on('disconnection', read_phy()) 3647 3648 return 3649 3650 self.emit('connection', connection) 3651 3652 @host_event_handler 3653 def on_connection( 3654 self, 3655 connection_handle, 3656 transport, 3657 peer_address, 3658 role, 3659 connection_parameters, 3660 ): 3661 logger.debug( 3662 f'*** Connection: [0x{connection_handle:04X}] ' 3663 f'{peer_address} {"" if role is None else HCI_Constant.role_name(role)}' 3664 ) 3665 if connection_handle in self.connections: 3666 logger.warning( 3667 'new connection reuses the same handle as a previous connection' 3668 ) 3669 3670 if transport == BT_BR_EDR_TRANSPORT: 3671 # Create a new connection 3672 connection = self.pending_connections.pop(peer_address) 3673 connection.complete(connection_handle, connection_parameters) 3674 self.connections[connection_handle] = connection 3675 3676 # Emit an event to notify listeners of the new connection 3677 self.emit('connection', connection) 3678 3679 return 3680 3681 # Resolve the peer address if we can 3682 peer_resolvable_address = None 3683 if self.address_resolver: 
3684 if peer_address.is_resolvable: 3685 resolved_address = self.address_resolver.resolve(peer_address) 3686 if resolved_address is not None: 3687 logger.debug(f'*** Address resolved as {resolved_address}') 3688 peer_resolvable_address = peer_address 3689 peer_address = resolved_address 3690 3691 self_address = None 3692 if role == HCI_CENTRAL_ROLE: 3693 own_address_type = self.connect_own_address_type 3694 assert own_address_type is not None 3695 else: 3696 if self.supports_le_extended_advertising: 3697 # We'll know the address when the advertising set terminates, 3698 # Use a temporary placeholder value for self_address. 3699 self_address = Address.ANY_RANDOM 3700 else: 3701 # We were connected via a legacy advertisement. 3702 if self.legacy_advertiser: 3703 own_address_type = self.legacy_advertiser.own_address_type 3704 else: 3705 # This should not happen, but just in case, pick a default. 3706 logger.warning("connection without an advertiser") 3707 self_address = self.random_address 3708 3709 if self_address is None: 3710 self_address = ( 3711 self.public_address 3712 if own_address_type 3713 in ( 3714 OwnAddressType.PUBLIC, 3715 OwnAddressType.RESOLVABLE_OR_PUBLIC, 3716 ) 3717 else self.random_address 3718 ) 3719 3720 # Create a connection. 
3721 connection = Connection( 3722 self, 3723 connection_handle, 3724 transport, 3725 self_address, 3726 peer_address, 3727 peer_resolvable_address, 3728 role, 3729 connection_parameters, 3730 ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY), 3731 ) 3732 self.connections[connection_handle] = connection 3733 3734 if role == HCI_PERIPHERAL_ROLE and self.legacy_advertiser: 3735 if self.legacy_advertiser.auto_restart: 3736 connection.once( 3737 'disconnection', 3738 lambda _: self.abort_on('flush', self.legacy_advertiser.start()), 3739 ) 3740 else: 3741 self.legacy_advertiser = None 3742 3743 if role == HCI_CENTRAL_ROLE or not self.supports_le_extended_advertising: 3744 # We can emit now, we have all the info we need 3745 self._emit_le_connection(connection) 3746 3747 @host_event_handler 3748 def on_connection_failure(self, transport, peer_address, error_code): 3749 logger.debug(f'*** Connection failed: {HCI_Constant.error_name(error_code)}') 3750 3751 # For directed advertising, this means a timeout 3752 if ( 3753 transport == BT_LE_TRANSPORT 3754 and self.legacy_advertiser 3755 and self.legacy_advertiser.advertising_type.is_directed 3756 ): 3757 self.legacy_advertiser = None 3758 3759 # Notify listeners 3760 error = core.ConnectionError( 3761 error_code, 3762 transport, 3763 peer_address, 3764 'hci', 3765 HCI_Constant.error_name(error_code), 3766 ) 3767 self.emit('connection_failure', error) 3768 3769 # FIXME: Explore a delegate-model for BR/EDR wait connection #56. 3770 @host_event_handler 3771 def on_connection_request(self, bd_addr, class_of_device, link_type): 3772 logger.debug(f'*** Connection request: {bd_addr}') 3773 3774 # Handle SCO request. 
3775 if link_type in ( 3776 HCI_Connection_Complete_Event.SCO_LINK_TYPE, 3777 HCI_Connection_Complete_Event.ESCO_LINK_TYPE, 3778 ): 3779 if connection := self.find_connection_by_bd_addr( 3780 bd_addr, transport=BT_BR_EDR_TRANSPORT 3781 ): 3782 self.emit('sco_request', connection, link_type) 3783 else: 3784 logger.error(f'SCO request from a non-connected device {bd_addr}') 3785 return 3786 3787 # match a pending future using `bd_addr` 3788 elif bd_addr in self.classic_pending_accepts: 3789 future, *_ = self.classic_pending_accepts.pop(bd_addr) 3790 future.set_result((bd_addr, class_of_device, link_type)) 3791 3792 # match first pending future for ANY address 3793 elif len(self.classic_pending_accepts[Address.ANY]) > 0: 3794 future = self.classic_pending_accepts[Address.ANY].pop(0) 3795 future.set_result((bd_addr, class_of_device, link_type)) 3796 3797 # device configuration is set to accept any incoming connection 3798 elif self.classic_accept_any: 3799 # Save pending connection 3800 self.pending_connections[bd_addr] = Connection.incomplete( 3801 self, bd_addr, BT_PERIPHERAL_ROLE 3802 ) 3803 3804 self.host.send_command_sync( 3805 HCI_Accept_Connection_Request_Command( 3806 bd_addr=bd_addr, role=0x01 # Remain the peripheral 3807 ) 3808 ) 3809 3810 # reject incoming connection 3811 else: 3812 self.host.send_command_sync( 3813 HCI_Reject_Connection_Request_Command( 3814 bd_addr=bd_addr, 3815 reason=HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR, 3816 ) 3817 ) 3818 3819 @host_event_handler 3820 def on_disconnection(self, connection_handle: int, reason: int) -> None: 3821 if connection := self.connections.pop(connection_handle, None): 3822 logger.debug( 3823 f'*** Disconnection: [0x{connection.handle:04X}] ' 3824 f'{connection.peer_address} as {connection.role_name}, reason={reason}' 3825 ) 3826 connection.emit('disconnection', reason) 3827 3828 # Cleanup subsystems that maintain per-connection state 3829 self.gatt_server.on_disconnection(connection) 3830 elif 
sco_link := self.sco_links.pop(connection_handle, None): 3831 sco_link.emit('disconnection', reason) 3832 elif cis_link := self.cis_links.pop(connection_handle, None): 3833 cis_link.emit('disconnection', reason) 3834 else: 3835 logger.error( 3836 f'*** Unknown disconnection handle=0x{connection_handle}, reason={reason} ***' 3837 ) 3838 3839 @host_event_handler 3840 @with_connection_from_handle 3841 def on_disconnection_failure(self, connection, error_code): 3842 logger.debug(f'*** Disconnection failed: {error_code}') 3843 error = core.ConnectionError( 3844 error_code, 3845 connection.transport, 3846 connection.peer_address, 3847 'hci', 3848 HCI_Constant.error_name(error_code), 3849 ) 3850 connection.emit('disconnection_failure', error) 3851 3852 @host_event_handler 3853 @AsyncRunner.run_in_task() 3854 async def on_inquiry_complete(self): 3855 if self.auto_restart_inquiry: 3856 # Inquire again 3857 await self.start_discovery(auto_restart=True) 3858 else: 3859 self.auto_restart_inquiry = True 3860 self.discovering = False 3861 self.emit('inquiry_complete') 3862 3863 @host_event_handler 3864 @with_connection_from_handle 3865 def on_connection_authentication(self, connection): 3866 logger.debug( 3867 f'*** Connection Authentication: [0x{connection.handle:04X}] ' 3868 f'{connection.peer_address} as {connection.role_name}' 3869 ) 3870 connection.authenticated = True 3871 connection.emit('connection_authentication') 3872 3873 @host_event_handler 3874 @with_connection_from_handle 3875 def on_connection_authentication_failure(self, connection, error): 3876 logger.debug( 3877 f'*** Connection Authentication Failure: [0x{connection.handle:04X}] ' 3878 f'{connection.peer_address} as {connection.role_name}, error={error}' 3879 ) 3880 connection.emit('connection_authentication_failure', error) 3881 3882 # [Classic only] 3883 @host_event_handler 3884 @with_connection_from_address 3885 def on_authentication_io_capability_request(self, connection): 3886 # Ask what the pairing 
config should be for this connection 3887 pairing_config = self.pairing_config_factory(connection) 3888 3889 # Compute the authentication requirements 3890 authentication_requirements = ( 3891 # No Bonding 3892 ( 3893 HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS, 3894 HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS, 3895 ), 3896 # General Bonding 3897 ( 3898 HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS, 3899 HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS, 3900 ), 3901 )[1 if pairing_config.bonding else 0][1 if pairing_config.mitm else 0] 3902 3903 # Respond 3904 self.host.send_command_sync( 3905 HCI_IO_Capability_Request_Reply_Command( 3906 bd_addr=connection.peer_address, 3907 io_capability=pairing_config.delegate.classic_io_capability, 3908 oob_data_present=0x00, # Not present 3909 authentication_requirements=authentication_requirements, 3910 ) 3911 ) 3912 3913 # [Classic only] 3914 @host_event_handler 3915 @with_connection_from_address 3916 def on_authentication_io_capability_response( 3917 self, connection, io_capability, authentication_requirements 3918 ): 3919 connection.peer_pairing_io_capability = io_capability 3920 connection.peer_pairing_authentication_requirements = ( 3921 authentication_requirements 3922 ) 3923 3924 # [Classic only] 3925 @host_event_handler 3926 @with_connection_from_address 3927 def on_authentication_user_confirmation_request(self, connection, code) -> None: 3928 # Ask what the pairing config should be for this connection 3929 pairing_config = self.pairing_config_factory(connection) 3930 io_capability = pairing_config.delegate.classic_io_capability 3931 peer_io_capability = connection.peer_pairing_io_capability 3932 3933 async def confirm() -> bool: 3934 # Ask the user to confirm the pairing, without display 3935 return await pairing_config.delegate.confirm() 3936 3937 async def auto_confirm() -> bool: 3938 # Ask the user to auto-confirm the pairing, without display 3939 
return await pairing_config.delegate.confirm(auto=True) 3940 3941 async def display_confirm() -> bool: 3942 # Display the code and ask the user to compare 3943 return await pairing_config.delegate.compare_numbers(code, digits=6) 3944 3945 async def display_auto_confirm() -> bool: 3946 # Display the code to the user and ask the delegate to auto-confirm 3947 await pairing_config.delegate.display_number(code, digits=6) 3948 return await pairing_config.delegate.confirm(auto=True) 3949 3950 async def na() -> bool: 3951 assert False, "N/A: unreachable" 3952 3953 # See Bluetooth spec @ Vol 3, Part C 5.2.2.6 3954 methods = { 3955 HCI_DISPLAY_ONLY_IO_CAPABILITY: { 3956 HCI_DISPLAY_ONLY_IO_CAPABILITY: display_auto_confirm, 3957 HCI_DISPLAY_YES_NO_IO_CAPABILITY: display_confirm, 3958 HCI_KEYBOARD_ONLY_IO_CAPABILITY: na, 3959 HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm, 3960 }, 3961 HCI_DISPLAY_YES_NO_IO_CAPABILITY: { 3962 HCI_DISPLAY_ONLY_IO_CAPABILITY: display_auto_confirm, 3963 HCI_DISPLAY_YES_NO_IO_CAPABILITY: display_confirm, 3964 HCI_KEYBOARD_ONLY_IO_CAPABILITY: na, 3965 HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm, 3966 }, 3967 HCI_KEYBOARD_ONLY_IO_CAPABILITY: { 3968 HCI_DISPLAY_ONLY_IO_CAPABILITY: na, 3969 HCI_DISPLAY_YES_NO_IO_CAPABILITY: na, 3970 HCI_KEYBOARD_ONLY_IO_CAPABILITY: na, 3971 HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm, 3972 }, 3973 HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: { 3974 HCI_DISPLAY_ONLY_IO_CAPABILITY: confirm, 3975 HCI_DISPLAY_YES_NO_IO_CAPABILITY: confirm, 3976 HCI_KEYBOARD_ONLY_IO_CAPABILITY: auto_confirm, 3977 HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: auto_confirm, 3978 }, 3979 } 3980 3981 method = methods[peer_io_capability][io_capability] 3982 3983 async def reply() -> None: 3984 try: 3985 if await connection.abort_on('disconnection', method()): 3986 await self.host.send_command( 3987 HCI_User_Confirmation_Request_Reply_Command( 3988 bd_addr=connection.peer_address 3989 ) 3990 ) 3991 return 3992 except Exception as error: 3993 
logger.warning(f'exception while confirming: {error}') 3994 3995 await self.host.send_command( 3996 HCI_User_Confirmation_Request_Negative_Reply_Command( 3997 bd_addr=connection.peer_address 3998 ) 3999 ) 4000 4001 AsyncRunner.spawn(reply()) 4002 4003 # [Classic only] 4004 @host_event_handler 4005 @with_connection_from_address 4006 def on_authentication_user_passkey_request(self, connection) -> None: 4007 # Ask what the pairing config should be for this connection 4008 pairing_config = self.pairing_config_factory(connection) 4009 4010 async def reply() -> None: 4011 try: 4012 number = await connection.abort_on( 4013 'disconnection', pairing_config.delegate.get_number() 4014 ) 4015 if number is not None: 4016 await self.host.send_command( 4017 HCI_User_Passkey_Request_Reply_Command( 4018 bd_addr=connection.peer_address, numeric_value=number 4019 ) 4020 ) 4021 return 4022 except Exception as error: 4023 logger.warning(f'exception while asking for pass-key: {error}') 4024 4025 await self.host.send_command( 4026 HCI_User_Passkey_Request_Negative_Reply_Command( 4027 bd_addr=connection.peer_address 4028 ) 4029 ) 4030 4031 AsyncRunner.spawn(reply()) 4032 4033 # [Classic only] 4034 @host_event_handler 4035 @with_connection_from_address 4036 def on_pin_code_request(self, connection): 4037 # Classic legacy pairing 4038 # Ask what the pairing config should be for this connection 4039 pairing_config = self.pairing_config_factory(connection) 4040 io_capability = pairing_config.delegate.classic_io_capability 4041 4042 # Respond 4043 if io_capability == HCI_KEYBOARD_ONLY_IO_CAPABILITY: 4044 # Ask the user to enter a string 4045 async def get_pin_code(): 4046 pin_code = await connection.abort_on( 4047 'disconnection', pairing_config.delegate.get_string(16) 4048 ) 4049 4050 if pin_code is not None: 4051 pin_code = bytes(pin_code, encoding='utf-8') 4052 pin_code_len = len(pin_code) 4053 assert 0 < pin_code_len <= 16, "pin_code should be 1-16 bytes" 4054 await self.host.send_command( 
4055 HCI_PIN_Code_Request_Reply_Command( 4056 bd_addr=connection.peer_address, 4057 pin_code_length=pin_code_len, 4058 pin_code=pin_code, 4059 ) 4060 ) 4061 else: 4062 logger.debug("delegate.get_string() returned None") 4063 await self.host.send_command( 4064 HCI_PIN_Code_Request_Negative_Reply_Command( 4065 bd_addr=connection.peer_address 4066 ) 4067 ) 4068 4069 asyncio.create_task(get_pin_code()) 4070 else: 4071 self.host.send_command_sync( 4072 HCI_PIN_Code_Request_Negative_Reply_Command( 4073 bd_addr=connection.peer_address 4074 ) 4075 ) 4076 4077 # [Classic only] 4078 @host_event_handler 4079 @with_connection_from_address 4080 def on_authentication_user_passkey_notification(self, connection, passkey): 4081 # Ask what the pairing config should be for this connection 4082 pairing_config = self.pairing_config_factory(connection) 4083 4084 # Show the passkey to the user 4085 connection.abort_on( 4086 'disconnection', pairing_config.delegate.display_number(passkey) 4087 ) 4088 4089 # [Classic only] 4090 @host_event_handler 4091 @try_with_connection_from_address 4092 def on_remote_name(self, connection: Connection, address, remote_name): 4093 # Try to decode the name 4094 try: 4095 remote_name = remote_name.decode('utf-8') 4096 if connection: 4097 connection.peer_name = remote_name 4098 connection.emit('remote_name') 4099 self.emit('remote_name', address, remote_name) 4100 except UnicodeDecodeError as error: 4101 logger.warning('peer name is not valid UTF-8') 4102 if connection: 4103 connection.emit('remote_name_failure', error) 4104 else: 4105 self.emit('remote_name_failure', address, error) 4106 4107 # [Classic only] 4108 @host_event_handler 4109 @try_with_connection_from_address 4110 def on_remote_name_failure(self, connection: Connection, address, error): 4111 if connection: 4112 connection.emit('remote_name_failure', error) 4113 self.emit('remote_name_failure', address, error) 4114 4115 # [Classic only] 4116 @host_event_handler 4117 
@with_connection_from_address 4118 @experimental('Only for testing.') 4119 def on_sco_connection( 4120 self, acl_connection: Connection, sco_handle: int, link_type: int 4121 ) -> None: 4122 logger.debug( 4123 f'*** SCO connected: {acl_connection.peer_address}, ' 4124 f'sco_handle=[0x{sco_handle:04X}], ' 4125 f'link_type=[0x{link_type:02X}] ***' 4126 ) 4127 sco_link = self.sco_links[sco_handle] = ScoLink( 4128 device=self, 4129 acl_connection=acl_connection, 4130 handle=sco_handle, 4131 link_type=link_type, 4132 ) 4133 self.emit('sco_connection', sco_link) 4134 4135 # [Classic only] 4136 @host_event_handler 4137 @with_connection_from_address 4138 @experimental('Only for testing.') 4139 def on_sco_connection_failure( 4140 self, acl_connection: Connection, status: int 4141 ) -> None: 4142 logger.debug(f'*** SCO connection failure: {acl_connection.peer_address}***') 4143 self.emit('sco_connection_failure') 4144 4145 # [Classic only] 4146 @host_event_handler 4147 @experimental('Only for testing') 4148 def on_sco_packet(self, sco_handle: int, packet: HCI_SynchronousDataPacket) -> None: 4149 if (sco_link := self.sco_links.get(sco_handle)) and sco_link.sink: 4150 sco_link.sink(packet) 4151 4152 # [LE only] 4153 @host_event_handler 4154 @with_connection_from_handle 4155 @experimental('Only for testing') 4156 def on_cis_request( 4157 self, 4158 acl_connection: Connection, 4159 cis_handle: int, 4160 cig_id: int, 4161 cis_id: int, 4162 ) -> None: 4163 logger.debug( 4164 f'*** CIS Request ' 4165 f'acl_handle=[0x{acl_connection.handle:04X}]{acl_connection.peer_address}, ' 4166 f'cis_handle=[0x{cis_handle:04X}], ' 4167 f'cig_id=[0x{cig_id:02X}], ' 4168 f'cis_id=[0x{cis_id:02X}] ***' 4169 ) 4170 # LE_CIS_Established event doesn't provide info, so we must store them here. 
4171 self.cis_links[cis_handle] = CisLink( 4172 device=self, 4173 acl_connection=acl_connection, 4174 handle=cis_handle, 4175 cig_id=cig_id, 4176 cis_id=cis_id, 4177 ) 4178 self.emit('cis_request', acl_connection, cis_handle, cig_id, cis_id) 4179 4180 # [LE only] 4181 @host_event_handler 4182 @experimental('Only for testing') 4183 def on_cis_establishment(self, cis_handle: int) -> None: 4184 cis_link = self.cis_links[cis_handle] 4185 cis_link.state = CisLink.State.ESTABLISHED 4186 4187 assert cis_link.acl_connection 4188 4189 logger.debug( 4190 f'*** CIS Establishment ' 4191 f'{cis_link.acl_connection.peer_address}, ' 4192 f'cis_handle=[0x{cis_handle:04X}], ' 4193 f'cig_id=[0x{cis_link.cig_id:02X}], ' 4194 f'cis_id=[0x{cis_link.cis_id:02X}] ***' 4195 ) 4196 4197 cis_link.emit('establishment') 4198 self.emit('cis_establishment', cis_link) 4199 4200 # [LE only] 4201 @host_event_handler 4202 @experimental('Only for testing') 4203 def on_cis_establishment_failure(self, cis_handle: int, status: int) -> None: 4204 logger.debug(f'*** CIS Establishment Failure: cis=[0x{cis_handle:04X}] ***') 4205 if cis_link := self.cis_links.pop(cis_handle): 4206 cis_link.emit('establishment_failure', status) 4207 self.emit('cis_establishment_failure', cis_handle, status) 4208 4209 # [LE only] 4210 @host_event_handler 4211 @experimental('Only for testing') 4212 def on_iso_packet(self, handle: int, packet: HCI_IsoDataPacket) -> None: 4213 if (cis_link := self.cis_links.get(handle)) and cis_link.sink: 4214 cis_link.sink(packet) 4215 4216 @host_event_handler 4217 @with_connection_from_handle 4218 def on_connection_encryption_change(self, connection, encryption): 4219 logger.debug( 4220 f'*** Connection Encryption Change: [0x{connection.handle:04X}] ' 4221 f'{connection.peer_address} as {connection.role_name}, ' 4222 f'encryption={encryption}' 4223 ) 4224 connection.encryption = encryption 4225 if ( 4226 not connection.authenticated 4227 and connection.transport == BT_BR_EDR_TRANSPORT 4228 
and encryption == HCI_Encryption_Change_Event.AES_CCM 4229 ): 4230 connection.authenticated = True 4231 connection.sc = True 4232 if ( 4233 not connection.authenticated 4234 and connection.transport == BT_LE_TRANSPORT 4235 and encryption == HCI_Encryption_Change_Event.E0_OR_AES_CCM 4236 ): 4237 connection.authenticated = True 4238 connection.sc = True 4239 connection.emit('connection_encryption_change') 4240 4241 @host_event_handler 4242 @with_connection_from_handle 4243 def on_connection_encryption_failure(self, connection, error): 4244 logger.debug( 4245 f'*** Connection Encryption Failure: [0x{connection.handle:04X}] ' 4246 f'{connection.peer_address} as {connection.role_name}, ' 4247 f'error={error}' 4248 ) 4249 connection.emit('connection_encryption_failure', error) 4250 4251 @host_event_handler 4252 @with_connection_from_handle 4253 def on_connection_encryption_key_refresh(self, connection): 4254 logger.debug( 4255 f'*** Connection Key Refresh: [0x{connection.handle:04X}] ' 4256 f'{connection.peer_address} as {connection.role_name}' 4257 ) 4258 connection.emit('connection_encryption_key_refresh') 4259 4260 @host_event_handler 4261 @with_connection_from_handle 4262 def on_connection_parameters_update(self, connection, connection_parameters): 4263 logger.debug( 4264 f'*** Connection Parameters Update: [0x{connection.handle:04X}] ' 4265 f'{connection.peer_address} as {connection.role_name}, ' 4266 f'{connection_parameters}' 4267 ) 4268 connection.parameters = connection_parameters 4269 connection.emit('connection_parameters_update') 4270 4271 @host_event_handler 4272 @with_connection_from_handle 4273 def on_connection_parameters_update_failure(self, connection, error): 4274 logger.debug( 4275 f'*** Connection Parameters Update Failed: [0x{connection.handle:04X}] ' 4276 f'{connection.peer_address} as {connection.role_name}, ' 4277 f'error={error}' 4278 ) 4279 connection.emit('connection_parameters_update_failure', error) 4280 4281 @host_event_handler 4282 
@with_connection_from_handle 4283 def on_connection_phy_update(self, connection, connection_phy): 4284 logger.debug( 4285 f'*** Connection PHY Update: [0x{connection.handle:04X}] ' 4286 f'{connection.peer_address} as {connection.role_name}, ' 4287 f'{connection_phy}' 4288 ) 4289 connection.phy = connection_phy 4290 connection.emit('connection_phy_update') 4291 4292 @host_event_handler 4293 @with_connection_from_handle 4294 def on_connection_phy_update_failure(self, connection, error): 4295 logger.debug( 4296 f'*** Connection PHY Update Failed: [0x{connection.handle:04X}] ' 4297 f'{connection.peer_address} as {connection.role_name}, ' 4298 f'error={error}' 4299 ) 4300 connection.emit('connection_phy_update_failure', error) 4301 4302 @host_event_handler 4303 @with_connection_from_handle 4304 def on_connection_att_mtu_update(self, connection, att_mtu): 4305 logger.debug( 4306 f'*** Connection ATT MTU Update: [0x{connection.handle:04X}] ' 4307 f'{connection.peer_address} as {connection.role_name}, ' 4308 f'{att_mtu}' 4309 ) 4310 connection.att_mtu = att_mtu 4311 connection.emit('connection_att_mtu_update') 4312 4313 @host_event_handler 4314 @with_connection_from_handle 4315 def on_connection_data_length_change( 4316 self, connection, max_tx_octets, max_tx_time, max_rx_octets, max_rx_time 4317 ): 4318 logger.debug( 4319 f'*** Connection Data Length Change: [0x{connection.handle:04X}] ' 4320 f'{connection.peer_address} as {connection.role_name}' 4321 ) 4322 connection.data_length = ( 4323 max_tx_octets, 4324 max_tx_time, 4325 max_rx_octets, 4326 max_rx_time, 4327 ) 4328 connection.emit('connection_data_length_change') 4329 4330 # [Classic only] 4331 @host_event_handler 4332 @with_connection_from_address 4333 def on_role_change(self, connection, new_role): 4334 connection.role = new_role 4335 connection.emit('role_change', new_role) 4336 4337 # [Classic only] 4338 @host_event_handler 4339 @try_with_connection_from_address 4340 def on_role_change_failure(self, connection, 
address, error): 4341 if connection: 4342 connection.emit('role_change_failure', error) 4343 self.emit('role_change_failure', address, error) 4344 4345 # [Classic only] 4346 @host_event_handler 4347 @with_connection_from_address 4348 def on_classic_pairing(self, connection: Connection) -> None: 4349 connection.emit('classic_pairing') 4350 4351 # [Classic only] 4352 @host_event_handler 4353 @with_connection_from_address 4354 def on_classic_pairing_failure(self, connection: Connection, status) -> None: 4355 connection.emit('classic_pairing_failure', status) 4356 4357 def on_pairing_start(self, connection: Connection) -> None: 4358 connection.emit('pairing_start') 4359 4360 def on_pairing( 4361 self, 4362 connection: Connection, 4363 identity_address: Optional[Address], 4364 keys: PairingKeys, 4365 sc: bool, 4366 ) -> None: 4367 if identity_address is not None: 4368 connection.peer_resolvable_address = connection.peer_address 4369 connection.peer_address = identity_address 4370 connection.sc = sc 4371 connection.authenticated = True 4372 connection.emit('pairing', keys) 4373 4374 def on_pairing_failure(self, connection: Connection, reason: int) -> None: 4375 connection.emit('pairing_failure', reason) 4376 4377 @with_connection_from_handle 4378 def on_gatt_pdu(self, connection, pdu): 4379 # Parse the L2CAP payload into an ATT PDU object 4380 att_pdu = ATT_PDU.from_bytes(pdu) 4381 4382 # Conveniently, even-numbered op codes are client->server and 4383 # odd-numbered ones are server->client 4384 if att_pdu.op_code & 1: 4385 if connection.gatt_client is None: 4386 logger.warning( 4387 color('no GATT client for connection 0x{connection_handle:04X}') 4388 ) 4389 return 4390 connection.gatt_client.on_gatt_pdu(att_pdu) 4391 else: 4392 if connection.gatt_server is None: 4393 logger.warning( 4394 color('no GATT server for connection 0x{connection_handle:04X}') 4395 ) 4396 return 4397 connection.gatt_server.on_gatt_pdu(connection, att_pdu) 4398 4399 @with_connection_from_handle 
4400 def on_smp_pdu(self, connection, pdu): 4401 self.smp_manager.on_smp_pdu(connection, pdu) 4402 4403 @host_event_handler 4404 @with_connection_from_handle 4405 def on_l2cap_pdu(self, connection: Connection, cid: int, pdu: bytes): 4406 self.l2cap_channel_manager.on_pdu(connection, cid, pdu) 4407 4408 def __str__(self): 4409 return ( 4410 f'Device(name="{self.name}", ' 4411 f'random_address="{self.random_address}", ' 4412 f'public_address="{self.public_address}")' 4413 ) 4414