1# Copyright 2021-2023 Google LLC 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); 4# you may not use this file except in compliance with the License. 5# You may obtain a copy of the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, 11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12# See the License for the specific language governing permissions and 13# limitations under the License. 14 15# ----------------------------------------------------------------------------- 16# Imports 17# ----------------------------------------------------------------------------- 18from __future__ import annotations 19import asyncio 20from dataclasses import dataclass 21import enum 22import logging 23import struct 24from typing import ( 25 AsyncIterator, 26 Awaitable, 27 Callable, 28 cast, 29 Dict, 30 Iterable, 31 List, 32 Optional, 33 Sequence, 34 SupportsBytes, 35 Tuple, 36 Type, 37 TypeVar, 38 Union, 39) 40 41import pyee 42 43from bumble.colors import color 44from bumble.device import Device, Connection 45from bumble.sdp import ( 46 SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, 47 SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, 48 SDP_PUBLIC_BROWSE_ROOT, 49 SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, 50 SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, 51 SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, 52 SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, 53 DataElement, 54 ServiceAttribute, 55) 56from bumble.utils import AsyncRunner, OpenIntEnum 57from bumble.core import ( 58 ProtocolError, 59 BT_L2CAP_PROTOCOL_ID, 60 BT_AVCTP_PROTOCOL_ID, 61 BT_AV_REMOTE_CONTROL_SERVICE, 62 BT_AV_REMOTE_CONTROL_CONTROLLER_SERVICE, 63 BT_AV_REMOTE_CONTROL_TARGET_SERVICE, 64) 65from bumble import l2cap 66from bumble import avc 67from bumble import avctp 68from bumble import utils 69 70 71# ----------------------------------------------------------------------------- 72# Logging 73# ----------------------------------------------------------------------------- 74logger = logging.getLogger(__name__) 75 76 77# ----------------------------------------------------------------------------- 78# Constants 79# ----------------------------------------------------------------------------- 80AVRCP_PID = 0x110E 81AVRCP_BLUETOOTH_SIG_COMPANY_ID = 0x001958 82 83 84# ----------------------------------------------------------------------------- 85def make_controller_service_sdp_records( 86 service_record_handle: int, 87 avctp_version: Tuple[int, int] = (1, 4), 88 avrcp_version: Tuple[int, int] = (1, 6), 89 supported_features: int = 1, 90) -> List[ServiceAttribute]: 91 # TODO: support a way to compute the supported features from a feature list 92 avctp_version_int = avctp_version[0] << 8 | avctp_version[1] 93 avrcp_version_int = avrcp_version[0] << 8 | avrcp_version[1] 94 95 return [ 96 ServiceAttribute( 97 SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, 98 DataElement.unsigned_integer_32(service_record_handle), 99 ), 100 ServiceAttribute( 101 SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, 102 DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]), 103 ), 104 ServiceAttribute( 105 SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, 106 DataElement.sequence( 107 [ 108 DataElement.uuid(BT_AV_REMOTE_CONTROL_SERVICE), 109 DataElement.uuid(BT_AV_REMOTE_CONTROL_CONTROLLER_SERVICE), 110 ] 111 ), 112 ), 113 ServiceAttribute( 114 SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, 115 DataElement.sequence( 116 [ 117 DataElement.sequence( 118 [ 119 DataElement.uuid(BT_L2CAP_PROTOCOL_ID), 120 DataElement.unsigned_integer_16(avctp.AVCTP_PSM), 121 ] 122 ), 123 DataElement.sequence( 124 [ 125 DataElement.uuid(BT_AVCTP_PROTOCOL_ID), 126 DataElement.unsigned_integer_16(avctp_version_int), 127 ] 128 ), 129 ] 130 ), 131 ), 132 ServiceAttribute( 133 SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, 134 DataElement.sequence( 135 [ 136 DataElement.sequence( 137 [ 138 DataElement.uuid(BT_AV_REMOTE_CONTROL_SERVICE), 139 DataElement.unsigned_integer_16(avrcp_version_int), 140 ] 141 ), 142 ] 143 ), 144 ), 145 ServiceAttribute( 146 SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, 147 DataElement.unsigned_integer_16(supported_features), 148 ), 149 ] 150 151 152# ----------------------------------------------------------------------------- 153def make_target_service_sdp_records( 154 service_record_handle: int, 155 avctp_version: Tuple[int, int] = (1, 4), 156 avrcp_version: Tuple[int, int] = (1, 6), 157 supported_features: int = 0x23, 158) -> List[ServiceAttribute]: 159 # TODO: support a way to compute the supported features from a feature list 160 avctp_version_int = avctp_version[0] << 8 | avctp_version[1] 161 avrcp_version_int = avrcp_version[0] << 8 | avrcp_version[1] 162 163 return [ 164 ServiceAttribute( 165 SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID, 166 DataElement.unsigned_integer_32(service_record_handle), 167 ), 168 ServiceAttribute( 169 SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID, 170 DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]), 171 ), 172 ServiceAttribute( 173 SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID, 174 DataElement.sequence( 175 [ 176 DataElement.uuid(BT_AV_REMOTE_CONTROL_TARGET_SERVICE), 177 ] 178 ), 179 ), 180 ServiceAttribute( 181 SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID, 182 DataElement.sequence( 183 [ 184 DataElement.sequence( 185 [ 186 DataElement.uuid(BT_L2CAP_PROTOCOL_ID), 187 DataElement.unsigned_integer_16(avctp.AVCTP_PSM), 188 ] 189 ), 190 DataElement.sequence( 191 [ 192 DataElement.uuid(BT_AVCTP_PROTOCOL_ID), 193 DataElement.unsigned_integer_16(avctp_version_int), 194 ] 195 ), 196 ] 197 ), 198 ), 199 ServiceAttribute( 200 SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID, 201 DataElement.sequence( 202 [ 203 DataElement.sequence( 204 [ 205 DataElement.uuid(BT_AV_REMOTE_CONTROL_SERVICE), 206 DataElement.unsigned_integer_16(avrcp_version_int), 207 ] 208 ), 209 ] 210 ), 211 ), 212 ServiceAttribute( 213 SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID, 214 DataElement.unsigned_integer_16(supported_features), 215 ), 216 ] 217 218 219# ----------------------------------------------------------------------------- 220def _decode_attribute_value(value: bytes, character_set: CharacterSetId) -> str: 221 try: 222 if character_set == CharacterSetId.UTF_8: 223 return value.decode("utf-8") 224 return value.decode("ascii") 225 except UnicodeDecodeError: 226 logger.warning(f"cannot decode string with bytes: {value.hex()}") 227 return "" 228 229 230# ----------------------------------------------------------------------------- 231class PduAssembler: 232 """ 233 PDU Assembler to support fragmented PDUs are defined in: 234 Audio/Video Remote Control / Profile Specification 235 6.3.1 AVRCP specific AV//C commands 236 """ 237 238 pdu_id: Optional[Protocol.PduId] 239 payload: bytes 240 241 def __init__(self, callback: Callable[[Protocol.PduId, bytes], None]) -> None: 242 self.callback = callback 243 self.reset() 244 245 def reset(self) -> None: 246 self.pdu_id = None 247 self.parameter = b'' 248 249 def on_pdu(self, pdu: bytes) -> None: 250 pdu_id = Protocol.PduId(pdu[0]) 251 packet_type = Protocol.PacketType(pdu[1] & 3) 252 parameter_length = struct.unpack_from('>H', pdu, 2)[0] 253 parameter = pdu[4 : 4 + parameter_length] 254 if len(parameter) != parameter_length: 255 logger.warning("parameter length exceeds pdu size") 256 self.reset() 257 return 258 259 if packet_type in (Protocol.PacketType.SINGLE, Protocol.PacketType.START): 260 if self.pdu_id is not None: 261 # We are already in a PDU 262 logger.warning("received START or SINGLE fragment while in pdu") 263 self.reset() 264 265 if packet_type in (Protocol.PacketType.CONTINUE, Protocol.PacketType.END): 266 if pdu_id != self.pdu_id: 267 logger.warning("PID does not match") 268 self.reset() 269 return 270 else: 271 self.pdu_id = pdu_id 272 273 self.parameter += parameter 274 275 if packet_type in (Protocol.PacketType.SINGLE, Protocol.PacketType.END): 276 self.on_pdu_complete() 277 278 def on_pdu_complete(self) -> None: 279 assert self.pdu_id is not None 280 try: 281 self.callback(self.pdu_id, self.parameter) 282 except Exception as error: 283 logger.exception(color(f'!!! exception in callback: {error}', 'red')) 284 285 self.reset() 286 287 288# ----------------------------------------------------------------------------- 289@dataclass 290class Command: 291 pdu_id: Protocol.PduId 292 parameter: bytes 293 294 def to_string(self, properties: Dict[str, str]) -> str: 295 properties_str = ",".join( 296 [f"{name}={value}" for name, value in properties.items()] 297 ) 298 return f"Command[{self.pdu_id.name}]({properties_str})" 299 300 def __str__(self) -> str: 301 return self.to_string({"parameters": self.parameter.hex()}) 302 303 def __repr__(self) -> str: 304 return str(self) 305 306 307# ----------------------------------------------------------------------------- 308class GetCapabilitiesCommand(Command): 309 class CapabilityId(OpenIntEnum): 310 COMPANY_ID = 0x02 311 EVENTS_SUPPORTED = 0x03 312 313 capability_id: CapabilityId 314 315 @classmethod 316 def from_bytes(cls, pdu: bytes) -> GetCapabilitiesCommand: 317 return cls(cls.CapabilityId(pdu[0])) 318 319 def __init__(self, capability_id: CapabilityId) -> None: 320 super().__init__(Protocol.PduId.GET_CAPABILITIES, bytes([capability_id])) 321 self.capability_id = capability_id 322 323 def __str__(self) -> str: 324 return self.to_string({"capability_id": self.capability_id.name}) 325 326 327# ----------------------------------------------------------------------------- 328class GetPlayStatusCommand(Command): 329 @classmethod 330 def from_bytes(cls, _: bytes) -> GetPlayStatusCommand: 331 return cls() 332 333 def __init__(self) -> None: 334 super().__init__(Protocol.PduId.GET_PLAY_STATUS, b'') 335 336 337# ----------------------------------------------------------------------------- 338class GetElementAttributesCommand(Command): 339 identifier: int 340 attribute_ids: List[MediaAttributeId] 341 342 @classmethod 343 def from_bytes(cls, pdu: bytes) -> GetElementAttributesCommand: 344 identifier = struct.unpack_from(">Q", pdu)[0] 345 num_attributes = pdu[8] 346 attribute_ids = [MediaAttributeId(pdu[9 + i]) for i in range(num_attributes)] 347 return cls(identifier, attribute_ids) 348 349 def __init__( 350 self, identifier: int, attribute_ids: Sequence[MediaAttributeId] 351 ) -> None: 352 parameter = struct.pack(">QB", identifier, len(attribute_ids)) + b''.join( 353 [struct.pack(">I", int(attribute_id)) for attribute_id in attribute_ids] 354 ) 355 super().__init__(Protocol.PduId.GET_ELEMENT_ATTRIBUTES, parameter) 356 self.identifier = identifier 357 self.attribute_ids = list(attribute_ids) 358 359 360# ----------------------------------------------------------------------------- 361class SetAbsoluteVolumeCommand(Command): 362 MAXIMUM_VOLUME = 0x7F 363 364 volume: int 365 366 @classmethod 367 def from_bytes(cls, pdu: bytes) -> SetAbsoluteVolumeCommand: 368 return cls(pdu[0]) 369 370 def __init__(self, volume: int) -> None: 371 super().__init__(Protocol.PduId.SET_ABSOLUTE_VOLUME, bytes([volume])) 372 self.volume = volume 373 374 def __str__(self) -> str: 375 return self.to_string({"volume": str(self.volume)}) 376 377 378# ----------------------------------------------------------------------------- 379class RegisterNotificationCommand(Command): 380 event_id: EventId 381 playback_interval: int 382 383 @classmethod 384 def from_bytes(cls, pdu: bytes) -> RegisterNotificationCommand: 385 event_id = EventId(pdu[0]) 386 playback_interval = struct.unpack_from(">I", pdu, 1)[0] 387 return cls(event_id, playback_interval) 388 389 def __init__(self, event_id: EventId, playback_interval: int) -> None: 390 super().__init__( 391 Protocol.PduId.REGISTER_NOTIFICATION, 392 struct.pack(">BI", int(event_id), playback_interval), 393 ) 394 self.event_id = event_id 395 self.playback_interval = playback_interval 396 397 def __str__(self) -> str: 398 return self.to_string( 399 { 400 "event_id": self.event_id.name, 401 "playback_interval": str(self.playback_interval), 402 } 403 ) 404 405 406# ----------------------------------------------------------------------------- 407@dataclass 408class Response: 409 pdu_id: Protocol.PduId 410 parameter: bytes 411 412 def to_string(self, properties: Dict[str, str]) -> str: 413 properties_str = ",".join( 414 [f"{name}={value}" for name, value in properties.items()] 415 ) 416 return f"Response[{self.pdu_id.name}]({properties_str})" 417 418 def __str__(self) -> str: 419 return self.to_string({"parameter": self.parameter.hex()}) 420 421 def __repr__(self) -> str: 422 return str(self) 423 424 425# ----------------------------------------------------------------------------- 426class RejectedResponse(Response): 427 status_code: Protocol.StatusCode 428 429 @classmethod 430 def from_bytes(cls, pdu_id: Protocol.PduId, pdu: bytes) -> RejectedResponse: 431 return cls(pdu_id, Protocol.StatusCode(pdu[0])) 432 433 def __init__( 434 self, pdu_id: Protocol.PduId, status_code: Protocol.StatusCode 435 ) -> None: 436 super().__init__(pdu_id, bytes([int(status_code)])) 437 self.status_code = status_code 438 439 def __str__(self) -> str: 440 return self.to_string( 441 { 442 "status_code": self.status_code.name, 443 } 444 ) 445 446 447# ----------------------------------------------------------------------------- 448class NotImplementedResponse(Response): 449 @classmethod 450 def from_bytes(cls, pdu_id: Protocol.PduId, pdu: bytes) -> NotImplementedResponse: 451 return cls(pdu_id, pdu[1:]) 452 453 454# ----------------------------------------------------------------------------- 455class GetCapabilitiesResponse(Response): 456 capability_id: GetCapabilitiesCommand.CapabilityId 457 capabilities: List[Union[SupportsBytes, bytes]] 458 459 @classmethod 460 def from_bytes(cls, pdu: bytes) -> GetCapabilitiesResponse: 461 if len(pdu) < 2: 462 # Possibly a reject response. 463 return cls(GetCapabilitiesCommand.CapabilityId(0), []) 464 465 # Assume that the payloads all follow the same pattern: 466 # <CapabilityID><CapabilityCount><Capability*> 467 capability_id = GetCapabilitiesCommand.CapabilityId(pdu[0]) 468 capability_count = pdu[1] 469 470 capabilities: List[Union[SupportsBytes, bytes]] 471 if capability_id == GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED: 472 capabilities = [EventId(pdu[2 + x]) for x in range(capability_count)] 473 else: 474 capability_size = (len(pdu) - 2) // capability_count 475 capabilities = [ 476 pdu[x : x + capability_size] 477 for x in range(2, len(pdu), capability_size) 478 ] 479 480 return cls(capability_id, capabilities) 481 482 def __init__( 483 self, 484 capability_id: GetCapabilitiesCommand.CapabilityId, 485 capabilities: Sequence[Union[SupportsBytes, bytes]], 486 ) -> None: 487 super().__init__( 488 Protocol.PduId.GET_CAPABILITIES, 489 bytes([capability_id, len(capabilities)]) 490 + b''.join(bytes(capability) for capability in capabilities), 491 ) 492 self.capability_id = capability_id 493 self.capabilities = list(capabilities) 494 495 def __str__(self) -> str: 496 return self.to_string( 497 { 498 "capability_id": self.capability_id.name, 499 "capabilities": str(self.capabilities), 500 } 501 ) 502 503 504# ----------------------------------------------------------------------------- 505class GetPlayStatusResponse(Response): 506 song_length: int 507 song_position: int 508 play_status: PlayStatus 509 510 @classmethod 511 def from_bytes(cls, pdu: bytes) -> GetPlayStatusResponse: 512 (song_length, song_position) = struct.unpack_from(">II", pdu, 0) 513 play_status = PlayStatus(pdu[8]) 514 515 return cls(song_length, song_position, play_status) 516 517 def __init__( 518 self, 519 song_length: int, 520 song_position: int, 521 play_status: PlayStatus, 522 ) -> None: 523 super().__init__( 524 Protocol.PduId.GET_PLAY_STATUS, 525 struct.pack(">IIB", song_length, song_position, int(play_status)), 526 ) 527 self.song_length = song_length 528 self.song_position = song_position 529 self.play_status = play_status 530 531 def __str__(self) -> str: 532 return self.to_string( 533 { 534 "song_length": str(self.song_length), 535 "song_position": str(self.song_position), 536 "play_status": self.play_status.name, 537 } 538 ) 539 540 541# ----------------------------------------------------------------------------- 542class GetElementAttributesResponse(Response): 543 attributes: List[MediaAttribute] 544 545 @classmethod 546 def from_bytes(cls, pdu: bytes) -> GetElementAttributesResponse: 547 num_attributes = pdu[0] 548 offset = 1 549 attributes: List[MediaAttribute] = [] 550 for _ in range(num_attributes): 551 ( 552 attribute_id_int, 553 character_set_id_int, 554 attribute_value_length, 555 ) = struct.unpack_from(">IHH", pdu, offset) 556 attribute_value_bytes = pdu[ 557 offset + 8 : offset + 8 + attribute_value_length 558 ] 559 attribute_id = MediaAttributeId(attribute_id_int) 560 character_set_id = CharacterSetId(character_set_id_int) 561 attribute_value = _decode_attribute_value( 562 attribute_value_bytes, character_set_id 563 ) 564 attributes.append( 565 MediaAttribute(attribute_id, character_set_id, attribute_value) 566 ) 567 offset += 8 + attribute_value_length 568 569 return cls(attributes) 570 571 def __init__(self, attributes: Sequence[MediaAttribute]) -> None: 572 parameter = bytes([len(attributes)]) 573 for attribute in attributes: 574 attribute_value_bytes = attribute.attribute_value.encode("utf-8") 575 parameter += ( 576 struct.pack( 577 ">IHH", 578 int(attribute.attribute_id), 579 int(CharacterSetId.UTF_8), 580 len(attribute_value_bytes), 581 ) 582 + attribute_value_bytes 583 ) 584 super().__init__( 585 Protocol.PduId.GET_ELEMENT_ATTRIBUTES, 586 parameter, 587 ) 588 self.attributes = list(attributes) 589 590 def __str__(self) -> str: 591 attribute_strs = [str(attribute) for attribute in self.attributes] 592 return self.to_string( 593 { 594 "attributes": f"[{', '.join(attribute_strs)}]", 595 } 596 ) 597 598 599# ----------------------------------------------------------------------------- 600class SetAbsoluteVolumeResponse(Response): 601 volume: int 602 603 @classmethod 604 def from_bytes(cls, pdu: bytes) -> SetAbsoluteVolumeResponse: 605 return cls(pdu[0]) 606 607 def __init__(self, volume: int) -> None: 608 super().__init__(Protocol.PduId.SET_ABSOLUTE_VOLUME, bytes([volume])) 609 self.volume = volume 610 611 def __str__(self) -> str: 612 return self.to_string({"volume": str(self.volume)}) 613 614 615# ----------------------------------------------------------------------------- 616class RegisterNotificationResponse(Response): 617 event: Event 618 619 @classmethod 620 def from_bytes(cls, pdu: bytes) -> RegisterNotificationResponse: 621 return cls(Event.from_bytes(pdu)) 622 623 def __init__(self, event: Event) -> None: 624 super().__init__( 625 Protocol.PduId.REGISTER_NOTIFICATION, 626 bytes(event), 627 ) 628 self.event = event 629 630 def __str__(self) -> str: 631 return self.to_string( 632 { 633 "event": str(self.event), 634 } 635 ) 636 637 638# ----------------------------------------------------------------------------- 639class EventId(OpenIntEnum): 640 PLAYBACK_STATUS_CHANGED = 0x01 641 TRACK_CHANGED = 0x02 642 TRACK_REACHED_END = 0x03 643 TRACK_REACHED_START = 0x04 644 PLAYBACK_POS_CHANGED = 0x05 645 BATT_STATUS_CHANGED = 0x06 646 SYSTEM_STATUS_CHANGED = 0x07 647 PLAYER_APPLICATION_SETTING_CHANGED = 0x08 648 NOW_PLAYING_CONTENT_CHANGED = 0x09 649 AVAILABLE_PLAYERS_CHANGED = 0x0A 650 ADDRESSED_PLAYER_CHANGED = 0x0B 651 UIDS_CHANGED = 0x0C 652 VOLUME_CHANGED = 0x0D 653 654 def __bytes__(self) -> bytes: 655 return bytes([int(self)]) 656 657 658# ----------------------------------------------------------------------------- 659class CharacterSetId(OpenIntEnum): 660 UTF_8 = 0x06 661 662 663# ----------------------------------------------------------------------------- 664class MediaAttributeId(OpenIntEnum): 665 TITLE = 0x01 666 ARTIST_NAME = 0x02 667 ALBUM_NAME = 0x03 668 TRACK_NUMBER = 0x04 669 TOTAL_NUMBER_OF_TRACKS = 0x05 670 GENRE = 0x06 671 PLAYING_TIME = 0x07 672 DEFAULT_COVER_ART = 0x08 673 674 675# ----------------------------------------------------------------------------- 676@dataclass 677class MediaAttribute: 678 attribute_id: MediaAttributeId 679 character_set_id: CharacterSetId 680 attribute_value: str 681 682 683# ----------------------------------------------------------------------------- 684class PlayStatus(OpenIntEnum): 685 STOPPED = 0x00 686 PLAYING = 0x01 687 PAUSED = 0x02 688 FWD_SEEK = 0x03 689 REV_SEEK = 0x04 690 ERROR = 0xFF 691 692 693# ----------------------------------------------------------------------------- 694@dataclass 695class SongAndPlayStatus: 696 song_length: int 697 song_position: int 698 play_status: PlayStatus 699 700 701# ----------------------------------------------------------------------------- 702class ApplicationSetting: 703 class AttributeId(OpenIntEnum): 704 EQUALIZER_ON_OFF = 0x01 705 REPEAT_MODE = 0x02 706 SHUFFLE_ON_OFF = 0x03 707 SCAN_ON_OFF = 0x04 708 709 class EqualizerOnOffStatus(OpenIntEnum): 710 OFF = 0x01 711 ON = 0x02 712 713 class RepeatModeStatus(OpenIntEnum): 714 OFF = 0x01 715 SINGLE_TRACK_REPEAT = 0x02 716 ALL_TRACK_REPEAT = 0x03 717 GROUP_REPEAT = 0x04 718 719 class ShuffleOnOffStatus(OpenIntEnum): 720 OFF = 0x01 721 ALL_TRACKS_SHUFFLE = 0x02 722 GROUP_SHUFFLE = 0x03 723 724 class ScanOnOffStatus(OpenIntEnum): 725 OFF = 0x01 726 ALL_TRACKS_SCAN = 0x02 727 GROUP_SCAN = 0x03 728 729 class GenericValue(OpenIntEnum): 730 pass 731 732 733# ----------------------------------------------------------------------------- 734@dataclass 735class Event: 736 event_id: EventId 737 738 @classmethod 739 def from_bytes(cls, pdu: bytes) -> Event: 740 event_id = EventId(pdu[0]) 741 subclass = EVENT_SUBCLASSES.get(event_id, GenericEvent) 742 return subclass.from_bytes(pdu) 743 744 def __bytes__(self) -> bytes: 745 return bytes([self.event_id]) 746 747 748# ----------------------------------------------------------------------------- 749@dataclass 750class GenericEvent(Event): 751 data: bytes 752 753 @classmethod 754 def from_bytes(cls, pdu: bytes) -> GenericEvent: 755 return cls(event_id=EventId(pdu[0]), data=pdu[1:]) 756 757 def __bytes__(self) -> bytes: 758 return bytes([self.event_id]) + self.data 759 760 761# ----------------------------------------------------------------------------- 762@dataclass 763class PlaybackStatusChangedEvent(Event): 764 play_status: PlayStatus 765 766 @classmethod 767 def from_bytes(cls, pdu: bytes) -> PlaybackStatusChangedEvent: 768 return cls(play_status=PlayStatus(pdu[1])) 769 770 def __init__(self, play_status: PlayStatus) -> None: 771 super().__init__(EventId.PLAYBACK_STATUS_CHANGED) 772 self.play_status = play_status 773 774 def __bytes__(self) -> bytes: 775 return bytes([self.event_id]) + bytes([self.play_status]) 776 777 778# ----------------------------------------------------------------------------- 779@dataclass 780class PlaybackPositionChangedEvent(Event): 781 playback_position: int 782 783 @classmethod 784 def from_bytes(cls, pdu: bytes) -> PlaybackPositionChangedEvent: 785 return cls(playback_position=struct.unpack_from(">I", pdu, 1)[0]) 786 787 def __init__(self, playback_position: int) -> None: 788 super().__init__(EventId.PLAYBACK_POS_CHANGED) 789 self.playback_position = playback_position 790 791 def __bytes__(self) -> bytes: 792 return bytes([self.event_id]) + struct.pack(">I", self.playback_position) 793 794 795# ----------------------------------------------------------------------------- 796@dataclass 797class TrackChangedEvent(Event): 798 identifier: bytes 799 800 @classmethod 801 def from_bytes(cls, pdu: bytes) -> TrackChangedEvent: 802 return cls(identifier=pdu[1:]) 803 804 def __init__(self, identifier: bytes) -> None: 805 super().__init__(EventId.TRACK_CHANGED) 806 self.identifier = identifier 807 808 def __bytes__(self) -> bytes: 809 return bytes([self.event_id]) + self.identifier 810 811 812# ----------------------------------------------------------------------------- 813@dataclass 814class PlayerApplicationSettingChangedEvent(Event): 815 @dataclass 816 class Setting: 817 attribute_id: ApplicationSetting.AttributeId 818 value_id: OpenIntEnum 819 820 player_application_settings: List[Setting] 821 822 @classmethod 823 def from_bytes(cls, pdu: bytes) -> PlayerApplicationSettingChangedEvent: 824 def setting(attribute_id_int: int, value_id_int: int): 825 attribute_id = ApplicationSetting.AttributeId(attribute_id_int) 826 value_id: OpenIntEnum 827 if attribute_id == ApplicationSetting.AttributeId.EQUALIZER_ON_OFF: 828 value_id = ApplicationSetting.EqualizerOnOffStatus(value_id_int) 829 elif attribute_id == ApplicationSetting.AttributeId.REPEAT_MODE: 830 value_id = ApplicationSetting.RepeatModeStatus(value_id_int) 831 elif attribute_id == ApplicationSetting.AttributeId.SHUFFLE_ON_OFF: 832 value_id = ApplicationSetting.ShuffleOnOffStatus(value_id_int) 833 elif attribute_id == ApplicationSetting.AttributeId.SCAN_ON_OFF: 834 value_id = ApplicationSetting.ScanOnOffStatus(value_id_int) 835 else: 836 value_id = ApplicationSetting.GenericValue(value_id_int) 837 838 return cls.Setting(attribute_id, value_id) 839 840 settings = [ 841 setting(pdu[2 + (i * 2)], pdu[2 + (i * 2) + 1]) for i in range(pdu[1]) 842 ] 843 return cls(player_application_settings=settings) 844 845 def __init__(self, player_application_settings: Sequence[Setting]) -> None: 846 super().__init__(EventId.PLAYER_APPLICATION_SETTING_CHANGED) 847 self.player_application_settings = list(player_application_settings) 848 849 def __bytes__(self) -> bytes: 850 return ( 851 bytes([self.event_id]) 852 + bytes([len(self.player_application_settings)]) 853 + b''.join( 854 [ 855 bytes([setting.attribute_id, setting.value_id]) 856 for setting in self.player_application_settings 857 ] 858 ) 859 ) 860 861 862# ----------------------------------------------------------------------------- 863@dataclass 864class NowPlayingContentChangedEvent(Event): 865 @classmethod 866 def from_bytes(cls, pdu: bytes) -> NowPlayingContentChangedEvent: 867 return cls() 868 869 def __init__(self) -> None: 870 super().__init__(EventId.NOW_PLAYING_CONTENT_CHANGED) 871 872 873# ----------------------------------------------------------------------------- 874@dataclass 875class AvailablePlayersChangedEvent(Event): 876 @classmethod 877 def from_bytes(cls, pdu: bytes) -> AvailablePlayersChangedEvent: 878 return cls() 879 880 def __init__(self) -> None: 881 super().__init__(EventId.AVAILABLE_PLAYERS_CHANGED) 882 883 884# ----------------------------------------------------------------------------- 885@dataclass 886class AddressedPlayerChangedEvent(Event): 887 @dataclass 888 class Player: 889 player_id: int 890 uid_counter: int 891 892 @classmethod 893 def from_bytes(cls, pdu: bytes) -> AddressedPlayerChangedEvent: 894 player_id, uid_counter = struct.unpack_from("<HH", pdu, 1) 895 return cls(cls.Player(player_id, uid_counter)) 896 897 def __init__(self, player: Player) -> None: 898 super().__init__(EventId.ADDRESSED_PLAYER_CHANGED) 899 self.player = player 900 901 def __bytes__(self) -> bytes: 902 return bytes([self.event_id]) + struct.pack( 903 ">HH", self.player.player_id, self.player.uid_counter 904 ) 905 906 907# ----------------------------------------------------------------------------- 908@dataclass 909class UidsChangedEvent(Event): 910 uid_counter: int 911 912 @classmethod 913 def from_bytes(cls, pdu: bytes) -> UidsChangedEvent: 914 return cls(uid_counter=struct.unpack_from(">H", pdu, 1)[0]) 915 916 def __init__(self, uid_counter: int) -> None: 917 super().__init__(EventId.UIDS_CHANGED) 918 self.uid_counter = uid_counter 919 920 def __bytes__(self) -> bytes: 921 return bytes([self.event_id]) + struct.pack(">H", self.uid_counter) 922 923 924# ----------------------------------------------------------------------------- 925@dataclass 926class VolumeChangedEvent(Event): 927 volume: int 928 929 @classmethod 930 def from_bytes(cls, pdu: bytes) -> VolumeChangedEvent: 931 return cls(volume=pdu[1]) 932 933 def __init__(self, volume: int) -> None: 934 super().__init__(EventId.VOLUME_CHANGED) 935 self.volume = volume 936 937 def __bytes__(self) -> bytes: 938 return bytes([self.event_id]) + bytes([self.volume]) 939 940 941# ----------------------------------------------------------------------------- 942EVENT_SUBCLASSES: Dict[EventId, Type[Event]] = { 943 EventId.PLAYBACK_STATUS_CHANGED: PlaybackStatusChangedEvent, 944 EventId.PLAYBACK_POS_CHANGED: PlaybackPositionChangedEvent, 945 EventId.TRACK_CHANGED: TrackChangedEvent, 946 EventId.PLAYER_APPLICATION_SETTING_CHANGED: PlayerApplicationSettingChangedEvent, 947 EventId.NOW_PLAYING_CONTENT_CHANGED: NowPlayingContentChangedEvent, 948 EventId.AVAILABLE_PLAYERS_CHANGED: AvailablePlayersChangedEvent, 949 EventId.ADDRESSED_PLAYER_CHANGED: AddressedPlayerChangedEvent, 950 EventId.UIDS_CHANGED: UidsChangedEvent, 951 EventId.VOLUME_CHANGED: VolumeChangedEvent, 952} 953 954 955# ----------------------------------------------------------------------------- 956class Delegate: 957 """ 958 Base class for AVRCP delegates. 959 960 All the methods are async, even if they don't always need to be, so that 961 delegates that do need to wait for an async result may do so. 962 """ 963 964 class Error(Exception): 965 """The delegate method failed, with a specified status code.""" 966 967 def __init__(self, status_code: Protocol.StatusCode) -> None: 968 self.status_code = status_code 969 970 supported_events: List[EventId] 971 volume: int 972 973 def __init__(self, supported_events: Iterable[EventId] = ()) -> None: 974 self.supported_events = list(supported_events) 975 self.volume = 0 976 977 async def get_supported_events(self) -> List[EventId]: 978 return self.supported_events 979 980 async def set_absolute_volume(self, volume: int) -> None: 981 """ 982 Set the absolute volume. 983 984 Returns: the effective volume that was set. 985 """ 986 logger.debug(f"@@@ set_absolute_volume: volume={volume}") 987 self.volume = volume 988 989 async def get_absolute_volume(self) -> int: 990 return self.volume 991 992 # TODO add other delegate methods 993 994 995# ----------------------------------------------------------------------------- 996class Protocol(pyee.EventEmitter): 997 """AVRCP Controller and Target protocol.""" 998 999 class PacketType(enum.IntEnum): 1000 SINGLE = 0b00 1001 START = 0b01 1002 CONTINUE = 0b10 1003 END = 0b11 1004 1005 class PduId(OpenIntEnum): 1006 GET_CAPABILITIES = 0x10 1007 LIST_PLAYER_APPLICATION_SETTING_ATTRIBUTES = 0x11 1008 LIST_PLAYER_APPLICATION_SETTING_VALUES = 0x12 1009 GET_CURRENT_PLAYER_APPLICATION_SETTING_VALUE = 0x13 1010 SET_PLAYER_APPLICATION_SETTING_VALUE = 0x14 1011 GET_PLAYER_APPLICATION_SETTING_ATTRIBUTE_TEXT = 0x15 1012 GET_PLAYER_APPLICATION_SETTING_VALUE_TEXT = 0x16 1013 INFORM_DISPLAYABLE_CHARACTER_SET = 0x17 1014 INFORM_BATTERY_STATUS_OF_CT = 0x18 1015 GET_ELEMENT_ATTRIBUTES = 0x20 1016 GET_PLAY_STATUS = 0x30 1017 REGISTER_NOTIFICATION = 0x31 1018 REQUEST_CONTINUING_RESPONSE = 0x40 1019 ABORT_CONTINUING_RESPONSE = 0x41 1020 SET_ABSOLUTE_VOLUME = 0x50 1021 SET_ADDRESSED_PLAYER = 0x60 1022 SET_BROWSED_PLAYER = 0x70 1023 GET_FOLDER_ITEMS = 0x71 1024 GET_TOTAL_NUMBER_OF_ITEMS = 0x75 1025 1026 class StatusCode(OpenIntEnum): 1027 INVALID_COMMAND = 0x00 1028 INVALID_PARAMETER = 0x01 1029 PARAMETER_CONTENT_ERROR = 0x02 1030 INTERNAL_ERROR = 0x03 1031 OPERATION_COMPLETED = 0x04 1032 UID_CHANGED = 0x05 1033 INVALID_DIRECTION = 0x07 1034 NOT_A_DIRECTORY = 0x08 1035 DOES_NOT_EXIST = 0x09 1036 INVALID_SCOPE = 0x0A 1037 RANGE_OUT_OF_BOUNDS = 0x0B 1038 FOLDER_ITEM_IS_NOT_PLAYABLE = 0x0C 1039 MEDIA_IN_USE = 0x0D 1040 NOW_PLAYING_LIST_FULL = 0x0E 1041 SEARCH_NOT_SUPPORTED = 0x0F 1042 SEARCH_IN_PROGRESS = 0x10 1043 INVALID_PLAYER_ID = 0x11 1044 PLAYER_NOT_BROWSABLE = 0x12 1045 PLAYER_NOT_ADDRESSED = 0x13 1046 NO_VALID_SEARCH_RESULTS = 0x14 1047 NO_AVAILABLE_PLAYERS = 0x15 1048 ADDRESSED_PLAYER_CHANGED = 0x16 1049 1050 class InvalidPidError(Exception): 1051 """A response frame with ipid==1 was received.""" 1052 1053 class NotPendingError(Exception): 1054 """There is no pending command for a transaction label.""" 1055 1056 class MismatchedResponseError(Exception): 1057 """The response type does not corresponding to the request type.""" 1058 1059 def __init__(self, response: Response) -> None: 1060 self.response = response 1061 1062 class UnexpectedResponseTypeError(Exception): 1063 """The response type is not the expected one.""" 1064 1065 def __init__(self, response: Protocol.ResponseContext) -> None: 1066 self.response = response 1067 1068 class UnexpectedResponseCodeError(Exception): 1069 """The response code was not the expected one.""" 1070 1071 def __init__( 1072 self, response_code: avc.ResponseFrame.ResponseCode, response: Response 1073 ) -> None: 1074 self.response_code = response_code 1075 self.response = response 1076 1077 class PendingCommand: 1078 response: asyncio.Future 1079 1080 def __init__(self, transaction_label: int) -> None: 1081 self.transaction_label = transaction_label 1082 self.reset() 1083 1084 def reset(self): 1085 self.response = asyncio.get_running_loop().create_future() 1086 1087 @dataclass 1088 class ReceiveCommandState: 1089 transaction_label: int 1090 command_type: avc.CommandFrame.CommandType 1091 1092 @dataclass 1093 class ReceiveResponseState: 1094 transaction_label: int 1095 response_code: avc.ResponseFrame.ResponseCode 1096 1097 @dataclass 1098 class ResponseContext: 1099 transaction_label: int 1100 response: Response 1101 1102 @dataclass 1103 class FinalResponse(ResponseContext): 1104 response_code: avc.ResponseFrame.ResponseCode 1105 1106 @dataclass 1107 class InterimResponse(ResponseContext): 1108 final: Awaitable[Protocol.FinalResponse] 1109 1110 @dataclass 1111 class NotificationListener: 1112 transaction_label: int 1113 register_notification_command: RegisterNotificationCommand 1114 1115 delegate: Delegate 1116 send_transaction_label: int 1117 command_pdu_assembler: PduAssembler 1118 receive_command_state: Optional[ReceiveCommandState] 1119 response_pdu_assembler: PduAssembler 1120 receive_response_state: Optional[ReceiveResponseState] 1121 avctp_protocol: Optional[avctp.Protocol] 1122 free_commands: asyncio.Queue 1123 pending_commands: Dict[int, PendingCommand] # Pending commands, by label 1124 notification_listeners: Dict[EventId, NotificationListener] 1125 1126 @staticmethod 1127 def _check_vendor_dependent_frame( 1128 frame: Union[avc.VendorDependentCommandFrame, avc.VendorDependentResponseFrame] 1129 ) -> bool: 1130 if frame.company_id != AVRCP_BLUETOOTH_SIG_COMPANY_ID: 1131 logger.debug("unsupported company id, ignoring") 1132 return False 1133 1134 if frame.subunit_type != avc.Frame.SubunitType.PANEL or frame.subunit_id != 0: 1135 logger.debug("unsupported subunit") 1136 return False 1137 1138 return True 1139 1140 def __init__(self, delegate: Optional[Delegate] = None) -> None: 1141 super().__init__() 1142 self.delegate = delegate if delegate else Delegate() 1143 self.command_pdu_assembler = PduAssembler(self._on_command_pdu) 1144 self.receive_command_state = None 1145 self.response_pdu_assembler = PduAssembler(self._on_response_pdu) 1146 self.receive_response_state = None 1147 self.avctp_protocol = None 1148 self.notification_listeners = {} 1149 1150 # Create an initial pool of free commands 1151 self.pending_commands = {} 1152 self.free_commands = asyncio.Queue() 1153 for transaction_label in range(16): 1154 self.free_commands.put_nowait(self.PendingCommand(transaction_label)) 1155 1156 def listen(self, device: Device) -> None: 1157 """ 1158 Listen for incoming connections. 1159 1160 A 'connection' event will be emitted when a connection is made, and a 'start' 1161 event will be emitted when the protocol is ready to be used on that connection. 1162 """ 1163 device.register_l2cap_server(avctp.AVCTP_PSM, self._on_avctp_connection) 1164 1165 async def connect(self, connection: Connection) -> None: 1166 """ 1167 Connect to a peer. 1168 """ 1169 avctp_channel = await connection.create_l2cap_channel( 1170 l2cap.ClassicChannelSpec(psm=avctp.AVCTP_PSM) 1171 ) 1172 self._on_avctp_channel_open(avctp_channel) 1173 1174 async def _obtain_pending_command(self) -> PendingCommand: 1175 pending_command = await self.free_commands.get() 1176 self.pending_commands[pending_command.transaction_label] = pending_command 1177 return pending_command 1178 1179 def recycle_pending_command(self, pending_command: PendingCommand) -> None: 1180 pending_command.reset() 1181 del self.pending_commands[pending_command.transaction_label] 1182 self.free_commands.put_nowait(pending_command) 1183 logger.debug(f"recycled pending command, {self.free_commands.qsize()} free") 1184 1185 _R = TypeVar('_R') 1186 1187 @staticmethod 1188 def _check_response( 1189 response_context: ResponseContext, expected_type: Type[_R] 1190 ) -> _R: 1191 if isinstance(response_context, Protocol.FinalResponse): 1192 if ( 1193 response_context.response_code 1194 != avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE 1195 ): 1196 raise Protocol.UnexpectedResponseCodeError( 1197 response_context.response_code, response_context.response 1198 ) 1199 1200 if not (isinstance(response_context.response, expected_type)): 1201 raise Protocol.MismatchedResponseError(response_context.response) 1202 1203 return response_context.response 1204 1205 raise Protocol.UnexpectedResponseTypeError(response_context) 1206 1207 def _delegate_command( 1208 self, transaction_label: int, command: Command, method: Awaitable 1209 ) -> None: 1210 async def call(): 1211 try: 1212 await method 1213 except Delegate.Error as error: 1214 self.send_rejected_avrcp_response( 1215 transaction_label, 1216 command.pdu_id, 1217 error.status_code, 1218 ) 1219 except Exception: 1220 logger.exception("delegate method raised exception") 1221 self.send_rejected_avrcp_response( 1222 transaction_label, 1223 command.pdu_id, 1224 Protocol.StatusCode.INTERNAL_ERROR, 1225 ) 1226 1227 utils.AsyncRunner.spawn(call()) 1228 1229 async def get_supported_events(self) -> List[EventId]: 1230 """Get the list of events supported by the connected peer.""" 1231 response_context = await self.send_avrcp_command( 1232 avc.CommandFrame.CommandType.STATUS, 1233 GetCapabilitiesCommand( 1234 GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED 1235 ), 1236 ) 1237 response = self._check_response(response_context, GetCapabilitiesResponse) 1238 return cast(List[EventId], response.capabilities) 1239 1240 async def get_play_status(self) -> SongAndPlayStatus: 1241 """Get the play status of the connected peer.""" 1242 response_context = await self.send_avrcp_command( 1243 avc.CommandFrame.CommandType.STATUS, GetPlayStatusCommand() 1244 ) 1245 response = self._check_response(response_context, GetPlayStatusResponse) 1246 return SongAndPlayStatus( 1247 response.song_length, response.song_position, response.play_status 1248 ) 1249 1250 async def get_element_attributes( 1251 self, element_identifier: int, attribute_ids: Sequence[MediaAttributeId] 1252 ) -> List[MediaAttribute]: 1253 """Get element attributes from the connected peer.""" 1254 response_context = await self.send_avrcp_command( 1255 avc.CommandFrame.CommandType.STATUS, 1256 GetElementAttributesCommand(element_identifier, attribute_ids), 1257 ) 1258 response = self._check_response(response_context, GetElementAttributesResponse) 1259 return response.attributes 1260 1261 async def monitor_events( 1262 self, event_id: EventId, playback_interval: int = 0 1263 ) -> AsyncIterator[Event]: 1264 """ 1265 Monitor events emitted from a peer. 1266 1267 This generator yields Event objects. 1268 """ 1269 1270 def check_response(response) -> Event: 1271 if not isinstance(response, RegisterNotificationResponse): 1272 raise self.MismatchedResponseError(response) 1273 1274 return response.event 1275 1276 while True: 1277 response = await self.send_avrcp_command( 1278 avc.CommandFrame.CommandType.NOTIFY, 1279 RegisterNotificationCommand(event_id, playback_interval), 1280 ) 1281 1282 if isinstance(response, self.InterimResponse): 1283 logger.debug(f"interim: {response}") 1284 yield check_response(response.response) 1285 1286 logger.debug("waiting for final response") 1287 response = await response.final 1288 1289 if not isinstance(response, self.FinalResponse): 1290 raise self.UnexpectedResponseTypeError(response) 1291 1292 logger.debug(f"final: {response}") 1293 if response.response_code != avc.ResponseFrame.ResponseCode.CHANGED: 1294 raise self.UnexpectedResponseCodeError( 1295 response.response_code, response.response 1296 ) 1297 1298 yield check_response(response.response) 1299 1300 async def monitor_playback_status( 1301 self, 1302 ) -> AsyncIterator[PlayStatus]: 1303 """Monitor Playback Status changes from the connected peer.""" 1304 async for event in self.monitor_events(EventId.PLAYBACK_STATUS_CHANGED, 0): 1305 if not isinstance(event, PlaybackStatusChangedEvent): 1306 logger.warning("unexpected event class") 1307 continue 1308 yield event.play_status 1309 1310 async def monitor_track_changed( 1311 self, 1312 ) -> AsyncIterator[bytes]: 1313 """Monitor Track changes from the connected peer.""" 1314 async for event in self.monitor_events(EventId.TRACK_CHANGED, 0): 1315 if not isinstance(event, TrackChangedEvent): 1316 logger.warning("unexpected event class") 1317 continue 1318 yield event.identifier 1319 1320 async def monitor_playback_position( 1321 self, playback_interval: int 1322 ) -> AsyncIterator[int]: 1323 """Monitor Playback Position changes from the connected peer.""" 1324 async for event in self.monitor_events( 1325 EventId.PLAYBACK_POS_CHANGED, playback_interval 1326 ): 1327 if not isinstance(event, PlaybackPositionChangedEvent): 1328 logger.warning("unexpected event class") 1329 continue 1330 yield event.playback_position 1331 1332 async def monitor_player_application_settings( 1333 self, 1334 ) -> AsyncIterator[List[PlayerApplicationSettingChangedEvent.Setting]]: 1335 """Monitor Player Application Setting changes from the connected peer.""" 1336 async for event in self.monitor_events( 1337 EventId.PLAYER_APPLICATION_SETTING_CHANGED, 0 1338 ): 1339 if not isinstance(event, PlayerApplicationSettingChangedEvent): 1340 logger.warning("unexpected event class") 1341 continue 1342 yield event.player_application_settings 1343 1344 async def monitor_now_playing_content(self) -> AsyncIterator[None]: 1345 """Monitor Now Playing changes from the connected peer.""" 1346 async for event in self.monitor_events(EventId.NOW_PLAYING_CONTENT_CHANGED, 0): 1347 if not isinstance(event, NowPlayingContentChangedEvent): 1348 logger.warning("unexpected event class") 1349 continue 1350 yield None 1351 1352 async def monitor_available_players(self) -> AsyncIterator[None]: 1353 """Monitor Available Players changes from the connected peer.""" 1354 async for event in self.monitor_events(EventId.AVAILABLE_PLAYERS_CHANGED, 0): 1355 if not isinstance(event, AvailablePlayersChangedEvent): 1356 logger.warning("unexpected event class") 1357 continue 1358 yield None 1359 1360 async def monitor_addressed_player( 1361 self, 1362 ) -> AsyncIterator[AddressedPlayerChangedEvent.Player]: 1363 """Monitor Addressed Player changes from the connected peer.""" 1364 async for event in self.monitor_events(EventId.ADDRESSED_PLAYER_CHANGED, 0): 1365 if not isinstance(event, AddressedPlayerChangedEvent): 1366 logger.warning("unexpected event class") 1367 continue 1368 yield event.player 1369 1370 async def monitor_uids( 1371 self, 1372 ) -> AsyncIterator[int]: 1373 """Monitor UID changes from the connected peer.""" 1374 async for event in self.monitor_events(EventId.UIDS_CHANGED, 0): 1375 if not isinstance(event, UidsChangedEvent): 1376 logger.warning("unexpected event class") 1377 continue 1378 yield event.uid_counter 1379 1380 async def monitor_volume( 1381 self, 1382 ) -> AsyncIterator[int]: 1383 """Monitor Volume changes from the connected peer.""" 1384 async for event in self.monitor_events(EventId.VOLUME_CHANGED, 0): 1385 if not isinstance(event, VolumeChangedEvent): 1386 logger.warning("unexpected event class") 1387 continue 1388 yield event.volume 1389 1390 def notify_event(self, event: Event): 1391 """Notify an event to the connected peer.""" 1392 if (listener := self.notification_listeners.get(event.event_id)) is None: 1393 logger.debug(f"no listener for {event.event_id.name}") 1394 return 1395 1396 # Emit the notification. 1397 notification = RegisterNotificationResponse(event) 1398 self.send_avrcp_response( 1399 listener.transaction_label, 1400 avc.ResponseFrame.ResponseCode.CHANGED, 1401 notification, 1402 ) 1403 1404 # Remove the listener (they will need to re-register). 1405 del self.notification_listeners[event.event_id] 1406 1407 def notify_playback_status_changed(self, status: PlayStatus) -> None: 1408 """Notify the connected peer of a Playback Status change.""" 1409 self.notify_event(PlaybackStatusChangedEvent(status)) 1410 1411 def notify_track_changed(self, identifier: bytes) -> None: 1412 """Notify the connected peer of a Track change.""" 1413 if len(identifier) != 8: 1414 raise ValueError("identifier must be 8 bytes") 1415 self.notify_event(TrackChangedEvent(identifier)) 1416 1417 def notify_playback_position_changed(self, position: int) -> None: 1418 """Notify the connected peer of a Position change.""" 1419 self.notify_event(PlaybackPositionChangedEvent(position)) 1420 1421 def notify_player_application_settings_changed( 1422 self, settings: Sequence[PlayerApplicationSettingChangedEvent.Setting] 1423 ) -> None: 1424 """Notify the connected peer of an Player Application Setting change.""" 1425 self.notify_event( 1426 PlayerApplicationSettingChangedEvent(settings), 1427 ) 1428 1429 def notify_now_playing_content_changed(self) -> None: 1430 """Notify the connected peer of a Now Playing change.""" 1431 self.notify_event(NowPlayingContentChangedEvent()) 1432 1433 def notify_available_players_changed(self) -> None: 1434 """Notify the connected peer of an Available Players change.""" 1435 self.notify_event(AvailablePlayersChangedEvent()) 1436 1437 def notify_addressed_player_changed( 1438 self, player: AddressedPlayerChangedEvent.Player 1439 ) -> None: 1440 """Notify the connected peer of an Addressed Player change.""" 1441 self.notify_event(AddressedPlayerChangedEvent(player)) 1442 1443 def notify_uids_changed(self, uid_counter: int) -> None: 1444 """Notify the connected peer of a UID change.""" 1445 self.notify_event(UidsChangedEvent(uid_counter)) 1446 1447 def notify_volume_changed(self, volume: int) -> None: 1448 """Notify the connected peer of a Volume change.""" 1449 self.notify_event(VolumeChangedEvent(volume)) 1450 1451 def _register_notification_listener( 1452 self, transaction_label: int, command: RegisterNotificationCommand 1453 ) -> None: 1454 listener = self.NotificationListener(transaction_label, command) 1455 self.notification_listeners[command.event_id] = listener 1456 1457 def _on_avctp_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None: 1458 logger.debug("AVCTP connection established") 1459 l2cap_channel.on("open", lambda: self._on_avctp_channel_open(l2cap_channel)) 1460 1461 self.emit("connection") 1462 1463 def _on_avctp_channel_open(self, l2cap_channel: l2cap.ClassicChannel) -> None: 1464 logger.debug("AVCTP channel open") 1465 if self.avctp_protocol is not None: 1466 # TODO: find a better strategy instead of just closing 1467 logger.warning("AVCTP protocol already active, closing connection") 1468 AsyncRunner.spawn(l2cap_channel.disconnect()) 1469 return 1470 1471 self.avctp_protocol = avctp.Protocol(l2cap_channel) 1472 self.avctp_protocol.register_command_handler(AVRCP_PID, self._on_avctp_command) 1473 self.avctp_protocol.register_response_handler( 1474 AVRCP_PID, self._on_avctp_response 1475 ) 1476 l2cap_channel.on("close", self._on_avctp_channel_close) 1477 1478 self.emit("start") 1479 1480 def _on_avctp_channel_close(self) -> None: 1481 logger.debug("AVCTP channel closed") 1482 self.avctp_protocol = None 1483 1484 self.emit("stop") 1485 1486 def _on_avctp_command( 1487 self, transaction_label: int, command: avc.CommandFrame 1488 ) -> None: 1489 logger.debug( 1490 f"<<< AVCTP Command, transaction_label={transaction_label}: " f"{command}" 1491 ) 1492 1493 # Only the PANEL subunit type with subunit ID 0 is supported in this profile. 1494 if ( 1495 command.subunit_type != avc.Frame.SubunitType.PANEL 1496 or command.subunit_id != 0 1497 ): 1498 logger.debug("subunit not supported") 1499 self.send_not_implemented_response(transaction_label, command) 1500 return 1501 1502 if isinstance(command, avc.VendorDependentCommandFrame): 1503 if not self._check_vendor_dependent_frame(command): 1504 return 1505 1506 if self.receive_command_state is None: 1507 self.receive_command_state = self.ReceiveCommandState( 1508 transaction_label=transaction_label, command_type=command.ctype 1509 ) 1510 elif ( 1511 self.receive_command_state.transaction_label != transaction_label 1512 or self.receive_command_state.command_type != command.ctype 1513 ): 1514 # We're in the middle of some other PDU 1515 logger.warning("received interleaved PDU, resetting state") 1516 self.command_pdu_assembler.reset() 1517 self.receive_command_state = None 1518 return 1519 else: 1520 self.receive_command_state.command_type = command.ctype 1521 self.receive_command_state.transaction_label = transaction_label 1522 1523 self.command_pdu_assembler.on_pdu(command.vendor_dependent_data) 1524 return 1525 1526 if isinstance(command, avc.PassThroughCommandFrame): 1527 # TODO: delegate 1528 response = avc.PassThroughResponseFrame( 1529 avc.ResponseFrame.ResponseCode.ACCEPTED, 1530 avc.Frame.SubunitType.PANEL, 1531 0, 1532 command.state_flag, 1533 command.operation_id, 1534 command.operation_data, 1535 ) 1536 self.send_response(transaction_label, response) 1537 return 1538 1539 # TODO handle other types 1540 self.send_not_implemented_response(transaction_label, command) 1541 1542 def _on_avctp_response( 1543 self, transaction_label: int, response: Optional[avc.ResponseFrame] 1544 ) -> None: 1545 logger.debug( 1546 f"<<< AVCTP Response, transaction_label={transaction_label}: {response}" 1547 ) 1548 1549 # Check that we have a pending command that matches this response. 1550 if not (pending_command := self.pending_commands.get(transaction_label)): 1551 logger.warning("no pending command with this transaction label") 1552 return 1553 1554 # A None response means an invalid PID was used in the request. 1555 if response is None: 1556 pending_command.response.set_exception(self.InvalidPidError()) 1557 1558 if isinstance(response, avc.VendorDependentResponseFrame): 1559 if not self._check_vendor_dependent_frame(response): 1560 return 1561 1562 if self.receive_response_state is None: 1563 self.receive_response_state = self.ReceiveResponseState( 1564 transaction_label=transaction_label, response_code=response.response 1565 ) 1566 elif ( 1567 self.receive_response_state.transaction_label != transaction_label 1568 or self.receive_response_state.response_code != response.response 1569 ): 1570 # We're in the middle of some other PDU 1571 logger.warning("received interleaved PDU, resetting state") 1572 self.response_pdu_assembler.reset() 1573 self.receive_response_state = None 1574 return 1575 else: 1576 self.receive_response_state.response_code = response.response 1577 self.receive_response_state.transaction_label = transaction_label 1578 1579 self.response_pdu_assembler.on_pdu(response.vendor_dependent_data) 1580 return 1581 1582 if isinstance(response, avc.PassThroughResponseFrame): 1583 pending_command.response.set_result(response) 1584 1585 # TODO handle other types 1586 1587 self.recycle_pending_command(pending_command) 1588 1589 def _on_command_pdu(self, pdu_id: PduId, pdu: bytes) -> None: 1590 logger.debug(f"<<< AVRCP command PDU [pdu_id={pdu_id.name}]: {pdu.hex()}") 1591 1592 assert self.receive_command_state is not None 1593 transaction_label = self.receive_command_state.transaction_label 1594 1595 # Dispatch the command. 1596 # NOTE: with a small number of supported commands, a manual dispatch like this 1597 # is Ok, but if/when more commands are supported, a lookup dispatch mechanism 1598 # would be more appropriate. 1599 # TODO: switch on ctype 1600 if self.receive_command_state.command_type in ( 1601 avc.CommandFrame.CommandType.CONTROL, 1602 avc.CommandFrame.CommandType.STATUS, 1603 avc.CommandFrame.CommandType.NOTIFY, 1604 ): 1605 # TODO: catch exceptions from delegates 1606 if pdu_id == self.PduId.GET_CAPABILITIES: 1607 self._on_get_capabilities_command( 1608 transaction_label, GetCapabilitiesCommand.from_bytes(pdu) 1609 ) 1610 elif pdu_id == self.PduId.SET_ABSOLUTE_VOLUME: 1611 self._on_set_absolute_volume_command( 1612 transaction_label, SetAbsoluteVolumeCommand.from_bytes(pdu) 1613 ) 1614 elif pdu_id == self.PduId.REGISTER_NOTIFICATION: 1615 self._on_register_notification_command( 1616 transaction_label, RegisterNotificationCommand.from_bytes(pdu) 1617 ) 1618 else: 1619 # Not supported. 1620 # TODO: check that this is the right way to respond in this case. 1621 logger.debug("unsupported PDU ID") 1622 self.send_rejected_avrcp_response( 1623 transaction_label, pdu_id, self.StatusCode.INVALID_PARAMETER 1624 ) 1625 else: 1626 logger.debug("unsupported command type") 1627 self.send_rejected_avrcp_response( 1628 transaction_label, pdu_id, self.StatusCode.INVALID_COMMAND 1629 ) 1630 1631 self.receive_command_state = None 1632 1633 def _on_response_pdu(self, pdu_id: PduId, pdu: bytes) -> None: 1634 logger.debug(f"<<< AVRCP response PDU [pdu_id={pdu_id.name}]: {pdu.hex()}") 1635 1636 assert self.receive_response_state is not None 1637 1638 transaction_label = self.receive_response_state.transaction_label 1639 response_code = self.receive_response_state.response_code 1640 self.receive_response_state = None 1641 1642 # Check that we have a pending command that matches this response. 1643 if not (pending_command := self.pending_commands.get(transaction_label)): 1644 logger.warning("no pending command with this transaction label") 1645 return 1646 1647 # Convert the PDU bytes into a response object. 1648 # NOTE: with a small number of supported responses, a manual switch like this 1649 # is Ok, but if/when more responses are supported, a lookup mechanism would be 1650 # more appropriate. 1651 response: Optional[Response] = None 1652 if response_code == avc.ResponseFrame.ResponseCode.REJECTED: 1653 response = RejectedResponse.from_bytes(pdu_id, pdu) 1654 elif response_code == avc.ResponseFrame.ResponseCode.NOT_IMPLEMENTED: 1655 response = NotImplementedResponse.from_bytes(pdu_id, pdu) 1656 elif response_code in ( 1657 avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE, 1658 avc.ResponseFrame.ResponseCode.INTERIM, 1659 avc.ResponseFrame.ResponseCode.CHANGED, 1660 avc.ResponseFrame.ResponseCode.ACCEPTED, 1661 ): 1662 if pdu_id == self.PduId.GET_CAPABILITIES: 1663 response = GetCapabilitiesResponse.from_bytes(pdu) 1664 elif pdu_id == self.PduId.GET_PLAY_STATUS: 1665 response = GetPlayStatusResponse.from_bytes(pdu) 1666 elif pdu_id == self.PduId.GET_ELEMENT_ATTRIBUTES: 1667 response = GetElementAttributesResponse.from_bytes(pdu) 1668 elif pdu_id == self.PduId.SET_ABSOLUTE_VOLUME: 1669 response = SetAbsoluteVolumeResponse.from_bytes(pdu) 1670 elif pdu_id == self.PduId.REGISTER_NOTIFICATION: 1671 response = RegisterNotificationResponse.from_bytes(pdu) 1672 else: 1673 logger.debug("unexpected PDU ID") 1674 pending_command.response.set_exception( 1675 ProtocolError( 1676 error_code=None, 1677 error_namespace="avrcp", 1678 details="unexpected PDU ID", 1679 ) 1680 ) 1681 else: 1682 logger.debug("unexpected response code") 1683 pending_command.response.set_exception( 1684 ProtocolError( 1685 error_code=None, 1686 error_namespace="avrcp", 1687 details="unexpected response code", 1688 ) 1689 ) 1690 1691 if response is None: 1692 self.recycle_pending_command(pending_command) 1693 return 1694 1695 logger.debug(f"<<< AVRCP response: {response}") 1696 1697 # Make the response available to the waiter. 1698 if response_code == avc.ResponseFrame.ResponseCode.INTERIM: 1699 pending_interim_response = pending_command.response 1700 pending_command.reset() 1701 pending_interim_response.set_result( 1702 self.InterimResponse( 1703 pending_command.transaction_label, 1704 response, 1705 pending_command.response, 1706 ) 1707 ) 1708 else: 1709 pending_command.response.set_result( 1710 self.FinalResponse( 1711 pending_command.transaction_label, 1712 response, 1713 response_code, 1714 ) 1715 ) 1716 self.recycle_pending_command(pending_command) 1717 1718 def send_command(self, transaction_label: int, command: avc.CommandFrame) -> None: 1719 logger.debug(f">>> AVRCP command: {command}") 1720 1721 if self.avctp_protocol is None: 1722 logger.warning("trying to send command while avctp_protocol is None") 1723 return 1724 1725 self.avctp_protocol.send_command(transaction_label, AVRCP_PID, bytes(command)) 1726 1727 async def send_passthrough_command( 1728 self, command: avc.PassThroughCommandFrame 1729 ) -> avc.PassThroughResponseFrame: 1730 # Wait for a free command slot. 1731 pending_command = await self._obtain_pending_command() 1732 1733 # Send the command. 1734 self.send_command(pending_command.transaction_label, command) 1735 1736 # Wait for the response. 1737 return await pending_command.response 1738 1739 async def send_key_event( 1740 self, key: avc.PassThroughCommandFrame.OperationId, pressed: bool 1741 ) -> avc.PassThroughResponseFrame: 1742 """Send a key event to the connected peer.""" 1743 return await self.send_passthrough_command( 1744 avc.PassThroughCommandFrame( 1745 avc.CommandFrame.CommandType.CONTROL, 1746 avc.Frame.SubunitType.PANEL, 1747 0, 1748 ( 1749 avc.PassThroughFrame.StateFlag.PRESSED 1750 if pressed 1751 else avc.PassThroughFrame.StateFlag.RELEASED 1752 ), 1753 key, 1754 b'', 1755 ) 1756 ) 1757 1758 async def send_avrcp_command( 1759 self, command_type: avc.CommandFrame.CommandType, command: Command 1760 ) -> ResponseContext: 1761 # Wait for a free command slot. 1762 pending_command = await self._obtain_pending_command() 1763 1764 # TODO: fragmentation 1765 # Send the command. 1766 logger.debug(f">>> AVRCP command PDU: {command}") 1767 pdu = ( 1768 struct.pack(">BBH", command.pdu_id, 0, len(command.parameter)) 1769 + command.parameter 1770 ) 1771 command_frame = avc.VendorDependentCommandFrame( 1772 command_type, 1773 avc.Frame.SubunitType.PANEL, 1774 0, 1775 AVRCP_BLUETOOTH_SIG_COMPANY_ID, 1776 pdu, 1777 ) 1778 self.send_command(pending_command.transaction_label, command_frame) 1779 1780 # Wait for the response. 1781 return await pending_command.response 1782 1783 def send_response( 1784 self, transaction_label: int, response: avc.ResponseFrame 1785 ) -> None: 1786 assert self.avctp_protocol is not None 1787 logger.debug(f">>> AVRCP response: {response}") 1788 self.avctp_protocol.send_response(transaction_label, AVRCP_PID, bytes(response)) 1789 1790 def send_passthrough_response( 1791 self, 1792 transaction_label: int, 1793 command: avc.PassThroughCommandFrame, 1794 response_code: avc.ResponseFrame.ResponseCode, 1795 ): 1796 response = avc.PassThroughResponseFrame( 1797 response_code, 1798 avc.Frame.SubunitType.PANEL, 1799 0, 1800 command.state_flag, 1801 command.operation_id, 1802 command.operation_data, 1803 ) 1804 self.send_response(transaction_label, response) 1805 1806 def send_avrcp_response( 1807 self, 1808 transaction_label: int, 1809 response_code: avc.ResponseFrame.ResponseCode, 1810 response: Response, 1811 ) -> None: 1812 # TODO: fragmentation 1813 logger.debug(f">>> AVRCP response PDU: {response}") 1814 pdu = ( 1815 struct.pack(">BBH", response.pdu_id, 0, len(response.parameter)) 1816 + response.parameter 1817 ) 1818 response_frame = avc.VendorDependentResponseFrame( 1819 response_code, 1820 avc.Frame.SubunitType.PANEL, 1821 0, 1822 AVRCP_BLUETOOTH_SIG_COMPANY_ID, 1823 pdu, 1824 ) 1825 self.send_response(transaction_label, response_frame) 1826 1827 def send_not_implemented_response( 1828 self, transaction_label: int, command: avc.CommandFrame 1829 ) -> None: 1830 response = avc.ResponseFrame( 1831 avc.ResponseFrame.ResponseCode.NOT_IMPLEMENTED, 1832 command.subunit_type, 1833 command.subunit_id, 1834 command.opcode, 1835 command.operands, 1836 ) 1837 self.send_response(transaction_label, response) 1838 1839 def send_rejected_avrcp_response( 1840 self, transaction_label: int, pdu_id: Protocol.PduId, status_code: StatusCode 1841 ) -> None: 1842 self.send_avrcp_response( 1843 transaction_label, 1844 avc.ResponseFrame.ResponseCode.REJECTED, 1845 RejectedResponse(pdu_id, status_code), 1846 ) 1847 1848 def _on_get_capabilities_command( 1849 self, transaction_label: int, command: GetCapabilitiesCommand 1850 ) -> None: 1851 logger.debug(f"<<< AVRCP command PDU: {command}") 1852 1853 async def get_supported_events(): 1854 if ( 1855 command.capability_id 1856 != GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED 1857 ): 1858 raise Protocol.InvalidParameterError 1859 1860 supported_events = await self.delegate.get_supported_events() 1861 self.send_avrcp_response( 1862 transaction_label, 1863 avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE, 1864 GetCapabilitiesResponse(command.capability_id, supported_events), 1865 ) 1866 1867 self._delegate_command(transaction_label, command, get_supported_events()) 1868 1869 def _on_set_absolute_volume_command( 1870 self, transaction_label: int, command: SetAbsoluteVolumeCommand 1871 ) -> None: 1872 logger.debug(f"<<< AVRCP command PDU: {command}") 1873 1874 async def set_absolute_volume(): 1875 await self.delegate.set_absolute_volume(command.volume) 1876 effective_volume = await self.delegate.get_absolute_volume() 1877 self.send_avrcp_response( 1878 transaction_label, 1879 avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE, 1880 SetAbsoluteVolumeResponse(effective_volume), 1881 ) 1882 1883 self._delegate_command(transaction_label, command, set_absolute_volume()) 1884 1885 def _on_register_notification_command( 1886 self, transaction_label: int, command: RegisterNotificationCommand 1887 ) -> None: 1888 logger.debug(f"<<< AVRCP command PDU: {command}") 1889 1890 async def register_notification(): 1891 # Check if the event is supported. 1892 supported_events = await self.delegate.get_supported_events() 1893 if command.event_id in supported_events: 1894 if command.event_id == EventId.VOLUME_CHANGED: 1895 volume = await self.delegate.get_absolute_volume() 1896 response = RegisterNotificationResponse(VolumeChangedEvent(volume)) 1897 self.send_avrcp_response( 1898 transaction_label, 1899 avc.ResponseFrame.ResponseCode.INTERIM, 1900 response, 1901 ) 1902 self._register_notification_listener(transaction_label, command) 1903 return 1904 1905 if command.event_id == EventId.PLAYBACK_STATUS_CHANGED: 1906 # TODO: testing only, use delegate 1907 response = RegisterNotificationResponse( 1908 PlaybackStatusChangedEvent(play_status=PlayStatus.PLAYING) 1909 ) 1910 self.send_avrcp_response( 1911 transaction_label, 1912 avc.ResponseFrame.ResponseCode.INTERIM, 1913 response, 1914 ) 1915 self._register_notification_listener(transaction_label, command) 1916 return 1917 1918 self._delegate_command(transaction_label, command, register_notification()) 1919