• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5
6"""
7DNS: Domain Name System
8
9This implements:
10- RFC1035: Domain Names
11- RFC6762: Multicast DNS
12- RFC6763: DNS-Based Service Discovery
13"""
14
15import abc
16import collections
17import operator
18import itertools
19import socket
20import struct
21import time
22import warnings
23
24from scapy.arch import (
25    get_if_addr,
26    get_if_addr6,
27    read_nameservers,
28)
29from scapy.ansmachine import AnsweringMachine
30from scapy.base_classes import Net, ScopedIP
31from scapy.config import conf
32from scapy.compat import orb, raw, chb, bytes_encode, plain_str
33from scapy.error import log_runtime, warning, Scapy_Exception
34from scapy.packet import Packet, bind_layers, Raw
35from scapy.fields import (
36    BitEnumField,
37    BitField,
38    ByteEnumField,
39    ByteField,
40    ConditionalField,
41    Field,
42    FieldLenField,
43    FieldListField,
44    FlagsField,
45    I,
46    IP6Field,
47    IntField,
48    MACField,
49    MultipleTypeField,
50    PacketListField,
51    ShortEnumField,
52    ShortField,
53    StrField,
54    StrLenField,
55    UTCTimeField,
56    XStrFixedLenField,
57    XStrLenField,
58)
59from scapy.interfaces import resolve_iface
60from scapy.sendrecv import sr1, sr
61from scapy.supersocket import StreamSocket
62from scapy.plist import SndRcvList, _PacketList, QueryAnswer
63from scapy.pton_ntop import inet_ntop, inet_pton
64from scapy.utils import pretty_list
65from scapy.volatile import RandShort
66
67from scapy.layers.l2 import Ether
68from scapy.layers.inet import IP, DestIPField, IPField, UDP, TCP
69from scapy.layers.inet6 import IPv6
70
71from typing import (
72    Any,
73    List,
74    Optional,
75    Tuple,
76    Type,
77    Union,
78)
79
80
81# https://www.iana.org/assignments/dns-parameters/dns-parameters.xhtml#dns-parameters-4
# Mnemonics of the DNS resource record TYPE values, keyed by numeric code.
# Rows are grouped five codes at a time; gaps in the numbering are codes
# this table does not name.
dnstypes = {
    0: "RESERVED", 1: "A", 2: "NS", 3: "MD", 4: "MF",
    5: "CNAME", 6: "SOA", 7: "MB", 8: "MG", 9: "MR",
    10: "NULL", 11: "WKS", 12: "PTR", 13: "HINFO", 14: "MINFO",
    15: "MX", 16: "TXT", 17: "RP", 18: "AFSDB", 19: "X25",
    20: "ISDN", 21: "RT", 22: "NSAP", 23: "NSAP-PTR", 24: "SIG",
    25: "KEY", 26: "PX", 27: "GPOS", 28: "AAAA", 29: "LOC",
    30: "NXT", 31: "EID", 32: "NIMLOC", 33: "SRV", 34: "ATMA",
    35: "NAPTR", 36: "KX", 37: "CERT", 38: "A6", 39: "DNAME",
    40: "SINK", 41: "OPT", 42: "APL", 43: "DS", 44: "SSHFP",
    45: "IPSECKEY", 46: "RRSIG", 47: "NSEC", 48: "DNSKEY", 49: "DHCID",
    50: "NSEC3", 51: "NSEC3PARAM", 52: "TLSA", 53: "SMIMEA",
    55: "HIP", 56: "NINFO", 57: "RKEY", 58: "TALINK", 59: "CDS",
    60: "CDNSKEY", 61: "OPENPGPKEY", 62: "CSYNC", 63: "ZONEMD", 64: "SVCB",
    65: "HTTPS",
    99: "SPF", 100: "UINFO", 101: "UID", 102: "GID", 103: "UNSPEC",
    104: "NID", 105: "L32", 106: "L64", 107: "LP", 108: "EUI48",
    109: "EUI64",
    249: "TKEY", 250: "TSIG",
    256: "URI", 257: "CAA", 258: "AVC", 259: "DOA", 260: "AMTRELAY",
    32768: "TA", 32769: "DLV", 65535: "RESERVED",
}


# Query types: the five query-only codes first, then every record type.
dnsqtypes = {
    251: "IXFR",
    252: "AXFR",
    253: "MAILB",
    254: "MAILA",
    255: "ALL",
    **dnstypes,
}
# DNS CLASS mnemonics.
dnsclasses = {
    1: 'IN',
    2: 'CS',
    3: 'CH',
    4: 'HS',
    255: 'ANY',
}
105
106
107# 12/2023 from https://www.iana.org/assignments/dns-sec-alg-numbers/dns-sec-alg-numbers.xhtml  # noqa: E501
# DNSSEC algorithm numbers -> display names.
dnssecalgotypes = {
    0: "Reserved",
    1: "RSA/MD5",
    2: "Diffie-Hellman",
    3: "DSA/SHA-1",
    4: "Reserved",
    5: "RSA/SHA-1",
    6: "DSA-NSEC3-SHA1",
    7: "RSASHA1-NSEC3-SHA1",
    8: "RSA/SHA-256",
    9: "Reserved",
    10: "RSA/SHA-512",
    11: "Reserved",
    12: "GOST R 34.10-2001",
    13: "ECDSA Curve P-256 with SHA-256",
    14: "ECDSA Curve P-384 with SHA-384",
    15: "Ed25519",
    16: "Ed448",
    252: "Reserved for Indirect Keys",
    253: "Private algorithms - domain name",
    254: "Private algorithms - OID",
    255: "Reserved",
}

# 12/2023 from https://www.iana.org/assignments/ds-rr-types/ds-rr-types.xhtml
# DS record digest type numbers -> display names.
dnssecdigesttypes = {
    0: "Reserved",
    1: "SHA-1",
    2: "SHA-256",
    3: "GOST R 34.11-94",
    4: "SHA-384",
}

# 12/2023 from https://www.iana.org/assignments/dnssec-nsec3-parameters/dnssec-nsec3-parameters.xhtml  # noqa: E501
# NSEC3 hash algorithm numbers -> display names.
dnssecnsec3algotypes = {
    0: "Reserved",
    1: "SHA-1",
}
122
123
def dns_get_str(s, full=None, _ignore_compression=False):
    """Decode the DNS name found at the beginning of ``s``.

    Labels are read from offset 0 of ``s``. When a compression pointer is
    met, decoding continues inside ``full`` at the offset the pointer
    designates.

    :param s: the bytes to decode (the name starts at offset 0)
    :param full: (optional) the full packet (used for decompression)
    :param _ignore_compression: internal use only. When set, pointers are
        stepped over instead of being followed.

    :returns: a 2-tuple ``(decoded_name, remaining_bytes)``.
        ``decoded_name`` is dot-terminated (``b"."`` when no label was
        read). ``remaining_bytes`` is what follows the name in ``s``: when
        a pointer was met, it starts right after the first pointer.
    :raises Scapy_Exception: if a pointer has to be followed but ``full``
        is empty or missing.
    """
    max_length = len(s)
    # The result = the extracted name
    name = b""
    # Index right after the first pointer met (None while no pointer seen)
    after_pointer = None
    processed_pointers = []  # Used to check for decompression loops
    bytes_left = None
    _fullpacket = False  # becomes True once s has been swapped for full
    pointer = 0  # read offset in s; never negative
    while True:
        if pointer >= max_length:
            log_runtime.info(
                "DNS RR prematured end (ofs=%i, len=%i)", pointer, len(s)
            )
            break
        cur = s[pointer]  # length byte, or first byte of a pointer
        pointer += 1
        if cur & 0xc0:  # Label pointer
            if after_pointer is None:
                # A pointer is two bytes long: the remaining data starts
                # one byte past the current offset.
                after_pointer = pointer + 1
            if _ignore_compression:
                # Step over the second pointer byte and keep reading
                pointer += 1
                continue
            if pointer >= max_length:
                log_runtime.info(
                    "DNS incomplete jump token at (ofs=%i)", pointer
                )
                break
            if not full:
                raise Scapy_Exception("DNS message can't be compressed " +
                                      "at this point!")
            # Follow the pointer: 14-bit offset made of the low 6 bits of
            # the first byte and the whole second byte
            pointer = ((cur & ~0xc0) << 8) + s[pointer]
            if pointer in processed_pointers:
                warning("DNS decompression loop detected")
                break
            if len(processed_pointers) >= 20:
                warning("More than 20 jumps in a single DNS decompression ! "
                        "Dropping (evil packet)")
                break
            if not _fullpacket:
                # We switch our s buffer to full, so we need to remember
                # what was left in the original buffer
                bytes_left = s[after_pointer:]
                s = full
                max_length = len(s)
                _fullpacket = True
            processed_pointers.append(pointer)
            continue
        elif cur > 0:  # Label
            # cur = length of the label
            name += s[pointer:pointer + cur] + b"."
            pointer += cur
        else:  # End (zero-length root label)
            break
    if after_pointer is not None:
        # Use the real end index (not the one we followed)
        pointer = after_pointer
    if bytes_left is None:
        bytes_left = s[pointer:]
    # name, remaining
    return name or b".", bytes_left
199
200
def _is_ptr(x):
    """Tell whether ``x`` looks like an already-built DNS name.

    A value containing a dot is never considered built. Otherwise it is
    considered built when its last byte is zero, or when its
    second-to-last byte has both top bits set (compression pointer).
    """
    if b"." in x:
        return False
    if x and orb(x[-1]) == 0:
        return True
    return len(x) >= 2 and (orb(x[-2]) & 0xc0) == 0xc0
206
207
def dns_encode(x, check_built=False):
    """Encodes a bytes string into the DNS format

    Each dot-separated label is prefixed with its length on one byte, and
    the result is terminated by a zero byte (the empty root label).

    :param x: the string (bytes, labels separated by dots)
    :param check_built: detect already-built strings and ignore them
    :returns: the encoded bytes string
    """
    if not x or x == b".":
        return b"\x00"

    if check_built and _is_ptr(x):
        # The value has already been processed. Do not process it again
        return x

    # Truncate chunks that cannot be encoded (more than 63 bytes..)
    labels = [chunk[:63] for chunk in x.split(b".")]
    encoded = b"".join(
        struct.pack("!B", len(label)) + label for label in labels
    )
    # A trailing dot yields an empty last label, which already encodes as
    # the terminator. Test the label itself rather than the last output
    # byte: a final label ending with a NUL byte would otherwise be
    # mistaken for the terminator.
    if labels[-1]:
        encoded += b"\x00"
    return encoded
227
228
def DNSgetstr(*args, **kwargs):
    """Legacy function. Deprecated

    Emits a DeprecationWarning, forwards every argument to dns_get_str and
    returns its result with the last element removed.
    """
    warnings.warn(
        "DNSgetstr is deprecated. Use dns_get_str instead.",
        DeprecationWarning
    )
    result = dns_get_str(*args, **kwargs)
    return result[:-1]
