1# Copyright 2021 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14"""Python logging helper fuctions.""" 15 16import copy 17from datetime import datetime 18import json 19import logging 20import tempfile 21from typing import Any, Iterable, Iterator 22 23 24def all_loggers() -> Iterator[logging.Logger]: 25 """Iterates over all loggers known to Python logging.""" 26 manager = logging.getLogger().manager # type: ignore[attr-defined] 27 28 for logger_name in manager.loggerDict: # pylint: disable=no-member 29 yield logging.getLogger(logger_name) 30 31 32def create_temp_log_file( 33 prefix: str | None = None, add_time: bool = True 34) -> str: 35 """Create a unique tempfile for saving logs. 36 37 Example format: /tmp/pw_console_2021-05-04_151807_8hem6iyq 38 """ 39 if not prefix: 40 prefix = str(__package__) 41 42 # Grab the current system timestamp as a string. 43 isotime = datetime.now().isoformat(sep="_", timespec="seconds") 44 # Timestamp string should not have colons in it. 45 isotime = isotime.replace(":", "") 46 47 if add_time: 48 prefix += f"_{isotime}" 49 50 log_file_name = None 51 with tempfile.NamedTemporaryFile( 52 prefix=f"{prefix}_", delete=False 53 ) as log_file: 54 log_file_name = log_file.name 55 56 return log_file_name 57 58 59def set_logging_last_resort_file_handler( 60 file_name: str | None = None, 61) -> None: 62 log_file = file_name if file_name else create_temp_log_file() 63 logging.lastResort = logging.FileHandler(log_file) 64 65 66def disable_stdout_handlers(logger: logging.Logger) -> None: 67 """Remove all stdout and stdout & stderr logger handlers.""" 68 for handler in copy.copy(logger.handlers): 69 # Must use type() check here since this returns True: 70 # isinstance(logging.FileHandler, logging.StreamHandler) 71 # pylint: disable=unidiomatic-typecheck 72 if type(handler) == logging.StreamHandler: 73 logger.removeHandler(handler) 74 # pylint: enable=unidiomatic-typecheck 75 76 77def setup_python_logging( 78 last_resort_filename: str | None = None, 79 loggers_with_no_propagation: Iterable[logging.Logger] | None = None, 80) -> None: 81 """Disable log handlers for full screen prompt_toolkit applications.""" 82 if not loggers_with_no_propagation: 83 loggers_with_no_propagation = [] 84 disable_stdout_handlers(logging.getLogger()) 85 86 if logging.lastResort is not None: 87 set_logging_last_resort_file_handler(last_resort_filename) 88 89 for logger in list(all_loggers()): 90 # Prevent stdout handlers from corrupting the prompt_toolkit UI. 91 disable_stdout_handlers(logger) 92 if logger in loggers_with_no_propagation: 93 continue 94 # Make sure all known loggers propagate to the root logger. 95 logger.propagate = True 96 97 # Prevent these loggers from propagating to the root logger. 98 hidden_host_loggers = [ 99 "blib2to3.pgen2.driver", # spammy and unhelpful 100 "pw_console", 101 "pw_console.plugins", 102 # prompt_toolkit triggered debug log messages 103 "prompt_toolkit", 104 "prompt_toolkit.buffer", 105 "parso.python.diff", 106 "parso.cache", 107 "pw_console.serial_debug_logger", 108 "websockets.server", 109 ] 110 for logger_name in hidden_host_loggers: 111 logging.getLogger(logger_name).propagate = False 112 113 # Set asyncio log level to WARNING 114 logging.getLogger("asyncio").setLevel(logging.WARNING) 115 116 # Always set DEBUG level for serial debug. 117 logging.getLogger("pw_console.serial_debug_logger").setLevel(logging.DEBUG) 118 119 120def log_record_to_json(record: logging.LogRecord) -> str: 121 log_dict: dict[str, Any] = {} 122 log_dict["message"] = record.getMessage() 123 log_dict["levelno"] = record.levelno 124 log_dict["levelname"] = record.levelname 125 log_dict["args"] = record.args 126 log_dict["time"] = str(record.created) 127 log_dict["time_string"] = datetime.fromtimestamp(record.created).isoformat( 128 timespec="seconds" 129 ) 130 131 lineno = record.lineno 132 file_name = str(record.filename) 133 log_dict['py_file'] = f'{file_name}:{lineno}' 134 log_dict['py_logger'] = str(record.name) 135 136 if hasattr(record, "extra_metadata_fields") and ( 137 record.extra_metadata_fields # type: ignore 138 ): 139 fields = record.extra_metadata_fields # type: ignore 140 log_dict["fields"] = {} 141 for key, value in fields.items(): 142 if key == "msg": 143 log_dict["message"] = value 144 continue 145 146 log_dict["fields"][key] = str(value) 147 148 return json.dumps(log_dict) 149 150 151class JsonLogFormatter(logging.Formatter): 152 """Json Python logging Formatter 153 154 Use this formatter to log pw_console messages to a file in json 155 format. Column values normally shown in table view will be populated in the 156 'fields' key. 157 158 Example log entry: 159 160 .. code-block:: json 161 162 { 163 "message": "System init", 164 "levelno": 20, 165 "levelname": "INF", 166 "args": [ 167 "0:00", 168 "pw_system ", 169 "System init" 170 ], 171 "time": "1692302986.4729185", 172 "time_string": "2023-08-17T13:09:46", 173 "fields": { 174 "module": "pw_system", 175 "file": "pw_system/init.cc", 176 "timestamp": "0:00" 177 }, 178 "py_file": "script.py:1234", 179 "py_logger": "root" 180 } 181 182 Example usage: 183 184 .. code-block:: python 185 186 import logging 187 import pw_console.python_logging 188 189 _DEVICE_LOG = logging.getLogger('rpc_device') 190 191 json_filehandler = logging.FileHandler('logs.json', encoding='utf-8') 192 json_filehandler.setLevel(logging.DEBUG) 193 json_filehandler.setFormatter( 194 pw_console.python_logging.JsonLogFormatter()) 195 _DEVICE_LOG.addHandler(json_filehandler) 196 197 """ 198 199 def __init__(self, *args, **kwargs): 200 super().__init__(*args, **kwargs) 201 202 def format(self, record: logging.LogRecord) -> str: 203 return log_record_to_json(record) 204