• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5
6"""
7Packet class
8
9Provides:
10 - the default Packet classes
11 - binding mechanisms
12 - fuzz() method
13 - exploration methods: explore() / ls()
14"""
15
16from collections import defaultdict
17
18import json
19import re
20import time
21import itertools
22import copy
23import types
24import warnings
25
26from scapy.fields import (
27    AnyField,
28    BitField,
29    ConditionalField,
30    Emph,
31    EnumField,
32    Field,
33    FlagsField,
34    FlagValue,
35    MayEnd,
36    MultiEnumField,
37    MultipleTypeField,
38    PadField,
39    PacketListField,
40    RawVal,
41    StrField,
42)
43from scapy.config import conf, _version_checker
44from scapy.compat import raw, orb, bytes_encode
45from scapy.base_classes import BasePacket, Gen, SetGen, Packet_metaclass, \
46    _CanvasDumpExtended
47from scapy.interfaces import _GlobInterfaceType
48from scapy.volatile import RandField, VolatileValue
49from scapy.utils import import_hexcap, tex_escape, colgen, issubtype, \
50    pretty_list, EDecimal
51from scapy.error import Scapy_Exception, log_runtime, warning
52from scapy.libs.test_pyx import PYX
53
54# Typing imports
55from typing import (
56    Any,
57    Callable,
58    Dict,
59    Iterator,
60    List,
61    NoReturn,
62    Optional,
63    Set,
64    Tuple,
65    Type,
66    TypeVar,
67    Union,
68    Sequence,
69    cast,
70)
71from scapy.compat import Self
72
73try:
74    import pyx
75except ImportError:
76    pass
77
78
79_T = TypeVar("_T", Dict[str, Any], Optional[Dict[str, Any]])
80
81
82class Packet(
83    BasePacket,
84    _CanvasDumpExtended,
85    metaclass=Packet_metaclass
86):
87    __slots__ = [
88        "time", "sent_time", "name",
89        "default_fields", "fields", "fieldtype",
90        "overload_fields", "overloaded_fields",
91        "packetfields",
92        "original", "explicit", "raw_packet_cache",
93        "raw_packet_cache_fields", "_pkt", "post_transforms",
94        "stop_dissection_after",
95        # then payload, underlayer and parent
96        "payload", "underlayer", "parent",
97        "name",
98        # used for sr()
99        "_answered",
100        # used when sniffing
101        "direction", "sniffed_on",
102        # handle snaplen Vs real length
103        "wirelen",
104        "comment",
105        "process_information"
106    ]
107    name = None
108    fields_desc = []  # type: List[AnyField]
109    deprecated_fields = {}  # type: Dict[str, Tuple[str, str]]
110    overload_fields = {}  # type: Dict[Type[Packet], Dict[str, Any]]
111    payload_guess = []  # type: List[Tuple[Dict[str, Any], Type[Packet]]]
112    show_indent = 1
113    show_summary = True
114    match_subclass = False
115    class_dont_cache = {}  # type: Dict[Type[Packet], bool]
116    class_packetfields = {}  # type: Dict[Type[Packet], Any]
117    class_default_fields = {}  # type: Dict[Type[Packet], Dict[str, Any]]
118    class_default_fields_ref = {}  # type: Dict[Type[Packet], List[str]]
119    class_fieldtype = {}  # type: Dict[Type[Packet], Dict[str, AnyField]]  # noqa: E501
120
121    @classmethod
122    def from_hexcap(cls):
123        # type: (Type[Packet]) -> Packet
124        return cls(import_hexcap())
125
126    @classmethod
127    def upper_bonds(self):
128        # type: () -> None
129        for fval, upper in self.payload_guess:
130            print(
131                "%-20s  %s" % (
132                    upper.__name__,
133                    ", ".join("%-12s" % ("%s=%r" % i) for i in fval.items()),
134                )
135            )
136
137    @classmethod
138    def lower_bonds(self):
139        # type: () -> None
140        for lower, fval in self._overload_fields.items():
141            print(
142                "%-20s  %s" % (
143                    lower.__name__,
144                    ", ".join("%-12s" % ("%s=%r" % i) for i in fval.items()),
145                )
146            )
147
148    def __init__(self,
149                 _pkt=b"",  # type: Union[bytes, bytearray]
150                 post_transform=None,  # type: Any
151                 _internal=0,  # type: int
152                 _underlayer=None,  # type: Optional[Packet]
153                 _parent=None,  # type: Optional[Packet]
154                 stop_dissection_after=None,  # type: Optional[Type[Packet]]
155                 **fields  # type: Any
156                 ):
157        # type: (...) -> None
158        self.time = time.time()  # type: Union[EDecimal, float]
159        self.sent_time = None  # type: Union[EDecimal, float, None]
160        self.name = (self.__class__.__name__
161                     if self._name is None else
162                     self._name)
163        self.default_fields = {}  # type: Dict[str, Any]
164        self.overload_fields = self._overload_fields
165        self.overloaded_fields = {}  # type: Dict[str, Any]
166        self.fields = {}  # type: Dict[str, Any]
167        self.fieldtype = {}  # type: Dict[str, AnyField]
168        self.packetfields = []  # type: List[AnyField]
169        self.payload = NoPayload()  # type: Packet
170        self.init_fields(bool(_pkt))
171        self.underlayer = _underlayer
172        self.parent = _parent
173        if isinstance(_pkt, bytearray):
174            _pkt = bytes(_pkt)
175        self.original = _pkt
176        self.explicit = 0
177        self.raw_packet_cache = None  # type: Optional[bytes]
178        self.raw_packet_cache_fields = None  # type: Optional[Dict[str, Any]]  # noqa: E501
179        self.wirelen = None  # type: Optional[int]
180        self.direction = None  # type: Optional[int]
181        self.sniffed_on = None  # type: Optional[_GlobInterfaceType]
182        self.comment = None  # type: Optional[bytes]
183        self.process_information = None  # type: Optional[Dict[str, Any]]
184        self.stop_dissection_after = stop_dissection_after
185        if _pkt:
186            self.dissect(_pkt)
187            if not _internal:
188                self.dissection_done(self)
189        # We use this strange initialization so that the fields
190        # are initialized in their declaration order.
191        # It is required to always support MultipleTypeField
192        for field in self.fields_desc:
193            fname = field.name
194            try:
195                value = fields.pop(fname)
196            except KeyError:
197                continue
198            self.fields[fname] = value if isinstance(value, RawVal) else \
199                self.get_field(fname).any2i(self, value)
200        # The remaining fields are unknown
201        for fname in fields:
202            if fname in self.deprecated_fields:
203                # Resolve deprecated fields
204                value = fields[fname]
205                fname = self._resolve_alias(fname)
206                self.fields[fname] = value if isinstance(value, RawVal) else \
207                    self.get_field(fname).any2i(self, value)
208                continue
209            raise AttributeError(fname)
210        if isinstance(post_transform, list):
211            self.post_transforms = post_transform
212        elif post_transform is None:
213            self.post_transforms = []
214        else:
215            self.post_transforms = [post_transform]
216
217    _PickleType = Tuple[
218        Union[EDecimal, float],
219        Optional[Union[EDecimal, float, None]],
220        Optional[int],
221        Optional[_GlobInterfaceType],
222        Optional[int],
223        Optional[bytes],
224    ]
225
226    def __reduce__(self):
227        # type: () -> Tuple[Type[Packet], Tuple[bytes], Packet._PickleType]
228        """Used by pickling methods"""
229        return (self.__class__, (self.build(),), (
230            self.time,
231            self.sent_time,
232            self.direction,
233            self.sniffed_on,
234            self.wirelen,
235            self.comment
236        ))
237
238    def __setstate__(self, state):
239        # type: (Packet._PickleType) -> Packet
240        """Rebuild state using pickable methods"""
241        self.time = state[0]
242        self.sent_time = state[1]
243        self.direction = state[2]
244        self.sniffed_on = state[3]
245        self.wirelen = state[4]
246        self.comment = state[5]
247        return self
248
249    def __deepcopy__(self,
250                     memo,  # type: Any
251                     ):
252        # type: (...) -> Packet
253        """Used by copy.deepcopy"""
254        return self.copy()
255
256    def init_fields(self, for_dissect_only=False):
257        # type: (bool) -> None
258        """
259        Initialize each fields of the fields_desc dict
260        """
261
262        if self.class_dont_cache.get(self.__class__, False):
263            self.do_init_fields(self.fields_desc)
264        else:
265            self.do_init_cached_fields(for_dissect_only=for_dissect_only)
266
267    def do_init_fields(self,
268                       flist,  # type: Sequence[AnyField]
269                       ):
270        # type: (...) -> None
271        """
272        Initialize each fields of the fields_desc dict
273        """
274        default_fields = {}
275        for f in flist:
276            default_fields[f.name] = copy.deepcopy(f.default)
277            self.fieldtype[f.name] = f
278            if f.holds_packets:
279                self.packetfields.append(f)
280        # We set default_fields last to avoid race issues
281        self.default_fields = default_fields
282
283    def do_init_cached_fields(self, for_dissect_only=False):
284        # type: (bool) -> None
285        """
286        Initialize each fields of the fields_desc dict, or use the cached
287        fields information
288        """
289
290        cls_name = self.__class__
291
292        # Build the fields information
293        if Packet.class_default_fields.get(cls_name, None) is None:
294            self.prepare_cached_fields(self.fields_desc)
295
296        # Use fields information from cache
297        default_fields = Packet.class_default_fields.get(cls_name, None)
298        if default_fields:
299            self.default_fields = default_fields
300            self.fieldtype = Packet.class_fieldtype[cls_name]
301            self.packetfields = Packet.class_packetfields[cls_name]
302
303            # Optimization: no need for references when only dissecting.
304            if for_dissect_only:
305                return
306
307            # Deepcopy default references
308            for fname in Packet.class_default_fields_ref[cls_name]:
309                value = self.default_fields[fname]
310                try:
311                    self.fields[fname] = value.copy()
312                except AttributeError:
313                    # Python 2.7 - list only
314                    self.fields[fname] = value[:]
315
316    def prepare_cached_fields(self, flist):
317        # type: (Sequence[AnyField]) -> None
318        """
319        Prepare the cached fields of the fields_desc dict
320        """
321
322        cls_name = self.__class__
323
324        # Fields cache initialization
325        if not flist:
326            return
327
328        class_default_fields = dict()
329        class_default_fields_ref = list()
330        class_fieldtype = dict()
331        class_packetfields = list()
332
333        # Fields initialization
334        for f in flist:
335            if isinstance(f, MultipleTypeField):
336                # Abort
337                self.class_dont_cache[cls_name] = True
338                self.do_init_fields(self.fields_desc)
339                return
340
341            class_default_fields[f.name] = copy.deepcopy(f.default)
342            class_fieldtype[f.name] = f
343            if f.holds_packets:
344                class_packetfields.append(f)
345
346            # Remember references
347            if isinstance(f.default, (list, dict, set, RandField, Packet)):
348                class_default_fields_ref.append(f.name)
349
350        # Apply
351        Packet.class_default_fields_ref[cls_name] = class_default_fields_ref
352        Packet.class_fieldtype[cls_name] = class_fieldtype
353        Packet.class_packetfields[cls_name] = class_packetfields
354        # Last to avoid racing issues
355        Packet.class_default_fields[cls_name] = class_default_fields
356
357    def dissection_done(self, pkt):
358        # type: (Packet) -> None
359        """DEV: will be called after a dissection is completed"""
360        self.post_dissection(pkt)
361        self.payload.dissection_done(pkt)
362
363    def post_dissection(self, pkt):
364        # type: (Packet) -> None
365        """DEV: is called after the dissection of the whole packet"""
366        pass
367
368    def get_field(self, fld):
369        # type: (str) -> AnyField
370        """DEV: returns the field instance from the name of the field"""
371        return self.fieldtype[fld]
372
373    def add_payload(self, payload):
374        # type: (Union[Packet, bytes]) -> None
375        if payload is None:
376            return
377        elif not isinstance(self.payload, NoPayload):
378            self.payload.add_payload(payload)
379        else:
380            if isinstance(payload, Packet):
381                self.payload = payload
382                payload.add_underlayer(self)
383                for t in self.aliastypes:
384                    if t in payload.overload_fields:
385                        self.overloaded_fields = payload.overload_fields[t]
386                        break
387            elif isinstance(payload, (bytes, str, bytearray, memoryview)):
388                self.payload = conf.raw_layer(load=bytes_encode(payload))
389            else:
390                raise TypeError("payload must be 'Packet', 'bytes', 'str', 'bytearray', or 'memoryview', not [%s]" % repr(payload))  # noqa: E501
391
392    def remove_payload(self):
393        # type: () -> None
394        self.payload.remove_underlayer(self)
395        self.payload = NoPayload()
396        self.overloaded_fields = {}
397
398    def add_underlayer(self, underlayer):
399        # type: (Packet) -> None
400        self.underlayer = underlayer
401
402    def remove_underlayer(self, other):
403        # type: (Packet) -> None
404        self.underlayer = None
405
406    def add_parent(self, parent):
407        # type: (Packet) -> None
408        """Set packet parent.
409        When packet is an element in PacketListField, parent field would
410        point to the list owner packet."""
411        self.parent = parent
412
413    def remove_parent(self, other):
414        # type: (Packet) -> None
415        """Remove packet parent.
416        When packet is an element in PacketListField, parent field would
417        point to the list owner packet."""
418        self.parent = None
419
420    def copy(self) -> Self:
421        """Returns a deep copy of the instance."""
422        clone = self.__class__()
423        clone.fields = self.copy_fields_dict(self.fields)
424        clone.default_fields = self.copy_fields_dict(self.default_fields)
425        clone.overloaded_fields = self.overloaded_fields.copy()
426        clone.underlayer = self.underlayer
427        clone.parent = self.parent
428        clone.explicit = self.explicit
429        clone.raw_packet_cache = self.raw_packet_cache
430        clone.raw_packet_cache_fields = self.copy_fields_dict(
431            self.raw_packet_cache_fields
432        )
433        clone.wirelen = self.wirelen
434        clone.post_transforms = self.post_transforms[:]
435        clone.payload = self.payload.copy()
436        clone.payload.add_underlayer(clone)
437        clone.time = self.time
438        clone.comment = self.comment
439        clone.direction = self.direction
440        clone.sniffed_on = self.sniffed_on
441        return clone
442
443    def _resolve_alias(self, attr):
444        # type: (str) -> str
445        new_attr, version = self.deprecated_fields[attr]
446        warnings.warn(
447            "%s has been deprecated in favor of %s since %s !" % (
448                attr, new_attr, version
449            ), DeprecationWarning
450        )
451        return new_attr
452
453    def getfieldval(self, attr):
454        # type: (str) -> Any
455        if self.deprecated_fields and attr in self.deprecated_fields:
456            attr = self._resolve_alias(attr)
457        if attr in self.fields:
458            return self.fields[attr]
459        if attr in self.overloaded_fields:
460            return self.overloaded_fields[attr]
461        if attr in self.default_fields:
462            return self.default_fields[attr]
463        return self.payload.getfieldval(attr)
464
465    def getfield_and_val(self, attr):
466        # type: (str) -> Tuple[AnyField, Any]
467        if self.deprecated_fields and attr in self.deprecated_fields:
468            attr = self._resolve_alias(attr)
469        if attr in self.fields:
470            return self.get_field(attr), self.fields[attr]
471        if attr in self.overloaded_fields:
472            return self.get_field(attr), self.overloaded_fields[attr]
473        if attr in self.default_fields:
474            return self.get_field(attr), self.default_fields[attr]
475        raise ValueError
476
477    def __getattr__(self, attr):
478        # type: (str) -> Any
479        try:
480            fld, v = self.getfield_and_val(attr)
481        except ValueError:
482            return self.payload.__getattr__(attr)
483        if fld is not None:
484            return v if isinstance(v, RawVal) else fld.i2h(self, v)
485        return v
486
487    def setfieldval(self, attr, val):
488        # type: (str, Any) -> None
489        if self.deprecated_fields and attr in self.deprecated_fields:
490            attr = self._resolve_alias(attr)
491        if attr in self.default_fields:
492            fld = self.get_field(attr)
493            if fld is None:
494                any2i = lambda x, y: y  # type: Callable[..., Any]
495            else:
496                any2i = fld.any2i
497            self.fields[attr] = val if isinstance(val, RawVal) else \
498                any2i(self, val)
499            self.explicit = 0
500            self.raw_packet_cache = None
501            self.raw_packet_cache_fields = None
502            self.wirelen = None
503        elif attr == "payload":
504            self.remove_payload()
505            self.add_payload(val)
506        else:
507            self.payload.setfieldval(attr, val)
508
509    def __setattr__(self, attr, val):
510        # type: (str, Any) -> None
511        if attr in self.__all_slots__:
512            return object.__setattr__(self, attr, val)
513        try:
514            return self.setfieldval(attr, val)
515        except AttributeError:
516            pass
517        return object.__setattr__(self, attr, val)
518
519    def delfieldval(self, attr):
520        # type: (str) -> None
521        if attr in self.fields:
522            del self.fields[attr]
523            self.explicit = 0  # in case a default value must be explicit
524            self.raw_packet_cache = None
525            self.raw_packet_cache_fields = None
526            self.wirelen = None
527        elif attr in self.default_fields:
528            pass
529        elif attr == "payload":
530            self.remove_payload()
531        else:
532            self.payload.delfieldval(attr)
533
534    def __delattr__(self, attr):
535        # type: (str) -> None
536        if attr == "payload":
537            return self.remove_payload()
538        if attr in self.__all_slots__:
539            return object.__delattr__(self, attr)
540        try:
541            return self.delfieldval(attr)
542        except AttributeError:
543            pass
544        return object.__delattr__(self, attr)
545
546    def _superdir(self):
547        # type: () -> Set[str]
548        """
549        Return a list of slots and methods, including those from subclasses.
550        """
551        attrs = set()  # type: Set[str]
552        cls = self.__class__
553        if hasattr(cls, '__all_slots__'):
554            attrs.update(cls.__all_slots__)
555        for bcls in cls.__mro__:
556            if hasattr(bcls, '__dict__'):
557                attrs.update(bcls.__dict__)
558        return attrs
559
560    def __dir__(self):
561        # type: () -> List[str]
562        """
563        Add fields to tab completion list.
564        """
565        return sorted(itertools.chain(self._superdir(), self.default_fields))
566
567    def __repr__(self):
568        # type: () -> str
569        s = ""
570        ct = conf.color_theme
571        for f in self.fields_desc:
572            if isinstance(f, ConditionalField) and not f._evalcond(self):
573                continue
574            if f.name in self.fields:
575                fval = self.fields[f.name]
576                if isinstance(fval, (list, dict, set)) and len(fval) == 0:
577                    continue
578                val = f.i2repr(self, fval)
579            elif f.name in self.overloaded_fields:
580                fover = self.overloaded_fields[f.name]
581                if isinstance(fover, (list, dict, set)) and len(fover) == 0:
582                    continue
583                val = f.i2repr(self, fover)
584            else:
585                continue
586            if isinstance(f, Emph) or f in conf.emph:
587                ncol = ct.emph_field_name
588                vcol = ct.emph_field_value
589            else:
590                ncol = ct.field_name
591                vcol = ct.field_value
592
593            s += " %s%s%s" % (ncol(f.name),
594                              ct.punct("="),
595                              vcol(val))
596        return "%s%s %s %s%s%s" % (ct.punct("<"),
597                                   ct.layer_name(self.__class__.__name__),
598                                   s,
599                                   ct.punct("|"),
600                                   repr(self.payload),
601                                   ct.punct(">"))
602
603    def __str__(self):
604        # type: () -> str
605        return self.summary()
606
607    def __bytes__(self):
608        # type: () -> bytes
609        return self.build()
610
611    def __div__(self, other):
612        # type: (Any) -> Self
613        if isinstance(other, Packet):
614            cloneA = self.copy()
615            cloneB = other.copy()
616            cloneA.add_payload(cloneB)
617            return cloneA
618        elif isinstance(other, (bytes, str, bytearray, memoryview)):
619            return self / conf.raw_layer(load=bytes_encode(other))
620        else:
621            return other.__rdiv__(self)  # type: ignore
622    __truediv__ = __div__
623
624    def __rdiv__(self, other):
625        # type: (Any) -> Packet
626        if isinstance(other, (bytes, str, bytearray, memoryview)):
627            return conf.raw_layer(load=bytes_encode(other)) / self
628        else:
629            raise TypeError
630    __rtruediv__ = __rdiv__
631
632    def __mul__(self, other):
633        # type: (Any) -> List[Packet]
634        if isinstance(other, int):
635            return [self] * other
636        else:
637            raise TypeError
638
639    def __rmul__(self, other):
640        # type: (Any) -> List[Packet]
641        return self.__mul__(other)
642
643    def __nonzero__(self):
644        # type: () -> bool
645        return True
646    __bool__ = __nonzero__
647
648    def __len__(self):
649        # type: () -> int
650        return len(self.__bytes__())
651
652    def copy_field_value(self, fieldname, value):
653        # type: (str, Any) -> Any
654        return self.get_field(fieldname).do_copy(value)
655
656    def copy_fields_dict(self, fields):
657        # type: (_T) -> _T
658        if fields is None:
659            return None
660        return {fname: self.copy_field_value(fname, fval)
661                for fname, fval in fields.items()}
662
663    def _raw_packet_cache_field_value(self, fld, val, copy=False):
664        # type: (AnyField, Any, bool) -> Optional[Any]
665        """Get a value representative of a mutable field to detect changes"""
666        _cpy = lambda x: fld.do_copy(x) if copy else x  # type: Callable[[Any], Any]
667        if fld.holds_packets:
668            # avoid copying whole packets (perf: #GH3894)
669            if fld.islist:
670                return [
671                    (_cpy(x.fields), x.payload.raw_packet_cache) for x in val
672                ]
673            else:
674                return (_cpy(val.fields), val.payload.raw_packet_cache)
675        elif fld.islist or fld.ismutable:
676            return _cpy(val)
677        return None
678
679    def clear_cache(self):
680        # type: () -> None
681        """Clear the raw packet cache for the field and all its subfields"""
682        self.raw_packet_cache = None
683        for fname, fval in self.fields.items():
684            fld = self.get_field(fname)
685            if fld.holds_packets:
686                if isinstance(fval, Packet):
687                    fval.clear_cache()
688                elif isinstance(fval, list):
689                    for fsubval in fval:
690                        fsubval.clear_cache()
691        self.payload.clear_cache()
692
693    def self_build(self):
694        # type: () -> bytes
695        """
696        Create the default layer regarding fields_desc dict
697
698        :param field_pos_list:
699        """
700        if self.raw_packet_cache is not None and \
701                self.raw_packet_cache_fields is not None:
702            for fname, fval in self.raw_packet_cache_fields.items():
703                fld, val = self.getfield_and_val(fname)
704                if self._raw_packet_cache_field_value(fld, val) != fval:
705                    self.raw_packet_cache = None
706                    self.raw_packet_cache_fields = None
707                    self.wirelen = None
708                    break
709            if self.raw_packet_cache is not None:
710                return self.raw_packet_cache
711        p = b""
712        for f in self.fields_desc:
713            val = self.getfieldval(f.name)
714            if isinstance(val, RawVal):
715                p += bytes(val)
716            else:
717                try:
718                    p = f.addfield(self, p, val)
719                except Exception as ex:
720                    try:
721                        ex.args = (
722                            "While dissecting field '%s': " % f.name +
723                            ex.args[0],
724                        ) + ex.args[1:]
725                    except (AttributeError, IndexError):
726                        pass
727                    raise ex
728        return p
729
730    def do_build_payload(self):
731        # type: () -> bytes
732        """
733        Create the default version of the payload layer
734
735        :return: a string of payload layer
736        """
737        return self.payload.do_build()
738
739    def do_build(self):
740        # type: () -> bytes
741        """
742        Create the default version of the layer
743
744        :return: a string of the packet with the payload
745        """
746        if not self.explicit:
747            self = next(iter(self))
748        pkt = self.self_build()
749        for t in self.post_transforms:
750            pkt = t(pkt)
751        pay = self.do_build_payload()
752        if self.raw_packet_cache is None:
753            return self.post_build(pkt, pay)
754        else:
755            return pkt + pay
756
757    def build_padding(self):
758        # type: () -> bytes
759        return self.payload.build_padding()
760
761    def build(self):
762        # type: () -> bytes
763        """
764        Create the current layer
765
766        :return: string of the packet with the payload
767        """
768        p = self.do_build()
769        p += self.build_padding()
770        p = self.build_done(p)
771        return p
772
773    def post_build(self, pkt, pay):
774        # type: (bytes, bytes) -> bytes
775        """
776        DEV: called right after the current layer is build.
777
778        :param str pkt: the current packet (build by self_build function)
779        :param str pay: the packet payload (build by do_build_payload function)
780        :return: a string of the packet with the payload
781        """
782        return pkt + pay
783
784    def build_done(self, p):
785        # type: (bytes) -> bytes
786        return self.payload.build_done(p)
787
788    def do_build_ps(self):
789        # type: () -> Tuple[bytes, List[Tuple[Packet, List[Tuple[Field[Any, Any], str, bytes]]]]]  # noqa: E501
790        p = b""
791        pl = []
792        q = b""
793        for f in self.fields_desc:
794            if isinstance(f, ConditionalField) and not f._evalcond(self):
795                continue
796            p = f.addfield(self, p, self.getfieldval(f.name))
797            if isinstance(p, bytes):
798                r = p[len(q):]
799                q = p
800            else:
801                r = b""
802            pl.append((f, f.i2repr(self, self.getfieldval(f.name)), r))
803
804        pkt, lst = self.payload.build_ps(internal=1)
805        p += pkt
806        lst.append((self, pl))
807
808        return p, lst
809
810    def build_ps(self, internal=0):
811        # type: (int) -> Tuple[bytes, List[Tuple[Packet, List[Tuple[Any, Any, bytes]]]]]  # noqa: E501
812        p, lst = self.do_build_ps()
813#        if not internal:
814#            pkt = self
815#            while pkt.haslayer(conf.padding_layer):
816#                pkt = pkt.getlayer(conf.padding_layer)
817#                lst.append( (pkt, [ ("loakjkjd", pkt.load, pkt.load) ] ) )
818#                p += pkt.load
819#                pkt = pkt.payload
820        return p, lst
821
822    def canvas_dump(self, layer_shift=0, rebuild=1):
823        # type: (int, int) -> pyx.canvas.canvas
824        if PYX == 0:
825            raise ImportError("PyX and its dependencies must be installed")
826        canvas = pyx.canvas.canvas()
827        if rebuild:
828            _, t = self.__class__(raw(self)).build_ps()
829        else:
830            _, t = self.build_ps()
831        YTXTI = len(t)
832        for _, l in t:
833            YTXTI += len(l)
834        YTXT = float(YTXTI)
835        YDUMP = YTXT
836
837        XSTART = 1
838        XDSTART = 10
839        y = 0.0
840        yd = 0.0
841        XMUL = 0.55
842        YMUL = 0.4
843
844        backcolor = colgen(0.6, 0.8, 1.0, trans=pyx.color.rgb)
845        forecolor = colgen(0.2, 0.5, 0.8, trans=pyx.color.rgb)
846#        backcolor=makecol(0.376, 0.729, 0.525, 1.0)
847
848        def hexstr(x):
849            # type: (bytes) -> str
850            return " ".join("%02x" % orb(c) for c in x)
851
852        def make_dump_txt(x, y, txt):
853            # type: (int, float, bytes) -> pyx.text.text
854            return pyx.text.text(
855                XDSTART + x * XMUL,
856                (YDUMP - y) * YMUL,
857                r"\tt{%s}" % hexstr(txt),
858                [pyx.text.size.Large]
859            )
860
861        def make_box(o):
862            # type: (pyx.bbox.bbox) -> pyx.bbox.bbox
863            return pyx.box.rect(
864                o.left(), o.bottom(), o.width(), o.height(),
865                relcenter=(0.5, 0.5)
866            )
867
868        def make_frame(lst):
869            # type: (List[Any]) -> pyx.path.path
870            if len(lst) == 1:
871                b = lst[0].bbox()
872                b.enlarge(pyx.unit.u_pt)
873                return b.path()
874            else:
875                fb = lst[0].bbox()
876                fb.enlarge(pyx.unit.u_pt)
877                lb = lst[-1].bbox()
878                lb.enlarge(pyx.unit.u_pt)
879                if len(lst) == 2 and fb.left() > lb.right():
880                    return pyx.path.path(pyx.path.moveto(fb.right(), fb.top()),
881                                         pyx.path.lineto(fb.left(), fb.top()),
882                                         pyx.path.lineto(fb.left(), fb.bottom()),  # noqa: E501
883                                         pyx.path.lineto(fb.right(), fb.bottom()),  # noqa: E501
884                                         pyx.path.moveto(lb.left(), lb.top()),
885                                         pyx.path.lineto(lb.right(), lb.top()),
886                                         pyx.path.lineto(lb.right(), lb.bottom()),  # noqa: E501
887                                         pyx.path.lineto(lb.left(), lb.bottom()))  # noqa: E501
888                else:
889                    # XXX
890                    gb = lst[1].bbox()
891                    if gb != lb:
892                        gb.enlarge(pyx.unit.u_pt)
893                    kb = lst[-2].bbox()
894                    if kb != gb and kb != lb:
895                        kb.enlarge(pyx.unit.u_pt)
896                    return pyx.path.path(pyx.path.moveto(fb.left(), fb.top()),
897                                         pyx.path.lineto(fb.right(), fb.top()),
898                                         pyx.path.lineto(fb.right(), kb.bottom()),  # noqa: E501
899                                         pyx.path.lineto(lb.right(), kb.bottom()),  # noqa: E501
900                                         pyx.path.lineto(lb.right(), lb.bottom()),  # noqa: E501
901                                         pyx.path.lineto(lb.left(), lb.bottom()),  # noqa: E501
902                                         pyx.path.lineto(lb.left(), gb.top()),
903                                         pyx.path.lineto(fb.left(), gb.top()),
904                                         pyx.path.closepath(),)
905
906        def make_dump(s,   # type: bytes
907                      shift=0,  # type: int
908                      y=0.,  # type: float
909                      col=None,  # type: pyx.color.color
910                      bkcol=None,  # type: pyx.color.color
911                      large=16  # type: int
912                      ):
913            # type: (...) -> Tuple[pyx.canvas.canvas, pyx.bbox.bbox, int, float]  # noqa: E501
914            c = pyx.canvas.canvas()
915            tlist = []
916            while s:
917                dmp, s = s[:large - shift], s[large - shift:]
918                txt = make_dump_txt(shift, y, dmp)
919                tlist.append(txt)
920                shift += len(dmp)
921                if shift >= 16:
922                    shift = 0
923                    y += 1
924            if col is None:
925                col = pyx.color.rgb.red
926            if bkcol is None:
927                bkcol = pyx.color.rgb.white
928            c.stroke(make_frame(tlist), [col, pyx.deco.filled([bkcol]), pyx.style.linewidth.Thick])  # noqa: E501
929            for txt in tlist:
930                c.insert(txt)
931            return c, tlist[-1].bbox(), shift, y
932
933        last_shift, last_y = 0, 0.0
934        while t:
935            bkcol = next(backcolor)
936            proto, fields = t.pop()
937            y += 0.5
938            pt = pyx.text.text(
939                XSTART,
940                (YTXT - y) * YMUL,
941                r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(
942                    str(proto.name)
943                ),
944                [pyx.text.size.Large]
945            )
946            y += 1
947            ptbb = pt.bbox()
948            ptbb.enlarge(pyx.unit.u_pt * 2)
949            canvas.stroke(ptbb.path(), [pyx.color.rgb.black, pyx.deco.filled([bkcol])])  # noqa: E501
950            canvas.insert(pt)
951            for field, fval, fdump in fields:
952                col = next(forecolor)
953                ft = pyx.text.text(XSTART, (YTXT - y) * YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(field.name))  # noqa: E501
954                if isinstance(field, BitField):
955                    fsize = '%sb' % field.size
956                else:
957                    fsize = '%sB' % len(fdump)
958                if (hasattr(field, 'field') and
959                        'LE' in field.field.__class__.__name__[:3] or
960                        'LE' in field.__class__.__name__[:3]):
961                    fsize = r'$\scriptstyle\langle$' + fsize
962                st = pyx.text.text(XSTART + 3.4, (YTXT - y) * YMUL, r"\font\cmbxfont=cmssbx10 scaled 600\cmbxfont{%s}" % fsize, [pyx.text.halign.boxright])  # noqa: E501
963                if isinstance(fval, str):
964                    if len(fval) > 18:
965                        fval = fval[:18] + "[...]"
966                else:
967                    fval = ""
968                vt = pyx.text.text(XSTART + 3.5, (YTXT - y) * YMUL, r"\font\cmssfont=cmss10\cmssfont{%s}" % tex_escape(fval))  # noqa: E501
969                y += 1.0
970                if fdump:
971                    dt, target, last_shift, last_y = make_dump(fdump, last_shift, last_y, col, bkcol)  # noqa: E501
972
973                    dtb = target
974                    vtb = vt.bbox()
975                    bxvt = make_box(vtb)
976                    bxdt = make_box(dtb)
977                    dtb.enlarge(pyx.unit.u_pt)
978                    try:
979                        if yd < 0:
980                            cnx = pyx.connector.curve(bxvt, bxdt, absangle1=0, absangle2=-90)  # noqa: E501
981                        else:
982                            cnx = pyx.connector.curve(bxvt, bxdt, absangle1=0, absangle2=90)  # noqa: E501
983                    except Exception:
984                        pass
985                    else:
986                        canvas.stroke(cnx, [pyx.style.linewidth.thin, pyx.deco.earrow.small, col])  # noqa: E501
987
988                    canvas.insert(dt)
989
990                canvas.insert(ft)
991                canvas.insert(st)
992                canvas.insert(vt)
993            last_y += layer_shift
994
995        return canvas
996
997    def extract_padding(self, s):
998        # type: (bytes) -> Tuple[bytes, Optional[bytes]]
999        """
1000        DEV: to be overloaded to extract current layer's padding.
1001
1002        :param str s: the current layer
1003        :return: a couple of strings (actual layer, padding)
1004        """
1005        return s, None
1006
1007    def post_dissect(self, s):
1008        # type: (bytes) -> bytes
1009        """DEV: is called right after the current layer has been dissected"""
1010        return s
1011
1012    def pre_dissect(self, s):
1013        # type: (bytes) -> bytes
1014        """DEV: is called right before the current layer is dissected"""
1015        return s
1016
1017    def do_dissect(self, s):
1018        # type: (bytes) -> bytes
1019        _raw = s
1020        self.raw_packet_cache_fields = {}
1021        for f in self.fields_desc:
1022            s, fval = f.getfield(self, s)
1023            # Skip unused ConditionalField
1024            if isinstance(f, ConditionalField) and fval is None:
1025                continue
1026            # We need to track fields with mutable values to discard
1027            # .raw_packet_cache when needed.
1028            if (f.islist or f.holds_packets or f.ismutable) and fval is not None:
1029                self.raw_packet_cache_fields[f.name] = \
1030                    self._raw_packet_cache_field_value(f, fval, copy=True)
1031            self.fields[f.name] = fval
1032            # Nothing left to dissect
1033            if not s and (isinstance(f, MayEnd) or
1034                          (fval is not None and isinstance(f, ConditionalField) and
1035                           isinstance(f.fld, MayEnd))):
1036                break
1037        self.raw_packet_cache = _raw[:-len(s)] if s else _raw
1038        self.explicit = 1
1039        return s
1040
1041    def do_dissect_payload(self, s):
1042        # type: (bytes) -> None
1043        """
1044        Perform the dissection of the layer's payload
1045
1046        :param str s: the raw layer
1047        """
1048        if s:
1049            if (
1050                self.stop_dissection_after and
1051                isinstance(self, self.stop_dissection_after)
1052            ):
1053                # stop dissection here
1054                p = conf.raw_layer(s, _internal=1, _underlayer=self)
1055                self.add_payload(p)
1056                return
1057            cls = self.guess_payload_class(s)
1058            try:
1059                p = cls(
1060                    s,
1061                    stop_dissection_after=self.stop_dissection_after,
1062                    _internal=1,
1063                    _underlayer=self,
1064                )
1065            except KeyboardInterrupt:
1066                raise
1067            except Exception:
1068                if conf.debug_dissector:
1069                    if issubtype(cls, Packet):
1070                        log_runtime.error("%s dissector failed", cls.__name__)
1071                    else:
1072                        log_runtime.error("%s.guess_payload_class() returned "
1073                                          "[%s]",
1074                                          self.__class__.__name__, repr(cls))
1075                    if cls is not None:
1076                        raise
1077                p = conf.raw_layer(s, _internal=1, _underlayer=self)
1078            self.add_payload(p)
1079
1080    def dissect(self, s):
1081        # type: (bytes) -> None
1082        s = self.pre_dissect(s)
1083
1084        s = self.do_dissect(s)
1085
1086        s = self.post_dissect(s)
1087
1088        payl, pad = self.extract_padding(s)
1089        self.do_dissect_payload(payl)
1090        if pad and conf.padding:
1091            self.add_payload(conf.padding_layer(pad))
1092
1093    def guess_payload_class(self, payload):
1094        # type: (bytes) -> Type[Packet]
1095        """
1096        DEV: Guesses the next payload class from layer bonds.
1097        Can be overloaded to use a different mechanism.
1098
1099        :param str payload: the layer's payload
1100        :return: the payload class
1101        """
1102        for t in self.aliastypes:
1103            for fval, cls in t.payload_guess:
1104                try:
1105                    if all(v == self.getfieldval(k)
1106                           for k, v in fval.items()):
1107                        return cls  # type: ignore
1108                except AttributeError:
1109                    pass
1110        return self.default_payload_class(payload)
1111
1112    def default_payload_class(self, payload):
1113        # type: (bytes) -> Type[Packet]
1114        """
1115        DEV: Returns the default payload class if nothing has been found by the
1116        guess_payload_class() method.
1117
1118        :param str payload: the layer's payload
1119        :return: the default payload class define inside the configuration file
1120        """
1121        return conf.raw_layer
1122
1123    def hide_defaults(self):
1124        # type: () -> None
1125        """Removes fields' values that are the same as default values."""
1126        # use list(): self.fields is modified in the loop
1127        for k, v in list(self.fields.items()):
1128            v = self.fields[k]
1129            if k in self.default_fields:
1130                if self.default_fields[k] == v:
1131                    del self.fields[k]
1132        self.payload.hide_defaults()
1133
1134    def clone_with(self, payload=None, **kargs):
1135        # type: (Optional[Any], **Any) -> Any
1136        pkt = self.__class__()
1137        pkt.explicit = 1
1138        pkt.fields = kargs
1139        pkt.default_fields = self.copy_fields_dict(self.default_fields)
1140        pkt.overloaded_fields = self.overloaded_fields.copy()
1141        pkt.time = self.time
1142        pkt.underlayer = self.underlayer
1143        pkt.parent = self.parent
1144        pkt.post_transforms = self.post_transforms
1145        pkt.raw_packet_cache = self.raw_packet_cache
1146        pkt.raw_packet_cache_fields = self.copy_fields_dict(
1147            self.raw_packet_cache_fields
1148        )
1149        pkt.wirelen = self.wirelen
1150        pkt.comment = self.comment
1151        pkt.sniffed_on = self.sniffed_on
1152        pkt.direction = self.direction
1153        if payload is not None:
1154            pkt.add_payload(payload)
1155        return pkt
1156
1157    def __iter__(self):
1158        # type: () -> Iterator[Packet]
1159        """Iterates through all sub-packets generated by this Packet."""
1160        def loop(todo, done, self=self):
1161            # type: (List[str], Dict[str, Any], Any) -> Iterator[Packet]
1162            if todo:
1163                eltname = todo.pop()
1164                elt = self.getfieldval(eltname)
1165                if not isinstance(elt, Gen):
1166                    if self.get_field(eltname).islist:
1167                        elt = SetGen([elt])
1168                    else:
1169                        elt = SetGen(elt)
1170                for e in elt:
1171                    done[eltname] = e
1172                    for x in loop(todo[:], done):
1173                        yield x
1174            else:
1175                if isinstance(self.payload, NoPayload):
1176                    payloads = SetGen([None])  # type: SetGen[Packet]
1177                else:
1178                    payloads = self.payload
1179                for payl in payloads:
1180                    # Let's make sure subpackets are consistent
1181                    done2 = done.copy()
1182                    for k in done2:
1183                        if isinstance(done2[k], VolatileValue):
1184                            done2[k] = done2[k]._fix()
1185                    pkt = self.clone_with(payload=payl, **done2)
1186                    yield pkt
1187
1188        if self.explicit or self.raw_packet_cache is not None:
1189            todo = []
1190            done = self.fields
1191        else:
1192            todo = [k for (k, v) in itertools.chain(self.default_fields.items(),
1193                                                    self.overloaded_fields.items())
1194                    if isinstance(v, VolatileValue)] + list(self.fields)
1195            done = {}
1196        return loop(todo, done)
1197
1198    def iterpayloads(self):
1199        # type: () -> Iterator[Packet]
1200        """Used to iter through the payloads of a Packet.
1201        Useful for DNS or 802.11 for instance.
1202        """
1203        yield self
1204        current = self
1205        while current.payload:
1206            current = current.payload
1207            yield current
1208
1209    def __gt__(self, other):
1210        # type: (Packet) -> int
1211        """True if other is an answer from self (self ==> other)."""
1212        if isinstance(other, Packet):
1213            return other < self
1214        elif isinstance(other, bytes):
1215            return 1
1216        else:
1217            raise TypeError((self, other))
1218
1219    def __lt__(self, other):
1220        # type: (Packet) -> int
1221        """True if self is an answer from other (other ==> self)."""
1222        if isinstance(other, Packet):
1223            return self.answers(other)
1224        elif isinstance(other, bytes):
1225            return 1
1226        else:
1227            raise TypeError((self, other))
1228
1229    def __eq__(self, other):
1230        # type: (Any) -> bool
1231        if not isinstance(other, self.__class__):
1232            return False
1233        for f in self.fields_desc:
1234            if f not in other.fields_desc:
1235                return False
1236            if self.getfieldval(f.name) != other.getfieldval(f.name):
1237                return False
1238        return self.payload == other.payload
1239
1240    def __ne__(self, other):
1241        # type: (Any) -> bool
1242        return not self.__eq__(other)
1243
1244    # Note: setting __hash__ to None is the standard way
1245    # of making an object un-hashable. mypy doesn't know that
1246    __hash__ = None  # type: ignore
1247
1248    def hashret(self):
1249        # type: () -> bytes
1250        """DEV: returns a string that has the same value for a request
1251        and its answer."""
1252        return self.payload.hashret()
1253
1254    def answers(self, other):
1255        # type: (Packet) -> int
1256        """DEV: true if self is an answer from other"""
1257        if other.__class__ == self.__class__:
1258            return self.payload.answers(other.payload)
1259        return 0
1260
1261    def layers(self):
1262        # type: () -> List[Type[Packet]]
1263        """returns a list of layer classes (including subclasses) in this packet"""  # noqa: E501
1264        layers = []
1265        lyr = self  # type: Optional[Packet]
1266        while lyr:
1267            layers.append(lyr.__class__)
1268            lyr = lyr.payload.getlayer(0, _subclass=True)
1269        return layers
1270
1271    def haslayer(self, cls, _subclass=None):
1272        # type: (Union[Type[Packet], str], Optional[bool]) -> int
1273        """
1274        true if self has a layer that is an instance of cls.
1275        Superseded by "cls in self" syntax.
1276        """
1277        if _subclass is None:
1278            _subclass = self.match_subclass or None
1279        if _subclass:
1280            match = issubtype
1281        else:
1282            match = lambda x, t: bool(x == t)
1283        if cls is None or match(self.__class__, cls) \
1284           or cls in [self.__class__.__name__, self._name]:
1285            return True
1286        for f in self.packetfields:
1287            fvalue_gen = self.getfieldval(f.name)
1288            if fvalue_gen is None:
1289                continue
1290            if not f.islist:
1291                fvalue_gen = SetGen(fvalue_gen, _iterpacket=0)
1292            for fvalue in fvalue_gen:
1293                if isinstance(fvalue, Packet):
1294                    ret = fvalue.haslayer(cls, _subclass=_subclass)
1295                    if ret:
1296                        return ret
1297        return self.payload.haslayer(cls, _subclass=_subclass)
1298
1299    def getlayer(self,
1300                 cls,  # type: Union[int, Type[Packet], str]
1301                 nb=1,  # type: int
1302                 _track=None,  # type: Optional[List[int]]
1303                 _subclass=None,  # type: Optional[bool]
1304                 **flt  # type: Any
1305                 ):
1306        # type: (...) -> Optional[Packet]
1307        """Return the nb^th layer that is an instance of cls, matching flt
1308values.
1309        """
1310        if _subclass is None:
1311            _subclass = self.match_subclass or None
1312        if _subclass:
1313            match = issubtype
1314        else:
1315            match = lambda x, t: bool(x == t)
1316        # Note:
1317        # cls can be int, packet, str
1318        # string_class_name can be packet, str (packet or packet+field)
1319        # class_name can be packet, str (packet only)
1320        if isinstance(cls, int):
1321            nb = cls + 1
1322            string_class_name = ""  # type: Union[Type[Packet], str]
1323        else:
1324            string_class_name = cls
1325        class_name = ""  # type: Union[Type[Packet], str]
1326        fld = None  # type: Optional[str]
1327        if isinstance(string_class_name, str) and "." in string_class_name:
1328            class_name, fld = string_class_name.split(".", 1)
1329        else:
1330            class_name, fld = string_class_name, None
1331        if not class_name or match(self.__class__, class_name) \
1332           or class_name in [self.__class__.__name__, self._name]:
1333            if all(self.getfieldval(fldname) == fldvalue
1334                   for fldname, fldvalue in flt.items()):
1335                if nb == 1:
1336                    if fld is None:
1337                        return self
1338                    else:
1339                        return self.getfieldval(fld)  # type: ignore
1340                else:
1341                    nb -= 1
1342        for f in self.packetfields:
1343            fvalue_gen = self.getfieldval(f.name)
1344            if fvalue_gen is None:
1345                continue
1346            if not f.islist:
1347                fvalue_gen = SetGen(fvalue_gen, _iterpacket=0)
1348            for fvalue in fvalue_gen:
1349                if isinstance(fvalue, Packet):
1350                    track = []  # type: List[int]
1351                    ret = fvalue.getlayer(class_name, nb=nb, _track=track,
1352                                          _subclass=_subclass, **flt)
1353                    if ret is not None:
1354                        return ret
1355                    nb = track[0]
1356        return self.payload.getlayer(class_name, nb=nb, _track=_track,
1357                                     _subclass=_subclass, **flt)
1358
1359    def firstlayer(self):
1360        # type: () -> Packet
1361        q = self
1362        while q.underlayer is not None:
1363            q = q.underlayer
1364        return q
1365
1366    def __getitem__(self, cls):
1367        # type: (Union[Type[Packet], str]) -> Any
1368        if isinstance(cls, slice):
1369            lname = cls.start
1370            if cls.stop:
1371                ret = self.getlayer(cls.start, nb=cls.stop, **(cls.step or {}))
1372            else:
1373                ret = self.getlayer(cls.start, **(cls.step or {}))
1374        else:
1375            lname = cls
1376            ret = self.getlayer(cls)
1377        if ret is None:
1378            if isinstance(lname, type):
1379                name = lname.__name__
1380            elif not isinstance(lname, bytes):
1381                name = repr(lname)
1382            else:
1383                name = cast(str, lname)
1384            raise IndexError("Layer [%s] not found" % name)
1385        return ret
1386
1387    def __delitem__(self, cls):
1388        # type: (Type[Packet]) -> None
1389        del self[cls].underlayer.payload
1390
1391    def __setitem__(self, cls, val):
1392        # type: (Type[Packet], Packet) -> None
1393        self[cls].underlayer.payload = val
1394
1395    def __contains__(self, cls):
1396        # type: (Union[Type[Packet], str]) -> int
1397        """
1398        "cls in self" returns true if self has a layer which is an
1399        instance of cls.
1400        """
1401        return self.haslayer(cls)
1402
1403    def route(self):
1404        # type: () -> Tuple[Optional[str], Optional[str], Optional[str]]
1405        return self.payload.route()
1406
1407    def fragment(self, *args, **kargs):
1408        # type: (*Any, **Any) -> List[Packet]
1409        return self.payload.fragment(*args, **kargs)
1410
1411    def display(self, *args, **kargs):  # Deprecated. Use show()
1412        # type: (*Any, **Any) -> None
1413        """Deprecated. Use show() method."""
1414        self.show(*args, **kargs)
1415
1416    def _show_or_dump(self,
1417                      dump=False,  # type: bool
1418                      indent=3,  # type: int
1419                      lvl="",  # type: str
1420                      label_lvl="",  # type: str
1421                      first_call=True  # type: bool
1422                      ):
1423        # type: (...) -> Optional[str]
1424        """
1425        Internal method that shows or dumps a hierarchical view of a packet.
1426        Called by show.
1427
1428        :param dump: determine if it prints or returns the string value
1429        :param int indent: the size of indentation for each layer
1430        :param str lvl: additional information about the layer lvl
1431        :param str label_lvl: additional information about the layer fields
1432        :param first_call: determine if the current function is the first
1433        :return: return a hierarchical view if dump, else print it
1434        """
1435
1436        if dump:
1437            from scapy.themes import ColorTheme, AnsiColorTheme
1438            ct: ColorTheme = AnsiColorTheme()  # No color for dump output
1439        else:
1440            ct = conf.color_theme
1441        s = "%s%s %s %s\n" % (label_lvl,
1442                              ct.punct("###["),
1443                              ct.layer_name(self.name),
1444                              ct.punct("]###"))
1445        fields = self.fields_desc.copy()
1446        while fields:
1447            f = fields.pop(0)
1448            if isinstance(f, ConditionalField) and not f._evalcond(self):
1449                continue
1450            if hasattr(f, "fields"):  # Field has subfields
1451                s += "%s  %s =\n" % (
1452                    label_lvl + lvl,
1453                    ct.depreciate_field_name(f.name),
1454                )
1455                lvl += " " * indent * self.show_indent
1456                for i, fld in enumerate(x for x in f.fields if hasattr(self, x.name)):
1457                    fields.insert(i, fld)
1458                continue
1459            if isinstance(f, Emph) or f in conf.emph:
1460                ncol = ct.emph_field_name
1461                vcol = ct.emph_field_value
1462            else:
1463                ncol = ct.field_name
1464                vcol = ct.field_value
1465            pad = max(0, 10 - len(f.name)) * " "
1466            fvalue = self.getfieldval(f.name)
1467            if isinstance(fvalue, Packet) or (f.islist and f.holds_packets and isinstance(fvalue, list)):  # noqa: E501
1468                s += "%s  %s%s%s%s\n" % (label_lvl + lvl,
1469                                         ct.punct("\\"),
1470                                         ncol(f.name),
1471                                         pad,
1472                                         ct.punct("\\"))
1473                fvalue_gen = SetGen(
1474                    fvalue,
1475                    _iterpacket=0
1476                )  # type: SetGen[Packet]
1477                for fvalue in fvalue_gen:
1478                    s += fvalue._show_or_dump(dump=dump, indent=indent, label_lvl=label_lvl + lvl + "   |", first_call=False)  # noqa: E501
1479            else:
1480                begn = "%s  %s%s%s " % (label_lvl + lvl,
1481                                        ncol(f.name),
1482                                        pad,
1483                                        ct.punct("="),)
1484                reprval = f.i2repr(self, fvalue)
1485                if isinstance(reprval, str):
1486                    reprval = reprval.replace("\n", "\n" + " " * (len(label_lvl) +  # noqa: E501
1487                                                                  len(lvl) +
1488                                                                  len(f.name) +
1489                                                                  4))
1490                s += "%s%s\n" % (begn, vcol(reprval))
1491        if self.payload:
1492            s += self.payload._show_or_dump(  # type: ignore
1493                dump=dump,
1494                indent=indent,
1495                lvl=lvl + (" " * indent * self.show_indent),
1496                label_lvl=label_lvl,
1497                first_call=False
1498            )
1499
1500        if first_call and not dump:
1501            print(s)
1502            return None
1503        else:
1504            return s
1505
1506    def show(self, dump=False, indent=3, lvl="", label_lvl=""):
1507        # type: (bool, int, str, str) -> Optional[Any]
1508        """
1509        Prints or returns (when "dump" is true) a hierarchical view of the
1510        packet.
1511
1512        :param dump: determine if it prints or returns the string value
1513        :param int indent: the size of indentation for each layer
1514        :param str lvl: additional information about the layer lvl
1515        :param str label_lvl: additional information about the layer fields
1516        :return: return a hierarchical view if dump, else print it
1517        """
1518        return self._show_or_dump(dump, indent, lvl, label_lvl)
1519
1520    def show2(self, dump=False, indent=3, lvl="", label_lvl=""):
1521        # type: (bool, int, str, str) -> Optional[Any]
1522        """
1523        Prints or returns (when "dump" is true) a hierarchical view of an
1524        assembled version of the packet, so that automatic fields are
1525        calculated (checksums, etc.)
1526
1527        :param dump: determine if it prints or returns the string value
1528        :param int indent: the size of indentation for each layer
1529        :param str lvl: additional information about the layer lvl
1530        :param str label_lvl: additional information about the layer fields
1531        :return: return a hierarchical view if dump, else print it
1532        """
1533        return self.__class__(raw(self)).show(dump, indent, lvl, label_lvl)
1534
1535    def sprintf(self, fmt, relax=1):
1536        # type: (str, int) -> str
1537        """
1538        sprintf(format, [relax=1]) -> str
1539
1540        Where format is a string that can include directives. A directive
1541        begins and ends by % and has the following format:
1542        ``%[fmt[r],][cls[:nb].]field%``
1543
1544        :param fmt: is a classic printf directive, "r" can be appended for raw
1545          substitution:
1546          (ex: IP.flags=0x18 instead of SA), nb is the number of the layer
1547          (ex: for IP/IP packets, IP:2.src is the src of the upper IP layer).
1548          Special case : "%.time%" is the creation time.
1549          Ex::
1550
1551            p.sprintf(
1552              "%.time% %-15s,IP.src% -> %-15s,IP.dst% %IP.chksum% "
1553              "%03xr,IP.proto% %r,TCP.flags%"
1554            )
1555
1556          Moreover, the format string can include conditional statements. A
1557          conditional statement looks like : {layer:string} where layer is a
1558          layer name, and string is the string to insert in place of the
1559          condition if it is true, i.e. if layer is present. If layer is
1560          preceded by a "!", the result is inverted. Conditions can be
1561          imbricated. A valid statement can be::
1562
1563            p.sprintf("This is a{TCP: TCP}{UDP: UDP}{ICMP:n ICMP} packet")
1564            p.sprintf("{IP:%IP.dst% {ICMP:%ICMP.type%}{TCP:%TCP.dport%}}")
1565
1566          A side effect is that, to obtain "{" and "}" characters, you must use
1567          "%(" and "%)".
1568        """
1569
1570        escape = {"%": "%",
1571                  "(": "{",
1572                  ")": "}"}
1573
1574        # Evaluate conditions
1575        while "{" in fmt:
1576            i = fmt.rindex("{")
1577            j = fmt[i + 1:].index("}")
1578            cond = fmt[i + 1:i + j + 1]
1579            k = cond.find(":")
1580            if k < 0:
1581                raise Scapy_Exception("Bad condition in format string: [%s] (read sprintf doc!)" % cond)  # noqa: E501
1582            cond, format_ = cond[:k], cond[k + 1:]
1583            res = False
1584            if cond[0] == "!":
1585                res = True
1586                cond = cond[1:]
1587            if self.haslayer(cond):
1588                res = not res
1589            if not res:
1590                format_ = ""
1591            fmt = fmt[:i] + format_ + fmt[i + j + 2:]
1592
1593        # Evaluate directives
1594        s = ""
1595        while "%" in fmt:
1596            i = fmt.index("%")
1597            s += fmt[:i]
1598            fmt = fmt[i + 1:]
1599            if fmt and fmt[0] in escape:
1600                s += escape[fmt[0]]
1601                fmt = fmt[1:]
1602                continue
1603            try:
1604                i = fmt.index("%")
1605                sfclsfld = fmt[:i]
1606                fclsfld = sfclsfld.split(",")
1607                if len(fclsfld) == 1:
1608                    f = "s"
1609                    clsfld = fclsfld[0]
1610                elif len(fclsfld) == 2:
1611                    f, clsfld = fclsfld
1612                else:
1613                    raise Scapy_Exception
1614                if "." in clsfld:
1615                    cls, fld = clsfld.split(".")
1616                else:
1617                    cls = self.__class__.__name__
1618                    fld = clsfld
1619                num = 1
1620                if ":" in cls:
1621                    cls, snum = cls.split(":")
1622                    num = int(snum)
1623                fmt = fmt[i + 1:]
1624            except Exception:
1625                raise Scapy_Exception("Bad format string [%%%s%s]" % (fmt[:25], fmt[25:] and "..."))  # noqa: E501
1626            else:
1627                if fld == "time":
1628                    val = time.strftime(
1629                        "%H:%M:%S.%%06i",
1630                        time.localtime(float(self.time))
1631                    ) % int((self.time - int(self.time)) * 1000000)
1632                elif cls == self.__class__.__name__ and hasattr(self, fld):
1633                    if num > 1:
1634                        val = self.payload.sprintf("%%%s,%s:%s.%s%%" % (f, cls, num - 1, fld), relax)  # noqa: E501
1635                        f = "s"
1636                    else:
1637                        try:
1638                            val = self.getfieldval(fld)
1639                        except AttributeError:
1640                            val = getattr(self, fld)
1641                        if f[-1] == "r":  # Raw field value
1642                            f = f[:-1]
1643                            if not f:
1644                                f = "s"
1645                        else:
1646                            if fld in self.fieldtype:
1647                                val = self.fieldtype[fld].i2repr(self, val)
1648                else:
1649                    val = self.payload.sprintf("%%%s%%" % sfclsfld, relax)
1650                    f = "s"
1651                s += ("%" + f) % val
1652
1653        s += fmt
1654        return s
1655
1656    def mysummary(self):
1657        # type: () -> str
1658        """DEV: can be overloaded to return a string that summarizes the layer.
1659           Only one mysummary() is used in a whole packet summary: the one of the upper layer,  # noqa: E501
1660           except if a mysummary() also returns (as a couple) a list of layers whose  # noqa: E501
1661           mysummary() must be called if they are present."""
1662        return ""
1663
1664    def _do_summary(self):
1665        # type: () -> Tuple[int, str, List[Any]]
1666        found, s, needed = self.payload._do_summary()
1667        ret = ""
1668        if not found or self.__class__ in needed:
1669            ret = self.mysummary()
1670            if isinstance(ret, tuple):
1671                ret, n = ret
1672                needed += n
1673        if ret or needed:
1674            found = 1
1675        if not ret:
1676            ret = self.__class__.__name__ if self.show_summary else ""
1677        if self.__class__ in conf.emph:
1678            impf = []
1679            for f in self.fields_desc:
1680                if f in conf.emph:
1681                    impf.append("%s=%s" % (f.name, f.i2repr(self, self.getfieldval(f.name))))  # noqa: E501
1682            ret = "%s [%s]" % (ret, " ".join(impf))
1683        if ret and s:
1684            ret = "%s / %s" % (ret, s)
1685        else:
1686            ret = "%s%s" % (ret, s)
1687        return found, ret, needed
1688
1689    def summary(self, intern=0):
1690        # type: (int) -> str
1691        """Prints a one line summary of a packet."""
1692        return self._do_summary()[1]
1693
1694    def lastlayer(self, layer=None):
1695        # type: (Optional[Packet]) -> Packet
1696        """Returns the uppest layer of the packet"""
1697        return self.payload.lastlayer(self)
1698
1699    def decode_payload_as(self, cls):
1700        # type: (Type[Packet]) -> None
1701        """Reassembles the payload and decode it using another packet class"""
1702        s = raw(self.payload)
1703        self.payload = cls(s, _internal=1, _underlayer=self)
1704        pp = self
1705        while pp.underlayer is not None:
1706            pp = pp.underlayer
1707        self.payload.dissection_done(pp)
1708
1709    def _command(self, json=False):
1710        # type: (bool) -> List[Tuple[str, Any]]
1711        """
1712        Internal method used to generate command() and json()
1713        """
1714        f = []
1715        iterator: Iterator[Tuple[str, Any]]
1716        if json:
1717            iterator = ((x.name, self.getfieldval(x.name)) for x in self.fields_desc)
1718        else:
1719            iterator = iter(self.fields.items())
1720        for fn, fv in iterator:
1721            fld = self.get_field(fn)
1722            if isinstance(fv, (list, dict, set)) and not fv and not fld.default:
1723                continue
1724            if isinstance(fv, Packet):
1725                if json:
1726                    fv = {k: v for (k, v) in fv._command(json=True)}
1727                else:
1728                    fv = fv.command()
1729            elif fld.islist and fld.holds_packets and isinstance(fv, list):
1730                if json:
1731                    fv = [
1732                        {k: v for (k, v) in x}
1733                        for x in map(lambda y: Packet._command(y, json=True), fv)
1734                    ]
1735                else:
1736                    fv = "[%s]" % ",".join(map(Packet.command, fv))
1737            elif fld.islist and isinstance(fv, list):
1738                if json:
1739                    fv = [
1740                        getattr(x, 'command', lambda: repr(x))()
1741                        for x in fv
1742                    ]
1743                else:
1744                    fv = "[%s]" % ",".join(
1745                        getattr(x, 'command', lambda: repr(x))()
1746                        for x in fv
1747                    )
1748            elif isinstance(fv, FlagValue):
1749                fv = int(fv)
1750            elif callable(getattr(fv, 'command', None)):
1751                fv = fv.command(json=json)
1752            else:
1753                if json:
1754                    if isinstance(fv, bytes):
1755                        fv = fv.decode("utf-8", errors="backslashreplace")
1756                    else:
1757                        fv = fld.i2h(self, fv)
1758                else:
1759                    fv = repr(fld.i2h(self, fv))
1760            f.append((fn, fv))
1761        return f
1762
1763    def command(self):
1764        # type: () -> str
1765        """
1766        Returns a string representing the command you have to type to
1767        obtain the same packet
1768        """
1769        c = "%s(%s)" % (
1770            self.__class__.__name__,
1771            ", ".join("%s=%s" % x for x in self._command())
1772        )
1773        pc = self.payload.command()
1774        if pc:
1775            c += "/" + pc
1776        return c
1777
1778    def json(self):
1779        # type: () -> str
1780        """
1781        Returns a JSON representing the packet.
1782
1783        Please note that this cannot be used for bijective usage: data loss WILL occur,
1784        so it will not make sense to try to rebuild the packet from the output.
1785        This must only be used for a grepping/displaying purpose.
1786        """
1787        dump = json.dumps({k: v for (k, v) in self._command(json=True)})
1788        pc = self.payload.json()
1789        if pc:
1790            dump = dump[:-1] + ", \"payload\": %s}" % pc
1791        return dump
1792
1793
1794class NoPayload(Packet):
1795    def __new__(cls, *args, **kargs):
1796        # type: (Type[Packet], *Any, **Any) -> NoPayload
1797        singl = cls.__dict__.get("__singl__")
1798        if singl is None:
1799            cls.__singl__ = singl = Packet.__new__(cls)
1800            Packet.__init__(singl)
1801        return cast(NoPayload, singl)
1802
1803    def __init__(self, *args, **kargs):
1804        # type: (*Any, **Any) -> None
1805        pass
1806
1807    def dissection_done(self, pkt):
1808        # type: (Packet) -> None
1809        pass
1810
1811    def add_payload(self, payload):
1812        # type: (Union[Packet, bytes]) -> NoReturn
1813        raise Scapy_Exception("Can't add payload to NoPayload instance")
1814
1815    def remove_payload(self):
1816        # type: () -> None
1817        pass
1818
1819    def add_underlayer(self, underlayer):
1820        # type: (Any) -> None
1821        pass
1822
1823    def remove_underlayer(self, other):
1824        # type: (Packet) -> None
1825        pass
1826
1827    def add_parent(self, parent):
1828        # type: (Any) -> None
1829        pass
1830
1831    def remove_parent(self, other):
1832        # type: (Packet) -> None
1833        pass
1834
1835    def copy(self):
1836        # type: () -> NoPayload
1837        return self
1838
1839    def clear_cache(self):
1840        # type: () -> None
1841        pass
1842
1843    def __repr__(self):
1844        # type: () -> str
1845        return ""
1846
1847    def __str__(self):
1848        # type: () -> str
1849        return ""
1850
1851    def __bytes__(self):
1852        # type: () -> bytes
1853        return b""
1854
1855    def __nonzero__(self):
1856        # type: () -> bool
1857        return False
1858    __bool__ = __nonzero__
1859
1860    def do_build(self):
1861        # type: () -> bytes
1862        return b""
1863
1864    def build(self):
1865        # type: () -> bytes
1866        return b""
1867
1868    def build_padding(self):
1869        # type: () -> bytes
1870        return b""
1871
1872    def build_done(self, p):
1873        # type: (bytes) -> bytes
1874        return p
1875
1876    def build_ps(self, internal=0):
1877        # type: (int) -> Tuple[bytes, List[Any]]
1878        return b"", []
1879
1880    def getfieldval(self, attr):
1881        # type: (str) -> NoReturn
1882        raise AttributeError(attr)
1883
1884    def getfield_and_val(self, attr):
1885        # type: (str) -> NoReturn
1886        raise AttributeError(attr)
1887
1888    def setfieldval(self, attr, val):
1889        # type: (str, Any) -> NoReturn
1890        raise AttributeError(attr)
1891
1892    def delfieldval(self, attr):
1893        # type: (str) -> NoReturn
1894        raise AttributeError(attr)
1895
1896    def hide_defaults(self):
1897        # type: () -> None
1898        pass
1899
1900    def __iter__(self):
1901        # type: () -> Iterator[Packet]
1902        return iter([])
1903
1904    def __eq__(self, other):
1905        # type: (Any) -> bool
1906        if isinstance(other, NoPayload):
1907            return True
1908        return False
1909
1910    def hashret(self):
1911        # type: () -> bytes
1912        return b""
1913
1914    def answers(self, other):
1915        # type: (Packet) -> bool
1916        return isinstance(other, (NoPayload, conf.padding_layer))  # noqa: E501
1917
1918    def haslayer(self, cls, _subclass=None):
1919        # type: (Union[Type[Packet], str], Optional[bool]) -> int
1920        return 0
1921
1922    def getlayer(self,
1923                 cls,  # type: Union[int, Type[Packet], str]
1924                 nb=1,  # type: int
1925                 _track=None,  # type: Optional[List[int]]
1926                 _subclass=None,  # type: Optional[bool]
1927                 **flt  # type: Any
1928                 ):
1929        # type: (...) -> Optional[Packet]
1930        if _track is not None:
1931            _track.append(nb)
1932        return None
1933
1934    def fragment(self, *args, **kargs):
1935        # type: (*Any, **Any) -> List[Packet]
1936        raise Scapy_Exception("cannot fragment this packet")
1937
1938    def show(self, dump=False, indent=3, lvl="", label_lvl=""):
1939        # type: (bool, int, str, str) -> None
1940        pass
1941
1942    def sprintf(self, fmt, relax=1):
1943        # type: (str, int) -> str
1944        if relax:
1945            return "??"
1946        else:
1947            raise Scapy_Exception("Format not found [%s]" % fmt)
1948
1949    def _do_summary(self):
1950        # type: () -> Tuple[int, str, List[Any]]
1951        return 0, "", []
1952
1953    def layers(self):
1954        # type: () -> List[Type[Packet]]
1955        return []
1956
1957    def lastlayer(self, layer=None):
1958        # type: (Optional[Packet]) -> Packet
1959        return layer or self
1960
1961    def command(self):
1962        # type: () -> str
1963        return ""
1964
1965    def json(self):
1966        # type: () -> str
1967        return ""
1968
1969    def route(self):
1970        # type: () -> Tuple[None, None, None]
1971        return (None, None, None)
1972
1973
1974####################
1975#  packet classes  #
1976####################
1977
1978
1979class Raw(Packet):
1980    name = "Raw"
1981    fields_desc = [StrField("load", b"")]
1982
1983    def __init__(self, _pkt=b"", *args, **kwargs):
1984        # type: (bytes, *Any, **Any) -> None
1985        if _pkt and not isinstance(_pkt, bytes):
1986            if isinstance(_pkt, tuple):
1987                _pkt, bn = _pkt
1988                _pkt = bytes_encode(_pkt), bn
1989            else:
1990                _pkt = bytes_encode(_pkt)
1991        super(Raw, self).__init__(_pkt, *args, **kwargs)
1992
1993    def answers(self, other):
1994        # type: (Packet) -> int
1995        return 1
1996
1997    def mysummary(self):
1998        # type: () -> str
1999        cs = conf.raw_summary
2000        if cs:
2001            if callable(cs):
2002                return "Raw %s" % cs(self.load)
2003            else:
2004                return "Raw %r" % self.load
2005        return Packet.mysummary(self)
2006
2007
2008class Padding(Raw):
2009    name = "Padding"
2010
2011    def self_build(self, field_pos_list=None):
2012        # type: (Optional[Any]) -> bytes
2013        return b""
2014
2015    def build_padding(self):
2016        # type: () -> bytes
2017        return (
2018            bytes_encode(self.load) if self.raw_packet_cache is None
2019            else self.raw_packet_cache
2020        ) + self.payload.build_padding()
2021
2022
2023conf.raw_layer = Raw
2024conf.padding_layer = Padding
2025if conf.default_l2 is None:
2026    conf.default_l2 = Raw
2027
2028#################
2029#  Bind layers  #
2030#################
2031
2032
2033def bind_bottom_up(lower,  # type: Type[Packet]
2034                   upper,  # type: Type[Packet]
2035                   __fval=None,  # type: Optional[Any]
2036                   **fval  # type: Any
2037                   ):
2038    # type: (...) -> None
2039    r"""Bind 2 layers for dissection.
2040    The upper layer will be chosen for dissection on top of the lower layer, if
2041    ALL the passed arguments are validated. If multiple calls are made with
2042    the same layers, the last one will be used as default.
2043
2044    ex:
2045        >>> bind_bottom_up(Ether, SNAP, type=0x1234)
2046        >>> Ether(b'\xff\xff\xff\xff\xff\xff\xd0P\x99V\xdd\xf9\x124\x00\x00\x00\x00\x00')  # noqa: E501
2047        <Ether  dst=ff:ff:ff:ff:ff:ff src=d0:50:99:56:dd:f9 type=0x1234 |<SNAP  OUI=0x0 code=0x0 |>>  # noqa: E501
2048    """
2049    if __fval is not None:
2050        fval.update(__fval)
2051    lower.payload_guess = lower.payload_guess[:]
2052    lower.payload_guess.append((fval, upper))
2053
2054
2055def bind_top_down(lower,  # type: Type[Packet]
2056                  upper,  # type: Type[Packet]
2057                  __fval=None,  # type: Optional[Any]
2058                  **fval  # type: Any
2059                  ):
2060    # type: (...) -> None
2061    """Bind 2 layers for building.
2062    When the upper layer is added as a payload of the lower layer, all the
2063    arguments will be applied to them.
2064
2065    ex:
2066        >>> bind_top_down(Ether, SNAP, type=0x1234)
2067        >>> Ether()/SNAP()
2068        <Ether  type=0x1234 |<SNAP  |>>
2069    """
2070    if __fval is not None:
2071        fval.update(__fval)
2072    upper._overload_fields = upper._overload_fields.copy()  # type: ignore
2073    upper._overload_fields[lower] = fval
2074
2075
2076@conf.commands.register
2077def bind_layers(lower,  # type: Type[Packet]
2078                upper,  # type: Type[Packet]
2079                __fval=None,  # type: Optional[Dict[str, int]]
2080                **fval  # type: Any
2081                ):
2082    # type: (...) -> None
2083    """Bind 2 layers on some specific fields' values.
2084
2085    It makes the packet being built and dissected when the arguments
2086    are present.
2087
2088    This function calls both bind_bottom_up and bind_top_down, with
2089    all passed arguments.
2090
2091    Please have a look at their docs:
2092     - help(bind_bottom_up)
2093     - help(bind_top_down)
2094     """
2095    if __fval is not None:
2096        fval.update(__fval)
2097    bind_top_down(lower, upper, **fval)
2098    bind_bottom_up(lower, upper, **fval)
2099
2100
2101def split_bottom_up(lower,  # type: Type[Packet]
2102                    upper,  # type: Type[Packet]
2103                    __fval=None,  # type: Optional[Any]
2104                    **fval  # type: Any
2105                    ):
2106    # type: (...) -> None
2107    """This call un-links an association that was made using bind_bottom_up.
2108    Have a look at help(bind_bottom_up)
2109    """
2110    if __fval is not None:
2111        fval.update(__fval)
2112
2113    def do_filter(params, cls):
2114        # type: (Dict[str, int], Type[Packet]) -> bool
2115        params_is_invalid = any(
2116            k not in params or params[k] != v for k, v in fval.items()
2117        )
2118        return cls != upper or params_is_invalid
2119    lower.payload_guess = [x for x in lower.payload_guess if do_filter(*x)]
2120
2121
2122def split_top_down(lower,  # type: Type[Packet]
2123                   upper,  # type: Type[Packet]
2124                   __fval=None,  # type: Optional[Any]
2125                   **fval  # type: Any
2126                   ):
2127    # type: (...) -> None
2128    """This call un-links an association that was made using bind_top_down.
2129    Have a look at help(bind_top_down)
2130    """
2131    if __fval is not None:
2132        fval.update(__fval)
2133    if lower in upper._overload_fields:
2134        ofval = upper._overload_fields[lower]
2135        if any(k not in ofval or ofval[k] != v for k, v in fval.items()):
2136            return
2137        upper._overload_fields = upper._overload_fields.copy()  # type: ignore
2138        del upper._overload_fields[lower]
2139
2140
2141@conf.commands.register
2142def split_layers(lower,  # type: Type[Packet]
2143                 upper,  # type: Type[Packet]
2144                 __fval=None,  # type: Optional[Any]
2145                 **fval  # type: Any
2146                 ):
2147    # type: (...) -> None
2148    """Split 2 layers previously bound.
2149    This call un-links calls bind_top_down and bind_bottom_up. It is the opposite of  # noqa: E501
2150    bind_layers.
2151
2152    Please have a look at their docs:
2153     - help(split_bottom_up)
2154     - help(split_top_down)
2155    """
2156    if __fval is not None:
2157        fval.update(__fval)
2158    split_bottom_up(lower, upper, **fval)
2159    split_top_down(lower, upper, **fval)
2160
2161
2162@conf.commands.register
2163def explore(layer=None):
2164    # type: (Optional[str]) -> None
2165    """Function used to discover the Scapy layers and protocols.
2166    It helps to see which packets exists in contrib or layer files.
2167
2168    params:
2169     - layer: If specified, the function will explore the layer. If not,
2170              the GUI mode will be activated, to browse the available layers
2171
2172    examples:
2173      >>> explore()  # Launches the GUI
2174      >>> explore("dns")  # Explore scapy.layers.dns
2175      >>> explore("http2")  # Explore scapy.contrib.http2
2176      >>> explore(scapy.layers.bluetooth4LE)
2177
2178    Note: to search a packet by name, use ls("name") rather than explore.
2179    """
2180    if layer is None:  # GUI MODE
2181        if not conf.interactive:
2182            raise Scapy_Exception("explore() GUI-mode cannot be run in "
2183                                  "interactive mode. Please provide a "
2184                                  "'layer' parameter !")
2185        # 0 - Imports
2186        try:
2187            import prompt_toolkit
2188        except ImportError:
2189            raise ImportError("prompt_toolkit is not installed ! "
2190                              "You may install IPython, which contains it, via"
2191                              " `pip install ipython`")
2192        if not _version_checker(prompt_toolkit, (2, 0)):
2193            raise ImportError("prompt_toolkit >= 2.0.0 is required !")
2194        # Only available with prompt_toolkit > 2.0, not released on PyPi yet
2195        from prompt_toolkit.shortcuts.dialogs import radiolist_dialog, \
2196            button_dialog
2197        from prompt_toolkit.formatted_text import HTML
2198        # Check for prompt_toolkit >= 3.0.0
2199        call_ptk = lambda x: cast(str, x)  # type: Callable[[Any], str]
2200        if _version_checker(prompt_toolkit, (3, 0)):
2201            call_ptk = lambda x: x.run()
2202        # 1 - Ask for layer or contrib
2203        btn_diag = button_dialog(
2204            title="Scapy v%s" % conf.version,
2205            text=HTML(
2206                '<style bg="white" fg="red">Chose the type of packets'
2207                ' you want to explore:</style>'
2208            ),
2209            buttons=[
2210                ("Layers", "layers"),
2211                ("Contribs", "contribs"),
2212                ("Cancel", "cancel")
2213            ])
2214        action = call_ptk(btn_diag)
2215        # 2 - Retrieve list of Packets
2216        if action == "layers":
2217            # Get all loaded layers
2218            lvalues = conf.layers.layers()
2219            # Restrict to layers-only (not contribs) + packet.py and asn1*.py
2220            values = [x for x in lvalues if ("layers" in x[0] or
2221                                             "packet" in x[0] or
2222                                             "asn1" in x[0])]
2223        elif action == "contribs":
2224            # Get all existing contribs
2225            from scapy.main import list_contrib
2226            cvalues = cast(List[Dict[str, str]], list_contrib(ret=True))
2227            values = [(x['name'], x['description'])
2228                      for x in cvalues]
2229            # Remove very specific modules
2230            values = [x for x in values if "can" not in x[0]]
2231        else:
2232            # Escape/Cancel was pressed
2233            return
2234        # Build tree
2235        if action == "contribs":
2236            # A tree is a dictionary. Each layer contains a keyword
2237            # _l which contains the files in the layer, and a _name
2238            # argument which is its name. The other keys are the subfolders,
2239            # which are similar dictionaries
2240            tree = defaultdict(list)  # type: Dict[str, Union[List[Any], Dict[str, Any]]]  # noqa: E501
2241            for name, desc in values:
2242                if "." in name:  # Folder detected
2243                    parts = name.split(".")
2244                    subtree = tree
2245                    for pa in parts[:-1]:
2246                        if pa not in subtree:
2247                            subtree[pa] = {}
2248                        # one layer deeper
2249                        subtree = subtree[pa]  # type: ignore
2250                        subtree["_name"] = pa  # type: ignore
2251                    if "_l" not in subtree:
2252                        subtree["_l"] = []
2253                    subtree["_l"].append((parts[-1], desc))  # type: ignore
2254                else:
2255                    tree["_l"].append((name, desc))  # type: ignore
2256        elif action == "layers":
2257            tree = {"_l": values}
2258        # 3 - Ask for the layer/contrib module to explore
2259        current = tree  # type: Any
2260        previous = []  # type: List[Dict[str, Union[List[Any], Dict[str, Any]]]]  # noqa: E501
2261        while True:
2262            # Generate tests & form
2263            folders = list(current.keys())
2264            _radio_values = [
2265                ("$" + name, str('[+] ' + name.capitalize()))
2266                for name in folders if not name.startswith("_")
2267            ] + current.get("_l", [])  # type: List[str]
2268            cur_path = ""
2269            if previous:
2270                cur_path = ".".join(
2271                    itertools.chain(
2272                        (x["_name"] for x in previous[1:]),  # type: ignore
2273                        (current["_name"],)
2274                    )
2275                )
2276            extra_text = (
2277                '\n<style bg="white" fg="green">> scapy.%s</style>'
2278            ) % (action + ("." + cur_path if cur_path else ""))
2279            # Show popup
2280            rd_diag = radiolist_dialog(
2281                values=_radio_values,
2282                title="Scapy v%s" % conf.version,
2283                text=HTML(
2284                    (
2285                        '<style bg="white" fg="red">Please select a file'
2286                        'among the following, to see all layers contained in'
2287                        ' it:</style>'
2288                    ) + extra_text
2289                ),
2290                cancel_text="Back" if previous else "Cancel"
2291            )
2292            result = call_ptk(rd_diag)
2293            if result is None:
2294                # User pressed "Cancel/Back"
2295                if previous:  # Back
2296                    current = previous.pop()
2297                    continue
2298                else:  # Cancel
2299                    return
2300            if result.startswith("$"):
2301                previous.append(current)
2302                current = current[result[1:]]
2303            else:
2304                # Enter on layer
2305                if previous:  # In subfolder
2306                    result = cur_path + "." + result
2307                break
2308        # 4 - (Contrib only): load contrib
2309        if action == "contribs":
2310            from scapy.main import load_contrib
2311            load_contrib(result)
2312            result = "scapy.contrib." + result
2313    else:  # NON-GUI MODE
2314        # We handle layer as a short layer name, full layer name
2315        # or the module itself
2316        if isinstance(layer, types.ModuleType):
2317            layer = layer.__name__
2318        if isinstance(layer, str):
2319            if layer.startswith("scapy.layers."):
2320                result = layer
2321            else:
2322                if layer.startswith("scapy.contrib."):
2323                    layer = layer.replace("scapy.contrib.", "")
2324                from scapy.main import load_contrib
2325                load_contrib(layer)
2326                result_layer, result_contrib = (("scapy.layers.%s" % layer),
2327                                                ("scapy.contrib.%s" % layer))
2328                if result_layer in conf.layers.ldict:
2329                    result = result_layer
2330                elif result_contrib in conf.layers.ldict:
2331                    result = result_contrib
2332                else:
2333                    raise Scapy_Exception("Unknown scapy module '%s'" % layer)
2334        else:
2335            warning("Wrong usage ! Check out help(explore)")
2336            return
2337
2338    # COMMON PART
2339    # Get the list of all Packets contained in that module
2340    try:
2341        all_layers = conf.layers.ldict[result]
2342    except KeyError:
2343        raise Scapy_Exception("Unknown scapy module '%s'" % layer)
2344    # Print
2345    print(conf.color_theme.layer_name("Packets contained in %s:" % result))
2346    rtlst = []  # type: List[Tuple[Union[str, List[str]], ...]]
2347    rtlst = [(lay.__name__ or "", cast(str, lay._name) or "") for lay in all_layers]
2348    print(pretty_list(rtlst, [("Class", "Name")], borders=True))
2349
2350
2351def _pkt_ls(obj,  # type: Union[Packet, Type[Packet]]
2352            verbose=False,  # type: bool
2353            ):
2354    # type: (...) -> List[Tuple[str, Type[AnyField], str, str, List[str]]]  # noqa: E501
2355    """Internal function used to resolve `fields_desc` to display it.
2356
2357    :param obj: a packet object or class
2358    :returns: a list containing tuples [(name, clsname, clsname_extras,
2359        default, long_attrs)]
2360    """
2361    is_pkt = isinstance(obj, Packet)
2362    if not issubtype(obj, Packet) and not is_pkt:
2363        raise ValueError
2364    fields = []
2365    for f in obj.fields_desc:
2366        cur_fld = f
2367        attrs = []  # type: List[str]
2368        long_attrs = []  # type: List[str]
2369        while isinstance(cur_fld, (Emph, ConditionalField)):
2370            if isinstance(cur_fld, ConditionalField):
2371                attrs.append(cur_fld.__class__.__name__[:4])
2372            cur_fld = cur_fld.fld
2373        name = cur_fld.name
2374        default = cur_fld.default
2375        if verbose and isinstance(cur_fld, EnumField) \
2376           and hasattr(cur_fld, "i2s") and cur_fld.i2s:
2377            if len(cur_fld.i2s or []) < 50:
2378                long_attrs.extend(
2379                    "%s: %d" % (strval, numval)
2380                    for numval, strval in
2381                    sorted(cur_fld.i2s.items())
2382                )
2383        elif isinstance(cur_fld, MultiEnumField):
2384            if isinstance(obj, Packet):
2385                obj_pkt = obj
2386            else:
2387                obj_pkt = obj()
2388            fld_depend = cur_fld.depends_on(obj_pkt)
2389            attrs.append("Depends on %s" % fld_depend)
2390            if verbose:
2391                cur_i2s = cur_fld.i2s_multi.get(
2392                    cur_fld.depends_on(obj_pkt), {}
2393                )
2394                if len(cur_i2s) < 50:
2395                    long_attrs.extend(
2396                        "%s: %d" % (strval, numval)
2397                        for numval, strval in
2398                        sorted(cur_i2s.items())
2399                    )
2400        elif verbose and isinstance(cur_fld, FlagsField):
2401            names = cur_fld.names
2402            long_attrs.append(", ".join(names))
2403        elif isinstance(cur_fld, MultipleTypeField):
2404            default = cur_fld.dflt.default
2405            attrs.append(", ".join(
2406                x[0].__class__.__name__ for x in
2407                itertools.chain(cur_fld.flds, [(cur_fld.dflt,)])
2408            ))
2409
2410        cls = cur_fld.__class__
2411        class_name_extras = "(%s)" % (
2412            ", ".join(attrs)
2413        ) if attrs else ""
2414        if isinstance(cur_fld, BitField):
2415            class_name_extras += " (%d bit%s)" % (
2416                cur_fld.size,
2417                "s" if cur_fld.size > 1 else ""
2418            )
2419        fields.append(
2420            (name,
2421             cls,
2422             class_name_extras,
2423             repr(default),
2424             long_attrs)
2425        )
2426    return fields
2427
2428
2429@conf.commands.register
2430def ls(obj=None,  # type: Optional[Union[str, Packet, Type[Packet]]]
2431       case_sensitive=False,  # type: bool
2432       verbose=False  # type: bool
2433       ):
2434    # type: (...) -> None
2435    """List  available layers, or infos on a given layer class or name.
2436
2437    :param obj: Packet / packet name to use
2438    :param case_sensitive: if obj is a string, is it case sensitive?
2439    :param verbose:
2440    """
2441    if obj is None or isinstance(obj, str):
2442        tip = False
2443        if obj is None:
2444            tip = True
2445            all_layers = sorted(conf.layers, key=lambda x: x.__name__)
2446        else:
2447            pattern = re.compile(
2448                obj,
2449                0 if case_sensitive else re.I
2450            )
2451            # We first order by accuracy, then length
2452            if case_sensitive:
2453                sorter = lambda x: (x.__name__.index(obj), len(x.__name__))
2454            else:
2455                obj = obj.lower()
2456                sorter = lambda x: (x.__name__.lower().index(obj),
2457                                    len(x.__name__))
2458            all_layers = sorted((layer for layer in conf.layers
2459                                 if (isinstance(layer.__name__, str) and
2460                                     pattern.search(layer.__name__)) or
2461                                 (isinstance(layer.name, str) and
2462                                     pattern.search(layer.name))),
2463                                key=sorter)
2464        for layer in all_layers:
2465            print("%-10s : %s" % (layer.__name__, layer._name))
2466        if tip and conf.interactive:
2467            print("\nTIP: You may use explore() to navigate through all "
2468                  "layers using a clear GUI")
2469    else:
2470        try:
2471            fields = _pkt_ls(
2472                obj,
2473                verbose=verbose
2474            )
2475            is_pkt = isinstance(obj, Packet)
2476            # Print
2477            for fname, cls, clsne, dflt, long_attrs in fields:
2478                clsinfo = cls.__name__ + " " + clsne
2479                print("%-10s : %-35s =" % (fname, clsinfo), end=' ')
2480                if is_pkt:
2481                    print("%-15r" % (getattr(obj, fname),), end=' ')
2482                print("(%r)" % (dflt,))
2483                for attr in long_attrs:
2484                    print("%-15s%s" % ("", attr))
2485            # Restart for payload if any
2486            if is_pkt:
2487                obj = cast(Packet, obj)
2488                if isinstance(obj.payload, NoPayload):
2489                    return
2490                print("--")
2491                ls(obj.payload)
2492        except ValueError:
2493            print("Not a packet class or name. Type 'ls()' to list packet classes.")  # noqa: E501
2494
2495
2496@conf.commands.register
2497def rfc(cls, ret=False, legend=True):
2498    # type: (Type[Packet], bool, bool) -> Optional[str]
2499    """
2500    Generate an RFC-like representation of a packet def.
2501
2502    :param cls: the Packet class
2503    :param ret: return the result instead of printing (def. False)
2504    :param legend: show text under the diagram (default True)
2505
2506    Ex::
2507
2508        >>> rfc(Ether)
2509
2510    """
2511    if not issubclass(cls, Packet):
2512        raise TypeError("Packet class expected")
2513    cur_len = 0
2514    cur_line = []
2515    lines = []
2516    # Get the size (width) that a field will take
2517    # when formatted, from its length in bits
2518    clsize = lambda x: 2 * x - 1  # type: Callable[[int], int]
2519    ident = 0  # Fields UUID
2520
2521    # Generate packet groups
2522    def _iterfields() -> Iterator[Tuple[str, int]]:
2523        for f in cls.fields_desc:
2524            # Fancy field name
2525            fname = f.name.upper().replace("_", " ")
2526            fsize = int(f.sz * 8)
2527            yield fname, fsize
2528            # Add padding optionally
2529            if isinstance(f, PadField):
2530                if isinstance(f._align, tuple):
2531                    pad = - cur_len % (f._align[0] * 8)
2532                else:
2533                    pad = - cur_len % (f._align * 8)
2534                if pad:
2535                    yield "padding", pad
2536    for fname, flen in _iterfields():
2537        cur_len += flen
2538        ident += 1
2539        # The field might exceed the current line or
2540        # take more than one line. Copy it as required
2541        while True:
2542            over = max(0, cur_len - 32)  # Exceed
2543            len1 = clsize(flen - over)  # What fits
2544            cur_line.append((fname[:len1], len1, ident))
2545            if cur_len >= 32:
2546                # Current line is full. start a new line
2547                lines.append(cur_line)
2548                cur_len = flen = over
2549                fname = ""  # do not repeat the field
2550                cur_line = []
2551                if not over:
2552                    # there is no data left
2553                    break
2554            else:
2555                # End of the field
2556                break
2557    # Add the last line if un-finished
2558    if cur_line:
2559        lines.append(cur_line)
2560    # Calculate separations between lines
2561    seps = []
2562    seps.append("+-" * 32 + "+\n")
2563    for i in range(len(lines) - 1):
2564        # Start with a full line
2565        sep = "+-" * 32 + "+\n"
2566        # Get the line above and below the current
2567        # separation
2568        above, below = lines[i], lines[i + 1]
2569        # The last field of above is shared with below
2570        if above[-1][2] == below[0][2]:
2571            # where the field in "above" starts
2572            pos_above = sum(x[1] for x in above[:-1]) + len(above[:-1]) - 1
2573            # where the field in "below" ends
2574            pos_below = below[0][1]
2575            if pos_above < pos_below:
2576                # they are overlapping.
2577                # Now crop the space between those pos
2578                # and fill it with " "
2579                pos_above = pos_above + pos_above % 2
2580                sep = (
2581                    sep[:1 + pos_above] +
2582                    " " * (pos_below - pos_above) +
2583                    sep[1 + pos_below:]
2584                )
2585        # line is complete
2586        seps.append(sep)
2587    # Graph
2588    result = ""
2589    # Bytes markers
2590    result += " " + (" " * 19).join(
2591        str(x) for x in range(4)
2592    ) + "\n"
2593    # Bits markers
2594    result += " " + " ".join(
2595        str(x % 10) for x in range(32)
2596    ) + "\n"
2597    # Add fields and their separations
2598    for line, sep in zip(lines, seps):
2599        result += sep
2600        for elt, flen, _ in line:
2601            result += "|" + elt.center(flen, " ")
2602        result += "|\n"
2603    result += "+-" * (cur_len or 32) + "+\n"
2604    # Annotate with the figure name
2605    if legend:
2606        result += "\n" + ("Fig. " + cls.__name__).center(66, " ")
2607    # return if asked for, else print
2608    if ret:
2609        return result
2610    print(result)
2611    return None
2612
2613
2614#############
2615#  Fuzzing  #
2616#############
2617
2618_P = TypeVar('_P', bound=Packet)
2619
2620
2621@conf.commands.register
2622def fuzz(p,  # type: _P
2623         _inplace=0,  # type: int
2624         ):
2625    # type: (...) -> _P
2626    """
2627    Transform a layer into a fuzzy layer by replacing some default values
2628    by random objects.
2629
2630    :param p: the Packet instance to fuzz
2631    :return: the fuzzed packet.
2632    """
2633    if not _inplace:
2634        p = p.copy()
2635    q = cast(Packet, p)
2636    while not isinstance(q, NoPayload):
2637        new_default_fields = {}
2638        multiple_type_fields = []  # type: List[str]
2639        for f in q.fields_desc:
2640            if isinstance(f, PacketListField):
2641                for r in getattr(q, f.name):
2642                    fuzz(r, _inplace=1)
2643            elif isinstance(f, MultipleTypeField):
2644                # the type of the field will depend on others
2645                multiple_type_fields.append(f.name)
2646            elif f.default is not None:
2647                if not isinstance(f, ConditionalField) or f._evalcond(q):
2648                    rnd = f.randval()
2649                    if rnd is not None:
2650                        new_default_fields[f.name] = rnd
2651        # Process packets with MultipleTypeFields
2652        if multiple_type_fields:
2653            # freeze the other random values
2654            new_default_fields = {
2655                key: (val._fix() if isinstance(val, VolatileValue) else val)
2656                for key, val in new_default_fields.items()
2657            }
2658            q.default_fields.update(new_default_fields)
2659            new_default_fields.clear()
2660            # add the random values of the MultipleTypeFields
2661            for name in multiple_type_fields:
2662                fld = cast(MultipleTypeField, q.get_field(name))
2663                rnd = fld._find_fld_pkt(q).randval()
2664                if rnd is not None:
2665                    new_default_fields[name] = rnd
2666        q.default_fields.update(new_default_fields)
2667        q = q.payload
2668    return p
2669