236
237
def dns_compress(pkt):
    """This function compresses a DNS packet according to compression rules.

    Works on a copy of ``pkt``: names (or name suffixes) that were already
    seen earlier in the DNS layer are replaced by a two-byte pointer to
    their first occurrence.

    :param pkt: a packet containing a DNS layer
    :returns: the compressed copy. If the DNS layer had an underlayer, the
        underlayers with the compressed DNS layer re-attached; otherwise
        the DNS layer alone.
    :raises Scapy_Exception: if ``pkt`` has no DNS layer
    """
    if DNS not in pkt:
        raise Scapy_Exception("Can only compress DNS layers")
    pkt = pkt.copy()
    dns_pkt = pkt.getlayer(DNS)
    dns_pkt.clear_cache()
    # Uncompressed bytes of the DNS layer, used to locate name offsets
    build_pkt = raw(dns_pkt)

    def field_gen(dns_pkt):
        """Iterates through all DNS strings that can be compressed"""
        for lay in [dns_pkt.qd, dns_pkt.an, dns_pkt.ns, dns_pkt.ar]:
            if not lay:
                continue
            for current in lay:
                for field in current.fields_desc:
                    # MultipleTypeField values are only taken for types
                    # 2, 3, 4, 5, 12, 15, 39, 47, i.e. NS, MD, MF, CNAME,
                    # PTR, MX, DNAME and NSEC in dnstypes
                    if isinstance(field, DNSStrField) or \
                        (isinstance(field, MultipleTypeField) and
                         current.type in [2, 3, 4, 5, 12, 15, 39, 47]):
                        # Get the associated data and store it accordingly  # noqa: E501
                        dat = current.getfieldval(field.name)
                        yield current, field.name, dat

    def possible_shortens(dat):
        """Iterates through all possible compression parts in a DNS string"""
        if dat == b".":  # we'd lose by compressing it
            return
        # The full name first, then each suffix obtained by dropping
        # leading labels one at a time
        yield dat
        for x in range(1, dat.count(b".")):
            yield dat.split(b".", x)[x]
    # part -> [(layer, field name, pointer bytes, offset + 1),
    #          (layer, field name), ...]
    # The first tuple describes the first occurrence, the following ones
    # the fields that must point to it.
    data = {}
    for current, name, dat in field_gen(dns_pkt):
        for part in possible_shortens(dat):
            # Encode the data
            encoded = dns_encode(part, check_built=True)
            if part not in data:
                # We have no occurrence of such data, let's store it as a
                # possible pointer for future strings.
                # We get the index of the encoded data
                index = build_pkt.index(encoded)
                # The following is used to build correctly the pointer:
                # first byte = high bits of the offset with the two top
                # bits set, second byte = what remains of the offset
                fb_index = ((index >> 8) | 0xc0)
                sb_index = index - (256 * (fb_index - 0xc0))
                pointer = chb(fb_index) + chb(sb_index)
                data[part] = [(current, name, pointer, index + 1)]
            else:
                # This string already exists, let's mark the current field
                # with it, so that it gets compressed
                data[part].append((current, name))
                _in = data[part][0][3]
                # Swap the next occurrence found after the first one for
                # two placeholder bytes.
                # NOTE(review): presumably so that offsets computed later
                # match the compressed layout (a pointer is two bytes
                # long) - confirm
                build_pkt = build_pkt[:_in] + build_pkt[_in:].replace(
                    encoded,
                    b"\0\0",
                    1
                )
                # The longest known suffix was found: stop trying shorter
                # ones for this field
                break
    # Apply compression rules
    for ck in data:
        # compression_key is a DNS string
        replacements = data[ck]
        # replacements is the list of all tuples (layer, field name)
        # where this string was found
        replace_pointer = replacements.pop(0)[2]
        # replace_pointer is the packed pointer that should replace
        # those strings. Note that pop remove it from the list
        for rep in replacements:
            # setfieldval edits the value of the field in the layer
            val = rep[0].getfieldval(rep[1])
            assert val.endswith(ck)
            # Encode the labels preceding the shared suffix, and drop the
            # terminating zero byte since the pointer ends the name
            kept_string = dns_encode(val[:-len(ck)], check_built=True)[:-1]
            new_val = kept_string + replace_pointer
            rep[0].setfieldval(rep[1], new_val)
            # Remove any explicit rdlen; layers without one are skipped
            try:
                del rep[0].rdlen
            except AttributeError:
                pass
    # End of the compression algorithm
    # Destroy the previous DNS layer if needed
    if not isinstance(pkt, DNS) and pkt.getlayer(DNS).underlayer:
        pkt.getlayer(DNS).underlayer.remove_payload()
        return pkt / dns_pkt
    return dns_pkt
321
322
class DNSCompressedPacket(Packet):
    """
    Marker class for packets that hold DNSStrField fields and support
    DNS name compression.
    """
    @abc.abstractmethod
    def get_full(self):
        """
        Hook for subclasses. DNSStrField passes the value returned here to
        dns_get_str as the full packet used for decompression.
        """
330
331
class DNSStrField(StrLenField):
    """
    String field holding a DNS name: encodes on build, decodes (and
    decompresses) on dissection.
    Behaves as a StrLenField when a length_from is passed.
    """
    def any2i(self, pkt, x):
        """Convert a non-empty list item by item, else defer to the parent."""
        if isinstance(x, list) and x:
            return [self.h2i(pkt, item) for item in x]
        return super(DNSStrField, self).any2i(pkt, x)

    def h2i(self, pkt, x):
        """Normalise a user-supplied name to dot-terminated bytes."""
        # Setting a DNSStrField manually (h2i) means any current compression
        # will break: drop the cached raw bytes of the compressed parent.
        if pkt:
            holder = pkt.parent
            if isinstance(holder, DNSCompressedPacket) and \
                    holder.raw_packet_cache:
                holder.clear_cache()
        if not x:
            return b"."
        name = bytes_encode(x)
        # Dot-terminated or already-built values are kept untouched
        if name[-1:] == b"." or _is_ptr(name):
            return name
        return name + b"."

    def i2m(self, pkt, x):
        """Encode to wire format, leaving already-built values alone."""
        return dns_encode(x, check_built=True)

    def i2len(self, pkt, x):
        """Length of the wire encoding."""
        return len(self.i2m(pkt, x))

    def get_full(self, pkt):
        """
        Walk up through parent / underlayer until a DNSCompressedPacket is
        found and return its get_full(), or None if there is none.
        """
        owner = pkt
        while owner and not isinstance(owner, DNSCompressedPacket):
            owner = owner.parent or owner.underlayer
        return owner.get_full() if owner else None

    def getfield(self, pkt, s):
        """Dissect one name from s and return (remaining, decoded)."""
        trailing = b""
        if self.length_from:
            # Bounded mode: let StrLenField cut out our slice first
            trailing, s = super(DNSStrField, self).getfield(pkt, s)
        # Decode the compressed DNS message
        name, rest = dns_get_str(s, full=self.get_full(pkt))
        return rest + trailing, name
379
380
class DNSTextField(StrLenField):
    """
    StrLenField variant for DNS TEXT data (type 16): a list of
    length-prefixed character-strings.
    """

    islist = 1

    def i2h(self, pkt, x):
        """Show falsy values as an empty list."""
        return x if x else []

    def m2i(self, pkt, s):
        """Split the wire data into its character-strings."""
        chunks = []
        rest = s
        # Every string is preceded by one byte holding its size.
        while rest:
            span = orb(rest[0]) + 1  # size byte + string
            if span > len(rest):
                # Announced size exceeds the data: log it, keep what is there
                log_runtime.info(
                    "DNS RR TXT prematured end of character-string "
                    "(size=%i, remaining bytes=%i)", span, len(rest)
                )
            chunks.append(rest[1:span])
            rest = rest[span:]
        return chunks

    def any2i(self, pkt, x):
        """Wrap a lone str/bytes into a one-element list."""
        return [x] if isinstance(x, (str, bytes)) else x

    def i2len(self, pkt, x):
        """Length of the wire encoding."""
        return len(self.i2m(pkt, x))

    def i2m(self, pkt, s):
        """Encode each text as one or more length-prefixed strings."""
        out = []
        for text in s:
            if not text:
                # Empty text: a zero-length character-string
                out.append(b"\x00")
                continue
            data = bytes_encode(text)
            # One size byte can announce at most 255 bytes: emit full
            # 255-byte pieces first
            while len(data) >= 255:
                out.append(b"\xff" + data[:255])
                data = data[255:]
            # What remains is shorter than 255 bytes
            if data:
                out.append(struct.pack("!B", len(data)) + data)
        return b"".join(out)
433
434
435# RFC 2671 - Extension Mechanisms for DNS (EDNS0)
436
# EDNS0 option codes -> display names.
edns0types = {
    0: "Reserved",
    1: "LLQ",
    2: "UL",
    3: "NSID",
    4: "Owner",
    5: "DAU",
    6: "DHU",
    7: "N3U",
    8: "edns-client-subnet",
    10: "COOKIE",
    15: "Extended DNS Error",
}
440
441
class _EDNS0Dummy(Packet):
    name = "Dummy class that implements extract_padding()"

    def extract_padding(self, p):
        # type: (bytes) -> Tuple[bytes, Optional[bytes]]
        """Treat everything after this option as padding.

        :param p: the bytes left once the fields have been dissected
        :returns: (payload, padding): an empty payload, and ``p`` as padding
        """
        # The empty payload is bytes, as the type comment declares
        return b"", p
448
449
class EDNS0TLV(_EDNS0Dummy):
    """Generic EDNS0 option: code, length and raw data."""
    name = "DNS EDNS0 TLV"
    fields_desc = [
        ShortEnumField("optcode", 0, edns0types),
        FieldLenField("optlen", None, "optdata", fmt="H"),
        StrLenField("optdata", "", length_from=lambda tlv: tlv.optlen),
    ]

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        # type: (Optional[bytes], *Any, **Any) -> Type[Packet]
        """Pick the class used to dissect ``_pkt`` from its option code."""
        if _pkt is None:
            return EDNS0TLV
        if len(_pkt) < 2:
            # Too short to hold an option code
            return Raw
        (code,) = struct.unpack("!H", _pkt[:2])
        # Unknown codes stay generic TLVs
        return EDNS0OPT_DISPATCHER.get(code, EDNS0TLV)
466
467
class DNSRROPT(Packet):
    """OPT pseudo resource record; its rdata is a list of EDNS0 options."""
    name = "DNS OPT Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 41, dnstypes),
        ShortEnumField("rclass", 4096, dnsclasses),
        ByteField("extrcode", 0),
        # version 0 means EDNS0
        ByteField("version", 0),
        # D0 means DNSSEC OK from RFC 3225
        BitEnumField("z", 32768, 16, {32768: "D0"}),
        FieldLenField("rdlen", None, length_of="rdata", fmt="H"),
        PacketListField("rdata", [], EDNS0TLV,
                        length_from=lambda rr: rr.rdlen),
    ]
