• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1% send, sniff, sr* tests for Scapy
2
3~ needs_root
4
5############
6############
7+ Test bridge_and_sniff() using tap sockets
8
9~ tap
10
11= Create two tap interfaces
12
13import subprocess
14from threading import Thread
15
16tap0, tap1 = [TunTapInterface("tap%d" % i) for i in range(2)]
17
18chk_kwargs = {"timeout": 3}
19
20if LINUX:
21    for i in range(2):
22        assert subprocess.check_call(["ip", "link", "set", "tap%d" % i, "up"], **chk_kwargs) == 0
23else:
24    for i in range(2):
25        assert subprocess.check_call(["ifconfig", "tap%d" % i, "up"], **chk_kwargs) == 0
26
27= Run a sniff thread on the tap1 **interface**
28* It will terminate when 5 IP packets from 192.0.2.1 have been sniffed
29started = threading.Event()
30t_sniff = Thread(
31    target=sniff,
32    kwargs={"iface": "tap1", "count": 5, "prn": Packet.summary,
33            "lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1",
34            "started_callback": started.set},
35    name="tests sniff 1")
36t_sniff.start()
37started.wait(timeout=5)
38
39= Run a bridge_and_sniff thread between the taps **sockets**
40* It will terminate when 5 IP packets from 192.0.2.1 have been forwarded
41started = threading.Event()
42t_bridge = Thread(target=bridge_and_sniff, args=(tap0, tap1),
43                  kwargs={"store": False, "count": 5, 'prn': Packet.summary,
44                          "lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1",
45                          "started_callback": started.set},
46                  name="tests bridge_and_sniff 1")
47t_bridge.start()
48started.wait(timeout=5)
49
50= Send five IP packets from 192.0.2.1 to the tap0 **interface**
51sendp([Ether(dst=ETHER_BROADCAST) / IP(src="192.0.2.1") / ICMP()], iface="tap0",
52      count=5)
53
54= Wait for the threads
55t_bridge.join(5)
56t_sniff.join(5)
57assert not t_bridge.is_alive()
58assert not t_sniff.is_alive()
59
60= Run a sniff thread on the tap1 **interface**
61* It will terminate when 5 IP packets from 198.51.100.1 have been sniffed
62started = threading.Event()
63t_sniff = Thread(
64    target=sniff,
65    kwargs={"iface": "tap1", "count": 5, "prn": Packet.summary,
66            "lfilter": lambda p: IP in p and p[IP].src == "198.51.100.1",
67            "started_callback": started.set},
68    name="tests sniff 2")
69t_sniff.start()
70started.wait(timeout=5)
71
72= Run a bridge_and_sniff thread between the taps **sockets**
73* It will "NAT" packets from 192.0.2.1 to 198.51.100.1 and will terminate when 5 IP packets have been forwarded
def nat_1_2(pkt):
    """Rewrite the IP source 192.0.2.1 to 198.51.100.1 ("NAT") on the tap bridge.

    Used as the ``xfrm12`` callback of bridge_and_sniff().

    :param pkt: packet received on the first bridged socket
    :returns: the same packet object with its IP source replaced, or False
        when the packet has no IP layer or does not come from 192.0.2.1
    """
    # Anything that is not an IP packet from the expected source is rejected.
    if IP not in pkt or pkt[IP].src != "192.0.2.1":
        return False
    ip_layer = pkt[IP]
    ip_layer.src = "198.51.100.1"
    # Drop the stored checksum after changing the header
    # (presumably so it is recomputed on rebuild - TODO confirm).
    del ip_layer.chksum
    return pkt
