1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# SMP - Security Manager Protocol 17# 18# See Bluetooth spec @ Vol 3, Part H 19# 20# ----------------------------------------------------------------------------- 21 22# ----------------------------------------------------------------------------- 23# Imports 24# ----------------------------------------------------------------------------- 25from __future__ import annotations 26import logging 27import asyncio 28import enum 29import secrets 30from dataclasses import dataclass 31from typing import ( 32 TYPE_CHECKING, 33 Any, 34 Awaitable, 35 Callable, 36 Dict, 37 List, 38 Optional, 39 Tuple, 40 Type, 41 cast, 42) 43 44from pyee import EventEmitter 45 46from .colors import color 47from .hci import ( 48 Address, 49 HCI_LE_Enable_Encryption_Command, 50 HCI_Object, 51 key_with_value, 52) 53from .core import ( 54 BT_BR_EDR_TRANSPORT, 55 BT_CENTRAL_ROLE, 56 BT_LE_TRANSPORT, 57 AdvertisingData, 58 ProtocolError, 59 name_or_number, 60) 61from .keys import PairingKeys 62from . import crypto 63 64if TYPE_CHECKING: 65 from bumble.device import Connection, Device 66 from bumble.pairing import PairingConfig 67 68 69# ----------------------------------------------------------------------------- 70# Logging 71# ----------------------------------------------------------------------------- 72logger = logging.getLogger(__name__) 73 74 75# ----------------------------------------------------------------------------- 76# Constants 77# ----------------------------------------------------------------------------- 78# fmt: off 79# pylint: disable=line-too-long 80 81SMP_CID = 0x06 82SMP_BR_CID = 0x07 83 84SMP_PAIRING_REQUEST_COMMAND = 0x01 85SMP_PAIRING_RESPONSE_COMMAND = 0x02 86SMP_PAIRING_CONFIRM_COMMAND = 0x03 87SMP_PAIRING_RANDOM_COMMAND = 0x04 88SMP_PAIRING_FAILED_COMMAND = 0x05 89SMP_ENCRYPTION_INFORMATION_COMMAND = 0x06 90SMP_MASTER_IDENTIFICATION_COMMAND = 0x07 91SMP_IDENTITY_INFORMATION_COMMAND = 0x08 92SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND = 0x09 93SMP_SIGNING_INFORMATION_COMMAND = 0x0A 94SMP_SECURITY_REQUEST_COMMAND = 0x0B 95SMP_PAIRING_PUBLIC_KEY_COMMAND = 0x0C 96SMP_PAIRING_DHKEY_CHECK_COMMAND = 0x0D 97SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND = 0x0E 98 99SMP_COMMAND_NAMES = { 100 SMP_PAIRING_REQUEST_COMMAND: 'SMP_PAIRING_REQUEST_COMMAND', 101 SMP_PAIRING_RESPONSE_COMMAND: 'SMP_PAIRING_RESPONSE_COMMAND', 102 SMP_PAIRING_CONFIRM_COMMAND: 'SMP_PAIRING_CONFIRM_COMMAND', 103 SMP_PAIRING_RANDOM_COMMAND: 'SMP_PAIRING_RANDOM_COMMAND', 104 SMP_PAIRING_FAILED_COMMAND: 'SMP_PAIRING_FAILED_COMMAND', 105 SMP_ENCRYPTION_INFORMATION_COMMAND: 'SMP_ENCRYPTION_INFORMATION_COMMAND', 106 SMP_MASTER_IDENTIFICATION_COMMAND: 'SMP_MASTER_IDENTIFICATION_COMMAND', 107 SMP_IDENTITY_INFORMATION_COMMAND: 'SMP_IDENTITY_INFORMATION_COMMAND', 108 SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND: 'SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND', 109 SMP_SIGNING_INFORMATION_COMMAND: 'SMP_SIGNING_INFORMATION_COMMAND', 110 SMP_SECURITY_REQUEST_COMMAND: 'SMP_SECURITY_REQUEST_COMMAND', 111 SMP_PAIRING_PUBLIC_KEY_COMMAND: 'SMP_PAIRING_PUBLIC_KEY_COMMAND', 112 SMP_PAIRING_DHKEY_CHECK_COMMAND: 'SMP_PAIRING_DHKEY_CHECK_COMMAND', 113 SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND: 'SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND' 114} 115 116SMP_DISPLAY_ONLY_IO_CAPABILITY = 0x00 117SMP_DISPLAY_YES_NO_IO_CAPABILITY = 0x01 118SMP_KEYBOARD_ONLY_IO_CAPABILITY = 0x02 119SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY = 0x03 120SMP_KEYBOARD_DISPLAY_IO_CAPABILITY = 0x04 121 122SMP_IO_CAPABILITY_NAMES = { 123 SMP_DISPLAY_ONLY_IO_CAPABILITY: 'SMP_DISPLAY_ONLY_IO_CAPABILITY', 124 SMP_DISPLAY_YES_NO_IO_CAPABILITY: 'SMP_DISPLAY_YES_NO_IO_CAPABILITY', 125 SMP_KEYBOARD_ONLY_IO_CAPABILITY: 'SMP_KEYBOARD_ONLY_IO_CAPABILITY', 126 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: 'SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY', 127 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: 'SMP_KEYBOARD_DISPLAY_IO_CAPABILITY' 128} 129 130SMP_PASSKEY_ENTRY_FAILED_ERROR = 0x01 131SMP_OOB_NOT_AVAILABLE_ERROR = 0x02 132SMP_AUTHENTICATION_REQUIREMENTS_ERROR = 0x03 133SMP_CONFIRM_VALUE_FAILED_ERROR = 0x04 134SMP_PAIRING_NOT_SUPPORTED_ERROR = 0x05 135SMP_ENCRYPTION_KEY_SIZE_ERROR = 0x06 136SMP_COMMAND_NOT_SUPPORTED_ERROR = 0x07 137SMP_UNSPECIFIED_REASON_ERROR = 0x08 138SMP_REPEATED_ATTEMPTS_ERROR = 0x09 139SMP_INVALID_PARAMETERS_ERROR = 0x0A 140SMP_DHKEY_CHECK_FAILED_ERROR = 0x0B 141SMP_NUMERIC_COMPARISON_FAILED_ERROR = 0x0C 142SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR = 0x0D 143SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR = 0x0E 144 145SMP_ERROR_NAMES = { 146 SMP_PASSKEY_ENTRY_FAILED_ERROR: 'SMP_PASSKEY_ENTRY_FAILED_ERROR', 147 SMP_OOB_NOT_AVAILABLE_ERROR: 'SMP_OOB_NOT_AVAILABLE_ERROR', 148 SMP_AUTHENTICATION_REQUIREMENTS_ERROR: 'SMP_AUTHENTICATION_REQUIREMENTS_ERROR', 149 SMP_CONFIRM_VALUE_FAILED_ERROR: 'SMP_CONFIRM_VALUE_FAILED_ERROR', 150 SMP_PAIRING_NOT_SUPPORTED_ERROR: 'SMP_PAIRING_NOT_SUPPORTED_ERROR', 151 SMP_ENCRYPTION_KEY_SIZE_ERROR: 'SMP_ENCRYPTION_KEY_SIZE_ERROR', 152 SMP_COMMAND_NOT_SUPPORTED_ERROR: 'SMP_COMMAND_NOT_SUPPORTED_ERROR', 153 SMP_UNSPECIFIED_REASON_ERROR: 'SMP_UNSPECIFIED_REASON_ERROR', 154 SMP_REPEATED_ATTEMPTS_ERROR: 'SMP_REPEATED_ATTEMPTS_ERROR', 155 SMP_INVALID_PARAMETERS_ERROR: 'SMP_INVALID_PARAMETERS_ERROR', 156 SMP_DHKEY_CHECK_FAILED_ERROR: 'SMP_DHKEY_CHECK_FAILED_ERROR', 157 SMP_NUMERIC_COMPARISON_FAILED_ERROR: 'SMP_NUMERIC_COMPARISON_FAILED_ERROR', 158 SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR: 'SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR', 159 SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR: 'SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR' 160} 161 162SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE = 0 163SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE = 1 164SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE = 2 165SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE = 3 166SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE = 4 167 168SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES = { 169 SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE', 170 SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE', 171 SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE', 172 SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE', 173 SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE' 174} 175 176# Bit flags for key distribution/generation 177SMP_ENC_KEY_DISTRIBUTION_FLAG = 0b0001 178SMP_ID_KEY_DISTRIBUTION_FLAG = 0b0010 179SMP_SIGN_KEY_DISTRIBUTION_FLAG = 0b0100 180SMP_LINK_KEY_DISTRIBUTION_FLAG = 0b1000 181 182# AuthReq fields 183SMP_BONDING_AUTHREQ = 0b00000001 184SMP_MITM_AUTHREQ = 0b00000100 185SMP_SC_AUTHREQ = 0b00001000 186SMP_KEYPRESS_AUTHREQ = 0b00010000 187SMP_CT2_AUTHREQ = 0b00100000 188 189# Crypto salt 190SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('000000000000000000000000746D7031') 191SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('000000000000000000000000746D7032') 192 193# fmt: on 194# pylint: enable=line-too-long 195# pylint: disable=invalid-name 196 197 198# ----------------------------------------------------------------------------- 199# Utils 200# ----------------------------------------------------------------------------- 201def error_name(error_code: int) -> str: 202 return name_or_number(SMP_ERROR_NAMES, error_code) 203 204 205# ----------------------------------------------------------------------------- 206# Classes 207# ----------------------------------------------------------------------------- 208class SMP_Command: 209 ''' 210 See Bluetooth spec @ Vol 3, Part H - 3 SECURITY MANAGER PROTOCOL 211 ''' 212 213 smp_classes: Dict[int, Type[SMP_Command]] = {} 214 fields: Any 215 code = 0 216 name = '' 217 218 @staticmethod 219 def from_bytes(pdu: bytes) -> "SMP_Command": 220 code = pdu[0] 221 222 cls = SMP_Command.smp_classes.get(code) 223 if cls is None: 224 instance = SMP_Command(pdu) 225 instance.name = SMP_Command.command_name(code) 226 instance.code = code 227 return instance 228 self = cls.__new__(cls) 229 SMP_Command.__init__(self, pdu) 230 if hasattr(self, 'fields'): 231 self.init_from_bytes(pdu, 1) 232 return self 233 234 @staticmethod 235 def command_name(code: int) -> str: 236 return name_or_number(SMP_COMMAND_NAMES, code) 237 238 @staticmethod 239 def auth_req_str(value: int) -> str: 240 bonding_flags = value & 3 241 mitm = (value >> 2) & 1 242 sc = (value >> 3) & 1 243 keypress = (value >> 4) & 1 244 ct2 = (value >> 5) & 1 245 246 return ( 247 f'bonding_flags={bonding_flags}, ' 248 f'MITM={mitm}, sc={sc}, keypress={keypress}, ct2={ct2}' 249 ) 250 251 @staticmethod 252 def io_capability_name(io_capability: int) -> str: 253 return name_or_number(SMP_IO_CAPABILITY_NAMES, io_capability) 254 255 @staticmethod 256 def key_distribution_str(value: int) -> str: 257 key_types: List[str] = [] 258 if value & SMP_ENC_KEY_DISTRIBUTION_FLAG: 259 key_types.append('ENC') 260 if value & SMP_ID_KEY_DISTRIBUTION_FLAG: 261 key_types.append('ID') 262 if value & SMP_SIGN_KEY_DISTRIBUTION_FLAG: 263 key_types.append('SIGN') 264 if value & SMP_LINK_KEY_DISTRIBUTION_FLAG: 265 key_types.append('LINK') 266 return ','.join(key_types) 267 268 @staticmethod 269 def keypress_notification_type_name(notification_type: int) -> str: 270 return name_or_number(SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES, notification_type) 271 272 @staticmethod 273 def subclass(fields): 274 def inner(cls): 275 cls.name = cls.__name__.upper() 276 cls.code = key_with_value(SMP_COMMAND_NAMES, cls.name) 277 if cls.code is None: 278 raise KeyError( 279 f'Command name {cls.name} not found in SMP_COMMAND_NAMES' 280 ) 281 cls.fields = fields 282 283 # Register a factory for this class 284 SMP_Command.smp_classes[cls.code] = cls 285 286 return cls 287 288 return inner 289 290 def __init__(self, pdu: Optional[bytes] = None, **kwargs: Any) -> None: 291 if hasattr(self, 'fields') and kwargs: 292 HCI_Object.init_from_fields(self, self.fields, kwargs) 293 if pdu is None: 294 pdu = bytes([self.code]) + HCI_Object.dict_to_bytes(kwargs, self.fields) 295 self.pdu = pdu 296 297 def init_from_bytes(self, pdu: bytes, offset: int) -> None: 298 return HCI_Object.init_from_bytes(self, pdu, offset, self.fields) 299 300 def to_bytes(self): 301 return self.pdu 302 303 def __bytes__(self): 304 return self.to_bytes() 305 306 def __str__(self): 307 result = color(self.name, 'yellow') 308 if fields := getattr(self, 'fields', None): 309 result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ') 310 else: 311 if len(self.pdu) > 1: 312 result += f': {self.pdu.hex()}' 313 return result 314 315 316# ----------------------------------------------------------------------------- 317@SMP_Command.subclass( 318 [ 319 ('io_capability', {'size': 1, 'mapper': SMP_Command.io_capability_name}), 320 ('oob_data_flag', 1), 321 ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}), 322 ('maximum_encryption_key_size', 1), 323 ( 324 'initiator_key_distribution', 325 {'size': 1, 'mapper': SMP_Command.key_distribution_str}, 326 ), 327 ( 328 'responder_key_distribution', 329 {'size': 1, 'mapper': SMP_Command.key_distribution_str}, 330 ), 331 ] 332) 333class SMP_Pairing_Request_Command(SMP_Command): 334 ''' 335 See Bluetooth spec @ Vol 3, Part H - 3.5.1 Pairing Request 336 ''' 337 338 io_capability: int 339 oob_data_flag: int 340 auth_req: int 341 maximum_encryption_key_size: int 342 initiator_key_distribution: int 343 responder_key_distribution: int 344 345 346# ----------------------------------------------------------------------------- 347@SMP_Command.subclass( 348 [ 349 ('io_capability', {'size': 1, 'mapper': SMP_Command.io_capability_name}), 350 ('oob_data_flag', 1), 351 ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}), 352 ('maximum_encryption_key_size', 1), 353 ( 354 'initiator_key_distribution', 355 {'size': 1, 'mapper': SMP_Command.key_distribution_str}, 356 ), 357 ( 358 'responder_key_distribution', 359 {'size': 1, 'mapper': SMP_Command.key_distribution_str}, 360 ), 361 ] 362) 363class SMP_Pairing_Response_Command(SMP_Command): 364 ''' 365 See Bluetooth spec @ Vol 3, Part H - 3.5.2 Pairing Response 366 ''' 367 368 io_capability: int 369 oob_data_flag: int 370 auth_req: int 371 maximum_encryption_key_size: int 372 initiator_key_distribution: int 373 responder_key_distribution: int 374 375 376# ----------------------------------------------------------------------------- 377@SMP_Command.subclass([('confirm_value', 16)]) 378class SMP_Pairing_Confirm_Command(SMP_Command): 379 ''' 380 See Bluetooth spec @ Vol 3, Part H - 3.5.3 Pairing Confirm 381 ''' 382 383 confirm_value: bytes 384 385 386# ----------------------------------------------------------------------------- 387@SMP_Command.subclass([('random_value', 16)]) 388class SMP_Pairing_Random_Command(SMP_Command): 389 ''' 390 See Bluetooth spec @ Vol 3, Part H - 3.5.4 Pairing Random 391 ''' 392 393 random_value: bytes 394 395 396# ----------------------------------------------------------------------------- 397@SMP_Command.subclass([('reason', {'size': 1, 'mapper': error_name})]) 398class SMP_Pairing_Failed_Command(SMP_Command): 399 ''' 400 See Bluetooth spec @ Vol 3, Part H - 3.5.5 Pairing Failed 401 ''' 402 403 reason: int 404 405 406# ----------------------------------------------------------------------------- 407@SMP_Command.subclass([('public_key_x', 32), ('public_key_y', 32)]) 408class SMP_Pairing_Public_Key_Command(SMP_Command): 409 ''' 410 See Bluetooth spec @ Vol 3, Part H - 3.5.6 Pairing Public Key 411 ''' 412 413 public_key_x: bytes 414 public_key_y: bytes 415 416 417# ----------------------------------------------------------------------------- 418@SMP_Command.subclass( 419 [ 420 ('dhkey_check', 16), 421 ] 422) 423class SMP_Pairing_DHKey_Check_Command(SMP_Command): 424 ''' 425 See Bluetooth spec @ Vol 3, Part H - 3.5.7 Pairing DHKey Check 426 ''' 427 428 dhkey_check: bytes 429 430 431# ----------------------------------------------------------------------------- 432@SMP_Command.subclass( 433 [ 434 ( 435 'notification_type', 436 {'size': 1, 'mapper': SMP_Command.keypress_notification_type_name}, 437 ), 438 ] 439) 440class SMP_Pairing_Keypress_Notification_Command(SMP_Command): 441 ''' 442 See Bluetooth spec @ Vol 3, Part H - 3.5.8 Keypress Notification 443 ''' 444 445 notification_type: int 446 447 448# ----------------------------------------------------------------------------- 449@SMP_Command.subclass([('long_term_key', 16)]) 450class SMP_Encryption_Information_Command(SMP_Command): 451 ''' 452 See Bluetooth spec @ Vol 3, Part H - 3.6.2 Encryption Information 453 ''' 454 455 long_term_key: bytes 456 457 458# ----------------------------------------------------------------------------- 459@SMP_Command.subclass([('ediv', 2), ('rand', 8)]) 460class SMP_Master_Identification_Command(SMP_Command): 461 ''' 462 See Bluetooth spec @ Vol 3, Part H - 3.6.3 Master Identification 463 ''' 464 465 ediv: int 466 rand: bytes 467 468 469# ----------------------------------------------------------------------------- 470@SMP_Command.subclass([('identity_resolving_key', 16)]) 471class SMP_Identity_Information_Command(SMP_Command): 472 ''' 473 See Bluetooth spec @ Vol 3, Part H - 3.6.4 Identity Information 474 ''' 475 476 identity_resolving_key: bytes 477 478 479# ----------------------------------------------------------------------------- 480@SMP_Command.subclass( 481 [ 482 ('addr_type', Address.ADDRESS_TYPE_SPEC), 483 ('bd_addr', Address.parse_address_preceded_by_type), 484 ] 485) 486class SMP_Identity_Address_Information_Command(SMP_Command): 487 ''' 488 See Bluetooth spec @ Vol 3, Part H - 3.6.5 Identity Address Information 489 ''' 490 491 addr_type: int 492 bd_addr: Address 493 494 495# ----------------------------------------------------------------------------- 496@SMP_Command.subclass([('signature_key', 16)]) 497class SMP_Signing_Information_Command(SMP_Command): 498 ''' 499 See Bluetooth spec @ Vol 3, Part H - 3.6.6 Signing Information 500 ''' 501 502 signature_key: bytes 503 504 505# ----------------------------------------------------------------------------- 506@SMP_Command.subclass( 507 [ 508 ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}), 509 ] 510) 511class SMP_Security_Request_Command(SMP_Command): 512 ''' 513 See Bluetooth spec @ Vol 3, Part H - 3.6.7 Security Request 514 ''' 515 516 auth_req: int 517 518 519# ----------------------------------------------------------------------------- 520def smp_auth_req(bonding: bool, mitm: bool, sc: bool, keypress: bool, ct2: bool) -> int: 521 value = 0 522 if bonding: 523 value |= SMP_BONDING_AUTHREQ 524 if mitm: 525 value |= SMP_MITM_AUTHREQ 526 if sc: 527 value |= SMP_SC_AUTHREQ 528 if keypress: 529 value |= SMP_KEYPRESS_AUTHREQ 530 if ct2: 531 value |= SMP_CT2_AUTHREQ 532 return value 533 534 535# ----------------------------------------------------------------------------- 536class AddressResolver: 537 def __init__(self, resolving_keys): 538 self.resolving_keys = resolving_keys 539 540 def resolve(self, address): 541 address_bytes = bytes(address) 542 hash_part = address_bytes[0:3] 543 prand = address_bytes[3:6] 544 for irk, resolved_address in self.resolving_keys: 545 local_hash = crypto.ah(irk, prand) 546 if local_hash == hash_part: 547 # Match! 548 if resolved_address.address_type == Address.PUBLIC_DEVICE_ADDRESS: 549 resolved_address_type = Address.PUBLIC_IDENTITY_ADDRESS 550 else: 551 resolved_address_type = Address.RANDOM_IDENTITY_ADDRESS 552 return Address( 553 address=str(resolved_address), address_type=resolved_address_type 554 ) 555 556 return None 557 558 559# ----------------------------------------------------------------------------- 560class PairingMethod(enum.IntEnum): 561 JUST_WORKS = 0 562 NUMERIC_COMPARISON = 1 563 PASSKEY = 2 564 OOB = 3 565 CTKD_OVER_CLASSIC = 4 566 567 568# ----------------------------------------------------------------------------- 569class OobContext: 570 """Cryptographic context for LE SC OOB pairing.""" 571 572 ecc_key: crypto.EccKey 573 r: bytes 574 575 def __init__( 576 self, ecc_key: Optional[crypto.EccKey] = None, r: Optional[bytes] = None 577 ) -> None: 578 self.ecc_key = crypto.EccKey.generate() if ecc_key is None else ecc_key 579 self.r = crypto.r() if r is None else r 580 581 def share(self) -> OobSharedData: 582 pkx = self.ecc_key.x[::-1] 583 return OobSharedData(c=crypto.f4(pkx, pkx, self.r, bytes(1)), r=self.r) 584 585 586# ----------------------------------------------------------------------------- 587class OobLegacyContext: 588 """Cryptographic context for LE Legacy OOB pairing.""" 589 590 tk: bytes 591 592 def __init__(self, tk: Optional[bytes] = None) -> None: 593 self.tk = crypto.r() if tk is None else tk 594 595 596# ----------------------------------------------------------------------------- 597@dataclass 598class OobSharedData: 599 """Shareable data for LE SC OOB pairing.""" 600 601 c: bytes 602 r: bytes 603 604 def to_ad(self) -> AdvertisingData: 605 return AdvertisingData( 606 [ 607 (AdvertisingData.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE, self.c), 608 (AdvertisingData.LE_SECURE_CONNECTIONS_RANDOM_VALUE, self.r), 609 ] 610 ) 611 612 def __str__(self) -> str: 613 return f'OOB(C={self.c.hex()}, R={self.r.hex()})' 614 615 616# ----------------------------------------------------------------------------- 617class Session: 618 # I/O Capability to pairing method decision matrix 619 # 620 # See Bluetooth spec @ Vol 3, part H - Table 2.8: Mapping of IO Capabilities to Key 621 # Generation Method 622 # 623 # Map: initiator -> responder -> <method> 624 # where <method> may be a simple entry or a 2-element tuple, with the first element 625 # for legacy pairing and the second for secure connections, when the two are 626 # different. Each entry is either a method name, or, for PASSKEY, a tuple: 627 # (method, initiator_displays, responder_displays) 628 # to specify if the initiator and responder should display (True) or input a code 629 # (False). 630 PAIRING_METHODS = { 631 SMP_DISPLAY_ONLY_IO_CAPABILITY: { 632 SMP_DISPLAY_ONLY_IO_CAPABILITY: PairingMethod.JUST_WORKS, 633 SMP_DISPLAY_YES_NO_IO_CAPABILITY: PairingMethod.JUST_WORKS, 634 SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, True, False), 635 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS, 636 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (PairingMethod.PASSKEY, True, False), 637 }, 638 SMP_DISPLAY_YES_NO_IO_CAPABILITY: { 639 SMP_DISPLAY_ONLY_IO_CAPABILITY: PairingMethod.JUST_WORKS, 640 SMP_DISPLAY_YES_NO_IO_CAPABILITY: ( 641 PairingMethod.JUST_WORKS, 642 PairingMethod.NUMERIC_COMPARISON, 643 ), 644 SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, True, False), 645 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS, 646 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: ( 647 (PairingMethod.PASSKEY, True, False), 648 PairingMethod.NUMERIC_COMPARISON, 649 ), 650 }, 651 SMP_KEYBOARD_ONLY_IO_CAPABILITY: { 652 SMP_DISPLAY_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, False, True), 653 SMP_DISPLAY_YES_NO_IO_CAPABILITY: (PairingMethod.PASSKEY, False, True), 654 SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, False, False), 655 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS, 656 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (PairingMethod.PASSKEY, False, True), 657 }, 658 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: { 659 SMP_DISPLAY_ONLY_IO_CAPABILITY: PairingMethod.JUST_WORKS, 660 SMP_DISPLAY_YES_NO_IO_CAPABILITY: PairingMethod.JUST_WORKS, 661 SMP_KEYBOARD_ONLY_IO_CAPABILITY: PairingMethod.JUST_WORKS, 662 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS, 663 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: PairingMethod.JUST_WORKS, 664 }, 665 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: { 666 SMP_DISPLAY_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, False, True), 667 SMP_DISPLAY_YES_NO_IO_CAPABILITY: ( 668 (PairingMethod.PASSKEY, False, True), 669 PairingMethod.NUMERIC_COMPARISON, 670 ), 671 SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, True, False), 672 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS, 673 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: ( 674 (PairingMethod.PASSKEY, True, False), 675 PairingMethod.NUMERIC_COMPARISON, 676 ), 677 }, 678 } 679 680 ea: bytes 681 eb: bytes 682 ltk: bytes 683 preq: bytes 684 pres: bytes 685 tk: bytes 686 687 def __init__( 688 self, 689 manager: Manager, 690 connection: Connection, 691 pairing_config: PairingConfig, 692 is_initiator: bool, 693 ) -> None: 694 self.manager = manager 695 self.connection = connection 696 self.stk = None 697 self.ltk_ediv = 0 698 self.ltk_rand = bytes(8) 699 self.link_key: Optional[bytes] = None 700 self.initiator_key_distribution: int = 0 701 self.responder_key_distribution: int = 0 702 self.peer_random_value: Optional[bytes] = None 703 self.peer_public_key_x: bytes = bytes(32) 704 self.peer_public_key_y = bytes(32) 705 self.peer_ltk = None 706 self.peer_ediv = None 707 self.peer_rand: Optional[bytes] = None 708 self.peer_identity_resolving_key = None 709 self.peer_bd_addr: Optional[Address] = None 710 self.peer_signature_key = None 711 self.peer_expected_distributions: List[Type[SMP_Command]] = [] 712 self.dh_key = b'' 713 self.confirm_value = None 714 self.passkey: Optional[int] = None 715 self.passkey_ready = asyncio.Event() 716 self.passkey_step = 0 717 self.passkey_display = False 718 self.pairing_method: PairingMethod = PairingMethod.JUST_WORKS 719 self.pairing_config = pairing_config 720 self.wait_before_continuing: Optional[asyncio.Future[None]] = None 721 self.completed = False 722 self.ctkd_task: Optional[Awaitable[None]] = None 723 724 # Decide if we're the initiator or the responder 725 self.is_initiator = is_initiator 726 self.is_responder = not self.is_initiator 727 728 # Listen for connection events 729 connection.on('disconnection', self.on_disconnection) 730 connection.on( 731 'connection_encryption_change', self.on_connection_encryption_change 732 ) 733 connection.on( 734 'connection_encryption_key_refresh', 735 self.on_connection_encryption_key_refresh, 736 ) 737 738 # Create a future that can be used to wait for the session to complete 739 if self.is_initiator: 740 self.pairing_result: Optional[asyncio.Future[None]] = ( 741 asyncio.get_running_loop().create_future() 742 ) 743 else: 744 self.pairing_result = None 745 746 # Key Distribution (default values before negotiation) 747 self.initiator_key_distribution = ( 748 pairing_config.delegate.local_initiator_key_distribution 749 ) 750 self.responder_key_distribution = ( 751 pairing_config.delegate.local_responder_key_distribution 752 ) 753 754 # Authentication Requirements Flags - Vol 3, Part H, Figure 3.3 755 self.bonding: bool = pairing_config.bonding 756 self.sc: bool = pairing_config.sc 757 self.mitm: bool = pairing_config.mitm 758 self.keypress = False 759 self.ct2: bool = False 760 761 # I/O Capabilities 762 self.io_capability = pairing_config.delegate.io_capability 763 self.peer_io_capability = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY 764 765 # OOB 766 self.oob_data_flag = 0 if pairing_config.oob is None else 1 767 768 # Set up addresses 769 self_address = connection.self_address 770 peer_address = connection.peer_resolvable_address or connection.peer_address 771 if self.is_initiator: 772 self.ia = bytes(self_address) 773 self.iat = 1 if self_address.is_random else 0 774 self.ra = bytes(peer_address) 775 self.rat = 1 if peer_address.is_random else 0 776 else: 777 self.ra = bytes(self_address) 778 self.rat = 1 if self_address.is_random else 0 779 self.ia = bytes(peer_address) 780 self.iat = 1 if peer_address.is_random else 0 781 782 # Select the ECC key, TK and r initial value 783 if pairing_config.oob: 784 self.peer_oob_data = pairing_config.oob.peer_data 785 if pairing_config.sc: 786 if pairing_config.oob.our_context is None: 787 raise ValueError( 788 "oob pairing config requires a context when sc is True" 789 ) 790 self.r = pairing_config.oob.our_context.r 791 self.ecc_key = pairing_config.oob.our_context.ecc_key 792 if pairing_config.oob.legacy_context is not None: 793 self.tk = pairing_config.oob.legacy_context.tk 794 else: 795 if pairing_config.oob.legacy_context is None: 796 raise ValueError( 797 "oob pairing config requires a legacy context when sc is False" 798 ) 799 self.r = bytes(16) 800 self.ecc_key = manager.ecc_key 801 self.tk = pairing_config.oob.legacy_context.tk 802 else: 803 self.peer_oob_data = None 804 self.r = bytes(16) 805 self.ecc_key = manager.ecc_key 806 self.tk = bytes(16) 807 808 @property 809 def pkx(self) -> Tuple[bytes, bytes]: 810 return (self.ecc_key.x[::-1], self.peer_public_key_x) 811 812 @property 813 def pka(self) -> bytes: 814 return self.pkx[0 if self.is_initiator else 1] 815 816 @property 817 def pkb(self) -> bytes: 818 return self.pkx[0 if self.is_responder else 1] 819 820 @property 821 def nx(self) -> Tuple[bytes, bytes]: 822 assert self.peer_random_value 823 return (self.r, self.peer_random_value) 824 825 @property 826 def na(self) -> bytes: 827 return self.nx[0 if self.is_initiator else 1] 828 829 @property 830 def nb(self) -> bytes: 831 return self.nx[0 if self.is_responder else 1] 832 833 @property 834 def auth_req(self) -> int: 835 return smp_auth_req(self.bonding, self.mitm, self.sc, self.keypress, self.ct2) 836 837 def get_long_term_key(self, rand: bytes, ediv: int) -> Optional[bytes]: 838 if not self.sc and not self.completed: 839 if rand == self.ltk_rand and ediv == self.ltk_ediv: 840 return self.stk 841 else: 842 return self.ltk 843 844 return None 845 846 def decide_pairing_method( 847 self, 848 auth_req: int, 849 initiator_io_capability: int, 850 responder_io_capability: int, 851 ) -> None: 852 if self.connection.transport == BT_BR_EDR_TRANSPORT: 853 self.pairing_method = PairingMethod.CTKD_OVER_CLASSIC 854 return 855 if (not self.mitm) and (auth_req & SMP_MITM_AUTHREQ == 0): 856 self.pairing_method = PairingMethod.JUST_WORKS 857 return 858 859 details = self.PAIRING_METHODS[initiator_io_capability][responder_io_capability] # type: ignore[index] 860 if isinstance(details, tuple) and len(details) == 2: 861 # One entry for legacy pairing and one for secure connections 862 details = details[1 if self.sc else 0] 863 if isinstance(details, PairingMethod): 864 # Just a method ID 865 self.pairing_method = details 866 else: 867 # PASSKEY method, with a method ID and display/input flags 868 assert isinstance(details[0], PairingMethod) 869 self.pairing_method = details[0] 870 self.passkey_display = details[1 if self.is_initiator else 2] 871 872 def check_expected_value( 873 self, expected: bytes, received: bytes, error: int 874 ) -> bool: 875 logger.debug(f'expected={expected.hex()} got={received.hex()}') 876 if expected != received: 877 logger.info(color('pairing confirm/check mismatch', 'red')) 878 self.send_pairing_failed(error) 879 return False 880 return True 881 882 def prompt_user_for_confirmation(self, next_steps: Callable[[], None]) -> None: 883 async def prompt() -> None: 884 logger.debug('ask for confirmation') 885 try: 886 response = await self.pairing_config.delegate.confirm() 887 if response: 888 next_steps() 889 return 890 except Exception as error: 891 logger.warning(f'exception while confirm: {error}') 892 893 self.send_pairing_failed(SMP_CONFIRM_VALUE_FAILED_ERROR) 894 895 self.connection.abort_on('disconnection', prompt()) 896 897 def prompt_user_for_numeric_comparison( 898 self, code: int, next_steps: Callable[[], None] 899 ) -> None: 900 async def prompt() -> None: 901 logger.debug(f'verification code: {code}') 902 try: 903 response = await self.pairing_config.delegate.compare_numbers( 904 code, digits=6 905 ) 906 if response: 907 next_steps() 908 return 909 except Exception as error: 910 logger.warning(f'exception while prompting: {error}') 911 912 self.send_pairing_failed(SMP_CONFIRM_VALUE_FAILED_ERROR) 913 914 self.connection.abort_on('disconnection', prompt()) 915 916 def prompt_user_for_number(self, next_steps: Callable[[int], None]) -> None: 917 async def prompt() -> None: 918 logger.debug('prompting user for passkey') 919 try: 920 passkey = await self.pairing_config.delegate.get_number() 921 if passkey is None: 922 logger.debug('Passkey request rejected') 923 self.send_pairing_failed(SMP_PASSKEY_ENTRY_FAILED_ERROR) 924 return 925 logger.debug(f'user input: {passkey}') 926 next_steps(passkey) 927 except Exception as error: 928 logger.warning(f'exception while prompting: {error}') 929 self.send_pairing_failed(SMP_PASSKEY_ENTRY_FAILED_ERROR) 930 931 self.connection.abort_on('disconnection', prompt()) 932 933 def display_passkey(self) -> None: 934 # Generate random Passkey/PIN code 935 self.passkey = secrets.randbelow(1000000) 936 assert self.passkey is not None 937 logger.debug(f'Pairing PIN CODE: {self.passkey:06}') 938 self.passkey_ready.set() 939 940 # The value of TK is computed from the PIN code 941 if not self.sc: 942 self.tk = self.passkey.to_bytes(16, byteorder='little') 943 logger.debug(f'TK from passkey = {self.tk.hex()}') 944 945 try: 946 self.connection.abort_on( 947 'disconnection', 948 self.pairing_config.delegate.display_number(self.passkey, digits=6), 949 ) 950 except Exception as error: 951 logger.warning(f'exception while displaying number: {error}') 952 953 def input_passkey(self, next_steps: Optional[Callable[[], None]] = None) -> None: 954 # Prompt the user for the passkey displayed on the peer 955 def after_input(passkey: int) -> None: 956 self.passkey = passkey 957 958 if not self.sc: 959 self.tk = passkey.to_bytes(16, byteorder='little') 960 logger.debug(f'TK from passkey = {self.tk.hex()}') 961 962 self.passkey_ready.set() 963 964 if next_steps is not None: 965 next_steps() 966 967 self.prompt_user_for_number(after_input) 968 969 def display_or_input_passkey( 970 self, next_steps: Optional[Callable[[], None]] = None 971 ) -> None: 972 if self.passkey_display: 973 self.display_passkey() 974 if next_steps is not None: 975 next_steps() 976 else: 977 self.input_passkey(next_steps) 978 979 def send_command(self, command: SMP_Command) -> None: 980 self.manager.send_command(self.connection, command) 981 982 def send_pairing_failed(self, error: int) -> None: 983 self.send_command(SMP_Pairing_Failed_Command(reason=error)) 984 self.on_pairing_failure(error) 985 986 def send_pairing_request_command(self) -> None: 987 self.manager.on_session_start(self) 988 989 command = SMP_Pairing_Request_Command( 990 io_capability=self.io_capability, 991 oob_data_flag=self.oob_data_flag, 992 auth_req=self.auth_req, 993 maximum_encryption_key_size=16, 994 initiator_key_distribution=self.initiator_key_distribution, 995 responder_key_distribution=self.responder_key_distribution, 996 ) 997 self.preq = bytes(command) 998 self.send_command(command) 999 1000 def send_pairing_response_command(self) -> None: 1001 response = SMP_Pairing_Response_Command( 1002 io_capability=self.io_capability, 1003 oob_data_flag=self.oob_data_flag, 1004 auth_req=self.auth_req, 1005 maximum_encryption_key_size=16, 1006 initiator_key_distribution=self.initiator_key_distribution, 1007 responder_key_distribution=self.responder_key_distribution, 1008 ) 1009 self.pres = bytes(response) 1010 self.send_command(response) 1011 1012 def send_pairing_confirm_command(self) -> None: 1013 self.r = crypto.r() 1014 logger.debug(f'generated random: {self.r.hex()}') 1015 1016 if self.sc: 1017 1018 async def next_steps() -> None: 1019 if self.pairing_method in ( 1020 PairingMethod.JUST_WORKS, 1021 PairingMethod.NUMERIC_COMPARISON, 1022 ): 1023 z = 0 1024 elif self.pairing_method == PairingMethod.PASSKEY: 1025 # We need a passkey 1026 await self.passkey_ready.wait() 1027 assert self.passkey 1028 1029 z = 0x80 + ((self.passkey >> self.passkey_step) & 1) 1030 else: 1031 return 1032 1033 if self.is_initiator: 1034 confirm_value = crypto.f4(self.pka, self.pkb, self.r, bytes([z])) 1035 else: 1036 confirm_value = crypto.f4(self.pkb, self.pka, self.r, bytes([z])) 1037 1038 self.send_command( 1039 SMP_Pairing_Confirm_Command(confirm_value=confirm_value) 1040 ) 1041 1042 # Perform the next steps asynchronously in case we need to wait for input 1043 self.connection.abort_on('disconnection', next_steps()) 1044 else: 1045 confirm_value = crypto.c1( 1046 self.tk, 1047 self.r, 1048 self.preq, 1049 self.pres, 1050 self.iat, 1051 self.rat, 1052 self.ia, 1053 self.ra, 1054 ) 1055 1056 self.send_command(SMP_Pairing_Confirm_Command(confirm_value=confirm_value)) 1057 1058 def send_pairing_random_command(self) -> None: 1059 self.send_command(SMP_Pairing_Random_Command(random_value=self.r)) 1060 1061 def send_public_key_command(self) -> None: 1062 self.send_command( 1063 SMP_Pairing_Public_Key_Command( 1064 public_key_x=self.ecc_key.x[::-1], 1065 public_key_y=self.ecc_key.y[::-1], 1066 ) 1067 ) 1068 1069 def send_pairing_dhkey_check_command(self) -> None: 1070 self.send_command( 1071 SMP_Pairing_DHKey_Check_Command( 1072 dhkey_check=self.ea if self.is_initiator else self.eb 1073 ) 1074 ) 1075 1076 def send_identity_address_command(self) -> None: 1077 identity_address = { 1078 None: self.connection.self_address, 1079 Address.PUBLIC_DEVICE_ADDRESS: self.manager.device.public_address, 1080 Address.RANDOM_DEVICE_ADDRESS: self.manager.device.random_address, 1081 }[self.pairing_config.identity_address_type] 1082 self.send_command( 1083 SMP_Identity_Address_Information_Command( 1084 addr_type=identity_address.address_type, 1085 bd_addr=identity_address, 1086 ) 1087 ) 1088 1089 def start_encryption(self, key: bytes) -> None: 1090 # We can now encrypt the connection with the short term key, so that we can 1091 # distribute the long term and/or other keys over an encrypted connection 1092 self.manager.device.host.send_command_sync( 1093 HCI_LE_Enable_Encryption_Command( 1094 connection_handle=self.connection.handle, 1095 random_number=bytes(8), 1096 encrypted_diversifier=0, 1097 long_term_key=key, 1098 ) 1099 ) 1100 1101 @classmethod 1102 def derive_ltk(cls, link_key: bytes, ct2: bool) -> bytes: 1103 '''Derives Long Term Key from Link Key. 1104 1105 Args: 1106 link_key: BR/EDR Link Key bytes in little-endian. 1107 ct2: whether ct2 is supported on both devices. 1108 Returns: 1109 LE Long Tern Key bytes in little-endian. 1110 ''' 1111 ilk = ( 1112 crypto.h7(salt=SMP_CTKD_H7_BRLE_SALT, w=link_key) 1113 if ct2 1114 else crypto.h6(link_key, b'tmp2') 1115 ) 1116 return crypto.h6(ilk, b'brle') 1117 1118 @classmethod 1119 def derive_link_key(cls, ltk: bytes, ct2: bool) -> bytes: 1120 '''Derives Link Key from Long Term Key. 1121 1122 Args: 1123 ltk: LE Long Term Key bytes in little-endian. 1124 ct2: whether ct2 is supported on both devices. 1125 Returns: 1126 BR/EDR Link Key bytes in little-endian. 1127 ''' 1128 ilk = ( 1129 crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=ltk) 1130 if ct2 1131 else crypto.h6(ltk, b'tmp1') 1132 ) 1133 return crypto.h6(ilk, b'lebr') 1134 1135 async def get_link_key_and_derive_ltk(self) -> None: 1136 '''Retrieves BR/EDR Link Key from storage and derive it to LE LTK.''' 1137 self.link_key = await self.manager.device.get_link_key( 1138 self.connection.peer_address 1139 ) 1140 if self.link_key is None: 1141 logging.warning( 1142 'Try to derive LTK but host does not have the LK. Send a SMP_PAIRING_FAILED but the procedure will not be paused!' 1143 ) 1144 self.send_pairing_failed( 1145 SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR 1146 ) 1147 else: 1148 self.ltk = self.derive_ltk(self.link_key, self.ct2) 1149 1150 def distribute_keys(self) -> None: 1151 # Distribute the keys as required 1152 if self.is_initiator: 1153 # CTKD: Derive LTK from LinkKey 1154 if ( 1155 self.connection.transport == BT_BR_EDR_TRANSPORT 1156 and self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG 1157 ): 1158 self.ctkd_task = self.connection.abort_on( 1159 'disconnection', self.get_link_key_and_derive_ltk() 1160 ) 1161 elif not self.sc: 1162 # Distribute the LTK, EDIV and RAND 1163 if self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG: 1164 self.send_command( 1165 SMP_Encryption_Information_Command(long_term_key=self.ltk) 1166 ) 1167 self.send_command( 1168 SMP_Master_Identification_Command( 1169 ediv=self.ltk_ediv, rand=self.ltk_rand 1170 ) 1171 ) 1172 1173 # Distribute IRK & BD ADDR 1174 if self.initiator_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG: 1175 self.send_command( 1176 SMP_Identity_Information_Command( 1177 identity_resolving_key=self.manager.device.irk 1178 ) 1179 ) 1180 self.send_identity_address_command() 1181 1182 # Distribute CSRK 1183 csrk = bytes(16) # FIXME: testing 1184 if self.initiator_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG: 1185 self.send_command(SMP_Signing_Information_Command(signature_key=csrk)) 1186 1187 # CTKD, calculate BR/EDR link key 1188 if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG: 1189 self.link_key = self.derive_link_key(self.ltk, self.ct2) 1190 1191 else: 1192 # CTKD: Derive LTK from LinkKey 1193 if ( 1194 self.connection.transport == BT_BR_EDR_TRANSPORT 1195 and self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG 1196 ): 1197 self.ctkd_task = self.connection.abort_on( 1198 'disconnection', self.get_link_key_and_derive_ltk() 1199 ) 1200 # Distribute the LTK, EDIV and RAND 1201 elif not self.sc: 1202 if self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG: 1203 self.send_command( 1204 SMP_Encryption_Information_Command(long_term_key=self.ltk) 1205 ) 1206 self.send_command( 1207 SMP_Master_Identification_Command( 1208 ediv=self.ltk_ediv, rand=self.ltk_rand 1209 ) 1210 ) 1211 1212 # Distribute IRK & BD ADDR 1213 if self.responder_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG: 1214 self.send_command( 1215 SMP_Identity_Information_Command( 1216 identity_resolving_key=self.manager.device.irk 1217 ) 1218 ) 1219 self.send_identity_address_command() 1220 1221 # Distribute CSRK 1222 csrk = bytes(16) # FIXME: testing 1223 if self.responder_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG: 1224 self.send_command(SMP_Signing_Information_Command(signature_key=csrk)) 1225 1226 # CTKD, calculate BR/EDR link key 1227 if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG: 1228 self.link_key = self.derive_link_key(self.ltk, self.ct2) 1229 1230 def compute_peer_expected_distributions(self, key_distribution_flags: int) -> None: 1231 # Set our expectations for what to wait for in the key distribution phase 1232 self.peer_expected_distributions = [] 1233 if not self.sc and self.connection.transport == BT_LE_TRANSPORT: 1234 if key_distribution_flags & SMP_ENC_KEY_DISTRIBUTION_FLAG != 0: 1235 self.peer_expected_distributions.append( 1236 SMP_Encryption_Information_Command 1237 ) 1238 self.peer_expected_distributions.append( 1239 SMP_Master_Identification_Command 1240 ) 1241 if key_distribution_flags & SMP_ID_KEY_DISTRIBUTION_FLAG != 0: 1242 self.peer_expected_distributions.append(SMP_Identity_Information_Command) 1243 self.peer_expected_distributions.append( 1244 SMP_Identity_Address_Information_Command 1245 ) 1246 if key_distribution_flags & SMP_SIGN_KEY_DISTRIBUTION_FLAG != 0: 1247 self.peer_expected_distributions.append(SMP_Signing_Information_Command) 1248 logger.debug( 1249 'expecting distributions: ' 1250 f'{[c.__name__ for c in self.peer_expected_distributions]}' 1251 ) 1252 1253 def check_key_distribution(self, command_class: Type[SMP_Command]) -> None: 1254 # First, check that the connection is encrypted 1255 if not self.connection.is_encrypted: 1256 logger.warning( 1257 color('received key distribution on a non-encrypted connection', 'red') 1258 ) 1259 self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR) 1260 return 1261 1262 # Check that this command class is expected 1263 if command_class in self.peer_expected_distributions: 1264 self.peer_expected_distributions.remove(command_class) 1265 logger.debug( 1266 'remaining distributions: ' 1267 f'{[c.__name__ for c in self.peer_expected_distributions]}' 1268 ) 1269 if not self.peer_expected_distributions: 1270 self.on_peer_key_distribution_complete() 1271 else: 1272 logger.warning( 1273 color( 1274 '!!! unexpected key distribution command: ' 1275 f'{command_class.__name__}', 1276 'red', 1277 ) 1278 ) 1279 self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR) 1280 1281 async def pair(self) -> None: 1282 # Start pairing as an initiator 1283 # TODO: check that this session isn't already active 1284 1285 # Send the pairing request to start the process 1286 self.send_pairing_request_command() 1287 1288 # Wait for the pairing process to finish 1289 assert self.pairing_result 1290 await self.connection.abort_on('disconnection', self.pairing_result) 1291 1292 def on_disconnection(self, _: int) -> None: 1293 self.connection.remove_listener('disconnection', self.on_disconnection) 1294 self.connection.remove_listener( 1295 'connection_encryption_change', self.on_connection_encryption_change 1296 ) 1297 self.connection.remove_listener( 1298 'connection_encryption_key_refresh', 1299 self.on_connection_encryption_key_refresh, 1300 ) 1301 self.manager.on_session_end(self) 1302 1303 def on_peer_key_distribution_complete(self) -> None: 1304 # The initiator can now send its keys 1305 if self.is_initiator: 1306 self.distribute_keys() 1307 1308 self.connection.abort_on('disconnection', self.on_pairing()) 1309 1310 def on_connection_encryption_change(self) -> None: 1311 if self.connection.is_encrypted: 1312 if self.is_responder: 1313 # The responder distributes its keys first, the initiator later 1314 self.distribute_keys() 1315 1316 # If we're not expecting key distributions from the peer, we're done 1317 if not self.peer_expected_distributions: 1318 self.on_peer_key_distribution_complete() 1319 1320 def on_connection_encryption_key_refresh(self) -> None: 1321 # Do as if the connection had just been encrypted 1322 self.on_connection_encryption_change() 1323 1324 async def on_pairing(self) -> None: 1325 logger.debug('pairing complete') 1326 1327 if self.completed: 1328 return 1329 1330 self.completed = True 1331 1332 if self.pairing_result is not None and not self.pairing_result.done(): 1333 self.pairing_result.set_result(None) 1334 1335 # Use the peer address from the pairing protocol or the connection 1336 if self.peer_bd_addr is not None: 1337 peer_address = self.peer_bd_addr 1338 else: 1339 peer_address = self.connection.peer_address 1340 1341 # Wait for link key fetch and key derivation 1342 if self.ctkd_task is not None: 1343 await self.ctkd_task 1344 self.ctkd_task = None 1345 1346 # Create an object to hold the keys 1347 keys = PairingKeys() 1348 keys.address_type = peer_address.address_type 1349 authenticated = self.pairing_method != PairingMethod.JUST_WORKS 1350 if self.sc or self.connection.transport == BT_BR_EDR_TRANSPORT: 1351 keys.ltk = PairingKeys.Key(value=self.ltk, authenticated=authenticated) 1352 else: 1353 our_ltk_key = PairingKeys.Key( 1354 value=self.ltk, 1355 authenticated=authenticated, 1356 ediv=self.ltk_ediv, 1357 rand=self.ltk_rand, 1358 ) 1359 peer_ltk_key = PairingKeys.Key( 1360 value=self.peer_ltk, 1361 authenticated=authenticated, 1362 ediv=self.peer_ediv, 1363 rand=self.peer_rand, 1364 ) 1365 if self.is_initiator: 1366 keys.ltk_central = peer_ltk_key 1367 keys.ltk_peripheral = our_ltk_key 1368 else: 1369 keys.ltk_central = our_ltk_key 1370 keys.ltk_peripheral = peer_ltk_key 1371 if self.peer_identity_resolving_key is not None: 1372 keys.irk = PairingKeys.Key( 1373 value=self.peer_identity_resolving_key, authenticated=authenticated 1374 ) 1375 if self.peer_signature_key is not None: 1376 keys.csrk = PairingKeys.Key( 1377 value=self.peer_signature_key, authenticated=authenticated 1378 ) 1379 if self.link_key is not None: 1380 keys.link_key = PairingKeys.Key( 1381 value=self.link_key, authenticated=authenticated 1382 ) 1383 await self.manager.on_pairing(self, peer_address, keys) 1384 1385 def on_pairing_failure(self, reason: int) -> None: 1386 logger.warning(f'pairing failure ({error_name(reason)})') 1387 1388 if self.completed: 1389 return 1390 1391 self.completed = True 1392 1393 error = ProtocolError(reason, 'smp', error_name(reason)) 1394 if self.pairing_result is not None and not self.pairing_result.done(): 1395 self.pairing_result.set_exception(error) 1396 self.manager.on_pairing_failure(self, reason) 1397 1398 def on_smp_command(self, command: SMP_Command) -> None: 1399 # Find the handler method 1400 handler_name = f'on_{command.name.lower()}' 1401 handler = getattr(self, handler_name, None) 1402 if handler is not None: 1403 try: 1404 handler(command) 1405 except Exception as error: 1406 logger.exception(f'{color("!!! Exception in handler:", "red")} {error}') 1407 response = SMP_Pairing_Failed_Command( 1408 reason=SMP_UNSPECIFIED_REASON_ERROR 1409 ) 1410 self.send_command(response) 1411 else: 1412 logger.error(color('SMP command not handled???', 'red')) 1413 1414 def on_smp_pairing_request_command( 1415 self, command: SMP_Pairing_Request_Command 1416 ) -> None: 1417 self.connection.abort_on( 1418 'disconnection', self.on_smp_pairing_request_command_async(command) 1419 ) 1420 1421 async def on_smp_pairing_request_command_async( 1422 self, command: SMP_Pairing_Request_Command 1423 ) -> None: 1424 # Check if the request should proceed 1425 try: 1426 accepted = await self.pairing_config.delegate.accept() 1427 except Exception as error: 1428 logger.warning(f'exception while accepting: {error}') 1429 accepted = False 1430 if not accepted: 1431 logger.debug('pairing rejected by delegate') 1432 self.send_pairing_failed(SMP_PAIRING_NOT_SUPPORTED_ERROR) 1433 return 1434 1435 # Save the request 1436 self.preq = bytes(command) 1437 1438 # Bonding and SC require both sides to request/support it 1439 self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0) 1440 self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0) 1441 self.ct2 = self.ct2 and (command.auth_req & SMP_CT2_AUTHREQ != 0) 1442 1443 # Infer the pairing method 1444 if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or ( 1445 not self.sc and (self.oob_data_flag != 0 and command.oob_data_flag != 0) 1446 ): 1447 # Use OOB 1448 self.pairing_method = PairingMethod.OOB 1449 if not self.sc and self.tk is None: 1450 # For legacy OOB, TK is required. 1451 logger.warning("legacy OOB without TK") 1452 self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR) 1453 return 1454 if command.oob_data_flag == 0: 1455 # The peer doesn't have OOB data, use r=0 1456 self.r = bytes(16) 1457 else: 1458 # Decide which pairing method to use from the IO capability 1459 self.decide_pairing_method( 1460 command.auth_req, 1461 command.io_capability, 1462 self.io_capability, 1463 ) 1464 1465 logger.debug(f'pairing method: {self.pairing_method.name}') 1466 1467 # Key distribution 1468 ( 1469 self.initiator_key_distribution, 1470 self.responder_key_distribution, 1471 ) = await self.pairing_config.delegate.key_distribution_response( 1472 command.initiator_key_distribution, command.responder_key_distribution 1473 ) 1474 self.compute_peer_expected_distributions(self.initiator_key_distribution) 1475 1476 # The pairing is now starting 1477 self.manager.on_session_start(self) 1478 1479 # Display a passkey if we need to 1480 if not self.sc: 1481 if self.pairing_method == PairingMethod.PASSKEY and self.passkey_display: 1482 self.display_passkey() 1483 1484 # Respond 1485 self.send_pairing_response_command() 1486 1487 # Vol 3, Part C, 5.2.2.1.3 1488 # CTKD over BR/EDR should happen after the connection has been encrypted, 1489 # so when receiving pairing requests, responder should start distributing keys 1490 if ( 1491 self.connection.transport == BT_BR_EDR_TRANSPORT 1492 and self.connection.is_encrypted 1493 and self.is_responder 1494 and accepted 1495 ): 1496 self.distribute_keys() 1497 1498 def on_smp_pairing_response_command( 1499 self, command: SMP_Pairing_Response_Command 1500 ) -> None: 1501 if self.is_responder: 1502 logger.warning(color('received pairing response as a responder', 'red')) 1503 return 1504 1505 # Save the response 1506 self.pres = bytes(command) 1507 self.peer_io_capability = command.io_capability 1508 1509 # Bonding and SC require both sides to request/support it 1510 self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0) 1511 self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0) 1512 1513 # Infer the pairing method 1514 if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or ( 1515 not self.sc and (self.oob_data_flag != 0 and command.oob_data_flag != 0) 1516 ): 1517 # Use OOB 1518 self.pairing_method = PairingMethod.OOB 1519 if not self.sc and self.tk is None: 1520 # For legacy OOB, TK is required. 1521 logger.warning("legacy OOB without TK") 1522 self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR) 1523 return 1524 if command.oob_data_flag == 0: 1525 # The peer doesn't have OOB data, use r=0 1526 self.r = bytes(16) 1527 else: 1528 # Decide which pairing method to use from the IO capability 1529 self.decide_pairing_method( 1530 command.auth_req, self.io_capability, command.io_capability 1531 ) 1532 1533 logger.debug(f'pairing method: {self.pairing_method.name}') 1534 1535 # Key distribution 1536 if ( 1537 command.initiator_key_distribution & ~self.initiator_key_distribution != 0 1538 ) or ( 1539 command.responder_key_distribution & ~self.responder_key_distribution != 0 1540 ): 1541 # The response isn't a subset of the request 1542 self.send_pairing_failed(SMP_INVALID_PARAMETERS_ERROR) 1543 return 1544 self.initiator_key_distribution = command.initiator_key_distribution 1545 self.responder_key_distribution = command.responder_key_distribution 1546 self.compute_peer_expected_distributions(self.responder_key_distribution) 1547 1548 # Start phase 2 1549 if self.pairing_method == PairingMethod.CTKD_OVER_CLASSIC: 1550 # Authentication is already done in SMP, so remote shall start keys distribution immediately 1551 return 1552 elif self.sc: 1553 if self.pairing_method == PairingMethod.PASSKEY: 1554 self.display_or_input_passkey() 1555 1556 self.send_public_key_command() 1557 else: 1558 if self.pairing_method == PairingMethod.PASSKEY: 1559 self.display_or_input_passkey(self.send_pairing_confirm_command) 1560 else: 1561 self.send_pairing_confirm_command() 1562 1563 def on_smp_pairing_confirm_command_legacy( 1564 self, _: SMP_Pairing_Confirm_Command 1565 ) -> None: 1566 if self.is_initiator: 1567 self.send_pairing_random_command() 1568 else: 1569 # If the method is PASSKEY, now is the time to input the code 1570 if ( 1571 self.pairing_method == PairingMethod.PASSKEY 1572 and not self.passkey_display 1573 ): 1574 self.input_passkey(self.send_pairing_confirm_command) 1575 else: 1576 self.send_pairing_confirm_command() 1577 1578 def on_smp_pairing_confirm_command_secure_connections( 1579 self, _: SMP_Pairing_Confirm_Command 1580 ) -> None: 1581 if self.pairing_method in ( 1582 PairingMethod.JUST_WORKS, 1583 PairingMethod.NUMERIC_COMPARISON, 1584 ): 1585 if self.is_initiator: 1586 self.r = crypto.r() 1587 self.send_pairing_random_command() 1588 elif self.pairing_method == PairingMethod.PASSKEY: 1589 if self.is_initiator: 1590 self.send_pairing_random_command() 1591 else: 1592 self.send_pairing_confirm_command() 1593 1594 def on_smp_pairing_confirm_command( 1595 self, command: SMP_Pairing_Confirm_Command 1596 ) -> None: 1597 self.confirm_value = command.confirm_value 1598 if self.sc: 1599 self.on_smp_pairing_confirm_command_secure_connections(command) 1600 else: 1601 self.on_smp_pairing_confirm_command_legacy(command) 1602 1603 def on_smp_pairing_random_command_legacy( 1604 self, command: SMP_Pairing_Random_Command 1605 ) -> None: 1606 # Check that the confirmation values match 1607 confirm_verifier = crypto.c1( 1608 self.tk, 1609 command.random_value, 1610 self.preq, 1611 self.pres, 1612 self.iat, 1613 self.rat, 1614 self.ia, 1615 self.ra, 1616 ) 1617 assert self.confirm_value 1618 if not self.check_expected_value( 1619 self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR 1620 ): 1621 return 1622 1623 # Compute STK 1624 if self.is_initiator: 1625 mrand = self.r 1626 srand = command.random_value 1627 else: 1628 srand = self.r 1629 mrand = command.random_value 1630 self.stk = crypto.s1(self.tk, srand, mrand) 1631 logger.debug(f'STK = {self.stk.hex()}') 1632 1633 # Generate LTK 1634 self.ltk = crypto.r() 1635 1636 if self.is_initiator: 1637 self.start_encryption(self.stk) 1638 else: 1639 self.send_pairing_random_command() 1640 1641 def on_smp_pairing_random_command_secure_connections( 1642 self, command: SMP_Pairing_Random_Command 1643 ) -> None: 1644 if self.pairing_method == PairingMethod.PASSKEY and self.passkey is None: 1645 logger.warning('no passkey entered, ignoring command') 1646 return 1647 1648 # pylint: disable=too-many-return-statements 1649 if self.is_initiator: 1650 if self.pairing_method in ( 1651 PairingMethod.JUST_WORKS, 1652 PairingMethod.NUMERIC_COMPARISON, 1653 ): 1654 assert self.confirm_value 1655 # Check that the random value matches what was committed to earlier 1656 confirm_verifier = crypto.f4( 1657 self.pkb, self.pka, command.random_value, bytes([0]) 1658 ) 1659 if not self.check_expected_value( 1660 self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR 1661 ): 1662 return 1663 elif self.pairing_method == PairingMethod.PASSKEY: 1664 assert self.passkey and self.confirm_value 1665 # Check that the random value matches what was committed to earlier 1666 confirm_verifier = crypto.f4( 1667 self.pkb, 1668 self.pka, 1669 command.random_value, 1670 bytes([0x80 + ((self.passkey >> self.passkey_step) & 1)]), 1671 ) 1672 if not self.check_expected_value( 1673 self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR 1674 ): 1675 return 1676 1677 # Move on to the next iteration 1678 self.passkey_step += 1 1679 logger.debug(f'passkey finished step {self.passkey_step} of 20') 1680 if self.passkey_step < 20: 1681 self.send_pairing_confirm_command() 1682 return 1683 elif self.pairing_method != PairingMethod.OOB: 1684 return 1685 else: 1686 if self.pairing_method in ( 1687 PairingMethod.JUST_WORKS, 1688 PairingMethod.NUMERIC_COMPARISON, 1689 PairingMethod.OOB, 1690 ): 1691 self.send_pairing_random_command() 1692 elif self.pairing_method == PairingMethod.PASSKEY: 1693 assert self.passkey and self.confirm_value 1694 # Check that the random value matches what was committed to earlier 1695 confirm_verifier = crypto.f4( 1696 self.pka, 1697 self.pkb, 1698 command.random_value, 1699 bytes([0x80 + ((self.passkey >> self.passkey_step) & 1)]), 1700 ) 1701 if not self.check_expected_value( 1702 self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR 1703 ): 1704 return 1705 1706 self.send_pairing_random_command() 1707 1708 # Move on to the next iteration 1709 self.passkey_step += 1 1710 logger.debug(f'passkey finished step {self.passkey_step} of 20') 1711 if self.passkey_step < 20: 1712 self.r = crypto.r() 1713 return 1714 else: 1715 return 1716 1717 # Compute the MacKey and LTK 1718 a = self.ia + bytes([self.iat]) 1719 b = self.ra + bytes([self.rat]) 1720 (mac_key, self.ltk) = crypto.f5(self.dh_key, self.na, self.nb, a, b) 1721 1722 # Compute the DH Key checks 1723 if self.pairing_method in ( 1724 PairingMethod.JUST_WORKS, 1725 PairingMethod.NUMERIC_COMPARISON, 1726 PairingMethod.OOB, 1727 ): 1728 ra = bytes(16) 1729 rb = ra 1730 elif self.pairing_method == PairingMethod.PASSKEY: 1731 assert self.passkey 1732 ra = self.passkey.to_bytes(16, byteorder='little') 1733 rb = ra 1734 else: 1735 return 1736 1737 assert self.preq and self.pres 1738 io_cap_a = self.preq[1:4] 1739 io_cap_b = self.pres[1:4] 1740 self.ea = crypto.f6(mac_key, self.na, self.nb, rb, io_cap_a, a, b) 1741 self.eb = crypto.f6(mac_key, self.nb, self.na, ra, io_cap_b, b, a) 1742 1743 # Next steps to be performed after possible user confirmation 1744 def next_steps() -> None: 1745 # The initiator sends the DH Key check to the responder 1746 if self.is_initiator: 1747 self.send_pairing_dhkey_check_command() 1748 else: 1749 if self.wait_before_continuing: 1750 self.wait_before_continuing.set_result(None) 1751 1752 # Prompt the user for confirmation if needed 1753 if self.pairing_method in ( 1754 PairingMethod.JUST_WORKS, 1755 PairingMethod.NUMERIC_COMPARISON, 1756 ): 1757 # Compute the 6-digit code 1758 code = crypto.g2(self.pka, self.pkb, self.na, self.nb) % 1000000 1759 1760 # Ask for user confirmation 1761 self.wait_before_continuing = asyncio.get_running_loop().create_future() 1762 if self.pairing_method == PairingMethod.JUST_WORKS: 1763 self.prompt_user_for_confirmation(next_steps) 1764 else: 1765 self.prompt_user_for_numeric_comparison(code, next_steps) 1766 else: 1767 next_steps() 1768 1769 def on_smp_pairing_random_command( 1770 self, command: SMP_Pairing_Random_Command 1771 ) -> None: 1772 self.peer_random_value = command.random_value 1773 if self.sc: 1774 self.on_smp_pairing_random_command_secure_connections(command) 1775 else: 1776 self.on_smp_pairing_random_command_legacy(command) 1777 1778 def on_smp_pairing_public_key_command( 1779 self, command: SMP_Pairing_Public_Key_Command 1780 ) -> None: 1781 # Store the public key so that we can compute the confirmation value later 1782 self.peer_public_key_x = command.public_key_x 1783 self.peer_public_key_y = command.public_key_y 1784 1785 # Compute the DH key 1786 self.dh_key = self.ecc_key.dh( 1787 command.public_key_x[::-1], 1788 command.public_key_y[::-1], 1789 )[::-1] 1790 logger.debug(f'DH key: {self.dh_key.hex()}') 1791 1792 if self.pairing_method == PairingMethod.OOB: 1793 # Check against shared OOB data 1794 if self.peer_oob_data: 1795 confirm_verifier = crypto.f4( 1796 self.peer_public_key_x, 1797 self.peer_public_key_x, 1798 self.peer_oob_data.r, 1799 bytes(1), 1800 ) 1801 if not self.check_expected_value( 1802 self.peer_oob_data.c, 1803 confirm_verifier, 1804 SMP_CONFIRM_VALUE_FAILED_ERROR, 1805 ): 1806 return 1807 1808 if self.is_initiator: 1809 if self.pairing_method == PairingMethod.OOB: 1810 self.send_pairing_random_command() 1811 else: 1812 self.send_pairing_confirm_command() 1813 else: 1814 if self.pairing_method == PairingMethod.PASSKEY: 1815 self.display_or_input_passkey() 1816 1817 # Send our public key back to the initiator 1818 self.send_public_key_command() 1819 1820 if self.pairing_method in ( 1821 PairingMethod.JUST_WORKS, 1822 PairingMethod.NUMERIC_COMPARISON, 1823 PairingMethod.OOB, 1824 ): 1825 # We can now send the confirmation value 1826 self.send_pairing_confirm_command() 1827 1828 def on_smp_pairing_dhkey_check_command( 1829 self, command: SMP_Pairing_DHKey_Check_Command 1830 ) -> None: 1831 # Check that what we received matches what we computed earlier 1832 expected = self.eb if self.is_initiator else self.ea 1833 assert expected 1834 if not self.check_expected_value( 1835 expected, command.dhkey_check, SMP_DHKEY_CHECK_FAILED_ERROR 1836 ): 1837 return 1838 1839 if self.is_responder: 1840 if self.wait_before_continuing is not None: 1841 1842 async def next_steps() -> None: 1843 assert self.wait_before_continuing 1844 await self.wait_before_continuing 1845 self.wait_before_continuing = None 1846 self.send_pairing_dhkey_check_command() 1847 1848 self.connection.abort_on('disconnection', next_steps()) 1849 else: 1850 self.send_pairing_dhkey_check_command() 1851 else: 1852 self.start_encryption(self.ltk) 1853 1854 def on_smp_pairing_failed_command( 1855 self, command: SMP_Pairing_Failed_Command 1856 ) -> None: 1857 self.on_pairing_failure(command.reason) 1858 1859 def on_smp_encryption_information_command( 1860 self, command: SMP_Encryption_Information_Command 1861 ) -> None: 1862 self.peer_ltk = command.long_term_key 1863 self.check_key_distribution(SMP_Encryption_Information_Command) 1864 1865 def on_smp_master_identification_command( 1866 self, command: SMP_Master_Identification_Command 1867 ) -> None: 1868 self.peer_ediv = command.ediv 1869 self.peer_rand = command.rand 1870 self.check_key_distribution(SMP_Master_Identification_Command) 1871 1872 def on_smp_identity_information_command( 1873 self, command: SMP_Identity_Information_Command 1874 ) -> None: 1875 self.peer_identity_resolving_key = command.identity_resolving_key 1876 self.check_key_distribution(SMP_Identity_Information_Command) 1877 1878 def on_smp_identity_address_information_command( 1879 self, command: SMP_Identity_Address_Information_Command 1880 ) -> None: 1881 self.peer_bd_addr = command.bd_addr 1882 self.check_key_distribution(SMP_Identity_Address_Information_Command) 1883 1884 def on_smp_signing_information_command( 1885 self, command: SMP_Signing_Information_Command 1886 ) -> None: 1887 self.peer_signature_key = command.signature_key 1888 self.check_key_distribution(SMP_Signing_Information_Command) 1889 1890 1891# ----------------------------------------------------------------------------- 1892class Manager(EventEmitter): 1893 ''' 1894 Implements the Initiator and Responder roles of the Security Manager Protocol 1895 ''' 1896 1897 device: Device 1898 sessions: Dict[int, Session] 1899 pairing_config_factory: Callable[[Connection], PairingConfig] 1900 session_proxy: Type[Session] 1901 _ecc_key: Optional[crypto.EccKey] 1902 1903 def __init__( 1904 self, 1905 device: Device, 1906 pairing_config_factory: Callable[[Connection], PairingConfig], 1907 ) -> None: 1908 super().__init__() 1909 self.device = device 1910 self.sessions = {} 1911 self._ecc_key = None 1912 self.pairing_config_factory = pairing_config_factory 1913 self.session_proxy = Session 1914 1915 def send_command(self, connection: Connection, command: SMP_Command) -> None: 1916 logger.debug( 1917 f'>>> Sending SMP Command on connection [0x{connection.handle:04X}] ' 1918 f'{connection.peer_address}: {command}' 1919 ) 1920 cid = SMP_BR_CID if connection.transport == BT_BR_EDR_TRANSPORT else SMP_CID 1921 connection.send_l2cap_pdu(cid, command.to_bytes()) 1922 1923 def on_smp_security_request_command( 1924 self, connection: Connection, request: SMP_Security_Request_Command 1925 ) -> None: 1926 connection.emit('security_request', request.auth_req) 1927 1928 def on_smp_pdu(self, connection: Connection, pdu: bytes) -> None: 1929 # Parse the L2CAP payload into an SMP Command object 1930 command = SMP_Command.from_bytes(pdu) 1931 logger.debug( 1932 f'<<< Received SMP Command on connection [0x{connection.handle:04X}] ' 1933 f'{connection.peer_address}: {command}' 1934 ) 1935 1936 # Security request is more than just pairing, so let applications handle them 1937 if command.code == SMP_SECURITY_REQUEST_COMMAND: 1938 self.on_smp_security_request_command( 1939 connection, cast(SMP_Security_Request_Command, command) 1940 ) 1941 return 1942 1943 # Look for a session with this connection, and create one if none exists 1944 if not (session := self.sessions.get(connection.handle)): 1945 if connection.role == BT_CENTRAL_ROLE: 1946 logger.warning('Remote starts pairing as Peripheral!') 1947 pairing_config = self.pairing_config_factory(connection) 1948 session = self.session_proxy( 1949 self, connection, pairing_config, is_initiator=False 1950 ) 1951 self.sessions[connection.handle] = session 1952 1953 # Delegate the handling of the command to the session 1954 session.on_smp_command(command) 1955 1956 @property 1957 def ecc_key(self) -> crypto.EccKey: 1958 if self._ecc_key is None: 1959 self._ecc_key = crypto.EccKey.generate() 1960 assert self._ecc_key 1961 return self._ecc_key 1962 1963 async def pair(self, connection: Connection) -> None: 1964 # TODO: check if there's already a session for this connection 1965 if connection.role != BT_CENTRAL_ROLE: 1966 logger.warning('Start pairing as Peripheral!') 1967 pairing_config = self.pairing_config_factory(connection) 1968 session = self.session_proxy( 1969 self, connection, pairing_config, is_initiator=True 1970 ) 1971 self.sessions[connection.handle] = session 1972 return await session.pair() 1973 1974 def request_pairing(self, connection: Connection) -> None: 1975 pairing_config = self.pairing_config_factory(connection) 1976 if pairing_config: 1977 auth_req = smp_auth_req( 1978 pairing_config.bonding, 1979 pairing_config.mitm, 1980 pairing_config.sc, 1981 False, 1982 False, 1983 ) 1984 else: 1985 auth_req = 0 1986 self.send_command(connection, SMP_Security_Request_Command(auth_req=auth_req)) 1987 1988 def on_session_start(self, session: Session) -> None: 1989 self.device.on_pairing_start(session.connection) 1990 1991 async def on_pairing( 1992 self, session: Session, identity_address: Optional[Address], keys: PairingKeys 1993 ) -> None: 1994 # Store the keys in the key store 1995 if self.device.keystore and identity_address is not None: 1996 # Make sure on_pairing emits after key update. 1997 await self.device.update_keys(str(identity_address), keys) 1998 # Notify the device 1999 self.device.on_pairing(session.connection, identity_address, keys, session.sc) 2000 2001 def on_pairing_failure(self, session: Session, reason: int) -> None: 2002 self.device.on_pairing_failure(session.connection, reason) 2003 2004 def on_session_end(self, session: Session) -> None: 2005 logger.debug(f'session end for connection 0x{session.connection.handle:04X}') 2006 if session.connection.handle in self.sessions: 2007 del self.sessions[session.connection.handle] 2008 2009 def get_long_term_key( 2010 self, connection: Connection, rand: bytes, ediv: int 2011 ) -> Optional[bytes]: 2012 if session := self.sessions.get(connection.handle): 2013 return session.get_long_term_key(rand, ediv) 2014 2015 return None 2016