1# Copyright 2021-2023 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15 16# ----------------------------------------------------------------------------- 17# Imports 18# ----------------------------------------------------------------------------- 19from __future__ import annotations 20 21from collections.abc import Sequence 22import dataclasses 23import enum 24import struct 25import functools 26import logging 27from typing import Optional, List, Union, Type, Dict, Any, Tuple 28 29from bumble import core 30from bumble import colors 31from bumble import device 32from bumble import hci 33from bumble import gatt 34from bumble import gatt_client 35 36 37# ----------------------------------------------------------------------------- 38# Logging 39# ----------------------------------------------------------------------------- 40logger = logging.getLogger(__name__) 41 42# ----------------------------------------------------------------------------- 43# Constants 44# ----------------------------------------------------------------------------- 45 46 47class AudioLocation(enum.IntFlag): 48 '''Bluetooth Assigned Numbers, Section 6.12.1 - Audio Location''' 49 50 # fmt: off 51 NOT_ALLOWED = 0x00000000 52 FRONT_LEFT = 0x00000001 53 FRONT_RIGHT = 0x00000002 54 FRONT_CENTER = 0x00000004 55 LOW_FREQUENCY_EFFECTS_1 = 0x00000008 56 BACK_LEFT = 0x00000010 57 BACK_RIGHT = 0x00000020 58 FRONT_LEFT_OF_CENTER = 0x00000040 59 FRONT_RIGHT_OF_CENTER = 0x00000080 60 
class AudioLocation(enum.IntFlag):
    '''Bluetooth Assigned Numbers, Section 6.12.1 - Audio Location'''

    # fmt: off
    NOT_ALLOWED             = 0x00000000
    FRONT_LEFT              = 0x00000001
    FRONT_RIGHT             = 0x00000002
    FRONT_CENTER            = 0x00000004
    LOW_FREQUENCY_EFFECTS_1 = 0x00000008
    BACK_LEFT               = 0x00000010
    BACK_RIGHT              = 0x00000020
    FRONT_LEFT_OF_CENTER    = 0x00000040
    FRONT_RIGHT_OF_CENTER   = 0x00000080
    BACK_CENTER             = 0x00000100
    LOW_FREQUENCY_EFFECTS_2 = 0x00000200
    SIDE_LEFT               = 0x00000400
    SIDE_RIGHT              = 0x00000800
    TOP_FRONT_LEFT          = 0x00001000
    TOP_FRONT_RIGHT         = 0x00002000
    TOP_FRONT_CENTER        = 0x00004000
    TOP_CENTER              = 0x00008000
    TOP_BACK_LEFT           = 0x00010000
    TOP_BACK_RIGHT          = 0x00020000
    TOP_SIDE_LEFT           = 0x00040000
    TOP_SIDE_RIGHT          = 0x00080000
    TOP_BACK_CENTER         = 0x00100000
    BOTTOM_FRONT_CENTER     = 0x00200000
    BOTTOM_FRONT_LEFT       = 0x00400000
    BOTTOM_FRONT_RIGHT      = 0x00800000
    FRONT_LEFT_WIDE         = 0x01000000
    FRONT_RIGHT_WIDE        = 0x02000000
    LEFT_SURROUND           = 0x04000000
    RIGHT_SURROUND          = 0x08000000
    # fmt: on

    @property
    def channel_count(self) -> int:
        '''Number of location bits that are set in this value.'''
        return bin(self.value).count('1')


class AudioInputType(enum.IntEnum):
    '''Bluetooth Assigned Numbers, Section 6.12.2 - Audio Input Type'''

    # fmt: off
    UNSPECIFIED = 0x00
    BLUETOOTH   = 0x01
    MICROPHONE  = 0x02
    ANALOG      = 0x03
    DIGITAL     = 0x04
    RADIO       = 0x05
    STREAMING   = 0x06
    AMBIENT     = 0x07
    # fmt: on


class ContextType(enum.IntFlag):
    '''Bluetooth Assigned Numbers, Section 6.12.3 - Context Type'''

    # fmt: off
    PROHIBITED       = 0x0000
    # Bit 0 was previously missing, so a peer advertising "Unspecified" had
    # no named member to compare against.
    UNSPECIFIED      = 0x0001
    CONVERSATIONAL   = 0x0002
    MEDIA            = 0x0004
    GAME             = 0x0008
    INSTRUCTIONAL    = 0x0010
    VOICE_ASSISTANTS = 0x0020
    LIVE             = 0x0040
    SOUND_EFFECTS    = 0x0080
    NOTIFICATIONS    = 0x0100
    RINGTONE         = 0x0200
    ALERTS           = 0x0400
    EMERGENCY_ALARM  = 0x0800
    # fmt: on
class SamplingFrequency(enum.IntEnum):
    '''Bluetooth Assigned Numbers, Section 6.12.5.1 - Sampling Frequency'''

    # fmt: off
    FREQ_8000    = 0x01
    FREQ_11025   = 0x02
    FREQ_16000   = 0x03
    FREQ_22050   = 0x04
    FREQ_24000   = 0x05
    FREQ_32000   = 0x06
    FREQ_44100   = 0x07
    FREQ_48000   = 0x08
    FREQ_88200   = 0x09
    FREQ_96000   = 0x0A
    FREQ_176400  = 0x0B
    FREQ_192000  = 0x0C
    FREQ_384000  = 0x0D
    # fmt: on

    @classmethod
    def from_hz(cls, frequency: int) -> SamplingFrequency:
        '''Return the member for a sampling rate given in Hz.

        Raises:
            KeyError: if no member exists for `frequency`.
        '''
        members_by_hz = {member.hz: member for member in SamplingFrequency}
        return members_by_hz[frequency]

    @property
    def hz(self) -> int:
        '''The sampling rate of this member, in Hz.'''
        # Every member is named 'FREQ_<rate in Hz>'.
        return int(self.name.split('_')[1])