80
81started = threading.Event()
82t_bridge = Thread(target=bridge_and_sniff, args=(tap0, tap1),
83                  kwargs={"store": False, "count": 5, 'prn': Packet.summary,
84                          "xfrm12": nat_1_2,
85                          "lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1",
86                          "started_callback": started.set},
87                  name="tests bridge_and_sniff 2")
88t_bridge.start()
89started.wait(timeout=5)
90
91= Send five IP packets from 192.0.2.1 to the tap0 **interface**
92sendp([Ether(dst=ETHER_BROADCAST) / IP(src="192.0.2.1") / ICMP()], iface="tap0",
93      count=5)
94
95= Wait for the threads
96t_bridge.join(5)
97t_sniff.join(5)
98assert not t_bridge.is_alive()
99assert not t_sniff.is_alive()
100
101= Delete the tap interfaces
102if conf.use_pypy:
103    # See https://pypy.readthedocs.io/en/latest/cpython_differences.html
104    tap0.close()
105    tap1.close()
106else:
107    del tap0, tap1
108
109
110############
111############
112+ Test bridge_and_sniff() using tun sockets
113
114~ tun not_libpcap
115
116= Create two tun interfaces
117
118import subprocess
119from threading import Thread
120
121tun0, tun1 = [TunTapInterface("tun%d" % i) for i in range(2)]
122
123chk_kwargs = {"timeout": 3}
124
125if LINUX:
126    for i in range(2):
127        assert subprocess.check_call(["ip", "link", "set", "tun%d" % i, "up"], **chk_kwargs) == 0
128    assert subprocess.check_call([
129        "ip", "addr", "change",
130        "192.0.2.1", "peer", "192.0.2.2", "dev", "tun0"], **chk_kwargs) == 0
131else:
132    for i in range(2):
133        assert subprocess.check_call(["ifconfig", "tun%d" % i, "up"], **chk_kwargs) == 0
134    assert subprocess.check_call(["ifconfig", "tun0", "192.0.2.1", "192.0.2.2"], **chk_kwargs) == 0
135
136= Run a sniff thread on the tun1 **interface**
137* It will terminate when 5 IP packets from 192.0.2.1 have been sniffed
138started = threading.Event()
139t_sniff = Thread(target=sniff,
140                 kwargs={"iface": "tun1", "count": 5,
141                         "prn": Packet.summary,
142                         "lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1",
143                         "started_callback": started.set},
144                 name="tests sniff 3")
145
146t_sniff.start()
147started.wait(timeout=5)
148
149= Run a bridge_and_sniff thread between the tuns **sockets**
150* It will terminate when 5 IP packets from 192.0.2.1 have been forwarded.
151started = threading.Event()
152t_bridge = Thread(target=bridge_and_sniff, args=(tun0, tun1),
153                  kwargs={"store": False, "count": 5, 'prn': Packet.summary,
154                          "xfrm12": lambda pkt: pkt,
155                          "lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1",
156                          "started_callback": started.set},
157                          name="tests bridge_and_sniff 3")
158t_bridge.start()
159started.wait(timeout=5)
160
161= Send five IP packets from 192.0.2.1 to the tun0 **interface**
162conf.route.add(net="192.0.2.2/32", dev="tun0")
163send([IP(src="192.0.2.1", dst="192.0.2.2") / ICMP()], count=5, iface="tun0")
164conf.route.delt(net="192.0.2.2/32", dev="tun0")
165
166= Wait for the threads
167t_bridge.join(5)
168t_sniff.join(5)
169assert not t_bridge.is_alive()
170assert not t_sniff.is_alive()
171
172= Run a sniff thread on the tun1 **interface**
173* It will terminate when 5 IP packets from 198.51.100.1 have been sniffed
174started = threading.Event()
175t_sniff = Thread(target=sniff,
176                 kwargs={"iface": "tun1", "count": 5, "prn": Packet.summary,
177                         "lfilter": lambda p: IP in p and p[IP].src == "198.51.100.1",
178                         "started_callback": started.set},
179                 name="tests sniff 4")
180
181t_sniff.start()
182started.wait(timeout=5)
183
184= Run a bridge_and_sniff thread between the tuns **sockets**
185* It will "NAT" packets from 192.0.2.1 to 198.51.100.1 and will terminate when 5 IP packets have been forwarded
def nat_1_2(pkt):
    """Rewrite the IP source 192.0.2.1 to 198.51.100.1 ("NAT") on the tun bridge.

    Used as the ``xfrm12`` callback of bridge_and_sniff().

    :param pkt: packet received on the first bridged socket
    :returns: the same packet object with its IP source replaced, or False
        when the packet has no IP layer or does not come from 192.0.2.1
    """
    # Anything that is not an IP packet from the expected source is rejected.
    if IP not in pkt or pkt[IP].src != "192.0.2.1":
        return False
    ip_layer = pkt[IP]
    ip_layer.src = "198.51.100.1"
    # Drop the stored checksum after changing the header
    # (presumably so it is recomputed on rebuild - TODO confirm).
    del ip_layer.chksum
    return pkt
