• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16# -----------------------------------------------------------------------------
17# Imports
18# -----------------------------------------------------------------------------
19from enum import IntEnum
20import struct
21
22from ..gatt_client import ProfileServiceProxy
23from ..att import ATT_Error
24from ..gatt import (
25    GATT_HEART_RATE_SERVICE,
26    GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC,
27    GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC,
28    GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC,
29    TemplateService,
30    Characteristic,
31    CharacteristicValue,
32    DelegatedCharacteristicAdapter,
33    PackedCharacteristicAdapter,
34)
35
36
37# -----------------------------------------------------------------------------
class HeartRateService(TemplateService):
    """Heart Rate GATT service.

    The service always exposes a Heart Rate Measurement characteristic and,
    depending on the constructor arguments, a Body Sensor Location
    characteristic and a Heart Rate Control Point characteristic.
    """

    UUID = GATT_HEART_RATE_SERVICE
    # struct format of a control point value: a single unsigned byte.
    HEART_RATE_CONTROL_POINT_FORMAT = 'B'
    # ATT error code raised when an unrecognised control point value is written.
    CONTROL_POINT_NOT_SUPPORTED = 0x80
    # Control point value requesting a reset of the energy-expended counter.
    RESET_ENERGY_EXPENDED = 0x01

    class BodySensorLocation(IntEnum):
        """Values carried by the Body Sensor Location characteristic."""

        # Members are plain ints. One-element tuples such as ``(0,)`` happen
        # to resolve to the same values (the enum machinery unpacks a tuple
        # into ``int(*args)``), but only by accident of a trailing comma.
        OTHER = 0
        CHEST = 1
        WRIST = 2
        FINGER = 3
        HAND = 4
        EAR_LOBE = 5
        FOOT = 6

    class HeartRateMeasurement:
        """A single Heart Rate Measurement value.

        Serialized layout (see ``__bytes__`` / ``from_bytes``): one flags
        byte, the heart rate (8 or 16 bits), an optional 16-bit energy
        expended field, then zero or more 16-bit RR-intervals. All
        multi-byte fields are little-endian.

        Flags byte:
            bit 0: heart rate is encoded on 16 bits instead of 8
            bit 1: sensor contact detected (only meaningful with bit 2)
            bit 2: sensor contact status is present
            bit 3: energy expended field is present
            bit 4: RR-interval fields are present
        """

        def __init__(
            self,
            heart_rate,
            sensor_contact_detected=None,
            energy_expended=None,
            rr_intervals=None,
        ):
            """Validate and store the measurement fields.

            Args:
                heart_rate: heart rate value, 0..0xFFFF.
                sensor_contact_detected: True/False, or None when the
                    contact status is not reported.
                energy_expended: value in 0..0xFFFF, or None when absent.
                rr_intervals: iterable of intervals, or None/empty when
                    absent. Each interval is serialized as
                    ``int(interval * 1024)`` on 16 bits, so
                    ``interval * 1024`` must fit in 0..0xFFFF.

            Raises:
                ValueError: if any field is outside its encodable range.
            """
            if heart_rate < 0 or heart_rate > 0xFFFF:
                raise ValueError('heart_rate out of range')

            if energy_expended is not None and (
                energy_expended < 0 or energy_expended > 0xFFFF
            ):
                raise ValueError('energy_expended out of range')

            if rr_intervals:
                for rr_interval in rr_intervals:
                    if rr_interval < 0 or rr_interval * 1024 > 0xFFFF:
                        raise ValueError('rr_intervals out of range')

            self.heart_rate = heart_rate
            self.sensor_contact_detected = sensor_contact_detected
            self.energy_expended = energy_expended
            self.rr_intervals = rr_intervals

        @classmethod
        def from_bytes(cls, data):
            """Parse a serialized measurement.

            Args:
                data: bytes-like value starting with the flags byte.

            Returns:
                A new instance. ``sensor_contact_detected`` and
                ``energy_expended`` are None when their flag bit is clear;
                ``rr_intervals`` is an empty tuple when its bit is clear.
            """
            flags = data[0]
            offset = 1

            # Bit 0 selects the width of the heart rate field.
            if flags & 1:
                hr = struct.unpack_from('<H', data, offset)[0]
                offset += 2
            else:
                hr = struct.unpack_from('B', data, offset)[0]
                offset += 1

            # Bit 2 says the contact status is reported, bit 1 carries it.
            if flags & (1 << 2):
                sensor_contact_detected = flags & (1 << 1) != 0
            else:
                sensor_contact_detected = None

            if flags & (1 << 3):
                energy_expended = struct.unpack_from('<H', data, offset)[0]
                offset += 2
            else:
                energy_expended = None

            if flags & (1 << 4):
                # Every remaining complete 16-bit word is one RR-interval,
                # stored in units of 1/1024.
                rr_intervals = tuple(
                    struct.unpack_from('<H', data, offset + i * 2)[0] / 1024
                    for i in range((len(data) - offset) // 2)
                )
            else:
                rr_intervals = ()

            return cls(hr, sensor_contact_detected, energy_expended, rr_intervals)

        def __bytes__(self):
            """Serialize to the flags-prefixed format read by ``from_bytes``."""
            # Use the compact 8-bit encoding whenever the value fits.
            if self.heart_rate < 256:
                flags = 0
                data = struct.pack('B', self.heart_rate)
            else:
                flags = 1
                data = struct.pack('<H', self.heart_rate)

            if self.sensor_contact_detected is not None:
                flags |= ((1 if self.sensor_contact_detected else 0) << 1) | (1 << 2)

            if self.energy_expended is not None:
                flags |= 1 << 3
                data += struct.pack('<H', self.energy_expended)

            if self.rr_intervals:
                flags |= 1 << 4
                data += b''.join(
                    struct.pack('<H', int(rr_interval * 1024))
                    for rr_interval in self.rr_intervals
                )

            return bytes([flags]) + data

        def __str__(self):
            return (
                f'HeartRateMeasurement(heart_rate={self.heart_rate},'
                f' sensor_contact_detected={self.sensor_contact_detected},'
                f' energy_expended={self.energy_expended},'
                f' rr_intervals={self.rr_intervals})'
            )

    def __init__(
        self,
        read_heart_rate_measurement,
        body_sensor_location=None,
        reset_energy_expended=None,
    ):
        """Build the service and its characteristics.

        Args:
            read_heart_rate_measurement: callable used as the ``read``
                handler of the measurement characteristic; the value it
                produces is encoded with ``bytes()``.
            body_sensor_location: value convertible with ``int()`` (for
                example a ``BodySensorLocation``). When not None, a
                readable Body Sensor Location characteristic is added.
            reset_energy_expended: callable invoked with the connection
                when ``RESET_ENERGY_EXPENDED`` is written to the control
                point. When falsy, no control point characteristic is added.
        """
        self.heart_rate_measurement_characteristic = DelegatedCharacteristicAdapter(
            Characteristic(
                GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC,
                Characteristic.NOTIFY,
                0,
                CharacteristicValue(read=read_heart_rate_measurement),
            ),
            # pylint: disable=unnecessary-lambda
            encode=lambda value: bytes(value),
        )
        characteristics = [self.heart_rate_measurement_characteristic]

        if body_sensor_location is not None:
            self.body_sensor_location_characteristic = Characteristic(
                GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC,
                Characteristic.READ,
                Characteristic.READABLE,
                bytes([int(body_sensor_location)]),
            )
            characteristics.append(self.body_sensor_location_characteristic)

        if reset_energy_expended:

            def write_heart_rate_control_point_value(connection, value):
                # Only the reset command is understood; anything else is
                # rejected with the profile-specific ATT error code.
                if value != self.RESET_ENERGY_EXPENDED:
                    raise ATT_Error(self.CONTROL_POINT_NOT_SUPPORTED)
                # The callback is known to be set: this handler only exists
                # when ``reset_energy_expended`` was provided.
                reset_energy_expended(connection)

            self.heart_rate_control_point_characteristic = PackedCharacteristicAdapter(
                Characteristic(
                    GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC,
                    Characteristic.WRITE,
                    Characteristic.WRITEABLE,
                    CharacteristicValue(write=write_heart_rate_control_point_value),
                ),
                pack_format=HeartRateService.HEART_RATE_CONTROL_POINT_FORMAT,
            )
            characteristics.append(self.heart_rate_control_point_characteristic)

        super().__init__(characteristics)
194
195
196# -----------------------------------------------------------------------------
class HeartRateServiceProxy(ProfileServiceProxy):
    """Proxy giving typed access to the characteristics of a Heart Rate service.

    Each characteristic is looked up by UUID on the wrapped service proxy.
    When a lookup returns nothing, the corresponding attribute is ``None``.
    """

    SERVICE_CLASS = HeartRateService

    def __init__(self, service_proxy):
        """Wrap ``service_proxy`` and adapt the characteristics it exposes."""
        self.service_proxy = service_proxy

        # Heart Rate Measurement: raw values decode to HeartRateMeasurement.
        measurement_matches = service_proxy.get_characteristics_by_uuid(
            GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC
        )
        self.heart_rate_measurement = (
            DelegatedCharacteristicAdapter(
                measurement_matches[0],
                decode=HeartRateService.HeartRateMeasurement.from_bytes,
            )
            if measurement_matches
            else None
        )

        # Body Sensor Location: the first byte maps to a BodySensorLocation.
        location_matches = service_proxy.get_characteristics_by_uuid(
            GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC
        )
        self.body_sensor_location = (
            DelegatedCharacteristicAdapter(
                location_matches[0],
                decode=lambda value: HeartRateService.BodySensorLocation(value[0]),
            )
            if location_matches
            else None
        )

        # Heart Rate Control Point: values are packed with the service's format.
        control_point_matches = service_proxy.get_characteristics_by_uuid(
            GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC
        )
        self.heart_rate_control_point = (
            PackedCharacteristicAdapter(
                control_point_matches[0],
                pack_format=HeartRateService.HEART_RATE_CONTROL_POINT_FORMAT,
            )
            if control_point_matches
            else None
        )

    async def reset_energy_expended(self):
        """Write the reset command to the control point, if there is one.

        Returns:
            The result of the awaited write, or None when the service has
            no control point characteristic.
        """
        control_point = self.heart_rate_control_point
        if control_point is None:
            return None

        return await control_point.write_value(HeartRateService.RESET_ENERGY_EXPENDED)
238