• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5
6"""
7Generators and packet meta classes.
8"""
9
10################
11#  Generators  #
12################
13
14
15from functools import reduce
16import abc
17import operator
18import os
19import random
20import re
21import socket
22import struct
23import subprocess
24import types
25import warnings
26
27import scapy
28from scapy.error import Scapy_Exception
29from scapy.consts import WINDOWS
30
31from typing import (
32    Any,
33    Dict,
34    Generic,
35    Iterator,
36    List,
37    Optional,
38    Tuple,
39    Type,
40    TypeVar,
41    Union,
42    cast,
43    TYPE_CHECKING,
44)
45
46if TYPE_CHECKING:
47    try:
48        import pyx
49    except ImportError:
50        pass
51    from scapy.packet import Packet
52
# Type variable used to parametrize the generic generator classes below and
# to annotate the ``cls`` argument of the metaclasses' ``__new__``.
_T = TypeVar("_T")
54
55
class Gen(Generic[_T]):
    """Common base class of the generator-like objects of this module.

    As implemented here it behaves like an empty generator; subclasses
    override ``__iter__`` (and optionally ``__iterlen__``).
    """
    __slots__ = []  # type: List[str]

    def __iter__(self):
        # type: () -> Iterator[_T]
        """Return an iterator over the generated values (none here)."""
        return iter([])

    def __iterlen__(self):
        # type: () -> int
        """Return the number of generated values by walking the iterator."""
        total = 0
        for _ in self:
            total += 1
        return total
66
67
def _get_values(value):
    # type: (Any) -> Any
    """Turn a ``(start, stop[, step])`` tuple into a ``range`` object.

    Only tuples of two or three items that all expose ``__int__`` are
    converted; anything else is returned untouched.

    :param value: candidate value
    :returns: a ``range`` whose last element may be ``stop``, or ``value``
    """
    if not isinstance(value, tuple):
        return value
    if not 2 <= len(value) <= 3:
        return value
    for item in value:
        if not hasattr(item, "__int__"):
            return value
    first = int(value[0])
    # The tuple's stop is inclusive, whereas range() excludes its stop.
    past_last = int(value[1]) + 1
    if len(value) == 3:
        return range(first, past_last, int(value[2]))
    return range(first, past_last)
81
82
class SetGen(Gen[_T]):
    """Generator built from a single value or from a collection of values.

    Each value goes through ``_get_values`` so that ``(start, stop[, step])``
    tuples are stored as ranges. While iterating, ranges, generators and
    nested ``Gen`` objects are expanded in place.
    """

    def __init__(self, values, _iterpacket=1):
        # type: (Any, int) -> None
        """
        :param values: a value, or a list / ``BasePacketList`` of values
        :param _iterpacket: when false, ``BasePacket`` values are yielded
            as they are instead of being iterated over
        """
        self._iterpacket = _iterpacket
        if not isinstance(values, (list, BasePacketList)):
            # A lone value is handled as a one-element collection.
            values = [values]
        self.values = [_get_values(val) for val in values]

    def __iter__(self):
        # type: () -> Iterator[Any]
        """Yield every value, flattening one level of nested generators."""
        for item in self.values:
            if isinstance(item, (range, types.GeneratorType)):
                expand = True
            elif isinstance(item, Gen):
                # Packets are only expanded when _iterpacket allows it.
                expand = (
                    bool(self._iterpacket) or
                    not isinstance(item, BasePacket)
                )
            else:
                expand = False
            if expand:
                for sub_item in item:
                    yield sub_item
            else:
                yield item

    def __len__(self):
        # type: () -> int
        """Return the number of values the generator yields."""
        return self.__iterlen__()

    def __repr__(self):
        # type: () -> str
        return "<SetGen %r>" % self.values
