1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Bumble Tool 17# ----------------------------------------------------------------------------- 18 19# ----------------------------------------------------------------------------- 20# Imports 21# ----------------------------------------------------------------------------- 22import asyncio 23import logging 24import os 25import random 26import re 27import humanize 28from typing import Optional, Union 29from collections import OrderedDict 30 31import click 32from prettytable import PrettyTable 33 34from prompt_toolkit import Application 35from prompt_toolkit.history import FileHistory 36from prompt_toolkit.completion import Completer, Completion, NestedCompleter 37from prompt_toolkit.key_binding import KeyBindings 38from prompt_toolkit.formatted_text import ANSI 39from prompt_toolkit.styles import Style 40from prompt_toolkit.filters import Condition 41from prompt_toolkit.widgets import TextArea, Frame 42from prompt_toolkit.widgets.toolbars import FormattedTextToolbar 43from prompt_toolkit.data_structures import Point 44from prompt_toolkit.layout import ( 45 Layout, 46 HSplit, 47 Window, 48 CompletionsMenu, 49 Float, 50 FormattedTextControl, 51 FloatContainer, 52 ConditionalContainer, 53 Dimension, 54) 55 56from bumble import __version__ 57import bumble.core 58from bumble import colors 59from bumble.core import UUID, AdvertisingData, BT_LE_TRANSPORT 60from bumble.device import ConnectionParametersPreferences, Device, Connection, Peer 61from bumble.utils import AsyncRunner 62from bumble.transport import open_transport_or_link 63from bumble.gatt import Characteristic, Service, CharacteristicDeclaration, Descriptor 64from bumble.gatt_client import CharacteristicProxy 65from bumble.hci import ( 66 HCI_Constant, 67 HCI_LE_1M_PHY, 68 HCI_LE_2M_PHY, 69 HCI_LE_CODED_PHY, 70) 71 72 73# ----------------------------------------------------------------------------- 74# Constants 75# ----------------------------------------------------------------------------- 76BUMBLE_USER_DIR = os.path.expanduser('~/.bumble') 77DEFAULT_RSSI_BAR_WIDTH = 20 78DEFAULT_CONNECTION_TIMEOUT = 30.0 79DISPLAY_MIN_RSSI = -100 80DISPLAY_MAX_RSSI = -30 81RSSI_MONITOR_INTERVAL = 5.0 # Seconds 82 83 84# ----------------------------------------------------------------------------- 85# Utils 86# ----------------------------------------------------------------------------- 87 88 89def le_phy_name(phy_id): 90 return {HCI_LE_1M_PHY: '1M', HCI_LE_2M_PHY: '2M', HCI_LE_CODED_PHY: 'CODED'}.get( 91 phy_id, HCI_Constant.le_phy_name(phy_id) 92 ) 93 94 95def rssi_bar(rssi): 96 blocks = ['', '▏', '▎', '▍', '▌', '▋', '▊', '▉'] 97 bar_width = (rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI) 98 bar_width = min(max(bar_width, 0), 1) 99 bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8) 100 bar_blocks = ('█' * int(bar_ticks / 8)) + blocks[bar_ticks % 8] 101 return f'{rssi:4} {bar_blocks}' 102 103 104def parse_phys(phys): 105 if phys.lower() == '*': 106 return None 107 108 phy_list = [] 109 elements = phys.lower().split(',') 110 for element in elements: 111 if element == '1m': 112 phy_list.append(HCI_LE_1M_PHY) 113 elif element == '2m': 114 phy_list.append(HCI_LE_2M_PHY) 115 elif element == 'coded': 116 phy_list.append(HCI_LE_CODED_PHY) 117 else: 118 raise ValueError('invalid PHY name') 119 return phy_list 120 121 122# ----------------------------------------------------------------------------- 123# Console App 124# ----------------------------------------------------------------------------- 125class ConsoleApp: 126 connected_peer: Optional[Peer] 127 128 def __init__(self): 129 self.known_addresses = set() 130 self.known_remote_attributes = [] 131 self.known_local_attributes = [] 132 self.device = None 133 self.connected_peer = None 134 self.top_tab = 'device' 135 self.monitor_rssi = False 136 self.connection_rssi = None 137 138 style = Style.from_dict( 139 { 140 'output-field': 'bg:#000044 #ffffff', 141 'input-field': 'bg:#000000 #ffffff', 142 'line': '#004400', 143 'error': 'fg:ansired', 144 } 145 ) 146 147 class LiveCompleter(Completer): 148 def __init__(self, words): 149 self.words = words 150 151 def get_completions(self, document, complete_event): 152 prefix = document.text_before_cursor.upper() 153 for word in [x for x in self.words if x.upper().startswith(prefix)]: 154 yield Completion(word, start_position=-len(prefix)) 155 156 def make_completer(): 157 return NestedCompleter.from_nested_dict( 158 { 159 'scan': {'on': None, 'off': None, 'clear': None}, 160 'advertise': {'on': None, 'off': None}, 161 'rssi': {'on': None, 'off': None}, 162 'show': { 163 'scan': None, 164 'log': None, 165 'device': None, 166 'local-services': None, 167 'remote-services': None, 168 'local-values': None, 169 'remote-values': None, 170 }, 171 'filter': { 172 'address': None, 173 }, 174 'connect': LiveCompleter(self.known_addresses), 175 'update-parameters': None, 176 'encrypt': None, 177 'disconnect': None, 178 'discover': {'services': None, 'attributes': None}, 179 'request-mtu': None, 180 'read': LiveCompleter(self.known_remote_attributes), 181 'write': LiveCompleter(self.known_remote_attributes), 182 'local-write': LiveCompleter(self.known_local_attributes), 183 'subscribe': LiveCompleter(self.known_remote_attributes), 184 'unsubscribe': LiveCompleter(self.known_remote_attributes), 185 'set-phy': {'1m': None, '2m': None, 'coded': None}, 186 'set-default-phy': None, 187 'quit': None, 188 'exit': None, 189 } 190 ) 191 192 self.input_field = TextArea( 193 height=1, 194 prompt="> ", 195 multiline=False, 196 wrap_lines=False, 197 completer=make_completer(), 198 history=FileHistory(os.path.join(BUMBLE_USER_DIR, 'history')), 199 ) 200 201 self.input_field.accept_handler = self.accept_input 202 203 self.output_height = Dimension(min=7, max=7, weight=1) 204 self.output_lines = [] 205 self.output = FormattedTextControl( 206 get_cursor_position=lambda: Point(0, max(0, len(self.output_lines) - 1)) 207 ) 208 self.output_max_lines = 20 209 self.scan_results_text = FormattedTextControl() 210 self.local_services_text = FormattedTextControl() 211 self.remote_services_text = FormattedTextControl() 212 self.device_text = FormattedTextControl() 213 self.log_text = FormattedTextControl( 214 get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1)) 215 ) 216 self.local_values_text = FormattedTextControl() 217 self.remote_values_text = FormattedTextControl() 218 self.log_height = Dimension(min=7, weight=4) 219 self.log_max_lines = 100 220 self.log_lines = [] 221 222 container = HSplit( 223 [ 224 ConditionalContainer( 225 Frame(Window(self.scan_results_text), title='Scan Results'), 226 filter=Condition(lambda: self.top_tab == 'scan'), 227 ), 228 ConditionalContainer( 229 Frame(Window(self.local_services_text), title='Local Services'), 230 filter=Condition(lambda: self.top_tab == 'local-services'), 231 ), 232 ConditionalContainer( 233 Frame(Window(self.local_values_text), title='Local Values'), 234 filter=Condition(lambda: self.top_tab == 'local-values'), 235 ), 236 ConditionalContainer( 237 Frame(Window(self.remote_services_text), title='Remote Services'), 238 filter=Condition(lambda: self.top_tab == 'remote-services'), 239 ), 240 ConditionalContainer( 241 Frame(Window(self.remote_values_text), title='Remote Values'), 242 filter=Condition(lambda: self.top_tab == 'remote-values'), 243 ), 244 ConditionalContainer( 245 Frame(Window(self.log_text, height=self.log_height), title='Log'), 246 filter=Condition(lambda: self.top_tab == 'log'), 247 ), 248 ConditionalContainer( 249 Frame(Window(self.device_text), title='Device'), 250 filter=Condition(lambda: self.top_tab == 'device'), 251 ), 252 Frame(Window(self.output, height=self.output_height)), 253 FormattedTextToolbar(text=self.get_status_bar_text, style='reverse'), 254 self.input_field, 255 ] 256 ) 257 258 container = FloatContainer( 259 container, 260 floats=[ 261 Float( 262 xcursor=True, 263 ycursor=True, 264 content=CompletionsMenu(max_height=16, scroll_offset=1), 265 ), 266 ], 267 ) 268 269 layout = Layout(container, focused_element=self.input_field) 270 271 key_bindings = KeyBindings() 272 273 @key_bindings.add("c-c") 274 @key_bindings.add("c-q") 275 def _(event): 276 event.app.exit() 277 278 # pylint: disable=invalid-name 279 self.ui = Application( 280 layout=layout, style=style, key_bindings=key_bindings, full_screen=True 281 ) 282 283 async def run_async(self, device_config, transport): 284 rssi_monitoring_task = asyncio.create_task(self.rssi_monitor_loop()) 285 286 async with await open_transport_or_link(transport) as (hci_source, hci_sink): 287 if device_config: 288 self.device = Device.from_config_file_with_hci( 289 device_config, hci_source, hci_sink 290 ) 291 else: 292 random_address = ( 293 f"{random.randint(192,255):02X}" # address is static random 294 ) 295 for random_byte in random.sample(range(255), 5): 296 random_address += f":{random_byte:02X}" 297 self.append_to_log(f"Setting random address: {random_address}") 298 self.device = Device.with_hci( 299 'Bumble', random_address, hci_source, hci_sink 300 ) 301 self.device.listener = DeviceListener(self) 302 await self.device.power_on() 303 self.show_device(self.device) 304 self.show_local_services(self.device.gatt_server.attributes) 305 306 # Run the UI 307 await self.ui.run_async() 308 309 rssi_monitoring_task.cancel() 310 311 def add_known_address(self, address): 312 self.known_addresses.add(address) 313 314 def accept_input(self, _): 315 if len(self.input_field.text) == 0: 316 return 317 self.append_to_output([('', '* '), ('ansicyan', self.input_field.text)], False) 318 self.ui.create_background_task(self.command(self.input_field.text)) 319 320 def get_status_bar_text(self): 321 scanning = "ON" if self.device and self.device.is_scanning else "OFF" 322 323 connection_state = 'NONE' 324 encryption_state = '' 325 att_mtu = '' 326 rssi = '' if self.connection_rssi is None else rssi_bar(self.connection_rssi) 327 328 if self.device: 329 if self.device.is_le_connecting: 330 connection_state = 'CONNECTING' 331 elif self.connected_peer: 332 connection = self.connected_peer.connection 333 connection_parameters = ( 334 f'{connection.parameters.connection_interval}/' 335 f'{connection.parameters.peripheral_latency}/' 336 f'{connection.parameters.supervision_timeout}' 337 ) 338 if connection.transport == BT_LE_TRANSPORT: 339 phy_state = ( 340 f' RX={le_phy_name(connection.phy.rx_phy)}/' 341 f'TX={le_phy_name(connection.phy.tx_phy)}' 342 ) 343 else: 344 phy_state = '' 345 connection_state = ( 346 f'{connection.peer_address} ' 347 f'{connection_parameters} ' 348 f'{connection.data_length}' 349 f'{phy_state}' 350 ) 351 encryption_state = ( 352 'ENCRYPTED' if connection.is_encrypted else 'NOT ENCRYPTED' 353 ) 354 att_mtu = f'ATT_MTU: {connection.att_mtu}' 355 356 return [ 357 ('ansigreen', f' SCAN: {scanning} '), 358 ('', ' '), 359 ('ansiblue', f' CONNECTION: {connection_state} '), 360 ('', ' '), 361 ('ansimagenta', f' {encryption_state} '), 362 ('', ' '), 363 ('ansicyan', f' {att_mtu} '), 364 ('', ' '), 365 ('ansiyellow', f' {rssi} '), 366 ] 367 368 def show_error(self, title, details=None): 369 appended = [('class:error', title)] 370 if details: 371 appended.append(('', f' {details}')) 372 self.append_to_output(appended) 373 374 def show_scan_results(self, scan_results): 375 max_lines = 40 # TEMP 376 lines = [] 377 keys = list(scan_results.keys())[:max_lines] 378 for key in keys: 379 lines.append(scan_results[key].to_display_string()) 380 self.scan_results_text.text = ANSI('\n'.join(lines)) 381 self.ui.invalidate() 382 383 def show_remote_services(self, services): 384 lines = [] 385 del self.known_remote_attributes[:] 386 for service in services: 387 lines.append(("ansicyan", f"{service}\n")) 388 389 for characteristic in service.characteristics: 390 lines.append(('ansimagenta', f' {characteristic} + \n')) 391 self.known_remote_attributes.append( 392 f'{service.uuid.to_hex_str()}.{characteristic.uuid.to_hex_str()}' 393 ) 394 self.known_remote_attributes.append( 395 f'*.{characteristic.uuid.to_hex_str()}' 396 ) 397 self.known_remote_attributes.append(f'#{characteristic.handle:X}') 398 for descriptor in characteristic.descriptors: 399 lines.append(("ansigreen", f" {descriptor}\n")) 400 401 self.remote_services_text.text = lines 402 self.ui.invalidate() 403 404 def show_local_services(self, attributes): 405 lines = [] 406 del self.known_local_attributes[:] 407 for attribute in attributes: 408 if isinstance(attribute, Service): 409 # Save the most recent service for use later 410 service = attribute 411 lines.append(("ansicyan", f"{attribute}\n")) 412 elif isinstance(attribute, Characteristic): 413 # CharacteristicDeclaration includes all info from Characteristic 414 # no need to print it twice 415 continue 416 elif isinstance(attribute, CharacteristicDeclaration): 417 # Save the most recent characteristic declaration for use later 418 characteristic_declaration = attribute 419 self.known_local_attributes.append( 420 f'{service.uuid.to_hex_str()}.{attribute.characteristic.uuid.to_hex_str()}' 421 ) 422 self.known_local_attributes.append( 423 f'#{attribute.characteristic.handle:X}' 424 ) 425 lines.append(("ansimagenta", f" {attribute}\n")) 426 elif isinstance(attribute, Descriptor): 427 self.known_local_attributes.append( 428 f'{service.uuid.to_hex_str()}.{characteristic_declaration.characteristic.uuid.to_hex_str()}.{attribute.type.to_hex_str()}' 429 ) 430 self.known_local_attributes.append(f'#{attribute.handle:X}') 431 lines.append(("ansigreen", f" {attribute}\n")) 432 else: 433 lines.append(("ansiyellow", f"{attribute}\n")) 434 435 self.local_services_text.text = lines 436 self.ui.invalidate() 437 438 def show_device(self, device): 439 lines = [] 440 441 lines.append(('ansicyan', 'Bumble Version: ')) 442 lines.append(('', f'{__version__}\n')) 443 lines.append(('ansicyan', 'Name: ')) 444 lines.append(('', f'{device.name}\n')) 445 lines.append(('ansicyan', 'Public Address: ')) 446 lines.append(('', f'{device.public_address}\n')) 447 lines.append(('ansicyan', 'Random Address: ')) 448 lines.append(('', f'{device.random_address}\n')) 449 lines.append(('ansicyan', 'LE Enabled: ')) 450 lines.append(('', f'{device.le_enabled}\n')) 451 lines.append(('ansicyan', 'Classic Enabled: ')) 452 lines.append(('', f'{device.classic_enabled}\n')) 453 lines.append(('ansicyan', 'Classic SC Enabled: ')) 454 lines.append(('', f'{device.classic_sc_enabled}\n')) 455 lines.append(('ansicyan', 'Classic SSP Enabled: ')) 456 lines.append(('', f'{device.classic_ssp_enabled}\n')) 457 lines.append(('ansicyan', 'Classic Class: ')) 458 lines.append(('', f'{device.class_of_device}\n')) 459 lines.append(('ansicyan', 'Discoverable: ')) 460 lines.append(('', f'{device.discoverable}\n')) 461 lines.append(('ansicyan', 'Connectable: ')) 462 lines.append(('', f'{device.connectable}\n')) 463 lines.append(('ansicyan', 'Advertising Data: ')) 464 lines.append(('', f'{device.advertising_data}\n')) 465 lines.append(('ansicyan', 'Scan Response Data: ')) 466 lines.append(('', f'{device.scan_response_data}\n')) 467 advertising_interval = ( 468 device.advertising_interval_min 469 if device.advertising_interval_min == device.advertising_interval_max 470 else ( 471 f'{device.advertising_interval_min} to ' 472 f'{device.advertising_interval_max}' 473 ) 474 ) 475 lines.append(('ansicyan', 'Advertising Interval: ')) 476 lines.append(('', f'{advertising_interval}\n')) 477 478 self.device_text.text = lines 479 self.ui.invalidate() 480 481 def append_to_output(self, line, invalidate=True): 482 if isinstance(line, str): 483 line = [('', line)] 484 self.output_lines = self.output_lines[-self.output_max_lines :] 485 self.output_lines.append(line) 486 formatted_text = [] 487 for line in self.output_lines: 488 formatted_text += line 489 formatted_text.append(('', '\n')) 490 self.output.text = formatted_text 491 if invalidate: 492 self.ui.invalidate() 493 494 def append_to_log(self, lines, invalidate=True): 495 self.log_lines.extend(lines.split('\n')) 496 self.log_lines = self.log_lines[-self.log_max_lines :] 497 self.log_text.text = ANSI('\n'.join(self.log_lines)) 498 if invalidate: 499 self.ui.invalidate() 500 501 async def discover_services(self): 502 if not self.connected_peer: 503 self.show_error('not connected') 504 return 505 506 # Discover all services, characteristics and descriptors 507 self.append_to_output('discovering services...') 508 await self.connected_peer.discover_services() 509 self.append_to_output( 510 f'found {len(self.connected_peer.services)} services,' 511 ' discovering characteristics...' 512 ) 513 await self.connected_peer.discover_characteristics() 514 self.append_to_output('found characteristics, discovering descriptors...') 515 for service in self.connected_peer.services: 516 for characteristic in service.characteristics: 517 await self.connected_peer.discover_descriptors(characteristic) 518 self.append_to_output('discovery completed') 519 520 self.show_remote_services(self.connected_peer.services) 521 522 async def discover_attributes(self): 523 if not self.connected_peer: 524 self.show_error('not connected') 525 return 526 527 # Discover all attributes 528 self.append_to_output('discovering attributes...') 529 attributes = await self.connected_peer.discover_attributes() 530 self.append_to_output(f'discovered {len(attributes)} attributes...') 531 532 self.show_attributes(attributes) 533 534 def find_remote_characteristic(self, param) -> Optional[CharacteristicProxy]: 535 if not self.connected_peer: 536 return None 537 parts = param.split('.') 538 if len(parts) == 2: 539 service_uuid = UUID(parts[0]) if parts[0] != '*' else None 540 characteristic_uuid = UUID(parts[1]) 541 for service in self.connected_peer.services: 542 if service_uuid is None or service.uuid == service_uuid: 543 for characteristic in service.characteristics: 544 if characteristic.uuid == characteristic_uuid: 545 return characteristic 546 elif len(parts) == 1: 547 if parts[0].startswith('#'): 548 attribute_handle = int(f'{parts[0][1:]}', 16) 549 for service in self.connected_peer.services: 550 for characteristic in service.characteristics: 551 if characteristic.handle == attribute_handle: 552 return characteristic 553 554 return None 555 556 def find_local_attribute( 557 self, param 558 ) -> Optional[Union[Characteristic, Descriptor]]: 559 parts = param.split('.') 560 if len(parts) == 3: 561 service_uuid = UUID(parts[0]) 562 characteristic_uuid = UUID(parts[1]) 563 descriptor_uuid = UUID(parts[2]) 564 return self.device.gatt_server.get_descriptor_attribute( 565 service_uuid, characteristic_uuid, descriptor_uuid 566 ) 567 if len(parts) == 2: 568 service_uuid = UUID(parts[0]) 569 characteristic_uuid = UUID(parts[1]) 570 characteristic_attributes = ( 571 self.device.gatt_server.get_characteristic_attributes( 572 service_uuid, characteristic_uuid 573 ) 574 ) 575 if characteristic_attributes: 576 return characteristic_attributes[1] 577 return None 578 elif len(parts) == 1: 579 if parts[0].startswith('#'): 580 attribute_handle = int(f'{parts[0][1:]}', 16) 581 attribute = self.device.gatt_server.get_attribute(attribute_handle) 582 if isinstance(attribute, (Characteristic, Descriptor)): 583 return attribute 584 return None 585 586 return None 587 588 async def rssi_monitor_loop(self): 589 while True: 590 if self.monitor_rssi and self.connected_peer: 591 self.connection_rssi = await self.connected_peer.connection.get_rssi() 592 await asyncio.sleep(RSSI_MONITOR_INTERVAL) 593 594 async def command(self, command): 595 try: 596 (keyword, *params) = command.strip().split(' ') 597 keyword = keyword.replace('-', '_').lower() 598 handler = getattr(self, f'do_{keyword}', None) 599 if handler: 600 await handler(params) 601 self.ui.invalidate() 602 else: 603 self.show_error('unknown command', keyword) 604 except Exception as error: 605 self.show_error(str(error)) 606 607 async def do_scan(self, params): 608 if len(params) == 0: 609 # Toggle scanning 610 if self.device.is_scanning: 611 await self.device.stop_scanning() 612 else: 613 await self.device.start_scanning() 614 elif params[0] == 'on': 615 if len(params) == 2: 616 if not params[1].startswith("filter="): 617 self.show_error( 618 'invalid syntax', 619 'expected address filter=key1:value1,key2:value,... ' 620 'available filters: address', 621 ) 622 # regex: (word):(any char except ,) 623 matches = re.findall(r"(\w+):([^,]+)", params[1]) 624 for match in matches: 625 if match[0] == "address": 626 self.device.listener.address_filter = match[1] 627 628 await self.device.start_scanning() 629 self.top_tab = 'scan' 630 elif params[0] == 'off': 631 await self.device.stop_scanning() 632 elif params[0] == 'clear': 633 self.device.listener.scan_results.clear() 634 self.known_addresses.clear() 635 self.show_scan_results(self.device.listener.scan_results) 636 else: 637 self.show_error('unsupported arguments for scan command') 638 639 async def do_rssi(self, params): 640 if len(params) == 0: 641 # Toggle monitoring 642 self.monitor_rssi = not self.monitor_rssi 643 elif params[0] == 'on': 644 self.monitor_rssi = True 645 elif params[0] == 'off': 646 self.monitor_rssi = False 647 else: 648 self.show_error('unsupported arguments for rssi command') 649 650 async def do_connect(self, params): 651 if len(params) != 1 and len(params) != 2: 652 self.show_error('invalid syntax', 'expected connect <address> [phys]') 653 return 654 655 if len(params) == 1: 656 phys = None 657 else: 658 phys = parse_phys(params[1]) 659 if phys is None: 660 connection_parameters_preferences = None 661 else: 662 connection_parameters_preferences = { 663 phy: ConnectionParametersPreferences() for phy in phys 664 } 665 666 if self.device.is_scanning: 667 await self.device.stop_scanning() 668 669 self.append_to_output('connecting...') 670 671 try: 672 await self.device.connect( 673 params[0], 674 connection_parameters_preferences=connection_parameters_preferences, 675 timeout=DEFAULT_CONNECTION_TIMEOUT, 676 ) 677 self.top_tab = 'services' 678 except bumble.core.TimeoutError: 679 self.show_error('connection timed out') 680 681 async def do_disconnect(self, _): 682 if self.device.is_le_connecting: 683 await self.device.cancel_connection() 684 else: 685 if not self.connected_peer: 686 self.show_error('not connected') 687 return 688 689 await self.connected_peer.connection.disconnect() 690 691 async def do_update_parameters(self, params): 692 if len(params) != 1 or len(params[0].split('/')) != 3: 693 self.show_error( 694 'invalid syntax', 695 'expected update-parameters <interval-min>-<interval-max>' 696 '/<max-latency>/<supervision>', 697 ) 698 return 699 700 if not self.connected_peer: 701 self.show_error('not connected') 702 return 703 704 connection_intervals, max_latency, supervision_timeout = params[0].split('/') 705 connection_interval_min, connection_interval_max = [ 706 int(x) for x in connection_intervals.split('-') 707 ] 708 max_latency = int(max_latency) 709 supervision_timeout = int(supervision_timeout) 710 await self.connected_peer.connection.update_parameters( 711 connection_interval_min, 712 connection_interval_max, 713 max_latency, 714 supervision_timeout, 715 ) 716 717 async def do_encrypt(self, _): 718 if not self.connected_peer: 719 self.show_error('not connected') 720 return 721 722 await self.connected_peer.connection.encrypt() 723 724 async def do_advertise(self, params): 725 if len(params) == 0: 726 # Toggle advertising 727 if self.device.is_advertising: 728 await self.device.stop_advertising() 729 else: 730 await self.device.start_advertising() 731 elif params[0] == 'on': 732 await self.device.start_advertising() 733 elif params[0] == 'off': 734 await self.device.stop_advertising() 735 else: 736 self.show_error('unsupported arguments for advertise command') 737 738 async def do_show(self, params): 739 if params: 740 if params[0] in { 741 'scan', 742 'log', 743 'device', 744 'local-services', 745 'remote-services', 746 'local-values', 747 'remote-values', 748 }: 749 self.top_tab = params[0] 750 self.ui.invalidate() 751 752 while self.top_tab == 'local-values': 753 await self.do_show_local_values() 754 await asyncio.sleep(1) 755 756 while self.top_tab == 'remote-values': 757 await self.do_show_remote_values() 758 await asyncio.sleep(1) 759 760 async def do_show_local_values(self): 761 prettytable = PrettyTable() 762 field_names = ["Service", "Characteristic", "Descriptor"] 763 764 # if there's no connections, add a column just for value 765 if not self.device.connections: 766 field_names.append("Value") 767 768 # if there are connections, add a column for each connection's value 769 for connection in self.device.connections.values(): 770 field_names.append(f"Connection {connection.handle}") 771 772 for attribute in self.device.gatt_server.attributes: 773 if isinstance(attribute, Characteristic): 774 service = self.device.gatt_server.get_attribute_group( 775 attribute.handle, Service 776 ) 777 if not service: 778 continue 779 values = [ 780 await attribute.read_value(connection) 781 for connection in self.device.connections.values() 782 ] 783 if not values: 784 values = [attribute.read_value(None)] 785 prettytable.add_row([f"{service.uuid}", attribute.uuid, ""] + values) 786 787 elif isinstance(attribute, Descriptor): 788 service = self.device.gatt_server.get_attribute_group( 789 attribute.handle, Service 790 ) 791 if not service: 792 continue 793 characteristic = self.device.gatt_server.get_attribute_group( 794 attribute.handle, Characteristic 795 ) 796 if not characteristic: 797 continue 798 values = [ 799 await attribute.read_value(connection) 800 for connection in self.device.connections.values() 801 ] 802 if not values: 803 values = [await attribute.read_value(None)] 804 805 # TODO: future optimization: convert CCCD value to human readable string 806 807 prettytable.add_row( 808 [service.uuid, characteristic.uuid, attribute.type] + values 809 ) 810 811 prettytable.field_names = field_names 812 self.local_values_text.text = prettytable.get_string() 813 self.ui.invalidate() 814 815 async def do_show_remote_values(self): 816 prettytable = PrettyTable( 817 field_names=[ 818 "Connection", 819 "Service", 820 "Characteristic", 821 "Descriptor", 822 "Time", 823 "Value", 824 ] 825 ) 826 for connection in self.device.connections.values(): 827 for handle, (time, value) in connection.gatt_client.cached_values.items(): 828 row = [connection.handle] 829 attribute = connection.gatt_client.get_attributes(handle) 830 if not attribute: 831 continue 832 if len(attribute) == 3: 833 row.extend( 834 [attribute[0].uuid, attribute[1].uuid, attribute[2].type] 835 ) 836 elif len(attribute) == 2: 837 row.extend([attribute[0].uuid, attribute[1].uuid, ""]) 838 elif len(attribute) == 1: 839 row.extend([attribute[0].uuid, "", ""]) 840 else: 841 continue 842 843 row.extend([humanize.naturaltime(time), value]) 844 prettytable.add_row(row) 845 846 self.remote_values_text.text = prettytable.get_string() 847 self.ui.invalidate() 848 849 async def do_get_phy(self, _): 850 if not self.connected_peer: 851 self.show_error('not connected') 852 return 853 854 phy = await self.connected_peer.connection.get_phy() 855 self.append_to_output( 856 f'PHY: RX={HCI_Constant.le_phy_name(phy[0])}, ' 857 f'TX={HCI_Constant.le_phy_name(phy[1])}' 858 ) 859 860 async def do_request_mtu(self, params): 861 if len(params) != 1: 862 self.show_error('invalid syntax', 'expected request-mtu <mtu>') 863 return 864 865 if not self.connected_peer: 866 self.show_error('not connected') 867 return 868 869 await self.connected_peer.request_mtu(int(params[0])) 870 871 async def do_discover(self, params): 872 if not params: 873 self.show_error('invalid syntax', 'expected discover services|attributes') 874 return 875 876 discovery_type = params[0] 877 if discovery_type == 'services': 878 await self.discover_services() 879 elif discovery_type == 'attributes': 880 await self.discover_attributes() 881 882 async def do_read(self, params): 883 if len(params) != 1: 884 self.show_error('invalid syntax', 'expected read <attribute>') 885 return 886 887 if not self.connected_peer: 888 self.show_error('not connected') 889 return 890 891 characteristic = self.find_remote_characteristic(params[0]) 892 if characteristic is None: 893 self.show_error('no such characteristic') 894 return 895 896 value = await characteristic.read_value() 897 self.append_to_output(f'VALUE: 0x{value.hex()}') 898 899 async def do_write(self, params): 900 if not self.connected_peer: 901 self.show_error('not connected') 902 return 903 904 if len(params) != 2: 905 self.show_error('invalid syntax', 'expected write <attribute> <value>') 906 return 907 908 if params[1].upper().startswith("0X"): 909 value = bytes.fromhex(params[1][2:]) # parse as hex string 910 else: 911 try: 912 value = int(params[1]) # try as integer 913 except ValueError: 914 value = str.encode(params[1]) # must be a string 915 916 characteristic = self.find_remote_characteristic(params[0]) 917 if characteristic is None: 918 self.show_error('no such characteristic') 919 return 920 921 # use write with response if supported 922 with_response = characteristic.properties & Characteristic.Properties.WRITE 923 await characteristic.write_value(value, with_response=with_response) 924 925 async def do_local_write(self, params): 926 if len(params) != 2: 927 self.show_error( 928 'invalid syntax', 'expected local-write <attribute> <value>' 929 ) 930 return 931 932 if params[1].upper().startswith("0X"): 933 value = bytes.fromhex(params[1][2:]) # parse as hex string 934 else: 935 try: 936 value = int(params[1]).to_bytes(2, "little") # try as 2 byte integer 937 except ValueError: 938 value = str.encode(params[1]) # must be a string 939 940 attribute = self.find_local_attribute(params[0]) 941 if not attribute: 942 self.show_error('invalid syntax', 'unable to find attribute') 943 return 944 945 # send data to any subscribers 946 if isinstance(attribute, Characteristic): 947 await attribute.write_value(None, value) 948 if attribute.has_properties(Characteristic.NOTIFY): 949 await self.device.gatt_server.notify_subscribers(attribute) 950 if attribute.has_properties(Characteristic.INDICATE): 951 await self.device.gatt_server.indicate_subscribers(attribute) 952 953 async def do_subscribe(self, params): 954 if not self.connected_peer: 955 self.show_error('not connected') 956 return 957 958 if len(params) != 1: 959 self.show_error('invalid syntax', 'expected subscribe <attribute>') 960 return 961 962 characteristic = self.find_remote_characteristic(params[0]) 963 if characteristic is None: 964 self.show_error('no such characteristic') 965 return 966 967 await characteristic.subscribe( 968 lambda value: self.append_to_output( 969 f"{characteristic} VALUE: 0x{value.hex()}" 970 ), 971 ) 972 973 async def do_unsubscribe(self, params): 974 if not self.connected_peer: 975 self.show_error('not connected') 976 return 977 978 if len(params) != 1: 979 self.show_error('invalid syntax', 'expected subscribe <attribute>') 980 return 981 982 characteristic = self.find_remote_characteristic(params[0]) 983 if characteristic is None: 984 self.show_error('no such characteristic') 985 return 986 987 await characteristic.unsubscribe() 988 989 async def do_set_phy(self, params): 990 if len(params) != 1: 991 self.show_error( 992 'invalid syntax', 'expected set-phy <tx_rx_phys>|<tx_phys>/<rx_phys>' 993 ) 994 return 995 996 if not self.connected_peer: 997 self.show_error('not connected') 998 return 999 1000 if '/' in params[0]: 1001 tx_phys, rx_phys = params[0].split('/') 1002 else: 1003 tx_phys = params[0] 1004 rx_phys = tx_phys 1005 1006 await self.connected_peer.connection.set_phy( 1007 tx_phys=parse_phys(tx_phys), rx_phys=parse_phys(rx_phys) 1008 ) 1009 1010 async def do_set_default_phy(self, params): 1011 if len(params) != 1: 1012 self.show_error( 1013 'invalid syntax', 1014 'expected set-default-phy <tx_rx_phys>|<tx_phys>/<rx_phys>', 1015 ) 1016 return 1017 1018 if '/' in params[0]: 1019 tx_phys, rx_phys = params[0].split('/') 1020 else: 1021 tx_phys = params[0] 1022 rx_phys = tx_phys 1023 1024 await self.device.set_default_phy( 1025 tx_phys=parse_phys(tx_phys), rx_phys=parse_phys(rx_phys) 1026 ) 1027 1028 async def do_exit(self, _): 1029 self.ui.exit() 1030 1031 async def do_quit(self, _): 1032 self.ui.exit() 1033 1034 async def do_filter(self, params): 1035 if params[0] == "address": 1036 if len(params) != 2: 1037 self.show_error('invalid syntax', 'expected filter address <pattern>') 1038 return 1039 self.device.listener.address_filter = params[1] 1040 1041 1042# ----------------------------------------------------------------------------- 1043# Device and Connection Listener 1044# ----------------------------------------------------------------------------- 1045class DeviceListener(Device.Listener, Connection.Listener): 1046 def __init__(self, app): 1047 self.app = app 1048 self.scan_results = OrderedDict() 1049 self.address_filter = None 1050 1051 @property 1052 def address_filter(self): 1053 return self._address_filter 1054 1055 @address_filter.setter 1056 def address_filter(self, filter_addr): 1057 if filter_addr is None: 1058 self._address_filter = re.compile(r".*") 1059 else: 1060 self._address_filter = re.compile(filter_addr) 1061 self.scan_results = OrderedDict( 1062 filter(self.filter_address_match, self.scan_results) 1063 ) 1064 self.app.show_scan_results(self.scan_results) 1065 1066 def filter_address_match(self, address): 1067 """ 1068 Returns true if an address matches the filter 1069 """ 1070 return bool(self.address_filter.match(address)) 1071 1072 @AsyncRunner.run_in_task() 1073 # pylint: disable=invalid-overridden-method 1074 async def on_connection(self, connection): 1075 self.app.connected_peer = Peer(connection) 1076 self.app.connection_rssi = None 1077 self.app.append_to_output(f'connected to {self.app.connected_peer}') 1078 connection.listener = self 1079 1080 def on_disconnection(self, reason): 1081 self.app.append_to_output( 1082 f'disconnected from {self.app.connected_peer}, ' 1083 f'reason: {HCI_Constant.error_name(reason)}' 1084 ) 1085 self.app.connected_peer = None 1086 self.app.connection_rssi = None 1087 1088 def on_connection_parameters_update(self): 1089 self.app.append_to_output( 1090 f'connection parameters update: ' 1091 f'{self.app.connected_peer.connection.parameters}' 1092 ) 1093 1094 def on_connection_phy_update(self): 1095 self.app.append_to_output( 1096 f'connection phy update: {self.app.connected_peer.connection.phy}' 1097 ) 1098 1099 def on_connection_att_mtu_update(self): 1100 self.app.append_to_output( 1101 f'connection att mtu update: {self.app.connected_peer.connection.att_mtu}' 1102 ) 1103 1104 def on_connection_encryption_change(self): 1105 encryption_state = ( 1106 'encrypted' 1107 if self.app.connected_peer.connection.is_encrypted 1108 else 'not encrypted' 1109 ) 1110 self.app.append_to_output( 1111 'connection encryption change: ' f'{encryption_state}' 1112 ) 1113 1114 def on_connection_data_length_change(self): 1115 self.app.append_to_output( 1116 'connection data length change: ' 1117 f'{self.app.connected_peer.connection.data_length}' 1118 ) 1119 1120 def on_advertisement(self, advertisement): 1121 if not self.filter_address_match(str(advertisement.address)): 1122 return 1123 1124 entry_key = f'{advertisement.address}/{advertisement.address.address_type}' 1125 entry = self.scan_results.get(entry_key) 1126 if entry: 1127 entry.ad_data = advertisement.data 1128 entry.rssi = advertisement.rssi 1129 entry.connectable = advertisement.is_connectable 1130 else: 1131 self.app.add_known_address(str(advertisement.address)) 1132 self.scan_results[entry_key] = ScanResult( 1133 advertisement.address, 1134 advertisement.address.address_type, 1135 advertisement.data, 1136 advertisement.rssi, 1137 advertisement.is_connectable, 1138 ) 1139 1140 self.app.show_scan_results(self.scan_results) 1141 1142 1143# ----------------------------------------------------------------------------- 1144# Scanning 1145# ----------------------------------------------------------------------------- 1146class ScanResult: 1147 def __init__(self, address, address_type, ad_data, rssi, connectable): 1148 self.address = address 1149 self.address_type = address_type 1150 self.ad_data = ad_data 1151 self.rssi = rssi 1152 self.connectable = connectable 1153 1154 def to_display_string(self): 1155 address_type_string = ('P', 'R', 'PI', 'RI')[self.address_type] 1156 address_color = colors.yellow if self.connectable else colors.red 1157 if address_type_string.startswith('P'): 1158 type_color = colors.green 1159 else: 1160 type_color = colors.cyan 1161 1162 name = self.ad_data.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True) 1163 if name is None: 1164 name = self.ad_data.get(AdvertisingData.SHORTENED_LOCAL_NAME, raw=True) 1165 if name: 1166 # Convert to string 1167 try: 1168 name = name.decode() 1169 except UnicodeDecodeError: 1170 name = name.hex() 1171 else: 1172 name = '' 1173 1174 # Remove any '/P' qualifier suffix from the address string 1175 address_str = self.address.to_string(with_type_qualifier=False) 1176 1177 # RSSI bar 1178 bar_string = rssi_bar(self.rssi) 1179 bar_padding = ' ' * (DEFAULT_RSSI_BAR_WIDTH + 5 - len(bar_string)) 1180 return ( 1181 f'{address_color(address_str)} [{type_color(address_type_string)}] ' 1182 f'{bar_string} {bar_padding} {name}' 1183 ) 1184 1185 1186# ----------------------------------------------------------------------------- 1187# Logging 1188# ----------------------------------------------------------------------------- 1189class LogHandler(logging.Handler): 1190 def __init__(self, app): 1191 super().__init__() 1192 self.app = app 1193 self.setFormatter(logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s')) 1194 1195 def emit(self, record): 1196 message = self.format(record) 1197 self.app.append_to_log(message) 1198 1199 1200# ----------------------------------------------------------------------------- 1201# Main 1202# ----------------------------------------------------------------------------- 1203@click.command() 1204@click.option('--device-config', help='Device configuration file') 1205@click.argument('transport') 1206def main(device_config, transport): 1207 # Ensure that the BUMBLE_USER_DIR directory exists 1208 if not os.path.isdir(BUMBLE_USER_DIR): 1209 os.mkdir(BUMBLE_USER_DIR) 1210 1211 # Create an instance of the app 1212 app = ConsoleApp() 1213 1214 # Setup logging 1215 # logging.basicConfig(level = 'FATAL') 1216 # logging.basicConfig(level = 'DEBUG') 1217 root_logger = logging.getLogger() 1218 1219 root_logger.addHandler(LogHandler(app)) 1220 root_logger.setLevel(logging.DEBUG) 1221 1222 # Run until the user exits 1223 asyncio.run(app.run_async(device_config, transport)) 1224 1225 1226# ----------------------------------------------------------------------------- 1227if __name__ == "__main__": 1228 main() # pylint: disable=no-value-for-parameter 1229