• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import itertools
20import logging
21import os
22import pytest
23
24from bumble.controller import Controller
25from bumble.core import BT_BR_EDR_TRANSPORT, BT_PERIPHERAL_ROLE, BT_CENTRAL_ROLE
26from bumble.link import LocalLink
27from bumble.device import Device, Peer
28from bumble.host import Host
29from bumble.gatt import Service, Characteristic
30from bumble.transport import AsyncPipeSink
31from bumble.smp import (
32    PairingConfig,
33    PairingDelegate,
34    SMP_PAIRING_NOT_SUPPORTED_ERROR,
35    SMP_CONFIRM_VALUE_FAILED_ERROR,
36)
37from bumble.core import ProtocolError
38
39
40# -----------------------------------------------------------------------------
41# Logging
42# -----------------------------------------------------------------------------
43logger = logging.getLogger(__name__)
44
45
46# -----------------------------------------------------------------------------
class TwoDevices:
    """Pair of Devices whose Controllers are attached to one shared LocalLink.

    Attributes set by the constructor:
      link:        the LocalLink shared by both controllers
      controllers: two Controllers, named 'C1' and 'C2'
      devices:     two Devices, one per controller
      connections: per-device slot filled in by `on_connection`
      paired:      per-device slot filled in by `on_paired`
    """

    def __init__(self):
        self.connections = [None, None]

        addresses = ['F0:F1:F2:F3:F4:F5', 'F5:F4:F3:F2:F1:F0']
        self.link = LocalLink()

        # One controller per address, all sharing the same link
        self.controllers = [
            Controller(f'C{number}', link=self.link, public_address=address)
            for number, address in enumerate(addresses, start=1)
        ]

        # One device per controller; each host gets its controller both as the
        # controller argument and wrapped in an AsyncPipeSink
        self.devices = [
            Device(address=address, host=Host(controller, AsyncPipeSink(controller)))
            for address, controller in zip(addresses, self.controllers)
        ]

        self.paired = [None, None]

    def on_connection(self, which, connection):
        """Record `connection` in the slot of device number `which`."""
        self.connections[which] = connection

    def on_paired(self, which, keys):
        """Record the pairing `keys` in the slot of device number `which`."""
        self.paired[which] = keys
75
76
77# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_self_connection():
    """Connect device 0 to device 1 and check both recorded a connection."""
    # Two devices, each with its own controller, attached to the same link
    two_devices = TwoDevices()

    def connection_recorder(which):
        # Build a 'connection' listener that stores the connection for `which`
        return lambda connection: two_devices.on_connection(which, connection)

    # Attach listeners
    for which, device in enumerate(two_devices.devices):
        device.on('connection', connection_recorder(which))

    # Start
    for device in two_devices.devices:
        await device.power_on()

    # Connect the two devices
    initiator, responder = two_devices.devices
    await initiator.connect(responder.random_address)

    # Both sides must have seen the 'connection' event
    assert two_devices.connections[0] is not None
    assert two_devices.connections[1] is not None
101
102
103# -----------------------------------------------------------------------------
@pytest.mark.asyncio
@pytest.mark.parametrize(
    'responder_role,',
    (BT_CENTRAL_ROLE, BT_PERIPHERAL_ROLE),
)
async def test_self_classic_connection(responder_role):
    """Connect over BR/EDR, check the roles, switch roles, then disconnect.

    Device 1 accepts the connection asking for `responder_role`; device 0 is
    expected to end up with a different role, and the two are expected to swap
    after device 0 calls `switch_role(responder_role)`.
    """
    # Two devices, each with its own controller, attached to the same link
    two_devices = TwoDevices()
    initiator, responder = two_devices.devices

    def connection_recorder(which):
        # Build a 'connection' listener that stores the connection for `which`
        return lambda connection: two_devices.on_connection(which, connection)

    # Attach listeners
    initiator.on('connection', connection_recorder(0))
    responder.on('connection', connection_recorder(1))

    # Enable Classic connections
    initiator.classic_enabled = True
    responder.classic_enabled = True

    # Start
    await initiator.power_on()
    await responder.power_on()

    # Connect the two devices: one side connects while the other accepts
    await asyncio.gather(
        initiator.connect(responder.public_address, transport=BT_BR_EDR_TRANSPORT),
        responder.accept(initiator.public_address, responder_role),
    )

    # Check the post conditions
    assert two_devices.connections[0] is not None
    assert two_devices.connections[1] is not None

    # Check the role
    assert two_devices.connections[0].role != responder_role
    assert two_devices.connections[1].role == responder_role

    # Role switch, requested from the initiator's side
    await two_devices.connections[0].switch_role(responder_role)

    # Check that the roles are now swapped
    assert two_devices.connections[0].role == responder_role
    assert two_devices.connections[1].role != responder_role

    await two_devices.connections[0].disconnect()
