1# Copyright 2000-2010 Michael Hudson-Doyle <micahel@gmail.com> 2# Antonio Cuni 3# Armin Rigo 4# 5# All Rights Reserved 6# 7# 8# Permission to use, copy, modify, and distribute this software and 9# its documentation for any purpose is hereby granted without fee, 10# provided that the above copyright notice appear in all copies and 11# that both that copyright notice and this permission notice appear in 12# supporting documentation. 13# 14# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO 15# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY 16# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, 17# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER 18# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF 19# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN 20# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 21 22from __future__ import annotations 23 24import sys 25 26from contextlib import contextmanager 27from dataclasses import dataclass, field, fields 28import unicodedata 29from _colorize import can_colorize, ANSIColors # type: ignore[import-not-found] 30 31 32from . import commands, console, input 33from .utils import ANSI_ESCAPE_SEQUENCE, wlen, str_width 34from .trace import trace 35 36 37# types 38Command = commands.Command 39from .types import Callback, SimpleContextManager, KeySpec, CommandName 40 41 42def disp_str(buffer: str) -> tuple[str, list[int]]: 43 """disp_str(buffer:string) -> (string, [int]) 44 45 Return the string that should be the printed representation of 46 |buffer| and a list detailing where the characters of |buffer| 47 get used up. E.g.: 48 49 >>> disp_str(chr(3)) 50 ('^C', [1, 0]) 51 52 """ 53 b: list[int] = [] 54 s: list[str] = [] 55 for c in buffer: 56 if c == '\x1a': 57 s.append(c) 58 b.append(2) 59 elif ord(c) < 128: 60 s.append(c) 61 b.append(1) 62 elif unicodedata.category(c).startswith("C"): 63 c = r"\u%04x" % ord(c) 64 s.append(c) 65 b.extend([0] * (len(c) - 1)) 66 else: 67 s.append(c) 68 b.append(str_width(c)) 69 return "".join(s), b 70 71 72# syntax classes: 73 74SYNTAX_WHITESPACE, SYNTAX_WORD, SYNTAX_SYMBOL = range(3) 75 76 77def make_default_syntax_table() -> dict[str, int]: 78 # XXX perhaps should use some unicodedata here? 79 st: dict[str, int] = {} 80 for c in map(chr, range(256)): 81 st[c] = SYNTAX_SYMBOL 82 for c in [a for a in map(chr, range(256)) if a.isalnum()]: 83 st[c] = SYNTAX_WORD 84 st["\n"] = st[" "] = SYNTAX_WHITESPACE 85 return st 86 87 88def make_default_commands() -> dict[CommandName, type[Command]]: 89 result: dict[CommandName, type[Command]] = {} 90 for v in vars(commands).values(): 91 if isinstance(v, type) and issubclass(v, Command) and v.__name__[0].islower(): 92 result[v.__name__] = v 93 result[v.__name__.replace("_", "-")] = v 94 return result 95 96 97default_keymap: tuple[tuple[KeySpec, CommandName], ...] = tuple( 98 [ 99 (r"\C-a", "beginning-of-line"), 100 (r"\C-b", "left"), 101 (r"\C-c", "interrupt"), 102 (r"\C-d", "delete"), 103 (r"\C-e", "end-of-line"), 104 (r"\C-f", "right"), 105 (r"\C-g", "cancel"), 106 (r"\C-h", "backspace"), 107 (r"\C-j", "accept"), 108 (r"\<return>", "accept"), 109 (r"\C-k", "kill-line"), 110 (r"\C-l", "clear-screen"), 111 (r"\C-m", "accept"), 112 (r"\C-t", "transpose-characters"), 113 (r"\C-u", "unix-line-discard"), 114 (r"\C-w", "unix-word-rubout"), 115 (r"\C-x\C-u", "upcase-region"), 116 (r"\C-y", "yank"), 117 *(() if sys.platform == "win32" else ((r"\C-z", "suspend"), )), 118 (r"\M-b", "backward-word"), 119 (r"\M-c", "capitalize-word"), 120 (r"\M-d", "kill-word"), 121 (r"\M-f", "forward-word"), 122 (r"\M-l", "downcase-word"), 123 (r"\M-t", "transpose-words"), 124 (r"\M-u", "upcase-word"), 125 (r"\M-y", "yank-pop"), 126 (r"\M--", "digit-arg"), 127 (r"\M-0", "digit-arg"), 128 (r"\M-1", "digit-arg"), 129 (r"\M-2", "digit-arg"), 130 (r"\M-3", "digit-arg"), 131 (r"\M-4", "digit-arg"), 132 (r"\M-5", "digit-arg"), 133 (r"\M-6", "digit-arg"), 134 (r"\M-7", "digit-arg"), 135 (r"\M-8", "digit-arg"), 136 (r"\M-9", "digit-arg"), 137 (r"\M-\n", "accept"), 138 ("\\\\", "self-insert"), 139 (r"\x1b[200~", "enable_bracketed_paste"), 140 (r"\x1b[201~", "disable_bracketed_paste"), 141 (r"\x03", "ctrl-c"), 142 ] 143 + [(c, "self-insert") for c in map(chr, range(32, 127)) if c != "\\"] 144 + [(c, "self-insert") for c in map(chr, range(128, 256)) if c.isalpha()] 145 + [ 146 (r"\<up>", "up"), 147 (r"\<down>", "down"), 148 (r"\<left>", "left"), 149 (r"\C-\<left>", "backward-word"), 150 (r"\<right>", "right"), 151 (r"\C-\<right>", "forward-word"), 152 (r"\<delete>", "delete"), 153 (r"\x1b[3~", "delete"), 154 (r"\<backspace>", "backspace"), 155 (r"\M-\<backspace>", "backward-kill-word"), 156 (r"\<end>", "end-of-line"), # was 'end' 157 (r"\<home>", "beginning-of-line"), # was 'home' 158 (r"\<f1>", "help"), 159 (r"\<f2>", "show-history"), 160 (r"\<f3>", "paste-mode"), 161 (r"\EOF", "end"), # the entries in the terminfo database for xterms 162 (r"\EOH", "home"), # seem to be wrong. this is a less than ideal 163 # workaround 164 ] 165) 166 167 168@dataclass(slots=True) 169class Reader: 170 """The Reader class implements the bare bones of a command reader, 171 handling such details as editing and cursor motion. What it does 172 not support are such things as completion or history support - 173 these are implemented elsewhere. 174 175 Instance variables of note include: 176 177 * buffer: 178 A *list* (*not* a string at the moment :-) containing all the 179 characters that have been entered. 180 * console: 181 Hopefully encapsulates the OS dependent stuff. 182 * pos: 183 A 0-based index into `buffer' for where the insertion point 184 is. 185 * screeninfo: 186 Ahem. This list contains some info needed to move the 187 insertion point around reasonably efficiently. 188 * cxy, lxy: 189 the position of the insertion point in screen ... 190 * syntax_table: 191 Dictionary mapping characters to `syntax class'; read the 192 emacs docs to see what this means :-) 193 * commands: 194 Dictionary mapping command names to command classes. 195 * arg: 196 The emacs-style prefix argument. It will be None if no such 197 argument has been provided. 198 * dirty: 199 True if we need to refresh the display. 200 * kill_ring: 201 The emacs-style kill-ring; manipulated with yank & yank-pop 202 * ps1, ps2, ps3, ps4: 203 prompts. ps1 is the prompt for a one-line input; for a 204 multiline input it looks like: 205 ps2> first line of input goes here 206 ps3> second and further 207 ps3> lines get ps3 208 ... 209 ps4> and the last one gets ps4 210 As with the usual top-level, you can set these to instances if 211 you like; str() will be called on them (once) at the beginning 212 of each command. Don't put really long or newline containing 213 strings here, please! 214 This is just the default policy; you can change it freely by 215 overriding get_prompt() (and indeed some standard subclasses 216 do). 217 * finished: 218 handle1 will set this to a true value if a command signals 219 that we're done. 220 """ 221 222 console: console.Console 223 224 ## state 225 buffer: list[str] = field(default_factory=list) 226 pos: int = 0 227 ps1: str = "->> " 228 ps2: str = "/>> " 229 ps3: str = "|.. " 230 ps4: str = R"\__ " 231 kill_ring: list[list[str]] = field(default_factory=list) 232 msg: str = "" 233 arg: int | None = None 234 dirty: bool = False 235 finished: bool = False 236 paste_mode: bool = False 237 in_bracketed_paste: bool = False 238 commands: dict[str, type[Command]] = field(default_factory=make_default_commands) 239 last_command: type[Command] | None = None 240 syntax_table: dict[str, int] = field(default_factory=make_default_syntax_table) 241 keymap: tuple[tuple[str, str], ...] = () 242 input_trans: input.KeymapTranslator = field(init=False) 243 input_trans_stack: list[input.KeymapTranslator] = field(default_factory=list) 244 screen: list[str] = field(default_factory=list) 245 screeninfo: list[tuple[int, list[int]]] = field(init=False) 246 cxy: tuple[int, int] = field(init=False) 247 lxy: tuple[int, int] = field(init=False) 248 scheduled_commands: list[str] = field(default_factory=list) 249 can_colorize: bool = False 250 threading_hook: Callback | None = None 251 252 ## cached metadata to speed up screen refreshes 253 @dataclass 254 class RefreshCache: 255 in_bracketed_paste: bool = False 256 screen: list[str] = field(default_factory=list) 257 screeninfo: list[tuple[int, list[int]]] = field(init=False) 258 line_end_offsets: list[int] = field(default_factory=list) 259 pos: int = field(init=False) 260 cxy: tuple[int, int] = field(init=False) 261 dimensions: tuple[int, int] = field(init=False) 262 invalidated: bool = False 263 264 def update_cache(self, 265 reader: Reader, 266 screen: list[str], 267 screeninfo: list[tuple[int, list[int]]], 268 ) -> None: 269 self.in_bracketed_paste = reader.in_bracketed_paste 270 self.screen = screen.copy() 271 self.screeninfo = screeninfo.copy() 272 self.pos = reader.pos 273 self.cxy = reader.cxy 274 self.dimensions = reader.console.width, reader.console.height 275 self.invalidated = False 276 277 def valid(self, reader: Reader) -> bool: 278 if self.invalidated: 279 return False 280 dimensions = reader.console.width, reader.console.height 281 dimensions_changed = dimensions != self.dimensions 282 paste_changed = reader.in_bracketed_paste != self.in_bracketed_paste 283 return not (dimensions_changed or paste_changed) 284 285 def get_cached_location(self, reader: Reader) -> tuple[int, int]: 286 if self.invalidated: 287 raise ValueError("Cache is invalidated") 288 offset = 0 289 earliest_common_pos = min(reader.pos, self.pos) 290 num_common_lines = len(self.line_end_offsets) 291 while num_common_lines > 0: 292 offset = self.line_end_offsets[num_common_lines - 1] 293 if earliest_common_pos > offset: 294 break 295 num_common_lines -= 1 296 else: 297 offset = 0 298 return offset, num_common_lines 299 300 last_refresh_cache: RefreshCache = field(default_factory=RefreshCache) 301 302 def __post_init__(self) -> None: 303 # Enable the use of `insert` without a `prepare` call - necessary to 304 # facilitate the tab completion hack implemented for 305 # <https://bugs.python.org/issue25660>. 306 self.keymap = self.collect_keymap() 307 self.input_trans = input.KeymapTranslator( 308 self.keymap, invalid_cls="invalid-key", character_cls="self-insert" 309 ) 310 self.screeninfo = [(0, [])] 311 self.cxy = self.pos2xy() 312 self.lxy = (self.pos, 0) 313 self.can_colorize = can_colorize() 314 315 self.last_refresh_cache.screeninfo = self.screeninfo 316 self.last_refresh_cache.pos = self.pos 317 self.last_refresh_cache.cxy = self.cxy 318 self.last_refresh_cache.dimensions = (0, 0) 319 320 def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]: 321 return default_keymap 322 323 def calc_screen(self) -> list[str]: 324 """Translate changes in self.buffer into changes in self.console.screen.""" 325 # Since the last call to calc_screen: 326 # screen and screeninfo may differ due to a completion menu being shown 327 # pos and cxy may differ due to edits, cursor movements, or completion menus 328 329 # Lines that are above both the old and new cursor position can't have changed, 330 # unless the terminal has been resized (which might cause reflowing) or we've 331 # entered or left paste mode (which changes prompts, causing reflowing). 332 num_common_lines = 0 333 offset = 0 334 if self.last_refresh_cache.valid(self): 335 offset, num_common_lines = self.last_refresh_cache.get_cached_location(self) 336 337 screen = self.last_refresh_cache.screen 338 del screen[num_common_lines:] 339 340 screeninfo = self.last_refresh_cache.screeninfo 341 del screeninfo[num_common_lines:] 342 343 last_refresh_line_end_offsets = self.last_refresh_cache.line_end_offsets 344 del last_refresh_line_end_offsets[num_common_lines:] 345 346 pos = self.pos 347 pos -= offset 348 349 prompt_from_cache = (offset and self.buffer[offset - 1] != "\n") 350 351 lines = "".join(self.buffer[offset:]).split("\n") 352 353 cursor_found = False 354 lines_beyond_cursor = 0 355 for ln, line in enumerate(lines, num_common_lines): 356 ll = len(line) 357 if 0 <= pos <= ll: 358 self.lxy = pos, ln 359 cursor_found = True 360 elif cursor_found: 361 lines_beyond_cursor += 1 362 if lines_beyond_cursor > self.console.height: 363 # No need to keep formatting lines. 364 # The console can't show them. 365 break 366 if prompt_from_cache: 367 # Only the first line's prompt can come from the cache 368 prompt_from_cache = False 369 prompt = "" 370 else: 371 prompt = self.get_prompt(ln, ll >= pos >= 0) 372 while "\n" in prompt: 373 pre_prompt, _, prompt = prompt.partition("\n") 374 last_refresh_line_end_offsets.append(offset) 375 screen.append(pre_prompt) 376 screeninfo.append((0, [])) 377 pos -= ll + 1 378 prompt, lp = self.process_prompt(prompt) 379 l, l2 = disp_str(line) 380 wrapcount = (wlen(l) + lp) // self.console.width 381 if wrapcount == 0: 382 offset += ll + 1 # Takes all of the line plus the newline 383 last_refresh_line_end_offsets.append(offset) 384 screen.append(prompt + l) 385 screeninfo.append((lp, l2)) 386 else: 387 i = 0 388 while l: 389 prelen = lp if i == 0 else 0 390 index_to_wrap_before = 0 391 column = 0 392 for character_width in l2: 393 if column + character_width >= self.console.width - prelen: 394 break 395 index_to_wrap_before += 1 396 column += character_width 397 pre = prompt if i == 0 else "" 398 if len(l) > index_to_wrap_before: 399 offset += index_to_wrap_before 400 post = "\\" 401 after = [1] 402 else: 403 offset += index_to_wrap_before + 1 # Takes the newline 404 post = "" 405 after = [] 406 last_refresh_line_end_offsets.append(offset) 407 screen.append(pre + l[:index_to_wrap_before] + post) 408 screeninfo.append((prelen, l2[:index_to_wrap_before] + after)) 409 l = l[index_to_wrap_before:] 410 l2 = l2[index_to_wrap_before:] 411 i += 1 412 self.screeninfo = screeninfo 413 self.cxy = self.pos2xy() 414 if self.msg: 415 for mline in self.msg.split("\n"): 416 screen.append(mline) 417 screeninfo.append((0, [])) 418 419 self.last_refresh_cache.update_cache(self, screen, screeninfo) 420 return screen 421 422 @staticmethod 423 def process_prompt(prompt: str) -> tuple[str, int]: 424 """Process the prompt. 425 426 This means calculate the length of the prompt. The character \x01 427 and \x02 are used to bracket ANSI control sequences and need to be 428 excluded from the length calculation. So also a copy of the prompt 429 is returned with these control characters removed.""" 430 431 # The logic below also ignores the length of common escape 432 # sequences if they were not explicitly within \x01...\x02. 433 # They are CSI (or ANSI) sequences ( ESC [ ... LETTER ) 434 435 # wlen from utils already excludes ANSI_ESCAPE_SEQUENCE chars, 436 # which breaks the logic below so we redefine it here. 437 def wlen(s: str) -> int: 438 return sum(str_width(i) for i in s) 439 440 out_prompt = "" 441 l = wlen(prompt) 442 pos = 0 443 while True: 444 s = prompt.find("\x01", pos) 445 if s == -1: 446 break 447 e = prompt.find("\x02", s) 448 if e == -1: 449 break 450 # Found start and end brackets, subtract from string length 451 l = l - (e - s + 1) 452 keep = prompt[pos:s] 453 l -= sum(map(wlen, ANSI_ESCAPE_SEQUENCE.findall(keep))) 454 out_prompt += keep + prompt[s + 1 : e] 455 pos = e + 1 456 keep = prompt[pos:] 457 l -= sum(map(wlen, ANSI_ESCAPE_SEQUENCE.findall(keep))) 458 out_prompt += keep 459 return out_prompt, l 460 461 def bow(self, p: int | None = None) -> int: 462 """Return the 0-based index of the word break preceding p most 463 immediately. 464 465 p defaults to self.pos; word boundaries are determined using 466 self.syntax_table.""" 467 if p is None: 468 p = self.pos 469 st = self.syntax_table 470 b = self.buffer 471 p -= 1 472 while p >= 0 and st.get(b[p], SYNTAX_WORD) != SYNTAX_WORD: 473 p -= 1 474 while p >= 0 and st.get(b[p], SYNTAX_WORD) == SYNTAX_WORD: 475 p -= 1 476 return p + 1 477 478 def eow(self, p: int | None = None) -> int: 479 """Return the 0-based index of the word break following p most 480 immediately. 481 482 p defaults to self.pos; word boundaries are determined using 483 self.syntax_table.""" 484 if p is None: 485 p = self.pos 486 st = self.syntax_table 487 b = self.buffer 488 while p < len(b) and st.get(b[p], SYNTAX_WORD) != SYNTAX_WORD: 489 p += 1 490 while p < len(b) and st.get(b[p], SYNTAX_WORD) == SYNTAX_WORD: 491 p += 1 492 return p 493 494 def bol(self, p: int | None = None) -> int: 495 """Return the 0-based index of the line break preceding p most 496 immediately. 497 498 p defaults to self.pos.""" 499 if p is None: 500 p = self.pos 501 b = self.buffer 502 p -= 1 503 while p >= 0 and b[p] != "\n": 504 p -= 1 505 return p + 1 506 507 def eol(self, p: int | None = None) -> int: 508 """Return the 0-based index of the line break following p most 509 immediately. 510 511 p defaults to self.pos.""" 512 if p is None: 513 p = self.pos 514 b = self.buffer 515 while p < len(b) and b[p] != "\n": 516 p += 1 517 return p 518 519 def max_column(self, y: int) -> int: 520 """Return the last x-offset for line y""" 521 return self.screeninfo[y][0] + sum(self.screeninfo[y][1]) 522 523 def max_row(self) -> int: 524 return len(self.screeninfo) - 1 525 526 def get_arg(self, default: int = 1) -> int: 527 """Return any prefix argument that the user has supplied, 528 returning `default' if there is None. Defaults to 1. 529 """ 530 if self.arg is None: 531 return default 532 return self.arg 533 534 def get_prompt(self, lineno: int, cursor_on_line: bool) -> str: 535 """Return what should be in the left-hand margin for line 536 `lineno'.""" 537 if self.arg is not None and cursor_on_line: 538 prompt = f"(arg: {self.arg}) " 539 elif self.paste_mode and not self.in_bracketed_paste: 540 prompt = "(paste) " 541 elif "\n" in self.buffer: 542 if lineno == 0: 543 prompt = self.ps2 544 elif self.ps4 and lineno == self.buffer.count("\n"): 545 prompt = self.ps4 546 else: 547 prompt = self.ps3 548 else: 549 prompt = self.ps1 550 551 if self.can_colorize: 552 prompt = f"{ANSIColors.BOLD_MAGENTA}{prompt}{ANSIColors.RESET}" 553 return prompt 554 555 def push_input_trans(self, itrans: input.KeymapTranslator) -> None: 556 self.input_trans_stack.append(self.input_trans) 557 self.input_trans = itrans 558 559 def pop_input_trans(self) -> None: 560 self.input_trans = self.input_trans_stack.pop() 561 562 def setpos_from_xy(self, x: int, y: int) -> None: 563 """Set pos according to coordinates x, y""" 564 pos = 0 565 i = 0 566 while i < y: 567 prompt_len, character_widths = self.screeninfo[i] 568 offset = len(character_widths) - character_widths.count(0) 569 in_wrapped_line = prompt_len + sum(character_widths) >= self.console.width 570 if in_wrapped_line: 571 pos += offset - 1 # -1 cause backslash is not in buffer 572 else: 573 pos += offset + 1 # +1 cause newline is in buffer 574 i += 1 575 576 j = 0 577 cur_x = self.screeninfo[i][0] 578 while cur_x < x: 579 if self.screeninfo[i][1][j] == 0: 580 continue 581 cur_x += self.screeninfo[i][1][j] 582 j += 1 583 pos += 1 584 585 self.pos = pos 586 587 def pos2xy(self) -> tuple[int, int]: 588 """Return the x, y coordinates of position 'pos'.""" 589 # this *is* incomprehensible, yes. 590 y = 0 591 pos = self.pos 592 assert 0 <= pos <= len(self.buffer) 593 if pos == len(self.buffer): 594 y = len(self.screeninfo) - 1 595 p, l2 = self.screeninfo[y] 596 return p + sum(l2) + l2.count(0), y 597 598 for p, l2 in self.screeninfo: 599 l = len(l2) - l2.count(0) 600 in_wrapped_line = p + sum(l2) >= self.console.width 601 offset = l - 1 if in_wrapped_line else l # need to remove backslash 602 if offset >= pos: 603 break 604 605 if p + sum(l2) >= self.console.width: 606 pos -= l - 1 # -1 cause backslash is not in buffer 607 else: 608 pos -= l + 1 # +1 cause newline is in buffer 609 y += 1 610 return p + sum(l2[:pos]), y 611 612 def insert(self, text: str | list[str]) -> None: 613 """Insert 'text' at the insertion point.""" 614 self.buffer[self.pos : self.pos] = list(text) 615 self.pos += len(text) 616 self.dirty = True 617 618 def update_cursor(self) -> None: 619 """Move the cursor to reflect changes in self.pos""" 620 self.cxy = self.pos2xy() 621 self.console.move_cursor(*self.cxy) 622 623 def after_command(self, cmd: Command) -> None: 624 """This function is called to allow post command cleanup.""" 625 if getattr(cmd, "kills_digit_arg", True): 626 if self.arg is not None: 627 self.dirty = True 628 self.arg = None 629 630 def prepare(self) -> None: 631 """Get ready to run. Call restore when finished. You must not 632 write to the console in between the calls to prepare and 633 restore.""" 634 try: 635 self.console.prepare() 636 self.arg = None 637 self.finished = False 638 del self.buffer[:] 639 self.pos = 0 640 self.dirty = True 641 self.last_command = None 642 self.calc_screen() 643 except BaseException: 644 self.restore() 645 raise 646 647 while self.scheduled_commands: 648 cmd = self.scheduled_commands.pop() 649 self.do_cmd((cmd, [])) 650 651 def last_command_is(self, cls: type) -> bool: 652 if not self.last_command: 653 return False 654 return issubclass(cls, self.last_command) 655 656 def restore(self) -> None: 657 """Clean up after a run.""" 658 self.console.restore() 659 660 @contextmanager 661 def suspend(self) -> SimpleContextManager: 662 """A context manager to delegate to another reader.""" 663 prev_state = {f.name: getattr(self, f.name) for f in fields(self)} 664 try: 665 self.restore() 666 yield 667 finally: 668 for arg in ("msg", "ps1", "ps2", "ps3", "ps4", "paste_mode"): 669 setattr(self, arg, prev_state[arg]) 670 self.prepare() 671 672 def finish(self) -> None: 673 """Called when a command signals that we're finished.""" 674 pass 675 676 def error(self, msg: str = "none") -> None: 677 self.msg = "! " + msg + " " 678 self.dirty = True 679 self.console.beep() 680 681 def update_screen(self) -> None: 682 if self.dirty: 683 self.refresh() 684 685 def refresh(self) -> None: 686 """Recalculate and refresh the screen.""" 687 if self.in_bracketed_paste and self.buffer and not self.buffer[-1] == "\n": 688 return 689 690 # this call sets up self.cxy, so call it first. 691 self.screen = self.calc_screen() 692 self.console.refresh(self.screen, self.cxy) 693 self.dirty = False 694 695 def do_cmd(self, cmd: tuple[str, list[str]]) -> None: 696 """`cmd` is a tuple of "event_name" and "event", which in the current 697 implementation is always just the "buffer" which happens to be a list 698 of single-character strings.""" 699 700 trace("received command {cmd}", cmd=cmd) 701 if isinstance(cmd[0], str): 702 command_type = self.commands.get(cmd[0], commands.invalid_command) 703 elif isinstance(cmd[0], type): 704 command_type = cmd[0] 705 else: 706 return # nothing to do 707 708 command = command_type(self, *cmd) # type: ignore[arg-type] 709 command.do() 710 711 self.after_command(command) 712 713 if self.dirty: 714 self.refresh() 715 else: 716 self.update_cursor() 717 718 if not isinstance(cmd, commands.digit_arg): 719 self.last_command = command_type 720 721 self.finished = bool(command.finish) 722 if self.finished: 723 self.console.finish() 724 self.finish() 725 726 def run_hooks(self) -> None: 727 threading_hook = self.threading_hook 728 if threading_hook is None and 'threading' in sys.modules: 729 from ._threading_handler import install_threading_hook 730 install_threading_hook(self) 731 if threading_hook is not None: 732 try: 733 threading_hook() 734 except Exception: 735 pass 736 737 input_hook = self.console.input_hook 738 if input_hook: 739 try: 740 input_hook() 741 except Exception: 742 pass 743 744 def handle1(self, block: bool = True) -> bool: 745 """Handle a single event. Wait as long as it takes if block 746 is true (the default), otherwise return False if no event is 747 pending.""" 748 749 if self.msg: 750 self.msg = "" 751 self.dirty = True 752 753 while True: 754 # We use the same timeout as in readline.c: 100ms 755 self.run_hooks() 756 self.console.wait(100) 757 event = self.console.get_event(block=False) 758 if not event: 759 if block: 760 continue 761 return False 762 763 translate = True 764 765 if event.evt == "key": 766 self.input_trans.push(event) 767 elif event.evt == "scroll": 768 self.refresh() 769 elif event.evt == "resize": 770 self.refresh() 771 else: 772 translate = False 773 774 if translate: 775 cmd = self.input_trans.get() 776 else: 777 cmd = [event.evt, event.data] 778 779 if cmd is None: 780 if block: 781 continue 782 return False 783 784 self.do_cmd(cmd) 785 return True 786 787 def push_char(self, char: int | bytes) -> None: 788 self.console.push_char(char) 789 self.handle1(block=False) 790 791 def readline(self, startup_hook: Callback | None = None) -> str: 792 """Read a line. The implementation of this method also shows 793 how to drive Reader if you want more control over the event 794 loop.""" 795 self.prepare() 796 try: 797 if startup_hook is not None: 798 startup_hook() 799 self.refresh() 800 while not self.finished: 801 self.handle1() 802 return self.get_unicode() 803 804 finally: 805 self.restore() 806 807 def bind(self, spec: KeySpec, command: CommandName) -> None: 808 self.keymap = self.keymap + ((spec, command),) 809 self.input_trans = input.KeymapTranslator( 810 self.keymap, invalid_cls="invalid-key", character_cls="self-insert" 811 ) 812 813 def get_unicode(self) -> str: 814 """Return the current buffer as a unicode string.""" 815 return "".join(self.buffer) 816