• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5
6"""
7Implementation of the configuration object.
8"""
9
10
11import atexit
12import copy
13import functools
14import os
15import re
16import socket
17import sys
18import time
19import warnings
20
21from dataclasses import dataclass
22from enum import Enum
23
24import importlib
25import importlib.abc
26import importlib.util
27
28import scapy
29from scapy import VERSION
30from scapy.base_classes import BasePacket
31from scapy.consts import DARWIN, WINDOWS, LINUX, BSD, SOLARIS
32from scapy.error import (
33    log_loading,
34    log_scapy,
35    ScapyInvalidPlatformException,
36    warning,
37)
38from scapy.themes import ColorTheme, NoTheme, apply_ipython_style
39
40# Typing imports
41from typing import (
42    cast,
43    Any,
44    Callable,
45    Dict,
46    Iterator,
47    List,
48    NoReturn,
49    Optional,
50    Set,
51    Tuple,
52    Type,
53    Union,
54    overload,
55    TYPE_CHECKING,
56)
57from types import ModuleType
58from scapy.compat import DecoratorCallable
59
60if TYPE_CHECKING:
61    # Do not import at runtime
62    import scapy.as_resolvers
63    from scapy.modules.nmap import NmapKnowledgeBase
64    from scapy.packet import Packet
65    from scapy.supersocket import SuperSocket  # noqa: F401
66    import scapy.asn1.asn1
67    import scapy.asn1.mib
68
69############
70#  Config  #
71############
72
73
74class ConfClass(object):
75    def configure(self, cnf):
76        # type: (ConfClass) -> None
77        self.__dict__ = cnf.__dict__.copy()
78
79    def __repr__(self):
80        # type: () -> str
81        return str(self)
82
83    def __str__(self):
84        # type: () -> str
85        s = ""
86        dkeys = self.__class__.__dict__.copy()
87        dkeys.update(self.__dict__)
88        keys = sorted(dkeys)
89        for i in keys:
90            if i[0] != "_":
91                r = repr(getattr(self, i))
92                r = " ".join(r.split())
93                wlen = 76 - max(len(i), 10)
94                if len(r) > wlen:
95                    r = r[:wlen - 3] + "..."
96                s += "%-10s = %s\n" % (i, r)
97        return s[:-1]
98
99
100class Interceptor(object):
101    def __init__(self,
102                 name,  # type: str
103                 default,  # type: Any
104                 hook,  # type: Callable[..., Any]
105                 args=None,  # type: Optional[List[Any]]
106                 kargs=None  # type: Optional[Dict[str, Any]]
107                 ):
108        # type: (...) -> None
109        self.name = name
110        self.intname = "_intercepted_%s" % name
111        self.default = default
112        self.hook = hook
113        self.args = args if args is not None else []
114        self.kargs = kargs if kargs is not None else {}
115
116    def __get__(self, obj, typ=None):
117        # type: (Conf, Optional[type]) -> Any
118        if not hasattr(obj, self.intname):
119            setattr(obj, self.intname, self.default)
120        return getattr(obj, self.intname)
121
122    @staticmethod
123    def set_from_hook(obj, name, val):
124        # type: (Conf, str, bool) -> None
125        int_name = "_intercepted_%s" % name
126        setattr(obj, int_name, val)
127
128    def __set__(self, obj, val):
129        # type: (Conf, Any) -> None
130        old = getattr(obj, self.intname, self.default)
131        val = self.hook(self.name, val, old, *self.args, **self.kargs)
132        setattr(obj, self.intname, val)
133
134
135def _readonly(name):
136    # type: (str) -> NoReturn
137    default = Conf.__dict__[name].default
138    Interceptor.set_from_hook(conf, name, default)
139    raise ValueError("Read-only value !")
140
141
142ReadOnlyAttribute = functools.partial(
143    Interceptor,
144    hook=(lambda name, *args, **kwargs: _readonly(name))
145)
146ReadOnlyAttribute.__doc__ = "Read-only class attribute"
147
148
149class ProgPath(ConfClass):
150    _default: str = "<System default>"
151    universal_open: str = "open" if DARWIN else "xdg-open"
152    pdfreader: str = universal_open
153    psreader: str = universal_open
154    svgreader: str = universal_open
155    dot: str = "dot"
156    display: str = "display"
157    tcpdump: str = "tcpdump"
158    tcpreplay: str = "tcpreplay"
159    hexedit: str = "hexer"
160    tshark: str = "tshark"
161    wireshark: str = "wireshark"
162    ifconfig: str = "ifconfig"
163    extcap_folders: List[str] = [
164        os.path.join(os.path.expanduser("~"), ".config", "wireshark", "extcap"),
165        "/usr/lib/x86_64-linux-gnu/wireshark/extcap",
166    ]
167
168
169class ConfigFieldList:
170    def __init__(self):
171        # type: () -> None
172        self.fields = set()  # type: Set[Any]
173        self.layers = set()  # type: Set[Any]
174
175    @staticmethod
176    def _is_field(f):
177        # type: (Any) -> bool
178        return hasattr(f, "owners")
179
180    def _recalc_layer_list(self):
181        # type: () -> None
182        self.layers = {owner for f in self.fields for owner in f.owners}
183
184    def add(self, *flds):
185        # type: (*Any) -> None
186        self.fields |= {f for f in flds if self._is_field(f)}
187        self._recalc_layer_list()
188
189    def remove(self, *flds):
190        # type: (*Any) -> None
191        self.fields -= set(flds)
192        self._recalc_layer_list()
193
194    def __contains__(self, elt):
195        # type: (Any) -> bool
196        if isinstance(elt, BasePacket):
197            return elt in self.layers
198        return elt in self.fields
199
200    def __repr__(self):
201        # type: () -> str
202        return "<%s [%s]>" % (self.__class__.__name__, " ".join(str(x) for x in self.fields))  # noqa: E501
203
204
205class Emphasize(ConfigFieldList):
206    pass
207
208
209class Resolve(ConfigFieldList):
210    pass
211
212
213class Num2Layer:
214    def __init__(self):
215        # type: () -> None
216        self.num2layer = {}  # type: Dict[int, Type[Packet]]
217        self.layer2num = {}  # type: Dict[Type[Packet], int]
218
219    def register(self, num, layer):
220        # type: (int, Type[Packet]) -> None
221        self.register_num2layer(num, layer)
222        self.register_layer2num(num, layer)
223
224    def register_num2layer(self, num, layer):
225        # type: (int, Type[Packet]) -> None
226        self.num2layer[num] = layer
227
228    def register_layer2num(self, num, layer):
229        # type: (int, Type[Packet]) -> None
230        self.layer2num[layer] = num
231
232    @overload
233    def __getitem__(self, item):
234        # type: (Type[Packet]) -> int
235        pass
236
237    @overload
238    def __getitem__(self, item):  # noqa: F811
239        # type: (int) -> Type[Packet]
240        pass
241
242    def __getitem__(self, item):  # noqa: F811
243        # type: (Union[int, Type[Packet]]) -> Union[int, Type[Packet]]
244        if isinstance(item, int):
245            return self.num2layer[item]
246        else:
247            return self.layer2num[item]
248
249    def __contains__(self, item):
250        # type: (Union[int, Type[Packet]]) -> bool
251        if isinstance(item, int):
252            return item in self.num2layer
253        else:
254            return item in self.layer2num
255
256    def get(self,
257            item,  # type: Union[int, Type[Packet]]
258            default=None,  # type: Optional[Type[Packet]]
259            ):
260        # type: (...) -> Optional[Union[int, Type[Packet]]]
261        return self[item] if item in self else default
262
263    def __repr__(self):
264        # type: () -> str
265        lst = []
266        for num, layer in self.num2layer.items():
267            if layer in self.layer2num and self.layer2num[layer] == num:
268                dir = "<->"
269            else:
270                dir = " ->"
271            lst.append((num, "%#6x %s %-20s (%s)" % (num, dir, layer.__name__,
272                                                     layer._name)))
273        for layer, num in self.layer2num.items():
274            if num not in self.num2layer or self.num2layer[num] != layer:
275                lst.append((num, "%#6x <-  %-20s (%s)" % (num, layer.__name__,
276                                                          layer._name)))
277        lst.sort()
278        return "\n".join(y for x, y in lst)
279
280
281class LayersList(List[Type['scapy.packet.Packet']]):
282    def __init__(self):
283        # type: () -> None
284        list.__init__(self)
285        self.ldict = {}  # type: Dict[str, List[Type[Packet]]]
286        self.filtered = False
287        self._backup_dict = {}  # type: Dict[Type[Packet], List[Tuple[Dict[str, Any], Type[Packet]]]]  # noqa: E501
288
289    def __repr__(self):
290        # type: () -> str
291        return "\n".join("%-20s: %s" % (layer.__name__, layer.name)
292                         for layer in self)
293
294    def register(self, layer):
295        # type: (Type[Packet]) -> None
296        self.append(layer)
297        if layer.__module__ not in self.ldict:
298            self.ldict[layer.__module__] = []
299        self.ldict[layer.__module__].append(layer)
300
301    def layers(self):
302        # type: () -> List[Tuple[str, str]]
303        result = []
304        # This import may feel useless, but it is required for the eval below
305        import scapy  # noqa: F401
306        try:
307            import builtins  # noqa: F401
308        except ImportError:
309            import __builtin__  # noqa: F401
310        for lay in self.ldict:
311            doc = eval(lay).__doc__
312            result.append((lay, doc.strip().split("\n")[0] if doc else lay))
313        return result
314
315    def filter(self, items):
316        # type: (List[Type[Packet]]) -> None
317        """Disable dissection of unused layers to speed up dissection"""
318        if self.filtered:
319            raise ValueError("Already filtered. Please disable it first")
320        for lay in self.ldict.values():
321            for cls in lay:
322                if cls not in self._backup_dict:
323                    self._backup_dict[cls] = cls.payload_guess[:]
324                    cls.payload_guess = [
325                        y for y in cls.payload_guess if y[1] in items
326                    ]
327        self.filtered = True
328
329    def unfilter(self):
330        # type: () -> None
331        """Re-enable dissection for all layers"""
332        if not self.filtered:
333            raise ValueError("Not filtered. Please filter first")
334        for lay in self.ldict.values():
335            for cls in lay:
336                cls.payload_guess = self._backup_dict[cls]
337        self._backup_dict.clear()
338        self.filtered = False
339
340
341class CommandsList(List[Callable[..., Any]]):
342    def __repr__(self):
343        # type: () -> str
344        s = []
345        for li in sorted(self, key=lambda x: x.__name__):
346            doc = li.__doc__ if li.__doc__ else "--"
347            doc = doc.lstrip().split('\n', 1)[0]
348            s.append("%-22s: %s" % (li.__name__, doc))
349        return "\n".join(s)
350
351    def register(self, cmd):
352        # type: (DecoratorCallable) -> DecoratorCallable
353        self.append(cmd)
354        return cmd  # return cmd so that method can be used as a decorator
355
356
357def lsc():
358    # type: () -> None
359    """Displays Scapy's default commands"""
360    print(repr(conf.commands))
361
362
363class CacheInstance(Dict[str, Any]):
364    __slots__ = ["timeout", "name", "_timetable"]
365
366    def __init__(self, name="noname", timeout=None):
367        # type: (str, Optional[int]) -> None
368        self.timeout = timeout
369        self.name = name
370        self._timetable = {}  # type: Dict[str, float]
371
372    def flush(self):
373        # type: () -> None
374        self._timetable.clear()
375        self.clear()
376
377    def __getitem__(self, item):
378        # type: (str) -> Any
379        if item in self.__slots__:
380            return object.__getattribute__(self, item)
381        if not self.__contains__(item):
382            raise KeyError(item)
383        return super(CacheInstance, self).__getitem__(item)
384
385    def __contains__(self, item):
386        if not super(CacheInstance, self).__contains__(item):
387            return False
388        if self.timeout is not None:
389            t = self._timetable[item]
390            if time.time() - t > self.timeout:
391                return False
392        return True
393
394    def get(self, item, default=None):
395        # type: (str, Optional[Any]) -> Any
396        # overloading this method is needed to force the dict to go through
397        # the timetable check
398        try:
399            return self[item]
400        except KeyError:
401            return default
402
403    def __setitem__(self, item, v):
404        # type: (str, str) -> None
405        if item in self.__slots__:
406            return object.__setattr__(self, item, v)
407        self._timetable[item] = time.time()
408        super(CacheInstance, self).__setitem__(item, v)
409
410    def update(self,
411               other,  # type: Any
412               **kwargs  # type: Any
413               ):
414        # type: (...) -> None
415        for key, value in other.items():
416            # We only update an element from `other` either if it does
417            # not exist in `self` or if the entry in `self` is older.
418            if key not in self or self._timetable[key] < other._timetable[key]:
419                dict.__setitem__(self, key, value)
420                self._timetable[key] = other._timetable[key]
421
422    def iteritems(self):
423        # type: () -> Iterator[Tuple[str, Any]]
424        if self.timeout is None:
425            return super(CacheInstance, self).items()
426        t0 = time.time()
427        return (
428            (k, v)
429            for (k, v) in super(CacheInstance, self).items()
430            if t0 - self._timetable[k] < self.timeout
431        )
432
433    def iterkeys(self):
434        # type: () -> Iterator[str]
435        if self.timeout is None:
436            return super(CacheInstance, self).keys()
437        t0 = time.time()
438        return (
439            k
440            for k in super(CacheInstance, self).keys()
441            if t0 - self._timetable[k] < self.timeout
442        )
443
444    def __iter__(self):
445        # type: () -> Iterator[str]
446        return self.iterkeys()
447
448    def itervalues(self):
449        # type: () -> Iterator[Tuple[str, Any]]
450        if self.timeout is None:
451            return super(CacheInstance, self).values()
452        t0 = time.time()
453        return (
454            v
455            for (k, v) in super(CacheInstance, self).items()
456            if t0 - self._timetable[k] < self.timeout
457        )
458
459    def items(self):
460        # type: () -> Any
461        return list(self.iteritems())
462
463    def keys(self):
464        # type: () -> Any
465        return list(self.iterkeys())
466
467    def values(self):
468        # type: () -> Any
469        return list(self.itervalues())
470
471    def __len__(self):
472        # type: () -> int
473        if self.timeout is None:
474            return super(CacheInstance, self).__len__()
475        return len(self.keys())
476
477    def summary(self):
478        # type: () -> str
479        return "%s: %i valid items. Timeout=%rs" % (self.name, len(self), self.timeout)  # noqa: E501
480
481    def __repr__(self):
482        # type: () -> str
483        s = []
484        if self:
485            mk = max(len(k) for k in self)
486            fmt = "%%-%is %%s" % (mk + 1)
487            for item in self.items():
488                s.append(fmt % item)
489        return "\n".join(s)
490
491    def copy(self):
492        # type: () -> CacheInstance
493        return copy.copy(self)
494
495
496class NetCache:
497    def __init__(self):
498        # type: () -> None
499        self._caches_list = []  # type: List[CacheInstance]
500
501    def add_cache(self, cache):
502        # type: (CacheInstance) -> None
503        self._caches_list.append(cache)
504        setattr(self, cache.name, cache)
505
506    def new_cache(self, name, timeout=None):
507        # type: (str, Optional[int]) -> CacheInstance
508        c = CacheInstance(name=name, timeout=timeout)
509        self.add_cache(c)
510        return c
511
512    def __delattr__(self, attr):
513        # type: (str) -> NoReturn
514        raise AttributeError("Cannot delete attributes")
515
516    def update(self, other):
517        # type: (NetCache) -> None
518        for co in other._caches_list:
519            if hasattr(self, co.name):
520                getattr(self, co.name).update(co)
521            else:
522                self.add_cache(co.copy())
523
524    def flush(self):
525        # type: () -> None
526        for c in self._caches_list:
527            c.flush()
528
529    def __repr__(self):
530        # type: () -> str
531        return "\n".join(c.summary() for c in self._caches_list)
532
533
534class ScapyExt:
535    __slots__ = ["specs", "name", "version"]
536
537    class MODE(Enum):
538        LAYERS = "layers"
539        CONTRIB = "contrib"
540        MODULES = "modules"
541
542    @dataclass
543    class ScapyExtSpec:
544        fullname: str
545        mode: 'ScapyExt.MODE'
546        spec: Any
547        default: bool
548
549    def __init__(self):
550        self.specs: Dict[str, 'ScapyExt.ScapyExtSpec'] = {}
551
552    def config(self, name, version):
553        self.name = name
554        self.version = version
555
556    def register(self, name, mode, path, default=None):
557        assert mode in self.MODE, "mode must be one of ScapyExt.MODE !"
558        fullname = f"scapy.{mode.value}.{name}"
559        spec = importlib.util.spec_from_file_location(
560            fullname,
561            str(path),
562        )
563        spec = self.ScapyExtSpec(
564            fullname=fullname,
565            mode=mode,
566            spec=spec,
567            default=default or False,
568        )
569        if default is None:
570            spec.default = bool(importlib.util.find_spec(spec.fullname))
571        self.specs[fullname] = spec
572
573    def __repr__(self):
574        return "<ScapyExt %s %s (%s specs)>" % (
575            self.name,
576            self.version,
577            len(self.specs),
578        )
579
580
581class ExtsManager(importlib.abc.MetaPathFinder):
582    __slots__ = ["exts", "_loaded", "all_specs"]
583
584    SCAPY_PLUGIN_CLASSIFIER = 'Framework :: Scapy'
585    GPLV2_CLASSIFIERS = [
586        'License :: OSI Approved :: GNU General Public License v2 (GPLv2)',
587        'License :: OSI Approved :: GNU General Public License v2 or later (GPLv2+)',
588    ]
589
590    def __init__(self):
591        self.exts: List[ScapyExt] = []
592        self.all_specs: Dict[str, ScapyExt.ScapyExtSpec] = {}
593        self._loaded = []
594
595    def find_spec(self, fullname, path, target=None):
596        if fullname in self.all_specs:
597            return self.all_specs[fullname].spec
598
599    def invalidate_caches(self):
600        pass
601
602    def _register_spec(self, spec):
603        self.all_specs[spec.fullname] = spec
604        if spec.default:
605            loader = importlib.util.LazyLoader(spec.spec.loader)
606            spec.spec.loader = loader
607            module = importlib.util.module_from_spec(spec.spec)
608            sys.modules[spec.fullname] = module
609            loader.exec_module(module)
610
611    def load(self):
612        try:
613            import importlib.metadata
614        except ImportError:
615            return
616        for distr in importlib.metadata.distributions():
617            if any(
618                v == self.SCAPY_PLUGIN_CLASSIFIER
619                for k, v in distr.metadata.items() if k == 'Classifier'
620            ):
621                try:
622                    pkg = next(
623                        k
624                        for k, v in importlib.metadata.packages_distributions().items()
625                        if distr.name in v
626                    )
627                except KeyError:
628                    pkg = distr.name
629                if pkg in self._loaded:
630                    continue
631                if not any(
632                    v in self.GPLV2_CLASSIFIERS
633                    for k, v in distr.metadata.items() if k == 'Classifier'
634                ):
635                    log_loading.warning(
636                        "'%s' has no GPLv2 classifier therefore cannot be loaded." % pkg  # noqa: E501
637                    )
638                    continue
639                self._loaded.append(pkg)
640                ext = ScapyExt()
641                try:
642                    scapy_ext = importlib.import_module(pkg)
643                except Exception as ex:
644                    log_loading.warning(
645                        "'%s' failed during import with %s" % (
646                            pkg,
647                            ex
648                        )
649                    )
650                    continue
651                try:
652                    scapy_ext_func = scapy_ext.scapy_ext
653                except AttributeError:
654                    log_loading.info(
655                        "'%s' included the Scapy Framework specifier "
656                        "but did not include a scapy_ext" % pkg
657                    )
658                    continue
659                try:
660                    scapy_ext_func(ext)
661                except Exception as ex:
662                    log_loading.warning(
663                        "'%s' failed during initialization with %s" % (
664                            pkg,
665                            ex
666                        )
667                    )
668                    continue
669                for spec in ext.specs.values():
670                    self._register_spec(spec)
671                self.exts.append(ext)
672        if self not in sys.meta_path:
673            sys.meta_path.append(self)
674
675    def __repr__(self):
676        from scapy.utils import pretty_list
677        return pretty_list(
678            [
679                (x.name, x.version, [y.fullname for y in x.specs.values()])
680                for x in self.exts
681            ],
682            [("Name", "Version", "Specs")],
683            sortBy=0,
684        )
685
686
687def _version_checker(module, minver):
688    # type: (ModuleType, Tuple[int, ...]) -> bool
689    """Checks that module has a higher version that minver.
690
691    params:
692     - module: a module to test
693     - minver: a tuple of versions
694    """
695    # We could use LooseVersion, but distutils imports imp which is deprecated
696    version_regexp = r'[a-z]?((?:\d|\.)+\d+)(?:\.dev[0-9]+)?'
697    version_tags_r = re.match(
698        version_regexp,
699        getattr(module, "__version__", "")
700    )
701    if not version_tags_r:
702        return False
703    version_tags_i = version_tags_r.group(1).split(".")
704    version_tags = tuple(int(x) for x in version_tags_i)
705    return bool(version_tags >= minver)
706
707
708def isCryptographyValid():
709    # type: () -> bool
710    """
711    Check if the cryptography module >= 2.0.0 is present. This is the minimum
712    version for most usages in Scapy.
713    """
714    try:
715        import cryptography
716    except ImportError:
717        return False
718    return _version_checker(cryptography, (2, 0, 0))
719
720
721def isCryptographyAdvanced():
722    # type: () -> bool
723    """
724    Check if the cryptography module is present, and if it supports X25519,
725    ChaCha20Poly1305 and such.
726
727    Notes:
728    - cryptography >= 2.0 is required
729    - OpenSSL >= 1.1.0 is required
730    """
731    try:
732        from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey  # noqa: E501
733        X25519PrivateKey.generate()
734    except Exception:
735        return False
736    else:
737        return True
738
739
740def isPyPy():
741    # type: () -> bool
742    """Returns either scapy is running under PyPy or not"""
743    try:
744        import __pypy__  # noqa: F401
745        return True
746    except ImportError:
747        return False
748
749
750def _prompt_changer(attr, val, old):
751    # type: (str, Any, Any) -> Any
752    """Change the current prompt theme"""
753    Interceptor.set_from_hook(conf, attr, val)
754    try:
755        sys.ps1 = conf.color_theme.prompt(conf.prompt)
756    except Exception:
757        pass
758    try:
759        apply_ipython_style(
760            get_ipython()  # type: ignore
761        )
762    except NameError:
763        pass
764    return getattr(conf, attr, old)
765
766
767def _set_conf_sockets():
768    # type: () -> None
769    """Populate the conf.L2Socket and conf.L3Socket
770    according to the various use_* parameters
771    """
772    if conf.use_bpf and not BSD:
773        Interceptor.set_from_hook(conf, "use_bpf", False)
774        raise ScapyInvalidPlatformException("BSD-like (OSX, *BSD...) only !")
775    if not conf.use_pcap and SOLARIS:
776        Interceptor.set_from_hook(conf, "use_pcap", True)
777        raise ScapyInvalidPlatformException(
778            "Scapy only supports libpcap on Solaris !"
779        )
780    # we are already in an Interceptor hook, use Interceptor.set_from_hook
781    if conf.use_pcap:
782        try:
783            from scapy.arch.libpcap import L2pcapListenSocket, L2pcapSocket, \
784                L3pcapSocket
785        except (OSError, ImportError):
786            log_loading.warning("No libpcap provider available ! pcap won't be used")
787            Interceptor.set_from_hook(conf, "use_pcap", False)
788        else:
789            conf.L3socket = L3pcapSocket
790            conf.L3socket6 = functools.partial(
791                L3pcapSocket, filter="ip6")
792            conf.L2socket = L2pcapSocket
793            conf.L2listen = L2pcapListenSocket
794    elif conf.use_bpf:
795        from scapy.arch.bpf.supersocket import L2bpfListenSocket, \
796            L2bpfSocket, L3bpfSocket
797        conf.L3socket = L3bpfSocket
798        conf.L3socket6 = functools.partial(
799            L3bpfSocket, filter="ip6")
800        conf.L2socket = L2bpfSocket
801        conf.L2listen = L2bpfListenSocket
802    elif LINUX:
803        from scapy.arch.linux import L3PacketSocket, L2Socket, L2ListenSocket
804        conf.L3socket = L3PacketSocket
805        conf.L3socket6 = cast(
806            "Type[SuperSocket]",
807            functools.partial(
808                L3PacketSocket,
809                filter="ip6"
810            )
811        )
812        conf.L2socket = L2Socket
813        conf.L2listen = L2ListenSocket
814    elif WINDOWS:
815        from scapy.arch.windows import _NotAvailableSocket
816        from scapy.arch.windows.native import L3WinSocket, L3WinSocket6
817        conf.L3socket = L3WinSocket
818        conf.L3socket6 = L3WinSocket6
819        conf.L2socket = _NotAvailableSocket
820        conf.L2listen = _NotAvailableSocket
821    else:
822        from scapy.supersocket import L3RawSocket, L3RawSocket6
823        conf.L3socket = L3RawSocket
824        conf.L3socket6 = L3RawSocket6
825    # Reload the interfaces
826    conf.ifaces.reload()
827
828
829def _socket_changer(attr, val, old):
830    # type: (str, bool, bool) -> Any
831    if not isinstance(val, bool):
832        raise TypeError("This argument should be a boolean")
833    Interceptor.set_from_hook(conf, attr, val)
834    dependencies = {  # Things that will be turned off
835        "use_pcap": ["use_bpf"],
836        "use_bpf": ["use_pcap"],
837    }
838    restore = {k: getattr(conf, k) for k in dependencies}
839    del restore[attr]  # This is handled directly by _set_conf_sockets
840    if val:  # Only if True
841        for param in dependencies[attr]:
842            Interceptor.set_from_hook(conf, param, False)
843    try:
844        _set_conf_sockets()
845    except (ScapyInvalidPlatformException, ImportError) as e:
846        for key, value in restore.items():
847            Interceptor.set_from_hook(conf, key, value)
848        if isinstance(e, ScapyInvalidPlatformException):
849            raise
850    return getattr(conf, attr)
851
852
853def _loglevel_changer(attr, val, old):
854    # type: (str, int, int) -> int
855    """Handle a change of conf.logLevel"""
856    log_scapy.setLevel(val)
857    return val
858
859
860def _iface_changer(attr, val, old):
861    # type: (str, Any, Any) -> 'scapy.interfaces.NetworkInterface'
862    """Resolves the interface in conf.iface"""
863    if isinstance(val, str):
864        from scapy.interfaces import resolve_iface
865        iface = resolve_iface(val)
866        if old and iface.dummy:
867            warning(
868                "This interface is not specified in any provider ! "
869                "See conf.ifaces output"
870            )
871        return iface
872    return val
873
874
875def _reset_tls_nss_keys(attr, val, old):
876    # type: (str, Any, Any) -> Any
877    """Reset conf.tls_nss_keys when conf.tls_nss_filename changes"""
878    conf.tls_nss_keys = None
879    return val
880
881
882class Conf(ConfClass):
883    """
884    This object contains the configuration of Scapy.
885    """
886    version: str = ReadOnlyAttribute("version", VERSION)
887    session: str = ""  #: filename where the session will be saved
888    interactive = False
889    #: can be "ipython", "bpython", "ptpython", "ptipython", "python" or "auto".
890    #: Default: Auto
891    interactive_shell = "auto"
892    #: Configuration for "ipython" to use jedi (disabled by default)
893    ipython_use_jedi = False
894    #: if 1, prevents any unwanted packet to go out (ARP, DNS, ...)
895    stealth = "not implemented"
896    #: selects the default output interface for srp() and sendp().
897    iface = Interceptor("iface", None, _iface_changer)  # type: 'scapy.interfaces.NetworkInterface'  # noqa: E501
898    layers: LayersList = LayersList()
899    commands = CommandsList()  # type: CommandsList
900    #: Codec used by default for ASN1 objects
901    ASN1_default_codec = None  # type: 'scapy.asn1.asn1.ASN1Codec'
902    #: Default size for ASN1 objects
903    ASN1_default_long_size = 0
904    #: choose the AS resolver class to use
905    AS_resolver = None  # type: scapy.as_resolvers.AS_resolver
906    dot15d4_protocol = None  # Used in dot15d4.py
907    logLevel: int = Interceptor("logLevel", log_scapy.level, _loglevel_changer)
908    #: if 0, doesn't check that IPID matches between IP sent and
909    #: ICMP IP citation received
910    #: if 1, checks that they either are equal or byte swapped
911    #: equals (bug in some IP stacks)
912    #: if 2, strictly checks that they are equals
913    checkIPID = False
914    #: if 1, checks IP src in IP and ICMP IP citation match
915    #: (bug in some NAT stacks)
916    checkIPsrc = True
917    checkIPaddr = True
918    #: if True, checks that IP-in-IP layers match. If False, do
919    #: not check IP layers that encapsulates another IP layer
920    checkIPinIP = True
921    #: if 1, also check that TCP seq and ack match the
922    #: ones in ICMP citation
923    check_TCPerror_seqack = False
924    verb = 2  #: level of verbosity, from 0 (almost mute) to 3 (verbose)
925    prompt: str = Interceptor("prompt", ">>> ", _prompt_changer)
926    #: default mode for the promiscuous mode of a socket (to get answers if you
927    #: spoof on a lan)
928    sniff_promisc = True  # type: bool
929    raw_layer = None  # type: Type[Packet]
930    raw_summary = False  # type: Union[bool, Callable[[bytes], Any]]
931    padding_layer = None  # type: Type[Packet]
932    default_l2 = None  # type: Type[Packet]
933    l2types: Num2Layer = Num2Layer()
934    l3types: Num2Layer = Num2Layer()
935    L3socket = None  # type: Type[scapy.supersocket.SuperSocket]
936    L3socket6 = None  # type: Type[scapy.supersocket.SuperSocket]
937    L2socket = None  # type: Type[scapy.supersocket.SuperSocket]
938    L2listen = None  # type: Type[scapy.supersocket.SuperSocket]
939    BTsocket = None  # type: Type[scapy.supersocket.SuperSocket]
940    min_pkt_size = 60
941    #: holds MIB direct access dictionary
942    mib = None  # type: 'scapy.asn1.mib.MIBDict'
943    bufsize = 2**16
944    #: history file
945    histfile: str = os.getenv(
946        'SCAPY_HISTFILE',
947        os.path.join(
948            os.path.expanduser("~"),
949            ".config", "scapy", "history"
950        )
951    )
952    #: includes padding in disassembled packets
953    padding = 1
954    #: BPF filter for packets to ignore
955    except_filter = ""
956    #: bpf filter added to every sniffing socket to exclude traffic
957    #: from analysis
958    filter = ""
959    #: when 1, store received packet that are not matched into `debug.recv`
960    debug_match = False
961    #: When 1, print some TLS session secrets when they are computed, and
962    #: warn about the session recognition.
963    debug_tls = False
964    wepkey = ""
965    #: holds the Scapy interface list and manager
966    ifaces = None  # type: 'scapy.interfaces.NetworkInterfaceDict'
967    #: holds the cache of interfaces loaded from Libpcap
968    cache_pcapiflist = {}  # type: Dict[str, Tuple[str, List[str], Any, str, int]]
969    # `neighbor` will be filed by scapy.layers.l2
970    neighbor = None  # type: 'scapy.layers.l2.Neighbor'
971    #: holds the name servers IP/hosts used for custom DNS resolution
972    nameservers = None  # type: str
973    #: automatically load IPv4 routes on startup. Disable this if your
974    #: routing table is too big.
975    route_autoload = True
976    #: automatically load IPv6 routes on startup. Disable this if your
977    #: routing table is too big.
978    route6_autoload = True
979    #: holds the Scapy IPv4 routing table and provides methods to
980    #: manipulate it
981    route = None  # type: 'scapy.route.Route'
982    # `route` will be filed by route.py
983    #: holds the Scapy IPv6 routing table and provides methods to
984    #: manipulate it
985    route6 = None  # type: 'scapy.route6.Route6'
986    manufdb = None  # type: 'scapy.data.ManufDA'
987    ethertypes = None  # type: 'scapy.data.EtherDA'
988    protocols = None  # type: 'scapy.dadict.DADict[int, str]'
989    services_udp = None  # type: 'scapy.dadict.DADict[int, str]'
990    services_tcp = None  # type: 'scapy.dadict.DADict[int, str]'
991    services_sctp = None  # type: 'scapy.dadict.DADict[int, str]'
992    # 'route6' will be filed by route6.py
993    teredoPrefix = ""  # type: str
994    teredoServerPort = None  # type: int
995    auto_fragment = True
996    #: raise exception when a packet dissector raises an exception
997    debug_dissector = False
998    color_theme: ColorTheme = Interceptor("color_theme", NoTheme(), _prompt_changer)
999    #: how much time between warnings from the same place
1000    warning_threshold = 5
1001    prog: ProgPath = ProgPath()
1002    #: holds list of fields for which resolution should be done
1003    resolve: Resolve = Resolve()
1004    #: holds list of enum fields for which conversion to string
1005    #: should NOT be done
1006    noenum: Resolve = Resolve()
1007    emph: Emphasize = Emphasize()
1008    #: read only attribute to show if PyPy is in use
1009    use_pypy: bool = ReadOnlyAttribute("use_pypy", isPyPy())
1010    #: use libpcap integration or not. Changing this value will update
1011    #: the conf.L[2/3] sockets
1012    use_pcap: bool = Interceptor(
1013        "use_pcap",
1014        os.getenv("SCAPY_USE_LIBPCAP", "").lower().startswith("y"),
1015        _socket_changer
1016    )
1017    use_bpf: bool = Interceptor("use_bpf", False, _socket_changer)
1018    use_npcap = False
1019    ipv6_enabled: bool = socket.has_ipv6
1020    stats_classic_protocols = []  # type: List[Type[Packet]]
1021    stats_dot11_protocols = []  # type: List[Type[Packet]]
1022    temp_files = []  # type: List[str]
1023    #: netcache holds time-based caches for net operations
1024    netcache: NetCache = NetCache()
1025    geoip_city = None
1026    # can, tls, http and a few others are not loaded by default
1027    load_layers: List[str] = [
1028        'bluetooth',
1029        'bluetooth4LE',
1030        'dcerpc',
1031        'dhcp',
1032        'dhcp6',
1033        'dns',
1034        'dot11',
1035        'dot15d4',
1036        'eap',
1037        'gprs',
1038        'gssapi',
1039        'hsrp',
1040        'inet',
1041        'inet6',
1042        'ipsec',
1043        'ir',
1044        'isakmp',
1045        'kerberos',
1046        'l2',
1047        'l2tp',
1048        'ldap',
1049        'llmnr',
1050        'lltd',
1051        'mgcp',
1052        'mobileip',
1053        'netbios',
1054        'netflow',
1055        'ntlm',
1056        'ntp',
1057        'ppi',
1058        'ppp',
1059        'pptp',
1060        'radius',
1061        'rip',
1062        'rtp',
1063        'sctp',
1064        'sixlowpan',
1065        'skinny',
1066        'smb',
1067        'smb2',
1068        'smbclient',
1069        'smbserver',
1070        'snmp',
1071        'spnego',
1072        'tftp',
1073        'vrrp',
1074        'vxlan',
1075        'x509',
1076        'zigbee'
1077    ]
1078    #: a dict which can be used by contrib layers to store local
1079    #: configuration
1080    contribs = dict()  # type: Dict[str, Any]
1081    exts: ExtsManager = ExtsManager()
1082    crypto_valid = isCryptographyValid()
1083    crypto_valid_advanced = isCryptographyAdvanced()
1084    #: controls whether or not to display the fancy banner
1085    fancy_banner = True
1086    #: controls whether tables (conf.iface, conf.route...) should be cropped
1087    #: to fit the terminal
1088    auto_crop_tables = True
1089    #: how often to check for new packets.
1090    #: Defaults to 0.05s.
1091    recv_poll_rate = 0.05
1092    #: When True, raise exception if no dst MAC found otherwise broadcast.
1093    #: Default is False.
1094    raise_no_dst_mac = False
1095    loopback_name: str = "lo" if LINUX else "lo0"
1096    nmap_base = ""  # type: str
1097    nmap_kdb = None  # type: Optional[NmapKnowledgeBase]
1098    #: a safety mechanism: the maximum amount of items included in a PacketListField
1099    #: or a FieldListField
1100    max_list_count = 100
1101    #: When the TLS module is loaded (not by default), the following turns on sessions
1102    tls_session_enable = False
1103    #: Filename containing NSS Keys Log
1104    tls_nss_filename = Interceptor(
1105        "tls_nss_filename",
1106        None,
1107        _reset_tls_nss_keys
1108    )
1109    #: Dictionary containing parsed NSS Keys
1110    tls_nss_keys: Dict[str, bytes] = None
1111    #: When TCPSession is used, parse DCE/RPC sessions automatically.
1112    #: This should be used for passive sniffing.
1113    dcerpc_session_enable = False
1114    #: If a capture is missing the first DCE/RPC binding message, we might incorrectly
1115    #: assume that header signing isn't used. This forces it on.
1116    dcerpc_force_header_signing = False
1117    #: Windows SSPs for sniffing. This is used with
1118    #: dcerpc_session_enable
1119    winssps_passive = []
1120
1121    def __getattribute__(self, attr):
1122        # type: (str) -> Any
1123        # Those are loaded on runtime to avoid import loops
1124        if attr == "manufdb":
1125            from scapy.data import MANUFDB
1126            return MANUFDB
1127        if attr == "ethertypes":
1128            from scapy.data import ETHER_TYPES
1129            return ETHER_TYPES
1130        if attr == "protocols":
1131            from scapy.data import IP_PROTOS
1132            return IP_PROTOS
1133        if attr == "services_udp":
1134            from scapy.data import UDP_SERVICES
1135            return UDP_SERVICES
1136        if attr == "services_tcp":
1137            from scapy.data import TCP_SERVICES
1138            return TCP_SERVICES
1139        if attr == "services_sctp":
1140            from scapy.data import SCTP_SERVICES
1141            return SCTP_SERVICES
1142        if attr == "iface6":
1143            warnings.warn(
1144                "conf.iface6 is deprecated in favor of conf.iface",
1145                DeprecationWarning
1146            )
1147            attr = "iface"
1148        return object.__getattribute__(self, attr)
1149
1150
1151if not Conf.ipv6_enabled:
1152    log_scapy.warning("IPv6 support disabled in Python. Cannot load Scapy IPv6 layers.")  # noqa: E501
1153    for m in ["inet6", "dhcp6", "sixlowpan"]:
1154        if m in Conf.load_layers:
1155            Conf.load_layers.remove(m)
1156
1157conf = Conf()  # type: Conf
1158
1159# Python 3.8 Only
1160if sys.version_info >= (3, 8):
1161    conf.exts.load()
1162
1163
1164def crypto_validator(func):
1165    # type: (DecoratorCallable) -> DecoratorCallable
1166    """
1167    This a decorator to be used for any method relying on the cryptography library.  # noqa: E501
1168    Its behaviour depends on the 'crypto_valid' attribute of the global 'conf'.
1169    """
1170    def func_in(*args, **kwargs):
1171        # type: (*Any, **Any) -> Any
1172        if not conf.crypto_valid:
1173            raise ImportError("Cannot execute crypto-related method! "
1174                              "Please install python-cryptography v1.7 or later.")  # noqa: E501
1175        return func(*args, **kwargs)
1176    return func_in
1177
1178
1179def scapy_delete_temp_files():
1180    # type: () -> None
1181    for f in conf.temp_files:
1182        try:
1183            os.unlink(f)
1184        except Exception:
1185            pass
1186    del conf.temp_files[:]
1187
1188
1189atexit.register(scapy_delete_temp_files)
1190