• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# SMP - Security Manager Protocol
17#
18# See Bluetooth spec @ Vol 3, Part H
19#
20# -----------------------------------------------------------------------------
21
22# -----------------------------------------------------------------------------
23# Imports
24# -----------------------------------------------------------------------------
25from __future__ import annotations
26import logging
27import asyncio
28import secrets
29from typing import Dict, Optional, Type
30
31from pyee import EventEmitter
32
33from .colors import color
34from .hci import Address, HCI_LE_Enable_Encryption_Command, HCI_Object, key_with_value
35from .core import (
36    BT_BR_EDR_TRANSPORT,
37    BT_CENTRAL_ROLE,
38    BT_LE_TRANSPORT,
39    ProtocolError,
40    name_or_number,
41)
42from .keys import PairingKeys
43from . import crypto
44
45
46# -----------------------------------------------------------------------------
47# Logging
48# -----------------------------------------------------------------------------
49logger = logging.getLogger(__name__)
50
51
52# -----------------------------------------------------------------------------
53# Constants
54# -----------------------------------------------------------------------------
55# fmt: off
56# pylint: disable=line-too-long
57
58SMP_CID = 0x06
59SMP_BR_CID = 0x07
60
61SMP_PAIRING_REQUEST_COMMAND               = 0x01
62SMP_PAIRING_RESPONSE_COMMAND              = 0x02
63SMP_PAIRING_CONFIRM_COMMAND               = 0x03
64SMP_PAIRING_RANDOM_COMMAND                = 0x04
65SMP_PAIRING_FAILED_COMMAND                = 0x05
66SMP_ENCRYPTION_INFORMATION_COMMAND        = 0x06
67SMP_MASTER_IDENTIFICATION_COMMAND         = 0x07
68SMP_IDENTITY_INFORMATION_COMMAND          = 0x08
69SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND  = 0x09
70SMP_SIGNING_INFORMATION_COMMAND           = 0x0A
71SMP_SECURITY_REQUEST_COMMAND              = 0x0B
72SMP_PAIRING_PUBLIC_KEY_COMMAND            = 0x0C
73SMP_PAIRING_DHKEY_CHECK_COMMAND           = 0x0D
74SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND = 0x0E
75
76SMP_COMMAND_NAMES = {
77    SMP_PAIRING_REQUEST_COMMAND:               'SMP_PAIRING_REQUEST_COMMAND',
78    SMP_PAIRING_RESPONSE_COMMAND:              'SMP_PAIRING_RESPONSE_COMMAND',
79    SMP_PAIRING_CONFIRM_COMMAND:               'SMP_PAIRING_CONFIRM_COMMAND',
80    SMP_PAIRING_RANDOM_COMMAND:                'SMP_PAIRING_RANDOM_COMMAND',
81    SMP_PAIRING_FAILED_COMMAND:                'SMP_PAIRING_FAILED_COMMAND',
82    SMP_ENCRYPTION_INFORMATION_COMMAND:        'SMP_ENCRYPTION_INFORMATION_COMMAND',
83    SMP_MASTER_IDENTIFICATION_COMMAND:         'SMP_MASTER_IDENTIFICATION_COMMAND',
84    SMP_IDENTITY_INFORMATION_COMMAND:          'SMP_IDENTITY_INFORMATION_COMMAND',
85    SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND:  'SMP_IDENTITY_ADDRESS_INFORMATION_COMMAND',
86    SMP_SIGNING_INFORMATION_COMMAND:           'SMP_SIGNING_INFORMATION_COMMAND',
87    SMP_SECURITY_REQUEST_COMMAND:              'SMP_SECURITY_REQUEST_COMMAND',
88    SMP_PAIRING_PUBLIC_KEY_COMMAND:            'SMP_PAIRING_PUBLIC_KEY_COMMAND',
89    SMP_PAIRING_DHKEY_CHECK_COMMAND:           'SMP_PAIRING_DHKEY_CHECK_COMMAND',
90    SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND: 'SMP_PAIRING_KEYPRESS_NOTIFICATION_COMMAND'
91}
92
93SMP_DISPLAY_ONLY_IO_CAPABILITY       = 0x00
94SMP_DISPLAY_YES_NO_IO_CAPABILITY     = 0x01
95SMP_KEYBOARD_ONLY_IO_CAPABILITY      = 0x02
96SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY = 0x03
97SMP_KEYBOARD_DISPLAY_IO_CAPABILITY   = 0x04
98
99SMP_IO_CAPABILITY_NAMES = {
100    SMP_DISPLAY_ONLY_IO_CAPABILITY:       'SMP_DISPLAY_ONLY_IO_CAPABILITY',
101    SMP_DISPLAY_YES_NO_IO_CAPABILITY:     'SMP_DISPLAY_YES_NO_IO_CAPABILITY',
102    SMP_KEYBOARD_ONLY_IO_CAPABILITY:      'SMP_KEYBOARD_ONLY_IO_CAPABILITY',
103    SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: 'SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY',
104    SMP_KEYBOARD_DISPLAY_IO_CAPABILITY:   'SMP_KEYBOARD_DISPLAY_IO_CAPABILITY'
105}
106
107SMP_PASSKEY_ENTRY_FAILED_ERROR                       = 0x01
108SMP_OOB_NOT_AVAILABLE_ERROR                          = 0x02
109SMP_AUTHENTICATION_REQUIREMENTS_ERROR                = 0x03
110SMP_CONFIRM_VALUE_FAILED_ERROR                       = 0x04
111SMP_PAIRING_NOT_SUPPORTED_ERROR                      = 0x05
112SMP_ENCRYPTION_KEY_SIZE_ERROR                        = 0x06
113SMP_COMMAND_NOT_SUPPORTED_ERROR                      = 0x07
114SMP_UNSPECIFIED_REASON_ERROR                         = 0x08
115SMP_REPEATED_ATTEMPTS_ERROR                          = 0x09
116SMP_INVALID_PARAMETERS_ERROR                         = 0x0A
117SMP_DHKEY_CHECK_FAILED_ERROR                         = 0x0B
118SMP_NUMERIC_COMPARISON_FAILED_ERROR                  = 0x0C
119SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR                 = 0x0D
120SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR = 0x0E
121
122SMP_ERROR_NAMES = {
123    SMP_PASSKEY_ENTRY_FAILED_ERROR:                       'SMP_PASSKEY_ENTRY_FAILED_ERROR',
124    SMP_OOB_NOT_AVAILABLE_ERROR:                          'SMP_OOB_NOT_AVAILABLE_ERROR',
125    SMP_AUTHENTICATION_REQUIREMENTS_ERROR:                'SMP_AUTHENTICATION_REQUIREMENTS_ERROR',
126    SMP_CONFIRM_VALUE_FAILED_ERROR:                       'SMP_CONFIRM_VALUE_FAILED_ERROR',
127    SMP_PAIRING_NOT_SUPPORTED_ERROR:                      'SMP_PAIRING_NOT_SUPPORTED_ERROR',
128    SMP_ENCRYPTION_KEY_SIZE_ERROR:                        'SMP_ENCRYPTION_KEY_SIZE_ERROR',
129    SMP_COMMAND_NOT_SUPPORTED_ERROR:                      'SMP_COMMAND_NOT_SUPPORTED_ERROR',
130    SMP_UNSPECIFIED_REASON_ERROR:                         'SMP_UNSPECIFIED_REASON_ERROR',
131    SMP_REPEATED_ATTEMPTS_ERROR:                          'SMP_REPEATED_ATTEMPTS_ERROR',
132    SMP_INVALID_PARAMETERS_ERROR:                         'SMP_INVALID_PARAMETERS_ERROR',
133    SMP_DHKEY_CHECK_FAILED_ERROR:                         'SMP_DHKEY_CHECK_FAILED_ERROR',
134    SMP_NUMERIC_COMPARISON_FAILED_ERROR:                  'SMP_NUMERIC_COMPARISON_FAILED_ERROR',
135    SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR:                 'SMP_BD_EDR_PAIRING_IN_PROGRESS_ERROR',
136    SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR: 'SMP_CROSS_TRANSPORT_KEY_DERIVATION_NOT_ALLOWED_ERROR'
137}
138
139SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE   = 0
140SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE   = 1
141SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE    = 2
142SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE         = 3
143SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE = 4
144
145SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES = {
146    SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE:   'SMP_PASSKEY_ENTRY_STARTED_KEYPRESS_NOTIFICATION_TYPE',
147    SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE:   'SMP_PASSKEY_DIGIT_ENTERED_KEYPRESS_NOTIFICATION_TYPE',
148    SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE:    'SMP_PASSKEY_DIGIT_ERASED_KEYPRESS_NOTIFICATION_TYPE',
149    SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE:         'SMP_PASSKEY_CLEARED_KEYPRESS_NOTIFICATION_TYPE',
150    SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE: 'SMP_PASSKEY_ENTRY_COMPLETED_KEYPRESS_NOTIFICATION_TYPE'
151}
152
# Bit flags for key distribution/generation
# These are the bits tested by SMP_Command.key_distribution_str, which labels
# them 'ENC', 'ID', 'SIGN' and 'LINK' respectively.
SMP_ENC_KEY_DISTRIBUTION_FLAG  = 0b0001
SMP_ID_KEY_DISTRIBUTION_FLAG   = 0b0010
SMP_SIGN_KEY_DISTRIBUTION_FLAG = 0b0100
SMP_LINK_KEY_DISTRIBUTION_FLAG = 0b1000
158
# AuthReq fields
# Bit masks combined by smp_auth_req() and decoded by
# SMP_Command.auth_req_str(): bonding occupies the low bits, then MITM (bit 2),
# SC (bit 3), keypress (bit 4) and CT2 (bit 5).
SMP_BONDING_AUTHREQ  = 0b00000001
SMP_MITM_AUTHREQ     = 0b00000100
SMP_SC_AUTHREQ       = 0b00001000
SMP_KEYPRESS_AUTHREQ = 0b00010000
SMP_CT2_AUTHREQ      = 0b00100000
165
# Crypto salt
# Salts for the h7 function used in cross-transport key derivation. The
# Bluetooth spec defines SALT as a 128-bit value, so each constant must be
# exactly 16 bytes: 12 zero bytes followed by the 4-byte ASCII tag
# ('tmp1' for LE -> BR/EDR, 'tmp2' for BR/EDR -> LE).
SMP_CTKD_H7_LEBR_SALT = bytes(12) + bytes.fromhex('746D7031')
SMP_CTKD_H7_BRLE_SALT = bytes(12) + bytes.fromhex('746D7032')
169
170# fmt: on
171# pylint: enable=line-too-long
172# pylint: disable=invalid-name
173
174
175# -----------------------------------------------------------------------------
176# Utils
177# -----------------------------------------------------------------------------
def error_name(error_code):
    '''
    Return the display name for an SMP error code.

    The lookup is delegated to `name_or_number` over SMP_ERROR_NAMES.
    '''
    known_errors = SMP_ERROR_NAMES
    return name_or_number(known_errors, error_code)
