# SPDX-License-Identifier: GPL-2.0-only
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) Philippe Biondi <phil@secdev.org>

"""
Implementation of the configuration object.
"""


import atexit
import copy
import functools
import os
import re
import socket
import sys
import time
import warnings

from dataclasses import dataclass
from enum import Enum

import importlib
import importlib.abc
import importlib.util

import scapy
from scapy import VERSION
from scapy.base_classes import BasePacket
from scapy.consts import DARWIN, WINDOWS, LINUX, BSD, SOLARIS
from scapy.error import (
    log_loading,
    log_scapy,
    ScapyInvalidPlatformException,
    warning,
)
from scapy.themes import ColorTheme, NoTheme, apply_ipython_style

# Typing imports
from typing import (
    cast,
    Any,
    Callable,
    Dict,
    Iterator,
    List,
    NoReturn,
    Optional,
    Set,
    Tuple,
    Type,
    Union,
    overload,
    TYPE_CHECKING,
)
from types import ModuleType
from scapy.compat import DecoratorCallable

if TYPE_CHECKING:
    # Do not import at runtime
    import scapy.as_resolvers
    from scapy.modules.nmap import NmapKnowledgeBase
    from scapy.packet import Packet
    from scapy.supersocket import SuperSocket  # noqa: F401
    import scapy.asn1.asn1
    import scapy.asn1.mib

############
#  Config  #
############


class ConfClass(object):
    """Base class for configuration holders, with a tabular ``str()``."""

    def configure(self, cnf):
        # type: (ConfClass) -> None
        """Replace this instance's attributes by a shallow copy of cnf's."""
        self.__dict__ = cnf.__dict__.copy()

    def __repr__(self):
        # type: () -> str
        return str(self)

    def __str__(self):
        # type: () -> str
        """
        Return one ``name = repr(value)`` line per public attribute (class
        and instance level), sorted by name. Each value is collapsed on a
        single line and cropped with "..." so that lines stay short.
        """
        dkeys = self.__class__.__dict__.copy()
        dkeys.update(self.__dict__)
        lines = []
        for i in sorted(dkeys):
            if i[0] == "_":
                # private / dunder attributes are not displayed
                continue
            r = repr(getattr(self, i))
            r = " ".join(r.split())
            wlen = 76 - max(len(i), 10)
            if len(r) > wlen:
                r = r[:wlen - 3] + "..."
            lines.append("%-10s = %s" % (i, r))
        return "\n".join(lines)


class Interceptor(object):
    """
    Data descriptor that stores a value on the owning object under
    ``_intercepted_<name>`` and passes every assignment through a hook.

    The hook is called as ``hook(name, new_value, old_value, *args, **kargs)``
    and its return value is what gets stored.
    """

    def __init__(self,
                 name,  # type: str
                 default,  # type: Any
                 hook,  # type: Callable[..., Any]
                 args=None,  # type: Optional[List[Any]]
                 kargs=None  # type: Optional[Dict[str, Any]]
                 ):
        # type: (...) -> None
        self.name = name
        self.intname = "_intercepted_%s" % name
        self.default = default
        self.hook = hook
        self.args = args if args is not None else []
        self.kargs = kargs if kargs is not None else {}

    def __get__(self, obj, typ=None):
        # type: (Optional[Conf], Optional[type]) -> Any
        if obj is None:
            # Accessed on the class itself: there is no instance to store the
            # value on, so return the descriptor (standard descriptor
            # behaviour) instead of failing on setattr(None, ...).
            return self
        if not hasattr(obj, self.intname):
            setattr(obj, self.intname, self.default)
        return getattr(obj, self.intname)

    @staticmethod
    def set_from_hook(obj, name, val):
        # type: (Conf, str, bool) -> None
        """Store val for attribute `name` on obj without calling the hook."""
        int_name = "_intercepted_%s" % name
        setattr(obj, int_name, val)

    def __set__(self, obj, val):
        # type: (Conf, Any) -> None
        old = getattr(obj, self.intname, self.default)
        val = self.hook(self.name, val, old, *self.args, **self.kargs)
        setattr(obj, self.intname, val)


def _readonly(name):
    # type: (str) -> NoReturn
    """Hook of read-only attributes: restore the default, then raise."""
    default = Conf.__dict__[name].default
    Interceptor.set_from_hook(conf, name, default)
    raise ValueError("Read-only value !")


ReadOnlyAttribute = functools.partial(
    Interceptor,
    hook=(lambda name, *args, **kwargs: _readonly(name))
)
ReadOnlyAttribute.__doc__ = "Read-only class attribute"


class ProgPath(ConfClass):
    """Names / paths of the external programs used by Scapy."""
    _default: str = "<System default>"
    universal_open: str = "open" if DARWIN else "xdg-open"
    pdfreader: str = universal_open
    psreader: str = universal_open
    svgreader: str = universal_open
    dot: str = "dot"
    display: str = "display"
    tcpdump: str = "tcpdump"
    tcpreplay: str = "tcpreplay"
    hexedit: str = "hexer"
    tshark: str = "tshark"
    wireshark: str = "wireshark"
    ifconfig: str = "ifconfig"
    extcap_folders: List[str] = [
        os.path.join(os.path.expanduser("~"), ".config", "wireshark", "extcap"),
        "/usr/lib/x86_64-linux-gnu/wireshark/extcap",
    ]


