• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18from __future__ import annotations
19import asyncio
20import struct
21import time
22import logging
23from pyee import EventEmitter
24from typing import Dict, Type
25
26from .core import (
27    BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE,
28    InvalidStateError,
29    ProtocolError,
30    name_or_number,
31)
32from .a2dp import (
33    A2DP_CODEC_TYPE_NAMES,
34    A2DP_MPEG_2_4_AAC_CODEC_TYPE,
35    A2DP_NON_A2DP_CODEC_TYPE,
36    A2DP_SBC_CODEC_TYPE,
37    AacMediaCodecInformation,
38    SbcMediaCodecInformation,
39    VendorSpecificMediaCodecInformation,
40)
41from . import sdp
42from .colors import color
43
44# -----------------------------------------------------------------------------
45# Logging
46# -----------------------------------------------------------------------------
47logger = logging.getLogger(__name__)
48
49
50# -----------------------------------------------------------------------------
51# Constants
52# -----------------------------------------------------------------------------
53# fmt: off
54# pylint: disable=line-too-long
55
# NOTE(review): presumably the L2CAP PSM on which AVDTP channels are opened;
# confirm against the Bluetooth assigned numbers.
AVDTP_PSM = 0x0019

# NOTE(review): presumably the default signaling response timeout; its use is
# not visible in this part of the file.
AVDTP_DEFAULT_RTX_SIG_TIMER = 5  # Seconds
59
# Signal Identifiers (AVDTP spec - 8.5 Signal Command Set)
AVDTP_DISCOVER             = 0x01
AVDTP_GET_CAPABILITIES     = 0x02
AVDTP_SET_CONFIGURATION    = 0x03
AVDTP_GET_CONFIGURATION    = 0x04
AVDTP_RECONFIGURE          = 0x05
AVDTP_OPEN                 = 0x06
AVDTP_START                = 0x07
AVDTP_CLOSE                = 0x08
AVDTP_SUSPEND              = 0x09
AVDTP_ABORT                = 0x0A
AVDTP_SECURITY_CONTROL     = 0x0B
AVDTP_GET_ALL_CAPABILITIES = 0x0C
AVDTP_DELAYREPORT          = 0x0D

# Signal identifier value -> symbolic name (used when formatting messages)
AVDTP_SIGNAL_NAMES = {
    AVDTP_DISCOVER:             'AVDTP_DISCOVER',
    AVDTP_GET_CAPABILITIES:     'AVDTP_GET_CAPABILITIES',
    AVDTP_SET_CONFIGURATION:    'AVDTP_SET_CONFIGURATION',
    AVDTP_GET_CONFIGURATION:    'AVDTP_GET_CONFIGURATION',
    AVDTP_RECONFIGURE:          'AVDTP_RECONFIGURE',
    AVDTP_OPEN:                 'AVDTP_OPEN',
    AVDTP_START:                'AVDTP_START',
    AVDTP_CLOSE:                'AVDTP_CLOSE',
    AVDTP_SUSPEND:              'AVDTP_SUSPEND',
    AVDTP_ABORT:                'AVDTP_ABORT',
    AVDTP_SECURITY_CONTROL:     'AVDTP_SECURITY_CONTROL',
    AVDTP_GET_ALL_CAPABILITIES: 'AVDTP_GET_ALL_CAPABILITIES',
    AVDTP_DELAYREPORT:          'AVDTP_DELAYREPORT'
}

# Symbolic name -> signal identifier value. Message.subclass walks this table,
# in order, to infer a message class's signal identifier from the class name.
AVDTP_SIGNAL_IDENTIFIERS = {
    'AVDTP_DISCOVER':             AVDTP_DISCOVER,
    'AVDTP_GET_CAPABILITIES':     AVDTP_GET_CAPABILITIES,
    'AVDTP_SET_CONFIGURATION':    AVDTP_SET_CONFIGURATION,
    'AVDTP_GET_CONFIGURATION':    AVDTP_GET_CONFIGURATION,
    'AVDTP_RECONFIGURE':          AVDTP_RECONFIGURE,
    'AVDTP_OPEN':                 AVDTP_OPEN,
    'AVDTP_START':                AVDTP_START,
    'AVDTP_CLOSE':                AVDTP_CLOSE,
    'AVDTP_SUSPEND':              AVDTP_SUSPEND,
    'AVDTP_ABORT':                AVDTP_ABORT,
    'AVDTP_SECURITY_CONTROL':     AVDTP_SECURITY_CONTROL,
    'AVDTP_GET_ALL_CAPABILITIES': AVDTP_GET_ALL_CAPABILITIES,
    'AVDTP_DELAYREPORT':          AVDTP_DELAYREPORT
}
106
# Error codes (AVDTP spec - 8.20.6.2 ERROR_CODE tables)
AVDTP_BAD_HEADER_FORMAT_ERROR          = 0x01
AVDTP_BAD_LENGTH_ERROR                 = 0x11
AVDTP_BAD_ACP_SEID_ERROR               = 0x12
AVDTP_SEP_IN_USE_ERROR                 = 0x13
AVDTP_SEP_NOT_IN_USE_ERROR             = 0x14
AVDTP_BAD_SERV_CATEGORY_ERROR          = 0x17
AVDTP_BAD_PAYLOAD_FORMAT_ERROR         = 0x18
AVDTP_NOT_SUPPORTED_COMMAND_ERROR      = 0x19
AVDTP_INVALID_CAPABILITIES_ERROR       = 0x1A
AVDTP_BAD_RECOVERY_TYPE_ERROR          = 0x22
AVDTP_BAD_MEDIA_TRANSPORT_FORMAT_ERROR = 0x23
AVDTP_BAD_RECOVERY_FORMAT_ERROR        = 0x25
AVDTP_BAD_ROHC_FORMAT_ERROR            = 0x26
AVDTP_BAD_CP_FORMAT_ERROR              = 0x27
AVDTP_BAD_MULTIPLEXING_FORMAT_ERROR    = 0x28
AVDTP_UNSUPPORTED_CONFIGURATION_ERROR  = 0x29
AVDTP_BAD_STATE_ERROR                  = 0x31

# Error code value -> symbolic name (used when formatting reject messages)
AVDTP_ERROR_NAMES = {
    AVDTP_BAD_HEADER_FORMAT_ERROR:          'AVDTP_BAD_HEADER_FORMAT_ERROR',
    AVDTP_BAD_LENGTH_ERROR:                 'AVDTP_BAD_LENGTH_ERROR',
    AVDTP_BAD_ACP_SEID_ERROR:               'AVDTP_BAD_ACP_SEID_ERROR',
    AVDTP_SEP_IN_USE_ERROR:                 'AVDTP_SEP_IN_USE_ERROR',
    AVDTP_SEP_NOT_IN_USE_ERROR:             'AVDTP_SEP_NOT_IN_USE_ERROR',
    AVDTP_BAD_SERV_CATEGORY_ERROR:          'AVDTP_BAD_SERV_CATEGORY_ERROR',
    AVDTP_BAD_PAYLOAD_FORMAT_ERROR:         'AVDTP_BAD_PAYLOAD_FORMAT_ERROR',
    AVDTP_NOT_SUPPORTED_COMMAND_ERROR:      'AVDTP_NOT_SUPPORTED_COMMAND_ERROR',
    AVDTP_INVALID_CAPABILITIES_ERROR:       'AVDTP_INVALID_CAPABILITIES_ERROR',
    AVDTP_BAD_RECOVERY_TYPE_ERROR:          'AVDTP_BAD_RECOVERY_TYPE_ERROR',
    AVDTP_BAD_MEDIA_TRANSPORT_FORMAT_ERROR: 'AVDTP_BAD_MEDIA_TRANSPORT_FORMAT_ERROR',
    AVDTP_BAD_RECOVERY_FORMAT_ERROR:        'AVDTP_BAD_RECOVERY_FORMAT_ERROR',
    AVDTP_BAD_ROHC_FORMAT_ERROR:            'AVDTP_BAD_ROHC_FORMAT_ERROR',
    AVDTP_BAD_CP_FORMAT_ERROR:              'AVDTP_BAD_CP_FORMAT_ERROR',
    AVDTP_BAD_MULTIPLEXING_FORMAT_ERROR:    'AVDTP_BAD_MULTIPLEXING_FORMAT_ERROR',
    AVDTP_UNSUPPORTED_CONFIGURATION_ERROR:  'AVDTP_UNSUPPORTED_CONFIGURATION_ERROR',
    AVDTP_BAD_STATE_ERROR:                  'AVDTP_BAD_STATE_ERROR'
}
145
# Media types, as carried in end point information and media codec capabilities
AVDTP_AUDIO_MEDIA_TYPE      = 0x00
AVDTP_VIDEO_MEDIA_TYPE      = 0x01
AVDTP_MULTIMEDIA_MEDIA_TYPE = 0x02

# Media type value -> symbolic name (used when formatting messages)
AVDTP_MEDIA_TYPE_NAMES = {
    AVDTP_AUDIO_MEDIA_TYPE:      'AVDTP_AUDIO_MEDIA_TYPE',
    AVDTP_VIDEO_MEDIA_TYPE:      'AVDTP_VIDEO_MEDIA_TYPE',
    AVDTP_MULTIMEDIA_MEDIA_TYPE: 'AVDTP_MULTIMEDIA_MEDIA_TYPE'
}

# TSEP (AVDTP spec - 8.20.3 Stream End-point Type, Source or Sink (TSEP))
AVDTP_TSEP_SRC = 0x00
AVDTP_TSEP_SNK = 0x01

# TSEP value -> symbolic name (used when formatting discovery responses)
AVDTP_TSEP_NAMES = {
    AVDTP_TSEP_SRC: 'AVDTP_TSEP_SRC',
    AVDTP_TSEP_SNK: 'AVDTP_TSEP_SNK'
}
164
# Service Categories (AVDTP spec - Table 8.47: Service Category information element field values)
AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY    = 0x01
AVDTP_REPORTING_SERVICE_CATEGORY          = 0x02
AVDTP_RECOVERY_SERVICE_CATEGORY           = 0x03
AVDTP_CONTENT_PROTECTION_SERVICE_CATEGORY = 0x04
AVDTP_HEADER_COMPRESSION_SERVICE_CATEGORY = 0x05
AVDTP_MULTIPLEXING_SERVICE_CATEGORY       = 0x06
AVDTP_MEDIA_CODEC_SERVICE_CATEGORY        = 0x07
AVDTP_DELAY_REPORTING_SERVICE_CATEGORY    = 0x08

