% tuntap tests for Scapy # Packet capture-based tests are in sendsniff.uts ####### + Test Linux-specific protocol headers for TunTap ~ linux tun not_libpcap = Linux-specific protocol headers p = LinuxTunPacketInfo()/IP() assert p.type == 2048 p = LinuxTunPacketInfo(raw(p)) assert p.type == 2048 assert isinstance(p.payload, IP) p = LinuxTunPacketInfo()/IPv6() assert p.type == 0x86dd p = LinuxTunPacketInfo(raw(p)) assert p.type == 0x86dd assert isinstance(p.payload, IPv6) ####### + Test tun device ~ tun needs_root not_libpcap = Create a tun interface import subprocess from threading import Thread tun0 = TunTapInterface("tun0", strip_packet_info=False) if LINUX: assert subprocess.check_call(["ip", "link", "set", "tun0", "up"]) == 0 assert subprocess.check_call([ "ip", "addr", "change", "192.0.2.1", "peer", "192.0.2.2", "dev", "tun0"]) == 0 elif BSD: assert subprocess.check_call(["ifconfig", "tun0", "up"]) == 0 assert subprocess.check_call([ "ifconfig", "tun0", "192.0.2.1", "192.0.2.2"]) == 0 else: raise NotImplementedError() conf.ifaces.reload() conf.route.resync() conf.route6.resync() = Setup ICMPEcho_am on the interface am = tun0.am(ICMPEcho_am, count=3) am.defoptsniff['timeout'] = 5 t_am = Thread(target=am) t_am.start() = Send ping packets from OS into scapy send(IP(dst="192.0.2.2")/ICMP(seq=(1,3))) = Cleanup t_am.join(timeout=3) tun0.close() ####### + Test strip_packet_info=False on Linux ~ tun linux needs_root not_libpcap = Create a tun interface if not LINUX: raise NotImplementedError() import subprocess tun0 = TunTapInterface("tun0", strip_packet_info=False) assert subprocess.check_call(["ip", "link", "set", "tun0", "up"]) == 0 assert subprocess.check_call([ "ip", "addr", "change", "192.0.2.1", "peer", "192.0.2.2", "dev", "tun0"]) == 0 conf.ifaces.reload() conf.route.resync() conf.route6.resync() = Send ping packets from Linux into Scapy def cb(): send(IP(dst="192.0.2.2")/ICMP(seq=(1,3))) t = AsyncSniffer(opened_socket=tun0, lfilter=lambda x: ICMP in x, started_callback=cb, count=3) t.start() t.join(timeout=3) assert len(t.results) >= 3 icmp4_sequences = set() for pkt in t.results: pkt assert isinstance(pkt, LinuxTunPacketInfo) if not isinstance(pkt.payload, IP) or ICMP not in pkt: # We might get IPv6 router solicitation or other traffic... continue if pkt[IP].src != '192.0.2.1' or pkt[IP].dst != '192.0.2.2' or pkt[ICMP].type != 8: continue icmp4_sequences.add(pkt.seq) # Expect to get 3 different ICMP sequence numbers assert len(icmp4_sequences) == 3 = Delete the tun interface tun0.close() + Test strip_packet_info=True and IPv6 ~ tun needs_root ipv6 not_libpcap = Create a tun interface with IPv4 + IPv6 import subprocess tun0 = TunTapInterface("tun0", strip_packet_info=True) if LINUX: assert subprocess.check_call(["ip", "link", "set", "tun0", "up"]) == 0 assert subprocess.check_call([ "ip", "addr", "change", "192.0.2.1", "peer", "192.0.2.2", "dev", "tun0"]) == 0 assert subprocess.check_call([ "ip", "-6", "addr", "add", "2001:db8::1", "peer", "2001:db8::2", "dev", "tun0"]) == 0 elif BSD: assert subprocess.check_call(["ifconfig", "tun0", "up"]) == 0 assert subprocess.check_call([ "ifconfig", "tun0", "192.0.2.1", "192.0.2.2"]) == 0 assert subprocess.check_call([ "ifconfig", "tun0", "inet6", "2001:db8::1/128", "2001:db8::2"]) == 0 else: raise NotImplementedError() conf.ifaces.reload() conf.route.resync() conf.route6.resync() = Send ping packets from OS into Scapy def cb(): send(IP(dst="192.0.2.2")/ICMP(seq=(1,3))) send(IPv6(dst="2001:db8::2")/ICMPv6EchoRequest(seq=(1,3))) t = AsyncSniffer(opened_socket=tun0, lfilter=lambda x: ICMP in x or ICMPv6EchoRequest in x, started_callback=cb, count=6) t.start() t.join(timeout=3) assert len(t.results) >= 6 icmp4_sequences = set() icmp6_sequences = set() for pkt in t.results: pkt assert isinstance(pkt, (IP, IPv6)) if (isinstance(pkt, IP) and pkt[IP].src == "192.0.2.1" and pkt[IP].dst == "192.0.2.2" and ICMP in pkt and pkt[ICMP].type == 8): icmp4_sequences.add(pkt[ICMP].seq) if (isinstance(pkt, IPv6) and pkt[IPv6].src == "2001:db8::1" and pkt[IPv6].dst == "2001:db8::2" and ICMPv6EchoRequest in pkt): icmp6_sequences.add(pkt[ICMPv6EchoRequest].seq) # Expect to get 3 different ICMP sequence numbers assert len(icmp4_sequences) == 3, ( "Expected 3 IPv4 ICMP ping packets, got: " + repr(icmp4_sequences)) assert len(icmp6_sequences) == 3, ( "Expected 3 IPv6 ICMP ping packets, got: " + repr(icmp6_sequences)) = Delete the tun interface tun0.close() + Test tap interfaces ~ tap needs_root not_libpcap = Create a tap interface with IPv4 import subprocess tap0 = TunTapInterface("tap0") if LINUX: assert subprocess.check_call(["ip", "link", "set", "tap0", "up"]) == 0 assert subprocess.check_call([ "ip", "addr", "change", "192.0.2.1/30", "dev", "tap0"]) == 0 assert subprocess.check_call([ "ip", "neigh", "replace", "192.0.2.2", "lladdr", "20:00:00:20:00:00", "dev", "tap0"]) == 0 else: assert subprocess.check_call(["ifconfig", "tap0", "up"]) == 0 assert subprocess.check_call([ "ifconfig", "tap0", "192.0.2.1", "netmask", "255.255.255.252"]) == 0 assert subprocess.check_call([ "arp", "-s", "192.0.2.2", "20:00:00:20:00:00", "temp"]) == 0 conf.ifaces.reload() conf.route.resync() conf.route6.resync() = Send ping packets from OS into Scapy conf.ifaces conf.route def cb(): sendp(Ether(dst="ff:ff:ff:ff:ff:ff")/IP(dst="192.0.2.2")/ICMP(seq=(1,3)), iface="tap0") t = AsyncSniffer(opened_socket=tap0, lfilter=lambda x: ICMP in x, started_callback=cb, count=3) t.start() t.join(timeout=3) assert len(t.results) >= 3 icmp4_sequences = set() for pkt in t.results: pkt assert isinstance(pkt, Ether) if (IP in pkt and pkt[IP].src == "192.0.2.1" and pkt[IP].dst == "192.0.2.2" and ICMP in pkt and pkt[ICMP].type == 8): icmp4_sequences.add(pkt[ICMP].seq) # Expect to get 3 different ICMP sequence numbers assert len(icmp4_sequences) == 3, ( "Expected 3 IPv4 ICMP ping packets, got: " + repr(icmp4_sequences)) = Delete the tap interface tap0.close() + Refresh interfaces ~ linux tun tap not_libpcap = Cleanup conf.ifaces.reload() conf.route.resync() conf.route6.resync()