1 // Copyright 2021 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 15 #pragma once 16 17 #include <algorithm> 18 #include <array> 19 #include <cstdint> 20 #include <limits> 21 #include <optional> 22 #include <string_view> 23 24 #include "pw_assert/assert.h" 25 #include "pw_bytes/span.h" 26 #include "pw_chrono/system_clock.h" 27 #include "pw_function/function.h" 28 #include "pw_log/proto/log.pwpb.h" 29 #include "pw_log_rpc/internal/config.h" 30 #include "pw_log_rpc/log_filter.h" 31 #include "pw_multisink/multisink.h" 32 #include "pw_protobuf/serialized_size.h" 33 #include "pw_result/result.h" 34 #include "pw_rpc/raw/server_reader_writer.h" 35 #include "pw_status/status.h" 36 #include "pw_sync/lock_annotations.h" 37 #include "pw_sync/mutex.h" 38 39 namespace pw::log_rpc { 40 41 // RpcLogDrain matches a MultiSink::Drain with with an RPC channel's writer. A 42 // RPC channel ID identifies this drain. The user must attach this drain 43 // to a MultiSink that returns a log::LogEntry, and provide a buffer large 44 // enough to hold the largest log::LogEntry transmittable. The user must call 45 // Flush(), which, on every call, packs as many log::LogEntry items as possible 46 // into a log::LogEntries message, writes the message to the provided writer, 47 // then repeats the process until there are no more entries in the MultiSink or 48 // the writer failed to write the outgoing package and error_handling is set to 49 // `kCloseStreamOnWriterError`. When error_handling is `kIgnoreWriterErrors` the 50 // drain will continue to retrieve log entries out of the MultiSink and attempt 51 // to send them out ignoring the writer errors without sending a drop count. 52 // Note: the error handling and drop count reporting might change in the future. 53 // Log filtering is done using the rules of the Filter provided if any. 54 class RpcLogDrain : public multisink::MultiSink::Drain { 55 public: 56 // Dictates how to handle server writer errors. 57 enum class LogDrainErrorHandling { 58 kIgnoreWriterErrors, 59 kCloseStreamOnWriterError, 60 }; 61 62 // The minimum buffer size, without the message payload or module sizes, 63 // needed to retrieve a log::LogEntry from the attached MultiSink. The user 64 // must account for the max message size to avoid log entry drops. The dropped 65 // field is not accounted since a dropped message has all other fields unset. 66 static constexpr size_t kMinEntrySizeWithoutPayload = 67 protobuf::SizeOfFieldBytes(log::LogEntry::Fields::MESSAGE, 0) + 68 protobuf::SizeOfFieldUint32(log::LogEntry::Fields::LINE_LEVEL) + 69 protobuf::SizeOfFieldUint32(log::LogEntry::Fields::FLAGS) + 70 protobuf::SizeOfFieldInt64(log::LogEntry::Fields::TIMESTAMP) + 71 protobuf::SizeOfFieldBytes(log::LogEntry::Fields::MODULE, 0) + 72 protobuf::SizeOfFieldBytes(log::LogEntry::Fields::FILE, 0) + 73 protobuf::SizeOfFieldBytes(log::LogEntry::Fields::THREAD, 0); 74 75 // Error messages sent when logs are dropped. 76 static constexpr std::string_view kIngressErrorMessage{ 77 PW_LOG_RPC_INGRESS_ERROR_MSG}; 78 static constexpr std::string_view kSlowDrainErrorMessage{ 79 PW_LOG_RPC_SLOW_DRAIN_MSG}; 80 static constexpr std::string_view kSmallOutboundBufferErrorMessage{ 81 PW_LOG_RPC_SMALL_OUTBOUND_BUFFER_MSG}; 82 static constexpr std::string_view kSmallStackBufferErrorMessage{ 83 PW_LOG_RPC_SMALL_STACK_BUFFER_MSG}; 84 static constexpr std::string_view kWriterErrorMessage{ 85 PW_LOG_RPC_WRITER_ERROR_MSG}; 86 // The smallest entry buffer must fit the largest error message, or a typical 87 // token size (4B), whichever is largest. 88 static constexpr size_t kLargestErrorMessageOrTokenSize = 89 std::max({size_t(4), 90 kIngressErrorMessage.size(), 91 kSlowDrainErrorMessage.size(), 92 kSmallOutboundBufferErrorMessage.size(), 93 kSmallStackBufferErrorMessage.size(), 94 kWriterErrorMessage.size()}); 95 static constexpr size_t kMinEntryBufferSize = 96 kMinEntrySizeWithoutPayload + sizeof(kLargestErrorMessageOrTokenSize); 97 98 // When encoding LogEntry in LogEntries, there are kLogEntriesEncodeFrameSize 99 // bytes added to the encoded LogEntry. This constant and kMinEntryBufferSize 100 // can be used to calculate the minimum RPC ChannelOutput buffer size. 101 static constexpr size_t kLogEntriesEncodeFrameSize = 102 protobuf::FieldNumberSizeBytes(log::LogEntries::Fields::ENTRIES) + 103 protobuf::kMaxSizeOfLength + 104 protobuf::SizeOfFieldUint32( 105 log::LogEntries::Fields::FIRST_ENTRY_SEQUENCE_ID); 106 107 // Creates a closed log stream with a writer that can be set at a later time. 108 // The provided buffer must be large enough to hold the largest transmittable 109 // log::LogEntry or a drop count message at the very least. The user can 110 // choose to provide a unique mutex for the drain, or share it to save RAM as 111 // long as they are aware of contengency issues. 112 RpcLogDrain( 113 const uint32_t channel_id, 114 ByteSpan log_entry_buffer, 115 sync::Mutex& mutex, 116 LogDrainErrorHandling error_handling, 117 Filter* filter = nullptr, 118 size_t max_bundles_per_trickle = std::numeric_limits<size_t>::max(), 119 pw::chrono::SystemClock::duration trickle_delay = 120 chrono::SystemClock::duration::zero()) channel_id_(channel_id)121 : channel_id_(channel_id), 122 error_handling_(error_handling), 123 server_writer_(), 124 log_entry_buffer_(log_entry_buffer), 125 drop_count_ingress_error_(0), 126 drop_count_slow_drain_(0), 127 drop_count_small_outbound_buffer_(0), 128 drop_count_small_stack_buffer_(0), 129 drop_count_writer_error_(0), 130 mutex_(mutex), 131 filter_(filter), 132 sequence_id_(0), 133 max_bundles_per_trickle_(max_bundles_per_trickle), 134 trickle_delay_(trickle_delay), 135 no_writes_until_(chrono::SystemClock::now()), 136 on_open_callback_(nullptr) { 137 PW_ASSERT(log_entry_buffer.size_bytes() >= kMinEntryBufferSize); 138 } 139 140 // Not copyable. 141 RpcLogDrain(const RpcLogDrain&) = delete; 142 RpcLogDrain& operator=(const RpcLogDrain&) = delete; 143 144 // Configures the drain with a new open server writer if the current one is 145 // not open. 146 // 147 // Return values: 148 // OK - Successfully set the new open writer. 149 // FAILED_PRECONDITION - The given writer is not open. 150 // ALREADY_EXISTS - an open writer is already set. 151 Status Open(rpc::RawServerWriter& writer) PW_LOCKS_EXCLUDED(mutex_); 152 153 // Accesses log entries and sends them via the writer. Expected to be called 154 // frequently to avoid log drops. If the writer fails to send a packet with 155 // multiple log entries, the entries are dropped and a drop message with the 156 // count is sent. When error_handling is kCloseStreamOnWriterError, the stream 157 // will automatically be closed and Flush will return the writer error. 158 // 159 // Precondition: the drain must be attached to a MultiSink. 160 // 161 // Return values: 162 // OK - all entries were consumed. 163 // ABORTED - there was an error writing the packet, and error_handling equals 164 // `kCloseStreamOnWriterError`. 165 Status Flush(ByteSpan encoding_buffer) PW_LOCKS_EXCLUDED(mutex_); 166 167 // Writes entries as dictated by this drain's rate limiting configuration. 168 // 169 // Returns: 170 // A minimum wait duration before Trickle() will be ready to write more logs 171 // If no duration is returned, this drain is caught up. 172 std::optional<pw::chrono::SystemClock::duration> Trickle( 173 ByteSpan encoding_buffer) PW_LOCKS_EXCLUDED(mutex_); 174 175 // Ends RPC log stream without flushing. 176 // 177 // Return values: 178 // OK - successfully closed the server writer. 179 // FAILED_PRECONDITION - The given writer is not open. 180 // Errors from the underlying writer send packet. 181 Status Close() PW_LOCKS_EXCLUDED(mutex_); 182 channel_id()183 uint32_t channel_id() const { return channel_id_; } 184 max_bundles_per_trickle()185 size_t max_bundles_per_trickle() const { return max_bundles_per_trickle_; } set_max_bundles_per_trickle(size_t max_num_entries)186 void set_max_bundles_per_trickle(size_t max_num_entries) { 187 max_bundles_per_trickle_ = max_num_entries; 188 } 189 trickle_delay()190 chrono::SystemClock::duration trickle_delay() const { return trickle_delay_; } set_trickle_delay(chrono::SystemClock::duration trickle_delay)191 void set_trickle_delay(chrono::SystemClock::duration trickle_delay) { 192 trickle_delay_ = trickle_delay; 193 } 194 195 // Stores a function that is called when Open() is successful. Pass nulltpr to 196 // clear it. This is useful in cases where the owner of the drain needs to be 197 // notified that the drain was opened. set_on_open_callback(pw::Function<void ()> && callback)198 void set_on_open_callback(pw::Function<void()>&& callback) { 199 on_open_callback_ = std::move(callback); 200 } 201 202 private: 203 enum class LogDrainState { 204 kCaughtUp, 205 kMoreEntriesRemaining, 206 }; 207 208 LogDrainState SendLogs(size_t max_num_bundles, 209 ByteSpan encoding_buffer, 210 Status& encoding_status) PW_LOCKS_EXCLUDED(mutex_); 211 212 // Fills the outgoing buffer with as many entries as possible. 213 LogDrainState EncodeOutgoingPacket(log::LogEntries::MemoryEncoder& encoder, 214 uint32_t& packed_entry_count_out) 215 PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_); 216 217 const uint32_t channel_id_; 218 const LogDrainErrorHandling error_handling_; 219 rpc::RawServerWriter server_writer_ PW_GUARDED_BY(mutex_); 220 const ByteSpan log_entry_buffer_ PW_GUARDED_BY(mutex_); 221 uint32_t drop_count_ingress_error_ PW_GUARDED_BY(mutex_); 222 uint32_t drop_count_slow_drain_ PW_GUARDED_BY(mutex_); 223 uint32_t drop_count_small_outbound_buffer_ PW_GUARDED_BY(mutex_); 224 uint32_t drop_count_small_stack_buffer_ PW_GUARDED_BY(mutex_); 225 uint32_t drop_count_writer_error_ PW_GUARDED_BY(mutex_); 226 sync::Mutex& mutex_; 227 Filter* filter_; 228 uint32_t sequence_id_; 229 size_t max_bundles_per_trickle_; 230 pw::chrono::SystemClock::duration trickle_delay_; 231 pw::chrono::SystemClock::time_point no_writes_until_; 232 pw::Function<void()> on_open_callback_; 233 }; 234 235 } // namespace pw::log_rpc 236