# Service category value -> symbolic name (used by ServiceCapabilities.to_string)
AVDTP_SERVICE_CATEGORY_NAMES = {
    AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY:    'AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY',
    AVDTP_REPORTING_SERVICE_CATEGORY:          'AVDTP_REPORTING_SERVICE_CATEGORY',
    AVDTP_RECOVERY_SERVICE_CATEGORY:           'AVDTP_RECOVERY_SERVICE_CATEGORY',
    AVDTP_CONTENT_PROTECTION_SERVICE_CATEGORY: 'AVDTP_CONTENT_PROTECTION_SERVICE_CATEGORY',
    AVDTP_HEADER_COMPRESSION_SERVICE_CATEGORY: 'AVDTP_HEADER_COMPRESSION_SERVICE_CATEGORY',
    AVDTP_MULTIPLEXING_SERVICE_CATEGORY:       'AVDTP_MULTIPLEXING_SERVICE_CATEGORY',
    AVDTP_MEDIA_CODEC_SERVICE_CATEGORY:        'AVDTP_MEDIA_CODEC_SERVICE_CATEGORY',
    AVDTP_DELAY_REPORTING_SERVICE_CATEGORY:    'AVDTP_DELAY_REPORTING_SERVICE_CATEGORY'
}
185
# States (AVDTP spec - 9.1 State Definitions)
AVDTP_IDLE_STATE       = 0x00
AVDTP_CONFIGURED_STATE = 0x01
AVDTP_OPEN_STATE       = 0x02
AVDTP_STREAMING_STATE  = 0x03
AVDTP_CLOSING_STATE    = 0x04
AVDTP_ABORTING_STATE   = 0x05

# State value -> symbolic name
AVDTP_STATE_NAMES = {
    AVDTP_IDLE_STATE:       'AVDTP_IDLE_STATE',
    AVDTP_CONFIGURED_STATE: 'AVDTP_CONFIGURED_STATE',
    AVDTP_OPEN_STATE:       'AVDTP_OPEN_STATE',
    AVDTP_STREAMING_STATE:  'AVDTP_STREAMING_STATE',
    AVDTP_CLOSING_STATE:    'AVDTP_CLOSING_STATE',
    AVDTP_ABORTING_STATE:   'AVDTP_ABORTING_STATE'
}
202
203# fmt: on
204# pylint: enable=line-too-long
205# pylint: disable=invalid-name
206
207
208# -----------------------------------------------------------------------------
async def find_avdtp_service_with_sdp_client(sdp_client):
    '''
    Look up the AVDTP version advertised by a remote device.

    `sdp_client` must already be connected. The result is a
    `(major, minor)` tuple taken from the first usable profile descriptor, or
    None when no matching service record carries one.
    '''

    descriptor_list_id = sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID

    # Fetch the profile descriptor list of every service record that advertises
    # the Advanced Audio Distribution service class
    records = await sdp_client.search_attributes(
        [BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE],
        [descriptor_list_id],
    )

    for record in records:
        descriptors = sdp.ServiceAttribute.find_attribute_in_list(
            record, descriptor_list_id
        )
        if not descriptors:
            continue

        for descriptor in descriptors.value:
            # A usable descriptor has at least two elements; the second one
            # holds the version as a 16-bit value
            if len(descriptor.value) < 2:
                continue

            # Major version in the high byte, minor version in the low byte
            version = descriptor.value[1].value
            return (version >> 8, version & 0xFF)

    return None
230
231
232# -----------------------------------------------------------------------------
async def find_avdtp_service_with_connection(device, connection):
    '''
    Find an AVDTP service, for a connection, and return its version,
    or None if none is found.

    A temporary SDP client is created for `device`, connected over
    `connection`, and always disconnected before returning, including when the
    search raises.
    '''

    sdp_client = sdp.Client(device)
    await sdp_client.connect(connection)
    try:
        return await find_avdtp_service_with_sdp_client(sdp_client)
    finally:
        # Release the SDP channel even if the search failed
        await sdp_client.disconnect()
245
246
247# -----------------------------------------------------------------------------
class RealtimeClock:
    '''
    Clock that reads the time from `time.time()` and sleeps with
    `asyncio.sleep()`.
    '''

    def now(self):
        '''Return the current time, in seconds, as given by `time.time()`.'''
        current_time = time.time()
        return current_time

    async def sleep(self, duration):
        '''Suspend the calling coroutine for `duration` seconds.'''
        await asyncio.sleep(duration)
254
255
256# -----------------------------------------------------------------------------
class MediaPacket:
    '''
    RTP media packet: a 12-byte fixed header, followed by a list of 32-bit CSRC
    identifiers (possibly empty), followed by the payload.
    '''

    @staticmethod
    def from_bytes(data):
        '''
        Parse a serialized packet.

        Args:
            data: bytes-like object holding the fixed header, the CSRC list and
                the payload.

        Returns:
            A MediaPacket holding the parsed header fields. Its `payload` is
            the part of `data` that follows the CSRC list.

        Raises:
            IndexError or struct.error: if `data` is shorter than the header
                (including the CSRC entries announced by the CSRC count).
        '''
        # First byte: version (2 bits), padding, extension, CSRC count (4 bits)
        version = (data[0] >> 6) & 0x03
        padding = (data[0] >> 5) & 0x01
        extension = (data[0] >> 4) & 0x01
        csrc_count = data[0] & 0x0F

        # Second byte: marker (1 bit), payload type (7 bits)
        marker = (data[1] >> 7) & 0x01
        payload_type = data[1] & 0x7F

        # Sequence number (16 bits), timestamp (32 bits), SSRC (32 bits)
        sequence_number, timestamp, ssrc = struct.unpack_from('>HII', data, 2)

        # Each CSRC entry is 4 bytes wide, so entry i starts at 12 + 4 * i
        csrc_list = [
            struct.unpack_from('>I', data, 12 + 4 * i)[0] for i in range(csrc_count)
        ]
        payload = data[12 + csrc_count * 4 :]

        return MediaPacket(
            version,
            padding,
            extension,
            marker,
            sequence_number,
            timestamp,
            ssrc,
            csrc_list,
            payload_type,
            payload,
        )

    def __init__(
        self,
        version,
        padding,
        extension,
        marker,
        sequence_number,
        timestamp,
        ssrc,
        csrc_list,
        payload_type,
        payload,
    ):
        '''
        Store the header fields and the payload as given.

        Note that `csrc_list` comes before `payload_type` in the parameter
        order, which differs from their order in the serialized header.
        '''
        self.version = version
        self.padding = padding
        self.extension = extension
        self.marker = marker
        self.sequence_number = sequence_number
        self.timestamp = timestamp
        self.ssrc = ssrc
        self.csrc_list = csrc_list
        self.payload_type = payload_type
        self.payload = payload

    def __bytes__(self):
        '''
        Serialize the packet: fixed header, CSRC entries, then the payload.
        The CSRC count field is derived from `len(self.csrc_list)`.
        '''
        header = bytes(
            [
                self.version << 6
                | self.padding << 5
                | self.extension << 4
                | len(self.csrc_list),
                self.marker << 7 | self.payload_type,
            ]
        ) + struct.pack('>HII', self.sequence_number, self.timestamp, self.ssrc)
        csrc_bytes = b''.join(struct.pack('>I', csrc) for csrc in self.csrc_list)
        return header + csrc_bytes + self.payload

    def __str__(self):
        return (
            f'RTP(v={self.version},'
            f'p={self.padding},'
            f'x={self.extension},'
            f'm={self.marker},'
            f'pt={self.payload_type},'
            f'sn={self.sequence_number},'
            f'ts={self.timestamp},'
            f'ssrc={self.ssrc},'
            f'csrcs={self.csrc_list},'
            f'payload_size={len(self.payload)})'
        )
338
339
340# -----------------------------------------------------------------------------
class MediaPacketPump:
    '''
    Sends packets from an asynchronous source to an RTP channel, pacing them
    according to their timestamps.
    '''

    def __init__(self, packets, clock=RealtimeClock()):
        '''
        Args:
            packets: asynchronous iterable of packets. Each packet must expose
                a `timestamp_seconds` attribute and be convertible with
                `bytes()`.
            clock: object providing `now()` and an awaitable
                `sleep(duration)`.
        '''
        self.packets = packets
        self.clock = clock
        self.pump_task = None

    async def start(self, rtp_channel):
        '''
        Start a background task that sends each packet to `rtp_channel` with
        `send_pdu()`, waiting as needed so that packets go out no earlier than
        their timestamp offset from the first packet.
        '''

        async def pump_packets():
            # None (rather than 0) marks "first packet not seen yet", so that a
            # clock whose now() returns 0 does not cause the reference point to
            # be captured again on later packets.
            start_time = None
            start_timestamp = 0

            try:
                logger.debug('pump starting')
                async for packet in self.packets:
                    # Capture the timestamp of the first packet
                    if start_time is None:
                        start_time = self.clock.now()
                        start_timestamp = packet.timestamp_seconds

                    # Wait until we can send
                    when = start_time + (packet.timestamp_seconds - start_timestamp)
                    now = self.clock.now()
                    if when > now:
                        delay = when - now
                        logger.debug(f'waiting for {delay}')
                        await self.clock.sleep(delay)

                    # Emit
                    rtp_channel.send_pdu(bytes(packet))
                    logger.debug(
                        f'{color(">>> sending RTP packet:", "green")} {packet}'
                    )
            except asyncio.exceptions.CancelledError:
                # Cancellation is how stop() ends the pump; finish quietly
                logger.debug('pump canceled')

        # Pump packets
        self.pump_task = asyncio.create_task(pump_packets())

    async def stop(self):
        '''Cancel the background task, if any, and wait for it to finish.'''
        # Stop the pump
        if self.pump_task:
            self.pump_task.cancel()
            await self.pump_task
            self.pump_task = None
385
386
387# -----------------------------------------------------------------------------
class MessageAssembler:  # pylint: disable=attribute-defined-outside-init
    '''
    Reassembles signaling messages from single or fragmented PDUs.

    Header layout, as decoded by `on_pdu`:
      - byte 0: transaction label (bits 7-4), packet type (bits 3-2),
        message type (bits 1-0)
      - single and start packets: byte 1 carries the signal identifier in its
        low 6 bits
      - start packets: byte 2 carries the total number of signal packets

    When a message is complete, `callback(transaction_label, message)` is
    invoked with a message built by `Message.create`.
    '''

    def __init__(self, callback):
        self.callback = callback
        self.reset()

    def reset(self):
        '''Forget any partially assembled message.'''
        self.transaction_label = 0
        self.message = None  # None means no message is being assembled
        self.message_type = 0
        self.signal_identifier = 0
        self.number_of_signal_packets = 0
        self.packet_count = 0

    def on_pdu(self, pdu):
        '''
        Process one received PDU.

        Malformed sequences (continuation without a start, label or message
        type mismatch, wrong packet count) are logged as warnings and dropped.
        '''
        transaction_label = pdu[0] >> 4
        packet_type = (pdu[0] >> 2) & 3
        message_type = pdu[0] & 3

        logger.debug(
            f'transaction_label={transaction_label}, '
            f'packet_type={Protocol.packet_type_name(packet_type)}, '
            f'message_type={Message.message_type_name(message_type)}'
        )
        if packet_type in (Protocol.SINGLE_PACKET, Protocol.START_PACKET):
            if self.message is not None:
                # The previous message has not been terminated
                logger.warning(
                    'received a start or single packet when expecting an end or '
                    'continuation'
                )
                self.reset()

            # This packet is the first one of its message, whatever was
            # received (or dropped) before it
            self.packet_count = 1
            self.transaction_label = transaction_label
            self.signal_identifier = pdu[1] & 0x3F
            self.message_type = message_type

            if packet_type == Protocol.SINGLE_PACKET:
                self.message = pdu[2:]
                self.on_message_complete()
            else:
                self.number_of_signal_packets = pdu[2]
                self.message = pdu[3:]
        elif packet_type in (Protocol.CONTINUE_PACKET, Protocol.END_PACKET):
            if self.message is None:
                # No start packet was received for this continuation
                logger.warning('unexpected continuation')
                return

            self.packet_count += 1

            if transaction_label != self.transaction_label:
                logger.warning(
                    f'transaction label mismatch: expected {self.transaction_label}, '
                    f'received {transaction_label}'
                )
                return

            if message_type != self.message_type:
                logger.warning(
                    f'message type mismatch: expected {self.message_type}, '
                    f'received {message_type}'
                )
                return

            self.message += pdu[1:]

            if packet_type == Protocol.END_PACKET:
                if self.packet_count != self.number_of_signal_packets:
                    logger.warning(
                        'incomplete fragmented message: '
                        f'expected {self.number_of_signal_packets} packets, '
                        f'received {self.packet_count}'
                    )
                    self.reset()
                    return

                self.on_message_complete()
            else:
                if self.packet_count > self.number_of_signal_packets:
                    logger.warning(
                        'too many packets: '
                        f'expected {self.number_of_signal_packets}, '
                        f'received {self.packet_count}'
                    )
                    self.reset()
                    return

    def on_message_complete(self):
        '''
        Build the message, hand it to the callback, then reset.

        Exceptions raised by the callback are logged and swallowed; exceptions
        raised while building the message propagate to the caller.
        '''
        message = Message.create(
            self.signal_identifier, self.message_type, self.message
        )

        try:
            self.callback(self.transaction_label, message)
        except Exception as error:
            logger.warning(color(f'!!! exception in callback: {error}'))

        self.reset()
