• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18from __future__ import annotations
19import asyncio
20import struct
21import time
22import logging
23import enum
24import warnings
25from pyee import EventEmitter
26from typing import (
27    Any,
28    Awaitable,
29    Dict,
30    Type,
31    Tuple,
32    Optional,
33    Callable,
34    List,
35    AsyncGenerator,
36    Iterable,
37    Union,
38    SupportsBytes,
39    cast,
40)
41
42from .core import (
43    BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE,
44    InvalidStateError,
45    ProtocolError,
46    name_or_number,
47)
48from .a2dp import (
49    A2DP_CODEC_TYPE_NAMES,
50    A2DP_MPEG_2_4_AAC_CODEC_TYPE,
51    A2DP_NON_A2DP_CODEC_TYPE,
52    A2DP_SBC_CODEC_TYPE,
53    AacMediaCodecInformation,
54    SbcMediaCodecInformation,
55    VendorSpecificMediaCodecInformation,
56)
57from . import sdp, device, l2cap
58from .colors import color
59
60# -----------------------------------------------------------------------------
61# Logging
62# -----------------------------------------------------------------------------
63logger = logging.getLogger(__name__)
64
65
66# -----------------------------------------------------------------------------
67# Constants
68# -----------------------------------------------------------------------------
69# fmt: off
70# pylint: disable=line-too-long
71
72AVDTP_PSM = 0x0019
73
74AVDTP_DEFAULT_RTX_SIG_TIMER = 5  # Seconds
75
76# Signal Identifiers (AVDTP spec - 8.5 Signal Command Set)
77AVDTP_DISCOVER             = 0x01
78AVDTP_GET_CAPABILITIES     = 0x02
79AVDTP_SET_CONFIGURATION    = 0x03
80AVDTP_GET_CONFIGURATION    = 0x04
81AVDTP_RECONFIGURE          = 0x05
82AVDTP_OPEN                 = 0x06
83AVDTP_START                = 0x07
84AVDTP_CLOSE                = 0x08
85AVDTP_SUSPEND              = 0x09
86AVDTP_ABORT                = 0x0A
87AVDTP_SECURITY_CONTROL     = 0x0B
88AVDTP_GET_ALL_CAPABILITIES = 0x0C
89AVDTP_DELAYREPORT          = 0x0D
90
91AVDTP_SIGNAL_NAMES = {
92    AVDTP_DISCOVER:             'AVDTP_DISCOVER',
93    AVDTP_GET_CAPABILITIES:     'AVDTP_GET_CAPABILITIES',
94    AVDTP_SET_CONFIGURATION:    'AVDTP_SET_CONFIGURATION',
95    AVDTP_GET_CONFIGURATION:    'AVDTP_GET_CONFIGURATION',
96    AVDTP_RECONFIGURE:          'AVDTP_RECONFIGURE',
97    AVDTP_OPEN:                 'AVDTP_OPEN',
98    AVDTP_START:                'AVDTP_START',
99    AVDTP_CLOSE:                'AVDTP_CLOSE',
100    AVDTP_SUSPEND:              'AVDTP_SUSPEND',
101    AVDTP_ABORT:                'AVDTP_ABORT',
102    AVDTP_SECURITY_CONTROL:     'AVDTP_SECURITY_CONTROL',
103    AVDTP_GET_ALL_CAPABILITIES: 'AVDTP_GET_ALL_CAPABILITIES',
104    AVDTP_DELAYREPORT:          'AVDTP_DELAYREPORT'
105}
106
107AVDTP_SIGNAL_IDENTIFIERS = {
108    'AVDTP_DISCOVER':             AVDTP_DISCOVER,
109    'AVDTP_GET_CAPABILITIES':     AVDTP_GET_CAPABILITIES,
110    'AVDTP_SET_CONFIGURATION':    AVDTP_SET_CONFIGURATION,
111    'AVDTP_GET_CONFIGURATION':    AVDTP_GET_CONFIGURATION,
112    'AVDTP_RECONFIGURE':          AVDTP_RECONFIGURE,
113    'AVDTP_OPEN':                 AVDTP_OPEN,
114    'AVDTP_START':                AVDTP_START,
115    'AVDTP_CLOSE':                AVDTP_CLOSE,
116    'AVDTP_SUSPEND':              AVDTP_SUSPEND,
117    'AVDTP_ABORT':                AVDTP_ABORT,
118    'AVDTP_SECURITY_CONTROL':     AVDTP_SECURITY_CONTROL,
119    'AVDTP_GET_ALL_CAPABILITIES': AVDTP_GET_ALL_CAPABILITIES,
120    'AVDTP_DELAYREPORT':          AVDTP_DELAYREPORT
121}
122
123# Error codes (AVDTP spec - 8.20.6.2 ERROR_CODE tables)
124AVDTP_BAD_HEADER_FORMAT_ERROR          = 0x01
125AVDTP_BAD_LENGTH_ERROR                 = 0x11
126AVDTP_BAD_ACP_SEID_ERROR               = 0x12
127AVDTP_SEP_IN_USE_ERROR                 = 0x13
128AVDTP_SEP_NOT_IN_USE_ERROR             = 0x14
129AVDTP_BAD_SERV_CATEGORY_ERROR          = 0x17
130AVDTP_BAD_PAYLOAD_FORMAT_ERROR         = 0x18
131AVDTP_NOT_SUPPORTED_COMMAND_ERROR      = 0x19
132AVDTP_INVALID_CAPABILITIES_ERROR       = 0x1A
133AVDTP_BAD_RECOVERY_TYPE_ERROR          = 0x22
134AVDTP_BAD_MEDIA_TRANSPORT_FORMAT_ERROR = 0x23
135AVDTP_BAD_RECOVERY_FORMAT_ERROR        = 0x25
136AVDTP_BAD_ROHC_FORMAT_ERROR            = 0x26
137AVDTP_BAD_CP_FORMAT_ERROR              = 0x27
138AVDTP_BAD_MULTIPLEXING_FORMAT_ERROR    = 0x28
139AVDTP_UNSUPPORTED_CONFIGURATION_ERROR  = 0x29
140AVDTP_BAD_STATE_ERROR                  = 0x31
141
142AVDTP_ERROR_NAMES = {
143    AVDTP_BAD_HEADER_FORMAT_ERROR:          'AVDTP_BAD_HEADER_FORMAT_ERROR',
144    AVDTP_BAD_LENGTH_ERROR:                 'AVDTP_BAD_LENGTH_ERROR',
145    AVDTP_BAD_ACP_SEID_ERROR:               'AVDTP_BAD_ACP_SEID_ERROR',
146    AVDTP_SEP_IN_USE_ERROR:                 'AVDTP_SEP_IN_USE_ERROR',
147    AVDTP_SEP_NOT_IN_USE_ERROR:             'AVDTP_SEP_NOT_IN_USE_ERROR',
148    AVDTP_BAD_SERV_CATEGORY_ERROR:          'AVDTP_BAD_SERV_CATEGORY_ERROR',
149    AVDTP_BAD_PAYLOAD_FORMAT_ERROR:         'AVDTP_BAD_PAYLOAD_FORMAT_ERROR',
150    AVDTP_NOT_SUPPORTED_COMMAND_ERROR:      'AVDTP_NOT_SUPPORTED_COMMAND_ERROR',
151    AVDTP_INVALID_CAPABILITIES_ERROR:       'AVDTP_INVALID_CAPABILITIES_ERROR',
152    AVDTP_BAD_RECOVERY_TYPE_ERROR:          'AVDTP_BAD_RECOVERY_TYPE_ERROR',
153    AVDTP_BAD_MEDIA_TRANSPORT_FORMAT_ERROR: 'AVDTP_BAD_MEDIA_TRANSPORT_FORMAT_ERROR',
154    AVDTP_BAD_RECOVERY_FORMAT_ERROR:        'AVDTP_BAD_RECOVERY_FORMAT_ERROR',
155    AVDTP_BAD_ROHC_FORMAT_ERROR:            'AVDTP_BAD_ROHC_FORMAT_ERROR',
156    AVDTP_BAD_CP_FORMAT_ERROR:              'AVDTP_BAD_CP_FORMAT_ERROR',
157    AVDTP_BAD_MULTIPLEXING_FORMAT_ERROR:    'AVDTP_BAD_MULTIPLEXING_FORMAT_ERROR',
158    AVDTP_UNSUPPORTED_CONFIGURATION_ERROR:  'AVDTP_UNSUPPORTED_CONFIGURATION_ERROR',
159    AVDTP_BAD_STATE_ERROR:                  'AVDTP_BAD_STATE_ERROR'
160}
161
162AVDTP_AUDIO_MEDIA_TYPE      = 0x00
163AVDTP_VIDEO_MEDIA_TYPE      = 0x01
164AVDTP_MULTIMEDIA_MEDIA_TYPE = 0x02
165
166AVDTP_MEDIA_TYPE_NAMES = {
167    AVDTP_AUDIO_MEDIA_TYPE:      'AVDTP_AUDIO_MEDIA_TYPE',
168    AVDTP_VIDEO_MEDIA_TYPE:      'AVDTP_VIDEO_MEDIA_TYPE',
169    AVDTP_MULTIMEDIA_MEDIA_TYPE: 'AVDTP_MULTIMEDIA_MEDIA_TYPE'
170}
171
172# TSEP (AVDTP spec - 8.20.3 Stream End-point Type, Source or Sink (TSEP))
173AVDTP_TSEP_SRC = 0x00
174AVDTP_TSEP_SNK = 0x01
175
176AVDTP_TSEP_NAMES = {
177    AVDTP_TSEP_SRC: 'AVDTP_TSEP_SRC',
178    AVDTP_TSEP_SNK: 'AVDTP_TSEP_SNK'
179}
180
181# Service Categories (AVDTP spec - Table 8.47: Service Category information element field values)
182AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY    = 0x01
183AVDTP_REPORTING_SERVICE_CATEGORY          = 0x02
184AVDTP_RECOVERY_SERVICE_CATEGORY           = 0x03
185AVDTP_CONTENT_PROTECTION_SERVICE_CATEGORY = 0x04
186AVDTP_HEADER_COMPRESSION_SERVICE_CATEGORY = 0x05
187AVDTP_MULTIPLEXING_SERVICE_CATEGORY       = 0x06
188AVDTP_MEDIA_CODEC_SERVICE_CATEGORY        = 0x07
189AVDTP_DELAY_REPORTING_SERVICE_CATEGORY    = 0x08
190
191AVDTP_SERVICE_CATEGORY_NAMES = {
192    AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY:    'AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY',
193    AVDTP_REPORTING_SERVICE_CATEGORY:          'AVDTP_REPORTING_SERVICE_CATEGORY',
194    AVDTP_RECOVERY_SERVICE_CATEGORY:           'AVDTP_RECOVERY_SERVICE_CATEGORY',
195    AVDTP_CONTENT_PROTECTION_SERVICE_CATEGORY: 'AVDTP_CONTENT_PROTECTION_SERVICE_CATEGORY',
196    AVDTP_HEADER_COMPRESSION_SERVICE_CATEGORY: 'AVDTP_HEADER_COMPRESSION_SERVICE_CATEGORY',
197    AVDTP_MULTIPLEXING_SERVICE_CATEGORY:       'AVDTP_MULTIPLEXING_SERVICE_CATEGORY',
198    AVDTP_MEDIA_CODEC_SERVICE_CATEGORY:        'AVDTP_MEDIA_CODEC_SERVICE_CATEGORY',
199    AVDTP_DELAY_REPORTING_SERVICE_CATEGORY:    'AVDTP_DELAY_REPORTING_SERVICE_CATEGORY'
200}
201
202# States (AVDTP spec - 9.1 State Definitions)
203AVDTP_IDLE_STATE       = 0x00
204AVDTP_CONFIGURED_STATE = 0x01
205AVDTP_OPEN_STATE       = 0x02
206AVDTP_STREAMING_STATE  = 0x03
207AVDTP_CLOSING_STATE    = 0x04
208AVDTP_ABORTING_STATE   = 0x05
209
210AVDTP_STATE_NAMES = {
211    AVDTP_IDLE_STATE:       'AVDTP_IDLE_STATE',
212    AVDTP_CONFIGURED_STATE: 'AVDTP_CONFIGURED_STATE',
213    AVDTP_OPEN_STATE:       'AVDTP_OPEN_STATE',
214    AVDTP_STREAMING_STATE:  'AVDTP_STREAMING_STATE',
215    AVDTP_CLOSING_STATE:    'AVDTP_CLOSING_STATE',
216    AVDTP_ABORTING_STATE:   'AVDTP_ABORTING_STATE'
217}
218
219# fmt: on
220# pylint: enable=line-too-long
221# pylint: disable=invalid-name
222
223
224# -----------------------------------------------------------------------------
225async def find_avdtp_service_with_sdp_client(
226    sdp_client: sdp.Client,
227) -> Optional[Tuple[int, int]]:
228    '''
229    Find an AVDTP service, using a connected SDP client, and return its version,
230    or None if none is found
231    '''
232
233    # Search for services with an Audio Sink service class
234    search_result = await sdp_client.search_attributes(
235        [BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE],
236        [sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID],
237    )
238    for attribute_list in search_result:
239        profile_descriptor_list = sdp.ServiceAttribute.find_attribute_in_list(
240            attribute_list, sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID
241        )
242        if profile_descriptor_list:
243            for profile_descriptor in profile_descriptor_list.value:
244                if (
245                    profile_descriptor.type == sdp.DataElement.SEQUENCE
246                    and len(profile_descriptor.value) >= 2
247                ):
248                    avdtp_version_major = profile_descriptor.value[1].value >> 8
249                    avdtp_version_minor = profile_descriptor.value[1].value & 0xFF
250                    return (avdtp_version_major, avdtp_version_minor)
251    return None
252
253
254# -----------------------------------------------------------------------------
255async def find_avdtp_service_with_connection(
256    connection: device.Connection,
257) -> Optional[Tuple[int, int]]:
258    '''
259    Find an AVDTP service, for a connection, and return its version,
260    or None if none is found
261    '''
262
263    sdp_client = sdp.Client(connection)
264    await sdp_client.connect()
265    service_version = await find_avdtp_service_with_sdp_client(sdp_client)
266    await sdp_client.disconnect()
267
268    return service_version
269
270
271# -----------------------------------------------------------------------------
272class RealtimeClock:
273    def now(self) -> float:
274        return time.time()
275
276    async def sleep(self, duration: float) -> None:
277        await asyncio.sleep(duration)
278
279
280# -----------------------------------------------------------------------------
281class MediaPacket:
282    @staticmethod
283    def from_bytes(data: bytes) -> MediaPacket:
284        version = (data[0] >> 6) & 0x03
285        padding = (data[0] >> 5) & 0x01
286        extension = (data[0] >> 4) & 0x01
287        csrc_count = data[0] & 0x0F
288        marker = (data[1] >> 7) & 0x01
289        payload_type = data[1] & 0x7F
290        sequence_number = struct.unpack_from('>H', data, 2)[0]
291        timestamp = struct.unpack_from('>I', data, 4)[0]
292        ssrc = struct.unpack_from('>I', data, 8)[0]
293        csrc_list = [
294            struct.unpack_from('>I', data, 12 + i)[0] for i in range(csrc_count)
295        ]
296        payload = data[12 + csrc_count * 4 :]
297
298        return MediaPacket(
299            version,
300            padding,
301            extension,
302            marker,
303            sequence_number,
304            timestamp,
305            ssrc,
306            csrc_list,
307            payload_type,
308            payload,
309        )
310
311    def __init__(
312        self,
313        version: int,
314        padding: int,
315        extension: int,
316        marker: int,
317        sequence_number: int,
318        timestamp: int,
319        ssrc: int,
320        csrc_list: List[int],
321        payload_type: int,
322        payload: bytes,
323    ) -> None:
324        self.version = version
325        self.padding = padding
326        self.extension = extension
327        self.marker = marker
328        self.sequence_number = sequence_number & 0xFFFF
329        self.timestamp = timestamp & 0xFFFFFFFF
330        self.ssrc = ssrc
331        self.csrc_list = csrc_list
332        self.payload_type = payload_type
333        self.payload = payload
334
335    def __bytes__(self) -> bytes:
336        header = bytes(
337            [
338                self.version << 6
339                | self.padding << 5
340                | self.extension << 4
341                | len(self.csrc_list),
342                self.marker << 7 | self.payload_type,
343            ]
344        ) + struct.pack(
345            '>HII',
346            self.sequence_number,
347            self.timestamp,
348            self.ssrc,
349        )
350        for csrc in self.csrc_list:
351            header += struct.pack('>I', csrc)
352        return header + self.payload
353
354    def __str__(self) -> str:
355        return (
356            f'RTP(v={self.version},'
357            f'p={self.padding},'
358            f'x={self.extension},'
359            f'm={self.marker},'
360            f'pt={self.payload_type},'
361            f'sn={self.sequence_number},'
362            f'ts={self.timestamp},'
363            f'ssrc={self.ssrc},'
364            f'csrcs={self.csrc_list},'
365            f'payload_size={len(self.payload)})'
366        )
367
368
369# -----------------------------------------------------------------------------
370class MediaPacketPump:
371    pump_task: Optional[asyncio.Task]
372
373    def __init__(
374        self, packets: AsyncGenerator, clock: RealtimeClock = RealtimeClock()
375    ) -> None:
376        self.packets = packets
377        self.clock = clock
378        self.pump_task = None
379
380    async def start(self, rtp_channel: l2cap.ClassicChannel) -> None:
381        async def pump_packets():
382            start_time = 0
383            start_timestamp = 0
384
385            try:
386                logger.debug('pump starting')
387                async for packet in self.packets:
388                    # Capture the timestamp of the first packet
389                    if start_time == 0:
390                        start_time = self.clock.now()
391                        start_timestamp = packet.timestamp_seconds
392
393                    # Wait until we can send
394                    when = start_time + (packet.timestamp_seconds - start_timestamp)
395                    now = self.clock.now()
396                    if when > now:
397                        delay = when - now
398                        logger.debug(f'waiting for {delay}')
399                        await self.clock.sleep(delay)
400
401                    # Emit
402                    rtp_channel.send_pdu(bytes(packet))
403                    logger.debug(
404                        f'{color(">>> sending RTP packet:", "green")} {packet}'
405                    )
406            except asyncio.exceptions.CancelledError:
407                logger.debug('pump canceled')
408
409        # Pump packets
410        self.pump_task = asyncio.create_task(pump_packets())
411
412    async def stop(self) -> None:
413        # Stop the pump
414        if self.pump_task:
415            self.pump_task.cancel()
416            await self.pump_task
417            self.pump_task = None
418
419
420# -----------------------------------------------------------------------------
421class MessageAssembler:
422    message: Optional[bytes]
423
424    def __init__(self, callback: Callable[[int, Message], Any]) -> None:
425        self.callback = callback
426        self.reset()
427
428    def reset(self) -> None:
429        self.transaction_label = 0
430        self.message = None
431        self.message_type = Message.MessageType.COMMAND
432        self.signal_identifier = 0
433        self.number_of_signal_packets = 0
434        self.packet_count = 0
435
436    def on_pdu(self, pdu: bytes) -> None:
437        self.packet_count += 1
438
439        transaction_label = pdu[0] >> 4
440        packet_type = Protocol.PacketType((pdu[0] >> 2) & 3)
441        message_type = Message.MessageType(pdu[0] & 3)
442
443        logger.debug(
444            f'transaction_label={transaction_label}, '
445            f'packet_type={packet_type.name}, '
446            f'message_type={message_type.name}'
447        )
448        if packet_type in (
449            Protocol.PacketType.SINGLE_PACKET,
450            Protocol.PacketType.START_PACKET,
451        ):
452            if self.message is not None:
453                # The previous message has not been terminated
454                logger.warning(
455                    'received a start or single packet when expecting an end or '
456                    'continuation'
457                )
458                self.reset()
459
460            self.transaction_label = transaction_label
461            self.signal_identifier = pdu[1] & 0x3F
462            self.message_type = message_type
463
464            if packet_type == Protocol.PacketType.SINGLE_PACKET:
465                self.message = pdu[2:]
466                self.on_message_complete()
467            else:
468                self.number_of_signal_packets = pdu[2]
469                self.message = pdu[3:]
470        elif packet_type in (
471            Protocol.PacketType.CONTINUE_PACKET,
472            Protocol.PacketType.END_PACKET,
473        ):
474            if self.packet_count == 0:
475                logger.warning('unexpected continuation')
476                return
477
478            if transaction_label != self.transaction_label:
479                logger.warning(
480                    f'transaction label mismatch: expected {self.transaction_label}, '
481                    f'received {transaction_label}'
482                )
483                return
484
485            if message_type != self.message_type:
486                logger.warning(
487                    f'message type mismatch: expected {self.message_type}, '
488                    f'received {message_type}'
489                )
490                return
491
492            self.message = (self.message or b'') + pdu[1:]
493
494            if packet_type == Protocol.PacketType.END_PACKET:
495                if self.packet_count != self.number_of_signal_packets:
496                    logger.warning(
497                        'incomplete fragmented message: '
498                        f'expected {self.number_of_signal_packets} packets, '
499                        f'received {self.packet_count}'
500                    )
501                    self.reset()
502                    return
503
504                self.on_message_complete()
505            else:
506                if self.packet_count > self.number_of_signal_packets:
507                    logger.warning(
508                        'too many packets: '
509                        f'expected {self.number_of_signal_packets}, '
510                        f'received {self.packet_count}'
511                    )
512                    self.reset()
513                    return
514
515    def on_message_complete(self) -> None:
516        message = Message.create(
517            self.signal_identifier, self.message_type, self.message or b''
518        )
519        try:
520            self.callback(self.transaction_label, message)
521        except Exception as error:
522            logger.exception(color(f'!!! exception in callback: {error}', 'red'))
523
524        self.reset()
525
526
527# -----------------------------------------------------------------------------
528class ServiceCapabilities:
529    @staticmethod
530    def create(
531        service_category: int, service_capabilities_bytes: bytes
532    ) -> ServiceCapabilities:
533        # Select the appropriate subclass
534        cls: Type[ServiceCapabilities]
535        if service_category == AVDTP_MEDIA_CODEC_SERVICE_CATEGORY:
536            cls = MediaCodecCapabilities
537        else:
538            cls = ServiceCapabilities
539
540        # Create an instance and initialize it
541        instance = cls.__new__(cls)
542        instance.service_category = service_category
543        instance.service_capabilities_bytes = service_capabilities_bytes
544        instance.init_from_bytes()
545
546        return instance
547
548    @staticmethod
549    def parse_capabilities(payload: bytes) -> List[ServiceCapabilities]:
550        capabilities = []
551        while payload:
552            service_category = payload[0]
553            length_of_service_capabilities = payload[1]
554            service_capabilities_bytes = payload[2 : 2 + length_of_service_capabilities]
555            capabilities.append(
556                ServiceCapabilities.create(service_category, service_capabilities_bytes)
557            )
558
559            payload = payload[2 + length_of_service_capabilities :]
560
561        return capabilities
562
563    @staticmethod
564    def serialize_capabilities(capabilities: Iterable[ServiceCapabilities]) -> bytes:
565        serialized = b''
566        for item in capabilities:
567            serialized += (
568                bytes([item.service_category, len(item.service_capabilities_bytes)])
569                + item.service_capabilities_bytes
570            )
571        return serialized
572
573    def init_from_bytes(self) -> None:
574        pass
575
576    def __init__(
577        self, service_category: int, service_capabilities_bytes: bytes = b''
578    ) -> None:
579        self.service_category = service_category
580        self.service_capabilities_bytes = service_capabilities_bytes
581
582    def to_string(self, details: List[str] = []) -> str:
583        attributes = ','.join(
584            [name_or_number(AVDTP_SERVICE_CATEGORY_NAMES, self.service_category)]
585            + details
586        )
587        return f'ServiceCapabilities({attributes})'
588
589    def __str__(self) -> str:
590        if self.service_capabilities_bytes:
591            details = [self.service_capabilities_bytes.hex()]
592        else:
593            details = []
594        return self.to_string(details)
595
596
597# -----------------------------------------------------------------------------
598class MediaCodecCapabilities(ServiceCapabilities):
599    media_codec_information: Union[bytes, SupportsBytes]
600    media_type: int
601    media_codec_type: int
602
603    def init_from_bytes(self) -> None:
604        self.media_type = self.service_capabilities_bytes[0]
605        self.media_codec_type = self.service_capabilities_bytes[1]
606        self.media_codec_information = self.service_capabilities_bytes[2:]
607
608        if self.media_codec_type == A2DP_SBC_CODEC_TYPE:
609            self.media_codec_information = SbcMediaCodecInformation.from_bytes(
610                self.media_codec_information
611            )
612        elif self.media_codec_type == A2DP_MPEG_2_4_AAC_CODEC_TYPE:
613            self.media_codec_information = AacMediaCodecInformation.from_bytes(
614                self.media_codec_information
615            )
616        elif self.media_codec_type == A2DP_NON_A2DP_CODEC_TYPE:
617            self.media_codec_information = (
618                VendorSpecificMediaCodecInformation.from_bytes(
619                    self.media_codec_information
620                )
621            )
622
623    def __init__(
624        self,
625        media_type: int,
626        media_codec_type: int,
627        media_codec_information: Union[bytes, SupportsBytes],
628    ) -> None:
629        super().__init__(
630            AVDTP_MEDIA_CODEC_SERVICE_CATEGORY,
631            bytes([media_type, media_codec_type]) + bytes(media_codec_information),
632        )
633        self.media_type = media_type
634        self.media_codec_type = media_codec_type
635        self.media_codec_information = media_codec_information
636
637    def __str__(self) -> str:
638        codec_info = (
639            self.media_codec_information.hex()
640            if isinstance(self.media_codec_information, bytes)
641            else str(self.media_codec_information)
642        )
643
644        details = [
645            f'media_type={name_or_number(AVDTP_MEDIA_TYPE_NAMES, self.media_type)}',
646            f'codec={name_or_number(A2DP_CODEC_TYPE_NAMES, self.media_codec_type)}',
647            f'codec_info={codec_info}',
648        ]
649        return self.to_string(details)
650
651
652# -----------------------------------------------------------------------------
653class EndPointInfo:
654    @staticmethod
655    def from_bytes(payload: bytes) -> EndPointInfo:
656        return EndPointInfo(
657            payload[0] >> 2, payload[0] >> 1 & 1, payload[1] >> 4, payload[1] >> 3 & 1
658        )
659
660    def __bytes__(self) -> bytes:
661        return bytes(
662            [self.seid << 2 | self.in_use << 1, self.media_type << 4 | self.tsep << 3]
663        )
664
665    def __init__(self, seid: int, in_use: int, media_type: int, tsep: int) -> None:
666        self.seid = seid
667        self.in_use = in_use
668        self.media_type = media_type
669        self.tsep = tsep
670
671
672# -----------------------------------------------------------------------------
673class Message:  # pylint:disable=attribute-defined-outside-init
674    class MessageType(enum.IntEnum):
675        COMMAND = 0
676        GENERAL_REJECT = 1
677        RESPONSE_ACCEPT = 2
678        RESPONSE_REJECT = 3
679
680    # Subclasses, by signal identifier and message type
681    subclasses: Dict[int, Dict[int, Type[Message]]] = {}
682    message_type: MessageType
683    signal_identifier: int
684
685    @staticmethod
686    def subclass(subclass):
687        # Infer the signal identifier and message subtype from the class name
688        name = subclass.__name__
689        if name == 'General_Reject':
690            subclass.signal_identifier = 0
691            signal_identifier_str = None
692            message_type = Message.MessageType.COMMAND
693        elif name.endswith('_Command'):
694            signal_identifier_str = name[:-8]
695            message_type = Message.MessageType.COMMAND
696        elif name.endswith('_Response'):
697            signal_identifier_str = name[:-9]
698            message_type = Message.MessageType.RESPONSE_ACCEPT
699        elif name.endswith('_Reject'):
700            signal_identifier_str = name[:-7]
701            message_type = Message.MessageType.RESPONSE_REJECT
702        else:
703            raise ValueError('invalid class name')
704
705        subclass.message_type = message_type
706
707        if signal_identifier_str is not None:
708            for name, signal_identifier in AVDTP_SIGNAL_IDENTIFIERS.items():
709                if name.lower().endswith(signal_identifier_str.lower()):
710                    subclass.signal_identifier = signal_identifier
711                    break
712
713            # Register the subclass
714            Message.subclasses.setdefault(subclass.signal_identifier, {})[
715                subclass.message_type
716            ] = subclass
717
718        return subclass
719
720    # Factory method to create a subclass based on the signal identifier and message
721    # type
722    @staticmethod
723    def create(
724        signal_identifier: int, message_type: MessageType, payload: bytes
725    ) -> Message:
726        # Look for a registered subclass
727        subclasses = Message.subclasses.get(signal_identifier)
728        if subclasses:
729            subclass = subclasses.get(message_type)
730            if subclass:
731                instance = subclass.__new__(subclass)
732                instance.payload = payload
733                instance.init_from_payload()
734                return instance
735
736        # Instantiate the appropriate class based on the message type
737        if message_type == Message.MessageType.RESPONSE_REJECT:
738            # Assume a simple reject message
739            instance = Simple_Reject(payload)
740            instance.init_from_payload()
741        else:
742            instance = Message(payload)
743        instance.signal_identifier = signal_identifier
744        instance.message_type = message_type
745        return instance
746
747    def init_from_payload(self) -> None:
748        pass
749
750    def __init__(self, payload: bytes = b'') -> None:
751        self.payload = payload
752
753    def to_string(self, details: Union[str, Iterable[str]]) -> str:
754        base = color(
755            f'{name_or_number(AVDTP_SIGNAL_NAMES, self.signal_identifier)}_'
756            f'{self.message_type.name}',
757            'yellow',
758        )
759
760        if details:
761            if isinstance(details, str):
762                return f'{base}: {details}'
763
764            return (
765                base
766                + ':\n'
767                + '\n'.join(['  ' + color(detail, 'cyan') for detail in details])
768            )
769
770        return base
771
772    def __str__(self) -> str:
773        return self.to_string(self.payload.hex())
774
775
776# -----------------------------------------------------------------------------
777class Simple_Command(Message):
778    '''
779    Command message with just one seid
780    '''
781
782    def init_from_payload(self):
783        self.acp_seid = self.payload[0] >> 2
784
785    def __init__(self, seid):
786        super().__init__(payload=bytes([seid << 2]))
787        self.acp_seid = seid
788
789    def __str__(self) -> str:
790        return self.to_string([f'ACP SEID: {self.acp_seid}'])
791
792
793# -----------------------------------------------------------------------------
794class Simple_Reject(Message):
795    '''
796    Reject messages with just an error code
797    '''
798
799    def init_from_payload(self):
800        self.error_code = self.payload[0]
801
802    def __init__(self, error_code):
803        super().__init__(payload=bytes([error_code]))
804        self.error_code = error_code
805
806    def __str__(self) -> str:
807        details = [f'error_code: {name_or_number(AVDTP_ERROR_NAMES, self.error_code)}']
808        return self.to_string(details)
809
810
811# -----------------------------------------------------------------------------
812@Message.subclass
813class Discover_Command(Message):
814    '''
815    See Bluetooth AVDTP spec - 8.6.1 Stream End Point Discovery Command
816    '''
817
818
819# -----------------------------------------------------------------------------
820@Message.subclass
821class Discover_Response(Message):
822    '''
823    See Bluetooth AVDTP spec - 8.6.2 Stream End Point Discovery Response
824    '''
825
826    endpoints: List[EndPointInfo]
827
828    def init_from_payload(self):
829        self.endpoints = []
830        endpoint_count = len(self.payload) // 2
831        for i in range(endpoint_count):
832            self.endpoints.append(
833                EndPointInfo.from_bytes(self.payload[i * 2 : (i + 1) * 2])
834            )
835
836    def __init__(self, endpoints):
837        super().__init__(payload=b''.join([bytes(endpoint) for endpoint in endpoints]))
838        self.endpoints = endpoints
839
840    def __str__(self) -> str:
841        details = []
842        for endpoint in self.endpoints:
843            details.extend(
844                # pylint: disable=line-too-long
845                [
846                    f'ACP SEID: {endpoint.seid}',
847                    f'  in_use:     {endpoint.in_use}',
848                    f'  media_type: {name_or_number(AVDTP_MEDIA_TYPE_NAMES, endpoint.media_type)}',
849                    f'  tsep:       {name_or_number(AVDTP_TSEP_NAMES, endpoint.tsep)}',
850                ]
851            )
852        return self.to_string(details)
853
854
855# -----------------------------------------------------------------------------
856@Message.subclass
857class Get_Capabilities_Command(Simple_Command):
858    '''
859    See Bluetooth AVDTP spec - 8.7.1 Get Capabilities Command
860    '''
861
862
863# -----------------------------------------------------------------------------
864@Message.subclass
865class Get_Capabilities_Response(Message):
866    '''
867    See Bluetooth AVDTP spec - 8.7.2 Get All Capabilities Response
868    '''
869
870    def init_from_payload(self):
871        self.capabilities = ServiceCapabilities.parse_capabilities(self.payload)
872
873    def __init__(self, capabilities):
874        super().__init__(
875            payload=ServiceCapabilities.serialize_capabilities(capabilities)
876        )
877        self.capabilities = capabilities
878
879    def __str__(self) -> str:
880        details = [str(capability) for capability in self.capabilities]
881        return self.to_string(details)
882
883
884# -----------------------------------------------------------------------------
885@Message.subclass
886class Get_Capabilities_Reject(Simple_Reject):
887    '''
888    See Bluetooth AVDTP spec - 8.7.3 Get Capabilities Reject
889    '''
890
891
892# -----------------------------------------------------------------------------
893@Message.subclass
894class Get_All_Capabilities_Command(Get_Capabilities_Command):
895    '''
896    See Bluetooth AVDTP spec - 8.8.1 Get All Capabilities Command
897    '''
898
899
900# -----------------------------------------------------------------------------
901@Message.subclass
902class Get_All_Capabilities_Response(Get_Capabilities_Response):
903    '''
904    See Bluetooth AVDTP spec - 8.8.2 Get All Capabilities Response
905    '''
906
907
908# -----------------------------------------------------------------------------
909@Message.subclass
910class Get_All_Capabilities_Reject(Simple_Reject):
911    '''
912    See Bluetooth AVDTP spec - 8.8.3 Get All Capabilities Reject
913    '''
914
915
916# -----------------------------------------------------------------------------
917@Message.subclass
918class Set_Configuration_Command(Message):
919    '''
920    See Bluetooth AVDTP spec - 8.9.1 Set Configuration Command
921    '''
922
923    def init_from_payload(self):
924        self.acp_seid = self.payload[0] >> 2
925        self.int_seid = self.payload[1] >> 2
926        self.capabilities = ServiceCapabilities.parse_capabilities(self.payload[2:])
927
928    def __init__(
929        self, acp_seid: int, int_seid: int, capabilities: Iterable[ServiceCapabilities]
930    ) -> None:
931        super().__init__(
932            payload=bytes([acp_seid << 2, int_seid << 2])
933            + ServiceCapabilities.serialize_capabilities(capabilities)
934        )
935        self.acp_seid = acp_seid
936        self.int_seid = int_seid
937        self.capabilities = capabilities
938
939    def __str__(self) -> str:
940        details = [f'ACP SEID: {self.acp_seid}', f'INT SEID: {self.int_seid}'] + [
941            str(capability) for capability in self.capabilities
942        ]
943        return self.to_string(details)
944
945
946# -----------------------------------------------------------------------------
947@Message.subclass
948class Set_Configuration_Response(Message):
949    '''
950    See Bluetooth AVDTP spec - 8.9.2 Set Configuration Response
951    '''
952
953
954# -----------------------------------------------------------------------------
955@Message.subclass
956class Set_Configuration_Reject(Message):
957    '''
958    See Bluetooth AVDTP spec - 8.9.3 Set Configuration Reject
959    '''
960
961    def init_from_payload(self):
962        self.service_category = self.payload[0]
963        self.error_code = self.payload[1]
964
965    def __init__(self, service_category, error_code):
966        super().__init__(payload=bytes([service_category, error_code]))
967        self.service_category = service_category
968        self.error_code = error_code
969
970    def __str__(self) -> str:
971        details = [
972            (
973                'service_category: '
974                f'{name_or_number(AVDTP_SERVICE_CATEGORY_NAMES, self.service_category)}'
975            ),
976            (
977                'error_code:       '
978                f'{name_or_number(AVDTP_ERROR_NAMES, self.error_code)}'
979            ),
980        ]
981        return self.to_string(details)
982
983
984# -----------------------------------------------------------------------------
985@Message.subclass
986class Get_Configuration_Command(Simple_Command):
987    '''
988    See Bluetooth AVDTP spec - 8.10.1 Get Configuration Command
989    '''
990
991
992# -----------------------------------------------------------------------------
993@Message.subclass
994class Get_Configuration_Response(Message):
995    '''
996    See Bluetooth AVDTP spec - 8.10.2 Get Configuration Response
997    '''
998
999    def init_from_payload(self):
1000        self.capabilities = ServiceCapabilities.parse_capabilities(self.payload)
1001
1002    def __init__(self, capabilities: Iterable[ServiceCapabilities]) -> None:
1003        super().__init__(
1004            payload=ServiceCapabilities.serialize_capabilities(capabilities)
1005        )
1006        self.capabilities = capabilities
1007
1008    def __str__(self) -> str:
1009        details = [str(capability) for capability in self.capabilities]
1010        return self.to_string(details)
1011
1012
1013# -----------------------------------------------------------------------------
1014@Message.subclass
1015class Get_Configuration_Reject(Simple_Reject):
1016    '''
1017    See Bluetooth AVDTP spec - 8.10.3 Get Configuration Reject
1018    '''
1019
1020
1021# -----------------------------------------------------------------------------
1022@Message.subclass
1023class Reconfigure_Command(Message):
1024    '''
1025    See Bluetooth AVDTP spec - 8.11.1 Reconfigure Command
1026    '''
1027
1028    def init_from_payload(self):
1029        # pylint: disable=attribute-defined-outside-init
1030        self.acp_seid = self.payload[0] >> 2
1031        self.capabilities = ServiceCapabilities.parse_capabilities(self.payload[1:])
1032
1033    def __str__(self) -> str:
1034        details = [
1035            f'ACP SEID: {self.acp_seid}',
1036        ] + [str(capability) for capability in self.capabilities]
1037        return self.to_string(details)
1038
1039
1040# -----------------------------------------------------------------------------
1041@Message.subclass
1042class Reconfigure_Response(Message):
1043    '''
1044    See Bluetooth AVDTP spec - 8.11.2 Reconfigure Response
1045    '''
1046
1047
1048# -----------------------------------------------------------------------------
1049@Message.subclass
1050class Reconfigure_Reject(Set_Configuration_Reject):
1051    '''
1052    See Bluetooth AVDTP spec - 8.11.3 Reconfigure Reject
1053    '''
1054
1055
1056# -----------------------------------------------------------------------------
1057@Message.subclass
1058class Open_Command(Simple_Command):
1059    '''
1060    See Bluetooth AVDTP spec - 8.12.1 Open Stream Command
1061    '''
1062
1063
1064# -----------------------------------------------------------------------------
1065@Message.subclass
1066class Open_Response(Message):
1067    '''
1068    See Bluetooth AVDTP spec - 8.12.2 Open Stream Response
1069    '''
1070
1071
1072# -----------------------------------------------------------------------------
1073@Message.subclass
1074class Open_Reject(Simple_Reject):
1075    '''
1076    See Bluetooth AVDTP spec - 8.12.3 Open Stream Reject
1077    '''
1078
1079
1080# -----------------------------------------------------------------------------
1081@Message.subclass
1082class Start_Command(Message):
1083    '''
1084    See Bluetooth AVDTP spec - 8.13.1 Start Stream Command
1085    '''
1086
1087    def init_from_payload(self):
1088        self.acp_seids = [x >> 2 for x in self.payload]
1089
1090    def __init__(self, seids: Iterable[int]) -> None:
1091        super().__init__(payload=bytes([seid << 2 for seid in seids]))
1092        self.acp_seids = seids
1093
1094    def __str__(self) -> str:
1095        return self.to_string([f'ACP SEIDs: {self.acp_seids}'])
1096
1097
1098# -----------------------------------------------------------------------------
1099@Message.subclass
1100class Start_Response(Message):
1101    '''
1102    See Bluetooth AVDTP spec - 8.13.2 Start Stream Response
1103    '''
1104
1105
1106# -----------------------------------------------------------------------------
1107@Message.subclass
1108class Start_Reject(Message):
1109    '''
1110    See Bluetooth AVDTP spec - 8.13.3 Set Configuration Reject
1111    '''
1112
1113    def init_from_payload(self):
1114        self.acp_seid = self.payload[0] >> 2
1115        self.error_code = self.payload[1]
1116
1117    def __init__(self, acp_seid, error_code):
1118        super().__init__(payload=bytes([acp_seid << 2, error_code]))
1119        self.acp_seid = acp_seid
1120        self.error_code = error_code
1121
1122    def __str__(self) -> str:
1123        details = [
1124            f'acp_seid:   {self.acp_seid}',
1125            f'error_code: {name_or_number(AVDTP_ERROR_NAMES, self.error_code)}',
1126        ]
1127        return self.to_string(details)
1128
1129
1130# -----------------------------------------------------------------------------
1131@Message.subclass
1132class Close_Command(Simple_Command):
1133    '''
1134    See Bluetooth AVDTP spec - 8.14.1 Close Stream Command
1135    '''
1136
1137
1138# -----------------------------------------------------------------------------
1139@Message.subclass
1140class Close_Response(Message):
1141    '''
1142    See Bluetooth AVDTP spec - 8.14.2 Close Stream Response
1143    '''
1144
1145
1146# -----------------------------------------------------------------------------
1147@Message.subclass
1148class Close_Reject(Simple_Reject):
1149    '''
1150    See Bluetooth AVDTP spec - 8.14.3 Close Stream Reject
1151    '''
1152
1153
1154# -----------------------------------------------------------------------------
1155@Message.subclass
1156class Suspend_Command(Start_Command):
1157    '''
1158    See Bluetooth AVDTP spec - 8.15.1 Suspend Command
1159    '''
1160
1161
1162# -----------------------------------------------------------------------------
1163@Message.subclass
1164class Suspend_Response(Message):
1165    '''
1166    See Bluetooth AVDTP spec - 8.15.2 Suspend Response
1167    '''
1168
1169
1170# -----------------------------------------------------------------------------
1171@Message.subclass
1172class Suspend_Reject(Start_Reject):
1173    '''
1174    See Bluetooth AVDTP spec - 8.15.3 Suspend Reject
1175    '''
1176
1177
1178# -----------------------------------------------------------------------------
1179@Message.subclass
1180class Abort_Command(Simple_Command):
1181    '''
1182    See Bluetooth AVDTP spec - 8.16.1 Abort Command
1183    '''
1184
1185
1186# -----------------------------------------------------------------------------
1187@Message.subclass
1188class Abort_Response(Message):
1189    '''
1190    See Bluetooth AVDTP spec - 8.16.2 Abort Response
1191    '''
1192
1193
1194# -----------------------------------------------------------------------------
1195@Message.subclass
1196class Security_Control_Command(Message):
1197    '''
1198    See Bluetooth AVDTP spec - 8.17.1 Security Control Command
1199    '''
1200
1201
1202# -----------------------------------------------------------------------------
1203@Message.subclass
1204class Security_Control_Response(Message):
1205    '''
1206    See Bluetooth AVDTP spec - 8.17.2 Security Control Response
1207    '''
1208
1209
1210# -----------------------------------------------------------------------------
1211@Message.subclass
1212class Security_Control_Reject(Simple_Reject):
1213    '''
1214    See Bluetooth AVDTP spec - 8.17.3 Security Control Reject
1215    '''
1216
1217
1218# -----------------------------------------------------------------------------
1219@Message.subclass
1220class General_Reject(Message):
1221    '''
1222    See Bluetooth AVDTP spec - 8.18 General Reject
1223    '''
1224
1225    def to_string(self, details):
1226        return color('GENERAL_REJECT', 'yellow')
1227
1228
1229# -----------------------------------------------------------------------------
1230@Message.subclass
1231class DelayReport_Command(Message):
1232    '''
1233    See Bluetooth AVDTP spec - 8.19.1 Delay Report Command
1234    '''
1235
1236    def init_from_payload(self):
1237        # pylint: disable=attribute-defined-outside-init
1238        self.acp_seid = self.payload[0] >> 2
1239        self.delay = (self.payload[1] << 8) | (self.payload[2])
1240
1241    def __str__(self) -> str:
1242        return self.to_string([f'ACP_SEID: {self.acp_seid}', f'delay:    {self.delay}'])
1243
1244
1245# -----------------------------------------------------------------------------
1246@Message.subclass
1247class DelayReport_Response(Message):
1248    '''
1249    See Bluetooth AVDTP spec - 8.19.2 Delay Report Response
1250    '''
1251
1252
1253# -----------------------------------------------------------------------------
1254@Message.subclass
1255class DelayReport_Reject(Simple_Reject):
1256    '''
1257    See Bluetooth AVDTP spec - 8.19.3 Delay Report Reject
1258    '''
1259
1260
1261# -----------------------------------------------------------------------------
1262class Protocol(EventEmitter):
1263    local_endpoints: List[LocalStreamEndPoint]
1264    remote_endpoints: Dict[int, DiscoveredStreamEndPoint]
1265    streams: Dict[int, Stream]
1266    transaction_results: List[Optional[asyncio.Future[Message]]]
1267    channel_connector: Callable[[], Awaitable[l2cap.ClassicChannel]]
1268
1269    class PacketType(enum.IntEnum):
1270        SINGLE_PACKET = 0
1271        START_PACKET = 1
1272        CONTINUE_PACKET = 2
1273        END_PACKET = 3
1274
1275    @staticmethod
1276    def packet_type_name(packet_type):
1277        return name_or_number(Protocol.PACKET_TYPE_NAMES, packet_type)
1278
1279    @staticmethod
1280    async def connect(
1281        connection: device.Connection, version: Tuple[int, int] = (1, 3)
1282    ) -> Protocol:
1283        channel = await connection.create_l2cap_channel(
1284            spec=l2cap.ClassicChannelSpec(psm=AVDTP_PSM)
1285        )
1286        protocol = Protocol(channel, version)
1287
1288        return protocol
1289
1290    def __init__(
1291        self, l2cap_channel: l2cap.ClassicChannel, version: Tuple[int, int] = (1, 3)
1292    ) -> None:
1293        super().__init__()
1294        self.l2cap_channel = l2cap_channel
1295        self.version = version
1296        self.rtx_sig_timer = AVDTP_DEFAULT_RTX_SIG_TIMER
1297        self.message_assembler = MessageAssembler(self.on_message)
1298        self.transaction_results = [None] * 16  # Futures for up to 16 transactions
1299        self.transaction_semaphore = asyncio.Semaphore(16)
1300        self.transaction_count = 0
1301        self.channel_acceptor = None
1302        self.local_endpoints = []  # Local endpoints, with contiguous seid values
1303        self.remote_endpoints = {}  # Remote stream endpoints, by seid
1304        self.streams = {}  # Streams, by seid
1305
1306        # Register to receive PDUs from the channel
1307        l2cap_channel.sink = self.on_pdu
1308        l2cap_channel.on('open', self.on_l2cap_channel_open)
1309        l2cap_channel.on('close', self.on_l2cap_channel_close)
1310
1311    def get_local_endpoint_by_seid(self, seid: int) -> Optional[LocalStreamEndPoint]:
1312        if 0 < seid <= len(self.local_endpoints):
1313            return self.local_endpoints[seid - 1]
1314
1315        return None
1316
1317    def add_source(
1318        self, codec_capabilities: MediaCodecCapabilities, packet_pump: MediaPacketPump
1319    ) -> LocalSource:
1320        seid = len(self.local_endpoints) + 1
1321        source = LocalSource(self, seid, codec_capabilities, packet_pump)
1322        self.local_endpoints.append(source)
1323
1324        return source
1325
1326    def add_sink(self, codec_capabilities: MediaCodecCapabilities) -> LocalSink:
1327        seid = len(self.local_endpoints) + 1
1328        sink = LocalSink(self, seid, codec_capabilities)
1329        self.local_endpoints.append(sink)
1330
1331        return sink
1332
1333    async def create_stream(
1334        self, source: LocalStreamEndPoint, sink: StreamEndPointProxy
1335    ) -> Stream:
1336        # Check that the source isn't already used in a stream
1337        if source.in_use:
1338            raise InvalidStateError('source already in use')
1339
1340        # Create or reuse a new stream to associate the source and the sink
1341        if source.seid in self.streams:
1342            stream = self.streams[source.seid]
1343        else:
1344            stream = Stream(self, source, sink)
1345            self.streams[source.seid] = stream
1346
1347        # The stream can now be configured
1348        await stream.configure()
1349
1350        return stream
1351
1352    async def discover_remote_endpoints(self) -> Iterable[DiscoveredStreamEndPoint]:
1353        self.remote_endpoints = {}
1354
1355        response: Discover_Response = await self.send_command(Discover_Command())
1356        for endpoint_entry in response.endpoints:
1357            logger.debug(
1358                f'getting endpoint capabilities for endpoint {endpoint_entry.seid}'
1359            )
1360            get_capabilities_response = await self.get_capabilities(endpoint_entry.seid)
1361            endpoint = DiscoveredStreamEndPoint(
1362                self,
1363                endpoint_entry.seid,
1364                endpoint_entry.media_type,
1365                endpoint_entry.tsep,
1366                endpoint_entry.in_use,
1367                get_capabilities_response.capabilities,
1368            )
1369            self.remote_endpoints[endpoint_entry.seid] = endpoint
1370
1371        return self.remote_endpoints.values()
1372
1373    def find_remote_sink_by_codec(
1374        self, media_type: int, codec_type: int
1375    ) -> Optional[DiscoveredStreamEndPoint]:
1376        for endpoint in self.remote_endpoints.values():
1377            if (
1378                not endpoint.in_use
1379                and endpoint.media_type == media_type
1380                and endpoint.tsep == AVDTP_TSEP_SNK
1381            ):
1382                has_media_transport = False
1383                has_codec = False
1384                for capabilities in endpoint.capabilities:
1385                    if (
1386                        capabilities.service_category
1387                        == AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY
1388                    ):
1389                        has_media_transport = True
1390                    elif (
1391                        capabilities.service_category
1392                        == AVDTP_MEDIA_CODEC_SERVICE_CATEGORY
1393                    ):
1394                        codec_capabilities = cast(MediaCodecCapabilities, capabilities)
1395                        if (
1396                            codec_capabilities.media_type == AVDTP_AUDIO_MEDIA_TYPE
1397                            and codec_capabilities.media_codec_type == codec_type
1398                        ):
1399                            has_codec = True
1400                if has_media_transport and has_codec:
1401                    return endpoint
1402
1403        return None
1404
1405    def on_pdu(self, pdu: bytes) -> None:
1406        self.message_assembler.on_pdu(pdu)
1407
1408    def on_message(self, transaction_label: int, message: Message) -> None:
1409        logger.debug(
1410            f'{color("<<< Received AVDTP message", "magenta")}: '
1411            f'[{transaction_label}] {message}'
1412        )
1413
1414        # Check that the identifier is not reserved
1415        if message.signal_identifier == 0:
1416            logger.warning('!!! reserved signal identifier')
1417            return
1418
1419        # Check that the identifier is valid
1420        if (
1421            message.signal_identifier < 0
1422            or message.signal_identifier > AVDTP_DELAYREPORT
1423        ):
1424            logger.warning('!!! invalid signal identifier')
1425            self.send_message(transaction_label, General_Reject())
1426
1427        if message.message_type == Message.MessageType.COMMAND:
1428            # Command
1429            signal_name = (
1430                AVDTP_SIGNAL_NAMES.get(message.signal_identifier, "")
1431                .replace("AVDTP_", "")
1432                .lower()
1433            )
1434            handler_name = f'on_{signal_name}_command'
1435            handler = getattr(self, handler_name, None)
1436            if handler:
1437                try:
1438                    response = handler(message)
1439                    self.send_message(transaction_label, response)
1440                except Exception as error:
1441                    logger.warning(
1442                        f'{color("!!! Exception in handler:", "red")} {error}'
1443                    )
1444            else:
1445                logger.warning('unhandled command')
1446        else:
1447            # Response, look for a pending transaction with the same label
1448            transaction_result = self.transaction_results[transaction_label]
1449            if transaction_result is None:
1450                logger.warning(color('!!! no pending transaction for label', 'red'))
1451                return
1452
1453            transaction_result.set_result(message)
1454            self.transaction_results[transaction_label] = None
1455            self.transaction_semaphore.release()
1456
1457    def on_l2cap_connection(self, channel):
1458        # Forward the channel to the endpoint that's expecting it
1459        if self.channel_acceptor is None:
1460            logger.warning(color('!!! l2cap connection with no acceptor', 'red'))
1461            return
1462        self.channel_acceptor.on_l2cap_connection(channel)
1463
1464    def on_l2cap_channel_open(self):
1465        logger.debug(color('<<< L2CAP channel open', 'magenta'))
1466        self.emit('open')
1467
1468    def on_l2cap_channel_close(self):
1469        logger.debug(color('<<< L2CAP channel close', 'magenta'))
1470        self.emit('close')
1471
1472    def send_message(self, transaction_label: int, message: Message) -> None:
1473        logger.debug(
1474            f'{color(">>> Sending AVDTP message", "magenta")}: '
1475            f'[{transaction_label}] {message}'
1476        )
1477        max_fragment_size = (
1478            self.l2cap_channel.peer_mtu - 3
1479        )  # Enough space for a 3-byte start packet header
1480        payload = message.payload
1481        if len(payload) + 2 <= self.l2cap_channel.peer_mtu:
1482            # Fits in a single packet
1483            packet_type = self.PacketType.SINGLE_PACKET
1484        else:
1485            packet_type = self.PacketType.START_PACKET
1486
1487        done = False
1488        while not done:
1489            first_header_byte = (
1490                transaction_label << 4 | packet_type << 2 | message.message_type
1491            )
1492
1493            if packet_type == self.PacketType.SINGLE_PACKET:
1494                header = bytes([first_header_byte, message.signal_identifier])
1495            elif packet_type == self.PacketType.START_PACKET:
1496                packet_count = (
1497                    max_fragment_size - 1 + len(payload)
1498                ) // max_fragment_size
1499                header = bytes(
1500                    [first_header_byte, message.signal_identifier, packet_count]
1501                )
1502            else:
1503                header = bytes([first_header_byte])
1504
1505            # Send one packet
1506            self.l2cap_channel.send_pdu(header + payload[:max_fragment_size])
1507
1508            # Prepare for the next packet
1509            payload = payload[max_fragment_size:]
1510            if payload:
1511                packet_type = (
1512                    self.PacketType.CONTINUE_PACKET
1513                    if len(payload) > max_fragment_size
1514                    else self.PacketType.END_PACKET
1515                )
1516            else:
1517                done = True
1518
1519    async def send_command(self, command: Message):
1520        # TODO: support timeouts
1521        # Send the command
1522        (transaction_label, transaction_result) = await self.start_transaction()
1523        self.send_message(transaction_label, command)
1524
1525        # Wait for the response
1526        response = await transaction_result
1527
1528        # Check for errors
1529        if response.message_type in (
1530            Message.MessageType.GENERAL_REJECT,
1531            Message.MessageType.RESPONSE_REJECT,
1532        ):
1533            assert hasattr(response, 'error_code')
1534            raise ProtocolError(response.error_code, 'avdtp')
1535
1536        return response
1537
1538    async def start_transaction(self) -> Tuple[int, asyncio.Future[Message]]:
1539        # Wait until we can start a new transaction
1540        await self.transaction_semaphore.acquire()
1541
1542        # Look for the next free entry to store the transaction result
1543        for i in range(16):
1544            transaction_label = (self.transaction_count + i) % 16
1545            if self.transaction_results[transaction_label] is None:
1546                transaction_result = asyncio.get_running_loop().create_future()
1547                self.transaction_results[transaction_label] = transaction_result
1548                self.transaction_count += 1
1549                return (transaction_label, transaction_result)
1550
1551        assert False  # Should never reach this
1552
1553    async def get_capabilities(self, seid: int) -> Union[
1554        Get_Capabilities_Response,
1555        Get_All_Capabilities_Response,
1556    ]:
1557        if self.version > (1, 2):
1558            return await self.send_command(Get_All_Capabilities_Command(seid))
1559
1560        return await self.send_command(Get_Capabilities_Command(seid))
1561
1562    async def set_configuration(
1563        self, acp_seid: int, int_seid: int, capabilities: Iterable[ServiceCapabilities]
1564    ) -> Set_Configuration_Response:
1565        return await self.send_command(
1566            Set_Configuration_Command(acp_seid, int_seid, capabilities)
1567        )
1568
1569    async def get_configuration(self, seid: int) -> Get_Configuration_Response:
1570        response = await self.send_command(Get_Configuration_Command(seid))
1571        return response.capabilities
1572
1573    async def open(self, seid: int) -> Open_Response:
1574        return await self.send_command(Open_Command(seid))
1575
1576    async def start(self, seids: Iterable[int]) -> Start_Response:
1577        return await self.send_command(Start_Command(seids))
1578
1579    async def suspend(self, seids: Iterable[int]) -> Suspend_Response:
1580        return await self.send_command(Suspend_Command(seids))
1581
1582    async def close(self, seid: int) -> Close_Response:
1583        return await self.send_command(Close_Command(seid))
1584
1585    async def abort(self, seid: int) -> Abort_Response:
1586        return await self.send_command(Abort_Command(seid))
1587
1588    def on_discover_command(self, _command):
1589        endpoint_infos = [
1590            EndPointInfo(endpoint.seid, 0, endpoint.media_type, endpoint.tsep)
1591            for endpoint in self.local_endpoints
1592        ]
1593        return Discover_Response(endpoint_infos)
1594
1595    def on_get_capabilities_command(self, command):
1596        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1597        if endpoint is None:
1598            return Get_Capabilities_Reject(AVDTP_BAD_ACP_SEID_ERROR)
1599
1600        return Get_Capabilities_Response(endpoint.capabilities)
1601
1602    def on_get_all_capabilities_command(self, command):
1603        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1604        if endpoint is None:
1605            return Get_All_Capabilities_Reject(AVDTP_BAD_ACP_SEID_ERROR)
1606
1607        return Get_All_Capabilities_Response(endpoint.capabilities)
1608
1609    def on_set_configuration_command(self, command):
1610        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1611        if endpoint is None:
1612            return Set_Configuration_Reject(AVDTP_BAD_ACP_SEID_ERROR)
1613
1614        # Check that the local endpoint isn't in use
1615        if endpoint.in_use:
1616            return Set_Configuration_Reject(AVDTP_SEP_IN_USE_ERROR)
1617
1618        # Create a stream object for the pair of endpoints
1619        stream = Stream(self, endpoint, StreamEndPointProxy(self, command.int_seid))
1620        self.streams[command.acp_seid] = stream
1621
1622        result = stream.on_set_configuration_command(command.capabilities)
1623        return result or Set_Configuration_Response()
1624
1625    def on_get_configuration_command(self, command):
1626        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1627        if endpoint is None:
1628            return Get_Configuration_Reject(AVDTP_BAD_ACP_SEID_ERROR)
1629        if endpoint.stream is None:
1630            return Get_Configuration_Reject(AVDTP_BAD_STATE_ERROR)
1631
1632        return endpoint.stream.on_get_configuration_command()
1633
1634    def on_reconfigure_command(self, command):
1635        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1636        if endpoint is None:
1637            return Reconfigure_Reject(0, AVDTP_BAD_ACP_SEID_ERROR)
1638        if endpoint.stream is None:
1639            return Reconfigure_Reject(0, AVDTP_BAD_STATE_ERROR)
1640
1641        result = endpoint.stream.on_reconfigure_command(command.capabilities)
1642        return result or Reconfigure_Response()
1643
1644    def on_open_command(self, command):
1645        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1646        if endpoint is None:
1647            return Open_Reject(AVDTP_BAD_ACP_SEID_ERROR)
1648        if endpoint.stream is None:
1649            return Open_Reject(AVDTP_BAD_STATE_ERROR)
1650
1651        result = endpoint.stream.on_open_command()
1652        return result or Open_Response()
1653
1654    def on_start_command(self, command):
1655        for seid in command.acp_seids:
1656            endpoint = self.get_local_endpoint_by_seid(seid)
1657            if endpoint is None:
1658                return Start_Reject(seid, AVDTP_BAD_ACP_SEID_ERROR)
1659            if endpoint.stream is None:
1660                return Start_Reject(AVDTP_BAD_STATE_ERROR)
1661
1662        # Start all streams
1663        # TODO: deal with partial failures
1664        for seid in command.acp_seids:
1665            endpoint = self.get_local_endpoint_by_seid(seid)
1666            result = endpoint.stream.on_start_command()
1667            if result is not None:
1668                return result
1669
1670        return Start_Response()
1671
1672    def on_suspend_command(self, command):
1673        for seid in command.acp_seids:
1674            endpoint = self.get_local_endpoint_by_seid(seid)
1675            if endpoint is None:
1676                return Suspend_Reject(seid, AVDTP_BAD_ACP_SEID_ERROR)
1677            if endpoint.stream is None:
1678                return Suspend_Reject(seid, AVDTP_BAD_STATE_ERROR)
1679
1680        # Suspend all streams
1681        # TODO: deal with partial failures
1682        for seid in command.acp_seids:
1683            endpoint = self.get_local_endpoint_by_seid(seid)
1684            result = endpoint.stream.on_suspend_command()
1685            if result is not None:
1686                return result
1687
1688        return Suspend_Response()
1689
1690    def on_close_command(self, command):
1691        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1692        if endpoint is None:
1693            return Close_Reject(AVDTP_BAD_ACP_SEID_ERROR)
1694        if endpoint.stream is None:
1695            return Close_Reject(AVDTP_BAD_STATE_ERROR)
1696
1697        result = endpoint.stream.on_close_command()
1698        return result or Close_Response()
1699
1700    def on_abort_command(self, command):
1701        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1702        if endpoint is None or endpoint.stream is None:
1703            return Abort_Response()
1704
1705        endpoint.stream.on_abort_command()
1706        return Abort_Response()
1707
1708    def on_security_control_command(self, command):
1709        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1710        if endpoint is None:
1711            return Security_Control_Reject(AVDTP_BAD_ACP_SEID_ERROR)
1712
1713        result = endpoint.on_security_control_command(command.payload)
1714        return result or Security_Control_Response()
1715
1716    def on_delayreport_command(self, command):
1717        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1718        if endpoint is None:
1719            return DelayReport_Reject(AVDTP_BAD_ACP_SEID_ERROR)
1720
1721        result = endpoint.on_delayreport_command(command.delay)
1722        return result or DelayReport_Response()
1723
1724
1725# -----------------------------------------------------------------------------
1726class Listener(EventEmitter):
1727    servers: Dict[int, Protocol]
1728
1729    @staticmethod
1730    def create_registrar(device: device.Device):
1731        warnings.warn("Please use Listener.for_device()", DeprecationWarning)
1732
1733        def wrapper(handler: Callable[[l2cap.ClassicChannel], None]) -> None:
1734            device.create_l2cap_server(l2cap.ClassicChannelSpec(psm=AVDTP_PSM), handler)
1735
1736        return wrapper
1737
1738    def set_server(self, connection: device.Connection, server: Protocol) -> None:
1739        self.servers[connection.handle] = server
1740
1741    def remove_server(self, connection: device.Connection) -> None:
1742        if connection.handle in self.servers:
1743            del self.servers[connection.handle]
1744
1745    def __init__(self, registrar=None, version=(1, 3)):
1746        super().__init__()
1747        self.version = version
1748        self.servers = {}  # Servers, by connection handle
1749
1750        # Listen for incoming L2CAP connections
1751        if registrar:
1752            warnings.warn("Please use Listener.for_device()", DeprecationWarning)
1753            registrar(self.on_l2cap_connection)
1754
1755    @classmethod
1756    def for_device(
1757        cls, device: device.Device, version: Tuple[int, int] = (1, 3)
1758    ) -> Listener:
1759        listener = Listener(registrar=None, version=version)
1760        l2cap_server = device.create_l2cap_server(
1761            spec=l2cap.ClassicChannelSpec(psm=AVDTP_PSM)
1762        )
1763        l2cap_server.on('connection', listener.on_l2cap_connection)
1764        return listener
1765
1766    def on_l2cap_connection(self, channel: l2cap.ClassicChannel) -> None:
1767        logger.debug(f'{color("<<< incoming L2CAP connection:", "magenta")} {channel}')
1768
1769        if channel.connection.handle in self.servers:
1770            # This is a channel for a stream endpoint
1771            server = self.servers[channel.connection.handle]
1772            server.on_l2cap_connection(channel)
1773        else:
1774            # This is a new command/response channel
1775            def on_channel_open():
1776                logger.debug('setting up new Protocol for the connection')
1777                server = Protocol(channel, self.version)
1778                self.set_server(channel.connection, server)
1779                self.emit('connection', server)
1780
1781            def on_channel_close():
1782                logger.debug('removing Protocol for the connection')
1783                self.remove_server(channel.connection)
1784
1785            channel.on('open', on_channel_open)
1786            channel.on('close', on_channel_close)
1787
1788
1789# -----------------------------------------------------------------------------
1790class Stream:
1791    '''
1792    Pair of a local and a remote stream endpoint that can stream from one to the other
1793    '''
1794
1795    rtp_channel: Optional[l2cap.ClassicChannel]
1796
1797    @staticmethod
1798    def state_name(state: int) -> str:
1799        return name_or_number(AVDTP_STATE_NAMES, state)
1800
1801    def change_state(self, state: int) -> None:
1802        logger.debug(f'{self} state change -> {color(self.state_name(state), "cyan")}')
1803        self.state = state
1804
1805    def send_media_packet(self, packet: MediaPacket) -> None:
1806        assert self.rtp_channel
1807        self.rtp_channel.send_pdu(bytes(packet))
1808
1809    async def configure(self) -> None:
1810        if self.state != AVDTP_IDLE_STATE:
1811            raise InvalidStateError('current state is not IDLE')
1812
1813        await self.remote_endpoint.set_configuration(
1814            self.local_endpoint.seid, self.local_endpoint.configuration
1815        )
1816        self.change_state(AVDTP_CONFIGURED_STATE)
1817
1818    async def open(self) -> None:
1819        if self.state != AVDTP_CONFIGURED_STATE:
1820            raise InvalidStateError('current state is not CONFIGURED')
1821
1822        logger.debug('opening remote endpoint')
1823        await self.remote_endpoint.open()
1824
1825        self.change_state(AVDTP_OPEN_STATE)
1826
1827        # Create a channel for RTP packets
1828        self.rtp_channel = (
1829            await self.protocol.l2cap_channel.connection.create_l2cap_channel(
1830                l2cap.ClassicChannelSpec(psm=AVDTP_PSM)
1831            )
1832        )
1833
1834    async def start(self) -> None:
1835        # Auto-open if needed
1836        if self.state == AVDTP_CONFIGURED_STATE:
1837            await self.open()
1838
1839        if self.state != AVDTP_OPEN_STATE:
1840            raise InvalidStateError('current state is not OPEN')
1841
1842        logger.debug('starting remote endpoint')
1843        await self.remote_endpoint.start()
1844
1845        logger.debug('starting local endpoint')
1846        await self.local_endpoint.start()
1847
1848        self.change_state(AVDTP_STREAMING_STATE)
1849
1850    async def stop(self) -> None:
1851        if self.state != AVDTP_STREAMING_STATE:
1852            raise InvalidStateError('current state is not STREAMING')
1853
1854        logger.debug('stopping local endpoint')
1855        await self.local_endpoint.stop()
1856
1857        logger.debug('stopping remote endpoint')
1858        await self.remote_endpoint.stop()
1859
1860        self.change_state(AVDTP_OPEN_STATE)
1861
1862    async def close(self) -> None:
1863        if self.state not in (AVDTP_OPEN_STATE, AVDTP_STREAMING_STATE):
1864            raise InvalidStateError('current state is not OPEN or STREAMING')
1865
1866        logger.debug('closing local endpoint')
1867        await self.local_endpoint.close()
1868
1869        logger.debug('closing remote endpoint')
1870        await self.remote_endpoint.close()
1871
1872        # Release any channels we may have created
1873        self.change_state(AVDTP_CLOSING_STATE)
1874        if self.rtp_channel:
1875            await self.rtp_channel.disconnect()
1876            self.rtp_channel = None
1877
1878        # Release the endpoint
1879        self.local_endpoint.in_use = 0
1880
1881        self.change_state(AVDTP_IDLE_STATE)
1882
1883    def on_set_configuration_command(self, configuration):
1884        if self.state != AVDTP_IDLE_STATE:
1885            return Set_Configuration_Reject(AVDTP_BAD_STATE_ERROR)
1886
1887        result = self.local_endpoint.on_set_configuration_command(configuration)
1888        if result is not None:
1889            return result
1890
1891        self.change_state(AVDTP_CONFIGURED_STATE)
1892        return None
1893
1894    def on_get_configuration_command(self, configuration):
1895        if self.state not in (
1896            AVDTP_CONFIGURED_STATE,
1897            AVDTP_OPEN_STATE,
1898            AVDTP_STREAMING_STATE,
1899        ):
1900            return Get_Configuration_Reject(AVDTP_BAD_STATE_ERROR)
1901
1902        return self.local_endpoint.on_get_configuration_command(configuration)
1903
1904    def on_reconfigure_command(self, configuration):
1905        if self.state != AVDTP_OPEN_STATE:
1906            return Reconfigure_Reject(AVDTP_BAD_STATE_ERROR)
1907
1908        result = self.local_endpoint.on_reconfigure_command(configuration)
1909        if result is not None:
1910            return result
1911
1912        return None
1913
1914    def on_open_command(self):
1915        if self.state != AVDTP_CONFIGURED_STATE:
1916            return Open_Reject(AVDTP_BAD_STATE_ERROR)
1917
1918        result = self.local_endpoint.on_open_command()
1919        if result is not None:
1920            return result
1921
1922        # Register to accept the next channel
1923        self.protocol.channel_acceptor = self
1924
1925        self.change_state(AVDTP_OPEN_STATE)
1926        return None
1927
1928    def on_start_command(self):
1929        if self.state != AVDTP_OPEN_STATE:
1930            return Open_Reject(AVDTP_BAD_STATE_ERROR)
1931
1932        # Check that we have an RTP channel
1933        if self.rtp_channel is None:
1934            logger.warning('received start command before RTP channel establishment')
1935            return Open_Reject(AVDTP_BAD_STATE_ERROR)
1936
1937        result = self.local_endpoint.on_start_command()
1938        if result is not None:
1939            return result
1940
1941        self.change_state(AVDTP_STREAMING_STATE)
1942        return None
1943
1944    def on_suspend_command(self):
1945        if self.state != AVDTP_STREAMING_STATE:
1946            return Open_Reject(AVDTP_BAD_STATE_ERROR)
1947
1948        result = self.local_endpoint.on_suspend_command()
1949        if result is not None:
1950            return result
1951
1952        self.change_state(AVDTP_OPEN_STATE)
1953        return None
1954
1955    def on_close_command(self):
1956        if self.state not in (AVDTP_OPEN_STATE, AVDTP_STREAMING_STATE):
1957            return Open_Reject(AVDTP_BAD_STATE_ERROR)
1958
1959        result = self.local_endpoint.on_close_command()
1960        if result is not None:
1961            return result
1962
1963        self.change_state(AVDTP_CLOSING_STATE)
1964
1965        if self.rtp_channel is None:
1966            # No channel to release, we're done
1967            self.change_state(AVDTP_IDLE_STATE)
1968        else:
1969            # TODO: set a timer as we wait for the RTP channel to be closed
1970            pass
1971
1972        return None
1973
1974    def on_abort_command(self):
1975        if self.rtp_channel is None:
1976            # No need to wait
1977            self.change_state(AVDTP_IDLE_STATE)
1978        else:
1979            # Wait for the RTP channel to be closed
1980            self.change_state(AVDTP_ABORTING_STATE)
1981
1982    def on_l2cap_connection(self, channel):
1983        logger.debug(color('<<< stream channel connected', 'magenta'))
1984        self.rtp_channel = channel
1985        channel.on('open', self.on_l2cap_channel_open)
1986        channel.on('close', self.on_l2cap_channel_close)
1987
1988        # We don't need more channels
1989        self.protocol.channel_acceptor = None
1990
1991    def on_l2cap_channel_open(self):
1992        logger.debug(color('<<< stream channel open', 'magenta'))
1993        self.local_endpoint.on_rtp_channel_open()
1994
1995    def on_l2cap_channel_close(self):
1996        logger.debug(color('<<< stream channel closed', 'magenta'))
1997        self.local_endpoint.on_rtp_channel_close()
1998        self.local_endpoint.in_use = 0
1999        self.rtp_channel = None
2000
2001        if self.state in (AVDTP_CLOSING_STATE, AVDTP_ABORTING_STATE):
2002            self.change_state(AVDTP_IDLE_STATE)
2003        else:
2004            logger.warning('unexpected channel close while not CLOSING or ABORTING')
2005
2006    def __init__(
2007        self,
2008        protocol: Protocol,
2009        local_endpoint: LocalStreamEndPoint,
2010        remote_endpoint: StreamEndPointProxy,
2011    ) -> None:
2012        '''
2013        remote_endpoint must be a subclass of StreamEndPointProxy
2014
2015        '''
2016        self.protocol = protocol
2017        self.local_endpoint = local_endpoint
2018        self.remote_endpoint = remote_endpoint
2019        self.rtp_channel = None
2020        self.state = AVDTP_IDLE_STATE
2021
2022        local_endpoint.stream = self
2023        local_endpoint.in_use = 1
2024
2025    def __str__(self) -> str:
2026        return (
2027            f'Stream({self.local_endpoint.seid} -> '
2028            f'{self.remote_endpoint.seid} {self.state_name(self.state)})'
2029        )
2030
2031
2032# -----------------------------------------------------------------------------
2033class StreamEndPoint:
2034    def __init__(
2035        self,
2036        seid: int,
2037        media_type: int,
2038        tsep: int,
2039        in_use: int,
2040        capabilities: Iterable[ServiceCapabilities],
2041    ) -> None:
2042        self.seid = seid
2043        self.media_type = media_type
2044        self.tsep = tsep
2045        self.in_use = in_use
2046        self.capabilities = capabilities
2047
2048    def __str__(self) -> str:
2049        media_type = f'{name_or_number(AVDTP_MEDIA_TYPE_NAMES, self.media_type)}'
2050        tsep = f'{name_or_number(AVDTP_TSEP_NAMES, self.tsep)}'
2051        return '\n'.join(
2052            [
2053                'SEP(',
2054                f'  seid={self.seid}',
2055                f'  media_type={media_type}',
2056                f'  tsep={tsep}',
2057                f'  in_use={self.in_use}',
2058                '  capabilities=[',
2059                '\n'.join([f'    {x}' for x in self.capabilities]),
2060                '  ]',
2061                ')',
2062            ]
2063        )
2064
2065
2066# -----------------------------------------------------------------------------
2067class StreamEndPointProxy:
2068    def __init__(self, protocol: Protocol, seid: int) -> None:
2069        self.seid = seid
2070        self.protocol = protocol
2071
2072    async def set_configuration(
2073        self, int_seid: int, configuration: Iterable[ServiceCapabilities]
2074    ) -> Set_Configuration_Response:
2075        return await self.protocol.set_configuration(self.seid, int_seid, configuration)
2076
2077    async def open(self) -> Open_Response:
2078        return await self.protocol.open(self.seid)
2079
2080    async def start(self) -> Start_Response:
2081        return await self.protocol.start([self.seid])
2082
2083    async def stop(self) -> Suspend_Response:
2084        return await self.protocol.suspend([self.seid])
2085
2086    async def close(self) -> Close_Response:
2087        return await self.protocol.close(self.seid)
2088
2089    async def abort(self) -> Abort_Response:
2090        return await self.protocol.abort(self.seid)
2091
2092
2093# -----------------------------------------------------------------------------
2094class DiscoveredStreamEndPoint(StreamEndPoint, StreamEndPointProxy):
2095    def __init__(
2096        self,
2097        protocol: Protocol,
2098        seid: int,
2099        media_type: int,
2100        tsep: int,
2101        in_use: int,
2102        capabilities: Iterable[ServiceCapabilities],
2103    ) -> None:
2104        StreamEndPoint.__init__(self, seid, media_type, tsep, in_use, capabilities)
2105        StreamEndPointProxy.__init__(self, protocol, seid)
2106
2107
2108# -----------------------------------------------------------------------------
2109class LocalStreamEndPoint(StreamEndPoint, EventEmitter):
2110    stream: Optional[Stream]
2111
2112    def __init__(
2113        self,
2114        protocol: Protocol,
2115        seid: int,
2116        media_type: int,
2117        tsep: int,
2118        capabilities: Iterable[ServiceCapabilities],
2119        configuration: Optional[Iterable[ServiceCapabilities]] = None,
2120    ):
2121        StreamEndPoint.__init__(self, seid, media_type, tsep, 0, capabilities)
2122        EventEmitter.__init__(self)
2123        self.protocol = protocol
2124        self.configuration = configuration if configuration is not None else []
2125        self.stream = None
2126
2127    async def start(self):
2128        pass
2129
2130    async def stop(self):
2131        pass
2132
2133    async def close(self):
2134        pass
2135
2136    def on_reconfigure_command(self, command):
2137        pass
2138
2139    def on_set_configuration_command(self, configuration):
2140        logger.debug(
2141            '<<< received configuration: '
2142            f'{",".join([str(capability) for capability in configuration])}'
2143        )
2144        self.configuration = configuration
2145        self.emit('configuration')
2146
2147    def on_get_configuration_command(self):
2148        return Get_Configuration_Response(self.configuration)
2149
2150    def on_open_command(self):
2151        self.emit('open')
2152
2153    def on_start_command(self):
2154        self.emit('start')
2155
2156    def on_suspend_command(self):
2157        self.emit('suspend')
2158
2159    def on_close_command(self):
2160        self.emit('close')
2161
2162    def on_abort_command(self):
2163        self.emit('abort')
2164
2165    def on_rtp_channel_open(self):
2166        self.emit('rtp_channel_open')
2167
2168    def on_rtp_channel_close(self):
2169        self.emit('rtp_channel_close')
2170
2171
2172# -----------------------------------------------------------------------------
2173class LocalSource(LocalStreamEndPoint):
2174    def __init__(
2175        self,
2176        protocol: Protocol,
2177        seid: int,
2178        codec_capabilities: MediaCodecCapabilities,
2179        packet_pump: MediaPacketPump,
2180    ) -> None:
2181        capabilities = [
2182            ServiceCapabilities(AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY),
2183            codec_capabilities,
2184        ]
2185        super().__init__(
2186            protocol,
2187            seid,
2188            codec_capabilities.media_type,
2189            AVDTP_TSEP_SRC,
2190            capabilities,
2191            capabilities,
2192        )
2193        self.packet_pump = packet_pump
2194
2195    async def start(self) -> None:
2196        if self.packet_pump and self.stream and self.stream.rtp_channel:
2197            return await self.packet_pump.start(self.stream.rtp_channel)
2198
2199        self.emit('start')
2200
2201    async def stop(self) -> None:
2202        if self.packet_pump:
2203            return await self.packet_pump.stop()
2204
2205        self.emit('stop')
2206
2207    def on_start_command(self):
2208        asyncio.create_task(self.start())
2209
2210    def on_suspend_command(self):
2211        asyncio.create_task(self.stop())
2212
2213
2214# -----------------------------------------------------------------------------
2215class LocalSink(LocalStreamEndPoint):
2216    def __init__(
2217        self, protocol: Protocol, seid: int, codec_capabilities: MediaCodecCapabilities
2218    ) -> None:
2219        capabilities = [
2220            ServiceCapabilities(AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY),
2221            codec_capabilities,
2222        ]
2223        super().__init__(
2224            protocol,
2225            seid,
2226            codec_capabilities.media_type,
2227            AVDTP_TSEP_SNK,
2228            capabilities,
2229        )
2230
2231    def on_rtp_channel_open(self):
2232        logger.debug(color('<<< RTP channel open', 'magenta'))
2233        self.stream.rtp_channel.sink = self.on_avdtp_packet
2234        super().on_rtp_channel_open()
2235
2236    def on_rtp_channel_close(self):
2237        logger.debug(color('<<< RTP channel close', 'magenta'))
2238        super().on_rtp_channel_close()
2239
2240    def on_avdtp_packet(self, packet):
2241        rtp_packet = MediaPacket.from_bytes(packet)
2242        logger.debug(
2243            f'{color("<<< RTP Packet:", "green")} '
2244            f'{rtp_packet} {rtp_packet.payload[:16].hex()}'
2245        )
2246        self.emit('rtp_packet', rtp_packet)
2247