• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18from __future__ import annotations
19import asyncio
20from dataclasses import dataclass
21import enum
22import logging
23import struct
24from typing import (
25    AsyncIterator,
26    Awaitable,
27    Callable,
28    cast,
29    Dict,
30    Iterable,
31    List,
32    Optional,
33    Sequence,
34    SupportsBytes,
35    Tuple,
36    Type,
37    TypeVar,
38    Union,
39)
40
41import pyee
42
43from bumble.colors import color
44from bumble.device import Device, Connection
45from bumble.sdp import (
46    SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
47    SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
48    SDP_PUBLIC_BROWSE_ROOT,
49    SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
50    SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
51    SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
52    SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
53    DataElement,
54    ServiceAttribute,
55)
56from bumble.utils import AsyncRunner, OpenIntEnum
57from bumble.core import (
58    ProtocolError,
59    BT_L2CAP_PROTOCOL_ID,
60    BT_AVCTP_PROTOCOL_ID,
61    BT_AV_REMOTE_CONTROL_SERVICE,
62    BT_AV_REMOTE_CONTROL_CONTROLLER_SERVICE,
63    BT_AV_REMOTE_CONTROL_TARGET_SERVICE,
64)
65from bumble import l2cap
66from bumble import avc
67from bumble import avctp
68from bumble import utils
69
70
71# -----------------------------------------------------------------------------
72# Logging
73# -----------------------------------------------------------------------------
74logger = logging.getLogger(__name__)
75
76
77# -----------------------------------------------------------------------------
78# Constants
79# -----------------------------------------------------------------------------
80AVRCP_PID = 0x110E
81AVRCP_BLUETOOTH_SIG_COMPANY_ID = 0x001958
82
83
84# -----------------------------------------------------------------------------
def make_controller_service_sdp_records(
    service_record_handle: int,
    avctp_version: Tuple[int, int] = (1, 4),
    avrcp_version: Tuple[int, int] = (1, 6),
    supported_features: int = 1,
) -> List[ServiceAttribute]:
    """
    Build the SDP service record attributes for a remote control controller.

    Args:
        service_record_handle: value of the service record handle attribute.
        avctp_version: (major, minor) version listed next to the AVCTP
            protocol in the protocol descriptor list.
        avrcp_version: (major, minor) version listed in the Bluetooth profile
            descriptor list.
        supported_features: value of the supported features attribute, stored
            as a 16-bit unsigned integer.

    Returns:
        The service attributes: record handle, browse group list, service
        class ID list, protocol descriptor list, profile descriptor list and
        supported features, in that order.
    """
    # TODO: support a way to compute the supported features from a feature list

    # Versions are encoded on 16 bits: major in the high byte, minor in the
    # low byte.
    avctp_version_word = (avctp_version[0] << 8) | avctp_version[1]
    avrcp_version_word = (avrcp_version[0] << 8) | avrcp_version[1]

    record_handle = ServiceAttribute(
        SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
        DataElement.unsigned_integer_32(service_record_handle),
    )

    browse_group_list = ServiceAttribute(
        SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
        DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]),
    )

    service_classes = [
        DataElement.uuid(BT_AV_REMOTE_CONTROL_SERVICE),
        DataElement.uuid(BT_AV_REMOTE_CONTROL_CONTROLLER_SERVICE),
    ]
    service_class_id_list = ServiceAttribute(
        SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
        DataElement.sequence(service_classes),
    )

    # L2CAP (with the AVCTP PSM) first, then AVCTP (with its version).
    l2cap_descriptor = DataElement.sequence(
        [
            DataElement.uuid(BT_L2CAP_PROTOCOL_ID),
            DataElement.unsigned_integer_16(avctp.AVCTP_PSM),
        ]
    )
    avctp_descriptor = DataElement.sequence(
        [
            DataElement.uuid(BT_AVCTP_PROTOCOL_ID),
            DataElement.unsigned_integer_16(avctp_version_word),
        ]
    )
    protocol_descriptor_list = ServiceAttribute(
        SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
        DataElement.sequence([l2cap_descriptor, avctp_descriptor]),
    )

    profile_descriptor = DataElement.sequence(
        [
            DataElement.uuid(BT_AV_REMOTE_CONTROL_SERVICE),
            DataElement.unsigned_integer_16(avrcp_version_word),
        ]
    )
    profile_descriptor_list = ServiceAttribute(
        SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
        DataElement.sequence([profile_descriptor]),
    )

    features = ServiceAttribute(
        SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
        DataElement.unsigned_integer_16(supported_features),
    )

    return [
        record_handle,
        browse_group_list,
        service_class_id_list,
        protocol_descriptor_list,
        profile_descriptor_list,
        features,
    ]
150
151
152# -----------------------------------------------------------------------------
def make_target_service_sdp_records(
    service_record_handle: int,
    avctp_version: Tuple[int, int] = (1, 4),
    avrcp_version: Tuple[int, int] = (1, 6),
    supported_features: int = 0x23,
) -> List[ServiceAttribute]:
    """
    Build the SDP service record attributes for a remote control target.

    Args:
        service_record_handle: value of the service record handle attribute.
        avctp_version: (major, minor) version listed next to the AVCTP
            protocol in the protocol descriptor list.
        avrcp_version: (major, minor) version listed in the Bluetooth profile
            descriptor list.
        supported_features: value of the supported features attribute, stored
            as a 16-bit unsigned integer.

    Returns:
        The service attributes: record handle, browse group list, service
        class ID list, protocol descriptor list, profile descriptor list and
        supported features, in that order.
    """
    # TODO: support a way to compute the supported features from a feature list

    # Versions are encoded on 16 bits: major in the high byte, minor in the
    # low byte.
    avctp_version_word = (avctp_version[0] << 8) | avctp_version[1]
    avrcp_version_word = (avrcp_version[0] << 8) | avrcp_version[1]

    record_handle = ServiceAttribute(
        SDP_SERVICE_RECORD_HANDLE_ATTRIBUTE_ID,
        DataElement.unsigned_integer_32(service_record_handle),
    )

    browse_group_list = ServiceAttribute(
        SDP_BROWSE_GROUP_LIST_ATTRIBUTE_ID,
        DataElement.sequence([DataElement.uuid(SDP_PUBLIC_BROWSE_ROOT)]),
    )

    service_class_id_list = ServiceAttribute(
        SDP_SERVICE_CLASS_ID_LIST_ATTRIBUTE_ID,
        DataElement.sequence(
            [DataElement.uuid(BT_AV_REMOTE_CONTROL_TARGET_SERVICE)]
        ),
    )

    # L2CAP (with the AVCTP PSM) first, then AVCTP (with its version).
    l2cap_descriptor = DataElement.sequence(
        [
            DataElement.uuid(BT_L2CAP_PROTOCOL_ID),
            DataElement.unsigned_integer_16(avctp.AVCTP_PSM),
        ]
    )
    avctp_descriptor = DataElement.sequence(
        [
            DataElement.uuid(BT_AVCTP_PROTOCOL_ID),
            DataElement.unsigned_integer_16(avctp_version_word),
        ]
    )
    protocol_descriptor_list = ServiceAttribute(
        SDP_PROTOCOL_DESCRIPTOR_LIST_ATTRIBUTE_ID,
        DataElement.sequence([l2cap_descriptor, avctp_descriptor]),
    )

    profile_descriptor = DataElement.sequence(
        [
            DataElement.uuid(BT_AV_REMOTE_CONTROL_SERVICE),
            DataElement.unsigned_integer_16(avrcp_version_word),
        ]
    )
    profile_descriptor_list = ServiceAttribute(
        SDP_BLUETOOTH_PROFILE_DESCRIPTOR_LIST_ATTRIBUTE_ID,
        DataElement.sequence([profile_descriptor]),
    )

    features = ServiceAttribute(
        SDP_SUPPORTED_FEATURES_ATTRIBUTE_ID,
        DataElement.unsigned_integer_16(supported_features),
    )

    return [
        record_handle,
        browse_group_list,
        service_class_id_list,
        protocol_descriptor_list,
        profile_descriptor_list,
        features,
    ]
217
218
219# -----------------------------------------------------------------------------
def _decode_attribute_value(value: bytes, character_set: CharacterSetId) -> str:
    """
    Decode the raw bytes of an attribute value into a string.

    The bytes are decoded as UTF-8 when `character_set` is
    `CharacterSetId.UTF_8`, and as ASCII for any other character set.

    Returns:
        The decoded string, or an empty string (after logging a warning) when
        the bytes cannot be decoded.
    """
    encoding = "utf-8" if character_set == CharacterSetId.UTF_8 else "ascii"
    try:
        return value.decode(encoding)
    except UnicodeDecodeError:
        logger.warning(f"cannot decode string with bytes: {value.hex()}")
        return ""
