1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import asyncio 19import os 20import logging 21import click 22import aioconsole 23from colors import color 24 25from bumble.device import Device, Peer 26from bumble.transport import open_transport_or_link 27from bumble.smp import PairingDelegate, PairingConfig 28from bumble.smp import error_name as smp_error_name 29from bumble.keys import JsonKeyStore 30from bumble.core import ProtocolError 31from bumble.gatt import ( 32 GATT_DEVICE_NAME_CHARACTERISTIC, 33 GATT_GENERIC_ACCESS_SERVICE, 34 Service, 35 Characteristic, 36 CharacteristicValue 37) 38from bumble.att import ( 39 ATT_Error, 40 ATT_INSUFFICIENT_AUTHENTICATION_ERROR, 41 ATT_INSUFFICIENT_ENCRYPTION_ERROR 42) 43 44 45# ----------------------------------------------------------------------------- 46class Delegate(PairingDelegate): 47 def __init__(self, mode, connection, capability_string, prompt): 48 super().__init__({ 49 'keyboard': PairingDelegate.KEYBOARD_INPUT_ONLY, 50 'display': PairingDelegate.DISPLAY_OUTPUT_ONLY, 51 'display+keyboard': PairingDelegate.DISPLAY_OUTPUT_AND_KEYBOARD_INPUT, 52 'display+yes/no': PairingDelegate.DISPLAY_OUTPUT_AND_YES_NO_INPUT, 53 'none': PairingDelegate.NO_OUTPUT_NO_INPUT 54 }[capability_string.lower()]) 55 56 self.mode = mode 57 self.peer = Peer(connection) 58 self.peer_name = None 59 self.prompt = prompt 60 61 async def update_peer_name(self): 62 if self.peer_name is not None: 63 # We already asked the peer 64 return 65 66 # Try to get the peer's name 67 if self.peer: 68 peer_name = await get_peer_name(self.peer, self.mode) 69 self.peer_name = f'{peer_name or ""} [{self.peer.connection.peer_address}]' 70 else: 71 self.peer_name = '[?]' 72 73 async def accept(self): 74 if self.prompt: 75 await self.update_peer_name() 76 77 # Wait a bit to allow some of the log lines to print before we prompt 78 await asyncio.sleep(1) 79 80 # Prompt for acceptance 81 print(color('###-----------------------------------', 'yellow')) 82 print(color(f'### Pairing request from {self.peer_name}', 'yellow')) 83 print(color('###-----------------------------------', 'yellow')) 84 while True: 85 response = await aioconsole.ainput(color('>>> Accept? ', 'yellow')) 86 response = response.lower().strip() 87 if response == 'yes': 88 return True 89 elif response == 'no': 90 return False 91 else: 92 # Accept silently 93 return True 94 95 async def compare_numbers(self, number, digits): 96 await self.update_peer_name() 97 98 # Wait a bit to allow some of the log lines to print before we prompt 99 await asyncio.sleep(1) 100 101 # Prompt for a numeric comparison 102 print(color('###-----------------------------------', 'yellow')) 103 print(color(f'### Pairing with {self.peer_name}', 'yellow')) 104 print(color('###-----------------------------------', 'yellow')) 105 while True: 106 response = await aioconsole.ainput(color(f'>>> Does the other device display {number:0{digits}}? ', 'yellow')) 107 response = response.lower().strip() 108 if response == 'yes': 109 return True 110 elif response == 'no': 111 return False 112 113 async def get_number(self): 114 await self.update_peer_name() 115 116 # Wait a bit to allow some of the log lines to print before we prompt 117 await asyncio.sleep(1) 118 119 # Prompt for a PIN 120 while True: 121 try: 122 print(color('###-----------------------------------', 'yellow')) 123 print(color(f'### Pairing with {self.peer_name}', 'yellow')) 124 print(color('###-----------------------------------', 'yellow')) 125 return int(await aioconsole.ainput(color('>>> Enter PIN: ', 'yellow'))) 126 except ValueError: 127 pass 128 129 async def display_number(self, number, digits): 130 await self.update_peer_name() 131 132 # Wait a bit to allow some of the log lines to print before we prompt 133 await asyncio.sleep(1) 134 135 # Display a PIN code 136 print(color('###-----------------------------------', 'yellow')) 137 print(color(f'### Pairing with {self.peer_name}', 'yellow')) 138 print(color(f'### PIN: {number:0{digits}}', 'yellow')) 139 print(color('###-----------------------------------', 'yellow')) 140 141 142# ----------------------------------------------------------------------------- 143async def get_peer_name(peer, mode): 144 if mode == 'classic': 145 return await peer.request_name() 146 else: 147 # Try to get the peer name from GATT 148 services = await peer.discover_service(GATT_GENERIC_ACCESS_SERVICE) 149 if not services: 150 return None 151 152 values = await peer.read_characteristics_by_uuid(GATT_DEVICE_NAME_CHARACTERISTIC, services[0]) 153 if values: 154 return values[0].decode('utf-8') 155 156 157# ----------------------------------------------------------------------------- 158AUTHENTICATION_ERROR_RETURNED = [False, False] 159 160 161def read_with_error(connection): 162 if not connection.is_encrypted: 163 raise ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR) 164 165 if AUTHENTICATION_ERROR_RETURNED[0]: 166 return bytes([1]) 167 else: 168 AUTHENTICATION_ERROR_RETURNED[0] = True 169 raise ATT_Error(ATT_INSUFFICIENT_AUTHENTICATION_ERROR) 170 171 172def write_with_error(connection, value): 173 if not connection.is_encrypted: 174 raise ATT_Error(ATT_INSUFFICIENT_ENCRYPTION_ERROR) 175 176 if not AUTHENTICATION_ERROR_RETURNED[1]: 177 AUTHENTICATION_ERROR_RETURNED[1] = True 178 raise ATT_Error(ATT_INSUFFICIENT_AUTHENTICATION_ERROR) 179 180 181# ----------------------------------------------------------------------------- 182def on_connection(connection, request): 183 print(color(f'<<< Connection: {connection}', 'green')) 184 185 # Listen for pairing events 186 connection.on('pairing_start', on_pairing_start) 187 connection.on('pairing', on_pairing) 188 connection.on('pairing_failure', on_pairing_failure) 189 190 # Listen for encryption changes 191 connection.on( 192 'connection_encryption_change', 193 lambda: on_connection_encryption_change(connection) 194 ) 195 196 # Request pairing if needed 197 if request: 198 print(color('>>> Requesting pairing', 'green')) 199 connection.request_pairing() 200 201 202# ----------------------------------------------------------------------------- 203def on_connection_encryption_change(connection): 204 print(color('@@@-----------------------------------', 'blue')) 205 print(color(f'@@@ Connection is {"" if connection.is_encrypted else "not"}encrypted', 'blue')) 206 print(color('@@@-----------------------------------', 'blue')) 207 208 209# ----------------------------------------------------------------------------- 210def on_pairing_start(): 211 print(color('***-----------------------------------', 'magenta')) 212 print(color('*** Pairing starting', 'magenta')) 213 print(color('***-----------------------------------', 'magenta')) 214 215 216# ----------------------------------------------------------------------------- 217def on_pairing(keys): 218 print(color('***-----------------------------------', 'cyan')) 219 print(color('*** Paired!', 'cyan')) 220 keys.print(prefix=color('*** ', 'cyan')) 221 print(color('***-----------------------------------', 'cyan')) 222 223 224# ----------------------------------------------------------------------------- 225def on_pairing_failure(reason): 226 print(color('***-----------------------------------', 'red')) 227 print(color(f'*** Pairing failed: {smp_error_name(reason)}', 'red')) 228 print(color('***-----------------------------------', 'red')) 229 230 231# ----------------------------------------------------------------------------- 232async def pair( 233 mode, 234 sc, 235 mitm, 236 bond, 237 io, 238 prompt, 239 request, 240 print_keys, 241 keystore_file, 242 device_config, 243 hci_transport, 244 address_or_name 245): 246 print('<<< connecting to HCI...') 247 async with await open_transport_or_link(hci_transport) as (hci_source, hci_sink): 248 print('<<< connected') 249 250 # Create a device to manage the host 251 device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink) 252 253 # Set a custom keystore if specified on the command line 254 if keystore_file: 255 device.keystore = JsonKeyStore(namespace=None, filename=keystore_file) 256 257 # Print the existing keys before pairing 258 if print_keys and device.keystore: 259 print(color('@@@-----------------------------------', 'blue')) 260 print(color('@@@ Pairing Keys:', 'blue')) 261 await device.keystore.print(prefix=color('@@@ ', 'blue')) 262 print(color('@@@-----------------------------------', 'blue')) 263 264 # Expose a GATT characteristic that can be used to trigger pairing by 265 # responding with an authentication error when read 266 if mode == 'le': 267 device.add_service( 268 Service( 269 '50DB505C-8AC4-4738-8448-3B1D9CC09CC5', 270 [ 271 Characteristic( 272 '552957FB-CF1F-4A31-9535-E78847E1A714', 273 Characteristic.READ | Characteristic.WRITE, 274 Characteristic.READABLE | Characteristic.WRITEABLE, 275 CharacteristicValue(read=read_with_error, write=write_with_error) 276 ) 277 ] 278 ) 279 ) 280 281 # Select LE or Classic 282 if mode == 'classic': 283 device.classic_enabled = True 284 device.le_enabled = False 285 286 # Get things going 287 await device.power_on() 288 289 # Set up a pairing config factory 290 device.pairing_config_factory = lambda connection: PairingConfig( 291 sc, 292 mitm, 293 bond, 294 Delegate(mode, connection, io, prompt) 295 ) 296 297 # Connect to a peer or wait for a connection 298 device.on('connection', lambda connection: on_connection(connection, request)) 299 if address_or_name is not None: 300 print(color(f'=== Connecting to {address_or_name}...', 'green')) 301 connection = await device.connect(address_or_name) 302 303 if not request: 304 try: 305 if mode == 'le': 306 await connection.pair() 307 else: 308 await connection.authenticate() 309 return 310 except ProtocolError as error: 311 print(color(f'Pairing failed: {error}', 'red')) 312 return 313 else: 314 # Advertise so that peers can find us and connect 315 await device.start_advertising(auto_restart=True) 316 317 await hci_source.wait_for_termination() 318 319 320# ----------------------------------------------------------------------------- 321@click.command() 322@click.option('--mode', type=click.Choice(['le', 'classic']), default='le', show_default=True) 323@click.option('--sc', type=bool, default=True, help='Use the Secure Connections protocol', show_default=True) 324@click.option('--mitm', type=bool, default=True, help='Request MITM protection', show_default=True) 325@click.option('--bond', type=bool, default=True, help='Enable bonding', show_default=True) 326@click.option('--io', type=click.Choice(['keyboard', 'display', 'display+keyboard', 'display+yes/no', 'none']), default='display+keyboard', show_default=True) 327@click.option('--prompt', is_flag=True, help='Prompt to accept/reject pairing request') 328@click.option('--request', is_flag=True, help='Request that the connecting peer initiate pairing') 329@click.option('--print-keys', is_flag=True, help='Print the bond keys before pairing') 330@click.option('--keystore-file', help='File in which to store the pairing keys') 331@click.argument('device-config') 332@click.argument('hci_transport') 333@click.argument('address-or-name', required=False) 334def main(mode, sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, hci_transport, address_or_name): 335 logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) 336 asyncio.run(pair(mode, sc, mitm, bond, io, prompt, request, print_keys, keystore_file, device_config, hci_transport, address_or_name)) 337 338 339# ----------------------------------------------------------------------------- 340if __name__ == '__main__': 341 main() 342