• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18import asyncio
19import struct
20import time
21import logging
22from colors import color
23from pyee import EventEmitter
24
25from .core import (
26    BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE,
27    InvalidStateError,
28    ProtocolError,
29    name_or_number
30)
31from .a2dp import (
32    A2DP_CODEC_TYPE_NAMES,
33    A2DP_MPEG_2_4_AAC_CODEC_TYPE,
34    A2DP_NON_A2DP_CODEC_TYPE,
35    A2DP_SBC_CODEC_TYPE,
36    AacMediaCodecInformation,
37    SbcMediaCodecInformation,
38    VendorSpecificMediaCodecInformation
39)
40from . import sdp
41
42# -----------------------------------------------------------------------------
43# Logging
44# -----------------------------------------------------------------------------
45logger = logging.getLogger(__name__)
46
47
48# -----------------------------------------------------------------------------
49# Constants
50# -----------------------------------------------------------------------------
# Protocol/Service Multiplexer value used for AVDTP
AVDTP_PSM = 0x0019

AVDTP_DEFAULT_RTX_SIG_TIMER = 5  # Seconds

# Signal Identifiers (AVDTP spec - 8.5 Signal Command Set)
AVDTP_DISCOVER             = 0x01
AVDTP_GET_CAPABILITIES     = 0x02
AVDTP_SET_CONFIGURATION    = 0x03
AVDTP_GET_CONFIGURATION    = 0x04
AVDTP_RECONFIGURE          = 0x05
AVDTP_OPEN                 = 0x06
AVDTP_START                = 0x07
AVDTP_CLOSE                = 0x08
AVDTP_SUSPEND              = 0x09
AVDTP_ABORT                = 0x0A
AVDTP_SECURITY_CONTROL     = 0x0B
AVDTP_GET_ALL_CAPABILITIES = 0x0C
AVDTP_DELAYREPORT          = 0x0D

# Display names, keyed by signal identifier
AVDTP_SIGNAL_NAMES = {
    AVDTP_DISCOVER:             'AVDTP_DISCOVER',
    AVDTP_GET_CAPABILITIES:     'AVDTP_GET_CAPABILITIES',
    AVDTP_SET_CONFIGURATION:    'AVDTP_SET_CONFIGURATION',
    AVDTP_GET_CONFIGURATION:    'AVDTP_GET_CONFIGURATION',
    AVDTP_RECONFIGURE:          'AVDTP_RECONFIGURE',
    AVDTP_OPEN:                 'AVDTP_OPEN',
    AVDTP_START:                'AVDTP_START',
    AVDTP_CLOSE:                'AVDTP_CLOSE',
    AVDTP_SUSPEND:              'AVDTP_SUSPEND',
    AVDTP_ABORT:                'AVDTP_ABORT',
    AVDTP_SECURITY_CONTROL:     'AVDTP_SECURITY_CONTROL',
    AVDTP_GET_ALL_CAPABILITIES: 'AVDTP_GET_ALL_CAPABILITIES',
    AVDTP_DELAYREPORT:          'AVDTP_DELAYREPORT'
}

# Reverse lookup (name -> signal identifier), in the same order as the
# table above. Derived from it so the two tables cannot drift apart.
AVDTP_SIGNAL_IDENTIFIERS = {
    signal_name: signal_identifier
    for signal_identifier, signal_name in AVDTP_SIGNAL_NAMES.items()
}
101
# Error codes (AVDTP spec - 8.20.6.2 ERROR_CODE tables)
AVDTP_BAD_HEADER_FORMAT_ERROR          = 0x01
AVDTP_BAD_LENGTH_ERROR                 = 0x11
AVDTP_BAD_ACP_SEID_ERROR               = 0x12
AVDTP_SEP_IN_USE_ERROR                 = 0x13
AVDTP_SEP_NOT_IN_USE_ERROR             = 0x14
AVDTP_BAD_SERV_CATEGORY_ERROR          = 0x17
AVDTP_BAD_PAYLOAD_FORMAT_ERROR         = 0x18
AVDTP_NOT_SUPPORTED_COMMAND_ERROR      = 0x19
AVDTP_INVALID_CAPABILITIES_ERROR       = 0x1A
AVDTP_BAD_RECOVERY_TYPE_ERROR          = 0x22
AVDTP_BAD_MEDIA_TRANSPORT_FORMAT_ERROR = 0x23
AVDTP_BAD_RECOVERY_FORMAT_ERROR        = 0x25
AVDTP_BAD_ROHC_FORMAT_ERROR            = 0x26
AVDTP_BAD_CP_FORMAT_ERROR              = 0x27
AVDTP_BAD_MULTIPLEXING_FORMAT_ERROR    = 0x28
AVDTP_UNSUPPORTED_CONFIGURATION_ERROR  = 0x29
AVDTP_BAD_STATE_ERROR                  = 0x31

# Symbolic names for the error codes above, keyed by code value.
# Used with name_or_number() when formatting reject messages for display.
AVDTP_ERROR_NAMES = {
    AVDTP_BAD_HEADER_FORMAT_ERROR:          'AVDTP_BAD_HEADER_FORMAT_ERROR',
    AVDTP_BAD_LENGTH_ERROR:                 'AVDTP_BAD_LENGTH_ERROR',
    AVDTP_BAD_ACP_SEID_ERROR:               'AVDTP_BAD_ACP_SEID_ERROR',
    AVDTP_SEP_IN_USE_ERROR:                 'AVDTP_SEP_IN_USE_ERROR',
    AVDTP_SEP_NOT_IN_USE_ERROR:             'AVDTP_SEP_NOT_IN_USE_ERROR',
    AVDTP_BAD_SERV_CATEGORY_ERROR:          'AVDTP_BAD_SERV_CATEGORY_ERROR',
    AVDTP_BAD_PAYLOAD_FORMAT_ERROR:         'AVDTP_BAD_PAYLOAD_FORMAT_ERROR',
    AVDTP_NOT_SUPPORTED_COMMAND_ERROR:      'AVDTP_NOT_SUPPORTED_COMMAND_ERROR',
    AVDTP_INVALID_CAPABILITIES_ERROR:       'AVDTP_INVALID_CAPABILITIES_ERROR',
    AVDTP_BAD_RECOVERY_TYPE_ERROR:          'AVDTP_BAD_RECOVERY_TYPE_ERROR',
    AVDTP_BAD_MEDIA_TRANSPORT_FORMAT_ERROR: 'AVDTP_BAD_MEDIA_TRANSPORT_FORMAT_ERROR',
    AVDTP_BAD_RECOVERY_FORMAT_ERROR:        'AVDTP_BAD_RECOVERY_FORMAT_ERROR',
    AVDTP_BAD_ROHC_FORMAT_ERROR:            'AVDTP_BAD_ROHC_FORMAT_ERROR',
    AVDTP_BAD_CP_FORMAT_ERROR:              'AVDTP_BAD_CP_FORMAT_ERROR',
    AVDTP_BAD_MULTIPLEXING_FORMAT_ERROR:    'AVDTP_BAD_MULTIPLEXING_FORMAT_ERROR',
    AVDTP_UNSUPPORTED_CONFIGURATION_ERROR:  'AVDTP_UNSUPPORTED_CONFIGURATION_ERROR',
    AVDTP_BAD_STATE_ERROR:                  'AVDTP_BAD_STATE_ERROR'
}
140
# Media types, as carried in the 4-bit media type field of an end point
# description and in the first byte of a media codec capability
AVDTP_AUDIO_MEDIA_TYPE      = 0x00
AVDTP_VIDEO_MEDIA_TYPE      = 0x01
AVDTP_MULTIMEDIA_MEDIA_TYPE = 0x02

# Symbolic names for the media types above, keyed by value (for display)
AVDTP_MEDIA_TYPE_NAMES = {
    AVDTP_AUDIO_MEDIA_TYPE:      'AVDTP_AUDIO_MEDIA_TYPE',
    AVDTP_VIDEO_MEDIA_TYPE:      'AVDTP_VIDEO_MEDIA_TYPE',
    AVDTP_MULTIMEDIA_MEDIA_TYPE: 'AVDTP_MULTIMEDIA_MEDIA_TYPE'
}

# TSEP (AVDTP spec - 8.20.3 Stream End-point Type, Source or Sink (TSEP))
AVDTP_TSEP_SRC = 0x00
AVDTP_TSEP_SNK = 0x01

# Symbolic names for the TSEP values above, keyed by value (for display)
AVDTP_TSEP_NAMES = {
    AVDTP_TSEP_SRC: 'AVDTP_TSEP_SRC',
    AVDTP_TSEP_SNK: 'AVDTP_TSEP_SNK'
}
159
# Service Categories (AVDTP spec - Table 8.47: Service Category information element field values)
AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY    = 0x01
AVDTP_REPORTING_SERVICE_CATEGORY          = 0x02
AVDTP_RECOVERY_SERVICE_CATEGORY           = 0x03
AVDTP_CONTENT_PROTECTION_SERVICE_CATEGORY = 0x04
AVDTP_HEADER_COMPRESSION_SERVICE_CATEGORY = 0x05
AVDTP_MULTIPLEXING_SERVICE_CATEGORY       = 0x06
AVDTP_MEDIA_CODEC_SERVICE_CATEGORY        = 0x07
AVDTP_DELAY_REPORTING_SERVICE_CATEGORY    = 0x08

# Symbolic names for the service categories above, keyed by value.
# Used with name_or_number() when formatting capabilities for display.
AVDTP_SERVICE_CATEGORY_NAMES = {
    AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY:    'AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY',
    AVDTP_REPORTING_SERVICE_CATEGORY:          'AVDTP_REPORTING_SERVICE_CATEGORY',
    AVDTP_RECOVERY_SERVICE_CATEGORY:           'AVDTP_RECOVERY_SERVICE_CATEGORY',
    AVDTP_CONTENT_PROTECTION_SERVICE_CATEGORY: 'AVDTP_CONTENT_PROTECTION_SERVICE_CATEGORY',
    AVDTP_HEADER_COMPRESSION_SERVICE_CATEGORY: 'AVDTP_HEADER_COMPRESSION_SERVICE_CATEGORY',
    AVDTP_MULTIPLEXING_SERVICE_CATEGORY:       'AVDTP_MULTIPLEXING_SERVICE_CATEGORY',
    AVDTP_MEDIA_CODEC_SERVICE_CATEGORY:        'AVDTP_MEDIA_CODEC_SERVICE_CATEGORY',
    AVDTP_DELAY_REPORTING_SERVICE_CATEGORY:    'AVDTP_DELAY_REPORTING_SERVICE_CATEGORY'
}
180
# States (AVDTP spec - 9.1 State Definitions)
AVDTP_IDLE_STATE       = 0x00
AVDTP_CONFIGURED_STATE = 0x01
AVDTP_OPEN_STATE       = 0x02
AVDTP_STREAMING_STATE  = 0x03
AVDTP_CLOSING_STATE    = 0x04
AVDTP_ABORTING_STATE   = 0x05

