1# Copyright 2021-2022 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18from __future__ import annotations 19import asyncio 20import struct 21import time 22import logging 23import enum 24import warnings 25from pyee import EventEmitter 26from typing import ( 27 Any, 28 Awaitable, 29 Dict, 30 Type, 31 Tuple, 32 Optional, 33 Callable, 34 List, 35 AsyncGenerator, 36 Iterable, 37 Union, 38 SupportsBytes, 39 cast, 40) 41 42from .core import ( 43 BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE, 44 InvalidStateError, 45 ProtocolError, 46 name_or_number, 47) 48from .a2dp import ( 49 A2DP_CODEC_TYPE_NAMES, 50 A2DP_MPEG_2_4_AAC_CODEC_TYPE, 51 A2DP_NON_A2DP_CODEC_TYPE, 52 A2DP_SBC_CODEC_TYPE, 53 AacMediaCodecInformation, 54 SbcMediaCodecInformation, 55 VendorSpecificMediaCodecInformation, 56) 57from . 
import sdp, device, l2cap 58from .colors import color 59 60# ----------------------------------------------------------------------------- 61# Logging 62# ----------------------------------------------------------------------------- 63logger = logging.getLogger(__name__) 64 65 66# ----------------------------------------------------------------------------- 67# Constants 68# ----------------------------------------------------------------------------- 69# fmt: off 70# pylint: disable=line-too-long 71 72AVDTP_PSM = 0x0019 73 74AVDTP_DEFAULT_RTX_SIG_TIMER = 5 # Seconds 75 76# Signal Identifiers (AVDTP spec - 8.5 Signal Command Set) 77AVDTP_DISCOVER = 0x01 78AVDTP_GET_CAPABILITIES = 0x02 79AVDTP_SET_CONFIGURATION = 0x03 80AVDTP_GET_CONFIGURATION = 0x04 81AVDTP_RECONFIGURE = 0x05 82AVDTP_OPEN = 0x06 83AVDTP_START = 0x07 84AVDTP_CLOSE = 0x08 85AVDTP_SUSPEND = 0x09 86AVDTP_ABORT = 0x0A 87AVDTP_SECURITY_CONTROL = 0x0B 88AVDTP_GET_ALL_CAPABILITIES = 0x0C 89AVDTP_DELAYREPORT = 0x0D 90 91AVDTP_SIGNAL_NAMES = { 92 AVDTP_DISCOVER: 'AVDTP_DISCOVER', 93 AVDTP_GET_CAPABILITIES: 'AVDTP_GET_CAPABILITIES', 94 AVDTP_SET_CONFIGURATION: 'AVDTP_SET_CONFIGURATION', 95 AVDTP_GET_CONFIGURATION: 'AVDTP_GET_CONFIGURATION', 96 AVDTP_RECONFIGURE: 'AVDTP_RECONFIGURE', 97 AVDTP_OPEN: 'AVDTP_OPEN', 98 AVDTP_START: 'AVDTP_START', 99 AVDTP_CLOSE: 'AVDTP_CLOSE', 100 AVDTP_SUSPEND: 'AVDTP_SUSPEND', 101 AVDTP_ABORT: 'AVDTP_ABORT', 102 AVDTP_SECURITY_CONTROL: 'AVDTP_SECURITY_CONTROL', 103 AVDTP_GET_ALL_CAPABILITIES: 'AVDTP_GET_ALL_CAPABILITIES', 104 AVDTP_DELAYREPORT: 'AVDTP_DELAYREPORT' 105} 106 107AVDTP_SIGNAL_IDENTIFIERS = { 108 'AVDTP_DISCOVER': AVDTP_DISCOVER, 109 'AVDTP_GET_CAPABILITIES': AVDTP_GET_CAPABILITIES, 110 'AVDTP_SET_CONFIGURATION': AVDTP_SET_CONFIGURATION, 111 'AVDTP_GET_CONFIGURATION': AVDTP_GET_CONFIGURATION, 112 'AVDTP_RECONFIGURE': AVDTP_RECONFIGURE, 113 'AVDTP_OPEN': AVDTP_OPEN, 114 'AVDTP_START': AVDTP_START, 115 'AVDTP_CLOSE': AVDTP_CLOSE, 116 'AVDTP_SUSPEND': AVDTP_SUSPEND, 117 
'AVDTP_ABORT': AVDTP_ABORT, 118 'AVDTP_SECURITY_CONTROL': AVDTP_SECURITY_CONTROL, 119 'AVDTP_GET_ALL_CAPABILITIES': AVDTP_GET_ALL_CAPABILITIES, 120 'AVDTP_DELAYREPORT': AVDTP_DELAYREPORT 121} 122 123# Error codes (AVDTP spec - 8.20.6.2 ERROR_CODE tables) 124AVDTP_BAD_HEADER_FORMAT_ERROR = 0x01 125AVDTP_BAD_LENGTH_ERROR = 0x11 126AVDTP_BAD_ACP_SEID_ERROR = 0x12 127AVDTP_SEP_IN_USE_ERROR = 0x13 128AVDTP_SEP_NOT_IN_USE_ERROR = 0x14 129AVDTP_BAD_SERV_CATEGORY_ERROR = 0x17 130AVDTP_BAD_PAYLOAD_FORMAT_ERROR = 0x18 131AVDTP_NOT_SUPPORTED_COMMAND_ERROR = 0x19 132AVDTP_INVALID_CAPABILITIES_ERROR = 0x1A 133AVDTP_BAD_RECOVERY_TYPE_ERROR = 0x22 134AVDTP_BAD_MEDIA_TRANSPORT_FORMAT_ERROR = 0x23 135AVDTP_BAD_RECOVERY_FORMAT_ERROR = 0x25 136AVDTP_BAD_ROHC_FORMAT_ERROR = 0x26 137AVDTP_BAD_CP_FORMAT_ERROR = 0x27 138AVDTP_BAD_MULTIPLEXING_FORMAT_ERROR = 0x28 139AVDTP_UNSUPPORTED_CONFIGURATION_ERROR = 0x29 140AVDTP_BAD_STATE_ERROR = 0x31 141 142AVDTP_ERROR_NAMES = { 143 AVDTP_BAD_HEADER_FORMAT_ERROR: 'AVDTP_BAD_HEADER_FORMAT_ERROR', 144 AVDTP_BAD_LENGTH_ERROR: 'AVDTP_BAD_LENGTH_ERROR', 145 AVDTP_BAD_ACP_SEID_ERROR: 'AVDTP_BAD_ACP_SEID_ERROR', 146 AVDTP_SEP_IN_USE_ERROR: 'AVDTP_SEP_IN_USE_ERROR', 147 AVDTP_SEP_NOT_IN_USE_ERROR: 'AVDTP_SEP_NOT_IN_USE_ERROR', 148 AVDTP_BAD_SERV_CATEGORY_ERROR: 'AVDTP_BAD_SERV_CATEGORY_ERROR', 149 AVDTP_BAD_PAYLOAD_FORMAT_ERROR: 'AVDTP_BAD_PAYLOAD_FORMAT_ERROR', 150 AVDTP_NOT_SUPPORTED_COMMAND_ERROR: 'AVDTP_NOT_SUPPORTED_COMMAND_ERROR', 151 AVDTP_INVALID_CAPABILITIES_ERROR: 'AVDTP_INVALID_CAPABILITIES_ERROR', 152 AVDTP_BAD_RECOVERY_TYPE_ERROR: 'AVDTP_BAD_RECOVERY_TYPE_ERROR', 153 AVDTP_BAD_MEDIA_TRANSPORT_FORMAT_ERROR: 'AVDTP_BAD_MEDIA_TRANSPORT_FORMAT_ERROR', 154 AVDTP_BAD_RECOVERY_FORMAT_ERROR: 'AVDTP_BAD_RECOVERY_FORMAT_ERROR', 155 AVDTP_BAD_ROHC_FORMAT_ERROR: 'AVDTP_BAD_ROHC_FORMAT_ERROR', 156 AVDTP_BAD_CP_FORMAT_ERROR: 'AVDTP_BAD_CP_FORMAT_ERROR', 157 AVDTP_BAD_MULTIPLEXING_FORMAT_ERROR: 'AVDTP_BAD_MULTIPLEXING_FORMAT_ERROR', 158 
AVDTP_UNSUPPORTED_CONFIGURATION_ERROR: 'AVDTP_UNSUPPORTED_CONFIGURATION_ERROR', 159 AVDTP_BAD_STATE_ERROR: 'AVDTP_BAD_STATE_ERROR' 160} 161 162AVDTP_AUDIO_MEDIA_TYPE = 0x00 163AVDTP_VIDEO_MEDIA_TYPE = 0x01 164AVDTP_MULTIMEDIA_MEDIA_TYPE = 0x02 165 166AVDTP_MEDIA_TYPE_NAMES = { 167 AVDTP_AUDIO_MEDIA_TYPE: 'AVDTP_AUDIO_MEDIA_TYPE', 168 AVDTP_VIDEO_MEDIA_TYPE: 'AVDTP_VIDEO_MEDIA_TYPE', 169 AVDTP_MULTIMEDIA_MEDIA_TYPE: 'AVDTP_MULTIMEDIA_MEDIA_TYPE' 170} 171 172# TSEP (AVDTP spec - 8.20.3 Stream End-point Type, Source or Sink (TSEP)) 173AVDTP_TSEP_SRC = 0x00 174AVDTP_TSEP_SNK = 0x01 175 176AVDTP_TSEP_NAMES = { 177 AVDTP_TSEP_SRC: 'AVDTP_TSEP_SRC', 178 AVDTP_TSEP_SNK: 'AVDTP_TSEP_SNK' 179} 180 181# Service Categories (AVDTP spec - Table 8.47: Service Category information element field values) 182AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY = 0x01 183AVDTP_REPORTING_SERVICE_CATEGORY = 0x02 184AVDTP_RECOVERY_SERVICE_CATEGORY = 0x03 185AVDTP_CONTENT_PROTECTION_SERVICE_CATEGORY = 0x04 186AVDTP_HEADER_COMPRESSION_SERVICE_CATEGORY = 0x05 187AVDTP_MULTIPLEXING_SERVICE_CATEGORY = 0x06 188AVDTP_MEDIA_CODEC_SERVICE_CATEGORY = 0x07 189AVDTP_DELAY_REPORTING_SERVICE_CATEGORY = 0x08 190 191AVDTP_SERVICE_CATEGORY_NAMES = { 192 AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY: 'AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY', 193 AVDTP_REPORTING_SERVICE_CATEGORY: 'AVDTP_REPORTING_SERVICE_CATEGORY', 194 AVDTP_RECOVERY_SERVICE_CATEGORY: 'AVDTP_RECOVERY_SERVICE_CATEGORY', 195 AVDTP_CONTENT_PROTECTION_SERVICE_CATEGORY: 'AVDTP_CONTENT_PROTECTION_SERVICE_CATEGORY', 196 AVDTP_HEADER_COMPRESSION_SERVICE_CATEGORY: 'AVDTP_HEADER_COMPRESSION_SERVICE_CATEGORY', 197 AVDTP_MULTIPLEXING_SERVICE_CATEGORY: 'AVDTP_MULTIPLEXING_SERVICE_CATEGORY', 198 AVDTP_MEDIA_CODEC_SERVICE_CATEGORY: 'AVDTP_MEDIA_CODEC_SERVICE_CATEGORY', 199 AVDTP_DELAY_REPORTING_SERVICE_CATEGORY: 'AVDTP_DELAY_REPORTING_SERVICE_CATEGORY' 200} 201 202# States (AVDTP spec - 9.1 State Definitions) 203AVDTP_IDLE_STATE = 0x00 204AVDTP_CONFIGURED_STATE = 0x01 
async def find_avdtp_service_with_sdp_client(
    sdp_client: sdp.Client,
) -> Optional[Tuple[int, int]]:
    '''
    Look up the AVDTP version advertised by a peer, using a connected SDP client.

    The SDP records matching the Advanced Audio Distribution service UUID are
    queried for their Bluetooth Profile Descriptor List. The first descriptor
    that is a sequence with at least two elements provides the version.

    Returns a `(major, minor)` tuple, or None if no usable descriptor is found.
    '''

    descriptor_list_id = sdp.SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID

    # Only the profile descriptor list attribute is requested from each record
    records = await sdp_client.search_attributes(
        [BT_ADVANCED_AUDIO_DISTRIBUTION_SERVICE],
        [descriptor_list_id],
    )

    for record in records:
        descriptor_list = sdp.ServiceAttribute.find_attribute_in_list(
            record, descriptor_list_id
        )
        if not descriptor_list:
            continue

        for descriptor in descriptor_list.value:
            # Skip anything that is not a (uuid, version) style sequence
            if descriptor.type != sdp.DataElement.SEQUENCE:
                continue
            if len(descriptor.value) < 2:
                continue

            # The version is a 16-bit value: major in the high byte, minor in the low
            version = descriptor.value[1].value
            return (version >> 8, version & 0xFF)

    return None