481
482
483# draft-cheshire-edns0-owner-option-01 - EDNS0 OWNER Option
484
class EDNS0OWN(_EDNS0Dummy):
    """EDNS0 Owner option (option code 4)."""
    name = "EDNS0 Owner (OWN)"
    fields_desc = [
        ShortEnumField("optcode", 4, edns0types),
        FieldLenField("optlen", None, count_of="primary_mac", fmt="H"),
        ByteField("v", 0),
        ByteField("s", 0),
        MACField("primary_mac", "00:00:00:00:00:00"),
        # Present only when optlen is at least 18
        ConditionalField(
            MACField("wakeup_mac", "00:00:00:00:00:00"),
            lambda own: (own.optlen or 0) >= 18,
        ),
        # Present only when optlen is at least 22
        ConditionalField(
            StrLenField("password", "",
                        length_from=lambda own: own.optlen - 18),
            lambda own: (own.optlen or 0) >= 22,
        ),
    ]

    def post_build(self, pkt, pay):
        """Fill in optlen (bytes 2-3) when it was left to None."""
        built = pkt + pay
        if self.optlen is not None:
            return built
        # Everything after the 4-byte optcode/optlen header counts
        return built[:2] + struct.pack("!H", len(built) - 4) + built[4:]
505
506
507# RFC 6975 - Signaling Cryptographic Algorithm Understanding in
508# DNS Security Extensions (DNSSEC)
509
class EDNS0DAU(_EDNS0Dummy):
    """EDNS0 option 5: list of one-byte DNSSEC algorithm codes."""
    name = "DNSSEC Algorithm Understood (DAU)"
    fields_desc = [
        ShortEnumField("optcode", 5, edns0types),
        # One byte per entry, so optlen is also the entry count
        FieldLenField("optlen", None, count_of="alg_code", fmt="H"),
        FieldListField(
            "alg_code", None,
            ByteEnumField("", 0, dnssecalgotypes),
            count_from=lambda opt: opt.optlen,
        ),
    ]
517
518
class EDNS0DHU(_EDNS0Dummy):
    """EDNS0 option 6: list of one-byte DS digest type codes."""
    name = "DS Hash Understood (DHU)"
    fields_desc = [
        ShortEnumField("optcode", 6, edns0types),
        # One byte per entry, so optlen is also the entry count
        FieldLenField("optlen", None, count_of="alg_code", fmt="H"),
        FieldListField(
            "alg_code", None,
            ByteEnumField("", 0, dnssecdigesttypes),
            count_from=lambda opt: opt.optlen,
        ),
    ]
526
527
class EDNS0N3U(_EDNS0Dummy):
    """EDNS0 option 7: list of one-byte NSEC3 hash algorithm codes."""
    name = "NSEC3 Hash Understood (N3U)"
    fields_desc = [
        ShortEnumField("optcode", 7, edns0types),
        # One byte per entry, so optlen is also the entry count
        FieldLenField("optlen", None, count_of="alg_code", fmt="H"),
        FieldListField(
            "alg_code", None,
            ByteEnumField("", 0, dnssecnsec3algotypes),
            count_from=lambda opt: opt.optlen,
        ),
    ]
535
536
537# RFC 7871 - Client Subnet in DNS Queries
538
class ClientSubnetv4(StrLenField):
    """
    Address field of the EDNS0 Client Subnet option (IPv4 flavour).

    On the wire the address is truncated: only the leading bytes are sent.
    ``length_from`` gives the prefix length in bits.
    """
    af_familly = socket.AF_INET
    af_length = 32
    af_default = b"\xc0"  # 192.0.0.0

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, I]
        """Read the truncated address and return (remaining, address)."""
        # RFC 7871 sect. 6: the address is padded with zero bits up to the
        # end of the last needed octet, hence ceil(prefix length / 8)
        # octets (a /20 occupies 3 octets, not 2).
        sz = (self.length_from(pkt) + 7) // 8
        sz = min(sz, self.af_length // 8)
        return s[sz:], self.m2i(pkt, s[:sz])

    def m2i(self, pkt, x):
        # type: (Optional[Packet], bytes) -> str
        """Zero-pad the truncated address to full size and render it."""
        full_size = self.af_length // 8
        # Always hand inet_ntop an address of exactly the family's size
        x = x[:full_size].ljust(full_size, b"\x00")
        return inet_ntop(self.af_familly, x)

    def _pack_subnet(self, subnet):
        # type: (bytes) -> bytes
        """Pack ``subnet`` and drop its trailing zero bytes."""
        packed_subnet = inet_pton(self.af_familly, plain_str(subnet))
        return packed_subnet.rstrip(b"\x00")

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net]]) -> bytes
        """Encode the address; None yields the family default."""
        if x is None:
            return self.af_default
        try:
            return self._pack_subnet(x)
        except (OSError, socket.error):
            # Not valid for this family: switch the packet to family 2 and
            # pack the value as IPv6
            pkt.family = 2
            return ClientSubnetv6("", "")._pack_subnet(x)

    def i2len(self, pkt, x):
        # type: (Packet, Any) -> int
        """Length in bytes of the wire encoding (same fallback as i2m)."""
        return len(self.i2m(pkt, x))


class ClientSubnetv6(ClientSubnetv4):
    """IPv6 flavour of ClientSubnetv4."""
    af_familly = socket.AF_INET6
    af_length = 128
    af_default = b"\x20"  # 2000::
592
593
class EDNS0ClientSubnet(_EDNS0Dummy):
    """EDNS0 Client Subnet option (option code 8)."""
    name = "DNS EDNS0 Client Subnet"
    fields_desc = [
        ShortEnumField("optcode", 8, edns0types),
        # Encoded address length + 4 bytes (family and the two plen fields)
        FieldLenField("optlen", None, "address", fmt="H",
                      adjust=lambda opt, size: size + 4),
        ShortField("family", 1),
        # Prefix length in bits: 8 per encoded address byte
        FieldLenField("source_plen", None, length_of="address", fmt="B",
                      adjust=lambda opt, size: size * 8),
        ByteField("scope_plen", 0),
        # family 1 -> IPv4 address, family 2 -> IPv6, otherwise IPv4
        MultipleTypeField(
            [
                (ClientSubnetv4("address", "192.168.0.0",
                                length_from=lambda opt: opt.source_plen),
                 lambda opt: opt.family == 1),
                (ClientSubnetv6("address", "2001:db8::",
                                length_from=lambda opt: opt.source_plen),
                 lambda opt: opt.family == 2),
            ],
            ClientSubnetv4("address", "192.168.0.0",
                           length_from=lambda opt: opt.source_plen),
        ),
    ]
614
615
class EDNS0COOKIE(_EDNS0Dummy):
    """EDNS0 COOKIE option (option code 10)."""
    name = "DNS EDNS0 COOKIE"
    fields_desc = [
        ShortEnumField("optcode", 10, edns0types),
        # Server cookie length + the 8 bytes of the client cookie
        FieldLenField("optlen", None, length_of="server_cookie", fmt="!H",
                      adjust=lambda opt, size: size + 8),
        XStrFixedLenField("client_cookie", b"\x00" * 8, length=8),
        # Whatever follows the client cookie, never a negative length
        XStrLenField("server_cookie", "",
                     length_from=lambda opt: max(0, opt.optlen - 8)),
    ]
624
625
626# RFC 8914 - Extended DNS Errors
627
628# https://www.iana.org/assignments/dns-parameters/dns-parameters.xhtml#extended-dns-error-codes
# Extended DNS Error info codes -> display names.
# The codes are contiguous from 0, so each name's position is its code.
extended_dns_error_codes = dict(enumerate((
    "Other",                                # 0
    "Unsupported DNSKEY Algorithm",         # 1
    "Unsupported DS Digest Type",           # 2
    "Stale Answer",                         # 3
    "Forged Answer",                        # 4
    "DNSSEC Indeterminate",                 # 5
    "DNSSEC Bogus",                         # 6
    "Signature Expired",                    # 7
    "Signature Not Yet Valid",              # 8
    "DNSKEY Missing",                       # 9
    "RRSIGs Missing",                       # 10
    "No Zone Key Bit Set",                  # 11
    "NSEC Missing",                         # 12
    "Cached Error",                         # 13
    "Not Ready",                            # 14
    "Blocked",                              # 15
    "Censored",                             # 16
    "Filtered",                             # 17
    "Prohibited",                           # 18
    "Stale NXDOMAIN Answer",                # 19
    "Not Authoritative",                    # 20
    "Not Supported",                        # 21
    "No Reachable Authority",               # 22
    "Network Error",                        # 23
    "Invalid Data",                         # 24
    "Signature Expired before Valid",       # 25
    "Too Early",                            # 26
    "Unsupported NSEC3 Iterations Value",   # 27
    "Unable to conform to policy",          # 28
    "Synthesized",                          # 29
)))
661
662
663# https://www.rfc-editor.org/rfc/rfc8914.html
class EDNS0ExtendedDNSError(_EDNS0Dummy):
    """EDNS0 Extended DNS Error option (option code 15)."""
    name = "DNS EDNS0 Extended DNS Error"
    fields_desc = [
        ShortEnumField("optcode", 15, edns0types),
        # Text length + the 2 bytes of info_code
        FieldLenField("optlen", None, length_of="extra_text", fmt="!H",
                      adjust=lambda opt, size: size + 2),
        ShortEnumField("info_code", 0, extended_dns_error_codes),
        StrLenField("extra_text", "",
                    length_from=lambda opt: opt.optlen - 2),
    ]
672
673
# Option code -> dedicated class, looked up by EDNS0TLV.dispatch_hook.
# Codes missing from this table are dissected as a generic EDNS0TLV.
EDNS0OPT_DISPATCHER = {
    4: EDNS0OWN,
    5: EDNS0DAU, 6: EDNS0DHU, 7: EDNS0N3U,
    8: EDNS0ClientSubnet,
    10: EDNS0COOKIE,
    15: EDNS0ExtendedDNSError,
}
683
684
685# RFC 4034 - Resource Records for the DNS Security Extensions
686
def bitmap2RRlist(bitmap):
    """
    Decode the 'Type Bit Maps' field of the NSEC Resource Record into an
    integer list.

    :param bitmap: the raw field, a sequence of window blocks
        (window number, bitmap length, bitmap bytes)
    :returns: the list of RR type numbers whose bit is set, or None when a
        block is truncated or announces an invalid length
    """
    # RFC 4034, 4.1.2. The Type Bit Maps Field

    RRlist = []
    # A bytearray yields integers directly when indexed or iterated
    data = bytearray(bitmap or b"")

    while data:

        if len(data) < 2:
            log_runtime.info("bitmap too short (%i)", len(data))
            return None

        window_block = data[0]  # window number
        bitmap_len = data[1]  # length of the bitmap in bytes
        offset = 256 * window_block  # first RR type of this window

        if bitmap_len <= 0 or bitmap_len > 32:
            log_runtime.info("bitmap length is no valid (%i)", bitmap_len)
            return None

        # Each set bit stands for one RR type; the most significant bit of
        # the first byte is the lowest type of the window
        for byte_idx, byte in enumerate(data[2:2 + bitmap_len]):
            for bit in range(8):
                if byte & (0x80 >> bit):
                    RRlist.append(offset + byte_idx * 8 + bit)

        # Next block if any
        data = data[2 + bitmap_len:]

    return RRlist
