1# SPDX-License-Identifier: GPL-2.0-only 2# This file is part of Scapy 3# See https://scapy.net/ for more information 4# Copyright (C) Philippe Biondi <phil@secdev.org> 5 6""" 7DNS: Domain Name System 8 9This implements: 10- RFC1035: Domain Names 11- RFC6762: Multicast DNS 12- RFC6763: DNS-Based Service Discovery 13""" 14 15import abc 16import collections 17import operator 18import itertools 19import socket 20import struct 21import time 22import warnings 23 24from scapy.arch import ( 25 get_if_addr, 26 get_if_addr6, 27 read_nameservers, 28) 29from scapy.ansmachine import AnsweringMachine 30from scapy.base_classes import Net, ScopedIP 31from scapy.config import conf 32from scapy.compat import orb, raw, chb, bytes_encode, plain_str 33from scapy.error import log_runtime, warning, Scapy_Exception 34from scapy.packet import Packet, bind_layers, Raw 35from scapy.fields import ( 36 BitEnumField, 37 BitField, 38 ByteEnumField, 39 ByteField, 40 ConditionalField, 41 Field, 42 FieldLenField, 43 FieldListField, 44 FlagsField, 45 I, 46 IP6Field, 47 IntField, 48 MACField, 49 MultipleTypeField, 50 PacketListField, 51 ShortEnumField, 52 ShortField, 53 StrField, 54 StrLenField, 55 UTCTimeField, 56 XStrFixedLenField, 57 XStrLenField, 58) 59from scapy.interfaces import resolve_iface 60from scapy.sendrecv import sr1, sr 61from scapy.supersocket import StreamSocket 62from scapy.plist import SndRcvList, _PacketList, QueryAnswer 63from scapy.pton_ntop import inet_ntop, inet_pton 64from scapy.utils import pretty_list 65from scapy.volatile import RandShort 66 67from scapy.layers.l2 import Ether 68from scapy.layers.inet import IP, DestIPField, IPField, UDP, TCP 69from scapy.layers.inet6 import IPv6 70 71from typing import ( 72 Any, 73 List, 74 Optional, 75 Tuple, 76 Type, 77 Union, 78) 79 80 81# https://www.iana.org/assignments/dns-parameters/dns-parameters.xhtml#dns-parameters-4 82dnstypes = { 83 0: "RESERVED", 84 1: "A", 2: "NS", 3: "MD", 4: "MF", 5: "CNAME", 6: "SOA", 7: "MB", 8: "MG", 85 9: "MR", 10: "NULL", 11: "WKS", 12: "PTR", 13: "HINFO", 14: "MINFO", 86 15: "MX", 16: "TXT", 17: "RP", 18: "AFSDB", 19: "X25", 20: "ISDN", 87 21: "RT", 22: "NSAP", 23: "NSAP-PTR", 24: "SIG", 25: "KEY", 26: "PX", 88 27: "GPOS", 28: "AAAA", 29: "LOC", 30: "NXT", 31: "EID", 32: "NIMLOC", 89 33: "SRV", 34: "ATMA", 35: "NAPTR", 36: "KX", 37: "CERT", 38: "A6", 90 39: "DNAME", 40: "SINK", 41: "OPT", 42: "APL", 43: "DS", 44: "SSHFP", 91 45: "IPSECKEY", 46: "RRSIG", 47: "NSEC", 48: "DNSKEY", 49: "DHCID", 92 50: "NSEC3", 51: "NSEC3PARAM", 52: "TLSA", 53: "SMIMEA", 55: "HIP", 93 56: "NINFO", 57: "RKEY", 58: "TALINK", 59: "CDS", 60: "CDNSKEY", 94 61: "OPENPGPKEY", 62: "CSYNC", 63: "ZONEMD", 64: "SVCB", 65: "HTTPS", 95 99: "SPF", 100: "UINFO", 101: "UID", 102: "GID", 103: "UNSPEC", 104: "NID", 96 105: "L32", 106: "L64", 107: "LP", 108: "EUI48", 109: "EUI64", 249: "TKEY", 97 250: "TSIG", 256: "URI", 257: "CAA", 258: "AVC", 259: "DOA", 98 260: "AMTRELAY", 32768: "TA", 32769: "DLV", 65535: "RESERVED" 99} 100 101 102dnsqtypes = {251: "IXFR", 252: "AXFR", 253: "MAILB", 254: "MAILA", 255: "ALL"} 103dnsqtypes.update(dnstypes) 104dnsclasses = {1: 'IN', 2: 'CS', 3: 'CH', 4: 'HS', 255: 'ANY'} 105 106 107# 12/2023 from https://www.iana.org/assignments/dns-sec-alg-numbers/dns-sec-alg-numbers.xhtml # noqa: E501 108dnssecalgotypes = {0: "Reserved", 1: "RSA/MD5", 2: "Diffie-Hellman", 3: "DSA/SHA-1", # noqa: E501 109 4: "Reserved", 5: "RSA/SHA-1", 6: "DSA-NSEC3-SHA1", 110 7: "RSASHA1-NSEC3-SHA1", 8: "RSA/SHA-256", 9: "Reserved", 111 10: "RSA/SHA-512", 11: "Reserved", 12: "GOST R 34.10-2001", 112 13: "ECDSA Curve P-256 with SHA-256", 14: "ECDSA Curve P-384 with SHA-384", # noqa: E501 113 15: "Ed25519", 16: "Ed448", 114 252: "Reserved for Indirect Keys", 253: "Private algorithms - domain name", # noqa: E501 115 254: "Private algorithms - OID", 255: "Reserved"} 116 117# 12/2023 from https://www.iana.org/assignments/ds-rr-types/ds-rr-types.xhtml 118dnssecdigesttypes = {0: "Reserved", 1: "SHA-1", 2: "SHA-256", 3: "GOST R 34.11-94", 4: "SHA-384"} # noqa: E501 119 120# 12/2023 from https://www.iana.org/assignments/dnssec-nsec3-parameters/dnssec-nsec3-parameters.xhtml # noqa: E501 121dnssecnsec3algotypes = {0: "Reserved", 1: "SHA-1"} 122 123 124def dns_get_str(s, full=None, _ignore_compression=False): 125 """This function decompresses a string s, starting 126 from the given pointer. 127 128 :param s: the string to decompress 129 :param full: (optional) the full packet (used for decompression) 130 131 :returns: (decoded_string, end_index, left_string) 132 """ 133 # _ignore_compression is for internal use only 134 max_length = len(s) 135 # The result = the extracted name 136 name = b"" 137 # Will contain the index after the pointer, to be returned 138 after_pointer = None 139 processed_pointers = [] # Used to check for decompression loops 140 bytes_left = None 141 _fullpacket = False # s = full packet 142 pointer = 0 143 while True: 144 if abs(pointer) >= max_length: 145 log_runtime.info( 146 "DNS RR prematured end (ofs=%i, len=%i)", pointer, len(s) 147 ) 148 break 149 cur = s[pointer] # get pointer value 150 pointer += 1 # make pointer go forward 151 if cur & 0xc0: # Label pointer 152 if after_pointer is None: 153 # after_pointer points to where the remaining bytes start, 154 # as pointer will follow the jump token 155 after_pointer = pointer + 1 156 if _ignore_compression: 157 # skip 158 pointer += 1 159 continue 160 if pointer >= max_length: 161 log_runtime.info( 162 "DNS incomplete jump token at (ofs=%i)", pointer 163 ) 164 break 165 if not full: 166 raise Scapy_Exception("DNS message can't be compressed " + 167 "at this point!") 168 # Follow the pointer 169 pointer = ((cur & ~0xc0) << 8) + s[pointer] 170 if pointer in processed_pointers: 171 warning("DNS decompression loop detected") 172 break 173 if len(processed_pointers) >= 20: 174 warning("More than 20 jumps in a single DNS decompression ! " 175 "Dropping (evil packet)") 176 break 177 if not _fullpacket: 178 # We switch our s buffer to full, so we need to remember 179 # the previous context 180 bytes_left = s[after_pointer:] 181 s = full 182 max_length = len(s) 183 _fullpacket = True 184 processed_pointers.append(pointer) 185 continue 186 elif cur > 0: # Label 187 # cur = length of the string 188 name += s[pointer:pointer + cur] + b"." 189 pointer += cur 190 else: # End 191 break 192 if after_pointer is not None: 193 # Return the real end index (not the one we followed) 194 pointer = after_pointer 195 if bytes_left is None: 196 bytes_left = s[pointer:] 197 # name, remaining 198 return name or b".", bytes_left 199 200 201def _is_ptr(x): 202 return b"." not in x and ( 203 (x and orb(x[-1]) == 0) or 204 (len(x) >= 2 and (orb(x[-2]) & 0xc0) == 0xc0) 205 ) 206 207 208def dns_encode(x, check_built=False): 209 """Encodes a bytes string into the DNS format 210 211 :param x: the string 212 :param check_built: detect already-built strings and ignore them 213 :returns: the encoded bytes string 214 """ 215 if not x or x == b".": 216 return b"\x00" 217 218 if check_built and _is_ptr(x): 219 # The value has already been processed. Do not process it again 220 return x 221 222 # Truncate chunks that cannot be encoded (more than 63 bytes..) 223 x = b"".join(chb(len(y)) + y for y in (k[:63] for k in x.split(b"."))) 224 if x[-1:] != b"\x00": 225 x += b"\x00" 226 return x 227 228 229def DNSgetstr(*args, **kwargs): 230 """Legacy function. Deprecated""" 231 warnings.warn( 232 "DNSgetstr is deprecated. Use dns_get_str instead.", 233 DeprecationWarning 234 ) 235 return dns_get_str(*args, **kwargs)[:-1] 236 237 238def dns_compress(pkt): 239 """This function compresses a DNS packet according to compression rules. 240 """ 241 if DNS not in pkt: 242 raise Scapy_Exception("Can only compress DNS layers") 243 pkt = pkt.copy() 244 dns_pkt = pkt.getlayer(DNS) 245 dns_pkt.clear_cache() 246 build_pkt = raw(dns_pkt) 247 248 def field_gen(dns_pkt): 249 """Iterates through all DNS strings that can be compressed""" 250 for lay in [dns_pkt.qd, dns_pkt.an, dns_pkt.ns, dns_pkt.ar]: 251 if not lay: 252 continue 253 for current in lay: 254 for field in current.fields_desc: 255 if isinstance(field, DNSStrField) or \ 256 (isinstance(field, MultipleTypeField) and 257 current.type in [2, 3, 4, 5, 12, 15, 39, 47]): 258 # Get the associated data and store it accordingly # noqa: E501 259 dat = current.getfieldval(field.name) 260 yield current, field.name, dat 261 262 def possible_shortens(dat): 263 """Iterates through all possible compression parts in a DNS string""" 264 if dat == b".": # we'd lose by compressing it 265 return 266 yield dat 267 for x in range(1, dat.count(b".")): 268 yield dat.split(b".", x)[x] 269 data = {} 270 for current, name, dat in field_gen(dns_pkt): 271 for part in possible_shortens(dat): 272 # Encode the data 273 encoded = dns_encode(part, check_built=True) 274 if part not in data: 275 # We have no occurrence of such data, let's store it as a 276 # possible pointer for future strings. 277 # We get the index of the encoded data 278 index = build_pkt.index(encoded) 279 # The following is used to build correctly the pointer 280 fb_index = ((index >> 8) | 0xc0) 281 sb_index = index - (256 * (fb_index - 0xc0)) 282 pointer = chb(fb_index) + chb(sb_index) 283 data[part] = [(current, name, pointer, index + 1)] 284 else: 285 # This string already exists, let's mark the current field 286 # with it, so that it gets compressed 287 data[part].append((current, name)) 288 _in = data[part][0][3] 289 build_pkt = build_pkt[:_in] + build_pkt[_in:].replace( 290 encoded, 291 b"\0\0", 292 1 293 ) 294 break 295 # Apply compression rules 296 for ck in data: 297 # compression_key is a DNS string 298 replacements = data[ck] 299 # replacements is the list of all tuples (layer, field name) 300 # where this string was found 301 replace_pointer = replacements.pop(0)[2] 302 # replace_pointer is the packed pointer that should replace 303 # those strings. Note that pop remove it from the list 304 for rep in replacements: 305 # setfieldval edits the value of the field in the layer 306 val = rep[0].getfieldval(rep[1]) 307 assert val.endswith(ck) 308 kept_string = dns_encode(val[:-len(ck)], check_built=True)[:-1] 309 new_val = kept_string + replace_pointer 310 rep[0].setfieldval(rep[1], new_val) 311 try: 312 del rep[0].rdlen 313 except AttributeError: 314 pass 315 # End of the compression algorithm 316 # Destroy the previous DNS layer if needed 317 if not isinstance(pkt, DNS) and pkt.getlayer(DNS).underlayer: 318 pkt.getlayer(DNS).underlayer.remove_payload() 319 return pkt / dns_pkt 320 return dns_pkt 321 322 323class DNSCompressedPacket(Packet): 324 """ 325 Class to mark that a packet contains DNSStrField and supports compression 326 """ 327 @abc.abstractmethod 328 def get_full(self): 329 pass 330 331 332class DNSStrField(StrLenField): 333 """ 334 Special StrField that handles DNS encoding/decoding. 335 It will also handle DNS decompression. 336 (may be StrLenField if a length_from is passed), 337 """ 338 def any2i(self, pkt, x): 339 if x and isinstance(x, list): 340 return [self.h2i(pkt, y) for y in x] 341 return super(DNSStrField, self).any2i(pkt, x) 342 343 def h2i(self, pkt, x): 344 # Setting a DNSStrField manually (h2i) means any current compression will break 345 if ( 346 pkt and 347 isinstance(pkt.parent, DNSCompressedPacket) and 348 pkt.parent.raw_packet_cache 349 ): 350 pkt.parent.clear_cache() 351 if not x: 352 return b"." 353 x = bytes_encode(x) 354 if x[-1:] != b"." and not _is_ptr(x): 355 return x + b"." 356 return x 357 358 def i2m(self, pkt, x): 359 return dns_encode(x, check_built=True) 360 361 def i2len(self, pkt, x): 362 return len(self.i2m(pkt, x)) 363 364 def get_full(self, pkt): 365 while pkt and not isinstance(pkt, DNSCompressedPacket): 366 pkt = pkt.parent or pkt.underlayer 367 if not pkt: 368 return None 369 return pkt.get_full() 370 371 def getfield(self, pkt, s): 372 remain = b"" 373 if self.length_from: 374 remain, s = super(DNSStrField, self).getfield(pkt, s) 375 # Decode the compressed DNS message 376 decoded, left = dns_get_str(s, full=self.get_full(pkt)) 377 # returns (remaining, decoded) 378 return left + remain, decoded 379 380 381class DNSTextField(StrLenField): 382 """ 383 Special StrLenField that handles DNS TEXT data (16) 384 """ 385 386 islist = 1 387 388 def i2h(self, pkt, x): 389 if not x: 390 return [] 391 return x 392 393 def m2i(self, pkt, s): 394 ret_s = list() 395 tmp_s = s 396 # RDATA contains a list of strings, each are prepended with 397 # a byte containing the size of the following string. 398 while tmp_s: 399 tmp_len = orb(tmp_s[0]) + 1 400 if tmp_len > len(tmp_s): 401 log_runtime.info( 402 "DNS RR TXT prematured end of character-string " 403 "(size=%i, remaining bytes=%i)", tmp_len, len(tmp_s) 404 ) 405 ret_s.append(tmp_s[1:tmp_len]) 406 tmp_s = tmp_s[tmp_len:] 407 return ret_s 408 409 def any2i(self, pkt, x): 410 if isinstance(x, (str, bytes)): 411 return [x] 412 return x 413 414 def i2len(self, pkt, x): 415 return len(self.i2m(pkt, x)) 416 417 def i2m(self, pkt, s): 418 ret_s = b"" 419 for text in s: 420 if not text: 421 ret_s += b"\x00" 422 continue 423 text = bytes_encode(text) 424 # The initial string must be split into a list of strings 425 # prepended with theirs sizes. 426 while len(text) >= 255: 427 ret_s += b"\xff" + text[:255] 428 text = text[255:] 429 # The remaining string is less than 255 bytes long 430 if len(text): 431 ret_s += struct.pack("!B", len(text)) + text 432 return ret_s 433 434 435# RFC 2671 - Extension Mechanisms for DNS (EDNS0) 436 437edns0types = {0: "Reserved", 1: "LLQ", 2: "UL", 3: "NSID", 4: "Owner", 438 5: "DAU", 6: "DHU", 7: "N3U", 8: "edns-client-subnet", 10: "COOKIE", 439 15: "Extended DNS Error"} 440 441 442class _EDNS0Dummy(Packet): 443 name = "Dummy class that implements extract_padding()" 444 445 def extract_padding(self, p): 446 # type: (bytes) -> Tuple[bytes, Optional[bytes]] 447 return "", p 448 449 450class EDNS0TLV(_EDNS0Dummy): 451 name = "DNS EDNS0 TLV" 452 fields_desc = [ShortEnumField("optcode", 0, edns0types), 453 FieldLenField("optlen", None, "optdata", fmt="H"), 454 StrLenField("optdata", "", 455 length_from=lambda pkt: pkt.optlen)] 456 457 @classmethod 458 def dispatch_hook(cls, _pkt=None, *args, **kargs): 459 # type: (Optional[bytes], *Any, **Any) -> Type[Packet] 460 if _pkt is None: 461 return EDNS0TLV 462 if len(_pkt) < 2: 463 return Raw 464 edns0type = struct.unpack("!H", _pkt[:2])[0] 465 return EDNS0OPT_DISPATCHER.get(edns0type, EDNS0TLV) 466 467 468class DNSRROPT(Packet): 469 name = "DNS OPT Resource Record" 470 fields_desc = [DNSStrField("rrname", ""), 471 ShortEnumField("type", 41, dnstypes), 472 ShortEnumField("rclass", 4096, dnsclasses), 473 ByteField("extrcode", 0), 474 ByteField("version", 0), 475 # version 0 means EDNS0 476 BitEnumField("z", 32768, 16, {32768: "D0"}), 477 # D0 means DNSSEC OK from RFC 3225 478 FieldLenField("rdlen", None, length_of="rdata", fmt="H"), 479 PacketListField("rdata", [], EDNS0TLV, 480 length_from=lambda pkt: pkt.rdlen)] 481 482 483# draft-cheshire-edns0-owner-option-01 - EDNS0 OWNER Option 484 485class EDNS0OWN(_EDNS0Dummy): 486 name = "EDNS0 Owner (OWN)" 487 fields_desc = [ShortEnumField("optcode", 4, edns0types), 488 FieldLenField("optlen", None, count_of="primary_mac", fmt="H"), 489 ByteField("v", 0), 490 ByteField("s", 0), 491 MACField("primary_mac", "00:00:00:00:00:00"), 492 ConditionalField( 493 MACField("wakeup_mac", "00:00:00:00:00:00"), 494 lambda pkt: (pkt.optlen or 0) >= 18), 495 ConditionalField( 496 StrLenField("password", "", 497 length_from=lambda pkt: pkt.optlen - 18), 498 lambda pkt: (pkt.optlen or 0) >= 22)] 499 500 def post_build(self, pkt, pay): 501 pkt += pay 502 if self.optlen is None: 503 pkt = pkt[:2] + struct.pack("!H", len(pkt) - 4) + pkt[4:] 504 return pkt 505 506 507# RFC 6975 - Signaling Cryptographic Algorithm Understanding in 508# DNS Security Extensions (DNSSEC) 509 510class EDNS0DAU(_EDNS0Dummy): 511 name = "DNSSEC Algorithm Understood (DAU)" 512 fields_desc = [ShortEnumField("optcode", 5, edns0types), 513 FieldLenField("optlen", None, count_of="alg_code", fmt="H"), 514 FieldListField("alg_code", None, 515 ByteEnumField("", 0, dnssecalgotypes), 516 count_from=lambda pkt:pkt.optlen)] 517 518 519class EDNS0DHU(_EDNS0Dummy): 520 name = "DS Hash Understood (DHU)" 521 fields_desc = [ShortEnumField("optcode", 6, edns0types), 522 FieldLenField("optlen", None, count_of="alg_code", fmt="H"), 523 FieldListField("alg_code", None, 524 ByteEnumField("", 0, dnssecdigesttypes), 525 count_from=lambda pkt:pkt.optlen)] 526 527 528class EDNS0N3U(_EDNS0Dummy): 529 name = "NSEC3 Hash Understood (N3U)" 530 fields_desc = [ShortEnumField("optcode", 7, edns0types), 531 FieldLenField("optlen", None, count_of="alg_code", fmt="H"), 532 FieldListField("alg_code", None, 533 ByteEnumField("", 0, dnssecnsec3algotypes), 534 count_from=lambda pkt:pkt.optlen)] 535 536 537# RFC 7871 - Client Subnet in DNS Queries 538 539class ClientSubnetv4(StrLenField): 540 af_familly = socket.AF_INET 541 af_length = 32 542 af_default = b"\xc0" # 192.0.0.0 543 544 def getfield(self, pkt, s): 545 # type: (Packet, bytes) -> Tuple[bytes, I] 546 sz = operator.floordiv(self.length_from(pkt), 8) 547 sz = min(sz, operator.floordiv(self.af_length, 8)) 548 return s[sz:], self.m2i(pkt, s[:sz]) 549 550 def m2i(self, pkt, x): 551 # type: (Optional[Packet], bytes) -> str 552 padding = self.af_length - self.length_from(pkt) 553 if padding: 554 x += b"\x00" * operator.floordiv(padding, 8) 555 x = x[: operator.floordiv(self.af_length, 8)] 556 return inet_ntop(self.af_familly, x) 557 558 def _pack_subnet(self, subnet): 559 # type: (bytes) -> bytes 560 packed_subnet = inet_pton(self.af_familly, plain_str(subnet)) 561 for i in list(range(operator.floordiv(self.af_length, 8)))[::-1]: 562 if orb(packed_subnet[i]) != 0: 563 i += 1 564 break 565 return packed_subnet[:i] 566 567 def i2m(self, pkt, x): 568 # type: (Optional[Packet], Optional[Union[str, Net]]) -> bytes 569 if x is None: 570 return self.af_default 571 try: 572 return self._pack_subnet(x) 573 except (OSError, socket.error): 574 pkt.family = 2 575 return ClientSubnetv6("", "")._pack_subnet(x) 576 577 def i2len(self, pkt, x): 578 # type: (Packet, Any) -> int 579 if x is None: 580 return 1 581 try: 582 return len(self._pack_subnet(x)) 583 except (OSError, socket.error): 584 pkt.family = 2 585 return len(ClientSubnetv6("", "")._pack_subnet(x)) 586 587 588class ClientSubnetv6(ClientSubnetv4): 589 af_familly = socket.AF_INET6 590 af_length = 128 591 af_default = b"\x20" # 2000:: 592 593 594class EDNS0ClientSubnet(_EDNS0Dummy): 595 name = "DNS EDNS0 Client Subnet" 596 fields_desc = [ShortEnumField("optcode", 8, edns0types), 597 FieldLenField("optlen", None, "address", fmt="H", 598 adjust=lambda pkt, x: x + 4), 599 ShortField("family", 1), 600 FieldLenField("source_plen", None, 601 length_of="address", 602 fmt="B", 603 adjust=lambda pkt, x: x * 8), 604 ByteField("scope_plen", 0), 605 MultipleTypeField( 606 [(ClientSubnetv4("address", "192.168.0.0", 607 length_from=lambda p: p.source_plen), 608 lambda pkt: pkt.family == 1), 609 (ClientSubnetv6("address", "2001:db8::", 610 length_from=lambda p: p.source_plen), 611 lambda pkt: pkt.family == 2)], 612 ClientSubnetv4("address", "192.168.0.0", 613 length_from=lambda p: p.source_plen))] 614 615 616class EDNS0COOKIE(_EDNS0Dummy): 617 name = "DNS EDNS0 COOKIE" 618 fields_desc = [ShortEnumField("optcode", 10, edns0types), 619 FieldLenField("optlen", None, length_of="server_cookie", fmt="!H", 620 adjust=lambda pkt, x: x + 8), 621 XStrFixedLenField("client_cookie", b"\x00" * 8, length=8), 622 XStrLenField("server_cookie", "", 623 length_from=lambda pkt: max(0, pkt.optlen - 8))] 624 625 626# RFC 8914 - Extended DNS Errors 627 628# https://www.iana.org/assignments/dns-parameters/dns-parameters.xhtml#extended-dns-error-codes 629extended_dns_error_codes = { 630 0: "Other", 631 1: "Unsupported DNSKEY Algorithm", 632 2: "Unsupported DS Digest Type", 633 3: "Stale Answer", 634 4: "Forged Answer", 635 5: "DNSSEC Indeterminate", 636 6: "DNSSEC Bogus", 637 7: "Signature Expired", 638 8: "Signature Not Yet Valid", 639 9: "DNSKEY Missing", 640 10: "RRSIGs Missing", 641 11: "No Zone Key Bit Set", 642 12: "NSEC Missing", 643 13: "Cached Error", 644 14: "Not Ready", 645 15: "Blocked", 646 16: "Censored", 647 17: "Filtered", 648 18: "Prohibited", 649 19: "Stale NXDOMAIN Answer", 650 20: "Not Authoritative", 651 21: "Not Supported", 652 22: "No Reachable Authority", 653 23: "Network Error", 654 24: "Invalid Data", 655 25: "Signature Expired before Valid", 656 26: "Too Early", 657 27: "Unsupported NSEC3 Iterations Value", 658 28: "Unable to conform to policy", 659 29: "Synthesized", 660} 661 662 663# https://www.rfc-editor.org/rfc/rfc8914.html 664class EDNS0ExtendedDNSError(_EDNS0Dummy): 665 name = "DNS EDNS0 Extended DNS Error" 666 fields_desc = [ShortEnumField("optcode", 15, edns0types), 667 FieldLenField("optlen", None, length_of="extra_text", fmt="!H", 668 adjust=lambda pkt, x: x + 2), 669 ShortEnumField("info_code", 0, extended_dns_error_codes), 670 StrLenField("extra_text", "", 671 length_from=lambda pkt: pkt.optlen - 2)] 672 673 674EDNS0OPT_DISPATCHER = { 675 4: EDNS0OWN, 676 5: EDNS0DAU, 677 6: EDNS0DHU, 678 7: EDNS0N3U, 679 8: EDNS0ClientSubnet, 680 10: EDNS0COOKIE, 681 15: EDNS0ExtendedDNSError, 682} 683 684 685# RFC 4034 - Resource Records for the DNS Security Extensions 686 687def bitmap2RRlist(bitmap): 688 """ 689 Decode the 'Type Bit Maps' field of the NSEC Resource Record into an 690 integer list. 691 """ 692 # RFC 4034, 4.1.2. The Type Bit Maps Field 693 694 RRlist = [] 695 696 while bitmap: 697 698 if len(bitmap) < 2: 699 log_runtime.info("bitmap too short (%i)", len(bitmap)) 700 return 701 702 window_block = orb(bitmap[0]) # window number 703 offset = 256 * window_block # offset of the Resource Record 704 bitmap_len = orb(bitmap[1]) # length of the bitmap in bytes 705 706 if bitmap_len <= 0 or bitmap_len > 32: 707 log_runtime.info("bitmap length is no valid (%i)", bitmap_len) 708 return 709 710 tmp_bitmap = bitmap[2:2 + bitmap_len] 711 712 # Let's compare each bit of tmp_bitmap and compute the real RR value 713 for b in range(len(tmp_bitmap)): 714 v = 128 715 for i in range(8): 716 if orb(tmp_bitmap[b]) & v: 717 # each of the RR is encoded as a bit 718 RRlist += [offset + b * 8 + i] 719 v = v >> 1 720 721 # Next block if any 722 bitmap = bitmap[2 + bitmap_len:] 723 724 return RRlist 725 726 727def RRlist2bitmap(lst): 728 """ 729 Encode a list of integers representing Resource Records to a bitmap field 730 used in the NSEC Resource Record. 731 """ 732 # RFC 4034, 4.1.2. The Type Bit Maps Field 733 734 import math 735 736 bitmap = b"" 737 lst = [abs(x) for x in sorted(set(lst)) if x <= 65535] 738 739 # number of window blocks 740 max_window_blocks = int(math.ceil(lst[-1] / 256.)) 741 min_window_blocks = int(math.floor(lst[0] / 256.)) 742 if min_window_blocks == max_window_blocks: 743 max_window_blocks += 1 744 745 for wb in range(min_window_blocks, max_window_blocks + 1): 746 # First, filter out RR not encoded in the current window block 747 # i.e. keep everything between 256*wb <= 256*(wb+1) 748 rrlist = sorted(x for x in lst if 256 * wb <= x < 256 * (wb + 1)) 749 if not rrlist: 750 continue 751 752 # Compute the number of bytes used to store the bitmap 753 if rrlist[-1] == 0: # only one element in the list 754 bytes_count = 1 755 else: 756 max = rrlist[-1] - 256 * wb 757 bytes_count = int(math.ceil(max // 8)) + 1 # use at least 1 byte 758 if bytes_count > 32: # Don't encode more than 256 bits / values 759 bytes_count = 32 760 761 bitmap += struct.pack("BB", wb, bytes_count) 762 763 # Generate the bitmap 764 # The idea is to remove out of range Resource Records with these steps 765 # 1. rescale to fit into 8 bits 766 # 2. x gives the bit position ; compute the corresponding value 767 # 3. sum everything 768 bitmap += b"".join( 769 struct.pack( 770 b"B", 771 sum(2 ** (7 - (x - 256 * wb) + (tmp * 8)) for x in rrlist 772 if 256 * wb + 8 * tmp <= x < 256 * wb + 8 * tmp + 8), 773 ) for tmp in range(bytes_count) 774 ) 775 776 return bitmap 777 778 779class RRlistField(StrField): 780 islist = 1 781 782 def h2i(self, pkt, x): 783 if x and isinstance(x, list): 784 return RRlist2bitmap(x) 785 return x 786 787 def i2repr(self, pkt, x): 788 if not x: 789 return "[]" 790 x = self.i2h(pkt, x) 791 rrlist = bitmap2RRlist(x) 792 return [dnstypes.get(rr, rr) for rr in rrlist] if rrlist else repr(x) 793 794 795class _DNSRRdummy(Packet): 796 name = "Dummy class that implements post_build() for Resource Records" 797 798 def post_build(self, pkt, pay): 799 if self.rdlen is not None: 800 return pkt + pay 801 802 lrrname = len(self.fields_desc[0].i2m("", self.getfieldval("rrname"))) 803 tmp_len = len(pkt) - lrrname - 10 804 tmp_pkt = pkt[:lrrname + 8] 805 pkt = struct.pack("!H", tmp_len) + pkt[lrrname + 8 + 2:] 806 807 return tmp_pkt + pkt + pay 808 809 def default_payload_class(self, payload): 810 return conf.padding_layer 811 812 813class DNSRRHINFO(_DNSRRdummy): 814 name = "DNS HINFO Resource Record" 815 fields_desc = [DNSStrField("rrname", ""), 816 ShortEnumField("type", 13, dnstypes), 817 BitField("cacheflush", 0, 1), # mDNS RFC 6762 818 BitEnumField("rclass", 1, 15, dnsclasses), 819 IntField("ttl", 0), 820 ShortField("rdlen", None), 821 FieldLenField("cpulen", None, fmt="!B", length_of="cpu"), 822 StrLenField("cpu", "", length_from=lambda x: x.cpulen), 823 FieldLenField("oslen", None, fmt="!B", length_of="os"), 824 StrLenField("os", "", length_from=lambda x: x.oslen)] 825 826 827class DNSRRMX(_DNSRRdummy): 828 name = "DNS MX Resource Record" 829 fields_desc = [DNSStrField("rrname", ""), 830 ShortEnumField("type", 15, dnstypes), 831 BitField("cacheflush", 0, 1), # mDNS RFC 6762 832 BitEnumField("rclass", 1, 15, dnsclasses), 833 IntField("ttl", 0), 834 ShortField("rdlen", None), 835 ShortField("preference", 0), 836 DNSStrField("exchange", ""), 837 ] 838 839 840class DNSRRSOA(_DNSRRdummy): 841 name = "DNS SOA Resource Record" 842 fields_desc = [DNSStrField("rrname", ""), 843 ShortEnumField("type", 6, dnstypes), 844 ShortEnumField("rclass", 1, dnsclasses), 845 IntField("ttl", 0), 846 ShortField("rdlen", None), 847 DNSStrField("mname", ""), 848 DNSStrField("rname", ""), 849 IntField("serial", 0), 850 IntField("refresh", 0), 851 IntField("retry", 0), 852 IntField("expire", 0), 853 IntField("minimum", 0) 854 ] 855 856 857class DNSRRRSIG(_DNSRRdummy): 858 name = "DNS RRSIG Resource Record" 859 fields_desc = [DNSStrField("rrname", ""), 860 ShortEnumField("type", 46, dnstypes), 861 BitField("cacheflush", 0, 1), # mDNS RFC 6762 862 BitEnumField("rclass", 1, 15, dnsclasses), 863 IntField("ttl", 0), 864 ShortField("rdlen", None), 865 ShortEnumField("typecovered", 1, dnstypes), 866 ByteEnumField("algorithm", 5, dnssecalgotypes), 867 ByteField("labels", 0), 868 IntField("originalttl", 0), 869 UTCTimeField("expiration", 0), 870 UTCTimeField("inception", 0), 871 ShortField("keytag", 0), 872 DNSStrField("signersname", ""), 873 StrField("signature", "") 874 ] 875 876 877class DNSRRNSEC(_DNSRRdummy): 878 name = "DNS NSEC Resource Record" 879 fields_desc = [DNSStrField("rrname", ""), 880 ShortEnumField("type", 47, dnstypes), 881 BitField("cacheflush", 0, 1), # mDNS RFC 6762 882 BitEnumField("rclass", 1, 15, dnsclasses), 883 IntField("ttl", 0), 884 ShortField("rdlen", None), 885 DNSStrField("nextname", ""), 886 RRlistField("typebitmaps", []) 887 ] 888 889 890class DNSRRDNSKEY(_DNSRRdummy): 891 name = "DNS DNSKEY Resource Record" 892 fields_desc = [DNSStrField("rrname", ""), 893 ShortEnumField("type", 48, dnstypes), 894 BitField("cacheflush", 0, 1), # mDNS RFC 6762 895 BitEnumField("rclass", 1, 15, dnsclasses), 896 IntField("ttl", 0), 897 ShortField("rdlen", None), 898 FlagsField("flags", 256, 16, "S???????Z???????"), 899 # S: Secure Entry Point 900 # Z: Zone Key 901 ByteField("protocol", 3), 902 ByteEnumField("algorithm", 5, dnssecalgotypes), 903 StrField("publickey", "") 904 ] 905 906 907class DNSRRDS(_DNSRRdummy): 908 name = "DNS DS Resource Record" 909 fields_desc = [DNSStrField("rrname", ""), 910 ShortEnumField("type", 43, dnstypes), 911 BitField("cacheflush", 0, 1), # mDNS RFC 6762 912 BitEnumField("rclass", 1, 15, dnsclasses), 913 IntField("ttl", 0), 914 ShortField("rdlen", None), 915 ShortField("keytag", 0), 916 ByteEnumField("algorithm", 5, dnssecalgotypes), 917 ByteEnumField("digesttype", 5, dnssecdigesttypes), 918 StrField("digest", "") 919 ] 920 921 922# RFC 5074 - DNSSEC Lookaside Validation (DLV) 923class DNSRRDLV(DNSRRDS): 924 name = "DNS DLV Resource Record" 925 926 def __init__(self, *args, **kargs): 927 DNSRRDS.__init__(self, *args, **kargs) 928 if not kargs.get('type', 0): 929 self.type = 32769 930 931# RFC 5155 - DNS Security (DNSSEC) Hashed Authenticated Denial of Existence 932 933 934class DNSRRNSEC3(_DNSRRdummy): 935 name = "DNS NSEC3 Resource Record" 936 fields_desc = [DNSStrField("rrname", ""), 937 ShortEnumField("type", 50, dnstypes), 938 BitField("cacheflush", 0, 1), # mDNS RFC 6762 939 BitEnumField("rclass", 1, 15, dnsclasses), 940 IntField("ttl", 0), 941 ShortField("rdlen", None), 942 ByteField("hashalg", 0), 943 BitEnumField("flags", 0, 8, {1: "Opt-Out"}), 944 ShortField("iterations", 0), 945 FieldLenField("saltlength", 0, fmt="!B", length_of="salt"), 946 StrLenField("salt", "", length_from=lambda x: x.saltlength), 947 FieldLenField("hashlength", 0, fmt="!B", length_of="nexthashedownername"), # noqa: E501 948 StrLenField("nexthashedownername", "", length_from=lambda x: x.hashlength), # noqa: E501 949 RRlistField("typebitmaps", []) 950 ] 951 952 953class DNSRRNSEC3PARAM(_DNSRRdummy): 954 name = "DNS NSEC3PARAM Resource Record" 955 fields_desc = [DNSStrField("rrname", ""), 956 ShortEnumField("type", 51, dnstypes), 957 BitField("cacheflush", 0, 1), # mDNS RFC 6762 958 BitEnumField("rclass", 1, 15, dnsclasses), 959 IntField("ttl", 0), 960 ShortField("rdlen", None), 961 ByteField("hashalg", 0), 962 ByteField("flags", 0), 963 ShortField("iterations", 0), 964 FieldLenField("saltlength", 0, fmt="!B", length_of="salt"), 965 StrLenField("salt", "", length_from=lambda pkt: pkt.saltlength) # noqa: E501 966 ] 967 968 969# RFC 9460 Service Binding and Parameter Specification via the DNS 970# https://www.rfc-editor.org/rfc/rfc9460.html 971 972 973# https://www.iana.org/assignments/dns-svcb/dns-svcb.xhtml 974svc_param_keys = { 975 0: "mandatory", 976 1: "alpn", 977 2: "no-default-alpn", 978 3: "port", 979 4: "ipv4hint", 980 5: "ech", 981 6: "ipv6hint", 982 7: "dohpath", 983 8: "ohttp", 984} 985 986 987class SvcParam(Packet): 988 name = "SvcParam" 989 fields_desc = [ShortEnumField("key", 0, svc_param_keys), 990 FieldLenField("len", None, length_of="value", fmt="H"), 991 MultipleTypeField( 992 [ 993 # mandatory 994 (FieldListField("value", [], 995 ShortEnumField("", 0, svc_param_keys), 996 length_from=lambda pkt: pkt.len), 997 lambda pkt: pkt.key == 0), 998 # alpn, no-default-alpn 999 (DNSTextField("value", [], 1000 length_from=lambda pkt: pkt.len), 1001 lambda pkt: pkt.key in (1, 2)), 1002 # port 1003 (ShortField("value", 0), 1004 lambda pkt: pkt.key == 3), 1005 # ipv4hint 1006 (FieldListField("value", [], 1007 IPField("", "0.0.0.0"), 1008 length_from=lambda pkt: pkt.len), 1009 lambda pkt: pkt.key == 4), 1010 # ipv6hint 1011 (FieldListField("value", [], 1012 IP6Field("", "::"), 1013 length_from=lambda pkt: pkt.len), 1014 lambda pkt: pkt.key == 6), 1015 ], 1016 StrLenField("value", "", 1017 length_from=lambda pkt:pkt.len))] 1018 1019 def extract_padding(self, p): 1020 return "", p 1021 1022 1023class DNSRRSVCB(_DNSRRdummy): 1024 name = "DNS SVCB Resource Record" 1025 fields_desc = [DNSStrField("rrname", ""), 1026 ShortEnumField("type", 64, dnstypes), 1027 BitField("cacheflush", 0, 1), # mDNS RFC 6762 1028 BitEnumField("rclass", 1, 15, dnsclasses), 1029 IntField("ttl", 0), 1030 ShortField("rdlen", None), 1031 ShortField("svc_priority", 0), 1032 DNSStrField("target_name", ""), 1033 PacketListField("svc_params", [], SvcParam)] 1034 1035 1036class DNSRRHTTPS(_DNSRRdummy): 1037 name = "DNS HTTPS Resource Record" 1038 fields_desc = [DNSStrField("rrname", ""), 1039 ShortEnumField("type", 65, dnstypes) 1040 ] + DNSRRSVCB.fields_desc[2:] 1041 1042 1043# RFC 2782 - A DNS RR for specifying the location of services (DNS SRV) 1044 1045 1046class DNSRRSRV(_DNSRRdummy): 1047 name = "DNS SRV Resource Record" 1048 fields_desc = [DNSStrField("rrname", ""), 1049 ShortEnumField("type", 33, dnstypes), 1050 BitField("cacheflush", 0, 1), # mDNS RFC 6762 1051 BitEnumField("rclass", 1, 15, dnsclasses), 1052 IntField("ttl", 0), 1053 ShortField("rdlen", None), 1054 ShortField("priority", 0), 1055 ShortField("weight", 0), 1056 ShortField("port", 0), 1057 DNSStrField("target", ""), ] 1058 1059 1060# RFC 2845 - Secret Key Transaction Authentication for DNS (TSIG) 1061tsig_algo_sizes = {"HMAC-MD5.SIG-ALG.REG.INT": 16, 1062 "hmac-sha1": 20} 1063 1064 1065class TimeSignedField(Field[int, bytes]): 1066 def __init__(self, name, default): 1067 Field.__init__(self, name, default, fmt="6s") 1068 1069 def _convert_seconds(self, packed_seconds): 1070 """Unpack the internal representation.""" 1071 seconds = struct.unpack("!H", packed_seconds[:2])[0] 1072 seconds += struct.unpack("!I", packed_seconds[2:])[0] 1073 return seconds 1074 1075 def i2m(self, pkt, seconds): 1076 """Convert the number of seconds since 1-Jan-70 UTC to the packed 1077 representation.""" 1078 1079 if seconds is None: 1080 seconds = 0 1081 1082 tmp_short = (seconds >> 32) & 0xFFFF 1083 tmp_int = seconds & 0xFFFFFFFF 1084 1085 return struct.pack("!HI", tmp_short, tmp_int) 1086 1087 def m2i(self, pkt, packed_seconds): 1088 """Convert the internal representation to the number of seconds 1089 since 1-Jan-70 UTC.""" 1090 1091 if packed_seconds is None: 1092 return None 1093 1094 return self._convert_seconds(packed_seconds) 1095 1096 def i2repr(self, pkt, packed_seconds): 1097 """Convert the internal representation to a nice one using the RFC 1098 format.""" 1099 time_struct = time.gmtime(packed_seconds) 1100 return time.strftime("%a %b %d %H:%M:%S %Y", time_struct) 1101 1102 1103class DNSRRTSIG(_DNSRRdummy): 1104 name = "DNS TSIG Resource Record" 1105 fields_desc = [DNSStrField("rrname", ""), 1106 ShortEnumField("type", 250, dnstypes), 1107 ShortEnumField("rclass", 1, dnsclasses), 1108 IntField("ttl", 0), 1109 ShortField("rdlen", None), 1110 DNSStrField("algo_name", "hmac-sha1"), 1111 TimeSignedField("time_signed", 0), 1112 ShortField("fudge", 0), 1113 FieldLenField("mac_len", 20, fmt="!H", length_of="mac_data"), # noqa: E501 1114 StrLenField("mac_data", "", length_from=lambda pkt: pkt.mac_len), # noqa: E501 1115 ShortField("original_id", 0), 1116 ShortField("error", 0), 1117 FieldLenField("other_len", 0, fmt="!H", length_of="other_data"), # noqa: E501 1118 StrLenField("other_data", "", length_from=lambda pkt: pkt.other_len) # noqa: E501 1119 ] 1120 1121 1122class DNSRRNAPTR(_DNSRRdummy): 1123 name = "DNS NAPTR Resource Record" 1124 fields_desc = [DNSStrField("rrname", ""), 1125 ShortEnumField("type", 35, dnstypes), 1126 BitField("cacheflush", 0, 1), # mDNS RFC 6762 1127 BitEnumField("rclass", 1, 15, dnsclasses), 1128 IntField("ttl", 0), 1129 ShortField("rdlen", None), 1130 ShortField("order", 0), 1131 ShortField("preference", 0), 1132 FieldLenField("flags_len", None, fmt="!B", length_of="flags"), 1133 StrLenField("flags", "", length_from=lambda pkt: pkt.flags_len), 1134 FieldLenField("services_len", None, fmt="!B", length_of="services"), 1135 StrLenField("services", "", 1136 length_from=lambda pkt: pkt.services_len), 1137 FieldLenField("regexp_len", None, fmt="!B", length_of="regexp"), 1138 StrLenField("regexp", "", length_from=lambda pkt: pkt.regexp_len), 1139 DNSStrField("replacement", ""), 1140 ] 1141 1142 1143DNSRR_DISPATCHER = { 1144 6: DNSRRSOA, # RFC 1035 1145 13: DNSRRHINFO, # RFC 1035 1146 15: DNSRRMX, # RFC 1035 1147 33: DNSRRSRV, # RFC 2782 1148 35: DNSRRNAPTR, # RFC 2915 1149 41: DNSRROPT, # RFC 1671 1150 43: DNSRRDS, # RFC 4034 1151 46: DNSRRRSIG, # RFC 4034 1152 47: DNSRRNSEC, # RFC 4034 1153 48: DNSRRDNSKEY, # RFC 4034 1154 50: DNSRRNSEC3, # RFC 5155 1155 51: DNSRRNSEC3PARAM, # RFC 5155 1156 64: DNSRRSVCB, # RFC 9460 1157 65: DNSRRHTTPS, # RFC 9460 1158 250: DNSRRTSIG, # RFC 2845 1159 32769: DNSRRDLV, # RFC 4431 1160} 1161 1162 1163class DNSRR(Packet): 1164 name = "DNS Resource Record" 1165 show_indent = 0 1166 fields_desc = [DNSStrField("rrname", ""), 1167 ShortEnumField("type", 1, dnstypes), 1168 BitField("cacheflush", 0, 1), # mDNS RFC 6762 1169 BitEnumField("rclass", 1, 15, dnsclasses), 1170 IntField("ttl", 0), 1171 FieldLenField("rdlen", None, length_of="rdata", fmt="H"), 1172 MultipleTypeField( 1173 [ 1174 # A 1175 (IPField("rdata", "0.0.0.0"), 1176 lambda pkt: pkt.type == 1), 1177 # AAAA 1178 (IP6Field("rdata", "::"), 1179 lambda pkt: pkt.type == 28), 1180 # NS, MD, MF, CNAME, PTR, DNAME 1181 (DNSStrField("rdata", "", 1182 length_from=lambda pkt: pkt.rdlen), 1183 lambda pkt: pkt.type in [2, 3, 4, 5, 12, 39]), 1184 # TEXT 1185 (DNSTextField("rdata", [""], 1186 length_from=lambda pkt: pkt.rdlen), 1187 lambda pkt: pkt.type == 16), 1188 ], 1189 StrLenField("rdata", "", 1190 length_from=lambda pkt:pkt.rdlen) 1191 )] 1192 1193 def default_payload_class(self, payload): 1194 return conf.padding_layer 1195 1196 1197def _DNSRR(s, **kwargs): 1198 """ 1199 DNSRR dispatcher func 1200 """ 1201 if s: 1202 # Try to find the type of the RR using the dispatcher 1203 _, remain = dns_get_str(s, _ignore_compression=True) 1204 cls = DNSRR_DISPATCHER.get( 1205 struct.unpack("!H", remain[:2])[0], 1206 DNSRR, 1207 ) 1208 rrlen = ( 1209 len(s) - len(remain) + # rrname len 1210 10 + 1211 struct.unpack("!H", remain[8:10])[0] 1212 ) 1213 pkt = cls(s[:rrlen], **kwargs) / conf.padding_layer(s[rrlen:]) 1214 # drop rdlen because if rdata was compressed, it will break everything 1215 # when rebuilding 1216 del pkt.fields["rdlen"] 1217 return pkt 1218 return None 1219 1220 1221class DNSQR(Packet): 1222 name = "DNS Question Record" 1223 show_indent = 0 1224 fields_desc = [DNSStrField("qname", "www.example.com"), 1225 ShortEnumField("qtype", 1, dnsqtypes), 1226 BitField("unicastresponse", 0, 1), # mDNS RFC 6762 1227 BitEnumField("qclass", 1, 15, dnsclasses)] 1228 1229 def default_payload_class(self, payload): 1230 return conf.padding_layer 1231 1232 1233class _DNSPacketListField(PacketListField): 1234 # A normal PacketListField with backward-compatible hacks 1235 def any2i(self, pkt, x): 1236 # type: (Optional[Packet], List[Any]) -> List[Any] 1237 if x is None: 1238 warnings.warn( 1239 ("The DNS fields 'qd', 'an', 'ns' and 'ar' are now " 1240 "PacketListField(s) ! " 1241 "Setting a null default should be [] instead of None"), 1242 DeprecationWarning 1243 ) 1244 x = [] 1245 return super(_DNSPacketListField, self).any2i(pkt, x) 1246 1247 def i2h(self, pkt, x): 1248 # type: (Optional[Packet], List[Packet]) -> Any 1249 class _list(list): 1250 """ 1251 Fake list object to provide compatibility with older DNS fields 1252 """ 1253 def __getattr__(self, attr): 1254 try: 1255 ret = getattr(self[0], attr) 1256 warnings.warn( 1257 ("The DNS fields 'qd', 'an', 'ns' and 'ar' are now " 1258 "PacketListField(s) ! " 1259 "To access the first element, use pkt.an[0] instead of " 1260 "pkt.an"), 1261 DeprecationWarning 1262 ) 1263 return ret 1264 except AttributeError: 1265 raise 1266 return _list(x) 1267 1268 1269class DNS(DNSCompressedPacket): 1270 name = "DNS" 1271 fields_desc = [ 1272 ConditionalField(ShortField("length", None), 1273 lambda p: isinstance(p.underlayer, TCP)), 1274 ShortField("id", 0), 1275 BitField("qr", 0, 1), 1276 BitEnumField("opcode", 0, 4, {0: "QUERY", 1: "IQUERY", 2: "STATUS"}), 1277 BitField("aa", 0, 1), 1278 BitField("tc", 0, 1), 1279 BitField("rd", 1, 1), 1280 BitField("ra", 0, 1), 1281 BitField("z", 0, 1), 1282 # AD and CD bits are defined in RFC 2535 1283 BitField("ad", 0, 1), # Authentic Data 1284 BitField("cd", 0, 1), # Checking Disabled 1285 BitEnumField("rcode", 0, 4, {0: "ok", 1: "format-error", 1286 2: "server-failure", 3: "name-error", 1287 4: "not-implemented", 5: "refused"}), 1288 FieldLenField("qdcount", None, count_of="qd"), 1289 FieldLenField("ancount", None, count_of="an"), 1290 FieldLenField("nscount", None, count_of="ns"), 1291 FieldLenField("arcount", None, count_of="ar"), 1292 _DNSPacketListField("qd", [DNSQR()], DNSQR, count_from=lambda pkt: pkt.qdcount), 1293 _DNSPacketListField("an", [], _DNSRR, count_from=lambda pkt: pkt.ancount), 1294 _DNSPacketListField("ns", [], _DNSRR, count_from=lambda pkt: pkt.nscount), 1295 _DNSPacketListField("ar", [], _DNSRR, count_from=lambda pkt: pkt.arcount), 1296 ] 1297 1298 def get_full(self): 1299 # Required for DNSCompressedPacket 1300 if isinstance(self.underlayer, TCP): 1301 return self.original[2:] 1302 else: 1303 return self.original 1304 1305 def answers(self, other): 1306 return (isinstance(other, DNS) and 1307 self.id == other.id and 1308 self.qr == 1 and 1309 other.qr == 0) 1310 1311 def mysummary(self): 1312 name = "" 1313 if self.qr: 1314 type = "Ans" 1315 if self.an and isinstance(self.an[0], DNSRR): 1316 name = ' %s' % self.an[0].rdata 1317 elif self.rcode != 0: 1318 name = self.sprintf(' %rcode%') 1319 else: 1320 type = "Qry" 1321 if self.qd and isinstance(self.qd[0], DNSQR): 1322 name = ' %s' % self.qd[0].qname 1323 return "%sDNS %s%s" % ( 1324 "m" 1325 if isinstance(self.underlayer, UDP) and self.underlayer.dport == 5353 1326 else "", 1327 type, 1328 name, 1329 ) 1330 1331 def post_build(self, pkt, pay): 1332 if isinstance(self.underlayer, TCP) and self.length is None: 1333 pkt = struct.pack("!H", len(pkt) - 2) + pkt[2:] 1334 return pkt + pay 1335 1336 def compress(self): 1337 """Return the compressed DNS packet (using `dns_compress()`)""" 1338 return dns_compress(self) 1339 1340 def pre_dissect(self, s): 1341 """ 1342 Check that a valid DNS over TCP message can be decoded 1343 """ 1344 if isinstance(self.underlayer, TCP): 1345 1346 # Compute the length of the DNS packet 1347 if len(s) >= 2: 1348 dns_len = struct.unpack("!H", s[:2])[0] 1349 else: 1350 message = "Malformed DNS message: too small!" 1351 log_runtime.info(message) 1352 raise Scapy_Exception(message) 1353 1354 # Check if the length is valid 1355 if dns_len < 14 or len(s) < dns_len: 1356 message = "Malformed DNS message: invalid length!" 1357 log_runtime.info(message) 1358 raise Scapy_Exception(message) 1359 1360 return s 1361 1362 1363bind_layers(UDP, DNS, dport=5353) 1364bind_layers(UDP, DNS, sport=5353) 1365bind_layers(UDP, DNS, dport=53) 1366bind_layers(UDP, DNS, sport=53) 1367DestIPField.bind_addr(UDP, "224.0.0.251", dport=5353) 1368if conf.ipv6_enabled: 1369 from scapy.layers.inet6 import DestIP6Field 1370 DestIP6Field.bind_addr(UDP, "ff02::fb", dport=5353) 1371bind_layers(TCP, DNS, dport=53) 1372bind_layers(TCP, DNS, sport=53) 1373 1374# Nameserver config 1375conf.nameservers = read_nameservers() 1376_dns_cache = conf.netcache.new_cache("dns_cache", 300) 1377 1378 1379@conf.commands.register 1380def dns_resolve(qname, qtype="A", raw=False, verbose=1, timeout=3, **kwargs): 1381 """ 1382 Perform a simple DNS resolution using conf.nameservers with caching 1383 1384 :param qname: the name to query 1385 :param qtype: the type to query (default A) 1386 :param raw: return the whole DNS packet (default False) 1387 :param verbose: show verbose errors 1388 :param timeout: seconds until timeout (per server) 1389 :raise TimeoutError: if no DNS servers were reached in time. 1390 """ 1391 # Unify types 1392 qtype = DNSQR.qtype.any2i_one(None, qtype) 1393 qname = DNSQR.qname.any2i(None, qname) 1394 # Check cache 1395 cache_ident = b";".join( 1396 [qname, struct.pack("!B", qtype)] + 1397 ([b"raw"] if raw else []) 1398 ) 1399 result = _dns_cache.get(cache_ident) 1400 if result: 1401 return result 1402 1403 kwargs.setdefault("timeout", timeout) 1404 kwargs.setdefault("verbose", 0) 1405 res = None 1406 for nameserver in conf.nameservers: 1407 # Try all nameservers 1408 try: 1409 # Spawn a UDP socket, connect to the nameserver on port 53 1410 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 1411 sock.settimeout(kwargs["timeout"]) 1412 sock.connect((nameserver, 53)) 1413 # Connected. Wrap it with DNS 1414 sock = StreamSocket(sock, DNS) 1415 # I/O 1416 res = sock.sr1( 1417 DNS(qd=[DNSQR(qname=qname, qtype=qtype)], id=RandShort()), 1418 **kwargs, 1419 ) 1420 except IOError as ex: 1421 if verbose: 1422 log_runtime.warning(str(ex)) 1423 continue 1424 finally: 1425 sock.close() 1426 if res: 1427 # We have a response ! Check for failure 1428 if res[DNS].rcode == 2: # server failure 1429 res = None 1430 if verbose: 1431 log_runtime.info( 1432 "DNS: %s answered with failure for %s" % ( 1433 nameserver, 1434 qname, 1435 ) 1436 ) 1437 else: 1438 break 1439 if res is not None: 1440 if raw: 1441 # Raw 1442 result = res 1443 else: 1444 # Find answers 1445 result = [ 1446 x 1447 for x in itertools.chain(res.an, res.ns, res.ar) 1448 if x.type == qtype 1449 ] 1450 if result: 1451 # Cache it 1452 _dns_cache[cache_ident] = result 1453 return result 1454 else: 1455 raise TimeoutError 1456 1457 1458@conf.commands.register 1459def dyndns_add(nameserver, name, rdata, type="A", ttl=10): 1460 """Send a DNS add message to a nameserver for "name" to have a new "rdata" 1461dyndns_add(nameserver, name, rdata, type="A", ttl=10) -> result code (0=ok) 1462 1463example: dyndns_add("ns1.toto.com", "dyn.toto.com", "127.0.0.1") 1464RFC2136 1465""" 1466 zone = name[name.find(".") + 1:] 1467 r = sr1(IP(dst=nameserver) / UDP() / DNS(opcode=5, 1468 qd=[DNSQR(qname=zone, qtype="SOA")], # noqa: E501 1469 ns=[DNSRR(rrname=name, type="A", 1470 ttl=ttl, rdata=rdata)]), 1471 verbose=0, timeout=5) 1472 if r and r.haslayer(DNS): 1473 return r.getlayer(DNS).rcode 1474 else: 1475 return -1 1476 1477 1478@conf.commands.register 1479def dyndns_del(nameserver, name, type="ALL", ttl=10): 1480 """Send a DNS delete message to a nameserver for "name" 1481dyndns_del(nameserver, name, type="ANY", ttl=10) -> result code (0=ok) 1482 1483example: dyndns_del("ns1.toto.com", "dyn.toto.com") 1484RFC2136 1485""" 1486 zone = name[name.find(".") + 1:] 1487 r = sr1(IP(dst=nameserver) / UDP() / DNS(opcode=5, 1488 qd=[DNSQR(qname=zone, qtype="SOA")], # noqa: E501 1489 ns=[DNSRR(rrname=name, type=type, 1490 rclass="ANY", ttl=0, rdata="")]), # noqa: E501 1491 verbose=0, timeout=5) 1492 if r and r.haslayer(DNS): 1493 return r.getlayer(DNS).rcode 1494 else: 1495 return -1 1496 1497 1498class DNS_am(AnsweringMachine): 1499 function_name = "dnsd" 1500 filter = "udp port 53" 1501 cls = DNS # We also use this automaton for llmnrd / mdnsd 1502 1503 def parse_options(self, joker=None, 1504 match=None, 1505 srvmatch=None, 1506 joker6=False, 1507 send_error=False, 1508 relay=False, 1509 from_ip=True, 1510 from_ip6=False, 1511 src_ip=None, 1512 src_ip6=None, 1513 ttl=10, 1514 jokerarpa=False): 1515 """ 1516 Simple DNS answering machine. 1517 1518 :param joker: default IPv4 for unresolved domains. 1519 Set to False to disable, None to mirror the interface's IP. 1520 Defaults to None, unless 'match' is used, then it defaults to 1521 False. 1522 :param joker6: default IPv6 for unresolved domains. 1523 Set to False to disable, None to mirror the interface's IPv6. 1524 Defaults to False. 1525 :param match: queries to match. 1526 This can be a dictionary of {name: val} where name is a string 1527 representing a domain name (A, AAAA) and val is a tuple of 2 1528 elements, each representing an IP or a list of IPs. If val is 1529 a single element, (A, None) is assumed. 1530 This can also be a list or names, in which case joker(6) are 1531 used as a response. 1532 :param jokerarpa: answer for .in-addr.arpa PTR requests. (Default: False) 1533 :param relay: relay unresolved domains to conf.nameservers (Default: False). 1534 :param send_error: send an error message when this server can't answer 1535 (Default: False) 1536 :param srvmatch: a dictionary of {name: (port, target)} used for SRV 1537 :param from_ip: an source IP to filter. Can contain a netmask. True for all, 1538 False for none. Default True 1539 :param from_ip6: an source IPv6 to filter. Can contain a netmask. True for all, 1540 False for none. Default False 1541 :param ttl: the DNS time to live (in seconds) 1542 :param src_ip: override the source IP 1543 :param src_ip6: 1544 1545 Examples: 1546 1547 - Answer all 'A' and 'AAAA' requests:: 1548 1549 $ sudo iptables -I OUTPUT -p icmp --icmp-type 3/3 -j DROP 1550 >>> dnsd(joker="192.168.0.2", joker6="fe80::260:8ff:fe52:f9d8", 1551 ... iface="eth0") 1552 1553 - Answer only 'A' query for google.com with 192.168.0.2:: 1554 1555 >>> dnsd(match={"google.com": "192.168.0.2"}, iface="eth0") 1556 1557 - Answer DNS for a Windows domain controller ('SRV', 'A' and 'AAAA'):: 1558 1559 >>> dnsd( 1560 ... srvmatch={ 1561 ... "_ldap._tcp.dc._msdcs.DOMAIN.LOCAL.": (389, 1562 ... "srv1.domain.local"), 1563 ... }, 1564 ... match={"src1.domain.local": ("192.168.0.102", 1565 ... "fe80::260:8ff:fe52:f9d8")}, 1566 ... ) 1567 1568 - Relay all queries to another DNS server, except some:: 1569 1570 >>> conf.nameservers = ["1.1.1.1"] # server to relay to 1571 >>> dnsd( 1572 ... match={"test.com": "1.1.1.1"}, 1573 ... relay=True, 1574 ... ) 1575 """ 1576 from scapy.layers.inet6 import Net6 1577 1578 self.mDNS = isinstance(self, mDNS_am) 1579 self.llmnr = self.cls != DNS 1580 1581 # Add some checks (to help) 1582 if not isinstance(joker, (str, bool)) and joker is not None: 1583 raise ValueError("Bad 'joker': should be an IPv4 (str) or False !") 1584 if not isinstance(joker6, (str, bool)) and joker6 is not None: 1585 raise ValueError("Bad 'joker6': should be an IPv6 (str) or False !") 1586 if not isinstance(jokerarpa, (str, bool)): 1587 raise ValueError("Bad 'jokerarpa': should be a hostname or False !") 1588 if not isinstance(from_ip, (str, Net, bool)): 1589 raise ValueError("Bad 'from_ip': should be an IPv4 (str), Net or False !") 1590 if not isinstance(from_ip6, (str, Net6, bool)): 1591 raise ValueError("Bad 'from_ip6': should be an IPv6 (str), Net or False !") 1592 if self.mDNS and src_ip: 1593 raise ValueError("Cannot use 'src_ip' in mDNS !") 1594 if self.mDNS and src_ip6: 1595 raise ValueError("Cannot use 'src_ip6' in mDNS !") 1596 1597 if joker is None and match is not None: 1598 joker = False 1599 self.joker = joker 1600 self.joker6 = joker6 1601 self.jokerarpa = jokerarpa 1602 1603 def normv(v): 1604 if isinstance(v, (tuple, list)) and len(v) == 2: 1605 return tuple(v) 1606 elif isinstance(v, str): 1607 return (v, joker6) 1608 else: 1609 raise ValueError("Bad match value: '%s'" % repr(v)) 1610 1611 def normk(k): 1612 k = bytes_encode(k).lower() 1613 if not k.endswith(b"."): 1614 k += b"." 1615 return k 1616 1617 self.match = collections.defaultdict(lambda: (joker, joker6)) 1618 if match: 1619 if isinstance(match, (list, set)): 1620 self.match.update({normk(k): (None, None) for k in match}) 1621 else: 1622 self.match.update({normk(k): normv(v) for k, v in match.items()}) 1623 if srvmatch is None: 1624 self.srvmatch = {} 1625 else: 1626 self.srvmatch = {normk(k): normv(v) for k, v in srvmatch.items()} 1627 1628 self.send_error = send_error 1629 self.relay = relay 1630 if isinstance(from_ip, str): 1631 self.from_ip = Net(from_ip) 1632 else: 1633 self.from_ip = from_ip 1634 if isinstance(from_ip6, str): 1635 self.from_ip6 = Net6(from_ip6) 1636 else: 1637 self.from_ip6 = from_ip6 1638 self.src_ip = src_ip 1639 self.src_ip6 = src_ip6 1640 self.ttl = ttl 1641 1642 def is_request(self, req): 1643 from scapy.layers.inet6 import IPv6 1644 return ( 1645 req.haslayer(self.cls) and 1646 req.getlayer(self.cls).qr == 0 and ( 1647 ( 1648 self.from_ip6 is True or 1649 (self.from_ip6 and req[IPv6].src in self.from_ip6) 1650 ) 1651 if IPv6 in req else 1652 ( 1653 self.from_ip is True or 1654 (self.from_ip and req[IP].src in self.from_ip) 1655 ) 1656 ) 1657 ) 1658 1659 def make_reply(self, req): 1660 # Build reply from the request 1661 resp = req.copy() 1662 if Ether in req: 1663 if self.mDNS: 1664 resp[Ether].src, resp[Ether].dst = None, None 1665 elif self.llmnr: 1666 resp[Ether].src, resp[Ether].dst = None, req[Ether].src 1667 else: 1668 resp[Ether].src, resp[Ether].dst = ( 1669 None if req[Ether].dst == "ff:ff:ff:ff:ff:ff" else req[Ether].dst, 1670 req[Ether].src, 1671 ) 1672 from scapy.layers.inet6 import IPv6 1673 if IPv6 in req: 1674 resp[IPv6].underlayer.remove_payload() 1675 if self.mDNS: 1676 # "All Multicast DNS responses (including responses sent via unicast) 1677 # SHOULD be sent with IP TTL set to 255." 1678 resp /= IPv6(dst="ff02::fb", src=self.src_ip6, 1679 fl=req[IPv6].fl, hlim=255) 1680 elif self.llmnr: 1681 resp /= IPv6(dst=req[IPv6].src, src=self.src_ip6, 1682 fl=req[IPv6].fl, hlim=req[IPv6].hlim) 1683 else: 1684 resp /= IPv6(dst=req[IPv6].src, src=self.src_ip6 or req[IPv6].dst, 1685 fl=req[IPv6].fl, hlim=req[IPv6].hlim) 1686 elif IP in req: 1687 resp[IP].underlayer.remove_payload() 1688 if self.mDNS: 1689 # "All Multicast DNS responses (including responses sent via unicast) 1690 # SHOULD be sent with IP TTL set to 255." 1691 resp /= IP(dst="224.0.0.251", src=self.src_ip, 1692 id=req[IP].id, ttl=255) 1693 elif self.llmnr: 1694 resp /= IP(dst=req[IP].src, src=self.src_ip, 1695 id=req[IP].id, ttl=req[IP].ttl) 1696 else: 1697 resp /= IP(dst=req[IP].src, src=self.src_ip or req[IP].dst, 1698 id=req[IP].id, ttl=req[IP].ttl) 1699 else: 1700 warning("No IP or IPv6 layer in %s", req.command()) 1701 return 1702 try: 1703 resp /= UDP(sport=req[UDP].dport, dport=req[UDP].sport) 1704 except IndexError: 1705 warning("No UDP layer in %s", req.command(), exc_info=True) 1706 return 1707 try: 1708 req = req[self.cls] 1709 except IndexError: 1710 warning( 1711 "No %s layer in %s", 1712 self.cls.__name__, 1713 req.command(), 1714 exc_info=True, 1715 ) 1716 return 1717 try: 1718 queries = req.qd 1719 except AttributeError: 1720 warning("No qd attribute in %s", req.command(), exc_info=True) 1721 return 1722 # Special case: alias 'ALL' query as 'A' + 'AAAA' 1723 try: 1724 allquery = next( 1725 (x for x in queries if getattr(x, "qtype", None) == 255) 1726 ) 1727 queries.remove(allquery) 1728 queries.extend([ 1729 DNSQR( 1730 qtype=x, 1731 qname=allquery.qname, 1732 unicastresponse=allquery.unicastresponse, 1733 qclass=allquery.qclass, 1734 ) 1735 for x in [1, 28] 1736 ]) 1737 except StopIteration: 1738 pass 1739 # Process each query 1740 ans = [] 1741 ars = [] 1742 for rq in queries: 1743 if isinstance(rq, Raw): 1744 warning("Cannot parse qd element %s", rq.command(), exc_info=True) 1745 continue 1746 rqname = rq.qname.lower() 1747 if rq.qtype in [1, 28]: 1748 # A or AAAA 1749 if rq.qtype == 28: 1750 # AAAA 1751 rdata = self.match[rqname][1] 1752 if rdata is None and not self.relay: 1753 # 'None' resolves to the default IPv6 1754 iface = resolve_iface(self.optsniff.get("iface", conf.iface)) 1755 if self.mDNS: 1756 # All IPs, as per mDNS. 1757 rdata = iface.ips[6] 1758 else: 1759 rdata = get_if_addr6( 1760 iface 1761 ) 1762 if self.mDNS and rdata and IPv6 in resp: 1763 # For mDNS, we must replace the IPv6 src 1764 resp[IPv6].src = rdata 1765 elif rq.qtype == 1: 1766 # A 1767 rdata = self.match[rqname][0] 1768 if rdata is None and not self.relay: 1769 # 'None' resolves to the default IPv4 1770 iface = resolve_iface(self.optsniff.get("iface", conf.iface)) 1771 if self.mDNS: 1772 # All IPs, as per mDNS. 1773 rdata = iface.ips[4] 1774 else: 1775 rdata = get_if_addr( 1776 iface 1777 ) 1778 if self.mDNS and rdata and IP in resp: 1779 # For mDNS, we must replace the IP src 1780 resp[IP].src = rdata 1781 if rdata: 1782 # Common A and AAAA 1783 if not isinstance(rdata, list): 1784 rdata = [rdata] 1785 ans.extend([ 1786 DNSRR( 1787 rrname=rq.qname, 1788 ttl=self.ttl, 1789 rdata=x, 1790 type=rq.qtype, 1791 cacheflush=self.mDNS and rq.qtype == rq.qtype, 1792 ) 1793 for x in rdata 1794 ]) 1795 continue # next 1796 elif rq.qtype == 33: 1797 # SRV 1798 try: 1799 port, target = self.srvmatch[rqname] 1800 ans.append(DNSRRSRV( 1801 rrname=rq.qname, 1802 port=port, 1803 target=target, 1804 weight=100, 1805 ttl=self.ttl 1806 )) 1807 continue # next 1808 except KeyError: 1809 # No result 1810 pass 1811 elif rq.qtype == 12: 1812 # PTR 1813 if rq.qname[-14:] == b".in-addr.arpa." and self.jokerarpa: 1814 ans.append(DNSRR( 1815 rrname=rq.qname, 1816 type=rq.qtype, 1817 ttl=self.ttl, 1818 rdata=self.jokerarpa, 1819 )) 1820 continue 1821 # It it arrives here, there is currently no answer 1822 if self.relay: 1823 # Relay mode ? 1824 try: 1825 _rslv = dns_resolve(rq.qname, qtype=rq.qtype) 1826 if _rslv: 1827 ans.extend(_rslv) 1828 continue # next 1829 except TimeoutError: 1830 pass 1831 # Still no answer. 1832 if self.mDNS: 1833 # "Any time a responder receives a query for a name for which it 1834 # has verified exclusive ownership, for a type for which that name 1835 # has no records, the responder MUST respond asserting the 1836 # nonexistence of that record using a DNS NSEC record [RFC4034]." 1837 ans.append(DNSRRNSEC( 1838 # RFC6762 sect 6.1 - Negative Response 1839 ttl=self.ttl, 1840 rrname=rq.qname, 1841 nextname=rq.qname, 1842 typebitmaps=RRlist2bitmap([rq.qtype]), 1843 )) 1844 if self.mDNS and all(x.type == 47 for x in ans): 1845 # If mDNS answers with only NSEC, discard. 1846 return 1847 if not ans: 1848 # No answer is available. 1849 if self.send_error: 1850 resp /= self.cls(id=req.id, qr=1, qd=req.qd, rcode=3) 1851 return resp 1852 log_runtime.info("No answer could be provided to: %s" % req.summary()) 1853 return 1854 # Handle Additional Records 1855 if self.mDNS: 1856 # Windows specific extension 1857 ars.append(DNSRROPT( 1858 z=0x1194, 1859 rdata=[ 1860 EDNS0OWN( 1861 primary_mac=resp[Ether].src, 1862 ), 1863 ], 1864 )) 1865 # All rq were answered 1866 if self.mDNS: 1867 # in mDNS mode, don't repeat the question, set aa=1, rd=0 1868 dns = self.cls(id=req.id, aa=1, rd=0, qr=1, qd=[], ar=ars, an=ans) 1869 else: 1870 dns = self.cls(id=req.id, qr=1, qd=req.qd, ar=ars, an=ans) 1871 # Compress DNS and mDNS 1872 if not self.llmnr: 1873 resp /= dns_compress(dns) 1874 else: 1875 resp /= dns 1876 return resp 1877 1878 1879class mDNS_am(DNS_am): 1880 """ 1881 mDNS answering machine. 1882 1883 This has the same arguments as DNS_am. See help(DNS_am) 1884 1885 Example:: 1886 1887 - Answer for 'TEST.local' with local IPv4:: 1888 1889 >>> mdnsd(match=["TEST.local"]) 1890 1891 - Answer all requests with other IP:: 1892 1893 >>> mdnsd(joker="192.168.0.2", joker6="fe80::260:8ff:fe52:f9d8", 1894 ... iface="eth0") 1895 1896 - Answer for multiple different mDNS names:: 1897 1898 >>> mdnsd(match={"TEST.local": "192.168.0.100", 1899 ... "BOB.local": "192.168.0.101"}) 1900 1901 - Answer with both A and AAAA records:: 1902 1903 >>> mdnsd(match={"TEST.local": ("192.168.0.100", 1904 ... "fe80::260:8ff:fe52:f9d8")}) 1905 """ 1906 function_name = "mdnsd" 1907 filter = "udp port 5353" 1908 1909 1910# DNS-SD (RFC 6763) 1911 1912 1913class DNSSDResult(SndRcvList): 1914 def __init__(self, 1915 res=None, # type: Optional[Union[_PacketList[QueryAnswer], List[QueryAnswer]]] # noqa: E501 1916 name="DNS-SD", # type: str 1917 stats=None # type: Optional[List[Type[Packet]]] 1918 ): 1919 SndRcvList.__init__(self, res, name, stats) 1920 1921 def show(self, types=['PTR', 'SRV'], alltypes=False): 1922 # type: (List[str], bool) -> None 1923 """ 1924 Print the list of discovered services. 1925 1926 :param types: types to show. Default ['PTR', 'SRV'] 1927 :param alltypes: show all types. Default False 1928 """ 1929 if alltypes: 1930 types = None 1931 data = list() # type: List[Tuple[str | List[str], ...]] 1932 1933 resolve_mac = ( 1934 self.res and isinstance(self.res[0][1].underlayer, Ether) and 1935 conf.manufdb 1936 ) 1937 1938 header = ("IP", "Service") 1939 if resolve_mac: 1940 header = ("Mac",) + header 1941 1942 for _, r in self.res: 1943 attrs = [] 1944 for attr in itertools.chain(r[DNS].an, r[DNS].ar): 1945 if types and dnstypes.get(attr.type) not in types: 1946 continue 1947 if isinstance(attr, DNSRRNSEC): 1948 attrs.append(attr.sprintf("%type%=%nextname%")) 1949 elif isinstance(attr, DNSRRSRV): 1950 attrs.append(attr.sprintf("%type%=(%target%,%port%)")) 1951 else: 1952 attrs.append(attr.sprintf("%type%=%rdata%")) 1953 ans = (r.src, attrs) 1954 if resolve_mac: 1955 mac = conf.manufdb._resolve_MAC(r.underlayer.src) 1956 data.append((mac,) + ans) 1957 else: 1958 data.append(ans) 1959 1960 print( 1961 pretty_list( 1962 data, 1963 [header], 1964 ) 1965 ) 1966 1967 1968@conf.commands.register 1969def dnssd(service="_services._dns-sd._udp.local", 1970 af=socket.AF_INET, 1971 qtype="PTR", 1972 iface=None, 1973 verbose=2, 1974 timeout=3): 1975 """ 1976 Performs a DNS-SD (RFC6763) request 1977 1978 :param service: the service name to query (e.g. _spotify-connect._tcp.local) 1979 :param af: the transport to use. socket.AF_INET or socket.AF_INET6 1980 :param qtype: the type to use in the mDNS. Either TXT, PTR or SRV. 1981 :param iface: the interface to do this discovery on. 1982 """ 1983 if af == socket.AF_INET: 1984 pkt = IP(dst=ScopedIP("224.0.0.251", iface), ttl=255) 1985 elif af == socket.AF_INET6: 1986 pkt = IPv6(dst=ScopedIP("ff02::fb", iface)) 1987 else: 1988 return 1989 pkt /= UDP(sport=5353, dport=5353) 1990 pkt /= DNS(rd=0, qd=[DNSQR(qname=service, qtype=qtype)]) 1991 ans, _ = sr(pkt, multi=True, timeout=timeout, verbose=verbose) 1992 return DNSSDResult(ans.res) 1993