class MediaPacket:
    '''
    RTP media packet.

    The serialized form is a 12-byte fixed header, followed by one 32-bit
    big-endian CSRC identifier per entry of `csrc_list`, followed by the payload.
    The `padding` and `extension` flags are carried as-is: neither padding bytes
    nor a header extension are interpreted by this class.
    '''

    @staticmethod
    def from_bytes(data: bytes) -> MediaPacket:
        '''
        Parse a serialized packet.

        Args:
            data: the full packet, starting at the first header byte.

        Returns:
            A MediaPacket whose payload is everything after the CSRC list.

        Raises:
            IndexError or struct.error if `data` is shorter than the header
            (including the CSRC list) it announces.
        '''
        version = (data[0] >> 6) & 0x03
        padding = (data[0] >> 5) & 0x01
        extension = (data[0] >> 4) & 0x01
        csrc_count = data[0] & 0x0F
        marker = (data[1] >> 7) & 0x01
        payload_type = data[1] & 0x7F
        sequence_number, timestamp, ssrc = struct.unpack_from('>HII', data, 2)

        # Each CSRC identifier is 4 bytes wide, so entry i starts at 12 + 4 * i
        # (stepping by 1 byte would read overlapping garbage for i >= 1).
        csrc_list = [
            struct.unpack_from('>I', data, 12 + 4 * i)[0] for i in range(csrc_count)
        ]
        payload = data[12 + csrc_count * 4 :]

        return MediaPacket(
            version,
            padding,
            extension,
            marker,
            sequence_number,
            timestamp,
            ssrc,
            csrc_list,
            payload_type,
            payload,
        )

    def __init__(
        self,
        version: int,
        padding: int,
        extension: int,
        marker: int,
        sequence_number: int,
        timestamp: int,
        ssrc: int,
        csrc_list: List[int],
        payload_type: int,
        payload: bytes,
    ) -> None:
        self.version = version
        self.padding = padding
        self.extension = extension
        self.marker = marker
        # Wrap counters to their on-the-wire width so callers can pass
        # ever-increasing values
        self.sequence_number = sequence_number & 0xFFFF
        self.timestamp = timestamp & 0xFFFFFFFF
        self.ssrc = ssrc
        self.csrc_list = csrc_list
        self.payload_type = payload_type
        self.payload = payload

    def __bytes__(self) -> bytes:
        '''Serialize the packet: fixed header, CSRC list, then payload.'''
        header = bytes(
            [
                self.version << 6
                | self.padding << 5
                | self.extension << 4
                | len(self.csrc_list),
                self.marker << 7 | self.payload_type,
            ]
        ) + struct.pack(
            '>HII',
            self.sequence_number,
            self.timestamp,
            self.ssrc,
        )
        csrcs = b''.join(struct.pack('>I', csrc) for csrc in self.csrc_list)
        return header + csrcs + self.payload

    def __str__(self) -> str:
        return (
            f'RTP(v={self.version},'
            f'p={self.padding},'
            f'x={self.extension},'
            f'm={self.marker},'
            f'pt={self.payload_type},'
            f'sn={self.sequence_number},'
            f'ts={self.timestamp},'
            f'ssrc={self.ssrc},'
            f'csrcs={self.csrc_list},'
            f'payload_size={len(self.payload)})'
        )
