# Copyright 2021-2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import enum

from bumble import att
from bumble import device
from bumble import gatt
from bumble import gatt_client

from typing import Optional

# -----------------------------------------------------------------------------
# Constants
# -----------------------------------------------------------------------------

# Bounds used to clamp the volume setting in the relative up/down handlers.
MIN_VOLUME = 0
MAX_VOLUME = 255


class ErrorCode(enum.IntEnum):
    '''
    See Volume Control Service 1.6. Application error codes.
    '''

    INVALID_CHANGE_COUNTER = 0x80
    OPCODE_NOT_SUPPORTED = 0x81


class VolumeFlags(enum.IntFlag):
    '''
    See Volume Control Service 3.3. Volume Flags.
    '''

    VOLUME_SETTING_PERSISTED = 0x01
    # RFU


class VolumeControlPointOpcode(enum.IntEnum):
    '''
    See Volume Control Service Table 3.3: Volume Control Point procedure requirements.
    '''

    # NOTE: the member names are load-bearing. The server resolves the handler
    # for an opcode as `'_on_' + name.lower()`, so renaming a member requires
    # renaming the matching `VolumeControlService._on_*` method.

    # fmt: off
    RELATIVE_VOLUME_DOWN        = 0x00
    RELATIVE_VOLUME_UP          = 0x01
    UNMUTE_RELATIVE_VOLUME_DOWN = 0x02
    UNMUTE_RELATIVE_VOLUME_UP   = 0x03
    SET_ABSOLUTE_VOLUME         = 0x04
    UNMUTE                      = 0x05
    MUTE                        = 0x06
    # fmt: on


# -----------------------------------------------------------------------------
# Server
# -----------------------------------------------------------------------------
class VolumeControlService(gatt.TemplateService):
    '''
    Server side of the Volume Control Service.

    Exposes three characteristics (Volume State, Volume Control Point and
    Volume Flags) and keeps the volume state as three integers:
    `volume_setting`, `muted` and `change_counter`.

    When a Volume Control Point write changes the state, the change counter is
    incremented (modulo 256), subscribers of the Volume State characteristic
    are notified, and a `'volume_state'` event is emitted with
    `(volume_setting, muted, change_counter)`.
    '''

    UUID = gatt.GATT_VOLUME_CONTROL_SERVICE

    volume_state: gatt.Characteristic
    volume_control_point: gatt.Characteristic
    volume_flags: gatt.Characteristic

    volume_setting: int
    muted: int
    change_counter: int

    def __init__(
        self,
        step_size: int = 16,
        volume_setting: int = 0,
        muted: int = 0,
        change_counter: int = 0,
        volume_flags: int = 0,
    ) -> None:
        '''
        Build the service and its three characteristics.

        Args:
            step_size: amount added to / subtracted from the volume setting by
                the relative volume up/down procedures.
            volume_setting: initial volume setting.
            muted: initial mute state (the mute/unmute handlers use 1 and 0).
            change_counter: initial change counter value.
            volume_flags: initial value of the Volume Flags characteristic,
                stored as a single byte.
        '''
        self.step_size = step_size
        self.volume_setting = volume_setting
        self.muted = muted
        self.change_counter = change_counter

        # Volume State is computed on each read from the current attributes.
        self.volume_state = gatt.Characteristic(
            uuid=gatt.GATT_VOLUME_STATE_CHARACTERISTIC,
            properties=(
                gatt.Characteristic.Properties.READ
                | gatt.Characteristic.Properties.NOTIFY
            ),
            permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
            value=gatt.CharacteristicValue(read=self._on_read_volume_state),
        )
        self.volume_control_point = gatt.Characteristic(
            uuid=gatt.GATT_VOLUME_CONTROL_POINT_CHARACTERISTIC,
            properties=gatt.Characteristic.Properties.WRITE,
            permissions=gatt.Characteristic.Permissions.WRITE_REQUIRES_ENCRYPTION,
            value=gatt.CharacteristicValue(write=self._on_write_volume_control_point),
        )
        self.volume_flags = gatt.Characteristic(
            uuid=gatt.GATT_VOLUME_FLAGS_CHARACTERISTIC,
            properties=gatt.Characteristic.Properties.READ,
            permissions=gatt.Characteristic.Permissions.READ_REQUIRES_ENCRYPTION,
            value=bytes([volume_flags]),
        )

        super().__init__(
            [
                self.volume_state,
                self.volume_control_point,
                self.volume_flags,
            ]
        )

    @property
    def volume_state_bytes(self) -> bytes:
        '''Volume state as 3 bytes: volume setting, mute, change counter.'''
        return bytes([self.volume_setting, self.muted, self.change_counter])

    @volume_state_bytes.setter
    def volume_state_bytes(self, new_value: bytes) -> None:
        # Iterating over `bytes` yields ints, so this assigns three ints.
        # A value that is not exactly 3 bytes long raises ValueError.
        self.volume_setting, self.muted, self.change_counter = new_value

    def _on_read_volume_state(self, _connection: Optional[device.Connection]) -> bytes:
        '''Read handler for the Volume State characteristic.'''
        return self.volume_state_bytes

    def _on_write_volume_control_point(
        self, connection: Optional[device.Connection], value: bytes
    ) -> None:
        '''
        Write handler for the Volume Control Point characteristic.

        `value[0]` is the opcode, `value[1]` is the change counter the client
        believes is current, and any remaining bytes are passed as positional
        arguments to the opcode handler.

        Raises:
            att.ATT_Error: with `ErrorCode.OPCODE_NOT_SUPPORTED` when the
                opcode is not a `VolumeControlPointOpcode`, or with
                `ErrorCode.INVALID_CHANGE_COUNTER` when the supplied change
                counter differs from the server's.
        '''
        assert connection

        # An unknown opcode must be reported with the service-level error
        # code rather than letting the enum's bare ValueError escape.
        try:
            opcode = VolumeControlPointOpcode(value[0])
        except ValueError as error:
            raise att.ATT_Error(ErrorCode.OPCODE_NOT_SUPPORTED) from error
        change_counter = value[1]

        if change_counter != self.change_counter:
            raise att.ATT_Error(ErrorCode.INVALID_CHANGE_COUNTER)

        # NOTE(review): a write shorter than 2 bytes, or one whose operand
        # count does not match the handler signature, still raises
        # IndexError/TypeError here - consider mapping those to an ATT error.
        handler = getattr(self, '_on_' + opcode.name.lower())
        if handler(*value[2:]):
            # Handlers return True only when the state actually changed.
            self.change_counter = (self.change_counter + 1) % 256
            connection.abort_on(
                'disconnection',
                connection.device.notify_subscribers(
                    attribute=self.volume_state,
                    value=self.volume_state_bytes,
                ),
            )
            self.emit(
                'volume_state', self.volume_setting, self.muted, self.change_counter
            )

    # Opcode handlers: each applies its change and returns True when the
    # volume setting and/or mute state differs from what it was before.

    def _on_relative_volume_down(self) -> bool:
        '''Lower the volume by `step_size`, clamped at `MIN_VOLUME`.'''
        old_volume = self.volume_setting
        self.volume_setting = max(self.volume_setting - self.step_size, MIN_VOLUME)
        return self.volume_setting != old_volume

    def _on_relative_volume_up(self) -> bool:
        '''Raise the volume by `step_size`, clamped at `MAX_VOLUME`.'''
        old_volume = self.volume_setting
        self.volume_setting = min(self.volume_setting + self.step_size, MAX_VOLUME)
        return self.volume_setting != old_volume

    def _on_unmute_relative_volume_down(self) -> bool:
        '''Lower the volume by `step_size` (clamped) and clear the mute state.'''
        old_volume, old_muted_state = self.volume_setting, self.muted
        self.volume_setting = max(self.volume_setting - self.step_size, MIN_VOLUME)
        self.muted = 0
        return (self.volume_setting, self.muted) != (old_volume, old_muted_state)

    def _on_unmute_relative_volume_up(self) -> bool:
        '''Raise the volume by `step_size` (clamped) and clear the mute state.'''
        old_volume, old_muted_state = self.volume_setting, self.muted
        self.volume_setting = min(self.volume_setting + self.step_size, MAX_VOLUME)
        self.muted = 0
        return (self.volume_setting, self.muted) != (old_volume, old_muted_state)

    def _on_set_absolute_volume(self, volume_setting: int) -> bool:
        '''Set the volume to `volume_setting` (the single operand byte).'''
        old_volume_setting = self.volume_setting
        self.volume_setting = volume_setting
        return old_volume_setting != self.volume_setting

    def _on_unmute(self) -> bool:
        '''Clear the mute state.'''
        old_muted_state = self.muted
        self.muted = 0
        return self.muted != old_muted_state

    def _on_mute(self) -> bool:
        '''Set the mute state.'''
        old_muted_state = self.muted
        self.muted = 1
        return self.muted != old_muted_state