485
486
487# -----------------------------------------------------------------------------
class ServiceCapabilities:
    '''
    One service capability element: a service category and the raw bytes of
    its capabilities.

    On the wire, each element is encoded as a category byte, a length byte,
    then that many bytes of capability data.
    '''

    @staticmethod
    def create(service_category, service_capabilities_bytes):
        '''
        Build the appropriate (sub)class instance for a parsed element.

        The instance is created without calling `__init__`; its fields are set
        directly and `init_from_bytes()` is then called to decode them.
        '''
        # Select the appropriate subclass
        if service_category == AVDTP_MEDIA_CODEC_SERVICE_CATEGORY:
            cls = MediaCodecCapabilities
        else:
            cls = ServiceCapabilities

        # Create an instance and initialize it
        instance = cls.__new__(cls)
        instance.service_category = service_category
        instance.service_capabilities_bytes = service_capabilities_bytes
        instance.init_from_bytes()

        return instance

    @staticmethod
    def parse_capabilities(payload):
        '''
        Parse a sequence of encoded elements.

        Returns a list of ServiceCapabilities (or subclass) instances, in the
        order in which they appear in `payload`.
        '''
        capabilities = []
        while payload:
            service_category = payload[0]
            length_of_service_capabilities = payload[1]
            service_capabilities_bytes = payload[2 : 2 + length_of_service_capabilities]
            capabilities.append(
                ServiceCapabilities.create(service_category, service_capabilities_bytes)
            )

            payload = payload[2 + length_of_service_capabilities :]

        return capabilities

    @staticmethod
    def serialize_capabilities(capabilities):
        '''
        Serialize an iterable of capabilities into their encoded form
        (category byte, length byte, capability bytes for each item).
        '''
        return b''.join(
            bytes([item.service_category, len(item.service_capabilities_bytes)])
            + item.service_capabilities_bytes
            for item in capabilities
        )

    def init_from_bytes(self):
        '''Hook for subclasses to decode `service_capabilities_bytes`.'''

    def __init__(self, service_category, service_capabilities_bytes=b''):
        self.service_category = service_category
        self.service_capabilities_bytes = service_capabilities_bytes

    def to_string(self, details=None):
        '''
        Format as `ServiceCapabilities(<category>[,<detail>...])`.

        `details` is an optional sequence of strings appended after the
        category name.
        '''
        extra = [] if details is None else list(details)
        attributes = ','.join(
            [name_or_number(AVDTP_SERVICE_CATEGORY_NAMES, self.service_category)]
            + extra
        )
        return f'ServiceCapabilities({attributes})'

    def __str__(self):
        if self.service_capabilities_bytes:
            details = [self.service_capabilities_bytes.hex()]
        else:
            details = []
        return self.to_string(details)
550
551
552# -----------------------------------------------------------------------------
class MediaCodecCapabilities(ServiceCapabilities):
    '''
    Media codec service capabilities: a media type, a codec type, and
    codec-specific information.
    '''

    def init_from_bytes(self):
        '''
        Decode the capability bytes: byte 0 is the media type, byte 1 the codec
        type, and the rest is the codec information. For the SBC, AAC and
        vendor-specific codec types, the information is parsed into the
        matching object; otherwise it is kept as raw bytes.
        '''
        raw = self.service_capabilities_bytes
        self.media_type = raw[0]
        self.media_codec_type = raw[1]
        self.media_codec_information = raw[2:]

        parsers = {
            A2DP_SBC_CODEC_TYPE: SbcMediaCodecInformation,
            A2DP_MPEG_2_4_AAC_CODEC_TYPE: AacMediaCodecInformation,
            A2DP_NON_A2DP_CODEC_TYPE: VendorSpecificMediaCodecInformation,
        }
        parser = parsers.get(self.media_codec_type)
        if parser is not None:
            self.media_codec_information = parser.from_bytes(
                self.media_codec_information
            )

    def __init__(self, media_type, media_codec_type, media_codec_information):
        '''
        `media_codec_information` may be raw bytes or any object that can be
        converted with `bytes()`.
        '''
        encoded = bytes([media_type, media_codec_type]) + bytes(
            media_codec_information
        )
        super().__init__(AVDTP_MEDIA_CODEC_SERVICE_CATEGORY, encoded)
        self.media_type = media_type
        self.media_codec_type = media_codec_type
        self.media_codec_information = media_codec_information

    def __str__(self):
        # Raw codec information is shown in hex, parsed information as text
        if isinstance(self.media_codec_information, bytes):
            codec_info = self.media_codec_information.hex()
        else:
            codec_info = str(self.media_codec_information)

        media_type_name = name_or_number(AVDTP_MEDIA_TYPE_NAMES, self.media_type)
        codec_name = name_or_number(A2DP_CODEC_TYPE_NAMES, self.media_codec_type)
        return self.to_string(
            [
                f'media_type={media_type_name}',
                f'codec={codec_name}',
                f'codec_info={codec_info}',
            ]
        )
596
597
598# -----------------------------------------------------------------------------
class EndPointInfo:
    '''
    Description of a stream end point, encoded on two bytes:
      - byte 0: SEID (bits 7-2), in-use flag (bit 1)
      - byte 1: media type (bits 7-4), TSEP (bit 3)
    '''

    def __init__(self, seid, in_use, media_type, tsep):
        self.seid = seid
        self.in_use = in_use
        self.media_type = media_type
        self.tsep = tsep

    @staticmethod
    def from_bytes(payload):
        '''Decode an end point description from the first 2 bytes of `payload`.'''
        seid = payload[0] >> 2
        in_use = (payload[0] >> 1) & 1
        media_type = payload[1] >> 4
        tsep = (payload[1] >> 3) & 1
        return EndPointInfo(seid, in_use, media_type, tsep)

    def __bytes__(self):
        '''Encode the end point description on 2 bytes.'''
        first_byte = (self.seid << 2) | (self.in_use << 1)
        second_byte = (self.media_type << 4) | (self.tsep << 3)
        return bytes([first_byte, second_byte])
616
617
618# -----------------------------------------------------------------------------
class Message:  # pylint:disable=attribute-defined-outside-init
    '''
    Base class for signaling messages.

    A message has a `signal_identifier`, a `message_type` and a raw `payload`.
    Subclasses decorated with `Message.subclass` get the first two as class
    attributes, and decode their fields from the payload in
    `init_from_payload()`.
    '''

    COMMAND = 0
    GENERAL_REJECT = 1
    RESPONSE_ACCEPT = 2
    RESPONSE_REJECT = 3

    MESSAGE_TYPE_NAMES = {
        COMMAND: 'COMMAND',
        GENERAL_REJECT: 'GENERAL_REJECT',
        RESPONSE_ACCEPT: 'RESPONSE_ACCEPT',
        RESPONSE_REJECT: 'RESPONSE_REJECT',
    }

    # Subclasses, by signal identifier and message type
    subclasses: Dict[int, Dict[int, Type[Message]]] = {}

    @staticmethod
    def message_type_name(message_type):
        '''Return the symbolic name of a message type, or its number.'''
        return name_or_number(Message.MESSAGE_TYPE_NAMES, message_type)

    @staticmethod
    def subclass(subclass):
        '''
        Class decorator that registers a message subclass.

        The class name must be `General_Reject`, or end with `_Command`,
        `_Response` or `_Reject`. The part before the suffix is matched
        (case-insensitively, as a suffix) against the names in
        AVDTP_SIGNAL_IDENTIFIERS to find the signal identifier.

        Raises:
            ValueError: if the class name does not follow that convention.
        '''
        # Infer the signal identifier and message subtype from the class name
        name = subclass.__name__
        if name == 'General_Reject':
            subclass.signal_identifier = 0
            signal_identifier_str = None
            message_type = Message.COMMAND
        elif name.endswith('_Command'):
            signal_identifier_str = name[:-8]
            message_type = Message.COMMAND
        elif name.endswith('_Response'):
            signal_identifier_str = name[:-9]
            message_type = Message.RESPONSE_ACCEPT
        elif name.endswith('_Reject'):
            signal_identifier_str = name[:-7]
            message_type = Message.RESPONSE_REJECT
        else:
            raise ValueError('invalid class name')

        subclass.message_type = message_type

        if signal_identifier_str is not None:
            # The first signal name that ends with the class name prefix wins
            wanted_suffix = signal_identifier_str.lower()
            for signal_name, signal_identifier in AVDTP_SIGNAL_IDENTIFIERS.items():
                if signal_name.lower().endswith(wanted_suffix):
                    subclass.signal_identifier = signal_identifier
                    break

            # Register the subclass
            Message.subclasses.setdefault(subclass.signal_identifier, {})[
                subclass.message_type
            ] = subclass

        return subclass

    # Factory method to create a subclass based on the signal identifier and message
    # type
    @staticmethod
    def create(signal_identifier, message_type, payload):
        '''
        Build a message object from a received payload.

        A registered subclass is used when there is one for
        (`signal_identifier`, `message_type`). Otherwise, reject responses are
        decoded as a Simple_Reject and anything else as a plain Message.
        '''
        # Look for a registered subclass
        subclasses = Message.subclasses.get(signal_identifier)
        if subclasses:
            subclass = subclasses.get(message_type)
            if subclass:
                instance = subclass.__new__(subclass)
                instance.payload = payload
                instance.init_from_payload()
                return instance

        # Instantiate the appropriate class based on the message type
        if message_type == Message.RESPONSE_REJECT:
            # Assume a simple reject message.
            # The constructor takes an error code, not a payload, so bypass it
            # and decode the received payload instead.
            instance = Simple_Reject.__new__(Simple_Reject)
            instance.payload = payload
            instance.init_from_payload()
        else:
            instance = Message(payload)
        instance.signal_identifier = signal_identifier
        instance.message_type = message_type
        return instance

    def init_from_payload(self):
        '''Hook for subclasses to decode their fields from `self.payload`.'''

    def __init__(self, payload=b''):
        self.payload = payload

    def to_string(self, details):
        '''
        Format the message as `<signal name>_<message type name>`, followed by
        `details`: a string is appended on the same line, a list of strings is
        appended one indented line per item, and a falsy value adds nothing.
        '''
        base = color(
            f'{name_or_number(AVDTP_SIGNAL_NAMES, self.signal_identifier)}_'
            f'{Message.message_type_name(self.message_type)}',
            'yellow',
        )

        if details:
            if isinstance(details, str):
                return f'{base}: {details}'

            return (
                base
                + ':\n'
                + '\n'.join(['  ' + color(detail, 'cyan') for detail in details])
            )

        return base

    def __str__(self):
        return self.to_string(self.payload.hex())
