• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import sys
20import os
21import logging
22from colors import color
23from bumble.device import Device, Peer
24from bumble.transport import open_transport
25from bumble.profiles.battery_service import BatteryServiceProxy
26
27
28# -----------------------------------------------------------------------------
29async def main():
30    if len(sys.argv) != 3:
31        print('Usage: battery_client.py <transport-spec> <bluetooth-address>')
32        print('example: battery_client.py usb:0 E1:CA:72:48:C4:E8')
33        return
34
35    print('<<< connecting to HCI...')
36    async with await open_transport(sys.argv[1]) as (hci_source, hci_sink):
37        print('<<< connected')
38
39        # Create and start a device
40        device = Device.with_hci('Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink)
41        await device.power_on()
42
43        # Connect to the peer
44        target_address = sys.argv[2]
45        print(f'=== Connecting to {target_address}...')
46        connection = await device.connect(target_address)
47        print(f'=== Connected to {connection}')
48
49        # Discover the Battery Service
50        peer = Peer(connection)
51        print('=== Discovering Battery Service')
52        battery_service = await peer.discover_and_create_service_proxy(BatteryServiceProxy)
53
54        # Check that the service was found
55        if not battery_service:
56            print('!!! Service not found')
57            return
58
59        # Subscribe to and read the battery level
60        if battery_service.battery_level:
61            await battery_service.battery_level.subscribe(
62                lambda value: print(f'{color("Battery Level Update:", "green")} {value}')
63            )
64            value = await battery_service.battery_level.read_value()
65            print(f'{color("Initial Battery Level:", "green")} {value}')
66
67        await hci_source.wait_for_termination()
68
69
70# -----------------------------------------------------------------------------
71logging.basicConfig(level = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper())
72asyncio.run(main())
73