class SupportedSamplingFrequency(enum.IntFlag):
    '''Bluetooth Assigned Numbers, Section 6.12.4.1 - Sample Frequency'''

    # Each flag is the bit whose index is the matching SamplingFrequency
    # value minus one.
    # fmt: off
    FREQ_8000    = 1 << (SamplingFrequency.FREQ_8000 - 1)
    FREQ_11025   = 1 << (SamplingFrequency.FREQ_11025 - 1)
    FREQ_16000   = 1 << (SamplingFrequency.FREQ_16000 - 1)
    FREQ_22050   = 1 << (SamplingFrequency.FREQ_22050 - 1)
    FREQ_24000   = 1 << (SamplingFrequency.FREQ_24000 - 1)
    FREQ_32000   = 1 << (SamplingFrequency.FREQ_32000 - 1)
    FREQ_44100   = 1 << (SamplingFrequency.FREQ_44100 - 1)
    FREQ_48000   = 1 << (SamplingFrequency.FREQ_48000 - 1)
    FREQ_88200   = 1 << (SamplingFrequency.FREQ_88200 - 1)
    FREQ_96000   = 1 << (SamplingFrequency.FREQ_96000 - 1)
    FREQ_176400  = 1 << (SamplingFrequency.FREQ_176400 - 1)
    FREQ_192000  = 1 << (SamplingFrequency.FREQ_192000 - 1)
    FREQ_384000  = 1 << (SamplingFrequency.FREQ_384000 - 1)
    # fmt: on

    @classmethod
    def from_hz(cls, frequencies: Sequence[int]) -> SupportedSamplingFrequency:
        '''Combine the flags for every sampling rate (in Hz) in `frequencies`.

        An empty sequence yields the zero flag value.

        Raises:
            KeyError: if any rate has no matching flag.
        '''
        # Every flag is named 'FREQ_<rate in Hz>'.
        flag_for_hz = {
            int(name.split('_')[1]): flag
            for name, flag in SupportedSamplingFrequency.__members__.items()
        }

        supported = cls(0)
        for frequency in frequencies:
            supported = supported | flag_for_hz[frequency]
        return supported
class FrameDuration(enum.IntEnum):
    '''Bluetooth Assigned Numbers, Section 6.12.5.2 - Frame Duration'''

    # fmt: off
    DURATION_7500_US  = 0x00
    DURATION_10000_US = 0x01
    # fmt: on

    @property
    def us(self) -> int:
        '''The frame duration of this member, in microseconds.'''
        return {
            FrameDuration.DURATION_7500_US: 7500,
            FrameDuration.DURATION_10000_US: 10000,
        }[self]


class SupportedFrameDuration(enum.IntFlag):
    '''Bluetooth Assigned Numbers, Section 6.12.4.2 - Frame Duration'''

    # Bits 0-1 report support, bits 2-3 are reserved, bits 4-5 report the
    # preferred duration. The PREFERRED flags used to carry the SUPPORTED
    # values, which made them aliases of the SUPPORTED members.
    # fmt: off
    DURATION_7500_US_SUPPORTED  = 0b00000001
    DURATION_10000_US_SUPPORTED = 0b00000010
    DURATION_7500_US_PREFERRED  = 0b00010000
    DURATION_10000_US_PREFERRED = 0b00100000
    # fmt: on


class AnnouncementType(enum.IntEnum):
    '''Basic Audio Profile, 3.5.3. Additional Audio Stream Control Service requirements'''

    # fmt: off
    GENERAL  = 0x00
    TARGETED = 0x01
    # fmt: on
class ASE_Operation:
    '''
    See Audio Stream Control Service - 5 ASE Control operations.

    Base class for the operations written to the ASE Control Point. Concrete
    operations are declared with the `ASE_Operation.subclass` decorator, which
    derives the opcode from the class name and registers the class so that
    `from_bytes` can instantiate it.
    '''

    # Registered operation classes, by opcode.
    classes: Dict[int, Type[ASE_Operation]] = {}
    op_code: int
    name: str
    # Field specification consumed by the hci.HCI_Object helpers; None for an
    # operation without a registered class.
    fields: Optional[Sequence[Any]] = None
    ase_id: List[int]

    class Opcode(enum.IntEnum):
        # fmt: off
        CONFIG_CODEC         = 0x01
        CONFIG_QOS           = 0x02
        ENABLE               = 0x03
        RECEIVER_START_READY = 0x04
        DISABLE              = 0x05
        RECEIVER_STOP_READY  = 0x06
        UPDATE_METADATA      = 0x07
        RELEASE              = 0x08
        # fmt: on

    @staticmethod
    def from_bytes(pdu: bytes) -> ASE_Operation:
        '''Parse a control point PDU into an operation object.

        The first byte selects the operation class. When no class is
        registered for the opcode, a plain `ASE_Operation` holding the raw
        PDU is returned.

        Raises:
            IndexError: if `pdu` is empty.
            ValueError: if no class is registered for the opcode and the
                opcode is not a member of `Opcode`.
        '''
        op_code = pdu[0]

        cls = ASE_Operation.classes.get(op_code)
        if cls is None:
            instance = ASE_Operation(pdu)
            instance.name = ASE_Operation.Opcode(op_code).name
            instance.op_code = op_code
            return instance

        # Bypass the subclass constructor: the object is populated from the
        # PDU rather than from keyword arguments.
        self = cls.__new__(cls)
        ASE_Operation.__init__(self, pdu)
        if self.fields is not None:
            self.init_from_bytes(pdu, 1)
        return self

    @staticmethod
    def subclass(fields):
        '''Class decorator that registers an operation class.

        The decorated class must be named `ASE_<Operation>`, where
        `<Operation>` upper-cased is a member name of `Opcode`.

        Args:
            fields: field specification stored as the class' `fields`.

        Raises:
            KeyError: if the class name does not map to an `Opcode` member.
        '''

        def inner(cls: Type[ASE_Operation]):
            class_name = cls.__name__
            try:
                # Strip the 'ASE_' prefix to get the opcode name.
                operation = ASE_Operation.Opcode[class_name[4:].upper()]
            except KeyError:
                # Report the class name: `cls.name` is not set yet here, so
                # using it would raise AttributeError and hide this error.
                raise KeyError(
                    f'PDU name {class_name} not found in ASE_Operation.Opcode'
                ) from None
            cls.name = operation.name
            cls.op_code = operation
            cls.fields = fields

            # Register a factory for this class
            ASE_Operation.classes[cls.op_code] = cls

            return cls

        return inner

    def __init__(self, pdu: Optional[bytes] = None, **kwargs) -> None:
        '''Create an operation from a raw PDU or from field values.

        Args:
            pdu: raw PDU including the opcode byte. When None, the PDU is
                serialized from `kwargs` using the class' `fields`.
            **kwargs: field values, also set as attributes when the class
                declares `fields`.
        '''
        if self.fields is not None and kwargs:
            hci.HCI_Object.init_from_fields(self, self.fields, kwargs)
        if pdu is None:
            pdu = bytes([self.op_code]) + hci.HCI_Object.dict_to_bytes(
                kwargs, self.fields
            )
        self.pdu = pdu

    def init_from_bytes(self, pdu: bytes, offset: int):
        '''Populate the declared fields from `pdu`, starting at `offset`.'''
        return hci.HCI_Object.init_from_bytes(self, pdu, offset, self.fields)

    def __bytes__(self) -> bytes:
        return self.pdu

    def __str__(self) -> str:
        result = f'{colors.color(self.name, "yellow")} '
        if fields := getattr(self, 'fields', None):
            result += ':\n' + hci.HCI_Object.format_fields(self.__dict__, fields, '  ')
        else:
            # No field specification: show the raw payload, if there is any.
            if len(self.pdu) > 1:
                result += f': {self.pdu.hex()}'
        return result
