• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import os
20import logging
21import click
22import aioconsole
23from colors import color
24
25from bumble.device import Device, Peer
26from bumble.transport import open_transport_or_link
27from bumble.smp import PairingDelegate, PairingConfig
28from bumble.smp import error_name as smp_error_name
29from bumble.keys import JsonKeyStore
30from bumble.core import ProtocolError
31from bumble.gatt import (
32    GATT_DEVICE_NAME_CHARACTERISTIC,
33    GATT_GENERIC_ACCESS_SERVICE,
34    Service,
35    Characteristic,
36    CharacteristicValue
37)
38from bumble.att import (
39    ATT_Error,
40    ATT_INSUFFICIENT_AUTHENTICATION_ERROR,
41    ATT_INSUFFICIENT_ENCRYPTION_ERROR
42)
43
44
45# -----------------------------------------------------------------------------
class Delegate(PairingDelegate):
    """Pairing delegate that interacts with the user through the console.

    Prompts are printed in yellow and answers are read asynchronously with
    `aioconsole.ainput`.
    """

    def __init__(self, mode, connection, capability_string, prompt):
        """Set up the delegate for one connection.

        Args:
            mode: Pairing mode; forwarded to `get_peer_name` when the peer's
                name is looked up.
            connection: The connection being paired; wrapped in a `Peer`.
            capability_string: I/O capability name, matched case-insensitively
                against 'keyboard', 'display', 'display+keyboard',
                'display+yes/no' and 'none'.
            prompt: When truthy, `accept` asks the user before accepting.

        Raises:
            KeyError: If `capability_string` is not one of the known names.
        """
        io_capabilities = {
            'keyboard':         PairingDelegate.KEYBOARD_INPUT_ONLY,
            'display':          PairingDelegate.DISPLAY_OUTPUT_ONLY,
            'display+keyboard': PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
            'display+yes/no':   PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
            'none':             PairingDelegate.NO_OUTPUT_NO_INPUT
        }
        super().__init__(io_capabilities[capability_string.lower()])

        self.mode = mode
        self.peer = Peer(connection)
        self.peer_name = None
        self.prompt = prompt

    async def update_peer_name(self):
        """Resolve and cache a printable name for the peer (done only once)."""
        if self.peer_name is not None:
            # The peer was already asked; keep the cached answer
            return

        if not self.peer:
            self.peer_name = '[?]'
            return

        # Ask the peer for its name, then tag it with the peer address
        peer_name = await get_peer_name(self.peer, self.mode)
        address = self.peer.connection.peer_address
        self.peer_name = f'{peer_name or ""} [{address}]'

    async def _prepare_prompt(self):
        """Refresh the peer name, then pause so pending log lines print first."""
        await self.update_peer_name()
        await asyncio.sleep(1)

    @staticmethod
    def _print_banner(*lines):
        """Print the given lines in yellow between two separator rules."""
        rule = color('###-----------------------------------', 'yellow')
        print(rule)
        for line in lines:
            print(color(line, 'yellow'))
        print(rule)

    @staticmethod
    async def _ask_yes_no(question):
        """Repeat `question` until the user types 'yes' or 'no'; return a bool."""
        while True:
            answer = await aioconsole.ainput(color(question, 'yellow'))
            answer = answer.lower().strip()
            if answer == 'yes':
                return True
            if answer == 'no':
                return False

    async def accept(self):
        """Return whether the pairing request should be accepted."""
        if not self.prompt:
            # Accept silently
            return True

        await self._prepare_prompt()
        self._print_banner(f'### Pairing request from {self.peer_name}')
        return await self._ask_yes_no('>>> Accept? ')

    async def compare_numbers(self, number, digits):
        """Ask the user whether the peer shows `number`, zero-padded to `digits`."""
        await self._prepare_prompt()
        self._print_banner(f'### Pairing with {self.peer_name}')
        return await self._ask_yes_no(
            f'>>> Does the other device display {number:0{digits}}? '
        )

    async def get_number(self):
        """Prompt for a PIN until the user enters something `int` can parse."""
        await self._prepare_prompt()

        while True:
            try:
                # The banner is shown again before every attempt
                self._print_banner(f'### Pairing with {self.peer_name}')
                pin_text = await aioconsole.ainput(color('>>> Enter PIN: ', 'yellow'))
                return int(pin_text)
            except ValueError:
                continue

    async def display_number(self, number, digits):
        """Show the PIN `number`, zero-padded to `digits`, to the user."""
        await self._prepare_prompt()
        self._print_banner(
            f'### Pairing with {self.peer_name}',
            f'### PIN: {number:0{digits}}'
        )