155
156
157# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_self_gatt():
    """Discover a service and a characteristic on device 1, then read its value.

    Also checks that discovering an unknown service UUID yields no result.
    """
    # Two devices, each with its own controller, attached to the same link
    two_devices = TwoDevices()
    client, server = two_devices.devices

    # GATT characteristics hosted by the server (device 1)
    c1 = Characteristic(
        '3A143AD7-D4A7-436B-97D6-5B62C315E833',
        Characteristic.READ,
        Characteristic.READABLE,
        bytes([1, 2, 3]),
    )
    c2 = Characteristic(
        '9557CCE2-DB37-46EB-94C4-50AE5B9CB0F8',
        Characteristic.READ | Characteristic.WRITE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([4, 5, 6]),
    )
    c3 = Characteristic(
        '84FC1A2E-C52D-4A2D-B8C3-8855BAB86638',
        Characteristic.READ | Characteristic.WRITE_WITHOUT_RESPONSE,
        Characteristic.READABLE | Characteristic.WRITEABLE,
        bytes([7, 8, 9]),
    )
    # NOTE(review): c4 uses the same UUID as c3 — confirm this is intentional
    c4 = Characteristic(
        '84FC1A2E-C52D-4A2D-B8C3-8855BAB86638',
        Characteristic.READ | Characteristic.NOTIFY | Characteristic.INDICATE,
        Characteristic.READABLE,
        bytes([1, 1, 1]),
    )

    s1 = Service('8140E247-04F0-42C1-BC34-534C344DAFCA', [c1, c2, c3])
    s2 = Service('97210A0F-1875-4D05-9E5D-326EB171257A', [c4])
    server.add_services([s1, s2])

    # Start
    await client.power_on()
    await server.power_on()

    # Connect the two devices
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    # A UUID that no service uses: nothing should be discovered or cached
    bogus_uuid = 'A0AA6007-0B48-4BBE-80AC-0DE9AAF541EA'
    discovered = await peer.discover_services([bogus_uuid])
    assert discovered == []
    cached_services = peer.get_services_by_uuid(bogus_uuid)
    assert len(cached_services) == 0

    # The first service: found once by discovery and once by lookup
    discovered = await peer.discover_service(s1.uuid)
    assert len(discovered) == 1
    cached_services = peer.get_services_by_uuid(s1.uuid)
    assert len(cached_services) == 1
    remote_service = cached_services[0]
    assert remote_service.uuid == s1.uuid

    # The first characteristic of that service: found, looked up and read
    discovered = await peer.discover_characteristics([c1.uuid], remote_service)
    assert len(discovered) == 1
    cached_characteristics = peer.get_characteristics_by_uuid(c1.uuid)
    assert len(cached_characteristics) == 1
    remote_characteristic = cached_characteristics[0]
    assert remote_characteristic.uuid == c1.uuid
    value = await peer.read_value(remote_characteristic)
    assert value is not None
    assert value == c1.value
225
226
227# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_self_gatt_long_read():
    """Read back 513 characteristics whose values are 0 to 512 bytes long."""
    # Two devices, each with its own controller, attached to the same link
    two_devices = TwoDevices()
    client, server = two_devices.devices

    characteristic_count = 513

    def make_characteristic(i):
        # Characteristic number `i` holds `i` bytes: 0, 1, 2, ... wrapping at 256
        return Characteristic(
            f'3A143AD7-D4A7-436B-97D6-5B62C315{i:04X}',
            Characteristic.READ,
            Characteristic.READABLE,
            bytes(x & 255 for x in range(i)),
        )

    # Add the GATT characteristics to device 1
    characteristics = [make_characteristic(i) for i in range(characteristic_count)]
    service = Service('8140E247-04F0-42C1-BC34-534C344DAFCA', characteristics)
    server.add_service(service)

    # Start
    await client.power_on()
    await server.power_on()

    # Connect the two devices
    connection = await client.connect(server.random_address)
    peer = Peer(connection)

    discovered_services = await peer.discover_service(service.uuid)
    assert len(discovered_services) == 1
    found_service = discovered_services[0]
    found_characteristics = await found_service.discover_characteristics()
    assert len(found_characteristics) == characteristic_count

    # Discovered characteristics are compared, in order, with the local ones
    for found, expected in zip(found_characteristics, characteristics):
        value = await found.read_value()
        assert value == expected.value
