# SPDX-License-Identifier: GPL-2.0-only
# This file is part of Scapy
# See https://scapy.net/ for more information
# Copyright (C) Philippe Biondi <phil@secdev.org>

"""
Run commands when the Scapy interpreter starts.
"""

import builtins
import code
from io import StringIO
import logging
from queue import Queue
import sys
import threading
import traceback

from scapy.config import conf
from scapy.themes import NoTheme, DefaultTheme, HTMLTheme2, LatexTheme2
from scapy.error import log_scapy, Scapy_Exception
from scapy.utils import tex_escape

from typing import (
    Any,
    Optional,
    TextIO,
    Dict,
    Tuple,
)


#########################
#     Autorun stuff     #
#########################

class StopAutorun(Scapy_Exception):
    """Raised to abort an autorun session.

    The session helpers in this module store the output captured so far
    in ``code_run`` before re-raising.
    """
    code_run = ""


class StopAutorunTimeout(StopAutorun):
    """Raised by autorun_commands_timeout() when the timeout expires."""
    pass


class ScapyAutorunInterpreter(code.InteractiveInterpreter):
    """InteractiveInterpreter whose write() discards everything."""

    def __init__(self, *args, **kargs):
        # type: (*Any, **Any) -> None
        code.InteractiveInterpreter.__init__(self, *args, **kargs)

    def write(self, data):
        # type: (str) -> None
        # Intentionally a no-op: errors are reported by autorun_commands()
        # through sys.last_value / traceback.print_exception instead.
        pass


def autorun_commands(_cmds, my_globals=None, verb=None):
    # type: (str, Optional[Dict[str, Any]], Optional[int]) -> Any
    """Run the lines of ``_cmds`` as if typed at an interactive prompt.

    Each line is echoed with ``print`` after a ``ps1``/``ps2`` prompt is
    written to ``sys.stderr``, then fed to a ScapyAutorunInterpreter.

    :param _cmds: newline-separated commands to execute
    :param my_globals: namespace the commands run in; when None, the
        result of ``scapy.main._scapy_builtins()`` is used
    :param verb: if not None, value given to ``conf.verb`` while running
        (the previous value is always restored)
    :returns: False if a command raised (its traceback is printed to
        ``sys.stdout``); otherwise the ``_`` entry of
        ``builtins.scapy_session`` or, failing that, of ``builtins``
    """
    sv = conf.verb
    try:
        try:
            if my_globals is None:
                from scapy.main import _scapy_builtins
                my_globals = _scapy_builtins()
            interp = ScapyAutorunInterpreter(locals=my_globals)
            # Forget the result of any previous run.
            try:
                del builtins.__dict__["scapy_session"]["_"]
            except KeyError:
                pass
            if verb is not None:
                conf.verb = verb
            cmd = ""
            cmds = _cmds.splitlines()
            cmds.append("")  # ensure we finish multi-line commands
            cmds.reverse()  # so that pop() yields lines in source order
            while True:
                if cmd:
                    sys.stderr.write(sys.__dict__.get("ps2", "... "))
                else:
                    sys.stderr.write(sys.__dict__.get("ps1", ">>> "))

                # NOTE: if the input ends inside an unterminated statement,
                # cmds is exhausted here and pop() raises IndexError.
                line = cmds.pop()
                print(line)
                cmd += "\n" + line
                sys.last_value = None
                if interp.runsource(cmd):
                    # Incomplete statement: accumulate the next line.
                    continue
                # Compare against None explicitly: an exception object may
                # define __bool__/__len__ and evaluate as false.
                if sys.last_value is not None:  # An error occurred
                    traceback.print_exception(sys.last_type,
                                              sys.last_value,
                                              sys.last_traceback.tb_next,
                                              file=sys.stdout)
                    sys.last_value = None
                    return False
                cmd = ""
                if len(cmds) <= 1:
                    break
        except SystemExit:
            pass
    finally:
        conf.verb = sv
    try:
        return builtins.__dict__["scapy_session"]["_"]
    except KeyError:
        return builtins.__dict__.get("_", None)