110
111
class _ScopedIP(str):
    """
    String subclass carrying an additional ``scope`` attribute.

    ``scope`` starts as ``None``; when it is set, ``repr()`` shows it.
    """
    __slots__ = ["scope"]

    def __init__(self, _: str) -> None:
        # The string content itself is handled by str.__new__.
        self.scope = None

    def __repr__(self) -> str:
        plain = super().__repr__()
        if self.scope is None:
            return plain
        return "ScopedIP(%s, scope=%s)" % (plain, repr(self.scope))
126
127
def ScopedIP(net: str, scope: Optional[Any] = None) -> _ScopedIP:
    """
    An str that also holds extra attributes.

    Examples::

        >>> ScopedIP("224.0.0.1%eth0")  # interface 'eth0'
        >>> ScopedIP("224.0.0.1%1")  # interface index 1
        >>> ScopedIP("224.0.0.1", scope=conf.iface)

    :param net: the address, optionally followed by ``%<scope>``. A scope
        embedded in ``net`` takes precedence over the ``scope`` argument.
    :param scope: the scope: anything ``int()`` accepts is looked up as an
        interface index, any other value is passed to ``resolve_iface``.
    :returns: the address (without the ``%<scope>`` part) as a
        ``_ScopedIP`` whose ``scope`` is the interface's network name, or
        ``None`` when no scope was given.
    :raises Scapy_Exception: if ``net`` holds more than one ``%``, or if the
        scope does not resolve to a valid interface.
    """
    if "%" in net:
        # Split on every '%': a second marker then makes the unpacking fail,
        # instead of being silently folded into the scope name.
        try:
            net, scope = net.split("%")
        except ValueError:
            raise Scapy_Exception("Scope identifier can only be present once !")
    if scope is not None:
        from scapy.interfaces import resolve_iface, network_name, dev_from_index
        try:
            # A numeric scope is an interface index...
            iface = dev_from_index(int(scope))
        except (ValueError, TypeError):
            # ...anything else is resolved as an interface.
            iface = resolve_iface(scope)
        if not iface.is_valid():
            raise Scapy_Exception(
                "RFC6874 scope identifier '%s' could not be resolved to a "
                "valid interface !" % scope
            )
        scope = network_name(iface)
    x = _ScopedIP(net)
    x.scope = scope
    return x
