1# Copyright 2023 The Pigweed Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may not 4# use this file except in compliance with the License. You may obtain a copy of 5# the License at 6# 7# https://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations under 13# the License. 14 15"""Utils to decode logs.""" 16 17from dataclasses import dataclass 18import datetime 19import logging 20import re 21from typing import Any, Callable 22 23from pw_log.proto import log_pb2 24import pw_log_tokenized 25import pw_status 26from pw_tokenizer import Detokenizer 27from pw_tokenizer.proto import decode_optionally_tokenized 28 29_LOG = logging.getLogger(__name__) 30 31 32@dataclass(frozen=True) 33class LogLineLevel: 34 """Tuple of line number and level packed in LogEntry.""" 35 36 line: int 37 level: int 38 39 40class Log: 41 """A decoded, human-readable representation of a LogEntry. 42 43 Contains fields to represent a decoded pw_log/log.proto LogEntry message in 44 a human readable way. 45 46 Attributes: 47 message: The log message as a string. 48 level: A integer representing the log level, follows logging levels. 49 flags: An integer with the bit flags. 50 timestamp: A string representation of a timestamp. 51 module_name: The module name as a string. 52 thread_name: The thread name as a string. 53 source_name: The source name as a string. 54 file_and_line: The filepath and line as a string. 55 metadata_fields: Extra fields with string-string mapping. 56 """ 57 58 _LOG_LEVEL_NAMES = { 59 logging.DEBUG: 'DBG', 60 logging.INFO: 'INF', 61 logging.WARNING: 'WRN', 62 logging.ERROR: 'ERR', 63 logging.CRITICAL: 'CRT', 64 # The logging module does not have a FATAL level. This module's log 65 # level values are 10 * PW_LOG_LEVEL values. PW_LOG_LEVEL_FATAL is 7. 66 70: 'FTL', 67 } 68 _LEVEL_MASK = 0x7 # pylint: disable=C0103 69 _LINE_OFFSET = 3 # pylint: disable=C0103 70 71 def __init__( 72 self, 73 message: str = '', 74 level: int = logging.NOTSET, 75 flags: int = 0, 76 timestamp: str = '', 77 module_name: str = '', 78 thread_name: str = '', 79 source_name: str = '', 80 file_and_line: str = '', 81 metadata_fields: dict[str, str] | None = None, 82 ) -> None: 83 self.message = message 84 self.level = level # Value from logging levels. 85 self.flags = flags 86 self.timestamp = timestamp # A human readable value. 87 self.module_name = module_name 88 self.thread_name = thread_name 89 self.source_name = source_name 90 self.file_and_line = file_and_line 91 self.metadata_fields = dict() 92 if metadata_fields: 93 self.metadata_fields.update(metadata_fields) 94 95 def __eq__(self, other: Any) -> bool: 96 if not isinstance(other, Log): 97 return False 98 99 return ( 100 self.message == other.message 101 and self.level == other.level 102 and self.flags == other.flags 103 and self.timestamp == other.timestamp 104 and self.module_name == other.module_name 105 and self.thread_name == other.thread_name 106 and self.source_name == other.source_name 107 and self.file_and_line == other.file_and_line 108 and self.metadata_fields == other.metadata_fields 109 ) 110 111 def __repr__(self) -> str: 112 return self.__str__() 113 114 def __str__(self) -> str: 115 level_name = self._LOG_LEVEL_NAMES.get(self.level, '') 116 metadata = ' '.join(map(str, self.metadata_fields.values())) 117 return ( 118 f'{level_name} [{self.source_name}] {self.module_name} ' 119 f'{self.timestamp} {self.message} {self.file_and_line} ' 120 f'{metadata}' 121 ).strip() 122 123 @staticmethod 124 def pack_line_level(line: int, logging_log_level: int) -> int: 125 """Packs the line and level values into an integer as done in LogEntry. 126 127 Args: 128 line: the line number 129 level: the logging level using logging levels. 130 Returns: 131 An integer with the two values bitpacked. 132 """ 133 return (line << Log._LINE_OFFSET) | ( 134 Log.logging_level_to_pw_log_level(logging_log_level) 135 & Log._LEVEL_MASK 136 ) 137 138 @staticmethod 139 def unpack_line_level(packed_line_level: int) -> LogLineLevel: 140 """Unpacks the line and level values packed as done in LogEntry. 141 142 Args: 143 An integer with the two values bitpacked. 144 Returns: 145 line: the line number 146 level: the logging level using logging levels. 147 """ 148 line = packed_line_level >> Log._LINE_OFFSET 149 # Convert to logging module level number. 150 level = Log.pw_log_level_to_logging_level( 151 packed_line_level & Log._LEVEL_MASK 152 ) 153 return LogLineLevel(line=line, level=level) 154 155 @staticmethod 156 def pw_log_level_to_logging_level(pw_log_level: int) -> int: 157 """Maps a pw_log/levels.h value to Python logging level value.""" 158 return pw_log_level * 10 159 160 @staticmethod 161 def logging_level_to_pw_log_level(logging_log_level: int) -> int: 162 """Maps a Python logging level value to a pw_log/levels.h value.""" 163 return int(logging_log_level / 10) 164 165 166def pw_status_code_to_name( 167 message: str, status_pattern: str = 'pw::Status=([0-9]+)' 168) -> str: 169 """Replaces the pw::Status code number with the status name. 170 171 Searches for a matching pattern encapsulating the status code number and 172 replaces it with the status name. This is useful when logging the status 173 code instead of the string to save space. 174 175 Args: 176 message: The string to look for the status code. 177 status_pattern: The regex pattern describing the format encapsulating 178 the status code. 179 Returns: 180 The message string with the status name if found, else the original 181 string. 182 """ 183 # Min and max values for Status. 184 max_status_value = max(status.value for status in pw_status.Status) 185 min_status_value = min(status.value for status in pw_status.Status) 186 187 def replacement_callback(match: re.Match) -> str: 188 status_code = int(match.group(1)) 189 if min_status_value <= status_code <= max_status_value: 190 return pw_status.Status(status_code).name 191 return match.group(0) 192 193 return re.sub( 194 pattern=status_pattern, repl=replacement_callback, string=message 195 ) 196 197 198def log_decoded_log( 199 log: Log, logger: logging.Logger | logging.LoggerAdapter 200) -> None: 201 """Formats and saves the log information in a pw console compatible way. 202 203 Arg: 204 logger: The logger to emit the log information to. 205 """ 206 # Fields used for pw console table view. 207 log.metadata_fields['module'] = log.module_name 208 log.metadata_fields['source_name'] = log.source_name 209 log.metadata_fields['timestamp'] = log.timestamp 210 log.metadata_fields['msg'] = log.message 211 log.metadata_fields['file'] = log.file_and_line 212 213 logger.log( 214 log.level, 215 '[%s] %s %s %s %s', 216 log.source_name, 217 log.module_name, 218 log.timestamp, 219 log.message, 220 log.file_and_line, 221 extra=dict(extra_metadata_fields=log.metadata_fields), 222 ) 223 224 225def timestamp_parser_ns_since_boot(timestamp: int) -> str: 226 """Decodes timestamp as nanoseconds since boot. 227 228 Args: 229 timestamp: The timestamp as an integer. 230 Returns: 231 A string representation of the timestamp. 232 """ 233 return str(datetime.timedelta(seconds=timestamp / 1e9))[:-3] 234 235 236class LogStreamDecoder: 237 """Decodes an RPC stream of LogEntries packets. 238 239 Performs log drop detection on the stream of LogEntries proto messages. 240 241 Args: 242 decoded_log_handler: Callback called on each decoded log. 243 detokenizer: Detokenizes log messages if tokenized when provided. 244 source_name: Optional string to identify the logs source. 245 timestamp_parser: Optional timestamp parser number to a string. 246 message_parser: Optional message parser called after detokenization is 247 attempted on a log message. 248 """ 249 250 DROP_REASON_LOSS_AT_TRANSPORT = 'loss at transport' 251 DROP_REASON_SOURCE_NOT_CONNECTED = 'source not connected' 252 DROP_REASON_SOURCE_ENQUEUE_FAILURE = 'enqueue failure at source' 253 254 def __init__( 255 self, 256 decoded_log_handler: Callable[[Log], None], 257 detokenizer: Detokenizer | None = None, 258 source_name: str = '', 259 timestamp_parser: Callable[[int], str] | None = None, 260 message_parser: Callable[[str], str] | None = None, 261 ): 262 self.decoded_log_handler = decoded_log_handler 263 self.detokenizer = detokenizer 264 self.source_name = source_name 265 self.timestamp_parser = timestamp_parser 266 self.message_parser = message_parser 267 self._expected_log_sequence_id = 0 268 269 def parse_log_entries_proto( 270 self, log_entries_proto: log_pb2.LogEntries 271 ) -> None: 272 """Parses each LogEntry in log_entries_proto. 273 274 Args: 275 log_entry_proto: A LogEntry message proto. 276 Returns: 277 A Log object with the decoded log_entry_proto. 278 """ 279 has_received_logs = self._expected_log_sequence_id > 0 280 dropped_log_count = self._calculate_dropped_logs(log_entries_proto) 281 if dropped_log_count > 0: 282 reason = ( 283 self.DROP_REASON_LOSS_AT_TRANSPORT 284 if has_received_logs 285 else self.DROP_REASON_SOURCE_NOT_CONNECTED 286 ) 287 self.decoded_log_handler( 288 self._handle_log_drop_count(dropped_log_count, reason) 289 ) 290 elif dropped_log_count < 0: 291 _LOG.error('Log sequence ID is smaller than expected') 292 293 for i, log_entry_proto in enumerate(log_entries_proto.entries): 294 # Handle dropped count first. 295 if log_entry_proto.dropped: 296 # Avoid duplicating drop reports since the device will report 297 # a drop count due to a transmission failure, of the last 298 # attempted transmission only, in the first entry of the next 299 # successful transmission. 300 if i == 0 and dropped_log_count >= log_entry_proto.dropped: 301 continue 302 parsed_log = self.parse_log_entry_proto(log_entry_proto) 303 self.decoded_log_handler(parsed_log) 304 305 def parse_log_entry_proto(self, log_entry_proto: log_pb2.LogEntry) -> Log: 306 """Parses the log_entry_proto contents into a human readable format. 307 308 Args: 309 log_entry_proto: A LogEntry message proto. 310 Returns: 311 A Log object with the decoded log_entry_proto. 312 """ 313 detokenized_message = self._decode_optionally_tokenized_field( 314 log_entry_proto.message 315 ) 316 # Handle dropped count first. 317 if log_entry_proto.dropped: 318 drop_reason = self.DROP_REASON_SOURCE_ENQUEUE_FAILURE 319 if detokenized_message: 320 drop_reason = detokenized_message.lower() 321 return self._handle_log_drop_count( 322 log_entry_proto.dropped, drop_reason 323 ) 324 325 # Parse message and metadata, if any, encoded in a key-value format as 326 # described in pw_log/log.proto LogEntry::message field. 327 message_and_metadata = pw_log_tokenized.FormatStringWithMetadata( 328 detokenized_message 329 ) 330 module_name = self._decode_optionally_tokenized_field( 331 log_entry_proto.module 332 ) 333 if not module_name: 334 module_name = message_and_metadata.module 335 336 line_level_tuple = Log.unpack_line_level(log_entry_proto.line_level) 337 file_and_line = self._decode_optionally_tokenized_field( 338 log_entry_proto.file 339 ) 340 if not file_and_line: 341 # Set file name if found in the metadata. 342 if message_and_metadata.file is not None: 343 file_and_line = message_and_metadata.file 344 else: 345 file_and_line = '' 346 347 # Add line number to filepath if needed. 348 if line_level_tuple.line and ':' not in file_and_line: 349 file_and_line += f':{line_level_tuple.line}' 350 # Add extra log information avoiding duplicated data. 351 metadata_fields = { 352 k: v 353 for k, v in message_and_metadata.fields.items() 354 if k not in ['file', 'module', 'msg'] 355 } 356 357 message = message_and_metadata.message 358 if self.message_parser: 359 message = self.message_parser(message) 360 if self.timestamp_parser: 361 timestamp = self.timestamp_parser(log_entry_proto.timestamp) 362 else: 363 timestamp = str(log_entry_proto.timestamp) 364 log = Log( 365 message=message, 366 level=line_level_tuple.level, 367 flags=log_entry_proto.flags, 368 timestamp=timestamp, 369 module_name=module_name, 370 thread_name=self._decode_optionally_tokenized_field( 371 log_entry_proto.thread 372 ), 373 source_name=self.source_name, 374 file_and_line=file_and_line, 375 metadata_fields=metadata_fields, 376 ) 377 378 return log 379 380 def _handle_log_drop_count(self, drop_count: int, reason: str) -> Log: 381 log_word = 'logs' if drop_count > 1 else 'log' 382 log = Log( 383 message=f'Dropped {drop_count} {log_word} due to {reason}', 384 level=logging.WARNING, 385 source_name=self.source_name, 386 ) 387 return log 388 389 def _calculate_dropped_logs( 390 self, log_entries_proto: log_pb2.LogEntries 391 ) -> int: 392 # Count log messages received that don't use the dropped field. 393 messages_received = sum( 394 1 if not log_proto.dropped else 0 395 for log_proto in log_entries_proto.entries 396 ) 397 dropped_log_count = ( 398 log_entries_proto.first_entry_sequence_id 399 - self._expected_log_sequence_id 400 ) 401 self._expected_log_sequence_id = ( 402 log_entries_proto.first_entry_sequence_id + messages_received 403 ) 404 return dropped_log_count 405 406 def _decode_optionally_tokenized_field(self, field: bytes) -> str: 407 """Decodes tokenized field into a printable string. 408 Args: 409 field: Bytes. 410 Returns: 411 A printable string. 412 """ 413 if self.detokenizer: 414 return decode_optionally_tokenized(self.detokenizer, field) 415 return field.decode('utf-8') 416