1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""LogView maintains a log pane's scrolling and searching state.""" 15 16from __future__ import annotations 17import asyncio 18import collections 19import copy 20from enum import Enum 21import itertools 22import json 23import logging 24import operator 25from pathlib import Path 26import re 27from threading import Thread 28from typing import Callable, TYPE_CHECKING 29 30from prompt_toolkit.data_structures import Point 31from prompt_toolkit.formatted_text import StyleAndTextTuples 32import websockets 33 34from pw_console.log_filter import ( 35 DEFAULT_SEARCH_MATCHER, 36 LogFilter, 37 RegexValidator, 38 SearchMatcher, 39 preprocess_search_regex, 40) 41from pw_console.log_screen import ScreenLine, LogScreen 42from pw_console.log_store import LogStore 43from pw_console.python_logging import log_record_to_json 44from pw_console.text_formatting import remove_formatting 45 46if TYPE_CHECKING: 47 from pw_console.console_app import ConsoleApp 48 from pw_console.log_line import LogLine 49 from pw_console.log_pane import LogPane 50 51_LOG = logging.getLogger(__package__) 52 53 54class FollowEvent(Enum): 55 """Follow mode scroll event types.""" 56 57 SEARCH_MATCH = 'scroll_to_bottom' 58 STICKY_FOLLOW = 'scroll_to_bottom_with_sticky_follow' 59 60 61class LogView: 62 """Viewing window into a LogStore.""" 63 64 # pylint: disable=too-many-instance-attributes,too-many-public-methods 65 66 def __init__( 67 self, 68 log_pane: LogPane, 69 application: ConsoleApp, 70 log_store: LogStore | None = None, 71 ): 72 # Parent LogPane reference. Updated by calling `set_log_pane()`. 73 self.log_pane = log_pane 74 self.log_store = ( 75 log_store if log_store else LogStore(prefs=application.prefs) 76 ) 77 self.log_store.set_prefs(application.prefs) 78 self.log_store.register_viewer(self) 79 80 self.marked_logs_start: int | None = None 81 self.marked_logs_end: int | None = None 82 83 # Search variables 84 self.search_text: str | None = None 85 self.search_filter: LogFilter | None = None 86 self.search_highlight: bool = False 87 self.search_matcher = DEFAULT_SEARCH_MATCHER 88 self.search_validator = RegexValidator() 89 90 # Container for each log_index matched by active searches. 91 self.search_matched_lines: dict[int, int] = {} 92 # Background task to find historical matched lines. 93 self.search_match_count_task: asyncio.Task | None = None 94 95 # Flag for automatically jumping to each new search match as they 96 # appear. 97 self.follow_search_match: bool = False 98 self.last_search_matched_log: int | None = None 99 100 # Follow event flag. This is set to by the new_logs_arrived() function 101 # as a signal that the log screen should be scrolled to the bottom. 102 # This is read by render_content() whenever the screen is drawn. 103 self.follow_event: FollowEvent | None = None 104 105 self.log_screen = LogScreen( 106 get_log_source=self._get_log_lines, 107 get_line_wrapping=self.wrap_lines_enabled, 108 get_log_formatter=self._get_table_formatter, 109 get_search_filter=lambda: self.search_filter, 110 get_search_highlight=lambda: self.search_highlight, 111 ) 112 113 # Filter 114 self.filtering_on: bool = False 115 self.filters: collections.OrderedDict[ 116 str, LogFilter 117 ] = collections.OrderedDict() 118 self.filtered_logs: collections.deque = collections.deque() 119 self.filter_existing_logs_task: asyncio.Task | None = None 120 121 # Current log line index state variables: 122 self._last_log_index = -1 123 self._log_index = 0 124 self._filtered_log_index = 0 125 self._last_start_index = 0 126 self._last_end_index = 0 127 self._current_start_index = 0 128 self._current_end_index = 0 129 self._scrollback_start_index = 0 130 131 # LogPane prompt_toolkit container render size. 132 self._window_height = 20 133 self._window_width = 80 134 self._reset_log_screen_on_next_render: bool = True 135 self._user_scroll_event: bool = False 136 137 self._last_log_store_index = 0 138 self._new_logs_since_last_render = True 139 self._new_logs_since_last_websocket_serve = True 140 self._last_served_websocket_index = -1 141 142 # Should new log lines be tailed? 143 self.follow: bool = True 144 145 self.visual_select_mode: bool = False 146 147 # Cache of formatted text tuples used in the last UI render. 148 self._line_fragment_cache: list[StyleAndTextTuples] = [] 149 150 # websocket server variables 151 self.websocket_running: bool = False 152 self.websocket_server = None 153 self.websocket_port = None 154 self.websocket_loop = asyncio.new_event_loop() 155 156 # Check if any logs are already in the log_store and update the view. 157 self.new_logs_arrived() 158 159 def _websocket_thread_entry(self): 160 """Entry point for the user code thread.""" 161 asyncio.set_event_loop(self.websocket_loop) 162 self.websocket_server = websockets.serve( # type: ignore # pylint: disable=no-member 163 self._send_logs_over_websockets, '127.0.0.1' 164 ) 165 self.websocket_loop.run_until_complete(self.websocket_server) 166 self.websocket_port = self.websocket_server.ws_server.sockets[ 167 0 168 ].getsockname()[1] 169 self.websocket_running = True 170 self.websocket_loop.run_forever() 171 172 def start_websocket_thread(self): 173 """Create a thread for running user code so the UI isn't blocked.""" 174 thread = Thread( 175 target=self._websocket_thread_entry, args=(), daemon=True 176 ) 177 thread.start() 178 179 def stop_websocket_thread(self): 180 """Stop websocket server.""" 181 if self.websocket_running: 182 self.websocket_loop.call_soon_threadsafe(self.websocket_loop.stop) 183 self.websocket_server = None 184 self.websocket_port = None 185 self.websocket_running = False 186 if self.filtering_on: 187 self._restart_filtering() 188 189 async def _send_logs_over_websockets(self, websocket, _path) -> None: 190 formatter: Callable[[LogLine], str] = operator.attrgetter( 191 'ansi_stripped_log' 192 ) 193 formatter = lambda log: log_record_to_json(log.record) 194 195 theme_colors = json.dumps( 196 self.log_pane.application.prefs.pw_console_color_config() 197 ) 198 # Send colors 199 await websocket.send(theme_colors) 200 201 while True: 202 # Wait for new logs 203 if not self._new_logs_since_last_websocket_serve: 204 await asyncio.sleep(0.5) 205 206 _start_log_index, log_source = self._get_log_lines() 207 log_index_range = range( 208 self._last_served_websocket_index + 1, self.get_total_count() 209 ) 210 211 for i in log_index_range: 212 log_text = formatter(log_source[i]) 213 await websocket.send(log_text) 214 self._last_served_websocket_index = i 215 216 # Flag that all logs have been served. 217 self._new_logs_since_last_websocket_serve = False 218 219 def view_mode_changed(self) -> None: 220 self._reset_log_screen_on_next_render = True 221 222 @property 223 def log_index(self): 224 if self.filtering_on: 225 return self._filtered_log_index 226 return self._log_index 227 228 @log_index.setter 229 def log_index(self, new_log_index): 230 # Save the old log_index 231 self._last_log_index = self.log_index 232 if self.filtering_on: 233 self._filtered_log_index = new_log_index 234 else: 235 self._log_index = new_log_index 236 237 def _reset_log_index_changed(self) -> None: 238 self._last_log_index = self.log_index 239 240 def log_index_changed_since_last_render(self) -> bool: 241 return self._last_log_index != self.log_index 242 243 def _set_match_position(self, position: int): 244 self.follow = False 245 self.log_index = position 246 self.save_search_matched_line(position) 247 self.log_screen.reset_logs(log_index=self.log_index) 248 self.log_screen.shift_selected_log_to_center() 249 self._user_scroll_event = True 250 self.log_pane.application.redraw_ui() 251 252 def select_next_search_matcher(self): 253 matchers = list(SearchMatcher) 254 index = matchers.index(self.search_matcher) 255 new_index = (index + 1) % len(matchers) 256 self.search_matcher = matchers[new_index] 257 258 def search_forwards(self): 259 if not self.search_filter: 260 return 261 self.search_highlight = True 262 263 log_beginning_index = self.hidden_line_count() 264 265 starting_index = self.log_index + 1 266 if starting_index > self.get_last_log_index(): 267 starting_index = log_beginning_index 268 269 _, logs = self._get_log_lines() 270 271 # From current position +1 and down 272 for i in range(starting_index, self.get_last_log_index() + 1): 273 if self.search_filter.matches(logs[i]): 274 self._set_match_position(i) 275 return 276 277 # From the beginning to the original start 278 for i in range(log_beginning_index, starting_index): 279 if self.search_filter.matches(logs[i]): 280 self._set_match_position(i) 281 return 282 283 def search_backwards(self): 284 if not self.search_filter: 285 return 286 self.search_highlight = True 287 288 log_beginning_index = self.hidden_line_count() 289 290 starting_index = self.log_index - 1 291 if starting_index < 0: 292 starting_index = self.get_last_log_index() 293 294 _, logs = self._get_log_lines() 295 296 # From current position - 1 and up 297 for i in range(starting_index, log_beginning_index - 1, -1): 298 if self.search_filter.matches(logs[i]): 299 self._set_match_position(i) 300 return 301 302 # From the end to the original start 303 for i in range(self.get_last_log_index(), starting_index, -1): 304 if self.search_filter.matches(logs[i]): 305 self._set_match_position(i) 306 return 307 308 def set_search_regex( 309 self, text, invert, field, matcher: SearchMatcher | None = None 310 ) -> bool: 311 search_matcher = matcher if matcher else self.search_matcher 312 _LOG.debug(search_matcher) 313 314 regex_text, regex_flags = preprocess_search_regex( 315 text, matcher=search_matcher 316 ) 317 318 try: 319 compiled_regex = re.compile(regex_text, regex_flags) 320 self.search_filter = LogFilter( 321 regex=compiled_regex, 322 input_text=text, 323 invert=invert, 324 field=field, 325 ) 326 _LOG.debug(self.search_filter) 327 except re.error as error: 328 _LOG.debug(error) 329 return False 330 331 self.search_highlight = True 332 self.search_text = regex_text 333 return True 334 335 def new_search( 336 self, 337 text, 338 invert=False, 339 field: str | None = None, 340 search_matcher: str | None = None, 341 interactive: bool = True, 342 ) -> bool: 343 """Start a new search for the given text.""" 344 valid_matchers = list(s.name for s in SearchMatcher) 345 selected_matcher: SearchMatcher | None = None 346 if ( 347 search_matcher is not None 348 and search_matcher.upper() in valid_matchers 349 ): 350 selected_matcher = SearchMatcher(search_matcher.upper()) 351 352 if not self.set_search_regex(text, invert, field, selected_matcher): 353 return False 354 355 # Clear matched lines 356 self.search_matched_lines = {} 357 358 if interactive: 359 # Start count historical search matches task. 360 self.search_match_count_task = asyncio.create_task( 361 self.count_search_matches() 362 ) 363 364 # Default search direction when hitting enter in the search bar. 365 if interactive: 366 self.search_forwards() 367 return True 368 369 def save_search_matched_line(self, log_index: int) -> None: 370 """Save the log_index at position as a matched line.""" 371 self.search_matched_lines[log_index] = 0 372 # Keep matched lines sorted by position 373 self.search_matched_lines = { 374 # Save this log_index and its match number. 375 log_index: match_number 376 for match_number, log_index in enumerate( 377 sorted(self.search_matched_lines.keys()) 378 ) 379 } 380 381 def disable_search_highlighting(self): 382 self.log_pane.log_view.search_highlight = False 383 384 def _restart_filtering(self): 385 # Turn on follow 386 if not self.follow: 387 self.toggle_follow() 388 389 # Reset filtered logs. 390 self.filtered_logs.clear() 391 # Reset scrollback start 392 self._scrollback_start_index = 0 393 394 # Start filtering existing log lines. 395 self.filter_existing_logs_task = asyncio.create_task( 396 self.filter_past_logs() 397 ) 398 399 # Reset existing search 400 self.clear_search() 401 402 # Trigger a main menu update to set log window menu titles. 403 self.log_pane.application.update_menu_items() 404 # Redraw the UI 405 self.log_pane.application.redraw_ui() 406 407 def install_new_filter(self): 408 """Set a filter using the current search_regex.""" 409 if not self.search_filter: 410 return 411 412 self.filtering_on = True 413 self.filters[self.search_text] = copy.deepcopy(self.search_filter) 414 415 self.clear_search() 416 417 def apply_filter(self): 418 """Set new filter and schedule historical log filter asyncio task.""" 419 if self.websocket_running: 420 return 421 self.install_new_filter() 422 self._restart_filtering() 423 424 def clear_search_highlighting(self): 425 self.search_highlight = False 426 self._reset_log_screen_on_next_render = True 427 428 def clear_search(self): 429 self.search_matched_lines = {} 430 self.search_text = None 431 self.search_filter = None 432 self.search_highlight = False 433 self._reset_log_screen_on_next_render = True 434 435 def _get_log_lines(self) -> tuple[int, collections.deque[LogLine]]: 436 logs = self.log_store.logs 437 if self.filtering_on: 438 logs = self.filtered_logs 439 return self._scrollback_start_index, logs 440 441 def _get_visible_log_lines(self): 442 _, logs = self._get_log_lines() 443 if self._scrollback_start_index > 0: 444 return collections.deque( 445 itertools.islice(logs, self.hidden_line_count(), len(logs)) 446 ) 447 return logs 448 449 def _get_table_formatter(self) -> Callable | None: 450 table_formatter = None 451 if self.log_pane.table_view: 452 table_formatter = self.log_store.table.formatted_row 453 return table_formatter 454 455 def delete_filter(self, filter_text): 456 if filter_text not in self.filters: 457 return 458 459 # Delete this filter 460 del self.filters[filter_text] 461 462 # If no filters left, stop filtering. 463 if len(self.filters) == 0: 464 self.clear_filters() 465 else: 466 # Erase existing filtered lines. 467 self._restart_filtering() 468 469 def clear_filters(self): 470 if not self.filtering_on: 471 return 472 self.clear_search() 473 self.filtering_on = False 474 self.filters: collections.OrderedDict[ 475 str, re.Pattern 476 ] = collections.OrderedDict() 477 self.filtered_logs.clear() 478 # Reset scrollback start 479 self._scrollback_start_index = 0 480 if not self.follow: 481 self.toggle_follow() 482 483 async def count_search_matches(self): 484 """Count search matches and save their locations.""" 485 # Wait for any filter_existing_logs_task to finish. 486 if self.filtering_on and self.filter_existing_logs_task: 487 await self.filter_existing_logs_task 488 489 starting_index = self.get_last_log_index() 490 ending_index, logs = self._get_log_lines() 491 492 # From the end of the log store to the beginning. 493 for i in range(starting_index, ending_index - 1, -1): 494 # Is this log a match? 495 if self.search_filter.matches(logs[i]): 496 self.save_search_matched_line(i) 497 # Pause every 100 lines or so 498 if i % 100 == 0: 499 await asyncio.sleep(0.1) 500 501 async def filter_past_logs(self): 502 """Filter past log lines.""" 503 starting_index = self.log_store.get_last_log_index() 504 ending_index = -1 505 506 # From the end of the log store to the beginning. 507 for i in range(starting_index, ending_index, -1): 508 # Is this log a match? 509 if self.filter_scan(self.log_store.logs[i]): 510 # Add to the beginning of the deque. 511 self.filtered_logs.appendleft(self.log_store.logs[i]) 512 # TODO(tonymd): Tune these values. 513 # Pause every 100 lines or so 514 if i % 100 == 0: 515 await asyncio.sleep(0.1) 516 517 def set_log_pane(self, log_pane: LogPane): 518 """Set the parent LogPane instance.""" 519 self.log_pane = log_pane 520 521 def _update_log_index(self) -> ScreenLine: 522 line_at_cursor = self.log_screen.get_line_at_cursor_position() 523 if line_at_cursor.log_index is not None: 524 self.log_index = line_at_cursor.log_index 525 return line_at_cursor 526 527 def get_current_line(self) -> int: 528 """Return the currently selected log event index.""" 529 return self.log_index 530 531 def get_total_count(self): 532 """Total size of the logs store.""" 533 return ( 534 len(self.filtered_logs) 535 if self.filtering_on 536 else self.log_store.get_total_count() 537 ) 538 539 def get_last_log_index(self): 540 total = self.get_total_count() 541 return 0 if total < 0 else total - 1 542 543 def clear_scrollback(self): 544 """Hide log lines before the max length of the stored logs.""" 545 # Enable follow and scroll to the bottom, then clear. 546 if not self.follow: 547 self.toggle_follow() 548 self._scrollback_start_index = self.log_index 549 self._reset_log_screen_on_next_render = True 550 551 def hidden_line_count(self): 552 """Return the number of hidden lines.""" 553 if self._scrollback_start_index > 0: 554 return self._scrollback_start_index + 1 555 return 0 556 557 def undo_clear_scrollback(self): 558 """Reset the current scrollback start index.""" 559 self._scrollback_start_index = 0 560 561 def wrap_lines_enabled(self): 562 """Get the parent log pane wrap lines setting.""" 563 if not self.log_pane: 564 return False 565 return self.log_pane.wrap_lines 566 567 def toggle_follow(self): 568 """Toggle auto line following.""" 569 self.follow = not self.follow 570 if self.follow: 571 # Disable search match follow mode. 572 self.follow_search_match = False 573 self.scroll_to_bottom() 574 575 def filter_scan(self, log: LogLine): 576 filter_match_count = 0 577 for _filter_text, log_filter in self.filters.items(): 578 if log_filter.matches(log): 579 filter_match_count += 1 580 else: 581 break 582 583 if filter_match_count == len(self.filters): 584 return True 585 return False 586 587 def new_logs_arrived(self): 588 """Check newly arrived log messages. 589 590 Depending on where log statements occur ``new_logs_arrived`` may be in a 591 separate thread since it is triggerd by the Python log handler 592 ``emit()`` function. In this case the log handler is the LogStore 593 instance ``self.log_store``. This function should not redraw the screen 594 or scroll. 595 """ 596 latest_total = self.log_store.get_total_count() 597 598 if self.filtering_on: 599 # Scan newly arived log lines 600 for i in range(self._last_log_store_index, latest_total): 601 if self.filter_scan(self.log_store.logs[i]): 602 self.filtered_logs.append(self.log_store.logs[i]) 603 604 if self.search_filter: 605 last_matched_log: int | None = None 606 # Scan newly arived log lines 607 for i in range(self._last_log_store_index, latest_total): 608 if self.search_filter.matches(self.log_store.logs[i]): 609 self.save_search_matched_line(i) 610 last_matched_log = i 611 if last_matched_log and self.follow_search_match: 612 # Set the follow event flag for the next render_content call. 613 self.follow_event = FollowEvent.SEARCH_MATCH 614 self.last_search_matched_log = last_matched_log 615 616 self._last_log_store_index = latest_total 617 self._new_logs_since_last_render = True 618 self._new_logs_since_last_websocket_serve = True 619 620 if self.follow: 621 # Set the follow event flag for the next render_content call. 622 self.follow_event = FollowEvent.STICKY_FOLLOW 623 624 if self.websocket_running: 625 # No terminal screen redraws are required. 626 return 627 628 # Trigger a UI update if the log window is visible. 629 if self.log_pane.show_pane: 630 self.log_pane.application.logs_redraw() 631 632 def get_cursor_position(self) -> Point: 633 """Return the position of the cursor.""" 634 return Point(0, self.log_screen.cursor_position) 635 636 def scroll_to_top(self): 637 """Move selected index to the beginning.""" 638 # Stop following so cursor doesn't jump back down to the bottom. 639 self.follow = False 640 # First possible log index that should be displayed 641 log_beginning_index = self.hidden_line_count() 642 self.log_index = log_beginning_index 643 self.log_screen.reset_logs(log_index=self.log_index) 644 self.log_screen.shift_selected_log_to_top() 645 self._user_scroll_event = True 646 647 def move_selected_line_to_top(self): 648 self.follow = False 649 650 # Update selected line 651 self._update_log_index() 652 653 self.log_screen.reset_logs(log_index=self.log_index) 654 self.log_screen.shift_selected_log_to_top() 655 self._user_scroll_event = True 656 657 def center_log_line(self): 658 self.follow = False 659 660 # Update selected line 661 self._update_log_index() 662 663 self.log_screen.reset_logs(log_index=self.log_index) 664 self.log_screen.shift_selected_log_to_center() 665 self._user_scroll_event = True 666 667 def scroll_to_bottom(self, with_sticky_follow: bool = True): 668 """Move selected index to the end.""" 669 # Don't change following state like scroll_to_top. 670 self.log_index = max(0, self.get_last_log_index()) 671 self.log_screen.reset_logs(log_index=self.log_index) 672 673 # Sticky follow mode 674 if with_sticky_follow: 675 self.follow = True 676 self._user_scroll_event = True 677 678 def scroll(self, lines) -> None: 679 """Scroll up or down by plus or minus lines. 680 681 This method is only called by user keybindings. 682 """ 683 # If the user starts scrolling, stop auto following. 684 self.follow = False 685 686 self.log_screen.scroll_subline(lines) 687 self._user_scroll_event = True 688 689 # Update the current log 690 current_line = self._update_log_index() 691 692 # Don't check for sticky follow mode if selecting lines. 693 if self.visual_select_mode: 694 return 695 # Is the last log line selected? 696 if self.log_index == self.get_last_log_index(): 697 # Is the last line of the current log selected? 698 if current_line.subline + 1 == current_line.height: 699 # Sticky follow mode 700 self.follow = True 701 702 def visual_selected_log_count(self) -> int: 703 if self.marked_logs_start is None or self.marked_logs_end is None: 704 return 0 705 return (self.marked_logs_end - self.marked_logs_start) + 1 706 707 def clear_visual_selection(self) -> None: 708 self.marked_logs_start = None 709 self.marked_logs_end = None 710 self.visual_select_mode = False 711 self._user_scroll_event = True 712 self.log_pane.application.redraw_ui() 713 714 def visual_select_all(self) -> None: 715 self.marked_logs_start = self._scrollback_start_index 716 self.marked_logs_end = self.get_total_count() - 1 717 718 self.visual_select_mode = True 719 self._user_scroll_event = True 720 self.log_pane.application.redraw_ui() 721 722 def visual_select_up(self) -> None: 723 # Select the current line 724 self.visual_select_line(self.get_cursor_position(), autoscroll=False) 725 # Move the cursor by 1 726 self.scroll_up(1) 727 # Select the new line 728 self.visual_select_line(self.get_cursor_position(), autoscroll=False) 729 730 def visual_select_down(self) -> None: 731 # Select the current line 732 self.visual_select_line(self.get_cursor_position(), autoscroll=False) 733 # Move the cursor by 1 734 self.scroll_down(1) 735 # Select the new line 736 self.visual_select_line(self.get_cursor_position(), autoscroll=False) 737 738 def visual_select_line( 739 self, mouse_position: Point, autoscroll: bool = True 740 ) -> None: 741 """Mark the log under mouse_position as visually selected.""" 742 # Check mouse_position is valid 743 if not 0 <= mouse_position.y < len(self.log_screen.line_buffer): 744 return 745 # Update mode flags 746 self.visual_select_mode = True 747 self.follow = False 748 # Get the ScreenLine for the cursor position 749 screen_line = self.log_screen.line_buffer[mouse_position.y] 750 if screen_line.log_index is None: 751 return 752 753 if self.marked_logs_start is None: 754 self.marked_logs_start = screen_line.log_index 755 if self.marked_logs_end is None: 756 self.marked_logs_end = screen_line.log_index 757 758 if screen_line.log_index < self.marked_logs_start: 759 self.marked_logs_start = screen_line.log_index 760 elif screen_line.log_index > self.marked_logs_end: 761 self.marked_logs_end = screen_line.log_index 762 763 # Update cursor position 764 self.log_screen.move_cursor_to_position(mouse_position.y) 765 766 # Autoscroll when mouse dragging on the top or bottom of the window. 767 if autoscroll: 768 if mouse_position.y == 0: 769 self.scroll_up(1) 770 elif mouse_position.y == self._window_height - 1: 771 self.scroll_down(1) 772 773 # Trigger a rerender. 774 self._user_scroll_event = True 775 self.log_pane.application.redraw_ui() 776 777 def scroll_to_position(self, mouse_position: Point): 778 """Set the selected log line to the mouse_position.""" 779 # Disable follow mode when the user clicks or mouse drags on a log line. 780 self.follow = False 781 782 self.log_screen.move_cursor_to_position(mouse_position.y) 783 self._update_log_index() 784 785 self._user_scroll_event = True 786 787 def scroll_up_one_page(self): 788 """Move the selected log index up by one window height.""" 789 lines = 1 790 if self._window_height > 0: 791 lines = self._window_height 792 self.scroll(-1 * lines) 793 794 def scroll_down_one_page(self): 795 """Move the selected log index down by one window height.""" 796 lines = 1 797 if self._window_height > 0: 798 lines = self._window_height 799 self.scroll(lines) 800 801 def scroll_down(self, lines=1): 802 """Move the selected log index down by one or more lines.""" 803 self.scroll(lines) 804 805 def scroll_up(self, lines=1): 806 """Move the selected log index up by one or more lines.""" 807 self.scroll(-1 * lines) 808 809 def log_start_end_indexes_changed(self) -> bool: 810 return ( 811 self._last_start_index != self._current_start_index 812 or self._last_end_index != self._current_end_index 813 ) 814 815 def render_table_header(self): 816 """Get pre-formatted table header.""" 817 return self.log_store.render_table_header() 818 819 def get_web_socket_url(self): 820 return f'http://127.0.0.1:3000/#ws={self.websocket_port}' 821 822 def render_content(self) -> list: 823 """Return logs to display on screen as a list of FormattedText tuples. 824 825 This function determines when the log screen requires re-rendeing based 826 on user scroll events, follow mode being on, or log pane being 827 empty. The FormattedText tuples passed to prompt_toolkit are cached if 828 no updates are required. 829 """ 830 screen_update_needed = False 831 832 # Disable rendering if user is viewing logs on web 833 if self.websocket_running: 834 return [] 835 836 # Check window size 837 if self.log_pane.pane_resized(): 838 self._window_width = self.log_pane.current_log_pane_width 839 self._window_height = self.log_pane.current_log_pane_height 840 self.log_screen.resize(self._window_width, self._window_height) 841 self._reset_log_screen_on_next_render = True 842 843 if self.follow_event is not None: 844 if ( 845 self.follow_event == FollowEvent.SEARCH_MATCH 846 and self.last_search_matched_log 847 ): 848 self.log_index = self.last_search_matched_log 849 self.last_search_matched_log = None 850 self._reset_log_screen_on_next_render = True 851 852 elif self.follow_event == FollowEvent.STICKY_FOLLOW: 853 # Jump to the last log message 854 self.log_index = max(0, self.get_last_log_index()) 855 856 self.follow_event = None 857 screen_update_needed = True 858 859 if self._reset_log_screen_on_next_render or self.log_screen.empty(): 860 # Clear the reset flag. 861 self._reset_log_screen_on_next_render = False 862 self.log_screen.reset_logs(log_index=self.log_index) 863 screen_update_needed = True 864 865 elif self.follow and self._new_logs_since_last_render: 866 # Follow mode is on so add new logs to the screen 867 self._new_logs_since_last_render = False 868 869 current_log_index = self.log_index 870 last_rendered_log_index = self.log_screen.last_appended_log_index 871 # If so many logs have arrived than can fit on the screen, redraw 872 # the whole screen from the new position. 873 if ( 874 current_log_index - last_rendered_log_index 875 ) > self.log_screen.height: 876 self.log_screen.reset_logs(log_index=self.log_index) 877 # A small amount of logs have arrived, append them one at a time 878 # without redrawing the whole screen. 879 else: 880 for i in range( 881 last_rendered_log_index + 1, current_log_index + 1 882 ): 883 self.log_screen.append_log(i) 884 885 screen_update_needed = True 886 887 if self.follow: 888 # Select the last line for follow mode. 889 self.log_screen.move_cursor_to_bottom() 890 screen_update_needed = True 891 892 if self._user_scroll_event: 893 self._user_scroll_event = False 894 screen_update_needed = True 895 896 if screen_update_needed: 897 self._line_fragment_cache = self.log_screen.get_lines( 898 marked_logs_start=self.marked_logs_start, 899 marked_logs_end=self.marked_logs_end, 900 ) 901 return self._line_fragment_cache 902 903 def _logs_to_text( 904 self, 905 use_table_formatting: bool = True, 906 selected_lines_only: bool = False, 907 ) -> str: 908 """Convert all or selected log messages to plaintext.""" 909 910 def get_table_string(log: LogLine) -> str: 911 return remove_formatting(self.log_store.table.formatted_row(log)) 912 913 formatter: Callable[[LogLine], str] = operator.attrgetter( 914 'ansi_stripped_log' 915 ) 916 if use_table_formatting: 917 formatter = get_table_string 918 919 _start_log_index, log_source = self._get_log_lines() 920 921 log_index_range = range( 922 self._scrollback_start_index, self.get_total_count() 923 ) 924 if ( 925 selected_lines_only 926 and self.marked_logs_start is not None 927 and self.marked_logs_end is not None 928 ): 929 log_index_range = range( 930 self.marked_logs_start, self.marked_logs_end + 1 931 ) 932 933 text_output = '' 934 for i in log_index_range: 935 log_text = formatter(log_source[i]) 936 text_output += log_text 937 if not log_text.endswith('\n'): 938 text_output += '\n' 939 940 return text_output 941 942 def export_logs( 943 self, 944 use_table_formatting: bool = True, 945 selected_lines_only: bool = False, 946 file_name: str | None = None, 947 to_clipboard: bool = False, 948 add_markdown_fence: bool = False, 949 ) -> bool: 950 """Export log lines to file or clipboard.""" 951 text_output = self._logs_to_text( 952 use_table_formatting, selected_lines_only 953 ) 954 955 if file_name: 956 target_path = Path(file_name).expanduser() 957 with target_path.open('w') as output_file: 958 output_file.write(text_output) 959 _LOG.debug('Saved to file: %s', file_name) 960 961 elif to_clipboard: 962 if add_markdown_fence: 963 text_output = '```\n' + text_output + '```\n' 964 self.log_pane.application.set_system_clipboard(text_output) 965 _LOG.debug('Copied logs to clipboard.') 966 967 return True 968