• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1#   Copyright 2000-2010 Michael Hudson-Doyle <micahel@gmail.com>
2#                       Antonio Cuni
3#                       Armin Rigo
4#
5#                        All Rights Reserved
6#
7#
8# Permission to use, copy, modify, and distribute this software and
9# its documentation for any purpose is hereby granted without fee,
10# provided that the above copyright notice appear in all copies and
11# that both that copyright notice and this permission notice appear in
12# supporting documentation.
13#
14# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
15# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
16# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
17# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
18# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
19# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
20# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
21
22from __future__ import annotations
23
24import sys
25
26from contextlib import contextmanager
27from dataclasses import dataclass, field, fields
28import unicodedata
29from _colorize import can_colorize, ANSIColors  # type: ignore[import-not-found]
30
31
32from . import commands, console, input
33from .utils import ANSI_ESCAPE_SEQUENCE, wlen, str_width
34from .trace import trace
35
36
37# types
38Command = commands.Command
39from .types import Callback, SimpleContextManager, KeySpec, CommandName
40
41
42def disp_str(buffer: str) -> tuple[str, list[int]]:
43    """disp_str(buffer:string) -> (string, [int])
44
45    Return the string that should be the printed representation of
46    |buffer| and a list detailing where the characters of |buffer|
47    get used up.  E.g.:
48
49    >>> disp_str(chr(3))
50    ('^C', [1, 0])
51
52    """
53    b: list[int] = []
54    s: list[str] = []
55    for c in buffer:
56        if c == '\x1a':
57            s.append(c)
58            b.append(2)
59        elif ord(c) < 128:
60            s.append(c)
61            b.append(1)
62        elif unicodedata.category(c).startswith("C"):
63            c = r"\u%04x" % ord(c)
64            s.append(c)
65            b.extend([0] * (len(c) - 1))
66        else:
67            s.append(c)
68            b.append(str_width(c))
69    return "".join(s), b
70
71
72# syntax classes:
73
74SYNTAX_WHITESPACE, SYNTAX_WORD, SYNTAX_SYMBOL = range(3)
75
76
77def make_default_syntax_table() -> dict[str, int]:
78    # XXX perhaps should use some unicodedata here?
79    st: dict[str, int] = {}
80    for c in map(chr, range(256)):
81        st[c] = SYNTAX_SYMBOL
82    for c in [a for a in map(chr, range(256)) if a.isalnum()]:
83        st[c] = SYNTAX_WORD
84    st["\n"] = st[" "] = SYNTAX_WHITESPACE
85    return st
86
87
88def make_default_commands() -> dict[CommandName, type[Command]]:
89    result: dict[CommandName, type[Command]] = {}
90    for v in vars(commands).values():
91        if isinstance(v, type) and issubclass(v, Command) and v.__name__[0].islower():
92            result[v.__name__] = v
93            result[v.__name__.replace("_", "-")] = v
94    return result
95
96
97default_keymap: tuple[tuple[KeySpec, CommandName], ...] = tuple(
98    [
99        (r"\C-a", "beginning-of-line"),
100        (r"\C-b", "left"),
101        (r"\C-c", "interrupt"),
102        (r"\C-d", "delete"),
103        (r"\C-e", "end-of-line"),
104        (r"\C-f", "right"),
105        (r"\C-g", "cancel"),
106        (r"\C-h", "backspace"),
107        (r"\C-j", "accept"),
108        (r"\<return>", "accept"),
109        (r"\C-k", "kill-line"),
110        (r"\C-l", "clear-screen"),
111        (r"\C-m", "accept"),
112        (r"\C-t", "transpose-characters"),
113        (r"\C-u", "unix-line-discard"),
114        (r"\C-w", "unix-word-rubout"),
115        (r"\C-x\C-u", "upcase-region"),
116        (r"\C-y", "yank"),
117        *(() if sys.platform == "win32" else ((r"\C-z", "suspend"), )),
118        (r"\M-b", "backward-word"),
119        (r"\M-c", "capitalize-word"),
120        (r"\M-d", "kill-word"),
121        (r"\M-f", "forward-word"),
122        (r"\M-l", "downcase-word"),
123        (r"\M-t", "transpose-words"),
124        (r"\M-u", "upcase-word"),
125        (r"\M-y", "yank-pop"),
126        (r"\M--", "digit-arg"),
127        (r"\M-0", "digit-arg"),
128        (r"\M-1", "digit-arg"),
129        (r"\M-2", "digit-arg"),
130        (r"\M-3", "digit-arg"),
131        (r"\M-4", "digit-arg"),
132        (r"\M-5", "digit-arg"),
133        (r"\M-6", "digit-arg"),
134        (r"\M-7", "digit-arg"),
135        (r"\M-8", "digit-arg"),
136        (r"\M-9", "digit-arg"),
137        (r"\M-\n", "accept"),
138        ("\\\\", "self-insert"),
139        (r"\x1b[200~", "enable_bracketed_paste"),
140        (r"\x1b[201~", "disable_bracketed_paste"),
141        (r"\x03", "ctrl-c"),
142    ]
143    + [(c, "self-insert") for c in map(chr, range(32, 127)) if c != "\\"]
144    + [(c, "self-insert") for c in map(chr, range(128, 256)) if c.isalpha()]
145    + [
146        (r"\<up>", "up"),
147        (r"\<down>", "down"),
148        (r"\<left>", "left"),
149        (r"\C-\<left>", "backward-word"),
150        (r"\<right>", "right"),
151        (r"\C-\<right>", "forward-word"),
152        (r"\<delete>", "delete"),
153        (r"\x1b[3~", "delete"),
154        (r"\<backspace>", "backspace"),
155        (r"\M-\<backspace>", "backward-kill-word"),
156        (r"\<end>", "end-of-line"),  # was 'end'
157        (r"\<home>", "beginning-of-line"),  # was 'home'
158        (r"\<f1>", "help"),
159        (r"\<f2>", "show-history"),
160        (r"\<f3>", "paste-mode"),
161        (r"\EOF", "end"),  # the entries in the terminfo database for xterms
162        (r"\EOH", "home"),  # seem to be wrong.  this is a less than ideal
163        # workaround
164    ]
165)
166
167
168@dataclass(slots=True)
169class Reader:
170    """The Reader class implements the bare bones of a command reader,
171    handling such details as editing and cursor motion.  What it does
172    not support are such things as completion or history support -
173    these are implemented elsewhere.
174
175    Instance variables of note include:
176
177      * buffer:
178        A *list* (*not* a string at the moment :-) containing all the
179        characters that have been entered.
180      * console:
181        Hopefully encapsulates the OS dependent stuff.
182      * pos:
183        A 0-based index into `buffer' for where the insertion point
184        is.
185      * screeninfo:
186        Ahem.  This list contains some info needed to move the
187        insertion point around reasonably efficiently.
188      * cxy, lxy:
189        the position of the insertion point in screen ...
190      * syntax_table:
191        Dictionary mapping characters to `syntax class'; read the
192        emacs docs to see what this means :-)
193      * commands:
194        Dictionary mapping command names to command classes.
195      * arg:
196        The emacs-style prefix argument.  It will be None if no such
197        argument has been provided.
198      * dirty:
199        True if we need to refresh the display.
200      * kill_ring:
201        The emacs-style kill-ring; manipulated with yank & yank-pop
202      * ps1, ps2, ps3, ps4:
203        prompts.  ps1 is the prompt for a one-line input; for a
204        multiline input it looks like:
205            ps2> first line of input goes here
206            ps3> second and further
207            ps3> lines get ps3
208            ...
209            ps4> and the last one gets ps4
210        As with the usual top-level, you can set these to instances if
211        you like; str() will be called on them (once) at the beginning
212        of each command.  Don't put really long or newline containing
213        strings here, please!
214        This is just the default policy; you can change it freely by
215        overriding get_prompt() (and indeed some standard subclasses
216        do).
217      * finished:
218        handle1 will set this to a true value if a command signals
219        that we're done.
220    """
221
222    console: console.Console
223
224    ## state
225    buffer: list[str] = field(default_factory=list)
226    pos: int = 0
227    ps1: str = "->> "
228    ps2: str = "/>> "
229    ps3: str = "|.. "
230    ps4: str = R"\__ "
231    kill_ring: list[list[str]] = field(default_factory=list)
232    msg: str = ""
233    arg: int | None = None
234    dirty: bool = False
235    finished: bool = False
236    paste_mode: bool = False
237    in_bracketed_paste: bool = False
238    commands: dict[str, type[Command]] = field(default_factory=make_default_commands)
239    last_command: type[Command] | None = None
240    syntax_table: dict[str, int] = field(default_factory=make_default_syntax_table)
241    keymap: tuple[tuple[str, str], ...] = ()
242    input_trans: input.KeymapTranslator = field(init=False)
243    input_trans_stack: list[input.KeymapTranslator] = field(default_factory=list)
244    screen: list[str] = field(default_factory=list)
245    screeninfo: list[tuple[int, list[int]]] = field(init=False)
246    cxy: tuple[int, int] = field(init=False)
247    lxy: tuple[int, int] = field(init=False)
248    scheduled_commands: list[str] = field(default_factory=list)
249    can_colorize: bool = False
250    threading_hook: Callback | None = None
251
252    ## cached metadata to speed up screen refreshes
253    @dataclass
254    class RefreshCache:
255        in_bracketed_paste: bool = False
256        screen: list[str] = field(default_factory=list)
257        screeninfo: list[tuple[int, list[int]]] = field(init=False)
258        line_end_offsets: list[int] = field(default_factory=list)
259        pos: int = field(init=False)
260        cxy: tuple[int, int] = field(init=False)
261        dimensions: tuple[int, int] = field(init=False)
262        invalidated: bool = False
263
264        def update_cache(self,
265                         reader: Reader,
266                         screen: list[str],
267                         screeninfo: list[tuple[int, list[int]]],
268            ) -> None:
269            self.in_bracketed_paste = reader.in_bracketed_paste
270            self.screen = screen.copy()
271            self.screeninfo = screeninfo.copy()
272            self.pos = reader.pos
273            self.cxy = reader.cxy
274            self.dimensions = reader.console.width, reader.console.height
275            self.invalidated = False
276
277        def valid(self, reader: Reader) -> bool:
278            if self.invalidated:
279                return False
280            dimensions = reader.console.width, reader.console.height
281            dimensions_changed = dimensions != self.dimensions
282            paste_changed = reader.in_bracketed_paste != self.in_bracketed_paste
283            return not (dimensions_changed or paste_changed)
284
285        def get_cached_location(self, reader: Reader) -> tuple[int, int]:
286            if self.invalidated:
287                raise ValueError("Cache is invalidated")
288            offset = 0
289            earliest_common_pos = min(reader.pos, self.pos)
290            num_common_lines = len(self.line_end_offsets)
291            while num_common_lines > 0:
292                offset = self.line_end_offsets[num_common_lines - 1]
293                if earliest_common_pos > offset:
294                    break
295                num_common_lines -= 1
296            else:
297                offset = 0
298            return offset, num_common_lines
299
300    last_refresh_cache: RefreshCache = field(default_factory=RefreshCache)
301
302    def __post_init__(self) -> None:
303        # Enable the use of `insert` without a `prepare` call - necessary to
304        # facilitate the tab completion hack implemented for
305        # <https://bugs.python.org/issue25660>.
306        self.keymap = self.collect_keymap()
307        self.input_trans = input.KeymapTranslator(
308            self.keymap, invalid_cls="invalid-key", character_cls="self-insert"
309        )
310        self.screeninfo = [(0, [])]
311        self.cxy = self.pos2xy()
312        self.lxy = (self.pos, 0)
313        self.can_colorize = can_colorize()
314
315        self.last_refresh_cache.screeninfo = self.screeninfo
316        self.last_refresh_cache.pos = self.pos
317        self.last_refresh_cache.cxy = self.cxy
318        self.last_refresh_cache.dimensions = (0, 0)
319
320    def collect_keymap(self) -> tuple[tuple[KeySpec, CommandName], ...]:
321        return default_keymap
322
323    def calc_screen(self) -> list[str]:
324        """Translate changes in self.buffer into changes in self.console.screen."""
325        # Since the last call to calc_screen:
326        # screen and screeninfo may differ due to a completion menu being shown
327        # pos and cxy may differ due to edits, cursor movements, or completion menus
328
329        # Lines that are above both the old and new cursor position can't have changed,
330        # unless the terminal has been resized (which might cause reflowing) or we've
331        # entered or left paste mode (which changes prompts, causing reflowing).
332        num_common_lines = 0
333        offset = 0
334        if self.last_refresh_cache.valid(self):
335            offset, num_common_lines = self.last_refresh_cache.get_cached_location(self)
336
337        screen = self.last_refresh_cache.screen
338        del screen[num_common_lines:]
339
340        screeninfo = self.last_refresh_cache.screeninfo
341        del screeninfo[num_common_lines:]
342
343        last_refresh_line_end_offsets = self.last_refresh_cache.line_end_offsets
344        del last_refresh_line_end_offsets[num_common_lines:]
345
346        pos = self.pos
347        pos -= offset
348
349        prompt_from_cache = (offset and self.buffer[offset - 1] != "\n")
350
351        lines = "".join(self.buffer[offset:]).split("\n")
352
353        cursor_found = False
354        lines_beyond_cursor = 0
355        for ln, line in enumerate(lines, num_common_lines):
356            ll = len(line)
357            if 0 <= pos <= ll:
358                self.lxy = pos, ln
359                cursor_found = True
360            elif cursor_found:
361                lines_beyond_cursor += 1
362                if lines_beyond_cursor > self.console.height:
363                    # No need to keep formatting lines.
364                    # The console can't show them.
365                    break
366            if prompt_from_cache:
367                # Only the first line's prompt can come from the cache
368                prompt_from_cache = False
369                prompt = ""
370            else:
371                prompt = self.get_prompt(ln, ll >= pos >= 0)
372            while "\n" in prompt:
373                pre_prompt, _, prompt = prompt.partition("\n")
374                last_refresh_line_end_offsets.append(offset)
375                screen.append(pre_prompt)
376                screeninfo.append((0, []))
377            pos -= ll + 1
378            prompt, lp = self.process_prompt(prompt)
379            l, l2 = disp_str(line)
380            wrapcount = (wlen(l) + lp) // self.console.width
381            if wrapcount == 0:
382                offset += ll + 1  # Takes all of the line plus the newline
383                last_refresh_line_end_offsets.append(offset)
384                screen.append(prompt + l)
385                screeninfo.append((lp, l2))
386            else:
387                i = 0
388                while l:
389                    prelen = lp if i == 0 else 0
390                    index_to_wrap_before = 0
391                    column = 0
392                    for character_width in l2:
393                        if column + character_width >= self.console.width - prelen:
394                            break
395                        index_to_wrap_before += 1
396                        column += character_width
397                    pre = prompt if i == 0 else ""
398                    if len(l) > index_to_wrap_before:
399                        offset += index_to_wrap_before
400                        post = "\\"
401                        after = [1]
402                    else:
403                        offset += index_to_wrap_before + 1  # Takes the newline
404                        post = ""
405                        after = []
406                    last_refresh_line_end_offsets.append(offset)
407                    screen.append(pre + l[:index_to_wrap_before] + post)
408                    screeninfo.append((prelen, l2[:index_to_wrap_before] + after))
409                    l = l[index_to_wrap_before:]
410                    l2 = l2[index_to_wrap_before:]
411                    i += 1
412        self.screeninfo = screeninfo
413        self.cxy = self.pos2xy()
414        if self.msg:
415            for mline in self.msg.split("\n"):
416                screen.append(mline)
417                screeninfo.append((0, []))
418
419        self.last_refresh_cache.update_cache(self, screen, screeninfo)
420        return screen
421
422    @staticmethod
423    def process_prompt(prompt: str) -> tuple[str, int]:
424        """Process the prompt.
425
426        This means calculate the length of the prompt. The character \x01
427        and \x02 are used to bracket ANSI control sequences and need to be
428        excluded from the length calculation.  So also a copy of the prompt
429        is returned with these control characters removed."""
430
431        # The logic below also ignores the length of common escape
432        # sequences if they were not explicitly within \x01...\x02.
433        # They are CSI (or ANSI) sequences  ( ESC [ ... LETTER )
434
435        # wlen from utils already excludes ANSI_ESCAPE_SEQUENCE chars,
436        # which breaks the logic below so we redefine it here.
437        def wlen(s: str) -> int:
438            return sum(str_width(i) for i in s)
439
440        out_prompt = ""
441        l = wlen(prompt)
442        pos = 0
443        while True:
444            s = prompt.find("\x01", pos)
445            if s == -1:
446                break
447            e = prompt.find("\x02", s)
448            if e == -1:
449                break
450            # Found start and end brackets, subtract from string length
451            l = l - (e - s + 1)
452            keep = prompt[pos:s]
453            l -= sum(map(wlen, ANSI_ESCAPE_SEQUENCE.findall(keep)))
454            out_prompt += keep + prompt[s + 1 : e]
455            pos = e + 1
456        keep = prompt[pos:]
457        l -= sum(map(wlen, ANSI_ESCAPE_SEQUENCE.findall(keep)))
458        out_prompt += keep
459        return out_prompt, l
460
461    def bow(self, p: int | None = None) -> int:
462        """Return the 0-based index of the word break preceding p most
463        immediately.
464
465        p defaults to self.pos; word boundaries are determined using
466        self.syntax_table."""
467        if p is None:
468            p = self.pos
469        st = self.syntax_table
470        b = self.buffer
471        p -= 1
472        while p >= 0 and st.get(b[p], SYNTAX_WORD) != SYNTAX_WORD:
473            p -= 1
474        while p >= 0 and st.get(b[p], SYNTAX_WORD) == SYNTAX_WORD:
475            p -= 1
476        return p + 1
477
478    def eow(self, p: int | None = None) -> int:
479        """Return the 0-based index of the word break following p most
480        immediately.
481
482        p defaults to self.pos; word boundaries are determined using
483        self.syntax_table."""
484        if p is None:
485            p = self.pos
486        st = self.syntax_table
487        b = self.buffer
488        while p < len(b) and st.get(b[p], SYNTAX_WORD) != SYNTAX_WORD:
489            p += 1
490        while p < len(b) and st.get(b[p], SYNTAX_WORD) == SYNTAX_WORD:
491            p += 1
492        return p
493
494    def bol(self, p: int | None = None) -> int:
495        """Return the 0-based index of the line break preceding p most
496        immediately.
497
498        p defaults to self.pos."""
499        if p is None:
500            p = self.pos
501        b = self.buffer
502        p -= 1
503        while p >= 0 and b[p] != "\n":
504            p -= 1
505        return p + 1
506
507    def eol(self, p: int | None = None) -> int:
508        """Return the 0-based index of the line break following p most
509        immediately.
510
511        p defaults to self.pos."""
512        if p is None:
513            p = self.pos
514        b = self.buffer
515        while p < len(b) and b[p] != "\n":
516            p += 1
517        return p
518
519    def max_column(self, y: int) -> int:
520        """Return the last x-offset for line y"""
521        return self.screeninfo[y][0] + sum(self.screeninfo[y][1])
522
523    def max_row(self) -> int:
524        return len(self.screeninfo) - 1
525
526    def get_arg(self, default: int = 1) -> int:
527        """Return any prefix argument that the user has supplied,
528        returning `default' if there is None.  Defaults to 1.
529        """
530        if self.arg is None:
531            return default
532        return self.arg
533
534    def get_prompt(self, lineno: int, cursor_on_line: bool) -> str:
535        """Return what should be in the left-hand margin for line
536        `lineno'."""
537        if self.arg is not None and cursor_on_line:
538            prompt = f"(arg: {self.arg}) "
539        elif self.paste_mode and not self.in_bracketed_paste:
540            prompt = "(paste) "
541        elif "\n" in self.buffer:
542            if lineno == 0:
543                prompt = self.ps2
544            elif self.ps4 and lineno == self.buffer.count("\n"):
545                prompt = self.ps4
546            else:
547                prompt = self.ps3
548        else:
549            prompt = self.ps1
550
551        if self.can_colorize:
552            prompt = f"{ANSIColors.BOLD_MAGENTA}{prompt}{ANSIColors.RESET}"
553        return prompt
554
555    def push_input_trans(self, itrans: input.KeymapTranslator) -> None:
556        self.input_trans_stack.append(self.input_trans)
557        self.input_trans = itrans
558
559    def pop_input_trans(self) -> None:
560        self.input_trans = self.input_trans_stack.pop()
561
562    def setpos_from_xy(self, x: int, y: int) -> None:
563        """Set pos according to coordinates x, y"""
564        pos = 0
565        i = 0
566        while i < y:
567            prompt_len, character_widths = self.screeninfo[i]
568            offset = len(character_widths) - character_widths.count(0)
569            in_wrapped_line = prompt_len + sum(character_widths) >= self.console.width
570            if in_wrapped_line:
571                pos += offset - 1  # -1 cause backslash is not in buffer
572            else:
573                pos += offset + 1  # +1 cause newline is in buffer
574            i += 1
575
576        j = 0
577        cur_x = self.screeninfo[i][0]
578        while cur_x < x:
579            if self.screeninfo[i][1][j] == 0:
580                continue
581            cur_x += self.screeninfo[i][1][j]
582            j += 1
583            pos += 1
584
585        self.pos = pos
586
587    def pos2xy(self) -> tuple[int, int]:
588        """Return the x, y coordinates of position 'pos'."""
589        # this *is* incomprehensible, yes.
590        y = 0
591        pos = self.pos
592        assert 0 <= pos <= len(self.buffer)
593        if pos == len(self.buffer):
594            y = len(self.screeninfo) - 1
595            p, l2 = self.screeninfo[y]
596            return p + sum(l2) + l2.count(0), y
597
598        for p, l2 in self.screeninfo:
599            l = len(l2) - l2.count(0)
600            in_wrapped_line = p + sum(l2) >= self.console.width
601            offset = l - 1 if in_wrapped_line else l  # need to remove backslash
602            if offset >= pos:
603                break
604
605            if p + sum(l2) >= self.console.width:
606                pos -= l - 1  # -1 cause backslash is not in buffer
607            else:
608                pos -= l + 1  # +1 cause newline is in buffer
609            y += 1
610        return p + sum(l2[:pos]), y
611
612    def insert(self, text: str | list[str]) -> None:
613        """Insert 'text' at the insertion point."""
614        self.buffer[self.pos : self.pos] = list(text)
615        self.pos += len(text)
616        self.dirty = True
617
618    def update_cursor(self) -> None:
619        """Move the cursor to reflect changes in self.pos"""
620        self.cxy = self.pos2xy()
621        self.console.move_cursor(*self.cxy)
622
623    def after_command(self, cmd: Command) -> None:
624        """This function is called to allow post command cleanup."""
625        if getattr(cmd, "kills_digit_arg", True):
626            if self.arg is not None:
627                self.dirty = True
628            self.arg = None
629
630    def prepare(self) -> None:
631        """Get ready to run.  Call restore when finished.  You must not
632        write to the console in between the calls to prepare and
633        restore."""
634        try:
635            self.console.prepare()
636            self.arg = None
637            self.finished = False
638            del self.buffer[:]
639            self.pos = 0
640            self.dirty = True
641            self.last_command = None
642            self.calc_screen()
643        except BaseException:
644            self.restore()
645            raise
646
647        while self.scheduled_commands:
648            cmd = self.scheduled_commands.pop()
649            self.do_cmd((cmd, []))
650
651    def last_command_is(self, cls: type) -> bool:
652        if not self.last_command:
653            return False
654        return issubclass(cls, self.last_command)
655
656    def restore(self) -> None:
657        """Clean up after a run."""
658        self.console.restore()
659
660    @contextmanager
661    def suspend(self) -> SimpleContextManager:
662        """A context manager to delegate to another reader."""
663        prev_state = {f.name: getattr(self, f.name) for f in fields(self)}
664        try:
665            self.restore()
666            yield
667        finally:
668            for arg in ("msg", "ps1", "ps2", "ps3", "ps4", "paste_mode"):
669                setattr(self, arg, prev_state[arg])
670            self.prepare()
671
672    def finish(self) -> None:
673        """Called when a command signals that we're finished."""
674        pass
675
676    def error(self, msg: str = "none") -> None:
677        self.msg = "! " + msg + " "
678        self.dirty = True
679        self.console.beep()
680
681    def update_screen(self) -> None:
682        if self.dirty:
683            self.refresh()
684
685    def refresh(self) -> None:
686        """Recalculate and refresh the screen."""
687        if self.in_bracketed_paste and self.buffer and not self.buffer[-1] == "\n":
688            return
689
690        # this call sets up self.cxy, so call it first.
691        self.screen = self.calc_screen()
692        self.console.refresh(self.screen, self.cxy)
693        self.dirty = False
694
695    def do_cmd(self, cmd: tuple[str, list[str]]) -> None:
696        """`cmd` is a tuple of "event_name" and "event", which in the current
697        implementation is always just the "buffer" which happens to be a list
698        of single-character strings."""
699
700        trace("received command {cmd}", cmd=cmd)
701        if isinstance(cmd[0], str):
702            command_type = self.commands.get(cmd[0], commands.invalid_command)
703        elif isinstance(cmd[0], type):
704            command_type = cmd[0]
705        else:
706            return  # nothing to do
707
708        command = command_type(self, *cmd)  # type: ignore[arg-type]
709        command.do()
710
711        self.after_command(command)
712
713        if self.dirty:
714            self.refresh()
715        else:
716            self.update_cursor()
717
718        if not isinstance(cmd, commands.digit_arg):
719            self.last_command = command_type
720
721        self.finished = bool(command.finish)
722        if self.finished:
723            self.console.finish()
724            self.finish()
725
726    def run_hooks(self) -> None:
727        threading_hook = self.threading_hook
728        if threading_hook is None and 'threading' in sys.modules:
729            from ._threading_handler import install_threading_hook
730            install_threading_hook(self)
731        if threading_hook is not None:
732            try:
733                threading_hook()
734            except Exception:
735                pass
736
737        input_hook = self.console.input_hook
738        if input_hook:
739            try:
740                input_hook()
741            except Exception:
742                pass
743
744    def handle1(self, block: bool = True) -> bool:
745        """Handle a single event.  Wait as long as it takes if block
746        is true (the default), otherwise return False if no event is
747        pending."""
748
749        if self.msg:
750            self.msg = ""
751            self.dirty = True
752
753        while True:
754            # We use the same timeout as in readline.c: 100ms
755            self.run_hooks()
756            self.console.wait(100)
757            event = self.console.get_event(block=False)
758            if not event:
759                if block:
760                    continue
761                return False
762
763            translate = True
764
765            if event.evt == "key":
766                self.input_trans.push(event)
767            elif event.evt == "scroll":
768                self.refresh()
769            elif event.evt == "resize":
770                self.refresh()
771            else:
772                translate = False
773
774            if translate:
775                cmd = self.input_trans.get()
776            else:
777                cmd = [event.evt, event.data]
778
779            if cmd is None:
780                if block:
781                    continue
782                return False
783
784            self.do_cmd(cmd)
785            return True
786
787    def push_char(self, char: int | bytes) -> None:
788        self.console.push_char(char)
789        self.handle1(block=False)
790
791    def readline(self, startup_hook: Callback | None = None) -> str:
792        """Read a line.  The implementation of this method also shows
793        how to drive Reader if you want more control over the event
794        loop."""
795        self.prepare()
796        try:
797            if startup_hook is not None:
798                startup_hook()
799            self.refresh()
800            while not self.finished:
801                self.handle1()
802            return self.get_unicode()
803
804        finally:
805            self.restore()
806
807    def bind(self, spec: KeySpec, command: CommandName) -> None:
808        self.keymap = self.keymap + ((spec, command),)
809        self.input_trans = input.KeymapTranslator(
810            self.keymap, invalid_cls="invalid-key", character_cls="self-insert"
811        )
812
813    def get_unicode(self) -> str:
814        """Return the current buffer as a unicode string."""
815        return "".join(self.buffer)
816