140
141
142# -----------------------------------------------------------------------------
async def get_peer_name(peer, mode):
    """Return the peer's name, or None when it cannot be obtained over GATT.

    Args:
        peer: Object used to query the remote device; `request_name`,
            `discover_service` and `read_characteristics_by_uuid` are awaited
            on it.
        mode: 'classic' asks the peer directly through `request_name`; any
            other value reads the Device Name characteristic of the Generic
            Access service.

    Returns:
        The name reported by the peer. In the GATT case this is a `str`, or
        None when the service or the characteristic value is missing.
    """
    if mode == 'classic':
        return await peer.request_name()

    # Try to get the peer name from GATT
    services = await peer.discover_service(GATT_GENERIC_ACCESS_SERVICE)
    if not services:
        return None

    values = await peer.read_characteristics_by_uuid(GATT_DEVICE_NAME_CHARACTERISTIC, services[0])
    if not values:
        return None

    # The bytes are supplied by the remote device, so tolerate invalid UTF-8
    # instead of letting a malformed name abort the caller
    return values[0].decode('utf-8', errors='replace')
155
156
157# -----------------------------------------------------------------------------
# Remembers whether the one-time "insufficient authentication" error has
# already been raised: index 0 is used by read_with_error, index 1 by
# write_with_error
AUTHENTICATION_ERROR_RETURNED = [False, False]
159
160
def read_with_error(connection):
    """Read handler that fails until the link is encrypted and has been rejected once.

    Raises:
        ATT_Error: With ATT_INSUFFICIENT_ENCRYPTION_ERROR while the connection
            is not encrypted, and with ATT_INSUFFICIENT_AUTHENTICATION_ERROR
            on the first read made over an encrypted connection.

    Returns:
        A single byte of value 1 on every later read.
    """
    if not connection.is_encrypted:
        raise ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR)

    if not AUTHENTICATION_ERROR_RETURNED[0]:
        # Record that the authentication error has now been returned once
        AUTHENTICATION_ERROR_RETURNED[0] = True
        raise ATT_Error(ATT_INSUFFICIENT_AUTHENTICATION_ERROR)

    return b'\x01'
170
171
def write_with_error(connection, value):
    """Write handler that fails until the link is encrypted and has been rejected once.

    The written `value` is ignored.

    Raises:
        ATT_Error: With ATT_INSUFFICIENT_ENCRYPTION_ERROR while the connection
            is not encrypted, and with ATT_INSUFFICIENT_AUTHENTICATION_ERROR
            on the first write made over an encrypted connection.
    """
    if not connection.is_encrypted:
        raise ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR)

    if AUTHENTICATION_ERROR_RETURNED[1]:
        # The one-time authentication error was already returned
        return

    AUTHENTICATION_ERROR_RETURNED[1] = True
    raise ATT_Error(ATT_INSUFFICIENT_AUTHENTICATION_ERROR)
