1# Copyright 2000-2010 Michael Hudson-Doyle <micahel@gmail.com> 2# Alex Gaynor 3# Antonio Cuni 4# Armin Rigo 5# Holger Krekel 6# 7# All Rights Reserved 8# 9# 10# Permission to use, copy, modify, and distribute this software and 11# its documentation for any purpose is hereby granted without fee, 12# provided that the above copyright notice appear in all copies and 13# that both that copyright notice and this permission notice appear in 14# supporting documentation. 15# 16# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO 17# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY 18# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, 19# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER 20# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF 21# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN 22# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 23 24"""A compatibility wrapper reimplementing the 'readline' standard module 25on top of pyrepl. Not all functionalities are supported. Contains 26extensions for multiline input. 27""" 28 29from __future__ import annotations 30 31import warnings 32from dataclasses import dataclass, field 33 34import os 35from site import gethistoryfile # type: ignore[attr-defined] 36import sys 37from rlcompleter import Completer as RLCompleter 38 39from . import commands, historical_reader 40from .completing_reader import CompletingReader 41from .console import Console as ConsoleType 42 43Console: type[ConsoleType] 44_error: tuple[type[Exception], ...] | type[Exception] 45try: 46 from .unix_console import UnixConsole as Console, _error 47except ImportError: 48 from .windows_console import WindowsConsole as Console, _error 49 50ENCODING = sys.getdefaultencoding() or "latin1" 51 52 53# types 54Command = commands.Command 55from collections.abc import Callable, Collection 56from .types import Callback, Completer, KeySpec, CommandName 57 58TYPE_CHECKING = False 59 60if TYPE_CHECKING: 61 from typing import Any, Mapping 62 63 64MoreLinesCallable = Callable[[str], bool] 65 66 67__all__ = [ 68 "add_history", 69 "clear_history", 70 "get_begidx", 71 "get_completer", 72 "get_completer_delims", 73 "get_current_history_length", 74 "get_endidx", 75 "get_history_item", 76 "get_history_length", 77 "get_line_buffer", 78 "insert_text", 79 "parse_and_bind", 80 "read_history_file", 81 # "read_init_file", 82 # "redisplay", 83 "remove_history_item", 84 "replace_history_item", 85 "set_auto_history", 86 "set_completer", 87 "set_completer_delims", 88 "set_history_length", 89 # "set_pre_input_hook", 90 "set_startup_hook", 91 "write_history_file", 92 # ---- multiline extensions ---- 93 "multiline_input", 94] 95 96# ____________________________________________________________ 97 98@dataclass 99class ReadlineConfig: 100 readline_completer: Completer | None = None 101 completer_delims: frozenset[str] = frozenset(" \t\n`~!@#$%^&*()-=+[{]}\\|;:'\",<>/?") 102 103 104@dataclass(kw_only=True) 105class ReadlineAlikeReader(historical_reader.HistoricalReader, CompletingReader): 106 # Class fields 107 assume_immutable_completions = False 108 use_brackets = False 109 sort_in_column = True 110 111 # Instance fields 112 config: ReadlineConfig 113 more_lines: MoreLinesCallable | None = None 114 last_used_indentation: str | None = None 115 116 def __post_init__(self) -> None: 117 super().__post_init__() 118 self.commands["maybe_accept"] = maybe_accept 119 self.commands["maybe-accept"] = maybe_accept 120 self.commands["backspace_dedent"] = backspace_dedent 121 self.commands["backspace-dedent"] = backspace_dedent 122 123 def error(self, msg: str = "none") -> None: 124 pass # don't show error messages by default 125 126 def get_stem(self) -> str: 127 b = self.buffer 128 p = self.pos - 1 129 completer_delims = self.config.completer_delims 130 while p >= 0 and b[p] not in completer_delims: 131 p -= 1 132 return "".join(b[p + 1 : self.pos]) 133 134 def get_completions(self, stem: str) -> list[str]: 135 if len(stem) == 0 and self.more_lines is not None: 136 b = self.buffer 137 p = self.pos 138 while p > 0 and b[p - 1] != "\n": 139 p -= 1 140 num_spaces = 4 - ((self.pos - p) % 4) 141 return [" " * num_spaces] 142 result = [] 143 function = self.config.readline_completer 144 if function is not None: 145 try: 146 stem = str(stem) # rlcompleter.py seems to not like unicode 147 except UnicodeEncodeError: 148 pass # but feed unicode anyway if we have no choice 149 state = 0 150 while True: 151 try: 152 next = function(stem, state) 153 except Exception: 154 break 155 if not isinstance(next, str): 156 break 157 result.append(next) 158 state += 1 159 # emulate the behavior of the standard readline that sorts 160 # the completions before displaying them. 161 result.sort() 162 return result 163 164 def get_trimmed_history(self, maxlength: int) -> list[str]: 165 if maxlength >= 0: 166 cut = len(self.history) - maxlength 167 if cut < 0: 168 cut = 0 169 else: 170 cut = 0 171 return self.history[cut:] 172 173 def update_last_used_indentation(self) -> None: 174 indentation = _get_first_indentation(self.buffer) 175 if indentation is not None: 176 self.last_used_indentation = indentation 177 178 # --- simplified support for reading multiline Python statements --- 179 180 def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]: 181 return super().collect_keymap() + ( 182 (r"\n", "maybe-accept"), 183 (r"\<backspace>", "backspace-dedent"), 184 ) 185 186 def after_command(self, cmd: Command) -> None: 187 super().after_command(cmd) 188 if self.more_lines is None: 189 # Force single-line input if we are in raw_input() mode. 190 # Although there is no direct way to add a \n in this mode, 191 # multiline buffers can still show up using various 192 # commands, e.g. navigating the history. 193 try: 194 index = self.buffer.index("\n") 195 except ValueError: 196 pass 197 else: 198 self.buffer = self.buffer[:index] 199 if self.pos > len(self.buffer): 200 self.pos = len(self.buffer) 201 202 203def set_auto_history(_should_auto_add_history: bool) -> None: 204 """Enable or disable automatic history""" 205 historical_reader.should_auto_add_history = bool(_should_auto_add_history) 206 207 208def _get_this_line_indent(buffer: list[str], pos: int) -> int: 209 indent = 0 210 while pos > 0 and buffer[pos - 1] in " \t": 211 indent += 1 212 pos -= 1 213 if pos > 0 and buffer[pos - 1] == "\n": 214 return indent 215 return 0 216 217 218def _get_previous_line_indent(buffer: list[str], pos: int) -> tuple[int, int | None]: 219 prevlinestart = pos 220 while prevlinestart > 0 and buffer[prevlinestart - 1] != "\n": 221 prevlinestart -= 1 222 prevlinetext = prevlinestart 223 while prevlinetext < pos and buffer[prevlinetext] in " \t": 224 prevlinetext += 1 225 if prevlinetext == pos: 226 indent = None 227 else: 228 indent = prevlinetext - prevlinestart 229 return prevlinestart, indent 230 231 232def _get_first_indentation(buffer: list[str]) -> str | None: 233 indented_line_start = None 234 for i in range(len(buffer)): 235 if (i < len(buffer) - 1 236 and buffer[i] == "\n" 237 and buffer[i + 1] in " \t" 238 ): 239 indented_line_start = i + 1 240 elif indented_line_start is not None and buffer[i] not in " \t\n": 241 return ''.join(buffer[indented_line_start : i]) 242 return None 243 244 245def _should_auto_indent(buffer: list[str], pos: int) -> bool: 246 # check if last character before "pos" is a colon, ignoring 247 # whitespaces and comments. 248 last_char = None 249 while pos > 0: 250 pos -= 1 251 if last_char is None: 252 if buffer[pos] not in " \t\n#": # ignore whitespaces and comments 253 last_char = buffer[pos] 254 else: 255 # even if we found a non-whitespace character before 256 # original pos, we keep going back until newline is reached 257 # to make sure we ignore comments 258 if buffer[pos] == "\n": 259 break 260 if buffer[pos] == "#": 261 last_char = None 262 return last_char == ":" 263 264 265class maybe_accept(commands.Command): 266 def do(self) -> None: 267 r: ReadlineAlikeReader 268 r = self.reader # type: ignore[assignment] 269 r.dirty = True # this is needed to hide the completion menu, if visible 270 271 if self.reader.in_bracketed_paste: 272 r.insert("\n") 273 return 274 275 # if there are already several lines and the cursor 276 # is not on the last one, always insert a new \n. 277 text = r.get_unicode() 278 279 if "\n" in r.buffer[r.pos :] or ( 280 r.more_lines is not None and r.more_lines(text) 281 ): 282 def _newline_before_pos(): 283 before_idx = r.pos - 1 284 while before_idx > 0 and text[before_idx].isspace(): 285 before_idx -= 1 286 return text[before_idx : r.pos].count("\n") > 0 287 288 # if there's already a new line before the cursor then 289 # even if the cursor is followed by whitespace, we assume 290 # the user is trying to terminate the block 291 if _newline_before_pos() and text[r.pos:].isspace(): 292 self.finish = True 293 return 294 295 # auto-indent the next line like the previous line 296 prevlinestart, indent = _get_previous_line_indent(r.buffer, r.pos) 297 r.insert("\n") 298 if not self.reader.paste_mode: 299 if indent: 300 for i in range(prevlinestart, prevlinestart + indent): 301 r.insert(r.buffer[i]) 302 r.update_last_used_indentation() 303 if _should_auto_indent(r.buffer, r.pos): 304 if r.last_used_indentation is not None: 305 indentation = r.last_used_indentation 306 else: 307 # default 308 indentation = " " * 4 309 r.insert(indentation) 310 elif not self.reader.paste_mode: 311 self.finish = True 312 else: 313 r.insert("\n") 314 315 316class backspace_dedent(commands.Command): 317 def do(self) -> None: 318 r = self.reader 319 b = r.buffer 320 if r.pos > 0: 321 repeat = 1 322 if b[r.pos - 1] != "\n": 323 indent = _get_this_line_indent(b, r.pos) 324 if indent > 0: 325 ls = r.pos - indent 326 while ls > 0: 327 ls, pi = _get_previous_line_indent(b, ls - 1) 328 if pi is not None and pi < indent: 329 repeat = indent - pi 330 break 331 r.pos -= repeat 332 del b[r.pos : r.pos + repeat] 333 r.dirty = True 334 else: 335 self.reader.error("can't backspace at start") 336 337 338# ____________________________________________________________ 339 340 341@dataclass(slots=True) 342class _ReadlineWrapper: 343 f_in: int = -1 344 f_out: int = -1 345 reader: ReadlineAlikeReader | None = field(default=None, repr=False) 346 saved_history_length: int = -1 347 startup_hook: Callback | None = None 348 config: ReadlineConfig = field(default_factory=ReadlineConfig, repr=False) 349 350 def __post_init__(self) -> None: 351 if self.f_in == -1: 352 self.f_in = os.dup(0) 353 if self.f_out == -1: 354 self.f_out = os.dup(1) 355 356 def get_reader(self) -> ReadlineAlikeReader: 357 if self.reader is None: 358 console = Console(self.f_in, self.f_out, encoding=ENCODING) 359 self.reader = ReadlineAlikeReader(console=console, config=self.config) 360 return self.reader 361 362 def input(self, prompt: object = "") -> str: 363 try: 364 reader = self.get_reader() 365 except _error: 366 assert raw_input is not None 367 return raw_input(prompt) 368 prompt_str = str(prompt) 369 reader.ps1 = prompt_str 370 sys.audit("builtins.input", prompt_str) 371 result = reader.readline(startup_hook=self.startup_hook) 372 sys.audit("builtins.input/result", result) 373 return result 374 375 def multiline_input(self, more_lines: MoreLinesCallable, ps1: str, ps2: str) -> str: 376 """Read an input on possibly multiple lines, asking for more 377 lines as long as 'more_lines(unicodetext)' returns an object whose 378 boolean value is true. 379 """ 380 reader = self.get_reader() 381 saved = reader.more_lines 382 try: 383 reader.more_lines = more_lines 384 reader.ps1 = ps1 385 reader.ps2 = ps1 386 reader.ps3 = ps2 387 reader.ps4 = "" 388 with warnings.catch_warnings(action="ignore"): 389 return reader.readline() 390 finally: 391 reader.more_lines = saved 392 reader.paste_mode = False 393 394 def parse_and_bind(self, string: str) -> None: 395 pass # XXX we don't support parsing GNU-readline-style init files 396 397 def set_completer(self, function: Completer | None = None) -> None: 398 self.config.readline_completer = function 399 400 def get_completer(self) -> Completer | None: 401 return self.config.readline_completer 402 403 def set_completer_delims(self, delimiters: Collection[str]) -> None: 404 self.config.completer_delims = frozenset(delimiters) 405 406 def get_completer_delims(self) -> str: 407 return "".join(sorted(self.config.completer_delims)) 408 409 def _histline(self, line: str) -> str: 410 line = line.rstrip("\n") 411 return line 412 413 def get_history_length(self) -> int: 414 return self.saved_history_length 415 416 def set_history_length(self, length: int) -> None: 417 self.saved_history_length = length 418 419 def get_current_history_length(self) -> int: 420 return len(self.get_reader().history) 421 422 def read_history_file(self, filename: str = gethistoryfile()) -> None: 423 # multiline extension (really a hack) for the end of lines that 424 # are actually continuations inside a single multiline_input() 425 # history item: we use \r\n instead of just \n. If the history 426 # file is passed to GNU readline, the extra \r are just ignored. 427 history = self.get_reader().history 428 429 with open(os.path.expanduser(filename), 'rb') as f: 430 is_editline = f.readline().startswith(b"_HiStOrY_V2_") 431 if is_editline: 432 encoding = "unicode-escape" 433 else: 434 f.seek(0) 435 encoding = "utf-8" 436 437 lines = [line.decode(encoding, errors='replace') for line in f.read().split(b'\n')] 438 buffer = [] 439 for line in lines: 440 if line.endswith("\r"): 441 buffer.append(line+'\n') 442 else: 443 line = self._histline(line) 444 if buffer: 445 line = self._histline("".join(buffer).replace("\r", "") + line) 446 del buffer[:] 447 if line: 448 history.append(line) 449 450 def write_history_file(self, filename: str = gethistoryfile()) -> None: 451 maxlength = self.saved_history_length 452 history = self.get_reader().get_trimmed_history(maxlength) 453 f = open(os.path.expanduser(filename), "w", 454 encoding="utf-8", newline="\n") 455 with f: 456 for entry in history: 457 entry = entry.replace("\n", "\r\n") # multiline history support 458 f.write(entry + "\n") 459 460 def clear_history(self) -> None: 461 del self.get_reader().history[:] 462 463 def get_history_item(self, index: int) -> str | None: 464 history = self.get_reader().history 465 if 1 <= index <= len(history): 466 return history[index - 1] 467 else: 468 return None # like readline.c 469 470 def remove_history_item(self, index: int) -> None: 471 history = self.get_reader().history 472 if 0 <= index < len(history): 473 del history[index] 474 else: 475 raise ValueError("No history item at position %d" % index) 476 # like readline.c 477 478 def replace_history_item(self, index: int, line: str) -> None: 479 history = self.get_reader().history 480 if 0 <= index < len(history): 481 history[index] = self._histline(line) 482 else: 483 raise ValueError("No history item at position %d" % index) 484 # like readline.c 485 486 def add_history(self, line: str) -> None: 487 self.get_reader().history.append(self._histline(line)) 488 489 def set_startup_hook(self, function: Callback | None = None) -> None: 490 self.startup_hook = function 491 492 def get_line_buffer(self) -> str: 493 return self.get_reader().get_unicode() 494 495 def _get_idxs(self) -> tuple[int, int]: 496 start = cursor = self.get_reader().pos 497 buf = self.get_line_buffer() 498 for i in range(cursor - 1, -1, -1): 499 if buf[i] in self.get_completer_delims(): 500 break 501 start = i 502 return start, cursor 503 504 def get_begidx(self) -> int: 505 return self._get_idxs()[0] 506 507 def get_endidx(self) -> int: 508 return self._get_idxs()[1] 509 510 def insert_text(self, text: str) -> None: 511 self.get_reader().insert(text) 512 513 514_wrapper = _ReadlineWrapper() 515 516# ____________________________________________________________ 517# Public API 518 519parse_and_bind = _wrapper.parse_and_bind 520set_completer = _wrapper.set_completer 521get_completer = _wrapper.get_completer 522set_completer_delims = _wrapper.set_completer_delims 523get_completer_delims = _wrapper.get_completer_delims 524get_history_length = _wrapper.get_history_length 525set_history_length = _wrapper.set_history_length 526get_current_history_length = _wrapper.get_current_history_length 527read_history_file = _wrapper.read_history_file 528write_history_file = _wrapper.write_history_file 529clear_history = _wrapper.clear_history 530get_history_item = _wrapper.get_history_item 531remove_history_item = _wrapper.remove_history_item 532replace_history_item = _wrapper.replace_history_item 533add_history = _wrapper.add_history 534set_startup_hook = _wrapper.set_startup_hook 535get_line_buffer = _wrapper.get_line_buffer 536get_begidx = _wrapper.get_begidx 537get_endidx = _wrapper.get_endidx 538insert_text = _wrapper.insert_text 539 540# Extension 541multiline_input = _wrapper.multiline_input 542 543# Internal hook 544_get_reader = _wrapper.get_reader 545 546# ____________________________________________________________ 547# Stubs 548 549 550def _make_stub(_name: str, _ret: object) -> None: 551 def stub(*args: object, **kwds: object) -> None: 552 import warnings 553 554 warnings.warn("readline.%s() not implemented" % _name, stacklevel=2) 555 556 stub.__name__ = _name 557 globals()[_name] = stub 558 559 560for _name, _ret in [ 561 ("read_init_file", None), 562 ("redisplay", None), 563 ("set_pre_input_hook", None), 564]: 565 assert _name not in globals(), _name 566 _make_stub(_name, _ret) 567 568# ____________________________________________________________ 569 570 571def _setup(namespace: Mapping[str, Any]) -> None: 572 global raw_input 573 if raw_input is not None: 574 return # don't run _setup twice 575 576 try: 577 f_in = sys.stdin.fileno() 578 f_out = sys.stdout.fileno() 579 except (AttributeError, ValueError): 580 return 581 if not os.isatty(f_in) or not os.isatty(f_out): 582 return 583 584 _wrapper.f_in = f_in 585 _wrapper.f_out = f_out 586 587 # set up namespace in rlcompleter, which requires it to be a bona fide dict 588 if not isinstance(namespace, dict): 589 namespace = dict(namespace) 590 _wrapper.config.readline_completer = RLCompleter(namespace).complete 591 592 # this is not really what readline.c does. Better than nothing I guess 593 import builtins 594 raw_input = builtins.input 595 builtins.input = _wrapper.input 596 597 598raw_input: Callable[[object], str] | None = None 599