1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15 16# ----------------------------------------------------------------------------- 17# Imports 18# ----------------------------------------------------------------------------- 19from enum import IntEnum 20import struct 21 22from ..gatt_client import ProfileServiceProxy 23from ..att import ATT_Error 24from ..gatt import ( 25 GATT_HEART_RATE_SERVICE, 26 GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC, 27 GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC, 28 GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC, 29 TemplateService, 30 Characteristic, 31 CharacteristicValue, 32 DelegatedCharacteristicAdapter, 33 PackedCharacteristicAdapter, 34) 35 36 37# ----------------------------------------------------------------------------- 38class HeartRateService(TemplateService): 39 UUID = GATT_HEART_RATE_SERVICE 40 HEART_RATE_CONTROL_POINT_FORMAT = 'B' 41 CONTROL_POINT_NOT_SUPPORTED = 0x80 42 RESET_ENERGY_EXPENDED = 0x01 43 44 class BodySensorLocation(IntEnum): 45 OTHER = (0,) 46 CHEST = (1,) 47 WRIST = (2,) 48 FINGER = (3,) 49 HAND = (4,) 50 EAR_LOBE = (5,) 51 FOOT = 6 52 53 class HeartRateMeasurement: 54 def __init__( 55 self, 56 heart_rate, 57 sensor_contact_detected=None, 58 energy_expended=None, 59 rr_intervals=None, 60 ): 61 if heart_rate < 0 or heart_rate > 0xFFFF: 62 raise ValueError('heart_rate out of range') 63 64 if energy_expended is not None and ( 65 energy_expended < 0 or energy_expended > 0xFFFF 66 ): 67 raise ValueError('energy_expended out of range') 68 69 if rr_intervals: 70 for rr_interval in rr_intervals: 71 if rr_interval < 0 or rr_interval * 1024 > 0xFFFF: 72 raise ValueError('rr_intervals out of range') 73 74 self.heart_rate = heart_rate 75 self.sensor_contact_detected = sensor_contact_detected 76 self.energy_expended = energy_expended 77 self.rr_intervals = rr_intervals 78 79 @classmethod 80 def from_bytes(cls, data): 81 flags = data[0] 82 offset = 1 83 84 if flags & 1: 85 hr = struct.unpack_from('<H', data, offset)[0] 86 offset += 2 87 else: 88 hr = struct.unpack_from('B', data, offset)[0] 89 offset += 1 90 91 if flags & (1 << 2): 92 sensor_contact_detected = flags & (1 << 1) != 0 93 else: 94 sensor_contact_detected = None 95 96 if flags & (1 << 3): 97 energy_expended = struct.unpack_from('<H', data, offset)[0] 98 offset += 2 99 else: 100 energy_expended = None 101 102 if flags & (1 << 4): 103 rr_intervals = tuple( 104 struct.unpack_from('<H', data, offset + i * 2)[0] / 1024 105 for i in range((len(data) - offset) // 2) 106 ) 107 else: 108 rr_intervals = () 109 110 return cls(hr, sensor_contact_detected, energy_expended, rr_intervals) 111 112 def __bytes__(self): 113 if self.heart_rate < 256: 114 flags = 0 115 data = struct.pack('B', self.heart_rate) 116 else: 117 flags = 1 118 data = struct.pack('<H', self.heart_rate) 119 120 if self.sensor_contact_detected is not None: 121 flags |= ((1 if self.sensor_contact_detected else 0) << 1) | (1 << 2) 122 123 if self.energy_expended is not None: 124 flags |= 1 << 3 125 data += struct.pack('<H', self.energy_expended) 126 127 if self.rr_intervals: 128 flags |= 1 << 4 129 data += b''.join( 130 [ 131 struct.pack('<H', int(rr_interval * 1024)) 132 for rr_interval in self.rr_intervals 133 ] 134 ) 135 136 return bytes([flags]) + data 137 138 def __str__(self): 139 return ( 140 f'HeartRateMeasurement(heart_rate={self.heart_rate},' 141 f' sensor_contact_detected={self.sensor_contact_detected},' 142 f' energy_expended={self.energy_expended},' 143 f' rr_intervals={self.rr_intervals})' 144 ) 145 146 def __init__( 147 self, 148 read_heart_rate_measurement, 149 body_sensor_location=None, 150 reset_energy_expended=None, 151 ): 152 self.heart_rate_measurement_characteristic = DelegatedCharacteristicAdapter( 153 Characteristic( 154 GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC, 155 Characteristic.NOTIFY, 156 0, 157 CharacteristicValue(read=read_heart_rate_measurement), 158 ), 159 # pylint: disable=unnecessary-lambda 160 encode=lambda value: bytes(value), 161 ) 162 characteristics = [self.heart_rate_measurement_characteristic] 163 164 if body_sensor_location is not None: 165 self.body_sensor_location_characteristic = Characteristic( 166 GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC, 167 Characteristic.READ, 168 Characteristic.READABLE, 169 bytes([int(body_sensor_location)]), 170 ) 171 characteristics.append(self.body_sensor_location_characteristic) 172 173 if reset_energy_expended: 174 175 def write_heart_rate_control_point_value(connection, value): 176 if value == self.RESET_ENERGY_EXPENDED: 177 if reset_energy_expended is not None: 178 reset_energy_expended(connection) 179 else: 180 raise ATT_Error(self.CONTROL_POINT_NOT_SUPPORTED) 181 182 self.heart_rate_control_point_characteristic = PackedCharacteristicAdapter( 183 Characteristic( 184 GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC, 185 Characteristic.WRITE, 186 Characteristic.WRITEABLE, 187 CharacteristicValue(write=write_heart_rate_control_point_value), 188 ), 189 pack_format=HeartRateService.HEART_RATE_CONTROL_POINT_FORMAT, 190 ) 191 characteristics.append(self.heart_rate_control_point_characteristic) 192 193 super().__init__(characteristics) 194 195 196# ----------------------------------------------------------------------------- 197class HeartRateServiceProxy(ProfileServiceProxy): 198 SERVICE_CLASS = HeartRateService 199 200 def __init__(self, service_proxy): 201 self.service_proxy = service_proxy 202 203 if characteristics := service_proxy.get_characteristics_by_uuid( 204 GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC 205 ): 206 self.heart_rate_measurement = DelegatedCharacteristicAdapter( 207 characteristics[0], 208 decode=HeartRateService.HeartRateMeasurement.from_bytes, 209 ) 210 else: 211 self.heart_rate_measurement = None 212 213 if characteristics := service_proxy.get_characteristics_by_uuid( 214 GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC 215 ): 216 self.body_sensor_location = DelegatedCharacteristicAdapter( 217 characteristics[0], 218 decode=lambda value: HeartRateService.BodySensorLocation(value[0]), 219 ) 220 else: 221 self.body_sensor_location = None 222 223 if characteristics := service_proxy.get_characteristics_by_uuid( 224 GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC 225 ): 226 self.heart_rate_control_point = PackedCharacteristicAdapter( 227 characteristics[0], 228 pack_format=HeartRateService.HEART_RATE_CONTROL_POINT_FORMAT, 229 ) 230 else: 231 self.heart_rate_control_point = None 232 233 async def reset_energy_expended(self): 234 if self.heart_rate_control_point is not None: 235 return await self.heart_rate_control_point.write_value( 236 HeartRateService.RESET_ENERGY_EXPENDED 237 ) 238