180
181
182# -----------------------------------------------------------------------------
183# Classes
184# -----------------------------------------------------------------------------
class SMP_Command:
    '''
    Base class for Security Manager Protocol commands.

    See Bluetooth spec @ Vol 3, Part H - 3 SECURITY MANAGER PROTOCOL
    '''

    # Registry mapping a command code to its concrete class; populated by the
    # `subclass` decorator.
    smp_classes: Dict[int, Type[SMP_Command]] = {}
    code = 0
    name = ''

    @staticmethod
    def from_bytes(pdu):
        '''
        Build a command object from a raw PDU.

        The first byte selects the command class. Registered classes are
        instantiated without running their own constructor and have their
        fields decoded starting at offset 1; unregistered codes yield a plain
        SMP_Command carrying the raw PDU.
        '''
        opcode = pdu[0]
        command_class = SMP_Command.smp_classes.get(opcode)

        if command_class is not None:
            command = command_class.__new__(command_class)
            SMP_Command.__init__(command, pdu)
            if hasattr(command, 'fields'):
                command.init_from_bytes(pdu, 1)
            return command

        # Unknown command code: fall back to a generic instance
        command = SMP_Command(pdu)
        command.name = SMP_Command.command_name(opcode)
        command.code = opcode
        return command

    @staticmethod
    def command_name(code):
        '''Return the display name for a command code.'''
        return name_or_number(SMP_COMMAND_NAMES, code)

    @staticmethod
    def auth_req_str(value):
        '''Format the individual AuthReq bit fields of `value` as text.'''

        def bit(position):
            return (value >> position) & 1

        bonding_flags = value & 3
        return (
            f'bonding_flags={bonding_flags}, '
            f'MITM={bit(2)}, sc={bit(3)}, keypress={bit(4)}, ct2={bit(5)}'
        )

    @staticmethod
    def io_capability_name(io_capability):
        '''Return the display name for an I/O capability value.'''
        return name_or_number(SMP_IO_CAPABILITY_NAMES, io_capability)

    @staticmethod
    def key_distribution_str(value):
        '''Return a comma-separated list of the key types flagged in `value`.'''
        labelled_flags = (
            (SMP_ENC_KEY_DISTRIBUTION_FLAG, 'ENC'),
            (SMP_ID_KEY_DISTRIBUTION_FLAG, 'ID'),
            (SMP_SIGN_KEY_DISTRIBUTION_FLAG, 'SIGN'),
            (SMP_LINK_KEY_DISTRIBUTION_FLAG, 'LINK'),
        )
        return ','.join(label for flag, label in labelled_flags if value & flag)

    @staticmethod
    def keypress_notification_type_name(notification_type):
        '''Return the display name for a keypress notification type.'''
        return name_or_number(SMP_KEYPRESS_NOTIFICATION_TYPE_NAMES, notification_type)

    @staticmethod
    def subclass(fields):
        '''
        Class decorator factory that registers a command subclass.

        The decorated class gets its `name` (upper-cased class name), its
        `code` (looked up by name in SMP_COMMAND_NAMES) and its `fields`, and
        is added to the `smp_classes` registry. Raises KeyError when the name
        is not found in SMP_COMMAND_NAMES.
        '''

        def inner(cls):
            cls.name = cls.__name__.upper()
            code = key_with_value(SMP_COMMAND_NAMES, cls.name)
            cls.code = code
            if code is None:
                raise KeyError(
                    f'Command name {cls.name} not found in SMP_COMMAND_NAMES'
                )
            cls.fields = fields

            # Register a factory for this class
            SMP_Command.smp_classes[code] = cls

            return cls

        return inner

    def __init__(self, pdu=None, **kwargs):
        '''
        Initialize from a raw PDU, or from field values passed as keywords.

        When no PDU is given, one is assembled from the command code followed
        by the serialized keyword values.
        '''
        if kwargs and hasattr(self, 'fields'):
            HCI_Object.init_from_fields(self, self.fields, kwargs)
        if pdu is None:
            header = bytes([self.code])
            pdu = header + HCI_Object.dict_to_bytes(kwargs, self.fields)
        self.pdu = pdu

    def init_from_bytes(self, pdu, offset):
        '''Decode this command's fields from `pdu`, starting at `offset`.'''
        return HCI_Object.init_from_bytes(self, pdu, offset, self.fields)

    def to_bytes(self):
        '''Return the raw PDU.'''
        return self.pdu

    def __bytes__(self):
        return self.to_bytes()

    def __str__(self):
        text = color(self.name, 'yellow')
        fields = getattr(self, 'fields', None)
        if fields:
            return text + ':\n' + HCI_Object.format_fields(self.__dict__, fields, '  ')
        if len(self.pdu) > 1:
            text += f': {self.pdu.hex()}'
        return text
290
291
292# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('io_capability', {'size': 1, 'mapper': SMP_Command.io_capability_name}),
        ('oob_data_flag', 1),
        ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}),
        ('maximum_encryption_key_size', 1),
        ('initiator_key_distribution', {'size': 1, 'mapper': SMP_Command.key_distribution_str}),
        ('responder_key_distribution', {'size': 1, 'mapper': SMP_Command.key_distribution_str}),
    ]
)
class SMP_Pairing_Request_Command(SMP_Command):
    '''
    Pairing Request command.

    Fields, in order: io_capability, oob_data_flag, auth_req,
    maximum_encryption_key_size, initiator_key_distribution,
    responder_key_distribution.

    See Bluetooth spec @ Vol 3, Part H - 3.5.1 Pairing Request
    '''
313
314
315# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('io_capability', {'size': 1, 'mapper': SMP_Command.io_capability_name}),
        ('oob_data_flag', 1),
        ('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str}),
        ('maximum_encryption_key_size', 1),
        ('initiator_key_distribution', {'size': 1, 'mapper': SMP_Command.key_distribution_str}),
        ('responder_key_distribution', {'size': 1, 'mapper': SMP_Command.key_distribution_str}),
    ]
)
class SMP_Pairing_Response_Command(SMP_Command):
    '''
    Pairing Response command.

    Declares the same field list as the Pairing Request command:
    io_capability, oob_data_flag, auth_req, maximum_encryption_key_size,
    initiator_key_distribution, responder_key_distribution.

    See Bluetooth spec @ Vol 3, Part H - 3.5.2 Pairing Response
    '''
336
337
338# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('confirm_value', 16),
    ]
)
class SMP_Pairing_Confirm_Command(SMP_Command):
    '''
    Pairing Confirm command, with a single `confirm_value` field.

    See Bluetooth spec @ Vol 3, Part H - 3.5.3 Pairing Confirm
    '''
344
345
346# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('random_value', 16),
    ]
)
class SMP_Pairing_Random_Command(SMP_Command):
    '''
    Pairing Random command, with a single `random_value` field.

    See Bluetooth spec @ Vol 3, Part H - 3.5.4 Pairing Random
    '''
352
353
354# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('reason', {'size': 1, 'mapper': error_name}),
    ]
)
class SMP_Pairing_Failed_Command(SMP_Command):
    '''
    Pairing Failed command, with a single `reason` field that is displayed
    through `error_name`.

    See Bluetooth spec @ Vol 3, Part H - 3.5.5 Pairing Failed
    '''
360
361
362# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('public_key_x', 32),
        ('public_key_y', 32),
    ]
)
class SMP_Pairing_Public_Key_Command(SMP_Command):
    '''
    Pairing Public Key command, with `public_key_x` and `public_key_y` fields.

    See Bluetooth spec @ Vol 3, Part H - 3.5.6 Pairing Public Key
    '''
