• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5# Copyright (C) Michael Farrell <micolous+git@gmail.com>
6
7
8"""
9Fields: basic data structures that make up parts of packets.
10"""
11
12import calendar
13import collections
14import copy
15import datetime
16import inspect
17import math
18import socket
19import struct
20import time
21import warnings
22
23from types import MethodType
24from uuid import UUID
25from enum import Enum
26
27from scapy.config import conf
28from scapy.dadict import DADict
29from scapy.volatile import RandBin, RandByte, RandEnumKeys, RandInt, \
30    RandIP, RandIP6, RandLong, RandMAC, RandNum, RandShort, RandSInt, \
31    RandSByte, RandTermString, RandUUID, VolatileValue, RandSShort, \
32    RandSLong, RandFloat
33from scapy.data import EPOCH
34from scapy.error import log_runtime, Scapy_Exception
35from scapy.compat import bytes_hex, plain_str, raw, bytes_encode
36from scapy.pton_ntop import inet_ntop, inet_pton
37from scapy.utils import inet_aton, inet_ntoa, lhex, mac2str, str2mac, EDecimal
38from scapy.utils6 import in6_6to4ExtractAddr, in6_isaddr6to4, \
39    in6_isaddrTeredo, in6_ptop, Net6, teredoAddrExtractInfo
40from scapy.base_classes import (
41    _ScopedIP,
42    BasePacket,
43    Field_metaclass,
44    Net,
45    ScopedIP,
46)
47
48# Typing imports
49from typing import (
50    Any,
51    AnyStr,
52    Callable,
53    Dict,
54    List,
55    Generic,
56    Optional,
57    Set,
58    Tuple,
59    Type,
60    TypeVar,
61    Union,
62    # func
63    cast,
64    TYPE_CHECKING,
65)
66
67if TYPE_CHECKING:
68    # Do not import on runtime ! (import loop)
69    from scapy.packet import Packet
70
71
class RawVal:
    r"""
    Wrapper marking a value as raw bytes: the field does not process it and
    it is inserted as-is in the packet string.

    Example::

        >>> a = IP(len=RawVal("####"))
        >>> bytes(a)
        b'F\x00####\x00\x01\x00\x005\xb5\x00\x00\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x00'

    """

    def __init__(self, val=b""):
        # type: (bytes) -> None
        # Normalize whatever was given to bytes once, at construction time.
        encoded = bytes_encode(val)
        self.val = encoded

    def __bytes__(self):
        # type: () -> bytes
        """Return the wrapped bytes unchanged."""
        return self.val

    def __len__(self):
        # type: () -> int
        """Return the number of wrapped bytes."""
        payload = self.val
        return len(payload)

    def __str__(self):
        # type: () -> str
        """Return ``str()`` of the wrapped bytes object."""
        payload = self.val
        return str(payload)

    def __repr__(self):
        # type: () -> str
        payload = self.val
        return "<RawVal [%r]>" % payload
105
class ObservableDict(Dict[int, str]):
    """
    Helper class to specify a protocol extendable for runtime modifications

    Registered observers are told about every item assignment and deletion
    *before* the dictionary itself is modified, through their
    ``notify_set(dict, key, value)`` and ``notify_del(dict, key)`` methods.
    """

    def __init__(self, *args, **kw):
        # type: (*Dict[int, str], **Any) -> None
        # The observer list must exist before any item can be set.
        self.observers = []  # type: List[_EnumField[Any]]
        super(ObservableDict, self).__init__(*args, **kw)

    def observe(self, observer):
        # type: (_EnumField[Any]) -> None
        """Register `observer` to be notified of later modifications."""
        self.observers.append(observer)

    def __setitem__(self, key, value):
        # type: (int, str) -> None
        for o in self.observers:
            o.notify_set(self, key, value)
        super(ObservableDict, self).__setitem__(key, value)

    def __delitem__(self, key):
        # type: (int) -> None
        for o in self.observers:
            o.notify_del(self, key)
        super(ObservableDict, self).__delitem__(key)

    def update(self, anotherDict=(), **kwargs):  # type: ignore
        """
        Update the dictionary, notifying the observers for each item.

        Accepts the same inputs as ``dict.update``: a mapping (anything
        with a ``keys()`` method), an iterable of ``(key, value)`` pairs,
        and/or keyword arguments. Every item goes through ``__setitem__``
        so that observers are notified.
        """
        if hasattr(anotherDict, "keys"):
            for k in anotherDict.keys():
                self[k] = anotherDict[k]
        else:
            for k, v in anotherDict:
                self[k] = v
        for k, v in kwargs.items():
            self[k] = v
136
137############
138#  Fields  #
139############
140
# Type variables used to parametrize Field[I, M]
I = TypeVar('I')  # Internal storage  # noqa: E741
M = TypeVar('M')  # Machine storage
143
144
class Field(Generic[I, M], metaclass=Field_metaclass):
    """
    Base class of the fields that make up a packet.

    A field value exists in three representations: human (``h``),
    internal (``i``) and machine (``m``). The ``x2y`` methods convert
    between them, and ``addfield`` / ``getfield`` pack and unpack the
    machine value using the ``struct`` format `fmt`.

    For more information on how this works, please refer to the
    'Adding new protocols' chapter in the online documentation:

    https://scapy.readthedocs.io/en/stable/build_dissect.html
    """
    __slots__ = [
        "name",
        "fmt",
        "default",
        "sz",
        "owners",
        "struct"
    ]
    islist = 0
    ismutable = False
    holds_packets = 0

    def __init__(self, name, default, fmt="H"):
        # type: (str, Any, str) -> None
        """
        :param name: the field name (must be a ``str``)
        :param default: the default value, converted through ``any2i``
        :param fmt: a ``struct`` format; network byte order ("!") is used
                    unless it already starts with a byte-order character
        :raises ValueError: if `name` is not a string
        """
        if not isinstance(name, str):
            raise ValueError("name should be a string")
        self.name = name
        if fmt[0] in "@=<>!":
            self.fmt = fmt
        else:
            self.fmt = "!" + fmt
        self.struct = struct.Struct(self.fmt)
        self.default = self.any2i(None, default)
        self.sz = struct.calcsize(self.fmt)  # type: int
        self.owners = []  # type: List[Type[Packet]]

    def register_owner(self, cls):
        # type: (Type[Packet]) -> None
        """Record `cls` as a packet class using this field"""
        self.owners.append(cls)

    def i2len(self,
              pkt,  # type: Packet
              x,  # type: Any
              ):
        # type: (...) -> int
        """Convert internal value to a length usable by a FieldLenField"""
        if isinstance(x, RawVal):
            return len(x)
        return self.sz

    def i2count(self, pkt, x):
        # type: (Optional[Packet], I) -> int
        """Convert internal value to a number of elements usable by a FieldLenField.
        Always 1 except for list fields"""
        return 1

    def h2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> I
        """Convert human value to internal value"""
        return cast(I, x)

    def i2h(self, pkt, x):
        # type: (Optional[Packet], I) -> Any
        """Convert internal value to human value"""
        return x

    def m2i(self, pkt, x):
        # type: (Optional[Packet], M) -> I
        """Convert machine value to internal value"""
        return cast(I, x)

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[I]) -> M
        """Convert internal value to machine value

        ``None`` becomes 0 and ``str`` values are encoded to bytes.
        """
        if x is None:
            return cast(M, 0)
        elif isinstance(x, str):
            return cast(M, bytes_encode(x))
        return cast(M, x)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> Optional[I]
        """Try to understand the most input values possible and make an internal value from them"""  # noqa: E501
        return self.h2i(pkt, x)

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], I) -> str
        """Convert internal value to a nice representation"""
        return repr(self.i2h(pkt, x))

    def addfield(self, pkt, s, val):
        # type: (Packet, bytes, Optional[I]) -> bytes
        """Add an internal value to a string

        Copy the network representation of field `val` (belonging to layer
        `pkt`) to the raw string packet `s`, and return the new string packet.

        :raises ValueError: if the machine value cannot be packed with the
                            field format
        """
        try:
            return s + self.struct.pack(self.i2m(pkt, val))
        except struct.error as ex:
            # Chain the struct error so the original cause stays visible
            raise ValueError(
                "Incorrect type of value for field %s:\n" % self.name +
                "struct.error('%s')\n" % ex +
                "To inject bytes into the field regardless of the type, " +
                "use RawVal. See help(RawVal)"
            ) from ex

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, I]
        """Extract an internal value from a string

        Extract from the raw packet `s` the field value belonging to layer
        `pkt`.

        Returns a two-element tuple,
        first the raw packet string after having removed the extracted field,
        second the extracted field itself in internal representation.
        """
        return s[self.sz:], self.m2i(pkt, self.struct.unpack(s[:self.sz])[0])

    def do_copy(self, x):
        # type: (I) -> I
        """Return a copy of the internal value `x`

        Lists are shallow-copied, with the packets they contain copied too.
        Other values are copied through their ``copy()`` method when they
        have one, and returned unchanged otherwise.
        """
        if isinstance(x, list):
            x = x[:]  # type: ignore
            for idx, elt in enumerate(x):
                if isinstance(elt, BasePacket):
                    x[idx] = elt.copy()
            return x  # type: ignore
        if hasattr(x, "copy"):
            return x.copy()  # type: ignore
        return x

    def __repr__(self):
        # type: () -> str
        return "<%s (%s).%s>" % (
            self.__class__.__name__,
            ",".join(x.__name__ for x in self.owners),
            self.name
        )

    def copy(self):
        # type: () -> Field[I, M]
        """Return a shallow copy of this field"""
        return copy.copy(self)

    def randval(self):
        # type: () -> VolatileValue[Any]
        """Return a volatile object whose value is both random and suitable for this field

        :raises ValueError: if no random class matches the field format
        """  # noqa: E501
        fmtt = self.fmt[-1]
        if fmtt in "BbHhIiQq":
            return {"B": RandByte, "b": RandSByte,
                    "H": RandShort, "h": RandSShort,
                    "I": RandInt, "i": RandSInt,
                    "Q": RandLong, "q": RandSLong}[fmtt]()
        elif fmtt == "s":
            # Drop the leading byte-order character, if any
            if self.fmt[0] in "0123456789":
                count = self.fmt[:-1]
            else:
                count = self.fmt[1:-1]
            # struct treats a missing repeat count as 1
            return RandBin(int(count) if count else 1)
        else:
            raise ValueError(
                "no random class for [%s] (fmt=%s)." % (
                    self.name, self.fmt
                )
            )
308
309class _FieldContainer(object):
310    """
311    A field that acts as a container for another field
312    """
313    __slots__ = ["fld"]
314
315    def __getattr__(self, attr):
316        # type: (str) -> Any
317        return getattr(self.fld, attr)
318
319
# Either a plain Field or a container wrapping another field
AnyField = Union[Field[Any, Any], _FieldContainer]
321
322
class Emph(_FieldContainer):
    """Emphasize sub-layer for display

    Equality and hashing are those of the wrapped field.
    """
    __slots__ = ["fld"]

    def __init__(self, fld):
        # type: (Any) -> None
        self.fld = fld

    def __hash__(self):
        # type: () -> int
        wrapped = self.fld
        return hash(wrapped)

    def __eq__(self, other):
        # type: (Any) -> bool
        same = self.fld == other
        return bool(same)
339
class MayEnd(_FieldContainer):
    """
    Marks a field after which packet dissection is allowed to stop
    when no bytes are left.

    A good example would be a length field that can be 0 or a set value,
    and where it would be too annoying to use multiple ConditionalFields

    Important note: any field below this one MUST default
    to an empty value, else the behavior will be unexpected.

    Equality and hashing are those of the wrapped field.
    """
    __slots__ = ["fld"]

    def __init__(self, fld):
        # type: (Any) -> None
        self.fld = fld

    def __hash__(self):
        # type: () -> int
        wrapped = self.fld
        return hash(wrapped)

    def __eq__(self, other):
        # type: (Any) -> bool
        same = self.fld == other
        return bool(same)
365
class ActionField(_FieldContainer):
    """
    Container that calls a method of the packet each time a value goes
    through ``any2i``, before handing the value to the wrapped field.
    """
    __slots__ = ["fld", "_action_method", "_privdata"]

    def __init__(self, fld, action_method, **kargs):
        # type: (Field[Any, Any], str, **Any) -> None
        # action_method is the *name* of the packet method to call;
        # kargs are forwarded to it as keyword arguments.
        self._privdata = kargs
        self._action_method = action_method
        self.fld = fld

    def any2i(self, pkt, val):
        # type: (Optional[Packet], int) -> Any
        hook = getattr(pkt, self._action_method)
        hook(val, self.fld, **self._privdata)
        return self.fld.any2i(pkt, val)
380
class ConditionalField(_FieldContainer):
    """
    Container for a field that is only present in the packet when the
    condition `cond`, called with the packet, evaluates to a true value.
    """
    __slots__ = ["fld", "cond"]

    def __init__(self,
                 fld,  # type: AnyField
                 cond  # type: Callable[[Packet], bool]
                 ):
        # type: (...) -> None
        self.fld = fld
        self.cond = cond

    def _evalcond(self, pkt):
        # type: (Packet) -> bool
        """Evaluate the condition against `pkt`"""
        return bool(self.cond(pkt))

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> Any
        # BACKWARD COMPATIBILITY
        # Note: we shouldn't need this function. (it's not correct)
        # However, having i2h implemented (#2364), it changes the default
        # behavior and broke all packets that wrongly use two ConditionalField
        # with the same name. Those packets are the problem: they are wrongly
        # built (they should either be re-using the same conditional field, or
        # using a MultipleTypeField).
        # But I don't want to dive into fixing all of them just yet,
        # so for now, let's keep this this way, even though it's not correct.
        if type(self.fld) is Field:
            return x
        return self.fld.any2i(pkt, x)

    def i2h(self, pkt, val):
        # type: (Optional[Packet], Any) -> Any
        """Return None when a packet is given and the condition is false,
        else the human value computed by the wrapped field"""
        if pkt and not self._evalcond(pkt):
            return None
        return self.fld.i2h(pkt, val)

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, Any]
        """Dissect the wrapped field only if the condition holds; otherwise
        leave `s` untouched and return None as value"""
        if self._evalcond(pkt):
            return self.fld.getfield(pkt, s)
        else:
            return s, None

    def addfield(self, pkt, s, val):
        # type: (Packet, bytes, Any) -> bytes
        """Build the wrapped field only if the condition holds; otherwise
        return `s` unchanged"""
        if self._evalcond(pkt):
            return self.fld.addfield(pkt, s, val)
        else:
            return s

    def __getattr__(self, attr):
        # type: (str) -> Any
        # Only reached when normal lookup failed. If "fld" itself is
        # missing (slot not set yet, e.g. on the blank instance created by
        # copy/pickle), delegating would re-enter this method forever:
        # fail with a plain AttributeError instead.
        if attr == "fld":
            raise AttributeError(attr)
        return getattr(self.fld, attr)
435
class MultipleTypeField(_FieldContainer):
    """
    MultipleTypeField are used for fields that can be implemented by
    various Field subclasses, depending on conditions on the packet.

    It is initialized with `flds` and `dflt`.

    :param dflt: is the default field type, to be used when none of the
                 conditions matched the current packet.
    :param flds: is a list of tuples (`fld`, `cond`) or (`fld`, `cond`, `hint`)
                 where `fld` is a field type, and `cond` a "condition" to
                 determine if `fld` is the field type that should be used.
                 When given, `hint` is appended to the representation
                 returned by ``i2repr`` when `fld` is the one selected.

    ``cond`` is either:

    - a callable `cond_pkt` that accepts one argument (the packet) and
      returns True if `fld` should be used, False otherwise.
    - a tuple (`cond_pkt`, `cond_pkt_val`), where `cond_pkt` is the same
      as in the previous case and `cond_pkt_val` is a callable that
      accepts two arguments (the packet, and the value to be set) and
      returns True if `fld` should be used, False otherwise.

    See scapy.layers.l2.ARP (type "help(ARP)" in Scapy) for an example of
    use.
    """

    __slots__ = ["flds", "dflt", "hints", "name", "default"]

    def __init__(
        self,
        flds: List[Union[
            Tuple[Field[Any, Any], Any, str],
            Tuple[Field[Any, Any], Any]
        ]],
        dflt: Field[Any, Any]
    ) -> None:
        # Map each field that came with a hint (3-tuples) to that hint
        self.hints = {
            x[0]: x[2]
            for x in flds
            if len(x) == 3
        }
        # Keep only (field, condition) pairs for the lookups
        self.flds = [
            (x[0], x[1]) for x in flds
        ]
        self.dflt = dflt
        self.default = None  # So that we can detect changes in defaults
        # The name of the default field is the name of the whole field
        self.name = self.dflt.name
        if any(x[0].name != self.name for x in self.flds):
            warnings.warn(
                ("All fields should have the same name in a "
                 "MultipleTypeField (%s). Use hints.") % self.name,
                SyntaxWarning
            )

    def _iterate_fields_cond(self, pkt, val, use_val):
        # type: (Optional[Packet], Any, bool) -> Field[Any, Any]
        """Internal function used by _find_fld_pkt & _find_fld_pkt_val

        Returns the first field whose condition matches, or `dflt` if
        none does. `use_val` selects which half of a tuple condition is
        evaluated: the (packet, value) callable when True, the packet-only
        callable when False.
        """
        # Iterate through the fields
        for fld, cond in self.flds:
            if isinstance(cond, tuple):
                if use_val:
                    # No value given: test against the default field's
                    # default value
                    if val is None:
                        val = self.dflt.default
                    if cond[1](pkt, val):
                        return fld
                    # The packet-only condition is not tried in this mode
                    continue
                else:
                    cond = cond[0]
            if cond(pkt):
                return fld
        return self.dflt

    def _find_fld_pkt(self, pkt):
        # type: (Optional[Packet]) -> Field[Any, Any]
        """Given a Packet instance `pkt`, returns the Field subclass to be
        used. If you know the value to be set (e.g., in .addfield()), use
        ._find_fld_pkt_val() instead.

        """
        return self._iterate_fields_cond(pkt, None, False)

    def _find_fld_pkt_val(self,
                          pkt,  # type: Optional[Packet]
                          val,  # type: Any
                          ):
        # type: (...) -> Tuple[Field[Any, Any], Any]
        """Given a Packet instance `pkt` and the value `val` to be set,
        returns the Field subclass to be used, and the updated `val` if
        necessary.

        """
        fld = self._iterate_fields_cond(pkt, val, True)
        # A missing value is replaced by the default of the selected field
        if val is None:
            val = fld.default
        return fld, val

    def _find_fld(self):
        # type: () -> Field[Any, Any]
        """Returns the Field subclass to be used, depending on the Packet
        instance, or the default subclass.

        DEV: since the Packet instance is not provided, we have to use a hack
        to guess it. It should only be used if you cannot provide the current
        Packet instance (for example, because of the current Scapy API).

        If you have the current Packet instance, use ._find_fld_pkt_val() (if
        the value to set is also known) or ._find_fld_pkt() instead.

        """
        # Hack to preserve current Scapy API
        # See https://stackoverflow.com/a/7272464/3223422
        # Start two frames above this one, then walk up the call stack
        # looking for a local named 'self' that is an instance of one of
        # the owner classes of the default field.
        frame = inspect.currentframe().f_back.f_back  # type: ignore
        while frame is not None:
            try:
                pkt = frame.f_locals['self']
            except KeyError:
                pass
            else:
                if isinstance(pkt, tuple(self.dflt.owners)):
                    if not pkt.default_fields:
                        # Packet not initialized
                        return self.dflt
                    return self._find_fld_pkt(pkt)
            frame = frame.f_back
        # No owning packet found on the stack
        return self.dflt

    def getfield(self,
                 pkt,  # type: Packet
                 s,  # type: bytes
                 ):
        # type: (...) -> Tuple[bytes, Any]
        """Dissect `s` with the field selected from the packet alone"""
        return self._find_fld_pkt(pkt).getfield(pkt, s)

    # Each method below selects the field from both the packet and the
    # value, then delegates to the method of the same name on that field.

    def addfield(self, pkt, s, val):
        # type: (Packet, bytes, Any) -> bytes
        fld, val = self._find_fld_pkt_val(pkt, val)
        return fld.addfield(pkt, s, val)

    def any2i(self, pkt, val):
        # type: (Optional[Packet], Any) -> Any
        fld, val = self._find_fld_pkt_val(pkt, val)
        return fld.any2i(pkt, val)

    def h2i(self, pkt, val):
        # type: (Optional[Packet], Any) -> Any
        fld, val = self._find_fld_pkt_val(pkt, val)
        return fld.h2i(pkt, val)

    def i2h(self,
            pkt,  # type: Packet
            val,  # type: Any
            ):
        # type: (...) -> Any
        fld, val = self._find_fld_pkt_val(pkt, val)
        return fld.i2h(pkt, val)

    def i2m(self, pkt, val):
        # type: (Optional[Packet], Optional[Any]) -> Any
        fld, val = self._find_fld_pkt_val(pkt, val)
        return fld.i2m(pkt, val)

    def i2len(self, pkt, val):
        # type: (Packet, Any) -> int
        fld, val = self._find_fld_pkt_val(pkt, val)
        return fld.i2len(pkt, val)

    def i2repr(self, pkt, val):
        # type: (Optional[Packet], Any) -> str
        """Representation given by the selected field, followed by its
        hint in parentheses if one was registered for it"""
        fld, val = self._find_fld_pkt_val(pkt, val)
        hint = ""
        if fld in self.hints:
            hint = " (%s)" % self.hints[fld]
        return fld.i2repr(pkt, val) + hint

    def register_owner(self, cls):
        # type: (Type[Packet]) -> None
        """Record `cls` as an owner of every candidate field and of the
        default field"""
        for fld, _ in self.flds:
            fld.owners.append(cls)
        self.dflt.owners.append(cls)

    def get_fields_list(self):
        # type: () -> List[Any]
        return [self]

    @property
    def fld(self):
        # type: () -> Field[Any, Any]
        # Computed on each access by inspecting the call stack
        return self._find_fld()