class MessageAssembler:
    '''
    Reassembles AVDTP signaling messages from the PDUs of a signaling channel.

    A message arrives either as one SINGLE packet, or as a START packet followed
    by CONTINUE packets and a final END packet. When a message is complete, it is
    decoded with `Message.create` and passed to the callback along with its
    transaction label.
    '''

    message: Optional[bytes]

    def __init__(self, callback: Callable[[int, Message], Any]) -> None:
        '''
        Args:
            callback: invoked as `callback(transaction_label, message)` for each
              complete message. Exceptions it raises are logged, not propagated.
        '''
        self.callback = callback
        self.reset()

    def reset(self) -> None:
        '''Forget any partially assembled message.'''
        self.transaction_label = 0
        self.message = None  # None means "no message in progress"
        self.message_type = Message.MessageType.COMMAND
        self.signal_identifier = 0
        self.number_of_signal_packets = 0
        self.packet_count = 0

    def on_pdu(self, pdu: bytes) -> None:
        '''
        Process one PDU received from the signaling channel.

        Malformed or unexpected PDUs are logged and dropped; they never raise.
        '''
        if not pdu:
            logger.warning('received an empty PDU')
            return

        transaction_label = pdu[0] >> 4
        packet_type = Protocol.PacketType((pdu[0] >> 2) & 3)
        message_type = Message.MessageType(pdu[0] & 3)

        logger.debug(
            f'transaction_label={transaction_label}, '
            f'packet_type={packet_type.name}, '
            f'message_type={message_type.name}'
        )
        if packet_type in (
            Protocol.PacketType.SINGLE_PACKET,
            Protocol.PacketType.START_PACKET,
        ):
            self._on_first_packet(pdu, transaction_label, packet_type, message_type)
        else:
            self._on_following_packet(
                pdu, transaction_label, packet_type, message_type
            )

    def _on_first_packet(
        self, pdu: bytes, transaction_label: int, packet_type, message_type
    ) -> None:
        # Handle a SINGLE or START packet, which begins a new message.
        is_single = packet_type == Protocol.PacketType.SINGLE_PACKET

        # SINGLE carries a signal identifier byte; START also carries a packet count
        if len(pdu) < (2 if is_single else 3):
            logger.warning('PDU too short for its packet type')
            return

        if self.message is not None:
            # The previous message has not been terminated
            logger.warning(
                'received a start or single packet when expecting an end or '
                'continuation'
            )

        # Always start from a clean state, then count this packet as the first
        # one, so that the END packet check below sees the right total even when
        # a previous message was abandoned mid-way.
        self.reset()
        self.packet_count = 1

        self.transaction_label = transaction_label
        self.signal_identifier = pdu[1] & 0x3F
        self.message_type = message_type

        if is_single:
            self.message = pdu[2:]
            self.on_message_complete()
        else:
            self.number_of_signal_packets = pdu[2]
            self.message = pdu[3:]

    def _on_following_packet(
        self, pdu: bytes, transaction_label: int, packet_type, message_type
    ) -> None:
        # Handle a CONTINUE or END packet, which extends the message in progress.
        if self.message is None:
            # No START packet was seen for this fragment
            logger.warning('unexpected continuation')
            return

        if transaction_label != self.transaction_label:
            logger.warning(
                f'transaction label mismatch: expected {self.transaction_label}, '
                f'received {transaction_label}'
            )
            return

        if message_type != self.message_type:
            logger.warning(
                f'message type mismatch: expected {self.message_type}, '
                f'received {message_type}'
            )
            return

        # Only packets that are actually accepted are counted
        self.packet_count += 1
        self.message += pdu[1:]

        if packet_type == Protocol.PacketType.END_PACKET:
            if self.packet_count != self.number_of_signal_packets:
                logger.warning(
                    'incomplete fragmented message: '
                    f'expected {self.number_of_signal_packets} packets, '
                    f'received {self.packet_count}'
                )
                self.reset()
                return

            self.on_message_complete()
        elif self.packet_count > self.number_of_signal_packets:
            logger.warning(
                'too many packets: '
                f'expected {self.number_of_signal_packets}, '
                f'received {self.packet_count}'
            )
            self.reset()

    def on_message_complete(self) -> None:
        '''
        Decode the assembled message and deliver it to the callback.

        The assembler is reset afterwards, including when decoding fails, so that
        a bad message cannot be mixed into the next one.
        '''
        try:
            message = Message.create(
                self.signal_identifier, self.message_type, self.message or b''
            )
            try:
                self.callback(self.transaction_label, message)
            except Exception as error:
                logger.exception(color(f'!!! exception in callback: {error}', 'red'))
        finally:
            self.reset()
