% Regression tests for Scapy Answering Machines # More information at http://www.secdev.org/projects/UTscapy/ ############ ############ + Answering Machines = Generic answering machine mocker from unittest import mock @mock.patch("scapy.ansmachine.sniff") def test_am(cls_name, packet_query, check_reply, mock_sniff, **kargs): packet_query = packet_query.__class__(bytes(packet_query)) def sniff(*args,**kargs): kargs["prn"](packet_query) mock_sniff.side_effect = sniff am = cls_name(**kargs) called = [False] def _sndrpl(x): called[0] = True check_reply(x.__class__(bytes(x))) am.send_reply = _sndrpl am() assert called[0], "Filter never passed for AnsweringMachine !" = BOOT_am def check_BOOTP_am_reply(packet): assert BOOTP in packet and packet[BOOTP].op == 2 assert packet[BOOTP].yiaddr == "192.168.1.128" and packet[BOOTP].giaddr == "192.168.1.1" test_am(BOOTP_am, Ether()/IP()/UDP()/BOOTP(op=1), check_BOOTP_am_reply) = DHCP_am def check_DHCP_am_reply(packet): assert DHCP in packet and len(packet[DHCP].options) assert ("domain", b"localnet") in packet[DHCP].options test_am(DHCP_am, Ether()/IP()/UDP()/BOOTP(op=1)/DHCP(options=[('message-type', 'request')]), check_DHCP_am_reply, domain="localnet") = ARP_am def check_ARP_am_reply(packet): assert ARP in packet and packet[ARP].psrc == "10.28.7.1" assert packet[ARP].hwsrc == "00:01:02:03:04:05" test_am(ARP_am, Ether()/ARP(pdst="10.28.7.1"), check_ARP_am_reply, IP_addr="10.28.7.1", ARP_addr="00:01:02:03:04:05") = ICMPEcho_am def check_ICMP_am_reply(packet): packet.show() assert packet[Ether].src != "ff:ff:ff:ff:ff:ff" assert packet[Ether].dst == "aa:aa:aa:aa:aa:aa" assert IP in packet and ICMP in packet assert packet[IP].dst == "1.1.1.1" assert packet[IP].src == "2.2.2.2" assert packet[ICMP].seq == 12 test_am(ICMPEcho_am, Ether(src="aa:aa:aa:aa:aa:aa", dst="ff:ff:ff:ff:ff:ff")/IP(src="1.1.1.1", dst="2.2.2.2")/ICMP(seq=12), check_ICMP_am_reply) = DNS_am def check_DNS_am_reply(packet): assert packet[Ether].src == "bb:bb:bb:bb:bb:bb" assert packet[Ether].dst == "aa:aa:aa:aa:aa:aa" assert packet[IP].src == "127.0.0.2" assert packet[IP].dst == "127.0.0.1" assert DNS in packet and packet[DNS].ancount == 1 assert packet[DNS].an[0].rdata == "192.168.1.1" assert packet[DNS].qd[0].qname == b"www.secdev.org." test_am(DNS_am, Ether(src="aa:aa:aa:aa:aa:aa", dst="bb:bb:bb:bb:bb:bb")/IP(src="127.0.0.1", dst="127.0.0.2")/UDP()/DNS(qd=DNSQR(qname="www.secdev.org")), check_DNS_am_reply, joker="192.168.1.1") def check_DNS_am_reply_srvmatch(packet): assert DNS in packet and packet[DNS].ancount == 1 assert isinstance(packet[DNS].an[0], DNSRRSRV) assert packet[DNS].an[0].rrname == b'_ldap._tcp.dc._msdcs.scapy.fr.' assert packet[DNS].an[0].port == 389 assert packet[DNS].an[0].target == b'dc.scapy.fr.' test_am(DNS_am, Ether()/IP()/UDP()/DNS(qd=DNSQR(qname=b'_ldap._tcp.dc._msdcs.scapy.fr.', qtype="SRV")), check_DNS_am_reply_srvmatch, srvmatch={"_ldap._tcp.dc._msdcs.scapy.fr": (389, "dc.scapy.fr")}) def check_DNS_am_reply_arpa(packet): assert DNS in packet and packet[DNS].ancount == 1 assert packet[DNS].an[0].rdata == b"scapy." assert packet[DNS].an[0].rrname == b"1.0.16.172.in-addr.arpa." test_am(DNS_am, Ether()/IP()/UDP()/DNS(qd=DNSQR(qname=b"1.0.16.172.in-addr.arpa.", qtype="PTR")), check_DNS_am_reply_arpa, jokerarpa="scapy") def check_DNS_am_reply2(packet): assert DNS in packet and packet[DNS].ancount == 2 assert packet[DNS].an[0].rdata == "128.0.0.1" assert packet[DNS].an[1].rdata == "::1" test_am(DNS_am, Ether()/IP(b'E\x00\x00H\x00\x01\x00\x00@\x11|\xa2\x7f\x00\x00\x01\x7f\x00\x00\x01\x005\x005\x004\xe8\x9a\x00\x00\x01\x00\x00\x02\x00\x00\x00\x00\x00\x00\x06gaagle\x03com\x00\x00\x01\x00\x01\x06google\x03com\x00\x00\x1c\x00\x01'), check_DNS_am_reply2, match={"google.com": ("127.0.0.1", "::1"), "gaagle.com": "128.0.0.1"}, joker=False) assert DNS_am().make_reply(Ether()) is None assert DNS_am().make_reply(Ether()/IP()) is None assert DNS_am().make_reply(Ether()/IP()/UDP()) is None assert DNS_am().make_reply( Ether()/IP()/UDP()/DNS(b'q\xa04\x00\x00\xa0\x01\x00\xf3\x00\x01\x04\x01y') ) is None = LLMNR_am def check_LLMNR_am_am_reply(packet): # assert packet[Ether].src == get_if_hwaddr(conf.iface) assert packet[Ether].dst == "aa:aa:aa:aa:aa:aa" # assert packet[IP].src == get_if_addr(conf.iface) assert packet[IP].dst == "192.168.0.1" assert packet[UDP].dport == 51938 assert packet[UDP].sport == 5355 assert LLMNRResponse in packet and packet[LLMNRResponse].ancount == 1 and packet[LLMNRResponse].qdcount == 1 assert packet[LLMNRResponse].qd[0].qname == b"TEST." assert packet[LLMNRResponse].an[0].rdata == "192.168.1.1" assert packet[LLMNRResponse].an[0].rrname == b"TEST." assert packet[LLMNRResponse].an[0].ttl == 60 test_am(LLMNR_am, Ether(src="aa:aa:aa:aa:aa:aa", dst="01:00:5e:00:00:fc")/IP(src="192.168.0.1", dst="224.0.0.252")/UDP(dport=5355, sport=51938)/LLMNRQuery(qd=DNSQR(qname=b"TEST.", qtype="A")), check_LLMNR_am_am_reply, ttl=60, match={"TEST": "192.168.1.1"}) = mDNS_am def check_mDNS_am_reply(packet): packet.show() # assert packet[Ether].src == get_if_hwaddr(conf.iface) assert packet[Ether].dst == "01:00:5e:00:00:fb" # assert packet[IP].src == get_if_addr(conf.iface) assert packet[IP].dst == "224.0.0.251" assert packet[IP].ttl == 255 assert packet[UDP].dport == 5353 assert packet[UDP].sport == 5353 assert DNS in packet and packet[DNS].ancount == 1 and packet[DNS].qdcount == 0 assert packet[DNS].an[0].rdata == "192.168.1.1" assert packet[DNS].an[0].rrname == b"TEST.local." assert packet[DNS].an[0].ttl == 10 test_am(mDNS_am, Ether(src="aa:aa:aa:aa:aa:aa", dst="01:00:5e:00:00:fb")/IP(src="192.168.0.1", dst="224.0.0.251", ttl=1)/UDP(dport=5353, sport=5353)/DNS(qd=DNSQR(qname=b"TEST.local.", qtype="A")), check_mDNS_am_reply, joker="192.168.1.1") def check_mDNS_am_reply2(packet): # $ avahi-resolve -n bonjour.local packet.show() # assert packet[Ether].src == get_if_hwaddr(conf.iface) assert packet[Ether].dst == "01:00:5e:00:00:fb" # assert packet[IP].src == get_if_addr(conf.iface) assert packet[IP].dst == "224.0.0.251" assert packet[IP].ttl == 255 assert packet[UDP].dport == 5353 assert packet[UDP].sport == 5353 assert DNS in packet and packet[DNS].ancount == 2 and packet[DNS].qdcount == 0 assert packet[DNS].an[0].rdata == "192.168.1.1" assert packet[DNS].an[0].rrname == b"bonjour.local." assert packet[DNS].an[0].ttl == 120 assert packet[DNS].an[1].type == 47 assert packet[DNS].an[1].rrname == b"bonjour.local." assert packet[DNS].an[1].ttl == 120 test_am(mDNS_am, Ether(b'\x01\x00^\x00\x00\xfb\xaa\xaa\xaa\xaa\xaa\xaa\x08\x00E\x00\x00A\xce}@\x00\xff\x11\x0b\x89\xc0\xa8\x00\x01\xe0\x00\x00\xfb\x14\xe9\x14\xe9\x00-\xdbl\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x07bonjour\x05local\x00\x00\x01\x00\x01\xc0\x0c\x00\x1c\x00\x01'), check_mDNS_am_reply2, joker="192.168.1.1", ttl=120) = DHCPv6_am - Basic Instantiaion ~ osx netaccess a = DHCPv6_am() a.usage() a.parse_options(dns="2001:500::1035", domain="localdomain, local", duid=None, iface=conf.iface, advpref=255, sntpservers=None, sipdomains=None, sipservers=None, nisdomain=None, nisservers=None, nispdomain=None, nispservers=None, bcmcsdomains=None, bcmcsservers=None, debug=1) = DHCPv6_am - SOLICIT ~ osx netaccess req = IPv6(dst="::1")/UDP()/DHCP6(msgtype=1)/DHCP6OptClientId(duid=DUID_LLT()) assert a.is_request(req) res = a.make_reply(req) assert not a.is_request(res) assert res[DHCP6_Advertise] assert res[DHCP6OptPref].prefval == 255 assert res[DHCP6OptReconfAccept] a.print_reply(req, res) = DHCPv6_am - INFO-REQUEST ~ osx netaccess req = IPv6(dst="::1")/UDP()/DHCP6(msgtype=11)/DHCP6OptClientId(duid=DUID_LLT()) assert a.is_request(req) res = a.make_reply(req) assert not a.is_request(res) assert res[DHCP6_Reply] assert "local" in res[DHCP6OptDNSDomains].dnsdomains a.print_reply(req, res) = DHCPv6_am - REQUEST ~ osx netaccess req = IPv6(dst="::1")/UDP()/DHCP6(msgtype=3)/DHCP6OptClientId(duid=DUID_LLT())/DHCP6OptServerId(duid=a.duid) assert a.is_request(req) res = a.make_reply(req) assert not a.is_request(res) assert res[UDP].dport == 546 assert res[DHCP6_Solicit] a.print_reply(req, res) = WiFi_am from unittest import mock @mock.patch("scapy.layers.dot11.sniff") def test_WiFi_am(packet_query, check_reply, mock_sniff, **kargs): def sniff(*args,**kargs): kargs["prn"](packet_query) mock_sniff.side_effect = sniff am = WiFi_am(**kargs) am.send_reply = check_reply am() def check_WiFi_am_reply(packet): assert isinstance(packet, list) and len(packet) == 2 assert TCP in packet[0] and Raw in packet[0] and raw(packet[0][Raw]) == b"5c4pY" test_WiFi_am(Dot11(FCfield="to-DS")/IP()/TCP()/"Scapy", check_WiFi_am_reply, iffrom="scapy0", ifto="scapy1", replace="5c4pY", pattern="Scapy") = NBNS_am def check_NBNS_am_reply(name): def check(packet): packet.show() assert packet[Ether].src != "ff:ff:ff:ff:ff:ff" assert packet[Ether].dst == "aa:aa:aa:aa:aa:aa" assert NBNSQueryResponse in packet and packet[NBNSQueryResponse].RR_NAME == name return check for server_name in (None, "", b"test", "test"): test_am(NBNS_am, Ether(src="aa:aa:aa:aa:aa:aa", dst="ff:ff:ff:ff:ff:ff")/IP()/UDP()/NBNSHeader()/NBNSQueryRequest(QUESTION_NAME="test"), check_NBNS_am_reply(b"test"), server_name=server_name) test_am(NBNS_am, Ether(src="aa:aa:aa:aa:aa:aa", dst="ff:ff:ff:ff:ff:ff")/IP()/UDP()/NBNSHeader()/NBNSQueryRequest(QUESTION_NAME=b"\x85"), check_NBNS_am_reply(b"\x85"), server_name=b"\x85") = LdapPing_am def check_LdapPing_am_reply(packet): nlogon = packet[CLDAP].protocolOp.attributes[0] assert nlogon.type == b"Netlogon" logonresp = NETLOGON(nlogon.values[0].value.val) assert isinstance(logonresp, NETLOGON_SAM_LOGON_RESPONSE_EX) logonresp.show() assert logonresp.DnsForestName == b'scapy.fr.', "DnsForestName" assert logonresp.DnsDomainName == b'scapy.fr.', "DnsDomainName" assert logonresp.DnsHostName == b'DC.scapy.fr.', "DnsHostName" assert logonresp.NetbiosDomainName == b'SCAPY.', "NetbiosDomainName" assert logonresp.NetbiosComputerName == b'DC.', "NetbiosComputerName" assert logonresp.NtVersion == 3, "NtVersion" assert logonresp.Flags == 0x3f3fd, "Flags" assert logonresp.ClientSiteName == b'Default-First-Site-Name.', "ClientSiteName" test_am(LdapPing_am, Ether(b'\xaa\xaa\xaa\xaa\xaa\xaa\xbb\xbb\xbb\xbb\xbb\xbb\x08\x00E\x00\x00\xaf\x9d\xb1\x00\x00\x80\x11\x9c\x89\xac\x13P\x01\xac\x13W\xdb\xc7{\x01\x85\x00\x9bV[0q\x02\x01\x01cl\x04\x00\n\x01\x00\n\x01\x00\x02\x01\x00\x02\x01\x00\x01\x01\x00\xa0M\xa3\x15\x04\tDnsDomain\x04\x08scapy.fr\xa3\x0e\x04\x04Host\x04\x06HOST01\xa3\r\x04\x05NtVer\x04\x04\x16\x00\x00 \xa3\x15\x04\x0bDnsHostName\x04\x06HOST010\n\x04\x08Netlogon'), check_LdapPing_am_reply, NetbiosComputerName="DC", NetbiosDomainName="SCAPY", DnsForestName="scapy.fr") def check_NBNS_LdapPing_am_reply(packet): packet.show() assert SMBMailslot_Write in packet, "SMBMailslot_Write" assert packet[SMBMailslot_Write].Name == b'\\MAILSLOT\\NET\\GETDC510CC0AD', "SMBMailslot_Write.Name" logonresp = NETLOGON(packet[SMBMailslot_Write].Data.load) logonresp.show() assert logonresp.DcSockAddrSize == 16, "DcSockAddrSize" assert isinstance(logonresp.DcSockAddr, DcSockAddr) assert logonresp.DcSockAddr.sin_family == 2, "sin_family" assert logonresp.DcSockAddr.sin_port == 0, "sin_port" assert logonresp.DcSockAddr.sin_zero == 0, "sin_zero" assert logonresp.DcSockAddr.sin_addr == get_if_addr(conf.iface) assert logonresp.DnsForestName == b'scapy.fr.', "DnsForestName" assert logonresp.DnsDomainName == b'scapy.fr.', "DnsDomainName" assert logonresp.DnsHostName == b'DC.scapy.fr.', "DnsHostName" assert logonresp.NetbiosDomainName == b'SCAPY.', "NetbiosDomainName" assert logonresp.NetbiosComputerName == b'DC.', "NetbiosComputerName" assert logonresp.NtVersion == 13, "NtVersion" assert logonresp.Flags == 0x3f3fd, "Flags" assert logonresp.ClientSiteName == b'Default-First-Site-Name.', "ClientSiteName" test_am(LdapPing_am, Ether(b'\xaa\xaa\xaa\xaa\xaa\xaa\xbb\xbb\xbb\xbb\xbb\xbb\x08\x00E\x00\x01\n\xff\x82\x00\x00\x80\x11:]\xac\x13P\x01\xac\x13W\xdb\x00\x8a\x00\x8a\x00\xf6\xd5\xcb\x10\x02\xde\x9d\xac\x13P\x01\x00\x8a\x00\xe0\x00\x00 EIEPFDFEDADBCACACACACACACACACAAA\x00 FDEDEBFAFJCACACACACACACACACACABM\x00\xffSMB%\x00\x00\x00\x00\x18\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xff\xfe\x00\x00\x00\x00\x11\x00\x00@\x00\x02\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\\\x00@\x00\\\x00\x03\x00\x01\x00\x00\x00\x02\x00W\x00\\MAILSLOT\\NET\\NETLOGON\x00\x12\x00\x00\x00H\x00O\x00S\x00T\x000\x001\x00\x00\x00\x00\x00\\MAILSLOT\\NET\\GETDC510CC0AD\x00\x00\x00\x00\x00\x00\x00\x00\x00\x0b\x00\x00 \xff\xff\xff\xff'), check_NBNS_LdapPing_am_reply, NetbiosComputerName="DC", NetbiosDomainName="SCAPY", DnsForestName="scapy.fr")