• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2022 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""LogScreen tracks lines to display on screen with a set of ScreenLines."""
15
16from __future__ import annotations
17import collections
18import dataclasses
19import logging
20from typing import Callable, List, Optional, Tuple, TYPE_CHECKING
21
22from prompt_toolkit.formatted_text import (
23    to_formatted_text,
24    StyleAndTextTuples,
25)
26
27from pw_console.log_filter import LogFilter
28from pw_console.text_formatting import (
29    fill_character_width,
30    insert_linebreaks,
31    split_lines,
32)
33
34if TYPE_CHECKING:
35    from pw_console.log_line import LogLine
36    from pw_console.log_pane import LogPane
37
38_LOG = logging.getLogger(__package__)
39
40
41@dataclasses.dataclass
42class ScreenLine:
43    """A single line of text for displaying on screen.
44
45    Instances of ScreenLine are stored in a LogScreen's line_buffer deque. When
46    a new log message is added it may be converted into multiple ScreenLine
47    instances if the text is wrapped across multiple lines.
48
49    For example: say our screen is 80 characters wide and a log message 240
50    characters long needs to be displayed. With line wrapping on we will need
51    240/80 = 3 lines to show the full message. Say also that this single log
52    message is at index #5 in the LogStore classes deque, this is the log_index
53    value. This single log message will then be split into 3 separate
54    ScreenLine instances:
55
56    ::
57        ScreenLine(fragments=[('', 'Log message text line one')],
58                   log_index=5, subline=0, height=3)
59        ScreenLine(fragments=[('', 'Log message text line two')],
60                   log_index=5, subline=1, height=3)
61        ScreenLine(fragments=[('', 'Log message text line three')],
62                   log_index=5, subline=2, height=3)
63
64    Each `fragments` attribute will store the formatted text indended to be
65    direcly drawn to the screen. Since these three lines are all displaying the
66    same log message their `log_index` reference will be the same. The `subline`
67    attribute is the zero-indexed number for this log's wrapped line count and
68    `height` is the total ScreenLines needed to show this log message.
69
70    Continuing with this example say the next two log messages to display both
71    fit on screen with no wrapping. They will both be represented with one
72    ScreenLine each:
73
74    ::
75        ScreenLine(fragments=[('', 'Another log message')],
76                   log_index=6, subline=0, height=1)
77        ScreenLine(fragments=[('', 'Yet another log message')],
78                   log_index=7, subline=0, height=1)
79
80    The `log_index` is different for each since these are both separate
81    logs. The subline is 0 since each line is the first one for this log. Both
82    have a height of 1 since no line wrapping was performed.
83    """
84    # The StyleAndTextTuples for this line ending with a '\n'. These are the raw
85    # prompt_toolkit formatted text tuples to display on screen. The colors and
86    # spacing can change depending on the formatters used in the
87    # LogScreen._get_fragments_per_line() function.
88    fragments: StyleAndTextTuples
89
90    # Log index reference for this screen line. This is the index to where the
91    # log message resides in the parent LogStore.logs deque. It is set to None
92    # if this is an empty ScreenLine. If a log message requires line wrapping
93    # then each resulting ScreenLine instance will have the same log_index
94    # value.
95    #
96    # This log_index may also be the integer index into a LogView.filtered_logs
97    # deque depending on if log messages are being filtered by the user. The
98    # LogScreen class below doesn't need to do anything different in either
99    # case. It's the responsibility of LogScreen.get_log_source() to return the
100    # correct source.
101    #
102    # Note this is NOT an index into LogScreen.line_buffer.
103    log_index: Optional[int] = None
104
105    # Keep track the total height and subline number for this log message.
106    # For example this line could be subline (0, 1, or 2) of a log message with
107    # a total height 3.
108
109    # Subline index.
110    subline: int = 0
111    # Total height in lines of text (also ScreenLine count) that the log message
112    # referred to by log_index requires. When a log message is split across
113    # multiple lines height will be set to the same value for each ScreenLine
114    # instance.
115    height: int = 1
116
117    # Empty lines will have no log_index
118    def empty(self) -> bool:
119        return self.log_index is None
120
121
122@dataclasses.dataclass
123class LogScreen:
124    """LogScreen maintains the state of visible logs on screen.
125
126    It is responsible for moving the cursor_position, prepending and appending
127    log lines as the user moves the cursor."""
128    # Callable functions to retrieve logs and display formatting.
129    get_log_source: Callable[[], Tuple[int, collections.deque[LogLine]]]
130    get_line_wrapping: Callable[[], bool]
131    get_log_formatter: Callable[[], Optional[Callable[[LogLine],
132                                                      StyleAndTextTuples]]]
133    get_search_filter: Callable[[], Optional[LogFilter]]
134    get_search_highlight: Callable[[], bool]
135
136    # Window row of the current cursor position
137    cursor_position: int = 0
138    # Screen width and height in number of characters.
139    width: int = 0
140    height: int = 0
141    # Buffer of literal text lines to be displayed on screen. Each visual line
142    # is represented by a ScreenLine instance and will have a max width equal
143    # to the screen's width. If any given whole log message requires line
144    # wrapping to be displayed it will be represented by multiple ScreenLine
145    # instances in this deque.
146    line_buffer: collections.deque[ScreenLine] = dataclasses.field(
147        default_factory=collections.deque)
148
149    def __post_init__(self) -> None:
150        # Empty screen flag. Will be true if the screen contains only newlines.
151        self._empty: bool = True
152        # Save the last log index when appending. Useful for tracking how many
153        # new lines need appending in follow mode.
154        self.last_appended_log_index: int = 0
155
156    def _fill_top_with_empty_lines(self) -> None:
157        """Add empty lines to fill the remaining empty screen space."""
158        for _ in range(self.height - len(self.line_buffer)):
159            self.line_buffer.appendleft(ScreenLine([('', '')]))
160
161    def clear_screen(self) -> None:
162        """Erase all lines and fill with empty lines."""
163        self.line_buffer.clear()
164        self._fill_top_with_empty_lines()
165        self._empty = True
166
167    def empty(self) -> bool:
168        """Return True if the screen has no lines with content."""
169        return self._empty
170
171    def reset_logs(
172        self,
173        log_index: int = 0,
174    ) -> None:
175        """Erase the screen and append logs starting from log_index."""
176        self.clear_screen()
177
178        start_log_index, log_source = self.get_log_source()
179        if len(log_source) == 0:
180            return
181
182        # Append at most at most the window height number worth of logs. If the
183        # number of available logs is less, use that amount.
184        max_log_messages_to_fetch = min(self.height, len(log_source))
185
186        # Including the target log_index, fetch the desired logs.
187        # For example if we are rendering log_index 10 and the window height is
188        # 6 the range below will be:
189        # >>> list(i for i in range((10 - 6) + 1, 10 + 1))
190        # [5, 6, 7, 8, 9, 10]
191        for i in range((log_index - max_log_messages_to_fetch) + 1,
192                       log_index + 1):
193            # If i is < 0 it's an invalid log, skip to the next line. The next
194            # index could be 0 or higher since we are traversing in increasing
195            # order.
196            if i < start_log_index:
197                continue
198            self.append_log(i)
199        # Make sure the bottom line is highlighted.
200        self.move_cursor_to_bottom()
201
202    def resize(self, width, height) -> None:
203        """Update screen width and height.
204
205        Following a resize the caller should run reset_logs()."""
206        self.width = width
207        self.height = height
208
209    def get_lines(
210        self,
211        marked_logs_start: Optional[int] = None,
212        marked_logs_end: Optional[int] = None,
213    ) -> List[StyleAndTextTuples]:
214        """Return lines for final display.
215
216        Styling is added for the line under the cursor."""
217        if not marked_logs_start:
218            marked_logs_start = -1
219        if not marked_logs_end:
220            marked_logs_end = -1
221
222        all_lines: List[StyleAndTextTuples] = []
223        # Loop through a copy of the line_buffer in case it is mutated before
224        # this function is complete.
225        for i, line in enumerate(list(self.line_buffer)):
226
227            # Is this line the cursor_position? Apply line highlighting
228            if (i == self.cursor_position
229                    and (self.cursor_position < len(self.line_buffer))
230                    and not self.line_buffer[self.cursor_position].empty()):
231                # Fill in empty charaters to the width of the screen. This
232                # ensures the backgound is highlighted to the edge of the
233                # screen.
234                new_fragments = fill_character_width(
235                    line.fragments,
236                    len(line.fragments) - 1,  # -1 for the ending line break
237                    self.width,
238                )
239
240                # Apply a style to highlight this line.
241                all_lines.append(
242                    to_formatted_text(new_fragments,
243                                      style='class:selected-log-line'))
244            elif line.log_index is not None and (
245                    marked_logs_start <= line.log_index <= marked_logs_end):
246                new_fragments = fill_character_width(
247                    line.fragments,
248                    len(line.fragments) - 1,  # -1 for the ending line break
249                    self.width,
250                )
251
252                # Apply a style to highlight this line.
253                all_lines.append(
254                    to_formatted_text(new_fragments,
255                                      style='class:marked-log-line'))
256
257            else:
258                all_lines.append(line.fragments)
259
260        return all_lines
261
262    def _prepend_line(self, line: ScreenLine) -> None:
263        """Add a line to the top of the screen."""
264        self.line_buffer.appendleft(line)
265        self._empty = False
266
267    def _append_line(self, line: ScreenLine) -> None:
268        """Add a line to the bottom of the screen."""
269        self.line_buffer.append(line)
270        self._empty = False
271
272    def _trim_top_lines(self) -> None:
273        """Remove lines from the top if larger than the screen height."""
274        overflow_amount = len(self.line_buffer) - self.height
275        for _ in range(overflow_amount):
276            self.line_buffer.popleft()
277
278    def _trim_bottom_lines(self) -> None:
279        """Remove lines from the bottom if larger than the screen height."""
280        overflow_amount = len(self.line_buffer) - self.height
281        for _ in range(overflow_amount):
282            self.line_buffer.pop()
283
284    def move_cursor_up(self, line_count: int) -> int:
285        """Move the cursor up as far as it can go without fetching new lines.
286
287        Args:
288            line_count: A negative number of lines to move the cursor by.
289
290        Returns:
291            int: The remaining line count that was not moved. This is the number
292            of new lines that need to be fetched and prepended to the screen
293            line buffer."""
294        remaining_lines = line_count
295
296        # Loop from a negative line_count value to zero.
297        # For example if line_count is -5 the loop will traverse:
298        # >>> list(i for i in range(-5, 0, 1))
299        # [-5, -4, -3, -2, -1]
300        for _ in range(line_count, 0, 1):
301            new_index = self.cursor_position - 1
302            if new_index < 0:
303                break
304            if (new_index < len(self.line_buffer)
305                    and self.line_buffer[new_index].empty()):
306                # The next line is empty and has no content.
307                break
308            self.cursor_position -= 1
309            remaining_lines += 1
310        return remaining_lines
311
312    def move_cursor_down(self, line_count: int) -> int:
313        """Move the cursor down as far as it can go without fetching new lines.
314
315        Args:
316            line_count: A positive number of lines to move the cursor down by.
317
318        Returns:
319            int: The remaining line count that was not moved. This is the number
320            of new lines that need to be fetched and appended to the screen line
321            buffer."""
322        remaining_lines = line_count
323        for _ in range(line_count):
324            new_index = self.cursor_position + 1
325            if new_index >= self.height:
326                break
327            if (new_index < len(self.line_buffer)
328                    and self.line_buffer[new_index].empty()):
329                # The next line is empty and has no content.
330                break
331            self.cursor_position += 1
332            remaining_lines -= 1
333        return remaining_lines
334
335    def move_cursor_to_bottom(self) -> None:
336        """Move the cursor to the bottom of the screen.
337
338        Only use this for movement not initiated by users. For example if new
339        logs were just added to the bottom of the screen in follow
340        mode. The LogScreen class does not allow scrolling beyond the bottom of
341        the content so the cursor will fall on a log message as long as there
342        are some log messages. If there are no log messages the line is not
343        highlighted by get_lines()."""
344        self.cursor_position = self.height - 1
345
346    def move_cursor_to_position(self, window_row: int) -> None:
347        """Move the cursor to a line if there is a log message there."""
348        if window_row >= len(self.line_buffer):
349            return
350        if 0 <= window_row < self.height:
351            current_line = self.line_buffer[window_row]
352            if current_line.log_index is not None:
353                self.cursor_position = window_row
354
355    def _move_selection_to_log(self, log_index: int, subline: int) -> None:
356        """Move the cursor to the location of log_index."""
357        for i, line in enumerate(self.line_buffer):
358            if line.log_index == log_index and line.subline == subline:
359                self.cursor_position = i
360                return
361
362    def shift_selected_log_to_top(self) -> None:
363        """Shift the selected line to the top.
364
365        This moves the lines on screen and keeps the originally selected line
366        highlighted. Example use case: when jumping to a search match the
367        matched line will be shown at the top of the screen."""
368        if not 0 <= self.cursor_position < len(self.line_buffer):
369            return
370
371        current_line = self.line_buffer[self.cursor_position]
372        amount = max(self.cursor_position, current_line.height)
373        amount -= current_line.subline
374        remaining_lines = self.scroll_subline(amount)
375        if remaining_lines != 0 and current_line.log_index is not None:
376            # Restore original selected line.
377            self._move_selection_to_log(current_line.log_index,
378                                        current_line.subline)
379            return
380        # Lines scrolled as expected, set cursor_position to top.
381        self.cursor_position = 0
382
383    def shift_selected_log_to_center(self) -> None:
384        """Shift the selected line to the center.
385
386        This moves the lines on screen and keeps the originally selected line
387        highlighted. Example use case: when jumping to a search match the
388        matched line will be shown at the center of the screen."""
389        if not 0 <= self.cursor_position < len(self.line_buffer):
390            return
391
392        half_height = int(self.height / 2)
393        current_line = self.line_buffer[self.cursor_position]
394
395        amount = max(self.cursor_position - half_height, current_line.height)
396        amount -= current_line.subline
397
398        remaining_lines = self.scroll_subline(amount)
399        if remaining_lines != 0 and current_line.log_index is not None:
400            # Restore original selected line.
401            self._move_selection_to_log(current_line.log_index,
402                                        current_line.subline)
403            return
404
405        # Lines scrolled as expected, set cursor_position to center.
406        self.cursor_position -= amount
407        self.cursor_position -= (current_line.height - 1)
408
409    def scroll_subline(self, line_count: int = 1) -> int:
410        """Move the cursor down or up by positive or negative lines.
411
412        Args:
413            line_count: A positive or negative number of lines the cursor should
414                move. Positive for down, negative for up.
415
416        Returns:
417            int: The remaining line count that was not moved. This is the number
418            of new lines that could not be fetched in the case that the top or
419            bottom of available log message lines was reached."""
420        # Move self.cursor_position as far as it can go on screen without
421        # fetching new log message lines.
422        if line_count > 0:
423            remaining_lines = self.move_cursor_down(line_count)
424        else:
425            remaining_lines = self.move_cursor_up(line_count)
426
427        if remaining_lines == 0:
428            # No more lines needed, return
429            return remaining_lines
430
431        # Top or bottom of the screen was reached, fetch and add new log lines.
432        if remaining_lines < 0:
433            return self.fetch_subline_up(remaining_lines)
434        return self.fetch_subline_down(remaining_lines)
435
436    def fetch_subline_up(self, line_count: int = -1) -> int:
437        """Fetch new lines from the top in order of decreasing log_indexes.
438
439        Args:
440            line_count: A negative number of lines that should be fetched and
441                added to the top of the screen.
442
443        Returns:
444            int: The number of lines that were not fetched. Returns 0 if the
445                desired number of lines were fetched successfully."""
446        start_log_index, _log_source = self.get_log_source()
447        remaining_lines = line_count
448        for _ in range(line_count, 0, 1):
449            current_line = self.get_line_at_cursor_position()
450            if current_line.log_index is None:
451                return remaining_lines + 1
452
453            target_log_index: int
454            target_subline: int
455
456            # If the current subline is at the start of this log, fetch the
457            # previous log message's last subline.
458            if current_line.subline == 0:
459                target_log_index = current_line.log_index - 1
460                # Set -1 to signal fetching the previous log's last subline
461                target_subline = -1
462            else:
463                # Get previous sub line of current log
464                target_log_index = current_line.log_index
465                target_subline = current_line.subline - 1
466
467            if target_log_index < start_log_index:
468                # Invalid log_index, don't scroll further
469                return remaining_lines + 1
470
471            self.prepend_log(target_log_index, subline=target_subline)
472            remaining_lines += 1
473
474        return remaining_lines
475
476    def get_line_at_cursor_position(self) -> ScreenLine:
477        """Returns the ScreenLine under the cursor."""
478        if (self.cursor_position >= len(self.line_buffer)
479                or self.cursor_position < 0):
480            return ScreenLine([('', '')])
481        return self.line_buffer[self.cursor_position]
482
483    def fetch_subline_down(self, line_count: int = 1) -> int:
484        """Fetch new lines from the bottom in order of increasing log_indexes.
485
486        Args:
487            line_count: A positive number of lines that should be fetched and
488                added to the bottom of the screen.
489
490        Returns:
491            int: The number of lines that were not fetched. Returns 0 if the
492                desired number of lines were fetched successfully."""
493        _start_log_index, log_source = self.get_log_source()
494        remaining_lines = line_count
495        for _ in range(line_count):
496            # Skip this line if not at the bottom
497            if self.cursor_position < self.height - 1:
498                self.cursor_position += 1
499                continue
500
501            current_line = self.get_line_at_cursor_position()
502            if current_line.log_index is None:
503                return remaining_lines - 1
504
505            target_log_index: int
506            target_subline: int
507
508            # If the current subline is at the height of this log, fetch the
509            # next log message.
510            if current_line.subline == current_line.height - 1:
511                # Get next log's first subline
512                target_log_index = current_line.log_index + 1
513                target_subline = 0
514            else:
515                # Get next sub line of current log
516                target_log_index = current_line.log_index
517                target_subline = current_line.subline + 1
518
519            if target_log_index >= len(log_source):
520                # Invalid log_index, don't scroll further
521                return remaining_lines - 1
522
523            self.append_log(target_log_index, subline=target_subline)
524            remaining_lines -= 1
525
526        return remaining_lines
527
528    def first_rendered_log_index(self) -> Optional[int]:
529        """Scan the screen for the first valid log_index and return it."""
530        log_index = None
531        for i in range(self.height):
532            if i >= len(self.line_buffer):
533                break
534            if self.line_buffer[i].log_index is not None:
535                log_index = self.line_buffer[i].log_index
536                break
537        return log_index
538
539    def last_rendered_log_index(self) -> Optional[int]:
540        """Return the last log_index shown on screen."""
541        log_index = None
542        if len(self.line_buffer) == 0:
543            return None
544        if self.line_buffer[-1].log_index is not None:
545            log_index = self.line_buffer[-1].log_index
546        return log_index
547
548    def _get_fragments_per_line(self,
549                                log_index: int) -> List[StyleAndTextTuples]:
550        """Return a list of lines wrapped to the screen width for a log.
551
552        Before fetching the log message this function updates the log_source and
553        formatting options."""
554        _start_log_index, log_source = self.get_log_source()
555        if log_index >= len(log_source):
556            return []
557        log = log_source[log_index]
558        table_formatter = self.get_log_formatter()
559        truncate_lines = not self.get_line_wrapping()
560        search_filter = self.get_search_filter()
561        search_highlight = self.get_search_highlight()
562
563        # Select the log display formatter; table or standard.
564        fragments: StyleAndTextTuples = []
565        if table_formatter:
566            fragments = table_formatter(log)
567        else:
568            fragments = log.get_fragments()
569
570        # Apply search term highlighting.
571        if search_filter and search_highlight and search_filter.matches(log):
572            fragments = search_filter.highlight_search_matches(fragments)
573
574        # Word wrap the log message or truncate to screen width
575        line_fragments, _log_line_height = insert_linebreaks(
576            fragments,
577            max_line_width=self.width,
578            truncate_long_lines=truncate_lines)
579        # Convert the existing flattened fragments to a list of lines.
580        fragments_per_line = split_lines(line_fragments)
581
582        return fragments_per_line
583
584    def prepend_log(
585        self,
586        log_index: int,
587        subline: Optional[int] = None,
588    ) -> None:
589        """Add a log message or a single line to the top of the screen.
590
591        Args:
592            log_index: The index of the log message to fetch.
593            subline: The desired subline of the log message. When displayed on
594                screen the log message may take up more than one line. If
595                subline is 0 or higher that line will be added. If subline is -1
596                the last subline will be prepended regardless of the total log
597                message height.
598        """
599        fragments_per_line = self._get_fragments_per_line(log_index)
600
601        # Target the last subline if the subline arg is set to -1.
602        fetch_last_subline = (subline == -1)
603
604        for line_index, line in enumerate(fragments_per_line):
605            # If we are looking for a specific subline and this isn't it, skip.
606            if subline is not None:
607                # If subline is set to -1 we need to append the last subline of
608                # this log message. Skip this line if it isn't the last one.
609                if fetch_last_subline and (line_index !=
610                                           len(fragments_per_line) - 1):
611                    continue
612                # If subline is not -1 (0 or higher) and this isn't the desired
613                # line, skip to the next one.
614                if not fetch_last_subline and line_index != subline:
615                    continue
616
617            self._prepend_line(
618                ScreenLine(
619                    fragments=line,
620                    log_index=log_index,
621                    subline=line_index,
622                    height=len(fragments_per_line),
623                ))
624
625        # Remove lines from the bottom if over the screen height.
626        if len(self.line_buffer) > self.height:
627            self._trim_bottom_lines()
628
629    def append_log(
630        self,
631        log_index: int,
632        subline: Optional[int] = None,
633    ) -> None:
634        """Add a log message or a single line to the bottom of the screen."""
635        # Save this log_index
636        self.last_appended_log_index = log_index
637        fragments_per_line = self._get_fragments_per_line(log_index)
638
639        for line_index, line in enumerate(fragments_per_line):
640            # If we are looking for a specific subline and this isn't it, skip.
641            if subline is not None and line_index != subline:
642                continue
643
644            self._append_line(
645                ScreenLine(
646                    fragments=line,
647                    log_index=log_index,
648                    subline=line_index,
649                    height=len(fragments_per_line),
650                ))
651
652        # Remove lines from the top if over the screen height.
653        if len(self.line_buffer) > self.height:
654            self._trim_top_lines()
655