624
class PadField(_FieldContainer):
    """Container that appends padding bytes after the wrapped field, so
       that the field ends at the specified alignment from its beginning"""
    __slots__ = ["fld", "_align", "_padwith"]

    def __init__(self, fld, align, padwith=None):
        # type: (AnyField, int, Optional[bytes]) -> None
        # An empty or missing padwith falls back to a NUL byte
        if not padwith:
            padwith = b"\x00"
        self.fld = fld
        self._align = align
        self._padwith = padwith

    def padlen(self, flen, pkt):
        # type: (int, Packet) -> int
        """Number of padding units needed to bring `flen` up to the
        next multiple of the alignment"""
        alignment = self._align
        return -flen % alignment

    def getfield(self,
                 pkt,  # type: Packet
                 s,  # type: bytes
                 ):
        # type: (...) -> Tuple[bytes, Any]
        rest, value = self.fld.getfield(pkt, s)
        # What the wrapped field consumed tells how much padding follows
        consumed = len(s) - len(rest)
        skip = self.padlen(consumed, pkt)
        return rest[skip:], value

    def addfield(self,
                 pkt,  # type: Packet
                 s,  # type: bytes
                 val,  # type: Any
                 ):
        # type: (...) -> bytes
        # Build the wrapped field on its own to measure its length
        built = self.fld.addfield(pkt, b"", val)
        filler = self.padlen(len(built), pkt) * self._padwith
        return s + built + filler
660
class ReversePadField(PadField):
    """Add bytes BEFORE the proxified field so that it starts at the specified
       alignment from its beginning"""

    def original_length(self, pkt):
        # type: (Packet) -> int
        """Length of the bytes the packet is being dissected from"""
        return len(pkt.original)

    def getfield(self,
                 pkt,  # type: Packet
                 s,  # type: bytes
                 ):
        # type: (...) -> Tuple[bytes, Any]
        # We need to get the length that has already been dissected
        padlen = self.padlen(self.original_length(pkt) - len(s), pkt)
        return self.fld.getfield(pkt, s[padlen:])

    def addfield(self,
                 pkt,  # type: Packet
                 s,  # type: bytes
                 val,  # type: Any
                 ):
        # type: (...) -> bytes
        sval = self.fld.addfield(pkt, b"", val)
        padlen = self.padlen(len(s), pkt)
        # Fill the gap with the padding pattern. struct's "%is" would only
        # place the pattern once and complete with NUL bytes, ignoring a
        # custom `padwith`. The slice keeps the gap exactly `padlen` bytes
        # long, so the field stays aligned.
        return s + (self._padwith * padlen)[:padlen] + sval
689
class TrailerBytes(bytes):
    """
    Reverses slice operations to take from the back of the packet,
    not the front

    - ``s[:n]`` returns the last `n` bytes, ``s[n:]`` everything but the
      last `n` bytes (byte order inside the result is unchanged).
    - ``s[i]`` counts from the end: ``s[0]`` is the last byte and
      ``s[-1]`` the first one.
    """

    def __getitem__(self, item):  # type: ignore
        # type: (Union[int, slice]) -> Union[int, bytes]
        if isinstance(item, int):
            if item >= len(self):
                # Without this check the mirrored index would go negative
                # and silently wrap around.
                raise IndexError("index out of range")
            if item < 0:
                # Mirror of a negative index: -1 -> 0, -2 -> 1, ...
                item = -1 - item
            else:
                item = len(self) - 1 - item
        elif isinstance(item, slice):
            start, stop, step = item.start, item.stop, item.step
            if stop is None:
                new_start = None
            elif stop:
                new_start = -stop
            else:
                # s[:0] is empty: "-0" would select the whole buffer
                new_start = len(self)
            new_stop = -start if start else None
            item = slice(new_start, new_stop, step)
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this class is subclassed.
        return super(TrailerBytes, self).__getitem__(item)
710
class TrailerField(_FieldContainer):
    """Special Field that gets its value from the end of the *packet*
    (Note: not layer, but packet).

    Mostly used for FCS
    """
    __slots__ = ["fld"]

    def __init__(self, fld):
        # type: (Field[Any, Any]) -> None
        self.fld = fld

    # Note: this is ugly. Very ugly.
    # Do not copy this crap elsewhere, so that if one day we get
    # brave enough to refactor it, it'll be easier.

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, int]
        """Extract the value from the END of `s`

        Also temporarily replaces ``pkt.post_dissect`` with a one-shot
        wrapper that clears ``raw_packet_cache``, restores the previous
        ``post_dissect`` and then calls it.
        """
        previous_post_dissect = pkt.post_dissect

        def _post_dissect(self, s):
            # type: (Packet, bytes) -> bytes
            # Reset packet to allow post_build
            self.raw_packet_cache = None
            # One-shot: put the original hook back before delegating
            self.post_dissect = previous_post_dissect  # type: ignore
            return previous_post_dissect(s)
        pkt.post_dissect = MethodType(_post_dissect, pkt)  # type: ignore
        # TrailerBytes makes the slices done by the wrapped field take
        # from the back of the buffer
        s = TrailerBytes(s)
        s, val = self.fld.getfield(pkt, s)
        return bytes(s), val

    def addfield(self, pkt, s, val):
        # type: (Packet, bytes, Optional[int]) -> bytes
        """Schedule the field bytes to be appended to the payload

        `s` is returned unchanged. The bytes of the field are built now,
        and added to `pay` by a one-shot ``post_build`` wrapper that then
        restores and calls the previous ``post_build``.
        """
        previous_post_build = pkt.post_build
        # Bytes of the wrapped field, built on their own
        value = self.fld.addfield(pkt, b"", val)

        def _post_build(self, p, pay):
            # type: (Packet, bytes, bytes) -> bytes
            pay += value
            # One-shot: put the original hook back before delegating
            self.post_build = previous_post_build  # type: ignore
            return previous_post_build(p, pay)
        pkt.post_build = MethodType(_post_build, pkt)  # type: ignore
        return s
755
class FCSField(TrailerField):
    """
    Frame check sequence: a trailer field wrapping a plain ``Field``,
    appended at the end of the *packet* (not layer).
    """

    def __init__(self, *args, **kwargs):
        # type: (*Any, **Any) -> None
        # All arguments are those of the wrapped Field
        inner = Field(*args, **kwargs)
        super(FCSField, self).__init__(inner)

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        human = self.i2h(pkt, x)
        return lhex(human)
769
class DestField(Field[str, bytes]):
    """
    Field holding a destination address whose default can depend on the
    payload: addresses are bound to payload classes (plus optional field
    conditions) with ``bind_addr`` and looked up with ``dst_from_pkt``.
    """
    __slots__ = ["defaultdst"]
    # Each subclass must have its own bindings attribute
    bindings = {}  # type: Dict[Type[Packet], Tuple[str, Any]]

    def __init__(self, name, default):
        # type: (str, str) -> None
        # Only the fallback destination is stored here
        self.defaultdst = default

    def dst_from_pkt(self, pkt):
        # type: (Packet) -> str
        """Return the first bound address whose conditions all match the
        payload of `pkt`, or the default destination if none does"""
        payload_cls = pkt.payload.__class__
        for addr, condition in self.bindings.get(payload_cls, []):
            try:
                for field, value in condition.items():
                    if not (pkt.payload.getfieldval(field) == value):
                        break
                else:
                    # Every condition held (or there was none)
                    return addr  # type: ignore
            except AttributeError:
                continue
        return self.defaultdst

    @classmethod
    def bind_addr(cls, layer, addr, **condition):
        # type: (Type[Packet], str, **Any) -> None
        """Bind `addr` to the payload class `layer`, optionally restricted
        to payloads whose fields equal the given keyword values"""
        entries = cls.bindings.setdefault(layer, [])  # type: ignore
        entries.append((addr, condition))
797
class MACField(Field[Optional[str], bytes]):
    """Field holding a MAC address, packed with the "6s" format"""

    def __init__(self, name, default):
        # type: (str, Optional[Any]) -> None
        Field.__init__(self, name, default, "6s")

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[str]) -> bytes
        # Empty or missing address: six NUL bytes
        if not x:
            return b"\0\0\0\0\0\0"
        try:
            return mac2str(x)
        except (struct.error, OverflowError):
            # Not parseable as a MAC address: use the value as bytes
            return bytes_encode(x)

    def m2i(self, pkt, x):
        # type: (Optional[Packet], bytes) -> str
        return str2mac(x)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> str
        # A 6-byte bytes value is taken as a machine value
        looks_packed = isinstance(x, bytes) and len(x) == 6
        if looks_packed:
            return self.m2i(pkt, x)
        return cast(str, x)

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], Optional[str]) -> str
        human = self.i2h(pkt, x)
        if human is None:
            return repr(human)
        if self not in conf.resolve:
            return human
        # Resolution enabled for this field: ask the manufacturer database
        return conf.manufdb._resolve_MAC(human)

    def randval(self):
        # type: () -> RandMAC
        return RandMAC()
836
class LEMACField(MACField):
    """MACField whose machine representation has its bytes reversed"""

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[str]) -> bytes
        packed = MACField.i2m(self, pkt, x)
        return packed[::-1]

    def m2i(self, pkt, x):
        # type: (Optional[Packet], bytes) -> str
        flipped = x[::-1]
        return MACField.m2i(self, pkt, flipped)
846
class IPField(Field[Union[str, Net], bytes]):
    """Field holding an IPv4 address, packed with the "4s" format"""

    def __init__(self, name, default):
        # type: (str, Optional[str]) -> None
        Field.__init__(self, name, default, "4s")

    def h2i(self, pkt, x):
        # type: (Optional[Packet], Union[AnyStr, List[AnyStr]]) -> Any
        if isinstance(x, bytes):
            x = plain_str(x)  # type: ignore
        if isinstance(x, _ScopedIP):
            return x
        if isinstance(x, str):
            x = ScopedIP(x)
            try:
                inet_aton(x)
            except socket.error:
                # Not convertible by inet_aton: hand it to Net
                return Net(x)
            return x
        if isinstance(x, tuple):
            if len(x) != 2:
                raise ValueError("Invalid IP format")
            return Net(*x)
        if isinstance(x, list):
            # Convert each element on its own
            return [self.h2i(pkt, n) for n in x]
        return x

    def i2h(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net]]) -> str
        return cast(str, x)

    def resolve(self, x):
        # type: (str) -> str
        """Return the host name found for `x` when resolution is enabled
        for this field and the lookup succeeds; `x` itself otherwise"""
        if self not in conf.resolve:
            return x
        try:
            ret = socket.gethostbyaddr(x)[0]
        except Exception:
            # Best effort: any failure leaves the address as it is
            return x
        return ret if ret else x

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net]]) -> bytes
        if x is None:
            return b'\x00\x00\x00\x00'
        as_text = plain_str(x)
        return inet_aton(as_text)

    def m2i(self, pkt, x):
        # type: (Optional[Packet], bytes) -> str
        return inet_ntoa(x)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> Any
        return self.h2i(pkt, x)

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], Union[str, Net]) -> str
        # Scoped addresses are shown through their own repr
        if isinstance(x, _ScopedIP) and x.scope:
            return repr(x)
        r = self.resolve(self.i2h(pkt, x))
        if isinstance(r, str):
            return r
        return repr(r)

    def randval(self):
        # type: () -> RandIP
        return RandIP()
913
class SourceIPField(IPField):
    """IPField with no default: a missing value is looked up from the
    packet route, or from ``conf.route``, when a packet is available"""

    def __init__(self, name):
        # type: (str) -> None
        IPField.__init__(self, name, None)

    def __findaddr(self, pkt):
        # type: (Packet) -> Optional[str]
        if conf.route is None:
            # unused import, only to initialize conf.route
            import scapy.route  # noqa: F401
        # Second element of the packet route, else of the default route
        from_pkt = pkt.route()[1]
        return from_pkt or conf.route.route()[1]

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net]]) -> bytes
        needs_lookup = x is None and pkt is not None
        if needs_lookup:
            x = self.__findaddr(pkt)
        return super(SourceIPField, self).i2m(pkt, x)

    def i2h(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net]]) -> str
        needs_lookup = x is None and pkt is not None
        if needs_lookup:
            x = self.__findaddr(pkt)
        return super(SourceIPField, self).i2h(pkt, x)
938
class IP6Field(Field[Optional[Union[str, Net6]], bytes]):
    """Field holding an IPv6 address, declared with the ``16s`` format."""

    def __init__(self, name, default):
        # type: (str, Optional[str]) -> None
        Field.__init__(self, name, default, "16s")

    def h2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> str
        """Convert a human-supplied value to the internal one.

        - ``bytes`` are first turned into ``str`` with ``plain_str``;
        - a ``_ScopedIP`` is returned untouched;
        - a ``str`` is normalised with ``in6_ptop`` (keeping its scope); if
          that raises ``socket.error`` a ``Net6`` is built from it instead;
        - a 2-tuple is expanded into ``Net6(*x)``; any other tuple length
          raises ``ValueError``;
        - a list is converted element by element;
        - anything else is returned unchanged.
        """
        if isinstance(x, bytes):
            x = plain_str(x)
        if isinstance(x, _ScopedIP):
            return x
        if isinstance(x, str):
            scoped = ScopedIP(x)
            try:
                return ScopedIP(in6_ptop(scoped), scope=scoped.scope)
            except socket.error:
                return Net6(scoped)  # type: ignore
        if isinstance(x, tuple):
            if len(x) != 2:
                raise ValueError("Invalid IPv6 format")
            return Net6(*x)  # type: ignore
        if isinstance(x, list):
            return [self.h2i(pkt, item) for item in x]  # type: ignore
        return x  # type: ignore

    def i2h(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net6]]) -> str
        """Return the internal value as is (only re-typed for checkers)."""
        return cast(str, x)

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net6]]) -> bytes
        """Pack the address with ``inet_pton``; ``None`` is sent as ``::``."""
        address = "::" if x is None else x
        return inet_pton(socket.AF_INET6, plain_str(address))

    def m2i(self, pkt, x):
        # type: (Optional[Packet], bytes) -> str
        """Unpack the machine form ``x`` with ``inet_ntop``."""
        return inet_ntop(socket.AF_INET6, x)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Optional[str]) -> str
        """Normalise any user-supplied value by delegating to ``h2i``."""
        return self.h2i(pkt, x)

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net6]]) -> str
        """Return the display string for ``x``.

        Single addresses (neither ``Net6`` nor list) get extra details when
        they are Teredo or 6to4 addresses, and a scoped address is shown
        through its own ``repr``. Everything else is the human value, passed
        through ``repr`` unless it already is a string.
        """
        if x is None:
            return self.i2h(pkt, x)
        if not isinstance(x, (Net6, list)):
            if in6_isaddrTeredo(x):   # print Teredo info
                server, _, maddr, mport = teredoAddrExtractInfo(x)
                return "%s [Teredo srv: %s cli: %s:%s]" % (
                    self.i2h(pkt, x), server, maddr, mport
                )
            if in6_isaddr6to4(x):   # print encapsulated address
                vaddr = in6_6to4ExtractAddr(x)
                return "%s [6to4 GW: %s]" % (self.i2h(pkt, x), vaddr)
            if isinstance(x, _ScopedIP) and x.scope:
                return repr(x)
        # No specific information to return
        plain = self.i2h(pkt, x)
        if isinstance(plain, str):
            return plain
        return repr(plain)

    def randval(self):
        # type: () -> RandIP6
        """Return a new ``RandIP6`` volatile value for this field."""
        return RandIP6()
1001
1002
class SourceIP6Field(IP6Field):
    """IPv6 address field that is looked up from routing while unset.

    The default is always ``None``. When the value is ``None`` and a packet
    is available, ``i2m`` and ``i2h`` substitute the address found by
    ``__findaddr``.
    """

    def __init__(self, name):
        # type: (str) -> None
        IP6Field.__init__(self, name, None)

    def __findaddr(self, pkt):
        # type: (Packet) -> Optional[str]
        """Return item 1 of ``pkt.route()``.

        Item 1 is presumably the source address - confirm against
        ``Packet.route``.
        """
        if conf.route6 is None:
            # unused import, only to initialize conf.route6
            import scapy.route6  # noqa: F401
        route = pkt.route()
        return route[1]

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net6]]) -> bytes
        """Like ``IP6Field.i2m``, after filling in an unset value."""
        if pkt is not None and x is None:
            x = self.__findaddr(pkt)
        return super(SourceIP6Field, self).i2m(pkt, x)

    def i2h(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net6]]) -> str
        """Like ``IP6Field.i2h``, after filling in an unset value."""
        if pkt is not None and x is None:
            x = self.__findaddr(pkt)
        return super(SourceIP6Field, self).i2h(pkt, x)
1026
1027
class DestIP6Field(IP6Field, DestField):
    """IPv6 destination field resolved through ``dst_from_pkt`` while unset.

    The ``IP6Field`` part is initialised with a ``None`` default; the
    user-supplied ``default`` is handed to ``DestField``.
    """
    bindings = {}  # type: Dict[Type[Packet], Tuple[str, Any]]

    def __init__(self, name, default):
        # type: (str, str) -> None
        IP6Field.__init__(self, name, None)
        DestField.__init__(self, name, default)

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net6]]) -> bytes
        """Pack the address, asking ``dst_from_pkt`` when it is unset."""
        if pkt is not None and x is None:
            x = self.dst_from_pkt(pkt)
        return IP6Field.i2m(self, pkt, x)

    def i2h(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net6]]) -> str
        """Human value, asking ``dst_from_pkt`` when it is unset."""
        if pkt is not None and x is None:
            x = self.dst_from_pkt(pkt)
        return super(DestIP6Field, self).i2h(pkt, x)
1047
1048
class ByteField(Field[int, int]):
    """Integer field declared with the struct code ``B`` (unsigned char)."""

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        Field.__init__(self, name, default, "B")
1053
1054
class XByteField(ByteField):
    """``ByteField`` whose representation is produced by ``lhex``."""

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        """Return ``lhex`` applied to the human value of ``x``."""
        return lhex(self.i2h(pkt, x))
1059
1060
1061# XXX Unused field: at least add some tests
class OByteField(ByteField):
    """``ByteField`` shown in octal, zero-padded to three characters."""

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        """Format the human value of ``x`` with ``%03o``."""
        return "%03o" % self.i2h(pkt, x)