# Symbolic names for the states above, keyed by state value
AVDTP_STATE_NAMES = {
    AVDTP_IDLE_STATE:       'AVDTP_IDLE_STATE',
    AVDTP_CONFIGURED_STATE: 'AVDTP_CONFIGURED_STATE',
    AVDTP_OPEN_STATE:       'AVDTP_OPEN_STATE',
    AVDTP_STREAMING_STATE:  'AVDTP_STREAMING_STATE',
    AVDTP_CLOSING_STATE:    'AVDTP_CLOSING_STATE',
    AVDTP_ABORTING_STATE:   'AVDTP_ABORTING_STATE'
}
197
198
199# -----------------------------------------------------------------------------
async def find_avdtp_service_with_sdp_client(sdp_client):
    '''
    Look up the AVDTP version advertised by a peer, using a connected SDP client.

    The search asks for the Bluetooth Profile Descriptor List attribute of
    every record matching the Advanced Audio Distribution service class.
    The version is taken from the first profile descriptor that has at least
    two elements: its second element holds the version, with the major
    number in the high byte and the minor number in the low byte.

    Returns a (major, minor) tuple, or None if no such descriptor is found.
    '''
    profile_list_id = sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID

    # Search for services with the Advanced Audio Distribution service class
    records = await sdp_client.search_attributes(
        [BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE],
        [profile_list_id]
    )

    for record in records:
        descriptors = sdp.ServiceAttribute.find_attribute_in_list(record, profile_list_id)
        if not descriptors:
            continue

        for descriptor in descriptors.value:
            # A usable descriptor is a (profile, version) pair
            if len(descriptor.value) < 2:
                continue

            version = descriptor.value[1].value
            return (version >> 8, version & 0xFF)

    return None
224
225
226# -----------------------------------------------------------------------------
async def find_avdtp_service_with_connection(device, connection):
    '''
    Find an AVDTP service, for a connection, and return its version,
    or None if none is found.

    A temporary SDP client is created for `device`, connected over
    `connection`, used for the search, then disconnected. The client is
    disconnected even when the search raises, so a failed lookup does not
    leave the SDP client connected; the exception still propagates to the
    caller.
    '''

    sdp_client = sdp.Client(device)
    await sdp_client.connect(connection)
    try:
        return await find_avdtp_service_with_sdp_client(sdp_client)
    finally:
        # Always release the SDP client, whether or not the search succeeded
        await sdp_client.disconnect()
239
240
241# -----------------------------------------------------------------------------
class RealtimeClock:
    '''
    Clock backed by the system wall clock and the asyncio event loop.

    Exposes the two operations a pacing loop needs (`now` and `sleep`) so
    that another object with the same two methods can be used in its place.
    '''

    def now(self):
        '''Return the current time, in seconds, as reported by time.time()'''
        return time.time()

    async def sleep(self, duration):
        '''Suspend the calling coroutine for `duration` seconds'''
        await asyncio.sleep(duration)
248
249
250# -----------------------------------------------------------------------------
class MediaPacket:
    '''
    RTP packet: a 12-byte fixed header, an optional list of 32-bit CSRC
    identifiers, and a payload.

    Header layout, as parsed and produced by this class:
      byte 0:     version (2 bits) | padding (1) | extension (1) | CSRC count (4)
      byte 1:     marker (1 bit) | payload type (7)
      bytes 2-3:  sequence number (big-endian)
      bytes 4-7:  timestamp (big-endian)
      bytes 8-11: SSRC (big-endian)
      bytes 12-:  CSRC count x 4-byte CSRC identifiers (big-endian), then payload
    '''

    @staticmethod
    def from_bytes(data):
        '''
        Parse a serialized packet.

        Everything after the CSRC list is returned as the payload; a header
        extension, if flagged, is not separated from it.
        '''
        version         = (data[0] >> 6) & 0x03
        padding         = (data[0] >> 5) & 0x01
        extension       = (data[0] >> 4) & 0x01
        csrc_count      = data[0] & 0x0F
        marker          = (data[1] >> 7) & 0x01
        payload_type    = data[1] & 0x7F
        sequence_number = struct.unpack_from('>H', data, 2)[0]
        timestamp       = struct.unpack_from('>I', data, 4)[0]
        ssrc            = struct.unpack_from('>I', data, 8)[0]

        # Each CSRC entry is 4 bytes wide, so entry i starts at 12 + 4 * i
        csrc_list = [
            struct.unpack_from('>I', data, 12 + 4 * i)[0]
            for i in range(csrc_count)
        ]
        payload = data[12 + csrc_count * 4:]

        return MediaPacket(
            version,
            padding,
            extension,
            marker,
            sequence_number,
            timestamp,
            ssrc,
            csrc_list,
            payload_type,
            payload
        )

    def __init__(
        self,
        version,
        padding,
        extension,
        marker,
        sequence_number,
        timestamp,
        ssrc,
        csrc_list,
        payload_type,
        payload
    ):
        self.version         = version
        self.padding         = padding
        self.extension       = extension
        self.marker          = marker
        self.sequence_number = sequence_number
        self.timestamp       = timestamp
        self.ssrc            = ssrc
        self.csrc_list       = csrc_list
        self.payload_type    = payload_type
        self.payload         = payload

    def __bytes__(self):
        '''Serialize the header, the CSRC list and the payload, in that order'''
        first_byte  = self.version << 6 | self.padding << 5 | self.extension << 4 | len(self.csrc_list)
        second_byte = self.marker << 7 | self.payload_type
        fixed_header = (
            bytes([first_byte, second_byte]) +
            struct.pack('>HII', self.sequence_number, self.timestamp, self.ssrc)
        )
        csrc_bytes = b''.join(struct.pack('>I', csrc) for csrc in self.csrc_list)
        return fixed_header + csrc_bytes + self.payload

    def __str__(self):
        return f'RTP(v={self.version},p={self.padding},x={self.extension},m={self.marker},pt={self.payload_type},sn={self.sequence_number},ts={self.timestamp},ssrc={self.ssrc},csrcs={self.csrc_list},payload_size={len(self.payload)})'
317
318
319# -----------------------------------------------------------------------------
class MediaPacketPump:
    '''
    Sends packets from an asynchronous source over a channel, paced by the
    packets' own timestamps.

    `packets` is iterated with `async for`; each item must expose a
    `timestamp_seconds` attribute and be convertible with `bytes()`.
    `clock` provides `now()` and an awaitable `sleep(duration)`.
    '''

    def __init__(self, packets, clock=RealtimeClock()):
        self.packets   = packets
        self.clock     = clock
        self.pump_task = None

    async def start(self, rtp_channel):
        '''
        Start pumping packets to `rtp_channel` (via its `send_pdu` method)
        in a background task on the running event loop.
        '''
        async def pump_packets():
            # `None` means "first packet not seen yet". A numeric sentinel
            # would be wrong here: 0 is a legitimate reading for a clock
            # that does not follow wall time.
            start_time      = None
            start_timestamp = 0

            try:
                logger.debug('pump starting')
                async for packet in self.packets:
                    # Capture the timestamp of the first packet
                    if start_time is None:
                        start_time      = self.clock.now()
                        start_timestamp = packet.timestamp_seconds

                    # Wait until we can send
                    when = start_time + (packet.timestamp_seconds - start_timestamp)
                    now = self.clock.now()
                    if when > now:
                        delay = when - now
                        logger.debug(f'waiting for {delay}')
                        await self.clock.sleep(delay)

                    # Emit
                    rtp_channel.send_pdu(bytes(packet))
                    logger.debug(f'{color(">>> sending RTP packet:", "green")} {packet}')
            except asyncio.exceptions.CancelledError:
                # Cancellation is how stop() ends the pump: finish quietly so
                # that awaiting the task in stop() does not raise
                logger.debug('pump canceled')

        # Pump packets
        self.pump_task = asyncio.get_running_loop().create_task(pump_packets())

    async def stop(self):
        '''Cancel the background task, if any, and wait for it to finish'''
        # Stop the pump
        if self.pump_task:
            self.pump_task.cancel()
            await self.pump_task
            self.pump_task = None
362
363
364# -----------------------------------------------------------------------------
class MessageAssembler:
    '''
    Reassembles signaling messages from PDUs that may be fragmented.

    Every PDU starts with a header byte: transaction label (high 4 bits),
    packet type (2 bits) and message type (low 2 bits). Single and start
    packets carry the signal identifier in the low 6 bits of the second
    byte; a start packet additionally carries the total number of packets
    in its third byte. Once a message is complete, `callback` is invoked
    with (transaction_label, message).
    '''

    def __init__(self, callback):
        self.callback = callback
        self.reset()

    def reset(self):
        '''Forget any partially assembled message'''
        self.transaction_label        = 0
        self.message                  = None
        self.message_type             = 0
        self.signal_identifier        = 0
        self.number_of_signal_packets = 0
        self.packet_count             = 0

    def on_pdu(self, pdu):
        '''Process one incoming PDU'''
        transaction_label = pdu[0] >> 4
        packet_type       = (pdu[0] >> 2) & 3
        message_type      = pdu[0] & 3

        logger.debug(f'transaction_label={transaction_label}, packet_type={Protocol.packet_type_name(packet_type)}, message_type={Message.message_type_name(message_type)}')
        if packet_type in (Protocol.SINGLE_PACKET, Protocol.START_PACKET):
            if self.message is not None:
                # The previous message has not been terminated
                logger.warning('received a start or single packet when expecting an end or continuation')

            # Always start from a clean state, so that earlier stray or
            # discarded packets cannot skew the fragment count. This packet
            # is fragment number 1.
            self.reset()
            self.packet_count      = 1
            self.transaction_label = transaction_label
            self.signal_identifier = pdu[1] & 0x3F
            self.message_type      = message_type

            if packet_type == Protocol.SINGLE_PACKET:
                self.message = pdu[2:]
                self.on_message_complete()
            else:
                self.number_of_signal_packets = pdu[2]
                self.message                  = pdu[3:]
        elif packet_type in (Protocol.CONTINUE_PACKET, Protocol.END_PACKET):
            if self.message is None:
                # No start packet was received: there is nothing to append to
                logger.warning('unexpected continuation')
                return

            if transaction_label != self.transaction_label:
                logger.warning(f'transaction label mismatch: expected {self.transaction_label}, received {transaction_label}')
                return

            if message_type != self.message_type:
                logger.warning(f'message type mismatch: expected {self.message_type}, received {message_type}')
                return

            # Only fragments that are actually appended are counted
            self.packet_count += 1
            self.message += pdu[1:]

            if packet_type == Protocol.END_PACKET:
                if self.packet_count != self.number_of_signal_packets:
                    logger.warning(f'incomplete fragmented message: expected {self.number_of_signal_packets} packets, received {self.packet_count}')
                    self.reset()
                    return

                self.on_message_complete()
            else:
                if self.packet_count > self.number_of_signal_packets:
                    logger.warning(f'too many packets: expected {self.number_of_signal_packets}, received {self.packet_count}')
                    self.reset()
                    return

    def on_message_complete(self):
        '''
        Build the message object and hand it to the callback.

        Exceptions raised by the callback are logged and swallowed; errors
        raised while building the message propagate to the caller. In both
        cases the assembler is reset, so it is ready for the next message.
        '''
        try:
            message = Message.create(self.signal_identifier, self.message_type, self.message)

            try:
                self.callback(self.transaction_label, message)
            except Exception as error:
                logger.warning(color(f'!!! exception in callback: {error}'))
        finally:
            self.reset()
