• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1% Regression tests for Scapy BPF mode
2
3# More information at http://www.secdev.org/projects/UTscapy/
4
5
6############
7############
8+ Addresses manipulation functions
9
10= Get the packet IPv4 address configured on conf.iface
11
12get_if_raw_addr(conf.iface)
13
14
15= Get the MAC address of conf.iface
16
17get_if_hwaddr(conf.iface)
18
19= Get the MAC address of conf.loopback_name
20
21get_if_hwaddr(conf.loopback_name) == "00:00:00:00:00:00"
22
23
24############
25############
26+ BPF related functions
27
28= Imports
29
30from scapy.arch.bpf.supersocket import L3bpfSocket, L2bpfListenSocket, L2bpfSocket
31
32= Get a BPF handler
33~ needs_root
34
35from scapy.arch.bpf.supersocket import get_dev_bpf
36fd, _ = get_dev_bpf()
37
38= Attach a BPF filter
39~ needs_root libpcap
40
41from scapy.arch.bpf.supersocket import attach_filter
42attach_filter(fd, "arp or icmp", conf.iface)
43
44
45= Get network interfaces list
46
47iflist = get_if_list()
48len(iflist) > 0
49
50= Misc functions
51~ needs_root
52
53from scapy.arch.bpf.supersocket import bpf_select
54
55l = bpf_select([L2bpfSocket()])
56l = bpf_select([L2bpfSocket(), sys.stdin.fileno()])
57
58
59############
60############
61+ BPF sockets
62
63= L2bpfListenSocket - initialization variants
64~ needs_root
65
66L2bpfListenSocket()
67L2bpfListenSocket(iface=conf.iface)
68L2bpfListenSocket(promisc=True)
69L2bpfListenSocket(filter="icmp")
70L2bpfListenSocket(iface=conf.iface, promisc=True, filter="icmp")
71
72
73= L2bpfListenSocket - set_*()
74~ needs_root
75
76s = L2bpfListenSocket()
77s.set_promisc(0)
78s.set_nonblock(1)
79s.set_promisc(0)
80s.close()
81
82s = L2bpfListenSocket()
83s.set_nonblock(set_flag=False)
84s.set_nonblock(set_flag=True)
85s.set_nonblock(set_flag=False)
86s.close()
87
88= L2bpfListenSocket - get_*()
89~ needs_root
90
91s = L2bpfListenSocket()
92blen = s.get_blen()
93blen > 0 and type(blen) == int
94s.close()
95
96s = L2bpfListenSocket()
97stats = s.get_stats()
98len(stats) == 2 and type(stats) == tuple
99s.close()
100
101
102= L2bpfListenSocket - other methods
103~ needs_root
104
105s = L2bpfListenSocket()
106type(s.fileno()) == int
107s.close()
108
109s = L2bpfListenSocket()
110guessed = s.guess_cls()
111issubclass(guessed, Packet)
112s.close()
113
114= L2bpfListenSocket - read failure
115~ needs_root
116
117from unittest import mock
118
119@mock.patch("scapy.arch.bpf.supersocket.os.read")
120def _test_osread(osread):
121    osread.side_effect = OSError()
122    s = L2bpfListenSocket()
123    assert s.recv_raw() == (None, None, None)
124
125_test_osread()
126
127= L2bpfSocket - nonblock_recv()
128~ needs_root
129
130s = L2bpfSocket()
131s.nonblock_recv()
132s.close()
133
134
135= L*bpfSocket - send()
136~ needs_root
137
138s = L2bpfSocket()
139s.send(Ether()/IP(dst="8.8.8.8")/ICMP())
140
141s = L3bpfSocket()
142s.send(IP(dst="8.8.8.8")/ICMP())
143
144s = L3bpfSocket()
145s.assigned_interface = conf.loopback_name
146s.send(IP(dst="8.8.8.8")/ICMP())
147
148= L3bpfSocket - send and sniff on loopback
149~ needs_root
150
151localhost_ip = conf.ifaces[conf.loopback_name].ips[4][0]
152
153def cb():
154    # Send a ping to the loopback IP.
155    s = L3bpfSocket(iface=conf.loopback_name)
156    s.send(IP(dst=localhost_ip)/ICMP(seq=1001))
157
158t = AsyncSniffer(iface=conf.loopback_name, started_callback=cb)
159t.start()
160time.sleep(1)
161t.stop()
162t.join(timeout=1)
163
164# We expect to see our packet and kernel's response.
165len(t.results.filter(lambda p: (
166    IP in p and ICMP in p and (p[IP].src == localhost_ip or p[IP].dst == localhost_ip) and p[ICMP].seq == 1001))) == 2
167