# Copyright 2021-2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Bumble Tool
# -----------------------------------------------------------------------------

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
import asyncio
import logging
import os
import random
import re
from typing import Optional
from collections import OrderedDict

import click

from prompt_toolkit import Application
from prompt_toolkit.history import FileHistory
from prompt_toolkit.completion import Completer, Completion, NestedCompleter
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.formatted_text import ANSI
from prompt_toolkit.styles import Style
from prompt_toolkit.filters import Condition
from prompt_toolkit.widgets import TextArea, Frame
from prompt_toolkit.widgets.toolbars import FormattedTextToolbar
from prompt_toolkit.data_structures import Point
from prompt_toolkit.layout import (
    Layout,
    HSplit,
    Window,
    CompletionsMenu,
    Float,
    FormattedTextControl,
    FloatContainer,
    ConditionalContainer,
    Dimension,
)

from bumble import __version__
import bumble.core
from bumble import colors
from bumble.core import UUID, AdvertisingData, BT_LE_TRANSPORT
from bumble.device import ConnectionParametersPreferences, Device, Connection, Peer
from bumble.utils import AsyncRunner
from bumble.transport import open_transport_or_link
from bumble.gatt import Characteristic, Service, CharacteristicDeclaration, Descriptor
from bumble.gatt_client import CharacteristicProxy
from bumble.hci import (
    HCI_Constant,
    HCI_LE_1M_PHY,
    HCI_LE_2M_PHY,
    HCI_LE_CODED_PHY,
)


# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
logger = logging.getLogger(__name__)


# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------
BUMBLE_USER_DIR = os.path.expanduser('~/.bumble')
DEFAULT_RSSI_BAR_WIDTH = 20
DEFAULT_CONNECTION_TIMEOUT = 30.0
DISPLAY_MIN_RSSI = -100
DISPLAY_MAX_RSSI = -30
RSSI_MONITOR_INTERVAL = 5.0  # Seconds


# -----------------------------------------------------------------------------
# Utils
# -----------------------------------------------------------------------------


def le_phy_name(phy_id):
    """Return a short display name ('1M', '2M', 'CODED') for an LE PHY id.

    Ids other than the three known PHYs fall back to
    `HCI_Constant.le_phy_name`.
    """
    return {HCI_LE_1M_PHY: '1M', HCI_LE_2M_PHY: '2M', HCI_LE_CODED_PHY: 'CODED'}.get(
        phy_id, HCI_Constant.le_phy_name(phy_id)
    )