self.service_category)] 585 + details 586 ) 587 return f'ServiceCapabilities({attributes})' 588 589 def __str__(self) -> str: 590 if self.service_capabilities_bytes: 591 details = [self.service_capabilities_bytes.hex()] 592 else: 593 details = [] 594 return self.to_string(details) 595 596 597# ----------------------------------------------------------------------------- 598class MediaCodecCapabilities(ServiceCapabilities): 599 media_codec_information: Union[bytes, SupportsBytes] 600 media_type: int 601 media_codec_type: int 602 603 def init_from_bytes(self) -> None: 604 self.media_type = self.service_capabilities_bytes[0] 605 self.media_codec_type = self.service_capabilities_bytes[1] 606 self.media_codec_information = self.service_capabilities_bytes[2:] 607 608 if self.media_codec_type == A2DP_SBC_CODEC_TYPE: 609 self.media_codec_information = SbcMediaCodecInformation.from_bytes( 610 self.media_codec_information 611 ) 612 elif self.media_codec_type == A2DP_MPEG_2_4_AAC_CODEC_TYPE: 613 self.media_codec_information = AacMediaCodecInformation.from_bytes( 614 self.media_codec_information 615 ) 616 elif self.media_codec_type == A2DP_NON_A2DP_CODEC_TYPE: 617 self.media_codec_information = ( 618 VendorSpecificMediaCodecInformation.from_bytes( 619 self.media_codec_information 620 ) 621 ) 622 623 def __init__( 624 self, 625 media_type: int, 626 media_codec_type: int, 627 media_codec_information: Union[bytes, SupportsBytes], 628 ) -> None: 629 super().__init__( 630 AVDTP_MEDIA_CODEC_SERVICE_CATEGORY, 631 bytes([media_type, media_codec_type]) + bytes(media_codec_information), 632 ) 633 self.media_type = media_type 634 self.media_codec_type = media_codec_type 635 self.media_codec_information = media_codec_information 636 637 def __str__(self) -> str: 638 codec_info = ( 639 self.media_codec_information.hex() 640 if isinstance(self.media_codec_information, bytes) 641 else str(self.media_codec_information) 642 ) 643 644 details = [ 645 
class Message:  # pylint:disable=attribute-defined-outside-init
    '''
    Base class for AVDTP signaling messages.

    Concrete message classes register themselves with the `@Message.subclass`
    decorator, which derives the signal identifier and message type from the
    class name (`<Signal>_Command`, `<Signal>_Response`, `<Signal>_Reject`).
    `Message.create` uses that registry to decode received payloads.
    '''

    class MessageType(enum.IntEnum):
        COMMAND = 0
        GENERAL_REJECT = 1
        RESPONSE_ACCEPT = 2
        RESPONSE_REJECT = 3

    # Subclasses, by signal identifier and message type
    subclasses: Dict[int, Dict[int, Type[Message]]] = {}
    message_type: MessageType
    signal_identifier: int

    @staticmethod
    def subclass(subclass):
        '''
        Class decorator that registers a message class.

        Raises:
            ValueError: if the class name does not follow the naming convention,
              or does not correspond to any known signal identifier.
        '''
        # Infer the signal identifier and message subtype from the class name
        name = subclass.__name__
        if name == 'General_Reject':
            subclass.signal_identifier = 0
            signal_identifier_str = None
            message_type = Message.MessageType.COMMAND
        elif name.endswith('_Command'):
            signal_identifier_str = name[: -len('_Command')]
            message_type = Message.MessageType.COMMAND
        elif name.endswith('_Response'):
            signal_identifier_str = name[: -len('_Response')]
            message_type = Message.MessageType.RESPONSE_ACCEPT
        elif name.endswith('_Reject'):
            signal_identifier_str = name[: -len('_Reject')]
            message_type = Message.MessageType.RESPONSE_REJECT
        else:
            raise ValueError('invalid class name')

        subclass.message_type = message_type

        if signal_identifier_str is not None:
            suffix = signal_identifier_str.lower()
            for signal_name, signal_identifier in AVDTP_SIGNAL_IDENTIFIERS.items():
                if signal_name.lower().endswith(suffix):
                    subclass.signal_identifier = signal_identifier
                    break
            else:
                # Without this, the class would silently keep an identifier
                # inherited from a parent class and overwrite its registration.
                raise ValueError(f'no signal identifier matches class name {name}')

        # Register the subclass
        Message.subclasses.setdefault(subclass.signal_identifier, {})[
            subclass.message_type
        ] = subclass

        return subclass

    # Factory method to create a subclass based on the signal identifier and message
    # type
    @staticmethod
    def create(
        signal_identifier: int, message_type: MessageType, payload: bytes
    ) -> Message:
        '''
        Decode a received message.

        Args:
            signal_identifier: signal identifier from the message header.
            message_type: message type from the message header.
            payload: message bytes following the header.

        Returns:
            An instance of the registered class for this signal and type. When
            none is registered, a `Simple_Reject` is returned for reject
            responses and a plain `Message` otherwise.
        '''
        instance: Message

        # Look for a registered subclass
        subclasses = Message.subclasses.get(signal_identifier)
        if subclasses:
            subclass = subclasses.get(message_type)
            if subclass:
                # Bypass __init__, whose arguments are message fields, not bytes
                instance = subclass.__new__(subclass)
                instance.payload = payload
                instance.init_from_payload()
                return instance

        # Instantiate the appropriate class based on the message type
        if message_type == Message.MessageType.RESPONSE_REJECT:
            # Assume a simple reject message. It is built the same way as the
            # registered classes: Simple_Reject.__init__ expects an integer error
            # code and would fail if given the raw payload bytes.
            instance = Simple_Reject.__new__(Simple_Reject)
            instance.payload = payload
            instance.init_from_payload()
        else:
            instance = Message(payload)
        instance.signal_identifier = signal_identifier
        instance.message_type = message_type
        return instance

    def init_from_payload(self) -> None:
        '''Populate message fields from `self.payload`. No fields by default.'''

    def __init__(self, payload: bytes = b'') -> None:
        self.payload = payload

    def to_string(self, details: Union[str, Iterable[str]]) -> str:
        '''
        Format the message name, followed by details.

        Args:
            details: either a single string, appended on the same line, or an
              iterable of strings, each shown on its own line. Falsy values
              produce just the message name.
        '''
        base = color(
            f'{name_or_number(AVDTP_SIGNAL_NAMES, self.signal_identifier)}_'
            f'{self.message_type.name}',
            'yellow',
        )

        if details:
            if isinstance(details, str):
                return f'{base}: {details}'

            # NOTE(review): the indent prefix below is reproduced as it appears
            # in the source under review; confirm its intended width.
            return (
                base
                + ':\n'
                + '\n'.join([' ' + color(detail, 'cyan') for detail in details])
            )

        return base

    def __str__(self) -> str:
        return self.to_string(self.payload.hex())
