% send, sniff, sr* tests for Scapy

~ needs_root

############
############
+ Test bridge_and_sniff() using tap sockets

~ tap

= Create two tap interfaces

import subprocess
from threading import Thread

# Open tap0 and tap1. The resulting objects are later passed to
# bridge_and_sniff() as the two sides of the bridge.
tap0, tap1 = [TunTapInterface("tap%d" % i) for i in range(2)]

# Keyword arguments shared by every subprocess.check_call() below
# (each helper command gets a 3 second timeout).
chk_kwargs = {"timeout": 3}

# Bring both interfaces up: `ip` when LINUX is true, `ifconfig` otherwise.
if LINUX:
    for i in range(2):
        assert subprocess.check_call(["ip", "link", "set", "tap%d" % i, "up"], **chk_kwargs) == 0
else:
    for i in range(2):
        assert subprocess.check_call(["ifconfig", "tap%d" % i, "up"], **chk_kwargs) == 0

= Run a sniff thread on the tap1 **interface**
* It will terminate when 5 IP packets from 192.0.2.1 have been sniffed
# NOTE(review): only `Thread` is imported above, yet `threading.Event` is used;
# presumably the `threading` module is already present in the test namespace
# -- confirm.
started = threading.Event()
# Background sniffer on tap1, limited to 5 packets accepted by lfilter.
# `started.set` is handed over as started_callback so the main flow can wait
# on the event below.
t_sniff = Thread(
    target=sniff,
    kwargs={"iface": "tap1", "count": 5, "prn": Packet.summary,
            "lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1",
            "started_callback": started.set},
    name="tests sniff 1")
t_sniff.start()
# Wait (at most 5 seconds) for the event to be set.
# NOTE(review): this bare call is the last expression of the test; its return
# value is possibly what the test harness evaluates -- keep it as an
# expression statement unless confirmed otherwise.
started.wait(timeout=5)

= Run a bridge_and_sniff thread between the taps **sockets**
* It will terminate when 5 IP packets from 192.0.2.1 have been forwarded
started = threading.Event()
# The bridge is given the tap objects themselves (not interface names).
t_bridge = Thread(target=bridge_and_sniff, args=(tap0, tap1),
                  kwargs={"store": False, "count": 5, 'prn': Packet.summary,
                          "lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1",
                          "started_callback": started.set},
                  name="tests bridge_and_sniff 1")
t_bridge.start()
started.wait(timeout=5)

= Send five IP packets from 192.0.2.1 to the tap0 **interface**
sendp([Ether(dst=ETHER_BROADCAST) / IP(src="192.0.2.1") / ICMP()],
      iface="tap0", count=5)

= Wait for the threads
# Give each thread up to 5 seconds, then require that both have finished.
t_bridge.join(5)
t_sniff.join(5)
assert not t_bridge.is_alive()
assert not t_sniff.is_alive()

= Run a sniff thread on the tap1 **interface**
* It will terminate when 5 IP packets from 198.51.100.1 have been sniffed
started = threading.Event()
# Same sniffer as before, but this time it only accepts packets whose source
# address is 198.51.100.1.
t_sniff = Thread(
    target=sniff,
    kwargs={"iface": "tap1", "count": 5, "prn": Packet.summary,
            "lfilter": lambda p: IP in p and p[IP].src == "198.51.100.1",
            "started_callback": started.set},
    name="tests sniff 2")
t_sniff.start()
started.wait(timeout=5)

= Run a bridge_and_sniff thread between the taps **sockets**
* It will "NAT" packets from 192.0.2.1 to 198.51.100.1 and will terminate when 5 IP packets have been forwarded

def nat_1_2(pkt):
    # Rewrite the source address of IP packets coming from 192.0.2.1 and
    # delete the stored checksum value (presumably so that it is recomputed
    # when the packet is rebuilt -- confirm). Any other packet yields False.
    if IP in pkt and pkt[IP].src == "192.0.2.1":
        pkt[IP].src = "198.51.100.1"
        del pkt[IP].chksum
        return pkt
    return False

started = threading.Event()
# Same bridge as in the first run, with nat_1_2 applied through "xfrm12".
t_bridge = Thread(target=bridge_and_sniff, args=(tap0, tap1),
                  kwargs={"store": False, "count": 5, 'prn': Packet.summary,
                          "xfrm12": nat_1_2,
                          "lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1",
                          "started_callback": started.set},
                  name="tests bridge_and_sniff 2")
t_bridge.start()
started.wait(timeout=5)

= Send five IP packets from 192.0.2.1 to the tap0 **interface**
sendp([Ether(dst=ETHER_BROADCAST) / IP(src="192.0.2.1") / ICMP()],
      iface="tap0", count=5)