228
229
230# -----------------------------------------------------------------------------
class PduAssembler:
    """
    PDU Assembler to support fragmented PDUs, as defined in:
    Audio/Video Remote Control / Profile Specification
    6.3.1 AVRCP specific AV/C commands

    Fragments are fed to `on_pdu`. Once a SINGLE or END fragment has been
    accepted, `callback` is invoked with the PDU ID and the concatenated
    parameter bytes of all the accepted fragments, and the assembler is reset.
    """

    # Fixed fragment header: PDU ID (1 byte), packet type (1 byte, low 2 bits)
    # and parameter length (2 bytes, big-endian).
    _HEADER_SIZE = 4

    pdu_id: Optional[Protocol.PduId]
    parameter: bytes

    def __init__(self, callback: Callable[[Protocol.PduId, bytes], None]) -> None:
        """
        Args:
            callback: called with (pdu_id, parameter) for each complete PDU.
        """
        self.callback = callback
        self.reset()

    def reset(self) -> None:
        """Drop any partially assembled PDU."""
        self.pdu_id = None
        self.parameter = b''

    def on_pdu(self, pdu: bytes) -> None:
        """
        Process one fragment.

        Malformed or out-of-sequence fragments are logged and cause the
        assembler to reset rather than raise.
        """
        if len(pdu) < self._HEADER_SIZE:
            # Without this check, indexing/unpacking the header below would
            # raise on a truncated fragment.
            logger.warning("pdu too short to contain a header")
            self.reset()
            return

        pdu_id = Protocol.PduId(pdu[0])
        packet_type = Protocol.PacketType(pdu[1] & 3)
        parameter_length = struct.unpack_from('>H', pdu, 2)[0]
        parameter = pdu[self._HEADER_SIZE : self._HEADER_SIZE + parameter_length]
        if len(parameter) != parameter_length:
            logger.warning("parameter length exceeds pdu size")
            self.reset()
            return

        if packet_type in (Protocol.PacketType.SINGLE, Protocol.PacketType.START):
            if self.pdu_id is not None:
                # We are already in a PDU: discard it and start over with this
                # fragment.
                logger.warning("received START or SINGLE fragment while in pdu")
                self.reset()

        if packet_type in (Protocol.PacketType.CONTINUE, Protocol.PacketType.END):
            # Also rejects a CONTINUE/END received with no PDU in progress,
            # since self.pdu_id is None in that case.
            if pdu_id != self.pdu_id:
                logger.warning("PID does not match")
                self.reset()
                return
        else:
            self.pdu_id = pdu_id

        self.parameter += parameter

        if packet_type in (Protocol.PacketType.SINGLE, Protocol.PacketType.END):
            self.on_pdu_complete()

    def on_pdu_complete(self) -> None:
        """Deliver the assembled PDU to the callback, then reset."""
        assert self.pdu_id is not None
        try:
            self.callback(self.pdu_id, self.parameter)
        except Exception as error:
            # A failing callback must not leave the assembler in a stale state.
            logger.exception(color(f'!!! exception in callback: {error}', 'red'))

        self.reset()
286
287
288# -----------------------------------------------------------------------------
@dataclass
class Command:
    """Base command: a PDU ID together with its raw parameter bytes."""

    pdu_id: Protocol.PduId
    parameter: bytes

    def to_string(self, properties: Dict[str, str]) -> str:
        """
        Format the command as `Command[<PDU ID name>](<name>=<value>,...)`,
        listing `properties` in iteration order.
        """
        rendered = [f"{key}={text}" for key, text in properties.items()]
        return f"Command[{self.pdu_id.name}]({','.join(rendered)})"

    def __str__(self) -> str:
        # Generic rendering: show the raw parameter bytes as hex.
        parameter_hex = self.parameter.hex()
        return self.to_string({"parameters": parameter_hex})

    def __repr__(self) -> str:
        # Goes through str() so that subclasses overriding __str__ get a
        # matching repr.
        return str(self)
305
306
307# -----------------------------------------------------------------------------
class GetCapabilitiesCommand(Command):
    """GET_CAPABILITIES command, whose parameter is a single capability ID byte."""

    class CapabilityId(OpenIntEnum):
        COMPANY_ID = 0x02
        EVENTS_SUPPORTED = 0x03

    capability_id: CapabilityId

    def __init__(self, capability_id: CapabilityId) -> None:
        encoded_parameter = bytes([capability_id])
        super().__init__(Protocol.PduId.GET_CAPABILITIES, encoded_parameter)
        self.capability_id = capability_id

    @classmethod
    def from_bytes(cls, pdu: bytes) -> GetCapabilitiesCommand:
        """Parse the command from its parameter bytes (first byte only)."""
        capability_id = cls.CapabilityId(pdu[0])
        return cls(capability_id)

    def __str__(self) -> str:
        capability_name = self.capability_id.name
        return self.to_string({"capability_id": capability_name})
325
326
327# -----------------------------------------------------------------------------
class GetPlayStatusCommand(Command):
    """GET_PLAY_STATUS command, which carries no parameter."""

    def __init__(self) -> None:
        super().__init__(pdu_id=Protocol.PduId.GET_PLAY_STATUS, parameter=b'')

    @classmethod
    def from_bytes(cls, _: bytes) -> GetPlayStatusCommand:
        """Build the command; the parameter bytes are ignored."""
        return cls()
335
336
337# -----------------------------------------------------------------------------
class GetElementAttributesCommand(Command):
    """
    GET_ELEMENT_ATTRIBUTES command.

    Parameter layout (big-endian): an 8-byte identifier, a 1-byte attribute
    count, then one 4-byte attribute ID per requested attribute.
    """

    identifier: int
    attribute_ids: List[MediaAttributeId]

    @classmethod
    def from_bytes(cls, pdu: bytes) -> GetElementAttributesCommand:
        """
        Parse the command from its parameter bytes.

        Raises:
            struct.error: if `pdu` is shorter than the layout requires.
        """
        identifier, num_attributes = struct.unpack_from(">QB", pdu)
        # Each attribute ID is 4 bytes wide, matching what __init__ writes;
        # reading single bytes would return the zero padding bytes instead of
        # the IDs.
        attribute_ids = [
            MediaAttributeId(attribute_id)
            for attribute_id in struct.unpack_from(f">{num_attributes}I", pdu, 9)
        ]
        return cls(identifier, attribute_ids)

    def __init__(
        self, identifier: int, attribute_ids: Sequence[MediaAttributeId]
    ) -> None:
        """
        Args:
            identifier: 64-bit element identifier.
            attribute_ids: IDs of the requested attributes.
        """
        parameter = struct.pack(">QB", identifier, len(attribute_ids)) + b''.join(
            [struct.pack(">I", int(attribute_id)) for attribute_id in attribute_ids]
        )
        super().__init__(Protocol.PduId.GET_ELEMENT_ATTRIBUTES, parameter)
        self.identifier = identifier
        self.attribute_ids = list(attribute_ids)
358
359
360# -----------------------------------------------------------------------------
class SetAbsoluteVolumeCommand(Command):
    """SET_ABSOLUTE_VOLUME command, whose parameter is a single volume byte."""

    MAXIMUM_VOLUME = 0x7F

    volume: int

    def __init__(self, volume: int) -> None:
        encoded_volume = bytes([volume])
        super().__init__(Protocol.PduId.SET_ABSOLUTE_VOLUME, encoded_volume)
        self.volume = volume

    @classmethod
    def from_bytes(cls, pdu: bytes) -> SetAbsoluteVolumeCommand:
        """Parse the command from its parameter bytes (first byte only)."""
        volume = pdu[0]
        return cls(volume)

    def __str__(self) -> str:
        return self.to_string({"volume": f"{self.volume}"})
376
377
378# -----------------------------------------------------------------------------
class RegisterNotificationCommand(Command):
    """
    REGISTER_NOTIFICATION command.

    Parameter layout (big-endian): a 1-byte event ID followed by a 4-byte
    playback interval.
    """

    event_id: EventId
    playback_interval: int

    def __init__(self, event_id: EventId, playback_interval: int) -> None:
        encoded_parameter = struct.pack(">BI", int(event_id), playback_interval)
        super().__init__(Protocol.PduId.REGISTER_NOTIFICATION, encoded_parameter)
        self.event_id = event_id
        self.playback_interval = playback_interval

    @classmethod
    def from_bytes(cls, pdu: bytes) -> RegisterNotificationCommand:
        """Parse the command from its parameter bytes."""
        event_id = EventId(pdu[0])
        (playback_interval,) = struct.unpack_from(">I", pdu, 1)
        return cls(event_id, playback_interval)

    def __str__(self) -> str:
        properties = {
            "event_id": self.event_id.name,
            "playback_interval": str(self.playback_interval),
        }
        return self.to_string(properties)
404
405
406# -----------------------------------------------------------------------------
@dataclass
class Response:
    """Base response: a PDU ID together with its raw parameter bytes."""

    pdu_id: Protocol.PduId
    parameter: bytes

    def to_string(self, properties: Dict[str, str]) -> str:
        """
        Format the response as `Response[<PDU ID name>](<name>=<value>,...)`,
        listing `properties` in iteration order.
        """
        rendered = [f"{key}={text}" for key, text in properties.items()]
        return f"Response[{self.pdu_id.name}]({','.join(rendered)})"

    def __str__(self) -> str:
        # Generic rendering: show the raw parameter bytes as hex.
        parameter_hex = self.parameter.hex()
        return self.to_string({"parameter": parameter_hex})

    def __repr__(self) -> str:
        # Goes through str() so that subclasses overriding __str__ get a
        # matching repr.
        return str(self)
423
424
425# -----------------------------------------------------------------------------
class RejectedResponse(Response):
    """Rejected response, whose parameter is a single status code byte."""

    status_code: Protocol.StatusCode

    def __init__(
        self, pdu_id: Protocol.PduId, status_code: Protocol.StatusCode
    ) -> None:
        encoded_status = bytes([int(status_code)])
        super().__init__(pdu_id, encoded_status)
        self.status_code = status_code

    @classmethod
    def from_bytes(cls, pdu_id: Protocol.PduId, pdu: bytes) -> RejectedResponse:
        """Parse the response for `pdu_id` from its parameter bytes."""
        status_code = Protocol.StatusCode(pdu[0])
        return cls(pdu_id, status_code)

    def __str__(self) -> str:
        properties = {"status_code": self.status_code.name}
        return self.to_string(properties)
445
446
447# -----------------------------------------------------------------------------
class NotImplementedResponse(Response):
    """Not-implemented response, keeping the raw bytes that follow the first byte."""

    @classmethod
    def from_bytes(cls, pdu_id: Protocol.PduId, pdu: bytes) -> NotImplementedResponse:
        """Build the response for `pdu_id`, skipping the first byte of `pdu`."""
        remaining_bytes = pdu[1:]
        return cls(pdu_id, remaining_bytes)