726
727
728# -----------------------------------------------------------------------------
class Simple_Command(Message):
    '''
    Command message whose payload is a single byte carrying one SEID in its
    upper 6 bits
    '''

    def __init__(self, seid):
        encoded_seid = seid << 2
        super().__init__(payload=bytes([encoded_seid]))
        self.acp_seid = seid

    def init_from_payload(self):
        # The SEID occupies the upper 6 bits of the first payload byte
        first_byte = self.payload[0]
        self.acp_seid = first_byte >> 2

    def __str__(self):
        details = [f'ACP SEID: {self.acp_seid}']
        return self.to_string(details)
743
744
745# -----------------------------------------------------------------------------
class Simple_Reject(Message):
    '''
    Reject message whose payload is a single error code byte
    '''

    def __init__(self, error_code):
        encoded = bytes([error_code])
        super().__init__(payload=encoded)
        self.error_code = error_code

    def init_from_payload(self):
        # The error code is the first payload byte
        self.error_code = self.payload[0]

    def __str__(self):
        error_name = name_or_number(AVDTP_ERROR_NAMES, self.error_code)
        return self.to_string([f'error_code: {error_name}'])
761
762
763# -----------------------------------------------------------------------------
@Message.subclass
class Discover_Command(Message):
    '''
    See Bluetooth AVDTP spec - 8.6.1 Stream End Point Discovery Command

    Defines no fields of its own. The signal identifier and message type are
    inferred from the class name by Message.subclass.
    '''
769
770
771# -----------------------------------------------------------------------------
@Message.subclass
class Discover_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.6.2 Stream End Point Discovery Response

    The payload is a sequence of 2-byte end point descriptions, exposed as a
    list of EndPointInfo in `endpoints`.
    '''

    def __init__(self, endpoints):
        encoded_endpoints = [bytes(endpoint) for endpoint in endpoints]
        super().__init__(payload=b''.join(encoded_endpoints))
        self.endpoints = endpoints

    def init_from_payload(self):
        # Only complete 2-byte entries are decoded; a trailing odd byte is ignored
        complete_size = (len(self.payload) // 2) * 2
        self.endpoints = [
            EndPointInfo.from_bytes(self.payload[offset : offset + 2])
            for offset in range(0, complete_size, 2)
        ]

    def __str__(self):
        details = []
        for endpoint in self.endpoints:
            media_type_name = name_or_number(
                AVDTP_MEDIA_TYPE_NAMES, endpoint.media_type
            )
            tsep_name = name_or_number(AVDTP_TSEP_NAMES, endpoint.tsep)
            details += [
                f'ACP SEID: {endpoint.seid}',
                f'  in_use:     {endpoint.in_use}',
                f'  media_type: {media_type_name}',
                f'  tsep:       {tsep_name}',
            ]
        return self.to_string(details)
803
804
805# -----------------------------------------------------------------------------
@Message.subclass
class Get_Capabilities_Command(Simple_Command):
    '''
    See Bluetooth AVDTP spec - 8.7.1 Get Capabilities Command

    Inherits its single-SEID payload handling from Simple_Command.
    '''
811
812
813# -----------------------------------------------------------------------------
@Message.subclass
class Get_Capabilities_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.7.2 Get Capabilities Response

    The payload is a serialized list of service capabilities, exposed as a
    list in `capabilities`.
    '''

    def __init__(self, capabilities):
        serialized = ServiceCapabilities.serialize_capabilities(capabilities)
        super().__init__(payload=serialized)
        self.capabilities = capabilities

    def init_from_payload(self):
        parsed = ServiceCapabilities.parse_capabilities(self.payload)
        self.capabilities = parsed

    def __str__(self):
        # One detail line per capability
        return self.to_string([str(item) for item in self.capabilities])
832
833
834# -----------------------------------------------------------------------------
@Message.subclass
class Get_Capabilities_Reject(Simple_Reject):
    '''
    See Bluetooth AVDTP spec - 8.7.3 Get Capabilities Reject

    Adds nothing to Simple_Reject. Protocol.on_get_capabilities_command
    returns it with AVDTP_BAD_ACP_SEID_ERROR when the requested SEID does not
    match a local endpoint.
    '''
840
841
842# -----------------------------------------------------------------------------
@Message.subclass
class Get_All_Capabilities_Command(Get_Capabilities_Command):
    '''
    See Bluetooth AVDTP spec - 8.8.1 Get All Capabilities Command

    Inherits everything from Get_Capabilities_Command. Protocol.get_capabilities
    sends it, with the target SEID, when the protocol version is above (1, 2).
    '''
848
849
850# -----------------------------------------------------------------------------
@Message.subclass
class Get_All_Capabilities_Response(Get_Capabilities_Response):
    '''
    See Bluetooth AVDTP spec - 8.8.2 Get All Capabilities Response

    Inherits parsing, serialization and formatting of the `capabilities` list
    from Get_Capabilities_Response.
    '''
856
857
858# -----------------------------------------------------------------------------
@Message.subclass
class Get_All_Capabilities_Reject(Simple_Reject):
    '''
    See Bluetooth AVDTP spec - 8.8.3 Get All Capabilities Reject

    Adds nothing to Simple_Reject. Protocol.on_get_all_capabilities_command
    returns it with AVDTP_BAD_ACP_SEID_ERROR when the requested SEID does not
    match a local endpoint.
    '''
864
865
866# -----------------------------------------------------------------------------
@Message.subclass
class Set_Configuration_Command(Message):
    '''
    See Bluetooth AVDTP spec - 8.9.1 Set Configuration Command

    Fields: `acp_seid`, `int_seid` and the `capabilities` list.
    '''

    def init_from_payload(self):
        # Each SEID sits in the upper 6 bits of its byte; the capabilities
        # follow the two SEID bytes.
        data = self.payload
        self.acp_seid = data[0] >> 2
        self.int_seid = data[1] >> 2
        self.capabilities = ServiceCapabilities.parse_capabilities(data[2:])

    def __init__(self, acp_seid, int_seid, capabilities):
        seid_header = bytes([acp_seid << 2, int_seid << 2])
        serialized = ServiceCapabilities.serialize_capabilities(capabilities)
        super().__init__(payload=seid_header + serialized)
        self.acp_seid = acp_seid
        self.int_seid = int_seid
        self.capabilities = capabilities

    def __str__(self):
        details = [f'ACP SEID: {self.acp_seid}', f'INT SEID: {self.int_seid}']
        details.extend(str(capability) for capability in self.capabilities)
        return self.to_string(details)
892
893
894# -----------------------------------------------------------------------------
@Message.subclass
class Set_Configuration_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.9.2 Set Configuration Response

    Defines no fields of its own. Protocol.on_set_configuration_command
    returns it when the stream's handler returns a falsy result.
    '''
900
901
902# -----------------------------------------------------------------------------
@Message.subclass
class Set_Configuration_Reject(Message):
    '''
    See Bluetooth AVDTP spec - 8.9.3 Set Configuration Reject

    Fields: `service_category` (first payload byte) and `error_code` (second
    payload byte).
    '''

    def init_from_payload(self):
        data = self.payload
        self.service_category = data[0]
        self.error_code = data[1]

    def __init__(self, service_category, error_code):
        body = bytes([service_category, error_code])
        super().__init__(payload=body)
        self.service_category = service_category
        self.error_code = error_code

    def __str__(self):
        # Resolve both numeric fields to their display names first
        category_name = name_or_number(
            AVDTP_SERVICE_CATEGORY_NAMES, self.service_category
        )
        error_name = name_or_number(AVDTP_ERROR_NAMES, self.error_code)
        return self.to_string(
            [
                f'service_category: {category_name}',
                f'error_code:       {error_name}',
            ]
        )
930
931
932# -----------------------------------------------------------------------------
@Message.subclass
class Get_Configuration_Command(Simple_Command):
    '''
    See Bluetooth AVDTP spec - 8.10.1 Get Configuration Command

    Adds nothing to Simple_Command. Protocol.get_configuration sends it with
    the target SEID.
    '''
938
939
940# -----------------------------------------------------------------------------
@Message.subclass
class Get_Configuration_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.10.2 Get Configuration Response

    Carries a list of service capabilities, exposed as `capabilities`.
    '''

    def init_from_payload(self):
        # Decode the raw payload into service capability objects
        raw_capabilities = self.payload
        self.capabilities = ServiceCapabilities.parse_capabilities(raw_capabilities)

    def __init__(self, capabilities):
        serialized = ServiceCapabilities.serialize_capabilities(capabilities)
        super().__init__(payload=serialized)
        self.capabilities = capabilities

    def __str__(self):
        # One detail line per capability
        return self.to_string([str(entry) for entry in self.capabilities])
959
960
961# -----------------------------------------------------------------------------
@Message.subclass
class Get_Configuration_Reject(Simple_Reject):
    '''
    See Bluetooth AVDTP spec - 8.10.3 Get Configuration Reject

    Adds nothing to Simple_Reject. Protocol.on_get_configuration_command
    returns it when the SEID is unknown (AVDTP_BAD_ACP_SEID_ERROR) or when the
    endpoint has no stream (AVDTP_BAD_STATE_ERROR).
    '''
967
968
969# -----------------------------------------------------------------------------
@Message.subclass
class Reconfigure_Command(Message):
    '''
    See Bluetooth AVDTP spec - 8.11.1 Reconfigure Command

    Fields, set when parsing a payload: `acp_seid` and the `capabilities`
    list.
    '''

    def init_from_payload(self):
        # pylint: disable=attribute-defined-outside-init
        # The SEID sits in the upper 6 bits of the first byte; the
        # capabilities follow it.
        data = self.payload
        self.acp_seid = data[0] >> 2
        self.capabilities = ServiceCapabilities.parse_capabilities(data[1:])

    def __str__(self):
        details = [f'ACP SEID: {self.acp_seid}']
        details.extend(str(capability) for capability in self.capabilities)
        return self.to_string(details)