= Wait for the threads
# Give each thread up to 5 seconds, then require that both have finished.
t_bridge.join(5)
t_sniff.join(5)
assert not t_bridge.is_alive()
assert not t_sniff.is_alive()

= Delete the tap interfaces
# Under PyPy the interfaces are closed explicitly; otherwise the names are
# simply deleted.
if conf.use_pypy:
    # See https://pypy.readthedocs.io/en/latest/cpython_differences.html
    tap0.close()
    tap1.close()
else:
    del tap0, tap1

############
############
+ Test bridge_and_sniff() using tun sockets

~ tun not_libpcap

= Create two tun interfaces

import subprocess
from threading import Thread

# Open tun0 and tun1. The resulting objects are later passed to
# bridge_and_sniff() as the two sides of the bridge.
tun0, tun1 = [TunTapInterface("tun%d" % i) for i in range(2)]

# Keyword arguments shared by every subprocess.check_call() below.
chk_kwargs = {"timeout": 3}

# Bring both interfaces up, then configure tun0 with 192.0.2.1 and
# 192.0.2.2 as its peer.
if LINUX:
    for i in range(2):
        assert subprocess.check_call(["ip", "link", "set", "tun%d" % i, "up"], **chk_kwargs) == 0
    assert subprocess.check_call([
        "ip", "addr", "change",
        "192.0.2.1", "peer", "192.0.2.2", "dev", "tun0"], **chk_kwargs) == 0
else:
    for i in range(2):
        assert subprocess.check_call(["ifconfig", "tun%d" % i, "up"], **chk_kwargs) == 0
    assert subprocess.check_call(["ifconfig", "tun0", "192.0.2.1", "192.0.2.2"], **chk_kwargs) == 0

= Run a sniff thread on the tun1 **interface**
* It will terminate when 5 IP packets from 192.0.2.1 have been sniffed
started = threading.Event()
# Background sniffer on tun1, limited to 5 packets accepted by lfilter.
t_sniff = Thread(target=sniff,
                 kwargs={"iface": "tun1", "count": 5, "prn": Packet.summary,
                         "lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1",
                         "started_callback": started.set},
                 name="tests sniff 3")
t_sniff.start()
started.wait(timeout=5)

= Run a bridge_and_sniff thread between the tuns **sockets**
* It will terminate when 5 IP packets from 192.0.2.1 have been forwarded.
started = threading.Event()
# "xfrm12" is an identity function here: packets are returned unchanged.
t_bridge = Thread(target=bridge_and_sniff, args=(tun0, tun1),
                  kwargs={"store": False, "count": 5, 'prn': Packet.summary,
                          "xfrm12": lambda pkt: pkt,
                          "lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1",
                          "started_callback": started.set},
                  name="tests bridge_and_sniff 3")
t_bridge.start()
started.wait(timeout=5)

= Send five IP packets from 192.0.2.1 to the tun0 **interface**
# A host route for the peer address is added for the duration of the send
# and removed right afterwards.
conf.route.add(net="192.0.2.2/32", dev="tun0")
send([IP(src="192.0.2.1", dst="192.0.2.2") / ICMP()], count=5, iface="tun0")
conf.route.delt(net="192.0.2.2/32", dev="tun0")

= Wait for the threads
t_bridge.join(5)
t_sniff.join(5)
assert not t_bridge.is_alive()
assert not t_sniff.is_alive()

= Run a sniff thread on the tun1 **interface**
* It will terminate when 5 IP packets from 198.51.100.1 have been sniffed
started = threading.Event()
t_sniff = Thread(target=sniff,
                 kwargs={"iface": "tun1", "count": 5, "prn": Packet.summary,
                         "lfilter": lambda p: IP in p and p[IP].src == "198.51.100.1",
                         "started_callback": started.set},
                 name="tests sniff 4")
t_sniff.start()
started.wait(timeout=5)

= Run a bridge_and_sniff thread between the tuns **sockets**
* It will "NAT" packets from 192.0.2.1 to 198.51.100.1 and will terminate when 5 IP packets have been forwarded

def nat_1_2(pkt):
    # Rewrite the source address of IP packets coming from 192.0.2.1 and
    # delete the stored checksum value (presumably so that it is recomputed
    # when the packet is rebuilt -- confirm). Any other packet yields False.
    if IP in pkt and pkt[IP].src == "192.0.2.1":
        pkt[IP].src = "198.51.100.1"
        del pkt[IP].chksum
        return pkt
    return False

started = threading.Event()
t_bridge = Thread(target=bridge_and_sniff, args=(tun0, tun1),
                  kwargs={"store": False, "count": 5, 'prn': Packet.summary,
                          "xfrm12": nat_1_2,
                          "lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1",
                          "started_callback": started.set},
                  name="tests bridge_and_sniff 4")