class ConfigFieldList:
    """
    Set of fields, together with the set of layers owning those fields
    (taken from each field's ``owners`` attribute).
    """

    def __init__(self):
        # type: () -> None
        self.fields = set()  # type: Set[Any]
        self.layers = set()  # type: Set[Any]

    @staticmethod
    def _is_field(f):
        # type: (Any) -> bool
        return hasattr(f, "owners")

    def _recalc_layer_list(self):
        # type: () -> None
        self.layers = {owner for f in self.fields for owner in f.owners}

    def add(self, *flds):
        # type: (*Any) -> None
        """Add the given fields. Objects without `owners` are ignored."""
        self.fields |= {f for f in flds if self._is_field(f)}
        self._recalc_layer_list()

    def remove(self, *flds):
        # type: (*Any) -> None
        """Remove the given fields (unknown ones are silently ignored)."""
        self.fields -= set(flds)
        self._recalc_layer_list()

    def __contains__(self, elt):
        # type: (Any) -> bool
        # BasePacket instances are looked up among the owning layers,
        # anything else among the fields.
        if isinstance(elt, BasePacket):
            return elt in self.layers
        return elt in self.fields

    def __repr__(self):
        # type: () -> str
        return "<%s [%s]>" % (self.__class__.__name__, " ".join(str(x) for x in self.fields))  # noqa: E501


class Emphasize(ConfigFieldList):
    pass


class Resolve(ConfigFieldList):
    pass


class Num2Layer:
    """Two-way mapping between numbers and layers (packet classes)."""

    def __init__(self):
        # type: () -> None
        self.num2layer = {}  # type: Dict[int, Type[Packet]]
        self.layer2num = {}  # type: Dict[Type[Packet], int]

    def register(self, num, layer):
        # type: (int, Type[Packet]) -> None
        """Register the association in both directions."""
        self.register_num2layer(num, layer)
        self.register_layer2num(num, layer)

    def register_num2layer(self, num, layer):
        # type: (int, Type[Packet]) -> None
        self.num2layer[num] = layer

    def register_layer2num(self, num, layer):
        # type: (int, Type[Packet]) -> None
        self.layer2num[layer] = num

    @overload
    def __getitem__(self, item):
        # type: (Type[Packet]) -> int
        pass

    @overload
    def __getitem__(self, item):  # noqa: F811
        # type: (int) -> Type[Packet]
        pass

    def __getitem__(self, item):  # noqa: F811
        # type: (Union[int, Type[Packet]]) -> Union[int, Type[Packet]]
        if isinstance(item, int):
            return self.num2layer[item]
        else:
            return self.layer2num[item]

    def __contains__(self, item):
        # type: (Union[int, Type[Packet]]) -> bool
        if isinstance(item, int):
            return item in self.num2layer
        else:
            return item in self.layer2num

    def get(self,
            item,  # type: Union[int, Type[Packet]]
            default=None,  # type: Optional[Type[Packet]]
            ):
        # type: (...) -> Optional[Union[int, Type[Packet]]]
        return self[item] if item in self else default

    def __repr__(self):
        # type: () -> str
        """
        One line per association, sorted by number. The arrow tells in which
        direction(s) the association is registered.
        """
        lst = []
        for num, layer in self.num2layer.items():
            if layer in self.layer2num and self.layer2num[layer] == num:
                arrow = "<->"
            else:
                arrow = " ->"
            lst.append((num, "%#6x %s %-20s (%s)" % (num, arrow, layer.__name__,
                                                     layer._name)))
        for layer, num in self.layer2num.items():
            if num not in self.num2layer or self.num2layer[num] != layer:
                lst.append((num, "%#6x <-  %-20s (%s)" % (num, layer.__name__,
                                                          layer._name)))
        lst.sort()
        return "\n".join(y for x, y in lst)


class LayersList(List[Type['scapy.packet.Packet']]):
    """List of registered layers, also indexed by defining module."""

    def __init__(self):
        # type: () -> None
        list.__init__(self)
        self.ldict = {}  # type: Dict[str, List[Type[Packet]]]
        self.filtered = False
        self._backup_dict = {}  # type: Dict[Type[Packet], List[Tuple[Dict[str, Any], Type[Packet]]]]  # noqa: E501

    def __repr__(self):
        # type: () -> str
        return "\n".join("%-20s: %s" % (layer.__name__, layer.name)
                         for layer in self)

    def register(self, layer):
        # type: (Type[Packet]) -> None
        self.append(layer)
        if layer.__module__ not in self.ldict:
            self.ldict[layer.__module__] = []
        self.ldict[layer.__module__].append(layer)

    def layers(self):
        # type: () -> List[Tuple[str, str]]
        """
        Return a list of (module name, first line of the module docstring)
        for every module that registered a layer. The module name is used
        when there is no docstring.
        """
        result = []
        # These imports may feel useless, but they are required for the eval
        # below, which resolves dotted module names to module objects.
        import scapy  # noqa: F401
        import builtins  # noqa: F401
        for lay in self.ldict:
            # NOTE: `lay` is the __module__ of a registered layer class, not
            # external input.
            doc = eval(lay).__doc__
            result.append((lay, doc.strip().split("\n")[0] if doc else lay))
        return result

    def filter(self, items):
        # type: (List[Type[Packet]]) -> None
        """Disable dissection of unused layers to speed up dissection"""
        if self.filtered:
            raise ValueError("Already filtered. Please disable it first")
        for lay in self.ldict.values():
            for cls in lay:
                if cls not in self._backup_dict:
                    self._backup_dict[cls] = cls.payload_guess[:]
                    cls.payload_guess = [
                        y for y in cls.payload_guess if y[1] in items
                    ]
        self.filtered = True

    def unfilter(self):
        # type: () -> None
        """Re-enable dissection for all layers"""
        if not self.filtered:
            raise ValueError("Not filtered. Please filter first")
        for lay in self.ldict.values():
            for cls in lay:
                # Layers registered after filter() was called were never
                # filtered: they have no backup and nothing to restore.
                if cls in self._backup_dict:
                    cls.payload_guess = self._backup_dict[cls]
        self._backup_dict.clear()
        self.filtered = False