986
987
988# -----------------------------------------------------------------------------
@Message.subclass
class Reconfigure_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.11.2 Reconfigure Response

    Defines no fields of its own. Protocol.on_reconfigure_command returns it
    when the stream's handler returns a falsy result.
    '''
994
995
996# -----------------------------------------------------------------------------
@Message.subclass
class Reconfigure_Reject(Set_Configuration_Reject):
    '''
    See Bluetooth AVDTP spec - 8.11.3 Reconfigure Reject

    Inherits the (service_category, error_code) fields, constructor and
    formatting from Set_Configuration_Reject.
    '''
1002
1003
1004# -----------------------------------------------------------------------------
@Message.subclass
class Open_Command(Simple_Command):
    '''
    See Bluetooth AVDTP spec - 8.12.1 Open Stream Command

    Adds nothing to Simple_Command. Protocol.open sends it with the target
    SEID.
    '''
1010
1011
1012# -----------------------------------------------------------------------------
@Message.subclass
class Open_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.12.2 Open Stream Response

    Defines no fields of its own. Protocol.on_open_command returns it when the
    stream's handler returns a falsy result.
    '''
1018
1019
1020# -----------------------------------------------------------------------------
@Message.subclass
class Open_Reject(Simple_Reject):
    '''
    See Bluetooth AVDTP spec - 8.12.3 Open Stream Reject

    Adds nothing to Simple_Reject. Protocol.on_open_command returns it when
    the SEID is unknown (AVDTP_BAD_ACP_SEID_ERROR) or when the endpoint has no
    stream (AVDTP_BAD_STATE_ERROR).
    '''
1026
1027
1028# -----------------------------------------------------------------------------
@Message.subclass
class Start_Command(Message):
    '''
    See Bluetooth AVDTP spec - 8.13.1 Start Stream Command

    Carries a list of acceptor SEIDs, exposed as `acp_seids`, one payload byte
    per SEID.
    '''

    def init_from_payload(self):
        # Each SEID sits in the upper 6 bits of its byte
        self.acp_seids = [octet >> 2 for octet in self.payload]

    def __init__(self, seids):
        encoded_seids = bytes([seid << 2 for seid in seids])
        super().__init__(payload=encoded_seids)
        self.acp_seids = seids

    def __str__(self):
        seid_line = f'ACP SEIDs: {self.acp_seids}'
        return self.to_string([seid_line])
1044
1045
1046# -----------------------------------------------------------------------------
@Message.subclass
class Start_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.13.2 Start Stream Response

    Defines no fields of its own. Protocol.on_start_command returns it when
    every addressed stream's handler returns None.
    '''
1052
1053
1054# -----------------------------------------------------------------------------
@Message.subclass
class Start_Reject(Message):
    '''
    See Bluetooth AVDTP spec - 8.13.3 Start Stream Reject

    Fields: `acp_seid` (upper 6 bits of the first payload byte) and
    `error_code` (second payload byte).
    '''

    def init_from_payload(self):
        data = self.payload
        self.acp_seid = data[0] >> 2
        self.error_code = data[1]

    def __init__(self, acp_seid, error_code):
        body = bytes([acp_seid << 2, error_code])
        super().__init__(payload=body)
        self.acp_seid = acp_seid
        self.error_code = error_code

    def __str__(self):
        error_name = name_or_number(AVDTP_ERROR_NAMES, self.error_code)
        return self.to_string(
            [
                f'acp_seid:   {self.acp_seid}',
                f'error_code: {error_name}',
            ]
        )
1076
1077
1078# -----------------------------------------------------------------------------
@Message.subclass
class Close_Command(Simple_Command):
    '''
    See Bluetooth AVDTP spec - 8.14.1 Close Stream Command

    Adds nothing to Simple_Command. Protocol.close sends it with the target
    SEID.
    '''
1084
1085
1086# -----------------------------------------------------------------------------
@Message.subclass
class Close_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.14.2 Close Stream Response

    Defines no fields of its own. Protocol.on_close_command returns it when
    the stream's handler returns a falsy result.
    '''
1092
1093
1094# -----------------------------------------------------------------------------
@Message.subclass
class Close_Reject(Simple_Reject):
    '''
    See Bluetooth AVDTP spec - 8.14.3 Close Stream Reject

    Adds nothing to Simple_Reject. Protocol.on_close_command returns it when
    the SEID is unknown (AVDTP_BAD_ACP_SEID_ERROR) or when the endpoint has no
    stream (AVDTP_BAD_STATE_ERROR).
    '''
1100
1101
1102# -----------------------------------------------------------------------------
@Message.subclass
class Suspend_Command(Start_Command):
    '''
    See Bluetooth AVDTP spec - 8.15.1 Suspend Command

    Inherits the `acp_seids` list, constructor and formatting from
    Start_Command. Protocol.suspend sends it with the list of target SEIDs.
    '''
1108
1109
1110# -----------------------------------------------------------------------------
@Message.subclass
class Suspend_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.15.2 Suspend Response

    Defines no fields of its own. Protocol.on_suspend_command returns it when
    every addressed stream's handler returns None.
    '''
1116
1117
1118# -----------------------------------------------------------------------------
@Message.subclass
class Suspend_Reject(Start_Reject):
    '''
    See Bluetooth AVDTP spec - 8.15.3 Suspend Reject

    Inherits the (acp_seid, error_code) fields, constructor and formatting
    from Start_Reject.
    '''
1124
1125
1126# -----------------------------------------------------------------------------
@Message.subclass
class Abort_Command(Simple_Command):
    '''
    See Bluetooth AVDTP spec - 8.16.1 Abort Command

    Adds nothing to Simple_Command. Protocol.abort sends it with the target
    SEID.
    '''
1132
1133
1134# -----------------------------------------------------------------------------
@Message.subclass
class Abort_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.16.2 Abort Response

    Defines no fields of its own. Protocol.on_abort_command always returns it,
    whether or not the SEID matched an endpoint with a stream.
    '''
1140
1141
1142# -----------------------------------------------------------------------------
@Message.subclass
class Security_Control_Command(Message):
    '''
    See Bluetooth AVDTP spec - 8.17.1 Security Control Command

    Defines no fields of its own.

    NOTE(review): Protocol.on_security_control_command reads
    `command.acp_seid`, which this class does not set - confirm that the
    Message base class provides it.
    '''
1148
1149
1150# -----------------------------------------------------------------------------
@Message.subclass
class Security_Control_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.17.2 Security Control Response

    Defines no fields of its own. Protocol.on_security_control_command returns
    it when the endpoint's handler returns a falsy result.
    '''
1156
1157
1158# -----------------------------------------------------------------------------
@Message.subclass
class Security_Control_Reject(Simple_Reject):
    '''
    See Bluetooth AVDTP spec - 8.17.3 Security Control Reject

    Adds nothing to Simple_Reject. Protocol.on_security_control_command
    returns it with AVDTP_BAD_ACP_SEID_ERROR when the SEID does not match a
    local endpoint.
    '''
1164
1165
1166# -----------------------------------------------------------------------------
@Message.subclass
class General_Reject(Message):
    '''
    See Bluetooth AVDTP spec - 8.18 General Reject

    Protocol.on_message sends it in reply to a message whose signal identifier
    is out of range.
    '''

    def to_string(self, details):
        # The details are ignored: the message is always shown as a fixed,
        # yellow-colored label.
        label = 'GENERAL_REJECT'
        return color(label, 'yellow')
1175
1176
1177# -----------------------------------------------------------------------------
@Message.subclass
class DelayReport_Command(Message):
    '''
    See Bluetooth AVDTP spec - 8.19.1 Delay Report Command

    Fields, set when parsing a payload: `acp_seid` and `delay`.
    '''

    def init_from_payload(self):
        # pylint: disable=attribute-defined-outside-init
        data = self.payload
        # The SEID sits in the upper 6 bits of the first byte
        self.acp_seid = data[0] >> 2
        # The delay is a 16-bit value, most significant byte first
        delay_high = data[1] << 8
        self.delay = delay_high | data[2]

    def __str__(self):
        details = [
            f'ACP_SEID: {self.acp_seid}',
            f'delay:    {self.delay}',
        ]
        return self.to_string(details)
1191
1192
1193# -----------------------------------------------------------------------------
@Message.subclass
class DelayReport_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.19.2 Delay Report Response

    Defines no fields of its own. Protocol.on_delayreport_command returns it
    when the endpoint's handler returns a falsy result.
    '''
1199
1200
1201# -----------------------------------------------------------------------------
@Message.subclass
class DelayReport_Reject(Simple_Reject):
    '''
    See Bluetooth AVDTP spec - 8.19.3 Delay Report Reject

    Adds nothing to Simple_Reject. Protocol.on_delayreport_command returns it
    with AVDTP_BAD_ACP_SEID_ERROR when the SEID does not match a local
    endpoint.
    '''
1207
1208
1209# -----------------------------------------------------------------------------
class Protocol:
    '''
    AVDTP signaling endpoint running over one L2CAP channel.

    Sends and receives AVDTP signaling messages (splitting outgoing ones into
    several packets when they do not fit in the channel MTU), keeps track of
    up to 16 pending command transactions, and dispatches incoming commands to
    the matching `on_<signal>_command` method.
    '''

    # Packet types, stored in bits 2-3 of the first header byte (see
    # send_message)
    SINGLE_PACKET = 0
    START_PACKET = 1
    CONTINUE_PACKET = 2
    END_PACKET = 3

    # Display names for the packet types above
    PACKET_TYPE_NAMES = {
        SINGLE_PACKET: 'SINGLE_PACKET',
        START_PACKET: 'START_PACKET',
        CONTINUE_PACKET: 'CONTINUE_PACKET',
        END_PACKET: 'END_PACKET',
    }
