• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5# Copyright (C) 2005  Guillaume Valadon <guedou@hongo.wide.ad.jp>
6#                     Arnaud Ebalard <arnaud.ebalard@eads.net>
7
8"""
9Routing and network interface handling for IPv6.
10"""
11
12#############################################################################
13#                        Routing/Interfaces stuff                           #
14#############################################################################
15
16import socket
17from scapy.config import conf
18from scapy.interfaces import resolve_iface, NetworkInterface
19from scapy.utils6 import in6_ptop, in6_cidr2mask, in6_and, \
20    in6_islladdr, in6_ismlladdr, in6_isincluded, in6_isgladdr, \
21    in6_isaddr6to4, in6_ismaddr, construct_source_candidate_set, \
22    get_source_addr_from_candidate_set
23from scapy.arch import read_routes6, in6_getifaddr
24from scapy.pton_ntop import inet_pton, inet_ntop
25from scapy.error import warning, log_loading
26from scapy.utils import pretty_list
27
28from typing import (
29    Any,
30    Dict,
31    List,
32    Optional,
33    Set,
34    Tuple,
35    Union,
36)
37
38
39class Route6:
40
41    def __init__(self):
42        # type: () -> None
43        self.routes = []  # type: List[Tuple[str, int, str, str, List[str], int]]  # noqa: E501
44        self.ipv6_ifaces = set()  # type: Set[Union[str, NetworkInterface]]
45        self.invalidate_cache()
46        if conf.route6_autoload:
47            self.resync()
48
49    def invalidate_cache(self):
50        # type: () -> None
51        self.cache = {}  # type: Dict[str, Tuple[str, str, str]]
52
53    def flush(self):
54        # type: () -> None
55        self.invalidate_cache()
56        self.routes.clear()
57        self.ipv6_ifaces.clear()
58
59    def resync(self):
60        # type: () -> None
61        # TODO : At the moment, resync will drop existing Teredo routes
62        #        if any. Change that ...
63        self.invalidate_cache()
64        self.routes = read_routes6()
65        self.ipv6_ifaces = set()
66        for route in self.routes:
67            self.ipv6_ifaces.add(route[3])
68        if self.routes == []:
69            log_loading.info("No IPv6 support in kernel")
70
71    def __repr__(self):
72        # type: () -> str
73        rtlst = []  # type: List[Tuple[Union[str, List[str]], ...]]
74
75        for net, msk, gw, iface, cset, metric in self.routes:
76            if_repr = resolve_iface(iface).description
77            rtlst.append(('%s/%i' % (net, msk),
78                          gw,
79                          if_repr,
80                          cset,
81                          str(metric)))
82
83        return pretty_list(rtlst,
84                           [('Destination', 'Next Hop', "Iface", "Src candidates", "Metric")],  # noqa: E501
85                           sortBy=1)
86
87    # Unlike Scapy's Route.make_route() function, we do not have 'host' and 'net'  # noqa: E501
88    # parameters. We only have a 'dst' parameter that accepts 'prefix' and
89    # 'prefix/prefixlen' values.
90    def make_route(self,
91                   dst,  # type: str
92                   gw=None,  # type: Optional[str]
93                   dev=None,  # type: Optional[str]
94                   ):
95        # type: (...) -> Tuple[str, int, str, str, List[str], int]
96        """Internal function : create a route for 'dst' via 'gw'.
97        """
98        prefix, plen_b = (dst.split("/") + ["128"])[:2]
99        plen = int(plen_b)
100
101        if gw is None:
102            gw = "::"
103        if dev is None:
104            dev, ifaddr_uniq, x = self.route(gw)
105            ifaddr = [ifaddr_uniq]
106        else:
107            lifaddr = in6_getifaddr()
108            devaddrs = (x for x in lifaddr if x[2] == dev)
109            ifaddr = construct_source_candidate_set(prefix, plen, devaddrs)
110
111        self.ipv6_ifaces.add(dev)
112
113        return (prefix, plen, gw, dev, ifaddr, 1)
114
115    def add(self, *args, **kargs):
116        # type: (*Any, **Any) -> None
117        """Ex:
118        add(dst="2001:db8:cafe:f000::/56")
119        add(dst="2001:db8:cafe:f000::/56", gw="2001:db8:cafe::1")
120        add(dst="2001:db8:cafe:f000::/64", gw="2001:db8:cafe::1", dev="eth0")
121        """
122        self.invalidate_cache()
123        self.routes.append(self.make_route(*args, **kargs))
124
125    def remove_ipv6_iface(self, iface):
126        # type: (str) -> None
127        """
128        Remove the network interface 'iface' from the list of interfaces
129        supporting IPv6.
130        """
131
132        if not all(r[3] == iface for r in conf.route6.routes):
133            try:
134                self.ipv6_ifaces.remove(iface)
135            except KeyError:
136                pass
137
138    def delt(self, dst, gw=None):
139        # type: (str, Optional[str]) -> None
140        """ Ex:
141        delt(dst="::/0")
142        delt(dst="2001:db8:cafe:f000::/56")
143        delt(dst="2001:db8:cafe:f000::/56", gw="2001:db8:deca::1")
144        """
145        tmp = dst + "/128"
146        dst, plen_b = tmp.split('/')[:2]
147        dst = in6_ptop(dst)
148        plen = int(plen_b)
149        to_del = [x for x in self.routes
150                  if in6_ptop(x[0]) == dst and x[1] == plen]
151        if gw:
152            gw = in6_ptop(gw)
153            to_del = [x for x in self.routes if in6_ptop(x[2]) == gw]
154        if len(to_del) == 0:
155            warning("No matching route found")
156        elif len(to_del) > 1:
157            warning("Found more than one match. Aborting.")
158        else:
159            i = self.routes.index(to_del[0])
160            self.invalidate_cache()
161            self.remove_ipv6_iface(self.routes[i][3])
162            del self.routes[i]
163
164    def ifchange(self, iff, addr):
165        # type: (str, str) -> None
166        the_addr, the_plen_b = (addr.split("/") + ["128"])[:2]
167        the_plen = int(the_plen_b)
168
169        naddr = inet_pton(socket.AF_INET6, the_addr)
170        nmask = in6_cidr2mask(the_plen)
171        the_net = inet_ntop(socket.AF_INET6, in6_and(nmask, naddr))
172
173        for i, route in enumerate(self.routes):
174            net, plen, gw, iface, _, metric = route
175            if iface != iff:
176                continue
177
178            self.ipv6_ifaces.add(iface)
179
180            if gw == '::':
181                self.routes[i] = (the_net, the_plen, gw, iface, [the_addr], metric)  # noqa: E501
182            else:
183                self.routes[i] = (net, plen, gw, iface, [the_addr], metric)
184        self.invalidate_cache()
185        conf.netcache.in6_neighbor.flush()  # type: ignore
186
187    def ifdel(self, iff):
188        # type: (str) -> None
189        """ removes all route entries that uses 'iff' interface. """
190        new_routes = []
191        for rt in self.routes:
192            if rt[3] != iff:
193                new_routes.append(rt)
194        self.invalidate_cache()
195        self.routes = new_routes
196        self.remove_ipv6_iface(iff)
197
198    def ifadd(self, iff, addr):
199        # type: (str, str) -> None
200        """
201        Add an interface 'iff' with provided address into routing table.
202
203        Ex: ifadd('eth0', '2001:bd8:cafe:1::1/64') will add following entry into  # noqa: E501
204            Scapy6 internal routing table:
205
206            Destination           Next Hop  iface  Def src @           Metric
207            2001:bd8:cafe:1::/64  ::        eth0   2001:bd8:cafe:1::1  1
208
209            prefix length value can be omitted. In that case, a value of 128
210            will be used.
211        """
212        addr, plen_b = (addr.split("/") + ["128"])[:2]
213        addr = in6_ptop(addr)
214        plen = int(plen_b)
215        naddr = inet_pton(socket.AF_INET6, addr)
216        nmask = in6_cidr2mask(plen)
217        prefix = inet_ntop(socket.AF_INET6, in6_and(nmask, naddr))
218        self.invalidate_cache()
219        self.routes.append((prefix, plen, '::', iff, [addr], 1))
220        self.ipv6_ifaces.add(iff)
221
222    def route(self, dst="", dev=None, verbose=conf.verb):
223        # type: (str, Optional[str], int) -> Tuple[str, str, str]
224        """
225        Provide best route to IPv6 destination address, based on Scapy
226        internal routing table content.
227
228        When a set of address is passed (e.g. ``2001:db8:cafe:*::1-5``) an
229        address of the set is used. Be aware of that behavior when using
230        wildcards in upper parts of addresses !
231
232        If 'dst' parameter is a FQDN, name resolution is performed and result
233        is used.
234
235        if optional 'dev' parameter is provided a specific interface, filtering
236        is performed to limit search to route associated to that interface.
237        """
238        dst = dst or "::/0"  # Enable route(None) to return default route
239        # Transform "2001:db8:cafe:*::1-5:0/120" to one IPv6 address of the set
240        dst = dst.split("/")[0]
241        savedst = dst  # In case following inet_pton() fails
242        dst = dst.replace("*", "0")
243        idx = dst.find("-")
244        while idx >= 0:
245            m = (dst[idx:] + ":").find(":")
246            dst = dst[:idx] + dst[idx + m:]
247            idx = dst.find("-")
248
249        try:
250            inet_pton(socket.AF_INET6, dst)
251        except socket.error:
252            dst = socket.getaddrinfo(savedst, None, socket.AF_INET6)[0][-1][0]
253            # TODO : Check if name resolution went well
254
255        # Deal with dev-specific request for cache search
256        k = dst
257        if dev is not None:
258            k = dst + "%%" + dev
259        if k in self.cache:
260            return self.cache[k]
261
262        paths = []  # type: List[Tuple[int, int, Tuple[str, List[str], str]]]
263
264        # TODO : review all kinds of addresses (scope and *cast) to see
265        #        if we are able to cope with everything possible. I'm convinced
266        #        it's not the case.
267        # -- arnaud
268        for p, plen, gw, iface, cset, me in self.routes:
269            if dev is not None and iface != dev:
270                continue
271            if in6_isincluded(dst, p, plen):
272                paths.append((plen, me, (iface, cset, gw)))
273            elif (in6_ismlladdr(dst) and in6_islladdr(p) and in6_islladdr(cset[0])):  # noqa: E501
274                paths.append((plen, me, (iface, cset, gw)))
275
276        if not paths:
277            if dst == "::1":
278                return (conf.loopback_name, "::1", "::")
279            else:
280                if verbose:
281                    warning("No route found for IPv6 destination %s "
282                            "(no default route?)", dst)
283                return (dev or conf.loopback_name, "::", "::")
284
285        # Sort with longest prefix first then use metrics as a tie-breaker
286        paths.sort(key=lambda x: (-x[0], x[1]))
287
288        best_plen = (paths[0][0], paths[0][1])
289        paths = [x for x in paths if (x[0], x[1]) == best_plen]
290
291        res = []  # type: List[Tuple[int, int, Tuple[str, str, str]]]
292        for path in paths:  # we select best source address for every route
293            tmp_c = path[2]
294            srcaddr = get_source_addr_from_candidate_set(dst, tmp_c[1])
295            if srcaddr is not None:
296                res.append((path[0], path[1], (tmp_c[0], srcaddr, tmp_c[2])))
297
298        if res == []:
299            warning("Found a route for IPv6 destination '%s', but no possible source address.", dst)  # noqa: E501
300            return (conf.loopback_name, "::", "::")
301
302        # Symptom  : 2 routes with same weight (our weight is plen)
303        # Solution :
304        #  - dst is unicast global. Check if it is 6to4 and we have a source
305        #    6to4 address in those available
306        #  - dst is link local (unicast or multicast) and multiple output
307        #    interfaces are available. Take main one (conf.iface)
308        #  - if none of the previous or ambiguity persists, be lazy and keep
309        #    first one
310
311        if len(res) > 1:
312            tmp = []  # type: List[Tuple[int, int, Tuple[str, str, str]]]
313            if in6_isgladdr(dst) and in6_isaddr6to4(dst):
314                # TODO : see if taking the longest match between dst and
315                #        every source addresses would provide better results
316                tmp = [x for x in res if in6_isaddr6to4(x[2][1])]
317            elif in6_ismaddr(dst) or in6_islladdr(dst):
318                # TODO : I'm sure we are not covering all addresses. Check that
319                tmp = [x for x in res if x[2][0] == conf.iface]
320
321            if tmp:
322                res = tmp
323
324        # Fill the cache (including dev-specific request)
325        k = dst
326        if dev is not None:
327            k = dst + "%%" + dev
328        self.cache[k] = res[0][2]
329
330        return res[0][2]
331
332
333conf.route6 = Route6()
334