# Copyright 2021-2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# -----------------------------------------------------------------------------
# Imports
# -----------------------------------------------------------------------------
from __future__ import annotations
import enum
import struct
from typing import Dict, Type, Union, Tuple

from bumble.utils import OpenIntEnum


# -----------------------------------------------------------------------------
class Frame:
    """
    Base class for AV/C frames.

    A frame carries a subunit address (type + ID), an opcode and the raw operand
    bytes. `CommandFrame` and `ResponseFrame` add the first-byte field (ctype or
    response code) that distinguishes commands from responses.
    """

    class SubunitType(enum.IntEnum):
        # AV/C Digital Interface Command Set General Specification Version 4.1
        # Table 7.4
        MONITOR = 0x00
        AUDIO = 0x01
        PRINTER = 0x02
        DISC = 0x03
        TAPE_RECORDER_OR_PLAYER = 0x04
        TUNER = 0x05
        CA = 0x06
        CAMERA = 0x07
        PANEL = 0x09
        BULLETIN_BOARD = 0x0A
        VENDOR_UNIQUE = 0x1C
        EXTENDED = 0x1E
        UNIT = 0x1F

    class OperationCode(OpenIntEnum):
        # 0x00 - 0x0F: Unit and subunit commands
        VENDOR_DEPENDENT = 0x00
        RESERVE = 0x01
        PLUG_INFO = 0x02

        # 0x10 - 0x3F: Unit commands
        DIGITAL_OUTPUT = 0x10
        DIGITAL_INPUT = 0x11
        CHANNEL_USAGE = 0x12
        OUTPUT_PLUG_SIGNAL_FORMAT = 0x18
        INPUT_PLUG_SIGNAL_FORMAT = 0x19
        GENERAL_BUS_SETUP = 0x1F
        CONNECT_AV = 0x20
        DISCONNECT_AV = 0x21
        CONNECTIONS = 0x22
        CONNECT = 0x24
        DISCONNECT = 0x25
        UNIT_INFO = 0x30
        SUBUNIT_INFO = 0x31

        # 0x40 - 0x7F: Subunit commands
        PASS_THROUGH = 0x7C
        GUI_UPDATE = 0x7D
        PUSH_GUI_DATA = 0x7E
        USER_ACTION = 0x7F

        # 0xA0 - 0xBF: Unit and subunit commands
        VERSION = 0xB0
        POWER = 0xB2

    subunit_type: SubunitType
    subunit_id: int
    opcode: OperationCode
    operands: bytes

    @staticmethod
    def subclass(subclass):
        """
        Class decorator that registers an opcode-specific frame class.

        The class name must end with "CommandFrame" or "ResponseFrame". The
        remaining CamelCase prefix is converted to UPPER_SNAKE_CASE and looked up
        in `Frame.OperationCode` (e.g. `PassThroughCommandFrame` -> PASS_THROUGH),
        and the class is stored in the `subclasses` registry of `CommandFrame` or
        `ResponseFrame` accordingly.

        Raises:
            ValueError: if the class name has neither expected suffix.
            KeyError: if the derived name is not an `OperationCode` member.
        """
        # Infer the opcode from the class name
        if subclass.__name__.endswith("CommandFrame"):
            short_name = subclass.__name__.replace("CommandFrame", "")
            category_class = CommandFrame
        elif subclass.__name__.endswith("ResponseFrame"):
            short_name = subclass.__name__.replace("ResponseFrame", "")
            category_class = ResponseFrame
        else:
            raise ValueError(f"invalid subclass name {subclass.__name__}")

        # Split the CamelCase name at each uppercase letter; the trailing
        # sentinel index closes the last word.
        uppercase_indexes = [
            index for index, char in enumerate(short_name) if char.isupper()
        ]
        uppercase_indexes.append(len(short_name))
        words = [
            short_name[start:end].upper()
            for start, end in zip(uppercase_indexes, uppercase_indexes[1:])
        ]
        opcode_name = "_".join(words)
        opcode = Frame.OperationCode[opcode_name]
        category_class.subclasses[opcode] = subclass
        return subclass

    @staticmethod
    def from_bytes(data: bytes) -> Frame:
        """
        Parse a serialized AV/C frame.

        Returns an instance of the class registered for the opcode (see
        `subclass`) when there is one, otherwise a generic `CommandFrame` (first
        nibble value < 8) or `ResponseFrame` (first nibble value >= 8).

        Raises:
            ValueError: if the upper 4 bits of the first byte are not 0, if the
                subunit type is not a known `SubunitType`, or if the subunit ID
                uses a reserved encoding.
            NotImplementedError: if the subunit type is EXTENDED.
        """
        if data[0] >> 4 != 0:
            raise ValueError("first 4 bits must be 0s")

        ctype_or_response = data[0] & 0xF
        subunit_type = Frame.SubunitType(data[1] >> 3)
        subunit_id = data[1] & 7

        if subunit_type == Frame.SubunitType.EXTENDED:
            # Not supported
            raise NotImplementedError("extended subunit types not supported")

        if subunit_id < 5 or subunit_id == 7:
            # 0-4 are plain instance numbers and 7 is the "ignore" value used by
            # unit-addressed frames: in both cases the opcode follows directly.
            opcode_offset = 2
        elif subunit_id == 5:
            # Extended to the next byte
            extension = data[2]
            if extension == 0:
                raise ValueError("extended subunit ID value reserved")
            if extension == 0xFF:
                subunit_id = 5 + 254 + data[3]
                opcode_offset = 4
            else:
                subunit_id = 5 + extension
                opcode_offset = 3
        else:
            # subunit_id == 6
            raise ValueError("reserved subunit ID")

        opcode = Frame.OperationCode(data[opcode_offset])
        operands = data[opcode_offset + 1 :]

        # Look for a registered subclass
        if ctype_or_response < 8:
            # Command
            ctype = CommandFrame.CommandType(ctype_or_response)
            if c_subclass := CommandFrame.subclasses.get(opcode):
                return c_subclass(
                    ctype,
                    subunit_type,
                    subunit_id,
                    *c_subclass.parse_operands(operands),
                )
            return CommandFrame(ctype, subunit_type, subunit_id, opcode, operands)
        else:
            # Response
            response = ResponseFrame.ResponseCode(ctype_or_response)
            if r_subclass := ResponseFrame.subclasses.get(opcode):
                return r_subclass(
                    response,
                    subunit_type,
                    subunit_id,
                    *r_subclass.parse_operands(operands),
                )
            return ResponseFrame(response, subunit_type, subunit_id, opcode, operands)

    def to_bytes(
        self,
        ctype_or_response: Union[CommandFrame.CommandType, ResponseFrame.ResponseCode],
    ) -> bytes:
        """
        Serialize the frame, using `ctype_or_response` as the first byte.

        The subunit type and ID are packed into a single byte, so subunit IDs
        that need the extended encoding are not serialized correctly.
        """
        # TODO: support extended subunit types and ids.
        return (
            bytes(
                [
                    ctype_or_response,
                    (self.subunit_type << 3) | self.subunit_id,
                    self.opcode,
                ]
            )
            + self.operands
        )

    def to_string(self, extra: str) -> str:
        """
        Return a human-readable description of the frame.

        `extra` is inserted verbatim right after the opening parenthesis, before
        the common fields.
        """
        return (
            f"{self.__class__.__name__}({extra}"
            f"subunit_type={self.subunit_type.name}, "
            f"subunit_id=0x{self.subunit_id:02X}, "
            f"opcode={self.opcode.name}, "
            f"operands={self.operands.hex()})"
        )

    def __init__(
        self,
        subunit_type: SubunitType,
        subunit_id: int,
        opcode: OperationCode,
        operands: bytes,
    ) -> None:
        self.subunit_type = subunit_type
        self.subunit_id = subunit_id
        self.opcode = opcode
        self.operands = operands


