• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1% Regression tests for Linux only
2
# More information at http://www.secdev.org/projects/UTscapy/
4
5
6############
7############
8
9+ Linux only test
10
11= TCP client automaton
12~ automaton netaccess linux needs_root
13* This test retries on failure because it often fails
14
15from __future__ import print_function
16import os
17import time
18import signal
19
20from scapy.modules.six.moves import range
21
def handler(signum, stack_frame):
    """Signal callback: abort the code that is currently running by raising."""
    # Neither argument is used; the signature is the one signal.signal() calls.
    timer_error = Exception("Timer expired !")
    raise timer_error
24
# Install the SIGALRM watchdog handler and remember the previous one so that
# it can be put back once the retry loop is over.
old_sigalrm_handler = signal.signal(signal.SIGALRM, handler)

SECDEV_IP4 = "203.178.141.194"
IPTABLE_RULE = "iptables -%c INPUT -s %s -p tcp --sport 80 -j DROP"

def release_attempt(automaton, fds):
    """Best-effort teardown of one attempt: stop automaton, close pipe ends."""
    # Failures are only printed: a cleanup error must neither abort the retry
    # loop nor prevent the iptables rule from being removed afterwards.
    if automaton is not None:
        try:
            automaton.stop()
        except Exception as e:
            print(e)
    for fd in fds:
        if fd is None:
            continue
        try:
            os.close(fd)
        except OSError as e:
            print(e)

# Drop packets from SECDEV_IP4
assert(os.system(IPTABLE_RULE % ('A', SECDEV_IP4)) == 0)

success = False
for i in range(10):
    # Reset per-attempt resources so the cleanup knows what was created.
    r = w = t = None
    # Watchdog: the handler raises if the attempt takes more than 5 seconds.
    tmp = signal.alarm(5)
    try:
        r, w = os.pipe()
        t = TCP_client(SECDEV_IP4, 80, external_fd={ "tcp": (r,w) })
        tmp = os.write(w, b"HEAD / HTTP/1.0\r\n\r\n")
        t.runbg()
        time.sleep(0.5)
        response = os.read(r, 4096)
        tmp = signal.alarm(0)  # cancel the alarm
        success = response.startswith(b"HTTP/1.1 200 OK")
    except Exception as e:
        print(e)
    finally:
        # Always disarm the watchdog: an attempt that failed for another
        # reason would otherwise leave an alarm pending after the loop.
        tmp = signal.alarm(0)
        # Release the automaton and the pipe even when the attempt timed out.
        release_attempt(t, (r, w))
    if success:
        break
    # Short pause before the next attempt.
    time.sleep(0.5)

# Put the previous SIGALRM handler back. signal.signal() returns None when the
# previous handler was not installed from Python; nothing to restore then.
if old_sigalrm_handler is not None:
    tmp = signal.signal(signal.SIGALRM, old_sigalrm_handler)

# Remove the iptables rule
assert(os.system(IPTABLE_RULE % ('D', SECDEV_IP4)) == 0)

assert(success)
59
60= L3RawSocket
61~ netaccess IP ICMP linux needs_root
62
# Temporarily switch the layer 3 socket to L3RawSocket and turn off
# conf.debug_dissector; both settings are saved so they can be restored.
old_l3socket = conf.L3socket
old_debug_dissector = conf.debug_dissector
conf.debug_dissector = False
conf.L3socket = L3RawSocket
try:
    x = sr1(IP(dst="www.google.com")/ICMP(),timeout=3)
finally:
    # Restore the configuration even if sr1() raised, so that the following
    # tests do not run with the modified settings.
    conf.debug_dissector = old_debug_dissector
    conf.L3socket = old_l3socket