158
159
class Net(Gen[str]):
    """
    Network object from an IP address or hostname and mask

    Examples:

        - With mask::

            >>> list(Net("192.168.0.1/24"))
            ['192.168.0.0', '192.168.0.1', ..., '192.168.0.255']

        - With 'end'::

            >>> list(Net("192.168.0.100", "192.168.0.200"))
            ['192.168.0.100', '192.168.0.101', ..., '192.168.0.200']

        - With 'scope' (for multicast)::

            >>> Net("224.0.0.1%lo")
            >>> Net("224.0.0.1", scope=conf.iface)
    """
    name = "Net"  # type: str
    family = socket.AF_INET  # type: int
    max_mask = 32  # type: int

    @classmethod
    def name2addr(cls, name):
        # type: (str) -> str
        """Resolve ``name`` to the first address of family ``cls.family``.

        :raises Scapy_Exception: when the resolution fails and ``name``
            contains a ``<digits>-<digits>`` component (legacy range syntax)
        :raises socket.error: for any other resolution failure
        """
        try:
            return next(
                addr_port[0]
                for family, _, _, _, addr_port in
                socket.getaddrinfo(name, None, cls.family)
                if family == cls.family
            )
        except socket.error:
            if re.search("(^|\\.)[0-9]+-[0-9]+($|\\.)", name) is not None:
                raise Scapy_Exception("Ranges are no longer accepted in %s()" %
                                      cls.__name__)
            raise

    @classmethod
    def ip2int(cls, addr):
        # type: (str) -> int
        """Resolve ``addr`` and return it as an unsigned 32-bit integer."""
        return cast(int, struct.unpack(
            "!I", socket.inet_aton(cls.name2addr(addr))
        )[0])

    @staticmethod
    def int2ip(val):
        # type: (int) -> str
        """Return the dotted-quad string of the 32-bit integer ``val``."""
        return socket.inet_ntoa(struct.pack('!I', val))

    def __init__(self, net, stop=None, scope=None):
        # type: (str, Optional[str], Optional[str]) -> None
        """
        :param net: address or hostname, optionally followed by ``/mask``
            and/or ``%scope``; may also be a ``_ScopedIP``
        :param stop: if given, last address of the range starting at ``net``
            (``net`` is then not parsed for a mask)
        :param scope: scope to attach to the generated addresses. A scope
            already carried by ``net`` takes precedence.
        :raises Scapy_Exception: if ``net`` contains a wildcard, or if the
            scope cannot be resolved (see ``ScopedIP``)
        """
        if "*" in net:
            raise Scapy_Exception("Wildcards are no longer accepted in %s()" %
                                  self.__class__.__name__)
        self.scope = None
        if "%" in net:
            net = ScopedIP(net)
        if isinstance(net, _ScopedIP) and net.scope is not None:
            self.scope = net.scope
        elif scope is not None:
            # Honour the explicit argument, normalised and validated the
            # same way ScopedIP does for every generated address.
            self.scope = ScopedIP(net, scope=scope).scope
        if stop is None:
            try:
                net, mask = net.split("/", 1)
            except ValueError:
                # No '/': a single host.
                self.mask = self.max_mask  # type: Union[None, int]
            else:
                self.mask = int(mask)
            self.net = net  # type: Union[None, str]
            inv_mask = self.max_mask - self.mask
            # Clear the host bits to get the first address of the network.
            self.start = self.ip2int(net) >> inv_mask << inv_mask
            self.count = 1 << inv_mask
            self.stop = self.start + self.count - 1
        else:
            self.start = self.ip2int(net)
            self.stop = self.ip2int(stop)
            self.count = self.stop - self.start + 1
            self.net = self.mask = None

    def __str__(self):
        # type: () -> str
        """Return the first address of the range ("" if it is empty)."""
        return next(iter(self), "")

    def __iter__(self):
        # type: () -> Iterator[str]
        """Yield every address of the range, carrying the scope if any."""
        for i in range(self.count):
            yield ScopedIP(
                self.int2ip(self.start + i),
                scope=self.scope,
            )

    def __len__(self):
        # type: () -> int
        return self.count

    def __iterlen__(self):
        # type: () -> int
        # for compatibility
        return len(self)

    def choice(self):
        # type: () -> str
        """Return a random address of the range, carrying the scope if any."""
        return ScopedIP(
            self.int2ip(random.randint(self.start, self.stop)),
            scope=self.scope,
        )

    def __repr__(self):
        # type: () -> str
        scope_id_repr = ""
        if self.scope:
            scope_id_repr = ", scope=%s" % repr(self.scope)
        if self.mask is not None:
            return '%s("%s/%d"%s)' % (
                self.__class__.__name__,
                self.net,
                self.mask,
                scope_id_repr,
            )
        return '%s("%s", "%s"%s)' % (
            self.__class__.__name__,
            self.int2ip(self.start),
            self.int2ip(self.stop),
            scope_id_repr,
        )

    def __eq__(self, other):
        # type: (Any) -> bool
        """Compare address family and range bounds; the scope is ignored.

        A string is first converted using this object's class.
        """
        if isinstance(other, str):
            return self == self.__class__(other)
        if not isinstance(other, Net):
            return False
        if self.family != other.family:
            return False
        return (self.start == other.start) and (self.stop == other.stop)

    def __ne__(self, other):
        # type: (Any) -> bool
        # Python 2.7 compat
        return not self == other

    def __hash__(self):
        # type: () -> int
        # Only hash what __eq__ compares: objects that compare equal must
        # have the same hash, and __eq__ does not look at the scope.
        return hash(("scapy.Net", self.family, self.start, self.stop))

    def __contains__(self, other):
        # type: (Any) -> bool
        """Tell whether ``other`` lies within this range.

        ``other`` may be an integer address, a string (converted using this
        object's class) or an object of exactly the same class; anything
        else is never contained.
        """
        if isinstance(other, int):
            return self.start <= other <= self.stop
        if isinstance(other, str):
            return self.__class__(other) in self
        if type(other) is not self.__class__:
            return False
        return self.start <= other.start <= other.stop <= self.stop
