• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2023 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14
15"""Utils to decode logs."""
16
17from dataclasses import dataclass
18import datetime
19import logging
20import re
21from typing import Any, Callable
22
23from pw_log.proto import log_pb2
24import pw_log_tokenized
25import pw_status
26from pw_tokenizer import Detokenizer
27from pw_tokenizer.proto import decode_optionally_tokenized
28
29_LOG = logging.getLogger(__name__)
30
31
@dataclass(frozen=True)
class LogLineLevel:
    """Line number and level as packed together in a LogEntry."""

    # Source line number extracted from the packed line_level field.
    line: int
    # Log level on the Python logging scale.
    level: int
38
39
40class Log:
41    """A decoded, human-readable representation of a LogEntry.
42
43    Contains fields to represent a decoded pw_log/log.proto LogEntry message in
44    a human readable way.
45
46    Attributes:
47        message: The log message as a string.
48        level: A integer representing the log level, follows logging levels.
49        flags: An integer with the bit flags.
50        timestamp: A string representation of a timestamp.
51        module_name: The module name as a string.
52        thread_name: The thread name as a string.
53        source_name: The source name as a string.
54        file_and_line: The filepath and line as a string.
55        metadata_fields: Extra fields with string-string mapping.
56    """
57
58    _LOG_LEVEL_NAMES = {
59        logging.DEBUG: 'DBG',
60        logging.INFO: 'INF',
61        logging.WARNING: 'WRN',
62        logging.ERROR: 'ERR',
63        logging.CRITICAL: 'CRT',
64        # The logging module does not have a FATAL level. This module's log
65        # level values are 10 * PW_LOG_LEVEL values. PW_LOG_LEVEL_FATAL is 7.
66        70: 'FTL',
67    }
68    _LEVEL_MASK = 0x7  # pylint: disable=C0103
69    _LINE_OFFSET = 3  # pylint: disable=C0103
70
71    def __init__(
72        self,
73        message: str = '',
74        level: int = logging.NOTSET,
75        flags: int = 0,
76        timestamp: str = '',
77        module_name: str = '',
78        thread_name: str = '',
79        source_name: str = '',
80        file_and_line: str = '',
81        metadata_fields: dict[str, str] | None = None,
82    ) -> None:
83        self.message = message
84        self.level = level  # Value from logging levels.
85        self.flags = flags
86        self.timestamp = timestamp  # A human readable value.
87        self.module_name = module_name
88        self.thread_name = thread_name
89        self.source_name = source_name
90        self.file_and_line = file_and_line
91        self.metadata_fields = dict()
92        if metadata_fields:
93            self.metadata_fields.update(metadata_fields)
94
95    def __eq__(self, other: Any) -> bool:
96        if not isinstance(other, Log):
97            return False
98
99        return (
100            self.message == other.message
101            and self.level == other.level
102            and self.flags == other.flags
103            and self.timestamp == other.timestamp
104            and self.module_name == other.module_name
105            and self.thread_name == other.thread_name
106            and self.source_name == other.source_name
107            and self.file_and_line == other.file_and_line
108            and self.metadata_fields == other.metadata_fields
109        )
110
111    def __repr__(self) -> str:
112        return self.__str__()
113
114    def __str__(self) -> str:
115        level_name = self._LOG_LEVEL_NAMES.get(self.level, '')
116        metadata = ' '.join(map(str, self.metadata_fields.values()))
117        return (
118            f'{level_name} [{self.source_name}] {self.module_name} '
119            f'{self.timestamp} {self.message} {self.file_and_line} '
120            f'{metadata}'
121        ).strip()
122
123    @staticmethod
124    def pack_line_level(line: int, logging_log_level: int) -> int:
125        """Packs the line and level values into an integer as done in LogEntry.
126
127        Args:
128            line: the line number
129            level: the logging level using logging levels.
130        Returns:
131            An integer with the two values bitpacked.
132        """
133        return (line << Log._LINE_OFFSET) | (
134            Log.logging_level_to_pw_log_level(logging_log_level)
135            & Log._LEVEL_MASK
136        )
137
138    @staticmethod
139    def unpack_line_level(packed_line_level: int) -> LogLineLevel:
140        """Unpacks the line and level values packed as done in LogEntry.
141
142        Args:
143            An integer with the two values bitpacked.
144        Returns:
145            line: the line number
146            level: the logging level using logging levels.
147        """
148        line = packed_line_level >> Log._LINE_OFFSET
149        # Convert to logging module level number.
150        level = Log.pw_log_level_to_logging_level(
151            packed_line_level & Log._LEVEL_MASK
152        )
153        return LogLineLevel(line=line, level=level)
154
155    @staticmethod
156    def pw_log_level_to_logging_level(pw_log_level: int) -> int:
157        """Maps a pw_log/levels.h value to Python logging level value."""
158        return pw_log_level * 10
159
160    @staticmethod
161    def logging_level_to_pw_log_level(logging_log_level: int) -> int:
162        """Maps a Python logging level value to a pw_log/levels.h value."""
163        return int(logging_log_level / 10)
164
165
def pw_status_code_to_name(
    message: str, status_pattern: str = 'pw::Status=([0-9]+)'
) -> str:
    """Replaces the pw::Status code number with the status name.

    Searches for a matching pattern encapsulating the status code number and
    replaces it with the status name. This is useful when logging the status
    code instead of the string to save space.

    Args:
        message: The string to look for the status code.
        status_pattern: The regex pattern describing the format encapsulating
          the status code.
    Returns:
       The message string with the status name if found, else the original
       string.
    """
    # Bounds of the known Status codes; values outside are left untouched.
    status_values = [status.value for status in pw_status.Status]
    lowest = min(status_values)
    highest = max(status_values)

    def _code_to_name(match: re.Match) -> str:
        code = int(match.group(1))
        if lowest <= code <= highest:
            return pw_status.Status(code).name
        # Out-of-range code: keep the original text.
        return match.group(0)

    return re.sub(status_pattern, _code_to_name, message)