439
440
441# -----------------------------------------------------------------------------
class ServiceCapabilities:
    '''
    One service capability: a service category and its raw capability bytes.

    On the wire, each capability is encoded as one category byte, one
    length byte, then that many capability bytes.
    '''

    @staticmethod
    def create(service_category, service_capabilities_bytes):
        '''
        Build a capability object from parsed fields, choosing the class
        that matches the service category. __init__ is bypassed;
        init_from_bytes() is called instead to decode the raw bytes.
        '''
        # Select the appropriate subclass
        is_media_codec = service_category == AVDTP_MEDIA_CODEC_SERVICE_CATEGORY
        cls = MediaCodecCapabilities if is_media_codec else ServiceCapabilities

        # Create an instance and initialize it
        instance = cls.__new__(cls)
        instance.service_category = service_category
        instance.service_capabilities_bytes = service_capabilities_bytes
        instance.init_from_bytes()

        return instance

    @staticmethod
    def parse_capabilities(payload):
        '''Parse a sequence of encoded capabilities into a list of objects'''
        capabilities = []
        remaining = payload
        while remaining:
            category, length = remaining[0], remaining[1]
            end = 2 + length
            capabilities.append(ServiceCapabilities.create(category, remaining[2:end]))
            remaining = remaining[end:]

        return capabilities

    @staticmethod
    def serialize_capabilities(capabilities):
        '''Encode a list of capability objects, in order'''
        return b''.join(
            bytes([
                item.service_category,
                len(item.service_capabilities_bytes)
            ]) + item.service_capabilities_bytes
            for item in capabilities
        )

    def init_from_bytes(self):
        '''Hook for subclasses to decode service_capabilities_bytes; does nothing here'''
        pass

    def __init__(self, service_category, service_capabilities_bytes=b''):
        self.service_category           = service_category
        self.service_capabilities_bytes = service_capabilities_bytes

    def to_string(self, details=[]):
        '''Format the category name followed by the given detail strings'''
        category_name = name_or_number(AVDTP_SERVICE_CATEGORY_NAMES, self.service_category)
        attributes = ','.join([category_name] + details)
        return f'ServiceCapabilities({attributes})'

    def __str__(self):
        # Show the raw bytes in hex, when there are any
        raw = self.service_capabilities_bytes
        return self.to_string([raw.hex()] if raw else [])
499
500
501# -----------------------------------------------------------------------------
class MediaCodecCapabilities(ServiceCapabilities):
    '''
    Media codec capability: a media type byte, a codec type byte, then
    codec-specific information.

    The codec information is decoded into a dedicated object for the SBC,
    MPEG-2/4 AAC and vendor-specific codec types; for any other codec type
    it is kept as raw bytes.
    '''

    def init_from_bytes(self):
        raw = self.service_capabilities_bytes
        self.media_type              = raw[0]
        self.media_codec_type        = raw[1]
        self.media_codec_information = raw[2:]

        # Decode the codec information when the codec type is a known one
        decoders = {
            A2DP_SBC_CODEC_TYPE:          SbcMediaCodecInformation,
            A2DP_MPEG_2_4_AAC_CODEC_TYPE: AacMediaCodecInformation,
            A2DP_NON_A2DP_CODEC_TYPE:     VendorSpecificMediaCodecInformation
        }
        decoder = decoders.get(self.media_codec_type)
        if decoder is not None:
            self.media_codec_information = decoder.from_bytes(self.media_codec_information)

    def __init__(self, media_type, media_codec_type, media_codec_information):
        # The raw capability bytes are the two type bytes followed by the
        # serialized codec information
        encoded = bytes([media_type, media_codec_type]) + bytes(media_codec_information)
        super().__init__(AVDTP_MEDIA_CODEC_SERVICE_CATEGORY, encoded)
        self.media_type              = media_type
        self.media_codec_type        = media_codec_type
        self.media_codec_information = media_codec_information

    def __str__(self):
        info = self.media_codec_information
        # Raw bytes are shown in hex, decoded objects with their own str()
        codec_info = info.hex() if type(info) is bytes else str(info)
        return self.to_string([
            f'media_type={name_or_number(AVDTP_MEDIA_TYPE_NAMES, self.media_type)}',
            f'codec={name_or_number(A2DP_CODEC_TYPE_NAMES, self.media_codec_type)}',
            f'codec_info={codec_info}'
        ])
531
532
533# -----------------------------------------------------------------------------
class EndPointInfo:
    '''
    Two-byte description of a stream end point.

    byte 0: SEID (high 6 bits) | in-use flag (bit 1)
    byte 1: media type (high 4 bits) | TSEP (bit 3)
    '''

    @staticmethod
    def from_bytes(payload):
        '''Decode an end point description from its two-byte encoding'''
        first, second = payload[0], payload[1]
        seid       = first >> 2
        in_use     = (first >> 1) & 1
        media_type = second >> 4
        tsep       = (second >> 3) & 1
        return EndPointInfo(seid, in_use, media_type, tsep)

    def __bytes__(self):
        first  = (self.seid << 2) | (self.in_use << 1)
        second = (self.media_type << 4) | (self.tsep << 3)
        return bytes([first, second])

    def __init__(self, seid, in_use, media_type, tsep):
        self.seid       = seid
        self.in_use     = in_use
        self.media_type = media_type
        self.tsep       = tsep
555
556
557# -----------------------------------------------------------------------------
class Message:
    '''
    Base class for signaling messages.

    Concrete message classes are registered with the `Message.subclass`
    decorator, which derives their signal identifier and message type from
    the class name. `Message.create` is the factory used when parsing.
    '''

    COMMAND         = 0
    GENERAL_REJECT  = 1
    RESPONSE_ACCEPT = 2
    RESPONSE_REJECT = 3

    MESSAGE_TYPE_NAMES = {
        COMMAND:         'COMMAND',
        GENERAL_REJECT:  'GENERAL_REJECT',
        RESPONSE_ACCEPT: 'RESPONSE_ACCEPT',
        RESPONSE_REJECT: 'RESPONSE_REJECT'
    }

    subclasses = {}  # Subclasses, by signal identifier and message type

    @staticmethod
    def message_type_name(message_type):
        return name_or_number(Message.MESSAGE_TYPE_NAMES, message_type)

    @staticmethod
    def subclass(cls):
        '''
        Class decorator that sets `signal_identifier` and `message_type` on
        a message class and registers it for use by `Message.create`.

        The class name must be `General_Reject`, or end with `_Command`,
        `_Response` or `_Reject`; otherwise ValueError is raised.
        '''
        # Infer the signal identifier and message subtype from the class name
        name = cls.__name__
        if name == 'General_Reject':
            cls.signal_identifier = 0
            signal_identifier_str = None
            # NOTE(review): this is tagged COMMAND rather than GENERAL_REJECT,
            # and the class is not registered below - confirm this is intended
            message_type = Message.COMMAND
        elif name.endswith('_Command'):
            signal_identifier_str = name[:-8]
            message_type = Message.COMMAND
        elif name.endswith('_Response'):
            signal_identifier_str = name[:-9]
            message_type = Message.RESPONSE_ACCEPT
        elif name.endswith('_Reject'):
            signal_identifier_str = name[:-7]
            message_type = Message.RESPONSE_REJECT
        else:
            raise ValueError('invalid class name')

        cls.message_type = message_type

        if signal_identifier_str is not None:
            # The first signal whose name ends with the class name prefix wins
            suffix = signal_identifier_str.lower()
            for (signal_name, signal_identifier) in AVDTP_SIGNAL_IDENTIFIERS.items():
                if signal_name.lower().endswith(suffix):
                    cls.signal_identifier = signal_identifier
                    break

            # Register the subclass
            Message.subclasses.setdefault(cls.signal_identifier, {})[cls.message_type] = cls

        return cls

    # Factory method to create a subclass based on the signal identifier and message type
    @staticmethod
    def create(signal_identifier, message_type, payload):
        '''
        Build a message object from a parsed signal identifier, message type
        and payload.

        A registered subclass is used when there is one. Otherwise, a reject
        response is parsed as a Simple_Reject and anything else becomes a
        plain Message. Instances are built without calling __init__ (whose
        parameters are message fields, not a payload) and then decoded with
        init_from_payload().
        '''
        # Look for a registered subclass
        subclasses = Message.subclasses.get(signal_identifier)
        if subclasses:
            subclass = subclasses.get(message_type)
            if subclass:
                instance = subclass.__new__(subclass)
                instance.payload = payload
                instance.init_from_payload()
                return instance

        # Instantiate the appropriate class based on the message type
        if message_type == Message.RESPONSE_REJECT:
            # Assume a simple reject message.
            # Simple_Reject.__init__ expects an error code, not a payload, so
            # the instance is built the same way as for registered subclasses.
            instance = Simple_Reject.__new__(Simple_Reject)
            instance.payload = payload
            instance.init_from_payload()
        else:
            instance = Message(payload)
        instance.signal_identifier = signal_identifier
        instance.message_type = message_type
        return instance

    def init_from_payload(self):
        '''Hook for subclasses to decode self.payload; does nothing here'''
        pass

    def __init__(self, payload=b''):
        self.payload           = payload

    def to_string(self, details):
        '''
        Format the message name, followed by `details`: either a single
        string (appended on the same line) or a list of strings (one per
        line, indented).
        '''
        title = f'{name_or_number(AVDTP_SIGNAL_NAMES, self.signal_identifier)}_{Message.message_type_name(self.message_type)}'
        base = f'{color(title, "yellow")}'
        if not details:
            return base

        if isinstance(details, str):
            return f'{base}: {details}'

        return base + ':\n' + '\n'.join(['  ' + color(detail, 'cyan') for detail in details])

    def __str__(self):
        return self.to_string(self.payload.hex())