1066
1067
class ThreeBytesField(Field[int, int]):
    """Integer carried on three bytes, declared with the ``!I`` format.

    Building packs four bytes and keeps bytes 1..3; dissection puts a zero
    byte in front of the three received bytes before unpacking.
    """

    def __init__(self, name, default):
        # type: (str, int) -> None
        Field.__init__(self, name, default, "!I")

    def addfield(self, pkt, s, val):
        # type: (Packet, bytes, Optional[int]) -> bytes
        """Append the three-byte encoding of ``val`` to ``s``."""
        four_bytes = struct.pack(self.fmt, self.i2m(pkt, val))
        return s + four_bytes[1:4]

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, int]
        """Consume three bytes of ``s``; return ``(remaining, value)``."""
        padded = b"\x00" + s[:3]
        number = struct.unpack(self.fmt, padded)[0]
        return s[3:], self.m2i(pkt, number)
1080
1081
class X3BytesField(ThreeBytesField, XByteField):
    """``ThreeBytesField`` displayed the way ``XByteField`` displays."""

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        """Delegate explicitly to ``XByteField.i2repr``."""
        return XByteField.i2repr(self, pkt, x)
1086
1087
class LEThreeBytesField(ByteField):
    """Integer carried on three bytes, declared with the ``<I`` format.

    Building packs four bytes and keeps the first three; dissection appends
    a zero byte to the three received bytes before unpacking.
    """

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        Field.__init__(self, name, default, "<I")

    def addfield(self, pkt, s, val):
        # type: (Packet, bytes, Optional[int]) -> bytes
        """Append the three-byte encoding of ``val`` to ``s``."""
        four_bytes = struct.pack(self.fmt, self.i2m(pkt, val))
        return s + four_bytes[:3]

    def getfield(self, pkt, s):
        # type: (Optional[Packet], bytes) -> Tuple[bytes, int]
        """Consume three bytes of ``s``; return ``(remaining, value)``."""
        padded = s[:3] + b"\x00"
        number = struct.unpack(self.fmt, padded)[0]
        return s[3:], self.m2i(pkt, number)
1100
1101
class XLE3BytesField(LEThreeBytesField, XByteField):
    """``LEThreeBytesField`` displayed the way ``XByteField`` displays."""

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        """Delegate explicitly to ``XByteField.i2repr``."""
        return XByteField.i2repr(self, pkt, x)
1106
1107
def LEX3BytesField(*args, **kwargs):
    # type: (*Any, **Any) -> Any
    """Deprecated alias: build and return an ``XLE3BytesField``.

    All positional and keyword arguments are forwarded unchanged. A
    ``DeprecationWarning`` is issued on every call.
    """
    warnings.warn(
        "LEX3BytesField is deprecated. Use XLE3BytesField",
        DeprecationWarning,
        # Attribute the warning to the code calling this alias, not to this
        # wrapper, so that it points at the line that needs to change.
        stacklevel=2,
    )
    return XLE3BytesField(*args, **kwargs)
1115
1116
class NBytesField(Field[int, List[int]]):
    """Integer spread over ``sz`` bytes, most significant byte first.

    The machine value is a list of ``sz`` byte values; the format handed to
    ``Field`` is ``<`` followed by one ``B`` per byte.
    """

    def __init__(self, name, default, sz):
        # type: (str, Optional[int], int) -> None
        Field.__init__(self, name, default, "<" + "B" * sz)

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[int]) -> List[int]
        """Split ``x`` into ``self.sz`` byte values (``None`` gives zeros).

        Only the low ``self.sz`` bytes are kept.
        """
        if x is None:
            return [0] * self.sz
        byte_values = []
        # Collected least significant byte first, flipped before returning.
        for _ in range(self.sz):
            byte_values.append(x % 256)
            x //= 256
        byte_values.reverse()
        return byte_values

    def m2i(self, pkt, x):
        # type: (Optional[Packet], Union[List[int], int]) -> int
        """Combine byte values into one integer.

        An ``int`` is returned unchanged, a list or tuple is read most
        significant byte first, and any other input gives ``0``.
        """
        if isinstance(x, int):
            return x
        # x can be a tuple when coming from struct.unpack  (from getfield)
        if not isinstance(x, (list, tuple)):
            return 0
        return sum(
            byte * (256 ** power)
            for power, byte in enumerate(reversed(x))
        )

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        """Show an ``int`` in decimal; defer to the parent otherwise."""
        if not isinstance(x, int):
            return super(NBytesField, self).i2repr(pkt, x)
        return '%i' % x

    def addfield(self, pkt, s, val):
        # type: (Optional[Packet], bytes, Optional[int]) -> bytes
        """Append the packed byte values of ``val`` to ``s``."""
        byte_values = self.i2m(pkt, val)
        return s + self.struct.pack(*byte_values)

    def getfield(self, pkt, s):
        # type: (Optional[Packet], bytes) -> Tuple[bytes, int]
        """Consume ``self.sz`` bytes; return ``(remaining, value)``."""
        unpacked = self.struct.unpack(s[:self.sz])
        return s[self.sz:], self.m2i(pkt, unpacked)  # type: ignore

    def randval(self):
        # type: () -> RandNum
        """Return a ``RandNum`` covering every value storable on sz bytes."""
        return RandNum(0, 2 ** (self.sz * 8) - 1)
1159
1160
class XNBytesField(NBytesField):
    """``NBytesField`` whose representation is hexadecimal."""

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        """Return ``0x``-prefixed hex for ints and for byte sequences.

        Byte sequences are rendered two hex digits per element. Other
        values are left to ``NBytesField.i2repr``.
        """
        # x can be a tuple when coming from struct.unpack (from getfield)
        if isinstance(x, (list, tuple)):
            return "0x" + "".join("%02x" % b for b in x)
        if isinstance(x, int):
            return '0x%x' % x
        return super(XNBytesField, self).i2repr(pkt, x)
1170
1171
class SignedByteField(Field[int, int]):
    """Integer field declared with the struct code ``b`` (signed char)."""

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        Field.__init__(self, name, default, "b")
1176
1177
class FieldValueRangeException(Scapy_Exception):
    """A configured field value lies outside the range the field accepts.

    Raised by ``YesNoByteField`` for config values outside [0..255].
    """
1180
1181
class MaximumItemsCount(Scapy_Exception):
    """Too many items were produced.

    NOTE(review): presumably raised when a ``max_count`` limit is exceeded;
    the raising code is not part of this section - confirm.
    """
1184
1185
class FieldAttributeException(Scapy_Exception):
    """A field was configured with a malformed attribute.

    Raised by ``YesNoByteField`` for a config tuple that does not have
    exactly two elements.
    """
1188
1189
class YesNoByteField(ByteField):
    """
    A byte field whose representation is a label looked up from its value.

    In its default configuration the following representation is generated:
        x == 0 : 'no'
        x != 0 : 'yes'

    In more sophisticated use-cases (e.g. yes/no/invalid) the ``config``
    argument maps each label (the key) to the values shown as that label:

    - An ``int`` associates exactly that value.
    - A tuple must be ``(<first-val>, <last-value>)`` and associates the
      whole range, including the last value. A tuple of any other length
      raises ``FieldAttributeException``.
    - A list defines a set of (probably non consecutive) values that should
      be associated to the key.

    Every value named in the config must be in range [0..255], otherwise
    ``FieldValueRangeException`` is raised. Specs of any other type are
    skipped.

    All values not associated with a key are returned unchanged, i.e. shown
    as the number itself.

    **For instance**::

        config = {
            'no' : 0,
            'foo' : (1,22),
            'yes' : 23,
            'bar' : [24,25, 42, 48, 87, 253]
        }

    Generates the following representations::

        x == 0 : 'no'
        x == 15: 'foo'
        x == 23: 'yes'
        x == 42: 'bar'
        x == 43: 43

    Another example, using the config attribute one could also revert
    the stock-yes-no-behavior::

        config = {
                'yes' : 0,
                'no' : (1,255)
        }

    Will generate the following value representation::

        x == 0 : 'yes'
        x != 0 : 'no'

    """
    __slots__ = ['eval_fn']

    def _build_config_representation(self, config):
        # type: (Dict[str, Any]) -> None
        """Turn ``config`` into a value -> label table and set ``eval_fn``.

        :raises FieldValueRangeException: a value is outside [0..255]
        :raises FieldAttributeException: a tuple is not of length two
        """
        def require_byte(candidate):
            # type: (Any) -> None
            # Single home for the range check shared by all spec kinds.
            if candidate < 0 or candidate > 255:
                raise FieldValueRangeException(
                    'given field value {} invalid - '
                    'must be in range [0..255]'.format(candidate)
                )

        labels = {}  # value -> label
        for label, spec in config.items():
            spec_kind = type(spec)

            if spec_kind is int:
                require_byte(spec)
                labels[spec] = label

            elif spec_kind is list:
                for member in spec:
                    require_byte(member)
                    labels[member] = label

            elif spec_kind is tuple:
                spec_len = len(spec)
                if spec_len != 2:
                    raise FieldAttributeException(
                        'invalid length {} of given config item tuple {} '
                        '- must be (<start-range>, <end-range>).'.format(
                            spec_len, spec
                        )
                    )
                first, last = spec
                require_byte(first)
                require_byte(last)
                # The range is inclusive on both ends.
                for member in range(first, last + 1):
                    labels[member] = label

        self.eval_fn = lambda x: labels.get(x, x)

    def __init__(self, name, default, config=None):
        # type: (str, int, Optional[Dict[str, Any]]) -> None
        """
        :param name: the name of the field
        :param default: the default value of this field
        :param config: optional label -> value-spec mapping; when empty or
            missing the plain 'no'/'yes' representation is used
        """
        if config:
            self._build_config_representation(config)
        else:
            # this represents the common use case and therefore it is kept
            # small
            self.eval_fn = lambda x: 'no' if x == 0 else 'yes'
        ByteField.__init__(self, name, default)

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        """Return the label for ``x`` as computed by ``eval_fn``."""
        return self.eval_fn(x)  # type: ignore
1302
1303
class ShortField(Field[int, int]):
    """Integer field declared with the struct code ``H`` (unsigned short).

    No byte-order prefix is given here, so ordering is left to ``Field``.
    """

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        Field.__init__(self, name, default, "H")
1308
1309
class SignedShortField(Field[int, int]):
    """Integer field declared with the struct code ``h`` (signed short).

    No byte-order prefix is given here, so ordering is left to ``Field``.
    """

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        Field.__init__(self, name, default, "h")
1314
1315
class LEShortField(Field[int, int]):
    """Little-endian unsigned 16-bit integer field (struct ``<H``)."""

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        Field.__init__(self, name, default, "<H")
1320
1321
class LESignedShortField(Field[int, int]):
    """Little-endian signed 16-bit integer field (struct ``<h``)."""

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        Field.__init__(self, name, default, "<h")
1326
1327
class XShortField(ShortField):
    """``ShortField`` whose representation is produced by ``lhex``."""

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        """Return ``lhex`` applied to the human value of ``x``."""
        return lhex(self.i2h(pkt, x))
1332
1333
class IntField(Field[int, int]):
    """Integer field declared with the struct code ``I`` (unsigned int).

    No byte-order prefix is given here, so ordering is left to ``Field``.
    """

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        Field.__init__(self, name, default, "I")
1338
1339
class SignedIntField(Field[int, int]):
    """Integer field declared with the struct code ``i`` (signed int).

    No byte-order prefix is given here, so ordering is left to ``Field``.
    """

    def __init__(self, name, default):
        # type: (str, int) -> None
        Field.__init__(self, name, default, "i")
1344
1345
class LEIntField(Field[int, int]):
    """Little-endian unsigned 32-bit integer field (struct ``<I``)."""

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        Field.__init__(self, name, default, "<I")
1350
1351
class LESignedIntField(Field[int, int]):
    """Little-endian signed 32-bit integer field (struct ``<i``)."""

    def __init__(self, name, default):
        # type: (str, int) -> None
        Field.__init__(self, name, default, "<i")
1356
1357
class XIntField(IntField):
    """``IntField`` whose representation is produced by ``lhex``."""

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        """Return ``lhex`` applied to the human value of ``x``."""
        return lhex(self.i2h(pkt, x))
1362
1363
class XLEIntField(LEIntField, XIntField):
    """``LEIntField`` displayed the way ``XIntField`` displays."""

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        """Delegate explicitly to ``XIntField.i2repr``."""
        return XIntField.i2repr(self, pkt, x)
1368
1369
class XLEShortField(LEShortField, XShortField):
    """``LEShortField`` displayed the way ``XShortField`` displays."""

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        """Delegate explicitly to ``XShortField.i2repr``."""
        return XShortField.i2repr(self, pkt, x)
1374
1375
class LongField(Field[int, int]):
    """Integer field declared with the struct code ``Q``.

    ``Q`` is struct's unsigned long long. No byte-order prefix is given
    here, so ordering is left to ``Field``.
    """

    def __init__(self, name, default):
        # type: (str, int) -> None
        Field.__init__(self, name, default, "Q")
1380
1381
class SignedLongField(Field[int, int]):
    """Integer field declared with the struct code ``q``.

    ``q`` is struct's signed long long. No byte-order prefix is given here,
    so ordering is left to ``Field``.
    """

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        Field.__init__(self, name, default, "q")
1386
1387
class LELongField(LongField):
    """Little-endian unsigned 64-bit integer field (struct ``<Q``).

    ``Field.__init__`` is called directly, so ``LongField.__init__`` (and
    its ``Q`` format) is not used.
    """

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        Field.__init__(self, name, default, "<Q")
1392
1393
class LESignedLongField(Field[int, int]):
    """Little-endian signed 64-bit integer field (struct ``<q``)."""

    def __init__(self, name, default):
        # type: (str, Optional[Any]) -> None
        Field.__init__(self, name, default, "<q")
1398
1399
class XLongField(LongField):
    """``LongField`` whose representation is produced by ``lhex``."""

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        """Return ``lhex`` applied to the human value of ``x``."""
        return lhex(self.i2h(pkt, x))
1404
1405
class XLELongField(LELongField, XLongField):
    """``LELongField`` displayed the way ``XLongField`` displays."""

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        """Delegate explicitly to ``XLongField.i2repr``."""
        return XLongField.i2repr(self, pkt, x)
1410
1411
class IEEEFloatField(Field[int, int]):
    """Field declared with the struct code ``f`` (float).

    NOTE(review): the generic parameters and the type comment say ``int``
    although the ``f`` code carries a float - confirm whether intended.
    """

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        Field.__init__(self, name, default, "f")
1416
1417
class IEEEDoubleField(Field[int, int]):
    """Field declared with the struct code ``d`` (double).

    NOTE(review): the generic parameters and the type comment say ``int``
    although the ``d`` code carries a float - confirm whether intended.
    """

    def __init__(self, name, default):
        # type: (str, Optional[int]) -> None
        Field.__init__(self, name, default, "d")
1422
1423
class _StrField(Field[I, bytes]):
    """Base for fields whose machine value is a raw byte string.

    ``remain`` is the number of trailing bytes that ``getfield`` leaves
    untouched; ``0`` means the field takes everything it is given.
    """
    __slots__ = ["remain"]

    def __init__(self, name, default, fmt="H", remain=0):
        # type: (str, Optional[I], str, int) -> None
        Field.__init__(self, name, default, fmt)
        self.remain = remain

    def i2len(self, pkt, x):
        # type: (Optional[Packet], Any) -> int
        """Return ``len(x)``, counting ``None`` as length 0."""
        return 0 if x is None else len(x)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> I
        """Encode ``str`` input to bytes, then defer to the parent."""
        value = bytes_encode(x) if isinstance(x, str) else x
        return super(_StrField, self).any2i(pkt, value)  # type: ignore

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], I) -> str
        """Show non-empty bytes via ``repr``; defer to the parent otherwise."""
        if x and isinstance(x, bytes):
            return repr(x)
        return super(_StrField, self).i2repr(pkt, x)

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[I]) -> bytes
        """Return ``x`` as bytes.

        ``None`` gives ``b""`` and non-bytes values go through
        ``bytes_encode``.
        """
        if x is None:
            return b""
        if isinstance(x, bytes):
            return x
        return bytes_encode(x)

    def addfield(self, pkt, s, val):
        # type: (Packet, bytes, Optional[I]) -> bytes
        """Append the machine value of ``val`` to ``s``."""
        encoded = self.i2m(pkt, val)
        return s + encoded

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, I]
        """Split ``s`` into ``(remaining, value)``.

        The last ``self.remain`` bytes are returned as remaining data; with
        ``remain == 0`` the whole input becomes the value.
        """
        if self.remain != 0:
            return s[-self.remain:], self.m2i(pkt, s[:-self.remain])
        return b"", self.m2i(pkt, s)

    def randval(self):
        # type: () -> RandBin
        """Return a ``RandBin`` parametrised with ``RandNum(0, 1200)``."""
        size = RandNum(0, 1200)
        return RandBin(size)
1472
1473
class StrField(_StrField[bytes]):
    """``_StrField`` specialised for ``bytes`` values; adds no behaviour."""
1476
1477
class StrFieldUtf16(StrField):
    """``StrField`` whose human value is text, stored as UTF-16-LE bytes."""

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Optional[str]) -> bytes
        """Encode ``str`` input through ``h2i``; defer to the parent else."""
        if not isinstance(x, str):
            return super(StrFieldUtf16, self).any2i(pkt, x)
        return self.h2i(pkt, x)

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], bytes) -> str
        """Return the decoded human value, passed through ``plain_str``."""
        decoded = self.i2h(pkt, x)
        return plain_str(decoded)

    def h2i(self, pkt, x):
        # type: (Optional[Packet], Optional[str]) -> bytes
        """Encode text as UTF-16-LE, replacing unencodable characters."""
        text = plain_str(x)
        return text.encode('utf-16-le', errors="replace")

    def i2h(self, pkt, x):
        # type: (Optional[Packet], bytes) -> str
        """Decode UTF-16-LE bytes, replacing undecodable sequences."""
        data = bytes_encode(x)
        return data.decode('utf-16-le', errors="replace")
1496
1497
1498class _StrEnumField:
1499    def __init__(self, **kwargs):
1500        # type: (**Any) -> None
1501        self.enum = kwargs.pop("enum", {})
1502
1503    def i2repr(self, pkt, v):
1504        # type: (Optional[Packet], bytes) -> str
1505        r = v.rstrip(b"\0")
1506        rr = repr(r)
1507        if self.enum:
1508            if v in self.enum:
1509                rr = "%s (%s)" % (rr, self.enum[v])
1510            elif r in self.enum:
1511                rr = "%s (%s)" % (rr, self.enum[r])
1512        return rr
1513
1514
class StrEnumField(_StrEnumField, StrField):
    """``StrField`` whose representation is labelled through ``enum``.

    Only ``StrField.__init__`` is called; ``enum`` is stored directly, so it
    stays ``None`` when not given.
    """
    __slots__ = ["enum"]

    def __init__(
            self,
            name,  # type: str
            default,  # type: bytes
            # NOTE(review): _StrEnumField.i2repr looks the enum up with
            # bytes values, while the keys are typed ``str`` - confirm.
            enum=None,  # type: Optional[Dict[str, str]]
            **kwargs  # type: Any
    ):
        # type: (...) -> None
        # Remaining keyword arguments are forwarded to StrField.
        StrField.__init__(self, name, default, **kwargs)  # type: ignore
        self.enum = enum
1528
1529
# Value type of the packet-holding fields: a list of packets, a single
# packet, or an optional packet.
K = TypeVar('K', List[BasePacket], BasePacket, Optional[BasePacket])
1531
1532
class _PacketField(_StrField[K]):
    """Base for fields whose value is made of packets.

    ``cls`` is the callable (typically a Packet class) that ``m2i`` uses to
    build a packet out of raw bytes.
    """
    __slots__ = ["cls"]
    holds_packets = 1

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[K]
                 pkt_cls,  # type: Union[Callable[[bytes], Packet], Type[Packet]]  # noqa: E501
                 ):
        # type: (...) -> None
        super(_PacketField, self).__init__(name, default)
        self.cls = pkt_cls

    def i2m(self, pkt, i):
        # type: (Optional[Packet], Any) -> bytes
        """Serialise ``i`` with ``raw``; ``None`` gives ``b""``."""
        return b"" if i is None else raw(i)

    def m2i(self, pkt, m):  # type: ignore
        # type: (Optional[Packet], bytes) -> Packet
        """Build a packet from ``m`` with ``self.cls``.

        The call is first made with ``_parent=pkt``; on ``TypeError`` it is
        repeated without that keyword.
        """
        try:
            # we want to set parent wherever possible
            built = self.cls(m, _parent=pkt)  # type: ignore
        except TypeError:
            built = self.cls(m)
        return built
