# SPDX-License-Identifier: GPL-2.0-only
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) Gabriel Potter <gabriel[]potter[]fr>

"""
Interfaces management
"""
9
import itertools
import uuid
from collections import defaultdict

from scapy.config import conf
from scapy.consts import WINDOWS, LINUX
from scapy.utils import pretty_list
from scapy.utils6 import in6_isvalid

# Typing imports
import scapy
from scapy.compat import UserDict
from typing import (
    cast,
    Any,
    DefaultDict,
    Dict,
    List,
    NoReturn,
    Optional,
    Tuple,
    Type,
    Union,
)
34
35
class InterfaceProvider(object):
    """Base class for objects that supply network interfaces.

    A provider loads interfaces (``load`` / ``reload``), chooses the socket
    classes they use and formats them for ``NetworkInterfaceDict.show()``.
    """
    # Label printed in the "Source" column of show()
    name = "Unknown"
    # Column titles of the table built by show() for this provider
    headers: Tuple[str, ...] = ("Index", "Name", "MAC", "IPv4", "IPv6")
    # Passed as ``sortBy`` to pretty_list() by show()
    header_sort = 1
    # When True, interfaces of this provider replace already-loaded ones
    # with the same name, and the Linux loopback special case is skipped.
    libpcap = False

    def load(self):
        # type: () -> Dict[str, NetworkInterface]
        """Returns a dictionary of the loaded interfaces, by their
        name.

        :raises NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError

    def reload(self):
        # type: () -> Dict[str, NetworkInterface]
        """Same as load() but for reloads. By default calls load()"""
        return self.load()

    def _l2socket(self, dev):
        # type: (NetworkInterface) -> Type[scapy.supersocket.SuperSocket]
        """Return L2 socket used by interfaces of this provider"""
        return conf.L2socket

    def _l2listen(self, dev):
        # type: (NetworkInterface) -> Type[scapy.supersocket.SuperSocket]
        """Return L2listen socket used by interfaces of this provider"""
        return conf.L2listen

    def _l3socket(self, dev, ipv6):
        # type: (NetworkInterface, bool) -> Type[scapy.supersocket.SuperSocket]
        """Return L3 socket used by interfaces of this provider

        :param dev: the interface the socket is requested for
        :param ipv6: select the IPv6 variant in the loopback special case
        """
        if LINUX and not self.libpcap and dev.name == conf.loopback_name:
            # handle the loopback case. see troubleshooting.rst
            if ipv6:
                from scapy.supersocket import L3RawSocket6
                return cast(Type['scapy.supersocket.SuperSocket'], L3RawSocket6)
            else:
                from scapy.supersocket import L3RawSocket
                return L3RawSocket
        return conf.L3socket

    def _is_valid(self, dev):
        # type: (NetworkInterface) -> bool
        """Returns whether an interface is valid or not.

        An interface is valid when it has at least one IPv4 or IPv6
        address and a non-empty MAC address.
        """
        return bool((dev.ips[4] or dev.ips[6]) and dev.mac)

    def _format(self,
                dev,  # type: NetworkInterface
                **kwargs  # type: Any
                ):
        # type: (...) -> Tuple[Union[str, List[str]], ...]
        """Returns the elements used by show()

        If a tuple is returned, this consist of the strings that will be
        inlined along with the interface.
        If a list of tuples is returned, they will be appended one above the
        other and should all be part of a single interface.

        :param resolve_mac: (keyword, default True) pass the MAC through
            ``conf.manufdb`` when that database is available.
        """
        mac = dev.mac
        resolve_mac = kwargs.get("resolve_mac", True)
        if resolve_mac and conf.manufdb and mac:
            mac = conf.manufdb._resolve_MAC(mac)
        index = str(dev.index)
        return (index, dev.description, mac or "", dev.ips[4], dev.ips[6])

    def __repr__(self) -> str:
        """
        repr
        """
        return "<InterfaceProvider: %s>" % self.name
105
106
class NetworkInterface(object):
    """A single network interface, as described by an InterfaceProvider.

    Instances compare equal to any of their ``name``, ``network_name`` or
    ``description`` strings and concatenate with strings through their
    ``network_name``.
    """

    def __init__(self,
                 provider,  # type: InterfaceProvider
                 data=None,  # type: Optional[Dict[str, Any]]
                 ):
        # type: (...) -> None
        """
        :param provider: the provider this interface belongs to
        :param data: optional dictionary handed to update()
        """
        self.provider = provider
        self.name = ""
        self.description = ""
        self.network_name = ""
        self.index = -1
        self.ip = None  # type: Optional[str]
        self.ips = defaultdict(list)  # type: DefaultDict[int, List[str]]
        self.type = -1
        self.mac = None  # type: Optional[str]
        # Initialised here so that __repr__ works on an interface that
        # was built without data (update() is otherwise the only setter).
        self.flags = 0
        self.dummy = False
        if data is not None:
            self.update(data)

    def update(self, data):
        # type: (Dict[str, Any]) -> None
        """Update info about a network interface according
        to a given dictionary. Such data is provided by providers.

        Scalar fields missing from ``data`` are reset to their defaults.
        Addresses listed under ``'ips'`` are appended to the ones already
        stored; they are not cleared first.
        """
        self.name = data.get('name', "")
        self.description = data.get('description', "")
        self.network_name = data.get('network_name', "")
        self.index = data.get('index', 0)
        self.ip = data.get('ip', "")
        self.type = data.get('type', -1)
        self.mac = data.get('mac', "")
        self.flags = data.get('flags', 0)
        self.dummy = data.get('dummy', False)

        # Sort each address into the IPv6 or IPv4 bucket
        for ip in data.get('ips', []):
            if in6_isvalid(ip):
                self.ips[6].append(ip)
            else:
                self.ips[4].append(ip)

        # An interface often has multiple IPv6 so we don't store
        # a "main" one, unlike IPv4.
        if self.ips[4] and not self.ip:
            self.ip = self.ips[4][0]

    def __eq__(self, other):
        # type: (Any) -> bool
        """Compare against a string (any of the interface's names) or
        against another NetworkInterface (attribute-wise)."""
        if isinstance(other, str):
            return other in [self.name, self.network_name, self.description]
        if isinstance(other, NetworkInterface):
            return self.__dict__ == other.__dict__
        return False

    def __ne__(self, other):
        # type: (Any) -> bool
        return not self.__eq__(other)

    def __hash__(self):
        # type: () -> int
        return hash(self.network_name)

    def is_valid(self):
        # type: () -> bool
        """Return False for dummy interfaces, else ask the provider."""
        if self.dummy:
            return False
        return self.provider._is_valid(self)

    def l2socket(self):
        # type: () -> Type[scapy.supersocket.SuperSocket]
        """Return the L2 socket class chosen by the provider."""
        return self.provider._l2socket(self)

    def l2listen(self):
        # type: () -> Type[scapy.supersocket.SuperSocket]
        """Return the L2 listening socket class chosen by the provider."""
        return self.provider._l2listen(self)

    def l3socket(self, ipv6=False):
        # type: (bool) -> Type[scapy.supersocket.SuperSocket]
        """Return the L3 socket class chosen by the provider."""
        return self.provider._l3socket(self, ipv6)

    def __repr__(self):
        # type: () -> str
        # Show "dummy" for fake interfaces, otherwise the flags (blank if 0)
        status = "dummy" if self.dummy else (self.flags or "")
        return "<%s %s [%s]>" % (self.__class__.__name__,
                                 self.description,
                                 status)

    def __str__(self):
        # type: () -> str
        return self.network_name

    def __add__(self, other):
        # type: (str) -> str
        return self.network_name + other

    def __radd__(self, other):
        # type: (str) -> str
        return other + self.network_name
203
204
# Anything accepted where an interface is expected: the object or one of
# its names.
_GlobInterfaceType = Union[NetworkInterface, str]
206
207
class NetworkInterfaceDict(UserDict[str, NetworkInterface]):
    """Store information about network interfaces and convert between names"""

    def __init__(self):
        # type: () -> None
        self.providers = {}  # type: Dict[Type[InterfaceProvider], InterfaceProvider]  # noqa: E501
        super(NetworkInterfaceDict, self).__init__()

    def _load(self,
              dat,  # type: Dict[str, NetworkInterface]
              prov,  # type: InterfaceProvider
              ):
        # type: (...) -> None
        """Merge the interfaces of a provider into the dictionary.

        An interface that is already known is kept, unless the new one
        comes from a libpcap provider, which takes priority.
        """
        for ifname, iface in dat.items():
            if ifname not in self.data or prov.libpcap:
                self.data[ifname] = iface

    def register_provider(self, provider):
        # type: (Type[InterfaceProvider]) -> None
        """Instantiate and register a provider class.

        If interfaces were already loaded, the new provider is loaded
        right away.
        """
        prov = provider()
        self.providers[provider] = prov
        if self.data:
            # late registration
            self._load(prov.reload(), prov)

    def load_confiface(self):
        # type: () -> None
        """
        Reload conf.iface

        :raises ValueError: if conf.route is not populated
        """
        # Can only be called after conf.route is populated
        if not conf.route:
            raise ValueError("Error: conf.route isn't populated !")
        conf.iface = get_working_if()  # type: ignore

    def _reload_provs(self):
        # type: () -> None
        """Drop every interface and reload them from all providers."""
        self.clear()
        for prov in self.providers.values():
            self._load(prov.reload(), prov)

    def reload(self):
        # type: () -> None
        """Reload the interfaces, then conf.iface if routes are loaded."""
        self._reload_provs()
        if not conf.route:
            # routes are not loaded yet.
            return
        self.load_confiface()

    def dev_from_name(self, name):
        # type: (str) -> NetworkInterface
        """Return the first interface whose name or description matches
        the given device name.

        :raises ValueError: if no interface matches
        """
        try:
            return next(iface for iface in self.values()
                        if (iface.name == name or iface.description == name))
        except (StopIteration, RuntimeError):
            raise ValueError("Unknown network interface %r" % name)

    def dev_from_networkname(self, network_name):
        # type: (str) -> NetworkInterface
        """Return interface for a given network device name.

        :raises ValueError: if no interface matches
        """
        try:
            return next(iface for iface in self.values()
                        if iface.network_name == network_name)
        except (StopIteration, RuntimeError):
            raise ValueError(
                "Unknown network interface %r" %
                network_name)

    def dev_from_index(self, if_index):
        # type: (int) -> NetworkInterface
        """Return interface from interface index.

        Index 1 falls back to the loopback interface when no interface
        carries that index.

        :raises ValueError: if no interface matches
        """
        try:
            if_index = int(if_index)  # Backward compatibility
            return next(iface for iface in self.values()
                        if iface.index == if_index)
        except (StopIteration, RuntimeError):
            if str(if_index) == "1":
                # Test if the loopback interface is set up
                return self.dev_from_networkname(conf.loopback_name)
            raise ValueError("Unknown network interface index %r" % if_index)

    def _add_fake_iface(self, ifname, mac="00:00:00:00:00:00"):
        # type: (str, str) -> None
        """Internal function used for a testing purpose"""
        data = {
            'name': ifname,
            'description': ifname,
            'network_name': ifname,
            'index': -1000,
            'dummy': True,
            'mac': mac,
            'flags': 0,
            'ips': ["127.0.0.1", "::"],
            # Windows only
            'guid': "{%s}" % uuid.uuid1(),
            'ipv4_metric': 0,
            'ipv6_metric': 0,
            'nameservers': [],
        }
        if WINDOWS:
            from scapy.arch.windows import NetworkInterface_Win, \
                WindowsInterfacesProvider

            class FakeProv(WindowsInterfacesProvider):
                name = "fake"

            self.data[ifname] = NetworkInterface_Win(
                FakeProv(),
                data
            )
        else:
            self.data[ifname] = NetworkInterface(InterfaceProvider(), data)

    def show(self, print_result=True, hidden=False, **kwargs):
        # type: (bool, bool, **Any) -> Optional[str]
        """
        Print list of available network interfaces in human readable form

        :param print_result: print the results if True, else return it
        :param hidden: if True, also displays invalid interfaces

        Extra keyword arguments are forwarded to the provider's _format().
        """
        # Rows are grouped per (headers, sort column) so that providers
        # with different columns each get their own table.
        res = defaultdict(list)
        for iface_name in sorted(self.data):
            dev = self.data[iface_name]
            if not hidden and not dev.is_valid():
                continue
            prov = dev.provider
            res[(prov.headers, prov.header_sort)].append(
                (prov.name,) + prov._format(dev, **kwargs)
            )
        output = "\n".join(
            pretty_list(rows, [("Source",) + hdrs], sortBy=sort_by)
            for (hdrs, sort_by), rows in res.items()
        )
        if print_result:
            print(output)
            return None
        else:
            return output

    def __repr__(self):
        # type: () -> str
        return self.show(print_result=False)  # type: ignore
363
364
# Single shared registry of interfaces, reachable under three names.
conf.ifaces = IFACES = ifaces = NetworkInterfaceDict()
366
367
def get_if_list():
    # type: () -> List[str]
    """Return a list of interface names (the keys of conf.ifaces)"""
    return list(conf.ifaces)
372
373
def get_working_if():
    # type: () -> Optional[NetworkInterface]
    """Return an interface that works.

    Falls back to the loopback interface, and returns None if even that
    one is unknown.
    """
    # return the interface associated with the route with smallest
    # mask (route by default if it exists)
    routes = sorted(conf.route.routes, key=lambda x: x[1])
    ifaces = (x[3] for x in routes)
    # First check the routing ifaces from best to worse,
    # then check all the available ifaces as backup.
    # The backup candidates are NetworkInterface objects rather than names;
    # the lookup still works because NetworkInterface compares equal to its
    # own network_name string.
    for ifname in itertools.chain(ifaces, conf.ifaces.values()):
        try:
            iface = conf.ifaces.dev_from_networkname(ifname)  # type: ignore
            if iface.is_valid():
                return iface
        except ValueError:
            pass
    # There is no hope left
    try:
        return conf.ifaces.dev_from_networkname(conf.loopback_name)
    except ValueError:
        return None
396
397
def get_working_ifaces():
    # type: () -> List[NetworkInterface]
    """Return all interfaces that work, i.e. whose is_valid() is true"""
    return [iface for iface in conf.ifaces.values() if iface.is_valid()]
402
403
def dev_from_networkname(network_name):
    # type: (str) -> NetworkInterface
    """Return the Scapy interface for a given network device name.

    :raises ValueError: if no interface has that network name
    """
    return conf.ifaces.dev_from_networkname(network_name)
408
409
def dev_from_index(if_index):
    # type: (int) -> NetworkInterface
    """Return interface for a given interface index.

    :raises ValueError: if no interface has that index
    """
    return conf.ifaces.dev_from_index(if_index)
414
415
def resolve_iface(dev, retry=True):
    # type: (_GlobInterfaceType, bool) -> NetworkInterface
    """
    Resolve an interface name into the interface

    :param dev: a NetworkInterface (returned unchanged) or a name, tried
        first as name/description and then as network name
    :param retry: if True, reload the interfaces once and try again
        before giving up
    :raises ValueError: if the interface cannot be found
    """
    if isinstance(dev, NetworkInterface):
        return dev
    try:
        return conf.ifaces.dev_from_name(dev)
    except ValueError:
        try:
            return conf.ifaces.dev_from_networkname(dev)
        except ValueError:
            pass
    if not retry:
        raise ValueError("Interface '%s' not found !" % dev)
    # Nothing found yet. Reload to detect if it was added recently
    conf.ifaces.reload()
    return resolve_iface(dev, retry=False)
435
436
def network_name(dev):
    # type: (_GlobInterfaceType) -> str
    """
    Resolves the device network name of a device or Scapy NetworkInterface

    :raises ValueError: if the interface cannot be resolved
    """
    return resolve_iface(dev).network_name
443
444
def show_interfaces(resolve_mac=True):
    # type: (bool) -> None
    """Print list of available network interfaces

    :param resolve_mac: forwarded to conf.ifaces.show() as a keyword
        argument, which hands it to the provider's formatting
    """
    # Must go by keyword: the first positional parameter of show() is
    # print_result, so a positional False would silence the output
    # instead of disabling MAC resolution.
    return conf.ifaces.show(resolve_mac=resolve_mac)  # type: ignore
449