1# SPDX-License-Identifier: GPL-2.0-only 2# This file is part of Scapy 3# See https://scapy.net/ for more information 4# Copyright (C) Philippe Biondi <phil@secdev.org> 5 6""" 7Packet class 8 9Provides: 10 - the default Packet classes 11 - binding mechanisms 12 - fuzz() method 13 - exploration methods: explore() / ls() 14""" 15 16from collections import defaultdict 17 18import json 19import re 20import time 21import itertools 22import copy 23import types 24import warnings 25 26from scapy.fields import ( 27 AnyField, 28 BitField, 29 ConditionalField, 30 Emph, 31 EnumField, 32 Field, 33 FlagsField, 34 FlagValue, 35 MayEnd, 36 MultiEnumField, 37 MultipleTypeField, 38 PadField, 39 PacketListField, 40 RawVal, 41 StrField, 42) 43from scapy.config import conf, _version_checker 44from scapy.compat import raw, orb, bytes_encode 45from scapy.base_classes import BasePacket, Gen, SetGen, Packet_metaclass, \ 46 _CanvasDumpExtended 47from scapy.interfaces import _GlobInterfaceType 48from scapy.volatile import RandField, VolatileValue 49from scapy.utils import import_hexcap, tex_escape, colgen, issubtype, \ 50 pretty_list, EDecimal 51from scapy.error import Scapy_Exception, log_runtime, warning 52from scapy.libs.test_pyx import PYX 53 54# Typing imports 55from typing import ( 56 Any, 57 Callable, 58 Dict, 59 Iterator, 60 List, 61 NoReturn, 62 Optional, 63 Set, 64 Tuple, 65 Type, 66 TypeVar, 67 Union, 68 Sequence, 69 cast, 70) 71from scapy.compat import Self 72 73try: 74 import pyx 75except ImportError: 76 pass 77 78 79_T = TypeVar("_T", Dict[str, Any], Optional[Dict[str, Any]]) 80 81 82class Packet( 83 BasePacket, 84 _CanvasDumpExtended, 85 metaclass=Packet_metaclass 86): 87 __slots__ = [ 88 "time", "sent_time", "name", 89 "default_fields", "fields", "fieldtype", 90 "overload_fields", "overloaded_fields", 91 "packetfields", 92 "original", "explicit", "raw_packet_cache", 93 "raw_packet_cache_fields", "_pkt", "post_transforms", 94 "stop_dissection_after", 95 # then payload, underlayer and parent 96 "payload", "underlayer", "parent", 97 "name", 98 # used for sr() 99 "_answered", 100 # used when sniffing 101 "direction", "sniffed_on", 102 # handle snaplen Vs real length 103 "wirelen", 104 "comment", 105 "process_information" 106 ] 107 name = None 108 fields_desc = [] # type: List[AnyField] 109 deprecated_fields = {} # type: Dict[str, Tuple[str, str]] 110 overload_fields = {} # type: Dict[Type[Packet], Dict[str, Any]] 111 payload_guess = [] # type: List[Tuple[Dict[str, Any], Type[Packet]]] 112 show_indent = 1 113 show_summary = True 114 match_subclass = False 115 class_dont_cache = {} # type: Dict[Type[Packet], bool] 116 class_packetfields = {} # type: Dict[Type[Packet], Any] 117 class_default_fields = {} # type: Dict[Type[Packet], Dict[str, Any]] 118 class_default_fields_ref = {} # type: Dict[Type[Packet], List[str]] 119 class_fieldtype = {} # type: Dict[Type[Packet], Dict[str, AnyField]] # noqa: E501 120 121 @classmethod 122 def from_hexcap(cls): 123 # type: (Type[Packet]) -> Packet 124 return cls(import_hexcap()) 125 126 @classmethod 127 def upper_bonds(self): 128 # type: () -> None 129 for fval, upper in self.payload_guess: 130 print( 131 "%-20s %s" % ( 132 upper.__name__, 133 ", ".join("%-12s" % ("%s=%r" % i) for i in fval.items()), 134 ) 135 ) 136 137 @classmethod 138 def lower_bonds(self): 139 # type: () -> None 140 for lower, fval in self._overload_fields.items(): 141 print( 142 "%-20s %s" % ( 143 lower.__name__, 144 ", ".join("%-12s" % ("%s=%r" % i) for i in fval.items()), 145 ) 146 ) 147 148 def __init__(self, 149 _pkt=b"", # type: Union[bytes, bytearray] 150 post_transform=None, # type: Any 151 _internal=0, # type: int 152 _underlayer=None, # type: Optional[Packet] 153 _parent=None, # type: Optional[Packet] 154 stop_dissection_after=None, # type: Optional[Type[Packet]] 155 **fields # type: Any 156 ): 157 # type: (...) -> None 158 self.time = time.time() # type: Union[EDecimal, float] 159 self.sent_time = None # type: Union[EDecimal, float, None] 160 self.name = (self.__class__.__name__ 161 if self._name is None else 162 self._name) 163 self.default_fields = {} # type: Dict[str, Any] 164 self.overload_fields = self._overload_fields 165 self.overloaded_fields = {} # type: Dict[str, Any] 166 self.fields = {} # type: Dict[str, Any] 167 self.fieldtype = {} # type: Dict[str, AnyField] 168 self.packetfields = [] # type: List[AnyField] 169 self.payload = NoPayload() # type: Packet 170 self.init_fields(bool(_pkt)) 171 self.underlayer = _underlayer 172 self.parent = _parent 173 if isinstance(_pkt, bytearray): 174 _pkt = bytes(_pkt) 175 self.original = _pkt 176 self.explicit = 0 177 self.raw_packet_cache = None # type: Optional[bytes] 178 self.raw_packet_cache_fields = None # type: Optional[Dict[str, Any]] # noqa: E501 179 self.wirelen = None # type: Optional[int] 180 self.direction = None # type: Optional[int] 181 self.sniffed_on = None # type: Optional[_GlobInterfaceType] 182 self.comment = None # type: Optional[bytes] 183 self.process_information = None # type: Optional[Dict[str, Any]] 184 self.stop_dissection_after = stop_dissection_after 185 if _pkt: 186 self.dissect(_pkt) 187 if not _internal: 188 self.dissection_done(self) 189 # We use this strange initialization so that the fields 190 # are initialized in their declaration order. 191 # It is required to always support MultipleTypeField 192 for field in self.fields_desc: 193 fname = field.name 194 try: 195 value = fields.pop(fname) 196 except KeyError: 197 continue 198 self.fields[fname] = value if isinstance(value, RawVal) else \ 199 self.get_field(fname).any2i(self, value) 200 # The remaining fields are unknown 201 for fname in fields: 202 if fname in self.deprecated_fields: 203 # Resolve deprecated fields 204 value = fields[fname] 205 fname = self._resolve_alias(fname) 206 self.fields[fname] = value if isinstance(value, RawVal) else \ 207 self.get_field(fname).any2i(self, value) 208 continue 209 raise AttributeError(fname) 210 if isinstance(post_transform, list): 211 self.post_transforms = post_transform 212 elif post_transform is None: 213 self.post_transforms = [] 214 else: 215 self.post_transforms = [post_transform] 216 217 _PickleType = Tuple[ 218 Union[EDecimal, float], 219 Optional[Union[EDecimal, float, None]], 220 Optional[int], 221 Optional[_GlobInterfaceType], 222 Optional[int], 223 Optional[bytes], 224 ] 225 226 def __reduce__(self): 227 # type: () -> Tuple[Type[Packet], Tuple[bytes], Packet._PickleType] 228 """Used by pickling methods""" 229 return (self.__class__, (self.build(),), ( 230 self.time, 231 self.sent_time, 232 self.direction, 233 self.sniffed_on, 234 self.wirelen, 235 self.comment 236 )) 237 238 def __setstate__(self, state): 239 # type: (Packet._PickleType) -> Packet 240 """Rebuild state using pickable methods""" 241 self.time = state[0] 242 self.sent_time = state[1] 243 self.direction = state[2] 244 self.sniffed_on = state[3] 245 self.wirelen = state[4] 246 self.comment = state[5] 247 return self 248 249 def __deepcopy__(self, 250 memo, # type: Any 251 ): 252 # type: (...) -> Packet 253 """Used by copy.deepcopy""" 254 return self.copy() 255 256 def init_fields(self, for_dissect_only=False): 257 # type: (bool) -> None 258 """ 259 Initialize each fields of the fields_desc dict 260 """ 261 262 if self.class_dont_cache.get(self.__class__, False): 263 self.do_init_fields(self.fields_desc) 264 else: 265 self.do_init_cached_fields(for_dissect_only=for_dissect_only) 266 267 def do_init_fields(self, 268 flist, # type: Sequence[AnyField] 269 ): 270 # type: (...) -> None 271 """ 272 Initialize each fields of the fields_desc dict 273 """ 274 default_fields = {} 275 for f in flist: 276 default_fields[f.name] = copy.deepcopy(f.default) 277 self.fieldtype[f.name] = f 278 if f.holds_packets: 279 self.packetfields.append(f) 280 # We set default_fields last to avoid race issues 281 self.default_fields = default_fields 282 283 def do_init_cached_fields(self, for_dissect_only=False): 284 # type: (bool) -> None 285 """ 286 Initialize each fields of the fields_desc dict, or use the cached 287 fields information 288 """ 289 290 cls_name = self.__class__ 291 292 # Build the fields information 293 if Packet.class_default_fields.get(cls_name, None) is None: 294 self.prepare_cached_fields(self.fields_desc) 295 296 # Use fields information from cache 297 default_fields = Packet.class_default_fields.get(cls_name, None) 298 if default_fields: 299 self.default_fields = default_fields 300 self.fieldtype = Packet.class_fieldtype[cls_name] 301 self.packetfields = Packet.class_packetfields[cls_name] 302 303 # Optimization: no need for references when only dissecting. 304 if for_dissect_only: 305 return 306 307 # Deepcopy default references 308 for fname in Packet.class_default_fields_ref[cls_name]: 309 value = self.default_fields[fname] 310 try: 311 self.fields[fname] = value.copy() 312 except AttributeError: 313 # Python 2.7 - list only 314 self.fields[fname] = value[:] 315 316 def prepare_cached_fields(self, flist): 317 # type: (Sequence[AnyField]) -> None 318 """ 319 Prepare the cached fields of the fields_desc dict 320 """ 321 322 cls_name = self.__class__ 323 324 # Fields cache initialization 325 if not flist: 326 return 327 328 class_default_fields = dict() 329 class_default_fields_ref = list() 330 class_fieldtype = dict() 331 class_packetfields = list() 332 333 # Fields initialization 334 for f in flist: 335 if isinstance(f, MultipleTypeField): 336 # Abort 337 self.class_dont_cache[cls_name] = True 338 self.do_init_fields(self.fields_desc) 339 return 340 341 class_default_fields[f.name] = copy.deepcopy(f.default) 342 class_fieldtype[f.name] = f 343 if f.holds_packets: 344 class_packetfields.append(f) 345 346 # Remember references 347 if isinstance(f.default, (list, dict, set, RandField, Packet)): 348 class_default_fields_ref.append(f.name) 349 350 # Apply 351 Packet.class_default_fields_ref[cls_name] = class_default_fields_ref 352 Packet.class_fieldtype[cls_name] = class_fieldtype 353 Packet.class_packetfields[cls_name] = class_packetfields 354 # Last to avoid racing issues 355 Packet.class_default_fields[cls_name] = class_default_fields 356 357 def dissection_done(self, pkt): 358 # type: (Packet) -> None 359 """DEV: will be called after a dissection is completed""" 360 self.post_dissection(pkt) 361 self.payload.dissection_done(pkt) 362 363 def post_dissection(self, pkt): 364 # type: (Packet) -> None 365 """DEV: is called after the dissection of the whole packet""" 366 pass 367 368 def get_field(self, fld): 369 # type: (str) -> AnyField 370 """DEV: returns the field instance from the name of the field""" 371 return self.fieldtype[fld] 372 373 def add_payload(self, payload): 374 # type: (Union[Packet, bytes]) -> None 375 if payload is None: 376 return 377 elif not isinstance(self.payload, NoPayload): 378 self.payload.add_payload(payload) 379 else: 380 if isinstance(payload, Packet): 381 self.payload = payload 382 payload.add_underlayer(self) 383 for t in self.aliastypes: 384 if t in payload.overload_fields: 385 self.overloaded_fields = payload.overload_fields[t] 386 break 387 elif isinstance(payload, (bytes, str, bytearray, memoryview)): 388 self.payload = conf.raw_layer(load=bytes_encode(payload)) 389 else: 390 raise TypeError("payload must be 'Packet', 'bytes', 'str', 'bytearray', or 'memoryview', not [%s]" % repr(payload)) # noqa: E501 391 392 def remove_payload(self): 393 # type: () -> None 394 self.payload.remove_underlayer(self) 395 self.payload = NoPayload() 396 self.overloaded_fields = {} 397 398 def add_underlayer(self, underlayer): 399 # type: (Packet) -> None 400 self.underlayer = underlayer 401 402 def remove_underlayer(self, other): 403 # type: (Packet) -> None 404 self.underlayer = None 405 406 def add_parent(self, parent): 407 # type: (Packet) -> None 408 """Set packet parent. 409 When packet is an element in PacketListField, parent field would 410 point to the list owner packet.""" 411 self.parent = parent 412 413 def remove_parent(self, other): 414 # type: (Packet) -> None 415 """Remove packet parent. 416 When packet is an element in PacketListField, parent field would 417 point to the list owner packet.""" 418 self.parent = None 419 420 def copy(self) -> Self: 421 """Returns a deep copy of the instance.""" 422 clone = self.__class__() 423 clone.fields = self.copy_fields_dict(self.fields) 424 clone.default_fields = self.copy_fields_dict(self.default_fields) 425 clone.overloaded_fields = self.overloaded_fields.copy() 426 clone.underlayer = self.underlayer 427 clone.parent = self.parent 428 clone.explicit = self.explicit 429 clone.raw_packet_cache = self.raw_packet_cache 430 clone.raw_packet_cache_fields = self.copy_fields_dict( 431 self.raw_packet_cache_fields 432 ) 433 clone.wirelen = self.wirelen 434 clone.post_transforms = self.post_transforms[:] 435 clone.payload = self.payload.copy() 436 clone.payload.add_underlayer(clone) 437 clone.time = self.time 438 clone.comment = self.comment 439 clone.direction = self.direction 440 clone.sniffed_on = self.sniffed_on 441 return clone 442 443 def _resolve_alias(self, attr): 444 # type: (str) -> str 445 new_attr, version = self.deprecated_fields[attr] 446 warnings.warn( 447 "%s has been deprecated in favor of %s since %s !" % ( 448 attr, new_attr, version 449 ), DeprecationWarning 450 ) 451 return new_attr 452 453 def getfieldval(self, attr): 454 # type: (str) -> Any 455 if self.deprecated_fields and attr in self.deprecated_fields: 456 attr = self._resolve_alias(attr) 457 if attr in self.fields: 458 return self.fields[attr] 459 if attr in self.overloaded_fields: 460 return self.overloaded_fields[attr] 461 if attr in self.default_fields: 462 return self.default_fields[attr] 463 return self.payload.getfieldval(attr) 464 465 def getfield_and_val(self, attr): 466 # type: (str) -> Tuple[AnyField, Any] 467 if self.deprecated_fields and attr in self.deprecated_fields: 468 attr = self._resolve_alias(attr) 469 if attr in self.fields: 470 return self.get_field(attr), self.fields[attr] 471 if attr in self.overloaded_fields: 472 return self.get_field(attr), self.overloaded_fields[attr] 473 if attr in self.default_fields: 474 return self.get_field(attr), self.default_fields[attr] 475 raise ValueError 476 477 def __getattr__(self, attr): 478 # type: (str) -> Any 479 try: 480 fld, v = self.getfield_and_val(attr) 481 except ValueError: 482 return self.payload.__getattr__(attr) 483 if fld is not None: 484 return v if isinstance(v, RawVal) else fld.i2h(self, v) 485 return v 486 487 def setfieldval(self, attr, val): 488 # type: (str, Any) -> None 489 if self.deprecated_fields and attr in self.deprecated_fields: 490 attr = self._resolve_alias(attr) 491 if attr in self.default_fields: 492 fld = self.get_field(attr) 493 if fld is None: 494 any2i = lambda x, y: y # type: Callable[..., Any] 495 else: 496 any2i = fld.any2i 497 self.fields[attr] = val if isinstance(val, RawVal) else \ 498 any2i(self, val) 499 self.explicit = 0 500 self.raw_packet_cache = None 501 self.raw_packet_cache_fields = None 502 self.wirelen = None 503 elif attr == "payload": 504 self.remove_payload() 505 self.add_payload(val) 506 else: 507 self.payload.setfieldval(attr, val) 508 509 def __setattr__(self, attr, val): 510 # type: (str, Any) -> None 511 if attr in self.__all_slots__: 512 return object.__setattr__(self, attr, val) 513 try: 514 return self.setfieldval(attr, val) 515 except AttributeError: 516 pass 517 return object.__setattr__(self, attr, val) 518 519 def delfieldval(self, attr): 520 # type: (str) -> None 521 if attr in self.fields: 522 del self.fields[attr] 523 self.explicit = 0 # in case a default value must be explicit 524 self.raw_packet_cache = None 525 self.raw_packet_cache_fields = None 526 self.wirelen = None 527 elif attr in self.default_fields: 528 pass 529 elif attr == "payload": 530 self.remove_payload() 531 else: 532 self.payload.delfieldval(attr) 533 534 def __delattr__(self, attr): 535 # type: (str) -> None 536 if attr == "payload": 537 return self.remove_payload() 538 if attr in self.__all_slots__: 539 return object.__delattr__(self, attr) 540 try: 541 return self.delfieldval(attr) 542 except AttributeError: 543 pass 544 return object.__delattr__(self, attr) 545 546 def _superdir(self): 547 # type: () -> Set[str] 548 """ 549 Return a list of slots and methods, including those from subclasses. 550 """ 551 attrs = set() # type: Set[str] 552 cls = self.__class__ 553 if hasattr(cls, '__all_slots__'): 554 attrs.update(cls.__all_slots__) 555 for bcls in cls.__mro__: 556 if hasattr(bcls, '__dict__'): 557 attrs.update(bcls.__dict__) 558 return attrs 559 560 def __dir__(self): 561 # type: () -> List[str] 562 """ 563 Add fields to tab completion list. 564 """ 565 return sorted(itertools.chain(self._superdir(), self.default_fields)) 566 567 def __repr__(self): 568 # type: () -> str 569 s = "" 570 ct = conf.color_theme 571 for f in self.fields_desc: 572 if isinstance(f, ConditionalField) and not f._evalcond(self): 573 continue 574 if f.name in self.fields: 575 fval = self.fields[f.name] 576 if isinstance(fval, (list, dict, set)) and len(fval) == 0: 577 continue 578 val = f.i2repr(self, fval) 579 elif f.name in self.overloaded_fields: 580 fover = self.overloaded_fields[f.name] 581 if isinstance(fover, (list, dict, set)) and len(fover) == 0: 582 continue 583 val = f.i2repr(self, fover) 584 else: 585 continue 586 if isinstance(f, Emph) or f in conf.emph: 587 ncol = ct.emph_field_name 588 vcol = ct.emph_field_value 589 else: 590 ncol = ct.field_name 591 vcol = ct.field_value 592 593 s += " %s%s%s" % (ncol(f.name), 594 ct.punct("="), 595 vcol(val)) 596 return "%s%s %s %s%s%s" % (ct.punct("<"), 597 ct.layer_name(self.__class__.__name__), 598 s, 599 ct.punct("|"), 600 repr(self.payload), 601 ct.punct(">")) 602 603 def __str__(self): 604 # type: () -> str 605 return self.summary() 606 607 def __bytes__(self): 608 # type: () -> bytes 609 return self.build() 610 611 def __div__(self, other): 612 # type: (Any) -> Self 613 if isinstance(other, Packet): 614 cloneA = self.copy() 615 cloneB = other.copy() 616 cloneA.add_payload(cloneB) 617 return cloneA 618 elif isinstance(other, (bytes, str, bytearray, memoryview)): 619 return self / conf.raw_layer(load=bytes_encode(other)) 620 else: 621 return other.__rdiv__(self) # type: ignore 622 __truediv__ = __div__ 623 624 def __rdiv__(self, other): 625 # type: (Any) -> Packet 626 if isinstance(other, (bytes, str, bytearray, memoryview)): 627 return conf.raw_layer(load=bytes_encode(other)) / self 628 else: 629 raise TypeError 630 __rtruediv__ = __rdiv__ 631 632 def __mul__(self, other): 633 # type: (Any) -> List[Packet] 634 if isinstance(other, int): 635 return [self] * other 636 else: 637 raise TypeError 638 639 def __rmul__(self, other): 640 # type: (Any) -> List[Packet] 641 return self.__mul__(other) 642 643 def __nonzero__(self): 644 # type: () -> bool 645 return True 646 __bool__ = __nonzero__ 647 648 def __len__(self): 649 # type: () -> int 650 return len(self.__bytes__()) 651 652 def copy_field_value(self, fieldname, value): 653 # type: (str, Any) -> Any 654 return self.get_field(fieldname).do_copy(value) 655 656 def copy_fields_dict(self, fields): 657 # type: (_T) -> _T 658 if fields is None: 659 return None 660 return {fname: self.copy_field_value(fname, fval) 661 for fname, fval in fields.items()} 662 663 def _raw_packet_cache_field_value(self, fld, val, copy=False): 664 # type: (AnyField, Any, bool) -> Optional[Any] 665 """Get a value representative of a mutable field to detect changes""" 666 _cpy = lambda x: fld.do_copy(x) if copy else x # type: Callable[[Any], Any] 667 if fld.holds_packets: 668 # avoid copying whole packets (perf: #GH3894) 669 if fld.islist: 670 return [ 671 (_cpy(x.fields), x.payload.raw_packet_cache) for x in val 672 ] 673 else: 674 return (_cpy(val.fields), val.payload.raw_packet_cache) 675 elif fld.islist or fld.ismutable: 676 return _cpy(val) 677 return None 678 679 def clear_cache(self): 680 # type: () -> None 681 """Clear the raw packet cache for the field and all its subfields""" 682 self.raw_packet_cache = None 683 for fname, fval in self.fields.items(): 684 fld = self.get_field(fname) 685 if fld.holds_packets: 686 if isinstance(fval, Packet): 687 fval.clear_cache() 688 elif isinstance(fval, list): 689 for fsubval in fval: 690 fsubval.clear_cache() 691 self.payload.clear_cache() 692 693 def self_build(self): 694 # type: () -> bytes 695 """ 696 Create the default layer regarding fields_desc dict 697 698 :param field_pos_list: 699 """ 700 if self.raw_packet_cache is not None and \ 701 self.raw_packet_cache_fields is not None: 702 for fname, fval in self.raw_packet_cache_fields.items(): 703 fld, val = self.getfield_and_val(fname) 704 if self._raw_packet_cache_field_value(fld, val) != fval: 705 self.raw_packet_cache = None 706 self.raw_packet_cache_fields = None 707 self.wirelen = None 708 break 709 if self.raw_packet_cache is not None: 710 return self.raw_packet_cache 711 p = b"" 712 for f in self.fields_desc: 713 val = self.getfieldval(f.name) 714 if isinstance(val, RawVal): 715 p += bytes(val) 716 else: 717 try: 718 p = f.addfield(self, p, val) 719 except Exception as ex: 720 try: 721 ex.args = ( 722 "While dissecting field '%s': " % f.name + 723 ex.args[0], 724 ) + ex.args[1:] 725 except (AttributeError, IndexError): 726 pass 727 raise ex 728 return p 729 730 def do_build_payload(self): 731 # type: () -> bytes 732 """ 733 Create the default version of the payload layer 734 735 :return: a string of payload layer 736 """ 737 return self.payload.do_build() 738 739 def do_build(self): 740 # type: () -> bytes 741 """ 742 Create the default version of the layer 743 744 :return: a string of the packet with the payload 745 """ 746 if not self.explicit: 747 self = next(iter(self)) 748 pkt = self.self_build() 749 for t in self.post_transforms: 750 pkt = t(pkt) 751 pay = self.do_build_payload() 752 if self.raw_packet_cache is None: 753 return self.post_build(pkt, pay) 754 else: 755 return pkt + pay 756 757 def build_padding(self): 758 # type: () -> bytes 759 return self.payload.build_padding() 760 761 def build(self): 762 # type: () -> bytes 763 """ 764 Create the current layer 765 766 :return: string of the packet with the payload 767 """ 768 p = self.do_build() 769 p += self.build_padding() 770 p = self.build_done(p) 771 return p 772 773 def post_build(self, pkt, pay): 774 # type: (bytes, bytes) -> bytes 775 """ 776 DEV: called right after the current layer is build. 777 778 :param str pkt: the current packet (build by self_build function) 779 :param str pay: the packet payload (build by do_build_payload function) 780 :return: a string of the packet with the payload 781 """ 782 return pkt + pay 783 784 def build_done(self, p): 785 # type: (bytes) -> bytes 786 return self.payload.build_done(p) 787 788 def do_build_ps(self): 789 # type: () -> Tuple[bytes, List[Tuple[Packet, List[Tuple[Field[Any, Any], str, bytes]]]]] # noqa: E501 790 p = b"" 791 pl = [] 792 q = b"" 793 for f in self.fields_desc: 794 if isinstance(f, ConditionalField) and not f._evalcond(self): 795 continue 796 p = f.addfield(self, p, self.getfieldval(f.name)) 797 if isinstance(p, bytes): 798 r = p[len(q):] 799 q = p 800 else: 801 r = b"" 802 pl.append((f, f.i2repr(self, self.getfieldval(f.name)), r)) 803 804 pkt, lst = self.payload.build_ps(internal=1) 805 p += pkt 806 lst.append((self, pl)) 807 808 return p, lst 809 810 def build_ps(self, internal=0): 811 # type: (int) -> Tuple[bytes, List[Tuple[Packet, List[Tuple[Any, Any, bytes]]]]] # noqa: E501 812 p, lst = self.do_build_ps() 813# if not internal: 814# pkt = self 815# while pkt.haslayer(conf.padding_layer): 816# pkt = pkt.getlayer(conf.padding_layer) 817# lst.append( (pkt, [ ("loakjkjd", pkt.load, pkt.load) ] ) ) 818# p += pkt.load 819# pkt = pkt.payload 820 return p, lst 821 822 def canvas_dump(self, layer_shift=0, rebuild=1): 823 # type: (int, int) -> pyx.canvas.canvas 824 if PYX == 0: 825 raise ImportError("PyX and its dependencies must be installed") 826 canvas = pyx.canvas.canvas() 827 if rebuild: 828 _, t = self.__class__(raw(self)).build_ps() 829 else: 830 _, t = self.build_ps() 831 YTXTI = len(t) 832 for _, l in t: 833 YTXTI += len(l) 834 YTXT = float(YTXTI) 835 YDUMP = YTXT 836 837 XSTART = 1 838 XDSTART = 10 839 y = 0.0 840 yd = 0.0 841 XMUL = 0.55 842 YMUL = 0.4 843 844 backcolor = colgen(0.6, 0.8, 1.0, trans=pyx.color.rgb) 845 forecolor = colgen(0.2, 0.5, 0.8, trans=pyx.color.rgb) 846# backcolor=makecol(0.376, 0.729, 0.525, 1.0) 847 848 def hexstr(x): 849 # type: (bytes) -> str 850 return " ".join("%02x" % orb(c) for c in x) 851 852 def make_dump_txt(x, y, txt): 853 # type: (int, float, bytes) -> pyx.text.text 854 return pyx.text.text( 855 XDSTART + x * XMUL, 856 (YDUMP - y) * YMUL, 857 r"\tt{%s}" % hexstr(txt), 858 [pyx.text.size.Large] 859 ) 860 861 def make_box(o): 862 # type: (pyx.bbox.bbox) -> pyx.bbox.bbox 863 return pyx.box.rect( 864 o.left(), o.bottom(), o.width(), o.height(), 865 relcenter=(0.5, 0.5) 866 ) 867 868 def make_frame(lst): 869 # type: (List[Any]) -> pyx.path.path 870 if len(lst) == 1: 871 b = lst[0].bbox() 872 b.enlarge(pyx.unit.u_pt) 873 return b.path() 874 else: 875 fb = lst[0].bbox() 876 fb.enlarge(pyx.unit.u_pt) 877 lb = lst[-1].bbox() 878 lb.enlarge(pyx.unit.u_pt) 879 if len(lst) == 2 and fb.left() > lb.right(): 880 return pyx.path.path(pyx.path.moveto(fb.right(), fb.top()), 881 pyx.path.lineto(fb.left(), fb.top()), 882 pyx.path.lineto(fb.left(), fb.bottom()), # noqa: E501 883 pyx.path.lineto(fb.right(), fb.bottom()), # noqa: E501 884 pyx.path.moveto(lb.left(), lb.top()), 885 pyx.path.lineto(lb.right(), lb.top()), 886 pyx.path.lineto(lb.right(), lb.bottom()), # noqa: E501 887 pyx.path.lineto(lb.left(), lb.bottom())) # noqa: E501 888 else: 889 # XXX 890 gb = lst[1].bbox() 891 if gb != lb: 892 gb.enlarge(pyx.unit.u_pt) 893 kb = lst[-2].bbox() 894 if kb != gb and kb != lb: 895 kb.enlarge(pyx.unit.u_pt) 896 return pyx.path.path(pyx.path.moveto(fb.left(), fb.top()), 897 pyx.path.lineto(fb.right(), fb.top()), 898 pyx.path.lineto(fb.right(), kb.bottom()), # noqa: E501 899 pyx.path.lineto(lb.right(), kb.bottom()), # noqa: E501 900 pyx.path.lineto(lb.right(), lb.bottom()), # noqa: E501 901 pyx.path.lineto(lb.left(), lb.bottom()), # noqa: E501 902 pyx.path.lineto(lb.left(), gb.top()), 903 pyx.path.lineto(fb.left(), gb.top()), 904 pyx.path.closepath(),) 905 906 def make_dump(s, # type: bytes 907 shift=0, # type: int 908 y=0., # type: float 909 col=None, # type: pyx.color.color 910 bkcol=None, # type: pyx.color.color 911 large=16 # type: int 912 ): 913 # type: (...) -> Tuple[pyx.canvas.canvas, pyx.bbox.bbox, int, float] # noqa: E501 914 c = pyx.canvas.canvas() 915 tlist = [] 916 while s: 917 dmp, s = s[:large - shift], s[large - shift:] 918 txt = make_dump_txt(shift, y, dmp) 919 tlist.append(txt) 920 shift += len(dmp) 921 if shift >= 16: 922 shift = 0 923 y += 1 924 if col is None: 925 col = pyx.color.rgb.red 926 if bkcol is None: 927 bkcol = pyx.color.rgb.white 928 c.stroke(make_frame(tlist), [col, pyx.deco.filled([bkcol]), pyx.style.linewidth.Thick]) # noqa: E501 929 for txt in tlist: 930 c.insert(txt) 931 return c, tlist[-1].bbox(), shift, y 932 933 last_shift, last_y = 0, 0.0 934 while t: 935 bkcol = next(backcolor) 936 proto, fields = t.pop() 937 y += 0.5 938 pt = pyx.text.text( 939 XSTART, 940 (YTXT - y) * YMUL, 941 r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape( 942 str(proto.name) 943 ), 944 [pyx.text.size.Large] 945 ) 946 y += 1 947 ptbb = pt.bbox() 948 ptbb.enlarge(pyx.unit.u_pt * 2) 949 canvas.stroke(ptbb.path(), [pyx.color.rgb.black, pyx.deco.filled([bkcol])]) # noqa: E501 950 canvas.insert(pt) 951 for field, fval, fdump in fields: 952 col = next(forecolor) 953 ft = pyx.text.text(XSTART, (YTXT - y) * YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(field.name)) # noqa: E501 954 if isinstance(field, BitField): 955 fsize = '%sb' % field.size 956 else: 957 fsize = '%sB' % len(fdump) 958 if (hasattr(field, 'field') and 959 'LE' in field.field.__class__.__name__[:3] or 960 'LE' in field.__class__.__name__[:3]): 961 fsize = r'$\scriptstyle\langle$' + fsize 962 st = pyx.text.text(XSTART + 3.4, (YTXT - y) * YMUL, r"\font\cmbxfont=cmssbx10 scaled 600\cmbxfont{%s}" % fsize, [pyx.text.halign.boxright]) # noqa: E501 963 if isinstance(fval, str): 964 if len(fval) > 18: 965 fval = fval[:18] + "[...]" 966 else: 967 fval = "" 968 vt = pyx.text.text(XSTART + 3.5, (YTXT - y) * YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(fval)) # noqa: E501 969 y += 1.0 970 if fdump: 971 dt, target, last_shift, last_y = make_dump(fdump, last_shift, last_y, col, bkcol) # noqa: E501 972 973 dtb = target 974 vtb = vt.bbox() 975 bxvt = make_box(vtb) 976 bxdt = make_box(dtb) 977 dtb.enlarge(pyx.unit.u_pt) 978 try: 979 if yd < 0: 980 cnx = pyx.connector.curve(bxvt, bxdt, absangle1=0, absangle2=-90) # noqa: E501 981 else: 982 cnx = pyx.connector.curve(bxvt, bxdt, absangle1=0, absangle2=90) # noqa: E501 983 except Exception: 984 pass 985 else: 986 canvas.stroke(cnx, [pyx.style.linewidth.thin, pyx.deco.earrow.small, col]) # noqa: E501 987 988 canvas.insert(dt) 989 990 canvas.insert(ft) 991 canvas.insert(st) 992 canvas.insert(vt) 993 last_y += layer_shift 994 995 return canvas 996 997 def extract_padding(self, s): 998 # type: (bytes) -> Tuple[bytes, Optional[bytes]] 999 """ 1000 DEV: to be overloaded to extract current layer's padding. 1001 1002 :param str s: the current layer 1003 :return: a couple of strings (actual layer, padding) 1004 """ 1005 return s, None 1006 1007 def post_dissect(self, s): 1008 # type: (bytes) -> bytes 1009 """DEV: is called right after the current layer has been dissected""" 1010 return s 1011 1012 def pre_dissect(self, s): 1013 # type: (bytes) -> bytes 1014 """DEV: is called right before the current layer is dissected""" 1015 return s 1016 1017 def do_dissect(self, s): 1018 # type: (bytes) -> bytes 1019 _raw = s 1020 self.raw_packet_cache_fields = {} 1021 for f in self.fields_desc: 1022 s, fval = f.getfield(self, s) 1023 # Skip unused ConditionalField 1024 if isinstance(f, ConditionalField) and fval is None: 1025 continue 1026 # We need to track fields with mutable values to discard 1027 # .raw_packet_cache when needed. 1028 if (f.islist or f.holds_packets or f.ismutable) and fval is not None: 1029 self.raw_packet_cache_fields[f.name] = \ 1030 self._raw_packet_cache_field_value(f, fval, copy=True) 1031 self.fields[f.name] = fval 1032 # Nothing left to dissect 1033 if not s and (isinstance(f, MayEnd) or 1034 (fval is not None and isinstance(f, ConditionalField) and 1035 isinstance(f.fld, MayEnd))): 1036 break 1037 self.raw_packet_cache = _raw[:-len(s)] if s else _raw 1038 self.explicit = 1 1039 return s 1040 1041 def do_dissect_payload(self, s): 1042 # type: (bytes) -> None 1043 """ 1044 Perform the dissection of the layer's payload 1045 1046 :param str s: the raw layer 1047 """ 1048 if s: 1049 if ( 1050 self.stop_dissection_after and 1051 isinstance(self, self.stop_dissection_after) 1052 ): 1053 # stop dissection here 1054 p = conf.raw_layer(s, _internal=1, _underlayer=self) 1055 self.add_payload(p) 1056 return 1057 cls = self.guess_payload_class(s) 1058 try: 1059 p = cls( 1060 s, 1061 stop_dissection_after=self.stop_dissection_after, 1062 _internal=1, 1063 _underlayer=self, 1064 ) 1065 except KeyboardInterrupt: 1066 raise 1067 except Exception: 1068 if conf.debug_dissector: 1069 if issubtype(cls, Packet): 1070 log_runtime.error("%s dissector failed", cls.__name__) 1071 else: 1072 log_runtime.error("%s.guess_payload_class() returned " 1073 "[%s]", 1074 self.__class__.__name__, repr(cls)) 1075 if cls is not None: 1076 raise 1077 p = conf.raw_layer(s, _internal=1, _underlayer=self) 1078 self.add_payload(p) 1079 1080 def dissect(self, s): 1081 # type: (bytes) -> None 1082 s = self.pre_dissect(s) 1083 1084 s = self.do_dissect(s) 1085 1086 s = self.post_dissect(s) 1087 1088 payl, pad = self.extract_padding(s) 1089 self.do_dissect_payload(payl) 1090 if pad and conf.padding: 1091 self.add_payload(conf.padding_layer(pad)) 1092 1093 def guess_payload_class(self, payload): 1094 # type: (bytes) -> Type[Packet] 1095 """ 1096 DEV: Guesses the next payload class from layer bonds. 1097 Can be overloaded to use a different mechanism. 1098 1099 :param str payload: the layer's payload 1100 :return: the payload class 1101 """ 1102 for t in self.aliastypes: 1103 for fval, cls in t.payload_guess: 1104 try: 1105 if all(v == self.getfieldval(k) 1106 for k, v in fval.items()): 1107 return cls # type: ignore 1108 except AttributeError: 1109 pass 1110 return self.default_payload_class(payload) 1111 1112 def default_payload_class(self, payload): 1113 # type: (bytes) -> Type[Packet] 1114 """ 1115 DEV: Returns the default payload class if nothing has been found by the 1116 guess_payload_class() method. 1117 1118 :param str payload: the layer's payload 1119 :return: the default payload class define inside the configuration file 1120 """ 1121 return conf.raw_layer 1122 1123 def hide_defaults(self): 1124 # type: () -> None 1125 """Removes fields' values that are the same as default values.""" 1126 # use list(): self.fields is modified in the loop 1127 for k, v in list(self.fields.items()): 1128 v = self.fields[k] 1129 if k in self.default_fields: 1130 if self.default_fields[k] == v: 1131 del self.fields[k] 1132 self.payload.hide_defaults() 1133 1134 def clone_with(self, payload=None, **kargs): 1135 # type: (Optional[Any], **Any) -> Any 1136 pkt = self.__class__() 1137 pkt.explicit = 1 1138 pkt.fields = kargs 1139 pkt.default_fields = self.copy_fields_dict(self.default_fields) 1140 pkt.overloaded_fields = self.overloaded_fields.copy() 1141 pkt.time = self.time 1142 pkt.underlayer = self.underlayer 1143 pkt.parent = self.parent 1144 pkt.post_transforms = self.post_transforms 1145 pkt.raw_packet_cache = self.raw_packet_cache 1146 pkt.raw_packet_cache_fields = self.copy_fields_dict( 1147 self.raw_packet_cache_fields 1148 ) 1149 pkt.wirelen = self.wirelen 1150 pkt.comment = self.comment 1151 pkt.sniffed_on = self.sniffed_on 1152 pkt.direction = self.direction 1153 if payload is not None: 1154 pkt.add_payload(payload) 1155 return pkt 1156 1157 def __iter__(self): 1158 # type: () -> Iterator[Packet] 1159 """Iterates through all sub-packets generated by this Packet.""" 1160 def loop(todo, done, self=self): 1161 # type: (List[str], Dict[str, Any], Any) -> Iterator[Packet] 1162 if todo: 1163 eltname = todo.pop() 1164 elt = self.getfieldval(eltname) 1165 if not isinstance(elt, Gen): 1166 if self.get_field(eltname).islist: 1167 elt = SetGen([elt]) 1168 else: 1169 elt = SetGen(elt) 1170 for e in elt: 1171 done[eltname] = e 1172 for x in loop(todo[:], done): 1173 yield x 1174 else: 1175 if isinstance(self.payload, NoPayload): 1176 payloads = SetGen([None]) # type: SetGen[Packet] 1177 else: 1178 payloads = self.payload 1179 for payl in payloads: 1180 # Let's make sure subpackets are consistent 1181 done2 = done.copy() 1182 for k in done2: 1183 if isinstance(done2[k], VolatileValue): 1184 done2[k] = done2[k]._fix() 1185 pkt = self.clone_with(payload=payl, **done2) 1186 yield pkt 1187 1188 if self.explicit or self.raw_packet_cache is not None: 1189 todo = [] 1190 done = self.fields 1191 else: 1192 todo = [k for (k, v) in itertools.chain(self.default_fields.items(), 1193 self.overloaded_fields.items()) 1194 if isinstance(v, VolatileValue)] + list(self.fields) 1195 done = {} 1196 return loop(todo, done) 1197 1198 def iterpayloads(self): 1199 # type: () -> Iterator[Packet] 1200 """Used to iter through the payloads of a Packet. 1201 Useful for DNS or 802.11 for instance. 1202 """ 1203 yield self 1204 current = self 1205 while current.payload: 1206 current = current.payload 1207 yield current 1208 1209 def __gt__(self, other): 1210 # type: (Packet) -> int 1211 """True if other is an answer from self (self ==> other).""" 1212 if isinstance(other, Packet): 1213 return other < self 1214 elif isinstance(other, bytes): 1215 return 1 1216 else: 1217 raise TypeError((self, other)) 1218 1219 def __lt__(self, other): 1220 # type: (Packet) -> int 1221 """True if self is an answer from other (other ==> self).""" 1222 if isinstance(other, Packet): 1223 return self.answers(other) 1224 elif isinstance(other, bytes): 1225 return 1 1226 else: 1227 raise TypeError((self, other)) 1228 1229 def __eq__(self, other): 1230 # type: (Any) -> bool 1231 if not isinstance(other, self.__class__): 1232 return False 1233 for f in self.fields_desc: 1234 if f not in other.fields_desc: 1235 return False 1236 if self.getfieldval(f.name) != other.getfieldval(f.name): 1237 return False 1238 return self.payload == other.payload 1239 1240 def __ne__(self, other): 1241 # type: (Any) -> bool 1242 return not self.__eq__(other) 1243 1244 # Note: setting __hash__ to None is the standard way 1245 # of making an object un-hashable. mypy doesn't know that 1246 __hash__ = None # type: ignore 1247 1248 def hashret(self): 1249 # type: () -> bytes 1250 """DEV: returns a string that has the same value for a request 1251 and its answer.""" 1252 return self.payload.hashret() 1253 1254 def answers(self, other): 1255 # type: (Packet) -> int 1256 """DEV: true if self is an answer from other""" 1257 if other.__class__ == self.__class__: 1258 return self.payload.answers(other.payload) 1259 return 0 1260 1261 def layers(self): 1262 # type: () -> List[Type[Packet]] 1263 """returns a list of layer classes (including subclasses) in this packet""" # noqa: E501 1264 layers = [] 1265 lyr = self # type: Optional[Packet] 1266 while lyr: 1267 layers.append(lyr.__class__) 1268 lyr = lyr.payload.getlayer(0, _subclass=True) 1269 return layers 1270 1271 def haslayer(self, cls, _subclass=None): 1272 # type: (Union[Type[Packet], str], Optional[bool]) -> int 1273 """ 1274 true if self has a layer that is an instance of cls. 1275 Superseded by "cls in self" syntax. 1276 """ 1277 if _subclass is None: 1278 _subclass = self.match_subclass or None 1279 if _subclass: 1280 match = issubtype 1281 else: 1282 match = lambda x, t: bool(x == t) 1283 if cls is None or match(self.__class__, cls) \ 1284 or cls in [self.__class__.__name__, self._name]: 1285 return True 1286 for f in self.packetfields: 1287 fvalue_gen = self.getfieldval(f.name) 1288 if fvalue_gen is None: 1289 continue 1290 if not f.islist: 1291 fvalue_gen = SetGen(fvalue_gen, _iterpacket=0) 1292 for fvalue in fvalue_gen: 1293 if isinstance(fvalue, Packet): 1294 ret = fvalue.haslayer(cls, _subclass=_subclass) 1295 if ret: 1296 return ret 1297 return self.payload.haslayer(cls, _subclass=_subclass) 1298 1299 def getlayer(self, 1300 cls, # type: Union[int, Type[Packet], str] 1301 nb=1, # type: int 1302 _track=None, # type: Optional[List[int]] 1303 _subclass=None, # type: Optional[bool] 1304 **flt # type: Any 1305 ): 1306 # type: (...) -> Optional[Packet] 1307 """Return the nb^th layer that is an instance of cls, matching flt 1308values. 1309 """ 1310 if _subclass is None: 1311 _subclass = self.match_subclass or None 1312 if _subclass: 1313 match = issubtype 1314 else: 1315 match = lambda x, t: bool(x == t) 1316 # Note: 1317 # cls can be int, packet, str 1318 # string_class_name can be packet, str (packet or packet+field) 1319 # class_name can be packet, str (packet only) 1320 if isinstance(cls, int): 1321 nb = cls + 1 1322 string_class_name = "" # type: Union[Type[Packet], str] 1323 else: 1324 string_class_name = cls 1325 class_name = "" # type: Union[Type[Packet], str] 1326 fld = None # type: Optional[str] 1327 if isinstance(string_class_name, str) and "." in string_class_name: 1328 class_name, fld = string_class_name.split(".", 1) 1329 else: 1330 class_name, fld = string_class_name, None 1331 if not class_name or match(self.__class__, class_name) \ 1332 or class_name in [self.__class__.__name__, self._name]: 1333 if all(self.getfieldval(fldname) == fldvalue 1334 for fldname, fldvalue in flt.items()): 1335 if nb == 1: 1336 if fld is None: 1337 return self 1338 else: 1339 return self.getfieldval(fld) # type: ignore 1340 else: 1341 nb -= 1 1342 for f in self.packetfields: 1343 fvalue_gen = self.getfieldval(f.name) 1344 if fvalue_gen is None: 1345 continue 1346 if not f.islist: 1347 fvalue_gen = SetGen(fvalue_gen, _iterpacket=0) 1348 for fvalue in fvalue_gen: 1349 if isinstance(fvalue, Packet): 1350 track = [] # type: List[int] 1351 ret = fvalue.getlayer(class_name, nb=nb, _track=track, 1352 _subclass=_subclass, **flt) 1353 if ret is not None: 1354 return ret 1355 nb = track[0] 1356 return self.payload.getlayer(class_name, nb=nb, _track=_track, 1357 _subclass=_subclass, **flt) 1358 1359 def firstlayer(self): 1360 # type: () -> Packet 1361 q = self 1362 while q.underlayer is not None: 1363 q = q.underlayer 1364 return q 1365 1366 def __getitem__(self, cls): 1367 # type: (Union[Type[Packet], str]) -> Any 1368 if isinstance(cls, slice): 1369 lname = cls.start 1370 if cls.stop: 1371 ret = self.getlayer(cls.start, nb=cls.stop, **(cls.step or {})) 1372 else: 1373 ret = self.getlayer(cls.start, **(cls.step or {})) 1374 else: 1375 lname = cls 1376 ret = self.getlayer(cls) 1377 if ret is None: 1378 if isinstance(lname, type): 1379 name = lname.__name__ 1380 elif not isinstance(lname, bytes): 1381 name = repr(lname) 1382 else: 1383 name = cast(str, lname) 1384 raise IndexError("Layer [%s] not found" % name) 1385 return ret 1386 1387 def __delitem__(self, cls): 1388 # type: (Type[Packet]) -> None 1389 del self[cls].underlayer.payload 1390 1391 def __setitem__(self, cls, val): 1392 # type: (Type[Packet], Packet) -> None 1393 self[cls].underlayer.payload = val 1394 1395 def __contains__(self, cls): 1396 # type: (Union[Type[Packet], str]) -> int 1397 """ 1398 "cls in self" returns true if self has a layer which is an 1399 instance of cls. 1400 """ 1401 return self.haslayer(cls) 1402 1403 def route(self): 1404 # type: () -> Tuple[Optional[str], Optional[str], Optional[str]] 1405 return self.payload.route() 1406 1407 def fragment(self, *args, **kargs): 1408 # type: (*Any, **Any) -> List[Packet] 1409 return self.payload.fragment(*args, **kargs) 1410 1411 def display(self, *args, **kargs): # Deprecated. Use show() 1412 # type: (*Any, **Any) -> None 1413 """Deprecated. Use show() method.""" 1414 self.show(*args, **kargs) 1415 1416 def _show_or_dump(self, 1417 dump=False, # type: bool 1418 indent=3, # type: int 1419 lvl="", # type: str 1420 label_lvl="", # type: str 1421 first_call=True # type: bool 1422 ): 1423 # type: (...) -> Optional[str] 1424 """ 1425 Internal method that shows or dumps a hierarchical view of a packet. 1426 Called by show. 1427 1428 :param dump: determine if it prints or returns the string value 1429 :param int indent: the size of indentation for each layer 1430 :param str lvl: additional information about the layer lvl 1431 :param str label_lvl: additional information about the layer fields 1432 :param first_call: determine if the current function is the first 1433 :return: return a hierarchical view if dump, else print it 1434 """ 1435 1436 if dump: 1437 from scapy.themes import ColorTheme, AnsiColorTheme 1438 ct: ColorTheme = AnsiColorTheme() # No color for dump output 1439 else: 1440 ct = conf.color_theme 1441 s = "%s%s %s %s\n" % (label_lvl, 1442 ct.punct("###["), 1443 ct.layer_name(self.name), 1444 ct.punct("]###")) 1445 fields = self.fields_desc.copy() 1446 while fields: 1447 f = fields.pop(0) 1448 if isinstance(f, ConditionalField) and not f._evalcond(self): 1449 continue 1450 if hasattr(f, "fields"): # Field has subfields 1451 s += "%s %s =\n" % ( 1452 label_lvl + lvl, 1453 ct.depreciate_field_name(f.name), 1454 ) 1455 lvl += " " * indent * self.show_indent 1456 for i, fld in enumerate(x for x in f.fields if hasattr(self, x.name)): 1457 fields.insert(i, fld) 1458 continue 1459 if isinstance(f, Emph) or f in conf.emph: 1460 ncol = ct.emph_field_name 1461 vcol = ct.emph_field_value 1462 else: 1463 ncol = ct.field_name 1464 vcol = ct.field_value 1465 pad = max(0, 10 - len(f.name)) * " " 1466 fvalue = self.getfieldval(f.name) 1467 if isinstance(fvalue, Packet) or (f.islist and f.holds_packets and isinstance(fvalue, list)): # noqa: E501 1468 s += "%s %s%s%s%s\n" % (label_lvl + lvl, 1469 ct.punct("\\"), 1470 ncol(f.name), 1471 pad, 1472 ct.punct("\\")) 1473 fvalue_gen = SetGen( 1474 fvalue, 1475 _iterpacket=0 1476 ) # type: SetGen[Packet] 1477 for fvalue in fvalue_gen: 1478 s += fvalue._show_or_dump(dump=dump, indent=indent, label_lvl=label_lvl + lvl + " |", first_call=False) # noqa: E501 1479 else: 1480 begn = "%s %s%s%s " % (label_lvl + lvl, 1481 ncol(f.name), 1482 pad, 1483 ct.punct("="),) 1484 reprval = f.i2repr(self, fvalue) 1485 if isinstance(reprval, str): 1486 reprval = reprval.replace("\n", "\n" + " " * (len(label_lvl) + # noqa: E501 1487 len(lvl) + 1488 len(f.name) + 1489 4)) 1490 s += "%s%s\n" % (begn, vcol(reprval)) 1491 if self.payload: 1492 s += self.payload._show_or_dump( # type: ignore 1493 dump=dump, 1494 indent=indent, 1495 lvl=lvl + (" " * indent * self.show_indent), 1496 label_lvl=label_lvl, 1497 first_call=False 1498 ) 1499 1500 if first_call and not dump: 1501 print(s) 1502 return None 1503 else: 1504 return s 1505 1506 def show(self, dump=False, indent=3, lvl="", label_lvl=""): 1507 # type: (bool, int, str, str) -> Optional[Any] 1508 """ 1509 Prints or returns (when "dump" is true) a hierarchical view of the 1510 packet. 1511 1512 :param dump: determine if it prints or returns the string value 1513 :param int indent: the size of indentation for each layer 1514 :param str lvl: additional information about the layer lvl 1515 :param str label_lvl: additional information about the layer fields 1516 :return: return a hierarchical view if dump, else print it 1517 """ 1518 return self._show_or_dump(dump, indent, lvl, label_lvl) 1519 1520 def show2(self, dump=False, indent=3, lvl="", label_lvl=""): 1521 # type: (bool, int, str, str) -> Optional[Any] 1522 """ 1523 Prints or returns (when "dump" is true) a hierarchical view of an 1524 assembled version of the packet, so that automatic fields are 1525 calculated (checksums, etc.) 1526 1527 :param dump: determine if it prints or returns the string value 1528 :param int indent: the size of indentation for each layer 1529 :param str lvl: additional information about the layer lvl 1530 :param str label_lvl: additional information about the layer fields 1531 :return: return a hierarchical view if dump, else print it 1532 """ 1533 return self.__class__(raw(self)).show(dump, indent, lvl, label_lvl) 1534 1535 def sprintf(self, fmt, relax=1): 1536 # type: (str, int) -> str 1537 """ 1538 sprintf(format, [relax=1]) -> str 1539 1540 Where format is a string that can include directives. A directive 1541 begins and ends by % and has the following format: 1542 ``%[fmt[r],][cls[:nb].]field%`` 1543 1544 :param fmt: is a classic printf directive, "r" can be appended for raw 1545 substitution: 1546 (ex: IP.flags=0x18 instead of SA), nb is the number of the layer 1547 (ex: for IP/IP packets, IP:2.src is the src of the upper IP layer). 1548 Special case : "%.time%" is the creation time. 1549 Ex:: 1550 1551 p.sprintf( 1552 "%.time% %-15s,IP.src% -> %-15s,IP.dst% %IP.chksum% " 1553 "%03xr,IP.proto% %r,TCP.flags%" 1554 ) 1555 1556 Moreover, the format string can include conditional statements. A 1557 conditional statement looks like : {layer:string} where layer is a 1558 layer name, and string is the string to insert in place of the 1559 condition if it is true, i.e. if layer is present. If layer is 1560 preceded by a "!", the result is inverted. Conditions can be 1561 imbricated. A valid statement can be:: 1562 1563 p.sprintf("This is a{TCP: TCP}{UDP: UDP}{ICMP:n ICMP} packet") 1564 p.sprintf("{IP:%IP.dst% {ICMP:%ICMP.type%}{TCP:%TCP.dport%}}") 1565 1566 A side effect is that, to obtain "{" and "}" characters, you must use 1567 "%(" and "%)". 1568 """ 1569 1570 escape = {"%": "%", 1571 "(": "{", 1572 ")": "}"} 1573 1574 # Evaluate conditions 1575 while "{" in fmt: 1576 i = fmt.rindex("{") 1577 j = fmt[i + 1:].index("}") 1578 cond = fmt[i + 1:i + j + 1] 1579 k = cond.find(":") 1580 if k < 0: 1581 raise Scapy_Exception("Bad condition in format string: [%s] (read sprintf doc!)" % cond) # noqa: E501 1582 cond, format_ = cond[:k], cond[k + 1:] 1583 res = False 1584 if cond[0] == "!": 1585 res = True 1586 cond = cond[1:] 1587 if self.haslayer(cond): 1588 res = not res 1589 if not res: 1590 format_ = "" 1591 fmt = fmt[:i] + format_ + fmt[i + j + 2:] 1592 1593 # Evaluate directives 1594 s = "" 1595 while "%" in fmt: 1596 i = fmt.index("%") 1597 s += fmt[:i] 1598 fmt = fmt[i + 1:] 1599 if fmt and fmt[0] in escape: 1600 s += escape[fmt[0]] 1601 fmt = fmt[1:] 1602 continue 1603 try: 1604 i = fmt.index("%") 1605 sfclsfld = fmt[:i] 1606 fclsfld = sfclsfld.split(",") 1607 if len(fclsfld) == 1: 1608 f = "s" 1609 clsfld = fclsfld[0] 1610 elif len(fclsfld) == 2: 1611 f, clsfld = fclsfld 1612 else: 1613 raise Scapy_Exception 1614 if "." in clsfld: 1615 cls, fld = clsfld.split(".") 1616 else: 1617 cls = self.__class__.__name__ 1618 fld = clsfld 1619 num = 1 1620 if ":" in cls: 1621 cls, snum = cls.split(":") 1622 num = int(snum) 1623 fmt = fmt[i + 1:] 1624 except Exception: 1625 raise Scapy_Exception("Bad format string [%%%s%s]" % (fmt[:25], fmt[25:] and "...")) # noqa: E501 1626 else: 1627 if fld == "time": 1628 val = time.strftime( 1629 "%H:%M:%S.%%06i", 1630 time.localtime(float(self.time)) 1631 ) % int((self.time - int(self.time)) * 1000000) 1632 elif cls == self.__class__.__name__ and hasattr(self, fld): 1633 if num > 1: 1634 val = self.payload.sprintf("%%%s,%s:%s.%s%%" % (f, cls, num - 1, fld), relax) # noqa: E501 1635 f = "s" 1636 else: 1637 try: 1638 val = self.getfieldval(fld) 1639 except AttributeError: 1640 val = getattr(self, fld) 1641 if f[-1] == "r": # Raw field value 1642 f = f[:-1] 1643 if not f: 1644 f = "s" 1645 else: 1646 if fld in self.fieldtype: 1647 val = self.fieldtype[fld].i2repr(self, val) 1648 else: 1649 val = self.payload.sprintf("%%%s%%" % sfclsfld, relax) 1650 f = "s" 1651 s += ("%" + f) % val 1652 1653 s += fmt 1654 return s 1655 1656 def mysummary(self): 1657 # type: () -> str 1658 """DEV: can be overloaded to return a string that summarizes the layer. 1659 Only one mysummary() is used in a whole packet summary: the one of the upper layer, # noqa: E501 1660 except if a mysummary() also returns (as a couple) a list of layers whose # noqa: E501 1661 mysummary() must be called if they are present.""" 1662 return "" 1663 1664 def _do_summary(self): 1665 # type: () -> Tuple[int, str, List[Any]] 1666 found, s, needed = self.payload._do_summary() 1667 ret = "" 1668 if not found or self.__class__ in needed: 1669 ret = self.mysummary() 1670 if isinstance(ret, tuple): 1671 ret, n = ret 1672 needed += n 1673 if ret or needed: 1674 found = 1 1675 if not ret: 1676 ret = self.__class__.__name__ if self.show_summary else "" 1677 if self.__class__ in conf.emph: 1678 impf = [] 1679 for f in self.fields_desc: 1680 if f in conf.emph: 1681 impf.append("%s=%s" % (f.name, f.i2repr(self, self.getfieldval(f.name)))) # noqa: E501 1682 ret = "%s [%s]" % (ret, " ".join(impf)) 1683 if ret and s: 1684 ret = "%s / %s" % (ret, s) 1685 else: 1686 ret = "%s%s" % (ret, s) 1687 return found, ret, needed 1688 1689 def summary(self, intern=0): 1690 # type: (int) -> str 1691 """Prints a one line summary of a packet.""" 1692 return self._do_summary()[1] 1693 1694 def lastlayer(self, layer=None): 1695 # type: (Optional[Packet]) -> Packet 1696 """Returns the uppest layer of the packet""" 1697 return self.payload.lastlayer(self) 1698 1699 def decode_payload_as(self, cls): 1700 # type: (Type[Packet]) -> None 1701 """Reassembles the payload and decode it using another packet class""" 1702 s = raw(self.payload) 1703 self.payload = cls(s, _internal=1, _underlayer=self) 1704 pp = self 1705 while pp.underlayer is not None: 1706 pp = pp.underlayer 1707 self.payload.dissection_done(pp) 1708 1709 def _command(self, json=False): 1710 # type: (bool) -> List[Tuple[str, Any]] 1711 """ 1712 Internal method used to generate command() and json() 1713 """ 1714 f = [] 1715 iterator: Iterator[Tuple[str, Any]] 1716 if json: 1717 iterator = ((x.name, self.getfieldval(x.name)) for x in self.fields_desc) 1718 else: 1719 iterator = iter(self.fields.items()) 1720 for fn, fv in iterator: 1721 fld = self.get_field(fn) 1722 if isinstance(fv, (list, dict, set)) and not fv and not fld.default: 1723 continue 1724 if isinstance(fv, Packet): 1725 if json: 1726 fv = {k: v for (k, v) in fv._command(json=True)} 1727 else: 1728 fv = fv.command() 1729 elif fld.islist and fld.holds_packets and isinstance(fv, list): 1730 if json: 1731 fv = [ 1732 {k: v for (k, v) in x} 1733 for x in map(lambda y: Packet._command(y, json=True), fv) 1734 ] 1735 else: 1736 fv = "[%s]" % ",".join(map(Packet.command, fv)) 1737 elif fld.islist and isinstance(fv, list): 1738 if json: 1739 fv = [ 1740 getattr(x, 'command', lambda: repr(x))() 1741 for x in fv 1742 ] 1743 else: 1744 fv = "[%s]" % ",".join( 1745 getattr(x, 'command', lambda: repr(x))() 1746 for x in fv 1747 ) 1748 elif isinstance(fv, FlagValue): 1749 fv = int(fv) 1750 elif callable(getattr(fv, 'command', None)): 1751 fv = fv.command(json=json) 1752 else: 1753 if json: 1754 if isinstance(fv, bytes): 1755 fv = fv.decode("utf-8", errors="backslashreplace") 1756 else: 1757 fv = fld.i2h(self, fv) 1758 else: 1759 fv = repr(fld.i2h(self, fv)) 1760 f.append((fn, fv)) 1761 return f 1762 1763 def command(self): 1764 # type: () -> str 1765 """ 1766 Returns a string representing the command you have to type to 1767 obtain the same packet 1768 """ 1769 c = "%s(%s)" % ( 1770 self.__class__.__name__, 1771 ", ".join("%s=%s" % x for x in self._command()) 1772 ) 1773 pc = self.payload.command() 1774 if pc: 1775 c += "/" + pc 1776 return c 1777 1778 def json(self): 1779 # type: () -> str 1780 """ 1781 Returns a JSON representing the packet. 1782 1783 Please note that this cannot be used for bijective usage: data loss WILL occur, 1784 so it will not make sense to try to rebuild the packet from the output. 1785 This must only be used for a grepping/displaying purpose. 1786 """ 1787 dump = json.dumps({k: v for (k, v) in self._command(json=True)}) 1788 pc = self.payload.json() 1789 if pc: 1790 dump = dump[:-1] + ", \"payload\": %s}" % pc 1791 return dump 1792 1793 1794class NoPayload(Packet): 1795 def __new__(cls, *args, **kargs): 1796 # type: (Type[Packet], *Any, **Any) -> NoPayload 1797 singl = cls.__dict__.get("__singl__") 1798 if singl is None: 1799 cls.__singl__ = singl = Packet.__new__(cls) 1800 Packet.__init__(singl) 1801 return cast(NoPayload, singl) 1802 1803 def __init__(self, *args, **kargs): 1804 # type: (*Any, **Any) -> None 1805 pass 1806 1807 def dissection_done(self, pkt): 1808 # type: (Packet) -> None 1809 pass 1810 1811 def add_payload(self, payload): 1812 # type: (Union[Packet, bytes]) -> NoReturn 1813 raise Scapy_Exception("Can't add payload to NoPayload instance") 1814 1815 def remove_payload(self): 1816 # type: () -> None 1817 pass 1818 1819 def add_underlayer(self, underlayer): 1820 # type: (Any) -> None 1821 pass 1822 1823 def remove_underlayer(self, other): 1824 # type: (Packet) -> None 1825 pass 1826 1827 def add_parent(self, parent): 1828 # type: (Any) -> None 1829 pass 1830 1831 def remove_parent(self, other): 1832 # type: (Packet) -> None 1833 pass 1834 1835 def copy(self): 1836 # type: () -> NoPayload 1837 return self 1838 1839 def clear_cache(self): 1840 # type: () -> None 1841 pass 1842 1843 def __repr__(self): 1844 # type: () -> str 1845 return "" 1846 1847 def __str__(self): 1848 # type: () -> str 1849 return "" 1850 1851 def __bytes__(self): 1852 # type: () -> bytes 1853 return b"" 1854 1855 def __nonzero__(self): 1856 # type: () -> bool 1857 return False 1858 __bool__ = __nonzero__ 1859 1860 def do_build(self): 1861 # type: () -> bytes 1862 return b"" 1863 1864 def build(self): 1865 # type: () -> bytes 1866 return b"" 1867 1868 def build_padding(self): 1869 # type: () -> bytes 1870 return b"" 1871 1872 def build_done(self, p): 1873 # type: (bytes) -> bytes 1874 return p 1875 1876 def build_ps(self, internal=0): 1877 # type: (int) -> Tuple[bytes, List[Any]] 1878 return b"", [] 1879 1880 def getfieldval(self, attr): 1881 # type: (str) -> NoReturn 1882 raise AttributeError(attr) 1883 1884 def getfield_and_val(self, attr): 1885 # type: (str) -> NoReturn 1886 raise AttributeError(attr) 1887 1888 def setfieldval(self, attr, val): 1889 # type: (str, Any) -> NoReturn 1890 raise AttributeError(attr) 1891 1892 def delfieldval(self, attr): 1893 # type: (str) -> NoReturn 1894 raise AttributeError(attr) 1895 1896 def hide_defaults(self): 1897 # type: () -> None 1898 pass 1899 1900 def __iter__(self): 1901 # type: () -> Iterator[Packet] 1902 return iter([]) 1903 1904 def __eq__(self, other): 1905 # type: (Any) -> bool 1906 if isinstance(other, NoPayload): 1907 return True 1908 return False 1909 1910 def hashret(self): 1911 # type: () -> bytes 1912 return b"" 1913 1914 def answers(self, other): 1915 # type: (Packet) -> bool 1916 return isinstance(other, (NoPayload, conf.padding_layer)) # noqa: E501 1917 1918 def haslayer(self, cls, _subclass=None): 1919 # type: (Union[Type[Packet], str], Optional[bool]) -> int 1920 return 0 1921 1922 def getlayer(self, 1923 cls, # type: Union[int, Type[Packet], str] 1924 nb=1, # type: int 1925 _track=None, # type: Optional[List[int]] 1926 _subclass=None, # type: Optional[bool] 1927 **flt # type: Any 1928 ): 1929 # type: (...) -> Optional[Packet] 1930 if _track is not None: 1931 _track.append(nb) 1932 return None 1933 1934 def fragment(self, *args, **kargs): 1935 # type: (*Any, **Any) -> List[Packet] 1936 raise Scapy_Exception("cannot fragment this packet") 1937 1938 def show(self, dump=False, indent=3, lvl="", label_lvl=""): 1939 # type: (bool, int, str, str) -> None 1940 pass 1941 1942 def sprintf(self, fmt, relax=1): 1943 # type: (str, int) -> str 1944 if relax: 1945 return "??" 1946 else: 1947 raise Scapy_Exception("Format not found [%s]" % fmt) 1948 1949 def _do_summary(self): 1950 # type: () -> Tuple[int, str, List[Any]] 1951 return 0, "", [] 1952 1953 def layers(self): 1954 # type: () -> List[Type[Packet]] 1955 return [] 1956 1957 def lastlayer(self, layer=None): 1958 # type: (Optional[Packet]) -> Packet 1959 return layer or self 1960 1961 def command(self): 1962 # type: () -> str 1963 return "" 1964 1965 def json(self): 1966 # type: () -> str 1967 return "" 1968 1969 def route(self): 1970 # type: () -> Tuple[None, None, None] 1971 return (None, None, None) 1972 1973 1974#################### 1975# packet classes # 1976#################### 1977 1978 1979class Raw(Packet): 1980 name = "Raw" 1981 fields_desc = [StrField("load", b"")] 1982 1983 def __init__(self, _pkt=b"", *args, **kwargs): 1984 # type: (bytes, *Any, **Any) -> None 1985 if _pkt and not isinstance(_pkt, bytes): 1986 if isinstance(_pkt, tuple): 1987 _pkt, bn = _pkt 1988 _pkt = bytes_encode(_pkt), bn 1989 else: 1990 _pkt = bytes_encode(_pkt) 1991 super(Raw, self).__init__(_pkt, *args, **kwargs) 1992 1993 def answers(self, other): 1994 # type: (Packet) -> int 1995 return 1 1996 1997 def mysummary(self): 1998 # type: () -> str 1999 cs = conf.raw_summary 2000 if cs: 2001 if callable(cs): 2002 return "Raw %s" % cs(self.load) 2003 else: 2004 return "Raw %r" % self.load 2005 return Packet.mysummary(self) 2006 2007 2008class Padding(Raw): 2009 name = "Padding" 2010 2011 def self_build(self, field_pos_list=None): 2012 # type: (Optional[Any]) -> bytes 2013 return b"" 2014 2015 def build_padding(self): 2016 # type: () -> bytes 2017 return ( 2018 bytes_encode(self.load) if self.raw_packet_cache is None 2019 else self.raw_packet_cache 2020 ) + self.payload.build_padding() 2021 2022 2023conf.raw_layer = Raw 2024conf.padding_layer = Padding 2025if conf.default_l2 is None: 2026 conf.default_l2 = Raw 2027 2028################# 2029# Bind layers # 2030################# 2031 2032 2033def bind_bottom_up(lower, # type: Type[Packet] 2034 upper, # type: Type[Packet] 2035 __fval=None, # type: Optional[Any] 2036 **fval # type: Any 2037 ): 2038 # type: (...) -> None 2039 r"""Bind 2 layers for dissection. 2040 The upper layer will be chosen for dissection on top of the lower layer, if 2041 ALL the passed arguments are validated. If multiple calls are made with 2042 the same layers, the last one will be used as default. 2043 2044 ex: 2045 >>> bind_bottom_up(Ether, SNAP, type=0x1234) 2046 >>> Ether(b'\xff\xff\xff\xff\xff\xff\xd0P\x99V\xdd\xf9\x124\x00\x00\x00\x00\x00') # noqa: E501 2047 <Ether dst=ff:ff:ff:ff:ff:ff src=d0:50:99:56:dd:f9 type=0x1234 |<SNAP OUI=0x0 code=0x0 |>> # noqa: E501 2048 """ 2049 if __fval is not None: 2050 fval.update(__fval) 2051 lower.payload_guess = lower.payload_guess[:] 2052 lower.payload_guess.append((fval, upper)) 2053 2054 2055def bind_top_down(lower, # type: Type[Packet] 2056 upper, # type: Type[Packet] 2057 __fval=None, # type: Optional[Any] 2058 **fval # type: Any 2059 ): 2060 # type: (...) -> None 2061 """Bind 2 layers for building. 2062 When the upper layer is added as a payload of the lower layer, all the 2063 arguments will be applied to them. 2064 2065 ex: 2066 >>> bind_top_down(Ether, SNAP, type=0x1234) 2067 >>> Ether()/SNAP() 2068 <Ether type=0x1234 |<SNAP |>> 2069 """ 2070 if __fval is not None: 2071 fval.update(__fval) 2072 upper._overload_fields = upper._overload_fields.copy() # type: ignore 2073 upper._overload_fields[lower] = fval 2074 2075 2076@conf.commands.register 2077def bind_layers(lower, # type: Type[Packet] 2078 upper, # type: Type[Packet] 2079 __fval=None, # type: Optional[Dict[str, int]] 2080 **fval # type: Any 2081 ): 2082 # type: (...) -> None 2083 """Bind 2 layers on some specific fields' values. 2084 2085 It makes the packet being built and dissected when the arguments 2086 are present. 2087 2088 This function calls both bind_bottom_up and bind_top_down, with 2089 all passed arguments. 2090 2091 Please have a look at their docs: 2092 - help(bind_bottom_up) 2093 - help(bind_top_down) 2094 """ 2095 if __fval is not None: 2096 fval.update(__fval) 2097 bind_top_down(lower, upper, **fval) 2098 bind_bottom_up(lower, upper, **fval) 2099 2100 2101def split_bottom_up(lower, # type: Type[Packet] 2102 upper, # type: Type[Packet] 2103 __fval=None, # type: Optional[Any] 2104 **fval # type: Any 2105 ): 2106 # type: (...) -> None 2107 """This call un-links an association that was made using bind_bottom_up. 2108 Have a look at help(bind_bottom_up) 2109 """ 2110 if __fval is not None: 2111 fval.update(__fval) 2112 2113 def do_filter(params, cls): 2114 # type: (Dict[str, int], Type[Packet]) -> bool 2115 params_is_invalid = any( 2116 k not in params or params[k] != v for k, v in fval.items() 2117 ) 2118 return cls != upper or params_is_invalid 2119 lower.payload_guess = [x for x in lower.payload_guess if do_filter(*x)] 2120 2121 2122def split_top_down(lower, # type: Type[Packet] 2123 upper, # type: Type[Packet] 2124 __fval=None, # type: Optional[Any] 2125 **fval # type: Any 2126 ): 2127 # type: (...) -> None 2128 """This call un-links an association that was made using bind_top_down. 2129 Have a look at help(bind_top_down) 2130 """ 2131 if __fval is not None: 2132 fval.update(__fval) 2133 if lower in upper._overload_fields: 2134 ofval = upper._overload_fields[lower] 2135 if any(k not in ofval or ofval[k] != v for k, v in fval.items()): 2136 return 2137 upper._overload_fields = upper._overload_fields.copy() # type: ignore 2138 del upper._overload_fields[lower] 2139 2140 2141@conf.commands.register 2142def split_layers(lower, # type: Type[Packet] 2143 upper, # type: Type[Packet] 2144 __fval=None, # type: Optional[Any] 2145 **fval # type: Any 2146 ): 2147 # type: (...) -> None 2148 """Split 2 layers previously bound. 2149 This call un-links calls bind_top_down and bind_bottom_up. It is the opposite of # noqa: E501 2150 bind_layers. 2151 2152 Please have a look at their docs: 2153 - help(split_bottom_up) 2154 - help(split_top_down) 2155 """ 2156 if __fval is not None: 2157 fval.update(__fval) 2158 split_bottom_up(lower, upper, **fval) 2159 split_top_down(lower, upper, **fval) 2160 2161 2162@conf.commands.register 2163def explore(layer=None): 2164 # type: (Optional[str]) -> None 2165 """Function used to discover the Scapy layers and protocols. 2166 It helps to see which packets exists in contrib or layer files. 2167 2168 params: 2169 - layer: If specified, the function will explore the layer. If not, 2170 the GUI mode will be activated, to browse the available layers 2171 2172 examples: 2173 >>> explore() # Launches the GUI 2174 >>> explore("dns") # Explore scapy.layers.dns 2175 >>> explore("http2") # Explore scapy.contrib.http2 2176 >>> explore(scapy.layers.bluetooth4LE) 2177 2178 Note: to search a packet by name, use ls("name") rather than explore. 2179 """ 2180 if layer is None: # GUI MODE 2181 if not conf.interactive: 2182 raise Scapy_Exception("explore() GUI-mode cannot be run in " 2183 "interactive mode. Please provide a " 2184 "'layer' parameter !") 2185 # 0 - Imports 2186 try: 2187 import prompt_toolkit 2188 except ImportError: 2189 raise ImportError("prompt_toolkit is not installed ! " 2190 "You may install IPython, which contains it, via" 2191 " `pip install ipython`") 2192 if not _version_checker(prompt_toolkit, (2, 0)): 2193 raise ImportError("prompt_toolkit >= 2.0.0 is required !") 2194 # Only available with prompt_toolkit > 2.0, not released on PyPi yet 2195 from prompt_toolkit.shortcuts.dialogs import radiolist_dialog, \ 2196 button_dialog 2197 from prompt_toolkit.formatted_text import HTML 2198 # Check for prompt_toolkit >= 3.0.0 2199 call_ptk = lambda x: cast(str, x) # type: Callable[[Any], str] 2200 if _version_checker(prompt_toolkit, (3, 0)): 2201 call_ptk = lambda x: x.run() 2202 # 1 - Ask for layer or contrib 2203 btn_diag = button_dialog( 2204 title="Scapy v%s" % conf.version, 2205 text=HTML( 2206 '<style bg="white" fg="red">Chose the type of packets' 2207 ' you want to explore:</style>' 2208 ), 2209 buttons=[ 2210 ("Layers", "layers"), 2211 ("Contribs", "contribs"), 2212 ("Cancel", "cancel") 2213 ]) 2214 action = call_ptk(btn_diag) 2215 # 2 - Retrieve list of Packets 2216 if action == "layers": 2217 # Get all loaded layers 2218 lvalues = conf.layers.layers() 2219 # Restrict to layers-only (not contribs) + packet.py and asn1*.py 2220 values = [x for x in lvalues if ("layers" in x[0] or 2221 "packet" in x[0] or 2222 "asn1" in x[0])] 2223 elif action == "contribs": 2224 # Get all existing contribs 2225 from scapy.main import list_contrib 2226 cvalues = cast(List[Dict[str, str]], list_contrib(ret=True)) 2227 values = [(x['name'], x['description']) 2228 for x in cvalues] 2229 # Remove very specific modules 2230 values = [x for x in values if "can" not in x[0]] 2231 else: 2232 # Escape/Cancel was pressed 2233 return 2234 # Build tree 2235 if action == "contribs": 2236 # A tree is a dictionary. Each layer contains a keyword 2237 # _l which contains the files in the layer, and a _name 2238 # argument which is its name. The other keys are the subfolders, 2239 # which are similar dictionaries 2240 tree = defaultdict(list) # type: Dict[str, Union[List[Any], Dict[str, Any]]] # noqa: E501 2241 for name, desc in values: 2242 if "." in name: # Folder detected 2243 parts = name.split(".") 2244 subtree = tree 2245 for pa in parts[:-1]: 2246 if pa not in subtree: 2247 subtree[pa] = {} 2248 # one layer deeper 2249 subtree = subtree[pa] # type: ignore 2250 subtree["_name"] = pa # type: ignore 2251 if "_l" not in subtree: 2252 subtree["_l"] = [] 2253 subtree["_l"].append((parts[-1], desc)) # type: ignore 2254 else: 2255 tree["_l"].append((name, desc)) # type: ignore 2256 elif action == "layers": 2257 tree = {"_l": values} 2258 # 3 - Ask for the layer/contrib module to explore 2259 current = tree # type: Any 2260 previous = [] # type: List[Dict[str, Union[List[Any], Dict[str, Any]]]] # noqa: E501 2261 while True: 2262 # Generate tests & form 2263 folders = list(current.keys()) 2264 _radio_values = [ 2265 ("$" + name, str('[+] ' + name.capitalize())) 2266 for name in folders if not name.startswith("_") 2267 ] + current.get("_l", []) # type: List[str] 2268 cur_path = "" 2269 if previous: 2270 cur_path = ".".join( 2271 itertools.chain( 2272 (x["_name"] for x in previous[1:]), # type: ignore 2273 (current["_name"],) 2274 ) 2275 ) 2276 extra_text = ( 2277 '\n<style bg="white" fg="green">> scapy.%s</style>' 2278 ) % (action + ("." + cur_path if cur_path else "")) 2279 # Show popup 2280 rd_diag = radiolist_dialog( 2281 values=_radio_values, 2282 title="Scapy v%s" % conf.version, 2283 text=HTML( 2284 ( 2285 '<style bg="white" fg="red">Please select a file' 2286 'among the following, to see all layers contained in' 2287 ' it:</style>' 2288 ) + extra_text 2289 ), 2290 cancel_text="Back" if previous else "Cancel" 2291 ) 2292 result = call_ptk(rd_diag) 2293 if result is None: 2294 # User pressed "Cancel/Back" 2295 if previous: # Back 2296 current = previous.pop() 2297 continue 2298 else: # Cancel 2299 return 2300 if result.startswith("$"): 2301 previous.append(current) 2302 current = current[result[1:]] 2303 else: 2304 # Enter on layer 2305 if previous: # In subfolder 2306 result = cur_path + "." + result 2307 break 2308 # 4 - (Contrib only): load contrib 2309 if action == "contribs": 2310 from scapy.main import load_contrib 2311 load_contrib(result) 2312 result = "scapy.contrib." + result 2313 else: # NON-GUI MODE 2314 # We handle layer as a short layer name, full layer name 2315 # or the module itself 2316 if isinstance(layer, types.ModuleType): 2317 layer = layer.__name__ 2318 if isinstance(layer, str): 2319 if layer.startswith("scapy.layers."): 2320 result = layer 2321 else: 2322 if layer.startswith("scapy.contrib."): 2323 layer = layer.replace("scapy.contrib.", "") 2324 from scapy.main import load_contrib 2325 load_contrib(layer) 2326 result_layer, result_contrib = (("scapy.layers.%s" % layer), 2327 ("scapy.contrib.%s" % layer)) 2328 if result_layer in conf.layers.ldict: 2329 result = result_layer 2330 elif result_contrib in conf.layers.ldict: 2331 result = result_contrib 2332 else: 2333 raise Scapy_Exception("Unknown scapy module '%s'" % layer) 2334 else: 2335 warning("Wrong usage ! Check out help(explore)") 2336 return 2337 2338 # COMMON PART 2339 # Get the list of all Packets contained in that module 2340 try: 2341 all_layers = conf.layers.ldict[result] 2342 except KeyError: 2343 raise Scapy_Exception("Unknown scapy module '%s'" % layer) 2344 # Print 2345 print(conf.color_theme.layer_name("Packets contained in %s:" % result)) 2346 rtlst = [] # type: List[Tuple[Union[str, List[str]], ...]] 2347 rtlst = [(lay.__name__ or "", cast(str, lay._name) or "") for lay in all_layers] 2348 print(pretty_list(rtlst, [("Class", "Name")], borders=True)) 2349 2350 2351def _pkt_ls(obj, # type: Union[Packet, Type[Packet]] 2352 verbose=False, # type: bool 2353 ): 2354 # type: (...) -> List[Tuple[str, Type[AnyField], str, str, List[str]]] # noqa: E501 2355 """Internal function used to resolve `fields_desc` to display it. 2356 2357 :param obj: a packet object or class 2358 :returns: a list containing tuples [(name, clsname, clsname_extras, 2359 default, long_attrs)] 2360 """ 2361 is_pkt = isinstance(obj, Packet) 2362 if not issubtype(obj, Packet) and not is_pkt: 2363 raise ValueError 2364 fields = [] 2365 for f in obj.fields_desc: 2366 cur_fld = f 2367 attrs = [] # type: List[str] 2368 long_attrs = [] # type: List[str] 2369 while isinstance(cur_fld, (Emph, ConditionalField)): 2370 if isinstance(cur_fld, ConditionalField): 2371 attrs.append(cur_fld.__class__.__name__[:4]) 2372 cur_fld = cur_fld.fld 2373 name = cur_fld.name 2374 default = cur_fld.default 2375 if verbose and isinstance(cur_fld, EnumField) \ 2376 and hasattr(cur_fld, "i2s") and cur_fld.i2s: 2377 if len(cur_fld.i2s or []) < 50: 2378 long_attrs.extend( 2379 "%s: %d" % (strval, numval) 2380 for numval, strval in 2381 sorted(cur_fld.i2s.items()) 2382 ) 2383 elif isinstance(cur_fld, MultiEnumField): 2384 if isinstance(obj, Packet): 2385 obj_pkt = obj 2386 else: 2387 obj_pkt = obj() 2388 fld_depend = cur_fld.depends_on(obj_pkt) 2389 attrs.append("Depends on %s" % fld_depend) 2390 if verbose: 2391 cur_i2s = cur_fld.i2s_multi.get( 2392 cur_fld.depends_on(obj_pkt), {} 2393 ) 2394 if len(cur_i2s) < 50: 2395 long_attrs.extend( 2396 "%s: %d" % (strval, numval) 2397 for numval, strval in 2398 sorted(cur_i2s.items()) 2399 ) 2400 elif verbose and isinstance(cur_fld, FlagsField): 2401 names = cur_fld.names 2402 long_attrs.append(", ".join(names)) 2403 elif isinstance(cur_fld, MultipleTypeField): 2404 default = cur_fld.dflt.default 2405 attrs.append(", ".join( 2406 x[0].__class__.__name__ for x in 2407 itertools.chain(cur_fld.flds, [(cur_fld.dflt,)]) 2408 )) 2409 2410 cls = cur_fld.__class__ 2411 class_name_extras = "(%s)" % ( 2412 ", ".join(attrs) 2413 ) if attrs else "" 2414 if isinstance(cur_fld, BitField): 2415 class_name_extras += " (%d bit%s)" % ( 2416 cur_fld.size, 2417 "s" if cur_fld.size > 1 else "" 2418 ) 2419 fields.append( 2420 (name, 2421 cls, 2422 class_name_extras, 2423 repr(default), 2424 long_attrs) 2425 ) 2426 return fields 2427 2428 2429@conf.commands.register 2430def ls(obj=None, # type: Optional[Union[str, Packet, Type[Packet]]] 2431 case_sensitive=False, # type: bool 2432 verbose=False # type: bool 2433 ): 2434 # type: (...) -> None 2435 """List available layers, or infos on a given layer class or name. 2436 2437 :param obj: Packet / packet name to use 2438 :param case_sensitive: if obj is a string, is it case sensitive? 2439 :param verbose: 2440 """ 2441 if obj is None or isinstance(obj, str): 2442 tip = False 2443 if obj is None: 2444 tip = True 2445 all_layers = sorted(conf.layers, key=lambda x: x.__name__) 2446 else: 2447 pattern = re.compile( 2448 obj, 2449 0 if case_sensitive else re.I 2450 ) 2451 # We first order by accuracy, then length 2452 if case_sensitive: 2453 sorter = lambda x: (x.__name__.index(obj), len(x.__name__)) 2454 else: 2455 obj = obj.lower() 2456 sorter = lambda x: (x.__name__.lower().index(obj), 2457 len(x.__name__)) 2458 all_layers = sorted((layer for layer in conf.layers 2459 if (isinstance(layer.__name__, str) and 2460 pattern.search(layer.__name__)) or 2461 (isinstance(layer.name, str) and 2462 pattern.search(layer.name))), 2463 key=sorter) 2464 for layer in all_layers: 2465 print("%-10s : %s" % (layer.__name__, layer._name)) 2466 if tip and conf.interactive: 2467 print("\nTIP: You may use explore() to navigate through all " 2468 "layers using a clear GUI") 2469 else: 2470 try: 2471 fields = _pkt_ls( 2472 obj, 2473 verbose=verbose 2474 ) 2475 is_pkt = isinstance(obj, Packet) 2476 # Print 2477 for fname, cls, clsne, dflt, long_attrs in fields: 2478 clsinfo = cls.__name__ + " " + clsne 2479 print("%-10s : %-35s =" % (fname, clsinfo), end=' ') 2480 if is_pkt: 2481 print("%-15r" % (getattr(obj, fname),), end=' ') 2482 print("(%r)" % (dflt,)) 2483 for attr in long_attrs: 2484 print("%-15s%s" % ("", attr)) 2485 # Restart for payload if any 2486 if is_pkt: 2487 obj = cast(Packet, obj) 2488 if isinstance(obj.payload, NoPayload): 2489 return 2490 print("--") 2491 ls(obj.payload) 2492 except ValueError: 2493 print("Not a packet class or name. Type 'ls()' to list packet classes.") # noqa: E501 2494 2495 2496@conf.commands.register 2497def rfc(cls, ret=False, legend=True): 2498 # type: (Type[Packet], bool, bool) -> Optional[str] 2499 """ 2500 Generate an RFC-like representation of a packet def. 2501 2502 :param cls: the Packet class 2503 :param ret: return the result instead of printing (def. False) 2504 :param legend: show text under the diagram (default True) 2505 2506 Ex:: 2507 2508 >>> rfc(Ether) 2509 2510 """ 2511 if not issubclass(cls, Packet): 2512 raise TypeError("Packet class expected") 2513 cur_len = 0 2514 cur_line = [] 2515 lines = [] 2516 # Get the size (width) that a field will take 2517 # when formatted, from its length in bits 2518 clsize = lambda x: 2 * x - 1 # type: Callable[[int], int] 2519 ident = 0 # Fields UUID 2520 2521 # Generate packet groups 2522 def _iterfields() -> Iterator[Tuple[str, int]]: 2523 for f in cls.fields_desc: 2524 # Fancy field name 2525 fname = f.name.upper().replace("_", " ") 2526 fsize = int(f.sz * 8) 2527 yield fname, fsize 2528 # Add padding optionally 2529 if isinstance(f, PadField): 2530 if isinstance(f._align, tuple): 2531 pad = - cur_len % (f._align[0] * 8) 2532 else: 2533 pad = - cur_len % (f._align * 8) 2534 if pad: 2535 yield "padding", pad 2536 for fname, flen in _iterfields(): 2537 cur_len += flen 2538 ident += 1 2539 # The field might exceed the current line or 2540 # take more than one line. Copy it as required 2541 while True: 2542 over = max(0, cur_len - 32) # Exceed 2543 len1 = clsize(flen - over) # What fits 2544 cur_line.append((fname[:len1], len1, ident)) 2545 if cur_len >= 32: 2546 # Current line is full. start a new line 2547 lines.append(cur_line) 2548 cur_len = flen = over 2549 fname = "" # do not repeat the field 2550 cur_line = [] 2551 if not over: 2552 # there is no data left 2553 break 2554 else: 2555 # End of the field 2556 break 2557 # Add the last line if un-finished 2558 if cur_line: 2559 lines.append(cur_line) 2560 # Calculate separations between lines 2561 seps = [] 2562 seps.append("+-" * 32 + "+\n") 2563 for i in range(len(lines) - 1): 2564 # Start with a full line 2565 sep = "+-" * 32 + "+\n" 2566 # Get the line above and below the current 2567 # separation 2568 above, below = lines[i], lines[i + 1] 2569 # The last field of above is shared with below 2570 if above[-1][2] == below[0][2]: 2571 # where the field in "above" starts 2572 pos_above = sum(x[1] for x in above[:-1]) + len(above[:-1]) - 1 2573 # where the field in "below" ends 2574 pos_below = below[0][1] 2575 if pos_above < pos_below: 2576 # they are overlapping. 2577 # Now crop the space between those pos 2578 # and fill it with " " 2579 pos_above = pos_above + pos_above % 2 2580 sep = ( 2581 sep[:1 + pos_above] + 2582 " " * (pos_below - pos_above) + 2583 sep[1 + pos_below:] 2584 ) 2585 # line is complete 2586 seps.append(sep) 2587 # Graph 2588 result = "" 2589 # Bytes markers 2590 result += " " + (" " * 19).join( 2591 str(x) for x in range(4) 2592 ) + "\n" 2593 # Bits markers 2594 result += " " + " ".join( 2595 str(x % 10) for x in range(32) 2596 ) + "\n" 2597 # Add fields and their separations 2598 for line, sep in zip(lines, seps): 2599 result += sep 2600 for elt, flen, _ in line: 2601 result += "|" + elt.center(flen, " ") 2602 result += "|\n" 2603 result += "+-" * (cur_len or 32) + "+\n" 2604 # Annotate with the figure name 2605 if legend: 2606 result += "\n" + ("Fig. " + cls.__name__).center(66, " ") 2607 # return if asked for, else print 2608 if ret: 2609 return result 2610 print(result) 2611 return None 2612 2613 2614############# 2615# Fuzzing # 2616############# 2617 2618_P = TypeVar('_P', bound=Packet) 2619 2620 2621@conf.commands.register 2622def fuzz(p, # type: _P 2623 _inplace=0, # type: int 2624 ): 2625 # type: (...) -> _P 2626 """ 2627 Transform a layer into a fuzzy layer by replacing some default values 2628 by random objects. 2629 2630 :param p: the Packet instance to fuzz 2631 :return: the fuzzed packet. 2632 """ 2633 if not _inplace: 2634 p = p.copy() 2635 q = cast(Packet, p) 2636 while not isinstance(q, NoPayload): 2637 new_default_fields = {} 2638 multiple_type_fields = [] # type: List[str] 2639 for f in q.fields_desc: 2640 if isinstance(f, PacketListField): 2641 for r in getattr(q, f.name): 2642 fuzz(r, _inplace=1) 2643 elif isinstance(f, MultipleTypeField): 2644 # the type of the field will depend on others 2645 multiple_type_fields.append(f.name) 2646 elif f.default is not None: 2647 if not isinstance(f, ConditionalField) or f._evalcond(q): 2648 rnd = f.randval() 2649 if rnd is not None: 2650 new_default_fields[f.name] = rnd 2651 # Process packets with MultipleTypeFields 2652 if multiple_type_fields: 2653 # freeze the other random values 2654 new_default_fields = { 2655 key: (val._fix() if isinstance(val, VolatileValue) else val) 2656 for key, val in new_default_fields.items() 2657 } 2658 q.default_fields.update(new_default_fields) 2659 new_default_fields.clear() 2660 # add the random values of the MultipleTypeFields 2661 for name in multiple_type_fields: 2662 fld = cast(MultipleTypeField, q.get_field(name)) 2663 rnd = fld._find_fld_pkt(q).randval() 2664 if rnd is not None: 2665 new_default_fields[name] = rnd 2666 q.default_fields.update(new_default_fields) 2667 q = q.payload 2668 return p 2669