1562
1563
class _PacketFieldSingle(_PacketField[K]):
    """Packet field holding exactly one packet."""

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> K
        """Register ``pkt`` as parent of ``x`` when possible, then defer.

        Registration needs ``x`` and ``pkt`` to be truthy and ``x`` to
        provide ``add_parent``.
        """
        can_link = x and pkt and hasattr(x, "add_parent")
        if can_link:
            cast("Packet", x).add_parent(pkt)
        return super(_PacketFieldSingle, self).any2i(pkt, x)

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, K]
        """Dissect ``s`` into one packet; return ``(remaining, packet)``.

        When the packet contains a ``conf.padding_layer``, that layer is
        detached and its ``load`` is handed back as the remaining bytes.
        """
        parsed = self.m2i(pkt, s)
        if conf.padding_layer not in parsed:
            return b"", parsed  # type: ignore
        padding = parsed[conf.padding_layer]
        del padding.underlayer.payload
        return padding.load, parsed  # type: ignore
1583
1584
class PacketField(_PacketFieldSingle[BasePacket]):
    """Field holding a single packet."""

    def randval(self):  # type: ignore
        # type: () -> Packet
        """Return ``fuzz`` applied to a default-constructed ``self.cls()``."""
        # Imported here rather than at module level; NOTE(review): presumably
        # to avoid the scapy.packet import loop mentioned at the top of the
        # file - confirm.
        from scapy.packet import fuzz
        return fuzz(self.cls())  # type: ignore
1590
1591
class PacketLenField(_PacketFieldSingle[Optional[BasePacket]]):
    """Single-packet field whose byte length comes from ``length_from``.

    Without a ``length_from`` callback the length is always 0, so nothing
    is dissected.
    """
    __slots__ = ["length_from"]

    def __init__(self,
                 name,  # type: str
                 default,  # type: Packet
                 cls,  # type: Union[Callable[[bytes], Packet], Type[Packet]]  # noqa: E501
                 length_from=None  # type: Optional[Callable[[Packet], int]]  # noqa: E501
                 ):
        # type: (...) -> None
        super(PacketLenField, self).__init__(name, default, cls)
        self.length_from = length_from or (lambda x: 0)

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, Optional[BasePacket]]
        """Dissect the first ``length_from(pkt)`` bytes of ``s``.

        Returns ``(remaining, packet)``; the packet is ``None`` when the
        length is falsy. If dissection fails the bytes are wrapped in
        ``conf.raw_layer``, unless ``conf.debug_dissector`` is set, in which
        case the error is re-raised.
        """
        size = self.length_from(pkt)
        if not size:
            return s[size:], None
        try:
            parsed = self.m2i(pkt, s[:size])
        except Exception:
            if conf.debug_dissector:
                raise
            parsed = conf.raw_layer(load=s[:size])
        return s[size:], parsed
1620
1621
1622class PacketListField(_PacketField[List[BasePacket]]):
1623    """PacketListField represents a list containing a series of Packet instances
1624    that might occur right in the middle of another Packet field.
1625    This field type may also be used to indicate that a series of Packet
1626    instances have a sibling semantic instead of a parent/child relationship
1627    (i.e. a stack of layers). All elements in PacketListField have current
1628    packet referenced in parent field.
1629    """
1630    __slots__ = ["count_from", "length_from", "next_cls_cb", "max_count"]
1631    islist = 1
1632
    def __init__(
            self,
            name,  # type: str
            default,  # type: Optional[List[BasePacket]]
            pkt_cls=None,  # type: Optional[Union[Callable[[bytes], Packet], Type[Packet]]]  # noqa: E501
            count_from=None,  # type: Optional[Callable[[Packet], int]]
            length_from=None,  # type: Optional[Callable[[Packet], int]]
            next_cls_cb=None,  # type: Optional[Callable[[Packet, List[BasePacket], Optional[Packet], bytes], Type[Packet]]]  # noqa: E501
            max_count=None,  # type: Optional[int]
    ):
        # type: (...) -> None
        """
        The number of Packet instances that are dissected by this field can
        be parametrized using one of three different mechanisms/parameters:

            * count_from: a callback that returns the number of Packet
              instances to dissect. The callback prototype is::

                count_from(pkt:Packet) -> int

            * length_from: a callback that returns the number of bytes that
              must be dissected by this field. The callback prototype is::

                length_from(pkt:Packet) -> int

            * next_cls_cb: a callback that enables a Scapy developer to
              dynamically discover if another Packet instance should be
              dissected or not. See below for this callback prototype.

        The bytes that are not consumed during the dissection of this field
        are passed to the next field of the current packet.

        For the serialization of such a field, the list of Packets that are
        contained in a PacketListField can be heterogeneous and is
        unrestricted.

        The type of the Packet instances that are dissected with this field is
        specified or discovered using one of the following mechanisms:

            * the pkt_cls parameter may contain a callable that returns an
              instance of the dissected Packet. This may either be a
              reference of a Packet subclass (e.g. DNSRROPT in layers/dns.py)
              to generate a homogeneous PacketListField or a function
              deciding the type of the Packet instance
              (e.g. _CDPGuessAddrRecord in contrib/cdp.py)

            * the pkt_cls parameter may contain a class object with a defined
              ``dispatch_hook`` classmethod. That method must return a Packet
              instance. The ``dispatch_hook`` classmethod must implement the
              following prototype::

                dispatch_hook(cls,
                              _pkt:Optional[Packet],
                              *args, **kargs
                ) -> Type[Packet]

              The _pkt parameter may contain a reference to the packet
              instance containing the PacketListField that is being
              dissected.

            * the ``next_cls_cb`` parameter may contain a callable whose
              prototype is::

                cbk(pkt:Packet,
                    lst:List[Packet],
                    cur:Optional[Packet],
                    remain:bytes
                ) -> Optional[Type[Packet]]

              The pkt argument contains a reference to the Packet instance
              containing the PacketListField that is being dissected.
              The lst argument is the list of all Packet instances that were
              previously parsed during the current ``PacketListField``
              dissection, save for the very last Packet instance.
              The cur argument contains a reference to that very last parsed
              ``Packet`` instance. The remain argument contains the bytes
              that may still be consumed by the current PacketListField
              dissection operation.

              This callback returns either the type of the next Packet to
              dissect or None to indicate that no more Packets are to be
              dissected.

              These four arguments allow a variety of dynamic discovery of
              the number of Packets to dissect and of the type of each one of
              these Packets, including: type determination based on current
              Packet instances or its underlayers, continuation based on the
              previously parsed Packet instances within that PacketListField,
              continuation based on a look-ahead on the bytes to be
              dissected...

        The pkt_cls and next_cls_cb parameters are semantically exclusive,
        although one could specify both. If both are specified, pkt_cls is
        silently ignored. The same is true for count_from and next_cls_cb.

        length_from and next_cls_cb are compatible and the dissection will
        end, whichever of the two stop conditions comes first.

        :param name: the name of the field
        :param default: the default value of this field; generally an empty
            Python list. ``None`` is replaced by a new empty list.
        :param pkt_cls: either a callable returning a Packet instance or a
            class object defining a ``dispatch_hook`` class method
        :param count_from: a callback returning the number of Packet
            instances to dissect.
        :param length_from: a callback returning the number of bytes to dissect
        :param next_cls_cb: a callback returning either None or the type of
            the next Packet to dissect.
        :param max_count: an int containing the max amount of results. This is
            a safety mechanism, exceeding this value will raise a Scapy_Exception.
        """
        if default is None:
            default = []  # Create a new list for each instance
        super(PacketListField, self).__init__(
            name,
            default,
            pkt_cls  # type: ignore
        )
        # The callbacks and the limit are only stored here.
        self.count_from = count_from
        self.length_from = length_from
        self.next_cls_cb = next_cls_cb
        self.max_count = max_count
1755
1756    def any2i(self, pkt, x):
1757        # type: (Optional[Packet], Any) -> List[BasePacket]
1758        if not isinstance(x, list):
1759            if x and pkt and hasattr(x, "add_parent"):
1760                x.add_parent(pkt)
1761            return [x]
1762        elif pkt:
1763            for i in x:
1764                if not i or not hasattr(i, "add_parent"):
1765                    continue
1766                i.add_parent(pkt)
1767        return x
1768
1769    def i2count(self,
1770                pkt,  # type: Optional[Packet]
1771                val,  # type: List[BasePacket]
1772                ):
1773        # type: (...) -> int
1774        if isinstance(val, list):
1775            return len(val)
1776        return 1
1777
1778    def i2len(self,
1779              pkt,  # type: Optional[Packet]
1780              val,  # type: List[Packet]
1781              ):
1782        # type: (...) -> int
1783        return sum(len(self.i2m(pkt, p)) for p in val)
1784
    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, List[BasePacket]]
        """
        Dissect a list of packets out of ``s``.

        :param pkt: the packet holding this field
        :param s: the bytes to dissect
        :return: a tuple made of the bytes left over and of the list of
            dissected items
        :raises MaximumItemsCount: when more than ``max_count`` (or
            ``conf.max_list_count`` when unset) items are dissected
        """
        # c: number of items still allowed (None = unbounded)
        # len_pkt: number of bytes this field may consume (None = all)
        # cls: class to use for the next item (None = use self.m2i)
        c = len_pkt = cls = None
        # length_from has priority over count_from
        if self.length_from is not None:
            len_pkt = self.length_from(pkt)
        elif self.count_from is not None:
            c = self.count_from(pkt)
        # next_cls_cb overrides any count: allow exactly one item if it
        # returned a class, none otherwise
        if self.next_cls_cb is not None:
            cls = self.next_cls_cb(pkt, [], None, s)
            c = 1
            if cls is None:
                c = 0

        lst = []  # type: List[BasePacket]
        ret = b""
        remain = s
        if len_pkt is not None:
            # Only dissect the first len_pkt bytes; ret keeps what follows
            remain, ret = s[:len_pkt], s[len_pkt:]
        while remain:
            if c is not None:
                if c <= 0:
                    break
                c -= 1
            try:
                if cls is not None:
                    try:
                        # we want to set parent wherever possible
                        p = cls(remain, _parent=pkt)
                    except TypeError:
                        # fall back to a call without the _parent keyword
                        p = cls(remain)
                else:
                    p = self.m2i(pkt, remain)
            except Exception:
                if conf.debug_dissector:
                    raise
                # Dissection failed: keep the remaining bytes as a raw
                # layer and stop there
                p = conf.raw_layer(load=remain)
                remain = b""
            else:
                if conf.padding_layer in p:
                    # What the item left as padding is the input of the
                    # next item; detach it from the dissected item
                    pad = p[conf.padding_layer]
                    remain = pad.load
                    del pad.underlayer.payload
                    if self.next_cls_cb is not None:
                        cls = self.next_cls_cb(pkt, lst, p, remain)
                        if cls is not None:
                            # Grant one more iteration for the next class
                            c = 0 if c is None else c
                            c += 1
                else:
                    remain = b""
            lst.append(p)
            if len(lst) > (self.max_count or conf.max_list_count):
                raise MaximumItemsCount(
                    "Maximum amount of items reached in PacketListField: %s "
                    "(defaults to conf.max_list_count)"
                    % (self.max_count or conf.max_list_count)
                )

        # NOTE(review): remain is presumably a tuple only when carrying a
        # bit offset, as bit fields do - confirm against callers
        if isinstance(remain, tuple):
            remain, nb = remain
            return (remain + ret, nb), lst
        else:
            return remain + ret, lst
1847
1848    def i2m(self,
1849            pkt,  # type: Optional[Packet]
1850            i,  # type: Any
1851            ):
1852        # type: (...) -> bytes
1853        return bytes_encode(i)
1854
1855    def addfield(self, pkt, s, val):
1856        # type: (Packet, bytes, Any) -> bytes
1857        return s + b"".join(self.i2m(pkt, v) for v in val)
1858
1859
class StrFixedLenField(StrField):
    """
    StrField whose size is either constant or computed from the packet.

    :param name: the name of the field
    :param default: the default value of this field
    :param length: constant size in bytes. When given, it takes precedence
        over ``length_from``.
    :param length_from: a callback returning the size in bytes
    """
    __slots__ = ["length_from"]

    def __init__(
            self,
            name,  # type: str
            default,  # type: Optional[bytes]
            length=None,  # type: Optional[int]
            length_from=None,  # type: Optional[Callable[[Packet], int]]  # noqa: E501
    ):
        # type: (...) -> None
        super().__init__(name, default)
        if length is not None:
            self.sz = length
            self.length_from = lambda x, length=length: length  # type: ignore
        else:
            # Without any indication, the field is empty
            self.length_from = length_from or (lambda x: 0)

    def i2repr(self, pkt, v):
        # type: (Optional[Packet], bytes) -> str
        """Represent the value, trailing NUL bytes of bytes removed."""
        shown = v.rstrip(b"\0") if isinstance(v, bytes) else v
        return super().i2repr(pkt, shown)

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, bytes]
        """
        Extract ``length_from(pkt)`` bytes from ``s``.

        :return: the remaining bytes and the value. A size of 0 consumes
            nothing and yields an empty value.
        """
        size = self.length_from(pkt)
        if size == 0:
            return s, b""
        return s[size:], self.m2i(pkt, s[:size])

    def addfield(self, pkt, s, val):
        # type: (Packet, bytes, Optional[bytes]) -> bytes
        """
        Append the value to ``s``, sized to ``length_from(pkt)`` bytes by
        ``struct.pack`` (as is when the size is None).
        """
        size = self.length_from(pkt)
        if size is not None:
            return s + struct.pack("%is" % size, self.i2m(pkt, val))
        return s + self.i2m(pkt, val)

    def randval(self):
        # type: () -> RandBin
        """
        Return a random value of the field size, or of 0 to 200 bytes
        when the size cannot be computed without a packet.
        """
        try:
            fixed = RandBin(self.length_from(None))  # type: ignore
            return fixed
        except Exception:
            return RandBin(RandNum(0, 200))
1906
1907
class StrFixedLenFieldUtf16(StrFixedLenField, StrFieldUtf16):
    """
    Combination of StrFixedLenField and StrFieldUtf16.
    """
1910
1911
class StrFixedLenEnumField(_StrEnumField, StrFixedLenField):
    """
    StrFixedLenField carrying an ``enum`` mapping.

    :param name: the name of the field
    :param default: the default value of this field
    :param enum: the mapping stored in the ``enum`` attribute
    :param length: constant size in bytes
    :param length_from: a callback returning the size in bytes
    """
    __slots__ = ["enum"]

    def __init__(
            self,
            name,  # type: str
            default,  # type: bytes
            enum=None,  # type: Optional[Dict[str, str]]
            length=None,  # type: Optional[int]
            length_from=None  # type: Optional[Callable[[Optional[Packet]], int]]  # noqa: E501
    ):
        # type: (...) -> None
        StrFixedLenField.__init__(
            self,
            name,
            default,
            length=length,
            length_from=length_from,
        )
        self.enum = enum
1926
1927
class NetBIOSNameField(StrFixedLenField):
    """
    Fixed length name where each byte is stored on the wire as two bytes,
    one per nibble, each being ``0x41`` plus the nibble value.
    """

    def __init__(self, name, default, length=31):
        # type: (str, bytes, int) -> None
        super().__init__(name, default, length)

    def h2i(self, pkt, x):
        # type: (Optional[Packet], bytes) -> bytes
        """Keep at most the first 15 bytes of the name."""
        return x[:15] if x and len(x) > 15 else x

    def i2m(self, pkt, y):
        # type: (Optional[Packet], Optional[bytes]) -> bytes
        """
        Encode the name: space-pad or cut it to half the field length,
        then emit two bytes per byte, preceded by a space byte.
        Without packet, the target size is 0.
        """
        half = self.length_from(pkt) // 2 if pkt else 0
        padded = (bytes_encode(y or b"") + b" " * half)[:half]  # type: bytes
        encoded = bytearray()
        for byte in padded:
            encoded.append(0x41 + (byte >> 4))
            encoded.append(0x41 + (byte & 0xf))
        return b" " + bytes(encoded)

    def m2i(self, pkt, x):
        # type: (Optional[Packet], bytes) -> bytes
        """
        Decode the name: drop the first byte and surrounding NUL bytes,
        merge each pair of bytes back into one, strip trailing spaces.
        """
        body = x[1:].strip(b"\x00")
        decoded = bytes(
            (((high - 1) & 0xf) << 4) + ((low - 1) & 0xf)
            for high, low in zip(body[::2], body[1::2])
        )
        return decoded.rstrip(b" ")
1968
1969
class StrLenField(StrField):
    """
    StrField whose size is given by a callback.

    :param length_from: a function that returns the size of the string
    :param max_length: upper bound of the size used by randval
        (1200 when unset)
    """
    __slots__ = ["length_from", "max_length"]
    ON_WIRE_SIZE_UTF16 = True

    def __init__(
            self,
            name,  # type: str
            default,  # type: bytes
            length_from=None,  # type: Optional[Callable[[Packet], int]]
            max_length=None,  # type: Optional[Any]
    ):
        # type: (...) -> None
        super().__init__(name, default)
        self.length_from = length_from
        self.max_length = max_length

    def getfield(self, pkt, s):
        # type: (Any, bytes) -> Tuple[bytes, bytes]
        """
        Extract the value from ``s``.

        The size comes from ``length_from`` (0 without callback) and is
        doubled when ``ON_WIRE_SIZE_UTF16`` is false.

        :return: the remaining bytes and the value. A size of 0 consumes
            nothing and yields an empty value.
        """
        size = self.length_from(pkt) if self.length_from else 0
        if not self.ON_WIRE_SIZE_UTF16:
            size *= 2
        if size == 0:
            return s, b""
        return s[size:], self.m2i(pkt, s[:size])

    def randval(self):
        # type: () -> RandBin
        """Return a random value of 0 to ``max_length`` (or 1200) bytes."""
        upper = self.max_length or 1200
        return RandBin(RandNum(0, upper))
2004
2005
class _XStrField(Field[bytes, bytes]):
    """
    Mixin representing bytes values as hexadecimal.
    """

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], bytes) -> str
        """Return the hexadecimal form of bytes, the parent repr otherwise."""
        if not isinstance(x, bytes):
            return super().i2repr(pkt, x)
        return bytes_hex(x).decode()
2012
2013
class XStrField(_XStrField, StrField):
    """
    StrField whose value is printed as hexadecimal.
    """
2018
2019
class XStrLenField(_XStrField, StrLenField):
    """
    StrLenField whose value is printed as hexadecimal.
    """
2024
2025
class XStrFixedLenField(_XStrField, StrFixedLenField):
    """
    StrFixedLenField whose value is printed as hexadecimal.
    """
2030
2031
class XLEStrLenField(XStrLenField):
    """
    XStrLenField whose bytes are reversed between the internal and the
    machine representations.
    """

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[bytes]) -> bytes
        """Return the value reversed, or empty bytes for a falsy value."""
        return x[::-1] if x else b""

    def m2i(self, pkt, x):
        # type: (Optional[Packet], bytes) -> bytes
        """Return the value reversed."""
        flipped = x[::-1]
        return flipped
2042
2043
class StrLenFieldUtf16(StrLenField, StrFieldUtf16):
    """
    Combination of StrLenField and StrFieldUtf16.
    """