# -----------------------------------------------------------------------------
class Simple_Command(Message):
    '''
    Command message with just one seid
    '''

    def init_from_payload(self):
        # The SEID is in the high 6 bits of the first byte
        self.acp_seid = self.payload[0] >> 2

    def __init__(self, seid):
        self.acp_seid = seid
        self.payload = bytes([seid << 2])

    def __str__(self):
        return self.to_string([f'ACP SEID: {self.acp_seid}'])


# -----------------------------------------------------------------------------
class Simple_Reject(Message):
    '''
    Reject messages with just an error code
    '''

    def init_from_payload(self):
        # The error code is the first byte of the payload
        self.error_code = self.payload[0]

    def __init__(self, error_code):
        self.error_code = error_code
        self.payload = bytes([self.error_code])

    def __str__(self):
        details = [
            f'error_code: {name_or_number(AVDTP_ERROR_NAMES, self.error_code)}'
        ]
        return self.to_string(details)
689
690
691# -----------------------------------------------------------------------------
@Message.subclass
class Discover_Command(Message):
    '''
    See Bluetooth AVDTP spec - 8.6.1 Stream End Point Discovery Command

    Defines no fields of its own: construction and formatting are inherited
    from Message. The class name is what Message.subclass uses to register
    it under AVDTP_DISCOVER with the COMMAND message type.
    '''
697
698
699# -----------------------------------------------------------------------------
@Message.subclass
class Discover_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.6.2 Stream End Point Discovery Response

    The payload is a sequence of two-byte end point descriptions, exposed
    as a list of EndPointInfo objects in `endpoints`.
    '''

    def init_from_payload(self):
        # Decode each complete two-byte entry; a trailing odd byte is ignored
        entry_count = len(self.payload) // 2
        self.endpoints = [
            EndPointInfo.from_bytes(self.payload[offset:offset + 2])
            for offset in range(0, entry_count * 2, 2)
        ]

    def __init__(self, endpoints):
        self.endpoints = endpoints
        self.payload   = b''.join(bytes(endpoint) for endpoint in endpoints)

    def __str__(self):
        # Four lines per end point: the SEID, then its indented attributes
        details = []
        for endpoint in self.endpoints:
            media_type_name = name_or_number(AVDTP_MEDIA_TYPE_NAMES, endpoint.media_type)
            tsep_name       = name_or_number(AVDTP_TSEP_NAMES, endpoint.tsep)
            details += [
                f'ACP SEID: {endpoint.seid}',
                f'  in_use:     {endpoint.in_use}',
                f'  media_type: {media_type_name}',
                f'  tsep:       {tsep_name}'
            ]
        return self.to_string(details)
728
729
730# -----------------------------------------------------------------------------
@Message.subclass
class Get_Capabilities_Command(Simple_Command):
    '''
    See Bluetooth AVDTP spec - 8.7.1 Get Capabilities Command

    The payload is a single ACP SEID, handled by Simple_Command. The class
    name is what Message.subclass uses to register it under
    AVDTP_GET_CAPABILITIES with the COMMAND message type.
    '''
736
737
738# -----------------------------------------------------------------------------
@Message.subclass
class Get_Capabilities_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.7.2 Get Capabilities Response

    The payload is a sequence of encoded service capabilities, exposed as a
    list of ServiceCapabilities objects in `capabilities`.
    '''

    def init_from_payload(self):
        parsed = ServiceCapabilities.parse_capabilities(self.payload)
        self.capabilities = parsed

    def __init__(self, capabilities):
        self.capabilities = capabilities
        self.payload = ServiceCapabilities.serialize_capabilities(capabilities)

    def __str__(self):
        # One detail line per capability
        return self.to_string(list(map(str, self.capabilities)))
755
756
757# -----------------------------------------------------------------------------
@Message.subclass
class Get_Capabilities_Reject(Simple_Reject):
    '''
    See Bluetooth AVDTP spec - 8.7.3 Get Capabilities Reject

    The payload is a single error code, handled by Simple_Reject. The class
    name is what Message.subclass uses to register it under
    AVDTP_GET_CAPABILITIES with the RESPONSE_REJECT message type.
    '''
763
764
765# -----------------------------------------------------------------------------
@Message.subclass
class Get_All_Capabilities_Command(Get_Capabilities_Command):
    '''
    See Bluetooth AVDTP spec - 8.8.1 Get All Capabilities Command

    Same payload handling as Get_Capabilities_Command (a single ACP SEID).
    The class name is what Message.subclass uses to register it under
    AVDTP_GET_ALL_CAPABILITIES with the COMMAND message type.
    '''
771
772
773# -----------------------------------------------------------------------------
@Message.subclass
class Get_All_Capabilities_Response(Get_Capabilities_Response):
    '''
    See Bluetooth AVDTP spec - 8.8.2 Get All Capabilities Response

    Same payload handling as Get_Capabilities_Response (a sequence of
    service capabilities). The class name is what Message.subclass uses to
    register it under AVDTP_GET_ALL_CAPABILITIES with the RESPONSE_ACCEPT
    message type.
    '''
779
780
781# -----------------------------------------------------------------------------
@Message.subclass
class Get_All_Capabilities_Reject(Simple_Reject):
    '''
    See Bluetooth AVDTP spec - 8.8.3 Get All Capabilities Reject

    The payload is a single error code, handled by Simple_Reject. The class
    name is what Message.subclass uses to register it under
    AVDTP_GET_ALL_CAPABILITIES with the RESPONSE_REJECT message type.
    '''
787
788
789# -----------------------------------------------------------------------------
@Message.subclass
class Set_Configuration_Command(Message):
    '''
    See Bluetooth AVDTP spec - 8.9.1 Set Configuration Command

    The payload starts with the ACP SEID and the INT SEID (each in the high
    6 bits of one byte), followed by a sequence of encoded service
    capabilities.
    '''

    def init_from_payload(self):
        raw = self.payload
        self.acp_seid     = raw[0] >> 2
        self.int_seid     = raw[1] >> 2
        self.capabilities = ServiceCapabilities.parse_capabilities(raw[2:])

    def __init__(self, acp_seid, int_seid, capabilities):
        self.acp_seid     = acp_seid
        self.int_seid     = int_seid
        self.capabilities = capabilities

        # Two SEID bytes, then the encoded capabilities
        seid_bytes   = bytes([acp_seid << 2, int_seid << 2])
        self.payload = seid_bytes + ServiceCapabilities.serialize_capabilities(capabilities)

    def __str__(self):
        details = [f'ACP SEID: {self.acp_seid}', f'INT SEID: {self.int_seid}']
        details.extend(str(capability) for capability in self.capabilities)
        return self.to_string(details)
813
814
815# -----------------------------------------------------------------------------
@Message.subclass
class Set_Configuration_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.9.2 Set Configuration Response

    Defines no fields of its own: all behavior is inherited from Message.
    '''
821
822
823# -----------------------------------------------------------------------------
@Message.subclass
class Set_Configuration_Reject(Message):
    '''
    See Bluetooth AVDTP spec - 8.9.3 Set Configuration Reject

    Two-byte payload: the service category, followed by the error code.
    '''

    def __init__(self, service_category, error_code):
        self.service_category = service_category
        self.error_code = error_code
        self.payload = bytes([service_category, error_code])

    def init_from_payload(self):
        raw = self.payload
        self.service_category = raw[0]
        self.error_code = raw[1]

    def __str__(self):
        # Show symbolic names when known, raw numbers otherwise
        category_name = name_or_number(AVDTP_SERVICE_CATEGORY_NAMES, self.service_category)
        error_name = name_or_number(AVDTP_ERROR_NAMES, self.error_code)
        return self.to_string([
            f'service_category: {category_name}',
            f'error_code:       {error_name}'
        ])
845
846
847# -----------------------------------------------------------------------------
@Message.subclass
class Get_Configuration_Command(Simple_Command):
    '''
    See Bluetooth AVDTP spec - 8.10.1 Get Configuration Command

    Adds nothing of its own: all behavior is inherited from Simple_Command.
    '''
853
854
855# -----------------------------------------------------------------------------
@Message.subclass
class Get_Configuration_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.10.2 Get Configuration Response

    Holds a list of service capabilities together with its serialized payload.
    '''

    def __init__(self, capabilities):
        # Keep the structured list and its wire representation side by side
        self.capabilities = capabilities
        self.payload = ServiceCapabilities.serialize_capabilities(capabilities)

    def init_from_payload(self):
        # Rebuild the capability list from the raw payload bytes
        self.capabilities = ServiceCapabilities.parse_capabilities(self.payload)

    def __str__(self):
        # One detail line per capability
        return self.to_string(list(map(str, self.capabilities)))
872
873
874# -----------------------------------------------------------------------------
@Message.subclass
class Get_Configuration_Reject(Simple_Reject):
    '''
    See Bluetooth AVDTP spec - 8.10.3 Get Configuration Reject

    Adds nothing of its own: all behavior is inherited from Simple_Reject.
    '''
880
881
882# -----------------------------------------------------------------------------
@Message.subclass
class Reconfigure_Command(Message):
    '''
    See Bluetooth AVDTP spec - 8.11.1 Reconfigure Command

    Payload layout, as parsed by this class: one byte with the ACP SEID in its
    upper 6 bits, then the serialized service capabilities.
    '''

    # NOTE(review): unlike Set_Configuration_Command, this class defines no
    # constructor taking (acp_seid, capabilities); it only parses a payload.

    def init_from_payload(self):
        raw = self.payload
        self.acp_seid = raw[0] >> 2
        self.capabilities = ServiceCapabilities.parse_capabilities(raw[1:])

    def __str__(self):
        details = [f'ACP SEID: {self.acp_seid}']
        details.extend(str(capability) for capability in self.capabilities)
        return self.to_string(details)
898
899
900# -----------------------------------------------------------------------------
@Message.subclass
class Reconfigure_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.11.2 Reconfigure Response

    Defines no fields of its own: all behavior is inherited from Message.
    '''
906
907
908# -----------------------------------------------------------------------------
@Message.subclass
class Reconfigure_Reject(Set_Configuration_Reject):
    '''
    See Bluetooth AVDTP spec - 8.11.3 Reconfigure Reject

    Adds nothing of its own: the (service_category, error_code) constructor and
    payload handling are inherited from Set_Configuration_Reject.
    '''
914
915
916# -----------------------------------------------------------------------------
@Message.subclass
class Open_Command(Simple_Command):
    '''
    See Bluetooth AVDTP spec - 8.12.1 Open Stream Command

    Adds nothing of its own: all behavior is inherited from Simple_Command.
    '''
922
923
924# -----------------------------------------------------------------------------
@Message.subclass
class Open_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.12.2 Open Stream Response

    Defines no fields of its own: all behavior is inherited from Message.
    '''