368
369
370# -----------------------------------------------------------------------------
@SMP_Command.subclass([('dhkey_check', 16)])
class SMP_Pairing_DHKey_Check_Command(SMP_Command):
    '''
    Pairing DHKey Check command, with a single `dhkey_check` field.

    See Bluetooth spec @ Vol 3, Part H - 3.5.7 Pairing DHKey Check
    '''
380
381
382# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('notification_type', {'size': 1, 'mapper': SMP_Command.keypress_notification_type_name}),
    ]
)
class SMP_Pairing_Keypress_Notification_Command(SMP_Command):
    '''
    Keypress Notification command, with a single `notification_type` field
    that is displayed through `keypress_notification_type_name`.

    See Bluetooth spec @ Vol 3, Part H - 3.5.8 Keypress Notification
    '''
395
396
397# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('long_term_key', 16),
    ]
)
class SMP_Encryption_Information_Command(SMP_Command):
    '''
    Encryption Information command, with a single `long_term_key` field.

    See Bluetooth spec @ Vol 3, Part H - 3.6.2 Encryption Information
    '''
403
404
405# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('ediv', 2),
        ('rand', 8),
    ]
)
class SMP_Master_Identification_Command(SMP_Command):
    '''
    Master Identification command, with `ediv` and `rand` fields.

    See Bluetooth spec @ Vol 3, Part H - 3.6.3 Master Identification
    '''
411
412
413# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('identity_resolving_key', 16),
    ]
)
class SMP_Identity_Information_Command(SMP_Command):
    '''
    Identity Information command, with a single `identity_resolving_key`
    field.

    See Bluetooth spec @ Vol 3, Part H - 3.6.4 Identity Information
    '''
419
420
421# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [('addr_type', Address.ADDRESS_TYPE_SPEC), ('bd_addr', Address.parse_address_preceded_by_type)]
)
class SMP_Identity_Address_Information_Command(SMP_Command):
    '''
    Identity Address Information command, with `addr_type` and `bd_addr`
    fields; the address is parsed with
    `Address.parse_address_preceded_by_type`.

    See Bluetooth spec @ Vol 3, Part H - 3.6.5 Identity Address Information
    '''
432
433
434# -----------------------------------------------------------------------------
@SMP_Command.subclass(
    [
        ('signature_key', 16),
    ]
)
class SMP_Signing_Information_Command(SMP_Command):
    '''
    Signing Information command, with a single `signature_key` field.

    See Bluetooth spec @ Vol 3, Part H - 3.6.6 Signing Information
    '''
440
441
442# -----------------------------------------------------------------------------
@SMP_Command.subclass([('auth_req', {'size': 1, 'mapper': SMP_Command.auth_req_str})])
class SMP_Security_Request_Command(SMP_Command):
    '''
    Security Request command, with a single `auth_req` field that is
    displayed through `auth_req_str`.

    See Bluetooth spec @ Vol 3, Part H - 3.6.7 Security Request
    '''
452
453
454# -----------------------------------------------------------------------------
def smp_auth_req(bonding, mitm, sc, keypress, ct2):
    '''
    Combine the individual authentication requirement switches into a single
    AuthReq bit field.

    Each truthy argument contributes its SMP_*_AUTHREQ mask to the result.
    '''
    switches = (
        (bonding, SMP_BONDING_AUTHREQ),
        (mitm, SMP_MITM_AUTHREQ),
        (sc, SMP_SC_AUTHREQ),
        (keypress, SMP_KEYPRESS_AUTHREQ),
        (ct2, SMP_CT2_AUTHREQ),
    )
    auth_req = 0
    for enabled, mask in switches:
        if enabled:
            auth_req |= mask
    return auth_req
468
469
470# -----------------------------------------------------------------------------
class AddressResolver:
    '''
    Resolves an address against a collection of (IRK, address) pairs.
    '''

    def __init__(self, resolving_keys):
        # Iterable of (irk, resolved_address) pairs
        self.resolving_keys = resolving_keys

    def resolve(self, address):
        '''
        Return the identity address matching `address`, or None.

        The first 3 bytes of the address are compared with `crypto.ah` of
        each IRK and the next 3 bytes; the first IRK that matches wins. The
        returned Address has a public or random identity address type,
        depending on the type of the address stored with the IRK.
        '''
        raw = bytes(address)
        hash_part, prand = raw[0:3], raw[3:6]

        for irk, identity in self.resolving_keys:
            if crypto.ah(irk, prand) != hash_part:
                continue

            # Match!
            is_public = identity.address_type == Address.PUBLIC_DEVICE_ADDRESS
            identity_type = (
                Address.PUBLIC_IDENTITY_ADDRESS
                if is_public
                else Address.RANDOM_IDENTITY_ADDRESS
            )
            return Address(address=str(identity), address_type=identity_type)

        return None
492
493
494# -----------------------------------------------------------------------------
class PairingDelegate:
    '''
    Hooks consulted during pairing for user interaction and key distribution.

    The default implementations accept everything and need no user; subclasses
    override the coroutines they want to handle.
    '''

    # Aliases for the SMP I/O capability values
    NO_OUTPUT_NO_INPUT = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY
    KEYBOARD_INPUT_ONLY = SMP_KEYBOARD_ONLY_IO_CAPABILITY
    DISPLAY_OUTPUT_ONLY = SMP_DISPLAY_ONLY_IO_CAPABILITY
    DISPLAY_OUTPUT_AND_YES_NO_INPUT = SMP_DISPLAY_YES_NO_IO_CAPABILITY
    DISPLAY_OUTPUT_AND_KEYBOARD_INPUT = SMP_KEYBOARD_DISPLAY_IO_CAPABILITY
    DEFAULT_KEY_DISTRIBUTION: int = (
        SMP_ENC_KEY_DISTRIBUTION_FLAG | SMP_ID_KEY_DISTRIBUTION_FLAG
    )

    def __init__(
        self,
        io_capability: int = NO_OUTPUT_NO_INPUT,
        local_initiator_key_distribution: int = DEFAULT_KEY_DISTRIBUTION,
        local_responder_key_distribution: int = DEFAULT_KEY_DISTRIBUTION,
    ) -> None:
        self.io_capability = io_capability
        self.local_initiator_key_distribution = local_initiator_key_distribution
        self.local_responder_key_distribution = local_responder_key_distribution

    async def accept(self) -> bool:
        '''Default: always returns True.'''
        return True

    async def confirm(self) -> bool:
        '''Default: always returns True.'''
        return True

    # pylint: disable-next=unused-argument
    async def compare_numbers(self, number: int, digits: int) -> bool:
        '''Default: always returns True, whatever the number.'''
        return True

    async def get_number(self) -> Optional[int]:
        '''
        Returns an optional number as an answer to a passkey request.
        Returning `None` will result in a negative reply.
        '''
        return 0

    async def get_string(self, max_length) -> Optional[str]:
        '''
        Returns a string whose utf-8 encoding is up to max_length bytes.
        The default implementation returns None.
        '''
        return None

    # pylint: disable-next=unused-argument
    async def display_number(self, number: int, digits: int) -> None:
        '''Default: does nothing.'''

    async def key_distribution_response(
        self, peer_initiator_key_distribution, peer_responder_key_distribution
    ):
        '''
        Return the (initiator, responder) key distribution pair obtained by
        masking the peer's proposal with the locally configured values.
        '''
        initiator_keys = (
            peer_initiator_key_distribution & self.local_initiator_key_distribution
        )
        responder_keys = (
            peer_responder_key_distribution & self.local_responder_key_distribution
        )
        return (initiator_keys, responder_keys)
549
550
551# -----------------------------------------------------------------------------
class PairingConfig:
    '''
    Pairing settings: the sc, mitm and bonding switches, plus the delegate
    used during pairing.
    '''

    def __init__(
        self,
        sc: bool = True,
        mitm: bool = True,
        bonding: bool = True,
        delegate: Optional[PairingDelegate] = None,
    ) -> None:
        self.sc, self.mitm, self.bonding = sc, mitm, bonding
        # Fall back to a default delegate when none (or a falsy one) is given
        self.delegate = delegate if delegate else PairingDelegate()

    def __str__(self):
        io_capability_str = SMP_Command.io_capability_name(self.delegate.io_capability)
        parts = [
            f'sc={self.sc}',
            f'mitm={self.mitm}',
            f'bonding={self.bonding}',
            f'delegate[{io_capability_str}]',
        ]
        return 'PairingConfig(' + ', '.join(parts) + ')'