2046
2047
class StrLenEnumField(_StrEnumField, StrLenField):
    """
    StrLenField carrying an ``enum`` mapping.

    :param name: the name of the field
    :param default: the default value of this field
    :param enum: the mapping stored in the ``enum`` attribute
    :param kwargs: forwarded as is to StrLenField
    """
    __slots__ = ["enum"]

    def __init__(self, name, default, enum=None, **kwargs):
        # type: (str, bytes, Optional[Dict[str, str]], **Any) -> None
        StrLenField.__init__(self, name, default, **kwargs)
        self.enum = enum
2061
2062
class BoundStrLenField(StrLenField):
    """
    StrLenField whose random values have a bounded size.

    :param minlen: lower bound of the size used by randval
    :param maxlen: upper bound of the size used by randval
    :param length_from: a function that returns the size of the string
    """
    __slots__ = ["minlen", "maxlen"]

    def __init__(
            self,
            name,  # type: str
            default,  # type: bytes
            minlen=0,  # type: int
            maxlen=255,  # type: int
            length_from=None  # type: Optional[Callable[[Packet], int]]
    ):
        # type: (...) -> None
        super().__init__(name, default, length_from=length_from)
        self.minlen, self.maxlen = minlen, maxlen

    def randval(self):
        # type: () -> RandBin
        """Return a random value of ``minlen`` to ``maxlen`` bytes."""
        size = RandNum(self.minlen, self.maxlen)
        return RandBin(size)
2082
2083
class FieldListField(Field[List[Any], List[Any]]):
    """
    List of values all handled by the same inner field.

    :param name: the name of the field
    :param default: the default value; None stands for a new empty list
    :param field: the field used to process each element
    :param length_from: a callback returning the number of bytes to dissect
    :param count_from: a callback returning the number of elements to
        dissect; ignored when ``length_from`` is set
    :param max_count: maximum amount of elements; exceeding it raises
        MaximumItemsCount. Falls back to ``conf.max_list_count``.
    """
    __slots__ = ["field", "count_from", "length_from", "max_count"]
    islist = 1

    def __init__(
            self,
            name,  # type: str
            default,  # type: Optional[List[AnyField]]
            field,  # type: AnyField
            length_from=None,  # type: Optional[Callable[[Packet], int]]
            count_from=None,  # type: Optional[Callable[[Packet], int]]
            max_count=None,  # type: Optional[int]
    ):
        # type: (...) -> None
        self.field = field
        # A new list is created for each instance
        Field.__init__(self, name, [] if default is None else default)
        self.count_from = count_from
        self.length_from = length_from
        self.max_count = max_count

    def i2count(self, pkt, val):
        # type: (Optional[Packet], List[Any]) -> int
        """Return ``len(val)`` for a list, 1 for anything else."""
        return len(val) if isinstance(val, list) else 1

    def i2len(self, pkt, val):
        # type: (Packet, List[Any]) -> int
        """Return the sum of the lengths the inner field gives each element."""
        lengths = (self.field.i2len(pkt, elt) for elt in val)
        return int(sum(lengths))

    def any2i(self, pkt, x):
        # type: (Optional[Packet], List[Any]) -> List[Any]
        """Convert every element (or the lone value) with the inner field."""
        elements = x if isinstance(x, list) else [x]
        return [self.field.any2i(pkt, elt) for elt in elements]

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], List[Any]) -> str
        """Return the elements representations, bracketed and comma joined."""
        inner = ", ".join(self.field.i2repr(pkt, elt) for elt in x)
        return "[%s]" % inner

    def addfield(self, pkt, s, val):
        # type: (Packet, bytes, Optional[List[Any]]) -> bytes
        """Append each element of the machine value using the inner field."""
        built = s
        for elt in self.i2m(pkt, val):
            built = self.field.addfield(pkt, built, elt)
        return built

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Any
        """
        Dissect elements from ``s`` with the inner field.

        Dissection stops once the input is exhausted, once
        ``length_from(pkt)`` bytes have been handled or once
        ``count_from(pkt)`` elements have been read.

        :return: the remaining input and the list of values
        :raises MaximumItemsCount: when too many elements are dissected
        """
        budget = None
        trailer = b""
        if self.length_from is not None:
            size = self.length_from(pkt)
            if size is not None:
                # Only the first bytes belong to this field
                s, trailer = s[:size], s[size:]
        elif self.count_from is not None:
            budget = self.count_from(pkt)

        values = []
        while s:
            if budget is not None:
                if budget <= 0:
                    break
                budget -= 1
            s, item = self.field.getfield(pkt, s)
            values.append(item)
            limit = self.max_count or conf.max_list_count
            if len(values) > limit:
                raise MaximumItemsCount(
                    "Maximum amount of items reached in FieldListField: %s "
                    "(defaults to conf.max_list_count)"
                    % limit
                )

        if not isinstance(s, tuple):
            return s + trailer, values
        # The inner field left a (bytes, bit offset) pair
        rest, bn = s
        return (rest + trailer, bn), values
2176
2177
class FieldLenField(Field[int, int]):
    """
    Field holding the length or the item count of another field.

    :param name: the name of the field
    :param default: the default value; None means "compute it"
    :param length_of: name of the field whose length is stored
    :param fmt: struct format of the field
    :param count_of: name of the field whose item count is stored; only
        used when ``length_of`` is None
    :param adjust: a callback receiving the packet and the computed value
        and returning the value to store
    """
    __slots__ = ["length_of", "count_of", "adjust"]

    def __init__(
            self,
            name,  # type: str
            default,  # type: Optional[Any]
            length_of=None,  # type: Optional[str]
            fmt="H",  # type: str
            count_of=None,  # type: Optional[str]
            adjust=lambda pkt, x: x,  # type: Callable[[Packet, int], int]
    ):
        # type: (...) -> None
        Field.__init__(self, name, default, fmt)
        self.length_of, self.count_of = length_of, count_of
        self.adjust = adjust

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[int]) -> int
        """
        Return the machine value.

        An explicit value is returned as is. A None value is computed
        from the referenced field of ``pkt``, or is 0 without packet.

        :raises ValueError: when neither length_of nor count_of is set
        """
        if x is not None:
            return x
        if pkt is None:
            return 0
        if self.length_of is not None:
            target, target_val = pkt.getfield_and_val(self.length_of)
            measured = target.i2len(pkt, target_val)
        elif self.count_of is not None:
            target, target_val = pkt.getfield_and_val(self.count_of)
            measured = target.i2count(pkt, target_val)
        else:
            raise ValueError(
                "Field should have either length_of or count_of"
            )
        return self.adjust(pkt, measured)
2213
2214
class StrNullField(StrField):
    """
    StrField terminated by ``DELIMITER`` on the wire.

    Subclasses may override ``DELIMITER`` with a longer terminator.
    """
    DELIMITER = b"\x00"

    def addfield(self, pkt, s, val):
        # type: (Packet, bytes, Optional[bytes]) -> bytes
        """Append the machine value followed by the delimiter to ``s``."""
        return s + self.i2m(pkt, val) + self.DELIMITER

    def getfield(self,
                 pkt,  # type: Packet
                 s,  # type: bytes
                 ):
        # type: (...) -> Tuple[bytes, bytes]
        """
        Extract the value up to the first aligned delimiter.

        :return: the bytes following the delimiter and the value. When no
            delimiter is found, nothing remains and the whole input is
            returned as the value.
        """
        len_str = 0
        while True:
            len_str = s.find(self.DELIMITER, len_str)
            if len_str < 0:
                # DELIMITER not found: the whole input is the value
                return b"", s
            if len_str % len(self.DELIMITER):
                # Match not aligned on the delimiter size: search again
                # from the next byte
                len_str += 1
            else:
                break
        return s[len_str + len(self.DELIMITER):], self.m2i(pkt, s[:len_str])

    def randval(self):
        # type: () -> RandTermString
        """Return a random string of 0 to 1200 bytes ending with DELIMITER."""
        return RandTermString(RandNum(0, 1200), self.DELIMITER)

    def i2len(self, pkt, x):
        # type: (Optional[Packet], Any) -> int
        """
        Return the length of the value plus the length of the delimiter.
        """
        # addfield appends the whole DELIMITER, which may be longer than
        # one byte in subclasses
        return super(StrNullField, self).i2len(pkt, x) + len(self.DELIMITER)
2246
2247
class StrNullFieldUtf16(StrNullField, StrFieldUtf16):
    """
    Combination of StrNullField and StrFieldUtf16, terminated by two
    NUL bytes.
    """
    DELIMITER = b"\x00\x00"
2250
2251
class StrStopField(StrField):
    """
    StrField ending with a stop sequence, which is part of the value.

    :param name: the name of the field
    :param default: the default value of this field
    :param stop: the bytes marking the end of the value
    :param additional: number of bytes following ``stop`` that also
        belong to the value
    """
    __slots__ = ["stop", "additional"]

    def __init__(self, name, default, stop, additional=0):
        # type: (str, str, bytes, int) -> None
        Field.__init__(self, name, default)
        self.stop, self.additional = stop, additional

    def getfield(self, pkt, s):
        # type: (Optional[Packet], bytes) -> Tuple[bytes, bytes]
        """
        Extract the value up to and including ``stop`` and the
        ``additional`` bytes.

        :return: the remaining bytes and the value. When ``stop`` is not
            found, nothing remains and the whole input is the value.
        """
        position = s.find(self.stop)
        if position < 0:
            return b"", s
        end = position + len(self.stop) + self.additional
        return s[end:], s[:end]

    def randval(self):
        # type: () -> RandTermString
        """Return a random string of 0 to 1200 bytes ending with ``stop``."""
        size = RandNum(0, 1200)
        return RandTermString(size, self.stop)
2272
2273
class LenField(Field[int, int]):
    """
    Field which, when None, is filled with the size of the payload.

    :param adjust: a callback applied to the payload size
    """
    __slots__ = ["adjust"]

    def __init__(self, name, default, fmt="H", adjust=lambda x: x):
        # type: (str, Optional[Any], str, Callable[[int], int]) -> None
        Field.__init__(self, name, default, fmt)
        self.adjust = adjust

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[int]) -> int
        """
        Return the machine value: ``x`` when set, otherwise the adjusted
        payload length of ``pkt``, or 0 without packet.
        """
        if x is not None:
            return x
        if pkt is None:
            return 0
        return self.adjust(len(pkt.payload))
2295
2296
class BCDFloatField(Field[float, int]):
    """
    Float stored on the wire as the integer part of 256 times its value.
    """

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[float]) -> int
        """Return ``int(256 * x)``, or 0 when ``x`` is None."""
        return 0 if x is None else int(256 * x)

    def m2i(self, pkt, x):
        # type: (Optional[Packet], int) -> float
        """Return the machine value divided by 256."""
        scaled = x / 256.0
        return scaled
2307
2308
class _BitField(Field[I, int]):
    """
    Field to handle bits.

    :param name: name of the field
    :param default: default value
    :param size: size (in bits). If negative, little endian. A callable
                 is also accepted: it is called with the field itself and
                 must return the size.
    :param tot_size: size of the total group of bits (in bytes) the bitfield
                     is in. If negative, little endian.
    :param end_tot_size: same but for the BitField ending a group.

    Example - normal usage::

         0                   1                   2                   3
         0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
        |             A             |               B               | C |
        +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

                                 Fig. TestPacket

        class TestPacket(Packet):
            fields_desc = [
                BitField("a", 0, 14),
                BitField("b", 0, 16),
                BitField("c", 0, 2),
            ]

    Example - little endian stored as 16 bits on the network::

        x x x x x x x x x x x x x x x x
        a [b] [   c   ] [      a      ]

        Will first get reversed during dissection:

        x x x x x x x x x x x x x x x x
        [      a        ] [b] [   c   ]

        class TestPacket(Packet):
            fields_desc = [
                BitField("a", 0, 9, tot_size=-2),
                BitField("b", 0, 2),
                BitField("c", 0, 5, end_tot_size=-2)
            ]

    """
    __slots__ = ["rev", "size", "tot_size", "end_tot_size"]

    def __init__(self, name, default, size,
                 tot_size=0, end_tot_size=0):
        # type: (str, Optional[I], int, int, int) -> None
        Field.__init__(self, name, default)
        if callable(size):
            size = size(self)
        # Any negative size requests the byte reversal (little endian)
        self.rev = size < 0 or tot_size < 0 or end_tot_size < 0
        self.size = abs(size)
        # Unset group sizes default to the whole bytes of this field
        if not tot_size:
            tot_size = self.size // 8
        self.tot_size = abs(tot_size)
        if not end_tot_size:
            end_tot_size = self.size // 8
        self.end_tot_size = abs(end_tot_size)
        # Fields always have a round sz except BitField
        # so to keep it simple, we'll ignore it here.
        self.sz = self.size / 8.  # type: ignore

    # We need to # type: ignore a few things because of how special
    # BitField is
    def addfield(self,  # type: ignore
                 pkt,  # type: Packet
                 s,  # type: Union[Tuple[bytes, int, int], bytes]
                 ival,  # type: I
                 ):
        # type: (...) -> Union[Tuple[bytes, int, int], bytes]
        """
        Append the ``size`` low bits of the machine value to ``s``.

        :param s: either bytes, or a tuple (bytes, number of pending
            bits, value of the pending bits) left by a previous bit field
        :return: bytes when the result ends on a byte boundary, the same
            kind of tuple otherwise
        """
        val = self.i2m(pkt, ival)
        if isinstance(s, tuple):
            s, bitsdone, v = s
        else:
            bitsdone = 0
            v = 0
        # Push the new bits behind the pending ones
        v <<= self.size
        v |= val & ((1 << self.size) - 1)
        bitsdone += self.size
        # Flush every complete byte, most significant first
        while bitsdone >= 8:
            bitsdone -= 8
            s = s + struct.pack("!B", v >> bitsdone)
            v &= (1 << bitsdone) - 1
        if bitsdone:
            return s, bitsdone, v
        else:
            # Apply LE if necessary
            if self.rev and self.end_tot_size > 1:
                # Reverse the last end_tot_size bytes
                s = s[:-self.end_tot_size] + s[-self.end_tot_size:][::-1]
            return s

    def getfield(self,  # type: ignore
                 pkt,  # type: Packet
                 s,  # type: Union[Tuple[bytes, int], bytes]
                 ):
        # type: (...) -> Union[Tuple[Tuple[bytes, int], I], Tuple[bytes, I]]  # noqa: E501
        """
        Extract ``size`` bits from ``s``.

        :param s: either bytes, or a tuple (bytes, number of bits already
            consumed in the first byte) left by a previous bit field
        :return: the remaining input (bytes on a byte boundary, the same
            kind of tuple otherwise) and the value
        """
        if isinstance(s, tuple):
            s, bn = s
        else:
            bn = 0
            # Apply LE if necessary
            if self.rev and self.tot_size > 1:
                # Reverse the first tot_size bytes
                s = s[:self.tot_size][::-1] + s[self.tot_size:]

        # we don't want to process all the string
        nb_bytes = (self.size + bn - 1) // 8 + 1
        w = s[:nb_bytes]

        # split the substring byte by byte
        _bytes = struct.unpack('!%dB' % nb_bytes, w)

        # rebuild one big-endian integer out of those bytes
        b = 0
        for c in range(nb_bytes):
            b |= int(_bytes[c]) << (nb_bytes - c - 1) * 8

        # get rid of high order bits
        b &= (1 << (nb_bytes * 8 - bn)) - 1

        # remove low order bits
        b = b >> (nb_bytes * 8 - self.size - bn)

        # drop the bytes fully consumed, keep the offset in the next one
        bn += self.size
        s = s[bn // 8:]
        bn = bn % 8
        b2 = self.m2i(pkt, b)
        if bn:
            return (s, bn), b2
        else:
            return s, b2

    def randval(self):
        # type: () -> RandNum
        """Return a random number fitting in ``size`` bits."""
        return RandNum(0, 2**self.size - 1)

    def i2len(self, pkt, x):  # type: ignore
        # type: (Optional[Packet], Optional[float]) -> float
        """Return the size of the field in bytes, possibly fractional."""
        return float(self.size) / 8
2450
2451
class BitField(_BitField[int]):
    # Share the documentation (parameters and examples) of _BitField
    __doc__ = _BitField.__doc__
2454
2455
class BitLenField(BitField):
    """
    BitField whose size in bits is computed from the packet.

    :param name: name of the field
    :param default: default value
    :param length_from: a callback returning the size in bits
    """
    __slots__ = ["length_from"]

    def __init__(self, name, default, length_from):
        # type: (str, Optional[int], Callable[[Packet], int]) -> None
        self.length_from = length_from
        super().__init__(name, default, 0)

    def getfield(self, pkt, s):  # type: ignore
        # type: (Packet, Union[Tuple[bytes, int], bytes]) -> Union[Tuple[Tuple[bytes, int], int], Tuple[bytes, int]]  # noqa: E501
        """Update ``size`` from the packet, then dissect as a BitField."""
        self.size = self.length_from(pkt)
        return super().getfield(pkt, s)

    def addfield(self, pkt, s, val):  # type: ignore
        # type: (Packet, Union[Tuple[bytes, int, int], bytes], int) -> Union[Tuple[bytes, int, int], bytes]  # noqa: E501
        """Update ``size`` from the packet, then build as a BitField."""
        self.size = self.length_from(pkt)
        return super().addfield(pkt, s, val)
2484
2485
class BitFieldLenField(BitField):
    """
    BitField holding the length or the item count of another field.

    :param name: name of the field
    :param default: default value; None means "compute it"
    :param size: size in bits
    :param length_of: the field whose length is stored
    :param count_of: name of the field whose item count is stored
    :param adjust: a callback receiving the packet and the computed value
        and returning the value to store
    :param tot_size: forwarded to BitField
    :param end_tot_size: forwarded to BitField
    """
    __slots__ = ["length_of", "count_of", "adjust", "tot_size", "end_tot_size"]

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[int]
                 size,  # type: int
                 length_of=None,  # type: Optional[Union[Callable[[Optional[Packet]], int], str]]  # noqa: E501
                 count_of=None,  # type: Optional[str]
                 adjust=lambda pkt, x: x,  # type: Callable[[Optional[Packet], int], int]  # noqa: E501
                 tot_size=0,  # type: int
                 end_tot_size=0,  # type: int
                 ):
        # type: (...) -> None
        super().__init__(name, default, size, tot_size, end_tot_size)
        self.length_of, self.count_of = length_of, count_of
        self.adjust = adjust

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[Any]) -> int
        """Compute the machine value the way FieldLenField does."""
        machine = FieldLenField.i2m(self, pkt, x)  # type: ignore
        return machine
2509
2510
class XBitField(BitField):
    """
    BitField whose value is represented with ``lhex``.
    """

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], int) -> str
        """Return ``lhex`` applied to the human value."""
        human = self.i2h(pkt, x)
        return lhex(human)
