• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5
6"""
7General utility functions.
8"""
9
10
11from decimal import Decimal
12from io import StringIO
13from itertools import zip_longest
14from uuid import UUID
15
16import argparse
17import array
18import base64
19import collections
20import decimal
21import difflib
22import gzip
23import inspect
24import locale
25import math
26import os
27import pickle
28import random
29import re
30import shutil
31import socket
32import struct
33import subprocess
34import sys
35import tempfile
36import threading
37import time
38import traceback
39import warnings
40
41from scapy.config import conf
42from scapy.consts import DARWIN, OPENBSD, WINDOWS
43from scapy.data import MTU, DLT_EN10MB, DLT_RAW
44from scapy.compat import (
45    orb,
46    plain_str,
47    chb,
48    hex_bytes,
49    bytes_encode,
50)
51from scapy.error import (
52    log_interactive,
53    log_runtime,
54    Scapy_Exception,
55    warning,
56)
57from scapy.pton_ntop import inet_pton
58
59# Typing imports
60from typing import (
61    cast,
62    Any,
63    AnyStr,
64    Callable,
65    Dict,
66    IO,
67    Iterator,
68    List,
69    Optional,
70    TYPE_CHECKING,
71    Tuple,
72    Type,
73    Union,
74    overload,
75)
76from scapy.compat import (
77    DecoratorCallable,
78    Literal,
79)
80
81if TYPE_CHECKING:
82    from scapy.packet import Packet
83    from scapy.plist import _PacketIterable, PacketList
84    from scapy.supersocket import SuperSocket
85    import prompt_toolkit
86
87_ByteStream = Union[IO[bytes], gzip.GzipFile]
88
89###########
90#  Tools  #
91###########
92
93
94def issubtype(x,  # type: Any
95              t,  # type: Union[type, str]
96              ):
97    # type: (...) -> bool
98    """issubtype(C, B) -> bool
99
100    Return whether C is a class and if it is a subclass of class B.
101    When using a tuple as the second argument issubtype(X, (A, B, ...)),
102    is a shortcut for issubtype(X, A) or issubtype(X, B) or ... (etc.).
103    """
104    if isinstance(t, str):
105        return t in (z.__name__ for z in x.__bases__)
106    if isinstance(x, type) and issubclass(x, t):
107        return True
108    return False
109
110
111_Decimal = Union[Decimal, int]
112
113
114class EDecimal(Decimal):
115    """Extended Decimal
116
117    This implements arithmetic and comparison with float for
118    backward compatibility
119    """
120
121    def __add__(self, other, context=None):
122        # type: (_Decimal, Any) -> EDecimal
123        return EDecimal(Decimal.__add__(self, Decimal(other)))
124
125    def __radd__(self, other):
126        # type: (_Decimal) -> EDecimal
127        return EDecimal(Decimal.__add__(self, Decimal(other)))
128
129    def __sub__(self, other):
130        # type: (_Decimal) -> EDecimal
131        return EDecimal(Decimal.__sub__(self, Decimal(other)))
132
133    def __rsub__(self, other):
134        # type: (_Decimal) -> EDecimal
135        return EDecimal(Decimal.__rsub__(self, Decimal(other)))
136
137    def __mul__(self, other):
138        # type: (_Decimal) -> EDecimal
139        return EDecimal(Decimal.__mul__(self, Decimal(other)))
140
141    def __rmul__(self, other):
142        # type: (_Decimal) -> EDecimal
143        return EDecimal(Decimal.__mul__(self, Decimal(other)))
144
145    def __truediv__(self, other):
146        # type: (_Decimal) -> EDecimal
147        return EDecimal(Decimal.__truediv__(self, Decimal(other)))
148
149    def __floordiv__(self, other):
150        # type: (_Decimal) -> EDecimal
151        return EDecimal(Decimal.__floordiv__(self, Decimal(other)))
152
153    def __divmod__(self, other):
154        # type: (_Decimal) -> Tuple[EDecimal, EDecimal]
155        r = Decimal.__divmod__(self, Decimal(other))
156        return EDecimal(r[0]), EDecimal(r[1])
157
158    def __mod__(self, other):
159        # type: (_Decimal) -> EDecimal
160        return EDecimal(Decimal.__mod__(self, Decimal(other)))
161
162    def __rmod__(self, other):
163        # type: (_Decimal) -> EDecimal
164        return EDecimal(Decimal.__rmod__(self, Decimal(other)))
165
166    def __pow__(self, other, modulo=None):
167        # type: (_Decimal, Optional[_Decimal]) -> EDecimal
168        return EDecimal(Decimal.__pow__(self, Decimal(other), modulo))
169
170    def __eq__(self, other):
171        # type: (Any) -> bool
172        if isinstance(other, Decimal):
173            return super(EDecimal, self).__eq__(other)
174        else:
175            return bool(float(self) == other)
176
177    def normalize(self, precision):  # type: ignore
178        # type: (int) -> EDecimal
179        with decimal.localcontext() as ctx:
180            ctx.prec = precision
181            return EDecimal(super(EDecimal, self).normalize(ctx))
182
183
184@overload
185def get_temp_file(keep, autoext, fd):
186    # type: (bool, str, Literal[True]) -> IO[bytes]
187    pass
188
189
190@overload
191def get_temp_file(keep=False, autoext="", fd=False):
192    # type: (bool, str, Literal[False]) -> str
193    pass
194
195
196def get_temp_file(keep=False, autoext="", fd=False):
197    # type: (bool, str, bool) -> Union[IO[bytes], str]
198    """Creates a temporary file.
199
200    :param keep: If False, automatically delete the file when Scapy exits.
201    :param autoext: Suffix to add to the generated file name.
202    :param fd: If True, this returns a file-like object with the temporary
203               file opened. If False (default), this returns a file path.
204    """
205    f = tempfile.NamedTemporaryFile(prefix="scapy", suffix=autoext,
206                                    delete=False)
207    if not keep:
208        conf.temp_files.append(f.name)
209
210    if fd:
211        return f
212    else:
213        # Close the file so something else can take it.
214        f.close()
215        return f.name
216
217
218def get_temp_dir(keep=False):
219    # type: (bool) -> str
220    """Creates a temporary file, and returns its name.
221
222    :param keep: If False (default), the directory will be recursively
223                 deleted when Scapy exits.
224    :return: A full path to a temporary directory.
225    """
226
227    dname = tempfile.mkdtemp(prefix="scapy")
228
229    if not keep:
230        conf.temp_files.append(dname)
231
232    return dname
233
234
235def _create_fifo() -> Tuple[str, Any]:
236    """Creates a temporary fifo.
237
238    You must then use open_fifo() on the server_fd once
239    the client is connected to use it.
240
241    :returns: (client_file, server_fd)
242    """
243    if WINDOWS:
244        from scapy.arch.windows.structures import _get_win_fifo
245        return _get_win_fifo()
246    else:
247        f = get_temp_file()
248        os.unlink(f)
249        os.mkfifo(f)
250        return f, f
251
252
253def _open_fifo(fd: Any, mode: str = "rb") -> IO[bytes]:
254    """Open the server_fd (see create_fifo)
255    """
256    if WINDOWS:
257        from scapy.arch.windows.structures import _win_fifo_open
258        return _win_fifo_open(fd)
259    else:
260        return open(fd, mode)
261
262
263def sane(x, color=False):
264    # type: (AnyStr, bool) -> str
265    r = ""
266    for i in x:
267        j = orb(i)
268        if (j < 32) or (j >= 127):
269            if color:
270                r += conf.color_theme.not_printable(".")
271            else:
272                r += "."
273        else:
274            r += chr(j)
275    return r
276
277
278@conf.commands.register
279def restart():
280    # type: () -> None
281    """Restarts scapy"""
282    if not conf.interactive or not os.path.isfile(sys.argv[0]):
283        raise OSError("Scapy was not started from console")
284    if WINDOWS:
285        res_code = 1
286        try:
287            res_code = subprocess.call([sys.executable] + sys.argv)
288        finally:
289            os._exit(res_code)
290    os.execv(sys.executable, [sys.executable] + sys.argv)
291
292
293def lhex(x):
294    # type: (Any) -> str
295    from scapy.volatile import VolatileValue
296    if isinstance(x, VolatileValue):
297        return repr(x)
298    if isinstance(x, int):
299        return hex(x)
300    if isinstance(x, tuple):
301        return "(%s)" % ", ".join(lhex(v) for v in x)
302    if isinstance(x, list):
303        return "[%s]" % ", ".join(lhex(v) for v in x)
304    return str(x)
305
306
307@conf.commands.register
308def hexdump(p, dump=False):
309    # type: (Union[Packet, AnyStr], bool) -> Optional[str]
310    """Build a tcpdump like hexadecimal view
311
312    :param p: a Packet
313    :param dump: define if the result must be printed or returned in a variable
314    :return: a String only when dump=True
315    """
316    s = ""
317    x = bytes_encode(p)
318    x_len = len(x)
319    i = 0
320    while i < x_len:
321        s += "%04x  " % i
322        for j in range(16):
323            if i + j < x_len:
324                s += "%02X " % orb(x[i + j])
325            else:
326                s += "   "
327        s += " %s\n" % sane(x[i:i + 16], color=True)
328        i += 16
329    # remove trailing \n
330    s = s[:-1] if s.endswith("\n") else s
331    if dump:
332        return s
333    else:
334        print(s)
335        return None
336
337
338@conf.commands.register
339def linehexdump(p, onlyasc=0, onlyhex=0, dump=False):
340    # type: (Union[Packet, AnyStr], int, int, bool) -> Optional[str]
341    """Build an equivalent view of hexdump() on a single line
342
343    Note that setting both onlyasc and onlyhex to 1 results in a empty output
344
345    :param p: a Packet
346    :param onlyasc: 1 to display only the ascii view
347    :param onlyhex: 1 to display only the hexadecimal view
348    :param dump: print the view if False
349    :return: a String only when dump=True
350    """
351    s = ""
352    s = hexstr(p, onlyasc=onlyasc, onlyhex=onlyhex, color=not dump)
353    if dump:
354        return s
355    else:
356        print(s)
357        return None
358
359
360@conf.commands.register
361def chexdump(p, dump=False):
362    # type: (Union[Packet, AnyStr], bool) -> Optional[str]
363    """Build a per byte hexadecimal representation
364
365    Example:
366        >>> chexdump(IP())
367        0x45, 0x00, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x40, 0x00, 0x7c, 0xe7, 0x7f, 0x00, 0x00, 0x01, 0x7f, 0x00, 0x00, 0x01  # noqa: E501
368
369    :param p: a Packet
370    :param dump: print the view if False
371    :return: a String only if dump=True
372    """
373    x = bytes_encode(p)
374    s = ", ".join("%#04x" % orb(x) for x in x)
375    if dump:
376        return s
377    else:
378        print(s)
379        return None
380
381
382@conf.commands.register
383def hexstr(p, onlyasc=0, onlyhex=0, color=False):
384    # type: (Union[Packet, AnyStr], int, int, bool) -> str
385    """Build a fancy tcpdump like hex from bytes."""
386    x = bytes_encode(p)
387    s = []
388    if not onlyasc:
389        s.append(" ".join("%02X" % orb(b) for b in x))
390    if not onlyhex:
391        s.append(sane(x, color=color))
392    return "  ".join(s)
393
394
395def repr_hex(s):
396    # type: (bytes) -> str
397    """ Convert provided bitstring to a simple string of hex digits """
398    return "".join("%02x" % orb(x) for x in s)
399
400
401@conf.commands.register
402def hexdiff(
403    a: Union['Packet', AnyStr],
404    b: Union['Packet', AnyStr],
405    algo: Optional[str] = None,
406    autojunk: bool = False,
407) -> None:
408    """
409    Show differences between 2 binary strings, Packets...
410
411    Available algorithms:
412        - wagnerfischer: Use the Wagner and Fischer algorithm to compute the
413          Levenstein distance between the strings then backtrack.
414        - difflib: Use the difflib.SequenceMatcher implementation. This based on a
415          modified version of the Ratcliff and Obershelp algorithm.
416          This is much faster, but far less accurate.
417          https://docs.python.org/3.8/library/difflib.html#difflib.SequenceMatcher
418
419    :param a:
420    :param b: The binary strings, packets... to compare
421    :param algo: Force the algo to be 'wagnerfischer' or 'difflib'.
422                 By default, this is chosen depending on the complexity, optimistically
423                 preferring wagnerfischer unless really necessary.
424    :param autojunk: (difflib only) See difflib documentation.
425    """
426    xb = bytes_encode(a)
427    yb = bytes_encode(b)
428
429    if algo is None:
430        # Choose the best algorithm
431        complexity = len(xb) * len(yb)
432        if complexity < 1e7:
433            # Comparing two (non-jumbos) Ethernet packets is ~2e6 which is manageable.
434            # Anything much larger than this shouldn't be attempted by default.
435            algo = "wagnerfischer"
436            if complexity > 1e6:
437                log_interactive.info(
438                    "Complexity is a bit high. hexdiff will take a few seconds."
439                )
440        else:
441            algo = "difflib"
442
443    backtrackx = []
444    backtracky = []
445
446    if algo == "wagnerfischer":
447        xb = xb[::-1]
448        yb = yb[::-1]
449
450        # costs for the 3 operations
451        INSERT = 1
452        DELETE = 1
453        SUBST = 1
454
455        # Typically, d[i,j] will hold the distance between
456        # the first i characters of xb and the first j characters of yb.
457        # We change the Wagner Fischer to also store pointers to all
458        # the intermediate steps taken while calculating the Levenstein distance.
459        d = {(-1, -1): (0, (-1, -1))}
460        for j in range(len(yb)):
461            d[-1, j] = (j + 1) * INSERT, (-1, j - 1)
462        for i in range(len(xb)):
463            d[i, -1] = (i + 1) * INSERT + 1, (i - 1, -1)
464
465        # Compute the Levenstein distance between the two strings, but
466        # store all the steps to be able to backtrack at the end.
467        for j in range(len(yb)):
468            for i in range(len(xb)):
469                d[i, j] = min(
470                    (d[i - 1, j - 1][0] + SUBST * (xb[i] != yb[j]), (i - 1, j - 1)),
471                    (d[i - 1, j][0] + DELETE, (i - 1, j)),
472                    (d[i, j - 1][0] + INSERT, (i, j - 1)),
473                )
474
475        # Iterate through the steps backwards to create the diff
476        i = len(xb) - 1
477        j = len(yb) - 1
478        while not (i == j == -1):
479            i2, j2 = d[i, j][1]
480            backtrackx.append(xb[i2 + 1:i + 1])
481            backtracky.append(yb[j2 + 1:j + 1])
482            i, j = i2, j2
483    elif algo == "difflib":
484        sm = difflib.SequenceMatcher(a=xb, b=yb, autojunk=autojunk)
485        xarr = [xb[i:i + 1] for i in range(len(xb))]
486        yarr = [yb[i:i + 1] for i in range(len(yb))]
487        # Iterate through opcodes to build the backtrack
488        for opcode in sm.get_opcodes():
489            typ, x0, x1, y0, y1 = opcode
490            if typ == 'delete':
491                backtrackx += xarr[x0:x1]
492                backtracky += [b''] * (x1 - x0)
493            elif typ == 'insert':
494                backtrackx += [b''] * (y1 - y0)
495                backtracky += yarr[y0:y1]
496            elif typ in ['equal', 'replace']:
497                backtrackx += xarr[x0:x1]
498                backtracky += yarr[y0:y1]
499        # Some lines may have been considered as junk. Check the sizes
500        if autojunk:
501            lbx = len(backtrackx)
502            lby = len(backtracky)
503            backtrackx += [b''] * (max(lbx, lby) - lbx)
504            backtracky += [b''] * (max(lbx, lby) - lby)
505    else:
506        raise ValueError("Unknown algorithm '%s'" % algo)
507
508    # Print the diff
509
510    x = y = i = 0
511    colorize: Dict[int, Callable[[str], str]] = {
512        0: lambda x: x,
513        -1: conf.color_theme.left,
514        1: conf.color_theme.right
515    }
516
517    dox = 1
518    doy = 0
519    btx_len = len(backtrackx)
520    while i < btx_len:
521        linex = backtrackx[i:i + 16]
522        liney = backtracky[i:i + 16]
523        xx = sum(len(k) for k in linex)
524        yy = sum(len(k) for k in liney)
525        if dox and not xx:
526            dox = 0
527            doy = 1
528        if dox and linex == liney:
529            doy = 1
530
531        if dox:
532            xd = y
533            j = 0
534            while not linex[j]:
535                j += 1
536                xd -= 1
537            print(colorize[doy - dox]("%04x" % xd), end=' ')
538            x += xx
539            line = linex
540        else:
541            print("    ", end=' ')
542        if doy:
543            yd = y
544            j = 0
545            while not liney[j]:
546                j += 1
547                yd -= 1
548            print(colorize[doy - dox]("%04x" % yd), end=' ')
549            y += yy
550            line = liney
551        else:
552            print("    ", end=' ')
553
554        print(" ", end=' ')
555
556        cl = ""
557        for j in range(16):
558            if i + j < min(len(backtrackx), len(backtracky)):
559                if line[j]:
560                    col = colorize[(linex[j] != liney[j]) * (doy - dox)]
561                    print(col("%02X" % orb(line[j])), end=' ')
562                    if linex[j] == liney[j]:
563                        cl += sane(line[j], color=True)
564                    else:
565                        cl += col(sane(line[j]))
566                else:
567                    print("  ", end=' ')
568                    cl += " "
569            else:
570                print("  ", end=' ')
571            if j == 7:
572                print("", end=' ')
573
574        print(" ", cl)
575
576        if doy or not yy:
577            doy = 0
578            dox = 1
579            i += 16
580        else:
581            if yy:
582                dox = 0
583                doy = 1
584            else:
585                i += 16
586
587
588if struct.pack("H", 1) == b"\x00\x01":  # big endian
589    checksum_endian_transform = lambda chk: chk  # type: Callable[[int], int]
590else:
591    checksum_endian_transform = lambda chk: ((chk >> 8) & 0xff) | chk << 8
592
593
594def checksum(pkt):
595    # type: (bytes) -> int
596    if len(pkt) % 2 == 1:
597        pkt += b"\0"
598    s = sum(array.array("H", pkt))
599    s = (s >> 16) + (s & 0xffff)
600    s += s >> 16
601    s = ~s
602    return checksum_endian_transform(s) & 0xffff
603
604
605def _fletcher16(charbuf):
606    # type: (bytes) -> Tuple[int, int]
607    # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/>  # noqa: E501
608    c0 = c1 = 0
609    for char in charbuf:
610        c0 += char
611        c1 += c0
612
613    c0 %= 255
614    c1 %= 255
615    return (c0, c1)
616
617
618@conf.commands.register
619def fletcher16_checksum(binbuf):
620    # type: (bytes) -> int
621    """Calculates Fletcher-16 checksum of the given buffer.
622
623       Note:
624       If the buffer contains the two checkbytes derived from the Fletcher-16 checksum  # noqa: E501
625       the result of this function has to be 0. Otherwise the buffer has been corrupted.  # noqa: E501
626    """
627    (c0, c1) = _fletcher16(binbuf)
628    return (c1 << 8) | c0
629
630
631@conf.commands.register
632def fletcher16_checkbytes(binbuf, offset):
633    # type: (bytes, int) -> bytes
634    """Calculates the Fletcher-16 checkbytes returned as 2 byte binary-string.
635
636       Including the bytes into the buffer (at the position marked by offset) the  # noqa: E501
637       global Fletcher-16 checksum of the buffer will be 0. Thus it is easy to verify  # noqa: E501
638       the integrity of the buffer on the receiver side.
639
640       For details on the algorithm, see RFC 2328 chapter 12.1.7 and RFC 905 Annex B.  # noqa: E501
641    """
642
643    # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/>  # noqa: E501
644    if len(binbuf) < offset:
645        raise Exception("Packet too short for checkbytes %d" % len(binbuf))
646
647    binbuf = binbuf[:offset] + b"\x00\x00" + binbuf[offset + 2:]
648    (c0, c1) = _fletcher16(binbuf)
649
650    x = ((len(binbuf) - offset - 1) * c0 - c1) % 255
651
652    if (x <= 0):
653        x += 255
654
655    y = 510 - c0 - x
656
657    if (y > 255):
658        y -= 255
659    return chb(x) + chb(y)
660
661
662def mac2str(mac):
663    # type: (str) -> bytes
664    return b"".join(chb(int(x, 16)) for x in plain_str(mac).split(':'))
665
666
667def valid_mac(mac):
668    # type: (str) -> bool
669    try:
670        return len(mac2str(mac)) == 6
671    except ValueError:
672        pass
673    return False
674
675
676def str2mac(s):
677    # type: (bytes) -> str
678    if isinstance(s, str):
679        return ("%02x:" * len(s))[:-1] % tuple(map(ord, s))
680    return ("%02x:" * len(s))[:-1] % tuple(s)
681
682
683def randstring(length):
684    # type: (int) -> bytes
685    """
686    Returns a random string of length (length >= 0)
687    """
688    return b"".join(struct.pack('B', random.randint(0, 255))
689                    for _ in range(length))
690
691
692def zerofree_randstring(length):
693    # type: (int) -> bytes
694    """
695    Returns a random string of length (length >= 0) without zero in it.
696    """
697    return b"".join(struct.pack('B', random.randint(1, 255))
698                    for _ in range(length))
699
700
701def stror(s1, s2):
702    # type: (bytes, bytes) -> bytes
703    """
704    Returns the binary OR of the 2 provided strings s1 and s2. s1 and s2
705    must be of same length.
706    """
707    return b"".join(map(lambda x, y: struct.pack("!B", x | y), s1, s2))
708
709
710def strxor(s1, s2):
711    # type: (bytes, bytes) -> bytes
712    """
713    Returns the binary XOR of the 2 provided strings s1 and s2. s1 and s2
714    must be of same length.
715    """
716    return b"".join(map(lambda x, y: struct.pack("!B", x ^ y), s1, s2))
717
718
719def strand(s1, s2):
720    # type: (bytes, bytes) -> bytes
721    """
722    Returns the binary AND of the 2 provided strings s1 and s2. s1 and s2
723    must be of same length.
724    """
725    return b"".join(map(lambda x, y: struct.pack("!B", x & y), s1, s2))
726
727
728def strrot(s1, count, right=True):
729    # type: (bytes, int, bool) -> bytes
730    """
731    Rotate the binary by 'count' bytes
732    """
733    off = count % len(s1)
734    if right:
735        return s1[-off:] + s1[:-off]
736    else:
737        return s1[off:] + s1[:off]
738
739
740# Workaround bug 643005 : https://sourceforge.net/tracker/?func=detail&atid=105470&aid=643005&group_id=5470  # noqa: E501
741try:
742    socket.inet_aton("255.255.255.255")
743except socket.error:
744    def inet_aton(ip_string):
745        # type: (str) -> bytes
746        if ip_string == "255.255.255.255":
747            return b"\xff" * 4
748        else:
749            return socket.inet_aton(ip_string)
750else:
751    inet_aton = socket.inet_aton  # type: ignore
752
753inet_ntoa = socket.inet_ntoa
754
755
756def atol(x):
757    # type: (str) -> int
758    try:
759        ip = inet_aton(x)
760    except socket.error:
761        raise ValueError("Bad IP format: %s" % x)
762    return cast(int, struct.unpack("!I", ip)[0])
763
764
765def valid_ip(addr):
766    # type: (str) -> bool
767    try:
768        addr = plain_str(addr)
769    except UnicodeDecodeError:
770        return False
771    try:
772        atol(addr)
773    except (OSError, ValueError, socket.error):
774        return False
775    return True
776
777
778def valid_net(addr):
779    # type: (str) -> bool
780    try:
781        addr = plain_str(addr)
782    except UnicodeDecodeError:
783        return False
784    if '/' in addr:
785        ip, mask = addr.split('/', 1)
786        return valid_ip(ip) and mask.isdigit() and 0 <= int(mask) <= 32
787    return valid_ip(addr)
788
789
790def valid_ip6(addr):
791    # type: (str) -> bool
792    try:
793        addr = plain_str(addr)
794    except UnicodeDecodeError:
795        return False
796    try:
797        inet_pton(socket.AF_INET6, addr)
798    except socket.error:
799        return False
800    return True
801
802
803def valid_net6(addr):
804    # type: (str) -> bool
805    try:
806        addr = plain_str(addr)
807    except UnicodeDecodeError:
808        return False
809    if '/' in addr:
810        ip, mask = addr.split('/', 1)
811        return valid_ip6(ip) and mask.isdigit() and 0 <= int(mask) <= 128
812    return valid_ip6(addr)
813
814
815def ltoa(x):
816    # type: (int) -> str
817    return inet_ntoa(struct.pack("!I", x & 0xffffffff))
818
819
820def itom(x):
821    # type: (int) -> int
822    return (0xffffffff00000000 >> x) & 0xffffffff
823
824
825def in4_cidr2mask(m):
826    # type: (int) -> bytes
827    """
828    Return the mask (bitstring) associated with provided length
829    value. For instance if function is called on 20, return value is
830    b'\xff\xff\xf0\x00'.
831    """
832    if m > 32 or m < 0:
833        raise Scapy_Exception("value provided to in4_cidr2mask outside [0, 32] domain (%d)" % m)  # noqa: E501
834
835    return strxor(
836        b"\xff" * 4,
837        struct.pack(">I", 2**(32 - m) - 1)
838    )
839
840
841def in4_isincluded(addr, prefix, mask):
842    # type: (str, str, int) -> bool
843    """
844    Returns True when 'addr' belongs to prefix/mask. False otherwise.
845    """
846    temp = inet_pton(socket.AF_INET, addr)
847    pref = in4_cidr2mask(mask)
848    zero = inet_pton(socket.AF_INET, prefix)
849    return zero == strand(temp, pref)
850
851
852def in4_ismaddr(str):
853    # type: (str) -> bool
854    """
855    Returns True if provided address in printable format belongs to
856    allocated Multicast address space (224.0.0.0/4).
857    """
858    return in4_isincluded(str, "224.0.0.0", 4)
859
860
861def in4_ismlladdr(str):
862    # type: (str) -> bool
863    """
864    Returns True if address belongs to link-local multicast address
865    space (224.0.0.0/24)
866    """
867    return in4_isincluded(str, "224.0.0.0", 24)
868
869
870def in4_ismgladdr(str):
871    # type: (str) -> bool
872    """
873    Returns True if address belongs to global multicast address
874    space (224.0.1.0-238.255.255.255).
875    """
876    return (
877        in4_isincluded(str, "224.0.0.0", 4) and
878        not in4_isincluded(str, "224.0.0.0", 24) and
879        not in4_isincluded(str, "239.0.0.0", 8)
880    )
881
882
883def in4_ismlsaddr(str):
884    # type: (str) -> bool
885    """
886    Returns True if address belongs to limited scope multicast address
887    space (239.0.0.0/8).
888    """
889    return in4_isincluded(str, "239.0.0.0", 8)
890
891
892def in4_isaddrllallnodes(str):
893    # type: (str) -> bool
894    """
895    Returns True if address is the link-local all-nodes multicast
896    address (224.0.0.1).
897    """
898    return (inet_pton(socket.AF_INET, "224.0.0.1") ==
899            inet_pton(socket.AF_INET, str))
900
901
902def in4_getnsmac(a):
903    # type: (bytes) -> str
904    """
905    Return the multicast mac address associated with provided
906    IPv4 address. Passed address must be in network format.
907    """
908
909    return "01:00:5e:%.2x:%.2x:%.2x" % (a[1] & 0x7f, a[2], a[3])
910
911
912def decode_locale_str(x):
913    # type: (bytes) -> str
914    """
915    Decode bytes into a string using the system locale.
916    Useful on Windows where it can be unusual (e.g. cp1252)
917    """
918    return x.decode(encoding=locale.getlocale()[1] or "utf-8", errors="replace")
919
920
921class ContextManagerSubprocess(object):
922    """
923    Context manager that eases checking for unknown command, without
924    crashing.
925
926    Example:
927    >>> with ContextManagerSubprocess("tcpdump"):
928    >>>     subprocess.Popen(["tcpdump", "--version"])
929    ERROR: Could not execute tcpdump, is it installed?
930
931    """
932
933    def __init__(self, prog, suppress=True):
934        # type: (str, bool) -> None
935        self.prog = prog
936        self.suppress = suppress
937
938    def __enter__(self):
939        # type: () -> None
940        pass
941
942    def __exit__(self,
943                 exc_type,  # type: Optional[type]
944                 exc_value,  # type: Optional[Exception]
945                 traceback,  # type: Optional[Any]
946                 ):
947        # type: (...) -> Optional[bool]
948        if exc_value is None or exc_type is None:
949            return None
950        # Errored
951        if isinstance(exc_value, EnvironmentError):
952            msg = "Could not execute %s, is it installed?" % self.prog
953        else:
954            msg = "%s: execution failed (%s)" % (
955                self.prog,
956                exc_type.__class__.__name__
957            )
958        if not self.suppress:
959            raise exc_type(msg)
960        log_runtime.error(msg, exc_info=True)
961        return True  # Suppress the exception
962
963
964class ContextManagerCaptureOutput(object):
965    """
966    Context manager that intercept the console's output.
967
968    Example:
969    >>> with ContextManagerCaptureOutput() as cmco:
970    ...     print("hey")
971    ...     assert cmco.get_output() == "hey"
972    """
973
974    def __init__(self):
975        # type: () -> None
976        self.result_export_object = ""
977
978    def __enter__(self):
979        # type: () -> ContextManagerCaptureOutput
980        from unittest import mock
981
982        def write(s, decorator=self):
983            # type: (str, ContextManagerCaptureOutput) -> None
984            decorator.result_export_object += s
985        mock_stdout = mock.Mock()
986        mock_stdout.write = write
987        self.bck_stdout = sys.stdout
988        sys.stdout = mock_stdout
989        return self
990
991    def __exit__(self, *exc):
992        # type: (*Any) -> Literal[False]
993        sys.stdout = self.bck_stdout
994        return False
995
996    def get_output(self, eval_bytes=False):
997        # type: (bool) -> str
998        if self.result_export_object.startswith("b'") and eval_bytes:
999            return plain_str(eval(self.result_export_object))
1000        return self.result_export_object
1001
1002
1003def do_graph(
1004    graph,  # type: str
1005    prog=None,  # type: Optional[str]
1006    format=None,  # type: Optional[str]
1007    target=None,  # type: Optional[Union[IO[bytes], str]]
1008    type=None,  # type: Optional[str]
1009    string=None,  # type: Optional[bool]
1010    options=None  # type: Optional[List[str]]
1011):
1012    # type: (...) -> Optional[str]
1013    """Processes graph description using an external software.
1014    This method is used to convert a graphviz format to an image.
1015
1016    :param graph: GraphViz graph description
1017    :param prog: which graphviz program to use
1018    :param format: output type (svg, ps, gif, jpg, etc.), passed to dot's "-T"
1019        option
1020    :param string: if not None, simply return the graph string
1021    :param target: filename or redirect. Defaults pipe to Imagemagick's
1022        display program
1023    :param options: options to be passed to prog
1024    """
1025
1026    if format is None:
1027        format = "svg"
1028    if string:
1029        return graph
1030    if type is not None:
1031        warnings.warn(
1032            "type is deprecated, and was renamed format",
1033            DeprecationWarning
1034        )
1035        format = type
1036    if prog is None:
1037        prog = conf.prog.dot
1038    start_viewer = False
1039    if target is None:
1040        if WINDOWS:
1041            target = get_temp_file(autoext="." + format)
1042            start_viewer = True
1043        else:
1044            with ContextManagerSubprocess(conf.prog.display):
1045                target = subprocess.Popen([conf.prog.display],
1046                                          stdin=subprocess.PIPE).stdin
1047    if format is not None:
1048        format = "-T%s" % format
1049    if isinstance(target, str):
1050        if target.startswith('|'):
1051            target = subprocess.Popen(target[1:].lstrip(), shell=True,
1052                                      stdin=subprocess.PIPE).stdin
1053        elif target.startswith('>'):
1054            target = open(target[1:].lstrip(), "wb")
1055        else:
1056            target = open(os.path.abspath(target), "wb")
1057    target = cast(IO[bytes], target)
1058    proc = subprocess.Popen(
1059        "\"%s\" %s %s" % (prog, options or "", format or ""),
1060        shell=True, stdin=subprocess.PIPE, stdout=target,
1061        stderr=subprocess.PIPE
1062    )
1063    _, stderr = proc.communicate(bytes_encode(graph))
1064    if proc.returncode != 0:
1065        raise OSError(
1066            "GraphViz call failed (is it installed?):\n" +
1067            plain_str(stderr)
1068        )
1069    try:
1070        target.close()
1071    except Exception:
1072        pass
1073    if start_viewer:
1074        # Workaround for file not found error: We wait until tempfile is written.  # noqa: E501
1075        waiting_start = time.time()
1076        while not os.path.exists(target.name):
1077            time.sleep(0.1)
1078            if time.time() - waiting_start > 3:
1079                warning("Temporary file '%s' could not be written. Graphic will not be displayed.", tempfile)  # noqa: E501
1080                break
1081        else:
1082            if WINDOWS and conf.prog.display == conf.prog._default:
1083                os.startfile(target.name)
1084            else:
1085                with ContextManagerSubprocess(conf.prog.display):
1086                    subprocess.Popen([conf.prog.display, target.name])
1087    return None
1088
1089
1090_TEX_TR = {
1091    "{": "{\\tt\\char123}",
1092    "}": "{\\tt\\char125}",
1093    "\\": "{\\tt\\char92}",
1094    "^": "\\^{}",
1095    "$": "\\$",
1096    "#": "\\#",
1097    "_": "\\_",
1098    "&": "\\&",
1099    "%": "\\%",
1100    "|": "{\\tt\\char124}",
1101    "~": "{\\tt\\char126}",
1102    "<": "{\\tt\\char60}",
1103    ">": "{\\tt\\char62}",
1104}
1105
1106
1107def tex_escape(x):
1108    # type: (str) -> str
1109    s = ""
1110    for c in x:
1111        s += _TEX_TR.get(c, c)
1112    return s
1113
1114
1115def colgen(*lstcol,  # type: Any
1116           **kargs  # type: Any
1117           ):
1118    # type: (...) -> Iterator[Any]
1119    """Returns a generator that mixes provided quantities forever
1120    trans: a function to convert the three arguments into a color. lambda x,y,z:(x,y,z) by default"""  # noqa: E501
1121    if len(lstcol) < 2:
1122        lstcol *= 2
1123    trans = kargs.get("trans", lambda x, y, z: (x, y, z))
1124    while True:
1125        for i in range(len(lstcol)):
1126            for j in range(len(lstcol)):
1127                for k in range(len(lstcol)):
1128                    if i != j or j != k or k != i:
1129                        yield trans(lstcol[(i + j) % len(lstcol)], lstcol[(j + k) % len(lstcol)], lstcol[(k + i) % len(lstcol)])  # noqa: E501
1130
1131
1132def incremental_label(label="tag%05i", start=0):
1133    # type: (str, int) -> Iterator[str]
1134    while True:
1135        yield label % start
1136        start += 1
1137
1138
1139def binrepr(val):
1140    # type: (int) -> str
1141    return bin(val)[2:]
1142
1143
1144def long_converter(s):
1145    # type: (str) -> int
1146    return int(s.replace('\n', '').replace(' ', ''), 16)
1147
1148#########################
1149#    Enum management    #
1150#########################
1151
1152
1153class EnumElement:
1154    def __init__(self, key, value):
1155        # type: (str, int) -> None
1156        self._key = key
1157        self._value = value
1158
1159    def __repr__(self):
1160        # type: () -> str
1161        return "<%s %s[%r]>" % (self.__dict__.get("_name", self.__class__.__name__), self._key, self._value)  # noqa: E501
1162
1163    def __getattr__(self, attr):
1164        # type: (str) -> Any
1165        return getattr(self._value, attr)
1166
1167    def __str__(self):
1168        # type: () -> str
1169        return self._key
1170
1171    def __bytes__(self):
1172        # type: () -> bytes
1173        return bytes_encode(self.__str__())
1174
1175    def __hash__(self):
1176        # type: () -> int
1177        return self._value
1178
1179    def __int__(self):
1180        # type: () -> int
1181        return int(self._value)
1182
1183    def __eq__(self, other):
1184        # type: (Any) -> bool
1185        return self._value == int(other)
1186
1187    def __neq__(self, other):
1188        # type: (Any) -> bool
1189        return not self.__eq__(other)
1190
1191
1192class Enum_metaclass(type):
1193    element_class = EnumElement
1194
1195    def __new__(cls, name, bases, dct):
1196        # type: (Any, str, Any, Dict[str, Any]) -> Any
1197        rdict = {}
1198        for k, v in dct.items():
1199            if isinstance(v, int):
1200                v = cls.element_class(k, v)
1201                dct[k] = v
1202                rdict[v] = k
1203        dct["__rdict__"] = rdict
1204        return super(Enum_metaclass, cls).__new__(cls, name, bases, dct)
1205
1206    def __getitem__(self, attr):
1207        # type: (int) -> Any
1208        return self.__rdict__[attr]  # type: ignore
1209
1210    def __contains__(self, val):
1211        # type: (int) -> bool
1212        return val in self.__rdict__  # type: ignore
1213
1214    def get(self, attr, val=None):
1215        # type: (str, Optional[Any]) -> Any
1216        return self.__rdict__.get(attr, val)  # type: ignore
1217
1218    def __repr__(self):
1219        # type: () -> str
1220        return "<%s>" % self.__dict__.get("name", self.__name__)
1221
1222
1223###################
1224#  Object saving  #
1225###################
1226
1227
1228def export_object(obj):
1229    # type: (Any) -> None
1230    import zlib
1231    print(base64.b64encode(zlib.compress(pickle.dumps(obj, 2), 9)).decode())
1232
1233
1234def import_object(obj=None):
1235    # type: (Optional[str]) -> Any
1236    import zlib
1237    if obj is None:
1238        obj = sys.stdin.read()
1239    return pickle.loads(zlib.decompress(base64.b64decode(obj.strip())))
1240
1241
1242def save_object(fname, obj):
1243    # type: (str, Any) -> None
1244    """Pickle a Python object"""
1245
1246    fd = gzip.open(fname, "wb")
1247    pickle.dump(obj, fd)
1248    fd.close()
1249
1250
1251def load_object(fname):
1252    # type: (str) -> Any
1253    """unpickle a Python object"""
1254    return pickle.load(gzip.open(fname, "rb"))
1255
1256
1257@conf.commands.register
1258def corrupt_bytes(data, p=0.01, n=None):
1259    # type: (str, float, Optional[int]) -> bytes
1260    """
1261    Corrupt a given percentage (at least one byte) or number of bytes
1262    from a string
1263    """
1264    s = array.array("B", bytes_encode(data))
1265    s_len = len(s)
1266    if n is None:
1267        n = max(1, int(s_len * p))
1268    for i in random.sample(range(s_len), n):
1269        s[i] = (s[i] + random.randint(1, 255)) % 256
1270    return s.tobytes()
1271
1272
1273@conf.commands.register
1274def corrupt_bits(data, p=0.01, n=None):
1275    # type: (str, float, Optional[int]) -> bytes
1276    """
1277    Flip a given percentage (at least one bit) or number of bits
1278    from a string
1279    """
1280    s = array.array("B", bytes_encode(data))
1281    s_len = len(s) * 8
1282    if n is None:
1283        n = max(1, int(s_len * p))
1284    for i in random.sample(range(s_len), n):
1285        s[i // 8] ^= 1 << (i % 8)
1286    return s.tobytes()
1287
1288
1289#############################
1290#  pcap capture file stuff  #
1291#############################
1292
1293@conf.commands.register
1294def wrpcap(filename,  # type: Union[IO[bytes], str]
1295           pkt,  # type: _PacketIterable
1296           *args,  # type: Any
1297           **kargs  # type: Any
1298           ):
1299    # type: (...) -> None
1300    """Write a list of packets to a pcap file
1301
1302    :param filename: the name of the file to write packets to, or an open,
1303        writable file-like object. The file descriptor will be
1304        closed at the end of the call, so do not use an object you
1305        do not want to close (e.g., running wrpcap(sys.stdout, [])
1306        in interactive mode will crash Scapy).
1307    :param gz: set to 1 to save a gzipped capture
1308    :param linktype: force linktype value
1309    :param endianness: "<" or ">", force endianness
1310    :param sync: do not bufferize writes to the capture file
1311    """
1312    with PcapWriter(filename, *args, **kargs) as fdesc:
1313        fdesc.write(pkt)
1314
1315
1316@conf.commands.register
1317def wrpcapng(filename,  # type: str
1318             pkt,  # type: _PacketIterable
1319             ):
1320    # type: (...) -> None
1321    """Write a list of packets to a pcapng file
1322
1323    :param filename: the name of the file to write packets to, or an open,
1324        writable file-like object. The file descriptor will be
1325        closed at the end of the call, so do not use an object you
1326        do not want to close (e.g., running wrpcapng(sys.stdout, [])
1327        in interactive mode will crash Scapy).
1328    :param pkt: packets to write
1329    """
1330    with PcapNgWriter(filename) as fdesc:
1331        fdesc.write(pkt)
1332
1333
1334@conf.commands.register
1335def rdpcap(filename, count=-1):
1336    # type: (Union[IO[bytes], str], int) -> PacketList
1337    """Read a pcap or pcapng file and return a packet list
1338
1339    :param count: read only <count> packets
1340    """
1341    # Rant: Our complicated use of metaclasses and especially the
1342    # __call__ function is, of course, not supported by MyPy.
1343    # One day we should simplify this mess and use a much simpler
1344    # layout that will actually be supported and properly dissected.
1345    with PcapReader(filename) as fdesc:  # type: ignore
1346        return fdesc.read_all(count=count)
1347
1348
1349# NOTE: Type hinting
1350# Mypy doesn't understand the following metaclass, and thinks each
1351# constructor (PcapReader...) needs 3 arguments each. To avoid this,
1352# we add a fake (=None) to the last 2 arguments then force the value
1353# to not be None in the signature and pack the whole thing in an ignore.
1354# This allows to not have # type: ignore every time we call those
1355# constructors.
1356
1357class PcapReader_metaclass(type):
1358    """Metaclass for (Raw)Pcap(Ng)Readers"""
1359
1360    def __new__(cls, name, bases, dct):
1361        # type: (Any, str, Any, Dict[str, Any]) -> Any
1362        """The `alternative` class attribute is declared in the PcapNg
1363        variant, and set here to the Pcap variant.
1364
1365        """
1366        newcls = super(PcapReader_metaclass, cls).__new__(
1367            cls, name, bases, dct
1368        )
1369        if 'alternative' in dct:
1370            dct['alternative'].alternative = newcls
1371        return newcls
1372
1373    def __call__(cls, filename):
1374        # type: (Union[IO[bytes], str]) -> Any
1375        """Creates a cls instance, use the `alternative` if that
1376        fails.
1377
1378        """
1379        i = cls.__new__(
1380            cls,
1381            cls.__name__,
1382            cls.__bases__,
1383            cls.__dict__  # type: ignore
1384        )
1385        filename, fdesc, magic = cls.open(filename)
1386        if not magic:
1387            raise Scapy_Exception(
1388                "No data could be read!"
1389            )
1390        try:
1391            i.__init__(filename, fdesc, magic)
1392            return i
1393        except (Scapy_Exception, EOFError):
1394            pass
1395
1396        if "alternative" in cls.__dict__:
1397            cls = cls.__dict__["alternative"]
1398            i = cls.__new__(
1399                cls,
1400                cls.__name__,
1401                cls.__bases__,
1402                cls.__dict__  # type: ignore
1403            )
1404            try:
1405                i.__init__(filename, fdesc, magic)
1406                return i
1407            except (Scapy_Exception, EOFError):
1408                pass
1409
1410        raise Scapy_Exception("Not a supported capture file")
1411
1412    @staticmethod
1413    def open(fname  # type: Union[IO[bytes], str]
1414             ):
1415        # type: (...) -> Tuple[str, _ByteStream, bytes]
1416        """Open (if necessary) filename, and read the magic."""
1417        if isinstance(fname, str):
1418            filename = fname
1419            fdesc = open(filename, "rb")  # type: _ByteStream
1420            magic = fdesc.read(2)
1421            if magic == b"\x1f\x8b":
1422                # GZIP header detected.
1423                fdesc.seek(0)
1424                fdesc = gzip.GzipFile(fileobj=fdesc)
1425                magic = fdesc.read(2)
1426            magic += fdesc.read(2)
1427        else:
1428            fdesc = fname
1429            filename = getattr(fdesc, "name", "No name")
1430            magic = fdesc.read(4)
1431        return filename, fdesc, magic
1432
1433
1434class RawPcapReader(metaclass=PcapReader_metaclass):
1435    """A stateful pcap reader. Each packet is returned as a string"""
1436
1437    # TODO: use Generics to properly type the various readers.
1438    # As of right now, RawPcapReader is typed as if it returned packets
1439    # because all of its child do. Fix that
1440
1441    nonblocking_socket = True
1442    PacketMetadata = collections.namedtuple("PacketMetadata",
1443                                            ["sec", "usec", "wirelen", "caplen"])  # noqa: E501
1444
1445    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
1446        # type: (str, _ByteStream, bytes) -> None
1447        self.filename = filename
1448        self.f = fdesc
1449        if magic == b"\xa1\xb2\xc3\xd4":  # big endian
1450            self.endian = ">"
1451            self.nano = False
1452        elif magic == b"\xd4\xc3\xb2\xa1":  # little endian
1453            self.endian = "<"
1454            self.nano = False
1455        elif magic == b"\xa1\xb2\x3c\x4d":  # big endian, nanosecond-precision
1456            self.endian = ">"
1457            self.nano = True
1458        elif magic == b"\x4d\x3c\xb2\xa1":  # little endian, nanosecond-precision  # noqa: E501
1459            self.endian = "<"
1460            self.nano = True
1461        else:
1462            raise Scapy_Exception(
1463                "Not a pcap capture file (bad magic: %r)" % magic
1464            )
1465        hdr = self.f.read(20)
1466        if len(hdr) < 20:
1467            raise Scapy_Exception("Invalid pcap file (too short)")
1468        vermaj, vermin, tz, sig, snaplen, linktype = struct.unpack(
1469            self.endian + "HHIIII", hdr
1470        )
1471        self.linktype = linktype
1472        self.snaplen = snaplen
1473
1474    def __enter__(self):
1475        # type: () -> RawPcapReader
1476        return self
1477
1478    def __iter__(self):
1479        # type: () -> RawPcapReader
1480        return self
1481
1482    def __next__(self):
1483        # type: () -> Tuple[bytes, RawPcapReader.PacketMetadata]
1484        """
1485        implement the iterator protocol on a set of packets in a pcap file
1486        """
1487        try:
1488            return self._read_packet()
1489        except EOFError:
1490            raise StopIteration
1491
1492    def _read_packet(self, size=MTU):
1493        # type: (int) -> Tuple[bytes, RawPcapReader.PacketMetadata]
1494        """return a single packet read from the file as a tuple containing
1495        (pkt_data, pkt_metadata)
1496
1497        raise EOFError when no more packets are available
1498        """
1499        hdr = self.f.read(16)
1500        if len(hdr) < 16:
1501            raise EOFError
1502        sec, usec, caplen, wirelen = struct.unpack(self.endian + "IIII", hdr)
1503
1504        try:
1505            data = self.f.read(caplen)[:size]
1506        except OverflowError as e:
1507            warning(f"Pcap: {e}")
1508            raise EOFError
1509
1510        return (data,
1511                RawPcapReader.PacketMetadata(sec=sec, usec=usec,
1512                                             wirelen=wirelen, caplen=caplen))
1513
1514    def read_packet(self, size=MTU):
1515        # type: (int) -> Packet
1516        raise Exception(
1517            "Cannot call read_packet() in RawPcapReader. Use "
1518            "_read_packet()"
1519        )
1520
1521    def dispatch(self,
1522                 callback  # type: Callable[[Tuple[bytes, RawPcapReader.PacketMetadata]], Any]  # noqa: E501
1523                 ):
1524        # type: (...) -> None
1525        """call the specified callback routine for each packet read
1526
1527        This is just a convenience function for the main loop
1528        that allows for easy launching of packet processing in a
1529        thread.
1530        """
1531        for p in self:
1532            callback(p)
1533
1534    def _read_all(self, count=-1):
1535        # type: (int) -> List[Packet]
1536        """return a list of all packets in the pcap file
1537        """
1538        res = []  # type: List[Packet]
1539        while count != 0:
1540            count -= 1
1541            try:
1542                p = self.read_packet()  # type: Packet
1543            except EOFError:
1544                break
1545            res.append(p)
1546        return res
1547
1548    def recv(self, size=MTU):
1549        # type: (int) -> bytes
1550        """ Emulate a socket
1551        """
1552        return self._read_packet(size=size)[0]
1553
1554    def fileno(self):
1555        # type: () -> int
1556        return -1 if WINDOWS else self.f.fileno()
1557
1558    def close(self):
1559        # type: () -> None
1560        if isinstance(self.f, gzip.GzipFile):
1561            self.f.fileobj.close()  # type: ignore
1562        self.f.close()
1563
1564    def __exit__(self, exc_type, exc_value, tracback):
1565        # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
1566        self.close()
1567
1568    # emulate SuperSocket
1569    @staticmethod
1570    def select(sockets,  # type: List[SuperSocket]
1571               remain=None,  # type: Optional[float]
1572               ):
1573        # type: (...) -> List[SuperSocket]
1574        return sockets
1575
1576
1577class PcapReader(RawPcapReader):
1578    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
1579        # type: (str, IO[bytes], bytes) -> None
1580        RawPcapReader.__init__(self, filename, fdesc, magic)
1581        try:
1582            self.LLcls = conf.l2types.num2layer[
1583                self.linktype
1584            ]  # type: Type[Packet]
1585        except KeyError:
1586            warning("PcapReader: unknown LL type [%i]/[%#x]. Using Raw packets" % (self.linktype, self.linktype))  # noqa: E501
1587            if conf.raw_layer is None:
1588                # conf.raw_layer is set on import
1589                import scapy.packet  # noqa: F401
1590            self.LLcls = conf.raw_layer
1591
1592    def __enter__(self):
1593        # type: () -> PcapReader
1594        return self
1595
1596    def read_packet(self, size=MTU, **kwargs):
1597        # type: (int, **Any) -> Packet
1598        rp = super(PcapReader, self)._read_packet(size=size)
1599        if rp is None:
1600            raise EOFError
1601        s, pkt_info = rp
1602
1603        try:
1604            p = self.LLcls(s, **kwargs)  # type: Packet
1605        except KeyboardInterrupt:
1606            raise
1607        except Exception:
1608            if conf.debug_dissector:
1609                from scapy.sendrecv import debug
1610                debug.crashed_on = (self.LLcls, s)
1611                raise
1612            if conf.raw_layer is None:
1613                # conf.raw_layer is set on import
1614                import scapy.packet  # noqa: F401
1615            p = conf.raw_layer(s)
1616        power = Decimal(10) ** Decimal(-9 if self.nano else -6)
1617        p.time = EDecimal(pkt_info.sec + power * pkt_info.usec)
1618        p.wirelen = pkt_info.wirelen
1619        return p
1620
1621    def recv(self, size=MTU, **kwargs):  # type: ignore
1622        # type: (int, **Any) -> Packet
1623        return self.read_packet(size=size, **kwargs)
1624
1625    def __next__(self):  # type: ignore
1626        # type: () -> Packet
1627        try:
1628            return self.read_packet()
1629        except EOFError:
1630            raise StopIteration
1631
1632    def read_all(self, count=-1):
1633        # type: (int) -> PacketList
1634        res = self._read_all(count)
1635        from scapy import plist
1636        return plist.PacketList(res, name=os.path.basename(self.filename))
1637
1638
1639class RawPcapNgReader(RawPcapReader):
1640    """A stateful pcapng reader. Each packet is returned as
1641    bytes.
1642
1643    """
1644
1645    alternative = RawPcapReader  # type: Type[Any]
1646
1647    PacketMetadata = collections.namedtuple("PacketMetadataNg",  # type: ignore
1648                                            ["linktype", "tsresol",
1649                                             "tshigh", "tslow", "wirelen",
1650                                             "comment", "ifname", "direction",
1651                                             "process_information"])
1652
1653    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
1654        # type: (str, IO[bytes], bytes) -> None
1655        self.filename = filename
1656        self.f = fdesc
1657        # A list of (linktype, snaplen, tsresol); will be populated by IDBs.
1658        self.interfaces = []  # type: List[Tuple[int, int, Dict[str, Any]]]
1659        self.default_options = {
1660            "tsresol": 1000000
1661        }
1662        self.blocktypes: Dict[
1663            int,
1664            Callable[
1665                [bytes, int],
1666                Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]]
1667            ]] = {
1668                1: self._read_block_idb,
1669                2: self._read_block_pkt,
1670                3: self._read_block_spb,
1671                6: self._read_block_epb,
1672                10: self._read_block_dsb,
1673                0x80000001: self._read_block_pib,
1674        }
1675        self.endian = "!"  # Will be overwritten by first SHB
1676        self.process_information = []  # type: List[Dict[str, Any]]
1677
1678        if magic != b"\x0a\x0d\x0d\x0a":  # PcapNg:
1679            raise Scapy_Exception(
1680                "Not a pcapng capture file (bad magic: %r)" % magic
1681            )
1682
1683        try:
1684            self._read_block_shb()
1685        except EOFError:
1686            raise Scapy_Exception(
1687                "The first SHB of the pcapng file is malformed !"
1688            )
1689
1690    def _read_block(self, size=MTU):
1691        # type: (int) -> Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]]  # noqa: E501
1692        try:
1693            blocktype = struct.unpack(self.endian + "I", self.f.read(4))[0]
1694        except struct.error:
1695            raise EOFError
1696        if blocktype == 0x0A0D0D0A:
1697            # This function updates the endianness based on the block content.
1698            self._read_block_shb()
1699            return None
1700        try:
1701            blocklen = struct.unpack(self.endian + "I", self.f.read(4))[0]
1702        except struct.error:
1703            warning("PcapNg: Error reading blocklen before block body")
1704            raise EOFError
1705        if blocklen < 12:
1706            warning("PcapNg: Invalid block length !")
1707            raise EOFError
1708
1709        _block_body_length = blocklen - 12
1710        block = self.f.read(_block_body_length)
1711        if len(block) != _block_body_length:
1712            raise Scapy_Exception("PcapNg: Invalid Block body length "
1713                                  "(too short)")
1714        self._read_block_tail(blocklen)
1715        if blocktype in self.blocktypes:
1716            return self.blocktypes[blocktype](block, size)
1717        return None
1718
1719    def _read_block_tail(self, blocklen):
1720        # type: (int) -> None
1721        if blocklen % 4:
1722            pad = self.f.read(-blocklen % 4)
1723            warning("PcapNg: bad blocklen %d (MUST be a multiple of 4. "
1724                    "Ignored padding %r" % (blocklen, pad))
1725        try:
1726            if blocklen != struct.unpack(self.endian + 'I',
1727                                         self.f.read(4))[0]:
1728                raise EOFError("PcapNg: Invalid pcapng block (bad blocklen)")
1729        except struct.error:
1730            warning("PcapNg: Could not read blocklen after block body")
1731            raise EOFError
1732
1733    def _read_block_shb(self):
1734        # type: () -> None
1735        """Section Header Block"""
1736        _blocklen = self.f.read(4)
1737        endian = self.f.read(4)
1738        if endian == b"\x1a\x2b\x3c\x4d":
1739            self.endian = ">"
1740        elif endian == b"\x4d\x3c\x2b\x1a":
1741            self.endian = "<"
1742        else:
1743            warning("PcapNg: Bad magic in Section Header Block"
1744                    " (not a pcapng file?)")
1745            raise EOFError
1746
1747        try:
1748            blocklen = struct.unpack(self.endian + "I", _blocklen)[0]
1749        except struct.error:
1750            warning("PcapNg: Could not read blocklen")
1751            raise EOFError
1752        if blocklen < 28:
1753            warning(f"PcapNg: Invalid Section Header Block length ({blocklen})!")  # noqa: E501
1754            raise EOFError
1755
1756        # Major version must be 1
1757        _major = self.f.read(2)
1758        try:
1759            major = struct.unpack(self.endian + "H", _major)[0]
1760        except struct.error:
1761            warning("PcapNg: Could not read major value")
1762            raise EOFError
1763        if major != 1:
1764            warning(f"PcapNg: SHB Major version {major} unsupported !")
1765            raise EOFError
1766
1767        # Skip minor version & section length
1768        skipped = self.f.read(10)
1769        if len(skipped) != 10:
1770            warning("PcapNg: Could not read minor value & section length")
1771            raise EOFError
1772
1773        _options_len = blocklen - 28
1774        options = self.f.read(_options_len)
1775        if len(options) != _options_len:
1776            raise Scapy_Exception("PcapNg: Invalid Section Header Block "
1777                                  " options (too short)")
1778        self._read_block_tail(blocklen)
1779        self._read_options(options)
1780
1781    def _read_packet(self, size=MTU):  # type: ignore
1782        # type: (int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
1783        """Read blocks until it reaches either EOF or a packet, and
1784        returns None or (packet, (linktype, sec, usec, wirelen)),
1785        where packet is a string.
1786
1787        """
1788        while True:
1789            res = self._read_block()
1790            if res is not None:
1791                return res
1792
1793    def _read_options(self, options):
1794        # type: (bytes) -> Dict[int, bytes]
1795        opts = dict()
1796        while len(options) >= 4:
1797            try:
1798                code, length = struct.unpack(self.endian + "HH", options[:4])
1799            except struct.error:
1800                warning("PcapNg: options header is too small "
1801                        "%d !" % len(options))
1802                raise EOFError
1803            if code != 0 and 4 + length < len(options):
1804                opts[code] = options[4:4 + length]
1805            if code == 0:
1806                if length != 0:
1807                    warning("PcapNg: invalid option "
1808                            "length %d for end-of-option" % length)
1809                break
1810            if length % 4:
1811                length += (4 - (length % 4))
1812            options = options[4 + length:]
1813        return opts
1814
1815    def _read_block_idb(self, block, _):
1816        # type: (bytes, int) -> None
1817        """Interface Description Block"""
1818        # 2 bytes LinkType + 2 bytes Reserved
1819        # 4 bytes Snaplen
1820        options_raw = self._read_options(block[8:])
1821        options = self.default_options.copy()  # type: Dict[str, Any]
1822        for c, v in options_raw.items():
1823            if c == 9:
1824                length = len(v)
1825                if length == 1:
1826                    tsresol = orb(v)
1827                    options["tsresol"] = (2 if tsresol & 128 else 10) ** (
1828                        tsresol & 127
1829                    )
1830                else:
1831                    warning("PcapNg: invalid options "
1832                            "length %d for IDB tsresol" % length)
1833            elif c == 2:
1834                options["name"] = v
1835            elif c == 1:
1836                options["comment"] = v
1837        try:
1838            interface: Tuple[int, int, Dict[str, Any]] = struct.unpack(
1839                self.endian + "HxxI",
1840                block[:8]
1841            ) + (options,)
1842        except struct.error:
1843            warning("PcapNg: IDB is too small %d/8 !" % len(block))
1844            raise EOFError
1845        self.interfaces.append(interface)
1846
1847    def _check_interface_id(self, intid):
1848        # type: (int) -> None
1849        """Check the interface id value and raise EOFError if invalid."""
1850        tmp_len = len(self.interfaces)
1851        if intid >= tmp_len:
1852            warning("PcapNg: invalid interface id %d/%d" % (intid, tmp_len))
1853            raise EOFError
1854
1855    def _read_block_epb(self, block, size):
1856        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
1857        """Enhanced Packet Block"""
1858        try:
1859            intid, tshigh, tslow, caplen, wirelen = struct.unpack(
1860                self.endian + "5I",
1861                block[:20],
1862            )
1863        except struct.error:
1864            warning("PcapNg: EPB is too small %d/20 !" % len(block))
1865            raise EOFError
1866
1867        # Compute the options offset taking padding into account
1868        if caplen % 4:
1869            opt_offset = 20 + caplen + (-caplen) % 4
1870        else:
1871            opt_offset = 20 + caplen
1872
1873        # Parse options
1874        options = self._read_options(block[opt_offset:])
1875
1876        process_information = {}
1877        for code, value in options.items():
1878            if code in [0x8001, 0x8003]:  # PCAPNG_EPB_PIB_INDEX, PCAPNG_EPB_E_PIB_INDEX
1879                try:
1880                    proc_index = struct.unpack(self.endian + "I", value)[0]
1881                except struct.error:
1882                    warning("PcapNg: EPB invalid proc index"
1883                            "(expected 4 bytes, got %d) !" % len(value))
1884                    raise EOFError
1885                if proc_index < len(self.process_information):
1886                    key = "proc" if code == 0x8001 else "eproc"
1887                    process_information[key] = self.process_information[proc_index]
1888                else:
1889                    warning("PcapNg: EPB invalid process information index "
1890                            "(%d/%d) !" % (proc_index, len(self.process_information)))
1891
1892        comment = options.get(1, None)
1893        epb_flags_raw = options.get(2, None)
1894        if epb_flags_raw:
1895            try:
1896                epb_flags, = struct.unpack(self.endian + "I", epb_flags_raw)
1897            except struct.error:
1898                warning("PcapNg: EPB invalid flags size"
1899                        "(expected 4 bytes, got %d) !" % len(epb_flags_raw))
1900                raise EOFError
1901            direction = epb_flags & 3
1902
1903        else:
1904            direction = None
1905
1906        self._check_interface_id(intid)
1907        ifname = self.interfaces[intid][2].get('name', None)
1908
1909        return (block[20:20 + caplen][:size],
1910                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
1911                                               tsresol=self.interfaces[intid][2]['tsresol'],  # noqa: E501
1912                                               tshigh=tshigh,
1913                                               tslow=tslow,
1914                                               wirelen=wirelen,
1915                                               comment=comment,
1916                                               ifname=ifname,
1917                                               direction=direction,
1918                                               process_information=process_information))
1919
1920    def _read_block_spb(self, block, size):
1921        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
1922        """Simple Packet Block"""
1923        # "it MUST be assumed that all the Simple Packet Blocks have
1924        # been captured on the interface previously specified in the
1925        # first Interface Description Block."
1926        intid = 0
1927        self._check_interface_id(intid)
1928
1929        try:
1930            wirelen, = struct.unpack(self.endian + "I", block[:4])
1931        except struct.error:
1932            warning("PcapNg: SPB is too small %d/4 !" % len(block))
1933            raise EOFError
1934
1935        caplen = min(wirelen, self.interfaces[intid][1])
1936        return (block[4:4 + caplen][:size],
1937                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
1938                                               tsresol=self.interfaces[intid][2]['tsresol'],  # noqa: E501
1939                                               tshigh=None,
1940                                               tslow=None,
1941                                               wirelen=wirelen,
1942                                               comment=None,
1943                                               ifname=None,
1944                                               direction=None,
1945                                               process_information={}))
1946
1947    def _read_block_pkt(self, block, size):
1948        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
1949        """(Obsolete) Packet Block"""
1950        try:
1951            intid, drops, tshigh, tslow, caplen, wirelen = struct.unpack(
1952                self.endian + "HH4I",
1953                block[:20],
1954            )
1955        except struct.error:
1956            warning("PcapNg: PKT is too small %d/20 !" % len(block))
1957            raise EOFError
1958
1959        self._check_interface_id(intid)
1960        return (block[20:20 + caplen][:size],
1961                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
1962                                               tsresol=self.interfaces[intid][2]['tsresol'],  # noqa: E501
1963                                               tshigh=tshigh,
1964                                               tslow=tslow,
1965                                               wirelen=wirelen,
1966                                               comment=None,
1967                                               ifname=None,
1968                                               direction=None,
1969                                               process_information={}))
1970
1971    def _read_block_dsb(self, block, size):
1972        # type: (bytes, int) -> None
1973        """Decryption Secrets Block"""
1974
1975        # Parse the secrets type and length fields
1976        try:
1977            secrets_type, secrets_length = struct.unpack(
1978                self.endian + "II",
1979                block[:8],
1980            )
1981            block = block[8:]
1982        except struct.error:
1983            warning("PcapNg: DSB is too small %d!", len(block))
1984            raise EOFError
1985
1986        # Compute the secrets length including the padding
1987        padded_secrets_length = secrets_length + (-secrets_length) % 4
1988        if len(block) < padded_secrets_length:
1989            warning("PcapNg: invalid DSB secrets length!")
1990            raise EOFError
1991
1992        # Extract secrets data and options
1993        secrets_data = block[:padded_secrets_length][:secrets_length]
1994        if block[padded_secrets_length:]:
1995            warning("PcapNg: DSB options are not supported!")
1996
1997        # TLS Key Log
1998        if secrets_type == 0x544c534b:
1999            if getattr(conf, "tls_sessions", False) is False:
2000                warning("PcapNg: TLS Key Log available, but "
2001                        "the TLS layer is not loaded! Scapy won't be able "
2002                        "to decrypt the packets.")
2003            else:
2004                from scapy.layers.tls.session import load_nss_keys
2005
2006                # Write Key Log to a file and parse it
2007                filename = get_temp_file()
2008                with open(filename, "wb") as fd:
2009                    fd.write(secrets_data)
2010                    fd.close()
2011
2012                keys = load_nss_keys(filename)
2013                if not keys:
2014                    warning("PcapNg: invalid TLS Key Log in DSB!")
2015                else:
2016                    # Note: these attributes are only available when the TLS
2017                    #       layer is loaded.
2018                    conf.tls_nss_keys = keys
2019                    conf.tls_session_enable = True
2020        else:
2021            warning("PcapNg: Unknown DSB secrets type (0x%x)!", secrets_type)
2022
2023    def _read_block_pib(self, block, _):
2024        # type: (bytes, int) -> None
2025        """Apple Process Information Block"""
2026
2027        # Get the Process ID
2028        try:
2029            dpeb_pid = struct.unpack(self.endian + "I", block[:4])[0]
2030            process_information = {"id": dpeb_pid}
2031            block = block[4:]
2032        except struct.error:
2033            warning("PcapNg: DPEB is too small (%d). Cannot get PID!",
2034                    len(block))
2035            raise EOFError
2036
2037        # Get Options
2038        options = self._read_options(block)
2039        for code, value in options.items():
2040            if code == 2:
2041                process_information["name"] = value.decode("ascii", "backslashreplace")
2042            elif code == 4:
2043                if len(value) == 16:
2044                    process_information["uuid"] = str(UUID(bytes=value))
2045                else:
2046                    warning("PcapNg: DPEB UUID length is invalid (%d)!",
2047                            len(value))
2048
2049        # Store process information
2050        self.process_information.append(process_information)
2051
2052
2053class PcapNgReader(RawPcapNgReader, PcapReader):
2054
2055    alternative = PcapReader
2056
2057    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
2058        # type: (str, IO[bytes], bytes) -> None
2059        RawPcapNgReader.__init__(self, filename, fdesc, magic)
2060
2061    def __enter__(self):
2062        # type: () -> PcapNgReader
2063        return self
2064
2065    def read_packet(self, size=MTU, **kwargs):
2066        # type: (int, **Any) -> Packet
2067        rp = super(PcapNgReader, self)._read_packet(size=size)
2068        if rp is None:
2069            raise EOFError
2070        s, (linktype, tsresol, tshigh, tslow, wirelen, comment, ifname, direction, process_information) = rp  # noqa: E501
2071        try:
2072            cls = conf.l2types.num2layer[linktype]  # type: Type[Packet]
2073            p = cls(s, **kwargs)  # type: Packet
2074        except KeyboardInterrupt:
2075            raise
2076        except Exception:
2077            if conf.debug_dissector:
2078                raise
2079            if conf.raw_layer is None:
2080                # conf.raw_layer is set on import
2081                import scapy.packet  # noqa: F401
2082            p = conf.raw_layer(s)
2083        if tshigh is not None:
2084            p.time = EDecimal((tshigh << 32) + tslow) / tsresol
2085        p.wirelen = wirelen
2086        p.comment = comment
2087        p.direction = direction
2088        p.process_information = process_information.copy()
2089        if ifname is not None:
2090            p.sniffed_on = ifname.decode('utf-8', 'backslashreplace')
2091        return p
2092
2093    def recv(self, size: int = MTU, **kwargs: Any) -> 'Packet':  # type: ignore
2094        return self.read_packet(size=size, **kwargs)
2095
2096
2097class GenericPcapWriter(object):
2098    nano = False
2099    linktype: int
2100
2101    def _write_header(self, pkt):
2102        # type: (Optional[Union[Packet, bytes]]) -> None
2103        raise NotImplementedError
2104
2105    def _write_packet(self,
2106                      packet,  # type: Union[bytes, Packet]
2107                      linktype,  # type: int
2108                      sec=None,  # type: Optional[float]
2109                      usec=None,  # type: Optional[int]
2110                      caplen=None,  # type: Optional[int]
2111                      wirelen=None,  # type: Optional[int]
2112                      comment=None,  # type: Optional[bytes]
2113                      ifname=None,  # type: Optional[bytes]
2114                      direction=None,  # type: Optional[int]
2115                      ):
2116        # type: (...) -> None
2117        raise NotImplementedError
2118
2119    def _get_time(self,
2120                  packet,  # type: Union[bytes, Packet]
2121                  sec,  # type: Optional[float]
2122                  usec  # type: Optional[int]
2123                  ):
2124        # type: (...) -> Tuple[float, int]
2125        if hasattr(packet, "time"):
2126            if sec is None:
2127                packet_time = packet.time
2128                tmp = int(packet_time)
2129                usec = int(round((packet_time - tmp) *
2130                           (1000000000 if self.nano else 1000000)))
2131                sec = float(packet_time)
2132        if sec is not None and usec is None:
2133            usec = 0
2134        return sec, usec  # type: ignore
2135
2136    def write_header(self, pkt):
2137        # type: (Optional[Union[Packet, bytes]]) -> None
2138        if not hasattr(self, 'linktype'):
2139            try:
2140                if pkt is None or isinstance(pkt, bytes):
2141                    # Can't guess LL
2142                    raise KeyError
2143                self.linktype = conf.l2types.layer2num[
2144                    pkt.__class__
2145                ]
2146            except KeyError:
2147                msg = "%s: unknown LL type for %s. Using type 1 (Ethernet)"
2148                warning(msg, self.__class__.__name__, pkt.__class__.__name__)
2149                self.linktype = DLT_EN10MB
2150        self._write_header(pkt)
2151
2152    def write_packet(self,
2153                     packet,  # type: Union[bytes, Packet]
2154                     sec=None,  # type: Optional[float]
2155                     usec=None,  # type: Optional[int]
2156                     caplen=None,  # type: Optional[int]
2157                     wirelen=None,  # type: Optional[int]
2158                     ):
2159        # type: (...) -> None
2160        """
2161        Writes a single packet to the pcap file.
2162
2163        :param packet: Packet, or bytes for a single packet
2164        :type packet: scapy.packet.Packet or bytes
2165        :param sec: time the packet was captured, in seconds since epoch. If
2166                    not supplied, defaults to now.
2167        :type sec: float
2168        :param usec: If ``nano=True``, then number of nanoseconds after the
2169                     second that the packet was captured. If ``nano=False``,
2170                     then the number of microseconds after the second the
2171                     packet was captured. If ``sec`` is not specified,
2172                     this value is ignored.
2173        :type usec: int or long
2174        :param caplen: The length of the packet in the capture file. If not
2175                       specified, uses ``len(raw(packet))``.
2176        :type caplen: int
2177        :param wirelen: The length of the packet on the wire. If not
2178                        specified, tries ``packet.wirelen``, otherwise uses
2179                        ``caplen``.
2180        :type wirelen: int
2181        :return: None
2182        :rtype: None
2183        """
2184        f_sec, usec = self._get_time(packet, sec, usec)
2185
2186        rawpkt = bytes_encode(packet)
2187        caplen = len(rawpkt) if caplen is None else caplen
2188
2189        if wirelen is None:
2190            if hasattr(packet, "wirelen"):
2191                wirelen = packet.wirelen
2192        if wirelen is None:
2193            wirelen = caplen
2194
2195        comment = getattr(packet, "comment", None)
2196        ifname = getattr(packet, "sniffed_on", None)
2197        direction = getattr(packet, "direction", None)
2198        if not isinstance(packet, bytes):
2199            linktype: int = conf.l2types.layer2num[
2200                packet.__class__
2201            ]
2202        else:
2203            linktype = self.linktype
2204        if ifname is not None:
2205            ifname = str(ifname).encode('utf-8')
2206        self._write_packet(
2207            rawpkt,
2208            sec=f_sec, usec=usec,
2209            caplen=caplen, wirelen=wirelen,
2210            comment=comment,
2211            ifname=ifname,
2212            direction=direction,
2213            linktype=linktype
2214        )
2215
2216
2217class GenericRawPcapWriter(GenericPcapWriter):
2218    header_present = False
2219    nano = False
2220    sync = False
2221    f = None  # type: Union[IO[bytes], gzip.GzipFile]
2222
2223    def fileno(self):
2224        # type: () -> int
2225        return -1 if WINDOWS else self.f.fileno()
2226
2227    def flush(self):
2228        # type: () -> Optional[Any]
2229        return self.f.flush()
2230
2231    def close(self):
2232        # type: () -> Optional[Any]
2233        if not self.header_present:
2234            self.write_header(None)
2235        return self.f.close()
2236
2237    def __enter__(self):
2238        # type: () -> GenericRawPcapWriter
2239        return self
2240
2241    def __exit__(self, exc_type, exc_value, tracback):
2242        # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
2243        self.flush()
2244        self.close()
2245
2246    def write(self, pkt):
2247        # type: (Union[_PacketIterable, bytes]) -> None
2248        """
2249        Writes a Packet, a SndRcvList object, or bytes to a pcap file.
2250
2251        :param pkt: Packet(s) to write (one record for each Packet), or raw
2252                    bytes to write (as one record).
2253        :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet or bytes
2254        """
2255        if isinstance(pkt, bytes):
2256            if not self.header_present:
2257                self.write_header(pkt)
2258            self.write_packet(pkt)
2259        else:
2260            # Import here to avoid circular dependency
2261            from scapy.supersocket import IterSocket
2262            for p in IterSocket(pkt).iter:
2263                if not self.header_present:
2264                    self.write_header(p)
2265
2266                if not isinstance(p, bytes) and \
2267                        self.linktype != conf.l2types.get(type(p), None):
2268                    warning("Inconsistent linktypes detected!"
2269                            " The resulting file might contain"
2270                            " invalid packets."
2271                            )
2272
2273                self.write_packet(p)
2274
2275
2276class RawPcapWriter(GenericRawPcapWriter):
2277    """A stream PCAP writer with more control than wrpcap()"""
2278
2279    def __init__(self,
2280                 filename,  # type: Union[IO[bytes], str]
2281                 linktype=None,  # type: Optional[int]
2282                 gz=False,  # type: bool
2283                 endianness="",  # type: str
2284                 append=False,  # type: bool
2285                 sync=False,  # type: bool
2286                 nano=False,  # type: bool
2287                 snaplen=MTU,  # type: int
2288                 bufsz=4096,  # type: int
2289                 ):
2290        # type: (...) -> None
2291        """
2292        :param filename: the name of the file to write packets to, or an open,
2293            writable file-like object.
2294        :param linktype: force linktype to a given value. If None, linktype is
2295            taken from the first writer packet
2296        :param gz: compress the capture on the fly
2297        :param endianness: force an endianness (little:"<", big:">").
2298            Default is native
2299        :param append: append packets to the capture file instead of
2300            truncating it
2301        :param sync: do not bufferize writes to the capture file
2302        :param nano: use nanosecond-precision (requires libpcap >= 1.5.0)
2303
2304        """
2305
2306        if linktype:
2307            self.linktype = linktype
2308        self.snaplen = snaplen
2309        self.append = append
2310        self.gz = gz
2311        self.endian = endianness
2312        self.sync = sync
2313        self.nano = nano
2314        if sync:
2315            bufsz = 0
2316
2317        if isinstance(filename, str):
2318            self.filename = filename
2319            if gz:
2320                self.f = cast(_ByteStream, gzip.open(
2321                    filename, append and "ab" or "wb", 9
2322                ))
2323            else:
2324                self.f = open(filename, append and "ab" or "wb", bufsz)
2325        else:
2326            self.f = filename
2327            self.filename = getattr(filename, "name", "No name")
2328
2329    def _write_header(self, pkt):
2330        # type: (Optional[Union[Packet, bytes]]) -> None
2331        self.header_present = True
2332
2333        if self.append:
2334            # Even if prone to race conditions, this seems to be
2335            # safest way to tell whether the header is already present
2336            # because we have to handle compressed streams that
2337            # are not as flexible as basic files
2338            if self.gz:
2339                g = gzip.open(self.filename, "rb")  # type: _ByteStream
2340            else:
2341                g = open(self.filename, "rb")
2342            try:
2343                if g.read(16):
2344                    return
2345            finally:
2346                g.close()
2347
2348        if not hasattr(self, 'linktype'):
2349            raise ValueError(
2350                "linktype could not be guessed. "
2351                "Please pass a linktype while creating the writer"
2352            )
2353
2354        self.f.write(struct.pack(self.endian + "IHHIIII", 0xa1b23c4d if self.nano else 0xa1b2c3d4,  # noqa: E501
2355                                 2, 4, 0, 0, self.snaplen, self.linktype))
2356        self.f.flush()
2357
2358    def _write_packet(self,
2359                      packet,  # type: Union[bytes, Packet]
2360                      linktype,  # type: int
2361                      sec=None,  # type: Optional[float]
2362                      usec=None,  # type: Optional[int]
2363                      caplen=None,  # type: Optional[int]
2364                      wirelen=None,  # type: Optional[int]
2365                      comment=None,  # type: Optional[bytes]
2366                      ifname=None,  # type: Optional[bytes]
2367                      direction=None,  # type: Optional[int]
2368                      ):
2369        # type: (...) -> None
2370        """
2371        Writes a single packet to the pcap file.
2372
2373        :param packet: bytes for a single packet
2374        :type packet: bytes
2375        :param linktype: linktype value associated with the packet
2376        :type linktype: int
2377        :param sec: time the packet was captured, in seconds since epoch. If
2378                    not supplied, defaults to now.
2379        :type sec: float
2380        :param usec: not used with pcapng
2381                     packet was captured
2382        :type usec: int or long
2383        :param caplen: The length of the packet in the capture file. If not
2384                       specified, uses ``len(packet)``.
2385        :type caplen: int
2386        :param wirelen: The length of the packet on the wire. If not
2387                        specified, uses ``caplen``.
2388        :type wirelen: int
2389        :return: None
2390        :rtype: None
2391        """
2392        if caplen is None:
2393            caplen = len(packet)
2394        if wirelen is None:
2395            wirelen = caplen
2396        if sec is None or usec is None:
2397            t = time.time()
2398            it = int(t)
2399            if sec is None:
2400                sec = it
2401                usec = int(round((t - it) *
2402                                 (1000000000 if self.nano else 1000000)))
2403            elif usec is None:
2404                usec = 0
2405
2406        self.f.write(struct.pack(self.endian + "IIII",
2407                                 int(sec), usec, caplen, wirelen))
2408        self.f.write(bytes(packet))
2409        if self.sync:
2410            self.f.flush()
2411
2412
2413class RawPcapNgWriter(GenericRawPcapWriter):
2414    """A stream pcapng writer with more control than wrpcapng()"""
2415
2416    def __init__(self,
2417                 filename,  # type: str
2418                 ):
2419        # type: (...) -> None
2420
2421        self.header_present = False
2422        self.tsresol = 1000000
2423        # A dict to keep if_name to IDB id mapping.
2424        # unknown if_name(None) id=0
2425        self.interfaces2id: Dict[Optional[bytes], int] = {None: 0}
2426
2427        # tcpdump only support little-endian in PCAPng files
2428        self.endian = "<"
2429        self.endian_magic = b"\x4d\x3c\x2b\x1a"
2430
2431        self.filename = filename
2432        self.f = open(filename, "wb", 4096)
2433
2434    def _get_time(self,
2435                  packet,  # type: Union[bytes, Packet]
2436                  sec,  # type: Optional[float]
2437                  usec  # type: Optional[int]
2438                  ):
2439        # type: (...) -> Tuple[float, int]
2440        if hasattr(packet, "time"):
2441            if sec is None:
2442                sec = float(packet.time)
2443
2444        if usec is None:
2445            usec = 0
2446
2447        return sec, usec  # type: ignore
2448
2449    def _add_padding(self, raw_data):
2450        # type: (bytes) -> bytes
2451        raw_data += ((-len(raw_data)) % 4) * b"\x00"
2452        return raw_data
2453
2454    def build_block(self, block_type, block_body, options=None):
2455        # type: (bytes, bytes, Optional[bytes]) -> bytes
2456
2457        # Pad Block Body to 32 bits
2458        block_body = self._add_padding(block_body)
2459
2460        if options:
2461            block_body += options
2462
2463        # An empty block is 12 bytes long
2464        block_total_length = 12 + len(block_body)
2465
2466        # Block Type
2467        block = block_type
2468        # Block Total Length$
2469        block += struct.pack(self.endian + "I", block_total_length)
2470        # Block Body
2471        block += block_body
2472        # Block Total Length$
2473        block += struct.pack(self.endian + "I", block_total_length)
2474
2475        return block
2476
2477    def _write_header(self, pkt):
2478        # type: (Optional[Union[Packet, bytes]]) -> None
2479        if not self.header_present:
2480            self.header_present = True
2481            self._write_block_shb()
2482            self._write_block_idb(linktype=self.linktype)
2483
2484    def _write_block_shb(self):
2485        # type: () -> None
2486
2487        # Block Type
2488        block_type = b"\x0A\x0D\x0D\x0A"
2489        # Byte-Order Magic
2490        block_shb = self.endian_magic
2491        # Major Version
2492        block_shb += struct.pack(self.endian + "H", 1)
2493        # Minor Version
2494        block_shb += struct.pack(self.endian + "H", 0)
2495        # Section Length
2496        block_shb += struct.pack(self.endian + "q", -1)
2497
2498        self.f.write(self.build_block(block_type, block_shb))
2499
2500    def _write_block_idb(self,
2501                         linktype,  # type: int
2502                         ifname=None  # type: Optional[bytes]
2503                         ):
2504        # type: (...) -> None
2505
2506        # Block Type
2507        block_type = struct.pack(self.endian + "I", 1)
2508        # LinkType
2509        block_idb = struct.pack(self.endian + "H", linktype)
2510        # Reserved
2511        block_idb += struct.pack(self.endian + "H", 0)
2512        # SnapLen
2513        block_idb += struct.pack(self.endian + "I", 262144)
2514
2515        # if_name option
2516        opts = None
2517        if ifname is not None:
2518            opts = struct.pack(self.endian + "HH", 2, len(ifname))
2519            # Pad Option Value to 32 bits
2520            opts += self._add_padding(ifname)
2521            opts += struct.pack(self.endian + "HH", 0, 0)
2522
2523        self.f.write(self.build_block(block_type, block_idb, options=opts))
2524
2525    def _write_block_spb(self, raw_pkt):
2526        # type: (bytes) -> None
2527
2528        # Block Type
2529        block_type = struct.pack(self.endian + "I", 3)
2530        # Original Packet Length
2531        block_spb = struct.pack(self.endian + "I", len(raw_pkt))
2532        # Packet Data
2533        block_spb += raw_pkt
2534
2535        self.f.write(self.build_block(block_type, block_spb))
2536
2537    def _write_block_epb(self,
2538                         raw_pkt,  # type: bytes
2539                         ifid,  # type: int
2540                         timestamp=None,  # type: Optional[Union[EDecimal, float]]  # noqa: E501
2541                         caplen=None,  # type: Optional[int]
2542                         orglen=None,  # type: Optional[int]
2543                         comment=None,  # type: Optional[bytes]
2544                         flags=None,  # type: Optional[int]
2545                         ):
2546        # type: (...) -> None
2547
2548        if timestamp:
2549            tmp_ts = int(timestamp * self.tsresol)
2550            ts_high = tmp_ts >> 32
2551            ts_low = tmp_ts & 0xFFFFFFFF
2552        else:
2553            ts_high = ts_low = 0
2554
2555        if not caplen:
2556            caplen = len(raw_pkt)
2557
2558        if not orglen:
2559            orglen = len(raw_pkt)
2560
2561        # Block Type
2562        block_type = struct.pack(self.endian + "I", 6)
2563        # Interface ID
2564        block_epb = struct.pack(self.endian + "I", ifid)
2565        # Timestamp (High)
2566        block_epb += struct.pack(self.endian + "I", ts_high)
2567        # Timestamp (Low)
2568        block_epb += struct.pack(self.endian + "I", ts_low)
2569        # Captured Packet Length
2570        block_epb += struct.pack(self.endian + "I", caplen)
2571        # Original Packet Length
2572        block_epb += struct.pack(self.endian + "I", orglen)
2573        # Packet Data
2574        block_epb += raw_pkt
2575
2576        # Options
2577        opts = b''
2578        if comment is not None:
2579            comment = bytes_encode(comment)
2580            opts += struct.pack(self.endian + "HH", 1, len(comment))
2581            # Pad Option Value to 32 bits
2582            opts += self._add_padding(comment)
2583        if type(flags) == int:
2584            opts += struct.pack(self.endian + "HH", 2, 4)
2585            opts += struct.pack(self.endian + "I", flags)
2586        if opts:
2587            opts += struct.pack(self.endian + "HH", 0, 0)
2588
2589        self.f.write(self.build_block(block_type, block_epb,
2590                                      options=opts))
2591
2592    def _write_packet(self,  # type: ignore
2593                      packet,  # type: bytes
2594                      linktype,  # type: int
2595                      sec=None,  # type: Optional[float]
2596                      usec=None,  # type: Optional[int]
2597                      caplen=None,  # type: Optional[int]
2598                      wirelen=None,  # type: Optional[int]
2599                      comment=None,  # type: Optional[bytes]
2600                      ifname=None,  # type: Optional[bytes]
2601                      direction=None,  # type: Optional[int]
2602                      ):
2603        # type: (...) -> None
2604        """
2605        Writes a single packet to the pcap file.
2606
2607        :param packet: bytes for a single packet
2608        :type packet: bytes
2609        :param linktype: linktype value associated with the packet
2610        :type linktype: int
2611        :param sec: time the packet was captured, in seconds since epoch. If
2612                    not supplied, defaults to now.
2613        :type sec: float
2614        :param caplen: The length of the packet in the capture file. If not
2615                       specified, uses ``len(packet)``.
2616        :type caplen: int
2617        :param wirelen: The length of the packet on the wire. If not
2618                        specified, uses ``caplen``.
2619        :type wirelen: int
2620        :param comment: UTF-8 string containing human-readable comment text
2621                        that is associated to the current block. Line separators
2622                        SHOULD be a carriage-return + linefeed ('\r\n') or
2623                        just linefeed ('\n'); either form may appear and
2624                        be considered a line separator. The string is not
2625                        zero-terminated.
2626        :type bytes
2627        :param ifname: UTF-8 string containing the
2628                       name of the device used to capture data.
2629                       The string is not zero-terminated.
2630        :type bytes
2631        :param direction:  0 = information not available,
2632                           1 = inbound,
2633                           2 = outbound
2634        :type int
2635        :return: None
2636        :rtype: None
2637        """
2638        if caplen is None:
2639            caplen = len(packet)
2640        if wirelen is None:
2641            wirelen = caplen
2642
2643        ifid = self.interfaces2id.get(ifname, None)
2644        if ifid is None:
2645            ifid = max(self.interfaces2id.values()) + 1
2646            self.interfaces2id[ifname] = ifid
2647            self._write_block_idb(linktype=linktype, ifname=ifname)
2648
2649        # EPB flags (32 bits).
2650        # currently only direction is implemented (least 2 significant bits)
2651        if type(direction) == int:
2652            flags = direction & 0x3
2653        else:
2654            flags = None
2655
2656        self._write_block_epb(packet, timestamp=sec, caplen=caplen,
2657                              orglen=wirelen, comment=comment, ifid=ifid, flags=flags)
2658        if self.sync:
2659            self.f.flush()
2660
2661
2662class PcapWriter(RawPcapWriter):
2663    """A stream PCAP writer with more control than wrpcap()"""
2664    pass
2665
2666
2667class PcapNgWriter(RawPcapNgWriter):
2668    """A stream pcapng writer with more control than wrpcapng()"""
2669
2670    def _get_time(self,
2671                  packet,  # type: Union[bytes, Packet]
2672                  sec,  # type: Optional[float]
2673                  usec  # type: Optional[int]
2674                  ):
2675        # type: (...) -> Tuple[float, int]
2676        if hasattr(packet, "time"):
2677            if sec is None:
2678                sec = float(packet.time)
2679
2680        if usec is None:
2681            usec = 0
2682
2683        return sec, usec  # type: ignore
2684
2685
2686@conf.commands.register
2687def rderf(filename, count=-1):
2688    # type: (Union[IO[bytes], str], int) -> PacketList
2689    """Read a ERF file and return a packet list
2690
2691    :param count: read only <count> packets
2692    """
2693    with ERFEthernetReader(filename) as fdesc:
2694        return fdesc.read_all(count=count)
2695
2696
2697class ERFEthernetReader_metaclass(PcapReader_metaclass):
2698    def __call__(cls, filename):
2699        # type: (Union[IO[bytes], str]) -> Any
2700        i = cls.__new__(cls, cls.__name__, cls.__bases__, cls.__dict__)  # type: ignore
2701        filename, fdesc = cls.open(filename)
2702        try:
2703            i.__init__(filename, fdesc)
2704            return i
2705        except (Scapy_Exception, EOFError):
2706            pass
2707
2708        if "alternative" in cls.__dict__:
2709            cls = cls.__dict__["alternative"]
2710            i = cls.__new__(
2711                cls,
2712                cls.__name__,
2713                cls.__bases__,
2714                cls.__dict__  # type: ignore
2715            )
2716            try:
2717                i.__init__(filename, fdesc)
2718                return i
2719            except (Scapy_Exception, EOFError):
2720                pass
2721
2722        raise Scapy_Exception("Not a supported capture file")
2723
2724    @staticmethod
2725    def open(fname  # type: ignore
2726             ):
2727        # type: (...) -> Tuple[str, _ByteStream]
2728        """Open (if necessary) filename"""
2729        if isinstance(fname, str):
2730            filename = fname
2731            try:
2732                with gzip.open(filename, "rb") as tmp:
2733                    tmp.read(1)
2734                fdesc = gzip.open(filename, "rb")  # type: _ByteStream
2735            except IOError:
2736                fdesc = open(filename, "rb")
2737
2738        else:
2739            fdesc = fname
2740            filename = getattr(fdesc, "name", "No name")
2741        return filename, fdesc
2742
2743
2744class ERFEthernetReader(PcapReader,
2745                        metaclass=ERFEthernetReader_metaclass):
2746
2747    def __init__(self, filename, fdesc=None):  # type: ignore
2748        # type: (Union[IO[bytes], str], IO[bytes]) -> None
2749        self.filename = filename  # type: ignore
2750        self.f = fdesc
2751        self.power = Decimal(10) ** Decimal(-9)
2752
2753    # time is in 64-bits Endace's format which can be see here:
2754    # https://www.endace.com/erf-extensible-record-format-types.pdf
2755    def _convert_erf_timestamp(self, t):
2756        # type: (int) -> EDecimal
2757        sec = t >> 32
2758        frac_sec = t & 0xffffffff
2759        frac_sec *= 10**9
2760        frac_sec += (frac_sec & 0x80000000) << 1
2761        frac_sec >>= 32
2762        return EDecimal(sec + self.power * frac_sec)
2763
2764    # The details of ERF Packet format can be see here:
2765    # https://www.endace.com/erf-extensible-record-format-types.pdf
2766    def read_packet(self, size=MTU, **kwargs):
2767        # type: (int, **Any) -> Packet
2768
2769        # General ERF Header have exactly 16 bytes
2770        hdr = self.f.read(16)
2771        if len(hdr) < 16:
2772            raise EOFError
2773
2774        # The timestamp is in little-endian byte-order.
2775        time = struct.unpack('<Q', hdr[:8])[0]
2776        # The rest is in big-endian byte-order.
2777        # Ignoring flags and lctr (loss counter) since they are ERF specific
2778        # header fields which Packet object does not support.
2779        type, _, rlen, _, wlen = struct.unpack('>BBHHH', hdr[8:])
2780        # Check if the type != 0x02, type Ethernet
2781        if type & 0x02 == 0:
2782            raise Scapy_Exception("Invalid ERF Type (Not TYPE_ETH)")
2783
2784        # If there are extended headers, ignore it because Packet object does
2785        # not support it. Extended headers size is 8 bytes before the payload.
2786        if type & 0x80:
2787            _ = self.f.read(8)
2788            s = self.f.read(rlen - 24)
2789        else:
2790            s = self.f.read(rlen - 16)
2791
2792        # Ethernet has 2 bytes of padding containing `offset` and `pad`. Both
2793        # of the fields are disregarded by Endace.
2794        pb = s[2:size]
2795        from scapy.layers.l2 import Ether
2796        try:
2797            p = Ether(pb, **kwargs)  # type: Packet
2798        except KeyboardInterrupt:
2799            raise
2800        except Exception:
2801            if conf.debug_dissector:
2802                from scapy.sendrecv import debug
2803                debug.crashed_on = (Ether, s)
2804                raise
2805            if conf.raw_layer is None:
2806                # conf.raw_layer is set on import
2807                import scapy.packet  # noqa: F401
2808            p = conf.raw_layer(s)
2809
2810        p.time = self._convert_erf_timestamp(time)
2811        p.wirelen = wlen
2812
2813        return p
2814
2815
2816@conf.commands.register
2817def wrerf(filename,  # type: Union[IO[bytes], str]
2818          pkt,  # type: _PacketIterable
2819          *args,  # type: Any
2820          **kargs  # type: Any
2821          ):
2822    # type: (...) -> None
2823    """Write a list of packets to a ERF file
2824
2825    :param filename: the name of the file to write packets to, or an open,
2826        writable file-like object. The file descriptor will be
2827        closed at the end of the call, so do not use an object you
2828        do not want to close (e.g., running wrerf(sys.stdout, [])
2829        in interactive mode will crash Scapy).
2830    :param gz: set to 1 to save a gzipped capture
2831    :param append: append packets to the capture file instead of
2832        truncating it
2833    :param sync: do not bufferize writes to the capture file
2834    """
2835    with ERFEthernetWriter(filename, *args, **kargs) as fdesc:
2836        fdesc.write(pkt)
2837
2838
2839class ERFEthernetWriter(PcapWriter):
2840    """A stream ERF Ethernet writer with more control than wrerf()"""
2841
2842    def __init__(self,
2843                 filename,  # type: Union[IO[bytes], str]
2844                 gz=False,  # type: bool
2845                 append=False,  # type: bool
2846                 sync=False,  # type: bool
2847                 ):
2848        # type: (...) -> None
2849        """
2850        :param filename: the name of the file to write packets to, or an open,
2851            writable file-like object.
2852        :param gz: compress the capture on the fly
2853        :param append: append packets to the capture file instead of
2854            truncating it
2855        :param sync: do not bufferize writes to the capture file
2856        """
2857        super(ERFEthernetWriter, self).__init__(filename,
2858                                                gz=gz,
2859                                                append=append,
2860                                                sync=sync)
2861
2862    def write(self, pkt):  # type: ignore
2863        # type: (_PacketIterable) -> None
2864        """
2865        Writes a Packet, a SndRcvList object, or bytes to a ERF file.
2866
2867        :param pkt: Packet(s) to write (one record for each Packet)
2868        :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet
2869        """
2870        # Import here to avoid circular dependency
2871        from scapy.supersocket import IterSocket
2872        for p in IterSocket(pkt).iter:
2873            self.write_packet(p)
2874
2875    def write_packet(self, pkt):  # type: ignore
2876        # type: (Packet) -> None
2877
2878        if hasattr(pkt, "time"):
2879            sec = int(pkt.time)
2880            usec = int((int(round((pkt.time - sec) * 10**9)) << 32) / 10**9)
2881            t = (sec << 32) + usec
2882        else:
2883            t = int(time.time()) << 32
2884
2885        # There are 16 bytes of headers + 2 bytes of padding before the packets
2886        # payload.
2887        rlen = len(pkt) + 18
2888
2889        if hasattr(pkt, "wirelen"):
2890            wirelen = pkt.wirelen
2891        if wirelen is None:
2892            wirelen = rlen
2893
2894        self.f.write(struct.pack("<Q", t))
2895        self.f.write(struct.pack(">BBHHHH", 2, 0, rlen, 0, wirelen, 0))
2896        self.f.write(bytes(pkt))
2897        self.f.flush()
2898
2899    def close(self):
2900        # type: () -> Optional[Any]
2901        return self.f.close()
2902
2903
2904@conf.commands.register
2905def import_hexcap(input_string=None):
2906    # type: (Optional[str]) -> bytes
2907    """Imports a tcpdump like hexadecimal view
2908
2909    e.g: exported via hexdump() or tcpdump or wireshark's "export as hex"
2910
2911    :param input_string: String containing the hexdump input to parse. If None,
2912        read from standard input.
2913    """
2914    re_extract_hexcap = re.compile(r"^((0x)?[0-9a-fA-F]{2,}[ :\t]{,3}|) *(([0-9a-fA-F]{2} {,2}){,16})")  # noqa: E501
2915    p = ""
2916    try:
2917        if input_string:
2918            input_function = StringIO(input_string).readline
2919        else:
2920            input_function = input
2921        while True:
2922            line = input_function().strip()
2923            if not line:
2924                break
2925            try:
2926                p += re_extract_hexcap.match(line).groups()[2]  # type: ignore
2927            except Exception:
2928                warning("Parsing error during hexcap")
2929                continue
2930    except EOFError:
2931        pass
2932
2933    p = p.replace(" ", "")
2934    return hex_bytes(p)
2935
2936
2937@conf.commands.register
2938def wireshark(pktlist, wait=False, **kwargs):
2939    # type: (List[Packet], bool, **Any) -> Optional[Any]
2940    """
2941    Runs Wireshark on a list of packets.
2942
2943    See :func:`tcpdump` for more parameter description.
2944
2945    Note: this defaults to wait=False, to run Wireshark in the background.
2946    """
2947    return tcpdump(pktlist, prog=conf.prog.wireshark, wait=wait, **kwargs)
2948
2949
2950@conf.commands.register
2951def tdecode(
2952    pktlist,  # type: Union[IO[bytes], None, str, _PacketIterable]
2953    args=None,  # type: Optional[List[str]]
2954    **kwargs  # type: Any
2955):
2956    # type: (...) -> Any
2957    """
2958    Run tshark on a list of packets.
2959
2960    :param args: If not specified, defaults to ``tshark -V``.
2961
2962    See :func:`tcpdump` for more parameters.
2963    """
2964    if args is None:
2965        args = ["-V"]
2966    return tcpdump(pktlist, prog=conf.prog.tshark, args=args, **kwargs)
2967
2968
2969def _guess_linktype_name(value):
2970    # type: (int) -> str
2971    """Guess the DLT name from its value."""
2972    from scapy.libs.winpcapy import pcap_datalink_val_to_name
2973    return cast(bytes, pcap_datalink_val_to_name(value)).decode()
2974
2975
2976def _guess_linktype_value(name):
2977    # type: (str) -> int
2978    """Guess the value of a DLT name."""
2979    from scapy.libs.winpcapy import pcap_datalink_name_to_val
2980    val = cast(int, pcap_datalink_name_to_val(name.encode()))
2981    if val == -1:
2982        warning("Unknown linktype: %s. Using EN10MB", name)
2983        return DLT_EN10MB
2984    return val
2985
2986
2987@conf.commands.register
2988def tcpdump(
2989    pktlist=None,  # type: Union[IO[bytes], None, str, _PacketIterable]
2990    dump=False,  # type: bool
2991    getfd=False,  # type: bool
2992    args=None,  # type: Optional[List[str]]
2993    flt=None,  # type: Optional[str]
2994    prog=None,  # type: Optional[Any]
2995    getproc=False,  # type: bool
2996    quiet=False,  # type: bool
2997    use_tempfile=None,  # type: Optional[Any]
2998    read_stdin_opts=None,  # type: Optional[Any]
2999    linktype=None,  # type: Optional[Any]
3000    wait=True,  # type: bool
3001    _suppress=False  # type: bool
3002):
3003    # type: (...) -> Any
3004    """Run tcpdump or tshark on a list of packets.
3005
3006    When using ``tcpdump`` on OSX (``prog == conf.prog.tcpdump``), this uses a
3007    temporary file to store the packets. This works around a bug in Apple's
3008    version of ``tcpdump``: http://apple.stackexchange.com/questions/152682/
3009
3010    Otherwise, the packets are passed in stdin.
3011
3012    This function can be explicitly enabled or disabled with the
3013    ``use_tempfile`` parameter.
3014
3015    When using ``wireshark``, it will be called with ``-ki -`` to start
3016    immediately capturing packets from stdin.
3017
3018    Otherwise, the command will be run with ``-r -`` (which is correct for
3019    ``tcpdump`` and ``tshark``).
3020
3021    This can be overridden with ``read_stdin_opts``. This has no effect when
3022    ``use_tempfile=True``, or otherwise reading packets from a regular file.
3023
3024    :param pktlist: a Packet instance, a PacketList instance or a list of
3025        Packet instances. Can also be a filename (as a string), an open
3026        file-like object that must be a file format readable by
3027        tshark (Pcap, PcapNg, etc.) or None (to sniff)
3028    :param flt: a filter to use with tcpdump
3029    :param dump:    when set to True, returns a string instead of displaying it.
3030    :param getfd:   when set to True, returns a file-like object to read data
3031        from tcpdump or tshark from.
3032    :param getproc: when set to True, the subprocess.Popen object is returned
3033    :param args:    arguments (as a list) to pass to tshark (example for tshark:
3034        args=["-T", "json"]).
3035    :param prog:    program to use (defaults to tcpdump, will work with tshark)
3036    :param quiet:   when set to True, the process stderr is discarded
3037    :param use_tempfile: When set to True, always use a temporary file to store
3038        packets.
3039        When set to False, pipe packets through stdin.
3040        When set to None (default), only use a temporary file with
3041        ``tcpdump`` on OSX.
3042    :param read_stdin_opts: When set, a list of arguments needed to capture
3043        from stdin. Otherwise, attempts to guess.
3044    :param linktype: A custom DLT value or name, to overwrite the default
3045        values.
3046    :param wait: If True (default), waits for the process to terminate before
3047        returning to Scapy. If False, the process will be detached to the
3048        background. If dump, getproc or getfd is True, these have the same
3049        effect as ``wait=False``.
3050
3051    Examples::
3052
3053        >>> tcpdump([IP()/TCP(), IP()/UDP()])
3054        reading from file -, link-type RAW (Raw IP)
3055        16:46:00.474515 IP 127.0.0.1.20 > 127.0.0.1.80: Flags [S], seq 0, win 8192, length 0  # noqa: E501
3056        16:46:00.475019 IP 127.0.0.1.53 > 127.0.0.1.53: [|domain]
3057
3058        >>> tcpdump([IP()/TCP(), IP()/UDP()], prog=conf.prog.tshark)
3059          1   0.000000    127.0.0.1 -> 127.0.0.1    TCP 40 20->80 [SYN] Seq=0 Win=8192 Len=0  # noqa: E501
3060          2   0.000459    127.0.0.1 -> 127.0.0.1    UDP 28 53->53 Len=0
3061
3062    To get a JSON representation of a tshark-parsed PacketList(), one can::
3063
3064        >>> import json, pprint
3065        >>> json_data = json.load(tcpdump(IP(src="217.25.178.5",
3066        ...                                  dst="45.33.32.156"),
3067        ...                               prog=conf.prog.tshark,
3068        ...                               args=["-T", "json"],
3069        ...                               getfd=True))
3070        >>> pprint.pprint(json_data)
3071        [{u'_index': u'packets-2016-12-23',
3072          u'_score': None,
3073          u'_source': {u'layers': {u'frame': {u'frame.cap_len': u'20',
3074                                              u'frame.encap_type': u'7',
3075        [...]
3076                                              },
3077                                   u'ip': {u'ip.addr': u'45.33.32.156',
3078                                           u'ip.checksum': u'0x0000a20d',
3079        [...]
3080                                           u'ip.ttl': u'64',
3081                                           u'ip.version': u'4'},
3082                                   u'raw': u'Raw packet data'}},
3083          u'_type': u'pcap_file'}]
3084        >>> json_data[0]['_source']['layers']['ip']['ip.ttl']
3085        u'64'
3086    """
3087    getfd = getfd or getproc
3088    if prog is None:
3089        if not conf.prog.tcpdump:
3090            raise Scapy_Exception(
3091                "tcpdump is not available"
3092            )
3093        prog = [conf.prog.tcpdump]
3094    elif isinstance(prog, str):
3095        prog = [prog]
3096    else:
3097        raise ValueError("prog must be a string")
3098
3099    if linktype is not None:
3100        if isinstance(linktype, int):
3101            # Guess name from value
3102            try:
3103                linktype_name = _guess_linktype_name(linktype)
3104            except StopIteration:
3105                linktype = -1
3106        else:
3107            # Guess value from name
3108            if linktype.startswith("DLT_"):
3109                linktype = linktype[4:]
3110            linktype_name = linktype
3111            try:
3112                linktype = _guess_linktype_value(linktype)
3113            except KeyError:
3114                linktype = -1
3115        if linktype == -1:
3116            raise ValueError(
3117                "Unknown linktype. Try passing its datalink name instead"
3118            )
3119        prog += ["-y", linktype_name]
3120
3121    # Build Popen arguments
3122    if args is None:
3123        args = []
3124    else:
3125        # Make a copy of args
3126        args = list(args)
3127
3128    if flt is not None:
3129        # Check the validity of the filter
3130        if linktype is None and isinstance(pktlist, str):
3131            # linktype is unknown but required. Read it from file
3132            with PcapReader(pktlist) as rd:
3133                if isinstance(rd, PcapNgReader):
3134                    # Get the linktype from the first packet
3135                    try:
3136                        _, metadata = rd._read_packet()
3137                        linktype = metadata.linktype
3138                        if OPENBSD and linktype == 228:
3139                            linktype = DLT_RAW
3140                    except EOFError:
3141                        raise ValueError(
3142                            "Cannot get linktype from a PcapNg packet."
3143                        )
3144                else:
3145                    linktype = rd.linktype
3146        from scapy.arch.common import compile_filter
3147        compile_filter(flt, linktype=linktype)
3148        args.append(flt)
3149
3150    stdout = subprocess.PIPE if dump or getfd else None
3151    stderr = open(os.devnull) if quiet else None
3152    proc = None
3153
3154    if use_tempfile is None:
3155        # Apple's tcpdump cannot read from stdin, see:
3156        # http://apple.stackexchange.com/questions/152682/
3157        use_tempfile = DARWIN and prog[0] == conf.prog.tcpdump
3158
3159    if read_stdin_opts is None:
3160        if prog[0] == conf.prog.wireshark:
3161            # Start capturing immediately (-k) from stdin (-i -)
3162            read_stdin_opts = ["-ki", "-"]
3163        elif prog[0] == conf.prog.tcpdump and not OPENBSD:
3164            # Capture in packet-buffered mode (-U) from stdin (-r -)
3165            read_stdin_opts = ["-U", "-r", "-"]
3166        else:
3167            read_stdin_opts = ["-r", "-"]
3168    else:
3169        # Make a copy of read_stdin_opts
3170        read_stdin_opts = list(read_stdin_opts)
3171
3172    if pktlist is None:
3173        # sniff
3174        with ContextManagerSubprocess(prog[0], suppress=_suppress):
3175            proc = subprocess.Popen(
3176                prog + args,
3177                stdout=stdout,
3178                stderr=stderr,
3179            )
3180    elif isinstance(pktlist, str):
3181        # file
3182        with ContextManagerSubprocess(prog[0], suppress=_suppress):
3183            proc = subprocess.Popen(
3184                prog + ["-r", pktlist] + args,
3185                stdout=stdout,
3186                stderr=stderr,
3187            )
3188    elif use_tempfile:
3189        tmpfile = get_temp_file(  # type: ignore
3190            autoext=".pcap",
3191            fd=True
3192        )  # type: IO[bytes]
3193        try:
3194            tmpfile.writelines(
3195                iter(lambda: pktlist.read(1048576), b"")  # type: ignore
3196            )
3197        except AttributeError:
3198            pktlist = cast("_PacketIterable", pktlist)
3199            wrpcap(tmpfile, pktlist, linktype=linktype)
3200        else:
3201            tmpfile.close()
3202        with ContextManagerSubprocess(prog[0], suppress=_suppress):
3203            proc = subprocess.Popen(
3204                prog + ["-r", tmpfile.name] + args,
3205                stdout=stdout,
3206                stderr=stderr,
3207            )
3208    else:
3209        try:
3210            pktlist.fileno()  # type: ignore
3211            # pass the packet stream
3212            with ContextManagerSubprocess(prog[0], suppress=_suppress):
3213                proc = subprocess.Popen(
3214                    prog + read_stdin_opts + args,
3215                    stdin=pktlist,  # type: ignore
3216                    stdout=stdout,
3217                    stderr=stderr,
3218                )
3219        except (AttributeError, ValueError):
3220            # write the packet stream to stdin
3221            with ContextManagerSubprocess(prog[0], suppress=_suppress):
3222                proc = subprocess.Popen(
3223                    prog + read_stdin_opts + args,
3224                    stdin=subprocess.PIPE,
3225                    stdout=stdout,
3226                    stderr=stderr,
3227                )
3228            if proc is None:
3229                # An error has occurred
3230                return
3231            try:
3232                proc.stdin.writelines(  # type: ignore
3233                    iter(lambda: pktlist.read(1048576), b"")  # type: ignore
3234                )
3235            except AttributeError:
3236                wrpcap(proc.stdin, pktlist, linktype=linktype)  # type: ignore
3237            except UnboundLocalError:
3238                # The error was handled by ContextManagerSubprocess
3239                pass
3240            else:
3241                proc.stdin.close()  # type: ignore
3242    if proc is None:
3243        # An error has occurred
3244        return
3245    if dump:
3246        data = b"".join(
3247            iter(lambda: proc.stdout.read(1048576), b"")  # type: ignore
3248        )
3249        proc.terminate()
3250        return data
3251    if getproc:
3252        return proc
3253    if getfd:
3254        return proc.stdout
3255    if wait:
3256        proc.wait()
3257
3258
3259@conf.commands.register
3260def hexedit(pktlist):
3261    # type: (_PacketIterable) -> PacketList
3262    """Run hexedit on a list of packets, then return the edited packets."""
3263    f = get_temp_file()
3264    wrpcap(f, pktlist)
3265    with ContextManagerSubprocess(conf.prog.hexedit):
3266        subprocess.call([conf.prog.hexedit, f])
3267    rpktlist = rdpcap(f)
3268    os.unlink(f)
3269    return rpktlist
3270
3271
3272def get_terminal_width():
3273    # type: () -> Optional[int]
3274    """Get terminal width (number of characters) if in a window.
3275
3276    Notice: this will try several methods in order to
3277    support as many terminals and OS as possible.
3278    """
3279    sizex = shutil.get_terminal_size(fallback=(0, 0))[0]
3280    if sizex != 0:
3281        return sizex
3282    # Backups
3283    if WINDOWS:
3284        from ctypes import windll, create_string_buffer
3285        # http://code.activestate.com/recipes/440694-determine-size-of-console-window-on-windows/
3286        h = windll.kernel32.GetStdHandle(-12)
3287        csbi = create_string_buffer(22)
3288        res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi)
3289        if res:
3290            (bufx, bufy, curx, cury, wattr,
3291             left, top, right, bottom, maxx, maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw)  # noqa: E501
3292            sizex = right - left + 1
3293            # sizey = bottom - top + 1
3294            return sizex
3295        return sizex
3296    # We have various methods
3297    # COLUMNS is set on some terminals
3298    try:
3299        sizex = int(os.environ['COLUMNS'])
3300    except Exception:
3301        pass
3302    if sizex:
3303        return sizex
3304    # We can query TIOCGWINSZ
3305    try:
3306        import fcntl
3307        import termios
3308        s = struct.pack('HHHH', 0, 0, 0, 0)
3309        x = fcntl.ioctl(1, termios.TIOCGWINSZ, s)
3310        sizex = struct.unpack('HHHH', x)[1]
3311    except (IOError, ModuleNotFoundError):
3312        # If everything failed, return default terminal size
3313        sizex = 79
3314    return sizex
3315
3316
3317def pretty_list(rtlst,  # type: List[Tuple[Union[str, List[str]], ...]]
3318                header,  # type: List[Tuple[str, ...]]
3319                sortBy=0,  # type: Optional[int]
3320                borders=False,  # type: bool
3321                ):
3322    # type: (...) -> str
3323    """
3324    Pretty list to fit the terminal, and add header.
3325
3326    :param rtlst: a list of tuples. each tuple contains a value which can
3327        be either a string or a list of string.
3328    :param sortBy: the column id (starting with 0) which will be used for
3329        ordering
3330    :param borders: whether to put borders on the table or not
3331    """
3332    if borders:
3333        _space = "|"
3334    else:
3335        _space = "  "
3336    cols = len(header[0])
3337    # Windows has a fat terminal border
3338    _spacelen = len(_space) * (cols - 1) + int(WINDOWS)
3339    _croped = False
3340    if sortBy is not None:
3341        # Sort correctly
3342        rtlst.sort(key=lambda x: x[sortBy])
3343    # Resolve multi-values
3344    for i, line in enumerate(rtlst):
3345        ids = []  # type: List[int]
3346        values = []  # type: List[Union[str, List[str]]]
3347        for j, val in enumerate(line):
3348            if isinstance(val, list):
3349                ids.append(j)
3350                values.append(val or " ")
3351        if values:
3352            del rtlst[i]
3353            k = 0
3354            for ex_vals in zip_longest(*values, fillvalue=" "):
3355                if k:
3356                    extra_line = [" "] * cols
3357                else:
3358                    extra_line = list(line)  # type: ignore
3359                for j, h in enumerate(ids):
3360                    extra_line[h] = ex_vals[j]
3361                rtlst.insert(i + k, tuple(extra_line))
3362                k += 1
3363    rtslst = cast(List[Tuple[str, ...]], rtlst)
3364    # Append tag
3365    rtslst = header + rtslst
3366    # Detect column's width
3367    colwidth = [max(len(y) for y in x) for x in zip(*rtslst)]
3368    # Make text fit in box (if required)
3369    width = get_terminal_width()
3370    if conf.auto_crop_tables and width:
3371        width = width - _spacelen
3372        while sum(colwidth) > width:
3373            _croped = True
3374            # Needs to be cropped
3375            # Get the longest row
3376            i = colwidth.index(max(colwidth))
3377            # Get all elements of this row
3378            row = [len(x[i]) for x in rtslst]
3379            # Get biggest element of this row: biggest of the array
3380            j = row.index(max(row))
3381            # Re-build column tuple with the edited element
3382            t = list(rtslst[j])
3383            t[i] = t[i][:-2] + "_"
3384            rtslst[j] = tuple(t)
3385            # Update max size
3386            row[j] = len(t[i])
3387            colwidth[i] = max(row)
3388    if _croped:
3389        log_runtime.info("Table cropped to fit the terminal (conf.auto_crop_tables==True)")  # noqa: E501
3390    # Generate padding scheme
3391    fmt = _space.join(["%%-%ds" % x for x in colwidth])
3392    # Append separation line if needed
3393    if borders:
3394        rtslst.insert(1, tuple("-" * x for x in colwidth))
3395    # Compile
3396    return "\n".join(fmt % x for x in rtslst)
3397
3398
3399def human_size(x, fmt=".1f"):
3400    # type: (int, str) -> str
3401    """
3402    Convert a size in octets to a human string representation
3403    """
3404    units = ['K', 'M', 'G', 'T', 'P', 'E']
3405    if not x:
3406        return "0B"
3407    i = int(math.log(x, 2**10))
3408    if i and i < len(units):
3409        return format(x / 2**(10 * i), fmt) + units[i - 1]
3410    return str(x) + "B"
3411
3412
3413def __make_table(
3414    yfmtfunc,  # type: Callable[[int], str]
3415    fmtfunc,  # type: Callable[[int], str]
3416    endline,  # type: str
3417    data,  # type: List[Tuple[Packet, Packet]]
3418    fxyz,  # type: Callable[[Packet, Packet], Tuple[Any, Any, Any]]
3419    sortx=None,  # type: Optional[Callable[[str], Tuple[Any, ...]]]
3420    sorty=None,  # type: Optional[Callable[[str], Tuple[Any, ...]]]
3421    seplinefunc=None,  # type: Optional[Callable[[int, List[int]], str]]
3422    dump=False  # type: bool
3423):
3424    # type: (...) -> Optional[str]
3425    """Core function of the make_table suite, which generates the table"""
3426    vx = {}  # type: Dict[str, int]
3427    vy = {}  # type: Dict[str, Optional[int]]
3428    vz = {}  # type: Dict[Tuple[str, str], str]
3429    vxf = {}  # type: Dict[str, str]
3430
3431    tmp_len = 0
3432    for e in data:
3433        xx, yy, zz = [str(s) for s in fxyz(*e)]
3434        tmp_len = max(len(yy), tmp_len)
3435        vx[xx] = max(vx.get(xx, 0), len(xx), len(zz))
3436        vy[yy] = None
3437        vz[(xx, yy)] = zz
3438
3439    vxk = list(vx)
3440    vyk = list(vy)
3441    if sortx:
3442        vxk.sort(key=sortx)
3443    else:
3444        try:
3445            vxk.sort(key=int)
3446        except Exception:
3447            try:
3448                vxk.sort(key=atol)
3449            except Exception:
3450                vxk.sort()
3451    if sorty:
3452        vyk.sort(key=sorty)
3453    else:
3454        try:
3455            vyk.sort(key=int)
3456        except Exception:
3457            try:
3458                vyk.sort(key=atol)
3459            except Exception:
3460                vyk.sort()
3461
3462    s = ""
3463    if seplinefunc:
3464        sepline = seplinefunc(tmp_len, [vx[x] for x in vxk])
3465        s += sepline + "\n"
3466
3467    fmt = yfmtfunc(tmp_len)
3468    s += fmt % ""
3469    s += ' '
3470    for x in vxk:
3471        vxf[x] = fmtfunc(vx[x])
3472        s += vxf[x] % x
3473        s += ' '
3474    s += endline + "\n"
3475    if seplinefunc:
3476        s += sepline + "\n"
3477    for y in vyk:
3478        s += fmt % y
3479        s += ' '
3480        for x in vxk:
3481            s += vxf[x] % vz.get((x, y), "-")
3482            s += ' '
3483        s += endline + "\n"
3484    if seplinefunc:
3485        s += sepline + "\n"
3486
3487    if dump:
3488        return s
3489    else:
3490        print(s, end="")
3491        return None
3492
3493
3494def make_table(*args, **kargs):
3495    # type: (*Any, **Any) -> Optional[Any]
3496    return __make_table(
3497        lambda l: "%%-%is" % l,
3498        lambda l: "%%-%is" % l,
3499        "",
3500        *args,
3501        **kargs
3502    )
3503
3504
3505def make_lined_table(*args, **kargs):
3506    # type: (*Any, **Any) -> Optional[str]
3507    return __make_table(  # type: ignore
3508        lambda l: "%%-%is |" % l,
3509        lambda l: "%%-%is |" % l,
3510        "",
3511        *args,
3512        seplinefunc=lambda a, x: "+".join(
3513            '-' * (y + 2) for y in [a - 1] + x + [-2]
3514        ),
3515        **kargs
3516    )
3517
3518
3519def make_tex_table(*args, **kargs):
3520    # type: (*Any, **Any) -> Optional[str]
3521    return __make_table(  # type: ignore
3522        lambda l: "%s",
3523        lambda l: "& %s",
3524        "\\\\",
3525        *args,
3526        seplinefunc=lambda a, x: "\\hline",
3527        **kargs
3528    )
3529
3530####################
3531#   WHOIS CLIENT   #
3532####################
3533
3534
3535def whois(ip_address):
3536    # type: (str) -> bytes
3537    """Whois client for Python"""
3538    whois_ip = str(ip_address)
3539    try:
3540        query = socket.gethostbyname(whois_ip)
3541    except Exception:
3542        query = whois_ip
3543    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
3544    s.connect(("whois.ripe.net", 43))
3545    s.send(query.encode("utf8") + b"\r\n")
3546    answer = b""
3547    while True:
3548        d = s.recv(4096)
3549        answer += d
3550        if not d:
3551            break
3552    s.close()
3553    ignore_tag = b"remarks:"
3554    # ignore all lines starting with the ignore_tag
3555    lines = [line for line in answer.split(b"\n") if not line or (line and not line.startswith(ignore_tag))]  # noqa: E501
3556    # remove empty lines at the bottom
3557    for i in range(1, len(lines)):
3558        if not lines[-i].strip():
3559            del lines[-i]
3560        else:
3561            break
3562    return b"\n".join(lines[3:])
3563
3564####################
3565#     CLI utils    #
3566####################
3567
3568
3569class CLIUtil:
3570    """
3571    Provides a Util class to easily create simple CLI tools in Scapy,
3572    that can still be used as an API.
3573
3574    Doc:
3575        - override the ps1() function
3576        - register commands with the @CLIUtil.addcomment decorator
3577        - call the loop() function when ready
3578    """
3579
3580    def _depcheck(self) -> None:
3581        """
3582        Check that all dependencies are installed
3583        """
3584        try:
3585            import prompt_toolkit  # noqa: F401
3586        except ImportError:
3587            # okay we lie but prompt_toolkit is a dependency...
3588            raise ImportError("You need to have IPython installed to use the CLI")
3589
3590    # Okay let's do nice code
3591    commands: Dict[str, Callable[..., Any]] = {}
3592    # print output of command
3593    commands_output: Dict[str, Callable[..., str]] = {}
3594    # provides completion to command
3595    commands_complete: Dict[str, Callable[..., List[str]]] = {}
3596
3597    @staticmethod
3598    def _inspectkwargs(func: DecoratorCallable) -> None:
3599        """
3600        Internal function to parse arguments from the kwargs of the functions
3601        """
3602        func._flagnames = [  # type: ignore
3603            x.name for x in
3604            inspect.signature(func).parameters.values()
3605            if x.kind == inspect.Parameter.KEYWORD_ONLY
3606        ]
3607        func._flags = [  # type: ignore
3608            ("-%s" % x) if len(x) == 1 else ("--%s" % x)
3609            for x in func._flagnames  # type: ignore
3610        ]
3611
3612    @staticmethod
3613    def _parsekwargs(
3614        func: DecoratorCallable,
3615        args: List[str]
3616    ) -> Tuple[List[str], Dict[str, Literal[True]]]:
3617        """
3618        Internal function to parse CLI arguments of a function.
3619        """
3620        kwargs: Dict[str, Literal[True]] = {}
3621        if func._flags:  # type: ignore
3622            i = 0
3623            for arg in args:
3624                if arg in func._flags:  # type: ignore
3625                    i += 1
3626                    kwargs[func._flagnames[func._flags.index(arg)]] = True  # type: ignore  # noqa: E501
3627                    continue
3628                break
3629            args = args[i:]
3630        return args, kwargs
3631
3632    @classmethod
3633    def _parseallargs(
3634        cls,
3635        func: DecoratorCallable,
3636        cmd: str, args: List[str]
3637    ) -> Tuple[List[str], Dict[str, Literal[True]], Dict[str, Literal[True]]]:
3638        """
3639        Internal function to parse CLI arguments of both the function
3640        and its output function.
3641        """
3642        args, kwargs = cls._parsekwargs(func, args)
3643        outkwargs: Dict[str, Literal[True]] = {}
3644        if cmd in cls.commands_output:
3645            args, outkwargs = cls._parsekwargs(cls.commands_output[cmd], args)
3646        return args, kwargs, outkwargs
3647
3648    @classmethod
3649    def addcommand(
3650        cls,
3651        spaces: bool = False,
3652        globsupport: bool = False,
3653    ) -> Callable[[DecoratorCallable], DecoratorCallable]:
3654        """
3655        Decorator to register a command
3656        """
3657        def func(cmd: DecoratorCallable) -> DecoratorCallable:
3658            cls.commands[cmd.__name__] = cmd
3659            cmd._spaces = spaces  # type: ignore
3660            cmd._globsupport = globsupport  # type: ignore
3661            cls._inspectkwargs(cmd)
3662            if cmd._globsupport and not cmd._spaces:  # type: ignore
3663                raise ValueError("Cannot use globsupport without spaces.")
3664            return cmd
3665        return func
3666
3667    @classmethod
3668    def addoutput(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]:  # noqa: E501
3669        """
3670        Decorator to register a command output processor
3671        """
3672        def func(processor: DecoratorCallable) -> DecoratorCallable:
3673            cls.commands_output[cmd.__name__] = processor
3674            cls._inspectkwargs(processor)
3675            return processor
3676        return func
3677
3678    @classmethod
3679    def addcomplete(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]:  # noqa: E501
3680        """
3681        Decorator to register a command completor
3682        """
3683        def func(processor: DecoratorCallable) -> DecoratorCallable:
3684            cls.commands_complete[cmd.__name__] = processor
3685            return processor
3686        return func
3687
3688    def ps1(self) -> str:
3689        """
3690        Return the PS1 of the shell
3691        """
3692        return "> "
3693
3694    def close(self) -> None:
3695        """
3696        Function called on exiting
3697        """
3698        print("Exited")
3699
3700    def help(self, cmd: Optional[str] = None) -> None:
3701        """
3702        Return the help related to this CLI util
3703        """
3704        def _args(func: Any) -> str:
3705            flags = func._flags.copy()
3706            if func.__name__ in self.commands_output:
3707                flags += self.commands_output[func.__name__]._flags  # type: ignore
3708            return " %s%s" % (
3709                (
3710                    "%s " % " ".join("[%s]" % x for x in flags)
3711                    if flags else ""
3712                ),
3713                " ".join(
3714                    "<%s%s>" % (
3715                        x.name,
3716                        "?" if
3717                        (x.default is None or x.default != inspect.Parameter.empty)
3718                        else ""
3719                    )
3720                    for x in list(inspect.signature(func).parameters.values())[1:]
3721                    if x.name not in func._flagnames and x.name[0] != "_"
3722                )
3723            )
3724
3725        if cmd:
3726            if cmd not in self.commands:
3727                print("Unknown command '%s'" % cmd)
3728                return
3729            # help for one command
3730            func = self.commands[cmd]
3731            print("%s%s: %s" % (
3732                cmd,
3733                _args(func),
3734                func.__doc__ and func.__doc__.strip()
3735            ))
3736        else:
3737            header = "│ %s - Help │" % self.__class__.__name__
3738            print("┌" + "─" * (len(header) - 2) + "┐")
3739            print(header)
3740            print("└" + "─" * (len(header) - 2) + "┘")
3741            print(
3742                pretty_list(
3743                    [
3744                        (
3745                            cmd,
3746                            _args(func),
3747                            func.__doc__ and func.__doc__.strip().split("\n")[0] or ""
3748                        )
3749                        for cmd, func in self.commands.items()
3750                    ],
3751                    [("Command", "Arguments", "Description")]
3752                )
3753            )
3754
3755    def _completer(self) -> 'prompt_toolkit.completion.Completer':
3756        """
3757        Returns a prompt_toolkit custom completer
3758        """
3759        from prompt_toolkit.completion import Completer, Completion
3760
3761        class CLICompleter(Completer):
3762            def get_completions(cmpl, document, complete_event):  # type: ignore
3763                if not complete_event.completion_requested:
3764                    # Only activate when the user does <TAB>
3765                    return
3766                parts = document.text.split(" ")
3767                cmd = parts[0].lower()
3768                if cmd not in self.commands:
3769                    # We are trying to complete the command
3770                    for possible_cmd in (x for x in self.commands if x.startswith(cmd)):
3771                        yield Completion(possible_cmd, start_position=-len(cmd))
3772                else:
3773                    # We are trying to complete the command content
3774                    if len(parts) == 1:
3775                        return
3776                    args, _, _ = self._parseallargs(self.commands[cmd], cmd, parts[1:])
3777                    arg = " ".join(args)
3778                    if cmd in self.commands_complete:
3779                        for possible_arg in self.commands_complete[cmd](self, arg):
3780                            yield Completion(possible_arg, start_position=-len(arg))
3781                return
3782        return CLICompleter()
3783
3784    def loop(self, debug: int = 0) -> None:
3785        """
3786        Main command handling loop
3787        """
3788        from prompt_toolkit import PromptSession
3789        session = PromptSession(completer=self._completer())
3790
3791        while True:
3792            try:
3793                cmd = session.prompt(self.ps1()).strip()
3794            except KeyboardInterrupt:
3795                continue
3796            except EOFError:
3797                self.close()
3798                break
3799            args = cmd.split(" ")[1:]
3800            cmd = cmd.split(" ")[0].strip().lower()
3801            if not cmd:
3802                continue
3803            if cmd in ["help", "h", "?"]:
3804                self.help(" ".join(args))
3805                continue
3806            if cmd in "exit":
3807                break
3808            if cmd not in self.commands:
3809                print("Unknown command. Type help or ?")
3810            else:
3811                # check the number of arguments
3812                func = self.commands[cmd]
3813                args, kwargs, outkwargs = self._parseallargs(func, cmd, args)
3814                if func._spaces:  # type: ignore
3815                    args = [" ".join(args)]
3816                    # if globsupport is set, we might need to do several calls
3817                    if func._globsupport and "*" in args[0]:  # type: ignore
3818                        if args[0].count("*") > 1:
3819                            print("More than 1 glob star (*) is currently unsupported.")
3820                            continue
3821                        before, after = args[0].split("*", 1)
3822                        reg = re.compile(re.escape(before) + r".*" + after)
3823                        calls = [
3824                            [x] for x in
3825                            self.commands_complete[cmd](self, before)
3826                            if reg.match(x)
3827                        ]
3828                    else:
3829                        calls = [args]
3830                else:
3831                    calls = [args]
3832                # now iterate if required, call the function and print its output
3833                res = None
3834                for args in calls:
3835                    try:
3836                        res = func(self, *args, **kwargs)
3837                    except TypeError:
3838                        print("Bad number of arguments !")
3839                        self.help(cmd=cmd)
3840                        continue
3841                    except Exception as ex:
3842                        print("Command failed with error: %s" % ex)
3843                        if debug:
3844                            traceback.print_exception(ex)
3845                    try:
3846                        if res and cmd in self.commands_output:
3847                            self.commands_output[cmd](self, res, **outkwargs)
3848                    except Exception as ex:
3849                        print("Output processor failed with error: %s" % ex)
3850
3851
3852def AutoArgparse(func: DecoratorCallable) -> None:
3853    """
3854    Generate an Argparse call from a function, then call this function.
3855
3856    Notes:
3857
3858    - for the arguments to have a description, the sphinx docstring format
3859      must be used. See
3860      https://sphinx-rtd-tutorial.readthedocs.io/en/latest/docstrings.html
3861    - the arguments must be typed in Python (we ignore Sphinx-specific types)
3862      untyped arguments are ignored.
3863    - only types that would be supported by argparse are supported. The others
3864      are omitted.
3865    """
3866    argsdoc = {}
3867    if func.__doc__:
3868        # Sphinx doc format parser
3869        m = re.match(
3870            r"((?:.|\n)*?)(\n\s*:(?:param|type|raises|return|rtype)(?:.|\n)*)",
3871            func.__doc__.strip(),
3872        )
3873        if not m:
3874            desc = func.__doc__.strip()
3875        else:
3876            desc = m.group(1)
3877            sphinxargs = re.findall(
3878                r"\s*:(param|type|raises|return|rtype)\s*([^:]*):(.*)",
3879                m.group(2),
3880            )
3881            for argtype, argparam, argdesc in sphinxargs:
3882                argparam = argparam.strip()
3883                argdesc = argdesc.strip()
3884                if argtype == "param":
3885                    if not argparam:
3886                        raise ValueError(":param: without a name !")
3887                    argsdoc[argparam] = argdesc
3888    else:
3889        desc = ""
3890    # Now build the argparse.ArgumentParser
3891    parser = argparse.ArgumentParser(
3892        prog=func.__name__,
3893        description=desc,
3894        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
3895    )
3896    # Process the parameters
3897    positional = []
3898    for param in inspect.signature(func).parameters.values():
3899        if not param.annotation:
3900            continue
3901        parname = param.name
3902        paramkwargs = {}
3903        if param.annotation is bool:
3904            if param.default is True:
3905                parname = "no-" + parname
3906                paramkwargs["action"] = "store_false"
3907            else:
3908                paramkwargs["action"] = "store_true"
3909        elif param.annotation in [str, int, float]:
3910            paramkwargs["type"] = param.annotation
3911        else:
3912            continue
3913        if param.default != inspect.Parameter.empty:
3914            if param.kind == inspect.Parameter.POSITIONAL_ONLY:
3915                positional.append(param.name)
3916                paramkwargs["nargs"] = '?'
3917            else:
3918                parname = "--" + parname
3919            paramkwargs["default"] = param.default
3920        else:
3921            positional.append(param.name)
3922        if param.kind == inspect.Parameter.VAR_POSITIONAL:
3923            paramkwargs["action"] = "append"
3924        if param.name in argsdoc:
3925            paramkwargs["help"] = argsdoc[param.name]
3926        parser.add_argument(parname, **paramkwargs)  # type: ignore
3927    # Now parse the sys.argv parameters
3928    params = vars(parser.parse_args())
3929    # Act as in interactive mode
3930    conf.logLevel = 20
3931    from scapy.themes import DefaultTheme
3932    conf.color_theme = DefaultTheme()
3933    # And call the function
3934    try:
3935        func(
3936            *[params.pop(x) for x in positional],
3937            **{
3938                (k[3:] if k.startswith("no_") else k): v
3939                for k, v in params.items()
3940            }
3941        )
3942    except AssertionError as ex:
3943        print("ERROR: " + str(ex))
3944        parser.print_help()
3945
3946
3947#######################
3948#   PERIODIC SENDER   #
3949#######################
3950
3951
3952class PeriodicSenderThread(threading.Thread):
3953    def __init__(self, sock, pkt, interval=0.5, ignore_exceptions=True):
3954        # type: (Any, _PacketIterable, float, bool) -> None
3955        """ Thread to send packets periodically
3956
3957        Args:
3958            sock: socket where packet is sent periodically
3959            pkt: packet or list of packets to send
3960            interval: interval between two packets
3961        """
3962        if not isinstance(pkt, list):
3963            self._pkts = [cast("Packet", pkt)]  # type: _PacketIterable
3964        else:
3965            self._pkts = pkt
3966        self._socket = sock
3967        self._stopped = threading.Event()
3968        self._enabled = threading.Event()
3969        self._enabled.set()
3970        self._interval = interval
3971        self._ignore_exceptions = ignore_exceptions
3972        threading.Thread.__init__(self)
3973
3974    def enable(self):
3975        # type: () -> None
3976        self._enabled.set()
3977
3978    def disable(self):
3979        # type: () -> None
3980        self._enabled.clear()
3981
3982    def run(self):
3983        # type: () -> None
3984        while not self._stopped.is_set() and not self._socket.closed:
3985            for p in self._pkts:
3986                try:
3987                    if self._enabled.is_set():
3988                        self._socket.send(p)
3989                except (OSError, TimeoutError) as e:
3990                    if self._ignore_exceptions:
3991                        return
3992                    else:
3993                        raise e
3994                self._stopped.wait(timeout=self._interval)
3995                if self._stopped.is_set() or self._socket.closed:
3996                    break
3997
3998    def stop(self):
3999        # type: () -> None
4000        self._stopped.set()
4001        self.join(self._interval * 2)
4002
4003
4004class SingleConversationSocket(object):
4005    def __init__(self, o):
4006        # type: (Any) -> None
4007        self._inner = o
4008        self._tx_mutex = threading.RLock()
4009
4010    @property
4011    def __dict__(self):  # type: ignore
4012        return self._inner.__dict__
4013
4014    def __getattr__(self, name):
4015        # type: (str) -> Any
4016        return getattr(self._inner, name)
4017
4018    def sr1(self, *args, **kargs):
4019        # type: (*Any, **Any) -> Any
4020        with self._tx_mutex:
4021            return self._inner.sr1(*args, **kargs)
4022
4023    def sr(self, *args, **kargs):
4024        # type: (*Any, **Any) -> Any
4025        with self._tx_mutex:
4026            return self._inner.sr(*args, **kargs)
4027
4028    def send(self, x):
4029        # type: (Packet) -> Any
4030        with self._tx_mutex:
4031            try:
4032                return self._inner.send(x)
4033            except (ConnectionError, OSError) as e:
4034                self._inner.close()
4035                raise e
4036