1# File generated from avdtp_packets.pdl with the command: 2# pdlc avdtp_packets.pdl | pdl-compiler/scripts/generate_python_backend.py 3# /!\ Do not edit by hand. 4from dataclasses import dataclass, field, fields 5from typing import Optional, List, Tuple, Union 6import enum 7import inspect 8import math 9 10 11@dataclass 12class Packet: 13 payload: Optional[bytes] = field(repr=False, default_factory=bytes, compare=False) 14 15 @classmethod 16 def parse_all(cls, span: bytes) -> "Packet": 17 packet, remain = getattr(cls, "parse")(span) 18 if len(remain) > 0: 19 raise Exception("Unexpected parsing remainder") 20 return packet 21 22 @property 23 def size(self) -> int: 24 pass 25 26 def show(self, prefix: str = ""): 27 print(f"{self.__class__.__name__}") 28 29 def print_val(p: str, pp: str, name: str, align: int, typ, val): 30 if name == "payload": 31 pass 32 33 # Scalar fields. 34 elif typ is int: 35 print(f"{p}{name:{align}} = {val} (0x{val:x})") 36 37 # Byte fields. 38 elif typ is bytes: 39 print(f"{p}{name:{align}} = [", end="") 40 line = "" 41 n_pp = "" 42 for idx, b in enumerate(val): 43 if idx > 0 and idx % 8 == 0: 44 print(f"{n_pp}{line}") 45 line = "" 46 n_pp = pp + (" " * (align + 4)) 47 line += f" {b:02x}" 48 print(f"{n_pp}{line} ]") 49 50 # Enum fields. 51 elif inspect.isclass(typ) and issubclass(typ, enum.IntEnum): 52 print(f"{p}{name:{align}} = {typ.__name__}::{val.name} (0x{val:x})") 53 54 # Struct fields. 55 elif inspect.isclass(typ) and issubclass(typ, globals().get("Packet")): 56 print(f"{p}{name:{align}} = ", end="") 57 val.show(prefix=pp) 58 59 # Array fields. 60 elif getattr(typ, "__origin__", None) == list: 61 print(f"{p}{name:{align}}") 62 last = len(val) - 1 63 align = 5 64 for idx, elt in enumerate(val): 65 n_p = pp + ("├── " if idx != last else "└── ") 66 n_pp = pp + ("│ " if idx != last else " ") 67 print_val(n_p, n_pp, f"[{idx}]", align, typ.__args__[0], val[idx]) 68 69 # Custom fields. 70 elif inspect.isclass(typ): 71 print(f"{p}{name:{align}} = {repr(val)}") 72 73 else: 74 print(f"{p}{name:{align}} = ##{typ}##") 75 76 last = len(fields(self)) - 1 77 align = max(len(f.name) for f in fields(self) if f.name != "payload") 78 79 for idx, f in enumerate(fields(self)): 80 p = prefix + ("├── " if idx != last else "└── ") 81 pp = prefix + ("│ " if idx != last else " ") 82 val = getattr(self, f.name) 83 84 print_val(p, pp, f.name, align, f.type, val) 85 86 87class PacketType(enum.IntEnum): 88 SINGLE_PACKET = 0x0 89 START_PACKET = 0x1 90 CONTINUE_PACKET = 0x2 91 END_PACKET = 0x3 92 93 @staticmethod 94 def from_int(v: int) -> Union[int, "PacketType"]: 95 try: 96 return PacketType(v) 97 except ValueError as exn: 98 raise exn 99 100 101class MessageType(enum.IntEnum): 102 COMMAND = 0x0 103 GENERAL_REJECT = 0x1 104 RESPONSE_ACCEPT = 0x2 105 RESPONSE_REJECT = 0x3 106 107 @staticmethod 108 def from_int(v: int) -> Union[int, "MessageType"]: 109 try: 110 return MessageType(v) 111 except ValueError as exn: 112 raise exn 113 114 115class SignalIdentifier(enum.IntEnum): 116 AVDTP_DISCOVER = 0x1 117 AVDTP_GET_CAPABILITIES = 0x2 118 AVDTP_SET_CONFIGURATION = 0x3 119 AVDTP_GET_CONFIGURATION = 0x4 120 AVDTP_RECONFIGURE = 0x5 121 AVDTP_OPEN = 0x6 122 AVDTP_START = 0x7 123 AVDTP_CLOSE = 0x8 124 AVDTP_SUSPEND = 0x9 125 AVDTP_ABORT = 0xA 126 AVDTP_SECURITY_CONTROL = 0xB 127 AVDTP_GET_ALL_CAPABILITIES = 0xC 128 AVDTP_DELAYREPORT = 0xD 129 130 @staticmethod 131 def from_int(v: int) -> Union[int, "SignalIdentifier"]: 132 try: 133 return SignalIdentifier(v) 134 except ValueError as exn: 135 raise exn 136 137 138class ErrorCode(enum.IntEnum): 139 SUCCESS = 0x0 140 AVDTP_BAD_HEADER_FORMAT = 0x1 141 AVDTP_BAD_LENGTH = 0x11 142 AVDTP_BAD_ACP_SEID = 0x12 143 AVDTP_SEP_IN_USE = 0x13 144 AVDTP_SEP_NOT_IN_USE = 0x14 145 AVDTP_BAD_SERV_CATEGORY = 0x17 146 AVDTP_BAD_PAYLOAD_FORMAT = 0x18 147 AVDTP_NOT_SUPPORTED_COMMAND = 0x19 148 AVDTP_INVALID_CAPABILITIES = 0x1A 149 AVDTP_BAD_RECOVERY_TYPE = 0x22 150 AVDTP_BAD_MEDIA_TRANSPORT_FORMAT = 0x23 151 AVDTP_BAD_RECOVERY_FORMAT = 0x25 152 AVDTP_BAD_ROHC_FORMAT = 0x26 153 AVDTP_BAD_CP_FORMAT = 0x27 154 AVDTP_BAD_MULTIPLEXING_FORMAT = 0x28 155 AVDTP_UNSUPPORTED_CONFIGURATION = 0x29 156 AVDTP_BAD_STATE = 0x31 157 GAVDTP_BAD_SERVICE = 0x80 158 GAVDTP_INSUFFICIENT_RESOURCES = 0x81 159 A2DP_INVALID_CODEC_TYPE = 0xC1 160 A2DP_NOT_SUPPORTED_CODEC_TYPE = 0xC2 161 A2DP_INVALID_SAMPLING_FREQUENCY = 0xC3 162 A2DP_NOT_SUPPORTED_SAMPLING_FREQUENCY = 0xC4 163 A2DP_INVALID_CHANNEL_MODE = 0xC5 164 A2DP_NOT_SUPPORTED_CHANNEL_MODE = 0xC6 165 A2DP_INVALID_SUBBANDS = 0xC7 166 A2DP_NOT_SUPPORTED_SUBBANDS = 0xC8 167 A2DP_INVALID_ALLOCATION_METHOD = 0xC9 168 A2DP_NOT_SUPPORTED_ALLOCATION_METHOD = 0xCA 169 A2DP_INVALID_MINIMUM_BITPOOL_VALUE = 0xCB 170 A2DP_NOT_SUPPORTED_MINIMUM_BITPOOL_VALUE = 0xCC 171 A2DP_INVALID_MAXIMUM_BITPOOL_VALUE = 0xCD 172 A2DP_NOT_SUPPORTED_MAXIMUM_BITPOOL_VALUE = 0xCE 173 A2DP_INVALID_LAYER = 0xCF 174 A2DP_NOT_SUPPORTED_LAYER = 0xD0 175 A2DP_NOT_SUPPORTED_CRC = 0xD1 176 A2DP_NOT_SUPPORTED_MPF = 0xD2 177 A2DP_NOT_SUPPORTED_VBR = 0xD3 178 A2DP_INVALID_BIT_RATE = 0xD4 179 A2DP_NOT_SUPPORTED_BIT_RATE = 0xD5 180 A2DP_INVALID_OBJECT_TYPE = 0xD6 181 A2DP_NOT_SUPPORTED_OBJECT_TYPE = 0xD7 182 A2DP_INVALID_CHANNELS = 0xD8 183 A2DP_NOT_SUPPORTED_CHANNELS = 0xD9 184 A2DP_INVALID_BLOCK_LENGTH = 0xDD 185 A2DP_INVALID_CP_TYPE = 0xE0 186 A2DP_INVALID_CP_FORMAT = 0xE1 187 A2DP_INVALID_CODEC_PARAMETER = 0xE2 188 A2DP_NOT_SUPPORTED_CODEC_PARAMETER = 0xE3 189 190 @staticmethod 191 def from_int(v: int) -> Union[int, "ErrorCode"]: 192 try: 193 return ErrorCode(v) 194 except ValueError as exn: 195 raise exn 196 197 198class Tsep(enum.IntEnum): 199 SOURCE = 0x0 200 SINK = 0x1 201 202 @staticmethod 203 def from_int(v: int) -> Union[int, "Tsep"]: 204 try: 205 return Tsep(v) 206 except ValueError as exn: 207 raise exn 208 209 210class ServiceCategory(enum.IntEnum): 211 MEDIA_TRANSPORT = 0x1 212 REPORTING = 0x2 213 RECOVERY = 0x3 214 CONTENT_PROTECTION = 0x4 215 HEADER_COMPRESSION = 0x5 216 MULTIPLEXING = 0x6 217 MEDIA_CODEC = 0x7 218 DELAY_REPORTING = 0x8 219 220 @staticmethod 221 def from_int(v: int) -> Union[int, "ServiceCategory"]: 222 try: 223 return ServiceCategory(v) 224 except ValueError as exn: 225 raise exn 226 227 228@dataclass 229class SeidInformation(Packet): 230 in_use: int = field(kw_only=True, default=0) 231 acp_seid: int = field(kw_only=True, default=0) 232 tsep: Tsep = field(kw_only=True, default=Tsep.SOURCE) 233 media_type: int = field(kw_only=True, default=0) 234 235 def __post_init__(self): 236 pass 237 238 @staticmethod 239 def parse(span: bytes) -> Tuple["SeidInformation", bytes]: 240 fields = {"payload": None} 241 if len(span) < 2: 242 raise Exception("Invalid packet size") 243 fields["in_use"] = (span[0] >> 1) & 0x1 244 fields["acp_seid"] = (span[0] >> 2) & 0x3F 245 fields["tsep"] = Tsep.from_int((span[1] >> 3) & 0x1) 246 fields["media_type"] = (span[1] >> 4) & 0xF 247 span = span[2:] 248 return SeidInformation(**fields), span 249 250 def serialize(self, payload: bytes = None) -> bytes: 251 _span = bytearray() 252 if self.in_use > 1: 253 print(f"Invalid value for field SeidInformation::in_use: {self.in_use} > 1; the value will be truncated") 254 self.in_use &= 1 255 if self.acp_seid > 63: 256 print( 257 f"Invalid value for field SeidInformation::acp_seid: {self.acp_seid} > 63; the value will be truncated") 258 self.acp_seid &= 63 259 _value = (self.in_use << 1) | (self.acp_seid << 2) 260 _span.append(_value) 261 if self.media_type > 15: 262 print( 263 f"Invalid value for field SeidInformation::media_type: {self.media_type} > 15; the value will be truncated" 264 ) 265 self.media_type &= 15 266 _value = (self.tsep << 3) | (self.media_type << 4) 267 _span.append(_value) 268 return bytes(_span) 269 270 @property 271 def size(self) -> int: 272 return 2 273 274 275@dataclass 276class ServiceCapability(Packet): 277 service_category: ServiceCategory = field(kw_only=True, default=ServiceCategory.MEDIA_TRANSPORT) 278 279 def __post_init__(self): 280 pass 281 282 @staticmethod 283 def parse(span: bytes) -> Tuple["ServiceCapability", bytes]: 284 fields = {"payload": None} 285 if len(span) < 2: 286 raise Exception("Invalid packet size") 287 fields["service_category"] = ServiceCategory.from_int(span[0]) 288 _payload__size = span[1] 289 span = span[2:] 290 if len(span) < _payload__size: 291 raise Exception("Invalid packet size") 292 payload = span[:_payload__size] 293 span = span[_payload__size:] 294 fields["payload"] = payload 295 try: 296 child, remain = MediaTransportCapability.parse(fields.copy(), payload) 297 assert len(remain) == 0 298 return child, span 299 except Exception as exn: 300 pass 301 try: 302 child, remain = ReportingCapability.parse(fields.copy(), payload) 303 assert len(remain) == 0 304 return child, span 305 except Exception as exn: 306 pass 307 try: 308 child, remain = RecoveryCapability.parse(fields.copy(), payload) 309 assert len(remain) == 0 310 return child, span 311 except Exception as exn: 312 pass 313 try: 314 child, remain = ContentProtectionCapability.parse(fields.copy(), payload) 315 assert len(remain) == 0 316 return child, span 317 except Exception as exn: 318 pass 319 try: 320 child, remain = HeaderCompressionCapability.parse(fields.copy(), payload) 321 assert len(remain) == 0 322 return child, span 323 except Exception as exn: 324 pass 325 try: 326 child, remain = MultiplexingCapability.parse(fields.copy(), payload) 327 assert len(remain) == 0 328 return child, span 329 except Exception as exn: 330 pass 331 try: 332 child, remain = MediaCodecCapability.parse(fields.copy(), payload) 333 assert len(remain) == 0 334 return child, span 335 except Exception as exn: 336 pass 337 try: 338 child, remain = DelayReportingCapability.parse(fields.copy(), payload) 339 assert len(remain) == 0 340 return child, span 341 except Exception as exn: 342 pass 343 return ServiceCapability(**fields), span 344 345 def serialize(self, payload: bytes = None) -> bytes: 346 _span = bytearray() 347 _span.append((self.service_category << 0)) 348 _payload_size = len(payload or self.payload or []) 349 if _payload_size > 255: 350 print(f"Invalid length for payload field: {_payload_size} > 255; the packet cannot be generated") 351 raise Exception("Invalid payload length") 352 _span.append((_payload_size << 0)) 353 _span.extend(payload or self.payload or []) 354 return bytes(_span) 355 356 @property 357 def size(self) -> int: 358 return len(self.payload) + 2 359 360 361@dataclass 362class MediaTransportCapability(ServiceCapability): 363 364 def __post_init__(self): 365 self.service_category = ServiceCategory.MEDIA_TRANSPORT 366 367 @staticmethod 368 def parse(fields: dict, span: bytes) -> Tuple["MediaTransportCapability", bytes]: 369 if fields["service_category"] != ServiceCategory.MEDIA_TRANSPORT: 370 raise Exception("Invalid constraint field values") 371 return MediaTransportCapability(**fields), span 372 373 def serialize(self, payload: bytes = None) -> bytes: 374 _span = bytearray() 375 return ServiceCapability.serialize(self, payload=bytes(_span)) 376 377 @property 378 def size(self) -> int: 379 return 0 380 381 382@dataclass 383class ReportingCapability(ServiceCapability): 384 385 def __post_init__(self): 386 self.service_category = ServiceCategory.REPORTING 387 388 @staticmethod 389 def parse(fields: dict, span: bytes) -> Tuple["ReportingCapability", bytes]: 390 if fields["service_category"] != ServiceCategory.REPORTING: 391 raise Exception("Invalid constraint field values") 392 return ReportingCapability(**fields), span 393 394 def serialize(self, payload: bytes = None) -> bytes: 395 _span = bytearray() 396 return ServiceCapability.serialize(self, payload=bytes(_span)) 397 398 @property 399 def size(self) -> int: 400 return 0 401 402 403@dataclass 404class RecoveryCapability(ServiceCapability): 405 recovery_type: int = field(kw_only=True, default=0) 406 maximum_recovery_window_size: int = field(kw_only=True, default=0) 407 maximum_number_of_media_packets_in_parity_code: int = field(kw_only=True, default=0) 408 409 def __post_init__(self): 410 self.service_category = ServiceCategory.RECOVERY 411 412 @staticmethod 413 def parse(fields: dict, span: bytes) -> Tuple["RecoveryCapability", bytes]: 414 if fields["service_category"] != ServiceCategory.RECOVERY: 415 raise Exception("Invalid constraint field values") 416 if len(span) < 3: 417 raise Exception("Invalid packet size") 418 fields["recovery_type"] = span[0] 419 fields["maximum_recovery_window_size"] = span[1] 420 fields["maximum_number_of_media_packets_in_parity_code"] = span[2] 421 span = span[3:] 422 return RecoveryCapability(**fields), span 423 424 def serialize(self, payload: bytes = None) -> bytes: 425 _span = bytearray() 426 if self.recovery_type > 255: 427 print( 428 f"Invalid value for field RecoveryCapability::recovery_type: {self.recovery_type} > 255; the value will be truncated" 429 ) 430 self.recovery_type &= 255 431 _span.append((self.recovery_type << 0)) 432 if self.maximum_recovery_window_size > 255: 433 print( 434 f"Invalid value for field RecoveryCapability::maximum_recovery_window_size: {self.maximum_recovery_window_size} > 255; the value will be truncated" 435 ) 436 self.maximum_recovery_window_size &= 255 437 _span.append((self.maximum_recovery_window_size << 0)) 438 if self.maximum_number_of_media_packets_in_parity_code > 255: 439 print( 440 f"Invalid value for field RecoveryCapability::maximum_number_of_media_packets_in_parity_code: {self.maximum_number_of_media_packets_in_parity_code} > 255; the value will be truncated" 441 ) 442 self.maximum_number_of_media_packets_in_parity_code &= 255 443 _span.append((self.maximum_number_of_media_packets_in_parity_code << 0)) 444 return ServiceCapability.serialize(self, payload=bytes(_span)) 445 446 @property 447 def size(self) -> int: 448 return 3 449 450 451@dataclass 452class ContentProtectionCapability(ServiceCapability): 453 cp_type: int = field(kw_only=True, default=0) 454 455 def __post_init__(self): 456 self.service_category = ServiceCategory.CONTENT_PROTECTION 457 458 @staticmethod 459 def parse(fields: dict, span: bytes) -> Tuple["ContentProtectionCapability", bytes]: 460 if fields["service_category"] != ServiceCategory.CONTENT_PROTECTION: 461 raise Exception("Invalid constraint field values") 462 if len(span) < 2: 463 raise Exception("Invalid packet size") 464 value_ = int.from_bytes(span[0:2], byteorder="little") 465 fields["cp_type"] = value_ 466 span = span[2:] 467 payload = span 468 span = bytes([]) 469 fields["payload"] = payload 470 return ContentProtectionCapability(**fields), span 471 472 def serialize(self, payload: bytes = None) -> bytes: 473 _span = bytearray() 474 if self.cp_type > 65535: 475 print( 476 f"Invalid value for field ContentProtectionCapability::cp_type: {self.cp_type} > 65535; the value will be truncated" 477 ) 478 self.cp_type &= 65535 479 _span.extend(int.to_bytes((self.cp_type << 0), length=2, byteorder="little")) 480 _span.extend(payload or self.payload or []) 481 return ServiceCapability.serialize(self, payload=bytes(_span)) 482 483 @property 484 def size(self) -> int: 485 return len(self.payload) + 2 486 487 488@dataclass 489class HeaderCompressionCapability(ServiceCapability): 490 recovery: int = field(kw_only=True, default=0) 491 media: int = field(kw_only=True, default=0) 492 back_ch: int = field(kw_only=True, default=0) 493 494 def __post_init__(self): 495 self.service_category = ServiceCategory.HEADER_COMPRESSION 496 497 @staticmethod 498 def parse(fields: dict, span: bytes) -> Tuple["HeaderCompressionCapability", bytes]: 499 if fields["service_category"] != ServiceCategory.HEADER_COMPRESSION: 500 raise Exception("Invalid constraint field values") 501 if len(span) < 1: 502 raise Exception("Invalid packet size") 503 fields["recovery"] = (span[0] >> 5) & 0x1 504 fields["media"] = (span[0] >> 6) & 0x1 505 fields["back_ch"] = (span[0] >> 7) & 0x1 506 span = span[1:] 507 return HeaderCompressionCapability(**fields), span 508 509 def serialize(self, payload: bytes = None) -> bytes: 510 _span = bytearray() 511 if self.recovery > 1: 512 print( 513 f"Invalid value for field HeaderCompressionCapability::recovery: {self.recovery} > 1; the value will be truncated" 514 ) 515 self.recovery &= 1 516 if self.media > 1: 517 print( 518 f"Invalid value for field HeaderCompressionCapability::media: {self.media} > 1; the value will be truncated" 519 ) 520 self.media &= 1 521 if self.back_ch > 1: 522 print( 523 f"Invalid value for field HeaderCompressionCapability::back_ch: {self.back_ch} > 1; the value will be truncated" 524 ) 525 self.back_ch &= 1 526 _value = (self.recovery << 5) | (self.media << 6) | (self.back_ch << 7) 527 _span.append(_value) 528 return ServiceCapability.serialize(self, payload=bytes(_span)) 529 530 @property 531 def size(self) -> int: 532 return 1 533 534 535@dataclass 536class MultiplexingCapability(ServiceCapability): 537 frag: int = field(kw_only=True, default=0) 538 539 def __post_init__(self): 540 self.service_category = ServiceCategory.MULTIPLEXING 541 542 @staticmethod 543 def parse(fields: dict, span: bytes) -> Tuple["MultiplexingCapability", bytes]: 544 if fields["service_category"] != ServiceCategory.MULTIPLEXING: 545 raise Exception("Invalid constraint field values") 546 if len(span) < 1: 547 raise Exception("Invalid packet size") 548 fields["frag"] = (span[0] >> 7) & 0x1 549 span = span[1:] 550 payload = span 551 span = bytes([]) 552 fields["payload"] = payload 553 return MultiplexingCapability(**fields), span 554 555 def serialize(self, payload: bytes = None) -> bytes: 556 _span = bytearray() 557 if self.frag > 1: 558 print(f"Invalid value for field MultiplexingCapability::frag: {self.frag} > 1; the value will be truncated") 559 self.frag &= 1 560 _span.append((self.frag << 7)) 561 _span.extend(payload or self.payload or []) 562 return ServiceCapability.serialize(self, payload=bytes(_span)) 563 564 @property 565 def size(self) -> int: 566 return len(self.payload) + 1 567 568 569@dataclass 570class MediaCodecCapability(ServiceCapability): 571 media_type: int = field(kw_only=True, default=0) 572 media_codec_type: int = field(kw_only=True, default=0) 573 media_codec_specific_information_elements: bytearray = field(kw_only=True, default_factory=bytearray) 574 575 def __post_init__(self): 576 self.service_category = ServiceCategory.MEDIA_CODEC 577 578 @staticmethod 579 def parse(fields: dict, span: bytes) -> Tuple["MediaCodecCapability", bytes]: 580 if fields["service_category"] != ServiceCategory.MEDIA_CODEC: 581 raise Exception("Invalid constraint field values") 582 if len(span) < 2: 583 raise Exception("Invalid packet size") 584 fields["media_type"] = (span[0] >> 4) & 0xF 585 fields["media_codec_type"] = span[1] 586 span = span[2:] 587 fields["media_codec_specific_information_elements"] = list(span) 588 span = bytes() 589 return MediaCodecCapability(**fields), span 590 591 def serialize(self, payload: bytes = None) -> bytes: 592 _span = bytearray() 593 if self.media_type > 15: 594 print( 595 f"Invalid value for field MediaCodecCapability::media_type: {self.media_type} > 15; the value will be truncated" 596 ) 597 self.media_type &= 15 598 _span.append((self.media_type << 4)) 599 if self.media_codec_type > 255: 600 print( 601 f"Invalid value for field MediaCodecCapability::media_codec_type: {self.media_codec_type} > 255; the value will be truncated" 602 ) 603 self.media_codec_type &= 255 604 _span.append((self.media_codec_type << 0)) 605 _span.extend(self.media_codec_specific_information_elements) 606 return ServiceCapability.serialize(self, payload=bytes(_span)) 607 608 @property 609 def size(self) -> int: 610 return len(self.media_codec_specific_information_elements) * 1 + 2 611 612 613@dataclass 614class DelayReportingCapability(ServiceCapability): 615 616 def __post_init__(self): 617 self.service_category = ServiceCategory.DELAY_REPORTING 618 619 @staticmethod 620 def parse(fields: dict, span: bytes) -> Tuple["DelayReportingCapability", bytes]: 621 if fields["service_category"] != ServiceCategory.DELAY_REPORTING: 622 raise Exception("Invalid constraint field values") 623 return DelayReportingCapability(**fields), span 624 625 def serialize(self, payload: bytes = None) -> bytes: 626 _span = bytearray() 627 return ServiceCapability.serialize(self, payload=bytes(_span)) 628 629 @property 630 def size(self) -> int: 631 return 0 632 633 634@dataclass 635class SignalingPacket(Packet): 636 message_type: MessageType = field(kw_only=True, default=MessageType.COMMAND) 637 packet_type: PacketType = field(kw_only=True, default=PacketType.SINGLE_PACKET) 638 transaction_label: int = field(kw_only=True, default=0) 639 signal_identifier: SignalIdentifier = field(kw_only=True, default=SignalIdentifier.AVDTP_DISCOVER) 640 641 def __post_init__(self): 642 pass 643 644 @staticmethod 645 def parse(span: bytes) -> Tuple["SignalingPacket", bytes]: 646 fields = {"payload": None} 647 if len(span) < 2: 648 raise Exception("Invalid packet size") 649 fields["message_type"] = MessageType.from_int((span[0] >> 0) & 0x3) 650 fields["packet_type"] = PacketType.from_int((span[0] >> 2) & 0x3) 651 fields["transaction_label"] = (span[0] >> 4) & 0xF 652 fields["signal_identifier"] = SignalIdentifier.from_int((span[1] >> 0) & 0x3F) 653 span = span[2:] 654 payload = span 655 span = bytes([]) 656 fields["payload"] = payload 657 try: 658 child, remain = DiscoverCommand.parse(fields.copy(), payload) 659 assert len(remain) == 0 660 return child, span 661 except Exception as exn: 662 pass 663 try: 664 child, remain = DiscoverResponse.parse(fields.copy(), payload) 665 assert len(remain) == 0 666 return child, span 667 except Exception as exn: 668 pass 669 try: 670 child, remain = DiscoverReject.parse(fields.copy(), payload) 671 assert len(remain) == 0 672 return child, span 673 except Exception as exn: 674 pass 675 try: 676 child, remain = GetCapabilitiesCommand.parse(fields.copy(), payload) 677 assert len(remain) == 0 678 return child, span 679 except Exception as exn: 680 pass 681 try: 682 child, remain = GetCapabilitiesResponse.parse(fields.copy(), payload) 683 assert len(remain) == 0 684 return child, span 685 except Exception as exn: 686 pass 687 try: 688 child, remain = GetCapabilitiesReject.parse(fields.copy(), payload) 689 assert len(remain) == 0 690 return child, span 691 except Exception as exn: 692 pass 693 try: 694 child, remain = GetAllCapabilitiesCommand.parse(fields.copy(), payload) 695 assert len(remain) == 0 696 return child, span 697 except Exception as exn: 698 pass 699 try: 700 child, remain = GetAllCapabilitiesResponse.parse(fields.copy(), payload) 701 assert len(remain) == 0 702 return child, span 703 except Exception as exn: 704 pass 705 try: 706 child, remain = GetAllCapabilitiesReject.parse(fields.copy(), payload) 707 assert len(remain) == 0 708 return child, span 709 except Exception as exn: 710 pass 711 try: 712 child, remain = SetConfigurationCommand.parse(fields.copy(), payload) 713 assert len(remain) == 0 714 return child, span 715 except Exception as exn: 716 pass 717 try: 718 child, remain = SetConfigurationResponse.parse(fields.copy(), payload) 719 assert len(remain) == 0 720 return child, span 721 except Exception as exn: 722 pass 723 try: 724 child, remain = SetConfigurationReject.parse(fields.copy(), payload) 725 assert len(remain) == 0 726 return child, span 727 except Exception as exn: 728 pass 729 try: 730 child, remain = GetConfigurationCommand.parse(fields.copy(), payload) 731 assert len(remain) == 0 732 return child, span 733 except Exception as exn: 734 pass 735 try: 736 child, remain = GetConfigurationResponse.parse(fields.copy(), payload) 737 assert len(remain) == 0 738 return child, span 739 except Exception as exn: 740 pass 741 try: 742 child, remain = GetConfigurationReject.parse(fields.copy(), payload) 743 assert len(remain) == 0 744 return child, span 745 except Exception as exn: 746 pass 747 try: 748 child, remain = ReconfigureCommand.parse(fields.copy(), payload) 749 assert len(remain) == 0 750 return child, span 751 except Exception as exn: 752 pass 753 try: 754 child, remain = ReconfigureResponse.parse(fields.copy(), payload) 755 assert len(remain) == 0 756 return child, span 757 except Exception as exn: 758 pass 759 try: 760 child, remain = ReconfigureReject.parse(fields.copy(), payload) 761 assert len(remain) == 0 762 return child, span 763 except Exception as exn: 764 pass 765 try: 766 child, remain = OpenCommand.parse(fields.copy(), payload) 767 assert len(remain) == 0 768 return child, span 769 except Exception as exn: 770 pass 771 try: 772 child, remain = OpenResponse.parse(fields.copy(), payload) 773 assert len(remain) == 0 774 return child, span 775 except Exception as exn: 776 pass 777 try: 778 child, remain = OpenReject.parse(fields.copy(), payload) 779 assert len(remain) == 0 780 return child, span 781 except Exception as exn: 782 pass 783 try: 784 child, remain = StartCommand.parse(fields.copy(), payload) 785 assert len(remain) == 0 786 return child, span 787 except Exception as exn: 788 pass 789 try: 790 child, remain = StartResponse.parse(fields.copy(), payload) 791 assert len(remain) == 0 792 return child, span 793 except Exception as exn: 794 pass 795 try: 796 child, remain = StartReject.parse(fields.copy(), payload) 797 assert len(remain) == 0 798 return child, span 799 except Exception as exn: 800 pass 801 try: 802 child, remain = CloseCommand.parse(fields.copy(), payload) 803 assert len(remain) == 0 804 return child, span 805 except Exception as exn: 806 pass 807 try: 808 child, remain = CloseResponse.parse(fields.copy(), payload) 809 assert len(remain) == 0 810 return child, span 811 except Exception as exn: 812 pass 813 try: 814 child, remain = CloseReject.parse(fields.copy(), payload) 815 assert len(remain) == 0 816 return child, span 817 except Exception as exn: 818 pass 819 try: 820 child, remain = SuspendCommand.parse(fields.copy(), payload) 821 assert len(remain) == 0 822 return child, span 823 except Exception as exn: 824 pass 825 try: 826 child, remain = SuspendResponse.parse(fields.copy(), payload) 827 assert len(remain) == 0 828 return child, span 829 except Exception as exn: 830 pass 831 try: 832 child, remain = SuspendReject.parse(fields.copy(), payload) 833 assert len(remain) == 0 834 return child, span 835 except Exception as exn: 836 pass 837 try: 838 child, remain = AbortCommand.parse(fields.copy(), payload) 839 assert len(remain) == 0 840 return child, span 841 except Exception as exn: 842 pass 843 try: 844 child, remain = AbortResponse.parse(fields.copy(), payload) 845 assert len(remain) == 0 846 return child, span 847 except Exception as exn: 848 pass 849 try: 850 child, remain = SecurityControlCommand.parse(fields.copy(), payload) 851 assert len(remain) == 0 852 return child, span 853 except Exception as exn: 854 pass 855 try: 856 child, remain = SecurityControlResponse.parse(fields.copy(), payload) 857 assert len(remain) == 0 858 return child, span 859 except Exception as exn: 860 pass 861 try: 862 child, remain = SecurityControlReject.parse(fields.copy(), payload) 863 assert len(remain) == 0 864 return child, span 865 except Exception as exn: 866 pass 867 try: 868 child, remain = GeneralReject.parse(fields.copy(), payload) 869 assert len(remain) == 0 870 return child, span 871 except Exception as exn: 872 pass 873 try: 874 child, remain = DelayReportCommand.parse(fields.copy(), payload) 875 assert len(remain) == 0 876 return child, span 877 except Exception as exn: 878 pass 879 try: 880 child, remain = DelayReportResponse.parse(fields.copy(), payload) 881 assert len(remain) == 0 882 return child, span 883 except Exception as exn: 884 pass 885 try: 886 child, remain = DelayReportReject.parse(fields.copy(), payload) 887 assert len(remain) == 0 888 return child, span 889 except Exception as exn: 890 pass 891 return SignalingPacket(**fields), span 892 893 def serialize(self, payload: bytes = None) -> bytes: 894 _span = bytearray() 895 if self.transaction_label > 15: 896 print( 897 f"Invalid value for field SignalingPacket::transaction_label: {self.transaction_label} > 15; the value will be truncated" 898 ) 899 self.transaction_label &= 15 900 _value = ((self.message_type << 0) | (self.packet_type << 2) | (self.transaction_label << 4)) 901 _span.append(_value) 902 _span.append((self.signal_identifier << 0)) 903 _span.extend(payload or self.payload or []) 904 return bytes(_span) 905 906 @property 907 def size(self) -> int: 908 return len(self.payload) + 2 909 910 911@dataclass 912class DiscoverCommand(SignalingPacket): 913 914 def __post_init__(self): 915 self.message_type = MessageType.COMMAND 916 self.signal_identifier = SignalIdentifier.AVDTP_DISCOVER 917 918 @staticmethod 919 def parse(fields: dict, span: bytes) -> Tuple["DiscoverCommand", bytes]: 920 if (fields["message_type"] != MessageType.COMMAND or 921 fields["signal_identifier"] != SignalIdentifier.AVDTP_DISCOVER): 922 raise Exception("Invalid constraint field values") 923 return DiscoverCommand(**fields), span 924 925 def serialize(self, payload: bytes = None) -> bytes: 926 _span = bytearray() 927 return SignalingPacket.serialize(self, payload=bytes(_span)) 928 929 @property 930 def size(self) -> int: 931 return 0 932 933 934@dataclass 935class DiscoverResponse(SignalingPacket): 936 seid_information: List[SeidInformation] = field(kw_only=True, default_factory=list) 937 938 def __post_init__(self): 939 self.message_type = MessageType.RESPONSE_ACCEPT 940 self.signal_identifier = SignalIdentifier.AVDTP_DISCOVER 941 942 @staticmethod 943 def parse(fields: dict, span: bytes) -> Tuple["DiscoverResponse", bytes]: 944 if (fields["message_type"] != MessageType.RESPONSE_ACCEPT or 945 fields["signal_identifier"] != SignalIdentifier.AVDTP_DISCOVER): 946 raise Exception("Invalid constraint field values") 947 if len(span) % 2 != 0: 948 raise Exception("Array size is not a multiple of the element size") 949 seid_information_count = int(len(span) / 2) 950 seid_information = [] 951 for n in range(seid_information_count): 952 seid_information.append(SeidInformation.parse_all(span[n * 2:(n + 1) * 2])) 953 fields["seid_information"] = seid_information 954 span = bytes() 955 return DiscoverResponse(**fields), span 956 957 def serialize(self, payload: bytes = None) -> bytes: 958 _span = bytearray() 959 for _elt in self.seid_information: 960 _span.extend(_elt.serialize()) 961 return SignalingPacket.serialize(self, payload=bytes(_span)) 962 963 @property 964 def size(self) -> int: 965 return sum([elt.size for elt in self.seid_information]) 966 967 968@dataclass 969class DiscoverReject(SignalingPacket): 970 error_code: ErrorCode = field(kw_only=True, default=ErrorCode.SUCCESS) 971 972 def __post_init__(self): 973 self.message_type = MessageType.RESPONSE_REJECT 974 self.signal_identifier = SignalIdentifier.AVDTP_DISCOVER 975 976 @staticmethod 977 def parse(fields: dict, span: bytes) -> Tuple["DiscoverReject", bytes]: 978 if (fields["message_type"] != MessageType.RESPONSE_REJECT or 979 fields["signal_identifier"] != SignalIdentifier.AVDTP_DISCOVER): 980 raise Exception("Invalid constraint field values") 981 if len(span) < 1: 982 raise Exception("Invalid packet size") 983 fields["error_code"] = ErrorCode.from_int(span[0]) 984 span = span[1:] 985 return DiscoverReject(**fields), span 986 987 def serialize(self, payload: bytes = None) -> bytes: 988 _span = bytearray() 989 _span.append((self.error_code << 0)) 990 return SignalingPacket.serialize(self, payload=bytes(_span)) 991 992 @property 993 def size(self) -> int: 994 return 1 995 996 997@dataclass 998class GetCapabilitiesCommand(SignalingPacket): 999 acp_seid: int = field(kw_only=True, default=0) 1000 1001 def __post_init__(self): 1002 self.message_type = MessageType.COMMAND 1003 self.signal_identifier = SignalIdentifier.AVDTP_GET_CAPABILITIES 1004 1005 @staticmethod 1006 def parse(fields: dict, span: bytes) -> Tuple["GetCapabilitiesCommand", bytes]: 1007 if (fields["message_type"] != MessageType.COMMAND or 1008 fields["signal_identifier"] != SignalIdentifier.AVDTP_GET_CAPABILITIES): 1009 raise Exception("Invalid constraint field values") 1010 if len(span) < 1: 1011 raise Exception("Invalid packet size") 1012 fields["acp_seid"] = (span[0] >> 2) & 0x3F 1013 span = span[1:] 1014 return GetCapabilitiesCommand(**fields), span 1015 1016 def serialize(self, payload: bytes = None) -> bytes: 1017 _span = bytearray() 1018 if self.acp_seid > 63: 1019 print( 1020 f"Invalid value for field GetCapabilitiesCommand::acp_seid: {self.acp_seid} > 63; the value will be truncated" 1021 ) 1022 self.acp_seid &= 63 1023 _span.append((self.acp_seid << 2)) 1024 return SignalingPacket.serialize(self, payload=bytes(_span)) 1025 1026 @property 1027 def size(self) -> int: 1028 return 1 1029 1030 1031@dataclass 1032class GetCapabilitiesResponse(SignalingPacket): 1033 service_capabilities: List[ServiceCapability] = field(kw_only=True, default_factory=list) 1034 1035 def __post_init__(self): 1036 self.message_type = MessageType.RESPONSE_ACCEPT 1037 self.signal_identifier = SignalIdentifier.AVDTP_GET_CAPABILITIES 1038 1039 @staticmethod 1040 def parse(fields: dict, span: bytes) -> Tuple["GetCapabilitiesResponse", bytes]: 1041 if (fields["message_type"] != MessageType.RESPONSE_ACCEPT or 1042 fields["signal_identifier"] != SignalIdentifier.AVDTP_GET_CAPABILITIES): 1043 raise Exception("Invalid constraint field values") 1044 service_capabilities = [] 1045 while len(span) > 0: 1046 element, span = ServiceCapability.parse(span) 1047 service_capabilities.append(element) 1048 fields["service_capabilities"] = service_capabilities 1049 return GetCapabilitiesResponse(**fields), span 1050 1051 def serialize(self, payload: bytes = None) -> bytes: 1052 _span = bytearray() 1053 for _elt in self.service_capabilities: 1054 _span.extend(_elt.serialize()) 1055 return SignalingPacket.serialize(self, payload=bytes(_span)) 1056 1057 @property 1058 def size(self) -> int: 1059 return sum([elt.size for elt in self.service_capabilities]) 1060 1061 1062@dataclass 1063class GetCapabilitiesReject(SignalingPacket): 1064 error_code: ErrorCode = field(kw_only=True, default=ErrorCode.SUCCESS) 1065 1066 def __post_init__(self): 1067 self.message_type = MessageType.RESPONSE_REJECT 1068 self.signal_identifier = SignalIdentifier.AVDTP_GET_CAPABILITIES 1069 1070 @staticmethod 1071 def parse(fields: dict, span: bytes) -> Tuple["GetCapabilitiesReject", bytes]: 1072 if (fields["message_type"] != MessageType.RESPONSE_REJECT or 1073 fields["signal_identifier"] != SignalIdentifier.AVDTP_GET_CAPABILITIES): 1074 raise Exception("Invalid constraint field values") 1075 if len(span) < 1: 1076 raise Exception("Invalid packet size") 1077 fields["error_code"] = ErrorCode.from_int(span[0]) 1078 span = span[1:] 1079 return GetCapabilitiesReject(**fields), span 1080 1081 def serialize(self, payload: bytes = None) -> bytes: 1082 _span = bytearray() 1083 _span.append((self.error_code << 0)) 1084 return SignalingPacket.serialize(self, payload=bytes(_span)) 1085 1086 @property 1087 def size(self) -> int: 1088 return 1 1089 1090 1091@dataclass 1092class GetAllCapabilitiesCommand(SignalingPacket): 1093 acp_seid: int = field(kw_only=True, default=0) 1094 1095 def __post_init__(self): 1096 self.message_type = MessageType.COMMAND 1097 self.signal_identifier = SignalIdentifier.AVDTP_GET_ALL_CAPABILITIES 1098 1099 @staticmethod 1100 def parse(fields: dict, span: bytes) -> Tuple["GetAllCapabilitiesCommand", bytes]: 1101 if (fields["message_type"] != MessageType.COMMAND or 1102 fields["signal_identifier"] != SignalIdentifier.AVDTP_GET_ALL_CAPABILITIES): 1103 raise Exception("Invalid constraint field values") 1104 if len(span) < 1: 1105 raise Exception("Invalid packet size") 1106 fields["acp_seid"] = (span[0] >> 2) & 0x3F 1107 span = span[1:] 1108 return GetAllCapabilitiesCommand(**fields), span 1109 1110 def serialize(self, payload: bytes = None) -> bytes: 1111 _span = bytearray() 1112 if self.acp_seid > 63: 1113 print( 1114 f"Invalid value for field GetAllCapabilitiesCommand::acp_seid: {self.acp_seid} > 63; the value will be truncated" 1115 ) 1116 self.acp_seid &= 63 1117 _span.append((self.acp_seid << 2)) 1118 return SignalingPacket.serialize(self, payload=bytes(_span)) 1119 1120 @property 1121 def size(self) -> int: 1122 return 1 1123 1124 1125@dataclass 1126class GetAllCapabilitiesResponse(SignalingPacket): 1127 service_capabilities: List[ServiceCapability] = field(kw_only=True, default_factory=list) 1128 1129 def __post_init__(self): 1130 self.message_type = MessageType.RESPONSE_ACCEPT 1131 self.signal_identifier = SignalIdentifier.AVDTP_GET_ALL_CAPABILITIES 1132 1133 @staticmethod 1134 def parse(fields: dict, span: bytes) -> Tuple["GetAllCapabilitiesResponse", bytes]: 1135 if (fields["message_type"] != MessageType.RESPONSE_ACCEPT or 1136 fields["signal_identifier"] != SignalIdentifier.AVDTP_GET_ALL_CAPABILITIES): 1137 raise Exception("Invalid constraint field values") 1138 service_capabilities = [] 1139 while len(span) > 0: 1140 element, span = ServiceCapability.parse(span) 1141 service_capabilities.append(element) 1142 fields["service_capabilities"] = service_capabilities 1143 return GetAllCapabilitiesResponse(**fields), span 1144 1145 def serialize(self, payload: bytes = None) -> bytes: 1146 _span = bytearray() 1147 for _elt in self.service_capabilities: 1148 _span.extend(_elt.serialize()) 1149 return SignalingPacket.serialize(self, payload=bytes(_span)) 1150 1151 @property 1152 def size(self) -> int: 1153 return sum([elt.size for elt in self.service_capabilities]) 1154 1155 1156@dataclass 1157class GetAllCapabilitiesReject(SignalingPacket): 1158 error_code: ErrorCode = field(kw_only=True, default=ErrorCode.SUCCESS) 1159 1160 def __post_init__(self): 1161 self.message_type = MessageType.RESPONSE_REJECT 1162 self.signal_identifier = SignalIdentifier.AVDTP_GET_ALL_CAPABILITIES 1163 1164 @staticmethod 1165 def parse(fields: dict, span: bytes) -> Tuple["GetAllCapabilitiesReject", bytes]: 1166 if (fields["message_type"] != MessageType.RESPONSE_REJECT or 1167 fields["signal_identifier"] != SignalIdentifier.AVDTP_GET_ALL_CAPABILITIES): 1168 raise Exception("Invalid constraint field values") 1169 if len(span) < 1: 1170 raise Exception("Invalid packet size") 1171 fields["error_code"] = ErrorCode.from_int(span[0]) 1172 span = span[1:] 1173 return GetAllCapabilitiesReject(**fields), span 1174 1175 def serialize(self, payload: bytes = None) -> bytes: 1176 _span = bytearray() 1177 _span.append((self.error_code << 0)) 1178 return SignalingPacket.serialize(self, payload=bytes(_span)) 1179 1180 @property 1181 def size(self) -> int: 1182 return 1 1183 1184 1185@dataclass 1186class SetConfigurationCommand(SignalingPacket): 1187 acp_seid: int = field(kw_only=True, default=0) 1188 int_seid: int = field(kw_only=True, default=0) 1189 service_capabilities: List[ServiceCapability] = field(kw_only=True, default_factory=list) 1190 1191 def __post_init__(self): 1192 self.message_type = MessageType.COMMAND 1193 self.signal_identifier = SignalIdentifier.AVDTP_SET_CONFIGURATION 1194 1195 @staticmethod 1196 def parse(fields: dict, span: bytes) -> Tuple["SetConfigurationCommand", bytes]: 1197 if (fields["message_type"] != MessageType.COMMAND or 1198 fields["signal_identifier"] != SignalIdentifier.AVDTP_SET_CONFIGURATION): 1199 raise Exception("Invalid constraint field values") 1200 if len(span) < 2: 1201 raise Exception("Invalid packet size") 1202 fields["acp_seid"] = (span[0] >> 2) & 0x3F 1203 fields["int_seid"] = (span[1] >> 2) & 0x3F 1204 span = span[2:] 1205 service_capabilities = [] 1206 while len(span) > 0: 1207 element, span = ServiceCapability.parse(span) 1208 service_capabilities.append(element) 1209 fields["service_capabilities"] = service_capabilities 1210 return SetConfigurationCommand(**fields), span 1211 1212 def serialize(self, payload: bytes = None) -> bytes: 1213 _span = bytearray() 1214 if self.acp_seid > 63: 1215 print( 1216 f"Invalid value for field SetConfigurationCommand::acp_seid: {self.acp_seid} > 63; the value will be truncated" 1217 ) 1218 self.acp_seid &= 63 1219 _span.append((self.acp_seid << 2)) 1220 if self.int_seid > 63: 1221 print( 1222 f"Invalid value for field SetConfigurationCommand::int_seid: {self.int_seid} > 63; the value will be truncated" 1223 ) 1224 self.int_seid &= 63 1225 _span.append((self.int_seid << 2)) 1226 for _elt in self.service_capabilities: 1227 _span.extend(_elt.serialize()) 1228 return SignalingPacket.serialize(self, payload=bytes(_span)) 1229 1230 @property 1231 def size(self) -> int: 1232 return sum([elt.size for elt in self.service_capabilities]) + 2 1233 1234 1235@dataclass 1236class SetConfigurationResponse(SignalingPacket): 1237 1238 def __post_init__(self): 1239 self.message_type = MessageType.RESPONSE_ACCEPT 1240 self.signal_identifier = SignalIdentifier.AVDTP_SET_CONFIGURATION 1241 1242 @staticmethod 1243 def parse(fields: dict, span: bytes) -> Tuple["SetConfigurationResponse", bytes]: 1244 if (fields["message_type"] != MessageType.RESPONSE_ACCEPT or 1245 fields["signal_identifier"] != SignalIdentifier.AVDTP_SET_CONFIGURATION): 1246 raise Exception("Invalid constraint field values") 1247 return SetConfigurationResponse(**fields), span 1248 1249 def serialize(self, payload: bytes = None) -> bytes: 1250 _span = bytearray() 1251 return SignalingPacket.serialize(self, payload=bytes(_span)) 1252 1253 @property 1254 def size(self) -> int: 1255 return 0 1256 1257 1258@dataclass 1259class SetConfigurationReject(SignalingPacket): 1260 service_category: int = field(kw_only=True, default=0) 1261 error_code: ErrorCode = field(kw_only=True, default=ErrorCode.SUCCESS) 1262 1263 def __post_init__(self): 1264 self.message_type = MessageType.RESPONSE_REJECT 1265 self.signal_identifier = SignalIdentifier.AVDTP_SET_CONFIGURATION 1266 1267 @staticmethod 1268 def parse(fields: dict, span: bytes) -> Tuple["SetConfigurationReject", bytes]: 1269 if (fields["message_type"] != MessageType.RESPONSE_REJECT or 1270 fields["signal_identifier"] != SignalIdentifier.AVDTP_SET_CONFIGURATION): 1271 raise Exception("Invalid constraint field values") 1272 if len(span) < 2: 1273 raise Exception("Invalid packet size") 1274 fields["service_category"] = span[0] 1275 fields["error_code"] = ErrorCode.from_int(span[1]) 1276 span = span[2:] 1277 return SetConfigurationReject(**fields), span 1278 1279 def serialize(self, payload: bytes = None) -> bytes: 1280 _span = bytearray() 1281 if self.service_category > 255: 1282 print( 1283 f"Invalid value for field SetConfigurationReject::service_category: {self.service_category} > 255; the value will be truncated" 1284 ) 1285 self.service_category &= 255 1286 _span.append((self.service_category << 0)) 1287 _span.append((self.error_code << 0)) 1288 return SignalingPacket.serialize(self, payload=bytes(_span)) 1289 1290 @property 1291 def size(self) -> int: 1292 return 2 1293 1294 1295@dataclass 1296class GetConfigurationCommand(SignalingPacket): 1297 acp_seid: int = field(kw_only=True, default=0) 1298 1299 def __post_init__(self): 1300 self.message_type = MessageType.COMMAND 1301 self.signal_identifier = SignalIdentifier.AVDTP_GET_CONFIGURATION 1302 1303 @staticmethod 1304 def parse(fields: dict, span: bytes) -> Tuple["GetConfigurationCommand", bytes]: 1305 if (fields["message_type"] != MessageType.COMMAND or 1306 fields["signal_identifier"] != SignalIdentifier.AVDTP_GET_CONFIGURATION): 1307 raise Exception("Invalid constraint field values") 1308 if len(span) < 1: 1309 raise Exception("Invalid packet size") 1310 fields["acp_seid"] = (span[0] >> 2) & 0x3F 1311 span = span[1:] 1312 return GetConfigurationCommand(**fields), span 1313 1314 def serialize(self, payload: bytes = None) -> bytes: 1315 _span = bytearray() 1316 if self.acp_seid > 63: 1317 print( 1318 f"Invalid value for field GetConfigurationCommand::acp_seid: {self.acp_seid} > 63; the value will be truncated" 1319 ) 1320 self.acp_seid &= 63 1321 _span.append((self.acp_seid << 2)) 1322 return SignalingPacket.serialize(self, payload=bytes(_span)) 1323 1324 @property 1325 def size(self) -> int: 1326 return 1 1327 1328 1329@dataclass 1330class GetConfigurationResponse(SignalingPacket): 1331 service_capabilities: List[ServiceCapability] = field(kw_only=True, default_factory=list) 1332 1333 def __post_init__(self): 1334 self.message_type = MessageType.RESPONSE_ACCEPT 1335 self.signal_identifier = SignalIdentifier.AVDTP_GET_CONFIGURATION 1336 1337 @staticmethod 1338 def parse(fields: dict, span: bytes) -> Tuple["GetConfigurationResponse", bytes]: 1339 if (fields["message_type"] != MessageType.RESPONSE_ACCEPT or 1340 fields["signal_identifier"] != SignalIdentifier.AVDTP_GET_CONFIGURATION): 1341 raise Exception("Invalid constraint field values") 1342 service_capabilities = [] 1343 while len(span) > 0: 1344 element, span = ServiceCapability.parse(span) 1345 service_capabilities.append(element) 1346 fields["service_capabilities"] = service_capabilities 1347 return GetConfigurationResponse(**fields), span 1348 1349 def serialize(self, payload: bytes = None) -> bytes: 1350 _span = bytearray() 1351 for _elt in self.service_capabilities: 1352 _span.extend(_elt.serialize()) 1353 return SignalingPacket.serialize(self, payload=bytes(_span)) 1354 1355 @property 1356 def size(self) -> int: 1357 return sum([elt.size for elt in self.service_capabilities]) 1358 1359 1360@dataclass 1361class GetConfigurationReject(SignalingPacket): 1362 error_code: ErrorCode = field(kw_only=True, default=ErrorCode.SUCCESS) 1363 1364 def __post_init__(self): 1365 self.message_type = MessageType.RESPONSE_REJECT 1366 self.signal_identifier = SignalIdentifier.AVDTP_GET_CONFIGURATION 1367 1368 @staticmethod 1369 def parse(fields: dict, span: bytes) -> Tuple["GetConfigurationReject", bytes]: 1370 if (fields["message_type"] != MessageType.RESPONSE_REJECT or 1371 fields["signal_identifier"] != SignalIdentifier.AVDTP_GET_CONFIGURATION): 1372 raise Exception("Invalid constraint field values") 1373 if len(span) < 1: 1374 raise Exception("Invalid packet size") 1375 fields["error_code"] = ErrorCode.from_int(span[0]) 1376 span = span[1:] 1377 return GetConfigurationReject(**fields), span 1378 1379 def serialize(self, payload: bytes = None) -> bytes: 1380 _span = bytearray() 1381 _span.append((self.error_code << 0)) 1382 return SignalingPacket.serialize(self, payload=bytes(_span)) 1383 1384 @property 1385 def size(self) -> int: 1386 return 1 1387 1388 1389@dataclass 1390class ReconfigureCommand(SignalingPacket): 1391 acp_seid: int = field(kw_only=True, default=0) 1392 service_capabilities: List[ServiceCapability] = field(kw_only=True, default_factory=list) 1393 1394 def __post_init__(self): 1395 self.message_type = MessageType.COMMAND 1396 self.signal_identifier = SignalIdentifier.AVDTP_RECONFIGURE 1397 1398 @staticmethod 1399 def parse(fields: dict, span: bytes) -> Tuple["ReconfigureCommand", bytes]: 1400 if (fields["message_type"] != MessageType.COMMAND or 1401 fields["signal_identifier"] != SignalIdentifier.AVDTP_RECONFIGURE): 1402 raise Exception("Invalid constraint field values") 1403 if len(span) < 1: 1404 raise Exception("Invalid packet size") 1405 fields["acp_seid"] = (span[0] >> 2) & 0x3F 1406 span = span[1:] 1407 service_capabilities = [] 1408 while len(span) > 0: 1409 element, span = ServiceCapability.parse(span) 1410 service_capabilities.append(element) 1411 fields["service_capabilities"] = service_capabilities 1412 return ReconfigureCommand(**fields), span 1413 1414 def serialize(self, payload: bytes = None) -> bytes: 1415 _span = bytearray() 1416 if self.acp_seid > 63: 1417 print( 1418 f"Invalid value for field ReconfigureCommand::acp_seid: {self.acp_seid} > 63; the value will be truncated" 1419 ) 1420 self.acp_seid &= 63 1421 _span.append((self.acp_seid << 2)) 1422 for _elt in self.service_capabilities: 1423 _span.extend(_elt.serialize()) 1424 return SignalingPacket.serialize(self, payload=bytes(_span)) 1425 1426 @property 1427 def size(self) -> int: 1428 return sum([elt.size for elt in self.service_capabilities]) + 1 1429 1430 1431@dataclass 1432class ReconfigureResponse(SignalingPacket): 1433 1434 def __post_init__(self): 1435 self.message_type = MessageType.RESPONSE_ACCEPT 1436 self.signal_identifier = SignalIdentifier.AVDTP_RECONFIGURE 1437 1438 @staticmethod 1439 def parse(fields: dict, span: bytes) -> Tuple["ReconfigureResponse", bytes]: 1440 if (fields["message_type"] != MessageType.RESPONSE_ACCEPT or 1441 fields["signal_identifier"] != SignalIdentifier.AVDTP_RECONFIGURE): 1442 raise Exception("Invalid constraint field values") 1443 return ReconfigureResponse(**fields), span 1444 1445 def serialize(self, payload: bytes = None) -> bytes: 1446 _span = bytearray() 1447 return SignalingPacket.serialize(self, payload=bytes(_span)) 1448 1449 @property 1450 def size(self) -> int: 1451 return 0 1452 1453 1454@dataclass 1455class ReconfigureReject(SignalingPacket): 1456 service_category: int = field(kw_only=True, default=0) 1457 error_code: ErrorCode = field(kw_only=True, default=ErrorCode.SUCCESS) 1458 1459 def __post_init__(self): 1460 self.message_type = MessageType.RESPONSE_REJECT 1461 self.signal_identifier = SignalIdentifier.AVDTP_RECONFIGURE 1462 1463 @staticmethod 1464 def parse(fields: dict, span: bytes) -> Tuple["ReconfigureReject", bytes]: 1465 if (fields["message_type"] != MessageType.RESPONSE_REJECT or 1466 fields["signal_identifier"] != SignalIdentifier.AVDTP_RECONFIGURE): 1467 raise Exception("Invalid constraint field values") 1468 if len(span) < 2: 1469 raise Exception("Invalid packet size") 1470 fields["service_category"] = span[0] 1471 fields["error_code"] = ErrorCode.from_int(span[1]) 1472 span = span[2:] 1473 return ReconfigureReject(**fields), span 1474 1475 def serialize(self, payload: bytes = None) -> bytes: 1476 _span = bytearray() 1477 if self.service_category > 255: 1478 print( 1479 f"Invalid value for field ReconfigureReject::service_category: {self.service_category} > 255; the value will be truncated" 1480 ) 1481 self.service_category &= 255 1482 _span.append((self.service_category << 0)) 1483 _span.append((self.error_code << 0)) 1484 return SignalingPacket.serialize(self, payload=bytes(_span)) 1485 1486 @property 1487 def size(self) -> int: 1488 return 2 1489 1490 1491@dataclass 1492class OpenCommand(SignalingPacket): 1493 acp_seid: int = field(kw_only=True, default=0) 1494 1495 def __post_init__(self): 1496 self.message_type = MessageType.COMMAND 1497 self.signal_identifier = SignalIdentifier.AVDTP_OPEN 1498 1499 @staticmethod 1500 def parse(fields: dict, span: bytes) -> Tuple["OpenCommand", bytes]: 1501 if (fields["message_type"] != MessageType.COMMAND or 1502 fields["signal_identifier"] != SignalIdentifier.AVDTP_OPEN): 1503 raise Exception("Invalid constraint field values") 1504 if len(span) < 1: 1505 raise Exception("Invalid packet size") 1506 fields["acp_seid"] = (span[0] >> 2) & 0x3F 1507 span = span[1:] 1508 return OpenCommand(**fields), span 1509 1510 def serialize(self, payload: bytes = None) -> bytes: 1511 _span = bytearray() 1512 if self.acp_seid > 63: 1513 print(f"Invalid value for field OpenCommand::acp_seid: {self.acp_seid} > 63; the value will be truncated") 1514 self.acp_seid &= 63 1515 _span.append((self.acp_seid << 2)) 1516 return SignalingPacket.serialize(self, payload=bytes(_span)) 1517 1518 @property 1519 def size(self) -> int: 1520 return 1 1521 1522 1523@dataclass 1524class OpenResponse(SignalingPacket): 1525 1526 def __post_init__(self): 1527 self.message_type = MessageType.RESPONSE_ACCEPT 1528 self.signal_identifier = SignalIdentifier.AVDTP_OPEN 1529 1530 @staticmethod 1531 def parse(fields: dict, span: bytes) -> Tuple["OpenResponse", bytes]: 1532 if (fields["message_type"] != MessageType.RESPONSE_ACCEPT or 1533 fields["signal_identifier"] != SignalIdentifier.AVDTP_OPEN): 1534 raise Exception("Invalid constraint field values") 1535 return OpenResponse(**fields), span 1536 1537 def serialize(self, payload: bytes = None) -> bytes: 1538 _span = bytearray() 1539 return SignalingPacket.serialize(self, payload=bytes(_span)) 1540 1541 @property 1542 def size(self) -> int: 1543 return 0 1544 1545 1546@dataclass 1547class OpenReject(SignalingPacket): 1548 error_code: ErrorCode = field(kw_only=True, default=ErrorCode.SUCCESS) 1549 1550 def __post_init__(self): 1551 self.message_type = MessageType.RESPONSE_REJECT 1552 self.signal_identifier = SignalIdentifier.AVDTP_OPEN 1553 1554 @staticmethod 1555 def parse(fields: dict, span: bytes) -> Tuple["OpenReject", bytes]: 1556 if (fields["message_type"] != MessageType.RESPONSE_REJECT or 1557 fields["signal_identifier"] != SignalIdentifier.AVDTP_OPEN): 1558 raise Exception("Invalid constraint field values") 1559 if len(span) < 1: 1560 raise Exception("Invalid packet size") 1561 fields["error_code"] = ErrorCode.from_int(span[0]) 1562 span = span[1:] 1563 return OpenReject(**fields), span 1564 1565 def serialize(self, payload: bytes = None) -> bytes: 1566 _span = bytearray() 1567 _span.append((self.error_code << 0)) 1568 return SignalingPacket.serialize(self, payload=bytes(_span)) 1569 1570 @property 1571 def size(self) -> int: 1572 return 1 1573 1574 1575@dataclass 1576class StartCommand(SignalingPacket): 1577 acp_seid: int = field(kw_only=True, default=0) 1578 1579 def __post_init__(self): 1580 self.message_type = MessageType.COMMAND 1581 self.signal_identifier = SignalIdentifier.AVDTP_START 1582 1583 @staticmethod 1584 def parse(fields: dict, span: bytes) -> Tuple["StartCommand", bytes]: 1585 if (fields["message_type"] != MessageType.COMMAND or 1586 fields["signal_identifier"] != SignalIdentifier.AVDTP_START): 1587 raise Exception("Invalid constraint field values") 1588 if len(span) < 1: 1589 raise Exception("Invalid packet size") 1590 fields["acp_seid"] = (span[0] >> 2) & 0x3F 1591 span = span[1:] 1592 return StartCommand(**fields), span 1593 1594 def serialize(self, payload: bytes = None) -> bytes: 1595 _span = bytearray() 1596 if self.acp_seid > 63: 1597 print(f"Invalid value for field StartCommand::acp_seid: {self.acp_seid} > 63; the value will be truncated") 1598 self.acp_seid &= 63 1599 _span.append((self.acp_seid << 2)) 1600 return SignalingPacket.serialize(self, payload=bytes(_span)) 1601 1602 @property 1603 def size(self) -> int: 1604 return 1 1605 1606 1607@dataclass 1608class StartResponse(SignalingPacket): 1609 1610 def __post_init__(self): 1611 self.message_type = MessageType.RESPONSE_ACCEPT 1612 self.signal_identifier = SignalIdentifier.AVDTP_START 1613 1614 @staticmethod 1615 def parse(fields: dict, span: bytes) -> Tuple["StartResponse", bytes]: 1616 if (fields["message_type"] != MessageType.RESPONSE_ACCEPT or 1617 fields["signal_identifier"] != SignalIdentifier.AVDTP_START): 1618 raise Exception("Invalid constraint field values") 1619 return StartResponse(**fields), span 1620 1621 def serialize(self, payload: bytes = None) -> bytes: 1622 _span = bytearray() 1623 return SignalingPacket.serialize(self, payload=bytes(_span)) 1624 1625 @property 1626 def size(self) -> int: 1627 return 0 1628 1629 1630@dataclass 1631class StartReject(SignalingPacket): 1632 acp_seid: int = field(kw_only=True, default=0) 1633 error_code: ErrorCode = field(kw_only=True, default=ErrorCode.SUCCESS) 1634 1635 def __post_init__(self): 1636 self.message_type = MessageType.RESPONSE_REJECT 1637 self.signal_identifier = SignalIdentifier.AVDTP_START 1638 1639 @staticmethod 1640 def parse(fields: dict, span: bytes) -> Tuple["StartReject", bytes]: 1641 if (fields["message_type"] != MessageType.RESPONSE_REJECT or 1642 fields["signal_identifier"] != SignalIdentifier.AVDTP_START): 1643 raise Exception("Invalid constraint field values") 1644 if len(span) < 2: 1645 raise Exception("Invalid packet size") 1646 fields["acp_seid"] = (span[0] >> 2) & 0x3F 1647 fields["error_code"] = ErrorCode.from_int(span[1]) 1648 span = span[2:] 1649 return StartReject(**fields), span 1650 1651 def serialize(self, payload: bytes = None) -> bytes: 1652 _span = bytearray() 1653 if self.acp_seid > 63: 1654 print(f"Invalid value for field StartReject::acp_seid: {self.acp_seid} > 63; the value will be truncated") 1655 self.acp_seid &= 63 1656 _span.append((self.acp_seid << 2)) 1657 _span.append((self.error_code << 0)) 1658 return SignalingPacket.serialize(self, payload=bytes(_span)) 1659 1660 @property 1661 def size(self) -> int: 1662 return 2 1663 1664 1665@dataclass 1666class CloseCommand(SignalingPacket): 1667 acp_seid: int = field(kw_only=True, default=0) 1668 1669 def __post_init__(self): 1670 self.message_type = MessageType.COMMAND 1671 self.signal_identifier = SignalIdentifier.AVDTP_CLOSE 1672 1673 @staticmethod 1674 def parse(fields: dict, span: bytes) -> Tuple["CloseCommand", bytes]: 1675 if (fields["message_type"] != MessageType.COMMAND or 1676 fields["signal_identifier"] != SignalIdentifier.AVDTP_CLOSE): 1677 raise Exception("Invalid constraint field values") 1678 if len(span) < 1: 1679 raise Exception("Invalid packet size") 1680 fields["acp_seid"] = (span[0] >> 2) & 0x3F 1681 span = span[1:] 1682 return CloseCommand(**fields), span 1683 1684 def serialize(self, payload: bytes = None) -> bytes: 1685 _span = bytearray() 1686 if self.acp_seid > 255: 1687 print(f"Invalid value for field CloseCommand::acp_seid: {self.acp_seid} > 255; the value will be truncated") 1688 self.acp_seid &= 255 1689 _span.append((self.acp_seid << 2)) 1690 return SignalingPacket.serialize(self, payload=bytes(_span)) 1691 1692 @property 1693 def size(self) -> int: 1694 return 1 1695 1696 1697@dataclass 1698class CloseResponse(SignalingPacket): 1699 1700 def __post_init__(self): 1701 self.message_type = MessageType.RESPONSE_ACCEPT 1702 self.signal_identifier = SignalIdentifier.AVDTP_CLOSE 1703 1704 @staticmethod 1705 def parse(fields: dict, span: bytes) -> Tuple["CloseResponse", bytes]: 1706 if (fields["message_type"] != MessageType.RESPONSE_ACCEPT or 1707 fields["signal_identifier"] != SignalIdentifier.AVDTP_CLOSE): 1708 raise Exception("Invalid constraint field values") 1709 return CloseResponse(**fields), span 1710 1711 def serialize(self, payload: bytes = None) -> bytes: 1712 _span = bytearray() 1713 return SignalingPacket.serialize(self, payload=bytes(_span)) 1714 1715 @property 1716 def size(self) -> int: 1717 return 0 1718 1719 1720@dataclass 1721class CloseReject(SignalingPacket): 1722 error_code: ErrorCode = field(kw_only=True, default=ErrorCode.SUCCESS) 1723 1724 def __post_init__(self): 1725 self.message_type = MessageType.RESPONSE_REJECT 1726 self.signal_identifier = SignalIdentifier.AVDTP_CLOSE 1727 1728 @staticmethod 1729 def parse(fields: dict, span: bytes) -> Tuple["CloseReject", bytes]: 1730 if (fields["message_type"] != MessageType.RESPONSE_REJECT or 1731 fields["signal_identifier"] != SignalIdentifier.AVDTP_CLOSE): 1732 raise Exception("Invalid constraint field values") 1733 if len(span) < 1: 1734 raise Exception("Invalid packet size") 1735 fields["error_code"] = ErrorCode.from_int(span[0]) 1736 span = span[1:] 1737 return CloseReject(**fields), span 1738 1739 def serialize(self, payload: bytes = None) -> bytes: 1740 _span = bytearray() 1741 _span.append((self.error_code << 0)) 1742 return SignalingPacket.serialize(self, payload=bytes(_span)) 1743 1744 @property 1745 def size(self) -> int: 1746 return 1 1747 1748 1749@dataclass 1750class SuspendCommand(SignalingPacket): 1751 acp_seid: int = field(kw_only=True, default=0) 1752 1753 def __post_init__(self): 1754 self.message_type = MessageType.COMMAND 1755 self.signal_identifier = SignalIdentifier.AVDTP_SUSPEND 1756 1757 @staticmethod 1758 def parse(fields: dict, span: bytes) -> Tuple["SuspendCommand", bytes]: 1759 if (fields["message_type"] != MessageType.COMMAND or 1760 fields["signal_identifier"] != SignalIdentifier.AVDTP_SUSPEND): 1761 raise Exception("Invalid constraint field values") 1762 if len(span) < 1: 1763 raise Exception("Invalid packet size") 1764 fields["acp_seid"] = (span[0] >> 2) & 0x3F 1765 span = span[1:] 1766 return SuspendCommand(**fields), span 1767 1768 def serialize(self, payload: bytes = None) -> bytes: 1769 _span = bytearray() 1770 if self.acp_seid > 63: 1771 print( 1772 f"Invalid value for field SuspendCommand::acp_seid: {self.acp_seid} > 63; the value will be truncated") 1773 self.acp_seid &= 63 1774 _span.append((self.acp_seid << 2)) 1775 return SignalingPacket.serialize(self, payload=bytes(_span)) 1776 1777 @property 1778 def size(self) -> int: 1779 return 1 1780 1781 1782@dataclass 1783class SuspendResponse(SignalingPacket): 1784 1785 def __post_init__(self): 1786 self.message_type = MessageType.RESPONSE_ACCEPT 1787 self.signal_identifier = SignalIdentifier.AVDTP_SUSPEND 1788 1789 @staticmethod 1790 def parse(fields: dict, span: bytes) -> Tuple["SuspendResponse", bytes]: 1791 if (fields["message_type"] != MessageType.RESPONSE_ACCEPT or 1792 fields["signal_identifier"] != SignalIdentifier.AVDTP_SUSPEND): 1793 raise Exception("Invalid constraint field values") 1794 return SuspendResponse(**fields), span 1795 1796 def serialize(self, payload: bytes = None) -> bytes: 1797 _span = bytearray() 1798 return SignalingPacket.serialize(self, payload=bytes(_span)) 1799 1800 @property 1801 def size(self) -> int: 1802 return 0 1803 1804 1805@dataclass 1806class SuspendReject(SignalingPacket): 1807 acp_seid: int = field(kw_only=True, default=0) 1808 error_code: ErrorCode = field(kw_only=True, default=ErrorCode.SUCCESS) 1809 1810 def __post_init__(self): 1811 self.message_type = MessageType.RESPONSE_REJECT 1812 self.signal_identifier = SignalIdentifier.AVDTP_SUSPEND 1813 1814 @staticmethod 1815 def parse(fields: dict, span: bytes) -> Tuple["SuspendReject", bytes]: 1816 if (fields["message_type"] != MessageType.RESPONSE_REJECT or 1817 fields["signal_identifier"] != SignalIdentifier.AVDTP_SUSPEND): 1818 raise Exception("Invalid constraint field values") 1819 if len(span) < 2: 1820 raise Exception("Invalid packet size") 1821 fields["acp_seid"] = (span[0] >> 2) & 0x3F 1822 fields["error_code"] = ErrorCode.from_int(span[1]) 1823 span = span[2:] 1824 return SuspendReject(**fields), span 1825 1826 def serialize(self, payload: bytes = None) -> bytes: 1827 _span = bytearray() 1828 if self.acp_seid > 63: 1829 print(f"Invalid value for field SuspendReject::acp_seid: {self.acp_seid} > 63; the value will be truncated") 1830 self.acp_seid &= 63 1831 _span.append((self.acp_seid << 2)) 1832 _span.append((self.error_code << 0)) 1833 return SignalingPacket.serialize(self, payload=bytes(_span)) 1834 1835 @property 1836 def size(self) -> int: 1837 return 2 1838 1839 1840@dataclass 1841class AbortCommand(SignalingPacket): 1842 acp_seid: int = field(kw_only=True, default=0) 1843 1844 def __post_init__(self): 1845 self.message_type = MessageType.COMMAND 1846 self.signal_identifier = SignalIdentifier.AVDTP_ABORT 1847 1848 @staticmethod 1849 def parse(fields: dict, span: bytes) -> Tuple["AbortCommand", bytes]: 1850 if (fields["message_type"] != MessageType.COMMAND or 1851 fields["signal_identifier"] != SignalIdentifier.AVDTP_ABORT): 1852 raise Exception("Invalid constraint field values") 1853 if len(span) < 1: 1854 raise Exception("Invalid packet size") 1855 fields["acp_seid"] = (span[0] >> 2) & 0x3F 1856 span = span[1:] 1857 return AbortCommand(**fields), span 1858 1859 def serialize(self, payload: bytes = None) -> bytes: 1860 _span = bytearray() 1861 if self.acp_seid > 63: 1862 print(f"Invalid value for field AbortCommand::acp_seid: {self.acp_seid} > 63; the value will be truncated") 1863 self.acp_seid &= 63 1864 _span.append((self.acp_seid << 2)) 1865 return SignalingPacket.serialize(self, payload=bytes(_span)) 1866 1867 @property 1868 def size(self) -> int: 1869 return 1 1870 1871 1872@dataclass 1873class AbortResponse(SignalingPacket): 1874 1875 def __post_init__(self): 1876 self.message_type = MessageType.RESPONSE_ACCEPT 1877 self.signal_identifier = SignalIdentifier.AVDTP_ABORT 1878 1879 @staticmethod 1880 def parse(fields: dict, span: bytes) -> Tuple["AbortResponse", bytes]: 1881 if (fields["message_type"] != MessageType.RESPONSE_ACCEPT or 1882 fields["signal_identifier"] != SignalIdentifier.AVDTP_ABORT): 1883 raise Exception("Invalid constraint field values") 1884 return AbortResponse(**fields), span 1885 1886 def serialize(self, payload: bytes = None) -> bytes: 1887 _span = bytearray() 1888 return SignalingPacket.serialize(self, payload=bytes(_span)) 1889 1890 @property 1891 def size(self) -> int: 1892 return 0 1893 1894 1895@dataclass 1896class SecurityControlCommand(SignalingPacket): 1897 acp_seid: int = field(kw_only=True, default=0) 1898 content_protection_data: bytearray = field(kw_only=True, default_factory=bytearray) 1899 1900 def __post_init__(self): 1901 self.message_type = MessageType.COMMAND 1902 self.signal_identifier = SignalIdentifier.AVDTP_SECURITY_CONTROL 1903 1904 @staticmethod 1905 def parse(fields: dict, span: bytes) -> Tuple["SecurityControlCommand", bytes]: 1906 if (fields["message_type"] != MessageType.COMMAND or 1907 fields["signal_identifier"] != SignalIdentifier.AVDTP_SECURITY_CONTROL): 1908 raise Exception("Invalid constraint field values") 1909 if len(span) < 1: 1910 raise Exception("Invalid packet size") 1911 fields["acp_seid"] = (span[0] >> 2) & 0x3F 1912 span = span[1:] 1913 fields["content_protection_data"] = list(span) 1914 span = bytes() 1915 return SecurityControlCommand(**fields), span 1916 1917 def serialize(self, payload: bytes = None) -> bytes: 1918 _span = bytearray() 1919 if self.acp_seid > 63: 1920 print( 1921 f"Invalid value for field SecurityControlCommand::acp_seid: {self.acp_seid} > 63; the value will be truncated" 1922 ) 1923 self.acp_seid &= 63 1924 _span.append((self.acp_seid << 2)) 1925 _span.extend(self.content_protection_data) 1926 return SignalingPacket.serialize(self, payload=bytes(_span)) 1927 1928 @property 1929 def size(self) -> int: 1930 return len(self.content_protection_data) * 1 + 1 1931 1932 1933@dataclass 1934class SecurityControlResponse(SignalingPacket): 1935 content_protection_data: bytearray = field(kw_only=True, default_factory=bytearray) 1936 1937 def __post_init__(self): 1938 self.message_type = MessageType.RESPONSE_ACCEPT 1939 self.signal_identifier = SignalIdentifier.AVDTP_SECURITY_CONTROL 1940 1941 @staticmethod 1942 def parse(fields: dict, span: bytes) -> Tuple["SecurityControlResponse", bytes]: 1943 if (fields["message_type"] != MessageType.RESPONSE_ACCEPT or 1944 fields["signal_identifier"] != SignalIdentifier.AVDTP_SECURITY_CONTROL): 1945 raise Exception("Invalid constraint field values") 1946 fields["content_protection_data"] = list(span) 1947 span = bytes() 1948 return SecurityControlResponse(**fields), span 1949 1950 def serialize(self, payload: bytes = None) -> bytes: 1951 _span = bytearray() 1952 _span.extend(self.content_protection_data) 1953 return SignalingPacket.serialize(self, payload=bytes(_span)) 1954 1955 @property 1956 def size(self) -> int: 1957 return len(self.content_protection_data) * 1 1958 1959 1960@dataclass 1961class SecurityControlReject(SignalingPacket): 1962 error_code: ErrorCode = field(kw_only=True, default=ErrorCode.SUCCESS) 1963 1964 def __post_init__(self): 1965 self.message_type = MessageType.RESPONSE_REJECT 1966 self.signal_identifier = SignalIdentifier.AVDTP_SECURITY_CONTROL 1967 1968 @staticmethod 1969 def parse(fields: dict, span: bytes) -> Tuple["SecurityControlReject", bytes]: 1970 if (fields["message_type"] != MessageType.RESPONSE_REJECT or 1971 fields["signal_identifier"] != SignalIdentifier.AVDTP_SECURITY_CONTROL): 1972 raise Exception("Invalid constraint field values") 1973 if len(span) < 1: 1974 raise Exception("Invalid packet size") 1975 fields["error_code"] = ErrorCode.from_int(span[0]) 1976 span = span[1:] 1977 return SecurityControlReject(**fields), span 1978 1979 def serialize(self, payload: bytes = None) -> bytes: 1980 _span = bytearray() 1981 _span.append((self.error_code << 0)) 1982 return SignalingPacket.serialize(self, payload=bytes(_span)) 1983 1984 @property 1985 def size(self) -> int: 1986 return 1 1987 1988 1989@dataclass 1990class GeneralReject(SignalingPacket): 1991 1992 def __post_init__(self): 1993 self.message_type = MessageType.GENERAL_REJECT 1994 1995 @staticmethod 1996 def parse(fields: dict, span: bytes) -> Tuple["GeneralReject", bytes]: 1997 if fields["message_type"] != MessageType.GENERAL_REJECT: 1998 raise Exception("Invalid constraint field values") 1999 return GeneralReject(**fields), span 2000 2001 def serialize(self, payload: bytes = None) -> bytes: 2002 _span = bytearray() 2003 return SignalingPacket.serialize(self, payload=bytes(_span)) 2004 2005 @property 2006 def size(self) -> int: 2007 return 0 2008 2009 2010@dataclass 2011class DelayReportCommand(SignalingPacket): 2012 acp_seid: int = field(kw_only=True, default=0) 2013 delay_msb: int = field(kw_only=True, default=0) 2014 delay_lsb: int = field(kw_only=True, default=0) 2015 2016 def __post_init__(self): 2017 self.message_type = MessageType.COMMAND 2018 self.signal_identifier = SignalIdentifier.AVDTP_DELAYREPORT 2019 2020 @staticmethod 2021 def parse(fields: dict, span: bytes) -> Tuple["DelayReportCommand", bytes]: 2022 if (fields["message_type"] != MessageType.COMMAND or 2023 fields["signal_identifier"] != SignalIdentifier.AVDTP_DELAYREPORT): 2024 raise Exception("Invalid constraint field values") 2025 if len(span) < 3: 2026 raise Exception("Invalid packet size") 2027 fields["acp_seid"] = (span[0] >> 2) & 0x3F 2028 fields["delay_msb"] = span[1] 2029 fields["delay_lsb"] = span[2] 2030 span = span[3:] 2031 return DelayReportCommand(**fields), span 2032 2033 def serialize(self, payload: bytes = None) -> bytes: 2034 _span = bytearray() 2035 if self.acp_seid > 63: 2036 print( 2037 f"Invalid value for field DelayReportCommand::acp_seid: {self.acp_seid} > 63; the value will be truncated" 2038 ) 2039 self.acp_seid &= 63 2040 _span.append((self.acp_seid << 2)) 2041 if self.delay_msb > 255: 2042 print( 2043 f"Invalid value for field DelayReportCommand::delay_msb: {self.delay_msb} > 255; the value will be truncated" 2044 ) 2045 self.delay_msb &= 255 2046 _span.append((self.delay_msb << 0)) 2047 if self.delay_lsb > 255: 2048 print( 2049 f"Invalid value for field DelayReportCommand::delay_lsb: {self.delay_lsb} > 255; the value will be truncated" 2050 ) 2051 self.delay_lsb &= 255 2052 _span.append((self.delay_lsb << 0)) 2053 return SignalingPacket.serialize(self, payload=bytes(_span)) 2054 2055 @property 2056 def size(self) -> int: 2057 return 3 2058 2059 2060@dataclass 2061class DelayReportResponse(SignalingPacket): 2062 2063 def __post_init__(self): 2064 self.message_type = MessageType.RESPONSE_ACCEPT 2065 self.signal_identifier = SignalIdentifier.AVDTP_DELAYREPORT 2066 2067 @staticmethod 2068 def parse(fields: dict, span: bytes) -> Tuple["DelayReportResponse", bytes]: 2069 if (fields["message_type"] != MessageType.RESPONSE_ACCEPT or 2070 fields["signal_identifier"] != SignalIdentifier.AVDTP_DELAYREPORT): 2071 raise Exception("Invalid constraint field values") 2072 return DelayReportResponse(**fields), span 2073 2074 def serialize(self, payload: bytes = None) -> bytes: 2075 _span = bytearray() 2076 return SignalingPacket.serialize(self, payload=bytes(_span)) 2077 2078 @property 2079 def size(self) -> int: 2080 return 0 2081 2082 2083@dataclass 2084class DelayReportReject(SignalingPacket): 2085 error_code: ErrorCode = field(kw_only=True, default=ErrorCode.SUCCESS) 2086 2087 def __post_init__(self): 2088 self.message_type = MessageType.RESPONSE_REJECT 2089 self.signal_identifier = SignalIdentifier.AVDTP_DELAYREPORT 2090 2091 @staticmethod 2092 def parse(fields: dict, span: bytes) -> Tuple["DelayReportReject", bytes]: 2093 if (fields["message_type"] != MessageType.RESPONSE_REJECT or 2094 fields["signal_identifier"] != SignalIdentifier.AVDTP_DELAYREPORT): 2095 raise Exception("Invalid constraint field values") 2096 if len(span) < 1: 2097 raise Exception("Invalid packet size") 2098 fields["error_code"] = ErrorCode.from_int(span[0]) 2099 span = span[1:] 2100 return DelayReportReject(**fields), span 2101 2102 def serialize(self, payload: bytes = None) -> bytes: 2103 _span = bytearray() 2104 _span.append((self.error_code << 0)) 2105 return SignalingPacket.serialize(self, payload=bytes(_span)) 2106 2107 @property 2108 def size(self) -> int: 2109 return 1 2110