% Regression tests for Linux only # More information at http://www.secdev.org/projects/UTscapy/ ############ ############ + Linux only test = L3RawSocket ~ netaccess IP TCP linux needs_root with no_debug_dissector(): x = sr1(IP(dst="www.google.com")/TCP(sport=RandShort(), dport=80, flags="S"),timeout=3) x assert x[IP].ottl() in [32, 64, 128, 255] assert 0 <= x[IP].hops() <= 126 # TODO: fix this test (randomly stuck) # ex: https://travis-ci.org/secdev/scapy/jobs/247473497 #= Supersocket _flush_fd #~ needs_root linux # #import select # #from scapy.arch.linux import _flush_fd #socket = conf.L2listen() #select.select([socket],[],[],2) #_flush_fd(socket.ins) = Interface aliases & sub-interfaces ~ linux needs_root import os exit_status = os.system("ip link add name scapy0 type dummy") exit_status = os.system("ip addr add 192.0.2.1/24 dev scapy0") exit_status = os.system("ip link set scapy0 up") exit_status = os.system("ifconfig scapy0:0 inet 198.51.100.1/24 up") if exit_status == 0: exit_status = os.system("ip addr show scapy0") print(get_if_list()) conf.route.resync() print(conf.route.routes) assert conf.route.route("198.51.100.254") == ("scapy0", "198.51.100.1", "0.0.0.0") route_alias = (3325256704, 4294967040, "0.0.0.0", "scapy0", "198.51.100.1", 0) assert route_alias in conf.route.routes exit_status = os.system("ip link add link scapy0 name scapy0.42 type vlan id 42") exit_status = os.system("ip addr add 203.0.113.42/24 dev scapy0.42") exit_status = os.system("ip link set scapy0.42 up") exit_status = os.system("ip route add 192.0.2.43/32 via 203.0.113.41") print(get_if_list()) conf.route.resync() print(conf.route.routes) assert conf.route.route("192.0.2.43") == ("scapy0.42", "203.0.113.42", "203.0.113.41") route_specific = (3221226027, 4294967295, "203.0.113.41", "scapy0.42", "0.0.0.0", 0) assert route_specific in conf.route.routes assert conf.route.route("203.0.113.42") == ('scapy0.42', '203.0.113.42', '0.0.0.0') assert conf.route.route("203.0.113.43") == ('scapy0.42', '203.0.113.42', '0.0.0.0') exit_status = os.system("ip link del name dev scapy0") else: assert True = Test scoped interface addresses ~ linux needs_root import os exit_status = os.system("ip link add name scapy0 type dummy") exit_status = os.system("ip link add name scapy1 type dummy") exit_status |= os.system("ip addr add 192.0.2.1/24 dev scapy0") exit_status |= os.system("ip addr add 192.0.3.1/24 dev scapy1") exit_status |= os.system("ip link set scapy0 address 00:01:02:03:04:05 multicast on up") exit_status |= os.system("ip link set scapy1 address 06:07:08:09:10:11 multicast on up") assert exit_status == 0 conf.ifaces.reload() conf.route.resync() conf.route6.resync() conf.route6 try: # IPv4 a = Ether()/IP(dst="224.0.0.1%scapy0") assert a[Ether].src == "00:01:02:03:04:05" assert a[IP].src == "192.0.2.1" b = Ether()/IP(dst="224.0.0.1%scapy1") assert b[Ether].src == "06:07:08:09:10:11" assert b[IP].src == "192.0.3.1" c = Ether()/IP(dst="224.0.0.1/24%scapy1") assert c[Ether].src == "06:07:08:09:10:11" assert c[IP].src == "192.0.3.1" # IPv6 a = Ether()/IPv6(dst="ff02::fb%scapy0") assert a[Ether].src == "00:01:02:03:04:05" assert a[IPv6].src == "fe80::201:2ff:fe03:405" b = Ether()/IPv6(dst="ff02::fb%scapy1") assert b[Ether].src == "06:07:08:09:10:11" assert b[IPv6].src == "fe80::407:8ff:fe09:1011" c = Ether()/IPv6(dst="ff02::fb/30%scapy1") assert c[Ether].src == "06:07:08:09:10:11" assert c[IPv6].src == "fe80::407:8ff:fe09:1011" finally: exit_status = os.system("ip link del scapy0") exit_status = os.system("ip link del scapy1") conf.ifaces.reload() conf.route.resync() conf.route6.resync() = catch loopback device missing ~ linux needs_root from unittest.mock import patch # can't remove the lo device (or its address without causing trouble) - use some pseudo dummy instead with patch('scapy.arch.linux.conf.loopback_name', 'scapy_lo_x'): routes = read_routes() = catch loopback device no address assigned ~ linux needs_root import os, socket from unittest.mock import patch try: exit_status = os.system("ip link add name scapy_lo type dummy") assert exit_status == 0 exit_status = os.system("ip link set dev scapy_lo up") assert exit_status == 0 with patch('scapy.arch.linux.conf.loopback_name', 'scapy_lo'): routes = read_routes() exit_status = os.system("ip addr add dev scapy_lo 10.10.0.1/24") assert exit_status == 0 with patch('scapy.arch.linux.conf.loopback_name', 'scapy_lo'): routes = read_routes() lo_routes = [ (ltoa(dst_int), ltoa(msk_int), gw_str, if_name, if_addr, metric) for dst_int, msk_int, gw_str, if_name, if_addr, metric in routes if if_name == "scapy_lo" ] lo_routes.sort(key=lambda x: x[0]) expected_routes = [ (168427520, 4294967040, '0.0.0.0', 'scapy_lo', '10.10.0.1', 0), (168427521, 4294967295, '0.0.0.0', 'scapy_lo', '10.10.0.1', 0), (168427775, 4294967295, '0.0.0.0', 'scapy_lo', '10.10.0.1', 0), ] print(lo_routes) print(expected_routes) finally: exit_status = os.system("ip link del dev scapy_lo") assert exit_status == 0 = IPv6 link-local address selection conf.ifaces._add_fake_iface("scapy0", 'e2:39:91:79:19:10') from unittest.mock import patch conf.route6.routes = [('fe80::', 64, '::', 'scapy0', ['fe80::e039:91ff:fe79:1910'], 256)] conf.route6.ipv6_ifaces = set(['scapy0']) bck_conf_iface = conf.iface conf.iface = "scapy0" p = Ether()/IPv6(dst="ff02::1")/ICMPv6NIQueryName(data="ff02::1") print(p.sprintf("%Ether.src% > %Ether.dst%\n%IPv6.src% > %IPv6.dst%")) ip6_ll_address = 'fe80::e039:91ff:fe79:1910' print(p[IPv6].src, ip6_ll_address) assert p[IPv6].src == ip6_ll_address mac_address = 'e2:39:91:79:19:10' print(p[Ether].src, mac_address) assert p[Ether].src == mac_address conf.iface = bck_conf_iface conf.route6.resync() = IPv6 - check OS routes ~ linux ipv6 addrs = in6_getifaddr() if addrs: assert all(in6_isvalid(addr[0]) for addr in in6_getifaddr()), 'invalid ipv6 address' ifaces6 = [addr[2] for addr in in6_getifaddr()] assert all(iface in ifaces6 for iface in conf.route6.ipv6_ifaces), 'ipv6 interface has route but no real' = veth interface error handling ~ linux needs_root veth from scapy.arch.linux import VEthPair try: veth = VEthPair('this_IF_name_is_to_long_and_will_cause_an_error', 'veth_scapy_1') veth.setup() assert False except subprocess.CalledProcessError: pass except Exception: assert False = veth interface usage - ctx manager ~ linux needs_root veth from threading import Condition cond_started = Condition() def _sniffer_started(): global cond_started cond_started.acquire() cond_started.notify() cond_started.release() cond_started.acquire() try: with VEthPair('veth_scapy_0', 'veth_scapy_1') as veth: if_list = get_if_list() assert ('veth_scapy_0' in if_list) assert ('veth_scapy_1' in if_list) frm_count = 0 def _sniffer(): sniffed = sniff(iface='veth_scapy_1', store=True, count=2, lfilter=lambda p: Ether in p and p[Ether].type == 0xbeef, started_callback=_sniffer_started, timeout=3) global frm_count frm_count = 2 t_sniffer = Thread(target=_sniffer, name="linux.uts sniff veth_scapy_1") t_sniffer.start() cond_started.wait() sendp(Ether(type=0xbeef)/Raw(b'0123456789'), iface='veth_scapy_0', count=2) t_sniffer.join(1) assert frm_count == 2 if_list = get_if_list() assert ('veth_scapy_0' not in if_list) assert ('veth_scapy_1' not in if_list) except subprocess.CalledProcessError: assert False except Exception: assert False = veth interface usage - manual interface handling ~ linux needs_root veth from threading import Condition cond_started = Condition() def _sniffer_started(): global cond_started cond_started.acquire() cond_started.notify() cond_started.release() cond_started.acquire() veth = VEthPair('veth_scapy_0', 'veth_scapy_1') try: veth.setup() veth.up() except subprocess.CalledProcessError: assert False except Exception: assert False conf.ifaces.reload() if_list = get_if_list() assert ('veth_scapy_0' in if_list) assert ('veth_scapy_1' in if_list) frm_count = 0 def _sniffer(): sniffed = sniff(iface='veth_scapy_1', store=True, count=2, lfilter=lambda p: Ether in p and p[Ether].type == 0xbeef, started_callback=_sniffer_started, timeout=3) global frm_count frm_count = 2 t_sniffer = Thread(target=_sniffer, name="linux.uts sniff veth_scapy_1 2") t_sniffer.start() cond_started.wait() sendp(Ether(type=0xbeef)/Raw(b'0123456789'), iface='veth_scapy_0', count=2) t_sniffer.join(1) assert frm_count == 2 try: veth.down() veth.destroy() conf.ifaces.reload() if_list = get_if_list() assert ('veth_scapy_0' not in if_list) assert ('veth_scapy_1' not in if_list) except subprocess.CalledProcessError: assert False except Exception: assert False = Routing table, interface with no names ~ linux from unittest.mock import patch @patch("scapy.arch.linux.ioctl") def test_read_routes(mock_ioctl): def raise_ioerror(*args, **kwargs): if args[1] == 0x8912: return args[2] raise IOError mock_ioctl.side_effect = raise_ioerror read_routes() test_read_routes() = L3PacketSocket sendto exception ~ linux needs_root from scapy.arch.linux import L3PacketSocket import socket from unittest import mock @mock.patch("scapy.arch.linux.socket.socket.sendto") def test_L3PacketSocket_sendto_python3(mock_sendto): mock_sendto.side_effect = OSError(22, 2807) l3ps = L3PacketSocket() l3ps.send(IP(dst="8.8.8.8")/ICMP()) return True assert test_L3PacketSocket_sendto_python3() = Test _interface_selection ~ netaccess linux needs_root import os from scapy.sendrecv import _interface_selection assert _interface_selection(IP(dst="8.8.8.8")/UDP()) == (conf.iface, False) exit_status = os.system("ip link add name scapy0 type dummy") exit_status = os.system("ip addr add 192.0.2.1/24 dev scapy0") exit_status = os.system("ip addr add fc00::/24 dev scapy0") exit_status = os.system("ip link set scapy0 up") conf.ifaces.reload() conf.route.resync() conf.route6.resync() assert _interface_selection(IP(dst="192.0.2.42")/UDP()) == ("scapy0", False) assert _interface_selection(IPv6(dst="fc00::ae0d")/UDP()) == ("scapy0", True) exit_status = os.system("ip link del name dev scapy0") conf.ifaces.reload() conf.route.resync() conf.route6.resync() = Test 802.1Q sniffing ~ linux needs_root veth from scapy.arch.linux import VEthPair from threading import Thread, Condition def _send(): sendp(Ether()/IP(dst="198.51.100.2")/ICMP(), iface='vlanleft0', count=2) with VEthPair("left0", "right0") as veth: exit_status = os.system("ip link add link right0 name vlanright0 type vlan id 42") exit_status = os.system("ip link add link left0 name vlanleft0 type vlan id 42") exit_status = os.system("ip link set vlanright0 up") exit_status = os.system("ip link set vlanleft0 up") exit_status = os.system("ip addr add 198.51.100.1/24 dev vlanleft0") exit_status = os.system("ip addr add 198.51.100.2/24 dev vlanright0") sniffer = AsyncSniffer( iface="right0", lfilter=lambda p: Dot1Q in p, count=2, timeout=5, started_callback=_send, ) sniffer.start() sniffer.join(1) if sniffer.running: sniffer.stop() raise Scapy_Exception("Sniffer did not stop !") else: results = sniffer.results assert len(results) == 2 assert all(Dot1Q in x for x in results) = Reload interfaces & routes conf.ifaces.reload() conf.route.resync() conf.route6.resync()