• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Bumble Tool
17# -----------------------------------------------------------------------------
18
19# -----------------------------------------------------------------------------
20# Imports
21# -----------------------------------------------------------------------------
22import asyncio
23import logging
24import os
25import random
26import re
27from typing import Optional
28from collections import OrderedDict
29
30import click
31
32from prompt_toolkit import Application
33from prompt_toolkit.history import FileHistory
34from prompt_toolkit.completion import Completer, Completion, NestedCompleter
35from prompt_toolkit.key_binding import KeyBindings
36from prompt_toolkit.formatted_text import ANSI
37from prompt_toolkit.styles import Style
38from prompt_toolkit.filters import Condition
39from prompt_toolkit.widgets import TextArea, Frame
40from prompt_toolkit.widgets.toolbars import FormattedTextToolbar
41from prompt_toolkit.data_structures import Point
42from prompt_toolkit.layout import (
43    Layout,
44    HSplit,
45    Window,
46    CompletionsMenu,
47    Float,
48    FormattedTextControl,
49    FloatContainer,
50    ConditionalContainer,
51    Dimension,
52)
53
54from bumble import __version__
55import bumble.core
56from bumble import colors
57from bumble.core import UUID, AdvertisingData, BT_LE_TRANSPORT
58from bumble.device import ConnectionParametersPreferences, Device, Connection, Peer
59from bumble.utils import AsyncRunner
60from bumble.transport import open_transport_or_link
61from bumble.gatt import Characteristic, Service, CharacteristicDeclaration, Descriptor
62from bumble.gatt_client import CharacteristicProxy
63from bumble.hci import (
64    HCI_Constant,
65    HCI_LE_1M_PHY,
66    HCI_LE_2M_PHY,
67    HCI_LE_CODED_PHY,
68)
69
70
71# -----------------------------------------------------------------------------
72# Constants
73# -----------------------------------------------------------------------------
74BUMBLE_USER_DIR = os.path.expanduser('~/.bumble')
75DEFAULT_RSSI_BAR_WIDTH = 20
76DEFAULT_CONNECTION_TIMEOUT = 30.0
77DISPLAY_MIN_RSSI = -100
78DISPLAY_MAX_RSSI = -30
79RSSI_MONITOR_INTERVAL = 5.0  # Seconds
80
81
82# -----------------------------------------------------------------------------
83# Utils
84# -----------------------------------------------------------------------------
85
86
87def le_phy_name(phy_id):
88    return {HCI_LE_1M_PHY: '1M', HCI_LE_2M_PHY: '2M', HCI_LE_CODED_PHY: 'CODED'}.get(
89        phy_id, HCI_Constant.le_phy_name(phy_id)
90    )
91
92
93def rssi_bar(rssi):
94    blocks = ['', '▏', '▎', '▍', '▌', '▋', '▊', '▉']
95    bar_width = (rssi - DISPLAY_MIN_RSSI) / (DISPLAY_MAX_RSSI - DISPLAY_MIN_RSSI)
96    bar_width = min(max(bar_width, 0), 1)
97    bar_ticks = int(bar_width * DEFAULT_RSSI_BAR_WIDTH * 8)
98    bar_blocks = ('█' * int(bar_ticks / 8)) + blocks[bar_ticks % 8]
99    return f'{rssi:4} {bar_blocks}'
100
101
102def parse_phys(phys):
103    if phys.lower() == '*':
104        return None
105
106    phy_list = []
107    elements = phys.lower().split(',')
108    for element in elements:
109        if element == '1m':
110            phy_list.append(HCI_LE_1M_PHY)
111        elif element == '2m':
112            phy_list.append(HCI_LE_2M_PHY)
113        elif element == 'coded':
114            phy_list.append(HCI_LE_CODED_PHY)
115        else:
116            raise ValueError('invalid PHY name')
117    return phy_list
118
119
120# -----------------------------------------------------------------------------
121# Console App
122# -----------------------------------------------------------------------------
123class ConsoleApp:
124    connected_peer: Optional[Peer]
125
126    def __init__(self):
127        self.known_addresses = set()
128        self.known_attributes = []
129        self.device = None
130        self.connected_peer = None
131        self.top_tab = 'device'
132        self.monitor_rssi = False
133        self.connection_rssi = None
134
135        style = Style.from_dict(
136            {
137                'output-field': 'bg:#000044 #ffffff',
138                'input-field': 'bg:#000000 #ffffff',
139                'line': '#004400',
140                'error': 'fg:ansired',
141            }
142        )
143
144        class LiveCompleter(Completer):
145            def __init__(self, words):
146                self.words = words
147
148            def get_completions(self, document, complete_event):
149                prefix = document.text_before_cursor.upper()
150                for word in [x for x in self.words if x.upper().startswith(prefix)]:
151                    yield Completion(word, start_position=-len(prefix))
152
153        def make_completer():
154            return NestedCompleter.from_nested_dict(
155                {
156                    'scan': {'on': None, 'off': None, 'clear': None},
157                    'advertise': {'on': None, 'off': None},
158                    'rssi': {'on': None, 'off': None},
159                    'show': {
160                        'scan': None,
161                        'log': None,
162                        'device': None,
163                        'local-services': None,
164                        'remote-services': None,
165                    },
166                    'filter': {
167                        'address': None,
168                    },
169                    'connect': LiveCompleter(self.known_addresses),
170                    'update-parameters': None,
171                    'encrypt': None,
172                    'disconnect': None,
173                    'discover': {'services': None, 'attributes': None},
174                    'request-mtu': None,
175                    'read': LiveCompleter(self.known_attributes),
176                    'write': LiveCompleter(self.known_attributes),
177                    'subscribe': LiveCompleter(self.known_attributes),
178                    'unsubscribe': LiveCompleter(self.known_attributes),
179                    'set-phy': {'1m': None, '2m': None, 'coded': None},
180                    'set-default-phy': None,
181                    'quit': None,
182                    'exit': None,
183                }
184            )
185
186        self.input_field = TextArea(
187            height=1,
188            prompt="> ",
189            multiline=False,
190            wrap_lines=False,
191            completer=make_completer(),
192            history=FileHistory(os.path.join(BUMBLE_USER_DIR, 'history')),
193        )
194
195        self.input_field.accept_handler = self.accept_input
196
197        self.output_height = Dimension(min=7, max=7, weight=1)
198        self.output_lines = []
199        self.output = FormattedTextControl(
200            get_cursor_position=lambda: Point(0, max(0, len(self.output_lines) - 1))
201        )
202        self.output_max_lines = 20
203        self.scan_results_text = FormattedTextControl()
204        self.local_services_text = FormattedTextControl()
205        self.remote_services_text = FormattedTextControl()
206        self.device_text = FormattedTextControl()
207        self.log_text = FormattedTextControl(
208            get_cursor_position=lambda: Point(0, max(0, len(self.log_lines) - 1))
209        )
210        self.log_height = Dimension(min=7, weight=4)
211        self.log_max_lines = 100
212        self.log_lines = []
213
214        container = HSplit(
215            [
216                ConditionalContainer(
217                    Frame(Window(self.scan_results_text), title='Scan Results'),
218                    filter=Condition(lambda: self.top_tab == 'scan'),
219                ),
220                ConditionalContainer(
221                    Frame(Window(self.local_services_text), title='Local Services'),
222                    filter=Condition(lambda: self.top_tab == 'local-services'),
223                ),
224                ConditionalContainer(
225                    Frame(Window(self.remote_services_text), title='Remote Services'),
226                    filter=Condition(lambda: self.top_tab == 'remote-services'),
227                ),
228                ConditionalContainer(
229                    Frame(Window(self.log_text, height=self.log_height), title='Log'),
230                    filter=Condition(lambda: self.top_tab == 'log'),
231                ),
232                ConditionalContainer(
233                    Frame(Window(self.device_text), title='Device'),
234                    filter=Condition(lambda: self.top_tab == 'device'),
235                ),
236                Frame(Window(self.output, height=self.output_height)),
237                FormattedTextToolbar(text=self.get_status_bar_text, style='reverse'),
238                self.input_field,
239            ]
240        )
241
242        container = FloatContainer(
243            container,
244            floats=[
245                Float(
246                    xcursor=True,
247                    ycursor=True,
248                    content=CompletionsMenu(max_height=16, scroll_offset=1),
249                ),
250            ],
251        )
252
253        layout = Layout(container, focused_element=self.input_field)
254
255        key_bindings = KeyBindings()
256
257        @key_bindings.add("c-c")
258        @key_bindings.add("c-q")
259        def _(event):
260            event.app.exit()
261
262        # pylint: disable=invalid-name
263        self.ui = Application(
264            layout=layout, style=style, key_bindings=key_bindings, full_screen=True
265        )
266
267    async def run_async(self, device_config, transport):
268        rssi_monitoring_task = asyncio.create_task(self.rssi_monitor_loop())
269
270        async with await open_transport_or_link(transport) as (hci_source, hci_sink):
271            if device_config:
272                self.device = Device.from_config_file_with_hci(
273                    device_config, hci_source, hci_sink
274                )
275            else:
276                random_address = (
277                    f"{random.randint(192,255):02X}"  # address is static random
278                )
279                for random_byte in random.sample(range(255), 5):
280                    random_address += f":{random_byte:02X}"
281                self.append_to_log(f"Setting random address: {random_address}")
282                self.device = Device.with_hci(
283                    'Bumble', random_address, hci_source, hci_sink
284                )
285            self.device.listener = DeviceListener(self)
286            await self.device.power_on()
287            self.show_device(self.device)
288            self.show_local_services(self.device.gatt_server.attributes)
289
290            # Run the UI
291            await self.ui.run_async()
292
293        rssi_monitoring_task.cancel()
294
295    def add_known_address(self, address):
296        self.known_addresses.add(address)
297
298    def accept_input(self, _):
299        if len(self.input_field.text) == 0:
300            return
301        self.append_to_output([('', '* '), ('ansicyan', self.input_field.text)], False)
302        self.ui.create_background_task(self.command(self.input_field.text))
303
304    def get_status_bar_text(self):
305        scanning = "ON" if self.device and self.device.is_scanning else "OFF"
306
307        connection_state = 'NONE'
308        encryption_state = ''
309        att_mtu = ''
310        rssi = '' if self.connection_rssi is None else rssi_bar(self.connection_rssi)
311
312        if self.device:
313            if self.device.is_le_connecting:
314                connection_state = 'CONNECTING'
315            elif self.connected_peer:
316                connection = self.connected_peer.connection
317                connection_parameters = (
318                    f'{connection.parameters.connection_interval}/'
319                    f'{connection.parameters.peripheral_latency}/'
320                    f'{connection.parameters.supervision_timeout}'
321                )
322                if connection.transport == BT_LE_TRANSPORT:
323                    phy_state = (
324                        f' RX={le_phy_name(connection.phy.rx_phy)}/'
325                        f'TX={le_phy_name(connection.phy.tx_phy)}'
326                    )
327                else:
328                    phy_state = ''
329                connection_state = (
330                    f'{connection.peer_address} '
331                    f'{connection_parameters} '
332                    f'{connection.data_length}'
333                    f'{phy_state}'
334                )
335                encryption_state = (
336                    'ENCRYPTED' if connection.is_encrypted else 'NOT ENCRYPTED'
337                )
338                att_mtu = f'ATT_MTU: {connection.att_mtu}'
339
340        return [
341            ('ansigreen', f' SCAN: {scanning} '),
342            ('', '  '),
343            ('ansiblue', f' CONNECTION: {connection_state} '),
344            ('', '  '),
345            ('ansimagenta', f' {encryption_state} '),
346            ('', '  '),
347            ('ansicyan', f' {att_mtu} '),
348            ('', '  '),
349            ('ansiyellow', f' {rssi} '),
350        ]
351
352    def show_error(self, title, details=None):
353        appended = [('class:error', title)]
354        if details:
355            appended.append(('', f' {details}'))
356        self.append_to_output(appended)
357
358    def show_scan_results(self, scan_results):
359        max_lines = 40  # TEMP
360        lines = []
361        keys = list(scan_results.keys())[:max_lines]
362        for key in keys:
363            lines.append(scan_results[key].to_display_string())
364        self.scan_results_text.text = ANSI('\n'.join(lines))
365        self.ui.invalidate()
366
367    def show_remote_services(self, services):
368        lines = []
369        del self.known_attributes[:]
370        for service in services:
371            lines.append(("ansicyan", f"{service}\n"))
372
373            for characteristic in service.characteristics:
374                lines.append(('ansimagenta', f'  {characteristic} + \n'))
375                self.known_attributes.append(
376                    f'{service.uuid.to_hex_str()}.{characteristic.uuid.to_hex_str()}'
377                )
378                self.known_attributes.append(f'*.{characteristic.uuid.to_hex_str()}')
379                self.known_attributes.append(f'#{characteristic.handle:X}')
380                for descriptor in characteristic.descriptors:
381                    lines.append(("ansigreen", f"    {descriptor}\n"))
382
383        self.remote_services_text.text = lines
384        self.ui.invalidate()
385
386    def show_local_services(self, attributes):
387        lines = []
388        for attribute in attributes:
389            if isinstance(attribute, Service):
390                lines.append(("ansicyan", f"{attribute}\n"))
391            elif isinstance(attribute, (Characteristic, CharacteristicDeclaration)):
392                lines.append(("ansimagenta", f"  {attribute}\n"))
393            elif isinstance(attribute, Descriptor):
394                lines.append(("ansigreen", f"    {attribute}\n"))
395            else:
396                lines.append(("ansiyellow", f"{attribute}\n"))
397
398        self.local_services_text.text = lines
399        self.ui.invalidate()
400
401    def show_device(self, device):
402        lines = []
403
404        lines.append(('ansicyan', 'Bumble Version:       '))
405        lines.append(('', f'{__version__}\n'))
406        lines.append(('ansicyan', 'Name:                 '))
407        lines.append(('', f'{device.name}\n'))
408        lines.append(('ansicyan', 'Public Address:       '))
409        lines.append(('', f'{device.public_address}\n'))
410        lines.append(('ansicyan', 'Random Address:       '))
411        lines.append(('', f'{device.random_address}\n'))
412        lines.append(('ansicyan', 'LE Enabled:           '))
413        lines.append(('', f'{device.le_enabled}\n'))
414        lines.append(('ansicyan', 'Classic Enabled:      '))
415        lines.append(('', f'{device.classic_enabled}\n'))
416        lines.append(('ansicyan', 'Classic SC Enabled:   '))
417        lines.append(('', f'{device.classic_sc_enabled}\n'))
418        lines.append(('ansicyan', 'Classic SSP Enabled:  '))
419        lines.append(('', f'{device.classic_ssp_enabled}\n'))
420        lines.append(('ansicyan', 'Classic Class:        '))
421        lines.append(('', f'{device.class_of_device}\n'))
422        lines.append(('ansicyan', 'Discoverable:         '))
423        lines.append(('', f'{device.discoverable}\n'))
424        lines.append(('ansicyan', 'Connectable:          '))
425        lines.append(('', f'{device.connectable}\n'))
426        lines.append(('ansicyan', 'Advertising Data:     '))
427        lines.append(('', f'{device.advertising_data}\n'))
428        lines.append(('ansicyan', 'Scan Response Data:   '))
429        lines.append(('', f'{device.scan_response_data}\n'))
430        advertising_interval = (
431            device.advertising_interval_min
432            if device.advertising_interval_min == device.advertising_interval_max
433            else (
434                f'{device.advertising_interval_min} to '
435                f'{device.advertising_interval_max}'
436            )
437        )
438        lines.append(('ansicyan', 'Advertising Interval: '))
439        lines.append(('', f'{advertising_interval}\n'))
440
441        self.device_text.text = lines
442        self.ui.invalidate()
443
444    def append_to_output(self, line, invalidate=True):
445        if isinstance(line, str):
446            line = [('', line)]
447        self.output_lines = self.output_lines[-self.output_max_lines :]
448        self.output_lines.append(line)
449        formatted_text = []
450        for line in self.output_lines:
451            formatted_text += line
452            formatted_text.append(('', '\n'))
453        self.output.text = formatted_text
454        if invalidate:
455            self.ui.invalidate()
456
457    def append_to_log(self, lines, invalidate=True):
458        self.log_lines.extend(lines.split('\n'))
459        self.log_lines = self.log_lines[-self.log_max_lines :]
460        self.log_text.text = ANSI('\n'.join(self.log_lines))
461        if invalidate:
462            self.ui.invalidate()
463
464    async def discover_services(self):
465        if not self.connected_peer:
466            self.show_error('not connected')
467            return
468
469        # Discover all services, characteristics and descriptors
470        self.append_to_output('discovering services...')
471        await self.connected_peer.discover_services()
472        self.append_to_output(
473            f'found {len(self.connected_peer.services)} services,'
474            ' discovering characteristics...'
475        )
476        await self.connected_peer.discover_characteristics()
477        self.append_to_output('found characteristics, discovering descriptors...')
478        for service in self.connected_peer.services:
479            for characteristic in service.characteristics:
480                await self.connected_peer.discover_descriptors(characteristic)
481        self.append_to_output('discovery completed')
482
483        self.show_remote_services(self.connected_peer.services)
484
485    async def discover_attributes(self):
486        if not self.connected_peer:
487            self.show_error('not connected')
488            return
489
490        # Discover all attributes
491        self.append_to_output('discovering attributes...')
492        attributes = await self.connected_peer.discover_attributes()
493        self.append_to_output(f'discovered {len(attributes)} attributes...')
494
495        self.show_attributes(attributes)
496
497    def find_characteristic(self, param) -> Optional[CharacteristicProxy]:
498        if not self.connected_peer:
499            return None
500        parts = param.split('.')
501        if len(parts) == 2:
502            service_uuid = UUID(parts[0]) if parts[0] != '*' else None
503            characteristic_uuid = UUID(parts[1])
504            for service in self.connected_peer.services:
505                if service_uuid is None or service.uuid == service_uuid:
506                    for characteristic in service.characteristics:
507                        if characteristic.uuid == characteristic_uuid:
508                            return characteristic
509        elif len(parts) == 1:
510            if parts[0].startswith('#'):
511                attribute_handle = int(f'{parts[0][1:]}', 16)
512                for service in self.connected_peer.services:
513                    for characteristic in service.characteristics:
514                        if characteristic.handle == attribute_handle:
515                            return characteristic
516
517        return None
518
519    async def rssi_monitor_loop(self):
520        while True:
521            if self.monitor_rssi and self.connected_peer:
522                self.connection_rssi = await self.connected_peer.connection.get_rssi()
523            await asyncio.sleep(RSSI_MONITOR_INTERVAL)
524
525    async def command(self, command):
526        try:
527            (keyword, *params) = command.strip().split(' ')
528            keyword = keyword.replace('-', '_').lower()
529            handler = getattr(self, f'do_{keyword}', None)
530            if handler:
531                await handler(params)
532                self.ui.invalidate()
533            else:
534                self.show_error('unknown command', keyword)
535        except Exception as error:
536            self.show_error(str(error))
537
538    async def do_scan(self, params):
539        if len(params) == 0:
540            # Toggle scanning
541            if self.device.is_scanning:
542                await self.device.stop_scanning()
543            else:
544                await self.device.start_scanning()
545        elif params[0] == 'on':
546            if len(params) == 2:
547                if not params[1].startswith("filter="):
548                    self.show_error(
549                        'invalid syntax',
550                        'expected address filter=key1:value1,key2:value,... '
551                        'available filters: address',
552                    )
553                # regex: (word):(any char except ,)
554                matches = re.findall(r"(\w+):([^,]+)", params[1])
555                for match in matches:
556                    if match[0] == "address":
557                        self.device.listener.address_filter = match[1]
558
559            await self.device.start_scanning()
560            self.top_tab = 'scan'
561        elif params[0] == 'off':
562            await self.device.stop_scanning()
563        elif params[0] == 'clear':
564            self.device.listener.scan_results.clear()
565            self.known_addresses.clear()
566            self.show_scan_results(self.device.listener.scan_results)
567        else:
568            self.show_error('unsupported arguments for scan command')
569
570    async def do_rssi(self, params):
571        if len(params) == 0:
572            # Toggle monitoring
573            self.monitor_rssi = not self.monitor_rssi
574        elif params[0] == 'on':
575            self.monitor_rssi = True
576        elif params[0] == 'off':
577            self.monitor_rssi = False
578        else:
579            self.show_error('unsupported arguments for rssi command')
580
581    async def do_connect(self, params):
582        if len(params) != 1 and len(params) != 2:
583            self.show_error('invalid syntax', 'expected connect <address> [phys]')
584            return
585
586        if len(params) == 1:
587            phys = None
588        else:
589            phys = parse_phys(params[1])
590        if phys is None:
591            connection_parameters_preferences = None
592        else:
593            connection_parameters_preferences = {
594                phy: ConnectionParametersPreferences() for phy in phys
595            }
596
597        if self.device.is_scanning:
598            await self.device.stop_scanning()
599
600        self.append_to_output('connecting...')
601
602        try:
603            await self.device.connect(
604                params[0],
605                connection_parameters_preferences=connection_parameters_preferences,
606                timeout=DEFAULT_CONNECTION_TIMEOUT,
607            )
608            self.top_tab = 'services'
609        except bumble.core.TimeoutError:
610            self.show_error('connection timed out')
611
612    async def do_disconnect(self, _):
613        if self.device.is_le_connecting:
614            await self.device.cancel_connection()
615        else:
616            if not self.connected_peer:
617                self.show_error('not connected')
618                return
619
620            await self.connected_peer.connection.disconnect()
621
622    async def do_update_parameters(self, params):
623        if len(params) != 1 or len(params[0].split('/')) != 3:
624            self.show_error(
625                'invalid syntax',
626                'expected update-parameters <interval-min>-<interval-max>'
627                '/<max-latency>/<supervision>',
628            )
629            return
630
631        if not self.connected_peer:
632            self.show_error('not connected')
633            return
634
635        connection_intervals, max_latency, supervision_timeout = params[0].split('/')
636        connection_interval_min, connection_interval_max = [
637            int(x) for x in connection_intervals.split('-')
638        ]
639        max_latency = int(max_latency)
640        supervision_timeout = int(supervision_timeout)
641        await self.connected_peer.connection.update_parameters(
642            connection_interval_min,
643            connection_interval_max,
644            max_latency,
645            supervision_timeout,
646        )
647
648    async def do_encrypt(self, _):
649        if not self.connected_peer:
650            self.show_error('not connected')
651            return
652
653        await self.connected_peer.connection.encrypt()
654
655    async def do_advertise(self, params):
656        if len(params) == 0:
657            # Toggle advertising
658            if self.device.is_advertising:
659                await self.device.stop_advertising()
660            else:
661                await self.device.start_advertising()
662        elif params[0] == 'on':
663            await self.device.start_advertising()
664        elif params[0] == 'off':
665            await self.device.stop_advertising()
666        else:
667            self.show_error('unsupported arguments for advertise command')
668
669    async def do_show(self, params):
670        if params:
671            if params[0] in {
672                'scan',
673                'log',
674                'device',
675                'local-services',
676                'remote-services',
677            }:
678                self.top_tab = params[0]
679                self.ui.invalidate()
680
681    async def do_get_phy(self, _):
682        if not self.connected_peer:
683            self.show_error('not connected')
684            return
685
686        phy = await self.connected_peer.connection.get_phy()
687        self.append_to_output(
688            f'PHY: RX={HCI_Constant.le_phy_name(phy[0])}, '
689            f'TX={HCI_Constant.le_phy_name(phy[1])}'
690        )
691
692    async def do_request_mtu(self, params):
693        if len(params) != 1:
694            self.show_error('invalid syntax', 'expected request-mtu <mtu>')
695            return
696
697        if not self.connected_peer:
698            self.show_error('not connected')
699            return
700
701        await self.connected_peer.request_mtu(int(params[0]))
702
703    async def do_discover(self, params):
704        if not params:
705            self.show_error('invalid syntax', 'expected discover services|attributes')
706            return
707
708        discovery_type = params[0]
709        if discovery_type == 'services':
710            await self.discover_services()
711        elif discovery_type == 'attributes':
712            await self.discover_attributes()
713
714    async def do_read(self, params):
715        if len(params) != 1:
716            self.show_error('invalid syntax', 'expected read <attribute>')
717            return
718
719        if not self.connected_peer:
720            self.show_error('not connected')
721            return
722
723        characteristic = self.find_characteristic(params[0])
724        if characteristic is None:
725            self.show_error('no such characteristic')
726            return
727
728        value = await characteristic.read_value()
729        self.append_to_output(f'VALUE: 0x{value.hex()}')
730
731    async def do_write(self, params):
732        if not self.connected_peer:
733            self.show_error('not connected')
734            return
735
736        if len(params) != 2:
737            self.show_error('invalid syntax', 'expected write <attribute> <value>')
738            return
739
740        if params[1].upper().startswith("0X"):
741            value = bytes.fromhex(params[1][2:])  # parse as hex string
742        else:
743            try:
744                value = int(params[1])  # try as integer
745            except ValueError:
746                value = str.encode(params[1])  # must be a string
747
748        characteristic = self.find_characteristic(params[0])
749        if characteristic is None:
750            self.show_error('no such characteristic')
751            return
752
753        # use write with response if supported
754        with_response = characteristic.properties & Characteristic.WRITE
755        await characteristic.write_value(value, with_response=with_response)
756
757    async def do_subscribe(self, params):
758        if not self.connected_peer:
759            self.show_error('not connected')
760            return
761
762        if len(params) != 1:
763            self.show_error('invalid syntax', 'expected subscribe <attribute>')
764            return
765
766        characteristic = self.find_characteristic(params[0])
767        if characteristic is None:
768            self.show_error('no such characteristic')
769            return
770
771        await characteristic.subscribe(
772            lambda value: self.append_to_output(
773                f"{characteristic} VALUE: 0x{value.hex()}"
774            ),
775        )
776
777    async def do_unsubscribe(self, params):
778        if not self.connected_peer:
779            self.show_error('not connected')
780            return
781
782        if len(params) != 1:
783            self.show_error('invalid syntax', 'expected subscribe <attribute>')
784            return
785
786        characteristic = self.find_characteristic(params[0])
787        if characteristic is None:
788            self.show_error('no such characteristic')
789            return
790
791        await characteristic.unsubscribe()
792
793    async def do_set_phy(self, params):
794        if len(params) != 1:
795            self.show_error(
796                'invalid syntax', 'expected set-phy <tx_rx_phys>|<tx_phys>/<rx_phys>'
797            )
798            return
799
800        if not self.connected_peer:
801            self.show_error('not connected')
802            return
803
804        if '/' in params[0]:
805            tx_phys, rx_phys = params[0].split('/')
806        else:
807            tx_phys = params[0]
808            rx_phys = tx_phys
809
810        await self.connected_peer.connection.set_phy(
811            tx_phys=parse_phys(tx_phys), rx_phys=parse_phys(rx_phys)
812        )
813
814    async def do_set_default_phy(self, params):
815        if len(params) != 1:
816            self.show_error(
817                'invalid syntax',
818                'expected set-default-phy <tx_rx_phys>|<tx_phys>/<rx_phys>',
819            )
820            return
821
822        if '/' in params[0]:
823            tx_phys, rx_phys = params[0].split('/')
824        else:
825            tx_phys = params[0]
826            rx_phys = tx_phys
827
828        await self.device.set_default_phy(
829            tx_phys=parse_phys(tx_phys), rx_phys=parse_phys(rx_phys)
830        )
831
832    async def do_exit(self, _):
833        self.ui.exit()
834
835    async def do_quit(self, _):
836        self.ui.exit()
837
838    async def do_filter(self, params):
839        if params[0] == "address":
840            if len(params) != 2:
841                self.show_error('invalid syntax', 'expected filter address <pattern>')
842                return
843            self.device.listener.address_filter = params[1]
844
845
846# -----------------------------------------------------------------------------
847# Device and Connection Listener
848# -----------------------------------------------------------------------------
849class DeviceListener(Device.Listener, Connection.Listener):
850    def __init__(self, app):
851        self.app = app
852        self.scan_results = OrderedDict()
853        self.address_filter = None
854
855    @property
856    def address_filter(self):
857        return self._address_filter
858
859    @address_filter.setter
860    def address_filter(self, filter_addr):
861        if filter_addr is None:
862            self._address_filter = re.compile(r".*")
863        else:
864            self._address_filter = re.compile(filter_addr)
865        self.scan_results = OrderedDict(
866            filter(self.filter_address_match, self.scan_results)
867        )
868        self.app.show_scan_results(self.scan_results)
869
870    def filter_address_match(self, address):
871        """
872        Returns true if an address matches the filter
873        """
874        return bool(self.address_filter.match(address))
875
876    @AsyncRunner.run_in_task()
877    # pylint: disable=invalid-overridden-method
878    async def on_connection(self, connection):
879        self.app.connected_peer = Peer(connection)
880        self.app.connection_rssi = None
881        self.app.append_to_output(f'connected to {self.app.connected_peer}')
882        connection.listener = self
883
884    def on_disconnection(self, reason):
885        self.app.append_to_output(
886            f'disconnected from {self.app.connected_peer}, '
887            f'reason: {HCI_Constant.error_name(reason)}'
888        )
889        self.app.connected_peer = None
890        self.app.connection_rssi = None
891
892    def on_connection_parameters_update(self):
893        self.app.append_to_output(
894            f'connection parameters update: '
895            f'{self.app.connected_peer.connection.parameters}'
896        )
897
898    def on_connection_phy_update(self):
899        self.app.append_to_output(
900            f'connection phy update: {self.app.connected_peer.connection.phy}'
901        )
902
903    def on_connection_att_mtu_update(self):
904        self.app.append_to_output(
905            f'connection att mtu update: {self.app.connected_peer.connection.att_mtu}'
906        )
907
908    def on_connection_encryption_change(self):
909        encryption_state = (
910            'encrypted'
911            if self.app.connected_peer.connection.is_encrypted
912            else 'not encrypted'
913        )
914        self.app.append_to_output(
915            'connection encryption change: ' f'{encryption_state}'
916        )
917
918    def on_connection_data_length_change(self):
919        self.app.append_to_output(
920            'connection data length change: '
921            f'{self.app.connected_peer.connection.data_length}'
922        )
923
924    def on_advertisement(self, advertisement):
925        if not self.filter_address_match(str(advertisement.address)):
926            return
927
928        entry_key = f'{advertisement.address}/{advertisement.address.address_type}'
929        entry = self.scan_results.get(entry_key)
930        if entry:
931            entry.ad_data = advertisement.data
932            entry.rssi = advertisement.rssi
933            entry.connectable = advertisement.is_connectable
934        else:
935            self.app.add_known_address(str(advertisement.address))
936            self.scan_results[entry_key] = ScanResult(
937                advertisement.address,
938                advertisement.address.address_type,
939                advertisement.data,
940                advertisement.rssi,
941                advertisement.is_connectable,
942            )
943
944        self.app.show_scan_results(self.scan_results)
945
946
947# -----------------------------------------------------------------------------
948# Scanning
949# -----------------------------------------------------------------------------
950class ScanResult:
951    def __init__(self, address, address_type, ad_data, rssi, connectable):
952        self.address = address
953        self.address_type = address_type
954        self.ad_data = ad_data
955        self.rssi = rssi
956        self.connectable = connectable
957
958    def to_display_string(self):
959        address_type_string = ('P', 'R', 'PI', 'RI')[self.address_type]
960        address_color = colors.yellow if self.connectable else colors.red
961        if address_type_string.startswith('P'):
962            type_color = colors.green
963        else:
964            type_color = colors.cyan
965
966        name = self.ad_data.get(AdvertisingData.COMPLETE_LOCAL_NAME, raw=True)
967        if name is None:
968            name = self.ad_data.get(AdvertisingData.SHORTENED_LOCAL_NAME, raw=True)
969        if name:
970            # Convert to string
971            try:
972                name = name.decode()
973            except UnicodeDecodeError:
974                name = name.hex()
975        else:
976            name = ''
977
978        # Remove any '/P' qualifier suffix from the address string
979        address_str = str(self.address).replace('/P', '')
980
981        # RSSI bar
982        bar_string = rssi_bar(self.rssi)
983        bar_padding = ' ' * (DEFAULT_RSSI_BAR_WIDTH + 5 - len(bar_string))
984        return (
985            f'{address_color(address_str)} [{type_color(address_type_string)}] '
986            f'{bar_string} {bar_padding} {name}'
987        )
988
989
990# -----------------------------------------------------------------------------
991# Logging
992# -----------------------------------------------------------------------------
993class LogHandler(logging.Handler):
994    def __init__(self, app):
995        super().__init__()
996        self.app = app
997        self.setFormatter(logging.Formatter('[%(asctime)s][%(levelname)s] %(message)s'))
998
999    def emit(self, record):
1000        message = self.format(record)
1001        self.app.append_to_log(message)
1002
1003
1004# -----------------------------------------------------------------------------
1005# Main
1006# -----------------------------------------------------------------------------
1007@click.command()
1008@click.option('--device-config', help='Device configuration file')
1009@click.argument('transport')
1010def main(device_config, transport):
1011    # Ensure that the BUMBLE_USER_DIR directory exists
1012    if not os.path.isdir(BUMBLE_USER_DIR):
1013        os.mkdir(BUMBLE_USER_DIR)
1014
1015    # Create an instance of the app
1016    app = ConsoleApp()
1017
1018    # Setup logging
1019    # logging.basicConfig(level = 'FATAL')
1020    # logging.basicConfig(level = 'DEBUG')
1021    root_logger = logging.getLogger()
1022
1023    root_logger.addHandler(LogHandler(app))
1024    root_logger.setLevel(logging.DEBUG)
1025
1026    # Run until the user exits
1027    asyncio.run(app.run_async(device_config, transport))
1028
1029
1030# -----------------------------------------------------------------------------
1031if __name__ == "__main__":
1032    main()  # pylint: disable=no-value-for-parameter
1033