• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# SMP - Security Manager Protocol
17#
18# See Bluetooth spec @ Vol 3, Part H
19#
20# -----------------------------------------------------------------------------
21
22# -----------------------------------------------------------------------------
23# Imports
24# -----------------------------------------------------------------------------
25from __future__ import annotations
26import logging
27import asyncio
28import enum
29import secrets
30from dataclasses import dataclass
31from typing import (
32    TYPE_CHECKING,
33    Any,
34    Awaitable,
35    Callable,
36    Dict,
37    List,
38    Optional,
39    Tuple,
40    Type,
41    cast,
42)
43
44from pyee import EventEmitter
45
46from .colors import color
47from .hci import (
48    Address,
49    HCI_LE_Enable_Encryption_Command,
50    HCI_Object,
51    key_with_value,
52)
53from .core import (
54    BT_BR_EDR_TRANSPORT,
55    BT_CENTRAL_ROLE,
56    BT_LE_TRANSPORT,
57    AdvertisingData,
58    ProtocolError,
59    name_or_number,
60)
61from .keys import PairingKeys
62from . import crypto
63
64if TYPE_CHECKING:
65    from bumble.device import Connection, Device
66    from bumble.pairing import PairingConfig
67
68
69# -----------------------------------------------------------------------------
70# Logging
71# -----------------------------------------------------------------------------
72logger = logging.getLogger(__name__)
73
74
75# -----------------------------------------------------------------------------
76# Constants
77# -----------------------------------------------------------------------------
78# fmt: off
79# pylint: disable=line-too-long
80
81SMP_CID = 0x06
82SMP_BR_CID = 0x07
83
84SMP_PAIRING_REQUEST_COMMAND               = 0x01
85SMP_PAIRING_RESPONSE_COMMAND              = 0x02
86SMP_PAIRING_CONFIRM_COMMAND               = 0x03
87SMP_PAIRING_RANDOM_COMMAND                = 0x04
88SMP_PAIRING_FAILED_COMMAND                = 0x05
89SMP_ENCRYPTION_INFORMATION_COMMAND        = 0x06
90SMP_MASTER_IDENTIFICATION_COMMAND         = 0x07
91SMP_IDENTITY_INFORMATION_COMMAND          = 0x08
92SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND  = 0x09
93SMP_SIGNING_INFORMATION_COMMAND           = 0x0A
94SMP_SECURITY_REQUEST_COMMAND              = 0x0B
95SMP_PAIRING_PUBLIC_KEY_COMMAND            = 0x0C
96SMP_PAIRING_DHKEY_CHECK_COMMAND           = 0x0D
97SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND = 0x0E
98
99SMP_COMMAND_NAMES = {
100    SMP_PAIRING_REQUEST_COMMAND:               'SMP_PAIRING_REQUEST_COMMAND',
101    SMP_PAIRING_RESPONSE_COMMAND:              'SMP_PAIRING_RESPONSE_COMMAND',
102    SMP_PAIRING_CONFIRM_COMMAND:               'SMP_PAIRING_CONFIRM_COMMAND',
103    SMP_PAIRING_RANDOM_COMMAND:                'SMP_PAIRING_RANDOM_COMMAND',
104    SMP_PAIRING_FAILED_COMMAND:                'SMP_PAIRING_FAILED_COMMAND',
105    SMP_ENCRYPTION_INFORMATION_COMMAND:        'SMP_ENCRYPTION_INFORMATION_COMMAND',
106    SMP_MASTER_IDENTIFICATION_COMMAND:         'SMP_MASTER_IDENTIFICATION_COMMAND',
107    SMP_IDENTITY_INFORMATION_COMMAND:          'SMP_IDENTITY_INFORMATION_COMMAND',
108    SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND:  'SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND',
109    SMP_SIGNING_INFORMATION_COMMAND:           'SMP_SIGNING_INFORMATION_COMMAND',
110    SMP_SECURITY_REQUEST_COMMAND:              'SMP_SECURITY_REQUEST_COMMAND',
111    SMP_PAIRING_PUBLIC_KEY_COMMAND:            'SMP_PAIRING_PUBLIC_KEY_COMMAND',
112    SMP_PAIRING_DHKEY_CHECK_COMMAND:           'SMP_PAIRING_DHKEY_CHECK_COMMAND',
113    SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND: 'SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND'
114}
115
116SMP_DISPLAY_ONLY_IO_CAPABILITY       = 0x00
117SMP_DISPLAY_YES_NO_IO_CAPABILITY     = 0x01
118SMP_KEYBOARD_ONLY_IO_CAPABILITY      = 0x02
119SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY = 0x03
120SMP_KEYBOARD_DISPLAY_IO_CAPABILITY   = 0x04
121
122SMP_IO_CAPABILITY_NAMES = {
123    SMP_DISPLAY_ONLY_IO_CAPABILITY:       'SMP_DISPLAY_ONLY_IO_CAPABILITY',
124    SMP_DISPLAY_YES_NO_IO_CAPABILITY:     'SMP_DISPLAY_YES_NO_IO_CAPABILITY',
125    SMP_KEYBOARD_ONLY_IO_CAPABILITY:      'SMP_KEYBOARD_ONLY_IO_CAPABILITY',
126    SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: 'SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY',
127    SMP_KEYBOARD_DISPLAY_IO_CAPABILITY:   'SMP_KEYBOARD_DISPLAY_IO_CAPABILITY'
128}
129
130SMP_PASSKEY_ENTRY_FAILED_ERROR                       = 0x01
131SMP_OOB_NOT_AVAILABLE_ERROR                          = 0x02
132SMP_AUTHENTICATION_REQUIREMENTS_ERROR                = 0x03
133SMP_CONFIRM_VALUE_FAILED_ERROR                       = 0x04
134SMP_PAIRING_NOT_SUPPORTED_ERROR                      = 0x05
135SMP_ENCRYPTION_KEY_SIZE_ERROR                        = 0x06
136SMP_COMMAND_NOT_SUPPORTED_ERROR                      = 0x07
137SMP_UNSPECIFIED_REASON_ERROR                         = 0x08
138SMP_REPEATED_ATTEMPTS_ERROR                          = 0x09
139SMP_INVALID_PARAMETERS_ERROR                         = 0x0A
140SMP_DHKEY_CHECK_FAILED_ERROR                         = 0x0B
141SMP_NUMERIC_COMPARISON_FAILED_ERROR                  = 0x0C
142SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR                 = 0x0D
143SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR = 0x0E
144
145SMP_ERROR_NAMES = {
146    SMP_PASSKEY_ENTRY_FAILED_ERROR:                       'SMP_PASSKEY_ENTRY_FAILED_ERROR',
147    SMP_OOB_NOT_AVAILABLE_ERROR:                          'SMP_OOB_NOT_AVAILABLE_ERROR',
148    SMP_AUTHENTICATION_REQUIREMENTS_ERROR:                'SMP_AUTHENTICATION_REQUIREMENTS_ERROR',
149    SMP_CONFIRM_VALUE_FAILED_ERROR:                       'SMP_CONFIRM_VALUE_FAILED_ERROR',
150    SMP_PAIRING_NOT_SUPPORTED_ERROR:                      'SMP_PAIRING_NOT_SUPPORTED_ERROR',
151    SMP_ENCRYPTION_KEY_SIZE_ERROR:                        'SMP_ENCRYPTION_KEY_SIZE_ERROR',
152    SMP_COMMAND_NOT_SUPPORTED_ERROR:                      'SMP_COMMAND_NOT_SUPPORTED_ERROR',
153    SMP_UNSPECIFIED_REASON_ERROR:                         'SMP_UNSPECIFIED_REASON_ERROR',
154    SMP_REPEATED_ATTEMPTS_ERROR:                          'SMP_REPEATED_ATTEMPTS_ERROR',
155    SMP_INVALID_PARAMETERS_ERROR:                         'SMP_INVALID_PARAMETERS_ERROR',
156    SMP_DHKEY_CHECK_FAILED_ERROR:                         'SMP_DHKEY_CHECK_FAILED_ERROR',
157    SMP_NUMERIC_COMPARISON_FAILED_ERROR:                  'SMP_NUMERIC_COMPARISON_FAILED_ERROR',
158    SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR:                 'SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR',
159    SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR: 'SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR'
160}
161
162SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE   = 0
163SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE   = 1
164SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE    = 2
165SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE         = 3
166SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE = 4
167
168SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES = {
169    SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE:   'SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE',
170    SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE:   'SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE',
171    SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE:    'SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE',
172    SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE:         'SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE',
173    SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE'
174}
175
176# Bit flags for key distribution/generation
177SMP_ENC_KEY_DISTRIBUTION_FLAG  = 0b0001
178SMP_ID_KEY_DISTRIBUTION_FLAG   = 0b0010
179SMP_SIGN_KEY_DISTRIBUTION_FLAG = 0b0100
180SMP_LINK_KEY_DISTRIBUTION_FLAG = 0b1000
181
182# AuthReq fields
183SMP_BONDING_AUTHREQ  = 0b00000001
184SMP_MITM_AUTHREQ     = 0b00000100
185SMP_SC_AUTHREQ       = 0b00001000
186SMP_KEYPRESS_AUTHREQ = 0b00010000
187SMP_CT2_AUTHREQ      = 0b00100000
188
189# Crypto salt
190SMP_CTKD_H7_LEBR_SALT = bytes.fromhex('000000000000000000000000746D7031')
191SMP_CTKD_H7_BRLE_SALT = bytes.fromhex('000000000000000000000000746D7032')
192
193# fmt: on
194# pylint: enable=line-too-long
195# pylint: disable=invalid-name
196
197
198# -----------------------------------------------------------------------------
199# Utils
200# -----------------------------------------------------------------------------
201def error_name(error_code: int) -> str:
202    return name_or_number(SMP_ERROR_NAMES, error_code)
203
204
205# -----------------------------------------------------------------------------
206# Classes
207# -----------------------------------------------------------------------------
208class SMP_Command:
209    '''
210    See Bluetooth spec @ Vol 3, Part H - 3 SECURITY MANAGER PROTOCOL
211    '''
212
213    smp_classes: Dict[int, Type[SMP_Command]] = {}
214    fields: Any
215    code = 0
216    name = ''
217
218    @staticmethod
219    def from_bytes(pdu: bytes) -> "SMP_Command":
220        code = pdu[0]
221
222        cls = SMP_Command.smp_classes.get(code)
223        if cls is None:
224            instance = SMP_Command(pdu)
225            instance.name = SMP_Command.command_name(code)
226            instance.code = code
227            return instance
228        self = cls.__new__(cls)
229        SMP_Command.__init__(self, pdu)
230        if hasattr(self, 'fields'):
231            self.init_from_bytes(pdu, 1)
232        return self
233
234    @staticmethod
235    def command_name(code: int) -> str:
236        return name_or_number(SMP_COMMAND_NAMES, code)
237
238    @staticmethod
239    def auth_req_str(value: int) -> str:
240        bonding_flags = value & 3
241        mitm = (value >> 2) & 1
242        sc = (value >> 3) & 1
243        keypress = (value >> 4) & 1
244        ct2 = (value >> 5) & 1
245
246        return (
247            f'bonding_flags={bonding_flags}, '
248            f'MITM={mitm}, sc={sc}, keypress={keypress}, ct2={ct2}'
249        )
250
251    @staticmethod
252    def io_capability_name(io_capability: int) -> str:
253        return name_or_number(SMP_IO_CAPABILITY_NAMES, io_capability)
254
255    @staticmethod
256    def key_distribution_str(value: int) -> str:
257        key_types: List[str] = []
258        if value & SMP_ENC_KEY_DISTRIBUTION_FLAG:
259            key_types.append('ENC')
260        if value & SMP_ID_KEY_DISTRIBUTION_FLAG:
261            key_types.append('ID')
262        if value & SMP_SIGN_KEY_DISTRIBUTION_FLAG:
263            key_types.append('SIGN')
264        if value & SMP_LINK_KEY_DISTRIBUTION_FLAG:
265            key_types.append('LINK')
266        return ','.join(key_types)
267
268    @staticmethod
269    def keypress_notification_type_name(notification_type: int) -> str:
270        return name_or_number(SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES, notification_type)
271
272    @staticmethod
273    def subclass(fields):
274        def inner(cls):
275            cls.name = cls.__name__.upper()
276            cls.code = key_with_value(SMP_COMMAND_NAMES, cls.name)
277            if cls.code is None:
278                raise KeyError(
279                    f'Command name {cls.name} not found in SMP_COMMAND_NAMES'
280                )
281            cls.fields = fields
282
283            # Register a factory for this class
284            SMP_Command.smp_classes[cls.code] = cls
285
286            return cls
287
288        return inner
289
290    def __init__(self, pdu: Optional[bytes] = None, **kwargs: Any) -> None:
291        if hasattr(self, 'fields') and kwargs:
292            HCI_Object.init_from_fields(self, self.fields, kwargs)
293        if pdu is None:
294            pdu = bytes([self.code]) + HCI_Object.dict_to_bytes(kwargs, self.fields)
295        self.pdu = pdu
296
297    def init_from_bytes(self, pdu: bytes, offset: int) -> None:
298        return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
299
300    def to_bytes(self):
301        return self.pdu
302
303    def __bytes__(self):
304        return self.to_bytes()
305
306    def __str__(self):
307        result = color(self.name, 'yellow')
308        if fields := getattr(self, 'fields', None):
309            result += ':\n' + HCI_Object.format_fields(self.__dict__, fields, '  ')
310        else:
311            if len(self.pdu) > 1:
312                result += f': {self.pdu.hex()}'
313        return result
314
315
316# -----------------------------------------------------------------------------
317@SMP_Command.subclass(
318    [
319        ('io_capability', {'size': 1, 'mapper': SMP_Command.io_capability_name}),
320        ('oob_data_flag', 1),
321        ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}),
322        ('maximum_encryption_key_size', 1),
323        (
324            'initiator_key_distribution',
325            {'size': 1, 'mapper': SMP_Command.key_distribution_str},
326        ),
327        (
328            'responder_key_distribution',
329            {'size': 1, 'mapper': SMP_Command.key_distribution_str},
330        ),
331    ]
332)
333class SMP_Pairing_Request_Command(SMP_Command):
334    '''
335    See Bluetooth spec @ Vol 3, Part H - 3.5.1 Pairing Request
336    '''
337
338    io_capability: int
339    oob_data_flag: int
340    auth_req: int
341    maximum_encryption_key_size: int
342    initiator_key_distribution: int
343    responder_key_distribution: int
344
345
346# -----------------------------------------------------------------------------
347@SMP_Command.subclass(
348    [
349        ('io_capability', {'size': 1, 'mapper': SMP_Command.io_capability_name}),
350        ('oob_data_flag', 1),
351        ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}),
352        ('maximum_encryption_key_size', 1),
353        (
354            'initiator_key_distribution',
355            {'size': 1, 'mapper': SMP_Command.key_distribution_str},
356        ),
357        (
358            'responder_key_distribution',
359            {'size': 1, 'mapper': SMP_Command.key_distribution_str},
360        ),
361    ]
362)
363class SMP_Pairing_Response_Command(SMP_Command):
364    '''
365    See Bluetooth spec @ Vol 3, Part H - 3.5.2 Pairing Response
366    '''
367
368    io_capability: int
369    oob_data_flag: int
370    auth_req: int
371    maximum_encryption_key_size: int
372    initiator_key_distribution: int
373    responder_key_distribution: int
374
375
376# -----------------------------------------------------------------------------
377@SMP_Command.subclass([('confirm_value', 16)])
378class SMP_Pairing_Confirm_Command(SMP_Command):
379    '''
380    See Bluetooth spec @ Vol 3, Part H - 3.5.3 Pairing Confirm
381    '''
382
383    confirm_value: bytes
384
385
386# -----------------------------------------------------------------------------
387@SMP_Command.subclass([('random_value', 16)])
388class SMP_Pairing_Random_Command(SMP_Command):
389    '''
390    See Bluetooth spec @ Vol 3, Part H - 3.5.4 Pairing Random
391    '''
392
393    random_value: bytes
394
395
396# -----------------------------------------------------------------------------
397@SMP_Command.subclass([('reason', {'size': 1, 'mapper': error_name})])
398class SMP_Pairing_Failed_Command(SMP_Command):
399    '''
400    See Bluetooth spec @ Vol 3, Part H - 3.5.5 Pairing Failed
401    '''
402
403    reason: int
404
405
406# -----------------------------------------------------------------------------
407@SMP_Command.subclass([('public_key_x', 32), ('public_key_y', 32)])
408class SMP_Pairing_Public_Key_Command(SMP_Command):
409    '''
410    See Bluetooth spec @ Vol 3, Part H - 3.5.6 Pairing Public Key
411    '''
412
413    public_key_x: bytes
414    public_key_y: bytes
415
416
417# -----------------------------------------------------------------------------
418@SMP_Command.subclass(
419    [
420        ('dhkey_check', 16),
421    ]
422)
423class SMP_Pairing_DHKey_Check_Command(SMP_Command):
424    '''
425    See Bluetooth spec @ Vol 3, Part H - 3.5.7 Pairing DHKey Check
426    '''
427
428    dhkey_check: bytes
429
430
431# -----------------------------------------------------------------------------
432@SMP_Command.subclass(
433    [
434        (
435            'notification_type',
436            {'size': 1, 'mapper': SMP_Command.keypress_notification_type_name},
437        ),
438    ]
439)
440class SMP_Pairing_Keypress_Notification_Command(SMP_Command):
441    '''
442    See Bluetooth spec @ Vol 3, Part H - 3.5.8 Keypress Notification
443    '''
444
445    notification_type: int
446
447
448# -----------------------------------------------------------------------------
449@SMP_Command.subclass([('long_term_key', 16)])
450class SMP_Encryption_Information_Command(SMP_Command):
451    '''
452    See Bluetooth spec @ Vol 3, Part H - 3.6.2 Encryption Information
453    '''
454
455    long_term_key: bytes
456
457
458# -----------------------------------------------------------------------------
459@SMP_Command.subclass([('ediv', 2), ('rand', 8)])
460class SMP_Master_Identification_Command(SMP_Command):
461    '''
462    See Bluetooth spec @ Vol 3, Part H - 3.6.3 Master Identification
463    '''
464
465    ediv: int
466    rand: bytes
467
468
469# -----------------------------------------------------------------------------
470@SMP_Command.subclass([('identity_resolving_key', 16)])
471class SMP_Identity_Information_Command(SMP_Command):
472    '''
473    See Bluetooth spec @ Vol 3, Part H - 3.6.4 Identity Information
474    '''
475
476    identity_resolving_key: bytes
477
478
479# -----------------------------------------------------------------------------
480@SMP_Command.subclass(
481    [
482        ('addr_type', Address.ADDRESS_TYPE_SPEC),
483        ('bd_addr', Address.parse_address_preceded_by_type),
484    ]
485)
486class SMP_Identity_Address_Information_Command(SMP_Command):
487    '''
488    See Bluetooth spec @ Vol 3, Part H - 3.6.5 Identity Address Information
489    '''
490
491    addr_type: int
492    bd_addr: Address
493
494
495# -----------------------------------------------------------------------------
496@SMP_Command.subclass([('signature_key', 16)])
497class SMP_Signing_Information_Command(SMP_Command):
498    '''
499    See Bluetooth spec @ Vol 3, Part H - 3.6.6 Signing Information
500    '''
501
502    signature_key: bytes
503
504
505# -----------------------------------------------------------------------------
506@SMP_Command.subclass(
507    [
508        ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}),
509    ]
510)
511class SMP_Security_Request_Command(SMP_Command):
512    '''
513    See Bluetooth spec @ Vol 3, Part H - 3.6.7 Security Request
514    '''
515
516    auth_req: int
517
518
519# -----------------------------------------------------------------------------
520def smp_auth_req(bonding: bool, mitm: bool, sc: bool, keypress: bool, ct2: bool) -> int:
521    value = 0
522    if bonding:
523        value |= SMP_BONDING_AUTHREQ
524    if mitm:
525        value |= SMP_MITM_AUTHREQ
526    if sc:
527        value |= SMP_SC_AUTHREQ
528    if keypress:
529        value |= SMP_KEYPRESS_AUTHREQ
530    if ct2:
531        value |= SMP_CT2_AUTHREQ
532    return value
533
534
535# -----------------------------------------------------------------------------
536class AddressResolver:
537    def __init__(self, resolving_keys):
538        self.resolving_keys = resolving_keys
539
540    def resolve(self, address):
541        address_bytes = bytes(address)
542        hash_part = address_bytes[0:3]
543        prand = address_bytes[3:6]
544        for irk, resolved_address in self.resolving_keys:
545            local_hash = crypto.ah(irk, prand)
546            if local_hash == hash_part:
547                # Match!
548                if resolved_address.address_type == Address.PUBLIC_DEVICE_ADDRESS:
549                    resolved_address_type = Address.PUBLIC_IDENTITY_ADDRESS
550                else:
551                    resolved_address_type = Address.RANDOM_IDENTITY_ADDRESS
552                return Address(
553                    address=str(resolved_address), address_type=resolved_address_type
554                )
555
556        return None
557
558
559# -----------------------------------------------------------------------------
560class PairingMethod(enum.IntEnum):
561    JUST_WORKS = 0
562    NUMERIC_COMPARISON = 1
563    PASSKEY = 2
564    OOB = 3
565    CTKD_OVER_CLASSIC = 4
566
567
568# -----------------------------------------------------------------------------
569class OobContext:
570    """Cryptographic context for LE SC OOB pairing."""
571
572    ecc_key: crypto.EccKey
573    r: bytes
574
575    def __init__(
576        self, ecc_key: Optional[crypto.EccKey] = None, r: Optional[bytes] = None
577    ) -> None:
578        self.ecc_key = crypto.EccKey.generate() if ecc_key is None else ecc_key
579        self.r = crypto.r() if r is None else r
580
581    def share(self) -> OobSharedData:
582        pkx = self.ecc_key.x[::-1]
583        return OobSharedData(c=crypto.f4(pkx, pkx, self.r, bytes(1)), r=self.r)
584
585
586# -----------------------------------------------------------------------------
587class OobLegacyContext:
588    """Cryptographic context for LE Legacy OOB pairing."""
589
590    tk: bytes
591
592    def __init__(self, tk: Optional[bytes] = None) -> None:
593        self.tk = crypto.r() if tk is None else tk
594
595
596# -----------------------------------------------------------------------------
597@dataclass
598class OobSharedData:
599    """Shareable data for LE SC OOB pairing."""
600
601    c: bytes
602    r: bytes
603
604    def to_ad(self) -> AdvertisingData:
605        return AdvertisingData(
606            [
607                (AdvertisingData.LE_SECURE_CONNECTIONS_CONFIRMATION_VALUE, self.c),
608                (AdvertisingData.LE_SECURE_CONNECTIONS_RANDOM_VALUE, self.r),
609            ]
610        )
611
612    def __str__(self) -> str:
613        return f'OOB(C={self.c.hex()}, R={self.r.hex()})'
614
615
616# -----------------------------------------------------------------------------
617class Session:
618    # I/O Capability to pairing method decision matrix
619    #
620    # See Bluetooth spec @ Vol 3, part H - Table 2.8: Mapping of IO Capabilities to Key
621    # Generation Method
622    #
623    # Map: initiator -> responder -> <method>
624    # where <method> may be a simple entry or a 2-element tuple, with the first element
625    # for legacy pairing and the second  for secure connections, when the two are
626    # different. Each entry is either a method name, or, for PASSKEY, a tuple:
627    # (method, initiator_displays, responder_displays)
628    # to specify if the initiator and responder should display (True) or input a code
629    # (False).
630    PAIRING_METHODS = {
631        SMP_DISPLAY_ONLY_IO_CAPABILITY: {
632            SMP_DISPLAY_ONLY_IO_CAPABILITY: PairingMethod.JUST_WORKS,
633            SMP_DISPLAY_YES_NO_IO_CAPABILITY: PairingMethod.JUST_WORKS,
634            SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, True, False),
635            SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS,
636            SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (PairingMethod.PASSKEY, True, False),
637        },
638        SMP_DISPLAY_YES_NO_IO_CAPABILITY: {
639            SMP_DISPLAY_ONLY_IO_CAPABILITY: PairingMethod.JUST_WORKS,
640            SMP_DISPLAY_YES_NO_IO_CAPABILITY: (
641                PairingMethod.JUST_WORKS,
642                PairingMethod.NUMERIC_COMPARISON,
643            ),
644            SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, True, False),
645            SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS,
646            SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (
647                (PairingMethod.PASSKEY, True, False),
648                PairingMethod.NUMERIC_COMPARISON,
649            ),
650        },
651        SMP_KEYBOARD_ONLY_IO_CAPABILITY: {
652            SMP_DISPLAY_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, False, True),
653            SMP_DISPLAY_YES_NO_IO_CAPABILITY: (PairingMethod.PASSKEY, False, True),
654            SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, False, False),
655            SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS,
656            SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (PairingMethod.PASSKEY, False, True),
657        },
658        SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: {
659            SMP_DISPLAY_ONLY_IO_CAPABILITY: PairingMethod.JUST_WORKS,
660            SMP_DISPLAY_YES_NO_IO_CAPABILITY: PairingMethod.JUST_WORKS,
661            SMP_KEYBOARD_ONLY_IO_CAPABILITY: PairingMethod.JUST_WORKS,
662            SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS,
663            SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: PairingMethod.JUST_WORKS,
664        },
665        SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: {
666            SMP_DISPLAY_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, False, True),
667            SMP_DISPLAY_YES_NO_IO_CAPABILITY: (
668                (PairingMethod.PASSKEY, False, True),
669                PairingMethod.NUMERIC_COMPARISON,
670            ),
671            SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PairingMethod.PASSKEY, True, False),
672            SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: PairingMethod.JUST_WORKS,
673            SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (
674                (PairingMethod.PASSKEY, True, False),
675                PairingMethod.NUMERIC_COMPARISON,
676            ),
677        },
678    }
679
680    ea: bytes
681    eb: bytes
682    ltk: bytes
683    preq: bytes
684    pres: bytes
685    tk: bytes
686
687    def __init__(
688        self,
689        manager: Manager,
690        connection: Connection,
691        pairing_config: PairingConfig,
692        is_initiator: bool,
693    ) -> None:
694        self.manager = manager
695        self.connection = connection
696        self.stk = None
697        self.ltk_ediv = 0
698        self.ltk_rand = bytes(8)
699        self.link_key: Optional[bytes] = None
700        self.initiator_key_distribution: int = 0
701        self.responder_key_distribution: int = 0
702        self.peer_random_value: Optional[bytes] = None
703        self.peer_public_key_x: bytes = bytes(32)
704        self.peer_public_key_y = bytes(32)
705        self.peer_ltk = None
706        self.peer_ediv = None
707        self.peer_rand: Optional[bytes] = None
708        self.peer_identity_resolving_key = None
709        self.peer_bd_addr: Optional[Address] = None
710        self.peer_signature_key = None
711        self.peer_expected_distributions: List[Type[SMP_Command]] = []
712        self.dh_key = b''
713        self.confirm_value = None
714        self.passkey: Optional[int] = None
715        self.passkey_ready = asyncio.Event()
716        self.passkey_step = 0
717        self.passkey_display = False
718        self.pairing_method: PairingMethod = PairingMethod.JUST_WORKS
719        self.pairing_config = pairing_config
720        self.wait_before_continuing: Optional[asyncio.Future[None]] = None
721        self.completed = False
722        self.ctkd_task: Optional[Awaitable[None]] = None
723
724        # Decide if we're the initiator or the responder
725        self.is_initiator = is_initiator
726        self.is_responder = not self.is_initiator
727
728        # Listen for connection events
729        connection.on('disconnection', self.on_disconnection)
730        connection.on(
731            'connection_encryption_change', self.on_connection_encryption_change
732        )
733        connection.on(
734            'connection_encryption_key_refresh',
735            self.on_connection_encryption_key_refresh,
736        )
737
738        # Create a future that can be used to wait for the session to complete
739        if self.is_initiator:
740            self.pairing_result: Optional[asyncio.Future[None]] = (
741                asyncio.get_running_loop().create_future()
742            )
743        else:
744            self.pairing_result = None
745
746        # Key Distribution (default values before negotiation)
747        self.initiator_key_distribution = (
748            pairing_config.delegate.local_initiator_key_distribution
749        )
750        self.responder_key_distribution = (
751            pairing_config.delegate.local_responder_key_distribution
752        )
753
754        # Authentication Requirements Flags - Vol 3, Part H, Figure 3.3
755        self.bonding: bool = pairing_config.bonding
756        self.sc: bool = pairing_config.sc
757        self.mitm: bool = pairing_config.mitm
758        self.keypress = False
759        self.ct2: bool = False
760
761        # I/O Capabilities
762        self.io_capability = pairing_config.delegate.io_capability
763        self.peer_io_capability = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
764
765        # OOB
766        self.oob_data_flag = 0 if pairing_config.oob is None else 1
767
768        # Set up addresses
769        self_address = connection.self_address
770        peer_address = connection.peer_resolvable_address or connection.peer_address
771        if self.is_initiator:
772            self.ia = bytes(self_address)
773            self.iat = 1 if self_address.is_random else 0
774            self.ra = bytes(peer_address)
775            self.rat = 1 if peer_address.is_random else 0
776        else:
777            self.ra = bytes(self_address)
778            self.rat = 1 if self_address.is_random else 0
779            self.ia = bytes(peer_address)
780            self.iat = 1 if peer_address.is_random else 0
781
782        # Select the ECC key, TK and r initial value
783        if pairing_config.oob:
784            self.peer_oob_data = pairing_config.oob.peer_data
785            if pairing_config.sc:
786                if pairing_config.oob.our_context is None:
787                    raise ValueError(
788                        "oob pairing config requires a context when sc is True"
789                    )
790                self.r = pairing_config.oob.our_context.r
791                self.ecc_key = pairing_config.oob.our_context.ecc_key
792                if pairing_config.oob.legacy_context is not None:
793                    self.tk = pairing_config.oob.legacy_context.tk
794            else:
795                if pairing_config.oob.legacy_context is None:
796                    raise ValueError(
797                        "oob pairing config requires a legacy context when sc is False"
798                    )
799                self.r = bytes(16)
800                self.ecc_key = manager.ecc_key
801                self.tk = pairing_config.oob.legacy_context.tk
802        else:
803            self.peer_oob_data = None
804            self.r = bytes(16)
805            self.ecc_key = manager.ecc_key
806            self.tk = bytes(16)
807
808    @property
809    def pkx(self) -> Tuple[bytes, bytes]:
810        return (self.ecc_key.x[::-1], self.peer_public_key_x)
811
812    @property
813    def pka(self) -> bytes:
814        return self.pkx[0 if self.is_initiator else 1]
815
816    @property
817    def pkb(self) -> bytes:
818        return self.pkx[0 if self.is_responder else 1]
819
820    @property
821    def nx(self) -> Tuple[bytes, bytes]:
822        assert self.peer_random_value
823        return (self.r, self.peer_random_value)
824
825    @property
826    def na(self) -> bytes:
827        return self.nx[0 if self.is_initiator else 1]
828
829    @property
830    def nb(self) -> bytes:
831        return self.nx[0 if self.is_responder else 1]
832
833    @property
834    def auth_req(self) -> int:
835        return smp_auth_req(self.bonding, self.mitm, self.sc, self.keypress, self.ct2)
836
837    def get_long_term_key(self, rand: bytes, ediv: int) -> Optional[bytes]:
838        if not self.sc and not self.completed:
839            if rand == self.ltk_rand and ediv == self.ltk_ediv:
840                return self.stk
841        else:
842            return self.ltk
843
844        return None
845
846    def decide_pairing_method(
847        self,
848        auth_req: int,
849        initiator_io_capability: int,
850        responder_io_capability: int,
851    ) -> None:
852        if self.connection.transport == BT_BR_EDR_TRANSPORT:
853            self.pairing_method = PairingMethod.CTKD_OVER_CLASSIC
854            return
855        if (not self.mitm) and (auth_req & SMP_MITM_AUTHREQ == 0):
856            self.pairing_method = PairingMethod.JUST_WORKS
857            return
858
859        details = self.PAIRING_METHODS[initiator_io_capability][responder_io_capability]  # type: ignore[index]
860        if isinstance(details, tuple) and len(details) == 2:
861            # One entry for legacy pairing and one for secure connections
862            details = details[1 if self.sc else 0]
863        if isinstance(details, PairingMethod):
864            # Just a method ID
865            self.pairing_method = details
866        else:
867            # PASSKEY method, with a method ID and display/input flags
868            assert isinstance(details[0], PairingMethod)
869            self.pairing_method = details[0]
870            self.passkey_display = details[1 if self.is_initiator else 2]
871
872    def check_expected_value(
873        self, expected: bytes, received: bytes, error: int
874    ) -> bool:
875        logger.debug(f'expected={expected.hex()} got={received.hex()}')
876        if expected != received:
877            logger.info(color('pairing confirm/check mismatch', 'red'))
878            self.send_pairing_failed(error)
879            return False
880        return True
881
882    def prompt_user_for_confirmation(self, next_steps: Callable[[], None]) -> None:
883        async def prompt() -> None:
884            logger.debug('ask for confirmation')
885            try:
886                response = await self.pairing_config.delegate.confirm()
887                if response:
888                    next_steps()
889                    return
890            except Exception as error:
891                logger.warning(f'exception while confirm: {error}')
892
893            self.send_pairing_failed(SMP_CONFIRM_VALUE_FAILED_ERROR)
894
895        self.connection.abort_on('disconnection', prompt())
896
897    def prompt_user_for_numeric_comparison(
898        self, code: int, next_steps: Callable[[], None]
899    ) -> None:
900        async def prompt() -> None:
901            logger.debug(f'verification code: {code}')
902            try:
903                response = await self.pairing_config.delegate.compare_numbers(
904                    code, digits=6
905                )
906                if response:
907                    next_steps()
908                    return
909            except Exception as error:
910                logger.warning(f'exception while prompting: {error}')
911
912            self.send_pairing_failed(SMP_CONFIRM_VALUE_FAILED_ERROR)
913
914        self.connection.abort_on('disconnection', prompt())
915
916    def prompt_user_for_number(self, next_steps: Callable[[int], None]) -> None:
917        async def prompt() -> None:
918            logger.debug('prompting user for passkey')
919            try:
920                passkey = await self.pairing_config.delegate.get_number()
921                if passkey is None:
922                    logger.debug('Passkey request rejected')
923                    self.send_pairing_failed(SMP_PASSKEY_ENTRY_FAILED_ERROR)
924                    return
925                logger.debug(f'user input: {passkey}')
926                next_steps(passkey)
927            except Exception as error:
928                logger.warning(f'exception while prompting: {error}')
929                self.send_pairing_failed(SMP_PASSKEY_ENTRY_FAILED_ERROR)
930
931        self.connection.abort_on('disconnection', prompt())
932
933    def display_passkey(self) -> None:
934        # Generate random Passkey/PIN code
935        self.passkey = secrets.randbelow(1000000)
936        assert self.passkey is not None
937        logger.debug(f'Pairing PIN CODE: {self.passkey:06}')
938        self.passkey_ready.set()
939
940        # The value of TK is computed from the PIN code
941        if not self.sc:
942            self.tk = self.passkey.to_bytes(16, byteorder='little')
943            logger.debug(f'TK from passkey = {self.tk.hex()}')
944
945        try:
946            self.connection.abort_on(
947                'disconnection',
948                self.pairing_config.delegate.display_number(self.passkey, digits=6),
949            )
950        except Exception as error:
951            logger.warning(f'exception while displaying number: {error}')
952
953    def input_passkey(self, next_steps: Optional[Callable[[], None]] = None) -> None:
954        # Prompt the user for the passkey displayed on the peer
955        def after_input(passkey: int) -> None:
956            self.passkey = passkey
957
958            if not self.sc:
959                self.tk = passkey.to_bytes(16, byteorder='little')
960                logger.debug(f'TK from passkey = {self.tk.hex()}')
961
962            self.passkey_ready.set()
963
964            if next_steps is not None:
965                next_steps()
966
967        self.prompt_user_for_number(after_input)
968
969    def display_or_input_passkey(
970        self, next_steps: Optional[Callable[[], None]] = None
971    ) -> None:
972        if self.passkey_display:
973            self.display_passkey()
974            if next_steps is not None:
975                next_steps()
976        else:
977            self.input_passkey(next_steps)
978
979    def send_command(self, command: SMP_Command) -> None:
980        self.manager.send_command(self.connection, command)
981
982    def send_pairing_failed(self, error: int) -> None:
983        self.send_command(SMP_Pairing_Failed_Command(reason=error))
984        self.on_pairing_failure(error)
985
986    def send_pairing_request_command(self) -> None:
987        self.manager.on_session_start(self)
988
989        command = SMP_Pairing_Request_Command(
990            io_capability=self.io_capability,
991            oob_data_flag=self.oob_data_flag,
992            auth_req=self.auth_req,
993            maximum_encryption_key_size=16,
994            initiator_key_distribution=self.initiator_key_distribution,
995            responder_key_distribution=self.responder_key_distribution,
996        )
997        self.preq = bytes(command)
998        self.send_command(command)
999
1000    def send_pairing_response_command(self) -> None:
1001        response = SMP_Pairing_Response_Command(
1002            io_capability=self.io_capability,
1003            oob_data_flag=self.oob_data_flag,
1004            auth_req=self.auth_req,
1005            maximum_encryption_key_size=16,
1006            initiator_key_distribution=self.initiator_key_distribution,
1007            responder_key_distribution=self.responder_key_distribution,
1008        )
1009        self.pres = bytes(response)
1010        self.send_command(response)
1011
1012    def send_pairing_confirm_command(self) -> None:
1013        self.r = crypto.r()
1014        logger.debug(f'generated random: {self.r.hex()}')
1015
1016        if self.sc:
1017
1018            async def next_steps() -> None:
1019                if self.pairing_method in (
1020                    PairingMethod.JUST_WORKS,
1021                    PairingMethod.NUMERIC_COMPARISON,
1022                ):
1023                    z = 0
1024                elif self.pairing_method == PairingMethod.PASSKEY:
1025                    # We need a passkey
1026                    await self.passkey_ready.wait()
1027                    assert self.passkey
1028
1029                    z = 0x80 + ((self.passkey >> self.passkey_step) & 1)
1030                else:
1031                    return
1032
1033                if self.is_initiator:
1034                    confirm_value = crypto.f4(self.pka, self.pkb, self.r, bytes([z]))
1035                else:
1036                    confirm_value = crypto.f4(self.pkb, self.pka, self.r, bytes([z]))
1037
1038                self.send_command(
1039                    SMP_Pairing_Confirm_Command(confirm_value=confirm_value)
1040                )
1041
1042            # Perform the next steps asynchronously in case we need to wait for input
1043            self.connection.abort_on('disconnection', next_steps())
1044        else:
1045            confirm_value = crypto.c1(
1046                self.tk,
1047                self.r,
1048                self.preq,
1049                self.pres,
1050                self.iat,
1051                self.rat,
1052                self.ia,
1053                self.ra,
1054            )
1055
1056            self.send_command(SMP_Pairing_Confirm_Command(confirm_value=confirm_value))
1057
1058    def send_pairing_random_command(self) -> None:
1059        self.send_command(SMP_Pairing_Random_Command(random_value=self.r))
1060
1061    def send_public_key_command(self) -> None:
1062        self.send_command(
1063            SMP_Pairing_Public_Key_Command(
1064                public_key_x=self.ecc_key.x[::-1],
1065                public_key_y=self.ecc_key.y[::-1],
1066            )
1067        )
1068
1069    def send_pairing_dhkey_check_command(self) -> None:
1070        self.send_command(
1071            SMP_Pairing_DHKey_Check_Command(
1072                dhkey_check=self.ea if self.is_initiator else self.eb
1073            )
1074        )
1075
1076    def send_identity_address_command(self) -> None:
1077        identity_address = {
1078            None: self.connection.self_address,
1079            Address.PUBLIC_DEVICE_ADDRESS: self.manager.device.public_address,
1080            Address.RANDOM_DEVICE_ADDRESS: self.manager.device.random_address,
1081        }[self.pairing_config.identity_address_type]
1082        self.send_command(
1083            SMP_Identity_Address_Information_Command(
1084                addr_type=identity_address.address_type,
1085                bd_addr=identity_address,
1086            )
1087        )
1088
1089    def start_encryption(self, key: bytes) -> None:
1090        # We can now encrypt the connection with the short term key, so that we can
1091        # distribute the long term and/or other keys over an encrypted connection
1092        self.manager.device.host.send_command_sync(
1093            HCI_LE_Enable_Encryption_Command(
1094                connection_handle=self.connection.handle,
1095                random_number=bytes(8),
1096                encrypted_diversifier=0,
1097                long_term_key=key,
1098            )
1099        )
1100
1101    @classmethod
1102    def derive_ltk(cls, link_key: bytes, ct2: bool) -> bytes:
1103        '''Derives Long Term Key from Link Key.
1104
1105        Args:
1106            link_key: BR/EDR Link Key bytes in little-endian.
1107            ct2: whether ct2 is supported on both devices.
1108        Returns:
1109            LE Long Tern Key bytes in little-endian.
1110        '''
1111        ilk = (
1112            crypto.h7(salt=SMP_CTKD_H7_BRLE_SALT, w=link_key)
1113            if ct2
1114            else crypto.h6(link_key, b'tmp2')
1115        )
1116        return crypto.h6(ilk, b'brle')
1117
1118    @classmethod
1119    def derive_link_key(cls, ltk: bytes, ct2: bool) -> bytes:
1120        '''Derives Link Key from Long Term Key.
1121
1122        Args:
1123            ltk: LE Long Term Key bytes in little-endian.
1124            ct2: whether ct2 is supported on both devices.
1125        Returns:
1126            BR/EDR Link Key bytes in little-endian.
1127        '''
1128        ilk = (
1129            crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=ltk)
1130            if ct2
1131            else crypto.h6(ltk, b'tmp1')
1132        )
1133        return crypto.h6(ilk, b'lebr')
1134
1135    async def get_link_key_and_derive_ltk(self) -> None:
1136        '''Retrieves BR/EDR Link Key from storage and derive it to LE LTK.'''
1137        self.link_key = await self.manager.device.get_link_key(
1138            self.connection.peer_address
1139        )
1140        if self.link_key is None:
1141            logging.warning(
1142                'Try to derive LTK but host does not have the LK. Send a SMP_PAIRING_FAILED but the procedure will not be paused!'
1143            )
1144            self.send_pairing_failed(
1145                SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR
1146            )
1147        else:
1148            self.ltk = self.derive_ltk(self.link_key, self.ct2)
1149
1150    def distribute_keys(self) -> None:
1151        # Distribute the keys as required
1152        if self.is_initiator:
1153            # CTKD: Derive LTK from LinkKey
1154            if (
1155                self.connection.transport == BT_BR_EDR_TRANSPORT
1156                and self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG
1157            ):
1158                self.ctkd_task = self.connection.abort_on(
1159                    'disconnection', self.get_link_key_and_derive_ltk()
1160                )
1161            elif not self.sc:
1162                # Distribute the LTK, EDIV and RAND
1163                if self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG:
1164                    self.send_command(
1165                        SMP_Encryption_Information_Command(long_term_key=self.ltk)
1166                    )
1167                    self.send_command(
1168                        SMP_Master_Identification_Command(
1169                            ediv=self.ltk_ediv, rand=self.ltk_rand
1170                        )
1171                    )
1172
1173            # Distribute IRK & BD ADDR
1174            if self.initiator_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG:
1175                self.send_command(
1176                    SMP_Identity_Information_Command(
1177                        identity_resolving_key=self.manager.device.irk
1178                    )
1179                )
1180                self.send_identity_address_command()
1181
1182            # Distribute CSRK
1183            csrk = bytes(16)  # FIXME: testing
1184            if self.initiator_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG:
1185                self.send_command(SMP_Signing_Information_Command(signature_key=csrk))
1186
1187            # CTKD, calculate BR/EDR link key
1188            if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
1189                self.link_key = self.derive_link_key(self.ltk, self.ct2)
1190
1191        else:
1192            # CTKD: Derive LTK from LinkKey
1193            if (
1194                self.connection.transport == BT_BR_EDR_TRANSPORT
1195                and self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG
1196            ):
1197                self.ctkd_task = self.connection.abort_on(
1198                    'disconnection', self.get_link_key_and_derive_ltk()
1199                )
1200            # Distribute the LTK, EDIV and RAND
1201            elif not self.sc:
1202                if self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG:
1203                    self.send_command(
1204                        SMP_Encryption_Information_Command(long_term_key=self.ltk)
1205                    )
1206                    self.send_command(
1207                        SMP_Master_Identification_Command(
1208                            ediv=self.ltk_ediv, rand=self.ltk_rand
1209                        )
1210                    )
1211
1212            # Distribute IRK & BD ADDR
1213            if self.responder_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG:
1214                self.send_command(
1215                    SMP_Identity_Information_Command(
1216                        identity_resolving_key=self.manager.device.irk
1217                    )
1218                )
1219                self.send_identity_address_command()
1220
1221            # Distribute CSRK
1222            csrk = bytes(16)  # FIXME: testing
1223            if self.responder_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG:
1224                self.send_command(SMP_Signing_Information_Command(signature_key=csrk))
1225
1226            # CTKD, calculate BR/EDR link key
1227            if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
1228                self.link_key = self.derive_link_key(self.ltk, self.ct2)
1229
1230    def compute_peer_expected_distributions(self, key_distribution_flags: int) -> None:
1231        # Set our expectations for what to wait for in the key distribution phase
1232        self.peer_expected_distributions = []
1233        if not self.sc and self.connection.transport == BT_LE_TRANSPORT:
1234            if key_distribution_flags & SMP_ENC_KEY_DISTRIBUTION_FLAG != 0:
1235                self.peer_expected_distributions.append(
1236                    SMP_Encryption_Information_Command
1237                )
1238                self.peer_expected_distributions.append(
1239                    SMP_Master_Identification_Command
1240                )
1241        if key_distribution_flags & SMP_ID_KEY_DISTRIBUTION_FLAG != 0:
1242            self.peer_expected_distributions.append(SMP_Identity_Information_Command)
1243            self.peer_expected_distributions.append(
1244                SMP_Identity_Address_Information_Command
1245            )
1246        if key_distribution_flags & SMP_SIGN_KEY_DISTRIBUTION_FLAG != 0:
1247            self.peer_expected_distributions.append(SMP_Signing_Information_Command)
1248        logger.debug(
1249            'expecting distributions: '
1250            f'{[c.__name__ for c in self.peer_expected_distributions]}'
1251        )
1252
1253    def check_key_distribution(self, command_class: Type[SMP_Command]) -> None:
1254        # First, check that the connection is encrypted
1255        if not self.connection.is_encrypted:
1256            logger.warning(
1257                color('received key distribution on a non-encrypted connection', 'red')
1258            )
1259            self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR)
1260            return
1261
1262        # Check that this command class is expected
1263        if command_class in self.peer_expected_distributions:
1264            self.peer_expected_distributions.remove(command_class)
1265            logger.debug(
1266                'remaining distributions: '
1267                f'{[c.__name__ for c in self.peer_expected_distributions]}'
1268            )
1269            if not self.peer_expected_distributions:
1270                self.on_peer_key_distribution_complete()
1271        else:
1272            logger.warning(
1273                color(
1274                    '!!! unexpected key distribution command: '
1275                    f'{command_class.__name__}',
1276                    'red',
1277                )
1278            )
1279            self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR)
1280
1281    async def pair(self) -> None:
1282        # Start pairing as an initiator
1283        # TODO: check that this session isn't already active
1284
1285        # Send the pairing request to start the process
1286        self.send_pairing_request_command()
1287
1288        # Wait for the pairing process to finish
1289        assert self.pairing_result
1290        await self.connection.abort_on('disconnection', self.pairing_result)
1291
1292    def on_disconnection(self, _: int) -> None:
1293        self.connection.remove_listener('disconnection', self.on_disconnection)
1294        self.connection.remove_listener(
1295            'connection_encryption_change', self.on_connection_encryption_change
1296        )
1297        self.connection.remove_listener(
1298            'connection_encryption_key_refresh',
1299            self.on_connection_encryption_key_refresh,
1300        )
1301        self.manager.on_session_end(self)
1302
1303    def on_peer_key_distribution_complete(self) -> None:
1304        # The initiator can now send its keys
1305        if self.is_initiator:
1306            self.distribute_keys()
1307
1308        self.connection.abort_on('disconnection', self.on_pairing())
1309
1310    def on_connection_encryption_change(self) -> None:
1311        if self.connection.is_encrypted:
1312            if self.is_responder:
1313                # The responder distributes its keys first, the initiator later
1314                self.distribute_keys()
1315
1316            # If we're not expecting key distributions from the peer, we're done
1317            if not self.peer_expected_distributions:
1318                self.on_peer_key_distribution_complete()
1319
1320    def on_connection_encryption_key_refresh(self) -> None:
1321        # Do as if the connection had just been encrypted
1322        self.on_connection_encryption_change()
1323
1324    async def on_pairing(self) -> None:
1325        logger.debug('pairing complete')
1326
1327        if self.completed:
1328            return
1329
1330        self.completed = True
1331
1332        if self.pairing_result is not None and not self.pairing_result.done():
1333            self.pairing_result.set_result(None)
1334
1335        # Use the peer address from the pairing protocol or the connection
1336        if self.peer_bd_addr is not None:
1337            peer_address = self.peer_bd_addr
1338        else:
1339            peer_address = self.connection.peer_address
1340
1341        # Wait for link key fetch and key derivation
1342        if self.ctkd_task is not None:
1343            await self.ctkd_task
1344            self.ctkd_task = None
1345
1346        # Create an object to hold the keys
1347        keys = PairingKeys()
1348        keys.address_type = peer_address.address_type
1349        authenticated = self.pairing_method != PairingMethod.JUST_WORKS
1350        if self.sc or self.connection.transport == BT_BR_EDR_TRANSPORT:
1351            keys.ltk = PairingKeys.Key(value=self.ltk, authenticated=authenticated)
1352        else:
1353            our_ltk_key = PairingKeys.Key(
1354                value=self.ltk,
1355                authenticated=authenticated,
1356                ediv=self.ltk_ediv,
1357                rand=self.ltk_rand,
1358            )
1359            peer_ltk_key = PairingKeys.Key(
1360                value=self.peer_ltk,
1361                authenticated=authenticated,
1362                ediv=self.peer_ediv,
1363                rand=self.peer_rand,
1364            )
1365            if self.is_initiator:
1366                keys.ltk_central = peer_ltk_key
1367                keys.ltk_peripheral = our_ltk_key
1368            else:
1369                keys.ltk_central = our_ltk_key
1370                keys.ltk_peripheral = peer_ltk_key
1371        if self.peer_identity_resolving_key is not None:
1372            keys.irk = PairingKeys.Key(
1373                value=self.peer_identity_resolving_key, authenticated=authenticated
1374            )
1375        if self.peer_signature_key is not None:
1376            keys.csrk = PairingKeys.Key(
1377                value=self.peer_signature_key, authenticated=authenticated
1378            )
1379        if self.link_key is not None:
1380            keys.link_key = PairingKeys.Key(
1381                value=self.link_key, authenticated=authenticated
1382            )
1383        await self.manager.on_pairing(self, peer_address, keys)
1384
1385    def on_pairing_failure(self, reason: int) -> None:
1386        logger.warning(f'pairing failure ({error_name(reason)})')
1387
1388        if self.completed:
1389            return
1390
1391        self.completed = True
1392
1393        error = ProtocolError(reason, 'smp', error_name(reason))
1394        if self.pairing_result is not None and not self.pairing_result.done():
1395            self.pairing_result.set_exception(error)
1396        self.manager.on_pairing_failure(self, reason)
1397
1398    def on_smp_command(self, command: SMP_Command) -> None:
1399        # Find the handler method
1400        handler_name = f'on_{command.name.lower()}'
1401        handler = getattr(self, handler_name, None)
1402        if handler is not None:
1403            try:
1404                handler(command)
1405            except Exception as error:
1406                logger.exception(f'{color("!!! Exception in handler:", "red")} {error}')
1407                response = SMP_Pairing_Failed_Command(
1408                    reason=SMP_UNSPECIFIED_REASON_ERROR
1409                )
1410                self.send_command(response)
1411        else:
1412            logger.error(color('SMP command not handled???', 'red'))
1413
1414    def on_smp_pairing_request_command(
1415        self, command: SMP_Pairing_Request_Command
1416    ) -> None:
1417        self.connection.abort_on(
1418            'disconnection', self.on_smp_pairing_request_command_async(command)
1419        )
1420
1421    async def on_smp_pairing_request_command_async(
1422        self, command: SMP_Pairing_Request_Command
1423    ) -> None:
1424        # Check if the request should proceed
1425        try:
1426            accepted = await self.pairing_config.delegate.accept()
1427        except Exception as error:
1428            logger.warning(f'exception while accepting: {error}')
1429            accepted = False
1430        if not accepted:
1431            logger.debug('pairing rejected by delegate')
1432            self.send_pairing_failed(SMP_PAIRING_NOT_SUPPORTED_ERROR)
1433            return
1434
1435        # Save the request
1436        self.preq = bytes(command)
1437
1438        # Bonding and SC require both sides to request/support it
1439        self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0)
1440        self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0)
1441        self.ct2 = self.ct2 and (command.auth_req & SMP_CT2_AUTHREQ != 0)
1442
1443        # Infer the pairing method
1444        if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or (
1445            not self.sc and (self.oob_data_flag != 0 and command.oob_data_flag != 0)
1446        ):
1447            # Use OOB
1448            self.pairing_method = PairingMethod.OOB
1449            if not self.sc and self.tk is None:
1450                # For legacy OOB, TK is required.
1451                logger.warning("legacy OOB without TK")
1452                self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
1453                return
1454            if command.oob_data_flag == 0:
1455                # The peer doesn't have OOB data, use r=0
1456                self.r = bytes(16)
1457        else:
1458            # Decide which pairing method to use from the IO capability
1459            self.decide_pairing_method(
1460                command.auth_req,
1461                command.io_capability,
1462                self.io_capability,
1463            )
1464
1465        logger.debug(f'pairing method: {self.pairing_method.name}')
1466
1467        # Key distribution
1468        (
1469            self.initiator_key_distribution,
1470            self.responder_key_distribution,
1471        ) = await self.pairing_config.delegate.key_distribution_response(
1472            command.initiator_key_distribution, command.responder_key_distribution
1473        )
1474        self.compute_peer_expected_distributions(self.initiator_key_distribution)
1475
1476        # The pairing is now starting
1477        self.manager.on_session_start(self)
1478
1479        # Display a passkey if we need to
1480        if not self.sc:
1481            if self.pairing_method == PairingMethod.PASSKEY and self.passkey_display:
1482                self.display_passkey()
1483
1484        # Respond
1485        self.send_pairing_response_command()
1486
1487        # Vol 3, Part C, 5.2.2.1.3
1488        # CTKD over BR/EDR should happen after the connection has been encrypted,
1489        # so when receiving pairing requests, responder should start distributing keys
1490        if (
1491            self.connection.transport == BT_BR_EDR_TRANSPORT
1492            and self.connection.is_encrypted
1493            and self.is_responder
1494            and accepted
1495        ):
1496            self.distribute_keys()
1497
1498    def on_smp_pairing_response_command(
1499        self, command: SMP_Pairing_Response_Command
1500    ) -> None:
1501        if self.is_responder:
1502            logger.warning(color('received pairing response as a responder', 'red'))
1503            return
1504
1505        # Save the response
1506        self.pres = bytes(command)
1507        self.peer_io_capability = command.io_capability
1508
1509        # Bonding and SC require both sides to request/support it
1510        self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0)
1511        self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0)
1512
1513        # Infer the pairing method
1514        if (self.sc and (self.oob_data_flag != 0 or command.oob_data_flag != 0)) or (
1515            not self.sc and (self.oob_data_flag != 0 and command.oob_data_flag != 0)
1516        ):
1517            # Use OOB
1518            self.pairing_method = PairingMethod.OOB
1519            if not self.sc and self.tk is None:
1520                # For legacy OOB, TK is required.
1521                logger.warning("legacy OOB without TK")
1522                self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
1523                return
1524            if command.oob_data_flag == 0:
1525                # The peer doesn't have OOB data, use r=0
1526                self.r = bytes(16)
1527        else:
1528            # Decide which pairing method to use from the IO capability
1529            self.decide_pairing_method(
1530                command.auth_req, self.io_capability, command.io_capability
1531            )
1532
1533        logger.debug(f'pairing method: {self.pairing_method.name}')
1534
1535        # Key distribution
1536        if (
1537            command.initiator_key_distribution & ~self.initiator_key_distribution != 0
1538        ) or (
1539            command.responder_key_distribution & ~self.responder_key_distribution != 0
1540        ):
1541            # The response isn't a subset of the request
1542            self.send_pairing_failed(SMP_INVALID_PARAMETERS_ERROR)
1543            return
1544        self.initiator_key_distribution = command.initiator_key_distribution
1545        self.responder_key_distribution = command.responder_key_distribution
1546        self.compute_peer_expected_distributions(self.responder_key_distribution)
1547
1548        # Start phase 2
1549        if self.pairing_method == PairingMethod.CTKD_OVER_CLASSIC:
1550            # Authentication is already done in SMP, so remote shall start keys distribution immediately
1551            return
1552        elif self.sc:
1553            if self.pairing_method == PairingMethod.PASSKEY:
1554                self.display_or_input_passkey()
1555
1556            self.send_public_key_command()
1557        else:
1558            if self.pairing_method == PairingMethod.PASSKEY:
1559                self.display_or_input_passkey(self.send_pairing_confirm_command)
1560            else:
1561                self.send_pairing_confirm_command()
1562
1563    def on_smp_pairing_confirm_command_legacy(
1564        self, _: SMP_Pairing_Confirm_Command
1565    ) -> None:
1566        if self.is_initiator:
1567            self.send_pairing_random_command()
1568        else:
1569            # If the method is PASSKEY, now is the time to input the code
1570            if (
1571                self.pairing_method == PairingMethod.PASSKEY
1572                and not self.passkey_display
1573            ):
1574                self.input_passkey(self.send_pairing_confirm_command)
1575            else:
1576                self.send_pairing_confirm_command()
1577
1578    def on_smp_pairing_confirm_command_secure_connections(
1579        self, _: SMP_Pairing_Confirm_Command
1580    ) -> None:
1581        if self.pairing_method in (
1582            PairingMethod.JUST_WORKS,
1583            PairingMethod.NUMERIC_COMPARISON,
1584        ):
1585            if self.is_initiator:
1586                self.r = crypto.r()
1587                self.send_pairing_random_command()
1588        elif self.pairing_method == PairingMethod.PASSKEY:
1589            if self.is_initiator:
1590                self.send_pairing_random_command()
1591            else:
1592                self.send_pairing_confirm_command()
1593
1594    def on_smp_pairing_confirm_command(
1595        self, command: SMP_Pairing_Confirm_Command
1596    ) -> None:
1597        self.confirm_value = command.confirm_value
1598        if self.sc:
1599            self.on_smp_pairing_confirm_command_secure_connections(command)
1600        else:
1601            self.on_smp_pairing_confirm_command_legacy(command)
1602
1603    def on_smp_pairing_random_command_legacy(
1604        self, command: SMP_Pairing_Random_Command
1605    ) -> None:
1606        # Check that the confirmation values match
1607        confirm_verifier = crypto.c1(
1608            self.tk,
1609            command.random_value,
1610            self.preq,
1611            self.pres,
1612            self.iat,
1613            self.rat,
1614            self.ia,
1615            self.ra,
1616        )
1617        assert self.confirm_value
1618        if not self.check_expected_value(
1619            self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR
1620        ):
1621            return
1622
1623        # Compute STK
1624        if self.is_initiator:
1625            mrand = self.r
1626            srand = command.random_value
1627        else:
1628            srand = self.r
1629            mrand = command.random_value
1630        self.stk = crypto.s1(self.tk, srand, mrand)
1631        logger.debug(f'STK = {self.stk.hex()}')
1632
1633        # Generate LTK
1634        self.ltk = crypto.r()
1635
1636        if self.is_initiator:
1637            self.start_encryption(self.stk)
1638        else:
1639            self.send_pairing_random_command()
1640
1641    def on_smp_pairing_random_command_secure_connections(
1642        self, command: SMP_Pairing_Random_Command
1643    ) -> None:
1644        if self.pairing_method == PairingMethod.PASSKEY and self.passkey is None:
1645            logger.warning('no passkey entered, ignoring command')
1646            return
1647
1648        # pylint: disable=too-many-return-statements
1649        if self.is_initiator:
1650            if self.pairing_method in (
1651                PairingMethod.JUST_WORKS,
1652                PairingMethod.NUMERIC_COMPARISON,
1653            ):
1654                assert self.confirm_value
1655                # Check that the random value matches what was committed to earlier
1656                confirm_verifier = crypto.f4(
1657                    self.pkb, self.pka, command.random_value, bytes([0])
1658                )
1659                if not self.check_expected_value(
1660                    self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR
1661                ):
1662                    return
1663            elif self.pairing_method == PairingMethod.PASSKEY:
1664                assert self.passkey and self.confirm_value
1665                # Check that the random value matches what was committed to earlier
1666                confirm_verifier = crypto.f4(
1667                    self.pkb,
1668                    self.pka,
1669                    command.random_value,
1670                    bytes([0x80 + ((self.passkey >> self.passkey_step) & 1)]),
1671                )
1672                if not self.check_expected_value(
1673                    self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR
1674                ):
1675                    return
1676
1677                # Move on to the next iteration
1678                self.passkey_step += 1
1679                logger.debug(f'passkey finished step {self.passkey_step} of 20')
1680                if self.passkey_step < 20:
1681                    self.send_pairing_confirm_command()
1682                    return
1683            elif self.pairing_method != PairingMethod.OOB:
1684                return
1685        else:
1686            if self.pairing_method in (
1687                PairingMethod.JUST_WORKS,
1688                PairingMethod.NUMERIC_COMPARISON,
1689                PairingMethod.OOB,
1690            ):
1691                self.send_pairing_random_command()
1692            elif self.pairing_method == PairingMethod.PASSKEY:
1693                assert self.passkey and self.confirm_value
1694                # Check that the random value matches what was committed to earlier
1695                confirm_verifier = crypto.f4(
1696                    self.pka,
1697                    self.pkb,
1698                    command.random_value,
1699                    bytes([0x80 + ((self.passkey >> self.passkey_step) & 1)]),
1700                )
1701                if not self.check_expected_value(
1702                    self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR
1703                ):
1704                    return
1705
1706                self.send_pairing_random_command()
1707
1708                # Move on to the next iteration
1709                self.passkey_step += 1
1710                logger.debug(f'passkey finished step {self.passkey_step} of 20')
1711                if self.passkey_step < 20:
1712                    self.r = crypto.r()
1713                    return
1714            else:
1715                return
1716
1717        # Compute the MacKey and LTK
1718        a = self.ia + bytes([self.iat])
1719        b = self.ra + bytes([self.rat])
1720        (mac_key, self.ltk) = crypto.f5(self.dh_key, self.na, self.nb, a, b)
1721
1722        # Compute the DH Key checks
1723        if self.pairing_method in (
1724            PairingMethod.JUST_WORKS,
1725            PairingMethod.NUMERIC_COMPARISON,
1726            PairingMethod.OOB,
1727        ):
1728            ra = bytes(16)
1729            rb = ra
1730        elif self.pairing_method == PairingMethod.PASSKEY:
1731            assert self.passkey
1732            ra = self.passkey.to_bytes(16, byteorder='little')
1733            rb = ra
1734        else:
1735            return
1736
1737        assert self.preq and self.pres
1738        io_cap_a = self.preq[1:4]
1739        io_cap_b = self.pres[1:4]
1740        self.ea = crypto.f6(mac_key, self.na, self.nb, rb, io_cap_a, a, b)
1741        self.eb = crypto.f6(mac_key, self.nb, self.na, ra, io_cap_b, b, a)
1742
1743        # Next steps to be performed after possible user confirmation
1744        def next_steps() -> None:
1745            # The initiator sends the DH Key check to the responder
1746            if self.is_initiator:
1747                self.send_pairing_dhkey_check_command()
1748            else:
1749                if self.wait_before_continuing:
1750                    self.wait_before_continuing.set_result(None)
1751
1752        # Prompt the user for confirmation if needed
1753        if self.pairing_method in (
1754            PairingMethod.JUST_WORKS,
1755            PairingMethod.NUMERIC_COMPARISON,
1756        ):
1757            # Compute the 6-digit code
1758            code = crypto.g2(self.pka, self.pkb, self.na, self.nb) % 1000000
1759
1760            # Ask for user confirmation
1761            self.wait_before_continuing = asyncio.get_running_loop().create_future()
1762            if self.pairing_method == PairingMethod.JUST_WORKS:
1763                self.prompt_user_for_confirmation(next_steps)
1764            else:
1765                self.prompt_user_for_numeric_comparison(code, next_steps)
1766        else:
1767            next_steps()
1768
1769    def on_smp_pairing_random_command(
1770        self, command: SMP_Pairing_Random_Command
1771    ) -> None:
1772        self.peer_random_value = command.random_value
1773        if self.sc:
1774            self.on_smp_pairing_random_command_secure_connections(command)
1775        else:
1776            self.on_smp_pairing_random_command_legacy(command)
1777
1778    def on_smp_pairing_public_key_command(
1779        self, command: SMP_Pairing_Public_Key_Command
1780    ) -> None:
1781        # Store the public key so that we can compute the confirmation value later
1782        self.peer_public_key_x = command.public_key_x
1783        self.peer_public_key_y = command.public_key_y
1784
1785        # Compute the DH key
1786        self.dh_key = self.ecc_key.dh(
1787            command.public_key_x[::-1],
1788            command.public_key_y[::-1],
1789        )[::-1]
1790        logger.debug(f'DH key: {self.dh_key.hex()}')
1791
1792        if self.pairing_method == PairingMethod.OOB:
1793            # Check against shared OOB data
1794            if self.peer_oob_data:
1795                confirm_verifier = crypto.f4(
1796                    self.peer_public_key_x,
1797                    self.peer_public_key_x,
1798                    self.peer_oob_data.r,
1799                    bytes(1),
1800                )
1801                if not self.check_expected_value(
1802                    self.peer_oob_data.c,
1803                    confirm_verifier,
1804                    SMP_CONFIRM_VALUE_FAILED_ERROR,
1805                ):
1806                    return
1807
1808        if self.is_initiator:
1809            if self.pairing_method == PairingMethod.OOB:
1810                self.send_pairing_random_command()
1811            else:
1812                self.send_pairing_confirm_command()
1813        else:
1814            if self.pairing_method == PairingMethod.PASSKEY:
1815                self.display_or_input_passkey()
1816
1817            # Send our public key back to the initiator
1818            self.send_public_key_command()
1819
1820            if self.pairing_method in (
1821                PairingMethod.JUST_WORKS,
1822                PairingMethod.NUMERIC_COMPARISON,
1823                PairingMethod.OOB,
1824            ):
1825                # We can now send the confirmation value
1826                self.send_pairing_confirm_command()
1827
1828    def on_smp_pairing_dhkey_check_command(
1829        self, command: SMP_Pairing_DHKey_Check_Command
1830    ) -> None:
1831        # Check that what we received matches what we computed earlier
1832        expected = self.eb if self.is_initiator else self.ea
1833        assert expected
1834        if not self.check_expected_value(
1835            expected, command.dhkey_check, SMP_DHKEY_CHECK_FAILED_ERROR
1836        ):
1837            return
1838
1839        if self.is_responder:
1840            if self.wait_before_continuing is not None:
1841
1842                async def next_steps() -> None:
1843                    assert self.wait_before_continuing
1844                    await self.wait_before_continuing
1845                    self.wait_before_continuing = None
1846                    self.send_pairing_dhkey_check_command()
1847
1848                self.connection.abort_on('disconnection', next_steps())
1849            else:
1850                self.send_pairing_dhkey_check_command()
1851        else:
1852            self.start_encryption(self.ltk)
1853
1854    def on_smp_pairing_failed_command(
1855        self, command: SMP_Pairing_Failed_Command
1856    ) -> None:
1857        self.on_pairing_failure(command.reason)
1858
1859    def on_smp_encryption_information_command(
1860        self, command: SMP_Encryption_Information_Command
1861    ) -> None:
1862        self.peer_ltk = command.long_term_key
1863        self.check_key_distribution(SMP_Encryption_Information_Command)
1864
1865    def on_smp_master_identification_command(
1866        self, command: SMP_Master_Identification_Command
1867    ) -> None:
1868        self.peer_ediv = command.ediv
1869        self.peer_rand = command.rand
1870        self.check_key_distribution(SMP_Master_Identification_Command)
1871
1872    def on_smp_identity_information_command(
1873        self, command: SMP_Identity_Information_Command
1874    ) -> None:
1875        self.peer_identity_resolving_key = command.identity_resolving_key
1876        self.check_key_distribution(SMP_Identity_Information_Command)
1877
1878    def on_smp_identity_address_information_command(
1879        self, command: SMP_Identity_Address_Information_Command
1880    ) -> None:
1881        self.peer_bd_addr = command.bd_addr
1882        self.check_key_distribution(SMP_Identity_Address_Information_Command)
1883
1884    def on_smp_signing_information_command(
1885        self, command: SMP_Signing_Information_Command
1886    ) -> None:
1887        self.peer_signature_key = command.signature_key
1888        self.check_key_distribution(SMP_Signing_Information_Command)
1889
1890
1891# -----------------------------------------------------------------------------
1892class Manager(EventEmitter):
1893    '''
1894    Implements the Initiator and Responder roles of the Security Manager Protocol
1895    '''
1896
1897    device: Device
1898    sessions: Dict[int, Session]
1899    pairing_config_factory: Callable[[Connection], PairingConfig]
1900    session_proxy: Type[Session]
1901    _ecc_key: Optional[crypto.EccKey]
1902
1903    def __init__(
1904        self,
1905        device: Device,
1906        pairing_config_factory: Callable[[Connection], PairingConfig],
1907    ) -> None:
1908        super().__init__()
1909        self.device = device
1910        self.sessions = {}
1911        self._ecc_key = None
1912        self.pairing_config_factory = pairing_config_factory
1913        self.session_proxy = Session
1914
1915    def send_command(self, connection: Connection, command: SMP_Command) -> None:
1916        logger.debug(
1917            f'>>> Sending SMP Command on connection [0x{connection.handle:04X}] '
1918            f'{connection.peer_address}: {command}'
1919        )
1920        cid = SMP_BR_CID if connection.transport == BT_BR_EDR_TRANSPORT else SMP_CID
1921        connection.send_l2cap_pdu(cid, command.to_bytes())
1922
1923    def on_smp_security_request_command(
1924        self, connection: Connection, request: SMP_Security_Request_Command
1925    ) -> None:
1926        connection.emit('security_request', request.auth_req)
1927
1928    def on_smp_pdu(self, connection: Connection, pdu: bytes) -> None:
1929        # Parse the L2CAP payload into an SMP Command object
1930        command = SMP_Command.from_bytes(pdu)
1931        logger.debug(
1932            f'<<< Received SMP Command on connection [0x{connection.handle:04X}] '
1933            f'{connection.peer_address}: {command}'
1934        )
1935
1936        # Security request is more than just pairing, so let applications handle them
1937        if command.code == SMP_SECURITY_REQUEST_COMMAND:
1938            self.on_smp_security_request_command(
1939                connection, cast(SMP_Security_Request_Command, command)
1940            )
1941            return
1942
1943        # Look for a session with this connection, and create one if none exists
1944        if not (session := self.sessions.get(connection.handle)):
1945            if connection.role == BT_CENTRAL_ROLE:
1946                logger.warning('Remote starts pairing as Peripheral!')
1947            pairing_config = self.pairing_config_factory(connection)
1948            session = self.session_proxy(
1949                self, connection, pairing_config, is_initiator=False
1950            )
1951            self.sessions[connection.handle] = session
1952
1953        # Delegate the handling of the command to the session
1954        session.on_smp_command(command)
1955
1956    @property
1957    def ecc_key(self) -> crypto.EccKey:
1958        if self._ecc_key is None:
1959            self._ecc_key = crypto.EccKey.generate()
1960        assert self._ecc_key
1961        return self._ecc_key
1962
1963    async def pair(self, connection: Connection) -> None:
1964        # TODO: check if there's already a session for this connection
1965        if connection.role != BT_CENTRAL_ROLE:
1966            logger.warning('Start pairing as Peripheral!')
1967        pairing_config = self.pairing_config_factory(connection)
1968        session = self.session_proxy(
1969            self, connection, pairing_config, is_initiator=True
1970        )
1971        self.sessions[connection.handle] = session
1972        return await session.pair()
1973
1974    def request_pairing(self, connection: Connection) -> None:
1975        pairing_config = self.pairing_config_factory(connection)
1976        if pairing_config:
1977            auth_req = smp_auth_req(
1978                pairing_config.bonding,
1979                pairing_config.mitm,
1980                pairing_config.sc,
1981                False,
1982                False,
1983            )
1984        else:
1985            auth_req = 0
1986        self.send_command(connection, SMP_Security_Request_Command(auth_req=auth_req))
1987
1988    def on_session_start(self, session: Session) -> None:
1989        self.device.on_pairing_start(session.connection)
1990
1991    async def on_pairing(
1992        self, session: Session, identity_address: Optional[Address], keys: PairingKeys
1993    ) -> None:
1994        # Store the keys in the key store
1995        if self.device.keystore and identity_address is not None:
1996            # Make sure on_pairing emits after key update.
1997            await self.device.update_keys(str(identity_address), keys)
1998        # Notify the device
1999        self.device.on_pairing(session.connection, identity_address, keys, session.sc)
2000
2001    def on_pairing_failure(self, session: Session, reason: int) -> None:
2002        self.device.on_pairing_failure(session.connection, reason)
2003
2004    def on_session_end(self, session: Session) -> None:
2005        logger.debug(f'session end for connection 0x{session.connection.handle:04X}')
2006        if session.connection.handle in self.sessions:
2007            del self.sessions[session.connection.handle]
2008
2009    def get_long_term_key(
2010        self, connection: Connection, rand: bytes, ediv: int
2011    ) -> Optional[bytes]:
2012        if session := self.sessions.get(connection.handle):
2013            return session.get_long_term_key(rand, ediv)
2014
2015        return None
2016