452
453
454# -----------------------------------------------------------------------------
class GetCapabilitiesResponse(Response):
    """
    GET_CAPABILITIES response.

    Parameter layout: a 1-byte capability ID, a 1-byte capability count, then
    the capabilities themselves.
    """

    capability_id: GetCapabilitiesCommand.CapabilityId
    capabilities: List[Union[SupportsBytes, bytes]]

    @classmethod
    def from_bytes(cls, pdu: bytes) -> GetCapabilitiesResponse:
        """
        Parse the response from its parameter bytes.

        For EVENTS_SUPPORTED, each capability is a one-byte `EventId`. For any
        other capability ID, the payload is split into `count` equally sized
        `bytes` items.
        """
        if len(pdu) < 2:
            # Possibly a reject response.
            return cls(GetCapabilitiesCommand.CapabilityId(0), [])

        # Assume that the payloads all follow the same pattern:
        #  <CapabilityID><CapabilityCount><Capability*>
        capability_id = GetCapabilitiesCommand.CapabilityId(pdu[0])
        capability_count = pdu[1]

        capabilities: List[Union[SupportsBytes, bytes]]
        if capability_id == GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED:
            capabilities = [EventId(pdu[2 + x]) for x in range(capability_count)]
        else:
            payload = pdu[2:]
            # A zero count would otherwise divide by zero, and a payload
            # shorter than the count would yield a zero-sized item.
            capability_size = (
                len(payload) // capability_count if capability_count else 0
            )
            if capability_size == 0:
                if capability_count:
                    logger.warning("capability payload shorter than capability count")
                capabilities = []
            else:
                # Take exactly `capability_count` items so that trailing bytes
                # do not produce extra or partial capabilities.
                capabilities = [
                    payload[x * capability_size : (x + 1) * capability_size]
                    for x in range(capability_count)
                ]

        return cls(capability_id, capabilities)

    def __init__(
        self,
        capability_id: GetCapabilitiesCommand.CapabilityId,
        capabilities: Sequence[Union[SupportsBytes, bytes]],
    ) -> None:
        """
        Args:
            capability_id: the capability being reported.
            capabilities: items serialized with `bytes()` after the
                capability ID and count bytes.
        """
        super().__init__(
            Protocol.PduId.GET_CAPABILITIES,
            bytes([capability_id, len(capabilities)])
            + b''.join(bytes(capability) for capability in capabilities),
        )
        self.capability_id = capability_id
        self.capabilities = list(capabilities)

    def __str__(self) -> str:
        return self.to_string(
            {
                "capability_id": self.capability_id.name,
                "capabilities": str(self.capabilities),
            }
        )
502
503
504# -----------------------------------------------------------------------------
class GetPlayStatusResponse(Response):
    """
    GET_PLAY_STATUS response.

    Parameter layout (big-endian): a 4-byte song length, a 4-byte song
    position and a 1-byte play status.
    """

    song_length: int
    song_position: int
    play_status: PlayStatus

    def __init__(
        self,
        song_length: int,
        song_position: int,
        play_status: PlayStatus,
    ) -> None:
        encoded_parameter = struct.pack(
            ">IIB", song_length, song_position, int(play_status)
        )
        super().__init__(Protocol.PduId.GET_PLAY_STATUS, encoded_parameter)
        self.song_length = song_length
        self.song_position = song_position
        self.play_status = play_status

    @classmethod
    def from_bytes(cls, pdu: bytes) -> GetPlayStatusResponse:
        """Parse the response from its parameter bytes."""
        song_length, song_position = struct.unpack_from(">II", pdu)
        play_status = PlayStatus(pdu[8])
        return cls(song_length, song_position, play_status)

    def __str__(self) -> str:
        properties = {
            "song_length": str(self.song_length),
            "song_position": str(self.song_position),
            "play_status": self.play_status.name,
        }
        return self.to_string(properties)
539
540
541# -----------------------------------------------------------------------------
class GetElementAttributesResponse(Response):
    """
    GET_ELEMENT_ATTRIBUTES response.

    Parameter layout (big-endian): a 1-byte attribute count, then for each
    attribute a 4-byte attribute ID, a 2-byte character set ID, a 2-byte value
    length and the value bytes.
    """

    attributes: List[MediaAttribute]

    def __init__(self, attributes: Sequence[MediaAttribute]) -> None:
        # Values are always serialized as UTF-8, whatever character set ID the
        # attribute itself carries.
        chunks = [bytes([len(attributes)])]
        for attribute in attributes:
            encoded_value = attribute.attribute_value.encode("utf-8")
            entry_header = struct.pack(
                ">IHH",
                int(attribute.attribute_id),
                int(CharacterSetId.UTF_8),
                len(encoded_value),
            )
            chunks.append(entry_header + encoded_value)

        super().__init__(Protocol.PduId.GET_ELEMENT_ATTRIBUTES, b''.join(chunks))
        self.attributes = list(attributes)

    @classmethod
    def from_bytes(cls, pdu: bytes) -> GetElementAttributesResponse:
        """Parse the response from its parameter bytes."""
        remaining = pdu[0]
        cursor = 1  # position of the next attribute entry
        parsed: List[MediaAttribute] = []
        while remaining > 0:
            raw_attribute_id, raw_character_set_id, value_length = struct.unpack_from(
                ">IHH", pdu, cursor
            )
            # The value starts right after the 8-byte entry header.
            value_start = cursor + 8
            value_end = value_start + value_length
            value_bytes = pdu[value_start:value_end]

            attribute_id = MediaAttributeId(raw_attribute_id)
            character_set_id = CharacterSetId(raw_character_set_id)
            value_text = _decode_attribute_value(value_bytes, character_set_id)
            parsed.append(MediaAttribute(attribute_id, character_set_id, value_text))

            cursor = value_end
            remaining -= 1

        return cls(parsed)

    def __str__(self) -> str:
        joined = ', '.join(str(attribute) for attribute in self.attributes)
        return self.to_string({"attributes": "[" + joined + "]"})
597
598
599# -----------------------------------------------------------------------------
class SetAbsoluteVolumeResponse(Response):
    """SET_ABSOLUTE_VOLUME response, whose parameter is a single volume byte."""

    volume: int

    def __init__(self, volume: int) -> None:
        encoded_volume = bytes([volume])
        super().__init__(Protocol.PduId.SET_ABSOLUTE_VOLUME, encoded_volume)
        self.volume = volume

    @classmethod
    def from_bytes(cls, pdu: bytes) -> SetAbsoluteVolumeResponse:
        """Parse the response from its parameter bytes (first byte only)."""
        volume = pdu[0]
        return cls(volume)

    def __str__(self) -> str:
        return self.to_string({"volume": f"{self.volume}"})
613
614
615# -----------------------------------------------------------------------------
class RegisterNotificationResponse(Response):
    """REGISTER_NOTIFICATION response, whose parameter is a serialized event."""

    event: Event

    def __init__(self, event: Event) -> None:
        serialized_event = bytes(event)
        super().__init__(Protocol.PduId.REGISTER_NOTIFICATION, serialized_event)
        self.event = event

    @classmethod
    def from_bytes(cls, pdu: bytes) -> RegisterNotificationResponse:
        """Parse the response by decoding its parameter bytes as an event."""
        event = Event.from_bytes(pdu)
        return cls(event)

    def __str__(self) -> str:
        properties = {"event": str(self.event)}
        return self.to_string(properties)
636
637
638# -----------------------------------------------------------------------------
class EventId(OpenIntEnum):
    """
    Identifiers of the events handled in this module.

    The value is the first byte of a serialized event (see `Event.from_bytes`)
    and of the `RegisterNotificationCommand` parameter.
    """

    PLAYBACK_STATUS_CHANGED = 0x01
    TRACK_CHANGED = 0x02
    TRACK_REACHED_END = 0x03
    TRACK_REACHED_START = 0x04
    PLAYBACK_POS_CHANGED = 0x05
    BATT_STATUS_CHANGED = 0x06
    SYSTEM_STATUS_CHANGED = 0x07
    PLAYER_APPLICATION_SETTING_CHANGED = 0x08
    NOW_PLAYING_CONTENT_CHANGED = 0x09
    AVAILABLE_PLAYERS_CHANGED = 0x0A
    ADDRESSED_PLAYER_CHANGED = 0x0B
    UIDS_CHANGED = 0x0C
    VOLUME_CHANGED = 0x0D

    def __bytes__(self) -> bytes:
        # Serialize as a single byte holding the integer value.
        return bytes([int(self)])
656
657
658# -----------------------------------------------------------------------------
class CharacterSetId(OpenIntEnum):
    """
    Character set identifiers for media attribute values.

    Only UTF-8 is declared; `_decode_attribute_value` decodes values tagged
    with any other character set as ASCII.
    """

    # NOTE(review): AVRCP character set IDs are presumably IANA MIBenum values,
    # where UTF-8 is 106 (0x6A) rather than 0x06 - confirm against the spec.
    UTF_8 = 0x06
661
662
663# -----------------------------------------------------------------------------
class MediaAttributeId(OpenIntEnum):
    """
    Identifiers of media attributes, as requested by
    `GetElementAttributesCommand` and returned in
    `GetElementAttributesResponse`.
    """

    TITLE = 0x01
    ARTIST_NAME = 0x02
    ALBUM_NAME = 0x03
    TRACK_NUMBER = 0x04
    TOTAL_NUMBER_OF_TRACKS = 0x05
    GENRE = 0x06
    PLAYING_TIME = 0x07
    DEFAULT_COVER_ART = 0x08
673
674
675# -----------------------------------------------------------------------------
@dataclass
class MediaAttribute:
    """One media attribute entry of a `GetElementAttributesResponse`."""

    # Which attribute this entry describes.
    attribute_id: MediaAttributeId
    # Character set ID parsed from the entry (serialization always writes UTF-8).
    character_set_id: CharacterSetId
    # Attribute value, already decoded to a string.
    attribute_value: str
681
682
683# -----------------------------------------------------------------------------
class PlayStatus(OpenIntEnum):
    """
    Play status values, as carried in `GetPlayStatusResponse` and
    `PlaybackStatusChangedEvent`.
    """

    STOPPED = 0x00
    PLAYING = 0x01
    PAUSED = 0x02
    FWD_SEEK = 0x03
    REV_SEEK = 0x04
    ERROR = 0xFF
691
692
693# -----------------------------------------------------------------------------
@dataclass
class SongAndPlayStatus:
    """Song length, song position and play status grouped in one value
    (the same three fields as `GetPlayStatusResponse`)."""

    song_length: int
    song_position: int
    play_status: PlayStatus