930
931
932# -----------------------------------------------------------------------------
@Message.subclass
class Open_Reject(Simple_Reject):
    '''
    See Bluetooth AVDTP spec - 8.12.3 Open Stream Reject

    Adds nothing of its own: all behavior is inherited from Simple_Reject.
    '''
938
939
940# -----------------------------------------------------------------------------
@Message.subclass
class Start_Command(Message):
    '''
    See Bluetooth AVDTP spec - 8.13.1 Start Stream Command

    The payload is a sequence of bytes, one per ACP SEID, each holding the SEID
    in its upper 6 bits.
    '''

    def __init__(self, seids):
        self.acp_seids = seids
        # One byte per SEID, with the SEID shifted into the upper 6 bits
        self.payload = bytes(seid << 2 for seid in seids)

    def init_from_payload(self):
        self.acp_seids = [seid_byte >> 2 for seid_byte in self.payload]

    def __str__(self):
        seid_line = f'ACP SEIDs: {self.acp_seids}'
        return self.to_string([seid_line])
956
957
958# -----------------------------------------------------------------------------
@Message.subclass
class Start_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.13.2 Start Stream Response

    Defines no fields of its own: all behavior is inherited from Message.
    '''
964
965
966# -----------------------------------------------------------------------------
@Message.subclass
class Start_Reject(Message):
    '''
    See Bluetooth AVDTP spec - 8.13.3 Start Stream Reject

    Two-byte payload: the ACP SEID (in the upper 6 bits of the first byte),
    followed by the error code.
    '''

    def __init__(self, acp_seid, error_code):
        self.acp_seid = acp_seid
        self.error_code = error_code
        self.payload = bytes([acp_seid << 2, error_code])

    def init_from_payload(self):
        raw = self.payload
        self.acp_seid = raw[0] >> 2
        self.error_code = raw[1]

    def __str__(self):
        # Show the symbolic error name when known, the raw number otherwise
        error_name = name_or_number(AVDTP_ERROR_NAMES, self.error_code)
        return self.to_string([
            f'acp_seid:   {self.acp_seid}',
            f'error_code: {error_name}'
        ])
988
989
990# -----------------------------------------------------------------------------
@Message.subclass
class Close_Command(Simple_Command):
    '''
    See Bluetooth AVDTP spec - 8.14.1 Close Stream Command

    Adds nothing of its own: all behavior is inherited from Simple_Command.
    '''
996
997
998# -----------------------------------------------------------------------------
@Message.subclass
class Close_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.14.2 Close Stream Response

    Defines no fields of its own: all behavior is inherited from Message.
    '''
1004
1005
1006# -----------------------------------------------------------------------------
@Message.subclass
class Close_Reject(Simple_Reject):
    '''
    See Bluetooth AVDTP spec - 8.14.3 Close Stream Reject

    Adds nothing of its own: all behavior is inherited from Simple_Reject.
    '''
1012
1013
1014# -----------------------------------------------------------------------------
@Message.subclass
class Suspend_Command(Start_Command):
    '''
    See Bluetooth AVDTP spec - 8.15.1 Suspend Command

    Adds nothing of its own: the list-of-SEIDs constructor and payload handling
    are inherited from Start_Command.
    '''
1020
1021
1022# -----------------------------------------------------------------------------
@Message.subclass
class Suspend_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.15.2 Suspend Response

    Defines no fields of its own: all behavior is inherited from Message.
    '''
1028
1029
1030# -----------------------------------------------------------------------------
@Message.subclass
class Suspend_Reject(Start_Reject):
    '''
    See Bluetooth AVDTP spec - 8.15.3 Suspend Reject

    Adds nothing of its own: the (acp_seid, error_code) constructor and payload
    handling are inherited from Start_Reject.
    '''
1036
1037
1038# -----------------------------------------------------------------------------
@Message.subclass
class Abort_Command(Simple_Command):
    '''
    See Bluetooth AVDTP spec - 8.16.1 Abort Command

    Adds nothing of its own: all behavior is inherited from Simple_Command.
    '''
1044
1045
1046# -----------------------------------------------------------------------------
@Message.subclass
class Abort_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.16.2 Abort Response

    Defines no fields of its own: all behavior is inherited from Message.
    '''
1052
1053
1054# -----------------------------------------------------------------------------
@Message.subclass
class Security_Control_Command(Message):
    '''
    See Bluetooth AVDTP spec - 8.17.1 Security Control Command

    Payload layout, as parsed by this class: one byte with the ACP SEID in its
    upper 6 bits, followed by the content protection data.
    '''

    def init_from_payload(self):
        # The command handler looks up the local endpoint via `acp_seid`, so it
        # must be extracted from the payload like for the other commands.
        self.acp_seid = self.payload[0] >> 2
        self.data     = self.payload[1:]

    def __str__(self):
        return self.to_string([
            f'ACP_SEID: {self.acp_seid}',
            f'data:     {self.data.hex()}'
        ])
1060
1061
1062# -----------------------------------------------------------------------------
@Message.subclass
class Security_Control_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.17.2 Security Control Response

    Defines no fields of its own: all behavior is inherited from Message.
    '''
1068
1069
1070# -----------------------------------------------------------------------------
@Message.subclass
class Security_Control_Reject(Simple_Reject):
    '''
    See Bluetooth AVDTP spec - 8.17.3 Security Control Reject

    Adds nothing of its own: all behavior is inherited from Simple_Reject.
    '''
1076
1077
1078# -----------------------------------------------------------------------------
@Message.subclass
class General_Reject(Message):
    '''
    See Bluetooth AVDTP spec - 8.18 General Reject
    '''

    def to_string(self, details):
        # `details` is ignored: the text is always the same colored label
        label = color("GENERAL_REJECT", "yellow")
        return f'{label}'
1087
1088
1089# -----------------------------------------------------------------------------
@Message.subclass
class DelayReport_Command(Message):
    '''
    See Bluetooth AVDTP spec - 8.19.1 Delay Report Command

    Payload layout, as parsed by this class: one byte with the ACP SEID in its
    upper 6 bits, then the delay as a 16-bit big-endian value.
    '''

    def init_from_payload(self):
        raw = self.payload
        self.acp_seid = raw[0] >> 2

        # Most significant byte first
        delay_high = raw[1]
        delay_low = raw[2]
        self.delay = (delay_high << 8) | delay_low

    def __str__(self):
        details = [
            f'ACP_SEID: {self.acp_seid}',
            f'delay:    {self.delay}'
        ]
        return self.to_string(details)
1105
1106
1107# -----------------------------------------------------------------------------
@Message.subclass
class DelayReport_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.19.2 Delay Report Response

    Defines no fields of its own: all behavior is inherited from Message.
    '''
1113
1114
1115# -----------------------------------------------------------------------------
@Message.subclass
class DelayReport_Reject(Simple_Reject):
    '''
    See Bluetooth AVDTP spec - 8.19.3 Delay Report Reject

    Adds nothing of its own: all behavior is inherited from Simple_Reject.
    '''