196
197
def log_decoded_log(
    log: Log, logger: logging.Logger | logging.LoggerAdapter
) -> None:
    """Formats and saves the log information in a pw console compatible way.

    Note: this mutates log.metadata_fields, adding the fields that the
    pw console table view reads.

    Args:
        log: The decoded Log to emit.
        logger: The logger to emit the log information to.
    """
    # Fields used for pw console table view.
    log.metadata_fields['module'] = log.module_name
    log.metadata_fields['source_name'] = log.source_name
    log.metadata_fields['timestamp'] = log.timestamp
    log.metadata_fields['msg'] = log.message
    log.metadata_fields['file'] = log.file_and_line

    logger.log(
        log.level,
        '[%s] %s %s %s %s',
        log.source_name,
        log.module_name,
        log.timestamp,
        log.message,
        log.file_and_line,
        extra=dict(extra_metadata_fields=log.metadata_fields),
    )
223
224
def timestamp_parser_ns_since_boot(timestamp: int) -> str:
    """Decodes timestamp as nanoseconds since boot.

    Args:
        timestamp: The timestamp as an integer.
    Returns:
        A string representation of the timestamp with millisecond precision,
        e.g. '0:00:01.500'.
    """
    text = str(datetime.timedelta(seconds=timestamp / 1e9))
    # str(timedelta) omits the fractional part entirely when the value is a
    # whole number of seconds; blindly chopping three characters would then
    # corrupt the seconds field (e.g. '0:00:01' -> '0:00'). Append an explicit
    # zero fraction in that case instead.
    if '.' not in text:
        return text + '.000'
    # Trim microseconds down to milliseconds.
    return text[:-3]