@Message.subclass
class Get_Capabilities_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.7.2 Get Capabilities Response
    '''

    def init_from_payload(self):
        self.capabilities = ServiceCapabilities.parse_capabilities(self.payload)

    def __init__(self, capabilities):
        '''
        Args:
            capabilities: iterable of ServiceCapabilities to include in the
              response.
        '''
        # Materialize first: a one-shot iterable (e.g. a generator) would be
        # exhausted by serialization and then stored empty.
        capability_list = list(capabilities)
        super().__init__(
            payload=ServiceCapabilities.serialize_capabilities(capability_list)
        )
        self.capabilities = capability_list

    def __str__(self) -> str:
        details = [str(capability) for capability in self.capabilities]
        return self.to_string(details)
@Message.subclass
class Set_Configuration_Command(Message):
    '''
    See Bluetooth AVDTP spec - 8.9.1 Set Configuration Command
    '''

    def init_from_payload(self):
        # Each SEID occupies the upper 6 bits of its byte
        self.acp_seid = self.payload[0] >> 2
        self.int_seid = self.payload[1] >> 2
        self.capabilities = ServiceCapabilities.parse_capabilities(self.payload[2:])

    def __init__(
        self, acp_seid: int, int_seid: int, capabilities: Iterable[ServiceCapabilities]
    ) -> None:
        '''
        Args:
            acp_seid: stream endpoint identifier of the acceptor.
            int_seid: stream endpoint identifier of the initiator.
            capabilities: the service capabilities to configure.
        '''
        # Materialize first: a one-shot iterable (e.g. a generator) would be
        # exhausted by serialization and then stored empty.
        capability_list = list(capabilities)
        super().__init__(
            payload=bytes([acp_seid << 2, int_seid << 2])
            + ServiceCapabilities.serialize_capabilities(capability_list)
        )
        self.acp_seid = acp_seid
        self.int_seid = int_seid
        self.capabilities = capability_list

    def __str__(self) -> str:
        details = [f'ACP SEID: {self.acp_seid}', f'INT SEID: {self.int_seid}'] + [
            str(capability) for capability in self.capabilities
        ]
        return self.to_string(details)
@Message.subclass
class Get_Configuration_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.10.2 Get Configuration Response
    '''

    def init_from_payload(self):
        self.capabilities = ServiceCapabilities.parse_capabilities(self.payload)

    def __init__(self, capabilities: Iterable[ServiceCapabilities]) -> None:
        '''
        Args:
            capabilities: the service capabilities of the current configuration.
        '''
        # Materialize first: a one-shot iterable (e.g. a generator) would be
        # exhausted by serialization and then stored empty.
        capability_list = list(capabilities)
        super().__init__(
            payload=ServiceCapabilities.serialize_capabilities(capability_list)
        )
        self.capabilities = capability_list

    def __str__(self) -> str:
        details = [str(capability) for capability in self.capabilities]
        return self.to_string(details)
# -----------------------------------------------------------------------------
@Message.subclass
class Reconfigure_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.11.2 Reconfigure Response
    '''


# -----------------------------------------------------------------------------
@Message.subclass
class Reconfigure_Reject(Set_Configuration_Reject):
    '''
    See Bluetooth AVDTP spec - 8.11.3 Reconfigure Reject
    '''


# -----------------------------------------------------------------------------
@Message.subclass
class Open_Command(Simple_Command):
    '''
    See Bluetooth AVDTP spec - 8.12.1 Open Stream Command
    '''


# -----------------------------------------------------------------------------
@Message.subclass
class Open_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.12.2 Open Stream Response
    '''


# -----------------------------------------------------------------------------
@Message.subclass
class Open_Reject(Simple_Reject):
    '''
    See Bluetooth AVDTP spec - 8.12.3 Open Stream Reject
    '''


# -----------------------------------------------------------------------------
@Message.subclass
class Start_Command(Message):
    '''
    See Bluetooth AVDTP spec - 8.13.1 Start Stream Command

    The payload is a sequence of bytes, each carrying one ACP SEID in its
    upper 6 bits.
    '''

    def init_from_payload(self):
        self.acp_seids = [x >> 2 for x in self.payload]

    def __init__(self, seids: Iterable[int]) -> None:
        '''
        Build the command for the given acceptor SEIDs.
        '''
        # Materialize once: the iterable is walked to build the payload and is
        # then kept as `acp_seids`; a one-shot iterator would otherwise be
        # stored already exhausted (and differ from the list produced by
        # init_from_payload).
        seids = list(seids)
        super().__init__(payload=bytes([seid << 2 for seid in seids]))
        self.acp_seids = seids

    def __str__(self) -> str:
        return self.to_string([f'ACP SEIDs: {self.acp_seids}'])
# -----------------------------------------------------------------------------
@Message.subclass
class Start_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.13.2 Start Stream Response
    '''


# -----------------------------------------------------------------------------
@Message.subclass
class Start_Reject(Message):
    '''
    See Bluetooth AVDTP spec - 8.13.3 Start Stream Reject

    The payload is two bytes: the ACP SEID of the stream that caused the
    rejection (stored in the upper 6 bits of the first byte), followed by the
    error code.
    '''

    def init_from_payload(self) -> None:
        self.acp_seid = self.payload[0] >> 2
        self.error_code = self.payload[1]

    def __init__(self, acp_seid: int, error_code: int) -> None:
        super().__init__(payload=bytes([acp_seid << 2, error_code]))
        self.acp_seid = acp_seid
        self.error_code = error_code

    def __str__(self) -> str:
        details = [
            f'acp_seid: {self.acp_seid}',
            f'error_code: {name_or_number(AVDTP_ERROR_NAMES, self.error_code)}',
        ]
        return self.to_string(details)


# -----------------------------------------------------------------------------
@Message.subclass
class Close_Command(Simple_Command):
    '''
    See Bluetooth AVDTP spec - 8.14.1 Close Stream Command
    '''


# -----------------------------------------------------------------------------
@Message.subclass
class Close_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.14.2 Close Stream Response
    '''


# -----------------------------------------------------------------------------
@Message.subclass
class Close_Reject(Simple_Reject):
    '''
    See Bluetooth AVDTP spec - 8.14.3 Close Stream Reject
    '''


# -----------------------------------------------------------------------------
@Message.subclass
class Suspend_Command(Start_Command):
    '''
    See Bluetooth AVDTP spec - 8.15.1 Suspend Command

    Shares the Start_Command payload format (a list of ACP SEIDs).
    '''


