• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_log_rpc/rpc_log_drain.h"
16 
17 #include <limits>
18 #include <mutex>
19 #include <optional>
20 #include <span>
21 #include <string_view>
22 
23 #include "pw_assert/check.h"
24 #include "pw_chrono/system_clock.h"
25 #include "pw_log/proto/log.pwpb.h"
26 #include "pw_result/result.h"
27 #include "pw_rpc/raw/server_reader_writer.h"
28 #include "pw_status/status.h"
29 #include "pw_status/try.h"
30 
31 namespace pw::log_rpc {
32 namespace {
33 
34 // Creates an encoded drop message on the provided buffer and adds it to the
35 // bulk log entries. Resets the drop count when successfull.
TryEncodeDropMessage(ByteSpan encoded_drop_message_buffer,std::string_view reason,uint32_t & drop_count,log::LogEntries::MemoryEncoder & entries_encoder)36 void TryEncodeDropMessage(ByteSpan encoded_drop_message_buffer,
37                           std::string_view reason,
38                           uint32_t& drop_count,
39                           log::LogEntries::MemoryEncoder& entries_encoder) {
40   // Encode drop count and reason, if any, in log proto.
41   log::LogEntry::MemoryEncoder encoder(encoded_drop_message_buffer);
42   if (!reason.empty()) {
43     encoder.WriteMessage(std::as_bytes(std::span(reason))).IgnoreError();
44   }
45   encoder.WriteDropped(drop_count).IgnoreError();
46   if (!encoder.status().ok()) {
47     return;
48   }
49   // Add encoded drop messsage if fits in buffer.
50   ConstByteSpan drop_message(encoder);
51   if (drop_message.size() + RpcLogDrain::kLogEntriesEncodeFrameSize <
52       entries_encoder.ConservativeWriteLimit()) {
53     PW_CHECK_OK(entries_encoder.WriteBytes(
54         static_cast<uint32_t>(log::LogEntries::Fields::ENTRIES), drop_message));
55     drop_count = 0;
56   }
57 }
58 
59 }  // namespace
60 
Open(rpc::RawServerWriter & writer)61 Status RpcLogDrain::Open(rpc::RawServerWriter& writer) {
62   if (!writer.active()) {
63     return Status::FailedPrecondition();
64   }
65   std::lock_guard lock(mutex_);
66   if (server_writer_.active()) {
67     return Status::AlreadyExists();
68   }
69   server_writer_ = std::move(writer);
70   if (on_open_callback_ != nullptr) {
71     on_open_callback_();
72   }
73   return OkStatus();
74 }
75 
Flush(ByteSpan encoding_buffer)76 Status RpcLogDrain::Flush(ByteSpan encoding_buffer) {
77   Status status;
78   SendLogs(std::numeric_limits<size_t>::max(), encoding_buffer, status);
79   return status;
80 }
81 
Trickle(ByteSpan encoding_buffer)82 std::optional<chrono::SystemClock::duration> RpcLogDrain::Trickle(
83     ByteSpan encoding_buffer) {
84   chrono::SystemClock::time_point now = chrono::SystemClock::now();
85   // Called before drain is ready to send more logs. Ignore this request and
86   // remind the caller how much longer they'll need to wait.
87   if (no_writes_until_ > now) {
88     return no_writes_until_ - now;
89   }
90 
91   Status encoding_status;
92   if (SendLogs(max_bundles_per_trickle_, encoding_buffer, encoding_status) ==
93       LogDrainState::kCaughtUp) {
94     return std::nullopt;
95   }
96 
97   no_writes_until_ = chrono::SystemClock::TimePointAfterAtLeast(trickle_delay_);
98   return trickle_delay_;
99 }
100 
SendLogs(size_t max_num_bundles,ByteSpan encoding_buffer,Status & encoding_status_out)101 RpcLogDrain::LogDrainState RpcLogDrain::SendLogs(size_t max_num_bundles,
102                                                  ByteSpan encoding_buffer,
103                                                  Status& encoding_status_out) {
104   PW_CHECK_NOTNULL(multisink_);
105 
106   LogDrainState log_sink_state = LogDrainState::kMoreEntriesRemaining;
107   std::lock_guard lock(mutex_);
108   size_t sent_bundle_count = 0;
109   while (sent_bundle_count < max_num_bundles &&
110          log_sink_state != LogDrainState::kCaughtUp) {
111     if (!server_writer_.active()) {
112       encoding_status_out = Status::Unavailable();
113       // No reason to keep polling this drain until the writer is opened.
114       return LogDrainState::kCaughtUp;
115     }
116     log::LogEntries::MemoryEncoder encoder(encoding_buffer);
117     uint32_t packed_entry_count = 0;
118     log_sink_state = EncodeOutgoingPacket(encoder, packed_entry_count);
119 
120     // Avoid sending empty packets.
121     if (encoder.size() == 0) {
122       continue;
123     }
124 
125     encoder.WriteFirstEntrySequenceId(sequence_id_)
126         .IgnoreError();  // TODO(pwbug/387): Handle Status properly
127     sequence_id_ += packed_entry_count;
128     const Status status = server_writer_.Write(encoder);
129     sent_bundle_count++;
130 
131     if (!status.ok() &&
132         error_handling_ == LogDrainErrorHandling::kCloseStreamOnWriterError) {
133       // Only update this drop count when writer errors are not ignored.
134       drop_count_writer_error_ += packed_entry_count;
135       server_writer_.Finish().IgnoreError();
136       encoding_status_out = Status::Aborted();
137       return log_sink_state;
138     }
139   }
140   return log_sink_state;
141 }
142 
EncodeOutgoingPacket(log::LogEntries::MemoryEncoder & encoder,uint32_t & packed_entry_count_out)143 RpcLogDrain::LogDrainState RpcLogDrain::EncodeOutgoingPacket(
144     log::LogEntries::MemoryEncoder& encoder, uint32_t& packed_entry_count_out) {
145   const size_t total_buffer_size = encoder.ConservativeWriteLimit();
146   do {
147     // Peek entry and get drop count from multisink.
148     uint32_t drop_count = 0;
149     uint32_t ingress_drop_count = 0;
150     Result<multisink::MultiSink::Drain::PeekedEntry> possible_entry =
151         PeekEntry(log_entry_buffer_, drop_count, ingress_drop_count);
152     drop_count_ingress_error_ += ingress_drop_count;
153 
154     // Check if the entry fits in the entry buffer.
155     if (possible_entry.status().IsResourceExhausted()) {
156       ++drop_count_small_stack_buffer_;
157       continue;
158     }
159 
160     // Check if there are any entries left.
161     if (possible_entry.status().IsOutOfRange()) {
162       // Stash multisink's reported drop count that will be reported later with
163       // any other drop counts.
164       drop_count_slow_drain_ += drop_count;
165       return LogDrainState::kCaughtUp;  // There are no more entries.
166     }
167 
168     // At this point all expected errors have been handled.
169     PW_CHECK_OK(possible_entry.status());
170 
171     // Check if the entry passes any set filter rules.
172     if (filter_ != nullptr &&
173         filter_->ShouldDropLog(possible_entry.value().entry())) {
174       // Add the drop count from the multisink peek, stored in `drop_count`, to
175       // the total drop count. Then drop the entry without counting it towards
176       // the total drop count. Drops will be reported later all together.
177       drop_count_slow_drain_ += drop_count;
178       PW_CHECK_OK(PopEntry(possible_entry.value()));
179       continue;
180     }
181 
182     // Check if the entry fits in the encoder buffer by itself.
183     const size_t encoded_entry_size =
184         possible_entry.value().entry().size() + kLogEntriesEncodeFrameSize;
185     if (encoded_entry_size + kLogEntriesEncodeFrameSize > total_buffer_size) {
186       // Entry is larger than the entire available buffer.
187       ++drop_count_small_outbound_buffer_;
188       PW_CHECK_OK(PopEntry(possible_entry.value()));
189       continue;
190     }
191 
192     // At this point, we have a valid entry that may fit in the encode buffer.
193     // Report any drop counts combined reusing the log_entry_buffer_ to encode a
194     // drop message.
195     drop_count_slow_drain_ += drop_count;
196     // Account for dropped entries too large for stack buffer, which PeekEntry()
197     // also reports.
198     drop_count_slow_drain_ -= drop_count_small_stack_buffer_;
199     bool log_entry_buffer_has_valid_entry = possible_entry.ok();
200     if (drop_count_slow_drain_ > 0) {
201       TryEncodeDropMessage(log_entry_buffer_,
202                            std::string_view(kSlowDrainErrorMessage),
203                            drop_count_slow_drain_,
204                            encoder);
205       log_entry_buffer_has_valid_entry = false;
206     }
207     if (drop_count_ingress_error_ > 0) {
208       TryEncodeDropMessage(log_entry_buffer_,
209                            std::string_view(kIngressErrorMessage),
210                            drop_count_ingress_error_,
211                            encoder);
212       log_entry_buffer_has_valid_entry = false;
213     }
214     if (drop_count_small_stack_buffer_ > 0) {
215       TryEncodeDropMessage(log_entry_buffer_,
216                            std::string_view(kSmallStackBufferErrorMessage),
217                            drop_count_small_stack_buffer_,
218                            encoder);
219       log_entry_buffer_has_valid_entry = false;
220     }
221     if (drop_count_small_outbound_buffer_ > 0) {
222       TryEncodeDropMessage(log_entry_buffer_,
223                            std::string_view(kSmallOutboundBufferErrorMessage),
224                            drop_count_small_outbound_buffer_,
225                            encoder);
226       log_entry_buffer_has_valid_entry = false;
227     }
228     if (drop_count_writer_error_ > 0) {
229       TryEncodeDropMessage(log_entry_buffer_,
230                            std::string_view(kWriterErrorMessage),
231                            drop_count_writer_error_,
232                            encoder);
233       log_entry_buffer_has_valid_entry = false;
234     }
235     if (possible_entry.ok() && !log_entry_buffer_has_valid_entry) {
236       PW_CHECK_OK(PeekEntry(log_entry_buffer_, drop_count, ingress_drop_count)
237                       .status());
238     }
239 
240     // Check if the entry fits in the partially filled encoder buffer.
241     if (encoded_entry_size > encoder.ConservativeWriteLimit()) {
242       // Notify the caller there are more entries to send.
243       return LogDrainState::kMoreEntriesRemaining;
244     }
245 
246     // Encode the entry and remove it from multisink.
247     PW_CHECK_OK(encoder.WriteBytes(
248         static_cast<uint32_t>(log::LogEntries::Fields::ENTRIES),
249         possible_entry.value().entry()));
250     PW_CHECK_OK(PopEntry(possible_entry.value()));
251     ++packed_entry_count_out;
252   } while (true);
253 }
254 
Close()255 Status RpcLogDrain::Close() {
256   std::lock_guard lock(mutex_);
257   return server_writer_.Finish();
258 }
259 
260 }  // namespace pw::log_rpc
261