• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import sys
20import os
21import logging
22from bumble.colors import color
23from bumble.device import Device
24from bumble.transport import open_transport
25from bumble.profiles.battery_service import BatteryServiceProxy
26
27
28# -----------------------------------------------------------------------------
async def main():
    """Connect to a peer over GATT and print its battery level.

    Reads two command-line arguments: the transport spec and the Bluetooth
    address of the peer. With any other argument count, prints a usage
    message and returns without opening a transport.
    """
    if len(sys.argv) != 3:
        print('Usage: battery_client.py <transport-spec> <bluetooth-address>')
        print('example: battery_client.py usb:0 E1:CA:72:48:C4:E8')
        return

    _, transport_spec, target_address = sys.argv

    def report_level_update(value):
        # Callback handed to subscribe(): print each value it is called with.
        print(f'{color("Battery Level Update:", "green")} {value}')

    print('<<< connecting to HCI...')
    async with await open_transport(transport_spec) as (hci_source, hci_sink):
        print('<<< connected')

        # Build the local device on top of the HCI source/sink and power it on
        local_device = Device.with_hci(
            'Bumble', 'F0:F1:F2:F3:F4:F5', hci_source, hci_sink
        )
        await local_device.power_on()

        # Open the GATT connection to the requested address
        print(f'=== Connecting to {target_address}...')
        async with local_device.connect_as_gatt(target_address) as peer:
            print(f'=== Connected to {peer}')

            # Give up when no battery service proxy could be created
            battery_service = peer.create_service_proxy(BatteryServiceProxy)
            if not battery_service:
                print('!!! Service not found')
                return

            # Subscribe first, then read the current value once
            level_characteristic = battery_service.battery_level
            if level_characteristic:
                await level_characteristic.subscribe(report_level_update)
                initial_level = await level_characteristic.read_value()
                print(f'{color("Initial Battery Level:", "green")} {initial_level}')

            # Await peer.sustain() before leaving the connection context
            await peer.sustain()
66
67
68# -----------------------------------------------------------------------------
# Script entry point: the log level name comes from the BUMBLE_LOGLEVEL
# environment variable (falling back to 'DEBUG') and is upper-cased before
# being handed to logging; main() is then run to completion.
logging.basicConfig(level=os.getenv('BUMBLE_LOGLEVEL', 'DEBUG').upper())
asyncio.run(main())
71