# The field lists below are nested one level deep and the attributes they
# produce are annotated as lists: presumably one entry per ASE addressed by
# the operation (confirm against hci.HCI_Object).
@ASE_Operation.subclass(
    [
        [
            ('ase_id', 1),
            ('target_latency', 1),
            ('target_phy', 1),
            ('codec_id', hci.CodingFormat.parse_from_bytes),
            ('codec_specific_configuration', 'v'),
        ],
    ]
)
class ASE_Config_Codec(ASE_Operation):
    '''
    See Audio Stream Control Service 5.1 - Config Codec Operation
    '''

    target_latency: List[int]
    target_phy: List[int]
    codec_id: List[hci.CodingFormat]
    codec_specific_configuration: List[bytes]


@ASE_Operation.subclass(
    [
        [
            ('ase_id', 1),
            ('cig_id', 1),
            ('cis_id', 1),
            ('sdu_interval', 3),
            ('framing', 1),
            ('phy', 1),
            ('max_sdu', 2),
            ('retransmission_number', 1),
            ('max_transport_latency', 2),
            ('presentation_delay', 3),
        ],
    ]
)
class ASE_Config_QOS(ASE_Operation):
    '''
    See Audio Stream Control Service 5.2 - Config Qos Operation
    '''

    cig_id: List[int]
    cis_id: List[int]
    sdu_interval: List[int]
    framing: List[int]
    phy: List[int]
    max_sdu: List[int]
    retransmission_number: List[int]
    max_transport_latency: List[int]
    presentation_delay: List[int]


@ASE_Operation.subclass([[('ase_id', 1), ('metadata', 'v')]])
class ASE_Enable(ASE_Operation):
    '''
    See Audio Stream Control Service 5.3 - Enable Operation
    '''

    # Same field layout as ASE_Update_Metadata, hence the same annotation.
    metadata: List[bytes]


@ASE_Operation.subclass([[('ase_id', 1)]])
class ASE_Receiver_Start_Ready(ASE_Operation):
    '''
    See Audio Stream Control Service 5.4 - Receiver Start Ready Operation
    '''


@ASE_Operation.subclass([[('ase_id', 1)]])
class ASE_Disable(ASE_Operation):
    '''
    See Audio Stream Control Service 5.5 - Disable Operation
    '''


@ASE_Operation.subclass([[('ase_id', 1)]])
class ASE_Receiver_Stop_Ready(ASE_Operation):
    '''
    See Audio Stream Control Service 5.6 - Receiver Stop Ready Operation
    '''


@ASE_Operation.subclass([[('ase_id', 1), ('metadata', 'v')]])
class ASE_Update_Metadata(ASE_Operation):
    '''
    See Audio Stream Control Service 5.7 - Update Metadata Operation
    '''

    metadata: List[bytes]


@ASE_Operation.subclass([[('ase_id', 1)]])
class ASE_Release(ASE_Operation):
    '''
    See Audio Stream Control Service 5.8 - Release Operation
    '''


class AseResponseCode(enum.IntEnum):
    '''Response codes returned for an ASE control operation.'''

    # fmt: off
    SUCCESS                                     = 0x00
    UNSUPPORTED_OPCODE                          = 0x01
    INVALID_LENGTH                              = 0x02
    INVALID_ASE_ID                              = 0x03
    INVALID_ASE_STATE_MACHINE_TRANSITION        = 0x04
    INVALID_ASE_DIRECTION                       = 0x05
    UNSUPPORTED_AUDIO_CAPABILITIES              = 0x06
    UNSUPPORTED_CONFIGURATION_PARAMETER_VALUE   = 0x07
    REJECTED_CONFIGURATION_PARAMETER_VALUE      = 0x08
    INVALID_CONFIGURATION_PARAMETER_VALUE       = 0x09
    UNSUPPORTED_METADATA                        = 0x0A
    REJECTED_METADATA                           = 0x0B
    INVALID_METADATA                            = 0x0C
    INSUFFICIENT_RESOURCES                      = 0x0D
    UNSPECIFIED_ERROR                           = 0x0E


class AseReasonCode(enum.IntEnum):
    '''Reason codes that accompany an AseResponseCode.'''

    # fmt: off
    NONE                            = 0x00
    CODEC_ID                        = 0x01
    CODEC_SPECIFIC_CONFIGURATION    = 0x02
    SDU_INTERVAL                    = 0x03
    FRAMING                         = 0x04
    PHY                             = 0x05
    MAXIMUM_SDU_SIZE                = 0x06
    RETRANSMISSION_NUMBER           = 0x07
    MAX_TRANSPORT_LATENCY           = 0x08
    PRESENTATION_DELAY              = 0x09
    INVALID_ASE_CIS_MAPPING         = 0x0A


class AudioRole(enum.IntEnum):
    '''Audio role of an ASE.'''

    # The role values reuse the HCI ISO data path direction constants.
    SINK = hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.CONTROLLER_TO_HOST
    SOURCE = hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.HOST_TO_CONTROLLER
@dataclasses.dataclass
class UnicastServerAdvertisingData:
    """Advertising Data for ASCS."""

    announcement_type: AnnouncementType = AnnouncementType.TARGETED
    available_audio_contexts: ContextType = ContextType.MEDIA
    metadata: bytes = b''

    def __bytes__(self) -> bytes:
        """Serialize as an advertising data structure holding ASCS service data."""
        # Service data: service UUID, announcement type, available audio
        # contexts, then the length-prefixed metadata.
        header = struct.pack(
            '<2sBIB',
            gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE.to_bytes(),
            self.announcement_type,
            self.available_audio_contexts,
            len(self.metadata),
        )
        service_data = header + self.metadata
        advertising_data = core.AdvertisingData(
            [(core.AdvertisingData.SERVICE_DATA_16_BIT_UUID, service_data)]
        )
        return bytes(advertising_data)
def bits_to_channel_counts(data: int) -> List[int]:
    '''Expand a channel count bitmask into the list of counts it contains.

    Bit 0 stands for a count of 1, bit 1 for a count of 2, and so on.
    '''
    counts: List[int] = []
    count = 1
    while data != 0:
        if data & 1:
            counts.append(count)
        data >>= 1
        count += 1
    return counts


def channel_counts_to_bits(counts: Sequence[int]) -> int:
    '''Build the channel count bitmask for `counts` (duplicates are ignored).'''
    bits = 0
    for count in counts:
        bits |= 1 << (count - 1)
    return bits