179
180
181# -----------------------------------------------------------------------------
def on_connection(connection, request):
    """Report a new connection, hook up its event listeners, optionally request pairing.

    Args:
        connection: The newly established connection.
        request: When truthy, `connection.request_pairing()` is called once
            the listeners are registered.
    """
    print(color(f'<<< Connection: {connection}', 'green'))

    # Listen for pairing events
    pairing_listeners = (
        ('pairing_start',   on_pairing_start),
        ('pairing',         on_pairing),
        ('pairing_failure', on_pairing_failure),
    )
    for event_name, listener in pairing_listeners:
        connection.on(event_name, listener)

    # Listen for encryption changes; the event carries no arguments, so the
    # connection is captured by the closure
    def report_encryption_change():
        return on_connection_encryption_change(connection)

    connection.on('connection_encryption_change', report_encryption_change)

    # Request pairing if needed
    if not request:
        return

    print(color('>>> Requesting pairing', 'green'))
    connection.request_pairing()
200
201
202# -----------------------------------------------------------------------------
def on_connection_encryption_change(connection):
    """Print whether `connection` is currently encrypted.

    Args:
        connection: The connection whose `is_encrypted` attribute is reported.
    """
    # Build the whole phrase so that "not" and "encrypted" stay separate words
    # (the message used to read "notencrypted")
    state = 'encrypted' if connection.is_encrypted else 'not encrypted'

    separator = color('@@@-----------------------------------', 'blue')
    print(separator)
    print(color(f'@@@ Connection is {state}', 'blue'))
    print(separator)
207
208
209# -----------------------------------------------------------------------------
def on_pairing_start():
    """Print a magenta banner announcing that pairing has started."""
    separator = color('***-----------------------------------', 'magenta')
    print(separator)
    print(color('*** Pairing starting', 'magenta'))
    print(separator)
214
215
216# -----------------------------------------------------------------------------
def on_pairing(keys):
    """Print a cyan banner for a successful pairing, followed by the keys.

    Args:
        keys: The pairing keys; printed through their own `print` method with
            a cyan '*** ' prefix.
    """
    separator = color('***-----------------------------------', 'cyan')
    print(separator)
    print(color('*** Paired!', 'cyan'))
    keys.print(prefix=color('*** ', 'cyan'))
    print(separator)
222
223
224# -----------------------------------------------------------------------------
def on_pairing_failure(reason):
    """Print a red banner naming the reason a pairing attempt failed.

    Args:
        reason: Failure code; converted to text with `smp_error_name`.
    """
    separator = color('***-----------------------------------', 'red')
    print(separator)
    failure_name = smp_error_name(reason)
    print(color(f'*** Pairing failed: {failure_name}', 'red'))
    print(separator)
