1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #include "pw_multisink/multisink.h"
15
16 #include <cstring>
17
18 #include "pw_assert/check.h"
19 #include "pw_bytes/span.h"
20 #include "pw_function/function.h"
21 #include "pw_log/log.h"
22 #include "pw_result/result.h"
23 #include "pw_status/status.h"
24 #include "pw_status/try.h"
25 #include "pw_varint/varint.h"
26
27 namespace pw {
28 namespace multisink {
29
HandleEntry(ConstByteSpan entry)30 void MultiSink::HandleEntry(ConstByteSpan entry) {
31 std::lock_guard lock(lock_);
32 const Status push_back_status = ring_buffer_.PushBack(entry, sequence_id_++);
33 PW_DCHECK_OK(push_back_status);
34 NotifyListeners();
35 }
36
HandleDropped(uint32_t drop_count)37 void MultiSink::HandleDropped(uint32_t drop_count) {
38 std::lock_guard lock(lock_);
39 // Updating the sequence ID helps identify where the ingress drop happend when
40 // a drain peeks or pops.
41 sequence_id_ += drop_count;
42 total_ingress_drops_ += drop_count;
43 NotifyListeners();
44 }
45
PopEntry(Drain & drain,const Drain::PeekedEntry & entry)46 Status MultiSink::PopEntry(Drain& drain, const Drain::PeekedEntry& entry) {
47 std::lock_guard lock(lock_);
48 PW_DCHECK_PTR_EQ(drain.multisink_, this);
49
50 // Ignore the call if the entry has been handled already.
51 if (entry.sequence_id() == drain.last_handled_sequence_id_) {
52 return OkStatus();
53 }
54
55 uint32_t next_entry_sequence_id;
56 Status peek_status = drain.reader_.PeekFrontPreamble(next_entry_sequence_id);
57 if (!peek_status.ok()) {
58 // Ignore errors if the multisink is empty.
59 if (peek_status.IsOutOfRange()) {
60 return OkStatus();
61 }
62 return peek_status;
63 }
64 if (next_entry_sequence_id == entry.sequence_id()) {
65 // A crash should not happen, since the peek was successful and `lock_` is
66 // still held, there shouldn't be any modifications to the multisink in
67 // between peeking and popping.
68 PW_CHECK_OK(drain.reader_.PopFront());
69 }
70 // If the entry's sequence id is not the next one it means that the
71 // multisink advanced since PeekEntry() was called. Advance the last handled
72 // sequence id to the passed entry anyway to mark the fact that the dropped
73 // messages reported on PeekEntry() are handled.
74 drain.last_handled_sequence_id_ = entry.sequence_id();
75 return OkStatus();
76 }
77
PeekOrPopEntry(Drain & drain,ByteSpan buffer,Request request,uint32_t & drain_drop_count_out,uint32_t & ingress_drop_count_out,uint32_t & entry_sequence_id_out)78 Result<ConstByteSpan> MultiSink::PeekOrPopEntry(
79 Drain& drain,
80 ByteSpan buffer,
81 Request request,
82 uint32_t& drain_drop_count_out,
83 uint32_t& ingress_drop_count_out,
84 uint32_t& entry_sequence_id_out)
85 PW_NO_SANITIZE("unsigned-integer-overflow") {
86 size_t bytes_read = 0;
87 entry_sequence_id_out = 0;
88 drain_drop_count_out = 0;
89 ingress_drop_count_out = 0;
90
91 std::lock_guard lock(lock_);
92 PW_DCHECK_PTR_EQ(drain.multisink_, this);
93
94 const Status peek_status = drain.reader_.PeekFrontWithPreamble(
95 buffer, entry_sequence_id_out, bytes_read);
96
97 if (peek_status.IsOutOfRange()) {
98 // If the drain has caught up, report the last handled sequence ID so that
99 // it can still process any dropped entries.
100 entry_sequence_id_out = sequence_id_ - 1;
101 } else if (!peek_status.ok()) {
102 // Discard the entry if the result isn't OK or OUT_OF_RANGE and exit, as the
103 // entry_sequence_id_out cannot be used for computation. Later invocations
104 // will calculate the drop count.
105 PW_CHECK(drain.reader_.PopFront().ok());
106 return peek_status;
107 }
108
109 // Compute the drop count delta by comparing this entry's sequence ID with the
110 // last sequence ID this drain successfully read.
111 //
112 // The drop count calculation simply computes the difference between the
113 // current and last sequence IDs. Consecutive successful reads will always
114 // differ by one at least, so it is subtracted out. If the read was not
115 // successful, the difference is not adjusted.
116 drain_drop_count_out = entry_sequence_id_out -
117 drain.last_handled_sequence_id_ -
118 (peek_status.ok() ? 1 : 0);
119
120 // Only report the ingress drop count when the drain catches up to where the
121 // drop happened, accounting only for the drops found and no more, as
122 // indicated by the gap in sequence IDs.
123 if (drain_drop_count_out > 0) {
124 ingress_drop_count_out =
125 std::min(drain_drop_count_out,
126 total_ingress_drops_ - drain.last_handled_ingress_drop_count_);
127 // Remove the ingress drop count duplicated in drain_drop_count_out.
128 drain_drop_count_out -= ingress_drop_count_out;
129 // Check if all the ingress drops were reported.
130 drain.last_handled_ingress_drop_count_ =
131 total_ingress_drops_ > ingress_drop_count_out
132 ? total_ingress_drops_ - ingress_drop_count_out
133 : total_ingress_drops_;
134 }
135
136 // The Peek above may have failed due to OutOfRange, now that we've set the
137 // drop count see if we should return before attempting to pop.
138 if (peek_status.IsOutOfRange()) {
139 // No more entries, update the drain.
140 drain.last_handled_sequence_id_ = entry_sequence_id_out;
141 return peek_status;
142 }
143 if (request == Request::kPop) {
144 PW_CHECK(drain.reader_.PopFront().ok());
145 drain.last_handled_sequence_id_ = entry_sequence_id_out;
146 }
147 return as_bytes(buffer.first(bytes_read));
148 }
149
AttachDrain(Drain & drain)150 void MultiSink::AttachDrain(Drain& drain)
151 PW_NO_SANITIZE("unsigned-integer-overflow") {
152 std::lock_guard lock(lock_);
153 PW_DCHECK_PTR_EQ(drain.multisink_, nullptr);
154 drain.multisink_ = this;
155
156 PW_CHECK_OK(ring_buffer_.AttachReader(drain.reader_));
157 if (&drain == &oldest_entry_drain_) {
158 drain.last_handled_sequence_id_ = sequence_id_ - 1;
159 } else {
160 drain.last_handled_sequence_id_ =
161 oldest_entry_drain_.last_handled_sequence_id_;
162 }
163 drain.last_peek_sequence_id_ = drain.last_handled_sequence_id_;
164 drain.last_handled_ingress_drop_count_ = 0;
165 }
166
DetachDrain(Drain & drain)167 void MultiSink::DetachDrain(Drain& drain) {
168 std::lock_guard lock(lock_);
169 PW_DCHECK_PTR_EQ(drain.multisink_, this);
170 drain.multisink_ = nullptr;
171 PW_CHECK_OK(ring_buffer_.DetachReader(drain.reader_),
172 "The drain wasn't already attached.");
173 }
174
AttachListener(Listener & listener)175 void MultiSink::AttachListener(Listener& listener) {
176 std::lock_guard lock(lock_);
177 listeners_.push_back(listener);
178 // Notify the newly added entry, in case there are items in the sink.
179 listener.OnNewEntryAvailable();
180 }
181
DetachListener(Listener & listener)182 void MultiSink::DetachListener(Listener& listener) {
183 std::lock_guard lock(lock_);
184 [[maybe_unused]] bool was_detached = listeners_.remove(listener);
185 PW_DCHECK(was_detached, "The listener was already attached.");
186 }
187
Clear()188 void MultiSink::Clear() {
189 std::lock_guard lock(lock_);
190 ring_buffer_.Clear();
191 }
192
NotifyListeners()193 void MultiSink::NotifyListeners() {
194 for (auto& listener : listeners_) {
195 listener.OnNewEntryAvailable();
196 }
197 }
198
UnsafeForEachEntry(const Function<void (ConstByteSpan)> & callback,size_t max_num_entries)199 Status MultiSink::UnsafeForEachEntry(
200 const Function<void(ConstByteSpan)>& callback, size_t max_num_entries) {
201 MultiSink::UnsafeIterationWrapper multisink_iteration = UnsafeIteration();
202
203 // First count the number of entries.
204 size_t num_entries = 0;
205 for ([[maybe_unused]] ConstByteSpan entry : multisink_iteration) {
206 num_entries++;
207 }
208
209 // Log up to the max number of logs to avoid overflowing the crash log
210 // writer.
211 const size_t first_logged_offset =
212 max_num_entries > num_entries ? 0 : num_entries - max_num_entries;
213 pw::multisink::MultiSink::iterator it = multisink_iteration.begin();
214 for (size_t offset = 0; it != multisink_iteration.end(); ++it, ++offset) {
215 if (offset < first_logged_offset) {
216 continue; // Skip this log.
217 }
218 callback(*it);
219 }
220 if (!it.status().ok()) {
221 PW_LOG_WARN("Multisink corruption detected, some entries may be missing");
222 return Status::DataLoss();
223 }
224
225 return OkStatus();
226 }
227
UnsafeForEachEntryFromEnd(const Function<void (ConstByteSpan)> & callback,size_t max_size_bytes)228 Status MultiSink::UnsafeForEachEntryFromEnd(
229 const Function<void(ConstByteSpan)>& callback, size_t max_size_bytes) {
230 MultiSink::UnsafeIterationWrapper multisink_iteration = UnsafeIteration();
231
232 // First count the number of entries and total size of the entries.
233 size_t num_entries = 0;
234 size_t total_bytes = 0;
235 iterator it = multisink_iteration.begin();
236 iterator last_elem_it;
237 for (; it != multisink_iteration.end(); ++it) {
238 num_entries++;
239 total_bytes += (*it).size();
240 last_elem_it = it;
241 }
242
243 size_t max_num_entries = std::numeric_limits<size_t>::max();
244 // All entries won't fit in the available space, so reverse iterate
245 // from the end to calculate the number of elements from the end
246 // which will fit in the available space.
247 if (total_bytes > max_size_bytes) {
248 total_bytes = 0;
249 max_num_entries = 0;
250 while (total_bytes <= max_size_bytes) {
251 total_bytes += (*last_elem_it).size();
252 last_elem_it--;
253 max_num_entries++;
254 }
255 }
256
257 // Log up to the max number of logs to avoid overflowing the crash log
258 // writer.
259 const size_t first_logged_offset =
260 max_num_entries > num_entries ? 0 : num_entries - max_num_entries;
261 it = multisink_iteration.begin();
262 for (size_t offset = 0; it != multisink_iteration.end(); ++it, ++offset) {
263 if (offset < first_logged_offset) {
264 continue; // Skip this log.
265 }
266 callback(*it);
267 }
268 if (!it.status().ok()) {
269 PW_LOG_WARN("Multisink corruption detected, some entries may be missing");
270 return Status::DataLoss();
271 }
272
273 return OkStatus();
274 }
275
PopEntry(const PeekedEntry & entry)276 Status MultiSink::Drain::PopEntry(const PeekedEntry& entry) {
277 PW_DCHECK_NOTNULL(multisink_);
278 return multisink_->PopEntry(*this, entry);
279 }
280
PeekEntry(ByteSpan buffer,uint32_t & drain_drop_count_out,uint32_t & ingress_drop_count_out)281 Result<MultiSink::Drain::PeekedEntry> MultiSink::Drain::PeekEntry(
282 ByteSpan buffer,
283 uint32_t& drain_drop_count_out,
284 uint32_t& ingress_drop_count_out) {
285 PW_DCHECK_NOTNULL(multisink_);
286 uint32_t entry_sequence_id_out;
287 Result<ConstByteSpan> peek_result =
288 multisink_->PeekOrPopEntry(*this,
289 buffer,
290 Request::kPeek,
291 drain_drop_count_out,
292 ingress_drop_count_out,
293 entry_sequence_id_out);
294 if (!peek_result.ok()) {
295 return peek_result.status();
296 }
297 return PeekedEntry(peek_result.value(), entry_sequence_id_out);
298 }
299
PopEntry(ByteSpan buffer,uint32_t & drain_drop_count_out,uint32_t & ingress_drop_count_out)300 Result<ConstByteSpan> MultiSink::Drain::PopEntry(
301 ByteSpan buffer,
302 uint32_t& drain_drop_count_out,
303 uint32_t& ingress_drop_count_out) {
304 PW_DCHECK_NOTNULL(multisink_);
305 uint32_t entry_sequence_id_out;
306 return multisink_->PeekOrPopEntry(*this,
307 buffer,
308 Request::kPop,
309 drain_drop_count_out,
310 ingress_drop_count_out,
311 entry_sequence_id_out);
312 }
313
314 } // namespace multisink
315 } // namespace pw
316