572
573
574# -----------------------------------------------------------------------------
575class Session:
576    # Pairing methods
577    JUST_WORKS = 0
578    NUMERIC_COMPARISON = 1
579    PASSKEY = 2
580    OOB = 3
581
582    PAIRING_METHOD_NAMES = {
583        JUST_WORKS: 'JUST_WORKS',
584        NUMERIC_COMPARISON: 'NUMERIC_COMPARISON',
585        PASSKEY: 'PASSKEY',
586        OOB: 'OOB',
587    }
588
589    # I/O Capability to pairing method decision matrix
590    #
591    # See Bluetooth spec @ Vol 3, part H - Table 2.8: Mapping of IO Capabilities to Key
592    # Generation Method
593    #
594    # Map: initiator -> responder -> <method>
595    # where <method> may be a simple entry or a 2-element tuple, with the first element
596    # for legacy pairing and the second  for secure connections, when the two are
597    # different. Each entry is either a method name, or, for PASSKEY, a tuple:
598    # (method, initiator_displays, responder_displays)
599    # to specify if the initiator and responder should display (True) or input a code
600    # (False).
601    PAIRING_METHODS = {
602        SMP_DISPLAY_ONLY_IO_CAPABILITY: {
603            SMP_DISPLAY_ONLY_IO_CAPABILITY: JUST_WORKS,
604            SMP_DISPLAY_YES_NO_IO_CAPABILITY: JUST_WORKS,
605            SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PASSKEY, True, False),
606            SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS,
607            SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (PASSKEY, True, False),
608        },
609        SMP_DISPLAY_YES_NO_IO_CAPABILITY: {
610            SMP_DISPLAY_ONLY_IO_CAPABILITY: JUST_WORKS,
611            SMP_DISPLAY_YES_NO_IO_CAPABILITY: (JUST_WORKS, NUMERIC_COMPARISON),
612            SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PASSKEY, True, False),
613            SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS,
614            SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (
615                (PASSKEY, True, False),
616                NUMERIC_COMPARISON,
617            ),
618        },
619        SMP_KEYBOARD_ONLY_IO_CAPABILITY: {
620            SMP_DISPLAY_ONLY_IO_CAPABILITY: (PASSKEY, False, True),
621            SMP_DISPLAY_YES_NO_IO_CAPABILITY: (PASSKEY, False, True),
622            SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PASSKEY, False, False),
623            SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS,
624            SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (PASSKEY, False, True),
625        },
626        SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: {
627            SMP_DISPLAY_ONLY_IO_CAPABILITY: JUST_WORKS,
628            SMP_DISPLAY_YES_NO_IO_CAPABILITY: JUST_WORKS,
629            SMP_KEYBOARD_ONLY_IO_CAPABILITY: JUST_WORKS,
630            SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS,
631            SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: JUST_WORKS,
632        },
633        SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: {
634            SMP_DISPLAY_ONLY_IO_CAPABILITY: (PASSKEY, False, True),
635            SMP_DISPLAY_YES_NO_IO_CAPABILITY: (
636                (PASSKEY, False, True),
637                NUMERIC_COMPARISON,
638            ),
639            SMP_KEYBOARD_ONLY_IO_CAPABILITY: (PASSKEY, True, False),
640            SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY: JUST_WORKS,
641            SMP_KEYBOARD_DISPLAY_IO_CAPABILITY: (
642                (PASSKEY, True, False),
643                NUMERIC_COMPARISON,
644            ),
645        },
646    }
647
    def __init__(self, manager, connection, pairing_config):
        '''
        Set up the state for one pairing session on `connection`.

        The role (initiator or responder) is derived from the connection
        role, the session subscribes to connection events, and the defaults
        for key distribution, authentication requirements and I/O capability
        are taken from `pairing_config` and its delegate.

        NOTE: for the initiator role, `asyncio.get_running_loop()` is called,
        so an event loop must be running at construction time.
        '''
        self.manager = manager
        self.connection = connection

        # Pairing state, reset to neutral defaults (None, zero, or zero-filled
        # byte strings)
        self.preq = None
        self.pres = None
        self.ea = None
        self.eb = None
        self.tk = bytes(16)
        self.r = bytes(16)
        self.stk = None
        self.ltk = None
        self.ltk_ediv = 0
        self.ltk_rand = bytes(8)
        self.link_key = None
        # Placeholders; both are overwritten below with the delegate's values
        self.initiator_key_distribution = 0
        self.responder_key_distribution = 0

        # Values received from the peer
        self.peer_random_value = None
        self.peer_public_key_x = bytes(32)
        self.peer_public_key_y = bytes(32)
        self.peer_ltk = None
        self.peer_ediv = None
        self.peer_rand = None
        self.peer_identity_resolving_key = None
        self.peer_bd_addr = None
        self.peer_signature_key = None
        self.peer_expected_distributions = []
        self.dh_key = None
        self.confirm_value = None

        # Passkey handling
        self.passkey = None
        self.passkey_ready = asyncio.Event()
        self.passkey_step = 0
        self.passkey_display = False

        # 0 is JUST_WORKS; updated by decide_pairing_method()
        self.pairing_method = 0
        self.pairing_config = pairing_config
        self.wait_before_continuing = None
        self.completed = False
        self.ctkd_task = None

        # Decide if we're the initiator or the responder
        self.is_initiator = connection.role == BT_CENTRAL_ROLE
        self.is_responder = not self.is_initiator

        # Listen for connection events
        connection.on('disconnection', self.on_disconnection)
        connection.on(
            'connection_encryption_change', self.on_connection_encryption_change
        )
        connection.on(
            'connection_encryption_key_refresh',
            self.on_connection_encryption_key_refresh,
        )

        # Create a future that can be used to wait for the session to complete
        # (only the initiator gets one; it stays None for the responder)
        if self.is_initiator:
            self.pairing_result = asyncio.get_running_loop().create_future()
        else:
            self.pairing_result = None

        # Key Distribution (default values before negotiation)
        self.initiator_key_distribution = (
            pairing_config.delegate.local_initiator_key_distribution
        )
        self.responder_key_distribution = (
            pairing_config.delegate.local_responder_key_distribution
        )

        # Authentication Requirements Flags - Vol 3, Part H, Figure 3.3
        self.bonding = pairing_config.bonding
        self.sc = pairing_config.sc
        self.mitm = pairing_config.mitm
        self.keypress = False
        self.ct2 = False

        # I/O Capabilities
        self.io_capability = pairing_config.delegate.io_capability
        self.peer_io_capability = SMP_NO_INPUT_NO_OUTPUT_IO_CAPABILITY

        # OOB (not supported yet)
        self.oob = False

        # Set up addresses
        # ia/iat are the initiator's address bytes and type flag (1 = random,
        # 0 otherwise); ra/rat are the responder's. The peer's resolvable
        # address is preferred over its plain address when it is set.
        self_address = connection.self_address
        peer_address = connection.peer_resolvable_address or connection.peer_address
        if self.is_initiator:
            self.ia = bytes(self_address)
            self.iat = 1 if self_address.is_random else 0
            self.ra = bytes(peer_address)
            self.rat = 1 if peer_address.is_random else 0
        else:
            self.ra = bytes(self_address)
            self.rat = 1 if self_address.is_random else 0
            self.ia = bytes(peer_address)
            self.iat = 1 if peer_address.is_random else 0
