• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021-2023 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7#      https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# -----------------------------------------------------------------------------
16# Imports
17# -----------------------------------------------------------------------------
18from __future__ import annotations
19import enum
20import struct
21from typing import Dict, Type, Union, Tuple
22
23from bumble.utils import OpenIntEnum
24
25
26# -----------------------------------------------------------------------------
27class Frame:
28    class SubunitType(enum.IntEnum):
29        # AV/C Digital Interface Command Set General Specification Version 4.1
30        # Table 7.4
31        MONITOR = 0x00
32        AUDIO = 0x01
33        PRINTER = 0x02
34        DISC = 0x03
35        TAPE_RECORDER_OR_PLAYER = 0x04
36        TUNER = 0x05
37        CA = 0x06
38        CAMERA = 0x07
39        PANEL = 0x09
40        BULLETIN_BOARD = 0x0A
41        VENDOR_UNIQUE = 0x1C
42        EXTENDED = 0x1E
43        UNIT = 0x1F
44
45    class OperationCode(OpenIntEnum):
46        # 0x00 - 0x0F: Unit and subunit commands
47        VENDOR_DEPENDENT = 0x00
48        RESERVE = 0x01
49        PLUG_INFO = 0x02
50
51        # 0x10 - 0x3F: Unit commands
52        DIGITAL_OUTPUT = 0x10
53        DIGITAL_INPUT = 0x11
54        CHANNEL_USAGE = 0x12
55        OUTPUT_PLUG_SIGNAL_FORMAT = 0x18
56        INPUT_PLUG_SIGNAL_FORMAT = 0x19
57        GENERAL_BUS_SETUP = 0x1F
58        CONNECT_AV = 0x20
59        DISCONNECT_AV = 0x21
60        CONNECTIONS = 0x22
61        CONNECT = 0x24
62        DISCONNECT = 0x25
63        UNIT_INFO = 0x30
64        SUBUNIT_INFO = 0x31
65
66        # 0x40 - 0x7F: Subunit commands
67        PASS_THROUGH = 0x7C
68        GUI_UPDATE = 0x7D
69        PUSH_GUI_DATA = 0x7E
70        USER_ACTION = 0x7F
71
72        # 0xA0 - 0xBF: Unit and subunit commands
73        VERSION = 0xB0
74        POWER = 0xB2
75
76    subunit_type: SubunitType
77    subunit_id: int
78    opcode: OperationCode
79    operands: bytes
80
81    @staticmethod
82    def subclass(subclass):
83        # Infer the opcode from the class name
84        if subclass.__name__.endswith("CommandFrame"):
85            short_name = subclass.__name__.replace("CommandFrame", "")
86            category_class = CommandFrame
87        elif subclass.__name__.endswith("ResponseFrame"):
88            short_name = subclass.__name__.replace("ResponseFrame", "")
89            category_class = ResponseFrame
90        else:
91            raise ValueError(f"invalid subclass name {subclass.__name__}")
92
93        uppercase_indexes = [
94            i for i in range(len(short_name)) if short_name[i].isupper()
95        ]
96        uppercase_indexes.append(len(short_name))
97        words = [
98            short_name[uppercase_indexes[i] : uppercase_indexes[i + 1]].upper()
99            for i in range(len(uppercase_indexes) - 1)
100        ]
101        opcode_name = "_".join(words)
102        opcode = Frame.OperationCode[opcode_name]
103        category_class.subclasses[opcode] = subclass
104        return subclass
105
106    @staticmethod
107    def from_bytes(data: bytes) -> Frame:
108        if data[0] >> 4 != 0:
109            raise ValueError("first 4 bits must be 0s")
110
111        ctype_or_response = data[0] & 0xF
112        subunit_type = Frame.SubunitType(data[1] >> 3)
113        subunit_id = data[1] & 7
114
115        if subunit_type == Frame.SubunitType.EXTENDED:
116            # Not supported
117            raise NotImplementedError("extended subunit types not supported")
118
119        if subunit_id < 5:
120            opcode_offset = 2
121        elif subunit_id == 5:
122            # Extended to the next byte
123            extension = data[2]
124            if extension == 0:
125                raise ValueError("extended subunit ID value reserved")
126            if extension == 0xFF:
127                subunit_id = 5 + 254 + data[3]
128                opcode_offset = 4
129            else:
130                subunit_id = 5 + extension
131                opcode_offset = 3
132
133        elif subunit_id == 6:
134            raise ValueError("reserved subunit ID")
135
136        opcode = Frame.OperationCode(data[opcode_offset])
137        operands = data[opcode_offset + 1 :]
138
139        # Look for a registered subclass
140        if ctype_or_response < 8:
141            # Command
142            ctype = CommandFrame.CommandType(ctype_or_response)
143            if c_subclass := CommandFrame.subclasses.get(opcode):
144                return c_subclass(
145                    ctype,
146                    subunit_type,
147                    subunit_id,
148                    *c_subclass.parse_operands(operands),
149                )
150            return CommandFrame(ctype, subunit_type, subunit_id, opcode, operands)
151        else:
152            # Response
153            response = ResponseFrame.ResponseCode(ctype_or_response)
154            if r_subclass := ResponseFrame.subclasses.get(opcode):
155                return r_subclass(
156                    response,
157                    subunit_type,
158                    subunit_id,
159                    *r_subclass.parse_operands(operands),
160                )
161            return ResponseFrame(response, subunit_type, subunit_id, opcode, operands)
162
163    def to_bytes(
164        self,
165        ctype_or_response: Union[CommandFrame.CommandType, ResponseFrame.ResponseCode],
166    ) -> bytes:
167        # TODO: support extended subunit types and ids.
168        return (
169            bytes(
170                [
171                    ctype_or_response,
172                    self.subunit_type << 3 | self.subunit_id,
173                    self.opcode,
174                ]
175            )
176            + self.operands
177        )
178
179    def to_string(self, extra: str) -> str:
180        return (
181            f"{self.__class__.__name__}({extra}"
182            f"subunit_type={self.subunit_type.name}, "
183            f"subunit_id=0x{self.subunit_id:02X}, "
184            f"opcode={self.opcode.name}, "
185            f"operands={self.operands.hex()})"
186        )
187
188    def __init__(
189        self,
190        subunit_type: SubunitType,
191        subunit_id: int,
192        opcode: OperationCode,
193        operands: bytes,
194    ) -> None:
195        self.subunit_type = subunit_type
196        self.subunit_id = subunit_id
197        self.opcode = opcode
198        self.operands = operands
199
200
201# -----------------------------------------------------------------------------
202class CommandFrame(Frame):
203    class CommandType(OpenIntEnum):
204        # AV/C Digital Interface Command Set General Specification Version 4.1
205        # Table 7.1
206        CONTROL = 0x00
207        STATUS = 0x01
208        SPECIFIC_INQUIRY = 0x02
209        NOTIFY = 0x03
210        GENERAL_INQUIRY = 0x04
211
212    subclasses: Dict[Frame.OperationCode, Type[CommandFrame]] = {}
213    ctype: CommandType
214
215    @staticmethod
216    def parse_operands(operands: bytes) -> Tuple:
217        raise NotImplementedError
218
219    def __init__(
220        self,
221        ctype: CommandType,
222        subunit_type: Frame.SubunitType,
223        subunit_id: int,
224        opcode: Frame.OperationCode,
225        operands: bytes,
226    ) -> None:
227        super().__init__(subunit_type, subunit_id, opcode, operands)
228        self.ctype = ctype
229
230    def __bytes__(self):
231        return self.to_bytes(self.ctype)
232
233    def __str__(self):
234        return self.to_string(f"ctype={self.ctype.name}, ")
235
236
237# -----------------------------------------------------------------------------
238class ResponseFrame(Frame):
239    class ResponseCode(OpenIntEnum):
240        # AV/C Digital Interface Command Set General Specification Version 4.1
241        # Table 7.2
242        NOT_IMPLEMENTED = 0x08
243        ACCEPTED = 0x09
244        REJECTED = 0x0A
245        IN_TRANSITION = 0x0B
246        IMPLEMENTED_OR_STABLE = 0x0C
247        CHANGED = 0x0D
248        INTERIM = 0x0F
249
250    subclasses: Dict[Frame.OperationCode, Type[ResponseFrame]] = {}
251    response: ResponseCode
252
253    @staticmethod
254    def parse_operands(operands: bytes) -> Tuple:
255        raise NotImplementedError
256
257    def __init__(
258        self,
259        response: ResponseCode,
260        subunit_type: Frame.SubunitType,
261        subunit_id: int,
262        opcode: Frame.OperationCode,
263        operands: bytes,
264    ) -> None:
265        super().__init__(subunit_type, subunit_id, opcode, operands)
266        self.response = response
267
268    def __bytes__(self):
269        return self.to_bytes(self.response)
270
271    def __str__(self):
272        return self.to_string(f"response={self.response.name}, ")
273
274
275# -----------------------------------------------------------------------------
276class VendorDependentFrame:
277    company_id: int
278    vendor_dependent_data: bytes
279
280    @staticmethod
281    def parse_operands(operands: bytes) -> Tuple:
282        return (
283            struct.unpack(">I", b"\x00" + operands[:3])[0],
284            operands[3:],
285        )
286
287    def make_operands(self) -> bytes:
288        return struct.pack(">I", self.company_id)[1:] + self.vendor_dependent_data
289
290    def __init__(self, company_id: int, vendor_dependent_data: bytes):
291        self.company_id = company_id
292        self.vendor_dependent_data = vendor_dependent_data
293
294
295# -----------------------------------------------------------------------------
296@Frame.subclass
297class VendorDependentCommandFrame(VendorDependentFrame, CommandFrame):
298    def __init__(
299        self,
300        ctype: CommandFrame.CommandType,
301        subunit_type: Frame.SubunitType,
302        subunit_id: int,
303        company_id: int,
304        vendor_dependent_data: bytes,
305    ) -> None:
306        VendorDependentFrame.__init__(self, company_id, vendor_dependent_data)
307        CommandFrame.__init__(
308            self,
309            ctype,
310            subunit_type,
311            subunit_id,
312            Frame.OperationCode.VENDOR_DEPENDENT,
313            self.make_operands(),
314        )
315
316    def __str__(self):
317        return (
318            f"VendorDependentCommandFrame(ctype={self.ctype.name}, "
319            f"subunit_type={self.subunit_type.name}, "
320            f"subunit_id=0x{self.subunit_id:02X}, "
321            f"company_id=0x{self.company_id:06X}, "
322            f"vendor_dependent_data={self.vendor_dependent_data.hex()})"
323        )
324
325
326# -----------------------------------------------------------------------------
327@Frame.subclass
328class VendorDependentResponseFrame(VendorDependentFrame, ResponseFrame):
329    def __init__(
330        self,
331        response: ResponseFrame.ResponseCode,
332        subunit_type: Frame.SubunitType,
333        subunit_id: int,
334        company_id: int,
335        vendor_dependent_data: bytes,
336    ) -> None:
337        VendorDependentFrame.__init__(self, company_id, vendor_dependent_data)
338        ResponseFrame.__init__(
339            self,
340            response,
341            subunit_type,
342            subunit_id,
343            Frame.OperationCode.VENDOR_DEPENDENT,
344            self.make_operands(),
345        )
346
347    def __str__(self):
348        return (
349            f"VendorDependentResponseFrame(response={self.response.name}, "
350            f"subunit_type={self.subunit_type.name}, "
351            f"subunit_id=0x{self.subunit_id:02X}, "
352            f"company_id=0x{self.company_id:06X}, "
353            f"vendor_dependent_data={self.vendor_dependent_data.hex()})"
354        )
355
356
357# -----------------------------------------------------------------------------
358class PassThroughFrame:
359    """
360    See AV/C Panel Subunit Specification 1.1 - 9.4 PASS THROUGH control command
361    """
362
363    class StateFlag(enum.IntEnum):
364        PRESSED = 0
365        RELEASED = 1
366
367    class OperationId(OpenIntEnum):
368        SELECT = 0x00
369        UP = 0x01
370        DOWN = 0x01
371        LEFT = 0x03
372        RIGHT = 0x04
373        RIGHT_UP = 0x05
374        RIGHT_DOWN = 0x06
375        LEFT_UP = 0x07
376        LEFT_DOWN = 0x08
377        ROOT_MENU = 0x09
378        SETUP_MENU = 0x0A
379        CONTENTS_MENU = 0x0B
380        FAVORITE_MENU = 0x0C
381        EXIT = 0x0D
382        NUMBER_0 = 0x20
383        NUMBER_1 = 0x21
384        NUMBER_2 = 0x22
385        NUMBER_3 = 0x23
386        NUMBER_4 = 0x24
387        NUMBER_5 = 0x25
388        NUMBER_6 = 0x26
389        NUMBER_7 = 0x27
390        NUMBER_8 = 0x28
391        NUMBER_9 = 0x29
392        DOT = 0x2A
393        ENTER = 0x2B
394        CLEAR = 0x2C
395        CHANNEL_UP = 0x30
396        CHANNEL_DOWN = 0x31
397        PREVIOUS_CHANNEL = 0x32
398        SOUND_SELECT = 0x33
399        INPUT_SELECT = 0x34
400        DISPLAY_INFORMATION = 0x35
401        HELP = 0x36
402        PAGE_UP = 0x37
403        PAGE_DOWN = 0x38
404        POWER = 0x40
405        VOLUME_UP = 0x41
406        VOLUME_DOWN = 0x42
407        MUTE = 0x43
408        PLAY = 0x44
409        STOP = 0x45
410        PAUSE = 0x46
411        RECORD = 0x47
412        REWIND = 0x48
413        FAST_FORWARD = 0x49
414        EJECT = 0x4A
415        FORWARD = 0x4B
416        BACKWARD = 0x4C
417        ANGLE = 0x50
418        SUBPICTURE = 0x51
419        F1 = 0x71
420        F2 = 0x72
421        F3 = 0x73
422        F4 = 0x74
423        F5 = 0x75
424        VENDOR_UNIQUE = 0x7E
425
426    state_flag: StateFlag
427    operation_id: OperationId
428    operation_data: bytes
429
430    @staticmethod
431    def parse_operands(operands: bytes) -> Tuple:
432        return (
433            PassThroughFrame.StateFlag(operands[0] >> 7),
434            PassThroughFrame.OperationId(operands[0] & 0x7F),
435            operands[1 : 1 + operands[1]],
436        )
437
438    def make_operands(self):
439        return (
440            bytes([self.state_flag << 7 | self.operation_id, len(self.operation_data)])
441            + self.operation_data
442        )
443
444    def __init__(
445        self,
446        state_flag: StateFlag,
447        operation_id: OperationId,
448        operation_data: bytes,
449    ) -> None:
450        if len(operation_data) > 255:
451            raise ValueError("operation data must be <= 255 bytes")
452        self.state_flag = state_flag
453        self.operation_id = operation_id
454        self.operation_data = operation_data
455
456
457# -----------------------------------------------------------------------------
458@Frame.subclass
459class PassThroughCommandFrame(PassThroughFrame, CommandFrame):
460    def __init__(
461        self,
462        ctype: CommandFrame.CommandType,
463        subunit_type: Frame.SubunitType,
464        subunit_id: int,
465        state_flag: PassThroughFrame.StateFlag,
466        operation_id: PassThroughFrame.OperationId,
467        operation_data: bytes,
468    ) -> None:
469        PassThroughFrame.__init__(self, state_flag, operation_id, operation_data)
470        CommandFrame.__init__(
471            self,
472            ctype,
473            subunit_type,
474            subunit_id,
475            Frame.OperationCode.PASS_THROUGH,
476            self.make_operands(),
477        )
478
479    def __str__(self):
480        return (
481            f"PassThroughCommandFrame(ctype={self.ctype.name}, "
482            f"subunit_type={self.subunit_type.name}, "
483            f"subunit_id=0x{self.subunit_id:02X}, "
484            f"state_flag={self.state_flag.name}, "
485            f"operation_id={self.operation_id.name}, "
486            f"operation_data={self.operation_data.hex()})"
487        )
488
489
490# -----------------------------------------------------------------------------
491@Frame.subclass
492class PassThroughResponseFrame(PassThroughFrame, ResponseFrame):
493    def __init__(
494        self,
495        response: ResponseFrame.ResponseCode,
496        subunit_type: Frame.SubunitType,
497        subunit_id: int,
498        state_flag: PassThroughFrame.StateFlag,
499        operation_id: PassThroughFrame.OperationId,
500        operation_data: bytes,
501    ) -> None:
502        PassThroughFrame.__init__(self, state_flag, operation_id, operation_data)
503        ResponseFrame.__init__(
504            self,
505            response,
506            subunit_type,
507            subunit_id,
508            Frame.OperationCode.PASS_THROUGH,
509            self.make_operands(),
510        )
511
512    def __str__(self):
513        return (
514            f"PassThroughResponseFrame(response={self.response.name}, "
515            f"subunit_type={self.subunit_type.name}, "
516            f"subunit_id=0x{self.subunit_id:02X}, "
517            f"state_flag={self.state_flag.name}, "
518            f"operation_id={self.operation_id.name}, "
519            f"operation_data={self.operation_data.hex()})"
520        )
521