• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""LogView maintains a log pane's scrolling and searching state."""
15
16from __future__ import annotations
17import asyncio
18import collections
19import copy
20from enum import Enum
21import itertools
22import json
23import logging
24import operator
25from pathlib import Path
26import re
27from threading import Thread
28from typing import Callable, TYPE_CHECKING
29
30from prompt_toolkit.data_structures import Point
31from prompt_toolkit.formatted_text import StyleAndTextTuples
32import websockets
33
34from pw_console.log_filter import (
35    DEFAULT_SEARCH_MATCHER,
36    LogFilter,
37    RegexValidator,
38    SearchMatcher,
39    preprocess_search_regex,
40)
41from pw_console.log_screen import ScreenLine, LogScreen
42from pw_console.log_store import LogStore
43from pw_console.python_logging import log_record_to_json
44from pw_console.text_formatting import remove_formatting
45
46if TYPE_CHECKING:
47    from pw_console.console_app import ConsoleApp
48    from pw_console.log_line import LogLine
49    from pw_console.log_pane import LogPane
50
51_LOG = logging.getLogger(__package__)
52
53
class FollowEvent(Enum):
    """Follow mode scroll event types.

    ``LogView.new_logs_arrived()`` stores one of these in
    ``LogView.follow_event``; ``LogView.render_content()`` reads and clears
    it the next time the screen is drawn.
    """

    # A newly arrived log matched the active search while
    # ``follow_search_match`` was enabled.
    SEARCH_MATCH = 'scroll_to_bottom'
    # Follow mode is on; jump to the last log message.
    STICKY_FOLLOW = 'scroll_to_bottom_with_sticky_follow'
59
60
61class LogView:
62    """Viewing window into a LogStore."""
63
64    # pylint: disable=too-many-instance-attributes,too-many-public-methods
65
    def __init__(
        self,
        log_pane: LogPane,
        application: ConsoleApp,
        log_store: LogStore | None = None,
    ):
        """Create a view onto a LogStore for the given log pane.

        Args:
            log_pane: Parent LogPane that displays this view.
            application: ConsoleApp whose ``prefs`` are passed to the log
                store.
            log_store: Existing LogStore to view. A new LogStore is created
                when this is omitted (or falsy).
        """
        # Parent LogPane reference. Updated by calling `set_log_pane()`.
        self.log_pane = log_pane
        self.log_store = (
            log_store if log_store else LogStore(prefs=application.prefs)
        )
        self.log_store.set_prefs(application.prefs)
        self.log_store.register_viewer(self)

        # First and last log index of the visual selection, if any.
        self.marked_logs_start: int | None = None
        self.marked_logs_end: int | None = None

        # Search variables
        self.search_text: str | None = None
        self.search_filter: LogFilter | None = None
        self.search_highlight: bool = False
        self.search_matcher = DEFAULT_SEARCH_MATCHER
        self.search_validator = RegexValidator()

        # Container for each log_index matched by active searches.
        # Maps log_index -> match number (position in sorted order).
        self.search_matched_lines: dict[int, int] = {}
        # Background task to find historical matched lines.
        self.search_match_count_task: asyncio.Task | None = None

        # Flag for automatically jumping to each new search match as they
        # appear.
        self.follow_search_match: bool = False
        self.last_search_matched_log: int | None = None

        # Follow event flag. This is set by the new_logs_arrived() function
        # as a signal that the log screen should be scrolled to the bottom.
        # This is read by render_content() whenever the screen is drawn.
        self.follow_event: FollowEvent | None = None

        # The screen pulls its state from this view through callbacks.
        self.log_screen = LogScreen(
            get_log_source=self._get_log_lines,
            get_line_wrapping=self.wrap_lines_enabled,
            get_log_formatter=self._get_table_formatter,
            get_search_filter=lambda: self.search_filter,
            get_search_highlight=lambda: self.search_highlight,
        )

        # Filter
        self.filtering_on: bool = False
        # Active filters keyed by their search text, in installation order.
        self.filters: collections.OrderedDict[
            str, LogFilter
        ] = collections.OrderedDict()
        # Logs from the store that pass every active filter.
        self.filtered_logs: collections.deque = collections.deque()
        self.filter_existing_logs_task: asyncio.Task | None = None

        # Current log line index state variables:
        self._last_log_index = -1
        # Selected index when filtering is off / on respectively; see the
        # `log_index` property.
        self._log_index = 0
        self._filtered_log_index = 0
        self._last_start_index = 0
        self._last_end_index = 0
        self._current_start_index = 0
        self._current_end_index = 0
        # Set by clear_scrollback(); see hidden_line_count().
        self._scrollback_start_index = 0

        # LogPane prompt_toolkit container render size.
        self._window_height = 20
        self._window_width = 80
        self._reset_log_screen_on_next_render: bool = True
        self._user_scroll_event: bool = False

        # Log store size seen by the previous new_logs_arrived() call.
        self._last_log_store_index = 0
        self._new_logs_since_last_render = True
        self._new_logs_since_last_websocket_serve = True
        # Index of the last log sent by _send_logs_over_websockets().
        self._last_served_websocket_index = -1

        # Should new log lines be tailed?
        self.follow: bool = True

        self.visual_select_mode: bool = False

        # Cache of formatted text tuples used in the last UI render.
        self._line_fragment_cache: list[StyleAndTextTuples] = []

        # websocket server variables
        self.websocket_running: bool = False
        self.websocket_server = None
        self.websocket_port = None
        # Event loop that _websocket_thread_entry() installs and runs.
        self.websocket_loop = asyncio.new_event_loop()

        # Check if any logs are already in the log_store and update the view.
        self.new_logs_arrived()