# -----------------------------------------------------------------------------
@Message.subclass
class Suspend_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.15.2 Suspend Response
    '''


# -----------------------------------------------------------------------------
@Message.subclass
class Suspend_Reject(Start_Reject):
    '''
    See Bluetooth AVDTP spec - 8.15.3 Suspend Reject

    Shares the Start_Reject payload format (ACP SEID followed by error code).
    '''


# -----------------------------------------------------------------------------
@Message.subclass
class Abort_Command(Simple_Command):
    '''
    See Bluetooth AVDTP spec - 8.16.1 Abort Command
    '''


# -----------------------------------------------------------------------------
@Message.subclass
class Abort_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.16.2 Abort Response
    '''


# -----------------------------------------------------------------------------
@Message.subclass
class Security_Control_Command(Message):
    '''
    See Bluetooth AVDTP spec - 8.17.1 Security Control Command
    '''


# -----------------------------------------------------------------------------
@Message.subclass
class Security_Control_Response(Message):
    '''
    See Bluetooth AVDTP spec - 8.17.2 Security Control Response
    '''


# -----------------------------------------------------------------------------
@Message.subclass
class Security_Control_Reject(Simple_Reject):
    '''
    See Bluetooth AVDTP spec - 8.17.3 Security Control Reject
    '''


# -----------------------------------------------------------------------------
@Message.subclass
class General_Reject(Message):
    '''
    See Bluetooth AVDTP spec - 8.18 General Reject
    '''

    def to_string(self, details):
        # `details` is intentionally ignored: this override always renders the
        # same fixed, colored label.
        return color('GENERAL_REJECT', 'yellow')
@staticmethod
def packet_type_name(packet_type):
    '''
    Return a printable name for an AVDTP signaling packet type.

    Args:
        packet_type: numeric packet type, normally a `Protocol.PacketType`
            member or its integer value.

    Returns:
        The `PacketType` member name (e.g. 'SINGLE_PACKET'), or the value
        rendered as a bracketed hexadecimal number when it does not match any
        member.
    '''
    # Packet types are modeled by the PacketType enum; there is no
    # PACKET_TYPE_NAMES table on the class, so resolve the name via the enum.
    try:
        return Protocol.PacketType(packet_type).name
    except ValueError:
        return f'[0x{packet_type:02X}]'
def get_local_endpoint_by_seid(self, seid: int) -> Optional[LocalStreamEndPoint]:
    '''
    Look up a local endpoint by its SEID.

    Local endpoints are stored in a list, with SEID values starting at 1, so
    the endpoint with SEID `n` lives at index `n - 1`.

    Returns:
        The matching endpoint, or None when `seid` is outside the range of
        registered endpoints.
    '''
    index = seid - 1
    if index < 0 or index >= len(self.local_endpoints):
        return None

    return self.local_endpoints[index]
def find_remote_sink_by_codec(
    self, media_type: int, codec_type: int
) -> Optional[DiscoveredStreamEndPoint]:
    '''
    Find the first discovered remote sink usable for a given codec.

    An endpoint qualifies when it is not in use, has the requested media
    type, is a sink, and advertises both a media transport capability and an
    audio media codec capability whose codec type is `codec_type`.

    Returns:
        The first qualifying endpoint, or None if there is none.
    '''
    for candidate in self.remote_endpoints.values():
        # Skip endpoints that cannot be used at all
        if candidate.in_use:
            continue
        if candidate.media_type != media_type:
            continue
        if candidate.tsep != AVDTP_TSEP_SNK:
            continue

        # Scan every capability, noting the two that are required
        transport_found = False
        codec_found = False
        for capability in candidate.capabilities:
            category = capability.service_category
            if category == AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY:
                transport_found = True
                continue
            if category != AVDTP_MEDIA_CODEC_SERVICE_CATEGORY:
                continue

            codec = cast(MediaCodecCapabilities, capability)
            if (
                codec.media_type == AVDTP_AUDIO_MEDIA_TYPE
                and codec.media_codec_type == codec_type
            ):
                codec_found = True

        if transport_found and codec_found:
            return candidate

    return None
def on_message(self, transaction_label: int, message: Message) -> None:
    '''
    Process one fully reassembled AVDTP signaling message.

    Commands are dispatched to the `on_<signal>_command` method of this
    object, and whatever the handler returns is sent back with the same
    transaction label. Any other message is treated as the answer to a
    pending transaction: it completes the future stored for that label and
    frees the transaction slot.

    Args:
        transaction_label: label carried by the message (index into
            `transaction_results`).
        message: the received message.
    '''
    logger.debug(
        f'{color("<<< Received AVDTP message", "magenta")}: '
        f'[{transaction_label}] {message}'
    )

    # Check that the identifier is not reserved
    if message.signal_identifier == 0:
        logger.warning('!!! reserved signal identifier')
        return

    is_command = message.message_type == Message.MessageType.COMMAND

    # Check that the identifier is valid
    if (
        message.signal_identifier < 0
        or message.signal_identifier > AVDTP_DELAYREPORT
    ):
        logger.warning('!!! invalid signal identifier')
        if is_command:
            # A General Reject answers a command; once sent there is nothing
            # left to dispatch. (Never reject a message that is itself a
            # response.)
            self.send_message(transaction_label, General_Reject())
            return

    if is_command:
        signal_name = (
            AVDTP_SIGNAL_NAMES.get(message.signal_identifier, "")
            .replace("AVDTP_", "")
            .lower()
        )
        handler = getattr(self, f'on_{signal_name}_command', None)
        if handler:
            try:
                response = handler(message)
                self.send_message(transaction_label, response)
            except Exception as error:
                logger.warning(
                    f'{color("!!! Exception in handler:", "red")} {error}'
                )
        else:
            logger.warning('unhandled command')
        return

    # Response, look for a pending transaction with the same label
    transaction_result = self.transaction_results[transaction_label]
    if transaction_result is None:
        logger.warning(color('!!! no pending transaction for label', 'red'))
        return

    # Free the slot first so that it is released even when the future can no
    # longer accept a result (e.g. the waiting task was cancelled); calling
    # set_result() on such a future would raise and leak the slot.
    self.transaction_results[transaction_label] = None
    self.transaction_semaphore.release()
    if transaction_result.done():
        logger.warning(
            color('!!! transaction already completed or cancelled', 'red')
        )
        return

    transaction_result.set_result(message)