@dataclasses.dataclass
class CodecSpecificCapabilities:
    '''See:
    * Bluetooth Assigned Numbers, 6.12.4 - Codec Specific Capabilities LTV Structures
    * Basic Audio Profile, 4.3.1 - Codec_Specific_Capabilities LTV requirements
    '''

    class Type(enum.IntEnum):
        # fmt: off
        SAMPLING_FREQUENCY   = 0x01
        FRAME_DURATION       = 0x02
        AUDIO_CHANNEL_COUNT  = 0x03
        OCTETS_PER_FRAME     = 0x04
        CODEC_FRAMES_PER_SDU = 0x05

    supported_sampling_frequencies: SupportedSamplingFrequency
    supported_frame_durations: SupportedFrameDuration
    supported_audio_channel_count: Sequence[int]
    min_octets_per_codec_frame: int
    max_octets_per_codec_frame: int
    supported_max_codec_frames_per_sdu: int

    @classmethod
    def from_bytes(cls, data: bytes) -> CodecSpecificCapabilities:
        '''Parse a sequence of LTV (length, type, value) structures.

        The channel count and the maximum codec frames per SDU have default
        values. The other fields are mandatory: when one is absent from
        `data`, building the result raises an error.
        '''
        ltv = CodecSpecificCapabilities.Type

        # Allowed default values.
        supported_audio_channel_count = [1]
        supported_max_codec_frames_per_sdu = 1

        cursor = 0
        while cursor < len(data):
            length, ltv_type = struct.unpack_from('BB', data, cursor)
            # The length covers the type byte and the value.
            value_start = cursor + 2
            value_end = value_start + length - 1
            value = int.from_bytes(data[value_start:value_end], 'little')
            cursor = value_end

            if ltv_type == ltv.SAMPLING_FREQUENCY:
                supported_sampling_frequencies = SupportedSamplingFrequency(value)
            elif ltv_type == ltv.FRAME_DURATION:
                supported_frame_durations = SupportedFrameDuration(value)
            elif ltv_type == ltv.AUDIO_CHANNEL_COUNT:
                supported_audio_channel_count = bits_to_channel_counts(value)
            elif ltv_type == ltv.OCTETS_PER_FRAME:
                # Minimum in the low 16 bits, maximum in the bits above.
                min_octets_per_codec_frame = value & 0xFFFF
                max_octets_per_codec_frame = value >> 16
            elif ltv_type == ltv.CODEC_FRAMES_PER_SDU:
                supported_max_codec_frames_per_sdu = value

        # It is expected here that if some fields are missing, an error should be raised.
        return CodecSpecificCapabilities(
            supported_sampling_frequencies=supported_sampling_frequencies,
            supported_frame_durations=supported_frame_durations,
            supported_audio_channel_count=supported_audio_channel_count,
            min_octets_per_codec_frame=min_octets_per_codec_frame,
            max_octets_per_codec_frame=max_octets_per_codec_frame,
            supported_max_codec_frames_per_sdu=supported_max_codec_frames_per_sdu,
        )

    def __bytes__(self) -> bytes:
        '''Serialize all five capabilities, one LTV structure each.'''
        ltv = CodecSpecificCapabilities.Type
        structures = [
            struct.pack(
                '<BBH',
                3,
                ltv.SAMPLING_FREQUENCY,
                self.supported_sampling_frequencies,
            ),
            struct.pack(
                '<BBB',
                2,
                ltv.FRAME_DURATION,
                self.supported_frame_durations,
            ),
            struct.pack(
                '<BBB',
                2,
                ltv.AUDIO_CHANNEL_COUNT,
                channel_counts_to_bits(self.supported_audio_channel_count),
            ),
            struct.pack(
                '<BBHH',
                5,
                ltv.OCTETS_PER_FRAME,
                self.min_octets_per_codec_frame,
                self.max_octets_per_codec_frame,
            ),
            struct.pack(
                '<BBB',
                2,
                ltv.CODEC_FRAMES_PER_SDU,
                self.supported_max_codec_frames_per_sdu,
            ),
        ]
        return b''.join(structures)
@dataclasses.dataclass
class CodecSpecificConfiguration:
    '''See:
    * Bluetooth Assigned Numbers, 6.12.5 - Codec Specific Configuration LTV Structures
    * Basic Audio Profile, 4.3.2 - Codec_Specific_Capabilities LTV requirements
    '''

    class Type(enum.IntEnum):
        # fmt: off
        SAMPLING_FREQUENCY       = 0x01
        FRAME_DURATION           = 0x02
        AUDIO_CHANNEL_ALLOCATION = 0x03
        OCTETS_PER_FRAME         = 0x04
        CODEC_FRAMES_PER_SDU     = 0x05

    sampling_frequency: SamplingFrequency
    frame_duration: FrameDuration
    audio_channel_allocation: AudioLocation
    octets_per_codec_frame: int
    codec_frames_per_sdu: int

    @classmethod
    def from_bytes(cls, data: bytes) -> CodecSpecificConfiguration:
        '''Parse a sequence of LTV (length, type, value) structures.

        The channel allocation and the codec frames per SDU have default
        values. The other fields are mandatory: when one is absent from
        `data`, building the result raises an error.
        '''
        ltv = CodecSpecificConfiguration.Type

        # Allowed default values.
        audio_channel_allocation = AudioLocation.NOT_ALLOWED
        codec_frames_per_sdu = 1

        cursor = 0
        while cursor < len(data):
            length, ltv_type = struct.unpack_from('BB', data, cursor)
            # The length covers the type byte and the value.
            value_start = cursor + 2
            value_end = value_start + length - 1
            value = int.from_bytes(data[value_start:value_end], 'little')
            cursor = value_end

            if ltv_type == ltv.SAMPLING_FREQUENCY:
                sampling_frequency = SamplingFrequency(value)
            elif ltv_type == ltv.FRAME_DURATION:
                frame_duration = FrameDuration(value)
            elif ltv_type == ltv.AUDIO_CHANNEL_ALLOCATION:
                audio_channel_allocation = AudioLocation(value)
            elif ltv_type == ltv.OCTETS_PER_FRAME:
                octets_per_codec_frame = value
            elif ltv_type == ltv.CODEC_FRAMES_PER_SDU:
                codec_frames_per_sdu = value

        # It is expected here that if some fields are missing, an error should be raised.
        return CodecSpecificConfiguration(
            sampling_frequency=sampling_frequency,
            frame_duration=frame_duration,
            audio_channel_allocation=audio_channel_allocation,
            octets_per_codec_frame=octets_per_codec_frame,
            codec_frames_per_sdu=codec_frames_per_sdu,
        )

    def __bytes__(self) -> bytes:
        '''Serialize all five settings, one LTV structure each.'''
        ltv = CodecSpecificConfiguration.Type
        structures = [
            struct.pack('<BBB', 2, ltv.SAMPLING_FREQUENCY, self.sampling_frequency),
            struct.pack('<BBB', 2, ltv.FRAME_DURATION, self.frame_duration),
            struct.pack(
                '<BBI',
                5,
                ltv.AUDIO_CHANNEL_ALLOCATION,
                self.audio_channel_allocation,
            ),
            struct.pack(
                '<BBH',
                3,
                ltv.OCTETS_PER_FRAME,
                self.octets_per_codec_frame,
            ),
            struct.pack(
                '<BBB',
                2,
                ltv.CODEC_FRAMES_PER_SDU,
                self.codec_frames_per_sdu,
            ),
        ]
        return b''.join(structures)