158
    def _websocket_thread_entry(self):
        """Entry point for the websocket server thread.

        Starts a websocket server on 127.0.0.1 that serves logs with
        ``_send_logs_over_websockets``, records the port it is listening on,
        then runs the event loop until it is stopped.
        """
        # Make this view's loop the current event loop for this thread.
        asyncio.set_event_loop(self.websocket_loop)
        self.websocket_server = websockets.serve(  # type: ignore # pylint: disable=no-member
            self._send_logs_over_websockets, '127.0.0.1'
        )
        self.websocket_loop.run_until_complete(self.websocket_server)
        # No port is passed to serve() above; read back the bound port from
        # the first listening socket.
        self.websocket_port = self.websocket_server.ws_server.sockets[
            0
        ].getsockname()[1]
        self.websocket_running = True
        # Runs until stop_websocket_thread() schedules websocket_loop.stop.
        self.websocket_loop.run_forever()
171
172    def start_websocket_thread(self):
173        """Create a thread for running user code so the UI isn't blocked."""
174        thread = Thread(
175            target=self._websocket_thread_entry, args=(), daemon=True
176        )
177        thread.start()
178
179    def stop_websocket_thread(self):
180        """Stop websocket server."""
181        if self.websocket_running:
182            self.websocket_loop.call_soon_threadsafe(self.websocket_loop.stop)
183            self.websocket_server = None
184            self.websocket_port = None
185            self.websocket_running = False
186            if self.filtering_on:
187                self._restart_filtering()
188
189    async def _send_logs_over_websockets(self, websocket, _path) -> None:
190        formatter: Callable[[LogLine], str] = operator.attrgetter(
191            'ansi_stripped_log'
192        )
193        formatter = lambda log: log_record_to_json(log.record)
194
195        theme_colors = json.dumps(
196            self.log_pane.application.prefs.pw_console_color_config()
197        )
198        # Send colors
199        await websocket.send(theme_colors)
200
201        while True:
202            # Wait for new logs
203            if not self._new_logs_since_last_websocket_serve:
204                await asyncio.sleep(0.5)
205
206            _start_log_index, log_source = self._get_log_lines()
207            log_index_range = range(
208                self._last_served_websocket_index + 1, self.get_total_count()
209            )
210
211            for i in log_index_range:
212                log_text = formatter(log_source[i])
213                await websocket.send(log_text)
214                self._last_served_websocket_index = i
215
216            # Flag that all logs have been served.
217            self._new_logs_since_last_websocket_serve = False
218
    def view_mode_changed(self) -> None:
        """Request a log screen reset on the next ``render_content()``."""
        self._reset_log_screen_on_next_render = True
221
222    @property
223    def log_index(self):
224        if self.filtering_on:
225            return self._filtered_log_index
226        return self._log_index
227
228    @log_index.setter
229    def log_index(self, new_log_index):
230        # Save the old log_index
231        self._last_log_index = self.log_index
232        if self.filtering_on:
233            self._filtered_log_index = new_log_index
234        else:
235            self._log_index = new_log_index
236
    def _reset_log_index_changed(self) -> None:
        """Mark the current ``log_index`` as seen.

        Afterwards ``log_index_changed_since_last_render()`` returns False
        until ``log_index`` changes again.
        """
        self._last_log_index = self.log_index
