1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import asyncio 19import itertools 20import logging 21import os 22import pytest 23 24from bumble.controller import Controller 25from bumble.core import BT_BR_EDR_TRANSPORT, BT_PERIPHERAL_ROLE, BT_CENTRAL_ROLE 26from bumble.link import LocalLink 27from bumble.device import Device, Peer 28from bumble.host import Host 29from bumble.gatt import Service, Characteristic 30from bumble.transport import AsyncPipeSink 31from bumble.smp import ( 32 PairingConfig, 33 PairingDelegate, 34 SMP_PAIRING_NOT_SUPPORTED_ERROR, 35 SMP_CONFIRM_VALUE_FAILED_ERROR, 36) 37from bumble.core import ProtocolError 38 39 40# ----------------------------------------------------------------------------- 41# Logging 42# ----------------------------------------------------------------------------- 43logger = logging.getLogger(__name__) 44 45 46# ----------------------------------------------------------------------------- 47class TwoDevices: 48 def __init__(self): 49 self.connections = [None, None] 50 51 addresses = ['F0:F1:F2:F3:F4:F5', 'F5:F4:F3:F2:F1:F0'] 52 self.link = LocalLink() 53 self.controllers = [ 54 Controller('C1', link=self.link, public_address=addresses[0]), 55 Controller('C2', link=self.link, public_address=addresses[1]), 56 ] 57 self.devices = [ 58 Device( 59 address=addresses[0], 60 host=Host(self.controllers[0], AsyncPipeSink(self.controllers[0])), 61 ), 62 Device( 63 address=addresses[1], 64 host=Host(self.controllers[1], AsyncPipeSink(self.controllers[1])), 65 ), 66 ] 67 68 self.paired = [None, None] 69 70 def on_connection(self, which, connection): 71 self.connections[which] = connection 72 73 def on_paired(self, which, keys): 74 self.paired[which] = keys 75 76 77# ----------------------------------------------------------------------------- 78@pytest.mark.asyncio 79async def test_self_connection(): 80 # Create two devices, each with a controller, attached to the same link 81 two_devices = TwoDevices() 82 83 # Attach listeners 84 two_devices.devices[0].on( 85 'connection', lambda connection: two_devices.on_connection(0, connection) 86 ) 87 two_devices.devices[1].on( 88 'connection', lambda connection: two_devices.on_connection(1, connection) 89 ) 90 91 # Start 92 await two_devices.devices[0].power_on() 93 await two_devices.devices[1].power_on() 94 95 # Connect the two devices 96 await two_devices.devices[0].connect(two_devices.devices[1].random_address) 97 98 # Check the post conditions 99 assert two_devices.connections[0] is not None 100 assert two_devices.connections[1] is not None 101 102 103# ----------------------------------------------------------------------------- 104@pytest.mark.asyncio 105@pytest.mark.parametrize( 106 'responder_role,', 107 (BT_CENTRAL_ROLE, BT_PERIPHERAL_ROLE), 108) 109async def test_self_classic_connection(responder_role): 110 # Create two devices, each with a controller, attached to the same link 111 two_devices = TwoDevices() 112 113 # Attach listeners 114 two_devices.devices[0].on( 115 'connection', lambda connection: two_devices.on_connection(0, connection) 116 ) 117 two_devices.devices[1].on( 118 'connection', lambda connection: two_devices.on_connection(1, connection) 119 ) 120 121 # Enable Classic connections 122 two_devices.devices[0].classic_enabled = True 123 two_devices.devices[1].classic_enabled = True 124 125 # Start 126 await two_devices.devices[0].power_on() 127 await two_devices.devices[1].power_on() 128 129 # Connect the two devices 130 await asyncio.gather( 131 two_devices.devices[0].connect( 132 two_devices.devices[1].public_address, transport=BT_BR_EDR_TRANSPORT 133 ), 134 two_devices.devices[1].accept( 135 two_devices.devices[0].public_address, responder_role 136 ), 137 ) 138 139 # Check the post conditions 140 assert two_devices.connections[0] is not None 141 assert two_devices.connections[1] is not None 142 143 # Check the role 144 assert two_devices.connections[0].role != responder_role 145 assert two_devices.connections[1].role == responder_role 146 147 # Role switch 148 await two_devices.connections[0].switch_role(responder_role) 149 150 # Check the role 151 assert two_devices.connections[0].role == responder_role 152 assert two_devices.connections[1].role != responder_role 153 154 await two_devices.connections[0].disconnect() 155 156 157# ----------------------------------------------------------------------------- 158@pytest.mark.asyncio 159async def test_self_gatt(): 160 # Create two devices, each with a controller, attached to the same link 161 two_devices = TwoDevices() 162 163 # Add some GATT characteristics to device 1 164 c1 = Characteristic( 165 '3A143AD7-D4A7-436B-97D6-5B62C315E833', 166 Characteristic.READ, 167 Characteristic.READABLE, 168 bytes([1, 2, 3]), 169 ) 170 c2 = Characteristic( 171 '9557CCE2-DB37-46EB-94C4-50AE5B9CB0F8', 172 Characteristic.READ | Characteristic.WRITE, 173 Characteristic.READABLE | Characteristic.WRITEABLE, 174 bytes([4, 5, 6]), 175 ) 176 c3 = Characteristic( 177 '84FC1A2E-C52D-4A2D-B8C3-8855BAB86638', 178 Characteristic.READ | Characteristic.WRITE_WITHOUT_RESPONSE, 179 Characteristic.READABLE | Characteristic.WRITEABLE, 180 bytes([7, 8, 9]), 181 ) 182 c4 = Characteristic( 183 '84FC1A2E-C52D-4A2D-B8C3-8855BAB86638', 184 Characteristic.READ | Characteristic.NOTIFY | Characteristic.INDICATE, 185 Characteristic.READABLE, 186 bytes([1, 1, 1]), 187 ) 188 189 s1 = Service('8140E247-04F0-42C1-BC34-534C344DAFCA', [c1, c2, c3]) 190 s2 = Service('97210A0F-1875-4D05-9E5D-326EB171257A', [c4]) 191 two_devices.devices[1].add_services([s1, s2]) 192 193 # Start 194 await two_devices.devices[0].power_on() 195 await two_devices.devices[1].power_on() 196 197 # Connect the two devices 198 connection = await two_devices.devices[0].connect( 199 two_devices.devices[1].random_address 200 ) 201 peer = Peer(connection) 202 203 bogus_uuid = 'A0AA6007-0B48-4BBE-80AC-0DE9AAF541EA' 204 result = await peer.discover_services([bogus_uuid]) 205 assert result == [] 206 services = peer.get_services_by_uuid(bogus_uuid) 207 assert len(services) == 0 208 209 result = await peer.discover_service(s1.uuid) 210 assert len(result) == 1 211 services = peer.get_services_by_uuid(s1.uuid) 212 assert len(services) == 1 213 s = services[0] 214 assert services[0].uuid == s1.uuid 215 216 result = await peer.discover_characteristics([c1.uuid], s) 217 assert len(result) == 1 218 characteristics = peer.get_characteristics_by_uuid(c1.uuid) 219 assert len(characteristics) == 1 220 c = characteristics[0] 221 assert c.uuid == c1.uuid 222 result = await peer.read_value(c) 223 assert result is not None 224 assert result == c1.value 225 226 227# ----------------------------------------------------------------------------- 228@pytest.mark.asyncio 229async def test_self_gatt_long_read(): 230 # Create two devices, each with a controller, attached to the same link 231 two_devices = TwoDevices() 232 233 # Add some GATT characteristics to device 1 234 characteristics = [ 235 Characteristic( 236 f'3A143AD7-D4A7-436B-97D6-5B62C315{i:04X}', 237 Characteristic.READ, 238 Characteristic.READABLE, 239 bytes([x & 255 for x in range(i)]), 240 ) 241 for i in range(0, 513) 242 ] 243 244 service = Service('8140E247-04F0-42C1-BC34-534C344DAFCA', characteristics) 245 two_devices.devices[1].add_service(service) 246 247 # Start 248 await two_devices.devices[0].power_on() 249 await two_devices.devices[1].power_on() 250 251 # Connect the two devices 252 connection = await two_devices.devices[0].connect( 253 two_devices.devices[1].random_address 254 ) 255 peer = Peer(connection) 256 257 result = await peer.discover_service(service.uuid) 258 assert len(result) == 1 259 found_service = result[0] 260 found_characteristics = await found_service.discover_characteristics() 261 assert len(found_characteristics) == 513 262 for (i, characteristic) in enumerate(found_characteristics): 263 value = await characteristic.read_value() 264 assert value == characteristics[i].value 265 266 267# ----------------------------------------------------------------------------- 268async def _test_self_smp_with_configs(pairing_config1, pairing_config2): 269 # Create two devices, each with a controller, attached to the same link 270 two_devices = TwoDevices() 271 272 # Start 273 await two_devices.devices[0].power_on() 274 await two_devices.devices[1].power_on() 275 276 # Attach listeners 277 two_devices.devices[0].on( 278 'connection', lambda connection: two_devices.on_connection(0, connection) 279 ) 280 two_devices.devices[1].on( 281 'connection', lambda connection: two_devices.on_connection(1, connection) 282 ) 283 284 # Connect the two devices 285 connection = await two_devices.devices[0].connect( 286 two_devices.devices[1].random_address 287 ) 288 assert not connection.is_encrypted 289 290 # Attach connection listeners 291 two_devices.connections[0].on( 292 'pairing', lambda keys: two_devices.on_paired(0, keys) 293 ) 294 two_devices.connections[1].on( 295 'pairing', lambda keys: two_devices.on_paired(1, keys) 296 ) 297 298 # Set up the pairing configs 299 if pairing_config1: 300 two_devices.devices[ 301 0 302 ].pairing_config_factory = lambda connection: pairing_config1 303 if pairing_config2: 304 two_devices.devices[ 305 1 306 ].pairing_config_factory = lambda connection: pairing_config2 307 308 # Pair 309 await two_devices.devices[0].pair(connection) 310 assert connection.is_encrypted 311 assert two_devices.paired[0] is not None 312 assert two_devices.paired[1] is not None 313 314 315# ----------------------------------------------------------------------------- 316IO_CAP = [ 317 PairingDelegate.NO_OUTPUT_NO_INPUT, 318 PairingDelegate.KEYBOARD_INPUT_ONLY, 319 PairingDelegate.DISPLAY_OUTPUT_ONLY, 320 PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT, 321 PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT, 322] 323SC = [False, True] 324MITM = [False, True] 325# Key distribution is a 4-bit bitmask 326KEY_DIST = range(16) 327 328 329@pytest.mark.asyncio 330@pytest.mark.parametrize( 331 'io_caps, sc, mitm, key_dist', 332 itertools.chain( 333 itertools.product([IO_CAP], SC, MITM, [15]), 334 itertools.product( 335 [[PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT]], SC, MITM, KEY_DIST 336 ), 337 ), 338) 339async def test_self_smp(io_caps, sc, mitm, key_dist): 340 class Delegate(PairingDelegate): 341 def __init__( 342 self, 343 name, 344 io_capability, 345 local_initiator_key_distribution, 346 local_responder_key_distribution, 347 ): 348 super().__init__( 349 io_capability, 350 local_initiator_key_distribution, 351 local_responder_key_distribution, 352 ) 353 self.name = name 354 self.reset() 355 356 def reset(self): 357 self.peer_delegate = None 358 self.number = asyncio.get_running_loop().create_future() 359 360 # pylint: disable-next=unused-argument 361 async def compare_numbers(self, number, digits): 362 if self.peer_delegate is None: 363 logger.warning(f'[{self.name}] no peer delegate') 364 return False 365 await self.display_number(number, digits=6) 366 logger.debug(f'[{self.name}] waiting for peer number') 367 peer_number = await self.peer_delegate.number 368 logger.debug(f'[{self.name}] comparing numbers: {number} and {peer_number}') 369 return number == peer_number 370 371 async def get_number(self): 372 if self.peer_delegate is None: 373 logger.warning(f'[{self.name}] no peer delegate') 374 return 0 375 else: 376 if ( 377 self.peer_delegate.io_capability 378 == PairingDelegate.KEYBOARD_INPUT_ONLY 379 ): 380 peer_number = 6789 381 else: 382 logger.debug(f'[{self.name}] waiting for peer number') 383 peer_number = await self.peer_delegate.number 384 logger.debug(f'[{self.name}] returning number: {peer_number}') 385 return peer_number 386 387 async def display_number(self, number, digits): 388 logger.debug(f'[{self.name}] displaying number: {number}') 389 self.number.set_result(number) 390 391 def __str__(self): 392 return f'Delegate(name={self.name}, io_capability={self.io_capability})' 393 394 pairing_config_sets = [('Initiator', [None]), ('Responder', [None])] 395 for pairing_config_set in pairing_config_sets: 396 for io_cap in io_caps: 397 delegate = Delegate(pairing_config_set[0], io_cap, key_dist, key_dist) 398 pairing_config_set[1].append(PairingConfig(sc, mitm, True, delegate)) 399 400 for pairing_config1 in pairing_config_sets[0][1]: 401 for pairing_config2 in pairing_config_sets[1][1]: 402 logger.info( 403 f'########## self_smp with {pairing_config1} and {pairing_config2}' 404 ) 405 if pairing_config1: 406 pairing_config1.delegate.reset() 407 if pairing_config2: 408 pairing_config2.delegate.reset() 409 if pairing_config1 and pairing_config2: 410 pairing_config1.delegate.peer_delegate = pairing_config2.delegate 411 pairing_config2.delegate.peer_delegate = pairing_config1.delegate 412 413 await _test_self_smp_with_configs(pairing_config1, pairing_config2) 414 415 416# ----------------------------------------------------------------------------- 417@pytest.mark.asyncio 418async def test_self_smp_reject(): 419 class RejectingDelegate(PairingDelegate): 420 def __init__(self): 421 super().__init__(PairingDelegate.NO_OUTPUT_NO_INPUT) 422 423 async def accept(self): 424 return False 425 426 rejecting_pairing_config = PairingConfig(delegate=RejectingDelegate()) 427 paired = False 428 try: 429 await _test_self_smp_with_configs(None, rejecting_pairing_config) 430 paired = True 431 except ProtocolError as error: 432 assert error.error_code == SMP_PAIRING_NOT_SUPPORTED_ERROR 433 434 assert not paired 435 436 437# ----------------------------------------------------------------------------- 438@pytest.mark.asyncio 439async def test_self_smp_wrong_pin(): 440 class WrongPinDelegate(PairingDelegate): 441 def __init__(self): 442 super().__init__(PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT) 443 444 async def compare_numbers(self, number, digits): 445 return False 446 447 wrong_pin_pairing_config = PairingConfig(delegate=WrongPinDelegate()) 448 paired = False 449 try: 450 await _test_self_smp_with_configs( 451 wrong_pin_pairing_config, wrong_pin_pairing_config 452 ) 453 paired = True 454 except ProtocolError as error: 455 assert error.error_code == SMP_CONFIRM_VALUE_FAILED_ERROR 456 457 assert not paired 458 459 460# ----------------------------------------------------------------------------- 461async def run_test_self(): 462 await test_self_connection() 463 await test_self_gatt() 464 await test_self_gatt_long_read() 465 await test_self_smp() 466 await test_self_smp_reject() 467 await test_self_smp_wrong_pin() 468 469 470# ----------------------------------------------------------------------------- 471if __name__ == '__main__': 472 logging.basicConfig(level=os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) 473 asyncio.run(run_test_self()) 474