# Copyright 2023 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.

"""Utils to decode logs."""

import logging

from pw_log.log_decoder import LogStreamDecoder
from pw_log.proto import log_pb2
import pw_rpc
import pw_status

_LOG = logging.getLogger(__name__)


class LogStreamHandler:
    """Forwards an RPC log stream to a log decoder.

    Opens the ``pw.log.Logs.Listen`` RPC on the given services and passes
    every received ``LogEntries`` proto to the decoder. The stream is
    re-opened when it completes, and when it fails with any error other than
    ``CANCELLED``.

    Args:
        rpcs: RPC services to request RPC Log Streams.
        decoder: LogStreamDecoder that parses the received log entries.
    """

    def __init__(
        self, rpcs: pw_rpc.client.Services, decoder: LogStreamDecoder
    ) -> None:
        # Public: the services used to reach pw.log.Logs.Listen.
        self.rpcs = rpcs
        # Private: receives every LogEntries proto from the stream.
        self._decoder = decoder

    def listen_to_logs(self) -> None:
        """Opens the log stream RPC and wires up its callbacks.

        The RPCs remain open until the server cancels or closes them, either
        with a response or error packet. Completion and error events are
        routed to ``handle_log_stream_completed`` and
        ``handle_log_stream_error`` respectively.
        """

        # Each callback receives the call object first; it is unused here.
        # Attributes are looked up on ``self`` at callback time, so overrides
        # and a replaced decoder are honored.
        def on_log_entries(_, log_entries_proto: log_pb2.LogEntries) -> None:
            self._decoder.parse_log_entries_proto(log_entries_proto)

        def on_stream_completed(_, status):
            return self.handle_log_stream_completed(status)

        def on_stream_error(_, error):
            return self.handle_log_stream_error(error)

        listen_method = self.rpcs.pw.log.Logs.Listen
        listen_method.open(
            on_next=on_log_entries,
            on_completed=on_stream_completed,
            on_error=on_stream_error,
        )

    def handle_log_stream_error(self, error: pw_status.Status) -> None:
        """Logs a stream error and re-opens the stream unless cancelled.

        Override this function to change default behavior.

        Args:
            error: Status reported by the failed log stream RPC.
        """
        source = self.source_name
        _LOG.error('Log stream error: %s from source %s', error, source)

        # A CANCELLED status means the client cancelled the RPC, so the
        # stream must not be requested again.
        if error == pw_status.Status.CANCELLED:
            return
        self.listen_to_logs()

    def handle_log_stream_completed(self, status: pw_status.Status) -> None:
        """Logs the completion status and re-opens the stream.

        Re-opening on completion avoids losing logs. Override this function
        to change default behavior.

        Args:
            status: Status the log stream RPC completed with.
        """
        source = self.source_name
        _LOG.debug(
            'Log stream completed with status: %s for source: %s',
            status,
            source,
        )
        self.listen_to_logs()

    @property
    def source_name(self) -> str:
        """Source name reported by the underlying decoder."""
        decoder = self._decoder
        return decoder.source_name