317
318
class OID(Gen[str]):
    """Generator of OID strings.

    Dot-separated components written ``low-high`` are expanded; the other
    components are kept verbatim.
    """
    name = "OID"

    def __init__(self, oid):
        # type: (str) -> None
        """
        :param oid: dotted OID, where a component may be a ``low-high`` range
        """
        self.oid = oid
        self.cmpt = []
        pieces = []
        for component in oid.split("."):
            if "-" not in component:
                pieces.append(component)
                continue
            # Ranged component: placeholder in the format string, bounds
            # stored at the matching position in self.cmpt.
            pieces.append("%i")
            self.cmpt.append(
                tuple(int(bound) for bound in component.split("-"))
            )
        self.fmt = ".".join(pieces)

    def __repr__(self):
        # type: () -> str
        return "OID(%r)" % self.oid

    def __iter__(self):
        # type: () -> Iterator[str]
        """Yield every OID, the first ranged component varying fastest."""
        counters = [bounds[0] for bounds in self.cmpt]
        while True:
            yield self.fmt % tuple(counters)
            # Odometer-style increment, starting with the first counter.
            for pos, bounds in enumerate(self.cmpt):
                if counters[pos] < bounds[1]:
                    counters[pos] += 1
                    break
                # This counter wrapped: reset it and carry to the next one.
                counters[pos] = bounds[0]
            else:
                # Every counter wrapped: all combinations were produced.
                return

    def __iterlen__(self):
        # type: () -> int
        """Return the number of OIDs the generator yields."""
        total = 1
        for (low, high) in self.cmpt:
            total *= max(high - low, 0) + 1
        return total