725
726
def RRlist2bitmap(lst):
    """
    Encode a list of integers representing Resource Records to a bitmap field
    used in the NSEC Resource Record.

    :param lst: iterable of RR type numbers. The absolute value of each
        entry is used; duplicates and values above 65535 are ignored.
    :returns: the encoded bytes: one block (window number, bitmap length,
        bitmap) per non-empty window of 256 types, in increasing window
        order. Empty bytes when nothing is left to encode.
    """
    # RFC 4034, 4.1.2. The Type Bit Maps Field

    # Normalise before sorting, so that the window grouping below sees the
    # values in their final order
    rr_types = sorted({abs(x) for x in lst if abs(x) <= 65535})

    chunks = []
    # The high byte of a type is its window, the low byte its bit position
    for window, members in itertools.groupby(rr_types,
                                             key=lambda rr: rr >> 8):
        window_bits = bytearray(32)  # a window holds at most 256 bits
        used = 0  # number of bitmap bytes actually needed
        for rr in members:
            byte_idx = (rr & 0xff) >> 3
            # Most significant bit first inside each byte
            window_bits[byte_idx] |= 0x80 >> (rr & 0x07)
            # members is ascending, so the last one sets the length
            used = byte_idx + 1
        chunks.append(struct.pack("BB", window, used))
        chunks.append(bytes(window_bits[:used]))

    return b"".join(chunks)
777
778
class RRlistField(StrField):
    """StrField storing an NSEC type bitmap, settable from a list of types."""
    islist = 1

    def h2i(self, pkt, x):
        """Encode a non-empty list into a bitmap; keep anything else."""
        if isinstance(x, list) and x:
            return RRlist2bitmap(x)
        return x

    def i2repr(self, pkt, x):
        """Show the bitmap as a list of type names when it decodes."""
        if not x:
            return "[]"
        bitmap = self.i2h(pkt, x)
        decoded = bitmap2RRlist(bitmap)
        if not decoded:
            # Undecodable or empty: fall back to the raw representation
            return repr(bitmap)
        # Types without a known name are shown as their number
        return [dnstypes.get(rr, rr) for rr in decoded]
793
794
class _DNSRRdummy(Packet):
    name = "Dummy class that implements post_build() for Resource Records"

    def post_build(self, pkt, pay):
        """Fill in the rdlen field when it was left to None."""
        if self.rdlen is not None:
            return pkt + pay

        # Length of the encoded rrname, the first field of the record
        name_len = len(
            self.fields_desc[0].i2m("", self.getfieldval("rrname"))
        )
        # rdlen sits 8 bytes after rrname and is 2 bytes long; the rdata
        # is everything that follows it
        rdlen_at = name_len + 8
        rdlen = struct.pack("!H", len(pkt) - name_len - 10)
        return pkt[:rdlen_at] + rdlen + pkt[rdlen_at + 2:] + pay

    def default_payload_class(self, payload):
        """Anything after the record is handed to the padding layer."""
        return conf.padding_layer
811
812
class DNSRRHINFO(_DNSRRdummy):
    """HINFO record (type 13): two length-prefixed strings, cpu and os."""
    name = "DNS HINFO Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 13, dnstypes),
        BitField("cacheflush", 0, 1),  # mDNS RFC 6762
        BitEnumField("rclass", 1, 15, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        FieldLenField("cpulen", None, fmt="!B", length_of="cpu"),
        StrLenField("cpu", "", length_from=lambda rr: rr.cpulen),
        FieldLenField("oslen", None, fmt="!B", length_of="os"),
        StrLenField("os", "", length_from=lambda rr: rr.oslen),
    ]
825
826
class DNSRRMX(_DNSRRdummy):
    """MX record (type 15): a preference and an exchange name."""
    name = "DNS MX Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 15, dnstypes),
        BitField("cacheflush", 0, 1),  # mDNS RFC 6762
        BitEnumField("rclass", 1, 15, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        ShortField("preference", 0),
        DNSStrField("exchange", ""),
    ]
838
839
class DNSRRSOA(_DNSRRdummy):
    """SOA record (type 6): two names followed by five 32-bit counters."""
    name = "DNS SOA Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 6, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        DNSStrField("mname", ""),
        DNSStrField("rname", ""),
        IntField("serial", 0),
        IntField("refresh", 0),
        IntField("retry", 0),
        IntField("expire", 0),
        IntField("minimum", 0),
    ]
