• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""LogView maintains a log pane's scrolling and searching state."""
15
16from __future__ import annotations
17import asyncio
18import collections
19import copy
20from enum import Enum
21import itertools
22import json
23import logging
24import operator
25from pathlib import Path
26import re
27from threading import Thread
28from typing import Callable, Dict, List, Optional, Tuple, TYPE_CHECKING
29
30from prompt_toolkit.data_structures import Point
31from prompt_toolkit.formatted_text import StyleAndTextTuples
32import websockets
33
34from pw_console.log_filter import (
35    DEFAULT_SEARCH_MATCHER,
36    LogFilter,
37    RegexValidator,
38    SearchMatcher,
39    preprocess_search_regex,
40)
41from pw_console.log_screen import ScreenLine, LogScreen
42from pw_console.log_store import LogStore
43from pw_console.python_logging import log_record_to_json
44from pw_console.text_formatting import remove_formatting
45
46if TYPE_CHECKING:
47    from pw_console.console_app import ConsoleApp
48    from pw_console.log_line import LogLine
49    from pw_console.log_pane import LogPane
50
51_LOG = logging.getLogger(__package__)
52
53
54class FollowEvent(Enum):
55    """Follow mode scroll event types."""
56
57    SEARCH_MATCH = 'scroll_to_bottom'
58    STICKY_FOLLOW = 'scroll_to_bottom_with_sticky_follow'
59
60
61class LogView:
62    """Viewing window into a LogStore."""
63
64    # pylint: disable=too-many-instance-attributes,too-many-public-methods
65
66    def __init__(
67        self,
68        log_pane: 'LogPane',
69        application: 'ConsoleApp',
70        log_store: Optional[LogStore] = None,
71    ):
72        # Parent LogPane reference. Updated by calling `set_log_pane()`.
73        self.log_pane = log_pane
74        self.log_store = (
75            log_store if log_store else LogStore(prefs=application.prefs)
76        )
77        self.log_store.set_prefs(application.prefs)
78        self.log_store.register_viewer(self)
79
80        self.marked_logs_start: Optional[int] = None
81        self.marked_logs_end: Optional[int] = None
82
83        # Search variables
84        self.search_text: Optional[str] = None
85        self.search_filter: Optional[LogFilter] = None
86        self.search_highlight: bool = False
87        self.search_matcher = DEFAULT_SEARCH_MATCHER
88        self.search_validator = RegexValidator()
89
90        # Container for each log_index matched by active searches.
91        self.search_matched_lines: Dict[int, int] = {}
92        # Background task to find historical matched lines.
93        self.search_match_count_task: Optional[asyncio.Task] = None
94
95        # Flag for automatically jumping to each new search match as they
96        # appear.
97        self.follow_search_match: bool = False
98        self.last_search_matched_log: Optional[int] = None
99
100        # Follow event flag. This is set to by the new_logs_arrived() function
101        # as a signal that the log screen should be scrolled to the bottom.
102        # This is read by render_content() whenever the screen is drawn.
103        self.follow_event: Optional[FollowEvent] = None
104
105        self.log_screen = LogScreen(
106            get_log_source=self._get_log_lines,
107            get_line_wrapping=self.wrap_lines_enabled,
108            get_log_formatter=self._get_table_formatter,
109            get_search_filter=lambda: self.search_filter,
110            get_search_highlight=lambda: self.search_highlight,
111        )
112
113        # Filter
114        self.filtering_on: bool = False
115        self.filters: 'collections.OrderedDict[str, LogFilter]' = (
116            collections.OrderedDict()
117        )
118        self.filtered_logs: collections.deque = collections.deque()
119        self.filter_existing_logs_task: Optional[asyncio.Task] = None
120
121        # Current log line index state variables:
122        self._last_log_index = -1
123        self._log_index = 0
124        self._filtered_log_index = 0
125        self._last_start_index = 0
126        self._last_end_index = 0
127        self._current_start_index = 0
128        self._current_end_index = 0
129        self._scrollback_start_index = 0
130
131        # LogPane prompt_toolkit container render size.
132        self._window_height = 20
133        self._window_width = 80
134        self._reset_log_screen_on_next_render: bool = True
135        self._user_scroll_event: bool = False
136
137        self._last_log_store_index = 0
138        self._new_logs_since_last_render = True
139        self._new_logs_since_last_websocket_serve = True
140        self._last_served_websocket_index = -1
141
142        # Should new log lines be tailed?
143        self.follow: bool = True
144
145        self.visual_select_mode: bool = False
146
147        # Cache of formatted text tuples used in the last UI render.
148        self._line_fragment_cache: List[StyleAndTextTuples] = []
149
150        # websocket server variables
151        self.websocket_running: bool = False
152        self.websocket_server = None
153        self.websocket_port = None
154        self.websocket_loop = asyncio.new_event_loop()
155
156        # Check if any logs are already in the log_store and update the view.
157        self.new_logs_arrived()
158
159    def _websocket_thread_entry(self):
160        """Entry point for the user code thread."""
161        asyncio.set_event_loop(self.websocket_loop)
162        self.websocket_server = websockets.serve(  # type: ignore # pylint: disable=no-member
163            self._send_logs_over_websockets, '127.0.0.1'
164        )
165        self.websocket_loop.run_until_complete(self.websocket_server)
166        self.websocket_port = self.websocket_server.ws_server.sockets[
167            0
168        ].getsockname()[1]
169        self.log_pane.application.application.clipboard.set_text(
170            self.get_web_socket_url()
171        )
172        self.websocket_running = True
173        self.websocket_loop.run_forever()
174
175    def start_websocket_thread(self):
176        """Create a thread for running user code so the UI isn't blocked."""
177        thread = Thread(
178            target=self._websocket_thread_entry, args=(), daemon=True
179        )
180        thread.start()
181
182    def stop_websocket_thread(self):
183        """Stop websocket server."""
184        if self.websocket_running:
185            self.websocket_loop.call_soon_threadsafe(self.websocket_loop.stop)
186            self.websocket_server = None
187            self.websocket_port = None
188            self.websocket_running = False
189            if self.filtering_on:
190                self._restart_filtering()
191
192    async def _send_logs_over_websockets(self, websocket, _path) -> None:
193        formatter: Callable[[LogLine], str] = operator.attrgetter(
194            'ansi_stripped_log'
195        )
196        formatter = lambda log: log_record_to_json(log.record)
197
198        theme_colors = json.dumps(
199            self.log_pane.application.prefs.pw_console_color_config()
200        )
201        # Send colors
202        await websocket.send(theme_colors)
203
204        while True:
205            # Wait for new logs
206            if not self._new_logs_since_last_websocket_serve:
207                await asyncio.sleep(0.5)
208
209            _start_log_index, log_source = self._get_log_lines()
210            log_index_range = range(
211                self._last_served_websocket_index + 1, self.get_total_count()
212            )
213
214            for i in log_index_range:
215                log_text = formatter(log_source[i])
216                await websocket.send(log_text)
217                self._last_served_websocket_index = i
218
219            # Flag that all logs have been served.
220            self._new_logs_since_last_websocket_serve = False
221
222    def view_mode_changed(self) -> None:
223        self._reset_log_screen_on_next_render = True
224
225    @property
226    def log_index(self):
227        if self.filtering_on:
228            return self._filtered_log_index
229        return self._log_index
230
231    @log_index.setter
232    def log_index(self, new_log_index):
233        # Save the old log_index
234        self._last_log_index = self.log_index
235        if self.filtering_on:
236            self._filtered_log_index = new_log_index
237        else:
238            self._log_index = new_log_index
239
240    def _reset_log_index_changed(self) -> None:
241        self._last_log_index = self.log_index
242
243    def log_index_changed_since_last_render(self) -> bool:
244        return self._last_log_index != self.log_index
245
246    def _set_match_position(self, position: int):
247        self.follow = False
248        self.log_index = position
249        self.save_search_matched_line(position)
250        self.log_screen.reset_logs(log_index=self.log_index)
251        self.log_screen.shift_selected_log_to_center()
252        self._user_scroll_event = True
253        self.log_pane.application.redraw_ui()
254
255    def select_next_search_matcher(self):
256        matchers = list(SearchMatcher)
257        index = matchers.index(self.search_matcher)
258        new_index = (index + 1) % len(matchers)
259        self.search_matcher = matchers[new_index]
260
261    def search_forwards(self):
262        if not self.search_filter:
263            return
264        self.search_highlight = True
265
266        log_beginning_index = self.hidden_line_count()
267
268        starting_index = self.log_index + 1
269        if starting_index > self.get_last_log_index():
270            starting_index = log_beginning_index
271
272        _, logs = self._get_log_lines()
273
274        # From current position +1 and down
275        for i in range(starting_index, self.get_last_log_index() + 1):
276            if self.search_filter.matches(logs[i]):
277                self._set_match_position(i)
278                return
279
280        # From the beginning to the original start
281        for i in range(log_beginning_index, starting_index):
282            if self.search_filter.matches(logs[i]):
283                self._set_match_position(i)
284                return
285
286    def search_backwards(self):
287        if not self.search_filter:
288            return
289        self.search_highlight = True
290
291        log_beginning_index = self.hidden_line_count()
292
293        starting_index = self.log_index - 1
294        if starting_index < 0:
295            starting_index = self.get_last_log_index()
296
297        _, logs = self._get_log_lines()
298
299        # From current position - 1 and up
300        for i in range(starting_index, log_beginning_index - 1, -1):
301            if self.search_filter.matches(logs[i]):
302                self._set_match_position(i)
303                return
304
305        # From the end to the original start
306        for i in range(self.get_last_log_index(), starting_index, -1):
307            if self.search_filter.matches(logs[i]):
308                self._set_match_position(i)
309                return
310
311    def set_search_regex(
312        self, text, invert, field, matcher: Optional[SearchMatcher] = None
313    ) -> bool:
314        search_matcher = matcher if matcher else self.search_matcher
315        _LOG.debug(search_matcher)
316
317        regex_text, regex_flags = preprocess_search_regex(
318            text, matcher=search_matcher
319        )
320
321        try:
322            compiled_regex = re.compile(regex_text, regex_flags)
323            self.search_filter = LogFilter(
324                regex=compiled_regex,
325                input_text=text,
326                invert=invert,
327                field=field,
328            )
329            _LOG.debug(self.search_filter)
330        except re.error as error:
331            _LOG.debug(error)
332            return False
333
334        self.search_highlight = True
335        self.search_text = regex_text
336        return True
337
338    def new_search(
339        self,
340        text,
341        invert=False,
342        field: Optional[str] = None,
343        search_matcher: Optional[str] = None,
344        interactive: bool = True,
345    ) -> bool:
346        """Start a new search for the given text."""
347        valid_matchers = list(s.name for s in SearchMatcher)
348        selected_matcher: Optional[SearchMatcher] = None
349        if (
350            search_matcher is not None
351            and search_matcher.upper() in valid_matchers
352        ):
353            selected_matcher = SearchMatcher(search_matcher.upper())
354
355        if not self.set_search_regex(text, invert, field, selected_matcher):
356            return False
357
358        # Clear matched lines
359        self.search_matched_lines = {}
360
361        if interactive:
362            # Start count historical search matches task.
363            self.search_match_count_task = asyncio.create_task(
364                self.count_search_matches()
365            )
366
367        # Default search direction when hitting enter in the search bar.
368        if interactive:
369            self.search_forwards()
370        return True
371
372    def save_search_matched_line(self, log_index: int) -> None:
373        """Save the log_index at position as a matched line."""
374        self.search_matched_lines[log_index] = 0
375        # Keep matched lines sorted by position
376        self.search_matched_lines = {
377            # Save this log_index and its match number.
378            log_index: match_number
379            for match_number, log_index in enumerate(
380                sorted(self.search_matched_lines.keys())
381            )
382        }
383
384    def disable_search_highlighting(self):
385        self.log_pane.log_view.search_highlight = False
386
387    def _restart_filtering(self):
388        # Turn on follow
389        if not self.follow:
390            self.toggle_follow()
391
392        # Reset filtered logs.
393        self.filtered_logs.clear()
394        # Reset scrollback start
395        self._scrollback_start_index = 0
396
397        # Start filtering existing log lines.
398        self.filter_existing_logs_task = asyncio.create_task(
399            self.filter_past_logs()
400        )
401
402        # Reset existing search
403        self.clear_search()
404
405        # Trigger a main menu update to set log window menu titles.
406        self.log_pane.application.update_menu_items()
407        # Redraw the UI
408        self.log_pane.application.redraw_ui()
409
410    def install_new_filter(self):
411        """Set a filter using the current search_regex."""
412        if not self.search_filter:
413            return
414
415        self.filtering_on = True
416        self.filters[self.search_text] = copy.deepcopy(self.search_filter)
417
418        self.clear_search()
419
420    def apply_filter(self):
421        """Set new filter and schedule historical log filter asyncio task."""
422        if self.websocket_running:
423            return
424        self.install_new_filter()
425        self._restart_filtering()
426
427    def clear_search_highlighting(self):
428        self.search_highlight = False
429        self._reset_log_screen_on_next_render = True
430
431    def clear_search(self):
432        self.search_matched_lines = {}
433        self.search_text = None
434        self.search_filter = None
435        self.search_highlight = False
436        self._reset_log_screen_on_next_render = True
437
438    def _get_log_lines(self) -> Tuple[int, collections.deque[LogLine]]:
439        logs = self.log_store.logs
440        if self.filtering_on:
441            logs = self.filtered_logs
442        return self._scrollback_start_index, logs
443
444    def _get_visible_log_lines(self):
445        _, logs = self._get_log_lines()
446        if self._scrollback_start_index > 0:
447            return collections.deque(
448                itertools.islice(logs, self.hidden_line_count(), len(logs))
449            )
450        return logs
451
452    def _get_table_formatter(self) -> Optional[Callable]:
453        table_formatter = None
454        if self.log_pane.table_view:
455            table_formatter = self.log_store.table.formatted_row
456        return table_formatter
457
458    def delete_filter(self, filter_text):
459        if filter_text not in self.filters:
460            return
461
462        # Delete this filter
463        del self.filters[filter_text]
464
465        # If no filters left, stop filtering.
466        if len(self.filters) == 0:
467            self.clear_filters()
468        else:
469            # Erase existing filtered lines.
470            self._restart_filtering()
471
472    def clear_filters(self):
473        if not self.filtering_on:
474            return
475        self.clear_search()
476        self.filtering_on = False
477        self.filters: 'collections.OrderedDict[str, re.Pattern]' = (
478            collections.OrderedDict()
479        )
480        self.filtered_logs.clear()
481        # Reset scrollback start
482        self._scrollback_start_index = 0
483        if not self.follow:
484            self.toggle_follow()
485
486    async def count_search_matches(self):
487        """Count search matches and save their locations."""
488        # Wait for any filter_existing_logs_task to finish.
489        if self.filtering_on and self.filter_existing_logs_task:
490            await self.filter_existing_logs_task
491
492        starting_index = self.get_last_log_index()
493        ending_index, logs = self._get_log_lines()
494
495        # From the end of the log store to the beginning.
496        for i in range(starting_index, ending_index - 1, -1):
497            # Is this log a match?
498            if self.search_filter.matches(logs[i]):
499                self.save_search_matched_line(i)
500            # Pause every 100 lines or so
501            if i % 100 == 0:
502                await asyncio.sleep(0.1)
503
504    async def filter_past_logs(self):
505        """Filter past log lines."""
506        starting_index = self.log_store.get_last_log_index()
507        ending_index = -1
508
509        # From the end of the log store to the beginning.
510        for i in range(starting_index, ending_index, -1):
511            # Is this log a match?
512            if self.filter_scan(self.log_store.logs[i]):
513                # Add to the beginning of the deque.
514                self.filtered_logs.appendleft(self.log_store.logs[i])
515            # TODO(tonymd): Tune these values.
516            # Pause every 100 lines or so
517            if i % 100 == 0:
518                await asyncio.sleep(0.1)
519
520    def set_log_pane(self, log_pane: 'LogPane'):
521        """Set the parent LogPane instance."""
522        self.log_pane = log_pane
523
524    def _update_log_index(self) -> ScreenLine:
525        line_at_cursor = self.log_screen.get_line_at_cursor_position()
526        if line_at_cursor.log_index is not None:
527            self.log_index = line_at_cursor.log_index
528        return line_at_cursor
529
530    def get_current_line(self) -> int:
531        """Return the currently selected log event index."""
532        return self.log_index
533
534    def get_total_count(self):
535        """Total size of the logs store."""
536        return (
537            len(self.filtered_logs)
538            if self.filtering_on
539            else self.log_store.get_total_count()
540        )
541
542    def get_last_log_index(self):
543        total = self.get_total_count()
544        return 0 if total < 0 else total - 1
545
546    def clear_scrollback(self):
547        """Hide log lines before the max length of the stored logs."""
548        # Enable follow and scroll to the bottom, then clear.
549        if not self.follow:
550            self.toggle_follow()
551        self._scrollback_start_index = self.log_index
552        self._reset_log_screen_on_next_render = True
553
554    def hidden_line_count(self):
555        """Return the number of hidden lines."""
556        if self._scrollback_start_index > 0:
557            return self._scrollback_start_index + 1
558        return 0
559
560    def undo_clear_scrollback(self):
561        """Reset the current scrollback start index."""
562        self._scrollback_start_index = 0
563
564    def wrap_lines_enabled(self):
565        """Get the parent log pane wrap lines setting."""
566        if not self.log_pane:
567            return False
568        return self.log_pane.wrap_lines
569
570    def toggle_follow(self):
571        """Toggle auto line following."""
572        self.follow = not self.follow
573        if self.follow:
574            # Disable search match follow mode.
575            self.follow_search_match = False
576            self.scroll_to_bottom()
577
578    def filter_scan(self, log: 'LogLine'):
579        filter_match_count = 0
580        for _filter_text, log_filter in self.filters.items():
581            if log_filter.matches(log):
582                filter_match_count += 1
583            else:
584                break
585
586        if filter_match_count == len(self.filters):
587            return True
588        return False
589
590    def new_logs_arrived(self):
591        """Check newly arrived log messages.
592
593        Depending on where log statements occur ``new_logs_arrived`` may be in a
594        separate thread since it is triggerd by the Python log handler
595        ``emit()`` function. In this case the log handler is the LogStore
596        instance ``self.log_store``. This function should not redraw the screen
597        or scroll.
598        """
599        latest_total = self.log_store.get_total_count()
600
601        if self.filtering_on:
602            # Scan newly arived log lines
603            for i in range(self._last_log_store_index, latest_total):
604                if self.filter_scan(self.log_store.logs[i]):
605                    self.filtered_logs.append(self.log_store.logs[i])
606
607        if self.search_filter:
608            last_matched_log: Optional[int] = None
609            # Scan newly arived log lines
610            for i in range(self._last_log_store_index, latest_total):
611                if self.search_filter.matches(self.log_store.logs[i]):
612                    self.save_search_matched_line(i)
613                    last_matched_log = i
614            if last_matched_log and self.follow_search_match:
615                # Set the follow event flag for the next render_content call.
616                self.follow_event = FollowEvent.SEARCH_MATCH
617                self.last_search_matched_log = last_matched_log
618
619        self._last_log_store_index = latest_total
620        self._new_logs_since_last_render = True
621        self._new_logs_since_last_websocket_serve = True
622
623        if self.follow:
624            # Set the follow event flag for the next render_content call.
625            self.follow_event = FollowEvent.STICKY_FOLLOW
626
627        if self.websocket_running:
628            # No terminal screen redraws are required.
629            return
630
631        # Trigger a UI update if the log window is visible.
632        if self.log_pane.show_pane:
633            self.log_pane.application.logs_redraw()
634
635    def get_cursor_position(self) -> Point:
636        """Return the position of the cursor."""
637        return Point(0, self.log_screen.cursor_position)
638
639    def scroll_to_top(self):
640        """Move selected index to the beginning."""
641        # Stop following so cursor doesn't jump back down to the bottom.
642        self.follow = False
643        # First possible log index that should be displayed
644        log_beginning_index = self.hidden_line_count()
645        self.log_index = log_beginning_index
646        self.log_screen.reset_logs(log_index=self.log_index)
647        self.log_screen.shift_selected_log_to_top()
648        self._user_scroll_event = True
649
650    def move_selected_line_to_top(self):
651        self.follow = False
652
653        # Update selected line
654        self._update_log_index()
655
656        self.log_screen.reset_logs(log_index=self.log_index)
657        self.log_screen.shift_selected_log_to_top()
658        self._user_scroll_event = True
659
660    def center_log_line(self):
661        self.follow = False
662
663        # Update selected line
664        self._update_log_index()
665
666        self.log_screen.reset_logs(log_index=self.log_index)
667        self.log_screen.shift_selected_log_to_center()
668        self._user_scroll_event = True
669
670    def scroll_to_bottom(self, with_sticky_follow: bool = True):
671        """Move selected index to the end."""
672        # Don't change following state like scroll_to_top.
673        self.log_index = max(0, self.get_last_log_index())
674        self.log_screen.reset_logs(log_index=self.log_index)
675
676        # Sticky follow mode
677        if with_sticky_follow:
678            self.follow = True
679        self._user_scroll_event = True
680
681    def scroll(self, lines) -> None:
682        """Scroll up or down by plus or minus lines.
683
684        This method is only called by user keybindings.
685        """
686        # If the user starts scrolling, stop auto following.
687        self.follow = False
688
689        self.log_screen.scroll_subline(lines)
690        self._user_scroll_event = True
691
692        # Update the current log
693        current_line = self._update_log_index()
694
695        # Don't check for sticky follow mode if selecting lines.
696        if self.visual_select_mode:
697            return
698        # Is the last log line selected?
699        if self.log_index == self.get_last_log_index():
700            # Is the last line of the current log selected?
701            if current_line.subline + 1 == current_line.height:
702                # Sticky follow mode
703                self.follow = True
704
705    def visual_selected_log_count(self) -> int:
706        if self.marked_logs_start is None or self.marked_logs_end is None:
707            return 0
708        return (self.marked_logs_end - self.marked_logs_start) + 1
709
710    def clear_visual_selection(self) -> None:
711        self.marked_logs_start = None
712        self.marked_logs_end = None
713        self.visual_select_mode = False
714        self._user_scroll_event = True
715        self.log_pane.application.redraw_ui()
716
717    def visual_select_all(self) -> None:
718        self.marked_logs_start = self._scrollback_start_index
719        self.marked_logs_end = self.get_total_count() - 1
720
721        self.visual_select_mode = True
722        self._user_scroll_event = True
723        self.log_pane.application.redraw_ui()
724
725    def visual_select_up(self) -> None:
726        # Select the current line
727        self.visual_select_line(self.get_cursor_position(), autoscroll=False)
728        # Move the cursor by 1
729        self.scroll_up(1)
730        # Select the new line
731        self.visual_select_line(self.get_cursor_position(), autoscroll=False)
732
733    def visual_select_down(self) -> None:
734        # Select the current line
735        self.visual_select_line(self.get_cursor_position(), autoscroll=False)
736        # Move the cursor by 1
737        self.scroll_down(1)
738        # Select the new line
739        self.visual_select_line(self.get_cursor_position(), autoscroll=False)
740
741    def visual_select_line(
742        self, mouse_position: Point, autoscroll: bool = True
743    ) -> None:
744        """Mark the log under mouse_position as visually selected."""
745        # Check mouse_position is valid
746        if not 0 <= mouse_position.y < len(self.log_screen.line_buffer):
747            return
748        # Update mode flags
749        self.visual_select_mode = True
750        self.follow = False
751        # Get the ScreenLine for the cursor position
752        screen_line = self.log_screen.line_buffer[mouse_position.y]
753        if screen_line.log_index is None:
754            return
755
756        if self.marked_logs_start is None:
757            self.marked_logs_start = screen_line.log_index
758        if self.marked_logs_end is None:
759            self.marked_logs_end = screen_line.log_index
760
761        if screen_line.log_index < self.marked_logs_start:
762            self.marked_logs_start = screen_line.log_index
763        elif screen_line.log_index > self.marked_logs_end:
764            self.marked_logs_end = screen_line.log_index
765
766        # Update cursor position
767        self.log_screen.move_cursor_to_position(mouse_position.y)
768
769        # Autoscroll when mouse dragging on the top or bottom of the window.
770        if autoscroll:
771            if mouse_position.y == 0:
772                self.scroll_up(1)
773            elif mouse_position.y == self._window_height - 1:
774                self.scroll_down(1)
775
776        # Trigger a rerender.
777        self._user_scroll_event = True
778        self.log_pane.application.redraw_ui()
779
780    def scroll_to_position(self, mouse_position: Point):
781        """Set the selected log line to the mouse_position."""
782        # Disable follow mode when the user clicks or mouse drags on a log line.
783        self.follow = False
784
785        self.log_screen.move_cursor_to_position(mouse_position.y)
786        self._update_log_index()
787
788        self._user_scroll_event = True
789
790    def scroll_up_one_page(self):
791        """Move the selected log index up by one window height."""
792        lines = 1
793        if self._window_height > 0:
794            lines = self._window_height
795        self.scroll(-1 * lines)
796
797    def scroll_down_one_page(self):
798        """Move the selected log index down by one window height."""
799        lines = 1
800        if self._window_height > 0:
801            lines = self._window_height
802        self.scroll(lines)
803
804    def scroll_down(self, lines=1):
805        """Move the selected log index down by one or more lines."""
806        self.scroll(lines)
807
808    def scroll_up(self, lines=1):
809        """Move the selected log index up by one or more lines."""
810        self.scroll(-1 * lines)
811
812    def log_start_end_indexes_changed(self) -> bool:
813        return (
814            self._last_start_index != self._current_start_index
815            or self._last_end_index != self._current_end_index
816        )
817
818    def render_table_header(self):
819        """Get pre-formatted table header."""
820        return self.log_store.render_table_header()
821
822    def get_web_socket_url(self):
823        return f'http://127.0.0.1:3000/#ws={self.websocket_port}'
824
825    def render_content(self) -> List:
826        """Return logs to display on screen as a list of FormattedText tuples.
827
828        This function determines when the log screen requires re-rendeing based
829        on user scroll events, follow mode being on, or log pane being
830        empty. The FormattedText tuples passed to prompt_toolkit are cached if
831        no updates are required.
832        """
833        screen_update_needed = False
834
835        # Disable rendering if user is viewing logs on web
836        if self.websocket_running:
837            return []
838
839        # Check window size
840        if self.log_pane.pane_resized():
841            self._window_width = self.log_pane.current_log_pane_width
842            self._window_height = self.log_pane.current_log_pane_height
843            self.log_screen.resize(self._window_width, self._window_height)
844            self._reset_log_screen_on_next_render = True
845
846        if self.follow_event is not None:
847            if (
848                self.follow_event == FollowEvent.SEARCH_MATCH
849                and self.last_search_matched_log
850            ):
851                self.log_index = self.last_search_matched_log
852                self.last_search_matched_log = None
853                self._reset_log_screen_on_next_render = True
854
855            elif self.follow_event == FollowEvent.STICKY_FOLLOW:
856                # Jump to the last log message
857                self.log_index = max(0, self.get_last_log_index())
858
859            self.follow_event = None
860            screen_update_needed = True
861
862        if self._reset_log_screen_on_next_render or self.log_screen.empty():
863            # Clear the reset flag.
864            self._reset_log_screen_on_next_render = False
865            self.log_screen.reset_logs(log_index=self.log_index)
866            screen_update_needed = True
867
868        elif self.follow and self._new_logs_since_last_render:
869            # Follow mode is on so add new logs to the screen
870            self._new_logs_since_last_render = False
871
872            current_log_index = self.log_index
873            last_rendered_log_index = self.log_screen.last_appended_log_index
874            # If so many logs have arrived than can fit on the screen, redraw
875            # the whole screen from the new position.
876            if (
877                current_log_index - last_rendered_log_index
878            ) > self.log_screen.height:
879                self.log_screen.reset_logs(log_index=self.log_index)
880            # A small amount of logs have arrived, append them one at a time
881            # without redrawing the whole screen.
882            else:
883                for i in range(
884                    last_rendered_log_index + 1, current_log_index + 1
885                ):
886                    self.log_screen.append_log(i)
887
888            screen_update_needed = True
889
890        if self.follow:
891            # Select the last line for follow mode.
892            self.log_screen.move_cursor_to_bottom()
893            screen_update_needed = True
894
895        if self._user_scroll_event:
896            self._user_scroll_event = False
897            screen_update_needed = True
898
899        if screen_update_needed:
900            self._line_fragment_cache = self.log_screen.get_lines(
901                marked_logs_start=self.marked_logs_start,
902                marked_logs_end=self.marked_logs_end,
903            )
904        return self._line_fragment_cache
905
906    def _logs_to_text(
907        self,
908        use_table_formatting: bool = True,
909        selected_lines_only: bool = False,
910    ) -> str:
911        """Convert all or selected log messages to plaintext."""
912
913        def get_table_string(log: LogLine) -> str:
914            return remove_formatting(self.log_store.table.formatted_row(log))
915
916        formatter: Callable[[LogLine], str] = operator.attrgetter(
917            'ansi_stripped_log'
918        )
919        if use_table_formatting:
920            formatter = get_table_string
921
922        _start_log_index, log_source = self._get_log_lines()
923
924        log_index_range = range(
925            self._scrollback_start_index, self.get_total_count()
926        )
927        if (
928            selected_lines_only
929            and self.marked_logs_start is not None
930            and self.marked_logs_end is not None
931        ):
932            log_index_range = range(
933                self.marked_logs_start, self.marked_logs_end + 1
934            )
935
936        text_output = ''
937        for i in log_index_range:
938            log_text = formatter(log_source[i])
939            text_output += log_text
940            if not log_text.endswith('\n'):
941                text_output += '\n'
942
943        return text_output
944
945    def export_logs(
946        self,
947        use_table_formatting: bool = True,
948        selected_lines_only: bool = False,
949        file_name: Optional[str] = None,
950        to_clipboard: bool = False,
951        add_markdown_fence: bool = False,
952    ) -> bool:
953        """Export log lines to file or clipboard."""
954        text_output = self._logs_to_text(
955            use_table_formatting, selected_lines_only
956        )
957
958        if file_name:
959            target_path = Path(file_name).expanduser()
960            with target_path.open('w') as output_file:
961                output_file.write(text_output)
962            _LOG.debug('Saved to file: %s', file_name)
963
964        elif to_clipboard:
965            if add_markdown_fence:
966                text_output = '```\n' + text_output + '```\n'
967            self.log_pane.application.application.clipboard.set_text(
968                text_output
969            )
970            _LOG.debug('Copied logs to clipboard.')
971
972        return True
973