1import os 2from code import InteractiveConsole 3from functools import partial 4from typing import Iterable 5from unittest.mock import MagicMock 6 7from _pyrepl.console import Console, Event 8from _pyrepl.readline import ReadlineAlikeReader, ReadlineConfig 9from _pyrepl.simple_interact import _strip_final_indent 10 11 12def multiline_input(reader: ReadlineAlikeReader, namespace: dict | None = None): 13 saved = reader.more_lines 14 try: 15 reader.more_lines = partial(more_lines, namespace=namespace) 16 reader.ps1 = reader.ps2 = ">>>" 17 reader.ps3 = reader.ps4 = "..." 18 return reader.readline() 19 finally: 20 reader.more_lines = saved 21 reader.paste_mode = False 22 23 24def more_lines(text: str, namespace: dict | None = None): 25 if namespace is None: 26 namespace = {} 27 src = _strip_final_indent(text) 28 console = InteractiveConsole(namespace, filename="<stdin>") 29 try: 30 code = console.compile(src, "<stdin>", "single") 31 except (OverflowError, SyntaxError, ValueError): 32 return False 33 else: 34 return code is None 35 36 37def code_to_events(code: str): 38 for c in code: 39 yield Event(evt="key", data=c, raw=bytearray(c.encode("utf-8"))) 40 41 42def clean_screen(screen: Iterable[str]): 43 """Cleans color and console characters out of a screen output. 44 45 This is useful for screen testing, it increases the test readability since 46 it strips out all the unreadable side of the screen. 47 """ 48 output = [] 49 for line in screen: 50 if line.startswith(">>>") or line.startswith("..."): 51 line = line[3:] 52 output.append(line) 53 return "\n".join(output).strip() 54 55 56def prepare_reader(console: Console, **kwargs): 57 config = ReadlineConfig(readline_completer=kwargs.pop("readline_completer", None)) 58 reader = ReadlineAlikeReader(console=console, config=config) 59 reader.more_lines = partial(more_lines, namespace=None) 60 reader.paste_mode = True # Avoid extra indents 61 62 def get_prompt(lineno, cursor_on_line) -> str: 63 return "" 64 65 reader.get_prompt = get_prompt # Remove prompt for easier calculations of (x, y) 66 67 for key, val in kwargs.items(): 68 setattr(reader, key, val) 69 70 return reader 71 72 73def prepare_console(events: Iterable[Event], **kwargs) -> MagicMock | Console: 74 console = MagicMock() 75 console.get_event.side_effect = events 76 console.height = 100 77 console.width = 80 78 for key, val in kwargs.items(): 79 setattr(console, key, val) 80 return console 81 82 83def handle_all_events( 84 events, prepare_console=prepare_console, prepare_reader=prepare_reader 85): 86 console = prepare_console(events) 87 reader = prepare_reader(console) 88 try: 89 while True: 90 reader.handle1() 91 except StopIteration: 92 pass 93 except KeyboardInterrupt: 94 pass 95 return reader, console 96 97 98handle_events_narrow_console = partial( 99 handle_all_events, 100 prepare_console=partial(prepare_console, width=10), 101) 102 103 104def make_clean_env() -> dict[str, str]: 105 clean_env = os.environ.copy() 106 for k in clean_env.copy(): 107 if k.startswith("PYTHON"): 108 clean_env.pop(k) 109 clean_env.pop("FORCE_COLOR", None) 110 clean_env.pop("NO_COLOR", None) 111 return clean_env 112 113 114class FakeConsole(Console): 115 def __init__(self, events, encoding="utf-8") -> None: 116 self.events = iter(events) 117 self.encoding = encoding 118 self.screen = [] 119 self.height = 100 120 self.width = 80 121 122 def get_event(self, block: bool = True) -> Event | None: 123 return next(self.events) 124 125 def getpending(self) -> Event: 126 return self.get_event(block=False) 127 128 def getheightwidth(self) -> tuple[int, int]: 129 return self.height, self.width 130 131 def refresh(self, screen: list[str], xy: tuple[int, int]) -> None: 132 pass 133 134 def prepare(self) -> None: 135 pass 136 137 def restore(self) -> None: 138 pass 139 140 def move_cursor(self, x: int, y: int) -> None: 141 pass 142 143 def set_cursor_vis(self, visible: bool) -> None: 144 pass 145 146 def push_char(self, char: int | bytes) -> None: 147 pass 148 149 def beep(self) -> None: 150 pass 151 152 def clear(self) -> None: 153 pass 154 155 def finish(self) -> None: 156 pass 157 158 def flushoutput(self) -> None: 159 pass 160 161 def forgetinput(self) -> None: 162 pass 163 164 def wait(self, timeout: float | None = None) -> bool: 165 return True 166 167 def repaint(self) -> None: 168 pass 169