• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import os
20import logging
21import click
22from colors import color
23
24from bumble.core import ProtocolError, TimeoutError
25from bumble.device import Device, Peer
26from bumble.gatt import show_services
27from bumble.transport import open_transport_or_link
28
29
30# -----------------------------------------------------------------------------
async def dump_gatt_db(peer, done):
    """
    Discover and print the GATT database exposed by a connected peer.

    The function first discovers services, their characteristics and the
    characteristics' descriptors and prints them with `show_services`. It then
    discovers all attributes and tries to read each one, printing the value as
    hex, or a red error line when the read fails.

    Args:
        peer: object used for discovery; it must provide `discover_services()`,
            `services` and `discover_attributes()` (in this file it is always
            a `Peer` wrapping a connection).
        done: optional future used to report completion to a waiter. When the
            dump finishes it is resolved with `None`; when the dump fails, the
            exception is forwarded to it. May be `None` when the caller awaits
            this coroutine directly.

    Raises:
        Exception: any discovery error is re-raised when there is no pending
            `done` future to forward it to.
    """
    try:
        # Discover all services
        print(color('### Discovering Services and Characteristics', 'magenta'))
        await peer.discover_services()
        for service in peer.services:
            await service.discover_characteristics()
            for characteristic in service.characteristics:
                await characteristic.discover_descriptors()

        print(color('=== Services ===', 'yellow'))
        show_services(peer.services)
        print()

        # Discover all attributes
        print(color('=== All Attributes ===', 'yellow'))
        attributes = await peer.discover_attributes()
        for attribute in attributes:
            print(attribute)
            # A failed read of one attribute must not abort the whole dump
            try:
                value = await attribute.read_value()
                print(color(f'{value.hex()}', 'green'))
            except ProtocolError as error:
                print(color(error, 'red'))
            except TimeoutError:
                print(color('read timeout', 'red'))
    except Exception as failure:
        # Without a pending future nobody is waiting on `done`, so let the
        # error propagate to whoever awaits this coroutine.
        if done is None or done.done():
            raise

        # When running as a background task, an exception raised here would
        # otherwise be lost and the waiter would block forever: hand the
        # failure over to the waiter instead.
        done.set_exception(failure)
        return

    # Guard against resolving the future twice (e.g. a second connection),
    # which would raise asyncio.InvalidStateError.
    if done is not None and not done.done():
        done.set_result(None)
59
60
61# -----------------------------------------------------------------------------
async def async_main(device_config, encrypt, transport, address_or_name):
    """
    Open the transport, power on a device and dump a peer's GATT database.

    Args:
        device_config: path of a device configuration file; when falsy, a
            device named 'Bumble' with a fixed address is created instead.
        encrypt: when true, encrypt the outgoing connection before dumping
            (only used when `address_or_name` is given).
        transport: transport specification passed to `open_transport_or_link`.
        address_or_name: peer to connect to; when falsy, the device advertises
            and the dump is performed on the first incoming connection.
    """
    async with await open_transport_or_link(transport) as (hci_source, hci_sink):

        # Create a device
        if device_config:
            device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink)
        else:
            device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
        await device.power_on()

        if address_or_name:
            # Connect to the target peer
            connection = await device.connect(address_or_name)

            # Encrypt the connection if required
            if encrypt:
                await connection.encrypt()

            await dump_gatt_db(Peer(connection), None)
        else:
            # Wait for a connection
            done = asyncio.get_running_loop().create_future()

            # The event loop only keeps weak references to tasks, so hold a
            # strong reference to each dump task until it completes, otherwise
            # it may be garbage-collected mid-execution.
            dump_tasks = set()

            def on_connection(connection):
                task = asyncio.create_task(dump_gatt_db(Peer(connection), done))
                dump_tasks.add(task)
                task.add_done_callback(dump_tasks.discard)

            device.on('connection', on_connection)
            await device.start_advertising(auto_restart=True)

            print(color('### Waiting for connection...', 'blue'))
            await done
89
90
91# -----------------------------------------------------------------------------
@click.command()
@click.option(
    '--device-config',
    help='Device configuration',
    type=click.Path(),
)
@click.option(
    '--encrypt',
    help='Encrypt the connection',
    is_flag=True,
    default=False,
)
@click.argument('transport')
@click.argument('address-or-name', required=False)
def main(device_config, encrypt, transport, address_or_name):
    """
    Dump the GATT database on a remote device. If ADDRESS_OR_NAME is not specified,
    wait for an incoming connection.
    """
    # NOTE: the docstring above is the command's help text, keep it user-facing.

    # The log level is taken from the BUMBLE_LOGLEVEL environment variable
    # (case-insensitive), defaulting to INFO when it is not set.
    log_level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()
    logging.basicConfig(level=log_level)

    # Run the async entry point to completion on a fresh event loop
    dump_coroutine = async_main(device_config, encrypt, transport, address_or_name)
    asyncio.run(dump_coroutine)
104
105
106# -----------------------------------------------------------------------------
# Run the command only when this file is executed as a script, not when it is
# imported. `main` is called without arguments here.
if __name__ == '__main__':
    main()
109