1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_log_rpc/rpc_log_drain.h"
16
17 #include <limits>
18 #include <mutex>
19 #include <optional>
20 #include <span>
21 #include <string_view>
22
23 #include "pw_assert/check.h"
24 #include "pw_chrono/system_clock.h"
25 #include "pw_log/proto/log.pwpb.h"
26 #include "pw_result/result.h"
27 #include "pw_rpc/raw/server_reader_writer.h"
28 #include "pw_status/status.h"
29 #include "pw_status/try.h"
30
31 namespace pw::log_rpc {
32 namespace {
33
34 // Creates an encoded drop message on the provided buffer and adds it to the
35 // bulk log entries. Resets the drop count when successfull.
TryEncodeDropMessage(ByteSpan encoded_drop_message_buffer,std::string_view reason,uint32_t & drop_count,log::LogEntries::MemoryEncoder & entries_encoder)36 void TryEncodeDropMessage(ByteSpan encoded_drop_message_buffer,
37 std::string_view reason,
38 uint32_t& drop_count,
39 log::LogEntries::MemoryEncoder& entries_encoder) {
40 // Encode drop count and reason, if any, in log proto.
41 log::LogEntry::MemoryEncoder encoder(encoded_drop_message_buffer);
42 if (!reason.empty()) {
43 encoder.WriteMessage(std::as_bytes(std::span(reason))).IgnoreError();
44 }
45 encoder.WriteDropped(drop_count).IgnoreError();
46 if (!encoder.status().ok()) {
47 return;
48 }
49 // Add encoded drop messsage if fits in buffer.
50 ConstByteSpan drop_message(encoder);
51 if (drop_message.size() + RpcLogDrain::kLogEntriesEncodeFrameSize <
52 entries_encoder.ConservativeWriteLimit()) {
53 PW_CHECK_OK(entries_encoder.WriteBytes(
54 static_cast<uint32_t>(log::LogEntries::Fields::ENTRIES), drop_message));
55 drop_count = 0;
56 }
57 }
58
59 } // namespace
60
Open(rpc::RawServerWriter & writer)61 Status RpcLogDrain::Open(rpc::RawServerWriter& writer) {
62 if (!writer.active()) {
63 return Status::FailedPrecondition();
64 }
65 std::lock_guard lock(mutex_);
66 if (server_writer_.active()) {
67 return Status::AlreadyExists();
68 }
69 server_writer_ = std::move(writer);
70 if (on_open_callback_ != nullptr) {
71 on_open_callback_();
72 }
73 return OkStatus();
74 }
75
Flush(ByteSpan encoding_buffer)76 Status RpcLogDrain::Flush(ByteSpan encoding_buffer) {
77 Status status;
78 SendLogs(std::numeric_limits<size_t>::max(), encoding_buffer, status);
79 return status;
80 }
81
Trickle(ByteSpan encoding_buffer)82 std::optional<chrono::SystemClock::duration> RpcLogDrain::Trickle(
83 ByteSpan encoding_buffer) {
84 chrono::SystemClock::time_point now = chrono::SystemClock::now();
85 // Called before drain is ready to send more logs. Ignore this request and
86 // remind the caller how much longer they'll need to wait.
87 if (no_writes_until_ > now) {
88 return no_writes_until_ - now;
89 }
90
91 Status encoding_status;
92 if (SendLogs(max_bundles_per_trickle_, encoding_buffer, encoding_status) ==
93 LogDrainState::kCaughtUp) {
94 return std::nullopt;
95 }
96
97 no_writes_until_ = chrono::SystemClock::TimePointAfterAtLeast(trickle_delay_);
98 return trickle_delay_;
99 }
100
SendLogs(size_t max_num_bundles,ByteSpan encoding_buffer,Status & encoding_status_out)101 RpcLogDrain::LogDrainState RpcLogDrain::SendLogs(size_t max_num_bundles,
102 ByteSpan encoding_buffer,
103 Status& encoding_status_out) {
104 PW_CHECK_NOTNULL(multisink_);
105
106 LogDrainState log_sink_state = LogDrainState::kMoreEntriesRemaining;
107 std::lock_guard lock(mutex_);
108 size_t sent_bundle_count = 0;
109 while (sent_bundle_count < max_num_bundles &&
110 log_sink_state != LogDrainState::kCaughtUp) {
111 if (!server_writer_.active()) {
112 encoding_status_out = Status::Unavailable();
113 // No reason to keep polling this drain until the writer is opened.
114 return LogDrainState::kCaughtUp;
115 }
116 log::LogEntries::MemoryEncoder encoder(encoding_buffer);
117 uint32_t packed_entry_count = 0;
118 log_sink_state = EncodeOutgoingPacket(encoder, packed_entry_count);
119
120 // Avoid sending empty packets.
121 if (encoder.size() == 0) {
122 continue;
123 }
124
125 encoder.WriteFirstEntrySequenceId(sequence_id_)
126 .IgnoreError(); // TODO(pwbug/387): Handle Status properly
127 sequence_id_ += packed_entry_count;
128 const Status status = server_writer_.Write(encoder);
129 sent_bundle_count++;
130
131 if (!status.ok() &&
132 error_handling_ == LogDrainErrorHandling::kCloseStreamOnWriterError) {
133 // Only update this drop count when writer errors are not ignored.
134 drop_count_writer_error_ += packed_entry_count;
135 server_writer_.Finish().IgnoreError();
136 encoding_status_out = Status::Aborted();
137 return log_sink_state;
138 }
139 }
140 return log_sink_state;
141 }
142
EncodeOutgoingPacket(log::LogEntries::MemoryEncoder & encoder,uint32_t & packed_entry_count_out)143 RpcLogDrain::LogDrainState RpcLogDrain::EncodeOutgoingPacket(
144 log::LogEntries::MemoryEncoder& encoder, uint32_t& packed_entry_count_out) {
145 const size_t total_buffer_size = encoder.ConservativeWriteLimit();
146 do {
147 // Peek entry and get drop count from multisink.
148 uint32_t drop_count = 0;
149 uint32_t ingress_drop_count = 0;
150 Result<multisink::MultiSink::Drain::PeekedEntry> possible_entry =
151 PeekEntry(log_entry_buffer_, drop_count, ingress_drop_count);
152 drop_count_ingress_error_ += ingress_drop_count;
153
154 // Check if the entry fits in the entry buffer.
155 if (possible_entry.status().IsResourceExhausted()) {
156 ++drop_count_small_stack_buffer_;
157 continue;
158 }
159
160 // Check if there are any entries left.
161 if (possible_entry.status().IsOutOfRange()) {
162 // Stash multisink's reported drop count that will be reported later with
163 // any other drop counts.
164 drop_count_slow_drain_ += drop_count;
165 return LogDrainState::kCaughtUp; // There are no more entries.
166 }
167
168 // At this point all expected errors have been handled.
169 PW_CHECK_OK(possible_entry.status());
170
171 // Check if the entry passes any set filter rules.
172 if (filter_ != nullptr &&
173 filter_->ShouldDropLog(possible_entry.value().entry())) {
174 // Add the drop count from the multisink peek, stored in `drop_count`, to
175 // the total drop count. Then drop the entry without counting it towards
176 // the total drop count. Drops will be reported later all together.
177 drop_count_slow_drain_ += drop_count;
178 PW_CHECK_OK(PopEntry(possible_entry.value()));
179 continue;
180 }
181
182 // Check if the entry fits in the encoder buffer by itself.
183 const size_t encoded_entry_size =
184 possible_entry.value().entry().size() + kLogEntriesEncodeFrameSize;
185 if (encoded_entry_size + kLogEntriesEncodeFrameSize > total_buffer_size) {
186 // Entry is larger than the entire available buffer.
187 ++drop_count_small_outbound_buffer_;
188 PW_CHECK_OK(PopEntry(possible_entry.value()));
189 continue;
190 }
191
192 // At this point, we have a valid entry that may fit in the encode buffer.
193 // Report any drop counts combined reusing the log_entry_buffer_ to encode a
194 // drop message.
195 drop_count_slow_drain_ += drop_count;
196 // Account for dropped entries too large for stack buffer, which PeekEntry()
197 // also reports.
198 drop_count_slow_drain_ -= drop_count_small_stack_buffer_;
199 bool log_entry_buffer_has_valid_entry = possible_entry.ok();
200 if (drop_count_slow_drain_ > 0) {
201 TryEncodeDropMessage(log_entry_buffer_,
202 std::string_view(kSlowDrainErrorMessage),
203 drop_count_slow_drain_,
204 encoder);
205 log_entry_buffer_has_valid_entry = false;
206 }
207 if (drop_count_ingress_error_ > 0) {
208 TryEncodeDropMessage(log_entry_buffer_,
209 std::string_view(kIngressErrorMessage),
210 drop_count_ingress_error_,
211 encoder);
212 log_entry_buffer_has_valid_entry = false;
213 }
214 if (drop_count_small_stack_buffer_ > 0) {
215 TryEncodeDropMessage(log_entry_buffer_,
216 std::string_view(kSmallStackBufferErrorMessage),
217 drop_count_small_stack_buffer_,
218 encoder);
219 log_entry_buffer_has_valid_entry = false;
220 }
221 if (drop_count_small_outbound_buffer_ > 0) {
222 TryEncodeDropMessage(log_entry_buffer_,
223 std::string_view(kSmallOutboundBufferErrorMessage),
224 drop_count_small_outbound_buffer_,
225 encoder);
226 log_entry_buffer_has_valid_entry = false;
227 }
228 if (drop_count_writer_error_ > 0) {
229 TryEncodeDropMessage(log_entry_buffer_,
230 std::string_view(kWriterErrorMessage),
231 drop_count_writer_error_,
232 encoder);
233 log_entry_buffer_has_valid_entry = false;
234 }
235 if (possible_entry.ok() && !log_entry_buffer_has_valid_entry) {
236 PW_CHECK_OK(PeekEntry(log_entry_buffer_, drop_count, ingress_drop_count)
237 .status());
238 }
239
240 // Check if the entry fits in the partially filled encoder buffer.
241 if (encoded_entry_size > encoder.ConservativeWriteLimit()) {
242 // Notify the caller there are more entries to send.
243 return LogDrainState::kMoreEntriesRemaining;
244 }
245
246 // Encode the entry and remove it from multisink.
247 PW_CHECK_OK(encoder.WriteBytes(
248 static_cast<uint32_t>(log::LogEntries::Fields::ENTRIES),
249 possible_entry.value().entry()));
250 PW_CHECK_OK(PopEntry(possible_entry.value()));
251 ++packed_entry_count_out;
252 } while (true);
253 }
254
Close()255 Status RpcLogDrain::Close() {
256 std::lock_guard lock(mutex_);
257 return server_writer_.Finish();
258 }
259
260 } // namespace pw::log_rpc
261