# -----------------------------------------------------------------------------
class CommandFrame(Frame):
    """AV/C command frame: a `Frame` qualified by a command type (ctype)."""

    class CommandType(OpenIntEnum):
        # AV/C Digital Interface Command Set General Specification Version 4.1
        # Table 7.1
        CONTROL = 0x00
        STATUS = 0x01
        SPECIFIC_INQUIRY = 0x02
        NOTIFY = 0x03
        GENERAL_INQUIRY = 0x04

    # Opcode-specific command classes, populated by the `Frame.subclass` decorator
    subclasses: Dict[Frame.OperationCode, Type[CommandFrame]] = {}
    ctype: CommandType

    @staticmethod
    def parse_operands(operands: bytes) -> Tuple:
        """
        Parse raw operands into the extra constructor arguments of a subclass.

        Not implemented for the generic class; opcode-specific subclasses
        provide it.
        """
        raise NotImplementedError

    def __init__(
        self,
        ctype: CommandType,
        subunit_type: Frame.SubunitType,
        subunit_id: int,
        opcode: Frame.OperationCode,
        operands: bytes,
    ) -> None:
        super().__init__(subunit_type, subunit_id, opcode, operands)
        self.ctype = ctype

    def __bytes__(self):
        return self.to_bytes(self.ctype)

    def __str__(self):
        return self.to_string(f"ctype={self.ctype.name}, ")