192
193started = threading.Event()
194t_bridge = Thread(target=bridge_and_sniff, args=(tun0, tun1),
195                  kwargs={"store": False, "count": 5, 'prn': Packet.summary,
196                          "xfrm12": nat_1_2,
197                          "lfilter": lambda p: IP in p and p[IP].src == "192.0.2.1",
198                          "started_callback": started.set},
199                          name="tests bridge_and_sniff 4")
200t_bridge.start()
201started.wait(timeout=5)
202
203= Send five IP packets from 192.0.2.1 to the tun0 **interface**
204conf.route.add(net="192.0.2.2/32", dev="tun0")
205send([IP(src="192.0.2.1", dst="192.0.2.2") / ICMP()], count=5, iface="tun0")
206conf.route.delt(net="192.0.2.2/32", dev="tun0")
207
208= Wait for the threads
209t_bridge.join(5)
210t_sniff.join(5)
211assert not t_bridge.is_alive()
212assert not t_sniff.is_alive()
213
214= Delete the tun interfaces
215if conf.use_pypy:
216    # See https://pypy.readthedocs.io/en/latest/cpython_differences.html
217    tun0.close()
218    tun1.close()
219else:
220    del tun0, tun1
221
222
223############
224############
225+ Test bridge_and_sniff() using veth pairs
226~ linux needs_root veth
227
228= Ensure bridge_and_sniff does not close sockets if data is sent within xfrm on ingress interface
229
230from scapy.arch.linux import VEthPair
231
232with VEthPair('a_0', 'a_1') as veth_0:
233    with VEthPair('b_0', 'b_1') as veth_1:
234        xfrm_count = {
235            'a_0':0,
236            'b_0': 0
237        }
238        def xfrm_x(pkt):
239            pkt_tx = pkt.copy()
240            ether_lyr = pkt_tx[Ether]
241            ether_lyr.type = 0x1234  # we send to peer interface - avoid loop
242            # send on receiving interface - triggers return None on recv() in L2Socket
243            sendp(pkt_tx, iface=pkt.sniffed_on)
244            global xfrm_count
245            xfrm_count[pkt.sniffed_on] = xfrm_count[pkt.sniffed_on] + 1
246            return True
247        started = threading.Event()
248        t_bridge = Thread(target=bridge_and_sniff,
249                          args=('a_0', 'b_0'),
250                          kwargs={
251                              'xfrm12': xfrm_x,
252                              'xfrm21': xfrm_x,
253                              'store': False,
254                              'count': 4,
255                              'lfilter': lambda p: Ether in p and p[Ether].type == 0xbeef,
256                              "started_callback": started.set},
257                          name="tests bridge_and_sniff VEthPair")
258        t_bridge.start()
259        started.wait(timeout=5)
260        # send frames in both directions
261        for if_name in ['a_1', 'b_1', 'a_1', 'b_1']:
262            sendp([Ether(type=0xbeef) /
263                   Raw(b'On a scale from one to ten what is your favourite colour of the alphabet?')],
264                  iface=if_name)
265        t_bridge.join(1)
266        # now test if the socket used in bridge_and_sniff() was alive all the time
267        assert (xfrm_count['a_0'] == 2)
268        assert (xfrm_count['b_0'] == 2)
269
270
271############
272############
273+ Test arpleak() using a tap socket
274
275~ tap tcpdump
276
277= Create a tap interface
278
279from unittest import mock
280import struct
281import subprocess
282from threading import Thread
283import time
284
285tap0 = TunTapInterface("tap0")
286
287chk_kwargs = {"timeout": 3}
288
289if LINUX:
290    assert subprocess.check_call(["ip", "link", "set", "tap0", "up"], **chk_kwargs) == 0
291else:
292    assert subprocess.check_call(["ifconfig", "tap0", "up"], **chk_kwargs) == 0
293
294= Check for arpleak
295
def answer_arp_leak(pkt):
    """Answer an ARP request for 192.0.2.2, echoing the request's field lengths.

    The reply is built by hand from raw bytes so that the hardware/protocol
    address lengths announced by the request (which may be oversized) are
    reused as-is. The reply is sent through the ``tap0`` socket.

    :param pkt: sniffed packet; anything without an ARP layer is ignored
    :returns: None
    """
    own_mac = b"\x00\x01\x02\x03\x04\x06"
    own_ip = b"\xc0\x00\x02\x02" # 192.0.2.2
    # First four ARP bytes: hardware type 1 and protocol type 0x0800.
    arp_prefix = b'\x00\x01\x08\x00'

    if ARP not in pkt:
        return
    requester_mac = pkt.src
    arp_bytes = raw(pkt[ARP])

    if arp_bytes[:4] != arp_prefix:
        print("Invalid ARP")
        return

    hwlen, plen, op = struct.unpack('>BBH', arp_bytes[4:8])
    # Only requests (op == 1) are answered.
    if op != 1:
        print("Invalid ARP op")
        return

    # Two (hardware address, protocol address) pairs, sized as announced.
    fmt = ('%ds%ds' % (hwlen, plen)) * 2
    addr_block = arp_bytes[8:8 + (plen + hwlen) * 2]
    hwsrc, psrc, hwdst, pdst = struct.unpack(fmt, addr_block)

    # Compare at most the four bytes of our address with the target.
    if pdst[:4] != own_ip[:plen]:
        print("Invalid ARP pdst %r" % pdst)
        return

    # Reply (op 2): we are the sender, the requester becomes the target.
    reply_body = arp_prefix + struct.pack('>BBH' + fmt,
                                          hwlen, plen, 2,
                                          own_mac, own_ip, hwsrc, psrc)
    reply = Ether(dst=requester_mac, src=own_mac, type=0x0806) / reply_body
    tap0.send(reply)
    print('Answered!')
