1% Regression tests for Scapy BPF mode 2 3# More information at http://www.secdev.org/projects/UTscapy/ 4 5 6############ 7############ 8+ Addresses manipulation functions 9 10= Get the packet IPv4 address configured on conf.iface 11 12get_if_raw_addr(conf.iface) 13 14 15= Get the MAC address of conf.iface 16 17get_if_hwaddr(conf.iface) 18 19= Get the MAC address of conf.loopback_name 20 21get_if_hwaddr(conf.loopback_name) == "00:00:00:00:00:00" 22 23 24############ 25############ 26+ BPF related functions 27 28= Imports 29 30from scapy.arch.bpf.supersocket import L3bpfSocket, L2bpfListenSocket, L2bpfSocket 31 32= Get a BPF handler 33~ needs_root 34 35from scapy.arch.bpf.supersocket import get_dev_bpf 36fd, _ = get_dev_bpf() 37 38= Attach a BPF filter 39~ needs_root libpcap 40 41from scapy.arch.bpf.supersocket import attach_filter 42attach_filter(fd, "arp or icmp", conf.iface) 43 44 45= Get network interfaces list 46 47iflist = get_if_list() 48len(iflist) > 0 49 50= Misc functions 51~ needs_root 52 53from scapy.arch.bpf.supersocket import bpf_select 54 55l = bpf_select([L2bpfSocket()]) 56l = bpf_select([L2bpfSocket(), sys.stdin.fileno()]) 57 58 59############ 60############ 61+ BPF sockets 62 63= L2bpfListenSocket - initialization variants 64~ needs_root 65 66L2bpfListenSocket() 67L2bpfListenSocket(iface=conf.iface) 68L2bpfListenSocket(promisc=True) 69L2bpfListenSocket(filter="icmp") 70L2bpfListenSocket(iface=conf.iface, promisc=True, filter="icmp") 71 72 73= L2bpfListenSocket - set_*() 74~ needs_root 75 76s = L2bpfListenSocket() 77s.set_promisc(0) 78s.set_nonblock(1) 79s.set_promisc(0) 80s.close() 81 82s = L2bpfListenSocket() 83s.set_nonblock(set_flag=False) 84s.set_nonblock(set_flag=True) 85s.set_nonblock(set_flag=False) 86s.close() 87 88= L2bpfListenSocket - get_*() 89~ needs_root 90 91s = L2bpfListenSocket() 92blen = s.get_blen() 93blen > 0 and type(blen) == int 94s.close() 95 96s = L2bpfListenSocket() 97stats = s.get_stats() 98len(stats) == 2 and type(stats) == tuple 99s.close() 100 101 102= L2bpfListenSocket - other methods 103~ needs_root 104 105s = L2bpfListenSocket() 106type(s.fileno()) == int 107s.close() 108 109s = L2bpfListenSocket() 110guessed = s.guess_cls() 111issubclass(guessed, Packet) 112s.close() 113 114= L2bpfListenSocket - read failure 115~ needs_root 116 117from unittest import mock 118 119@mock.patch("scapy.arch.bpf.supersocket.os.read") 120def _test_osread(osread): 121 osread.side_effect = OSError() 122 s = L2bpfListenSocket() 123 assert s.recv_raw() == (None, None, None) 124 125_test_osread() 126 127= L2bpfSocket - nonblock_recv() 128~ needs_root 129 130s = L2bpfSocket() 131s.nonblock_recv() 132s.close() 133 134 135= L*bpfSocket - send() 136~ needs_root 137 138s = L2bpfSocket() 139s.send(Ether()/IP(dst="8.8.8.8")/ICMP()) 140 141s = L3bpfSocket() 142s.send(IP(dst="8.8.8.8")/ICMP()) 143 144s = L3bpfSocket() 145s.assigned_interface = conf.loopback_name 146s.send(IP(dst="8.8.8.8")/ICMP()) 147 148= L3bpfSocket - send and sniff on loopback 149~ needs_root 150 151localhost_ip = conf.ifaces[conf.loopback_name].ips[4][0] 152 153def cb(): 154 # Send a ping to the loopback IP. 155 s = L3bpfSocket(iface=conf.loopback_name) 156 s.send(IP(dst=localhost_ip)/ICMP(seq=1001)) 157 158t = AsyncSniffer(iface=conf.loopback_name, started_callback=cb) 159t.start() 160time.sleep(1) 161t.stop() 162t.join(timeout=1) 163 164# We expect to see our packet and kernel's response. 165len(t.results.filter(lambda p: ( 166 IP in p and ICMP in p and (p[IP].src == localhost_ip or p[IP].dst == localhost_ip) and p[ICMP].seq == 1001))) == 2 167