1121
1122
1123# -----------------------------------------------------------------------------
class Protocol:
    '''
    AVDTP signaling engine bound to one L2CAP channel.

    Keeps track of the local endpoints, the discovered remote endpoints and the
    streams. Outgoing commands are matched with their responses through a
    4-bit transaction label; incoming commands are dispatched to the
    `on_<signal>_command` methods and the value they return is sent back.
    '''

    SINGLE_PACKET   = 0
    START_PACKET    = 1
    CONTINUE_PACKET = 2
    END_PACKET      = 3

    PACKET_TYPE_NAMES = {
        SINGLE_PACKET:   'SINGLE_PACKET',
        START_PACKET:    'START_PACKET',
        CONTINUE_PACKET: 'CONTINUE_PACKET',
        END_PACKET:      'END_PACKET'
    }

    @staticmethod
    def packet_type_name(packet_type):
        '''Return the name of a packet type, or its number if it is unknown.'''
        return name_or_number(Protocol.PACKET_TYPE_NAMES, packet_type)

    @staticmethod
    async def connect(connection, version=(1, 3)):
        '''
        Open an L2CAP channel to the AVDTP PSM over `connection` and return a
        Protocol bound to it.

        The connector is kept in `channel_connector` so that more channels can
        be opened on the same connection later.
        '''
        connector = connection.create_l2cap_connector(AVDTP_PSM)
        channel = await connector()
        protocol = Protocol(channel, version)
        protocol.channel_connector = connector

        return protocol

    def __init__(self, l2cap_channel, version=(1, 3)):
        '''
        l2cap_channel: channel used for signaling; its `sink` is set to this
                       object's `on_pdu` and its 'open' event is subscribed to.
        version:       AVDTP version as a (major, minor) tuple.
        '''
        self.l2cap_channel         = l2cap_channel
        self.version               = version
        self.rtx_sig_timer         = AVDTP_DEFAULT_RTX_SIG_TIMER
        self.message_assembler     = MessageAssembler(self.on_message)
        self.transaction_results   = [None] * 16  # Futures for up to 16 transactions
        self.transaction_semaphore = asyncio.Semaphore(16)
        self.transaction_count     = 0
        self.channel_acceptor      = None
        self.channel_connector     = None
        self.local_endpoints       = []  # Local endpoints, with contiguous seid values
        self.remote_endpoints      = {}  # Remote stream endpoints, by seid
        self.streams               = {}  # Streams, by seid

        # Register to receive PDUs from the channel
        l2cap_channel.sink = self.on_pdu
        l2cap_channel.on('open', self.on_l2cap_channel_open)

    def get_local_endpoint_by_seid(self, seid):
        '''
        Return the local endpoint with the given SEID, or None if there is none.

        Local SEIDs start at 1 and are contiguous, so SEID n is at index n - 1.
        '''
        if 0 < seid <= len(self.local_endpoints):
            return self.local_endpoints[seid - 1]
        return None

    def add_source(self, codec_capabilities, packet_pump):
        '''Create a LocalSource with the next free SEID, register and return it.'''
        seid = len(self.local_endpoints) + 1
        source = LocalSource(self, seid, codec_capabilities, packet_pump)
        self.local_endpoints.append(source)

        return source

    def add_sink(self, codec_capabilities):
        '''Create a LocalSink with the next free SEID, register and return it.'''
        seid = len(self.local_endpoints) + 1
        sink = LocalSink(self, seid, codec_capabilities)
        self.local_endpoints.append(sink)

        return sink

    async def create_stream(self, source, sink):
        '''
        Associate `source` and `sink` in a stream, configure it and return it.

        Raises InvalidStateError if the source is already in use.
        '''
        # Check that the source isn't already used in a stream
        if source.in_use:
            raise InvalidStateError('source already in use')

        # Create or reuse a new stream to associate the source and the sink
        if source.seid in self.streams:
            stream = self.streams[source.seid]
        else:
            stream = Stream(self, source, sink)
            self.streams[source.seid] = stream

        # The stream can now be configured
        await stream.configure()

        return stream

    async def discover_remote_endpoints(self):
        '''
        Discover the remote endpoints and query the capabilities of each one.

        Replaces `remote_endpoints` and returns a view of its values.
        '''
        self.remote_endpoints = {}

        response = await self.send_command(Discover_Command())
        for endpoint_entry in response.endpoints:
            logger.debug(f'getting endpoint capabilities for endpoint {endpoint_entry.seid}')
            get_capabilities_response = await self.get_capabilities(endpoint_entry.seid)
            endpoint = DiscoveredStreamEndPoint(
                self,
                endpoint_entry.seid,
                endpoint_entry.media_type,
                endpoint_entry.tsep,
                endpoint_entry.in_use,
                get_capabilities_response.capabilities
            )
            self.remote_endpoints[endpoint_entry.seid] = endpoint

        return self.remote_endpoints.values()

    def find_remote_sink_by_codec(self, media_type, codec_type):
        '''
        Return the first discovered remote sink that is not in use, has the
        requested media type, and advertises both a media transport capability
        and an audio codec capability of type `codec_type`.

        Returns None when no such endpoint is known.
        '''
        for endpoint in self.remote_endpoints.values():
            if not endpoint.in_use and endpoint.media_type == media_type and endpoint.tsep == AVDTP_TSEP_SNK:
                has_media_transport = False
                has_codec = False
                for capabilities in endpoint.capabilities:
                    if capabilities.service_category == AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY:
                        has_media_transport = True
                    elif capabilities.service_category == AVDTP_MEDIA_CODEC_SERVICE_CATEGORY:
                        if capabilities.media_type == AVDTP_AUDIO_MEDIA_TYPE and capabilities.media_codec_type == codec_type:
                            has_codec = True
                if has_media_transport and has_codec:
                    return endpoint
        return None

    def on_pdu(self, pdu):
        '''Feed a PDU received from the L2CAP channel to the message assembler.'''
        self.message_assembler.on_pdu(pdu)

    def on_message(self, transaction_label, message):
        '''
        Handle a fully assembled message.

        Commands are dispatched to the matching `on_<signal>_command` method and
        its return value is sent back with the same transaction label. Any other
        message completes the pending transaction with that label.
        '''
        logger.debug(f'{color("<<< Received AVDTP message", "magenta")}: [{transaction_label}] {message}')

        # Check that the identifier is not reserved
        if message.signal_identifier == 0:
            logger.warning('!!! reserved signal identifier')
            return

        # Check that the identifier is valid
        if message.signal_identifier < 0 or message.signal_identifier > AVDTP_DELAYREPORT:
            logger.warning('!!! invalid signal identifier')
            self.send_message(transaction_label, General_Reject())

        if message.message_type == Message.COMMAND:
            # Command
            handler_name = f'on_{AVDTP_SIGNAL_NAMES.get(message.signal_identifier,"").replace("AVDTP_","").lower()}_command'
            handler = getattr(self, handler_name, None)
            if handler:
                try:
                    response = handler(message)
                    self.send_message(transaction_label, response)
                except Exception as error:
                    # A failing handler must not break the receive path
                    logger.warning(f'{color("!!! Exception in handler:", "red")} {error}')
            else:
                logger.warning('unhandled command')
        else:
            # Response, look for a pending transaction with the same label
            transaction_result = self.transaction_results[transaction_label]
            if transaction_result is None:
                logger.warning(color('!!! no pending transaction for label', 'red'))
                return

            transaction_result.set_result(message)
            self.transaction_results[transaction_label] = None
            self.transaction_semaphore.release()

    def on_l2cap_connection(self, channel):
        '''Forward a new incoming channel to the registered acceptor, if any.'''
        # Forward the channel to the endpoint that's expecting it
        if self.channel_acceptor:
            self.channel_acceptor.on_l2cap_connection(channel)

    def on_l2cap_channel_open(self):
        logger.debug(color('<<< L2CAP channel open', 'magenta'))

    @staticmethod
    def _build_packets(transaction_label, message_type, signal_identifier, payload, mtu):
        '''
        Return the list of signaling PDUs needed to carry `payload` with the
        given MTU: a single packet when it fits, else a start packet followed by
        continue packets and a final end packet.
        '''
        def first_header_byte(packet_type):
            return transaction_label << 4 | packet_type << 2 | message_type

        if len(payload) + 2 <= mtu:
            # Fits in a single packet: 2-byte header followed by the WHOLE payload
            return [bytes([first_header_byte(Protocol.SINGLE_PACKET), signal_identifier]) + payload]

        # Every fragment leaves enough space for a 3-byte start packet header
        max_fragment_size = mtu - 3
        if max_fragment_size <= 0:
            raise ValueError('MTU too small to fragment the message')

        packet_count = (max_fragment_size - 1 + len(payload)) // max_fragment_size
        packets = [
            bytes([first_header_byte(Protocol.START_PACKET), signal_identifier, packet_count])
            + payload[:max_fragment_size]
        ]

        remaining = payload[max_fragment_size:]
        while remaining:
            # The last fragment is an end packet, all the others continue packets
            if len(remaining) > max_fragment_size:
                packet_type = Protocol.CONTINUE_PACKET
            else:
                packet_type = Protocol.END_PACKET
            packets.append(bytes([first_header_byte(packet_type)]) + remaining[:max_fragment_size])
            remaining = remaining[max_fragment_size:]

        return packets

    def send_message(self, transaction_label, message):
        '''Send `message` on the signaling channel, fragmenting it if needed.'''
        logger.debug(f'{color(">>> Sending AVDTP message", "magenta")}: [{transaction_label}] {message}')
        packets = self._build_packets(
            transaction_label,
            message.message_type,
            message.signal_identifier,
            message.payload,
            self.l2cap_channel.mtu
        )
        for packet in packets:
            self.l2cap_channel.send_pdu(packet)

    async def send_command(self, command):
        '''
        Send a command and wait for its response.

        Raises ProtocolError when the response is a reject.
        '''
        # TODO: support timeouts
        # Send the command
        (transaction_label, transaction_result) = await self.start_transaction()
        self.send_message(transaction_label, command)

        # Wait for the response
        response = await transaction_result

        # Check for errors
        # NOTE(review): General_Reject defines no `error_code` attribute in this
        # file, so a general reject may raise AttributeError here - confirm.
        if response.message_type == Message.GENERAL_REJECT or response.message_type == Message.RESPONSE_REJECT:
            raise ProtocolError(response.error_code, 'avdtp')

        return response

    async def start_transaction(self):
        '''
        Reserve a transaction label, waiting for one to be free if needed.

        Returns a (transaction_label, future) pair; the future receives the
        response message for that label.
        '''
        # Wait until we can start a new transaction
        await self.transaction_semaphore.acquire()

        # Look for the next free entry to store the transaction result
        for i in range(16):
            transaction_label = (self.transaction_count + i) % 16
            if self.transaction_results[transaction_label] is None:
                transaction_result = asyncio.get_running_loop().create_future()
                self.transaction_results[transaction_label] = transaction_result
                self.transaction_count += 1
                return (transaction_label, transaction_result)

        # Should never reach this: the semaphore guarantees a free entry.
        # Raised explicitly so that the check is not stripped with -O.
        raise AssertionError('no free transaction label')

    async def get_capabilities(self, seid):
        '''Query an endpoint's capabilities (all capabilities for versions above 1.2).'''
        if self.version > (1, 2):
            return await self.send_command(Get_All_Capabilities_Command(seid))
        else:
            return await self.send_command(Get_Capabilities_Command(seid))

    async def set_configuration(self, acp_seid, int_seid, capabilities):
        return await self.send_command(Set_Configuration_Command(acp_seid, int_seid, capabilities))

    async def get_configuration(self, seid):
        '''Return the capabilities carried by the Get Configuration response.'''
        response = await self.send_command(Get_Configuration_Command(seid))
        return response.capabilities

    async def open(self, seid):
        return await self.send_command(Open_Command(seid))

    async def start(self, seids):
        return await self.send_command(Start_Command(seids))

    async def suspend(self, seids):
        return await self.send_command(Suspend_Command(seids))

    async def close(self, seid):
        return await self.send_command(Close_Command(seid))

    async def abort(self, seid):
        return await self.send_command(Abort_Command(seid))

    def on_discover_command(self, command):
        '''List all the local endpoints (always reported with in_use = 0).'''
        endpoint_infos = [
            EndPointInfo(endpoint.seid, 0, endpoint.media_type, endpoint.tsep)
            for endpoint in self.local_endpoints
        ]
        return Discover_Response(endpoint_infos)

    def on_get_capabilities_command(self, command):
        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
        if endpoint is None:
            return Get_Capabilities_Reject(AVDTP_BAD_ACP_SEID_ERROR)

        return Get_Capabilities_Response(endpoint.capabilities)

    def on_get_all_capabilities_command(self, command):
        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
        if endpoint is None:
            return Get_All_Capabilities_Reject(AVDTP_BAD_ACP_SEID_ERROR)

        return Get_All_Capabilities_Response(endpoint.capabilities)

    def on_set_configuration_command(self, command):
        '''
        Create a stream between the addressed local endpoint and the remote
        initiator, then let the stream process the requested configuration.
        '''
        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
        if endpoint is None:
            # The reject takes (service_category, error_code); 0 = no category
            return Set_Configuration_Reject(0, AVDTP_BAD_ACP_SEID_ERROR)

        # Check that the local endpoint isn't in use
        if endpoint.in_use:
            return Set_Configuration_Reject(0, AVDTP_SEP_IN_USE_ERROR)

        # Create a stream object for the pair of endpoints
        stream = Stream(self, endpoint, StreamEndPointProxy(self, command.int_seid))
        self.streams[command.acp_seid] = stream

        result = stream.on_set_configuration_command(command.capabilities)
        return result or Set_Configuration_Response()

    def on_get_configuration_command(self, command):
        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
        if endpoint is None:
            return Get_Configuration_Reject(AVDTP_BAD_ACP_SEID_ERROR)
        if endpoint.stream is None:
            return Get_Configuration_Reject(AVDTP_BAD_STATE_ERROR)

        # NOTE(review): called without arguments; verify that this matches the
        # signature of the stream's on_get_configuration_command.
        return endpoint.stream.on_get_configuration_command()

    def on_reconfigure_command(self, command):
        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
        if endpoint is None:
            return Reconfigure_Reject(0, AVDTP_BAD_ACP_SEID_ERROR)
        if endpoint.stream is None:
            return Reconfigure_Reject(0, AVDTP_BAD_STATE_ERROR)

        result = endpoint.stream.on_reconfigure_command(command.capabilities)
        return result or Reconfigure_Response()

    def on_open_command(self, command):
        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
        if endpoint is None:
            return Open_Reject(AVDTP_BAD_ACP_SEID_ERROR)
        if endpoint.stream is None:
            return Open_Reject(AVDTP_BAD_STATE_ERROR)

        result = endpoint.stream.on_open_command()
        return result or Open_Response()

    def on_start_command(self, command):
        '''
        Start the streams of all the listed endpoints.

        Every SEID is validated before any stream is started; the first invalid
        one causes a reject that names it.
        '''
        for seid in command.acp_seids:
            endpoint = self.get_local_endpoint_by_seid(seid)
            if endpoint is None:
                return Start_Reject(seid, AVDTP_BAD_ACP_SEID_ERROR)
            if endpoint.stream is None:
                return Start_Reject(seid, AVDTP_BAD_STATE_ERROR)

        # Start all streams
        # TODO: deal with partial failures
        for seid in command.acp_seids:
            endpoint = self.get_local_endpoint_by_seid(seid)
            result = endpoint.stream.on_start_command()
            if result is not None:
                return result

        return Start_Response()

    def on_suspend_command(self, command):
        '''
        Suspend the streams of all the listed endpoints.

        Every SEID is validated before any stream is suspended; the first
        invalid one causes a reject that names it.
        '''
        for seid in command.acp_seids:
            endpoint = self.get_local_endpoint_by_seid(seid)
            if endpoint is None:
                return Suspend_Reject(seid, AVDTP_BAD_ACP_SEID_ERROR)
            if endpoint.stream is None:
                return Suspend_Reject(seid, AVDTP_BAD_STATE_ERROR)

        # Suspend all streams
        # TODO: deal with partial failures
        for seid in command.acp_seids:
            endpoint = self.get_local_endpoint_by_seid(seid)
            result = endpoint.stream.on_suspend_command()
            if result is not None:
                return result

        return Suspend_Response()

    def on_close_command(self, command):
        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
        if endpoint is None:
            return Close_Reject(AVDTP_BAD_ACP_SEID_ERROR)
        if endpoint.stream is None:
            return Close_Reject(AVDTP_BAD_STATE_ERROR)

        result = endpoint.stream.on_close_command()
        return result or Close_Response()

    def on_abort_command(self, command):
        '''Abort the addressed stream if there is one; always accept the command.'''
        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
        if endpoint is None or endpoint.stream is None:
            return Abort_Response()

        endpoint.stream.on_abort_command()
        return Abort_Response()

    def on_security_control_command(self, command):
        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
        if endpoint is None:
            return Security_Control_Reject(AVDTP_BAD_ACP_SEID_ERROR)

        result = endpoint.on_security_control_command(command.payload)
        return result or Security_Control_Response()

    def on_delayreport_command(self, command):
        endpoint = self.get_local_endpoint_by_seid(command.acp_seid)
        if endpoint is None:
            return DelayReport_Reject(AVDTP_BAD_ACP_SEID_ERROR)

        result = endpoint.on_delayreport_command(command.delay)
        return result or DelayReport_Response()