@dataclasses.dataclass
class PacRecord:
    '''One record of a Sink PAC or Source PAC characteristic value.

    Serialized as: coding format, Codec_Specific_Capabilities length (1 byte),
    Codec_Specific_Capabilities, Metadata length (1 byte), Metadata.
    '''

    coding_format: hci.CodingFormat
    # Kept as raw bytes for vendor-specific codecs, parsed otherwise.
    codec_specific_capabilities: Union[CodecSpecificCapabilities, bytes]
    # TODO: Parse Metadata
    metadata: bytes = b''

    @classmethod
    def from_bytes(cls, data: bytes) -> PacRecord:
        '''Parse a single PAC record located at the start of `data`.

        Raises:
            IndexError: if `data` ends before one of the length bytes.
        '''
        offset, coding_format = hci.CodingFormat.parse_from_bytes(data, 0)

        codec_specific_capabilities_size = data[offset]
        offset += 1
        codec_specific_capabilities_bytes = data[
            offset : offset + codec_specific_capabilities_size
        ]
        offset += codec_specific_capabilities_size

        metadata_size = data[offset]
        # Step over the length byte: without this, the metadata started with
        # its own length byte and was missing its last byte.
        offset += 1
        metadata = data[offset : offset + metadata_size]

        codec_specific_capabilities: Union[CodecSpecificCapabilities, bytes]
        if coding_format.codec_id == hci.CodecID.VENDOR_SPECIFIC:
            codec_specific_capabilities = codec_specific_capabilities_bytes
        else:
            codec_specific_capabilities = CodecSpecificCapabilities.from_bytes(
                codec_specific_capabilities_bytes
            )

        return PacRecord(
            coding_format=coding_format,
            codec_specific_capabilities=codec_specific_capabilities,
            metadata=metadata,
        )

    def __bytes__(self) -> bytes:
        capabilities_bytes = bytes(self.codec_specific_capabilities)
        return (
            bytes(self.coding_format)
            + bytes([len(capabilities_bytes)])
            + capabilities_bytes
            + bytes([len(self.metadata)])
            + self.metadata
        )
class PublishedAudioCapabilitiesService(gatt.TemplateService):
    '''GATT service exposing the audio capabilities and contexts of a device.'''

    UUID = gatt.GATT_PUBLISHED_AUDIO_CAPABILITIES_SERVICE

    # The optional characteristics default to None so that they can always be
    # read, including when the characteristic is not part of the service.
    sink_pac: Optional[gatt.Characteristic] = None
    sink_audio_locations: Optional[gatt.Characteristic] = None
    source_pac: Optional[gatt.Characteristic] = None
    source_audio_locations: Optional[gatt.Characteristic] = None
    available_audio_contexts: gatt.Characteristic
    supported_audio_contexts: gatt.Characteristic

    def __init__(
        self,
        supported_source_context: ContextType,
        supported_sink_context: ContextType,
        available_source_context: ContextType,
        available_sink_context: ContextType,
        sink_pac: Sequence[PacRecord] = (),
        sink_audio_locations: Optional[AudioLocation] = None,
        source_pac: Sequence[PacRecord] = (),
        source_audio_locations: Optional[AudioLocation] = None,
    ) -> None:
        '''Build the service and its characteristics.

        Args:
            supported_source_context: supported contexts as an audio source.
            supported_sink_context: supported contexts as an audio sink.
            available_source_context: available contexts as an audio source.
            available_sink_context: available contexts as an audio sink.
            sink_pac: sink PAC records; no Sink PAC characteristic is created
                when empty.
            sink_audio_locations: sink locations; no Sink Audio Locations
                characteristic is created when None.
            source_pac: source PAC records; no Source PAC characteristic is
                created when empty.
            source_audio_locations: source locations; no Source Audio
                Locations characteristic is created when None.
        '''

        def read_only(uuid, value: bytes) -> gatt.Characteristic:
            return gatt.Characteristic(
                uuid=uuid,
                properties=gatt.Characteristic.Properties.READ,
                permissions=gatt.Characteristic.Permissions.READABLE,
                value=value,
            )

        characteristics = []

        # In both context characteristics the sink value is packed first.
        self.supported_audio_contexts = read_only(
            gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC,
            struct.pack('<HH', supported_sink_context, supported_source_context),
        )
        characteristics.append(self.supported_audio_contexts)

        self.available_audio_contexts = gatt.Characteristic(
            uuid=gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC,
            properties=gatt.Characteristic.Properties.READ
            | gatt.Characteristic.Properties.NOTIFY,
            permissions=gatt.Characteristic.Permissions.READABLE,
            value=struct.pack('<HH', available_sink_context, available_source_context),
        )
        characteristics.append(self.available_audio_contexts)

        if sink_pac:
            # A PAC value is the record count followed by the records.
            self.sink_pac = read_only(
                gatt.GATT_SINK_PAC_CHARACTERISTIC,
                bytes([len(sink_pac)]) + b''.join(map(bytes, sink_pac)),
            )
            characteristics.append(self.sink_pac)

        if sink_audio_locations is not None:
            self.sink_audio_locations = read_only(
                gatt.GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC,
                struct.pack('<I', sink_audio_locations),
            )
            characteristics.append(self.sink_audio_locations)

        if source_pac:
            self.source_pac = read_only(
                gatt.GATT_SOURCE_PAC_CHARACTERISTIC,
                bytes([len(source_pac)]) + b''.join(map(bytes, source_pac)),
            )
            characteristics.append(self.source_pac)

        if source_audio_locations is not None:
            self.source_audio_locations = read_only(
                gatt.GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC,
                struct.pack('<I', source_audio_locations),
            )
            characteristics.append(self.source_audio_locations)

        super().__init__(characteristics)
