1# SPDX-License-Identifier: GPL-2.0-only 2# This file is part of Scapy 3# See https://scapy.net/ for more information 4# Copyright (C) Philippe Biondi <phil@secdev.org> 5 6""" 7General utility functions. 8""" 9 10 11from decimal import Decimal 12from io import StringIO 13from itertools import zip_longest 14from uuid import UUID 15 16import argparse 17import array 18import base64 19import collections 20import decimal 21import difflib 22import gzip 23import inspect 24import locale 25import math 26import os 27import pickle 28import random 29import re 30import shutil 31import socket 32import struct 33import subprocess 34import sys 35import tempfile 36import threading 37import time 38import traceback 39import warnings 40 41from scapy.config import conf 42from scapy.consts import DARWIN, OPENBSD, WINDOWS 43from scapy.data import MTU, DLT_EN10MB, DLT_RAW 44from scapy.compat import ( 45 orb, 46 plain_str, 47 chb, 48 hex_bytes, 49 bytes_encode, 50) 51from scapy.error import ( 52 log_interactive, 53 log_runtime, 54 Scapy_Exception, 55 warning, 56) 57from scapy.pton_ntop import inet_pton 58 59# Typing imports 60from typing import ( 61 cast, 62 Any, 63 AnyStr, 64 Callable, 65 Dict, 66 IO, 67 Iterator, 68 List, 69 Optional, 70 TYPE_CHECKING, 71 Tuple, 72 Type, 73 Union, 74 overload, 75) 76from scapy.compat import ( 77 DecoratorCallable, 78 Literal, 79) 80 81if TYPE_CHECKING: 82 from scapy.packet import Packet 83 from scapy.plist import _PacketIterable, PacketList 84 from scapy.supersocket import SuperSocket 85 import prompt_toolkit 86 87_ByteStream = Union[IO[bytes], gzip.GzipFile] 88 89########### 90# Tools # 91########### 92 93 94def issubtype(x, # type: Any 95 t, # type: Union[type, str] 96 ): 97 # type: (...) -> bool 98 """issubtype(C, B) -> bool 99 100 Return whether C is a class and if it is a subclass of class B. 101 When using a tuple as the second argument issubtype(X, (A, B, ...)), 102 is a shortcut for issubtype(X, A) or issubtype(X, B) or ... (etc.). 103 """ 104 if isinstance(t, str): 105 return t in (z.__name__ for z in x.__bases__) 106 if isinstance(x, type) and issubclass(x, t): 107 return True 108 return False 109 110 111_Decimal = Union[Decimal, int] 112 113 114class EDecimal(Decimal): 115 """Extended Decimal 116 117 This implements arithmetic and comparison with float for 118 backward compatibility 119 """ 120 121 def __add__(self, other, context=None): 122 # type: (_Decimal, Any) -> EDecimal 123 return EDecimal(Decimal.__add__(self, Decimal(other))) 124 125 def __radd__(self, other): 126 # type: (_Decimal) -> EDecimal 127 return EDecimal(Decimal.__add__(self, Decimal(other))) 128 129 def __sub__(self, other): 130 # type: (_Decimal) -> EDecimal 131 return EDecimal(Decimal.__sub__(self, Decimal(other))) 132 133 def __rsub__(self, other): 134 # type: (_Decimal) -> EDecimal 135 return EDecimal(Decimal.__rsub__(self, Decimal(other))) 136 137 def __mul__(self, other): 138 # type: (_Decimal) -> EDecimal 139 return EDecimal(Decimal.__mul__(self, Decimal(other))) 140 141 def __rmul__(self, other): 142 # type: (_Decimal) -> EDecimal 143 return EDecimal(Decimal.__mul__(self, Decimal(other))) 144 145 def __truediv__(self, other): 146 # type: (_Decimal) -> EDecimal 147 return EDecimal(Decimal.__truediv__(self, Decimal(other))) 148 149 def __floordiv__(self, other): 150 # type: (_Decimal) -> EDecimal 151 return EDecimal(Decimal.__floordiv__(self, Decimal(other))) 152 153 def __divmod__(self, other): 154 # type: (_Decimal) -> Tuple[EDecimal, EDecimal] 155 r = Decimal.__divmod__(self, Decimal(other)) 156 return EDecimal(r[0]), EDecimal(r[1]) 157 158 def __mod__(self, other): 159 # type: (_Decimal) -> EDecimal 160 return EDecimal(Decimal.__mod__(self, Decimal(other))) 161 162 def __rmod__(self, other): 163 # type: (_Decimal) -> EDecimal 164 return EDecimal(Decimal.__rmod__(self, Decimal(other))) 165 166 def __pow__(self, other, modulo=None): 167 # type: (_Decimal, Optional[_Decimal]) -> EDecimal 168 return EDecimal(Decimal.__pow__(self, Decimal(other), modulo)) 169 170 def __eq__(self, other): 171 # type: (Any) -> bool 172 if isinstance(other, Decimal): 173 return super(EDecimal, self).__eq__(other) 174 else: 175 return bool(float(self) == other) 176 177 def normalize(self, precision): # type: ignore 178 # type: (int) -> EDecimal 179 with decimal.localcontext() as ctx: 180 ctx.prec = precision 181 return EDecimal(super(EDecimal, self).normalize(ctx)) 182 183 184@overload 185def get_temp_file(keep, autoext, fd): 186 # type: (bool, str, Literal[True]) -> IO[bytes] 187 pass 188 189 190@overload 191def get_temp_file(keep=False, autoext="", fd=False): 192 # type: (bool, str, Literal[False]) -> str 193 pass 194 195 196def get_temp_file(keep=False, autoext="", fd=False): 197 # type: (bool, str, bool) -> Union[IO[bytes], str] 198 """Creates a temporary file. 199 200 :param keep: If False, automatically delete the file when Scapy exits. 201 :param autoext: Suffix to add to the generated file name. 202 :param fd: If True, this returns a file-like object with the temporary 203 file opened. If False (default), this returns a file path. 204 """ 205 f = tempfile.NamedTemporaryFile(prefix="scapy", suffix=autoext, 206 delete=False) 207 if not keep: 208 conf.temp_files.append(f.name) 209 210 if fd: 211 return f 212 else: 213 # Close the file so something else can take it. 214 f.close() 215 return f.name 216 217 218def get_temp_dir(keep=False): 219 # type: (bool) -> str 220 """Creates a temporary file, and returns its name. 221 222 :param keep: If False (default), the directory will be recursively 223 deleted when Scapy exits. 224 :return: A full path to a temporary directory. 225 """ 226 227 dname = tempfile.mkdtemp(prefix="scapy") 228 229 if not keep: 230 conf.temp_files.append(dname) 231 232 return dname 233 234 235def _create_fifo() -> Tuple[str, Any]: 236 """Creates a temporary fifo. 237 238 You must then use open_fifo() on the server_fd once 239 the client is connected to use it. 240 241 :returns: (client_file, server_fd) 242 """ 243 if WINDOWS: 244 from scapy.arch.windows.structures import _get_win_fifo 245 return _get_win_fifo() 246 else: 247 f = get_temp_file() 248 os.unlink(f) 249 os.mkfifo(f) 250 return f, f 251 252 253def _open_fifo(fd: Any, mode: str = "rb") -> IO[bytes]: 254 """Open the server_fd (see create_fifo) 255 """ 256 if WINDOWS: 257 from scapy.arch.windows.structures import _win_fifo_open 258 return _win_fifo_open(fd) 259 else: 260 return open(fd, mode) 261 262 263def sane(x, color=False): 264 # type: (AnyStr, bool) -> str 265 r = "" 266 for i in x: 267 j = orb(i) 268 if (j < 32) or (j >= 127): 269 if color: 270 r += conf.color_theme.not_printable(".") 271 else: 272 r += "." 273 else: 274 r += chr(j) 275 return r 276 277 278@conf.commands.register 279def restart(): 280 # type: () -> None 281 """Restarts scapy""" 282 if not conf.interactive or not os.path.isfile(sys.argv[0]): 283 raise OSError("Scapy was not started from console") 284 if WINDOWS: 285 res_code = 1 286 try: 287 res_code = subprocess.call([sys.executable] + sys.argv) 288 finally: 289 os._exit(res_code) 290 os.execv(sys.executable, [sys.executable] + sys.argv) 291 292 293def lhex(x): 294 # type: (Any) -> str 295 from scapy.volatile import VolatileValue 296 if isinstance(x, VolatileValue): 297 return repr(x) 298 if isinstance(x, int): 299 return hex(x) 300 if isinstance(x, tuple): 301 return "(%s)" % ", ".join(lhex(v) for v in x) 302 if isinstance(x, list): 303 return "[%s]" % ", ".join(lhex(v) for v in x) 304 return str(x) 305 306 307@conf.commands.register 308def hexdump(p, dump=False): 309 # type: (Union[Packet, AnyStr], bool) -> Optional[str] 310 """Build a tcpdump like hexadecimal view 311 312 :param p: a Packet 313 :param dump: define if the result must be printed or returned in a variable 314 :return: a String only when dump=True 315 """ 316 s = "" 317 x = bytes_encode(p) 318 x_len = len(x) 319 i = 0 320 while i < x_len: 321 s += "%04x " % i 322 for j in range(16): 323 if i + j < x_len: 324 s += "%02X " % orb(x[i + j]) 325 else: 326 s += " " 327 s += " %s\n" % sane(x[i:i + 16], color=True) 328 i += 16 329 # remove trailing \n 330 s = s[:-1] if s.endswith("\n") else s 331 if dump: 332 return s 333 else: 334 print(s) 335 return None 336 337 338@conf.commands.register 339def linehexdump(p, onlyasc=0, onlyhex=0, dump=False): 340 # type: (Union[Packet, AnyStr], int, int, bool) -> Optional[str] 341 """Build an equivalent view of hexdump() on a single line 342 343 Note that setting both onlyasc and onlyhex to 1 results in a empty output 344 345 :param p: a Packet 346 :param onlyasc: 1 to display only the ascii view 347 :param onlyhex: 1 to display only the hexadecimal view 348 :param dump: print the view if False 349 :return: a String only when dump=True 350 """ 351 s = "" 352 s = hexstr(p, onlyasc=onlyasc, onlyhex=onlyhex, color=not dump) 353 if dump: 354 return s 355 else: 356 print(s) 357 return None 358 359 360@conf.commands.register 361def chexdump(p, dump=False): 362 # type: (Union[Packet, AnyStr], bool) -> Optional[str] 363 """Build a per byte hexadecimal representation 364 365 Example: 366 >>> chexdump(IP()) 367 0x45, 0x00, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x40, 0x00, 0x7c, 0xe7, 0x7f, 0x00, 0x00, 0x01, 0x7f, 0x00, 0x00, 0x01 # noqa: E501 368 369 :param p: a Packet 370 :param dump: print the view if False 371 :return: a String only if dump=True 372 """ 373 x = bytes_encode(p) 374 s = ", ".join("%#04x" % orb(x) for x in x) 375 if dump: 376 return s 377 else: 378 print(s) 379 return None 380 381 382@conf.commands.register 383def hexstr(p, onlyasc=0, onlyhex=0, color=False): 384 # type: (Union[Packet, AnyStr], int, int, bool) -> str 385 """Build a fancy tcpdump like hex from bytes.""" 386 x = bytes_encode(p) 387 s = [] 388 if not onlyasc: 389 s.append(" ".join("%02X" % orb(b) for b in x)) 390 if not onlyhex: 391 s.append(sane(x, color=color)) 392 return " ".join(s) 393 394 395def repr_hex(s): 396 # type: (bytes) -> str 397 """ Convert provided bitstring to a simple string of hex digits """ 398 return "".join("%02x" % orb(x) for x in s) 399 400 401@conf.commands.register 402def hexdiff( 403 a: Union['Packet', AnyStr], 404 b: Union['Packet', AnyStr], 405 algo: Optional[str] = None, 406 autojunk: bool = False, 407) -> None: 408 """ 409 Show differences between 2 binary strings, Packets... 410 411 Available algorithms: 412 - wagnerfischer: Use the Wagner and Fischer algorithm to compute the 413 Levenstein distance between the strings then backtrack. 414 - difflib: Use the difflib.SequenceMatcher implementation. This based on a 415 modified version of the Ratcliff and Obershelp algorithm. 416 This is much faster, but far less accurate. 417 https://docs.python.org/3.8/library/difflib.html#difflib.SequenceMatcher 418 419 :param a: 420 :param b: The binary strings, packets... to compare 421 :param algo: Force the algo to be 'wagnerfischer' or 'difflib'. 422 By default, this is chosen depending on the complexity, optimistically 423 preferring wagnerfischer unless really necessary. 424 :param autojunk: (difflib only) See difflib documentation. 425 """ 426 xb = bytes_encode(a) 427 yb = bytes_encode(b) 428 429 if algo is None: 430 # Choose the best algorithm 431 complexity = len(xb) * len(yb) 432 if complexity < 1e7: 433 # Comparing two (non-jumbos) Ethernet packets is ~2e6 which is manageable. 434 # Anything much larger than this shouldn't be attempted by default. 435 algo = "wagnerfischer" 436 if complexity > 1e6: 437 log_interactive.info( 438 "Complexity is a bit high. hexdiff will take a few seconds." 439 ) 440 else: 441 algo = "difflib" 442 443 backtrackx = [] 444 backtracky = [] 445 446 if algo == "wagnerfischer": 447 xb = xb[::-1] 448 yb = yb[::-1] 449 450 # costs for the 3 operations 451 INSERT = 1 452 DELETE = 1 453 SUBST = 1 454 455 # Typically, d[i,j] will hold the distance between 456 # the first i characters of xb and the first j characters of yb. 457 # We change the Wagner Fischer to also store pointers to all 458 # the intermediate steps taken while calculating the Levenstein distance. 459 d = {(-1, -1): (0, (-1, -1))} 460 for j in range(len(yb)): 461 d[-1, j] = (j + 1) * INSERT, (-1, j - 1) 462 for i in range(len(xb)): 463 d[i, -1] = (i + 1) * INSERT + 1, (i - 1, -1) 464 465 # Compute the Levenstein distance between the two strings, but 466 # store all the steps to be able to backtrack at the end. 467 for j in range(len(yb)): 468 for i in range(len(xb)): 469 d[i, j] = min( 470 (d[i - 1, j - 1][0] + SUBST * (xb[i] != yb[j]), (i - 1, j - 1)), 471 (d[i - 1, j][0] + DELETE, (i - 1, j)), 472 (d[i, j - 1][0] + INSERT, (i, j - 1)), 473 ) 474 475 # Iterate through the steps backwards to create the diff 476 i = len(xb) - 1 477 j = len(yb) - 1 478 while not (i == j == -1): 479 i2, j2 = d[i, j][1] 480 backtrackx.append(xb[i2 + 1:i + 1]) 481 backtracky.append(yb[j2 + 1:j + 1]) 482 i, j = i2, j2 483 elif algo == "difflib": 484 sm = difflib.SequenceMatcher(a=xb, b=yb, autojunk=autojunk) 485 xarr = [xb[i:i + 1] for i in range(len(xb))] 486 yarr = [yb[i:i + 1] for i in range(len(yb))] 487 # Iterate through opcodes to build the backtrack 488 for opcode in sm.get_opcodes(): 489 typ, x0, x1, y0, y1 = opcode 490 if typ == 'delete': 491 backtrackx += xarr[x0:x1] 492 backtracky += [b''] * (x1 - x0) 493 elif typ == 'insert': 494 backtrackx += [b''] * (y1 - y0) 495 backtracky += yarr[y0:y1] 496 elif typ in ['equal', 'replace']: 497 backtrackx += xarr[x0:x1] 498 backtracky += yarr[y0:y1] 499 # Some lines may have been considered as junk. Check the sizes 500 if autojunk: 501 lbx = len(backtrackx) 502 lby = len(backtracky) 503 backtrackx += [b''] * (max(lbx, lby) - lbx) 504 backtracky += [b''] * (max(lbx, lby) - lby) 505 else: 506 raise ValueError("Unknown algorithm '%s'" % algo) 507 508 # Print the diff 509 510 x = y = i = 0 511 colorize: Dict[int, Callable[[str], str]] = { 512 0: lambda x: x, 513 -1: conf.color_theme.left, 514 1: conf.color_theme.right 515 } 516 517 dox = 1 518 doy = 0 519 btx_len = len(backtrackx) 520 while i < btx_len: 521 linex = backtrackx[i:i + 16] 522 liney = backtracky[i:i + 16] 523 xx = sum(len(k) for k in linex) 524 yy = sum(len(k) for k in liney) 525 if dox and not xx: 526 dox = 0 527 doy = 1 528 if dox and linex == liney: 529 doy = 1 530 531 if dox: 532 xd = y 533 j = 0 534 while not linex[j]: 535 j += 1 536 xd -= 1 537 print(colorize[doy - dox]("%04x" % xd), end=' ') 538 x += xx 539 line = linex 540 else: 541 print(" ", end=' ') 542 if doy: 543 yd = y 544 j = 0 545 while not liney[j]: 546 j += 1 547 yd -= 1 548 print(colorize[doy - dox]("%04x" % yd), end=' ') 549 y += yy 550 line = liney 551 else: 552 print(" ", end=' ') 553 554 print(" ", end=' ') 555 556 cl = "" 557 for j in range(16): 558 if i + j < min(len(backtrackx), len(backtracky)): 559 if line[j]: 560 col = colorize[(linex[j] != liney[j]) * (doy - dox)] 561 print(col("%02X" % orb(line[j])), end=' ') 562 if linex[j] == liney[j]: 563 cl += sane(line[j], color=True) 564 else: 565 cl += col(sane(line[j])) 566 else: 567 print(" ", end=' ') 568 cl += " " 569 else: 570 print(" ", end=' ') 571 if j == 7: 572 print("", end=' ') 573 574 print(" ", cl) 575 576 if doy or not yy: 577 doy = 0 578 dox = 1 579 i += 16 580 else: 581 if yy: 582 dox = 0 583 doy = 1 584 else: 585 i += 16 586 587 588if struct.pack("H", 1) == b"\x00\x01": # big endian 589 checksum_endian_transform = lambda chk: chk # type: Callable[[int], int] 590else: 591 checksum_endian_transform = lambda chk: ((chk >> 8) & 0xff) | chk << 8 592 593 594def checksum(pkt): 595 # type: (bytes) -> int 596 if len(pkt) % 2 == 1: 597 pkt += b"\0" 598 s = sum(array.array("H", pkt)) 599 s = (s >> 16) + (s & 0xffff) 600 s += s >> 16 601 s = ~s 602 return checksum_endian_transform(s) & 0xffff 603 604 605def _fletcher16(charbuf): 606 # type: (bytes) -> Tuple[int, int] 607 # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/> # noqa: E501 608 c0 = c1 = 0 609 for char in charbuf: 610 c0 += char 611 c1 += c0 612 613 c0 %= 255 614 c1 %= 255 615 return (c0, c1) 616 617 618@conf.commands.register 619def fletcher16_checksum(binbuf): 620 # type: (bytes) -> int 621 """Calculates Fletcher-16 checksum of the given buffer. 622 623 Note: 624 If the buffer contains the two checkbytes derived from the Fletcher-16 checksum # noqa: E501 625 the result of this function has to be 0. Otherwise the buffer has been corrupted. # noqa: E501 626 """ 627 (c0, c1) = _fletcher16(binbuf) 628 return (c1 << 8) | c0 629 630 631@conf.commands.register 632def fletcher16_checkbytes(binbuf, offset): 633 # type: (bytes, int) -> bytes 634 """Calculates the Fletcher-16 checkbytes returned as 2 byte binary-string. 635 636 Including the bytes into the buffer (at the position marked by offset) the # noqa: E501 637 global Fletcher-16 checksum of the buffer will be 0. Thus it is easy to verify # noqa: E501 638 the integrity of the buffer on the receiver side. 639 640 For details on the algorithm, see RFC 2328 chapter 12.1.7 and RFC 905 Annex B. # noqa: E501 641 """ 642 643 # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/> # noqa: E501 644 if len(binbuf) < offset: 645 raise Exception("Packet too short for checkbytes %d" % len(binbuf)) 646 647 binbuf = binbuf[:offset] + b"\x00\x00" + binbuf[offset + 2:] 648 (c0, c1) = _fletcher16(binbuf) 649 650 x = ((len(binbuf) - offset - 1) * c0 - c1) % 255 651 652 if (x <= 0): 653 x += 255 654 655 y = 510 - c0 - x 656 657 if (y > 255): 658 y -= 255 659 return chb(x) + chb(y) 660 661 662def mac2str(mac): 663 # type: (str) -> bytes 664 return b"".join(chb(int(x, 16)) for x in plain_str(mac).split(':')) 665 666 667def valid_mac(mac): 668 # type: (str) -> bool 669 try: 670 return len(mac2str(mac)) == 6 671 except ValueError: 672 pass 673 return False 674 675 676def str2mac(s): 677 # type: (bytes) -> str 678 if isinstance(s, str): 679 return ("%02x:" * len(s))[:-1] % tuple(map(ord, s)) 680 return ("%02x:" * len(s))[:-1] % tuple(s) 681 682 683def randstring(length): 684 # type: (int) -> bytes 685 """ 686 Returns a random string of length (length >= 0) 687 """ 688 return b"".join(struct.pack('B', random.randint(0, 255)) 689 for _ in range(length)) 690 691 692def zerofree_randstring(length): 693 # type: (int) -> bytes 694 """ 695 Returns a random string of length (length >= 0) without zero in it. 696 """ 697 return b"".join(struct.pack('B', random.randint(1, 255)) 698 for _ in range(length)) 699 700 701def stror(s1, s2): 702 # type: (bytes, bytes) -> bytes 703 """ 704 Returns the binary OR of the 2 provided strings s1 and s2. s1 and s2 705 must be of same length. 706 """ 707 return b"".join(map(lambda x, y: struct.pack("!B", x | y), s1, s2)) 708 709 710def strxor(s1, s2): 711 # type: (bytes, bytes) -> bytes 712 """ 713 Returns the binary XOR of the 2 provided strings s1 and s2. s1 and s2 714 must be of same length. 715 """ 716 return b"".join(map(lambda x, y: struct.pack("!B", x ^ y), s1, s2)) 717 718 719def strand(s1, s2): 720 # type: (bytes, bytes) -> bytes 721 """ 722 Returns the binary AND of the 2 provided strings s1 and s2. s1 and s2 723 must be of same length. 724 """ 725 return b"".join(map(lambda x, y: struct.pack("!B", x & y), s1, s2)) 726 727 728def strrot(s1, count, right=True): 729 # type: (bytes, int, bool) -> bytes 730 """ 731 Rotate the binary by 'count' bytes 732 """ 733 off = count % len(s1) 734 if right: 735 return s1[-off:] + s1[:-off] 736 else: 737 return s1[off:] + s1[:off] 738 739 740# Workaround bug 643005 : https://sourceforge.net/tracker/?func=detail&atid=105470&aid=643005&group_id=5470 # noqa: E501 741try: 742 socket.inet_aton("255.255.255.255") 743except socket.error: 744 def inet_aton(ip_string): 745 # type: (str) -> bytes 746 if ip_string == "255.255.255.255": 747 return b"\xff" * 4 748 else: 749 return socket.inet_aton(ip_string) 750else: 751 inet_aton = socket.inet_aton # type: ignore 752 753inet_ntoa = socket.inet_ntoa 754 755 756def atol(x): 757 # type: (str) -> int 758 try: 759 ip = inet_aton(x) 760 except socket.error: 761 raise ValueError("Bad IP format: %s" % x) 762 return cast(int, struct.unpack("!I", ip)[0]) 763 764 765def valid_ip(addr): 766 # type: (str) -> bool 767 try: 768 addr = plain_str(addr) 769 except UnicodeDecodeError: 770 return False 771 try: 772 atol(addr) 773 except (OSError, ValueError, socket.error): 774 return False 775 return True 776 777 778def valid_net(addr): 779 # type: (str) -> bool 780 try: 781 addr = plain_str(addr) 782 except UnicodeDecodeError: 783 return False 784 if '/' in addr: 785 ip, mask = addr.split('/', 1) 786 return valid_ip(ip) and mask.isdigit() and 0 <= int(mask) <= 32 787 return valid_ip(addr) 788 789 790def valid_ip6(addr): 791 # type: (str) -> bool 792 try: 793 addr = plain_str(addr) 794 except UnicodeDecodeError: 795 return False 796 try: 797 inet_pton(socket.AF_INET6, addr) 798 except socket.error: 799 return False 800 return True 801 802 803def valid_net6(addr): 804 # type: (str) -> bool 805 try: 806 addr = plain_str(addr) 807 except UnicodeDecodeError: 808 return False 809 if '/' in addr: 810 ip, mask = addr.split('/', 1) 811 return valid_ip6(ip) and mask.isdigit() and 0 <= int(mask) <= 128 812 return valid_ip6(addr) 813 814 815def ltoa(x): 816 # type: (int) -> str 817 return inet_ntoa(struct.pack("!I", x & 0xffffffff)) 818 819 820def itom(x): 821 # type: (int) -> int 822 return (0xffffffff00000000 >> x) & 0xffffffff 823 824 825def in4_cidr2mask(m): 826 # type: (int) -> bytes 827 """ 828 Return the mask (bitstring) associated with provided length 829 value. For instance if function is called on 20, return value is 830 b'\xff\xff\xf0\x00'. 831 """ 832 if m > 32 or m < 0: 833 raise Scapy_Exception("value provided to in4_cidr2mask outside [0, 32] domain (%d)" % m) # noqa: E501 834 835 return strxor( 836 b"\xff" * 4, 837 struct.pack(">I", 2**(32 - m) - 1) 838 ) 839 840 841def in4_isincluded(addr, prefix, mask): 842 # type: (str, str, int) -> bool 843 """ 844 Returns True when 'addr' belongs to prefix/mask. False otherwise. 845 """ 846 temp = inet_pton(socket.AF_INET, addr) 847 pref = in4_cidr2mask(mask) 848 zero = inet_pton(socket.AF_INET, prefix) 849 return zero == strand(temp, pref) 850 851 852def in4_ismaddr(str): 853 # type: (str) -> bool 854 """ 855 Returns True if provided address in printable format belongs to 856 allocated Multicast address space (224.0.0.0/4). 857 """ 858 return in4_isincluded(str, "224.0.0.0", 4) 859 860 861def in4_ismlladdr(str): 862 # type: (str) -> bool 863 """ 864 Returns True if address belongs to link-local multicast address 865 space (224.0.0.0/24) 866 """ 867 return in4_isincluded(str, "224.0.0.0", 24) 868 869 870def in4_ismgladdr(str): 871 # type: (str) -> bool 872 """ 873 Returns True if address belongs to global multicast address 874 space (224.0.1.0-238.255.255.255). 875 """ 876 return ( 877 in4_isincluded(str, "224.0.0.0", 4) and 878 not in4_isincluded(str, "224.0.0.0", 24) and 879 not in4_isincluded(str, "239.0.0.0", 8) 880 ) 881 882 883def in4_ismlsaddr(str): 884 # type: (str) -> bool 885 """ 886 Returns True if address belongs to limited scope multicast address 887 space (239.0.0.0/8). 888 """ 889 return in4_isincluded(str, "239.0.0.0", 8) 890 891 892def in4_isaddrllallnodes(str): 893 # type: (str) -> bool 894 """ 895 Returns True if address is the link-local all-nodes multicast 896 address (224.0.0.1). 897 """ 898 return (inet_pton(socket.AF_INET, "224.0.0.1") == 899 inet_pton(socket.AF_INET, str)) 900 901 902def in4_getnsmac(a): 903 # type: (bytes) -> str 904 """ 905 Return the multicast mac address associated with provided 906 IPv4 address. Passed address must be in network format. 907 """ 908 909 return "01:00:5e:%.2x:%.2x:%.2x" % (a[1] & 0x7f, a[2], a[3]) 910 911 912def decode_locale_str(x): 913 # type: (bytes) -> str 914 """ 915 Decode bytes into a string using the system locale. 916 Useful on Windows where it can be unusual (e.g. cp1252) 917 """ 918 return x.decode(encoding=locale.getlocale()[1] or "utf-8", errors="replace") 919 920 921class ContextManagerSubprocess(object): 922 """ 923 Context manager that eases checking for unknown command, without 924 crashing. 925 926 Example: 927 >>> with ContextManagerSubprocess("tcpdump"): 928 >>> subprocess.Popen(["tcpdump", "--version"]) 929 ERROR: Could not execute tcpdump, is it installed? 930 931 """ 932 933 def __init__(self, prog, suppress=True): 934 # type: (str, bool) -> None 935 self.prog = prog 936 self.suppress = suppress 937 938 def __enter__(self): 939 # type: () -> None 940 pass 941 942 def __exit__(self, 943 exc_type, # type: Optional[type] 944 exc_value, # type: Optional[Exception] 945 traceback, # type: Optional[Any] 946 ): 947 # type: (...) -> Optional[bool] 948 if exc_value is None or exc_type is None: 949 return None 950 # Errored 951 if isinstance(exc_value, EnvironmentError): 952 msg = "Could not execute %s, is it installed?" % self.prog 953 else: 954 msg = "%s: execution failed (%s)" % ( 955 self.prog, 956 exc_type.__class__.__name__ 957 ) 958 if not self.suppress: 959 raise exc_type(msg) 960 log_runtime.error(msg, exc_info=True) 961 return True # Suppress the exception 962 963 964class ContextManagerCaptureOutput(object): 965 """ 966 Context manager that intercept the console's output. 967 968 Example: 969 >>> with ContextManagerCaptureOutput() as cmco: 970 ... print("hey") 971 ... assert cmco.get_output() == "hey" 972 """ 973 974 def __init__(self): 975 # type: () -> None 976 self.result_export_object = "" 977 978 def __enter__(self): 979 # type: () -> ContextManagerCaptureOutput 980 from unittest import mock 981 982 def write(s, decorator=self): 983 # type: (str, ContextManagerCaptureOutput) -> None 984 decorator.result_export_object += s 985 mock_stdout = mock.Mock() 986 mock_stdout.write = write 987 self.bck_stdout = sys.stdout 988 sys.stdout = mock_stdout 989 return self 990 991 def __exit__(self, *exc): 992 # type: (*Any) -> Literal[False] 993 sys.stdout = self.bck_stdout 994 return False 995 996 def get_output(self, eval_bytes=False): 997 # type: (bool) -> str 998 if self.result_export_object.startswith("b'") and eval_bytes: 999 return plain_str(eval(self.result_export_object)) 1000 return self.result_export_object 1001 1002 1003def do_graph( 1004 graph, # type: str 1005 prog=None, # type: Optional[str] 1006 format=None, # type: Optional[str] 1007 target=None, # type: Optional[Union[IO[bytes], str]] 1008 type=None, # type: Optional[str] 1009 string=None, # type: Optional[bool] 1010 options=None # type: Optional[List[str]] 1011): 1012 # type: (...) -> Optional[str] 1013 """Processes graph description using an external software. 1014 This method is used to convert a graphviz format to an image. 1015 1016 :param graph: GraphViz graph description 1017 :param prog: which graphviz program to use 1018 :param format: output type (svg, ps, gif, jpg, etc.), passed to dot's "-T" 1019 option 1020 :param string: if not None, simply return the graph string 1021 :param target: filename or redirect. Defaults pipe to Imagemagick's 1022 display program 1023 :param options: options to be passed to prog 1024 """ 1025 1026 if format is None: 1027 format = "svg" 1028 if string: 1029 return graph 1030 if type is not None: 1031 warnings.warn( 1032 "type is deprecated, and was renamed format", 1033 DeprecationWarning 1034 ) 1035 format = type 1036 if prog is None: 1037 prog = conf.prog.dot 1038 start_viewer = False 1039 if target is None: 1040 if WINDOWS: 1041 target = get_temp_file(autoext="." + format) 1042 start_viewer = True 1043 else: 1044 with ContextManagerSubprocess(conf.prog.display): 1045 target = subprocess.Popen([conf.prog.display], 1046 stdin=subprocess.PIPE).stdin 1047 if format is not None: 1048 format = "-T%s" % format 1049 if isinstance(target, str): 1050 if target.startswith('|'): 1051 target = subprocess.Popen(target[1:].lstrip(), shell=True, 1052 stdin=subprocess.PIPE).stdin 1053 elif target.startswith('>'): 1054 target = open(target[1:].lstrip(), "wb") 1055 else: 1056 target = open(os.path.abspath(target), "wb") 1057 target = cast(IO[bytes], target) 1058 proc = subprocess.Popen( 1059 "\"%s\" %s %s" % (prog, options or "", format or ""), 1060 shell=True, stdin=subprocess.PIPE, stdout=target, 1061 stderr=subprocess.PIPE 1062 ) 1063 _, stderr = proc.communicate(bytes_encode(graph)) 1064 if proc.returncode != 0: 1065 raise OSError( 1066 "GraphViz call failed (is it installed?):\n" + 1067 plain_str(stderr) 1068 ) 1069 try: 1070 target.close() 1071 except Exception: 1072 pass 1073 if start_viewer: 1074 # Workaround for file not found error: We wait until tempfile is written. # noqa: E501 1075 waiting_start = time.time() 1076 while not os.path.exists(target.name): 1077 time.sleep(0.1) 1078 if time.time() - waiting_start > 3: 1079 warning("Temporary file '%s' could not be written. Graphic will not be displayed.", tempfile) # noqa: E501 1080 break 1081 else: 1082 if WINDOWS and conf.prog.display == conf.prog._default: 1083 os.startfile(target.name) 1084 else: 1085 with ContextManagerSubprocess(conf.prog.display): 1086 subprocess.Popen([conf.prog.display, target.name]) 1087 return None 1088 1089 1090_TEX_TR = { 1091 "{": "{\\tt\\char123}", 1092 "}": "{\\tt\\char125}", 1093 "\\": "{\\tt\\char92}", 1094 "^": "\\^{}", 1095 "$": "\\$", 1096 "#": "\\#", 1097 "_": "\\_", 1098 "&": "\\&", 1099 "%": "\\%", 1100 "|": "{\\tt\\char124}", 1101 "~": "{\\tt\\char126}", 1102 "<": "{\\tt\\char60}", 1103 ">": "{\\tt\\char62}", 1104} 1105 1106 1107def tex_escape(x): 1108 # type: (str) -> str 1109 s = "" 1110 for c in x: 1111 s += _TEX_TR.get(c, c) 1112 return s 1113 1114 1115def colgen(*lstcol, # type: Any 1116 **kargs # type: Any 1117 ): 1118 # type: (...) -> Iterator[Any] 1119 """Returns a generator that mixes provided quantities forever 1120 trans: a function to convert the three arguments into a color. lambda x,y,z:(x,y,z) by default""" # noqa: E501 1121 if len(lstcol) < 2: 1122 lstcol *= 2 1123 trans = kargs.get("trans", lambda x, y, z: (x, y, z)) 1124 while True: 1125 for i in range(len(lstcol)): 1126 for j in range(len(lstcol)): 1127 for k in range(len(lstcol)): 1128 if i != j or j != k or k != i: 1129 yield trans(lstcol[(i + j) % len(lstcol)], lstcol[(j + k) % len(lstcol)], lstcol[(k + i) % len(lstcol)]) # noqa: E501 1130 1131 1132def incremental_label(label="tag%05i", start=0): 1133 # type: (str, int) -> Iterator[str] 1134 while True: 1135 yield label % start 1136 start += 1 1137 1138 1139def binrepr(val): 1140 # type: (int) -> str 1141 return bin(val)[2:] 1142 1143 1144def long_converter(s): 1145 # type: (str) -> int 1146 return int(s.replace('\n', '').replace(' ', ''), 16) 1147 1148######################### 1149# Enum management # 1150######################### 1151 1152 1153class EnumElement: 1154 def __init__(self, key, value): 1155 # type: (str, int) -> None 1156 self._key = key 1157 self._value = value 1158 1159 def __repr__(self): 1160 # type: () -> str 1161 return "<%s %s[%r]>" % (self.__dict__.get("_name", self.__class__.__name__), self._key, self._value) # noqa: E501 1162 1163 def __getattr__(self, attr): 1164 # type: (str) -> Any 1165 return getattr(self._value, attr) 1166 1167 def __str__(self): 1168 # type: () -> str 1169 return self._key 1170 1171 def __bytes__(self): 1172 # type: () -> bytes 1173 return bytes_encode(self.__str__()) 1174 1175 def __hash__(self): 1176 # type: () -> int 1177 return self._value 1178 1179 def __int__(self): 1180 # type: () -> int 1181 return int(self._value) 1182 1183 def __eq__(self, other): 1184 # type: (Any) -> bool 1185 return self._value == int(other) 1186 1187 def __neq__(self, other): 1188 # type: (Any) -> bool 1189 return not self.__eq__(other) 1190 1191 1192class Enum_metaclass(type): 1193 element_class = EnumElement 1194 1195 def __new__(cls, name, bases, dct): 1196 # type: (Any, str, Any, Dict[str, Any]) -> Any 1197 rdict = {} 1198 for k, v in dct.items(): 1199 if isinstance(v, int): 1200 v = cls.element_class(k, v) 1201 dct[k] = v 1202 rdict[v] = k 1203 dct["__rdict__"] = rdict 1204 return super(Enum_metaclass, cls).__new__(cls, name, bases, dct) 1205 1206 def __getitem__(self, attr): 1207 # type: (int) -> Any 1208 return self.__rdict__[attr] # type: ignore 1209 1210 def __contains__(self, val): 1211 # type: (int) -> bool 1212 return val in self.__rdict__ # type: ignore 1213 1214 def get(self, attr, val=None): 1215 # type: (str, Optional[Any]) -> Any 1216 return self.__rdict__.get(attr, val) # type: ignore 1217 1218 def __repr__(self): 1219 # type: () -> str 1220 return "<%s>" % self.__dict__.get("name", self.__name__) 1221 1222 1223################### 1224# Object saving # 1225################### 1226 1227 1228def export_object(obj): 1229 # type: (Any) -> None 1230 import zlib 1231 print(base64.b64encode(zlib.compress(pickle.dumps(obj, 2), 9)).decode()) 1232 1233 1234def import_object(obj=None): 1235 # type: (Optional[str]) -> Any 1236 import zlib 1237 if obj is None: 1238 obj = sys.stdin.read() 1239 return pickle.loads(zlib.decompress(base64.b64decode(obj.strip()))) 1240 1241 1242def save_object(fname, obj): 1243 # type: (str, Any) -> None 1244 """Pickle a Python object""" 1245 1246 fd = gzip.open(fname, "wb") 1247 pickle.dump(obj, fd) 1248 fd.close() 1249 1250 1251def load_object(fname): 1252 # type: (str) -> Any 1253 """unpickle a Python object""" 1254 return pickle.load(gzip.open(fname, "rb")) 1255 1256 1257@conf.commands.register 1258def corrupt_bytes(data, p=0.01, n=None): 1259 # type: (str, float, Optional[int]) -> bytes 1260 """ 1261 Corrupt a given percentage (at least one byte) or number of bytes 1262 from a string 1263 """ 1264 s = array.array("B", bytes_encode(data)) 1265 s_len = len(s) 1266 if n is None: 1267 n = max(1, int(s_len * p)) 1268 for i in random.sample(range(s_len), n): 1269 s[i] = (s[i] + random.randint(1, 255)) % 256 1270 return s.tobytes() 1271 1272 1273@conf.commands.register 1274def corrupt_bits(data, p=0.01, n=None): 1275 # type: (str, float, Optional[int]) -> bytes 1276 """ 1277 Flip a given percentage (at least one bit) or number of bits 1278 from a string 1279 """ 1280 s = array.array("B", bytes_encode(data)) 1281 s_len = len(s) * 8 1282 if n is None: 1283 n = max(1, int(s_len * p)) 1284 for i in random.sample(range(s_len), n): 1285 s[i // 8] ^= 1 << (i % 8) 1286 return s.tobytes() 1287 1288 1289############################# 1290# pcap capture file stuff # 1291############################# 1292 1293@conf.commands.register 1294def wrpcap(filename, # type: Union[IO[bytes], str] 1295 pkt, # type: _PacketIterable 1296 *args, # type: Any 1297 **kargs # type: Any 1298 ): 1299 # type: (...) -> None 1300 """Write a list of packets to a pcap file 1301 1302 :param filename: the name of the file to write packets to, or an open, 1303 writable file-like object. The file descriptor will be 1304 closed at the end of the call, so do not use an object you 1305 do not want to close (e.g., running wrpcap(sys.stdout, []) 1306 in interactive mode will crash Scapy). 1307 :param gz: set to 1 to save a gzipped capture 1308 :param linktype: force linktype value 1309 :param endianness: "<" or ">", force endianness 1310 :param sync: do not bufferize writes to the capture file 1311 """ 1312 with PcapWriter(filename, *args, **kargs) as fdesc: 1313 fdesc.write(pkt) 1314 1315 1316@conf.commands.register 1317def wrpcapng(filename, # type: str 1318 pkt, # type: _PacketIterable 1319 ): 1320 # type: (...) -> None 1321 """Write a list of packets to a pcapng file 1322 1323 :param filename: the name of the file to write packets to, or an open, 1324 writable file-like object. The file descriptor will be 1325 closed at the end of the call, so do not use an object you 1326 do not want to close (e.g., running wrpcapng(sys.stdout, []) 1327 in interactive mode will crash Scapy). 1328 :param pkt: packets to write 1329 """ 1330 with PcapNgWriter(filename) as fdesc: 1331 fdesc.write(pkt) 1332 1333 1334@conf.commands.register 1335def rdpcap(filename, count=-1): 1336 # type: (Union[IO[bytes], str], int) -> PacketList 1337 """Read a pcap or pcapng file and return a packet list 1338 1339 :param count: read only <count> packets 1340 """ 1341 # Rant: Our complicated use of metaclasses and especially the 1342 # __call__ function is, of course, not supported by MyPy. 1343 # One day we should simplify this mess and use a much simpler 1344 # layout that will actually be supported and properly dissected. 1345 with PcapReader(filename) as fdesc: # type: ignore 1346 return fdesc.read_all(count=count) 1347 1348 1349# NOTE: Type hinting 1350# Mypy doesn't understand the following metaclass, and thinks each 1351# constructor (PcapReader...) needs 3 arguments each. To avoid this, 1352# we add a fake (=None) to the last 2 arguments then force the value 1353# to not be None in the signature and pack the whole thing in an ignore. 1354# This allows to not have # type: ignore every time we call those 1355# constructors. 1356 1357class PcapReader_metaclass(type): 1358 """Metaclass for (Raw)Pcap(Ng)Readers""" 1359 1360 def __new__(cls, name, bases, dct): 1361 # type: (Any, str, Any, Dict[str, Any]) -> Any 1362 """The `alternative` class attribute is declared in the PcapNg 1363 variant, and set here to the Pcap variant. 1364 1365 """ 1366 newcls = super(PcapReader_metaclass, cls).__new__( 1367 cls, name, bases, dct 1368 ) 1369 if 'alternative' in dct: 1370 dct['alternative'].alternative = newcls 1371 return newcls 1372 1373 def __call__(cls, filename): 1374 # type: (Union[IO[bytes], str]) -> Any 1375 """Creates a cls instance, use the `alternative` if that 1376 fails. 1377 1378 """ 1379 i = cls.__new__( 1380 cls, 1381 cls.__name__, 1382 cls.__bases__, 1383 cls.__dict__ # type: ignore 1384 ) 1385 filename, fdesc, magic = cls.open(filename) 1386 if not magic: 1387 raise Scapy_Exception( 1388 "No data could be read!" 1389 ) 1390 try: 1391 i.__init__(filename, fdesc, magic) 1392 return i 1393 except (Scapy_Exception, EOFError): 1394 pass 1395 1396 if "alternative" in cls.__dict__: 1397 cls = cls.__dict__["alternative"] 1398 i = cls.__new__( 1399 cls, 1400 cls.__name__, 1401 cls.__bases__, 1402 cls.__dict__ # type: ignore 1403 ) 1404 try: 1405 i.__init__(filename, fdesc, magic) 1406 return i 1407 except (Scapy_Exception, EOFError): 1408 pass 1409 1410 raise Scapy_Exception("Not a supported capture file") 1411 1412 @staticmethod 1413 def open(fname # type: Union[IO[bytes], str] 1414 ): 1415 # type: (...) -> Tuple[str, _ByteStream, bytes] 1416 """Open (if necessary) filename, and read the magic.""" 1417 if isinstance(fname, str): 1418 filename = fname 1419 fdesc = open(filename, "rb") # type: _ByteStream 1420 magic = fdesc.read(2) 1421 if magic == b"\x1f\x8b": 1422 # GZIP header detected. 1423 fdesc.seek(0) 1424 fdesc = gzip.GzipFile(fileobj=fdesc) 1425 magic = fdesc.read(2) 1426 magic += fdesc.read(2) 1427 else: 1428 fdesc = fname 1429 filename = getattr(fdesc, "name", "No name") 1430 magic = fdesc.read(4) 1431 return filename, fdesc, magic 1432 1433 1434class RawPcapReader(metaclass=PcapReader_metaclass): 1435 """A stateful pcap reader. Each packet is returned as a string""" 1436 1437 # TODO: use Generics to properly type the various readers. 1438 # As of right now, RawPcapReader is typed as if it returned packets 1439 # because all of its child do. Fix that 1440 1441 nonblocking_socket = True 1442 PacketMetadata = collections.namedtuple("PacketMetadata", 1443 ["sec", "usec", "wirelen", "caplen"]) # noqa: E501 1444 1445 def __init__(self, filename, fdesc=None, magic=None): # type: ignore 1446 # type: (str, _ByteStream, bytes) -> None 1447 self.filename = filename 1448 self.f = fdesc 1449 if magic == b"\xa1\xb2\xc3\xd4": # big endian 1450 self.endian = ">" 1451 self.nano = False 1452 elif magic == b"\xd4\xc3\xb2\xa1": # little endian 1453 self.endian = "<" 1454 self.nano = False 1455 elif magic == b"\xa1\xb2\x3c\x4d": # big endian, nanosecond-precision 1456 self.endian = ">" 1457 self.nano = True 1458 elif magic == b"\x4d\x3c\xb2\xa1": # little endian, nanosecond-precision # noqa: E501 1459 self.endian = "<" 1460 self.nano = True 1461 else: 1462 raise Scapy_Exception( 1463 "Not a pcap capture file (bad magic: %r)" % magic 1464 ) 1465 hdr = self.f.read(20) 1466 if len(hdr) < 20: 1467 raise Scapy_Exception("Invalid pcap file (too short)") 1468 vermaj, vermin, tz, sig, snaplen, linktype = struct.unpack( 1469 self.endian + "HHIIII", hdr 1470 ) 1471 self.linktype = linktype 1472 self.snaplen = snaplen 1473 1474 def __enter__(self): 1475 # type: () -> RawPcapReader 1476 return self 1477 1478 def __iter__(self): 1479 # type: () -> RawPcapReader 1480 return self 1481 1482 def __next__(self): 1483 # type: () -> Tuple[bytes, RawPcapReader.PacketMetadata] 1484 """ 1485 implement the iterator protocol on a set of packets in a pcap file 1486 """ 1487 try: 1488 return self._read_packet() 1489 except EOFError: 1490 raise StopIteration 1491 1492 def _read_packet(self, size=MTU): 1493 # type: (int) -> Tuple[bytes, RawPcapReader.PacketMetadata] 1494 """return a single packet read from the file as a tuple containing 1495 (pkt_data, pkt_metadata) 1496 1497 raise EOFError when no more packets are available 1498 """ 1499 hdr = self.f.read(16) 1500 if len(hdr) < 16: 1501 raise EOFError 1502 sec, usec, caplen, wirelen = struct.unpack(self.endian + "IIII", hdr) 1503 1504 try: 1505 data = self.f.read(caplen)[:size] 1506 except OverflowError as e: 1507 warning(f"Pcap: {e}") 1508 raise EOFError 1509 1510 return (data, 1511 RawPcapReader.PacketMetadata(sec=sec, usec=usec, 1512 wirelen=wirelen, caplen=caplen)) 1513 1514 def read_packet(self, size=MTU): 1515 # type: (int) -> Packet 1516 raise Exception( 1517 "Cannot call read_packet() in RawPcapReader. Use " 1518 "_read_packet()" 1519 ) 1520 1521 def dispatch(self, 1522 callback # type: Callable[[Tuple[bytes, RawPcapReader.PacketMetadata]], Any] # noqa: E501 1523 ): 1524 # type: (...) -> None 1525 """call the specified callback routine for each packet read 1526 1527 This is just a convenience function for the main loop 1528 that allows for easy launching of packet processing in a 1529 thread. 1530 """ 1531 for p in self: 1532 callback(p) 1533 1534 def _read_all(self, count=-1): 1535 # type: (int) -> List[Packet] 1536 """return a list of all packets in the pcap file 1537 """ 1538 res = [] # type: List[Packet] 1539 while count != 0: 1540 count -= 1 1541 try: 1542 p = self.read_packet() # type: Packet 1543 except EOFError: 1544 break 1545 res.append(p) 1546 return res 1547 1548 def recv(self, size=MTU): 1549 # type: (int) -> bytes 1550 """ Emulate a socket 1551 """ 1552 return self._read_packet(size=size)[0] 1553 1554 def fileno(self): 1555 # type: () -> int 1556 return -1 if WINDOWS else self.f.fileno() 1557 1558 def close(self): 1559 # type: () -> None 1560 if isinstance(self.f, gzip.GzipFile): 1561 self.f.fileobj.close() # type: ignore 1562 self.f.close() 1563 1564 def __exit__(self, exc_type, exc_value, tracback): 1565 # type: (Optional[Any], Optional[Any], Optional[Any]) -> None 1566 self.close() 1567 1568 # emulate SuperSocket 1569 @staticmethod 1570 def select(sockets, # type: List[SuperSocket] 1571 remain=None, # type: Optional[float] 1572 ): 1573 # type: (...) -> List[SuperSocket] 1574 return sockets 1575 1576 1577class PcapReader(RawPcapReader): 1578 def __init__(self, filename, fdesc=None, magic=None): # type: ignore 1579 # type: (str, IO[bytes], bytes) -> None 1580 RawPcapReader.__init__(self, filename, fdesc, magic) 1581 try: 1582 self.LLcls = conf.l2types.num2layer[ 1583 self.linktype 1584 ] # type: Type[Packet] 1585 except KeyError: 1586 warning("PcapReader: unknown LL type [%i]/[%#x]. Using Raw packets" % (self.linktype, self.linktype)) # noqa: E501 1587 if conf.raw_layer is None: 1588 # conf.raw_layer is set on import 1589 import scapy.packet # noqa: F401 1590 self.LLcls = conf.raw_layer 1591 1592 def __enter__(self): 1593 # type: () -> PcapReader 1594 return self 1595 1596 def read_packet(self, size=MTU, **kwargs): 1597 # type: (int, **Any) -> Packet 1598 rp = super(PcapReader, self)._read_packet(size=size) 1599 if rp is None: 1600 raise EOFError 1601 s, pkt_info = rp 1602 1603 try: 1604 p = self.LLcls(s, **kwargs) # type: Packet 1605 except KeyboardInterrupt: 1606 raise 1607 except Exception: 1608 if conf.debug_dissector: 1609 from scapy.sendrecv import debug 1610 debug.crashed_on = (self.LLcls, s) 1611 raise 1612 if conf.raw_layer is None: 1613 # conf.raw_layer is set on import 1614 import scapy.packet # noqa: F401 1615 p = conf.raw_layer(s) 1616 power = Decimal(10) ** Decimal(-9 if self.nano else -6) 1617 p.time = EDecimal(pkt_info.sec + power * pkt_info.usec) 1618 p.wirelen = pkt_info.wirelen 1619 return p 1620 1621 def recv(self, size=MTU, **kwargs): # type: ignore 1622 # type: (int, **Any) -> Packet 1623 return self.read_packet(size=size, **kwargs) 1624 1625 def __next__(self): # type: ignore 1626 # type: () -> Packet 1627 try: 1628 return self.read_packet() 1629 except EOFError: 1630 raise StopIteration 1631 1632 def read_all(self, count=-1): 1633 # type: (int) -> PacketList 1634 res = self._read_all(count) 1635 from scapy import plist 1636 return plist.PacketList(res, name=os.path.basename(self.filename)) 1637 1638 1639class RawPcapNgReader(RawPcapReader): 1640 """A stateful pcapng reader. Each packet is returned as 1641 bytes. 1642 1643 """ 1644 1645 alternative = RawPcapReader # type: Type[Any] 1646 1647 PacketMetadata = collections.namedtuple("PacketMetadataNg", # type: ignore 1648 ["linktype", "tsresol", 1649 "tshigh", "tslow", "wirelen", 1650 "comment", "ifname", "direction", 1651 "process_information"]) 1652 1653 def __init__(self, filename, fdesc=None, magic=None): # type: ignore 1654 # type: (str, IO[bytes], bytes) -> None 1655 self.filename = filename 1656 self.f = fdesc 1657 # A list of (linktype, snaplen, tsresol); will be populated by IDBs. 1658 self.interfaces = [] # type: List[Tuple[int, int, Dict[str, Any]]] 1659 self.default_options = { 1660 "tsresol": 1000000 1661 } 1662 self.blocktypes: Dict[ 1663 int, 1664 Callable[ 1665 [bytes, int], 1666 Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]] 1667 ]] = { 1668 1: self._read_block_idb, 1669 2: self._read_block_pkt, 1670 3: self._read_block_spb, 1671 6: self._read_block_epb, 1672 10: self._read_block_dsb, 1673 0x80000001: self._read_block_pib, 1674 } 1675 self.endian = "!" # Will be overwritten by first SHB 1676 self.process_information = [] # type: List[Dict[str, Any]] 1677 1678 if magic != b"\x0a\x0d\x0d\x0a": # PcapNg: 1679 raise Scapy_Exception( 1680 "Not a pcapng capture file (bad magic: %r)" % magic 1681 ) 1682 1683 try: 1684 self._read_block_shb() 1685 except EOFError: 1686 raise Scapy_Exception( 1687 "The first SHB of the pcapng file is malformed !" 1688 ) 1689 1690 def _read_block(self, size=MTU): 1691 # type: (int) -> Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]] # noqa: E501 1692 try: 1693 blocktype = struct.unpack(self.endian + "I", self.f.read(4))[0] 1694 except struct.error: 1695 raise EOFError 1696 if blocktype == 0x0A0D0D0A: 1697 # This function updates the endianness based on the block content. 1698 self._read_block_shb() 1699 return None 1700 try: 1701 blocklen = struct.unpack(self.endian + "I", self.f.read(4))[0] 1702 except struct.error: 1703 warning("PcapNg: Error reading blocklen before block body") 1704 raise EOFError 1705 if blocklen < 12: 1706 warning("PcapNg: Invalid block length !") 1707 raise EOFError 1708 1709 _block_body_length = blocklen - 12 1710 block = self.f.read(_block_body_length) 1711 if len(block) != _block_body_length: 1712 raise Scapy_Exception("PcapNg: Invalid Block body length " 1713 "(too short)") 1714 self._read_block_tail(blocklen) 1715 if blocktype in self.blocktypes: 1716 return self.blocktypes[blocktype](block, size) 1717 return None 1718 1719 def _read_block_tail(self, blocklen): 1720 # type: (int) -> None 1721 if blocklen % 4: 1722 pad = self.f.read(-blocklen % 4) 1723 warning("PcapNg: bad blocklen %d (MUST be a multiple of 4. " 1724 "Ignored padding %r" % (blocklen, pad)) 1725 try: 1726 if blocklen != struct.unpack(self.endian + 'I', 1727 self.f.read(4))[0]: 1728 raise EOFError("PcapNg: Invalid pcapng block (bad blocklen)") 1729 except struct.error: 1730 warning("PcapNg: Could not read blocklen after block body") 1731 raise EOFError 1732 1733 def _read_block_shb(self): 1734 # type: () -> None 1735 """Section Header Block""" 1736 _blocklen = self.f.read(4) 1737 endian = self.f.read(4) 1738 if endian == b"\x1a\x2b\x3c\x4d": 1739 self.endian = ">" 1740 elif endian == b"\x4d\x3c\x2b\x1a": 1741 self.endian = "<" 1742 else: 1743 warning("PcapNg: Bad magic in Section Header Block" 1744 " (not a pcapng file?)") 1745 raise EOFError 1746 1747 try: 1748 blocklen = struct.unpack(self.endian + "I", _blocklen)[0] 1749 except struct.error: 1750 warning("PcapNg: Could not read blocklen") 1751 raise EOFError 1752 if blocklen < 28: 1753 warning(f"PcapNg: Invalid Section Header Block length ({blocklen})!") # noqa: E501 1754 raise EOFError 1755 1756 # Major version must be 1 1757 _major = self.f.read(2) 1758 try: 1759 major = struct.unpack(self.endian + "H", _major)[0] 1760 except struct.error: 1761 warning("PcapNg: Could not read major value") 1762 raise EOFError 1763 if major != 1: 1764 warning(f"PcapNg: SHB Major version {major} unsupported !") 1765 raise EOFError 1766 1767 # Skip minor version & section length 1768 skipped = self.f.read(10) 1769 if len(skipped) != 10: 1770 warning("PcapNg: Could not read minor value & section length") 1771 raise EOFError 1772 1773 _options_len = blocklen - 28 1774 options = self.f.read(_options_len) 1775 if len(options) != _options_len: 1776 raise Scapy_Exception("PcapNg: Invalid Section Header Block " 1777 " options (too short)") 1778 self._read_block_tail(blocklen) 1779 self._read_options(options) 1780 1781 def _read_packet(self, size=MTU): # type: ignore 1782 # type: (int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata] 1783 """Read blocks until it reaches either EOF or a packet, and 1784 returns None or (packet, (linktype, sec, usec, wirelen)), 1785 where packet is a string. 1786 1787 """ 1788 while True: 1789 res = self._read_block() 1790 if res is not None: 1791 return res 1792 1793 def _read_options(self, options): 1794 # type: (bytes) -> Dict[int, bytes] 1795 opts = dict() 1796 while len(options) >= 4: 1797 try: 1798 code, length = struct.unpack(self.endian + "HH", options[:4]) 1799 except struct.error: 1800 warning("PcapNg: options header is too small " 1801 "%d !" % len(options)) 1802 raise EOFError 1803 if code != 0 and 4 + length < len(options): 1804 opts[code] = options[4:4 + length] 1805 if code == 0: 1806 if length != 0: 1807 warning("PcapNg: invalid option " 1808 "length %d for end-of-option" % length) 1809 break 1810 if length % 4: 1811 length += (4 - (length % 4)) 1812 options = options[4 + length:] 1813 return opts 1814 1815 def _read_block_idb(self, block, _): 1816 # type: (bytes, int) -> None 1817 """Interface Description Block""" 1818 # 2 bytes LinkType + 2 bytes Reserved 1819 # 4 bytes Snaplen 1820 options_raw = self._read_options(block[8:]) 1821 options = self.default_options.copy() # type: Dict[str, Any] 1822 for c, v in options_raw.items(): 1823 if c == 9: 1824 length = len(v) 1825 if length == 1: 1826 tsresol = orb(v) 1827 options["tsresol"] = (2 if tsresol & 128 else 10) ** ( 1828 tsresol & 127 1829 ) 1830 else: 1831 warning("PcapNg: invalid options " 1832 "length %d for IDB tsresol" % length) 1833 elif c == 2: 1834 options["name"] = v 1835 elif c == 1: 1836 options["comment"] = v 1837 try: 1838 interface: Tuple[int, int, Dict[str, Any]] = struct.unpack( 1839 self.endian + "HxxI", 1840 block[:8] 1841 ) + (options,) 1842 except struct.error: 1843 warning("PcapNg: IDB is too small %d/8 !" % len(block)) 1844 raise EOFError 1845 self.interfaces.append(interface) 1846 1847 def _check_interface_id(self, intid): 1848 # type: (int) -> None 1849 """Check the interface id value and raise EOFError if invalid.""" 1850 tmp_len = len(self.interfaces) 1851 if intid >= tmp_len: 1852 warning("PcapNg: invalid interface id %d/%d" % (intid, tmp_len)) 1853 raise EOFError 1854 1855 def _read_block_epb(self, block, size): 1856 # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata] 1857 """Enhanced Packet Block""" 1858 try: 1859 intid, tshigh, tslow, caplen, wirelen = struct.unpack( 1860 self.endian + "5I", 1861 block[:20], 1862 ) 1863 except struct.error: 1864 warning("PcapNg: EPB is too small %d/20 !" % len(block)) 1865 raise EOFError 1866 1867 # Compute the options offset taking padding into account 1868 if caplen % 4: 1869 opt_offset = 20 + caplen + (-caplen) % 4 1870 else: 1871 opt_offset = 20 + caplen 1872 1873 # Parse options 1874 options = self._read_options(block[opt_offset:]) 1875 1876 process_information = {} 1877 for code, value in options.items(): 1878 if code in [0x8001, 0x8003]: # PCAPNG_EPB_PIB_INDEX, PCAPNG_EPB_E_PIB_INDEX 1879 try: 1880 proc_index = struct.unpack(self.endian + "I", value)[0] 1881 except struct.error: 1882 warning("PcapNg: EPB invalid proc index" 1883 "(expected 4 bytes, got %d) !" % len(value)) 1884 raise EOFError 1885 if proc_index < len(self.process_information): 1886 key = "proc" if code == 0x8001 else "eproc" 1887 process_information[key] = self.process_information[proc_index] 1888 else: 1889 warning("PcapNg: EPB invalid process information index " 1890 "(%d/%d) !" % (proc_index, len(self.process_information))) 1891 1892 comment = options.get(1, None) 1893 epb_flags_raw = options.get(2, None) 1894 if epb_flags_raw: 1895 try: 1896 epb_flags, = struct.unpack(self.endian + "I", epb_flags_raw) 1897 except struct.error: 1898 warning("PcapNg: EPB invalid flags size" 1899 "(expected 4 bytes, got %d) !" % len(epb_flags_raw)) 1900 raise EOFError 1901 direction = epb_flags & 3 1902 1903 else: 1904 direction = None 1905 1906 self._check_interface_id(intid) 1907 ifname = self.interfaces[intid][2].get('name', None) 1908 1909 return (block[20:20 + caplen][:size], 1910 RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0], # noqa: E501 1911 tsresol=self.interfaces[intid][2]['tsresol'], # noqa: E501 1912 tshigh=tshigh, 1913 tslow=tslow, 1914 wirelen=wirelen, 1915 comment=comment, 1916 ifname=ifname, 1917 direction=direction, 1918 process_information=process_information)) 1919 1920 def _read_block_spb(self, block, size): 1921 # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata] 1922 """Simple Packet Block""" 1923 # "it MUST be assumed that all the Simple Packet Blocks have 1924 # been captured on the interface previously specified in the 1925 # first Interface Description Block." 1926 intid = 0 1927 self._check_interface_id(intid) 1928 1929 try: 1930 wirelen, = struct.unpack(self.endian + "I", block[:4]) 1931 except struct.error: 1932 warning("PcapNg: SPB is too small %d/4 !" % len(block)) 1933 raise EOFError 1934 1935 caplen = min(wirelen, self.interfaces[intid][1]) 1936 return (block[4:4 + caplen][:size], 1937 RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0], # noqa: E501 1938 tsresol=self.interfaces[intid][2]['tsresol'], # noqa: E501 1939 tshigh=None, 1940 tslow=None, 1941 wirelen=wirelen, 1942 comment=None, 1943 ifname=None, 1944 direction=None, 1945 process_information={})) 1946 1947 def _read_block_pkt(self, block, size): 1948 # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata] 1949 """(Obsolete) Packet Block""" 1950 try: 1951 intid, drops, tshigh, tslow, caplen, wirelen = struct.unpack( 1952 self.endian + "HH4I", 1953 block[:20], 1954 ) 1955 except struct.error: 1956 warning("PcapNg: PKT is too small %d/20 !" % len(block)) 1957 raise EOFError 1958 1959 self._check_interface_id(intid) 1960 return (block[20:20 + caplen][:size], 1961 RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0], # noqa: E501 1962 tsresol=self.interfaces[intid][2]['tsresol'], # noqa: E501 1963 tshigh=tshigh, 1964 tslow=tslow, 1965 wirelen=wirelen, 1966 comment=None, 1967 ifname=None, 1968 direction=None, 1969 process_information={})) 1970 1971 def _read_block_dsb(self, block, size): 1972 # type: (bytes, int) -> None 1973 """Decryption Secrets Block""" 1974 1975 # Parse the secrets type and length fields 1976 try: 1977 secrets_type, secrets_length = struct.unpack( 1978 self.endian + "II", 1979 block[:8], 1980 ) 1981 block = block[8:] 1982 except struct.error: 1983 warning("PcapNg: DSB is too small %d!", len(block)) 1984 raise EOFError 1985 1986 # Compute the secrets length including the padding 1987 padded_secrets_length = secrets_length + (-secrets_length) % 4 1988 if len(block) < padded_secrets_length: 1989 warning("PcapNg: invalid DSB secrets length!") 1990 raise EOFError 1991 1992 # Extract secrets data and options 1993 secrets_data = block[:padded_secrets_length][:secrets_length] 1994 if block[padded_secrets_length:]: 1995 warning("PcapNg: DSB options are not supported!") 1996 1997 # TLS Key Log 1998 if secrets_type == 0x544c534b: 1999 if getattr(conf, "tls_sessions", False) is False: 2000 warning("PcapNg: TLS Key Log available, but " 2001 "the TLS layer is not loaded! Scapy won't be able " 2002 "to decrypt the packets.") 2003 else: 2004 from scapy.layers.tls.session import load_nss_keys 2005 2006 # Write Key Log to a file and parse it 2007 filename = get_temp_file() 2008 with open(filename, "wb") as fd: 2009 fd.write(secrets_data) 2010 fd.close() 2011 2012 keys = load_nss_keys(filename) 2013 if not keys: 2014 warning("PcapNg: invalid TLS Key Log in DSB!") 2015 else: 2016 # Note: these attributes are only available when the TLS 2017 # layer is loaded. 2018 conf.tls_nss_keys = keys 2019 conf.tls_session_enable = True 2020 else: 2021 warning("PcapNg: Unknown DSB secrets type (0x%x)!", secrets_type) 2022 2023 def _read_block_pib(self, block, _): 2024 # type: (bytes, int) -> None 2025 """Apple Process Information Block""" 2026 2027 # Get the Process ID 2028 try: 2029 dpeb_pid = struct.unpack(self.endian + "I", block[:4])[0] 2030 process_information = {"id": dpeb_pid} 2031 block = block[4:] 2032 except struct.error: 2033 warning("PcapNg: DPEB is too small (%d). Cannot get PID!", 2034 len(block)) 2035 raise EOFError 2036 2037 # Get Options 2038 options = self._read_options(block) 2039 for code, value in options.items(): 2040 if code == 2: 2041 process_information["name"] = value.decode("ascii", "backslashreplace") 2042 elif code == 4: 2043 if len(value) == 16: 2044 process_information["uuid"] = str(UUID(bytes=value)) 2045 else: 2046 warning("PcapNg: DPEB UUID length is invalid (%d)!", 2047 len(value)) 2048 2049 # Store process information 2050 self.process_information.append(process_information) 2051 2052 2053class PcapNgReader(RawPcapNgReader, PcapReader): 2054 2055 alternative = PcapReader 2056 2057 def __init__(self, filename, fdesc=None, magic=None): # type: ignore 2058 # type: (str, IO[bytes], bytes) -> None 2059 RawPcapNgReader.__init__(self, filename, fdesc, magic) 2060 2061 def __enter__(self): 2062 # type: () -> PcapNgReader 2063 return self 2064 2065 def read_packet(self, size=MTU, **kwargs): 2066 # type: (int, **Any) -> Packet 2067 rp = super(PcapNgReader, self)._read_packet(size=size) 2068 if rp is None: 2069 raise EOFError 2070 s, (linktype, tsresol, tshigh, tslow, wirelen, comment, ifname, direction, process_information) = rp # noqa: E501 2071 try: 2072 cls = conf.l2types.num2layer[linktype] # type: Type[Packet] 2073 p = cls(s, **kwargs) # type: Packet 2074 except KeyboardInterrupt: 2075 raise 2076 except Exception: 2077 if conf.debug_dissector: 2078 raise 2079 if conf.raw_layer is None: 2080 # conf.raw_layer is set on import 2081 import scapy.packet # noqa: F401 2082 p = conf.raw_layer(s) 2083 if tshigh is not None: 2084 p.time = EDecimal((tshigh << 32) + tslow) / tsresol 2085 p.wirelen = wirelen 2086 p.comment = comment 2087 p.direction = direction 2088 p.process_information = process_information.copy() 2089 if ifname is not None: 2090 p.sniffed_on = ifname.decode('utf-8', 'backslashreplace') 2091 return p 2092 2093 def recv(self, size: int = MTU, **kwargs: Any) -> 'Packet': # type: ignore 2094 return self.read_packet(size=size, **kwargs) 2095 2096 2097class GenericPcapWriter(object): 2098 nano = False 2099 linktype: int 2100 2101 def _write_header(self, pkt): 2102 # type: (Optional[Union[Packet, bytes]]) -> None 2103 raise NotImplementedError 2104 2105 def _write_packet(self, 2106 packet, # type: Union[bytes, Packet] 2107 linktype, # type: int 2108 sec=None, # type: Optional[float] 2109 usec=None, # type: Optional[int] 2110 caplen=None, # type: Optional[int] 2111 wirelen=None, # type: Optional[int] 2112 comment=None, # type: Optional[bytes] 2113 ifname=None, # type: Optional[bytes] 2114 direction=None, # type: Optional[int] 2115 ): 2116 # type: (...) -> None 2117 raise NotImplementedError 2118 2119 def _get_time(self, 2120 packet, # type: Union[bytes, Packet] 2121 sec, # type: Optional[float] 2122 usec # type: Optional[int] 2123 ): 2124 # type: (...) -> Tuple[float, int] 2125 if hasattr(packet, "time"): 2126 if sec is None: 2127 packet_time = packet.time 2128 tmp = int(packet_time) 2129 usec = int(round((packet_time - tmp) * 2130 (1000000000 if self.nano else 1000000))) 2131 sec = float(packet_time) 2132 if sec is not None and usec is None: 2133 usec = 0 2134 return sec, usec # type: ignore 2135 2136 def write_header(self, pkt): 2137 # type: (Optional[Union[Packet, bytes]]) -> None 2138 if not hasattr(self, 'linktype'): 2139 try: 2140 if pkt is None or isinstance(pkt, bytes): 2141 # Can't guess LL 2142 raise KeyError 2143 self.linktype = conf.l2types.layer2num[ 2144 pkt.__class__ 2145 ] 2146 except KeyError: 2147 msg = "%s: unknown LL type for %s. Using type 1 (Ethernet)" 2148 warning(msg, self.__class__.__name__, pkt.__class__.__name__) 2149 self.linktype = DLT_EN10MB 2150 self._write_header(pkt) 2151 2152 def write_packet(self, 2153 packet, # type: Union[bytes, Packet] 2154 sec=None, # type: Optional[float] 2155 usec=None, # type: Optional[int] 2156 caplen=None, # type: Optional[int] 2157 wirelen=None, # type: Optional[int] 2158 ): 2159 # type: (...) -> None 2160 """ 2161 Writes a single packet to the pcap file. 2162 2163 :param packet: Packet, or bytes for a single packet 2164 :type packet: scapy.packet.Packet or bytes 2165 :param sec: time the packet was captured, in seconds since epoch. If 2166 not supplied, defaults to now. 2167 :type sec: float 2168 :param usec: If ``nano=True``, then number of nanoseconds after the 2169 second that the packet was captured. If ``nano=False``, 2170 then the number of microseconds after the second the 2171 packet was captured. If ``sec`` is not specified, 2172 this value is ignored. 2173 :type usec: int or long 2174 :param caplen: The length of the packet in the capture file. If not 2175 specified, uses ``len(raw(packet))``. 2176 :type caplen: int 2177 :param wirelen: The length of the packet on the wire. If not 2178 specified, tries ``packet.wirelen``, otherwise uses 2179 ``caplen``. 2180 :type wirelen: int 2181 :return: None 2182 :rtype: None 2183 """ 2184 f_sec, usec = self._get_time(packet, sec, usec) 2185 2186 rawpkt = bytes_encode(packet) 2187 caplen = len(rawpkt) if caplen is None else caplen 2188 2189 if wirelen is None: 2190 if hasattr(packet, "wirelen"): 2191 wirelen = packet.wirelen 2192 if wirelen is None: 2193 wirelen = caplen 2194 2195 comment = getattr(packet, "comment", None) 2196 ifname = getattr(packet, "sniffed_on", None) 2197 direction = getattr(packet, "direction", None) 2198 if not isinstance(packet, bytes): 2199 linktype: int = conf.l2types.layer2num[ 2200 packet.__class__ 2201 ] 2202 else: 2203 linktype = self.linktype 2204 if ifname is not None: 2205 ifname = str(ifname).encode('utf-8') 2206 self._write_packet( 2207 rawpkt, 2208 sec=f_sec, usec=usec, 2209 caplen=caplen, wirelen=wirelen, 2210 comment=comment, 2211 ifname=ifname, 2212 direction=direction, 2213 linktype=linktype 2214 ) 2215 2216 2217class GenericRawPcapWriter(GenericPcapWriter): 2218 header_present = False 2219 nano = False 2220 sync = False 2221 f = None # type: Union[IO[bytes], gzip.GzipFile] 2222 2223 def fileno(self): 2224 # type: () -> int 2225 return -1 if WINDOWS else self.f.fileno() 2226 2227 def flush(self): 2228 # type: () -> Optional[Any] 2229 return self.f.flush() 2230 2231 def close(self): 2232 # type: () -> Optional[Any] 2233 if not self.header_present: 2234 self.write_header(None) 2235 return self.f.close() 2236 2237 def __enter__(self): 2238 # type: () -> GenericRawPcapWriter 2239 return self 2240 2241 def __exit__(self, exc_type, exc_value, tracback): 2242 # type: (Optional[Any], Optional[Any], Optional[Any]) -> None 2243 self.flush() 2244 self.close() 2245 2246 def write(self, pkt): 2247 # type: (Union[_PacketIterable, bytes]) -> None 2248 """ 2249 Writes a Packet, a SndRcvList object, or bytes to a pcap file. 2250 2251 :param pkt: Packet(s) to write (one record for each Packet), or raw 2252 bytes to write (as one record). 2253 :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet or bytes 2254 """ 2255 if isinstance(pkt, bytes): 2256 if not self.header_present: 2257 self.write_header(pkt) 2258 self.write_packet(pkt) 2259 else: 2260 # Import here to avoid circular dependency 2261 from scapy.supersocket import IterSocket 2262 for p in IterSocket(pkt).iter: 2263 if not self.header_present: 2264 self.write_header(p) 2265 2266 if not isinstance(p, bytes) and \ 2267 self.linktype != conf.l2types.get(type(p), None): 2268 warning("Inconsistent linktypes detected!" 2269 " The resulting file might contain" 2270 " invalid packets." 2271 ) 2272 2273 self.write_packet(p) 2274 2275 2276class RawPcapWriter(GenericRawPcapWriter): 2277 """A stream PCAP writer with more control than wrpcap()""" 2278 2279 def __init__(self, 2280 filename, # type: Union[IO[bytes], str] 2281 linktype=None, # type: Optional[int] 2282 gz=False, # type: bool 2283 endianness="", # type: str 2284 append=False, # type: bool 2285 sync=False, # type: bool 2286 nano=False, # type: bool 2287 snaplen=MTU, # type: int 2288 bufsz=4096, # type: int 2289 ): 2290 # type: (...) -> None 2291 """ 2292 :param filename: the name of the file to write packets to, or an open, 2293 writable file-like object. 2294 :param linktype: force linktype to a given value. If None, linktype is 2295 taken from the first writer packet 2296 :param gz: compress the capture on the fly 2297 :param endianness: force an endianness (little:"<", big:">"). 2298 Default is native 2299 :param append: append packets to the capture file instead of 2300 truncating it 2301 :param sync: do not bufferize writes to the capture file 2302 :param nano: use nanosecond-precision (requires libpcap >= 1.5.0) 2303 2304 """ 2305 2306 if linktype: 2307 self.linktype = linktype 2308 self.snaplen = snaplen 2309 self.append = append 2310 self.gz = gz 2311 self.endian = endianness 2312 self.sync = sync 2313 self.nano = nano 2314 if sync: 2315 bufsz = 0 2316 2317 if isinstance(filename, str): 2318 self.filename = filename 2319 if gz: 2320 self.f = cast(_ByteStream, gzip.open( 2321 filename, append and "ab" or "wb", 9 2322 )) 2323 else: 2324 self.f = open(filename, append and "ab" or "wb", bufsz) 2325 else: 2326 self.f = filename 2327 self.filename = getattr(filename, "name", "No name") 2328 2329 def _write_header(self, pkt): 2330 # type: (Optional[Union[Packet, bytes]]) -> None 2331 self.header_present = True 2332 2333 if self.append: 2334 # Even if prone to race conditions, this seems to be 2335 # safest way to tell whether the header is already present 2336 # because we have to handle compressed streams that 2337 # are not as flexible as basic files 2338 if self.gz: 2339 g = gzip.open(self.filename, "rb") # type: _ByteStream 2340 else: 2341 g = open(self.filename, "rb") 2342 try: 2343 if g.read(16): 2344 return 2345 finally: 2346 g.close() 2347 2348 if not hasattr(self, 'linktype'): 2349 raise ValueError( 2350 "linktype could not be guessed. " 2351 "Please pass a linktype while creating the writer" 2352 ) 2353 2354 self.f.write(struct.pack(self.endian + "IHHIIII", 0xa1b23c4d if self.nano else 0xa1b2c3d4, # noqa: E501 2355 2, 4, 0, 0, self.snaplen, self.linktype)) 2356 self.f.flush() 2357 2358 def _write_packet(self, 2359 packet, # type: Union[bytes, Packet] 2360 linktype, # type: int 2361 sec=None, # type: Optional[float] 2362 usec=None, # type: Optional[int] 2363 caplen=None, # type: Optional[int] 2364 wirelen=None, # type: Optional[int] 2365 comment=None, # type: Optional[bytes] 2366 ifname=None, # type: Optional[bytes] 2367 direction=None, # type: Optional[int] 2368 ): 2369 # type: (...) -> None 2370 """ 2371 Writes a single packet to the pcap file. 2372 2373 :param packet: bytes for a single packet 2374 :type packet: bytes 2375 :param linktype: linktype value associated with the packet 2376 :type linktype: int 2377 :param sec: time the packet was captured, in seconds since epoch. If 2378 not supplied, defaults to now. 2379 :type sec: float 2380 :param usec: not used with pcapng 2381 packet was captured 2382 :type usec: int or long 2383 :param caplen: The length of the packet in the capture file. If not 2384 specified, uses ``len(packet)``. 2385 :type caplen: int 2386 :param wirelen: The length of the packet on the wire. If not 2387 specified, uses ``caplen``. 2388 :type wirelen: int 2389 :return: None 2390 :rtype: None 2391 """ 2392 if caplen is None: 2393 caplen = len(packet) 2394 if wirelen is None: 2395 wirelen = caplen 2396 if sec is None or usec is None: 2397 t = time.time() 2398 it = int(t) 2399 if sec is None: 2400 sec = it 2401 usec = int(round((t - it) * 2402 (1000000000 if self.nano else 1000000))) 2403 elif usec is None: 2404 usec = 0 2405 2406 self.f.write(struct.pack(self.endian + "IIII", 2407 int(sec), usec, caplen, wirelen)) 2408 self.f.write(bytes(packet)) 2409 if self.sync: 2410 self.f.flush() 2411 2412 2413class RawPcapNgWriter(GenericRawPcapWriter): 2414 """A stream pcapng writer with more control than wrpcapng()""" 2415 2416 def __init__(self, 2417 filename, # type: str 2418 ): 2419 # type: (...) -> None 2420 2421 self.header_present = False 2422 self.tsresol = 1000000 2423 # A dict to keep if_name to IDB id mapping. 2424 # unknown if_name(None) id=0 2425 self.interfaces2id: Dict[Optional[bytes], int] = {None: 0} 2426 2427 # tcpdump only support little-endian in PCAPng files 2428 self.endian = "<" 2429 self.endian_magic = b"\x4d\x3c\x2b\x1a" 2430 2431 self.filename = filename 2432 self.f = open(filename, "wb", 4096) 2433 2434 def _get_time(self, 2435 packet, # type: Union[bytes, Packet] 2436 sec, # type: Optional[float] 2437 usec # type: Optional[int] 2438 ): 2439 # type: (...) -> Tuple[float, int] 2440 if hasattr(packet, "time"): 2441 if sec is None: 2442 sec = float(packet.time) 2443 2444 if usec is None: 2445 usec = 0 2446 2447 return sec, usec # type: ignore 2448 2449 def _add_padding(self, raw_data): 2450 # type: (bytes) -> bytes 2451 raw_data += ((-len(raw_data)) % 4) * b"\x00" 2452 return raw_data 2453 2454 def build_block(self, block_type, block_body, options=None): 2455 # type: (bytes, bytes, Optional[bytes]) -> bytes 2456 2457 # Pad Block Body to 32 bits 2458 block_body = self._add_padding(block_body) 2459 2460 if options: 2461 block_body += options 2462 2463 # An empty block is 12 bytes long 2464 block_total_length = 12 + len(block_body) 2465 2466 # Block Type 2467 block = block_type 2468 # Block Total Length$ 2469 block += struct.pack(self.endian + "I", block_total_length) 2470 # Block Body 2471 block += block_body 2472 # Block Total Length$ 2473 block += struct.pack(self.endian + "I", block_total_length) 2474 2475 return block 2476 2477 def _write_header(self, pkt): 2478 # type: (Optional[Union[Packet, bytes]]) -> None 2479 if not self.header_present: 2480 self.header_present = True 2481 self._write_block_shb() 2482 self._write_block_idb(linktype=self.linktype) 2483 2484 def _write_block_shb(self): 2485 # type: () -> None 2486 2487 # Block Type 2488 block_type = b"\x0A\x0D\x0D\x0A" 2489 # Byte-Order Magic 2490 block_shb = self.endian_magic 2491 # Major Version 2492 block_shb += struct.pack(self.endian + "H", 1) 2493 # Minor Version 2494 block_shb += struct.pack(self.endian + "H", 0) 2495 # Section Length 2496 block_shb += struct.pack(self.endian + "q", -1) 2497 2498 self.f.write(self.build_block(block_type, block_shb)) 2499 2500 def _write_block_idb(self, 2501 linktype, # type: int 2502 ifname=None # type: Optional[bytes] 2503 ): 2504 # type: (...) -> None 2505 2506 # Block Type 2507 block_type = struct.pack(self.endian + "I", 1) 2508 # LinkType 2509 block_idb = struct.pack(self.endian + "H", linktype) 2510 # Reserved 2511 block_idb += struct.pack(self.endian + "H", 0) 2512 # SnapLen 2513 block_idb += struct.pack(self.endian + "I", 262144) 2514 2515 # if_name option 2516 opts = None 2517 if ifname is not None: 2518 opts = struct.pack(self.endian + "HH", 2, len(ifname)) 2519 # Pad Option Value to 32 bits 2520 opts += self._add_padding(ifname) 2521 opts += struct.pack(self.endian + "HH", 0, 0) 2522 2523 self.f.write(self.build_block(block_type, block_idb, options=opts)) 2524 2525 def _write_block_spb(self, raw_pkt): 2526 # type: (bytes) -> None 2527 2528 # Block Type 2529 block_type = struct.pack(self.endian + "I", 3) 2530 # Original Packet Length 2531 block_spb = struct.pack(self.endian + "I", len(raw_pkt)) 2532 # Packet Data 2533 block_spb += raw_pkt 2534 2535 self.f.write(self.build_block(block_type, block_spb)) 2536 2537 def _write_block_epb(self, 2538 raw_pkt, # type: bytes 2539 ifid, # type: int 2540 timestamp=None, # type: Optional[Union[EDecimal, float]] # noqa: E501 2541 caplen=None, # type: Optional[int] 2542 orglen=None, # type: Optional[int] 2543 comment=None, # type: Optional[bytes] 2544 flags=None, # type: Optional[int] 2545 ): 2546 # type: (...) -> None 2547 2548 if timestamp: 2549 tmp_ts = int(timestamp * self.tsresol) 2550 ts_high = tmp_ts >> 32 2551 ts_low = tmp_ts & 0xFFFFFFFF 2552 else: 2553 ts_high = ts_low = 0 2554 2555 if not caplen: 2556 caplen = len(raw_pkt) 2557 2558 if not orglen: 2559 orglen = len(raw_pkt) 2560 2561 # Block Type 2562 block_type = struct.pack(self.endian + "I", 6) 2563 # Interface ID 2564 block_epb = struct.pack(self.endian + "I", ifid) 2565 # Timestamp (High) 2566 block_epb += struct.pack(self.endian + "I", ts_high) 2567 # Timestamp (Low) 2568 block_epb += struct.pack(self.endian + "I", ts_low) 2569 # Captured Packet Length 2570 block_epb += struct.pack(self.endian + "I", caplen) 2571 # Original Packet Length 2572 block_epb += struct.pack(self.endian + "I", orglen) 2573 # Packet Data 2574 block_epb += raw_pkt 2575 2576 # Options 2577 opts = b'' 2578 if comment is not None: 2579 comment = bytes_encode(comment) 2580 opts += struct.pack(self.endian + "HH", 1, len(comment)) 2581 # Pad Option Value to 32 bits 2582 opts += self._add_padding(comment) 2583 if type(flags) == int: 2584 opts += struct.pack(self.endian + "HH", 2, 4) 2585 opts += struct.pack(self.endian + "I", flags) 2586 if opts: 2587 opts += struct.pack(self.endian + "HH", 0, 0) 2588 2589 self.f.write(self.build_block(block_type, block_epb, 2590 options=opts)) 2591 2592 def _write_packet(self, # type: ignore 2593 packet, # type: bytes 2594 linktype, # type: int 2595 sec=None, # type: Optional[float] 2596 usec=None, # type: Optional[int] 2597 caplen=None, # type: Optional[int] 2598 wirelen=None, # type: Optional[int] 2599 comment=None, # type: Optional[bytes] 2600 ifname=None, # type: Optional[bytes] 2601 direction=None, # type: Optional[int] 2602 ): 2603 # type: (...) -> None 2604 """ 2605 Writes a single packet to the pcap file. 2606 2607 :param packet: bytes for a single packet 2608 :type packet: bytes 2609 :param linktype: linktype value associated with the packet 2610 :type linktype: int 2611 :param sec: time the packet was captured, in seconds since epoch. If 2612 not supplied, defaults to now. 2613 :type sec: float 2614 :param caplen: The length of the packet in the capture file. If not 2615 specified, uses ``len(packet)``. 2616 :type caplen: int 2617 :param wirelen: The length of the packet on the wire. If not 2618 specified, uses ``caplen``. 2619 :type wirelen: int 2620 :param comment: UTF-8 string containing human-readable comment text 2621 that is associated to the current block. Line separators 2622 SHOULD be a carriage-return + linefeed ('\r\n') or 2623 just linefeed ('\n'); either form may appear and 2624 be considered a line separator. The string is not 2625 zero-terminated. 2626 :type bytes 2627 :param ifname: UTF-8 string containing the 2628 name of the device used to capture data. 2629 The string is not zero-terminated. 2630 :type bytes 2631 :param direction: 0 = information not available, 2632 1 = inbound, 2633 2 = outbound 2634 :type int 2635 :return: None 2636 :rtype: None 2637 """ 2638 if caplen is None: 2639 caplen = len(packet) 2640 if wirelen is None: 2641 wirelen = caplen 2642 2643 ifid = self.interfaces2id.get(ifname, None) 2644 if ifid is None: 2645 ifid = max(self.interfaces2id.values()) + 1 2646 self.interfaces2id[ifname] = ifid 2647 self._write_block_idb(linktype=linktype, ifname=ifname) 2648 2649 # EPB flags (32 bits). 2650 # currently only direction is implemented (least 2 significant bits) 2651 if type(direction) == int: 2652 flags = direction & 0x3 2653 else: 2654 flags = None 2655 2656 self._write_block_epb(packet, timestamp=sec, caplen=caplen, 2657 orglen=wirelen, comment=comment, ifid=ifid, flags=flags) 2658 if self.sync: 2659 self.f.flush() 2660 2661 2662class PcapWriter(RawPcapWriter): 2663 """A stream PCAP writer with more control than wrpcap()""" 2664 pass 2665 2666 2667class PcapNgWriter(RawPcapNgWriter): 2668 """A stream pcapng writer with more control than wrpcapng()""" 2669 2670 def _get_time(self, 2671 packet, # type: Union[bytes, Packet] 2672 sec, # type: Optional[float] 2673 usec # type: Optional[int] 2674 ): 2675 # type: (...) -> Tuple[float, int] 2676 if hasattr(packet, "time"): 2677 if sec is None: 2678 sec = float(packet.time) 2679 2680 if usec is None: 2681 usec = 0 2682 2683 return sec, usec # type: ignore 2684 2685 2686@conf.commands.register 2687def rderf(filename, count=-1): 2688 # type: (Union[IO[bytes], str], int) -> PacketList 2689 """Read a ERF file and return a packet list 2690 2691 :param count: read only <count> packets 2692 """ 2693 with ERFEthernetReader(filename) as fdesc: 2694 return fdesc.read_all(count=count) 2695 2696 2697class ERFEthernetReader_metaclass(PcapReader_metaclass): 2698 def __call__(cls, filename): 2699 # type: (Union[IO[bytes], str]) -> Any 2700 i = cls.__new__(cls, cls.__name__, cls.__bases__, cls.__dict__) # type: ignore 2701 filename, fdesc = cls.open(filename) 2702 try: 2703 i.__init__(filename, fdesc) 2704 return i 2705 except (Scapy_Exception, EOFError): 2706 pass 2707 2708 if "alternative" in cls.__dict__: 2709 cls = cls.__dict__["alternative"] 2710 i = cls.__new__( 2711 cls, 2712 cls.__name__, 2713 cls.__bases__, 2714 cls.__dict__ # type: ignore 2715 ) 2716 try: 2717 i.__init__(filename, fdesc) 2718 return i 2719 except (Scapy_Exception, EOFError): 2720 pass 2721 2722 raise Scapy_Exception("Not a supported capture file") 2723 2724 @staticmethod 2725 def open(fname # type: ignore 2726 ): 2727 # type: (...) -> Tuple[str, _ByteStream] 2728 """Open (if necessary) filename""" 2729 if isinstance(fname, str): 2730 filename = fname 2731 try: 2732 with gzip.open(filename, "rb") as tmp: 2733 tmp.read(1) 2734 fdesc = gzip.open(filename, "rb") # type: _ByteStream 2735 except IOError: 2736 fdesc = open(filename, "rb") 2737 2738 else: 2739 fdesc = fname 2740 filename = getattr(fdesc, "name", "No name") 2741 return filename, fdesc 2742 2743 2744class ERFEthernetReader(PcapReader, 2745 metaclass=ERFEthernetReader_metaclass): 2746 2747 def __init__(self, filename, fdesc=None): # type: ignore 2748 # type: (Union[IO[bytes], str], IO[bytes]) -> None 2749 self.filename = filename # type: ignore 2750 self.f = fdesc 2751 self.power = Decimal(10) ** Decimal(-9) 2752 2753 # time is in 64-bits Endace's format which can be see here: 2754 # https://www.endace.com/erf-extensible-record-format-types.pdf 2755 def _convert_erf_timestamp(self, t): 2756 # type: (int) -> EDecimal 2757 sec = t >> 32 2758 frac_sec = t & 0xffffffff 2759 frac_sec *= 10**9 2760 frac_sec += (frac_sec & 0x80000000) << 1 2761 frac_sec >>= 32 2762 return EDecimal(sec + self.power * frac_sec) 2763 2764 # The details of ERF Packet format can be see here: 2765 # https://www.endace.com/erf-extensible-record-format-types.pdf 2766 def read_packet(self, size=MTU, **kwargs): 2767 # type: (int, **Any) -> Packet 2768 2769 # General ERF Header have exactly 16 bytes 2770 hdr = self.f.read(16) 2771 if len(hdr) < 16: 2772 raise EOFError 2773 2774 # The timestamp is in little-endian byte-order. 2775 time = struct.unpack('<Q', hdr[:8])[0] 2776 # The rest is in big-endian byte-order. 2777 # Ignoring flags and lctr (loss counter) since they are ERF specific 2778 # header fields which Packet object does not support. 2779 type, _, rlen, _, wlen = struct.unpack('>BBHHH', hdr[8:]) 2780 # Check if the type != 0x02, type Ethernet 2781 if type & 0x02 == 0: 2782 raise Scapy_Exception("Invalid ERF Type (Not TYPE_ETH)") 2783 2784 # If there are extended headers, ignore it because Packet object does 2785 # not support it. Extended headers size is 8 bytes before the payload. 2786 if type & 0x80: 2787 _ = self.f.read(8) 2788 s = self.f.read(rlen - 24) 2789 else: 2790 s = self.f.read(rlen - 16) 2791 2792 # Ethernet has 2 bytes of padding containing `offset` and `pad`. Both 2793 # of the fields are disregarded by Endace. 2794 pb = s[2:size] 2795 from scapy.layers.l2 import Ether 2796 try: 2797 p = Ether(pb, **kwargs) # type: Packet 2798 except KeyboardInterrupt: 2799 raise 2800 except Exception: 2801 if conf.debug_dissector: 2802 from scapy.sendrecv import debug 2803 debug.crashed_on = (Ether, s) 2804 raise 2805 if conf.raw_layer is None: 2806 # conf.raw_layer is set on import 2807 import scapy.packet # noqa: F401 2808 p = conf.raw_layer(s) 2809 2810 p.time = self._convert_erf_timestamp(time) 2811 p.wirelen = wlen 2812 2813 return p 2814 2815 2816@conf.commands.register 2817def wrerf(filename, # type: Union[IO[bytes], str] 2818 pkt, # type: _PacketIterable 2819 *args, # type: Any 2820 **kargs # type: Any 2821 ): 2822 # type: (...) -> None 2823 """Write a list of packets to a ERF file 2824 2825 :param filename: the name of the file to write packets to, or an open, 2826 writable file-like object. The file descriptor will be 2827 closed at the end of the call, so do not use an object you 2828 do not want to close (e.g., running wrerf(sys.stdout, []) 2829 in interactive mode will crash Scapy). 2830 :param gz: set to 1 to save a gzipped capture 2831 :param append: append packets to the capture file instead of 2832 truncating it 2833 :param sync: do not bufferize writes to the capture file 2834 """ 2835 with ERFEthernetWriter(filename, *args, **kargs) as fdesc: 2836 fdesc.write(pkt) 2837 2838 2839class ERFEthernetWriter(PcapWriter): 2840 """A stream ERF Ethernet writer with more control than wrerf()""" 2841 2842 def __init__(self, 2843 filename, # type: Union[IO[bytes], str] 2844 gz=False, # type: bool 2845 append=False, # type: bool 2846 sync=False, # type: bool 2847 ): 2848 # type: (...) -> None 2849 """ 2850 :param filename: the name of the file to write packets to, or an open, 2851 writable file-like object. 2852 :param gz: compress the capture on the fly 2853 :param append: append packets to the capture file instead of 2854 truncating it 2855 :param sync: do not bufferize writes to the capture file 2856 """ 2857 super(ERFEthernetWriter, self).__init__(filename, 2858 gz=gz, 2859 append=append, 2860 sync=sync) 2861 2862 def write(self, pkt): # type: ignore 2863 # type: (_PacketIterable) -> None 2864 """ 2865 Writes a Packet, a SndRcvList object, or bytes to a ERF file. 2866 2867 :param pkt: Packet(s) to write (one record for each Packet) 2868 :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet 2869 """ 2870 # Import here to avoid circular dependency 2871 from scapy.supersocket import IterSocket 2872 for p in IterSocket(pkt).iter: 2873 self.write_packet(p) 2874 2875 def write_packet(self, pkt): # type: ignore 2876 # type: (Packet) -> None 2877 2878 if hasattr(pkt, "time"): 2879 sec = int(pkt.time) 2880 usec = int((int(round((pkt.time - sec) * 10**9)) << 32) / 10**9) 2881 t = (sec << 32) + usec 2882 else: 2883 t = int(time.time()) << 32 2884 2885 # There are 16 bytes of headers + 2 bytes of padding before the packets 2886 # payload. 2887 rlen = len(pkt) + 18 2888 2889 if hasattr(pkt, "wirelen"): 2890 wirelen = pkt.wirelen 2891 if wirelen is None: 2892 wirelen = rlen 2893 2894 self.f.write(struct.pack("<Q", t)) 2895 self.f.write(struct.pack(">BBHHHH", 2, 0, rlen, 0, wirelen, 0)) 2896 self.f.write(bytes(pkt)) 2897 self.f.flush() 2898 2899 def close(self): 2900 # type: () -> Optional[Any] 2901 return self.f.close() 2902 2903 2904@conf.commands.register 2905def import_hexcap(input_string=None): 2906 # type: (Optional[str]) -> bytes 2907 """Imports a tcpdump like hexadecimal view 2908 2909 e.g: exported via hexdump() or tcpdump or wireshark's "export as hex" 2910 2911 :param input_string: String containing the hexdump input to parse. If None, 2912 read from standard input. 2913 """ 2914 re_extract_hexcap = re.compile(r"^((0x)?[0-9a-fA-F]{2,}[ :\t]{,3}|) *(([0-9a-fA-F]{2} {,2}){,16})") # noqa: E501 2915 p = "" 2916 try: 2917 if input_string: 2918 input_function = StringIO(input_string).readline 2919 else: 2920 input_function = input 2921 while True: 2922 line = input_function().strip() 2923 if not line: 2924 break 2925 try: 2926 p += re_extract_hexcap.match(line).groups()[2] # type: ignore 2927 except Exception: 2928 warning("Parsing error during hexcap") 2929 continue 2930 except EOFError: 2931 pass 2932 2933 p = p.replace(" ", "") 2934 return hex_bytes(p) 2935 2936 2937@conf.commands.register 2938def wireshark(pktlist, wait=False, **kwargs): 2939 # type: (List[Packet], bool, **Any) -> Optional[Any] 2940 """ 2941 Runs Wireshark on a list of packets. 2942 2943 See :func:`tcpdump` for more parameter description. 2944 2945 Note: this defaults to wait=False, to run Wireshark in the background. 2946 """ 2947 return tcpdump(pktlist, prog=conf.prog.wireshark, wait=wait, **kwargs) 2948 2949 2950@conf.commands.register 2951def tdecode( 2952 pktlist, # type: Union[IO[bytes], None, str, _PacketIterable] 2953 args=None, # type: Optional[List[str]] 2954 **kwargs # type: Any 2955): 2956 # type: (...) -> Any 2957 """ 2958 Run tshark on a list of packets. 2959 2960 :param args: If not specified, defaults to ``tshark -V``. 2961 2962 See :func:`tcpdump` for more parameters. 2963 """ 2964 if args is None: 2965 args = ["-V"] 2966 return tcpdump(pktlist, prog=conf.prog.tshark, args=args, **kwargs) 2967 2968 2969def _guess_linktype_name(value): 2970 # type: (int) -> str 2971 """Guess the DLT name from its value.""" 2972 from scapy.libs.winpcapy import pcap_datalink_val_to_name 2973 return cast(bytes, pcap_datalink_val_to_name(value)).decode() 2974 2975 2976def _guess_linktype_value(name): 2977 # type: (str) -> int 2978 """Guess the value of a DLT name.""" 2979 from scapy.libs.winpcapy import pcap_datalink_name_to_val 2980 val = cast(int, pcap_datalink_name_to_val(name.encode())) 2981 if val == -1: 2982 warning("Unknown linktype: %s. Using EN10MB", name) 2983 return DLT_EN10MB 2984 return val 2985 2986 2987@conf.commands.register 2988def tcpdump( 2989 pktlist=None, # type: Union[IO[bytes], None, str, _PacketIterable] 2990 dump=False, # type: bool 2991 getfd=False, # type: bool 2992 args=None, # type: Optional[List[str]] 2993 flt=None, # type: Optional[str] 2994 prog=None, # type: Optional[Any] 2995 getproc=False, # type: bool 2996 quiet=False, # type: bool 2997 use_tempfile=None, # type: Optional[Any] 2998 read_stdin_opts=None, # type: Optional[Any] 2999 linktype=None, # type: Optional[Any] 3000 wait=True, # type: bool 3001 _suppress=False # type: bool 3002): 3003 # type: (...) -> Any 3004 """Run tcpdump or tshark on a list of packets. 3005 3006 When using ``tcpdump`` on OSX (``prog == conf.prog.tcpdump``), this uses a 3007 temporary file to store the packets. This works around a bug in Apple's 3008 version of ``tcpdump``: http://apple.stackexchange.com/questions/152682/ 3009 3010 Otherwise, the packets are passed in stdin. 3011 3012 This function can be explicitly enabled or disabled with the 3013 ``use_tempfile`` parameter. 3014 3015 When using ``wireshark``, it will be called with ``-ki -`` to start 3016 immediately capturing packets from stdin. 3017 3018 Otherwise, the command will be run with ``-r -`` (which is correct for 3019 ``tcpdump`` and ``tshark``). 3020 3021 This can be overridden with ``read_stdin_opts``. This has no effect when 3022 ``use_tempfile=True``, or otherwise reading packets from a regular file. 3023 3024 :param pktlist: a Packet instance, a PacketList instance or a list of 3025 Packet instances. Can also be a filename (as a string), an open 3026 file-like object that must be a file format readable by 3027 tshark (Pcap, PcapNg, etc.) or None (to sniff) 3028 :param flt: a filter to use with tcpdump 3029 :param dump: when set to True, returns a string instead of displaying it. 3030 :param getfd: when set to True, returns a file-like object to read data 3031 from tcpdump or tshark from. 3032 :param getproc: when set to True, the subprocess.Popen object is returned 3033 :param args: arguments (as a list) to pass to tshark (example for tshark: 3034 args=["-T", "json"]). 3035 :param prog: program to use (defaults to tcpdump, will work with tshark) 3036 :param quiet: when set to True, the process stderr is discarded 3037 :param use_tempfile: When set to True, always use a temporary file to store 3038 packets. 3039 When set to False, pipe packets through stdin. 3040 When set to None (default), only use a temporary file with 3041 ``tcpdump`` on OSX. 3042 :param read_stdin_opts: When set, a list of arguments needed to capture 3043 from stdin. Otherwise, attempts to guess. 3044 :param linktype: A custom DLT value or name, to overwrite the default 3045 values. 3046 :param wait: If True (default), waits for the process to terminate before 3047 returning to Scapy. If False, the process will be detached to the 3048 background. If dump, getproc or getfd is True, these have the same 3049 effect as ``wait=False``. 3050 3051 Examples:: 3052 3053 >>> tcpdump([IP()/TCP(), IP()/UDP()]) 3054 reading from file -, link-type RAW (Raw IP) 3055 16:46:00.474515 IP 127.0.0.1.20 > 127.0.0.1.80: Flags [S], seq 0, win 8192, length 0 # noqa: E501 3056 16:46:00.475019 IP 127.0.0.1.53 > 127.0.0.1.53: [|domain] 3057 3058 >>> tcpdump([IP()/TCP(), IP()/UDP()], prog=conf.prog.tshark) 3059 1 0.000000 127.0.0.1 -> 127.0.0.1 TCP 40 20->80 [SYN] Seq=0 Win=8192 Len=0 # noqa: E501 3060 2 0.000459 127.0.0.1 -> 127.0.0.1 UDP 28 53->53 Len=0 3061 3062 To get a JSON representation of a tshark-parsed PacketList(), one can:: 3063 3064 >>> import json, pprint 3065 >>> json_data = json.load(tcpdump(IP(src="217.25.178.5", 3066 ... dst="45.33.32.156"), 3067 ... prog=conf.prog.tshark, 3068 ... args=["-T", "json"], 3069 ... getfd=True)) 3070 >>> pprint.pprint(json_data) 3071 [{u'_index': u'packets-2016-12-23', 3072 u'_score': None, 3073 u'_source': {u'layers': {u'frame': {u'frame.cap_len': u'20', 3074 u'frame.encap_type': u'7', 3075 [...] 3076 }, 3077 u'ip': {u'ip.addr': u'45.33.32.156', 3078 u'ip.checksum': u'0x0000a20d', 3079 [...] 3080 u'ip.ttl': u'64', 3081 u'ip.version': u'4'}, 3082 u'raw': u'Raw packet data'}}, 3083 u'_type': u'pcap_file'}] 3084 >>> json_data[0]['_source']['layers']['ip']['ip.ttl'] 3085 u'64' 3086 """ 3087 getfd = getfd or getproc 3088 if prog is None: 3089 if not conf.prog.tcpdump: 3090 raise Scapy_Exception( 3091 "tcpdump is not available" 3092 ) 3093 prog = [conf.prog.tcpdump] 3094 elif isinstance(prog, str): 3095 prog = [prog] 3096 else: 3097 raise ValueError("prog must be a string") 3098 3099 if linktype is not None: 3100 if isinstance(linktype, int): 3101 # Guess name from value 3102 try: 3103 linktype_name = _guess_linktype_name(linktype) 3104 except StopIteration: 3105 linktype = -1 3106 else: 3107 # Guess value from name 3108 if linktype.startswith("DLT_"): 3109 linktype = linktype[4:] 3110 linktype_name = linktype 3111 try: 3112 linktype = _guess_linktype_value(linktype) 3113 except KeyError: 3114 linktype = -1 3115 if linktype == -1: 3116 raise ValueError( 3117 "Unknown linktype. Try passing its datalink name instead" 3118 ) 3119 prog += ["-y", linktype_name] 3120 3121 # Build Popen arguments 3122 if args is None: 3123 args = [] 3124 else: 3125 # Make a copy of args 3126 args = list(args) 3127 3128 if flt is not None: 3129 # Check the validity of the filter 3130 if linktype is None and isinstance(pktlist, str): 3131 # linktype is unknown but required. Read it from file 3132 with PcapReader(pktlist) as rd: 3133 if isinstance(rd, PcapNgReader): 3134 # Get the linktype from the first packet 3135 try: 3136 _, metadata = rd._read_packet() 3137 linktype = metadata.linktype 3138 if OPENBSD and linktype == 228: 3139 linktype = DLT_RAW 3140 except EOFError: 3141 raise ValueError( 3142 "Cannot get linktype from a PcapNg packet." 3143 ) 3144 else: 3145 linktype = rd.linktype 3146 from scapy.arch.common import compile_filter 3147 compile_filter(flt, linktype=linktype) 3148 args.append(flt) 3149 3150 stdout = subprocess.PIPE if dump or getfd else None 3151 stderr = open(os.devnull) if quiet else None 3152 proc = None 3153 3154 if use_tempfile is None: 3155 # Apple's tcpdump cannot read from stdin, see: 3156 # http://apple.stackexchange.com/questions/152682/ 3157 use_tempfile = DARWIN and prog[0] == conf.prog.tcpdump 3158 3159 if read_stdin_opts is None: 3160 if prog[0] == conf.prog.wireshark: 3161 # Start capturing immediately (-k) from stdin (-i -) 3162 read_stdin_opts = ["-ki", "-"] 3163 elif prog[0] == conf.prog.tcpdump and not OPENBSD: 3164 # Capture in packet-buffered mode (-U) from stdin (-r -) 3165 read_stdin_opts = ["-U", "-r", "-"] 3166 else: 3167 read_stdin_opts = ["-r", "-"] 3168 else: 3169 # Make a copy of read_stdin_opts 3170 read_stdin_opts = list(read_stdin_opts) 3171 3172 if pktlist is None: 3173 # sniff 3174 with ContextManagerSubprocess(prog[0], suppress=_suppress): 3175 proc = subprocess.Popen( 3176 prog + args, 3177 stdout=stdout, 3178 stderr=stderr, 3179 ) 3180 elif isinstance(pktlist, str): 3181 # file 3182 with ContextManagerSubprocess(prog[0], suppress=_suppress): 3183 proc = subprocess.Popen( 3184 prog + ["-r", pktlist] + args, 3185 stdout=stdout, 3186 stderr=stderr, 3187 ) 3188 elif use_tempfile: 3189 tmpfile = get_temp_file( # type: ignore 3190 autoext=".pcap", 3191 fd=True 3192 ) # type: IO[bytes] 3193 try: 3194 tmpfile.writelines( 3195 iter(lambda: pktlist.read(1048576), b"") # type: ignore 3196 ) 3197 except AttributeError: 3198 pktlist = cast("_PacketIterable", pktlist) 3199 wrpcap(tmpfile, pktlist, linktype=linktype) 3200 else: 3201 tmpfile.close() 3202 with ContextManagerSubprocess(prog[0], suppress=_suppress): 3203 proc = subprocess.Popen( 3204 prog + ["-r", tmpfile.name] + args, 3205 stdout=stdout, 3206 stderr=stderr, 3207 ) 3208 else: 3209 try: 3210 pktlist.fileno() # type: ignore 3211 # pass the packet stream 3212 with ContextManagerSubprocess(prog[0], suppress=_suppress): 3213 proc = subprocess.Popen( 3214 prog + read_stdin_opts + args, 3215 stdin=pktlist, # type: ignore 3216 stdout=stdout, 3217 stderr=stderr, 3218 ) 3219 except (AttributeError, ValueError): 3220 # write the packet stream to stdin 3221 with ContextManagerSubprocess(prog[0], suppress=_suppress): 3222 proc = subprocess.Popen( 3223 prog + read_stdin_opts + args, 3224 stdin=subprocess.PIPE, 3225 stdout=stdout, 3226 stderr=stderr, 3227 ) 3228 if proc is None: 3229 # An error has occurred 3230 return 3231 try: 3232 proc.stdin.writelines( # type: ignore 3233 iter(lambda: pktlist.read(1048576), b"") # type: ignore 3234 ) 3235 except AttributeError: 3236 wrpcap(proc.stdin, pktlist, linktype=linktype) # type: ignore 3237 except UnboundLocalError: 3238 # The error was handled by ContextManagerSubprocess 3239 pass 3240 else: 3241 proc.stdin.close() # type: ignore 3242 if proc is None: 3243 # An error has occurred 3244 return 3245 if dump: 3246 data = b"".join( 3247 iter(lambda: proc.stdout.read(1048576), b"") # type: ignore 3248 ) 3249 proc.terminate() 3250 return data 3251 if getproc: 3252 return proc 3253 if getfd: 3254 return proc.stdout 3255 if wait: 3256 proc.wait() 3257 3258 3259@conf.commands.register 3260def hexedit(pktlist): 3261 # type: (_PacketIterable) -> PacketList 3262 """Run hexedit on a list of packets, then return the edited packets.""" 3263 f = get_temp_file() 3264 wrpcap(f, pktlist) 3265 with ContextManagerSubprocess(conf.prog.hexedit): 3266 subprocess.call([conf.prog.hexedit, f]) 3267 rpktlist = rdpcap(f) 3268 os.unlink(f) 3269 return rpktlist 3270 3271 3272def get_terminal_width(): 3273 # type: () -> Optional[int] 3274 """Get terminal width (number of characters) if in a window. 3275 3276 Notice: this will try several methods in order to 3277 support as many terminals and OS as possible. 3278 """ 3279 sizex = shutil.get_terminal_size(fallback=(0, 0))[0] 3280 if sizex != 0: 3281 return sizex 3282 # Backups 3283 if WINDOWS: 3284 from ctypes import windll, create_string_buffer 3285 # http://code.activestate.com/recipes/440694-determine-size-of-console-window-on-windows/ 3286 h = windll.kernel32.GetStdHandle(-12) 3287 csbi = create_string_buffer(22) 3288 res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi) 3289 if res: 3290 (bufx, bufy, curx, cury, wattr, 3291 left, top, right, bottom, maxx, maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw) # noqa: E501 3292 sizex = right - left + 1 3293 # sizey = bottom - top + 1 3294 return sizex 3295 return sizex 3296 # We have various methods 3297 # COLUMNS is set on some terminals 3298 try: 3299 sizex = int(os.environ['COLUMNS']) 3300 except Exception: 3301 pass 3302 if sizex: 3303 return sizex 3304 # We can query TIOCGWINSZ 3305 try: 3306 import fcntl 3307 import termios 3308 s = struct.pack('HHHH', 0, 0, 0, 0) 3309 x = fcntl.ioctl(1, termios.TIOCGWINSZ, s) 3310 sizex = struct.unpack('HHHH', x)[1] 3311 except (IOError, ModuleNotFoundError): 3312 # If everything failed, return default terminal size 3313 sizex = 79 3314 return sizex 3315 3316 3317def pretty_list(rtlst, # type: List[Tuple[Union[str, List[str]], ...]] 3318 header, # type: List[Tuple[str, ...]] 3319 sortBy=0, # type: Optional[int] 3320 borders=False, # type: bool 3321 ): 3322 # type: (...) -> str 3323 """ 3324 Pretty list to fit the terminal, and add header. 3325 3326 :param rtlst: a list of tuples. each tuple contains a value which can 3327 be either a string or a list of string. 3328 :param sortBy: the column id (starting with 0) which will be used for 3329 ordering 3330 :param borders: whether to put borders on the table or not 3331 """ 3332 if borders: 3333 _space = "|" 3334 else: 3335 _space = " " 3336 cols = len(header[0]) 3337 # Windows has a fat terminal border 3338 _spacelen = len(_space) * (cols - 1) + int(WINDOWS) 3339 _croped = False 3340 if sortBy is not None: 3341 # Sort correctly 3342 rtlst.sort(key=lambda x: x[sortBy]) 3343 # Resolve multi-values 3344 for i, line in enumerate(rtlst): 3345 ids = [] # type: List[int] 3346 values = [] # type: List[Union[str, List[str]]] 3347 for j, val in enumerate(line): 3348 if isinstance(val, list): 3349 ids.append(j) 3350 values.append(val or " ") 3351 if values: 3352 del rtlst[i] 3353 k = 0 3354 for ex_vals in zip_longest(*values, fillvalue=" "): 3355 if k: 3356 extra_line = [" "] * cols 3357 else: 3358 extra_line = list(line) # type: ignore 3359 for j, h in enumerate(ids): 3360 extra_line[h] = ex_vals[j] 3361 rtlst.insert(i + k, tuple(extra_line)) 3362 k += 1 3363 rtslst = cast(List[Tuple[str, ...]], rtlst) 3364 # Append tag 3365 rtslst = header + rtslst 3366 # Detect column's width 3367 colwidth = [max(len(y) for y in x) for x in zip(*rtslst)] 3368 # Make text fit in box (if required) 3369 width = get_terminal_width() 3370 if conf.auto_crop_tables and width: 3371 width = width - _spacelen 3372 while sum(colwidth) > width: 3373 _croped = True 3374 # Needs to be cropped 3375 # Get the longest row 3376 i = colwidth.index(max(colwidth)) 3377 # Get all elements of this row 3378 row = [len(x[i]) for x in rtslst] 3379 # Get biggest element of this row: biggest of the array 3380 j = row.index(max(row)) 3381 # Re-build column tuple with the edited element 3382 t = list(rtslst[j]) 3383 t[i] = t[i][:-2] + "_" 3384 rtslst[j] = tuple(t) 3385 # Update max size 3386 row[j] = len(t[i]) 3387 colwidth[i] = max(row) 3388 if _croped: 3389 log_runtime.info("Table cropped to fit the terminal (conf.auto_crop_tables==True)") # noqa: E501 3390 # Generate padding scheme 3391 fmt = _space.join(["%%-%ds" % x for x in colwidth]) 3392 # Append separation line if needed 3393 if borders: 3394 rtslst.insert(1, tuple("-" * x for x in colwidth)) 3395 # Compile 3396 return "\n".join(fmt % x for x in rtslst) 3397 3398 3399def human_size(x, fmt=".1f"): 3400 # type: (int, str) -> str 3401 """ 3402 Convert a size in octets to a human string representation 3403 """ 3404 units = ['K', 'M', 'G', 'T', 'P', 'E'] 3405 if not x: 3406 return "0B" 3407 i = int(math.log(x, 2**10)) 3408 if i and i < len(units): 3409 return format(x / 2**(10 * i), fmt) + units[i - 1] 3410 return str(x) + "B" 3411 3412 3413def __make_table( 3414 yfmtfunc, # type: Callable[[int], str] 3415 fmtfunc, # type: Callable[[int], str] 3416 endline, # type: str 3417 data, # type: List[Tuple[Packet, Packet]] 3418 fxyz, # type: Callable[[Packet, Packet], Tuple[Any, Any, Any]] 3419 sortx=None, # type: Optional[Callable[[str], Tuple[Any, ...]]] 3420 sorty=None, # type: Optional[Callable[[str], Tuple[Any, ...]]] 3421 seplinefunc=None, # type: Optional[Callable[[int, List[int]], str]] 3422 dump=False # type: bool 3423): 3424 # type: (...) -> Optional[str] 3425 """Core function of the make_table suite, which generates the table""" 3426 vx = {} # type: Dict[str, int] 3427 vy = {} # type: Dict[str, Optional[int]] 3428 vz = {} # type: Dict[Tuple[str, str], str] 3429 vxf = {} # type: Dict[str, str] 3430 3431 tmp_len = 0 3432 for e in data: 3433 xx, yy, zz = [str(s) for s in fxyz(*e)] 3434 tmp_len = max(len(yy), tmp_len) 3435 vx[xx] = max(vx.get(xx, 0), len(xx), len(zz)) 3436 vy[yy] = None 3437 vz[(xx, yy)] = zz 3438 3439 vxk = list(vx) 3440 vyk = list(vy) 3441 if sortx: 3442 vxk.sort(key=sortx) 3443 else: 3444 try: 3445 vxk.sort(key=int) 3446 except Exception: 3447 try: 3448 vxk.sort(key=atol) 3449 except Exception: 3450 vxk.sort() 3451 if sorty: 3452 vyk.sort(key=sorty) 3453 else: 3454 try: 3455 vyk.sort(key=int) 3456 except Exception: 3457 try: 3458 vyk.sort(key=atol) 3459 except Exception: 3460 vyk.sort() 3461 3462 s = "" 3463 if seplinefunc: 3464 sepline = seplinefunc(tmp_len, [vx[x] for x in vxk]) 3465 s += sepline + "\n" 3466 3467 fmt = yfmtfunc(tmp_len) 3468 s += fmt % "" 3469 s += ' ' 3470 for x in vxk: 3471 vxf[x] = fmtfunc(vx[x]) 3472 s += vxf[x] % x 3473 s += ' ' 3474 s += endline + "\n" 3475 if seplinefunc: 3476 s += sepline + "\n" 3477 for y in vyk: 3478 s += fmt % y 3479 s += ' ' 3480 for x in vxk: 3481 s += vxf[x] % vz.get((x, y), "-") 3482 s += ' ' 3483 s += endline + "\n" 3484 if seplinefunc: 3485 s += sepline + "\n" 3486 3487 if dump: 3488 return s 3489 else: 3490 print(s, end="") 3491 return None 3492 3493 3494def make_table(*args, **kargs): 3495 # type: (*Any, **Any) -> Optional[Any] 3496 return __make_table( 3497 lambda l: "%%-%is" % l, 3498 lambda l: "%%-%is" % l, 3499 "", 3500 *args, 3501 **kargs 3502 ) 3503 3504 3505def make_lined_table(*args, **kargs): 3506 # type: (*Any, **Any) -> Optional[str] 3507 return __make_table( # type: ignore 3508 lambda l: "%%-%is |" % l, 3509 lambda l: "%%-%is |" % l, 3510 "", 3511 *args, 3512 seplinefunc=lambda a, x: "+".join( 3513 '-' * (y + 2) for y in [a - 1] + x + [-2] 3514 ), 3515 **kargs 3516 ) 3517 3518 3519def make_tex_table(*args, **kargs): 3520 # type: (*Any, **Any) -> Optional[str] 3521 return __make_table( # type: ignore 3522 lambda l: "%s", 3523 lambda l: "& %s", 3524 "\\\\", 3525 *args, 3526 seplinefunc=lambda a, x: "\\hline", 3527 **kargs 3528 ) 3529 3530#################### 3531# WHOIS CLIENT # 3532#################### 3533 3534 3535def whois(ip_address): 3536 # type: (str) -> bytes 3537 """Whois client for Python""" 3538 whois_ip = str(ip_address) 3539 try: 3540 query = socket.gethostbyname(whois_ip) 3541 except Exception: 3542 query = whois_ip 3543 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 3544 s.connect(("whois.ripe.net", 43)) 3545 s.send(query.encode("utf8") + b"\r\n") 3546 answer = b"" 3547 while True: 3548 d = s.recv(4096) 3549 answer += d 3550 if not d: 3551 break 3552 s.close() 3553 ignore_tag = b"remarks:" 3554 # ignore all lines starting with the ignore_tag 3555 lines = [line for line in answer.split(b"\n") if not line or (line and not line.startswith(ignore_tag))] # noqa: E501 3556 # remove empty lines at the bottom 3557 for i in range(1, len(lines)): 3558 if not lines[-i].strip(): 3559 del lines[-i] 3560 else: 3561 break 3562 return b"\n".join(lines[3:]) 3563 3564#################### 3565# CLI utils # 3566#################### 3567 3568 3569class CLIUtil: 3570 """ 3571 Provides a Util class to easily create simple CLI tools in Scapy, 3572 that can still be used as an API. 3573 3574 Doc: 3575 - override the ps1() function 3576 - register commands with the @CLIUtil.addcomment decorator 3577 - call the loop() function when ready 3578 """ 3579 3580 def _depcheck(self) -> None: 3581 """ 3582 Check that all dependencies are installed 3583 """ 3584 try: 3585 import prompt_toolkit # noqa: F401 3586 except ImportError: 3587 # okay we lie but prompt_toolkit is a dependency... 3588 raise ImportError("You need to have IPython installed to use the CLI") 3589 3590 # Okay let's do nice code 3591 commands: Dict[str, Callable[..., Any]] = {} 3592 # print output of command 3593 commands_output: Dict[str, Callable[..., str]] = {} 3594 # provides completion to command 3595 commands_complete: Dict[str, Callable[..., List[str]]] = {} 3596 3597 @staticmethod 3598 def _inspectkwargs(func: DecoratorCallable) -> None: 3599 """ 3600 Internal function to parse arguments from the kwargs of the functions 3601 """ 3602 func._flagnames = [ # type: ignore 3603 x.name for x in 3604 inspect.signature(func).parameters.values() 3605 if x.kind == inspect.Parameter.KEYWORD_ONLY 3606 ] 3607 func._flags = [ # type: ignore 3608 ("-%s" % x) if len(x) == 1 else ("--%s" % x) 3609 for x in func._flagnames # type: ignore 3610 ] 3611 3612 @staticmethod 3613 def _parsekwargs( 3614 func: DecoratorCallable, 3615 args: List[str] 3616 ) -> Tuple[List[str], Dict[str, Literal[True]]]: 3617 """ 3618 Internal function to parse CLI arguments of a function. 3619 """ 3620 kwargs: Dict[str, Literal[True]] = {} 3621 if func._flags: # type: ignore 3622 i = 0 3623 for arg in args: 3624 if arg in func._flags: # type: ignore 3625 i += 1 3626 kwargs[func._flagnames[func._flags.index(arg)]] = True # type: ignore # noqa: E501 3627 continue 3628 break 3629 args = args[i:] 3630 return args, kwargs 3631 3632 @classmethod 3633 def _parseallargs( 3634 cls, 3635 func: DecoratorCallable, 3636 cmd: str, args: List[str] 3637 ) -> Tuple[List[str], Dict[str, Literal[True]], Dict[str, Literal[True]]]: 3638 """ 3639 Internal function to parse CLI arguments of both the function 3640 and its output function. 3641 """ 3642 args, kwargs = cls._parsekwargs(func, args) 3643 outkwargs: Dict[str, Literal[True]] = {} 3644 if cmd in cls.commands_output: 3645 args, outkwargs = cls._parsekwargs(cls.commands_output[cmd], args) 3646 return args, kwargs, outkwargs 3647 3648 @classmethod 3649 def addcommand( 3650 cls, 3651 spaces: bool = False, 3652 globsupport: bool = False, 3653 ) -> Callable[[DecoratorCallable], DecoratorCallable]: 3654 """ 3655 Decorator to register a command 3656 """ 3657 def func(cmd: DecoratorCallable) -> DecoratorCallable: 3658 cls.commands[cmd.__name__] = cmd 3659 cmd._spaces = spaces # type: ignore 3660 cmd._globsupport = globsupport # type: ignore 3661 cls._inspectkwargs(cmd) 3662 if cmd._globsupport and not cmd._spaces: # type: ignore 3663 raise ValueError("Cannot use globsupport without spaces.") 3664 return cmd 3665 return func 3666 3667 @classmethod 3668 def addoutput(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]: # noqa: E501 3669 """ 3670 Decorator to register a command output processor 3671 """ 3672 def func(processor: DecoratorCallable) -> DecoratorCallable: 3673 cls.commands_output[cmd.__name__] = processor 3674 cls._inspectkwargs(processor) 3675 return processor 3676 return func 3677 3678 @classmethod 3679 def addcomplete(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]: # noqa: E501 3680 """ 3681 Decorator to register a command completor 3682 """ 3683 def func(processor: DecoratorCallable) -> DecoratorCallable: 3684 cls.commands_complete[cmd.__name__] = processor 3685 return processor 3686 return func 3687 3688 def ps1(self) -> str: 3689 """ 3690 Return the PS1 of the shell 3691 """ 3692 return "> " 3693 3694 def close(self) -> None: 3695 """ 3696 Function called on exiting 3697 """ 3698 print("Exited") 3699 3700 def help(self, cmd: Optional[str] = None) -> None: 3701 """ 3702 Return the help related to this CLI util 3703 """ 3704 def _args(func: Any) -> str: 3705 flags = func._flags.copy() 3706 if func.__name__ in self.commands_output: 3707 flags += self.commands_output[func.__name__]._flags # type: ignore 3708 return " %s%s" % ( 3709 ( 3710 "%s " % " ".join("[%s]" % x for x in flags) 3711 if flags else "" 3712 ), 3713 " ".join( 3714 "<%s%s>" % ( 3715 x.name, 3716 "?" if 3717 (x.default is None or x.default != inspect.Parameter.empty) 3718 else "" 3719 ) 3720 for x in list(inspect.signature(func).parameters.values())[1:] 3721 if x.name not in func._flagnames and x.name[0] != "_" 3722 ) 3723 ) 3724 3725 if cmd: 3726 if cmd not in self.commands: 3727 print("Unknown command '%s'" % cmd) 3728 return 3729 # help for one command 3730 func = self.commands[cmd] 3731 print("%s%s: %s" % ( 3732 cmd, 3733 _args(func), 3734 func.__doc__ and func.__doc__.strip() 3735 )) 3736 else: 3737 header = "│ %s - Help │" % self.__class__.__name__ 3738 print("┌" + "─" * (len(header) - 2) + "┐") 3739 print(header) 3740 print("└" + "─" * (len(header) - 2) + "┘") 3741 print( 3742 pretty_list( 3743 [ 3744 ( 3745 cmd, 3746 _args(func), 3747 func.__doc__ and func.__doc__.strip().split("\n")[0] or "" 3748 ) 3749 for cmd, func in self.commands.items() 3750 ], 3751 [("Command", "Arguments", "Description")] 3752 ) 3753 ) 3754 3755 def _completer(self) -> 'prompt_toolkit.completion.Completer': 3756 """ 3757 Returns a prompt_toolkit custom completer 3758 """ 3759 from prompt_toolkit.completion import Completer, Completion 3760 3761 class CLICompleter(Completer): 3762 def get_completions(cmpl, document, complete_event): # type: ignore 3763 if not complete_event.completion_requested: 3764 # Only activate when the user does <TAB> 3765 return 3766 parts = document.text.split(" ") 3767 cmd = parts[0].lower() 3768 if cmd not in self.commands: 3769 # We are trying to complete the command 3770 for possible_cmd in (x for x in self.commands if x.startswith(cmd)): 3771 yield Completion(possible_cmd, start_position=-len(cmd)) 3772 else: 3773 # We are trying to complete the command content 3774 if len(parts) == 1: 3775 return 3776 args, _, _ = self._parseallargs(self.commands[cmd], cmd, parts[1:]) 3777 arg = " ".join(args) 3778 if cmd in self.commands_complete: 3779 for possible_arg in self.commands_complete[cmd](self, arg): 3780 yield Completion(possible_arg, start_position=-len(arg)) 3781 return 3782 return CLICompleter() 3783 3784 def loop(self, debug: int = 0) -> None: 3785 """ 3786 Main command handling loop 3787 """ 3788 from prompt_toolkit import PromptSession 3789 session = PromptSession(completer=self._completer()) 3790 3791 while True: 3792 try: 3793 cmd = session.prompt(self.ps1()).strip() 3794 except KeyboardInterrupt: 3795 continue 3796 except EOFError: 3797 self.close() 3798 break 3799 args = cmd.split(" ")[1:] 3800 cmd = cmd.split(" ")[0].strip().lower() 3801 if not cmd: 3802 continue 3803 if cmd in ["help", "h", "?"]: 3804 self.help(" ".join(args)) 3805 continue 3806 if cmd in "exit": 3807 break 3808 if cmd not in self.commands: 3809 print("Unknown command. Type help or ?") 3810 else: 3811 # check the number of arguments 3812 func = self.commands[cmd] 3813 args, kwargs, outkwargs = self._parseallargs(func, cmd, args) 3814 if func._spaces: # type: ignore 3815 args = [" ".join(args)] 3816 # if globsupport is set, we might need to do several calls 3817 if func._globsupport and "*" in args[0]: # type: ignore 3818 if args[0].count("*") > 1: 3819 print("More than 1 glob star (*) is currently unsupported.") 3820 continue 3821 before, after = args[0].split("*", 1) 3822 reg = re.compile(re.escape(before) + r".*" + after) 3823 calls = [ 3824 [x] for x in 3825 self.commands_complete[cmd](self, before) 3826 if reg.match(x) 3827 ] 3828 else: 3829 calls = [args] 3830 else: 3831 calls = [args] 3832 # now iterate if required, call the function and print its output 3833 res = None 3834 for args in calls: 3835 try: 3836 res = func(self, *args, **kwargs) 3837 except TypeError: 3838 print("Bad number of arguments !") 3839 self.help(cmd=cmd) 3840 continue 3841 except Exception as ex: 3842 print("Command failed with error: %s" % ex) 3843 if debug: 3844 traceback.print_exception(ex) 3845 try: 3846 if res and cmd in self.commands_output: 3847 self.commands_output[cmd](self, res, **outkwargs) 3848 except Exception as ex: 3849 print("Output processor failed with error: %s" % ex) 3850 3851 3852def AutoArgparse(func: DecoratorCallable) -> None: 3853 """ 3854 Generate an Argparse call from a function, then call this function. 3855 3856 Notes: 3857 3858 - for the arguments to have a description, the sphinx docstring format 3859 must be used. See 3860 https://sphinx-rtd-tutorial.readthedocs.io/en/latest/docstrings.html 3861 - the arguments must be typed in Python (we ignore Sphinx-specific types) 3862 untyped arguments are ignored. 3863 - only types that would be supported by argparse are supported. The others 3864 are omitted. 3865 """ 3866 argsdoc = {} 3867 if func.__doc__: 3868 # Sphinx doc format parser 3869 m = re.match( 3870 r"((?:.|\n)*?)(\n\s*:(?:param|type|raises|return|rtype)(?:.|\n)*)", 3871 func.__doc__.strip(), 3872 ) 3873 if not m: 3874 desc = func.__doc__.strip() 3875 else: 3876 desc = m.group(1) 3877 sphinxargs = re.findall( 3878 r"\s*:(param|type|raises|return|rtype)\s*([^:]*):(.*)", 3879 m.group(2), 3880 ) 3881 for argtype, argparam, argdesc in sphinxargs: 3882 argparam = argparam.strip() 3883 argdesc = argdesc.strip() 3884 if argtype == "param": 3885 if not argparam: 3886 raise ValueError(":param: without a name !") 3887 argsdoc[argparam] = argdesc 3888 else: 3889 desc = "" 3890 # Now build the argparse.ArgumentParser 3891 parser = argparse.ArgumentParser( 3892 prog=func.__name__, 3893 description=desc, 3894 formatter_class=argparse.ArgumentDefaultsHelpFormatter, 3895 ) 3896 # Process the parameters 3897 positional = [] 3898 for param in inspect.signature(func).parameters.values(): 3899 if not param.annotation: 3900 continue 3901 parname = param.name 3902 paramkwargs = {} 3903 if param.annotation is bool: 3904 if param.default is True: 3905 parname = "no-" + parname 3906 paramkwargs["action"] = "store_false" 3907 else: 3908 paramkwargs["action"] = "store_true" 3909 elif param.annotation in [str, int, float]: 3910 paramkwargs["type"] = param.annotation 3911 else: 3912 continue 3913 if param.default != inspect.Parameter.empty: 3914 if param.kind == inspect.Parameter.POSITIONAL_ONLY: 3915 positional.append(param.name) 3916 paramkwargs["nargs"] = '?' 3917 else: 3918 parname = "--" + parname 3919 paramkwargs["default"] = param.default 3920 else: 3921 positional.append(param.name) 3922 if param.kind == inspect.Parameter.VAR_POSITIONAL: 3923 paramkwargs["action"] = "append" 3924 if param.name in argsdoc: 3925 paramkwargs["help"] = argsdoc[param.name] 3926 parser.add_argument(parname, **paramkwargs) # type: ignore 3927 # Now parse the sys.argv parameters 3928 params = vars(parser.parse_args()) 3929 # Act as in interactive mode 3930 conf.logLevel = 20 3931 from scapy.themes import DefaultTheme 3932 conf.color_theme = DefaultTheme() 3933 # And call the function 3934 try: 3935 func( 3936 *[params.pop(x) for x in positional], 3937 **{ 3938 (k[3:] if k.startswith("no_") else k): v 3939 for k, v in params.items() 3940 } 3941 ) 3942 except AssertionError as ex: 3943 print("ERROR: " + str(ex)) 3944 parser.print_help() 3945 3946 3947####################### 3948# PERIODIC SENDER # 3949####################### 3950 3951 3952class PeriodicSenderThread(threading.Thread): 3953 def __init__(self, sock, pkt, interval=0.5, ignore_exceptions=True): 3954 # type: (Any, _PacketIterable, float, bool) -> None 3955 """ Thread to send packets periodically 3956 3957 Args: 3958 sock: socket where packet is sent periodically 3959 pkt: packet or list of packets to send 3960 interval: interval between two packets 3961 """ 3962 if not isinstance(pkt, list): 3963 self._pkts = [cast("Packet", pkt)] # type: _PacketIterable 3964 else: 3965 self._pkts = pkt 3966 self._socket = sock 3967 self._stopped = threading.Event() 3968 self._enabled = threading.Event() 3969 self._enabled.set() 3970 self._interval = interval 3971 self._ignore_exceptions = ignore_exceptions 3972 threading.Thread.__init__(self) 3973 3974 def enable(self): 3975 # type: () -> None 3976 self._enabled.set() 3977 3978 def disable(self): 3979 # type: () -> None 3980 self._enabled.clear() 3981 3982 def run(self): 3983 # type: () -> None 3984 while not self._stopped.is_set() and not self._socket.closed: 3985 for p in self._pkts: 3986 try: 3987 if self._enabled.is_set(): 3988 self._socket.send(p) 3989 except (OSError, TimeoutError) as e: 3990 if self._ignore_exceptions: 3991 return 3992 else: 3993 raise e 3994 self._stopped.wait(timeout=self._interval) 3995 if self._stopped.is_set() or self._socket.closed: 3996 break 3997 3998 def stop(self): 3999 # type: () -> None 4000 self._stopped.set() 4001 self.join(self._interval * 2) 4002 4003 4004class SingleConversationSocket(object): 4005 def __init__(self, o): 4006 # type: (Any) -> None 4007 self._inner = o 4008 self._tx_mutex = threading.RLock() 4009 4010 @property 4011 def __dict__(self): # type: ignore 4012 return self._inner.__dict__ 4013 4014 def __getattr__(self, name): 4015 # type: (str) -> Any 4016 return getattr(self._inner, name) 4017 4018 def sr1(self, *args, **kargs): 4019 # type: (*Any, **Any) -> Any 4020 with self._tx_mutex: 4021 return self._inner.sr1(*args, **kargs) 4022 4023 def sr(self, *args, **kargs): 4024 # type: (*Any, **Any) -> Any 4025 with self._tx_mutex: 4026 return self._inner.sr(*args, **kargs) 4027 4028 def send(self, x): 4029 # type: (Packet) -> Any 4030 with self._tx_mutex: 4031 try: 4032 return self._inner.send(x) 4033 except (ConnectionError, OSError) as e: 4034 self._inner.close() 4035 raise e 4036