741
742    @property
743    def pkx(self):
744        return (bytes(reversed(self.manager.ecc_key.x)), self.peer_public_key_x)
745
746    @property
747    def pka(self):
748        return self.pkx[0 if self.is_initiator else 1]
749
750    @property
751    def pkb(self):
752        return self.pkx[0 if self.is_responder else 1]
753
754    @property
755    def nx(self):
756        return (self.r, self.peer_random_value)
757
758    @property
759    def na(self):
760        return self.nx[0 if self.is_initiator else 1]
761
762    @property
763    def nb(self):
764        return self.nx[0 if self.is_responder else 1]
765
766    @property
767    def auth_req(self):
768        return smp_auth_req(self.bonding, self.mitm, self.sc, self.keypress, self.ct2)
769
770    def get_long_term_key(self, rand, ediv):
771        if not self.sc and not self.completed:
772            if rand == self.ltk_rand and ediv == self.ltk_ediv:
773                return self.stk
774        else:
775            return self.ltk
776
777        return None
778
779    def decide_pairing_method(
780        self, auth_req, initiator_io_capability, responder_io_capability
781    ):
782        if (not self.mitm) and (auth_req & SMP_MITM_AUTHREQ == 0):
783            self.pairing_method = self.JUST_WORKS
784            return
785
786        details = self.PAIRING_METHODS[initiator_io_capability][responder_io_capability]
787        if isinstance(details, tuple) and len(details) == 2:
788            # One entry for legacy pairing and one for secure connections
789            details = details[1 if self.sc else 0]
790        if isinstance(details, int):
791            # Just a method ID
792            self.pairing_method = details
793        else:
794            # PASSKEY method, with a method ID and display/input flags
795            self.pairing_method = details[0]
796            self.passkey_display = details[1 if self.is_initiator else 2]
797
798    def check_expected_value(self, expected, received, error):
799        logger.debug(f'expected={expected.hex()} got={received.hex()}')
800        if expected != received:
801            logger.info(color('pairing confirm/check mismatch', 'red'))
802            self.send_pairing_failed(error)
803            return False
804        return True
805
806    def prompt_user_for_confirmation(self, next_steps):
807        async def prompt():
808            logger.debug('ask for confirmation')
809            try:
810                response = await self.pairing_config.delegate.confirm()
811                if response:
812                    next_steps()
813                    return
814            except Exception as error:
815                logger.warning(f'exception while confirm: {error}')
816
817            self.send_pairing_failed(SMP_CONFIRM_VALUE_FAILED_ERROR)
818
819        self.connection.abort_on('disconnection', prompt())
820
821    def prompt_user_for_numeric_comparison(self, code, next_steps):
822        async def prompt():
823            logger.debug(f'verification code: {code}')
824            try:
825                response = await self.pairing_config.delegate.compare_numbers(
826                    code, digits=6
827                )
828                if response:
829                    next_steps()
830                    return
831            except Exception as error:
832                logger.warning(f'exception while prompting: {error}')
833
834            self.send_pairing_failed(SMP_CONFIRM_VALUE_FAILED_ERROR)
835
836        self.connection.abort_on('disconnection', prompt())
837
838    def prompt_user_for_number(self, next_steps):
839        async def prompt():
840            logger.debug('prompting user for passkey')
841            try:
842                passkey = await self.pairing_config.delegate.get_number()
843                logger.debug(f'user input: {passkey}')
844                next_steps(passkey)
845            except Exception as error:
846                logger.warning(f'exception while prompting: {error}')
847                self.send_pairing_failed(SMP_PASSKEY_ENTRY_FAILED_ERROR)
848
849        self.connection.abort_on('disconnection', prompt())
850
851    def display_passkey(self):
852        # Generate random Passkey/PIN code
853        self.passkey = secrets.randbelow(1000000)
854        logger.debug(f'Pairing PIN CODE: {self.passkey:06}')
855        self.passkey_ready.set()
856
857        # The value of TK is computed from the PIN code
858        if not self.sc:
859            self.tk = self.passkey.to_bytes(16, byteorder='little')
860            logger.debug(f'TK from passkey = {self.tk.hex()}')
861
862        self.connection.abort_on(
863            'disconnection',
864            self.pairing_config.delegate.display_number(self.passkey, digits=6),
865        )
866
867    def input_passkey(self, next_steps=None):
868        # Prompt the user for the passkey displayed on the peer
869        def after_input(passkey):
870            self.passkey = passkey
871
872            if not self.sc:
873                self.tk = passkey.to_bytes(16, byteorder='little')
874                logger.debug(f'TK from passkey = {self.tk.hex()}')
875
876            self.passkey_ready.set()
877
878            if next_steps is not None:
879                next_steps()
880
881        self.prompt_user_for_number(after_input)
882
883    def display_or_input_passkey(self, next_steps=None):
884        if self.passkey_display:
885            self.display_passkey()
886            if next_steps is not None:
887                next_steps()
888        else:
889            self.input_passkey(next_steps)
890
891    def send_command(self, command):
892        self.manager.send_command(self.connection, command)
893
894    def send_pairing_failed(self, error):
895        self.send_command(SMP_Pairing_Failed_Command(reason=error))
896        self.on_pairing_failure(error)
897
898    def send_pairing_request_command(self):
899        self.manager.on_session_start(self)
900
901        command = SMP_Pairing_Request_Command(
902            io_capability=self.io_capability,
903            oob_data_flag=0,
904            auth_req=self.auth_req,
905            maximum_encryption_key_size=16,
906            initiator_key_distribution=self.initiator_key_distribution,
907            responder_key_distribution=self.responder_key_distribution,
908        )
909        self.preq = bytes(command)
910        self.send_command(command)
911
912    def send_pairing_response_command(self):
913        response = SMP_Pairing_Response_Command(
914            io_capability=self.io_capability,
915            oob_data_flag=0,
916            auth_req=self.auth_req,
917            maximum_encryption_key_size=16,
918            initiator_key_distribution=self.initiator_key_distribution,
919            responder_key_distribution=self.responder_key_distribution,
920        )
921        self.pres = bytes(response)
922        self.send_command(response)
923
924    def send_pairing_confirm_command(self):
925        self.r = crypto.r()
926        logger.debug(f'generated random: {self.r.hex()}')
927
928        if self.sc:
929
930            async def next_steps():
931                if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON):
932                    z = 0
933                elif self.pairing_method == self.PASSKEY:
934                    # We need a passkey
935                    await self.passkey_ready.wait()
936
937                    z = 0x80 + ((self.passkey >> self.passkey_step) & 1)
938                else:
939                    return
940
941                if self.is_initiator:
942                    confirm_value = crypto.f4(self.pka, self.pkb, self.r, bytes([z]))
943                else:
944                    confirm_value = crypto.f4(self.pkb, self.pka, self.r, bytes([z]))
945
946                self.send_command(
947                    SMP_Pairing_Confirm_Command(confirm_value=confirm_value)
948                )
949
950            # Perform the next steps asynchronously in case we need to wait for input
951            self.connection.abort_on('disconnection', next_steps())
952        else:
953            confirm_value = crypto.c1(
954                self.tk,
955                self.r,
956                self.preq,
957                self.pres,
958                self.iat,
959                self.rat,
960                self.ia,
961                self.ra,
962            )
963
964            self.send_command(SMP_Pairing_Confirm_Command(confirm_value=confirm_value))
965
966    def send_pairing_random_command(self):
967        self.send_command(SMP_Pairing_Random_Command(random_value=self.r))
968
969    def send_public_key_command(self):
970        self.send_command(
971            SMP_Pairing_Public_Key_Command(
972                public_key_x=bytes(reversed(self.manager.ecc_key.x)),
973                public_key_y=bytes(reversed(self.manager.ecc_key.y)),
974            )
975        )
976
977    def send_pairing_dhkey_check_command(self):
978        self.send_command(
979            SMP_Pairing_DHKey_Check_Command(
980                dhkey_check=self.ea if self.is_initiator else self.eb
981            )
982        )
983
984    def start_encryption(self, key):
985        # We can now encrypt the connection with the short term key, so that we can
986        # distribute the long term and/or other keys over an encrypted connection
987        self.manager.device.host.send_command_sync(
988            HCI_LE_Enable_Encryption_Command(
989                connection_handle=self.connection.handle,
990                random_number=bytes(8),
991                encrypted_diversifier=0,
992                long_term_key=key,
993            )
994        )
995
996    async def derive_ltk(self):
997        link_key = await self.manager.device.get_link_key(self.connection.peer_address)
998        assert link_key is not None
999        ilk = (
1000            crypto.h7(salt=SMP_CTKD_H7_BRLE_SALT, w=link_key)
1001            if self.ct2
1002            else crypto.h6(link_key, b'tmp2')
1003        )
1004        self.ltk = crypto.h6(ilk, b'brle')
1005
1006    def distribute_keys(self):
1007        # Distribute the keys as required
1008        if self.is_initiator:
1009            # CTKD: Derive LTK from LinkKey
1010            if (
1011                self.connection.transport == BT_BR_EDR_TRANSPORT
1012                and self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG
1013            ):
1014                self.ctkd_task = self.connection.abort_on(
1015                    'disconnection', self.derive_ltk()
1016                )
1017            elif not self.sc:
1018                # Distribute the LTK, EDIV and RAND
1019                if self.initiator_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG:
1020                    self.send_command(
1021                        SMP_Encryption_Information_Command(long_term_key=self.ltk)
1022                    )
1023                    self.send_command(
1024                        SMP_Master_Identification_Command(
1025                            ediv=self.ltk_ediv, rand=self.ltk_rand
1026                        )
1027                    )
1028
1029            # Distribute IRK & BD ADDR
1030            if self.initiator_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG:
1031                self.send_command(
1032                    SMP_Identity_Information_Command(
1033                        identity_resolving_key=self.manager.device.irk
1034                    )
1035                )
1036                self.send_command(
1037                    SMP_Identity_Address_Information_Command(
1038                        addr_type=self.connection.self_address.address_type,
1039                        bd_addr=self.connection.self_address,
1040                    )
1041                )
1042
1043            # Distribute CSRK
1044            csrk = bytes(16)  # FIXME: testing
1045            if self.initiator_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG:
1046                self.send_command(SMP_Signing_Information_Command(signature_key=csrk))
1047
1048            # CTKD, calculate BR/EDR link key
1049            if self.initiator_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
1050                ilk = (
1051                    crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=self.ltk)
1052                    if self.ct2
1053                    else crypto.h6(self.ltk, b'tmp1')
1054                )
1055                self.link_key = crypto.h6(ilk, b'lebr')
1056
1057        else:
1058            # CTKD: Derive LTK from LinkKey
1059            if (
1060                self.connection.transport == BT_BR_EDR_TRANSPORT
1061                and self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG
1062            ):
1063                self.ctkd_task = self.connection.abort_on(
1064                    'disconnection', self.derive_ltk()
1065                )
1066            # Distribute the LTK, EDIV and RAND
1067            elif not self.sc:
1068                if self.responder_key_distribution & SMP_ENC_KEY_DISTRIBUTION_FLAG:
1069                    self.send_command(
1070                        SMP_Encryption_Information_Command(long_term_key=self.ltk)
1071                    )
1072                    self.send_command(
1073                        SMP_Master_Identification_Command(
1074                            ediv=self.ltk_ediv, rand=self.ltk_rand
1075                        )
1076                    )
1077
1078            # Distribute IRK & BD ADDR
1079            if self.responder_key_distribution & SMP_ID_KEY_DISTRIBUTION_FLAG:
1080                self.send_command(
1081                    SMP_Identity_Information_Command(
1082                        identity_resolving_key=self.manager.device.irk
1083                    )
1084                )
1085                self.send_command(
1086                    SMP_Identity_Address_Information_Command(
1087                        addr_type=self.connection.self_address.address_type,
1088                        bd_addr=self.connection.self_address,
1089                    )
1090                )
1091
1092            # Distribute CSRK
1093            csrk = bytes(16)  # FIXME: testing
1094            if self.responder_key_distribution & SMP_SIGN_KEY_DISTRIBUTION_FLAG:
1095                self.send_command(SMP_Signing_Information_Command(signature_key=csrk))
1096
1097            # CTKD, calculate BR/EDR link key
1098            if self.responder_key_distribution & SMP_LINK_KEY_DISTRIBUTION_FLAG:
1099                ilk = (
1100                    crypto.h7(salt=SMP_CTKD_H7_LEBR_SALT, w=self.ltk)
1101                    if self.ct2
1102                    else crypto.h6(self.ltk, b'tmp1')
1103                )
1104                self.link_key = crypto.h6(ilk, b'lebr')
1105
1106    def compute_peer_expected_distributions(self, key_distribution_flags):
1107        # Set our expectations for what to wait for in the key distribution phase
1108        self.peer_expected_distributions = []
1109        if not self.sc and self.connection.transport == BT_LE_TRANSPORT:
1110            if key_distribution_flags & SMP_ENC_KEY_DISTRIBUTION_FLAG != 0:
1111                self.peer_expected_distributions.append(
1112                    SMP_Encryption_Information_Command
1113                )
1114                self.peer_expected_distributions.append(
1115                    SMP_Master_Identification_Command
1116                )
1117        if key_distribution_flags & SMP_ID_KEY_DISTRIBUTION_FLAG != 0:
1118            self.peer_expected_distributions.append(SMP_Identity_Information_Command)
1119            self.peer_expected_distributions.append(
1120                SMP_Identity_Address_Information_Command
1121            )
1122        if key_distribution_flags & SMP_SIGN_KEY_DISTRIBUTION_FLAG != 0:
1123            self.peer_expected_distributions.append(SMP_Signing_Information_Command)
1124        logger.debug(
1125            'expecting distributions: '
1126            f'{[c.__name__ for c in self.peer_expected_distributions]}'
1127        )
1128
1129    def check_key_distribution(self, command_class):
1130        # First, check that the connection is encrypted
1131        if not self.connection.is_encrypted:
1132            logger.warning(
1133                color('received key distribution on a non-encrypted connection', 'red')
1134            )
1135            self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR)
1136            return
1137
1138        # Check that this command class is expected
1139        if command_class in self.peer_expected_distributions:
1140            self.peer_expected_distributions.remove(command_class)
1141            logger.debug(
1142                'remaining distributions: '
1143                f'{[c.__name__ for c in self.peer_expected_distributions]}'
1144            )
1145            if not self.peer_expected_distributions:
1146                self.on_peer_key_distribution_complete()
1147        else:
1148            logger.warning(
1149                color(
1150                    '!!! unexpected key distribution command: '
1151                    f'{command_class.__name__}',
1152                    'red',
1153                )
1154            )
1155            self.send_pairing_failed(SMP_UNSPECIFIED_REASON_ERROR)
1156
1157    async def pair(self):
1158        # Start pairing as an initiator
1159        # TODO: check that this session isn't already active
1160
1161        # Send the pairing request to start the process
1162        self.send_pairing_request_command()
1163
1164        # Wait for the pairing process to finish
1165        await self.connection.abort_on('disconnection', self.pairing_result)
1166
1167    def on_disconnection(self, _):
1168        self.connection.remove_listener('disconnection', self.on_disconnection)
1169        self.connection.remove_listener(
1170            'connection_encryption_change', self.on_connection_encryption_change
1171        )
1172        self.connection.remove_listener(
1173            'connection_encryption_key_refresh',
1174            self.on_connection_encryption_key_refresh,
1175        )
1176        self.manager.on_session_end(self)
1177
1178    def on_peer_key_distribution_complete(self):
1179        # The initiator can now send its keys
1180        if self.is_initiator:
1181            self.distribute_keys()
1182
1183        self.connection.abort_on('disconnection', self.on_pairing())
1184
1185    def on_connection_encryption_change(self):
1186        if self.connection.is_encrypted:
1187            if self.is_responder:
1188                # The responder distributes its keys first, the initiator later
1189                self.distribute_keys()
1190
1191            # If we're not expecting key distributions from the peer, we're done
1192            if not self.peer_expected_distributions:
1193                self.on_peer_key_distribution_complete()
1194
1195    def on_connection_encryption_key_refresh(self):
1196        # Do as if the connection had just been encrypted
1197        self.on_connection_encryption_change()
1198
1199    async def on_pairing(self):
1200        logger.debug('pairing complete')
1201
1202        if self.completed:
1203            return
1204
1205        self.completed = True
1206
1207        if self.pairing_result is not None and not self.pairing_result.done():
1208            self.pairing_result.set_result(None)
1209
1210        # Use the peer address from the pairing protocol or the connection
1211        if self.peer_bd_addr:
1212            peer_address = self.peer_bd_addr
1213        else:
1214            peer_address = self.connection.peer_address
1215
1216        # Wait for link key fetch and key derivation
1217        if self.ctkd_task is not None:
1218            await self.ctkd_task
1219            self.ctkd_task = None
1220
1221        # Create an object to hold the keys
1222        keys = PairingKeys()
1223        keys.address_type = peer_address.address_type
1224        authenticated = self.pairing_method != self.JUST_WORKS
1225        if self.sc or self.connection.transport == BT_BR_EDR_TRANSPORT:
1226            keys.ltk = PairingKeys.Key(value=self.ltk, authenticated=authenticated)
1227        else:
1228            our_ltk_key = PairingKeys.Key(
1229                value=self.ltk,
1230                authenticated=authenticated,
1231                ediv=self.ltk_ediv,
1232                rand=self.ltk_rand,
1233            )
1234            peer_ltk_key = PairingKeys.Key(
1235                value=self.peer_ltk,
1236                authenticated=authenticated,
1237                ediv=self.peer_ediv,
1238                rand=self.peer_rand,
1239            )
1240            if self.is_initiator:
1241                keys.ltk_central = peer_ltk_key
1242                keys.ltk_peripheral = our_ltk_key
1243            else:
1244                keys.ltk_central = our_ltk_key
1245                keys.ltk_peripheral = peer_ltk_key
1246        if self.peer_identity_resolving_key is not None:
1247            keys.irk = PairingKeys.Key(
1248                value=self.peer_identity_resolving_key, authenticated=authenticated
1249            )
1250        if self.peer_signature_key is not None:
1251            keys.csrk = PairingKeys.Key(
1252                value=self.peer_signature_key, authenticated=authenticated
1253            )
1254        if self.link_key is not None:
1255            keys.link_key = PairingKeys.Key(
1256                value=self.link_key, authenticated=authenticated
1257            )
1258        self.manager.on_pairing(self, peer_address, keys)
1259
1260    def on_pairing_failure(self, reason):
1261        logger.warning(f'pairing failure ({error_name(reason)})')
1262
1263        if self.completed:
1264            return
1265
1266        self.completed = True
1267
1268        error = ProtocolError(reason, 'smp', error_name(reason))
1269        if self.pairing_result is not None and not self.pairing_result.done():
1270            self.pairing_result.set_exception(error)
1271        self.manager.on_pairing_failure(self, reason)
1272
1273    def on_smp_command(self, command):
1274        # Find the handler method
1275        handler_name = f'on_{command.name.lower()}'
1276        handler = getattr(self, handler_name, None)
1277        if handler is not None:
1278            try:
1279                handler(command)
1280            except Exception as error:
1281                logger.warning(f'{color("!!! Exception in handler:", "red")} {error}')
1282                response = SMP_Pairing_Failed_Command(
1283                    reason=SMP_UNSPECIFIED_REASON_ERROR
1284                )
1285                self.send_command(response)
1286        else:
1287            logger.error(color('SMP command not handled???', 'red'))
1288
1289    def on_smp_pairing_request_command(self, command):
1290        self.connection.abort_on(
1291            'disconnection', self.on_smp_pairing_request_command_async(command)
1292        )
1293
1294    async def on_smp_pairing_request_command_async(self, command):
1295        # Check if the request should proceed
1296        accepted = await self.pairing_config.delegate.accept()
1297        if not accepted:
1298            logger.debug('pairing rejected by delegate')
1299            self.send_pairing_failed(SMP_PAIRING_NOT_SUPPORTED_ERROR)
1300            return
1301
1302        # Save the request
1303        self.preq = bytes(command)
1304
1305        # Bonding and SC require both sides to request/support it
1306        self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0)
1307        self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0)
1308        self.ct2 = self.ct2 and (command.auth_req & SMP_CT2_AUTHREQ != 0)
1309
1310        # Check for OOB
1311        if command.oob_data_flag != 0:
1312            self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
1313            return
1314
1315        # Decide which pairing method to use
1316        self.decide_pairing_method(
1317            command.auth_req, command.io_capability, self.io_capability
1318        )
1319        logger.debug(
1320            f'pairing method: {self.PAIRING_METHOD_NAMES[self.pairing_method]}'
1321        )
1322
1323        # Key distribution
1324        (
1325            self.initiator_key_distribution,
1326            self.responder_key_distribution,
1327        ) = await self.pairing_config.delegate.key_distribution_response(
1328            command.initiator_key_distribution, command.responder_key_distribution
1329        )
1330        self.compute_peer_expected_distributions(self.initiator_key_distribution)
1331
1332        # The pairing is now starting
1333        self.manager.on_session_start(self)
1334
1335        # Display a passkey if we need to
1336        if not self.sc:
1337            if self.pairing_method == self.PASSKEY and self.passkey_display:
1338                self.display_passkey()
1339
1340        # Respond
1341        self.send_pairing_response_command()
1342
1343        # Vol 3, Part C, 5.2.2.1.3
1344        # CTKD over BR/EDR should happen after the connection has been encrypted,
1345        # so when receiving pairing requests, responder should start distributing keys
1346        if (
1347            self.connection.transport == BT_BR_EDR_TRANSPORT
1348            and self.connection.is_encrypted
1349            and self.is_responder
1350            and accepted
1351        ):
1352            self.distribute_keys()
1353
1354    def on_smp_pairing_response_command(self, command):
1355        if self.is_responder:
1356            logger.warning(color('received pairing response as a responder', 'red'))
1357            return
1358
1359        # Save the response
1360        self.pres = bytes(command)
1361        self.peer_io_capability = command.io_capability
1362
1363        # Bonding and SC require both sides to request/support it
1364        self.bonding = self.bonding and (command.auth_req & SMP_BONDING_AUTHREQ != 0)
1365        self.sc = self.sc and (command.auth_req & SMP_SC_AUTHREQ != 0)
1366
1367        # Check for OOB
1368        if self.sc and command.oob_data_flag:
1369            self.send_pairing_failed(SMP_OOB_NOT_AVAILABLE_ERROR)
1370            return
1371
1372        # Decide which pairing method to use
1373        self.decide_pairing_method(
1374            command.auth_req, self.io_capability, command.io_capability
1375        )
1376        logger.debug(
1377            f'pairing method: {self.PAIRING_METHOD_NAMES[self.pairing_method]}'
1378        )
1379
1380        # Key distribution
1381        if (
1382            command.initiator_key_distribution & ~self.initiator_key_distribution != 0
1383        ) or (
1384            command.responder_key_distribution & ~self.responder_key_distribution != 0
1385        ):
1386            # The response isn't a subset of the request
1387            self.send_pairing_failed(SMP_INVALID_PARAMETERS_ERROR)
1388            return
1389        self.initiator_key_distribution = command.initiator_key_distribution
1390        self.responder_key_distribution = command.responder_key_distribution
1391        self.compute_peer_expected_distributions(self.responder_key_distribution)
1392
1393        # Start phase 2
1394        if self.sc:
1395            if self.pairing_method == self.PASSKEY:
1396                self.display_or_input_passkey()
1397
1398            self.send_public_key_command()
1399        else:
1400            if self.pairing_method == self.PASSKEY:
1401                self.display_or_input_passkey(self.send_pairing_confirm_command)
1402            else:
1403                self.send_pairing_confirm_command()
1404
1405    def on_smp_pairing_confirm_command_legacy(self, _):
1406        if self.is_initiator:
1407            self.send_pairing_random_command()
1408        else:
1409            # If the method is PASSKEY, now is the time to input the code
1410            if self.pairing_method == self.PASSKEY and not self.passkey_display:
1411                self.input_passkey(self.send_pairing_confirm_command)
1412            else:
1413                self.send_pairing_confirm_command()
1414
1415    def on_smp_pairing_confirm_command_secure_connections(self, _):
1416        if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON):
1417            if self.is_initiator:
1418                self.r = crypto.r()
1419                self.send_pairing_random_command()
1420        elif self.pairing_method == self.PASSKEY:
1421            if self.is_initiator:
1422                self.send_pairing_random_command()
1423            else:
1424                self.send_pairing_confirm_command()
1425
1426    def on_smp_pairing_confirm_command(self, command):
1427        self.confirm_value = command.confirm_value
1428        if self.sc:
1429            self.on_smp_pairing_confirm_command_secure_connections(command)
1430        else:
1431            self.on_smp_pairing_confirm_command_legacy(command)
1432
1433    def on_smp_pairing_random_command_legacy(self, command):
1434        # Check that the confirmation values match
1435        confirm_verifier = crypto.c1(
1436            self.tk,
1437            command.random_value,
1438            self.preq,
1439            self.pres,
1440            self.iat,
1441            self.rat,
1442            self.ia,
1443            self.ra,
1444        )
1445        if not self.check_expected_value(
1446            self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR
1447        ):
1448            return
1449
1450        # Compute STK
1451        if self.is_initiator:
1452            mrand = self.r
1453            srand = command.random_value
1454        else:
1455            srand = self.r
1456            mrand = command.random_value
1457        self.stk = crypto.s1(self.tk, srand, mrand)
1458        logger.debug(f'STK = {self.stk.hex()}')
1459
1460        # Generate LTK
1461        self.ltk = crypto.r()
1462
1463        if self.is_initiator:
1464            self.start_encryption(self.stk)
1465        else:
1466            self.send_pairing_random_command()
1467
    def on_smp_pairing_random_command_secure_connections(self, command):
        """Continue Secure Connections pairing after the peer's Pairing Random.

        The initiator checks the peer's random value against the confirm value
        received earlier; the responder does the same for PASSKEY only and
        replies with its own random value. PASSKEY runs for 20 steps, and this
        method returns early until the last step is done. After that, the
        MacKey, LTK and both DHKey check values are computed, and the user is
        asked for confirmation for JUST_WORKS / NUMERIC_COMPARISON before the
        final step runs. Unsupported pairing methods are silently ignored.
        """
        # The PASSKEY checks below need the passkey, which may not be entered yet
        if self.pairing_method == self.PASSKEY and self.passkey is None:
            logger.warning('no passkey entered, ignoring command')
            return

        # pylint: disable=too-many-return-statements
        if self.is_initiator:
            if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON):
                # Check that the random value matches what was committed to earlier
                confirm_verifier = crypto.f4(
                    self.pkb, self.pka, command.random_value, bytes([0])
                )
                if not self.check_expected_value(
                    self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR
                ):
                    return
            elif self.pairing_method == self.PASSKEY:
                # Check that the random value matches what was committed to earlier
                # (the last argument carries the passkey bit for the current step)
                confirm_verifier = crypto.f4(
                    self.pkb,
                    self.pka,
                    command.random_value,
                    bytes([0x80 + ((self.passkey >> self.passkey_step) & 1)]),
                )
                if not self.check_expected_value(
                    self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR
                ):
                    return

                # Move on to the next iteration
                self.passkey_step += 1
                logger.debug(f'passkey finished step {self.passkey_step} of 20')
                if self.passkey_step < 20:
                    # More steps to go: start the next one with a new confirm
                    self.send_pairing_confirm_command()
                    return
            else:
                return
        else:
            if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON):
                self.send_pairing_random_command()
            elif self.pairing_method == self.PASSKEY:
                # Check that the random value matches what was committed to earlier
                confirm_verifier = crypto.f4(
                    self.pka,
                    self.pkb,
                    command.random_value,
                    bytes([0x80 + ((self.passkey >> self.passkey_step) & 1)]),
                )
                if not self.check_expected_value(
                    self.confirm_value, confirm_verifier, SMP_CONFIRM_VALUE_FAILED_ERROR
                ):
                    return

                self.send_pairing_random_command()

                # Move on to the next iteration
                self.passkey_step += 1
                logger.debug(f'passkey finished step {self.passkey_step} of 20')
                if self.passkey_step < 20:
                    # More steps to go: use a fresh random value for the next one
                    self.r = crypto.r()
                    return
            else:
                return

        # Compute the MacKey and LTK
        # (a and b are each side's address followed by its address type byte)
        a = self.ia + bytes([self.iat])
        b = self.ra + bytes([self.rat])
        (mac_key, self.ltk) = crypto.f5(self.dh_key, self.na, self.nb, a, b)

        # Compute the DH Key checks
        if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON):
            ra = bytes(16)
            rb = ra
        elif self.pairing_method == self.PASSKEY:
            ra = self.passkey.to_bytes(16, byteorder='little')
            rb = ra
        else:
            # OOB not implemented yet
            return

        # Bytes 1..3 of the saved pairing request/response
        io_cap_a = self.preq[1:4]
        io_cap_b = self.pres[1:4]
        self.ea = crypto.f6(mac_key, self.na, self.nb, rb, io_cap_a, a, b)
        self.eb = crypto.f6(mac_key, self.nb, self.na, ra, io_cap_b, b, a)

        # Next steps to be performed after possible user confirmation
        def next_steps():
            # The initiator sends the DH Key check to the responder
            if self.is_initiator:
                self.send_pairing_dhkey_check_command()
            else:
                # NOTE(review): presumably releases code elsewhere that awaits
                # this future before continuing - not visible in this method
                if self.wait_before_continuing:
                    self.wait_before_continuing.set_result(None)

        # Prompt the user for confirmation if needed
        if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON):
            # Compute the 6-digit code
            code = crypto.g2(self.pka, self.pkb, self.na, self.nb) % 1000000

            # Ask for user confirmation
            self.wait_before_continuing = asyncio.get_running_loop().create_future()
            if self.pairing_method == self.JUST_WORKS:
                self.prompt_user_for_confirmation(next_steps)
            else:
                self.prompt_user_for_numeric_comparison(code, next_steps)
        else:
            next_steps()