class CommandsList(List[Callable[..., Any]]):
    """List of the commands registered in Scapy."""

    def __repr__(self):
        # type: () -> str
        s = []
        for li in sorted(self, key=lambda x: x.__name__):
            doc = li.__doc__ if li.__doc__ else "--"
            doc = doc.lstrip().split('\n', 1)[0]
            s.append("%-22s: %s" % (li.__name__, doc))
        return "\n".join(s)

    def register(self, cmd):
        # type: (DecoratorCallable) -> DecoratorCallable
        self.append(cmd)
        return cmd  # return cmd so that method can be used as a decorator


def lsc():
    # type: () -> None
    """Displays Scapy's default commands"""
    print(repr(conf.commands))


class CacheInstance(Dict[str, Any]):
    """
    Dictionary whose entries are timestamped when set. When `timeout` is not
    None, entries older than `timeout` seconds are reported as missing by
    lookups and iteration (they are hidden, not deleted).
    """
    __slots__ = ["timeout", "name", "_timetable"]

    def __init__(self, name="noname", timeout=None):
        # type: (str, Optional[int]) -> None
        self.timeout = timeout
        self.name = name
        self._timetable = {}  # type: Dict[str, float]

    def flush(self):
        # type: () -> None
        """Remove every entry and its timestamp."""
        self._timetable.clear()
        self.clear()

    def __getitem__(self, item):
        # type: (str) -> Any
        # The slot names are also reachable through item access
        if item in self.__slots__:
            return object.__getattribute__(self, item)
        if not self.__contains__(item):
            raise KeyError(item)
        return super(CacheInstance, self).__getitem__(item)

    def __contains__(self, item):
        if not super(CacheInstance, self).__contains__(item):
            return False
        if self.timeout is not None:
            t = self._timetable[item]
            if time.time() - t > self.timeout:
                return False
        return True

    def get(self, item, default=None):
        # type: (str, Optional[Any]) -> Any
        # overloading this method is needed to force the dict to go through
        # the timetable check
        try:
            return self[item]
        except KeyError:
            return default

    def __setitem__(self, item, v):
        # type: (str, str) -> None
        if item in self.__slots__:
            return object.__setattr__(self, item, v)
        self._timetable[item] = time.time()
        super(CacheInstance, self).__setitem__(item, v)

    def update(self,
               other,  # type: Any
               **kwargs  # type: Any
               ):
        # type: (...) -> None
        """
        Merge `other` into this cache. `other` must provide `items()` and a
        `_timetable` (i.e. behave like a CacheInstance). `kwargs` is ignored.
        """
        for key, value in other.items():
            # We only update an element from `other` either if it does
            # not exist in `self` or if the entry in `self` is older.
            if key not in self or self._timetable[key] < other._timetable[key]:
                dict.__setitem__(self, key, value)
                self._timetable[key] = other._timetable[key]

    def iteritems(self):
        # type: () -> Iterator[Tuple[str, Any]]
        if self.timeout is None:
            return super(CacheInstance, self).items()
        t0 = time.time()
        return (
            (k, v)
            for (k, v) in super(CacheInstance, self).items()
            if t0 - self._timetable[k] < self.timeout
        )

    def iterkeys(self):
        # type: () -> Iterator[str]
        if self.timeout is None:
            return super(CacheInstance, self).keys()
        t0 = time.time()
        return (
            k
            for k in super(CacheInstance, self).keys()
            if t0 - self._timetable[k] < self.timeout
        )

    def __iter__(self):
        # type: () -> Iterator[str]
        return self.iterkeys()

    def itervalues(self):
        # type: () -> Iterator[Any]
        if self.timeout is None:
            return super(CacheInstance, self).values()
        t0 = time.time()
        return (
            v
            for (k, v) in super(CacheInstance, self).items()
            if t0 - self._timetable[k] < self.timeout
        )

    def items(self):
        # type: () -> Any
        return list(self.iteritems())

    def keys(self):
        # type: () -> Any
        return list(self.iterkeys())

    def values(self):
        # type: () -> Any
        return list(self.itervalues())

    def __len__(self):
        # type: () -> int
        if self.timeout is None:
            return super(CacheInstance, self).__len__()
        return len(self.keys())

    def summary(self):
        # type: () -> str
        return "%s: %i valid items. Timeout=%rs" % (self.name, len(self), self.timeout)  # noqa: E501

    def __repr__(self):
        # type: () -> str
        s = []
        if self:
            mk = max(len(k) for k in self)
            fmt = "%%-%is %%s" % (mk + 1)
            for item in self.items():
                s.append(fmt % item)
        return "\n".join(s)

    def copy(self):
        # type: () -> CacheInstance
        return copy.copy(self)