# -----------------------------------------------------------------------------
class ResponseFrame(Frame):
    """AV/C response frame: a `Frame` qualified by a response code."""

    class ResponseCode(OpenIntEnum):
        # AV/C Digital Interface Command Set General Specification Version 4.1
        # Table 7.2
        NOT_IMPLEMENTED = 0x08
        ACCEPTED = 0x09
        REJECTED = 0x0A
        IN_TRANSITION = 0x0B
        IMPLEMENTED_OR_STABLE = 0x0C
        CHANGED = 0x0D
        INTERIM = 0x0F

    # Opcode-specific response classes, populated by the `Frame.subclass`
    # decorator
    subclasses: Dict[Frame.OperationCode, Type[ResponseFrame]] = {}
    response: ResponseCode

    @staticmethod
    def parse_operands(operands: bytes) -> Tuple:
        """
        Parse raw operands into the extra constructor arguments of a subclass.

        Not implemented for the generic class; opcode-specific subclasses
        provide it.
        """
        raise NotImplementedError

    def __init__(
        self,
        response: ResponseCode,
        subunit_type: Frame.SubunitType,
        subunit_id: int,
        opcode: Frame.OperationCode,
        operands: bytes,
    ) -> None:
        super().__init__(subunit_type, subunit_id, opcode, operands)
        self.response = response

    def __bytes__(self):
        return self.to_bytes(self.response)

    def __str__(self):
        return self.to_string(f"response={self.response.name}, ")


# -----------------------------------------------------------------------------
class VendorDependentFrame:
    """
    Mixin holding the VENDOR DEPENDENT operands: a 3-byte big-endian company ID
    followed by opaque vendor data.
    """

    company_id: int
    vendor_dependent_data: bytes

    @staticmethod
    def parse_operands(operands: bytes) -> Tuple:
        """Split operands into `(company_id, vendor_dependent_data)`."""
        return (
            # Left-pad the 3-byte company ID to 4 bytes to decode it as a uint32
            struct.unpack(">I", b"\x00" + operands[:3])[0],
            operands[3:],
        )

    def make_operands(self) -> bytes:
        """Serialize the company ID (3 bytes, big-endian) and the vendor data."""
        return struct.pack(">I", self.company_id)[1:] + self.vendor_dependent_data

    def __init__(self, company_id: int, vendor_dependent_data: bytes):
        self.company_id = company_id
        self.vendor_dependent_data = vendor_dependent_data