2515
2516
class _EnumField(Field[Union[List[I], I], I]):
    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[I]
                 enum,  # type: Union[Dict[I, str], Dict[str, I], List[str], DADict[I, str], Type[Enum], Tuple[Callable[[I], str], Callable[[str], I]]]  # noqa: E501
                 fmt="H",  # type: str
                 ):
        # type: (...) -> None
        """
        Initializes enum fields.

        :param name: name of this field
        :param default: default value of this field
        :param enum: either an enum, a dict or a tuple of two callables.
            Dict keys are the internal values, while the dict values are
            the user-friendly representations. If the tuple is provided,
            the first callable receives the internal value as parameter
            and returns the user-friendly representation and the second
            callable does the converse. The first callable may return
            None to default to a literal string (repr()) representation.
        :param fmt: struct.pack format used to parse and serialize the
            internal value from and to machine representation.
        """
        if isinstance(enum, ObservableDict):
            cast(ObservableDict, enum).observe(self)

        if isinstance(enum, tuple):
            # Callbacks only, no lookup tables
            self.i2s_cb = enum[0]  # type: Optional[Callable[[I], str]]
            self.s2i_cb = enum[1]  # type: Optional[Callable[[str], I]]
            self.i2s = None  # type: Optional[Dict[I, str]]
            self.s2i = None  # type: Optional[Dict[str, I]]
        else:
            # Lookup tables only, no callbacks
            self.i2s_cb = None
            self.s2i_cb = None
            forward = self.i2s = {}
            backward = self.s2i = {}
            if isinstance(enum, type) and issubclass(enum, Enum):
                # Python's Enum
                for member in enum:
                    forward[member.value] = member.name
                    backward[member.name] = member.value
            else:
                keys = []  # type: List[I]
                if isinstance(enum, list):
                    keys = list(range(len(enum)))  # type: ignore
                elif isinstance(enum, DADict):
                    keys = enum.keys()
                else:
                    keys = list(enum)  # type: ignore
                    if any(isinstance(key, str) for key in keys):
                        # The mapping was given as name -> value
                        forward, backward = backward, forward  # type: ignore
                for key in keys:
                    label = cast(str, enum[key])  # type: ignore
                    forward[key] = label
                    backward[label] = key
        Field.__init__(self, name, default, fmt)

    def any2i_one(self, pkt, x):
        # type: (Optional[Packet], Any) -> I
        """
        Convert one value to its internal form: an Enum member gives its
        value, a string is translated through the table or the callback,
        anything else is kept as is.
        """
        if isinstance(x, Enum):
            return cast(I, x.value)
        if isinstance(x, str):
            if self.s2i:
                return cast(I, self.s2i[x])
            if self.s2i_cb:
                return cast(I, self.s2i_cb(x))
        return cast(I, x)

    def _i2repr(self, pkt, x):
        # type: (Optional[Packet], I) -> str
        """Fallback representation: ``repr`` of the value."""
        return repr(x)

    def i2repr_one(self, pkt, x):
        # type: (Optional[Packet], I) -> str
        """
        Represent one value with its name when known, with ``_i2repr``
        otherwise, or when the field is listed in ``conf.noenum`` or the
        value is volatile.
        """
        if self in conf.noenum or isinstance(x, VolatileValue):
            return self._i2repr(pkt, x)
        if self.i2s:
            try:
                return self.i2s[x]
            except KeyError:
                pass
        elif self.i2s_cb:
            named = self.i2s_cb(x)
            if named is not None:
                return named
        return self._i2repr(pkt, x)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> Union[I, List[I]]
        """Apply ``any2i_one`` to the value, or to each item of a list."""
        if not isinstance(x, list):
            return self.any2i_one(pkt, x)
        return [self.any2i_one(pkt, item) for item in x]

    def i2repr(self, pkt, x):  # type: ignore
        # type: (Optional[Packet], Any) -> Union[List[str], str]
        """Apply ``i2repr_one`` to the value, or to each item of a list."""
        if not isinstance(x, list):
            return self.i2repr_one(pkt, x)
        return [self.i2repr_one(pkt, item) for item in x]

    def notify_set(self, enum, key, value):
        # type: (ObservableDict, I, str) -> None
        """Mirror an entry set in the observed dict into both tables."""
        key_fmt = "0x%x" if isinstance(key, int) else "%s"
        log_runtime.debug(
            "At %s: Change to %s at " + key_fmt, self, value, key
        )
        if self.i2s is None or self.s2i is None:
            return
        self.i2s[key] = value
        self.s2i[value] = key

    def notify_del(self, enum, key):
        # type: (ObservableDict, I) -> None
        """Mirror an entry deleted from the observed dict into both tables."""
        key_fmt = "0x%x" if isinstance(key, int) else "%s"
        log_runtime.debug("At %s: Delete value at " + key_fmt, self, key)
        if self.i2s is None or self.s2i is None:
            return
        value = self.i2s.pop(key)
        del self.s2i[value]
2640
2641
class EnumField(_EnumField[I]):
    """
    _EnumField with the slots storing the lookup tables and callbacks.
    """
    __slots__ = ["i2s", "s2i", "s2i_cb", "i2s_cb"]
2644
2645
class CharEnumField(EnumField[str]):
    """
    EnumField whose internal values are single characters.
    """

    def __init__(self,
                 name,  # type: str
                 default,  # type: str
                 enum,  # type: Union[Dict[str, str], Tuple[Callable[[str], str], Callable[[str], str]]]  # noqa: E501
                 fmt="1s",  # type: str
                 ):
        # type: (...) -> None
        super().__init__(name, default, enum, fmt)
        if self.i2s is None:
            return
        internal_keys = list(self.i2s)
        if internal_keys and len(internal_keys[0]) != 1:
            # The first key is not a single character: the tables are
            # the other way round
            self.i2s, self.s2i = self.s2i, self.i2s

    def any2i_one(self, pkt, x):
        # type: (Optional[Packet], str) -> str
        """
        Return a one-character value as is; translate anything else
        through the table or the callback when available.
        """
        if len(x) == 1:
            return x
        if self.s2i:
            return self.s2i[x]
        if self.s2i_cb:
            return self.s2i_cb(x)
        return x
2668
2669
class BitEnumField(_BitField[Union[List[int], int]], _EnumField[int]):
    """Bit field that uses the enum conversions of :class:`_EnumField`."""
    __slots__ = EnumField.__slots__

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[int]
                 size,  # type: int
                 enum,  # type: Dict[int, str]
                 **kwargs  # type: Any
                 ):
        # type: (...) -> None
        # Both bases are initialised explicitly, the enum part first.
        _EnumField.__init__(self, name, default, enum)
        _BitField.__init__(self, name, default, size, **kwargs)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> Union[List[int], int]
        """Delegate to :meth:`_EnumField.any2i`."""
        converted = _EnumField.any2i(self, pkt, x)
        return converted

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], Union[List[int], int]) -> Any
        """Delegate to :meth:`_EnumField.i2repr`."""
        shown = _EnumField.i2repr(self, pkt, x)
        return shown
2688
2689
class BitLenEnumField(BitLenField, _EnumField[int]):
    """:class:`BitLenField` using the enum conversions of ``_EnumField``."""
    __slots__ = EnumField.__slots__

    def __init__(self, name, default, length_from, enum, **kwargs):
        # type: (str, Optional[int], Callable[[Packet], int], Dict[int, str], **Any) -> None  # noqa: E501
        # Both bases are initialised explicitly, the enum part first.
        _EnumField.__init__(self, name, default, enum)
        BitLenField.__init__(self, name, default, length_from, **kwargs)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> int
        """Delegate to :meth:`_EnumField.any2i`."""
        converted = _EnumField.any2i(self, pkt, x)  # type: ignore
        return converted

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], Union[List[int], int]) -> Any
        """Delegate to :meth:`_EnumField.i2repr`."""
        shown = _EnumField.i2repr(self, pkt, x)
        return shown
2714
2715
class ShortEnumField(EnumField[int]):
    """:class:`EnumField` initialised with the format string "H"."""
    __slots__ = EnumField.__slots__

    def __init__(self, name, default, enum):
        # type: (str, int, Union[Dict[int, str], Dict[str, int], Tuple[Callable[[int], str], Callable[[str], int]], DADict[int, str]]) -> None  # noqa: E501
        fmt = "H"
        super(ShortEnumField, self).__init__(name, default, enum, fmt)
2726
2727
class LEShortEnumField(EnumField[int]):
    """:class:`EnumField` initialised with the format string "<H"."""

    def __init__(self,
                 name,  # type: str
                 default,  # type: int
                 enum,  # type: Union[Dict[int, str], List[str]]
                 ):
        # type: (...) -> None
        fmt = "<H"
        super(LEShortEnumField, self).__init__(name, default, enum, fmt)
2732
2733
class LongEnumField(EnumField[int]):
    """:class:`EnumField` initialised with the format string "Q"."""

    def __init__(self,
                 name,  # type: str
                 default,  # type: int
                 enum,  # type: Union[Dict[int, str], List[str]]
                 ):
        # type: (...) -> None
        fmt = "Q"
        super(LongEnumField, self).__init__(name, default, enum, fmt)
2738
2739
class LELongEnumField(EnumField[int]):
    """:class:`EnumField` initialised with the format string "<Q"."""

    def __init__(self,
                 name,  # type: str
                 default,  # type: int
                 enum,  # type: Union[Dict[int, str], List[str]]
                 ):
        # type: (...) -> None
        fmt = "<Q"
        super(LELongEnumField, self).__init__(name, default, enum, fmt)
2744
2745
class ByteEnumField(EnumField[int]):
    """:class:`EnumField` initialised with the format string "B"."""

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[int]
                 enum,  # type: Dict[int, str]
                 ):
        # type: (...) -> None
        fmt = "B"
        super(ByteEnumField, self).__init__(name, default, enum, fmt)
2750
2751
class XByteEnumField(ByteEnumField):
    """
    :class:`ByteEnumField` whose values are rendered with ``lhex`` when no
    enum label applies.
    """

    def _i2repr(self, pkt, x):
        # type: (Optional[Packet], Any) -> str
        # Only the fallback differs from the parent class. The enum lookup
        # (``conf.noenum`` check, ``i2s`` table, ``i2s_cb`` callback) is
        # done by the inherited ``i2repr_one``, which ends by calling
        # ``self._i2repr``. Overriding this hook, as XShortEnumField does,
        # avoids keeping a second copy of that lookup here.
        return lhex(x)
2766
2767
class IntEnumField(EnumField[int]):
    """:class:`EnumField` initialised with the format string "I"."""

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[int]
                 enum,  # type: Dict[int, str]
                 ):
        # type: (...) -> None
        fmt = "I"
        super(IntEnumField, self).__init__(name, default, enum, fmt)
2772
2773
class SignedIntEnumField(EnumField[int]):
    """:class:`EnumField` initialised with the format string "i"."""

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[int]
                 enum,  # type: Dict[int, str]
                 ):
        # type: (...) -> None
        fmt = "i"
        super(SignedIntEnumField, self).__init__(name, default, enum, fmt)
2778
2779
class LEIntEnumField(EnumField[int]):
    """:class:`EnumField` initialised with the format string "<I"."""

    def __init__(self,
                 name,  # type: str
                 default,  # type: int
                 enum,  # type: Dict[int, str]
                 ):
        # type: (...) -> None
        fmt = "<I"
        super(LEIntEnumField, self).__init__(name, default, enum, fmt)
2784
2785
class XShortEnumField(ShortEnumField):
    """:class:`ShortEnumField` whose ``_i2repr`` hook returns ``lhex(x)``."""

    def _i2repr(self, pkt, x):
        # type: (Optional[Packet], Any) -> str
        hex_text = lhex(x)
        return hex_text
2790
2791
class LE3BytesEnumField(LEThreeBytesField, _EnumField[int]):
    """:class:`LEThreeBytesField` using the ``_EnumField`` conversions."""
    __slots__ = EnumField.__slots__

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[int]
                 enum,  # type: Dict[int, str]
                 ):
        # type: (...) -> None
        # Both bases are initialised explicitly, the enum part first.
        _EnumField.__init__(self, name, default, enum)
        LEThreeBytesField.__init__(self, name, default)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> int
        """Delegate to :meth:`_EnumField.any2i`."""
        converted = _EnumField.any2i(self, pkt, x)  # type: ignore
        return converted

    def i2repr(self, pkt, x):  # type: ignore
        # type: (Optional[Packet], Any) -> Union[List[str], str]
        """Delegate to :meth:`_EnumField.i2repr`."""
        shown = _EnumField.i2repr(self, pkt, x)
        return shown
2807
2808
class XLE3BytesEnumField(LE3BytesEnumField):
    """:class:`LE3BytesEnumField` whose ``_i2repr`` returns ``lhex(x)``."""

    def _i2repr(self, pkt, x):
        # type: (Optional[Packet], Any) -> str
        hex_text = lhex(x)
        return hex_text
2813
2814
class _MultiEnumField(_EnumField[I]):
    """
    Enum field whose value-to-name table is selected per packet.

    ``enum`` maps a selector (the result of ``depends_on(pkt)``) to a
    ``{value: name}`` dictionary. Reverse tables are built once at
    construction: one per selector (``s2i_multi``) and one merged over all
    selectors (``s2i_all``).
    """

    def __init__(self,
                 name,  # type: str
                 default,  # type: int
                 enum,  # type: Dict[I, Dict[I, str]]
                 depends_on,  # type: Callable[[Optional[Packet]], I]
                 fmt="H"  # type: str
                 ):
        # type: (...) -> None

        self.depends_on = depends_on
        self.i2s_multi = enum
        self.s2i_multi = {}  # type: Dict[I, Dict[str, I]]
        self.s2i_all = {}  # type: Dict[str, I]
        for selector, labels in enum.items():
            reverse = {
                label: value for value, label in labels.items()
            }  # type: Dict[str, I]
            self.s2i_multi[selector] = reverse
            # Later selectors override earlier ones for a repeated name.
            self.s2i_all.update(reverse)
        Field.__init__(self, name, default, fmt)

    def any2i_one(self, pkt, x):
        # type: (Optional[Packet], Any) -> I
        """
        Translate a name to its value; non-string input is returned as is.

        The table of the current selector is tried first, then the merged
        table, which raises ``KeyError`` for an unknown name.
        """
        if not isinstance(x, str):
            return cast(I, x)
        selector = self.depends_on(pkt)
        if selector in self.s2i_multi:
            scoped = self.s2i_multi[selector]
            if x in scoped:
                return scoped[x]
        return self.s2i_all[x]

    def i2repr_one(self, pkt, x):
        # type: (Optional[Packet], I) -> str
        """
        Return the name of ``x`` under the current selector, or ``str(x)``
        when the selector or the value is unknown. A volatile selector is
        shown through its own ``repr``.
        """
        selector = self.depends_on(pkt)
        if isinstance(selector, VolatileValue):
            return repr(selector)
        if selector not in self.i2s_multi:
            return str(x)
        return str(self.i2s_multi[selector].get(x, x))
2856
2857
class MultiEnumField(_MultiEnumField[int], EnumField[int]):
    """Integer :class:`_MultiEnumField` declaring the multi-enum slots."""
    __slots__ = [
        "depends_on",
        "i2s_multi",
        "s2i_multi",
        "s2i_all",
    ]
2860
2861
class BitMultiEnumField(_BitField[Union[List[int], int]],
                        _MultiEnumField[int]):
    """Bit field using the conversions of :class:`_MultiEnumField`."""
    __slots__ = EnumField.__slots__ + MultiEnumField.__slots__

    def __init__(self,
                 name,  # type: str
                 default,  # type: int
                 size,  # type: int
                 enum,  # type: Dict[int, Dict[int, str]]
                 depends_on  # type: Callable[[Optional[Packet]], int]
                 ):
        # type: (...) -> None
        _MultiEnumField.__init__(self, name, default, enum, depends_on)
        # The bit-field attributes are set here directly: a negative
        # ``size`` sets ``rev``, and the magnitude is the width in bits.
        self.rev = size < 0
        self.size = abs(size)
        self.sz = self.size / 8.  # type: ignore

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> Union[List[int], int]
        """Delegate to the ``any2i`` reachable from ``_MultiEnumField``."""
        base = _MultiEnumField[int]
        return base.any2i(self, pkt, x)  # type: ignore

    def i2repr(self, pkt, x):  # type: ignore
        # type: (Optional[Packet], Union[List[int], int]) -> Union[str, List[str]]  # noqa: E501
        """Delegate to the ``i2repr`` reachable from ``_MultiEnumField``."""
        base = _MultiEnumField[int]
        return base.i2repr(self, pkt, x)  # type: ignore
2899
2900
class ByteEnumKeysField(ByteEnumField):
    """
    ByteEnumField whose ``randval`` is a ``RandEnumKeys`` built from its
    ``i2s`` table (an empty dict when there is no table).
    """

    def randval(self):
        # type: () -> RandEnumKeys
        table = self.i2s or {}
        return RandEnumKeys(table)
2907
2908
class ShortEnumKeysField(ShortEnumField):
    """
    ShortEnumField whose ``randval`` is a ``RandEnumKeys`` built from its
    ``i2s`` table (an empty dict when there is no table).
    """

    def randval(self):
        # type: () -> RandEnumKeys
        table = self.i2s or {}
        return RandEnumKeys(table)
2915
2916
class IntEnumKeysField(IntEnumField):
    """
    IntEnumField whose ``randval`` is a ``RandEnumKeys`` built from its
    ``i2s`` table (an empty dict when there is no table).
    """

    def randval(self):
        # type: () -> RandEnumKeys
        table = self.i2s or {}
        return RandEnumKeys(table)
2923
2924
# Little-endian variant of FieldLenField
2926
2927
class LEFieldLenField(FieldLenField):
    """:class:`FieldLenField` whose default format string is "<H"."""

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[Any]
                 length_of=None,  # type: Optional[str]
                 fmt="<H",  # type: str
                 count_of=None,  # type: Optional[str]
                 adjust=lambda pkt, x: x,  # type: Callable[[Packet, int], int]
                 ):
        # type: (...) -> None
        # Every argument is forwarded unchanged; only the ``fmt`` default
        # is set by this class.
        FieldLenField.__init__(self, name, default, length_of=length_of,
                               fmt=fmt, count_of=count_of, adjust=adjust)
2946
2947
class FlagValueIter(object):
    """
    Iterator over the names of the bits set in a flag value.

    Bits are visited from the least significant one upwards; ``cursor`` is
    the index of the next bit to examine. The flag value is read again on
    every step through ``int(flagvalue)`` and ``flagvalue.names``.
    """

    __slots__ = ["flagvalue", "cursor"]

    def __init__(self, flagvalue):
        # type: (FlagValue) -> None
        self.cursor = 0
        self.flagvalue = flagvalue

    def __iter__(self):
        # type: () -> FlagValueIter
        return self

    def __next__(self):
        # type: () -> str
        pending = int(self.flagvalue) >> self.cursor
        while pending:
            position = self.cursor
            self.cursor = position + 1
            if pending & 1:
                return self.flagvalue.names[position]
            pending >>= 1
        raise StopIteration

    next = __next__
