# SPDX-License-Identifier: GPL-2.0-only
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) Philippe Biondi <phil@secdev.org>
# Copyright (C) 2005  Guillaume Valadon <guedou@hongo.wide.ad.jp>
#                     Arnaud Ebalard <arnaud.ebalard@eads.net>

"""
Routing and network interface handling for IPv6.
"""

#############################################################################
# Routing/Interfaces stuff                                                  #
#############################################################################

import socket
from scapy.config import conf
from scapy.interfaces import resolve_iface, NetworkInterface
from scapy.utils6 import in6_ptop, in6_cidr2mask, in6_and, \
    in6_islladdr, in6_ismlladdr, in6_isincluded, in6_isgladdr, \
    in6_isaddr6to4, in6_ismaddr, construct_source_candidate_set, \
    get_source_addr_from_candidate_set
from scapy.arch import read_routes6, in6_getifaddr
from scapy.pton_ntop import inet_pton, inet_ntop
from scapy.error import warning, log_loading
from scapy.utils import pretty_list

from typing import (
    Any,
    Dict,
    List,
    Optional,
    Set,
    Tuple,
    Union,
)


class Route6:
    """
    In-memory IPv6 routing table.

    Each entry of ``routes`` is a tuple
    ``(prefix, prefixlen, gateway, iface, source_candidates, metric)``.
    ``ipv6_ifaces`` holds the interfaces referenced by the table and
    ``cache`` memoizes the results of :meth:`route`.
    """

    def __init__(self):
        # type: () -> None
        self.routes = []  # type: List[Tuple[str, int, str, str, List[str], int]]  # noqa: E501
        self.ipv6_ifaces = set()  # type: Set[Union[str, NetworkInterface]]
        self.invalidate_cache()
        if conf.route6_autoload:
            self.resync()

    def invalidate_cache(self):
        # type: () -> None
        """Drop every memoized :meth:`route` result."""
        self.cache = {}  # type: Dict[str, Tuple[str, str, str]]

    def flush(self):
        # type: () -> None
        """Empty the routing table, the interface set and the cache."""
        self.invalidate_cache()
        self.routes.clear()
        self.ipv6_ifaces.clear()

    def resync(self):
        # type: () -> None
        """Reload the table from read_routes6() and rebuild the iface set."""
        # TODO : At the moment, resync will drop existing Teredo routes
        #        if any. Change that ...
        self.invalidate_cache()
        self.routes = read_routes6()
        self.ipv6_ifaces = {route[3] for route in self.routes}
        if not self.routes:
            log_loading.info("No IPv6 support in kernel")

    def __repr__(self):
        # type: () -> str
        rtlst = []  # type: List[Tuple[Union[str, List[str]], ...]]

        for net, msk, gw, iface, cset, metric in self.routes:
            if_repr = resolve_iface(iface).description
            rtlst.append(('%s/%i' % (net, msk),
                          gw,
                          if_repr,
                          cset,
                          str(metric)))

        return pretty_list(rtlst,
                           [('Destination', 'Next Hop', "Iface", "Src candidates", "Metric")],  # noqa: E501
                           sortBy=1)

    # Unlike Scapy's Route.make_route() function, we do not have 'host' and 'net'  # noqa: E501
    # parameters. We only have a 'dst' parameter that accepts 'prefix' and
    # 'prefix/prefixlen' values.
    def make_route(self,
                   dst,  # type: str
                   gw=None,  # type: Optional[str]
                   dev=None,  # type: Optional[str]
                   ):
        # type: (...) -> Tuple[str, int, str, str, List[str], int]
        """Internal function : create a route for 'dst' via 'gw'.

        :param dst: 'prefix' or 'prefix/prefixlen' (prefixlen defaults
            to 128)
        :param gw: next hop; defaults to '::'
        :param dev: output interface; when omitted, the interface and the
            source address are taken from the route towards 'gw'
        :returns: a (prefix, plen, gw, dev, source candidates, metric)
            tuple with a metric of 1
        """
        prefix, plen_b = (dst.split("/") + ["128"])[:2]
        plen = int(plen_b)

        if gw is None:
            gw = "::"
        if dev is None:
            dev, ifaddr_uniq, x = self.route(gw)
            ifaddr = [ifaddr_uniq]
        else:
            lifaddr = in6_getifaddr()
            devaddrs = (x for x in lifaddr if x[2] == dev)
            ifaddr = construct_source_candidate_set(prefix, plen, devaddrs)

        self.ipv6_ifaces.add(dev)

        return (prefix, plen, gw, dev, ifaddr, 1)

    def add(self, *args, **kargs):
        # type: (*Any, **Any) -> None
        """Ex:
        add(dst="2001:db8:cafe:f000::/56")
        add(dst="2001:db8:cafe:f000::/56", gw="2001:db8:cafe::1")
        add(dst="2001:db8:cafe:f000::/64", gw="2001:db8:cafe::1", dev="eth0")
        """
        self.invalidate_cache()
        self.routes.append(self.make_route(*args, **kargs))

    def remove_ipv6_iface(self, iface):
        # type: (str) -> None
        """
        Remove the network interface 'iface' from the list of interfaces
        supporting IPv6.

        The interface is only dropped when no entry of this routing table
        references it anymore, so callers must update ``self.routes``
        before calling this method.
        """
        # 'any' (not 'all'): one remaining route is enough to keep the
        # interface, and an empty table must release it.
        if not any(r[3] == iface for r in self.routes):
            self.ipv6_ifaces.discard(iface)

    def delt(self, dst, gw=None):
        # type: (str, Optional[str]) -> None
        """ Ex:
        delt(dst="::/0")
        delt(dst="2001:db8:cafe:f000::/56")
        delt(dst="2001:db8:cafe:f000::/56", gw="2001:db8:deca::1")

        Exactly one route must match 'dst' (and 'gw' when provided);
        otherwise a warning is emitted and the table is left untouched.
        """
        # Appending "/128" supplies the default prefix length when 'dst'
        # does not carry one; an explicit length comes first and wins.
        prefix, plen_b = (dst + "/128").split('/')[:2]
        prefix = in6_ptop(prefix)
        plen = int(plen_b)
        to_del = [x for x in self.routes
                  if in6_ptop(x[0]) == prefix and x[1] == plen]
        if gw:
            gw = in6_ptop(gw)
            # Narrow the destination matches; do not restart from the
            # whole table, or the 'dst' criterion would be lost.
            to_del = [x for x in to_del if in6_ptop(x[2]) == gw]
        if not to_del:
            warning("No matching route found")
        elif len(to_del) > 1:
            warning("Found more than one match. Aborting.")
        else:
            route = to_del[0]
            self.invalidate_cache()
            # Delete first so that remove_ipv6_iface() sees the new table.
            self.routes.remove(route)
            self.remove_ipv6_iface(route[3])

    def ifchange(self, iff, addr):
        # type: (str, str) -> None
        """
        Set 'addr' ('address' or 'address/prefixlen', prefixlen defaulting
        to 128) as the only source candidate of every route using 'iff'.

        Routes whose next hop is '::' also get their destination replaced
        by the network derived from 'addr'.
        """
        the_addr, the_plen_b = (addr.split("/") + ["128"])[:2]
        the_plen = int(the_plen_b)

        naddr = inet_pton(socket.AF_INET6, the_addr)
        nmask = in6_cidr2mask(the_plen)
        the_net = inet_ntop(socket.AF_INET6, in6_and(nmask, naddr))

        for i, route in enumerate(self.routes):
            net, plen, gw, iface, _, metric = route
            if iface != iff:
                continue

            self.ipv6_ifaces.add(iface)

            if gw == '::':
                self.routes[i] = (the_net, the_plen, gw, iface, [the_addr], metric)  # noqa: E501
            else:
                self.routes[i] = (net, plen, gw, iface, [the_addr], metric)
        self.invalidate_cache()
        conf.netcache.in6_neighbor.flush()  # type: ignore

    def ifdel(self, iff):
        # type: (str) -> None
        """ removes all route entries that uses 'iff' interface. """
        self.invalidate_cache()
        self.routes = [rt for rt in self.routes if rt[3] != iff]
        self.remove_ipv6_iface(iff)

    def ifadd(self, iff, addr):
        # type: (str, str) -> None
        """
        Add an interface 'iff' with provided address into routing table.

        Ex: ifadd('eth0', '2001:bd8:cafe:1::1/64') will add following entry
            into Scapy6 internal routing table:

            Destination           Next Hop  iface  Def src @           Metric
            2001:bd8:cafe:1::/64  ::        eth0   2001:bd8:cafe:1::1  1

            prefix length value can be omitted. In that case, a value of 128
            will be used.
        """
        addr, plen_b = (addr.split("/") + ["128"])[:2]
        addr = in6_ptop(addr)
        plen = int(plen_b)
        naddr = inet_pton(socket.AF_INET6, addr)
        nmask = in6_cidr2mask(plen)
        prefix = inet_ntop(socket.AF_INET6, in6_and(nmask, naddr))
        self.invalidate_cache()
        self.routes.append((prefix, plen, '::', iff, [addr], 1))
        self.ipv6_ifaces.add(iff)

    def route(self, dst="", dev=None, verbose=conf.verb):
        # type: (str, Optional[str], int) -> Tuple[str, str, str]
        """
        Provide best route to IPv6 destination address, based on Scapy
        internal routing table content.

        When a set of address is passed (e.g. ``2001:db8:cafe:*::1-5``) an
        address of the set is used. Be aware of that behavior when using
        wildcards in upper parts of addresses !

        If 'dst' parameter is a FQDN, name resolution is performed and result
        is used.

        if optional 'dev' parameter is provided a specific interface, filtering
        is performed to limit search to route associated to that interface.

        :returns: an (interface, source address, next hop) tuple. When no
            usable route exists, the source address is '::' (or '::1' for
            the '::1' destination) and the next hop is '::'.
        """
        dst = dst or "::/0"  # Enable route(None) to return default route
        # Transform "2001:db8:cafe:*::1-5:0/120" to one IPv6 address of the set
        dst = dst.split("/")[0]
        savedst = dst  # In case following inet_pton() fails
        dst = dst.replace("*", "0")
        idx = dst.find("-")
        while idx >= 0:
            # Cut "-<upper bound>" up to the next ':' (or the end of the
            # string), keeping the lower bound of the range.
            m = (dst[idx:] + ":").find(":")
            dst = dst[:idx] + dst[idx + m:]
            idx = dst.find("-")

        try:
            inet_pton(socket.AF_INET6, dst)
        except socket.error:
            dst = socket.getaddrinfo(savedst, None, socket.AF_INET6)[0][-1][0]
            # TODO : Check if name resolution went well

        # Deal with dev-specific request for cache search
        k = dst
        if dev is not None:
            k = dst + "%%" + dev
        if k in self.cache:
            return self.cache[k]

        paths = []  # type: List[Tuple[int, int, Tuple[str, List[str], str]]]

        # TODO : review all kinds of addresses (scope and *cast) to see
        #        if we are able to cope with everything possible. I'm convinced
        #        it's not the case.
        # -- arnaud
        for p, plen, gw, iface, cset, me in self.routes:
            if dev is not None and iface != dev:
                continue
            if in6_isincluded(dst, p, plen):
                paths.append((plen, me, (iface, cset, gw)))
            elif (in6_ismlladdr(dst) and in6_islladdr(p) and
                  cset and in6_islladdr(cset[0])):
                # 'cset' may be empty: check it before looking at cset[0]
                paths.append((plen, me, (iface, cset, gw)))

        if not paths:
            if dst == "::1":
                return (conf.loopback_name, "::1", "::")
            else:
                if verbose:
                    warning("No route found for IPv6 destination %s "
                            "(no default route?)", dst)
                return (dev or conf.loopback_name, "::", "::")

        # Sort with longest prefix first then use metrics as a tie-breaker
        paths.sort(key=lambda x: (-x[0], x[1]))

        best_plen = (paths[0][0], paths[0][1])
        paths = [x for x in paths if (x[0], x[1]) == best_plen]

        res = []  # type: List[Tuple[int, int, Tuple[str, str, str]]]
        for path in paths:  # we select best source address for every route
            tmp_c = path[2]
            srcaddr = get_source_addr_from_candidate_set(dst, tmp_c[1])
            if srcaddr is not None:
                res.append((path[0], path[1], (tmp_c[0], srcaddr, tmp_c[2])))

        if not res:
            warning("Found a route for IPv6 destination '%s', but no possible source address.", dst)  # noqa: E501
            return (conf.loopback_name, "::", "::")

        # Symptom  : 2 routes with same weight (our weight is plen)
        # Solution :
        #  - dst is unicast global. Check if it is 6to4 and we have a source
        #    6to4 address in those available
        #  - dst is link local (unicast or multicast) and multiple output
        #    interfaces are available. Take main one (conf.iface)
        #  - if none of the previous or ambiguity persists, be lazy and keep
        #    first one

        if len(res) > 1:
            tmp = []  # type: List[Tuple[int, int, Tuple[str, str, str]]]
            if in6_isgladdr(dst) and in6_isaddr6to4(dst):
                # TODO : see if taking the longest match between dst and
                #        every source addresses would provide better results
                tmp = [x for x in res if in6_isaddr6to4(x[2][1])]
            elif in6_ismaddr(dst) or in6_islladdr(dst):
                # TODO : I'm sure we are not covering all addresses. Check that
                tmp = [x for x in res if x[2][0] == conf.iface]

            if tmp:
                res = tmp

        # Fill the cache (including dev-specific request); 'k' is still the
        # key computed for the lookup above.
        self.cache[k] = res[0][2]

        return res[0][2]


conf.route6 = Route6()