1# SPDX-License-Identifier: GPL-2.0-only 2# This file is part of Scapy 3# See https://scapy.net/ for more information 4# Copyright (C) Philippe Biondi <phil@secdev.org> 5 6""" 7Generators and packet meta classes. 8""" 9 10################ 11# Generators # 12################ 13 14 15from functools import reduce 16import abc 17import operator 18import os 19import random 20import re 21import socket 22import struct 23import subprocess 24import types 25import warnings 26 27import scapy 28from scapy.error import Scapy_Exception 29from scapy.consts import WINDOWS 30 31from typing import ( 32 Any, 33 Dict, 34 Generic, 35 Iterator, 36 List, 37 Optional, 38 Tuple, 39 Type, 40 TypeVar, 41 Union, 42 cast, 43 TYPE_CHECKING, 44) 45 46if TYPE_CHECKING: 47 try: 48 import pyx 49 except ImportError: 50 pass 51 from scapy.packet import Packet 52 53_T = TypeVar("_T") 54 55 56class Gen(Generic[_T]): 57 __slots__ = [] # type: List[str] 58 59 def __iter__(self): 60 # type: () -> Iterator[_T] 61 return iter([]) 62 63 def __iterlen__(self): 64 # type: () -> int 65 return sum(1 for _ in iter(self)) 66 67 68def _get_values(value): 69 # type: (Any) -> Any 70 """Generate a range object from (start, stop[, step]) tuples, or 71 return value. 72 73 """ 74 if (isinstance(value, tuple) and (2 <= len(value) <= 3) and 75 all(hasattr(i, "__int__") for i in value)): 76 # We use values[1] + 1 as stop value for (x)range to maintain 77 # the behavior of using tuples as field `values` 78 return range(*((int(value[0]), int(value[1]) + 1) + 79 tuple(int(v) for v in value[2:]))) 80 return value 81 82 83class SetGen(Gen[_T]): 84 def __init__(self, values, _iterpacket=1): 85 # type: (Any, int) -> None 86 self._iterpacket = _iterpacket 87 if isinstance(values, (list, BasePacketList)): 88 self.values = [_get_values(val) for val in values] 89 else: 90 self.values = [_get_values(values)] 91 92 def __iter__(self): 93 # type: () -> Iterator[Any] 94 for i in self.values: 95 if (isinstance(i, Gen) and 96 (self._iterpacket or not isinstance(i, BasePacket))) or ( 97 isinstance(i, (range, types.GeneratorType))): 98 for j in i: 99 yield j 100 else: 101 yield i 102 103 def __len__(self): 104 # type: () -> int 105 return self.__iterlen__() 106 107 def __repr__(self): 108 # type: () -> str 109 return "<SetGen %r>" % self.values 110 111 112class _ScopedIP(str): 113 """ 114 A str that also holds extra attributes. 115 """ 116 __slots__ = ["scope"] 117 118 def __init__(self, _: str) -> None: 119 self.scope = None 120 121 def __repr__(self) -> str: 122 val = super(_ScopedIP, self).__repr__() 123 if self.scope is not None: 124 return "ScopedIP(%s, scope=%s)" % (val, repr(self.scope)) 125 return val 126 127 128def ScopedIP(net: str, scope: Optional[Any] = None) -> _ScopedIP: 129 """ 130 An str that also holds extra attributes. 131 132 Examples:: 133 134 >>> ScopedIP("224.0.0.1%eth0") # interface 'eth0' 135 >>> ScopedIP("224.0.0.1%1") # interface index 1 136 >>> ScopedIP("224.0.0.1", scope=conf.iface) 137 """ 138 if "%" in net: 139 try: 140 net, scope = net.split("%", 1) 141 except ValueError: 142 raise Scapy_Exception("Scope identifier can only be present once !") 143 if scope is not None: 144 from scapy.interfaces import resolve_iface, network_name, dev_from_index 145 try: 146 iface = dev_from_index(int(scope)) 147 except (ValueError, TypeError): 148 iface = resolve_iface(scope) 149 if not iface.is_valid(): 150 raise Scapy_Exception( 151 "RFC6874 scope identifier '%s' could not be resolved to a " 152 "valid interface !" % scope 153 ) 154 scope = network_name(iface) 155 x = _ScopedIP(net) 156 x.scope = scope 157 return x 158 159 160class Net(Gen[str]): 161 """ 162 Network object from an IP address or hostname and mask 163 164 Examples: 165 166 - With mask:: 167 168 >>> list(Net("192.168.0.1/24")) 169 ['192.168.0.0', '192.168.0.1', ..., '192.168.0.255'] 170 171 - With 'end':: 172 173 >>> list(Net("192.168.0.100", "192.168.0.200")) 174 ['192.168.0.100', '192.168.0.101', ..., '192.168.0.200'] 175 176 - With 'scope' (for multicast):: 177 178 >>> Net("224.0.0.1%lo") 179 >>> Net("224.0.0.1", scope=conf.iface) 180 """ 181 name = "Net" # type: str 182 family = socket.AF_INET # type: int 183 max_mask = 32 # type: int 184 185 @classmethod 186 def name2addr(cls, name): 187 # type: (str) -> str 188 try: 189 return next( 190 addr_port[0] 191 for family, _, _, _, addr_port in 192 socket.getaddrinfo(name, None, cls.family) 193 if family == cls.family 194 ) 195 except socket.error: 196 if re.search("(^|\\.)[0-9]+-[0-9]+($|\\.)", name) is not None: 197 raise Scapy_Exception("Ranges are no longer accepted in %s()" % 198 cls.__name__) 199 raise 200 201 @classmethod 202 def ip2int(cls, addr): 203 # type: (str) -> int 204 return cast(int, struct.unpack( 205 "!I", socket.inet_aton(cls.name2addr(addr)) 206 )[0]) 207 208 @staticmethod 209 def int2ip(val): 210 # type: (int) -> str 211 return socket.inet_ntoa(struct.pack('!I', val)) 212 213 def __init__(self, net, stop=None, scope=None): 214 # type: (str, Optional[str], Optional[str]) -> None 215 if "*" in net: 216 raise Scapy_Exception("Wildcards are no longer accepted in %s()" % 217 self.__class__.__name__) 218 self.scope = None 219 if "%" in net: 220 net = ScopedIP(net) 221 if isinstance(net, _ScopedIP): 222 self.scope = net.scope 223 if stop is None: 224 try: 225 net, mask = net.split("/", 1) 226 except ValueError: 227 self.mask = self.max_mask # type: Union[None, int] 228 else: 229 self.mask = int(mask) 230 self.net = net # type: Union[None, str] 231 inv_mask = self.max_mask - self.mask 232 self.start = self.ip2int(net) >> inv_mask << inv_mask 233 self.count = 1 << inv_mask 234 self.stop = self.start + self.count - 1 235 else: 236 self.start = self.ip2int(net) 237 self.stop = self.ip2int(stop) 238 self.count = self.stop - self.start + 1 239 self.net = self.mask = None 240 241 def __str__(self): 242 # type: () -> str 243 return next(iter(self), "") 244 245 def __iter__(self): 246 # type: () -> Iterator[str] 247 # Python 2 won't handle huge (> sys.maxint) values in range() 248 for i in range(self.count): 249 yield ScopedIP( 250 self.int2ip(self.start + i), 251 scope=self.scope, 252 ) 253 254 def __len__(self): 255 # type: () -> int 256 return self.count 257 258 def __iterlen__(self): 259 # type: () -> int 260 # for compatibility 261 return len(self) 262 263 def choice(self): 264 # type: () -> str 265 return ScopedIP( 266 self.int2ip(random.randint(self.start, self.stop)), 267 scope=self.scope, 268 ) 269 270 def __repr__(self): 271 # type: () -> str 272 scope_id_repr = "" 273 if self.scope: 274 scope_id_repr = ", scope=%s" % repr(self.scope) 275 if self.mask is not None: 276 return '%s("%s/%d"%s)' % ( 277 self.__class__.__name__, 278 self.net, 279 self.mask, 280 scope_id_repr, 281 ) 282 return '%s("%s", "%s"%s)' % ( 283 self.__class__.__name__, 284 self.int2ip(self.start), 285 self.int2ip(self.stop), 286 scope_id_repr, 287 ) 288 289 def __eq__(self, other): 290 # type: (Any) -> bool 291 if isinstance(other, str): 292 return self == self.__class__(other) 293 if not isinstance(other, Net): 294 return False 295 if self.family != other.family: 296 return False 297 return (self.start == other.start) and (self.stop == other.stop) 298 299 def __ne__(self, other): 300 # type: (Any) -> bool 301 # Python 2.7 compat 302 return not self == other 303 304 def __hash__(self): 305 # type: () -> int 306 return hash(("scapy.Net", self.family, self.start, self.stop, self.scope)) 307 308 def __contains__(self, other): 309 # type: (Any) -> bool 310 if isinstance(other, int): 311 return self.start <= other <= self.stop 312 if isinstance(other, str): 313 return self.__class__(other) in self 314 if type(other) is not self.__class__: 315 return False 316 return self.start <= other.start <= other.stop <= self.stop 317 318 319class OID(Gen[str]): 320 name = "OID" 321 322 def __init__(self, oid): 323 # type: (str) -> None 324 self.oid = oid 325 self.cmpt = [] 326 fmt = [] 327 for i in oid.split("."): 328 if "-" in i: 329 fmt.append("%i") 330 self.cmpt.append(tuple(map(int, i.split("-")))) 331 else: 332 fmt.append(i) 333 self.fmt = ".".join(fmt) 334 335 def __repr__(self): 336 # type: () -> str 337 return "OID(%r)" % self.oid 338 339 def __iter__(self): 340 # type: () -> Iterator[str] 341 ii = [k[0] for k in self.cmpt] 342 while True: 343 yield self.fmt % tuple(ii) 344 i = 0 345 while True: 346 if i >= len(ii): 347 return 348 if ii[i] < self.cmpt[i][1]: 349 ii[i] += 1 350 break 351 else: 352 ii[i] = self.cmpt[i][0] 353 i += 1 354 355 def __iterlen__(self): 356 # type: () -> int 357 return reduce(operator.mul, (max(y - x, 0) + 1 for (x, y) in self.cmpt), 1) # noqa: E501 358 359 360###################################### 361# Packet abstract and base classes # 362###################################### 363 364class Packet_metaclass(type): 365 def __new__(cls: Type[_T], 366 name, # type: str 367 bases, # type: Tuple[type, ...] 368 dct # type: Dict[str, Any] 369 ): 370 # type: (...) -> Type['Packet'] 371 if "fields_desc" in dct: # perform resolution of references to other packets # noqa: E501 372 current_fld = dct["fields_desc"] # type: List[Union[scapy.fields.Field[Any, Any], Packet_metaclass]] # noqa: E501 373 resolved_fld = [] # type: List[scapy.fields.Field[Any, Any]] 374 for fld_or_pkt in current_fld: 375 if isinstance(fld_or_pkt, Packet_metaclass): 376 # reference to another fields_desc 377 for pkt_fld in fld_or_pkt.fields_desc: 378 resolved_fld.append(pkt_fld) 379 else: 380 resolved_fld.append(fld_or_pkt) 381 else: # look for a fields_desc in parent classes 382 resolved_fld = [] 383 for b in bases: 384 if hasattr(b, "fields_desc"): 385 resolved_fld = b.fields_desc 386 break 387 388 if resolved_fld: # perform default value replacements 389 final_fld = [] # type: List[scapy.fields.Field[Any, Any]] 390 names = [] 391 for f in resolved_fld: 392 if f.name in names: 393 war_msg = ( 394 "Packet '%s' has a duplicated '%s' field ! " 395 "If you are using several ConditionalFields, have " 396 "a look at MultipleTypeField instead ! This will " 397 "become a SyntaxError in a future version of " 398 "Scapy !" % ( 399 name, f.name 400 ) 401 ) 402 warnings.warn(war_msg, SyntaxWarning) 403 names.append(f.name) 404 if f.name in dct: 405 f = f.copy() 406 f.default = dct[f.name] 407 del dct[f.name] 408 final_fld.append(f) 409 410 dct["fields_desc"] = final_fld 411 412 dct.setdefault("__slots__", []) 413 for attr in ["name", "overload_fields"]: 414 try: 415 dct["_%s" % attr] = dct.pop(attr) 416 except KeyError: 417 pass 418 # Build and inject signature 419 try: 420 # Py3 only 421 import inspect 422 dct["__signature__"] = inspect.Signature([ 423 inspect.Parameter("_pkt", inspect.Parameter.POSITIONAL_ONLY), 424 ] + [ 425 inspect.Parameter(f.name, 426 inspect.Parameter.KEYWORD_ONLY, 427 default=f.default) 428 for f in dct["fields_desc"] 429 ]) 430 except (ImportError, AttributeError, KeyError): 431 pass 432 newcls = cast(Type['Packet'], type.__new__(cls, name, bases, dct)) 433 # Note: below can't be typed because we use attributes 434 # created dynamically.. 435 newcls.__all_slots__ = set( # type: ignore 436 attr 437 for cls in newcls.__mro__ if hasattr(cls, "__slots__") 438 for attr in cls.__slots__ 439 ) 440 441 newcls.aliastypes = ( # type: ignore 442 [newcls] + getattr(newcls, "aliastypes", []) 443 ) 444 445 if hasattr(newcls, "register_variant"): 446 newcls.register_variant() 447 for _f in newcls.fields_desc: 448 if hasattr(_f, "register_owner"): 449 _f.register_owner(newcls) 450 if newcls.__name__[0] != "_": 451 from scapy import config 452 config.conf.layers.register(newcls) 453 return newcls 454 455 def __getattr__(self, attr): 456 # type: (str) -> Any 457 for k in self.fields_desc: 458 if k.name == attr: 459 return k 460 raise AttributeError(attr) 461 462 def __call__(cls, 463 *args, # type: Any 464 **kargs # type: Any 465 ): 466 # type: (...) -> 'Packet' 467 if "dispatch_hook" in cls.__dict__: 468 try: 469 cls = cls.dispatch_hook(*args, **kargs) 470 except Exception: 471 from scapy import config 472 if config.conf.debug_dissector: 473 raise 474 cls = config.conf.raw_layer 475 i = cls.__new__( 476 cls, # type: ignore 477 cls.__name__, 478 cls.__bases__, 479 cls.__dict__ # type: ignore 480 ) 481 i.__init__(*args, **kargs) 482 return i # type: ignore 483 484 485# Note: see compat.py for an explanation 486 487class Field_metaclass(type): 488 def __new__(cls: Type[_T], 489 name, # type: str 490 bases, # type: Tuple[type, ...] 491 dct # type: Dict[str, Any] 492 ): 493 # type: (...) -> Type[_T] 494 dct.setdefault("__slots__", []) 495 newcls = type.__new__(cls, name, bases, dct) 496 return newcls # type: ignore 497 498 499PacketList_metaclass = Field_metaclass 500 501 502class BasePacket(Gen['Packet']): 503 __slots__ = [] # type: List[str] 504 505 506############################# 507# Packet list base class # 508############################# 509 510class BasePacketList(Gen[_T]): 511 __slots__ = [] # type: List[str] 512 513 514class _CanvasDumpExtended(object): 515 @abc.abstractmethod 516 def canvas_dump(self, layer_shift=0, rebuild=1): 517 # type: (int, int) -> pyx.canvas.canvas 518 pass 519 520 def psdump(self, filename=None, **kargs): 521 # type: (Optional[str], **Any) -> None 522 """ 523 psdump(filename=None, layer_shift=0, rebuild=1) 524 525 Creates an EPS file describing a packet. If filename is not provided a 526 temporary file is created and gs is called. 527 528 :param filename: the file's filename 529 """ 530 from scapy.config import conf 531 from scapy.utils import get_temp_file, ContextManagerSubprocess 532 canvas = self.canvas_dump(**kargs) 533 if filename is None: 534 fname = get_temp_file(autoext=kargs.get("suffix", ".eps")) 535 canvas.writeEPSfile(fname) 536 if WINDOWS and not conf.prog.psreader: 537 os.startfile(fname) 538 else: 539 with ContextManagerSubprocess(conf.prog.psreader): 540 subprocess.Popen([conf.prog.psreader, fname]) 541 else: 542 canvas.writeEPSfile(filename) 543 print() 544 545 def pdfdump(self, filename=None, **kargs): 546 # type: (Optional[str], **Any) -> None 547 """ 548 pdfdump(filename=None, layer_shift=0, rebuild=1) 549 550 Creates a PDF file describing a packet. If filename is not provided a 551 temporary file is created and xpdf is called. 552 553 :param filename: the file's filename 554 """ 555 from scapy.config import conf 556 from scapy.utils import get_temp_file, ContextManagerSubprocess 557 canvas = self.canvas_dump(**kargs) 558 if filename is None: 559 fname = get_temp_file(autoext=kargs.get("suffix", ".pdf")) 560 canvas.writePDFfile(fname) 561 if WINDOWS and not conf.prog.pdfreader: 562 os.startfile(fname) 563 else: 564 with ContextManagerSubprocess(conf.prog.pdfreader): 565 subprocess.Popen([conf.prog.pdfreader, fname]) 566 else: 567 canvas.writePDFfile(filename) 568 print() 569 570 def svgdump(self, filename=None, **kargs): 571 # type: (Optional[str], **Any) -> None 572 """ 573 svgdump(filename=None, layer_shift=0, rebuild=1) 574 575 Creates an SVG file describing a packet. If filename is not provided a 576 temporary file is created and gs is called. 577 578 :param filename: the file's filename 579 """ 580 from scapy.config import conf 581 from scapy.utils import get_temp_file, ContextManagerSubprocess 582 canvas = self.canvas_dump(**kargs) 583 if filename is None: 584 fname = get_temp_file(autoext=kargs.get("suffix", ".svg")) 585 canvas.writeSVGfile(fname) 586 if WINDOWS and not conf.prog.svgreader: 587 os.startfile(fname) 588 else: 589 with ContextManagerSubprocess(conf.prog.svgreader): 590 subprocess.Popen([conf.prog.svgreader, fname]) 591 else: 592 canvas.writeSVGfile(filename) 593 print() 594