1# Copyright 2000-2010 Michael Hudson-Doyle <micahel@gmail.com> 2# Antonio Cuni 3# Armin Rigo 4# 5# All Rights Reserved 6# 7# 8# Permission to use, copy, modify, and distribute this software and 9# its documentation for any purpose is hereby granted without fee, 10# provided that the above copyright notice appear in all copies and 11# that both that copyright notice and this permission notice appear in 12# supporting documentation. 13# 14# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO 15# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY 16# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, 17# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER 18# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF 19# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN 20# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 21 22from __future__ import annotations 23 24import errno 25import os 26import re 27import select 28import signal 29import struct 30import termios 31import time 32import platform 33from fcntl import ioctl 34 35from . import curses 36from .console import Console, Event 37from .fancy_termios import tcgetattr, tcsetattr 38from .trace import trace 39from .unix_eventqueue import EventQueue 40from .utils import wlen 41 42 43TYPE_CHECKING = False 44 45# types 46if TYPE_CHECKING: 47 from typing import IO, Literal, overload 48else: 49 overload = lambda func: None 50 51 52class InvalidTerminal(RuntimeError): 53 pass 54 55 56_error = (termios.error, curses.error, InvalidTerminal) 57 58SIGWINCH_EVENT = "repaint" 59 60FIONREAD = getattr(termios, "FIONREAD", None) 61TIOCGWINSZ = getattr(termios, "TIOCGWINSZ", None) 62 63# ------------ start of baudrate definitions ------------ 64 65# Add (possibly) missing baudrates (check termios man page) to termios 66 67 68def add_baudrate_if_supported(dictionary: dict[int, int], rate: int) -> None: 69 baudrate_name = "B%d" % rate 70 if hasattr(termios, baudrate_name): 71 dictionary[getattr(termios, baudrate_name)] = rate 72 73 74# Check the termios man page (Line speed) to know where these 75# values come from. 76potential_baudrates = [ 77 0, 78 110, 79 115200, 80 1200, 81 134, 82 150, 83 1800, 84 19200, 85 200, 86 230400, 87 2400, 88 300, 89 38400, 90 460800, 91 4800, 92 50, 93 57600, 94 600, 95 75, 96 9600, 97] 98 99ratedict: dict[int, int] = {} 100for rate in potential_baudrates: 101 add_baudrate_if_supported(ratedict, rate) 102 103# Clean up variables to avoid unintended usage 104del rate, add_baudrate_if_supported 105 106# ------------ end of baudrate definitions ------------ 107 108delayprog = re.compile(b"\\$<([0-9]+)((?:/|\\*){0,2})>") 109 110try: 111 poll: type[select.poll] = select.poll 112except AttributeError: 113 # this is exactly the minumum necessary to support what we 114 # do with poll objects 115 class MinimalPoll: 116 def __init__(self): 117 pass 118 119 def register(self, fd, flag): 120 self.fd = fd 121 # note: The 'timeout' argument is received as *milliseconds* 122 def poll(self, timeout: float | None = None) -> list[int]: 123 if timeout is None: 124 r, w, e = select.select([self.fd], [], []) 125 else: 126 r, w, e = select.select([self.fd], [], [], timeout/1000) 127 return r 128 129 poll = MinimalPoll # type: ignore[assignment] 130 131 132class UnixConsole(Console): 133 def __init__( 134 self, 135 f_in: IO[bytes] | int = 0, 136 f_out: IO[bytes] | int = 1, 137 term: str = "", 138 encoding: str = "", 139 ): 140 """ 141 Initialize the UnixConsole. 142 143 Parameters: 144 - f_in (int or file-like object): Input file descriptor or object. 145 - f_out (int or file-like object): Output file descriptor or object. 146 - term (str): Terminal name. 147 - encoding (str): Encoding to use for I/O operations. 148 """ 149 super().__init__(f_in, f_out, term, encoding) 150 151 self.pollob = poll() 152 self.pollob.register(self.input_fd, select.POLLIN) 153 self.input_buffer = b"" 154 self.input_buffer_pos = 0 155 curses.setupterm(term or None, self.output_fd) 156 self.term = term 157 158 @overload 159 def _my_getstr(cap: str, optional: Literal[False] = False) -> bytes: ... 160 161 @overload 162 def _my_getstr(cap: str, optional: bool) -> bytes | None: ... 163 164 def _my_getstr(cap: str, optional: bool = False) -> bytes | None: 165 r = curses.tigetstr(cap) 166 if not optional and r is None: 167 raise InvalidTerminal( 168 f"terminal doesn't have the required {cap} capability" 169 ) 170 return r 171 172 self._bel = _my_getstr("bel") 173 self._civis = _my_getstr("civis", optional=True) 174 self._clear = _my_getstr("clear") 175 self._cnorm = _my_getstr("cnorm", optional=True) 176 self._cub = _my_getstr("cub", optional=True) 177 self._cub1 = _my_getstr("cub1", optional=True) 178 self._cud = _my_getstr("cud", optional=True) 179 self._cud1 = _my_getstr("cud1", optional=True) 180 self._cuf = _my_getstr("cuf", optional=True) 181 self._cuf1 = _my_getstr("cuf1", optional=True) 182 self._cup = _my_getstr("cup") 183 self._cuu = _my_getstr("cuu", optional=True) 184 self._cuu1 = _my_getstr("cuu1", optional=True) 185 self._dch1 = _my_getstr("dch1", optional=True) 186 self._dch = _my_getstr("dch", optional=True) 187 self._el = _my_getstr("el") 188 self._hpa = _my_getstr("hpa", optional=True) 189 self._ich = _my_getstr("ich", optional=True) 190 self._ich1 = _my_getstr("ich1", optional=True) 191 self._ind = _my_getstr("ind", optional=True) 192 self._pad = _my_getstr("pad", optional=True) 193 self._ri = _my_getstr("ri", optional=True) 194 self._rmkx = _my_getstr("rmkx", optional=True) 195 self._smkx = _my_getstr("smkx", optional=True) 196 197 self.__setup_movement() 198 199 self.event_queue = EventQueue(self.input_fd, self.encoding) 200 self.cursor_visible = 1 201 202 def more_in_buffer(self) -> bool: 203 return bool( 204 self.input_buffer 205 and self.input_buffer_pos < len(self.input_buffer) 206 ) 207 208 def __read(self, n: int) -> bytes: 209 if not self.more_in_buffer(): 210 self.input_buffer = os.read(self.input_fd, 10000) 211 212 ret = self.input_buffer[self.input_buffer_pos : self.input_buffer_pos + n] 213 self.input_buffer_pos += len(ret) 214 if self.input_buffer_pos >= len(self.input_buffer): 215 self.input_buffer = b"" 216 self.input_buffer_pos = 0 217 return ret 218 219 220 def change_encoding(self, encoding: str) -> None: 221 """ 222 Change the encoding used for I/O operations. 223 224 Parameters: 225 - encoding (str): New encoding to use. 226 """ 227 self.encoding = encoding 228 229 def refresh(self, screen, c_xy): 230 """ 231 Refresh the console screen. 232 233 Parameters: 234 - screen (list): List of strings representing the screen contents. 235 - c_xy (tuple): Cursor position (x, y) on the screen. 236 """ 237 cx, cy = c_xy 238 if not self.__gone_tall: 239 while len(self.screen) < min(len(screen), self.height): 240 self.__hide_cursor() 241 self.__move(0, len(self.screen) - 1) 242 self.__write("\n") 243 self.__posxy = 0, len(self.screen) 244 self.screen.append("") 245 else: 246 while len(self.screen) < len(screen): 247 self.screen.append("") 248 249 if len(screen) > self.height: 250 self.__gone_tall = 1 251 self.__move = self.__move_tall 252 253 px, py = self.__posxy 254 old_offset = offset = self.__offset 255 height = self.height 256 257 # we make sure the cursor is on the screen, and that we're 258 # using all of the screen if we can 259 if cy < offset: 260 offset = cy 261 elif cy >= offset + height: 262 offset = cy - height + 1 263 elif offset > 0 and len(screen) < offset + height: 264 offset = max(len(screen) - height, 0) 265 screen.append("") 266 267 oldscr = self.screen[old_offset : old_offset + height] 268 newscr = screen[offset : offset + height] 269 270 # use hardware scrolling if we have it. 271 if old_offset > offset and self._ri: 272 self.__hide_cursor() 273 self.__write_code(self._cup, 0, 0) 274 self.__posxy = 0, old_offset 275 for i in range(old_offset - offset): 276 self.__write_code(self._ri) 277 oldscr.pop(-1) 278 oldscr.insert(0, "") 279 elif old_offset < offset and self._ind: 280 self.__hide_cursor() 281 self.__write_code(self._cup, self.height - 1, 0) 282 self.__posxy = 0, old_offset + self.height - 1 283 for i in range(offset - old_offset): 284 self.__write_code(self._ind) 285 oldscr.pop(0) 286 oldscr.append("") 287 288 self.__offset = offset 289 290 for ( 291 y, 292 oldline, 293 newline, 294 ) in zip(range(offset, offset + height), oldscr, newscr): 295 if oldline != newline: 296 self.__write_changed_line(y, oldline, newline, px) 297 298 y = len(newscr) 299 while y < len(oldscr): 300 self.__hide_cursor() 301 self.__move(0, y) 302 self.__posxy = 0, y 303 self.__write_code(self._el) 304 y += 1 305 306 self.__show_cursor() 307 308 self.screen = screen.copy() 309 self.move_cursor(cx, cy) 310 self.flushoutput() 311 312 def move_cursor(self, x, y): 313 """ 314 Move the cursor to the specified position on the screen. 315 316 Parameters: 317 - x (int): X coordinate. 318 - y (int): Y coordinate. 319 """ 320 if y < self.__offset or y >= self.__offset + self.height: 321 self.event_queue.insert(Event("scroll", None)) 322 else: 323 self.__move(x, y) 324 self.__posxy = x, y 325 self.flushoutput() 326 327 def prepare(self): 328 """ 329 Prepare the console for input/output operations. 330 """ 331 self.__svtermstate = tcgetattr(self.input_fd) 332 raw = self.__svtermstate.copy() 333 raw.iflag &= ~(termios.INPCK | termios.ISTRIP | termios.IXON) 334 raw.oflag &= ~(termios.OPOST) 335 raw.cflag &= ~(termios.CSIZE | termios.PARENB) 336 raw.cflag |= termios.CS8 337 raw.iflag |= termios.BRKINT 338 raw.lflag &= ~(termios.ICANON | termios.ECHO | termios.IEXTEN) 339 raw.lflag |= termios.ISIG 340 raw.cc[termios.VMIN] = 1 341 raw.cc[termios.VTIME] = 0 342 tcsetattr(self.input_fd, termios.TCSADRAIN, raw) 343 344 # In macOS terminal we need to deactivate line wrap via ANSI escape code 345 if platform.system() == "Darwin" and os.getenv("TERM_PROGRAM") == "Apple_Terminal": 346 os.write(self.output_fd, b"\033[?7l") 347 348 self.screen = [] 349 self.height, self.width = self.getheightwidth() 350 351 self.__buffer = [] 352 353 self.__posxy = 0, 0 354 self.__gone_tall = 0 355 self.__move = self.__move_short 356 self.__offset = 0 357 358 self.__maybe_write_code(self._smkx) 359 360 try: 361 self.old_sigwinch = signal.signal(signal.SIGWINCH, self.__sigwinch) 362 except ValueError: 363 pass 364 365 self.__enable_bracketed_paste() 366 367 def restore(self): 368 """ 369 Restore the console to the default state 370 """ 371 self.__disable_bracketed_paste() 372 self.__maybe_write_code(self._rmkx) 373 self.flushoutput() 374 tcsetattr(self.input_fd, termios.TCSADRAIN, self.__svtermstate) 375 376 if platform.system() == "Darwin" and os.getenv("TERM_PROGRAM") == "Apple_Terminal": 377 os.write(self.output_fd, b"\033[?7h") 378 379 if hasattr(self, "old_sigwinch"): 380 signal.signal(signal.SIGWINCH, self.old_sigwinch) 381 del self.old_sigwinch 382 383 def push_char(self, char: int | bytes) -> None: 384 """ 385 Push a character to the console event queue. 386 """ 387 trace("push char {char!r}", char=char) 388 self.event_queue.push(char) 389 390 def get_event(self, block: bool = True) -> Event | None: 391 """ 392 Get an event from the console event queue. 393 394 Parameters: 395 - block (bool): Whether to block until an event is available. 396 397 Returns: 398 - Event: Event object from the event queue. 399 """ 400 if not block and not self.wait(timeout=0): 401 return None 402 403 while self.event_queue.empty(): 404 while True: 405 try: 406 self.push_char(self.__read(1)) 407 except OSError as err: 408 if err.errno == errno.EINTR: 409 if not self.event_queue.empty(): 410 return self.event_queue.get() 411 else: 412 continue 413 else: 414 raise 415 else: 416 break 417 return self.event_queue.get() 418 419 def wait(self, timeout: float | None = None) -> bool: 420 """ 421 Wait for events on the console. 422 """ 423 return ( 424 not self.event_queue.empty() 425 or self.more_in_buffer() 426 or bool(self.pollob.poll(timeout)) 427 ) 428 429 def set_cursor_vis(self, visible): 430 """ 431 Set the visibility of the cursor. 432 433 Parameters: 434 - visible (bool): Visibility flag. 435 """ 436 if visible: 437 self.__show_cursor() 438 else: 439 self.__hide_cursor() 440 441 if TIOCGWINSZ: 442 443 def getheightwidth(self): 444 """ 445 Get the height and width of the console. 446 447 Returns: 448 - tuple: Height and width of the console. 449 """ 450 try: 451 return int(os.environ["LINES"]), int(os.environ["COLUMNS"]) 452 except KeyError: 453 height, width = struct.unpack( 454 "hhhh", ioctl(self.input_fd, TIOCGWINSZ, b"\000" * 8) 455 )[0:2] 456 if not height: 457 return 25, 80 458 return height, width 459 460 else: 461 462 def getheightwidth(self): 463 """ 464 Get the height and width of the console. 465 466 Returns: 467 - tuple: Height and width of the console. 468 """ 469 try: 470 return int(os.environ["LINES"]), int(os.environ["COLUMNS"]) 471 except KeyError: 472 return 25, 80 473 474 def forgetinput(self): 475 """ 476 Discard any pending input on the console. 477 """ 478 termios.tcflush(self.input_fd, termios.TCIFLUSH) 479 480 def flushoutput(self): 481 """ 482 Flush the output buffer. 483 """ 484 for text, iscode in self.__buffer: 485 if iscode: 486 self.__tputs(text) 487 else: 488 os.write(self.output_fd, text.encode(self.encoding, "replace")) 489 del self.__buffer[:] 490 491 def finish(self): 492 """ 493 Finish console operations and flush the output buffer. 494 """ 495 y = len(self.screen) - 1 496 while y >= 0 and not self.screen[y]: 497 y -= 1 498 self.__move(0, min(y, self.height + self.__offset - 1)) 499 self.__write("\n\r") 500 self.flushoutput() 501 502 def beep(self): 503 """ 504 Emit a beep sound. 505 """ 506 self.__maybe_write_code(self._bel) 507 self.flushoutput() 508 509 if FIONREAD: 510 511 def getpending(self): 512 """ 513 Get pending events from the console event queue. 514 515 Returns: 516 - Event: Pending event from the event queue. 517 """ 518 e = Event("key", "", b"") 519 520 while not self.event_queue.empty(): 521 e2 = self.event_queue.get() 522 e.data += e2.data 523 e.raw += e.raw 524 525 amount = struct.unpack("i", ioctl(self.input_fd, FIONREAD, b"\0\0\0\0"))[0] 526 raw = self.__read(amount) 527 data = str(raw, self.encoding, "replace") 528 e.data += data 529 e.raw += raw 530 return e 531 532 else: 533 534 def getpending(self): 535 """ 536 Get pending events from the console event queue. 537 538 Returns: 539 - Event: Pending event from the event queue. 540 """ 541 e = Event("key", "", b"") 542 543 while not self.event_queue.empty(): 544 e2 = self.event_queue.get() 545 e.data += e2.data 546 e.raw += e.raw 547 548 amount = 10000 549 raw = self.__read(amount) 550 data = str(raw, self.encoding, "replace") 551 e.data += data 552 e.raw += raw 553 return e 554 555 def clear(self): 556 """ 557 Clear the console screen. 558 """ 559 self.__write_code(self._clear) 560 self.__gone_tall = 1 561 self.__move = self.__move_tall 562 self.__posxy = 0, 0 563 self.screen = [] 564 565 @property 566 def input_hook(self): 567 try: 568 import posix 569 except ImportError: 570 return None 571 if posix._is_inputhook_installed(): 572 return posix._inputhook 573 574 def __enable_bracketed_paste(self) -> None: 575 os.write(self.output_fd, b"\x1b[?2004h") 576 577 def __disable_bracketed_paste(self) -> None: 578 os.write(self.output_fd, b"\x1b[?2004l") 579 580 def __setup_movement(self): 581 """ 582 Set up the movement functions based on the terminal capabilities. 583 """ 584 if 0 and self._hpa: # hpa don't work in windows telnet :-( 585 self.__move_x = self.__move_x_hpa 586 elif self._cub and self._cuf: 587 self.__move_x = self.__move_x_cub_cuf 588 elif self._cub1 and self._cuf1: 589 self.__move_x = self.__move_x_cub1_cuf1 590 else: 591 raise RuntimeError("insufficient terminal (horizontal)") 592 593 if self._cuu and self._cud: 594 self.__move_y = self.__move_y_cuu_cud 595 elif self._cuu1 and self._cud1: 596 self.__move_y = self.__move_y_cuu1_cud1 597 else: 598 raise RuntimeError("insufficient terminal (vertical)") 599 600 if self._dch1: 601 self.dch1 = self._dch1 602 elif self._dch: 603 self.dch1 = curses.tparm(self._dch, 1) 604 else: 605 self.dch1 = None 606 607 if self._ich1: 608 self.ich1 = self._ich1 609 elif self._ich: 610 self.ich1 = curses.tparm(self._ich, 1) 611 else: 612 self.ich1 = None 613 614 self.__move = self.__move_short 615 616 def __write_changed_line(self, y, oldline, newline, px_coord): 617 # this is frustrating; there's no reason to test (say) 618 # self.dch1 inside the loop -- but alternative ways of 619 # structuring this function are equally painful (I'm trying to 620 # avoid writing code generators these days...) 621 minlen = min(wlen(oldline), wlen(newline)) 622 x_pos = 0 623 x_coord = 0 624 625 px_pos = 0 626 j = 0 627 for c in oldline: 628 if j >= px_coord: 629 break 630 j += wlen(c) 631 px_pos += 1 632 633 # reuse the oldline as much as possible, but stop as soon as we 634 # encounter an ESCAPE, because it might be the start of an escape 635 # sequene 636 while ( 637 x_coord < minlen 638 and oldline[x_pos] == newline[x_pos] 639 and newline[x_pos] != "\x1b" 640 ): 641 x_coord += wlen(newline[x_pos]) 642 x_pos += 1 643 644 # if we need to insert a single character right after the first detected change 645 if oldline[x_pos:] == newline[x_pos + 1 :] and self.ich1: 646 if ( 647 y == self.__posxy[1] 648 and x_coord > self.__posxy[0] 649 and oldline[px_pos:x_pos] == newline[px_pos + 1 : x_pos + 1] 650 ): 651 x_pos = px_pos 652 x_coord = px_coord 653 character_width = wlen(newline[x_pos]) 654 self.__move(x_coord, y) 655 self.__write_code(self.ich1) 656 self.__write(newline[x_pos]) 657 self.__posxy = x_coord + character_width, y 658 659 # if it's a single character change in the middle of the line 660 elif ( 661 x_coord < minlen 662 and oldline[x_pos + 1 :] == newline[x_pos + 1 :] 663 and wlen(oldline[x_pos]) == wlen(newline[x_pos]) 664 ): 665 character_width = wlen(newline[x_pos]) 666 self.__move(x_coord, y) 667 self.__write(newline[x_pos]) 668 self.__posxy = x_coord + character_width, y 669 670 # if this is the last character to fit in the line and we edit in the middle of the line 671 elif ( 672 self.dch1 673 and self.ich1 674 and wlen(newline) == self.width 675 and x_coord < wlen(newline) - 2 676 and newline[x_pos + 1 : -1] == oldline[x_pos:-2] 677 ): 678 self.__hide_cursor() 679 self.__move(self.width - 2, y) 680 self.__posxy = self.width - 2, y 681 self.__write_code(self.dch1) 682 683 character_width = wlen(newline[x_pos]) 684 self.__move(x_coord, y) 685 self.__write_code(self.ich1) 686 self.__write(newline[x_pos]) 687 self.__posxy = character_width + 1, y 688 689 else: 690 self.__hide_cursor() 691 self.__move(x_coord, y) 692 if wlen(oldline) > wlen(newline): 693 self.__write_code(self._el) 694 self.__write(newline[x_pos:]) 695 self.__posxy = wlen(newline), y 696 697 if "\x1b" in newline: 698 # ANSI escape characters are present, so we can't assume 699 # anything about the position of the cursor. Moving the cursor 700 # to the left margin should work to get to a known position. 701 self.move_cursor(0, y) 702 703 def __write(self, text): 704 self.__buffer.append((text, 0)) 705 706 def __write_code(self, fmt, *args): 707 self.__buffer.append((curses.tparm(fmt, *args), 1)) 708 709 def __maybe_write_code(self, fmt, *args): 710 if fmt: 711 self.__write_code(fmt, *args) 712 713 def __move_y_cuu1_cud1(self, y): 714 dy = y - self.__posxy[1] 715 if dy > 0: 716 self.__write_code(dy * self._cud1) 717 elif dy < 0: 718 self.__write_code((-dy) * self._cuu1) 719 720 def __move_y_cuu_cud(self, y): 721 dy = y - self.__posxy[1] 722 if dy > 0: 723 self.__write_code(self._cud, dy) 724 elif dy < 0: 725 self.__write_code(self._cuu, -dy) 726 727 def __move_x_hpa(self, x: int) -> None: 728 if x != self.__posxy[0]: 729 self.__write_code(self._hpa, x) 730 731 def __move_x_cub1_cuf1(self, x: int) -> None: 732 dx = x - self.__posxy[0] 733 if dx > 0: 734 self.__write_code(self._cuf1 * dx) 735 elif dx < 0: 736 self.__write_code(self._cub1 * (-dx)) 737 738 def __move_x_cub_cuf(self, x: int) -> None: 739 dx = x - self.__posxy[0] 740 if dx > 0: 741 self.__write_code(self._cuf, dx) 742 elif dx < 0: 743 self.__write_code(self._cub, -dx) 744 745 def __move_short(self, x, y): 746 self.__move_x(x) 747 self.__move_y(y) 748 749 def __move_tall(self, x, y): 750 assert 0 <= y - self.__offset < self.height, y - self.__offset 751 self.__write_code(self._cup, y - self.__offset, x) 752 753 def __sigwinch(self, signum, frame): 754 self.height, self.width = self.getheightwidth() 755 self.event_queue.insert(Event("resize", None)) 756 757 def __hide_cursor(self): 758 if self.cursor_visible: 759 self.__maybe_write_code(self._civis) 760 self.cursor_visible = 0 761 762 def __show_cursor(self): 763 if not self.cursor_visible: 764 self.__maybe_write_code(self._cnorm) 765 self.cursor_visible = 1 766 767 def repaint(self): 768 if not self.__gone_tall: 769 self.__posxy = 0, self.__posxy[1] 770 self.__write("\r") 771 ns = len(self.screen) * ["\000" * self.width] 772 self.screen = ns 773 else: 774 self.__posxy = 0, self.__offset 775 self.__move(0, self.__offset) 776 ns = self.height * ["\000" * self.width] 777 self.screen = ns 778 779 def __tputs(self, fmt, prog=delayprog): 780 """A Python implementation of the curses tputs function; the 781 curses one can't really be wrapped in a sane manner. 782 783 I have the strong suspicion that this is complexity that 784 will never do anyone any good.""" 785 # using .get() means that things will blow up 786 # only if the bps is actually needed (which I'm 787 # betting is pretty unlkely) 788 bps = ratedict.get(self.__svtermstate.ospeed) 789 while 1: 790 m = prog.search(fmt) 791 if not m: 792 os.write(self.output_fd, fmt) 793 break 794 x, y = m.span() 795 os.write(self.output_fd, fmt[:x]) 796 fmt = fmt[y:] 797 delay = int(m.group(1)) 798 if b"*" in m.group(2): 799 delay *= self.height 800 if self._pad and bps is not None: 801 nchars = (bps * delay) / 1000 802 os.write(self.output_fd, self._pad * nchars) 803 else: 804 time.sleep(float(delay) / 1000.0) 805