239
    def log_index_changed_since_last_render(self) -> bool:
        """Return True if ``log_index`` differs from the last saved index."""
        return self._last_log_index != self.log_index
242
    def _set_match_position(self, position: int):
        """Select the search match at ``position`` and center it on screen.

        Follow mode is turned off, the match is recorded in
        ``search_matched_lines`` and a UI redraw is requested.

        Args:
            position: Log index of the matching line.
        """
        self.follow = False
        self.log_index = position
        self.save_search_matched_line(position)
        # Rebuild the screen around the match, then center it.
        self.log_screen.reset_logs(log_index=self.log_index)
        self.log_screen.shift_selected_log_to_center()
        self._user_scroll_event = True
        self.log_pane.application.redraw_ui()
251
252    def select_next_search_matcher(self):
253        matchers = list(SearchMatcher)
254        index = matchers.index(self.search_matcher)
255        new_index = (index + 1) % len(matchers)
256        self.search_matcher = matchers[new_index]
257
258    def search_forwards(self):
259        if not self.search_filter:
260            return
261        self.search_highlight = True
262
263        log_beginning_index = self.hidden_line_count()
264
265        starting_index = self.log_index + 1
266        if starting_index > self.get_last_log_index():
267            starting_index = log_beginning_index
268
269        _, logs = self._get_log_lines()
270
271        # From current position +1 and down
272        for i in range(starting_index, self.get_last_log_index() + 1):
273            if self.search_filter.matches(logs[i]):
274                self._set_match_position(i)
275                return
276
277        # From the beginning to the original start
278        for i in range(log_beginning_index, starting_index):
279            if self.search_filter.matches(logs[i]):
280                self._set_match_position(i)
281                return
282
283    def search_backwards(self):
284        if not self.search_filter:
285            return
286        self.search_highlight = True
287
288        log_beginning_index = self.hidden_line_count()
289
290        starting_index = self.log_index - 1
291        if starting_index < 0:
292            starting_index = self.get_last_log_index()
293
294        _, logs = self._get_log_lines()
295
296        # From current position - 1 and up
297        for i in range(starting_index, log_beginning_index - 1, -1):
298            if self.search_filter.matches(logs[i]):
299                self._set_match_position(i)
300                return
301
302        # From the end to the original start
303        for i in range(self.get_last_log_index(), starting_index, -1):
304            if self.search_filter.matches(logs[i]):
305                self._set_match_position(i)
306                return
307
308    def set_search_regex(
309        self, text, invert, field, matcher: SearchMatcher | None = None
310    ) -> bool:
311        search_matcher = matcher if matcher else self.search_matcher
312        _LOG.debug(search_matcher)
313
314        regex_text, regex_flags = preprocess_search_regex(
315            text, matcher=search_matcher
316        )
317
318        try:
319            compiled_regex = re.compile(regex_text, regex_flags)
320            self.search_filter = LogFilter(
321                regex=compiled_regex,
322                input_text=text,
323                invert=invert,
324                field=field,
325            )
326            _LOG.debug(self.search_filter)
327        except re.error as error:
328            _LOG.debug(error)
329            return False
330
331        self.search_highlight = True
332        self.search_text = regex_text
333        return True
334
335    def new_search(
336        self,
337        text,
338        invert=False,
339        field: str | None = None,
340        search_matcher: str | None = None,
341        interactive: bool = True,
342    ) -> bool:
343        """Start a new search for the given text."""
344        valid_matchers = list(s.name for s in SearchMatcher)
345        selected_matcher: SearchMatcher | None = None
346        if (
347            search_matcher is not None
348            and search_matcher.upper() in valid_matchers
349        ):
350            selected_matcher = SearchMatcher(search_matcher.upper())
351
352        if not self.set_search_regex(text, invert, field, selected_matcher):
353            return False
354
355        # Clear matched lines
356        self.search_matched_lines = {}
357
358        if interactive:
359            # Start count historical search matches task.
360            self.search_match_count_task = asyncio.create_task(
361                self.count_search_matches()
362            )
363
364        # Default search direction when hitting enter in the search bar.
365        if interactive:
366            self.search_forwards()
367        return True
368
369    def save_search_matched_line(self, log_index: int) -> None:
370        """Save the log_index at position as a matched line."""
371        self.search_matched_lines[log_index] = 0
372        # Keep matched lines sorted by position
373        self.search_matched_lines = {
374            # Save this log_index and its match number.
375            log_index: match_number
376            for match_number, log_index in enumerate(
377                sorted(self.search_matched_lines.keys())
378            )
379        }
380
    def disable_search_highlighting(self):
        """Turn off search highlighting on the parent pane's log view."""
        # NOTE(review): this goes through log_pane.log_view rather than self;
        # presumably they are the same object - confirm.
        self.log_pane.log_view.search_highlight = False