699
700
701# -----------------------------------------------------------------------------
class ApplicationSetting:
    """
    Namespace for player application setting enums: the attribute IDs and,
    for each attribute, the enum of its possible values.

    `PlayerApplicationSettingChangedEvent.from_bytes` picks the value enum
    that matches the attribute ID.
    """

    class AttributeId(OpenIntEnum):
        EQUALIZER_ON_OFF = 0x01
        REPEAT_MODE = 0x02
        SHUFFLE_ON_OFF = 0x03
        SCAN_ON_OFF = 0x04

    # Values for AttributeId.EQUALIZER_ON_OFF
    class EqualizerOnOffStatus(OpenIntEnum):
        OFF = 0x01
        ON = 0x02

    # Values for AttributeId.REPEAT_MODE
    class RepeatModeStatus(OpenIntEnum):
        OFF = 0x01
        SINGLE_TRACK_REPEAT = 0x02
        ALL_TRACK_REPEAT = 0x03
        GROUP_REPEAT = 0x04

    # Values for AttributeId.SHUFFLE_ON_OFF
    class ShuffleOnOffStatus(OpenIntEnum):
        OFF = 0x01
        ALL_TRACKS_SHUFFLE = 0x02
        GROUP_SHUFFLE = 0x03

    # Values for AttributeId.SCAN_ON_OFF
    class ScanOnOffStatus(OpenIntEnum):
        OFF = 0x01
        ALL_TRACKS_SCAN = 0x02
        GROUP_SCAN = 0x03

    # Used for values of attribute IDs that have no dedicated enum above.
    class GenericValue(OpenIntEnum):
        pass
731
732
733# -----------------------------------------------------------------------------
@dataclass
class Event:
    """Base class for events; serialized with the event ID as the first byte."""

    event_id: EventId

    @classmethod
    def from_bytes(cls, pdu: bytes) -> Event:
        """
        Parse an event, dispatching on its first byte to the class registered
        in `EVENT_SUBCLASSES` (or to `GenericEvent` when none is registered).
        """
        event_class = EVENT_SUBCLASSES.get(EventId(pdu[0]), GenericEvent)
        return event_class.from_bytes(pdu)

    def __bytes__(self) -> bytes:
        # An event with no payload is just its ID byte.
        event_id_byte = bytes([self.event_id])
        return event_id_byte
746
747
748# -----------------------------------------------------------------------------
@dataclass
class GenericEvent(Event):
    """Event kept as raw bytes: the event ID plus an unparsed payload."""

    data: bytes

    @classmethod
    def from_bytes(cls, pdu: bytes) -> GenericEvent:
        """Split `pdu` into its event ID byte and the remaining payload."""
        event_id = EventId(pdu[0])
        payload = pdu[1:]
        return cls(event_id=event_id, data=payload)

    def __bytes__(self) -> bytes:
        event_id_byte = bytes([self.event_id])
        return event_id_byte + self.data
759
760
761# -----------------------------------------------------------------------------
@dataclass
class PlaybackStatusChangedEvent(Event):
    """PLAYBACK_STATUS_CHANGED event: the event ID byte and a play status byte."""

    play_status: PlayStatus

    def __init__(self, play_status: PlayStatus) -> None:
        super().__init__(EventId.PLAYBACK_STATUS_CHANGED)
        self.play_status = play_status

    @classmethod
    def from_bytes(cls, pdu: bytes) -> PlaybackStatusChangedEvent:
        """Parse the event; the play status is the byte after the event ID."""
        status = PlayStatus(pdu[1])
        return cls(play_status=status)

    def __bytes__(self) -> bytes:
        return bytes([self.event_id, self.play_status])
776
777
778# -----------------------------------------------------------------------------
@dataclass
class PlaybackPositionChangedEvent(Event):
    """
    PLAYBACK_POS_CHANGED event: the event ID byte followed by a 4-byte
    big-endian playback position.
    """

    playback_position: int

    def __init__(self, playback_position: int) -> None:
        super().__init__(EventId.PLAYBACK_POS_CHANGED)
        self.playback_position = playback_position

    @classmethod
    def from_bytes(cls, pdu: bytes) -> PlaybackPositionChangedEvent:
        """Parse the event; the position follows the event ID byte."""
        (position,) = struct.unpack_from(">I", pdu, 1)
        return cls(playback_position=position)

    def __bytes__(self) -> bytes:
        event_id_byte = bytes([self.event_id])
        encoded_position = struct.pack(">I", self.playback_position)
        return event_id_byte + encoded_position
793
794
795# -----------------------------------------------------------------------------
@dataclass
class TrackChangedEvent(Event):
    """TRACK_CHANGED event: the event ID byte followed by the raw identifier bytes."""

    identifier: bytes

    def __init__(self, identifier: bytes) -> None:
        super().__init__(EventId.TRACK_CHANGED)
        self.identifier = identifier

    @classmethod
    def from_bytes(cls, pdu: bytes) -> TrackChangedEvent:
        """Parse the event; everything after the event ID is the identifier."""
        track_identifier = pdu[1:]
        return cls(identifier=track_identifier)

    def __bytes__(self) -> bytes:
        event_id_byte = bytes([self.event_id])
        return event_id_byte + self.identifier
810
811
812# -----------------------------------------------------------------------------
@dataclass
class PlayerApplicationSettingChangedEvent(Event):
    """
    PLAYER_APPLICATION_SETTING_CHANGED event.

    Layout: the event ID byte, a 1-byte setting count, then one
    (attribute ID, value ID) byte pair per setting.
    """

    @dataclass
    class Setting:
        attribute_id: ApplicationSetting.AttributeId
        value_id: OpenIntEnum

    player_application_settings: List[Setting]

    def __init__(self, player_application_settings: Sequence[Setting]) -> None:
        super().__init__(EventId.PLAYER_APPLICATION_SETTING_CHANGED)
        self.player_application_settings = list(player_application_settings)

    @classmethod
    def from_bytes(cls, pdu: bytes) -> PlayerApplicationSettingChangedEvent:
        """Parse the event, typing each value with the enum of its attribute."""
        # Value enum for each known attribute ID; anything else falls back to
        # GenericValue.
        value_types = {
            ApplicationSetting.AttributeId.EQUALIZER_ON_OFF: (
                ApplicationSetting.EqualizerOnOffStatus
            ),
            ApplicationSetting.AttributeId.REPEAT_MODE: (
                ApplicationSetting.RepeatModeStatus
            ),
            ApplicationSetting.AttributeId.SHUFFLE_ON_OFF: (
                ApplicationSetting.ShuffleOnOffStatus
            ),
            ApplicationSetting.AttributeId.SCAN_ON_OFF: (
                ApplicationSetting.ScanOnOffStatus
            ),
        }

        def setting(attribute_id_int: int, value_id_int: int):
            attribute_id = ApplicationSetting.AttributeId(attribute_id_int)
            value_type = value_types.get(attribute_id, ApplicationSetting.GenericValue)
            value_id: OpenIntEnum = value_type(value_id_int)
            return cls.Setting(attribute_id, value_id)

        settings = []
        for index in range(pdu[1]):
            # Pairs start after the event ID and count bytes.
            pair_offset = 2 + (index * 2)
            settings.append(setting(pdu[pair_offset], pdu[pair_offset + 1]))

        return cls(player_application_settings=settings)

    def __bytes__(self) -> bytes:
        header = bytes([self.event_id]) + bytes(
            [len(self.player_application_settings)]
        )
        pairs = [
            bytes([entry.attribute_id, entry.value_id])
            for entry in self.player_application_settings
        ]
        return header + b''.join(pairs)
860
861
862# -----------------------------------------------------------------------------
@dataclass
class NowPlayingContentChangedEvent(Event):
    """NOW_PLAYING_CONTENT_CHANGED event, which has no payload."""

    def __init__(self) -> None:
        super().__init__(event_id=EventId.NOW_PLAYING_CONTENT_CHANGED)

    @classmethod
    def from_bytes(cls, pdu: bytes) -> NowPlayingContentChangedEvent:
        """Build the event; the content of `pdu` is not inspected."""
        return cls()
871
872
873# -----------------------------------------------------------------------------
@dataclass
class AvailablePlayersChangedEvent(Event):
    """AVAILABLE_PLAYERS_CHANGED event, which has no payload."""

    def __init__(self) -> None:
        super().__init__(event_id=EventId.AVAILABLE_PLAYERS_CHANGED)

    @classmethod
    def from_bytes(cls, pdu: bytes) -> AvailablePlayersChangedEvent:
        """Build the event; the content of `pdu` is not inspected."""
        return cls()
882
883
884# -----------------------------------------------------------------------------
@dataclass
class AddressedPlayerChangedEvent(Event):
    """
    ADDRESSED_PLAYER_CHANGED event.

    Layout (big-endian): the event ID byte, a 2-byte player ID and a 2-byte
    UID counter.
    """

    @dataclass
    class Player:
        player_id: int
        uid_counter: int

    # Declared as a dataclass field so that the generated __eq__ and __repr__
    # take the player into account, like the fields of the sibling events.
    player: Player

    @classmethod
    def from_bytes(cls, pdu: bytes) -> AddressedPlayerChangedEvent:
        """Parse the event; the player fields follow the event ID byte."""
        # Big-endian, to match what __bytes__ produces (the previous
        # little-endian unpack returned byte-swapped values).
        player_id, uid_counter = struct.unpack_from(">HH", pdu, 1)
        return cls(cls.Player(player_id, uid_counter))

    def __init__(self, player: Player) -> None:
        super().__init__(EventId.ADDRESSED_PLAYER_CHANGED)
        self.player = player

    def __bytes__(self) -> bytes:
        return bytes([self.event_id]) + struct.pack(
            ">HH", self.player.player_id, self.player.uid_counter
        )
905
906
907# -----------------------------------------------------------------------------
@dataclass
class UidsChangedEvent(Event):
    """Event with ID UIDS_CHANGED, carrying a 16-bit big-endian UID counter."""

    uid_counter: int

    def __init__(self, uid_counter: int) -> None:
        super().__init__(EventId.UIDS_CHANGED)
        self.uid_counter = uid_counter

    @classmethod
    def from_bytes(cls, pdu: bytes) -> UidsChangedEvent:
        """Parse the event, reading the counter that follows the event ID byte."""
        (counter,) = struct.unpack_from(">H", pdu, 1)
        return cls(uid_counter=counter)

    def __bytes__(self) -> bytes:
        """Serialize as the event ID byte followed by the UID counter."""
        event_id_byte = bytes([self.event_id])
        return event_id_byte + struct.pack(">H", self.uid_counter)