1575
1576    def on_smp_pairing_random_command(self, command):
1577        self.peer_random_value = command.random_value
1578        if self.sc:
1579            self.on_smp_pairing_random_command_secure_connections(command)
1580        else:
1581            self.on_smp_pairing_random_command_legacy(command)
1582
1583    def on_smp_pairing_public_key_command(self, command):
1584        # Store the public key so that we can compute the confirmation value later
1585        self.peer_public_key_x = command.public_key_x
1586        self.peer_public_key_y = command.public_key_y
1587
1588        # Compute the DH key
1589        self.dh_key = bytes(
1590            reversed(
1591                self.manager.ecc_key.dh(
1592                    bytes(reversed(command.public_key_x)),
1593                    bytes(reversed(command.public_key_y)),
1594                )
1595            )
1596        )
1597        logger.debug(f'DH key: {self.dh_key.hex()}')
1598
1599        if self.is_initiator:
1600            self.send_pairing_confirm_command()
1601        else:
1602            if self.pairing_method == self.PASSKEY:
1603                self.display_or_input_passkey()
1604
1605            # Send our public key back to the initiator
1606            self.send_public_key_command()
1607
1608            if self.pairing_method in (self.JUST_WORKS, self.NUMERIC_COMPARISON):
1609                # We can now send the confirmation value
1610                self.send_pairing_confirm_command()
1611
1612    def on_smp_pairing_dhkey_check_command(self, command):
1613        # Check that what we received matches what we computed earlier
1614        expected = self.eb if self.is_initiator else self.ea
1615        if not self.check_expected_value(
1616            expected, command.dhkey_check, SMP_DHKEY_CHECK_FAILED_ERROR
1617        ):
1618            return
1619
1620        if self.is_responder:
1621            if self.wait_before_continuing is not None:
1622
1623                async def next_steps():
1624                    await self.wait_before_continuing
1625                    self.wait_before_continuing = None
1626                    self.send_pairing_dhkey_check_command()
1627
1628                self.connection.abort_on('disconnection', next_steps())
1629            else:
1630                self.send_pairing_dhkey_check_command()
1631        else:
1632            self.start_encryption(self.ltk)
1633
1634    def on_smp_pairing_failed_command(self, command):
1635        self.on_pairing_failure(command.reason)
1636
1637    def on_smp_encryption_information_command(self, command):
1638        self.peer_ltk = command.long_term_key
1639        self.check_key_distribution(SMP_Encryption_Information_Command)
1640
1641    def on_smp_master_identification_command(self, command):
1642        self.peer_ediv = command.ediv
1643        self.peer_rand = command.rand
1644        self.check_key_distribution(SMP_Master_Identification_Command)
1645
1646    def on_smp_identity_information_command(self, command):
1647        self.peer_identity_resolving_key = command.identity_resolving_key
1648        self.check_key_distribution(SMP_Identity_Information_Command)
1649
1650    def on_smp_identity_address_information_command(self, command):
1651        self.peer_bd_addr = command.bd_addr
1652        self.check_key_distribution(SMP_Identity_Address_Information_Command)
1653
1654    def on_smp_signing_information_command(self, command):
1655        self.peer_signature_key = command.signature_key
1656        self.check_key_distribution(SMP_Signing_Information_Command)
1657
1658
1659# -----------------------------------------------------------------------------
class Manager(EventEmitter):
    '''
    Implements the Initiator and Responder roles of the Security Manager Protocol.

    The manager keeps one pairing session per connection handle, routes incoming
    SMP PDUs to the matching session, and relays session events (start, success,
    failure) to the device that owns it.
    '''

    def __init__(self, device):
        '''
        Create a manager for a device.

        :param device: the owning device; it is notified of pairing events, and
            its keystore (if any) receives the keys produced by pairing.
        '''
        super().__init__()
        self.device = device

        # Pairing sessions in progress, keyed by connection handle
        self.sessions = {}

        # Local ECC key, generated the first time `ecc_key` is read
        self._ecc_key = None

        # Callable that takes a connection and returns the pairing configuration
        # to use for it, or None when pairing is not allowed on that connection
        self.pairing_config_factory = lambda connection: PairingConfig()

    def send_command(self, connection, command):
        '''
        Serialize an SMP command and send it as an L2CAP PDU on a connection.

        The BR/EDR SMP channel is used when the connection's transport is
        BR/EDR, and the regular SMP channel otherwise.
        '''
        logger.debug(
            f'>>> Sending SMP Command on connection [0x{connection.handle:04X}] '
            f'{connection.peer_address}: {command}'
        )

        # Select the fixed channel that matches the transport
        if connection.transport == BT_BR_EDR_TRANSPORT:
            channel_id = SMP_BR_CID
        else:
            channel_id = SMP_CID

        connection.send_l2cap_pdu(channel_id, command.to_bytes())

    def on_smp_pdu(self, connection, pdu):
        '''
        Process an SMP PDU received on a connection.

        The PDU is handed to the session registered for the connection handle. If
        there is none, a new session is created from the configuration returned
        by `pairing_config_factory`; when that configuration is None, a Pairing
        Failed command is sent back and the PDU is dropped without being parsed.
        '''
        session = self.sessions.get(connection.handle)
        if not session:
            # No session yet for this connection: try to create one
            pairing_config = self.pairing_config_factory(connection)
            if pairing_config is None:
                # Pairing is disabled: tell the peer and ignore the PDU
                failure = SMP_Pairing_Failed_Command(
                    reason=SMP_PAIRING_NOT_SUPPORTED_ERROR
                )
                self.send_command(connection, failure)
                return

            session = Session(self, connection, pairing_config)
            self.sessions[connection.handle] = session

        # Turn the L2CAP payload into an SMP Command object
        command = SMP_Command.from_bytes(pdu)
        logger.debug(
            f'<<< Received SMP Command on connection [0x{connection.handle:04X}] '
            f'{connection.peer_address}: {command}'
        )

        # The session is responsible for acting on the command
        session.on_smp_command(command)

    @property
    def ecc_key(self):
        '''
        The local ECC key, generated on first access and reused afterwards.
        '''
        key = self._ecc_key
        if key is None:
            key = crypto.EccKey.generate()
            self._ecc_key = key
        return key

    async def pair(self, connection):
        '''
        Initiate pairing on a connection and wait for the session's `pair()` to
        complete.

        A new session is always created and registered under the connection
        handle, replacing any session already registered for that handle.

        :raises ValueError: if `pairing_config_factory` returns None.
        :return: whatever the session's `pair()` returns.
        '''
        # TODO: check if there's already a session for this connection
        config = self.pairing_config_factory(connection)
        if config is None:
            raise ValueError('pairing config must not be None when initiating')

        new_session = Session(self, connection, config)
        self.sessions[connection.handle] = new_session
        return await new_session.pair()

    def request_pairing(self, connection):
        '''
        Send a Security Request command on a connection.

        The authentication requirements are built from the bonding, mitm and sc
        settings of the pairing configuration for that connection, or are 0 when
        no configuration is available.
        '''
        config = self.pairing_config_factory(connection)

        # The two trailing flags of smp_auth_req are always passed as False here
        auth_req = (
            smp_auth_req(config.bonding, config.mitm, config.sc, False, False)
            if config
            else 0
        )
        self.send_command(connection, SMP_Security_Request_Command(auth_req=auth_req))

    def on_session_start(self, session):
        '''
        Notify the device that pairing has started on the session's connection.
        '''
        handle = session.connection.handle
        self.device.on_pairing_start(handle)

    def on_pairing(self, session, identity_address, keys):
        '''
        Handle the successful completion of a pairing session.

        When the device has a keystore and an identity address is known, the keys
        are stored under the string form of that address. The store operation runs
        as a coroutine and its errors are logged rather than raised. The device is
        then notified of the pairing.
        '''
        can_store = self.device.keystore and identity_address is not None
        if can_store:

            async def store_keys():
                # Best effort: a keystore failure must not break pairing
                try:
                    await self.device.keystore.update(str(identity_address), keys)
                except Exception as error:
                    logger.warning(f'!!! error while storing keys: {error}')

            # NOTE(review): presumably abort_on cancels the store operation if
            # the device is flushed first - confirm against the device code
            self.device.abort_on('flush', store_keys())

        # Let the device know about the new keys
        self.device.on_pairing(session.connection.handle, keys, session.sc)

    def on_pairing_failure(self, session, reason):
        '''
        Notify the device that pairing failed on the session's connection.
        '''
        handle = session.connection.handle
        self.device.on_pairing_failure(handle, reason)

    def on_session_end(self, session):
        '''
        Forget a session once it has ended; unknown sessions are ignored.
        '''
        logger.debug(f'session end for connection 0x{session.connection.handle:04X}')
        self.sessions.pop(session.connection.handle, None)

    def get_long_term_key(self, connection, rand, ediv):
        '''
        Look up a long term key through the session registered for a connection.

        :return: the result of the session's `get_long_term_key(rand, ediv)`, or
            None when there is no session for the connection.
        '''
        session = self.sessions.get(connection.handle)
        if not session:
            return None

        return session.get_long_term_key(rand, ediv)
1764