class NetCache:
    """Container of named CacheInstance, each reachable as an attribute."""

    def __init__(self):
        # type: () -> None
        self._caches_list = []  # type: List[CacheInstance]

    def add_cache(self, cache):
        # type: (CacheInstance) -> None
        self._caches_list.append(cache)
        setattr(self, cache.name, cache)

    def new_cache(self, name, timeout=None):
        # type: (str, Optional[int]) -> CacheInstance
        c = CacheInstance(name=name, timeout=timeout)
        self.add_cache(c)
        return c

    def __delattr__(self, attr):
        # type: (str) -> NoReturn
        raise AttributeError("Cannot delete attributes")

    def update(self, other):
        # type: (NetCache) -> None
        """Merge the caches of `other`: update known ones, copy the others."""
        for co in other._caches_list:
            if hasattr(self, co.name):
                getattr(self, co.name).update(co)
            else:
                self.add_cache(co.copy())

    def flush(self):
        # type: () -> None
        for c in self._caches_list:
            c.flush()

    def __repr__(self):
        # type: () -> str
        return "\n".join(c.summary() for c in self._caches_list)


class ScapyExt:
    """
    Description of an extension: a name, a version and the module specs it
    registers under ``scapy.<mode>.<name>``.
    """
    __slots__ = ["specs", "name", "version"]

    class MODE(Enum):
        LAYERS = "layers"
        CONTRIB = "contrib"
        MODULES = "modules"

    @dataclass
    class ScapyExtSpec:
        fullname: str
        mode: 'ScapyExt.MODE'
        spec: Any
        default: bool

    def __init__(self):
        self.specs: Dict[str, 'ScapyExt.ScapyExtSpec'] = {}

    def config(self, name, version):
        """Set the name and version of the extension."""
        self.name = name
        self.version = version

    def register(self, name, mode, path, default=None):
        """
        Register the file at `path` as module ``scapy.<mode>.<name>``.

        :param name: last component of the module name
        :param mode: a ScapyExt.MODE member
        :param path: location of the module file
        :param default: whether the spec is flagged as default. When None,
            it is flagged if importlib can already find a module with that
            full name.
        """
        assert mode in self.MODE, "mode must be one of ScapyExt.MODE !"
        fullname = f"scapy.{mode.value}.{name}"
        spec = importlib.util.spec_from_file_location(
            fullname,
            str(path),
        )
        spec = self.ScapyExtSpec(
            fullname=fullname,
            mode=mode,
            spec=spec,
            default=default or False,
        )
        if default is None:
            spec.default = bool(importlib.util.find_spec(spec.fullname))
        self.specs[fullname] = spec

    def __repr__(self):
        return "<ScapyExt %s %s (%s specs)>" % (
            self.name,
            self.version,
            len(self.specs),
        )