922
923
924# -----------------------------------------------------------------------------
@dataclass
class VolumeChangedEvent(Event):
    """Event with ID VOLUME_CHANGED, carrying the volume as a single byte."""

    volume: int

    def __init__(self, volume: int) -> None:
        super().__init__(EventId.VOLUME_CHANGED)
        self.volume = volume

    @classmethod
    def from_bytes(cls, pdu: bytes) -> VolumeChangedEvent:
        """Parse the event, reading the volume byte that follows the event ID."""
        volume = pdu[1]
        return cls(volume=volume)

    def __bytes__(self) -> bytes:
        """Serialize as two bytes: the event ID, then the volume."""
        return bytes([self.event_id, self.volume])
939
940
941# -----------------------------------------------------------------------------
# Maps each event ID to the Event subclass that represents it.
# NOTE(review): presumably used to pick the class when decoding an event from
# bytes; the lookup site is not in this part of the file - confirm.
EVENT_SUBCLASSES: Dict[EventId, Type[Event]] = {
    EventId.PLAYBACK_STATUS_CHANGED: PlaybackStatusChangedEvent,
    EventId.PLAYBACK_POS_CHANGED: PlaybackPositionChangedEvent,
    EventId.TRACK_CHANGED: TrackChangedEvent,
    EventId.PLAYER_APPLICATION_SETTING_CHANGED: PlayerApplicationSettingChangedEvent,
    EventId.NOW_PLAYING_CONTENT_CHANGED: NowPlayingContentChangedEvent,
    EventId.AVAILABLE_PLAYERS_CHANGED: AvailablePlayersChangedEvent,
    EventId.ADDRESSED_PLAYER_CHANGED: AddressedPlayerChangedEvent,
    EventId.UIDS_CHANGED: UidsChangedEvent,
    EventId.VOLUME_CHANGED: VolumeChangedEvent,
}
953
954
955# -----------------------------------------------------------------------------
class Delegate:
    """
    Base class for AVRCP delegates.

    Every method is a coroutine, including those that complete immediately, so
    that subclasses are free to await asynchronous work in their overrides.
    """

    class Error(Exception):
        """Raised by a delegate method to report a failure with a status code."""

        def __init__(self, status_code: Protocol.StatusCode) -> None:
            self.status_code = status_code

    supported_events: List[EventId]
    volume: int

    def __init__(self, supported_events: Iterable[EventId] = ()) -> None:
        """Start with a volume of 0 and a list copy of `supported_events`."""
        self.volume = 0
        self.supported_events = [*supported_events]

    async def get_supported_events(self) -> List[EventId]:
        """Return the list of supported events."""
        events = self.supported_events
        return events

    async def set_absolute_volume(self, volume: int) -> None:
        """
        Set the absolute volume.

        The value is stored in `self.volume`; nothing is returned.
        """
        logger.debug(f"@@@ set_absolute_volume: volume={volume}")
        self.volume = volume

    async def get_absolute_volume(self) -> int:
        """Return the currently stored volume."""
        current_volume = self.volume
        return current_volume

    # TODO add other delegate methods
993
994
995# -----------------------------------------------------------------------------
996class Protocol(pyee.EventEmitter):
997    """AVRCP Controller and Target protocol."""
998
    class PacketType(enum.IntEnum):
        """Packet type codes: a single packet, or the start/continue/end of a series."""

        SINGLE = 0b00
        START = 0b01
        CONTINUE = 0b10
        END = 0b11
1004
    class PduId(OpenIntEnum):
        """PDU identifiers for AVRCP commands and responses."""

        GET_CAPABILITIES = 0x10
        LIST_PLAYER_APPLICATION_SETTING_ATTRIBUTES = 0x11
        LIST_PLAYER_APPLICATION_SETTING_VALUES = 0x12
        GET_CURRENT_PLAYER_APPLICATION_SETTING_VALUE = 0x13
        SET_PLAYER_APPLICATION_SETTING_VALUE = 0x14
        GET_PLAYER_APPLICATION_SETTING_ATTRIBUTE_TEXT = 0x15
        GET_PLAYER_APPLICATION_SETTING_VALUE_TEXT = 0x16
        INFORM_DISPLAYABLE_CHARACTER_SET = 0x17
        INFORM_BATTERY_STATUS_OF_CT = 0x18
        GET_ELEMENT_ATTRIBUTES = 0x20
        GET_PLAY_STATUS = 0x30
        REGISTER_NOTIFICATION = 0x31
        REQUEST_CONTINUING_RESPONSE = 0x40
        ABORT_CONTINUING_RESPONSE = 0x41
        SET_ABSOLUTE_VOLUME = 0x50
        SET_ADDRESSED_PLAYER = 0x60
        SET_BROWSED_PLAYER = 0x70
        GET_FOLDER_ITEMS = 0x71
        GET_TOTAL_NUMBER_OF_ITEMS = 0x75
1025
    class StatusCode(OpenIntEnum):
        """Status codes, e.g. as passed to `send_rejected_avrcp_response`."""

        INVALID_COMMAND = 0x00
        INVALID_PARAMETER = 0x01
        PARAMETER_CONTENT_ERROR = 0x02
        INTERNAL_ERROR = 0x03
        OPERATION_COMPLETED = 0x04
        UID_CHANGED = 0x05
        # NOTE: no member is defined for 0x06.
        INVALID_DIRECTION = 0x07
        NOT_A_DIRECTORY = 0x08
        DOES_NOT_EXIST = 0x09
        INVALID_SCOPE = 0x0A
        RANGE_OUT_OF_BOUNDS = 0x0B
        FOLDER_ITEM_IS_NOT_PLAYABLE = 0x0C
        MEDIA_IN_USE = 0x0D
        NOW_PLAYING_LIST_FULL = 0x0E
        SEARCH_NOT_SUPPORTED = 0x0F
        SEARCH_IN_PROGRESS = 0x10
        INVALID_PLAYER_ID = 0x11
        PLAYER_NOT_BROWSABLE = 0x12
        PLAYER_NOT_ADDRESSED = 0x13
        NO_VALID_SEARCH_RESULTS = 0x14
        NO_AVAILABLE_PLAYERS = 0x15
        ADDRESSED_PLAYER_CHANGED = 0x16
1049
    class InvalidPidError(Exception):
        """A response frame with ipid==1 was received (an invalid PID was used)."""
1052
    class NotPendingError(Exception):
        """There is no pending command for a given transaction label."""
1055
    class MismatchedResponseError(Exception):
        """The response type does not correspond to the request type."""

        def __init__(self, response: Response) -> None:
            # The offending response is kept for inspection by the caller.
            self.response = response
1061
    class UnexpectedResponseTypeError(Exception):
        """The response context is not of the expected kind (e.g. not final)."""

        def __init__(self, response: Protocol.ResponseContext) -> None:
            # The unexpected response context is kept for inspection by the caller.
            self.response = response
1067
    class UnexpectedResponseCodeError(Exception):
        """The response code was not the expected one."""

        def __init__(
            self, response_code: avc.ResponseFrame.ResponseCode, response: Response
        ) -> None:
            # Both the received code and the response are kept for inspection.
            self.response_code = response_code
            self.response = response
1076
    class PendingCommand:
        """A transaction label paired with a future meant to receive its response."""

        response: asyncio.Future

        def __init__(self, transaction_label: int) -> None:
            self.transaction_label = transaction_label
            self.reset()

        def reset(self) -> None:
            """Replace `response` with a new future created on the running loop."""
            # NOTE: requires a running event loop (asyncio.get_running_loop()).
            self.response = asyncio.get_running_loop().create_future()
1086
    @dataclass
    class ReceiveCommandState:
        """Label and command type of the command whose PDU is being received."""

        transaction_label: int
        command_type: avc.CommandFrame.CommandType
1091
    @dataclass
    class ReceiveResponseState:
        """Label and response code of the response whose PDU is being received."""

        transaction_label: int
        response_code: avc.ResponseFrame.ResponseCode
1096
    @dataclass
    class ResponseContext:
        """A response together with the transaction label it belongs to."""

        transaction_label: int
        response: Response
1101
    @dataclass
    class FinalResponse(ResponseContext):
        """A response context that also carries the response code."""

        response_code: avc.ResponseFrame.ResponseCode
1105
    @dataclass
    class InterimResponse(ResponseContext):
        """An interim response, with an awaitable that yields the final response."""

        final: Awaitable[Protocol.FinalResponse]
1109
    @dataclass
    class NotificationListener:
        """A registered notification: the command and the label it arrived with."""

        transaction_label: int
        register_notification_command: RegisterNotificationCommand
1114
    # Instance attribute declarations (values are assigned in __init__).
    delegate: Delegate
    # NOTE(review): not assigned in __init__ as far as this part of the file
    # shows - confirm where (or whether) it is initialized.
    send_transaction_label: int
    command_pdu_assembler: PduAssembler
    receive_command_state: Optional[ReceiveCommandState]
    response_pdu_assembler: PduAssembler
    receive_response_state: Optional[ReceiveResponseState]
    avctp_protocol: Optional[avctp.Protocol]  # None while no channel is open
    free_commands: asyncio.Queue  # Pool of PendingCommand objects not in use
    pending_commands: Dict[int, PendingCommand]  # Pending commands, by label
    notification_listeners: Dict[EventId, NotificationListener]
