1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_log_rpc/rpc_log_drain.h"
16 
17 #include <limits>
18 #include <mutex>
19 #include <optional>
20 #include <string_view>
21 
22 #include "pw_assert/check.h"
23 #include "pw_chrono/system_clock.h"
24 #include "pw_log/proto/log.pwpb.h"
25 #include "pw_result/result.h"
26 #include "pw_rpc/raw/server_reader_writer.h"
27 #include "pw_span/span.h"
28 #include "pw_status/status.h"
29 #include "pw_status/try.h"
30 
31 namespace pw::log_rpc {
32 namespace {
33 
34 // Creates an encoded drop message on the provided buffer and adds it to the
35 // bulk log entries. Resets the drop count when successfull.
TryEncodeDropMessage(ByteSpan encoded_drop_message_buffer,std::string_view reason,uint32_t & drop_count,log::pwpb::LogEntries::MemoryEncoder & entries_encoder)36 void TryEncodeDropMessage(
37     ByteSpan encoded_drop_message_buffer,
38     std::string_view reason,
39     uint32_t& drop_count,
40     log::pwpb::LogEntries::MemoryEncoder& entries_encoder) {
41   // Encode drop count and reason, if any, in log proto.
42   log::pwpb::LogEntry::MemoryEncoder encoder(encoded_drop_message_buffer);
43   if (!reason.empty()) {
44     encoder.WriteMessage(as_bytes(span<const char>(reason))).IgnoreError();
45   }
46   encoder.WriteDropped(drop_count).IgnoreError();
47   if (!encoder.status().ok()) {
48     return;
49   }
50   // Add encoded drop messsage if fits in buffer.
51   ConstByteSpan drop_message(encoder);
52   if (drop_message.size() + RpcLogDrain::kLogEntriesEncodeFrameSize <
53       entries_encoder.ConservativeWriteLimit()) {
54     PW_CHECK_OK(entries_encoder.WriteBytes(
55         static_cast<uint32_t>(log::pwpb::LogEntries::Fields::kEntries),
56         drop_message));
57     drop_count = 0;
58   }
59 }
60 
61 }  // namespace
62 
Open(rpc::RawServerWriter & writer)63 Status RpcLogDrain::Open(rpc::RawServerWriter& writer) {
64   if (!writer.active()) {
65     return Status::FailedPrecondition();
66   }
67   std::lock_guard lock(mutex_);
68   if (server_writer_.active()) {
69     return Status::AlreadyExists();
70   }
71   server_writer_ = std::move(writer);
72   if (on_open_callback_ != nullptr) {
73     on_open_callback_();
74   }
75   return OkStatus();
76 }
77 
Flush(ByteSpan encoding_buffer)78 Status RpcLogDrain::Flush(ByteSpan encoding_buffer) {
79   Status status;
80   SendLogs(std::numeric_limits<size_t>::max(), encoding_buffer, status);
81   return status;
82 }
83 
Trickle(ByteSpan encoding_buffer)84 std::optional<chrono::SystemClock::duration> RpcLogDrain::Trickle(
85     ByteSpan encoding_buffer) {
86   chrono::SystemClock::time_point now = chrono::SystemClock::now();
87   // Called before drain is ready to send more logs. Ignore this request and
88   // remind the caller how much longer they'll need to wait.
89   if (no_writes_until_ > now) {
90     return no_writes_until_ - now;
91   }
92 
93   Status encoding_status;
94   if (SendLogs(max_bundles_per_trickle_, encoding_buffer, encoding_status) ==
95       LogDrainState::kCaughtUp) {
96     return std::nullopt;
97   }
98 
99   no_writes_until_ = chrono::SystemClock::TimePointAfterAtLeast(trickle_delay_);
100   return trickle_delay_;
101 }
102 
SendLogs(size_t max_num_bundles,ByteSpan encoding_buffer,Status & encoding_status_out)103 RpcLogDrain::LogDrainState RpcLogDrain::SendLogs(size_t max_num_bundles,
104                                                  ByteSpan encoding_buffer,
105                                                  Status& encoding_status_out) {
106   PW_CHECK_NOTNULL(multisink_);
107 
108   LogDrainState log_sink_state = LogDrainState::kMoreEntriesRemaining;
109   std::lock_guard lock(mutex_);
110   size_t sent_bundle_count = 0;
111   while (sent_bundle_count < max_num_bundles &&
112          log_sink_state != LogDrainState::kCaughtUp) {
113     if (!server_writer_.active()) {
114       encoding_status_out = Status::Unavailable();
115       // No reason to keep polling this drain until the writer is opened.
116       return LogDrainState::kCaughtUp;
117     }
118     log::pwpb::LogEntries::MemoryEncoder encoder(encoding_buffer);
119     uint32_t packed_entry_count = 0;
120     log_sink_state = EncodeOutgoingPacket(encoder, packed_entry_count);
121 
122     // Avoid sending empty packets.
123     if (encoder.size() == 0) {
124       continue;
125     }
126 
127     encoder.WriteFirstEntrySequenceId(sequence_id_)
128         .IgnoreError();  // TODO(b/242598609): Handle Status properly
129     sequence_id_ += packed_entry_count;
130     const Status status = server_writer_.Write(encoder);
131     sent_bundle_count++;
132 
133     if (!status.ok() &&
134         error_handling_ == LogDrainErrorHandling::kCloseStreamOnWriterError) {
135       // Only update this drop count when writer errors are not ignored.
136       drop_count_writer_error_ += packed_entry_count;
137       server_writer_.Finish().IgnoreError();
138       encoding_status_out = Status::Aborted();
139       return log_sink_state;
140     }
141   }
142   return log_sink_state;
143 }
144 
EncodeOutgoingPacket(log::pwpb::LogEntries::MemoryEncoder & encoder,uint32_t & packed_entry_count_out)145 RpcLogDrain::LogDrainState RpcLogDrain::EncodeOutgoingPacket(
146     log::pwpb::LogEntries::MemoryEncoder& encoder,
147     uint32_t& packed_entry_count_out) {
148   const size_t total_buffer_size = encoder.ConservativeWriteLimit();
149   do {
150     // Peek entry and get drop count from multisink.
151     uint32_t drop_count = 0;
152     uint32_t ingress_drop_count = 0;
153     Result<multisink::MultiSink::Drain::PeekedEntry> possible_entry =
154         PeekEntry(log_entry_buffer_, drop_count, ingress_drop_count);
155     drop_count_ingress_error_ += ingress_drop_count;
156 
157     // Check if the entry fits in the entry buffer.
158     if (possible_entry.status().IsResourceExhausted()) {
159       ++drop_count_small_stack_buffer_;
160       continue;
161     }
162 
163     // Check if there are any entries left.
164     if (possible_entry.status().IsOutOfRange()) {
165       // Stash multisink's reported drop count that will be reported later with
166       // any other drop counts.
167       drop_count_slow_drain_ += drop_count;
168       return LogDrainState::kCaughtUp;  // There are no more entries.
169     }
170 
171     // At this point all expected errors have been handled.
172     PW_CHECK_OK(possible_entry.status());
173 
174     // Check if the entry passes any set filter rules.
175     if (filter_ != nullptr &&
176         filter_->ShouldDropLog(possible_entry.value().entry())) {
177       // Add the drop count from the multisink peek, stored in `drop_count`, to
178       // the total drop count. Then drop the entry without counting it towards
179       // the total drop count. Drops will be reported later all together.
180       drop_count_slow_drain_ += drop_count;
181       PW_CHECK_OK(PopEntry(possible_entry.value()));
182       continue;
183     }
184 
185     // Check if the entry fits in the encoder buffer by itself.
186     const size_t encoded_entry_size =
187         possible_entry.value().entry().size() + kLogEntriesEncodeFrameSize;
188     if (encoded_entry_size + kLogEntriesEncodeFrameSize > total_buffer_size) {
189       // Entry is larger than the entire available buffer.
190       ++drop_count_small_outbound_buffer_;
191       PW_CHECK_OK(PopEntry(possible_entry.value()));
192       continue;
193     }
194 
195     // At this point, we have a valid entry that may fit in the encode buffer.
196     // Report any drop counts combined reusing the log_entry_buffer_ to encode a
197     // drop message.
198     drop_count_slow_drain_ += drop_count;
199     // Account for dropped entries too large for stack buffer, which PeekEntry()
200     // also reports.
201     drop_count_slow_drain_ -= drop_count_small_stack_buffer_;
202     bool log_entry_buffer_has_valid_entry = possible_entry.ok();
203     if (drop_count_slow_drain_ > 0) {
204       TryEncodeDropMessage(log_entry_buffer_,
205                            std::string_view(kSlowDrainErrorMessage),
206                            drop_count_slow_drain_,
207                            encoder);
208       log_entry_buffer_has_valid_entry = false;
209     }
210     if (drop_count_ingress_error_ > 0) {
211       TryEncodeDropMessage(log_entry_buffer_,
212                            std::string_view(kIngressErrorMessage),
213                            drop_count_ingress_error_,
214                            encoder);
215       log_entry_buffer_has_valid_entry = false;
216     }
217     if (drop_count_small_stack_buffer_ > 0) {
218       TryEncodeDropMessage(log_entry_buffer_,
219                            std::string_view(kSmallStackBufferErrorMessage),
220                            drop_count_small_stack_buffer_,
221                            encoder);
222       log_entry_buffer_has_valid_entry = false;
223     }
224     if (drop_count_small_outbound_buffer_ > 0) {
225       TryEncodeDropMessage(log_entry_buffer_,
226                            std::string_view(kSmallOutboundBufferErrorMessage),
227                            drop_count_small_outbound_buffer_,
228                            encoder);
229       log_entry_buffer_has_valid_entry = false;
230     }
231     if (drop_count_writer_error_ > 0) {
232       TryEncodeDropMessage(log_entry_buffer_,
233                            std::string_view(kWriterErrorMessage),
234                            drop_count_writer_error_,
235                            encoder);
236       log_entry_buffer_has_valid_entry = false;
237     }
238     if (possible_entry.ok() && !log_entry_buffer_has_valid_entry) {
239       PW_CHECK_OK(PeekEntry(log_entry_buffer_, drop_count, ingress_drop_count)
240                       .status());
241     }
242 
243     // Check if the entry fits in the partially filled encoder buffer.
244     if (encoded_entry_size > encoder.ConservativeWriteLimit()) {
245       // Notify the caller there are more entries to send.
246       return LogDrainState::kMoreEntriesRemaining;
247     }
248 
249     // Encode the entry and remove it from multisink.
250     PW_CHECK_OK(encoder.WriteBytes(
251         static_cast<uint32_t>(log::pwpb::LogEntries::Fields::kEntries),
252         possible_entry.value().entry()));
253     PW_CHECK_OK(PopEntry(possible_entry.value()));
254     ++packed_entry_count_out;
255   } while (true);
256 }
257 
Close()258 Status RpcLogDrain::Close() {
259   std::lock_guard lock(mutex_);
260   return server_writer_.Finish();
261 }
262 
263 }  // namespace pw::log_rpc
264