class ExtsManager(importlib.abc.MetaPathFinder):
    """
    Discovers installed distributions flagged with the Scapy classifier and
    serves the module specs they register through the import system.
    """
    __slots__ = ["exts", "_loaded", "all_specs"]

    SCAPY_PLUGIN_CLASSIFIER = 'Framework :: Scapy'
    GPLV2_CLASSIFIERS = [
        'License :: OSI Approved :: GNU General Public License v2 (GPLv2)',
        'License :: OSI Approved :: GNU General Public License v2 or later (GPLv2+)',
    ]

    def __init__(self):
        self.exts: List[ScapyExt] = []
        self.all_specs: Dict[str, ScapyExt.ScapyExtSpec] = {}
        self._loaded = []

    def find_spec(self, fullname, path, target=None):
        """Return the registered spec for fullname, or None if unknown."""
        if fullname in self.all_specs:
            return self.all_specs[fullname].spec
        return None

    def invalidate_caches(self):
        pass

    def _register_spec(self, spec):
        """
        Record the spec. Specs flagged as default are also installed in
        sys.modules right away, behind a LazyLoader.
        """
        self.all_specs[spec.fullname] = spec
        if spec.default:
            loader = importlib.util.LazyLoader(spec.spec.loader)
            spec.spec.loader = loader
            module = importlib.util.module_from_spec(spec.spec)
            sys.modules[spec.fullname] = module
            loader.exec_module(module)

    def load(self):
        """
        Look for installed distributions carrying the Scapy classifier,
        import them and call their `scapy_ext(ext)` entry point.

        Distributions without a GPLv2 classifier, or failing during import
        or initialization, are skipped with a log message.
        """
        try:
            import importlib.metadata
        except ImportError:
            return
        # Mapping of top-level package names to distribution names. It is
        # costly to build, so only compute it if a plugin is found, and
        # at most once.
        pkgs_distrs = None
        for distr in importlib.metadata.distributions():
            classifiers = [
                v for k, v in distr.metadata.items() if k == 'Classifier'
            ]
            if self.SCAPY_PLUGIN_CLASSIFIER not in classifiers:
                continue
            try:
                if pkgs_distrs is None:
                    pkgs_distrs = importlib.metadata.packages_distributions()
                pkg = next(
                    k
                    for k, v in pkgs_distrs.items()
                    if distr.name in v
                )
            except (StopIteration, KeyError):
                # next() raises StopIteration when no package matches:
                # fall back on the distribution name.
                pkg = distr.name
            if pkg in self._loaded:
                continue
            if not any(v in self.GPLV2_CLASSIFIERS for v in classifiers):
                log_loading.warning(
                    "'%s' has no GPLv2 classifier therefore cannot be loaded." % pkg  # noqa: E501
                )
                continue
            self._loaded.append(pkg)
            ext = ScapyExt()
            try:
                scapy_ext = importlib.import_module(pkg)
            except Exception as ex:
                log_loading.warning(
                    "'%s' failed during import with %s" % (
                        pkg,
                        ex
                    )
                )
                continue
            try:
                scapy_ext_func = scapy_ext.scapy_ext
            except AttributeError:
                log_loading.info(
                    "'%s' included the Scapy Framework specifier "
                    "but did not include a scapy_ext" % pkg
                )
                continue
            try:
                scapy_ext_func(ext)
            except Exception as ex:
                log_loading.warning(
                    "'%s' failed during initialization with %s" % (
                        pkg,
                        ex
                    )
                )
                continue
            for spec in ext.specs.values():
                self._register_spec(spec)
            self.exts.append(ext)
        if self not in sys.meta_path:
            sys.meta_path.append(self)

    def __repr__(self):
        from scapy.utils import pretty_list
        return pretty_list(
            [
                (x.name, x.version, [y.fullname for y in x.specs.values()])
                for x in self.exts
            ],
            [("Name", "Version", "Specs")],
            sortBy=0,
        )


def _version_checker(module, minver):
    # type: (ModuleType, Tuple[int, ...]) -> bool
    """Checks that module has a higher version than minver.

    params:
     - module: a module to test
     - minver: a tuple of versions

    Returns False when the module has no parsable `__version__`.
    """
    # We could use LooseVersion, but distutils imports imp which is deprecated
    version_regexp = r'[a-z]?((?:\d|\.)+\d+)(?:\.dev[0-9]+)?'
    version_tags_r = re.match(
        version_regexp,
        getattr(module, "__version__", "")
    )
    if not version_tags_r:
        return False
    version_tags_i = version_tags_r.group(1).split(".")
    version_tags = tuple(int(x) for x in version_tags_i)
    return bool(version_tags >= minver)


def isCryptographyValid():
    # type: () -> bool
    """
    Check if the cryptography module >= 2.0.0 is present. This is the minimum
    version for most usages in Scapy.
    """
    try:
        import cryptography
    except ImportError:
        return False
    return _version_checker(cryptography, (2, 0, 0))


def isCryptographyAdvanced():
    # type: () -> bool
    """
    Check if the cryptography module is present, and if it supports X25519,
    ChaCha20Poly1305 and such.

    Notes:
    - cryptography >= 2.0 is required
    - OpenSSL >= 1.1.0 is required
    """
    try:
        from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey  # noqa: E501
        X25519PrivateKey.generate()
    except Exception:
        return False
    else:
        return True


def isPyPy():
    # type: () -> bool
    """Returns either scapy is running under PyPy or not"""
    try:
        import __pypy__  # noqa: F401
        return True
    except ImportError:
        return False


def _prompt_changer(attr, val, old):
    # type: (str, Any, Any) -> Any
    """Change the current prompt theme"""
    Interceptor.set_from_hook(conf, attr, val)
    try:
        sys.ps1 = conf.color_theme.prompt(conf.prompt)
    except Exception:
        pass
    try:
        apply_ipython_style(
            get_ipython()  # type: ignore
        )
    except NameError:
        # get_ipython is not defined: not running inside IPython
        pass
    return getattr(conf, attr, old)