1508
1509
1510# -----------------------------------------------------------------------------
class Listener(EventEmitter):
    '''
    Accepts incoming L2CAP connections on the AVDTP PSM.

    The first channel opened on a connection becomes a signaling channel: a
    Protocol is created for it and a 'connection' event is emitted. Channels
    arriving later on a connection that already has a Protocol are handed to it.
    '''

    def __init__(self, registrar, version=(1, 3)):
        super().__init__()
        self.version = version
        self.servers = {}  # Servers, by connection handle

        # Listen for incoming L2CAP connections
        registrar(self.on_l2cap_connection)

    @staticmethod
    def create_registrar(device):
        '''Return the L2CAP registrar of `device` for the AVDTP PSM.'''
        return device.create_l2cap_registrar(AVDTP_PSM)

    def set_server(self, connection, server):
        '''Remember `server` as the Protocol in charge of `connection`.'''
        self.servers[connection.handle] = server

    def on_l2cap_connection(self, channel):
        logger.debug(f'{color("<<< incoming L2CAP connection:", "magenta")} {channel}')

        try:
            existing_server = self.servers[channel.connection.handle]
        except KeyError:
            # This is a new command/response channel: wait until it is open
            # before creating the server that will use it
            def create_server():
                new_server = Protocol(channel, self.version)
                self.set_server(channel.connection, new_server)
                self.emit('connection', new_server)

            channel.on('open', create_server)
            return

        # This is a channel for a stream endpoint
        existing_server.on_l2cap_connection(channel)
1541
1542
1543# -----------------------------------------------------------------------------
1544class Stream:
1545    '''
1546    Pair of a local and a remote stream endpoint that can stream from one to the other
1547    '''
1548
1549    @staticmethod
1550    def state_name(state):
1551        return name_or_number(AVDTP_STATE_NAMES, state)
1552
1553    def change_state(self, state):
1554        logger.debug(f'{self} state change -> {color(self.state_name(state), "cyan")}')
1555        self.state = state
1556
1557    def send_media_packet(self, packet):
1558        self.rtp_channel.send_pdu(bytes(packet))
1559
1560    async def configure(self):
1561        if self.state != AVDTP_IDLE_STATE:
1562            raise InvalidStateError('current state is not IDLE')
1563
1564        await self.remote_endpoint.set_configuration(
1565            self.local_endpoint.seid,
1566            self.local_endpoint.configuration
1567        )
1568        self.change_state(AVDTP_CONFIGURED_STATE)
1569
1570    async def open(self):
1571        if self.state != AVDTP_CONFIGURED_STATE:
1572            raise InvalidStateError('current state is not CONFIGURED')
1573
1574        logger.debug('opening remote endpoint')
1575        await self.remote_endpoint.open()
1576
1577        self.change_state(AVDTP_OPEN_STATE)
1578
1579        # Create a channel for RTP packets
1580        self.rtp_channel = await self.protocol.channel_connector()
1581
1582    async def start(self):
1583        # Auto-open if needed
1584        if self.state == AVDTP_CONFIGURED_STATE:
1585            await self.open()
1586
1587        if self.state != AVDTP_OPEN_STATE:
1588            raise InvalidStateError('current state is not OPEN')
1589
1590        logger.debug('starting remote endpoint')
1591        await self.remote_endpoint.start()
1592
1593        logger.debug('starting local endpoint')
1594        await self.local_endpoint.start()
1595
1596        self.change_state(AVDTP_STREAMING_STATE)
1597
1598    async def stop(self):
1599        if self.state != AVDTP_STREAMING_STATE:
1600            raise InvalidStateError('current state is not STREAMING')
1601
1602        logger.debug('stopping local endpoint')
1603        await self.local_endpoint.stop()
1604
1605        logger.debug('stopping remote endpoint')
1606        await self.remote_endpoint.stop()
1607
1608        self.change_state(AVDTP_OPEN_STATE)
1609
1610    async def close(self):
1611        if self.state not in {AVDTP_OPEN_STATE, AVDTP_STREAMING_STATE}:
1612            raise InvalidStateError('current state is not OPEN or STREAMING')
1613
1614        logger.debug('closing local endpoint')
1615        await self.local_endpoint.close()
1616
1617        logger.debug('closing remote endpoint')
1618        await self.remote_endpoint.close()
1619
1620        # Release any channels we may have created
1621        self.change_state(AVDTP_CLOSING_STATE)
1622        if self.rtp_channel:
1623            await self.rtp_channel.disconnect()
1624            self.rtp_channel = None
1625
1626        # Release the endpoint
1627        self.local_endpoint.in_use = 0
1628
1629        self.change_state(AVDTP_IDLE_STATE)
1630
1631    def on_set_configuration_command(self, configuration):
1632        if self.state != AVDTP_IDLE_STATE:
1633            return Set_Configuration_Reject(AVDTP_BAD_STATE_ERROR)
1634
1635        result = self.local_endpoint.on_set_configuration_command(configuration)
1636        if result is not None:
1637            return result
1638
1639        self.change_state(AVDTP_CONFIGURED_STATE)
1640
1641    def on_get_configuration_command(self, configuration):
1642        if self.state not in {AVDTP_CONFIGURED_STATE, AVDTP_OPEN_STATE, AVDTP_STREAMING_STATE}:
1643            return Get_Configuration_Reject(AVDTP_BAD_STATE_ERROR)
1644
1645        return self.local_endpoint.on_get_configuration_command(configuration)
1646
1647    def on_reconfigure_command(self, configuration):
1648        if self.state != AVDTP_OPEN_STATE:
1649            return Reconfigure_Reject(AVDTP_BAD_STATE_ERROR)
1650
1651        result = self.local_endpoint.on_reconfigure_command(configuration)
1652        if result is not None:
1653            return result
1654
1655    def on_open_command(self):
1656        if self.state != AVDTP_CONFIGURED_STATE:
1657            return Open_Reject(AVDTP_BAD_STATE_ERROR)
1658
1659        result = self.local_endpoint.on_open_command()
1660        if result is not None:
1661            return result
1662
1663        # Register to accept the next channel
1664        self.protocol.channel_acceptor = self
1665
1666        self.change_state(AVDTP_OPEN_STATE)
1667
1668    def on_start_command(self):
1669        if self.state != AVDTP_OPEN_STATE:
1670            return Open_Reject(AVDTP_BAD_STATE_ERROR)
1671
1672        # Check that we have an RTP channel
1673        if self.rtp_channel is None:
1674            logger.warning('received start command before RTP channel establishment')
1675            return Open_Reject(AVDTP_BAD_STATE_ERROR)
1676
1677        result = self.local_endpoint.on_start_command()
1678        if result is not None:
1679            return result
1680
1681        self.change_state(AVDTP_STREAMING_STATE)
1682
1683    def on_suspend_command(self):
1684        if self.state != AVDTP_STREAMING_STATE:
1685            return Open_Reject(AVDTP_BAD_STATE_ERROR)
1686
1687        result = self.local_endpoint.on_suspend_command()
1688        if result is not None:
1689            return result
1690
1691        self.change_state(AVDTP_OPEN_STATE)
1692
1693    def on_close_command(self):
1694        if self.state not in {AVDTP_OPEN_STATE, AVDTP_STREAMING_STATE}:
1695            return Open_Reject(AVDTP_BAD_STATE_ERROR)
1696
1697        result = self.local_endpoint.on_close_command()
1698        if result is not None:
1699            return result
1700
1701        self.change_state(AVDTP_CLOSING_STATE)
1702
1703        if self.rtp_channel is None:
1704            # No channel to release, we're done
1705            self.change_state(AVDTP_IDLE_STATE)
1706        else:
1707            # TODO: set a timer as we wait for the RTP channel to be closed
1708            pass
1709
1710    def on_abort_command(self):
1711        if self.rtp_channel is None:
1712            # No need to wait
1713            self.change_state(AVDTP_IDLE_STATE)
1714        else:
1715            # Wait for the RTP channel to be closed
1716            self.change_state(AVDTP_ABORTING_STATE)
1717
1718    def on_l2cap_connection(self, channel):
1719        logger.debug(color('<<< stream channel connected', 'magenta'))
1720        self.rtp_channel = channel
1721        channel.on('open',  self.on_l2cap_channel_open)
1722        channel.on('close', self.on_l2cap_channel_close)
1723
1724        # We don't need more channels
1725        self.protocol.channel_acceptor = None
1726
1727    def on_l2cap_channel_open(self):
1728        logger.debug(color('<<< stream channel open', 'magenta'))
1729        self.local_endpoint.on_rtp_channel_open()
1730
1731    def on_l2cap_channel_close(self):
1732        logger.debug(color('<<< stream channel closed', 'magenta'))
1733        self.local_endpoint.on_rtp_channel_close()
1734        self.local_endpoint.in_use = 0
1735        self.rtp_channel = None
1736
1737        if self.state in {AVDTP_CLOSING_STATE, AVDTP_ABORTING_STATE}:
1738            self.change_state(AVDTP_IDLE_STATE)
1739        else:
1740            logger.warning('unexpected channel close while not CLOSING or ABORTING')
1741
1742    def __init__(self, protocol, local_endpoint, remote_endpoint):
1743        '''
1744        remote_endpoint must be a subclass of StreamEndPointProxy
1745
1746        '''
1747        self.protocol        = protocol
1748        self.local_endpoint  = local_endpoint
1749        self.remote_endpoint = remote_endpoint
1750        self.rtp_channel     = None
1751        self.state           = AVDTP_IDLE_STATE
1752
1753        local_endpoint.stream = self
1754        local_endpoint.in_use = 1
1755
1756    def __str__(self):
1757        return f'Stream({self.local_endpoint.seid} -> {self.remote_endpoint.seid} {self.state_name(self.state)})'
1758
1759
1760# -----------------------------------------------------------------------------
class StreamEndPoint:
    '''
    Description of a stream endpoint: its identifier (seid), media type,
    tsep value, in-use flag and list of capabilities.
    '''

    def __init__(self, seid, media_type, tsep, in_use, capabilities):
        self.seid, self.media_type, self.tsep = seid, media_type, tsep
        self.in_use       = in_use
        self.capabilities = capabilities

    def __str__(self):
        '''
        Multi-line description, with one indented line per capability.
        '''
        header = [
            'SEP(',
            f'  seid={self.seid}',
            f'  media_type={name_or_number(AVDTP_MEDIA_TYPE_NAMES, self.media_type)}',
            f'  tsep={name_or_number(AVDTP_TSEP_NAMES, self.tsep)}',
            f'  in_use={self.in_use}',
            '  capabilities=['
        ]
        # All capabilities form a single (possibly empty) entry
        capability_block = '\n'.join(f'    {capability}' for capability in self.capabilities)
        footer = ['  ]', ')']
        return '\n'.join(header + [capability_block] + footer)
