1% send, sniff, sr* tests for Scapy 2 3~ needs_root 4 5############ 6############ 7+ Test bridge_and_sniff() using tap sockets 8 9~ tap 10 11= Create two tap interfaces 12 13import subprocess 14from threading import Thread 15 16tap0, tap1 = [TunTapInterface("tap%d" % i) for i in range(2)] 17 18chk_kwargs = {"timeout": 3} 19 20if LINUX: 21 for i in range(2): 22 assert subprocess.check_call(["ip", "link", "set", "tap%d" % i, "up"], **chk_kwargs) == 0 23else: 24 for i in range(2): 25 assert subprocess.check_call(["ifconfig", "tap%d" % i, "up"], **chk_kwargs) == 0 26 27= Run a sniff thread on the tap1 **interface** 28* It will terminate when 5 IP packets from 192.0.2.1 have been sniffed 29started = threading.Event() 30t_sniff = Thread( 31 target=sniff, 32 kwargs={"iface": "tap1", "count": 5, "prn": Packet.summary, 33 "lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1", 34 "started_callback": started.set}, 35 name="tests sniff 1") 36t_sniff.start() 37started.wait(timeout=5) 38 39= Run a bridge_and_sniff thread between the taps **sockets** 40* It will terminate when 5 IP packets from 192.0.2.1 have been forwarded 41started = threading.Event() 42t_bridge = Thread(target=bridge_and_sniff, args=(tap0, tap1), 43 kwargs={"store": False, "count": 5, 'prn': Packet.summary, 44 "lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1", 45 "started_callback": started.set}, 46 name="tests bridge_and_sniff 1") 47t_bridge.start() 48started.wait(timeout=5) 49 50= Send five IP packets from 192.0.2.1 to the tap0 **interface** 51sendp([Ether(dst=ETHER_BROADCAST) / IP(src="192.0.2.1") / ICMP()], iface="tap0", 52 count=5) 53 54= Wait for the threads 55t_bridge.join(5) 56t_sniff.join(5) 57assert not t_bridge.is_alive() 58assert not t_sniff.is_alive() 59 60= Run a sniff thread on the tap1 **interface** 61* It will terminate when 5 IP packets from 198.51.100.1 have been sniffed 62started = threading.Event() 63t_sniff = Thread( 64 target=sniff, 65 kwargs={"iface": "tap1", "count": 5, "prn": Packet.summary, 66 "lfilter": lambda p: IP in p and p[IP].src == "198.51.100.1", 67 "started_callback": started.set}, 68 name="tests sniff 2") 69t_sniff.start() 70started.wait(timeout=5) 71 72= Run a bridge_and_sniff thread between the taps **sockets** 73* It will "NAT" packets from 192.0.2.1 to 198.51.100.1 and will terminate when 5 IP packets have been forwarded 74def nat_1_2(pkt): 75 if IP in pkt and pkt[IP].src == "192.0.2.1": 76 pkt[IP].src = "198.51.100.1" 77 del pkt[IP].chksum 78 return pkt 79 return False 80 81started = threading.Event() 82t_bridge = Thread(target=bridge_and_sniff, args=(tap0, tap1), 83 kwargs={"store": False, "count": 5, 'prn': Packet.summary, 84 "xfrm12": nat_1_2, 85 "lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1", 86 "started_callback": started.set}, 87 name="tests bridge_and_sniff 2") 88t_bridge.start() 89started.wait(timeout=5) 90 91= Send five IP packets from 192.0.2.1 to the tap0 **interface** 92sendp([Ether(dst=ETHER_BROADCAST) / IP(src="192.0.2.1") / ICMP()], iface="tap0", 93 count=5) 94 95= Wait for the threads 96t_bridge.join(5) 97t_sniff.join(5) 98assert not t_bridge.is_alive() 99assert not t_sniff.is_alive() 100 101= Delete the tap interfaces 102if conf.use_pypy: 103 # See https://pypy.readthedocs.io/en/latest/cpython_differences.html 104 tap0.close() 105 tap1.close() 106else: 107 del tap0, tap1 108 109 110############ 111############ 112+ Test bridge_and_sniff() using tun sockets 113 114~ tun not_libpcap 115 116= Create two tun interfaces 117 118import subprocess 119from threading import Thread 120 121tun0, tun1 = [TunTapInterface("tun%d" % i) for i in range(2)] 122 123chk_kwargs = {"timeout": 3} 124 125if LINUX: 126 for i in range(2): 127 assert subprocess.check_call(["ip", "link", "set", "tun%d" % i, "up"], **chk_kwargs) == 0 128 assert subprocess.check_call([ 129 "ip", "addr", "change", 130 "192.0.2.1", "peer", "192.0.2.2", "dev", "tun0"], **chk_kwargs) == 0 131else: 132 for i in range(2): 133 assert subprocess.check_call(["ifconfig", "tun%d" % i, "up"], **chk_kwargs) == 0 134 assert subprocess.check_call(["ifconfig", "tun0", "192.0.2.1", "192.0.2.2"], **chk_kwargs) == 0 135 136= Run a sniff thread on the tun1 **interface** 137* It will terminate when 5 IP packets from 192.0.2.1 have been sniffed 138started = threading.Event() 139t_sniff = Thread(target=sniff, 140 kwargs={"iface": "tun1", "count": 5, 141 "prn": Packet.summary, 142 "lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1", 143 "started_callback": started.set}, 144 name="tests sniff 3") 145 146t_sniff.start() 147started.wait(timeout=5) 148 149= Run a bridge_and_sniff thread between the tuns **sockets** 150* It will terminate when 5 IP packets from 192.0.2.1 have been forwarded. 151started = threading.Event() 152t_bridge = Thread(target=bridge_and_sniff, args=(tun0, tun1), 153 kwargs={"store": False, "count": 5, 'prn': Packet.summary, 154 "xfrm12": lambda pkt: pkt, 155 "lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1", 156 "started_callback": started.set}, 157 name="tests bridge_and_sniff 3") 158t_bridge.start() 159started.wait(timeout=5) 160 161= Send five IP packets from 192.0.2.1 to the tun0 **interface** 162conf.route.add(net="192.0.2.2/32", dev="tun0") 163send([IP(src="192.0.2.1", dst="192.0.2.2") / ICMP()], count=5, iface="tun0") 164conf.route.delt(net="192.0.2.2/32", dev="tun0") 165 166= Wait for the threads 167t_bridge.join(5) 168t_sniff.join(5) 169assert not t_bridge.is_alive() 170assert not t_sniff.is_alive() 171 172= Run a sniff thread on the tun1 **interface** 173* It will terminate when 5 IP packets from 198.51.100.1 have been sniffed 174started = threading.Event() 175t_sniff = Thread(target=sniff, 176 kwargs={"iface": "tun1", "count": 5, "prn": Packet.summary, 177 "lfilter": lambda p: IP in p and p[IP].src == "198.51.100.1", 178 "started_callback": started.set}, 179 name="tests sniff 4") 180 181t_sniff.start() 182started.wait(timeout=5) 183 184= Run a bridge_and_sniff thread between the tuns **sockets** 185* It will "NAT" packets from 192.0.2.1 to 198.51.100.1 and will terminate when 5 IP packets have been forwarded 186def nat_1_2(pkt): 187 if IP in pkt and pkt[IP].src == "192.0.2.1": 188 pkt[IP].src = "198.51.100.1" 189 del pkt[IP].chksum 190 return pkt 191 return False 192 193started = threading.Event() 194t_bridge = Thread(target=bridge_and_sniff, args=(tun0, tun1), 195 kwargs={"store": False, "count": 5, 'prn': Packet.summary, 196 "xfrm12": nat_1_2, 197 "lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1", 198 "started_callback": started.set}, 199 name="tests bridge_and_sniff 4") 200t_bridge.start() 201started.wait(timeout=5) 202 203= Send five IP packets from 192.0.2.1 to the tun0 **interface** 204conf.route.add(net="192.0.2.2/32", dev="tun0") 205send([IP(src="192.0.2.1", dst="192.0.2.2") / ICMP()], count=5, iface="tun0") 206conf.route.delt(net="192.0.2.2/32", dev="tun0") 207 208= Wait for the threads 209t_bridge.join(5) 210t_sniff.join(5) 211assert not t_bridge.is_alive() 212assert not t_sniff.is_alive() 213 214= Delete the tun interfaces 215if conf.use_pypy: 216 # See https://pypy.readthedocs.io/en/latest/cpython_differences.html 217 tun0.close() 218 tun1.close() 219else: 220 del tun0, tun1 221 222 223############ 224############ 225+ Test bridge_and_sniff() using veth pairs 226~ linux needs_root veth 227 228= Ensure bridge_and_sniff does not close sockets if data is send within xfrm on ingress interface 229 230from scapy.arch.linux import VEthPair 231 232with VEthPair('a_0', 'a_1') as veth_0: 233 with VEthPair('b_0', 'b_1') as veth_1: 234 xfrm_count = { 235 'a_0':0, 236 'b_0': 0 237 } 238 def xfrm_x(pkt): 239 pkt_tx = pkt.copy() 240 ether_lyr = pkt_tx[Ether] 241 ether_lyr.type = 0x1234 # we send to peer interface - avoid loop 242 # send on receiving interface - triggers return None on recv() in L2Socket 243 sendp(pkt_tx, iface=pkt.sniffed_on) 244 global xfrm_count 245 xfrm_count[pkt.sniffed_on] = xfrm_count[pkt.sniffed_on] + 1 246 return True 247 started = threading.Event() 248 t_bridge = Thread(target=bridge_and_sniff, 249 args=('a_0', 'b_0'), 250 kwargs={ 251 'xfrm12': xfrm_x, 252 'xfrm21': xfrm_x, 253 'store': False, 254 'count': 4, 255 'lfilter': lambda p: Ether in p and p[Ether].type == 0xbeef, 256 "started_callback": started.set}, 257 name="tests bridge_and_sniff VEthPair") 258 t_bridge.start() 259 started.wait(timeout=5) 260 # send frames in both directions 261 for if_name in ['a_1', 'b_1', 'a_1', 'b_1']: 262 sendp([Ether(type=0xbeef) / 263 Raw(b'On a scale from one to ten what is your favourite colour of the alphabet?')], 264 iface=if_name) 265 t_bridge.join(1) 266 # now test of the socket used in bridge_and_sniff() was alive all the time 267 assert (xfrm_count['a_0'] == 2) 268 assert (xfrm_count['b_0'] == 2) 269 270 271############ 272############ 273+ Test arpleak() using a tap socket 274 275~ tap tcpdump 276 277= Create a tap interface 278 279from unittest import mock 280import struct 281import subprocess 282from threading import Thread 283import time 284 285tap0 = TunTapInterface("tap0") 286 287chk_kwargs = {"timeout": 3} 288 289if LINUX: 290 assert subprocess.check_call(["ip", "link", "set", "tap0", "up"], **chk_kwargs) == 0 291else: 292 assert subprocess.check_call(["ifconfig", "tap0", "up"], **chk_kwargs) == 0 293 294= Check for arpleak 295 296def answer_arp_leak(pkt): 297 mymac = b"\x00\x01\x02\x03\x04\x06" 298 myip = b"\xc0\x00\x02\x02" # 192.0.2.2 299 if not ARP in pkt: 300 return 301 e_src = pkt.src 302 pkt = raw(pkt[ARP]) 303 if pkt[:4] != b'\x00\x01\x08\x00': 304 print("Invalid ARP") 305 return 306 hwlen, plen, op = struct.unpack('>BBH', pkt[4:8]) 307 if op != 1: 308 print("Invalid ARP op") 309 return 310 fmt = ('%ds%ds' % (hwlen, plen)) * 2 311 hwsrc, psrc, hwdst, pdst = struct.unpack(fmt, 312 pkt[8:8 + (plen + hwlen) * 2]) 313 if pdst[:4] != myip[:plen]: 314 print("Invalid ARP pdst %r" % pdst) 315 return 316 ans = Ether(dst=e_src, src=mymac, type=0x0806) 317 ans /= (b'\x00\x01\x08\x00' + 318 struct.pack('>BBH' + fmt, 319 hwlen, plen, 2, mymac, myip, hwsrc, psrc)) 320 tap0.send(ans) 321 print('Answered!') 322 323started = threading.Event() 324t_answer = Thread( 325 target=sniff, 326 kwargs={"prn": answer_arp_leak, "timeout": 10, "store": False, 327 "opened_socket": tap0, 328 "started_callback": started.set}, 329 name="tests answer_arp_leak") 330 331t_answer.start() 332started.wait(timeout=5) 333 334@mock.patch("scapy.layers.l2.get_if_addr") 335@mock.patch("scapy.layers.l2.get_if_hwaddr") 336def test_arpleak(mock_get_if_hwaddr, mock_get_if_addr, hwlen=255, plen=255): 337 conf.route.ifadd("tap0", "192.0.2.0/24") 338 mock_get_if_addr.side_effect = lambda _: "192.0.2.1" 339 mock_get_if_hwaddr.side_effect = lambda _: "00:01:02:03:04:05" 340 return arpleak("192.0.2.2/31", timeout=2, hwlen=hwlen, plen=plen) 341 342ans, unans = test_arpleak() 343assert len(ans) == 1 344assert len(unans) == 1 345ans, unans = test_arpleak(hwlen=6) 346assert len(ans) == 1 347assert len(unans) == 1 348ans, unans = test_arpleak(plen=4) 349assert len(ans) == 1 350assert len(unans) == 1 351 352t_answer.join(15) 353 354if t_answer.is_alive(): 355 raise Exception("Test timed out") 356 357if conf.use_pypy: 358 # See https://pypy.readthedocs.io/en/latest/cpython_differences.html 359 tap0.close() 360else: 361 del tap0 362 363##### 364##### 365+ Test sr() on multiple interfaces 366 367= Setup multiple linux interfaces and ranges 368~ linux needs_root dbg 369 370import os 371exit_status = os.system("ip netns add blob0") 372exit_status |= os.system("ip netns add blob1") 373exit_status |= os.system("ip link add name scapy0.0 type veth peer name scapy0.1") 374exit_status |= os.system("ip link add name scapy1.0 type veth peer name scapy1.1") 375exit_status |= os.system("ip link set scapy0.1 netns blob0 up") 376exit_status |= os.system("ip link set scapy1.1 netns blob1 up") 377exit_status |= os.system("ip addr add 100.64.2.1/24 dev scapy0.0") 378exit_status |= os.system("ip addr add 100.64.3.1/24 dev scapy1.0") 379exit_status |= os.system("ip --netns blob0 addr add 100.64.2.2/24 dev scapy0.1") 380exit_status |= os.system("ip --netns blob1 addr add 100.64.3.2/24 dev scapy1.1") 381exit_status |= os.system("ip link set scapy0.0 up") 382exit_status |= os.system("ip link set scapy1.0 up") 383assert exit_status == 0 384 385conf.ifaces.reload() 386conf.route.resync() 387 388try: 389 pkts = sr(IP(dst=["100.64.2.2", "100.64.3.2"])/ICMP(), timeout=1)[0] 390 assert len(pkts) == 2 391 assert pkts[0].answer.src in ["100.64.2.2", "100.64.3.2"] 392 assert pkts[1].answer.src in ["100.64.2.2", "100.64.3.2"] 393finally: 394 e = os.system("ip netns del blob0") 395 e = os.system("ip netns del blob1") 396 conf.ifaces.reload() 397 conf.route.resync() 398 399 400= sr() performance test 401~ linux needs_root veth not_pypy 402 403import subprocess 404import shlex 405 406try: 407 # Create a dedicated network name space to simulate remote host 408 subprocess.check_call(shlex.split("sudo ip netns add scapy")) 409 # Create a virtual Ethernet pair to connect default and new NS 410 subprocess.check_call(shlex.split("sudo ip link add type veth")) 411 # Move veth1 to the new NS 412 subprocess.check_call(shlex.split("sudo ip link set veth1 netns scapy")) 413 # Setup vNIC in the default NS 414 subprocess.check_call(shlex.split("sudo ip link set veth0 up")) 415 subprocess.check_call(shlex.split("sudo ip addr add 192.168.168.1/24 dev veth0")) 416 # Setup vNIC in the dedicated NS 417 subprocess.check_call(shlex.split("sudo ip netns exec scapy ip link set lo up")) 418 subprocess.check_call(shlex.split("sudo ip netns exec scapy ip link set veth1 up")) 419 subprocess.check_call(shlex.split("sudo ip netns exec scapy ip addr add 192.168.168.2/24 dev veth1")) 420 # Perform test 421 conf.route.resync() 422 res, unansw = sr(IP(dst='192.168.168.2') / ICMP(seq=(1, 1000)), timeout=1, verbose=False) 423finally: 424 try: 425 # Bring down the interfaces 426 subprocess.check_call(shlex.split("sudo ip netns exec scapy ip link set veth1 down")) 427 subprocess.check_call(shlex.split("sudo ip netns exec scapy ip link set lo down")) 428 # Delete the namespace 429 subprocess.check_call(shlex.split("sudo ip netns delete scapy")) 430 # Remove the virtual Ethernet pair 431 subprocess.check_call(shlex.split("sudo ip link delete veth0")) 432 except subprocess.CalledProcessError as e: 433 print(f"Error during cleanup: {e}") 434 435len(res) == 1000 436 437