358
359
360######################################
361#  Packet abstract and base classes  #
362######################################
363
class Packet_metaclass(type):
    """Metaclass that post-processes packet class definitions.

    On class creation it resolves ``fields_desc``, applies per-class default
    values, builds a ``__signature__`` and registers the new class. It also
    exposes fields as class attributes and routes instantiation through an
    optional ``dispatch_hook``.
    """

    def __new__(cls: Type[_T],
                name,  # type: str
                bases,  # type: Tuple[type, ...]
                dct  # type: Dict[str, Any]
                ):
        # type: (...) -> Type['Packet']
        """Create the packet class ``name``.

        :param name: name of the class being created
        :param bases: its base classes
        :param dct: its namespace; modified in place (``fields_desc``,
            ``__slots__``, ``_name``, ``_overload_fields``,
            ``__signature__``; attributes that override a field default
            are removed)
        :returns: the new class
        """
        if "fields_desc" in dct:  # perform resolution of references to other packets  # noqa: E501
            current_fld = dct["fields_desc"]  # type: List[Union[scapy.fields.Field[Any, Any], Packet_metaclass]]  # noqa: E501
            resolved_fld = []  # type: List[scapy.fields.Field[Any, Any]]
            for fld_or_pkt in current_fld:
                if isinstance(fld_or_pkt, Packet_metaclass):
                    # reference to another fields_desc
                    for pkt_fld in fld_or_pkt.fields_desc:
                        resolved_fld.append(pkt_fld)
                else:
                    resolved_fld.append(fld_or_pkt)
        else:  # look for a fields_desc in parent classes
            resolved_fld = []
            for b in bases:
                if hasattr(b, "fields_desc"):
                    # The first base providing a fields_desc wins.
                    resolved_fld = b.fields_desc
                    break

        if resolved_fld:  # perform default value replacements
            final_fld = []  # type: List[scapy.fields.Field[Any, Any]]
            names = []
            for f in resolved_fld:
                if f.name in names:
                    war_msg = (
                        "Packet '%s' has a duplicated '%s' field ! "
                        "If you are using several ConditionalFields, have "
                        "a look at MultipleTypeField instead ! This will "
                        "become a SyntaxError in a future version of "
                        "Scapy !" % (
                            name, f.name
                        )
                    )
                    warnings.warn(war_msg, SyntaxWarning)
                names.append(f.name)
                if f.name in dct:
                    # A class attribute named like a field is a new default:
                    # work on a copy so the original field is left untouched,
                    # then drop the attribute from the namespace.
                    f = f.copy()
                    f.default = dct[f.name]
                    del dct[f.name]
                final_fld.append(f)

            dct["fields_desc"] = final_fld

        dct.setdefault("__slots__", [])
        # "name" and "overload_fields" are stored under a "_"-prefixed key.
        for attr in ["name", "overload_fields"]:
            try:
                dct["_%s" % attr] = dct.pop(attr)
            except KeyError:
                pass
        # Build and inject signature
        try:
            # Py3 only
            import inspect
            # One positional-only "_pkt" parameter, then one keyword-only
            # parameter per field, defaulting to the field's default.
            dct["__signature__"] = inspect.Signature([
                inspect.Parameter("_pkt", inspect.Parameter.POSITIONAL_ONLY),
            ] + [
                inspect.Parameter(f.name,
                                  inspect.Parameter.KEYWORD_ONLY,
                                  default=f.default)
                for f in dct["fields_desc"]
            ])
        except (ImportError, AttributeError, KeyError):
            # E.g. KeyError when dct holds no "fields_desc": no signature.
            pass
        newcls = cast(Type['Packet'], type.__new__(cls, name, bases, dct))
        # Note: below can't be typed because we use attributes
        # created dynamically..
        # Union of the __slots__ declared along the whole MRO.
        newcls.__all_slots__ = set(  # type: ignore
            attr
            for cls in newcls.__mro__ if hasattr(cls, "__slots__")
            for attr in cls.__slots__
        )

        # The class itself, followed by the aliastypes it inherited (if any).
        newcls.aliastypes = (  # type: ignore
            [newcls] + getattr(newcls, "aliastypes", [])
        )

        if hasattr(newcls, "register_variant"):
            newcls.register_variant()
        for _f in newcls.fields_desc:
            if hasattr(_f, "register_owner"):
                _f.register_owner(newcls)
        # Classes whose name starts with "_" are not registered as layers.
        if newcls.__name__[0] != "_":
            from scapy import config
            config.conf.layers.register(newcls)
        return newcls

    def __getattr__(self, attr):
        # type: (str) -> Any
        """Return the field named ``attr`` from ``self.fields_desc``.

        :raises AttributeError: if no field has that name
        """
        for k in self.fields_desc:
            if k.name == attr:
                return k
        raise AttributeError(attr)

    def __call__(cls,
                 *args,  # type: Any
                 **kargs  # type: Any
                 ):
        # type: (...) -> 'Packet'
        """Instantiate the class, or the one chosen by its ``dispatch_hook``.

        Only a ``dispatch_hook`` defined in the class's own ``__dict__`` is
        used. If the hook raises, the exception propagates when
        ``conf.debug_dissector`` is set; otherwise ``conf.raw_layer`` is
        instantiated instead.
        """
        if "dispatch_hook" in cls.__dict__:
            try:
                cls = cls.dispatch_hook(*args, **kargs)
            except Exception:
                from scapy import config
                if config.conf.debug_dissector:
                    raise
                cls = config.conf.raw_layer
        # __new__ and __init__ are called explicitly on the selected class.
        i = cls.__new__(
            cls,  # type: ignore
            cls.__name__,
            cls.__bases__,
            cls.__dict__  # type: ignore
        )
        i.__init__(*args, **kargs)
        return i  # type: ignore