383
    def _restart_filtering(self):
        """Discard filtered logs and re-filter the whole log store.

        Follow mode is enabled, the scrollback start and the active search
        are reset, and existing logs are re-filtered in a background asyncio
        task.
        """
        # Turn on follow
        if not self.follow:
            self.toggle_follow()

        # Reset filtered logs.
        self.filtered_logs.clear()
        # Reset scrollback start
        self._scrollback_start_index = 0

        # Start filtering existing log lines.
        self.filter_existing_logs_task = asyncio.create_task(
            self.filter_past_logs()
        )

        # Reset existing search
        self.clear_search()

        # Trigger a main menu update to set log window menu titles.
        self.log_pane.application.update_menu_items()
        # Redraw the UI
        self.log_pane.application.redraw_ui()
406
407    def install_new_filter(self):
408        """Set a filter using the current search_regex."""
409        if not self.search_filter:
410            return
411
412        self.filtering_on = True
413        self.filters[self.search_text] = copy.deepcopy(self.search_filter)
414
415        self.clear_search()
416
417    def apply_filter(self):
418        """Set new filter and schedule historical log filter asyncio task."""
419        if self.websocket_running:
420            return
421        self.install_new_filter()
422        self._restart_filtering()
423
    def clear_search_highlighting(self):
        """Turn off search highlighting and reset the screen on next render.

        The search itself (filter, text, matched lines) is left in place.
        """
        self.search_highlight = False
        self._reset_log_screen_on_next_render = True
427
428    def clear_search(self):
429        self.search_matched_lines = {}
430        self.search_text = None
431        self.search_filter = None
432        self.search_highlight = False
433        self._reset_log_screen_on_next_render = True
434
435    def _get_log_lines(self) -> tuple[int, collections.deque[LogLine]]:
436        logs = self.log_store.logs
437        if self.filtering_on:
438            logs = self.filtered_logs
439        return self._scrollback_start_index, logs
440
441    def _get_visible_log_lines(self):
442        _, logs = self._get_log_lines()
443        if self._scrollback_start_index > 0:
444            return collections.deque(
445                itertools.islice(logs, self.hidden_line_count(), len(logs))
446            )
447        return logs
448
449    def _get_table_formatter(self) -> Callable | None:
450        table_formatter = None
451        if self.log_pane.table_view:
452            table_formatter = self.log_store.table.formatted_row
453        return table_formatter
454
455    def delete_filter(self, filter_text):
456        if filter_text not in self.filters:
457            return
458
459        # Delete this filter
460        del self.filters[filter_text]
461
462        # If no filters left, stop filtering.
463        if len(self.filters) == 0:
464            self.clear_filters()
465        else:
466            # Erase existing filtered lines.
467            self._restart_filtering()
468
469    def clear_filters(self):
470        if not self.filtering_on:
471            return
472        self.clear_search()
473        self.filtering_on = False
474        self.filters: collections.OrderedDict[
475            str, re.Pattern
476        ] = collections.OrderedDict()
477        self.filtered_logs.clear()
478        # Reset scrollback start
479        self._scrollback_start_index = 0
480        if not self.follow:
481            self.toggle_follow()
482
483    async def count_search_matches(self):
484        """Count search matches and save their locations."""
485        # Wait for any filter_existing_logs_task to finish.
486        if self.filtering_on and self.filter_existing_logs_task:
487            await self.filter_existing_logs_task
488
489        starting_index = self.get_last_log_index()
490        ending_index, logs = self._get_log_lines()
491
492        # From the end of the log store to the beginning.
493        for i in range(starting_index, ending_index - 1, -1):
494            # Is this log a match?
495            if self.search_filter.matches(logs[i]):
496                self.save_search_matched_line(i)
497            # Pause every 100 lines or so
498            if i % 100 == 0:
499                await asyncio.sleep(0.1)
500
501    async def filter_past_logs(self):
502        """Filter past log lines."""
503        starting_index = self.log_store.get_last_log_index()
504        ending_index = -1
505
506        # From the end of the log store to the beginning.
507        for i in range(starting_index, ending_index, -1):
508            # Is this log a match?
509            if self.filter_scan(self.log_store.logs[i]):
510                # Add to the beginning of the deque.
511                self.filtered_logs.appendleft(self.log_store.logs[i])
512            # TODO(tonymd): Tune these values.
513            # Pause every 100 lines or so
514            if i % 100 == 0:
515                await asyncio.sleep(0.1)
516
    def set_log_pane(self, log_pane: LogPane):
        """Set the parent LogPane instance.

        Args:
            log_pane: The LogPane that now owns this view.
        """
        self.log_pane = log_pane