class AseStateMachine(gatt.Characteristic):
    '''State machine of one ASE, exposed as a Sink ASE or Source ASE characteristic.

    The `on_<operation>` methods apply a control operation to the ASE and
    return a `(response code, reason code)` pair. The characteristic value is
    computed from the current state by the `value` property.
    '''

    class State(enum.IntEnum):
        # fmt: off
        IDLE             = 0x00
        CODEC_CONFIGURED = 0x01
        QOS_CONFIGURED   = 0x02
        ENABLING         = 0x03
        STREAMING        = 0x04
        DISABLING        = 0x05
        RELEASING        = 0x06

    # CIS currently associated with this ASE, if any.
    cis_link: Optional[device.CisLink] = None

    # Additional parameters in CODEC_CONFIGURED State
    preferred_framing = 0  # Unframed PDU supported
    preferred_phy = 0
    preferred_retransmission_number = 13
    preferred_max_transport_latency = 100
    supported_presentation_delay_min = 0
    supported_presentation_delay_max = 0
    preferred_presentation_delay_min = 0
    preferred_presentation_delay_max = 0
    codec_id = hci.CodingFormat(hci.CodecID.LC3)
    codec_specific_configuration: Union[CodecSpecificConfiguration, bytes] = b''

    # Additional parameters in QOS_CONFIGURED State
    cig_id = 0
    cis_id = 0
    sdu_interval = 0
    framing = 0
    phy = 0
    max_sdu = 0
    retransmission_number = 0
    max_transport_latency = 0
    presentation_delay = 0

    # Additional parameters in ENABLING, STREAMING, DISABLING State
    # TODO: Parse this
    metadata = b''

    def __init__(
        self,
        role: AudioRole,
        ase_id: int,
        service: AudioStreamControlService,
    ) -> None:
        '''Create an ASE in the IDLE state and listen for CIS events.

        Args:
            role: selects the Sink ASE or the Source ASE characteristic UUID.
            ase_id: identifier reported in the characteristic value.
            service: owning service; its `device` is used for CIS events,
                HCI commands and notifications.
        '''
        self.service = service
        self.ase_id = ase_id
        self._state = AseStateMachine.State.IDLE
        self.role = role

        uuid = (
            gatt.GATT_SINK_ASE_CHARACTERISTIC
            if role == AudioRole.SINK
            else gatt.GATT_SOURCE_ASE_CHARACTERISTIC
        )
        super().__init__(
            uuid=uuid,
            properties=gatt.Characteristic.Properties.READ
            | gatt.Characteristic.Properties.NOTIFY,
            permissions=gatt.Characteristic.Permissions.READABLE,
            value=gatt.CharacteristicValue(read=self.on_read),
        )

        self.service.device.on('cis_request', self.on_cis_request)
        self.service.device.on('cis_establishment', self.on_cis_establishment)

    def on_cis_request(
        self,
        acl_connection: device.Connection,
        cis_handle: int,
        cig_id: int,
        cis_id: int,
    ) -> None:
        '''Accept a CIS request that matches this ASE while it is ENABLING.'''
        if (
            cig_id == self.cig_id
            and cis_id == self.cis_id
            and self.state == self.State.ENABLING
        ):
            acl_connection.abort_on(
                'flush', self.service.device.accept_cis_request(cis_handle)
            )

    def on_cis_establishment(self, cis_link: device.CisLink) -> None:
        '''Set up the ISO data path for a CIS that matches this ASE.'''
        if (
            cis_link.cig_id == self.cig_id
            and cis_link.cis_id == self.cis_id
            and self.state == self.State.ENABLING
        ):
            cis_link.on('disconnection', self.on_cis_disconnection)

            async def post_cis_established():
                await self.service.device.send_command(
                    hci.HCI_LE_Setup_ISO_Data_Path_Command(
                        connection_handle=cis_link.handle,
                        data_path_direction=self.role,
                        data_path_id=0x00,  # Fixed HCI
                        codec_id=hci.CodingFormat(hci.CodecID.TRANSPARENT),
                        controller_delay=0,
                        codec_configuration=b'',
                    )
                )
                # Only a sink starts streaming here; a source goes to
                # STREAMING in on_receiver_start_ready.
                if self.role == AudioRole.SINK:
                    self.state = self.State.STREAMING
                await self.service.device.notify_subscribers(self, self.value)

            cis_link.acl_connection.abort_on('flush', post_cis_established())
            self.cis_link = cis_link

    def on_cis_disconnection(self, _reason) -> None:
        '''Forget the CIS once it is disconnected.'''
        self.cis_link = None

    def on_config_codec(
        self,
        target_latency: int,
        target_phy: int,
        codec_id: hci.CodingFormat,
        codec_specific_configuration: bytes,
    ) -> Tuple[AseResponseCode, AseReasonCode]:
        '''Apply a Config Codec operation.

        Allowed in the IDLE, CODEC_CONFIGURED and QOS_CONFIGURED states.
        '''
        if self.state not in (
            self.State.IDLE,
            self.State.CODEC_CONFIGURED,
            self.State.QOS_CONFIGURED,
        ):
            return (
                AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
                AseReasonCode.NONE,
            )

        # NOTE(review): the target latency is stored in max_transport_latency
        # and the target PHY in phy - confirm that this is intended.
        self.max_transport_latency = target_latency
        self.phy = target_phy
        self.codec_id = codec_id
        if codec_id.codec_id == hci.CodecID.VENDOR_SPECIFIC:
            # Vendor-specific configurations are kept as raw bytes.
            self.codec_specific_configuration = codec_specific_configuration
        else:
            self.codec_specific_configuration = CodecSpecificConfiguration.from_bytes(
                codec_specific_configuration
            )

        self.state = self.State.CODEC_CONFIGURED

        return (AseResponseCode.SUCCESS, AseReasonCode.NONE)

    def on_config_qos(
        self,
        cig_id: int,
        cis_id: int,
        sdu_interval: int,
        framing: int,
        phy: int,
        max_sdu: int,
        retransmission_number: int,
        max_transport_latency: int,
        presentation_delay: int,
    ) -> Tuple[AseResponseCode, AseReasonCode]:
        '''Apply a Config QoS operation.

        Allowed in the CODEC_CONFIGURED and QOS_CONFIGURED states.
        '''
        if self.state not in (
            AseStateMachine.State.CODEC_CONFIGURED,
            AseStateMachine.State.QOS_CONFIGURED,
        ):
            return (
                AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
                AseReasonCode.NONE,
            )

        self.cig_id = cig_id
        self.cis_id = cis_id
        self.sdu_interval = sdu_interval
        self.framing = framing
        self.phy = phy
        self.max_sdu = max_sdu
        self.retransmission_number = retransmission_number
        self.max_transport_latency = max_transport_latency
        self.presentation_delay = presentation_delay

        self.state = self.State.QOS_CONFIGURED

        return (AseResponseCode.SUCCESS, AseReasonCode.NONE)

    def on_enable(self, metadata: bytes) -> Tuple[AseResponseCode, AseReasonCode]:
        '''Apply an Enable operation. Allowed in the QOS_CONFIGURED state.'''
        if self.state != AseStateMachine.State.QOS_CONFIGURED:
            return (
                AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
                AseReasonCode.NONE,
            )

        self.metadata = metadata
        self.state = self.State.ENABLING

        return (AseResponseCode.SUCCESS, AseReasonCode.NONE)

    def on_receiver_start_ready(self) -> Tuple[AseResponseCode, AseReasonCode]:
        '''Apply a Receiver Start Ready operation. Allowed in the ENABLING state.'''
        if self.state != AseStateMachine.State.ENABLING:
            return (
                AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
                AseReasonCode.NONE,
            )
        self.state = self.State.STREAMING
        return (AseResponseCode.SUCCESS, AseReasonCode.NONE)

    def on_disable(self) -> Tuple[AseResponseCode, AseReasonCode]:
        '''Apply a Disable operation.

        Allowed in the ENABLING and STREAMING states. A sink goes back to
        QOS_CONFIGURED, any other role goes to DISABLING.
        '''
        if self.state not in (
            AseStateMachine.State.ENABLING,
            AseStateMachine.State.STREAMING,
        ):
            return (
                AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
                AseReasonCode.NONE,
            )
        if self.role == AudioRole.SINK:
            self.state = self.State.QOS_CONFIGURED
        else:
            self.state = self.State.DISABLING
        return (AseResponseCode.SUCCESS, AseReasonCode.NONE)

    def on_receiver_stop_ready(self) -> Tuple[AseResponseCode, AseReasonCode]:
        '''Apply a Receiver Stop Ready operation.

        Allowed for a source ASE in the DISABLING state.
        '''
        if (
            self.role != AudioRole.SOURCE
            or self.state != AseStateMachine.State.DISABLING
        ):
            return (
                AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
                AseReasonCode.NONE,
            )
        self.state = self.State.QOS_CONFIGURED
        return (AseResponseCode.SUCCESS, AseReasonCode.NONE)

    def on_update_metadata(
        self, metadata: bytes
    ) -> Tuple[AseResponseCode, AseReasonCode]:
        '''Apply an Update Metadata operation.

        Allowed in the ENABLING and STREAMING states.
        '''
        if self.state not in (
            AseStateMachine.State.ENABLING,
            AseStateMachine.State.STREAMING,
        ):
            return (
                AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
                AseReasonCode.NONE,
            )
        self.metadata = metadata
        return (AseResponseCode.SUCCESS, AseReasonCode.NONE)

    def on_release(self) -> Tuple[AseResponseCode, AseReasonCode]:
        '''Apply a Release operation. Allowed in every state except IDLE.

        The ASE goes to RELEASING right away. A background task then removes
        the ISO data path if a CIS is associated with the ASE, moves the ASE
        to IDLE and notifies subscribers.
        '''
        if self.state == AseStateMachine.State.IDLE:
            return (
                AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
                AseReasonCode.NONE,
            )
        self.state = self.State.RELEASING

        async def remove_cis_async():
            # No CIS exists when the ASE is released before one was
            # established, or after it disconnected. Dereferencing cis_link
            # unconditionally made this task fail and left the ASE in
            # RELEASING.
            cis_link = self.cis_link
            if cis_link is not None:
                await self.service.device.send_command(
                    hci.HCI_LE_Remove_ISO_Data_Path_Command(
                        connection_handle=cis_link.handle,
                        data_path_direction=self.role,
                    )
                )
            self.state = self.State.IDLE
            await self.service.device.notify_subscribers(self, self.value)

        self.service.device.abort_on('flush', remove_cis_async())
        return (AseResponseCode.SUCCESS, AseReasonCode.NONE)

    @property
    def state(self) -> State:
        return self._state

    @state.setter
    def state(self, new_state: State) -> None:
        logger.debug(f'{self} state change -> {colors.color(new_state.name, "cyan")}')
        self._state = new_state
        self.emit('state_change')

    @property
    def value(self):
        '''Returns ASE_ID, ASE_STATE, and ASE Additional Parameters.'''

        if self.state == self.State.CODEC_CONFIGURED:
            codec_specific_configuration_bytes = bytes(
                self.codec_specific_configuration
            )
            additional_parameters = (
                struct.pack(
                    '<BBBH',
                    self.preferred_framing,
                    self.preferred_phy,
                    self.preferred_retransmission_number,
                    self.preferred_max_transport_latency,
                )
                # The presentation delays are 3-byte values, which struct
                # cannot pack.
                + self.supported_presentation_delay_min.to_bytes(3, 'little')
                + self.supported_presentation_delay_max.to_bytes(3, 'little')
                + self.preferred_presentation_delay_min.to_bytes(3, 'little')
                + self.preferred_presentation_delay_max.to_bytes(3, 'little')
                + bytes(self.codec_id)
                + bytes([len(codec_specific_configuration_bytes)])
                + codec_specific_configuration_bytes
            )
        elif self.state == self.State.QOS_CONFIGURED:
            additional_parameters = (
                bytes([self.cig_id, self.cis_id])
                + self.sdu_interval.to_bytes(3, 'little')
                + struct.pack(
                    '<BBHBH',
                    self.framing,
                    self.phy,
                    self.max_sdu,
                    self.retransmission_number,
                    self.max_transport_latency,
                )
                + self.presentation_delay.to_bytes(3, 'little')
            )
        elif self.state in (
            self.State.ENABLING,
            self.State.STREAMING,
            self.State.DISABLING,
        ):
            additional_parameters = (
                bytes([self.cig_id, self.cis_id, len(self.metadata)]) + self.metadata
            )
        else:
            # IDLE and RELEASING carry no additional parameters.
            additional_parameters = b''

        return bytes([self.ase_id, self.state]) + additional_parameters

    @value.setter
    def value(self, _new_value):
        # Readonly. Do nothing in the setter.
        pass

    def on_read(self, _: Optional[device.Connection]) -> bytes:
        return self.value

    def __str__(self) -> str:
        return (
            f'AseStateMachine(id={self.ase_id}, role={self.role.name} '
            f'state={self._state.name})'
        )
