1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# SMP - Security Manager Protocol 17# 18# See Bluetooth spec @ Vol 3, Part H 19# 20# ----------------------------------------------------------------------------- 21 22# ----------------------------------------------------------------------------- 23# Imports 24# ----------------------------------------------------------------------------- 25import logging 26import asyncio 27import secrets 28from pyee import EventEmitter 29from colors import color 30 31from .core import * 32from .hci import * 33from .keys import PairingKeys 34from . import crypto 35 36 37# ----------------------------------------------------------------------------- 38# Logging 39# ----------------------------------------------------------------------------- 40logger = logging.getLogger(__name__) 41 42 43# ----------------------------------------------------------------------------- 44# Constants 45# ----------------------------------------------------------------------------- 46SMP_CID = 0x06 47 48SMP_PAIRING_REQUEST_COMMAND = 0x01 49SMP_PAIRING_RESPONSE_COMMAND = 0x02 50SMP_PAIRING_CONFIRM_COMMAND = 0x03 51SMP_PAIRING_RANDOM_COMMAND = 0x04 52SMP_PAIRING_FAILED_COMMAND = 0x05 53SMP_ENCRYPTION_INFORMATION_COMMAND = 0x06 54SMP_MASTER_IDENTIFICATION_COMMAND = 0x07 55SMP_IDENTITY_INFORMATION_COMMAND = 0x08 56SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND = 0x09 57SMP_SIGNING_INFORMATION_COMMAND = 0x0A 58SMP_SECURITY_REQUEST_COMMAND = 0x0B 59SMP_PAIRING_PUBLIC_KEY_COMMAND = 0x0C 60SMP_PAIRING_DHKEY_CHECK_COMMAND = 0x0D 61SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND = 0x0E 62 63SMP_COMMAND_NAMES = { 64 SMP_PAIRING_REQUEST_COMMAND: 'SMP_PAIRING_REQUEST_COMMAND', 65 SMP_PAIRING_RESPONSE_COMMAND: 'SMP_PAIRING_RESPONSE_COMMAND', 66 SMP_PAIRING_CONFIRM_COMMAND: 'SMP_PAIRING_CONFIRM_COMMAND', 67 SMP_PAIRING_RANDOM_COMMAND: 'SMP_PAIRING_RANDOM_COMMAND', 68 SMP_PAIRING_FAILED_COMMAND: 'SMP_PAIRING_FAILED_COMMAND', 69 SMP_ENCRYPTION_INFORMATION_COMMAND: 'SMP_ENCRYPTION_INFORMATION_COMMAND', 70 SMP_MASTER_IDENTIFICATION_COMMAND: 'SMP_MASTER_IDENTIFICATION_COMMAND', 71 SMP_IDENTITY_INFORMATION_COMMAND: 'SMP_IDENTITY_INFORMATION_COMMAND', 72 SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND: 'SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND', 73 SMP_SIGNING_INFORMATION_COMMAND: 'SMP_SIGNING_INFORMATION_COMMAND', 74 SMP_SECURITY_REQUEST_COMMAND: 'SMP_SECURITY_REQUEST_COMMAND', 75 SMP_PAIRING_PUBLIC_KEY_COMMAND: 'SMP_PAIRING_PUBLIC_KEY_COMMAND', 76 SMP_PAIRING_DHKEY_CHECK_COMMAND: 'SMP_PAIRING_DHKEY_CHECK_COMMAND', 77 SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND: 'SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND' 78} 79 80SMP_DISPLAY_ONLY_IO_CAPABILITY = 0x00 81SMP_DISPLAY_YES_NO_IO_CAPABILITY = 0x01 82SMP_KEYBOARD_ONLY_IO_CAPABILITY = 0x02 83SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY = 0x03 84SMP_KEYBOARD_DISPLAY_IO_CAPABILITY = 0x04 85 86SMP_IO_CAPABILITY_NAMES = { 87 SMP_DISPLAY_ONLY_IO_CAPABILITY: 'SMP_DISPLAY_ONLY_IO_CAPABILITY', 88 SMP_DISPLAY_YES_NO_IO_CAPABILITY: 'SMP_DISPLAY_YES_NO_IO_CAPABILITY', 89 SMP_KEYBOARD_ONLY_IO_CAPABILITY: 'SMP_KEYBOARD_ONLY_IO_CAPABILITY', 90 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: 'SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY', 91 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: 'SMP_KEYBOARD_DISPLAY_IO_CAPABILITY' 92} 93 94SMP_PASSKEY_ENTRY_FAILED_ERROR = 0x01 95SMP_OOB_NOT_AVAILABLE_ERROR = 0x02 96SMP_AUTHENTICATION_REQUIREMENTS_ERROR = 0x03 97SMP_CONFIRM_VALUE_FAILED_ERROR = 0x04 98SMP_PAIRING_NOT_SUPPORTED_ERROR = 0x05 99SMP_ENCRYPTION_KEY_SIZE_ERROR = 0x06 100SMP_COMMAND_NOT_SUPPORTED_ERROR = 0x07 101SMP_UNSPECIFIED_REASON_ERROR = 0x08 102SMP_REPEATED_ATTEMPTS_ERROR = 0x09 103SMP_INVALID_PARAMETERS_ERROR = 0x0A 104SMP_DHKEY_CHECK_FAILED_ERROR = 0x0B 105SMP_NUMERIC_COMPARISON_FAILED_ERROR = 0x0C 106SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR = 0x0D 107SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR = 0x0E 108 109SMP_ERROR_NAMES = { 110 SMP_PASSKEY_ENTRY_FAILED_ERROR: 'SMP_PASSKEY_ENTRY_FAILED_ERROR', 111 SMP_OOB_NOT_AVAILABLE_ERROR: 'SMP_OOB_NOT_AVAILABLE_ERROR', 112 SMP_AUTHENTICATION_REQUIREMENTS_ERROR: 'SMP_AUTHENTICATION_REQUIREMENTS_ERROR', 113 SMP_CONFIRM_VALUE_FAILED_ERROR: 'SMP_CONFIRM_VALUE_FAILED_ERROR', 114 SMP_PAIRING_NOT_SUPPORTED_ERROR: 'SMP_PAIRING_NOT_SUPPORTED_ERROR', 115 SMP_ENCRYPTION_KEY_SIZE_ERROR: 'SMP_ENCRYPTION_KEY_SIZE_ERROR', 116 SMP_COMMAND_NOT_SUPPORTED_ERROR: 'SMP_COMMAND_NOT_SUPPORTED_ERROR', 117 SMP_UNSPECIFIED_REASON_ERROR: 'SMP_UNSPECIFIED_REASON_ERROR', 118 SMP_REPEATED_ATTEMPTS_ERROR: 'SMP_REPEATED_ATTEMPTS_ERROR', 119 SMP_INVALID_PARAMETERS_ERROR: 'SMP_INVALID_PARAMETERS_ERROR', 120 SMP_DHKEY_CHECK_FAILED_ERROR: 'SMP_DHKEY_CHECK_FAILED_ERROR', 121 SMP_NUMERIC_COMPARISON_FAILED_ERROR: 'SMP_NUMERIC_COMPARISON_FAILED_ERROR', 122 SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR: 'SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR', 123 SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR: 'SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR' 124} 125 126SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE = 0 127SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE = 1 128SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE = 2 129SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE = 3 130SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE = 4 131 132SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES = { 133 SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE', 134 SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE', 135 SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE', 136 SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE', 137 SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE' 138} 139 140# Bit flags for key distribution/generation 141SMP_ENC_KEY_DISTRIBUTION_FLAG = 0b0001 142SMP_ID_KEY_DISTRIBUTION_FLAG = 0b0010 143SMP_SIGN_KEY_DISTRIBUTION_FLAG = 0b0100 144SMP_LINK_KEY_DISTRIBUTION_FLAG = 0b1000 145 146# AuthReq fields 147SMP_BONDING_AUTHREQ = 0b00000001 148SMP_MITM_AUTHREQ = 0b00000100 149SMP_SC_AUTHREQ = 0b00001000 150SMP_KEYPRESS_AUTHREQ = 0b00010000 151SMP_CT2_AUTHREQ = 0b00100000 152 153# Crypto salt 154SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('00000000000000000000000000000000746D7031') 155 156# ----------------------------------------------------------------------------- 157# Utils 158# ----------------------------------------------------------------------------- 159def error_name(error_code): 160 return name_or_number(SMP_ERROR_NAMES, error_code) 161 162 163# ----------------------------------------------------------------------------- 164# Classes 165# ----------------------------------------------------------------------------- 166class SMP_Command: 167 ''' 168 See Bluetooth spec @ Vol 3, Part H - 3 SECURITY MANAGER PROTOCOL 169 ''' 170 smp_classes = {} 171 code = 0 172 173 @staticmethod 174 def from_bytes(pdu): 175 code = pdu[0] 176 177 cls = SMP_Command.smp_classes.get(code) 178 if cls is None: 179 instance = SMP_Command(pdu) 180 instance.name = SMP_Command.command_name(code) 181 instance.code = code 182 return instance 183 self = cls.__new__(cls) 184 SMP_Command.__init__(self, pdu) 185 if hasattr(self, 'fields'): 186 self.init_from_bytes(pdu, 1) 187 return self 188 189 @staticmethod 190 def command_name(code): 191 return name_or_number(SMP_COMMAND_NAMES, code) 192 193 @staticmethod 194 def auth_req_str(value): 195 bonding_flags = value & 3 196 mitm = (value >> 2) & 1 197 sc = (value >> 3) & 1 198 keypress = (value >> 4) & 1 199 ct2 = (value >> 5) & 1 200 201 return f'bonding_flags={bonding_flags}, MITM={mitm}, sc={sc}, keypress={keypress}, ct2={ct2}' 202 203 @staticmethod 204 def io_capability_name(io_capability): 205 return name_or_number(SMP_IO_CAPABILITY_NAMES, io_capability) 206 207 @staticmethod 208 def key_distribution_str(value): 209 key_types = [] 210 if value & SMP_ENC_KEY_DISTRIBUTION_FLAG: 211 key_types.append('ENC') 212 if value & SMP_ID_KEY_DISTRIBUTION_FLAG: 213 key_types.append('ID') 214 if value & SMP_SIGN_KEY_DISTRIBUTION_FLAG: 215 key_types.append('SIGN') 216 if value & SMP_LINK_KEY_DISTRIBUTION_FLAG: 217 key_types.append('LINK') 218 return ','.join(key_types) 219 220 @staticmethod 221 def keypress_notification_type_name(notification_type): 222 return name_or_number(SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES, notification_type) 223 224 @staticmethod 225 def subclass(fields): 226 def inner(cls): 227 cls.name = cls.__name__.upper() 228 cls.code = key_with_value(SMP_COMMAND_NAMES, cls.name) 229 if cls.code is None: 230 raise KeyError(f'Command name {cls.name} not found in SMP_COMMAND_NAMES') 231 cls.fields = fields 232 233 # Register a factory for this class 234 SMP_Command.smp_classes[cls.code] = cls 235 236 return cls 237 238 return inner 239 240 def __init__(self, pdu=None, **kwargs): 241 if hasattr(self, 'fields') and kwargs: 242 HCI_Object.init_from_fields(self, self.fields, kwargs) 243 if pdu is None: 244 pdu = bytes([self.code]) + HCI_Object.dict_to_bytes(kwargs, self.fields) 245 self.pdu = pdu 246 247 def init_from_bytes(self, pdu, offset): 248 return HCI_Object.init_from_bytes(self, pdu, offset, self.fields) 249 250 def to_bytes(self): 251 return self.pdu 252 253 def __bytes__(self): 254 return self.to_bytes() 255 256 def __str__(self): 257 result = color(self.name, 'yellow') 258 if fields := getattr(self, 'fields', None): 259 result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, ' ') 260 else: 261 if len(self.pdu) > 1: 262 result += f': {self.pdu.hex()}' 263 return result 264 265 266# ----------------------------------------------------------------------------- 267@SMP_Command.subclass([ 268 ('io_capability', {'size': 1, 'mapper': SMP_Command.io_capability_name}), 269 ('oob_data_flag', 1), 270 ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}), 271 ('maximum_encryption_key_size', 1), 272 ('initiator_key_distribution', {'size': 1, 'mapper': SMP_Command.key_distribution_str}), 273 ('responder_key_distribution', {'size': 1, 'mapper': SMP_Command.key_distribution_str}) 274]) 275class SMP_Pairing_Request_Command(SMP_Command): 276 ''' 277 See Bluetooth spec @ Vol 3, Part H - 3.5.1 Pairing Request 278 ''' 279 280 281# ----------------------------------------------------------------------------- 282@SMP_Command.subclass([ 283 ('io_capability', {'size': 1, 'mapper': SMP_Command.io_capability_name}), 284 ('oob_data_flag', 1), 285 ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}), 286 ('maximum_encryption_key_size', 1), 287 ('initiator_key_distribution', {'size': 1, 'mapper': SMP_Command.key_distribution_str}), 288 ('responder_key_distribution', {'size': 1, 'mapper': SMP_Command.key_distribution_str}) 289]) 290class SMP_Pairing_Response_Command(SMP_Command): 291 ''' 292 See Bluetooth spec @ Vol 3, Part H - 3.5.2 Pairing Response 293 ''' 294 295 296# ----------------------------------------------------------------------------- 297@SMP_Command.subclass([ 298 ('confirm_value', 16) 299]) 300class SMP_Pairing_Confirm_Command(SMP_Command): 301 ''' 302 See Bluetooth spec @ Vol 3, Part H - 3.5.3 Pairing Confirm 303 ''' 304 305 306# ----------------------------------------------------------------------------- 307@SMP_Command.subclass([ 308 ('random_value', 16) 309]) 310class SMP_Pairing_Random_Command(SMP_Command): 311 ''' 312 See Bluetooth spec @ Vol 3, Part H - 3.5.4 Pairing Random 313 ''' 314 315 316# ----------------------------------------------------------------------------- 317@SMP_Command.subclass([ 318 ('reason', {'size': 1, 'mapper': error_name}) 319]) 320class SMP_Pairing_Failed_Command(SMP_Command): 321 ''' 322 See Bluetooth spec @ Vol 3, Part H - 3.5.5 Pairing Failed 323 ''' 324 325 326# ----------------------------------------------------------------------------- 327@SMP_Command.subclass([ 328 ('public_key_x', 32), 329 ('public_key_y', 32) 330]) 331class SMP_Pairing_Public_Key_Command(SMP_Command): 332 ''' 333 See Bluetooth spec @ Vol 3, Part H - 3.5.6 Pairing Public Key 334 ''' 335 336 337# ----------------------------------------------------------------------------- 338@SMP_Command.subclass([ 339 ('dhkey_check', 16), 340]) 341class SMP_Pairing_DHKey_Check_Command(SMP_Command): 342 ''' 343 See Bluetooth spec @ Vol 3, Part H - 3.5.7 Pairing DHKey Check 344 ''' 345 346 347# ----------------------------------------------------------------------------- 348@SMP_Command.subclass([ 349 ('notification_type', {'size': 1, 'mapper': SMP_Command.keypress_notification_type_name}), 350]) 351class SMP_Pairing_Keypress_Notification_Command(SMP_Command): 352 ''' 353 See Bluetooth spec @ Vol 3, Part H - 3.5.8 Keypress Notification 354 ''' 355 356 357# ----------------------------------------------------------------------------- 358@SMP_Command.subclass([ 359 ('long_term_key', 16) 360]) 361class SMP_Encryption_Information_Command(SMP_Command): 362 ''' 363 See Bluetooth spec @ Vol 3, Part H - 3.6.2 Encryption Information 364 ''' 365 366 367# ----------------------------------------------------------------------------- 368@SMP_Command.subclass([ 369 ('ediv', 2), 370 ('rand', 8) 371]) 372class SMP_Master_Identification_Command(SMP_Command): 373 ''' 374 See Bluetooth spec @ Vol 3, Part H - 3.6.3 Master Identification 375 ''' 376 377 378# ----------------------------------------------------------------------------- 379@SMP_Command.subclass([ 380 ('identity_resolving_key', 16) 381]) 382class SMP_Identity_Information_Command(SMP_Command): 383 ''' 384 See Bluetooth spec @ Vol 3, Part H - 3.6.4 Identity Information 385 ''' 386 387 388# ----------------------------------------------------------------------------- 389@SMP_Command.subclass([ 390 ('addr_type', Address.ADDRESS_TYPE_SPEC), 391 ('bd_addr', Address.parse_address_preceded_by_type) 392]) 393class SMP_Identity_Address_Information_Command(SMP_Command): 394 ''' 395 See Bluetooth spec @ Vol 3, Part H - 3.6.5 Identity Address Information 396 ''' 397 398 399# ----------------------------------------------------------------------------- 400@SMP_Command.subclass([ 401 ('signature_key', 16) 402]) 403class SMP_Signing_Information_Command(SMP_Command): 404 ''' 405 See Bluetooth spec @ Vol 3, Part H - 3.6.6 Signing Information 406 ''' 407 408 409# ----------------------------------------------------------------------------- 410@SMP_Command.subclass([ 411 ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}), 412]) 413class SMP_Security_Request_Command(SMP_Command): 414 ''' 415 See Bluetooth spec @ Vol 3, Part H - 3.6.7 Security Request 416 ''' 417 418 419# ----------------------------------------------------------------------------- 420def smp_auth_req(bonding, mitm, sc, keypress, ct2): 421 value = 0 422 if bonding: 423 value |= SMP_BONDING_AUTHREQ 424 if mitm: 425 value |= SMP_MITM_AUTHREQ 426 if sc: 427 value |= SMP_SC_AUTHREQ 428 if keypress: 429 value |= SMP_KEYPRESS_AUTHREQ 430 if ct2: 431 value |= SMP_CT2_AUTHREQ 432 return value 433 434 435# ----------------------------------------------------------------------------- 436class AddressResolver: 437 def __init__(self, resolving_keys): 438 self.resolving_keys = resolving_keys 439 440 def resolve(self, address): 441 address_bytes = bytes(address) 442 hash = address_bytes[0:3] 443 prand = address_bytes[3:6] 444 for (irk, resolved_address) in self.resolving_keys: 445 local_hash = crypto.ah(irk, prand) 446 if local_hash == hash: 447 # Match! 448 if resolved_address.address_type == Address.PUBLIC_DEVICE_ADDRESS: 449 resolved_address_type = Address.PUBLIC_IDENTITY_ADDRESS 450 else: 451 resolved_address_type = Address.RANDOM_IDENTITY_ADDRESS 452 return Address(address=str(resolved_address), address_type=resolved_address_type) 453 454 455# ----------------------------------------------------------------------------- 456class PairingDelegate: 457 NO_OUTPUT_NO_INPUT = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY 458 KEYBOARD_INPUT_ONLY = SMP_KEYBOARD_ONLY_IO_CAPABILITY 459 DISPLAY_OUTPUT_ONLY = SMP_DISPLAY_ONLY_IO_CAPABILITY 460 DISPLAY_OUTPUT_AND_YES_NO_INPUT = SMP_DISPLAY_YES_NO_IO_CAPABILITY 461 DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = SMP_KEYBOARD_DISPLAY_IO_CAPABILITY 462 DEFAULT_KEY_DISTRIBUTION = (SMP_ENC_KEY_DISTRIBUTION_FLAG | SMP_ID_KEY_DISTRIBUTION_FLAG) 463 464 def __init__( 465 self, 466 io_capability=NO_OUTPUT_NO_INPUT, 467 local_initiator_key_distribution=DEFAULT_KEY_DISTRIBUTION, 468 local_responder_key_distribution=DEFAULT_KEY_DISTRIBUTION 469 ): 470 self.io_capability = io_capability 471 self.local_initiator_key_distribution = local_initiator_key_distribution 472 self.local_responder_key_distribution = local_responder_key_distribution 473 474 async def accept(self): 475 return True 476 477 async def compare_numbers(self, number, digits=6): 478 return True 479 480 async def get_number(self): 481 return 0 482 483 async def display_number(self, number, digits=6): 484 pass 485 486 async def key_distribution_response(self, peer_initiator_key_distribution, peer_responder_key_distribution): 487 return ( 488 (peer_initiator_key_distribution & 489 self.local_initiator_key_distribution), 490 (peer_responder_key_distribution & 491 self.local_responder_key_distribution) 492 ) 493 494 495# ----------------------------------------------------------------------------- 496class PairingConfig: 497 def __init__(self, sc=True, mitm=True, bonding=True, delegate=None): 498 self.sc = sc 499 self.mitm = mitm 500 self.bonding = bonding 501 self.delegate = delegate or PairingDelegate() 502 503 def __str__(self): 504 io_capability_str = SMP_Command.io_capability_name(self.delegate.io_capability) 505 return f'PairingConfig(sc={self.sc}, mitm={self.mitm}, bonding={self.bonding}, delegate[{io_capability_str}])' 506 507 508# ----------------------------------------------------------------------------- 509class Session: 510 # Pairing methods 511 JUST_WORKS = 0 512 NUMERIC_COMPARISON = 1 513 PASSKEY = 2 514 OOB = 3 515 516 PAIRING_METHOD_NAMES = { 517 JUST_WORKS: 'JUST_WORKS', 518 NUMERIC_COMPARISON: 'NUMERIC_COMPARISON', 519 PASSKEY: 'PASSKEY', 520 OOB: 'OOB' 521 } 522 523 # I/O Capability to pairing method decision matrix 524 # 525 # See Bluetooth spec @ Vol 3, part H - Table 2.8: Mapping of IO Capabilities to Key Generation Method 526 # 527 # Map: initiator -> responder -> <method> 528 # where <method> may be a simple entry or a 2-element tuple, with the first element for legacy 529 # pairing and the second for secure connections, when the two are different. 530 # Each entry is either a method name, or, for PASSKEY, a tuple: 531 # (method, initiator_displays, responder_displays) 532 # to specify if the initiator and responder should display (True) or input a code (False). 533 PAIRING_METHODS = { 534 SMP_DISPLAY_ONLY_IO_CAPABILITY: { 535 SMP_DISPLAY_ONLY_IO_CAPABILITY: JUST_WORKS, 536 SMP_DISPLAY_YES_NO_IO_CAPABILITY: JUST_WORKS, 537 SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PASSKEY, True, False), 538 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS, 539 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (PASSKEY, True, False), 540 }, 541 SMP_DISPLAY_YES_NO_IO_CAPABILITY: { 542 SMP_DISPLAY_ONLY_IO_CAPABILITY: JUST_WORKS, 543 SMP_DISPLAY_YES_NO_IO_CAPABILITY: (JUST_WORKS, NUMERIC_COMPARISON), 544 SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PASSKEY, True, False), 545 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS, 546 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: ((PASSKEY, True, False), NUMERIC_COMPARISON) 547 }, 548 SMP_KEYBOARD_ONLY_IO_CAPABILITY: { 549 SMP_DISPLAY_ONLY_IO_CAPABILITY: (PASSKEY, False, True), 550 SMP_DISPLAY_YES_NO_IO_CAPABILITY: (PASSKEY, False, True), 551 SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PASSKEY, False, False), 552 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS, 553 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (PASSKEY, False, True), 554 }, 555 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: { 556 SMP_DISPLAY_ONLY_IO_CAPABILITY: JUST_WORKS, 557 SMP_DISPLAY_YES_NO_IO_CAPABILITY: JUST_WORKS, 558 SMP_KEYBOARD_ONLY_IO_CAPABILITY: JUST_WORKS, 559 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS, 560 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: JUST_WORKS 561 }, 562 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: { 563 SMP_DISPLAY_ONLY_IO_CAPABILITY: (PASSKEY, False, True), 564 SMP_DISPLAY_YES_NO_IO_CAPABILITY: ((PASSKEY, False, True), NUMERIC_COMPARISON), 565 SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PASSKEY, True, False), 566 SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS, 567 SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: ((PASSKEY, True, False), NUMERIC_COMPARISON) 568 } 569 } 570 571 def __init__(self, manager, connection, pairing_config): 572 self.manager = manager 573 self.connection = connection 574 self.tk = bytes(16) 575 self.r = bytes(16) 576 self.stk = None 577 self.ltk = None 578 self.ltk_ediv = 0 579 self.ltk_rand = bytes(8) 580 self.link_key = None 581 self.initiator_key_distribution = 0 582 self.responder_key_distribution = 0 583 self.peer_random_value = None 584 self.peer_public_key_x = bytes(32) 585 self.peer_public_key_y = bytes(32) 586 self.peer_ltk = None 587 self.peer_ediv = None 588 self.peer_rand = None 589 self.peer_identity_resolving_key = None 590 self.peer_bd_addr = None 591 self.peer_signature_key = None 592 self.peer_expected_distributions = [] 593 self.dh_key = None 594 self.passkey = 0 595 self.passkey_step = 0 596 self.passkey_display = False 597 self.pairing_method = 0 598 self.pairing_config = pairing_config 599 self.wait_before_continuing = None 600 self.completed = False 601 602 # Decide if we're the initiator or the responder 603 self.is_initiator = (connection.role == BT_CENTRAL_ROLE) 604 self.is_responder = not self.is_initiator 605 606 # Listen for connection events 607 connection.on('disconnection', self.on_disconnection) 608 connection.on('connection_encryption_change', self.on_connection_encryption_change) 609 connection.on('connection_encryption_key_refresh', self.on_connection_encryption_key_refresh) 610 611 # Create a future that can be used to wait for the session to complete 612 if self.is_initiator: 613 self.pairing_result = asyncio.get_running_loop().create_future() 614 else: 615 self.pairing_result = None 616 617 # Key Distribution (default values before negotiation) 618 self.initiator_key_distribution = pairing_config.delegate.local_initiator_key_distribution 619 self.responder_key_distribution = pairing_config.delegate.local_responder_key_distribution 620 621 # Authentication Requirements Flags - Vol 3, Part H, Figure 3.3 622 self.bonding = pairing_config.bonding 623 self.sc = pairing_config.sc 624 self.mitm = pairing_config.mitm 625 self.keypress = False 626 self.ct2 = False 627 628 # I/O Capabilities 629 self.io_capability = pairing_config.delegate.io_capability 630 self.peer_io_capability = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY 631 632 # OOB (not supported yet) 633 self.oob = False 634 635 # Set up addresses 636 peer_address = connection.peer_resolvable_address or connection.peer_address 637 if self.is_initiator: 638 self.ia = bytes(manager.address) 639 self.iat = 1 if manager.address.is_random else 0 640 self.ra = bytes(peer_address) 641 self.rat = 1 if peer_address.is_random else 0 642 else: 643 self.ra = bytes(manager.address) 644 self.rat = 1 if manager.address.is_random else 0 645 self.ia = bytes(peer_address) 646 self.iat = 1 if peer_address.is_random else 0 647 648 @property 649 def pkx(self): 650 return ( 651 bytes(reversed(self.manager.ecc_key.x)), 652 self.peer_public_key_x 653 ) 654 655 @property 656 def pka(self): 657 return self.pkx[0 if self.is_initiator else 1] 658 659 @property 660 def pkb(self): 661 return self.pkx[0 if self.is_responder else 1] 662 663 @property 664 def nx(self): 665 return ( 666 self.r, 667 self.peer_random_value 668 ) 669 670 @property 671 def na(self): 672 return self.nx[0 if self.is_initiator else 1] 673 674 @property 675 def nb(self): 676 return self.nx[0 if self.is_responder else 1] 677 678 @property 679 def auth_req(self): 680 return smp_auth_req(self.bonding, self.mitm, self.sc, self.keypress, self.ct2) 681 682 def get_long_term_key(self, rand, ediv): 683 if not self.sc and not self.completed: 684 if rand == self.ltk_rand and ediv == self.ltk_ediv: 685 return self.stk 686 else: 687 return self.ltk 688 689 def decide_pairing_method(self, auth_req, initiator_io_capability, responder_io_capability): 690 if (not self.mitm) and (auth_req & SMP_MITM_AUTHREQ == 0): 691 self.pairing_method = self.JUST_WORKS 692 return 693 694 details = self.PAIRING_METHODS[initiator_io_capability][responder_io_capability] 695 if type(details) is tuple and len(details) == 2: 696 # One entry for legacy pairing and one for secure connections 697 details = details[1 if self.sc else 0] 698 if type(details) is int: 699 # Just a method ID 700 self.pairing_method = details 701 else: 702 # PASSKEY method, with a method ID and display/input flags 703 self.pairing_method = details[0] 704 self.passkey_display = details[1 if self.is_initiator else 2] 705 706 def check_expected_value(self, expected, received, error): 707 logger.debug(f'expected={expected.hex()} got={received.hex()}') 708 if expected != received: 709 logger.info(color('pairing confirm/check mismatch', 'red')) 710 self.send_pairing_failed(error) 711 return False 712 return True 713 714 def prompt_user_for_numeric_comparison(self, code, next_steps): 715 async def prompt(): 716 logger.debug(f'verification code: {code}') 717 try: 718 response = await self.pairing_config.delegate.compare_numbers(code, digits=6) 719 if response: 720 next_steps() 721 return 722 except Exception as error: 723 logger.warn(f'exception while prompting: {error}') 724 725 self.send_pairing_failed(SMP_CONFIRM_VALUE_FAILED_ERROR) 726 727 asyncio.create_task(prompt()) 728 729 def prompt_user_for_number(self, next_steps): 730 async def prompt(): 731 logger.debug('prompting user for passkey') 732 try: 733 passkey = await self.pairing_config.delegate.get_number() 734 logger.debug(f'user input: {passkey}') 735 next_steps(passkey) 736 except Exception as error: 737 logger.warn(f'exception while prompting: {error}') 738 self.send_pairing_failed(SMP_PASSKEY_ENTRY_FAILED_ERROR) 739 740 asyncio.create_task(prompt()) 741 742 def display_passkey(self): 743 # Generate random Passkey/PIN code 744 self.passkey = secrets.randbelow(1000000) 745 logger.debug(f'Pairing PIN CODE: {self.passkey:06}') 746 747 # The value of TK is computed from the PIN code 748 if not self.sc: 749 self.tk = self.passkey.to_bytes(16, byteorder='little') 750 logger.debug(f'TK from passkey = {self.tk.hex()}') 751 752 asyncio.create_task(self.pairing_config.delegate.display_number(self.passkey, digits=6)) 753 754 def input_passkey(self, next_steps=None): 755 # Prompt the user for the passkey displayed on the peer 756 def after_input(passkey): 757 self.passkey = passkey 758 759 if not self.sc: 760 self.tk = passkey.to_bytes(16, byteorder='little') 761 logger.debug(f'TK from passkey = {self.tk.hex()}') 762 763 if next_steps is not None: 764 next_steps() 765 self.prompt_user_for_number(after_input) 766 767 def display_or_input_passkey(self, next_steps=None): 768 if self.passkey_display: 769 self.display_passkey() 770 if next_steps is not None: 771 next_steps() 772 else: 773 self.input_passkey(next_steps) 774 775 def send_command(self, command): 776 self.manager.send_command(self.connection, command) 777 778 def send_pairing_failed(self, error): 779 self.send_command(SMP_Pairing_Failed_Command(reason = error)) 780 self.on_pairing_failure(error) 781 782 def send_pairing_request_command(self): 783 self.manager.on_session_start(self) 784 785 command = SMP_Pairing_Request_Command( 786 io_capability = self.io_capability, 787 oob_data_flag = 0, 788 auth_req = self.auth_req, 789 maximum_encryption_key_size = 16, 790 initiator_key_distribution = self.initiator_key_distribution, 791 responder_key_distribution = self.responder_key_distribution 792 ) 793 self.preq = bytes(command) 794 self.send_command(command) 795 796 def send_pairing_response_command(self): 797 response = SMP_Pairing_Response_Command( 798 io_capability = self.io_capability, 799 oob_data_flag = 0, 800 auth_req = self.auth_req, 801 maximum_encryption_key_size = 16, 802 initiator_key_distribution = self.initiator_key_distribution, 803 responder_key_distribution = self.responder_key_distribution 804 ) 805 self.pres = bytes(response) 806 self.send_command(response) 807 808 def send_pairing_confirm_command(self): 809 self.r = crypto.r() 810 logger.debug(f'generated random: {self.r.hex()}') 811 812 if self.sc: 813 if self.pairing_method == self.JUST_WORKS or self.pairing_method == self.NUMERIC_COMPARISON: 814 z = 0 815 elif self.pairing_method == self.PASSKEY: 816 z = 0x80 + ((self.passkey >> self.passkey_step) & 1) 817 else: 818 return 819 820 if self.is_initiator: 821 confirm_value = crypto.f4( 822 self.pka, 823 self.pkb, 824 self.r, 825 bytes([z]) 826 ) 827 else: 828 confirm_value = crypto.f4( 829 self.pkb, 830 self.pka, 831 self.r, 832 bytes([z]) 833 ) 834 else: 835 confirm_value = crypto.c1( 836 self.tk, 837 self.r, 838 self.preq, 839 self.pres, 840 self.iat, 841 self.rat, 842 self.ia, 843 self.ra 844 ) 845 846 self.send_command(SMP_Pairing_Confirm_Command(confirm_value = confirm_value)) 847 848 def send_pairing_random_command(self): 849 self.send_command(SMP_Pairing_Random_Command(random_value = self.r)) 850 851 def send_public_key_command(self): 852 self.send_command( 853 SMP_Pairing_Public_Key_Command( 854 public_key_x = bytes(reversed(self.manager.ecc_key.x)), 855 public_key_y = bytes(reversed(self.manager.ecc_key.y)) 856 ) 857 ) 858 859 def send_pairing_dhkey_check_command(self): 860 self.send_command( 861 SMP_Pairing_DHKey_Check_Command( 862 dhkey_check = self.ea if self.is_initiator else self.eb 863 ) 864 ) 865 866 def start_encryption(self, key): 867 # We can now encrypt the connection with the short term key, so that we can 868 # distribute the long term and/or other keys over an encrypted connection 869 asyncio.create_task( 870 self.manager.device.host.send_command( 871 HCI_LE_Start_Encryption_Command( 872 connection_handle = self.connection.handle, 873 random_number = bytes(8), 874 encrypted_diversifier = 0, 875 long_term_key = key 876 ) 877 ) 878 ) 879 880 def distribute_keys(self): 881 # Distribute the keys as required 882 if self.is_initiator: 883 if not self.sc: 884 # Distribute the LTK, EDIV and RAND 885 if self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG: 886 self.send_command(SMP_Encryption_Information_Command(long_term_key=self.ltk)) 887 self.send_command(SMP_Master_Identification_Command(ediv=self.ltk_ediv, rand=self.ltk_rand)) 888 889 # Distribute IRK & BD ADDR 890 if self.initiator_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG: 891 self.send_command( 892 SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk) 893 ) 894 self.send_command(SMP_Identity_Address_Information_Command( 895 addr_type = self.manager.address.address_type, 896 bd_addr = self.manager.address 897 )) 898 899 # Distribute CSRK 900 csrk = bytes(16) # FIXME: testing 901 if self.initiator_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG: 902 self.send_command(SMP_Signing_Information_Command(signature_key=csrk)) 903 904 # CTKD, calculate BR/EDR link key 905 if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG: 906 ilk = crypto.h7( 907 salt=SMP_CTKD_H7_LEBR_SALT, 908 w=self.ltk) if self.ct2 else crypto.h6(self.ltk, b'tmp1') 909 self.link_key = crypto.h6(ilk, b'lebr') 910 911 else: 912 # Distribute the LTK, EDIV and RAND 913 if not self.sc: 914 if self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG: 915 self.send_command(SMP_Encryption_Information_Command(long_term_key=self.ltk)) 916 self.send_command(SMP_Master_Identification_Command(ediv=self.ltk_ediv, rand=self.ltk_rand)) 917 918 # Distribute IRK & BD ADDR 919 if self.responder_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG: 920 self.send_command( 921 SMP_Identity_Information_Command(identity_resolving_key=self.manager.device.irk) 922 ) 923 self.send_command(SMP_Identity_Address_Information_Command( 924 addr_type = self.manager.address.address_type, 925 bd_addr = self.manager.address 926 )) 927 928 # Distribute CSRK 929 csrk = bytes(16) # FIXME: testing 930 if self.responder_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG: 931 self.send_command(SMP_Signing_Information_Command(signature_key=csrk)) 932 933 # CTKD, calculate BR/EDR link key 934 if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG: 935 ilk = crypto.h7( 936 salt=SMP_CTKD_H7_LEBR_SALT, 937 w=self.ltk) if self.ct2 else crypto.h6(self.ltk, b'tmp1') 938 self.link_key = crypto.h6(ilk, b'lebr') 939 940 def compute_peer_expected_distributions(self, key_distribution_flags): 941 # Set our expectations for what to wait for in the key distribution phase 942 self.peer_expected_distributions = [] 943 if not self.sc: 944 if (key_distribution_flags & SMP_ENC_KEY_DISTRIBUTION_FLAG != 0): 945 self.peer_expected_distributions.append(SMP_Encryption_Information_Command) 946 self.peer_expected_distributions.append(SMP_Master_Identification_Command) 947 if (key_distribution_flags & SMP_ID_KEY_DISTRIBUTION_FLAG != 0): 948 self.peer_expected_distributions.append(SMP_Identity_Information_Command) 949 self.peer_expected_distributions.append(SMP_Identity_Address_Information_Command) 950 if (key_distribution_flags & SMP_SIGN_KEY_DISTRIBUTION_FLAG != 0): 951 self.peer_expected_distributions.append(SMP_Signing_Information_Command) 952 logger.debug(f'expecting distributions: {[c.__name__ for c in self.peer_expected_distributions]}') 953 954 def check_key_distribution(self, command_class): 955 # First, check that the connection is encrypted 956 if not self.connection.is_encrypted: 957 logger.warn(color('received key distribution on a non-encrypted connection', 'red')) 958 self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR) 959 return 960 961 # Check that this command class is expected 962 if command_class in self.peer_expected_distributions: 963 self.peer_expected_distributions.remove(command_class) 964 logger.debug(f'remaining distributions: {[c.__name__ for c in self.peer_expected_distributions]}') 965 if not self.peer_expected_distributions: 966 # The initiator can now send its keys 967 if self.is_initiator: 968 self.distribute_keys() 969 970 # Nothing left to expect, we're done 971 self.on_pairing() 972 else: 973 logger.warn(color(f'!!! unexpected key distribution command: {command_class.__name__}', 'red')) 974 self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR) 975 976 async def pair(self): 977 # Start pairing as an initiator 978 # TODO: check that this session isn't already active 979 980 # Send the pairing request to start the process 981 self.send_pairing_request_command() 982 983 # Wait for the pairing process to finish 984 await self.pairing_result 985 986 def on_disconnection(self, reason): 987 self.connection.remove_listener('disconnection', self.on_disconnection) 988 self.connection.remove_listener('connection_encryption_change', self.on_connection_encryption_change) 989 self.connection.remove_listener('connection_encryption_key_refresh', self.on_connection_encryption_key_refresh) 990 self.manager.on_session_end(self) 991 992 def on_connection_encryption_change(self): 993 if self.connection.is_encrypted: 994 if self.is_responder: 995 # The responder distributes its keys first, the initiator later 996 self.distribute_keys() 997 998 def on_connection_encryption_key_refresh(self): 999 # Do as if the connection had just been encrypted 1000 self.on_connection_encryption_change() 1001 1002 def on_pairing(self): 1003 logger.debug('pairing complete') 1004 1005 if self.completed: 1006 return 1007 else: 1008 self.completed = True 1009 1010 if self.pairing_result is not None and not self.pairing_result.done(): 1011 self.pairing_result.set_result(None) 1012 1013 # Use the peer address from the pairing protocol or the connection 1014 if self.peer_bd_addr: 1015 peer_address = self.peer_bd_addr 1016 else: 1017 peer_address = self.connection.peer_address 1018 1019 # Create an object to hold the keys 1020 keys = PairingKeys() 1021 keys.address_type = peer_address.address_type 1022 authenticated = self.pairing_method != self.JUST_WORKS 1023 if self.sc: 1024 keys.ltk = PairingKeys.Key( 1025 value = self.ltk, 1026 authenticated = authenticated 1027 ) 1028 else: 1029 our_ltk_key = PairingKeys.Key( 1030 value = self.ltk, 1031 authenticated = authenticated, 1032 ediv = self.ltk_ediv, 1033 rand = self.ltk_rand 1034 ) 1035 peer_ltk_key = PairingKeys.Key( 1036 value = self.peer_ltk, 1037 authenticated = authenticated, 1038 ediv = self.peer_ediv, 1039 rand = self.peer_rand 1040 ) 1041 if self.is_initiator: 1042 keys.ltk_central = peer_ltk_key 1043 keys.ltk_peripheral = our_ltk_key 1044 else: 1045 keys.ltk_central = our_ltk_key 1046 keys.ltk_peripheral = peer_ltk_key 1047 if self.peer_identity_resolving_key is not None: 1048 keys.irk = PairingKeys.Key( 1049 value = self.peer_identity_resolving_key, 1050 authenticated = authenticated 1051 ) 1052 if self.peer_signature_key is not None: 1053 keys.csrk = PairingKeys.Key( 1054 value = self.peer_signature_key, 1055 authenticated = authenticated 1056 ) 1057 if self.link_key is not None: 1058 keys.link_key = PairingKeys.Key( 1059 value = self.link_key, 1060 authenticated = authenticated 1061 ) 1062 1063 self.manager.on_pairing(self, peer_address, keys) 1064 1065 def on_pairing_failure(self, reason): 1066 logger.warn(f'pairing failure ({error_name(reason)})') 1067 1068 if self.completed: 1069 return 1070 else: 1071 self.completed = True 1072 1073 error = ProtocolError(reason, 'smp', error_name(reason)) 1074 if self.pairing_result is not None and not self.pairing_result.done(): 1075 self.pairing_result.set_exception(error) 1076 self.manager.on_pairing_failure(self, reason) 1077 1078 def on_smp_command(self, command): 1079 # Find the handler method 1080 handler_name = f'on_{command.name.lower()}' 1081 handler = getattr(self, handler_name, None) 1082 if handler is not None: 1083 try: 1084 handler(command) 1085 except Exception as error: 1086 logger.warning(f'{color("!!! Exception in handler:", "red")} {error}') 1087 response = SMP_Pairing_Failed_Command(reason = SMP_UNSPECIFIED_REASON_ERROR) 1088 self.send_command(response) 1089 else: 1090 logger.error(color('SMP command not handled???', 'red')) 1091 1092 def on_smp_pairing_request_command(self, command): 1093 asyncio.create_task(self.on_smp_pairing_request_command_async(command)) 1094 1095 async def on_smp_pairing_request_command_async(self, command): 1096 # Check if the request should proceed 1097 accepted = await self.pairing_config.delegate.accept() 1098 if not accepted: 1099 logger.debug('pairing rejected by delegate') 1100 self.send_pairing_failed(SMP_PAIRING_NOT_SUPPORTED_ERROR) 1101 return 1102 1103 # Save the request 1104 self.preq = bytes(command) 1105 1106 # Bonding and SC require both sides to request/support it 1107 self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0) 1108 self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0) 1109 self.ct2 = self.ct2 and (command.auth_req & SMP_CT2_AUTHREQ != 0) 1110 1111 # Check for OOB 1112 if command.oob_data_flag != 0: 1113 self.terminate(SMP_OOB_NOT_AVAILABLE_ERROR) 1114 return 1115 1116 # Decide which pairing method to use 1117 self.decide_pairing_method( 1118 command.auth_req, 1119 command.io_capability, 1120 self.io_capability 1121 ) 1122 logger.debug(f'pairing method: {self.PAIRING_METHOD_NAMES[self.pairing_method]}') 1123 1124 # Key distribution 1125 self.initiator_key_distribution, self.responder_key_distribution = await self.pairing_config.delegate.key_distribution_response( 1126 command.initiator_key_distribution, command.responder_key_distribution) 1127 self.compute_peer_expected_distributions(self.initiator_key_distribution) 1128 1129 # The pairing is now starting 1130 self.manager.on_session_start(self) 1131 1132 # Display a passkey if we need to 1133 if not self.sc: 1134 if self.pairing_method == self.PASSKEY and self.passkey_display: 1135 self.display_passkey() 1136 1137 # Respond 1138 self.send_pairing_response_command() 1139 1140 def on_smp_pairing_response_command(self, command): 1141 if self.is_responder: 1142 logger.warn(color('received pairing response as a responder', 'red')) 1143 return 1144 1145 # Save the response 1146 self.pres = bytes(command) 1147 self.peer_io_capability = command.io_capability 1148 1149 # Bonding and SC require both sides to request/support it 1150 self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0) 1151 self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0) 1152 1153 # Check for OOB 1154 if self.sc and command.oob_data_flag: 1155 self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR) 1156 return 1157 1158 # Decide which pairing method to use 1159 self.decide_pairing_method( 1160 command.auth_req, 1161 self.io_capability, 1162 command.io_capability 1163 ) 1164 logger.debug(f'pairing method: {self.PAIRING_METHOD_NAMES[self.pairing_method]}') 1165 1166 # Key distribution 1167 if (command.initiator_key_distribution & ~self.initiator_key_distribution != 0) or \ 1168 (command.responder_key_distribution & ~self.responder_key_distribution != 0): 1169 # The response isn't a subset of the request 1170 self.send_pairing_failed(SMP_INVALID_PARAMETERS_ERROR) 1171 return 1172 self.initiator_key_distribution = command.initiator_key_distribution 1173 self.responder_key_distribution = command.responder_key_distribution 1174 self.compute_peer_expected_distributions(self.responder_key_distribution) 1175 1176 # Start phase 2 1177 if self.sc: 1178 if self.pairing_method == self.PASSKEY and self.passkey_display: 1179 self.display_passkey() 1180 1181 self.send_public_key_command() 1182 else: 1183 if self.pairing_method == self.PASSKEY: 1184 self.display_or_input_passkey(self.send_pairing_confirm_command) 1185 else: 1186 self.send_pairing_confirm_command() 1187 1188 def on_smp_pairing_confirm_command_legacy(self, command): 1189 if self.is_initiator: 1190 self.send_pairing_random_command() 1191 else: 1192 # If the method is PASSKEY, now is the time to input the code 1193 if self.pairing_method == self.PASSKEY and not self.passkey_display: 1194 self.input_passkey(self.send_pairing_confirm_command) 1195 else: 1196 self.send_pairing_confirm_command() 1197 1198 def on_smp_pairing_confirm_command_secure_connections(self, command): 1199 if self.pairing_method == self.JUST_WORKS or self.pairing_method == self.NUMERIC_COMPARISON: 1200 if self.is_initiator: 1201 self.r = crypto.r() 1202 self.send_pairing_random_command() 1203 elif self.pairing_method == self.PASSKEY: 1204 if self.is_initiator: 1205 self.send_pairing_random_command() 1206 else: 1207 self.send_pairing_confirm_command() 1208 1209 def on_smp_pairing_confirm_command(self, command): 1210 self.confirm_value = command.confirm_value 1211 if self.sc: 1212 self.on_smp_pairing_confirm_command_secure_connections(command) 1213 else: 1214 self.on_smp_pairing_confirm_command_legacy(command) 1215 1216 def on_smp_pairing_random_command_legacy(self, command): 1217 # Check that the confirmation values match 1218 confirm_verifier = crypto.c1( 1219 self.tk, 1220 command.random_value, 1221 self.preq, 1222 self.pres, 1223 self.iat, 1224 self.rat, 1225 self.ia, 1226 self.ra 1227 ) 1228 if not self.check_expected_value( 1229 self.confirm_value, 1230 confirm_verifier, 1231 SMP_CONFIRM_VALUE_FAILED_ERROR 1232 ): 1233 return 1234 1235 # Compute STK 1236 if self.is_initiator: 1237 mrand = self.r 1238 srand = command.random_value 1239 else: 1240 srand = self.r 1241 mrand = command.random_value 1242 stk = crypto.s1(self.tk, srand, mrand) 1243 logger.debug(f'STK = {stk.hex()}') 1244 1245 # Generate LTK 1246 self.ltk = crypto.r() 1247 1248 if self.is_initiator: 1249 self.start_encryption(stk) 1250 else: 1251 self.send_pairing_random_command() 1252 1253 def on_smp_pairing_random_command_secure_connections(self, command): 1254 if self.is_initiator: 1255 if self.pairing_method == self.JUST_WORKS or self.pairing_method == self.NUMERIC_COMPARISON: 1256 # Check that the random value matches what was committed to earlier 1257 confirm_verifier = crypto.f4( 1258 self.pkb, 1259 self.pka, 1260 command.random_value, 1261 bytes([0]) 1262 ) 1263 if not self.check_expected_value( 1264 self.confirm_value, 1265 confirm_verifier, 1266 SMP_CONFIRM_VALUE_FAILED_ERROR 1267 ): 1268 return 1269 elif self.pairing_method == self.PASSKEY: 1270 # Check that the random value matches what was committed to earlier 1271 confirm_verifier = crypto.f4( 1272 self.pkb, 1273 self.pka, 1274 command.random_value, 1275 bytes([0x80 + ((self.passkey >> self.passkey_step) & 1)]) 1276 ) 1277 if not self.check_expected_value( 1278 self.confirm_value, 1279 confirm_verifier, 1280 SMP_CONFIRM_VALUE_FAILED_ERROR 1281 ): 1282 return 1283 1284 # Move on to the next iteration 1285 self.passkey_step += 1 1286 logger.debug(f'passkey finished step {self.passkey_step} of 20') 1287 if self.passkey_step < 20: 1288 self.send_pairing_confirm_command() 1289 return 1290 else: 1291 return 1292 else: 1293 if self.pairing_method == self.JUST_WORKS or self.pairing_method == self.NUMERIC_COMPARISON: 1294 self.send_pairing_random_command() 1295 elif self.pairing_method == self.PASSKEY: 1296 # Check that the random value matches what was committed to earlier 1297 confirm_verifier = crypto.f4( 1298 self.pka, 1299 self.pkb, 1300 command.random_value, 1301 bytes([0x80 + ((self.passkey >> self.passkey_step) & 1)]) 1302 ) 1303 if not self.check_expected_value( 1304 self.confirm_value, 1305 confirm_verifier, 1306 SMP_CONFIRM_VALUE_FAILED_ERROR 1307 ): 1308 return 1309 1310 self.send_pairing_random_command() 1311 1312 # Move on to the next iteration 1313 self.passkey_step += 1 1314 logger.debug(f'passkey finished step {self.passkey_step} of 20') 1315 if self.passkey_step < 20: 1316 self.r = crypto.r() 1317 return 1318 else: 1319 return 1320 1321 # Compute the MacKey and LTK 1322 a = self.ia + bytes([self.iat]) 1323 b = self.ra + bytes([self.rat]) 1324 (mac_key, self.ltk) = crypto.f5(self.dh_key, self.na, self.nb, a, b) 1325 1326 # Compute the DH Key checks 1327 if self.pairing_method == self.JUST_WORKS or self.pairing_method == self.NUMERIC_COMPARISON: 1328 ra = bytes(16) 1329 rb = ra 1330 elif self.pairing_method == self.PASSKEY: 1331 ra = self.passkey.to_bytes(16, byteorder='little') 1332 rb = ra 1333 else: 1334 # OOB not implemented yet 1335 return 1336 1337 io_cap_a = self.preq[1:4] 1338 io_cap_b = self.pres[1:4] 1339 self.ea = crypto.f6(mac_key, self.na, self.nb, rb, io_cap_a, a, b) 1340 self.eb = crypto.f6(mac_key, self.nb, self.na, ra, io_cap_b, b, a) 1341 1342 # Next steps to be performed after possible user confirmation 1343 def next_steps(): 1344 # The initiator sends the DH Key check to the responder 1345 if self.is_initiator: 1346 self.send_pairing_dhkey_check_command() 1347 else: 1348 if self.wait_before_continuing: 1349 self.wait_before_continuing.set_result(None) 1350 1351 # Prompt the user for confirmation if needed 1352 if self.pairing_method == self.JUST_WORKS or self.pairing_method == self.NUMERIC_COMPARISON: 1353 # Compute the 6-digit code 1354 code = crypto.g2(self.pka, self.pkb, self.na, self.nb) % 1000000 1355 1356 if self.pairing_method == self.NUMERIC_COMPARISON: 1357 # Ask for user confirmation 1358 self.wait_before_continuing = asyncio.get_running_loop().create_future() 1359 self.prompt_user_for_numeric_comparison(code, next_steps) 1360 else: 1361 next_steps() 1362 else: 1363 next_steps() 1364 1365 def on_smp_pairing_random_command(self, command): 1366 self.peer_random_value = command.random_value 1367 if self.sc: 1368 self.on_smp_pairing_random_command_secure_connections(command) 1369 else: 1370 self.on_smp_pairing_random_command_legacy(command) 1371 1372 def on_smp_pairing_public_key_command(self, command): 1373 # Store the public key so that we can compute the confirmation value later 1374 self.peer_public_key_x = command.public_key_x 1375 self.peer_public_key_y = command.public_key_y 1376 1377 # Compute the DH key 1378 self.dh_key = bytes(reversed(self.manager.ecc_key.dh( 1379 bytes(reversed(command.public_key_x)), 1380 bytes(reversed(command.public_key_y)) 1381 ))) 1382 logger.debug(f'DH key: {self.dh_key.hex()}') 1383 1384 if self.is_initiator: 1385 if self.pairing_method == self.PASSKEY: 1386 if self.passkey_display: 1387 self.send_pairing_confirm_command() 1388 else: 1389 self.input_passkey(self.send_pairing_confirm_command) 1390 else: 1391 # Send our public key back to the initiator 1392 if self.pairing_method == self.PASSKEY: 1393 self.display_or_input_passkey(self.send_public_key_command) 1394 else: 1395 self.send_public_key_command() 1396 1397 if self.pairing_method == self.JUST_WORKS or self.pairing_method == self.NUMERIC_COMPARISON: 1398 # We can now send the confirmation value 1399 self.send_pairing_confirm_command() 1400 1401 def on_smp_pairing_dhkey_check_command(self, command): 1402 # Check that what we received matches what we computed earlier 1403 expected = self.eb if self.is_initiator else self.ea 1404 if not self.check_expected_value( 1405 expected, 1406 command.dhkey_check, 1407 SMP_DHKEY_CHECK_FAILED_ERROR 1408 ): 1409 return 1410 1411 if self.is_responder: 1412 if self.wait_before_continuing is not None: 1413 async def next_steps(): 1414 await self.wait_before_continuing 1415 self.wait_before_continuing = None 1416 self.send_pairing_dhkey_check_command() 1417 1418 asyncio.create_task(next_steps()) 1419 else: 1420 self.send_pairing_dhkey_check_command() 1421 else: 1422 self.start_encryption(self.ltk) 1423 1424 def on_smp_pairing_failed_command(self, command): 1425 self.on_pairing_failure(command.reason) 1426 1427 def on_smp_encryption_information_command(self, command): 1428 self.peer_ltk = command.long_term_key 1429 self.check_key_distribution(SMP_Encryption_Information_Command) 1430 1431 def on_smp_master_identification_command(self, command): 1432 self.peer_ediv = command.ediv 1433 self.peer_rand = command.rand 1434 self.check_key_distribution(SMP_Master_Identification_Command) 1435 1436 def on_smp_identity_information_command(self, command): 1437 self.peer_identity_resolving_key = command.identity_resolving_key 1438 self.check_key_distribution(SMP_Identity_Information_Command) 1439 1440 def on_smp_identity_address_information_command(self, command): 1441 self.peer_bd_addr = command.bd_addr 1442 self.check_key_distribution(SMP_Identity_Address_Information_Command) 1443 1444 def on_smp_signing_information_command(self, command): 1445 self.peer_signature_key = command.signature_key 1446 self.check_key_distribution(SMP_Signing_Information_Command) 1447 1448 1449# ----------------------------------------------------------------------------- 1450class Manager(EventEmitter): 1451 ''' 1452 Implements the Initiator and Responder roles of the Security Manager Protocol 1453 ''' 1454 1455 def __init__(self, device, address): 1456 super().__init__() 1457 self.device = device 1458 self.address = address 1459 self.sessions = {} 1460 self._ecc_key = None 1461 self.pairing_config_factory = lambda connection: PairingConfig() 1462 1463 def send_command(self, connection, command): 1464 logger.debug(f'>>> Sending SMP Command on connection [0x{connection.handle:04X}] {connection.peer_address}: {command}') 1465 connection.send_l2cap_pdu(SMP_CID, command.to_bytes()) 1466 1467 def on_smp_pdu(self, connection, pdu): 1468 # Look for a session with this connection, and create one if none exists 1469 if not (session := self.sessions.get(connection.handle)): 1470 pairing_config = self.pairing_config_factory(connection) 1471 if pairing_config is None: 1472 # Pairing disabled 1473 self.send_command( 1474 connection, 1475 SMP_Pairing_Failed_Command( 1476 reason = SMP_PAIRING_NOT_SUPPORTED_ERROR 1477 ) 1478 ) 1479 return 1480 session = Session(self, connection, pairing_config) 1481 self.sessions[connection.handle] = session 1482 1483 # Parse the L2CAP payload into an SMP Command object 1484 command = SMP_Command.from_bytes(pdu) 1485 logger.debug(f'<<< Received SMP Command on connection [0x{connection.handle:04X}] {connection.peer_address}: {command}') 1486 1487 # Delegate the handling of the command to the session 1488 session.on_smp_command(command) 1489 1490 @property 1491 def ecc_key(self): 1492 if self._ecc_key is None: 1493 self._ecc_key = crypto.EccKey.generate() 1494 return self._ecc_key 1495 1496 async def pair(self, connection): 1497 # TODO: check if there's already a session for this connection 1498 pairing_config = self.pairing_config_factory(connection) 1499 if pairing_config is None: 1500 raise ValueError('pairing config must not be None when initiating') 1501 session = Session(self, connection, pairing_config) 1502 self.sessions[connection.handle] = session 1503 return await session.pair() 1504 1505 def request_pairing(self, connection): 1506 pairing_config = self.pairing_config_factory(connection) 1507 if pairing_config: 1508 auth_req = smp_auth_req( 1509 pairing_config.bonding, 1510 pairing_config.mitm, 1511 pairing_config.sc, 1512 False, 1513 False 1514 ) 1515 else: 1516 auth_req = 0 1517 self.send_command(connection, SMP_Security_Request_Command(auth_req=auth_req)) 1518 1519 def on_session_start(self, session): 1520 self.device.on_pairing_start(session.connection.handle) 1521 1522 def on_pairing(self, session, identity_address, keys): 1523 # Store the keys in the key store 1524 if self.device.keystore and identity_address is not None: 1525 async def store_keys(): 1526 try: 1527 await self.device.keystore.update(str(identity_address), keys) 1528 except Exception as error: 1529 logger.warn(f'!!! error while storing keys: {error}') 1530 asyncio.create_task(store_keys()) 1531 1532 # Notify the device 1533 self.device.on_pairing(session.connection.handle, keys) 1534 1535 def on_pairing_failure(self, session, reason): 1536 self.device.on_pairing_failure(session.connection.handle, reason) 1537 1538 def on_session_end(self, session): 1539 logger.debug(f'session end for connection 0x{session.connection.handle:04X}') 1540 if session.connection.handle in self.sessions: 1541 del self.sessions[session.connection.handle] 1542 1543 def get_long_term_key(self, connection, rand, ediv): 1544 if session := self.sessions.get(connection.handle): 1545 return session.get_long_term_key(rand, ediv) 1546