265
266
267# -----------------------------------------------------------------------------
async def _test_self_smp_with_configs(pairing_config1, pairing_config2):
    """Connect two fresh devices, pair them, and check the outcome.

    Args:
        pairing_config1: config returned by device 0's pairing config factory;
            when falsy, device 0's factory is left untouched.
        pairing_config2: same, for device 1.

    Asserts that the connection is unencrypted before pairing and encrypted
    after, and that both connections emitted a 'pairing' event.
    """
    # Two devices, each with its own controller, attached to the same link
    two_devices = TwoDevices()
    initiator, responder = two_devices.devices

    # Start
    for device in two_devices.devices:
        await device.power_on()

    def connection_recorder(which):
        # Build a 'connection' listener that stores the connection for `which`
        def record(connection):
            return two_devices.on_connection(which, connection)

        return record

    def pairing_recorder(which):
        # Build a 'pairing' listener that stores the keys for `which`
        def record(keys):
            return two_devices.on_paired(which, keys)

        return record

    # Attach listeners
    for which, device in enumerate(two_devices.devices):
        device.on('connection', connection_recorder(which))

    # Connect the two devices
    connection = await initiator.connect(responder.random_address)
    assert not connection.is_encrypted

    # Attach connection listeners
    for which in (0, 1):
        two_devices.connections[which].on('pairing', pairing_recorder(which))

    # Set up the pairing configs
    if pairing_config1:
        initiator.pairing_config_factory = lambda connection: pairing_config1
    if pairing_config2:
        responder.pairing_config_factory = lambda connection: pairing_config2

    # Pair
    await initiator.pair(connection)
    assert connection.is_encrypted
    assert two_devices.paired[0] is not None
    assert two_devices.paired[1] is not None
313
314
315# -----------------------------------------------------------------------------
# Parameter values used to build the test_self_smp parametrization.

# I/O capabilities given to the pairing delegates
IO_CAP = [
    PairingDelegate.NO_OUTPUT_NO_INPUT,
    PairingDelegate.KEYBOARD_INPUT_ONLY,
    PairingDelegate.DISPLAY_OUTPUT_ONLY,
    PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT,
    PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT,
]
# Values for the first PairingConfig argument
# (presumably Secure Connections on/off — confirm against bumble.smp)
SC = [False, True]
# Values for the second PairingConfig argument
# (presumably man-in-the-middle protection on/off — confirm against bumble.smp)
MITM = [False, True]
# Key distribution is a 4-bit bitmask
KEY_DIST = range(16)
327
328
@pytest.mark.asyncio
@pytest.mark.parametrize(
    'io_caps, sc, mitm, key_dist',
    itertools.chain(
        itertools.product([IO_CAP], SC, MITM, [15]),
        itertools.product(
            [[PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT]], SC, MITM, KEY_DIST
        ),
    ),
)
async def test_self_smp(io_caps, sc, mitm, key_dist):
    """Pair two devices with every initiator/responder config combination.

    Each side's candidates are `None` plus one PairingConfig per entry of
    `io_caps`, all built with the given `sc`, `mitm` and `key_dist`. Every
    (initiator, responder) pair of candidates goes through a full pairing.
    """

    class Delegate(PairingDelegate):
        """Delegate that shares displayed numbers with its peer through a future."""

        def __init__(
            self,
            name,
            io_capability,
            local_initiator_key_distribution,
            local_responder_key_distribution,
        ):
            super().__init__(
                io_capability,
                local_initiator_key_distribution,
                local_responder_key_distribution,
            )
            self.name = name
            self.reset()

        def reset(self):
            # Forget the peer and start over with an unresolved number future
            self.peer_delegate = None
            self.number = asyncio.get_running_loop().create_future()

        # pylint: disable-next=unused-argument
        async def compare_numbers(self, number, digits):
            if self.peer_delegate is None:
                logger.warning(f'[{self.name}] no peer delegate')
                return False

            # Publish our own number, then wait for the peer's and compare
            await self.display_number(number, digits=6)
            logger.debug(f'[{self.name}] waiting for peer number')
            peer_number = await self.peer_delegate.number
            logger.debug(f'[{self.name}] comparing numbers: {number} and {peer_number}')
            return number == peer_number

        async def get_number(self):
            if self.peer_delegate is None:
                logger.warning(f'[{self.name}] no peer delegate')
                return 0

            if self.peer_delegate.io_capability != PairingDelegate.KEYBOARD_INPUT_ONLY:
                logger.debug(f'[{self.name}] waiting for peer number')
                peer_number = await self.peer_delegate.number
            else:
                # A keyboard-only peer gets this fixed number instead of a wait
                peer_number = 6789
            logger.debug(f'[{self.name}] returning number: {peer_number}')
            return peer_number

        async def display_number(self, number, digits):
            logger.debug(f'[{self.name}] displaying number: {number}')
            self.number.set_result(number)

        def __str__(self):
            return f'Delegate(name={self.name}, io_capability={self.io_capability})'

    # Candidate configs for each side: None first, then one per I/O capability
    initiator_configs = [None]
    responder_configs = [None]
    for name, configs in (
        ('Initiator', initiator_configs),
        ('Responder', responder_configs),
    ):
        for io_cap in io_caps:
            delegate = Delegate(name, io_cap, key_dist, key_dist)
            configs.append(PairingConfig(sc, mitm, True, delegate))

    for pairing_config1, pairing_config2 in itertools.product(
        initiator_configs, responder_configs
    ):
        logger.info(
            f'########## self_smp with {pairing_config1} and {pairing_config2}'
        )

        # Delegates are reused across iterations, so reset them before each run
        for config in (pairing_config1, pairing_config2):
            if config:
                config.delegate.reset()

        # Cross-link the two delegates when both sides have one
        if pairing_config1 and pairing_config2:
            pairing_config1.delegate.peer_delegate = pairing_config2.delegate
            pairing_config2.delegate.peer_delegate = pairing_config1.delegate

        await _test_self_smp_with_configs(pairing_config1, pairing_config2)