self.State.ENABLING, 1121 self.State.STREAMING, 1122 self.State.DISABLING, 1123 ): 1124 additional_parameters = ( 1125 bytes([self.cig_id, self.cis_id, len(self.metadata)]) + self.metadata 1126 ) 1127 else: 1128 additional_parameters = b'' 1129 1130 return bytes([self.ase_id, self.state]) + additional_parameters 1131 1132 @value.setter 1133 def value(self, _new_value): 1134 # Readonly. Do nothing in the setter. 1135 pass 1136 1137 def on_read(self, _: Optional[device.Connection]) -> bytes: 1138 return self.value 1139 1140 def __str__(self) -> str: 1141 return ( 1142 f'AseStateMachine(id={self.ase_id}, role={self.role.name} ' 1143 f'state={self._state.name})' 1144 ) 1145 1146 1147class AudioStreamControlService(gatt.TemplateService): 1148 UUID = gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE 1149 1150 ase_state_machines: Dict[int, AseStateMachine] 1151 ase_control_point: gatt.Characteristic 1152 _active_client: Optional[device.Connection] = None 1153 1154 def __init__( 1155 self, 1156 device: device.Device, 1157 source_ase_id: Sequence[int] = [], 1158 sink_ase_id: Sequence[int] = [], 1159 ) -> None: 1160 self.device = device 1161 self.ase_state_machines = { 1162 **{ 1163 id: AseStateMachine(role=AudioRole.SINK, ase_id=id, service=self) 1164 for id in sink_ase_id 1165 }, 1166 **{ 1167 id: AseStateMachine(role=AudioRole.SOURCE, ase_id=id, service=self) 1168 for id in source_ase_id 1169 }, 1170 } # ASE state machines, by ASE ID 1171 1172 self.ase_control_point = gatt.Characteristic( 1173 uuid=gatt.GATT_ASE_CONTROL_POINT_CHARACTERISTIC, 1174 properties=gatt.Characteristic.Properties.WRITE 1175 | gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE 1176 | gatt.Characteristic.Properties.NOTIFY, 1177 permissions=gatt.Characteristic.Permissions.WRITEABLE, 1178 value=gatt.CharacteristicValue(write=self.on_write_ase_control_point), 1179 ) 1180 1181 super().__init__([self.ase_control_point, *self.ase_state_machines.values()]) 1182 1183 def on_operation(self, opcode: 
ASE_Operation.Opcode, ase_id: int, args): 1184 if ase := self.ase_state_machines.get(ase_id): 1185 handler = getattr(ase, 'on_' + opcode.name.lower()) 1186 return (ase_id, *handler(*args)) 1187 else: 1188 return (ase_id, AseResponseCode.INVALID_ASE_ID, AseReasonCode.NONE) 1189 1190 def _on_client_disconnected(self, _reason: int) -> None: 1191 for ase in self.ase_state_machines.values(): 1192 ase.state = AseStateMachine.State.IDLE 1193 self._active_client = None 1194 1195 def on_write_ase_control_point(self, connection, data): 1196 if not self._active_client and connection: 1197 self._active_client = connection 1198 connection.once('disconnection', self._on_client_disconnected) 1199 1200 operation = ASE_Operation.from_bytes(data) 1201 responses = [] 1202 logger.debug(f'*** ASCS Write {operation} ***') 1203 1204 if operation.op_code == ASE_Operation.Opcode.CONFIG_CODEC: 1205 for ase_id, *args in zip( 1206 operation.ase_id, 1207 operation.target_latency, 1208 operation.target_phy, 1209 operation.codec_id, 1210 operation.codec_specific_configuration, 1211 ): 1212 responses.append(self.on_operation(operation.op_code, ase_id, args)) 1213 elif operation.op_code == ASE_Operation.Opcode.CONFIG_QOS: 1214 for ase_id, *args in zip( 1215 operation.ase_id, 1216 operation.cig_id, 1217 operation.cis_id, 1218 operation.sdu_interval, 1219 operation.framing, 1220 operation.phy, 1221 operation.max_sdu, 1222 operation.retransmission_number, 1223 operation.max_transport_latency, 1224 operation.presentation_delay, 1225 ): 1226 responses.append(self.on_operation(operation.op_code, ase_id, args)) 1227 elif operation.op_code in ( 1228 ASE_Operation.Opcode.ENABLE, 1229 ASE_Operation.Opcode.UPDATE_METADATA, 1230 ): 1231 for ase_id, *args in zip( 1232 operation.ase_id, 1233 operation.metadata, 1234 ): 1235 responses.append(self.on_operation(operation.op_code, ase_id, args)) 1236 elif operation.op_code in ( 1237 ASE_Operation.Opcode.RECEIVER_START_READY, 1238 ASE_Operation.Opcode.DISABLE, 1239 
ASE_Operation.Opcode.RECEIVER_STOP_READY, 1240 ASE_Operation.Opcode.RELEASE, 1241 ): 1242 for ase_id in operation.ase_id: 1243 responses.append(self.on_operation(operation.op_code, ase_id, [])) 1244 1245 control_point_notification = bytes( 1246 [operation.op_code, len(responses)] 1247 ) + b''.join(map(bytes, responses)) 1248 self.device.abort_on( 1249 'flush', 1250 self.device.notify_subscribers( 1251 self.ase_control_point, control_point_notification 1252 ), 1253 ) 1254 1255 for ase_id, *_ in responses: 1256 if ase := self.ase_state_machines.get(ase_id): 1257 self.device.abort_on( 1258 'flush', 1259 self.device.notify_subscribers(ase, ase.value), 1260 ) 1261 1262 1263# ----------------------------------------------------------------------------- 1264# Client 1265# ----------------------------------------------------------------------------- 1266class PublishedAudioCapabilitiesServiceProxy(gatt_client.ProfileServiceProxy): 1267 SERVICE_CLASS = PublishedAudioCapabilitiesService 1268 1269 sink_pac: Optional[gatt_client.CharacteristicProxy] = None 1270 sink_audio_locations: Optional[gatt_client.CharacteristicProxy] = None 1271 source_pac: Optional[gatt_client.CharacteristicProxy] = None 1272 source_audio_locations: Optional[gatt_client.CharacteristicProxy] = None 1273 available_audio_contexts: gatt_client.CharacteristicProxy 1274 supported_audio_contexts: gatt_client.CharacteristicProxy 1275 1276 def __init__(self, service_proxy: gatt_client.ServiceProxy): 1277 self.service_proxy = service_proxy 1278 1279 self.available_audio_contexts = service_proxy.get_characteristics_by_uuid( 1280 gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC 1281 )[0] 1282 self.supported_audio_contexts = service_proxy.get_characteristics_by_uuid( 1283 gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC 1284 )[0] 1285 1286 if characteristics := service_proxy.get_characteristics_by_uuid( 1287 gatt.GATT_SINK_PAC_CHARACTERISTIC 1288 ): 1289 self.sink_pac = characteristics[0] 1290 1291 if 
characteristics := service_proxy.get_characteristics_by_uuid( 1292 gatt.GATT_SOURCE_PAC_CHARACTERISTIC 1293 ): 1294 self.source_pac = characteristics[0] 1295 1296 if characteristics := service_proxy.get_characteristics_by_uuid( 1297 gatt.GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC 1298 ): 1299 self.sink_audio_locations = characteristics[0] 1300 1301 if characteristics := service_proxy.get_characteristics_by_uuid( 1302 gatt.GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC 1303 ): 1304 self.source_audio_locations = characteristics[0] 1305 1306 1307class AudioStreamControlServiceProxy(gatt_client.ProfileServiceProxy): 1308 SERVICE_CLASS = AudioStreamControlService 1309 1310 sink_ase: List[gatt_client.CharacteristicProxy] 1311 source_ase: List[gatt_client.CharacteristicProxy] 1312 ase_control_point: gatt_client.CharacteristicProxy 1313 1314 def __init__(self, service_proxy: gatt_client.ServiceProxy): 1315 self.service_proxy = service_proxy 1316 1317 self.sink_ase = service_proxy.get_characteristics_by_uuid( 1318 gatt.GATT_SINK_ASE_CHARACTERISTIC 1319 ) 1320 self.source_ase = service_proxy.get_characteristics_by_uuid( 1321 gatt.GATT_SOURCE_ASE_CHARACTERISTIC 1322 ) 1323 self.ase_control_point = service_proxy.get_characteristics_by_uuid( 1324 gatt.GATT_ASE_CONTROL_POINT_CHARACTERISTIC 1325 )[0] 1326