1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18from __future__ import annotations 19from enum import IntEnum 20import functools 21import json 22import asyncio 23import logging 24from contextlib import asynccontextmanager, AsyncExitStack 25from dataclasses import dataclass 26from typing import Any, ClassVar, Dict, List, Optional, Tuple, Union 27 28from .colors import color 29from .att import ATT_CID, ATT_DEFAULT_MTU, ATT_PDU 30from .gatt import Characteristic, Descriptor, Service 31from .hci import ( 32 HCI_CENTRAL_ROLE, 33 HCI_COMMAND_STATUS_PENDING, 34 HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR, 35 HCI_DISPLAY_ONLY_IO_CAPABILITY, 36 HCI_DISPLAY_YES_NO_IO_CAPABILITY, 37 HCI_EXTENDED_INQUIRY_MODE, 38 HCI_GENERAL_INQUIRY_LAP, 39 HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR, 40 HCI_KEYBOARD_ONLY_IO_CAPABILITY, 41 HCI_LE_1M_PHY, 42 HCI_LE_1M_PHY_BIT, 43 HCI_LE_2M_PHY, 44 HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE, 45 HCI_LE_CLEAR_RESOLVING_LIST_COMMAND, 46 HCI_LE_CODED_PHY, 47 HCI_LE_CODED_PHY_BIT, 48 HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE, 49 HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE, 50 HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND, 51 HCI_LE_RAND_COMMAND, 52 HCI_LE_READ_PHY_COMMAND, 53 HCI_LE_SET_PHY_COMMAND, 54 HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS, 55 HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS, 56 HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS, 57 HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS, 58 HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY, 59 HCI_R2_PAGE_SCAN_REPETITION_MODE, 60 HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR, 61 HCI_SUCCESS, 62 HCI_WRITE_LE_HOST_SUPPORT_COMMAND, 63 Address, 64 HCI_Accept_Connection_Request_Command, 65 HCI_Authentication_Requested_Command, 66 HCI_Command_Status_Event, 67 HCI_Constant, 68 HCI_Create_Connection_Cancel_Command, 69 HCI_Create_Connection_Command, 70 HCI_Disconnect_Command, 71 HCI_Encryption_Change_Event, 72 HCI_Error, 73 HCI_IO_Capability_Request_Reply_Command, 74 HCI_Inquiry_Cancel_Command, 75 HCI_Inquiry_Command, 76 HCI_LE_Add_Device_To_Resolving_List_Command, 77 HCI_LE_Advertising_Report_Event, 78 HCI_LE_Clear_Resolving_List_Command, 79 HCI_LE_Connection_Update_Command, 80 HCI_LE_Create_Connection_Cancel_Command, 81 HCI_LE_Create_Connection_Command, 82 HCI_LE_Enable_Encryption_Command, 83 HCI_LE_Extended_Advertising_Report_Event, 84 HCI_LE_Extended_Create_Connection_Command, 85 HCI_LE_Rand_Command, 86 HCI_LE_Read_PHY_Command, 87 HCI_LE_Set_Advertising_Data_Command, 88 HCI_LE_Set_Advertising_Enable_Command, 89 HCI_LE_Set_Advertising_Parameters_Command, 90 HCI_LE_Set_Default_PHY_Command, 91 HCI_LE_Set_Extended_Scan_Enable_Command, 92 HCI_LE_Set_Extended_Scan_Parameters_Command, 93 HCI_LE_Set_PHY_Command, 94 HCI_LE_Set_Random_Address_Command, 95 HCI_LE_Set_Scan_Enable_Command, 96 HCI_LE_Set_Scan_Parameters_Command, 97 HCI_LE_Set_Scan_Response_Data_Command, 98 HCI_PIN_Code_Request_Reply_Command, 99 HCI_PIN_Code_Request_Negative_Reply_Command, 100 HCI_Read_BD_ADDR_Command, 101 HCI_Read_RSSI_Command, 102 HCI_Reject_Connection_Request_Command, 103 HCI_Remote_Name_Request_Command, 104 HCI_Switch_Role_Command, 105 HCI_Set_Connection_Encryption_Command, 106 HCI_StatusError, 107 HCI_User_Confirmation_Request_Negative_Reply_Command, 108 HCI_User_Confirmation_Request_Reply_Command, 109 HCI_User_Passkey_Request_Negative_Reply_Command, 110 HCI_User_Passkey_Request_Reply_Command, 111 HCI_Write_Class_Of_Device_Command, 112 HCI_Write_Extended_Inquiry_Response_Command, 113 HCI_Write_Inquiry_Mode_Command, 114 HCI_Write_LE_Host_Support_Command, 115 HCI_Write_Local_Name_Command, 116 HCI_Write_Scan_Enable_Command, 117 HCI_Write_Secure_Connections_Host_Support_Command, 118 HCI_Write_Simple_Pairing_Mode_Command, 119 OwnAddressType, 120 phy_list_to_bits, 121) 122from .host import Host 123from .gap import GenericAccessService 124from .core import ( 125 BT_BR_EDR_TRANSPORT, 126 BT_CENTRAL_ROLE, 127 BT_LE_TRANSPORT, 128 BT_PERIPHERAL_ROLE, 129 AdvertisingData, 130 CommandTimeoutError, 131 ConnectionPHY, 132 InvalidStateError, 133) 134from .utils import ( 135 AsyncRunner, 136 CompositeEventEmitter, 137 setup_event_forwarding, 138 composite_listener, 139) 140from .keys import ( 141 KeyStore, 142 PairingKeys, 143) 144from . import gatt_client 145from . import gatt_server 146from . import smp 147from . import sdp 148from . import l2cap 149from . import core 150 151 152# ----------------------------------------------------------------------------- 153# Logging 154# ----------------------------------------------------------------------------- 155logger = logging.getLogger(__name__) 156 157# ----------------------------------------------------------------------------- 158# Constants 159# ----------------------------------------------------------------------------- 160# fmt: off 161# pylint: disable=line-too-long 162 163DEVICE_MIN_SCAN_INTERVAL = 25 164DEVICE_MAX_SCAN_INTERVAL = 10240 165DEVICE_MIN_SCAN_WINDOW = 25 166DEVICE_MAX_SCAN_WINDOW = 10240 167DEVICE_MIN_LE_RSSI = -127 168DEVICE_MAX_LE_RSSI = 20 169 170DEVICE_DEFAULT_ADDRESS = '00:00:00:00:00:00' 171DEVICE_DEFAULT_ADVERTISING_INTERVAL = 1000 # ms 172DEVICE_DEFAULT_ADVERTISING_DATA = '' 173DEVICE_DEFAULT_NAME = 'Bumble' 174DEVICE_DEFAULT_INQUIRY_LENGTH = 8 # 10.24 seconds 175DEVICE_DEFAULT_CLASS_OF_DEVICE = 0 176DEVICE_DEFAULT_SCAN_RESPONSE_DATA = b'' 177DEVICE_DEFAULT_DATA_LENGTH = (27, 328, 27, 328) 178DEVICE_DEFAULT_SCAN_INTERVAL = 60 # ms 179DEVICE_DEFAULT_SCAN_WINDOW = 60 # ms 180DEVICE_DEFAULT_CONNECT_TIMEOUT = None # No timeout 181DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL = 60 # ms 182DEVICE_DEFAULT_CONNECT_SCAN_WINDOW = 60 # ms 183DEVICE_DEFAULT_CONNECTION_INTERVAL_MIN = 15 # ms 184DEVICE_DEFAULT_CONNECTION_INTERVAL_MAX = 30 # ms 185DEVICE_DEFAULT_CONNECTION_MAX_LATENCY = 0 186DEVICE_DEFAULT_CONNECTION_SUPERVISION_TIMEOUT = 720 # ms 187DEVICE_DEFAULT_CONNECTION_MIN_CE_LENGTH = 0 # ms 188DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH = 0 # ms 189DEVICE_DEFAULT_L2CAP_COC_MTU = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MTU 190DEVICE_DEFAULT_L2CAP_COC_MPS = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_MPS 191DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS = l2cap.L2CAP_LE_CREDIT_BASED_CONNECTION_DEFAULT_INITIAL_CREDITS 192 193# fmt: on 194# pylint: enable=line-too-long 195 196 197# ----------------------------------------------------------------------------- 198# Classes 199# ----------------------------------------------------------------------------- 200 201# ----------------------------------------------------------------------------- 202class Advertisement: 203 address: Address 204 205 TX_POWER_NOT_AVAILABLE = ( 206 HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE 207 ) 208 RSSI_NOT_AVAILABLE = HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE 209 210 @classmethod 211 def from_advertising_report(cls, report): 212 if isinstance(report, HCI_LE_Advertising_Report_Event.Report): 213 return LegacyAdvertisement.from_advertising_report(report) 214 215 if isinstance(report, HCI_LE_Extended_Advertising_Report_Event.Report): 216 return ExtendedAdvertisement.from_advertising_report(report) 217 218 return None 219 220 # pylint: disable=line-too-long 221 def __init__( 222 self, 223 address, 224 rssi=HCI_LE_Extended_Advertising_Report_Event.RSSI_NOT_AVAILABLE, 225 is_legacy=False, 226 is_anonymous=False, 227 is_connectable=False, 228 is_directed=False, 229 is_scannable=False, 230 is_scan_response=False, 231 is_complete=True, 232 is_truncated=False, 233 primary_phy=0, 234 secondary_phy=0, 235 tx_power=HCI_LE_Extended_Advertising_Report_Event.TX_POWER_INFORMATION_NOT_AVAILABLE, 236 sid=0, 237 data=b'', 238 ): 239 self.address = address 240 self.rssi = rssi 241 self.is_legacy = is_legacy 242 self.is_anonymous = is_anonymous 243 self.is_connectable = is_connectable 244 self.is_directed = is_directed 245 self.is_scannable = is_scannable 246 self.is_scan_response = is_scan_response 247 self.is_complete = is_complete 248 self.is_truncated = is_truncated 249 self.primary_phy = primary_phy 250 self.secondary_phy = secondary_phy 251 self.tx_power = tx_power 252 self.sid = sid 253 self.data = AdvertisingData.from_bytes(data) 254 255 256# ----------------------------------------------------------------------------- 257class LegacyAdvertisement(Advertisement): 258 @classmethod 259 def from_advertising_report(cls, report): 260 return cls( 261 address=report.address, 262 rssi=report.rssi, 263 is_legacy=True, 264 is_connectable=report.event_type 265 in ( 266 HCI_LE_Advertising_Report_Event.ADV_IND, 267 HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND, 268 ), 269 is_directed=report.event_type 270 == HCI_LE_Advertising_Report_Event.ADV_DIRECT_IND, 271 is_scannable=report.event_type 272 in ( 273 HCI_LE_Advertising_Report_Event.ADV_IND, 274 HCI_LE_Advertising_Report_Event.ADV_SCAN_IND, 275 ), 276 is_scan_response=report.event_type 277 == HCI_LE_Advertising_Report_Event.SCAN_RSP, 278 data=report.data, 279 ) 280 281 282# ----------------------------------------------------------------------------- 283class ExtendedAdvertisement(Advertisement): 284 @classmethod 285 def from_advertising_report(cls, report): 286 # fmt: off 287 # pylint: disable=line-too-long 288 return cls( 289 address = report.address, 290 rssi = report.rssi, 291 is_legacy = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.LEGACY_ADVERTISING_PDU_USED) != 0, 292 is_anonymous = report.address.address_type == HCI_LE_Extended_Advertising_Report_Event.ANONYMOUS_ADDRESS_TYPE, 293 is_connectable = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.CONNECTABLE_ADVERTISING) != 0, 294 is_directed = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.DIRECTED_ADVERTISING) != 0, 295 is_scannable = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.SCANNABLE_ADVERTISING) != 0, 296 is_scan_response = report.event_type & (1 << HCI_LE_Extended_Advertising_Report_Event.SCAN_RESPONSE) != 0, 297 is_complete = (report.event_type >> 5 & 3) == HCI_LE_Extended_Advertising_Report_Event.DATA_COMPLETE, 298 is_truncated = (report.event_type >> 5 & 3) == HCI_LE_Extended_Advertising_Report_Event.DATA_INCOMPLETE_TRUNCATED_NO_MORE_TO_COME, 299 primary_phy = report.primary_phy, 300 secondary_phy = report.secondary_phy, 301 tx_power = report.tx_power, 302 sid = report.advertising_sid, 303 data = report.data 304 ) 305 # fmt: on 306 307 308# ----------------------------------------------------------------------------- 309class AdvertisementDataAccumulator: 310 def __init__(self, passive=False): 311 self.passive = passive 312 self.last_advertisement = None 313 self.last_data = b'' 314 315 def update(self, report): 316 advertisement = Advertisement.from_advertising_report(report) 317 if advertisement is None: 318 return None 319 320 result = None 321 322 if advertisement.is_scan_response: 323 if ( 324 self.last_advertisement is not None 325 and not self.last_advertisement.is_scan_response 326 ): 327 # This is the response to a scannable advertisement 328 result = Advertisement.from_advertising_report(report) 329 result.is_connectable = self.last_advertisement.is_connectable 330 result.is_scannable = True 331 result.data = AdvertisingData.from_bytes(self.last_data + report.data) 332 self.last_data = b'' 333 else: 334 if ( 335 self.passive 336 or (not advertisement.is_scannable) 337 or ( 338 self.last_advertisement is not None 339 and not self.last_advertisement.is_scan_response 340 ) 341 ): 342 # Don't wait for a scan response 343 result = Advertisement.from_advertising_report(report) 344 345 self.last_data = report.data 346 347 self.last_advertisement = advertisement 348 349 return result 350 351 352# ----------------------------------------------------------------------------- 353class AdvertisingType(IntEnum): 354 # fmt: off 355 # pylint: disable=line-too-long 356 UNDIRECTED_CONNECTABLE_SCANNABLE = 0x00 # Undirected, connectable, scannable 357 DIRECTED_CONNECTABLE_HIGH_DUTY = 0x01 # Directed, connectable, non-scannable 358 UNDIRECTED_SCANNABLE = 0x02 # Undirected, non-connectable, scannable 359 UNDIRECTED = 0x03 # Undirected, non-connectable, non-scannable 360 DIRECTED_CONNECTABLE_LOW_DUTY = 0x04 # Directed, connectable, non-scannable 361 # fmt: on 362 363 @property 364 def has_data(self): 365 return self in ( 366 AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, 367 AdvertisingType.UNDIRECTED_SCANNABLE, 368 AdvertisingType.UNDIRECTED, 369 ) 370 371 @property 372 def is_connectable(self): 373 return self in ( 374 AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, 375 AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY, 376 AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY, 377 ) 378 379 @property 380 def is_scannable(self): 381 return self in ( 382 AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, 383 AdvertisingType.UNDIRECTED_SCANNABLE, 384 ) 385 386 @property 387 def is_directed(self): 388 return self in ( 389 AdvertisingType.DIRECTED_CONNECTABLE_HIGH_DUTY, 390 AdvertisingType.DIRECTED_CONNECTABLE_LOW_DUTY, 391 ) 392 393 394# ----------------------------------------------------------------------------- 395class LePhyOptions: 396 # Coded PHY preference 397 ANY_CODED_PHY = 0 398 PREFER_S_2_CODED_PHY = 1 399 PREFER_S_8_CODED_PHY = 2 400 401 def __init__(self, coded_phy_preference=0): 402 self.coded_phy_preference = coded_phy_preference 403 404 def __int__(self): 405 return self.coded_phy_preference & 3 406 407 408# ----------------------------------------------------------------------------- 409class Peer: 410 def __init__(self, connection): 411 self.connection = connection 412 413 # Create a GATT client for the connection 414 self.gatt_client = gatt_client.Client(connection) 415 connection.gatt_client = self.gatt_client 416 417 @property 418 def services(self): 419 return self.gatt_client.services 420 421 async def request_mtu(self, mtu): 422 mtu = await self.gatt_client.request_mtu(mtu) 423 self.connection.emit('connection_att_mtu_update') 424 return mtu 425 426 async def discover_service(self, uuid): 427 return await self.gatt_client.discover_service(uuid) 428 429 async def discover_services(self, uuids=()): 430 return await self.gatt_client.discover_services(uuids) 431 432 async def discover_included_services(self, service): 433 return await self.gatt_client.discover_included_services(service) 434 435 async def discover_characteristics(self, uuids=(), service=None): 436 return await self.gatt_client.discover_characteristics( 437 uuids=uuids, service=service 438 ) 439 440 async def discover_descriptors( 441 self, characteristic=None, start_handle=None, end_handle=None 442 ): 443 return await self.gatt_client.discover_descriptors( 444 characteristic, start_handle, end_handle 445 ) 446 447 async def discover_attributes(self): 448 return await self.gatt_client.discover_attributes() 449 450 async def subscribe(self, characteristic, subscriber=None, prefer_notify=True): 451 return await self.gatt_client.subscribe( 452 characteristic, subscriber, prefer_notify 453 ) 454 455 async def unsubscribe(self, characteristic, subscriber=None): 456 return await self.gatt_client.unsubscribe(characteristic, subscriber) 457 458 async def read_value(self, attribute): 459 return await self.gatt_client.read_value(attribute) 460 461 async def write_value(self, attribute, value, with_response=False): 462 return await self.gatt_client.write_value(attribute, value, with_response) 463 464 async def read_characteristics_by_uuid(self, uuid, service=None): 465 return await self.gatt_client.read_characteristics_by_uuid(uuid, service) 466 467 def get_services_by_uuid(self, uuid): 468 return self.gatt_client.get_services_by_uuid(uuid) 469 470 def get_characteristics_by_uuid(self, uuid, service=None): 471 return self.gatt_client.get_characteristics_by_uuid(uuid, service) 472 473 def create_service_proxy(self, proxy_class): 474 return proxy_class.from_client(self.gatt_client) 475 476 async def discover_service_and_create_proxy(self, proxy_class): 477 # Discover the first matching service and its characteristics 478 services = await self.discover_service(proxy_class.SERVICE_CLASS.UUID) 479 if services: 480 service = services[0] 481 await service.discover_characteristics() 482 return self.create_service_proxy(proxy_class) 483 484 async def sustain(self, timeout=None): 485 await self.connection.sustain(timeout) 486 487 # [Classic only] 488 async def request_name(self): 489 return await self.connection.request_remote_name() 490 491 async def __aenter__(self): 492 await self.discover_services() 493 for service in self.services: 494 await service.discover_characteristics() 495 496 return self 497 498 async def __aexit__(self, exc_type, exc_value, traceback): 499 pass 500 501 def __str__(self): 502 return f'{self.connection.peer_address} as {self.connection.role_name}' 503 504 505# ----------------------------------------------------------------------------- 506@dataclass 507class ConnectionParametersPreferences: 508 default: ClassVar[ConnectionParametersPreferences] 509 connection_interval_min: int = DEVICE_DEFAULT_CONNECTION_INTERVAL_MIN 510 connection_interval_max: int = DEVICE_DEFAULT_CONNECTION_INTERVAL_MAX 511 max_latency: int = DEVICE_DEFAULT_CONNECTION_MAX_LATENCY 512 supervision_timeout: int = DEVICE_DEFAULT_CONNECTION_SUPERVISION_TIMEOUT 513 min_ce_length: int = DEVICE_DEFAULT_CONNECTION_MIN_CE_LENGTH 514 max_ce_length: int = DEVICE_DEFAULT_CONNECTION_MAX_CE_LENGTH 515 516 517ConnectionParametersPreferences.default = ConnectionParametersPreferences() 518 519 520# ----------------------------------------------------------------------------- 521class Connection(CompositeEventEmitter): 522 device: Device 523 handle: int 524 transport: int 525 self_address: Address 526 peer_address: Address 527 role: int 528 encryption: int 529 authenticated: bool 530 sc: bool 531 link_key_type: int 532 533 @composite_listener 534 class Listener: 535 def on_disconnection(self, reason): 536 pass 537 538 def on_connection_parameters_update(self): 539 pass 540 541 def on_connection_parameters_update_failure(self, error): 542 pass 543 544 def on_connection_data_length_change(self): 545 pass 546 547 def on_connection_phy_update(self): 548 pass 549 550 def on_connection_phy_update_failure(self, error): 551 pass 552 553 def on_connection_att_mtu_update(self): 554 pass 555 556 def on_connection_encryption_change(self): 557 pass 558 559 def on_connection_encryption_key_refresh(self): 560 pass 561 562 def __init__( 563 self, 564 device, 565 handle, 566 transport, 567 self_address, 568 peer_address, 569 peer_resolvable_address, 570 role, 571 parameters, 572 phy, 573 ): 574 super().__init__() 575 self.device = device 576 self.handle = handle 577 self.transport = transport 578 self.self_address = self_address 579 self.peer_address = peer_address 580 self.peer_resolvable_address = peer_resolvable_address 581 self.peer_name = None # Classic only 582 self.role = role 583 self.parameters = parameters 584 self.encryption = 0 585 self.authenticated = False 586 self.sc = False 587 self.link_key_type = None 588 self.phy = phy 589 self.att_mtu = ATT_DEFAULT_MTU 590 self.data_length = DEVICE_DEFAULT_DATA_LENGTH 591 self.gatt_client = None # Per-connection client 592 self.gatt_server = ( 593 device.gatt_server 594 ) # By default, use the device's shared server 595 596 # [Classic only] 597 @classmethod 598 def incomplete(cls, device, peer_address): 599 """ 600 Instantiate an incomplete connection (ie. one waiting for a HCI Connection 601 Complete event). 602 Once received it shall be completed using the `.complete` method. 603 """ 604 return cls( 605 device, 606 None, 607 BT_BR_EDR_TRANSPORT, 608 device.public_address, 609 peer_address, 610 None, 611 None, 612 None, 613 None, 614 ) 615 616 # [Classic only] 617 def complete(self, handle, peer_resolvable_address, role, parameters): 618 """ 619 Finish an incomplete connection upon completion. 620 """ 621 assert self.handle is None 622 assert self.transport == BT_BR_EDR_TRANSPORT 623 self.handle = handle 624 self.peer_resolvable_address = peer_resolvable_address 625 # Quirk: role might be known before complete 626 if self.role is None: 627 self.role = role 628 self.parameters = parameters 629 630 @property 631 def role_name(self): 632 return 'CENTRAL' if self.role == BT_CENTRAL_ROLE else 'PERIPHERAL' 633 634 @property 635 def is_encrypted(self): 636 return self.encryption != 0 637 638 @property 639 def is_incomplete(self) -> bool: 640 return self.handle == None 641 642 def send_l2cap_pdu(self, cid, pdu): 643 self.device.send_l2cap_pdu(self.handle, cid, pdu) 644 645 def create_l2cap_connector(self, psm): 646 return self.device.create_l2cap_connector(self, psm) 647 648 async def open_l2cap_channel( 649 self, 650 psm, 651 max_credits=DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS, 652 mtu=DEVICE_DEFAULT_L2CAP_COC_MTU, 653 mps=DEVICE_DEFAULT_L2CAP_COC_MPS, 654 ): 655 return await self.device.open_l2cap_channel(self, psm, max_credits, mtu, mps) 656 657 async def disconnect( 658 self, reason: int = HCI_REMOTE_USER_TERMINATED_CONNECTION_ERROR 659 ) -> None: 660 await self.device.disconnect(self, reason) 661 662 async def pair(self) -> None: 663 return await self.device.pair(self) 664 665 def request_pairing(self) -> None: 666 return self.device.request_pairing(self) 667 668 # [Classic only] 669 async def authenticate(self) -> None: 670 return await self.device.authenticate(self) 671 672 async def encrypt(self, enable: bool = True) -> None: 673 return await self.device.encrypt(self, enable) 674 675 async def switch_role(self, role: int) -> None: 676 return await self.device.switch_role(self, role) 677 678 async def sustain(self, timeout=None): 679 """Idles the current task waiting for a disconnect or timeout""" 680 681 abort = asyncio.get_running_loop().create_future() 682 self.on('disconnection', abort.set_result) 683 self.on('disconnection_failure', abort.set_exception) 684 685 try: 686 await asyncio.wait_for(self.device.abort_on('flush', abort), timeout) 687 except asyncio.TimeoutError: 688 pass 689 690 self.remove_listener('disconnection', abort.set_result) 691 self.remove_listener('disconnection_failure', abort.set_exception) 692 693 async def update_parameters( 694 self, 695 connection_interval_min, 696 connection_interval_max, 697 max_latency, 698 supervision_timeout, 699 ): 700 return await self.device.update_connection_parameters( 701 self, 702 connection_interval_min, 703 connection_interval_max, 704 max_latency, 705 supervision_timeout, 706 ) 707 708 async def set_phy(self, tx_phys=None, rx_phys=None, phy_options=None): 709 return await self.device.set_connection_phy(self, tx_phys, rx_phys, phy_options) 710 711 async def get_rssi(self): 712 return await self.device.get_connection_rssi(self) 713 714 async def get_phy(self): 715 return await self.device.get_connection_phy(self) 716 717 # [Classic only] 718 async def request_remote_name(self): 719 return await self.device.request_remote_name(self) 720 721 async def __aenter__(self): 722 return self 723 724 async def __aexit__(self, exc_type, exc_value, traceback): 725 if exc_type is None: 726 try: 727 await self.disconnect() 728 except HCI_StatusError as error: 729 # Invalid parameter means the connection is no longer valid 730 if error.error_code != HCI_INVALID_HCI_COMMAND_PARAMETERS_ERROR: 731 raise 732 733 def __str__(self): 734 return ( 735 f'Connection(handle=0x{self.handle:04X}, ' 736 f'role={self.role_name}, ' 737 f'address={self.peer_address})' 738 ) 739 740 741# ----------------------------------------------------------------------------- 742class DeviceConfiguration: 743 def __init__(self) -> None: 744 # Setup defaults 745 self.name = DEVICE_DEFAULT_NAME 746 self.address = Address(DEVICE_DEFAULT_ADDRESS) 747 self.class_of_device = DEVICE_DEFAULT_CLASS_OF_DEVICE 748 self.scan_response_data = DEVICE_DEFAULT_SCAN_RESPONSE_DATA 749 self.advertising_interval_min = DEVICE_DEFAULT_ADVERTISING_INTERVAL 750 self.advertising_interval_max = DEVICE_DEFAULT_ADVERTISING_INTERVAL 751 self.le_enabled = True 752 # LE host enable 2nd parameter 753 self.le_simultaneous_enabled = True 754 self.classic_enabled = False 755 self.classic_sc_enabled = True 756 self.classic_ssp_enabled = True 757 self.classic_accept_any = True 758 self.connectable = True 759 self.discoverable = True 760 self.advertising_data = bytes( 761 AdvertisingData( 762 [(AdvertisingData.COMPLETE_LOCAL_NAME, bytes(self.name, 'utf-8'))] 763 ) 764 ) 765 self.irk = bytes(16) # This really must be changed for any level of security 766 self.keystore = None 767 self.gatt_services: List[Dict[str, Any]] = [] 768 769 def load_from_dict(self, config: Dict[str, Any]) -> None: 770 # Load simple properties 771 self.name = config.get('name', self.name) 772 if address := config.get('address', None): 773 self.address = Address(address) 774 self.class_of_device = config.get('class_of_device', self.class_of_device) 775 self.advertising_interval_min = config.get( 776 'advertising_interval', self.advertising_interval_min 777 ) 778 self.advertising_interval_max = self.advertising_interval_min 779 self.keystore = config.get('keystore') 780 self.le_enabled = config.get('le_enabled', self.le_enabled) 781 self.le_simultaneous_enabled = config.get( 782 'le_simultaneous_enabled', self.le_simultaneous_enabled 783 ) 784 self.classic_enabled = config.get('classic_enabled', self.classic_enabled) 785 self.classic_sc_enabled = config.get( 786 'classic_sc_enabled', self.classic_sc_enabled 787 ) 788 self.classic_ssp_enabled = config.get( 789 'classic_ssp_enabled', self.classic_ssp_enabled 790 ) 791 self.classic_accept_any = config.get( 792 'classic_accept_any', self.classic_accept_any 793 ) 794 self.connectable = config.get('connectable', self.connectable) 795 self.discoverable = config.get('discoverable', self.discoverable) 796 self.gatt_services = config.get('gatt_services', self.gatt_services) 797 798 # Load or synthesize an IRK 799 irk = config.get('irk') 800 if irk: 801 self.irk = bytes.fromhex(irk) 802 else: 803 # Construct an IRK from the address bytes 804 # NOTE: this is not secure, but will always give the same IRK for the same 805 # address 806 address_bytes = bytes(self.address) 807 self.irk = (address_bytes * 3)[:16] 808 809 # Load advertising data 810 advertising_data = config.get('advertising_data') 811 if advertising_data: 812 self.advertising_data = bytes.fromhex(advertising_data) 813 814 def load_from_file(self, filename): 815 with open(filename, 'r', encoding='utf-8') as file: 816 self.load_from_dict(json.load(file)) 817 818 819# ----------------------------------------------------------------------------- 820# Decorators used with the following Device class 821# (we define them outside of the Device class, because defining decorators 822# within a class requires unnecessarily complicated acrobatics) 823# ----------------------------------------------------------------------------- 824 825 826# Decorator that converts the first argument from a connection handle to a connection 827def with_connection_from_handle(function): 828 @functools.wraps(function) 829 def wrapper(self, connection_handle, *args, **kwargs): 830 if (connection := self.lookup_connection(connection_handle)) is None: 831 raise ValueError(f"no connection for handle: 0x{connection_handle:04x}") 832 return function(self, connection, *args, **kwargs) 833 834 return wrapper 835 836 837# Decorator that converts the first argument from a bluetooth address to a connection 838def with_connection_from_address(function): 839 @functools.wraps(function) 840 def wrapper(self, address, *args, **kwargs): 841 if connection := self.pending_connections.get(address, False): 842 return function(self, connection, *args, **kwargs) 843 for connection in self.connections.values(): 844 if connection.peer_address == address: 845 return function(self, connection, *args, **kwargs) 846 raise ValueError('no connection for address') 847 848 return wrapper 849 850 851# Decorator that tries to convert the first argument from a bluetooth address to a 852# connection 853def try_with_connection_from_address(function): 854 @functools.wraps(function) 855 def wrapper(self, address, *args, **kwargs): 856 if connection := self.pending_connections.get(address, False): 857 return function(self, connection, address, *args, **kwargs) 858 for connection in self.connections.values(): 859 if connection.peer_address == address: 860 return function(self, connection, address, *args, **kwargs) 861 return function(self, None, address, *args, **kwargs) 862 863 return wrapper 864 865 866# Decorator that adds a method to the list of event handlers for host events. 867# This assumes that the method name starts with `on_` 868def host_event_handler(function): 869 device_host_event_handlers.append(function.__name__[3:]) 870 return function 871 872 873# List of host event handlers for the Device class. 874# (we define this list outside the class, because referencing a class in method 875# decorators is not straightforward) 876device_host_event_handlers: list[str] = [] 877 878 879# ----------------------------------------------------------------------------- 880class Device(CompositeEventEmitter): 881 # incomplete list of fields. 882 random_address: Address 883 public_address: Address 884 classic_enabled: bool 885 name: str 886 class_of_device: int 887 gatt_server: gatt_server.Server 888 advertising_data: bytes 889 scan_response_data: bytes 890 connections: Dict[int, Connection] 891 pending_connections: Dict[Address, Connection] 892 classic_pending_accepts: Dict[ 893 Address, List[asyncio.Future[Union[Connection, Tuple[Address, int, int]]]] 894 ] 895 advertisement_accumulators: Dict[Address, AdvertisementDataAccumulator] 896 897 @composite_listener 898 class Listener: 899 def on_advertisement(self, advertisement): 900 pass 901 902 def on_inquiry_result(self, address, class_of_device, data, rssi): 903 pass 904 905 def on_connection(self, connection): 906 pass 907 908 def on_connection_failure(self, error): 909 pass 910 911 def on_connection_request(self, bd_addr, class_of_device, link_type): 912 pass 913 914 def on_characteristic_subscription( 915 self, connection, characteristic, notify_enabled, indicate_enabled 916 ): 917 pass 918 919 @classmethod 920 def with_hci(cls, name, address, hci_source, hci_sink): 921 ''' 922 Create a Device instance with a Host configured to communicate with a controller 923 through an HCI source/sink 924 ''' 925 host = Host(controller_source=hci_source, controller_sink=hci_sink) 926 return cls(name=name, address=address, host=host) 927 928 @classmethod 929 def from_config_file(cls, filename): 930 config = DeviceConfiguration() 931 config.load_from_file(filename) 932 return cls(config=config) 933 934 @classmethod 935 def from_config_file_with_hci(cls, filename, hci_source, hci_sink): 936 config = DeviceConfiguration() 937 config.load_from_file(filename) 938 host = Host(controller_source=hci_source, controller_sink=hci_sink) 939 return cls(config=config, host=host) 940 941 def __init__( 942 self, 943 name: Optional[str] = None, 944 address: Optional[Address] = None, 945 config: Optional[DeviceConfiguration] = None, 946 host: Optional[Host] = None, 947 generic_access_service: bool = True, 948 ) -> None: 949 super().__init__() 950 951 self._host = None 952 self.powered_on = False 953 self.advertising = False 954 self.advertising_type = None 955 self.auto_restart_inquiry = True 956 self.auto_restart_advertising = False 957 self.command_timeout = 10 # seconds 958 self.gatt_server = gatt_server.Server(self) 959 self.sdp_server = sdp.Server(self) 960 self.l2cap_channel_manager = l2cap.ChannelManager( 961 [l2cap.L2CAP_Information_Request.EXTENDED_FEATURE_FIXED_CHANNELS] 962 ) 963 self.advertisement_accumulators = {} # Accumulators, by address 964 self.scanning = False 965 self.scanning_is_passive = False 966 self.discovering = False 967 self.le_connecting = False 968 self.disconnecting = False 969 self.connections = {} # Connections, by connection handle 970 self.pending_connections = {} # Connections, by BD address (BR/EDR only) 971 self.classic_enabled = False 972 self.inquiry_response = None 973 self.address_resolver = None 974 self.classic_pending_accepts = { 975 Address.ANY: [] 976 } # Futures, by BD address OR [Futures] for Address.ANY 977 978 # Own address type cache 979 self.advertising_own_address_type = None 980 self.connect_own_address_type = None 981 982 # Use the initial config or a default 983 self.public_address = Address('00:00:00:00:00:00') 984 if config is None: 985 config = DeviceConfiguration() 986 self.name = config.name 987 self.random_address = config.address 988 self.class_of_device = config.class_of_device 989 self.scan_response_data = config.scan_response_data 990 self.advertising_data = config.advertising_data 991 self.advertising_interval_min = config.advertising_interval_min 992 self.advertising_interval_max = config.advertising_interval_max 993 self.keystore = KeyStore.create_for_device(config) 994 self.irk = config.irk 995 self.le_enabled = config.le_enabled 996 self.classic_enabled = config.classic_enabled 997 self.le_simultaneous_enabled = config.le_simultaneous_enabled 998 self.classic_ssp_enabled = config.classic_ssp_enabled 999 self.classic_sc_enabled = config.classic_sc_enabled 1000 self.discoverable = config.discoverable 1001 self.connectable = config.connectable 1002 self.classic_accept_any = config.classic_accept_any 1003 1004 for service in config.gatt_services: 1005 characteristics = [] 1006 for characteristic in service.get("characteristics", []): 1007 descriptors = [] 1008 for descriptor in characteristic.get("descriptors", []): 1009 # Leave this check until 5/25/2023 1010 if descriptor.get("permission", False): 1011 raise Exception( 1012 "Error parsing Device Config's GATT Services. The key 'permission' must be renamed to 'permissions'" 1013 ) 1014 new_descriptor = Descriptor( 1015 attribute_type=descriptor["descriptor_type"], 1016 permissions=descriptor["permissions"], 1017 ) 1018 descriptors.append(new_descriptor) 1019 new_characteristic = Characteristic( 1020 uuid=characteristic["uuid"], 1021 properties=characteristic["properties"], 1022 permissions=characteristic["permissions"], 1023 descriptors=descriptors, 1024 ) 1025 characteristics.append(new_characteristic) 1026 new_service = Service(uuid=service["uuid"], characteristics=characteristics) 1027 self.gatt_server.add_service(new_service) 1028 1029 # If a name is passed, override the name from the config 1030 if name: 1031 self.name = name 1032 1033 # If an address is passed, override the address from the config 1034 if address: 1035 if isinstance(address, str): 1036 address = Address(address) 1037 self.random_address = address 1038 1039 # Setup SMP 1040 self.smp_manager = smp.Manager(self) 1041 self.l2cap_channel_manager.register_fixed_channel(smp.SMP_CID, self.on_smp_pdu) 1042 self.l2cap_channel_manager.register_fixed_channel( 1043 smp.SMP_BR_CID, self.on_smp_pdu 1044 ) 1045 1046 # Register the SDP server with the L2CAP Channel Manager 1047 self.sdp_server.register(self.l2cap_channel_manager) 1048 1049 self.add_default_services(generic_access_service) 1050 self.l2cap_channel_manager.register_fixed_channel(ATT_CID, self.on_gatt_pdu) 1051 1052 # Forward some events 1053 setup_event_forwarding(self.gatt_server, self, 'characteristic_subscription') 1054 1055 # Set the initial host 1056 if host: 1057 self.host = host 1058 1059 @property 1060 def host(self) -> Host: 1061 assert self._host 1062 return self._host 1063 1064 @host.setter 1065 def host(self, host): 1066 # Unsubscribe from events from the current host 1067 if self._host: 1068 for event_name in device_host_event_handlers: 1069 self._host.remove_listener( 1070 event_name, getattr(self, f'on_{event_name}') 1071 ) 1072 1073 # Subscribe to events from the new host 1074 if host: 1075 for event_name in device_host_event_handlers: 1076 host.on(event_name, getattr(self, f'on_{event_name}')) 1077 1078 # Update the references to the new host 1079 self._host = host 1080 self.l2cap_channel_manager.host = host 1081 1082 # Set providers for the new host 1083 if host: 1084 host.long_term_key_provider = self.get_long_term_key 1085 host.link_key_provider = self.get_link_key 1086 1087 @property 1088 def sdp_service_records(self): 1089 return self.sdp_server.service_records 1090 1091 @sdp_service_records.setter 1092 def sdp_service_records(self, service_records): 1093 self.sdp_server.service_records = service_records 1094 1095 def lookup_connection(self, connection_handle: int) -> Optional[Connection]: 1096 if connection := self.connections.get(connection_handle): 1097 return connection 1098 1099 return None 1100 1101 def find_connection_by_bd_addr( 1102 self, 1103 bd_addr: Address, 1104 transport: Optional[int] = None, 1105 check_address_type: bool = False, 1106 ) -> Optional[Connection]: 1107 for connection in self.connections.values(): 1108 if connection.peer_address.to_bytes() == bd_addr.to_bytes(): 1109 if ( 1110 check_address_type 1111 and connection.peer_address.address_type != bd_addr.address_type 1112 ): 1113 continue 1114 if transport is None or connection.transport == transport: 1115 return connection 1116 1117 return None 1118 1119 def create_l2cap_connector(self, connection, psm): 1120 return lambda: self.l2cap_channel_manager.connect(connection, psm) 1121 1122 def create_l2cap_registrar(self, psm): 1123 return lambda handler: self.register_l2cap_server(psm, handler) 1124 1125 def register_l2cap_server(self, psm, server): 1126 self.l2cap_channel_manager.register_server(psm, server) 1127 1128 def register_l2cap_channel_server( 1129 self, 1130 psm, 1131 server, 1132 max_credits=DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS, 1133 mtu=DEVICE_DEFAULT_L2CAP_COC_MTU, 1134 mps=DEVICE_DEFAULT_L2CAP_COC_MPS, 1135 ): 1136 return self.l2cap_channel_manager.register_le_coc_server( 1137 psm, server, max_credits, mtu, mps 1138 ) 1139 1140 async def open_l2cap_channel( 1141 self, 1142 connection, 1143 psm, 1144 max_credits=DEVICE_DEFAULT_L2CAP_COC_MAX_CREDITS, 1145 mtu=DEVICE_DEFAULT_L2CAP_COC_MTU, 1146 mps=DEVICE_DEFAULT_L2CAP_COC_MPS, 1147 ): 1148 return await self.l2cap_channel_manager.open_le_coc( 1149 connection, psm, max_credits, mtu, mps 1150 ) 1151 1152 def send_l2cap_pdu(self, connection_handle, cid, pdu): 1153 self.host.send_l2cap_pdu(connection_handle, cid, pdu) 1154 1155 async def send_command(self, command, check_result=False): 1156 try: 1157 return await asyncio.wait_for( 1158 self.host.send_command(command, check_result), self.command_timeout 1159 ) 1160 except asyncio.TimeoutError as error: 1161 logger.warning('!!! Command timed out') 1162 raise CommandTimeoutError() from error 1163 1164 async def power_on(self) -> None: 1165 # Reset the controller 1166 await self.host.reset() 1167 1168 response = await self.send_command(HCI_Read_BD_ADDR_Command()) # type: ignore[call-arg] 1169 if response.return_parameters.status == HCI_SUCCESS: 1170 logger.debug( 1171 color(f'BD_ADDR: {response.return_parameters.bd_addr}', 'yellow') 1172 ) 1173 self.public_address = response.return_parameters.bd_addr 1174 1175 if self.host.supports_command(HCI_WRITE_LE_HOST_SUPPORT_COMMAND): 1176 await self.send_command( 1177 HCI_Write_LE_Host_Support_Command( 1178 le_supported_host=int(self.le_enabled), 1179 simultaneous_le_host=int(self.le_simultaneous_enabled), 1180 ) # type: ignore[call-arg] 1181 ) 1182 1183 if self.le_enabled: 1184 # Set the controller address 1185 if self.random_address == Address.ANY_RANDOM: 1186 # Try to use an address generated at random by the controller 1187 if self.host.supports_command(HCI_LE_RAND_COMMAND): 1188 # Get 8 random bytes 1189 response = await self.send_command( 1190 HCI_LE_Rand_Command(), check_result=True # type: ignore[call-arg] 1191 ) 1192 1193 # Ensure the address bytes can be a static random address 1194 address_bytes = response.return_parameters.random_number[ 1195 :5 1196 ] + bytes([response.return_parameters.random_number[5] | 0xC0]) 1197 1198 # Create a static random address from the random bytes 1199 self.random_address = Address(address_bytes) 1200 1201 if self.random_address != Address.ANY_RANDOM: 1202 logger.debug( 1203 color( 1204 f'LE Random Address: {self.random_address}', 1205 'yellow', 1206 ) 1207 ) 1208 await self.send_command( 1209 HCI_LE_Set_Random_Address_Command( 1210 random_address=self.random_address 1211 ), # type: ignore[call-arg] 1212 check_result=True, 1213 ) 1214 1215 # Load the address resolving list 1216 if self.keystore and self.host.supports_command( 1217 HCI_LE_CLEAR_RESOLVING_LIST_COMMAND 1218 ): 1219 await self.send_command(HCI_LE_Clear_Resolving_List_Command()) # type: ignore[call-arg] 1220 1221 resolving_keys = await self.keystore.get_resolving_keys() 1222 for (irk, address) in resolving_keys: 1223 await self.send_command( 1224 HCI_LE_Add_Device_To_Resolving_List_Command( 1225 peer_identity_address_type=address.address_type, 1226 peer_identity_address=address, 1227 peer_irk=irk, 1228 local_irk=self.irk, 1229 ) # type: ignore[call-arg] 1230 ) 1231 1232 # Enable address resolution 1233 # await self.send_command( 1234 # HCI_LE_Set_Address_Resolution_Enable_Command( 1235 # address_resolution_enable=1) 1236 # ) 1237 # ) 1238 1239 # Create a host-side address resolver 1240 self.address_resolver = smp.AddressResolver(resolving_keys) 1241 1242 if self.classic_enabled: 1243 await self.send_command( 1244 HCI_Write_Local_Name_Command(local_name=self.name.encode('utf8')) # type: ignore[call-arg] 1245 ) 1246 await self.send_command( 1247 HCI_Write_Class_Of_Device_Command(class_of_device=self.class_of_device) # type: ignore[call-arg] 1248 ) 1249 await self.send_command( 1250 HCI_Write_Simple_Pairing_Mode_Command( 1251 simple_pairing_mode=int(self.classic_ssp_enabled) 1252 ) # type: ignore[call-arg] 1253 ) 1254 await self.send_command( 1255 HCI_Write_Secure_Connections_Host_Support_Command( 1256 secure_connections_host_support=int(self.classic_sc_enabled) 1257 ) # type: ignore[call-arg] 1258 ) 1259 await self.set_connectable(self.connectable) 1260 await self.set_discoverable(self.discoverable) 1261 1262 # Done 1263 self.powered_on = True 1264 1265 async def power_off(self) -> None: 1266 if self.powered_on: 1267 await self.host.flush() 1268 self.powered_on = False 1269 1270 def supports_le_feature(self, feature): 1271 return self.host.supports_le_feature(feature) 1272 1273 def supports_le_phy(self, phy): 1274 if phy == HCI_LE_1M_PHY: 1275 return True 1276 1277 feature_map = { 1278 HCI_LE_2M_PHY: HCI_LE_2M_PHY_LE_SUPPORTED_FEATURE, 1279 HCI_LE_CODED_PHY: HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE, 1280 } 1281 if phy not in feature_map: 1282 raise ValueError('invalid PHY') 1283 1284 return self.host.supports_le_feature(feature_map[phy]) 1285 1286 async def start_advertising( 1287 self, 1288 advertising_type: AdvertisingType = AdvertisingType.UNDIRECTED_CONNECTABLE_SCANNABLE, 1289 target: Optional[Address] = None, 1290 own_address_type: int = OwnAddressType.RANDOM, 1291 auto_restart: bool = False, 1292 ) -> None: 1293 # If we're advertising, stop first 1294 if self.advertising: 1295 await self.stop_advertising() 1296 1297 # Set/update the advertising data if the advertising type allows it 1298 if advertising_type.has_data: 1299 await self.send_command( 1300 HCI_LE_Set_Advertising_Data_Command( 1301 advertising_data=self.advertising_data 1302 ), # type: ignore[call-arg] 1303 check_result=True, 1304 ) 1305 1306 # Set/update the scan response data if the advertising is scannable 1307 if advertising_type.is_scannable: 1308 await self.send_command( 1309 HCI_LE_Set_Scan_Response_Data_Command( 1310 scan_response_data=self.scan_response_data 1311 ), # type: ignore[call-arg] 1312 check_result=True, 1313 ) 1314 1315 # Decide what peer address to use 1316 if advertising_type.is_directed: 1317 if target is None: 1318 raise ValueError('directed advertising requires a target address') 1319 1320 peer_address = target 1321 peer_address_type = target.address_type 1322 else: 1323 peer_address = Address('00:00:00:00:00:00') 1324 peer_address_type = Address.PUBLIC_DEVICE_ADDRESS 1325 1326 # Set the advertising parameters 1327 await self.send_command( 1328 HCI_LE_Set_Advertising_Parameters_Command( 1329 advertising_interval_min=self.advertising_interval_min, 1330 advertising_interval_max=self.advertising_interval_max, 1331 advertising_type=int(advertising_type), 1332 own_address_type=own_address_type, 1333 peer_address_type=peer_address_type, 1334 peer_address=peer_address, 1335 advertising_channel_map=7, 1336 advertising_filter_policy=0, 1337 ), # type: ignore[call-arg] 1338 check_result=True, 1339 ) 1340 1341 # Enable advertising 1342 await self.send_command( 1343 HCI_LE_Set_Advertising_Enable_Command(advertising_enable=1), # type: ignore[call-arg] 1344 check_result=True, 1345 ) 1346 1347 self.advertising_own_address_type = own_address_type 1348 self.auto_restart_advertising = auto_restart 1349 self.advertising_type = advertising_type 1350 self.advertising = True 1351 1352 async def stop_advertising(self) -> None: 1353 # Disable advertising 1354 if self.advertising: 1355 await self.send_command( 1356 HCI_LE_Set_Advertising_Enable_Command(advertising_enable=0), # type: ignore[call-arg] 1357 check_result=True, 1358 ) 1359 1360 self.advertising_own_address_type = None 1361 self.advertising = False 1362 self.advertising_type = None 1363 self.auto_restart_advertising = False 1364 1365 @property 1366 def is_advertising(self): 1367 return self.advertising 1368 1369 async def start_scanning( 1370 self, 1371 legacy: bool = False, 1372 active: bool = True, 1373 scan_interval: int = DEVICE_DEFAULT_SCAN_INTERVAL, # Scan interval in ms 1374 scan_window: int = DEVICE_DEFAULT_SCAN_WINDOW, # Scan window in ms 1375 own_address_type: int = OwnAddressType.RANDOM, 1376 filter_duplicates: bool = False, 1377 scanning_phys: Tuple[int, int] = (HCI_LE_1M_PHY, HCI_LE_CODED_PHY), 1378 ) -> None: 1379 # Check that the arguments are legal 1380 if scan_interval < scan_window: 1381 raise ValueError('scan_interval must be >= scan_window') 1382 if ( 1383 scan_interval < DEVICE_MIN_SCAN_INTERVAL 1384 or scan_interval > DEVICE_MAX_SCAN_INTERVAL 1385 ): 1386 raise ValueError('scan_interval out of range') 1387 if scan_window < DEVICE_MIN_SCAN_WINDOW or scan_window > DEVICE_MAX_SCAN_WINDOW: 1388 raise ValueError('scan_interval out of range') 1389 1390 # Reset the accumulators 1391 self.advertisement_accumulators = {} 1392 1393 # Enable scanning 1394 if not legacy and self.supports_le_feature( 1395 HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE 1396 ): 1397 # Set the scanning parameters 1398 scan_type = ( 1399 HCI_LE_Set_Extended_Scan_Parameters_Command.ACTIVE_SCANNING 1400 if active 1401 else HCI_LE_Set_Extended_Scan_Parameters_Command.PASSIVE_SCANNING 1402 ) 1403 scanning_filter_policy = ( 1404 HCI_LE_Set_Extended_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY 1405 ) # TODO: support other types 1406 1407 scanning_phy_count = 0 1408 scanning_phys_bits = 0 1409 if HCI_LE_1M_PHY in scanning_phys: 1410 scanning_phys_bits |= 1 << HCI_LE_1M_PHY_BIT 1411 scanning_phy_count += 1 1412 if HCI_LE_CODED_PHY in scanning_phys: 1413 if self.supports_le_feature(HCI_LE_CODED_PHY_LE_SUPPORTED_FEATURE): 1414 scanning_phys_bits |= 1 << HCI_LE_CODED_PHY_BIT 1415 scanning_phy_count += 1 1416 1417 if scanning_phy_count == 0: 1418 raise ValueError('at least one scanning PHY must be enabled') 1419 1420 await self.send_command( 1421 HCI_LE_Set_Extended_Scan_Parameters_Command( 1422 own_address_type=own_address_type, 1423 scanning_filter_policy=scanning_filter_policy, 1424 scanning_phys=scanning_phys_bits, 1425 scan_types=[scan_type] * scanning_phy_count, 1426 scan_intervals=[int(scan_window / 0.625)] * scanning_phy_count, 1427 scan_windows=[int(scan_window / 0.625)] * scanning_phy_count, 1428 ), # type: ignore[call-arg] 1429 check_result=True, 1430 ) 1431 1432 # Enable scanning 1433 await self.send_command( 1434 HCI_LE_Set_Extended_Scan_Enable_Command( 1435 enable=1, 1436 filter_duplicates=1 if filter_duplicates else 0, 1437 duration=0, # TODO allow other values 1438 period=0, # TODO allow other values 1439 ), # type: ignore[call-arg] 1440 check_result=True, 1441 ) 1442 else: 1443 # Set the scanning parameters 1444 scan_type = ( 1445 HCI_LE_Set_Scan_Parameters_Command.ACTIVE_SCANNING 1446 if active 1447 else HCI_LE_Set_Scan_Parameters_Command.PASSIVE_SCANNING 1448 ) 1449 await self.send_command( 1450 # pylint: disable=line-too-long 1451 HCI_LE_Set_Scan_Parameters_Command( 1452 le_scan_type=scan_type, 1453 le_scan_interval=int(scan_window / 0.625), 1454 le_scan_window=int(scan_window / 0.625), 1455 own_address_type=own_address_type, 1456 scanning_filter_policy=HCI_LE_Set_Scan_Parameters_Command.BASIC_UNFILTERED_POLICY, 1457 ), # type: ignore[call-arg] 1458 check_result=True, 1459 ) 1460 1461 # Enable scanning 1462 await self.send_command( 1463 HCI_LE_Set_Scan_Enable_Command( 1464 le_scan_enable=1, filter_duplicates=1 if filter_duplicates else 0 1465 ), # type: ignore[call-arg] 1466 check_result=True, 1467 ) 1468 1469 self.scanning_is_passive = not active 1470 self.scanning = True 1471 1472 async def stop_scanning(self) -> None: 1473 # Disable scanning 1474 if self.supports_le_feature(HCI_LE_EXTENDED_ADVERTISING_LE_SUPPORTED_FEATURE): 1475 await self.send_command( 1476 HCI_LE_Set_Extended_Scan_Enable_Command( 1477 enable=0, filter_duplicates=0, duration=0, period=0 1478 ), # type: ignore[call-arg] 1479 check_result=True, 1480 ) 1481 else: 1482 await self.send_command( 1483 HCI_LE_Set_Scan_Enable_Command(le_scan_enable=0, filter_duplicates=0), # type: ignore[call-arg] 1484 check_result=True, 1485 ) 1486 1487 self.scanning = False 1488 1489 @property 1490 def is_scanning(self): 1491 return self.scanning 1492 1493 @host_event_handler 1494 def on_advertising_report(self, report): 1495 if not (accumulator := self.advertisement_accumulators.get(report.address)): 1496 accumulator = AdvertisementDataAccumulator(passive=self.scanning_is_passive) 1497 self.advertisement_accumulators[report.address] = accumulator 1498 if advertisement := accumulator.update(report): 1499 self.emit('advertisement', advertisement) 1500 1501 async def start_discovery(self, auto_restart: bool = True) -> None: 1502 await self.send_command( 1503 HCI_Write_Inquiry_Mode_Command(inquiry_mode=HCI_EXTENDED_INQUIRY_MODE), # type: ignore[call-arg] 1504 check_result=True, 1505 ) 1506 1507 response = await self.send_command( 1508 HCI_Inquiry_Command( 1509 lap=HCI_GENERAL_INQUIRY_LAP, 1510 inquiry_length=DEVICE_DEFAULT_INQUIRY_LENGTH, 1511 num_responses=0, # Unlimited number of responses. 1512 ) # type: ignore[call-arg] 1513 ) 1514 if response.status != HCI_Command_Status_Event.PENDING: 1515 self.discovering = False 1516 raise HCI_StatusError(response) 1517 1518 self.auto_restart_inquiry = auto_restart 1519 self.discovering = True 1520 1521 async def stop_discovery(self) -> None: 1522 if self.discovering: 1523 await self.send_command(HCI_Inquiry_Cancel_Command(), check_result=True) # type: ignore[call-arg] 1524 self.auto_restart_inquiry = True 1525 self.discovering = False 1526 1527 @host_event_handler 1528 def on_inquiry_result(self, address, class_of_device, data, rssi): 1529 self.emit( 1530 'inquiry_result', 1531 address, 1532 class_of_device, 1533 AdvertisingData.from_bytes(data), 1534 rssi, 1535 ) 1536 1537 async def set_scan_enable(self, inquiry_scan_enabled, page_scan_enabled): 1538 if inquiry_scan_enabled and page_scan_enabled: 1539 scan_enable = 0x03 1540 elif page_scan_enabled: 1541 scan_enable = 0x02 1542 elif inquiry_scan_enabled: 1543 scan_enable = 0x01 1544 else: 1545 scan_enable = 0x00 1546 1547 return await self.send_command( 1548 HCI_Write_Scan_Enable_Command(scan_enable=scan_enable) 1549 ) 1550 1551 async def set_discoverable(self, discoverable: bool = True) -> None: 1552 self.discoverable = discoverable 1553 if self.classic_enabled: 1554 # Synthesize an inquiry response if none is set already 1555 if self.inquiry_response is None: 1556 self.inquiry_response = bytes( 1557 AdvertisingData( 1558 [ 1559 ( 1560 AdvertisingData.COMPLETE_LOCAL_NAME, 1561 bytes(self.name, 'utf-8'), 1562 ) 1563 ] 1564 ) 1565 ) 1566 1567 # Update the controller 1568 await self.send_command( 1569 HCI_Write_Extended_Inquiry_Response_Command( 1570 fec_required=0, extended_inquiry_response=self.inquiry_response 1571 ), # type: ignore[call-arg] 1572 check_result=True, 1573 ) 1574 await self.set_scan_enable( 1575 inquiry_scan_enabled=self.discoverable, 1576 page_scan_enabled=self.connectable, 1577 ) 1578 1579 async def set_connectable(self, connectable: bool = True) -> None: 1580 self.connectable = connectable 1581 if self.classic_enabled: 1582 await self.set_scan_enable( 1583 inquiry_scan_enabled=self.discoverable, 1584 page_scan_enabled=self.connectable, 1585 ) 1586 1587 async def connect( 1588 self, 1589 peer_address: Union[Address, str], 1590 transport: int = BT_LE_TRANSPORT, 1591 connection_parameters_preferences: Optional[ 1592 Dict[int, ConnectionParametersPreferences] 1593 ] = None, 1594 own_address_type: int = OwnAddressType.RANDOM, 1595 timeout: Optional[float] = DEVICE_DEFAULT_CONNECT_TIMEOUT, 1596 ) -> Connection: 1597 ''' 1598 Request a connection to a peer. 1599 When transport is BLE, this method cannot be called if there is already a 1600 pending connection. 1601 1602 connection_parameters_preferences: (BLE only, ignored for BR/EDR) 1603 * None: use all PHYs with default parameters 1604 * map: each entry has a PHY as key and a ConnectionParametersPreferences 1605 object as value 1606 1607 own_address_type: (BLE only) 1608 ''' 1609 1610 # Check parameters 1611 if transport not in (BT_LE_TRANSPORT, BT_BR_EDR_TRANSPORT): 1612 raise ValueError('invalid transport') 1613 1614 # Adjust the transport automatically if we need to 1615 if transport == BT_LE_TRANSPORT and not self.le_enabled: 1616 transport = BT_BR_EDR_TRANSPORT 1617 elif transport == BT_BR_EDR_TRANSPORT and not self.classic_enabled: 1618 transport = BT_LE_TRANSPORT 1619 1620 # Check that there isn't already a pending connection 1621 if transport == BT_LE_TRANSPORT and self.is_le_connecting: 1622 raise InvalidStateError('connection already pending') 1623 1624 if isinstance(peer_address, str): 1625 try: 1626 peer_address = Address.from_string_for_transport( 1627 peer_address, transport 1628 ) 1629 except ValueError: 1630 # If the address is not parsable, assume it is a name instead 1631 logger.debug('looking for peer by name') 1632 peer_address = await self.find_peer_by_name( 1633 peer_address, transport 1634 ) # TODO: timeout 1635 else: 1636 # All BR/EDR addresses should be public addresses 1637 if ( 1638 transport == BT_BR_EDR_TRANSPORT 1639 and peer_address.address_type != Address.PUBLIC_DEVICE_ADDRESS 1640 ): 1641 raise ValueError('BR/EDR addresses must be PUBLIC') 1642 1643 assert isinstance(peer_address, Address) 1644 1645 def on_connection(connection): 1646 if transport == BT_LE_TRANSPORT or ( 1647 # match BR/EDR connection event against peer address 1648 connection.transport == transport 1649 and connection.peer_address == peer_address 1650 ): 1651 pending_connection.set_result(connection) 1652 1653 def on_connection_failure(error): 1654 if transport == BT_LE_TRANSPORT or ( 1655 # match BR/EDR connection failure event against peer address 1656 error.transport == transport 1657 and error.peer_address == peer_address 1658 ): 1659 pending_connection.set_exception(error) 1660 1661 # Create a future so that we can wait for the connection's result 1662 pending_connection = asyncio.get_running_loop().create_future() 1663 self.on('connection', on_connection) 1664 self.on('connection_failure', on_connection_failure) 1665 1666 try: 1667 # Tell the controller to connect 1668 if transport == BT_LE_TRANSPORT: 1669 if connection_parameters_preferences is None: 1670 if connection_parameters_preferences is None: 1671 connection_parameters_preferences = { 1672 HCI_LE_1M_PHY: ConnectionParametersPreferences.default, 1673 HCI_LE_2M_PHY: ConnectionParametersPreferences.default, 1674 HCI_LE_CODED_PHY: ConnectionParametersPreferences.default, 1675 } 1676 1677 self.connect_own_address_type = own_address_type 1678 1679 if self.host.supports_command( 1680 HCI_LE_EXTENDED_CREATE_CONNECTION_COMMAND 1681 ): 1682 # Only keep supported PHYs 1683 phys = sorted( 1684 list( 1685 set( 1686 filter( 1687 self.supports_le_phy, 1688 connection_parameters_preferences.keys(), 1689 ) 1690 ) 1691 ) 1692 ) 1693 if not phys: 1694 raise ValueError('at least one supported PHY needed') 1695 1696 phy_count = len(phys) 1697 initiating_phys = phy_list_to_bits(phys) 1698 1699 connection_interval_mins = [ 1700 int( 1701 connection_parameters_preferences[ 1702 phy 1703 ].connection_interval_min 1704 / 1.25 1705 ) 1706 for phy in phys 1707 ] 1708 connection_interval_maxs = [ 1709 int( 1710 connection_parameters_preferences[ 1711 phy 1712 ].connection_interval_max 1713 / 1.25 1714 ) 1715 for phy in phys 1716 ] 1717 max_latencies = [ 1718 connection_parameters_preferences[phy].max_latency 1719 for phy in phys 1720 ] 1721 supervision_timeouts = [ 1722 int( 1723 connection_parameters_preferences[phy].supervision_timeout 1724 / 10 1725 ) 1726 for phy in phys 1727 ] 1728 min_ce_lengths = [ 1729 int( 1730 connection_parameters_preferences[phy].min_ce_length / 0.625 1731 ) 1732 for phy in phys 1733 ] 1734 max_ce_lengths = [ 1735 int( 1736 connection_parameters_preferences[phy].max_ce_length / 0.625 1737 ) 1738 for phy in phys 1739 ] 1740 1741 result = await self.send_command( 1742 HCI_LE_Extended_Create_Connection_Command( 1743 initiator_filter_policy=0, 1744 own_address_type=own_address_type, 1745 peer_address_type=peer_address.address_type, 1746 peer_address=peer_address, 1747 initiating_phys=initiating_phys, 1748 scan_intervals=( 1749 int(DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL / 0.625), 1750 ) 1751 * phy_count, 1752 scan_windows=( 1753 int(DEVICE_DEFAULT_CONNECT_SCAN_WINDOW / 0.625), 1754 ) 1755 * phy_count, 1756 connection_interval_mins=connection_interval_mins, 1757 connection_interval_maxs=connection_interval_maxs, 1758 max_latencies=max_latencies, 1759 supervision_timeouts=supervision_timeouts, 1760 min_ce_lengths=min_ce_lengths, 1761 max_ce_lengths=max_ce_lengths, 1762 ) # type: ignore[call-arg] 1763 ) 1764 else: 1765 if HCI_LE_1M_PHY not in connection_parameters_preferences: 1766 raise ValueError('1M PHY preferences required') 1767 1768 prefs = connection_parameters_preferences[HCI_LE_1M_PHY] 1769 result = await self.send_command( 1770 HCI_LE_Create_Connection_Command( 1771 le_scan_interval=int( 1772 DEVICE_DEFAULT_CONNECT_SCAN_INTERVAL / 0.625 1773 ), 1774 le_scan_window=int( 1775 DEVICE_DEFAULT_CONNECT_SCAN_WINDOW / 0.625 1776 ), 1777 initiator_filter_policy=0, 1778 peer_address_type=peer_address.address_type, 1779 peer_address=peer_address, 1780 own_address_type=own_address_type, 1781 connection_interval_min=int( 1782 prefs.connection_interval_min / 1.25 1783 ), 1784 connection_interval_max=int( 1785 prefs.connection_interval_max / 1.25 1786 ), 1787 max_latency=prefs.max_latency, 1788 supervision_timeout=int(prefs.supervision_timeout / 10), 1789 min_ce_length=int(prefs.min_ce_length / 0.625), 1790 max_ce_length=int(prefs.max_ce_length / 0.625), 1791 ) # type: ignore[call-arg] 1792 ) 1793 else: 1794 # Save pending connection 1795 self.pending_connections[peer_address] = Connection.incomplete( 1796 self, peer_address 1797 ) 1798 1799 # TODO: allow passing other settings 1800 result = await self.send_command( 1801 HCI_Create_Connection_Command( 1802 bd_addr=peer_address, 1803 packet_type=0xCC18, # FIXME: change 1804 page_scan_repetition_mode=HCI_R2_PAGE_SCAN_REPETITION_MODE, 1805 clock_offset=0x0000, 1806 allow_role_switch=0x01, 1807 reserved=0, 1808 ) # type: ignore[call-arg] 1809 ) 1810 1811 if result.status != HCI_Command_Status_Event.PENDING: 1812 raise HCI_StatusError(result) 1813 1814 # Wait for the connection process to complete 1815 if transport == BT_LE_TRANSPORT: 1816 self.le_connecting = True 1817 1818 if timeout is None: 1819 return await self.abort_on('flush', pending_connection) 1820 1821 try: 1822 return await asyncio.wait_for( 1823 asyncio.shield(pending_connection), timeout 1824 ) 1825 except asyncio.TimeoutError: 1826 if transport == BT_LE_TRANSPORT: 1827 await self.send_command(HCI_LE_Create_Connection_Cancel_Command()) # type: ignore[call-arg] 1828 else: 1829 await self.send_command( 1830 HCI_Create_Connection_Cancel_Command(bd_addr=peer_address) # type: ignore[call-arg] 1831 ) 1832 1833 try: 1834 return await self.abort_on('flush', pending_connection) 1835 except core.ConnectionError as error: 1836 raise core.TimeoutError() from error 1837 finally: 1838 self.remove_listener('connection', on_connection) 1839 self.remove_listener('connection_failure', on_connection_failure) 1840 if transport == BT_LE_TRANSPORT: 1841 self.le_connecting = False 1842 self.connect_own_address_type = None 1843 else: 1844 self.pending_connections.pop(peer_address, None) 1845 1846 async def accept( 1847 self, 1848 peer_address: Union[Address, str] = Address.ANY, 1849 role: int = BT_PERIPHERAL_ROLE, 1850 timeout: Optional[float] = DEVICE_DEFAULT_CONNECT_TIMEOUT, 1851 ) -> Connection: 1852 ''' 1853 Wait and accept any incoming connection or a connection from `peer_address` when 1854 set. 1855 1856 Notes: 1857 * A `connect` to the same peer will not complete this call. 1858 * The `timeout` parameter is only handled while waiting for the connection 1859 request, once received and accepted, the controller shall issue a connection 1860 complete event. 1861 ''' 1862 1863 if isinstance(peer_address, str): 1864 try: 1865 peer_address = Address(peer_address) 1866 except ValueError: 1867 # If the address is not parsable, assume it is a name instead 1868 logger.debug('looking for peer by name') 1869 peer_address = await self.find_peer_by_name( 1870 peer_address, BT_BR_EDR_TRANSPORT 1871 ) # TODO: timeout 1872 1873 assert isinstance(peer_address, Address) 1874 1875 if peer_address == Address.NIL: 1876 raise ValueError('accept on nil address') 1877 1878 # Create a future so that we can wait for the request 1879 pending_request_fut = asyncio.get_running_loop().create_future() 1880 1881 if peer_address == Address.ANY: 1882 self.classic_pending_accepts[Address.ANY].append(pending_request_fut) 1883 elif peer_address in self.classic_pending_accepts: 1884 raise InvalidStateError('accept connection already pending') 1885 else: 1886 self.classic_pending_accepts[peer_address] = [pending_request_fut] 1887 1888 try: 1889 # Wait for a request or a completed connection 1890 pending_request = self.abort_on('flush', pending_request_fut) 1891 result = await ( 1892 asyncio.wait_for(pending_request, timeout) 1893 if timeout 1894 else pending_request 1895 ) 1896 except Exception: 1897 # Remove future from device context 1898 if peer_address == Address.ANY: 1899 self.classic_pending_accepts[Address.ANY].remove(pending_request_fut) 1900 else: 1901 self.classic_pending_accepts.pop(peer_address) 1902 raise 1903 1904 # Result may already be a completed connection, 1905 # see `on_connection` for details 1906 if isinstance(result, Connection): 1907 return result 1908 1909 # Otherwise, result came from `on_connection_request` 1910 peer_address, _class_of_device, _link_type = result 1911 assert isinstance(peer_address, Address) 1912 1913 # Create a future so that we can wait for the connection's result 1914 pending_connection = asyncio.get_running_loop().create_future() 1915 1916 def on_connection(connection): 1917 if ( 1918 connection.transport == BT_BR_EDR_TRANSPORT 1919 and connection.peer_address == peer_address 1920 ): 1921 pending_connection.set_result(connection) 1922 1923 def on_connection_failure(error): 1924 if ( 1925 error.transport == BT_BR_EDR_TRANSPORT 1926 and error.peer_address == peer_address 1927 ): 1928 pending_connection.set_exception(error) 1929 1930 self.on('connection', on_connection) 1931 self.on('connection_failure', on_connection_failure) 1932 1933 # Save pending connection 1934 self.pending_connections[peer_address] = Connection.incomplete( 1935 self, peer_address 1936 ) 1937 1938 try: 1939 # Accept connection request 1940 await self.send_command( 1941 HCI_Accept_Connection_Request_Command(bd_addr=peer_address, role=role) # type: ignore[call-arg] 1942 ) 1943 1944 # Wait for connection complete 1945 return await self.abort_on('flush', pending_connection) 1946 1947 finally: 1948 self.remove_listener('connection', on_connection) 1949 self.remove_listener('connection_failure', on_connection_failure) 1950 self.pending_connections.pop(peer_address, None) 1951 1952 @asynccontextmanager 1953 async def connect_as_gatt(self, peer_address): 1954 async with AsyncExitStack() as stack: 1955 connection = await stack.enter_async_context( 1956 await self.connect(peer_address) 1957 ) 1958 peer = await stack.enter_async_context(Peer(connection)) 1959 1960 yield peer 1961 1962 @property 1963 def is_le_connecting(self): 1964 return self.le_connecting 1965 1966 @property 1967 def is_disconnecting(self): 1968 return self.disconnecting 1969 1970 async def cancel_connection(self, peer_address=None): 1971 # Low-energy: cancel ongoing connection 1972 if peer_address is None: 1973 if not self.is_le_connecting: 1974 return 1975 await self.send_command( 1976 HCI_LE_Create_Connection_Cancel_Command(), check_result=True 1977 ) 1978 1979 # BR/EDR: try to cancel to ongoing connection 1980 # NOTE: This API does not prevent from trying to cancel a connection which is 1981 # not currently being created 1982 else: 1983 if isinstance(peer_address, str): 1984 try: 1985 peer_address = Address(peer_address) 1986 except ValueError: 1987 # If the address is not parsable, assume it is a name instead 1988 logger.debug('looking for peer by name') 1989 peer_address = await self.find_peer_by_name( 1990 peer_address, BT_BR_EDR_TRANSPORT 1991 ) # TODO: timeout 1992 1993 await self.send_command( 1994 HCI_Create_Connection_Cancel_Command(bd_addr=peer_address), 1995 check_result=True, 1996 ) 1997 1998 async def disconnect(self, connection, reason): 1999 # Create a future so that we can wait for the disconnection's result 2000 pending_disconnection = asyncio.get_running_loop().create_future() 2001 connection.on('disconnection', pending_disconnection.set_result) 2002 connection.on('disconnection_failure', pending_disconnection.set_exception) 2003 2004 # Request a disconnection 2005 result = await self.send_command( 2006 HCI_Disconnect_Command(connection_handle=connection.handle, reason=reason) 2007 ) 2008 2009 try: 2010 if result.status != HCI_Command_Status_Event.PENDING: 2011 raise HCI_StatusError(result) 2012 2013 # Wait for the disconnection process to complete 2014 self.disconnecting = True 2015 return await self.abort_on('flush', pending_disconnection) 2016 finally: 2017 connection.remove_listener( 2018 'disconnection', pending_disconnection.set_result 2019 ) 2020 connection.remove_listener( 2021 'disconnection_failure', pending_disconnection.set_exception 2022 ) 2023 self.disconnecting = False 2024 2025 async def update_connection_parameters( 2026 self, 2027 connection, 2028 connection_interval_min, 2029 connection_interval_max, 2030 max_latency, 2031 supervision_timeout, 2032 min_ce_length=0, 2033 max_ce_length=0, 2034 ): 2035 ''' 2036 NOTE: the name of the parameters may look odd, but it just follows the names 2037 used in the Bluetooth spec. 2038 ''' 2039 result = await self.send_command( 2040 HCI_LE_Connection_Update_Command( 2041 connection_handle=connection.handle, 2042 connection_interval_min=connection_interval_min, 2043 connection_interval_max=connection_interval_max, 2044 max_latency=max_latency, 2045 supervision_timeout=supervision_timeout, 2046 min_ce_length=min_ce_length, 2047 max_ce_length=max_ce_length, 2048 ) 2049 ) 2050 if result.status != HCI_Command_Status_Event.PENDING: 2051 raise HCI_StatusError(result) 2052 2053 async def get_connection_rssi(self, connection): 2054 result = await self.send_command( 2055 HCI_Read_RSSI_Command(handle=connection.handle), check_result=True 2056 ) 2057 return result.return_parameters.rssi 2058 2059 async def get_connection_phy(self, connection): 2060 result = await self.send_command( 2061 HCI_LE_Read_PHY_Command(connection_handle=connection.handle), 2062 check_result=True, 2063 ) 2064 return (result.return_parameters.tx_phy, result.return_parameters.rx_phy) 2065 2066 async def set_connection_phy( 2067 self, connection, tx_phys=None, rx_phys=None, phy_options=None 2068 ): 2069 if not self.host.supports_command(HCI_LE_SET_PHY_COMMAND): 2070 logger.warning('ignoring request, command not supported') 2071 return 2072 2073 all_phys_bits = (1 if tx_phys is None else 0) | ( 2074 (1 if rx_phys is None else 0) << 1 2075 ) 2076 2077 result = await self.send_command( 2078 HCI_LE_Set_PHY_Command( 2079 connection_handle=connection.handle, 2080 all_phys=all_phys_bits, 2081 tx_phys=phy_list_to_bits(tx_phys), 2082 rx_phys=phy_list_to_bits(rx_phys), 2083 phy_options=0 if phy_options is None else int(phy_options), 2084 ) 2085 ) 2086 2087 if result.status != HCI_COMMAND_STATUS_PENDING: 2088 logger.warning( 2089 'HCI_LE_Set_PHY_Command failed: ' 2090 f'{HCI_Constant.error_name(result.status)}' 2091 ) 2092 raise HCI_StatusError(result) 2093 2094 async def set_default_phy(self, tx_phys=None, rx_phys=None): 2095 all_phys_bits = (1 if tx_phys is None else 0) | ( 2096 (1 if rx_phys is None else 0) << 1 2097 ) 2098 2099 return await self.send_command( 2100 HCI_LE_Set_Default_PHY_Command( 2101 all_phys=all_phys_bits, 2102 tx_phys=phy_list_to_bits(tx_phys), 2103 rx_phys=phy_list_to_bits(rx_phys), 2104 ), 2105 check_result=True, 2106 ) 2107 2108 async def find_peer_by_name(self, name, transport=BT_LE_TRANSPORT): 2109 """ 2110 Scan for a peer with a give name and return its address and transport 2111 """ 2112 2113 # Create a future to wait for an address to be found 2114 peer_address = asyncio.get_running_loop().create_future() 2115 2116 # Scan/inquire with event handlers to handle scan/inquiry results 2117 def on_peer_found(address, ad_data): 2118 local_name = ad_data.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True) 2119 if local_name is None: 2120 local_name = ad_data.get(AdvertisingData.SHORTENED_LOCAL_NAME, raw=True) 2121 if local_name is not None: 2122 if local_name.decode('utf-8') == name: 2123 peer_address.set_result(address) 2124 2125 handler = None 2126 was_scanning = self.scanning 2127 was_discovering = self.discovering 2128 try: 2129 if transport == BT_LE_TRANSPORT: 2130 event_name = 'advertisement' 2131 handler = self.on( 2132 event_name, 2133 lambda advertisement: on_peer_found( 2134 advertisement.address, advertisement.data 2135 ), 2136 ) 2137 2138 if not self.scanning: 2139 await self.start_scanning(filter_duplicates=True) 2140 2141 elif transport == BT_BR_EDR_TRANSPORT: 2142 event_name = 'inquiry_result' 2143 handler = self.on( 2144 event_name, 2145 lambda address, class_of_device, eir_data, rssi: on_peer_found( 2146 address, eir_data 2147 ), 2148 ) 2149 2150 if not self.discovering: 2151 await self.start_discovery() 2152 else: 2153 return None 2154 2155 return await self.abort_on('flush', peer_address) 2156 finally: 2157 if handler is not None: 2158 self.remove_listener(event_name, handler) 2159 2160 if transport == BT_LE_TRANSPORT and not was_scanning: 2161 await self.stop_scanning() 2162 elif transport == BT_BR_EDR_TRANSPORT and not was_discovering: 2163 await self.stop_discovery() 2164 2165 @property 2166 def pairing_config_factory(self): 2167 return self.smp_manager.pairing_config_factory 2168 2169 @pairing_config_factory.setter 2170 def pairing_config_factory(self, pairing_config_factory): 2171 self.smp_manager.pairing_config_factory = pairing_config_factory 2172 2173 async def pair(self, connection): 2174 return await self.smp_manager.pair(connection) 2175 2176 def request_pairing(self, connection): 2177 return self.smp_manager.request_pairing(connection) 2178 2179 async def get_long_term_key(self, connection_handle, rand, ediv): 2180 if (connection := self.lookup_connection(connection_handle)) is None: 2181 return 2182 2183 # Start by looking for the key in an SMP session 2184 ltk = self.smp_manager.get_long_term_key(connection, rand, ediv) 2185 if ltk is not None: 2186 return ltk 2187 2188 # Then look for the key in the keystore 2189 if self.keystore is not None: 2190 keys = await self.keystore.get(str(connection.peer_address)) 2191 if keys is not None: 2192 logger.debug('found keys in the key store') 2193 if keys.ltk: 2194 return keys.ltk.value 2195 2196 if connection.role == BT_CENTRAL_ROLE and keys.ltk_central: 2197 return keys.ltk_central.value 2198 2199 if connection.role == BT_PERIPHERAL_ROLE and keys.ltk_peripheral: 2200 return keys.ltk_peripheral.value 2201 2202 async def get_link_key(self, address): 2203 # Look for the key in the keystore 2204 if self.keystore is not None: 2205 keys = await self.keystore.get(str(address)) 2206 if keys is not None: 2207 logger.debug('found keys in the key store') 2208 return keys.link_key.value 2209 2210 # [Classic only] 2211 async def authenticate(self, connection): 2212 # Set up event handlers 2213 pending_authentication = asyncio.get_running_loop().create_future() 2214 2215 def on_authentication(): 2216 pending_authentication.set_result(None) 2217 2218 def on_authentication_failure(error_code): 2219 pending_authentication.set_exception(HCI_Error(error_code)) 2220 2221 connection.on('connection_authentication', on_authentication) 2222 connection.on('connection_authentication_failure', on_authentication_failure) 2223 2224 # Request the authentication 2225 try: 2226 result = await self.send_command( 2227 HCI_Authentication_Requested_Command( 2228 connection_handle=connection.handle 2229 ) 2230 ) 2231 if result.status != HCI_COMMAND_STATUS_PENDING: 2232 logger.warning( 2233 'HCI_Authentication_Requested_Command failed: ' 2234 f'{HCI_Constant.error_name(result.status)}' 2235 ) 2236 raise HCI_StatusError(result) 2237 2238 # Wait for the authentication to complete 2239 await connection.abort_on('disconnection', pending_authentication) 2240 finally: 2241 connection.remove_listener('connection_authentication', on_authentication) 2242 connection.remove_listener( 2243 'connection_authentication_failure', on_authentication_failure 2244 ) 2245 2246 async def encrypt(self, connection, enable=True): 2247 if not enable and connection.transport == BT_LE_TRANSPORT: 2248 raise ValueError('`enable` parameter is classic only.') 2249 2250 # Set up event handlers 2251 pending_encryption = asyncio.get_running_loop().create_future() 2252 2253 def on_encryption_change(): 2254 pending_encryption.set_result(None) 2255 2256 def on_encryption_failure(error_code): 2257 pending_encryption.set_exception(HCI_Error(error_code)) 2258 2259 connection.on('connection_encryption_change', on_encryption_change) 2260 connection.on('connection_encryption_failure', on_encryption_failure) 2261 2262 # Request the encryption 2263 try: 2264 if connection.transport == BT_LE_TRANSPORT: 2265 # Look for a key in the key store 2266 if self.keystore is None: 2267 raise RuntimeError('no key store') 2268 2269 keys = await self.keystore.get(str(connection.peer_address)) 2270 if keys is None: 2271 raise RuntimeError('keys not found in key store') 2272 2273 if keys.ltk is not None: 2274 ltk = keys.ltk.value 2275 rand = bytes(8) 2276 ediv = 0 2277 elif keys.ltk_central is not None: 2278 ltk = keys.ltk_central.value 2279 rand = keys.ltk_central.rand 2280 ediv = keys.ltk_central.ediv 2281 else: 2282 raise RuntimeError('no LTK found for peer') 2283 2284 if connection.role != HCI_CENTRAL_ROLE: 2285 raise InvalidStateError('only centrals can start encryption') 2286 2287 result = await self.send_command( 2288 HCI_LE_Enable_Encryption_Command( 2289 connection_handle=connection.handle, 2290 random_number=rand, 2291 encrypted_diversifier=ediv, 2292 long_term_key=ltk, 2293 ) 2294 ) 2295 2296 if result.status != HCI_COMMAND_STATUS_PENDING: 2297 logger.warning( 2298 'HCI_LE_Enable_Encryption_Command failed: ' 2299 f'{HCI_Constant.error_name(result.status)}' 2300 ) 2301 raise HCI_StatusError(result) 2302 else: 2303 result = await self.send_command( 2304 HCI_Set_Connection_Encryption_Command( 2305 connection_handle=connection.handle, 2306 encryption_enable=0x01 if enable else 0x00, 2307 ) 2308 ) 2309 2310 if result.status != HCI_COMMAND_STATUS_PENDING: 2311 logger.warning( 2312 'HCI_Set_Connection_Encryption_Command failed: ' 2313 f'{HCI_Constant.error_name(result.status)}' 2314 ) 2315 raise HCI_StatusError(result) 2316 2317 # Wait for the result 2318 await connection.abort_on('disconnection', pending_encryption) 2319 finally: 2320 connection.remove_listener( 2321 'connection_encryption_change', on_encryption_change 2322 ) 2323 connection.remove_listener( 2324 'connection_encryption_failure', on_encryption_failure 2325 ) 2326 2327 # [Classic only] 2328 async def switch_role(self, connection: Connection, role: int): 2329 pending_role_change = asyncio.get_running_loop().create_future() 2330 2331 def on_role_change(new_role): 2332 pending_role_change.set_result(new_role) 2333 2334 def on_role_change_failure(error_code): 2335 pending_role_change.set_exception(HCI_Error(error_code)) 2336 2337 connection.on('role_change', on_role_change) 2338 connection.on('role_change_failure', on_role_change_failure) 2339 2340 try: 2341 result = await self.send_command( 2342 HCI_Switch_Role_Command(bd_addr=connection.peer_address, role=role) # type: ignore[call-arg] 2343 ) 2344 if result.status != HCI_COMMAND_STATUS_PENDING: 2345 logger.warning( 2346 'HCI_Switch_Role_Command failed: ' 2347 f'{HCI_Constant.error_name(result.status)}' 2348 ) 2349 raise HCI_StatusError(result) 2350 await connection.abort_on('disconnection', pending_role_change) 2351 finally: 2352 connection.remove_listener('role_change', on_role_change) 2353 connection.remove_listener('role_change_failure', on_role_change_failure) 2354 2355 # [Classic only] 2356 async def request_remote_name(self, remote: Union[Address, Connection]) -> str: 2357 # Set up event handlers 2358 pending_name = asyncio.get_running_loop().create_future() 2359 2360 peer_address = remote if isinstance(remote, Address) else remote.peer_address 2361 2362 handler = self.on( 2363 'remote_name', 2364 lambda address, remote_name: pending_name.set_result(remote_name) 2365 if address == peer_address 2366 else None, 2367 ) 2368 failure_handler = self.on( 2369 'remote_name_failure', 2370 lambda address, error_code: pending_name.set_exception( 2371 HCI_Error(error_code) 2372 ) 2373 if address == peer_address 2374 else None, 2375 ) 2376 2377 try: 2378 result = await self.send_command( 2379 HCI_Remote_Name_Request_Command( 2380 bd_addr=peer_address, 2381 page_scan_repetition_mode=HCI_Remote_Name_Request_Command.R2, 2382 reserved=0, 2383 clock_offset=0, # TODO investigate non-0 values 2384 ) # type: ignore[call-arg] 2385 ) 2386 2387 if result.status != HCI_COMMAND_STATUS_PENDING: 2388 logger.warning( 2389 'HCI_Set_Connection_Encryption_Command failed: ' 2390 f'{HCI_Constant.error_name(result.status)}' 2391 ) 2392 raise HCI_StatusError(result) 2393 2394 # Wait for the result 2395 return await self.abort_on('flush', pending_name) 2396 finally: 2397 self.remove_listener('remote_name', handler) 2398 self.remove_listener('remote_name_failure', failure_handler) 2399 2400 @host_event_handler 2401 def on_flush(self): 2402 self.emit('flush') 2403 for _, connection in self.connections.items(): 2404 connection.emit('disconnection', 0) 2405 self.connections = {} 2406 2407 # [Classic only] 2408 @host_event_handler 2409 def on_link_key(self, bd_addr, link_key, key_type): 2410 # Store the keys in the key store 2411 if self.keystore: 2412 pairing_keys = PairingKeys() 2413 pairing_keys.link_key = PairingKeys.Key(value=link_key) 2414 2415 async def store_keys(): 2416 try: 2417 await self.keystore.update(str(bd_addr), pairing_keys) 2418 except Exception as error: 2419 logger.warning(f'!!! error while storing keys: {error}') 2420 2421 self.abort_on('flush', store_keys()) 2422 2423 if connection := self.find_connection_by_bd_addr( 2424 bd_addr, transport=BT_BR_EDR_TRANSPORT 2425 ): 2426 connection.link_key_type = key_type 2427 2428 def add_service(self, service): 2429 self.gatt_server.add_service(service) 2430 2431 def add_services(self, services): 2432 self.gatt_server.add_services(services) 2433 2434 def add_default_services(self, generic_access_service=True): 2435 # Add a GAP Service if requested 2436 if generic_access_service: 2437 self.gatt_server.add_service(GenericAccessService(self.name)) 2438 2439 async def notify_subscriber(self, connection, attribute, value=None, force=False): 2440 await self.gatt_server.notify_subscriber(connection, attribute, value, force) 2441 2442 async def notify_subscribers(self, attribute, value=None, force=False): 2443 await self.gatt_server.notify_subscribers(attribute, value, force) 2444 2445 async def indicate_subscriber(self, connection, attribute, value=None, force=False): 2446 await self.gatt_server.indicate_subscriber(connection, attribute, value, force) 2447 2448 async def indicate_subscribers(self, attribute, value=None, force=False): 2449 await self.gatt_server.indicate_subscribers(attribute, value, force) 2450 2451 @host_event_handler 2452 def on_connection( 2453 self, 2454 connection_handle, 2455 transport, 2456 peer_address, 2457 peer_resolvable_address, 2458 role, 2459 connection_parameters, 2460 ): 2461 logger.debug( 2462 f'*** Connection: [0x{connection_handle:04X}] ' 2463 f'{peer_address} as {HCI_Constant.role_name(role)}' 2464 ) 2465 if connection_handle in self.connections: 2466 logger.warning( 2467 'new connection reuses the same handle as a previous connection' 2468 ) 2469 2470 if transport == BT_BR_EDR_TRANSPORT: 2471 # Create a new connection 2472 connection = self.pending_connections.pop(peer_address) 2473 connection.complete( 2474 connection_handle, peer_resolvable_address, role, connection_parameters 2475 ) 2476 self.connections[connection_handle] = connection 2477 2478 # Emit an event to notify listeners of the new connection 2479 self.emit('connection', connection) 2480 else: 2481 # Resolve the peer address if we can 2482 if self.address_resolver: 2483 if peer_address.is_resolvable: 2484 resolved_address = self.address_resolver.resolve(peer_address) 2485 if resolved_address is not None: 2486 logger.debug(f'*** Address resolved as {resolved_address}') 2487 peer_resolvable_address = peer_address 2488 peer_address = resolved_address 2489 2490 # Guess which own address type is used for this connection. 2491 # This logic is somewhat correct but may need to be improved 2492 # when multiple advertising are run simultaneously. 2493 if self.connect_own_address_type is not None: 2494 own_address_type = self.connect_own_address_type 2495 else: 2496 own_address_type = self.advertising_own_address_type 2497 2498 # We are no longer advertising 2499 self.advertising_own_address_type = None 2500 self.advertising = False 2501 2502 if own_address_type in ( 2503 OwnAddressType.PUBLIC, 2504 OwnAddressType.RESOLVABLE_OR_PUBLIC, 2505 ): 2506 self_address = self.public_address 2507 else: 2508 self_address = self.random_address 2509 2510 # Create a new connection 2511 connection = Connection( 2512 self, 2513 connection_handle, 2514 transport, 2515 self_address, 2516 peer_address, 2517 peer_resolvable_address, 2518 role, 2519 connection_parameters, 2520 ConnectionPHY(HCI_LE_1M_PHY, HCI_LE_1M_PHY), 2521 ) 2522 self.connections[connection_handle] = connection 2523 2524 # If supported, read which PHY we're connected with before 2525 # notifying listeners of the new connection. 2526 if self.host.supports_command(HCI_LE_READ_PHY_COMMAND): 2527 2528 async def read_phy(): 2529 result = await self.send_command( 2530 HCI_LE_Read_PHY_Command(connection_handle=connection_handle), 2531 check_result=True, 2532 ) 2533 connection.phy = ConnectionPHY( 2534 result.return_parameters.tx_phy, result.return_parameters.rx_phy 2535 ) 2536 # Emit an event to notify listeners of the new connection 2537 self.emit('connection', connection) 2538 2539 # Do so asynchronously to not block the current event handler 2540 connection.abort_on('disconnection', read_phy()) 2541 2542 else: 2543 # Emit an event to notify listeners of the new connection 2544 self.emit('connection', connection) 2545 2546 @host_event_handler 2547 def on_connection_failure(self, transport, peer_address, error_code): 2548 logger.debug(f'*** Connection failed: {HCI_Constant.error_name(error_code)}') 2549 2550 # For directed advertising, this means a timeout 2551 if ( 2552 transport == BT_LE_TRANSPORT 2553 and self.advertising 2554 and self.advertising_type.is_directed 2555 ): 2556 self.advertising_own_address_type = None 2557 self.advertising = False 2558 2559 # Notify listeners 2560 error = core.ConnectionError( 2561 error_code, 2562 transport, 2563 peer_address, 2564 'hci', 2565 HCI_Constant.error_name(error_code), 2566 ) 2567 self.emit('connection_failure', error) 2568 2569 # FIXME: Explore a delegate-model for BR/EDR wait connection #56. 2570 @host_event_handler 2571 def on_connection_request(self, bd_addr, class_of_device, link_type): 2572 logger.debug(f'*** Connection request: {bd_addr}') 2573 2574 # match a pending future using `bd_addr` 2575 if bd_addr in self.classic_pending_accepts: 2576 future, *_ = self.classic_pending_accepts.pop(bd_addr) 2577 future.set_result((bd_addr, class_of_device, link_type)) 2578 2579 # match first pending future for ANY address 2580 elif len(self.classic_pending_accepts[Address.ANY]) > 0: 2581 future = self.classic_pending_accepts[Address.ANY].pop(0) 2582 future.set_result((bd_addr, class_of_device, link_type)) 2583 2584 # device configuration is set to accept any incoming connection 2585 elif self.classic_accept_any: 2586 # Save pending connection 2587 self.pending_connections[bd_addr] = Connection.incomplete(self, bd_addr) 2588 2589 self.host.send_command_sync( 2590 HCI_Accept_Connection_Request_Command( 2591 bd_addr=bd_addr, role=0x01 # Remain the peripheral 2592 ) 2593 ) 2594 2595 # reject incoming connection 2596 else: 2597 self.host.send_command_sync( 2598 HCI_Reject_Connection_Request_Command( 2599 bd_addr=bd_addr, 2600 reason=HCI_CONNECTION_REJECTED_DUE_TO_LIMITED_RESOURCES_ERROR, 2601 ) 2602 ) 2603 2604 @host_event_handler 2605 @with_connection_from_handle 2606 def on_disconnection(self, connection, reason): 2607 logger.debug( 2608 f'*** Disconnection: [0x{connection.handle:04X}] ' 2609 f'{connection.peer_address} as {connection.role_name}, reason={reason}' 2610 ) 2611 connection.emit('disconnection', reason) 2612 2613 # Remove the connection from the map 2614 del self.connections[connection.handle] 2615 2616 # Cleanup subsystems that maintain per-connection state 2617 self.gatt_server.on_disconnection(connection) 2618 2619 # Restart advertising if auto-restart is enabled 2620 if self.auto_restart_advertising: 2621 logger.debug('restarting advertising') 2622 self.abort_on( 2623 'flush', 2624 self.start_advertising( 2625 advertising_type=self.advertising_type, auto_restart=True 2626 ), 2627 ) 2628 2629 @host_event_handler 2630 @with_connection_from_handle 2631 def on_disconnection_failure(self, connection, error_code): 2632 logger.debug(f'*** Disconnection failed: {error_code}') 2633 error = core.ConnectionError( 2634 error_code, 2635 connection.transport, 2636 connection.peer_address, 2637 'hci', 2638 HCI_Constant.error_name(error_code), 2639 ) 2640 connection.emit('disconnection_failure', error) 2641 2642 @host_event_handler 2643 @AsyncRunner.run_in_task() 2644 async def on_inquiry_complete(self): 2645 if self.auto_restart_inquiry: 2646 # Inquire again 2647 await self.start_discovery(auto_restart=True) 2648 else: 2649 self.auto_restart_inquiry = True 2650 self.discovering = False 2651 self.emit('inquiry_complete') 2652 2653 @host_event_handler 2654 @with_connection_from_handle 2655 def on_connection_authentication(self, connection): 2656 logger.debug( 2657 f'*** Connection Authentication: [0x{connection.handle:04X}] ' 2658 f'{connection.peer_address} as {connection.role_name}' 2659 ) 2660 connection.authenticated = True 2661 connection.emit('connection_authentication') 2662 2663 @host_event_handler 2664 @with_connection_from_handle 2665 def on_connection_authentication_failure(self, connection, error): 2666 logger.debug( 2667 f'*** Connection Authentication Failure: [0x{connection.handle:04X}] ' 2668 f'{connection.peer_address} as {connection.role_name}, error={error}' 2669 ) 2670 connection.emit('connection_authentication_failure', error) 2671 2672 @host_event_handler 2673 @with_connection_from_address 2674 def on_ssp_complete(self, connection): 2675 # On Secure Simple Pairing complete, in case: 2676 # - Connection isn't already authenticated 2677 # - AND we are not the initiator of the authentication 2678 # We must trigger authentication to known if we are truly authenticated 2679 if not connection.authenticating and not connection.authenticated: 2680 logger.debug( 2681 f'*** Trigger Connection Authentication: [0x{connection.handle:04X}] ' 2682 f'{connection.peer_address}' 2683 ) 2684 asyncio.create_task(connection.authenticate()) 2685 2686 # [Classic only] 2687 @host_event_handler 2688 @with_connection_from_address 2689 def on_authentication_io_capability_request(self, connection): 2690 # Ask what the pairing config should be for this connection 2691 pairing_config = self.pairing_config_factory(connection) 2692 2693 # Map the SMP IO capability to a Classic IO capability 2694 # pylint: disable=line-too-long 2695 io_capability = { 2696 smp.SMP_DISPLAY_ONLY_IO_CAPABILITY: HCI_DISPLAY_ONLY_IO_CAPABILITY, 2697 smp.SMP_DISPLAY_YES_NO_IO_CAPABILITY: HCI_DISPLAY_YES_NO_IO_CAPABILITY, 2698 smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY: HCI_KEYBOARD_ONLY_IO_CAPABILITY, 2699 smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY, 2700 smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: HCI_DISPLAY_YES_NO_IO_CAPABILITY, 2701 }.get(pairing_config.delegate.io_capability) 2702 2703 if io_capability is None: 2704 logger.warning( 2705 f'cannot map IO capability ({pairing_config.delegate.io_capability}' 2706 ) 2707 io_capability = HCI_NO_INPUT_NO_OUTPUT_IO_CAPABILITY 2708 2709 # Compute the authentication requirements 2710 authentication_requirements = ( 2711 # No Bonding 2712 ( 2713 HCI_MITM_NOT_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS, 2714 HCI_MITM_REQUIRED_NO_BONDING_AUTHENTICATION_REQUIREMENTS, 2715 ), 2716 # General Bonding 2717 ( 2718 HCI_MITM_NOT_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS, 2719 HCI_MITM_REQUIRED_GENERAL_BONDING_AUTHENTICATION_REQUIREMENTS, 2720 ), 2721 )[1 if pairing_config.bonding else 0][1 if pairing_config.mitm else 0] 2722 2723 # Respond 2724 self.host.send_command_sync( 2725 HCI_IO_Capability_Request_Reply_Command( 2726 bd_addr=connection.peer_address, 2727 io_capability=io_capability, 2728 oob_data_present=0x00, # Not present 2729 authentication_requirements=authentication_requirements, 2730 ) 2731 ) 2732 2733 # [Classic only] 2734 @host_event_handler 2735 @with_connection_from_address 2736 def on_authentication_user_confirmation_request(self, connection, code): 2737 # Ask what the pairing config should be for this connection 2738 pairing_config = self.pairing_config_factory(connection) 2739 2740 can_compare = pairing_config.delegate.io_capability not in ( 2741 smp.SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY, 2742 smp.SMP_DISPLAY_ONLY_IO_CAPABILITY, 2743 ) 2744 2745 # Respond 2746 if can_compare: 2747 2748 async def compare_numbers(): 2749 numbers_match = await connection.abort_on( 2750 'disconnection', 2751 pairing_config.delegate.compare_numbers(code, digits=6), 2752 ) 2753 if numbers_match: 2754 await self.host.send_command( 2755 HCI_User_Confirmation_Request_Reply_Command( 2756 bd_addr=connection.peer_address 2757 ) 2758 ) 2759 else: 2760 await self.host.send_command( 2761 HCI_User_Confirmation_Request_Negative_Reply_Command( 2762 bd_addr=connection.peer_address 2763 ) 2764 ) 2765 2766 asyncio.create_task(compare_numbers()) 2767 else: 2768 2769 async def confirm(): 2770 confirm = await connection.abort_on( 2771 'disconnection', pairing_config.delegate.confirm() 2772 ) 2773 if confirm: 2774 await self.host.send_command( 2775 HCI_User_Confirmation_Request_Reply_Command( 2776 bd_addr=connection.peer_address 2777 ) 2778 ) 2779 else: 2780 await self.host.send_command( 2781 HCI_User_Confirmation_Request_Negative_Reply_Command( 2782 bd_addr=connection.peer_address 2783 ) 2784 ) 2785 2786 asyncio.create_task(confirm()) 2787 2788 # [Classic only] 2789 @host_event_handler 2790 @with_connection_from_address 2791 def on_authentication_user_passkey_request(self, connection): 2792 # Ask what the pairing config should be for this connection 2793 pairing_config = self.pairing_config_factory(connection) 2794 2795 can_input = pairing_config.delegate.io_capability in ( 2796 smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY, 2797 smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY, 2798 ) 2799 2800 # Respond 2801 if can_input: 2802 2803 async def get_number(): 2804 number = await connection.abort_on( 2805 'disconnection', pairing_config.delegate.get_number() 2806 ) 2807 if number is not None: 2808 await self.host.send_command( 2809 HCI_User_Passkey_Request_Reply_Command( 2810 bd_addr=connection.peer_address, numeric_value=number 2811 ) 2812 ) 2813 else: 2814 await self.host.send_command( 2815 HCI_User_Passkey_Request_Negative_Reply_Command( 2816 bd_addr=connection.peer_address 2817 ) 2818 ) 2819 2820 asyncio.create_task(get_number()) 2821 else: 2822 self.host.send_command_sync( 2823 HCI_User_Passkey_Request_Negative_Reply_Command( 2824 bd_addr=connection.peer_address 2825 ) 2826 ) 2827 2828 # [Classic only] 2829 @host_event_handler 2830 @with_connection_from_address 2831 def on_pin_code_request(self, connection): 2832 # classic legacy pairing 2833 # Ask what the pairing config should be for this connection 2834 pairing_config = self.pairing_config_factory(connection) 2835 2836 can_input = pairing_config.delegate.io_capability in ( 2837 smp.SMP_KEYBOARD_ONLY_IO_CAPABILITY, 2838 smp.SMP_KEYBOARD_DISPLAY_IO_CAPABILITY, 2839 ) 2840 2841 # respond the pin code 2842 if can_input: 2843 2844 async def get_pin_code(): 2845 pin_code = await connection.abort_on( 2846 'disconnection', pairing_config.delegate.get_string(16) 2847 ) 2848 2849 if pin_code is not None: 2850 pin_code = bytes(pin_code, encoding='utf-8') 2851 pin_code_len = len(pin_code) 2852 assert 0 < pin_code_len <= 16, "pin_code should be 1-16 bytes" 2853 await self.host.send_command( 2854 HCI_PIN_Code_Request_Reply_Command( 2855 bd_addr=connection.peer_address, 2856 pin_code_length=pin_code_len, 2857 pin_code=pin_code, 2858 ) 2859 ) 2860 else: 2861 logger.debug("delegate.get_string() returned None") 2862 await self.host.send_command( 2863 HCI_PIN_Code_Request_Negative_Reply_Command( 2864 bd_addr=connection.peer_address 2865 ) 2866 ) 2867 2868 asyncio.create_task(get_pin_code()) 2869 else: 2870 self.host.send_command_sync( 2871 HCI_PIN_Code_Request_Negative_Reply_Command( 2872 bd_addr=connection.peer_address 2873 ) 2874 ) 2875 2876 # [Classic only] 2877 @host_event_handler 2878 @with_connection_from_address 2879 def on_authentication_user_passkey_notification(self, connection, passkey): 2880 # Ask what the pairing config should be for this connection 2881 pairing_config = self.pairing_config_factory(connection) 2882 2883 connection.abort_on( 2884 'disconnection', pairing_config.delegate.display_number(passkey) 2885 ) 2886 2887 # [Classic only] 2888 @host_event_handler 2889 @try_with_connection_from_address 2890 def on_remote_name(self, connection: Connection, address, remote_name): 2891 # Try to decode the name 2892 try: 2893 remote_name = remote_name.decode('utf-8') 2894 if connection: 2895 connection.peer_name = remote_name 2896 connection.emit('remote_name') 2897 self.emit('remote_name', address, remote_name) 2898 except UnicodeDecodeError as error: 2899 logger.warning('peer name is not valid UTF-8') 2900 if connection: 2901 connection.emit('remote_name_failure', error) 2902 else: 2903 self.emit('remote_name_failure', address, error) 2904 2905 # [Classic only] 2906 @host_event_handler 2907 @try_with_connection_from_address 2908 def on_remote_name_failure(self, connection: Connection, address, error): 2909 if connection: 2910 connection.emit('remote_name_failure', error) 2911 self.emit('remote_name_failure', address, error) 2912 2913 @host_event_handler 2914 @with_connection_from_handle 2915 def on_connection_encryption_change(self, connection, encryption): 2916 logger.debug( 2917 f'*** Connection Encryption Change: [0x{connection.handle:04X}] ' 2918 f'{connection.peer_address} as {connection.role_name}, ' 2919 f'encryption={encryption}' 2920 ) 2921 connection.encryption = encryption 2922 if ( 2923 not connection.authenticated 2924 and encryption == HCI_Encryption_Change_Event.AES_CCM 2925 ): 2926 connection.authenticated = True 2927 connection.sc = True 2928 connection.emit('connection_encryption_change') 2929 2930 @host_event_handler 2931 @with_connection_from_handle 2932 def on_connection_encryption_failure(self, connection, error): 2933 logger.debug( 2934 f'*** Connection Encryption Failure: [0x{connection.handle:04X}] ' 2935 f'{connection.peer_address} as {connection.role_name}, ' 2936 f'error={error}' 2937 ) 2938 connection.emit('connection_encryption_failure', error) 2939 2940 @host_event_handler 2941 @with_connection_from_handle 2942 def on_connection_encryption_key_refresh(self, connection): 2943 logger.debug( 2944 f'*** Connection Key Refresh: [0x{connection.handle:04X}] ' 2945 f'{connection.peer_address} as {connection.role_name}' 2946 ) 2947 connection.emit('connection_encryption_key_refresh') 2948 2949 @host_event_handler 2950 @with_connection_from_handle 2951 def on_connection_parameters_update(self, connection, connection_parameters): 2952 logger.debug( 2953 f'*** Connection Parameters Update: [0x{connection.handle:04X}] ' 2954 f'{connection.peer_address} as {connection.role_name}, ' 2955 f'{connection_parameters}' 2956 ) 2957 connection.parameters = connection_parameters 2958 connection.emit('connection_parameters_update') 2959 2960 @host_event_handler 2961 @with_connection_from_handle 2962 def on_connection_parameters_update_failure(self, connection, error): 2963 logger.debug( 2964 f'*** Connection Parameters Update Failed: [0x{connection.handle:04X}] ' 2965 f'{connection.peer_address} as {connection.role_name}, ' 2966 f'error={error}' 2967 ) 2968 connection.emit('connection_parameters_update_failure', error) 2969 2970 @host_event_handler 2971 @with_connection_from_handle 2972 def on_connection_phy_update(self, connection, connection_phy): 2973 logger.debug( 2974 f'*** Connection PHY Update: [0x{connection.handle:04X}] ' 2975 f'{connection.peer_address} as {connection.role_name}, ' 2976 f'{connection_phy}' 2977 ) 2978 connection.phy = connection_phy 2979 connection.emit('connection_phy_update') 2980 2981 @host_event_handler 2982 @with_connection_from_handle 2983 def on_connection_phy_update_failure(self, connection, error): 2984 logger.debug( 2985 f'*** Connection PHY Update Failed: [0x{connection.handle:04X}] ' 2986 f'{connection.peer_address} as {connection.role_name}, ' 2987 f'error={error}' 2988 ) 2989 connection.emit('connection_phy_update_failure', error) 2990 2991 @host_event_handler 2992 @with_connection_from_handle 2993 def on_connection_att_mtu_update(self, connection, att_mtu): 2994 logger.debug( 2995 f'*** Connection ATT MTU Update: [0x{connection.handle:04X}] ' 2996 f'{connection.peer_address} as {connection.role_name}, ' 2997 f'{att_mtu}' 2998 ) 2999 connection.att_mtu = att_mtu 3000 connection.emit('connection_att_mtu_update') 3001 3002 @host_event_handler 3003 @with_connection_from_handle 3004 def on_connection_data_length_change( 3005 self, connection, max_tx_octets, max_tx_time, max_rx_octets, max_rx_time 3006 ): 3007 logger.debug( 3008 f'*** Connection Data Length Change: [0x{connection.handle:04X}] ' 3009 f'{connection.peer_address} as {connection.role_name}' 3010 ) 3011 connection.data_length = ( 3012 max_tx_octets, 3013 max_tx_time, 3014 max_rx_octets, 3015 max_rx_time, 3016 ) 3017 connection.emit('connection_data_length_change') 3018 3019 # [Classic only] 3020 @host_event_handler 3021 @with_connection_from_address 3022 def on_role_change(self, connection, new_role): 3023 connection.role = new_role 3024 connection.emit('role_change', new_role) 3025 3026 # [Classic only] 3027 @host_event_handler 3028 @try_with_connection_from_address 3029 def on_role_change_failure(self, connection, address, error): 3030 if connection: 3031 connection.emit('role_change_failure', error) 3032 self.emit('role_change_failure', address, error) 3033 3034 @with_connection_from_handle 3035 def on_pairing_start(self, connection): 3036 connection.emit('pairing_start') 3037 3038 @with_connection_from_handle 3039 def on_pairing(self, connection, keys, sc): 3040 connection.sc = sc 3041 connection.authenticated = True 3042 connection.emit('pairing', keys) 3043 3044 @with_connection_from_handle 3045 def on_pairing_failure(self, connection, reason): 3046 connection.emit('pairing_failure', reason) 3047 3048 @with_connection_from_handle 3049 def on_gatt_pdu(self, connection, pdu): 3050 # Parse the L2CAP payload into an ATT PDU object 3051 att_pdu = ATT_PDU.from_bytes(pdu) 3052 3053 # Conveniently, even-numbered op codes are client->server and 3054 # odd-numbered ones are server->client 3055 if att_pdu.op_code & 1: 3056 if connection.gatt_client is None: 3057 logger.warning( 3058 color('no GATT client for connection 0x{connection_handle:04X}') 3059 ) 3060 return 3061 connection.gatt_client.on_gatt_pdu(att_pdu) 3062 else: 3063 if connection.gatt_server is None: 3064 logger.warning( 3065 color('no GATT server for connection 0x{connection_handle:04X}') 3066 ) 3067 return 3068 connection.gatt_server.on_gatt_pdu(connection, att_pdu) 3069 3070 @with_connection_from_handle 3071 def on_smp_pdu(self, connection, pdu): 3072 self.smp_manager.on_smp_pdu(connection, pdu) 3073 3074 @host_event_handler 3075 @with_connection_from_handle 3076 def on_l2cap_pdu(self, connection, cid, pdu): 3077 self.l2cap_channel_manager.on_pdu(connection, cid, pdu) 3078 3079 def __str__(self): 3080 return ( 3081 f'Device(name="{self.name}", ' 3082 f'random_address="{self.random_address}", ' 3083 f'public_address="{self.public_address}")' 3084 ) 3085