1# SPDX-License-Identifier: GPL-2.0-only 2# This file is part of Scapy 3# See https://scapy.net/ for more information 4# Copyright (C) Philippe Biondi <phil@secdev.org> 5# Copyright (C) Michael Farrell <micolous+git@gmail.com> 6 7 8""" 9Fields: basic data structures that make up parts of packets. 10""" 11 12import calendar 13import collections 14import copy 15import datetime 16import inspect 17import math 18import socket 19import struct 20import time 21import warnings 22 23from types import MethodType 24from uuid import UUID 25from enum import Enum 26 27from scapy.config import conf 28from scapy.dadict import DADict 29from scapy.volatile import RandBin, RandByte, RandEnumKeys, RandInt, \ 30 RandIP, RandIP6, RandLong, RandMAC, RandNum, RandShort, RandSInt, \ 31 RandSByte, RandTermString, RandUUID, VolatileValue, RandSShort, \ 32 RandSLong, RandFloat 33from scapy.data import EPOCH 34from scapy.error import log_runtime, Scapy_Exception 35from scapy.compat import bytes_hex, plain_str, raw, bytes_encode 36from scapy.pton_ntop import inet_ntop, inet_pton 37from scapy.utils import inet_aton, inet_ntoa, lhex, mac2str, str2mac, EDecimal 38from scapy.utils6 import in6_6to4ExtractAddr, in6_isaddr6to4, \ 39 in6_isaddrTeredo, in6_ptop, Net6, teredoAddrExtractInfo 40from scapy.base_classes import ( 41 _ScopedIP, 42 BasePacket, 43 Field_metaclass, 44 Net, 45 ScopedIP, 46) 47 48# Typing imports 49from typing import ( 50 Any, 51 AnyStr, 52 Callable, 53 Dict, 54 List, 55 Generic, 56 Optional, 57 Set, 58 Tuple, 59 Type, 60 TypeVar, 61 Union, 62 # func 63 cast, 64 TYPE_CHECKING, 65) 66 67if TYPE_CHECKING: 68 # Do not import on runtime ! (import loop) 69 from scapy.packet import Packet 70 71 72class RawVal: 73 r""" 74 A raw value that will not be processed by the field and inserted 75 as-is in the packet string. 76 77 Example:: 78 79 >>> a = IP(len=RawVal("####")) 80 >>> bytes(a) 81 b'F\x00####\x00\x01\x00\x005\xb5\x00\x00\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x00' 82 83 """ 84 85 def __init__(self, val=b""): 86 # type: (bytes) -> None 87 self.val = bytes_encode(val) 88 89 def __str__(self): 90 # type: () -> str 91 return str(self.val) 92 93 def __bytes__(self): 94 # type: () -> bytes 95 return self.val 96 97 def __len__(self): 98 # type: () -> int 99 return len(self.val) 100 101 def __repr__(self): 102 # type: () -> str 103 return "<RawVal [%r]>" % self.val 104 105 106class ObservableDict(Dict[int, str]): 107 """ 108 Helper class to specify a protocol extendable for runtime modifications 109 """ 110 111 def __init__(self, *args, **kw): 112 # type: (*Dict[int, str], **Any) -> None 113 self.observers = [] # type: List[_EnumField[Any]] 114 super(ObservableDict, self).__init__(*args, **kw) 115 116 def observe(self, observer): 117 # type: (_EnumField[Any]) -> None 118 self.observers.append(observer) 119 120 def __setitem__(self, key, value): 121 # type: (int, str) -> None 122 for o in self.observers: 123 o.notify_set(self, key, value) 124 super(ObservableDict, self).__setitem__(key, value) 125 126 def __delitem__(self, key): 127 # type: (int) -> None 128 for o in self.observers: 129 o.notify_del(self, key) 130 super(ObservableDict, self).__delitem__(key) 131 132 def update(self, anotherDict): # type: ignore 133 for k in anotherDict: 134 self[k] = anotherDict[k] 135 136 137############ 138# Fields # 139############ 140 141I = TypeVar('I') # Internal storage # noqa: E741 142M = TypeVar('M') # Machine storage 143 144 145class Field(Generic[I, M], metaclass=Field_metaclass): 146 """ 147 For more information on how this works, please refer to the 148 'Adding new protocols' chapter in the online documentation: 149 150 https://scapy.readthedocs.io/en/stable/build_dissect.html 151 """ 152 __slots__ = [ 153 "name", 154 "fmt", 155 "default", 156 "sz", 157 "owners", 158 "struct" 159 ] 160 islist = 0 161 ismutable = False 162 holds_packets = 0 163 164 def __init__(self, name, default, fmt="H"): 165 # type: (str, Any, str) -> None 166 if not isinstance(name, str): 167 raise ValueError("name should be a string") 168 self.name = name 169 if fmt[0] in "@=<>!": 170 self.fmt = fmt 171 else: 172 self.fmt = "!" + fmt 173 self.struct = struct.Struct(self.fmt) 174 self.default = self.any2i(None, default) 175 self.sz = struct.calcsize(self.fmt) # type: int 176 self.owners = [] # type: List[Type[Packet]] 177 178 def register_owner(self, cls): 179 # type: (Type[Packet]) -> None 180 self.owners.append(cls) 181 182 def i2len(self, 183 pkt, # type: Packet 184 x, # type: Any 185 ): 186 # type: (...) -> int 187 """Convert internal value to a length usable by a FieldLenField""" 188 if isinstance(x, RawVal): 189 return len(x) 190 return self.sz 191 192 def i2count(self, pkt, x): 193 # type: (Optional[Packet], I) -> int 194 """Convert internal value to a number of elements usable by a FieldLenField. 195 Always 1 except for list fields""" 196 return 1 197 198 def h2i(self, pkt, x): 199 # type: (Optional[Packet], Any) -> I 200 """Convert human value to internal value""" 201 return cast(I, x) 202 203 def i2h(self, pkt, x): 204 # type: (Optional[Packet], I) -> Any 205 """Convert internal value to human value""" 206 return x 207 208 def m2i(self, pkt, x): 209 # type: (Optional[Packet], M) -> I 210 """Convert machine value to internal value""" 211 return cast(I, x) 212 213 def i2m(self, pkt, x): 214 # type: (Optional[Packet], Optional[I]) -> M 215 """Convert internal value to machine value""" 216 if x is None: 217 return cast(M, 0) 218 elif isinstance(x, str): 219 return cast(M, bytes_encode(x)) 220 return cast(M, x) 221 222 def any2i(self, pkt, x): 223 # type: (Optional[Packet], Any) -> Optional[I] 224 """Try to understand the most input values possible and make an internal value from them""" # noqa: E501 225 return self.h2i(pkt, x) 226 227 def i2repr(self, pkt, x): 228 # type: (Optional[Packet], I) -> str 229 """Convert internal value to a nice representation""" 230 return repr(self.i2h(pkt, x)) 231 232 def addfield(self, pkt, s, val): 233 # type: (Packet, bytes, Optional[I]) -> bytes 234 """Add an internal value to a string 235 236 Copy the network representation of field `val` (belonging to layer 237 `pkt`) to the raw string packet `s`, and return the new string packet. 238 """ 239 try: 240 return s + self.struct.pack(self.i2m(pkt, val)) 241 except struct.error as ex: 242 raise ValueError( 243 "Incorrect type of value for field %s:\n" % self.name + 244 "struct.error('%s')\n" % ex + 245 "To inject bytes into the field regardless of the type, " + 246 "use RawVal. See help(RawVal)" 247 ) 248 249 def getfield(self, pkt, s): 250 # type: (Packet, bytes) -> Tuple[bytes, I] 251 """Extract an internal value from a string 252 253 Extract from the raw packet `s` the field value belonging to layer 254 `pkt`. 255 256 Returns a two-element list, 257 first the raw packet string after having removed the extracted field, 258 second the extracted field itself in internal representation. 259 """ 260 return s[self.sz:], self.m2i(pkt, self.struct.unpack(s[:self.sz])[0]) 261 262 def do_copy(self, x): 263 # type: (I) -> I 264 if isinstance(x, list): 265 x = x[:] # type: ignore 266 for i in range(len(x)): 267 if isinstance(x[i], BasePacket): 268 x[i] = x[i].copy() 269 return x # type: ignore 270 if hasattr(x, "copy"): 271 return x.copy() # type: ignore 272 return x 273 274 def __repr__(self): 275 # type: () -> str 276 return "<%s (%s).%s>" % ( 277 self.__class__.__name__, 278 ",".join(x.__name__ for x in self.owners), 279 self.name 280 ) 281 282 def copy(self): 283 # type: () -> Field[I, M] 284 return copy.copy(self) 285 286 def randval(self): 287 # type: () -> VolatileValue[Any] 288 """Return a volatile object whose value is both random and suitable for this field""" # noqa: E501 289 fmtt = self.fmt[-1] 290 if fmtt in "BbHhIiQq": 291 return {"B": RandByte, "b": RandSByte, 292 "H": RandShort, "h": RandSShort, 293 "I": RandInt, "i": RandSInt, 294 "Q": RandLong, "q": RandSLong}[fmtt]() 295 elif fmtt == "s": 296 if self.fmt[0] in "0123456789": 297 value = int(self.fmt[:-1]) 298 else: 299 value = int(self.fmt[1:-1]) 300 return RandBin(value) 301 else: 302 raise ValueError( 303 "no random class for [%s] (fmt=%s)." % ( 304 self.name, self.fmt 305 ) 306 ) 307 308 309class _FieldContainer(object): 310 """ 311 A field that acts as a container for another field 312 """ 313 __slots__ = ["fld"] 314 315 def __getattr__(self, attr): 316 # type: (str) -> Any 317 return getattr(self.fld, attr) 318 319 320AnyField = Union[Field[Any, Any], _FieldContainer] 321 322 323class Emph(_FieldContainer): 324 """Empathize sub-layer for display""" 325 __slots__ = ["fld"] 326 327 def __init__(self, fld): 328 # type: (Any) -> None 329 self.fld = fld 330 331 def __eq__(self, other): 332 # type: (Any) -> bool 333 return bool(self.fld == other) 334 335 def __hash__(self): 336 # type: () -> int 337 return hash(self.fld) 338 339 340class MayEnd(_FieldContainer): 341 """ 342 Allow packet dissection to end after the dissection of this field 343 if no bytes are left. 344 345 A good example would be a length field that can be 0 or a set value, 346 and where it would be too annoying to use multiple ConditionalFields 347 348 Important note: any field below this one MUST default 349 to an empty value, else the behavior will be unexpected. 350 """ 351 __slots__ = ["fld"] 352 353 def __init__(self, fld): 354 # type: (Any) -> None 355 self.fld = fld 356 357 def __eq__(self, other): 358 # type: (Any) -> bool 359 return bool(self.fld == other) 360 361 def __hash__(self): 362 # type: () -> int 363 return hash(self.fld) 364 365 366class ActionField(_FieldContainer): 367 __slots__ = ["fld", "_action_method", "_privdata"] 368 369 def __init__(self, fld, action_method, **kargs): 370 # type: (Field[Any, Any], str, **Any) -> None 371 self.fld = fld 372 self._action_method = action_method 373 self._privdata = kargs 374 375 def any2i(self, pkt, val): 376 # type: (Optional[Packet], int) -> Any 377 getattr(pkt, self._action_method)(val, self.fld, **self._privdata) 378 return getattr(self.fld, "any2i")(pkt, val) 379 380 381class ConditionalField(_FieldContainer): 382 __slots__ = ["fld", "cond"] 383 384 def __init__(self, 385 fld, # type: AnyField 386 cond # type: Callable[[Packet], bool] 387 ): 388 # type: (...) -> None 389 self.fld = fld 390 self.cond = cond 391 392 def _evalcond(self, pkt): 393 # type: (Packet) -> bool 394 return bool(self.cond(pkt)) 395 396 def any2i(self, pkt, x): 397 # type: (Optional[Packet], Any) -> Any 398 # BACKWARD COMPATIBILITY 399 # Note: we shouldn't need this function. (it's not correct) 400 # However, having i2h implemented (#2364), it changes the default 401 # behavior and broke all packets that wrongly use two ConditionalField 402 # with the same name. Those packets are the problem: they are wrongly 403 # built (they should either be re-using the same conditional field, or 404 # using a MultipleTypeField). 405 # But I don't want to dive into fixing all of them just yet, 406 # so for now, let's keep this this way, even though it's not correct. 407 if type(self.fld) is Field: 408 return x 409 return self.fld.any2i(pkt, x) 410 411 def i2h(self, pkt, val): 412 # type: (Optional[Packet], Any) -> Any 413 if pkt and not self._evalcond(pkt): 414 return None 415 return self.fld.i2h(pkt, val) 416 417 def getfield(self, pkt, s): 418 # type: (Packet, bytes) -> Tuple[bytes, Any] 419 if self._evalcond(pkt): 420 return self.fld.getfield(pkt, s) 421 else: 422 return s, None 423 424 def addfield(self, pkt, s, val): 425 # type: (Packet, bytes, Any) -> bytes 426 if self._evalcond(pkt): 427 return self.fld.addfield(pkt, s, val) 428 else: 429 return s 430 431 def __getattr__(self, attr): 432 # type: (str) -> Any 433 return getattr(self.fld, attr) 434 435 436class MultipleTypeField(_FieldContainer): 437 """ 438 MultipleTypeField are used for fields that can be implemented by 439 various Field subclasses, depending on conditions on the packet. 440 441 It is initialized with `flds` and `dflt`. 442 443 :param dflt: is the default field type, to be used when none of the 444 conditions matched the current packet. 445 :param flds: is a list of tuples (`fld`, `cond`) or (`fld`, `cond`, `hint`) 446 where `fld` if a field type, and `cond` a "condition" to 447 determine if `fld` is the field type that should be used. 448 449 ``cond`` is either: 450 451 - a callable `cond_pkt` that accepts one argument (the packet) and 452 returns True if `fld` should be used, False otherwise. 453 - a tuple (`cond_pkt`, `cond_pkt_val`), where `cond_pkt` is the same 454 as in the previous case and `cond_pkt_val` is a callable that 455 accepts two arguments (the packet, and the value to be set) and 456 returns True if `fld` should be used, False otherwise. 457 458 See scapy.layers.l2.ARP (type "help(ARP)" in Scapy) for an example of 459 use. 460 """ 461 462 __slots__ = ["flds", "dflt", "hints", "name", "default"] 463 464 def __init__( 465 self, 466 flds: List[Union[ 467 Tuple[Field[Any, Any], Any, str], 468 Tuple[Field[Any, Any], Any] 469 ]], 470 dflt: Field[Any, Any] 471 ) -> None: 472 self.hints = { 473 x[0]: x[2] 474 for x in flds 475 if len(x) == 3 476 } 477 self.flds = [ 478 (x[0], x[1]) for x in flds 479 ] 480 self.dflt = dflt 481 self.default = None # So that we can detect changes in defaults 482 self.name = self.dflt.name 483 if any(x[0].name != self.name for x in self.flds): 484 warnings.warn( 485 ("All fields should have the same name in a " 486 "MultipleTypeField (%s). Use hints.") % self.name, 487 SyntaxWarning 488 ) 489 490 def _iterate_fields_cond(self, pkt, val, use_val): 491 # type: (Optional[Packet], Any, bool) -> Field[Any, Any] 492 """Internal function used by _find_fld_pkt & _find_fld_pkt_val""" 493 # Iterate through the fields 494 for fld, cond in self.flds: 495 if isinstance(cond, tuple): 496 if use_val: 497 if val is None: 498 val = self.dflt.default 499 if cond[1](pkt, val): 500 return fld 501 continue 502 else: 503 cond = cond[0] 504 if cond(pkt): 505 return fld 506 return self.dflt 507 508 def _find_fld_pkt(self, pkt): 509 # type: (Optional[Packet]) -> Field[Any, Any] 510 """Given a Packet instance `pkt`, returns the Field subclass to be 511used. If you know the value to be set (e.g., in .addfield()), use 512._find_fld_pkt_val() instead. 513 514 """ 515 return self._iterate_fields_cond(pkt, None, False) 516 517 def _find_fld_pkt_val(self, 518 pkt, # type: Optional[Packet] 519 val, # type: Any 520 ): 521 # type: (...) -> Tuple[Field[Any, Any], Any] 522 """Given a Packet instance `pkt` and the value `val` to be set, 523returns the Field subclass to be used, and the updated `val` if necessary. 524 525 """ 526 fld = self._iterate_fields_cond(pkt, val, True) 527 if val is None: 528 val = fld.default 529 return fld, val 530 531 def _find_fld(self): 532 # type: () -> Field[Any, Any] 533 """Returns the Field subclass to be used, depending on the Packet 534instance, or the default subclass. 535 536DEV: since the Packet instance is not provided, we have to use a hack 537to guess it. It should only be used if you cannot provide the current 538Packet instance (for example, because of the current Scapy API). 539 540If you have the current Packet instance, use ._find_fld_pkt_val() (if 541the value to set is also known) of ._find_fld_pkt() instead. 542 543 """ 544 # Hack to preserve current Scapy API 545 # See https://stackoverflow.com/a/7272464/3223422 546 frame = inspect.currentframe().f_back.f_back # type: ignore 547 while frame is not None: 548 try: 549 pkt = frame.f_locals['self'] 550 except KeyError: 551 pass 552 else: 553 if isinstance(pkt, tuple(self.dflt.owners)): 554 if not pkt.default_fields: 555 # Packet not initialized 556 return self.dflt 557 return self._find_fld_pkt(pkt) 558 frame = frame.f_back 559 return self.dflt 560 561 def getfield(self, 562 pkt, # type: Packet 563 s, # type: bytes 564 ): 565 # type: (...) -> Tuple[bytes, Any] 566 return self._find_fld_pkt(pkt).getfield(pkt, s) 567 568 def addfield(self, pkt, s, val): 569 # type: (Packet, bytes, Any) -> bytes 570 fld, val = self._find_fld_pkt_val(pkt, val) 571 return fld.addfield(pkt, s, val) 572 573 def any2i(self, pkt, val): 574 # type: (Optional[Packet], Any) -> Any 575 fld, val = self._find_fld_pkt_val(pkt, val) 576 return fld.any2i(pkt, val) 577 578 def h2i(self, pkt, val): 579 # type: (Optional[Packet], Any) -> Any 580 fld, val = self._find_fld_pkt_val(pkt, val) 581 return fld.h2i(pkt, val) 582 583 def i2h(self, 584 pkt, # type: Packet 585 val, # type: Any 586 ): 587 # type: (...) -> Any 588 fld, val = self._find_fld_pkt_val(pkt, val) 589 return fld.i2h(pkt, val) 590 591 def i2m(self, pkt, val): 592 # type: (Optional[Packet], Optional[Any]) -> Any 593 fld, val = self._find_fld_pkt_val(pkt, val) 594 return fld.i2m(pkt, val) 595 596 def i2len(self, pkt, val): 597 # type: (Packet, Any) -> int 598 fld, val = self._find_fld_pkt_val(pkt, val) 599 return fld.i2len(pkt, val) 600 601 def i2repr(self, pkt, val): 602 # type: (Optional[Packet], Any) -> str 603 fld, val = self._find_fld_pkt_val(pkt, val) 604 hint = "" 605 if fld in self.hints: 606 hint = " (%s)" % self.hints[fld] 607 return fld.i2repr(pkt, val) + hint 608 609 def register_owner(self, cls): 610 # type: (Type[Packet]) -> None 611 for fld, _ in self.flds: 612 fld.owners.append(cls) 613 self.dflt.owners.append(cls) 614 615 def get_fields_list(self): 616 # type: () -> List[Any] 617 return [self] 618 619 @property 620 def fld(self): 621 # type: () -> Field[Any, Any] 622 return self._find_fld() 623 624 625class PadField(_FieldContainer): 626 """Add bytes after the proxified field so that it ends at the specified 627 alignment from its beginning""" 628 __slots__ = ["fld", "_align", "_padwith"] 629 630 def __init__(self, fld, align, padwith=None): 631 # type: (AnyField, int, Optional[bytes]) -> None 632 self.fld = fld 633 self._align = align 634 self._padwith = padwith or b"\x00" 635 636 def padlen(self, flen, pkt): 637 # type: (int, Packet) -> int 638 return -flen % self._align 639 640 def getfield(self, 641 pkt, # type: Packet 642 s, # type: bytes 643 ): 644 # type: (...) -> Tuple[bytes, Any] 645 remain, val = self.fld.getfield(pkt, s) 646 padlen = self.padlen(len(s) - len(remain), pkt) 647 return remain[padlen:], val 648 649 def addfield(self, 650 pkt, # type: Packet 651 s, # type: bytes 652 val, # type: Any 653 ): 654 # type: (...) -> bytes 655 sval = self.fld.addfield(pkt, b"", val) 656 return s + sval + ( 657 self.padlen(len(sval), pkt) * self._padwith 658 ) 659 660 661class ReversePadField(PadField): 662 """Add bytes BEFORE the proxified field so that it starts at the specified 663 alignment from its beginning""" 664 665 def original_length(self, pkt): 666 # type: (Packet) -> int 667 return len(pkt.original) 668 669 def getfield(self, 670 pkt, # type: Packet 671 s, # type: bytes 672 ): 673 # type: (...) -> Tuple[bytes, Any] 674 # We need to get the length that has already been dissected 675 padlen = self.padlen(self.original_length(pkt) - len(s), pkt) 676 return self.fld.getfield(pkt, s[padlen:]) 677 678 def addfield(self, 679 pkt, # type: Packet 680 s, # type: bytes 681 val, # type: Any 682 ): 683 # type: (...) -> bytes 684 sval = self.fld.addfield(pkt, b"", val) 685 return s + struct.pack("%is" % ( 686 self.padlen(len(s), pkt) 687 ), self._padwith) + sval 688 689 690class TrailerBytes(bytes): 691 """ 692 Reverses slice operations to take from the back of the packet, 693 not the front 694 """ 695 696 def __getitem__(self, item): # type: ignore 697 # type: (Union[int, slice]) -> Union[int, bytes] 698 if isinstance(item, int): 699 if item < 0: 700 item = 1 + item 701 else: 702 item = len(self) - 1 - item 703 elif isinstance(item, slice): 704 start, stop, step = item.start, item.stop, item.step 705 new_start = -stop if stop else None 706 new_stop = -start if start else None 707 item = slice(new_start, new_stop, step) 708 return super(self.__class__, self).__getitem__(item) 709 710 711class TrailerField(_FieldContainer): 712 """Special Field that gets its value from the end of the *packet* 713 (Note: not layer, but packet). 714 715 Mostly used for FCS 716 """ 717 __slots__ = ["fld"] 718 719 def __init__(self, fld): 720 # type: (Field[Any, Any]) -> None 721 self.fld = fld 722 723 # Note: this is ugly. Very ugly. 724 # Do not copy this crap elsewhere, so that if one day we get 725 # brave enough to refactor it, it'll be easier. 726 727 def getfield(self, pkt, s): 728 # type: (Packet, bytes) -> Tuple[bytes, int] 729 previous_post_dissect = pkt.post_dissect 730 731 def _post_dissect(self, s): 732 # type: (Packet, bytes) -> bytes 733 # Reset packet to allow post_build 734 self.raw_packet_cache = None 735 self.post_dissect = previous_post_dissect # type: ignore 736 return previous_post_dissect(s) 737 pkt.post_dissect = MethodType(_post_dissect, pkt) # type: ignore 738 s = TrailerBytes(s) 739 s, val = self.fld.getfield(pkt, s) 740 return bytes(s), val 741 742 def addfield(self, pkt, s, val): 743 # type: (Packet, bytes, Optional[int]) -> bytes 744 previous_post_build = pkt.post_build 745 value = self.fld.addfield(pkt, b"", val) 746 747 def _post_build(self, p, pay): 748 # type: (Packet, bytes, bytes) -> bytes 749 pay += value 750 self.post_build = previous_post_build # type: ignore 751 return previous_post_build(p, pay) 752 pkt.post_build = MethodType(_post_build, pkt) # type: ignore 753 return s 754 755 756class FCSField(TrailerField): 757 """ 758 A FCS field that gets appended at the end of the *packet* (not layer). 759 """ 760 761 def __init__(self, *args, **kwargs): 762 # type: (*Any, **Any) -> None 763 super(FCSField, self).__init__(Field(*args, **kwargs)) 764 765 def i2repr(self, pkt, x): 766 # type: (Optional[Packet], int) -> str 767 return lhex(self.i2h(pkt, x)) 768 769 770class DestField(Field[str, bytes]): 771 __slots__ = ["defaultdst"] 772 # Each subclass must have its own bindings attribute 773 bindings = {} # type: Dict[Type[Packet], Tuple[str, Any]] 774 775 def __init__(self, name, default): 776 # type: (str, str) -> None 777 self.defaultdst = default 778 779 def dst_from_pkt(self, pkt): 780 # type: (Packet) -> str 781 for addr, condition in self.bindings.get(pkt.payload.__class__, []): 782 try: 783 if all(pkt.payload.getfieldval(field) == value 784 for field, value in condition.items()): 785 return addr # type: ignore 786 except AttributeError: 787 pass 788 return self.defaultdst 789 790 @classmethod 791 def bind_addr(cls, layer, addr, **condition): 792 # type: (Type[Packet], str, **Any) -> None 793 cls.bindings.setdefault(layer, []).append( # type: ignore 794 (addr, condition) 795 ) 796 797 798class MACField(Field[Optional[str], bytes]): 799 def __init__(self, name, default): 800 # type: (str, Optional[Any]) -> None 801 Field.__init__(self, name, default, "6s") 802 803 def i2m(self, pkt, x): 804 # type: (Optional[Packet], Optional[str]) -> bytes 805 if not x: 806 return b"\0\0\0\0\0\0" 807 try: 808 y = mac2str(x) 809 except (struct.error, OverflowError): 810 y = bytes_encode(x) 811 return y 812 813 def m2i(self, pkt, x): 814 # type: (Optional[Packet], bytes) -> str 815 return str2mac(x) 816 817 def any2i(self, pkt, x): 818 # type: (Optional[Packet], Any) -> str 819 if isinstance(x, bytes) and len(x) == 6: 820 return self.m2i(pkt, x) 821 return cast(str, x) 822 823 def i2repr(self, pkt, x): 824 # type: (Optional[Packet], Optional[str]) -> str 825 x = self.i2h(pkt, x) 826 if x is None: 827 return repr(x) 828 if self in conf.resolve: 829 x = conf.manufdb._resolve_MAC(x) 830 return x 831 832 def randval(self): 833 # type: () -> RandMAC 834 return RandMAC() 835 836 837class LEMACField(MACField): 838 def i2m(self, pkt, x): 839 # type: (Optional[Packet], Optional[str]) -> bytes 840 return MACField.i2m(self, pkt, x)[::-1] 841 842 def m2i(self, pkt, x): 843 # type: (Optional[Packet], bytes) -> str 844 return MACField.m2i(self, pkt, x[::-1]) 845 846 847class IPField(Field[Union[str, Net], bytes]): 848 def __init__(self, name, default): 849 # type: (str, Optional[str]) -> None 850 Field.__init__(self, name, default, "4s") 851 852 def h2i(self, pkt, x): 853 # type: (Optional[Packet], Union[AnyStr, List[AnyStr]]) -> Any 854 if isinstance(x, bytes): 855 x = plain_str(x) # type: ignore 856 if isinstance(x, _ScopedIP): 857 return x 858 elif isinstance(x, str): 859 x = ScopedIP(x) 860 try: 861 inet_aton(x) 862 except socket.error: 863 return Net(x) 864 elif isinstance(x, tuple): 865 if len(x) != 2: 866 raise ValueError("Invalid IP format") 867 return Net(*x) 868 elif isinstance(x, list): 869 return [self.h2i(pkt, n) for n in x] 870 return x 871 872 def i2h(self, pkt, x): 873 # type: (Optional[Packet], Optional[Union[str, Net]]) -> str 874 return cast(str, x) 875 876 def resolve(self, x): 877 # type: (str) -> str 878 if self in conf.resolve: 879 try: 880 ret = socket.gethostbyaddr(x)[0] 881 except Exception: 882 pass 883 else: 884 if ret: 885 return ret 886 return x 887 888 def i2m(self, pkt, x): 889 # type: (Optional[Packet], Optional[Union[str, Net]]) -> bytes 890 if x is None: 891 return b'\x00\x00\x00\x00' 892 return inet_aton(plain_str(x)) 893 894 def m2i(self, pkt, x): 895 # type: (Optional[Packet], bytes) -> str 896 return inet_ntoa(x) 897 898 def any2i(self, pkt, x): 899 # type: (Optional[Packet], Any) -> Any 900 return self.h2i(pkt, x) 901 902 def i2repr(self, pkt, x): 903 # type: (Optional[Packet], Union[str, Net]) -> str 904 if isinstance(x, _ScopedIP) and x.scope: 905 return repr(x) 906 r = self.resolve(self.i2h(pkt, x)) 907 return r if isinstance(r, str) else repr(r) 908 909 def randval(self): 910 # type: () -> RandIP 911 return RandIP() 912 913 914class SourceIPField(IPField): 915 def __init__(self, name): 916 # type: (str) -> None 917 IPField.__init__(self, name, None) 918 919 def __findaddr(self, pkt): 920 # type: (Packet) -> Optional[str] 921 if conf.route is None: 922 # unused import, only to initialize conf.route 923 import scapy.route # noqa: F401 924 return pkt.route()[1] or conf.route.route()[1] 925 926 def i2m(self, pkt, x): 927 # type: (Optional[Packet], Optional[Union[str, Net]]) -> bytes 928 if x is None and pkt is not None: 929 x = self.__findaddr(pkt) 930 return super(SourceIPField, self).i2m(pkt, x) 931 932 def i2h(self, pkt, x): 933 # type: (Optional[Packet], Optional[Union[str, Net]]) -> str 934 if x is None and pkt is not None: 935 x = self.__findaddr(pkt) 936 return super(SourceIPField, self).i2h(pkt, x) 937 938 939class IP6Field(Field[Optional[Union[str, Net6]], bytes]): 940 def __init__(self, name, default): 941 # type: (str, Optional[str]) -> None 942 Field.__init__(self, name, default, "16s") 943 944 def h2i(self, pkt, x): 945 # type: (Optional[Packet], Any) -> str 946 if isinstance(x, bytes): 947 x = plain_str(x) 948 if isinstance(x, _ScopedIP): 949 return x 950 elif isinstance(x, str): 951 x = ScopedIP(x) 952 try: 953 x = ScopedIP(in6_ptop(x), scope=x.scope) 954 except socket.error: 955 return Net6(x) # type: ignore 956 elif isinstance(x, tuple): 957 if len(x) != 2: 958 raise ValueError("Invalid IPv6 format") 959 return Net6(*x) # type: ignore 960 elif isinstance(x, list): 961 x = [self.h2i(pkt, n) for n in x] 962 return x # type: ignore 963 964 def i2h(self, pkt, x): 965 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> str 966 return cast(str, x) 967 968 def i2m(self, pkt, x): 969 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> bytes 970 if x is None: 971 x = "::" 972 return inet_pton(socket.AF_INET6, plain_str(x)) 973 974 def m2i(self, pkt, x): 975 # type: (Optional[Packet], bytes) -> str 976 return inet_ntop(socket.AF_INET6, x) 977 978 def any2i(self, pkt, x): 979 # type: (Optional[Packet], Optional[str]) -> str 980 return self.h2i(pkt, x) 981 982 def i2repr(self, pkt, x): 983 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> str 984 if x is None: 985 return self.i2h(pkt, x) 986 elif not isinstance(x, Net6) and not isinstance(x, list): 987 if in6_isaddrTeredo(x): # print Teredo info 988 server, _, maddr, mport = teredoAddrExtractInfo(x) 989 return "%s [Teredo srv: %s cli: %s:%s]" % (self.i2h(pkt, x), server, maddr, mport) # noqa: E501 990 elif in6_isaddr6to4(x): # print encapsulated address 991 vaddr = in6_6to4ExtractAddr(x) 992 return "%s [6to4 GW: %s]" % (self.i2h(pkt, x), vaddr) 993 elif isinstance(x, _ScopedIP) and x.scope: 994 return repr(x) 995 r = self.i2h(pkt, x) # No specific information to return 996 return r if isinstance(r, str) else repr(r) 997 998 def randval(self): 999 # type: () -> RandIP6 1000 return RandIP6() 1001 1002 1003class SourceIP6Field(IP6Field): 1004 def __init__(self, name): 1005 # type: (str) -> None 1006 IP6Field.__init__(self, name, None) 1007 1008 def __findaddr(self, pkt): 1009 # type: (Packet) -> Optional[str] 1010 if conf.route6 is None: 1011 # unused import, only to initialize conf.route 1012 import scapy.route6 # noqa: F401 1013 return pkt.route()[1] 1014 1015 def i2m(self, pkt, x): 1016 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> bytes 1017 if x is None and pkt is not None: 1018 x = self.__findaddr(pkt) 1019 return super(SourceIP6Field, self).i2m(pkt, x) 1020 1021 def i2h(self, pkt, x): 1022 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> str 1023 if x is None and pkt is not None: 1024 x = self.__findaddr(pkt) 1025 return super(SourceIP6Field, self).i2h(pkt, x) 1026 1027 1028class DestIP6Field(IP6Field, DestField): 1029 bindings = {} # type: Dict[Type[Packet], Tuple[str, Any]] 1030 1031 def __init__(self, name, default): 1032 # type: (str, str) -> None 1033 IP6Field.__init__(self, name, None) 1034 DestField.__init__(self, name, default) 1035 1036 def i2m(self, pkt, x): 1037 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> bytes 1038 if x is None and pkt is not None: 1039 x = self.dst_from_pkt(pkt) 1040 return IP6Field.i2m(self, pkt, x) 1041 1042 def i2h(self, pkt, x): 1043 # type: (Optional[Packet], Optional[Union[str, Net6]]) -> str 1044 if x is None and pkt is not None: 1045 x = self.dst_from_pkt(pkt) 1046 return super(DestIP6Field, self).i2h(pkt, x) 1047 1048 1049class ByteField(Field[int, int]): 1050 def __init__(self, name, default): 1051 # type: (str, Optional[int]) -> None 1052 Field.__init__(self, name, default, "B") 1053 1054 1055class XByteField(ByteField): 1056 def i2repr(self, pkt, x): 1057 # type: (Optional[Packet], int) -> str 1058 return lhex(self.i2h(pkt, x)) 1059 1060 1061# XXX Unused field: at least add some tests 1062class OByteField(ByteField): 1063 def i2repr(self, pkt, x): 1064 # type: (Optional[Packet], int) -> str 1065 return "%03o" % self.i2h(pkt, x) 1066 1067 1068class ThreeBytesField(Field[int, int]): 1069 def __init__(self, name, default): 1070 # type: (str, int) -> None 1071 Field.__init__(self, name, default, "!I") 1072 1073 def addfield(self, pkt, s, val): 1074 # type: (Packet, bytes, Optional[int]) -> bytes 1075 return s + struct.pack(self.fmt, self.i2m(pkt, val))[1:4] 1076 1077 def getfield(self, pkt, s): 1078 # type: (Packet, bytes) -> Tuple[bytes, int] 1079 return s[3:], self.m2i(pkt, struct.unpack(self.fmt, b"\x00" + s[:3])[0]) # noqa: E501 1080 1081 1082class X3BytesField(ThreeBytesField, XByteField): 1083 def i2repr(self, pkt, x): 1084 # type: (Optional[Packet], int) -> str 1085 return XByteField.i2repr(self, pkt, x) 1086 1087 1088class LEThreeBytesField(ByteField): 1089 def __init__(self, name, default): 1090 # type: (str, Optional[int]) -> None 1091 Field.__init__(self, name, default, "<I") 1092 1093 def addfield(self, pkt, s, val): 1094 # type: (Packet, bytes, Optional[int]) -> bytes 1095 return s + struct.pack(self.fmt, self.i2m(pkt, val))[:3] 1096 1097 def getfield(self, pkt, s): 1098 # type: (Optional[Packet], bytes) -> Tuple[bytes, int] 1099 return s[3:], self.m2i(pkt, struct.unpack(self.fmt, s[:3] + b"\x00")[0]) # noqa: E501 1100 1101 1102class XLE3BytesField(LEThreeBytesField, XByteField): 1103 def i2repr(self, pkt, x): 1104 # type: (Optional[Packet], int) -> str 1105 return XByteField.i2repr(self, pkt, x) 1106 1107 1108def LEX3BytesField(*args, **kwargs): 1109 # type: (*Any, **Any) -> Any 1110 warnings.warn( 1111 "LEX3BytesField is deprecated. Use XLE3BytesField", 1112 DeprecationWarning 1113 ) 1114 return XLE3BytesField(*args, **kwargs) 1115 1116 1117class NBytesField(Field[int, List[int]]): 1118 def __init__(self, name, default, sz): 1119 # type: (str, Optional[int], int) -> None 1120 Field.__init__(self, name, default, "<" + "B" * sz) 1121 1122 def i2m(self, pkt, x): 1123 # type: (Optional[Packet], Optional[int]) -> List[int] 1124 if x is None: 1125 return [0] * self.sz 1126 x2m = list() 1127 for _ in range(self.sz): 1128 x2m.append(x % 256) 1129 x //= 256 1130 return x2m[::-1] 1131 1132 def m2i(self, pkt, x): 1133 # type: (Optional[Packet], Union[List[int], int]) -> int 1134 if isinstance(x, int): 1135 return x 1136 # x can be a tuple when coming from struct.unpack (from getfield) 1137 if isinstance(x, (list, tuple)): 1138 return sum(d * (256 ** i) for i, d in enumerate(reversed(x))) 1139 return 0 1140 1141 def i2repr(self, pkt, x): 1142 # type: (Optional[Packet], int) -> str 1143 if isinstance(x, int): 1144 return '%i' % x 1145 return super(NBytesField, self).i2repr(pkt, x) 1146 1147 def addfield(self, pkt, s, val): 1148 # type: (Optional[Packet], bytes, Optional[int]) -> bytes 1149 return s + self.struct.pack(*self.i2m(pkt, val)) 1150 1151 def getfield(self, pkt, s): 1152 # type: (Optional[Packet], bytes) -> Tuple[bytes, int] 1153 return (s[self.sz:], 1154 self.m2i(pkt, self.struct.unpack(s[:self.sz]))) # type: ignore 1155 1156 def randval(self): 1157 # type: () -> RandNum 1158 return RandNum(0, 2 ** (self.sz * 8) - 1) 1159 1160 1161class XNBytesField(NBytesField): 1162 def i2repr(self, pkt, x): 1163 # type: (Optional[Packet], int) -> str 1164 if isinstance(x, int): 1165 return '0x%x' % x 1166 # x can be a tuple when coming from struct.unpack (from getfield) 1167 if isinstance(x, (list, tuple)): 1168 return "0x" + "".join("%02x" % b for b in x) 1169 return super(XNBytesField, self).i2repr(pkt, x) 1170 1171 1172class SignedByteField(Field[int, int]): 1173 def __init__(self, name, default): 1174 # type: (str, Optional[int]) -> None 1175 Field.__init__(self, name, default, "b") 1176 1177 1178class FieldValueRangeException(Scapy_Exception): 1179 pass 1180 1181 1182class MaximumItemsCount(Scapy_Exception): 1183 pass 1184 1185 1186class FieldAttributeException(Scapy_Exception): 1187 pass 1188 1189 1190class YesNoByteField(ByteField): 1191 """ 1192 A byte based flag field that shows representation of its number 1193 based on a given association 1194 1195 In its default configuration the following representation is generated: 1196 x == 0 : 'no' 1197 x != 0 : 'yes' 1198 1199 In more sophisticated use-cases (e.g. yes/no/invalid) one can use the 1200 config attribute to configure. 1201 Key-value, key-range and key-value-set associations that will be used to 1202 generate the values representation. 1203 1204 - A range is given by a tuple (<first-val>, <last-value>) including the 1205 last value. 1206 - A single-value tuple is treated as scalar. 1207 - A list defines a set of (probably non consecutive) values that should be 1208 associated to a given key. 1209 1210 All values not associated with a key will be shown as number of type 1211 unsigned byte. 1212 1213 **For instance**:: 1214 1215 config = { 1216 'no' : 0, 1217 'foo' : (1,22), 1218 'yes' : 23, 1219 'bar' : [24,25, 42, 48, 87, 253] 1220 } 1221 1222 Generates the following representations:: 1223 1224 x == 0 : 'no' 1225 x == 15: 'foo' 1226 x == 23: 'yes' 1227 x == 42: 'bar' 1228 x == 43: 43 1229 1230 Another example, using the config attribute one could also revert 1231 the stock-yes-no-behavior:: 1232 1233 config = { 1234 'yes' : 0, 1235 'no' : (1,255) 1236 } 1237 1238 Will generate the following value representation:: 1239 1240 x == 0 : 'yes' 1241 x != 0 : 'no' 1242 1243 """ 1244 __slots__ = ['eval_fn'] 1245 1246 def _build_config_representation(self, config): 1247 # type: (Dict[str, Any]) -> None 1248 assoc_table = dict() 1249 for key in config: 1250 value_spec = config[key] 1251 1252 value_spec_type = type(value_spec) 1253 1254 if value_spec_type is int: 1255 if value_spec < 0 or value_spec > 255: 1256 raise FieldValueRangeException('given field value {} invalid - ' # noqa: E501 1257 'must be in range [0..255]'.format(value_spec)) # noqa: E501 1258 assoc_table[value_spec] = key 1259 1260 elif value_spec_type is list: 1261 for value in value_spec: 1262 if value < 0 or value > 255: 1263 raise FieldValueRangeException('given field value {} invalid - ' # noqa: E501 1264 'must be in range [0..255]'.format(value)) # noqa: E501 1265 assoc_table[value] = key 1266 1267 elif value_spec_type is tuple: 1268 value_spec_len = len(value_spec) 1269 if value_spec_len != 2: 1270 raise FieldAttributeException('invalid length {} of given config item tuple {} - must be ' # noqa: E501 1271 '(<start-range>, <end-range>).'.format(value_spec_len, value_spec)) # noqa: E501 1272 1273 value_range_start = value_spec[0] 1274 if value_range_start < 0 or value_range_start > 255: 1275 raise FieldValueRangeException('given field value {} invalid - ' # noqa: E501 1276 'must be in range [0..255]'.format(value_range_start)) # noqa: E501 1277 1278 value_range_end = value_spec[1] 1279 if value_range_end < 0 or value_range_end > 255: 1280 raise FieldValueRangeException('given field value {} invalid - ' # noqa: E501 1281 'must be in range [0..255]'.format(value_range_end)) # noqa: E501 1282 1283 for value in range(value_range_start, value_range_end + 1): 1284 1285 assoc_table[value] = key 1286 1287 self.eval_fn = lambda x: assoc_table[x] if x in assoc_table else x 1288 1289 def __init__(self, name, default, config=None): 1290 # type: (str, int, Optional[Dict[str, Any]]) -> None 1291 1292 if not config: 1293 # this represents the common use case and therefore it is kept small # noqa: E501 1294 self.eval_fn = lambda x: 'no' if x == 0 else 'yes' 1295 else: 1296 self._build_config_representation(config) 1297 ByteField.__init__(self, name, default) 1298 1299 def i2repr(self, pkt, x): 1300 # type: (Optional[Packet], int) -> str 1301 return self.eval_fn(x) # type: ignore 1302 1303 1304class ShortField(Field[int, int]): 1305 def __init__(self, name, default): 1306 # type: (str, Optional[int]) -> None 1307 Field.__init__(self, name, default, "H") 1308 1309 1310class SignedShortField(Field[int, int]): 1311 def __init__(self, name, default): 1312 # type: (str, Optional[int]) -> None 1313 Field.__init__(self, name, default, "h") 1314 1315 1316class LEShortField(Field[int, int]): 1317 def __init__(self, name, default): 1318 # type: (str, Optional[int]) -> None 1319 Field.__init__(self, name, default, "<H") 1320 1321 1322class LESignedShortField(Field[int, int]): 1323 def __init__(self, name, default): 1324 # type: (str, Optional[int]) -> None 1325 Field.__init__(self, name, default, "<h") 1326 1327 1328class XShortField(ShortField): 1329 def i2repr(self, pkt, x): 1330 # type: (Optional[Packet], int) -> str 1331 return lhex(self.i2h(pkt, x)) 1332 1333 1334class IntField(Field[int, int]): 1335 def __init__(self, name, default): 1336 # type: (str, Optional[int]) -> None 1337 Field.__init__(self, name, default, "I") 1338 1339 1340class SignedIntField(Field[int, int]): 1341 def __init__(self, name, default): 1342 # type: (str, int) -> None 1343 Field.__init__(self, name, default, "i") 1344 1345 1346class LEIntField(Field[int, int]): 1347 def __init__(self, name, default): 1348 # type: (str, Optional[int]) -> None 1349 Field.__init__(self, name, default, "<I") 1350 1351 1352class LESignedIntField(Field[int, int]): 1353 def __init__(self, name, default): 1354 # type: (str, int) -> None 1355 Field.__init__(self, name, default, "<i") 1356 1357 1358class XIntField(IntField): 1359 def i2repr(self, pkt, x): 1360 # type: (Optional[Packet], int) -> str 1361 return lhex(self.i2h(pkt, x)) 1362 1363 1364class XLEIntField(LEIntField, XIntField): 1365 def i2repr(self, pkt, x): 1366 # type: (Optional[Packet], int) -> str 1367 return XIntField.i2repr(self, pkt, x) 1368 1369 1370class XLEShortField(LEShortField, XShortField): 1371 def i2repr(self, pkt, x): 1372 # type: (Optional[Packet], int) -> str 1373 return XShortField.i2repr(self, pkt, x) 1374 1375 1376class LongField(Field[int, int]): 1377 def __init__(self, name, default): 1378 # type: (str, int) -> None 1379 Field.__init__(self, name, default, "Q") 1380 1381 1382class SignedLongField(Field[int, int]): 1383 def __init__(self, name, default): 1384 # type: (str, Optional[int]) -> None 1385 Field.__init__(self, name, default, "q") 1386 1387 1388class LELongField(LongField): 1389 def __init__(self, name, default): 1390 # type: (str, Optional[int]) -> None 1391 Field.__init__(self, name, default, "<Q") 1392 1393 1394class LESignedLongField(Field[int, int]): 1395 def __init__(self, name, default): 1396 # type: (str, Optional[Any]) -> None 1397 Field.__init__(self, name, default, "<q") 1398 1399 1400class XLongField(LongField): 1401 def i2repr(self, pkt, x): 1402 # type: (Optional[Packet], int) -> str 1403 return lhex(self.i2h(pkt, x)) 1404 1405 1406class XLELongField(LELongField, XLongField): 1407 def i2repr(self, pkt, x): 1408 # type: (Optional[Packet], int) -> str 1409 return XLongField.i2repr(self, pkt, x) 1410 1411 1412class IEEEFloatField(Field[int, int]): 1413 def __init__(self, name, default): 1414 # type: (str, Optional[int]) -> None 1415 Field.__init__(self, name, default, "f") 1416 1417 1418class IEEEDoubleField(Field[int, int]): 1419 def __init__(self, name, default): 1420 # type: (str, Optional[int]) -> None 1421 Field.__init__(self, name, default, "d") 1422 1423 1424class _StrField(Field[I, bytes]): 1425 __slots__ = ["remain"] 1426 1427 def __init__(self, name, default, fmt="H", remain=0): 1428 # type: (str, Optional[I], str, int) -> None 1429 Field.__init__(self, name, default, fmt) 1430 self.remain = remain 1431 1432 def i2len(self, pkt, x): 1433 # type: (Optional[Packet], Any) -> int 1434 if x is None: 1435 return 0 1436 return len(x) 1437 1438 def any2i(self, pkt, x): 1439 # type: (Optional[Packet], Any) -> I 1440 if isinstance(x, str): 1441 x = bytes_encode(x) 1442 return super(_StrField, self).any2i(pkt, x) # type: ignore 1443 1444 def i2repr(self, pkt, x): 1445 # type: (Optional[Packet], I) -> str 1446 if x and isinstance(x, bytes): 1447 return repr(x) 1448 return super(_StrField, self).i2repr(pkt, x) 1449 1450 def i2m(self, pkt, x): 1451 # type: (Optional[Packet], Optional[I]) -> bytes 1452 if x is None: 1453 return b"" 1454 if not isinstance(x, bytes): 1455 return bytes_encode(x) 1456 return x 1457 1458 def addfield(self, pkt, s, val): 1459 # type: (Packet, bytes, Optional[I]) -> bytes 1460 return s + self.i2m(pkt, val) 1461 1462 def getfield(self, pkt, s): 1463 # type: (Packet, bytes) -> Tuple[bytes, I] 1464 if self.remain == 0: 1465 return b"", self.m2i(pkt, s) 1466 else: 1467 return s[-self.remain:], self.m2i(pkt, s[:-self.remain]) 1468 1469 def randval(self): 1470 # type: () -> RandBin 1471 return RandBin(RandNum(0, 1200)) 1472 1473 1474class StrField(_StrField[bytes]): 1475 pass 1476 1477 1478class StrFieldUtf16(StrField): 1479 def any2i(self, pkt, x): 1480 # type: (Optional[Packet], Optional[str]) -> bytes 1481 if isinstance(x, str): 1482 return self.h2i(pkt, x) 1483 return super(StrFieldUtf16, self).any2i(pkt, x) 1484 1485 def i2repr(self, pkt, x): 1486 # type: (Optional[Packet], bytes) -> str 1487 return plain_str(self.i2h(pkt, x)) 1488 1489 def h2i(self, pkt, x): 1490 # type: (Optional[Packet], Optional[str]) -> bytes 1491 return plain_str(x).encode('utf-16-le', errors="replace") 1492 1493 def i2h(self, pkt, x): 1494 # type: (Optional[Packet], bytes) -> str 1495 return bytes_encode(x).decode('utf-16-le', errors="replace") 1496 1497 1498class _StrEnumField: 1499 def __init__(self, **kwargs): 1500 # type: (**Any) -> None 1501 self.enum = kwargs.pop("enum", {}) 1502 1503 def i2repr(self, pkt, v): 1504 # type: (Optional[Packet], bytes) -> str 1505 r = v.rstrip(b"\0") 1506 rr = repr(r) 1507 if self.enum: 1508 if v in self.enum: 1509 rr = "%s (%s)" % (rr, self.enum[v]) 1510 elif r in self.enum: 1511 rr = "%s (%s)" % (rr, self.enum[r]) 1512 return rr 1513 1514 1515class StrEnumField(_StrEnumField, StrField): 1516 __slots__ = ["enum"] 1517 1518 def __init__( 1519 self, 1520 name, # type: str 1521 default, # type: bytes 1522 enum=None, # type: Optional[Dict[str, str]] 1523 **kwargs # type: Any 1524 ): 1525 # type: (...) -> None 1526 StrField.__init__(self, name, default, **kwargs) # type: ignore 1527 self.enum = enum 1528 1529 1530K = TypeVar('K', List[BasePacket], BasePacket, Optional[BasePacket]) 1531 1532 1533class _PacketField(_StrField[K]): 1534 __slots__ = ["cls"] 1535 holds_packets = 1 1536 1537 def __init__(self, 1538 name, # type: str 1539 default, # type: Optional[K] 1540 pkt_cls, # type: Union[Callable[[bytes], Packet], Type[Packet]] # noqa: E501 1541 ): 1542 # type: (...) -> None 1543 super(_PacketField, self).__init__(name, default) 1544 self.cls = pkt_cls 1545 1546 def i2m(self, 1547 pkt, # type: Optional[Packet] 1548 i, # type: Any 1549 ): 1550 # type: (...) -> bytes 1551 if i is None: 1552 return b"" 1553 return raw(i) 1554 1555 def m2i(self, pkt, m): # type: ignore 1556 # type: (Optional[Packet], bytes) -> Packet 1557 try: 1558 # we want to set parent wherever possible 1559 return self.cls(m, _parent=pkt) # type: ignore 1560 except TypeError: 1561 return self.cls(m) 1562 1563 1564class _PacketFieldSingle(_PacketField[K]): 1565 def any2i(self, pkt, x): 1566 # type: (Optional[Packet], Any) -> K 1567 if x and pkt and hasattr(x, "add_parent"): 1568 cast("Packet", x).add_parent(pkt) 1569 return super(_PacketFieldSingle, self).any2i(pkt, x) 1570 1571 def getfield(self, 1572 pkt, # type: Packet 1573 s, # type: bytes 1574 ): 1575 # type: (...) -> Tuple[bytes, K] 1576 i = self.m2i(pkt, s) 1577 remain = b"" 1578 if conf.padding_layer in i: 1579 r = i[conf.padding_layer] 1580 del r.underlayer.payload 1581 remain = r.load 1582 return remain, i # type: ignore 1583 1584 1585class PacketField(_PacketFieldSingle[BasePacket]): 1586 def randval(self): # type: ignore 1587 # type: () -> Packet 1588 from scapy.packet import fuzz 1589 return fuzz(self.cls()) # type: ignore 1590 1591 1592class PacketLenField(_PacketFieldSingle[Optional[BasePacket]]): 1593 __slots__ = ["length_from"] 1594 1595 def __init__(self, 1596 name, # type: str 1597 default, # type: Packet 1598 cls, # type: Union[Callable[[bytes], Packet], Type[Packet]] # noqa: E501 1599 length_from=None # type: Optional[Callable[[Packet], int]] # noqa: E501 1600 ): 1601 # type: (...) -> None 1602 super(PacketLenField, self).__init__(name, default, cls) 1603 self.length_from = length_from or (lambda x: 0) 1604 1605 def getfield(self, 1606 pkt, # type: Packet 1607 s, # type: bytes 1608 ): 1609 # type: (...) -> Tuple[bytes, Optional[BasePacket]] 1610 len_pkt = self.length_from(pkt) 1611 i = None 1612 if len_pkt: 1613 try: 1614 i = self.m2i(pkt, s[:len_pkt]) 1615 except Exception: 1616 if conf.debug_dissector: 1617 raise 1618 i = conf.raw_layer(load=s[:len_pkt]) 1619 return s[len_pkt:], i 1620 1621 1622class PacketListField(_PacketField[List[BasePacket]]): 1623 """PacketListField represents a list containing a series of Packet instances 1624 that might occur right in the middle of another Packet field. 1625 This field type may also be used to indicate that a series of Packet 1626 instances have a sibling semantic instead of a parent/child relationship 1627 (i.e. a stack of layers). All elements in PacketListField have current 1628 packet referenced in parent field. 1629 """ 1630 __slots__ = ["count_from", "length_from", "next_cls_cb", "max_count"] 1631 islist = 1 1632 1633 def __init__( 1634 self, 1635 name, # type: str 1636 default, # type: Optional[List[BasePacket]] 1637 pkt_cls=None, # type: Optional[Union[Callable[[bytes], Packet], Type[Packet]]] # noqa: E501 1638 count_from=None, # type: Optional[Callable[[Packet], int]] 1639 length_from=None, # type: Optional[Callable[[Packet], int]] 1640 next_cls_cb=None, # type: Optional[Callable[[Packet, List[BasePacket], Optional[Packet], bytes], Type[Packet]]] # noqa: E501 1641 max_count=None, # type: Optional[int] 1642 ): 1643 # type: (...) -> None 1644 """ 1645 The number of Packet instances that are dissected by this field can 1646 be parametrized using one of three different mechanisms/parameters: 1647 1648 * count_from: a callback that returns the number of Packet 1649 instances to dissect. The callback prototype is:: 1650 1651 count_from(pkt:Packet) -> int 1652 1653 * length_from: a callback that returns the number of bytes that 1654 must be dissected by this field. The callback prototype is:: 1655 1656 length_from(pkt:Packet) -> int 1657 1658 * next_cls_cb: a callback that enables a Scapy developer to 1659 dynamically discover if another Packet instance should be 1660 dissected or not. See below for this callback prototype. 1661 1662 The bytes that are not consumed during the dissection of this field 1663 are passed to the next field of the current packet. 1664 1665 For the serialization of such a field, the list of Packets that are 1666 contained in a PacketListField can be heterogeneous and is 1667 unrestricted. 1668 1669 The type of the Packet instances that are dissected with this field is 1670 specified or discovered using one of the following mechanism: 1671 1672 * the pkt_cls parameter may contain a callable that returns an 1673 instance of the dissected Packet. This may either be a 1674 reference of a Packet subclass (e.g. DNSRROPT in layers/dns.py) 1675 to generate an homogeneous PacketListField or a function 1676 deciding the type of the Packet instance 1677 (e.g. _CDPGuessAddrRecord in contrib/cdp.py) 1678 1679 * the pkt_cls parameter may contain a class object with a defined 1680 ``dispatch_hook`` classmethod. That method must return a Packet 1681 instance. The ``dispatch_hook`` callmethod must implement the 1682 following prototype:: 1683 1684 dispatch_hook(cls, 1685 _pkt:Optional[Packet], 1686 *args, **kargs 1687 ) -> Type[Packet] 1688 1689 The _pkt parameter may contain a reference to the packet 1690 instance containing the PacketListField that is being 1691 dissected. 1692 1693 * the ``next_cls_cb`` parameter may contain a callable whose 1694 prototype is:: 1695 1696 cbk(pkt:Packet, 1697 lst:List[Packet], 1698 cur:Optional[Packet], 1699 remain:str 1700 ) -> Optional[Type[Packet]] 1701 1702 The pkt argument contains a reference to the Packet instance 1703 containing the PacketListField that is being dissected. 1704 The lst argument is the list of all Packet instances that were 1705 previously parsed during the current ``PacketListField`` 1706 dissection, saved for the very last Packet instance. 1707 The cur argument contains a reference to that very last parsed 1708 ``Packet`` instance. The remain argument contains the bytes 1709 that may still be consumed by the current PacketListField 1710 dissection operation. 1711 1712 This callback returns either the type of the next Packet to 1713 dissect or None to indicate that no more Packet are to be 1714 dissected. 1715 1716 These four arguments allows a variety of dynamic discovery of 1717 the number of Packet to dissect and of the type of each one of 1718 these Packets, including: type determination based on current 1719 Packet instances or its underlayers, continuation based on the 1720 previously parsed Packet instances within that PacketListField, 1721 continuation based on a look-ahead on the bytes to be 1722 dissected... 1723 1724 The pkt_cls and next_cls_cb parameters are semantically exclusive, 1725 although one could specify both. If both are specified, pkt_cls is 1726 silently ignored. The same is true for count_from and next_cls_cb. 1727 1728 length_from and next_cls_cb are compatible and the dissection will 1729 end, whichever of the two stop conditions comes first. 1730 1731 :param name: the name of the field 1732 :param default: the default value of this field; generally an empty 1733 Python list 1734 :param pkt_cls: either a callable returning a Packet instance or a 1735 class object defining a ``dispatch_hook`` class method 1736 :param count_from: a callback returning the number of Packet 1737 instances to dissect. 1738 :param length_from: a callback returning the number of bytes to dissect 1739 :param next_cls_cb: a callback returning either None or the type of 1740 the next Packet to dissect. 1741 :param max_count: an int containing the max amount of results. This is 1742 a safety mechanism, exceeding this value will raise a Scapy_Exception. 1743 """ 1744 if default is None: 1745 default = [] # Create a new list for each instance 1746 super(PacketListField, self).__init__( 1747 name, 1748 default, 1749 pkt_cls # type: ignore 1750 ) 1751 self.count_from = count_from 1752 self.length_from = length_from 1753 self.next_cls_cb = next_cls_cb 1754 self.max_count = max_count 1755 1756 def any2i(self, pkt, x): 1757 # type: (Optional[Packet], Any) -> List[BasePacket] 1758 if not isinstance(x, list): 1759 if x and pkt and hasattr(x, "add_parent"): 1760 x.add_parent(pkt) 1761 return [x] 1762 elif pkt: 1763 for i in x: 1764 if not i or not hasattr(i, "add_parent"): 1765 continue 1766 i.add_parent(pkt) 1767 return x 1768 1769 def i2count(self, 1770 pkt, # type: Optional[Packet] 1771 val, # type: List[BasePacket] 1772 ): 1773 # type: (...) -> int 1774 if isinstance(val, list): 1775 return len(val) 1776 return 1 1777 1778 def i2len(self, 1779 pkt, # type: Optional[Packet] 1780 val, # type: List[Packet] 1781 ): 1782 # type: (...) -> int 1783 return sum(len(self.i2m(pkt, p)) for p in val) 1784 1785 def getfield(self, pkt, s): 1786 # type: (Packet, bytes) -> Tuple[bytes, List[BasePacket]] 1787 c = len_pkt = cls = None 1788 if self.length_from is not None: 1789 len_pkt = self.length_from(pkt) 1790 elif self.count_from is not None: 1791 c = self.count_from(pkt) 1792 if self.next_cls_cb is not None: 1793 cls = self.next_cls_cb(pkt, [], None, s) 1794 c = 1 1795 if cls is None: 1796 c = 0 1797 1798 lst = [] # type: List[BasePacket] 1799 ret = b"" 1800 remain = s 1801 if len_pkt is not None: 1802 remain, ret = s[:len_pkt], s[len_pkt:] 1803 while remain: 1804 if c is not None: 1805 if c <= 0: 1806 break 1807 c -= 1 1808 try: 1809 if cls is not None: 1810 try: 1811 # we want to set parent wherever possible 1812 p = cls(remain, _parent=pkt) 1813 except TypeError: 1814 p = cls(remain) 1815 else: 1816 p = self.m2i(pkt, remain) 1817 except Exception: 1818 if conf.debug_dissector: 1819 raise 1820 p = conf.raw_layer(load=remain) 1821 remain = b"" 1822 else: 1823 if conf.padding_layer in p: 1824 pad = p[conf.padding_layer] 1825 remain = pad.load 1826 del pad.underlayer.payload 1827 if self.next_cls_cb is not None: 1828 cls = self.next_cls_cb(pkt, lst, p, remain) 1829 if cls is not None: 1830 c = 0 if c is None else c 1831 c += 1 1832 else: 1833 remain = b"" 1834 lst.append(p) 1835 if len(lst) > (self.max_count or conf.max_list_count): 1836 raise MaximumItemsCount( 1837 "Maximum amount of items reached in PacketListField: %s " 1838 "(defaults to conf.max_list_count)" 1839 % (self.max_count or conf.max_list_count) 1840 ) 1841 1842 if isinstance(remain, tuple): 1843 remain, nb = remain 1844 return (remain + ret, nb), lst 1845 else: 1846 return remain + ret, lst 1847 1848 def i2m(self, 1849 pkt, # type: Optional[Packet] 1850 i, # type: Any 1851 ): 1852 # type: (...) -> bytes 1853 return bytes_encode(i) 1854 1855 def addfield(self, pkt, s, val): 1856 # type: (Packet, bytes, Any) -> bytes 1857 return s + b"".join(self.i2m(pkt, v) for v in val) 1858 1859 1860class StrFixedLenField(StrField): 1861 __slots__ = ["length_from"] 1862 1863 def __init__( 1864 self, 1865 name, # type: str 1866 default, # type: Optional[bytes] 1867 length=None, # type: Optional[int] 1868 length_from=None, # type: Optional[Callable[[Packet], int]] # noqa: E501 1869 ): 1870 # type: (...) -> None 1871 super(StrFixedLenField, self).__init__(name, default) 1872 self.length_from = length_from or (lambda x: 0) 1873 if length is not None: 1874 self.sz = length 1875 self.length_from = lambda x, length=length: length # type: ignore 1876 1877 def i2repr(self, 1878 pkt, # type: Optional[Packet] 1879 v, # type: bytes 1880 ): 1881 # type: (...) -> str 1882 if isinstance(v, bytes): 1883 v = v.rstrip(b"\0") 1884 return super(StrFixedLenField, self).i2repr(pkt, v) 1885 1886 def getfield(self, pkt, s): 1887 # type: (Packet, bytes) -> Tuple[bytes, bytes] 1888 len_pkt = self.length_from(pkt) 1889 if len_pkt == 0: 1890 return s, b"" 1891 return s[len_pkt:], self.m2i(pkt, s[:len_pkt]) 1892 1893 def addfield(self, pkt, s, val): 1894 # type: (Packet, bytes, Optional[bytes]) -> bytes 1895 len_pkt = self.length_from(pkt) 1896 if len_pkt is None: 1897 return s + self.i2m(pkt, val) 1898 return s + struct.pack("%is" % len_pkt, self.i2m(pkt, val)) 1899 1900 def randval(self): 1901 # type: () -> RandBin 1902 try: 1903 return RandBin(self.length_from(None)) # type: ignore 1904 except Exception: 1905 return RandBin(RandNum(0, 200)) 1906 1907 1908class StrFixedLenFieldUtf16(StrFixedLenField, StrFieldUtf16): 1909 pass 1910 1911 1912class StrFixedLenEnumField(_StrEnumField, StrFixedLenField): 1913 __slots__ = ["enum"] 1914 1915 def __init__( 1916 self, 1917 name, # type: str 1918 default, # type: bytes 1919 enum=None, # type: Optional[Dict[str, str]] 1920 length=None, # type: Optional[int] 1921 length_from=None # type: Optional[Callable[[Optional[Packet]], int]] # noqa: E501 1922 ): 1923 # type: (...) -> None 1924 StrFixedLenField.__init__(self, name, default, length=length, length_from=length_from) # noqa: E501 1925 self.enum = enum 1926 1927 1928class NetBIOSNameField(StrFixedLenField): 1929 def __init__(self, name, default, length=31): 1930 # type: (str, bytes, int) -> None 1931 StrFixedLenField.__init__(self, name, default, length) 1932 1933 def h2i(self, pkt, x): 1934 # type: (Optional[Packet], bytes) -> bytes 1935 if x and len(x) > 15: 1936 x = x[:15] 1937 return x 1938 1939 def i2m(self, pkt, y): 1940 # type: (Optional[Packet], Optional[bytes]) -> bytes 1941 if pkt: 1942 len_pkt = self.length_from(pkt) // 2 1943 else: 1944 len_pkt = 0 1945 x = bytes_encode(y or b"") # type: bytes 1946 x += b" " * len_pkt 1947 x = x[:len_pkt] 1948 x = b"".join( 1949 struct.pack( 1950 "!BB", 1951 0x41 + (b >> 4), 1952 0x41 + (b & 0xf), 1953 ) 1954 for b in x 1955 ) 1956 return b" " + x 1957 1958 def m2i(self, pkt, x): 1959 # type: (Optional[Packet], bytes) -> bytes 1960 x = x[1:].strip(b"\x00") 1961 return b"".join(map( 1962 lambda x, y: struct.pack( 1963 "!B", 1964 (((x - 1) & 0xf) << 4) + ((y - 1) & 0xf) 1965 ), 1966 x[::2], x[1::2] 1967 )).rstrip(b" ") 1968 1969 1970class StrLenField(StrField): 1971 """ 1972 StrField with a length 1973 1974 :param length_from: a function that returns the size of the string 1975 :param max_length: max size to use as randval 1976 """ 1977 __slots__ = ["length_from", "max_length"] 1978 ON_WIRE_SIZE_UTF16 = True 1979 1980 def __init__( 1981 self, 1982 name, # type: str 1983 default, # type: bytes 1984 length_from=None, # type: Optional[Callable[[Packet], int]] 1985 max_length=None, # type: Optional[Any] 1986 ): 1987 # type: (...) -> None 1988 super(StrLenField, self).__init__(name, default) 1989 self.length_from = length_from 1990 self.max_length = max_length 1991 1992 def getfield(self, pkt, s): 1993 # type: (Any, bytes) -> Tuple[bytes, bytes] 1994 len_pkt = (self.length_from or (lambda x: 0))(pkt) 1995 if not self.ON_WIRE_SIZE_UTF16: 1996 len_pkt *= 2 1997 if len_pkt == 0: 1998 return s, b"" 1999 return s[len_pkt:], self.m2i(pkt, s[:len_pkt]) 2000 2001 def randval(self): 2002 # type: () -> RandBin 2003 return RandBin(RandNum(0, self.max_length or 1200)) 2004 2005 2006class _XStrField(Field[bytes, bytes]): 2007 def i2repr(self, pkt, x): 2008 # type: (Optional[Packet], bytes) -> str 2009 if isinstance(x, bytes): 2010 return bytes_hex(x).decode() 2011 return super(_XStrField, self).i2repr(pkt, x) 2012 2013 2014class XStrField(_XStrField, StrField): 2015 """ 2016 StrField which value is printed as hexadecimal. 2017 """ 2018 2019 2020class XStrLenField(_XStrField, StrLenField): 2021 """ 2022 StrLenField which value is printed as hexadecimal. 2023 """ 2024 2025 2026class XStrFixedLenField(_XStrField, StrFixedLenField): 2027 """ 2028 StrFixedLenField which value is printed as hexadecimal. 2029 """ 2030 2031 2032class XLEStrLenField(XStrLenField): 2033 def i2m(self, pkt, x): 2034 # type: (Optional[Packet], Optional[bytes]) -> bytes 2035 if not x: 2036 return b"" 2037 return x[:: -1] 2038 2039 def m2i(self, pkt, x): 2040 # type: (Optional[Packet], bytes) -> bytes 2041 return x[:: -1] 2042 2043 2044class StrLenFieldUtf16(StrLenField, StrFieldUtf16): 2045 pass 2046 2047 2048class StrLenEnumField(_StrEnumField, StrLenField): 2049 __slots__ = ["enum"] 2050 2051 def __init__( 2052 self, 2053 name, # type: str 2054 default, # type: bytes 2055 enum=None, # type: Optional[Dict[str, str]] 2056 **kwargs # type: Any 2057 ): 2058 # type: (...) -> None 2059 StrLenField.__init__(self, name, default, **kwargs) 2060 self.enum = enum 2061 2062 2063class BoundStrLenField(StrLenField): 2064 __slots__ = ["minlen", "maxlen"] 2065 2066 def __init__( 2067 self, 2068 name, # type: str 2069 default, # type: bytes 2070 minlen=0, # type: int 2071 maxlen=255, # type: int 2072 length_from=None # type: Optional[Callable[[Packet], int]] 2073 ): 2074 # type: (...) -> None 2075 StrLenField.__init__(self, name, default, length_from=length_from) 2076 self.minlen = minlen 2077 self.maxlen = maxlen 2078 2079 def randval(self): 2080 # type: () -> RandBin 2081 return RandBin(RandNum(self.minlen, self.maxlen)) 2082 2083 2084class FieldListField(Field[List[Any], List[Any]]): 2085 __slots__ = ["field", "count_from", "length_from", "max_count"] 2086 islist = 1 2087 2088 def __init__( 2089 self, 2090 name, # type: str 2091 default, # type: Optional[List[AnyField]] 2092 field, # type: AnyField 2093 length_from=None, # type: Optional[Callable[[Packet], int]] 2094 count_from=None, # type: Optional[Callable[[Packet], int]] 2095 max_count=None, # type: Optional[int] 2096 ): 2097 # type: (...) -> None 2098 if default is None: 2099 default = [] # Create a new list for each instance 2100 self.field = field 2101 Field.__init__(self, name, default) 2102 self.count_from = count_from 2103 self.length_from = length_from 2104 self.max_count = max_count 2105 2106 def i2count(self, pkt, val): 2107 # type: (Optional[Packet], List[Any]) -> int 2108 if isinstance(val, list): 2109 return len(val) 2110 return 1 2111 2112 def i2len(self, pkt, val): 2113 # type: (Packet, List[Any]) -> int 2114 return int(sum(self.field.i2len(pkt, v) for v in val)) 2115 2116 def any2i(self, pkt, x): 2117 # type: (Optional[Packet], List[Any]) -> List[Any] 2118 if not isinstance(x, list): 2119 return [self.field.any2i(pkt, x)] 2120 else: 2121 return [self.field.any2i(pkt, e) for e in x] 2122 2123 def i2repr(self, 2124 pkt, # type: Optional[Packet] 2125 x, # type: List[Any] 2126 ): 2127 # type: (...) -> str 2128 return "[%s]" % ", ".join(self.field.i2repr(pkt, v) for v in x) 2129 2130 def addfield(self, 2131 pkt, # type: Packet 2132 s, # type: bytes 2133 val, # type: Optional[List[Any]] 2134 ): 2135 # type: (...) -> bytes 2136 val = self.i2m(pkt, val) 2137 for v in val: 2138 s = self.field.addfield(pkt, s, v) 2139 return s 2140 2141 def getfield(self, 2142 pkt, # type: Packet 2143 s, # type: bytes 2144 ): 2145 # type: (...) -> Any 2146 c = len_pkt = None 2147 if self.length_from is not None: 2148 len_pkt = self.length_from(pkt) 2149 elif self.count_from is not None: 2150 c = self.count_from(pkt) 2151 2152 val = [] 2153 ret = b"" 2154 if len_pkt is not None: 2155 s, ret = s[:len_pkt], s[len_pkt:] 2156 2157 while s: 2158 if c is not None: 2159 if c <= 0: 2160 break 2161 c -= 1 2162 s, v = self.field.getfield(pkt, s) 2163 val.append(v) 2164 if len(val) > (self.max_count or conf.max_list_count): 2165 raise MaximumItemsCount( 2166 "Maximum amount of items reached in FieldListField: %s " 2167 "(defaults to conf.max_list_count)" 2168 % (self.max_count or conf.max_list_count) 2169 ) 2170 2171 if isinstance(s, tuple): 2172 s, bn = s 2173 return (s + ret, bn), val 2174 else: 2175 return s + ret, val 2176 2177 2178class FieldLenField(Field[int, int]): 2179 __slots__ = ["length_of", "count_of", "adjust"] 2180 2181 def __init__( 2182 self, 2183 name, # type: str 2184 default, # type: Optional[Any] 2185 length_of=None, # type: Optional[str] 2186 fmt="H", # type: str 2187 count_of=None, # type: Optional[str] 2188 adjust=lambda pkt, x: x, # type: Callable[[Packet, int], int] 2189 ): 2190 # type: (...) -> None 2191 Field.__init__(self, name, default, fmt) 2192 self.length_of = length_of 2193 self.count_of = count_of 2194 self.adjust = adjust 2195 2196 def i2m(self, pkt, x): 2197 # type: (Optional[Packet], Optional[int]) -> int 2198 if x is None and pkt is not None: 2199 if self.length_of is not None: 2200 fld, fval = pkt.getfield_and_val(self.length_of) 2201 f = fld.i2len(pkt, fval) 2202 elif self.count_of is not None: 2203 fld, fval = pkt.getfield_and_val(self.count_of) 2204 f = fld.i2count(pkt, fval) 2205 else: 2206 raise ValueError( 2207 "Field should have either length_of or count_of" 2208 ) 2209 x = self.adjust(pkt, f) 2210 elif x is None: 2211 x = 0 2212 return x 2213 2214 2215class StrNullField(StrField): 2216 DELIMITER = b"\x00" 2217 2218 def addfield(self, pkt, s, val): 2219 # type: (Packet, bytes, Optional[bytes]) -> bytes 2220 return s + self.i2m(pkt, val) + self.DELIMITER 2221 2222 def getfield(self, 2223 pkt, # type: Packet 2224 s, # type: bytes 2225 ): 2226 # type: (...) -> Tuple[bytes, bytes] 2227 len_str = 0 2228 while True: 2229 len_str = s.find(self.DELIMITER, len_str) 2230 if len_str < 0: 2231 # DELIMITER not found: return empty 2232 return b"", s 2233 if len_str % len(self.DELIMITER): 2234 len_str += 1 2235 else: 2236 break 2237 return s[len_str + len(self.DELIMITER):], self.m2i(pkt, s[:len_str]) 2238 2239 def randval(self): 2240 # type: () -> RandTermString 2241 return RandTermString(RandNum(0, 1200), self.DELIMITER) 2242 2243 def i2len(self, pkt, x): 2244 # type: (Optional[Packet], Any) -> int 2245 return super(StrNullField, self).i2len(pkt, x) + 1 2246 2247 2248class StrNullFieldUtf16(StrNullField, StrFieldUtf16): 2249 DELIMITER = b"\x00\x00" 2250 2251 2252class StrStopField(StrField): 2253 __slots__ = ["stop", "additional"] 2254 2255 def __init__(self, name, default, stop, additional=0): 2256 # type: (str, str, bytes, int) -> None 2257 Field.__init__(self, name, default) 2258 self.stop = stop 2259 self.additional = additional 2260 2261 def getfield(self, pkt, s): 2262 # type: (Optional[Packet], bytes) -> Tuple[bytes, bytes] 2263 len_str = s.find(self.stop) 2264 if len_str < 0: 2265 return b"", s 2266 len_str += len(self.stop) + self.additional 2267 return s[len_str:], s[:len_str] 2268 2269 def randval(self): 2270 # type: () -> RandTermString 2271 return RandTermString(RandNum(0, 1200), self.stop) 2272 2273 2274class LenField(Field[int, int]): 2275 """ 2276 If None, will be filled with the size of the payload 2277 """ 2278 __slots__ = ["adjust"] 2279 2280 def __init__(self, name, default, fmt="H", adjust=lambda x: x): 2281 # type: (str, Optional[Any], str, Callable[[int], int]) -> None 2282 Field.__init__(self, name, default, fmt) 2283 self.adjust = adjust 2284 2285 def i2m(self, 2286 pkt, # type: Optional[Packet] 2287 x, # type: Optional[int] 2288 ): 2289 # type: (...) -> int 2290 if x is None: 2291 x = 0 2292 if pkt is not None: 2293 x = self.adjust(len(pkt.payload)) 2294 return x 2295 2296 2297class BCDFloatField(Field[float, int]): 2298 def i2m(self, pkt, x): 2299 # type: (Optional[Packet], Optional[float]) -> int 2300 if x is None: 2301 return 0 2302 return int(256 * x) 2303 2304 def m2i(self, pkt, x): 2305 # type: (Optional[Packet], int) -> float 2306 return x / 256.0 2307 2308 2309class _BitField(Field[I, int]): 2310 """ 2311 Field to handle bits. 2312 2313 :param name: name of the field 2314 :param default: default value 2315 :param size: size (in bits). If negative, Low endian 2316 :param tot_size: size of the total group of bits (in bytes) the bitfield 2317 is in. If negative, Low endian. 2318 :param end_tot_size: same but for the BitField ending a group. 2319 2320 Example - normal usage:: 2321 2322 0 1 2 3 2323 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2324 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 2325 | A | B | C | 2326 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ 2327 2328 Fig. TestPacket 2329 2330 class TestPacket(Packet): 2331 fields_desc = [ 2332 BitField("a", 0, 14), 2333 BitField("b", 0, 16), 2334 BitField("c", 0, 2), 2335 ] 2336 2337 Example - Low endian stored as 16 bits on the network:: 2338 2339 x x x x x x x x x x x x x x x x 2340 a [b] [ c ] [ a ] 2341 2342 Will first get reversed during dissecion: 2343 2344 x x x x x x x x x x x x x x x x 2345 [ a ] [b] [ c ] 2346 2347 class TestPacket(Packet): 2348 fields_desc = [ 2349 BitField("a", 0, 9, tot_size=-2), 2350 BitField("b", 0, 2), 2351 BitField("c", 0, 5, end_tot_size=-2) 2352 ] 2353 2354 """ 2355 __slots__ = ["rev", "size", "tot_size", "end_tot_size"] 2356 2357 def __init__(self, name, default, size, 2358 tot_size=0, end_tot_size=0): 2359 # type: (str, Optional[I], int, int, int) -> None 2360 Field.__init__(self, name, default) 2361 if callable(size): 2362 size = size(self) 2363 self.rev = size < 0 or tot_size < 0 or end_tot_size < 0 2364 self.size = abs(size) 2365 if not tot_size: 2366 tot_size = self.size // 8 2367 self.tot_size = abs(tot_size) 2368 if not end_tot_size: 2369 end_tot_size = self.size // 8 2370 self.end_tot_size = abs(end_tot_size) 2371 # Fields always have a round sz except BitField 2372 # so to keep it simple, we'll ignore it here. 2373 self.sz = self.size / 8. # type: ignore 2374 2375 # We need to # type: ignore a few things because of how special 2376 # BitField is 2377 def addfield(self, # type: ignore 2378 pkt, # type: Packet 2379 s, # type: Union[Tuple[bytes, int, int], bytes] 2380 ival, # type: I 2381 ): 2382 # type: (...) -> Union[Tuple[bytes, int, int], bytes] 2383 val = self.i2m(pkt, ival) 2384 if isinstance(s, tuple): 2385 s, bitsdone, v = s 2386 else: 2387 bitsdone = 0 2388 v = 0 2389 v <<= self.size 2390 v |= val & ((1 << self.size) - 1) 2391 bitsdone += self.size 2392 while bitsdone >= 8: 2393 bitsdone -= 8 2394 s = s + struct.pack("!B", v >> bitsdone) 2395 v &= (1 << bitsdone) - 1 2396 if bitsdone: 2397 return s, bitsdone, v 2398 else: 2399 # Apply LE if necessary 2400 if self.rev and self.end_tot_size > 1: 2401 s = s[:-self.end_tot_size] + s[-self.end_tot_size:][::-1] 2402 return s 2403 2404 def getfield(self, # type: ignore 2405 pkt, # type: Packet 2406 s, # type: Union[Tuple[bytes, int], bytes] 2407 ): 2408 # type: (...) -> Union[Tuple[Tuple[bytes, int], I], Tuple[bytes, I]] # noqa: E501 2409 if isinstance(s, tuple): 2410 s, bn = s 2411 else: 2412 bn = 0 2413 # Apply LE if necessary 2414 if self.rev and self.tot_size > 1: 2415 s = s[:self.tot_size][::-1] + s[self.tot_size:] 2416 2417 # we don't want to process all the string 2418 nb_bytes = (self.size + bn - 1) // 8 + 1 2419 w = s[:nb_bytes] 2420 2421 # split the substring byte by byte 2422 _bytes = struct.unpack('!%dB' % nb_bytes, w) 2423 2424 b = 0 2425 for c in range(nb_bytes): 2426 b |= int(_bytes[c]) << (nb_bytes - c - 1) * 8 2427 2428 # get rid of high order bits 2429 b &= (1 << (nb_bytes * 8 - bn)) - 1 2430 2431 # remove low order bits 2432 b = b >> (nb_bytes * 8 - self.size - bn) 2433 2434 bn += self.size 2435 s = s[bn // 8:] 2436 bn = bn % 8 2437 b2 = self.m2i(pkt, b) 2438 if bn: 2439 return (s, bn), b2 2440 else: 2441 return s, b2 2442 2443 def randval(self): 2444 # type: () -> RandNum 2445 return RandNum(0, 2**self.size - 1) 2446 2447 def i2len(self, pkt, x): # type: ignore 2448 # type: (Optional[Packet], Optional[float]) -> float 2449 return float(self.size) / 8 2450 2451 2452class BitField(_BitField[int]): 2453 __doc__ = _BitField.__doc__ 2454 2455 2456class BitLenField(BitField): 2457 __slots__ = ["length_from"] 2458 2459 def __init__(self, 2460 name, # type: str 2461 default, # type: Optional[int] 2462 length_from # type: Callable[[Packet], int] 2463 ): 2464 # type: (...) -> None 2465 self.length_from = length_from 2466 super(BitLenField, self).__init__(name, default, 0) 2467 2468 def getfield(self, # type: ignore 2469 pkt, # type: Packet 2470 s, # type: Union[Tuple[bytes, int], bytes] 2471 ): 2472 # type: (...) -> Union[Tuple[Tuple[bytes, int], int], Tuple[bytes, int]] # noqa: E501 2473 self.size = self.length_from(pkt) 2474 return super(BitLenField, self).getfield(pkt, s) 2475 2476 def addfield(self, # type: ignore 2477 pkt, # type: Packet 2478 s, # type: Union[Tuple[bytes, int, int], bytes] 2479 val # type: int 2480 ): 2481 # type: (...) -> Union[Tuple[bytes, int, int], bytes] 2482 self.size = self.length_from(pkt) 2483 return super(BitLenField, self).addfield(pkt, s, val) 2484 2485 2486class BitFieldLenField(BitField): 2487 __slots__ = ["length_of", "count_of", "adjust", "tot_size", "end_tot_size"] 2488 2489 def __init__(self, 2490 name, # type: str 2491 default, # type: Optional[int] 2492 size, # type: int 2493 length_of=None, # type: Optional[Union[Callable[[Optional[Packet]], int], str]] # noqa: E501 2494 count_of=None, # type: Optional[str] 2495 adjust=lambda pkt, x: x, # type: Callable[[Optional[Packet], int], int] # noqa: E501 2496 tot_size=0, # type: int 2497 end_tot_size=0, # type: int 2498 ): 2499 # type: (...) -> None 2500 super(BitFieldLenField, self).__init__(name, default, size, 2501 tot_size, end_tot_size) 2502 self.length_of = length_of 2503 self.count_of = count_of 2504 self.adjust = adjust 2505 2506 def i2m(self, pkt, x): 2507 # type: (Optional[Packet], Optional[Any]) -> int 2508 return FieldLenField.i2m(self, pkt, x) # type: ignore 2509 2510 2511class XBitField(BitField): 2512 def i2repr(self, pkt, x): 2513 # type: (Optional[Packet], int) -> str 2514 return lhex(self.i2h(pkt, x)) 2515 2516 2517class _EnumField(Field[Union[List[I], I], I]): 2518 def __init__(self, 2519 name, # type: str 2520 default, # type: Optional[I] 2521 enum, # type: Union[Dict[I, str], Dict[str, I], List[str], DADict[I, str], Type[Enum], Tuple[Callable[[I], str], Callable[[str], I]]] # noqa: E501 2522 fmt="H", # type: str 2523 ): 2524 # type: (...) -> None 2525 """ Initializes enum fields. 2526 2527 @param name: name of this field 2528 @param default: default value of this field 2529 @param enum: either an enum, a dict or a tuple of two callables. 2530 Dict keys are the internal values, while the dict 2531 values are the user-friendly representations. If the 2532 tuple is provided, the first callable receives the 2533 internal value as parameter and returns the 2534 user-friendly representation and the second callable 2535 does the converse. The first callable may return None 2536 to default to a literal string (repr()) representation. 2537 @param fmt: struct.pack format used to parse and serialize the 2538 internal value from and to machine representation. 2539 """ 2540 if isinstance(enum, ObservableDict): 2541 cast(ObservableDict, enum).observe(self) 2542 2543 if isinstance(enum, tuple): 2544 self.i2s_cb = enum[0] # type: Optional[Callable[[I], str]] 2545 self.s2i_cb = enum[1] # type: Optional[Callable[[str], I]] 2546 self.i2s = None # type: Optional[Dict[I, str]] 2547 self.s2i = None # type: Optional[Dict[str, I]] 2548 elif isinstance(enum, type) and issubclass(enum, Enum): 2549 # Python's Enum 2550 i2s = self.i2s = {} 2551 s2i = self.s2i = {} 2552 self.i2s_cb = None 2553 self.s2i_cb = None 2554 names = [x.name for x in enum] 2555 for n in names: 2556 value = enum[n].value 2557 i2s[value] = n 2558 s2i[n] = value 2559 else: 2560 i2s = self.i2s = {} 2561 s2i = self.s2i = {} 2562 self.i2s_cb = None 2563 self.s2i_cb = None 2564 keys = [] # type: List[I] 2565 if isinstance(enum, list): 2566 keys = list(range(len(enum))) # type: ignore 2567 elif isinstance(enum, DADict): 2568 keys = enum.keys() 2569 else: 2570 keys = list(enum) # type: ignore 2571 if any(isinstance(x, str) for x in keys): 2572 i2s, s2i = s2i, i2s # type: ignore 2573 for k in keys: 2574 value = cast(str, enum[k]) # type: ignore 2575 i2s[k] = value 2576 s2i[value] = k 2577 Field.__init__(self, name, default, fmt) 2578 2579 def any2i_one(self, pkt, x): 2580 # type: (Optional[Packet], Any) -> I 2581 if isinstance(x, Enum): 2582 return cast(I, x.value) 2583 elif isinstance(x, str): 2584 if self.s2i: 2585 x = self.s2i[x] 2586 elif self.s2i_cb: 2587 x = self.s2i_cb(x) 2588 return cast(I, x) 2589 2590 def _i2repr(self, pkt, x): 2591 # type: (Optional[Packet], I) -> str 2592 return repr(x) 2593 2594 def i2repr_one(self, pkt, x): 2595 # type: (Optional[Packet], I) -> str 2596 if self not in conf.noenum and not isinstance(x, VolatileValue): 2597 if self.i2s: 2598 try: 2599 return self.i2s[x] 2600 except KeyError: 2601 pass 2602 elif self.i2s_cb: 2603 ret = self.i2s_cb(x) 2604 if ret is not None: 2605 return ret 2606 return self._i2repr(pkt, x) 2607 2608 def any2i(self, pkt, x): 2609 # type: (Optional[Packet], Any) -> Union[I, List[I]] 2610 if isinstance(x, list): 2611 return [self.any2i_one(pkt, z) for z in x] 2612 else: 2613 return self.any2i_one(pkt, x) 2614 2615 def i2repr(self, pkt, x): # type: ignore 2616 # type: (Optional[Packet], Any) -> Union[List[str], str] 2617 if isinstance(x, list): 2618 return [self.i2repr_one(pkt, z) for z in x] 2619 else: 2620 return self.i2repr_one(pkt, x) 2621 2622 def notify_set(self, enum, key, value): 2623 # type: (ObservableDict, I, str) -> None 2624 ks = "0x%x" if isinstance(key, int) else "%s" 2625 log_runtime.debug( 2626 "At %s: Change to %s at " + ks, self, value, key 2627 ) 2628 if self.i2s is not None and self.s2i is not None: 2629 self.i2s[key] = value 2630 self.s2i[value] = key 2631 2632 def notify_del(self, enum, key): 2633 # type: (ObservableDict, I) -> None 2634 ks = "0x%x" if isinstance(key, int) else "%s" 2635 log_runtime.debug("At %s: Delete value at " + ks, self, key) 2636 if self.i2s is not None and self.s2i is not None: 2637 value = self.i2s[key] 2638 del self.i2s[key] 2639 del self.s2i[value] 2640 2641 2642class EnumField(_EnumField[I]): 2643 __slots__ = ["i2s", "s2i", "s2i_cb", "i2s_cb"] 2644 2645 2646class CharEnumField(EnumField[str]): 2647 def __init__(self, 2648 name, # type: str 2649 default, # type: str 2650 enum, # type: Union[Dict[str, str], Tuple[Callable[[str], str], Callable[[str], str]]] # noqa: E501 2651 fmt="1s", # type: str 2652 ): 2653 # type: (...) -> None 2654 super(CharEnumField, self).__init__(name, default, enum, fmt) 2655 if self.i2s is not None: 2656 k = list(self.i2s) 2657 if k and len(k[0]) != 1: 2658 self.i2s, self.s2i = self.s2i, self.i2s 2659 2660 def any2i_one(self, pkt, x): 2661 # type: (Optional[Packet], str) -> str 2662 if len(x) != 1: 2663 if self.s2i: 2664 x = self.s2i[x] 2665 elif self.s2i_cb: 2666 x = self.s2i_cb(x) 2667 return x 2668 2669 2670class BitEnumField(_BitField[Union[List[int], int]], _EnumField[int]): 2671 __slots__ = EnumField.__slots__ 2672 2673 def __init__(self, name, default, size, enum, **kwargs): 2674 # type: (str, Optional[int], int, Dict[int, str], **Any) -> None 2675 _EnumField.__init__(self, name, default, enum) 2676 _BitField.__init__(self, name, default, size, **kwargs) 2677 2678 def any2i(self, pkt, x): 2679 # type: (Optional[Packet], Any) -> Union[List[int], int] 2680 return _EnumField.any2i(self, pkt, x) 2681 2682 def i2repr(self, 2683 pkt, # type: Optional[Packet] 2684 x, # type: Union[List[int], int] 2685 ): 2686 # type: (...) -> Any 2687 return _EnumField.i2repr(self, pkt, x) 2688 2689 2690class BitLenEnumField(BitLenField, _EnumField[int]): 2691 __slots__ = EnumField.__slots__ 2692 2693 def __init__(self, 2694 name, # type: str 2695 default, # type: Optional[int] 2696 length_from, # type: Callable[[Packet], int] 2697 enum, # type: Dict[int, str] 2698 **kwargs, # type: Any 2699 ): 2700 # type: (...) -> None 2701 _EnumField.__init__(self, name, default, enum) 2702 BitLenField.__init__(self, name, default, length_from, **kwargs) 2703 2704 def any2i(self, pkt, x): 2705 # type: (Optional[Packet], Any) -> int 2706 return _EnumField.any2i(self, pkt, x) # type: ignore 2707 2708 def i2repr(self, 2709 pkt, # type: Optional[Packet] 2710 x, # type: Union[List[int], int] 2711 ): 2712 # type: (...) -> Any 2713 return _EnumField.i2repr(self, pkt, x) 2714 2715 2716class ShortEnumField(EnumField[int]): 2717 __slots__ = EnumField.__slots__ 2718 2719 def __init__(self, 2720 name, # type: str 2721 default, # type: int 2722 enum, # type: Union[Dict[int, str], Dict[str, int], Tuple[Callable[[int], str], Callable[[str], int]], DADict[int, str]] # noqa: E501 2723 ): 2724 # type: (...) -> None 2725 super(ShortEnumField, self).__init__(name, default, enum, "H") 2726 2727 2728class LEShortEnumField(EnumField[int]): 2729 def __init__(self, name, default, enum): 2730 # type: (str, int, Union[Dict[int, str], List[str]]) -> None 2731 super(LEShortEnumField, self).__init__(name, default, enum, "<H") 2732 2733 2734class LongEnumField(EnumField[int]): 2735 def __init__(self, name, default, enum): 2736 # type: (str, int, Union[Dict[int, str], List[str]]) -> None 2737 super(LongEnumField, self).__init__(name, default, enum, "Q") 2738 2739 2740class LELongEnumField(EnumField[int]): 2741 def __init__(self, name, default, enum): 2742 # type: (str, int, Union[Dict[int, str], List[str]]) -> None 2743 super(LELongEnumField, self).__init__(name, default, enum, "<Q") 2744 2745 2746class ByteEnumField(EnumField[int]): 2747 def __init__(self, name, default, enum): 2748 # type: (str, Optional[int], Dict[int, str]) -> None 2749 super(ByteEnumField, self).__init__(name, default, enum, "B") 2750 2751 2752class XByteEnumField(ByteEnumField): 2753 def i2repr_one(self, pkt, x): 2754 # type: (Optional[Packet], int) -> str 2755 if self not in conf.noenum and not isinstance(x, VolatileValue): 2756 if self.i2s: 2757 try: 2758 return self.i2s[x] 2759 except KeyError: 2760 pass 2761 elif self.i2s_cb: 2762 ret = self.i2s_cb(x) 2763 if ret is not None: 2764 return ret 2765 return lhex(x) 2766 2767 2768class IntEnumField(EnumField[int]): 2769 def __init__(self, name, default, enum): 2770 # type: (str, Optional[int], Dict[int, str]) -> None 2771 super(IntEnumField, self).__init__(name, default, enum, "I") 2772 2773 2774class SignedIntEnumField(EnumField[int]): 2775 def __init__(self, name, default, enum): 2776 # type: (str, Optional[int], Dict[int, str]) -> None 2777 super(SignedIntEnumField, self).__init__(name, default, enum, "i") 2778 2779 2780class LEIntEnumField(EnumField[int]): 2781 def __init__(self, name, default, enum): 2782 # type: (str, int, Dict[int, str]) -> None 2783 super(LEIntEnumField, self).__init__(name, default, enum, "<I") 2784 2785 2786class XShortEnumField(ShortEnumField): 2787 def _i2repr(self, pkt, x): 2788 # type: (Optional[Packet], Any) -> str 2789 return lhex(x) 2790 2791 2792class LE3BytesEnumField(LEThreeBytesField, _EnumField[int]): 2793 __slots__ = EnumField.__slots__ 2794 2795 def __init__(self, name, default, enum): 2796 # type: (str, Optional[int], Dict[int, str]) -> None 2797 _EnumField.__init__(self, name, default, enum) 2798 LEThreeBytesField.__init__(self, name, default) 2799 2800 def any2i(self, pkt, x): 2801 # type: (Optional[Packet], Any) -> int 2802 return _EnumField.any2i(self, pkt, x) # type: ignore 2803 2804 def i2repr(self, pkt, x): # type: ignore 2805 # type: (Optional[Packet], Any) -> Union[List[str], str] 2806 return _EnumField.i2repr(self, pkt, x) 2807 2808 2809class XLE3BytesEnumField(LE3BytesEnumField): 2810 def _i2repr(self, pkt, x): 2811 # type: (Optional[Packet], Any) -> str 2812 return lhex(x) 2813 2814 2815class _MultiEnumField(_EnumField[I]): 2816 def __init__(self, 2817 name, # type: str 2818 default, # type: int 2819 enum, # type: Dict[I, Dict[I, str]] 2820 depends_on, # type: Callable[[Optional[Packet]], I] 2821 fmt="H" # type: str 2822 ): 2823 # type: (...) -> None 2824 2825 self.depends_on = depends_on 2826 self.i2s_multi = enum 2827 self.s2i_multi = {} # type: Dict[I, Dict[str, I]] 2828 self.s2i_all = {} # type: Dict[str, I] 2829 for m in enum: 2830 s2i = {} # type: Dict[str, I] 2831 self.s2i_multi[m] = s2i 2832 for k, v in enum[m].items(): 2833 s2i[v] = k 2834 self.s2i_all[v] = k 2835 Field.__init__(self, name, default, fmt) 2836 2837 def any2i_one(self, pkt, x): 2838 # type: (Optional[Packet], Any) -> I 2839 if isinstance(x, str): 2840 v = self.depends_on(pkt) 2841 if v in self.s2i_multi: 2842 s2i = self.s2i_multi[v] 2843 if x in s2i: 2844 return s2i[x] 2845 return self.s2i_all[x] 2846 return cast(I, x) 2847 2848 def i2repr_one(self, pkt, x): 2849 # type: (Optional[Packet], I) -> str 2850 v = self.depends_on(pkt) 2851 if isinstance(v, VolatileValue): 2852 return repr(v) 2853 if v in self.i2s_multi: 2854 return str(self.i2s_multi[v].get(x, x)) 2855 return str(x) 2856 2857 2858class MultiEnumField(_MultiEnumField[int], EnumField[int]): 2859 __slots__ = ["depends_on", "i2s_multi", "s2i_multi", "s2i_all"] 2860 2861 2862class BitMultiEnumField(_BitField[Union[List[int], int]], 2863 _MultiEnumField[int]): 2864 __slots__ = EnumField.__slots__ + MultiEnumField.__slots__ 2865 2866 def __init__( 2867 self, 2868 name, # type: str 2869 default, # type: int 2870 size, # type: int 2871 enum, # type: Dict[int, Dict[int, str]] 2872 depends_on # type: Callable[[Optional[Packet]], int] 2873 ): 2874 # type: (...) -> None 2875 _MultiEnumField.__init__(self, name, default, enum, depends_on) 2876 self.rev = size < 0 2877 self.size = abs(size) 2878 self.sz = self.size / 8. # type: ignore 2879 2880 def any2i(self, pkt, x): 2881 # type: (Optional[Packet], Any) -> Union[List[int], int] 2882 return _MultiEnumField[int].any2i( 2883 self, # type: ignore 2884 pkt, 2885 x 2886 ) 2887 2888 def i2repr( # type: ignore 2889 self, 2890 pkt, # type: Optional[Packet] 2891 x # type: Union[List[int], int] 2892 ): 2893 # type: (...) -> Union[str, List[str]] 2894 return _MultiEnumField[int].i2repr( 2895 self, # type: ignore 2896 pkt, 2897 x 2898 ) 2899 2900 2901class ByteEnumKeysField(ByteEnumField): 2902 """ByteEnumField that picks valid values when fuzzed. """ 2903 2904 def randval(self): 2905 # type: () -> RandEnumKeys 2906 return RandEnumKeys(self.i2s or {}) 2907 2908 2909class ShortEnumKeysField(ShortEnumField): 2910 """ShortEnumField that picks valid values when fuzzed. """ 2911 2912 def randval(self): 2913 # type: () -> RandEnumKeys 2914 return RandEnumKeys(self.i2s or {}) 2915 2916 2917class IntEnumKeysField(IntEnumField): 2918 """IntEnumField that picks valid values when fuzzed. """ 2919 2920 def randval(self): 2921 # type: () -> RandEnumKeys 2922 return RandEnumKeys(self.i2s or {}) 2923 2924 2925# Little endian fixed length field 2926 2927 2928class LEFieldLenField(FieldLenField): 2929 def __init__( 2930 self, 2931 name, # type: str 2932 default, # type: Optional[Any] 2933 length_of=None, # type: Optional[str] 2934 fmt="<H", # type: str 2935 count_of=None, # type: Optional[str] 2936 adjust=lambda pkt, x: x, # type: Callable[[Packet, int], int] 2937 ): 2938 # type: (...) -> None 2939 FieldLenField.__init__( 2940 self, name, default, 2941 length_of=length_of, 2942 fmt=fmt, 2943 count_of=count_of, 2944 adjust=adjust 2945 ) 2946 2947 2948class FlagValueIter(object): 2949 2950 __slots__ = ["flagvalue", "cursor"] 2951 2952 def __init__(self, flagvalue): 2953 # type: (FlagValue) -> None 2954 self.flagvalue = flagvalue 2955 self.cursor = 0 2956 2957 def __iter__(self): 2958 # type: () -> FlagValueIter 2959 return self 2960 2961 def __next__(self): 2962 # type: () -> str 2963 x = int(self.flagvalue) 2964 x >>= self.cursor 2965 while x: 2966 self.cursor += 1 2967 if x & 1: 2968 return self.flagvalue.names[self.cursor - 1] 2969 x >>= 1 2970 raise StopIteration 2971 2972 next = __next__ 2973 2974 2975class FlagValue(object): 2976 __slots__ = ["value", "names", "multi"] 2977 2978 def _fixvalue(self, value): 2979 # type: (Any) -> int 2980 if not value: 2981 return 0 2982 if isinstance(value, str): 2983 value = value.split('+') if self.multi else list(value) 2984 if isinstance(value, list): 2985 y = 0 2986 for i in value: 2987 y |= 1 << self.names.index(i) 2988 value = y 2989 return int(value) 2990 2991 def __init__(self, value, names): 2992 # type: (Union[List[str], int, str], Union[List[str], str]) -> None 2993 self.multi = isinstance(names, list) 2994 self.names = names 2995 self.value = self._fixvalue(value) 2996 2997 def __hash__(self): 2998 # type: () -> int 2999 return hash(self.value) 3000 3001 def __int__(self): 3002 # type: () -> int 3003 return self.value 3004 3005 def __eq__(self, other): 3006 # type: (Any) -> bool 3007 return self.value == self._fixvalue(other) 3008 3009 def __lt__(self, other): 3010 # type: (Any) -> bool 3011 return self.value < self._fixvalue(other) 3012 3013 def __le__(self, other): 3014 # type: (Any) -> bool 3015 return self.value <= self._fixvalue(other) 3016 3017 def __gt__(self, other): 3018 # type: (Any) -> bool 3019 return self.value > self._fixvalue(other) 3020 3021 def __ge__(self, other): 3022 # type: (Any) -> bool 3023 return self.value >= self._fixvalue(other) 3024 3025 def __ne__(self, other): 3026 # type: (Any) -> bool 3027 return self.value != self._fixvalue(other) 3028 3029 def __and__(self, other): 3030 # type: (int) -> FlagValue 3031 return self.__class__(self.value & self._fixvalue(other), self.names) 3032 __rand__ = __and__ 3033 3034 def __or__(self, other): 3035 # type: (int) -> FlagValue 3036 return self.__class__(self.value | self._fixvalue(other), self.names) 3037 __ror__ = __or__ 3038 __add__ = __or__ # + is an alias for | 3039 3040 def __sub__(self, other): 3041 # type: (int) -> FlagValue 3042 return self.__class__( 3043 self.value & (2 ** len(self.names) - 1 - self._fixvalue(other)), 3044 self.names 3045 ) 3046 3047 def __xor__(self, other): 3048 # type: (int) -> FlagValue 3049 return self.__class__(self.value ^ self._fixvalue(other), self.names) 3050 3051 def __lshift__(self, other): 3052 # type: (int) -> int 3053 return self.value << self._fixvalue(other) 3054 3055 def __rshift__(self, other): 3056 # type: (int) -> int 3057 return self.value >> self._fixvalue(other) 3058 3059 def __nonzero__(self): 3060 # type: () -> bool 3061 return bool(self.value) 3062 __bool__ = __nonzero__ 3063 3064 def flagrepr(self): 3065 # type: () -> str 3066 warnings.warn( 3067 "obj.flagrepr() is obsolete. Use str(obj) instead.", 3068 DeprecationWarning 3069 ) 3070 return str(self) 3071 3072 def __str__(self): 3073 # type: () -> str 3074 i = 0 3075 r = [] 3076 x = int(self) 3077 while x: 3078 if x & 1: 3079 try: 3080 name = self.names[i] 3081 except IndexError: 3082 name = "?" 3083 r.append(name) 3084 i += 1 3085 x >>= 1 3086 return ("+" if self.multi else "").join(r) 3087 3088 def __iter__(self): 3089 # type: () -> FlagValueIter 3090 return FlagValueIter(self) 3091 3092 def __repr__(self): 3093 # type: () -> str 3094 return "<Flag %d (%s)>" % (self, self) 3095 3096 def __deepcopy__(self, memo): 3097 # type: (Dict[Any, Any]) -> FlagValue 3098 return self.__class__(int(self), self.names) 3099 3100 def __getattr__(self, attr): 3101 # type: (str) -> Any 3102 if attr in self.__slots__: 3103 return super(FlagValue, self).__getattribute__(attr) 3104 try: 3105 if self.multi: 3106 return bool((2 ** self.names.index(attr)) & int(self)) 3107 return all(bool((2 ** self.names.index(flag)) & int(self)) 3108 for flag in attr) 3109 except ValueError: 3110 if '_' in attr: 3111 try: 3112 return self.__getattr__(attr.replace('_', '-')) 3113 except AttributeError: 3114 pass 3115 return super(FlagValue, self).__getattribute__(attr) 3116 3117 def __setattr__(self, attr, value): 3118 # type: (str, Union[List[str], int, str]) -> None 3119 if attr == "value" and not isinstance(value, int): 3120 raise ValueError(value) 3121 if attr in self.__slots__: 3122 return super(FlagValue, self).__setattr__(attr, value) 3123 if attr in self.names: 3124 if value: 3125 self.value |= (2 ** self.names.index(attr)) 3126 else: 3127 self.value &= ~(2 ** self.names.index(attr)) 3128 else: 3129 return super(FlagValue, self).__setattr__(attr, value) 3130 3131 def copy(self): 3132 # type: () -> FlagValue 3133 return self.__class__(self.value, self.names) 3134 3135 3136class FlagsField(_BitField[Optional[Union[int, FlagValue]]]): 3137 """ Handle Flag type field 3138 3139 Make sure all your flags have a label 3140 3141 Example (list): 3142 >>> from scapy.packet import Packet 3143 >>> class FlagsTest(Packet): 3144 fields_desc = [FlagsField("flags", 0, 8, ["f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7"])] # noqa: E501 3145 >>> FlagsTest(flags=9).show2() 3146 ###[ FlagsTest ]### 3147 flags = f0+f3 3148 3149 Example (str): 3150 >>> from scapy.packet import Packet 3151 >>> class TCPTest(Packet): 3152 fields_desc = [ 3153 BitField("reserved", 0, 7), 3154 FlagsField("flags", 0x2, 9, "FSRPAUECN") 3155 ] 3156 >>> TCPTest(flags=3).show2() 3157 ###[ FlagsTest ]### 3158 reserved = 0 3159 flags = FS 3160 3161 Example (dict): 3162 >>> from scapy.packet import Packet 3163 >>> class FlagsTest2(Packet): 3164 fields_desc = [ 3165 FlagsField("flags", 0x2, 16, { 3166 0x0001: "A", 3167 0x0008: "B", 3168 }) 3169 ] 3170 3171 :param name: field's name 3172 :param default: default value for the field 3173 :param size: number of bits in the field (in bits). if negative, LE 3174 :param names: (list or str or dict) label for each flag 3175 If it's a str or a list, the least Significant Bit tag's name 3176 is written first. 3177 """ 3178 ismutable = True 3179 __slots__ = ["names"] 3180 3181 def __init__(self, 3182 name, # type: str 3183 default, # type: Optional[Union[int, FlagValue]] 3184 size, # type: int 3185 names, # type: Union[List[str], str, Dict[int, str]] 3186 **kwargs # type: Any 3187 ): 3188 # type: (...) -> None 3189 # Convert the dict to a list 3190 if isinstance(names, dict): 3191 tmp = ["bit_%d" % i for i in range(abs(size))] 3192 for i, v in names.items(): 3193 tmp[int(math.floor(math.log(i, 2)))] = v 3194 names = tmp 3195 # Store the names as str or list 3196 self.names = names 3197 super(FlagsField, self).__init__(name, default, size, **kwargs) 3198 3199 def _fixup_val(self, x): 3200 # type: (Any) -> Optional[FlagValue] 3201 """Returns a FlagValue instance when needed. Internal method, to be 3202used in *2i() and i2*() methods. 3203 3204 """ 3205 if isinstance(x, (FlagValue, VolatileValue)): 3206 return x # type: ignore 3207 if x is None: 3208 return None 3209 return FlagValue(x, self.names) 3210 3211 def any2i(self, pkt, x): 3212 # type: (Optional[Packet], Any) -> Optional[FlagValue] 3213 return self._fixup_val(super(FlagsField, self).any2i(pkt, x)) 3214 3215 def m2i(self, pkt, x): 3216 # type: (Optional[Packet], int) -> Optional[FlagValue] 3217 return self._fixup_val(super(FlagsField, self).m2i(pkt, x)) 3218 3219 def i2h(self, pkt, x): 3220 # type: (Optional[Packet], Any) -> Optional[FlagValue] 3221 return self._fixup_val(super(FlagsField, self).i2h(pkt, x)) 3222 3223 def i2repr(self, 3224 pkt, # type: Optional[Packet] 3225 x, # type: Any 3226 ): 3227 # type: (...) -> str 3228 if isinstance(x, (list, tuple)): 3229 return repr(type(x)( 3230 "None" if v is None else str(self._fixup_val(v)) for v in x 3231 )) 3232 return "None" if x is None else str(self._fixup_val(x)) 3233 3234 3235MultiFlagsEntry = collections.namedtuple('MultiFlagsEntry', ['short', 'long']) 3236 3237 3238class MultiFlagsField(_BitField[Set[str]]): 3239 __slots__ = FlagsField.__slots__ + ["depends_on"] 3240 3241 def __init__(self, 3242 name, # type: str 3243 default, # type: Set[str] 3244 size, # type: int 3245 names, # type: Dict[int, Dict[int, MultiFlagsEntry]] 3246 depends_on, # type: Callable[[Optional[Packet]], int] 3247 ): 3248 # type: (...) -> None 3249 self.names = names 3250 self.depends_on = depends_on 3251 super(MultiFlagsField, self).__init__(name, default, size) 3252 3253 def any2i(self, pkt, x): 3254 # type: (Optional[Packet], Any) -> Set[str] 3255 if not isinstance(x, (set, int)): 3256 raise ValueError('set expected') 3257 3258 if pkt is not None: 3259 if isinstance(x, int): 3260 return self.m2i(pkt, x) 3261 else: 3262 v = self.depends_on(pkt) 3263 if v is not None: 3264 assert v in self.names, 'invalid dependency' 3265 these_names = self.names[v] 3266 s = set() 3267 for i in x: 3268 for val in these_names.values(): 3269 if val.short == i: 3270 s.add(i) 3271 break 3272 else: 3273 assert False, 'Unknown flag "{}" with this dependency'.format(i) # noqa: E501 3274 continue 3275 return s 3276 if isinstance(x, int): 3277 return set() 3278 return x 3279 3280 def i2m(self, pkt, x): 3281 # type: (Optional[Packet], Optional[Set[str]]) -> int 3282 v = self.depends_on(pkt) 3283 these_names = self.names.get(v, {}) 3284 3285 r = 0 3286 if x is None: 3287 return r 3288 for flag_set in x: 3289 for i, val in these_names.items(): 3290 if val.short == flag_set: 3291 r |= 1 << i 3292 break 3293 else: 3294 r |= 1 << int(flag_set[len('bit '):]) 3295 return r 3296 3297 def m2i(self, pkt, x): 3298 # type: (Optional[Packet], int) -> Set[str] 3299 v = self.depends_on(pkt) 3300 these_names = self.names.get(v, {}) 3301 3302 r = set() 3303 i = 0 3304 while x: 3305 if x & 1: 3306 if i in these_names: 3307 r.add(these_names[i].short) 3308 else: 3309 r.add('bit {}'.format(i)) 3310 x >>= 1 3311 i += 1 3312 return r 3313 3314 def i2repr(self, pkt, x): 3315 # type: (Optional[Packet], Set[str]) -> str 3316 v = self.depends_on(pkt) 3317 these_names = self.names.get(v, {}) 3318 3319 r = set() 3320 for flag_set in x: 3321 for i in these_names.values(): 3322 if i.short == flag_set: 3323 r.add("{} ({})".format(i.long, i.short)) 3324 break 3325 else: 3326 r.add(flag_set) 3327 return repr(r) 3328 3329 3330class FixedPointField(BitField): 3331 __slots__ = ['frac_bits'] 3332 3333 def __init__(self, name, default, size, frac_bits=16): 3334 # type: (str, int, int, int) -> None 3335 self.frac_bits = frac_bits 3336 super(FixedPointField, self).__init__(name, default, size) 3337 3338 def any2i(self, pkt, val): 3339 # type: (Optional[Packet], Optional[float]) -> Optional[int] 3340 if val is None: 3341 return val 3342 ival = int(val) 3343 fract = int((val - ival) * 2**self.frac_bits) 3344 return (ival << self.frac_bits) | fract 3345 3346 def i2h(self, pkt, val): 3347 # type: (Optional[Packet], Optional[int]) -> Optional[EDecimal] 3348 # A bit of trickery to get precise floats 3349 if val is None: 3350 return val 3351 int_part = val >> self.frac_bits 3352 pw = 2.0**self.frac_bits 3353 frac_part = EDecimal(val & (1 << self.frac_bits) - 1) 3354 frac_part /= pw # type: ignore 3355 return int_part + frac_part.normalize(int(math.log10(pw))) 3356 3357 def i2repr(self, pkt, val): 3358 # type: (Optional[Packet], int) -> str 3359 return str(self.i2h(pkt, val)) 3360 3361 3362# Base class for IPv4 and IPv6 Prefixes inspired by IPField and IP6Field. 3363# Machine values are encoded in a multiple of wordbytes bytes. 3364class _IPPrefixFieldBase(Field[Tuple[str, int], Tuple[bytes, int]]): 3365 __slots__ = ["wordbytes", "maxbytes", "aton", "ntoa", "length_from"] 3366 3367 def __init__( 3368 self, 3369 name, # type: str 3370 default, # type: Tuple[str, int] 3371 wordbytes, # type: int 3372 maxbytes, # type: int 3373 aton, # type: Callable[..., Any] 3374 ntoa, # type: Callable[..., Any] 3375 length_from=None # type: Optional[Callable[[Packet], int]] 3376 ): 3377 # type: (...) -> None 3378 self.wordbytes = wordbytes 3379 self.maxbytes = maxbytes 3380 self.aton = aton 3381 self.ntoa = ntoa 3382 Field.__init__(self, name, default, "%is" % self.maxbytes) 3383 if length_from is None: 3384 length_from = lambda x: 0 3385 self.length_from = length_from 3386 3387 def _numbytes(self, pfxlen): 3388 # type: (int) -> int 3389 wbits = self.wordbytes * 8 3390 return ((pfxlen + (wbits - 1)) // wbits) * self.wordbytes 3391 3392 def h2i(self, pkt, x): 3393 # type: (Optional[Packet], str) -> Tuple[str, int] 3394 # "fc00:1::1/64" -> ("fc00:1::1", 64) 3395 [pfx, pfxlen] = x.split('/') 3396 self.aton(pfx) # check for validity 3397 return (pfx, int(pfxlen)) 3398 3399 def i2h(self, pkt, x): 3400 # type: (Optional[Packet], Tuple[str, int]) -> str 3401 # ("fc00:1::1", 64) -> "fc00:1::1/64" 3402 (pfx, pfxlen) = x 3403 return "%s/%i" % (pfx, pfxlen) 3404 3405 def i2m(self, 3406 pkt, # type: Optional[Packet] 3407 x # type: Optional[Tuple[str, int]] 3408 ): 3409 # type: (...) -> Tuple[bytes, int] 3410 # ("fc00:1::1", 64) -> (b"\xfc\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01", 64) # noqa: E501 3411 if x is None: 3412 pfx, pfxlen = "", 0 3413 else: 3414 (pfx, pfxlen) = x 3415 s = self.aton(pfx) 3416 return (s[:self._numbytes(pfxlen)], pfxlen) 3417 3418 def m2i(self, pkt, x): 3419 # type: (Optional[Packet], Tuple[bytes, int]) -> Tuple[str, int] 3420 # (b"\xfc\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01", 64) -> ("fc00:1::1", 64) # noqa: E501 3421 (s, pfxlen) = x 3422 3423 if len(s) < self.maxbytes: 3424 s = s + (b"\0" * (self.maxbytes - len(s))) 3425 return (self.ntoa(s), pfxlen) 3426 3427 def any2i(self, pkt, x): 3428 # type: (Optional[Packet], Optional[Any]) -> Tuple[str, int] 3429 if x is None: 3430 return (self.ntoa(b"\0" * self.maxbytes), 1) 3431 3432 return self.h2i(pkt, x) 3433 3434 def i2len(self, pkt, x): 3435 # type: (Packet, Tuple[str, int]) -> int 3436 (_, pfxlen) = x 3437 return pfxlen 3438 3439 def addfield(self, pkt, s, val): 3440 # type: (Packet, bytes, Optional[Tuple[str, int]]) -> bytes 3441 (rawpfx, pfxlen) = self.i2m(pkt, val) 3442 fmt = "!%is" % self._numbytes(pfxlen) 3443 return s + struct.pack(fmt, rawpfx) 3444 3445 def getfield(self, pkt, s): 3446 # type: (Packet, bytes) -> Tuple[bytes, Tuple[str, int]] 3447 pfxlen = self.length_from(pkt) 3448 numbytes = self._numbytes(pfxlen) 3449 fmt = "!%is" % numbytes 3450 return s[numbytes:], self.m2i(pkt, (struct.unpack(fmt, s[:numbytes])[0], pfxlen)) # noqa: E501 3451 3452 3453class IPPrefixField(_IPPrefixFieldBase): 3454 def __init__( 3455 self, 3456 name, # type: str 3457 default, # type: Tuple[str, int] 3458 wordbytes=1, # type: int 3459 length_from=None # type: Optional[Callable[[Packet], int]] 3460 ): 3461 _IPPrefixFieldBase.__init__( 3462 self, 3463 name, 3464 default, 3465 wordbytes, 3466 4, 3467 inet_aton, 3468 inet_ntoa, 3469 length_from 3470 ) 3471 3472 3473class IP6PrefixField(_IPPrefixFieldBase): 3474 def __init__( 3475 self, 3476 name, # type: str 3477 default, # type: Tuple[str, int] 3478 wordbytes=1, # type: int 3479 length_from=None # type: Optional[Callable[[Packet], int]] 3480 ): 3481 # type: (...) -> None 3482 _IPPrefixFieldBase.__init__( 3483 self, 3484 name, 3485 default, 3486 wordbytes, 3487 16, 3488 lambda a: inet_pton(socket.AF_INET6, a), 3489 lambda n: inet_ntop(socket.AF_INET6, n), 3490 length_from 3491 ) 3492 3493 3494class UTCTimeField(Field[float, int]): 3495 __slots__ = ["epoch", "delta", "strf", 3496 "use_msec", "use_micro", "use_nano", "custom_scaling"] 3497 3498 def __init__(self, 3499 name, # type: str 3500 default, # type: int 3501 use_msec=False, # type: bool 3502 use_micro=False, # type: bool 3503 use_nano=False, # type: bool 3504 epoch=None, # type: Optional[Tuple[int, int, int, int, int, int, int, int, int]] # noqa: E501 3505 strf="%a, %d %b %Y %H:%M:%S %z", # type: str 3506 custom_scaling=None, # type: Optional[int] 3507 fmt="I" # type: str 3508 ): 3509 # type: (...) -> None 3510 Field.__init__(self, name, default, fmt=fmt) 3511 mk_epoch = EPOCH if epoch is None else calendar.timegm(epoch) 3512 self.epoch = mk_epoch 3513 self.delta = mk_epoch - EPOCH 3514 self.strf = strf 3515 self.use_msec = use_msec 3516 self.use_micro = use_micro 3517 self.use_nano = use_nano 3518 self.custom_scaling = custom_scaling 3519 3520 def i2repr(self, pkt, x): 3521 # type: (Optional[Packet], float) -> str 3522 if x is None: 3523 x = time.time() - self.delta 3524 elif self.use_msec: 3525 x = x / 1e3 3526 elif self.use_micro: 3527 x = x / 1e6 3528 elif self.use_nano: 3529 x = x / 1e9 3530 elif self.custom_scaling: 3531 x = x / self.custom_scaling 3532 x += self.delta 3533 # To make negative timestamps work on all plateforms (e.g. Windows), 3534 # we need a trick. 3535 t = ( 3536 datetime.datetime(1970, 1, 1) + 3537 datetime.timedelta(seconds=x) 3538 ).strftime(self.strf) 3539 return "%s (%d)" % (t, int(x)) 3540 3541 def i2m(self, pkt, x): 3542 # type: (Optional[Packet], Optional[float]) -> int 3543 if x is None: 3544 x = time.time() - self.delta 3545 if self.use_msec: 3546 x = x * 1e3 3547 elif self.use_micro: 3548 x = x * 1e6 3549 elif self.use_nano: 3550 x = x * 1e9 3551 elif self.custom_scaling: 3552 x = x * self.custom_scaling 3553 return int(x) 3554 return int(x) 3555 3556 3557class SecondsIntField(Field[float, int]): 3558 __slots__ = ["use_msec", "use_micro", "use_nano"] 3559 3560 def __init__(self, name, default, 3561 use_msec=False, 3562 use_micro=False, 3563 use_nano=False): 3564 # type: (str, int, bool, bool, bool) -> None 3565 Field.__init__(self, name, default, "I") 3566 self.use_msec = use_msec 3567 self.use_micro = use_micro 3568 self.use_nano = use_nano 3569 3570 def i2repr(self, pkt, x): 3571 # type: (Optional[Packet], Optional[float]) -> str 3572 if x is None: 3573 y = 0 # type: Union[int, float] 3574 elif self.use_msec: 3575 y = x / 1e3 3576 elif self.use_micro: 3577 y = x / 1e6 3578 elif self.use_nano: 3579 y = x / 1e9 3580 else: 3581 y = x 3582 return "%s sec" % y 3583 3584 3585class _ScalingField(object): 3586 def __init__(self, 3587 name, # type: str 3588 default, # type: float 3589 scaling=1, # type: Union[int, float] 3590 unit="", # type: str 3591 offset=0, # type: Union[int, float] 3592 ndigits=3, # type: int 3593 fmt="B", # type: str 3594 ): 3595 # type: (...) -> None 3596 self.scaling = scaling 3597 self.unit = unit 3598 self.offset = offset 3599 self.ndigits = ndigits 3600 Field.__init__(self, name, default, fmt) # type: ignore 3601 3602 def i2m(self, 3603 pkt, # type: Optional[Packet] 3604 x # type: Optional[Union[int, float]] 3605 ): 3606 # type: (...) -> Union[int, float] 3607 if x is None: 3608 x = 0 3609 x = (x - self.offset) / self.scaling 3610 if isinstance(x, float) and self.fmt[-1] != "f": # type: ignore 3611 x = int(round(x)) 3612 return x 3613 3614 def m2i(self, pkt, x): 3615 # type: (Optional[Packet], Union[int, float]) -> Union[int, float] 3616 x = x * self.scaling + self.offset 3617 if isinstance(x, float) and self.fmt[-1] != "f": # type: ignore 3618 x = round(x, self.ndigits) 3619 return x 3620 3621 def any2i(self, pkt, x): 3622 # type: (Optional[Packet], Any) -> Union[int, float] 3623 if isinstance(x, (str, bytes)): 3624 x = struct.unpack(self.fmt, bytes_encode(x))[0] # type: ignore 3625 x = self.m2i(pkt, x) 3626 if not isinstance(x, (int, float)): 3627 raise ValueError("Unknown type") 3628 return x 3629 3630 def i2repr(self, pkt, x): 3631 # type: (Optional[Packet], Union[int, float]) -> str 3632 return "%s %s" % ( 3633 self.i2h(pkt, x), # type: ignore 3634 self.unit 3635 ) 3636 3637 def randval(self): 3638 # type: () -> RandFloat 3639 value = Field.randval(self) # type: ignore 3640 if value is not None: 3641 min_val = round(value.min * self.scaling + self.offset, 3642 self.ndigits) 3643 max_val = round(value.max * self.scaling + self.offset, 3644 self.ndigits) 3645 3646 return RandFloat(min(min_val, max_val), max(min_val, max_val)) 3647 3648 3649class ScalingField(_ScalingField, 3650 Field[Union[int, float], Union[int, float]]): 3651 """ Handle physical values which are scaled and/or offset for communication 3652 3653 Example: 3654 >>> from scapy.packet import Packet 3655 >>> class ScalingFieldTest(Packet): 3656 fields_desc = [ScalingField('data', 0, scaling=0.1, offset=-1, unit='mV')] # noqa: E501 3657 >>> ScalingFieldTest(data=10).show2() 3658 ###[ ScalingFieldTest ]### 3659 data= 10.0 mV 3660 >>> hexdump(ScalingFieldTest(data=10)) 3661 0000 6E n 3662 >>> hexdump(ScalingFieldTest(data=b"\x6D")) 3663 0000 6D m 3664 >>> ScalingFieldTest(data=b"\x6D").show2() 3665 ###[ ScalingFieldTest ]### 3666 data= 9.9 mV 3667 3668 bytes(ScalingFieldTest(...)) will produce 0x6E in this example. 3669 0x6E is 110 (decimal). This is calculated through the scaling factor 3670 and the offset. "data" was set to 10, which means, we want to transfer 3671 the physical value 10 mV. To calculate the value, which has to be 3672 sent on the bus, the offset has to subtracted and the scaling has to be 3673 applied by division through the scaling factor. 3674 bytes = (data - offset) / scaling 3675 bytes = ( 10 - (-1) ) / 0.1 3676 bytes = 110 = 0x6E 3677 3678 If you want to force a certain internal value, you can assign a byte- 3679 string to the field (data=b"\x6D"). If a string of a bytes object is 3680 given to the field, no internal value conversion will be applied 3681 3682 :param name: field's name 3683 :param default: default value for the field 3684 :param scaling: scaling factor for the internal value conversion 3685 :param unit: string for the unit representation of the internal value 3686 :param offset: value to offset the internal value during conversion 3687 :param ndigits: number of fractional digits for the internal conversion 3688 :param fmt: struct.pack format used to parse and serialize the internal value from and to machine representation # noqa: E501 3689 """ 3690 3691 3692class BitScalingField(_ScalingField, BitField): # type: ignore 3693 """ 3694 A ScalingField that is a BitField 3695 """ 3696 3697 def __init__(self, name, default, size, *args, **kwargs): 3698 # type: (str, int, int, *Any, **Any) -> None 3699 _ScalingField.__init__(self, name, default, *args, **kwargs) 3700 BitField.__init__(self, name, default, size) # type: ignore 3701 3702 3703class OUIField(X3BytesField): 3704 """ 3705 A field designed to carry a OUI (3 bytes) 3706 """ 3707 3708 def i2repr(self, pkt, val): 3709 # type: (Optional[Packet], int) -> str 3710 by_val = struct.pack("!I", val or 0)[1:] 3711 oui = str2mac(by_val + b"\0" * 3)[:8] 3712 if conf.manufdb: 3713 fancy = conf.manufdb._get_manuf(oui) 3714 if fancy != oui: 3715 return "%s (%s)" % (fancy, oui) 3716 return oui 3717 3718 3719class UUIDField(Field[UUID, bytes]): 3720 """Field for UUID storage, wrapping Python's uuid.UUID type. 3721 3722 The internal storage format of this field is ``uuid.UUID`` from the Python 3723 standard library. 3724 3725 There are three formats (``uuid_fmt``) for this field type: 3726 3727 * ``FORMAT_BE`` (default): the UUID is six fields in big-endian byte order, 3728 per RFC 4122. 3729 3730 This format is used by DHCPv6 (RFC 6355) and most network protocols. 3731 3732 * ``FORMAT_LE``: the UUID is six fields, with ``time_low``, ``time_mid`` 3733 and ``time_high_version`` in little-endian byte order. This *doesn't* 3734 change the arrangement of the fields from RFC 4122. 3735 3736 This format is used by Microsoft's COM/OLE libraries. 3737 3738 * ``FORMAT_REV``: the UUID is a single 128-bit integer in little-endian 3739 byte order. This *changes the arrangement* of the fields. 3740 3741 This format is used by Bluetooth Low Energy. 3742 3743 Note: You should use the constants here. 3744 3745 The "human encoding" of this field supports a number of different input 3746 formats, and wraps Python's ``uuid.UUID`` library appropriately: 3747 3748 * Given a bytearray, bytes or str of 16 bytes, this class decodes UUIDs in 3749 wire format. 3750 3751 * Given a bytearray, bytes or str of other lengths, this delegates to 3752 ``uuid.UUID`` the Python standard library. This supports a number of 3753 different encoding options -- see the Python standard library 3754 documentation for more details. 3755 3756 * Given an int or long, presumed to be a 128-bit integer to pass to 3757 ``uuid.UUID``. 3758 3759 * Given a tuple: 3760 3761 * Tuples of 11 integers are treated as having the last 6 integers forming 3762 the ``node`` field, and are merged before being passed as a tuple of 6 3763 integers to ``uuid.UUID``. 3764 3765 * Otherwise, the tuple is passed as the ``fields`` parameter to 3766 ``uuid.UUID`` directly without modification. 3767 3768 ``uuid.UUID`` expects a tuple of 6 integers. 3769 3770 Other types (such as ``uuid.UUID``) are passed through. 3771 """ 3772 3773 __slots__ = ["uuid_fmt"] 3774 3775 FORMAT_BE = 0 3776 FORMAT_LE = 1 3777 FORMAT_REV = 2 3778 3779 # Change this when we get new formats 3780 FORMATS = (FORMAT_BE, FORMAT_LE, FORMAT_REV) 3781 3782 def __init__(self, name, default, uuid_fmt=FORMAT_BE): 3783 # type: (str, Optional[int], int) -> None 3784 self.uuid_fmt = uuid_fmt 3785 self._check_uuid_fmt() 3786 Field.__init__(self, name, default, "16s") 3787 3788 def _check_uuid_fmt(self): 3789 # type: () -> None 3790 """Checks .uuid_fmt, and raises an exception if it is not valid.""" 3791 if self.uuid_fmt not in UUIDField.FORMATS: 3792 raise FieldValueRangeException( 3793 "Unsupported uuid_fmt ({})".format(self.uuid_fmt)) 3794 3795 def i2m(self, pkt, x): 3796 # type: (Optional[Packet], Optional[UUID]) -> bytes 3797 self._check_uuid_fmt() 3798 if x is None: 3799 return b'\0' * 16 3800 if self.uuid_fmt == UUIDField.FORMAT_BE: 3801 return x.bytes 3802 elif self.uuid_fmt == UUIDField.FORMAT_LE: 3803 return x.bytes_le 3804 elif self.uuid_fmt == UUIDField.FORMAT_REV: 3805 return x.bytes[::-1] 3806 else: 3807 raise FieldAttributeException("Unknown fmt") 3808 3809 def m2i(self, 3810 pkt, # type: Optional[Packet] 3811 x, # type: bytes 3812 ): 3813 # type: (...) -> UUID 3814 self._check_uuid_fmt() 3815 if self.uuid_fmt == UUIDField.FORMAT_BE: 3816 return UUID(bytes=x) 3817 elif self.uuid_fmt == UUIDField.FORMAT_LE: 3818 return UUID(bytes_le=x) 3819 elif self.uuid_fmt == UUIDField.FORMAT_REV: 3820 return UUID(bytes=x[::-1]) 3821 else: 3822 raise FieldAttributeException("Unknown fmt") 3823 3824 def any2i(self, 3825 pkt, # type: Optional[Packet] 3826 x # type: Any # noqa: E501 3827 ): 3828 # type: (...) -> Optional[UUID] 3829 # Python's uuid doesn't handle bytearray, so convert to an immutable 3830 # type first. 3831 if isinstance(x, bytearray): 3832 x = bytes_encode(x) 3833 3834 if isinstance(x, int): 3835 u = UUID(int=x) 3836 elif isinstance(x, tuple): 3837 if len(x) == 11: 3838 # For compatibility with dce_rpc: this packs into a tuple where 3839 # elements 7..10 are the 48-bit node ID. 3840 node = 0 3841 for i in x[5:]: 3842 node = (node << 8) | i 3843 3844 x = (x[0], x[1], x[2], x[3], x[4], node) 3845 3846 u = UUID(fields=x) 3847 elif isinstance(x, (str, bytes)): 3848 if len(x) == 16: 3849 # Raw bytes 3850 u = self.m2i(pkt, bytes_encode(x)) 3851 else: 3852 u = UUID(plain_str(x)) 3853 elif isinstance(x, (UUID, RandUUID)): 3854 u = cast(UUID, x) 3855 else: 3856 return None 3857 return u 3858 3859 @staticmethod 3860 def randval(): 3861 # type: () -> RandUUID 3862 return RandUUID() 3863 3864 3865class UUIDEnumField(UUIDField, _EnumField[UUID]): 3866 __slots__ = EnumField.__slots__ 3867 3868 def __init__(self, name, default, enum, uuid_fmt=0): 3869 # type: (str, Optional[int], Any, int) -> None 3870 _EnumField.__init__(self, name, default, enum, "16s") # type: ignore 3871 UUIDField.__init__(self, name, default, uuid_fmt=uuid_fmt) 3872 3873 def any2i(self, pkt, x): 3874 # type: (Optional[Packet], Any) -> UUID 3875 return _EnumField.any2i(self, pkt, x) # type: ignore 3876 3877 def i2repr(self, 3878 pkt, # type: Optional[Packet] 3879 x, # type: UUID 3880 ): 3881 # type: (...) -> Any 3882 return _EnumField.i2repr(self, pkt, x) 3883 3884 3885class BitExtendedField(Field[Optional[int], bytes]): 3886 """ 3887 Bit Extended Field 3888 3889 This type of field has a variable number of bytes. Each byte is defined 3890 as follows: 3891 - 7 bits of data 3892 - 1 bit an an extension bit: 3893 3894 + 0 means it is last byte of the field ("stopping bit") 3895 + 1 means there is another byte after this one ("forwarding bit") 3896 3897 To get the actual data, it is necessary to hop the binary data byte per 3898 byte and to check the extension bit until 0 3899 """ 3900 3901 __slots__ = ["extension_bit"] 3902 3903 def prepare_byte(self, x): 3904 # type: (int) -> int 3905 # Moves the forwarding bit to the LSB 3906 x = int(x) 3907 fx_bit = (x & 2**self.extension_bit) >> self.extension_bit 3908 lsb_bits = x & 2**self.extension_bit - 1 3909 msb_bits = x >> (self.extension_bit + 1) 3910 x = (msb_bits << (self.extension_bit + 1)) + (lsb_bits << 1) + fx_bit 3911 return x 3912 3913 def str2extended(self, x=b""): 3914 # type: (bytes) -> Tuple[bytes, Optional[int]] 3915 # For convenience, we reorder the byte so that the forwarding 3916 # bit is always the LSB. We then apply the same algorithm 3917 # whatever the real forwarding bit position 3918 3919 # First bit is the stopping bit at zero 3920 bits = 0b0 3921 end = None 3922 3923 # We retrieve 7 bits. 3924 # If "forwarding bit" is 1 then we continue on another byte 3925 i = 0 3926 for c in bytearray(x): 3927 c = self.prepare_byte(c) 3928 bits = bits << 7 | (int(c) >> 1) 3929 if not int(c) & 0b1: 3930 end = x[i + 1:] 3931 break 3932 i = i + 1 3933 if end is None: 3934 # We reached the end of the data but there was no 3935 # "ending bit". This is not normal. 3936 return b"", None 3937 else: 3938 return end, bits 3939 3940 def extended2str(self, x): 3941 # type: (Optional[int]) -> bytes 3942 if x is None: 3943 return b"" 3944 x = int(x) 3945 s = [] 3946 LSByte = True 3947 FX_Missing = True 3948 bits = 0b0 3949 i = 0 3950 while (x > 0 or FX_Missing): 3951 if i == 8: 3952 # End of byte 3953 i = 0 3954 s.append(bits) 3955 bits = 0b0 3956 FX_Missing = True 3957 else: 3958 if i % 8 == self.extension_bit: 3959 # This is extension bit 3960 if LSByte: 3961 bits = bits | 0b0 << i 3962 LSByte = False 3963 else: 3964 bits = bits | 0b1 << i 3965 FX_Missing = False 3966 else: 3967 bits = bits | (x & 0b1) << i 3968 x = x >> 1 3969 # Still some bits 3970 i = i + 1 3971 s.append(bits) 3972 3973 result = "".encode() 3974 for x in s[:: -1]: 3975 result = result + struct.pack(">B", x) 3976 return result 3977 3978 def __init__(self, name, default, extension_bit): 3979 # type: (str, Optional[Any], int) -> None 3980 Field.__init__(self, name, default, "B") 3981 self.extension_bit = extension_bit 3982 3983 def i2m(self, pkt, x): 3984 # type: (Optional[Any], Optional[int]) -> bytes 3985 return self.extended2str(x) 3986 3987 def m2i(self, pkt, x): 3988 # type: (Optional[Any], bytes) -> Optional[int] 3989 return self.str2extended(x)[1] 3990 3991 def addfield(self, pkt, s, val): 3992 # type: (Optional[Packet], bytes, Optional[int]) -> bytes 3993 return s + self.i2m(pkt, val) 3994 3995 def getfield(self, pkt, s): 3996 # type: (Optional[Any], bytes) -> Tuple[bytes, Optional[int]] 3997 return self.str2extended(s) 3998 3999 4000class LSBExtendedField(BitExtendedField): 4001 # This is a BitExtendedField with the extension bit on LSB 4002 def __init__(self, name, default): 4003 # type: (str, Optional[Any]) -> None 4004 BitExtendedField.__init__(self, name, default, extension_bit=0) 4005 4006 4007class MSBExtendedField(BitExtendedField): 4008 # This is a BitExtendedField with the extension bit on MSB 4009 def __init__(self, name, default): 4010 # type: (str, Optional[Any]) -> None 4011 BitExtendedField.__init__(self, name, default, extension_bit=7) 4012