1% send, sniff, sr* tests for Scapy 2 3~ netaccess 4 5############ 6############ 7+ Test bridge_and_sniff() using tap sockets 8 9~ tap linux 10 11= Create two tap interfaces 12 13import subprocess 14from threading import Thread 15 16tap0, tap1 = [TunTapInterface("tap%d" % i) for i in range(2)] 17 18if LINUX: 19 for i in range(2): 20 assert subprocess.check_call(["ip", "link", "set", "tap%d" % i, "up"]) == 0 21else: 22 for i in range(2): 23 assert subprocess.check_call(["ifconfig", "tap%d" % i, "up"]) == 0 24 25= Run a sniff thread on the tap1 **interface** 26* It will terminate when 5 IP packets from 1.2.3.4 have been sniffed 27t_sniff = Thread( 28 target=sniff, 29 kwargs={"iface": "tap1", "count": 5, "prn": Packet.summary, 30 "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"} 31) 32t_sniff.start() 33 34= Run a bridge_and_sniff thread between the taps **sockets** 35* It will terminate when 5 IP packets from 1.2.3.4 have been forwarded 36t_bridge = Thread(target=bridge_and_sniff, args=(tap0, tap1), 37 kwargs={"store": False, "count": 5, 'prn': Packet.summary, 38 "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"}) 39t_bridge.start() 40 41= Send five IP packets from 1.2.3.4 to the tap0 **interface** 42time.sleep(1) 43sendp([Ether(dst=ETHER_BROADCAST) / IP(src="1.2.3.4") / ICMP()], iface="tap0", 44 count=5) 45 46= Wait for the threads 47t_bridge.join() 48t_sniff.join() 49 50= Run a sniff thread on the tap1 **interface** 51* It will terminate when 5 IP packets from 2.3.4.5 have been sniffed 52t_sniff = Thread( 53 target=sniff, 54 kwargs={"iface": "tap1", "count": 5, "prn": Packet.summary, 55 "lfilter": lambda p: IP in p and p[IP].src == "2.3.4.5"} 56) 57t_sniff.start() 58 59= Run a bridge_and_sniff thread between the taps **sockets** 60* It will "NAT" packets from 1.2.3.4 to 2.3.4.5 and will terminate when 5 IP packets have been forwarded 61def nat_1_2(pkt): 62 if IP in pkt and pkt[IP].src == "1.2.3.4": 63 pkt[IP].src = "2.3.4.5" 64 del pkt[IP].chksum 65 return pkt 66 return False 67 68t_bridge = Thread(target=bridge_and_sniff, args=(tap0, tap1), 69 kwargs={"store": False, "count": 5, 'prn': Packet.summary, 70 "xfrm12": nat_1_2, 71 "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"}) 72t_bridge.start() 73 74= Send five IP packets from 1.2.3.4 to the tap0 **interface** 75time.sleep(1) 76sendp([Ether(dst=ETHER_BROADCAST) / IP(src="1.2.3.4") / ICMP()], iface="tap0", 77 count=5) 78 79= Wait for the threads 80t_bridge.join() 81t_sniff.join() 82 83= Delete the tap interfaces 84del tap0, tap1 85 86 87############ 88############ 89+ Test bridge_and_sniff() using tun sockets 90 91~ tun linux not_pcapdnet 92 93= Create two tun interfaces 94 95import subprocess 96from threading import Thread 97 98tun0, tun1 = [TunTapInterface("tun%d" % i) for i in range(2)] 99 100if LINUX: 101 for i in range(2): 102 assert subprocess.check_call(["ip", "link", "set", "tun%d" % i, "up"]) == 0 103else: 104 for i in range(2): 105 assert subprocess.check_call(["ifconfig", "tun%d" % i, "up"]) == 0 106 107= Run a sniff thread on the tun1 **interface** 108* It will terminate when 5 IP packets from 1.2.3.4 have been sniffed 109t_sniff = Thread( 110 target=sniff, 111 kwargs={"iface": "tun1", "count": 5, "prn": Packet.summary, 112 "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"} 113) 114t_sniff.start() 115 116= Run a bridge_and_sniff thread between the tuns **sockets** 117* It will terminate when 5 IP packets from 1.2.3.4 have been forwarded. 118t_bridge = Thread(target=bridge_and_sniff, args=(tun0, tun1), 119 kwargs={"store": False, "count": 5, 'prn': Packet.summary, 120 "xfrm12": lambda pkt: pkt, 121 "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"}) 122t_bridge.start() 123 124= Send five IP packets from 1.2.3.4 to the tun0 **interface** 125time.sleep(1) 126conf.route.add(net="1.2.3.4/32", dev="tun0") 127send(IP(src="1.2.3.4", dst="1.2.3.4") / ICMP(), count=5) 128conf.route.delt(net="1.2.3.4/32", dev="tun0") 129 130= Wait for the threads 131t_bridge.join() 132t_sniff.join() 133 134= Run a sniff thread on the tun1 **interface** 135* It will terminate when 5 IP packets from 2.3.4.5 have been sniffed 136t_sniff = Thread( 137 target=sniff, 138 kwargs={"iface": "tun1", "count": 5, "prn": Packet.summary, 139 "lfilter": lambda p: IP in p and p[IP].src == "2.3.4.5"} 140) 141t_sniff.start() 142 143= Run a bridge_and_sniff thread between the tuns **sockets** 144* It will "NAT" packets from 1.2.3.4 to 2.3.4.5 and will terminate when 5 IP packets have been forwarded 145def nat_1_2(pkt): 146 if IP in pkt and pkt[IP].src == "1.2.3.4": 147 pkt[IP].src = "2.3.4.5" 148 del pkt[IP].chksum 149 return pkt 150 return False 151 152t_bridge = Thread(target=bridge_and_sniff, args=(tun0, tun1), 153 kwargs={"store": False, "count": 5, 'prn': Packet.summary, 154 "xfrm12": nat_1_2, 155 "lfilter": lambda p: IP in p and p[IP].src == "1.2.3.4"}) 156t_bridge.start() 157 158= Send five IP packets from 1.2.3.4 to the tun0 **interface** 159time.sleep(1) 160conf.route.add(net="1.2.3.4/32", dev="tun0") 161send(IP(src="1.2.3.4", dst="1.2.3.4") / ICMP(), count=5) 162conf.route.delt(net="1.2.3.4/32", dev="tun0") 163 164= Wait for the threads 165t_bridge.join() 166t_sniff.join() 167 168= Delete the tun interfaces 169del tun0, tun1 170