2973
2974
class FlagValue(object):
    """
    Integer value whose individual bits are addressed by name.

    ``names`` is either a ``str`` (each character labels one bit) or a
    ``list`` of labels ("multi" mode; labels are joined with ``+`` when
    printed). The label at index ``n`` names bit ``n``, i.e. ``1 << n``.

    Operands of comparisons and bitwise operators may be an int, a flag
    string or a list of labels; they are normalised by :meth:`_fixvalue`.
    Named flags can also be read and written as attributes.
    """
    __slots__ = ["value", "names", "multi"]

    def _fixvalue(self, value):
        # type: (Any) -> int
        """
        Normalise ``value`` to an int.

        Falsy input gives 0. A string is split on ``+`` in multi mode and
        into characters otherwise; each label of the resulting (or given)
        list sets the bit at its index in ``names``. ``ValueError`` is
        raised for a label that is not in ``names``.
        """
        if not value:
            return 0
        if isinstance(value, str):
            value = value.split('+') if self.multi else list(value)
        if isinstance(value, list):
            y = 0
            for i in value:
                y |= 1 << self.names.index(i)
            value = y
        return int(value)

    def __init__(self, value, names):
        # type: (Union[List[str], int, str], Union[List[str], str]) -> None
        # ``multi`` and ``names`` must be set before ``_fixvalue`` runs.
        self.multi = isinstance(names, list)
        self.names = names
        self.value = self._fixvalue(value)

    def __hash__(self):
        # type: () -> int
        return hash(self.value)

    def __int__(self):
        # type: () -> int
        return self.value

    def __eq__(self, other):
        # type: (Any) -> bool
        return self.value == self._fixvalue(other)

    def __lt__(self, other):
        # type: (Any) -> bool
        return self.value < self._fixvalue(other)

    def __le__(self, other):
        # type: (Any) -> bool
        return self.value <= self._fixvalue(other)

    def __gt__(self, other):
        # type: (Any) -> bool
        return self.value > self._fixvalue(other)

    def __ge__(self, other):
        # type: (Any) -> bool
        return self.value >= self._fixvalue(other)

    def __ne__(self, other):
        # type: (Any) -> bool
        return self.value != self._fixvalue(other)

    def __and__(self, other):
        # type: (int) -> FlagValue
        return self.__class__(self.value & self._fixvalue(other), self.names)
    __rand__ = __and__

    def __or__(self, other):
        # type: (int) -> FlagValue
        return self.__class__(self.value | self._fixvalue(other), self.names)
    __ror__ = __or__
    __add__ = __or__  # + is an alias for |

    def __sub__(self, other):
        # type: (int) -> FlagValue
        """Return a copy with every bit of ``other`` cleared."""
        # ``& ~other`` clears exactly the requested bits. Subtracting
        # ``other`` from a mask as wide as ``names`` would also drop set
        # bits that have no name (so ``x - 0`` could differ from ``x``) and
        # would fail to clear bits of ``other`` beyond the named range.
        return self.__class__(
            self.value & ~self._fixvalue(other),
            self.names
        )

    def __xor__(self, other):
        # type: (int) -> FlagValue
        return self.__class__(self.value ^ self._fixvalue(other), self.names)

    def __lshift__(self, other):
        # type: (int) -> int
        # Shifts return a plain int, not a FlagValue.
        return self.value << self._fixvalue(other)

    def __rshift__(self, other):
        # type: (int) -> int
        return self.value >> self._fixvalue(other)

    def __nonzero__(self):
        # type: () -> bool
        return bool(self.value)
    __bool__ = __nonzero__

    def flagrepr(self):
        # type: () -> str
        """Deprecated alias of ``str(self)``."""
        warnings.warn(
            "obj.flagrepr() is obsolete. Use str(obj) instead.",
            DeprecationWarning,
            # Attribute the warning to the caller, not to this method.
            stacklevel=2
        )
        return str(self)

    def __str__(self):
        # type: () -> str
        """
        Return the names of the set bits, lowest bit first, joined by
        ``+`` in multi mode and concatenated otherwise. A set bit without
        a name is shown as ``?``.
        """
        i = 0
        r = []
        x = int(self)
        while x:
            if x & 1:
                try:
                    name = self.names[i]
                except IndexError:
                    name = "?"
                r.append(name)
            i += 1
            x >>= 1
        return ("+" if self.multi else "").join(r)

    def __iter__(self):
        # type: () -> FlagValueIter
        return FlagValueIter(self)

    def __repr__(self):
        # type: () -> str
        return "<Flag %d (%s)>" % (self, self)

    def __deepcopy__(self, memo):
        # type: (Dict[Any, Any]) -> FlagValue
        # ``names`` is passed on as is, not copied.
        return self.__class__(int(self), self.names)

    def __getattr__(self, attr):
        # type: (str) -> Any
        """
        Read flags as attributes (only reached when normal lookup fails).

        In multi mode ``attr`` is one label; otherwise every character of
        ``attr`` is a flag and all of them must be set. If a label is not
        found and ``attr`` contains ``_``, the lookup is retried with
        ``-`` in place of ``_`` before ``AttributeError`` is raised.
        """
        if attr in self.__slots__:
            return super(FlagValue, self).__getattribute__(attr)
        try:
            if self.multi:
                return bool((2 ** self.names.index(attr)) & int(self))
            return all(bool((2 ** self.names.index(flag)) & int(self))
                       for flag in attr)
        except ValueError:
            if '_' in attr:
                try:
                    return self.__getattr__(attr.replace('_', '-'))
                except AttributeError:
                    pass
            return super(FlagValue, self).__getattribute__(attr)

    def __setattr__(self, attr, value):
        # type: (str, Union[List[str], int, str]) -> None
        """
        Set a slot, or set/clear the bit named ``attr`` according to the
        truthiness of ``value``. ``value`` itself only accepts an int
        (``ValueError`` otherwise).
        """
        if attr == "value" and not isinstance(value, int):
            raise ValueError(value)
        if attr in self.__slots__:
            return super(FlagValue, self).__setattr__(attr, value)
        if attr in self.names:
            if value:
                self.value |= (2 ** self.names.index(attr))
            else:
                self.value &= ~(2 ** self.names.index(attr))
        else:
            return super(FlagValue, self).__setattr__(attr, value)

    def copy(self):
        # type: () -> FlagValue
        return self.__class__(self.value, self.names)
3134
3135
class FlagsField(_BitField[Optional[Union[int, FlagValue]]]):
    """ Handle Flag type field

    Make sure all your flags have a label

    Example (list):
        >>> from scapy.packet import Packet
        >>> class FlagsTest(Packet):
                fields_desc = [FlagsField("flags", 0, 8, ["f0", "f1", "f2", "f3", "f4", "f5", "f6", "f7"])]  # noqa: E501
        >>> FlagsTest(flags=9).show2()
        ###[ FlagsTest ]###
          flags     = f0+f3

    Example (str):
        >>> from scapy.packet import Packet
        >>> class TCPTest(Packet):
                fields_desc = [
                    BitField("reserved", 0, 7),
                    FlagsField("flags", 0x2, 9, "FSRPAUECN")
                ]
        >>> TCPTest(flags=3).show2()
        ###[ TCPTest ]###
          reserved  = 0
          flags     = FS

    Example (dict):
        >>> from scapy.packet import Packet
        >>> class FlagsTest2(Packet):
                fields_desc = [
                    FlagsField("flags", 0x2, 16, {
                        0x0001: "A",
                        0x0008: "B",
                    })
                ]

    :param name: field's name
    :param default: default value for the field
    :param size: number of bits in the field (in bits). if negative, LE
    :param names: (list or str or dict) label for each flag
        If it's a str or a list, the least significant bit's name
        is written first.
        If it's a dict, each key is the bit mask of the flag it labels;
        bits without an entry are named ``bit_<index>``.
    :raises ValueError: if a dict key is not a positive integer
    """
    ismutable = True
    __slots__ = ["names"]

    def __init__(self,
                 name,  # type: str
                 default,  # type: Optional[Union[int, FlagValue]]
                 size,  # type: int
                 names,     # type: Union[List[str], str, Dict[int, str]]
                 **kwargs   # type: Any
                 ):
        # type: (...) -> None
        # Convert the dict to a list
        if isinstance(names, dict):
            tmp = ["bit_%d" % i for i in range(abs(size))]
            for mask, label in names.items():
                mask = int(mask)
                if mask <= 0:
                    raise ValueError(
                        "FlagsField %r: flag mask must be a positive "
                        "integer, got %r" % (name, mask)
                    )
                # Index of the highest set bit, i.e. floor(log2(mask)).
                # bit_length() is exact for any int, unlike a
                # floating-point logarithm.
                tmp[mask.bit_length() - 1] = label
            names = tmp
        # Store the names as str or list
        self.names = names
        super(FlagsField, self).__init__(name, default, size, **kwargs)

    def _fixup_val(self, x):
        # type: (Any) -> Optional[FlagValue]
        """Returns a FlagValue instance when needed. Internal method, to be
        used in *2i() and i2*() methods.

        FlagValue and VolatileValue instances, as well as None, are
        returned unchanged.
        """
        if isinstance(x, (FlagValue, VolatileValue)):
            return x  # type: ignore
        if x is None:
            return None
        return FlagValue(x, self.names)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> Optional[FlagValue]
        return self._fixup_val(super(FlagsField, self).any2i(pkt, x))

    def m2i(self, pkt, x):
        # type: (Optional[Packet], int) -> Optional[FlagValue]
        return self._fixup_val(super(FlagsField, self).m2i(pkt, x))

    def i2h(self, pkt, x):
        # type: (Optional[Packet], Any) -> Optional[FlagValue]
        return self._fixup_val(super(FlagsField, self).i2h(pkt, x))

    def i2repr(self,
               pkt,  # type: Optional[Packet]
               x,  # type: Any
               ):
        # type: (...) -> str
        """
        Return the flag string of ``x`` ("None" for None). For a list or
        tuple, return the repr of a container of the same type holding the
        string of each element.
        """
        if isinstance(x, (list, tuple)):
            return repr(type(x)(
                "None" if v is None else str(self._fixup_val(v)) for v in x
            ))
        return "None" if x is None else str(self._fixup_val(x))
3233
3234
# One flag description: ``short`` and ``long`` forms of its name.
MultiFlagsEntry = collections.namedtuple(
    'MultiFlagsEntry',
    ('short', 'long'),
)
3236
3237
class MultiFlagsField(_BitField[Set[str]]):
    """
    Flags field whose bit names depend on the packet.

    ``names`` maps a selector (the result of ``depends_on(pkt)``) to a
    ``{bit index: MultiFlagsEntry}`` dictionary. The internal value is a
    set of short flag names; a set bit without an entry is represented by
    the string ``'bit <index>'``.
    """
    __slots__ = FlagsField.__slots__ + ["depends_on"]

    def __init__(self,
                 name,  # type: str
                 default,  # type: Set[str]
                 size,  # type: int
                 names,  # type: Dict[int, Dict[int, MultiFlagsEntry]]
                 depends_on,  # type: Callable[[Optional[Packet]], int]
                 ):
        # type: (...) -> None
        self.depends_on = depends_on
        self.names = names
        super(MultiFlagsField, self).__init__(name, default, size)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> Set[str]
        """
        Accept an int or a set of short names (``ValueError`` otherwise).

        With a packet, an int is decoded by :meth:`m2i`, and a set is
        checked against the entries of the current selector when the
        selector is not None. Without a packet, an int gives an empty set
        and a set is returned unchanged.
        """
        if not isinstance(x, (set, int)):
            raise ValueError('set expected')

        from_int = isinstance(x, int)
        if pkt is None:
            return set() if from_int else x
        if from_int:
            return self.m2i(pkt, x)

        selector = self.depends_on(pkt)
        if selector is None:
            return x
        assert selector in self.names, 'invalid dependency'
        entries = self.names[selector]
        accepted = set()
        for flag in x:
            if any(entry.short == flag for entry in entries.values()):
                accepted.add(flag)
            else:
                assert False, 'Unknown flag "{}" with this dependency'.format(flag)  # noqa: E501
        return accepted

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[Set[str]]) -> int
        """
        Encode a set of flag names as an int (0 for None).

        A name matching the short name of an entry sets that entry's bit;
        any other name is parsed as ``'bit <index>'``.
        """
        selector = self.depends_on(pkt)
        entries = self.names.get(selector, {})

        if x is None:
            return 0
        bits = 0
        for flag in x:
            for position, entry in entries.items():
                if entry.short == flag:
                    bits |= 1 << position
                    break
            else:
                # No entry matched: take the index after the 'bit ' prefix.
                bits |= 1 << int(flag[len('bit '):])
        return bits

    def m2i(self, pkt, x):
        # type: (Optional[Packet], int) -> Set[str]
        """Decode an int into the set of names of its set bits."""
        selector = self.depends_on(pkt)
        entries = self.names.get(selector, {})

        found = set()
        position = 0
        remaining = x
        while remaining:
            if remaining & 1:
                if position in entries:
                    label = entries[position].short
                else:
                    label = 'bit {}'.format(position)
                found.add(label)
            remaining >>= 1
            position += 1
        return found

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], Set[str]) -> str
        """
        Return the repr of a set in which every known short name is
        expanded to ``"<long> (<short>)"``; other names are kept as is.
        """
        selector = self.depends_on(pkt)
        entries = self.names.get(selector, {})

        shown = set()
        for flag in x:
            match = next(
                (entry for entry in entries.values() if entry.short == flag),
                None
            )
            if match is None:
                shown.add(flag)
            else:
                shown.add("{} ({})".format(match.long, match.short))
        return repr(shown)
3328
3329
class FixedPointField(BitField):
    """
    Bit field read as a fixed-point number: the low ``frac_bits`` bits hold
    the fractional part, the remaining bits the integer part.
    """
    __slots__ = ['frac_bits']

    def __init__(self, name, default, size, frac_bits=16):
        # type: (str, int, int, int) -> None
        self.frac_bits = frac_bits
        super(FixedPointField, self).__init__(name, default, size)

    def any2i(self, pkt, val):
        # type: (Optional[Packet], Optional[float]) -> Optional[int]
        """Encode a number as its fixed-point integer; None stays None."""
        if val is None:
            return val
        whole = int(val)
        fraction = int((val - whole) * 2**self.frac_bits)
        return (whole << self.frac_bits) | fraction

    def i2h(self, pkt, val):
        # type: (Optional[Packet], Optional[int]) -> Optional[EDecimal]
        """Decode a fixed-point integer; None stays None."""
        # A bit of trickery to get precise floats
        if val is None:
            return val
        scale = 2.0**self.frac_bits
        low_mask = (1 << self.frac_bits) - 1
        fraction = EDecimal(val & low_mask)
        fraction /= scale  # type: ignore
        digits = int(math.log10(scale))
        return (val >> self.frac_bits) + fraction.normalize(digits)

    def i2repr(self, pkt, val):
        # type: (Optional[Packet], int) -> str
        human = self.i2h(pkt, val)
        return str(human)
3360
3361
3362# Base class for IPv4 and IPv6 Prefixes inspired by IPField and IP6Field.
3363# Machine values are encoded in a multiple of wordbytes bytes.
class _IPPrefixFieldBase(Field[Tuple[str, int], Tuple[bytes, int]]):
    """
    Shared implementation of the IP prefix fields.

    The internal value is an ``(address, prefix length)`` tuple and the
    machine value is ``(packed address bytes, prefix length)``. ``aton``
    and ``ntoa`` convert between the textual and the packed address;
    ``length_from`` gives the prefix length when dissecting.
    """
    __slots__ = ["wordbytes", "maxbytes", "aton", "ntoa", "length_from"]

    def __init__(self,
                 name,  # type: str
                 default,  # type: Tuple[str, int]
                 wordbytes,  # type: int
                 maxbytes,  # type: int
                 aton,  # type: Callable[..., Any]
                 ntoa,  # type: Callable[..., Any]
                 length_from=None  # type: Optional[Callable[[Packet], int]]
                 ):
        # type: (...) -> None
        self.wordbytes = wordbytes
        self.maxbytes = maxbytes
        self.aton = aton
        self.ntoa = ntoa
        Field.__init__(self, name, default, "%is" % self.maxbytes)
        # Without a callback the dissected prefix length is 0.
        if length_from is not None:
            self.length_from = length_from
        else:
            self.length_from = lambda x: 0

    def _numbytes(self, pfxlen):
        # type: (int) -> int
        """Bytes needed for ``pfxlen`` bits, rounded up to whole words."""
        word_bits = self.wordbytes * 8
        words = (pfxlen + (word_bits - 1)) // word_bits
        return words * self.wordbytes

    def h2i(self, pkt, x):
        # type: (Optional[Packet], str) -> Tuple[str, int]
        # "fc00:1::1/64" -> ("fc00:1::1", 64)
        address, length = x.split('/')
        self.aton(address)  # check for validity
        return (address, int(length))

    def i2h(self, pkt, x):
        # type: (Optional[Packet], Tuple[str, int]) -> str
        # ("fc00:1::1", 64) -> "fc00:1::1/64"
        address, length = x
        return "%s/%i" % (address, length)

    def i2m(self,
            pkt,  # type: Optional[Packet]
            x  # type: Optional[Tuple[str, int]]
            ):
        # type: (...) -> Tuple[bytes, int]
        # ("fc00:1::1", 64) -> (b"\xfc\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01", 64)  # noqa: E501
        address, length = ("", 0) if x is None else x
        packed = self.aton(address)
        return (packed[:self._numbytes(length)], length)

    def m2i(self, pkt, x):
        # type: (Optional[Packet], Tuple[bytes, int]) -> Tuple[str, int]
        # (b"\xfc\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01", 64) -> ("fc00:1::1", 64)  # noqa: E501
        packed, length = x

        # Pad truncated addresses with zero bytes up to the full size.
        missing = self.maxbytes - len(packed)
        if missing > 0:
            packed = packed + (b"\0" * missing)
        return (self.ntoa(packed), length)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Optional[Any]) -> Tuple[str, int]
        if x is not None:
            return self.h2i(pkt, x)

        return (self.ntoa(b"\0" * self.maxbytes), 1)

    def i2len(self, pkt, x):
        # type: (Packet, Tuple[str, int]) -> int
        _address, length = x
        return length

    def addfield(self, pkt, s, val):
        # type: (Packet, bytes, Optional[Tuple[str, int]]) -> bytes
        packed, length = self.i2m(pkt, val)
        return s + struct.pack("!%is" % self._numbytes(length), packed)

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, Tuple[str, int]]
        length = self.length_from(pkt)
        size = self._numbytes(length)
        (packed,) = struct.unpack("!%is" % size, s[:size])
        return s[size:], self.m2i(pkt, (packed, length))
3451
3452
class IPPrefixField(_IPPrefixFieldBase):
    """
    Prefix field with at most 4 address bytes, converted with
    ``inet_aton`` / ``inet_ntoa``.
    """

    def __init__(self,
                 name,  # type: str
                 default,  # type: Tuple[str, int]
                 wordbytes=1,  # type: int
                 length_from=None  # type: Optional[Callable[[Packet], int]]
                 ):
        # type: (...) -> None
        max_address_bytes = 4
        _IPPrefixFieldBase.__init__(
            self, name, default, wordbytes, max_address_bytes,
            inet_aton, inet_ntoa, length_from
        )
3471
3472
class IP6PrefixField(_IPPrefixFieldBase):
    """
    Prefix field with at most 16 address bytes, converted with
    ``inet_pton`` / ``inet_ntop`` for ``socket.AF_INET6``.
    """

    def __init__(self,
                 name,  # type: str
                 default,  # type: Tuple[str, int]
                 wordbytes=1,  # type: int
                 length_from=None  # type: Optional[Callable[[Packet], int]]
                 ):
        # type: (...) -> None
        def _aton(a):
            # type: (str) -> bytes
            return inet_pton(socket.AF_INET6, a)

        def _ntoa(n):
            # type: (bytes) -> str
            return inet_ntop(socket.AF_INET6, n)

        max_address_bytes = 16
        _IPPrefixFieldBase.__init__(
            self, name, default, wordbytes, max_address_bytes,
            _aton, _ntoa, length_from
        )
3492
3493
class UTCTimeField(Field[float, int]):
    """
    Timestamp field displayed as a formatted date.

    The value counts from ``epoch`` (``EPOCH`` when not given); ``delta``
    is the offset of that epoch from ``EPOCH``. At most one unit option is
    applied, in this order of precedence: ``use_msec``, ``use_micro``,
    ``use_nano``, ``custom_scaling``. A value of None stands for the
    current time.
    """
    __slots__ = ["epoch", "delta", "strf",
                 "use_msec", "use_micro", "use_nano", "custom_scaling"]

    def __init__(self,
                 name,  # type: str
                 default,  # type: int
                 use_msec=False,  # type: bool
                 use_micro=False,  # type: bool
                 use_nano=False,  # type: bool
                 epoch=None,  # type: Optional[Tuple[int, int, int, int, int, int, int, int, int]]  # noqa: E501
                 strf="%a, %d %b %Y %H:%M:%S %z",  # type: str
                 custom_scaling=None,  # type: Optional[int]
                 fmt="I"  # type: str
                 ):
        # type: (...) -> None
        Field.__init__(self, name, default, fmt=fmt)
        if epoch is None:
            epoch_seconds = EPOCH
        else:
            epoch_seconds = calendar.timegm(epoch)
        self.epoch = epoch_seconds
        self.delta = epoch_seconds - EPOCH
        self.strf = strf
        self.use_msec = use_msec
        self.use_micro = use_micro
        self.use_nano = use_nano
        self.custom_scaling = custom_scaling

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], float) -> str
        """Return ``"<formatted date> (<seconds>)"`` for ``x``."""
        if x is None:
            x = time.time() - self.delta
        else:
            # Pick the divisor that turns the stored unit into seconds.
            if self.use_msec:
                divisor = 1e3  # type: Any
            elif self.use_micro:
                divisor = 1e6
            elif self.use_nano:
                divisor = 1e9
            else:
                divisor = self.custom_scaling
            if divisor:
                x = x / divisor
        x += self.delta
        # To make negative timestamps work on all platforms (e.g. Windows),
        # the date is computed as an offset from 1970-01-01.
        moment = datetime.datetime(1970, 1, 1) + \
            datetime.timedelta(seconds=x)
        return "%s (%d)" % (moment.strftime(self.strf), int(x))

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[float]) -> int
        """
        Return ``int(x)``; for None, the current time expressed in the
        unit of the field.
        """
        if x is not None:
            return int(x)
        now = time.time() - self.delta
        if self.use_msec:
            factor = 1e3  # type: Any
        elif self.use_micro:
            factor = 1e6
        elif self.use_nano:
            factor = 1e9
        else:
            factor = self.custom_scaling
        if factor:
            now = now * factor
        return int(now)
