• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2021 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14"""LogStore saves logs and acts as a Python logging handler."""
15
16from __future__ import annotations
17import collections
18import logging
19from datetime import datetime
20from typing import Dict, List, Optional, TYPE_CHECKING
21
22from pw_cli.color import colors as pw_cli_colors
23
24from pw_console.console_prefs import ConsolePrefs
25from pw_console.log_line import LogLine
26from pw_console.text_formatting import strip_ansi
27from pw_console.widgets.table import TableView
28
29if TYPE_CHECKING:
30    from pw_console.log_view import LogView
31
32
class LogStore(logging.Handler):
    """Pigweed Console logging handler.

    This is a `Python logging.Handler
    <https://docs.python.org/3/library/logging.html#handler-objects>`_ class
    used to store logs for display in the pw_console user interface.

    You may optionally add this as a handler to existing logger instances.
    This will be required if logs need to be captured for display in the
    pw_console UI before the user interface is running.

    Example usage:

    .. code-block:: python

        import logging

        from pw_console import PwConsoleEmbed, LogStore

        _DEVICE_LOG = logging.getLogger('usb_gadget')

        # Create a log store and add as a handler.
        device_log_store = LogStore()
        _DEVICE_LOG.addHandler(device_log_store)

        # Start communication with your device here, before embedding
        # pw_console.

        # Create the pw_console embed instance
        console = PwConsoleEmbed(
            global_vars=globals(),
            local_vars=locals(),
            loggers={
                'Host Logs': [
                    logging.getLogger(__package__),
                    logging.getLogger(__name__),
                ],
                # Set the LogStore as the value of this logger window.
                'Device Logs': device_log_store,
            },
            app_title='My Awesome Console',
        )

        console.setup_python_logging()
        console.embed()
    """

    def __init__(self, prefs: Optional[ConsolePrefs] = None):
        """Initializes the LogStore instance.

        Args:
            prefs: Console preferences shared with the table view. When
                omitted, a ConsolePrefs that loads no project or user files
                is created.
        """

        # ConsolePrefs may not be passed on init. For example, if the user is
        # creating a LogStore to capture log messages before console startup.
        if not prefs:
            prefs = ConsolePrefs(
                project_file=False, project_user_file=False, user_file=False
            )
        self.prefs = prefs
        # Log storage deque for fast addition and deletion from the beginning
        # and end of the iterable.
        self.logs: collections.deque = collections.deque()

        # Only allow this many log lines in memory.
        # NOTE(review): nothing in this class trims self.logs to this limit;
        # confirm whether it is enforced elsewhere.
        self.max_history_size: int = 1000000

        # Counts of logs per python logger name
        self.channel_counts: Dict[str, int] = {}
        # Widths of each logger prefix string. For example: the character length
        # of the timestamp string.
        self.channel_formatted_prefix_widths: Dict[str, int] = {}
        # Longest of the above prefix widths.
        self.longest_channel_prefix_width = 0

        self.table: TableView = TableView(prefs=self.prefs)

        # Erase existing logs.
        self.clear_logs()

        # List of viewers that should be notified on new log line arrival.
        self.registered_viewers: List['LogView'] = []

        super().__init__()

        # Set formatting after logging.Handler init.
        self.set_formatting()

    def set_prefs(self, prefs: ConsolePrefs) -> None:
        """Set the ConsolePrefs for this LogStore and its table view."""
        self.prefs = prefs
        self.table.set_prefs(prefs)

    def register_viewer(self, viewer: 'LogView') -> None:
        """Register a LogView to be notified when new logs arrive."""
        self.registered_viewers.append(viewer)

    def set_formatting(self) -> None:
        """Setup log formatting.

        Installs a formatter with a colored timestamp prefix, sets the handler
        level to DEBUG and records the timestamp width in the table's 'time'
        column.
        """
        # Copy of pw_cli log formatter
        colors = pw_cli_colors(True)
        timestamp_prefix = colors.black_on_white('%(asctime)s') + ' '
        timestamp_format = '%Y%m%d %H:%M:%S'
        format_string = timestamp_prefix + '%(levelname)s %(message)s'
        formatter = logging.Formatter(format_string, timestamp_format)

        self.setLevel(logging.DEBUG)
        self.setFormatter(formatter)

        # Update log time character width.
        example_time_string = datetime.now().strftime(timestamp_format)
        self.table.column_widths['time'] = len(example_time_string)

    def clear_logs(self) -> None:
        """Erase all stored log lines and per-channel bookkeeping."""
        self.logs = collections.deque()
        self.channel_counts = {}
        self.channel_formatted_prefix_widths = {}
        # The longest width is derived from the per-channel widths erased
        # above, so reset it too rather than leaving a stale value behind.
        self.longest_channel_prefix_width = 0
        self.line_index = 0

    def get_channel_counts(self) -> str:
        """Return the seen channel log counts for this container.

        Returns:
            A comma separated string of 'name: count' pairs, one per logger
            name seen so far. Empty if no logs have been stored.
        """
        return ', '.join(
            f'{name}: {count}' for name, count in self.channel_counts.items()
        )

    def get_total_count(self) -> int:
        """Total size of the logs store."""
        return len(self.logs)

    def get_last_log_index(self) -> int:
        """Last valid index of the logs, or 0 if the store is empty."""
        # Subtract 1 since self.logs is zero indexed. Clamp at 0 so an empty
        # store does not report -1.
        return max(0, self.get_total_count() - 1)

    def _update_log_prefix_width(self, record: logging.LogRecord) -> None:
        """Save the formatted prefix width if this is a new logger channel
        name.

        The prefix is the handler's format string with the message removed;
        its width is measured after stripping ANSI escape sequences.
        """
        if (
            not self.formatter
            or record.name in self.channel_formatted_prefix_widths
        ):
            return

        # Find the width of the formatted timestamp and level
        format_string = self.formatter._fmt  # pylint: disable=protected-access

        # There may not be a _fmt defined.
        if not format_string:
            return

        format_without_message = format_string.replace('%(message)s', '')

        prefix_width = 0
        # If any other style parameters are left, get the width of them.
        if (
            'asctime' in format_without_message
            and 'levelname' in format_without_message
        ):
            # record.asctime only exists once a Formatter has processed the
            # record; compute it here if that has not happened yet.
            asctime = getattr(record, 'asctime', None)
            if asctime is None:
                asctime = self.formatter.formatTime(
                    record, self.formatter.datefmt
                )
            try:
                formatted_time_and_level = format_without_message % {
                    'asctime': asctime,
                    'levelname': record.levelname,
                }
            except KeyError:
                # The format uses fields beyond asctime and levelname; the
                # prefix can't be measured here so leave the width at 0.
                pass
            else:
                # Delete ANSI escape sequences before measuring.
                prefix_width = len(strip_ansi(formatted_time_and_level))

        self.channel_formatted_prefix_widths[record.name] = prefix_width

        # Set the max width of all known formats so far.
        self.longest_channel_prefix_width = max(
            self.channel_formatted_prefix_widths.values()
        )

    def _append_log(self, record: logging.LogRecord) -> None:
        """Add a new log event.

        Formats the record, stores it as a LogLine, bumps the per-logger
        count and lets the table widen its metadata columns if needed.
        """
        # Format incoming log line.
        formatted_log = self.format(record)
        ansi_stripped_log = strip_ansi(formatted_log)
        # Save this log.
        log_line = LogLine(
            record=record,
            formatted_log=formatted_log,
            ansi_stripped_log=ansi_stripped_log,
        )
        self.logs.append(log_line)
        # Increment this logger count
        self.channel_counts[record.name] = (
            self.channel_counts.get(record.name, 0) + 1
        )

        # TODO(b/235271486): Revisit calculating prefix widths automatically
        # when line wrapping indentation is supported.
        # Set the prefix width to 0
        self.channel_formatted_prefix_widths[record.name] = 0

        # Parse metadata fields
        log_line.update_metadata()

        # Check for bigger column widths.
        self.table.update_metadata_column_widths(log_line)

    def emit(self, record: logging.LogRecord) -> None:
        """Process a new log record.

        This defines the logging.Handler emit() function which is called by
        logging.Handler.handle(). We don't implement handle() as it is done in
        the parent class with thread safety and filters applied.
        """
        self._append_log(record)
        # Notify viewers of new logs
        for viewer in self.registered_viewers:
            viewer.new_logs_arrived()

    def render_table_header(self):
        """Get pre-formatted table header."""
        return self.table.formatted_header()
252