520
    def _update_log_index(self) -> ScreenLine:
        """Sync ``log_index`` with the screen line under the cursor.

        Returns:
            The ScreenLine at the cursor position.
        """
        line_at_cursor = self.log_screen.get_line_at_cursor_position()
        # Leave log_index untouched when the line carries no log index.
        if line_at_cursor.log_index is not None:
            self.log_index = line_at_cursor.log_index
        return line_at_cursor
526
    def get_current_line(self) -> int:
        """Return the currently selected log event index.

        This is the value of the ``log_index`` property.
        """
        return self.log_index
530
531    def get_total_count(self):
532        """Total size of the logs store."""
533        return (
534            len(self.filtered_logs)
535            if self.filtering_on
536            else self.log_store.get_total_count()
537        )
538
539    def get_last_log_index(self):
540        total = self.get_total_count()
541        return 0 if total < 0 else total - 1
542
    def clear_scrollback(self):
        """Hide the log lines up to the currently selected log index.

        Follow mode is turned on first (which scrolls to the bottom), then
        the scrollback start is set to ``log_index`` and the log screen is
        flagged for a reset on the next render.
        """
        # Enable follow and scroll to the bottom, then clear.
        if not self.follow:
            self.toggle_follow()
        self._scrollback_start_index = self.log_index
        self._reset_log_screen_on_next_render = True
550
551    def hidden_line_count(self):
552        """Return the number of hidden lines."""
553        if self._scrollback_start_index > 0:
554            return self._scrollback_start_index + 1
555        return 0
556
    def undo_clear_scrollback(self):
        """Reset the current scrollback start index.

        With the start index back at 0, ``hidden_line_count()`` returns 0.
        """
        self._scrollback_start_index = 0
560
561    def wrap_lines_enabled(self):
562        """Get the parent log pane wrap lines setting."""
563        if not self.log_pane:
564            return False
565        return self.log_pane.wrap_lines
566
567    def toggle_follow(self):
568        """Toggle auto line following."""
569        self.follow = not self.follow
570        if self.follow:
571            # Disable search match follow mode.
572            self.follow_search_match = False
573            self.scroll_to_bottom()
574
575    def filter_scan(self, log: LogLine):
576        filter_match_count = 0
577        for _filter_text, log_filter in self.filters.items():
578            if log_filter.matches(log):
579                filter_match_count += 1
580            else:
581                break
582
583        if filter_match_count == len(self.filters):
584            return True
585        return False
586
587    def new_logs_arrived(self):
588        """Check newly arrived log messages.
589
590        Depending on where log statements occur ``new_logs_arrived`` may be in a
591        separate thread since it is triggerd by the Python log handler
592        ``emit()`` function. In this case the log handler is the LogStore
593        instance ``self.log_store``. This function should not redraw the screen
594        or scroll.
595        """
596        latest_total = self.log_store.get_total_count()
597
598        if self.filtering_on:
599            # Scan newly arived log lines
600            for i in range(self._last_log_store_index, latest_total):
601                if self.filter_scan(self.log_store.logs[i]):
602                    self.filtered_logs.append(self.log_store.logs[i])
603
604        if self.search_filter:
605            last_matched_log: int | None = None
606            # Scan newly arived log lines
607            for i in range(self._last_log_store_index, latest_total):
608                if self.search_filter.matches(self.log_store.logs[i]):
609                    self.save_search_matched_line(i)
610                    last_matched_log = i
611            if last_matched_log and self.follow_search_match:
612                # Set the follow event flag for the next render_content call.
613                self.follow_event = FollowEvent.SEARCH_MATCH
614                self.last_search_matched_log = last_matched_log
615
616        self._last_log_store_index = latest_total
617        self._new_logs_since_last_render = True
618        self._new_logs_since_last_websocket_serve = True
619
620        if self.follow:
621            # Set the follow event flag for the next render_content call.
622            self.follow_event = FollowEvent.STICKY_FOLLOW
623
624        if self.websocket_running:
625            # No terminal screen redraws are required.
626            return
627
628        # Trigger a UI update if the log window is visible.
629        if self.log_pane.show_pane:
630            self.log_pane.application.logs_redraw()
631
    def get_cursor_position(self) -> Point:
        """Return the position of the cursor.

        The x coordinate is always 0; y is the log screen's cursor position.
        """
        return Point(0, self.log_screen.cursor_position)