def autorun_commands_timeout(cmds, timeout=None, **kwargs):
    # type: (str, Optional[int], **Any) -> Any
    """
    Wraps autorun_commands with a timeout that raises StopAutorunTimeout
    on expiration.

    :param cmds: newline-separated commands, passed to autorun_commands
    :param timeout: seconds to wait; None runs in the calling thread
        with no limit
    :param kwargs: forwarded to autorun_commands
    :returns: whatever autorun_commands returns
    :raises StopAutorunTimeout: if the commands are still running after
        ``timeout`` seconds (the daemon worker thread is left running)
    """
    if timeout is None:
        return autorun_commands(cmds, **kwargs)

    q = Queue()  # type: Queue[Tuple[bool, Any]]

    def _runner():
        # type: () -> None
        # Always put something on the queue: if the worker died without
        # doing so, the q.get() below would block forever.
        try:
            q.put((True, autorun_commands(cmds, **kwargs)))
        except BaseException as ex:
            q.put((False, ex))
    th = threading.Thread(target=_runner)
    th.daemon = True
    th.start()
    th.join(timeout)
    if th.is_alive():
        raise StopAutorunTimeout
    ok, value = q.get()
    if not ok:
        # Re-raise in the caller what autorun_commands raised in the worker
        raise value
    return value


class StringWriter(StringIO):
    """Util to mock sys.stdout and sys.stderr, and
    store their output in a 's' var.

    :param debug: optional stream that also receives everything written
    """
    def __init__(self, debug=None):
        # type: (Optional[TextIO]) -> None
        self.s = ""
        self.debug = debug
        super().__init__()

    def write(self, x):
        # type: (str) -> int
        # Object can be in the middle of being destroyed.
        debug = getattr(self, "debug", None)
        if debug:
            debug.write(x)
        if getattr(self, "s", None) is not None:
            self.s += x
        return len(x)

    def flush(self):
        # type: () -> None
        debug = getattr(self, "debug", None)
        if debug:
            debug.flush()


def autorun_get_interactive_session(cmds, **kargs):
    # type: (str, **Any) -> Tuple[str, Any]
    """Create an interactive session and execute the
    commands passed as "cmds" and return all output

    While the commands run, sys.stdout, sys.stderr and the first handler
    of log_scapy (if any) are redirected to an in-memory buffer, and
    sys.excepthook is reset to sys.__excepthook__. Everything is restored
    afterwards.

    :param cmds: newline-separated commands to run
    :param timeout: timeout in seconds
    :returns: (output, returned) contains both sys.stdout and sys.stderr logs
    :raises StopAutorun: re-raised with ``code_run`` set to the output
        captured so far
    """
    sstdout, sstderr, sexcepthook = sys.stdout, sys.stderr, sys.excepthook
    sw = StringWriter()
    h_new = logging.StreamHandler(stream=sw)
    # Keep explicit references to both handlers so that the finally clause
    # removes exactly the one installed here, whatever else is attached.
    h_old = log_scapy.handlers[0] if log_scapy.handlers else None
    if h_old is not None:
        log_scapy.removeHandler(h_old)
    log_scapy.addHandler(h_new)
    try:
        try:
            sys.stdout = sys.stderr = sw
            sys.excepthook = sys.__excepthook__
            res = autorun_commands_timeout(cmds, **kargs)
        except StopAutorun as e:
            e.code_run = sw.s
            raise
    finally:
        sys.stdout, sys.stderr, sys.excepthook = sstdout, sstderr, sexcepthook
        log_scapy.removeHandler(h_new)
        if h_old is not None:
            log_scapy.addHandler(h_old)
    return sw.s, res


def autorun_get_interactive_live_session(cmds, **kargs):
    # type: (str, **Any) -> Tuple[str, Any]
    """Create an interactive session and execute the
    commands passed as "cmds" and return all output

    Unlike autorun_get_interactive_session, the output is also forwarded
    to the original sys.stdout as it is produced, and neither
    sys.excepthook nor the log_scapy handlers are touched.

    :param cmds: newline-separated commands to run
    :param timeout: timeout in seconds
    :returns: (output, returned) contains both sys.stdout and sys.stderr logs
    :raises StopAutorun: re-raised with ``code_run`` set to the output
        captured so far
    """
    sstdout, sstderr = sys.stdout, sys.stderr
    sw = StringWriter(debug=sstdout)
    try:
        try:
            sys.stdout = sys.stderr = sw
            res = autorun_commands_timeout(cmds, **kargs)
        except StopAutorun as e:
            e.code_run = sw.s
            raise
    finally:
        sys.stdout, sys.stderr = sstdout, sstderr
    return sw.s, res


