1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""LogView maintains a log pane's scrolling and searching state.""" 15 16from __future__ import annotations 17import asyncio 18import collections 19import copy 20from enum import Enum 21import itertools 22import json 23import logging 24import operator 25from pathlib import Path 26import re 27from threading import Thread 28from typing import Callable, Dict, List, Optional, Tuple, TYPE_CHECKING 29 30from prompt_toolkit.data_structures import Point 31from prompt_toolkit.formatted_text import StyleAndTextTuples 32import websockets 33 34from pw_console.log_filter import ( 35 DEFAULT_SEARCH_MATCHER, 36 LogFilter, 37 RegexValidator, 38 SearchMatcher, 39 preprocess_search_regex, 40) 41from pw_console.log_screen import ScreenLine, LogScreen 42from pw_console.log_store import LogStore 43from pw_console.python_logging import log_record_to_json 44from pw_console.text_formatting import remove_formatting 45 46if TYPE_CHECKING: 47 from pw_console.console_app import ConsoleApp 48 from pw_console.log_line import LogLine 49 from pw_console.log_pane import LogPane 50 51_LOG = logging.getLogger(__package__) 52 53 54class FollowEvent(Enum): 55 """Follow mode scroll event types.""" 56 57 SEARCH_MATCH = 'scroll_to_bottom' 58 STICKY_FOLLOW = 'scroll_to_bottom_with_sticky_follow' 59 60 61class LogView: 62 """Viewing window into a LogStore.""" 63 64 # pylint: disable=too-many-instance-attributes,too-many-public-methods 65 66 def __init__( 67 self, 68 log_pane: 'LogPane', 69 application: 'ConsoleApp', 70 log_store: Optional[LogStore] = None, 71 ): 72 # Parent LogPane reference. Updated by calling `set_log_pane()`. 73 self.log_pane = log_pane 74 self.log_store = ( 75 log_store if log_store else LogStore(prefs=application.prefs) 76 ) 77 self.log_store.set_prefs(application.prefs) 78 self.log_store.register_viewer(self) 79 80 self.marked_logs_start: Optional[int] = None 81 self.marked_logs_end: Optional[int] = None 82 83 # Search variables 84 self.search_text: Optional[str] = None 85 self.search_filter: Optional[LogFilter] = None 86 self.search_highlight: bool = False 87 self.search_matcher = DEFAULT_SEARCH_MATCHER 88 self.search_validator = RegexValidator() 89 90 # Container for each log_index matched by active searches. 91 self.search_matched_lines: Dict[int, int] = {} 92 # Background task to find historical matched lines. 93 self.search_match_count_task: Optional[asyncio.Task] = None 94 95 # Flag for automatically jumping to each new search match as they 96 # appear. 97 self.follow_search_match: bool = False 98 self.last_search_matched_log: Optional[int] = None 99 100 # Follow event flag. This is set to by the new_logs_arrived() function 101 # as a signal that the log screen should be scrolled to the bottom. 102 # This is read by render_content() whenever the screen is drawn. 103 self.follow_event: Optional[FollowEvent] = None 104 105 self.log_screen = LogScreen( 106 get_log_source=self._get_log_lines, 107 get_line_wrapping=self.wrap_lines_enabled, 108 get_log_formatter=self._get_table_formatter, 109 get_search_filter=lambda: self.search_filter, 110 get_search_highlight=lambda: self.search_highlight, 111 ) 112 113 # Filter 114 self.filtering_on: bool = False 115 self.filters: 'collections.OrderedDict[str, LogFilter]' = ( 116 collections.OrderedDict() 117 ) 118 self.filtered_logs: collections.deque = collections.deque() 119 self.filter_existing_logs_task: Optional[asyncio.Task] = None 120 121 # Current log line index state variables: 122 self._last_log_index = -1 123 self._log_index = 0 124 self._filtered_log_index = 0 125 self._last_start_index = 0 126 self._last_end_index = 0 127 self._current_start_index = 0 128 self._current_end_index = 0 129 self._scrollback_start_index = 0 130 131 # LogPane prompt_toolkit container render size. 132 self._window_height = 20 133 self._window_width = 80 134 self._reset_log_screen_on_next_render: bool = True 135 self._user_scroll_event: bool = False 136 137 self._last_log_store_index = 0 138 self._new_logs_since_last_render = True 139 self._new_logs_since_last_websocket_serve = True 140 self._last_served_websocket_index = -1 141 142 # Should new log lines be tailed? 143 self.follow: bool = True 144 145 self.visual_select_mode: bool = False 146 147 # Cache of formatted text tuples used in the last UI render. 148 self._line_fragment_cache: List[StyleAndTextTuples] = [] 149 150 # websocket server variables 151 self.websocket_running: bool = False 152 self.websocket_server = None 153 self.websocket_port = None 154 self.websocket_loop = asyncio.new_event_loop() 155 156 # Check if any logs are already in the log_store and update the view. 157 self.new_logs_arrived() 158 159 def _websocket_thread_entry(self): 160 """Entry point for the user code thread.""" 161 asyncio.set_event_loop(self.websocket_loop) 162 self.websocket_server = websockets.serve( # type: ignore # pylint: disable=no-member 163 self._send_logs_over_websockets, '127.0.0.1' 164 ) 165 self.websocket_loop.run_until_complete(self.websocket_server) 166 self.websocket_port = self.websocket_server.ws_server.sockets[ 167 0 168 ].getsockname()[1] 169 self.log_pane.application.application.clipboard.set_text( 170 self.get_web_socket_url() 171 ) 172 self.websocket_running = True 173 self.websocket_loop.run_forever() 174 175 def start_websocket_thread(self): 176 """Create a thread for running user code so the UI isn't blocked.""" 177 thread = Thread( 178 target=self._websocket_thread_entry, args=(), daemon=True 179 ) 180 thread.start() 181 182 def stop_websocket_thread(self): 183 """Stop websocket server.""" 184 if self.websocket_running: 185 self.websocket_loop.call_soon_threadsafe(self.websocket_loop.stop) 186 self.websocket_server = None 187 self.websocket_port = None 188 self.websocket_running = False 189 if self.filtering_on: 190 self._restart_filtering() 191 192 async def _send_logs_over_websockets(self, websocket, _path) -> None: 193 formatter: Callable[[LogLine], str] = operator.attrgetter( 194 'ansi_stripped_log' 195 ) 196 formatter = lambda log: log_record_to_json(log.record) 197 198 theme_colors = json.dumps( 199 self.log_pane.application.prefs.pw_console_color_config() 200 ) 201 # Send colors 202 await websocket.send(theme_colors) 203 204 while True: 205 # Wait for new logs 206 if not self._new_logs_since_last_websocket_serve: 207 await asyncio.sleep(0.5) 208 209 _start_log_index, log_source = self._get_log_lines() 210 log_index_range = range( 211 self._last_served_websocket_index + 1, self.get_total_count() 212 ) 213 214 for i in log_index_range: 215 log_text = formatter(log_source[i]) 216 await websocket.send(log_text) 217 self._last_served_websocket_index = i 218 219 # Flag that all logs have been served. 220 self._new_logs_since_last_websocket_serve = False 221 222 def view_mode_changed(self) -> None: 223 self._reset_log_screen_on_next_render = True 224 225 @property 226 def log_index(self): 227 if self.filtering_on: 228 return self._filtered_log_index 229 return self._log_index 230 231 @log_index.setter 232 def log_index(self, new_log_index): 233 # Save the old log_index 234 self._last_log_index = self.log_index 235 if self.filtering_on: 236 self._filtered_log_index = new_log_index 237 else: 238 self._log_index = new_log_index 239 240 def _reset_log_index_changed(self) -> None: 241 self._last_log_index = self.log_index 242 243 def log_index_changed_since_last_render(self) -> bool: 244 return self._last_log_index != self.log_index 245 246 def _set_match_position(self, position: int): 247 self.follow = False 248 self.log_index = position 249 self.save_search_matched_line(position) 250 self.log_screen.reset_logs(log_index=self.log_index) 251 self.log_screen.shift_selected_log_to_center() 252 self._user_scroll_event = True 253 self.log_pane.application.redraw_ui() 254 255 def select_next_search_matcher(self): 256 matchers = list(SearchMatcher) 257 index = matchers.index(self.search_matcher) 258 new_index = (index + 1) % len(matchers) 259 self.search_matcher = matchers[new_index] 260 261 def search_forwards(self): 262 if not self.search_filter: 263 return 264 self.search_highlight = True 265 266 log_beginning_index = self.hidden_line_count() 267 268 starting_index = self.log_index + 1 269 if starting_index > self.get_last_log_index(): 270 starting_index = log_beginning_index 271 272 _, logs = self._get_log_lines() 273 274 # From current position +1 and down 275 for i in range(starting_index, self.get_last_log_index() + 1): 276 if self.search_filter.matches(logs[i]): 277 self._set_match_position(i) 278 return 279 280 # From the beginning to the original start 281 for i in range(log_beginning_index, starting_index): 282 if self.search_filter.matches(logs[i]): 283 self._set_match_position(i) 284 return 285 286 def search_backwards(self): 287 if not self.search_filter: 288 return 289 self.search_highlight = True 290 291 log_beginning_index = self.hidden_line_count() 292 293 starting_index = self.log_index - 1 294 if starting_index < 0: 295 starting_index = self.get_last_log_index() 296 297 _, logs = self._get_log_lines() 298 299 # From current position - 1 and up 300 for i in range(starting_index, log_beginning_index - 1, -1): 301 if self.search_filter.matches(logs[i]): 302 self._set_match_position(i) 303 return 304 305 # From the end to the original start 306 for i in range(self.get_last_log_index(), starting_index, -1): 307 if self.search_filter.matches(logs[i]): 308 self._set_match_position(i) 309 return 310 311 def set_search_regex( 312 self, text, invert, field, matcher: Optional[SearchMatcher] = None 313 ) -> bool: 314 search_matcher = matcher if matcher else self.search_matcher 315 _LOG.debug(search_matcher) 316 317 regex_text, regex_flags = preprocess_search_regex( 318 text, matcher=search_matcher 319 ) 320 321 try: 322 compiled_regex = re.compile(regex_text, regex_flags) 323 self.search_filter = LogFilter( 324 regex=compiled_regex, 325 input_text=text, 326 invert=invert, 327 field=field, 328 ) 329 _LOG.debug(self.search_filter) 330 except re.error as error: 331 _LOG.debug(error) 332 return False 333 334 self.search_highlight = True 335 self.search_text = regex_text 336 return True 337 338 def new_search( 339 self, 340 text, 341 invert=False, 342 field: Optional[str] = None, 343 search_matcher: Optional[str] = None, 344 interactive: bool = True, 345 ) -> bool: 346 """Start a new search for the given text.""" 347 valid_matchers = list(s.name for s in SearchMatcher) 348 selected_matcher: Optional[SearchMatcher] = None 349 if ( 350 search_matcher is not None 351 and search_matcher.upper() in valid_matchers 352 ): 353 selected_matcher = SearchMatcher(search_matcher.upper()) 354 355 if not self.set_search_regex(text, invert, field, selected_matcher): 356 return False 357 358 # Clear matched lines 359 self.search_matched_lines = {} 360 361 if interactive: 362 # Start count historical search matches task. 363 self.search_match_count_task = asyncio.create_task( 364 self.count_search_matches() 365 ) 366 367 # Default search direction when hitting enter in the search bar. 368 if interactive: 369 self.search_forwards() 370 return True 371 372 def save_search_matched_line(self, log_index: int) -> None: 373 """Save the log_index at position as a matched line.""" 374 self.search_matched_lines[log_index] = 0 375 # Keep matched lines sorted by position 376 self.search_matched_lines = { 377 # Save this log_index and its match number. 378 log_index: match_number 379 for match_number, log_index in enumerate( 380 sorted(self.search_matched_lines.keys()) 381 ) 382 } 383 384 def disable_search_highlighting(self): 385 self.log_pane.log_view.search_highlight = False 386 387 def _restart_filtering(self): 388 # Turn on follow 389 if not self.follow: 390 self.toggle_follow() 391 392 # Reset filtered logs. 393 self.filtered_logs.clear() 394 # Reset scrollback start 395 self._scrollback_start_index = 0 396 397 # Start filtering existing log lines. 398 self.filter_existing_logs_task = asyncio.create_task( 399 self.filter_past_logs() 400 ) 401 402 # Reset existing search 403 self.clear_search() 404 405 # Trigger a main menu update to set log window menu titles. 406 self.log_pane.application.update_menu_items() 407 # Redraw the UI 408 self.log_pane.application.redraw_ui() 409 410 def install_new_filter(self): 411 """Set a filter using the current search_regex.""" 412 if not self.search_filter: 413 return 414 415 self.filtering_on = True 416 self.filters[self.search_text] = copy.deepcopy(self.search_filter) 417 418 self.clear_search() 419 420 def apply_filter(self): 421 """Set new filter and schedule historical log filter asyncio task.""" 422 if self.websocket_running: 423 return 424 self.install_new_filter() 425 self._restart_filtering() 426 427 def clear_search_highlighting(self): 428 self.search_highlight = False 429 self._reset_log_screen_on_next_render = True 430 431 def clear_search(self): 432 self.search_matched_lines = {} 433 self.search_text = None 434 self.search_filter = None 435 self.search_highlight = False 436 self._reset_log_screen_on_next_render = True 437 438 def _get_log_lines(self) -> Tuple[int, collections.deque[LogLine]]: 439 logs = self.log_store.logs 440 if self.filtering_on: 441 logs = self.filtered_logs 442 return self._scrollback_start_index, logs 443 444 def _get_visible_log_lines(self): 445 _, logs = self._get_log_lines() 446 if self._scrollback_start_index > 0: 447 return collections.deque( 448 itertools.islice(logs, self.hidden_line_count(), len(logs)) 449 ) 450 return logs 451 452 def _get_table_formatter(self) -> Optional[Callable]: 453 table_formatter = None 454 if self.log_pane.table_view: 455 table_formatter = self.log_store.table.formatted_row 456 return table_formatter 457 458 def delete_filter(self, filter_text): 459 if filter_text not in self.filters: 460 return 461 462 # Delete this filter 463 del self.filters[filter_text] 464 465 # If no filters left, stop filtering. 466 if len(self.filters) == 0: 467 self.clear_filters() 468 else: 469 # Erase existing filtered lines. 470 self._restart_filtering() 471 472 def clear_filters(self): 473 if not self.filtering_on: 474 return 475 self.clear_search() 476 self.filtering_on = False 477 self.filters: 'collections.OrderedDict[str, re.Pattern]' = ( 478 collections.OrderedDict() 479 ) 480 self.filtered_logs.clear() 481 # Reset scrollback start 482 self._scrollback_start_index = 0 483 if not self.follow: 484 self.toggle_follow() 485 486 async def count_search_matches(self): 487 """Count search matches and save their locations.""" 488 # Wait for any filter_existing_logs_task to finish. 489 if self.filtering_on and self.filter_existing_logs_task: 490 await self.filter_existing_logs_task 491 492 starting_index = self.get_last_log_index() 493 ending_index, logs = self._get_log_lines() 494 495 # From the end of the log store to the beginning. 496 for i in range(starting_index, ending_index - 1, -1): 497 # Is this log a match? 498 if self.search_filter.matches(logs[i]): 499 self.save_search_matched_line(i) 500 # Pause every 100 lines or so 501 if i % 100 == 0: 502 await asyncio.sleep(0.1) 503 504 async def filter_past_logs(self): 505 """Filter past log lines.""" 506 starting_index = self.log_store.get_last_log_index() 507 ending_index = -1 508 509 # From the end of the log store to the beginning. 510 for i in range(starting_index, ending_index, -1): 511 # Is this log a match? 512 if self.filter_scan(self.log_store.logs[i]): 513 # Add to the beginning of the deque. 514 self.filtered_logs.appendleft(self.log_store.logs[i]) 515 # TODO(tonymd): Tune these values. 516 # Pause every 100 lines or so 517 if i % 100 == 0: 518 await asyncio.sleep(0.1) 519 520 def set_log_pane(self, log_pane: 'LogPane'): 521 """Set the parent LogPane instance.""" 522 self.log_pane = log_pane 523 524 def _update_log_index(self) -> ScreenLine: 525 line_at_cursor = self.log_screen.get_line_at_cursor_position() 526 if line_at_cursor.log_index is not None: 527 self.log_index = line_at_cursor.log_index 528 return line_at_cursor 529 530 def get_current_line(self) -> int: 531 """Return the currently selected log event index.""" 532 return self.log_index 533 534 def get_total_count(self): 535 """Total size of the logs store.""" 536 return ( 537 len(self.filtered_logs) 538 if self.filtering_on 539 else self.log_store.get_total_count() 540 ) 541 542 def get_last_log_index(self): 543 total = self.get_total_count() 544 return 0 if total < 0 else total - 1 545 546 def clear_scrollback(self): 547 """Hide log lines before the max length of the stored logs.""" 548 # Enable follow and scroll to the bottom, then clear. 549 if not self.follow: 550 self.toggle_follow() 551 self._scrollback_start_index = self.log_index 552 self._reset_log_screen_on_next_render = True 553 554 def hidden_line_count(self): 555 """Return the number of hidden lines.""" 556 if self._scrollback_start_index > 0: 557 return self._scrollback_start_index + 1 558 return 0 559 560 def undo_clear_scrollback(self): 561 """Reset the current scrollback start index.""" 562 self._scrollback_start_index = 0 563 564 def wrap_lines_enabled(self): 565 """Get the parent log pane wrap lines setting.""" 566 if not self.log_pane: 567 return False 568 return self.log_pane.wrap_lines 569 570 def toggle_follow(self): 571 """Toggle auto line following.""" 572 self.follow = not self.follow 573 if self.follow: 574 # Disable search match follow mode. 575 self.follow_search_match = False 576 self.scroll_to_bottom() 577 578 def filter_scan(self, log: 'LogLine'): 579 filter_match_count = 0 580 for _filter_text, log_filter in self.filters.items(): 581 if log_filter.matches(log): 582 filter_match_count += 1 583 else: 584 break 585 586 if filter_match_count == len(self.filters): 587 return True 588 return False 589 590 def new_logs_arrived(self): 591 """Check newly arrived log messages. 592 593 Depending on where log statements occur ``new_logs_arrived`` may be in a 594 separate thread since it is triggerd by the Python log handler 595 ``emit()`` function. In this case the log handler is the LogStore 596 instance ``self.log_store``. This function should not redraw the screen 597 or scroll. 598 """ 599 latest_total = self.log_store.get_total_count() 600 601 if self.filtering_on: 602 # Scan newly arived log lines 603 for i in range(self._last_log_store_index, latest_total): 604 if self.filter_scan(self.log_store.logs[i]): 605 self.filtered_logs.append(self.log_store.logs[i]) 606 607 if self.search_filter: 608 last_matched_log: Optional[int] = None 609 # Scan newly arived log lines 610 for i in range(self._last_log_store_index, latest_total): 611 if self.search_filter.matches(self.log_store.logs[i]): 612 self.save_search_matched_line(i) 613 last_matched_log = i 614 if last_matched_log and self.follow_search_match: 615 # Set the follow event flag for the next render_content call. 616 self.follow_event = FollowEvent.SEARCH_MATCH 617 self.last_search_matched_log = last_matched_log 618 619 self._last_log_store_index = latest_total 620 self._new_logs_since_last_render = True 621 self._new_logs_since_last_websocket_serve = True 622 623 if self.follow: 624 # Set the follow event flag for the next render_content call. 625 self.follow_event = FollowEvent.STICKY_FOLLOW 626 627 if self.websocket_running: 628 # No terminal screen redraws are required. 629 return 630 631 # Trigger a UI update if the log window is visible. 632 if self.log_pane.show_pane: 633 self.log_pane.application.logs_redraw() 634 635 def get_cursor_position(self) -> Point: 636 """Return the position of the cursor.""" 637 return Point(0, self.log_screen.cursor_position) 638 639 def scroll_to_top(self): 640 """Move selected index to the beginning.""" 641 # Stop following so cursor doesn't jump back down to the bottom. 642 self.follow = False 643 # First possible log index that should be displayed 644 log_beginning_index = self.hidden_line_count() 645 self.log_index = log_beginning_index 646 self.log_screen.reset_logs(log_index=self.log_index) 647 self.log_screen.shift_selected_log_to_top() 648 self._user_scroll_event = True 649 650 def move_selected_line_to_top(self): 651 self.follow = False 652 653 # Update selected line 654 self._update_log_index() 655 656 self.log_screen.reset_logs(log_index=self.log_index) 657 self.log_screen.shift_selected_log_to_top() 658 self._user_scroll_event = True 659 660 def center_log_line(self): 661 self.follow = False 662 663 # Update selected line 664 self._update_log_index() 665 666 self.log_screen.reset_logs(log_index=self.log_index) 667 self.log_screen.shift_selected_log_to_center() 668 self._user_scroll_event = True 669 670 def scroll_to_bottom(self, with_sticky_follow: bool = True): 671 """Move selected index to the end.""" 672 # Don't change following state like scroll_to_top. 673 self.log_index = max(0, self.get_last_log_index()) 674 self.log_screen.reset_logs(log_index=self.log_index) 675 676 # Sticky follow mode 677 if with_sticky_follow: 678 self.follow = True 679 self._user_scroll_event = True 680 681 def scroll(self, lines) -> None: 682 """Scroll up or down by plus or minus lines. 683 684 This method is only called by user keybindings. 685 """ 686 # If the user starts scrolling, stop auto following. 687 self.follow = False 688 689 self.log_screen.scroll_subline(lines) 690 self._user_scroll_event = True 691 692 # Update the current log 693 current_line = self._update_log_index() 694 695 # Don't check for sticky follow mode if selecting lines. 696 if self.visual_select_mode: 697 return 698 # Is the last log line selected? 699 if self.log_index == self.get_last_log_index(): 700 # Is the last line of the current log selected? 701 if current_line.subline + 1 == current_line.height: 702 # Sticky follow mode 703 self.follow = True 704 705 def visual_selected_log_count(self) -> int: 706 if self.marked_logs_start is None or self.marked_logs_end is None: 707 return 0 708 return (self.marked_logs_end - self.marked_logs_start) + 1 709 710 def clear_visual_selection(self) -> None: 711 self.marked_logs_start = None 712 self.marked_logs_end = None 713 self.visual_select_mode = False 714 self._user_scroll_event = True 715 self.log_pane.application.redraw_ui() 716 717 def visual_select_all(self) -> None: 718 self.marked_logs_start = self._scrollback_start_index 719 self.marked_logs_end = self.get_total_count() - 1 720 721 self.visual_select_mode = True 722 self._user_scroll_event = True 723 self.log_pane.application.redraw_ui() 724 725 def visual_select_up(self) -> None: 726 # Select the current line 727 self.visual_select_line(self.get_cursor_position(), autoscroll=False) 728 # Move the cursor by 1 729 self.scroll_up(1) 730 # Select the new line 731 self.visual_select_line(self.get_cursor_position(), autoscroll=False) 732 733 def visual_select_down(self) -> None: 734 # Select the current line 735 self.visual_select_line(self.get_cursor_position(), autoscroll=False) 736 # Move the cursor by 1 737 self.scroll_down(1) 738 # Select the new line 739 self.visual_select_line(self.get_cursor_position(), autoscroll=False) 740 741 def visual_select_line( 742 self, mouse_position: Point, autoscroll: bool = True 743 ) -> None: 744 """Mark the log under mouse_position as visually selected.""" 745 # Check mouse_position is valid 746 if not 0 <= mouse_position.y < len(self.log_screen.line_buffer): 747 return 748 # Update mode flags 749 self.visual_select_mode = True 750 self.follow = False 751 # Get the ScreenLine for the cursor position 752 screen_line = self.log_screen.line_buffer[mouse_position.y] 753 if screen_line.log_index is None: 754 return 755 756 if self.marked_logs_start is None: 757 self.marked_logs_start = screen_line.log_index 758 if self.marked_logs_end is None: 759 self.marked_logs_end = screen_line.log_index 760 761 if screen_line.log_index < self.marked_logs_start: 762 self.marked_logs_start = screen_line.log_index 763 elif screen_line.log_index > self.marked_logs_end: 764 self.marked_logs_end = screen_line.log_index 765 766 # Update cursor position 767 self.log_screen.move_cursor_to_position(mouse_position.y) 768 769 # Autoscroll when mouse dragging on the top or bottom of the window. 770 if autoscroll: 771 if mouse_position.y == 0: 772 self.scroll_up(1) 773 elif mouse_position.y == self._window_height - 1: 774 self.scroll_down(1) 775 776 # Trigger a rerender. 777 self._user_scroll_event = True 778 self.log_pane.application.redraw_ui() 779 780 def scroll_to_position(self, mouse_position: Point): 781 """Set the selected log line to the mouse_position.""" 782 # Disable follow mode when the user clicks or mouse drags on a log line. 783 self.follow = False 784 785 self.log_screen.move_cursor_to_position(mouse_position.y) 786 self._update_log_index() 787 788 self._user_scroll_event = True 789 790 def scroll_up_one_page(self): 791 """Move the selected log index up by one window height.""" 792 lines = 1 793 if self._window_height > 0: 794 lines = self._window_height 795 self.scroll(-1 * lines) 796 797 def scroll_down_one_page(self): 798 """Move the selected log index down by one window height.""" 799 lines = 1 800 if self._window_height > 0: 801 lines = self._window_height 802 self.scroll(lines) 803 804 def scroll_down(self, lines=1): 805 """Move the selected log index down by one or more lines.""" 806 self.scroll(lines) 807 808 def scroll_up(self, lines=1): 809 """Move the selected log index up by one or more lines.""" 810 self.scroll(-1 * lines) 811 812 def log_start_end_indexes_changed(self) -> bool: 813 return ( 814 self._last_start_index != self._current_start_index 815 or self._last_end_index != self._current_end_index 816 ) 817 818 def render_table_header(self): 819 """Get pre-formatted table header.""" 820 return self.log_store.render_table_header() 821 822 def get_web_socket_url(self): 823 return f'http://127.0.0.1:3000/#ws={self.websocket_port}' 824 825 def render_content(self) -> List: 826 """Return logs to display on screen as a list of FormattedText tuples. 827 828 This function determines when the log screen requires re-rendeing based 829 on user scroll events, follow mode being on, or log pane being 830 empty. The FormattedText tuples passed to prompt_toolkit are cached if 831 no updates are required. 832 """ 833 screen_update_needed = False 834 835 # Disable rendering if user is viewing logs on web 836 if self.websocket_running: 837 return [] 838 839 # Check window size 840 if self.log_pane.pane_resized(): 841 self._window_width = self.log_pane.current_log_pane_width 842 self._window_height = self.log_pane.current_log_pane_height 843 self.log_screen.resize(self._window_width, self._window_height) 844 self._reset_log_screen_on_next_render = True 845 846 if self.follow_event is not None: 847 if ( 848 self.follow_event == FollowEvent.SEARCH_MATCH 849 and self.last_search_matched_log 850 ): 851 self.log_index = self.last_search_matched_log 852 self.last_search_matched_log = None 853 self._reset_log_screen_on_next_render = True 854 855 elif self.follow_event == FollowEvent.STICKY_FOLLOW: 856 # Jump to the last log message 857 self.log_index = max(0, self.get_last_log_index()) 858 859 self.follow_event = None 860 screen_update_needed = True 861 862 if self._reset_log_screen_on_next_render or self.log_screen.empty(): 863 # Clear the reset flag. 864 self._reset_log_screen_on_next_render = False 865 self.log_screen.reset_logs(log_index=self.log_index) 866 screen_update_needed = True 867 868 elif self.follow and self._new_logs_since_last_render: 869 # Follow mode is on so add new logs to the screen 870 self._new_logs_since_last_render = False 871 872 current_log_index = self.log_index 873 last_rendered_log_index = self.log_screen.last_appended_log_index 874 # If so many logs have arrived than can fit on the screen, redraw 875 # the whole screen from the new position. 876 if ( 877 current_log_index - last_rendered_log_index 878 ) > self.log_screen.height: 879 self.log_screen.reset_logs(log_index=self.log_index) 880 # A small amount of logs have arrived, append them one at a time 881 # without redrawing the whole screen. 882 else: 883 for i in range( 884 last_rendered_log_index + 1, current_log_index + 1 885 ): 886 self.log_screen.append_log(i) 887 888 screen_update_needed = True 889 890 if self.follow: 891 # Select the last line for follow mode. 892 self.log_screen.move_cursor_to_bottom() 893 screen_update_needed = True 894 895 if self._user_scroll_event: 896 self._user_scroll_event = False 897 screen_update_needed = True 898 899 if screen_update_needed: 900 self._line_fragment_cache = self.log_screen.get_lines( 901 marked_logs_start=self.marked_logs_start, 902 marked_logs_end=self.marked_logs_end, 903 ) 904 return self._line_fragment_cache 905 906 def _logs_to_text( 907 self, 908 use_table_formatting: bool = True, 909 selected_lines_only: bool = False, 910 ) -> str: 911 """Convert all or selected log messages to plaintext.""" 912 913 def get_table_string(log: LogLine) -> str: 914 return remove_formatting(self.log_store.table.formatted_row(log)) 915 916 formatter: Callable[[LogLine], str] = operator.attrgetter( 917 'ansi_stripped_log' 918 ) 919 if use_table_formatting: 920 formatter = get_table_string 921 922 _start_log_index, log_source = self._get_log_lines() 923 924 log_index_range = range( 925 self._scrollback_start_index, self.get_total_count() 926 ) 927 if ( 928 selected_lines_only 929 and self.marked_logs_start is not None 930 and self.marked_logs_end is not None 931 ): 932 log_index_range = range( 933 self.marked_logs_start, self.marked_logs_end + 1 934 ) 935 936 text_output = '' 937 for i in log_index_range: 938 log_text = formatter(log_source[i]) 939 text_output += log_text 940 if not log_text.endswith('\n'): 941 text_output += '\n' 942 943 return text_output 944 945 def export_logs( 946 self, 947 use_table_formatting: bool = True, 948 selected_lines_only: bool = False, 949 file_name: Optional[str] = None, 950 to_clipboard: bool = False, 951 add_markdown_fence: bool = False, 952 ) -> bool: 953 """Export log lines to file or clipboard.""" 954 text_output = self._logs_to_text( 955 use_table_formatting, selected_lines_only 956 ) 957 958 if file_name: 959 target_path = Path(file_name).expanduser() 960 with target_path.open('w') as output_file: 961 output_file.write(text_output) 962 _LOG.debug('Saved to file: %s', file_name) 963 964 elif to_clipboard: 965 if add_markdown_fence: 966 text_output = '```\n' + text_output + '```\n' 967 self.log_pane.application.application.clipboard.set_text( 968 text_output 969 ) 970 _LOG.debug('Copied logs to clipboard.') 971 972 return True 973