414
415
416# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_self_smp_reject():
    """Pairing fails with SMP_PAIRING_NOT_SUPPORTED_ERROR when device 1 refuses."""

    class RejectingDelegate(PairingDelegate):
        """Delegate whose `accept` always answers False."""

        def __init__(self):
            super().__init__(PairingDelegate.NO_OUTPUT_NO_INPUT)

        async def accept(self):
            return False

    rejecting_pairing_config = PairingConfig(delegate=RejectingDelegate())

    # Assume success until the expected protocol error proves otherwise
    paired = True
    try:
        await _test_self_smp_with_configs(None, rejecting_pairing_config)
    except ProtocolError as error:
        paired = False
        assert error.error_code == SMP_PAIRING_NOT_SUPPORTED_ERROR

    assert not paired
435
436
437# -----------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_self_smp_wrong_pin():
    """Pairing fails with SMP_CONFIRM_VALUE_FAILED_ERROR on a number mismatch."""

    class WrongPinDelegate(PairingDelegate):
        """Delegate whose `compare_numbers` always answers False."""

        def __init__(self):
            super().__init__(PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT)

        async def compare_numbers(self, number, digits):
            return False

    # The same config object is used for both sides
    wrong_pin_pairing_config = PairingConfig(delegate=WrongPinDelegate())

    # Assume success until the expected protocol error proves otherwise
    paired = True
    try:
        await _test_self_smp_with_configs(
            wrong_pin_pairing_config, wrong_pin_pairing_config
        )
    except ProtocolError as error:
        paired = False
        assert error.error_code == SMP_CONFIRM_VALUE_FAILED_ERROR

    assert not paired
458
459
460# -----------------------------------------------------------------------------
async def run_test_self():
    """Run every test of this module in sequence, without pytest.

    Parametrized tests take their arguments from pytest when collected, so
    here they are called explicitly with the same parameter sets as their
    `parametrize` decorators. Calling `test_self_smp()` with no arguments
    would raise a TypeError.
    """
    await test_self_connection()

    # Same roles as the parametrization of test_self_classic_connection
    for responder_role in (BT_CENTRAL_ROLE, BT_PERIPHERAL_ROLE):
        await test_self_classic_connection(responder_role)

    await test_self_gatt()
    await test_self_gatt_long_read()

    # Same parameter sets as the parametrization of test_self_smp
    smp_parameters = itertools.chain(
        itertools.product([IO_CAP], SC, MITM, [15]),
        itertools.product(
            [[PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT]], SC, MITM, KEY_DIST
        ),
    )
    for io_caps, sc, mitm, key_dist in smp_parameters:
        await test_self_smp(io_caps, sc, mitm, key_dist)

    await test_self_smp_reject()
    await test_self_smp_wrong_pin()
468
469
470# -----------------------------------------------------------------------------
if __name__ == '__main__':
    # The log level comes from BUMBLE_LOGLEVEL (default 'INFO'), upper-cased
    log_level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()
    logging.basicConfig(level=log_level)
    asyncio.run(run_test_self())
474