def rssi_bar(rssi):
    """Render an RSSI value as its number followed by a horizontal bar.

    The value is mapped linearly from [DISPLAY_MIN_RSSI, DISPLAY_MAX_RSSI] to
    a bar of at most DEFAULT_RSSI_BAR_WIDTH cells, clamped at both ends. The
    bar is drawn with 1/8-cell resolution using partial block characters.
    """
    blocks = ['', '▏', '▎', '▍', '▌', '▋', '▊', '▉']
    bar_width = (rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI)
    bar_width = min(max(bar_width, 0), 1)
    bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8)
    bar_blocks = ('█' * (bar_ticks // 8)) + blocks[bar_ticks % 8]
    return f'{rssi:4} {bar_blocks}'


def parse_phys(phys):
    """Parse a comma-separated list of PHY names into HCI PHY constants.

    Args:
        phys: a string such as '1m', '2m,coded' (case-insensitive), or '*'.

    Returns:
        A list of HCI PHY constants, or None when `phys` is '*'.

    Raises:
        ValueError: if any element is not '1m', '2m' or 'coded'.
    """
    if phys.lower() == '*':
        return None

    phy_ids = {
        '1m': HCI_LE_1M_PHY,
        '2m': HCI_LE_2M_PHY,
        'coded': HCI_LE_CODED_PHY,
    }
    phy_list = []
    for element in phys.lower().split(','):
        if element not in phy_ids:
            raise ValueError('invalid PHY name')
        phy_list.append(phy_ids[element])
    return phy_list


# -----------------------------------------------------------------------------
# Console App
# -----------------------------------------------------------------------------
class ConsoleApp:
    """Full-screen interactive console driving a Bumble `Device`.

    User input is dispatched to the `do_<keyword>` coroutine methods (dashes
    in the keyword are mapped to underscores). The top pane shows one of the
    tabs listed in TOP_TABS, selected by `top_tab`.
    """

    # Names of the panes that can be shown at the top of the screen
    TOP_TABS = ('scan', 'log', 'device', 'local-services', 'remote-services')

    connected_peer: Optional[Peer]

    def __init__(self):
        self.known_addresses = set()
        self.known_attributes = []
        self.device = None
        self.connected_peer = None
        self.top_tab = 'device'
        self.monitor_rssi = False
        self.connection_rssi = None

        style = Style.from_dict(
            {
                'output-field': 'bg:#000044 #ffffff',
                'input-field': 'bg:#000000 #ffffff',
                'line': '#004400',
                'error': 'fg:ansired',
            }
        )

        class LiveCompleter(Completer):
            """Completer over a collection that may change after creation.

            The collection is kept by reference, so entries added later (new
            addresses, newly discovered attributes) are offered too.
            """

            def __init__(self, words):
                self.words = words

            def get_completions(self, document, complete_event):
                prefix = document.text_before_cursor.upper()
                for word in [x for x in self.words if x.upper().startswith(prefix)]:
                    yield Completion(word, start_position=-len(prefix))

        def make_completer():
            return NestedCompleter.from_nested_dict(
                {
                    'scan': {'on': None, 'off': None, 'clear': None},
                    'advertise': {'on': None, 'off': None},
                    'rssi': {'on': None, 'off': None},
                    'show': {
                        'scan': None,
                        'log': None,
                        'device': None,
                        'local-services': None,
                        'remote-services': None,
                    },
                    'filter': {
                        'address': None,
                    },
                    'connect': LiveCompleter(self.known_addresses),
                    'update-parameters': None,
                    'encrypt': None,
                    'disconnect': None,
                    'discover': {'services': None, 'attributes': None},
                    'request-mtu': None,
                    'read': LiveCompleter(self.known_attributes),
                    'write': LiveCompleter(self.known_attributes),
                    'subscribe': LiveCompleter(self.known_attributes),
                    'unsubscribe': LiveCompleter(self.known_attributes),
                    'get-phy': None,
                    'set-phy': {'1m': None, '2m': None, 'coded': None},
                    'set-default-phy': None,
                    'quit': None,
                    'exit': None,
                }
            )

        self.input_field = TextArea(
            height=1,
            prompt="> ",
            multiline=False,
            wrap_lines=False,
            completer=make_completer(),
            history=FileHistory(os.path.join(BUMBLE_USER_DIR, 'history')),
        )

        self.input_field.accept_handler = self.accept_input

        self.output_height = Dimension(min=7, max=7, weight=1)
        self.output_lines = []
        # The cursor is placed on the last line so that the control scrolls
        # to the most recent output
        self.output = FormattedTextControl(
            get_cursor_position=lambda: Point(0, max(0, len(self.output_lines) - 1))
        )
        self.output_max_lines = 20
        self.scan_results_text = FormattedTextControl()
        self.local_services_text = FormattedTextControl()
        self.remote_services_text = FormattedTextControl()
        self.device_text = FormattedTextControl()
        self.log_text = FormattedTextControl(
            get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1))
        )
        self.log_height = Dimension(min=7, weight=4)
        self.log_max_lines = 100
        self.log_lines = []

        container = HSplit(
            [
                ConditionalContainer(
                    Frame(Window(self.scan_results_text), title='Scan Results'),
                    filter=Condition(lambda: self.top_tab == 'scan'),
                ),
                ConditionalContainer(
                    Frame(Window(self.local_services_text), title='Local Services'),
                    filter=Condition(lambda: self.top_tab == 'local-services'),
                ),
                ConditionalContainer(
                    Frame(Window(self.remote_services_text), title='Remote Services'),
                    filter=Condition(lambda: self.top_tab == 'remote-services'),
                ),
                ConditionalContainer(
                    Frame(Window(self.log_text, height=self.log_height), title='Log'),
                    filter=Condition(lambda: self.top_tab == 'log'),
                ),
                ConditionalContainer(
                    Frame(Window(self.device_text), title='Device'),
                    filter=Condition(lambda: self.top_tab == 'device'),
                ),
                Frame(Window(self.output, height=self.output_height)),
                FormattedTextToolbar(text=self.get_status_bar_text, style='reverse'),
                self.input_field,
            ]
        )

        container = FloatContainer(
            container,
            floats=[
                Float(
                    xcursor=True,
                    ycursor=True,
                    content=CompletionsMenu(max_height=16, scroll_offset=1),
                ),
            ],
        )

        layout = Layout(container, focused_element=self.input_field)

        key_bindings = KeyBindings()

        @key_bindings.add("c-c")
        @key_bindings.add("c-q")
        def _(event):
            event.app.exit()

        # pylint: disable=invalid-name
        self.ui = Application(
            layout=layout, style=style, key_bindings=key_bindings, full_screen=True
        )

    async def run_async(self, device_config, transport):
        """Open the transport, power on the device and run the UI until exit.

        Args:
            device_config: path of a device configuration file, or a falsy
                value to create a device named 'Bumble' with a random address.
            transport: transport spec passed to `open_transport_or_link`.
        """
        rssi_monitoring_task = asyncio.create_task(self.rssi_monitor_loop())

        try:
            async with await open_transport_or_link(transport) as (
                hci_source,
                hci_sink,
            ):
                if device_config:
                    self.device = Device.from_config_file_with_hci(
                        device_config, hci_source, hci_sink
                    )
                else:
                    # The address is static random: the first byte is drawn
                    # from 0xC0..0xFF so that its two top bits are set
                    random_address = ':'.join(
                        [f"{random.randint(192, 255):02X}"]
                        + [f"{random.randint(0, 255):02X}" for _ in range(5)]
                    )
                    self.append_to_log(f"Setting random address: {random_address}")
                    self.device = Device.with_hci(
                        'Bumble', random_address, hci_source, hci_sink
                    )
                self.device.listener = DeviceListener(self)
                await self.device.power_on()
                self.show_device(self.device)
                self.show_local_services(self.device.gatt_server.attributes)

                # Run the UI
                await self.ui.run_async()
        finally:
            # Stop the monitor even if the transport or the UI failed
            rssi_monitoring_task.cancel()

    def add_known_address(self, address):
        """Register an address so it is offered by `connect` completion."""
        self.known_addresses.add(address)

    def accept_input(self, _):
        """Echo the entered command and run it as a background task."""
        if len(self.input_field.text) == 0:
            return
        self.append_to_output([('', '* '), ('ansicyan', self.input_field.text)], False)
        self.ui.create_background_task(self.command(self.input_field.text))

    def get_status_bar_text(self):
        """Build the formatted-text fragments shown in the status bar."""
        scanning = "ON" if self.device and self.device.is_scanning else "OFF"

        connection_state = 'NONE'
        encryption_state = ''
        att_mtu = ''
        rssi = '' if self.connection_rssi is None else rssi_bar(self.connection_rssi)

        if self.device:
            if self.device.is_le_connecting:
                connection_state = 'CONNECTING'
            elif self.connected_peer:
                connection = self.connected_peer.connection
                connection_parameters = (
                    f'{connection.parameters.connection_interval}/'
                    f'{connection.parameters.peripheral_latency}/'
                    f'{connection.parameters.supervision_timeout}'
                )
                if connection.transport == BT_LE_TRANSPORT:
                    phy_state = (
                        f' RX={le_phy_name(connection.phy.rx_phy)}/'
                        f'TX={le_phy_name(connection.phy.tx_phy)}'
                    )
                else:
                    phy_state = ''
                connection_state = (
                    f'{connection.peer_address} '
                    f'{connection_parameters} '
                    f'{connection.data_length}'
                    f'{phy_state}'
                )
                encryption_state = (
                    'ENCRYPTED' if connection.is_encrypted else 'NOT ENCRYPTED'
                )
                att_mtu = f'ATT_MTU: {connection.att_mtu}'

        return [
            ('ansigreen', f' SCAN: {scanning} '),
            ('', ' '),
            ('ansiblue', f' CONNECTION: {connection_state} '),
            ('', ' '),
            ('ansimagenta', f' {encryption_state} '),
            ('', ' '),
            ('ansicyan', f' {att_mtu} '),
            ('', ' '),
            ('ansiyellow', f' {rssi} '),
        ]

    def show_error(self, title, details=None):
        """Append an error line (title in the error style) to the output pane."""
        appended = [('class:error', title)]
        if details:
            appended.append(('', f' {details}'))
        self.append_to_output(appended)

    def show_scan_results(self, scan_results):
        """Render up to 40 scan results into the 'scan' pane."""
        max_lines = 40  # TEMP
        lines = [
            scan_result.to_display_string()
            for scan_result in list(scan_results.values())[:max_lines]
        ]
        self.scan_results_text.text = ANSI('\n'.join(lines))
        self.ui.invalidate()

    def show_remote_services(self, services):
        """Render discovered remote services and refresh attribute completion."""
        lines = []
        # Clear in place: the completers hold a reference to this list
        del self.known_attributes[:]
        for service in services:
            lines.append(("ansicyan", f"{service}\n"))

            for characteristic in service.characteristics:
                lines.append(('ansimagenta', f'  {characteristic} + \n'))
                # Each characteristic can be named three ways on the command
                # line: <service>.<characteristic>, *.<characteristic>, #<handle>
                self.known_attributes.append(
                    f'{service.uuid.to_hex_str()}.{characteristic.uuid.to_hex_str()}'
                )
                self.known_attributes.append(f'*.{characteristic.uuid.to_hex_str()}')
                self.known_attributes.append(f'#{characteristic.handle:X}')
                for descriptor in characteristic.descriptors:
                    lines.append(("ansigreen", f"    {descriptor}\n"))

        self.remote_services_text.text = lines
        self.ui.invalidate()

    def show_local_services(self, attributes):
        """Render the local GATT server attributes into 'local-services'."""
        lines = []
        for attribute in attributes:
            if isinstance(attribute, Service):
                lines.append(("ansicyan", f"{attribute}\n"))
            elif isinstance(attribute, (Characteristic, CharacteristicDeclaration)):
                lines.append(("ansimagenta", f"  {attribute}\n"))
            elif isinstance(attribute, Descriptor):
                lines.append(("ansigreen", f"    {attribute}\n"))
            else:
                lines.append(("ansiyellow", f"{attribute}\n"))

        self.local_services_text.text = lines
        self.ui.invalidate()

    def show_attributes(self, attributes):
        """Render a flat list of discovered attributes into 'remote-services'."""
        lines = [('ansicyan', f'{attribute}\n') for attribute in attributes]
        self.remote_services_text.text = lines
        self.ui.invalidate()

    def show_device(self, device):
        """Render the local device properties into the 'device' pane."""
        advertising_interval = (
            device.advertising_interval_min
            if device.advertising_interval_min == device.advertising_interval_max
            else (
                f'{device.advertising_interval_min} to '
                f'{device.advertising_interval_max}'
            )
        )
        fields = (
            ('Bumble Version', __version__),
            ('Name', device.name),
            ('Public Address', device.public_address),
            ('Random Address', device.random_address),
            ('LE Enabled', device.le_enabled),
            ('Classic Enabled', device.classic_enabled),
            ('Classic SC Enabled', device.classic_sc_enabled),
            ('Classic SSP Enabled', device.classic_ssp_enabled),
            ('Classic Class', device.class_of_device),
            ('Discoverable', device.discoverable),
            ('Connectable', device.connectable),
            ('Advertising Data', device.advertising_data),
            ('Scan Response Data', device.scan_response_data),
            ('Advertising Interval', advertising_interval),
        )

        # Pad the labels to a common width so that the values line up
        label_width = max(len(label) for label, _ in fields) + 2
        lines = []
        for label, value in fields:
            lines.append(('ansicyan', f'{label}:'.ljust(label_width)))
            lines.append(('', f'{value}\n'))

        self.device_text.text = lines
        self.ui.invalidate()

    def append_to_output(self, line, invalidate=True):
        """Append a line to the output pane, keeping the last `output_max_lines`.

        Args:
            line: a plain string or a list of (style, text) fragments.
            invalidate: whether to request a UI redraw.
        """
        if isinstance(line, str):
            line = [('', line)]
        self.output_lines.append(line)
        self.output_lines = self.output_lines[-self.output_max_lines :]
        formatted_text = []
        for output_line in self.output_lines:
            formatted_text += output_line
            formatted_text.append(('', '\n'))
        self.output.text = formatted_text
        if invalidate:
            self.ui.invalidate()

    def append_to_log(self, lines, invalidate=True):
        """Append newline-separated text to the log, keeping `log_max_lines`."""
        self.log_lines.extend(lines.split('\n'))
        self.log_lines = self.log_lines[-self.log_max_lines :]
        self.log_text.text = ANSI('\n'.join(self.log_lines))
        if invalidate:
            self.ui.invalidate()

    async def discover_services(self):
        """Discover services, characteristics and descriptors of the peer."""
        if not self.connected_peer:
            self.show_error('not connected')
            return

        # Discover all services, characteristics and descriptors
        self.append_to_output('discovering services...')
        await self.connected_peer.discover_services()
        self.append_to_output(
            f'found {len(self.connected_peer.services)} services,'
            ' discovering characteristics...'
        )
        await self.connected_peer.discover_characteristics()
        self.append_to_output('found characteristics, discovering descriptors...')
        for service in self.connected_peer.services:
            for characteristic in service.characteristics:
                await self.connected_peer.discover_descriptors(characteristic)
        self.append_to_output('discovery completed')

        self.show_remote_services(self.connected_peer.services)

    async def discover_attributes(self):
        """Discover all attributes of the connected peer and display them."""
        if not self.connected_peer:
            self.show_error('not connected')
            return

        # Discover all attributes
        self.append_to_output('discovering attributes...')
        attributes = await self.connected_peer.discover_attributes()
        self.append_to_output(f'discovered {len(attributes)} attributes...')

        self.show_attributes(attributes)

    def find_characteristic(self, param) -> Optional[CharacteristicProxy]:
        """Look up a characteristic among the peer's discovered services.

        Args:
            param: '<service-uuid>.<characteristic-uuid>',
                '*.<characteristic-uuid>' (any service), or '#<handle>' with
                the handle in hexadecimal.

        Returns:
            The first matching characteristic, or None if there is no match
            or no connected peer.
        """
        if not self.connected_peer:
            return None
        parts = param.split('.')
        if len(parts) == 2:
            service_uuid = UUID(parts[0]) if parts[0] != '*' else None
            characteristic_uuid = UUID(parts[1])
            for service in self.connected_peer.services:
                if service_uuid is None or service.uuid == service_uuid:
                    for characteristic in service.characteristics:
                        if characteristic.uuid == characteristic_uuid:
                            return characteristic
        elif len(parts) == 1:
            if parts[0].startswith('#'):
                attribute_handle = int(parts[0][1:], 16)
                for service in self.connected_peer.services:
                    for characteristic in service.characteristics:
                        if characteristic.handle == attribute_handle:
                            return characteristic

        return None

    async def rssi_monitor_loop(self):
        """Refresh `connection_rssi` periodically while monitoring is enabled."""
        while True:
            if self.monitor_rssi and self.connected_peer:
                try:
                    self.connection_rssi = (
                        await self.connected_peer.connection.get_rssi()
                    )
                except Exception as error:  # pylint: disable=broad-except
                    # A failed read must not stop the monitoring loop
                    logger.warning('failed to read RSSI: %s', error)
            await asyncio.sleep(RSSI_MONITOR_INTERVAL)

    async def command(self, command):
        """Parse a command line and dispatch it to its `do_<keyword>` handler.

        Any exception raised by the handler is reported in the output pane
        rather than propagated.
        """
        try:
            # Split on any run of whitespace so that repeated spaces do not
            # produce empty parameters
            words = command.split()
            if not words:
                return
            keyword, *params = words
            keyword = keyword.replace('-', '_').lower()
            handler = getattr(self, f'do_{keyword}', None)
            if handler:
                await handler(params)
                self.ui.invalidate()
            else:
                self.show_error('unknown command', keyword)
        except Exception as error:  # pylint: disable=broad-except
            self.show_error(str(error))

    async def do_scan(self, params):
        """scan [on [filter=address:<pattern>]|off|clear]"""
        if len(params) == 0:
            # Toggle scanning
            if self.device.is_scanning:
                await self.device.stop_scanning()
            else:
                await self.device.start_scanning()
        elif params[0] == 'on':
            if len(params) == 2:
                if not params[1].startswith("filter="):
                    self.show_error(
                        'invalid syntax',
                        'expected address filter=key1:value1,key2:value,... '
                        'available filters: address',
                    )
                    return
                # regex: (word):(any char except ,)
                matches = re.findall(r"(\w+):([^,]+)", params[1])
                for match in matches:
                    if match[0] == "address":
                        self.device.listener.address_filter = match[1]

            await self.device.start_scanning()
            self.top_tab = 'scan'
        elif params[0] == 'off':
            await self.device.stop_scanning()
        elif params[0] == 'clear':
            self.device.listener.scan_results.clear()
            self.known_addresses.clear()
            self.show_scan_results(self.device.listener.scan_results)
        else:
            self.show_error('unsupported arguments for scan command')

    async def do_rssi(self, params):
        """rssi [on|off]"""
        if len(params) == 0:
            # Toggle monitoring
            self.monitor_rssi = not self.monitor_rssi
        elif params[0] == 'on':
            self.monitor_rssi = True
        elif params[0] == 'off':
            self.monitor_rssi = False
        else:
            self.show_error('unsupported arguments for rssi command')

    async def do_connect(self, params):
        """connect <address> [phys]"""
        if len(params) != 1 and len(params) != 2:
            self.show_error('invalid syntax', 'expected connect <address> [phys]')
            return

        if len(params) == 1:
            phys = None
        else:
            phys = parse_phys(params[1])
        if phys is None:
            connection_parameters_preferences = None
        else:
            connection_parameters_preferences = {
                phy: ConnectionParametersPreferences() for phy in phys
            }

        if self.device.is_scanning:
            await self.device.stop_scanning()

        self.append_to_output('connecting...')

        try:
            await self.device.connect(
                params[0],
                connection_parameters_preferences=connection_parameters_preferences,
                timeout=DEFAULT_CONNECTION_TIMEOUT,
            )
            self.top_tab = 'remote-services'
        except bumble.core.TimeoutError:
            self.show_error('connection timed out')

    async def do_disconnect(self, _):
        """disconnect (also cancels a connection attempt in progress)"""
        if self.device.is_le_connecting:
            await self.device.cancel_connection()
        else:
            if not self.connected_peer:
                self.show_error('not connected')
                return

            await self.connected_peer.connection.disconnect()

    async def do_update_parameters(self, params):
        """update-parameters <interval-min>-<interval-max>/<max-latency>/<supervision>"""
        if len(params) != 1 or len(params[0].split('/')) != 3:
            self.show_error(
                'invalid syntax',
                'expected update-parameters <interval-min>-<interval-max>'
                '/<max-latency>/<supervision>',
            )
            return

        if not self.connected_peer:
            self.show_error('not connected')
            return

        connection_intervals, max_latency, supervision_timeout = params[0].split('/')
        connection_interval_min, connection_interval_max = [
            int(x) for x in connection_intervals.split('-')
        ]
        max_latency = int(max_latency)
        supervision_timeout = int(supervision_timeout)
        await self.connected_peer.connection.update_parameters(
            connection_interval_min,
            connection_interval_max,
            max_latency,
            supervision_timeout,
        )

    async def do_encrypt(self, _):
        """encrypt"""
        if not self.connected_peer:
            self.show_error('not connected')
            return

        await self.connected_peer.connection.encrypt()

    async def do_advertise(self, params):
        """advertise [on|off]"""
        if len(params) == 0:
            # Toggle advertising
            if self.device.is_advertising:
                await self.device.stop_advertising()
            else:
                await self.device.start_advertising()
        elif params[0] == 'on':
            await self.device.start_advertising()
        elif params[0] == 'off':
            await self.device.stop_advertising()
        else:
            self.show_error('unsupported arguments for advertise command')

    async def do_show(self, params):
        """show scan|log|device|local-services|remote-services"""
        if not params or params[0] not in self.TOP_TABS:
            self.show_error('invalid syntax', 'expected show ' + '|'.join(self.TOP_TABS))
            return

        self.top_tab = params[0]
        self.ui.invalidate()

    async def do_get_phy(self, _):
        """get-phy"""
        if not self.connected_peer:
            self.show_error('not connected')
            return

        phy = await self.connected_peer.connection.get_phy()
        self.append_to_output(
            f'PHY: RX={HCI_Constant.le_phy_name(phy[0])}, '
            f'TX={HCI_Constant.le_phy_name(phy[1])}'
        )

    async def do_request_mtu(self, params):
        """request-mtu <mtu>"""
        if len(params) != 1:
            self.show_error('invalid syntax', 'expected request-mtu <mtu>')
            return

        if not self.connected_peer:
            self.show_error('not connected')
            return

        await self.connected_peer.request_mtu(int(params[0]))

    async def do_discover(self, params):
        """discover services|attributes"""
        discovery_type = params[0] if params else None
        if discovery_type == 'services':
            await self.discover_services()
        elif discovery_type == 'attributes':
            await self.discover_attributes()
        else:
            self.show_error('invalid syntax', 'expected discover services|attributes')

    async def do_read(self, params):
        """read <attribute>"""
        if len(params) != 1:
            self.show_error('invalid syntax', 'expected read <attribute>')
            return

        if not self.connected_peer:
            self.show_error('not connected')
            return

        characteristic = self.find_characteristic(params[0])
        if characteristic is None:
            self.show_error('no such characteristic')
            return

        value = await characteristic.read_value()
        self.append_to_output(f'VALUE: 0x{value.hex()}')

    async def do_write(self, params):
        """write <attribute> <value>

        The value is taken as hex bytes when prefixed with 0x, else as an
        integer if it parses as one, else as an encoded string.
        """
        if not self.connected_peer:
            self.show_error('not connected')
            return

        if len(params) != 2:
            self.show_error('invalid syntax', 'expected write <attribute> <value>')
            return

        if params[1].upper().startswith("0X"):
            value = bytes.fromhex(params[1][2:])  # parse as hex string
        else:
            try:
                value = int(params[1])  # try as integer
            except ValueError:
                value = str.encode(params[1])  # must be a string

        characteristic = self.find_characteristic(params[0])
        if characteristic is None:
            self.show_error('no such characteristic')
            return

        # use write with response if supported
        with_response = characteristic.properties & Characteristic.WRITE
        await characteristic.write_value(value, with_response=with_response)

    async def do_subscribe(self, params):
        """subscribe <attribute>"""
        if not self.connected_peer:
            self.show_error('not connected')
            return

        if len(params) != 1:
            self.show_error('invalid syntax', 'expected subscribe <attribute>')
            return

        characteristic = self.find_characteristic(params[0])
        if characteristic is None:
            self.show_error('no such characteristic')
            return

        await characteristic.subscribe(
            lambda value: self.append_to_output(
                f"{characteristic} VALUE: 0x{value.hex()}"
            ),
        )

    async def do_unsubscribe(self, params):
        """unsubscribe <attribute>"""
        if not self.connected_peer:
            self.show_error('not connected')
            return

        if len(params) != 1:
            self.show_error('invalid syntax', 'expected unsubscribe <attribute>')
            return

        characteristic = self.find_characteristic(params[0])
        if characteristic is None:
            self.show_error('no such characteristic')
            return

        await characteristic.unsubscribe()

    async def do_set_phy(self, params):
        """set-phy <tx_rx_phys>|<tx_phys>/<rx_phys>"""
        if len(params) != 1:
            self.show_error(
                'invalid syntax', 'expected set-phy <tx_rx_phys>|<tx_phys>/<rx_phys>'
            )
            return

        if not self.connected_peer:
            self.show_error('not connected')
            return

        if '/' in params[0]:
            tx_phys, rx_phys = params[0].split('/')
        else:
            tx_phys = params[0]
            rx_phys = tx_phys

        await self.connected_peer.connection.set_phy(
            tx_phys=parse_phys(tx_phys), rx_phys=parse_phys(rx_phys)
        )

    async def do_set_default_phy(self, params):
        """set-default-phy <tx_rx_phys>|<tx_phys>/<rx_phys>"""
        if len(params) != 1:
            self.show_error(
                'invalid syntax',
                'expected set-default-phy <tx_rx_phys>|<tx_phys>/<rx_phys>',
            )
            return

        if '/' in params[0]:
            tx_phys, rx_phys = params[0].split('/')
        else:
            tx_phys = params[0]
            rx_phys = tx_phys

        await self.device.set_default_phy(
            tx_phys=parse_phys(tx_phys), rx_phys=parse_phys(rx_phys)
        )

    async def do_exit(self, _):
        """exit"""
        self.ui.exit()

    async def do_quit(self, _):
        """quit"""
        self.ui.exit()

    async def do_filter(self, params):
        """filter address <pattern>"""
        if len(params) != 2 or params[0] != "address":
            self.show_error('invalid syntax', 'expected filter address <pattern>')
            return

        self.device.listener.address_filter = params[1]