t_bridge.start()
started.wait(timeout=5)

= Send five IP packets from 192.0.2.1 to the tun0 **interface**
conf.route.add(net="192.0.2.2/32", dev="tun0")
send([IP(src="192.0.2.1", dst="192.0.2.2") / ICMP()], count=5, iface="tun0")
conf.route.delt(net="192.0.2.2/32", dev="tun0")

= Wait for the threads
t_bridge.join(5)
t_sniff.join(5)
assert not t_bridge.is_alive()
assert not t_sniff.is_alive()

= Delete the tun interfaces
if conf.use_pypy:
    # See https://pypy.readthedocs.io/en/latest/cpython_differences.html
    tun0.close()
    tun1.close()
else:
    del tun0, tun1

############
############
+ Test bridge_and_sniff() using veth pairs

~ linux needs_root veth

= Ensure bridge_and_sniff does not close sockets if data is send within xfrm on ingress interface

from scapy.arch.linux import VEthPair

with VEthPair('a_0', 'a_1') as veth_0:
    with VEthPair('b_0', 'b_1') as veth_1:
        # Number of frames seen by xfrm_x, keyed by the interface they were
        # sniffed on.
        xfrm_count = {
            'a_0':0,
            'b_0': 0
        }
        def xfrm_x(pkt):
            # Transform callback used for both bridge directions: re-send a
            # retagged copy of the frame on the interface it arrived on and
            # count it.
            pkt_tx = pkt.copy()
            ether_lyr = pkt_tx[Ether]
            ether_lyr.type = 0x1234 # we send to peer interface - avoid loop
            # send on receiving interface - triggers return None on recv() in L2Socket
            sendp(pkt_tx, iface=pkt.sniffed_on)
            global xfrm_count
            xfrm_count[pkt.sniffed_on] = xfrm_count[pkt.sniffed_on] + 1
            return True
        started = threading.Event()
        # Bridge a_0 <-> b_0 (given by name); only frames with EtherType
        # 0xbeef pass lfilter, and the bridge is limited to 4 of them.
        t_bridge = Thread(target=bridge_and_sniff,
                          args=('a_0', 'b_0'),
                          kwargs={
                              'xfrm12': xfrm_x,
                              'xfrm21': xfrm_x,
                              'store': False,
                              'count': 4,
                              'lfilter': lambda p: Ether in p and p[Ether].type == 0xbeef,
                              "started_callback": started.set},
                          name="tests bridge_and_sniff VEthPair")
        t_bridge.start()
        started.wait(timeout=5)
        # send frames in both directions
        for if_name in ['a_1', 'b_1', 'a_1', 'b_1']:
            sendp([Ether(type=0xbeef) /
                   Raw(b'On a scale from one to ten what is your favourite colour of the alphabet?')],
                  iface=if_name)
        # Give the bridge thread up to one second to finish.
        t_bridge.join(1)

# now test if the socket used in bridge_and_sniff() was alive all the time
# (two frames were sent towards each bridged interface, so each counter must
# have reached 2)
assert (xfrm_count['a_0'] == 2)
assert (xfrm_count['b_0'] == 2)