def send_message(self, transaction_label: int, message: Message) -> None:
    '''
    Send a signaling message, fragmenting it when it exceeds the peer MTU.

    A message whose payload plus the 2-byte single-packet header fits in the
    peer MTU is sent whole as one SINGLE packet. Otherwise it is split into a
    START packet (3-byte header including the packet count), zero or more
    CONTINUE packets and a final END packet (1-byte headers), each carrying
    at most `peer_mtu - 3` payload bytes.

    Args:
        transaction_label: label placed in the upper 4 bits of the first
            header byte.
        message: message to send.

    Raises:
        ValueError: if fragmentation is needed but the peer MTU leaves no
            room for payload bytes.
    '''
    logger.debug(
        f'{color(">>> Sending AVDTP message", "magenta")}: '
        f'[{transaction_label}] {message}'
    )
    payload = message.payload
    peer_mtu = self.l2cap_channel.peer_mtu

    if len(payload) + 2 <= peer_mtu:
        # Fits in a single packet: send the payload whole. (Slicing it to the
        # fragment size here would cut off the last byte of a payload that is
        # exactly `peer_mtu - 2` bytes long.)
        first_header_byte = (
            transaction_label << 4
            | self.PacketType.SINGLE_PACKET << 2
            | message.message_type
        )
        header = bytes([first_header_byte, message.signal_identifier])
        self.l2cap_channel.send_pdu(header + payload)
        return

    # Enough space for a 3-byte start packet header
    max_fragment_size = peer_mtu - 3
    if max_fragment_size <= 0:
        raise ValueError('peer MTU too small to fragment AVDTP message')

    # Number of packets needed (ceiling division)
    packet_count = (max_fragment_size - 1 + len(payload)) // max_fragment_size

    packet_type = self.PacketType.START_PACKET
    while payload:
        first_header_byte = (
            transaction_label << 4 | packet_type << 2 | message.message_type
        )
        if packet_type == self.PacketType.START_PACKET:
            header = bytes(
                [first_header_byte, message.signal_identifier, packet_count]
            )
        else:
            header = bytes([first_header_byte])

        # Send one packet
        self.l2cap_channel.send_pdu(header + payload[:max_fragment_size])

        # Prepare for the next packet
        payload = payload[max_fragment_size:]
        packet_type = (
            self.PacketType.CONTINUE_PACKET
            if len(payload) > max_fragment_size
            else self.PacketType.END_PACKET
        )
async def start_transaction(self) -> Tuple[int, asyncio.Future[Message]]:
    '''
    Reserve a transaction label and create the future for its response.

    Waits on the transaction semaphore, then scans the 16 label slots,
    starting at `transaction_count % 16`, for the first one that is free.

    Returns:
        A `(transaction_label, future)` tuple; the future is also stored in
        `transaction_results[transaction_label]`.
    '''
    # Wait until we can start a new transaction
    await self.transaction_semaphore.acquire()

    # Look for the next free entry to store the transaction result
    first_candidate = self.transaction_count
    for offset in range(16):
        label = (first_candidate + offset) % 16
        if self.transaction_results[label] is not None:
            continue

        pending = asyncio.get_running_loop().create_future()
        self.transaction_results[label] = pending
        self.transaction_count += 1
        return (label, pending)

    assert False  # Should never reach this
def on_discover_command(self, _command):
    '''
    Handle a Discover command.

    Returns:
        A Discover_Response with one EndPointInfo per local endpoint, in
        registration order.
    '''
    infos = []
    for local_endpoint in self.local_endpoints:
        info = EndPointInfo(
            local_endpoint.seid, 0, local_endpoint.media_type, local_endpoint.tsep
        )
        infos.append(info)

    return Discover_Response(infos)
def on_start_command(self, command):
    '''
    Handle a Start command covering one or more local endpoints.

    All SEIDs are validated before any stream is started.

    Returns:
        A Start_Reject naming the first SEID that is unknown or has no
        stream, the first non-None result returned by a stream's own
        `on_start_command`, or a Start_Response when every stream started.
    '''
    endpoints = []
    for seid in command.acp_seids:
        endpoint = self.get_local_endpoint_by_seid(seid)
        if endpoint is None:
            return Start_Reject(seid, AVDTP_BAD_ACP_SEID_ERROR)
        if endpoint.stream is None:
            # Start_Reject takes the offending SEID as well as the error code
            return Start_Reject(seid, AVDTP_BAD_STATE_ERROR)
        endpoints.append(endpoint)

    # Start all streams
    # TODO: deal with partial failures
    for endpoint in endpoints:
        result = endpoint.stream.on_start_command()
        if result is not None:
            return result

    return Start_Response()
def on_close_command(self, command):
    '''
    Handle a Close command for one local endpoint.

    Returns:
        A Close_Reject when the SEID is unknown or the endpoint has no
        stream; otherwise whatever the stream's `on_close_command` returns,
        or a Close_Response when that result is falsy.
    '''
    target = self.get_local_endpoint_by_seid(command.acp_seid)
    if target is None:
        return Close_Reject(AVDTP_BAD_ACP_SEID_ERROR)

    stream = target.stream
    if stream is None:
        return Close_Reject(AVDTP_BAD_STATE_ERROR)

    outcome = stream.on_close_command()
    if outcome:
        return outcome
    return Close_Response()
def remove_server(self, connection: device.Connection) -> None:
    '''
    Forget the server registered for a connection.

    Does nothing when no server is registered under the connection's handle.
    '''
    self.servers.pop(connection.handle, None)
def send_media_packet(self, packet: MediaPacket) -> None:
    '''
    Send one media packet over the stream's RTP channel.

    The packet is serialized with `bytes()` and sent as a single PDU. The
    RTP channel must already be set.
    '''
    channel = self.rtp_channel
    assert channel
    serialized = bytes(packet)
    channel.send_pdu(serialized)
def on_get_configuration_command(self, configuration=None):
    '''
    Handle a Get Configuration command for this stream.

    Args:
        configuration: unused; accepted only so that existing callers that
            pass an argument keep working. The protocol layer calls this
            method without arguments.

    Returns:
        A Get_Configuration_Reject when the stream is not in the CONFIGURED,
        OPEN or STREAMING state; otherwise the result of the local
        endpoint's `on_get_configuration_command`.
    '''
    if self.state not in (
        AVDTP_CONFIGURED_STATE,
        AVDTP_OPEN_STATE,
        AVDTP_STREAMING_STATE,
    ):
        return Get_Configuration_Reject(AVDTP_BAD_STATE_ERROR)

    # The local endpoint reports its own stored configuration and takes no
    # argument.
    return self.local_endpoint.on_get_configuration_command()
def on_start_command(self):
    '''
    Handle a Start command for this stream.

    Returns:
        A Start_Reject (carrying the local endpoint's SEID) when the stream
        is not OPEN or has no RTP channel yet, the local endpoint's own
        non-None result if it declines, or None on success, in which case
        the stream moves to the STREAMING state.
    '''
    if self.state != AVDTP_OPEN_STATE:
        # Reject with the message type that matches the command being
        # answered (Start), identifying the stream by its local SEID.
        return Start_Reject(self.local_endpoint.seid, AVDTP_BAD_STATE_ERROR)

    # Check that we have an RTP channel
    if self.rtp_channel is None:
        logger.warning('received start command before RTP channel establishment')
        return Start_Reject(self.local_endpoint.seid, AVDTP_BAD_STATE_ERROR)

    result = self.local_endpoint.on_start_command()
    if result is not None:
        return result

    self.change_state(AVDTP_STREAMING_STATE)
    return None
def __str__(self) -> str:
    '''
    Describe the stream as `Stream(<local seid> -> <remote seid> <state>)`.
    '''
    local_seid = self.local_endpoint.seid
    remote_seid = self.remote_endpoint.seid
    state = self.state_name(self.state)
    return f'Stream({local_seid} -> {remote_seid} {state})'
