# Copyright 2022 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""LogScreen tracks lines to display on screen with a set of ScreenLines."""

from __future__ import annotations
import collections
import dataclasses
import logging
from typing import Callable, List, Optional, Tuple, TYPE_CHECKING

from prompt_toolkit.formatted_text import (
    to_formatted_text,
    StyleAndTextTuples,
)

from pw_console.log_filter import LogFilter
from pw_console.text_formatting import (
    fill_character_width,
    insert_linebreaks,
    split_lines,
)

if TYPE_CHECKING:
    from pw_console.log_line import LogLine
    from pw_console.log_pane import LogPane

_LOG = logging.getLogger(__package__)


@dataclasses.dataclass
class ScreenLine:
    """A single line of text for displaying on screen.

    Instances of ScreenLine are stored in a LogScreen's line_buffer deque. When
    a new log message is added it may be converted into multiple ScreenLine
    instances if the text is wrapped across multiple lines.

    For example: say our screen is 80 characters wide and a log message 240
    characters long needs to be displayed. With line wrapping on we will need
    240/80 = 3 lines to show the full message. Say also that this single log
    message is at index #5 in the LogStore classes deque, this is the log_index
    value. This single log message will then be split into 3 separate
    ScreenLine instances:

    ::
        ScreenLine(fragments=[('', 'Log message text line one')],
                   log_index=5, subline=0, height=3)
        ScreenLine(fragments=[('', 'Log message text line two')],
                   log_index=5, subline=1, height=3)
        ScreenLine(fragments=[('', 'Log message text line three')],
                   log_index=5, subline=2, height=3)

    Each `fragments` attribute will store the formatted text intended to be
    directly drawn to the screen. Since these three lines are all displaying
    the same log message their `log_index` reference will be the same. The
    `subline` attribute is the zero-indexed number for this log's wrapped line
    count and `height` is the total ScreenLines needed to show this log
    message.

    Continuing with this example say the next two log messages to display both
    fit on screen with no wrapping. They will both be represented with one
    ScreenLine each:

    ::
        ScreenLine(fragments=[('', 'Another log message')],
                   log_index=6, subline=0, height=1)
        ScreenLine(fragments=[('', 'Yet another log message')],
                   log_index=7, subline=0, height=1)

    The `log_index` is different for each since these are both separate
    logs. The subline is 0 since each line is the first one for this log. Both
    have a height of 1 since no line wrapping was performed.
    """
    # The StyleAndTextTuples for this line ending with a '\n'. These are the raw
    # prompt_toolkit formatted text tuples to display on screen. The colors and
    # spacing can change depending on the formatters used in the
    # LogScreen._get_fragments_per_line() function.
    fragments: StyleAndTextTuples

    # Log index reference for this screen line. This is the index to where the
    # log message resides in the parent LogStore.logs deque. It is set to None
    # if this is an empty ScreenLine. If a log message requires line wrapping
    # then each resulting ScreenLine instance will have the same log_index
    # value.
    #
    # This log_index may also be the integer index into a LogView.filtered_logs
    # deque depending on if log messages are being filtered by the user. The
    # LogScreen class below doesn't need to do anything different in either
    # case. It's the responsibility of LogScreen.get_log_source() to return the
    # correct source.
    #
    # Note this is NOT an index into LogScreen.line_buffer.
    log_index: Optional[int] = None

    # Keep track of the total height and subline number for this log message.
    # For example this line could be subline (0, 1, or 2) of a log message with
    # a total height 3.

    # Subline index.
    subline: int = 0
    # Total height in lines of text (also ScreenLine count) that the log message
    # referred to by log_index requires. When a log message is split across
    # multiple lines height will be set to the same value for each ScreenLine
    # instance.
    height: int = 1

    def empty(self) -> bool:
        """Return True if this line has no log_index (an empty filler line)."""
        return self.log_index is None