229
230
231# -----------------------------------------------------------------------------
async def pair(
    mode,
    sc,
    mitm,
    bond,
    io,
    prompt,
    request,
    print_keys,
    keystore_file,
    device_config,
    hci_transport,
    address_or_name
):
    """Open the HCI transport, configure the device, then pair or wait for a peer.

    Args:
        mode: 'le' or 'classic'.
        sc, mitm, bond: Passed positionally, in that order, to `PairingConfig`.
        io: I/O capability name handed to `Delegate`.
        prompt: Whether `Delegate` asks before accepting a pairing request.
        request: When truthy, pairing is requested from the connection handler
            instead of being started here after connecting.
        print_keys: Print the keystore contents before pairing.
        keystore_file: Optional path for a `JsonKeyStore` replacing the
            device's keystore.
        device_config: Device configuration file.
        hci_transport: Transport specification for `open_transport_or_link`.
        address_or_name: Peer to connect to; when None, the device advertises
            and waits for incoming connections.
    """
    print('<<< connecting to HCI...')
    async with await open_transport_or_link(hci_transport) as (hci_source, hci_sink):
        print('<<< connected')

        # Create a device to manage the host
        device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink)

        # A keystore given on the command line replaces the configured one
        if keystore_file:
            device.keystore = JsonKeyStore(namespace=None, filename=keystore_file)

        # Show the keys that exist before any pairing takes place
        if print_keys and device.keystore:
            separator = color('@@@-----------------------------------', 'blue')
            print(separator)
            print(color('@@@ Pairing Keys:', 'blue'))
            await device.keystore.print(prefix=color('@@@ ', 'blue'))
            print(separator)

        # In LE mode, expose a GATT characteristic whose read/write handlers
        # respond with an authentication error, which can be used to trigger
        # pairing
        if mode == 'le':
            trigger_characteristic = Characteristic(
                '552957FB-CF1F-4A31-9535-E78847E1A714',
                Characteristic.READ | Characteristic.WRITE,
                Characteristic.READABLE | Characteristic.WRITEABLE,
                CharacteristicValue(read=read_with_error, write=write_with_error)
            )
            trigger_service = Service(
                '50DB505C-8AC4-4738-8448-3B1D9CC09CC5',
                [trigger_characteristic]
            )
            device.add_service(trigger_service)

        # Select LE or Classic
        if mode == 'classic':
            device.classic_enabled = True
            device.le_enabled = False

        # Get things going
        await device.power_on()

        # Each connection gets its own pairing config and delegate
        def make_pairing_config(connection):
            delegate = Delegate(mode, connection, io, prompt)
            return PairingConfig(sc, mitm, bond, delegate)

        device.pairing_config_factory = make_pairing_config

        # Forward new connections, together with the `request` flag
        def handle_connection(connection):
            return on_connection(connection, request)

        device.on('connection', handle_connection)

        if address_or_name is None:
            # Advertise so that peers can find us and connect
            await device.start_advertising(auto_restart=True)
        else:
            print(color(f'=== Connecting to {address_or_name}...', 'green'))
            peer_connection = await device.connect(address_or_name)

            if not request:
                # Start pairing from this side; the function ends afterwards
                # whether pairing succeeded or failed with a protocol error
                try:
                    if mode == 'le':
                        await peer_connection.pair()
                    else:
                        await peer_connection.authenticate()
                except ProtocolError as error:
                    print(color(f'Pairing failed: {error}', 'red'))
                return

        await hci_source.wait_for_termination()
318
319
320# -----------------------------------------------------------------------------
@click.command()
@click.option(
    '--mode',
    type=click.Choice(['le', 'classic']),
    default='le',
    show_default=True
)
@click.option(
    '--sc',
    type=bool,
    default=True,
    help='Use the Secure Connections protocol',
    show_default=True
)
@click.option(
    '--mitm',
    type=bool,
    default=True,
    help='Request MITM protection',
    show_default=True
)
@click.option(
    '--bond',
    type=bool,
    default=True,
    help='Enable bonding',
    show_default=True
)
@click.option(
    '--io',
    type=click.Choice(['keyboard', 'display', 'display+keyboard', 'display+yes/no', 'none']),
    default='display+keyboard',
    show_default=True
)
@click.option('--prompt', is_flag=True, help='Prompt to accept/reject pairing request')
@click.option('--request', is_flag=True, help='Request that the connecting peer initiate pairing')
@click.option('--print-keys', is_flag=True, help='Print the bond keys before pairing')
@click.option('--keystore-file', help='File in which to store the pairing keys')
@click.argument('device-config')
@click.argument('hci_transport')
@click.argument('address-or-name', required=False)
def main(mode, sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, hci_transport, address_or_name):
    """Command-line entry point: configure logging, then run `pair` to completion."""
    # The log level comes from the BUMBLE_LOGLEVEL environment variable,
    # falling back to INFO
    log_level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()
    logging.basicConfig(level=log_level)

    asyncio.run(
        pair(
            mode=mode,
            sc=sc,
            mitm=mitm,
            bond=bond,
            io=io,
            prompt=prompt,
            request=request,
            print_keys=print_keys,
            keystore_file=keystore_file,
            device_config=device_config,
            hci_transport=hci_transport,
            address_or_name=address_or_name
        )
    )
337
338
339# -----------------------------------------------------------------------------
# Run the command-line tool only when this file is executed as a script
if __name__ == '__main__':
    main()
342