1222
1223    @staticmethod
1224    def packet_type_name(packet_type):
1225        return name_or_number(Protocol.PACKET_TYPE_NAMES, packet_type)
1226
1227    @staticmethod
1228    async def connect(connection, version=(1, 3)):
1229        connector = connection.create_l2cap_connector(AVDTP_PSM)
1230        channel = await connector()
1231        protocol = Protocol(channel, version)
1232        protocol.channel_connector = connector
1233
1234        return protocol
1235
1236    def __init__(self, l2cap_channel, version=(1, 3)):
1237        self.l2cap_channel = l2cap_channel
1238        self.version = version
1239        self.rtx_sig_timer = AVDTP_DEFAULT_RTX_SIG_TIMER
1240        self.message_assembler = MessageAssembler(self.on_message)
1241        self.transaction_results = [None] * 16  # Futures for up to 16 transactions
1242        self.transaction_semaphore = asyncio.Semaphore(16)
1243        self.transaction_count = 0
1244        self.channel_acceptor = None
1245        self.channel_connector = None
1246        self.local_endpoints = []  # Local endpoints, with contiguous seid values
1247        self.remote_endpoints = {}  # Remote stream endpoints, by seid
1248        self.streams = {}  # Streams, by seid
1249
1250        # Register to receive PDUs from the channel
1251        l2cap_channel.sink = self.on_pdu
1252        l2cap_channel.on('open', self.on_l2cap_channel_open)
1253
1254    def get_local_endpoint_by_seid(self, seid):
1255        if 0 < seid <= len(self.local_endpoints):
1256            return self.local_endpoints[seid - 1]
1257
1258        return None
1259
1260    def add_source(self, codec_capabilities, packet_pump):
1261        seid = len(self.local_endpoints) + 1
1262        source = LocalSource(self, seid, codec_capabilities, packet_pump)
1263        self.local_endpoints.append(source)
1264
1265        return source
1266
1267    def add_sink(self, codec_capabilities):
1268        seid = len(self.local_endpoints) + 1
1269        sink = LocalSink(self, seid, codec_capabilities)
1270        self.local_endpoints.append(sink)
1271
1272        return sink
1273
1274    async def create_stream(self, source, sink):
1275        # Check that the source isn't already used in a stream
1276        if source.in_use:
1277            raise InvalidStateError('source already in use')
1278
1279        # Create or reuse a new stream to associate the source and the sink
1280        if source.seid in self.streams:
1281            stream = self.streams[source.seid]
1282        else:
1283            stream = Stream(self, source, sink)
1284            self.streams[source.seid] = stream
1285
1286        # The stream can now be configured
1287        await stream.configure()
1288
1289        return stream
1290
1291    async def discover_remote_endpoints(self):
1292        self.remote_endpoints = {}
1293
1294        response = await self.send_command(Discover_Command())
1295        for endpoint_entry in response.endpoints:
1296            logger.debug(
1297                f'getting endpoint capabilities for endpoint {endpoint_entry.seid}'
1298            )
1299            get_capabilities_response = await self.get_capabilities(endpoint_entry.seid)
1300            endpoint = DiscoveredStreamEndPoint(
1301                self,
1302                endpoint_entry.seid,
1303                endpoint_entry.media_type,
1304                endpoint_entry.tsep,
1305                endpoint_entry.in_use,
1306                get_capabilities_response.capabilities,
1307            )
1308            self.remote_endpoints[endpoint_entry.seid] = endpoint
1309
1310        return self.remote_endpoints.values()
1311
1312    def find_remote_sink_by_codec(self, media_type, codec_type):
1313        for endpoint in self.remote_endpoints.values():
1314            if (
1315                not endpoint.in_use
1316                and endpoint.media_type == media_type
1317                and endpoint.tsep == AVDTP_TSEP_SNK
1318            ):
1319                has_media_transport = False
1320                has_codec = False
1321                for capabilities in endpoint.capabilities:
1322                    if (
1323                        capabilities.service_category
1324                        == AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY
1325                    ):
1326                        has_media_transport = True
1327                    elif (
1328                        capabilities.service_category
1329                        == AVDTP_MEDIA_CODEC_SERVICE_CATEGORY
1330                    ):
1331                        if (
1332                            capabilities.media_type == AVDTP_AUDIO_MEDIA_TYPE
1333                            and capabilities.media_codec_type == codec_type
1334                        ):
1335                            has_codec = True
1336                if has_media_transport and has_codec:
1337                    return endpoint
1338
1339        return None
1340
1341    def on_pdu(self, pdu):
1342        self.message_assembler.on_pdu(pdu)
1343
1344    def on_message(self, transaction_label, message):
1345        logger.debug(
1346            f'{color("<<< Received AVDTP message", "magenta")}: '
1347            f'[{transaction_label}] {message}'
1348        )
1349
1350        # Check that the identifier is not reserved
1351        if message.signal_identifier == 0:
1352            logger.warning('!!! reserved signal identifier')
1353            return
1354
1355        # Check that the identifier is valid
1356        if (
1357            message.signal_identifier < 0
1358            or message.signal_identifier > AVDTP_DELAYREPORT
1359        ):
1360            logger.warning('!!! invalid signal identifier')
1361            self.send_message(transaction_label, General_Reject())
1362
1363        if message.message_type == Message.COMMAND:
1364            # Command
1365            signal_name = (
1366                AVDTP_SIGNAL_NAMES.get(message.signal_identifier, "")
1367                .replace("AVDTP_", "")
1368                .lower()
1369            )
1370            handler_name = f'on_{signal_name}_command'
1371            handler = getattr(self, handler_name, None)
1372            if handler:
1373                try:
1374                    response = handler(message)
1375                    self.send_message(transaction_label, response)
1376                except Exception as error:
1377                    logger.warning(
1378                        f'{color("!!! Exception in handler:", "red")} {error}'
1379                    )
1380            else:
1381                logger.warning('unhandled command')
1382        else:
1383            # Response, look for a pending transaction with the same label
1384            transaction_result = self.transaction_results[transaction_label]
1385            if transaction_result is None:
1386                logger.warning(color('!!! no pending transaction for label', 'red'))
1387                return
1388
1389            transaction_result.set_result(message)
1390            self.transaction_results[transaction_label] = None
1391            self.transaction_semaphore.release()
1392
1393    def on_l2cap_connection(self, channel):
1394        # Forward the channel to the endpoint that's expecting it
1395        if self.channel_acceptor:
1396            self.channel_acceptor.on_l2cap_connection(channel)
1397
1398    def on_l2cap_channel_open(self):
1399        logger.debug(color('<<< L2CAP channel open', 'magenta'))
1400
1401    def send_message(self, transaction_label, message):
1402        logger.debug(
1403            f'{color(">>> Sending AVDTP message", "magenta")}: '
1404            f'[{transaction_label}] {message}'
1405        )
1406        max_fragment_size = (
1407            self.l2cap_channel.mtu - 3
1408        )  # Enough space for a 3-byte start packet header
1409        payload = message.payload
1410        if len(payload) + 2 <= self.l2cap_channel.mtu:
1411            # Fits in a single packet
1412            packet_type = self.SINGLE_PACKET
1413        else:
1414            packet_type = self.START_PACKET
1415
1416        done = False
1417        while not done:
1418            first_header_byte = (
1419                transaction_label << 4 | packet_type << 2 | message.message_type
1420            )
1421
1422            if packet_type == self.SINGLE_PACKET:
1423                header = bytes([first_header_byte, message.signal_identifier])
1424            elif packet_type == self.START_PACKET:
1425                packet_count = (
1426                    max_fragment_size - 1 + len(payload)
1427                ) // max_fragment_size
1428                header = bytes(
1429                    [first_header_byte, message.signal_identifier, packet_count]
1430                )
1431            else:
1432                header = bytes([first_header_byte])
1433
1434            # Send one packet
1435            self.l2cap_channel.send_pdu(header + payload[:max_fragment_size])
1436
1437            # Prepare for the next packet
1438            payload = payload[max_fragment_size:]
1439            if payload:
1440                packet_type = (
1441                    self.CONTINUE_PACKET
1442                    if payload > max_fragment_size
1443                    else self.END_PACKET
1444                )
1445            else:
1446                done = True
1447
1448    async def send_command(self, command):
1449        # TODO: support timeouts
1450        # Send the command
1451        (transaction_label, transaction_result) = await self.start_transaction()
1452        self.send_message(transaction_label, command)
1453
1454        # Wait for the response
1455        response = await transaction_result
1456
1457        # Check for errors
1458        if response.message_type in (Message.GENERAL_REJECT, Message.RESPONSE_REJECT):
1459            raise ProtocolError(response.error_code, 'avdtp')
1460
1461        return response
1462
1463    async def start_transaction(self):
1464        # Wait until we can start a new transaction
1465        await self.transaction_semaphore.acquire()
1466
1467        # Look for the next free entry to store the transaction result
1468        for i in range(16):
1469            transaction_label = (self.transaction_count + i) % 16
1470            if self.transaction_results[transaction_label] is None:
1471                transaction_result = asyncio.get_running_loop().create_future()
1472                self.transaction_results[transaction_label] = transaction_result
1473                self.transaction_count += 1
1474                return (transaction_label, transaction_result)
1475
1476        assert False  # Should never reach this
1477
1478    async def get_capabilities(self, seid):
1479        if self.version > (1, 2):
1480            return await self.send_command(Get_All_Capabilities_Command(seid))
1481
1482        return await self.send_command(Get_Capabilities_Command(seid))
1483
1484    async def set_configuration(self, acp_seid, int_seid, capabilities):
1485        return await self.send_command(
1486            Set_Configuration_Command(acp_seid, int_seid, capabilities)
1487        )
1488
1489    async def get_configuration(self, seid):
1490        response = await self.send_command(Get_Configuration_Command(seid))
1491        return response.capabilities
1492
1493    async def open(self, seid):
1494        return await self.send_command(Open_Command(seid))
1495
1496    async def start(self, seids):
1497        return await self.send_command(Start_Command(seids))
1498
1499    async def suspend(self, seids):
1500        return await self.send_command(Suspend_Command(seids))
1501
1502    async def close(self, seid):
1503        return await self.send_command(Close_Command(seid))
1504
1505    async def abort(self, seid):
1506        return await self.send_command(Abort_Command(seid))
1507
1508    def on_discover_command(self, _command):
1509        endpoint_infos = [
1510            EndPointInfo(endpoint.seid, 0, endpoint.media_type, endpoint.tsep)
1511            for endpoint in self.local_endpoints
1512        ]
1513        return Discover_Response(endpoint_infos)
1514
1515    def on_get_capabilities_command(self, command):
1516        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1517        if endpoint is None:
1518            return Get_Capabilities_Reject(AVDTP_BAD_ACP_SEID_ERROR)
1519
1520        return Get_Capabilities_Response(endpoint.capabilities)
1521
1522    def on_get_all_capabilities_command(self, command):
1523        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1524        if endpoint is None:
1525            return Get_All_Capabilities_Reject(AVDTP_BAD_ACP_SEID_ERROR)
1526
1527        return Get_All_Capabilities_Response(endpoint.capabilities)
1528
1529    def on_set_configuration_command(self, command):
1530        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1531        if endpoint is None:
1532            return Set_Configuration_Reject(AVDTP_BAD_ACP_SEID_ERROR)
1533
1534        # Check that the local endpoint isn't in use
1535        if endpoint.in_use:
1536            return Set_Configuration_Reject(AVDTP_SEP_IN_USE_ERROR)
1537
1538        # Create a stream object for the pair of endpoints
1539        stream = Stream(self, endpoint, StreamEndPointProxy(self, command.int_seid))
1540        self.streams[command.acp_seid] = stream
1541
1542        result = stream.on_set_configuration_command(command.capabilities)
1543        return result or Set_Configuration_Response()
1544
1545    def on_get_configuration_command(self, command):
1546        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1547        if endpoint is None:
1548            return Get_Configuration_Reject(AVDTP_BAD_ACP_SEID_ERROR)
1549        if endpoint.stream is None:
1550            return Get_Configuration_Reject(AVDTP_BAD_STATE_ERROR)
1551
1552        return endpoint.stream.on_get_configuration_command()
1553
1554    def on_reconfigure_command(self, command):
1555        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1556        if endpoint is None:
1557            return Reconfigure_Reject(0, AVDTP_BAD_ACP_SEID_ERROR)
1558        if endpoint.stream is None:
1559            return Reconfigure_Reject(0, AVDTP_BAD_STATE_ERROR)
1560
1561        result = endpoint.stream.on_reconfigure_command(command.capabilities)
1562        return result or Reconfigure_Response()
1563
1564    def on_open_command(self, command):
1565        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1566        if endpoint is None:
1567            return Open_Reject(AVDTP_BAD_ACP_SEID_ERROR)
1568        if endpoint.stream is None:
1569            return Open_Reject(AVDTP_BAD_STATE_ERROR)
1570
1571        result = endpoint.stream.on_open_command()
1572        return result or Open_Response()
1573
1574    def on_start_command(self, command):
1575        for seid in command.acp_seids:
1576            endpoint = self.get_local_endpoint_by_seid(seid)
1577            if endpoint is None:
1578                return Start_Reject(seid, AVDTP_BAD_ACP_SEID_ERROR)
1579            if endpoint.stream is None:
1580                return Start_Reject(AVDTP_BAD_STATE_ERROR)
1581
1582        # Start all streams
1583        # TODO: deal with partial failures
1584        for seid in command.acp_seids:
1585            endpoint = self.get_local_endpoint_by_seid(seid)
1586            result = endpoint.stream.on_start_command()
1587            if result is not None:
1588                return result
1589
1590        return Start_Response()
1591
1592    def on_suspend_command(self, command):
1593        for seid in command.acp_seids:
1594            endpoint = self.get_local_endpoint_by_seid(seid)
1595            if endpoint is None:
1596                return Suspend_Reject(seid, AVDTP_BAD_ACP_SEID_ERROR)
1597            if endpoint.stream is None:
1598                return Suspend_Reject(seid, AVDTP_BAD_STATE_ERROR)
1599
1600        # Suspend all streams
1601        # TODO: deal with partial failures
1602        for seid in command.acp_seids:
1603            endpoint = self.get_local_endpoint_by_seid(seid)
1604            result = endpoint.stream.on_suspend_command()
1605            if result is not None:
1606                return result
1607
1608        return Suspend_Response()
1609
1610    def on_close_command(self, command):
1611        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1612        if endpoint is None:
1613            return Close_Reject(AVDTP_BAD_ACP_SEID_ERROR)
1614        if endpoint.stream is None:
1615            return Close_Reject(AVDTP_BAD_STATE_ERROR)
1616
1617        result = endpoint.stream.on_close_command()
1618        return result or Close_Response()
1619
1620    def on_abort_command(self, command):
1621        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1622        if endpoint is None or endpoint.stream is None:
1623            return Abort_Response()
1624
1625        endpoint.stream.on_abort_command()
1626        return Abort_Response()
1627
1628    def on_security_control_command(self, command):
1629        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1630        if endpoint is None:
1631            return Security_Control_Reject(AVDTP_BAD_ACP_SEID_ERROR)
1632
1633        result = endpoint.on_security_control_command(command.payload)
1634        return result or Security_Control_Response()
1635
1636    def on_delayreport_command(self, command):
1637        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
1638        if endpoint is None:
1639            return DelayReport_Reject(AVDTP_BAD_ACP_SEID_ERROR)
1640
1641        result = endpoint.on_delayreport_command(command.delay)
1642        return result or DelayReport_Response()
1643
1644
1645# -----------------------------------------------------------------------------
class Listener(EventEmitter):
    '''
    Accepts incoming L2CAP connections on the AVDTP PSM.

    The first channel opened on a connection becomes a signaling channel: a
    Protocol is created for it and announced with a 'connection' event.
    Further channels on the same connection are forwarded to that Protocol.
    '''

    @staticmethod
    def create_registrar(device):
        # The registrar is what __init__ calls to subscribe to connections
        registrar = device.create_l2cap_registrar(AVDTP_PSM)
        return registrar

    def set_server(self, connection, server):
        # Servers are indexed by the handle of their connection
        handle = connection.handle
        self.servers[handle] = server

    def __init__(self, registrar, version=(1, 3)):
        super().__init__()
        self.version = version
        self.servers = {}  # Servers, by connection handle

        # Listen for incoming L2CAP connections
        registrar(self.on_l2cap_connection)

    def on_l2cap_connection(self, channel):
        logger.debug(f'{color("<<< incoming L2CAP connection:", "magenta")} {channel}')

        connection_handle = channel.connection.handle
        if connection_handle in self.servers:
            # This is a channel for a stream endpoint
            self.servers[connection_handle].on_l2cap_connection(channel)
            return

        # This is a new command/response channel: the server is only created
        # once the channel is open
        def on_channel_open():
            server = Protocol(channel, self.version)
            self.set_server(channel.connection, server)
            self.emit('connection', server)

        channel.on('open', on_channel_open)
