1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""LogView maintains a log pane's scrolling and searching state.""" 15 16from __future__ import annotations 17import asyncio 18import collections 19import copy 20from enum import Enum 21import itertools 22import logging 23import operator 24from pathlib import Path 25import re 26import time 27from typing import Callable, Dict, List, Optional, Tuple, TYPE_CHECKING 28 29from prompt_toolkit.data_structures import Point 30from prompt_toolkit.formatted_text import StyleAndTextTuples 31 32from pw_console.log_filter import ( 33 DEFAULT_SEARCH_MATCHER, 34 LogFilter, 35 RegexValidator, 36 SearchMatcher, 37 preprocess_search_regex, 38) 39from pw_console.log_screen import ScreenLine, LogScreen 40from pw_console.log_store import LogStore 41from pw_console.text_formatting import remove_formatting 42 43if TYPE_CHECKING: 44 from pw_console.console_app import ConsoleApp 45 from pw_console.log_line import LogLine 46 from pw_console.log_pane import LogPane 47 48_LOG = logging.getLogger(__package__) 49 50 51class FollowEvent(Enum): 52 """Follow mode scroll event types.""" 53 SEARCH_MATCH = 'scroll_to_bottom' 54 STICKY_FOLLOW = 'scroll_to_bottom_with_sticky_follow' 55 56 57class LogView: 58 """Viewing window into a LogStore.""" 59 60 # pylint: disable=too-many-instance-attributes,too-many-public-methods 61 62 def __init__( 63 self, 64 log_pane: 'LogPane', 65 application: 'ConsoleApp', 66 log_store: Optional[LogStore] = None, 67 ): 68 # Parent LogPane reference. Updated by calling `set_log_pane()`. 69 self.log_pane = log_pane 70 self.log_store = log_store if log_store else LogStore( 71 prefs=application.prefs) 72 self.log_store.set_prefs(application.prefs) 73 self.log_store.register_viewer(self) 74 75 self.marked_logs_start: Optional[int] = None 76 self.marked_logs_end: Optional[int] = None 77 78 # Search variables 79 self.search_text: Optional[str] = None 80 self.search_filter: Optional[LogFilter] = None 81 self.search_highlight: bool = False 82 self.search_matcher = DEFAULT_SEARCH_MATCHER 83 self.search_validator = RegexValidator() 84 85 # Container for each log_index matched by active searches. 86 self.search_matched_lines: Dict[int, int] = {} 87 # Background task to find historical matched lines. 88 self.search_match_count_task: Optional[asyncio.Task] = None 89 90 # Flag for automatically jumping to each new search match as they 91 # appear. 92 self.follow_search_match: bool = False 93 self.last_search_matched_log: Optional[int] = None 94 95 # Follow event flag. This is set to by the new_logs_arrived() function 96 # as a signal that the log screen should be scrolled to the bottom. 97 # This is read by render_content() whenever the screen is drawn. 98 self.follow_event: Optional[FollowEvent] = None 99 100 self.log_screen = LogScreen( 101 get_log_source=self._get_log_lines, 102 get_line_wrapping=self.wrap_lines_enabled, 103 get_log_formatter=self._get_table_formatter, 104 get_search_filter=lambda: self.search_filter, 105 get_search_highlight=lambda: self.search_highlight, 106 ) 107 108 # Filter 109 self.filtering_on: bool = False 110 self.filters: 'collections.OrderedDict[str, LogFilter]' = ( 111 collections.OrderedDict()) 112 self.filtered_logs: collections.deque = collections.deque() 113 self.filter_existing_logs_task: Optional[asyncio.Task] = None 114 115 # Current log line index state variables: 116 self._last_log_index = -1 117 self._log_index = 0 118 self._filtered_log_index = 0 119 self._last_start_index = 0 120 self._last_end_index = 0 121 self._current_start_index = 0 122 self._current_end_index = 0 123 self._scrollback_start_index = 0 124 125 # LogPane prompt_toolkit container render size. 126 self._window_height = 20 127 self._window_width = 80 128 self._reset_log_screen_on_next_render: bool = True 129 self._user_scroll_event: bool = False 130 131 # Max frequency in seconds of prompt_toolkit UI redraws triggered by new 132 # log lines. 133 self._ui_update_frequency = 0.05 134 self._last_ui_update_time = time.time() 135 self._last_log_store_index = 0 136 self._new_logs_since_last_render = True 137 138 # Should new log lines be tailed? 139 self.follow: bool = True 140 141 self.visual_select_mode: bool = False 142 143 # Cache of formatted text tuples used in the last UI render. 144 self._line_fragment_cache: List[StyleAndTextTuples] = [] 145 146 def view_mode_changed(self) -> None: 147 self._reset_log_screen_on_next_render = True 148 149 @property 150 def log_index(self): 151 if self.filtering_on: 152 return self._filtered_log_index 153 return self._log_index 154 155 @log_index.setter 156 def log_index(self, new_log_index): 157 # Save the old log_index 158 self._last_log_index = self.log_index 159 if self.filtering_on: 160 self._filtered_log_index = new_log_index 161 else: 162 self._log_index = new_log_index 163 164 def _reset_log_index_changed(self) -> None: 165 self._last_log_index = self.log_index 166 167 def log_index_changed_since_last_render(self) -> bool: 168 return self._last_log_index != self.log_index 169 170 def _set_match_position(self, position: int): 171 self.follow = False 172 self.log_index = position 173 self.save_search_matched_line(position) 174 self.log_screen.reset_logs(log_index=self.log_index) 175 self.log_screen.shift_selected_log_to_center() 176 self._user_scroll_event = True 177 self.log_pane.application.redraw_ui() 178 179 def select_next_search_matcher(self): 180 matchers = list(SearchMatcher) 181 index = matchers.index(self.search_matcher) 182 new_index = (index + 1) % len(matchers) 183 self.search_matcher = matchers[new_index] 184 185 def search_forwards(self): 186 if not self.search_filter: 187 return 188 self.search_highlight = True 189 190 log_beginning_index = self.hidden_line_count() 191 192 starting_index = self.log_index + 1 193 if starting_index > self.get_last_log_index(): 194 starting_index = log_beginning_index 195 196 _, logs = self._get_log_lines() 197 198 # From current position +1 and down 199 for i in range(starting_index, self.get_last_log_index() + 1): 200 if self.search_filter.matches(logs[i]): 201 self._set_match_position(i) 202 return 203 204 # From the beginning to the original start 205 for i in range(log_beginning_index, starting_index): 206 if self.search_filter.matches(logs[i]): 207 self._set_match_position(i) 208 return 209 210 def search_backwards(self): 211 if not self.search_filter: 212 return 213 self.search_highlight = True 214 215 log_beginning_index = self.hidden_line_count() 216 217 starting_index = self.log_index - 1 218 if starting_index < 0: 219 starting_index = self.get_last_log_index() 220 221 _, logs = self._get_log_lines() 222 223 # From current position - 1 and up 224 for i in range(starting_index, log_beginning_index - 1, -1): 225 if self.search_filter.matches(logs[i]): 226 self._set_match_position(i) 227 return 228 229 # From the end to the original start 230 for i in range(self.get_last_log_index(), starting_index, -1): 231 if self.search_filter.matches(logs[i]): 232 self._set_match_position(i) 233 return 234 235 def set_search_regex(self, 236 text, 237 invert, 238 field, 239 matcher: Optional[SearchMatcher] = None) -> bool: 240 search_matcher = matcher if matcher else self.search_matcher 241 _LOG.debug(search_matcher) 242 243 regex_text, regex_flags = preprocess_search_regex( 244 text, matcher=search_matcher) 245 246 try: 247 compiled_regex = re.compile(regex_text, regex_flags) 248 self.search_filter = LogFilter( 249 regex=compiled_regex, 250 input_text=text, 251 invert=invert, 252 field=field, 253 ) 254 _LOG.debug(self.search_filter) 255 except re.error as error: 256 _LOG.debug(error) 257 return False 258 259 self.search_highlight = True 260 self.search_text = regex_text 261 return True 262 263 def new_search( 264 self, 265 text, 266 invert=False, 267 field: Optional[str] = None, 268 search_matcher: Optional[str] = None, 269 interactive: bool = True, 270 ) -> bool: 271 """Start a new search for the given text.""" 272 valid_matchers = list(s.name for s in SearchMatcher) 273 selected_matcher: Optional[SearchMatcher] = None 274 if (search_matcher is not None 275 and search_matcher.upper() in valid_matchers): 276 selected_matcher = SearchMatcher(search_matcher.upper()) 277 278 if not self.set_search_regex(text, invert, field, selected_matcher): 279 return False 280 281 # Clear matched lines 282 self.search_matched_lines = {} 283 284 if interactive: 285 # Start count historical search matches task. 286 self.search_match_count_task = asyncio.create_task( 287 self.count_search_matches()) 288 289 # Default search direction when hitting enter in the search bar. 290 if interactive: 291 self.search_forwards() 292 return True 293 294 def save_search_matched_line(self, log_index: int) -> None: 295 """Save the log_index at position as a matched line.""" 296 self.search_matched_lines[log_index] = 0 297 # Keep matched lines sorted by position 298 self.search_matched_lines = { 299 # Save this log_index and its match number. 300 log_index: match_number 301 for match_number, log_index in enumerate( 302 sorted(self.search_matched_lines.keys())) 303 } 304 305 def disable_search_highlighting(self): 306 self.log_pane.log_view.search_highlight = False 307 308 def _restart_filtering(self): 309 # Turn on follow 310 if not self.follow: 311 self.toggle_follow() 312 313 # Reset filtered logs. 314 self.filtered_logs.clear() 315 # Reset scrollback start 316 self._scrollback_start_index = 0 317 318 # Start filtering existing log lines. 319 self.filter_existing_logs_task = asyncio.create_task( 320 self.filter_past_logs()) 321 322 # Reset existing search 323 self.clear_search() 324 325 # Trigger a main menu update to set log window menu titles. 326 self.log_pane.application.update_menu_items() 327 # Redraw the UI 328 self.log_pane.application.redraw_ui() 329 330 def install_new_filter(self): 331 """Set a filter using the current search_regex.""" 332 if not self.search_filter: 333 return 334 335 self.filtering_on = True 336 self.filters[self.search_text] = copy.deepcopy(self.search_filter) 337 338 self.clear_search() 339 340 def apply_filter(self): 341 """Set new filter and schedule historical log filter asyncio task.""" 342 self.install_new_filter() 343 self._restart_filtering() 344 345 def clear_search_highlighting(self): 346 self.search_highlight = False 347 self._reset_log_screen_on_next_render = True 348 349 def clear_search(self): 350 self.search_matched_lines = {} 351 self.search_text = None 352 self.search_filter = None 353 self.search_highlight = False 354 self._reset_log_screen_on_next_render = True 355 356 def _get_log_lines(self) -> Tuple[int, collections.deque[LogLine]]: 357 logs = self.log_store.logs 358 if self.filtering_on: 359 logs = self.filtered_logs 360 return self._scrollback_start_index, logs 361 362 def _get_visible_log_lines(self): 363 _, logs = self._get_log_lines() 364 if self._scrollback_start_index > 0: 365 return collections.deque( 366 itertools.islice(logs, self.hidden_line_count(), len(logs))) 367 return logs 368 369 def _get_table_formatter(self) -> Optional[Callable]: 370 table_formatter = None 371 if self.log_pane.table_view: 372 table_formatter = self.log_store.table.formatted_row 373 return table_formatter 374 375 def delete_filter(self, filter_text): 376 if filter_text not in self.filters: 377 return 378 379 # Delete this filter 380 del self.filters[filter_text] 381 382 # If no filters left, stop filtering. 383 if len(self.filters) == 0: 384 self.clear_filters() 385 else: 386 # Erase existing filtered lines. 387 self._restart_filtering() 388 389 def clear_filters(self): 390 if not self.filtering_on: 391 return 392 self.clear_search() 393 self.filtering_on = False 394 self.filters: 'collections.OrderedDict[str, re.Pattern]' = ( 395 collections.OrderedDict()) 396 self.filtered_logs.clear() 397 # Reset scrollback start 398 self._scrollback_start_index = 0 399 if not self.follow: 400 self.toggle_follow() 401 402 async def count_search_matches(self): 403 """Count search matches and save their locations.""" 404 # Wait for any filter_existing_logs_task to finish. 405 if self.filtering_on and self.filter_existing_logs_task: 406 await self.filter_existing_logs_task 407 408 starting_index = self.get_last_log_index() 409 ending_index, logs = self._get_log_lines() 410 411 # From the end of the log store to the beginning. 412 for i in range(starting_index, ending_index - 1, -1): 413 # Is this log a match? 414 if self.search_filter.matches(logs[i]): 415 self.save_search_matched_line(i) 416 # Pause every 100 lines or so 417 if i % 100 == 0: 418 await asyncio.sleep(.1) 419 420 async def filter_past_logs(self): 421 """Filter past log lines.""" 422 starting_index = self.log_store.get_last_log_index() 423 ending_index = -1 424 425 # From the end of the log store to the beginning. 426 for i in range(starting_index, ending_index, -1): 427 # Is this log a match? 428 if self.filter_scan(self.log_store.logs[i]): 429 # Add to the beginning of the deque. 430 self.filtered_logs.appendleft(self.log_store.logs[i]) 431 # TODO(tonymd): Tune these values. 432 # Pause every 100 lines or so 433 if i % 100 == 0: 434 await asyncio.sleep(.1) 435 436 def set_log_pane(self, log_pane: 'LogPane'): 437 """Set the parent LogPane instance.""" 438 self.log_pane = log_pane 439 440 def _update_log_index(self) -> ScreenLine: 441 line_at_cursor = self.log_screen.get_line_at_cursor_position() 442 if line_at_cursor.log_index is not None: 443 self.log_index = line_at_cursor.log_index 444 return line_at_cursor 445 446 def get_current_line(self) -> int: 447 """Return the currently selected log event index.""" 448 return self.log_index 449 450 def get_total_count(self): 451 """Total size of the logs store.""" 452 return (len(self.filtered_logs) 453 if self.filtering_on else self.log_store.get_total_count()) 454 455 def get_last_log_index(self): 456 total = self.get_total_count() 457 return 0 if total < 0 else total - 1 458 459 def clear_scrollback(self): 460 """Hide log lines before the max length of the stored logs.""" 461 # Enable follow and scroll to the bottom, then clear. 462 if not self.follow: 463 self.toggle_follow() 464 self._scrollback_start_index = self.log_index 465 self._reset_log_screen_on_next_render = True 466 467 def hidden_line_count(self): 468 """Return the number of hidden lines.""" 469 if self._scrollback_start_index > 0: 470 return self._scrollback_start_index + 1 471 return 0 472 473 def undo_clear_scrollback(self): 474 """Reset the current scrollback start index.""" 475 self._scrollback_start_index = 0 476 477 def wrap_lines_enabled(self): 478 """Get the parent log pane wrap lines setting.""" 479 if not self.log_pane: 480 return False 481 return self.log_pane.wrap_lines 482 483 def toggle_follow(self): 484 """Toggle auto line following.""" 485 self.follow = not self.follow 486 if self.follow: 487 # Disable search match follow mode. 488 self.follow_search_match = False 489 self.scroll_to_bottom() 490 491 def filter_scan(self, log: 'LogLine'): 492 filter_match_count = 0 493 for _filter_text, log_filter in self.filters.items(): 494 if log_filter.matches(log): 495 filter_match_count += 1 496 else: 497 break 498 499 if filter_match_count == len(self.filters): 500 return True 501 return False 502 503 def new_logs_arrived(self): 504 """Check newly arrived log messages. 505 506 Depending on where log statements occur ``new_logs_arrived`` may be in a 507 separate thread since it is triggerd by the Python log handler 508 ``emit()`` function. In this case the log handler is the LogStore 509 instance ``self.log_store``. This function should not redraw the screen 510 or scroll. 511 """ 512 latest_total = self.log_store.get_total_count() 513 514 if self.filtering_on: 515 # Scan newly arived log lines 516 for i in range(self._last_log_store_index, latest_total): 517 if self.filter_scan(self.log_store.logs[i]): 518 self.filtered_logs.append(self.log_store.logs[i]) 519 520 if self.search_filter: 521 last_matched_log: Optional[int] = None 522 # Scan newly arived log lines 523 for i in range(self._last_log_store_index, latest_total): 524 if self.search_filter.matches(self.log_store.logs[i]): 525 self.save_search_matched_line(i) 526 last_matched_log = i 527 if last_matched_log and self.follow_search_match: 528 # Set the follow event flag for the next render_content call. 529 self.follow_event = FollowEvent.SEARCH_MATCH 530 self.last_search_matched_log = last_matched_log 531 532 self._last_log_store_index = latest_total 533 self._new_logs_since_last_render = True 534 535 if self.follow: 536 # Set the follow event flag for the next render_content call. 537 self.follow_event = FollowEvent.STICKY_FOLLOW 538 539 # Trigger a UI update 540 self._update_prompt_toolkit_ui() 541 542 def _update_prompt_toolkit_ui(self): 543 """Update Prompt Toolkit UI if a certain amount of time has passed.""" 544 emit_time = time.time() 545 # Has enough time passed since last UI redraw? 546 if emit_time > self._last_ui_update_time + self._ui_update_frequency: 547 # Update last log time 548 self._last_ui_update_time = emit_time 549 550 # Trigger Prompt Toolkit UI redraw. 551 self.log_pane.application.redraw_ui() 552 553 def get_cursor_position(self) -> Point: 554 """Return the position of the cursor.""" 555 return Point(0, self.log_screen.cursor_position) 556 557 def scroll_to_top(self): 558 """Move selected index to the beginning.""" 559 # Stop following so cursor doesn't jump back down to the bottom. 560 self.follow = False 561 # First possible log index that should be displayed 562 log_beginning_index = self.hidden_line_count() 563 self.log_index = log_beginning_index 564 self.log_screen.reset_logs(log_index=self.log_index) 565 self.log_screen.shift_selected_log_to_top() 566 self._user_scroll_event = True 567 568 def move_selected_line_to_top(self): 569 self.follow = False 570 571 # Update selected line 572 self._update_log_index() 573 574 self.log_screen.reset_logs(log_index=self.log_index) 575 self.log_screen.shift_selected_log_to_top() 576 self._user_scroll_event = True 577 578 def center_log_line(self): 579 self.follow = False 580 581 # Update selected line 582 self._update_log_index() 583 584 self.log_screen.reset_logs(log_index=self.log_index) 585 self.log_screen.shift_selected_log_to_center() 586 self._user_scroll_event = True 587 588 def scroll_to_bottom(self, with_sticky_follow: bool = True): 589 """Move selected index to the end.""" 590 # Don't change following state like scroll_to_top. 591 self.log_index = max(0, self.get_last_log_index()) 592 self.log_screen.reset_logs(log_index=self.log_index) 593 594 # Sticky follow mode 595 if with_sticky_follow: 596 self.follow = True 597 self._user_scroll_event = True 598 599 def scroll(self, lines) -> None: 600 """Scroll up or down by plus or minus lines. 601 602 This method is only called by user keybindings. 603 """ 604 # If the user starts scrolling, stop auto following. 605 self.follow = False 606 607 self.log_screen.scroll_subline(lines) 608 self._user_scroll_event = True 609 610 # Update the current log 611 current_line = self._update_log_index() 612 613 # Don't check for sticky follow mode if selecting lines. 614 if self.visual_select_mode: 615 return 616 # Is the last log line selected? 617 if self.log_index == self.get_last_log_index(): 618 # Is the last line of the current log selected? 619 if current_line.subline + 1 == current_line.height: 620 # Sticky follow mode 621 self.follow = True 622 623 def visual_selected_log_count(self) -> int: 624 if self.marked_logs_start is None or self.marked_logs_end is None: 625 return 0 626 return (self.marked_logs_end - self.marked_logs_start) + 1 627 628 def clear_visual_selection(self) -> None: 629 self.marked_logs_start = None 630 self.marked_logs_end = None 631 self.visual_select_mode = False 632 self._user_scroll_event = True 633 self.log_pane.application.redraw_ui() 634 635 def visual_select_all(self) -> None: 636 self.marked_logs_start = self._scrollback_start_index 637 self.marked_logs_end = self.get_total_count() - 1 638 639 self.visual_select_mode = True 640 self._user_scroll_event = True 641 self.log_pane.application.redraw_ui() 642 643 def visual_select_up(self) -> None: 644 # Select the current line 645 self.visual_select_line(self.get_cursor_position(), autoscroll=False) 646 # Move the cursor by 1 647 self.scroll_up(1) 648 # Select the new line 649 self.visual_select_line(self.get_cursor_position(), autoscroll=False) 650 651 def visual_select_down(self) -> None: 652 # Select the current line 653 self.visual_select_line(self.get_cursor_position(), autoscroll=False) 654 # Move the cursor by 1 655 self.scroll_down(1) 656 # Select the new line 657 self.visual_select_line(self.get_cursor_position(), autoscroll=False) 658 659 def visual_select_line(self, 660 mouse_position: Point, 661 autoscroll: bool = True) -> None: 662 """Mark the log under mouse_position as visually selected.""" 663 # Check mouse_position is valid 664 if not 0 <= mouse_position.y < len(self.log_screen.line_buffer): 665 return 666 # Update mode flags 667 self.visual_select_mode = True 668 self.follow = False 669 # Get the ScreenLine for the cursor position 670 screen_line = self.log_screen.line_buffer[mouse_position.y] 671 if screen_line.log_index is None: 672 return 673 674 if self.marked_logs_start is None: 675 self.marked_logs_start = screen_line.log_index 676 if self.marked_logs_end is None: 677 self.marked_logs_end = screen_line.log_index 678 679 if screen_line.log_index < self.marked_logs_start: 680 self.marked_logs_start = screen_line.log_index 681 elif screen_line.log_index > self.marked_logs_end: 682 self.marked_logs_end = screen_line.log_index 683 684 # Update cursor position 685 self.log_screen.move_cursor_to_position(mouse_position.y) 686 687 # Autoscroll when mouse dragging on the top or bottom of the window. 688 if autoscroll: 689 if mouse_position.y == 0: 690 self.scroll_up(1) 691 elif mouse_position.y == self._window_height - 1: 692 self.scroll_down(1) 693 694 # Trigger a rerender. 695 self._user_scroll_event = True 696 self.log_pane.application.redraw_ui() 697 698 def scroll_to_position(self, mouse_position: Point): 699 """Set the selected log line to the mouse_position.""" 700 # Disable follow mode when the user clicks or mouse drags on a log line. 701 self.follow = False 702 703 self.log_screen.move_cursor_to_position(mouse_position.y) 704 self._update_log_index() 705 706 self._user_scroll_event = True 707 708 def scroll_up_one_page(self): 709 """Move the selected log index up by one window height.""" 710 lines = 1 711 if self._window_height > 0: 712 lines = self._window_height 713 self.scroll(-1 * lines) 714 715 def scroll_down_one_page(self): 716 """Move the selected log index down by one window height.""" 717 lines = 1 718 if self._window_height > 0: 719 lines = self._window_height 720 self.scroll(lines) 721 722 def scroll_down(self, lines=1): 723 """Move the selected log index down by one or more lines.""" 724 self.scroll(lines) 725 726 def scroll_up(self, lines=1): 727 """Move the selected log index up by one or more lines.""" 728 self.scroll(-1 * lines) 729 730 def log_start_end_indexes_changed(self) -> bool: 731 return (self._last_start_index != self._current_start_index 732 or self._last_end_index != self._current_end_index) 733 734 def render_table_header(self): 735 """Get pre-formatted table header.""" 736 return self.log_store.render_table_header() 737 738 def render_content(self) -> list: 739 """Return logs to display on screen as a list of FormattedText tuples. 740 741 This function determines when the log screen requires re-rendeing based 742 on user scroll events, follow mode being on, or log pane being 743 empty. The FormattedText tuples passed to prompt_toolkit are cached if 744 no updates are required. 745 """ 746 screen_update_needed = False 747 748 # Check window size 749 if self.log_pane.pane_resized(): 750 self._window_width = self.log_pane.current_log_pane_width 751 self._window_height = self.log_pane.current_log_pane_height 752 self.log_screen.resize(self._window_width, self._window_height) 753 self._reset_log_screen_on_next_render = True 754 755 if self.follow_event is not None: 756 if (self.follow_event == FollowEvent.SEARCH_MATCH 757 and self.last_search_matched_log): 758 self.log_index = self.last_search_matched_log 759 self.last_search_matched_log = None 760 self._reset_log_screen_on_next_render = True 761 762 elif self.follow_event == FollowEvent.STICKY_FOLLOW: 763 # Jump to the last log message 764 self.log_index = max(0, self.get_last_log_index()) 765 766 self.follow_event = None 767 screen_update_needed = True 768 769 if self._reset_log_screen_on_next_render or self.log_screen.empty(): 770 # Clear the reset flag. 771 self._reset_log_screen_on_next_render = False 772 self.log_screen.reset_logs(log_index=self.log_index) 773 screen_update_needed = True 774 775 elif self.follow and self._new_logs_since_last_render: 776 # Follow mode is on so add new logs to the screen 777 self._new_logs_since_last_render = False 778 779 current_log_index = self.log_index 780 last_rendered_log_index = self.log_screen.last_appended_log_index 781 # If so many logs have arrived than can fit on the screen, redraw 782 # the whole screen from the new position. 783 if (current_log_index - 784 last_rendered_log_index) > self.log_screen.height: 785 self.log_screen.reset_logs(log_index=self.log_index) 786 # A small amount of logs have arrived, append them one at a time 787 # without redrawing the whole screen. 788 else: 789 for i in range(last_rendered_log_index + 1, 790 current_log_index + 1): 791 self.log_screen.append_log(i) 792 793 screen_update_needed = True 794 795 if self.follow: 796 # Select the last line for follow mode. 797 self.log_screen.move_cursor_to_bottom() 798 screen_update_needed = True 799 800 if self._user_scroll_event: 801 self._user_scroll_event = False 802 screen_update_needed = True 803 804 if screen_update_needed: 805 self._line_fragment_cache = self.log_screen.get_lines( 806 marked_logs_start=self.marked_logs_start, 807 marked_logs_end=self.marked_logs_end, 808 ) 809 return self._line_fragment_cache 810 811 def _logs_to_text( 812 self, 813 use_table_formatting: bool = True, 814 selected_lines_only: bool = False, 815 ) -> str: 816 """Convert all or selected log messages to plaintext.""" 817 def get_table_string(log: LogLine) -> str: 818 return remove_formatting(self.log_store.table.formatted_row(log)) 819 820 formatter: Callable[[LogLine], 821 str] = operator.attrgetter('ansi_stripped_log') 822 if use_table_formatting: 823 formatter = get_table_string 824 825 _start_log_index, log_source = self._get_log_lines() 826 827 log_index_range = range(self._scrollback_start_index, 828 self.get_total_count()) 829 if (selected_lines_only and self.marked_logs_start is not None 830 and self.marked_logs_end is not None): 831 log_index_range = range(self.marked_logs_start, 832 self.marked_logs_end + 1) 833 834 text_output = '' 835 for i in log_index_range: 836 log_text = formatter(log_source[i]) 837 text_output += log_text 838 if not log_text.endswith('\n'): 839 text_output += '\n' 840 841 return text_output 842 843 def export_logs( 844 self, 845 use_table_formatting: bool = True, 846 selected_lines_only: bool = False, 847 file_name: Optional[str] = None, 848 to_clipboard: bool = False, 849 add_markdown_fence: bool = False, 850 ) -> bool: 851 """Export log lines to file or clipboard.""" 852 text_output = self._logs_to_text(use_table_formatting, 853 selected_lines_only) 854 855 if file_name: 856 target_path = Path(file_name).expanduser() 857 with target_path.open('w') as output_file: 858 output_file.write(text_output) 859 _LOG.debug('Saved to file: %s', file_name) 860 861 elif to_clipboard: 862 if add_markdown_fence: 863 text_output = '```\n' + text_output + '```\n' 864 self.log_pane.application.application.clipboard.set_text( 865 text_output) 866 _LOG.debug('Copied logs to clipboard.') 867 868 return True 869