635
636    def scroll_to_top(self):
637        """Move selected index to the beginning."""
638        # Stop following so cursor doesn't jump back down to the bottom.
639        self.follow = False
640        # First possible log index that should be displayed
641        log_beginning_index = self.hidden_line_count()
642        self.log_index = log_beginning_index
643        self.log_screen.reset_logs(log_index=self.log_index)
644        self.log_screen.shift_selected_log_to_top()
645        self._user_scroll_event = True
646
    def move_selected_line_to_top(self):
        """Redraw the screen with the selected log line at the top.

        Follow mode is turned off.
        """
        self.follow = False

        # Update selected line
        self._update_log_index()

        self.log_screen.reset_logs(log_index=self.log_index)
        self.log_screen.shift_selected_log_to_top()
        self._user_scroll_event = True
656
    def center_log_line(self):
        """Redraw the screen with the selected log line centered.

        Follow mode is turned off.
        """
        self.follow = False

        # Update selected line
        self._update_log_index()

        self.log_screen.reset_logs(log_index=self.log_index)
        self.log_screen.shift_selected_log_to_center()
        self._user_scroll_event = True
666
    def scroll_to_bottom(self, with_sticky_follow: bool = True):
        """Move selected index to the end.

        Args:
            with_sticky_follow: When True, follow mode is turned on as well.
                When False, the follow state is left unchanged.
        """
        # Don't change following state like scroll_to_top.
        # get_last_log_index() is -1 when there are no logs; clamp to 0.
        self.log_index = max(0, self.get_last_log_index())
        self.log_screen.reset_logs(log_index=self.log_index)

        # Sticky follow mode
        if with_sticky_follow:
            self.follow = True
        self._user_scroll_event = True
677
678    def scroll(self, lines) -> None:
679        """Scroll up or down by plus or minus lines.
680
681        This method is only called by user keybindings.
682        """
683        # If the user starts scrolling, stop auto following.
684        self.follow = False
685
686        self.log_screen.scroll_subline(lines)
687        self._user_scroll_event = True
688
689        # Update the current log
690        current_line = self._update_log_index()
691
692        # Don't check for sticky follow mode if selecting lines.
693        if self.visual_select_mode:
694            return
695        # Is the last log line selected?
696        if self.log_index == self.get_last_log_index():
697            # Is the last line of the current log selected?
698            if current_line.subline + 1 == current_line.height:
699                # Sticky follow mode
700                self.follow = True
701
702    def visual_selected_log_count(self) -> int:
703        if self.marked_logs_start is None or self.marked_logs_end is None:
704            return 0
705        return (self.marked_logs_end - self.marked_logs_start) + 1
706
707    def clear_visual_selection(self) -> None:
708        self.marked_logs_start = None
709        self.marked_logs_end = None
710        self.visual_select_mode = False
711        self._user_scroll_event = True
712        self.log_pane.application.redraw_ui()
713
714    def visual_select_all(self) -> None:
715        self.marked_logs_start = self._scrollback_start_index
716        self.marked_logs_end = self.get_total_count() - 1
717
718        self.visual_select_mode = True
719        self._user_scroll_event = True
720        self.log_pane.application.redraw_ui()
721
    def visual_select_up(self) -> None:
        """Extend the visual selection one line upwards.

        The line under the cursor is selected, the cursor is moved up by one
        and the line it lands on is selected as well.
        """
        # Select the current line
        self.visual_select_line(self.get_cursor_position(), autoscroll=False)
        # Move the cursor by 1
        self.scroll_up(1)
        # Select the new line
        self.visual_select_line(self.get_cursor_position(), autoscroll=False)