x
# Check that an answer was received before dereferencing it: this gives an
# explicit failure instead of a TypeError raised by x[IP] on None.
assert x is not None, "no answer received from sr1()"
assert x[IP].ottl() in [32, 64, 128, 255]
assert 0 <= x[IP].hops() <= 126
x is not None and ICMP in x and x[ICMP].type == 0
74
75# TODO: fix this test (randomly stuck)
76# ex: https://travis-ci.org/secdev/scapy/jobs/247473497
77
78#= Supersocket _flush_fd
79#~ needs_root linux
80#
81#import select
82#
83#from scapy.arch.linux import _flush_fd
84#socket = conf.L2listen()
85#select.select([socket],[],[],2)
86#_flush_fd(socket.ins)
87
88= Test legacy attach_filter function
89~ linux needs_root
90from scapy.arch.common import get_bpf_pointer
91
# Force conf.use_pypy for the duration of the check; the previous value is
# restored in the finally clause so that a failing assertion (or an error in
# get_bpf_pointer) does not leave the setting modified for later tests.
old_pypy = conf.use_pypy
conf.use_pypy = True
try:
    tcpdump_lines = ['12\n', '40 0 0 12\n', '21 0 5 34525\n', '48 0 0 20\n', '21 6 0 6\n', '21 0 6 44\n', '48 0 0 54\n', '21 3 4 6\n', '21 0 3 2048\n', '48 0 0 23\n', '21 0 1 6\n', '6 0 0 1600\n', '6 0 0 0\n']
    pointer = get_bpf_pointer(tcpdump_lines)
    # The two checks below only constrain the result when six.PY3 is false.
    assert six.PY3 or isinstance(pointer, str)
    assert six.PY3 or len(pointer) > 1
finally:
    conf.use_pypy = old_pypy
101
102= Interface aliases & sub-interfaces
103~ linux needs_root
104
105import os
# Create a dummy interface (scapy0), an alias (scapy0:0) and a VLAN
# sub-interface (scapy0.42), then check that conf.route picks them up.
# The teardown runs in a finally clause so that a failing assertion does not
# leave the dummy interface behind on the host.
try:
    exit_status = os.system("ip link add name scapy0 type dummy")
    exit_status = os.system("ip addr add 192.0.2.1/24 dev scapy0")
    exit_status = os.system("ip link set scapy0 up")
    exit_status = os.system("ifconfig scapy0:0 inet 198.51.100.1/24 up")
    exit_status = os.system("ip addr show scapy0")
    print(get_if_list())
    conf.route.resync()
    print(conf.route.routes)
    assert(conf.route.route("198.51.100.254") == ("scapy0", "198.51.100.1", "0.0.0.0"))
    # 3325256704 is 198.51.100.0 and 4294967040 is 255.255.255.0 as integers.
    # NOTE(review): the trailing 0 is presumably the route metric -- confirm.
    route_alias = (3325256704, 4294967040, "0.0.0.0", "scapy0", "198.51.100.1", 0)
    assert(route_alias in conf.route.routes)
    exit_status = os.system("ip link add link scapy0 name scapy0.42 type vlan id 42")
    exit_status = os.system("ip addr add 203.0.113.42/24 dev scapy0.42")
    exit_status = os.system("ip link set scapy0.42 up")
    exit_status = os.system("ip route add 192.0.2.43/32 via 203.0.113.41")
    print(get_if_list())
    conf.route.resync()
    print(conf.route.routes)
    assert(conf.route.route("192.0.2.43") == ("scapy0.42", "203.0.113.42", "203.0.113.41"))
    # 3221226027 is 192.0.2.43 and 4294967295 is 255.255.255.255 as integers.
    route_specific = (3221226027, 4294967295, "203.0.113.41", "scapy0.42", "203.0.113.42", 0)
    assert(route_specific in conf.route.routes)
finally:
    # "ip link del dev DEVICE" is the documented form of the delete command.
    exit_status = os.system("ip link del dev scapy0")
    # Re-read the routing table so that the routes of the deleted interface
    # do not linger in conf.route for the tests that follow.
    conf.route.resync()
128