# Copyright 2021 The Pigweed Authors
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
"""Device classes to interact with targets via RPC."""

import datetime
import logging
from pathlib import Path
from types import ModuleType
from typing import Any, Callable, List, Union, Optional

from pw_hdlc.rpc import HdlcRpcClient, default_channels
import pw_log_tokenized

from pw_log.proto import log_pb2
from pw_rpc import callback_client, console_tools
from pw_status import Status
from pw_tokenizer.detokenize import Detokenizer
from pw_tokenizer.proto import decode_optionally_tokenized

# Internal log for troubleshooting this tool (the console).
_LOG = logging.getLogger('tools')
DEFAULT_DEVICE_LOGGER = logging.getLogger('rpc_device')


class Device:
    """Represents an RPC Client for a device running a Pigweed target.

    The target must have RPC support and RPC logging.
    Note: use this class as a base for specialized device representations.
    """
    def __init__(self,
                 channel_id: int,
                 read,
                 write,
                 proto_library: List[Union[ModuleType, Path]],
                 detokenizer: Optional[Detokenizer],
                 timestamp_decoder: Optional[Callable[[int], str]],
                 rpc_timeout_s: float = 5):
        """Creates the RPC client and starts listening to the log stream.

        Args:
          channel_id: Stored on the instance as ``channel_id``.
          read: Passed to ``HdlcRpcClient`` as its read function.
          write: Wrapped with ``default_channels`` to build the RPC channels.
          proto_library: Proto modules or .proto paths given to the client.
          detokenizer: Used to decode tokenized log messages; when None, log
            messages are decoded as UTF-8 text.
          timestamp_decoder: Optional replacement for the default timestamp
            formatting done by ``decode_timestamp``.
          rpc_timeout_s: Default timeout, in seconds, for unary RPCs. Streaming
            RPCs are given no default timeout.
        """
        self.channel_id = channel_id
        self.protos = proto_library
        self.detokenizer = detokenizer

        self.logger = DEFAULT_DEVICE_LOGGER
        self.logger.setLevel(logging.DEBUG)  # Allow all device logs through.
        self.timestamp_decoder = timestamp_decoder
        self._expected_log_sequence_id = 0

        callback_client_impl = callback_client.Impl(
            default_unary_timeout_s=rpc_timeout_s,
            default_stream_timeout_s=None,
        )
        self.client = HdlcRpcClient(
            read,
            self.protos,
            default_channels(write),
            lambda data: self.logger.info("%s", str(data)),
            client_impl=callback_client_impl)

        # Start listening to logs as soon as possible.
        self.listen_to_log_stream()

    def info(self) -> console_tools.ClientInfo:
        """Returns a ClientInfo named 'device' for this device's RPC client."""
        return console_tools.ClientInfo('device', self.rpcs,
                                        self.client.client)

    @property
    def rpcs(self) -> Any:
        """Returns an object for accessing services on the first channel."""
        # NOTE(review): this takes the first channel of the client rather than
        # looking up self.channel_id -- confirm that is intended.
        return next(iter(self.client.client.channels())).rpcs

    def listen_to_log_stream(self) -> None:
        """Opens a log RPC for the device's unrequested log stream.

        The RPCs remain open until the server cancels or closes them, either
        with a response or error packet.
        """
        self.rpcs.pw.log.Logs.Listen.open(
            on_next=lambda _, log_entries_proto: self.
            _log_entries_proto_parser(log_entries_proto),
            on_completed=lambda _, status: _LOG.info(
                'Log stream completed with status: %s', status),
            on_error=lambda _, error: self._handle_log_stream_error(error))

    def _handle_log_stream_error(self, error: Status) -> None:
        """Resets the log stream RPC on error to avoid losing logs."""
        _LOG.error('Log stream error: %s', error)

        # Only re-request logs if the RPC was not cancelled by the client.
        if error != Status.CANCELLED:
            self.listen_to_log_stream()

    def _handle_log_drop_count(self, drop_count: int, reason: str) -> None:
        """Emits a WARNING device log reporting how many logs were dropped."""
        log_text = 'log' if drop_count == 1 else 'logs'
        message = f'Dropped {drop_count} {log_text} due to {reason}'
        self._emit_device_log(logging.WARNING, '', '', '', message)

    def _check_for_dropped_logs(
            self, log_entries_proto: log_pb2.LogEntries) -> None:
        """Detects gaps in the log sequence IDs and reports them as drops.

        Compares the bundle's first sequence ID with the one expected after
        the previous bundle, then advances the expectation past this bundle.
        """
        # Count log messages received that don't use the dropped field.
        messages_received = sum(1 for log_proto in log_entries_proto.entries
                                if not log_proto.dropped)
        dropped_log_count = (log_entries_proto.first_entry_sequence_id -
                             self._expected_log_sequence_id)
        self._expected_log_sequence_id = (
            log_entries_proto.first_entry_sequence_id + messages_received)
        if dropped_log_count > 0:
            self._handle_log_drop_count(dropped_log_count, 'loss at transport')
        elif dropped_log_count < 0:
            _LOG.error('Log sequence ID is smaller than expected')

    def _log_entries_proto_parser(
            self, log_entries_proto: log_pb2.LogEntries) -> None:
        """Emits a device log for every entry in a received log bundle.

        Entries with a drop count are reported as warnings; all other entries
        are decoded (detokenized when a detokenizer is set) and emitted with
        their level, timestamp, module and metadata fields.
        """
        self._check_for_dropped_logs(log_entries_proto)
        for log_proto in log_entries_proto.entries:
            # Handle dropped count first: these entries are reported as a drop
            # warning only, so decoding their timestamp or message as a regular
            # log line would be wasted work.
            if log_proto.dropped:
                # Decode leniently so an unexpected byte sequence cannot raise
                # inside the RPC callback.
                drop_reason = (log_proto.message.decode(
                    'utf-8', errors='backslashreplace').lower()
                               if log_proto.message else
                               'enqueue failure on device')
                self._handle_log_drop_count(log_proto.dropped, drop_reason)
                continue

            decoded_timestamp = self.decode_timestamp(log_proto.timestamp)
            # Parse level and convert to logging module level number.
            level = (log_proto.line_level & 0x7) * 10
            if self.detokenizer:
                message = str(
                    decode_optionally_tokenized(self.detokenizer,
                                                log_proto.message))
            else:
                # Without a detokenizer the payload may not be valid UTF-8
                # (e.g. binary tokenized data); escape undecodable bytes
                # instead of raising UnicodeDecodeError.
                message = log_proto.message.decode('utf-8',
                                                   errors='backslashreplace')
            log = pw_log_tokenized.FormatStringWithMetadata(message)

            self._emit_device_log(level, '', decoded_timestamp, log.module,
                                  log.message, **dict(log.fields))

    def _emit_device_log(self, level: int, source_name: str, timestamp: str,
                         module_name: str, message: str,
                         **metadata_fields) -> None:
        """Logs one device log line on ``self.logger``.

        The metadata fields, together with the source name, timestamp, message
        and module, are attached to the record as ``extra_metadata_fields``.
        """
        # Fields used for console table view
        fields = {
            **metadata_fields,
            'source_name': source_name,
            'timestamp': timestamp,
            'msg': message,
            'module': module_name,
        }

        # Format used for file or stdout logging.
        self.logger.log(level,
                        '[%s] %s %s%s',
                        source_name,
                        timestamp,
                        f'{module_name} '.lstrip(),
                        message,
                        extra=dict(extra_metadata_fields=fields))

    def decode_timestamp(self, timestamp: int) -> str:
        """Decodes timestamp to a human-readable value.

        Defaults to interpreting the input timestamp as nanoseconds since boot.
        Devices can override this to match their timestamp units.

        Returns:
          The result of ``timestamp_decoder`` when one was provided; otherwise
          the elapsed time formatted with millisecond precision, e.g.
          ``0:00:01.500``.
        """
        if self.timestamp_decoder:
            return self.timestamp_decoder(timestamp)

        elapsed = datetime.timedelta(seconds=timestamp / 1e9)
        text = str(elapsed)
        # str(timedelta) leaves out the fractional part when the microseconds
        # are zero; pad it so trimming to milliseconds below never cuts into
        # the seconds field (e.g. "0:00:01" must not become "0:00").
        if not elapsed.microseconds:
            text += '.000000'
        return text[:-3]