729
    def visual_select_down(self) -> None:
        """Extend the visual selection one line downwards.

        The line under the cursor is selected, the cursor is moved down by
        one and the line it lands on is selected as well.
        """
        # Select the current line
        self.visual_select_line(self.get_cursor_position(), autoscroll=False)
        # Move the cursor by 1
        self.scroll_down(1)
        # Select the new line
        self.visual_select_line(self.get_cursor_position(), autoscroll=False)
737
    def visual_select_line(
        self, mouse_position: Point, autoscroll: bool = True
    ) -> None:
        """Mark the log under mouse_position as visually selected.

        The selection range is grown to include the log; it is never shrunk
        here.

        Args:
            mouse_position: Screen position; only ``y`` is used, as an index
                into the log screen's line buffer.
            autoscroll: When True, scroll by one line if the position is on
                the first or last row of the window.
        """
        # Check mouse_position is valid
        if not 0 <= mouse_position.y < len(self.log_screen.line_buffer):
            return
        # Update mode flags
        self.visual_select_mode = True
        self.follow = False
        # Get the ScreenLine for the cursor position
        screen_line = self.log_screen.line_buffer[mouse_position.y]
        # Screen lines without a log index cannot be selected.
        if screen_line.log_index is None:
            return

        # First selected line: start and end both begin at this log.
        if self.marked_logs_start is None:
            self.marked_logs_start = screen_line.log_index
        if self.marked_logs_end is None:
            self.marked_logs_end = screen_line.log_index

        # Extend whichever end of the range the log falls outside of.
        if screen_line.log_index < self.marked_logs_start:
            self.marked_logs_start = screen_line.log_index
        elif screen_line.log_index > self.marked_logs_end:
            self.marked_logs_end = screen_line.log_index

        # Update cursor position
        self.log_screen.move_cursor_to_position(mouse_position.y)

        # Autoscroll when mouse dragging on the top or bottom of the window.
        if autoscroll:
            if mouse_position.y == 0:
                self.scroll_up(1)
            elif mouse_position.y == self._window_height - 1:
                self.scroll_down(1)

        # Trigger a rerender.
        self._user_scroll_event = True
        self.log_pane.application.redraw_ui()
776
    def scroll_to_position(self, mouse_position: Point):
        """Set the selected log line to the mouse_position.

        Args:
            mouse_position: Screen position; only ``y`` is used.
        """
        # Disable follow mode when the user clicks or mouse drags on a log line.
        self.follow = False

        self.log_screen.move_cursor_to_position(mouse_position.y)
        # Pick up the log index of the line now under the cursor.
        self._update_log_index()

        self._user_scroll_event = True
786
787    def scroll_up_one_page(self):
788        """Move the selected log index up by one window height."""
789        lines = 1
790        if self._window_height > 0:
791            lines = self._window_height
792        self.scroll(-1 * lines)
793
794    def scroll_down_one_page(self):
795        """Move the selected log index down by one window height."""
796        lines = 1
797        if self._window_height > 0:
798            lines = self._window_height
799        self.scroll(lines)
800
    def scroll_down(self, lines=1):
        """Move the selected log index down by one or more lines.

        Args:
            lines: Number of lines to scroll down by.
        """
        self.scroll(lines)
804
    def scroll_up(self, lines=1):
        """Move the selected log index up by one or more lines.

        Args:
            lines: Number of lines to scroll up by.
        """
        # scroll() takes a signed offset; negative values scroll up.
        self.scroll(-1 * lines)
808
809    def log_start_end_indexes_changed(self) -> bool:
810        return (
811            self._last_start_index != self._current_start_index
812            or self._last_end_index != self._current_end_index
813        )
814
    def render_table_header(self):
        """Get pre-formatted table header.

        The header is rendered by the log store.
        """
        return self.log_store.render_table_header()
818
    def get_web_socket_url(self):
        """Return a 127.0.0.1:3000 URL carrying the websocket port.

        The port is passed in the URL fragment as ``ws=<port>``.
        """
        # NOTE(review): host and port 3000 are hard-coded; presumably this is
        # where the web log viewer is served - confirm.
        return f'http://127.0.0.1:3000/#ws={self.websocket_port}'
