• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Bumble Tool
17# -----------------------------------------------------------------------------
18
19# -----------------------------------------------------------------------------
20# Imports
21# -----------------------------------------------------------------------------
22import asyncio
23import logging
24import os
25import random
26import re
27import humanize
28from typing import Optional, Union
29from collections import OrderedDict
30
31import click
32from prettytable import PrettyTable
33
34from prompt_toolkit import Application
35from prompt_toolkit.history import FileHistory
36from prompt_toolkit.completion import Completer, Completion, NestedCompleter
37from prompt_toolkit.key_binding import KeyBindings
38from prompt_toolkit.formatted_text import ANSI
39from prompt_toolkit.styles import Style
40from prompt_toolkit.filters import Condition
41from prompt_toolkit.widgets import TextArea, Frame
42from prompt_toolkit.widgets.toolbars import FormattedTextToolbar
43from prompt_toolkit.data_structures import Point
44from prompt_toolkit.layout import (
45    Layout,
46    HSplit,
47    Window,
48    CompletionsMenu,
49    Float,
50    FormattedTextControl,
51    FloatContainer,
52    ConditionalContainer,
53    Dimension,
54)
55
56from bumble import __version__
57import bumble.core
58from bumble import colors
59from bumble.core import UUID, AdvertisingData, BT_LE_TRANSPORT
60from bumble.device import ConnectionParametersPreferences, Device, Connection, Peer
61from bumble.utils import AsyncRunner
62from bumble.transport import open_transport_or_link
63from bumble.gatt import Characteristic, Service, CharacteristicDeclaration, Descriptor
64from bumble.gatt_client import CharacteristicProxy
65from bumble.hci import (
66    HCI_Constant,
67    HCI_LE_1M_PHY,
68    HCI_LE_2M_PHY,
69    HCI_LE_CODED_PHY,
70)
71
72
73# -----------------------------------------------------------------------------
74# Constants
75# -----------------------------------------------------------------------------
76BUMBLE_USER_DIR = os.path.expanduser('~/.bumble')
77DEFAULT_RSSI_BAR_WIDTH = 20
78DEFAULT_CONNECTION_TIMEOUT = 30.0
79DISPLAY_MIN_RSSI = -100
80DISPLAY_MAX_RSSI = -30
81RSSI_MONITOR_INTERVAL = 5.0  # Seconds
82
83
84# -----------------------------------------------------------------------------
85# Utils
86# -----------------------------------------------------------------------------
87
88
89def le_phy_name(phy_id):
90    return {HCI_LE_1M_PHY: '1M', HCI_LE_2M_PHY: '2M', HCI_LE_CODED_PHY: 'CODED'}.get(
91        phy_id, HCI_Constant.le_phy_name(phy_id)
92    )
93
94
95def rssi_bar(rssi):
96    blocks = ['', '▏', '▎', '▍', '▌', '▋', '▊', '▉']
97    bar_width = (rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI)
98    bar_width = min(max(bar_width, 0), 1)
99    bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8)
100    bar_blocks = ('█' * int(bar_ticks / 8)) + blocks[bar_ticks % 8]
101    return f'{rssi:4} {bar_blocks}'
102
103
104def parse_phys(phys):
105    if phys.lower() == '*':
106        return None
107
108    phy_list = []
109    elements = phys.lower().split(',')
110    for element in elements:
111        if element == '1m':
112            phy_list.append(HCI_LE_1M_PHY)
113        elif element == '2m':
114            phy_list.append(HCI_LE_2M_PHY)
115        elif element == 'coded':
116            phy_list.append(HCI_LE_CODED_PHY)
117        else:
118            raise ValueError('invalid PHY name')
119    return phy_list
120
121
122# -----------------------------------------------------------------------------
123# Console App
124# -----------------------------------------------------------------------------
125class ConsoleApp:
126    connected_peer: Optional[Peer]
127
128    def __init__(self):
129        self.known_addresses = set()
130        self.known_remote_attributes = []
131        self.known_local_attributes = []
132        self.device = None
133        self.connected_peer = None
134        self.top_tab = 'device'
135        self.monitor_rssi = False
136        self.connection_rssi = None
137
138        style = Style.from_dict(
139            {
140                'output-field': 'bg:#000044 #ffffff',
141                'input-field': 'bg:#000000 #ffffff',
142                'line': '#004400',
143                'error': 'fg:ansired',
144            }
145        )
146
147        class LiveCompleter(Completer):
148            def __init__(self, words):
149                self.words = words
150
151            def get_completions(self, document, complete_event):
152                prefix = document.text_before_cursor.upper()
153                for word in [x for x in self.words if x.upper().startswith(prefix)]:
154                    yield Completion(word, start_position=-len(prefix))
155
156        def make_completer():
157            return NestedCompleter.from_nested_dict(
158                {
159                    'scan': {'on': None, 'off': None, 'clear': None},
160                    'advertise': {'on': None, 'off': None},
161                    'rssi': {'on': None, 'off': None},
162                    'show': {
163                        'scan': None,
164                        'log': None,
165                        'device': None,
166                        'local-services': None,
167                        'remote-services': None,
168                        'local-values': None,
169                        'remote-values': None,
170                    },
171                    'filter': {
172                        'address': None,
173                    },
174                    'connect': LiveCompleter(self.known_addresses),
175                    'update-parameters': None,
176                    'encrypt': None,
177                    'disconnect': None,
178                    'discover': {'services': None, 'attributes': None},
179                    'request-mtu': None,
180                    'read': LiveCompleter(self.known_remote_attributes),
181                    'write': LiveCompleter(self.known_remote_attributes),
182                    'local-write': LiveCompleter(self.known_local_attributes),
183                    'subscribe': LiveCompleter(self.known_remote_attributes),
184                    'unsubscribe': LiveCompleter(self.known_remote_attributes),
185                    'set-phy': {'1m': None, '2m': None, 'coded': None},
186                    'set-default-phy': None,
187                    'quit': None,
188                    'exit': None,
189                }
190            )
191
192        self.input_field = TextArea(
193            height=1,
194            prompt="> ",
195            multiline=False,
196            wrap_lines=False,
197            completer=make_completer(),
198            history=FileHistory(os.path.join(BUMBLE_USER_DIR, 'history')),
199        )
200
201        self.input_field.accept_handler = self.accept_input
202
203        self.output_height = Dimension(min=7, max=7, weight=1)
204        self.output_lines = []
205        self.output = FormattedTextControl(
206            get_cursor_position=lambda: Point(0, max(0, len(self.output_lines) - 1))
207        )
208        self.output_max_lines = 20
209        self.scan_results_text = FormattedTextControl()
210        self.local_services_text = FormattedTextControl()
211        self.remote_services_text = FormattedTextControl()
212        self.device_text = FormattedTextControl()
213        self.log_text = FormattedTextControl(
214            get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1))
215        )
216        self.local_values_text = FormattedTextControl()
217        self.remote_values_text = FormattedTextControl()
218        self.log_height = Dimension(min=7, weight=4)
219        self.log_max_lines = 100
220        self.log_lines = []
221
222        container = HSplit(
223            [
224                ConditionalContainer(
225                    Frame(Window(self.scan_results_text), title='Scan Results'),
226                    filter=Condition(lambda: self.top_tab == 'scan'),
227                ),
228                ConditionalContainer(
229                    Frame(Window(self.local_services_text), title='Local Services'),
230                    filter=Condition(lambda: self.top_tab == 'local-services'),
231                ),
232                ConditionalContainer(
233                    Frame(Window(self.local_values_text), title='Local Values'),
234                    filter=Condition(lambda: self.top_tab == 'local-values'),
235                ),
236                ConditionalContainer(
237                    Frame(Window(self.remote_services_text), title='Remote Services'),
238                    filter=Condition(lambda: self.top_tab == 'remote-services'),
239                ),
240                ConditionalContainer(
241                    Frame(Window(self.remote_values_text), title='Remote Values'),
242                    filter=Condition(lambda: self.top_tab == 'remote-values'),
243                ),
244                ConditionalContainer(
245                    Frame(Window(self.log_text, height=self.log_height), title='Log'),
246                    filter=Condition(lambda: self.top_tab == 'log'),
247                ),
248                ConditionalContainer(
249                    Frame(Window(self.device_text), title='Device'),
250                    filter=Condition(lambda: self.top_tab == 'device'),
251                ),
252                Frame(Window(self.output, height=self.output_height)),
253                FormattedTextToolbar(text=self.get_status_bar_text, style='reverse'),
254                self.input_field,
255            ]
256        )
257
258        container = FloatContainer(
259            container,
260            floats=[
261                Float(
262                    xcursor=True,
263                    ycursor=True,
264                    content=CompletionsMenu(max_height=16, scroll_offset=1),
265                ),
266            ],
267        )
268
269        layout = Layout(container, focused_element=self.input_field)
270
271        key_bindings = KeyBindings()
272
273        @key_bindings.add("c-c")
274        @key_bindings.add("c-q")
275        def _(event):
276            event.app.exit()
277
278        # pylint: disable=invalid-name
279        self.ui = Application(
280            layout=layout, style=style, key_bindings=key_bindings, full_screen=True
281        )
282
283    async def run_async(self, device_config, transport):
284        rssi_monitoring_task = asyncio.create_task(self.rssi_monitor_loop())
285
286        async with await open_transport_or_link(transport) as (hci_source, hci_sink):
287            if device_config:
288                self.device = Device.from_config_file_with_hci(
289                    device_config, hci_source, hci_sink
290                )
291            else:
292                random_address = (
293                    f"{random.randint(192,255):02X}"  # address is static random
294                )
295                for random_byte in random.sample(range(255), 5):
296                    random_address += f":{random_byte:02X}"
297                self.append_to_log(f"Setting random address: {random_address}")
298                self.device = Device.with_hci(
299                    'Bumble', random_address, hci_source, hci_sink
300                )
301            self.device.listener = DeviceListener(self)
302            await self.device.power_on()
303            self.show_device(self.device)
304            self.show_local_services(self.device.gatt_server.attributes)
305
306            # Run the UI
307            await self.ui.run_async()
308
309        rssi_monitoring_task.cancel()
310
311    def add_known_address(self, address):
312        self.known_addresses.add(address)
313
314    def accept_input(self, _):
315        if len(self.input_field.text) == 0:
316            return
317        self.append_to_output([('', '* '), ('ansicyan', self.input_field.text)], False)
318        self.ui.create_background_task(self.command(self.input_field.text))
319
320    def get_status_bar_text(self):
321        scanning = "ON" if self.device and self.device.is_scanning else "OFF"
322
323        connection_state = 'NONE'
324        encryption_state = ''
325        att_mtu = ''
326        rssi = '' if self.connection_rssi is None else rssi_bar(self.connection_rssi)
327
328        if self.device:
329            if self.device.is_le_connecting:
330                connection_state = 'CONNECTING'
331            elif self.connected_peer:
332                connection = self.connected_peer.connection
333                connection_parameters = (
334                    f'{connection.parameters.connection_interval}/'
335                    f'{connection.parameters.peripheral_latency}/'
336                    f'{connection.parameters.supervision_timeout}'
337                )
338                if connection.transport == BT_LE_TRANSPORT:
339                    phy_state = (
340                        f' RX={le_phy_name(connection.phy.rx_phy)}/'
341                        f'TX={le_phy_name(connection.phy.tx_phy)}'
342                    )
343                else:
344                    phy_state = ''
345                connection_state = (
346                    f'{connection.peer_address} '
347                    f'{connection_parameters} '
348                    f'{connection.data_length}'
349                    f'{phy_state}'
350                )
351                encryption_state = (
352                    'ENCRYPTED' if connection.is_encrypted else 'NOT ENCRYPTED'
353                )
354                att_mtu = f'ATT_MTU: {connection.att_mtu}'
355
356        return [
357            ('ansigreen', f' SCAN: {scanning} '),
358            ('', '  '),
359            ('ansiblue', f' CONNECTION: {connection_state} '),
360            ('', '  '),
361            ('ansimagenta', f' {encryption_state} '),
362            ('', '  '),
363            ('ansicyan', f' {att_mtu} '),
364            ('', '  '),
365            ('ansiyellow', f' {rssi} '),
366        ]
367
368    def show_error(self, title, details=None):
369        appended = [('class:error', title)]
370        if details:
371            appended.append(('', f' {details}'))
372        self.append_to_output(appended)
373
374    def show_scan_results(self, scan_results):
375        max_lines = 40  # TEMP
376        lines = []
377        keys = list(scan_results.keys())[:max_lines]
378        for key in keys:
379            lines.append(scan_results[key].to_display_string())
380        self.scan_results_text.text = ANSI('\n'.join(lines))
381        self.ui.invalidate()
382
383    def show_remote_services(self, services):
384        lines = []
385        del self.known_remote_attributes[:]
386        for service in services:
387            lines.append(("ansicyan", f"{service}\n"))
388
389            for characteristic in service.characteristics:
390                lines.append(('ansimagenta', f'  {characteristic} + \n'))
391                self.known_remote_attributes.append(
392                    f'{service.uuid.to_hex_str()}.{characteristic.uuid.to_hex_str()}'
393                )
394                self.known_remote_attributes.append(
395                    f'*.{characteristic.uuid.to_hex_str()}'
396                )
397                self.known_remote_attributes.append(f'#{characteristic.handle:X}')
398                for descriptor in characteristic.descriptors:
399                    lines.append(("ansigreen", f"    {descriptor}\n"))
400
401        self.remote_services_text.text = lines
402        self.ui.invalidate()
403
404    def show_local_services(self, attributes):
405        lines = []
406        del self.known_local_attributes[:]
407        for attribute in attributes:
408            if isinstance(attribute, Service):
409                # Save the most recent service for use later
410                service = attribute
411                lines.append(("ansicyan", f"{attribute}\n"))
412            elif isinstance(attribute, Characteristic):
413                # CharacteristicDeclaration includes all info from Characteristic
414                # no need to print it twice
415                continue
416            elif isinstance(attribute, CharacteristicDeclaration):
417                # Save the most recent characteristic declaration for use later
418                characteristic_declaration = attribute
419                self.known_local_attributes.append(
420                    f'{service.uuid.to_hex_str()}.{attribute.characteristic.uuid.to_hex_str()}'
421                )
422                self.known_local_attributes.append(
423                    f'#{attribute.characteristic.handle:X}'
424                )
425                lines.append(("ansimagenta", f"  {attribute}\n"))
426            elif isinstance(attribute, Descriptor):
427                self.known_local_attributes.append(
428                    f'{service.uuid.to_hex_str()}.{characteristic_declaration.characteristic.uuid.to_hex_str()}.{attribute.type.to_hex_str()}'
429                )
430                self.known_local_attributes.append(f'#{attribute.handle:X}')
431                lines.append(("ansigreen", f"    {attribute}\n"))
432            else:
433                lines.append(("ansiyellow", f"{attribute}\n"))
434
435        self.local_services_text.text = lines
436        self.ui.invalidate()
437
438    def show_device(self, device):
439        lines = []
440
441        lines.append(('ansicyan', 'Bumble Version:       '))
442        lines.append(('', f'{__version__}\n'))
443        lines.append(('ansicyan', 'Name:                 '))
444        lines.append(('', f'{device.name}\n'))
445        lines.append(('ansicyan', 'Public Address:       '))
446        lines.append(('', f'{device.public_address}\n'))
447        lines.append(('ansicyan', 'Random Address:       '))
448        lines.append(('', f'{device.random_address}\n'))
449        lines.append(('ansicyan', 'LE Enabled:           '))
450        lines.append(('', f'{device.le_enabled}\n'))
451        lines.append(('ansicyan', 'Classic Enabled:      '))
452        lines.append(('', f'{device.classic_enabled}\n'))
453        lines.append(('ansicyan', 'Classic SC Enabled:   '))
454        lines.append(('', f'{device.classic_sc_enabled}\n'))
455        lines.append(('ansicyan', 'Classic SSP Enabled:  '))
456        lines.append(('', f'{device.classic_ssp_enabled}\n'))
457        lines.append(('ansicyan', 'Classic Class:        '))
458        lines.append(('', f'{device.class_of_device}\n'))
459        lines.append(('ansicyan', 'Discoverable:         '))
460        lines.append(('', f'{device.discoverable}\n'))
461        lines.append(('ansicyan', 'Connectable:          '))
462        lines.append(('', f'{device.connectable}\n'))
463        lines.append(('ansicyan', 'Advertising Data:     '))
464        lines.append(('', f'{device.advertising_data}\n'))
465        lines.append(('ansicyan', 'Scan Response Data:   '))
466        lines.append(('', f'{device.scan_response_data}\n'))
467        advertising_interval = (
468            device.advertising_interval_min
469            if device.advertising_interval_min == device.advertising_interval_max
470            else (
471                f'{device.advertising_interval_min} to '
472                f'{device.advertising_interval_max}'
473            )
474        )
475        lines.append(('ansicyan', 'Advertising Interval: '))
476        lines.append(('', f'{advertising_interval}\n'))
477
478        self.device_text.text = lines
479        self.ui.invalidate()
480
481    def append_to_output(self, line, invalidate=True):
482        if isinstance(line, str):
483            line = [('', line)]
484        self.output_lines = self.output_lines[-self.output_max_lines :]
485        self.output_lines.append(line)
486        formatted_text = []
487        for line in self.output_lines:
488            formatted_text += line
489            formatted_text.append(('', '\n'))
490        self.output.text = formatted_text
491        if invalidate:
492            self.ui.invalidate()
493
494    def append_to_log(self, lines, invalidate=True):
495        self.log_lines.extend(lines.split('\n'))
496        self.log_lines = self.log_lines[-self.log_max_lines :]
497        self.log_text.text = ANSI('\n'.join(self.log_lines))
498        if invalidate:
499            self.ui.invalidate()
500
501    async def discover_services(self):
502        if not self.connected_peer:
503            self.show_error('not connected')
504            return
505
506        # Discover all services, characteristics and descriptors
507        self.append_to_output('discovering services...')
508        await self.connected_peer.discover_services()
509        self.append_to_output(
510            f'found {len(self.connected_peer.services)} services,'
511            ' discovering characteristics...'
512        )
513        await self.connected_peer.discover_characteristics()
514        self.append_to_output('found characteristics, discovering descriptors...')
515        for service in self.connected_peer.services:
516            for characteristic in service.characteristics:
517                await self.connected_peer.discover_descriptors(characteristic)
518        self.append_to_output('discovery completed')
519
520        self.show_remote_services(self.connected_peer.services)
521
522    async def discover_attributes(self):
523        if not self.connected_peer:
524            self.show_error('not connected')
525            return
526
527        # Discover all attributes
528        self.append_to_output('discovering attributes...')
529        attributes = await self.connected_peer.discover_attributes()
530        self.append_to_output(f'discovered {len(attributes)} attributes...')
531
532        self.show_attributes(attributes)
533
534    def find_remote_characteristic(self, param) -> Optional[CharacteristicProxy]:
535        if not self.connected_peer:
536            return None
537        parts = param.split('.')
538        if len(parts) == 2:
539            service_uuid = UUID(parts[0]) if parts[0] != '*' else None
540            characteristic_uuid = UUID(parts[1])
541            for service in self.connected_peer.services:
542                if service_uuid is None or service.uuid == service_uuid:
543                    for characteristic in service.characteristics:
544                        if characteristic.uuid == characteristic_uuid:
545                            return characteristic
546        elif len(parts) == 1:
547            if parts[0].startswith('#'):
548                attribute_handle = int(f'{parts[0][1:]}', 16)
549                for service in self.connected_peer.services:
550                    for characteristic in service.characteristics:
551                        if characteristic.handle == attribute_handle:
552                            return characteristic
553
554        return None
555
556    def find_local_attribute(
557        self, param
558    ) -> Optional[Union[Characteristic, Descriptor]]:
559        parts = param.split('.')
560        if len(parts) == 3:
561            service_uuid = UUID(parts[0])
562            characteristic_uuid = UUID(parts[1])
563            descriptor_uuid = UUID(parts[2])
564            return self.device.gatt_server.get_descriptor_attribute(
565                service_uuid, characteristic_uuid, descriptor_uuid
566            )
567        if len(parts) == 2:
568            service_uuid = UUID(parts[0])
569            characteristic_uuid = UUID(parts[1])
570            characteristic_attributes = (
571                self.device.gatt_server.get_characteristic_attributes(
572                    service_uuid, characteristic_uuid
573                )
574            )
575            if characteristic_attributes:
576                return characteristic_attributes[1]
577            return None
578        elif len(parts) == 1:
579            if parts[0].startswith('#'):
580                attribute_handle = int(f'{parts[0][1:]}', 16)
581                attribute = self.device.gatt_server.get_attribute(attribute_handle)
582                if isinstance(attribute, (Characteristic, Descriptor)):
583                    return attribute
584                return None
585
586        return None
587
588    async def rssi_monitor_loop(self):
589        while True:
590            if self.monitor_rssi and self.connected_peer:
591                self.connection_rssi = await self.connected_peer.connection.get_rssi()
592            await asyncio.sleep(RSSI_MONITOR_INTERVAL)
593
594    async def command(self, command):
595        try:
596            (keyword, *params) = command.strip().split(' ')
597            keyword = keyword.replace('-', '_').lower()
598            handler = getattr(self, f'do_{keyword}', None)
599            if handler:
600                await handler(params)
601                self.ui.invalidate()
602            else:
603                self.show_error('unknown command', keyword)
604        except Exception as error:
605            self.show_error(str(error))
606
607    async def do_scan(self, params):
608        if len(params) == 0:
609            # Toggle scanning
610            if self.device.is_scanning:
611                await self.device.stop_scanning()
612            else:
613                await self.device.start_scanning()
614        elif params[0] == 'on':
615            if len(params) == 2:
616                if not params[1].startswith("filter="):
617                    self.show_error(
618                        'invalid syntax',
619                        'expected address filter=key1:value1,key2:value,... '
620                        'available filters: address',
621                    )
622                # regex: (word):(any char except ,)
623                matches = re.findall(r"(\w+):([^,]+)", params[1])
624                for match in matches:
625                    if match[0] == "address":
626                        self.device.listener.address_filter = match[1]
627
628            await self.device.start_scanning()
629            self.top_tab = 'scan'
630        elif params[0] == 'off':
631            await self.device.stop_scanning()
632        elif params[0] == 'clear':
633            self.device.listener.scan_results.clear()
634            self.known_addresses.clear()
635            self.show_scan_results(self.device.listener.scan_results)
636        else:
637            self.show_error('unsupported arguments for scan command')
638
639    async def do_rssi(self, params):
640        if len(params) == 0:
641            # Toggle monitoring
642            self.monitor_rssi = not self.monitor_rssi
643        elif params[0] == 'on':
644            self.monitor_rssi = True
645        elif params[0] == 'off':
646            self.monitor_rssi = False
647        else:
648            self.show_error('unsupported arguments for rssi command')
649
650    async def do_connect(self, params):
651        if len(params) != 1 and len(params) != 2:
652            self.show_error('invalid syntax', 'expected connect <address> [phys]')
653            return
654
655        if len(params) == 1:
656            phys = None
657        else:
658            phys = parse_phys(params[1])
659        if phys is None:
660            connection_parameters_preferences = None
661        else:
662            connection_parameters_preferences = {
663                phy: ConnectionParametersPreferences() for phy in phys
664            }
665
666        if self.device.is_scanning:
667            await self.device.stop_scanning()
668
669        self.append_to_output('connecting...')
670
671        try:
672            await self.device.connect(
673                params[0],
674                connection_parameters_preferences=connection_parameters_preferences,
675                timeout=DEFAULT_CONNECTION_TIMEOUT,
676            )
677            self.top_tab = 'services'
678        except bumble.core.TimeoutError:
679            self.show_error('connection timed out')
680
681    async def do_disconnect(self, _):
682        if self.device.is_le_connecting:
683            await self.device.cancel_connection()
684        else:
685            if not self.connected_peer:
686                self.show_error('not connected')
687                return
688
689            await self.connected_peer.connection.disconnect()
690
691    async def do_update_parameters(self, params):
692        if len(params) != 1 or len(params[0].split('/')) != 3:
693            self.show_error(
694                'invalid syntax',
695                'expected update-parameters <interval-min>-<interval-max>'
696                '/<max-latency>/<supervision>',
697            )
698            return
699
700        if not self.connected_peer:
701            self.show_error('not connected')
702            return
703
704        connection_intervals, max_latency, supervision_timeout = params[0].split('/')
705        connection_interval_min, connection_interval_max = [
706            int(x) for x in connection_intervals.split('-')
707        ]
708        max_latency = int(max_latency)
709        supervision_timeout = int(supervision_timeout)
710        await self.connected_peer.connection.update_parameters(
711            connection_interval_min,
712            connection_interval_max,
713            max_latency,
714            supervision_timeout,
715        )
716
717    async def do_encrypt(self, _):
718        if not self.connected_peer:
719            self.show_error('not connected')
720            return
721
722        await self.connected_peer.connection.encrypt()
723
724    async def do_advertise(self, params):
725        if len(params) == 0:
726            # Toggle advertising
727            if self.device.is_advertising:
728                await self.device.stop_advertising()
729            else:
730                await self.device.start_advertising()
731        elif params[0] == 'on':
732            await self.device.start_advertising()
733        elif params[0] == 'off':
734            await self.device.stop_advertising()
735        else:
736            self.show_error('unsupported arguments for advertise command')
737
738    async def do_show(self, params):
739        if params:
740            if params[0] in {
741                'scan',
742                'log',
743                'device',
744                'local-services',
745                'remote-services',
746                'local-values',
747                'remote-values',
748            }:
749                self.top_tab = params[0]
750                self.ui.invalidate()
751
752        while self.top_tab == 'local-values':
753            await self.do_show_local_values()
754            await asyncio.sleep(1)
755
756        while self.top_tab == 'remote-values':
757            await self.do_show_remote_values()
758            await asyncio.sleep(1)
759
760    async def do_show_local_values(self):
761        prettytable = PrettyTable()
762        field_names = ["Service", "Characteristic", "Descriptor"]
763
764        # if there's no connections, add a column just for value
765        if not self.device.connections:
766            field_names.append("Value")
767
768        # if there are connections, add a column for each connection's value
769        for connection in self.device.connections.values():
770            field_names.append(f"Connection {connection.handle}")
771
772        for attribute in self.device.gatt_server.attributes:
773            if isinstance(attribute, Characteristic):
774                service = self.device.gatt_server.get_attribute_group(
775                    attribute.handle, Service
776                )
777                if not service:
778                    continue
779                values = [
780                    await attribute.read_value(connection)
781                    for connection in self.device.connections.values()
782                ]
783                if not values:
784                    values = [attribute.read_value(None)]
785                prettytable.add_row([f"{service.uuid}", attribute.uuid, ""] + values)
786
787            elif isinstance(attribute, Descriptor):
788                service = self.device.gatt_server.get_attribute_group(
789                    attribute.handle, Service
790                )
791                if not service:
792                    continue
793                characteristic = self.device.gatt_server.get_attribute_group(
794                    attribute.handle, Characteristic
795                )
796                if not characteristic:
797                    continue
798                values = [
799                    await attribute.read_value(connection)
800                    for connection in self.device.connections.values()
801                ]
802                if not values:
803                    values = [await attribute.read_value(None)]
804
805                # TODO: future optimization: convert CCCD value to human readable string
806
807                prettytable.add_row(
808                    [service.uuid, characteristic.uuid, attribute.type] + values
809                )
810
811        prettytable.field_names = field_names
812        self.local_values_text.text = prettytable.get_string()
813        self.ui.invalidate()
814
815    async def do_show_remote_values(self):
816        prettytable = PrettyTable(
817            field_names=[
818                "Connection",
819                "Service",
820                "Characteristic",
821                "Descriptor",
822                "Time",
823                "Value",
824            ]
825        )
826        for connection in self.device.connections.values():
827            for handle, (time, value) in connection.gatt_client.cached_values.items():
828                row = [connection.handle]
829                attribute = connection.gatt_client.get_attributes(handle)
830                if not attribute:
831                    continue
832                if len(attribute) == 3:
833                    row.extend(
834                        [attribute[0].uuid, attribute[1].uuid, attribute[2].type]
835                    )
836                elif len(attribute) == 2:
837                    row.extend([attribute[0].uuid, attribute[1].uuid, ""])
838                elif len(attribute) == 1:
839                    row.extend([attribute[0].uuid, "", ""])
840                else:
841                    continue
842
843                row.extend([humanize.naturaltime(time), value])
844                prettytable.add_row(row)
845
846        self.remote_values_text.text = prettytable.get_string()
847        self.ui.invalidate()
848
849    async def do_get_phy(self, _):
850        if not self.connected_peer:
851            self.show_error('not connected')
852            return
853
854        phy = await self.connected_peer.connection.get_phy()
855        self.append_to_output(
856            f'PHY: RX={HCI_Constant.le_phy_name(phy[0])}, '
857            f'TX={HCI_Constant.le_phy_name(phy[1])}'
858        )
859
860    async def do_request_mtu(self, params):
861        if len(params) != 1:
862            self.show_error('invalid syntax', 'expected request-mtu <mtu>')
863            return
864
865        if not self.connected_peer:
866            self.show_error('not connected')
867            return
868
869        await self.connected_peer.request_mtu(int(params[0]))
870
871    async def do_discover(self, params):
872        if not params:
873            self.show_error('invalid syntax', 'expected discover services|attributes')
874            return
875
876        discovery_type = params[0]
877        if discovery_type == 'services':
878            await self.discover_services()
879        elif discovery_type == 'attributes':
880            await self.discover_attributes()
881
882    async def do_read(self, params):
883        if len(params) != 1:
884            self.show_error('invalid syntax', 'expected read <attribute>')
885            return
886
887        if not self.connected_peer:
888            self.show_error('not connected')
889            return
890
891        characteristic = self.find_remote_characteristic(params[0])
892        if characteristic is None:
893            self.show_error('no such characteristic')
894            return
895
896        value = await characteristic.read_value()
897        self.append_to_output(f'VALUE: 0x{value.hex()}')
898
899    async def do_write(self, params):
900        if not self.connected_peer:
901            self.show_error('not connected')
902            return
903
904        if len(params) != 2:
905            self.show_error('invalid syntax', 'expected write <attribute> <value>')
906            return
907
908        if params[1].upper().startswith("0X"):
909            value = bytes.fromhex(params[1][2:])  # parse as hex string
910        else:
911            try:
912                value = int(params[1])  # try as integer
913            except ValueError:
914                value = str.encode(params[1])  # must be a string
915
916        characteristic = self.find_remote_characteristic(params[0])
917        if characteristic is None:
918            self.show_error('no such characteristic')
919            return
920
921        # use write with response if supported
922        with_response = characteristic.properties & Characteristic.Properties.WRITE
923        await characteristic.write_value(value, with_response=with_response)
924
925    async def do_local_write(self, params):
926        if len(params) != 2:
927            self.show_error(
928                'invalid syntax', 'expected local-write <attribute> <value>'
929            )
930            return
931
932        if params[1].upper().startswith("0X"):
933            value = bytes.fromhex(params[1][2:])  # parse as hex string
934        else:
935            try:
936                value = int(params[1]).to_bytes(2, "little")  # try as 2 byte integer
937            except ValueError:
938                value = str.encode(params[1])  # must be a string
939
940        attribute = self.find_local_attribute(params[0])
941        if not attribute:
942            self.show_error('invalid syntax', 'unable to find attribute')
943            return
944
945        # send data to any subscribers
946        if isinstance(attribute, Characteristic):
947            await attribute.write_value(None, value)
948            if attribute.has_properties(Characteristic.NOTIFY):
949                await self.device.gatt_server.notify_subscribers(attribute)
950            if attribute.has_properties(Characteristic.INDICATE):
951                await self.device.gatt_server.indicate_subscribers(attribute)
952
953    async def do_subscribe(self, params):
954        if not self.connected_peer:
955            self.show_error('not connected')
956            return
957
958        if len(params) != 1:
959            self.show_error('invalid syntax', 'expected subscribe <attribute>')
960            return
961
962        characteristic = self.find_remote_characteristic(params[0])
963        if characteristic is None:
964            self.show_error('no such characteristic')
965            return
966
967        await characteristic.subscribe(
968            lambda value: self.append_to_output(
969                f"{characteristic} VALUE: 0x{value.hex()}"
970            ),
971        )
972
973    async def do_unsubscribe(self, params):
974        if not self.connected_peer:
975            self.show_error('not connected')
976            return
977
978        if len(params) != 1:
979            self.show_error('invalid syntax', 'expected subscribe <attribute>')
980            return
981
982        characteristic = self.find_remote_characteristic(params[0])
983        if characteristic is None:
984            self.show_error('no such characteristic')
985            return
986
987        await characteristic.unsubscribe()
988
989    async def do_set_phy(self, params):
990        if len(params) != 1:
991            self.show_error(
992                'invalid syntax', 'expected set-phy <tx_rx_phys>|<tx_phys>/<rx_phys>'
993            )
994            return
995
996        if not self.connected_peer:
997            self.show_error('not connected')
998            return
999
1000        if '/' in params[0]:
1001            tx_phys, rx_phys = params[0].split('/')
1002        else:
1003            tx_phys = params[0]
1004            rx_phys = tx_phys
1005
1006        await self.connected_peer.connection.set_phy(
1007            tx_phys=parse_phys(tx_phys), rx_phys=parse_phys(rx_phys)
1008        )
1009
1010    async def do_set_default_phy(self, params):
1011        if len(params) != 1:
1012            self.show_error(
1013                'invalid syntax',
1014                'expected set-default-phy <tx_rx_phys>|<tx_phys>/<rx_phys>',
1015            )
1016            return
1017
1018        if '/' in params[0]:
1019            tx_phys, rx_phys = params[0].split('/')
1020        else:
1021            tx_phys = params[0]
1022            rx_phys = tx_phys
1023
1024        await self.device.set_default_phy(
1025            tx_phys=parse_phys(tx_phys), rx_phys=parse_phys(rx_phys)
1026        )
1027
1028    async def do_exit(self, _):
1029        self.ui.exit()
1030
1031    async def do_quit(self, _):
1032        self.ui.exit()
1033
1034    async def do_filter(self, params):
1035        if params[0] == "address":
1036            if len(params) != 2:
1037                self.show_error('invalid syntax', 'expected filter address <pattern>')
1038                return
1039            self.device.listener.address_filter = params[1]
1040
1041
1042# -----------------------------------------------------------------------------
1043# Device and Connection Listener
1044# -----------------------------------------------------------------------------
1045class DeviceListener(Device.Listener, Connection.Listener):
1046    def __init__(self, app):
1047        self.app = app
1048        self.scan_results = OrderedDict()
1049        self.address_filter = None
1050
1051    @property
1052    def address_filter(self):
1053        return self._address_filter
1054
1055    @address_filter.setter
1056    def address_filter(self, filter_addr):
1057        if filter_addr is None:
1058            self._address_filter = re.compile(r".*")
1059        else:
1060            self._address_filter = re.compile(filter_addr)
1061        self.scan_results = OrderedDict(
1062            filter(self.filter_address_match, self.scan_results)
1063        )
1064        self.app.show_scan_results(self.scan_results)
1065
1066    def filter_address_match(self, address):
1067        """
1068        Returns true if an address matches the filter
1069        """
1070        return bool(self.address_filter.match(address))
1071
1072    @AsyncRunner.run_in_task()
1073    # pylint: disable=invalid-overridden-method
1074    async def on_connection(self, connection):
1075        self.app.connected_peer = Peer(connection)
1076        self.app.connection_rssi = None
1077        self.app.append_to_output(f'connected to {self.app.connected_peer}')
1078        connection.listener = self
1079
1080    def on_disconnection(self, reason):
1081        self.app.append_to_output(
1082            f'disconnected from {self.app.connected_peer}, '
1083            f'reason: {HCI_Constant.error_name(reason)}'
1084        )
1085        self.app.connected_peer = None
1086        self.app.connection_rssi = None
1087
1088    def on_connection_parameters_update(self):
1089        self.app.append_to_output(
1090            f'connection parameters update: '
1091            f'{self.app.connected_peer.connection.parameters}'
1092        )
1093
1094    def on_connection_phy_update(self):
1095        self.app.append_to_output(
1096            f'connection phy update: {self.app.connected_peer.connection.phy}'
1097        )
1098
1099    def on_connection_att_mtu_update(self):
1100        self.app.append_to_output(
1101            f'connection att mtu update: {self.app.connected_peer.connection.att_mtu}'
1102        )
1103
1104    def on_connection_encryption_change(self):
1105        encryption_state = (
1106            'encrypted'
1107            if self.app.connected_peer.connection.is_encrypted
1108            else 'not encrypted'
1109        )
1110        self.app.append_to_output(
1111            'connection encryption change: ' f'{encryption_state}'
1112        )
1113
1114    def on_connection_data_length_change(self):
1115        self.app.append_to_output(
1116            'connection data length change: '
1117            f'{self.app.connected_peer.connection.data_length}'
1118        )
1119
1120    def on_advertisement(self, advertisement):
1121        if not self.filter_address_match(str(advertisement.address)):
1122            return
1123
1124        entry_key = f'{advertisement.address}/{advertisement.address.address_type}'
1125        entry = self.scan_results.get(entry_key)
1126        if entry:
1127            entry.ad_data = advertisement.data
1128            entry.rssi = advertisement.rssi
1129            entry.connectable = advertisement.is_connectable
1130        else:
1131            self.app.add_known_address(str(advertisement.address))
1132            self.scan_results[entry_key] = ScanResult(
1133                advertisement.address,
1134                advertisement.address.address_type,
1135                advertisement.data,
1136                advertisement.rssi,
1137                advertisement.is_connectable,
1138            )
1139
1140        self.app.show_scan_results(self.scan_results)
1141
1142
1143# -----------------------------------------------------------------------------
1144# Scanning
1145# -----------------------------------------------------------------------------
1146class ScanResult:
1147    def __init__(self, address, address_type, ad_data, rssi, connectable):
1148        self.address = address
1149        self.address_type = address_type
1150        self.ad_data = ad_data
1151        self.rssi = rssi
1152        self.connectable = connectable
1153
1154    def to_display_string(self):
1155        address_type_string = ('P', 'R', 'PI', 'RI')[self.address_type]
1156        address_color = colors.yellow if self.connectable else colors.red
1157        if address_type_string.startswith('P'):
1158            type_color = colors.green
1159        else:
1160            type_color = colors.cyan
1161
1162        name = self.ad_data.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True)
1163        if name is None:
1164            name = self.ad_data.get(AdvertisingData.SHORTENED_LOCAL_NAME, raw=True)
1165        if name:
1166            # Convert to string
1167            try:
1168                name = name.decode()
1169            except UnicodeDecodeError:
1170                name = name.hex()
1171        else:
1172            name = ''
1173
1174        # Remove any '/P' qualifier suffix from the address string
1175        address_str = self.address.to_string(with_type_qualifier=False)
1176
1177        # RSSI bar
1178        bar_string = rssi_bar(self.rssi)
1179        bar_padding = ' ' * (DEFAULT_RSSI_BAR_WIDTH + 5 - len(bar_string))
1180        return (
1181            f'{address_color(address_str)} [{type_color(address_type_string)}] '
1182            f'{bar_string} {bar_padding} {name}'
1183        )
1184
1185
1186# -----------------------------------------------------------------------------
1187# Logging
1188# -----------------------------------------------------------------------------
1189class LogHandler(logging.Handler):
1190    def __init__(self, app):
1191        super().__init__()
1192        self.app = app
1193        self.setFormatter(logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s'))
1194
1195    def emit(self, record):
1196        message = self.format(record)
1197        self.app.append_to_log(message)
1198
1199
1200# -----------------------------------------------------------------------------
1201# Main
1202# -----------------------------------------------------------------------------
1203@click.command()
1204@click.option('--device-config', help='Device configuration file')
1205@click.argument('transport')
1206def main(device_config, transport):
1207    # Ensure that the BUMBLE_USER_DIR directory exists
1208    if not os.path.isdir(BUMBLE_USER_DIR):
1209        os.mkdir(BUMBLE_USER_DIR)
1210
1211    # Create an instance of the app
1212    app = ConsoleApp()
1213
1214    # Setup logging
1215    # logging.basicConfig(level = 'FATAL')
1216    # logging.basicConfig(level = 'DEBUG')
1217    root_logger = logging.getLogger()
1218
1219    root_logger.addHandler(LogHandler(app))
1220    root_logger.setLevel(logging.DEBUG)
1221
1222    # Run until the user exits
1223    asyncio.run(app.run_async(device_config, transport))
1224
1225
1226# -----------------------------------------------------------------------------
1227if __name__ == "__main__":
1228    main()  # pylint: disable=no-value-for-parameter
1229