322
323started = threading.Event()
324t_answer = Thread(
325    target=sniff,
326    kwargs={"prn": answer_arp_leak, "timeout": 10, "store": False,
327            "opened_socket": tap0,
328            "started_callback": started.set},
329    name="tests answer_arp_leak")
330
331t_answer.start()
332started.wait(timeout=5)
333
@mock.patch("scapy.layers.l2.get_if_addr")
@mock.patch("scapy.layers.l2.get_if_hwaddr")
def test_arpleak(mock_get_if_hwaddr, mock_get_if_addr, hwlen=255, plen=255):
    """Run arpleak() on 192.0.2.2/31 with the local addresses mocked.

    The decorators are applied bottom-up, so the ``get_if_hwaddr`` mock is
    the first positional argument and the ``get_if_addr`` mock the second.

    :param hwlen: hardware address length passed through to arpleak()
    :param plen: protocol address length passed through to arpleak()
    :returns: the result of arpleak() (callers unpack it as ``ans, unans``)
    """
    conf.route.ifadd("tap0", "192.0.2.0/24")

    def fake_addr(_iface):
        return "192.0.2.1"

    def fake_hwaddr(_iface):
        return "00:01:02:03:04:05"

    mock_get_if_addr.side_effect = fake_addr
    mock_get_if_hwaddr.side_effect = fake_hwaddr

    result = arpleak("192.0.2.2/31", timeout=2, hwlen=hwlen, plen=plen)
    return result