3555
3556
class SecondsIntField(Field[float, int]):
    """
    Duration field (struct format ``"I"``) that is displayed in seconds.

    The stored value is taken to be in milliseconds, microseconds or
    nanoseconds when ``use_msec``, ``use_micro`` or ``use_nano`` is set
    (checked in that order), and in seconds otherwise. Only the
    representation is scaled; the stored value is left untouched.

    :param name: field's name
    :param default: default value for the field
    :param use_msec: the value is expressed in milliseconds
    :param use_micro: the value is expressed in microseconds
    :param use_nano: the value is expressed in nanoseconds
    """
    __slots__ = ["use_msec", "use_micro", "use_nano"]

    def __init__(self, name, default,
                 use_msec=False,
                 use_micro=False,
                 use_nano=False):
        # type: (str, int, bool, bool, bool) -> None
        Field.__init__(self, name, default, "I")
        self.use_msec = use_msec
        self.use_micro = use_micro
        self.use_nano = use_nano

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], Optional[float]) -> str
        """Return the value converted to seconds, as ``"<n> sec"``."""
        if x is None:
            seconds = 0  # type: Union[int, float]
        else:
            seconds = x
            # The first enabled flag wins, in msec / micro / nano order
            for enabled, divisor in ((self.use_msec, 1e3),
                                     (self.use_micro, 1e6),
                                     (self.use_nano, 1e9)):
                if enabled:
                    seconds = x / divisor
                    break
        return "%s sec" % seconds
3583
3584
class _ScalingField(object):
    """
    Mixin implementing a linear conversion between the internal (physical)
    value of a field and the number carried in the packet:

        internal = machine * scaling + offset
        machine  = (internal - offset) / scaling

    It must be combined with a Field class, which provides ``fmt`` and
    ``i2h``.
    """

    def __init__(self,
                 name,  # type: str
                 default,  # type: float
                 scaling=1,  # type: Union[int, float]
                 unit="",  # type: str
                 offset=0,  # type: Union[int, float]
                 ndigits=3,  # type: int
                 fmt="B",  # type: str
                 ):
        # type: (...) -> None
        # The conversion parameters are stored before Field.__init__ runs.
        self.scaling = scaling
        self.unit = unit
        self.offset = offset
        self.ndigits = ndigits
        Field.__init__(self, name, default, fmt)  # type: ignore

    def _is_float_fmt(self):
        # type: () -> bool
        """
        Tell whether the machine value is a floating point number, i.e.
        whether ``fmt`` ends with one of the struct float codes
        (``e``: half, ``f``: single, ``d``: double precision).
        """
        return self.fmt[-1] in ("e", "f", "d")  # type: ignore

    def i2m(self,
            pkt,  # type: Optional[Packet]
            x  # type: Optional[Union[int, float]]
            ):
        # type: (...) -> Union[int, float]
        """
        Convert an internal value to the machine value.

        ``None`` is handled as 0. The result is rounded to the nearest
        integer unless the wire format is a floating point one, in which
        case it is kept as is.
        """
        if x is None:
            x = 0
        x = (x - self.offset) / self.scaling
        if isinstance(x, float) and not self._is_float_fmt():
            x = int(round(x))
        return x

    def m2i(self, pkt, x):
        # type: (Optional[Packet], Union[int, float]) -> Union[int, float]
        """
        Convert a machine value to the internal value.

        For integer wire formats, a float result is rounded to ``ndigits``
        fractional digits to hide the noise of the float arithmetic.
        Values from floating point wire formats are not rounded.
        """
        x = x * self.scaling + self.offset
        if isinstance(x, float) and not self._is_float_fmt():
            x = round(x, self.ndigits)
        return x

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> Union[int, float]
        """
        Normalize a user supplied value to an internal value.

        A str or bytes value is unpacked with ``fmt`` as a raw machine
        value and converted with ``m2i``. Numbers are returned unchanged.

        :raises ValueError: if the value is neither a string nor a number
        """
        if isinstance(x, (str, bytes)):
            x = struct.unpack(self.fmt, bytes_encode(x))[0]  # type: ignore
            x = self.m2i(pkt, x)
        if not isinstance(x, (int, float)):
            raise ValueError("Unknown type")
        return x

    def i2repr(self, pkt, x):
        # type: (Optional[Packet], Union[int, float]) -> str
        """Return the human value followed by the unit."""
        return "%s %s" % (
            self.i2h(pkt, x),  # type: ignore
            self.unit
        )

    def randval(self):
        # type: () -> RandFloat
        """
        Return a random float generator covering the internal values that
        correspond to the range of ``Field.randval``.
        """
        value = Field.randval(self)  # type: ignore
        if value is not None:
            min_val = round(value.min * self.scaling + self.offset,
                            self.ndigits)
            max_val = round(value.max * self.scaling + self.offset,
                            self.ndigits)

            # A negative scaling swaps the bounds, hence min() / max()
            return RandFloat(min(min_val, max_val), max(min_val, max_val))
3647
3648
class ScalingField(
    _ScalingField,
    Field[Union[int, float], Union[int, float]]
):
    r""" Handle physical values which are scaled and/or offset for communication

       Example:
           >>> from scapy.packet import Packet
           >>> class ScalingFieldTest(Packet):
                   fields_desc = [ScalingField('data', 0, scaling=0.1, offset=-1, unit='mV')]  # noqa: E501
           >>> ScalingFieldTest(data=10).show2()
           ###[ ScalingFieldTest ]###
             data= 10.0 mV
           >>> hexdump(ScalingFieldTest(data=10))
           0000  6E                                               n
           >>> hexdump(ScalingFieldTest(data=b"\x6D"))
           0000  6D                                               m
           >>> ScalingFieldTest(data=b"\x6D").show2()
           ###[ ScalingFieldTest ]###
             data= 9.9 mV

        bytes(ScalingFieldTest(...)) will produce 0x6E in this example.
        0x6E is 110 (decimal). This is calculated through the scaling factor
        and the offset. "data" was set to 10, which means that we want to
        transfer the physical value 10 mV. To calculate the value which has
        to be sent on the bus, the offset has to be subtracted and the result
        has to be divided by the scaling factor.
        bytes = (data - offset) / scaling
        bytes = ( 10  -  (-1) ) /    0.1
        bytes =  110 = 0x6E

        If you want to force a certain machine value, you can assign a
        byte-string to the field (data=b"\x6D"). If a string or a bytes
        object is given to the field, it is taken as the raw value carried
        in the packet.

       :param name: field's name
       :param default: default value for the field
       :param scaling: scaling factor for the internal value conversion
       :param unit: string for the unit representation of the internal value
       :param offset: value to offset the internal value during conversion
       :param ndigits: number of fractional digits for the internal conversion
       :param fmt: struct.pack format used to parse and serialize the internal value from and to machine representation # noqa: E501
       """
3690
3691
class BitScalingField(_ScalingField, BitField):  # type: ignore
    """
    A BitField whose value goes through the scaling and offset conversion
    of a ScalingField.

    :param name: field's name
    :param default: default value for the field
    :param size: forwarded to ``BitField.__init__``
    :param args: extra positional arguments for ``_ScalingField.__init__``
    :param kwargs: extra keyword arguments for ``_ScalingField.__init__``
    """

    def __init__(self, name, default, size, *args, **kwargs):
        # type: (str, int, int, *Any, **Any) -> None
        # Store the scaling parameters first, then let BitField perform
        # its own initialisation with the requested size.
        _ScalingField.__init__(self, name, default, *args, **kwargs)
        BitField.__init__(self, name, default, size)  # type: ignore
3701
3702
class OUIField(X3BytesField):
    """
    A 3 bytes field holding an OUI, displayed with the name found in the
    manufacturer database when there is one.
    """

    def i2repr(self, pkt, val):
        # type: (Optional[Packet], int) -> str
        """
        Return the OUI as text, or ``"<name> (<oui>)"`` when
        ``conf.manufdb`` resolves it to something else than the OUI itself.
        """
        # Pack on 4 bytes and drop the first one to get the 3 OUI bytes
        # (None is handled as 0).
        packed = struct.pack("!I", val or 0)
        # str2mac is fed 6 bytes, so pad with zeros and keep only the
        # first 8 characters of its output.
        oui = str2mac(packed[1:] + b"\0" * 3)[:8]
        if not conf.manufdb:
            return oui
        fancy = conf.manufdb._get_manuf(oui)
        if fancy == oui:
            # Nothing better than the raw OUI is known
            return oui
        return "%s (%s)" % (fancy, oui)
3717
3718
class UUIDField(Field[UUID, bytes]):
    """Field for UUID storage, wrapping Python's uuid.UUID type.

    The internal storage format of this field is ``uuid.UUID`` from the Python
    standard library.

    There are three formats (``uuid_fmt``) for this field type:

    * ``FORMAT_BE`` (default): the UUID is six fields in big-endian byte order,
      per RFC 4122.

      This format is used by DHCPv6 (RFC 6355) and most network protocols.

    * ``FORMAT_LE``: the UUID is six fields, with ``time_low``, ``time_mid``
      and ``time_high_version`` in little-endian byte order. This *doesn't*
      change the arrangement of the fields from RFC 4122.

      This format is used by Microsoft's COM/OLE libraries.

    * ``FORMAT_REV``: the UUID is a single 128-bit integer in little-endian
      byte order. This *changes the arrangement* of the fields.

      This format is used by Bluetooth Low Energy.

    Note: You should use the constants here.

    The "human encoding" of this field supports a number of different input
    formats, and wraps Python's ``uuid.UUID`` library appropriately:

    * A bytearray, bytes or str of 16 bytes is decoded as a UUID in wire
      format (according to ``uuid_fmt``).

    * A bytearray, bytes or str of any other length is handed to
      ``uuid.UUID`` as text. This supports a number of different encoding
      options -- see the Python standard library documentation for more
      details.

    * An int is presumed to be a 128-bit integer and passed to ``uuid.UUID``.

    * Given a tuple:

      * Tuples of 11 integers are treated as having the last 6 integers
        forming the ``node`` field. These are merged into a single integer
        before the resulting tuple of 6 integers is passed to ``uuid.UUID``.

      * Otherwise, the tuple is passed as the ``fields`` parameter to
        ``uuid.UUID`` directly without modification.

        ``uuid.UUID`` expects a tuple of 6 integers.

    * ``uuid.UUID`` and ``RandUUID`` objects are passed through.

    Any other type is converted to ``None``.
    """

    __slots__ = ["uuid_fmt"]

    FORMAT_BE = 0
    FORMAT_LE = 1
    FORMAT_REV = 2

    # Change this when we get new formats
    FORMATS = (FORMAT_BE, FORMAT_LE, FORMAT_REV)

    def __init__(self, name, default, uuid_fmt=FORMAT_BE):
        # type: (str, Optional[int], int) -> None
        # uuid_fmt has to be set (and valid) before Field.__init__ runs
        self.uuid_fmt = uuid_fmt
        self._check_uuid_fmt()
        Field.__init__(self, name, default, "16s")

    def _check_uuid_fmt(self):
        # type: () -> None
        """Checks .uuid_fmt, and raises an exception if it is not valid."""
        if self.uuid_fmt in UUIDField.FORMATS:
            return
        raise FieldValueRangeException(
            "Unsupported uuid_fmt ({})".format(self.uuid_fmt))

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[UUID]) -> bytes
        """
        Serialize a UUID to its 16 bytes, laid out according to
        ``uuid_fmt``. ``None`` is serialized as 16 null bytes.
        """
        self._check_uuid_fmt()
        if x is None:
            return b'\0' * 16
        layout = self.uuid_fmt
        if layout == UUIDField.FORMAT_BE:
            return x.bytes
        if layout == UUIDField.FORMAT_LE:
            return x.bytes_le
        if layout == UUIDField.FORMAT_REV:
            # Whole UUID in reversed byte order
            return x.bytes[::-1]
        raise FieldAttributeException("Unknown fmt")

    def m2i(self,
            pkt,  # type: Optional[Packet]
            x,  # type: bytes
            ):
        # type: (...) -> UUID
        """Build a UUID from wire bytes laid out according to ``uuid_fmt``."""
        self._check_uuid_fmt()
        layout = self.uuid_fmt
        if layout == UUIDField.FORMAT_BE:
            return UUID(bytes=x)
        if layout == UUIDField.FORMAT_LE:
            return UUID(bytes_le=x)
        if layout == UUIDField.FORMAT_REV:
            return UUID(bytes=x[::-1])
        raise FieldAttributeException("Unknown fmt")

    def any2i(self,
              pkt,  # type: Optional[Packet]
              x  # type: Any  # noqa: E501
              ):
        # type: (...) -> Optional[UUID]
        """
        Convert any of the supported inputs (see the class documentation)
        to a UUID. Unsupported types give ``None``.
        """
        # Python's uuid doesn't handle bytearray, so convert to an immutable
        # type first.
        if isinstance(x, bytearray):
            x = bytes_encode(x)

        if isinstance(x, int):
            return UUID(int=x)

        if isinstance(x, tuple):
            if len(x) == 11:
                # For compatibility with dce_rpc: the node ID comes as 6
                # separate integers (elements 5..10), most significant
                # first. Merge them into the single 48-bit node value.
                node = 0
                for part in x[5:]:
                    node = (node << 8) | part
                x = (x[0], x[1], x[2], x[3], x[4], node)
            return UUID(fields=x)

        if isinstance(x, (str, bytes)):
            if len(x) == 16:
                # Raw bytes, in wire format
                return self.m2i(pkt, bytes_encode(x))
            # Anything else is parsed by uuid.UUID as text
            return UUID(plain_str(x))

        if isinstance(x, (UUID, RandUUID)):
            return cast(UUID, x)

        return None

    @staticmethod
    def randval():
        # type: () -> RandUUID
        """Return a random UUID generator."""
        return RandUUID()
3858
3859    @staticmethod
3860    def randval():
3861        # type: () -> RandUUID
3862        return RandUUID()
3863
3864
class UUIDEnumField(UUIDField, _EnumField[UUID]):
    """
    A UUIDField combined with an enumeration: value conversion and
    representation are delegated to ``_EnumField``.

    :param name: field's name
    :param default: default value for the field
    :param enum: enumeration, forwarded to ``_EnumField.__init__``
    :param uuid_fmt: one of the ``UUIDField.FORMAT_*`` constants
    """
    __slots__ = EnumField.__slots__

    def __init__(self, name, default, enum, uuid_fmt=0):
        # type: (str, Optional[int], Any, int) -> None
        # Both parents are initialised explicitly: the enumeration part
        # first, then the UUID part (which validates uuid_fmt).
        _EnumField.__init__(self, name, default, enum, "16s")  # type: ignore
        UUIDField.__init__(self, name, default, uuid_fmt=uuid_fmt)

    def any2i(self, pkt, x):
        # type: (Optional[Packet], Any) -> UUID
        """Delegate to ``_EnumField.any2i`` rather than ``UUIDField``'s."""
        return _EnumField.any2i(self, pkt, x)  # type: ignore

    def i2repr(self,
               pkt,  # type: Optional[Packet]
               x,  # type: UUID
               ):
        # type: (...) -> Any
        """Delegate to ``_EnumField.i2repr``."""
        return _EnumField.i2repr(self, pkt, x)
3883
3884
class BitExtendedField(Field[Optional[int], bytes]):
    """
    Bit Extended Field

    This type of field has a variable number of bytes. Each byte is defined
    as follows:
    - 7 bits of data
    - 1 bit as an extension bit:

      + 0 means it is last byte of the field ("stopping bit")
      + 1 means there is another byte after this one ("forwarding bit")

    To get the actual data, it is necessary to hop the binary data byte per
    byte and to check the extension bit until 0

    :param name: field's name
    :param default: default value for the field
    :param extension_bit: position (0 = LSB ... 7 = MSB) of the extension
        bit inside each byte
    :raises ValueError: if ``extension_bit`` is not within 0..7
    """

    __slots__ = ["extension_bit"]

    def __init__(self, name, default, extension_bit):
        # type: (str, Optional[Any], int) -> None
        # With a position outside of the byte, the extension bit could
        # never be written and extended2str() would loop forever.
        if extension_bit not in range(8):
            raise ValueError(
                "extension_bit must be between 0 and 7 (got %r)" %
                (extension_bit,)
            )
        Field.__init__(self, name, default, "B")
        self.extension_bit = extension_bit

    def prepare_byte(self, x):
        # type: (int) -> int
        """
        Return the byte reordered so that the forwarding bit is the LSB.
        The bits above the forwarding bit stay in place, the bits below it
        are shifted up by one position.
        """
        x = int(x)
        pos = self.extension_bit
        fx_bit = (x >> pos) & 1
        low_bits = x & ((1 << pos) - 1)
        high_bits = x >> (pos + 1)
        return (high_bits << (pos + 1)) | (low_bits << 1) | fx_bit

    def str2extended(self, x=b""):
        # type: (bytes) -> Tuple[bytes, Optional[int]]
        """
        Decode one extended value from the beginning of ``x``.

        :returns: ``(remaining bytes, value)``, or ``(b"", None)`` when the
            data ends before a byte with the stopping bit is found
        """
        # For convenience, each byte is reordered so that the forwarding
        # bit is always the LSB. The same algorithm then applies whatever
        # the real forwarding bit position is.
        value = 0
        for index, byte in enumerate(bytearray(x)):
            byte = self.prepare_byte(byte)
            # Append the 7 data bits of this byte
            value = (value << 7) | (byte >> 1)
            if not byte & 0b1:
                # Stopping bit: the field ends with this byte
                return x[index + 1:], value
        # We reached the end of the data but there was no
        # "ending bit". This is not normal.
        return b"", None

    def extended2str(self, x):
        # type: (Optional[int]) -> bytes
        """
        Encode a value as extended bytes. ``None`` gives an empty string.

        The value is consumed from its least significant bit, 7 bits per
        byte. Bytes are therefore produced last-byte-first and reversed at
        the end.
        """
        if x is None:
            return b""
        value = int(x)
        ext_pos = self.extension_bit
        chunks = []  # type: List[int]
        current = 0
        bit_pos = 0
        # The first byte produced is the last one on the wire: it is the
        # only one whose extension bit stays at 0.
        last_wire_byte = True
        # True until the extension bit of the current byte is written
        ext_pending = True
        while value > 0 or ext_pending:
            if bit_pos == 8:
                # End of byte, start a new one
                chunks.append(current)
                current = 0
                bit_pos = 0
                ext_pending = True
                continue
            if bit_pos == ext_pos:
                if last_wire_byte:
                    last_wire_byte = False
                else:
                    current |= 0b1 << bit_pos
                ext_pending = False
            else:
                current |= (value & 0b1) << bit_pos
                value >>= 1
            bit_pos += 1
        chunks.append(current)

        return bytes(bytearray(reversed(chunks)))

    def i2m(self, pkt, x):
        # type: (Optional[Any], Optional[int]) -> bytes
        """Return the extended bytes encoding of ``x``."""
        return self.extended2str(x)

    def m2i(self, pkt, x):
        # type: (Optional[Any], bytes) -> Optional[int]
        """Return the value decoded from the beginning of ``x``."""
        return self.str2extended(x)[1]

    def addfield(self, pkt, s, val):
        # type: (Optional[Packet], bytes, Optional[int]) -> bytes
        """Append the encoded value to ``s``."""
        return s + self.i2m(pkt, val)

    def getfield(self, pkt, s):
        # type: (Optional[Any], bytes) -> Tuple[bytes, Optional[int]]
        """Decode the field from ``s`` and return (remain, value)."""
        return self.str2extended(s)
3998
3999
class LSBExtendedField(BitExtendedField):
    """
    A BitExtendedField with the extension bit on the LSB (bit 0).
    """

    def __init__(self, name, default):
        # type: (str, Optional[Any]) -> None
        BitExtendedField.__init__(self, name, default, 0)
4005
4006
class MSBExtendedField(BitExtendedField):
    """
    A BitExtendedField with the extension bit on the MSB (bit 7).
    """

    def __init__(self, name, default):
        # type: (str, Optional[Any]) -> None
        BitExtendedField.__init__(self, name, default, 7)
4012