1677
1678
1679# -----------------------------------------------------------------------------
class Stream:
    '''
    Pair of a local and a remote stream endpoint that can stream from one to the other

    The stream keeps track of its AVDTP state in `self.state` (one of the
    AVDTP_*_STATE values). The async methods (`configure`, `open`, `start`,
    `stop`, `close`) drive the remote endpoint and raise InvalidStateError when
    called in the wrong state. The `on_*_command` methods handle commands
    received for the local endpoint: they return a reject message when the
    command is refused, or None when it is accepted.
    '''

    @staticmethod
    def state_name(state):
        '''
        Return the name of `state`, or its numeric form if it has no name.
        '''
        return name_or_number(AVDTP_STATE_NAMES, state)

    def __init__(self, protocol, local_endpoint, remote_endpoint):
        '''
        remote_endpoint must be a subclass of StreamEndPointProxy

        Creating the stream attaches it to `local_endpoint` and marks that
        endpoint as in use.
        '''
        self.protocol = protocol
        self.local_endpoint = local_endpoint
        self.remote_endpoint = remote_endpoint
        self.rtp_channel = None
        self.state = AVDTP_IDLE_STATE

        local_endpoint.stream = self
        local_endpoint.in_use = 1

    def __str__(self):
        return (
            f'Stream({self.local_endpoint.seid} -> '
            f'{self.remote_endpoint.seid} {self.state_name(self.state)})'
        )

    def change_state(self, state):
        '''
        Log the transition and set the current state to `state`.
        '''
        logger.debug(f'{self} state change -> {color(self.state_name(state), "cyan")}')
        self.state = state

    def send_media_packet(self, packet):
        '''
        Send `packet`, serialized with bytes(), on the RTP channel.
        '''
        self.rtp_channel.send_pdu(bytes(packet))

    async def configure(self):
        '''
        Configure the remote endpoint with the local endpoint's configuration.

        Raises InvalidStateError if the stream is not IDLE.
        '''
        if self.state != AVDTP_IDLE_STATE:
            raise InvalidStateError('current state is not IDLE')

        await self.remote_endpoint.set_configuration(
            self.local_endpoint.seid, self.local_endpoint.configuration
        )
        self.change_state(AVDTP_CONFIGURED_STATE)

    async def open(self):
        '''
        Open the remote endpoint, then connect the channel used for RTP packets.

        Raises InvalidStateError if the stream is not CONFIGURED.
        '''
        if self.state != AVDTP_CONFIGURED_STATE:
            raise InvalidStateError('current state is not CONFIGURED')

        logger.debug('opening remote endpoint')
        await self.remote_endpoint.open()

        self.change_state(AVDTP_OPEN_STATE)

        # Create a channel for RTP packets
        self.rtp_channel = await self.protocol.channel_connector()

    async def start(self):
        '''
        Start the remote endpoint, then the local endpoint.

        A CONFIGURED stream is opened first. Raises InvalidStateError if the
        stream is not OPEN after that.
        '''
        # Auto-open if needed
        if self.state == AVDTP_CONFIGURED_STATE:
            await self.open()

        if self.state != AVDTP_OPEN_STATE:
            raise InvalidStateError('current state is not OPEN')

        logger.debug('starting remote endpoint')
        await self.remote_endpoint.start()

        logger.debug('starting local endpoint')
        await self.local_endpoint.start()

        self.change_state(AVDTP_STREAMING_STATE)

    async def stop(self):
        '''
        Stop the local endpoint, then the remote endpoint, and go back to OPEN.

        Raises InvalidStateError if the stream is not STREAMING.
        '''
        if self.state != AVDTP_STREAMING_STATE:
            raise InvalidStateError('current state is not STREAMING')

        logger.debug('stopping local endpoint')
        await self.local_endpoint.stop()

        logger.debug('stopping remote endpoint')
        await self.remote_endpoint.stop()

        self.change_state(AVDTP_OPEN_STATE)

    async def close(self):
        '''
        Close both endpoints, disconnect the RTP channel and go back to IDLE.

        Raises InvalidStateError if the stream is neither OPEN nor STREAMING.
        '''
        if self.state not in (AVDTP_OPEN_STATE, AVDTP_STREAMING_STATE):
            raise InvalidStateError('current state is not OPEN or STREAMING')

        logger.debug('closing local endpoint')
        await self.local_endpoint.close()

        logger.debug('closing remote endpoint')
        await self.remote_endpoint.close()

        # Release any channels we may have created
        self.change_state(AVDTP_CLOSING_STATE)
        if self.rtp_channel:
            await self.rtp_channel.disconnect()
            self.rtp_channel = None

        # Release the endpoint
        self.local_endpoint.in_use = 0

        self.change_state(AVDTP_IDLE_STATE)

    def on_set_configuration_command(self, configuration):
        '''
        Accept a configuration only while IDLE; on success, become CONFIGURED.
        '''
        if self.state != AVDTP_IDLE_STATE:
            return Set_Configuration_Reject(AVDTP_BAD_STATE_ERROR)

        result = self.local_endpoint.on_set_configuration_command(configuration)
        if result is not None:
            return result

        self.change_state(AVDTP_CONFIGURED_STATE)
        return None

    def on_get_configuration_command(self, configuration=None):
        '''
        Return the local endpoint's answer to a Get Configuration command.

        `configuration` is accepted for backward compatibility but is not used:
        the local endpoint's handler takes no argument, so forwarding one
        raised a TypeError.
        '''
        if self.state not in (
            AVDTP_CONFIGURED_STATE,
            AVDTP_OPEN_STATE,
            AVDTP_STREAMING_STATE,
        ):
            return Get_Configuration_Reject(AVDTP_BAD_STATE_ERROR)

        return self.local_endpoint.on_get_configuration_command()

    def on_reconfigure_command(self, configuration):
        '''
        Forward a Reconfigure command to the local endpoint, only while OPEN.
        '''
        if self.state != AVDTP_OPEN_STATE:
            return Reconfigure_Reject(AVDTP_BAD_STATE_ERROR)

        # The state does not change, so the endpoint's result (a reject, or
        # None when accepted) is the result of the command
        return self.local_endpoint.on_reconfigure_command(configuration)

    def on_open_command(self):
        '''
        Accept an Open command only while CONFIGURED; on success, become OPEN
        and register this stream to receive the next incoming channel.
        '''
        if self.state != AVDTP_CONFIGURED_STATE:
            return Open_Reject(AVDTP_BAD_STATE_ERROR)

        result = self.local_endpoint.on_open_command()
        if result is not None:
            return result

        # Register to accept the next channel
        self.protocol.channel_acceptor = self

        self.change_state(AVDTP_OPEN_STATE)
        return None

    def on_start_command(self):
        '''
        Accept a Start command only while OPEN and with an RTP channel; on
        success, become STREAMING.
        '''
        # NOTE(review): the rejections below are built with Open_Reject even
        # though the command being rejected is Start - confirm whether a
        # Start-specific reject message should be used instead
        if self.state != AVDTP_OPEN_STATE:
            return Open_Reject(AVDTP_BAD_STATE_ERROR)

        # Check that we have an RTP channel
        if self.rtp_channel is None:
            logger.warning('received start command before RTP channel establishment')
            return Open_Reject(AVDTP_BAD_STATE_ERROR)

        result = self.local_endpoint.on_start_command()
        if result is not None:
            return result

        self.change_state(AVDTP_STREAMING_STATE)
        return None

    def on_suspend_command(self):
        '''
        Accept a Suspend command only while STREAMING; on success, become OPEN.
        '''
        # NOTE(review): Open_Reject is used to reject a Suspend command -
        # confirm whether a Suspend-specific reject message should be used
        if self.state != AVDTP_STREAMING_STATE:
            return Open_Reject(AVDTP_BAD_STATE_ERROR)

        result = self.local_endpoint.on_suspend_command()
        if result is not None:
            return result

        self.change_state(AVDTP_OPEN_STATE)
        return None

    def on_close_command(self):
        '''
        Accept a Close command only while OPEN or STREAMING.

        On success the stream becomes CLOSING, and then IDLE right away if
        there is no RTP channel. Otherwise it stays CLOSING until the RTP
        channel is closed.
        '''
        # NOTE(review): Open_Reject is used to reject a Close command -
        # confirm whether a Close-specific reject message should be used
        if self.state not in (AVDTP_OPEN_STATE, AVDTP_STREAMING_STATE):
            return Open_Reject(AVDTP_BAD_STATE_ERROR)

        result = self.local_endpoint.on_close_command()
        if result is not None:
            return result

        self.change_state(AVDTP_CLOSING_STATE)

        if self.rtp_channel is None:
            # No channel to release, we're done
            self.change_state(AVDTP_IDLE_STATE)
        # else:
        # TODO: set a timer as we wait for the RTP channel to be closed

        return None

    def on_abort_command(self):
        '''
        Become IDLE if there is no RTP channel, else ABORTING until it closes.
        '''
        if self.rtp_channel is None:
            # No need to wait
            self.change_state(AVDTP_IDLE_STATE)
        else:
            # Wait for the RTP channel to be closed
            self.change_state(AVDTP_ABORTING_STATE)

    def on_l2cap_connection(self, channel):
        '''
        Adopt `channel` as the RTP channel and follow its open/close events.
        '''
        logger.debug(color('<<< stream channel connected', 'magenta'))
        self.rtp_channel = channel
        channel.on('open', self.on_l2cap_channel_open)
        channel.on('close', self.on_l2cap_channel_close)

        # We don't need more channels
        self.protocol.channel_acceptor = None

    def on_l2cap_channel_open(self):
        logger.debug(color('<<< stream channel open', 'magenta'))
        self.local_endpoint.on_rtp_channel_open()

    def on_l2cap_channel_close(self):
        '''
        Release the local endpoint and the RTP channel; a stream that was
        CLOSING or ABORTING becomes IDLE.
        '''
        logger.debug(color('<<< stream channel closed', 'magenta'))
        self.local_endpoint.on_rtp_channel_close()
        self.local_endpoint.in_use = 0
        self.rtp_channel = None

        if self.state in (AVDTP_CLOSING_STATE, AVDTP_ABORTING_STATE):
            self.change_state(AVDTP_IDLE_STATE)
        else:
            logger.warning('unexpected channel close while not CLOSING or ABORTING')
