• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15
16# -----------------------------------------------------------------------------
17# Imports
18# -----------------------------------------------------------------------------
19from __future__ import annotations
20
21from collections.abc import Sequence
22import dataclasses
23import enum
24import struct
25import functools
26import logging
27from typing import Optional, List, Union, Type, Dict, Any, Tuple
28
29from bumble import core
30from bumble import colors
31from bumble import device
32from bumble import hci
33from bumble import gatt
34from bumble import gatt_client
35
36
37# -----------------------------------------------------------------------------
38# Logging
39# -----------------------------------------------------------------------------
40logger = logging.getLogger(__name__)
41
42# -----------------------------------------------------------------------------
43# Constants
44# -----------------------------------------------------------------------------
45
46
47class AudioLocation(enum.IntFlag):
48    '''Bluetooth Assigned Numbers, Section 6.12.1 - Audio Location'''
49
50    # fmt: off
51    NOT_ALLOWED             = 0x00000000
52    FRONT_LEFT              = 0x00000001
53    FRONT_RIGHT             = 0x00000002
54    FRONT_CENTER            = 0x00000004
55    LOW_FREQUENCY_EFFECTS_1 = 0x00000008
56    BACK_LEFT               = 0x00000010
57    BACK_RIGHT              = 0x00000020
58    FRONT_LEFT_OF_CENTER    = 0x00000040
59    FRONT_RIGHT_OF_CENTER   = 0x00000080
60    BACK_CENTER             = 0x00000100
61    LOW_FREQUENCY_EFFECTS_2 = 0x00000200
62    SIDE_LEFT               = 0x00000400
63    SIDE_RIGHT              = 0x00000800
64    TOP_FRONT_LEFT          = 0x00001000
65    TOP_FRONT_RIGHT         = 0x00002000
66    TOP_FRONT_CENTER        = 0x00004000
67    TOP_CENTER              = 0x00008000
68    TOP_BACK_LEFT           = 0x00010000
69    TOP_BACK_RIGHT          = 0x00020000
70    TOP_SIDE_LEFT           = 0x00040000
71    TOP_SIDE_RIGHT          = 0x00080000
72    TOP_BACK_CENTER         = 0x00100000
73    BOTTOM_FRONT_CENTER     = 0x00200000
74    BOTTOM_FRONT_LEFT       = 0x00400000
75    BOTTOM_FRONT_RIGHT      = 0x00800000
76    FRONT_LEFT_WIDE         = 0x01000000
77    FRONT_RIGHT_WIDE        = 0x02000000
78    LEFT_SURROUND           = 0x04000000
79    RIGHT_SURROUND          = 0x08000000
80
81    @property
82    def channel_count(self) -> int:
83        return bin(self.value).count('1')
84
85
86class AudioInputType(enum.IntEnum):
87    '''Bluetooth Assigned Numbers, Section 6.12.2 - Audio Input Type'''
88
89    # fmt: off
90    UNSPECIFIED = 0x00
91    BLUETOOTH   = 0x01
92    MICROPHONE  = 0x02
93    ANALOG      = 0x03
94    DIGITAL     = 0x04
95    RADIO       = 0x05
96    STREAMING   = 0x06
97    AMBIENT     = 0x07
98
99
100class ContextType(enum.IntFlag):
101    '''Bluetooth Assigned Numbers, Section 6.12.3 - Context Type'''
102
103    # fmt: off
104    PROHIBITED       = 0x0000
105    CONVERSATIONAL   = 0x0002
106    MEDIA            = 0x0004
107    GAME             = 0x0008
108    INSTRUCTIONAL    = 0x0010
109    VOICE_ASSISTANTS = 0x0020
110    LIVE             = 0x0040
111    SOUND_EFFECTS    = 0x0080
112    NOTIFICATIONS    = 0x0100
113    RINGTONE         = 0x0200
114    ALERTS           = 0x0400
115    EMERGENCY_ALARM  = 0x0800
116
117
118class SamplingFrequency(enum.IntEnum):
119    '''Bluetooth Assigned Numbers, Section 6.12.5.1 - Sampling Frequency'''
120
121    # fmt: off
122    FREQ_8000    = 0x01
123    FREQ_11025   = 0x02
124    FREQ_16000   = 0x03
125    FREQ_22050   = 0x04
126    FREQ_24000   = 0x05
127    FREQ_32000   = 0x06
128    FREQ_44100   = 0x07
129    FREQ_48000   = 0x08
130    FREQ_88200   = 0x09
131    FREQ_96000   = 0x0A
132    FREQ_176400  = 0x0B
133    FREQ_192000  = 0x0C
134    FREQ_384000  = 0x0D
135    # fmt: on
136
137    @classmethod
138    def from_hz(cls, frequency: int) -> SamplingFrequency:
139        return {
140            8000: SamplingFrequency.FREQ_8000,
141            11025: SamplingFrequency.FREQ_11025,
142            16000: SamplingFrequency.FREQ_16000,
143            22050: SamplingFrequency.FREQ_22050,
144            24000: SamplingFrequency.FREQ_24000,
145            32000: SamplingFrequency.FREQ_32000,
146            44100: SamplingFrequency.FREQ_44100,
147            48000: SamplingFrequency.FREQ_48000,
148            88200: SamplingFrequency.FREQ_88200,
149            96000: SamplingFrequency.FREQ_96000,
150            176400: SamplingFrequency.FREQ_176400,
151            192000: SamplingFrequency.FREQ_192000,
152            384000: SamplingFrequency.FREQ_384000,
153        }[frequency]
154
155    @property
156    def hz(self) -> int:
157        return {
158            SamplingFrequency.FREQ_8000: 8000,
159            SamplingFrequency.FREQ_11025: 11025,
160            SamplingFrequency.FREQ_16000: 16000,
161            SamplingFrequency.FREQ_22050: 22050,
162            SamplingFrequency.FREQ_24000: 24000,
163            SamplingFrequency.FREQ_32000: 32000,
164            SamplingFrequency.FREQ_44100: 44100,
165            SamplingFrequency.FREQ_48000: 48000,
166            SamplingFrequency.FREQ_88200: 88200,
167            SamplingFrequency.FREQ_96000: 96000,
168            SamplingFrequency.FREQ_176400: 176400,
169            SamplingFrequency.FREQ_192000: 192000,
170            SamplingFrequency.FREQ_384000: 384000,
171        }[self]
172
173
174class SupportedSamplingFrequency(enum.IntFlag):
175    '''Bluetooth Assigned Numbers, Section 6.12.4.1 - Sample Frequency'''
176
177    # fmt: off
178    FREQ_8000    = 1 << (SamplingFrequency.FREQ_8000 - 1)
179    FREQ_11025   = 1 << (SamplingFrequency.FREQ_11025 - 1)
180    FREQ_16000   = 1 << (SamplingFrequency.FREQ_16000 - 1)
181    FREQ_22050   = 1 << (SamplingFrequency.FREQ_22050 - 1)
182    FREQ_24000   = 1 << (SamplingFrequency.FREQ_24000 - 1)
183    FREQ_32000   = 1 << (SamplingFrequency.FREQ_32000 - 1)
184    FREQ_44100   = 1 << (SamplingFrequency.FREQ_44100 - 1)
185    FREQ_48000   = 1 << (SamplingFrequency.FREQ_48000 - 1)
186    FREQ_88200   = 1 << (SamplingFrequency.FREQ_88200 - 1)
187    FREQ_96000   = 1 << (SamplingFrequency.FREQ_96000 - 1)
188    FREQ_176400  = 1 << (SamplingFrequency.FREQ_176400 - 1)
189    FREQ_192000  = 1 << (SamplingFrequency.FREQ_192000 - 1)
190    FREQ_384000  = 1 << (SamplingFrequency.FREQ_384000 - 1)
191    # fmt: on
192
193    @classmethod
194    def from_hz(cls, frequencies: Sequence[int]) -> SupportedSamplingFrequency:
195        MAPPING = {
196            8000: SupportedSamplingFrequency.FREQ_8000,
197            11025: SupportedSamplingFrequency.FREQ_11025,
198            16000: SupportedSamplingFrequency.FREQ_16000,
199            22050: SupportedSamplingFrequency.FREQ_22050,
200            24000: SupportedSamplingFrequency.FREQ_24000,
201            32000: SupportedSamplingFrequency.FREQ_32000,
202            44100: SupportedSamplingFrequency.FREQ_44100,
203            48000: SupportedSamplingFrequency.FREQ_48000,
204            88200: SupportedSamplingFrequency.FREQ_88200,
205            96000: SupportedSamplingFrequency.FREQ_96000,
206            176400: SupportedSamplingFrequency.FREQ_176400,
207            192000: SupportedSamplingFrequency.FREQ_192000,
208            384000: SupportedSamplingFrequency.FREQ_384000,
209        }
210
211        return functools.reduce(
212            lambda x, y: x | MAPPING[y],
213            frequencies,
214            cls(0),
215        )
216
217
218class FrameDuration(enum.IntEnum):
219    '''Bluetooth Assigned Numbers, Section 6.12.5.2 - Frame Duration'''
220
221    # fmt: off
222    DURATION_7500_US  = 0x00
223    DURATION_10000_US = 0x01
224
225    @property
226    def us(self) -> int:
227        return {
228            FrameDuration.DURATION_7500_US: 7500,
229            FrameDuration.DURATION_10000_US: 10000,
230        }[self]
231
232
233class SupportedFrameDuration(enum.IntFlag):
234    '''Bluetooth Assigned Numbers, Section 6.12.4.2 - Frame Duration'''
235
236    # fmt: off
237    DURATION_7500_US_SUPPORTED  = 0b0001
238    DURATION_10000_US_SUPPORTED = 0b0010
239    DURATION_7500_US_PREFERRED  = 0b0001
240    DURATION_10000_US_PREFERRED = 0b0010
241
242
243class AnnouncementType(enum.IntEnum):
244    '''Basic Audio Profile, 3.5.3. Additional Audio Stream Control Service requirements'''
245
246    # fmt: off
247    GENERAL  = 0x00
248    TARGETED = 0x01
249
250
251# -----------------------------------------------------------------------------
252# ASE Operations
253# -----------------------------------------------------------------------------
254
255
256class ASE_Operation:
257    '''
258    See Audio Stream Control Service - 5 ASE Control operations.
259    '''
260
261    classes: Dict[int, Type[ASE_Operation]] = {}
262    op_code: int
263    name: str
264    fields: Optional[Sequence[Any]] = None
265    ase_id: List[int]
266
267    class Opcode(enum.IntEnum):
268        # fmt: off
269        CONFIG_CODEC         = 0x01
270        CONFIG_QOS           = 0x02
271        ENABLE               = 0x03
272        RECEIVER_START_READY = 0x04
273        DISABLE              = 0x05
274        RECEIVER_STOP_READY  = 0x06
275        UPDATE_METADATA      = 0x07
276        RELEASE              = 0x08
277
278    @staticmethod
279    def from_bytes(pdu: bytes) -> ASE_Operation:
280        op_code = pdu[0]
281
282        cls = ASE_Operation.classes.get(op_code)
283        if cls is None:
284            instance = ASE_Operation(pdu)
285            instance.name = ASE_Operation.Opcode(op_code).name
286            instance.op_code = op_code
287            return instance
288        self = cls.__new__(cls)
289        ASE_Operation.__init__(self, pdu)
290        if self.fields is not None:
291            self.init_from_bytes(pdu, 1)
292        return self
293
294    @staticmethod
295    def subclass(fields):
296        def inner(cls: Type[ASE_Operation]):
297            try:
298                operation = ASE_Operation.Opcode[cls.__name__[4:].upper()]
299                cls.name = operation.name
300                cls.op_code = operation
301            except:
302                raise KeyError(f'PDU name {cls.name} not found in Ase_Operation.Opcode')
303            cls.fields = fields
304
305            # Register a factory for this class
306            ASE_Operation.classes[cls.op_code] = cls
307
308            return cls
309
310        return inner
311
312    def __init__(self, pdu: Optional[bytes] = None, **kwargs) -> None:
313        if self.fields is not None and kwargs:
314            hci.HCI_Object.init_from_fields(self, self.fields, kwargs)
315        if pdu is None:
316            pdu = bytes([self.op_code]) + hci.HCI_Object.dict_to_bytes(
317                kwargs, self.fields
318            )
319        self.pdu = pdu
320
321    def init_from_bytes(self, pdu: bytes, offset: int):
322        return hci.HCI_Object.init_from_bytes(self, pdu, offset, self.fields)
323
324    def __bytes__(self) -> bytes:
325        return self.pdu
326
327    def __str__(self) -> str:
328        result = f'{colors.color(self.name, "yellow")} '
329        if fields := getattr(self, 'fields', None):
330            result += ':\n' + hci.HCI_Object.format_fields(self.__dict__, fields, '  ')
331        else:
332            if len(self.pdu) > 1:
333                result += f': {self.pdu.hex()}'
334        return result
335
336
337@ASE_Operation.subclass(
338    [
339        [
340            ('ase_id', 1),
341            ('target_latency', 1),
342            ('target_phy', 1),
343            ('codec_id', hci.CodingFormat.parse_from_bytes),
344            ('codec_specific_configuration', 'v'),
345        ],
346    ]
347)
348class ASE_Config_Codec(ASE_Operation):
349    '''
350    See Audio Stream Control Service 5.1 - Config Codec Operation
351    '''
352
353    target_latency: List[int]
354    target_phy: List[int]
355    codec_id: List[hci.CodingFormat]
356    codec_specific_configuration: List[bytes]
357
358
359@ASE_Operation.subclass(
360    [
361        [
362            ('ase_id', 1),
363            ('cig_id', 1),
364            ('cis_id', 1),
365            ('sdu_interval', 3),
366            ('framing', 1),
367            ('phy', 1),
368            ('max_sdu', 2),
369            ('retransmission_number', 1),
370            ('max_transport_latency', 2),
371            ('presentation_delay', 3),
372        ],
373    ]
374)
375class ASE_Config_QOS(ASE_Operation):
376    '''
377    See Audio Stream Control Service 5.2 - Config Qos Operation
378    '''
379
380    cig_id: List[int]
381    cis_id: List[int]
382    sdu_interval: List[int]
383    framing: List[int]
384    phy: List[int]
385    max_sdu: List[int]
386    retransmission_number: List[int]
387    max_transport_latency: List[int]
388    presentation_delay: List[int]
389
390
391@ASE_Operation.subclass([[('ase_id', 1), ('metadata', 'v')]])
392class ASE_Enable(ASE_Operation):
393    '''
394    See Audio Stream Control Service 5.3 - Enable Operation
395    '''
396
397    metadata: bytes
398
399
400@ASE_Operation.subclass([[('ase_id', 1)]])
401class ASE_Receiver_Start_Ready(ASE_Operation):
402    '''
403    See Audio Stream Control Service 5.4 - Receiver Start Ready Operation
404    '''
405
406
407@ASE_Operation.subclass([[('ase_id', 1)]])
408class ASE_Disable(ASE_Operation):
409    '''
410    See Audio Stream Control Service 5.5 - Disable Operation
411    '''
412
413
414@ASE_Operation.subclass([[('ase_id', 1)]])
415class ASE_Receiver_Stop_Ready(ASE_Operation):
416    '''
417    See Audio Stream Control Service 5.6 - Receiver Stop Ready Operation
418    '''
419
420
421@ASE_Operation.subclass([[('ase_id', 1), ('metadata', 'v')]])
422class ASE_Update_Metadata(ASE_Operation):
423    '''
424    See Audio Stream Control Service 5.7 - Update Metadata Operation
425    '''
426
427    metadata: List[bytes]
428
429
430@ASE_Operation.subclass([[('ase_id', 1)]])
431class ASE_Release(ASE_Operation):
432    '''
433    See Audio Stream Control Service 5.8 - Release Operation
434    '''
435
436
437class AseResponseCode(enum.IntEnum):
438    # fmt: off
439    SUCCESS                                     = 0x00
440    UNSUPPORTED_OPCODE                          = 0x01
441    INVALID_LENGTH                              = 0x02
442    INVALID_ASE_ID                              = 0x03
443    INVALID_ASE_STATE_MACHINE_TRANSITION        = 0x04
444    INVALID_ASE_DIRECTION                       = 0x05
445    UNSUPPORTED_AUDIO_CAPABILITIES              = 0x06
446    UNSUPPORTED_CONFIGURATION_PARAMETER_VALUE   = 0x07
447    REJECTED_CONFIGURATION_PARAMETER_VALUE      = 0x08
448    INVALID_CONFIGURATION_PARAMETER_VALUE       = 0x09
449    UNSUPPORTED_METADATA                        = 0x0A
450    REJECTED_METADATA                           = 0x0B
451    INVALID_METADATA                            = 0x0C
452    INSUFFICIENT_RESOURCES                      = 0x0D
453    UNSPECIFIED_ERROR                           = 0x0E
454
455
456class AseReasonCode(enum.IntEnum):
457    # fmt: off
458    NONE                            = 0x00
459    CODEC_ID                        = 0x01
460    CODEC_SPECIFIC_CONFIGURATION    = 0x02
461    SDU_INTERVAL                    = 0x03
462    FRAMING                         = 0x04
463    PHY                             = 0x05
464    MAXIMUM_SDU_SIZE                = 0x06
465    RETRANSMISSION_NUMBER           = 0x07
466    MAX_TRANSPORT_LATENCY           = 0x08
467    PRESENTATION_DELAY              = 0x09
468    INVALID_ASE_CIS_MAPPING         = 0x0A
469
470
471class AudioRole(enum.IntEnum):
472    SINK = hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.CONTROLLER_TO_HOST
473    SOURCE = hci.HCI_LE_Setup_ISO_Data_Path_Command.Direction.HOST_TO_CONTROLLER
474
475
476@dataclasses.dataclass
477class UnicastServerAdvertisingData:
478    """Advertising Data for ASCS."""
479
480    announcement_type: AnnouncementType = AnnouncementType.TARGETED
481    available_audio_contexts: ContextType = ContextType.MEDIA
482    metadata: bytes = b''
483
484    def __bytes__(self) -> bytes:
485        return bytes(
486            core.AdvertisingData(
487                [
488                    (
489                        core.AdvertisingData.SERVICE_DATA_16_BIT_UUID,
490                        struct.pack(
491                            '<2sBIB',
492                            gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE.to_bytes(),
493                            self.announcement_type,
494                            self.available_audio_contexts,
495                            len(self.metadata),
496                        )
497                        + self.metadata,
498                    )
499                ]
500            )
501        )
502
503
504# -----------------------------------------------------------------------------
505# Utils
506# -----------------------------------------------------------------------------
507
508
509def bits_to_channel_counts(data: int) -> List[int]:
510    pos = 0
511    counts = []
512    while data != 0:
513        # Bit 0 = count 1
514        # Bit 1 = count 2, and so on
515        pos += 1
516        if data & 1:
517            counts.append(pos)
518        data >>= 1
519    return counts
520
521
522def channel_counts_to_bits(counts: Sequence[int]) -> int:
523    return sum(set([1 << (count - 1) for count in counts]))
524
525
526# -----------------------------------------------------------------------------
527# Structures
528# -----------------------------------------------------------------------------
529
530
531@dataclasses.dataclass
532class CodecSpecificCapabilities:
533    '''See:
534    * Bluetooth Assigned Numbers, 6.12.4 - Codec Specific Capabilities LTV Structures
535    * Basic Audio Profile, 4.3.1 - Codec_Specific_Capabilities LTV requirements
536    '''
537
538    class Type(enum.IntEnum):
539        # fmt: off
540        SAMPLING_FREQUENCY   = 0x01
541        FRAME_DURATION       = 0x02
542        AUDIO_CHANNEL_COUNT  = 0x03
543        OCTETS_PER_FRAME     = 0x04
544        CODEC_FRAMES_PER_SDU = 0x05
545
546    supported_sampling_frequencies: SupportedSamplingFrequency
547    supported_frame_durations: SupportedFrameDuration
548    supported_audio_channel_count: Sequence[int]
549    min_octets_per_codec_frame: int
550    max_octets_per_codec_frame: int
551    supported_max_codec_frames_per_sdu: int
552
553    @classmethod
554    def from_bytes(cls, data: bytes) -> CodecSpecificCapabilities:
555        offset = 0
556        # Allowed default values.
557        supported_audio_channel_count = [1]
558        supported_max_codec_frames_per_sdu = 1
559        while offset < len(data):
560            length, type = struct.unpack_from('BB', data, offset)
561            offset += 2
562            value = int.from_bytes(data[offset : offset + length - 1], 'little')
563            offset += length - 1
564
565            if type == CodecSpecificCapabilities.Type.SAMPLING_FREQUENCY:
566                supported_sampling_frequencies = SupportedSamplingFrequency(value)
567            elif type == CodecSpecificCapabilities.Type.FRAME_DURATION:
568                supported_frame_durations = SupportedFrameDuration(value)
569            elif type == CodecSpecificCapabilities.Type.AUDIO_CHANNEL_COUNT:
570                supported_audio_channel_count = bits_to_channel_counts(value)
571            elif type == CodecSpecificCapabilities.Type.OCTETS_PER_FRAME:
572                min_octets_per_sample = value & 0xFFFF
573                max_octets_per_sample = value >> 16
574            elif type == CodecSpecificCapabilities.Type.CODEC_FRAMES_PER_SDU:
575                supported_max_codec_frames_per_sdu = value
576
577        # It is expected here that if some fields are missing, an error should be raised.
578        return CodecSpecificCapabilities(
579            supported_sampling_frequencies=supported_sampling_frequencies,
580            supported_frame_durations=supported_frame_durations,
581            supported_audio_channel_count=supported_audio_channel_count,
582            min_octets_per_codec_frame=min_octets_per_sample,
583            max_octets_per_codec_frame=max_octets_per_sample,
584            supported_max_codec_frames_per_sdu=supported_max_codec_frames_per_sdu,
585        )
586
587    def __bytes__(self) -> bytes:
588        return struct.pack(
589            '<BBHBBBBBBBBHHBBB',
590            3,
591            CodecSpecificCapabilities.Type.SAMPLING_FREQUENCY,
592            self.supported_sampling_frequencies,
593            2,
594            CodecSpecificCapabilities.Type.FRAME_DURATION,
595            self.supported_frame_durations,
596            2,
597            CodecSpecificCapabilities.Type.AUDIO_CHANNEL_COUNT,
598            channel_counts_to_bits(self.supported_audio_channel_count),
599            5,
600            CodecSpecificCapabilities.Type.OCTETS_PER_FRAME,
601            self.min_octets_per_codec_frame,
602            self.max_octets_per_codec_frame,
603            2,
604            CodecSpecificCapabilities.Type.CODEC_FRAMES_PER_SDU,
605            self.supported_max_codec_frames_per_sdu,
606        )
607
608
609@dataclasses.dataclass
610class CodecSpecificConfiguration:
611    '''See:
612    * Bluetooth Assigned Numbers, 6.12.5 - Codec Specific Configuration LTV Structures
613    * Basic Audio Profile, 4.3.2 - Codec_Specific_Capabilities LTV requirements
614    '''
615
616    class Type(enum.IntEnum):
617        # fmt: off
618        SAMPLING_FREQUENCY       = 0x01
619        FRAME_DURATION           = 0x02
620        AUDIO_CHANNEL_ALLOCATION = 0x03
621        OCTETS_PER_FRAME         = 0x04
622        CODEC_FRAMES_PER_SDU     = 0x05
623
624    sampling_frequency: SamplingFrequency
625    frame_duration: FrameDuration
626    audio_channel_allocation: AudioLocation
627    octets_per_codec_frame: int
628    codec_frames_per_sdu: int
629
630    @classmethod
631    def from_bytes(cls, data: bytes) -> CodecSpecificConfiguration:
632        offset = 0
633        # Allowed default values.
634        audio_channel_allocation = AudioLocation.NOT_ALLOWED
635        codec_frames_per_sdu = 1
636        while offset < len(data):
637            length, type = struct.unpack_from('BB', data, offset)
638            offset += 2
639            value = int.from_bytes(data[offset : offset + length - 1], 'little')
640            offset += length - 1
641
642            if type == CodecSpecificConfiguration.Type.SAMPLING_FREQUENCY:
643                sampling_frequency = SamplingFrequency(value)
644            elif type == CodecSpecificConfiguration.Type.FRAME_DURATION:
645                frame_duration = FrameDuration(value)
646            elif type == CodecSpecificConfiguration.Type.AUDIO_CHANNEL_ALLOCATION:
647                audio_channel_allocation = AudioLocation(value)
648            elif type == CodecSpecificConfiguration.Type.OCTETS_PER_FRAME:
649                octets_per_codec_frame = value
650            elif type == CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU:
651                codec_frames_per_sdu = value
652
653        # It is expected here that if some fields are missing, an error should be raised.
654        return CodecSpecificConfiguration(
655            sampling_frequency=sampling_frequency,
656            frame_duration=frame_duration,
657            audio_channel_allocation=audio_channel_allocation,
658            octets_per_codec_frame=octets_per_codec_frame,
659            codec_frames_per_sdu=codec_frames_per_sdu,
660        )
661
662    def __bytes__(self) -> bytes:
663        return struct.pack(
664            '<BBBBBBBBIBBHBBB',
665            2,
666            CodecSpecificConfiguration.Type.SAMPLING_FREQUENCY,
667            self.sampling_frequency,
668            2,
669            CodecSpecificConfiguration.Type.FRAME_DURATION,
670            self.frame_duration,
671            5,
672            CodecSpecificConfiguration.Type.AUDIO_CHANNEL_ALLOCATION,
673            self.audio_channel_allocation,
674            3,
675            CodecSpecificConfiguration.Type.OCTETS_PER_FRAME,
676            self.octets_per_codec_frame,
677            2,
678            CodecSpecificConfiguration.Type.CODEC_FRAMES_PER_SDU,
679            self.codec_frames_per_sdu,
680        )
681
682
683@dataclasses.dataclass
684class PacRecord:
685    coding_format: hci.CodingFormat
686    codec_specific_capabilities: Union[CodecSpecificCapabilities, bytes]
687    # TODO: Parse Metadata
688    metadata: bytes = b''
689
690    @classmethod
691    def from_bytes(cls, data: bytes) -> PacRecord:
692        offset, coding_format = hci.CodingFormat.parse_from_bytes(data, 0)
693        codec_specific_capabilities_size = data[offset]
694
695        offset += 1
696        codec_specific_capabilities_bytes = data[
697            offset : offset + codec_specific_capabilities_size
698        ]
699        offset += codec_specific_capabilities_size
700        metadata_size = data[offset]
701        metadata = data[offset : offset + metadata_size]
702
703        codec_specific_capabilities: Union[CodecSpecificCapabilities, bytes]
704        if coding_format.codec_id == hci.CodecID.VENDOR_SPECIFIC:
705            codec_specific_capabilities = codec_specific_capabilities_bytes
706        else:
707            codec_specific_capabilities = CodecSpecificCapabilities.from_bytes(
708                codec_specific_capabilities_bytes
709            )
710
711        return PacRecord(
712            coding_format=coding_format,
713            codec_specific_capabilities=codec_specific_capabilities,
714            metadata=metadata,
715        )
716
717    def __bytes__(self) -> bytes:
718        capabilities_bytes = bytes(self.codec_specific_capabilities)
719        return (
720            bytes(self.coding_format)
721            + bytes([len(capabilities_bytes)])
722            + capabilities_bytes
723            + bytes([len(self.metadata)])
724            + self.metadata
725        )
726
727
728# -----------------------------------------------------------------------------
729# Server
730# -----------------------------------------------------------------------------
731class PublishedAudioCapabilitiesService(gatt.TemplateService):
732    UUID = gatt.GATT_PUBLISHED_AUDIO_CAPABILITIES_SERVICE
733
734    sink_pac: Optional[gatt.Characteristic]
735    sink_audio_locations: Optional[gatt.Characteristic]
736    source_pac: Optional[gatt.Characteristic]
737    source_audio_locations: Optional[gatt.Characteristic]
738    available_audio_contexts: gatt.Characteristic
739    supported_audio_contexts: gatt.Characteristic
740
741    def __init__(
742        self,
743        supported_source_context: ContextType,
744        supported_sink_context: ContextType,
745        available_source_context: ContextType,
746        available_sink_context: ContextType,
747        sink_pac: Sequence[PacRecord] = [],
748        sink_audio_locations: Optional[AudioLocation] = None,
749        source_pac: Sequence[PacRecord] = [],
750        source_audio_locations: Optional[AudioLocation] = None,
751    ) -> None:
752        characteristics = []
753
754        self.supported_audio_contexts = gatt.Characteristic(
755            uuid=gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC,
756            properties=gatt.Characteristic.Properties.READ,
757            permissions=gatt.Characteristic.Permissions.READABLE,
758            value=struct.pack('<HH', supported_sink_context, supported_source_context),
759        )
760        characteristics.append(self.supported_audio_contexts)
761
762        self.available_audio_contexts = gatt.Characteristic(
763            uuid=gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC,
764            properties=gatt.Characteristic.Properties.READ
765            | gatt.Characteristic.Properties.NOTIFY,
766            permissions=gatt.Characteristic.Permissions.READABLE,
767            value=struct.pack('<HH', available_sink_context, available_source_context),
768        )
769        characteristics.append(self.available_audio_contexts)
770
771        if sink_pac:
772            self.sink_pac = gatt.Characteristic(
773                uuid=gatt.GATT_SINK_PAC_CHARACTERISTIC,
774                properties=gatt.Characteristic.Properties.READ,
775                permissions=gatt.Characteristic.Permissions.READABLE,
776                value=bytes([len(sink_pac)]) + b''.join(map(bytes, sink_pac)),
777            )
778            characteristics.append(self.sink_pac)
779
780        if sink_audio_locations is not None:
781            self.sink_audio_locations = gatt.Characteristic(
782                uuid=gatt.GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC,
783                properties=gatt.Characteristic.Properties.READ,
784                permissions=gatt.Characteristic.Permissions.READABLE,
785                value=struct.pack('<I', sink_audio_locations),
786            )
787            characteristics.append(self.sink_audio_locations)
788
789        if source_pac:
790            self.source_pac = gatt.Characteristic(
791                uuid=gatt.GATT_SOURCE_PAC_CHARACTERISTIC,
792                properties=gatt.Characteristic.Properties.READ,
793                permissions=gatt.Characteristic.Permissions.READABLE,
794                value=bytes([len(source_pac)]) + b''.join(map(bytes, source_pac)),
795            )
796            characteristics.append(self.source_pac)
797
798        if source_audio_locations is not None:
799            self.source_audio_locations = gatt.Characteristic(
800                uuid=gatt.GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC,
801                properties=gatt.Characteristic.Properties.READ,
802                permissions=gatt.Characteristic.Permissions.READABLE,
803                value=struct.pack('<I', source_audio_locations),
804            )
805            characteristics.append(self.source_audio_locations)
806
807        super().__init__(characteristics)
808
809
810class AseStateMachine(gatt.Characteristic):
811    class State(enum.IntEnum):
812        # fmt: off
813        IDLE             = 0x00
814        CODEC_CONFIGURED = 0x01
815        QOS_CONFIGURED   = 0x02
816        ENABLING         = 0x03
817        STREAMING        = 0x04
818        DISABLING        = 0x05
819        RELEASING        = 0x06
820
821    cis_link: Optional[device.CisLink] = None
822
823    # Additional parameters in CODEC_CONFIGURED State
824    preferred_framing = 0  # Unframed PDU supported
825    preferred_phy = 0
826    preferred_retransmission_number = 13
827    preferred_max_transport_latency = 100
828    supported_presentation_delay_min = 0
829    supported_presentation_delay_max = 0
830    preferred_presentation_delay_min = 0
831    preferred_presentation_delay_max = 0
832    codec_id = hci.CodingFormat(hci.CodecID.LC3)
833    codec_specific_configuration: Union[CodecSpecificConfiguration, bytes] = b''
834
835    # Additional parameters in QOS_CONFIGURED State
836    cig_id = 0
837    cis_id = 0
838    sdu_interval = 0
839    framing = 0
840    phy = 0
841    max_sdu = 0
842    retransmission_number = 0
843    max_transport_latency = 0
844    presentation_delay = 0
845
846    # Additional parameters in ENABLING, STREAMING, DISABLING State
847    # TODO: Parse this
848    metadata = b''
849
850    def __init__(
851        self,
852        role: AudioRole,
853        ase_id: int,
854        service: AudioStreamControlService,
855    ) -> None:
856        self.service = service
857        self.ase_id = ase_id
858        self._state = AseStateMachine.State.IDLE
859        self.role = role
860
861        uuid = (
862            gatt.GATT_SINK_ASE_CHARACTERISTIC
863            if role == AudioRole.SINK
864            else gatt.GATT_SOURCE_ASE_CHARACTERISTIC
865        )
866        super().__init__(
867            uuid=uuid,
868            properties=gatt.Characteristic.Properties.READ
869            | gatt.Characteristic.Properties.NOTIFY,
870            permissions=gatt.Characteristic.Permissions.READABLE,
871            value=gatt.CharacteristicValue(read=self.on_read),
872        )
873
874        self.service.device.on('cis_request', self.on_cis_request)
875        self.service.device.on('cis_establishment', self.on_cis_establishment)
876
877    def on_cis_request(
878        self,
879        acl_connection: device.Connection,
880        cis_handle: int,
881        cig_id: int,
882        cis_id: int,
883    ) -> None:
884        if (
885            cig_id == self.cig_id
886            and cis_id == self.cis_id
887            and self.state == self.State.ENABLING
888        ):
889            acl_connection.abort_on(
890                'flush', self.service.device.accept_cis_request(cis_handle)
891            )
892
893    def on_cis_establishment(self, cis_link: device.CisLink) -> None:
894        if (
895            cis_link.cig_id == self.cig_id
896            and cis_link.cis_id == self.cis_id
897            and self.state == self.State.ENABLING
898        ):
899            cis_link.on('disconnection', self.on_cis_disconnection)
900
901            async def post_cis_established():
902                await self.service.device.send_command(
903                    hci.HCI_LE_Setup_ISO_Data_Path_Command(
904                        connection_handle=cis_link.handle,
905                        data_path_direction=self.role,
906                        data_path_id=0x00,  # Fixed HCI
907                        codec_id=hci.CodingFormat(hci.CodecID.TRANSPARENT),
908                        controller_delay=0,
909                        codec_configuration=b'',
910                    )
911                )
912                if self.role == AudioRole.SINK:
913                    self.state = self.State.STREAMING
914                await self.service.device.notify_subscribers(self, self.value)
915
916            cis_link.acl_connection.abort_on('flush', post_cis_established())
917            self.cis_link = cis_link
918
919    def on_cis_disconnection(self, _reason) -> None:
920        self.cis_link = None
921
922    def on_config_codec(
923        self,
924        target_latency: int,
925        target_phy: int,
926        codec_id: hci.CodingFormat,
927        codec_specific_configuration: bytes,
928    ) -> Tuple[AseResponseCode, AseReasonCode]:
929        if self.state not in (
930            self.State.IDLE,
931            self.State.CODEC_CONFIGURED,
932            self.State.QOS_CONFIGURED,
933        ):
934            return (
935                AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
936                AseReasonCode.NONE,
937            )
938
939        self.max_transport_latency = target_latency
940        self.phy = target_phy
941        self.codec_id = codec_id
942        if codec_id.codec_id == hci.CodecID.VENDOR_SPECIFIC:
943            self.codec_specific_configuration = codec_specific_configuration
944        else:
945            self.codec_specific_configuration = CodecSpecificConfiguration.from_bytes(
946                codec_specific_configuration
947            )
948
949        self.state = self.State.CODEC_CONFIGURED
950
951        return (AseResponseCode.SUCCESS, AseReasonCode.NONE)
952
953    def on_config_qos(
954        self,
955        cig_id: int,
956        cis_id: int,
957        sdu_interval: int,
958        framing: int,
959        phy: int,
960        max_sdu: int,
961        retransmission_number: int,
962        max_transport_latency: int,
963        presentation_delay: int,
964    ) -> Tuple[AseResponseCode, AseReasonCode]:
965        if self.state not in (
966            AseStateMachine.State.CODEC_CONFIGURED,
967            AseStateMachine.State.QOS_CONFIGURED,
968        ):
969            return (
970                AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
971                AseReasonCode.NONE,
972            )
973
974        self.cig_id = cig_id
975        self.cis_id = cis_id
976        self.sdu_interval = sdu_interval
977        self.framing = framing
978        self.phy = phy
979        self.max_sdu = max_sdu
980        self.retransmission_number = retransmission_number
981        self.max_transport_latency = max_transport_latency
982        self.presentation_delay = presentation_delay
983
984        self.state = self.State.QOS_CONFIGURED
985
986        return (AseResponseCode.SUCCESS, AseReasonCode.NONE)
987
988    def on_enable(self, metadata: bytes) -> Tuple[AseResponseCode, AseReasonCode]:
989        if self.state != AseStateMachine.State.QOS_CONFIGURED:
990            return (
991                AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
992                AseReasonCode.NONE,
993            )
994
995        self.metadata = metadata
996        self.state = self.State.ENABLING
997
998        return (AseResponseCode.SUCCESS, AseReasonCode.NONE)
999
1000    def on_receiver_start_ready(self) -> Tuple[AseResponseCode, AseReasonCode]:
1001        if self.state != AseStateMachine.State.ENABLING:
1002            return (
1003                AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
1004                AseReasonCode.NONE,
1005            )
1006        self.state = self.State.STREAMING
1007        return (AseResponseCode.SUCCESS, AseReasonCode.NONE)
1008
1009    def on_disable(self) -> Tuple[AseResponseCode, AseReasonCode]:
1010        if self.state not in (
1011            AseStateMachine.State.ENABLING,
1012            AseStateMachine.State.STREAMING,
1013        ):
1014            return (
1015                AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
1016                AseReasonCode.NONE,
1017            )
1018        if self.role == AudioRole.SINK:
1019            self.state = self.State.QOS_CONFIGURED
1020        else:
1021            self.state = self.State.DISABLING
1022        return (AseResponseCode.SUCCESS, AseReasonCode.NONE)
1023
1024    def on_receiver_stop_ready(self) -> Tuple[AseResponseCode, AseReasonCode]:
1025        if (
1026            self.role != AudioRole.SOURCE
1027            or self.state != AseStateMachine.State.DISABLING
1028        ):
1029            return (
1030                AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
1031                AseReasonCode.NONE,
1032            )
1033        self.state = self.State.QOS_CONFIGURED
1034        return (AseResponseCode.SUCCESS, AseReasonCode.NONE)
1035
1036    def on_update_metadata(
1037        self, metadata: bytes
1038    ) -> Tuple[AseResponseCode, AseReasonCode]:
1039        if self.state not in (
1040            AseStateMachine.State.ENABLING,
1041            AseStateMachine.State.STREAMING,
1042        ):
1043            return (
1044                AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
1045                AseReasonCode.NONE,
1046            )
1047        self.metadata = metadata
1048        return (AseResponseCode.SUCCESS, AseReasonCode.NONE)
1049
1050    def on_release(self) -> Tuple[AseResponseCode, AseReasonCode]:
1051        if self.state == AseStateMachine.State.IDLE:
1052            return (
1053                AseResponseCode.INVALID_ASE_STATE_MACHINE_TRANSITION,
1054                AseReasonCode.NONE,
1055            )
1056        self.state = self.State.RELEASING
1057
1058        async def remove_cis_async():
1059            await self.service.device.send_command(
1060                hci.HCI_LE_Remove_ISO_Data_Path_Command(
1061                    connection_handle=self.cis_link.handle,
1062                    data_path_direction=self.role,
1063                )
1064            )
1065            self.state = self.State.IDLE
1066            await self.service.device.notify_subscribers(self, self.value)
1067
1068        self.service.device.abort_on('flush', remove_cis_async())
1069        return (AseResponseCode.SUCCESS, AseReasonCode.NONE)
1070
1071    @property
1072    def state(self) -> State:
1073        return self._state
1074
1075    @state.setter
1076    def state(self, new_state: State) -> None:
1077        logger.debug(f'{self} state change -> {colors.color(new_state.name, "cyan")}')
1078        self._state = new_state
1079        self.emit('state_change')
1080
1081    @property
1082    def value(self):
1083        '''Returns ASE_ID, ASE_STATE, and ASE Additional Parameters.'''
1084
1085        if self.state == self.State.CODEC_CONFIGURED:
1086            codec_specific_configuration_bytes = bytes(
1087                self.codec_specific_configuration
1088            )
1089            additional_parameters = (
1090                struct.pack(
1091                    '<BBBH',
1092                    self.preferred_framing,
1093                    self.preferred_phy,
1094                    self.preferred_retransmission_number,
1095                    self.preferred_max_transport_latency,
1096                )
1097                + self.supported_presentation_delay_min.to_bytes(3, 'little')
1098                + self.supported_presentation_delay_max.to_bytes(3, 'little')
1099                + self.preferred_presentation_delay_min.to_bytes(3, 'little')
1100                + self.preferred_presentation_delay_max.to_bytes(3, 'little')
1101                + bytes(self.codec_id)
1102                + bytes([len(codec_specific_configuration_bytes)])
1103                + codec_specific_configuration_bytes
1104            )
1105        elif self.state == self.State.QOS_CONFIGURED:
1106            additional_parameters = (
1107                bytes([self.cig_id, self.cis_id])
1108                + self.sdu_interval.to_bytes(3, 'little')
1109                + struct.pack(
1110                    '<BBHBH',
1111                    self.framing,
1112                    self.phy,
1113                    self.max_sdu,
1114                    self.retransmission_number,
1115                    self.max_transport_latency,
1116                )
1117                + self.presentation_delay.to_bytes(3, 'little')
1118            )
1119        elif self.state in (
1120            self.State.ENABLING,
1121            self.State.STREAMING,
1122            self.State.DISABLING,
1123        ):
1124            additional_parameters = (
1125                bytes([self.cig_id, self.cis_id, len(self.metadata)]) + self.metadata
1126            )
1127        else:
1128            additional_parameters = b''
1129
1130        return bytes([self.ase_id, self.state]) + additional_parameters
1131
1132    @value.setter
1133    def value(self, _new_value):
1134        # Readonly. Do nothing in the setter.
1135        pass
1136
1137    def on_read(self, _: Optional[device.Connection]) -> bytes:
1138        return self.value
1139
1140    def __str__(self) -> str:
1141        return (
1142            f'AseStateMachine(id={self.ase_id}, role={self.role.name} '
1143            f'state={self._state.name})'
1144        )
1145
1146
1147class AudioStreamControlService(gatt.TemplateService):
1148    UUID = gatt.GATT_AUDIO_STREAM_CONTROL_SERVICE
1149
1150    ase_state_machines: Dict[int, AseStateMachine]
1151    ase_control_point: gatt.Characteristic
1152    _active_client: Optional[device.Connection] = None
1153
1154    def __init__(
1155        self,
1156        device: device.Device,
1157        source_ase_id: Sequence[int] = [],
1158        sink_ase_id: Sequence[int] = [],
1159    ) -> None:
1160        self.device = device
1161        self.ase_state_machines = {
1162            **{
1163                id: AseStateMachine(role=AudioRole.SINK, ase_id=id, service=self)
1164                for id in sink_ase_id
1165            },
1166            **{
1167                id: AseStateMachine(role=AudioRole.SOURCE, ase_id=id, service=self)
1168                for id in source_ase_id
1169            },
1170        }  # ASE state machines, by ASE ID
1171
1172        self.ase_control_point = gatt.Characteristic(
1173            uuid=gatt.GATT_ASE_CONTROL_POINT_CHARACTERISTIC,
1174            properties=gatt.Characteristic.Properties.WRITE
1175            | gatt.Characteristic.Properties.WRITE_WITHOUT_RESPONSE
1176            | gatt.Characteristic.Properties.NOTIFY,
1177            permissions=gatt.Characteristic.Permissions.WRITEABLE,
1178            value=gatt.CharacteristicValue(write=self.on_write_ase_control_point),
1179        )
1180
1181        super().__init__([self.ase_control_point, *self.ase_state_machines.values()])
1182
1183    def on_operation(self, opcode: ASE_Operation.Opcode, ase_id: int, args):
1184        if ase := self.ase_state_machines.get(ase_id):
1185            handler = getattr(ase, 'on_' + opcode.name.lower())
1186            return (ase_id, *handler(*args))
1187        else:
1188            return (ase_id, AseResponseCode.INVALID_ASE_ID, AseReasonCode.NONE)
1189
1190    def _on_client_disconnected(self, _reason: int) -> None:
1191        for ase in self.ase_state_machines.values():
1192            ase.state = AseStateMachine.State.IDLE
1193        self._active_client = None
1194
1195    def on_write_ase_control_point(self, connection, data):
1196        if not self._active_client and connection:
1197            self._active_client = connection
1198            connection.once('disconnection', self._on_client_disconnected)
1199
1200        operation = ASE_Operation.from_bytes(data)
1201        responses = []
1202        logger.debug(f'*** ASCS Write {operation} ***')
1203
1204        if operation.op_code == ASE_Operation.Opcode.CONFIG_CODEC:
1205            for ase_id, *args in zip(
1206                operation.ase_id,
1207                operation.target_latency,
1208                operation.target_phy,
1209                operation.codec_id,
1210                operation.codec_specific_configuration,
1211            ):
1212                responses.append(self.on_operation(operation.op_code, ase_id, args))
1213        elif operation.op_code == ASE_Operation.Opcode.CONFIG_QOS:
1214            for ase_id, *args in zip(
1215                operation.ase_id,
1216                operation.cig_id,
1217                operation.cis_id,
1218                operation.sdu_interval,
1219                operation.framing,
1220                operation.phy,
1221                operation.max_sdu,
1222                operation.retransmission_number,
1223                operation.max_transport_latency,
1224                operation.presentation_delay,
1225            ):
1226                responses.append(self.on_operation(operation.op_code, ase_id, args))
1227        elif operation.op_code in (
1228            ASE_Operation.Opcode.ENABLE,
1229            ASE_Operation.Opcode.UPDATE_METADATA,
1230        ):
1231            for ase_id, *args in zip(
1232                operation.ase_id,
1233                operation.metadata,
1234            ):
1235                responses.append(self.on_operation(operation.op_code, ase_id, args))
1236        elif operation.op_code in (
1237            ASE_Operation.Opcode.RECEIVER_START_READY,
1238            ASE_Operation.Opcode.DISABLE,
1239            ASE_Operation.Opcode.RECEIVER_STOP_READY,
1240            ASE_Operation.Opcode.RELEASE,
1241        ):
1242            for ase_id in operation.ase_id:
1243                responses.append(self.on_operation(operation.op_code, ase_id, []))
1244
1245        control_point_notification = bytes(
1246            [operation.op_code, len(responses)]
1247        ) + b''.join(map(bytes, responses))
1248        self.device.abort_on(
1249            'flush',
1250            self.device.notify_subscribers(
1251                self.ase_control_point, control_point_notification
1252            ),
1253        )
1254
1255        for ase_id, *_ in responses:
1256            if ase := self.ase_state_machines.get(ase_id):
1257                self.device.abort_on(
1258                    'flush',
1259                    self.device.notify_subscribers(ase, ase.value),
1260                )
1261
1262
1263# -----------------------------------------------------------------------------
1264# Client
1265# -----------------------------------------------------------------------------
1266class PublishedAudioCapabilitiesServiceProxy(gatt_client.ProfileServiceProxy):
1267    SERVICE_CLASS = PublishedAudioCapabilitiesService
1268
1269    sink_pac: Optional[gatt_client.CharacteristicProxy] = None
1270    sink_audio_locations: Optional[gatt_client.CharacteristicProxy] = None
1271    source_pac: Optional[gatt_client.CharacteristicProxy] = None
1272    source_audio_locations: Optional[gatt_client.CharacteristicProxy] = None
1273    available_audio_contexts: gatt_client.CharacteristicProxy
1274    supported_audio_contexts: gatt_client.CharacteristicProxy
1275
1276    def __init__(self, service_proxy: gatt_client.ServiceProxy):
1277        self.service_proxy = service_proxy
1278
1279        self.available_audio_contexts = service_proxy.get_characteristics_by_uuid(
1280            gatt.GATT_AVAILABLE_AUDIO_CONTEXTS_CHARACTERISTIC
1281        )[0]
1282        self.supported_audio_contexts = service_proxy.get_characteristics_by_uuid(
1283            gatt.GATT_SUPPORTED_AUDIO_CONTEXTS_CHARACTERISTIC
1284        )[0]
1285
1286        if characteristics := service_proxy.get_characteristics_by_uuid(
1287            gatt.GATT_SINK_PAC_CHARACTERISTIC
1288        ):
1289            self.sink_pac = characteristics[0]
1290
1291        if characteristics := service_proxy.get_characteristics_by_uuid(
1292            gatt.GATT_SOURCE_PAC_CHARACTERISTIC
1293        ):
1294            self.source_pac = characteristics[0]
1295
1296        if characteristics := service_proxy.get_characteristics_by_uuid(
1297            gatt.GATT_SINK_AUDIO_LOCATION_CHARACTERISTIC
1298        ):
1299            self.sink_audio_locations = characteristics[0]
1300
1301        if characteristics := service_proxy.get_characteristics_by_uuid(
1302            gatt.GATT_SOURCE_AUDIO_LOCATION_CHARACTERISTIC
1303        ):
1304            self.source_audio_locations = characteristics[0]
1305
1306
1307class AudioStreamControlServiceProxy(gatt_client.ProfileServiceProxy):
1308    SERVICE_CLASS = AudioStreamControlService
1309
1310    sink_ase: List[gatt_client.CharacteristicProxy]
1311    source_ase: List[gatt_client.CharacteristicProxy]
1312    ase_control_point: gatt_client.CharacteristicProxy
1313
1314    def __init__(self, service_proxy: gatt_client.ServiceProxy):
1315        self.service_proxy = service_proxy
1316
1317        self.sink_ase = service_proxy.get_characteristics_by_uuid(
1318            gatt.GATT_SINK_ASE_CHARACTERISTIC
1319        )
1320        self.source_ase = service_proxy.get_characteristics_by_uuid(
1321            gatt.GATT_SOURCE_ASE_CHARACTERISTIC
1322        )
1323        self.ase_control_point = service_proxy.get_characteristics_by_uuid(
1324            gatt.GATT_ASE_CONTROL_POINT_CHARACTERISTIC
1325        )[0]
1326