% Regression tests for Scapy # More information at http://www.secdev.org/projects/UTscapy/ ############ ############ + Information on Scapy = Setup def expect_exception(e, c): try: c() return False except e: return True = Get conf ~ conf command * Dump the current configuration conf IP().src conf.loopback_name = Test module version detection ~ conf class FakeModule(object): __version__ = "v1.12" class FakeModule2(object): __version__ = "5.143.3.12" class FakeModule3(object): __version__ = "v2.4.2.dev42" from scapy.config import _version_checker assert _version_checker(FakeModule, (1,11,5)) assert not _version_checker(FakeModule, (1,13)) assert _version_checker(FakeModule2, (5, 1)) assert not _version_checker(FakeModule2, (5, 143, 4)) assert _version_checker(FakeModule3, (2, 4, 2)) = Check Scapy version from unittest import mock import scapy from scapy import _parse_tag, _version_from_git_describe from scapy.config import _version_checker b = Bunch(returncode=0, communicate=lambda *args, **kargs: (b"v2.4.5rc1-261-g44b98e14", None)) with mock.patch('scapy.subprocess.Popen', return_value=b): with mock.patch('scapy.os.path.isdir', return_value=True): class GitModuleScapy(object): __version__ = _version_from_git_describe() # GH3847 with mock.patch('scapy.subprocess.Popen', return_value=b): with mock.patch('scapy.os.path.isdir', return_value=False): try: _version_from_git_describe() assert False except ValueError: pass assert GitModuleScapy.__version__ == '2.4.5rc1.dev261' assert _version_checker(GitModuleScapy, (2, 4, 5)) = List layers ~ conf command ls() = List layers - advanced ~ conf command with ContextManagerCaptureOutput() as cmco: ls("IP", case_sensitive=True) result_ls = cmco.get_output().split("\n") assert all("IP" in x for x in result_ls if x.strip()) assert len(result_ls) >= 3 = List packet fields - ls ~ command with ContextManagerCaptureOutput() as cmco: ls(ARP(hwsrc="aa:aa:aa:aa:aa:aa", psrc="1.1.1.1")) result_ls = cmco.get_output().split("\n") result_ls 
assert result_ls[5] == "hwsrc : MultipleTypeField (SourceMACField, StrFixedLenField) = 'aa:aa:aa:aa:aa:aa' ('None')" assert result_ls[6] == "psrc : MultipleTypeField (SourceIPField, SourceIP6Field, StrFixedLenField) = '1.1.1.1' ('None')" = List commands ~ conf command lsc() = List contribs ~ command def test_list_contrib(): with ContextManagerCaptureOutput() as cmco: list_contrib() result_list_contrib = cmco.get_output() assert "http2 : HTTP/2 (RFC 7540, RFC 7541) status=loads" in result_list_contrib assert len(result_list_contrib.split('\n')) > 40 test_list_contrib() = Test packet show() on LatexTheme % with LatexTheme class SmallPacket(Packet): fields_desc = [ByteField("a", 0)] conf_color_theme = conf.color_theme conf.color_theme = LatexTheme() pkt = SmallPacket() with ContextManagerCaptureOutput() as cmco: pkt.show() result = cmco.get_output().strip() assert result == '\\#\\#\\#[ \\textcolor{red}{\\bf SmallPacket} ]\\#\\#\\#\n \\textcolor{blue}{a} = \\textcolor{purple}{0}' conf.color_theme = conf_color_theme = Test rfc() ~ command dat = rfc(IP, ret=True).split("\n") assert dat[0].replace(" ", "").strip() == "0123" assert "0123456789" in dat[1].replace(" ", "") for l in dat: # only upper case and +- assert re.match(r"[A-Z+-]*", l) # Add a space before each + to avoid conflicts with UTscapy ! # They will be stripped below result = """ 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |VERSION| IHL | TOS | LEN | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | ID |FLAGS| FRAG | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | TTL | PROTO | CHKSUM | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | SRC | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | DST | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | OPTIONS | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ Fig. 
IP """.strip() result = [x.strip() for x in result.split("\n")] output = [x.strip() for x in rfc(IP, ret=True).strip().split("\n")] assert result == output result = """ 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | CODE | ID | LEN | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | TYPE |L|M|S|RES|VERSI| MESSAGE LEN | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | | DATA | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ Fig. EAP_TTLS """.strip() result = [x.strip() for x in result.split("\n")] output = [x.strip() for x in rfc(EAP_TTLS, ret=True).strip().split("\n")] assert result == output result = """ 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ |VERSION| TC | FL | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | PLEN | NH | HLIM | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | SRC | + + | | + + | | + + | | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | DST | + + | | + + | | + + | | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ Fig. 
IPv6 """.strip() result = [x.strip() for x in result.split("\n")] output = [x.strip() for x in rfc(IPv6, ret=True).strip().split("\n")] assert result == output class TestPad(Packet): fields_desc = [ShortField("f0", 0), ShortField("f1", 0), PadField(ByteField("f2", 1), 8), PadField(ShortField("f3", 0), 4)] result = """ 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | F0 | F1 | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | F2 | padding | +-+-+-+-+-+-+-+-+ + | | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | F3 | padding | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ Fig. TestPad """.strip() result = [x.strip() for x in result.split("\n")] output = [x.strip() for x in rfc(TestPad, ret=True).strip().split("\n")] assert result == output = Check that all contrib modules are well-configured ~ command list_contrib(_debug=True) = Configuration ~ conf conf.debug_dissector = True = Configuration conf.use_* LINUX ~ linux try: conf.use_bpf = True assert False except: True assert not conf.use_bpf = Configuration conf.use_* WINDOWS ~ windows try: conf.use_bpf = True assert False except: True assert not conf.use_bpf = Configuration conf.use_pcap ~ linux libpcap if not conf.use_pcap: assert not conf.iface.provider.libpcap conf.use_pcap = True assert conf.iface.provider.libpcap for iface in conf.ifaces.values(): assert iface.provider.libpcap or iface.is_valid() == False conf.use_pcap = False assert not conf.iface.provider.libpcap = Test layer filtering ~ filter pkt = NetflowHeader()/NetflowHeaderV5()/NetflowRecordV5() conf.layers.filter([NetflowHeader, NetflowHeaderV5]) assert NetflowRecordV5 not in NetflowHeader(bytes(pkt)) conf.layers.unfilter() assert NetflowRecordV5 in NetflowHeader(bytes(pkt)) ########### ########### = UTscapy route check * Check that UTscapy has correctly replaced the routes. 
Many tests won't work otherwise p = IP().src p assert p == "127.0.0.1" ############ ############ + Scapy functions tests = Interface related functions from unittest import mock conf.iface get_if_addr(conf.iface) get_if_hwaddr(conf.iface) bytes_hex(get_if_raw_addr(conf.iface)) def get_dummy_interface(): """Returns a dummy network interface""" conf.ifaces._add_fake_iface("dummy0") return "dummy0" get_if_raw_addr(get_dummy_interface()) get_if_list() get_working_if() get_if_raw_addr6(conf.iface) = More Interfaces related functions # Test name resolution old = conf.iface conf.iface = conf.iface.name assert conf.iface == old assert isinstance(conf.iface, NetworkInterface) assert conf.iface.is_valid() from unittest import mock @mock.patch("scapy.interfaces.conf.route.routes", []) @mock.patch("scapy.interfaces.conf.ifaces.values") def _test_get_working_if(rou): rou.side_effect = lambda: [] assert get_working_if() is None assert conf.iface + "a" # left + assert "hey! are you, ready to go ? %s" % conf.iface # format assert "cuz you know the way to go" + conf.iface # right + _test_get_working_if() = Test conf.ifaces conf.iface conf.ifaces assert conf.iface in conf.ifaces.values() assert conf.ifaces.dev_from_index(conf.iface.index) == conf.iface assert conf.ifaces.dev_from_networkname(conf.iface.network_name) == conf.iface conf.ifaces.data = {'a': NetworkInterface(InterfaceProvider(), {"name": 'a', "network_name": 'a', "description": 'a', "ips": ["127.0.0.1", "::1", "::2", "127.0.0.2"], "mac": 'aa:aa:aa:aa:aa:aa'})} with ContextManagerCaptureOutput() as cmco: conf.ifaces.show() output = cmco.get_output() data = """ Source Index Name MAC IPv4 IPv6 Unknown 0 a aa:aa:aa:aa:aa:aa 127.0.0.1 ::1 127.0.0.2 ::2 """.strip() output = [x.strip() for x in output.strip().split("\n")] data = [x.strip() for x in data.strip().split("\n")] assert output == data conf.ifaces.reload() = Test extcap detection in conf.ifaces ~ linux extcap import os from scapy.libs.extcap import load_extcap 
_bkp_extcap = conf.prog.extcap_folders _bkp_providers = conf.ifaces.providers.copy() conf.ifaces.providers.clear() # Create some sort of extcap parody program extcapfld = get_temp_dir() extcapprog = os.path.join(extcapfld, "runner.sh") data = """#!/usr/bin/env python3 import struct import argparse parser = argparse.ArgumentParser() parser.add_argument('--extcap-interfaces', action='store_true') parser.add_argument('--capture', action='store_true') parser.add_argument('--extcap-config', action='store_true') parser.add_argument('--scan-follow-rsp', action='store_true') parser.add_argument('--scan-follow-aux', action='store_true') parser.add_argument('--extcap-interface', type=str) parser.add_argument('--fifo', type=str) args = parser.parse_args() if args.extcap_interfaces: # List interfaces print(bytes.fromhex("0a657874636170207b76657273696f6e3d342e312e317d7b646973706c61793d6e524620536e696666657220666f7220426c7565746f6f7468204c457d7b68656c703d68747470733a2f2f7777772e6e6f7264696373656d692e636f6d2f536f6674776172652d616e642d546f6f6c732f446576656c6f706d656e742d546f6f6c732f6e52462d536e69666665722d666f722d426c7565746f6f74682d4c457d0a696e74657266616365207b76616c75653d2f6465762f747479555342352d4e6f6e657d7b646973706c61793d6e524620536e696666657220666f7220426c7565746f6f7468204c457d0a636f6e74726f6c207b6e756d6265723d307d7b747970653d73656c6563746f727d7b646973706c61793d4465766963657d7b746f6f6c7469703d446576696365206c6973747d0a636f6e74726f6c207b6e756d6265723d317d7b747970653d73656c6563746f727d7b646973706c61793d4b65797d7b746f6f6c7469703d7d0a636f6e74726f6c207b6e756d6265723d327d7b747970653d737472696e677d7b646973706c61793d56616c75657d7b746f6f6c7469703d3620646967697420706173736b6579206f72203136206f7220333220627974657320656e6372797074696f6e206b657920696e2068657861646563696d616c207374617274696e67207769746820273078272c2062696720656e6469616e20666f726d61742e49662074686520656e7465726564206b65792069732073686f72746572207468616e203136206f722033322062797465732c2069742077696c6c206265207a65726f2d70616
464656420696e2066726f6e74277d7b76616c69646174696f6e3d5c625e28285b302d395d7b367d297c2830785b302d39612d66412d465d7b312c36347d297c285b302d39412d46612d665d7b327d5b3a2d5d297b357d285b302d39412d46612d665d7b327d2920287075626c69637c72616e646f6d2929245c627d0a636f6e74726f6c207b6e756d6265723d337d7b747970653d737472696e677d7b646973706c61793d41647620486f707d7b64656661756c743d33372c33382c33397d7b746f6f6c7469703d4164766572746973696e67206368616e6e656c20686f702073657175656e63652e204368616e676520746865206f7264657220696e2077686963682074686520736e6966666572207377697463686573206164766572746973696e67206368616e6e656c732e2056616c6964206368616e6e656c73206172652033372c20333820616e642033392073657061726174656420627920636f6d6d612e7d7b76616c69646174696f6e3d5e5c732a282833377c33387c3339295c732a2c5c732a297b302c327d2833377c33387c3339297b317d5c732a247d7b72657175697265643d747275657d0a636f6e74726f6c207b6e756d6265723d377d7b747970653d627574746f6e7d7b646973706c61793d436c6561727d7b746f6f6c746f703d436c656172206f722072656d6f7665206465766963652066726f6d20446576696365206c6973747d0a636f6e74726f6c207b6e756d6265723d347d7b747970653d627574746f6e7d7b726f6c653d68656c707d7b646973706c61793d48656c707d7b746f6f6c7469703d416363657373207573657220677569646520286c61756e636865732062726f77736572297d0a636f6e74726f6c207b6e756d6265723d357d7b747970653d627574746f6e7d7b726f6c653d726573746f72657d7b646973706c61793d44656661756c74737d7b746f6f6c7469703d52657365747320746865207573657220696e7465726661636520616e6420636c6561727320746865206c6f672066696c657d0a636f6e74726f6c207b6e756d6265723d367d7b747970653d627574746f6e7d7b726f6c653d6c6f676765727d7b646973706c61793d4c6f677d7b746f6f6c7469703d4c6f672070657220696e746572666163657d0a76616c7565207b636f6e74726f6c3d307d7b76616c75653d207d7b646973706c61793d416c6c206164766572746973696e6720646576696365737d7b64656661756c743d747275657d0a76616c7565207b636f6e74726f6c3d307d7b76616c75653d5b30302c30302c30302c30302c30302c30302c305d7d7b646973706c61793d466f6c6c6f772049524b7d0a76616c7565207b636f6e74726f6c3d317d7b76616c756
53d307d7b646973706c61793d4c656761637920506173736b65797d7b64656661756c743d747275657d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d317d7b646973706c61793d4c6567616379204f4f4220646174617d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d327d7b646973706c61793d4c6567616379204c544b7d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d337d7b646973706c61793d5343204c544b7d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d347d7b646973706c61793d53432050726976617465204b65797d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d357d7b646973706c61793d49524b7d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d367d7b646973706c61793d416464204c4520616464726573737d0a76616c7565207b636f6e74726f6c3d317d7b76616c75653d377d7b646973706c61793d466f6c6c6f77204c4520616464726573737d").decode()) elif args.extcap_interface and args.extcap_config: # List config print(bytes.fromhex("617267207b6e756d6265723d307d7b63616c6c3d2d2d6f6e6c792d6164766572746973696e677d7b646973706c61793d4f6e6c79206164766572746973696e67207061636b6574737d7b746f6f6c7469703d54686520736e69666665722077696c6c206f6e6c792063617074757265206164766572746973696e67207061636b6574732066726f6d207468652073656c6563746564206465766963657d7b747970653d626f6f6c666c61677d7b736176653d747275657d0a617267207b6e756d6265723d317d7b63616c6c3d2d2d6f6e6c792d6c65676163792d6164766572746973696e677d7b646973706c61793d4f6e6c79206c6567616379206164766572746973696e67207061636b6574737d7b746f6f6c7469703d54686520736e69666665722077696c6c206f6e6c792063617074757265206c6567616379206164766572746973696e67207061636b6574732066726f6d207468652073656c6563746564206465766963657d7b747970653d626f6f6c666c61677d7b736176653d747275657d0a617267207b6e756d6265723d327d7b63616c6c3d2d2d7363616e2d666f6c6c6f772d7273707d7b646973706c61793d46696e64207363616e20726573706f6e736520646174617d7b746f6f6c7469703d54686520736e69666665722077696c6c20666f6c6c6f77207363616e20726571756573747320616e64207363616e20726573706f6e73657320696e207363616e206d6f64657d7b747970653d626f6f6c666c61677d7b64656661756c743d747275657d7b7361
76653d747275657d0a617267207b6e756d6265723d337d7b63616c6c3d2d2d7363616e2d666f6c6c6f772d6175787d7b646973706c61793d46696e6420617578696c6961727920706f696e74657220646174617d7b746f6f6c7469703d54686520736e69666665722077696c6c20666f6c6c6f772061757820706f696e7465727320696e207363616e206d6f64657d7b747970653d626f6f6c666c61677d7b64656661756c743d747275657d7b736176653d747275657d0a617267207b6e756d6265723d337d7b63616c6c3d2d2d636f6465647d7b646973706c61793d5363616e20616e6420666f6c6c6f772064657669636573206f6e204c4520436f646564205048597d7b746f6f6c7469703d5363616e20666f72206465766963657320616e6420666f6c6c6f772061647665727469736572206f6e204c4520436f646564205048597d7b747970653d626f6f6c666c61677d7b64656661756c743d66616c73657d7b736176653d747275657d").decode()) elif args.capture and args.extcap_interface and args.fifo: # Capture pkts = [ bytes.fromhex("ffffffffffff00000000000008004500001c0001000040117cce7f0000017f0000010035003500080172") ] with open(args.fifo, "wb", 0) as fd: # header fd.write( struct.pack( "IHHIIII", 0xa1b2c3d4, 2, 4, 0, 0, 65535, 1 ) ) for pkt in pkts: fd.write(struct.pack("IIII", 0, 0, len(pkt), len(pkt))) fd.write(bytes(pkt)) else: raise ValueError("Bad arguments") """.strip() with open(extcapprog, "w") as fd: fd.write(data) print(data) os.chmod(extcapprog, 0o777) # Inject and load provider conf.prog.extcap_folders = [extcapfld] load_extcap() print(conf.ifaces.providers) conf.ifaces.reload() # Now do the tests iface = conf.ifaces.dev_from_networkname('/dev/ttyUSB5-None') assert iface.name == "nRF Sniffer for Bluetooth LE" sock = iface.l2listen()(iface=iface) pkts = sock.sniff(timeout=2) sock.close() assert UDP in pkts[0] config = iface.get_extcap_config() assert config["arg"] == [ ('0', '--only-advertising', 'Only advertising packets', '', ''), ('1', '--only-legacy-advertising', 'Only legacy advertising packets', '', ''), ('2', '--scan-follow-rsp', 'Find scan response data', 'true', ''), ('3', '--scan-follow-aux', 'Find auxiliary pointer data', 'true', ''), ('3', 
'--coded', 'Scan and follow devices on LE Coded PHY', 'false', '') ] # Restore conf.prog.extcap_folders = _bkp_extcap conf.ifaces.providers = _bkp_providers conf.ifaces.reload() = Test read_routes6() - default output routes6 = read_routes6() if WINDOWS: from scapy.arch.windows import _route_add_loopback _route_add_loopback(routes6, True) routes6 # Expected results: # - one route if there is only the loopback interface # - one route if IPv6 is supported but disabled on network interfaces # - three routes if there is a network interface # - on OpenBSD, only two routes on lo0 are expected if routes6: iflist = get_if_list() if WINDOWS: from scapy.arch.windows import _route_add_loopback _route_add_loopback(ipv6=True, iflist=iflist) if OPENBSD: len(routes6) >= 2 elif iflist == [conf.loopback_name]: len(routes6) == 1 elif len(iflist) >= 2: len(routes6) >= 1 else: False else: # IPv6 seems disabled. Force a route to ::1 conf.route6.routes.append(("::1", 128, "::", conf.loopback_name, ["::1"], 1)) conf.route6.ipv6_ifaces = set([conf.loopback_name]) True = Build HBHOptUnknown for IPv6ExtHdrHopByHop with disabled autopad ~ ipv6 hbh opt * Build the HBHOptUnknown of IPv6ExtHdrHopByHop with autopad=0 v6Opt = HBHOptUnknown(otype=3, optlen=7, optdata="Beijing") pkt = Ether()/IPv6()/IPv6ExtHdrHopByHop(autopad=0, options=[v6Opt, ]) pkt.build() = Build HBHOptUnknown for IPv6ExtHdrDestOpt with disabled autopad ~ ipv6 hbh opt * Build the HBHOptUnknown of IPv6ExtHdrDestOpt with autopad=0 v6Opt = HBHOptUnknown(otype=3, optlen=6, optdata="Haikou") pkt = Ether()/IPv6()/IPv6ExtHdrDestOpt(autopad=0, options=[v6Opt, ]) pkt.build() = Test read_routes6() - check mandatory routes import re ll_route = re.compile(r"fe80:\d{0,2}:") # match fe80::, fe80:5:, etc. 
(if scoped) conf.route6 if len(routes6) > 2 and not WINDOWS: # Identify routes to fe80::/64 assert sum(1 for r in routes6 if r[0] == "::1" and r[4] == ["::1"]) >= 1 if len(iflist) >= 2: assert sum(1 for r in routes6 if ll_route.match(r[0]) and r[1] == 64) >= 1 try: # Identify a route to a node IPv6 link-local address assert sum(1 for r in routes6 if in6_islladdr(r[0]) and r[1] == 128) >= 1 except: # IPv6 is not available, but we still check the loopback assert conf.route6.route("::/0") == (conf.loopback_name, "::", "::") assert sum(1 for r in routes6 if r[1] == 128 and r[4] == ["::1"]) >= 1 else: True = Test ifchange() conf.route6.ifchange(conf.loopback_name, "::1/128") if WINDOWS: conf.netcache.in6_neighbor["::1"] = "ff:ff:ff:ff:ff:ff" # Restore fake cache True = Packet.route() assert (Ether() / ARP()).route()[0] is not None assert (Ether() / ARP()).payload.route()[0] is not None assert (ARP(ptype=0, pdst="hello. this isn't a valid IP")).route()[0] is None = utils/in4_is* assert in4_ismaddr("224.0.0.1") assert not in4_ismaddr("192.168.0.1") assert in4_ismaddr("239.0.0.255") assert in4_ismlladdr("224.0.0.1") assert in4_ismlladdr("224.0.0.255") assert not in4_ismlladdr("224.0.1.255") assert in4_ismgladdr("235.0.0.1") assert not in4_ismgladdr("224.0.0.1") assert not in4_ismgladdr("239.0.0.1") assert in4_ismlsaddr("239.0.0.1") assert not in4_ismlsaddr("224.0.0.1") assert in4_isaddrllallnodes("224.0.0.1") assert not in4_isaddrllallnodes("224.0.0.3") assert in4_getnsmac(b'\xe0\x00\x00\x01') == '01:00:5e:00:00:01' assert getmacbyip("224.0.0.1") == '01:00:5e:00:00:01' = plain_str test data = b"\xffsweet\xef celestia\xab" assert plain_str(data) == "\\xffsweet\\xef celestia\\xab" ############ ############ + compat.py = test bytes_hex/hex_bytes monty_data = b"Stop! Who approaches the Bridge of Death must answer me these questions three, 'ere the other side he see." 
hex_data = bytes_hex(monty_data) assert hex_data == b'53746f70212057686f20617070726f61636865732074686520427269646765206f66204465617468206d75737420616e73776572206d65207468657365207175657374696f6e732074687265652c202765726520746865206f746865722073696465206865207365652e' assert hex_bytes(hex_data) == monty_data = orb/chb assert orb(b"\x01"[0]) == 1 assert chb(1) == b"\x01" ############ ############ + Main.py tests = Pickle and unpickle a packet import pickle a = IP(dst="192.168.0.1")/UDP() b = pickle.dumps(a) c = pickle.loads(b) assert c[IP].dst == "192.168.0.1" assert raw(c) == raw(a) = Usage test from scapy.main import _usage try: _usage() assert False except SystemExit: assert True = Session test import builtins # This is automatic when using the console def get_var(var): return builtins.__dict__["scapy_session"][var] def set_var(var, value): builtins.__dict__["scapy_session"][var] = value def del_var(var): del builtins.__dict__["scapy_session"][var] init_session(None, {"init_value": 123}) set_var("test_value", "8.8.8.8") # test_value = "8.8.8.8" save_session() del_var("test_value") load_session() update_session() assert get_var("test_value") == "8.8.8.8" #test_value == "8.8.8.8" assert get_var("init_value") == 123 = Session test with fname session_name = tempfile.mktemp() init_session(session_name) set_var("test_value", IP(dst="192.168.0.1")) # test_value = IP(dst="192.168.0.1") save_session(fname="%s.dat" % session_name) del_var("test_value") set_var("z", True) #z = True load_session(fname="%s.dat" % session_name) try: get_var("z") assert False except: pass set_var("z", False) #z = False update_session(fname="%s.dat" % session_name) assert get_var("test_value").dst == "192.168.0.1" #test_value.dst == "192.168.0.1" assert not get_var("z") = Clear session files os.remove("%s.dat" % session_name) = Test temporary file creation ~ ci_only scapy_delete_temp_files() tmpfile = get_temp_file(autoext=".ut") tmpfile if WINDOWS: assert "scapy" in tmpfile and 
tmpfile.lower().startswith('c:\\users\\appveyor\\appdata\\local\\temp') else: import platform BYPASS_TMP = platform.python_implementation().lower() == "pypy" or DARWIN assert "scapy" in tmpfile and (BYPASS_TMP == True or "/tmp/" in tmpfile) assert conf.temp_files[0].endswith(".ut") scapy_delete_temp_files() assert len(conf.temp_files) == 0 = Emulate interact() ~ interact import sys from unittest import mock from scapy.main import interact from scapy.main import DEFAULT_PRESTART_FILE, DEFAULT_PRESTART, _read_config_file _read_config_file(DEFAULT_PRESTART_FILE, _locals=globals(), default=DEFAULT_PRESTART) # By now .config/scapy/startup.py should have been created with open(DEFAULT_PRESTART_FILE, "r") as fd: OLD_DEFAULT_PRESTART = fd.read() with open(DEFAULT_PRESTART_FILE, "w+") as fd: fd.write("conf.interactive_shell = 'ipython'") # Detect IPython try: import IPython except: code_interact_import = "scapy.main.code.interact" else: code_interact_import = "IPython.embed" @mock.patch(code_interact_import) def interact_emulator(code_int, extra_args=[]): try: code_int.side_effect = lambda *args, **kwargs: lambda *args, **kwargs: None interact(argv=["-s scapy1"] + extra_args, mybanner="What a test") finally: sys.ps1 = ">>> " interact_emulator() # Default try: interact_emulator(extra_args=["-?"]) # Failing assert False except: pass interact_emulator(extra_args=["-d"]) # Extended = Emulate interact() and test startup.py with ptpython ~ interact import sys from unittest import mock from scapy.main import DEFAULT_PRESTART_FILE, DEFAULT_PRESTART, _read_config_file _read_config_file(DEFAULT_PRESTART_FILE, _locals=globals(), default=DEFAULT_PRESTART) # By now .config/scapy/startup.py should have been created with open(DEFAULT_PRESTART_FILE, "w+") as fd: fd.write("conf.interactive_shell = 'ptpython'") called = [] def checker(*args, **kwargs): locals = kwargs.pop("locals") assert locals["IP"] history_filename = kwargs.pop("history_filename") assert history_filename == conf.histfile 
called.append(True) ptpython_mocked_module = Bunch( repl=Bunch( embed=checker ) ) modules_patched = { "ptpython": ptpython_mocked_module, "ptpython.repl": ptpython_mocked_module.repl, "ptpython.repl.embed": ptpython_mocked_module.repl.embed, } with mock.patch.dict("sys.modules", modules_patched): try: interact() finally: sys.ps1 = ">>> " # Restore with open(DEFAULT_PRESTART_FILE, "w") as fd: print(OLD_DEFAULT_PRESTART) r = fd.write(OLD_DEFAULT_PRESTART) assert called = Test explore() with GUI mode ~ command from unittest import mock def test_explore_gui(is_layer, layer): prompt_toolkit_mocked_module = Bunch( shortcuts=Bunch( dialogs=Bunch( radiolist_dialog=(lambda *args, **kargs: layer), button_dialog=(lambda *args, **kargs: "layers" if is_layer else "contribs") ) ), formatted_text=Bunch(HTML=lambda x: x), __version__="2.0.0" ) # a mock.patch isn't enough to mock a module. Let's roll sys.modules modules_patched = { "prompt_toolkit": prompt_toolkit_mocked_module, "prompt_toolkit.shortcuts": prompt_toolkit_mocked_module.shortcuts, "prompt_toolkit.shortcuts.dialogs": prompt_toolkit_mocked_module.shortcuts.dialogs, "prompt_toolkit.formatted_text": prompt_toolkit_mocked_module.formatted_text, } with mock.patch.dict("sys.modules", modules_patched): with ContextManagerCaptureOutput() as cmco: explore() result_explore = cmco.get_output() return result_explore conf.interactive = True explore_dns = test_explore_gui(True, "scapy.layers.dns") assert "DNS" in explore_dns assert "DNS Question Record" in explore_dns assert "DNSRRNSEC3" in explore_dns assert "DNS TSIG Resource Record" in explore_dns explore_avs = test_explore_gui(False, "avs") assert "AVSWLANHeader" in explore_avs assert "AVS WLAN Monitor Header" in explore_avs = Test explore() with non-GUI mode ~ command def test_explore_non_gui(layer): with ContextManagerCaptureOutput() as cmco: explore(layer) result_explore = cmco.get_output() return result_explore explore_dns = test_explore_non_gui("scapy.layers.dns") assert 
"DNS" in explore_dns assert "DNS Question Record" in explore_dns assert "DNSRRNSEC3" in explore_dns assert "DNS TSIG Resource Record" in explore_dns explore_avs = test_explore_non_gui("avs") assert "AVSWLANHeader" in explore_avs assert "AVS WLAN Monitor Header" in explore_avs assert test_explore_non_gui("scapy.layers.dns") == test_explore_non_gui("dns") assert test_explore_non_gui("scapy.contrib.avs") == test_explore_non_gui("avs") try: explore("unknown_module") assert False # The previous should have raised an exception except Scapy_Exception: pass = Test load_contrib overwrite load_contrib("gtp") assert GTPHeader.__module__ == "scapy.contrib.gtp" load_contrib("gtp_v2") assert GTPHeader.__module__ == "scapy.contrib.gtp_v2" load_contrib("gtp") assert GTPHeader.__module__ == "scapy.contrib.gtp" = Test load_contrib failure try: load_contrib("doesnotexist") assert False except: pass = Test sane function sane("A\x00\xFFB") == "A..B" = Test lhex function assert lhex(42) == "0x2a" assert lhex((28,7)) == "(0x1c, 0x7)" assert lhex([28,7]) == "[0x1c, 0x7]" = Test restart function from unittest import mock conf.interactive = True try: from scapy.utils import restart import os @mock.patch("os.execv") @mock.patch("subprocess.call") @mock.patch("os._exit") def _test(e, m, m2): def check(x, y=[]): z = [x] + y if not isinstance(x, list) else x + y assert os.path.isfile(z[0]) assert os.path.isfile(z[1]) return 0 m2.side_effect = check m.side_effect = check e.side_effect = lambda x: None restart() _test() finally: conf.interactive = False = Test linehexdump function conf_color_theme = conf.color_theme conf.color_theme = BlackAndWhite() assert linehexdump(Ether(src="00:01:02:03:04:05"), dump=True) == 'FF FF FF FF FF FF 00 01 02 03 04 05 90 00 ..............' 
conf.color_theme = conf_color_theme = Test chexdump function chexdump(Ether(src="00:01:02:02:04:05"), dump=True) == "0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0x00, 0x01, 0x02, 0x02, 0x04, 0x05, 0x90, 0x00" = Test repr_hex function repr_hex("scapy") == "7363617079" = Test hexstr function hexstr(b"A\x00\xFFB") == "41 00 FF 42 A..B" = Test fletcher16 functions assert fletcher16_checksum(b"\x28\x07") == 22319 assert fletcher16_checkbytes(b"\x28\x07", 1) == b"\xaf(" = Test hexdiff function ~ not_pypy def test_hexdiff(a, b, algo=None, autojunk=False): conf_color_theme = conf.color_theme conf.color_theme = BlackAndWhite() with ContextManagerCaptureOutput() as cmco: hexdiff(a, b, algo=algo, autojunk=autojunk) result_hexdiff = cmco.get_output() conf.interactive = True conf.color_theme = conf_color_theme return result_hexdiff # Basic string test result_hexdiff = test_hexdiff("abcde", "abCde") expected = "0000 61 62 63 64 65 abcde\n" expected += " 0000 61 62 43 64 65 abCde\n" assert result_hexdiff == expected # More advanced string test result_hexdiff = test_hexdiff("add_common_", "_common_removed") expected = "0000 61 64 64 5F 63 6F 6D 6D 6F 6E 5F add_common_ \n" expected += " -003 5F 63 6F 6D 6D 6F 6E 5F 72 65 6D 6F 76 _common_remov\n" expected += " 000d 65 64 ed\n" assert result_hexdiff == expected # Compare packets result_hexdiff = test_hexdiff(IP(dst="127.0.0.1", src="127.0.0.1"), IP(dst="127.0.0.2", src="127.0.0.1")) expected = "0000 45 00 00 14 00 01 00 00 40 00 7C E7 7F 00 00 01 E.......@.|.....\n" expected += " 0000 45 00 00 14 00 01 00 00 40 00 7C E6 7F 00 00 01 E.......@.|.....\n" expected += "0010 7F 00 00 01 ....\n" expected += " 0010 7F 00 00 02 ....\n" assert result_hexdiff == expected # Compare using difflib a = "A" * 1000 + "findme" + "B" * 1000 b = "A" * 1000 + "B" * 1000 ret1 = test_hexdiff(a, b, algo="difflib") ret2 = test_hexdiff(a, b, algo="difflib", autojunk=True) expected_ret1 = """ 03d0 03d0 41 41 41 41 41 41 41 41 41 41 41 41 41 41 41 41 AAAAAAAAAAAAAAAA 
03e0 41 41 41 41 41 41 41 41 66 69 6E 64 6D 65 42 42 AAAAAAAAfindmeBB 03e0 41 41 41 41 41 41 41 41 42 42 AAAAAAAA BB 03ea 03ea 42 42 42 42 42 42 42 42 42 42 42 42 42 42 42 42 BBBBBBBBBBBBBBBB """ expected_ret2 = """ 03d0 03d0 41 41 41 41 41 41 41 41 41 41 41 41 41 41 41 41 AAAAAAAAAAAAAAAA 03e0 41 41 41 41 41 41 41 41 66 69 6E 64 6D 65 42 42 AAAAAAAAfindmeBB 03e0 41 41 41 41 41 41 41 41 42 42 42 42 42 42 42 42 AAAAAAAABBBBBBBB 03f0 03f0 42 42 42 42 42 42 42 42 42 42 42 42 42 42 42 42 BBBBBBBBBBBBBBBB """ assert ret1 != ret2 assert expected_ret1 in ret1 assert expected_ret2 in ret2 # Test corner cases that should not crash hexdiff(b"abc", IP() / TCP()) hexdiff(IP() / TCP(), b"abc") = Test mysummary functions - Ether p = Ether(dst="ff:ff:ff:ff:ff:ff", src="ff:ff:ff:ff:ff:ff", type=0x9000) p assert p.mysummary() in ['ff:ff:ff:ff:ff:ff > ff:ff:ff:ff:ff:ff (%s)' % loop for loop in ['0x9000', 'LOOP']] = Test zerofree_randstring function random.seed(0x2807) zerofree_randstring(4) in [b"\xd2\x12\xe4\x5b", b'\xd3\x8b\x13\x12'] = Test strand function assert strand(b"AC", b"BC") == b'@C' = Test export_object and import_object functions from unittest import mock def test_export_import_object(): with ContextManagerCaptureOutput() as cmco: export_object(2807) result_export_object = cmco.get_output(eval_bytes=True) assert result_export_object.startswith("eNprYPL9zqUHAAdrAf8=") assert import_object(result_export_object) == 2807 test_export_import_object() = Test tex_escape function tex_escape("$#_") == "\\$\\#\\_" = Test colgen function f = colgen(range(3)) assert len([next(f) for i in range(2)]) == 2 = Test incremental_label function f = incremental_label() assert [next(f) for i in range(2)] == ["tag00000", "tag00001"] = Test corrupt_* functions import random random.seed(0x2807) assert corrupt_bytes("ABCDE") in [b"ABCDW", b"ABCDX"] assert sane(corrupt_bytes("ABCDE", n=3)) in ["A.8D4", ".2.DE"] assert corrupt_bits("ABCDE") in [b"EBCDE", b"ABCDG"] assert sane(corrupt_bits("ABCDE", 
n=3)) in ["AF.EE", "QB.TE"] = Test save_object and load_object functions import tempfile fd, fname = tempfile.mkstemp() save_object(fname, 2807) assert load_object(fname) == 2807 = Test whois function ~ netaccess if not WINDOWS: result = whois("193.0.6.139") assert b"inetnum" in result and b"Amsterdam" in result = Test manuf DB methods ~ manufdb assert conf.manufdb._resolve_MAC("00:00:0F:01:02:03") == "Next:01:02:03" assert conf.manufdb._get_short_manuf("00:00:0F:01:02:03") == "Next" assert in6_addrtovendor("fe80::0200:0fff:fe01:0203").lower().startswith("next") assert conf.manufdb.lookup("00:00:0F:01:02:03") == ('Next', 'Next, Inc.') assert "00:00:0F" in conf.manufdb.reverse_lookup("Next") = Test multiple wireshark's manuf formats ~ manufdb new_format = """ # comment 00:00:00 JokyIsland Joky Insland Corp SA 00:01:12 SecdevCorp Secdev Corporation SA LLC EE:05:01 Scapy Scapy CO LTD & CIE FF:00:11 NoName """ old_format = """ # comment 00:00:00 JokyIsland # Joky Insland Corp SA 00:01:12 SecdevCorp # Secdev Corporation SA LLC EE:05:01 Scapy # Scapy CO LTD & CIE FF:00:11 NoName """ manuf1 = get_temp_file() manuf2 = get_temp_file() with open(manuf1, "w") as w: w.write(old_format) with open(manuf2, "w") as w: w.write(new_format) a = load_manuf(manuf1) b = load_manuf(manuf2) assert a.lookup("00:00:00") == ('JokyIsland', 'Joky Insland Corp SA') assert a.lookup("FF:00:11:00:00:00") == ('NoName', 'NoName') assert a.reverse_lookup("Scapy") == {'EE:05:01': ('Scapy', 'Scapy CO LTD & CIE')} assert a.reverse_lookup("Secdevcorp") == {'00:01:12': ('SecdevCorp', 'Secdev Corporation SA LLC')} assert b.lookup("00:00:00") == ('JokyIsland', 'Joky Insland Corp SA') assert b.lookup("FF:00:11:00:00:00") == ('NoName', 'NoName') assert b.reverse_lookup("Scapy") == {'EE:05:01': ('Scapy', 'Scapy CO LTD & CIE')} assert b.reverse_lookup("Secdevcorp") == {'00:01:12': ('SecdevCorp', 'Secdev Corporation SA LLC')} scapy_delete_temp_files() = Test load_services data_services = """ itu-bicc-stc 
3097/sctp cvsup 5999/udp # CVSup x11 6000-6063/tcp # X Window System x11 6000-6063/udp # X Window System ndl-ahp-svc 6064/tcp # NDL-AHP-SVC """ services = get_temp_file() with open(services, "w") as w: w.write(data_services) tcp, udp, sctp = load_services(services) assert tcp[6002] == "x11" assert tcp.ndl_ahp_svc == 6064 assert tcp.x11 in range(6000, 6093) assert udp[6002] == "x11" assert udp.x11 in range(6000, 6093) assert udp.cvsup == 5999 assert sctp[3097] == "itu_bicc_stc" assert sctp.itu_bicc_stc == 3097 scapy_delete_temp_files() = Test utility functions - network related ~ netaccess assert atol("1.1.1.1") == 0x1010101 assert atol("192.168.0.1") == 0xc0a80001 = Test autorun functions ~ autorun ret = autorun_get_text_interactive_session("IP().src") ret assert ret == (">>> IP().src\n'127.0.0.1'\n", '127.0.0.1') ret = autorun_get_html_interactive_session("IP().src") ret assert ret == (">>> IP().src\n'127.0.0.1'\n", '127.0.0.1') ret = autorun_get_latex_interactive_session("IP().src") ret assert ret == ("\\textcolor{blue}{{\\tt\\char62}{\\tt\\char62}{\\tt\\char62} }IP().src\n'127.0.0.1'\n", '127.0.0.1') ret = autorun_get_text_interactive_session("scapy_undefined") assert "NameError" in ret[0] = Test autorun with logging cmds = """log_runtime.info(hex_bytes("446166742050756e6b"))\n""" ret = autorun_get_text_interactive_session(cmds) ret assert "Daft Punk" in ret[0] = Test utility TEX functions assert tex_escape("{scapy}\\^$~#_&%|><") == "{\\tt\\char123}scapy{\\tt\\char125}{\\tt\\char92}\\^{}\\${\\tt\\char126}\\#\\_\\&\\%{\\tt\\char124}{\\tt\\char62}{\\tt\\char60}" a = colgen(1, 2, 3) assert next(a) == (1, 2, 2) assert next(a) == (1, 3, 3) assert next(a) == (2, 2, 1) assert next(a) == (2, 3, 2) assert next(a) == (2, 1, 3) assert next(a) == (3, 3, 1) assert next(a) == (3, 1, 2) assert next(a) == (3, 2, 3) = Test config file functions saved_conf_verb = conf.verb fd, fname = tempfile.mkstemp() os.write(fd, b"conf.verb = 42\n") os.close(fd) from scapy.main import 
_read_config_file _read_config_file(fname, globals(), locals()) assert conf.verb == 42 conf.verb = saved_conf_verb = Test config file functions failures from scapy.main import _read_config_file, _probe_config_folder assert _read_config_file(_probe_config_folder("filethatdoesnotexistnorwillever.tsppajfsrdrr")) is None = Test CacheInstance repr conf.netcache = Test pyx detection functions from unittest.mock import patch def _r(*args, **kwargs): raise OSError with patch("scapy.libs.test_pyx.subprocess.check_call", _r): from scapy.libs.test_pyx import _test_pyx assert _test_pyx() == False = Test matplotlib detection functions from unittest.mock import MagicMock, patch bck_scapy_libs_matplot = sys.modules.get("scapy.libs.matplot", None) if bck_scapy_libs_matplot: del sys.modules["scapy.libs.matplot"] mock_matplotlib = MagicMock() mock_matplotlib.get_backend.return_value = "inline" mock_matplotlib.pyplot = MagicMock() mock_matplotlib.pyplot.plt = None with patch.dict("sys.modules", **{ "matplotlib": mock_matplotlib, "matplotlib.lines": mock_matplotlib}): from scapy.libs.matplot import MATPLOTLIB, MATPLOTLIB_INLINED, MATPLOTLIB_DEFAULT_PLOT_KARGS, Line2D assert MATPLOTLIB == 1 assert MATPLOTLIB_INLINED == 1 assert "marker" in MATPLOTLIB_DEFAULT_PLOT_KARGS mock_matplotlib.get_backend.return_value = "ko" with patch.dict("sys.modules", **{ "matplotlib": mock_matplotlib, "matplotlib.lines": mock_matplotlib}): from scapy.libs.matplot import MATPLOTLIB, MATPLOTLIB_INLINED, MATPLOTLIB_DEFAULT_PLOT_KARGS assert MATPLOTLIB == 1 assert MATPLOTLIB_INLINED == 0 assert "marker" in MATPLOTLIB_DEFAULT_PLOT_KARGS if bck_scapy_libs_matplot: sys.modules["scapy.libs.matplot"] = bck_scapy_libs_matplot ############ ############ + Basic tests * Those tests are here mainly to check nothing has been broken * and to catch Exceptions = Packet class methods p = IP()/ICMP() ret = p.do_build_ps() assert ret[0] == 
b"@\x00\x00\x00\x00\x01\x00\x00@\x01\x00\x00\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\x00\x00\x00\x00\x00\x00" assert len(ret[1]) == 2 assert p[ICMP].firstlayer() == p assert p.command() == "IP()/ICMP()" p.decode_payload_as(UDP) assert p.sport == 2048 and p.dport == 63487 = hide_defaults conf_color_theme = conf.color_theme conf.color_theme = BlackAndWhite() p = IP(ttl=64)/ICMP() assert repr(p) in [">", ">"] p.hide_defaults() assert repr(p) in [">", ">"] conf.color_theme = conf_color_theme = split_layers p = IP()/ICMP() s = raw(p) split_layers(IP, ICMP, proto=1) assert Raw in IP(s) bind_layers(IP, ICMP, frag=0, proto=1) = fuzz r = fuzz(IP(tos=2)/ICMP()) assert r.tos == 2 z = r.ttl assert r.ttl != z assert r.ttl != z = fuzz a Packet with MultipleTypeField fuzz(ARP(pdst="127.0.0.1")) fuzz(IP()/ARP(pdst='10.0.0.254')) = fuzz on packets with advanced RandNum x = IP(dst="8.8.8.8")/fuzz(UDP()/NTP(version=4)) x.show2() x = IP(raw(x)) assert NTP in x = fuzz on packets with FlagsField assert isinstance(fuzz(TCP()).flags, VolatileValue) = Building some packets ~ basic IP TCP UDP NTP LLC SNAP Dot11 IP()/TCP() Ether()/IP()/UDP()/NTP() Dot11()/LLC()/SNAP()/IP()/TCP()/"XXX" IP(ttl=25)/TCP(sport=12, dport=42) IP().summary() = Manipulating some packets ~ basic IP TCP a=IP(ttl=4)/TCP() a.ttl a.ttl=10 del a.ttl a.ttl TCP in a a[TCP] a[TCP].dport=[80,443] a assert a.copy().time == a.time a=3 = Bind string array as payload ~ basic assert bytes(Raw("sca")/"py") == b"scapy" assert bytes(Raw("sca")/b"py") == b"scapy" assert bytes(Raw("sca")/bytearray(b"py")) == b"scapy" assert bytes("sca"/Raw("py")) == b"scapy" assert bytes(b"sca"/Raw("py")) == b"scapy" assert bytes(bytearray(b"sca")/Raw("py")) == b"scapy" a=Raw("sca") a.add_payload("py") assert bytes(a) == b"scapy" a=Raw("sca") a.add_payload(b"py") assert bytes(a) == b"scapy" a=Raw("sca") a.add_payload(bytearray(b"py")) assert bytes(a) == b"scapy" = Checking overloads ~ basic IP TCP Ether a=Ether()/IP()/TCP() r = a.proto r r == 6 = 
sprintf() function ~ basic sprintf Ether IP UDP NTP a=Ether()/IP()/IP(ttl=4)/UDP()/NTP() r = a.sprintf("%type% %IP.ttl% %#05xr,UDP.sport% %IP:2.ttl%") r r in ['0x800 64 0x07b 4', 'IPv4 64 0x07b 4'] = sprintf() function ~ basic sprintf IP TCP SNAP LLC Dot11 * This test is on the conditional substring feature of sprintf() a=Dot11()/LLC()/SNAP()/IP()/TCP() r = a.sprintf("{IP:{TCP:flags=%TCP.flags%}{UDP:port=%UDP.ports%} %IP.src%}") r r == 'flags=S 127.0.0.1' = haslayer function ~ basic haslayer IP TCP ICMP ISAKMP x=IP(id=1)/ISAKMP_payload_SA(prop=ISAKMP_payload_SA(prop=IP()/ICMP()))/TCP() r = (TCP in x, ICMP in x, IP in x, UDP in x) r r == (True,True,True,False) = getlayer function ~ basic getlayer IP ISAKMP UDP x=IP(id=1)/ISAKMP_payload_SA(prop=IP(id=2)/UDP(dport=1))/IP(id=3)/UDP(dport=2) x[IP] x[IP:2] x[IP:3] x.getlayer(IP,3) x.getlayer(IP,4) x[UDP] x[UDP:1] x[UDP:2] assert(x[IP].id == 1 and x[IP:2].id == 2 and x[IP:3].id == 3 and x.getlayer(IP).id == 1 and x.getlayer(IP,3).id == 3 and x.getlayer(IP,4) == None and x[UDP].dport == 1 and x[UDP:2].dport == 2) try: x[IP:4] except IndexError: True else: False = getlayer / haslayer with name ~ basic getlayer IP ICMP IPerror TCPerror x = IP() / ICMP() / IPerror() assert x.getlayer(ICMP) is not None assert x.getlayer(IPerror) is not None assert x.getlayer("IP in ICMP") is not None assert x.getlayer(TCPerror) is None assert x.getlayer("TCP in ICMP") is None assert x.haslayer(ICMP) assert x.haslayer(IPerror) assert x.haslayer("IP in ICMP") assert not x.haslayer(TCPerror) assert not x.haslayer("TCP in ICMP") = getlayer with a filter ~ getlayer IP pkt = IP() / IP(ttl=3) / IP() assert pkt[IP::{"ttl":3}].ttl == 3 assert pkt.getlayer(IP, ttl=3).ttl == 3 assert IPv6ExtHdrHopByHop(options=[HBHOptUnknown()]).getlayer(HBHOptUnknown, otype=42) is None = specific haslayer and getlayer implementations for EAP ~ haslayer getlayer EAP pkt = Ether() / EAPOL() / EAP_MD5() assert EAP in pkt assert pkt.haslayer(EAP) assert isinstance(pkt[EAP], 
EAP_MD5) assert isinstance(pkt.getlayer(EAP), EAP_MD5) = specific haslayer and getlayer implementations for RadiusAttribute ~ haslayer getlayer RadiusAttribute pkt = RadiusAttr_EAP_Message() assert RadiusAttribute in pkt assert pkt.haslayer(RadiusAttribute) assert isinstance(pkt[RadiusAttribute], RadiusAttr_EAP_Message) assert isinstance(pkt.getlayer(RadiusAttribute), RadiusAttr_EAP_Message) = equality ~ basic w=Ether()/IP()/UDP(dport=53) x=Ether()/IP(version=4)/UDP() y=Ether()/IP()/UDP(dport=4) z=Ether()/IP()/UDP()/NTP() t=Ether()/IP()/TCP() assert x != y and x != z and x != t and y != z and y != t and z != t and w == x = answers ~ basic a1, a2 = "1.2.3.4", "5.6.7.8" p1 = IP(src=a1, dst=a2)/ICMP(type=8) p2 = IP(src=a2, dst=a1)/ICMP(type=0) assert p1.hashret() == p2.hashret() assert not p1.answers(p2) assert p2.answers(p1) assert p1 > p2 assert p2 < p1 assert p1 == p1 conf_back = conf.checkIPinIP conf.checkIPinIP = True px = [IP()/p1, IPv6()/p1] assert not any(p.hashret() == p2.hashret() for p in px) assert not any(p.answers(p2) for p in px) assert not any(p2.answers(p) for p in px) conf.checkIPinIP = False assert all(p.hashret() == p2.hashret() for p in px) assert not any(p.answers(p2) for p in px) assert all(p2.answers(p) for p in px) conf.checkIPinIP = conf_back = answers - Net ~ netaccess a1, a2 = Net("www.google.com"), Net("www.secdev.org") prt1, prt2 = 12345, 54321 s1, s2 = 2767216324, 3845532842 p1 = IP(src=a1, dst=a2)/TCP(flags='SA', seq=s1, ack=s2, sport=prt1, dport=prt2) p2 = IP(src=a2, dst=a1)/TCP(flags='R', seq=s2, ack=0, sport=prt2, dport=prt1) assert p2.answers(p1) assert not p1.answers(p2) # Not available yet because of IPv6 # a1, a2 = Net6("www.google.com"), Net6("www.secdev.org") p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2) p2 = IP(src=a2, dst=a1)/TCP(flags='RA', seq=0, ack=s1+1, sport=prt2, dport=prt1) assert p2.answers(p1) assert not p1.answers(p2) p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, 
dport=prt2) p2 = IP(src=a2, dst=a1)/TCP(flags='SA', seq=s2, ack=s1+1, sport=prt2, dport=prt1) assert p2.answers(p1) assert not p1.answers(p2) p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1, ack=s2+1, sport=prt1, dport=prt2) assert not p2.answers(p1) assert p1.answers(p2) p1 = IP(src=a1, dst=a2)/TCP(flags='S', seq=s1, ack=0, sport=prt1, dport=prt2) p2 = IP(src=a2, dst=a1)/TCP(flags='SA', seq=s2, ack=s1+10, sport=prt2, dport=prt1) assert not p2.answers(p1) assert not p1.answers(p2) p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1, ack=s2+1, sport=prt1, dport=prt2) assert not p2.answers(p1) assert not p1.answers(p2) p1 = IP(src=a1, dst=a2)/TCP(flags='A', seq=s1+9, ack=s2+10, sport=prt1, dport=prt2) assert not p2.answers(p1) assert not p1.answers(p2) = conf.checkIPsrc conf_checkIPsrc = conf.checkIPsrc conf.checkIPsrc = 0 query = IP(id=42676, src='10.128.0.7', dst='192.168.0.1')/ICMP(id=26) answer = IP(src='192.168.48.19', dst='10.128.0.7')/ICMP(type=11)/IPerror(id=42676, src='192.168.51.23', dst='192.168.0.1')/ICMPerror(id=26) assert answer.answers(query) conf.checkIPsrc = conf_checkIPsrc ############ ############ + command() / json() tests ~ command = Test command() with normal packet pkt = IP(dst="127.0.0.1", src="127.0.0.1") / UDP(dport=12345, sport=654) assert pkt.command() == "IP(src='127.0.0.1', dst='127.0.0.1')/UDP(sport=654, dport=12345)" = Test json() with normal packet assert pkt.json() == '{"version": 4, "ihl": null, "tos": 0, "len": null, "id": 1, "flags": 0, "frag": 0, "ttl": 64, "proto": 17, "chksum": null, "src": "127.0.0.1", "dst": "127.0.0.1", "payload": {"sport": 654, "dport": 12345, "len": null, "chksum": null}}' = Test command() with nested packet pkt = DNS(qd=[DNSQR(qtype="A", qname="google.com")]) assert pkt.command() == "DNS(qd=[DNSQR(qname=b'google.com.', qtype=1)])" = Test json() with nested packet assert pkt.json() == '{"length": null, "id": 0, "qr": 0, "opcode": 0, "aa": 0, "tc": 0, "rd": 1, "ra": 0, "z": 0, "ad": 0, "cd": 0, "rcode": 0, 
"qdcount": null, "ancount": null, "nscount": null, "arcount": null, "qd": [{"qname": "google.com.", "qtype": 1, "unicastresponse": 0, "qclass": 1}]}' = Test command() with ASN.1 packet pkt = KRB_AP_REP(bytes(KRB_AP_REP(encPart=EncryptedData()))) assert pkt.command() == "KRB_AP_REP(pvno=ASN1_INTEGER(5), msgType=ASN1_INTEGER(15), encPart=EncryptedData(etype=ASN1_INTEGER(23), kvno=None, cipher=ASN1_STRING(b'')))" = Test json() with ASN.1 packet assert pkt.json() == '{"pvno": {"type": "ASN1_INTEGER", "value": "5"}, "msgType": {"type": "ASN1_INTEGER", "value": "15"}, "encPart": {"etype": {"type": "ASN1_INTEGER", "value": "23"}, "kvno": null, "cipher": {"type": "ASN1_STRING", "value": ""}}}' = Test command() with meaningless payload pkt = PPTPStartControlConnectionReply() / IP(dst="127.0.0.1", src="127.0.0.1") assert pkt.command() == "PPTPStartControlConnectionReply()/IP(src='127.0.0.1', dst='127.0.0.1')" = Test json() with meaningless payload assert pkt.json() == '{"len": 156, "type": 1, "magic_cookie": 439041101, "ctrl_msg_type": 2, "reserved_0": 0, "protocol_version": 256, "result_code": 1, "error_code": 0, "framing_capabilities": 0, "bearer_capabilities": 0, "maximum_channels": 65535, "firmware_revision": 256, "host_name": "linux", "vendor_string": "", "payload": {"version": 4, "ihl": null, "tos": 0, "len": null, "id": 1, "flags": 0, "frag": 0, "ttl": 64, "proto": 0, "chksum": null, "src": "127.0.0.1", "dst": "127.0.0.1"}}' ############ ############ + Tests on padding = Padding assembly r = raw(Padding("abc")) r assert r == b"abc" r = raw(Padding("abc")/Padding("def")) r assert r == b"abcdef" r = raw(Raw("ABC")/Padding("abc")/Padding("def")) r assert r == b"ABCabcdef" r = raw(Raw("ABC")/Padding("abc")/Raw("DEF")/Padding("def")) r assert r == b"ABCDEFabcdef" = Padding and length computation p = IP(raw(IP()/Padding("abc"))) p assert p.len == 20 and len(p) == 23 p = IP(raw(IP()/Raw("ABC")/Padding("abc"))) p assert p.len == 23 and len(p) == 26 p = 
IP(raw(IP()/Raw("ABC")/Padding("abc")/Padding("def"))) p assert p.len == 23 and len(p) == 29 = PadField test ~ PadField padding class TestPad(Packet): fields_desc = [ PadField(StrNullField("st", b""), 6, padwith=b"\xff"), StrField("id", b"")] assert TestPad() == TestPad(raw(TestPad())) assert raw(TestPad(st=b"st", id=b"id")) == b'st\x00\xff\xff\xffid' = ReversePadField ~ PadField padding class TestReversePad(Packet): fields_desc = [ ByteField("a", 0), ReversePadField(IntField("b", 0), 4)] assert raw(TestReversePad(a=1, b=0xffffffff)) == b'\x01\x00\x00\x00\xff\xff\xff\xff' assert TestReversePad(raw(TestReversePad(a=1, b=0xffffffff))).b == 0xffffffff ############ ############ + Tests on default value changes mechanism = Creation of an IPv3 class from IP class with different default values class IPv3(IP): version = 3 ttl = 32 = Test of IPv3 class a = IPv3() v,t = a.version, a.ttl v,t assert (v,t) == (3,32) r = raw(a) r assert r == b'5\x00\x00\x14\x00\x01\x00\x00 \x00\xac\xe7\x7f\x00\x00\x01\x7f\x00\x00\x01' ############ ############ + ASN.1 tests = ASN1 - ASN1_Object assert ASN1_Object(1) == ASN1_Object(1) assert ASN1_Object(1) > ASN1_Object(0) assert ASN1_Object(1) >= ASN1_Object(1) assert ASN1_Object(0) < ASN1_Object(1) assert ASN1_Object(1) <= ASN1_Object(2) assert ASN1_Object(1) != ASN1_Object(2) ASN1_Object(2).show() = ASN1 - RandASN1Object a = RandASN1Object() random.seed(0x2807) o = bytes(a) o assert o in [ b'\x1e\x023V', # PyPy 2.7 b'A\x02\x07q', # Python 2.7 b'F\x02\xfe\x92', # python 3.7-3.9 ] = ASN1 - ASN1_BIT_STRING a = ASN1_BIT_STRING("test", readable=True) a assert a.val == '01110100011001010111001101110100' assert raw(a) == b'\x03\x05\x00test' a = ASN1_BIT_STRING(b"\xff"*16, readable=True) a assert a.val == "1" * 128 assert raw(a) == b'\x03\x11\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff' = ASN1 - ASN1_SEQUENCE a = ASN1_SEQUENCE([ASN1_Object(1), ASN1_Object(0)]) assert a.strshow() == '# ASN1_SEQUENCE:\n \n \n' = ASN1 - 
ASN1_DECODING_ERROR a = ASN1_DECODING_ERROR("error", exc=OSError(1)) assert repr(a) == "" b = ASN1_DECODING_ERROR("error", exc=OSError(ASN1_BIT_STRING("0"))) assert repr(b) in ["}}>", "}}>"] = ASN1 - ASN1_INTEGER a = ASN1_INTEGER(int("1"*23)) assert repr(a) in ["0x25a55a46e5da99c71c7 ", "0x25a55a46e5da99c71c7 "] = ASN1 - ASN1_OID assert raw(ASN1_OID("")) == b"\x06\x00" = RandASN1Object(), specific crashes import random # ASN1F_NUMERIC_STRING random.seed(1514315682) raw(RandASN1Object()) # ASN1F_VIDEOTEX_STRING random.seed(1240186058) raw(RandASN1Object()) # ASN1F_UTC_TIME & ASN1F_GENERALIZED_TIME random.seed(1873503288) raw(RandASN1Object()) = SSID is parsed properly even with the presence of RSN Information ~ SSID RSN Information # A regression test for https://github.com/secdev/scapy/pull/2685. # https://github.com/secdev/scapy/issues/2683 describes a packet with # RSN Information that isn't parsed properly, # causing the SSID to be overridden. # This test checks the SSID is parsed properly. filename = scapy_path("/test/pcaps/bad_rsn_parsing_overrides_ssid.pcap") frame = rdpcap(filename)[0] beacon = frame.getlayer(5) ssid = beacon.network_stats()['ssid'] assert ssid == "ROUTE-821E295" = SSID is parsed properly even when the Country Information Tag Element has an odd length (not complying with the standard) and a missing pad byte ~ Missing Pad Byte in Country Info # A regression test for https://github.com/secdev/scapy/pull/2685. # https://github.com/secdev/scapy/issues/4132 describes a packet with # a Country Information element tag that has an odd length, even though it's against the standard. # The transmitter should have added a padding byte to make the length even, but it didn't. # The effect on scapy used to be improper parsing of the next tag elements, causing the SSID to be overridden. # This test checks the SSID is parsed properly. 
from io import BytesIO pcapfile = BytesIO(b'\n\r\r\n\x80\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x03\x00\x10\x00Linux 6.1.21-v8+\x04\x00E\x00Dumpcap (Wireshark) 3.4.10 (Git v3.4.10 packaged as 3.4.10-0+deb11u1)\x00\x00\x00\x00\x00\x00\x00\x80\x00\x00\x00\x01\x00\x00\x00@\x00\x00\x00\x7f\x00\x00\x00\x00\x04\x00\x00\x02\x00\x05\x00wifi2\x00\x00\x00\t\x00\x01\x00\t\x00\x00\x00\x0c\x00\x10\x00Linux 6.1.21-v8+\x00\x00\x00\x00@\x00\x00\x00\x06\x00\x00\x00\xb0\x01\x00\x00\x00\x00\x00\x00c\xd3\x87\x17\xe3c5\x82\x90\x01\x00\x00\x90\x01\x00\x00\x00\x00 \x00\xae@\x00\xa0 \x08\x00\xa0 \x08\x00\x00\x10\x0cd\x14@\x01\xa9\x00\x0c\x00\x00\x00\xa6\x00\xa8\x01\x80\x00\x00\x00\xff\xff\xff\xff\xff\xff\x02\xbf\xaf\x9f\xf8\x07\x02\xbf\xaf\x9f\xf8\x070\x96[p\xdcM\x06\x00\x00\x00d\x00\x11\x00\x00\x00\x01\x08\x8c\x12\x98$\xb0H`l\x03\x01,\x05\x04\x00\x01\x00\x00\x07QUS \x01\r\x80$\x01\x80(\x01\x80,\x01\x800\x01\x804\x01\x808\x01\x80<\x01\x80@\x01\x80d\x01\x80h\x01\x80l\x01\x80p\x01\x80t\x01\x80x\x01\x80|\x01\x80\x80\x01\x80\x84\x01\x80\x88\x01\x80\x8c\x01\x80\x90\x01\x80\x95\x01\x80\x99\x01\x80\x9d\x01\x80\xa1\x01\x80\xa5\x01\x800\x14\x01\x00\x00\x0f\xac\x04\x01\x00\x00\x0f\xac\x04\x01\x00\x00\x0f\xac\x02\x0c\x00;\x02s\x00-\x1a,\t\x13\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00,\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00=\x16,\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xdd\x18\x00P\xf2\x02\x01\x01\x81\x00\x03\xa4\x00\x00\'\xa4\x00\x00BC]\x00a\x11.\x00\xdd;\x00P\xf2\x04\x10J\x00\x01\x10\x10D\x00\x01\x02\x10I\x00\x06\x007*\x00\x01 \x10\x11\x00\x1358" Hisense Roku TV\x10T\x00\x08\x00\x07\x00P\xf2\x04\x00\x01\xdd\x16\xc8:k\x01\x01\x1048<@dhlptx|\x80\x84\x88\x8c\x90\xdd\x12Po\x9a\t\x02\x02\x00!\x0b\x03\x06\x00\x02\xbf\xaf\x9f\xf8\x07\xdd\rPo\x9a\n\x00\x00\x06\x01\x11\x1cD\x002\xf5N\xfbh\xb0\x01\x00\x00') pktpcap = rdpcap(pcapfile) frame = pktpcap[0] beacon = frame.getlayer(4) stats = beacon.network_stats() ssid = stats['ssid'] 
assert ssid == "" country = stats['country'] assert country == 'US' ############ ############ + Network tests * Those tests need network access = Sending and receiving an ICMP ~ netaccess needs_root IP ICMP icmp_firewall def _test(): with no_debug_dissector(): x = sr1(IP(dst="www.google.com")/ICMP(),timeout=3) assert x is not None x assert x[IP].ottl() in [32, 64, 128, 255] assert 0 <= x[IP].hops() <= 126 assert ICMP in x and x[ICMP].type == 0 retry_test(_test) = Sending a TCP syn message at layer 2 and layer 3 ~ netaccess needs_root IP def _test(): tmp = send(IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S"), return_packets=True, realtime=True) assert len(tmp) == 1 tmp = sendp(Ether()/IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S"), return_packets=True, realtime=True) assert len(tmp) == 1 p = Ether()/IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S") from decimal import Decimal p.time = Decimal(p.time) tmp = sendp(p, return_packets=True, realtime=True) assert len(tmp) == 1 retry_test(_test) = Latency check: localhost ICMP ~ netaccess needs_root linux latency # Note: still needs to enforce L3RawSocket as this won't work otherwise with libpcap sock = conf.L3socket conf.L3socket = L3RawSocket def _test(): req = IP(dst="127.0.0.1")/ICMP() ans = sr1(req, timeout=3) assert (ans.time - req.sent_time) >= 0 assert (ans.time - req.sent_time) <= 1e-3 try: retry_test(_test) finally: conf.L3socket = sock = Test sniffing on multiple sockets ~ netaccess needs_root sniff # This test sniffs on the same interface twice at the same time, to # simulate sniffing on multiple interfaces. 
def _test(): iface = conf.route.route(str(Net("www.google.com")))[0] port = int(RandShort()) pkt = IP(dst="www.google.com")/TCP(sport=port, dport=80, flags="S") def cb(): sr1(pkt, timeout=3) sniffer = AsyncSniffer(started_callback=cb, iface=[iface, iface], lfilter=lambda x: TCP in x and x[TCP].dport == port, prn=lambda x: x.summary(), count=2) sniffer.start() sniffer.join(timeout=3) assert len(sniffer.results) == 2 for pkt in sniffer.results: assert pkt.sniffed_on == iface retry_test(_test) = Test sniffing with AsyncSniffer on failed try: sniffer = AsyncSniffer(iface="this_interface_does_not_exists") sniffer.start() sniffer.join() assert False, "Should have errored by now" except ValueError: assert True = Sending a TCP syn 'forever' at layer 2 and layer 3 ~ netaccess needs_root IP def _test(): tmp = srloop(IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S"), count=1, timeout=3) assert type(tmp) == tuple and len(tmp[0]) == 1 tmp = srploop(Ether()/IP(dst="8.8.8.8")/TCP(sport=RandShort(), dport=53, flags="S"), count=1, timeout=3) assert type(tmp) == tuple and len(tmp[0]) == 1 retry_test(_test) = Sending and receiving an TCP syn with flooding methods ~ netaccess needs_root IP flood from functools import partial # flooding methods do not support timeout. 
Packing the test for security def _test_flood(ip, flood_function, add_ether=False): with no_debug_dissector(): p = IP(dst=ip)/TCP(sport=RandShort(), dport=80, flags="S") if add_ether: p = Ether()/p p.show2() x = flood_function(p, timeout=0.5, maxretries=10) if type(x) == tuple: x = x[0][0][1] x assert x[IP].ottl() in [32, 64, 128, 255] assert 0 <= x[IP].hops() <= 126 _test_srflood = partial(_test_flood, "www.google.com", srflood) retry_test(_test_srflood) _test_sr1flood = partial(_test_flood, "www.google.fr", sr1flood) retry_test(_test_sr1flood) _test_srpflood = partial(_test_flood, "www.google.net", srpflood, True) retry_test(_test_srpflood) _test_srp1flood = partial(_test_flood, "www.google.co.uk", srp1flood, True) retry_test(_test_srp1flood) = test chainEX ~ netaccess import socket sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM) ssck = StreamSocket(sck) try: r = ssck.sr1(ICMP(type='echo-request'), timeout=0.1, chainEX=True, threaded=False) assert False except Exception: assert True finally: sck.close() = Sending and receiving an ICMPv6EchoRequest ~ netaccess ipv6 def _test(): with no_debug_dissector(): x = sr1(IPv6(dst="www.google.com")/ICMPv6EchoRequest(),timeout=3) x assert x[IPv6].ottl() in [32, 64, 128, 255] assert 0 <= x[IPv6].hops() <= 126 x is not None and ICMPv6EchoReply in x and x[ICMPv6EchoReply].type == 129 retry_test(_test) = Whois request ~ netaccess IP as_resolvers * This test retries on failure because it often fails def _test(): IP(src="8.8.8.8").whois() retry_test(_test) = AS resolvers ~ netaccess IP as_resolvers * This test retries on failure because it often fails def _test(): ret = conf.AS_resolver.resolve("8.8.8.8", "8.8.4.4") assert (len(ret) == 2) assert any(x[1] == "AS15169" for x in ret) retry_test(_test) riswhois_data = b"route: 8.8.8.0/24\ndescr: Google\norigin: AS15169\nnotify: radb-contact@google.com\nmnt-by: MAINT-AS15169\nchanged: radb-contact@google.com 20150728\nsource: RADB\n\nroute: 8.0.0.0/9\ndescr: Proxy-registered 
route object\norigin: AS3356\nremarks: auto-generated route object\nremarks: this next line gives the robot something to recognize\nremarks: L'enfer, c'est les autres\nremarks: \nremarks: This route object is for a Level 3 customer route\nremarks: which is being exported under this origin AS.\nremarks: \nremarks: This route object was created because no existing\nremarks: route object with the same origin was found, and\nremarks: since some Level 3 peers filter based on these objects\nremarks: this route may be rejected if this object is not created.\nremarks: \nremarks: Please contact routing@Level3.net if you have any\nremarks: questions regarding this object.\nmnt-by: LEVEL3-MNT\nchanged: roy@Level3.net 20060203\nsource: LEVEL3\n\n\n" ret = AS_resolver_riswhois()._parse_whois(riswhois_data) assert ret == ('AS15169', 'Google') retry_test(_test) # This test is too buggy, and is simulated below #def _test(): # ret = AS_resolver_cymru().resolve("8.8.8.8") # assert (len(ret) == 1) # all(x[1] == "AS15169" for x in ret) # #retry_test(_test) cymru_bulk_data = """ Bulk mode; whois.cymru.com [2017-10-03 08:38:08 +0000] 24776 | 217.25.178.5 | INFOCLIP-AS, FR 36459 | 192.30.253.112 | GITHUB - GitHub, Inc., US 26496 | 68.178.213.61 | AS-26496-GO-DADDY-COM-LLC - GoDaddy.com, LLC, US """ tmp = AS_resolver_cymru().parse(cymru_bulk_data) assert len(tmp) == 3 assert [l[1] for l in tmp] == ['AS24776', 'AS36459', 'AS26496'] = AS resolver - IPv6 ~ netaccess IP as_resolvers * This test retries on failure because it often fails def _test(): as_resolver6 = AS_resolver6() ret = as_resolver6.resolve("2001:4860:4860::8888", "2001:4860:4860::4444") assert (len(ret) == 2) assert any(x[1] == 15169 for x in ret) retry_test(_test) = AS resolver - socket error ~ IP * This test checks that a failing resolver will not crash a script class MockAS_resolver(object): def resolve(self, *ips): raise socket.error asrm = AS_resolver_multi(MockAS_resolver()) assert len(asrm.resolve(["8.8.8.8", 
"8.8.4.4"])) == 0 = sendpfast ~ tcpreplay old_interactive = conf.interactive conf.interactive = False try: sendpfast([]) assert False except Exception: assert True conf.interactive = old_interactive assert True ############ ############ + Generator tests = Implicit logic 1 ~ IP TCP a = Ether() / IP(ttl=(5, 10)) / TCP(dport=[80, 443]) ls(a) ls(a, verbose=True) l = [p for p in a] len(l) == 12 = Implicit logic 2 ~ IP a = IP(ttl=[1,2,(5,9)]) ls(a) ls(a, verbose=True) l = [p for p in a] len(l) == 7 = Implicit logic 3 # In case there's a single option: __iter__ should not return self a = Ether()/IP(src="127.0.0.1", dst="127.0.0.1")/ICMP() for i in a: i.sent_time = 1 assert a.sent_time is None # In case they are several, self should never be returned a = Ether()/IP(src="127.0.0.1", dst="127.0.0.1")/ICMP(seq=(0, 5)) for i in a: i.sent_time = 1 assert a.sent_time is None ############ ############ + Real usages = Port scan ~ netaccess needs_root IP TCP def _test(): with no_debug_dissector(): ans,unans=sr(IP(dst="www.google.com/30")/TCP(dport=[80,443]), timeout=2) # New format: all Python versions ans.make_table(lambda s, r: (s.dst, s.dport, r.sprintf("{TCP:%TCP.flags%}{ICMP:%ICMP.code%}"))) retry_test(_test) = Send & receive with debug_match ~ netaccess needs_root IP ICMP def _test(): old_debug_match = conf.debug_match conf.debug_match = True with no_debug_dissector(): ans, unans = sr(IP(dst="www.google.fr") / TCP(sport=RandShort(), dport=80, flags="S"), timeout=2) assert ans[0].query == ans[0][0] assert ans[0].answer == ans[0][1] conf.debug_match = old_debug_match assert ans and not unans retry_test(_test) = Send & receive with retry ~ netaccess needs_root IP ICMP def _test(): with no_debug_dissector(): ans, unans = sr(IP(dst=["8.8.8.8", "1.2.3.4"]) / TCP(sport=RandShort(), dport=53, flags="S"), timeout=2, retry=1) assert len(ans) == 1 and len(unans) == 1 retry_test(_test) = Send & receive with multi ~ netaccess needs_root IP ICMP def _test(): with no_debug_dissector(): 
ans, unans = sr(IP(dst=["8.8.8.8", "1.2.3.4"]) / TCP(sport=RandShort(), dport=53, flags="S"), timeout=2, multi=1) assert len(ans) >= 1 and len(unans) == 1 retry_test(_test) = Traceroute function ~ netaccess needs_root tcpdump * Let's test traceroute def _test(): with no_debug_dissector(): ans, unans = traceroute("www.slashdot.org") ans.nsummary() s,r=ans[0] s.show() s.show(2) retry_test(_test) = send() and sniff() ~ netaccess needs_root def _test(): sendp(Ether()/IP(src="9.0.0.0")/UDP(), count=3, iface=conf.iface) r = sniff(timeout=3, count=1, lfilter=lambda x: IP in x and x[IP].src == "9.0.0.0", iface=conf.iface, started_callback=_test) assert r = sniff() with socket failure * GH issue 3631 REFPACKET = Ether()/IP()/UDP() # A socket that fails after 10 packets class OOPipe(ObjectPipe): def recv(self, x=MTU): self.i = getattr(self, "i", 0) + 1 if self.i == 11: self.close() raise OSError("Giant failure") pkt = super(OOPipe, self).recv(x) self.send(REFPACKET) return pkt o = OOPipe() o.send(REFPACKET) pkts = sniff(opened_socket=[o], timeout=3) assert len(pkts) == 10 = GH issue 3306 ~ netaccess needs_root send(fuzz(ARP())) = Test SuperSocket.select ~ select from unittest import mock @mock.patch("scapy.supersocket.select") def _test_select(select): def f(a, b, c, d): raise IOError(0) select.side_effect = f try: SuperSocket.select([]) return False except: return True assert _test_select() = Test L2ListenTcpdump socket ~ netaccess # Needs to be fixed. 
# Fails randomly
#import time
#for i in range(10):
#    read_s = L2ListenTcpdump(iface=conf.iface)
#    out_s = conf.L2socket(iface=conf.iface)
#    time.sleep(5)  # wait for read_s to be ready
#    icmp_r = Ether()/IP(dst="secdev.org")/ICMP()
#    res = sndrcv(out_s, icmp_r, timeout=5, rcv_pks=read_s)[0]
#    read_s.close()
#    out_s.close()
#    time.sleep(5)
#    if res:
#        break
#
#response = res[0][1]
#assert response[ICMP].type == 0
# The test body is disabled above; only this placeholder expression remains.
True

= Test set of sent_time by sr
~ netaccess needs_root IP ICMP
# sr() must set sent_time on the packet object that was passed in.
def _test():
    packet = IP(dst="8.8.8.8")/ICMP()
    r = sr(packet, timeout=2)
    assert packet.sent_time is not None

retry_test(_test)

= Test set of sent_time by sr (multiple packets)
~ netaccess needs_root IP ICMP
# Same check when a list of two packets is sent.
def _test():
    packet1 = IP(dst="8.8.8.8")/ICMP()
    packet2 = IP(dst="8.8.4.4")/ICMP()
    r = sr([packet1, packet2], timeout=2)
    assert packet1.sent_time is not None
    assert packet2.sent_time is not None

retry_test(_test)

= Test set of sent_time by srflood
~ netaccess needs_root IP ICMP
# srflood() must set sent_time on the packet object that was passed in.
def _test():
    packet = IP(dst="8.8.8.8")/ICMP()
    r = srflood(packet, timeout=0.5)
    assert packet.sent_time is not None

retry_test(_test)

= Test set of sent_time by srflood (multiple packets)
~ netaccess needs_root IP ICMP
# Same check when a list of two packets is flooded.
def _test():
    packet1 = IP(dst="8.8.8.8")/ICMP()
    packet2 = IP(dst="8.8.4.4")/ICMP()
    r = srflood([packet1, packet2], timeout=0.5)
    assert packet1.sent_time is not None
    assert packet2.sent_time is not None

retry_test(_test)

############
############
+ ManuFDB tests

= __repr__
# When conf.manufdb is falsy the test falls back to the constant True.
if conf.manufdb:
    len(conf.manufdb)
else:
    True

= check _resolve_MAC
# The lookup is only asserted when conf.manufdb is truthy.
if conf.manufdb:
    assert conf.manufdb._resolve_MAC("00:00:17") == "Oracle"
else:
    True

############
############
+ Ether tests with IPv6

= Ether IPv6 checking for dst
~ netaccess ipv6

# p.dst resolves to the first layer's dst field (Ether); it must differ from
# the IPv6 destination.
p = Ether()/IPv6(dst="www.google.com")/TCP()
assert p.dst != p[IPv6].dst
p.show()

############
############
+ pcap / pcapng format support
~ pcap

= Variable creations
from io import BytesIO
pcapfile =
BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00e\x00\x00\x00\xcf\xc5\xacVo*\n\x00(\x00\x00\x00(\x00\x00\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x00\xcf\xc5\xacV_-\n\x00\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r\xcf\xc5\xacV\xf90\n\x00\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00') pcapngfile = BytesIO(b'\n\r\r\n\\\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x01\x00,\x00File created by merging: \nFile1: test.pcap \n\x04\x00\x08\x00mergecap\x00\x00\x00\x00\\\x00\x00\x00\x01\x00\x00\x00\\\x00\x00\x00e\x00\x00\x00\xff\xff\x00\x00\x02\x006\x00Unknown/not available in original file format(libpcap)\x00\x00\t\x00\x01\x00\x06\x00\x00\x00\x00\x00\x00\x00\\\x00\x00\x00\x06\x00\x00\x00H\x00\x00\x00\x00\x00\x00\x00\x8d*\x05\x00/\xfc[\xcd(\x00\x00\x00(\x00\x00\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x00H\x00\x00\x00\x06\x00\x00\x00<\x00\x00\x00\x00\x00\x00\x00\x8d*\x05\x00\x1f\xff[\xcd\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r<\x00\x00\x00\x06\x00\x00\x00<\x00\x00\x00\x00\x00\x00\x00\x8d*\x05\x00\xb9\x02\\\xcd\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00<\x00\x00\x00') pcapnanofile = BytesIO(b"M<\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00e\x00\x00\x00\xcf\xc5\xacV\xc9\xc1\xb5'(\x00\x00\x00(\x00\x00\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 
\x00\x91|\x00\x00\xcf\xc5\xacV-;\xc1'\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r\xcf\xc5\xacV\x9aL\xcf'\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00") pcapwirelenfile = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00\x01\x00\x00\x00}\x87pZ.\xa2\x08\x00\x0f\x00\x00\x00\x10\x00\x00\x00\xff\xff\xff\xff\xff\xff GG\xee\xdd\xa8\x90\x00a') pcapngdefaults = BytesIO(base64.b64decode(b'Cg0NChwAAABNPCsaAQAAAP//////////HAAAAAEAAAAgAAAAEgEAAP//AAAJAAEACUeZiQAAAAAgAAAAAQAAACAAAAASAQAA//8AAAkAAQAJAAAAAAAAACAAAAABAAAAIAAAABIBAAD//wAACQABAAkAAAAAAAAAIAAAAAEAAAAgAAAAEgEAAP//AAAJAAEACQAAAAAAAAAgAAAABgAAAIQBAAADAAAApO/bFdgJaeBiAQAAYgEAAFVVVVVVVVXV////////IMbr4D7PCABFAAFIlQkAAEAR5JwAAAAA/////wBEAEMBNJDsAQEGAFSpVwIACoAAAAAAAAAAAAAAAAAAAAAAACDG6+A+zwAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABjglNjNQEB/wAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAsOs+bAAAhAEAAAYAAACAAQAAAwAAAKTv2xXIDYznYAEAAGABAABVVVVVVVVV1QEAXn//+iDG6+A+zwgARQABRgGPAAAEEal3qf5wqO////rhbgdsATJi0U5PVElGWSAqIEhUVFAvMS4xDQpIT1NUOiAyMzkuMjU1LjI1NS4yNTA6MTkwMA0KQ0FDSEUtQ09OVFJPTDogbWF4LWFnZT0xODAwDQpMT0NBVElPTjogaHR0cDovLzE2OS4yNTQuMTEyLjE2ODo1NTAwMC9ucmMvZGRkLnhtbA0KTlQ6IHV1aWQ6NEQ0NTQ5MzAtMDIwMC0xMDAwLTgwMDEtMjBDNkVCRTAzRUNGDQpOVFM6IHNzZHA6YWxpdmUNClNFUlZFUjogRnJlZUJTRC84LjAgVVBuUC8xLjAgUGFuYXNvbmljLU1JTC1ETE5BLVNWLzEuMA0KVVNOOiB1dWlkOjRENDU0OTMwLTAyMDAtMTAwMC04MDAxLTIwQzZFQkUwM0VDRg0KDQpcQcvWgAEAAAYAAAC4AQAAAwAAAKTv2xV4Ao3nlQEAAJUBAABVVVVVVVVV1QEAXn//+iDG6+A+zwgARQABewGQAAAEEalBqf5wqO////rhbgdsAWfu+k5PVElGWSAqIEhUVFAvMS4xDQpIT1NUOiAyMzkuMjU1LjI1NS4yNTA6MTkwMA0KQ
0FDSEUtQ09OVFJPTDogbWF4LWFnZT0xODAwDQpMT0NBVElPTjogaHR0cDovLzE2OS4yNTQuMTEyLjE2ODo1NTAwMC9ucmMvZGRkLnhtbA0KTlQ6IHVybjpwYW5hc29uaWMtY29tOmRldmljZTpwMDBSZW1vdGVDb250cm9sbGVyOjENCk5UUzogc3NkcDphbGl2ZQ0KU0VSVkVSOiBGcmVlQlNELzguMCBVUG5QLzEuMCBQYW5hc29uaWMtTUlMLURMTkEtU1YvMS4wDQpVU046IHV1aWQ6NEQ0NTQ5MzAtMDIwMC0xMDAwLTgwMDEtMjBDNkVCRTAzRUNGOjp1cm46cGFuYXNvbmljLWNvbTpkZXZpY2U6cDAwUmVtb3RlQ29udHJvbGxlcjoxDQoNCrLVKmoAAAC4AQAABgAAAHgBAAADAAAApO/bFVjbjedXAQAAVwEAAFVVVVVVVVXVAQBef//6IMbr4D7PCABFAAE9AZEAAAQRqX6p/nCo7///+uFuB2wBKaZATk9USUZZICogSFRUUC8xLjENCkhPU1Q6IDIzOS4yNTUuMjU1LjI1MDoxOTAwDQpDQUNIRS1DT05UUk9MOiBtYXgtYWdlPTE4MDANCkxPQ0FUSU9OOiBodHRwOi8vMTY5LjI1NC4xMTIuMTY4OjU1MDAwL25yYy9kZGQueG1sDQpOVDogdXBucDpyb290ZGV2aWNlDQpOVFM6IHNzZHA6YWxpdmUNClNFUlZFUjogRnJlZUJTRC84LjAgVVBuUC8xLjAgUGFuYXNvbmljLU1JTC1ETE5BLVNWLzEuMA0KVVNOOiB1dWlkOjRENDU0OTMwLTAyMDAtMTAwMC04MDAxLTIwQzZFQkUwM0VDRjo6dXBucDpyb290ZGV2aWNlDQoNCjagXoUAeAEAAAYAAAC0AQAAAwAAAKTv2xXYw47nkwEAAJMBAABVVVVVVVVV1QEAXn//+iDG6+A+zwgARQABeQGSAAAEEalBqf5wqO////rhbgdsAWWV4E5PVElGWSAqIEhUVFAvMS4xDQpIT1NUOiAyMzkuMjU1LjI1NS4yNTA6MTkwMA0KQ0FDSEUtQ09OVFJPTDogbWF4LWFnZT0xODAwDQpMT0NBVElPTjogaHR0cDovLzE2OS4yNTQuMTEyLjE2ODo1NTAwMC9ucmMvZGRkLnhtbA0KTlQ6IHVybjpwYW5hc29uaWMtY29tOnNlcnZpY2U6cDAwTmV0d29ya0NvbnRyb2w6MQ0KTlRTOiBzc2RwOmFsaXZlDQpTRVJWRVI6IEZyZWVCU0QvOC4wIFVQblAvMS4wIFBhbmFzb25pYy1NSUwtRExOQS1TVi8xLjANClVTTjogdXVpZDo0RDQ1NDkzMC0wMjAwLTEwMDAtODAwMS0yMEM2RUJFMDNFQ0Y6OnVybjpwYW5hc29uaWMtY29tOnNlcnZpY2U6cDAwTmV0d29ya0NvbnRyb2w6MQ0KDQovXKFrALQBAAAGAAAAqAEAAAMAAACk79sVuJKP54cBAACHAQAAVVVVVVVVVdUBAF5///ogxuvgPs8IAEUAAW0BkwAABBGpTKn+cKjv///64W4HbAFZRNJOT1RJRlkgKiBIVFRQLzEuMQ0KSE9TVDogMjM5LjI1NS4yNTUuMjUwOjE5MDANCkNBQ0hFLUNPTlRST0w6IG1heC1hZ2U9MTgwMA0KTE9DQVRJT046IGh0dHA6Ly8xNjkuMjU0LjExMi4xNjg6NTUwMDAvbnJjL2RkZC54bWwNCk5UOiB1cm46ZGlhbC1tdWx0aXNjcmVlbi1vcmc6c2VydmljZTpkaWFsOjENCk5UUzogc3NkcDphbGl2ZQ0KU0VSVkVSOiBGcmVlQlNELzguMCBVUG5QLzEuMCBQYW5hc29uaWMtTUlMLURMTkEtU1YvMS4wDQpVU046IHV1aWQ6NEQ0NTQ5MzAtMDIwMC0xMDAwLTgwMDEtMjBDNkVCRTAzRUNGOjp1c
m46ZGlhbC1tdWx0aXNjcmVlbi1vcmc6c2VydmljZTpkaWFsOjENCg0KLn5A6QCoAQAA')) = Read a pcap file pktpcap = rdpcap(pcapfile) = Read a pcapng file pktpcapng = rdpcap(pcapngfile) assert pktpcapng[0].time == 1454163407.666223 = Read a pcap file with nanosecond precision pktpcapnano = rdpcap(pcapnanofile) assert pktpcapnano[0].time == 1454163407.666223049 = Read a pcapng file with nanosecond precision and default tsresol pktpcapngdefaults = rdpcap(pcapngdefaults) assert pktpcapngdefaults[0].time == 1575115986.114775512 assert Ether in pktpcapngdefaults[0] = Read a pcapng with little-endian SHB pktcapng = sniff(offline=scapy_path("/test/pcaps/macos.pcapng.gz")) assert len(pktcapng) != 0 = Write a pcapng tmpfile = get_temp_file(autoext=".pcapng") r = RawPcapNgWriter(tmpfile) r._write_block_shb() r._write_block_idb(linktype=DLT_EN10MB) ts = 1632568366.384185 r._write_block_epb(raw(Ether()/"Hello Scapy!!!"), ifid=0, timestamp=ts) r.f.close() assert os.stat(tmpfile).st_size == 108 l = rdpcap(tmpfile) assert b"Scapy" in l[0][Raw].load assert l[0].time == ts = Check wrpcapng() tmpfile = get_temp_file(autoext=".pcapng") p = Ether()/"Hello Scapy!!!" p.time = 1632568366.384185 wrpcapng(tmpfile, p) assert os.stat(tmpfile).st_size == 108 l = rdpcap(tmpfile) assert b"Scapy" in l[0][Raw].load assert l[0].time == ts p = Ether() / IPv6() / TCP() p.comment = b"Hello Scapy!" wrpcapng(tmpfile, p) l = rdpcap(tmpfile) assert l[0].comment == p.comment = rdpcap on fifo ~ linux f = get_temp_file() os.unlink(f) os.mkfifo(f) p = Ether(bytes(Ether(dst="ff:ff:ff:ff:ff:ff")/"Hello Scapy!!!")) s = AsyncSniffer(offline=f) s.start() wrpcap(f, p) s.join(timeout=1) assert s.results[0] == p = Check multiple packets with different combination of linktype,comment,direction,sniffed_on fields. 
test both wrpcap() and wrpcapng() import random,string random.seed(0x2807) plist = [] ptypes = [] ptypes.append(Ether((Ether() / IPv6() / TCP()).build())) ptypes.append(IP((IP() / IPv6() / TCP()).build())) ifaces=[None,'','i','int0',''.join(random.choices(string.printable,k=20))] comments=[None,'','a','abcd',''.join(random.choices(string.printable,k=20))] directions=[None,0,1,2,3] for iface in ifaces: for comment in comments: if comment is not None: comment=comment.encode('utf-8') for direction in directions: for p in ptypes: if iface is not None and type(ptypes[ifaces.index(iface) % len(ptypes)]) != type(p): continue pnew = p.copy() pnew.time = 1632568366.384185 pnew.sniffed_on = iface pnew.direction = direction pnew.comment = comment plist.append(pnew) random.shuffle(plist) tmpfile = get_temp_file(autoext=".pcapng") wrpcapng(tmpfile, plist) plist_check = rdpcap(tmpfile) assert len(plist_check) == len(plist) for i in range(len(plist)): assert plist_check[i].comment == plist[i].comment assert plist_check[i].direction == plist[i].direction assert plist_check[i].sniffed_on == plist[i].sniffed_on assert plist_check[i].time == plist[i].time #if interface is unknown, verify pkt bytes integrity and that linktype was set to first packet if plist[i].sniffed_on is None: assert bytes(plist_check[i]) == bytes(plist[i]) assert type(plist_check[i]) == type(plist[0]) else: assert plist_check[i] == plist[i] tmpfile = get_temp_file(autoext=".pcap") wrpcap(tmpfile, plist) plist_check = rdpcap(tmpfile) for i in range(len(plist)): assert plist_check[i].time == plist[i].time assert type(plist_check[i]) == type(plist[0]) assert bytes(plist_check[i]) == bytes(plist[i]) = PcapNg - Process Information Block pib_pcapng_file = BytesIO(b'\n\r\r\n\xbc\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x02\x00\x05\x00arm64\x00\x00\x00\x03\x00f\x00Darwin Kernel Version 23.3.0: Thu Dec 21 02:29:41 PST 2023; root:xnu-10002.81.5~11/RELEASE_ARM64_T8122\x00\x00\x04\x00 \x00tcpdump 
(libpcap version 1.10.1)\x00\x00\x00\x00\xbc\x00\x00\x00\x01\x00\x00\x00 \x00\x00\x00\x01\x00\x00\x00\x00\x00\x08\x00\x02\x00\x03\x00en0\x00\x00\x00\x00\x00 \x00\x00\x00\x01\x00\x00\x80 \x00\x00\x00$\'\x00\x00\x02\x00\x06\x00trustd\x00\x00\x00\x00\x00\x00 \x00\x00\x00\x01\x00\x00\x80$\x00\x00\x00")\x00\x00\x02\x00\x0c\x00mobileassetd\x00\x00\x00\x00$\x00\x00\x00\x06\x00\x00\x00\x90\x00\x00\x00\x00\x00\x00\x00\xfb\x18\x06\x00EcqdB\x00\x00\x00B\x00\x00\x00\xe8\x9f\x80\xfa\x8c\xc6P\xa6\xd8\xd5\x83v\x08\x00E\x00\x004\x00\x00@\x00@\x06\x90T\nh\x01\xc3\xc0\xe5\xdd_\xf4\xb8\x00P\x95\xc3\xcb\x01\xcb\xeb\x11\xe8\x80\x11\x08\x00\x0c\xe6\x00\x00\x01\x01\x08\n\xbe\xb8\xd4\xb3\xbb\x9b4\xbc\x00\x00\x01\x80\x04\x00\x00\x00\x00\x00\x03\x80\x04\x00\x01\x00\x00\x00\x02\x00\x04\x00\x02\x00\x00\x00\x02\x80\x04\x00\x00\x00\x00\x00\x04\x80\x04\x00\x10\x00\x00\x00\x00\x00\x00\x00\x90\x00\x00\x00') l = rdpcap(pib_pcapng_file) assert(len(l) == 1) assert(TCP in l[0]) assert(len(l[0].process_information) == 2) assert(l[0].process_information["proc"]["name"] == "trustd") = OSS-Fuzz Findings from io import BytesIO # Issue 68352 file = BytesIO(b"\n\r\r\n\x1c\x00\x00\x00M<+\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\x1c\x00\x00\x00\x01\x00\x00\x00\x14\x00\x00\x00\xe4\x00\x00\x00\x00\x00\x04\x00\x14\x00\x00\x00\x01\x00\x00\x00(\x00\x00\x00\xe4\x00\x00\x00\x00\x00\x04\x00\x02\x00\t\x00b'ens16\xb0'\x00\x00\x00\x00\x00\x00\x00(\x00\x00\x00\x06\x00\x00\x004\x00\x00\x00\x01\x00\x00\x00}\x17\x06\x00\xb5t\x1d\x85\x14\x00\x00\x00\x14\x00\x00\x00E\x00\x00\x14\x00\x01\x00\x00@\x00|\xe7\x7f\x00\x00\x01\x7f\x00\x00\x014\x00\x00\x00") rdpcap(file) # Issue 68354 file = BytesIO(b'\n\r\r\n\xff\xfe\xfe\xffM<+\x1a') try: rdpcap(file) except Scapy_Exception: pass # Issue #70115 file = BytesIO(b"\n\r\r\n\x00\x00\x008\x1a+ Raise try: invalid_pcapngfile_1 = BytesIO(b'\n\r\r\n\r\x00\x00\x00M<+\x1a\xb2<\xb2\xa1\x01\x00\x00\x00\r\x00\x00\x00M<+\x1a\x80\xaa\xb2\x02') rdpcap(invalid_pcapngfile_1) assert False 
except Scapy_Exception: pass # Invalid Packet in PCAPNG -> return invalid_pcapngfile_2 = BytesIO(b'\n\r\r\n\x00\x00\x00\x1c\x1a+ raise EOFError try: invalid_pcapngfile_3 = BytesIO(b'\n\n\n\x14\x00\x00\x00M<+\x1a \x14\x00\x00\x00\x03\x00\x00\x00\x14\x00\x00\x00 \x14\x00\x00\x00') rdpcap(invalid_pcapngfile_3) assert False except Scapy_Exception: pass # Invalid SPB in PCAPNG -> raise EOFError try: invalid_pcapngfile_4 = BytesIO(b'\n\n\n\x14\x00\x00\x00M<+\x1a \x14\x00\x00\x00\x01\x00\x00\x00\x14\x00\x00\x00 \x14\x00\x00\x00\x03\x00\x00\x00\x0c\x00\x00\x00\x0c\x00\x00\x00') rdpcap(invalid_pcapngfile_4) assert False except Scapy_Exception: pass = Check PcapWriter on null write f = BytesIO() w = PcapWriter(f) w.write([]) assert len(f.getvalue()) == 0 # Stop being closed for reals, but we still want to have the header written with mock.patch.object(f, 'close') as cf: w.close() cf.assert_called_once_with() assert len(f.getvalue()) != 0 = Check PcapWriter sets correct linktype after null write f = BytesIO() w = PcapWriter(f) w.write([]) assert len(f.getvalue()) == 0 w.write(Ether()/IP()/ICMP()) assert len(f.getvalue()) != 0 # Stop being closed for reals, but we still want to have the header written with mock.patch.object(f, 'close') as cf: w.close() cf.assert_called_once_with() f.seek(0) or None assert len(f.getvalue()) != 0 r = PcapReader(f) f.seek(0) or None assert r.LLcls is Ether assert r.linktype == DLT_EN10MB l = [ p for p in RawPcapReader(f) ] assert len(l) == 1 = Check RawPcapReader on pcap ~ pcap fd = get_temp_file() wrpcap(fd, [Ether()/IP()/ICMP()]) assert len([p for p in RawPcapReader(fd)]) == 1 for (x, y) in RawPcapReader(fd): pass = Check RawPcapReader with a Context Manager ~ pcap filename = get_temp_file(fd=False) wrpcap(filename, [IP()/TCP(), IP()/UDP()]) try: with RawPcapReader(filename) as reader: packet = next(reader, None) assert True except TypeError: assert False = Check RawPcapWriter ~ pcap # GH3256 fd = get_temp_file() with RawPcapWriter(fd, 
linktype=1) as w: w.write(b"test") fd = get_temp_file() with RawPcapWriter(fd) as w: w.write(b"test") assert w.linktype == 1 = Check tcpdump() ~ tcpdump from io import BytesIO * No very specific tests because we do not want to depend on tcpdump output pcapfile = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00\x01\x00\x00\x000}$]\xff\\\t\x006\x00\x00\x006\x00\x00\x00\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x000}$]\x87i\t\x00*\x00\x00\x00*\x00\x00\x00\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r0}$]\xfbp\t\x00*\x00\x00\x00*\x00\x00\x00\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x08\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00') data = tcpdump(pcapfile, dump=True, args=['-nn']).split(b'\n') print(data) assert b'127.0.0.1.20 > 127.0.0.1.80:' in data[0] assert b'127.0.0.1.53 > 127.0.0.1.53:' in data[1] assert b'127.0.0.1 > 127.0.0.1:' in data[2] * Non existing tcpdump binary from unittest import mock conf_prog_tcpdump = conf.prog.tcpdump conf.prog.tcpdump = "tcpdump_fake" def _test_tcpdump_notcpdump(): try: tcpdump(IP()/TCP()) assert False except: assert True _test_tcpdump_notcpdump() conf.prog.tcpdump = conf_prog_tcpdump # Also check with use_tempfile=True (for non-OSX platforms) pcapfile.seek(0) or None tempfile_count = len(conf.temp_files) data = tcpdump(pcapfile, dump=True, args=['-nn'], use_tempfile=True).split(b'\n') print(data) assert b'127.0.0.1.20 > 127.0.0.1.80:' in data[0] assert b'127.0.0.1.53 > 127.0.0.1.53:' in data[1] assert b'127.0.0.1 > 127.0.0.1:' in data[2] # We should have another tempfile tracked. 
assert len(conf.temp_files) > tempfile_count # Check with a simple packet data = tcpdump([Ether()/IP()/ICMP()], dump=True, args=['-nn']).split(b'\n') print(data) assert b'127.0.0.1 > 127.0.0.1: ICMP' in data[0].upper() = Check tcpdump() command with linktype ~ tcpdump libpcap f = BytesIO() pkt = Ether()/IP()/ICMP() with mock.patch('subprocess.Popen', return_value=Bunch( stdin=f, wait=lambda: None)) as popen: # Prevent closing the BytesIO with mock.patch.object(f, 'close'): tcpdump([pkt], linktype="DLT_EN10MB", use_tempfile=False) expected_command = [conf.prog.tcpdump, '-y', 'EN10MB', '-U', '-r', '-'] if OPENBSD: expected_command = [conf.prog.tcpdump, '-y', 'EN10MB', '-r', '-'] popen.assert_called_once_with( expected_command, stdin=subprocess.PIPE, stdout=None, stderr=None) print(bytes_hex(f.getvalue())) assert raw(pkt) in f.getvalue() f.close() del f, pkt = Check tcpdump() command with linktype and args ~ tcpdump libpcap f = BytesIO() pkt = Ether()/IP()/ICMP() with mock.patch('subprocess.Popen', return_value=Bunch( stdin=f, wait=lambda: None)) as popen: # Prevent closing the BytesIO with mock.patch.object(f, 'close'): tcpdump([pkt], linktype=scapy.data.DLT_EN10MB, use_tempfile=False) expected_command = [conf.prog.tcpdump, '-y', 'EN10MB', '-U', '-r', '-'] if OPENBSD: expected_command = [conf.prog.tcpdump, '-y', 'EN10MB', '-r', '-'] popen.assert_called_once_with( expected_command, stdin=subprocess.PIPE, stdout=None, stderr=None) print(bytes_hex(f.getvalue())) assert raw(pkt) in f.getvalue() f.close() del f, pkt = Check sniff() offline with linktype & 802.11 filter ~ tcpdump linux fd = get_temp_file() wrpcap(fd, [RadioTap()/Dot11()/Dot11ProbeReq(), RadioTap()/Dot11()]) lst = sniff(offline=fd, filter="subtype probe-req") assert len(lst) == 1 = Check tcpdump() command rejects non-string input for prog pkt = Ether()/IP()/ICMP() try: tcpdump([pkt], prog=+17607067425, args=['-nn']) except ValueError as e: if hasattr(e, 'args'): assert 'prog' in e.args[0] else: assert 
'prog' in e.message else: assert False, 'expected exception' = Check tcpdump() command with tshark ~ tshark pcapfile = BytesIO(b'\xd4\xc3\xb2\xa1\x02\x00\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\x00\x00e\x00\x00\x00\xcf\xc5\xacVo*\n\x00(\x00\x00\x00(\x00\x00\x00E\x00\x00(\x00\x01\x00\x00@\x06|\xcd\x7f\x00\x00\x01\x7f\x00\x00\x01\x00\x14\x00P\x00\x00\x00\x00\x00\x00\x00\x00P\x02 \x00\x91|\x00\x00\xcf\xc5\xacV_-\n\x00\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x11|\xce\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x00\x08\x01r\xcf\xc5\xacV\xf90\n\x00\x1c\x00\x00\x00\x1c\x00\x00\x00E\x00\x00\x1c\x00\x01\x00\x00@\x01|\xde\x7f\x00\x00\x01\x7f\x00\x00\x01\x08\x00\xf7\xff\x00\x00\x00\x00') # tshark doesn't need workarounds on OSX tempfile_count = len(conf.temp_files) values = [tuple(int(val) for val in line[:-1].split(b'\t')) for line in tcpdump(pcapfile, prog=conf.prog.tshark, getfd=True, args=['-T', 'fields', '-e', 'ip.ttl', '-e', 'ip.proto'])] assert values == [(64, 6), (64, 17), (64, 1)] assert len(conf.temp_files) == tempfile_count = Check tdecode command directly for tshark ~ tshark pkts = [ Ether()/IP(src='192.0.2.1', dst='192.0.2.2')/ICMP(type='echo-request')/Raw(b'X'*100), Ether()/IP(src='192.0.2.2', dst='192.0.2.1')/ICMP(type='echo-reply')/Raw(b'X'*100), ] # tshark doesn't need workarounds on OSX tempfile_count = len(conf.temp_files) r = tdecode(pkts, dump=True) r assert b'Src: 192.0.2.1' in r assert b'Src: 192.0.2.2' in r assert b'Dst: 192.0.2.2' in r assert b'Dst: 192.0.2.1' in r assert b'Echo (ping) request' in r assert b'Echo (ping) reply' in r assert b'ICMP' in r assert len(conf.temp_files) == tempfile_count = Check tdecode with linktype ~ tshark # These are the same as the ping packets above pkts = [ 
b'\xff\xff\xff\xff\xff\xff\xac"\x0b\xc5j\xdb\x08\x00E\x00\x00\x80\x00\x01\x00\x00@\x01\xf6x\xc0\x00\x02\x01\xc0\x00\x02\x02\x08\x00\xb6\xbe\x00\x00\x00\x00XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX', b'\xff\xff\xff\xff\xff\xff\xac"\x0b\xc5j\xdb\x08\x00E\x00\x00\x80\x00\x01\x00\x00@\x01\xf6x\xc0\x00\x02\x02\xc0\x00\x02\x01\x00\x00\xbe\xbe\x00\x00\x00\x00XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX', ] # tshark doesn't need workarounds on OSX tempfile_count = len(conf.temp_files) r = tdecode(pkts, dump=True, linktype=DLT_EN10MB) assert b'Src: 192.0.2.1' in r assert b'Src: 192.0.2.2' in r assert b'Dst: 192.0.2.2' in r assert b'Dst: 192.0.2.1' in r assert b'Echo (ping) request' in r assert b'Echo (ping) reply' in r assert b'ICMP' in r assert len(conf.temp_files) == tempfile_count = Run scapy's tshark command ~ needs_root tshark(count=1, timeout=3) = Check wireshark() ~ wireshark f = BytesIO() pkt = Ether()/IP()/ICMP() with mock.patch('subprocess.Popen', return_value=Bunch(stdin=f)) as popen: # Prevent closing the BytesIO with mock.patch.object(f, 'close'): wireshark([pkt]) popen.assert_called_once_with( [conf.prog.wireshark, '-ki', '-'], stdin=subprocess.PIPE, stdout=None, stderr=None) print(bytes_hex(f.getvalue())) assert raw(pkt) in f.getvalue() f.close() del f, pkt = Check Raw IP pcap files import tempfile filename = tempfile.mktemp(suffix=".pcap") wrpcap(filename, [IP()/UDP(), IPv6()/UDP()], linktype=DLT_RAW) packets = rdpcap(filename) assert isinstance(packets[0], IP) and isinstance(packets[1], IPv6) = Check wrpcap() with no packet import tempfile filename = tempfile.mktemp(suffix=".pcap") wrpcap(filename, []) fstat = os.stat(filename) assert fstat.st_size != 0 os.remove(filename) = Check wrpcap() with SndRcvList import tempfile filename = tempfile.mktemp(suffix=".pcap") wrpcap(filename, SndRcvList(res=[(Ether()/IP(), Ether()/IP())])) assert 
len(rdpcap(filename)) == 2 os.remove(filename) = Check wrpcap() with different packets types from unittest import mock import os import tempfile with mock.patch("scapy.utils.warning") as warning: filename = tempfile.mktemp() wrpcap(filename, [IP(), Ether(), IP(), IP()]) os.remove(filename) assert any("Inconsistent" in arg for arg in warning.call_args[0]) = Check wrpcap() with the Loopback layer ~ tshark for cls in [Loopback, LoopbackOpenBSD]: filename = tempfile.mktemp(suffix=".pcap") wrpcap(filename, [cls()/IP()/ICMP()]) return_value = b"".join(line for line in tcpdump(filename, prog=conf.prog.tshark, getfd=True)) assert b"Echo (ping) request" in return_value ############ ############ + ERF Ethernet format support = Variable creations erffile = BytesIO(b'3;!E_9\x92_\x02\x04\x00p\x00\x00\x00P\x00\x00\x00\x0fS?\xca\xc0\x1cjz\x18\x90\xed\x81\x00\x01:\x08\x00E\x00\x00(\xdf\xab@\x00;\x06\xb3s\n\x01]\xdb\n\xfb9\xda\xc3v\x84\xecD\x16\xb9\xab\xda\xa1b\xf9P\x10f\x98\x18\xcb\x00\x00\x00\x00\x90\x9e\xd7\xd2_\x929_\x0f\x9e\xcd\x1f\x01\x88\xb9\x15[/s<\x01\x88\xb9\x15[/\xcd\x1f\x01\x88\xb9\x15[/0\xcd"E_9\x92_\x02\x04\x00p\x00\x00\x00P\x00\x00\x1cjz\x18\x90\xed\x00\x0fS?\xca\xc0\x08\x00E\x00\x00(\xa2\xdd@\x00@\x06\xebA\n\xfb9\xda\n\x01]\xdb\x84\xec\xc3v\xda\xa1b\xf9D\x16\xb9\xacP\x10\x9a\xf0\xe4q\x00\x00\x00\x00\x00\x00\x00\x00o\xbc\xe2{_\x929_\x0f\x9f+3\x01\x88\xb9\x15u\x1e(^\x01\x88\xb9\x15u\x1e+3\x01\x88\xb9\x15u\x1e') erffilewithheader = BytesIO(b'4;!E_9\x92_\x82\x00\x00x\x00\x00\x00P\x00\x00\x1a+' pkt.flags.MF = 0 pkt.flags.DF = 1 assert not pkt.flags.MF assert pkt.flags.DF assert not pkt.flags.evil assert repr(pkt.flags) == '' pkt.flags |= 'evil+MF' pkt.flags &= 'DF+MF' assert pkt.flags.MF assert pkt.flags.DF assert not pkt.flags.evil assert repr(pkt.flags) == '' pkt = IP(flags=3) assert pkt.flags.MF assert pkt.flags.DF assert not pkt.flags.evil assert repr(pkt.flags) == '' pkt.flags = 6 assert not pkt.flags.MF assert pkt.flags.DF assert pkt.flags.evil assert 
repr(pkt.flags) == '' assert len({IP().flags, IP().flags}) == 1 pkt = IP() pkt.flags = "" assert pkt.flags == 0 = TCP flags ~ TCP pkt = TCP(flags="SA") assert pkt.flags == 18 assert pkt.flags.S assert pkt.flags.A assert pkt.flags.SA assert not any(getattr(pkt.flags, f) for f in 'FRPUECN') assert repr(pkt.flags) == '' pkt.flags.U = True pkt.flags.S = False assert pkt.flags.A assert pkt.flags.U assert pkt.flags.AU assert not any(getattr(pkt.flags, f) for f in 'FSRPECN') assert repr(pkt.flags) == '' pkt.flags &= 'SFA' pkt.flags |= 'P' assert pkt.flags.P assert pkt.flags.A assert pkt.flags.PA assert not any(getattr(pkt.flags, f) for f in 'FSRUECN') pkt = TCP(flags=56) assert all(getattr(pkt.flags, f) for f in 'PAU') assert pkt.flags.PAU assert not any(getattr(pkt.flags, f) for f in 'FSRECN') assert repr(pkt.flags) == '' pkt.flags = 50 assert all(getattr(pkt.flags, f) for f in 'SAU') assert pkt.flags.SAU assert not any(getattr(pkt.flags, f) for f in 'FRPECN') assert repr(pkt.flags) == '' = Flag values mutation with .raw_packet_cache ~ IP TCP pkt = IP(raw(IP(flags="MF")/TCP(flags="SA"))) assert pkt.raw_packet_cache is not None assert pkt[TCP].raw_packet_cache is not None assert pkt.flags.MF assert not pkt.flags.DF assert not pkt.flags.evil assert repr(pkt.flags) == '' assert pkt[TCP].flags.S assert pkt[TCP].flags.A assert pkt[TCP].flags.SA assert not any(getattr(pkt[TCP].flags, f) for f in 'FRPUECN') assert repr(pkt[TCP].flags) == '' pkt.flags.MF = 0 pkt.flags.DF = 1 pkt[TCP].flags.U = True pkt[TCP].flags.S = False pkt = IP(raw(pkt)) assert not pkt.flags.MF assert pkt.flags.DF assert not pkt.flags.evil assert repr(pkt.flags) == '' assert pkt[TCP].flags.A assert pkt[TCP].flags.U assert pkt[TCP].flags.AU assert not any(getattr(pkt[TCP].flags, f) for f in 'FSRPECN') assert repr(pkt[TCP].flags) == '' = Operations on flag values ~ TCP p1, p2 = TCP(flags="SU"), TCP(flags="AU") assert (p1.flags & p2.flags).U assert not any(getattr(p1.flags & p2.flags, f) for f in 'FSRPAECN') 
assert all(getattr(p1.flags | p2.flags, f) for f in 'SAU') assert (p1.flags | p2.flags).SAU assert not any(getattr(p1.flags | p2.flags, f) for f in 'FRPECN') assert TCP(flags="SA").flags & TCP(flags="S").flags == TCP(flags="S").flags assert TCP(flags="SA").flags | TCP(flags="S").flags == TCP(flags="SA").flags ############ ############ + 802.3 = Test detection assert isinstance(Dot3(raw(Ether())),Ether) assert isinstance(Ether(raw(Dot3())),Dot3) a = Ether(b'\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00') assert isinstance(a,Dot3) assert a.dst == 'ff:ff:ff:ff:ff:ff' assert a.src == '00:00:00:00:00:00' a = Dot3(b'\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x90\x00') assert isinstance(a,Ether) assert a.dst == 'ff:ff:ff:ff:ff:ff' assert a.src == '00:00:00:00:00:00' ############ ############ + ASN.1 = MIB ~ mib import tempfile fd, fname = tempfile.mkstemp() os.write(fd, b"-- MIB test\nscapy OBJECT IDENTIFIER ::= {test 2807}\n") os.close(fd) load_mib(fname) assert sum(1 for k in conf.mib.d.values() if "scapy" in k) == 1 assert sum(1 for oid in conf.mib) > 100 = MIB - graph ~ mib from unittest import mock @mock.patch("scapy.asn1.mib.do_graph") def get_mib_graph(do_graph): def store_graph(graph, **kargs): assert graph.startswith("""digraph "mib" {""") assert """"test.2807" [ label="scapy" ];""" in graph do_graph.side_effect = store_graph conf.mib._make_graph() get_mib_graph() = MIB - test aliases ~ mib # https://github.com/secdev/scapy/issues/2542 assert conf.mib._oidname("2.5.29.19") == "basicConstraints" = DADict tests a = DADict("test") a[0] = "test_value1" a["scapy"] = "test_value2" assert a.test_value1 == 0 assert a.test_value2 == "scapy" with ContextManagerCaptureOutput() as cmco: a._show() outp = cmco.get_output() assert "scapy = 'test_value2'" in outp assert "0 = 'test_value1'" in outp = Test ETHER_TYPES assert ETHER_TYPES.IPv4 == 2048 try: import warnings with warnings.catch_warnings(record=True) as w: warnings.simplefilter("always") 
ETHER_TYPES["BAOBAB"] = 0xffff assert ETHER_TYPES.BAOBAB == 0xffff assert issubclass(w[-1].category, DeprecationWarning) except DeprecationWarning: # -Werror is used pass = MIB - Check that MIB OIDs are not duplicated ~ mib from scapy.asn1.mib import x509_oids_sets _dct = {} for d in x509_oids_sets: for elt in d: if elt in _dct: raise ValueError("OID %s already exists" % elt) _dct.update(d) = BER tests BER_id_enc(42) == '*' BER_id_enc(2807) == b'\xbfw' b = BERcodec_IPADDRESS() r1 = b.enc("8.8.8.8") r1 == b'@\x04\x08\x08\x08\x08' r2 = b.dec(r1)[0] r2.val == '8.8.8.8' a = b'\x1f\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\xfe\x01\x01\x00C\x02\x01U0\x0f0\r\x06\x08+\x06\x01\x02\x01\x02\x01\x00\x02\x01!' 
ret = False try: BERcodec_Object.check_type(a) except BER_BadTag_Decoding_Error: ret = True else: ret = False assert ret = BER trigger failures try: BERcodec_INTEGER.do_dec(b"\x02\x01") assert False except BER_Decoding_Error: pass ############ ############ + Fields = FieldLenField with BitField class Test(Packet): name = "Test" fields_desc = [ FieldLenField("BitCount", None, fmt="H", count_of="Values"), FieldLenField("ByteCount", None, fmt="B", length_of="Values"), FieldListField("Values", [], BitField("data", 0x0, size=1), count_from=lambda pkt: pkt.BitCount), ] pkt = Test(raw(Test(Values=[0, 0, 0, 0, 1, 1, 1, 1]))) assert pkt.BitCount == 8 assert pkt.ByteCount == 1 = PacketListField class TestPacket(Packet): name = 'TestPacket' fields_desc = [ PacketListField('list', [], 0) ] a = TestPacket() a.list.append(1) assert len(a.list) == 1 b = TestPacket() assert len(b.list) == 0 = Test PacketListField deepcopy class SubPacket(Packet): name = "SubPacket" fields_desc = [ ByteField("mem", 1), ] class TestPacket(Packet): name = "TestPacket" fields_desc = [ PacketListField("packlist", SubPacket(), SubPacket), ] a = TestPacket() b = a.copy() fuzz(b) assert a.packlist[0].mem == 1 = PacketField class InnerPacket(Packet): fields_desc = [ StrField("f_name", "test") ] class TestPacket(Packet): fields_desc = [ PacketField("inner", InnerPacket(), InnerPacket) ] p = TestPacket() print(p.inner.f_name) assert p.inner.f_name == b"test" p = TestPacket() p.inner.f_name = b"scapy" assert p.inner.f_name == b"scapy" p = TestPacket() assert p.inner.f_name == b"test" + UUIDField = Parsing a human-readable UUID f = UUIDField('f', '01234567-89ab-cdef-0123-456789abcdef') f.addfield(None, b'', f.default) == hex_bytes('0123456789abcdef0123456789abcdef') = Parsing a machine-encoded UUID f = UUIDField('f', bytearray.fromhex('0123456789abcdef0123456789abcdef')) f.addfield(None, b'', f.default) == hex_bytes('0123456789abcdef0123456789abcdef') = Parsing a tuple of values f = UUIDField('f', (0x01234567, 
0x89ab, 0xcdef, 0x01, 0x23, 0x45, 0x67, 0x89, 0xab, 0xcd, 0xef)) f.addfield(None, b'', f.default) == hex_bytes('0123456789abcdef0123456789abcdef') = Handle None values f = UUIDField('f', None) f.addfield(None, b'', f.default) == hex_bytes('00000000000000000000000000000000') = Get a UUID for dissection from uuid import UUID f = UUIDField('f', None) f.getfield(None, bytearray.fromhex('0123456789abcdef0123456789abcdef01')) == (b'\x01', UUID('01234567-89ab-cdef-0123-456789abcdef')) = Verify little endian UUIDField * The endianness of a UUIDField should be apply by block on each block in parenthesis '(01234567)-(89ab)-(cdef)-(01)(23)-(45)(67)(89)(ab)(cd)(ef)' f = UUIDField('f', '01234567-89ab-cdef-0123-456789abcdef', uuid_fmt=UUIDField.FORMAT_LE) f.addfield(None, b'', f.default) == hex_bytes('67452301ab89efcd0123456789abcdef') = Verify reversed UUIDField * This should reverse the entire value as 128-bits f = UUIDField('f', '01234567-89ab-cdef-0123-456789abcdef', uuid_fmt=UUIDField.FORMAT_REV) f.addfield(None, b'', f.default) == hex_bytes('efcdab8967452301efcdab8967452301') + RandUUID = RandUUID setup RANDUUID_TEMPLATE = '01234567-89ab-*-01*-*****ef' RANDUUID_FIXED = uuid.uuid4() = RandUUID default behaviour ru = RandUUID() assert ru._fix().version == 4 assert ru.command() == "RandUUID()" = RandUUID incorrect implicit args assert expect_exception(ValueError, lambda: RandUUID(node=0x1234, name="scapy")) assert expect_exception(ValueError, lambda: RandUUID(node=0x1234, namespace=uuid.uuid4())) assert expect_exception(ValueError, lambda: RandUUID(clock_seq=0x1234, name="scapy")) assert expect_exception(ValueError, lambda: RandUUID(clock_seq=0x1234, namespace=uuid.uuid4())) assert expect_exception(ValueError, lambda: RandUUID(name="scapy")) assert expect_exception(ValueError, lambda: RandUUID(namespace=uuid.uuid4())) = RandUUID v4 UUID (correct args) u = RandUUID(version=4)._fix() assert u.version == 4 u2 = RandUUID(version=4)._fix() assert u2.version == 4 assert str(u) != 
str(u2) = RandUUID v4 UUID (incorrect args) assert expect_exception(ValueError, lambda: RandUUID(version=4, template=RANDUUID_TEMPLATE)) assert expect_exception(ValueError, lambda: RandUUID(version=4, node=0x1234)) assert expect_exception(ValueError, lambda: RandUUID(version=4, clock_seq=0x1234)) assert expect_exception(ValueError, lambda: RandUUID(version=4, namespace=uuid.uuid4())) assert expect_exception(ValueError, lambda: RandUUID(version=4, name="scapy")) = RandUUID v1 UUID u = RandUUID(version=1)._fix() assert u.version in [1, 4] u = RandUUID(version=1, node=0x1234)._fix() assert u.version == 1 assert u.node == 0x1234 u = RandUUID(version=1, clock_seq=0x1234)._fix() assert u.version == 1 assert u.clock_seq == 0x1234 ru = RandUUID(version=1, node=0x1234, clock_seq=0x1bcd) assert ru.command() == "RandUUID(node=4660, clock_seq=7117, version=1)" u = ru._fix() assert u.version == 1 assert u.node == 0x1234 assert u.clock_seq == 0x1bcd = RandUUID v1 UUID (implicit version) u = RandUUID(node=0x1234)._fix() assert u.version == 1 assert u.node == 0x1234 u = RandUUID(clock_seq=0x1234)._fix() assert u.version == 1 assert u.clock_seq == 0x1234 u = RandUUID(node=0x1234, clock_seq=0x1bcd)._fix() assert u.version == 1 assert u.node == 0x1234 assert u.clock_seq == 0x1bcd = RandUUID v1 UUID (incorrect args) assert expect_exception(ValueError, lambda: RandUUID(version=1, template=RANDUUID_TEMPLATE)) assert expect_exception(ValueError, lambda: RandUUID(version=1, namespace=uuid.uuid4())) assert expect_exception(ValueError, lambda: RandUUID(version=1, name="scapy")) = RandUUID v5 UUID ru = RandUUID(version=5, namespace=RANDUUID_FIXED, name="scapy") u = ru._fix() assert u.version == 5 assert ru.command() == "RandUUID(namespace=%r, name='scapy', version=5)" % RANDUUID_FIXED u2 = RandUUID(version=5, namespace=RANDUUID_FIXED, name="scapy")._fix() assert u2.version == 5 assert u.bytes == u2.bytes # implicit v5 u2 = RandUUID(namespace=RANDUUID_FIXED, name="scapy")._fix() assert 
u.bytes == u2.bytes = RandUUID v5 UUID (incorrect args) assert expect_exception(ValueError, lambda: RandUUID(version=5, template=RANDUUID_TEMPLATE)) assert expect_exception(ValueError, lambda: RandUUID(version=5, node=0x1234)) assert expect_exception(ValueError, lambda: RandUUID(version=5, clock_seq=0x1234)) = RandUUID v3 UUID u = RandUUID(version=3, namespace=RANDUUID_FIXED, name="scapy")._fix() assert u.version == 3 u2 = RandUUID(version=3, namespace=RANDUUID_FIXED, name="scapy")._fix() assert u2.version == 3 assert u.bytes == u2.bytes # implicit v5 u2 = RandUUID(namespace=RANDUUID_FIXED, name="scapy")._fix() assert u.bytes != u2.bytes = RandUUID v3 UUID (incorrect args) assert expect_exception(ValueError, lambda: RandUUID(version=5, template=RANDUUID_TEMPLATE)) assert expect_exception(ValueError, lambda: RandUUID(version=5, node=0x1234)) assert expect_exception(ValueError, lambda: RandUUID(version=5, clock_seq=0x1234)) = RandUUID looks like a UUID with str assert re.match(r'[0-9a-f]{8}(-[0-9a-f]{4}){3}-[0-9a-f]{12}', str(RandUUID()), re.I) is not None = RandUUID with a static part * RandUUID template can contain static part such a 01234567-89ab-*-01*-*****ef ru = RandUUID('01234567-89ab-*-01*-*****ef') assert re.match(r'01234567-89ab-[0-9a-f]{4}-01[0-9a-f]{2}-[0-9a-f]{10}ef', str(ru), re.I) is not None assert ru.command() == "RandUUID(template='01234567-89ab-*-01*-*****ef')" = RandUUID with a range part * RandUUID template can contain a part with a range of values such a 01234567-89ab-*-01*-****c0:c9ef assert re.match(r'01234567-89ab-[0-9a-f]{4}-01[0-9a-f]{2}-[0-9a-f]{8}c[0-9]ef', str(RandUUID('01234567-89ab-*-01*-****c0:c9ef')), re.I) is not None ############ ############ + MPLS tests = MPLS - build/dissection from scapy.contrib.mpls import EoMCW, MPLS p1 = MPLS()/IP()/UDP() assert p1[MPLS].s == 1 p2 = MPLS()/MPLS()/IP()/UDP() assert p2[MPLS].s == 0 p1[MPLS] p1[IP] p2[MPLS] p2[MPLS:1] p2[IP] = MPLS encapsulated Ethernet with CW - build/dissection p = 
Ether(dst="11:11:11:11:11:11", src="22:22:22:22:22:22") p /= MPLS(label=1)/EoMCW(seq=1234) p /= Ether(dst="33:33:33:33:33:33", src="44:44:44:44:44:44")/IP() p = Ether(raw(p)) assert p[EoMCW].zero == 0 assert p[EoMCW].reserved == 0 assert p[EoMCW].seq == 1234 = MPLS encapsulated Ethernet without CW - build/dissection p = Ether(dst="11:11:11:11:11:11", src="22:22:22:22:22:22") p /= MPLS(label=2)/MPLS(label=1) p /= Ether(dst="33:33:33:33:33:33", src="44:44:44:44:44:44")/IP() p = Ether(raw(p)) assert p[Ether:2].type == 0x0800 try: p[EoMCW] except IndexError: ret = True else: ret = False assert ret assert p[Ether:2].type == 0x0800 = MPLS encapsulated IP - build/dissection p = Ether(dst="11:11:11:11:11:11", src="22:22:22:22:22:22") p /= MPLS(label=1)/IP() p = Ether(raw(p)) try: p[EoMCW] except IndexError: ret = True else: ret = False assert ret try: p[Ether:2] except IndexError: ret = True else: ret = False assert ret p[IP] ############ ############ + PacketList methods = sr() class Req(Packet): fields_desc = [ ByteField("raw", 0) ] def answers(self, other): return False class Res(Packet): fields_desc = [ ByteField("raw", 0) ] def answers(self, other): return other.__class__ == Req and other.raw == self.raw pl = PacketList([Req(b"1"), Res(b"1"), Req(b"2"), Req(b"3"), Req(b"4"), Res(b"3"), Res(b"1"), Res(b"1"), Res(b"4")]) srl, rl = pl.sr() assert len(srl) == 3 assert len(rl) == 3 srl, rl = pl.sr(lookahead=1) assert len(srl) == 1 assert len(rl) == 7 srl, rl = pl.sr(lookahead=2) assert len(srl) == 2 assert len(rl) == 5 srl, rl = pl.sr(lookahead=3) assert len(srl) == 3 assert len(rl) == 3 pl = PacketList([Req(b"\x05"), Res(b"1"), Res(b"2"), Res(b"3"), Res(b"4"), Res(b"3"), Res(b"1"), Res(b"1"), Res(b"\x05")]) srl, rl = pl.sr(lookahead=3) assert len(srl) == 0 assert len(rl) == 9 srl, rl = pl.sr(lookahead=7) assert len(srl) == 0 assert len(rl) == 9 srl, rl = pl.sr(lookahead=8) assert len(srl) == 1 assert len(rl) == 7 srl, rl = pl.sr(lookahead=0) assert len(srl) == 1 assert 
len(rl) == 7 srl, rl = pl.sr(lookahead=None) assert len(srl) == 1 assert len(rl) == 7 = pickle test import pickle import io srl, rl = PacketList([Raw(b"1"), Raw(b"1"), Raw(b"2"), Raw(b"3"), Raw(b"4"), Raw(b"3"), Raw(b"1"), Raw(b"1"), Raw(b"4")]).sr() assert len(srl) == 4 f = io.BytesIO() pickle.dump(srl, f) unp = pickle.loads(f.getvalue()) assert len(unp) == len(srl) assert all(bytes(a[0]) == bytes(b[0]) for a, b in zip(unp, srl)) = plot() from unittest import mock import scapy.libs.matplot @mock.patch("scapy.libs.matplot.plt") def test_plot(mock_plt): def fake_plot(data, **kwargs): return data mock_plt.plot = fake_plot plist = PacketList([IP(id=i)/TCP() for i in range(10)]) lines = plist.plot(lambda p: (p.time, p.id)) assert len(lines) == 10 test_plot() = diffplot() from unittest import mock import scapy.libs.matplot @mock.patch("scapy.libs.matplot.plt") def test_diffplot(mock_plt): def fake_plot(data, **kwargs): return data mock_plt.plot = fake_plot plist = PacketList([IP(id=i)/TCP() for i in range(10)]) lines = plist.diffplot(lambda x,y: (x.time, y.id-x.id)) assert len(lines) == 9 test_diffplot() = multiplot() from unittest import mock import scapy.libs.matplot @mock.patch("scapy.libs.matplot.plt") def test_multiplot(mock_plt): def fake_plot(data, **kwargs): return data mock_plt.plot = fake_plot tmp = [IP(id=i)/TCP() for i in range(10)] plist = PacketList([tuple(tmp[i-2:i]) for i in range(2, 10, 2)]) lines = plist.multiplot(lambda x, y: (y[IP].src, (y.time, y[IP].id))) assert len(lines) == 1 assert len(lines[0]) == 4 test_multiplot() = rawhexdump() def test_rawhexdump(): with ContextManagerCaptureOutput() as cmco: p = PacketList([IP()/TCP() for i in range(2)]) p.rawhexdump() result_pl_rawhexdump = cmco.get_output() assert len(result_pl_rawhexdump.split('\n')) == 7 assert result_pl_rawhexdump.startswith("0000 45 00 00 28") test_rawhexdump() = hexraw() def test_hexraw(): with ContextManagerCaptureOutput() as cmco: p = PacketList([IP()/Raw(str(i)) for i in 
range(2)]) p.hexraw() result_pl_hexraw = cmco.get_output() assert len(result_pl_hexraw.split('\n')) == 5 assert "0000 30" in result_pl_hexraw test_hexraw() = hexdump() def test_hexdump(): with ContextManagerCaptureOutput() as cmco: p = PacketList([IP()/Raw(str(i)) for i in range(2)]) p.hexdump() result_pl_hexdump = cmco.get_output() assert len(result_pl_hexdump.split('\n')) == 7 assert "0010 7F 00 00 01 31" in result_pl_hexdump test_hexdump() = import_hexcap() @mock.patch("scapy.utils.input") def test_import_hexcap(mock_input): data = """ 0000 FF FF FF FF FF FF AA AA AA AA AA AA 08 00 45 00 ..............E. 0010 00 1C 00 01 00 00 40 01 7C DE 7F 00 00 01 7F 00 ......@.|....... 0020 00 01 08 00 F7 FF 00 00 00 00 .......... """[1:].split("\n") lines = iter(data) mock_input.side_effect = lambda: next(lines) return import_hexcap() pkt = test_import_hexcap() pkt = Ether(pkt) assert pkt[Ether].dst == "ff:ff:ff:ff:ff:ff" assert pkt[IP].dst == "127.0.0.1" assert ICMP in pkt = import_hexcap(input_string) data = """ 0000 FF FF FF FF FF FF AA AA AA AA AA AA 08 00 45 00 ..............E. 0010 00 1C 00 01 00 00 40 01 7C DE 7F 00 00 01 7F 00 ......@.|....... 0020 00 01 08 00 F7 FF 00 00 00 00 .......... 
"""[1:] pkt = import_hexcap(data) pkt = Ether(pkt) assert pkt[Ether].dst == "ff:ff:ff:ff:ff:ff" assert pkt[IP].dst == "127.0.0.1" assert ICMP in pkt = padding() def test_padding(): with ContextManagerCaptureOutput() as cmco: p = PacketList([IP()/conf.padding_layer(str(i)) for i in range(2)]) p.padding() result_pl_padding = cmco.get_output() assert len(result_pl_padding.split('\n')) == 5 assert "0000 30" in result_pl_padding test_padding() = nzpadding() def test_nzpadding(): with ContextManagerCaptureOutput() as cmco: p = PacketList([IP()/conf.padding_layer("AB"), IP()/conf.padding_layer("\x00\x00")]) p.nzpadding() result_pl_nzpadding = cmco.get_output() assert len(result_pl_nzpadding.split('\n')) == 3 assert "0000 41 42" in result_pl_nzpadding test_nzpadding() = conversations() from unittest import mock @mock.patch("scapy.plist.do_graph") def test_conversations(mock_do_graph): def fake_do_graph(graph, **kwargs): return graph mock_do_graph.side_effect = fake_do_graph plist = PacketList([IP(dst="127.0.0.2")/TCP(dport=i) for i in range(2)]) plist.extend([IP(src="127.0.0.2")/TCP(sport=i) for i in range(2)]) plist.extend([IPv6(dst="::2", src="::1")/TCP(sport=i) for i in range(2)]) plist.extend([IPv6(src="::2", dst="::1")/TCP(sport=i) for i in range(2)]) plist.extend([Ether()/ARP(pdst="127.0.0.1")]) result_conversations = plist.conversations() assert len(result_conversations.split('\n')) == 8 assert result_conversations.startswith('digraph "conv" {') assert "127.0.0.1" in result_conversations assert "::1" in result_conversations test_conversations() = sessions() pl = PacketList([Ether()/IPv6()/ICMPv6EchoRequest(), Ether()/IPv6()/IPv6()]) pl.extend([Ether()/IP()/IP(), Ether()/ARP()]) pl.extend([Ether()/Ether()/IP()]) assert len(pl.sessions().keys()) == 5 = afterglow() from unittest import mock @mock.patch("scapy.plist.do_graph") def test_afterglow(mock_do_graph): def fake_do_graph(graph, **kwargs): return graph mock_do_graph.side_effect = fake_do_graph plist = 
PacketList([IP(dst="127.0.0.2")/TCP(dport=i) for i in range(2)]) plist.extend([IP(src="127.0.0.2")/TCP(sport=i) for i in range(2)]) result_afterglow = plist.afterglow() assert len(result_afterglow.split('\n')) == 19 assert result_afterglow.startswith('digraph "afterglow" {') test_afterglow() = psdump() print("PYX: %d" % PYX) if PYX: import tempfile import os filename = tempfile.mktemp(suffix=".eps") plist = PacketList([IP()/TCP()]) plist.psdump(filename) assert os.path.exists(filename) os.unlink(filename) = pdfdump() print("PYX: %d" % PYX) if PYX: import tempfile import os filename = tempfile.mktemp(suffix=".pdf") plist = PacketList([IP()/TCP()]) plist.pdfdump(filename) assert os.path.exists(filename) os.unlink(filename) = svgdump() print("PYX: %d" % PYX) if PYX: import tempfile import os filename = tempfile.mktemp(suffix=".svg") plist = PacketList([IP()/TCP()]) plist.svgdump(filename) assert os.path.exists(filename) os.unlink(filename) = __getstate__ / __setstate__ (used by pickle) import pickle frm = Ether(src='00:11:22:33:44:55', dst='00:22:33:44:55:66')/Raw() frm.time = EDecimal(123.45) frm.sniffed_on = "iface" frm.wirelen = 1 pl = PacketList(res=[frm, frm], name='WhatAGreatName') pickled = pickle.dumps(pl) pl = pickle.loads(pickled) assert pl.listname == "WhatAGreatName" assert len(pl) == 2 assert pl[0].time == 123.45 assert pl[0].sniffed_on == "iface" assert pl[0].wirelen == 1 assert pl[0][Ether].src == '00:11:22:33:44:55' assert pl[1][Ether].dst == '00:22:33:44:55:66' = EDecimal # GH4488 p1, p2 = EDecimal('1722417787.778435252'), EDecimal('1722417787.778435216') assert p1 != p2 assert p1 > p2 assert not (p1 < p2) assert p1 == 1722417787.778435252 # float test assert p2 == 1722417787.778435216 assert (p1, 0) > (p2, 1) ############ ############ + Scapy version = _version() import os from datetime import datetime, timezone version_filename = os.path.join(scapy._SCAPY_PKG_DIR, "VERSION") mtime = datetime.fromtimestamp(os.path.getmtime(scapy.__file__), 
timezone.utc) version = "2.0.0" with open(version_filename, "w") as fd: fd.write(version) os.environ["SCAPY_VERSION"] = "9.9.9" assert scapy._version() == "9.9.9" del os.environ["SCAPY_VERSION"] assert scapy._version() == version os.unlink(version_filename) from unittest import mock with mock.patch("scapy._version_from_git_archive") as archive: archive.return_value = "4.4.4" assert scapy._version() == "4.4.4" archive.side_effect = ValueError() with mock.patch("scapy._version_from_git_describe") as git: git.return_value = "3.3.3" assert scapy._version() == "3.3.3" git.side_effect = Exception() assert scapy._version() == mtime.strftime("%Y.%m.%d") with mock.patch("os.path.getmtime") as getmtime: getmtime.side_effect = Exception() assert scapy._version() == "0.0.0" = UTscapy HTML output import tempfile, os from scapy.tools.UTscapy import TestCampaign, pack_html_campaigns test_campaign = TestCampaign("test") test_campaign.output_file = tempfile.mktemp() html = pack_html_campaigns([test_campaign], None, local=True) dirname = os.path.dirname(test_campaign.output_file) filename_js = "%s/UTscapy.js" % dirname filename_css = "%s/UTscapy.css" % dirname assert os.path.isfile(filename_js) assert os.path.isfile(filename_css) os.remove(filename_js) os.remove(filename_css) = test get_temp_dir dname = get_temp_dir() assert os.path.isdir(dname) = test fragleak functions ~ netaccess linux fragleak from unittest import mock @mock.patch("scapy.layers.inet.conf.L3socket") @mock.patch("scapy.layers.inet.select.select") @mock.patch("scapy.layers.inet.sr1") def _test_fragleak(func, sr1, select, L3socket): packets = [IP(src="4.4.4.4")/ICMP()/IPerror(dst="8.8.8.8")/conf.padding_layer(load=b"greatdata")] iterator = iter(packets) ne = lambda *args, **kwargs: next(iterator) L3socket.side_effect = lambda: Bunch(recv=ne, send=lambda x: None) sr1.side_effect = ne select.side_effect = lambda a, b, c, d: a+b+c with ContextManagerCaptureOutput() as cmco: func("8.8.8.8", count=1) out = 
cmco.get_output() return "greatdata" in out assert _test_fragleak(fragleak) assert _test_fragleak(fragleak2)