• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1# Copyright 2023 The Pigweed Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may not
4# use this file except in compliance with the License. You may obtain a copy of
5# the License at
6#
7#     https://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations under
13# the License.
14
15"""Utils to decode logs."""
16
17import logging
18
19from pw_log.log_decoder import LogStreamDecoder
20from pw_log.proto import log_pb2
21import pw_rpc
22import pw_status
23
24_LOG = logging.getLogger(__name__)
25
26
27class LogStreamHandler:
28    """Handles an RPC Log Stream.
29
30    Args:
31        rpcs: RPC services to request RPC Log Streams.
32        decoder: LogStreamDecoder
33    """
34
35    def __init__(
36        self, rpcs: pw_rpc.client.Services, decoder: LogStreamDecoder
37    ) -> None:
38        self.rpcs = rpcs
39        self._decoder = decoder
40
41    def listen_to_logs(self) -> None:
42        """Requests Logs streamed over RPC.
43
44        The RPCs remain open until the server cancels or closes them, either
45        with a response or error packet.
46        """
47
48        def on_log_entries(_, log_entries_proto: log_pb2.LogEntries) -> None:
49            self._decoder.parse_log_entries_proto(log_entries_proto)
50
51        self.rpcs.pw.log.Logs.Listen.open(
52            on_next=on_log_entries,
53            on_completed=lambda _, status: self.handle_log_stream_completed(
54                status
55            ),
56            on_error=lambda _, error: self.handle_log_stream_error(error),
57        )
58
59    def handle_log_stream_error(self, error: pw_status.Status) -> None:
60        """Resets the log stream RPC on error to avoid losing logs.
61
62        Override this function to change default behavior.
63        """
64        _LOG.error(
65            'Log stream error: %s from source %s',
66            error,
67            self.source_name,
68        )
69        # Only re-request logs if the RPC was not cancelled by the client.
70        if error != pw_status.Status.CANCELLED:
71            self.listen_to_logs()
72
73    def handle_log_stream_completed(self, status: pw_status.Status) -> None:
74        """Resets the log stream RPC on completed to avoid losing logs.
75
76        Override this function to change default behavior.
77        """
78        _LOG.debug(
79            'Log stream completed with status: %s for source: %s',
80            status,
81            self.source_name,
82        )
83        self.listen_to_logs()
84
85    @property
86    def source_name(self) -> str:
87        return self._decoder.source_name
88