############
############ + Test arpleak() using a tap socket ~ tap tcpdump = Create a tap interface from unittest import mock import struct import subprocess from threading import Thread import time tap0 = TunTapInterface("tap0") chk_kwargs = {"timeout": 3} if LINUX: assert subprocess.check_call(["ip", "link", "set", "tap0", "up"], **chk_kwargs) == 0 else: assert subprocess.check_call(["ifconfig", "tap0", "up"], **chk_kwargs) == 0 = Check for arpleak def answer_arp_leak(pkt): mymac = b"\x00\x01\x02\x03\x04\x06" myip = b"\xc0\x00\x02\x02" # 192.0.2.2 if not ARP in pkt: return e_src = pkt.src pkt = raw(pkt[ARP]) if pkt[:4] != b'\x00\x01\x08\x00': print("Invalid ARP") return hwlen, plen, op = struct.unpack('>BBH', pkt[4:8]) if op != 1: print("Invalid ARP op") return fmt = ('%ds%ds' % (hwlen, plen)) * 2 hwsrc, psrc, hwdst, pdst = struct.unpack(fmt, pkt[8:8 + (plen + hwlen) * 2]) if pdst[:4] != myip[:plen]: print("Invalid ARP pdst %r" % pdst) return ans = Ether(dst=e_src, src=mymac, type=0x0806) ans /= (b'\x00\x01\x08\x00' + struct.pack('>BBH' + fmt, hwlen, plen, 2, mymac, myip, hwsrc, psrc)) tap0.send(ans) print('Answered!') started = threading.Event() t_answer = Thread( target=sniff, kwargs={"prn": answer_arp_leak, "timeout": 10, "store": False, "opened_socket": tap0, "started_callback": started.set}, name="tests answer_arp_leak") t_answer.start() started.wait(timeout=5) @mock.patch("scapy.layers.l2.get_if_addr") @mock.patch("scapy.layers.l2.get_if_hwaddr") def test_arpleak(mock_get_if_hwaddr, mock_get_if_addr, hwlen=255, plen=255): conf.route.ifadd("tap0", "192.0.2.0/24") mock_get_if_addr.side_effect = lambda _: "192.0.2.1" mock_get_if_hwaddr.side_effect = lambda _: "00:01:02:03:04:05" return arpleak("192.0.2.2/31", timeout=2, hwlen=hwlen, plen=plen) ans, unans = test_arpleak() assert len(ans) == 1 assert len(unans) == 1 ans, unans = test_arpleak(hwlen=6) assert len(ans) == 1 assert len(unans) == 1 ans, unans = test_arpleak(plen=4) assert len(ans) == 1 assert len(unans) == 1 
t_answer.join(15) if t_answer.is_alive(): raise Exception("Test timed out") if conf.use_pypy: # See https://pypy.readthedocs.io/en/latest/cpython_differences.html tap0.close() else: del tap0 ##### ##### + Test sr() on multiple interfaces = Setup multiple linux interfaces and ranges ~ linux needs_root dbg import os exit_status = os.system("ip netns add blob0") exit_status |= os.system("ip netns add blob1") exit_status |= os.system("ip link add name scapy0.0 type veth peer name scapy0.1") exit_status |= os.system("ip link add name scapy1.0 type veth peer name scapy1.1") exit_status |= os.system("ip link set scapy0.1 netns blob0 up") exit_status |= os.system("ip link set scapy1.1 netns blob1 up") exit_status |= os.system("ip addr add 100.64.2.1/24 dev scapy0.0") exit_status |= os.system("ip addr add 100.64.3.1/24 dev scapy1.0") exit_status |= os.system("ip --netns blob0 addr add 100.64.2.2/24 dev scapy0.1") exit_status |= os.system("ip --netns blob1 addr add 100.64.3.2/24 dev scapy1.1") exit_status |= os.system("ip link set scapy0.0 up") exit_status |= os.system("ip link set scapy1.0 up") assert exit_status == 0 conf.ifaces.reload() conf.route.resync() try: pkts = sr(IP(dst=["100.64.2.2", "100.64.3.2"])/ICMP(), timeout=1)[0] assert len(pkts) == 2 assert pkts[0].answer.src in ["100.64.2.2", "100.64.3.2"] assert pkts[1].answer.src in ["100.64.2.2", "100.64.3.2"] finally: e = os.system("ip netns del blob0") e = os.system("ip netns del blob1") conf.ifaces.reload() conf.route.resync() = sr() performance test ~ linux needs_root veth not_pypy import subprocess import shlex try: # Create a dedicated network name space to simulate remote host subprocess.check_call(shlex.split("sudo ip netns add scapy")) # Create a virtual Ethernet pair to connect default and new NS subprocess.check_call(shlex.split("sudo ip link add type veth")) # Move veth1 to the new NS subprocess.check_call(shlex.split("sudo ip link set veth1 netns scapy")) # Setup vNIC in the default NS 
subprocess.check_call(shlex.split("sudo ip link set veth0 up")) subprocess.check_call(shlex.split("sudo ip addr add 192.168.168.1/24 dev veth0")) # Setup vNIC in the dedicated NS subprocess.check_call(shlex.split("sudo ip netns exec scapy ip link set lo up")) subprocess.check_call(shlex.split("sudo ip netns exec scapy ip link set veth1 up")) subprocess.check_call(shlex.split("sudo ip netns exec scapy ip addr add 192.168.168.2/24 dev veth1")) # Perform test conf.route.resync() res, unansw = sr(IP(dst='192.168.168.2') / ICMP(seq=(1, 1000)), timeout=1, verbose=False) finally: try: # Bring down the interfaces subprocess.check_call(shlex.split("sudo ip netns exec scapy ip link set veth1 down")) subprocess.check_call(shlex.split("sudo ip netns exec scapy ip link set lo down")) # Delete the namespace subprocess.check_call(shlex.split("sudo ip netns delete scapy")) # Remove the virtual Ethernet pair subprocess.check_call(shlex.split("sudo ip link delete veth0")) except subprocess.CalledProcessError as e: print(f"Error during cleanup: {e}") len(res) == 1000