• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import sys
19import time
20import math
21import random
22import struct
23import logging
24import asyncio
25import os
26
27from bumble.core import AdvertisingData
28from bumble.device import Device
29from bumble.transport import open_transport_or_link
30from bumble.profiles.device_information_service import DeviceInformationService
31from bumble.profiles.heart_rate_service import HeartRateService
32
33
34# -----------------------------------------------------------------------------
async def main():
    """Run a simulated Bluetooth LE heart rate sensor.

    Expects exactly two command-line arguments: the path of a device
    configuration file (``sys.argv[1]``) and a transport spec
    (``sys.argv[2]``). When the argument count is wrong, a usage message is
    printed and the coroutine returns without touching any transport.

    Otherwise the device is created on the given transport, a Device
    Information Service and a Heart Rate Service are registered, advertising
    is started, and subscribers of the heart rate measurement characteristic
    are notified every 3 seconds until the task is cancelled.
    """
    if len(sys.argv) != 3:
        print('Usage: python heart_rate_server.py <device-config> <transport-spec>')
        print('example: python heart_rate_server.py device1.json usb:0')
        return

    async with await open_transport_or_link(sys.argv[2]) as (hci_source, hci_sink):
        device = Device.from_config_file_with_hci(sys.argv[1], hci_source, hci_sink)

        # Keep track of accumulated expended energy
        energy_start_time = time.time()

        def reset_energy_expended():
            """Restart the simulated energy accumulation from now."""
            nonlocal energy_start_time
            energy_start_time = time.time()

        def read_energy_expended():
            """Return the simulated energy expended since the last reset.

            The simulated value grows by 100 per elapsed second. The Energy
            Expended field is an unsigned 16-bit quantity, so the value is
            saturated at 0xFFFF; without the cap it would leave the valid
            range after roughly 11 minutes without a reset and building the
            measurement would fail, stopping the notification loop.
            """
            return min(int((time.time() - energy_start_time) * 100), 0xFFFF)

        def read_heart_rate_measurement(_connection):
            """Build a fresh simulated measurement for a read or notification.

            The heart rate follows a sine wave around 100 bpm (range 50-150,
            two-minute period); the optional fields are randomly present or
            absent so that clients see every variant of the measurement.
            """
            return HeartRateService.HeartRateMeasurement(
                heart_rate=100 + int(50 * math.sin(time.time() * math.pi / 60)),
                sensor_contact_detected=random.choice((True, False, None)),
                energy_expended=random.choice((read_energy_expended(), None)),
                rr_intervals=random.choice(
                    (
                        (
                            random.randint(900, 1100) / 1000,
                            random.randint(900, 1100) / 1000,
                        ),
                        None,
                    )
                ),
            )

        # Add a Device Information Service and Heart Rate Service to the GATT server
        device_information_service = DeviceInformationService(
            manufacturer_name='ACME',
            model_number='HR-102',
            serial_number='7654321',
            hardware_revision='1.1.3',
            software_revision='2.5.6',
            system_id=(0x123456, 0x8877665544),
        )

        heart_rate_service = HeartRateService(
            read_heart_rate_measurement=read_heart_rate_measurement,
            body_sensor_location=HeartRateService.BodySensorLocation.WRIST,
            reset_energy_expended=lambda _: reset_energy_expended(),
        )

        device.add_services([device_information_service, heart_rate_service])

        # Set the advertising data
        device.advertising_data = bytes(
            AdvertisingData(
                [
                    (
                        AdvertisingData.COMPLETE_LOCAL_NAME,
                        bytes('Bumble Heart', 'utf-8'),
                    ),
                    (
                        AdvertisingData.INCOMPLETE_LIST_OF_16_BIT_SERVICE_CLASS_UUIDS,
                        bytes(heart_rate_service.uuid),
                    ),
                    # 0x0340 is the "Generic Heart Rate Sensor" appearance value,
                    # encoded little-endian on 16 bits.
                    (AdvertisingData.APPEARANCE, struct.pack('<H', 0x0340)),
                ]
            )
        )

        # Go!
        await device.power_on()
        await device.start_advertising(auto_restart=True)

        # Notify every 3 seconds
        while True:
            await asyncio.sleep(3.0)
            await device.notify_subscribers(
                heart_rate_service.heart_rate_measurement_characteristic
            )
111
112
113# -----------------------------------------------------------------------------
# Configure logging from the BUMBLE_LOGLEVEL environment variable (falling
# back to DEBUG when it is unset), then run the server until interrupted.
log_level_name = os.environ.get('BUMBLE_LOGLEVEL', 'DEBUG').upper()
logging.basicConfig(level=log_level_name)
asyncio.run(main())
116