• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import os
20import logging
21import click
22from prompt_toolkit.shortcuts import PromptSession
23
24from bumble.colors import color
25from bumble.device import Device, Peer
26from bumble.transport import open_transport_or_link
27from bumble.pairing import OobData, PairingDelegate, PairingConfig
28from bumble.smp import OobContext, OobLegacyContext
29from bumble.smp import error_name as smp_error_name
30from bumble.keys import JsonKeyStore
31from bumble.core import (
32    AdvertisingData,
33    ProtocolError,
34    BT_LE_TRANSPORT,
35    BT_BR_EDR_TRANSPORT,
36)
37from bumble.gatt import (
38    GATT_DEVICE_NAME_CHARACTERISTIC,
39    GATT_GENERIC_ACCESS_SERVICE,
40    Service,
41    Characteristic,
42    CharacteristicValue,
43)
44from bumble.att import (
45    ATT_Error,
46    ATT_INSUFFICIENT_AUTHENTICATION_ERROR,
47    ATT_INSUFFICIENT_ENCRYPTION_ERROR,
48)
49
50
51# -----------------------------------------------------------------------------
52class Waiter:
53    instance = None
54
55    def __init__(self, linger=False):
56        self.done = asyncio.get_running_loop().create_future()
57        self.linger = linger
58
59    def terminate(self):
60        if not self.linger:
61            self.done.set_result(None)
62
63    async def wait_until_terminated(self):
64        return await self.done
65
66
67# -----------------------------------------------------------------------------
68class Delegate(PairingDelegate):
69    def __init__(self, mode, connection, capability_string, do_prompt):
70        super().__init__(
71            io_capability={
72                'keyboard': PairingDelegate.KEYBOARD_INPUT_ONLY,
73                'display': PairingDelegate.DISPLAY_OUTPUT_ONLY,
74                'display+keyboard': PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
75                'display+yes/no': PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
76                'none': PairingDelegate.NO_OUTPUT_NO_INPUT,
77            }[capability_string.lower()]
78        )
79
80        self.mode = mode
81        self.peer = Peer(connection)
82        self.peer_name = None
83        self.do_prompt = do_prompt
84
85    def print(self, message):
86        print(color(message, 'yellow'))
87
88    async def prompt(self, message):
89        # Wait a bit to allow some of the log lines to print before we prompt
90        await asyncio.sleep(1)
91
92        session = PromptSession(message)
93        response = await session.prompt_async()
94        return response.lower().strip()
95
96    async def update_peer_name(self):
97        if self.peer_name is not None:
98            # We already asked the peer
99            return
100
101        # Try to get the peer's name
102        if self.peer:
103            peer_name = await get_peer_name(self.peer, self.mode)
104            self.peer_name = f'{peer_name or ""} [{self.peer.connection.peer_address}]'
105        else:
106            self.peer_name = '[?]'
107
108    async def accept(self):
109        if self.do_prompt:
110            await self.update_peer_name()
111
112            # Prompt for acceptance
113            self.print('###-----------------------------------')
114            self.print(f'### Pairing request from {self.peer_name}')
115            self.print('###-----------------------------------')
116            while True:
117                response = await self.prompt('>>> Accept? ')
118
119                if response == 'yes':
120                    return True
121
122                if response == 'no':
123                    return False
124
125        # Accept silently
126        return True
127
128    async def compare_numbers(self, number, digits):
129        await self.update_peer_name()
130
131        # Prompt for a numeric comparison
132        self.print('###-----------------------------------')
133        self.print(f'### Pairing with {self.peer_name}')
134        self.print('###-----------------------------------')
135        while True:
136            response = await self.prompt(
137                f'>>> Does the other device display {number:0{digits}}? '
138            )
139
140            if response == 'yes':
141                return True
142
143            if response == 'no':
144                return False
145
146    async def get_number(self):
147        await self.update_peer_name()
148
149        # Prompt for a PIN
150        while True:
151            try:
152                self.print('###-----------------------------------')
153                self.print(f'### Pairing with {self.peer_name}')
154                self.print('###-----------------------------------')
155                return int(await self.prompt('>>> Enter PIN: '))
156            except ValueError:
157                pass
158
159    async def display_number(self, number, digits):
160        await self.update_peer_name()
161
162        # Display a PIN code
163        self.print('###-----------------------------------')
164        self.print(f'### Pairing with {self.peer_name}')
165        self.print(f'### PIN: {number:0{digits}}')
166        self.print('###-----------------------------------')
167
168    async def get_string(self, max_length: int):
169        await self.update_peer_name()
170
171        # Prompt a PIN (for legacy pairing in classic)
172        self.print('###-----------------------------------')
173        self.print(f'### Pairing with {self.peer_name}')
174        self.print('###-----------------------------------')
175        count = 0
176        while True:
177            response = await self.prompt('>>> Enter PIN (1-6 chars):')
178            if len(response) == 0:
179                count += 1
180                if count > 3:
181                    self.print('too many tries, stopping the pairing')
182                    return None
183
184                self.print('no PIN was entered, try again')
185                continue
186            return response
187
188
189# -----------------------------------------------------------------------------
190async def get_peer_name(peer, mode):
191    if mode == 'classic':
192        return await peer.request_name()
193
194    # Try to get the peer name from GATT
195    services = await peer.discover_service(GATT_GENERIC_ACCESS_SERVICE)
196    if not services:
197        return None
198
199    values = await peer.read_characteristics_by_uuid(
200        GATT_DEVICE_NAME_CHARACTERISTIC, services[0]
201    )
202    if values:
203        return values[0].decode('utf-8')
204
205    return None
206
207
208# -----------------------------------------------------------------------------
209AUTHENTICATION_ERROR_RETURNED = [False, False]
210
211
212def read_with_error(connection):
213    if not connection.is_encrypted:
214        raise ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR)
215
216    if AUTHENTICATION_ERROR_RETURNED[0]:
217        return bytes([1])
218
219    AUTHENTICATION_ERROR_RETURNED[0] = True
220    raise ATT_Error(ATT_INSUFFICIENT_AUTHENTICATION_ERROR)
221
222
223def write_with_error(connection, _value):
224    if not connection.is_encrypted:
225        raise ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR)
226
227    if not AUTHENTICATION_ERROR_RETURNED[1]:
228        AUTHENTICATION_ERROR_RETURNED[1] = True
229        raise ATT_Error(ATT_INSUFFICIENT_AUTHENTICATION_ERROR)
230
231
232# -----------------------------------------------------------------------------
233def on_connection(connection, request):
234    print(color(f'<<< Connection: {connection}', 'green'))
235
236    # Listen for pairing events
237    connection.on('pairing_start', on_pairing_start)
238    connection.on('pairing', lambda keys: on_pairing(connection.peer_address, keys))
239    connection.on('pairing_failure', on_pairing_failure)
240
241    # Listen for encryption changes
242    connection.on(
243        'connection_encryption_change',
244        lambda: on_connection_encryption_change(connection),
245    )
246
247    # Request pairing if needed
248    if request:
249        print(color('>>> Requesting pairing', 'green'))
250        connection.request_pairing()
251
252
253# -----------------------------------------------------------------------------
254def on_connection_encryption_change(connection):
255    print(color('@@@-----------------------------------', 'blue'))
256    print(
257        color(
258            f'@@@ Connection is {"" if connection.is_encrypted else "not"}encrypted',
259            'blue',
260        )
261    )
262    print(color('@@@-----------------------------------', 'blue'))
263
264
265# -----------------------------------------------------------------------------
266def on_pairing_start():
267    print(color('***-----------------------------------', 'magenta'))
268    print(color('*** Pairing starting', 'magenta'))
269    print(color('***-----------------------------------', 'magenta'))
270
271
272# -----------------------------------------------------------------------------
273def on_pairing(address, keys):
274    print(color('***-----------------------------------', 'cyan'))
275    print(color(f'*** Paired! (peer identity={address})', 'cyan'))
276    keys.print(prefix=color('*** ', 'cyan'))
277    print(color('***-----------------------------------', 'cyan'))
278    Waiter.instance.terminate()
279
280
281# -----------------------------------------------------------------------------
282def on_pairing_failure(reason):
283    print(color('***-----------------------------------', 'red'))
284    print(color(f'*** Pairing failed: {smp_error_name(reason)}', 'red'))
285    print(color('***-----------------------------------', 'red'))
286    Waiter.instance.terminate()
287
288
289# -----------------------------------------------------------------------------
290async def pair(
291    mode,
292    sc,
293    mitm,
294    bond,
295    ctkd,
296    linger,
297    io,
298    oob,
299    prompt,
300    request,
301    print_keys,
302    keystore_file,
303    device_config,
304    hci_transport,
305    address_or_name,
306):
307    Waiter.instance = Waiter(linger=linger)
308
309    print('<<< connecting to HCI...')
310    async with await open_transport_or_link(hci_transport) as (hci_source, hci_sink):
311        print('<<< connected')
312
313        # Create a device to manage the host
314        device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink)
315
316        # Expose a GATT characteristic that can be used to trigger pairing by
317        # responding with an authentication error when read
318        if mode == 'le':
319            device.le_enabled = True
320            device.add_service(
321                Service(
322                    '50DB505C-8AC4-4738-8448-3B1D9CC09CC5',
323                    [
324                        Characteristic(
325                            '552957FB-CF1F-4A31-9535-E78847E1A714',
326                            Characteristic.Properties.READ
327                            | Characteristic.Properties.WRITE,
328                            Characteristic.READABLE | Characteristic.WRITEABLE,
329                            CharacteristicValue(
330                                read=read_with_error, write=write_with_error
331                            ),
332                        )
333                    ],
334                )
335            )
336
337        # Select LE or Classic
338        if mode == 'classic':
339            device.classic_enabled = True
340            device.classic_smp_enabled = ctkd
341
342        # Get things going
343        await device.power_on()
344
345        # Set a custom keystore if specified on the command line
346        if keystore_file:
347            device.keystore = JsonKeyStore.from_device(device, filename=keystore_file)
348
349        # Print the existing keys before pairing
350        if print_keys and device.keystore:
351            print(color('@@@-----------------------------------', 'blue'))
352            print(color('@@@ Pairing Keys:', 'blue'))
353            await device.keystore.print(prefix=color('@@@ ', 'blue'))
354            print(color('@@@-----------------------------------', 'blue'))
355
356        # Create an OOB context if needed
357        if oob:
358            our_oob_context = OobContext()
359            shared_data = (
360                None
361                if oob == '-'
362                else OobData.from_ad(AdvertisingData.from_bytes(bytes.fromhex(oob)))
363            )
364            legacy_context = OobLegacyContext()
365            oob_contexts = PairingConfig.OobConfig(
366                our_context=our_oob_context,
367                peer_data=shared_data,
368                legacy_context=legacy_context,
369            )
370            oob_data = OobData(
371                address=device.random_address,
372                shared_data=shared_data,
373                legacy_context=legacy_context,
374            )
375            print(color('@@@-----------------------------------', 'yellow'))
376            print(color('@@@ OOB Data:', 'yellow'))
377            print(color(f'@@@   {our_oob_context.share()}', 'yellow'))
378            print(color(f'@@@   TK={legacy_context.tk.hex()}', 'yellow'))
379            print(color(f'@@@   HEX: ({bytes(oob_data.to_ad()).hex()})', 'yellow'))
380            print(color('@@@-----------------------------------', 'yellow'))
381        else:
382            oob_contexts = None
383
384        # Set up a pairing config factory
385        device.pairing_config_factory = lambda connection: PairingConfig(
386            sc=sc,
387            mitm=mitm,
388            bonding=bond,
389            oob=oob_contexts,
390            delegate=Delegate(mode, connection, io, prompt),
391        )
392
393        # Connect to a peer or wait for a connection
394        device.on('connection', lambda connection: on_connection(connection, request))
395        if address_or_name is not None:
396            print(color(f'=== Connecting to {address_or_name}...', 'green'))
397            connection = await device.connect(
398                address_or_name,
399                transport=BT_LE_TRANSPORT if mode == 'le' else BT_BR_EDR_TRANSPORT,
400            )
401
402            if not request:
403                try:
404                    if mode == 'le':
405                        await connection.pair()
406                    else:
407                        await connection.authenticate()
408                except ProtocolError as error:
409                    print(color(f'Pairing failed: {error}', 'red'))
410
411        else:
412            if mode == 'le':
413                # Advertise so that peers can find us and connect
414                await device.start_advertising(auto_restart=True)
415            else:
416                # Become discoverable and connectable
417                await device.set_discoverable(True)
418                await device.set_connectable(True)
419
420        # Run until the user asks to exit
421        await Waiter.instance.wait_until_terminated()
422
423
424# -----------------------------------------------------------------------------
425class LogHandler(logging.Handler):
426    def __init__(self):
427        super().__init__()
428        self.setFormatter(logging.Formatter('%(levelname)s:%(name)s:%(message)s'))
429
430    def emit(self, record):
431        message = self.format(record)
432        print(message)
433
434
435# -----------------------------------------------------------------------------
436@click.command()
437@click.option(
438    '--mode', type=click.Choice(['le', 'classic']), default='le', show_default=True
439)
440@click.option(
441    '--sc',
442    type=bool,
443    default=True,
444    help='Use the Secure Connections protocol',
445    show_default=True,
446)
447@click.option(
448    '--mitm', type=bool, default=True, help='Request MITM protection', show_default=True
449)
450@click.option(
451    '--bond', type=bool, default=True, help='Enable bonding', show_default=True
452)
453@click.option(
454    '--ctkd',
455    type=bool,
456    default=True,
457    help='Enable CTKD',
458    show_default=True,
459)
460@click.option('--linger', default=False, is_flag=True, help='Linger after pairing')
461@click.option(
462    '--io',
463    type=click.Choice(
464        ['keyboard', 'display', 'display+keyboard', 'display+yes/no', 'none']
465    ),
466    default='display+keyboard',
467    show_default=True,
468)
469@click.option(
470    '--oob',
471    metavar='<oob-data-hex>',
472    help=(
473        'Use OOB pairing with this data from the peer '
474        '(use "-" to enable OOB without peer data)'
475    ),
476)
477@click.option('--prompt', is_flag=True, help='Prompt to accept/reject pairing request')
478@click.option(
479    '--request', is_flag=True, help='Request that the connecting peer initiate pairing'
480)
481@click.option('--print-keys', is_flag=True, help='Print the bond keys before pairing')
482@click.option(
483    '--keystore-file',
484    metavar='<filename>',
485    help='File in which to store the pairing keys',
486)
487@click.argument('device-config')
488@click.argument('hci_transport')
489@click.argument('address-or-name', required=False)
490def main(
491    mode,
492    sc,
493    mitm,
494    bond,
495    ctkd,
496    linger,
497    io,
498    oob,
499    prompt,
500    request,
501    print_keys,
502    keystore_file,
503    device_config,
504    hci_transport,
505    address_or_name,
506):
507    # Setup logging
508    log_handler = LogHandler()
509    root_logger = logging.getLogger()
510    root_logger.addHandler(log_handler)
511    root_logger.setLevel(os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper())
512
513    # Pair
514    asyncio.run(
515        pair(
516            mode,
517            sc,
518            mitm,
519            bond,
520            ctkd,
521            linger,
522            io,
523            oob,
524            prompt,
525            request,
526            print_keys,
527            keystore_file,
528            device_config,
529            hci_transport,
530            address_or_name,
531        )
532    )
533
534
535# -----------------------------------------------------------------------------
536if __name__ == '__main__':
537    main()
538