• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5
6"""
7Routing and handling of network interfaces.
8"""
9
10
11from scapy.compat import plain_str
12from scapy.config import conf
13from scapy.error import Scapy_Exception, warning
14from scapy.interfaces import resolve_iface
15from scapy.utils import atol, ltoa, itom, pretty_list
16
17from typing import (
18    Any,
19    Dict,
20    List,
21    Optional,
22    Tuple,
23    Union,
24)
25
26
27##############################
28#  Routing/Interfaces stuff  #
29##############################
30
31class Route:
32    def __init__(self):
33        # type: () -> None
34        self.routes = []  # type: List[Tuple[int, int, str, str, str, int]]
35        self.invalidate_cache()
36        if conf.route_autoload:
37            self.resync()
38
39    def invalidate_cache(self):
40        # type: () -> None
41        self.cache = {}  # type: Dict[Tuple[str, Optional[str]], Tuple[str, str, str]]
42
43    def resync(self):
44        # type: () -> None
45        from scapy.arch import read_routes
46        self.invalidate_cache()
47        self.routes = read_routes()
48
49    def __repr__(self):
50        # type: () -> str
51        rtlst = []  # type: List[Tuple[Union[str, List[str]], ...]]
52        for net, msk, gw, iface, addr, metric in self.routes:
53            if_repr = resolve_iface(iface).description
54            rtlst.append((ltoa(net),
55                          ltoa(msk),
56                          gw,
57                          if_repr,
58                          addr,
59                          str(metric)))
60
61        return pretty_list(rtlst,
62                           [("Network", "Netmask", "Gateway", "Iface", "Output IP", "Metric")])  # noqa: E501
63
64    def make_route(self,
65                   host=None,  # type: Optional[str]
66                   net=None,  # type: Optional[str]
67                   gw=None,  # type: Optional[str]
68                   dev=None,  # type: Optional[str]
69                   metric=1,  # type: int
70                   ):
71        # type: (...) -> Tuple[int, int, str, str, str, int]
72        if host is not None:
73            thenet, msk = host, 32
74        elif net is not None:
75            thenet, msk_b = net.split("/")
76            msk = int(msk_b)
77        else:
78            raise Scapy_Exception("make_route: Incorrect parameters. You should specify a host or a net")  # noqa: E501
79        if gw is None:
80            gw = "0.0.0.0"
81        if dev is None:
82            if gw:
83                nhop = gw
84            else:
85                nhop = thenet
86            dev, ifaddr, _ = self.route(nhop)
87        else:
88            ifaddr = "0.0.0.0"  # acts as a 'via' in `ip addr add`
89        return (atol(thenet), itom(msk), gw, dev, ifaddr, metric)
90
91    def add(self, *args, **kargs):
92        # type: (*Any, **Any) -> None
93        """Add a route to Scapy's IPv4 routing table.
94        add(host|net, gw|dev)
95
96        :param host: single IP to consider (/32)
97        :param net: range to consider
98        :param gw: gateway
99        :param dev: force the interface to use
100        :param metric: route metric
101
102        Examples:
103
104        - `ip route add 192.168.1.0/24 via 192.168.0.254`::
105            >>> conf.route.add(net="192.168.1.0/24", gw="192.168.0.254")
106
107        - `ip route add 192.168.1.0/24 dev eth0`::
108            >>> conf.route.add(net="192.168.1.0/24", dev="eth0")
109
110        - `ip route add 192.168.1.0/24 via 192.168.0.254 metric 1`::
111            >>> conf.route.add(net="192.168.1.0/24", gw="192.168.0.254", metric=1)
112        """
113        self.invalidate_cache()
114        self.routes.append(self.make_route(*args, **kargs))
115
116    def delt(self, *args, **kargs):
117        # type: (*Any, **Any) -> None
118        """Remove a route from Scapy's IPv4 routing table.
119        delt(host|net, gw|dev)
120
121        Same syntax as add()
122        """
123        self.invalidate_cache()
124        route = self.make_route(*args, **kargs)
125        try:
126            i = self.routes.index(route)
127            del self.routes[i]
128        except ValueError:
129            raise ValueError("No matching route found!")
130
131    def ifchange(self, iff, addr):
132        # type: (str, str) -> None
133        self.invalidate_cache()
134        the_addr, the_msk_b = (addr.split("/") + ["32"])[:2]
135        the_msk = itom(int(the_msk_b))
136        the_rawaddr = atol(the_addr)
137        the_net = the_rawaddr & the_msk
138
139        for i, route in enumerate(self.routes):
140            net, msk, gw, iface, addr, metric = route
141            if iff != iface:
142                continue
143            if gw == '0.0.0.0':
144                self.routes[i] = (the_net, the_msk, gw, iface, the_addr, metric)  # noqa: E501
145            else:
146                self.routes[i] = (net, msk, gw, iface, the_addr, metric)
147        conf.netcache.flush()
148
149    def ifdel(self, iff):
150        # type: (str) -> None
151        self.invalidate_cache()
152        new_routes = []
153        for rt in self.routes:
154            if iff == rt[3]:
155                continue
156            new_routes.append(rt)
157        self.routes = new_routes
158
159    def ifadd(self, iff, addr):
160        # type: (str, str) -> None
161        self.invalidate_cache()
162        the_addr, the_msk_b = (addr.split("/") + ["32"])[:2]
163        the_msk = itom(int(the_msk_b))
164        the_rawaddr = atol(the_addr)
165        the_net = the_rawaddr & the_msk
166        self.routes.append((the_net, the_msk, '0.0.0.0', iff, the_addr, 1))
167
168    def route(self, dst=None, dev=None, verbose=conf.verb, _internal=False):
169        # type: (Optional[str], Optional[str], int, bool) -> Tuple[str, str, str]
170        """Returns the IPv4 routes to a host.
171
172        :param dst: the IPv4 of the destination host
173        :param dev: (optional) filtering is performed to limit search to route
174                    associated to that interface.
175
176        :returns: tuple (iface, output_ip, gateway_ip) where
177            - ``iface``: the interface used to connect to the host
178            - ``output_ip``: the outgoing IP that will be used
179            - ``gateway_ip``: the gateway IP that will be used
180        """
181        dst = dst or "0.0.0.0"  # Enable route(None) to return default route
182        if isinstance(dst, bytes):
183            try:
184                dst = plain_str(dst)
185            except UnicodeDecodeError:
186                raise TypeError("Unknown IP address input (bytes)")
187        if (dst, dev) in self.cache:
188            return self.cache[(dst, dev)]
189        # Transform "192.168.*.1-5" to one IP of the set
190        _dst = dst.split("/")[0].replace("*", "0")
191        while True:
192            idx = _dst.find("-")
193            if idx < 0:
194                break
195            m = (_dst[idx:] + ".").find(".")
196            _dst = _dst[:idx] + _dst[idx + m:]
197
198        atol_dst = atol(_dst)
199        paths = []
200        for d, m, gw, i, a, me in self.routes:
201            if not a:  # some interfaces may not currently be connected
202                continue
203            if dev is not None and i != dev:
204                continue
205            aa = atol(a)
206            if aa == atol_dst:
207                paths.append(
208                    (0xffffffff, 1, (conf.loopback_name, a, "0.0.0.0"))  # noqa: E501
209                )
210            if (atol_dst & m) == (d & m):
211                paths.append((m, me, (i, a, gw)))
212
213        if not paths:
214            if verbose:
215                warning("No route found for IPv4 destination %s "
216                        "(no default route?)", dst)
217            return (dev or conf.loopback_name, "0.0.0.0", "0.0.0.0")
218        # Choose the more specific route
219        # Sort by greatest netmask and use metrics as a tie-breaker
220        paths.sort(key=lambda x: (-x[0], x[1]))
221        # Return interface
222        ret = paths[0][2]
223        # Check if source is 0.0.0.0. This is a 'via' route with no src.
224        if ret[1] == "0.0.0.0" and not _internal:
225            # Then get the source from route(gw)
226            ret = (ret[0], self.route(ret[2], _internal=True)[1], ret[2])
227        self.cache[(dst, dev)] = ret
228        return ret
229
230    def get_if_bcast(self, iff):
231        # type: (str) -> List[str]
232        bcast_list = []
233        for net, msk, gw, iface, addr, metric in self.routes:
234            if net == 0:
235                continue    # Ignore default route "0.0.0.0"
236            elif msk == 0xffffffff:
237                continue    # Ignore host-specific routes
238            if iff != iface:
239                continue
240            bcast = net | (~msk & 0xffffffff)
241            bcast_list.append(ltoa(bcast))
242        if not bcast_list:
243            warning("No broadcast address found for iface %s\n", iff)
244        return bcast_list
245
246
247conf.route = Route()
248
249# Update conf.iface
250conf.ifaces.load_confiface()
251