def _set_conf_sockets():
    # type: () -> None
    """Populate the conf.L2Socket and conf.L3Socket
    according to the various use_* parameters
    """
    if conf.use_bpf and not BSD:
        Interceptor.set_from_hook(conf, "use_bpf", False)
        raise ScapyInvalidPlatformException("BSD-like (OSX, *BSD...) only !")
    if not conf.use_pcap and SOLARIS:
        Interceptor.set_from_hook(conf, "use_pcap", True)
        raise ScapyInvalidPlatformException(
            "Scapy only supports libpcap on Solaris !"
        )
    # we are already in an Interceptor hook, use Interceptor.set_from_hook
    if conf.use_pcap:
        try:
            from scapy.arch.libpcap import L2pcapListenSocket, L2pcapSocket, \
                L3pcapSocket
        except (OSError, ImportError):
            log_loading.warning("No libpcap provider available ! pcap won't be used")
            Interceptor.set_from_hook(conf, "use_pcap", False)
        else:
            conf.L3socket = L3pcapSocket
            conf.L3socket6 = functools.partial(
                L3pcapSocket, filter="ip6")
            conf.L2socket = L2pcapSocket
            conf.L2listen = L2pcapListenSocket
    elif conf.use_bpf:
        from scapy.arch.bpf.supersocket import L2bpfListenSocket, \
            L2bpfSocket, L3bpfSocket
        conf.L3socket = L3bpfSocket
        conf.L3socket6 = functools.partial(
            L3bpfSocket, filter="ip6")
        conf.L2socket = L2bpfSocket
        conf.L2listen = L2bpfListenSocket
    elif LINUX:
        from scapy.arch.linux import L3PacketSocket, L2Socket, L2ListenSocket
        conf.L3socket = L3PacketSocket
        conf.L3socket6 = cast(
            "Type[SuperSocket]",
            functools.partial(
                L3PacketSocket,
                filter="ip6"
            )
        )
        conf.L2socket = L2Socket
        conf.L2listen = L2ListenSocket
    elif WINDOWS:
        from scapy.arch.windows import _NotAvailableSocket
        from scapy.arch.windows.native import L3WinSocket, L3WinSocket6
        conf.L3socket = L3WinSocket
        conf.L3socket6 = L3WinSocket6
        conf.L2socket = _NotAvailableSocket
        conf.L2listen = _NotAvailableSocket
    else:
        from scapy.supersocket import L3RawSocket, L3RawSocket6
        conf.L3socket = L3RawSocket
        conf.L3socket6 = L3RawSocket6
    # Reload the interfaces
    conf.ifaces.reload()


def _socket_changer(attr, val, old):
    # type: (str, bool, bool) -> Any
    """
    Handle a change of conf.use_pcap / conf.use_bpf: both are mutually
    exclusive, and the conf.L[2/3] sockets are updated accordingly.
    """
    if not isinstance(val, bool):
        raise TypeError("This argument should be a boolean")
    Interceptor.set_from_hook(conf, attr, val)
    dependencies = {  # Things that will be turned off
        "use_pcap": ["use_bpf"],
        "use_bpf": ["use_pcap"],
    }
    restore = {k: getattr(conf, k) for k in dependencies}
    del restore[attr]  # This is handled directly by _set_conf_sockets
    if val:  # Only if True
        for param in dependencies[attr]:
            Interceptor.set_from_hook(conf, param, False)
    try:
        _set_conf_sockets()
    except (ScapyInvalidPlatformException, ImportError) as e:
        # Put back the values of the other parameters
        for key, value in restore.items():
            Interceptor.set_from_hook(conf, key, value)
        if isinstance(e, ScapyInvalidPlatformException):
            raise
    return getattr(conf, attr)


def _loglevel_changer(attr, val, old):
    # type: (str, int, int) -> int
    """Handle a change of conf.logLevel"""
    log_scapy.setLevel(val)
    return val


def _iface_changer(attr, val, old):
    # type: (str, Any, Any) -> 'scapy.interfaces.NetworkInterface'
    """Resolves the interface in conf.iface"""
    if isinstance(val, str):
        from scapy.interfaces import resolve_iface
        iface = resolve_iface(val)
        if old and iface.dummy:
            warning(
                "This interface is not specified in any provider ! "
                "See conf.ifaces output"
            )
        return iface
    return val


def _reset_tls_nss_keys(attr, val, old):
    # type: (str, Any, Any) -> Any
    """Reset conf.tls_nss_keys when conf.tls_nss_filename changes"""
    conf.tls_nss_keys = None
    return val


