1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import asyncio 19import os 20import logging 21import click 22from prompt_toolkit.shortcuts import PromptSession 23 24from bumble.colors import color 25from bumble.device import Device, Peer 26from bumble.transport import open_transport_or_link 27from bumble.pairing import OobData, PairingDelegate, PairingConfig 28from bumble.smp import OobContext, OobLegacyContext 29from bumble.smp import error_name as smp_error_name 30from bumble.keys import JsonKeyStore 31from bumble.core import ( 32 AdvertisingData, 33 ProtocolError, 34 BT_LE_TRANSPORT, 35 BT_BR_EDR_TRANSPORT, 36) 37from bumble.gatt import ( 38 GATT_DEVICE_NAME_CHARACTERISTIC, 39 GATT_GENERIC_ACCESS_SERVICE, 40 Service, 41 Characteristic, 42 CharacteristicValue, 43) 44from bumble.att import ( 45 ATT_Error, 46 ATT_INSUFFICIENT_AUTHENTICATION_ERROR, 47 ATT_INSUFFICIENT_ENCRYPTION_ERROR, 48) 49 50 51# ----------------------------------------------------------------------------- 52class Waiter: 53 instance = None 54 55 def __init__(self, linger=False): 56 self.done = asyncio.get_running_loop().create_future() 57 self.linger = linger 58 59 def terminate(self): 60 if not self.linger: 61 self.done.set_result(None) 62 63 async def wait_until_terminated(self): 64 return await self.done 65 66 67# ----------------------------------------------------------------------------- 68class Delegate(PairingDelegate): 69 def __init__(self, mode, connection, capability_string, do_prompt): 70 super().__init__( 71 io_capability={ 72 'keyboard': PairingDelegate.KEYBOARD_INPUT_ONLY, 73 'display': PairingDelegate.DISPLAY_OUTPUT_ONLY, 74 'display+keyboard': PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT, 75 'display+yes/no': PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT, 76 'none': PairingDelegate.NO_OUTPUT_NO_INPUT, 77 }[capability_string.lower()] 78 ) 79 80 self.mode = mode 81 self.peer = Peer(connection) 82 self.peer_name = None 83 self.do_prompt = do_prompt 84 85 def print(self, message): 86 print(color(message, 'yellow')) 87 88 async def prompt(self, message): 89 # Wait a bit to allow some of the log lines to print before we prompt 90 await asyncio.sleep(1) 91 92 session = PromptSession(message) 93 response = await session.prompt_async() 94 return response.lower().strip() 95 96 async def update_peer_name(self): 97 if self.peer_name is not None: 98 # We already asked the peer 99 return 100 101 # Try to get the peer's name 102 if self.peer: 103 peer_name = await get_peer_name(self.peer, self.mode) 104 self.peer_name = f'{peer_name or ""} [{self.peer.connection.peer_address}]' 105 else: 106 self.peer_name = '[?]' 107 108 async def accept(self): 109 if self.do_prompt: 110 await self.update_peer_name() 111 112 # Prompt for acceptance 113 self.print('###-----------------------------------') 114 self.print(f'### Pairing request from {self.peer_name}') 115 self.print('###-----------------------------------') 116 while True: 117 response = await self.prompt('>>> Accept? ') 118 119 if response == 'yes': 120 return True 121 122 if response == 'no': 123 return False 124 125 # Accept silently 126 return True 127 128 async def compare_numbers(self, number, digits): 129 await self.update_peer_name() 130 131 # Prompt for a numeric comparison 132 self.print('###-----------------------------------') 133 self.print(f'### Pairing with {self.peer_name}') 134 self.print('###-----------------------------------') 135 while True: 136 response = await self.prompt( 137 f'>>> Does the other device display {number:0{digits}}? ' 138 ) 139 140 if response == 'yes': 141 return True 142 143 if response == 'no': 144 return False 145 146 async def get_number(self): 147 await self.update_peer_name() 148 149 # Prompt for a PIN 150 while True: 151 try: 152 self.print('###-----------------------------------') 153 self.print(f'### Pairing with {self.peer_name}') 154 self.print('###-----------------------------------') 155 return int(await self.prompt('>>> Enter PIN: ')) 156 except ValueError: 157 pass 158 159 async def display_number(self, number, digits): 160 await self.update_peer_name() 161 162 # Display a PIN code 163 self.print('###-----------------------------------') 164 self.print(f'### Pairing with {self.peer_name}') 165 self.print(f'### PIN: {number:0{digits}}') 166 self.print('###-----------------------------------') 167 168 async def get_string(self, max_length: int): 169 await self.update_peer_name() 170 171 # Prompt a PIN (for legacy pairing in classic) 172 self.print('###-----------------------------------') 173 self.print(f'### Pairing with {self.peer_name}') 174 self.print('###-----------------------------------') 175 count = 0 176 while True: 177 response = await self.prompt('>>> Enter PIN (1-6 chars):') 178 if len(response) == 0: 179 count += 1 180 if count > 3: 181 self.print('too many tries, stopping the pairing') 182 return None 183 184 self.print('no PIN was entered, try again') 185 continue 186 return response 187 188 189# ----------------------------------------------------------------------------- 190async def get_peer_name(peer, mode): 191 if mode == 'classic': 192 return await peer.request_name() 193 194 # Try to get the peer name from GATT 195 services = await peer.discover_service(GATT_GENERIC_ACCESS_SERVICE) 196 if not services: 197 return None 198 199 values = await peer.read_characteristics_by_uuid( 200 GATT_DEVICE_NAME_CHARACTERISTIC, services[0] 201 ) 202 if values: 203 return values[0].decode('utf-8') 204 205 return None 206 207 208# ----------------------------------------------------------------------------- 209AUTHENTICATION_ERROR_RETURNED = [False, False] 210 211 212def read_with_error(connection): 213 if not connection.is_encrypted: 214 raise ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR) 215 216 if AUTHENTICATION_ERROR_RETURNED[0]: 217 return bytes([1]) 218 219 AUTHENTICATION_ERROR_RETURNED[0] = True 220 raise ATT_Error(ATT_INSUFFICIENT_AUTHENTICATION_ERROR) 221 222 223def write_with_error(connection, _value): 224 if not connection.is_encrypted: 225 raise ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR) 226 227 if not AUTHENTICATION_ERROR_RETURNED[1]: 228 AUTHENTICATION_ERROR_RETURNED[1] = True 229 raise ATT_Error(ATT_INSUFFICIENT_AUTHENTICATION_ERROR) 230 231 232# ----------------------------------------------------------------------------- 233def on_connection(connection, request): 234 print(color(f'<<< Connection: {connection}', 'green')) 235 236 # Listen for pairing events 237 connection.on('pairing_start', on_pairing_start) 238 connection.on('pairing', lambda keys: on_pairing(connection.peer_address, keys)) 239 connection.on('pairing_failure', on_pairing_failure) 240 241 # Listen for encryption changes 242 connection.on( 243 'connection_encryption_change', 244 lambda: on_connection_encryption_change(connection), 245 ) 246 247 # Request pairing if needed 248 if request: 249 print(color('>>> Requesting pairing', 'green')) 250 connection.request_pairing() 251 252 253# ----------------------------------------------------------------------------- 254def on_connection_encryption_change(connection): 255 print(color('@@@-----------------------------------', 'blue')) 256 print( 257 color( 258 f'@@@ Connection is {"" if connection.is_encrypted else "not"}encrypted', 259 'blue', 260 ) 261 ) 262 print(color('@@@-----------------------------------', 'blue')) 263 264 265# ----------------------------------------------------------------------------- 266def on_pairing_start(): 267 print(color('***-----------------------------------', 'magenta')) 268 print(color('*** Pairing starting', 'magenta')) 269 print(color('***-----------------------------------', 'magenta')) 270 271 272# ----------------------------------------------------------------------------- 273def on_pairing(address, keys): 274 print(color('***-----------------------------------', 'cyan')) 275 print(color(f'*** Paired! (peer identity={address})', 'cyan')) 276 keys.print(prefix=color('*** ', 'cyan')) 277 print(color('***-----------------------------------', 'cyan')) 278 Waiter.instance.terminate() 279 280 281# ----------------------------------------------------------------------------- 282def on_pairing_failure(reason): 283 print(color('***-----------------------------------', 'red')) 284 print(color(f'*** Pairing failed: {smp_error_name(reason)}', 'red')) 285 print(color('***-----------------------------------', 'red')) 286 Waiter.instance.terminate() 287 288 289# ----------------------------------------------------------------------------- 290async def pair( 291 mode, 292 sc, 293 mitm, 294 bond, 295 ctkd, 296 linger, 297 io, 298 oob, 299 prompt, 300 request, 301 print_keys, 302 keystore_file, 303 device_config, 304 hci_transport, 305 address_or_name, 306): 307 Waiter.instance = Waiter(linger=linger) 308 309 print('<<< connecting to HCI...') 310 async with await open_transport_or_link(hci_transport) as (hci_source, hci_sink): 311 print('<<< connected') 312 313 # Create a device to manage the host 314 device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink) 315 316 # Expose a GATT characteristic that can be used to trigger pairing by 317 # responding with an authentication error when read 318 if mode == 'le': 319 device.le_enabled = True 320 device.add_service( 321 Service( 322 '50DB505C-8AC4-4738-8448-3B1D9CC09CC5', 323 [ 324 Characteristic( 325 '552957FB-CF1F-4A31-9535-E78847E1A714', 326 Characteristic.Properties.READ 327 | Characteristic.Properties.WRITE, 328 Characteristic.READABLE | Characteristic.WRITEABLE, 329 CharacteristicValue( 330 read=read_with_error, write=write_with_error 331 ), 332 ) 333 ], 334 ) 335 ) 336 337 # Select LE or Classic 338 if mode == 'classic': 339 device.classic_enabled = True 340 device.classic_smp_enabled = ctkd 341 342 # Get things going 343 await device.power_on() 344 345 # Set a custom keystore if specified on the command line 346 if keystore_file: 347 device.keystore = JsonKeyStore.from_device(device, filename=keystore_file) 348 349 # Print the existing keys before pairing 350 if print_keys and device.keystore: 351 print(color('@@@-----------------------------------', 'blue')) 352 print(color('@@@ Pairing Keys:', 'blue')) 353 await device.keystore.print(prefix=color('@@@ ', 'blue')) 354 print(color('@@@-----------------------------------', 'blue')) 355 356 # Create an OOB context if needed 357 if oob: 358 our_oob_context = OobContext() 359 shared_data = ( 360 None 361 if oob == '-' 362 else OobData.from_ad(AdvertisingData.from_bytes(bytes.fromhex(oob))) 363 ) 364 legacy_context = OobLegacyContext() 365 oob_contexts = PairingConfig.OobConfig( 366 our_context=our_oob_context, 367 peer_data=shared_data, 368 legacy_context=legacy_context, 369 ) 370 oob_data = OobData( 371 address=device.random_address, 372 shared_data=shared_data, 373 legacy_context=legacy_context, 374 ) 375 print(color('@@@-----------------------------------', 'yellow')) 376 print(color('@@@ OOB Data:', 'yellow')) 377 print(color(f'@@@ {our_oob_context.share()}', 'yellow')) 378 print(color(f'@@@ TK={legacy_context.tk.hex()}', 'yellow')) 379 print(color(f'@@@ HEX: ({bytes(oob_data.to_ad()).hex()})', 'yellow')) 380 print(color('@@@-----------------------------------', 'yellow')) 381 else: 382 oob_contexts = None 383 384 # Set up a pairing config factory 385 device.pairing_config_factory = lambda connection: PairingConfig( 386 sc=sc, 387 mitm=mitm, 388 bonding=bond, 389 oob=oob_contexts, 390 delegate=Delegate(mode, connection, io, prompt), 391 ) 392 393 # Connect to a peer or wait for a connection 394 device.on('connection', lambda connection: on_connection(connection, request)) 395 if address_or_name is not None: 396 print(color(f'=== Connecting to {address_or_name}...', 'green')) 397 connection = await device.connect( 398 address_or_name, 399 transport=BT_LE_TRANSPORT if mode == 'le' else BT_BR_EDR_TRANSPORT, 400 ) 401 402 if not request: 403 try: 404 if mode == 'le': 405 await connection.pair() 406 else: 407 await connection.authenticate() 408 except ProtocolError as error: 409 print(color(f'Pairing failed: {error}', 'red')) 410 411 else: 412 if mode == 'le': 413 # Advertise so that peers can find us and connect 414 await device.start_advertising(auto_restart=True) 415 else: 416 # Become discoverable and connectable 417 await device.set_discoverable(True) 418 await device.set_connectable(True) 419 420 # Run until the user asks to exit 421 await Waiter.instance.wait_until_terminated() 422 423 424# ----------------------------------------------------------------------------- 425class LogHandler(logging.Handler): 426 def __init__(self): 427 super().__init__() 428 self.setFormatter(logging.Formatter('%(levelname)s:%(name)s:%(message)s')) 429 430 def emit(self, record): 431 message = self.format(record) 432 print(message) 433 434 435# ----------------------------------------------------------------------------- 436@click.command() 437@click.option( 438 '--mode', type=click.Choice(['le', 'classic']), default='le', show_default=True 439) 440@click.option( 441 '--sc', 442 type=bool, 443 default=True, 444 help='Use the Secure Connections protocol', 445 show_default=True, 446) 447@click.option( 448 '--mitm', type=bool, default=True, help='Request MITM protection', show_default=True 449) 450@click.option( 451 '--bond', type=bool, default=True, help='Enable bonding', show_default=True 452) 453@click.option( 454 '--ctkd', 455 type=bool, 456 default=True, 457 help='Enable CTKD', 458 show_default=True, 459) 460@click.option('--linger', default=False, is_flag=True, help='Linger after pairing') 461@click.option( 462 '--io', 463 type=click.Choice( 464 ['keyboard', 'display', 'display+keyboard', 'display+yes/no', 'none'] 465 ), 466 default='display+keyboard', 467 show_default=True, 468) 469@click.option( 470 '--oob', 471 metavar='<oob-data-hex>', 472 help=( 473 'Use OOB pairing with this data from the peer ' 474 '(use "-" to enable OOB without peer data)' 475 ), 476) 477@click.option('--prompt', is_flag=True, help='Prompt to accept/reject pairing request') 478@click.option( 479 '--request', is_flag=True, help='Request that the connecting peer initiate pairing' 480) 481@click.option('--print-keys', is_flag=True, help='Print the bond keys before pairing') 482@click.option( 483 '--keystore-file', 484 metavar='<filename>', 485 help='File in which to store the pairing keys', 486) 487@click.argument('device-config') 488@click.argument('hci_transport') 489@click.argument('address-or-name', required=False) 490def main( 491 mode, 492 sc, 493 mitm, 494 bond, 495 ctkd, 496 linger, 497 io, 498 oob, 499 prompt, 500 request, 501 print_keys, 502 keystore_file, 503 device_config, 504 hci_transport, 505 address_or_name, 506): 507 # Setup logging 508 log_handler = LogHandler() 509 root_logger = logging.getLogger() 510 root_logger.addHandler(log_handler) 511 root_logger.setLevel(os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) 512 513 # Pair 514 asyncio.run( 515 pair( 516 mode, 517 sc, 518 mitm, 519 bond, 520 ctkd, 521 linger, 522 io, 523 oob, 524 prompt, 525 request, 526 print_keys, 527 keystore_file, 528 device_config, 529 hci_transport, 530 address_or_name, 531 ) 532 ) 533 534 535# ----------------------------------------------------------------------------- 536if __name__ == '__main__': 537 main() 538