1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_log_rpc/rpc_log_drain.h"
16
17 #include <limits>
18 #include <mutex>
19 #include <optional>
20 #include <string_view>
21
22 #include "pw_assert/check.h"
23 #include "pw_chrono/system_clock.h"
24 #include "pw_log/proto/log.pwpb.h"
25 #include "pw_result/result.h"
26 #include "pw_rpc/raw/server_reader_writer.h"
27 #include "pw_span/span.h"
28 #include "pw_status/status.h"
29 #include "pw_status/try.h"
30
31 namespace pw::log_rpc {
32 namespace {
33
34 // Creates an encoded drop message on the provided buffer and adds it to the
35 // bulk log entries. Resets the drop count when successfull.
TryEncodeDropMessage(ByteSpan encoded_drop_message_buffer,std::string_view reason,uint32_t & drop_count,log::pwpb::LogEntries::MemoryEncoder & entries_encoder)36 void TryEncodeDropMessage(
37 ByteSpan encoded_drop_message_buffer,
38 std::string_view reason,
39 uint32_t& drop_count,
40 log::pwpb::LogEntries::MemoryEncoder& entries_encoder) {
41 // Encode drop count and reason, if any, in log proto.
42 log::pwpb::LogEntry::MemoryEncoder encoder(encoded_drop_message_buffer);
43 if (!reason.empty()) {
44 encoder.WriteMessage(as_bytes(span<const char>(reason))).IgnoreError();
45 }
46 encoder.WriteDropped(drop_count).IgnoreError();
47 if (!encoder.status().ok()) {
48 return;
49 }
50 // Add encoded drop messsage if fits in buffer.
51 ConstByteSpan drop_message(encoder);
52 if (drop_message.size() + RpcLogDrain::kLogEntriesEncodeFrameSize <
53 entries_encoder.ConservativeWriteLimit()) {
54 PW_CHECK_OK(entries_encoder.WriteBytes(
55 static_cast<uint32_t>(log::pwpb::LogEntries::Fields::kEntries),
56 drop_message));
57 drop_count = 0;
58 }
59 }
60
61 } // namespace
62
Open(rpc::RawServerWriter & writer)63 Status RpcLogDrain::Open(rpc::RawServerWriter& writer) {
64 if (!writer.active()) {
65 return Status::FailedPrecondition();
66 }
67 std::lock_guard lock(mutex_);
68 if (server_writer_.active()) {
69 return Status::AlreadyExists();
70 }
71 server_writer_ = std::move(writer);
72 if (on_open_callback_ != nullptr) {
73 on_open_callback_();
74 }
75 return OkStatus();
76 }
77
Flush(ByteSpan encoding_buffer)78 Status RpcLogDrain::Flush(ByteSpan encoding_buffer) {
79 Status status;
80 SendLogs(std::numeric_limits<size_t>::max(), encoding_buffer, status);
81 return status;
82 }
83
Trickle(ByteSpan encoding_buffer)84 std::optional<chrono::SystemClock::duration> RpcLogDrain::Trickle(
85 ByteSpan encoding_buffer) {
86 chrono::SystemClock::time_point now = chrono::SystemClock::now();
87 // Called before drain is ready to send more logs. Ignore this request and
88 // remind the caller how much longer they'll need to wait.
89 if (no_writes_until_ > now) {
90 return no_writes_until_ - now;
91 }
92
93 Status encoding_status;
94 if (SendLogs(max_bundles_per_trickle_, encoding_buffer, encoding_status) ==
95 LogDrainState::kCaughtUp) {
96 return std::nullopt;
97 }
98
99 no_writes_until_ = chrono::SystemClock::TimePointAfterAtLeast(trickle_delay_);
100 return trickle_delay_;
101 }
102
SendLogs(size_t max_num_bundles,ByteSpan encoding_buffer,Status & encoding_status_out)103 RpcLogDrain::LogDrainState RpcLogDrain::SendLogs(size_t max_num_bundles,
104 ByteSpan encoding_buffer,
105 Status& encoding_status_out) {
106 PW_CHECK_NOTNULL(multisink_);
107
108 LogDrainState log_sink_state = LogDrainState::kMoreEntriesRemaining;
109 std::lock_guard lock(mutex_);
110 size_t sent_bundle_count = 0;
111 while (sent_bundle_count < max_num_bundles &&
112 log_sink_state != LogDrainState::kCaughtUp) {
113 if (!server_writer_.active()) {
114 encoding_status_out = Status::Unavailable();
115 // No reason to keep polling this drain until the writer is opened.
116 return LogDrainState::kCaughtUp;
117 }
118 log::pwpb::LogEntries::MemoryEncoder encoder(encoding_buffer);
119 uint32_t packed_entry_count = 0;
120 log_sink_state = EncodeOutgoingPacket(encoder, packed_entry_count);
121
122 // Avoid sending empty packets.
123 if (encoder.size() == 0) {
124 continue;
125 }
126
127 encoder.WriteFirstEntrySequenceId(sequence_id_)
128 .IgnoreError(); // TODO(b/242598609): Handle Status properly
129 sequence_id_ += packed_entry_count;
130 const Status status = server_writer_.Write(encoder);
131 sent_bundle_count++;
132
133 if (!status.ok() &&
134 error_handling_ == LogDrainErrorHandling::kCloseStreamOnWriterError) {
135 // Only update this drop count when writer errors are not ignored.
136 drop_count_writer_error_ += packed_entry_count;
137 server_writer_.Finish().IgnoreError();
138 encoding_status_out = Status::Aborted();
139 return log_sink_state;
140 }
141 }
142 return log_sink_state;
143 }
144
EncodeOutgoingPacket(log::pwpb::LogEntries::MemoryEncoder & encoder,uint32_t & packed_entry_count_out)145 RpcLogDrain::LogDrainState RpcLogDrain::EncodeOutgoingPacket(
146 log::pwpb::LogEntries::MemoryEncoder& encoder,
147 uint32_t& packed_entry_count_out) {
148 const size_t total_buffer_size = encoder.ConservativeWriteLimit();
149 do {
150 // Peek entry and get drop count from multisink.
151 uint32_t drop_count = 0;
152 uint32_t ingress_drop_count = 0;
153 Result<multisink::MultiSink::Drain::PeekedEntry> possible_entry =
154 PeekEntry(log_entry_buffer_, drop_count, ingress_drop_count);
155 drop_count_ingress_error_ += ingress_drop_count;
156
157 // Check if the entry fits in the entry buffer.
158 if (possible_entry.status().IsResourceExhausted()) {
159 ++drop_count_small_stack_buffer_;
160 continue;
161 }
162
163 // Check if there are any entries left.
164 if (possible_entry.status().IsOutOfRange()) {
165 // Stash multisink's reported drop count that will be reported later with
166 // any other drop counts.
167 drop_count_slow_drain_ += drop_count;
168 return LogDrainState::kCaughtUp; // There are no more entries.
169 }
170
171 // At this point all expected errors have been handled.
172 PW_CHECK_OK(possible_entry.status());
173
174 // Check if the entry passes any set filter rules.
175 if (filter_ != nullptr &&
176 filter_->ShouldDropLog(possible_entry.value().entry())) {
177 // Add the drop count from the multisink peek, stored in `drop_count`, to
178 // the total drop count. Then drop the entry without counting it towards
179 // the total drop count. Drops will be reported later all together.
180 drop_count_slow_drain_ += drop_count;
181 PW_CHECK_OK(PopEntry(possible_entry.value()));
182 continue;
183 }
184
185 // Check if the entry fits in the encoder buffer by itself.
186 const size_t encoded_entry_size =
187 possible_entry.value().entry().size() + kLogEntriesEncodeFrameSize;
188 if (encoded_entry_size + kLogEntriesEncodeFrameSize > total_buffer_size) {
189 // Entry is larger than the entire available buffer.
190 ++drop_count_small_outbound_buffer_;
191 PW_CHECK_OK(PopEntry(possible_entry.value()));
192 continue;
193 }
194
195 // At this point, we have a valid entry that may fit in the encode buffer.
196 // Report any drop counts combined reusing the log_entry_buffer_ to encode a
197 // drop message.
198 drop_count_slow_drain_ += drop_count;
199 // Account for dropped entries too large for stack buffer, which PeekEntry()
200 // also reports.
201 drop_count_slow_drain_ -= drop_count_small_stack_buffer_;
202 bool log_entry_buffer_has_valid_entry = possible_entry.ok();
203 if (drop_count_slow_drain_ > 0) {
204 TryEncodeDropMessage(log_entry_buffer_,
205 std::string_view(kSlowDrainErrorMessage),
206 drop_count_slow_drain_,
207 encoder);
208 log_entry_buffer_has_valid_entry = false;
209 }
210 if (drop_count_ingress_error_ > 0) {
211 TryEncodeDropMessage(log_entry_buffer_,
212 std::string_view(kIngressErrorMessage),
213 drop_count_ingress_error_,
214 encoder);
215 log_entry_buffer_has_valid_entry = false;
216 }
217 if (drop_count_small_stack_buffer_ > 0) {
218 TryEncodeDropMessage(log_entry_buffer_,
219 std::string_view(kSmallStackBufferErrorMessage),
220 drop_count_small_stack_buffer_,
221 encoder);
222 log_entry_buffer_has_valid_entry = false;
223 }
224 if (drop_count_small_outbound_buffer_ > 0) {
225 TryEncodeDropMessage(log_entry_buffer_,
226 std::string_view(kSmallOutboundBufferErrorMessage),
227 drop_count_small_outbound_buffer_,
228 encoder);
229 log_entry_buffer_has_valid_entry = false;
230 }
231 if (drop_count_writer_error_ > 0) {
232 TryEncodeDropMessage(log_entry_buffer_,
233 std::string_view(kWriterErrorMessage),
234 drop_count_writer_error_,
235 encoder);
236 log_entry_buffer_has_valid_entry = false;
237 }
238 if (possible_entry.ok() && !log_entry_buffer_has_valid_entry) {
239 PW_CHECK_OK(PeekEntry(log_entry_buffer_, drop_count, ingress_drop_count)
240 .status());
241 }
242
243 // Check if the entry fits in the partially filled encoder buffer.
244 if (encoded_entry_size > encoder.ConservativeWriteLimit()) {
245 // Notify the caller there are more entries to send.
246 return LogDrainState::kMoreEntriesRemaining;
247 }
248
249 // Encode the entry and remove it from multisink.
250 PW_CHECK_OK(encoder.WriteBytes(
251 static_cast<uint32_t>(log::pwpb::LogEntries::Fields::kEntries),
252 possible_entry.value().entry()));
253 PW_CHECK_OK(PopEntry(possible_entry.value()));
254 ++packed_entry_count_out;
255 } while (true);
256 }
257
Close()258 Status RpcLogDrain::Close() {
259 std::lock_guard lock(mutex_);
260 return server_writer_.Finish();
261 }
262
263 } // namespace pw::log_rpc
264