class Conf(ConfClass):
    """
    This object contains the configuration of Scapy.
    """
    version: str = ReadOnlyAttribute("version", VERSION)
    session: str = ""  #: filename where the session will be saved
    interactive = False
    #: can be "ipython", "bpython", "ptpython", "ptipython", "python" or "auto".
    #: Default: Auto
    interactive_shell = "auto"
    #: Configuration for "ipython" to use jedi (disabled by default)
    ipython_use_jedi = False
    #: if 1, prevents any unwanted packet to go out (ARP, DNS, ...)
    stealth = "not implemented"
    #: selects the default output interface for srp() and sendp().
    iface = Interceptor("iface", None, _iface_changer)  # type: 'scapy.interfaces.NetworkInterface'  # noqa: E501
    layers: LayersList = LayersList()
    commands = CommandsList()  # type: CommandsList
    #: Codec used by default for ASN1 objects
    ASN1_default_codec = None  # type: 'scapy.asn1.asn1.ASN1Codec'
    #: Default size for ASN1 objects
    ASN1_default_long_size = 0
    #: choose the AS resolver class to use
    AS_resolver = None  # type: scapy.as_resolvers.AS_resolver
    dot15d4_protocol = None  # Used in dot15d4.py
    logLevel: int = Interceptor("logLevel", log_scapy.level, _loglevel_changer)
    #: if 0, doesn't check that IPID matches between IP sent and
    #: ICMP IP citation received
    #: if 1, checks that they either are equal or byte swapped
    #: equals (bug in some IP stacks)
    #: if 2, strictly checks that they are equals
    checkIPID = False
    #: if 1, checks IP src in IP and ICMP IP citation match
    #: (bug in some NAT stacks)
    checkIPsrc = True
    checkIPaddr = True
    #: if True, checks that IP-in-IP layers match. If False, do
    #: not check IP layers that encapsulates another IP layer
    checkIPinIP = True
    #: if 1, also check that TCP seq and ack match the
    #: ones in ICMP citation
    check_TCPerror_seqack = False
    verb = 2  #: level of verbosity, from 0 (almost mute) to 3 (verbose)
    prompt: str = Interceptor("prompt", ">>> ", _prompt_changer)
    #: default mode for the promiscuous mode of a socket (to get answers if you
    #: spoof on a lan)
    sniff_promisc = True  # type: bool
    raw_layer = None  # type: Type[Packet]
    raw_summary = False  # type: Union[bool, Callable[[bytes], Any]]
    padding_layer = None  # type: Type[Packet]
    default_l2 = None  # type: Type[Packet]
    l2types: Num2Layer = Num2Layer()
    l3types: Num2Layer = Num2Layer()
    L3socket = None  # type: Type[scapy.supersocket.SuperSocket]
    L3socket6 = None  # type: Type[scapy.supersocket.SuperSocket]
    L2socket = None  # type: Type[scapy.supersocket.SuperSocket]
    L2listen = None  # type: Type[scapy.supersocket.SuperSocket]
    BTsocket = None  # type: Type[scapy.supersocket.SuperSocket]
    min_pkt_size = 60
    #: holds MIB direct access dictionary
    mib = None  # type: 'scapy.asn1.mib.MIBDict'
    bufsize = 2**16
    #: history file
    histfile: str = os.getenv(
        'SCAPY_HISTFILE',
        os.path.join(
            os.path.expanduser("~"),
            ".config", "scapy", "history"
        )
    )
    #: includes padding in disassembled packets
    padding = 1
    #: BPF filter for packets to ignore
    except_filter = ""
    #: bpf filter added to every sniffing socket to exclude traffic
    #: from analysis
    filter = ""
    #: when 1, store received packet that are not matched into `debug.recv`
    debug_match = False
    #: When 1, print some TLS session secrets when they are computed, and
    #: warn about the session recognition.
    debug_tls = False
    wepkey = ""
    #: holds the Scapy interface list and manager
    ifaces = None  # type: 'scapy.interfaces.NetworkInterfaceDict'
    #: holds the cache of interfaces loaded from Libpcap
    cache_pcapiflist = {}  # type: Dict[str, Tuple[str, List[str], Any, str, int]]
    # `neighbor` will be filled by scapy.layers.l2
    neighbor = None  # type: 'scapy.layers.l2.Neighbor'
    #: holds the name servers IP/hosts used for custom DNS resolution
    nameservers = None  # type: str
    #: automatically load IPv4 routes on startup. Disable this if your
    #: routing table is too big.
    route_autoload = True
    #: automatically load IPv6 routes on startup. Disable this if your
    #: routing table is too big.
    route6_autoload = True
    #: holds the Scapy IPv4 routing table and provides methods to
    #: manipulate it
    route = None  # type: 'scapy.route.Route'
    # `route` will be filled by route.py
    #: holds the Scapy IPv6 routing table and provides methods to
    #: manipulate it
    route6 = None  # type: 'scapy.route6.Route6'
    manufdb = None  # type: 'scapy.data.ManufDA'
    ethertypes = None  # type: 'scapy.data.EtherDA'
    protocols = None  # type: 'scapy.dadict.DADict[int, str]'
    services_udp = None  # type: 'scapy.dadict.DADict[int, str]'
    services_tcp = None  # type: 'scapy.dadict.DADict[int, str]'
    services_sctp = None  # type: 'scapy.dadict.DADict[int, str]'
    # 'route6' will be filled by route6.py
    teredoPrefix = ""  # type: str
    teredoServerPort = None  # type: int
    auto_fragment = True
    #: raise exception when a packet dissector raises an exception
    debug_dissector = False
    color_theme: ColorTheme = Interceptor("color_theme", NoTheme(), _prompt_changer)
    #: how much time between warnings from the same place
    warning_threshold = 5
    prog: ProgPath = ProgPath()
    #: holds list of fields for which resolution should be done
    resolve: Resolve = Resolve()
    #: holds list of enum fields for which conversion to string
    #: should NOT be done
    noenum: Resolve = Resolve()
    emph: Emphasize = Emphasize()
    #: read only attribute to show if PyPy is in use
    use_pypy: bool = ReadOnlyAttribute("use_pypy", isPyPy())
    #: use libpcap integration or not. Changing this value will update
    #: the conf.L[2/3] sockets
    use_pcap: bool = Interceptor(
        "use_pcap",
        os.getenv("SCAPY_USE_LIBPCAP", "").lower().startswith("y"),
        _socket_changer
    )
    use_bpf: bool = Interceptor("use_bpf", False, _socket_changer)
    use_npcap = False
    ipv6_enabled: bool = socket.has_ipv6
    stats_classic_protocols = []  # type: List[Type[Packet]]
    stats_dot11_protocols = []  # type: List[Type[Packet]]
    temp_files = []  # type: List[str]
    #: netcache holds time-based caches for net operations
    netcache: NetCache = NetCache()
    geoip_city = None
    # can, tls, http and a few others are not loaded by default
    load_layers: List[str] = [
        'bluetooth',
        'bluetooth4LE',
        'dcerpc',
        'dhcp',
        'dhcp6',
        'dns',
        'dot11',
        'dot15d4',
        'eap',
        'gprs',
        'gssapi',
        'hsrp',
        'inet',
        'inet6',
        'ipsec',
        'ir',
        'isakmp',
        'kerberos',
        'l2',
        'l2tp',
        'ldap',
        'llmnr',
        'lltd',
        'mgcp',
        'mobileip',
        'netbios',
        'netflow',
        'ntlm',
        'ntp',
        'ppi',
        'ppp',
        'pptp',
        'radius',
        'rip',
        'rtp',
        'sctp',
        'sixlowpan',
        'skinny',
        'smb',
        'smb2',
        'smbclient',
        'smbserver',
        'snmp',
        'spnego',
        'tftp',
        'vrrp',
        'vxlan',
        'x509',
        'zigbee'
    ]
    #: a dict which can be used by contrib layers to store local
    #: configuration
    contribs = dict()  # type: Dict[str, Any]
    exts: ExtsManager = ExtsManager()
    crypto_valid = isCryptographyValid()
    crypto_valid_advanced = isCryptographyAdvanced()
    #: controls whether or not to display the fancy banner
    fancy_banner = True
    #: controls whether tables (conf.iface, conf.route...) should be cropped
    #: to fit the terminal
    auto_crop_tables = True
    #: how often to check for new packets.
    #: Defaults to 0.05s.
    recv_poll_rate = 0.05
    #: When True, raise exception if no dst MAC found otherwise broadcast.
    #: Default is False.
    raise_no_dst_mac = False
    loopback_name: str = "lo" if LINUX else "lo0"
    nmap_base = ""  # type: str
    nmap_kdb = None  # type: Optional[NmapKnowledgeBase]
    #: a safety mechanism: the maximum amount of items included in a PacketListField
    #: or a FieldListField
    max_list_count = 100
    #: When the TLS module is loaded (not by default), the following turns on sessions
    tls_session_enable = False
    #: Filename containing NSS Keys Log
    tls_nss_filename = Interceptor(
        "tls_nss_filename",
        None,
        _reset_tls_nss_keys
    )
    #: Dictionary containing parsed NSS Keys
    tls_nss_keys: Optional[Dict[str, bytes]] = None
    #: When TCPSession is used, parse DCE/RPC sessions automatically.
    #: This should be used for passive sniffing.
    dcerpc_session_enable = False
    #: If a capture is missing the first DCE/RPC binding message, we might incorrectly
    #: assume that header signing isn't used. This forces it on.
    dcerpc_force_header_signing = False
    #: Windows SSPs for sniffing. This is used with
    #: dcerpc_session_enable
    winssps_passive = []

    def __getattribute__(self, attr):
        # type: (str) -> Any
        # Those are loaded on runtime to avoid import loops
        if attr == "manufdb":
            from scapy.data import MANUFDB
            return MANUFDB
        if attr == "ethertypes":
            from scapy.data import ETHER_TYPES
            return ETHER_TYPES
        if attr == "protocols":
            from scapy.data import IP_PROTOS
            return IP_PROTOS
        if attr == "services_udp":
            from scapy.data import UDP_SERVICES
            return UDP_SERVICES
        if attr == "services_tcp":
            from scapy.data import TCP_SERVICES
            return TCP_SERVICES
        if attr == "services_sctp":
            from scapy.data import SCTP_SERVICES
            return SCTP_SERVICES
        if attr == "iface6":
            warnings.warn(
                "conf.iface6 is deprecated in favor of conf.iface",
                DeprecationWarning
            )
            attr = "iface"
        return object.__getattribute__(self, attr)