# -----------------------------------------------------------------------------
# Client
# -----------------------------------------------------------------------------
class VolumeControlServiceProxy(gatt_client.ProfileServiceProxy):
    '''
    Client-side proxy for a remote Volume Control Service.

    For each of the three characteristic UUIDs, the first matching
    characteristic of the service proxy is used. Volume State and Volume Flags
    are wrapped in a `gatt.PackedCharacteristicAdapter` with the `'BBB'` and
    `'B'` formats respectively; Volume Control Point is exposed unwrapped.
    '''

    SERVICE_CLASS = VolumeControlService

    volume_state: gatt.PackedCharacteristicAdapter
    volume_control_point: gatt_client.CharacteristicProxy
    volume_flags: gatt.PackedCharacteristicAdapter

    def __init__(self, service_proxy: gatt_client.ServiceProxy) -> None:
        self.service_proxy = service_proxy

        # Indexing with [0] raises IndexError when the remote service does not
        # expose the corresponding characteristic.
        self.volume_state = gatt.PackedCharacteristicAdapter(
            service_proxy.get_characteristics_by_uuid(
                gatt.GATT_VOLUME_STATE_CHARACTERISTIC
            )[0],
            'BBB',
        )

        self.volume_control_point = service_proxy.get_characteristics_by_uuid(
            gatt.GATT_VOLUME_CONTROL_POINT_CHARACTERISTIC
        )[0]

        self.volume_flags = gatt.PackedCharacteristicAdapter(
            service_proxy.get_characteristics_by_uuid(
                gatt.GATT_VOLUME_FLAGS_CHARACTERISTIC
            )[0],
            'B',
        )