483
484
485# Note: see compat.py for an explanation
486
class Field_metaclass(type):
    """Metaclass giving every class it creates a ``__slots__`` attribute."""

    def __new__(cls: Type[_T],
                name,  # type: str
                bases,  # type: Tuple[type, ...]
                dct  # type: Dict[str, Any]
                ):
        # type: (...) -> Type[_T]
        """Create the class, declaring empty ``__slots__`` if it has none."""
        if "__slots__" not in dct:
            dct["__slots__"] = []
        return type.__new__(cls, name, bases, dct)  # type: ignore
497
498
# Alias: the same metaclass object is exposed under a second name.
PacketList_metaclass = Field_metaclass
500
501
class BasePacket(Gen['Packet']):
    """Marker base class: a generator of 'Packet' objects.

    ``SetGen`` tests values against this class to decide whether they are
    iterated over or yielded as they are.
    """
    __slots__ = []  # type: List[str]
504
505
506#############################
507#  Packet list base class   #
508#############################
509
class BasePacketList(Gen[_T]):
    """Marker base class for packet lists.

    ``SetGen`` handles an instance of this class like a plain ``list`` of
    values.
    """
    __slots__ = []  # type: List[str]
512
513
class _CanvasDumpExtended(object):
    """Mixin adding EPS / PDF / SVG export on top of ``canvas_dump``."""

    @abc.abstractmethod
    def canvas_dump(self, layer_shift=0, rebuild=1):
        # type: (int, int) -> pyx.canvas.canvas
        """Return the canvas to export; to be implemented by subclasses."""
        pass

    def _dump_canvas(self, filename, kargs, writer, reader, suffix):
        # type: (Optional[str], Dict[str, Any], str, str, str) -> None
        """Shared implementation of psdump(), pdfdump() and svgdump().

        :param filename: output file, or None to write a temporary file and
            open it with the configured reader
        :param kargs: keyword arguments forwarded to ``canvas_dump``
        :param writer: name of the canvas method that writes the file
        :param reader: name of the ``conf.prog`` attribute holding the viewer
        :param suffix: default extension of the temporary file
        """
        from scapy.config import conf
        from scapy.utils import get_temp_file, ContextManagerSubprocess
        # NOTE(review): every keyword, "suffix" included, is forwarded to
        # canvas_dump() -- confirm that implementations accept it.
        canvas = self.canvas_dump(**kargs)
        if filename is None:
            fname = get_temp_file(autoext=kargs.get("suffix", suffix))
            getattr(canvas, writer)(fname)
            viewer = getattr(conf.prog, reader)
            if WINDOWS and not viewer:
                # No reader configured: open the file via os.startfile().
                os.startfile(fname)
            else:
                with ContextManagerSubprocess(viewer):
                    subprocess.Popen([viewer, fname])
        else:
            getattr(canvas, writer)(filename)
        print()

    def psdump(self, filename=None, **kargs):
        # type: (Optional[str], **Any) -> None
        """
        psdump(filename=None, layer_shift=0, rebuild=1)

        Creates an EPS file describing a packet. If filename is not provided a
        temporary file is created and opened with conf.prog.psreader.

        :param filename: the file's filename
        """
        self._dump_canvas(filename, kargs, "writeEPSfile", "psreader", ".eps")

    def pdfdump(self, filename=None, **kargs):
        # type: (Optional[str], **Any) -> None
        """
        pdfdump(filename=None, layer_shift=0, rebuild=1)

        Creates a PDF file describing a packet. If filename is not provided a
        temporary file is created and opened with conf.prog.pdfreader.

        :param filename: the file's filename
        """
        self._dump_canvas(filename, kargs, "writePDFfile", "pdfreader", ".pdf")

    def svgdump(self, filename=None, **kargs):
        # type: (Optional[str], **Any) -> None
        """
        svgdump(filename=None, layer_shift=0, rebuild=1)

        Creates an SVG file describing a packet. If filename is not provided a
        temporary file is created and opened with conf.prog.svgreader.

        :param filename: the file's filename
        """
        self._dump_canvas(filename, kargs, "writeSVGfile", "svgreader", ".svg")
594