self.media_type = media_type 2044 self.tsep = tsep 2045 self.in_use = in_use 2046 self.capabilities = capabilities 2047 2048 def __str__(self) -> str: 2049 media_type = f'{name_or_number(AVDTP_MEDIA_TYPE_NAMES, self.media_type)}' 2050 tsep = f'{name_or_number(AVDTP_TSEP_NAMES, self.tsep)}' 2051 return '\n'.join( 2052 [ 2053 'SEP(', 2054 f' seid={self.seid}', 2055 f' media_type={media_type}', 2056 f' tsep={tsep}', 2057 f' in_use={self.in_use}', 2058 ' capabilities=[', 2059 '\n'.join([f' {x}' for x in self.capabilities]), 2060 ' ]', 2061 ')', 2062 ] 2063 ) 2064 2065 2066# ----------------------------------------------------------------------------- 2067class StreamEndPointProxy: 2068 def __init__(self, protocol: Protocol, seid: int) -> None: 2069 self.seid = seid 2070 self.protocol = protocol 2071 2072 async def set_configuration( 2073 self, int_seid: int, configuration: Iterable[ServiceCapabilities] 2074 ) -> Set_Configuration_Response: 2075 return await self.protocol.set_configuration(self.seid, int_seid, configuration) 2076 2077 async def open(self) -> Open_Response: 2078 return await self.protocol.open(self.seid) 2079 2080 async def start(self) -> Start_Response: 2081 return await self.protocol.start([self.seid]) 2082 2083 async def stop(self) -> Suspend_Response: 2084 return await self.protocol.suspend([self.seid]) 2085 2086 async def close(self) -> Close_Response: 2087 return await self.protocol.close(self.seid) 2088 2089 async def abort(self) -> Abort_Response: 2090 return await self.protocol.abort(self.seid) 2091 2092 2093# ----------------------------------------------------------------------------- 2094class DiscoveredStreamEndPoint(StreamEndPoint, StreamEndPointProxy): 2095 def __init__( 2096 self, 2097 protocol: Protocol, 2098 seid: int, 2099 media_type: int, 2100 tsep: int, 2101 in_use: int, 2102 capabilities: Iterable[ServiceCapabilities], 2103 ) -> None: 2104 StreamEndPoint.__init__(self, seid, media_type, tsep, in_use, capabilities) 2105 
StreamEndPointProxy.__init__(self, protocol, seid) 2106 2107 2108# ----------------------------------------------------------------------------- 2109class LocalStreamEndPoint(StreamEndPoint, EventEmitter): 2110 stream: Optional[Stream] 2111 2112 def __init__( 2113 self, 2114 protocol: Protocol, 2115 seid: int, 2116 media_type: int, 2117 tsep: int, 2118 capabilities: Iterable[ServiceCapabilities], 2119 configuration: Optional[Iterable[ServiceCapabilities]] = None, 2120 ): 2121 StreamEndPoint.__init__(self, seid, media_type, tsep, 0, capabilities) 2122 EventEmitter.__init__(self) 2123 self.protocol = protocol 2124 self.configuration = configuration if configuration is not None else [] 2125 self.stream = None 2126 2127 async def start(self): 2128 pass 2129 2130 async def stop(self): 2131 pass 2132 2133 async def close(self): 2134 pass 2135 2136 def on_reconfigure_command(self, command): 2137 pass 2138 2139 def on_set_configuration_command(self, configuration): 2140 logger.debug( 2141 '<<< received configuration: ' 2142 f'{",".join([str(capability) for capability in configuration])}' 2143 ) 2144 self.configuration = configuration 2145 self.emit('configuration') 2146 2147 def on_get_configuration_command(self): 2148 return Get_Configuration_Response(self.configuration) 2149 2150 def on_open_command(self): 2151 self.emit('open') 2152 2153 def on_start_command(self): 2154 self.emit('start') 2155 2156 def on_suspend_command(self): 2157 self.emit('suspend') 2158 2159 def on_close_command(self): 2160 self.emit('close') 2161 2162 def on_abort_command(self): 2163 self.emit('abort') 2164 2165 def on_rtp_channel_open(self): 2166 self.emit('rtp_channel_open') 2167 2168 def on_rtp_channel_close(self): 2169 self.emit('rtp_channel_close') 2170 2171 2172# ----------------------------------------------------------------------------- 2173class LocalSource(LocalStreamEndPoint): 2174 def __init__( 2175 self, 2176 protocol: Protocol, 2177 seid: int, 2178 codec_capabilities: 
MediaCodecCapabilities, 2179 packet_pump: MediaPacketPump, 2180 ) -> None: 2181 capabilities = [ 2182 ServiceCapabilities(AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY), 2183 codec_capabilities, 2184 ] 2185 super().__init__( 2186 protocol, 2187 seid, 2188 codec_capabilities.media_type, 2189 AVDTP_TSEP_SRC, 2190 capabilities, 2191 capabilities, 2192 ) 2193 self.packet_pump = packet_pump 2194 2195 async def start(self) -> None: 2196 if self.packet_pump and self.stream and self.stream.rtp_channel: 2197 return await self.packet_pump.start(self.stream.rtp_channel) 2198 2199 self.emit('start') 2200 2201 async def stop(self) -> None: 2202 if self.packet_pump: 2203 return await self.packet_pump.stop() 2204 2205 self.emit('stop') 2206 2207 def on_start_command(self): 2208 asyncio.create_task(self.start()) 2209 2210 def on_suspend_command(self): 2211 asyncio.create_task(self.stop()) 2212 2213 2214# ----------------------------------------------------------------------------- 2215class LocalSink(LocalStreamEndPoint): 2216 def __init__( 2217 self, protocol: Protocol, seid: int, codec_capabilities: MediaCodecCapabilities 2218 ) -> None: 2219 capabilities = [ 2220 ServiceCapabilities(AVDTP_MEDIA_TRANSPORT_SERVICE_CATEGORY), 2221 codec_capabilities, 2222 ] 2223 super().__init__( 2224 protocol, 2225 seid, 2226 codec_capabilities.media_type, 2227 AVDTP_TSEP_SNK, 2228 capabilities, 2229 ) 2230 2231 def on_rtp_channel_open(self): 2232 logger.debug(color('<<< RTP channel open', 'magenta')) 2233 self.stream.rtp_channel.sink = self.on_avdtp_packet 2234 super().on_rtp_channel_open() 2235 2236 def on_rtp_channel_close(self): 2237 logger.debug(color('<<< RTP channel close', 'magenta')) 2238 super().on_rtp_channel_close() 2239 2240 def on_avdtp_packet(self, packet): 2241 rtp_packet = MediaPacket.from_bytes(packet) 2242 logger.debug( 2243 f'{color("<<< RTP Packet:", "green")} ' 2244 f'{rtp_packet} {rtp_packet.payload[:16].hex()}' 2245 ) 2246 self.emit('rtp_packet', rtp_packet) 2247