# -----------------------------------------------------------------------------
# Device and Connection Listener
# -----------------------------------------------------------------------------
class DeviceListener(Device.Listener, Connection.Listener):
    """Forwards device and connection events to a `ConsoleApp`.

    Also owns the scan results, keyed by '<address>/<address type>', and the
    address filter applied to them.
    """

    def __init__(self, app):
        self.app = app
        self.scan_results = OrderedDict()
        self.address_filter = None

    @property
    def address_filter(self):
        """The compiled regular expression that scan result addresses must match."""
        return self._address_filter

    @address_filter.setter
    def address_filter(self, filter_addr):
        # None means "match everything"
        if filter_addr is None:
            self._address_filter = re.compile(r".*")
        else:
            self._address_filter = re.compile(filter_addr)
        # Drop the existing results that no longer match. The entries are
        # rebuilt from items(), since iterating the dict yields keys only.
        self.scan_results = OrderedDict(
            (key, scan_result)
            for key, scan_result in self.scan_results.items()
            if self.filter_address_match(key)
        )
        self.app.show_scan_results(self.scan_results)

    def filter_address_match(self, address):
        """
        Returns true if an address matches the filter
        """
        return bool(self.address_filter.match(address))

    @AsyncRunner.run_in_task()
    # pylint: disable=invalid-overridden-method
    async def on_connection(self, connection):
        self.app.connected_peer = Peer(connection)
        self.app.connection_rssi = None
        self.app.append_to_output(f'connected to {self.app.connected_peer}')
        connection.listener = self

    def on_disconnection(self, reason):
        self.app.append_to_output(
            f'disconnected from {self.app.connected_peer}, '
            f'reason: {HCI_Constant.error_name(reason)}'
        )
        self.app.connected_peer = None
        self.app.connection_rssi = None

    def on_connection_parameters_update(self):
        self.app.append_to_output(
            f'connection parameters update: '
            f'{self.app.connected_peer.connection.parameters}'
        )

    def on_connection_phy_update(self):
        self.app.append_to_output(
            f'connection phy update: {self.app.connected_peer.connection.phy}'
        )

    def on_connection_att_mtu_update(self):
        self.app.append_to_output(
            f'connection att mtu update: {self.app.connected_peer.connection.att_mtu}'
        )

    def on_connection_encryption_change(self):
        encryption_state = (
            'encrypted'
            if self.app.connected_peer.connection.is_encrypted
            else 'not encrypted'
        )
        self.app.append_to_output(
            'connection encryption change: ' f'{encryption_state}'
        )

    def on_connection_data_length_change(self):
        self.app.append_to_output(
            'connection data length change: '
            f'{self.app.connected_peer.connection.data_length}'
        )

    def on_advertisement(self, advertisement):
        """Add or refresh the scan result for an advertisement that passes the filter."""
        if not self.filter_address_match(str(advertisement.address)):
            return

        entry_key = f'{advertisement.address}/{advertisement.address.address_type}'
        entry = self.scan_results.get(entry_key)
        if entry:
            entry.ad_data = advertisement.data
            entry.rssi = advertisement.rssi
            entry.connectable = advertisement.is_connectable
        else:
            self.app.add_known_address(str(advertisement.address))
            self.scan_results[entry_key] = ScanResult(
                advertisement.address,
                advertisement.address.address_type,
                advertisement.data,
                advertisement.rssi,
                advertisement.is_connectable,
            )

        self.app.show_scan_results(self.scan_results)


