## This file is part of Scapy ## See http://www.secdev.org/projects/scapy for more informations ## Copyright (C) Philippe Biondi ## Copyright (C) Gabriel Potter ## This program is published under a GPLv2 license """ Customizations needed to support Microsoft Windows. """ from __future__ import absolute_import from __future__ import print_function import os, re, sys, socket, time, itertools, platform import subprocess as sp from glob import glob import tempfile from threading import Thread, Event import scapy from scapy.config import conf, ConfClass from scapy.error import Scapy_Exception, log_loading, log_runtime, warning from scapy.utils import atol, itom, inet_aton, inet_ntoa, PcapReader, pretty_list from scapy.utils6 import construct_source_candidate_set from scapy.base_classes import Gen, Net, SetGen from scapy.data import MTU, ETHER_BROADCAST, ETH_P_ARP import scapy.modules.six as six from scapy.modules.six.moves import range, zip, input from scapy.compat import plain_str conf.use_pcap = False conf.use_dnet = False conf.use_winpcapy = True WINDOWS = (os.name == 'nt') NEW_RELEASE = None #hot-patching socket for missing variables on Windows import socket if not hasattr(socket, 'IPPROTO_IPIP'): socket.IPPROTO_IPIP=4 if not hasattr(socket, 'IPPROTO_AH'): socket.IPPROTO_AH=51 if not hasattr(socket, 'IPPROTO_ESP'): socket.IPPROTO_ESP=50 if not hasattr(socket, 'IPPROTO_GRE'): socket.IPPROTO_GRE=47 from scapy.arch import pcapdnet from scapy.arch.pcapdnet import * _WlanHelper = NPCAP_PATH + "\\WlanHelper.exe" import scapy.consts def is_new_release(ignoreVBS=False): if NEW_RELEASE and conf.prog.powershell is not None: return True release = platform.release() if conf.prog.powershell is None and not ignoreVBS: return False try: if float(release) >= 8: return True except ValueError: if (release=="post2008Server"): return True return False def _encapsulate_admin(cmd): """Encapsulate a command with an Administrator flag""" # To get admin access, we start a new powershell instance with admin # rights, which will execute the command return "Start-Process PowerShell -windowstyle hidden -Wait -Verb RunAs -ArgumentList '-command &{%s}'" % cmd class _PowershellManager(Thread): """Instance used to send multiple commands on the same Powershell process. Will be instantiated on loading and automatically stopped. """ def __init__(self): # Start & redirect input if conf.prog.powershell: self.process = sp.Popen([conf.prog.powershell, "-NoLogo", "-NonInteractive", # Do not print headers "-Command", "-"], # Listen commands from stdin stdout=sp.PIPE, stdin=sp.PIPE, stderr=sp.STDOUT) self.cmd = False else: # Fallback on CMD (powershell-only commands will fail, but scapy use the VBS fallback) self.process = sp.Popen([conf.prog.cmd], stdout=sp.PIPE, stdin=sp.PIPE, stderr=sp.STDOUT) self.cmd = True self.buffer = [] self.running = True self.query_complete = Event() Thread.__init__(self) self.daemon = True self.start() if self.cmd: self.query(["echo @off"]) # Remove header else: self.query(["$FormatEnumerationLimit=-1"]) # Do not crop long IP lists def run(self): while self.running: read_line = self.process.stdout.readline().strip() if read_line == b"scapy_end": self.query_complete.set() else: self.buffer.append(read_line.decode("utf8", "ignore") if six.PY3 else read_line) def query(self, command): self.query_complete.clear() if not self.running: self.__init__(self) # Call powershell query using running process self.buffer = [] # 'scapy_end' is used as a marker of the end of execution query = " ".join(command) + ("&" if self.cmd else ";") + " echo scapy_end\n" self.process.stdin.write(query.encode()) self.process.stdin.flush() self.query_complete.wait() return self.buffer[1:] # Crops first line: the command def close(self): self.running = False try: self.process.stdin.write("exit\n") self.process.terminate() except: pass def _exec_query_ps(cmd, fields): """Execute a PowerShell query, using the cmd command, and select and parse the provided fields. """ if not conf.prog.powershell: raise OSError("Scapy could not detect powershell !") # Build query query_cmd = cmd + ['|', 'select %s' % ', '.join(fields), # select fields '|', 'fl', # print as a list '|', 'out-string', '-Width', '4096'] # do not crop l=[] # Ask the powershell manager to process the query stdout = POWERSHELL_PROCESS.query(query_cmd) # Process stdout for line in stdout: if not line.strip(): # skip empty lines continue sl = line.split(':', 1) if len(sl) == 1: l[-1] += sl[0].strip() continue else: l.append(sl[1].strip()) if len(l) == len(fields): yield l l=[] def _vbs_exec_code(code, split_tag="@"): if not conf.prog.cscript: raise OSError("Scapy could not detect cscript !") tmpfile = tempfile.NamedTemporaryFile(mode="wb", suffix=".vbs", delete=False) tmpfile.write(raw(code)) tmpfile.close() ps = sp.Popen([conf.prog.cscript, tmpfile.name], stdout=sp.PIPE, stderr=open(os.devnull), universal_newlines=True) for _ in range(3): # skip 3 first lines ps.stdout.readline() for line in ps.stdout: data = line.replace("\n", "").split(split_tag) for l in data: yield l os.unlink(tmpfile.name) def _vbs_get_hardware_iface_guid(devid): try: devid = str(int(devid) + 1) guid = next(iter(_vbs_exec_code("""WScript.Echo CreateObject("WScript.Shell").RegRead("HKEY_LOCAL_MACHINE\\SOFTWARE\\Microsoft\\Windows NT\\CurrentVersion\\NetworkCards\\%s\\ServiceName") """ % devid))) guid = guid[:-1] if guid.endswith('}\n') else guid if guid.startswith('{') and guid.endswith('}'): return guid except StopIteration: return None # Some names differ between VBS and PS ## None: field will not be returned under VBS _VBS_WMI_FIELDS = { "Win32_NetworkAdapter": { "InterfaceDescription": "Description", # Note: when using VBS, the GUID is not the same than with Powershell # So we use get the device ID instead, then use _vbs_get_hardware_iface_guid # To get its real GUID "GUID": "DeviceID" }, "*": { "Status": "State" } } _VBS_WMI_REPLACE = { "Win32_NetworkAdapterConfiguration": { "line.IPAddress": "\"{\" & Join( line.IPAddress, \", \" ) & \"}\"", } } _VBS_WMI_OUTPUT = { "Win32_NetworkAdapter": { "DeviceID": _vbs_get_hardware_iface_guid, } } def _exec_query_vbs(cmd, fields): """Execute a query using VBS. Currently Get-WmiObject, Get-Service queries are supported. """ if not(len(cmd) == 2 and cmd[0] in ["Get-WmiObject", "Get-Service"]): return action = cmd[0] fields = [_VBS_WMI_FIELDS.get(cmd[1], _VBS_WMI_FIELDS.get("*", {})).get(fld, fld) for fld in fields] parsed_command = "WScript.Echo " + " & \" @ \" & ".join("line.%s" % fld for fld in fields if fld is not None) # The IPAddress is an array: convert it to a string for key,val in _VBS_WMI_REPLACE.get(cmd[1], {}).items(): parsed_command = parsed_command.replace(key, val) if action == "Get-WmiObject": values = _vbs_exec_code("""Set wmi = GetObject("winmgmts:") Set lines = wmi.InstancesOf("%s") On Error Resume Next Err.clear For Each line in lines %s Next """ % (cmd[1], parsed_command), "@") elif action == "Get-Service": values = _vbs_exec_code("""serviceName = "%s" Set wmi = GetObject("winmgmts://./root/cimv2") Set line = wmi.Get("Win32_Service.Name='" & serviceName & "'") %s """ % (cmd[1], parsed_command), "@") while True: yield [None if fld is None else _VBS_WMI_OUTPUT.get(cmd[1], {}).get(fld, lambda x: x)( next(values).strip() ) for fld in fields] def exec_query(cmd, fields): """Execute a system query using PowerShell if it is available, and using VBS/cscript as a fallback. """ if conf.prog.powershell is None: return _exec_query_vbs(cmd, fields) return _exec_query_ps(cmd, fields) def _where(filename, dirs=None, env="PATH"): """Find file in current dir, in deep_lookup cache or in system path""" if dirs is None: dirs = [] if not isinstance(dirs, list): dirs = [dirs] if glob(filename): return filename paths = [os.curdir] + os.environ[env].split(os.path.pathsep) + dirs for path in paths: for match in glob(os.path.join(path, filename)): if match: return os.path.normpath(match) raise IOError("File not found: %s" % filename) def win_find_exe(filename, installsubdir=None, env="ProgramFiles"): """Find executable in current dir, system path or given ProgramFiles subdir""" fns = [filename] if filename.endswith(".exe") else [filename+".exe", filename] for fn in fns: try: if installsubdir is None: path = _where(fn) else: path = _where(fn, dirs=[os.path.join(os.environ[env], installsubdir)]) except IOError: path = None else: break return path class WinProgPath(ConfClass): _default = "" def __init__(self): self._reload() def _reload(self): # We try some magic to find the appropriate executables self.pdfreader = win_find_exe("AcroRd32") self.psreader = win_find_exe("gsview32") self.dot = win_find_exe("dot") self.tcpdump = win_find_exe("windump") self.tshark = win_find_exe("tshark") self.tcpreplay = win_find_exe("tcpreplay") self.display = self._default self.hexedit = win_find_exe("hexer") self.sox = win_find_exe("sox") self.wireshark = win_find_exe("wireshark", "wireshark") self.powershell = win_find_exe( "powershell", installsubdir="System32\\WindowsPowerShell\\v1.0", env="SystemRoot" ) self.cscript = win_find_exe("cscript", installsubdir="System32", env="SystemRoot") self.cmd = win_find_exe("cmd", installsubdir="System32", env="SystemRoot") if self.wireshark: manu_path = load_manuf(os.path.sep.join(self.wireshark.split(os.path.sep)[:-1])+os.path.sep+"manuf") scapy.data.MANUFDB = conf.manufdb = manu_path self.os_access = (self.powershell is not None) or (self.cscript is not None) conf.prog = WinProgPath() if not conf.prog.os_access: warning("Scapy did not detect powershell and cscript ! Routes, interfaces and much more won't work !", onlyOnce=True) if conf.prog.tcpdump and conf.use_npcap and conf.prog.os_access: def test_windump_npcap(): """Return wether windump version is correct or not""" try: p_test_windump = sp.Popen([conf.prog.tcpdump, "-help"], stdout=sp.PIPE, stderr=sp.STDOUT) stdout, err = p_test_windump.communicate() _output = stdout.lower() return b"npcap" in _output and not b"winpcap" in _output except: return False windump_ok = test_windump_npcap() if not windump_ok: warning("The installed Windump version does not work with Npcap ! Refer to 'Winpcap/Npcap conflicts' in scapy's doc", onlyOnce=True) del windump_ok # Auto-detect release NEW_RELEASE = is_new_release() class PcapNameNotFoundError(Scapy_Exception): pass def is_interface_valid(iface): if "guid" in iface and iface["guid"]: # Fix '-' instead of ':' if "mac" in iface: iface["mac"] = iface["mac"].replace("-", ":") return True return False def get_windows_if_list(): """Returns windows interfaces.""" if not conf.prog.os_access: return [] if is_new_release(): # This works only starting from Windows 8/2012 and up. For older Windows another solution is needed # Careful: this is weird, but Get-NetAdaptater works like: (Name isn't the interface name) # Name InterfaceDescription ifIndex Status MacAddress LinkSpeed # ---- -------------------- ------- ------ ---------- --------- # Ethernet Killer E2200 Gigabit Ethernet Contro... 13 Up D0-50-99-56-DD-F9 1 Gbps query = exec_query(['Get-NetAdapter'], ['InterfaceDescription', 'InterfaceIndex', 'Name', 'InterfaceGuid', 'MacAddress', 'InterfaceAlias']) # It is normal that it is in this order else: query = exec_query(['Get-WmiObject', 'Win32_NetworkAdapter'], ['Name', 'InterfaceIndex', 'InterfaceDescription', 'GUID', 'MacAddress', 'NetConnectionID']) return [ iface for iface in (dict(zip(['name', 'win_index', 'description', 'guid', 'mac', 'netid'], line)) for line in query) if is_interface_valid(iface) ] def get_ips(v6=False): """Returns all available IPs matching to interfaces, using the windows system. Should only be used as a WinPcapy fallback.""" res = {} for descr, ipaddr in exec_query(['Get-WmiObject', 'Win32_NetworkAdapterConfiguration'], ['Description', 'IPAddress']): if ipaddr.strip(): res[descr] = ipaddr.split(",", 1)[v6].strip('{}').strip() return res def get_ip_from_name(ifname, v6=False): """Backward compatibility: indirectly calls get_ips Deprecated.""" return get_ips(v6=v6).get(ifname, "") class NetworkInterface(object): """A network interface of your local host""" def __init__(self, data=None): self.name = None self.ip = None self.mac = None self.pcap_name = None self.description = None self.data = data self.invalid = False self.raw80211 = None if data is not None: self.update(data) def update(self, data): """Update info about network interface according to given dnet dictionary""" if 'netid' in data and data['netid'] == scapy.consts.LOOPBACK_NAME: # Force LOOPBACK_NAME: Some Windows systems overwrite 'name' self.name = scapy.consts.LOOPBACK_NAME else: self.name = data['name'] self.description = data['description'] self.win_index = data['win_index'] self.guid = data['guid'] if 'invalid' in data: self.invalid = data['invalid'] # Other attributes are optional self._update_pcapdata() try: # Npcap loopback interface if self.name == scapy.consts.LOOPBACK_NAME and conf.use_npcap: # https://nmap.org/npcap/guide/npcap-devguide.html self.mac = "00:00:00:00:00:00" self.ip = "127.0.0.1" conf.cache_ipaddrs[self.pcap_name] = socket.inet_aton(self.ip) return else: self.mac = data['mac'] except KeyError: pass try: self.ip = socket.inet_ntoa(get_if_raw_addr(self)) except (TypeError, NameError): pass try: # Windows native loopback interface if not self.ip and self.name == scapy.consts.LOOPBACK_NAME: self.ip = "127.0.0.1" conf.cache_ipaddrs[self.pcap_name] = socket.inet_aton(self.ip) except (KeyError, AttributeError, NameError) as e: print(e) def _update_pcapdata(self): if self.is_invalid(): return for i in get_if_list(): if i.endswith(self.data['guid']): self.pcap_name = i return raise PcapNameNotFoundError def is_invalid(self): return self.invalid def _check_npcap_requirement(self): if not conf.use_npcap: raise OSError("This operation requires Npcap.") if self.raw80211 is None: # This checks if npcap has Dot11 enabled and if the interface is compatible, # by looking for the npcap/Parameters/Dot11Adapters key in the registry. try: dot11adapters = next(iter(_vbs_exec_code("""WScript.Echo CreateObject("WScript.Shell").RegRead("HKEY_LOCAL_MACHINE\\SYSTEM\\CurrentControlSet\\Services\\npcap\\Parameters\\Dot11Adapters")"""))) except StopIteration: pass else: self.raw80211 = ("\\Device\\" + self.guid).lower() in dot11adapters.lower() if not self.raw80211: raise Scapy_Exception("This interface does not support raw 802.11") def mode(self): """Get the interface operation mode. Only available with Npcap.""" self._check_npcap_requirement() return sp.Popen([_WlanHelper, self.guid[1:-1], "mode"], stdout=sp.PIPE).communicate()[0].strip() def availablemodes(self): """Get all available interface modes. Only available with Npcap.""" # According to https://nmap.org/npcap/guide/npcap-devguide.html#npcap-feature-dot11 self._check_npcap_requirement() return sp.Popen([_WlanHelper, self.guid[1:-1], "modes"], stdout=sp.PIPE).communicate()[0].strip().split(",") def setmode(self, mode): """Set the interface mode. It can be: - 0 or managed: Managed Mode (aka "Extensible Station Mode") - 1 or monitor: Monitor Mode (aka "Network Monitor Mode") - 2 or master: Master Mode (aka "Extensible Access Point") (supported from Windows 7 and later) - 3 or wfd_device: The Wi-Fi Direct Device operation mode (supported from Windows 8 and later) - 4 or wfd_owner: The Wi-Fi Direct Group Owner operation mode (supported from Windows 8 and later) - 5 or wfd_client: The Wi-Fi Direct Client operation mode (supported from Windows 8 and later) Only available with Npcap.""" # According to https://nmap.org/npcap/guide/npcap-devguide.html#npcap-feature-dot11 self._check_npcap_requirement() _modes = { 0: "managed", 1: "monitor", 2: "master", 3: "wfd_device", 4: "wfd_owner", 5: "wfd_client" } m = _modes.get(mode, "unknown") if isinstance(mode, int) else mode return sp.call(_WlanHelper + " " + self.guid[1:-1] + " mode " + m) def channel(self): """Get the channel of the interface. Only available with Npcap.""" # According to https://nmap.org/npcap/guide/npcap-devguide.html#npcap-feature-dot11 self._check_npcap_requirement() return sp.Popen([_WlanHelper, self.guid[1:-1], "channel"], stdout=sp.PIPE).communicate()[0].strip() def setchannel(self, channel): """Set the channel of the interface (1-14): Only available with Npcap.""" # According to https://nmap.org/npcap/guide/npcap-devguide.html#npcap-feature-dot11 self._check_npcap_requirement() return sp.call(_WlanHelper + " " + self.guid[1:-1] + " channel " + str(channel)) def frequence(self): """Get the frequence of the interface. Only available with Npcap.""" # According to https://nmap.org/npcap/guide/npcap-devguide.html#npcap-feature-dot11 self._check_npcap_requirement() return sp.Popen([_WlanHelper, self.guid[1:-1], "freq"], stdout=sp.PIPE).communicate()[0].strip() def setfrequence(self, freq): """Set the channel of the interface (1-14): Only available with Npcap.""" # According to https://nmap.org/npcap/guide/npcap-devguide.html#npcap-feature-dot11 self._check_npcap_requirement() return sp.call(_WlanHelper + " " + self.guid[1:-1] + " freq " + str(freq)) def availablemodulations(self): """Get all available 802.11 interface modulations. Only available with Npcap.""" # According to https://nmap.org/npcap/guide/npcap-devguide.html#npcap-feature-dot11 self._check_npcap_requirement() return sp.Popen([_WlanHelper, self.guid[1:-1], "modus"], stdout=sp.PIPE).communicate()[0].strip().split(",") def modulation(self): """Get the 802.11 modulation of the interface. Only available with Npcap.""" # According to https://nmap.org/npcap/guide/npcap-devguide.html#npcap-feature-dot11 self._check_npcap_requirement() return sp.Popen([_WlanHelper, self.guid[1:-1], "modu"], stdout=sp.PIPE).communicate()[0].strip() def setmodulation(self, modu): """Set the interface modulation. It can be: - 0: dsss - 1: fhss - 2: irbaseband - 3: ofdm - 4: hrdss - 5: erp - 6: ht - 7: vht - 8: ihv - 9: mimo-ofdm - 10: mimo-ofdm Only available with Npcap.""" # According to https://nmap.org/npcap/guide/npcap-devguide.html#npcap-feature-dot11 self._check_npcap_requirement() _modus = { 0: "dsss", 1: "fhss", 2: "irbaseband", 3: "ofdm", 4: "hrdss", 5: "erp", 6: "ht", 7: "vht", 8: "ihv", 9: "mimo-ofdm", 10: "mimo-ofdm", } m = _modus.get(modu, "unknown") if isinstance(modu, int) else modu return sp.call(_WlanHelper + " " + self.guid[1:-1] + " mode " + m) def __repr__(self): return "<%s %s %s>" % (self.__class__.__name__, self.name, self.guid) def pcap_service_name(): """Return the pcap adapter service's name""" return "npcap" if conf.use_npcap else "npf" def pcap_service_status(): """Returns a tuple (name, description, started) of the windows pcap adapter""" for i in exec_query(['Get-Service', pcap_service_name()], ['Name', 'DisplayName', 'Status']): name = i[0] description = i[1] started = (i[2].lower().strip() == 'running') if name == pcap_service_name(): return (name, description, started) return (None, None, None) def pcap_service_control(action, askadmin=True): """Util to run pcap control command""" if not conf.prog.powershell: return False command = action + ' ' + pcap_service_name() stdout = POWERSHELL_PROCESS.query([_encapsulate_admin(command) if askadmin else command]) return "error" not in "".join(stdout).lower() def pcap_service_start(askadmin=True): """Starts the pcap adapter. Will ask for admin. Returns True if success""" return pcap_service_control('Start-Service', askadmin=askadmin) def pcap_service_stop(askadmin=True): """Stops the pcap adapter. Will ask for admin. Returns True if success""" return pcap_service_control('Stop-Service', askadmin=askadmin) from scapy.modules.six.moves import UserDict class NetworkInterfaceDict(UserDict): """Store information about network interfaces and convert between names""" def load_from_powershell(self): if not conf.prog.os_access: return ifaces_ips = None for i in get_windows_if_list(): try: interface = NetworkInterface(i) self.data[interface.guid] = interface # If no IP address was detected using winpcap and if # the interface is not the loopback one, look for # internal windows interfaces if not interface.ip: if not ifaces_ips: # ifaces_ips is used as a cache ifaces_ips = get_ips() # If it exists, retrieve the interface's IP from the cache interface.ip = ifaces_ips.get(interface.name, "") except (KeyError, PcapNameNotFoundError): pass if not self.data and conf.use_winpcapy: _detect = pcap_service_status() def _ask_user(): if not conf.interactive: return False while True: _confir = input("Do you want to start it ? (yes/no) [y]: ").lower().strip() if _confir in ["yes", "y", ""]: return True elif _confir in ["no", "n"]: return False return False _error_msg = "No match between your pcap and windows network interfaces found. " if _detect[0] and not _detect[2] and not (hasattr(self, "restarted_adapter") and self.restarted_adapter): warning("Scapy has detected that your pcap service is not running !") if not conf.interactive or _ask_user(): succeed = pcap_service_start(askadmin=conf.interactive) self.restarted_adapter = True if succeed: log_loading.info("Pcap service started !") self.load_from_powershell() return _error_msg = "Could not start the pcap service ! " warning(_error_msg + "You probably won't be able to send packets. " "Deactivating unneeded interfaces and restarting Scapy might help. " "Check your winpcap and powershell installation, and access rights.", onlyOnce=True) else: # Loading state: remove invalid interfaces self.remove_invalid_ifaces() # Replace LOOPBACK_INTERFACE try: scapy.consts.LOOPBACK_INTERFACE = self.dev_from_name( scapy.consts.LOOPBACK_NAME, ) except: pass def dev_from_name(self, name): """Return the first pcap device name for a given Windows device name. """ for iface in six.itervalues(self): if iface.name == name: return iface raise ValueError("Unknown network interface %r" % name) def dev_from_pcapname(self, pcap_name): """Return Windows device name for given pcap device name.""" for iface in six.itervalues(self): if iface.pcap_name == pcap_name: return iface raise ValueError("Unknown pypcap network interface %r" % pcap_name) def dev_from_index(self, if_index): """Return interface name from interface index""" for devname, iface in six.iteritems(self): if iface.win_index == str(if_index): return iface if str(if_index) == "1": # Test if the loopback interface is set up if isinstance(scapy.consts.LOOPBACK_INTERFACE, NetworkInterface): return scapy.consts.LOOPBACK_INTERFACE raise ValueError("Unknown network interface index %r" % if_index) def remove_invalid_ifaces(self): """Remove all invalid interfaces""" for devname in list(self.keys()): iface = self.data[devname] if iface.is_invalid(): self.data.pop(devname) def reload(self): """Reload interface list""" self.restarted_adapter = False self.data.clear() self.load_from_powershell() def show(self, resolve_mac=True, print_result=True): """Print list of available network interfaces in human readable form""" res = [] for iface_name in sorted(self.data): dev = self.data[iface_name] mac = dev.mac if resolve_mac and conf.manufdb: mac = conf.manufdb._resolve_MAC(mac) res.append((str(dev.win_index).ljust(5), str(dev.name).ljust(35), str(dev.ip).ljust(15), mac)) res = pretty_list(res, [("INDEX", "IFACE", "IP", "MAC")]) if print_result: print(res) else: return res def __repr__(self): return self.show(print_result=False) # Init POWERSHELL_PROCESS POWERSHELL_PROCESS = _PowershellManager() IFACES = NetworkInterfaceDict() IFACES.load_from_powershell() def pcapname(dev): """Return pypcap device name for given interface or libdnet/Scapy device name. """ if isinstance(dev, NetworkInterface): if dev.is_invalid(): return None return dev.pcap_name try: return IFACES.dev_from_name(dev).pcap_name except ValueError: if conf.use_pcap: # pcap.pcap() will choose a sensible default for sniffing if # iface=None return None raise def dev_from_pcapname(pcap_name): """Return libdnet/Scapy device name for given pypcap device name""" return IFACES.dev_from_pcapname(pcap_name) def dev_from_index(if_index): """Return Windows adapter name for given Windows interface index""" return IFACES.dev_from_index(if_index) def show_interfaces(resolve_mac=True): """Print list of available network interfaces""" return IFACES.show(resolve_mac) _orig_open_pcap = pcapdnet.open_pcap pcapdnet.open_pcap = lambda iface,*args,**kargs: _orig_open_pcap(pcapname(iface),*args,**kargs) get_if_raw_hwaddr = pcapdnet.get_if_raw_hwaddr = lambda iface, *args, **kargs: ( ARPHDR_ETHER, mac2str(IFACES.dev_from_pcapname(pcapname(iface)).mac) ) def _read_routes_xp(): # The InterfaceIndex in Win32_IP4RouteTable does not match the # InterfaceIndex in Win32_NetworkAdapter under some platforms # (namely Windows XP): let's try an IP association routes = [] partial_routes = [] # map local IP addresses to interfaces local_addresses = {iface.ip: iface for iface in six.itervalues(IFACES)} iface_indexes = {} for line in exec_query(['Get-WmiObject', 'Win32_IP4RouteTable'], ['Name', 'Mask', 'NextHop', 'InterfaceIndex', 'Metric1']): if line[2] in local_addresses: iface = local_addresses[line[2]] # This gives us an association InterfaceIndex <-> interface iface_indexes[line[3]] = iface routes.append((atol(line[0]), atol(line[1]), "0.0.0.0", iface, iface.ip, int(line[4]))) else: partial_routes.append((atol(line[0]), atol(line[1]), line[2], line[3], int(line[4]))) for dst, mask, gw, ifidx, metric in partial_routes: if ifidx in iface_indexes: iface = iface_indexes[ifidx] routes.append((dst, mask, gw, iface, iface.ip, metric)) return routes def _read_routes_7(): routes=[] for line in exec_query(['Get-WmiObject', 'Win32_IP4RouteTable'], ['Name', 'Mask', 'NextHop', 'InterfaceIndex', 'Metric1']): try: iface = dev_from_index(line[3]) ip = "127.0.0.1" if line[3] == "1" else iface.ip # Force loopback on iface 1 routes.append((atol(line[0]), atol(line[1]), line[2], iface, ip, int(line[4]))) except ValueError: continue return routes def read_routes(): routes = [] if not conf.prog.os_access: return routes release = platform.release() try: if is_new_release(): routes = _read_routes_post2008() elif release == "XP": routes = _read_routes_xp() else: routes = _read_routes_7() except Exception as e: warning("Error building scapy IPv4 routing table : %s", e, onlyOnce=True) else: if not routes: warning("No default IPv4 routes found. Your Windows release may no be supported and you have to enter your routes manually", onlyOnce=True) return routes def _get_metrics(ipv6=False): """Returns a dict containing all IPv4 or IPv6 interfaces' metric, ordered by their interface index. """ query_cmd = "netsh interface " + ("ipv6" if ipv6 else "ipv4") + " show interfaces level=verbose" stdout = POWERSHELL_PROCESS.query([query_cmd]) res = {} _buffer = [] _pattern = re.compile(".*:\s+(\d+)") for _line in stdout: if not _line.strip() and len(_buffer) > 0: if_index = re.search(_pattern, _buffer[3]).group(1) if_metric = int(re.search(_pattern, _buffer[5]).group(1)) res[if_index] = if_metric _buffer = [] else: _buffer.append(_line) return res def _read_routes_post2008(): routes = [] if4_metrics = None # This works only starting from Windows 8/2012 and up. For older Windows another solution is needed # Get-NetRoute -AddressFamily IPV4 | select ifIndex, DestinationPrefix, NextHop, RouteMetric, InterfaceMetric | fl for line in exec_query(['Get-NetRoute', '-AddressFamily IPV4'], ['ifIndex', 'DestinationPrefix', 'NextHop', 'RouteMetric', 'InterfaceMetric']): try: iface = dev_from_index(line[0]) if iface.ip == "0.0.0.0": continue except: continue # try: # intf = pcapdnet.dnet.intf().get_dst(pcapdnet.dnet.addr(type=2, addrtxt=dest)) # except OSError: # log_loading.warning("Building Scapy's routing table: Couldn't get outgoing interface for destination %s", dest) # continue dest, mask = line[1].split('/') ip = "127.0.0.1" if line[0] == "1" else iface.ip # Force loopback on iface 1 if not line[4].strip(): # InterfaceMetric is not available. Load it from netsh if not if4_metrics: if4_metrics = _get_metrics() metric = int(line[3]) + if4_metrics.get(iface.win_index, 0) # RouteMetric + InterfaceMetric else: metric = int(line[3]) + int(line[4]) # RouteMetric + InterfaceMetric routes.append((atol(dest), itom(int(mask)), line[2], iface, ip, metric)) return routes ############ ### IPv6 ### ############ def in6_getifaddr(): """ Returns all IPv6 addresses found on the computer """ ifaddrs = [] for ifaddr in in6_getifaddr_raw(): try: ifaddrs.append((ifaddr[0], ifaddr[1], dev_from_pcapname(ifaddr[2]))) except ValueError: pass # Appends Npcap loopback if available if conf.use_npcap and scapy.consts.LOOPBACK_INTERFACE: ifaddrs.append(("::1", 0, scapy.consts.LOOPBACK_INTERFACE)) return ifaddrs def _append_route6(routes, dpref, dp, nh, iface, lifaddr, metric): cset = [] # candidate set (possible source addresses) if iface.name == scapy.consts.LOOPBACK_NAME: if dpref == '::': return cset = ['::1'] else: devaddrs = (x for x in lifaddr if x[2] == iface) cset = construct_source_candidate_set(dpref, dp, devaddrs) if not cset: return # APPEND (DESTINATION, NETMASK, NEXT HOP, IFACE, CANDIDATS, METRIC) routes.append((dpref, dp, nh, iface, cset, metric)) def _read_routes6_post2008(): routes6 = [] # This works only starting from Windows 8/2012 and up. For older Windows another solution is needed # Get-NetRoute -AddressFamily IPV6 | select ifIndex, DestinationPrefix, NextHop | fl lifaddr = in6_getifaddr() for line in exec_query(['Get-NetRoute', '-AddressFamily IPV6'], ['ifIndex', 'DestinationPrefix', 'NextHop', 'RouteMetric', 'InterfaceMetric']): try: if_index = line[0] iface = dev_from_index(if_index) except: continue dpref, dp = line[1].split('/') dp = int(dp) nh = line[2] metric = int(line[3])+int(line[4]) _append_route6(routes6, dpref, dp, nh, iface, lifaddr, metric) return routes6 def _read_routes6_7(): # Not supported in powershell, we have to use netsh routes = [] query_cmd = "netsh interface ipv6 show route level=verbose" stdout = POWERSHELL_PROCESS.query([query_cmd]) lifaddr = in6_getifaddr() if6_metrics = _get_metrics(ipv6=True) # Define regexes r_int = [".*:\s+(\d+)"] r_all = ["(.*)"] r_ipv6 = [".*:\s+([A-z|0-9|:]+(\/\d+)?)"] # Build regex list for each object regex_list = r_ipv6*2 + r_int + r_all*3 + r_int + r_all*3 current_object = [] index = 0 for l in stdout: if not l.strip(): if not current_object: continue if len(current_object) == len(regex_list): try: if_index = current_object[2] iface = dev_from_index(if_index) except: current_object = [] index = 0 continue _ip = current_object[0].split("/") dpref = _ip[0] dp = int(_ip[1]) _match = re.search(r_ipv6[0], current_object[3]) nh = "::" if _match: # Detect if Next Hop is specified (if not, it will be the IFName) _nhg1 = _match.group(1) nh = _nhg1 if re.match(".*:.*:.*", _nhg1) else "::" metric = int(current_object[6]) + if6_metrics.get(if_index, 0) _append_route6(routes, dpref, dp, nh, iface, lifaddr, metric) # Reset current object current_object = [] index = 0 else: pattern = re.compile(regex_list[index]) match = re.search(pattern, l) if match: current_object.append(match.group(1)) index = index + 1 return routes def read_routes6(): routes6 = [] if not conf.prog.os_access: return routes6 try: if is_new_release(): routes6 = _read_routes6_post2008() else: routes6 = _read_routes6_7() except Exception as e: warning("Error building scapy IPv6 routing table : %s", e, onlyOnce=True) return routes6 def get_working_if(): try: # return the interface associated with the route with smallest # mask (route by default if it exists) return min(conf.route.routes, key=lambda x: x[1])[3] except ValueError: # no route return scapy.consts.LOOPBACK_INTERFACE def _get_valid_guid(): if scapy.consts.LOOPBACK_INTERFACE: return scapy.consts.LOOPBACK_INTERFACE.guid else: for i in six.itervalues(IFACES): if not i.is_invalid(): return i.guid def route_add_loopback(routes=None, ipv6=False, iflist=None): """Add a route to 127.0.0.1 and ::1 to simplify unit tests on Windows""" if not WINDOWS: warning("Not available") return warning("This will completly mess up the routes. Testing purpose only !") # Add only if some adpaters already exist if ipv6: if not conf.route6.routes: return else: if not conf.route.routes: return data = { 'name': scapy.consts.LOOPBACK_NAME, 'description': "Loopback", 'win_index': -1, 'guid': _get_valid_guid(), 'invalid': False, 'mac': '00:00:00:00:00:00', } data['pcap_name'] = six.text_type("\\Device\\NPF_" + data['guid']) adapter = NetworkInterface(data) adapter.ip = "127.0.0.1" if iflist: iflist.append(adapter.pcap_name) return # Remove all LOOPBACK_NAME routes for route in list(conf.route.routes): iface = route[3] if iface.name == scapy.consts.LOOPBACK_NAME: conf.route.routes.remove(route) # Remove LOOPBACK_NAME interface for devname, iface in list(IFACES.items()): if iface.name == scapy.consts.LOOPBACK_NAME: IFACES.pop(devname) # Inject interface IFACES["{0XX00000-X000-0X0X-X00X-00XXXX000XXX}"] = adapter scapy.consts.LOOPBACK_INTERFACE = adapter if isinstance(conf.iface, NetworkInterface): if conf.iface.name == LOOPBACK_NAME: conf.iface = adapter if isinstance(conf.iface6, NetworkInterface): if conf.iface6.name == LOOPBACK_NAME: conf.iface6 = adapter # Build the packed network addresses loop_net = struct.unpack("!I", socket.inet_aton("127.0.0.0"))[0] loop_mask = struct.unpack("!I", socket.inet_aton("255.0.0.0"))[0] # Build the fake routes loopback_route = (loop_net, loop_mask, "0.0.0.0", adapter, "127.0.0.1", 1) loopback_route6 = ('::1', 128, '::', adapter, ["::1"], 1) loopback_route6_custom = ("fe80::", 128, "::", adapter, ["::1"], 1) if routes == None: # Injection conf.route6.routes.append(loopback_route6) conf.route6.routes.append(loopback_route6_custom) conf.route.routes.append(loopback_route) # Flush the caches conf.route6.invalidate_cache() conf.route.invalidate_cache() else: if ipv6: routes.append(loopback_route6) routes.append(loopback_route6_custom) else: routes.append(loopback_route) if not conf.use_winpcapy: class NotAvailableSocket(SuperSocket): desc = "wpcap.dll missing" def __init__(self, *args, **kargs): raise RuntimeError("Sniffing and sending packets is not available: " "winpcap is not installed") conf.L2socket = NotAvailableSocket conf.L2listen = NotAvailableSocket conf.L3socket = NotAvailableSocket