• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5
6"""
7Run commands when the Scapy interpreter starts.
8"""
9
10import builtins
11import code
12from io import StringIO
13import logging
14from queue import Queue
15import sys
16import threading
17import traceback
18
19from scapy.config import conf
20from scapy.themes import NoTheme, DefaultTheme, HTMLTheme2, LatexTheme2
21from scapy.error import log_scapy, Scapy_Exception
22from scapy.utils import tex_escape
23
24from typing import (
25    Any,
26    Optional,
27    TextIO,
28    Dict,
29    Tuple,
30)
31
32
33#########################
34#     Autorun stuff     #
35#########################
36
37class StopAutorun(Scapy_Exception):
38    code_run = ""
39
40
41class StopAutorunTimeout(StopAutorun):
42    pass
43
44
45class ScapyAutorunInterpreter(code.InteractiveInterpreter):
46    def __init__(self, *args, **kargs):
47        # type: (*Any, **Any) -> None
48        code.InteractiveInterpreter.__init__(self, *args, **kargs)
49
50    def write(self, data):
51        # type: (str) -> None
52        pass
53
54
55def autorun_commands(_cmds, my_globals=None, verb=None):
56    # type: (str, Optional[Dict[str, Any]], Optional[int]) -> Any
57    sv = conf.verb
58    try:
59        try:
60            if my_globals is None:
61                from scapy.main import _scapy_builtins
62                my_globals = _scapy_builtins()
63            interp = ScapyAutorunInterpreter(locals=my_globals)
64            try:
65                del builtins.__dict__["scapy_session"]["_"]
66            except KeyError:
67                pass
68            if verb is not None:
69                conf.verb = verb
70            cmd = ""
71            cmds = _cmds.splitlines()
72            cmds.append("")  # ensure we finish multi-line commands
73            cmds.reverse()
74            while True:
75                if cmd:
76                    sys.stderr.write(sys.__dict__.get("ps2", "... "))
77                else:
78                    sys.stderr.write(sys.__dict__.get("ps1", ">>> "))
79
80                line = cmds.pop()
81                print(line)
82                cmd += "\n" + line
83                sys.last_value = None
84                if interp.runsource(cmd):
85                    continue
86                if sys.last_value:  # An error occurred
87                    traceback.print_exception(sys.last_type,
88                                              sys.last_value,
89                                              sys.last_traceback.tb_next,
90                                              file=sys.stdout)
91                    sys.last_value = None
92                    return False
93                cmd = ""
94                if len(cmds) <= 1:
95                    break
96        except SystemExit:
97            pass
98    finally:
99        conf.verb = sv
100    try:
101        return builtins.__dict__["scapy_session"]["_"]
102    except KeyError:
103        return builtins.__dict__.get("_", None)
104
105
106def autorun_commands_timeout(cmds, timeout=None, **kwargs):
107    # type: (str, Optional[int], **Any) -> Any
108    """
109    Wraps autorun_commands with a timeout that raises StopAutorunTimeout
110    on expiration.
111    """
112    if timeout is None:
113        return autorun_commands(cmds, **kwargs)
114
115    q = Queue()  # type: Queue[Any]
116
117    def _runner():
118        # type: () -> None
119        q.put(autorun_commands(cmds, **kwargs))
120    th = threading.Thread(target=_runner)
121    th.daemon = True
122    th.start()
123    th.join(timeout)
124    if th.is_alive():
125        raise StopAutorunTimeout
126    return q.get()
127
128
129class StringWriter(StringIO):
130    """Util to mock sys.stdout and sys.stderr, and
131    store their output in a 's' var."""
132    def __init__(self, debug=None):
133        # type: (Optional[TextIO]) -> None
134        self.s = ""
135        self.debug = debug
136        super().__init__()
137
138    def write(self, x):
139        # type: (str) -> int
140        # Object can be in the middle of being destroyed.
141        if getattr(self, "debug", None) and self.debug:
142            self.debug.write(x)
143        if getattr(self, "s", None) is not None:
144            self.s += x
145        return len(x)
146
147    def flush(self):
148        # type: () -> None
149        if getattr(self, "debug", None) and self.debug:
150            self.debug.flush()
151
152
153def autorun_get_interactive_session(cmds, **kargs):
154    # type: (str, **Any) -> Tuple[str, Any]
155    """Create an interactive session and execute the
156    commands passed as "cmds" and return all output
157
158    :param cmds: a list of commands to run
159    :param timeout: timeout in seconds
160    :returns: (output, returned) contains both sys.stdout and sys.stderr logs
161    """
162    sstdout, sstderr, sexcepthook = sys.stdout, sys.stderr, sys.excepthook
163    sw = StringWriter()
164    h_old = log_scapy.handlers[0]
165    log_scapy.removeHandler(h_old)
166    log_scapy.addHandler(logging.StreamHandler(stream=sw))
167    try:
168        try:
169            sys.stdout = sys.stderr = sw
170            sys.excepthook = sys.__excepthook__
171            res = autorun_commands_timeout(cmds, **kargs)
172        except StopAutorun as e:
173            e.code_run = sw.s
174            raise
175    finally:
176        sys.stdout, sys.stderr, sys.excepthook = sstdout, sstderr, sexcepthook
177        log_scapy.removeHandler(log_scapy.handlers[0])
178        log_scapy.addHandler(h_old)
179    return sw.s, res
180
181
182def autorun_get_interactive_live_session(cmds, **kargs):
183    # type: (str, **Any) -> Tuple[str, Any]
184    """Create an interactive session and execute the
185    commands passed as "cmds" and return all output
186
187    :param cmds: a list of commands to run
188    :param timeout: timeout in seconds
189    :returns: (output, returned) contains both sys.stdout and sys.stderr logs
190    """
191    sstdout, sstderr = sys.stdout, sys.stderr
192    sw = StringWriter(debug=sstdout)
193    try:
194        try:
195            sys.stdout = sys.stderr = sw
196            res = autorun_commands_timeout(cmds, **kargs)
197        except StopAutorun as e:
198            e.code_run = sw.s
199            raise
200    finally:
201        sys.stdout, sys.stderr = sstdout, sstderr
202    return sw.s, res
203
204
205def autorun_get_text_interactive_session(cmds, **kargs):
206    # type: (str, **Any) -> Tuple[str, Any]
207    ct = conf.color_theme
208    try:
209        conf.color_theme = NoTheme()
210        s, res = autorun_get_interactive_session(cmds, **kargs)
211    finally:
212        conf.color_theme = ct
213    return s, res
214
215
216def autorun_get_live_interactive_session(cmds, **kargs):
217    # type: (str, **Any) -> Tuple[str, Any]
218    ct = conf.color_theme
219    try:
220        conf.color_theme = DefaultTheme()
221        s, res = autorun_get_interactive_live_session(cmds, **kargs)
222    finally:
223        conf.color_theme = ct
224    return s, res
225
226
227def autorun_get_ansi_interactive_session(cmds, **kargs):
228    # type: (str, **Any) -> Tuple[str, Any]
229    ct = conf.color_theme
230    try:
231        conf.color_theme = DefaultTheme()
232        s, res = autorun_get_interactive_session(cmds, **kargs)
233    finally:
234        conf.color_theme = ct
235    return s, res
236
237
238def autorun_get_html_interactive_session(cmds, **kargs):
239    # type: (str, **Any) -> Tuple[str, Any]
240    ct = conf.color_theme
241
242    def to_html(s):
243        # type: (str) -> str
244        return s.replace("<", "&lt;").replace(">", "&gt;").replace("#[#", "<").replace("#]#", ">")  # noqa: E501
245    try:
246        try:
247            conf.color_theme = HTMLTheme2()
248            s, res = autorun_get_interactive_session(cmds, **kargs)
249        except StopAutorun as e:
250            e.code_run = to_html(e.code_run)
251            raise
252    finally:
253        conf.color_theme = ct
254
255    return to_html(s), res
256
257
258def autorun_get_latex_interactive_session(cmds, **kargs):
259    # type: (str, **Any) -> Tuple[str, Any]
260    ct = conf.color_theme
261
262    def to_latex(s):
263        # type: (str) -> str
264        return tex_escape(s).replace("@[@", "{").replace("@]@", "}").replace("@`@", "\\")  # noqa: E501
265    try:
266        try:
267            conf.color_theme = LatexTheme2()
268            s, res = autorun_get_interactive_session(cmds, **kargs)
269        except StopAutorun as e:
270            e.code_run = to_latex(e.code_run)
271            raise
272    finally:
273        conf.color_theme = ct
274    return to_latex(s), res
275