• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# GATT - Generic Attribute Profile
17#
18# See Bluetooth spec @ Vol 3, Part G
19#
20# -----------------------------------------------------------------------------
21
22# -----------------------------------------------------------------------------
23# Imports
24# -----------------------------------------------------------------------------
25from __future__ import annotations
26import enum
27import functools
28import logging
29import struct
30from typing import (
31    Callable,
32    Dict,
33    Iterable,
34    List,
35    Optional,
36    Sequence,
37    Union,
38    TYPE_CHECKING,
39)
40
41from bumble.colors import color
42from bumble.core import UUID
43from bumble.att import Attribute, AttributeValue
44
45if TYPE_CHECKING:
46    from bumble.gatt_client import AttributeProxy
47    from bumble.device import Connection
48
49
50# -----------------------------------------------------------------------------
51# Logging
52# -----------------------------------------------------------------------------
53logger = logging.getLogger(__name__)
54
55# -----------------------------------------------------------------------------
56# Constants
57# -----------------------------------------------------------------------------
58# fmt: off
59# pylint: disable=line-too-long
60
61GATT_REQUEST_TIMEOUT = 30  # seconds
62
63GATT_MAX_ATTRIBUTE_VALUE_SIZE = 512
64
65# Services
66GATT_GENERIC_ACCESS_SERVICE                 = UUID.from_16_bits(0x1800, 'Generic Access')
67GATT_GENERIC_ATTRIBUTE_SERVICE              = UUID.from_16_bits(0x1801, 'Generic Attribute')
68GATT_IMMEDIATE_ALERT_SERVICE                = UUID.from_16_bits(0x1802, 'Immediate Alert')
69GATT_LINK_LOSS_SERVICE                      = UUID.from_16_bits(0x1803, 'Link Loss')
70GATT_TX_POWER_SERVICE                       = UUID.from_16_bits(0x1804, 'TX Power')
71GATT_CURRENT_TIME_SERVICE                   = UUID.from_16_bits(0x1805, 'Current Time')
72GATT_REFERENCE_TIME_UPDATE_SERVICE          = UUID.from_16_bits(0x1806, 'Reference Time Update')
73GATT_NEXT_DST_CHANGE_SERVICE                = UUID.from_16_bits(0x1807, 'Next DST Change')
74GATT_GLUCOSE_SERVICE                        = UUID.from_16_bits(0x1808, 'Glucose')
75GATT_HEALTH_THERMOMETER_SERVICE             = UUID.from_16_bits(0x1809, 'Health Thermometer')
76GATT_DEVICE_INFORMATION_SERVICE             = UUID.from_16_bits(0x180A, 'Device Information')
77GATT_HEART_RATE_SERVICE                     = UUID.from_16_bits(0x180D, 'Heart Rate')
78GATT_PHONE_ALERT_STATUS_SERVICE             = UUID.from_16_bits(0x180E, 'Phone Alert Status')
79GATT_BATTERY_SERVICE                        = UUID.from_16_bits(0x180F, 'Battery')
80GATT_BLOOD_PRESSURE_SERVICE                 = UUID.from_16_bits(0x1810, 'Blood Pressure')
81GATT_ALERT_NOTIFICATION_SERVICE             = UUID.from_16_bits(0x1811, 'Alert Notification')
82GATT_HUMAN_INTERFACE_DEVICE_SERVICE         = UUID.from_16_bits(0x1812, 'Human Interface Device')
83GATT_SCAN_PARAMETERS_SERVICE                = UUID.from_16_bits(0x1813, 'Scan Parameters')
84GATT_RUNNING_SPEED_AND_CADENCE_SERVICE      = UUID.from_16_bits(0x1814, 'Running Speed and Cadence')
85GATT_AUTOMATION_IO_SERVICE                  = UUID.from_16_bits(0x1815, 'Automation IO')
86GATT_CYCLING_SPEED_AND_CADENCE_SERVICE      = UUID.from_16_bits(0x1816, 'Cycling Speed and Cadence')
87GATT_CYCLING_POWER_SERVICE                  = UUID.from_16_bits(0x1818, 'Cycling Power')
88GATT_LOCATION_AND_NAVIGATION_SERVICE        = UUID.from_16_bits(0x1819, 'Location and Navigation')
89GATT_ENVIRONMENTAL_SENSING_SERVICE          = UUID.from_16_bits(0x181A, 'Environmental Sensing')
90GATT_BODY_COMPOSITION_SERVICE               = UUID.from_16_bits(0x181B, 'Body Composition')
91GATT_USER_DATA_SERVICE                      = UUID.from_16_bits(0x181C, 'User Data')
92GATT_WEIGHT_SCALE_SERVICE                   = UUID.from_16_bits(0x181D, 'Weight Scale')
93GATT_BOND_MANAGEMENT_SERVICE                = UUID.from_16_bits(0x181E, 'Bond Management')
94GATT_CONTINUOUS_GLUCOSE_MONITORING_SERVICE  = UUID.from_16_bits(0x181F, 'Continuous Glucose Monitoring')
95GATT_INTERNET_PROTOCOL_SUPPORT_SERVICE      = UUID.from_16_bits(0x1820, 'Internet Protocol Support')
96GATT_INDOOR_POSITIONING_SERVICE             = UUID.from_16_bits(0x1821, 'Indoor Positioning')
97GATT_PULSE_OXIMETER_SERVICE                 = UUID.from_16_bits(0x1822, 'Pulse Oximeter')
98GATT_HTTP_PROXY_SERVICE                     = UUID.from_16_bits(0x1823, 'HTTP Proxy')
99GATT_TRANSPORT_DISCOVERY_SERVICE            = UUID.from_16_bits(0x1824, 'Transport Discovery')
100GATT_OBJECT_TRANSFER_SERVICE                = UUID.from_16_bits(0x1825, 'Object Transfer')
101GATT_FITNESS_MACHINE_SERVICE                = UUID.from_16_bits(0x1826, 'Fitness Machine')
102GATT_MESH_PROVISIONING_SERVICE              = UUID.from_16_bits(0x1827, 'Mesh Provisioning')
103GATT_MESH_PROXY_SERVICE                     = UUID.from_16_bits(0x1828, 'Mesh Proxy')
104GATT_RECONNECTION_CONFIGURATION_SERVICE     = UUID.from_16_bits(0x1829, 'Reconnection Configuration')
105GATT_INSULIN_DELIVERY_SERVICE               = UUID.from_16_bits(0x183A, 'Insulin Delivery')
106GATT_BINARY_SENSOR_SERVICE                  = UUID.from_16_bits(0x183B, 'Binary Sensor')
107GATT_EMERGENCY_CONFIGURATION_SERVICE        = UUID.from_16_bits(0x183C, 'Emergency Configuration')
108GATT_AUTHORIZATION_CONTROL_SERVICE          = UUID.from_16_bits(0x183D, 'Authorization Control')
109GATT_PHYSICAL_ACTIVITY_MONITOR_SERVICE      = UUID.from_16_bits(0x183E, 'Physical Activity Monitor')
110GATT_ELAPSED_TIME_SERVICE                   = UUID.from_16_bits(0x183F, 'Elapsed Time')
111GATT_GENERIC_HEALTH_SENSOR_SERVICE          = UUID.from_16_bits(0x1840, 'Generic Health Sensor')
112GATT_AUDIO_INPUT_CONTROL_SERVICE            = UUID.from_16_bits(0x1843, 'Audio Input Control')
113GATT_VOLUME_CONTROL_SERVICE                 = UUID.from_16_bits(0x1844, 'Volume Control')
114GATT_VOLUME_OFFSET_CONTROL_SERVICE          = UUID.from_16_bits(0x1845, 'Volume Offset Control')
115GATT_COORDINATED_SET_IDENTIFICATION_SERVICE = UUID.from_16_bits(0x1846, 'Coordinated Set Identification')
116GATT_DEVICE_TIME_SERVICE                    = UUID.from_16_bits(0x1847, 'Device Time')
117GATT_MEDIA_CONTROL_SERVICE                  = UUID.from_16_bits(0x1848, 'Media Control')
118GATT_GENERIC_MEDIA_CONTROL_SERVICE          = UUID.from_16_bits(0x1849, 'Generic Media Control')
119GATT_CONSTANT_TONE_EXTENSION_SERVICE        = UUID.from_16_bits(0x184A, 'Constant Tone Extension')
120GATT_TELEPHONE_BEARER_SERVICE               = UUID.from_16_bits(0x184B, 'Telephone Bearer')
121GATT_GENERIC_TELEPHONE_BEARER_SERVICE       = UUID.from_16_bits(0x184C, 'Generic Telephone Bearer')
122GATT_MICROPHONE_CONTROL_SERVICE             = UUID.from_16_bits(0x184D, 'Microphone Control')
123GATT_AUDIO_STREAM_CONTROL_SERVICE           = UUID.from_16_bits(0x184E, 'Audio Stream Control')
124GATT_BROADCAST_AUDIO_SCAN_SERVICE           = UUID.from_16_bits(0x184F, 'Broadcast Audio Scan')
125GATT_PUBLISHED_AUDIO_CAPABILITIES_SERVICE   = UUID.from_16_bits(0x1850, 'Published Audio Capabilities')
126GATT_BASIC_AUDIO_ANNOUNCEMENT_SERVICE       = UUID.from_16_bits(0x1851, 'Basic Audio Announcement')
127GATT_BROADCAST_AUDIO_ANNOUNCEMENT_SERVICE   = UUID.from_16_bits(0x1852, 'Broadcast Audio Announcement')
128GATT_COMMON_AUDIO_SERVICE                   = UUID.from_16_bits(0x1853, 'Common Audio')
129GATT_HEARING_ACCESS_SERVICE                 = UUID.from_16_bits(0x1854, 'Hearing Access')
130GATT_TELEPHONY_AND_MEDIA_AUDIO_SERVICE      = UUID.from_16_bits(0x1855, 'Telephony and Media Audio')
131GATT_PUBLIC_BROADCAST_ANNOUNCEMENT_SERVICE  = UUID.from_16_bits(0x1856, 'Public Broadcast Announcement')
132GATT_ELECTRONIC_SHELF_LABEL_SERVICE         = UUID.from_16_bits(0X1857, 'Electronic Shelf Label')
133GATT_GAMING_AUDIO_SERVICE                   = UUID.from_16_bits(0x1858, 'Gaming Audio')
134GATT_MESH_PROXY_SOLICITATION_SERVICE        = UUID.from_16_bits(0x1859, 'Mesh Audio Solicitation')
135
136# Attribute Types
137GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE   = UUID.from_16_bits(0x2800, 'Primary Service')
138GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE = UUID.from_16_bits(0x2801, 'Secondary Service')
139GATT_INCLUDE_ATTRIBUTE_TYPE           = UUID.from_16_bits(0x2802, 'Include')
140GATT_CHARACTERISTIC_ATTRIBUTE_TYPE    = UUID.from_16_bits(0x2803, 'Characteristic')
141
142# Descriptors
143GATT_CHARACTERISTIC_EXTENDED_PROPERTIES_DESCRIPTOR   = UUID.from_16_bits(0x2900, 'Characteristic Extended Properties')
144GATT_CHARACTERISTIC_USER_DESCRIPTION_DESCRIPTOR      = UUID.from_16_bits(0x2901, 'Characteristic User Description')
145GATT_CLIENT_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR  = UUID.from_16_bits(0x2902, 'Client Characteristic Configuration')
146GATT_SERVER_CHARACTERISTIC_CONFIGURATION_DESCRIPTOR  = UUID.from_16_bits(0x2903, 'Server Characteristic Configuration')
147GATT_CHARACTERISTIC_PRESENTATION_FORMAT_DESCRIPTOR   = UUID.from_16_bits(0x2904, 'Characteristic Format')
148GATT_CHARACTERISTIC_AGGREGATE_FORMAT_DESCRIPTOR      = UUID.from_16_bits(0x2905, 'Characteristic Aggregate Format')
149GATT_VALID_RANGE_DESCRIPTOR                          = UUID.from_16_bits(0x2906, 'Valid Range')
150GATT_EXTERNAL_REPORT_DESCRIPTOR                      = UUID.from_16_bits(0x2907, 'External Report')
151GATT_REPORT_REFERENCE_DESCRIPTOR                     = UUID.from_16_bits(0x2908, 'Report Reference')
152GATT_NUMBER_OF_DIGITALS_DESCRIPTOR                   = UUID.from_16_bits(0x2909, 'Number of Digitals')
153GATT_VALUE_TRIGGER_SETTING_DESCRIPTOR                = UUID.from_16_bits(0x290A, 'Value Trigger Setting')
154GATT_ENVIRONMENTAL_SENSING_CONFIGURATION_DESCRIPTOR  = UUID.from_16_bits(0x290B, 'Environmental Sensing Configuration')
155GATT_ENVIRONMENTAL_SENSING_MEASUREMENT_DESCRIPTOR    = UUID.from_16_bits(0x290C, 'Environmental Sensing Measurement')
156GATT_ENVIRONMENTAL_SENSING_TRIGGER_DESCRIPTOR        = UUID.from_16_bits(0x290D, 'Environmental Sensing Trigger Setting')
157GATT_TIME_TRIGGER_DESCRIPTOR                         = UUID.from_16_bits(0x290E, 'Time Trigger Setting')
158GATT_COMPLETE_BR_EDR_TRANSPORT_BLOCK_DATA_DESCRIPTOR = UUID.from_16_bits(0x290F, 'Complete BR-EDR Transport Block Data')
159GATT_OBSERVATION_SCHEDULE_DESCRIPTOR                 = UUID.from_16_bits(0x290F, 'Observation Schedule')
160GATT_VALID_RANGE_AND_ACCURACY_DESCRIPTOR             = UUID.from_16_bits(0x290F, 'Valid Range And Accuracy')
161
162# Device Information Service
163GATT_SYSTEM_ID_CHARACTERISTIC                          = UUID.from_16_bits(0x2A23, 'System ID')
164GATT_MODEL_NUMBER_STRING_CHARACTERISTIC                = UUID.from_16_bits(0x2A24, 'Model Number String')
165GATT_SERIAL_NUMBER_STRING_CHARACTERISTIC               = UUID.from_16_bits(0x2A25, 'Serial Number String')
166GATT_FIRMWARE_REVISION_STRING_CHARACTERISTIC           = UUID.from_16_bits(0x2A26, 'Firmware Revision String')
167GATT_HARDWARE_REVISION_STRING_CHARACTERISTIC           = UUID.from_16_bits(0x2A27, 'Hardware Revision String')
168GATT_SOFTWARE_REVISION_STRING_CHARACTERISTIC           = UUID.from_16_bits(0x2A28, 'Software Revision String')
169GATT_MANUFACTURER_NAME_STRING_CHARACTERISTIC           = UUID.from_16_bits(0x2A29, 'Manufacturer Name String')
170GATT_REGULATORY_CERTIFICATION_DATA_LIST_CHARACTERISTIC = UUID.from_16_bits(0x2A2A, 'IEEE 11073-20601 Regulatory Certification Data List')
171GATT_PNP_ID_CHARACTERISTIC                             = UUID.from_16_bits(0x2A50, 'PnP ID')
172
173# Human Interface Device Service
174GATT_HID_INFORMATION_CHARACTERISTIC   = UUID.from_16_bits(0x2A4A, 'HID Information')
175GATT_REPORT_MAP_CHARACTERISTIC        = UUID.from_16_bits(0x2A4B, 'Report Map')
176GATT_HID_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2A4C, 'HID Control Point')
177GATT_REPORT_CHARACTERISTIC            = UUID.from_16_bits(0x2A4D, 'Report')
178GATT_PROTOCOL_MODE_CHARACTERISTIC     = UUID.from_16_bits(0x2A4E, 'Protocol Mode')
179
180# Heart Rate Service
181GATT_HEART_RATE_MEASUREMENT_CHARACTERISTIC   = UUID.from_16_bits(0x2A37, 'Heart Rate Measurement')
182GATT_BODY_SENSOR_LOCATION_CHARACTERISTIC     = UUID.from_16_bits(0x2A38, 'Body Sensor Location')
183GATT_HEART_RATE_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2A39, 'Heart Rate Control Point')
184
185# Battery Service
186GATT_BATTERY_LEVEL_CHARACTERISTIC = UUID.from_16_bits(0x2A19, 'Battery Level')
187
188# Telephony And Media Audio Service (TMAS)
189GATT_TMAP_ROLE_CHARACTERISTIC = UUID.from_16_bits(0x2B51, 'TMAP Role')
190
191# Audio Input Control Service (AICS)
192GATT_AUDIO_INPUT_STATE_CHARACTERISTIC         = UUID.from_16_bits(0x2B77, 'Audio Input State')
193GATT_GAIN_SETTINGS_ATTRIBUTE_CHARACTERISTIC   = UUID.from_16_bits(0x2B78, 'Gain Settings Attribute')
194GATT_AUDIO_INPUT_TYPE_CHARACTERISTIC          = UUID.from_16_bits(0x2B79, 'Audio Input Type')
195GATT_AUDIO_INPUT_STATUS_CHARACTERISTIC        = UUID.from_16_bits(0x2B7A, 'Audio Input Status')
196GATT_AUDIO_INPUT_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2B7B, 'Audio Input Control Point')
197GATT_AUDIO_INPUT_DESCRIPTION_CHARACTERISTIC   = UUID.from_16_bits(0x2B7C, 'Audio Input Description')
198
199# Volume Control Service (VCS)
200GATT_VOLUME_STATE_CHARACTERISTIC                = UUID.from_16_bits(0x2B7D, 'Volume State')
201GATT_VOLUME_CONTROL_POINT_CHARACTERISTIC        = UUID.from_16_bits(0x2B7E, 'Volume Control Point')
202GATT_VOLUME_FLAGS_CHARACTERISTIC                = UUID.from_16_bits(0x2B7F, 'Volume Flags')
203
204# Volume Offset Control Service (VOCS)
205GATT_VOLUME_OFFSET_STATE_CHARACTERISTIC         = UUID.from_16_bits(0x2B80, 'Volume Offset State')
206GATT_AUDIO_LOCATION_CHARACTERISTIC              = UUID.from_16_bits(0x2B81, 'Audio Location')
207GATT_VOLUME_OFFSET_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2B82, 'Volume Offset Control Point')
208GATT_AUDIO_OUTPUT_DESCRIPTION_CHARACTERISTIC    = UUID.from_16_bits(0x2B83, 'Audio Output Description')
209
210# Coordinated Set Identification Service (CSIS)
211GATT_SET_IDENTITY_RESOLVING_KEY_CHARACTERISTIC  = UUID.from_16_bits(0x2B84, 'Set Identity Resolving Key')
212GATT_COORDINATED_SET_SIZE_CHARACTERISTIC        = UUID.from_16_bits(0x2B85, 'Coordinated Set Size')
213GATT_SET_MEMBER_LOCK_CHARACTERISTIC             = UUID.from_16_bits(0x2B86, 'Set Member Lock')
214GATT_SET_MEMBER_RANK_CHARACTERISTIC             = UUID.from_16_bits(0x2B87, 'Set Member Rank')
215
216# Media Control Service (MCS)
217GATT_MEDIA_PLAYER_NAME_CHARACTERISTIC                     = UUID.from_16_bits(0x2B93, 'Media Player Name')
218GATT_MEDIA_PLAYER_ICON_OBJECT_ID_CHARACTERISTIC           = UUID.from_16_bits(0x2B94, 'Media Player Icon Object ID')
219GATT_MEDIA_PLAYER_ICON_URL_CHARACTERISTIC                 = UUID.from_16_bits(0x2B95, 'Media Player Icon URL')
220GATT_TRACK_CHANGED_CHARACTERISTIC                         = UUID.from_16_bits(0x2B96, 'Track Changed')
221GATT_TRACK_TITLE_CHARACTERISTIC                           = UUID.from_16_bits(0x2B97, 'Track Title')
222GATT_TRACK_DURATION_CHARACTERISTIC                        = UUID.from_16_bits(0x2B98, 'Track Duration')
223GATT_TRACK_POSITION_CHARACTERISTIC                        = UUID.from_16_bits(0x2B99, 'Track Position')
224GATT_PLAYBACK_SPEED_CHARACTERISTIC                        = UUID.from_16_bits(0x2B9A, 'Playback Speed')
225GATT_SEEKING_SPEED_CHARACTERISTIC                         = UUID.from_16_bits(0x2B9B, 'Seeking Speed')
226GATT_CURRENT_TRACK_SEGMENTS_OBJECT_ID_CHARACTERISTIC      = UUID.from_16_bits(0x2B9C, 'Current Track Segments Object ID')
227GATT_CURRENT_TRACK_OBJECT_ID_CHARACTERISTIC               = UUID.from_16_bits(0x2B9D, 'Current Track Object ID')
228GATT_NEXT_TRACK_OBJECT_ID_CHARACTERISTIC                  = UUID.from_16_bits(0x2B9E, 'Next Track Object ID')
229GATT_PARENT_GROUP_OBJECT_ID_CHARACTERISTIC                = UUID.from_16_bits(0x2B9F, 'Parent Group Object ID')
230GATT_CURRENT_GROUP_OBJECT_ID_CHARACTERISTIC               = UUID.from_16_bits(0x2BA0, 'Current Group Object ID')
231GATT_PLAYING_ORDER_CHARACTERISTIC                         = UUID.from_16_bits(0x2BA1, 'Playing Order')
232GATT_PLAYING_ORDERS_SUPPORTED_CHARACTERISTIC              = UUID.from_16_bits(0x2BA2, 'Playing Orders Supported')
233GATT_MEDIA_STATE_CHARACTERISTIC                           = UUID.from_16_bits(0x2BA3, 'Media State')
234GATT_MEDIA_CONTROL_POINT_CHARACTERISTIC                   = UUID.from_16_bits(0x2BA4, 'Media Control Point')
235GATT_MEDIA_CONTROL_POINT_OPCODES_SUPPORTED_CHARACTERISTIC = UUID.from_16_bits(0x2BA5, 'Media Control Point Opcodes Supported')
236GATT_SEARCH_RESULTS_OBJECT_ID_CHARACTERISTIC              = UUID.from_16_bits(0x2BA6, 'Search Results Object ID')
237GATT_SEARCH_CONTROL_POINT_CHARACTERISTIC                  = UUID.from_16_bits(0x2BA7, 'Search Control Point')
238GATT_CONTENT_CONTROL_ID_CHARACTERISTIC                    = UUID.from_16_bits(0x2BBA, 'Content Control Id')
239
240# Telephone Bearer Service (TBS)
241GATT_BEARER_PROVIDER_NAME_CHARACTERISTIC                      = UUID.from_16_bits(0x2BB4, 'Bearer Provider Name')
242GATT_BEARER_UCI_CHARACTERISTIC                                = UUID.from_16_bits(0x2BB5, 'Bearer UCI')
243GATT_BEARER_TECHNOLOGY_CHARACTERISTIC                         = UUID.from_16_bits(0x2BB6, 'Bearer Technology')
244GATT_BEARER_URI_SCHEMES_SUPPORTED_LIST_CHARACTERISTIC         = UUID.from_16_bits(0x2BB7, 'Bearer URI Schemes Supported List')
245GATT_BEARER_SIGNAL_STRENGTH_CHARACTERISTIC                    = UUID.from_16_bits(0x2BB8, 'Bearer Signal Strength')
246GATT_BEARER_SIGNAL_STRENGTH_REPORTING_INTERVAL_CHARACTERISTIC = UUID.from_16_bits(0x2BB9, 'Bearer Signal Strength Reporting Interval')
247GATT_BEARER_LIST_CURRENT_CALLS_CHARACTERISTIC                 = UUID.from_16_bits(0x2BBA, 'Bearer List Current Calls')
248GATT_CONTENT_CONTROL_ID_CHARACTERISTIC                        = UUID.from_16_bits(0x2BBB, 'Content Control ID')
249GATT_STATUS_FLAGS_CHARACTERISTIC                              = UUID.from_16_bits(0x2BBC, 'Status Flags')
250GATT_INCOMING_CALL_TARGET_BEARER_URI_CHARACTERISTIC           = UUID.from_16_bits(0x2BBD, 'Incoming Call Target Bearer URI')
251GATT_CALL_STATE_CHARACTERISTIC                                = UUID.from_16_bits(0x2BBE, 'Call State')
252GATT_CALL_CONTROL_POINT_CHARACTERISTIC                        = UUID.from_16_bits(0x2BBF, 'Call Control Point')
253GATT_CALL_CONTROL_POINT_OPTIONAL_OPCODES_CHARACTERISTIC       = UUID.from_16_bits(0x2BC0, 'Call Control Point Optional Opcodes')
254GATT_TERMINATION_REASON_CHARACTERISTIC                        = UUID.from_16_bits(0x2BC1, 'Termination Reason')
255GATT_INCOMING_CALL_CHARACTERISTIC                             = UUID.from_16_bits(0x2BC2, 'Incoming Call')
256GATT_CALL_FRIENDLY_NAME_CHARACTERISTIC                        = UUID.from_16_bits(0x2BC3, 'Call Friendly Name')
257
258# Microphone Control Service (MICS)
259GATT_MUTE_CHARACTERISTIC = UUID.from_16_bits(0x2BC3, 'Mute')
260
261# Audio Stream Control Service (ASCS)
262GATT_SINK_ASE_CHARACTERISTIC                    = UUID.from_16_bits(0x2BC4, 'Sink ASE')
263GATT_SOURCE_ASE_CHARACTERISTIC                  = UUID.from_16_bits(0x2BC5, 'Source ASE')
264GATT_ASE_CONTROL_POINT_CHARACTERISTIC           = UUID.from_16_bits(0x2BC6, 'ASE Control Point')
265
266# Broadcast Audio Scan Service (BASS)
267GATT_BROADCAST_AUDIO_SCAN_CONTROL_POINT_CHARACTERISTIC = UUID.from_16_bits(0x2BC7, 'Broadcast Audio Scan Control Point')
268GATT_BROADCAST_RECEIVE_STATE_CHARACTERISTIC            = UUID.from_16_bits(0x2BC8, 'Broadcast Receive State')
269
270# Published Audio Capabilities Service (PACS)
271GATT_SINK_PAC_CHARACTERISTIC                    = UUID.from_16_bits(0x2BC9, 'Sink PAC')
272GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC         = UUID.from_16_bits(0x2BCA, 'Sink Audio Location')
273GATT_SOURCE_PAC_CHARACTERISTIC                  = UUID.from_16_bits(0x2BCB, 'Source PAC')
274GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC       = UUID.from_16_bits(0x2BCC, 'Source Audio Location')
275GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC    = UUID.from_16_bits(0x2BCD, 'Available Audio Contexts')
276GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC    = UUID.from_16_bits(0x2BCE, 'Supported Audio Contexts')
277
278# ASHA Service
279GATT_ASHA_SERVICE                             = UUID.from_16_bits(0xFDF0, 'Audio Streaming for Hearing Aid')
280GATT_ASHA_READ_ONLY_PROPERTIES_CHARACTERISTIC = UUID('6333651e-c481-4a3e-9169-7c902aad37bb', 'ReadOnlyProperties')
281GATT_ASHA_AUDIO_CONTROL_POINT_CHARACTERISTIC  = UUID('f0d4de7e-4a88-476c-9d9f-1937b0996cc0', 'AudioControlPoint')
282GATT_ASHA_AUDIO_STATUS_CHARACTERISTIC         = UUID('38663f1a-e711-4cac-b641-326b56404837', 'AudioStatus')
283GATT_ASHA_VOLUME_CHARACTERISTIC               = UUID('00e4ca9e-ab14-41e4-8823-f9e70c7e91df', 'Volume')
284GATT_ASHA_LE_PSM_OUT_CHARACTERISTIC           = UUID('2d410339-82b6-42aa-b34e-e2e01df8cc1a', 'LE_PSM_OUT')
285
286# Misc
287GATT_DEVICE_NAME_CHARACTERISTIC                                = UUID.from_16_bits(0x2A00, 'Device Name')
288GATT_APPEARANCE_CHARACTERISTIC                                 = UUID.from_16_bits(0x2A01, 'Appearance')
289GATT_PERIPHERAL_PRIVACY_FLAG_CHARACTERISTIC                    = UUID.from_16_bits(0x2A02, 'Peripheral Privacy Flag')
290GATT_RECONNECTION_ADDRESS_CHARACTERISTIC                       = UUID.from_16_bits(0x2A03, 'Reconnection Address')
291GATT_PERIPHERAL_PREFERRED_CONNECTION_PARAMETERS_CHARACTERISTIC = UUID.from_16_bits(0x2A04, 'Peripheral Preferred Connection Parameters')
292GATT_SERVICE_CHANGED_CHARACTERISTIC                            = UUID.from_16_bits(0x2A05, 'Service Changed')
293GATT_ALERT_LEVEL_CHARACTERISTIC                                = UUID.from_16_bits(0x2A06, 'Alert Level')
294GATT_TX_POWER_LEVEL_CHARACTERISTIC                             = UUID.from_16_bits(0x2A07, 'Tx Power Level')
295GATT_BOOT_KEYBOARD_INPUT_REPORT_CHARACTERISTIC                 = UUID.from_16_bits(0x2A22, 'Boot Keyboard Input Report')
296GATT_CURRENT_TIME_CHARACTERISTIC                               = UUID.from_16_bits(0x2A2B, 'Current Time')
297GATT_BOOT_KEYBOARD_OUTPUT_REPORT_CHARACTERISTIC                = UUID.from_16_bits(0x2A32, 'Boot Keyboard Output Report')
298GATT_CENTRAL_ADDRESS_RESOLUTION__CHARACTERISTIC                = UUID.from_16_bits(0x2AA6, 'Central Address Resolution')
299GATT_CLIENT_SUPPORTED_FEATURES_CHARACTERISTIC                  = UUID.from_16_bits(0x2B29, 'Client Supported Features')
300GATT_DATABASE_HASH_CHARACTERISTIC                              = UUID.from_16_bits(0x2B2A, 'Database Hash')
301GATT_SERVER_SUPPORTED_FEATURES_CHARACTERISTIC                  = UUID.from_16_bits(0x2B3A, 'Server Supported Features')
302
303# fmt: on
304# pylint: enable=line-too-long
305
306
307# -----------------------------------------------------------------------------
308# Utils
309# -----------------------------------------------------------------------------
310
311
312def show_services(services: Iterable[Service]) -> None:
313    for service in services:
314        print(color(str(service), 'cyan'))
315
316        for characteristic in service.characteristics:
317            print(color('  ' + str(characteristic), 'magenta'))
318
319            for descriptor in characteristic.descriptors:
320                print(color('    ' + str(descriptor), 'green'))
321
322
323# -----------------------------------------------------------------------------
324class Service(Attribute):
325    '''
326    See Vol 3, Part G - 3.1 SERVICE DEFINITION
327    '''
328
329    uuid: UUID
330    characteristics: List[Characteristic]
331    included_services: List[Service]
332
333    def __init__(
334        self,
335        uuid: Union[str, UUID],
336        characteristics: List[Characteristic],
337        primary=True,
338        included_services: List[Service] = [],
339    ) -> None:
340        # Convert the uuid to a UUID object if it isn't already
341        if isinstance(uuid, str):
342            uuid = UUID(uuid)
343
344        super().__init__(
345            (
346                GATT_PRIMARY_SERVICE_ATTRIBUTE_TYPE
347                if primary
348                else GATT_SECONDARY_SERVICE_ATTRIBUTE_TYPE
349            ),
350            Attribute.READABLE,
351            uuid.to_pdu_bytes(),
352        )
353        self.uuid = uuid
354        self.included_services = included_services[:]
355        self.characteristics = characteristics[:]
356        self.primary = primary
357
358    def get_advertising_data(self) -> Optional[bytes]:
359        """
360        Get Service specific advertising data
361        Defined by each Service, default value is empty
362        :return Service data for advertising
363        """
364        return None
365
366    def __str__(self) -> str:
367        return (
368            f'Service(handle=0x{self.handle:04X}, '
369            f'end=0x{self.end_group_handle:04X}, '
370            f'uuid={self.uuid})'
371            f'{"" if self.primary else "*"}'
372        )
373
374
375# -----------------------------------------------------------------------------
376class TemplateService(Service):
377    '''
378    Convenience abstract class that can be used by profile-specific subclasses that want
379    to expose their UUID as a class property
380    '''
381
382    UUID: UUID
383
384    def __init__(
385        self,
386        characteristics: List[Characteristic],
387        primary: bool = True,
388        included_services: List[Service] = [],
389    ) -> None:
390        super().__init__(self.UUID, characteristics, primary, included_services)
391
392
393# -----------------------------------------------------------------------------
394class IncludedServiceDeclaration(Attribute):
395    '''
396    See Vol 3, Part G - 3.2 INCLUDE DEFINITION
397    '''
398
399    service: Service
400
401    def __init__(self, service: Service) -> None:
402        declaration_bytes = struct.pack(
403            '<HH2s', service.handle, service.end_group_handle, service.uuid.to_bytes()
404        )
405        super().__init__(
406            GATT_INCLUDE_ATTRIBUTE_TYPE, Attribute.READABLE, declaration_bytes
407        )
408        self.service = service
409
410    def __str__(self) -> str:
411        return (
412            f'IncludedServiceDefinition(handle=0x{self.handle:04X}, '
413            f'group_starting_handle=0x{self.service.handle:04X}, '
414            f'group_ending_handle=0x{self.service.end_group_handle:04X}, '
415            f'uuid={self.service.uuid})'
416        )
417
418
419# -----------------------------------------------------------------------------
420class Characteristic(Attribute):
421    '''
422    See Vol 3, Part G - 3.3 CHARACTERISTIC DEFINITION
423    '''
424
425    uuid: UUID
426    properties: Characteristic.Properties
427
428    class Properties(enum.IntFlag):
429        """Property flags"""
430
431        BROADCAST = 0x01
432        READ = 0x02
433        WRITE_WITHOUT_RESPONSE = 0x04
434        WRITE = 0x08
435        NOTIFY = 0x10
436        INDICATE = 0x20
437        AUTHENTICATED_SIGNED_WRITES = 0x40
438        EXTENDED_PROPERTIES = 0x80
439
440        @classmethod
441        def from_string(cls, properties_str: str) -> Characteristic.Properties:
442            try:
443                return functools.reduce(
444                    lambda x, y: x | cls[y],
445                    properties_str.replace("|", ",").split(","),
446                    Characteristic.Properties(0),
447                )
448            except (TypeError, KeyError):
449                # The check for `p.name is not None` here is needed because for InFlag
450                # enums, the .name property can be None, when the enum value is 0,
451                # so the type hint for .name is Optional[str].
452                enum_list: List[str] = [p.name for p in cls if p.name is not None]
453                enum_list_str = ",".join(enum_list)
454                raise TypeError(
455                    f"Characteristic.Properties::from_string() error:\nExpected a string containing any of the keys, separated by , or |: {enum_list_str}\nGot: {properties_str}"
456                )
457
458        def __str__(self) -> str:
459            # NOTE: we override this method to offer a consistent result between python
460            # versions: the value returned by IntFlag.__str__() changed in version 11.
461            return '|'.join(
462                flag.name
463                for flag in Characteristic.Properties
464                if self.value & flag.value and flag.name is not None
465            )
466
467    # For backwards compatibility these are defined here
468    # For new code, please use Characteristic.Properties.X
469    BROADCAST = Properties.BROADCAST
470    READ = Properties.READ
471    WRITE_WITHOUT_RESPONSE = Properties.WRITE_WITHOUT_RESPONSE
472    WRITE = Properties.WRITE
473    NOTIFY = Properties.NOTIFY
474    INDICATE = Properties.INDICATE
475    AUTHENTICATED_SIGNED_WRITES = Properties.AUTHENTICATED_SIGNED_WRITES
476    EXTENDED_PROPERTIES = Properties.EXTENDED_PROPERTIES
477
478    def __init__(
479        self,
480        uuid: Union[str, bytes, UUID],
481        properties: Characteristic.Properties,
482        permissions: Union[str, Attribute.Permissions],
483        value: Union[str, bytes, CharacteristicValue] = b'',
484        descriptors: Sequence[Descriptor] = (),
485    ):
486        super().__init__(uuid, permissions, value)
487        self.uuid = self.type
488        self.properties = properties
489        self.descriptors = descriptors
490
491    def get_descriptor(self, descriptor_type):
492        for descriptor in self.descriptors:
493            if descriptor.type == descriptor_type:
494                return descriptor
495
496        return None
497
498    def has_properties(self, properties: Characteristic.Properties) -> bool:
499        return self.properties & properties == properties
500
501    def __str__(self) -> str:
502        return (
503            f'Characteristic(handle=0x{self.handle:04X}, '
504            f'end=0x{self.end_group_handle:04X}, '
505            f'uuid={self.uuid}, '
506            f'{self.properties})'
507        )
508
509
510# -----------------------------------------------------------------------------
511class CharacteristicDeclaration(Attribute):
512    '''
513    See Vol 3, Part G - 3.3.1 CHARACTERISTIC DECLARATION
514    '''
515
516    characteristic: Characteristic
517
518    def __init__(self, characteristic: Characteristic, value_handle: int) -> None:
519        declaration_bytes = (
520            struct.pack('<BH', characteristic.properties, value_handle)
521            + characteristic.uuid.to_pdu_bytes()
522        )
523        super().__init__(
524            GATT_CHARACTERISTIC_ATTRIBUTE_TYPE, Attribute.READABLE, declaration_bytes
525        )
526        self.value_handle = value_handle
527        self.characteristic = characteristic
528
529    def __str__(self) -> str:
530        return (
531            f'CharacteristicDeclaration(handle=0x{self.handle:04X}, '
532            f'value_handle=0x{self.value_handle:04X}, '
533            f'uuid={self.characteristic.uuid}, '
534            f'{self.characteristic.properties})'
535        )
536
537
538# -----------------------------------------------------------------------------
539class CharacteristicValue(AttributeValue):
540    """Same as AttributeValue, for backward compatibility"""
541
542
543# -----------------------------------------------------------------------------
544class CharacteristicAdapter:
545    '''
546    An adapter that can adapt Characteristic and AttributeProxy objects
547    by wrapping their `read_value()` and `write_value()` methods with ones that
548    return/accept encoded/decoded values.
549
550    For proxies (i.e used by a GATT client), the adaptation is one where the return
551    value of `read_value()` is decoded and the value passed to `write_value()` is
552    encoded. The `subscribe()` method, is wrapped with one where the values are decoded
553    before being passed to the subscriber.
554
555    For local values (i.e hosted by a GATT server) the adaptation is one where the
556    return value of `read_value()` is encoded and the value passed to `write_value()`
557    is decoded.
558    '''
559
560    read_value: Callable
561    write_value: Callable
562
563    def __init__(self, characteristic: Union[Characteristic, AttributeProxy]):
564        self.wrapped_characteristic = characteristic
565        self.subscribers: Dict[Callable, Callable] = (
566            {}
567        )  # Map from subscriber to proxy subscriber
568
569        if isinstance(characteristic, Characteristic):
570            self.read_value = self.read_encoded_value
571            self.write_value = self.write_encoded_value
572        else:
573            self.read_value = self.read_decoded_value
574            self.write_value = self.write_decoded_value
575            self.subscribe = self.wrapped_subscribe
576            self.unsubscribe = self.wrapped_unsubscribe
577
578    def __getattr__(self, name):
579        return getattr(self.wrapped_characteristic, name)
580
581    def __setattr__(self, name, value):
582        if name in (
583            'wrapped_characteristic',
584            'subscribers',
585            'read_value',
586            'write_value',
587            'subscribe',
588            'unsubscribe',
589        ):
590            super().__setattr__(name, value)
591        else:
592            setattr(self.wrapped_characteristic, name, value)
593
594    async def read_encoded_value(self, connection):
595        return self.encode_value(
596            await self.wrapped_characteristic.read_value(connection)
597        )
598
599    async def write_encoded_value(self, connection, value):
600        return await self.wrapped_characteristic.write_value(
601            connection, self.decode_value(value)
602        )
603
604    async def read_decoded_value(self):
605        return self.decode_value(await self.wrapped_characteristic.read_value())
606
607    async def write_decoded_value(self, value, with_response=False):
608        return await self.wrapped_characteristic.write_value(
609            self.encode_value(value), with_response
610        )
611
612    def encode_value(self, value):
613        return value
614
615    def decode_value(self, value):
616        return value
617
618    def wrapped_subscribe(self, subscriber=None):
619        if subscriber is not None:
620            if subscriber in self.subscribers:
621                # We already have a proxy subscriber
622                subscriber = self.subscribers[subscriber]
623            else:
624                # Create and register a proxy that will decode the value
625                original_subscriber = subscriber
626
627                def on_change(value):
628                    original_subscriber(self.decode_value(value))
629
630                self.subscribers[subscriber] = on_change
631                subscriber = on_change
632
633        return self.wrapped_characteristic.subscribe(subscriber)
634
635    def wrapped_unsubscribe(self, subscriber=None):
636        if subscriber in self.subscribers:
637            subscriber = self.subscribers.pop(subscriber)
638
639        return self.wrapped_characteristic.unsubscribe(subscriber)
640
641    def __str__(self) -> str:
642        wrapped = str(self.wrapped_characteristic)
643        return f'{self.__class__.__name__}({wrapped})'
644
645
646# -----------------------------------------------------------------------------
647class DelegatedCharacteristicAdapter(CharacteristicAdapter):
648    '''
649    Adapter that converts bytes values using an encode and a decode function.
650    '''
651
652    def __init__(self, characteristic, encode=None, decode=None):
653        super().__init__(characteristic)
654        self.encode = encode
655        self.decode = decode
656
657    def encode_value(self, value):
658        return self.encode(value) if self.encode else value
659
660    def decode_value(self, value):
661        return self.decode(value) if self.decode else value
662
663
664# -----------------------------------------------------------------------------
665class PackedCharacteristicAdapter(CharacteristicAdapter):
666    '''
667    Adapter that packs/unpacks characteristic values according to a standard
668    Python `struct` format.
669    For formats with a single value, the adapted `read_value` and `write_value`
670    methods return/accept single values. For formats with multiple values,
671    they return/accept a tuple with the same number of elements as is required for
672    the format.
673    '''
674
675    def __init__(self, characteristic, pack_format):
676        super().__init__(characteristic)
677        self.struct = struct.Struct(pack_format)
678
679    def pack(self, *values):
680        return self.struct.pack(*values)
681
682    def unpack(self, buffer):
683        return self.struct.unpack(buffer)
684
685    def encode_value(self, value):
686        return self.pack(*value if isinstance(value, tuple) else (value,))
687
688    def decode_value(self, value):
689        unpacked = self.unpack(value)
690        return unpacked[0] if len(unpacked) == 1 else unpacked
691
692
693# -----------------------------------------------------------------------------
694class MappedCharacteristicAdapter(PackedCharacteristicAdapter):
695    '''
696    Adapter that packs/unpacks characteristic values according to a standard
697    Python `struct` format.
698    The adapted `read_value` and `write_value` methods return/accept aa dictionary which
699    is packed/unpacked according to format, with the arguments extracted from the
700    dictionary by key, in the same order as they occur in the `keys` parameter.
701    '''
702
703    def __init__(self, characteristic, pack_format, keys):
704        super().__init__(characteristic, pack_format)
705        self.keys = keys
706
707    # pylint: disable=arguments-differ
708    def pack(self, values):
709        return super().pack(*(values[key] for key in self.keys))
710
711    def unpack(self, buffer):
712        return dict(zip(self.keys, super().unpack(buffer)))
713
714
715# -----------------------------------------------------------------------------
716class UTF8CharacteristicAdapter(CharacteristicAdapter):
717    '''
718    Adapter that converts strings to/from bytes using UTF-8 encoding
719    '''
720
721    def encode_value(self, value: str) -> bytes:
722        return value.encode('utf-8')
723
724    def decode_value(self, value: bytes) -> str:
725        return value.decode('utf-8')
726
727
728# -----------------------------------------------------------------------------
729class Descriptor(Attribute):
730    '''
731    See Vol 3, Part G - 3.3.3 Characteristic Descriptor Declarations
732    '''
733
734    def __str__(self) -> str:
735        if isinstance(self.value, bytes):
736            value_str = self.value.hex()
737        elif isinstance(self.value, CharacteristicValue):
738            value = self.value.read(None)
739            if isinstance(value, bytes):
740                value_str = value.hex()
741            else:
742                value_str = '<async>'
743        else:
744            value_str = '<...>'
745        return (
746            f'Descriptor(handle=0x{self.handle:04X}, '
747            f'type={self.type}, '
748            f'value={value_str})'
749        )
750
751
752# -----------------------------------------------------------------------------
753class ClientCharacteristicConfigurationBits(enum.IntFlag):
754    '''
755    See Vol 3, Part G - 3.3.3.3 - Table 3.11 Client Characteristic Configuration bit
756    field definition
757    '''
758
759    DEFAULT = 0x0000
760    NOTIFICATION = 0x0001
761    INDICATION = 0x0002
762