1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# SMP - Security Manager Protocol 17# 18# See Bluetooth spec @ Vol 3, Part H 19# 20# ----------------------------------------------------------------------------- 21 22# ----------------------------------------------------------------------------- 23# Imports 24# ----------------------------------------------------------------------------- 25from __future__ import annotations 26import logging 27import asyncio 28import secrets 29from typing import Dict, Optional, Type 30 31from pyee import EventEmitter 32 33from .colors import color 34from .hci import Address, HCI_LE_Enable_Encryption_Command, HCI_Object, key_with_value 35from .core import ( 36 BT_BR_EDR_TRANSPORT, 37 BT_CENTRAL_ROLE, 38 BT_LE_TRANSPORT, 39 ProtocolError, 40 name_or_number, 41) 42from .keys import PairingKeys 43from . import crypto 44 45 46# ----------------------------------------------------------------------------- 47# Logging 48# ----------------------------------------------------------------------------- 49logger = logging.getLogger(__name__) 50 51 52# ----------------------------------------------------------------------------- 53# Constants 54# ----------------------------------------------------------------------------- 55# fmt: off 56# pylint: disable=line-too-long 57 58SMP_CID = 0x06 59SMP_BR_CID = 0x07 60 61SMP_PAIRING_REQUEST_COMMAND = 0x01 62SMP_PAIRING_RESPONSE_COMMAND = 0x02 63SMP_PAIRING_CONFIRM_COMMAND = 0x03 64SMP_PAIRING_RANDOM_COMMAND = 0x04 65SMP_PAIRING_FAILED_COMMAND = 0x05 66SMP_ENCRYPTION_INFORMATION_COMMAND = 0x06 67SMP_MASTER_IDENTIFICATION_COMMAND = 0x07 68SMP_IDENTITY_INFORMATION_COMMAND = 0x08 69SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND = 0x09 70SMP_SIGNING_INFORMATION_COMMAND = 0x0A 71SMP_SECURITY_REQUEST_COMMAND = 0x0B 72SMP_PAIRING_PUBLIC_KEY_COMMAND = 0x0C 73SMP_PAIRING_DHKEY_CHECK_COMMAND = 0x0D 74SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND = 0x0E 75 76SMP_COMMAND_NAMES = { 77 SMP_PAIRING_REQUEST_COMMAND: 'SMP_PAIRING_REQUEST_COMMAND', 78 SMP_PAIRING_RESPONSE_COMMAND: 'SMP_PAIRING_RESPONSE_COMMAND', 79 SMP_PAIRING_CONFIRM_COMMAND: 'SMP_PAIRING_CONFIRM_COMMAND', 80 SMP_PAIRING_RANDOM_COMMAND: 'SMP_PAIRING_RANDOM_COMMAND', 81 SMP_PAIRING_FAILED_COMMAND: 'SMP_PAIRING_FAILED_COMMAND', 82 SMP_ENCRYPTION_INFORMATION_COMMAND: 'SMP_ENCRYPTION_INFORMATION_COMMAND', 83 SMP_MASTER_IDENTIFICATION_COMMAND: 'SMP_MASTER_IDENTIFICATION_COMMAND', 84 SMP_IDENTITY_INFORMATION_COMMAND: 'SMP_IDENTITY_INFORMATION_COMMAND', 85 SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND: 'SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND', 86 SMP_SIGNING_INFORMATION_COMMAND: 'SMP_SIGNING_INFORMATION_COMMAND', 87 SMP_SECURITY_REQUEST_COMMAND: 'SMP_SECURITY_REQUEST_COMMAND', 88 SMP_PAIRING_PUBLIC_KEY_COMMAND: 'SMP_PAIRING_PUBLIC_KEY_COMMAND', 89 SMP_PAIRING_DHKEY_CHECK_COMMAND: 'SMP_PAIRING_DHKEY_CHECK_COMMAND', 90 SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND: 'SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND' 91} 92 93SMP_DISPLAY_ONLY_IO_CAPABILITY = 0x00 94SMP_DISPLAY_YES_NO_IO_CAPABILITY = 0x01 95SMP_KEYBOARD_ONLY_IO_CAPABILITY = 0x02 96SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY = 0x03 97SMP_KEYBOARD_DISPLAY_IO_CAPABILITY = 0x04 98 99SMP_IO_CAPABILITY_NAMES = { 100 SMP_DISPLAY_ONLY_IO_CAPABILITY: 'SMP_DISPLAY_ONLY_IO_CAPABILITY', 101 SMP_DISPLAY_YES_NO_IO_CAPABILITY: 'SMP_DISPLAY_YES_NO_IO_CAPABILITY', 102 SMP_KEYBOARD_ONLY_IO_CAPABILITY: 'SMP_KEYBOARD_ONLY_IO_CAPABILITY', 103 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: 'SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY', 104 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: 'SMP_KEYBOARD_DISPLAY_IO_CAPABILITY' 105} 106 107SMP_PASSKEY_ENTRY_FAILED_ERROR = 0x01 108SMP_OOB_NOT_AVAILABLE_ERROR = 0x02 109SMP_AUTHENTICATION_REQUIREMENTS_ERROR = 0x03 110SMP_CONFIRM_VALUE_FAILED_ERROR = 0x04 111SMP_PAIRING_NOT_SUPPORTED_ERROR = 0x05 112SMP_ENCRYPTION_KEY_SIZE_ERROR = 0x06 113SMP_COMMAND_NOT_SUPPORTED_ERROR = 0x07 114SMP_UNSPECIFIED_REASON_ERROR = 0x08 115SMP_REPEATED_ATTEMPTS_ERROR = 0x09 116SMP_INVALID_PARAMETERS_ERROR = 0x0A 117SMP_DHKEY_CHECK_FAILED_ERROR = 0x0B 118SMP_NUMERIC_COMPARISON_FAILED_ERROR = 0x0C 119SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR = 0x0D 120SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR = 0x0E 121 122SMP_ERROR_NAMES = { 123 SMP_PASSKEY_ENTRY_FAILED_ERROR: 'SMP_PASSKEY_ENTRY_FAILED_ERROR', 124 SMP_OOB_NOT_AVAILABLE_ERROR: 'SMP_OOB_NOT_AVAILABLE_ERROR', 125 SMP_AUTHENTICATION_REQUIREMENTS_ERROR: 'SMP_AUTHENTICATION_REQUIREMENTS_ERROR', 126 SMP_CONFIRM_VALUE_FAILED_ERROR: 'SMP_CONFIRM_VALUE_FAILED_ERROR', 127 SMP_PAIRING_NOT_SUPPORTED_ERROR: 'SMP_PAIRING_NOT_SUPPORTED_ERROR', 128 SMP_ENCRYPTION_KEY_SIZE_ERROR: 'SMP_ENCRYPTION_KEY_SIZE_ERROR', 129 SMP_COMMAND_NOT_SUPPORTED_ERROR: 'SMP_COMMAND_NOT_SUPPORTED_ERROR', 130 SMP_UNSPECIFIED_REASON_ERROR: 'SMP_UNSPECIFIED_REASON_ERROR', 131 SMP_REPEATED_ATTEMPTS_ERROR: 'SMP_REPEATED_ATTEMPTS_ERROR', 132 SMP_INVALID_PARAMETERS_ERROR: 'SMP_INVALID_PARAMETERS_ERROR', 133 SMP_DHKEY_CHECK_FAILED_ERROR: 'SMP_DHKEY_CHECK_FAILED_ERROR', 134 SMP_NUMERIC_COMPARISON_FAILED_ERROR: 'SMP_NUMERIC_COMPARISON_FAILED_ERROR', 135 SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR: 'SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR', 136 SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR: 'SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR' 137} 138 139SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE = 0 140SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE = 1 141SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE = 2 142SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE = 3 143SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE = 4 144 145SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES = { 146 SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE', 147 SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE', 148 SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE', 149 SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE', 150 SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE' 151} 152 153# Bit flags for key distribution/generation 154SMP_ENC_KEY_DISTRIBUTION_FLAG = 0b0001 155SMP_ID_KEY_DISTRIBUTION_FLAG = 0b0010 156SMP_SIGN_KEY_DISTRIBUTION_FLAG = 0b0100 157SMP_LINK_KEY_DISTRIBUTION_FLAG = 0b1000 158 159# AuthReq fields 160SMP_BONDING_AUTHREQ = 0b00000001 161SMP_MITM_AUTHREQ = 0b00000100 162SMP_SC_AUTHREQ = 0b00001000 163SMP_KEYPRESS_AUTHREQ = 0b00010000 164SMP_CT2_AUTHREQ = 0b00100000 165 166# Crypto salt 167SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('00000000000000000000000000000000746D7031') 168SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('00000000000000000000000000000000746D7032') 169 170# fmt: on 171# pylint: enable=line-too-long 172# pylint: disable=invalid-name 173 174 175# ----------------------------------------------------------------------------- 176# Utils 177# ----------------------------------------------------------------------------- 178def error_name(error_code): 179 return name_or_number(SMP_ERROR_NAMES, error_code) 180 181 182# ----------------------------------------------------------------------------- 183# Classes 184# ----------------------------------------------------------------------------- 185class SMP_Command: 186 ''' 187 See Bluetooth spec @ Vol 3, Part H - 3 SECURITY MANAGER PROTOCOL 188 ''' 189 190 smp_classes: Dict[int, Type[SMP_Command]] = {} 191 code = 0 192 name = '' 193 194 @staticmethod 195 def from_bytes(pdu): 196 code = pdu[0] 197 198 cls = SMP_Command.smp_classes.get(code) 199 if cls is None: 200 instance = SMP_Command(pdu) 201 instance.name = SMP_Command.command_name(code) 202 instance.code = code 203 return instance 204 self = cls.__new__(cls) 205 SMP_Command.__init__(self, pdu) 206 if hasattr(self, 'fields'): 207 self.init_from_bytes(pdu, 1) 208 return self 209 210 @staticmethod 211 def command_name(code): 212 return name_or_number(SMP_COMMAND_NAMES, code) 213 214 @staticmethod 215 def auth_req_str(value): 216 bonding_flags = value & 3 217 mitm = (value >> 2) & 1 218 sc = (value >> 3) & 1 219 keypress = (value >> 4) & 1 220 ct2 = (value >> 5) & 1 221 222 return ( 223 f'bonding_flags={bonding_flags}, ' 224 f'MITM={mitm}, sc={sc}, keypress={keypress}, ct2={ct2}' 225 ) 226 227 @staticmethod 228 def io_capability_name(io_capability): 229 return name_or_number(SMP_IO_CAPABILITY_NAMES, io_capability) 230 231 @staticmethod 232 def key_distribution_str(value): 233 key_types = [] 234 if value & SMP_ENC_KEY_DISTRIBUTION_FLAG: 235 key_types.append('ENC') 236 if value & SMP_ID_KEY_DISTRIBUTION_FLAG: 237 key_types.append('ID') 238 if value & SMP_SIGN_KEY_DISTRIBUTION_FLAG: 239 key_types.append('SIGN') 240 if value & SMP_LINK_KEY_DISTRIBUTION_FLAG: 241 key_types.append('LINK') 242 return ','.join(key_types) 243 244 @staticmethod 245 def keypress_notification_type_name(notification_type): 246 return name_or_number(SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES, notification_type) 247 248 @staticmethod 249 def subclass(fields): 250 def inner(cls): 251 cls.name = cls.__name__.upper() 252 cls.code = key_with_value(SMP_COMMAND_NAMES, cls.name) 253 if cls.code is None: 254 raise KeyError( 255 f'Command name {cls.name} not found in SMP_COMMAND_NAMES' 256 ) 257 cls.fields = fields 258 259 # Register a factory for this class 260 SMP_Command.smp_classes[cls.code] = cls 261 262 return cls 263 264 return inner 265 266 def __init__(self, pdu=None, **kwargs): 267 if hasattr(self, 'fields') and kwargs: 268 HCI_Object.init_from_fields(self, self.fields, kwargs) 269 if pdu is None: 270 pdu = bytes([self.code]) + HCI_Object.dict_to_bytes(kwargs, self.fields) 271 self.pdu = pdu 272 273 def init_from_bytes(self, pdu, offset): 274 return HCI_Object.init_from_bytes(self, pdu, offset, self.fields) 275 276 def to_bytes(self): 277 return self.pdu 278 279 def __bytes__(self): 280 return self.to_bytes() 281 282 def __str__(self): 283 result = color(self.name, 'yellow') 284 if fields := getattr(self, 'fields', None): 285 result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ') 286 else: 287 if len(self.pdu) > 1: 288 result += f': {self.pdu.hex()}' 289 return result 290 291 292# ----------------------------------------------------------------------------- 293@SMP_Command.subclass( 294 [ 295 ('io_capability', {'size': 1, 'mapper': SMP_Command.io_capability_name}), 296 ('oob_data_flag', 1), 297 ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}), 298 ('maximum_encryption_key_size', 1), 299 ( 300 'initiator_key_distribution', 301 {'size': 1, 'mapper': SMP_Command.key_distribution_str}, 302 ), 303 ( 304 'responder_key_distribution', 305 {'size': 1, 'mapper': SMP_Command.key_distribution_str}, 306 ), 307 ] 308) 309class SMP_Pairing_Request_Command(SMP_Command): 310 ''' 311 See Bluetooth spec @ Vol 3, Part H - 3.5.1 Pairing Request 312 ''' 313 314 315# ----------------------------------------------------------------------------- 316@SMP_Command.subclass( 317 [ 318 ('io_capability', {'size': 1, 'mapper': SMP_Command.io_capability_name}), 319 ('oob_data_flag', 1), 320 ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}), 321 ('maximum_encryption_key_size', 1), 322 ( 323 'initiator_key_distribution', 324 {'size': 1, 'mapper': SMP_Command.key_distribution_str}, 325 ), 326 ( 327 'responder_key_distribution', 328 {'size': 1, 'mapper': SMP_Command.key_distribution_str}, 329 ), 330 ] 331) 332class SMP_Pairing_Response_Command(SMP_Command): 333 ''' 334 See Bluetooth spec @ Vol 3, Part H - 3.5.2 Pairing Response 335 ''' 336 337 338# ----------------------------------------------------------------------------- 339@SMP_Command.subclass([('confirm_value', 16)]) 340class SMP_Pairing_Confirm_Command(SMP_Command): 341 ''' 342 See Bluetooth spec @ Vol 3, Part H - 3.5.3 Pairing Confirm 343 ''' 344 345 346# ----------------------------------------------------------------------------- 347@SMP_Command.subclass([('random_value', 16)]) 348class SMP_Pairing_Random_Command(SMP_Command): 349 ''' 350 See Bluetooth spec @ Vol 3, Part H - 3.5.4 Pairing Random 351 ''' 352 353 354# ----------------------------------------------------------------------------- 355@SMP_Command.subclass([('reason', {'size': 1, 'mapper': error_name})]) 356class SMP_Pairing_Failed_Command(SMP_Command): 357 ''' 358 See Bluetooth spec @ Vol 3, Part H - 3.5.5 Pairing Failed 359 ''' 360 361 362# ----------------------------------------------------------------------------- 363@SMP_Command.subclass([('public_key_x', 32), ('public_key_y', 32)]) 364class SMP_Pairing_Public_Key_Command(SMP_Command): 365 ''' 366 See Bluetooth spec @ Vol 3, Part H - 3.5.6 Pairing Public Key 367 ''' 368 369 370# ----------------------------------------------------------------------------- 371@SMP_Command.subclass( 372 [ 373 ('dhkey_check', 16), 374 ] 375) 376class SMP_Pairing_DHKey_Check_Command(SMP_Command): 377 ''' 378 See Bluetooth spec @ Vol 3, Part H - 3.5.7 Pairing DHKey Check 379 ''' 380 381 382# ----------------------------------------------------------------------------- 383@SMP_Command.subclass( 384 [ 385 ( 386 'notification_type', 387 {'size': 1, 'mapper': SMP_Command.keypress_notification_type_name}, 388 ), 389 ] 390) 391class SMP_Pairing_Keypress_Notification_Command(SMP_Command): 392 ''' 393 See Bluetooth spec @ Vol 3, Part H - 3.5.8 Keypress Notification 394 ''' 395 396 397# ----------------------------------------------------------------------------- 398@SMP_Command.subclass([('long_term_key', 16)]) 399class SMP_Encryption_Information_Command(SMP_Command): 400 ''' 401 See Bluetooth spec @ Vol 3, Part H - 3.6.2 Encryption Information 402 ''' 403 404 405# ----------------------------------------------------------------------------- 406@SMP_Command.subclass([('ediv', 2), ('rand', 8)]) 407class SMP_Master_Identification_Command(SMP_Command): 408 ''' 409 See Bluetooth spec @ Vol 3, Part H - 3.6.3 Master Identification 410 ''' 411 412 413# ----------------------------------------------------------------------------- 414@SMP_Command.subclass([('identity_resolving_key', 16)]) 415class SMP_Identity_Information_Command(SMP_Command): 416 ''' 417 See Bluetooth spec @ Vol 3, Part H - 3.6.4 Identity Information 418 ''' 419 420 421# ----------------------------------------------------------------------------- 422@SMP_Command.subclass( 423 [ 424 ('addr_type', Address.ADDRESS_TYPE_SPEC), 425 ('bd_addr', Address.parse_address_preceded_by_type), 426 ] 427) 428class SMP_Identity_Address_Information_Command(SMP_Command): 429 ''' 430 See Bluetooth spec @ Vol 3, Part H - 3.6.5 Identity Address Information 431 ''' 432 433 434# ----------------------------------------------------------------------------- 435@SMP_Command.subclass([('signature_key', 16)]) 436class SMP_Signing_Information_Command(SMP_Command): 437 ''' 438 See Bluetooth spec @ Vol 3, Part H - 3.6.6 Signing Information 439 ''' 440 441 442# ----------------------------------------------------------------------------- 443@SMP_Command.subclass( 444 [ 445 ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}), 446 ] 447) 448class SMP_Security_Request_Command(SMP_Command): 449 ''' 450 See Bluetooth spec @ Vol 3, Part H - 3.6.7 Security Request 451 ''' 452 453 454# ----------------------------------------------------------------------------- 455def smp_auth_req(bonding, mitm, sc, keypress, ct2): 456 value = 0 457 if bonding: 458 value |= SMP_BONDING_AUTHREQ 459 if mitm: 460 value |= SMP_MITM_AUTHREQ 461 if sc: 462 value |= SMP_SC_AUTHREQ 463 if keypress: 464 value |= SMP_KEYPRESS_AUTHREQ 465 if ct2: 466 value |= SMP_CT2_AUTHREQ 467 return value 468 469 470# ----------------------------------------------------------------------------- 471class AddressResolver: 472 def __init__(self, resolving_keys): 473 self.resolving_keys = resolving_keys 474 475 def resolve(self, address): 476 address_bytes = bytes(address) 477 hash_part = address_bytes[0:3] 478 prand = address_bytes[3:6] 479 for (irk, resolved_address) in self.resolving_keys: 480 local_hash = crypto.ah(irk, prand) 481 if local_hash == hash_part: 482 # Match! 483 if resolved_address.address_type == Address.PUBLIC_DEVICE_ADDRESS: 484 resolved_address_type = Address.PUBLIC_IDENTITY_ADDRESS 485 else: 486 resolved_address_type = Address.RANDOM_IDENTITY_ADDRESS 487 return Address( 488 address=str(resolved_address), address_type=resolved_address_type 489 ) 490 491 return None 492 493 494# ----------------------------------------------------------------------------- 495class PairingDelegate: 496 NO_OUTPUT_NO_INPUT = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY 497 KEYBOARD_INPUT_ONLY = SMP_KEYBOARD_ONLY_IO_CAPABILITY 498 DISPLAY_OUTPUT_ONLY = SMP_DISPLAY_ONLY_IO_CAPABILITY 499 DISPLAY_OUTPUT_AND_YES_NO_INPUT = SMP_DISPLAY_YES_NO_IO_CAPABILITY 500 DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = SMP_KEYBOARD_DISPLAY_IO_CAPABILITY 501 DEFAULT_KEY_DISTRIBUTION: int = ( 502 SMP_ENC_KEY_DISTRIBUTION_FLAG | SMP_ID_KEY_DISTRIBUTION_FLAG 503 ) 504 505 def __init__( 506 self, 507 io_capability: int = NO_OUTPUT_NO_INPUT, 508 local_initiator_key_distribution: int = DEFAULT_KEY_DISTRIBUTION, 509 local_responder_key_distribution: int = DEFAULT_KEY_DISTRIBUTION, 510 ) -> None: 511 self.io_capability = io_capability 512 self.local_initiator_key_distribution = local_initiator_key_distribution 513 self.local_responder_key_distribution = local_responder_key_distribution 514 515 async def accept(self) -> bool: 516 return True 517 518 async def confirm(self) -> bool: 519 return True 520 521 # pylint: disable-next=unused-argument 522 async def compare_numbers(self, number: int, digits: int) -> bool: 523 return True 524 525 async def get_number(self) -> Optional[int]: 526 ''' 527 Returns an optional number as an answer to a passkey request. 528 Returning `None` will result in a negative reply. 529 ''' 530 return 0 531 532 async def get_string(self, max_length) -> Optional[str]: 533 ''' 534 Returns a string whose utf-8 encoding is up to max_length bytes. 535 ''' 536 return None 537 538 # pylint: disable-next=unused-argument 539 async def display_number(self, number: int, digits: int) -> None: 540 pass 541 542 async def key_distribution_response( 543 self, peer_initiator_key_distribution, peer_responder_key_distribution 544 ): 545 return ( 546 (peer_initiator_key_distribution & self.local_initiator_key_distribution), 547 (peer_responder_key_distribution & self.local_responder_key_distribution), 548 ) 549 550 551# ----------------------------------------------------------------------------- 552class PairingConfig: 553 def __init__( 554 self, 555 sc: bool = True, 556 mitm: bool = True, 557 bonding: bool = True, 558 delegate: Optional[PairingDelegate] = None, 559 ) -> None: 560 self.sc = sc 561 self.mitm = mitm 562 self.bonding = bonding 563 self.delegate = delegate or PairingDelegate() 564 565 def __str__(self): 566 io_capability_str = SMP_Command.io_capability_name(self.delegate.io_capability) 567 return ( 568 f'PairingConfig(sc={self.sc}, ' 569 f'mitm={self.mitm}, bonding={self.bonding}, ' 570 f'delegate[{io_capability_str}])' 571 ) 572 573 574# ----------------------------------------------------------------------------- 575class Session: 576 # Pairing methods 577 JUST_WORKS = 0 578 NUMERIC_COMPARISON = 1 579 PASSKEY = 2 580 OOB = 3 581 582 PAIRING_METHOD_NAMES = { 583 JUST_WORKS: 'JUST_WORKS', 584 NUMERIC_COMPARISON: 'NUMERIC_COMPARISON', 585 PASSKEY: 'PASSKEY', 586 OOB: 'OOB', 587 } 588 589 # I/O Capability to pairing method decision matrix 590 # 591 # See Bluetooth spec @ Vol 3, part H - Table 2.8: Mapping of IO Capabilities to Key 592 # Generation Method 593 # 594 # Map: initiator -> responder -> <method> 595 # where <method> may be a simple entry or a 2-element tuple, with the first element 596 # for legacy pairing and the second for secure connections, when the two are 597 # different. Each entry is either a method name, or, for PASSKEY, a tuple: 598 # (method, initiator_displays, responder_displays) 599 # to specify if the initiator and responder should display (True) or input a code 600 # (False). 601 PAIRING_METHODS = { 602 SMP_DISPLAY_ONLY_IO_CAPABILITY: { 603 SMP_DISPLAY_ONLY_IO_CAPABILITY: JUST_WORKS, 604 SMP_DISPLAY_YES_NO_IO_CAPABILITY: JUST_WORKS, 605 SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PASSKEY, True, False), 606 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS, 607 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (PASSKEY, True, False), 608 }, 609 SMP_DISPLAY_YES_NO_IO_CAPABILITY: { 610 SMP_DISPLAY_ONLY_IO_CAPABILITY: JUST_WORKS, 611 SMP_DISPLAY_YES_NO_IO_CAPABILITY: (JUST_WORKS, NUMERIC_COMPARISON), 612 SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PASSKEY, True, False), 613 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS, 614 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: ( 615 (PASSKEY, True, False), 616 NUMERIC_COMPARISON, 617 ), 618 }, 619 SMP_KEYBOARD_ONLY_IO_CAPABILITY: { 620 SMP_DISPLAY_ONLY_IO_CAPABILITY: (PASSKEY, False, True), 621 SMP_DISPLAY_YES_NO_IO_CAPABILITY: (PASSKEY, False, True), 622 SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PASSKEY, False, False), 623 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS, 624 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (PASSKEY, False, True), 625 }, 626 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: { 627 SMP_DISPLAY_ONLY_IO_CAPABILITY: JUST_WORKS, 628 SMP_DISPLAY_YES_NO_IO_CAPABILITY: JUST_WORKS, 629 SMP_KEYBOARD_ONLY_IO_CAPABILITY: JUST_WORKS, 630 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS, 631 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: JUST_WORKS, 632 }, 633 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: { 634 SMP_DISPLAY_ONLY_IO_CAPABILITY: (PASSKEY, False, True), 635 SMP_DISPLAY_YES_NO_IO_CAPABILITY: ( 636 (PASSKEY, False, True), 637 NUMERIC_COMPARISON, 638 ), 639 SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PASSKEY, True, False), 640 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS, 641 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: ( 642 (PASSKEY, True, False), 643 NUMERIC_COMPARISON, 644 ), 645 }, 646 } 647 648 def __init__(self, manager, connection, pairing_config): 649 self.manager = manager 650 self.connection = connection 651 self.preq = None 652 self.pres = None 653 self.ea = None 654 self.eb = None 655 self.tk = bytes(16) 656 self.r = bytes(16) 657 self.stk = None 658 self.ltk = None 659 self.ltk_ediv = 0 660 self.ltk_rand = bytes(8) 661 self.link_key = None 662 self.initiator_key_distribution = 0 663 self.responder_key_distribution = 0 664 self.peer_random_value = None 665 self.peer_public_key_x = bytes(32) 666 self.peer_public_key_y = bytes(32) 667 self.peer_ltk = None 668 self.peer_ediv = None 669 self.peer_rand = None 670 self.peer_identity_resolving_key = None 671 self.peer_bd_addr = None 672 self.peer_signature_key = None 673 self.peer_expected_distributions = [] 674 self.dh_key = None 675 self.confirm_value = None 676 self.passkey = None 677 self.passkey_ready = asyncio.Event() 678 self.passkey_step = 0 679 self.passkey_display = False 680 self.pairing_method = 0 681 self.pairing_config = pairing_config 682 self.wait_before_continuing = None 683 self.completed = False 684 self.ctkd_task = None 685 686 # Decide if we're the initiator or the responder 687 self.is_initiator = connection.role == BT_CENTRAL_ROLE 688 self.is_responder = not self.is_initiator 689 690 # Listen for connection events 691 connection.on('disconnection', self.on_disconnection) 692 connection.on( 693 'connection_encryption_change', self.on_connection_encryption_change 694 ) 695 connection.on( 696 'connection_encryption_key_refresh', 697 self.on_connection_encryption_key_refresh, 698 ) 699 700 # Create a future that can be used to wait for the session to complete 701 if self.is_initiator: 702 self.pairing_result = asyncio.get_running_loop().create_future() 703 else: 704 self.pairing_result = None 705 706 # Key Distribution (default values before negotiation) 707 self.initiator_key_distribution = ( 708 pairing_config.delegate.local_initiator_key_distribution 709 ) 710 self.responder_key_distribution = ( 711 pairing_config.delegate.local_responder_key_distribution 712 ) 713 714 # Authentication Requirements Flags - Vol 3, Part H, Figure 3.3 715 self.bonding = pairing_config.bonding 716 self.sc = pairing_config.sc 717 self.mitm = pairing_config.mitm 718 self.keypress = False 719 self.ct2 = False 720 721 # I/O Capabilities 722 self.io_capability = pairing_config.delegate.io_capability 723 self.peer_io_capability = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY 724 725 # OOB (not supported yet) 726 self.oob = False 727 728 # Set up addresses 729 self_address = connection.self_address 730 peer_address = connection.peer_resolvable_address or connection.peer_address 731 if self.is_initiator: 732 self.ia = bytes(self_address) 733 self.iat = 1 if self_address.is_random else 0 734 self.ra = bytes(peer_address) 735 self.rat = 1 if peer_address.is_random else 0 736 else: 737 self.ra = bytes(self_address) 738 self.rat = 1 if self_address.is_random else 0 739 self.ia = bytes(peer_address) 740 self.iat = 1 if peer_address.is_random else 0 741 742 @property 743 def pkx(self): 744 return (bytes(reversed(self.manager.ecc_key.x)), self.peer_public_key_x) 745 746 @property 747 def pka(self): 748 return self.pkx[0 if self.is_initiator else 1] 749 750 @property 751 def pkb(self): 752 return self.pkx[0 if self.is_responder else 1] 753 754 @property 755 def nx(self): 756 return (self.r, self.peer_random_value) 757 758 @property 759 def na(self): 760 return self.nx[0 if self.is_initiator else 1] 761 762 @property 763 def nb(self): 764 return self.nx[0 if self.is_responder else 1] 765 766 @property 767 def auth_req(self): 768 return smp_auth_req(self.bonding, self.mitm, self.sc, self.keypress, self.ct2) 769 770 def get_long_term_key(self, rand, ediv): 771 if not self.sc and not self.completed: 772 if rand == self.ltk_rand and ediv == self.ltk_ediv: 773 return self.stk 774 else: 775 return self.ltk 776 777 return None 778 779 def decide_pairing_method( 780 self, auth_req, initiator_io_capability, responder_io_capability 781 ): 782 if (not self.mitm) and (auth_req & SMP_MITM_AUTHREQ == 0): 783 self.pairing_method = self.JUST_WORKS 784 return 785 786 details = self.PAIRING_METHODS[initiator_io_capability][responder_io_capability] 787 if isinstance(details, tuple) and len(details) == 2: 788 # One entry for legacy pairing and one for secure connections 789 details = details[1 if self.sc else 0] 790 if isinstance(details, int): 791 # Just a method ID 792 self.pairing_method = details 793 else: 794 # PASSKEY method, with a method ID and display/input flags 795 self.pairing_method = details[0] 796 self.passkey_display = details[1 if self.is_initiator else 2] 797 798 def check_expected_value(self, expected, received, error): 799 logger.debug(f'expected={expected.hex()} got={received.hex()}') 800 if expected != received: 801 logger.info(color('pairing confirm/check mismatch', 'red')) 802 self.send_pairing_failed(error) 803 return False 804 return True 805 806 def prompt_user_for_confirmation(self, next_steps): 807 async def prompt(): 808 logger.debug('ask for confirmation') 809 try: 810 response = await self.pairing_config.delegate.confirm() 811 if response: 812 next_steps() 813 return 814 except Exception as error: 815 logger.warning(f'exception while confirm: {error}') 816 817 self.send_pairing_failed(SMP_CONFIRM_VALUE_FAILED_ERROR) 818 819 self.connection.abort_on('disconnection', prompt()) 820 821 def prompt_user_for_numeric_comparison(self, code, next_steps): 822 async def prompt(): 823 logger.debug(f'verification code: {code}') 824 try: 825 response = await self.pairing_config.delegate.compare_numbers( 826 code, digits=6 827 ) 828 if response: 829 next_steps() 830 return 831 except Exception as error: 832 logger.warning(f'exception while prompting: {error}') 833 834 self.send_pairing_failed(SMP_CONFIRM_VALUE_FAILED_ERROR) 835 836 self.connection.abort_on('disconnection', prompt()) 837 838 def prompt_user_for_number(self, next_steps): 839 async def prompt(): 840 logger.debug('prompting user for passkey') 841 try: 842 passkey = await self.pairing_config.delegate.get_number() 843 logger.debug(f'user input: {passkey}') 844 next_steps(passkey) 845 except Exception as error: 846 logger.warning(f'exception while prompting: {error}') 847 self.send_pairing_failed(SMP_PASSKEY_ENTRY_FAILED_ERROR) 848 849 self.connection.abort_on('disconnection', prompt()) 850 851 def display_passkey(self): 852 # Generate random Passkey/PIN code 853 self.passkey = secrets.randbelow(1000000) 854 logger.debug(f'Pairing PIN CODE: {self.passkey:06}') 855 self.passkey_ready.set() 856 857 # The value of TK is computed from the PIN code 858 if not self.sc: 859 self.tk = self.passkey.to_bytes(16, byteorder='little') 860 logger.debug(f'TK from passkey = {self.tk.hex()}') 861 862 self.connection.abort_on( 863 'disconnection', 864 self.pairing_config.delegate.display_number(self.passkey, digits=6), 865 ) 866 867 def input_passkey(self, next_steps=None): 868 # Prompt the user for the passkey displayed on the peer 869 def after_input(passkey): 870 self.passkey = passkey 871 872 if not self.sc: 873 self.tk = passkey.to_bytes(16, byteorder='little') 874 logger.debug(f'TK from passkey = {self.tk.hex()}') 875 876 self.passkey_ready.set() 877 878 if next_steps is not None: 879 next_steps() 880 881 self.prompt_user_for_number(after_input) 882 883 def display_or_input_passkey(self, next_steps=None): 884 if self.passkey_display: 885 self.display_passkey() 886 if next_steps is not None: 887 next_steps() 888 else: 889 self.input_passkey(next_steps) 890 891 def send_command(self, command): 892 self.manager.send_command(self.connection, command) 893 894 def send_pairing_failed(self, error): 895 self.send_command(SMP_Pairing_Failed_Command(reason=error)) 896 self.on_pairing_failure(error) 897 898 def send_pairing_request_command(self): 899 self.manager.on_session_start(self) 900 901 command = SMP_Pairing_Request_Command( 902 io_capability=self.io_capability, 903 oob_data_flag=0, 904 auth_req=self.auth_req, 905 maximum_encryption_key_size=16, 906 initiator_key_distribution=self.initiator_key_distribution, 907 responder_key_distribution=self.responder_key_distribution, 908 ) 909 self.preq = bytes(command) 910 self.send_command(command) 911 912 def send_pairing_response_command(self): 913 response = SMP_Pairing_Response_Command( 914 io_capability=self.io_capability, 915 oob_data_flag=0, 916 auth_req=self.auth_req, 917 maximum_encryption_key_size=16, 918 initiator_key_distribution=self.initiator_key_distribution, 919 responder_key_distribution=self.responder_key_distribution, 920 ) 921 self.pres = bytes(response) 922 self.send_command(response) 923 924 def send_pairing_confirm_command(self): 925 self.r = crypto.r() 926 logger.debug(f'generated random: {self.r.hex()}') 927 928 if self.sc: 929 930 async def next_steps(): 931 if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON): 932 z = 0 933 elif self.pairing_method == self.PASSKEY: 934 # We need a passkey 935 await self.passkey_ready.wait() 936 937 z = 0x80 + ((self.passkey >> self.passkey_step) & 1) 938 else: 939 return 940 941 if self.is_initiator: 942 confirm_value = crypto.f4(self.pka, self.pkb, self.r, bytes([z])) 943 else: 944 confirm_value = crypto.f4(self.pkb, self.pka, self.r, bytes([z])) 945 946 self.send_command( 947 SMP_Pairing_Confirm_Command(confirm_value=confirm_value) 948 ) 949 950 # Perform the next steps asynchronously in case we need to wait for input 951 self.connection.abort_on('disconnection', next_steps()) 952 else: 953 confirm_value = crypto.c1( 954 self.tk, 955 self.r, 956 self.preq, 957 self.pres, 958 self.iat, 959 self.rat, 960 self.ia, 961 self.ra, 962 ) 963 964 self.send_command(SMP_Pairing_Confirm_Command(confirm_value=confirm_value)) 965 966 def send_pairing_random_command(self): 967 self.send_command(SMP_Pairing_Random_Command(random_value=self.r)) 968 969 def send_public_key_command(self): 970 self.send_command( 971 SMP_Pairing_Public_Key_Command( 972 public_key_x=bytes(reversed(self.manager.ecc_key.x)), 973 public_key_y=bytes(reversed(self.manager.ecc_key.y)), 974 ) 975 ) 976 977 def send_pairing_dhkey_check_command(self): 978 self.send_command( 979 SMP_Pairing_DHKey_Check_Command( 980 dhkey_check=self.ea if self.is_initiator else self.eb 981 ) 982 ) 983 984 def start_encryption(self, key): 985 # We can now encrypt the connection with the short term key, so that we can 986 # distribute the long term and/or other keys over an encrypted connection 987 self.manager.device.host.send_command_sync( 988 HCI_LE_Enable_Encryption_Command( 989 connection_handle=self.connection.handle, 990 random_number=bytes(8), 991 encrypted_diversifier=0, 992 long_term_key=key, 993 ) 994 ) 995 996 async def derive_ltk(self): 997 link_key = await self.manager.device.get_link_key(self.connection.peer_address) 998 assert link_key is not None 999 ilk = ( 1000 crypto.h7(salt=SMP_CTKD_H7_BRLE_SALT, w=link_key) 1001 if self.ct2 1002 else crypto.h6(link_key, b'tmp2') 1003 ) 1004 self.ltk = crypto.h6(ilk, b'brle') 1005 1006 def distribute_keys(self): 1007 # Distribute the keys as required 1008 if self.is_initiator: 1009 # CTKD: Derive LTK from LinkKey 1010 if ( 1011 self.connection.transport == BT_BR_EDR_TRANSPORT 1012 and self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG 1013 ): 1014 self.ctkd_task = self.connection.abort_on( 1015 'disconnection', self.derive_ltk() 1016 ) 1017 elif not self.sc: 1018 # Distribute the LTK, EDIV and RAND 1019 if self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG: 1020 self.send_command( 1021 SMP_Encryption_Information_Command(long_term_key=self.ltk) 1022 ) 1023 self.send_command( 1024 SMP_Master_Identification_Command( 1025 ediv=self.ltk_ediv, rand=self.ltk_rand 1026 ) 1027 ) 1028 1029 # Distribute IRK & BD ADDR 1030 if self.initiator_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG: 1031 self.send_command( 1032 SMP_Identity_Information_Command( 1033 identity_resolving_key=self.manager.device.irk 1034 ) 1035 ) 1036 self.send_command( 1037 SMP_Identity_Address_Information_Command( 1038 addr_type=self.connection.self_address.address_type, 1039 bd_addr=self.connection.self_address, 1040 ) 1041 ) 1042 1043 # Distribute CSRK 1044 csrk = bytes(16) # FIXME: testing 1045 if self.initiator_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG: 1046 self.send_command(SMP_Signing_Information_Command(signature_key=csrk)) 1047 1048 # CTKD, calculate BR/EDR link key 1049 if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG: 1050 ilk = ( 1051 crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=self.ltk) 1052 if self.ct2 1053 else crypto.h6(self.ltk, b'tmp1') 1054 ) 1055 self.link_key = crypto.h6(ilk, b'lebr') 1056 1057 else: 1058 # CTKD: Derive LTK from LinkKey 1059 if ( 1060 self.connection.transport == BT_BR_EDR_TRANSPORT 1061 and self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG 1062 ): 1063 self.ctkd_task = self.connection.abort_on( 1064 'disconnection', self.derive_ltk() 1065 ) 1066 # Distribute the LTK, EDIV and RAND 1067 elif not self.sc: 1068 if self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG: 1069 self.send_command( 1070 SMP_Encryption_Information_Command(long_term_key=self.ltk) 1071 ) 1072 self.send_command( 1073 SMP_Master_Identification_Command( 1074 ediv=self.ltk_ediv, rand=self.ltk_rand 1075 ) 1076 ) 1077 1078 # Distribute IRK & BD ADDR 1079 if self.responder_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG: 1080 self.send_command( 1081 SMP_Identity_Information_Command( 1082 identity_resolving_key=self.manager.device.irk 1083 ) 1084 ) 1085 self.send_command( 1086 SMP_Identity_Address_Information_Command( 1087 addr_type=self.connection.self_address.address_type, 1088 bd_addr=self.connection.self_address, 1089 ) 1090 ) 1091 1092 # Distribute CSRK 1093 csrk = bytes(16) # FIXME: testing 1094 if self.responder_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG: 1095 self.send_command(SMP_Signing_Information_Command(signature_key=csrk)) 1096 1097 # CTKD, calculate BR/EDR link key 1098 if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG: 1099 ilk = ( 1100 crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=self.ltk) 1101 if self.ct2 1102 else crypto.h6(self.ltk, b'tmp1') 1103 ) 1104 self.link_key = crypto.h6(ilk, b'lebr') 1105 1106 def compute_peer_expected_distributions(self, key_distribution_flags): 1107 # Set our expectations for what to wait for in the key distribution phase 1108 self.peer_expected_distributions = [] 1109 if not self.sc and self.connection.transport == BT_LE_TRANSPORT: 1110 if key_distribution_flags & SMP_ENC_KEY_DISTRIBUTION_FLAG != 0: 1111 self.peer_expected_distributions.append( 1112 SMP_Encryption_Information_Command 1113 ) 1114 self.peer_expected_distributions.append( 1115 SMP_Master_Identification_Command 1116 ) 1117 if key_distribution_flags & SMP_ID_KEY_DISTRIBUTION_FLAG != 0: 1118 self.peer_expected_distributions.append(SMP_Identity_Information_Command) 1119 self.peer_expected_distributions.append( 1120 SMP_Identity_Address_Information_Command 1121 ) 1122 if key_distribution_flags & SMP_SIGN_KEY_DISTRIBUTION_FLAG != 0: 1123 self.peer_expected_distributions.append(SMP_Signing_Information_Command) 1124 logger.debug( 1125 'expecting distributions: ' 1126 f'{[c.__name__ for c in self.peer_expected_distributions]}' 1127 ) 1128 1129 def check_key_distribution(self, command_class): 1130 # First, check that the connection is encrypted 1131 if not self.connection.is_encrypted: 1132 logger.warning( 1133 color('received key distribution on a non-encrypted connection', 'red') 1134 ) 1135 self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR) 1136 return 1137 1138 # Check that this command class is expected 1139 if command_class in self.peer_expected_distributions: 1140 self.peer_expected_distributions.remove(command_class) 1141 logger.debug( 1142 'remaining distributions: ' 1143 f'{[c.__name__ for c in self.peer_expected_distributions]}' 1144 ) 1145 if not self.peer_expected_distributions: 1146 self.on_peer_key_distribution_complete() 1147 else: 1148 logger.warning( 1149 color( 1150 '!!! unexpected key distribution command: ' 1151 f'{command_class.__name__}', 1152 'red', 1153 ) 1154 ) 1155 self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR) 1156 1157 async def pair(self): 1158 # Start pairing as an initiator 1159 # TODO: check that this session isn't already active 1160 1161 # Send the pairing request to start the process 1162 self.send_pairing_request_command() 1163 1164 # Wait for the pairing process to finish 1165 await self.connection.abort_on('disconnection', self.pairing_result) 1166 1167 def on_disconnection(self, _): 1168 self.connection.remove_listener('disconnection', self.on_disconnection) 1169 self.connection.remove_listener( 1170 'connection_encryption_change', self.on_connection_encryption_change 1171 ) 1172 self.connection.remove_listener( 1173 'connection_encryption_key_refresh', 1174 self.on_connection_encryption_key_refresh, 1175 ) 1176 self.manager.on_session_end(self) 1177 1178 def on_peer_key_distribution_complete(self): 1179 # The initiator can now send its keys 1180 if self.is_initiator: 1181 self.distribute_keys() 1182 1183 self.connection.abort_on('disconnection', self.on_pairing()) 1184 1185 def on_connection_encryption_change(self): 1186 if self.connection.is_encrypted: 1187 if self.is_responder: 1188 # The responder distributes its keys first, the initiator later 1189 self.distribute_keys() 1190 1191 # If we're not expecting key distributions from the peer, we're done 1192 if not self.peer_expected_distributions: 1193 self.on_peer_key_distribution_complete() 1194 1195 def on_connection_encryption_key_refresh(self): 1196 # Do as if the connection had just been encrypted 1197 self.on_connection_encryption_change() 1198 1199 async def on_pairing(self): 1200 logger.debug('pairing complete') 1201 1202 if self.completed: 1203 return 1204 1205 self.completed = True 1206 1207 if self.pairing_result is not None and not self.pairing_result.done(): 1208 self.pairing_result.set_result(None) 1209 1210 # Use the peer address from the pairing protocol or the connection 1211 if self.peer_bd_addr: 1212 peer_address = self.peer_bd_addr 1213 else: 1214 peer_address = self.connection.peer_address 1215 1216 # Wait for link key fetch and key derivation 1217 if self.ctkd_task is not None: 1218 await self.ctkd_task 1219 self.ctkd_task = None 1220 1221 # Create an object to hold the keys 1222 keys = PairingKeys() 1223 keys.address_type = peer_address.address_type 1224 authenticated = self.pairing_method != self.JUST_WORKS 1225 if self.sc or self.connection.transport == BT_BR_EDR_TRANSPORT: 1226 keys.ltk = PairingKeys.Key(value=self.ltk, authenticated=authenticated) 1227 else: 1228 our_ltk_key = PairingKeys.Key( 1229 value=self.ltk, 1230 authenticated=authenticated, 1231 ediv=self.ltk_ediv, 1232 rand=self.ltk_rand, 1233 ) 1234 peer_ltk_key = PairingKeys.Key( 1235 value=self.peer_ltk, 1236 authenticated=authenticated, 1237 ediv=self.peer_ediv, 1238 rand=self.peer_rand, 1239 ) 1240 if self.is_initiator: 1241 keys.ltk_central = peer_ltk_key 1242 keys.ltk_peripheral = our_ltk_key 1243 else: 1244 keys.ltk_central = our_ltk_key 1245 keys.ltk_peripheral = peer_ltk_key 1246 if self.peer_identity_resolving_key is not None: 1247 keys.irk = PairingKeys.Key( 1248 value=self.peer_identity_resolving_key, authenticated=authenticated 1249 ) 1250 if self.peer_signature_key is not None: 1251 keys.csrk = PairingKeys.Key( 1252 value=self.peer_signature_key, authenticated=authenticated 1253 ) 1254 if self.link_key is not None: 1255 keys.link_key = PairingKeys.Key( 1256 value=self.link_key, authenticated=authenticated 1257 ) 1258 self.manager.on_pairing(self, peer_address, keys) 1259 1260 def on_pairing_failure(self, reason): 1261 logger.warning(f'pairing failure ({error_name(reason)})') 1262 1263 if self.completed: 1264 return 1265 1266 self.completed = True 1267 1268 error = ProtocolError(reason, 'smp', error_name(reason)) 1269 if self.pairing_result is not None and not self.pairing_result.done(): 1270 self.pairing_result.set_exception(error) 1271 self.manager.on_pairing_failure(self, reason) 1272 1273 def on_smp_command(self, command): 1274 # Find the handler method 1275 handler_name = f'on_{command.name.lower()}' 1276 handler = getattr(self, handler_name, None) 1277 if handler is not None: 1278 try: 1279 handler(command) 1280 except Exception as error: 1281 logger.warning(f'{color("!!! Exception in handler:", "red")} {error}') 1282 response = SMP_Pairing_Failed_Command( 1283 reason=SMP_UNSPECIFIED_REASON_ERROR 1284 ) 1285 self.send_command(response) 1286 else: 1287 logger.error(color('SMP command not handled???', 'red')) 1288 1289 def on_smp_pairing_request_command(self, command): 1290 self.connection.abort_on( 1291 'disconnection', self.on_smp_pairing_request_command_async(command) 1292 ) 1293 1294 async def on_smp_pairing_request_command_async(self, command): 1295 # Check if the request should proceed 1296 accepted = await self.pairing_config.delegate.accept() 1297 if not accepted: 1298 logger.debug('pairing rejected by delegate') 1299 self.send_pairing_failed(SMP_PAIRING_NOT_SUPPORTED_ERROR) 1300 return 1301 1302 # Save the request 1303 self.preq = bytes(command) 1304 1305 # Bonding and SC require both sides to request/support it 1306 self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0) 1307 self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0) 1308 self.ct2 = self.ct2 and (command.auth_req & SMP_CT2_AUTHREQ != 0) 1309 1310 # Check for OOB 1311 if command.oob_data_flag != 0: 1312 self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR) 1313 return 1314 1315 # Decide which pairing method to use 1316 self.decide_pairing_method( 1317 command.auth_req, command.io_capability, self.io_capability 1318 ) 1319 logger.debug( 1320 f'pairing method: {self.PAIRING_METHOD_NAMES[self.pairing_method]}' 1321 ) 1322 1323 # Key distribution 1324 ( 1325 self.initiator_key_distribution, 1326 self.responder_key_distribution, 1327 ) = await self.pairing_config.delegate.key_distribution_response( 1328 command.initiator_key_distribution, command.responder_key_distribution 1329 ) 1330 self.compute_peer_expected_distributions(self.initiator_key_distribution) 1331 1332 # The pairing is now starting 1333 self.manager.on_session_start(self) 1334 1335 # Display a passkey if we need to 1336 if not self.sc: 1337 if self.pairing_method == self.PASSKEY and self.passkey_display: 1338 self.display_passkey() 1339 1340 # Respond 1341 self.send_pairing_response_command() 1342 1343 # Vol 3, Part C, 5.2.2.1.3 1344 # CTKD over BR/EDR should happen after the connection has been encrypted, 1345 # so when receiving pairing requests, responder should start distributing keys 1346 if ( 1347 self.connection.transport == BT_BR_EDR_TRANSPORT 1348 and self.connection.is_encrypted 1349 and self.is_responder 1350 and accepted 1351 ): 1352 self.distribute_keys() 1353 1354 def on_smp_pairing_response_command(self, command): 1355 if self.is_responder: 1356 logger.warning(color('received pairing response as a responder', 'red')) 1357 return 1358 1359 # Save the response 1360 self.pres = bytes(command) 1361 self.peer_io_capability = command.io_capability 1362 1363 # Bonding and SC require both sides to request/support it 1364 self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0) 1365 self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0) 1366 1367 # Check for OOB 1368 if self.sc and command.oob_data_flag: 1369 self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR) 1370 return 1371 1372 # Decide which pairing method to use 1373 self.decide_pairing_method( 1374 command.auth_req, self.io_capability, command.io_capability 1375 ) 1376 logger.debug( 1377 f'pairing method: {self.PAIRING_METHOD_NAMES[self.pairing_method]}' 1378 ) 1379 1380 # Key distribution 1381 if ( 1382 command.initiator_key_distribution & ~self.initiator_key_distribution != 0 1383 ) or ( 1384 command.responder_key_distribution & ~self.responder_key_distribution != 0 1385 ): 1386 # The response isn't a subset of the request 1387 self.send_pairing_failed(SMP_INVALID_PARAMETERS_ERROR) 1388 return 1389 self.initiator_key_distribution = command.initiator_key_distribution 1390 self.responder_key_distribution = command.responder_key_distribution 1391 self.compute_peer_expected_distributions(self.responder_key_distribution) 1392 1393 # Start phase 2 1394 if self.sc: 1395 if self.pairing_method == self.PASSKEY: 1396 self.display_or_input_passkey() 1397 1398 self.send_public_key_command() 1399 else: 1400 if self.pairing_method == self.PASSKEY: 1401 self.display_or_input_passkey(self.send_pairing_confirm_command) 1402 else: 1403 self.send_pairing_confirm_command() 1404 1405 def on_smp_pairing_confirm_command_legacy(self, _): 1406 if self.is_initiator: 1407 self.send_pairing_random_command() 1408 else: 1409 # If the method is PASSKEY, now is the time to input the code 1410 if self.pairing_method == self.PASSKEY and not self.passkey_display: 1411 self.input_passkey(self.send_pairing_confirm_command) 1412 else: 1413 self.send_pairing_confirm_command() 1414 1415 def on_smp_pairing_confirm_command_secure_connections(self, _): 1416 if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON): 1417 if self.is_initiator: 1418 self.r = crypto.r() 1419 self.send_pairing_random_command() 1420 elif self.pairing_method == self.PASSKEY: 1421 if self.is_initiator: 1422 self.send_pairing_random_command() 1423 else: 1424 self.send_pairing_confirm_command() 1425 1426 def on_smp_pairing_confirm_command(self, command): 1427 self.confirm_value = command.confirm_value 1428 if self.sc: 1429 self.on_smp_pairing_confirm_command_secure_connections(command) 1430 else: 1431 self.on_smp_pairing_confirm_command_legacy(command) 1432 1433 def on_smp_pairing_random_command_legacy(self, command): 1434 # Check that the confirmation values match 1435 confirm_verifier = crypto.c1( 1436 self.tk, 1437 command.random_value, 1438 self.preq, 1439 self.pres, 1440 self.iat, 1441 self.rat, 1442 self.ia, 1443 self.ra, 1444 ) 1445 if not self.check_expected_value( 1446 self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR 1447 ): 1448 return 1449 1450 # Compute STK 1451 if self.is_initiator: 1452 mrand = self.r 1453 srand = command.random_value 1454 else: 1455 srand = self.r 1456 mrand = command.random_value 1457 self.stk = crypto.s1(self.tk, srand, mrand) 1458 logger.debug(f'STK = {self.stk.hex()}') 1459 1460 # Generate LTK 1461 self.ltk = crypto.r() 1462 1463 if self.is_initiator: 1464 self.start_encryption(self.stk) 1465 else: 1466 self.send_pairing_random_command() 1467 1468 def on_smp_pairing_random_command_secure_connections(self, command): 1469 if self.pairing_method == self.PASSKEY and self.passkey is None: 1470 logger.warning('no passkey entered, ignoring command') 1471 return 1472 1473 # pylint: disable=too-many-return-statements 1474 if self.is_initiator: 1475 if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON): 1476 # Check that the random value matches what was committed to earlier 1477 confirm_verifier = crypto.f4( 1478 self.pkb, self.pka, command.random_value, bytes([0]) 1479 ) 1480 if not self.check_expected_value( 1481 self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR 1482 ): 1483 return 1484 elif self.pairing_method == self.PASSKEY: 1485 # Check that the random value matches what was committed to earlier 1486 confirm_verifier = crypto.f4( 1487 self.pkb, 1488 self.pka, 1489 command.random_value, 1490 bytes([0x80 + ((self.passkey >> self.passkey_step) & 1)]), 1491 ) 1492 if not self.check_expected_value( 1493 self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR 1494 ): 1495 return 1496 1497 # Move on to the next iteration 1498 self.passkey_step += 1 1499 logger.debug(f'passkey finished step {self.passkey_step} of 20') 1500 if self.passkey_step < 20: 1501 self.send_pairing_confirm_command() 1502 return 1503 else: 1504 return 1505 else: 1506 if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON): 1507 self.send_pairing_random_command() 1508 elif self.pairing_method == self.PASSKEY: 1509 # Check that the random value matches what was committed to earlier 1510 confirm_verifier = crypto.f4( 1511 self.pka, 1512 self.pkb, 1513 command.random_value, 1514 bytes([0x80 + ((self.passkey >> self.passkey_step) & 1)]), 1515 ) 1516 if not self.check_expected_value( 1517 self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR 1518 ): 1519 return 1520 1521 self.send_pairing_random_command() 1522 1523 # Move on to the next iteration 1524 self.passkey_step += 1 1525 logger.debug(f'passkey finished step {self.passkey_step} of 20') 1526 if self.passkey_step < 20: 1527 self.r = crypto.r() 1528 return 1529 else: 1530 return 1531 1532 # Compute the MacKey and LTK 1533 a = self.ia + bytes([self.iat]) 1534 b = self.ra + bytes([self.rat]) 1535 (mac_key, self.ltk) = crypto.f5(self.dh_key, self.na, self.nb, a, b) 1536 1537 # Compute the DH Key checks 1538 if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON): 1539 ra = bytes(16) 1540 rb = ra 1541 elif self.pairing_method == self.PASSKEY: 1542 ra = self.passkey.to_bytes(16, byteorder='little') 1543 rb = ra 1544 else: 1545 # OOB not implemented yet 1546 return 1547 1548 io_cap_a = self.preq[1:4] 1549 io_cap_b = self.pres[1:4] 1550 self.ea = crypto.f6(mac_key, self.na, self.nb, rb, io_cap_a, a, b) 1551 self.eb = crypto.f6(mac_key, self.nb, self.na, ra, io_cap_b, b, a) 1552 1553 # Next steps to be performed after possible user confirmation 1554 def next_steps(): 1555 # The initiator sends the DH Key check to the responder 1556 if self.is_initiator: 1557 self.send_pairing_dhkey_check_command() 1558 else: 1559 if self.wait_before_continuing: 1560 self.wait_before_continuing.set_result(None) 1561 1562 # Prompt the user for confirmation if needed 1563 if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON): 1564 # Compute the 6-digit code 1565 code = crypto.g2(self.pka, self.pkb, self.na, self.nb) % 1000000 1566 1567 # Ask for user confirmation 1568 self.wait_before_continuing = asyncio.get_running_loop().create_future() 1569 if self.pairing_method == self.JUST_WORKS: 1570 self.prompt_user_for_confirmation(next_steps) 1571 else: 1572 self.prompt_user_for_numeric_comparison(code, next_steps) 1573 else: 1574 next_steps() 1575 1576 def on_smp_pairing_random_command(self, command): 1577 self.peer_random_value = command.random_value 1578 if self.sc: 1579 self.on_smp_pairing_random_command_secure_connections(command) 1580 else: 1581 self.on_smp_pairing_random_command_legacy(command) 1582 1583 def on_smp_pairing_public_key_command(self, command): 1584 # Store the public key so that we can compute the confirmation value later 1585 self.peer_public_key_x = command.public_key_x 1586 self.peer_public_key_y = command.public_key_y 1587 1588 # Compute the DH key 1589 self.dh_key = bytes( 1590 reversed( 1591 self.manager.ecc_key.dh( 1592 bytes(reversed(command.public_key_x)), 1593 bytes(reversed(command.public_key_y)), 1594 ) 1595 ) 1596 ) 1597 logger.debug(f'DH key: {self.dh_key.hex()}') 1598 1599 if self.is_initiator: 1600 self.send_pairing_confirm_command() 1601 else: 1602 if self.pairing_method == self.PASSKEY: 1603 self.display_or_input_passkey() 1604 1605 # Send our public key back to the initiator 1606 self.send_public_key_command() 1607 1608 if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON): 1609 # We can now send the confirmation value 1610 self.send_pairing_confirm_command() 1611 1612 def on_smp_pairing_dhkey_check_command(self, command): 1613 # Check that what we received matches what we computed earlier 1614 expected = self.eb if self.is_initiator else self.ea 1615 if not self.check_expected_value( 1616 expected, command.dhkey_check, SMP_DHKEY_CHECK_FAILED_ERROR 1617 ): 1618 return 1619 1620 if self.is_responder: 1621 if self.wait_before_continuing is not None: 1622 1623 async def next_steps(): 1624 await self.wait_before_continuing 1625 self.wait_before_continuing = None 1626 self.send_pairing_dhkey_check_command() 1627 1628 self.connection.abort_on('disconnection', next_steps()) 1629 else: 1630 self.send_pairing_dhkey_check_command() 1631 else: 1632 self.start_encryption(self.ltk) 1633 1634 def on_smp_pairing_failed_command(self, command): 1635 self.on_pairing_failure(command.reason) 1636 1637 def on_smp_encryption_information_command(self, command): 1638 self.peer_ltk = command.long_term_key 1639 self.check_key_distribution(SMP_Encryption_Information_Command) 1640 1641 def on_smp_master_identification_command(self, command): 1642 self.peer_ediv = command.ediv 1643 self.peer_rand = command.rand 1644 self.check_key_distribution(SMP_Master_Identification_Command) 1645 1646 def on_smp_identity_information_command(self, command): 1647 self.peer_identity_resolving_key = command.identity_resolving_key 1648 self.check_key_distribution(SMP_Identity_Information_Command) 1649 1650 def on_smp_identity_address_information_command(self, command): 1651 self.peer_bd_addr = command.bd_addr 1652 self.check_key_distribution(SMP_Identity_Address_Information_Command) 1653 1654 def on_smp_signing_information_command(self, command): 1655 self.peer_signature_key = command.signature_key 1656 self.check_key_distribution(SMP_Signing_Information_Command) 1657 1658 1659# ----------------------------------------------------------------------------- 1660class Manager(EventEmitter): 1661 ''' 1662 Implements the Initiator and Responder roles of the Security Manager Protocol 1663 ''' 1664 1665 def __init__(self, device): 1666 super().__init__() 1667 self.device = device 1668 self.sessions = {} 1669 self._ecc_key = None 1670 self.pairing_config_factory = lambda connection: PairingConfig() 1671 1672 def send_command(self, connection, command): 1673 logger.debug( 1674 f'>>> Sending SMP Command on connection [0x{connection.handle:04X}] ' 1675 f'{connection.peer_address}: {command}' 1676 ) 1677 cid = SMP_BR_CID if connection.transport == BT_BR_EDR_TRANSPORT else SMP_CID 1678 connection.send_l2cap_pdu(cid, command.to_bytes()) 1679 1680 def on_smp_pdu(self, connection, pdu): 1681 # Look for a session with this connection, and create one if none exists 1682 if not (session := self.sessions.get(connection.handle)): 1683 pairing_config = self.pairing_config_factory(connection) 1684 if pairing_config is None: 1685 # Pairing disabled 1686 self.send_command( 1687 connection, 1688 SMP_Pairing_Failed_Command(reason=SMP_PAIRING_NOT_SUPPORTED_ERROR), 1689 ) 1690 return 1691 session = Session(self, connection, pairing_config) 1692 self.sessions[connection.handle] = session 1693 1694 # Parse the L2CAP payload into an SMP Command object 1695 command = SMP_Command.from_bytes(pdu) 1696 logger.debug( 1697 f'<<< Received SMP Command on connection [0x{connection.handle:04X}] ' 1698 f'{connection.peer_address}: {command}' 1699 ) 1700 1701 # Delegate the handling of the command to the session 1702 session.on_smp_command(command) 1703 1704 @property 1705 def ecc_key(self): 1706 if self._ecc_key is None: 1707 self._ecc_key = crypto.EccKey.generate() 1708 return self._ecc_key 1709 1710 async def pair(self, connection): 1711 # TODO: check if there's already a session for this connection 1712 pairing_config = self.pairing_config_factory(connection) 1713 if pairing_config is None: 1714 raise ValueError('pairing config must not be None when initiating') 1715 session = Session(self, connection, pairing_config) 1716 self.sessions[connection.handle] = session 1717 return await session.pair() 1718 1719 def request_pairing(self, connection): 1720 pairing_config = self.pairing_config_factory(connection) 1721 if pairing_config: 1722 auth_req = smp_auth_req( 1723 pairing_config.bonding, 1724 pairing_config.mitm, 1725 pairing_config.sc, 1726 False, 1727 False, 1728 ) 1729 else: 1730 auth_req = 0 1731 self.send_command(connection, SMP_Security_Request_Command(auth_req=auth_req)) 1732 1733 def on_session_start(self, session): 1734 self.device.on_pairing_start(session.connection.handle) 1735 1736 def on_pairing(self, session, identity_address, keys): 1737 # Store the keys in the key store 1738 if self.device.keystore and identity_address is not None: 1739 1740 async def store_keys(): 1741 try: 1742 await self.device.keystore.update(str(identity_address), keys) 1743 except Exception as error: 1744 logger.warning(f'!!! error while storing keys: {error}') 1745 1746 self.device.abort_on('flush', store_keys()) 1747 1748 # Notify the device 1749 self.device.on_pairing(session.connection.handle, keys, session.sc) 1750 1751 def on_pairing_failure(self, session, reason): 1752 self.device.on_pairing_failure(session.connection.handle, reason) 1753 1754 def on_session_end(self, session): 1755 logger.debug(f'session end for connection 0x{session.connection.handle:04X}') 1756 if session.connection.handle in self.sessions: 1757 del self.sessions[session.connection.handle] 1758 1759 def get_long_term_key(self, connection, rand, ediv): 1760 if session := self.sessions.get(connection.handle): 1761 return session.get_long_term_key(rand, ediv) 1762 1763 return None 1764