1781
1782
1783# -----------------------------------------------------------------------------
class StreamEndPointProxy:
    '''
    Handle on a stream endpoint identified by its seid: each operation is
    forwarded to the protocol object together with that seid.
    '''

    def __init__(self, protocol, seid):
        self.protocol = protocol
        self.seid     = seid

    async def set_configuration(self, int_seid, configuration):
        '''Forward to protocol.set_configuration(seid, int_seid, configuration).'''
        response = await self.protocol.set_configuration(self.seid, int_seid, configuration)
        return response

    async def open(self):
        '''Forward to protocol.open(seid).'''
        response = await self.protocol.open(self.seid)
        return response

    async def start(self):
        '''Forward to protocol.start with a one-element list holding the seid.'''
        seids = [self.seid]
        return await self.protocol.start(seids)

    async def stop(self):
        '''Forward to protocol.suspend with a one-element list holding the seid.'''
        seids = [self.seid]
        return await self.protocol.suspend(seids)

    async def close(self):
        '''Forward to protocol.close(seid).'''
        response = await self.protocol.close(self.seid)
        return response

    async def abort(self):
        '''Forward to protocol.abort(seid).'''
        response = await self.protocol.abort(self.seid)
        return response
1810
1811
1812# -----------------------------------------------------------------------------
class DiscoveredStreamEndPoint(StreamEndPoint, StreamEndPointProxy):
    '''
    Stream endpoint that carries both the endpoint description
    (StreamEndPoint) and the protocol-backed operations (StreamEndPointProxy).
    '''

    def __init__(self, protocol, seid, media_type, tsep, in_use, capabilities):
        # Each base is initialized explicitly since their constructors take
        # different arguments
        StreamEndPoint.__init__(
            self,
            seid=seid,
            media_type=media_type,
            tsep=tsep,
            in_use=in_use,
            capabilities=capabilities
        )
        StreamEndPointProxy.__init__(self, protocol=protocol, seid=seid)
1817
1818
1819# -----------------------------------------------------------------------------
class LocalStreamEndPoint(StreamEndPoint):
    '''
    Base class for locally hosted stream endpoints.

    The async methods (start/stop/close) and the on_* hooks are no-ops meant
    to be overridden. A command hook that returns None accepts the command;
    any other return value is handed back to the caller as the response.
    '''

    def __init__(self, protocol, seid, media_type, tsep, capabilities, configuration=None):
        '''
        configuration defaults to a new empty list for each instance. A
        literal `[]` default would be one list object shared by every
        endpoint created without an explicit configuration.
        '''
        # A local endpoint always starts as not in use
        super().__init__(seid, media_type, tsep, 0, capabilities)
        self.protocol      = protocol
        self.configuration = [] if configuration is None else configuration
        self.stream        = None

    async def start(self):
        pass

    async def stop(self):
        pass

    async def close(self):
        pass

    def on_set_configuration_command(self, configuration):
        # Defined here so that every hook invoked on a local endpoint exists
        # on the base class. Like the source and sink endpoints, accept the
        # configuration as is.
        self.configuration = configuration

    def on_reconfigure_command(self, command):
        pass

    def on_get_configuration_command(self):
        # Answer with the endpoint's current configuration
        return Get_Configuration_Response(self.configuration)

    def on_open_command(self):
        pass

    def on_start_command(self):
        pass

    def on_suspend_command(self):
        pass

    def on_close_command(self):
        pass

    def on_abort_command(self):
        pass

    def on_rtp_channel_open(self):
        pass

    def on_rtp_channel_close(self):
        pass
1862
1863
1864# -----------------------------------------------------------------------------
class LocalSource(LocalStreamEndPoint, EventEmitter):
    '''
    Local endpoint of type AVDTP_TSEP_SRC.

    When a packet pump is supplied, starting and stopping are delegated to
    it; otherwise 'start' and 'stop' events are emitted instead.
    '''

    def __init__(self, protocol, seid, codec_capabilities, packet_pump):
        capabilities = [
            ServiceCapabilities(AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY),
            codec_capabilities
        ]
        # The same list serves as both the capabilities and the initial configuration
        LocalStreamEndPoint.__init__(self, protocol, seid, codec_capabilities.media_type, AVDTP_TSEP_SRC, capabilities, capabilities)
        EventEmitter.__init__(self)
        self.packet_pump = packet_pump

        # Tasks spawned by the command handlers. The event loop only keeps
        # weak references to tasks, so they are held here until they complete.
        self._background_tasks = set()

    def _run_in_background(self, coroutine):
        # Schedule the coroutine on the running loop and keep the task alive
        # until it is done
        task = asyncio.get_running_loop().create_task(coroutine)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def start(self):
        '''
        Start the packet pump on the stream's RTP channel, or emit a 'start'
        event carrying that channel when there is no packet pump.
        '''
        if self.packet_pump:
            return await self.packet_pump.start(self.stream.rtp_channel)
        else:
            self.emit('start', self.stream.rtp_channel)

    async def stop(self):
        '''
        Stop the packet pump, or emit a 'stop' event when there is no packet
        pump.
        '''
        if self.packet_pump:
            return await self.packet_pump.stop()
        else:
            self.emit('stop')

    def on_set_configuration_command(self, configuration):
        # For now, blindly accept the configuration
        logger.debug(f'<<< received source configuration: {configuration}')
        self.configuration = configuration

    def on_start_command(self):
        # Starting is asynchronous but this handler is not: run it as a task
        self._run_in_background(self.start())

    def on_suspend_command(self):
        # Same as for start: stopping runs as a task
        self._run_in_background(self.stop())
1897
1898
1899# -----------------------------------------------------------------------------
class LocalSink(LocalStreamEndPoint, EventEmitter):
    '''
    Local endpoint of type AVDTP_TSEP_SNK.

    Packets received on the RTP channel are parsed as MediaPacket objects
    and re-emitted as 'rtp_packet' events.
    '''

    def __init__(self, protocol, seid, codec_capabilities):
        media_transport = ServiceCapabilities(AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY)
        LocalStreamEndPoint.__init__(
            self,
            protocol,
            seid,
            codec_capabilities.media_type,
            AVDTP_TSEP_SNK,
            [media_transport, codec_capabilities]
        )
        EventEmitter.__init__(self)

    def on_set_configuration_command(self, configuration):
        # No validation yet: whatever is proposed becomes the configuration
        logger.debug(f'<<< received sink configuration: {configuration}')
        self.configuration = configuration

    def on_rtp_channel_open(self):
        logger.debug(color('<<< RTP channel open', 'magenta'))
        # From now on, packets arriving on the channel are handed to this endpoint
        rtp_channel = self.stream.rtp_channel
        rtp_channel.sink = self.on_avdtp_packet

    def on_avdtp_packet(self, packet):
        media_packet = MediaPacket.from_bytes(packet)
        # Only the first 16 bytes of the payload are logged
        logger.debug(f'{color("<<< RTP Packet:", "green")} {media_packet} {media_packet.payload[:16].hex()}')
        self.emit('rtp_packet', media_packet)
1922