1% Regression tests for Linux only 2 3# More information at http://www.secdev.org/projects/UTscapy/ 4 5 6############ 7############ 8 9+ Linux only test 10 11= L3RawSocket 12~ netaccess IP TCP linux needs_root 13 14with no_debug_dissector(): 15 x = sr1(IP(dst="www.google.com")/TCP(sport=RandShort(), dport=80, flags="S"),timeout=3) 16 17x 18assert x[IP].ottl() in [32, 64, 128, 255] 19assert 0 <= x[IP].hops() <= 126 20 21# TODO: fix this test (randomly stuck) 22# ex: https://travis-ci.org/secdev/scapy/jobs/247473497 23 24#= Supersocket _flush_fd 25#~ needs_root linux 26# 27#import select 28# 29#from scapy.arch.linux import _flush_fd 30#socket = conf.L2listen() 31#select.select([socket],[],[],2) 32#_flush_fd(socket.ins) 33 34= Interface aliases & sub-interfaces 35~ linux needs_root 36 37import os 38exit_status = os.system("ip link add name scapy0 type dummy") 39exit_status = os.system("ip addr add 192.0.2.1/24 dev scapy0") 40exit_status = os.system("ip link set scapy0 up") 41exit_status = os.system("ifconfig scapy0:0 inet 198.51.100.1/24 up") 42if exit_status == 0: 43 exit_status = os.system("ip addr show scapy0") 44 print(get_if_list()) 45 conf.route.resync() 46 print(conf.route.routes) 47 assert conf.route.route("198.51.100.254") == ("scapy0", "198.51.100.1", "0.0.0.0") 48 route_alias = (3325256704, 4294967040, "0.0.0.0", "scapy0", "198.51.100.1", 0) 49 assert route_alias in conf.route.routes 50 exit_status = os.system("ip link add link scapy0 name scapy0.42 type vlan id 42") 51 exit_status = os.system("ip addr add 203.0.113.42/24 dev scapy0.42") 52 exit_status = os.system("ip link set scapy0.42 up") 53 exit_status = os.system("ip route add 192.0.2.43/32 via 203.0.113.41") 54 print(get_if_list()) 55 conf.route.resync() 56 print(conf.route.routes) 57 assert conf.route.route("192.0.2.43") == ("scapy0.42", "203.0.113.42", "203.0.113.41") 58 route_specific = (3221226027, 4294967295, "203.0.113.41", "scapy0.42", "0.0.0.0", 0) 59 assert route_specific in conf.route.routes 60 assert conf.route.route("203.0.113.42") == ('scapy0.42', '203.0.113.42', '0.0.0.0') 61 assert conf.route.route("203.0.113.43") == ('scapy0.42', '203.0.113.42', '0.0.0.0') 62 exit_status = os.system("ip link del name dev scapy0") 63else: 64 assert True 65 66 67= Test scoped interface addresses 68~ linux needs_root 69 70import os 71exit_status = os.system("ip link add name scapy0 type dummy") 72exit_status = os.system("ip link add name scapy1 type dummy") 73exit_status |= os.system("ip addr add 192.0.2.1/24 dev scapy0") 74exit_status |= os.system("ip addr add 192.0.3.1/24 dev scapy1") 75exit_status |= os.system("ip link set scapy0 address 00:01:02:03:04:05 multicast on up") 76exit_status |= os.system("ip link set scapy1 address 06:07:08:09:10:11 multicast on up") 77assert exit_status == 0 78 79conf.ifaces.reload() 80conf.route.resync() 81conf.route6.resync() 82 83conf.route6 84 85try: 86 # IPv4 87 a = Ether()/IP(dst="224.0.0.1%scapy0") 88 assert a[Ether].src == "00:01:02:03:04:05" 89 assert a[IP].src == "192.0.2.1" 90 b = Ether()/IP(dst="224.0.0.1%scapy1") 91 assert b[Ether].src == "06:07:08:09:10:11" 92 assert b[IP].src == "192.0.3.1" 93 c = Ether()/IP(dst="224.0.0.1/24%scapy1") 94 assert c[Ether].src == "06:07:08:09:10:11" 95 assert c[IP].src == "192.0.3.1" 96 # IPv6 97 a = Ether()/IPv6(dst="ff02::fb%scapy0") 98 assert a[Ether].src == "00:01:02:03:04:05" 99 assert a[IPv6].src == "fe80::201:2ff:fe03:405" 100 b = Ether()/IPv6(dst="ff02::fb%scapy1") 101 assert b[Ether].src == "06:07:08:09:10:11" 102 assert b[IPv6].src == "fe80::407:8ff:fe09:1011" 103 c = Ether()/IPv6(dst="ff02::fb/30%scapy1") 104 assert c[Ether].src == "06:07:08:09:10:11" 105 assert c[IPv6].src == "fe80::407:8ff:fe09:1011" 106finally: 107 exit_status = os.system("ip link del scapy0") 108 exit_status = os.system("ip link del scapy1") 109 conf.ifaces.reload() 110 conf.route.resync() 111 conf.route6.resync() 112 113= catch loopback device missing 114~ linux needs_root 115 116from unittest.mock import patch 117 118# can't remove the lo device (or its address without causing trouble) - use some pseudo dummy instead 119 120with patch('scapy.arch.linux.conf.loopback_name', 'scapy_lo_x'): 121 routes = read_routes() 122 123= catch loopback device no address assigned 124~ linux needs_root 125 126import os, socket 127from unittest.mock import patch 128 129try: 130 exit_status = os.system("ip link add name scapy_lo type dummy") 131 assert exit_status == 0 132 exit_status = os.system("ip link set dev scapy_lo up") 133 assert exit_status == 0 134 135 with patch('scapy.arch.linux.conf.loopback_name', 'scapy_lo'): 136 routes = read_routes() 137 138 exit_status = os.system("ip addr add dev scapy_lo 10.10.0.1/24") 139 assert exit_status == 0 140 141 with patch('scapy.arch.linux.conf.loopback_name', 'scapy_lo'): 142 routes = read_routes() 143 144 lo_routes = [ 145 (ltoa(dst_int), ltoa(msk_int), gw_str, if_name, if_addr, metric) 146 for dst_int, msk_int, gw_str, if_name, if_addr, metric in routes 147 if if_name == "scapy_lo" 148 ] 149 lo_routes.sort(key=lambda x: x[0]) 150 151 expected_routes = [ 152 (168427520, 4294967040, '0.0.0.0', 'scapy_lo', '10.10.0.1', 0), 153 (168427521, 4294967295, '0.0.0.0', 'scapy_lo', '10.10.0.1', 0), 154 (168427775, 4294967295, '0.0.0.0', 'scapy_lo', '10.10.0.1', 0), 155 ] 156 print(lo_routes) 157 print(expected_routes) 158finally: 159 exit_status = os.system("ip link del dev scapy_lo") 160 assert exit_status == 0 161 162= IPv6 link-local address selection 163 164conf.ifaces._add_fake_iface("scapy0", 'e2:39:91:79:19:10') 165 166from unittest.mock import patch 167conf.route6.routes = [('fe80::', 64, '::', 'scapy0', ['fe80::e039:91ff:fe79:1910'], 256)] 168conf.route6.ipv6_ifaces = set(['scapy0']) 169bck_conf_iface = conf.iface 170conf.iface = "scapy0" 171 172p = Ether()/IPv6(dst="ff02::1")/ICMPv6NIQueryName(data="ff02::1") 173print(p.sprintf("%Ether.src% > %Ether.dst%\n%IPv6.src% > %IPv6.dst%")) 174ip6_ll_address = 'fe80::e039:91ff:fe79:1910' 175print(p[IPv6].src, ip6_ll_address) 176assert p[IPv6].src == ip6_ll_address 177mac_address = 'e2:39:91:79:19:10' 178print(p[Ether].src, mac_address) 179assert p[Ether].src == mac_address 180 181conf.iface = bck_conf_iface 182conf.route6.resync() 183 184= IPv6 - check OS routes 185~ linux ipv6 186 187addrs = in6_getifaddr() 188if addrs: 189 assert all(in6_isvalid(addr[0]) for addr in in6_getifaddr()), 'invalid ipv6 address' 190 ifaces6 = [addr[2] for addr in in6_getifaddr()] 191 assert all(iface in ifaces6 for iface in conf.route6.ipv6_ifaces), 'ipv6 interface has route but no real' 192 193 194= veth interface error handling 195~ linux needs_root veth 196 197from scapy.arch.linux import VEthPair 198 199try: 200 veth = VEthPair('this_IF_name_is_to_long_and_will_cause_an_error', 'veth_scapy_1') 201 veth.setup() 202 assert False 203except subprocess.CalledProcessError: 204 pass 205except Exception: 206 assert False 207 208= veth interface usage - ctx manager 209~ linux needs_root veth 210 211from threading import Condition 212 213cond_started = Condition() 214 215def _sniffer_started(): 216 217 global cond_started 218 cond_started.acquire() 219 cond_started.notify() 220 cond_started.release() 221 222cond_started.acquire() 223 224try: 225 with VEthPair('veth_scapy_0', 'veth_scapy_1') as veth: 226 if_list = get_if_list() 227 assert ('veth_scapy_0' in if_list) 228 assert ('veth_scapy_1' in if_list) 229 frm_count = 0 230 def _sniffer(): 231 sniffed = sniff(iface='veth_scapy_1', 232 store=True, 233 count=2, 234 lfilter=lambda p: Ether in p and p[Ether].type == 0xbeef, 235 started_callback=_sniffer_started, 236 timeout=3) 237 global frm_count 238 frm_count = 2 239 t_sniffer = Thread(target=_sniffer, name="linux.uts sniff veth_scapy_1") 240 t_sniffer.start() 241 cond_started.wait() 242 sendp(Ether(type=0xbeef)/Raw(b'0123456789'), 243 iface='veth_scapy_0', 244 count=2) 245 t_sniffer.join(1) 246 assert frm_count == 2 247 248 if_list = get_if_list() 249 assert ('veth_scapy_0' not in if_list) 250 assert ('veth_scapy_1' not in if_list) 251except subprocess.CalledProcessError: 252 assert False 253except Exception: 254 assert False 255 256= veth interface usage - manual interface handling 257~ linux needs_root veth 258 259from threading import Condition 260 261cond_started = Condition() 262 263def _sniffer_started(): 264 265 global cond_started 266 cond_started.acquire() 267 cond_started.notify() 268 cond_started.release() 269 270cond_started.acquire() 271 272 273veth = VEthPair('veth_scapy_0', 'veth_scapy_1') 274try: 275 veth.setup() 276 veth.up() 277except subprocess.CalledProcessError: 278 assert False 279except Exception: 280 assert False 281 282conf.ifaces.reload() 283if_list = get_if_list() 284assert ('veth_scapy_0' in if_list) 285assert ('veth_scapy_1' in if_list) 286 287frm_count = 0 288def _sniffer(): 289 sniffed = sniff(iface='veth_scapy_1', 290 store=True, 291 count=2, 292 lfilter=lambda p: Ether in p and p[Ether].type == 0xbeef, 293 started_callback=_sniffer_started, 294 timeout=3) 295 global frm_count 296 frm_count = 2 297 298t_sniffer = Thread(target=_sniffer, name="linux.uts sniff veth_scapy_1 2") 299t_sniffer.start() 300cond_started.wait() 301sendp(Ether(type=0xbeef)/Raw(b'0123456789'), 302 iface='veth_scapy_0', 303 count=2) 304t_sniffer.join(1) 305assert frm_count == 2 306 307try: 308 veth.down() 309 veth.destroy() 310 311 conf.ifaces.reload() 312 if_list = get_if_list() 313 assert ('veth_scapy_0' not in if_list) 314 assert ('veth_scapy_1' not in if_list) 315except subprocess.CalledProcessError: 316 assert False 317except Exception: 318 assert False 319 320 321= Routing table, interface with no names 322~ linux 323 324from unittest.mock import patch 325 326@patch("scapy.arch.linux.ioctl") 327def test_read_routes(mock_ioctl): 328 def raise_ioerror(*args, **kwargs): 329 if args[1] == 0x8912: 330 return args[2] 331 raise IOError 332 mock_ioctl.side_effect = raise_ioerror 333 read_routes() 334 335test_read_routes() 336 337 338= L3PacketSocket sendto exception 339~ linux needs_root 340 341from scapy.arch.linux import L3PacketSocket 342 343import socket 344from unittest import mock 345 346@mock.patch("scapy.arch.linux.socket.socket.sendto") 347def test_L3PacketSocket_sendto_python3(mock_sendto): 348 mock_sendto.side_effect = OSError(22, 2807) 349 l3ps = L3PacketSocket() 350 l3ps.send(IP(dst="8.8.8.8")/ICMP()) 351 return True 352 353assert test_L3PacketSocket_sendto_python3() 354 355= Test _interface_selection 356~ netaccess linux needs_root 357 358import os 359from scapy.sendrecv import _interface_selection 360assert _interface_selection(IP(dst="8.8.8.8")/UDP()) == (conf.iface, False) 361exit_status = os.system("ip link add name scapy0 type dummy") 362exit_status = os.system("ip addr add 192.0.2.1/24 dev scapy0") 363exit_status = os.system("ip addr add fc00::/24 dev scapy0") 364exit_status = os.system("ip link set scapy0 up") 365conf.ifaces.reload() 366conf.route.resync() 367conf.route6.resync() 368assert _interface_selection(IP(dst="192.0.2.42")/UDP()) == ("scapy0", False) 369assert _interface_selection(IPv6(dst="fc00::ae0d")/UDP()) == ("scapy0", True) 370exit_status = os.system("ip link del name dev scapy0") 371conf.ifaces.reload() 372conf.route.resync() 373conf.route6.resync() 374 375= Test 802.1Q sniffing 376~ linux needs_root veth 377 378from scapy.arch.linux import VEthPair 379from threading import Thread, Condition 380 381def _send(): 382 sendp(Ether()/IP(dst="198.51.100.2")/ICMP(), iface='vlanleft0', count=2) 383 384 385with VEthPair("left0", "right0") as veth: 386 exit_status = os.system("ip link add link right0 name vlanright0 type vlan id 42") 387 exit_status = os.system("ip link add link left0 name vlanleft0 type vlan id 42") 388 exit_status = os.system("ip link set vlanright0 up") 389 exit_status = os.system("ip link set vlanleft0 up") 390 exit_status = os.system("ip addr add 198.51.100.1/24 dev vlanleft0") 391 exit_status = os.system("ip addr add 198.51.100.2/24 dev vlanright0") 392 sniffer = AsyncSniffer( 393 iface="right0", 394 lfilter=lambda p: Dot1Q in p, 395 count=2, 396 timeout=5, 397 started_callback=_send, 398 ) 399 sniffer.start() 400 sniffer.join(1) 401 if sniffer.running: 402 sniffer.stop() 403 raise Scapy_Exception("Sniffer did not stop !") 404 else: 405 results = sniffer.results 406 407 408assert len(results) == 2 409assert all(Dot1Q in x for x in results) 410 411 412= Reload interfaces & routes 413 414conf.ifaces.reload() 415conf.route.resync() 416conf.route6.resync() 417