1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18import asyncio 19import os 20import logging 21import click 22from colors import color 23 24from bumble.core import ProtocolError, TimeoutError 25from bumble.device import Device, Peer 26from bumble.gatt import show_services 27from bumble.transport import open_transport_or_link 28 29 30# ----------------------------------------------------------------------------- 31async def dump_gatt_db(peer, done): 32 # Discover all services 33 print(color('### Discovering Services and Characteristics', 'magenta')) 34 await peer.discover_services() 35 for service in peer.services: 36 await service.discover_characteristics() 37 for characteristic in service.characteristics: 38 await characteristic.discover_descriptors() 39 40 print(color('=== Services ===', 'yellow')) 41 show_services(peer.services) 42 print() 43 44 # Discover all attributes 45 print(color('=== All Attributes ===', 'yellow')) 46 attributes = await peer.discover_attributes() 47 for attribute in attributes: 48 print(attribute) 49 try: 50 value = await attribute.read_value() 51 print(color(f'{value.hex()}', 'green')) 52 except ProtocolError as error: 53 print(color(error, 'red')) 54 except TimeoutError: 55 print(color('read timeout', 'red')) 56 57 if done is not None: 58 done.set_result(None) 59 60 61# ----------------------------------------------------------------------------- 62async def async_main(device_config, encrypt, transport, address_or_name): 63 async with await open_transport_or_link(transport) as (hci_source, hci_sink): 64 65 # Create a device 66 if device_config: 67 device = Device.from_config_file_with_hci(device_config, hci_source, hci_sink) 68 else: 69 device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink) 70 await device.power_on() 71 72 if address_or_name: 73 # Connect to the target peer 74 connection = await device.connect(address_or_name) 75 76 # Encrypt the connection if required 77 if encrypt: 78 await connection.encrypt() 79 80 await dump_gatt_db(Peer(connection), None) 81 else: 82 # Wait for a connection 83 done = asyncio.get_running_loop().create_future() 84 device.on('connection', lambda connection: asyncio.create_task(dump_gatt_db(Peer(connection), done))) 85 await device.start_advertising(auto_restart=True) 86 87 print(color('### Waiting for connection...', 'blue')) 88 await done 89 90 91# ----------------------------------------------------------------------------- 92@click.command() 93@click.option('--device-config', help='Device configuration', type=click.Path()) 94@click.option('--encrypt', help='Encrypt the connection', is_flag=True, default=False) 95@click.argument('transport') 96@click.argument('address-or-name', required=False) 97def main(device_config, encrypt, transport, address_or_name): 98 """ 99 Dump the GATT database on a remote device. If ADDRESS_OR_NAME is not specified, 100 wait for an incoming connection. 101 """ 102 logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'INFO').upper()) 103 asyncio.run(async_main(device_config, encrypt, transport, address_or_name)) 104 105 106# ----------------------------------------------------------------------------- 107if __name__ == '__main__': 108 main() 109