341
342ans, unans = test_arpleak()
343assert len(ans) == 1
344assert len(unans) == 1
345ans, unans = test_arpleak(hwlen=6)
346assert len(ans) == 1
347assert len(unans) == 1
348ans, unans = test_arpleak(plen=4)
349assert len(ans) == 1
350assert len(unans) == 1
351
352t_answer.join(15)
353
354if t_answer.is_alive():
355    raise Exception("Test timed out")
356
357if conf.use_pypy:
358    # See https://pypy.readthedocs.io/en/latest/cpython_differences.html
359    tap0.close()
360else:
361    del tap0
362
363#####
364#####
365+ Test sr() on multiple interfaces
366
367= Setup multiple linux interfaces and ranges
368~ linux needs_root dbg
369
370import os
371exit_status = os.system("ip netns add blob0")
372exit_status |= os.system("ip netns add blob1")
373exit_status |= os.system("ip link add name scapy0.0 type veth peer name scapy0.1")
374exit_status |= os.system("ip link add name scapy1.0 type veth peer name scapy1.1")
375exit_status |= os.system("ip link set scapy0.1 netns blob0 up")
376exit_status |= os.system("ip link set scapy1.1 netns blob1 up")
377exit_status |= os.system("ip addr add 100.64.2.1/24 dev scapy0.0")
378exit_status |= os.system("ip addr add 100.64.3.1/24 dev scapy1.0")
379exit_status |= os.system("ip --netns blob0 addr add 100.64.2.2/24 dev scapy0.1")
380exit_status |= os.system("ip --netns blob1 addr add 100.64.3.2/24 dev scapy1.1")
381exit_status |= os.system("ip link set scapy0.0 up")
382exit_status |= os.system("ip link set scapy1.0 up")
383assert exit_status == 0
384
385conf.ifaces.reload()
386conf.route.resync()
387
388try:
389    pkts = sr(IP(dst=["100.64.2.2", "100.64.3.2"])/ICMP(), timeout=1)[0]
390    assert len(pkts) == 2
391    assert pkts[0].answer.src in ["100.64.2.2", "100.64.3.2"]
392    assert pkts[1].answer.src in ["100.64.2.2", "100.64.3.2"]
393finally:
394    e = os.system("ip netns del blob0")
395    e = os.system("ip netns del blob1")
396    conf.ifaces.reload()
397    conf.route.resync()
398
399
400= sr() performance test
401~ linux needs_root veth not_pypy
402
403import subprocess
404import shlex
405
406try:
407    # Create a dedicated network name space to simulate remote host
408    subprocess.check_call(shlex.split("sudo ip netns add scapy"))
409    # Create a virtual Ethernet pair to connect default and new NS
410    subprocess.check_call(shlex.split("sudo ip link add type veth"))
411    # Move veth1 to the new NS
412    subprocess.check_call(shlex.split("sudo ip link set veth1 netns scapy"))
413    # Setup vNIC in the default NS
414    subprocess.check_call(shlex.split("sudo ip link set veth0 up"))
415    subprocess.check_call(shlex.split("sudo ip addr add 192.168.168.1/24 dev veth0"))
416    # Setup vNIC in the dedicated NS
417    subprocess.check_call(shlex.split("sudo ip netns exec scapy ip link set lo up"))
418    subprocess.check_call(shlex.split("sudo ip netns exec scapy ip link set veth1 up"))
419    subprocess.check_call(shlex.split("sudo ip netns exec scapy ip addr add 192.168.168.2/24 dev veth1"))
420    # Perform test
421    conf.route.resync()
422    res, unansw = sr(IP(dst='192.168.168.2') / ICMP(seq=(1, 1000)), timeout=1, verbose=False)
423finally:
424    try:
425        # Bring down the interfaces
426        subprocess.check_call(shlex.split("sudo ip netns exec scapy ip link set veth1 down"))
427        subprocess.check_call(shlex.split("sudo ip netns exec scapy ip link set lo down"))
428        # Delete the namespace
429        subprocess.check_call(shlex.split("sudo ip netns delete scapy"))
430        # Remove the virtual Ethernet pair
431        subprocess.check_call(shlex.split("sudo ip link delete veth0"))
432    except subprocess.CalledProcessError as e:
433        print(f"Error during cleanup: {e}")
434
435len(res) == 1000
436
437