1908
1909
1910# -----------------------------------------------------------------------------
class StreamEndPoint:
    '''
    Description of a stream endpoint: its SEID, media type, TSEP, in-use flag
    and list of capabilities.
    '''

    def __init__(self, seid, media_type, tsep, in_use, capabilities):
        self.capabilities = capabilities
        self.in_use = in_use
        self.tsep = tsep
        self.media_type = media_type
        self.seid = seid

    def __str__(self):
        # Multi-line description, with one indented line per capability
        media_type_name = name_or_number(AVDTP_MEDIA_TYPE_NAMES, self.media_type)
        tsep_name = name_or_number(AVDTP_TSEP_NAMES, self.tsep)
        capability_lines = '\n'.join(
            f'    {capability}' for capability in self.capabilities
        )
        lines = (
            'SEP(',
            f'  seid={self.seid}',
            f'  media_type={media_type_name}',
            f'  tsep={tsep_name}',
            f'  in_use={self.in_use}',
            '  capabilities=[',
            capability_lines,
            '  ]',
            ')',
        )
        return '\n'.join(lines)
1935
1936
1937# -----------------------------------------------------------------------------
class StreamEndPointProxy:
    '''
    Handle on a remote stream endpoint: each operation is delegated to the
    protocol object, using this endpoint's SEID.
    '''

    def __init__(self, protocol, seid):
        self.protocol = protocol
        self.seid = seid

    async def set_configuration(self, int_seid, configuration):
        response = await self.protocol.set_configuration(
            self.seid, int_seid, configuration
        )
        return response

    async def open(self):
        response = await self.protocol.open(self.seid)
        return response

    async def start(self):
        # The protocol's start takes a list of SEIDs
        seids = [self.seid]
        response = await self.protocol.start(seids)
        return response

    async def stop(self):
        # Stopping is done with the protocol's suspend, which takes a list of SEIDs
        seids = [self.seid]
        response = await self.protocol.suspend(seids)
        return response

    async def close(self):
        response = await self.protocol.close(self.seid)
        return response

    async def abort(self):
        response = await self.protocol.abort(self.seid)
        return response
1960
1961
1962# -----------------------------------------------------------------------------
class DiscoveredStreamEndPoint(StreamEndPoint, StreamEndPointProxy):
    '''
    Stream endpoint that is both a description (StreamEndPoint) and a proxy
    that can be operated through a protocol (StreamEndPointProxy).
    '''

    def __init__(self, protocol, seid, media_type, tsep, in_use, capabilities):
        # Neither base initializer calls the other, so both are called here
        StreamEndPoint.__init__(
            self,
            seid=seid,
            media_type=media_type,
            tsep=tsep,
            in_use=in_use,
            capabilities=capabilities,
        )
        StreamEndPointProxy.__init__(self, protocol=protocol, seid=seid)
1967
1968
1969# -----------------------------------------------------------------------------
class LocalStreamEndPoint(StreamEndPoint):
    '''
    Base class for stream endpoints hosted locally.

    The methods are hooks with default implementations, meant to be overridden
    by subclasses. An `on_*_command` hook returns None to accept the command,
    or a message to send back instead of the default response.
    '''

    def __init__(
        self, protocol, seid, media_type, tsep, capabilities, configuration=None
    ):
        # A local endpoint starts out as not in use
        super().__init__(seid, media_type, tsep, 0, capabilities)
        self.protocol = protocol
        self.configuration = configuration if configuration is not None else []
        self.stream = None

    async def start(self):
        pass

    async def stop(self):
        pass

    async def close(self):
        pass

    def on_set_configuration_command(self, configuration):
        # Streams call this hook on their local endpoint, so the base class
        # must provide it. The default is to accept the configuration and
        # remember it, so that a later Get Configuration reports it.
        self.configuration = configuration
        return None

    def on_reconfigure_command(self, command):
        pass

    def on_get_configuration_command(self, configuration=None):
        # `configuration` is optional and ignored: it is accepted so that this
        # hook can be invoked both with and without an argument
        return Get_Configuration_Response(self.configuration)

    def on_open_command(self):
        pass

    def on_start_command(self):
        pass

    def on_suspend_command(self):
        pass

    def on_close_command(self):
        pass

    def on_abort_command(self):
        pass

    def on_security_control_command(self, payload):
        # Default: nothing to do, the caller sends its default response
        return None

    def on_delayreport_command(self, delay):
        # Default: nothing to do, the caller sends its default response
        return None

    def on_rtp_channel_open(self):
        pass

    def on_rtp_channel_close(self):
        pass
2014
2015
2016# -----------------------------------------------------------------------------
class LocalSource(LocalStreamEndPoint, EventEmitter):
    '''
    Local source endpoint.

    When a packet pump is supplied, starting and stopping the endpoint starts
    and stops the pump. Without one, 'start' (with the stream's RTP channel)
    and 'stop' events are emitted instead.
    '''

    def __init__(self, protocol, seid, codec_capabilities, packet_pump):
        capabilities = [
            ServiceCapabilities(AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY),
            codec_capabilities,
        ]
        # The capabilities are also used as the initial configuration
        LocalStreamEndPoint.__init__(
            self,
            protocol,
            seid,
            codec_capabilities.media_type,
            AVDTP_TSEP_SRC,
            capabilities,
            capabilities,
        )
        EventEmitter.__init__(self)
        self.packet_pump = packet_pump

        # Strong references to the tasks spawned by the command handlers
        self._background_tasks = set()

    def _run_in_background(self, coroutine):
        # The event loop only keeps weak references to tasks, so a task with no
        # other reference may be garbage-collected before it completes. Keep a
        # reference until the task is done.
        task = asyncio.create_task(coroutine)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)

    async def start(self):
        if self.packet_pump:
            return await self.packet_pump.start(self.stream.rtp_channel)

        self.emit('start', self.stream.rtp_channel)

    async def stop(self):
        if self.packet_pump:
            return await self.packet_pump.stop()

        self.emit('stop')

    def on_set_configuration_command(self, configuration):
        # For now, blindly accept the configuration
        logger.debug(f'<<< received source configuration: {configuration}')
        self.configuration = configuration

    def on_start_command(self):
        # Accept right away (None is returned), and start asynchronously
        self._run_in_background(self.start())

    def on_suspend_command(self):
        # Accept right away (None is returned), and stop asynchronously
        self._run_in_background(self.stop())
2057
2058
2059# -----------------------------------------------------------------------------
class LocalSink(LocalStreamEndPoint, EventEmitter):
    '''
    Local sink endpoint.

    Once the RTP channel is open, each packet received on it is parsed as a
    MediaPacket and emitted as an 'rtp_packet' event.
    '''

    def __init__(self, protocol, seid, codec_capabilities):
        LocalStreamEndPoint.__init__(
            self,
            protocol,
            seid,
            media_type=codec_capabilities.media_type,
            tsep=AVDTP_TSEP_SNK,
            capabilities=[
                ServiceCapabilities(AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY),
                codec_capabilities,
            ],
        )
        EventEmitter.__init__(self)

    def on_set_configuration_command(self, configuration):
        # For now, the configuration is accepted without any check
        logger.debug(f'<<< received sink configuration: {configuration}')
        self.configuration = configuration

    def on_rtp_channel_open(self):
        logger.debug(color('<<< RTP channel open', 'magenta'))

        # Receive the packets that arrive on the stream's RTP channel
        rtp_channel = self.stream.rtp_channel
        rtp_channel.sink = self.on_avdtp_packet

    def on_avdtp_packet(self, packet):
        media_packet = MediaPacket.from_bytes(packet)

        # Only the first 16 bytes of the payload are logged
        logger.debug(
            f'{color("<<< RTP Packet:", "green")} '
            f'{media_packet} {media_packet.payload[:16].hex()}'
        )
        self.emit('rtp_packet', media_packet)
2092