# -----------------------------------------------------------------------------
# Scanning
# -----------------------------------------------------------------------------
class ScanResult:
    """The latest advertisement data seen for one address."""

    def __init__(self, address, address_type, ad_data, rssi, connectable):
        self.address = address
        self.address_type = address_type
        self.ad_data = ad_data
        self.rssi = rssi
        self.connectable = connectable

    def to_display_string(self):
        """Return a colored one-line summary: address, type, RSSI bar, name."""
        address_type_string = ('P', 'R', 'PI', 'RI')[self.address_type]
        address_color = colors.yellow if self.connectable else colors.red
        if address_type_string.startswith('P'):
            type_color = colors.green
        else:
            type_color = colors.cyan

        name = self.ad_data.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True)
        if name is None:
            name = self.ad_data.get(AdvertisingData.SHORTENED_LOCAL_NAME, raw=True)
        if name:
            # Convert to string
            try:
                name = name.decode()
            except UnicodeDecodeError:
                name = name.hex()
        else:
            name = ''

        # Remove any '/P' qualifier suffix from the address string
        address_str = str(self.address).replace('/P', '')

        # RSSI bar
        bar_string = rssi_bar(self.rssi)
        bar_padding = ' ' * (DEFAULT_RSSI_BAR_WIDTH + 5 - len(bar_string))
        return (
            f'{address_color(address_str)} [{type_color(address_type_string)}] '
            f'{bar_string} {bar_padding} {name}'
        )


# -----------------------------------------------------------------------------
# Logging
# -----------------------------------------------------------------------------
class LogHandler(logging.Handler):
    """Logging handler that writes formatted records to the app's log pane."""

    def __init__(self, app):
        super().__init__()
        self.app = app
        self.setFormatter(logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s'))

    def emit(self, record):
        message = self.format(record)
        self.app.append_to_log(message)


# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------
@click.command()
@click.option('--device-config', help='Device configuration file')
@click.argument('transport')
def main(device_config, transport):
    # Ensure that the BUMBLE_USER_DIR directory exists
    os.makedirs(BUMBLE_USER_DIR, exist_ok=True)

    # Create an instance of the app
    app = ConsoleApp()

    # Setup logging
    # logging.basicConfig(level = 'FATAL')
    # logging.basicConfig(level = 'DEBUG')
    root_logger = logging.getLogger()

    root_logger.addHandler(LogHandler(app))
    root_logger.setLevel(logging.DEBUG)

    # Run until the user exits
    asyncio.run(app.run_async(device_config, transport))


# -----------------------------------------------------------------------------
if __name__ == "__main__":
    main()  # pylint: disable=no-value-for-parameter