@dataclasses.dataclass
class LogScreen:
    """LogScreen maintains the state of visible logs on screen.

    It is responsible for moving the cursor_position, prepending and appending
    log lines as the user moves the cursor."""
    # Callable functions to retrieve logs and display formatting.
    get_log_source: Callable[[], Tuple[int, collections.deque[LogLine]]]
    get_line_wrapping: Callable[[], bool]
    get_log_formatter: Callable[[], Optional[Callable[[LogLine],
                                                      StyleAndTextTuples]]]
    get_search_filter: Callable[[], Optional[LogFilter]]
    get_search_highlight: Callable[[], bool]

    # Window row of the current cursor position
    cursor_position: int = 0
    # Screen width and height in number of characters.
    width: int = 0
    height: int = 0
    # Buffer of literal text lines to be displayed on screen. Each visual line
    # is represented by a ScreenLine instance and will have a max width equal
    # to the screen's width. If any given whole log message requires line
    # wrapping to be displayed it will be represented by multiple ScreenLine
    # instances in this deque.
    line_buffer: collections.deque[ScreenLine] = dataclasses.field(
        default_factory=collections.deque)

    def __post_init__(self) -> None:
        # Empty screen flag. Will be true if the screen contains only newlines.
        self._empty: bool = True
        # Save the last log index when appending. Useful for tracking how many
        # new lines need appending in follow mode.
        self.last_appended_log_index: int = 0

    def _fill_top_with_empty_lines(self) -> None:
        """Add empty lines to fill the remaining empty screen space."""
        missing_lines = self.height - len(self.line_buffer)
        # range() of a non-positive count is empty, so a full (or overfull)
        # buffer is left untouched.
        for _ in range(missing_lines):
            self.line_buffer.appendleft(ScreenLine([('', '')]))

    def clear_screen(self) -> None:
        """Erase all lines and fill with empty lines."""
        self.line_buffer.clear()
        self._fill_top_with_empty_lines()
        self._empty = True

    def empty(self) -> bool:
        """Return True if the screen has no lines with content."""
        return self._empty

    def reset_logs(
        self,
        log_index: int = 0,
    ) -> None:
        """Erase the screen and append logs starting from log_index.

        Args:
          log_index: The index of the log message that should end up on the
              bottom line of the screen.
        """
        self.clear_screen()

        start_log_index, log_source = self.get_log_source()
        if len(log_source) == 0:
            return

        # Append at most the window height number worth of logs. If the
        # number of available logs is less, use that amount.
        max_log_messages_to_fetch = min(self.height, len(log_source))

        # Including the target log_index, fetch the desired logs.
        # For example if we are rendering log_index 10 and the window height is
        # 6 the range below will be:
        #   >>> list(i for i in range((10 - 6) + 1, 10 + 1))
        #   [5, 6, 7, 8, 9, 10]
        # Indexes below start_log_index are invalid logs, so the start of the
        # range is clamped to skip them.
        first_log_index = max((log_index - max_log_messages_to_fetch) + 1,
                              start_log_index)
        for i in range(first_log_index, log_index + 1):
            self.append_log(i)
        # Make sure the bottom line is highlighted.
        self.move_cursor_to_bottom()

    def resize(self, width: int, height: int) -> None:
        """Update screen width and height.

        Following a resize the caller should run reset_logs().

        Args:
          width: New screen width in characters.
          height: New screen height in lines.
        """
        self.width = width
        self.height = height

    def get_lines(
        self,
        marked_logs_start: Optional[int] = None,
        marked_logs_end: Optional[int] = None,
    ) -> List[StyleAndTextTuples]:
        """Return lines for final display.

        Styling is added for the line under the cursor and for lines whose
        log_index falls inside the marked range.

        Args:
          marked_logs_start: First log_index (inclusive) of the marked range,
              or None if nothing is marked.
          marked_logs_end: Last log_index (inclusive) of the marked range, or
              None if nothing is marked.

        Returns:
          One StyleAndTextTuples list per line in the line buffer.
        """
        # Compare against None explicitly: 0 is a valid log index and must not
        # be mistaken for "no mark set".
        if marked_logs_start is None:
            marked_logs_start = -1
        if marked_logs_end is None:
            marked_logs_end = -1

        all_lines: List[StyleAndTextTuples] = []
        # Loop through a copy of the line_buffer in case it is mutated before
        # this function is complete. Only the copied lines are inspected below
        # so the result is consistent with that snapshot.
        for i, line in enumerate(list(self.line_buffer)):
            highlight_style: Optional[str] = None
            if i == self.cursor_position and not line.empty():
                # This line is under the cursor.
                highlight_style = 'class:selected-log-line'
            elif line.log_index is not None and (
                    marked_logs_start <= line.log_index <= marked_logs_end):
                highlight_style = 'class:marked-log-line'

            if highlight_style is None:
                all_lines.append(line.fragments)
                continue

            # Fill in empty characters to the width of the screen. This
            # ensures the background is highlighted to the edge of the
            # screen.
            # NOTE(review): the second argument is the number of fragment
            # tuples, not a character count; confirm this matches what
            # fill_character_width() expects.
            new_fragments = fill_character_width(
                line.fragments,
                len(line.fragments) - 1,  # -1 for the ending line break
                self.width,
            )

            # Apply a style to highlight this line.
            all_lines.append(
                to_formatted_text(new_fragments, style=highlight_style))

        return all_lines

    def _prepend_line(self, line: ScreenLine) -> None:
        """Add a line to the top of the screen."""
        self.line_buffer.appendleft(line)
        self._empty = False

    def _append_line(self, line: ScreenLine) -> None:
        """Add a line to the bottom of the screen."""
        self.line_buffer.append(line)
        self._empty = False

    def _trim_top_lines(self) -> None:
        """Remove lines from the top if larger than the screen height."""
        overflow_amount = len(self.line_buffer) - self.height
        for _ in range(overflow_amount):
            self.line_buffer.popleft()

    def _trim_bottom_lines(self) -> None:
        """Remove lines from the bottom if larger than the screen height."""
        overflow_amount = len(self.line_buffer) - self.height
        for _ in range(overflow_amount):
            self.line_buffer.pop()

    def move_cursor_up(self, line_count: int) -> int:
        """Move the cursor up as far as it can go without fetching new lines.

        Args:
          line_count: A negative number of lines to move the cursor by.

        Returns:
          int: The remaining line count that was not moved. This is the number
          of new lines that need to be fetched and prepended to the screen
          line buffer."""
        remaining_lines = line_count

        # Loop from a negative line_count value to zero.
        # For example if line_count is -5 the loop will traverse:
        #   >>> list(i for i in range(-5, 0, 1))
        #   [-5, -4, -3, -2, -1]
        for _ in range(line_count, 0, 1):
            new_index = self.cursor_position - 1
            if new_index < 0:
                # Already on the top row of the screen.
                break
            if (new_index < len(self.line_buffer)
                    and self.line_buffer[new_index].empty()):
                # The next line is empty and has no content.
                break
            self.cursor_position -= 1
            remaining_lines += 1
        return remaining_lines

    def move_cursor_down(self, line_count: int) -> int:
        """Move the cursor down as far as it can go without fetching new lines.

        Args:
          line_count: A positive number of lines to move the cursor down by.

        Returns:
          int: The remaining line count that was not moved. This is the number
          of new lines that need to be fetched and appended to the screen line
          buffer."""
        remaining_lines = line_count
        for _ in range(line_count):
            new_index = self.cursor_position + 1
            if new_index >= self.height:
                # Already on the bottom row of the screen.
                break
            if (new_index < len(self.line_buffer)
                    and self.line_buffer[new_index].empty()):
                # The next line is empty and has no content.
                break
            self.cursor_position += 1
            remaining_lines -= 1
        return remaining_lines

    def move_cursor_to_bottom(self) -> None:
        """Move the cursor to the bottom of the screen.

        Only use this for movement not initiated by users. For example if new
        logs were just added to the bottom of the screen in follow
        mode. The LogScreen class does not allow scrolling beyond the bottom of
        the content so the cursor will fall on a log message as long as there
        are some log messages. If there are no log messages the line is not
        highlighted by get_lines()."""
        self.cursor_position = self.height - 1

    def move_cursor_to_position(self, window_row: int) -> None:
        """Move the cursor to a line if there is a log message there.

        Rows outside of the screen or the line buffer, and rows holding an
        empty line, leave the cursor where it is.
        """
        visible_rows = min(self.height, len(self.line_buffer))
        if not 0 <= window_row < visible_rows:
            return
        if self.line_buffer[window_row].log_index is not None:
            self.cursor_position = window_row

    def _move_selection_to_log(self, log_index: int, subline: int) -> None:
        """Move the cursor to the location of log_index.

        The cursor is left unchanged if no line on screen matches both
        log_index and subline.
        """
        for i, line in enumerate(self.line_buffer):
            if line.log_index == log_index and line.subline == subline:
                self.cursor_position = i
                return

    def shift_selected_log_to_top(self) -> None:
        """Shift the selected line to the top.

        This moves the lines on screen and keeps the originally selected line
        highlighted. Example use case: when jumping to a search match the
        matched line will be shown at the top of the screen."""
        if not 0 <= self.cursor_position < len(self.line_buffer):
            return

        current_line = self.line_buffer[self.cursor_position]
        amount = max(self.cursor_position, current_line.height)
        amount -= current_line.subline
        remaining_lines = self.scroll_subline(amount)
        if remaining_lines != 0 and current_line.log_index is not None:
            # Restore original selected line.
            self._move_selection_to_log(current_line.log_index,
                                        current_line.subline)
            return
        # Lines scrolled as expected, set cursor_position to top.
        self.cursor_position = 0

    def shift_selected_log_to_center(self) -> None:
        """Shift the selected line to the center.

        This moves the lines on screen and keeps the originally selected line
        highlighted. Example use case: when jumping to a search match the
        matched line will be shown at the center of the screen."""
        if not 0 <= self.cursor_position < len(self.line_buffer):
            return

        half_height = self.height // 2
        current_line = self.line_buffer[self.cursor_position]

        amount = max(self.cursor_position - half_height, current_line.height)
        amount -= current_line.subline

        remaining_lines = self.scroll_subline(amount)
        if remaining_lines != 0 and current_line.log_index is not None:
            # Restore original selected line.
            self._move_selection_to_log(current_line.log_index,
                                        current_line.subline)
            return

        # Lines scrolled as expected, set cursor_position to center.
        self.cursor_position -= amount
        self.cursor_position -= (current_line.height - 1)

    def scroll_subline(self, line_count: int = 1) -> int:
        """Move the cursor down or up by positive or negative lines.

        Args:
          line_count: A positive or negative number of lines the cursor should
              move. Positive for down, negative for up.

        Returns:
          int: The remaining line count that was not moved. This is the number
          of new lines that could not be fetched in the case that the top or
          bottom of available log message lines was reached."""
        # Move self.cursor_position as far as it can go on screen without
        # fetching new log message lines.
        if line_count > 0:
            remaining_lines = self.move_cursor_down(line_count)
        else:
            remaining_lines = self.move_cursor_up(line_count)

        if remaining_lines == 0:
            # No more lines needed, return
            return remaining_lines

        # Top or bottom of the screen was reached, fetch and add new log lines.
        if remaining_lines < 0:
            return self.fetch_subline_up(remaining_lines)
        return self.fetch_subline_down(remaining_lines)

    def fetch_subline_up(self, line_count: int = -1) -> int:
        """Fetch new lines from the top in order of decreasing log_indexes.

        Args:
          line_count: A negative number of lines that should be fetched and
              added to the top of the screen.

        Returns:
          int: The number of lines that were not fetched. Returns 0 if the
          desired number of lines were fetched successfully."""
        start_log_index, _log_source = self.get_log_source()
        remaining_lines = line_count
        for _ in range(line_count, 0, 1):
            current_line = self.get_line_at_cursor_position()
            if current_line.log_index is None:
                # NOTE(review): the line that could not be fetched is counted
                # as handled in the returned value; confirm callers rely on
                # this.
                return remaining_lines + 1

            target_log_index: int
            target_subline: int

            # If the current subline is at the start of this log, fetch the
            # previous log message's last subline.
            if current_line.subline == 0:
                target_log_index = current_line.log_index - 1
                # Set -1 to signal fetching the previous log's last subline
                target_subline = -1
            else:
                # Get previous sub line of current log
                target_log_index = current_line.log_index
                target_subline = current_line.subline - 1

            if target_log_index < start_log_index:
                # Invalid log_index, don't scroll further
                return remaining_lines + 1

            self.prepend_log(target_log_index, subline=target_subline)
            remaining_lines += 1

        return remaining_lines

    def get_line_at_cursor_position(self) -> ScreenLine:
        """Returns the ScreenLine under the cursor.

        An empty ScreenLine is returned if the cursor is outside of the line
        buffer.
        """
        if 0 <= self.cursor_position < len(self.line_buffer):
            return self.line_buffer[self.cursor_position]
        return ScreenLine([('', '')])

    def fetch_subline_down(self, line_count: int = 1) -> int:
        """Fetch new lines from the bottom in order of increasing log_indexes.

        Args:
          line_count: A positive number of lines that should be fetched and
              added to the bottom of the screen.

        Returns:
          int: The number of lines that were not fetched. Returns 0 if the
          desired number of lines were fetched successfully."""
        _start_log_index, log_source = self.get_log_source()
        remaining_lines = line_count
        for _ in range(line_count):
            # Skip this line if not at the bottom
            if self.cursor_position < self.height - 1:
                self.cursor_position += 1
                continue

            current_line = self.get_line_at_cursor_position()
            if current_line.log_index is None:
                # NOTE(review): the line that could not be fetched is counted
                # as handled in the returned value; confirm callers rely on
                # this.
                return remaining_lines - 1

            target_log_index: int
            target_subline: int

            # If the current subline is at the height of this log, fetch the
            # next log message.
            if current_line.subline == current_line.height - 1:
                # Get next log's first subline
                target_log_index = current_line.log_index + 1
                target_subline = 0
            else:
                # Get next sub line of current log
                target_log_index = current_line.log_index
                target_subline = current_line.subline + 1

            if target_log_index >= len(log_source):
                # Invalid log_index, don't scroll further
                return remaining_lines - 1

            self.append_log(target_log_index, subline=target_subline)
            remaining_lines -= 1

        return remaining_lines

    def first_rendered_log_index(self) -> Optional[int]:
        """Scan the screen for the first valid log_index and return it.

        Returns None if none of the first `height` lines hold a log message.
        """
        for row, line in enumerate(self.line_buffer):
            if row >= self.height:
                break
            if line.log_index is not None:
                return line.log_index
        return None

    def last_rendered_log_index(self) -> Optional[int]:
        """Return the last log_index shown on screen.

        Returns None if the line buffer is empty or its last line is an empty
        line.
        """
        if not self.line_buffer:
            return None
        return self.line_buffer[-1].log_index

    def _get_fragments_per_line(self,
                                log_index: int) -> List[StyleAndTextTuples]:
        """Return a list of lines wrapped to the screen width for a log.

        Before fetching the log message this function updates the log_source
        and formatting options.

        Args:
          log_index: Index of the log message in the current log source.

        Returns:
          One StyleAndTextTuples list per display line, or an empty list if
          log_index is outside of the log source.
        """
        _start_log_index, log_source = self.get_log_source()
        # Reject negative indexes as well: they would silently wrap around to
        # the end of the deque and render the wrong log message.
        if not 0 <= log_index < len(log_source):
            return []
        log = log_source[log_index]
        table_formatter = self.get_log_formatter()
        truncate_lines = not self.get_line_wrapping()
        search_filter = self.get_search_filter()
        search_highlight = self.get_search_highlight()

        # Select the log display formatter; table or standard.
        fragments: StyleAndTextTuples = []
        if table_formatter:
            fragments = table_formatter(log)
        else:
            fragments = log.get_fragments()

        # Apply search term highlighting.
        if search_filter and search_highlight and search_filter.matches(log):
            fragments = search_filter.highlight_search_matches(fragments)

        # Word wrap the log message or truncate to screen width
        line_fragments, _log_line_height = insert_linebreaks(
            fragments,
            max_line_width=self.width,
            truncate_long_lines=truncate_lines)
        # Convert the existing flattened fragments to a list of lines.
        fragments_per_line = split_lines(line_fragments)

        return fragments_per_line

    def prepend_log(
        self,
        log_index: int,
        subline: Optional[int] = None,
    ) -> None:
        """Add a log message or a single line to the top of the screen.

        Args:
          log_index: The index of the log message to fetch.
          subline: The desired subline of the log message. When displayed on
              screen the log message may take up more than one line. If
              subline is 0 or higher that line will be added. If subline is -1
              the last subline will be prepended regardless of the total log
              message height. If subline is None every line of the log message
              is added.
        """
        fragments_per_line = self._get_fragments_per_line(log_index)
        log_height = len(fragments_per_line)

        # Target the last subline if the subline arg is set to -1.
        target_subline = log_height - 1 if subline == -1 else subline

        # Walk the sublines bottom-up. Each line is pushed onto the top of the
        # buffer, so the last subline must go in first for a whole log message
        # to read top-to-bottom on screen.
        for line_index in reversed(range(log_height)):
            # If we are looking for a specific subline and this isn't it, skip.
            if target_subline is not None and line_index != target_subline:
                continue

            self._prepend_line(
                ScreenLine(
                    fragments=fragments_per_line[line_index],
                    log_index=log_index,
                    subline=line_index,
                    height=log_height,
                ))

        # Remove lines from the bottom if over the screen height.
        if len(self.line_buffer) > self.height:
            self._trim_bottom_lines()

    def append_log(
        self,
        log_index: int,
        subline: Optional[int] = None,
    ) -> None:
        """Add a log message or a single line to the bottom of the screen.

        Args:
          log_index: The index of the log message to fetch.
          subline: If set, only the line with this zero-based subline index is
              added. If None every line of the log message is added.
        """
        # Save this log_index
        self.last_appended_log_index = log_index
        fragments_per_line = self._get_fragments_per_line(log_index)
        log_height = len(fragments_per_line)

        for line_index, line in enumerate(fragments_per_line):
            # If we are looking for a specific subline and this isn't it, skip.
            if subline is not None and line_index != subline:
                continue

            self._append_line(
                ScreenLine(
                    fragments=line,
                    log_index=log_index,
                    subline=line_index,
                    height=log_height,
                ))

        # Remove lines from the top if over the screen height.
        if len(self.line_buffer) > self.height:
            self._trim_top_lines()