def autorun_get_text_interactive_session(cmds, **kargs):
    # type: (str, **Any) -> Tuple[str, Any]
    """Run ``cmds`` through autorun_get_interactive_session with
    conf.color_theme temporarily set to NoTheme().

    :param cmds: commands to run, forwarded unchanged
    :param kargs: forwarded to autorun_get_interactive_session
    :returns: the (output, returned) tuple of the underlying session
    """
    ct = conf.color_theme
    try:
        conf.color_theme = NoTheme()
        s, res = autorun_get_interactive_session(cmds, **kargs)
    finally:
        conf.color_theme = ct
    return s, res


def autorun_get_live_interactive_session(cmds, **kargs):
    # type: (str, **Any) -> Tuple[str, Any]
    """Run ``cmds`` through autorun_get_interactive_live_session with
    conf.color_theme temporarily set to DefaultTheme().

    :param cmds: commands to run, forwarded unchanged
    :param kargs: forwarded to autorun_get_interactive_live_session
    :returns: the (output, returned) tuple of the underlying session
    """
    ct = conf.color_theme
    try:
        conf.color_theme = DefaultTheme()
        s, res = autorun_get_interactive_live_session(cmds, **kargs)
    finally:
        conf.color_theme = ct
    return s, res


def autorun_get_ansi_interactive_session(cmds, **kargs):
    # type: (str, **Any) -> Tuple[str, Any]
    """Run ``cmds`` through autorun_get_interactive_session with
    conf.color_theme temporarily set to DefaultTheme().

    :param cmds: commands to run, forwarded unchanged
    :param kargs: forwarded to autorun_get_interactive_session
    :returns: the (output, returned) tuple of the underlying session
    """
    ct = conf.color_theme
    try:
        conf.color_theme = DefaultTheme()
        s, res = autorun_get_interactive_session(cmds, **kargs)
    finally:
        conf.color_theme = ct
    return s, res


def autorun_get_html_interactive_session(cmds, **kargs):
    # type: (str, **Any) -> Tuple[str, Any]
    """Run ``cmds`` through autorun_get_interactive_session with
    conf.color_theme temporarily set to HTMLTheme2() and return the
    output converted to HTML.

    :param cmds: commands to run, forwarded unchanged
    :param kargs: forwarded to autorun_get_interactive_session
    :returns: (html_output, returned)
    :raises StopAutorun: re-raised with ``code_run`` converted to HTML
    """
    ct = conf.color_theme

    def to_html(s):
        # type: (str) -> str
        # Escape literal angle brackets first, then turn the "#[#" / "#]#"
        # placeholders (presumably emitted by HTMLTheme2 - confirm against
        # scapy.themes) into real tag delimiters.
        return s.replace("<", "&lt;").replace(">", "&gt;").replace("#[#", "<").replace("#]#", ">")  # noqa: E501
    try:
        try:
            conf.color_theme = HTMLTheme2()
            s, res = autorun_get_interactive_session(cmds, **kargs)
        except StopAutorun as e:
            e.code_run = to_html(e.code_run)
            raise
    finally:
        conf.color_theme = ct

    return to_html(s), res


def autorun_get_latex_interactive_session(cmds, **kargs):
    # type: (str, **Any) -> Tuple[str, Any]
    """Run ``cmds`` through autorun_get_interactive_session with
    conf.color_theme temporarily set to LatexTheme2() and return the
    output converted to LaTeX.

    :param cmds: commands to run, forwarded unchanged
    :param kargs: forwarded to autorun_get_interactive_session
    :returns: (latex_output, returned)
    :raises StopAutorun: re-raised with ``code_run`` converted to LaTeX
    """
    ct = conf.color_theme

    def to_latex(s):
        # type: (str) -> str
        # tex_escape() the raw text first, then turn the "@[@" / "@]@" /
        # "@`@" placeholders (presumably emitted by LatexTheme2 - confirm
        # against scapy.themes) into "{", "}" and "\".
        return tex_escape(s).replace("@[@", "{").replace("@]@", "}").replace("@`@", "\\")  # noqa: E501
    try:
        try:
            conf.color_theme = LatexTheme2()
            s, res = autorun_get_interactive_session(cmds, **kargs)
        except StopAutorun as e:
            e.code_run = to_latex(e.code_run)
            raise
    finally:
        conf.color_theme = ct
    return to_latex(s), res