if not Conf.ipv6_enabled:
    log_scapy.warning("IPv6 support disabled in Python. Cannot load Scapy IPv6 layers.")  # noqa: E501
    for m in ["inet6", "dhcp6", "sixlowpan"]:
        if m in Conf.load_layers:
            Conf.load_layers.remove(m)

conf = Conf()  # type: Conf

# Python 3.8+ only
if sys.version_info >= (3, 8):
    conf.exts.load()


def crypto_validator(func):
    # type: (DecoratorCallable) -> DecoratorCallable
    """
    This is a decorator to be used for any method relying on the cryptography
    library. Its behaviour depends on the 'crypto_valid' attribute of the
    global 'conf': when it is False, calling the decorated function raises
    ImportError instead of running it.
    """
    @functools.wraps(func)  # keep the name and docstring of the function
    def func_in(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        if not conf.crypto_valid:
            # conf.crypto_valid comes from isCryptographyValid(), which
            # requires cryptography >= 2.0.0
            raise ImportError("Cannot execute crypto-related method! "
                              "Please install python-cryptography v2.0 or later.")  # noqa: E501
        return func(*args, **kwargs)
    return func_in


def scapy_delete_temp_files():
    # type: () -> None
    """Delete the files listed in conf.temp_files (best effort)."""
    for f in conf.temp_files:
        try:
            os.unlink(f)
        except Exception:
            # Best effort: this runs at interpreter exit
            pass
    del conf.temp_files[:]


atexit.register(scapy_delete_temp_files)