# -----------------------------------------------------------------------------
@Frame.subclass
class VendorDependentCommandFrame(VendorDependentFrame, CommandFrame):
    """VENDOR DEPENDENT command frame."""

    def __init__(
        self,
        ctype: CommandFrame.CommandType,
        subunit_type: Frame.SubunitType,
        subunit_id: int,
        company_id: int,
        vendor_dependent_data: bytes,
    ) -> None:
        # The mixin must be initialized first: make_operands() reads its fields.
        VendorDependentFrame.__init__(self, company_id, vendor_dependent_data)
        CommandFrame.__init__(
            self,
            ctype,
            subunit_type,
            subunit_id,
            Frame.OperationCode.VENDOR_DEPENDENT,
            self.make_operands(),
        )

    def __str__(self):
        return (
            f"VendorDependentCommandFrame(ctype={self.ctype.name}, "
            f"subunit_type={self.subunit_type.name}, "
            f"subunit_id=0x{self.subunit_id:02X}, "
            f"company_id=0x{self.company_id:06X}, "
            f"vendor_dependent_data={self.vendor_dependent_data.hex()})"
        )


# -----------------------------------------------------------------------------
@Frame.subclass
class VendorDependentResponseFrame(VendorDependentFrame, ResponseFrame):
    """VENDOR DEPENDENT response frame."""

    def __init__(
        self,
        response: ResponseFrame.ResponseCode,
        subunit_type: Frame.SubunitType,
        subunit_id: int,
        company_id: int,
        vendor_dependent_data: bytes,
    ) -> None:
        # The mixin must be initialized first: make_operands() reads its fields.
        VendorDependentFrame.__init__(self, company_id, vendor_dependent_data)
        ResponseFrame.__init__(
            self,
            response,
            subunit_type,
            subunit_id,
            Frame.OperationCode.VENDOR_DEPENDENT,
            self.make_operands(),
        )

    def __str__(self):
        return (
            f"VendorDependentResponseFrame(response={self.response.name}, "
            f"subunit_type={self.subunit_type.name}, "
            f"subunit_id=0x{self.subunit_id:02X}, "
            f"company_id=0x{self.company_id:06X}, "
            f"vendor_dependent_data={self.vendor_dependent_data.hex()})"
        )


# -----------------------------------------------------------------------------
class PassThroughFrame:
    """
    See AV/C Panel Subunit Specification 1.1 - 9.4 PASS THROUGH control command

    Operand layout, as produced by `make_operands`:
      byte 0:  state_flag (bit 7) | operation_id (bits 6..0)
      byte 1:  length of the operation data
      byte 2+: operation data
    """

    class StateFlag(enum.IntEnum):
        PRESSED = 0
        RELEASED = 1

    class OperationId(OpenIntEnum):
        SELECT = 0x00
        UP = 0x01
        DOWN = 0x02
        LEFT = 0x03
        RIGHT = 0x04
        RIGHT_UP = 0x05
        RIGHT_DOWN = 0x06
        LEFT_UP = 0x07
        LEFT_DOWN = 0x08
        ROOT_MENU = 0x09
        SETUP_MENU = 0x0A
        CONTENTS_MENU = 0x0B
        FAVORITE_MENU = 0x0C
        EXIT = 0x0D
        NUMBER_0 = 0x20
        NUMBER_1 = 0x21
        NUMBER_2 = 0x22
        NUMBER_3 = 0x23
        NUMBER_4 = 0x24
        NUMBER_5 = 0x25
        NUMBER_6 = 0x26
        NUMBER_7 = 0x27
        NUMBER_8 = 0x28
        NUMBER_9 = 0x29
        DOT = 0x2A
        ENTER = 0x2B
        CLEAR = 0x2C
        CHANNEL_UP = 0x30
        CHANNEL_DOWN = 0x31
        PREVIOUS_CHANNEL = 0x32
        SOUND_SELECT = 0x33
        INPUT_SELECT = 0x34
        DISPLAY_INFORMATION = 0x35
        HELP = 0x36
        PAGE_UP = 0x37
        PAGE_DOWN = 0x38
        POWER = 0x40
        VOLUME_UP = 0x41
        VOLUME_DOWN = 0x42
        MUTE = 0x43
        PLAY = 0x44
        STOP = 0x45
        PAUSE = 0x46
        RECORD = 0x47
        REWIND = 0x48
        FAST_FORWARD = 0x49
        EJECT = 0x4A
        FORWARD = 0x4B
        BACKWARD = 0x4C
        ANGLE = 0x50
        SUBPICTURE = 0x51
        F1 = 0x71
        F2 = 0x72
        F3 = 0x73
        F4 = 0x74
        F5 = 0x75
        VENDOR_UNIQUE = 0x7E

    state_flag: StateFlag
    operation_id: OperationId
    operation_data: bytes

    @staticmethod
    def parse_operands(operands: bytes) -> Tuple:
        """Split operands into `(state_flag, operation_id, operation_data)`."""
        operation_data_length = operands[1]
        return (
            PassThroughFrame.StateFlag(operands[0] >> 7),
            PassThroughFrame.OperationId(operands[0] & 0x7F),
            # The data starts after the length byte
            operands[2 : 2 + operation_data_length],
        )

    def make_operands(self):
        """Serialize the state flag, operation ID and length-prefixed data."""
        return (
            bytes(
                [
                    (self.state_flag << 7) | self.operation_id,
                    len(self.operation_data),
                ]
            )
            + self.operation_data
        )

    def __init__(
        self,
        state_flag: StateFlag,
        operation_id: OperationId,
        operation_data: bytes,
    ) -> None:
        # The data length is serialized in a single byte
        if len(operation_data) > 255:
            raise ValueError("operation data must be <= 255 bytes")
        self.state_flag = state_flag
        self.operation_id = operation_id
        self.operation_data = operation_data