234
235
class LogStreamDecoder:
    """Decodes an RPC stream of LogEntries packets.

    Performs log drop detection on the stream of LogEntries proto messages.

    Args:
        decoded_log_handler: Callback called on each decoded log.
        detokenizer: Detokenizes log messages if tokenized when provided.
        source_name: Optional string to identify the logs source.
        timestamp_parser: Optional timestamp parser number to a string.
        message_parser: Optional message parser called after detokenization is
          attempted on a log message.
    """

    # Human-readable reasons attached to synthesized "dropped logs" entries.
    DROP_REASON_LOSS_AT_TRANSPORT = 'loss at transport'
    DROP_REASON_SOURCE_NOT_CONNECTED = 'source not connected'
    DROP_REASON_SOURCE_ENQUEUE_FAILURE = 'enqueue failure at source'

    def __init__(
        self,
        decoded_log_handler: Callable[[Log], None],
        detokenizer: Detokenizer | None = None,
        source_name: str = '',
        timestamp_parser: Callable[[int], str] | None = None,
        message_parser: Callable[[str], str] | None = None,
    ):
        self.decoded_log_handler = decoded_log_handler
        self.detokenizer = detokenizer
        self.source_name = source_name
        self.timestamp_parser = timestamp_parser
        self.message_parser = message_parser
        # Next first_entry_sequence_id expected from the stream; a gap between
        # this and the received sequence id indicates dropped logs.
        self._expected_log_sequence_id: int = 0

    def parse_log_entries_proto(
        self, log_entries_proto: log_pb2.LogEntries
    ) -> None:
        """Parses each LogEntry in log_entries_proto.

        Each decoded log is passed to decoded_log_handler. When drops are
        detected via sequence id gaps, a synthesized warning Log describing
        the drop is passed to decoded_log_handler first.

        Args:
            log_entries_proto: A LogEntries message proto.
        """
        # A drop before any logs were received means the source was not yet
        # connected, rather than a loss in transit.
        has_received_logs = self._expected_log_sequence_id > 0
        dropped_log_count = self._calculate_dropped_logs(log_entries_proto)
        if dropped_log_count > 0:
            reason = (
                self.DROP_REASON_LOSS_AT_TRANSPORT
                if has_received_logs
                else self.DROP_REASON_SOURCE_NOT_CONNECTED
            )
            self.decoded_log_handler(
                self._handle_log_drop_count(dropped_log_count, reason)
            )
        elif dropped_log_count < 0:
            # Sequence id went backwards; likely a device reset or reordering.
            _LOG.error('Log sequence ID is smaller than expected')

        for i, log_entry_proto in enumerate(log_entries_proto.entries):
            # Handle dropped count first.
            if log_entry_proto.dropped:
                # Avoid duplicating drop reports since the device will report
                # a drop count due to a transmission failure, of the last
                # attempted transmission only, in the first entry of the next
                # successful transmission.
                if i == 0 and dropped_log_count >= log_entry_proto.dropped:
                    continue
            parsed_log = self.parse_log_entry_proto(log_entry_proto)
            self.decoded_log_handler(parsed_log)

    def parse_log_entry_proto(self, log_entry_proto: log_pb2.LogEntry) -> Log:
        """Parses the log_entry_proto contents into a human readable format.

        Args:
            log_entry_proto: A LogEntry message proto.
        Returns:
            A Log object with the decoded log_entry_proto.
        """
        detokenized_message = self._decode_optionally_tokenized_field(
            log_entry_proto.message
        )
        # Handle dropped count first.
        if log_entry_proto.dropped:
            drop_reason = self.DROP_REASON_SOURCE_ENQUEUE_FAILURE
            # A non-empty message on a drop entry carries the device-provided
            # drop reason.
            if detokenized_message:
                drop_reason = detokenized_message.lower()
            return self._handle_log_drop_count(
                log_entry_proto.dropped, drop_reason
            )

        # Parse message and metadata, if any, encoded in a key-value format as
        # described in pw_log/log.proto LogEntry::message field.
        message_and_metadata = pw_log_tokenized.FormatStringWithMetadata(
            detokenized_message
        )
        # Prefer the proto's module field; fall back to the one embedded in
        # the message metadata.
        module_name = self._decode_optionally_tokenized_field(
            log_entry_proto.module
        )
        if not module_name:
            module_name = message_and_metadata.module

        line_level_tuple = Log.unpack_line_level(log_entry_proto.line_level)
        file_and_line = self._decode_optionally_tokenized_field(
            log_entry_proto.file
        )
        if not file_and_line:
            # Set file name if found in the metadata.
            if message_and_metadata.file is not None:
                file_and_line = message_and_metadata.file
            else:
                file_and_line = ''

        # Add line number to filepath if needed.
        if line_level_tuple.line and ':' not in file_and_line:
            file_and_line += f':{line_level_tuple.line}'
        # Add extra log information avoiding duplicated data.
        metadata_fields = {
            k: v
            for k, v in message_and_metadata.fields.items()
            if k not in ['file', 'module', 'msg']
        }

        message = message_and_metadata.message
        if self.message_parser:
            message = self.message_parser(message)
        # Without a timestamp_parser, fall back to the raw integer as text.
        if self.timestamp_parser:
            timestamp = self.timestamp_parser(log_entry_proto.timestamp)
        else:
            timestamp = str(log_entry_proto.timestamp)
        log = Log(
            message=message,
            level=line_level_tuple.level,
            flags=log_entry_proto.flags,
            timestamp=timestamp,
            module_name=module_name,
            thread_name=self._decode_optionally_tokenized_field(
                log_entry_proto.thread
            ),
            source_name=self.source_name,
            file_and_line=file_and_line,
            metadata_fields=metadata_fields,
        )

        return log

    def _handle_log_drop_count(self, drop_count: int, reason: str) -> Log:
        """Creates a warning Log reporting drop_count dropped logs."""
        log_word = 'logs' if drop_count > 1 else 'log'
        log = Log(
            message=f'Dropped {drop_count} {log_word} due to {reason}',
            level=logging.WARNING,
            source_name=self.source_name,
        )
        return log

    def _calculate_dropped_logs(
        self, log_entries_proto: log_pb2.LogEntries
    ) -> int:
        """Computes the drop count from the packet's sequence id.

        Also advances the expected sequence id past this packet's entries.

        Returns:
            A positive count of dropped logs, zero when none were dropped, or
            a negative value when the sequence id went backwards.
        """
        # Count log messages received that don't use the dropped field.
        messages_received = sum(
            1 if not log_proto.dropped else 0
            for log_proto in log_entries_proto.entries
        )
        dropped_log_count = (
            log_entries_proto.first_entry_sequence_id
            - self._expected_log_sequence_id
        )
        self._expected_log_sequence_id = (
            log_entries_proto.first_entry_sequence_id + messages_received
        )
        return dropped_log_count

    def _decode_optionally_tokenized_field(self, field: bytes) -> str:
        """Decodes an optionally tokenized field into a printable string.

        Args:
            field: The raw field bytes, possibly a token.
        Returns:
            The detokenized string when a detokenizer is available, otherwise
            the bytes decoded as UTF-8.
        """
        if self.detokenizer:
            return decode_optionally_tokenized(self.detokenizer, field)
        return field.decode('utf-8')
416