• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1% tuntap tests for Scapy
2
3# Packet capture-based tests are in sendsniff.uts
4
5#######
6+ Test Linux-specific protocol headers for TunTap
7~ linux tun not_libpcap
8
9= Linux-specific protocol headers
10
11p = LinuxTunPacketInfo()/IP()
12assert p.type == 2048
13
14p = LinuxTunPacketInfo(raw(p))
15assert p.type == 2048
16assert isinstance(p.payload, IP)
17
18p = LinuxTunPacketInfo()/IPv6()
19assert p.type == 0x86dd
20
21p = LinuxTunPacketInfo(raw(p))
22assert p.type == 0x86dd
23
24assert isinstance(p.payload, IPv6)
25
26#######
27+ Test tun device
28
29~ tun needs_root not_libpcap
30
31= Create a tun interface
32
33import subprocess
34from threading import Thread
35
36tun0 = TunTapInterface("tun0", strip_packet_info=False)
37
38if LINUX:
39    assert subprocess.check_call(["ip", "link", "set", "tun0", "up"]) == 0
40    assert subprocess.check_call([
41        "ip", "addr", "change",
42        "192.0.2.1", "peer", "192.0.2.2", "dev", "tun0"]) == 0
43elif BSD:
44    assert subprocess.check_call(["ifconfig", "tun0", "up"]) == 0
45    assert subprocess.check_call([
46        "ifconfig", "tun0", "192.0.2.1", "192.0.2.2"]) == 0
47else:
48    raise NotImplementedError()
49
50conf.ifaces.reload()
51conf.route.resync()
52conf.route6.resync()
53
54= Setup ICMPEcho_am on the interface
55
56am = tun0.am(ICMPEcho_am, count=3)
57am.defoptsniff['timeout'] = 5
58t_am = Thread(target=am)
59t_am.start()
60
61= Send ping packets from OS into scapy
62
63send(IP(dst="192.0.2.2")/ICMP(seq=(1,3)))
64
65= Cleanup
66
67t_am.join(timeout=3)
68
69tun0.close()
70
71#######
72+ Test strip_packet_info=False on Linux
73
74~ tun linux needs_root not_libpcap
75
76= Create a tun interface
77
78if not LINUX:
79    raise NotImplementedError()
80
81import subprocess
82
83tun0 = TunTapInterface("tun0", strip_packet_info=False)
84
85assert subprocess.check_call(["ip", "link", "set", "tun0", "up"]) == 0
86assert subprocess.check_call([
87    "ip", "addr", "change",
88    "192.0.2.1", "peer", "192.0.2.2", "dev", "tun0"]) == 0
89
90conf.ifaces.reload()
91conf.route.resync()
92conf.route6.resync()
93
94= Send ping packets from Linux into Scapy
95
96def cb():
97    send(IP(dst="192.0.2.2")/ICMP(seq=(1,3)))
98
99t = AsyncSniffer(opened_socket=tun0, lfilter=lambda x: ICMP in x, started_callback=cb, count=3)
100t.start()
101t.join(timeout=3)
102
103assert len(t.results) >= 3
104icmp4_sequences = set()
105
106for pkt in t.results:
107    pkt
108    assert isinstance(pkt, LinuxTunPacketInfo)
109    if not isinstance(pkt.payload, IP) or ICMP not in pkt:
110        # We might get IPv6 router solicitation or other traffic...
111        continue
112    if pkt[IP].src != '192.0.2.1' or pkt[IP].dst != '192.0.2.2' or pkt[ICMP].type != 8:
113        continue
114    icmp4_sequences.add(pkt.seq)
115
116# Expect to get 3 different ICMP sequence numbers
117assert len(icmp4_sequences) == 3
118
119= Delete the tun interface
120tun0.close()
121
122+ Test strip_packet_info=True and IPv6
123
124~ tun needs_root ipv6 not_libpcap
125
126= Create a tun interface with IPv4 + IPv6
127
128import subprocess
129
130tun0 = TunTapInterface("tun0", strip_packet_info=True)
131
132if LINUX:
133    assert subprocess.check_call(["ip", "link", "set", "tun0", "up"]) == 0
134    assert subprocess.check_call([
135        "ip", "addr", "change",
136        "192.0.2.1", "peer", "192.0.2.2", "dev", "tun0"]) == 0
137    assert subprocess.check_call([
138        "ip", "-6", "addr", "add",
139        "2001:db8::1", "peer", "2001:db8::2", "dev", "tun0"]) == 0
140elif BSD:
141    assert subprocess.check_call(["ifconfig", "tun0", "up"]) == 0
142    assert subprocess.check_call([
143        "ifconfig", "tun0", "192.0.2.1", "192.0.2.2"]) == 0
144    assert subprocess.check_call([
145        "ifconfig", "tun0", "inet6", "2001:db8::1/128", "2001:db8::2"]) == 0
146else:
147    raise NotImplementedError()
148
149conf.ifaces.reload()
150conf.route.resync()
151conf.route6.resync()
152
153= Send ping packets from OS into Scapy
154
155def cb():
156    send(IP(dst="192.0.2.2")/ICMP(seq=(1,3)))
157    send(IPv6(dst="2001:db8::2")/ICMPv6EchoRequest(seq=(1,3)))
158
159t = AsyncSniffer(opened_socket=tun0, lfilter=lambda x: ICMP in x or ICMPv6EchoRequest in x, started_callback=cb, count=6)
160t.start()
161t.join(timeout=3)
162
163assert len(t.results) >= 6
164icmp4_sequences = set()
165icmp6_sequences = set()
166
167for pkt in t.results:
168    pkt
169    assert isinstance(pkt, (IP, IPv6))
170    if (isinstance(pkt, IP) and
171        pkt[IP].src == "192.0.2.1" and pkt[IP].dst == "192.0.2.2" and
172        ICMP in pkt and pkt[ICMP].type == 8):
173        icmp4_sequences.add(pkt[ICMP].seq)
174    if (isinstance(pkt, IPv6) and
175        pkt[IPv6].src == "2001:db8::1" and pkt[IPv6].dst == "2001:db8::2" and
176        ICMPv6EchoRequest in pkt):
177        icmp6_sequences.add(pkt[ICMPv6EchoRequest].seq)
178
179# Expect to get 3 different ICMP sequence numbers
180assert len(icmp4_sequences) == 3, (
181    "Expected 3 IPv4 ICMP ping packets, got: " + repr(icmp4_sequences))
182assert len(icmp6_sequences) == 3, (
183    "Expected 3 IPv6 ICMP ping packets, got: " + repr(icmp6_sequences))
184
185= Delete the tun interface
186tun0.close()
187
188+ Test tap interfaces
189
190~ tap needs_root not_libpcap
191
192= Create a tap interface with IPv4
193
194import subprocess
195
196tap0 = TunTapInterface("tap0")
197
198if LINUX:
199    assert subprocess.check_call(["ip", "link", "set", "tap0", "up"]) == 0
200    assert subprocess.check_call([
201        "ip", "addr", "change", "192.0.2.1/30", "dev", "tap0"]) == 0
202    assert subprocess.check_call([
203        "ip", "neigh", "replace",
204        "192.0.2.2", "lladdr", "20:00:00:20:00:00", "dev", "tap0"]) == 0
205else:
206    assert subprocess.check_call(["ifconfig", "tap0", "up"]) == 0
207    assert subprocess.check_call([
208        "ifconfig", "tap0", "192.0.2.1", "netmask", "255.255.255.252"]) == 0
209    assert subprocess.check_call([
210        "arp", "-s", "192.0.2.2", "20:00:00:20:00:00", "temp"]) == 0
211
212conf.ifaces.reload()
213conf.route.resync()
214conf.route6.resync()
215
216= Send ping packets from OS into Scapy
217
218conf.ifaces
219conf.route
220
221def cb():
222    sendp(Ether(dst="ff:ff:ff:ff:ff:ff")/IP(dst="192.0.2.2")/ICMP(seq=(1,3)), iface="tap0")
223
224t = AsyncSniffer(opened_socket=tap0, lfilter=lambda x: ICMP in x, started_callback=cb, count=3)
225t.start()
226t.join(timeout=3)
227
228assert len(t.results) >= 3
229icmp4_sequences = set()
230
231for pkt in t.results:
232    pkt
233    assert isinstance(pkt, Ether)
234    if (IP in pkt and
235        pkt[IP].src == "192.0.2.1" and pkt[IP].dst == "192.0.2.2" and
236        ICMP in pkt and pkt[ICMP].type == 8):
237        icmp4_sequences.add(pkt[ICMP].seq)
238
239# Expect to get 3 different ICMP sequence numbers
240assert len(icmp4_sequences) == 3, (
241    "Expected 3 IPv4 ICMP ping packets, got: " + repr(icmp4_sequences))
242
243= Delete the tap interface
244tap0.close()
245
246+ Refresh interfaces
247~ linux tun tap not_libpcap
248
249= Cleanup
250
251conf.ifaces.reload()
252conf.route.resync()
253conf.route6.resync()
254