# -----------------------------------------------------------------------------
@Frame.subclass
class PassThroughCommandFrame(PassThroughFrame, CommandFrame):
    """PASS THROUGH command frame."""

    def __init__(
        self,
        ctype: CommandFrame.CommandType,
        subunit_type: Frame.SubunitType,
        subunit_id: int,
        state_flag: PassThroughFrame.StateFlag,
        operation_id: PassThroughFrame.OperationId,
        operation_data: bytes,
    ) -> None:
        # The mixin must be initialized first: make_operands() reads its fields.
        PassThroughFrame.__init__(self, state_flag, operation_id, operation_data)
        CommandFrame.__init__(
            self,
            ctype,
            subunit_type,
            subunit_id,
            Frame.OperationCode.PASS_THROUGH,
            self.make_operands(),
        )

    def __str__(self):
        return (
            f"PassThroughCommandFrame(ctype={self.ctype.name}, "
            f"subunit_type={self.subunit_type.name}, "
            f"subunit_id=0x{self.subunit_id:02X}, "
            f"state_flag={self.state_flag.name}, "
            f"operation_id={self.operation_id.name}, "
            f"operation_data={self.operation_data.hex()})"
        )


# -----------------------------------------------------------------------------
@Frame.subclass
class PassThroughResponseFrame(PassThroughFrame, ResponseFrame):
    """PASS THROUGH response frame."""

    def __init__(
        self,
        response: ResponseFrame.ResponseCode,
        subunit_type: Frame.SubunitType,
        subunit_id: int,
        state_flag: PassThroughFrame.StateFlag,
        operation_id: PassThroughFrame.OperationId,
        operation_data: bytes,
    ) -> None:
        # The mixin must be initialized first: make_operands() reads its fields.
        PassThroughFrame.__init__(self, state_flag, operation_id, operation_data)
        ResponseFrame.__init__(
            self,
            response,
            subunit_type,
            subunit_id,
            Frame.OperationCode.PASS_THROUGH,
            self.make_operands(),
        )

    def __str__(self):
        return (
            f"PassThroughResponseFrame(response={self.response.name}, "
            f"subunit_type={self.subunit_type.name}, "
            f"subunit_id=0x{self.subunit_id:02X}, "
            f"state_flag={self.state_flag.name}, "
            f"operation_id={self.operation_id.name}, "
            f"operation_data={self.operation_data.hex()})"
        )