# SPDX-License-Identifier: GPL-2.0-only
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) Philippe Biondi <phil@secdev.org>

"""
Routing and handling of network interfaces.
"""


from scapy.compat import plain_str
from scapy.config import conf
from scapy.error import Scapy_Exception, warning
from scapy.interfaces import resolve_iface
from scapy.utils import atol, ltoa, itom, pretty_list

from typing import (
    Any,
    Dict,
    List,
    Optional,
    Tuple,
    Union,
)


##############################
#  Routing/Interfaces stuff  #
##############################

class Route:
    """In-memory IPv4 routing table with a lookup cache.

    ``self.routes`` is a list of 6-tuples
    ``(network, netmask, gateway, iface, output_ip, metric)`` where the
    network and netmask are integers and the other address fields are
    strings. ``self.cache`` memoizes the results of :meth:`route`, keyed by
    ``(dst, dev)``; every method that modifies ``self.routes`` resets it.
    """

    def __init__(self):
        # type: () -> None
        self.routes = []  # type: List[Tuple[int, int, str, str, str, int]]
        self.invalidate_cache()
        if conf.route_autoload:
            self.resync()

    def invalidate_cache(self):
        # type: () -> None
        """Drop every memoized :meth:`route` result."""
        self.cache = {}  # type: Dict[Tuple[str, Optional[str]], Tuple[str, str, str]]  # noqa: E501

    def resync(self):
        # type: () -> None
        """Replace the table with the result of ``scapy.arch.read_routes()``.
        """
        # Imported here rather than at module level, as in the original code.
        from scapy.arch import read_routes
        self.invalidate_cache()
        self.routes = read_routes()

    def __repr__(self):
        # type: () -> str
        """Render the table as text, one row per route."""
        rtlst = []  # type: List[Tuple[Union[str, List[str]], ...]]
        for net, msk, gw, iface, addr, metric in self.routes:
            if_repr = resolve_iface(iface).description
            rtlst.append((ltoa(net),
                          ltoa(msk),
                          gw,
                          if_repr,
                          addr,
                          str(metric)))

        return pretty_list(
            rtlst,
            [("Network", "Netmask", "Gateway", "Iface", "Output IP", "Metric")]
        )

    def make_route(self,
                   host=None,  # type: Optional[str]
                   net=None,  # type: Optional[str]
                   gw=None,  # type: Optional[str]
                   dev=None,  # type: Optional[str]
                   metric=1,  # type: int
                   ):
        # type: (...) -> Tuple[int, int, str, str, str, int]
        """Build a route tuple in the format stored in ``self.routes``.

        :param host: single IP to consider (/32); takes precedence over
            ``net`` when both are given
        :param net: range to consider, as ``"address/prefixlen"``
        :param gw: gateway; defaults to ``"0.0.0.0"``
        :param dev: interface to use; when omitted it is looked up with
            :meth:`route`, which also provides the output IP
        :param metric: route metric
        :returns: ``(network, netmask, gateway, iface, output_ip, metric)``
        :raises Scapy_Exception: if neither ``host`` nor ``net`` is given
        """
        if host is not None:
            thenet, msk = host, 32
        elif net is not None:
            thenet, msk_b = net.split("/")
            msk = int(msk_b)
        else:
            raise Scapy_Exception("make_route: Incorrect parameters. You should specify a host or a net")  # noqa: E501
        if gw is None:
            gw = "0.0.0.0"
        if dev is None:
            # NOTE(review): gw has already been defaulted to "0.0.0.0"
            # above, so it is only falsy when the caller explicitly passes
            # an empty string. With gw=None the lookup below is therefore
            # route("0.0.0.0"), not route(thenet) - confirm this is intended.
            if gw:
                nhop = gw
            else:
                nhop = thenet
            dev, ifaddr, _ = self.route(nhop)
        else:
            ifaddr = "0.0.0.0"  # acts as a 'via' in `ip addr add`
        return (atol(thenet), itom(msk), gw, dev, ifaddr, metric)

    def add(self, *args, **kargs):
        # type: (*Any, **Any) -> None
        """Add a route to Scapy's IPv4 routing table.
        add(host|net, gw|dev)

        :param host: single IP to consider (/32)
        :param net: range to consider
        :param gw: gateway
        :param dev: force the interface to use
        :param metric: route metric

        Examples:

        - `ip route add 192.168.1.0/24 via 192.168.0.254`::
            >>> conf.route.add(net="192.168.1.0/24", gw="192.168.0.254")

        - `ip route add 192.168.1.0/24 dev eth0`::
            >>> conf.route.add(net="192.168.1.0/24", dev="eth0")

        - `ip route add 192.168.1.0/24 via 192.168.0.254 metric 1`::
            >>> conf.route.add(net="192.168.1.0/24", gw="192.168.0.254", metric=1)
        """
        self.invalidate_cache()
        new_route = self.make_route(*args, **kargs)
        self.routes.append(new_route)
        # make_route() may have called route(), which refills the cache using
        # the table as it was *before* the append. Flush again so that no
        # entry computed against the old table survives.
        self.invalidate_cache()

    def delt(self, *args, **kargs):
        # type: (*Any, **Any) -> None
        """Remove a route from Scapy's IPv4 routing table.
        delt(host|net, gw|dev)

        Same syntax as add()

        :raises ValueError: if no stored route equals the one described
        """
        self.invalidate_cache()
        route = self.make_route(*args, **kargs)
        try:
            self.routes.remove(route)
        except ValueError:
            raise ValueError("No matching route found!") from None
        # make_route() may have refilled the cache while the deleted route
        # was still in the table; drop those now-stale entries.
        self.invalidate_cache()

    def ifchange(self, iff, addr):
        # type: (str, str) -> None
        """Set a new address on every route that uses interface ``iff``.

        :param iff: the interface whose routes are rewritten
        :param addr: the new address, as ``"a.b.c.d"`` or
            ``"a.b.c.d/prefixlen"`` (prefix length defaults to 32)

        Routes whose gateway is ``'0.0.0.0'`` also get their network and
        netmask replaced by the ones derived from ``addr``; other routes
        only get their output IP replaced. ``conf.netcache`` is flushed
        afterwards.
        """
        self.invalidate_cache()
        the_addr, the_msk_b = (addr.split("/") + ["32"])[:2]
        the_msk = itom(int(the_msk_b))
        the_rawaddr = atol(the_addr)
        the_net = the_rawaddr & the_msk

        for i, route in enumerate(self.routes):
            # The previous output IP is discarded; a separate name avoids
            # shadowing the ``addr`` parameter.
            net, msk, gw, iface, _old_addr, metric = route
            if iff != iface:
                continue
            if gw == '0.0.0.0':
                self.routes[i] = (the_net, the_msk, gw, iface, the_addr, metric)  # noqa: E501
            else:
                self.routes[i] = (net, msk, gw, iface, the_addr, metric)
        conf.netcache.flush()

    def ifdel(self, iff):
        # type: (str) -> None
        """Remove every route that uses interface ``iff``."""
        self.invalidate_cache()
        self.routes = [rt for rt in self.routes if rt[3] != iff]

    def ifadd(self, iff, addr):
        # type: (str, str) -> None
        """Append a gateway-less route for ``addr`` on interface ``iff``.

        :param iff: the interface to attach the route to
        :param addr: the address, as ``"a.b.c.d"`` or
            ``"a.b.c.d/prefixlen"`` (prefix length defaults to 32)

        The route is added with gateway ``'0.0.0.0'`` and metric 1.
        """
        self.invalidate_cache()
        the_addr, the_msk_b = (addr.split("/") + ["32"])[:2]
        the_msk = itom(int(the_msk_b))
        the_rawaddr = atol(the_addr)
        the_net = the_rawaddr & the_msk
        self.routes.append((the_net, the_msk, '0.0.0.0', iff, the_addr, 1))

    def route(self, dst=None, dev=None, verbose=conf.verb, _internal=False):
        # type: (Optional[str], Optional[str], int, bool) -> Tuple[str, str, str]  # noqa: E501
        """Returns the IPv4 routes to a host.

        :param dst: the IPv4 of the destination host
        :param dev: (optional) filtering is performed to limit search to route
                    associated to that interface.
        :param verbose: when true, warn if no route matches. The default is
                        the value ``conf.verb`` had when this method was
                        defined, since default arguments are evaluated once.
        :param _internal: set on the nested lookup used to find the source
                          address of a 'via' route; it disables a second
                          level of nesting.

        :returns: tuple (iface, output_ip, gateway_ip) where
            - ``iface``: the interface used to connect to the host
            - ``output_ip``: the outgoing IP that will be used
            - ``gateway_ip``: the gateway IP that will be used
        :raises TypeError: if ``dst`` is bytes that cannot be decoded
        """
        dst = dst or "0.0.0.0"  # Enable route(None) to return default route
        if isinstance(dst, bytes):
            try:
                dst = plain_str(dst)
            except UnicodeDecodeError:
                raise TypeError("Unknown IP address input (bytes)")
        if (dst, dev) in self.cache:
            return self.cache[(dst, dev)]
        # Transform "192.168.*.1-5" to one IP of the set
        _dst = dst.split("/")[0].replace("*", "0")
        while True:
            idx = _dst.find("-")
            if idx < 0:
                break
            # Cut from the dash up to the next dot (or the end of string),
            # keeping only the lower bound of the range.
            m = (_dst[idx:] + ".").find(".")
            _dst = _dst[:idx] + _dst[idx + m:]

        atol_dst = atol(_dst)
        paths = []
        for d, m, gw, i, a, me in self.routes:
            if not a:  # some interfaces may not currently be connected
                continue
            if dev is not None and i != dev:
                continue
            aa = atol(a)
            if aa == atol_dst:
                # The destination is one of our own addresses: offer a
                # loopback path with the most specific mask.
                paths.append(
                    (0xffffffff, 1, (conf.loopback_name, a, "0.0.0.0"))
                )
            if (atol_dst & m) == (d & m):
                paths.append((m, me, (i, a, gw)))

        if not paths:
            if verbose:
                warning("No route found for IPv4 destination %s "
                        "(no default route?)", dst)
            return (dev or conf.loopback_name, "0.0.0.0", "0.0.0.0")
        # Choose the more specific route
        # Sort by greatest netmask and use metrics as a tie-breaker
        paths.sort(key=lambda x: (-x[0], x[1]))
        # Return interface
        ret = paths[0][2]
        # Check if source is 0.0.0.0. This is a 'via' route with no src.
        if ret[1] == "0.0.0.0":
            if _internal:
                # Nested lookups do not resolve the source any further.
                # Do not cache this unresolved answer: a later public
                # lookup of the same destination shares the cache key and
                # must go through the resolution step below.
                return ret
            # Then get the source from route(gw)
            ret = (ret[0], self.route(ret[2], _internal=True)[1], ret[2])
        self.cache[(dst, dev)] = ret
        return ret

    def get_if_bcast(self, iff):
        # type: (str) -> List[str]
        """Return the broadcast addresses of the routes on interface ``iff``.

        Routes with a network of 0 and routes with an all-ones netmask are
        skipped. A warning is emitted when nothing is found.

        :param iff: the interface to consider
        :returns: broadcast addresses as dotted strings (possibly empty)
        """
        bcast_list = []
        for net, msk, gw, iface, addr, metric in self.routes:
            if net == 0:
                continue    # Ignore default route "0.0.0.0"
            elif msk == 0xffffffff:
                continue    # Ignore host-specific routes
            if iff != iface:
                continue
            # Set every host bit of the network to 1.
            bcast = net | (~msk & 0xffffffff)
            bcast_list.append(ltoa(bcast))
        if not bcast_list:
            warning("No broadcast address found for iface %s\n", iff)
        return bcast_list


conf.route = Route()

# Update conf.iface
conf.ifaces.load_confiface()