1125
1126    @staticmethod
1127    def _check_vendor_dependent_frame(
1128        frame: Union[avc.VendorDependentCommandFrame, avc.VendorDependentResponseFrame]
1129    ) -> bool:
1130        if frame.company_id != AVRCP_BLUETOOTH_SIG_COMPANY_ID:
1131            logger.debug("unsupported company id, ignoring")
1132            return False
1133
1134        if frame.subunit_type != avc.Frame.SubunitType.PANEL or frame.subunit_id != 0:
1135            logger.debug("unsupported subunit")
1136            return False
1137
1138        return True
1139
1140    def __init__(self, delegate: Optional[Delegate] = None) -> None:
1141        super().__init__()
1142        self.delegate = delegate if delegate else Delegate()
1143        self.command_pdu_assembler = PduAssembler(self._on_command_pdu)
1144        self.receive_command_state = None
1145        self.response_pdu_assembler = PduAssembler(self._on_response_pdu)
1146        self.receive_response_state = None
1147        self.avctp_protocol = None
1148        self.notification_listeners = {}
1149
1150        # Create an initial pool of free commands
1151        self.pending_commands = {}
1152        self.free_commands = asyncio.Queue()
1153        for transaction_label in range(16):
1154            self.free_commands.put_nowait(self.PendingCommand(transaction_label))
1155
1156    def listen(self, device: Device) -> None:
1157        """
1158        Listen for incoming connections.
1159
1160        A 'connection' event will be emitted when a connection is made, and a 'start'
1161        event will be emitted when the protocol is ready to be used on that connection.
1162        """
1163        device.register_l2cap_server(avctp.AVCTP_PSM, self._on_avctp_connection)
1164
1165    async def connect(self, connection: Connection) -> None:
1166        """
1167        Connect to a peer.
1168        """
1169        avctp_channel = await connection.create_l2cap_channel(
1170            l2cap.ClassicChannelSpec(psm=avctp.AVCTP_PSM)
1171        )
1172        self._on_avctp_channel_open(avctp_channel)
1173
1174    async def _obtain_pending_command(self) -> PendingCommand:
1175        pending_command = await self.free_commands.get()
1176        self.pending_commands[pending_command.transaction_label] = pending_command
1177        return pending_command
1178
1179    def recycle_pending_command(self, pending_command: PendingCommand) -> None:
1180        pending_command.reset()
1181        del self.pending_commands[pending_command.transaction_label]
1182        self.free_commands.put_nowait(pending_command)
1183        logger.debug(f"recycled pending command, {self.free_commands.qsize()} free")
1184
    # Type variable tying `_check_response`'s `expected_type` to its return type.
    _R = TypeVar('_R')
