1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""LogStore saves logs and acts as a Python logging handler.""" 15 16from __future__ import annotations 17import collections 18import logging 19from datetime import datetime 20from typing import Dict, List, Optional, TYPE_CHECKING 21 22from pw_cli.color import colors as pw_cli_colors 23 24from pw_console.console_prefs import ConsolePrefs 25from pw_console.log_line import LogLine 26from pw_console.text_formatting import strip_ansi 27from pw_console.widgets.table import TableView 28 29if TYPE_CHECKING: 30 from pw_console.log_view import LogView 31 32 33class LogStore(logging.Handler): 34 """Pigweed Console logging handler. 35 36 This is a `Python logging.Handler 37 <https://docs.python.org/3/library/logging.html#handler-objects>`_ class 38 used to store logs for display in the pw_console user interface. 39 40 You may optionally add this as a handler to an existing logger 41 instances. This will be required if logs need to be captured for display in 42 the pw_console UI before the user interface is running. 43 44 Example usage: 45 46 .. code-block:: python 47 48 import logging 49 50 from pw_console import PwConsoleEmbed, LogStore 51 52 _DEVICE_LOG = logging.getLogger('usb_gadget') 53 54 # Create a log store and add as a handler. 55 device_log_store = LogStore() 56 _DEVICE_LOG.addHander(device_log_store) 57 58 # Start communication with your device here, before embedding 59 # pw_console. 60 61 # Create the pw_console embed instance 62 console = PwConsoleEmbed( 63 global_vars=globals(), 64 local_vars=locals(), 65 loggers={ 66 'Host Logs': [ 67 logging.getLogger(__package__), 68 logging.getLogger(__name__), 69 ], 70 # Set the LogStore as the value of this logger window. 71 'Device Logs': device_log_store, 72 }, 73 app_title='My Awesome Console', 74 ) 75 76 console.setup_python_logging() 77 console.embed() 78 """ 79 80 def __init__(self, prefs: Optional[ConsolePrefs] = None): 81 """Initializes the LogStore instance.""" 82 83 # ConsolePrefs may not be passed on init. For example, if the user is 84 # creating a LogStore to capture log messages before console startup. 85 if not prefs: 86 prefs = ConsolePrefs( 87 project_file=False, project_user_file=False, user_file=False 88 ) 89 self.prefs = prefs 90 # Log storage deque for fast addition and deletion from the beginning 91 # and end of the iterable. 92 self.logs: collections.deque = collections.deque() 93 94 # Only allow this many log lines in memory. 95 self.max_history_size: int = 1000000 96 97 # Counts of logs per python logger name 98 self.channel_counts: Dict[str, int] = {} 99 # Widths of each logger prefix string. For example: the character length 100 # of the timestamp string. 101 self.channel_formatted_prefix_widths: Dict[str, int] = {} 102 # Longest of the above prefix widths. 103 self.longest_channel_prefix_width = 0 104 105 self.table: TableView = TableView(prefs=self.prefs) 106 107 # Erase existing logs. 108 self.clear_logs() 109 110 # List of viewers that should be notified on new log line arrival. 111 self.registered_viewers: List['LogView'] = [] 112 113 super().__init__() 114 115 # Set formatting after logging.Handler init. 116 self.set_formatting() 117 118 def set_prefs(self, prefs: ConsolePrefs) -> None: 119 """Set the ConsolePrefs for this LogStore.""" 120 self.prefs = prefs 121 self.table.set_prefs(prefs) 122 123 def register_viewer(self, viewer: 'LogView') -> None: 124 """Register this LogStore with a LogView.""" 125 self.registered_viewers.append(viewer) 126 127 def set_formatting(self) -> None: 128 """Setup log formatting.""" 129 # Copy of pw_cli log formatter 130 colors = pw_cli_colors(True) 131 timestamp_prefix = colors.black_on_white('%(asctime)s') + ' ' 132 timestamp_format = '%Y%m%d %H:%M:%S' 133 format_string = timestamp_prefix + '%(levelname)s %(message)s' 134 formatter = logging.Formatter(format_string, timestamp_format) 135 136 self.setLevel(logging.DEBUG) 137 self.setFormatter(formatter) 138 139 # Update log time character width. 140 example_time_string = datetime.now().strftime(timestamp_format) 141 self.table.column_widths['time'] = len(example_time_string) 142 143 def clear_logs(self): 144 """Erase all stored pane lines.""" 145 self.logs = collections.deque() 146 self.channel_counts = {} 147 self.channel_formatted_prefix_widths = {} 148 self.line_index = 0 149 150 def get_channel_counts(self): 151 """Return the seen channel log counts for this conatiner.""" 152 return ', '.join( 153 [f'{name}: {count}' for name, count in self.channel_counts.items()] 154 ) 155 156 def get_total_count(self): 157 """Total size of the logs store.""" 158 return len(self.logs) 159 160 def get_last_log_index(self): 161 """Last valid index of the logs.""" 162 # Subtract 1 since self.logs is zero indexed. 163 total = self.get_total_count() 164 return 0 if total < 0 else total - 1 165 166 def _update_log_prefix_width(self, record: logging.LogRecord): 167 """Save the formatted prefix width if this is a new logger channel 168 name.""" 169 if self.formatter and ( 170 record.name not in self.channel_formatted_prefix_widths.keys() 171 ): 172 # Find the width of the formatted timestamp and level 173 format_string = ( 174 self.formatter._fmt # pylint: disable=protected-access 175 ) 176 177 # There may not be a _fmt defined. 178 if not format_string: 179 return 180 181 format_without_message = format_string.replace('%(message)s', '') 182 # If any other style parameters are left, get the width of them. 183 if ( 184 format_without_message 185 and 'asctime' in format_without_message 186 and 'levelname' in format_without_message 187 ): 188 formatted_time_and_level = format_without_message % dict( 189 asctime=record.asctime, levelname=record.levelname 190 ) 191 192 # Delete ANSI escape sequences. 193 ansi_stripped_time_and_level = strip_ansi( 194 formatted_time_and_level 195 ) 196 197 self.channel_formatted_prefix_widths[record.name] = len( 198 ansi_stripped_time_and_level 199 ) 200 else: 201 self.channel_formatted_prefix_widths[record.name] = 0 202 203 # Set the max width of all known formats so far. 204 self.longest_channel_prefix_width = max( 205 self.channel_formatted_prefix_widths.values() 206 ) 207 208 def _append_log(self, record: logging.LogRecord): 209 """Add a new log event.""" 210 # Format incoming log line. 211 formatted_log = self.format(record) 212 ansi_stripped_log = strip_ansi(formatted_log) 213 # Save this log. 214 self.logs.append( 215 LogLine( 216 record=record, 217 formatted_log=formatted_log, 218 ansi_stripped_log=ansi_stripped_log, 219 ) 220 ) 221 # Increment this logger count 222 self.channel_counts[record.name] = ( 223 self.channel_counts.get(record.name, 0) + 1 224 ) 225 226 # TODO(b/235271486): Revisit calculating prefix widths automatically 227 # when line wrapping indentation is supported. 228 # Set the prefix width to 0 229 self.channel_formatted_prefix_widths[record.name] = 0 230 231 # Parse metadata fields 232 self.logs[-1].update_metadata() 233 234 # Check for bigger column widths. 235 self.table.update_metadata_column_widths(self.logs[-1]) 236 237 def emit(self, record) -> None: 238 """Process a new log record. 239 240 This defines the logging.Handler emit() fuction which is called by 241 logging.Handler.handle() We don't implement handle() as it is done in 242 the parent class with thread safety and filters applied. 243 """ 244 self._append_log(record) 245 # Notify viewers of new logs 246 for viewer in self.registered_viewers: 247 viewer.new_logs_arrived() 248 249 def render_table_header(self): 250 """Get pre-formatted table header.""" 251 return self.table.formatted_header() 252