855
856
class DNSRRRSIG(_DNSRRdummy):
    """
    RRSIG record (default type 46). The signature is the last field and
    takes whatever bytes remain in the record.
    """
    name = "DNS RRSIG Resource Record"
    fields_desc = [
        # Resource record header
        DNSStrField("rrname", ""),
        ShortEnumField("type", 46, dnstypes),
        BitField("cacheflush", 0, 1),  # mDNS RFC 6762
        BitEnumField("rclass", 1, 15, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        # Record data
        ShortEnumField("typecovered", 1, dnstypes),
        ByteEnumField("algorithm", 5, dnssecalgotypes),
        ByteField("labels", 0),
        IntField("originalttl", 0),
        UTCTimeField("expiration", 0),
        UTCTimeField("inception", 0),
        ShortField("keytag", 0),
        DNSStrField("signersname", ""),
        StrField("signature", ""),
    ]
875
876
class DNSRRNSEC(_DNSRRdummy):
    """NSEC record (default type 47): a domain name and a type bitmap list."""
    name = "DNS NSEC Resource Record"
    fields_desc = [
        # Resource record header
        DNSStrField("rrname", ""),
        ShortEnumField("type", 47, dnstypes),
        BitField("cacheflush", 0, 1),  # mDNS RFC 6762
        BitEnumField("rclass", 1, 15, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        # Record data
        DNSStrField("nextname", ""),
        RRlistField("typebitmaps", []),
    ]
888
889
class DNSRRDNSKEY(_DNSRRdummy):
    """
    DNSKEY record (default type 48). The public key is the last field and
    takes whatever bytes remain in the record.
    """
    name = "DNS DNSKEY Resource Record"
    fields_desc = [
        # Resource record header
        DNSStrField("rrname", ""),
        ShortEnumField("type", 48, dnstypes),
        BitField("cacheflush", 0, 1),  # mDNS RFC 6762
        BitEnumField("rclass", 1, 15, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        # Record data.
        # Named flags: S = Secure Entry Point, Z = Zone Key
        FlagsField("flags", 256, 16, "S???????Z???????"),
        ByteField("protocol", 3),
        ByteEnumField("algorithm", 5, dnssecalgotypes),
        StrField("publickey", ""),
    ]
905
906
class DNSRRDS(_DNSRRdummy):
    """
    DS record (default type 43). The digest is the last field and takes
    whatever bytes remain in the record.
    """
    name = "DNS DS Resource Record"
    fields_desc = [
        # Resource record header
        DNSStrField("rrname", ""),
        ShortEnumField("type", 43, dnstypes),
        BitField("cacheflush", 0, 1),  # mDNS RFC 6762
        BitEnumField("rclass", 1, 15, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        # Record data
        ShortField("keytag", 0),
        ByteEnumField("algorithm", 5, dnssecalgotypes),
        ByteEnumField("digesttype", 5, dnssecdigesttypes),
        StrField("digest", ""),
    ]
920
921
922# RFC 5074 - DNSSEC Lookaside Validation (DLV)
class DNSRRDLV(DNSRRDS):
    """DLV record: same fields as DNSRRDS, with the type forced to 32769."""
    name = "DNS DLV Resource Record"

    def __init__(self, *args, **kargs):
        DNSRRDS.__init__(self, *args, **kargs)
        requested_type = kargs.get('type', 0)
        if requested_type:
            # The caller picked a (truthy) type explicitly: keep it
            return
        self.type = 32769
930
931# RFC 5155 - DNS Security (DNSSEC) Hashed Authenticated Denial of Existence
932
933
class DNSRRNSEC3(_DNSRRdummy):
    """
    NSEC3 record (default type 50).

    "saltlength" and "hashlength" default to None so that they are computed
    from "salt" and "nexthashedownername" when the record is built, like
    the other length fields of this file (e.g. "cpulen" in HINFO). A value
    set explicitly, or read from the wire, is still used as is.
    """
    name = "DNS NSEC3 Resource Record"
    fields_desc = [
        # Resource record header
        DNSStrField("rrname", ""),
        ShortEnumField("type", 50, dnstypes),
        BitField("cacheflush", 0, 1),  # mDNS RFC 6762
        BitEnumField("rclass", 1, 15, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        # Record data
        ByteField("hashalg", 0),
        BitEnumField("flags", 0, 8, {1: "Opt-Out"}),
        ShortField("iterations", 0),
        # None (and not 0): let the length follow the actual salt
        FieldLenField("saltlength", None, fmt="!B", length_of="salt"),
        StrLenField("salt", "", length_from=lambda x: x.saltlength),
        # None (and not 0): let the length follow the actual hash
        FieldLenField("hashlength", None, fmt="!B", length_of="nexthashedownername"),  # noqa: E501
        StrLenField("nexthashedownername", "", length_from=lambda x: x.hashlength),  # noqa: E501
        RRlistField("typebitmaps", []),
    ]
951
952
class DNSRRNSEC3PARAM(_DNSRRdummy):
    """
    NSEC3PARAM record (default type 51).

    "saltlength" defaults to None so that it is computed from "salt" when
    the record is built, like the other length fields of this file (e.g.
    "cpulen" in HINFO). A value set explicitly, or read from the wire, is
    still used as is.
    """
    name = "DNS NSEC3PARAM Resource Record"
    fields_desc = [
        # Resource record header
        DNSStrField("rrname", ""),
        ShortEnumField("type", 51, dnstypes),
        BitField("cacheflush", 0, 1),  # mDNS RFC 6762
        BitEnumField("rclass", 1, 15, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        # Record data
        ByteField("hashalg", 0),
        ByteField("flags", 0),
        ShortField("iterations", 0),
        # None (and not 0): let the length follow the actual salt
        FieldLenField("saltlength", None, fmt="!B", length_of="salt"),
        StrLenField("salt", "", length_from=lambda pkt: pkt.saltlength),
    ]
967
968
969# RFC 9460 Service Binding and Parameter Specification via the DNS
970# https://www.rfc-editor.org/rfc/rfc9460.html
971
972
973# https://www.iana.org/assignments/dns-svcb/dns-svcb.xhtml
# Numeric SvcParamKey -> name, used to display SvcParam.key and the
# entries of a "mandatory" value
svc_param_keys = {
    0: "mandatory",
    1: "alpn",
    2: "no-default-alpn",
    3: "port",
    4: "ipv4hint",
    5: "ech",
    6: "ipv6hint",
    7: "dohpath",
    8: "ohttp",
}
985
986
class SvcParam(Packet):
    """
    One SvcParam: a 16-bit key, a 16-bit length, and a value whose
    decoding depends on the key.
    """
    name = "SvcParam"
    fields_desc = [
        ShortEnumField("key", 0, svc_param_keys),
        FieldLenField("len", None, length_of="value", fmt="H"),
        MultipleTypeField(
            [
                # mandatory: list of keys
                (
                    FieldListField("value", [],
                                   ShortEnumField("", 0, svc_param_keys),
                                   length_from=lambda pkt: pkt.len),
                    lambda pkt: pkt.key == 0,
                ),
                # alpn, no-default-alpn
                (
                    DNSTextField("value", [],
                                 length_from=lambda pkt: pkt.len),
                    lambda pkt: pkt.key in (1, 2),
                ),
                # port
                (
                    ShortField("value", 0),
                    lambda pkt: pkt.key == 3,
                ),
                # ipv4hint: list of IPv4 addresses
                (
                    FieldListField("value", [],
                                   IPField("", "0.0.0.0"),
                                   length_from=lambda pkt: pkt.len),
                    lambda pkt: pkt.key == 4,
                ),
                # ipv6hint: list of IPv6 addresses
                (
                    FieldListField("value", [],
                                   IP6Field("", "::"),
                                   length_from=lambda pkt: pkt.len),
                    lambda pkt: pkt.key == 6,
                ),
            ],
            # Any other key: raw bytes
            StrLenField("value", "", length_from=lambda pkt: pkt.len),
        ),
    ]

    def extract_padding(self, p):
        # No payload: hand everything that is left back as padding
        return "", p
1021
1022
class DNSRRSVCB(_DNSRRdummy):
    """
    SVCB record (default type 64): a priority, a target name and a list
    of SvcParam.
    """
    name = "DNS SVCB Resource Record"
    fields_desc = [
        # Resource record header
        DNSStrField("rrname", ""),
        ShortEnumField("type", 64, dnstypes),
        BitField("cacheflush", 0, 1),  # mDNS RFC 6762
        BitEnumField("rclass", 1, 15, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        # Record data
        ShortField("svc_priority", 0),
        DNSStrField("target_name", ""),
        PacketListField("svc_params", [], SvcParam),
    ]
1034
1035
class DNSRRHTTPS(_DNSRRdummy):
    """HTTPS record: the SVCB layout with a default type of 65."""
    name = "DNS HTTPS Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 65, dnstypes),
        # Everything after "type" is shared with DNSRRSVCB
        *DNSRRSVCB.fields_desc[2:],
    ]
1041
1042
1043# RFC 2782 - A DNS RR for specifying the location of services (DNS SRV)
1044
1045
class DNSRRSRV(_DNSRRdummy):
    """
    SRV record (default type 33): priority, weight, port and a target
    domain name.
    """
    name = "DNS SRV Resource Record"
    fields_desc = [
        # Resource record header
        DNSStrField("rrname", ""),
        ShortEnumField("type", 33, dnstypes),
        BitField("cacheflush", 0, 1),  # mDNS RFC 6762
        BitEnumField("rclass", 1, 15, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        # Record data
        ShortField("priority", 0),
        ShortField("weight", 0),
        ShortField("port", 0),
        DNSStrField("target", ""),
    ]
1058
1059
1060# RFC 2845 - Secret Key Transaction Authentication for DNS (TSIG)
# TSIG algorithm name -> size in bytes
tsig_algo_sizes = {
    "HMAC-MD5.SIG-ALG.REG.INT": 16,
    "hmac-sha1": 20,
}
1063
1064
class TimeSignedField(Field[int, bytes]):
    """
    48-bit time field: a number of seconds since 1-Jan-70 UTC, packed as
    6 bytes in network byte order (16 high bits, then 32 low bits).
    """
    def __init__(self, name, default):
        Field.__init__(self, name, default, fmt="6s")

    def _convert_seconds(self, packed_seconds):
        """Unpack the 6-byte machine representation into seconds."""
        high, low = struct.unpack("!HI", packed_seconds)
        # The first 2 bytes are bits 32-47 of the value (see i2m): they
        # must be shifted back in place, not simply added to the low part.
        return (high << 32) | low

    def i2m(self, pkt, seconds):
        """Convert the number of seconds since 1-Jan-70 UTC to the packed
           representation."""

        if seconds is None:
            seconds = 0

        tmp_short = (seconds >> 32) & 0xFFFF
        tmp_int = seconds & 0xFFFFFFFF

        return struct.pack("!HI", tmp_short, tmp_int)

    def m2i(self, pkt, packed_seconds):
        """Convert the packed representation to the number of seconds
           since 1-Jan-70 UTC (None is passed through)."""

        if packed_seconds is None:
            return None

        return self._convert_seconds(packed_seconds)

    def i2repr(self, pkt, packed_seconds):
        """Convert the internal representation to a nice one using the RFC
           format.

           Despite its name, 'packed_seconds' is handed straight to
           time.gmtime(), i.e. it is a number of seconds, not 6 bytes."""
        time_struct = time.gmtime(packed_seconds)
        return time.strftime("%a %b %d %H:%M:%S %Y", time_struct)
1101
1102
class DNSRRTSIG(_DNSRRdummy):
    """TSIG record (default type 250)."""
    name = "DNS TSIG Resource Record"
    fields_desc = [
        # Resource record header (rclass is a plain 16-bit field here)
        DNSStrField("rrname", ""),
        ShortEnumField("type", 250, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        # Record data
        DNSStrField("algo_name", "hmac-sha1"),
        TimeSignedField("time_signed", 0),
        ShortField("fudge", 0),
        # NOTE(review): mac_len / other_len default to 20 / 0 rather than
        # None, so they presumably have to be set by hand when mac_data /
        # other_data are provided - confirm against FieldLenField.
        FieldLenField("mac_len", 20, fmt="!H", length_of="mac_data"),
        StrLenField("mac_data", "", length_from=lambda rr: rr.mac_len),
        ShortField("original_id", 0),
        ShortField("error", 0),
        FieldLenField("other_len", 0, fmt="!H", length_of="other_data"),
        StrLenField("other_data", "", length_from=lambda rr: rr.other_len),
    ]
1120
1121
class DNSRRNAPTR(_DNSRRdummy):
    """
    NAPTR record (default type 35): order, preference, three 1-byte
    length-prefixed strings and a replacement domain name.
    """
    name = "DNS NAPTR Resource Record"
    fields_desc = [
        # Resource record header
        DNSStrField("rrname", ""),
        ShortEnumField("type", 35, dnstypes),
        BitField("cacheflush", 0, 1),  # mDNS RFC 6762
        BitEnumField("rclass", 1, 15, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        # Record data
        ShortField("order", 0),
        ShortField("preference", 0),
        FieldLenField("flags_len", None, fmt="!B", length_of="flags"),
        StrLenField("flags", "", length_from=lambda rr: rr.flags_len),
        FieldLenField("services_len", None, fmt="!B", length_of="services"),
        StrLenField("services", "", length_from=lambda rr: rr.services_len),
        FieldLenField("regexp_len", None, fmt="!B", length_of="regexp"),
        StrLenField("regexp", "", length_from=lambda rr: rr.regexp_len),
        DNSStrField("replacement", ""),
    ]
1141
1142
# RR type -> dedicated record class. Types that are not listed here are
# dissected with the generic DNSRR (see _DNSRR).
DNSRR_DISPATCHER = {
    # RFC 1035
    6: DNSRRSOA,
    13: DNSRRHINFO,
    15: DNSRRMX,
    # RFC 2782
    33: DNSRRSRV,
    # RFC 2915
    35: DNSRRNAPTR,
    # RFC 2671 (EDNS0)
    41: DNSRROPT,
    # RFC 4034
    43: DNSRRDS,
    46: DNSRRRSIG,
    47: DNSRRNSEC,
    48: DNSRRDNSKEY,
    # RFC 5155
    50: DNSRRNSEC3,
    51: DNSRRNSEC3PARAM,
    # RFC 9460
    64: DNSRRSVCB,
    65: DNSRRHTTPS,
    # RFC 2845
    250: DNSRRTSIG,
    # RFC 4431
    32769: DNSRRDLV,
}
1161
1162
class DNSRR(Packet):
    """
    Generic resource record: "rdata" is decoded according to "type", and
    kept as raw bytes for the types that are not listed below.
    """
    name = "DNS Resource Record"
    show_indent = 0
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 1, dnstypes),
        BitField("cacheflush", 0, 1),  # mDNS RFC 6762
        BitEnumField("rclass", 1, 15, dnsclasses),
        IntField("ttl", 0),
        FieldLenField("rdlen", None, length_of="rdata", fmt="H"),
        MultipleTypeField(
            [
                # A
                (
                    IPField("rdata", "0.0.0.0"),
                    lambda pkt: pkt.type == 1,
                ),
                # AAAA
                (
                    IP6Field("rdata", "::"),
                    lambda pkt: pkt.type == 28,
                ),
                # NS, MD, MF, CNAME, PTR, DNAME
                (
                    DNSStrField("rdata", "",
                                length_from=lambda pkt: pkt.rdlen),
                    lambda pkt: pkt.type in [2, 3, 4, 5, 12, 39],
                ),
                # TEXT
                (
                    DNSTextField("rdata", [""],
                                 length_from=lambda pkt: pkt.rdlen),
                    lambda pkt: pkt.type == 16,
                ),
            ],
            # Anything else: rdlen raw bytes
            StrLenField("rdata", "", length_from=lambda pkt: pkt.rdlen),
        ),
    ]

    def default_payload_class(self, payload):
        """Whatever follows the record is decoded as conf.padding_layer."""
        padding_cls = conf.padding_layer
        return padding_cls
1195
1196
def _DNSRR(s, **kwargs):
    """
    DNSRR dispatcher func

    Pick the record class from the type found right after the name (the
    generic DNSRR when the type is not in DNSRR_DISPATCHER), dissect one
    record and chain the remaining bytes as conf.padding_layer.
    Returns None when 's' is empty.
    """
    if not s:
        return None

    # Skip the name (without following compression pointers)
    _, after_name = dns_get_str(s, _ignore_compression=True)
    # type is the first 16-bit word after the name, rdlen the fifth one
    rrtype, = struct.unpack("!H", after_name[:2])
    rr_cls = DNSRR_DISPATCHER.get(rrtype, DNSRR)
    rdlen, = struct.unpack("!H", after_name[8:10])
    name_len = len(s) - len(after_name)
    rrlen = name_len + 10 + rdlen

    pkt = rr_cls(s[:rrlen], **kwargs) / conf.padding_layer(s[rrlen:])
    # drop rdlen because if rdata was compressed, it will break everything
    # when rebuilding
    del pkt.fields["rdlen"]
    return pkt
1219
1220
class DNSQR(Packet):
    """Question record: a name, a 16-bit type and a 16-bit class word."""
    name = "DNS Question Record"
    show_indent = 0
    fields_desc = [
        DNSStrField("qname", "www.example.com"),
        ShortEnumField("qtype", 1, dnsqtypes),
        # The top bit of the class word is a separate flag
        BitField("unicastresponse", 0, 1),  # mDNS RFC 6762
        BitEnumField("qclass", 1, 15, dnsclasses),
    ]

    def default_payload_class(self, payload):
        """Whatever follows the record is decoded as conf.padding_layer."""
        padding_cls = conf.padding_layer
        return padding_cls
1231
1232
class _DNSPacketListField(PacketListField):
    # A normal PacketListField with backward-compatible hacks
    def any2i(self, pkt, x):
        # type: (Optional[Packet], List[Any]) -> List[Any]
        """Accept the legacy None value (with a warning) as an empty list."""
        if x is None:
            warnings.warn(
                ("The DNS fields 'qd', 'an', 'ns' and 'ar' are now "
                 "PacketListField(s) ! "
                 "Setting a null default should be [] instead of None"),
                DeprecationWarning
            )
            x = []
        return super(_DNSPacketListField, self).any2i(pkt, x)

    def i2h(self, pkt, x):
        # type: (Optional[Packet], List[Packet]) -> Any
        """
        Return the records as a list that also forwards unknown attributes
        to its first element (with a DeprecationWarning).
        """
        class _list(list):
            """
            Fake list object to provide compatibility with older DNS fields
            """
            def __getattr__(self, attr):
                try:
                    first = self[0]
                except IndexError:
                    # Empty list: there is nothing to delegate to. This
                    # must be an AttributeError (not an IndexError) so that
                    # hasattr() and getattr(x, name, default) keep working.
                    raise AttributeError(
                        "Empty DNS record list has no attribute '%s'" % attr
                    )
                # An AttributeError raised here simply propagates
                ret = getattr(first, attr)
                warnings.warn(
                    ("The DNS fields 'qd', 'an', 'ns' and 'ar' are now "
                     "PacketListField(s) ! "
                     "To access the first element, use pkt.an[0] instead of "
                     "pkt.an"),
                    DeprecationWarning
                )
                return ret
        return _list(x)
1267
1268
class DNS(DNSCompressedPacket):
    """
    DNS message. When the underlayer is TCP, the message starts with an
    extra 2-byte "length" field.
    """
    name = "DNS"
    fields_desc = [
        # Only present over TCP
        ConditionalField(ShortField("length", None),
                         lambda p: isinstance(p.underlayer, TCP)),
        ShortField("id", 0),
        # Flags
        BitField("qr", 0, 1),
        BitEnumField("opcode", 0, 4, {0: "QUERY", 1: "IQUERY", 2: "STATUS"}),
        BitField("aa", 0, 1),
        BitField("tc", 0, 1),
        BitField("rd", 1, 1),
        BitField("ra", 0, 1),
        BitField("z", 0, 1),
        # AD and CD bits are defined in RFC 2535
        BitField("ad", 0, 1),  # Authentic Data
        BitField("cd", 0, 1),  # Checking Disabled
        BitEnumField("rcode", 0, 4, {0: "ok", 1: "format-error",
                                     2: "server-failure", 3: "name-error",
                                     4: "not-implemented", 5: "refused"}),
        # Number of records in each of the four sections below
        FieldLenField("qdcount", None, count_of="qd"),
        FieldLenField("ancount", None, count_of="an"),
        FieldLenField("nscount", None, count_of="ns"),
        FieldLenField("arcount", None, count_of="ar"),
        # Sections
        _DNSPacketListField("qd", [DNSQR()], DNSQR, count_from=lambda pkt: pkt.qdcount),
        _DNSPacketListField("an", [], _DNSRR, count_from=lambda pkt: pkt.ancount),
        _DNSPacketListField("ns", [], _DNSRR, count_from=lambda pkt: pkt.nscount),
        _DNSPacketListField("ar", [], _DNSRR, count_from=lambda pkt: pkt.arcount),
    ]

    def get_full(self):
        # Required for DNSCompressedPacket
        # Over TCP, skip the 2-byte length that precedes the message
        over_tcp = isinstance(self.underlayer, TCP)
        return self.original[2:] if over_tcp else self.original

    def answers(self, other):
        """A response (qr=1) answers a query (qr=0) that has the same id."""
        if not isinstance(other, DNS):
            return False
        return (self.id == other.id and
                self.qr == 1 and
                other.qr == 0)

    def mysummary(self):
        """
        One-line summary: "DNS" (or "mDNS" over UDP to port 5353), then
        Qry/Ans, then the first question name, the first answer rdata or
        the rcode.
        """
        detail = ""
        if self.qr:
            kind = "Ans"
            if self.an and isinstance(self.an[0], DNSRR):
                detail = ' %s' % self.an[0].rdata
            elif self.rcode != 0:
                detail = self.sprintf(' %rcode%')
        else:
            kind = "Qry"
            if self.qd and isinstance(self.qd[0], DNSQR):
                detail = ' %s' % self.qd[0].qname
        if isinstance(self.underlayer, UDP) and self.underlayer.dport == 5353:
            prefix = "m"
        else:
            prefix = ""
        return "%sDNS %s%s" % (prefix, kind, detail)

    def post_build(self, pkt, pay):
        """Over TCP, fill in the length field when it was left to None."""
        if isinstance(self.underlayer, TCP) and self.length is None:
            # The length does not count its own 2 bytes
            msg_len = len(pkt) - 2
            pkt = struct.pack("!H", msg_len) + pkt[2:]
        return pkt + pay

    def compress(self):
        """Return the compressed DNS packet (using `dns_compress()`)"""
        return dns_compress(self)

    def pre_dissect(self, s):
        """
        Check that a valid DNS over TCP message can be decoded

        :raise Scapy_Exception: over TCP, when 's' is shorter than 2 bytes
            or when the announced length is invalid.
        """
        if not isinstance(self.underlayer, TCP):
            return s

        # Compute the length of the DNS packet
        if len(s) < 2:
            message = "Malformed DNS message: too small!"
            log_runtime.info(message)
            raise Scapy_Exception(message)
        dns_len = struct.unpack("!H", s[:2])[0]

        # Check if the length is valid
        if dns_len < 14 or len(s) < dns_len:
            message = "Malformed DNS message: invalid length!"
            log_runtime.info(message)
            raise Scapy_Exception(message)

        return s
1361
1362
# DNS over UDP. The call order is kept as is: port 5353 first (dport then
# sport), then port 53.
for _dns_port in (5353, 53):
    bind_layers(UDP, DNS, dport=_dns_port)
    bind_layers(UDP, DNS, sport=_dns_port)
del _dns_port
# Default destination addresses for UDP packets sent to port 5353
DestIPField.bind_addr(UDP, "224.0.0.251", dport=5353)
if conf.ipv6_enabled:
    from scapy.layers.inet6 import DestIP6Field
    DestIP6Field.bind_addr(UDP, "ff02::fb", dport=5353)
# DNS over TCP
bind_layers(TCP, DNS, dport=53)
bind_layers(TCP, DNS, sport=53)

# Nameserver config
conf.nameservers = read_nameservers()
# Cache used by dns_resolve() (300: presumably a timeout in seconds)
_dns_cache = conf.netcache.new_cache("dns_cache", 300)
1377
1378
@conf.commands.register
def dns_resolve(qname, qtype="A", raw=False, verbose=1, timeout=3, **kwargs):
    """
    Perform a simple DNS resolution using conf.nameservers with caching

    :param qname: the name to query
    :param qtype: the type to query (default A)
    :param raw: return the whole DNS packet (default False)
    :param verbose: show verbose errors
    :param timeout: seconds until timeout (per server)
    :param kwargs: extra keyword arguments passed to the socket's sr1()
    :return: the whole DNS response if 'raw' is set, else the list of the
        records of the answer, authority and additional sections whose
        type is 'qtype' (possibly empty).
    :raise TimeoutError: if no DNS servers were reached in time (or if
        they all answered with a server failure).
    """
    # Unify types
    qtype = DNSQR.qtype.any2i_one(None, qtype)
    qname = DNSQR.qname.any2i(None, qname)
    # Check cache
    # The type is a 16-bit value: pack it as "!H" ("!B" fails above 255)
    cache_ident = b";".join(
        [qname, struct.pack("!H", qtype)] +
        ([b"raw"] if raw else [])
    )
    result = _dns_cache.get(cache_ident)
    if result:
        return result

    kwargs.setdefault("timeout", timeout)
    kwargs.setdefault("verbose", 0)
    res = None
    for nameserver in conf.nameservers:
        # Try all nameservers
        sock = None
        try:
            # Spawn a UDP socket, connect to the nameserver on port 53
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sock.settimeout(kwargs["timeout"])
            sock.connect((nameserver, 53))
            # Connected. Wrap it with DNS
            sock = StreamSocket(sock, DNS)
            # I/O
            res = sock.sr1(
                DNS(qd=[DNSQR(qname=qname, qtype=qtype)], id=RandShort()),
                **kwargs,
            )
        except IOError as ex:
            if verbose:
                log_runtime.warning(str(ex))
            continue
        finally:
            # sock is still None if the socket could not even be created
            if sock is not None:
                sock.close()
        if res:
            # We have a response ! Check for failure
            if res[DNS].rcode == 2:  # server failure
                res = None
                if verbose:
                    log_runtime.info(
                        "DNS: %s answered with failure for %s" % (
                            nameserver,
                            qname,
                        )
                    )
            else:
                break
    if res is None:
        raise TimeoutError

    if raw:
        # Raw
        result = res
    else:
        # Find answers
        result = [
            x
            for x in itertools.chain(res.an, res.ns, res.ar)
            if x.type == qtype
        ]
    if result:
        # Cache it
        _dns_cache[cache_ident] = result
    return result
1456
1457
@conf.commands.register
def dyndns_add(nameserver, name, rdata, type="A", ttl=10):
    """Send a DNS add message to a nameserver for "name" to have a new "rdata"
dyndns_add(nameserver, name, rdata, type="A", ttl=10) -> result code (0=ok)

The zone is everything after the first dot of "name". The record is sent
with the requested "type" and "ttl". -1 is returned when no DNS answer was
received within 5 seconds.

example: dyndns_add("ns1.toto.com", "dyn.toto.com", "127.0.0.1")
RFC2136
"""
    zone = name[name.find(".") + 1:]
    update = DNS(
        opcode=5,
        qd=[DNSQR(qname=zone, qtype="SOA")],
        # Use the requested type (it used to be hard-coded to "A")
        ns=[DNSRR(rrname=name, type=type, ttl=ttl, rdata=rdata)],
    )
    r = sr1(IP(dst=nameserver) / UDP() / update, verbose=0, timeout=5)
    if r and r.haslayer(DNS):
        return r.getlayer(DNS).rcode
    return -1
1476
1477
@conf.commands.register
def dyndns_del(nameserver, name, type="ALL", ttl=10):
    """Send a DNS delete message to a nameserver for "name"
dyndns_del(nameserver, name, type="ALL", ttl=10) -> result code (0=ok)

The zone is everything after the first dot of "name". The record is always
sent with class "ANY", a ttl of 0 and an empty rdata: the "ttl" argument
is not used. -1 is returned when no DNS answer was received within
5 seconds.

example: dyndns_del("ns1.toto.com", "dyn.toto.com")
RFC2136
"""
    zone = name[name.find(".") + 1:]
    deletion = DNS(
        opcode=5,
        qd=[DNSQR(qname=zone, qtype="SOA")],
        ns=[DNSRR(rrname=name, type=type, rclass="ANY", ttl=0, rdata="")],
    )
    r = sr1(IP(dst=nameserver) / UDP() / deletion, verbose=0, timeout=5)
    if not (r and r.haslayer(DNS)):
        return -1
    return r.getlayer(DNS).rcode
1496
1497
1498class DNS_am(AnsweringMachine):
1499    function_name = "dnsd"
1500    filter = "udp port 53"
1501    cls = DNS  # We also use this automaton for llmnrd / mdnsd
1502
1503    def parse_options(self, joker=None,
1504                      match=None,
1505                      srvmatch=None,
1506                      joker6=False,
1507                      send_error=False,
1508                      relay=False,
1509                      from_ip=True,
1510                      from_ip6=False,
1511                      src_ip=None,
1512                      src_ip6=None,
1513                      ttl=10,
1514                      jokerarpa=False):
1515        """
1516        Simple DNS answering machine.
1517
1518        :param joker: default IPv4 for unresolved domains.
1519                      Set to False to disable, None to mirror the interface's IP.
1520                      Defaults to None, unless 'match' is used, then it defaults to
1521                      False.
1522        :param joker6: default IPv6 for unresolved domains.
1523                       Set to False to disable, None to mirror the interface's IPv6.
1524                       Defaults to False.
1525        :param match: queries to match.
1526                      This can be a dictionary of {name: val} where name is a string
1527                      representing a domain name (A, AAAA) and val is a tuple of 2
1528                      elements, each representing an IP or a list of IPs. If val is
1529                      a single element, (A, None) is assumed.
1530                      This can also be a list or names, in which case joker(6) are
1531                      used as a response.
1532        :param jokerarpa: answer for .in-addr.arpa PTR requests. (Default: False)
1533        :param relay: relay unresolved domains to conf.nameservers (Default: False).
1534        :param send_error: send an error message when this server can't answer
1535                           (Default: False)
1536        :param srvmatch: a dictionary of {name: (port, target)} used for SRV
1537        :param from_ip: an source IP to filter. Can contain a netmask. True for all,
1538                        False for none. Default True
1539        :param from_ip6: an source IPv6 to filter. Can contain a netmask. True for all,
1540                        False for none. Default False
1541        :param ttl: the DNS time to live (in seconds)
1542        :param src_ip: override the source IP
1543        :param src_ip6:
1544
1545        Examples:
1546
1547        - Answer all 'A' and 'AAAA' requests::
1548
1549            $ sudo iptables -I OUTPUT -p icmp --icmp-type 3/3 -j DROP
1550            >>> dnsd(joker="192.168.0.2", joker6="fe80::260:8ff:fe52:f9d8",
1551            ...      iface="eth0")
1552
1553        - Answer only 'A' query for google.com with 192.168.0.2::
1554
1555            >>> dnsd(match={"google.com": "192.168.0.2"}, iface="eth0")
1556
1557        - Answer DNS for a Windows domain controller ('SRV', 'A' and 'AAAA')::
1558
1559            >>> dnsd(
1560            ...     srvmatch={
1561            ...         "_ldap._tcp.dc._msdcs.DOMAIN.LOCAL.": (389,
1562            ...                                                "srv1.domain.local"),
1563            ...     },
1564            ...     match={"src1.domain.local": ("192.168.0.102",
1565            ...                                  "fe80::260:8ff:fe52:f9d8")},
1566            ... )
1567
1568        - Relay all queries to another DNS server, except some::
1569
1570            >>> conf.nameservers = ["1.1.1.1"]  # server to relay to
1571            >>> dnsd(
1572            ...     match={"test.com": "1.1.1.1"},
1573            ...     relay=True,
1574            ... )
1575        """
1576        from scapy.layers.inet6 import Net6
1577
1578        self.mDNS = isinstance(self, mDNS_am)
1579        self.llmnr = self.cls != DNS
1580
1581        # Add some checks (to help)
1582        if not isinstance(joker, (str, bool)) and joker is not None:
1583            raise ValueError("Bad 'joker': should be an IPv4 (str) or False !")
1584        if not isinstance(joker6, (str, bool)) and joker6 is not None:
1585            raise ValueError("Bad 'joker6': should be an IPv6 (str) or False !")
1586        if not isinstance(jokerarpa, (str, bool)):
1587            raise ValueError("Bad 'jokerarpa': should be a hostname or False !")
1588        if not isinstance(from_ip, (str, Net, bool)):
1589            raise ValueError("Bad 'from_ip': should be an IPv4 (str), Net or False !")
1590        if not isinstance(from_ip6, (str, Net6, bool)):
1591            raise ValueError("Bad 'from_ip6': should be an IPv6 (str), Net or False !")
1592        if self.mDNS and src_ip:
1593            raise ValueError("Cannot use 'src_ip' in mDNS !")
1594        if self.mDNS and src_ip6:
1595            raise ValueError("Cannot use 'src_ip6' in mDNS !")
1596
1597        if joker is None and match is not None:
1598            joker = False
1599        self.joker = joker
1600        self.joker6 = joker6
1601        self.jokerarpa = jokerarpa
1602
1603        def normv(v):
1604            if isinstance(v, (tuple, list)) and len(v) == 2:
1605                return tuple(v)
1606            elif isinstance(v, str):
1607                return (v, joker6)
1608            else:
1609                raise ValueError("Bad match value: '%s'" % repr(v))
1610
1611        def normk(k):
1612            k = bytes_encode(k).lower()
1613            if not k.endswith(b"."):
1614                k += b"."
1615            return k
1616
1617        self.match = collections.defaultdict(lambda: (joker, joker6))
1618        if match:
1619            if isinstance(match, (list, set)):
1620                self.match.update({normk(k): (None, None) for k in match})
1621            else:
1622                self.match.update({normk(k): normv(v) for k, v in match.items()})
1623        if srvmatch is None:
1624            self.srvmatch = {}
1625        else:
1626            self.srvmatch = {normk(k): normv(v) for k, v in srvmatch.items()}
1627
1628        self.send_error = send_error
1629        self.relay = relay
1630        if isinstance(from_ip, str):
1631            self.from_ip = Net(from_ip)
1632        else:
1633            self.from_ip = from_ip
1634        if isinstance(from_ip6, str):
1635            self.from_ip6 = Net6(from_ip6)
1636        else:
1637            self.from_ip6 = from_ip6
1638        self.src_ip = src_ip
1639        self.src_ip6 = src_ip6
1640        self.ttl = ttl
1641
1642    def is_request(self, req):
1643        from scapy.layers.inet6 import IPv6
1644        return (
1645            req.haslayer(self.cls) and
1646            req.getlayer(self.cls).qr == 0 and (
1647                (
1648                    self.from_ip6 is True or
1649                    (self.from_ip6 and req[IPv6].src in self.from_ip6)
1650                )
1651                if IPv6 in req else
1652                (
1653                    self.from_ip is True or
1654                    (self.from_ip and req[IP].src in self.from_ip)
1655                )
1656            )
1657        )
1658
1659    def make_reply(self, req):
1660        # Build reply from the request
1661        resp = req.copy()
1662        if Ether in req:
1663            if self.mDNS:
1664                resp[Ether].src, resp[Ether].dst = None, None
1665            elif self.llmnr:
1666                resp[Ether].src, resp[Ether].dst = None, req[Ether].src
1667            else:
1668                resp[Ether].src, resp[Ether].dst = (
1669                    None if req[Ether].dst == "ff:ff:ff:ff:ff:ff" else req[Ether].dst,
1670                    req[Ether].src,
1671                )
1672        from scapy.layers.inet6 import IPv6
1673        if IPv6 in req:
1674            resp[IPv6].underlayer.remove_payload()
1675            if self.mDNS:
1676                # "All Multicast DNS responses (including responses sent via unicast)
1677                # SHOULD be sent with IP TTL set to 255."
1678                resp /= IPv6(dst="ff02::fb", src=self.src_ip6,
1679                             fl=req[IPv6].fl, hlim=255)
1680            elif self.llmnr:
1681                resp /= IPv6(dst=req[IPv6].src, src=self.src_ip6,
1682                             fl=req[IPv6].fl, hlim=req[IPv6].hlim)
1683            else:
1684                resp /= IPv6(dst=req[IPv6].src, src=self.src_ip6 or req[IPv6].dst,
1685                             fl=req[IPv6].fl, hlim=req[IPv6].hlim)
1686        elif IP in req:
1687            resp[IP].underlayer.remove_payload()
1688            if self.mDNS:
1689                # "All Multicast DNS responses (including responses sent via unicast)
1690                # SHOULD be sent with IP TTL set to 255."
1691                resp /= IP(dst="224.0.0.251", src=self.src_ip,
1692                           id=req[IP].id, ttl=255)
1693            elif self.llmnr:
1694                resp /= IP(dst=req[IP].src, src=self.src_ip,
1695                           id=req[IP].id, ttl=req[IP].ttl)
1696            else:
1697                resp /= IP(dst=req[IP].src, src=self.src_ip or req[IP].dst,
1698                           id=req[IP].id, ttl=req[IP].ttl)
1699        else:
1700            warning("No IP or IPv6 layer in %s", req.command())
1701            return
1702        try:
1703            resp /= UDP(sport=req[UDP].dport, dport=req[UDP].sport)
1704        except IndexError:
1705            warning("No UDP layer in %s", req.command(), exc_info=True)
1706            return
1707        try:
1708            req = req[self.cls]
1709        except IndexError:
1710            warning(
1711                "No %s layer in %s",
1712                self.cls.__name__,
1713                req.command(),
1714                exc_info=True,
1715            )
1716            return
1717        try:
1718            queries = req.qd
1719        except AttributeError:
1720            warning("No qd attribute in %s", req.command(), exc_info=True)
1721            return
1722        # Special case: alias 'ALL' query as 'A' + 'AAAA'
1723        try:
1724            allquery = next(
1725                (x for x in queries if getattr(x, "qtype", None) == 255)
1726            )
1727            queries.remove(allquery)
1728            queries.extend([
1729                DNSQR(
1730                    qtype=x,
1731                    qname=allquery.qname,
1732                    unicastresponse=allquery.unicastresponse,
1733                    qclass=allquery.qclass,
1734                )
1735                for x in [1, 28]
1736            ])
1737        except StopIteration:
1738            pass
1739        # Process each query
1740        ans = []
1741        ars = []
1742        for rq in queries:
1743            if isinstance(rq, Raw):
1744                warning("Cannot parse qd element %s", rq.command(), exc_info=True)
1745                continue
1746            rqname = rq.qname.lower()
1747            if rq.qtype in [1, 28]:
1748                # A or AAAA
1749                if rq.qtype == 28:
1750                    # AAAA
1751                    rdata = self.match[rqname][1]
1752                    if rdata is None and not self.relay:
1753                        # 'None' resolves to the default IPv6
1754                        iface = resolve_iface(self.optsniff.get("iface", conf.iface))
1755                        if self.mDNS:
1756                            # All IPs, as per mDNS.
1757                            rdata = iface.ips[6]
1758                        else:
1759                            rdata = get_if_addr6(
1760                                iface
1761                            )
1762                    if self.mDNS and rdata and IPv6 in resp:
1763                        # For mDNS, we must replace the IPv6 src
1764                        resp[IPv6].src = rdata
1765                elif rq.qtype == 1:
1766                    # A
1767                    rdata = self.match[rqname][0]
1768                    if rdata is None and not self.relay:
1769                        # 'None' resolves to the default IPv4
1770                        iface = resolve_iface(self.optsniff.get("iface", conf.iface))
1771                        if self.mDNS:
1772                            # All IPs, as per mDNS.
1773                            rdata = iface.ips[4]
1774                        else:
1775                            rdata = get_if_addr(
1776                                iface
1777                            )
1778                    if self.mDNS and rdata and IP in resp:
1779                        # For mDNS, we must replace the IP src
1780                        resp[IP].src = rdata
1781                if rdata:
1782                    # Common A and AAAA
1783                    if not isinstance(rdata, list):
1784                        rdata = [rdata]
1785                    ans.extend([
1786                        DNSRR(
1787                            rrname=rq.qname,
1788                            ttl=self.ttl,
1789                            rdata=x,
1790                            type=rq.qtype,
1791                            cacheflush=self.mDNS and rq.qtype == rq.qtype,
1792                        )
1793                        for x in rdata
1794                    ])
1795                    continue  # next
1796            elif rq.qtype == 33:
1797                # SRV
1798                try:
1799                    port, target = self.srvmatch[rqname]
1800                    ans.append(DNSRRSRV(
1801                        rrname=rq.qname,
1802                        port=port,
1803                        target=target,
1804                        weight=100,
1805                        ttl=self.ttl
1806                    ))
1807                    continue  # next
1808                except KeyError:
1809                    # No result
1810                    pass
1811            elif rq.qtype == 12:
1812                # PTR
1813                if rq.qname[-14:] == b".in-addr.arpa." and self.jokerarpa:
1814                    ans.append(DNSRR(
1815                        rrname=rq.qname,
1816                        type=rq.qtype,
1817                        ttl=self.ttl,
1818                        rdata=self.jokerarpa,
1819                    ))
1820                    continue
1821            # It it arrives here, there is currently no answer
1822            if self.relay:
1823                # Relay mode ?
1824                try:
1825                    _rslv = dns_resolve(rq.qname, qtype=rq.qtype)
1826                    if _rslv:
1827                        ans.extend(_rslv)
1828                        continue  # next
1829                except TimeoutError:
1830                    pass
1831            # Still no answer.
1832            if self.mDNS:
1833                # "Any time a responder receives a query for a name for which it
1834                # has verified exclusive ownership, for a type for which that name
1835                # has no records, the responder MUST respond asserting the
1836                # nonexistence of that record using a DNS NSEC record [RFC4034]."
1837                ans.append(DNSRRNSEC(
1838                    # RFC6762 sect 6.1 - Negative Response
1839                    ttl=self.ttl,
1840                    rrname=rq.qname,
1841                    nextname=rq.qname,
1842                    typebitmaps=RRlist2bitmap([rq.qtype]),
1843                ))
1844        if self.mDNS and all(x.type == 47 for x in ans):
1845            # If mDNS answers with only NSEC, discard.
1846            return
1847        if not ans:
1848            # No answer is available.
1849            if self.send_error:
1850                resp /= self.cls(id=req.id, qr=1, qd=req.qd, rcode=3)
1851                return resp
1852            log_runtime.info("No answer could be provided to: %s" % req.summary())
1853            return
1854        # Handle Additional Records
1855        if self.mDNS:
1856            # Windows specific extension
1857            ars.append(DNSRROPT(
1858                z=0x1194,
1859                rdata=[
1860                    EDNS0OWN(
1861                        primary_mac=resp[Ether].src,
1862                    ),
1863                ],
1864            ))
1865        # All rq were answered
1866        if self.mDNS:
1867            # in mDNS mode, don't repeat the question, set aa=1, rd=0
1868            dns = self.cls(id=req.id, aa=1, rd=0, qr=1, qd=[], ar=ars, an=ans)
1869        else:
1870            dns = self.cls(id=req.id, qr=1, qd=req.qd, ar=ars, an=ans)
1871        # Compress DNS and mDNS
1872        if not self.llmnr:
1873            resp /= dns_compress(dns)
1874        else:
1875            resp /= dns
1876        return resp
1877
1878
class mDNS_am(DNS_am):
    """
    mDNS answering machine.

    This has the same arguments as DNS_am. See help(DNS_am)

    Example::

        - Answer for 'TEST.local' with the local address(es) of the interface::

            >>> mdnsd(match=["TEST.local"])

        - Answer all requests with other IP::

            >>> mdnsd(joker="192.168.0.2", joker6="fe80::260:8ff:fe52:f9d8",
            ...       iface="eth0")

        - Answer for multiple different mDNS names::

            >>> mdnsd(match={"TEST.local": "192.168.0.100",
            ...              "BOB.local": "192.168.0.101"})

        - Answer with both A and AAAA records::

            >>> mdnsd(match={"TEST.local": ("192.168.0.100",
            ...                             "fe80::260:8ff:fe52:f9d8")})
    """
    # Presumably the name of the helper generated for this machine (it is
    # called as 'mdnsd(...)' in the examples above) - TODO confirm against
    # AnsweringMachine.
    function_name = "mdnsd"
    # 5353 is the mDNS UDP port; presumably used as the capture filter when
    # sniffing - TODO confirm against AnsweringMachine.
    filter = "udp port 5353"
1908
1909
1910# DNS-SD (RFC 6763)
1911
1912
class DNSSDResult(SndRcvList):
    """
    List of (query, answer) pairs collected by a DNS-SD discovery, with a
    show() method that prints the discovered services as a table.
    """

    def __init__(self,
                 res=None,  # type: Optional[Union[_PacketList[QueryAnswer], List[QueryAnswer]]]  # noqa: E501
                 name="DNS-SD",  # type: str
                 stats=None  # type: Optional[List[Type[Packet]]]
                 ):
        SndRcvList.__init__(self, res, name, stats)

    def show(self, types=['PTR', 'SRV'], alltypes=False):
        # type: (List[str], bool) -> None
        """
        Print one table row per answer: the source IP and the records found
        in its answer and additional sections. A leading "Mac" column is added
        when the first answer sits on top of an Ether layer and a
        manufacturer database is available.

        :param types: record types to display. Default ['PTR', 'SRV']
        :param alltypes: display every record type, ignoring 'types'.
            Default False
        """
        wanted = None if alltypes else types
        rows = []  # type: List[Tuple[str | List[str], ...]]

        resolve_mac = (
            self.res and isinstance(self.res[0][1].underlayer, Ether) and
            conf.manufdb
        )
        header = ("Mac", "IP", "Service") if resolve_mac else ("IP", "Service")

        for _, answer in self.res:
            services = []
            for record in itertools.chain(answer[DNS].an, answer[DNS].ar):
                if wanted and dnstypes.get(record.type) not in wanted:
                    continue
                # The displayed fields depend on the kind of record
                if isinstance(record, DNSRRNSEC):
                    fmt = "%type%=%nextname%"
                elif isinstance(record, DNSRRSRV):
                    fmt = "%type%=(%target%,%port%)"
                else:
                    fmt = "%type%=%rdata%"
                services.append(record.sprintf(fmt))
            row = (answer.src, services)
            if resolve_mac:
                vendor_mac = conf.manufdb._resolve_MAC(answer.underlayer.src)
                row = (vendor_mac,) + row
            rows.append(row)

        print(pretty_list(rows, [header]))
1966
1967
@conf.commands.register
def dnssd(service="_services._dns-sd._udp.local",
          af=socket.AF_INET,
          qtype="PTR",
          iface=None,
          verbose=2,
          timeout=3):
    """
    Performs a DNS-SD (RFC6763) request

    The query is sent over UDP (port 5353 to port 5353) to the mDNS multicast
    group: 224.0.0.251 for IPv4, ff02::fb for IPv6.

    :param service: the service name to query (e.g. _spotify-connect._tcp.local)
    :param af: the transport to use. socket.AF_INET or socket.AF_INET6
    :param qtype: the type to use in the mDNS. Either TXT, PTR or SRV.
    :param iface: the interface to do this discovery on.
    :param verbose: the 'verbose' value handed to sr()
    :param timeout: the 'timeout' value handed to sr()
    :returns: a DNSSDResult holding the answers, or None if 'af' is neither
        socket.AF_INET nor socket.AF_INET6
    """
    if af == socket.AF_INET:
        network = IP(dst=ScopedIP("224.0.0.251", iface), ttl=255)
    elif af == socket.AF_INET6:
        network = IPv6(dst=ScopedIP("ff02::fb", iface))
    else:
        return
    probe = network / UDP(sport=5353, dport=5353)
    probe = probe / DNS(rd=0, qd=[DNSQR(qname=service, qtype=qtype)])
    # multi=True: several hosts may answer the same multicast query
    answered, _ = sr(probe, multi=True, timeout=timeout, verbose=verbose)
    return DNSSDResult(answered.res)
1993