1186
1187    @staticmethod
1188    def _check_response(
1189        response_context: ResponseContext, expected_type: Type[_R]
1190    ) -> _R:
1191        if isinstance(response_context, Protocol.FinalResponse):
1192            if (
1193                response_context.response_code
1194                != avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE
1195            ):
1196                raise Protocol.UnexpectedResponseCodeError(
1197                    response_context.response_code, response_context.response
1198                )
1199
1200            if not (isinstance(response_context.response, expected_type)):
1201                raise Protocol.MismatchedResponseError(response_context.response)
1202
1203            return response_context.response
1204
1205        raise Protocol.UnexpectedResponseTypeError(response_context)
1206
1207    def _delegate_command(
1208        self, transaction_label: int, command: Command, method: Awaitable
1209    ) -> None:
1210        async def call():
1211            try:
1212                await method
1213            except Delegate.Error as error:
1214                self.send_rejected_avrcp_response(
1215                    transaction_label,
1216                    command.pdu_id,
1217                    error.status_code,
1218                )
1219            except Exception:
1220                logger.exception("delegate method raised exception")
1221                self.send_rejected_avrcp_response(
1222                    transaction_label,
1223                    command.pdu_id,
1224                    Protocol.StatusCode.INTERNAL_ERROR,
1225                )
1226
1227        utils.AsyncRunner.spawn(call())
1228
1229    async def get_supported_events(self) -> List[EventId]:
1230        """Get the list of events supported by the connected peer."""
1231        response_context = await self.send_avrcp_command(
1232            avc.CommandFrame.CommandType.STATUS,
1233            GetCapabilitiesCommand(
1234                GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED
1235            ),
1236        )
1237        response = self._check_response(response_context, GetCapabilitiesResponse)
1238        return cast(List[EventId], response.capabilities)
1239
1240    async def get_play_status(self) -> SongAndPlayStatus:
1241        """Get the play status of the connected peer."""
1242        response_context = await self.send_avrcp_command(
1243            avc.CommandFrame.CommandType.STATUS, GetPlayStatusCommand()
1244        )
1245        response = self._check_response(response_context, GetPlayStatusResponse)
1246        return SongAndPlayStatus(
1247            response.song_length, response.song_position, response.play_status
1248        )
1249
1250    async def get_element_attributes(
1251        self, element_identifier: int, attribute_ids: Sequence[MediaAttributeId]
1252    ) -> List[MediaAttribute]:
1253        """Get element attributes from the connected peer."""
1254        response_context = await self.send_avrcp_command(
1255            avc.CommandFrame.CommandType.STATUS,
1256            GetElementAttributesCommand(element_identifier, attribute_ids),
1257        )
1258        response = self._check_response(response_context, GetElementAttributesResponse)
1259        return response.attributes
1260
1261    async def monitor_events(
1262        self, event_id: EventId, playback_interval: int = 0
1263    ) -> AsyncIterator[Event]:
1264        """
1265        Monitor events emitted from a peer.
1266
1267        This generator yields Event objects.
1268        """
1269
1270        def check_response(response) -> Event:
1271            if not isinstance(response, RegisterNotificationResponse):
1272                raise self.MismatchedResponseError(response)
1273
1274            return response.event
1275
1276        while True:
1277            response = await self.send_avrcp_command(
1278                avc.CommandFrame.CommandType.NOTIFY,
1279                RegisterNotificationCommand(event_id, playback_interval),
1280            )
1281
1282            if isinstance(response, self.InterimResponse):
1283                logger.debug(f"interim: {response}")
1284                yield check_response(response.response)
1285
1286                logger.debug("waiting for final response")
1287                response = await response.final
1288
1289            if not isinstance(response, self.FinalResponse):
1290                raise self.UnexpectedResponseTypeError(response)
1291
1292            logger.debug(f"final: {response}")
1293            if response.response_code != avc.ResponseFrame.ResponseCode.CHANGED:
1294                raise self.UnexpectedResponseCodeError(
1295                    response.response_code, response.response
1296                )
1297
1298            yield check_response(response.response)
1299
1300    async def monitor_playback_status(
1301        self,
1302    ) -> AsyncIterator[PlayStatus]:
1303        """Monitor Playback Status changes from the connected peer."""
1304        async for event in self.monitor_events(EventId.PLAYBACK_STATUS_CHANGED, 0):
1305            if not isinstance(event, PlaybackStatusChangedEvent):
1306                logger.warning("unexpected event class")
1307                continue
1308            yield event.play_status
1309
1310    async def monitor_track_changed(
1311        self,
1312    ) -> AsyncIterator[bytes]:
1313        """Monitor Track changes from the connected peer."""
1314        async for event in self.monitor_events(EventId.TRACK_CHANGED, 0):
1315            if not isinstance(event, TrackChangedEvent):
1316                logger.warning("unexpected event class")
1317                continue
1318            yield event.identifier
1319
1320    async def monitor_playback_position(
1321        self, playback_interval: int
1322    ) -> AsyncIterator[int]:
1323        """Monitor Playback Position changes from the connected peer."""
1324        async for event in self.monitor_events(
1325            EventId.PLAYBACK_POS_CHANGED, playback_interval
1326        ):
1327            if not isinstance(event, PlaybackPositionChangedEvent):
1328                logger.warning("unexpected event class")
1329                continue
1330            yield event.playback_position
1331
1332    async def monitor_player_application_settings(
1333        self,
1334    ) -> AsyncIterator[List[PlayerApplicationSettingChangedEvent.Setting]]:
1335        """Monitor Player Application Setting changes from the connected peer."""
1336        async for event in self.monitor_events(
1337            EventId.PLAYER_APPLICATION_SETTING_CHANGED, 0
1338        ):
1339            if not isinstance(event, PlayerApplicationSettingChangedEvent):
1340                logger.warning("unexpected event class")
1341                continue
1342            yield event.player_application_settings
1343
1344    async def monitor_now_playing_content(self) -> AsyncIterator[None]:
1345        """Monitor Now Playing changes from the connected peer."""
1346        async for event in self.monitor_events(EventId.NOW_PLAYING_CONTENT_CHANGED, 0):
1347            if not isinstance(event, NowPlayingContentChangedEvent):
1348                logger.warning("unexpected event class")
1349                continue
1350            yield None
1351
1352    async def monitor_available_players(self) -> AsyncIterator[None]:
1353        """Monitor Available Players changes from the connected peer."""
1354        async for event in self.monitor_events(EventId.AVAILABLE_PLAYERS_CHANGED, 0):
1355            if not isinstance(event, AvailablePlayersChangedEvent):
1356                logger.warning("unexpected event class")
1357                continue
1358            yield None
1359
1360    async def monitor_addressed_player(
1361        self,
1362    ) -> AsyncIterator[AddressedPlayerChangedEvent.Player]:
1363        """Monitor Addressed Player changes from the connected peer."""
1364        async for event in self.monitor_events(EventId.ADDRESSED_PLAYER_CHANGED, 0):
1365            if not isinstance(event, AddressedPlayerChangedEvent):
1366                logger.warning("unexpected event class")
1367                continue
1368            yield event.player
1369
1370    async def monitor_uids(
1371        self,
1372    ) -> AsyncIterator[int]:
1373        """Monitor UID changes from the connected peer."""
1374        async for event in self.monitor_events(EventId.UIDS_CHANGED, 0):
1375            if not isinstance(event, UidsChangedEvent):
1376                logger.warning("unexpected event class")
1377                continue
1378            yield event.uid_counter
1379
1380    async def monitor_volume(
1381        self,
1382    ) -> AsyncIterator[int]:
1383        """Monitor Volume changes from the connected peer."""
1384        async for event in self.monitor_events(EventId.VOLUME_CHANGED, 0):
1385            if not isinstance(event, VolumeChangedEvent):
1386                logger.warning("unexpected event class")
1387                continue
1388            yield event.volume
1389
1390    def notify_event(self, event: Event):
1391        """Notify an event to the connected peer."""
1392        if (listener := self.notification_listeners.get(event.event_id)) is None:
1393            logger.debug(f"no listener for {event.event_id.name}")
1394            return
1395
1396        # Emit the notification.
1397        notification = RegisterNotificationResponse(event)
1398        self.send_avrcp_response(
1399            listener.transaction_label,
1400            avc.ResponseFrame.ResponseCode.CHANGED,
1401            notification,
1402        )
1403
1404        # Remove the listener (they will need to re-register).
1405        del self.notification_listeners[event.event_id]
1406
1407    def notify_playback_status_changed(self, status: PlayStatus) -> None:
1408        """Notify the connected peer of a Playback Status change."""
1409        self.notify_event(PlaybackStatusChangedEvent(status))
1410
1411    def notify_track_changed(self, identifier: bytes) -> None:
1412        """Notify the connected peer of a Track change."""
1413        if len(identifier) != 8:
1414            raise ValueError("identifier must be 8 bytes")
1415        self.notify_event(TrackChangedEvent(identifier))
1416
1417    def notify_playback_position_changed(self, position: int) -> None:
1418        """Notify the connected peer of a Position change."""
1419        self.notify_event(PlaybackPositionChangedEvent(position))
1420
1421    def notify_player_application_settings_changed(
1422        self, settings: Sequence[PlayerApplicationSettingChangedEvent.Setting]
1423    ) -> None:
1424        """Notify the connected peer of an Player Application Setting change."""
1425        self.notify_event(
1426            PlayerApplicationSettingChangedEvent(settings),
1427        )
1428
1429    def notify_now_playing_content_changed(self) -> None:
1430        """Notify the connected peer of a Now Playing change."""
1431        self.notify_event(NowPlayingContentChangedEvent())
1432
1433    def notify_available_players_changed(self) -> None:
1434        """Notify the connected peer of an Available Players change."""
1435        self.notify_event(AvailablePlayersChangedEvent())
1436
1437    def notify_addressed_player_changed(
1438        self, player: AddressedPlayerChangedEvent.Player
1439    ) -> None:
1440        """Notify the connected peer of an Addressed Player change."""
1441        self.notify_event(AddressedPlayerChangedEvent(player))
1442
1443    def notify_uids_changed(self, uid_counter: int) -> None:
1444        """Notify the connected peer of a UID change."""
1445        self.notify_event(UidsChangedEvent(uid_counter))
1446
1447    def notify_volume_changed(self, volume: int) -> None:
1448        """Notify the connected peer of a Volume change."""
1449        self.notify_event(VolumeChangedEvent(volume))
1450
1451    def _register_notification_listener(
1452        self, transaction_label: int, command: RegisterNotificationCommand
1453    ) -> None:
1454        listener = self.NotificationListener(transaction_label, command)
1455        self.notification_listeners[command.event_id] = listener
1456
1457    def _on_avctp_connection(self, l2cap_channel: l2cap.ClassicChannel) -> None:
1458        logger.debug("AVCTP connection established")
1459        l2cap_channel.on("open", lambda: self._on_avctp_channel_open(l2cap_channel))
1460
1461        self.emit("connection")
1462
1463    def _on_avctp_channel_open(self, l2cap_channel: l2cap.ClassicChannel) -> None:
1464        logger.debug("AVCTP channel open")
1465        if self.avctp_protocol is not None:
1466            # TODO: find a better strategy instead of just closing
1467            logger.warning("AVCTP protocol already active, closing connection")
1468            AsyncRunner.spawn(l2cap_channel.disconnect())
1469            return
1470
1471        self.avctp_protocol = avctp.Protocol(l2cap_channel)
1472        self.avctp_protocol.register_command_handler(AVRCP_PID, self._on_avctp_command)
1473        self.avctp_protocol.register_response_handler(
1474            AVRCP_PID, self._on_avctp_response
1475        )
1476        l2cap_channel.on("close", self._on_avctp_channel_close)
1477
1478        self.emit("start")
1479
1480    def _on_avctp_channel_close(self) -> None:
1481        logger.debug("AVCTP channel closed")
1482        self.avctp_protocol = None
1483
1484        self.emit("stop")
1485
1486    def _on_avctp_command(
1487        self, transaction_label: int, command: avc.CommandFrame
1488    ) -> None:
1489        logger.debug(
1490            f"<<< AVCTP Command, transaction_label={transaction_label}: " f"{command}"
1491        )
1492
1493        # Only the PANEL subunit type with subunit ID 0 is supported in this profile.
1494        if (
1495            command.subunit_type != avc.Frame.SubunitType.PANEL
1496            or command.subunit_id != 0
1497        ):
1498            logger.debug("subunit not supported")
1499            self.send_not_implemented_response(transaction_label, command)
1500            return
1501
1502        if isinstance(command, avc.VendorDependentCommandFrame):
1503            if not self._check_vendor_dependent_frame(command):
1504                return
1505
1506            if self.receive_command_state is None:
1507                self.receive_command_state = self.ReceiveCommandState(
1508                    transaction_label=transaction_label, command_type=command.ctype
1509                )
1510            elif (
1511                self.receive_command_state.transaction_label != transaction_label
1512                or self.receive_command_state.command_type != command.ctype
1513            ):
1514                # We're in the middle of some other PDU
1515                logger.warning("received interleaved PDU, resetting state")
1516                self.command_pdu_assembler.reset()
1517                self.receive_command_state = None
1518                return
1519            else:
1520                self.receive_command_state.command_type = command.ctype
1521                self.receive_command_state.transaction_label = transaction_label
1522
1523            self.command_pdu_assembler.on_pdu(command.vendor_dependent_data)
1524            return
1525
1526        if isinstance(command, avc.PassThroughCommandFrame):
1527            # TODO: delegate
1528            response = avc.PassThroughResponseFrame(
1529                avc.ResponseFrame.ResponseCode.ACCEPTED,
1530                avc.Frame.SubunitType.PANEL,
1531                0,
1532                command.state_flag,
1533                command.operation_id,
1534                command.operation_data,
1535            )
1536            self.send_response(transaction_label, response)
1537            return
1538
1539        # TODO handle other types
1540        self.send_not_implemented_response(transaction_label, command)
1541
1542    def _on_avctp_response(
1543        self, transaction_label: int, response: Optional[avc.ResponseFrame]
1544    ) -> None:
1545        logger.debug(
1546            f"<<< AVCTP Response, transaction_label={transaction_label}: {response}"
1547        )
1548
1549        # Check that we have a pending command that matches this response.
1550        if not (pending_command := self.pending_commands.get(transaction_label)):
1551            logger.warning("no pending command with this transaction label")
1552            return
1553
1554        # A None response means an invalid PID was used in the request.
1555        if response is None:
1556            pending_command.response.set_exception(self.InvalidPidError())
1557
1558        if isinstance(response, avc.VendorDependentResponseFrame):
1559            if not self._check_vendor_dependent_frame(response):
1560                return
1561
1562            if self.receive_response_state is None:
1563                self.receive_response_state = self.ReceiveResponseState(
1564                    transaction_label=transaction_label, response_code=response.response
1565                )
1566            elif (
1567                self.receive_response_state.transaction_label != transaction_label
1568                or self.receive_response_state.response_code != response.response
1569            ):
1570                # We're in the middle of some other PDU
1571                logger.warning("received interleaved PDU, resetting state")
1572                self.response_pdu_assembler.reset()
1573                self.receive_response_state = None
1574                return
1575            else:
1576                self.receive_response_state.response_code = response.response
1577                self.receive_response_state.transaction_label = transaction_label
1578
1579            self.response_pdu_assembler.on_pdu(response.vendor_dependent_data)
1580            return
1581
1582        if isinstance(response, avc.PassThroughResponseFrame):
1583            pending_command.response.set_result(response)
1584
1585        # TODO handle other types
1586
1587        self.recycle_pending_command(pending_command)
1588
1589    def _on_command_pdu(self, pdu_id: PduId, pdu: bytes) -> None:
1590        logger.debug(f"<<< AVRCP command PDU [pdu_id={pdu_id.name}]: {pdu.hex()}")
1591
1592        assert self.receive_command_state is not None
1593        transaction_label = self.receive_command_state.transaction_label
1594
1595        # Dispatch the command.
1596        # NOTE: with a small number of supported commands, a manual dispatch like this
1597        # is Ok, but if/when more commands are supported, a lookup dispatch mechanism
1598        # would be more appropriate.
1599        # TODO: switch on ctype
1600        if self.receive_command_state.command_type in (
1601            avc.CommandFrame.CommandType.CONTROL,
1602            avc.CommandFrame.CommandType.STATUS,
1603            avc.CommandFrame.CommandType.NOTIFY,
1604        ):
1605            # TODO: catch exceptions from delegates
1606            if pdu_id == self.PduId.GET_CAPABILITIES:
1607                self._on_get_capabilities_command(
1608                    transaction_label, GetCapabilitiesCommand.from_bytes(pdu)
1609                )
1610            elif pdu_id == self.PduId.SET_ABSOLUTE_VOLUME:
1611                self._on_set_absolute_volume_command(
1612                    transaction_label, SetAbsoluteVolumeCommand.from_bytes(pdu)
1613                )
1614            elif pdu_id == self.PduId.REGISTER_NOTIFICATION:
1615                self._on_register_notification_command(
1616                    transaction_label, RegisterNotificationCommand.from_bytes(pdu)
1617                )
1618            else:
1619                # Not supported.
1620                # TODO: check that this is the right way to respond in this case.
1621                logger.debug("unsupported PDU ID")
1622                self.send_rejected_avrcp_response(
1623                    transaction_label, pdu_id, self.StatusCode.INVALID_PARAMETER
1624                )
1625        else:
1626            logger.debug("unsupported command type")
1627            self.send_rejected_avrcp_response(
1628                transaction_label, pdu_id, self.StatusCode.INVALID_COMMAND
1629            )
1630
1631        self.receive_command_state = None
1632
1633    def _on_response_pdu(self, pdu_id: PduId, pdu: bytes) -> None:
1634        logger.debug(f"<<< AVRCP response PDU [pdu_id={pdu_id.name}]: {pdu.hex()}")
1635
1636        assert self.receive_response_state is not None
1637
1638        transaction_label = self.receive_response_state.transaction_label
1639        response_code = self.receive_response_state.response_code
1640        self.receive_response_state = None
1641
1642        # Check that we have a pending command that matches this response.
1643        if not (pending_command := self.pending_commands.get(transaction_label)):
1644            logger.warning("no pending command with this transaction label")
1645            return
1646
1647        # Convert the PDU bytes into a response object.
1648        # NOTE: with a small number of supported responses, a manual switch like this
1649        # is Ok, but if/when more responses are supported, a lookup mechanism would be
1650        # more appropriate.
1651        response: Optional[Response] = None
1652        if response_code == avc.ResponseFrame.ResponseCode.REJECTED:
1653            response = RejectedResponse.from_bytes(pdu_id, pdu)
1654        elif response_code == avc.ResponseFrame.ResponseCode.NOT_IMPLEMENTED:
1655            response = NotImplementedResponse.from_bytes(pdu_id, pdu)
1656        elif response_code in (
1657            avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE,
1658            avc.ResponseFrame.ResponseCode.INTERIM,
1659            avc.ResponseFrame.ResponseCode.CHANGED,
1660            avc.ResponseFrame.ResponseCode.ACCEPTED,
1661        ):
1662            if pdu_id == self.PduId.GET_CAPABILITIES:
1663                response = GetCapabilitiesResponse.from_bytes(pdu)
1664            elif pdu_id == self.PduId.GET_PLAY_STATUS:
1665                response = GetPlayStatusResponse.from_bytes(pdu)
1666            elif pdu_id == self.PduId.GET_ELEMENT_ATTRIBUTES:
1667                response = GetElementAttributesResponse.from_bytes(pdu)
1668            elif pdu_id == self.PduId.SET_ABSOLUTE_VOLUME:
1669                response = SetAbsoluteVolumeResponse.from_bytes(pdu)
1670            elif pdu_id == self.PduId.REGISTER_NOTIFICATION:
1671                response = RegisterNotificationResponse.from_bytes(pdu)
1672            else:
1673                logger.debug("unexpected PDU ID")
1674                pending_command.response.set_exception(
1675                    ProtocolError(
1676                        error_code=None,
1677                        error_namespace="avrcp",
1678                        details="unexpected PDU ID",
1679                    )
1680                )
1681        else:
1682            logger.debug("unexpected response code")
1683            pending_command.response.set_exception(
1684                ProtocolError(
1685                    error_code=None,
1686                    error_namespace="avrcp",
1687                    details="unexpected response code",
1688                )
1689            )
1690
1691        if response is None:
1692            self.recycle_pending_command(pending_command)
1693            return
1694
1695        logger.debug(f"<<< AVRCP response: {response}")
1696
1697        # Make the response available to the waiter.
1698        if response_code == avc.ResponseFrame.ResponseCode.INTERIM:
1699            pending_interim_response = pending_command.response
1700            pending_command.reset()
1701            pending_interim_response.set_result(
1702                self.InterimResponse(
1703                    pending_command.transaction_label,
1704                    response,
1705                    pending_command.response,
1706                )
1707            )
1708        else:
1709            pending_command.response.set_result(
1710                self.FinalResponse(
1711                    pending_command.transaction_label,
1712                    response,
1713                    response_code,
1714                )
1715            )
1716            self.recycle_pending_command(pending_command)
1717
1718    def send_command(self, transaction_label: int, command: avc.CommandFrame) -> None:
1719        logger.debug(f">>> AVRCP command: {command}")
1720
1721        if self.avctp_protocol is None:
1722            logger.warning("trying to send command while avctp_protocol is None")
1723            return
1724
1725        self.avctp_protocol.send_command(transaction_label, AVRCP_PID, bytes(command))
1726
1727    async def send_passthrough_command(
1728        self, command: avc.PassThroughCommandFrame
1729    ) -> avc.PassThroughResponseFrame:
1730        # Wait for a free command slot.
1731        pending_command = await self._obtain_pending_command()
1732
1733        # Send the command.
1734        self.send_command(pending_command.transaction_label, command)
1735
1736        # Wait for the response.
1737        return await pending_command.response
1738
1739    async def send_key_event(
1740        self, key: avc.PassThroughCommandFrame.OperationId, pressed: bool
1741    ) -> avc.PassThroughResponseFrame:
1742        """Send a key event to the connected peer."""
1743        return await self.send_passthrough_command(
1744            avc.PassThroughCommandFrame(
1745                avc.CommandFrame.CommandType.CONTROL,
1746                avc.Frame.SubunitType.PANEL,
1747                0,
1748                (
1749                    avc.PassThroughFrame.StateFlag.PRESSED
1750                    if pressed
1751                    else avc.PassThroughFrame.StateFlag.RELEASED
1752                ),
1753                key,
1754                b'',
1755            )
1756        )
1757
1758    async def send_avrcp_command(
1759        self, command_type: avc.CommandFrame.CommandType, command: Command
1760    ) -> ResponseContext:
1761        # Wait for a free command slot.
1762        pending_command = await self._obtain_pending_command()
1763
1764        # TODO: fragmentation
1765        # Send the command.
1766        logger.debug(f">>> AVRCP command PDU: {command}")
1767        pdu = (
1768            struct.pack(">BBH", command.pdu_id, 0, len(command.parameter))
1769            + command.parameter
1770        )
1771        command_frame = avc.VendorDependentCommandFrame(
1772            command_type,
1773            avc.Frame.SubunitType.PANEL,
1774            0,
1775            AVRCP_BLUETOOTH_SIG_COMPANY_ID,
1776            pdu,
1777        )
1778        self.send_command(pending_command.transaction_label, command_frame)
1779
1780        # Wait for the response.
1781        return await pending_command.response
1782
1783    def send_response(
1784        self, transaction_label: int, response: avc.ResponseFrame
1785    ) -> None:
1786        assert self.avctp_protocol is not None
1787        logger.debug(f">>> AVRCP response: {response}")
1788        self.avctp_protocol.send_response(transaction_label, AVRCP_PID, bytes(response))
1789
1790    def send_passthrough_response(
1791        self,
1792        transaction_label: int,
1793        command: avc.PassThroughCommandFrame,
1794        response_code: avc.ResponseFrame.ResponseCode,
1795    ):
1796        response = avc.PassThroughResponseFrame(
1797            response_code,
1798            avc.Frame.SubunitType.PANEL,
1799            0,
1800            command.state_flag,
1801            command.operation_id,
1802            command.operation_data,
1803        )
1804        self.send_response(transaction_label, response)
1805
1806    def send_avrcp_response(
1807        self,
1808        transaction_label: int,
1809        response_code: avc.ResponseFrame.ResponseCode,
1810        response: Response,
1811    ) -> None:
1812        # TODO: fragmentation
1813        logger.debug(f">>> AVRCP response PDU: {response}")
1814        pdu = (
1815            struct.pack(">BBH", response.pdu_id, 0, len(response.parameter))
1816            + response.parameter
1817        )
1818        response_frame = avc.VendorDependentResponseFrame(
1819            response_code,
1820            avc.Frame.SubunitType.PANEL,
1821            0,
1822            AVRCP_BLUETOOTH_SIG_COMPANY_ID,
1823            pdu,
1824        )
1825        self.send_response(transaction_label, response_frame)
1826
1827    def send_not_implemented_response(
1828        self, transaction_label: int, command: avc.CommandFrame
1829    ) -> None:
1830        response = avc.ResponseFrame(
1831            avc.ResponseFrame.ResponseCode.NOT_IMPLEMENTED,
1832            command.subunit_type,
1833            command.subunit_id,
1834            command.opcode,
1835            command.operands,
1836        )
1837        self.send_response(transaction_label, response)
1838
1839    def send_rejected_avrcp_response(
1840        self, transaction_label: int, pdu_id: Protocol.PduId, status_code: StatusCode
1841    ) -> None:
1842        self.send_avrcp_response(
1843            transaction_label,
1844            avc.ResponseFrame.ResponseCode.REJECTED,
1845            RejectedResponse(pdu_id, status_code),
1846        )
1847
1848    def _on_get_capabilities_command(
1849        self, transaction_label: int, command: GetCapabilitiesCommand
1850    ) -> None:
1851        logger.debug(f"<<< AVRCP command PDU: {command}")
1852
1853        async def get_supported_events():
1854            if (
1855                command.capability_id
1856                != GetCapabilitiesCommand.CapabilityId.EVENTS_SUPPORTED
1857            ):
1858                raise Protocol.InvalidParameterError
1859
1860            supported_events = await self.delegate.get_supported_events()
1861            self.send_avrcp_response(
1862                transaction_label,
1863                avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE,
1864                GetCapabilitiesResponse(command.capability_id, supported_events),
1865            )
1866
1867        self._delegate_command(transaction_label, command, get_supported_events())
1868
1869    def _on_set_absolute_volume_command(
1870        self, transaction_label: int, command: SetAbsoluteVolumeCommand
1871    ) -> None:
1872        logger.debug(f"<<< AVRCP command PDU: {command}")
1873
1874        async def set_absolute_volume():
1875            await self.delegate.set_absolute_volume(command.volume)
1876            effective_volume = await self.delegate.get_absolute_volume()
1877            self.send_avrcp_response(
1878                transaction_label,
1879                avc.ResponseFrame.ResponseCode.IMPLEMENTED_OR_STABLE,
1880                SetAbsoluteVolumeResponse(effective_volume),
1881            )
1882
1883        self._delegate_command(transaction_label, command, set_absolute_volume())
1884
1885    def _on_register_notification_command(
1886        self, transaction_label: int, command: RegisterNotificationCommand
1887    ) -> None:
1888        logger.debug(f"<<< AVRCP command PDU: {command}")
1889
1890        async def register_notification():
1891            # Check if the event is supported.
1892            supported_events = await self.delegate.get_supported_events()
1893            if command.event_id in supported_events:
1894                if command.event_id == EventId.VOLUME_CHANGED:
1895                    volume = await self.delegate.get_absolute_volume()
1896                    response = RegisterNotificationResponse(VolumeChangedEvent(volume))
1897                    self.send_avrcp_response(
1898                        transaction_label,
1899                        avc.ResponseFrame.ResponseCode.INTERIM,
1900                        response,
1901                    )
1902                    self._register_notification_listener(transaction_label, command)
1903                    return
1904
1905                if command.event_id == EventId.PLAYBACK_STATUS_CHANGED:
1906                    # TODO: testing only, use delegate
1907                    response = RegisterNotificationResponse(
1908                        PlaybackStatusChangedEvent(play_status=PlayStatus.PLAYING)
1909                    )
1910                    self.send_avrcp_response(
1911                        transaction_label,
1912                        avc.ResponseFrame.ResponseCode.INTERIM,
1913                        response,
1914                    )
1915                    self._register_notification_listener(transaction_label, command)
1916                    return
1917
1918        self._delegate_command(transaction_label, command, register_notification())
1919