1# SPDX-License-Identifier: GPL-2.0-only 2# This file is part of Scapy 3# See https://scapy.net/ for more information 4# Copyright (C) Gabriel Potter <gabriel[]potter[]fr> 5 6""" 7Packet sending and receiving libpcap/WinPcap. 8""" 9 10import os 11import platform 12import socket 13import struct 14import time 15 16from scapy.automaton import select_objects 17from scapy.compat import raw, plain_str 18from scapy.config import conf 19from scapy.consts import WINDOWS, LINUX, BSD, SOLARIS 20from scapy.data import ( 21 DLT_RAW_ALT, 22 DLT_RAW, 23 ETH_P_ALL, 24 MTU, 25) 26from scapy.error import ( 27 Scapy_Exception, 28 log_loading, 29 log_runtime, 30 warning, 31) 32from scapy.interfaces import ( 33 InterfaceProvider, 34 NetworkInterface, 35 _GlobInterfaceType, 36 network_name, 37) 38from scapy.packet import Packet 39from scapy.pton_ntop import inet_ntop 40from scapy.supersocket import SuperSocket 41from scapy.utils import str2mac, decode_locale_str 42 43import scapy.consts 44 45from typing import ( 46 Any, 47 Dict, 48 List, 49 NoReturn, 50 Optional, 51 Tuple, 52 Type, 53 cast, 54) 55 56if not scapy.consts.WINDOWS: 57 from fcntl import ioctl 58 59# AF_LINK is only available and provided on BSD (MAC) 60# but because we use its value elsewhere, let's patch it. 61if not hasattr(socket, "AF_LINK"): 62 socket.AF_LINK = 18 # type: ignore 63 64############ 65# COMMON # 66############ 67 68# From BSD net/bpf.h 69# BIOCIMMEDIATE = 0x80044270 70BIOCIMMEDIATE = -2147204496 71 72# https://github.com/the-tcpdump-group/libpcap/blob/master/pcap/pcap.h 73PCAP_IF_UP = 0x00000002 # interface is up 74_pcap_if_flags = [ 75 "LOOPBACK", 76 "UP", 77 "RUNNING", 78 "WIRELESS", 79 "OK", 80 "DISCONNECTED", 81 "NA" 82] 83 84 85class _L2libpcapSocket(SuperSocket): 86 __slots__ = ["pcap_fd", "lvl"] 87 88 def __init__(self, fd): 89 # type: (_PcapWrapper_libpcap) -> None 90 self.pcap_fd = fd 91 ll = self.pcap_fd.datalink() 92 if ll in conf.l2types: 93 self.LL = conf.l2types[ll] 94 if ll in [ 95 DLT_RAW, 96 DLT_RAW_ALT, 97 ]: 98 self.lvl = 3 99 else: 100 self.lvl = 2 101 else: 102 self.LL = conf.default_l2 103 warning( 104 "Unable to guess datalink type " 105 "(interface=%s linktype=%i). Using %s", 106 self.iface, ll, self.LL.name 107 ) 108 109 def recv_raw(self, x=MTU): 110 # type: (int) -> Tuple[Optional[Type[Packet]], Optional[bytes], Optional[float]] # noqa: E501 111 """ 112 Receives a packet, then returns a tuple containing 113 (cls, pkt_data, time) 114 """ 115 ts, pkt = self.pcap_fd.next() 116 if pkt is None: 117 return None, None, None 118 return self.LL, pkt, ts 119 120 def nonblock_recv(self, x=MTU): 121 # type: (int) -> Optional[Packet] 122 """Receives and dissect a packet in non-blocking mode.""" 123 self.pcap_fd.setnonblock(True) 124 p = self.recv(x) 125 self.pcap_fd.setnonblock(False) 126 return p 127 128 def fileno(self): 129 # type: () -> int 130 return self.pcap_fd.fileno() 131 132 @staticmethod 133 def select(sockets, remain=None): 134 # type: (List[SuperSocket], Optional[float]) -> List[SuperSocket] 135 return select_objects(sockets, remain) 136 137 def close(self): 138 # type: () -> None 139 if self.closed: 140 return 141 self.closed = True 142 if hasattr(self, "pcap_fd"): 143 # If failed to open, won't exist 144 self.pcap_fd.close() 145 146 147########## 148# PCAP # 149########## 150 151if WINDOWS: 152 NPCAP_PATH = "" 153 154if conf.use_pcap: 155 if WINDOWS: 156 # Windows specific 157 NPCAP_PATH = os.environ["WINDIR"] + "\\System32\\Npcap" 158 from scapy.libs.winpcapy import pcap_setmintocopy, pcap_getevent 159 else: 160 from scapy.libs.winpcapy import pcap_get_selectable_fd 161 from ctypes import POINTER, byref, create_string_buffer, c_ubyte, cast as ccast 162 163 # Part of the Winpcapy integration was inspired by phaethon/scapy 164 # but he destroyed the commit history, so there is no link to that 165 try: 166 from scapy.libs.winpcapy import ( 167 PCAP_ERRBUF_SIZE, 168 PCAP_ERROR, 169 PCAP_ERROR_NO_SUCH_DEVICE, 170 PCAP_ERROR_PERM_DENIED, 171 bpf_program, 172 pcap_close, 173 pcap_compile, 174 pcap_datalink, 175 pcap_findalldevs, 176 pcap_freealldevs, 177 pcap_geterr, 178 pcap_if_t, 179 pcap_lib_version, 180 pcap_next_ex, 181 pcap_open_live, 182 pcap_pkthdr, 183 pcap_setfilter, 184 pcap_setnonblock, 185 sockaddr_in, 186 sockaddr_in6, 187 ) 188 try: 189 from scapy.libs.winpcapy import pcap_inject 190 except ImportError: 191 # Fallback for Winpcap... (for how long?) 192 from scapy.libs.winpcapy import pcap_sendpacket as pcap_inject 193 194 def load_winpcapy(): 195 # type: () -> None 196 """This functions calls libpcap ``pcap_findalldevs`` function, 197 and extracts and parse all the data scapy will need 198 to build the Interface List. 199 200 The data will be stored in ``conf.cache_pcapiflist`` 201 """ 202 from scapy.fields import FlagValue 203 204 err = create_string_buffer(PCAP_ERRBUF_SIZE) 205 devs = POINTER(pcap_if_t)() 206 if_list = {} 207 if pcap_findalldevs(byref(devs), err) < 0: 208 return 209 try: 210 p = devs 211 # Iterate through the different interfaces 212 while p: 213 name = plain_str(p.contents.name) # GUID 214 description = plain_str( 215 p.contents.description or "" 216 ) # DESC 217 flags = p.contents.flags # FLAGS 218 ips = [] 219 mac = "" 220 itype = -1 221 a = p.contents.addresses 222 while a: 223 # IPv4 address 224 family = a.contents.addr.contents.sa_family 225 ap = a.contents.addr 226 if family == socket.AF_INET: 227 val = ccast(ap, POINTER(sockaddr_in)) 228 addr_raw = val.contents.sin_addr[:] 229 elif family == socket.AF_INET6: 230 val = ccast(ap, POINTER(sockaddr_in6)) 231 addr_raw = val.contents.sin6_addr[:] 232 elif family == socket.AF_LINK: 233 # Special case: MAC 234 # (AF_LINK is mostly BSD specific) 235 val = ap.contents.sa_data 236 mac = str2mac(bytes(bytearray(val[:6]))) 237 a = a.contents.next 238 continue 239 else: 240 # Unknown AF 241 a = a.contents.next 242 continue 243 addr = inet_ntop(family, bytes(bytearray(addr_raw))) 244 if addr != "0.0.0.0": 245 ips.append(addr) 246 a = a.contents.next 247 flags = FlagValue(flags, _pcap_if_flags) 248 if_list[name] = (description, ips, flags, mac, itype) 249 p = p.contents.next 250 conf.cache_pcapiflist = if_list 251 except Exception: 252 raise 253 finally: 254 pcap_freealldevs(devs) 255 except OSError: 256 conf.use_pcap = False 257 if WINDOWS: 258 if conf.interactive: 259 log_loading.critical( 260 "Npcap/Winpcap is not installed ! See " 261 "https://scapy.readthedocs.io/en/latest/installation.html#windows" # noqa: E501 262 ) 263 else: 264 if conf.interactive: 265 log_loading.critical( 266 "Libpcap is not installed!" 267 ) 268 else: 269 if WINDOWS: 270 # Detect Pcap version: check for Npcap 271 version = pcap_lib_version() 272 if b"winpcap" in version.lower(): 273 if os.path.exists(NPCAP_PATH + "\\wpcap.dll"): 274 warning("Winpcap is installed over Npcap. " 275 "Will use Winpcap (see 'Winpcap/Npcap conflicts' " 276 "in Scapy's docs)") 277 elif platform.release() != "XP": 278 warning("WinPcap is now deprecated (not maintained). " 279 "Please use Npcap instead") 280 elif b"npcap" in version.lower(): 281 conf.use_npcap = True 282 conf.loopback_name = conf.loopback_name = "Npcap Loopback Adapter" # noqa: E501 283 284if conf.use_pcap: 285 class _PcapWrapper_libpcap: # noqa: F811 286 """Wrapper for the libpcap calls""" 287 288 def __init__(self, 289 device, # type: _GlobInterfaceType 290 snaplen, # type: int 291 promisc, # type: bool 292 to_ms, # type: int 293 monitor=None, # type: Optional[bool] 294 ): 295 # type: (...) -> None 296 self.errbuf = create_string_buffer(PCAP_ERRBUF_SIZE) 297 self.iface = create_string_buffer( 298 network_name(device).encode("utf8") 299 ) 300 self.dtl = -1 301 if not WINDOWS or conf.use_npcap: 302 from scapy.libs.winpcapy import pcap_create 303 self.pcap = pcap_create(self.iface, self.errbuf) 304 if not self.pcap: 305 error = decode_locale_str(bytearray(self.errbuf).strip(b"\x00")) 306 if error: 307 raise OSError(error) 308 # Non-winpcap functions 309 from scapy.libs.winpcapy import ( 310 pcap_set_snaplen, 311 pcap_set_promisc, 312 pcap_set_timeout, 313 pcap_set_rfmon, 314 pcap_activate, 315 pcap_statustostr, 316 pcap_geterr, 317 ) 318 if pcap_set_snaplen(self.pcap, snaplen) != 0: 319 error = decode_locale_str(bytearray(self.errbuf).strip(b"\x00")) 320 if error: 321 raise OSError(error) 322 log_runtime.error("Could not set snaplen") 323 if pcap_set_promisc(self.pcap, promisc) != 0: 324 error = decode_locale_str(bytearray(self.errbuf).strip(b"\x00")) 325 if error: 326 raise OSError(error) 327 log_runtime.error("Could not set promisc") 328 if pcap_set_timeout(self.pcap, to_ms) != 0: 329 error = decode_locale_str(bytearray(self.errbuf).strip(b"\x00")) 330 if error: 331 raise OSError(error) 332 log_runtime.error("Could not set timeout") 333 if monitor: 334 if pcap_set_rfmon(self.pcap, 1) != 0: 335 error = decode_locale_str(bytearray(self.errbuf).strip(b"\x00")) 336 if error: 337 raise OSError(error) 338 log_runtime.error("Could not set monitor mode") 339 status = pcap_activate(self.pcap) 340 # status == 0 means success 341 # status < 0 means error 342 # status > 0 means success, but with a warning 343 if status < 0: 344 # self.iface, and strings we get back from 345 # pcap_geterr() and pcap_statustostr(), have the 346 # type "bytes". 347 # 348 # decode_locale_str() turns them into strings. 349 iface = decode_locale_str( 350 bytearray(self.iface).strip(b"\x00") 351 ) 352 errstr = decode_locale_str( 353 bytearray(pcap_geterr(self.pcap)).strip(b"\x00") 354 ) 355 statusstr = decode_locale_str( 356 bytearray(pcap_statustostr(status)).strip(b"\x00") 357 ) 358 if status == PCAP_ERROR: 359 errmsg = errstr 360 elif status == PCAP_ERROR_NO_SUCH_DEVICE: 361 errmsg = "%s: %s\n(%s)" % (iface, statusstr, errstr) 362 elif status == PCAP_ERROR_PERM_DENIED and errstr != "": 363 errmsg = "%s: %s\n(%s)" % (iface, statusstr, errstr) 364 else: 365 errmsg = "%s: %s" % (iface, statusstr) 366 raise OSError(errmsg) 367 else: 368 if WINDOWS and monitor: 369 raise OSError("On Windows, this feature requires NPcap !") 370 self.pcap = pcap_open_live(self.iface, 371 snaplen, promisc, to_ms, 372 self.errbuf) 373 error = decode_locale_str(bytearray(self.errbuf).strip(b"\x00")) 374 if error: 375 raise OSError(error) 376 377 if WINDOWS: 378 # On Windows, we need to cache whether there are still packets in the 379 # queue or not. When they aren't, then we select normally like on linux. 380 self.remaining = True 381 # Winpcap/Npcap exclusive: make every packet to be instantly 382 # returned, and not buffered within Winpcap/Npcap 383 pcap_setmintocopy(self.pcap, 0) 384 385 self.header = POINTER(pcap_pkthdr)() 386 self.pkt_data = POINTER(c_ubyte)() 387 self.bpf_program = bpf_program() 388 389 def next(self): 390 # type: () -> Tuple[Optional[float], Optional[bytes]] 391 """ 392 Returns the next packet as the tuple 393 (timestamp, raw_packet) 394 """ 395 c = pcap_next_ex( 396 self.pcap, 397 byref(self.header), 398 byref(self.pkt_data) 399 ) 400 if not c > 0: 401 self.remaining = False # we emptied the queue 402 return None, None 403 else: 404 self.remaining = True 405 ts = ( 406 self.header.contents.ts.tv_sec + 407 float(self.header.contents.ts.tv_usec) / 1e6 408 ) 409 pkt = bytes(bytearray( 410 self.pkt_data[:self.header.contents.len] 411 )) 412 return ts, pkt 413 __next__ = next 414 415 def datalink(self): 416 # type: () -> int 417 """Wrapper around pcap_datalink""" 418 if self.dtl == -1: 419 self.dtl = pcap_datalink(self.pcap) 420 return self.dtl 421 422 def fileno(self): 423 # type: () -> int 424 if WINDOWS: 425 if self.remaining: 426 # Still packets in the queue. Don't select 427 return -1 428 return cast(int, pcap_getevent(self.pcap)) 429 else: 430 # This does not exist under Windows 431 return cast(int, pcap_get_selectable_fd(self.pcap)) 432 433 def setfilter(self, f): 434 # type: (str) -> None 435 filter_exp = create_string_buffer(f.encode("utf8")) 436 if pcap_compile(self.pcap, byref(self.bpf_program), filter_exp, 1, -1) >= 0: # noqa: E501 437 if pcap_setfilter(self.pcap, byref(self.bpf_program)) >= 0: 438 # Success 439 return 440 errstr = decode_locale_str( 441 bytearray(pcap_geterr(self.pcap)).strip(b"\x00") 442 ) 443 raise Scapy_Exception("Cannot set filter: %s" % errstr) 444 445 def setnonblock(self, i): 446 # type: (bool) -> None 447 pcap_setnonblock(self.pcap, i, self.errbuf) 448 449 def send(self, x): 450 # type: (bytes) -> int 451 return pcap_inject(self.pcap, x, len(x)) # type: ignore 452 453 def close(self): 454 # type: () -> None 455 pcap_close(self.pcap) 456 open_pcap = _PcapWrapper_libpcap 457 458 class LibpcapProvider(InterfaceProvider): 459 """ 460 Load interfaces from Libpcap on non-Windows machines 461 """ 462 name = "libpcap" 463 libpcap = True 464 465 def load(self): 466 # type: () -> Dict[str, NetworkInterface] 467 if not conf.use_pcap or WINDOWS: 468 return {} 469 if not conf.cache_pcapiflist: 470 load_winpcapy() 471 data = {} 472 i = 0 473 for ifname, dat in conf.cache_pcapiflist.items(): 474 description, ips, flags, mac, itype = dat 475 i += 1 476 if LINUX or BSD or SOLARIS and not mac: 477 from scapy.arch.unix import get_if_raw_hwaddr 478 try: 479 itype, _mac = get_if_raw_hwaddr(ifname) 480 mac = str2mac(_mac) 481 except Exception: 482 # There are at least 3 different possible exceptions 483 mac = "00:00:00:00:00:00" 484 if_data = { 485 'name': ifname, 486 'description': description or ifname, 487 'network_name': ifname, 488 'index': i, 489 'mac': mac, 490 'type': itype, 491 'ips': ips, 492 'flags': flags 493 } 494 data[ifname] = NetworkInterface(self, if_data) 495 return data 496 497 def reload(self): 498 # type: () -> Dict[str, NetworkInterface] 499 if conf.use_pcap: 500 from scapy.arch.libpcap import load_winpcapy 501 load_winpcapy() 502 return self.load() 503 504 if not WINDOWS: 505 conf.ifaces.register_provider(LibpcapProvider) 506 507 # pcap sockets 508 509 class L2pcapListenSocket(_L2libpcapSocket): 510 desc = "read packets at layer 2 using libpcap" 511 512 def __init__(self, 513 iface=None, # type: Optional[_GlobInterfaceType] 514 type=ETH_P_ALL, # type: int 515 promisc=None, # type: Optional[bool] 516 filter=None, # type: Optional[str] 517 monitor=None, # type: Optional[bool] 518 ): 519 # type: (...) -> None 520 self.type = type 521 self.outs = None 522 if iface is None: 523 iface = conf.iface 524 self.iface = iface 525 if promisc is not None: 526 self.promisc = promisc 527 else: 528 self.promisc = conf.sniff_promisc 529 self.monitor = monitor 530 fd = open_pcap( 531 device=iface, 532 snaplen=MTU, 533 promisc=self.promisc, 534 to_ms=100, 535 monitor=self.monitor, 536 ) 537 super(L2pcapListenSocket, self).__init__(fd) 538 try: 539 if not WINDOWS: 540 ioctl( 541 self.pcap_fd.fileno(), 542 BIOCIMMEDIATE, 543 struct.pack("I", 1) 544 ) 545 except Exception: 546 pass 547 if type == ETH_P_ALL: # Do not apply any filter if Ethernet type is given # noqa: E501 548 if conf.except_filter: 549 if filter: 550 filter = "(%s) and not (%s)" % (filter, conf.except_filter) # noqa: E501 551 else: 552 filter = "not (%s)" % conf.except_filter 553 if filter: 554 self.pcap_fd.setfilter(filter) 555 556 def send(self, x): 557 # type: (Packet) -> NoReturn 558 raise Scapy_Exception( 559 "Can't send anything with L2pcapListenSocket" 560 ) 561 562 class L2pcapSocket(_L2libpcapSocket): 563 desc = "read/write packets at layer 2 using only libpcap" 564 565 def __init__(self, 566 iface=None, # type: Optional[_GlobInterfaceType] 567 type=ETH_P_ALL, # type: int 568 promisc=None, # type: Optional[bool] 569 filter=None, # type: Optional[str] 570 nofilter=0, # type: int 571 monitor=None # type: Optional[bool] 572 ): 573 # type: (...) -> None 574 if iface is None: 575 iface = conf.iface 576 self.iface = iface 577 self.type = type 578 if promisc is not None: 579 self.promisc = promisc 580 else: 581 self.promisc = conf.sniff_promisc 582 self.monitor = monitor 583 fd = open_pcap( 584 device=iface, 585 snaplen=MTU, 586 promisc=self.promisc, 587 to_ms=100, 588 monitor=self.monitor, 589 ) 590 super(L2pcapSocket, self).__init__(fd) 591 try: 592 if not WINDOWS: 593 ioctl( 594 self.pcap_fd.fileno(), 595 BIOCIMMEDIATE, 596 struct.pack("I", 1) 597 ) 598 except Exception: 599 pass 600 if nofilter: 601 if type != ETH_P_ALL: 602 # PF_PACKET stuff. Need to emulate this for pcap 603 filter = "ether proto %i" % type 604 else: 605 filter = None 606 else: 607 if conf.except_filter: 608 if filter: 609 filter = "(%s) and not (%s)" % (filter, conf.except_filter) # noqa: E501 610 else: 611 filter = "not (%s)" % conf.except_filter 612 if type != ETH_P_ALL: 613 # PF_PACKET stuff. Need to emulate this for pcap 614 if filter: 615 filter = "(ether proto %i) and (%s)" % (type, filter) 616 else: 617 filter = "ether proto %i" % type 618 self.filter = filter 619 if filter: 620 self.pcap_fd.setfilter(filter) 621 622 def send(self, x): 623 # type: (Packet) -> int 624 sx = raw(x) 625 try: 626 x.sent_time = time.time() 627 except AttributeError: 628 pass 629 return self.pcap_fd.send(sx) 630 631 class L3pcapSocket(L2pcapSocket): 632 desc = "read/write packets at layer 3 using only libpcap" 633 634 def __init__(self, *args, **kwargs): 635 # type: (*Any, **Any) -> None 636 super(L3pcapSocket, self).__init__(*args, **kwargs) 637 self.send_socks = {network_name(self.iface): self} 638 639 def recv(self, x=MTU, **kwargs): 640 # type: (int, **Any) -> Optional[Packet] 641 r = L2pcapSocket.recv(self, x, **kwargs) 642 if r and self.lvl == 2: 643 r.payload.time = r.time 644 return r.payload 645 return r 646 647 def send(self, x): 648 # type: (Packet) -> int 649 # Select the file descriptor to send the packet on. 650 iff = x.route()[0] 651 if iff is None: 652 iff = network_name(conf.iface) 653 type_x = type(x) 654 if iff not in self.send_socks: 655 self.send_socks[iff] = L3pcapSocket( 656 iface=iff, 657 type=self.type, 658 filter=self.filter, 659 promisc=self.promisc, 660 monitor=self.monitor, 661 ) 662 sock = self.send_socks[iff] 663 fd = sock.pcap_fd 664 if sock.lvl == 3: 665 if not issubclass(sock.LL, type_x): 666 warning("Incompatible L3 types detected using %s instead of %s !", 667 type_x, sock.LL) 668 sock.LL = type_x 669 if sock.lvl == 2: 670 sx = bytes(sock.LL() / x) 671 else: 672 sx = bytes(x) 673 # Now send. 674 try: 675 x.sent_time = time.time() 676 except AttributeError: 677 pass 678 return fd.send(sx) 679 680 @staticmethod 681 def select(sockets, remain=None): 682 # type: (List[SuperSocket], Optional[float]) -> List[SuperSocket] 683 socks = [] # type: List[SuperSocket] 684 for sock in sockets: 685 if isinstance(sock, L3pcapSocket): 686 socks += sock.send_socks.values() 687 else: 688 socks.append(sock) 689 return L2pcapSocket.select(socks, remain=remain) 690 691 def close(self): 692 # type: () -> None 693 if self.closed: 694 return 695 super(L3pcapSocket, self).close() 696 for fd in self.send_socks.values(): 697 if fd is not self: 698 fd.close() 699