821
822    def render_content(self) -> list:
823        """Return logs to display on screen as a list of FormattedText tuples.
824
825        This function determines when the log screen requires re-rendeing based
826        on user scroll events, follow mode being on, or log pane being
827        empty. The FormattedText tuples passed to prompt_toolkit are cached if
828        no updates are required.
829        """
830        screen_update_needed = False
831
832        # Disable rendering if user is viewing logs on web
833        if self.websocket_running:
834            return []
835
836        # Check window size
837        if self.log_pane.pane_resized():
838            self._window_width = self.log_pane.current_log_pane_width
839            self._window_height = self.log_pane.current_log_pane_height
840            self.log_screen.resize(self._window_width, self._window_height)
841            self._reset_log_screen_on_next_render = True
842
843        if self.follow_event is not None:
844            if (
845                self.follow_event == FollowEvent.SEARCH_MATCH
846                and self.last_search_matched_log
847            ):
848                self.log_index = self.last_search_matched_log
849                self.last_search_matched_log = None
850                self._reset_log_screen_on_next_render = True
851
852            elif self.follow_event == FollowEvent.STICKY_FOLLOW:
853                # Jump to the last log message
854                self.log_index = max(0, self.get_last_log_index())
855
856            self.follow_event = None
857            screen_update_needed = True
858
859        if self._reset_log_screen_on_next_render or self.log_screen.empty():
860            # Clear the reset flag.
861            self._reset_log_screen_on_next_render = False
862            self.log_screen.reset_logs(log_index=self.log_index)
863            screen_update_needed = True
864
865        elif self.follow and self._new_logs_since_last_render:
866            # Follow mode is on so add new logs to the screen
867            self._new_logs_since_last_render = False
868
869            current_log_index = self.log_index
870            last_rendered_log_index = self.log_screen.last_appended_log_index
871            # If so many logs have arrived than can fit on the screen, redraw
872            # the whole screen from the new position.
873            if (
874                current_log_index - last_rendered_log_index
875            ) > self.log_screen.height:
876                self.log_screen.reset_logs(log_index=self.log_index)
877            # A small amount of logs have arrived, append them one at a time
878            # without redrawing the whole screen.
879            else:
880                for i in range(
881                    last_rendered_log_index + 1, current_log_index + 1
882                ):
883                    self.log_screen.append_log(i)
884
885            screen_update_needed = True
886
887        if self.follow:
888            # Select the last line for follow mode.
889            self.log_screen.move_cursor_to_bottom()
890            screen_update_needed = True
891
892        if self._user_scroll_event:
893            self._user_scroll_event = False
894            screen_update_needed = True
895
896        if screen_update_needed:
897            self._line_fragment_cache = self.log_screen.get_lines(
898                marked_logs_start=self.marked_logs_start,
899                marked_logs_end=self.marked_logs_end,
900            )
901        return self._line_fragment_cache
902
903    def _logs_to_text(
904        self,
905        use_table_formatting: bool = True,
906        selected_lines_only: bool = False,
907    ) -> str:
908        """Convert all or selected log messages to plaintext."""
909
910        def get_table_string(log: LogLine) -> str:
911            return remove_formatting(self.log_store.table.formatted_row(log))
912
913        formatter: Callable[[LogLine], str] = operator.attrgetter(
914            'ansi_stripped_log'
915        )
916        if use_table_formatting:
917            formatter = get_table_string
918
919        _start_log_index, log_source = self._get_log_lines()
920
921        log_index_range = range(
922            self._scrollback_start_index, self.get_total_count()
923        )
924        if (
925            selected_lines_only
926            and self.marked_logs_start is not None
927            and self.marked_logs_end is not None
928        ):
929            log_index_range = range(
930                self.marked_logs_start, self.marked_logs_end + 1
931            )
932
933        text_output = ''
934        for i in log_index_range:
935            log_text = formatter(log_source[i])
936            text_output += log_text
937            if not log_text.endswith('\n'):
938                text_output += '\n'
939
940        return text_output
941
942    def export_logs(
943        self,
944        use_table_formatting: bool = True,
945        selected_lines_only: bool = False,
946        file_name: str | None = None,
947        to_clipboard: bool = False,
948        add_markdown_fence: bool = False,
949    ) -> bool:
950        """Export log lines to file or clipboard."""
951        text_output = self._logs_to_text(
952            use_table_formatting, selected_lines_only
953        )
954
955        if file_name:
956            target_path = Path(file_name).expanduser()
957            with target_path.open('w') as output_file:
958                output_file.write(text_output)
959            _LOG.debug('Saved to file: %s', file_name)
960
961        elif to_clipboard:
962            if add_markdown_fence:
963                text_output = '```\n' + text_output + '```\n'
964            self.log_pane.application.set_system_clipboard(text_output)
965            _LOG.debug('Copied logs to clipboard.')
966
967        return True
968