• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1% Regression tests for Linux only
2
3# More information at http://www.secdev.org/projects/UTscapy/
4
5
6############
7############
8
9+ Linux only test
10
11= L3RawSocket
12~ netaccess IP TCP linux needs_root
13
14with no_debug_dissector():
15    x = sr1(IP(dst="www.google.com")/TCP(sport=RandShort(), dport=80, flags="S"),timeout=3)
16
17x
18assert x[IP].ottl() in [32, 64, 128, 255]
19assert 0 <= x[IP].hops() <= 126
20
21# TODO: fix this test (randomly stuck)
22# ex: https://travis-ci.org/secdev/scapy/jobs/247473497
23
24#= Supersocket _flush_fd
25#~ needs_root linux
26#
27#import select
28#
29#from scapy.arch.linux import _flush_fd
30#socket = conf.L2listen()
31#select.select([socket],[],[],2)
32#_flush_fd(socket.ins)
33
34= Interface aliases & sub-interfaces
35~ linux needs_root
36
37import os
38exit_status = os.system("ip link add name scapy0 type dummy")
39exit_status = os.system("ip addr add 192.0.2.1/24 dev scapy0")
40exit_status = os.system("ip link set scapy0 up")
41exit_status = os.system("ifconfig scapy0:0 inet 198.51.100.1/24 up")
42if exit_status == 0:
43    exit_status = os.system("ip addr show scapy0")
44    print(get_if_list())
45    conf.route.resync()
46    print(conf.route.routes)
47    assert conf.route.route("198.51.100.254") == ("scapy0", "198.51.100.1", "0.0.0.0")
48    route_alias = (3325256704, 4294967040, "0.0.0.0", "scapy0", "198.51.100.1", 0)
49    assert route_alias in conf.route.routes
50    exit_status = os.system("ip link add link scapy0 name scapy0.42 type vlan id 42")
51    exit_status = os.system("ip addr add 203.0.113.42/24 dev scapy0.42")
52    exit_status = os.system("ip link set scapy0.42 up")
53    exit_status = os.system("ip route add 192.0.2.43/32 via 203.0.113.41")
54    print(get_if_list())
55    conf.route.resync()
56    print(conf.route.routes)
57    assert conf.route.route("192.0.2.43") == ("scapy0.42", "203.0.113.42", "203.0.113.41")
58    route_specific = (3221226027, 4294967295, "203.0.113.41", "scapy0.42", "0.0.0.0", 0)
59    assert route_specific in conf.route.routes
60    assert conf.route.route("203.0.113.42") == ('scapy0.42', '203.0.113.42', '0.0.0.0')
61    assert conf.route.route("203.0.113.43") == ('scapy0.42', '203.0.113.42', '0.0.0.0')
62    exit_status = os.system("ip link del name dev scapy0")
63else:
64    assert True
65
66
67= Test scoped interface addresses
68~ linux needs_root
69
70import os
71exit_status = os.system("ip link add name scapy0 type dummy")
72exit_status = os.system("ip link add name scapy1 type dummy")
73exit_status |= os.system("ip addr add 192.0.2.1/24 dev scapy0")
74exit_status |= os.system("ip addr add 192.0.3.1/24 dev scapy1")
75exit_status |= os.system("ip link set scapy0 address 00:01:02:03:04:05 multicast on up")
76exit_status |= os.system("ip link set scapy1 address 06:07:08:09:10:11 multicast on up")
77assert exit_status == 0
78
79conf.ifaces.reload()
80conf.route.resync()
81conf.route6.resync()
82
83conf.route6
84
85try:
86    # IPv4
87    a = Ether()/IP(dst="224.0.0.1%scapy0")
88    assert a[Ether].src == "00:01:02:03:04:05"
89    assert a[IP].src == "192.0.2.1"
90    b = Ether()/IP(dst="224.0.0.1%scapy1")
91    assert b[Ether].src == "06:07:08:09:10:11"
92    assert b[IP].src == "192.0.3.1"
93    c = Ether()/IP(dst="224.0.0.1/24%scapy1")
94    assert c[Ether].src == "06:07:08:09:10:11"
95    assert c[IP].src == "192.0.3.1"
96    # IPv6
97    a = Ether()/IPv6(dst="ff02::fb%scapy0")
98    assert a[Ether].src == "00:01:02:03:04:05"
99    assert a[IPv6].src == "fe80::201:2ff:fe03:405"
100    b = Ether()/IPv6(dst="ff02::fb%scapy1")
101    assert b[Ether].src == "06:07:08:09:10:11"
102    assert b[IPv6].src == "fe80::407:8ff:fe09:1011"
103    c = Ether()/IPv6(dst="ff02::fb/30%scapy1")
104    assert c[Ether].src == "06:07:08:09:10:11"
105    assert c[IPv6].src == "fe80::407:8ff:fe09:1011"
106finally:
107    exit_status = os.system("ip link del scapy0")
108    exit_status = os.system("ip link del scapy1")
109    conf.ifaces.reload()
110    conf.route.resync()
111    conf.route6.resync()
112
113= catch loopback device missing
114~ linux needs_root
115
116from unittest.mock import patch
117
118# can't remove the lo device (or its address without causing trouble) - use some pseudo dummy instead
119
120with patch('scapy.arch.linux.conf.loopback_name', 'scapy_lo_x'):
121    routes = read_routes()
122
123= catch loopback device no address assigned
124~ linux needs_root
125
126import os, socket
127from unittest.mock import patch
128
129try:
130    exit_status = os.system("ip link add name scapy_lo type dummy")
131    assert exit_status == 0
132    exit_status = os.system("ip link set dev scapy_lo up")
133    assert exit_status == 0
134
135    with patch('scapy.arch.linux.conf.loopback_name', 'scapy_lo'):
136        routes = read_routes()
137
138    exit_status = os.system("ip addr add dev scapy_lo 10.10.0.1/24")
139    assert exit_status == 0
140
141    with patch('scapy.arch.linux.conf.loopback_name', 'scapy_lo'):
142        routes = read_routes()
143
144    lo_routes = [
145        (ltoa(dst_int), ltoa(msk_int), gw_str, if_name, if_addr, metric)
146        for dst_int, msk_int, gw_str, if_name, if_addr, metric in routes
147        if if_name == "scapy_lo"
148    ]
149    lo_routes.sort(key=lambda x: x[0])
150
151    expected_routes = [
152        (168427520, 4294967040, '0.0.0.0', 'scapy_lo', '10.10.0.1', 0),
153        (168427521, 4294967295, '0.0.0.0', 'scapy_lo', '10.10.0.1', 0),
154        (168427775, 4294967295, '0.0.0.0', 'scapy_lo', '10.10.0.1', 0),
155    ]
156    print(lo_routes)
157    print(expected_routes)
158finally:
159    exit_status = os.system("ip link del dev scapy_lo")
160    assert exit_status == 0
161
162= IPv6 link-local address selection
163
164conf.ifaces._add_fake_iface("scapy0", 'e2:39:91:79:19:10')
165
166from unittest.mock import patch
167conf.route6.routes =  [('fe80::', 64, '::', 'scapy0', ['fe80::e039:91ff:fe79:1910'], 256)]
168conf.route6.ipv6_ifaces = set(['scapy0'])
169bck_conf_iface = conf.iface
170conf.iface = "scapy0"
171
172p = Ether()/IPv6(dst="ff02::1")/ICMPv6NIQueryName(data="ff02::1")
173print(p.sprintf("%Ether.src% > %Ether.dst%\n%IPv6.src% > %IPv6.dst%"))
174ip6_ll_address = 'fe80::e039:91ff:fe79:1910'
175print(p[IPv6].src, ip6_ll_address)
176assert p[IPv6].src == ip6_ll_address
177mac_address = 'e2:39:91:79:19:10'
178print(p[Ether].src, mac_address)
179assert p[Ether].src == mac_address
180
181conf.iface = bck_conf_iface
182conf.route6.resync()
183
184= IPv6 - check OS routes
185~ linux ipv6
186
187addrs = in6_getifaddr()
188if addrs:
189    assert all(in6_isvalid(addr[0]) for addr in in6_getifaddr()), 'invalid ipv6 address'
190    ifaces6 = [addr[2] for addr in in6_getifaddr()]
191    assert all(iface in ifaces6 for iface in conf.route6.ipv6_ifaces), 'ipv6 interface has route but no real'
192
193
194= veth interface error handling
195~ linux needs_root veth
196
197from scapy.arch.linux import VEthPair
198
199try:
200    veth = VEthPair('this_IF_name_is_to_long_and_will_cause_an_error', 'veth_scapy_1')
201    veth.setup()
202    assert False
203except subprocess.CalledProcessError:
204    pass
205except Exception:
206    assert False
207
208= veth interface usage - ctx manager
209~ linux needs_root veth
210
211from threading import Condition
212
213cond_started = Condition()
214
215def _sniffer_started():
216
217    global cond_started
218    cond_started.acquire()
219    cond_started.notify()
220    cond_started.release()
221
222cond_started.acquire()
223
224try:
225    with VEthPair('veth_scapy_0', 'veth_scapy_1') as veth:
226        if_list = get_if_list()
227        assert ('veth_scapy_0' in if_list)
228        assert ('veth_scapy_1' in if_list)
229        frm_count = 0
230        def _sniffer():
231            sniffed = sniff(iface='veth_scapy_1',
232                            store=True,
233                            count=2,
234                            lfilter=lambda p: Ether in p and p[Ether].type == 0xbeef,
235                            started_callback=_sniffer_started,
236                            timeout=3)
237            global frm_count
238            frm_count = 2
239        t_sniffer = Thread(target=_sniffer, name="linux.uts sniff veth_scapy_1")
240        t_sniffer.start()
241        cond_started.wait()
242        sendp(Ether(type=0xbeef)/Raw(b'0123456789'),
243              iface='veth_scapy_0',
244              count=2)
245        t_sniffer.join(1)
246        assert frm_count == 2
247
248    if_list = get_if_list()
249    assert ('veth_scapy_0' not in if_list)
250    assert ('veth_scapy_1' not in if_list)
251except subprocess.CalledProcessError:
252    assert False
253except Exception:
254    assert False
255
256= veth interface usage - manual interface handling
257~ linux needs_root veth
258
259from threading import Condition
260
261cond_started = Condition()
262
263def _sniffer_started():
264
265    global cond_started
266    cond_started.acquire()
267    cond_started.notify()
268    cond_started.release()
269
270cond_started.acquire()
271
272
273veth = VEthPair('veth_scapy_0', 'veth_scapy_1')
274try:
275    veth.setup()
276    veth.up()
277except subprocess.CalledProcessError:
278    assert False
279except Exception:
280    assert False
281
282conf.ifaces.reload()
283if_list = get_if_list()
284assert ('veth_scapy_0' in if_list)
285assert ('veth_scapy_1' in if_list)
286
287frm_count = 0
288def _sniffer():
289    sniffed = sniff(iface='veth_scapy_1',
290                    store=True,
291                    count=2,
292                    lfilter=lambda p: Ether in p and p[Ether].type == 0xbeef,
293                    started_callback=_sniffer_started,
294                    timeout=3)
295    global frm_count
296    frm_count = 2
297
298t_sniffer = Thread(target=_sniffer, name="linux.uts sniff veth_scapy_1 2")
299t_sniffer.start()
300cond_started.wait()
301sendp(Ether(type=0xbeef)/Raw(b'0123456789'),
302      iface='veth_scapy_0',
303      count=2)
304t_sniffer.join(1)
305assert frm_count == 2
306
307try:
308    veth.down()
309    veth.destroy()
310
311    conf.ifaces.reload()
312    if_list = get_if_list()
313    assert ('veth_scapy_0' not in if_list)
314    assert ('veth_scapy_1' not in if_list)
315except subprocess.CalledProcessError:
316    assert False
317except Exception:
318    assert False
319
320
321= Routing table, interface with no names
322~ linux
323
324from unittest.mock import patch
325
326@patch("scapy.arch.linux.ioctl")
327def test_read_routes(mock_ioctl):
328    def raise_ioerror(*args, **kwargs):
329        if args[1] == 0x8912:
330            return args[2]
331        raise IOError
332    mock_ioctl.side_effect = raise_ioerror
333    read_routes()
334
335test_read_routes()
336
337
338= L3PacketSocket sendto exception
339~ linux needs_root
340
341from scapy.arch.linux import L3PacketSocket
342
343import socket
344from unittest import mock
345
346@mock.patch("scapy.arch.linux.socket.socket.sendto")
347def test_L3PacketSocket_sendto_python3(mock_sendto):
348    mock_sendto.side_effect = OSError(22, 2807)
349    l3ps = L3PacketSocket()
350    l3ps.send(IP(dst="8.8.8.8")/ICMP())
351    return True
352
353assert test_L3PacketSocket_sendto_python3()
354
355= Test _interface_selection
356~ netaccess linux needs_root
357
358import os
359from scapy.sendrecv import _interface_selection
360assert _interface_selection(IP(dst="8.8.8.8")/UDP()) == (conf.iface, False)
361exit_status = os.system("ip link add name scapy0 type dummy")
362exit_status = os.system("ip addr add 192.0.2.1/24 dev scapy0")
363exit_status = os.system("ip addr add fc00::/24 dev scapy0")
364exit_status = os.system("ip link set scapy0 up")
365conf.ifaces.reload()
366conf.route.resync()
367conf.route6.resync()
368assert _interface_selection(IP(dst="192.0.2.42")/UDP()) == ("scapy0", False)
369assert _interface_selection(IPv6(dst="fc00::ae0d")/UDP()) == ("scapy0", True)
370exit_status = os.system("ip link del name dev scapy0")
371conf.ifaces.reload()
372conf.route.resync()
373conf.route6.resync()
374
375= Test 802.1Q sniffing
376~ linux needs_root veth
377
378from scapy.arch.linux import VEthPair
379from threading import Thread, Condition
380
381def _send():
382    sendp(Ether()/IP(dst="198.51.100.2")/ICMP(), iface='vlanleft0', count=2)
383
384
385with VEthPair("left0", "right0") as veth:
386    exit_status = os.system("ip link add link right0 name vlanright0 type vlan id 42")
387    exit_status = os.system("ip link add link left0 name vlanleft0 type vlan id 42")
388    exit_status = os.system("ip link set vlanright0 up")
389    exit_status = os.system("ip link set vlanleft0 up")
390    exit_status = os.system("ip addr add 198.51.100.1/24 dev vlanleft0")
391    exit_status = os.system("ip addr add 198.51.100.2/24 dev vlanright0")
392    sniffer = AsyncSniffer(
393        iface="right0",
394        lfilter=lambda p: Dot1Q in p,
395        count=2,
396        timeout=5,
397        started_callback=_send,
398    )
399    sniffer.start()
400    sniffer.join(1)
401    if sniffer.running:
402        sniffer.stop()
403        raise Scapy_Exception("Sniffer did not stop !")
404    else:
405        results = sniffer.results
406
407
408assert len(results) == 2
409assert all(Dot1Q in x for x in results)
410
411
412= Reload interfaces & routes
413
414conf.ifaces.reload()
415conf.route.resync()
416conf.route6.resync()
417