1% tuntap tests for Scapy 2 3# Packet capture-based tests are in sendsniff.uts 4 5####### 6+ Test Linux-specific protocol headers for TunTap 7~ linux tun not_libpcap 8 9= Linux-specific protocol headers 10 11p = LinuxTunPacketInfo()/IP() 12assert p.type == 2048 13 14p = LinuxTunPacketInfo(raw(p)) 15assert p.type == 2048 16assert isinstance(p.payload, IP) 17 18p = LinuxTunPacketInfo()/IPv6() 19assert p.type == 0x86dd 20 21p = LinuxTunPacketInfo(raw(p)) 22assert p.type == 0x86dd 23 24assert isinstance(p.payload, IPv6) 25 26####### 27+ Test tun device 28 29~ tun needs_root not_libpcap 30 31= Create a tun interface 32 33import subprocess 34from threading import Thread 35 36tun0 = TunTapInterface("tun0", strip_packet_info=False) 37 38if LINUX: 39 assert subprocess.check_call(["ip", "link", "set", "tun0", "up"]) == 0 40 assert subprocess.check_call([ 41 "ip", "addr", "change", 42 "192.0.2.1", "peer", "192.0.2.2", "dev", "tun0"]) == 0 43elif BSD: 44 assert subprocess.check_call(["ifconfig", "tun0", "up"]) == 0 45 assert subprocess.check_call([ 46 "ifconfig", "tun0", "192.0.2.1", "192.0.2.2"]) == 0 47else: 48 raise NotImplementedError() 49 50conf.ifaces.reload() 51conf.route.resync() 52conf.route6.resync() 53 54= Setup ICMPEcho_am on the interface 55 56am = tun0.am(ICMPEcho_am, count=3) 57am.defoptsniff['timeout'] = 5 58t_am = Thread(target=am) 59t_am.start() 60 61= Send ping packets from OS into scapy 62 63send(IP(dst="192.0.2.2")/ICMP(seq=(1,3))) 64 65= Cleanup 66 67t_am.join(timeout=3) 68 69tun0.close() 70 71####### 72+ Test strip_packet_info=False on Linux 73 74~ tun linux needs_root not_libpcap 75 76= Create a tun interface 77 78if not LINUX: 79 raise NotImplementedError() 80 81import subprocess 82 83tun0 = TunTapInterface("tun0", strip_packet_info=False) 84 85assert subprocess.check_call(["ip", "link", "set", "tun0", "up"]) == 0 86assert subprocess.check_call([ 87 "ip", "addr", "change", 88 "192.0.2.1", "peer", "192.0.2.2", "dev", "tun0"]) == 0 89 90conf.ifaces.reload() 91conf.route.resync() 92conf.route6.resync() 93 94= Send ping packets from Linux into Scapy 95 96def cb(): 97 send(IP(dst="192.0.2.2")/ICMP(seq=(1,3))) 98 99t = AsyncSniffer(opened_socket=tun0, lfilter=lambda x: ICMP in x, started_callback=cb, count=3) 100t.start() 101t.join(timeout=3) 102 103assert len(t.results) >= 3 104icmp4_sequences = set() 105 106for pkt in t.results: 107 pkt 108 assert isinstance(pkt, LinuxTunPacketInfo) 109 if not isinstance(pkt.payload, IP) or ICMP not in pkt: 110 # We might get IPv6 router solicitation or other traffic... 111 continue 112 if pkt[IP].src != '192.0.2.1' or pkt[IP].dst != '192.0.2.2' or pkt[ICMP].type != 8: 113 continue 114 icmp4_sequences.add(pkt.seq) 115 116# Expect to get 3 different ICMP sequence numbers 117assert len(icmp4_sequences) == 3 118 119= Delete the tun interface 120tun0.close() 121 122+ Test strip_packet_info=True and IPv6 123 124~ tun needs_root ipv6 not_libpcap 125 126= Create a tun interface with IPv4 + IPv6 127 128import subprocess 129 130tun0 = TunTapInterface("tun0", strip_packet_info=True) 131 132if LINUX: 133 assert subprocess.check_call(["ip", "link", "set", "tun0", "up"]) == 0 134 assert subprocess.check_call([ 135 "ip", "addr", "change", 136 "192.0.2.1", "peer", "192.0.2.2", "dev", "tun0"]) == 0 137 assert subprocess.check_call([ 138 "ip", "-6", "addr", "add", 139 "2001:db8::1", "peer", "2001:db8::2", "dev", "tun0"]) == 0 140elif BSD: 141 assert subprocess.check_call(["ifconfig", "tun0", "up"]) == 0 142 assert subprocess.check_call([ 143 "ifconfig", "tun0", "192.0.2.1", "192.0.2.2"]) == 0 144 assert subprocess.check_call([ 145 "ifconfig", "tun0", "inet6", "2001:db8::1/128", "2001:db8::2"]) == 0 146else: 147 raise NotImplementedError() 148 149conf.ifaces.reload() 150conf.route.resync() 151conf.route6.resync() 152 153= Send ping packets from OS into Scapy 154 155def cb(): 156 send(IP(dst="192.0.2.2")/ICMP(seq=(1,3))) 157 send(IPv6(dst="2001:db8::2")/ICMPv6EchoRequest(seq=(1,3))) 158 159t = AsyncSniffer(opened_socket=tun0, lfilter=lambda x: ICMP in x or ICMPv6EchoRequest in x, started_callback=cb, count=6) 160t.start() 161t.join(timeout=3) 162 163assert len(t.results) >= 6 164icmp4_sequences = set() 165icmp6_sequences = set() 166 167for pkt in t.results: 168 pkt 169 assert isinstance(pkt, (IP, IPv6)) 170 if (isinstance(pkt, IP) and 171 pkt[IP].src == "192.0.2.1" and pkt[IP].dst == "192.0.2.2" and 172 ICMP in pkt and pkt[ICMP].type == 8): 173 icmp4_sequences.add(pkt[ICMP].seq) 174 if (isinstance(pkt, IPv6) and 175 pkt[IPv6].src == "2001:db8::1" and pkt[IPv6].dst == "2001:db8::2" and 176 ICMPv6EchoRequest in pkt): 177 icmp6_sequences.add(pkt[ICMPv6EchoRequest].seq) 178 179# Expect to get 3 different ICMP sequence numbers 180assert len(icmp4_sequences) == 3, ( 181 "Expected 3 IPv4 ICMP ping packets, got: " + repr(icmp4_sequences)) 182assert len(icmp6_sequences) == 3, ( 183 "Expected 3 IPv6 ICMP ping packets, got: " + repr(icmp6_sequences)) 184 185= Delete the tun interface 186tun0.close() 187 188+ Test tap interfaces 189 190~ tap needs_root not_libpcap 191 192= Create a tap interface with IPv4 193 194import subprocess 195 196tap0 = TunTapInterface("tap0") 197 198if LINUX: 199 assert subprocess.check_call(["ip", "link", "set", "tap0", "up"]) == 0 200 assert subprocess.check_call([ 201 "ip", "addr", "change", "192.0.2.1/30", "dev", "tap0"]) == 0 202 assert subprocess.check_call([ 203 "ip", "neigh", "replace", 204 "192.0.2.2", "lladdr", "20:00:00:20:00:00", "dev", "tap0"]) == 0 205else: 206 assert subprocess.check_call(["ifconfig", "tap0", "up"]) == 0 207 assert subprocess.check_call([ 208 "ifconfig", "tap0", "192.0.2.1", "netmask", "255.255.255.252"]) == 0 209 assert subprocess.check_call([ 210 "arp", "-s", "192.0.2.2", "20:00:00:20:00:00", "temp"]) == 0 211 212conf.ifaces.reload() 213conf.route.resync() 214conf.route6.resync() 215 216= Send ping packets from OS into Scapy 217 218conf.ifaces 219conf.route 220 221def cb(): 222 sendp(Ether(dst="ff:ff:ff:ff:ff:ff")/IP(dst="192.0.2.2")/ICMP(seq=(1,3)), iface="tap0") 223 224t = AsyncSniffer(opened_socket=tap0, lfilter=lambda x: ICMP in x, started_callback=cb, count=3) 225t.start() 226t.join(timeout=3) 227 228assert len(t.results) >= 3 229icmp4_sequences = set() 230 231for pkt in t.results: 232 pkt 233 assert isinstance(pkt, Ether) 234 if (IP in pkt and 235 pkt[IP].src == "192.0.2.1" and pkt[IP].dst == "192.0.2.2" and 236 ICMP in pkt and pkt[ICMP].type == 8): 237 icmp4_sequences.add(pkt[ICMP].seq) 238 239# Expect to get 3 different ICMP sequence numbers 240assert len(icmp4_sequences) == 3, ( 241 "Expected 3 IPv4 ICMP ping packets, got: " + repr(icmp4_sequences)) 242 243= Delete the tap interface 244tap0.close() 245 246+ Refresh interfaces 247~ linux tun tap not_libpcap 248 249= Cleanup 250 251conf.ifaces.reload() 252conf.route.resync() 253conf.route6.resync() 254