// Copyright 2020 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

#include "pw_ring_buffer/prefixed_entry_ring_buffer.h"

#include <algorithm>
#include <cstring>

#include "pw_assert/assert.h"
#include "pw_assert/check.h"
#include "pw_status/try.h"
#include "pw_varint/varint.h"

namespace pw {
namespace ring_buffer {

using std::byte;
using Entry = PrefixedEntryRingBufferMulti::Entry;
using Reader = PrefixedEntryRingBufferMulti::Reader;
using iterator = PrefixedEntryRingBufferMulti::iterator;
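// Example usage of the multi-reader buffer (an illustrative sketch, not part
// of this file; see prefixed_entry_ring_buffer.h for the full API):
//
//   PrefixedEntryRingBufferMulti ring;
//   PrefixedEntryRingBufferMulti::Reader reader;
//   std::array<std::byte, 1024> backing;
//   PW_CHECK_OK(ring.SetBuffer(backing));
//   PW_CHECK_OK(ring.AttachReader(reader));
//   PW_CHECK_OK(ring.PushBack(payload));  // Oldest entries evicted if needed.
//   PW_CHECK_OK(reader.PopFront());       // Drop the front entry for 'reader'.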

void PrefixedEntryRingBufferMulti::Clear() {
  write_idx_ = 0;
  for (Reader& reader : readers_) {
    reader.read_idx_ = 0;
    reader.entry_count_ = 0;
  }
}

Status PrefixedEntryRingBufferMulti::SetBuffer(std::span<byte> buffer) {
  if ((buffer.data() == nullptr) ||  //
      (buffer.size_bytes() == 0) ||  //
      (buffer.size_bytes() > kMaxBufferBytes)) {
    return Status::InvalidArgument();
  }

  buffer_ = buffer.data();
  buffer_bytes_ = buffer.size_bytes();

  Clear();
  return OkStatus();
}

Status PrefixedEntryRingBufferMulti::AttachReader(Reader& reader) {
  if (reader.buffer_ != nullptr) {
    return Status::InvalidArgument();
  }
  reader.buffer_ = this;

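  // If this is the first reader, start it at the write head with no readable
  // entries. Otherwise mirror the slowest reader's position and count so the
  // new reader can observe every entry still retained in the buffer.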
  if (readers_.empty()) {
    reader.read_idx_ = write_idx_;
    reader.entry_count_ = 0;
  } else {
    const Reader& slowest_reader = GetSlowestReader();
    reader.read_idx_ = slowest_reader.read_idx_;
    reader.entry_count_ = slowest_reader.entry_count_;
  }

  readers_.push_back(reader);
  return OkStatus();
}

Status PrefixedEntryRingBufferMulti::DetachReader(Reader& reader) {
  if (reader.buffer_ != this) {
    return Status::InvalidArgument();
  }
  reader.buffer_ = nullptr;
  reader.read_idx_ = 0;
  reader.entry_count_ = 0;
  readers_.remove(reader);
  return OkStatus();
}

Status PrefixedEntryRingBufferMulti::InternalPushBack(
    std::span<const byte> data,
    uint32_t user_preamble_data,
    bool pop_front_if_needed) {
  if (buffer_ == nullptr) {
    return Status::FailedPrecondition();
  }

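  // Each entry is laid out in the buffer as:
  //   [optional user preamble varint][payload size varint][payload bytes]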
  // Prepare a single buffer that can hold both the user preamble and entry
  // length.
  byte preamble_buf[varint::kMaxVarint32SizeBytes * 2];
  size_t user_preamble_bytes = 0;
  if (user_preamble_) {
    user_preamble_bytes =
        varint::Encode<uint32_t>(user_preamble_data, preamble_buf);
  }
  size_t length_bytes = varint::Encode<uint32_t>(
      data.size_bytes(), std::span(preamble_buf).subspan(user_preamble_bytes));
  size_t total_write_bytes =
      user_preamble_bytes + length_bytes + data.size_bytes();
  if (buffer_bytes_ < total_write_bytes) {
    return Status::OutOfRange();
  }

  if (pop_front_if_needed) {
    // PushBack() case: evict items as needed.
    // Drop old entries until we have space for the new entry.
    while (RawAvailableBytes() < total_write_bytes) {
      InternalPopFrontAll();
    }
  } else if (RawAvailableBytes() < total_write_bytes) {
    // TryPushBack() case: don't evict items.
    return Status::ResourceExhausted();
  }

  // Write the new entry into the ring buffer.
  RawWrite(std::span(preamble_buf, user_preamble_bytes + length_bytes));
  RawWrite(data);

  // Update the entry count for every attached reader.
  for (Reader& reader : readers_) {
    reader.entry_count_++;
  }
  return OkStatus();
}

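// Returns a lambda for use as a ReadOutput: it copies the source bytes into
// data_out starting at *write_index, advances *write_index, and reports
// RESOURCE_EXHAUSTED if data_out was too small to hold the source bytes.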
auto GetOutput(std::span<byte> data_out, size_t* write_index) {
  return [data_out, write_index](std::span<const byte> src) -> Status {
    size_t copy_size = std::min(data_out.size_bytes(), src.size_bytes());

    memcpy(data_out.data() + *write_index, src.data(), copy_size);
    *write_index += copy_size;

    return (copy_size == src.size_bytes()) ? OkStatus()
                                           : Status::ResourceExhausted();
  };
}

Status PrefixedEntryRingBufferMulti::InternalPeekFront(
    const Reader& reader, std::span<byte> data, size_t* bytes_read_out) const {
  *bytes_read_out = 0;
  return InternalRead(reader, GetOutput(data, bytes_read_out), false);
}

Status PrefixedEntryRingBufferMulti::InternalPeekFront(
    const Reader& reader, ReadOutput output) const {
  return InternalRead(reader, output, false);
}

Status PrefixedEntryRingBufferMulti::InternalPeekFrontWithPreamble(
    const Reader& reader, std::span<byte> data, size_t* bytes_read_out) const {
  *bytes_read_out = 0;
  return InternalRead(reader, GetOutput(data, bytes_read_out), true);
}

Status PrefixedEntryRingBufferMulti::InternalPeekFrontWithPreamble(
    const Reader& reader, ReadOutput output) const {
  return InternalRead(reader, output, true);
}

Status PrefixedEntryRingBufferMulti::InternalPeekFrontPreamble(
    const Reader& reader, uint32_t& user_preamble_out) const {
  if (reader.entry_count_ == 0) {
    return Status::OutOfRange();
  }
  // Figure out where to start reading (wrapped), accounting for the preamble.
  EntryInfo info = FrontEntryInfo(reader);
  user_preamble_out = info.user_preamble;
  return OkStatus();
}

// TODO(pwbug/339): Consider whether this internal templating is required, or if
// we can simply promote GetOutput to a static function and remove the template.
// T should be similar to Status (*read_output)(std::span<const byte>)
template <typename T>
Status PrefixedEntryRingBufferMulti::InternalRead(
    const Reader& reader,
    T read_output,
    bool include_preamble_in_output,
    uint32_t* user_preamble_out) const {
  if (buffer_ == nullptr) {
    return Status::FailedPrecondition();
  }
  if (reader.entry_count_ == 0) {
    return Status::OutOfRange();
  }

  // Figure out where to start reading (wrapped), accounting for the preamble.
  EntryInfo info = FrontEntryInfo(reader);
  size_t read_bytes = info.data_bytes;
  size_t data_read_idx = reader.read_idx_;
  if (user_preamble_out) {
    *user_preamble_out = info.user_preamble;
  }
  if (include_preamble_in_output) {
    read_bytes += info.preamble_bytes;
  } else {
    data_read_idx = IncrementIndex(data_read_idx, info.preamble_bytes);
  }

  // Read bytes, stopping at the end of the buffer if this entry wraps.
  size_t bytes_until_wrap = buffer_bytes_ - data_read_idx;
  size_t bytes_to_copy = std::min(read_bytes, bytes_until_wrap);
  Status status =
      read_output(std::span(buffer_ + data_read_idx, bytes_to_copy));

  // If the entry wrapped, read the remaining bytes.
  if (status.ok() && (bytes_to_copy < read_bytes)) {
    status = read_output(std::span(buffer_, read_bytes - bytes_to_copy));
  }
  return status;
}

void PrefixedEntryRingBufferMulti::InternalPopFrontAll() {
  // Forcefully pop all readers. Find the slowest reader, which must have
  // the highest entry count, then pop all readers that have the same count.
  //
  // It is expected that InternalPopFrontAll is called only when there is
  // something to pop from at least one reader. If no readers exist, or all
  // readers are caught up, this function will assert.
  size_t entry_count = GetSlowestReader().entry_count_;
  PW_DCHECK_INT_NE(entry_count, 0);
  // Otherwise, pop the readers that have the largest value.
  for (Reader& reader : readers_) {
    if (reader.entry_count_ == entry_count) {
      reader.PopFront()
          .IgnoreError();  // TODO(pwbug/387): Handle Status properly
    }
  }
}

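// The slowest reader is the one with the most unread entries (the largest
// entry_count_), i.e. the reader furthest behind the write head.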
const Reader& PrefixedEntryRingBufferMulti::GetSlowestReader() const {
  PW_DCHECK_INT_GT(readers_.size(), 0);
  const Reader* slowest_reader = &(*readers_.begin());
  for (const Reader& reader : readers_) {
    if (reader.entry_count_ > slowest_reader->entry_count_) {
      slowest_reader = &reader;
    }
  }
  return *slowest_reader;
}

Status PrefixedEntryRingBufferMulti::Dering() {
  if (buffer_ == nullptr || readers_.empty()) {
    return Status::FailedPrecondition();
  }

  // Check if by luck we're already deringed.
  Reader& slowest_reader = GetSlowestReaderWritable();
  if (slowest_reader.read_idx_ == 0) {
    return OkStatus();
  }

  return InternalDering(slowest_reader);
}

Status PrefixedEntryRingBufferMulti::InternalDering(Reader& dering_reader) {
  if (buffer_ == nullptr || readers_.empty()) {
    return Status::FailedPrecondition();
  }

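  // Rotate the backing buffer so the dering reader's front entry starts at
  // index 0, then shift the write head and every other reader's index by the
  // same amount.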
  auto buffer_span = std::span(buffer_, buffer_bytes_);
  std::rotate(buffer_span.begin(),
              buffer_span.begin() + dering_reader.read_idx_,
              buffer_span.end());

  // If the new index is past the end of the buffer,
  // alias it back (wrap) to the start of the buffer.
  if (write_idx_ < dering_reader.read_idx_) {
    write_idx_ += buffer_bytes_;
  }
  write_idx_ -= dering_reader.read_idx_;

  for (Reader& reader : readers_) {
    if (&reader == &dering_reader) {
      continue;
    }
    if (reader.read_idx_ < dering_reader.read_idx_) {
      reader.read_idx_ += buffer_bytes_;
    }
    reader.read_idx_ -= dering_reader.read_idx_;
  }

  dering_reader.read_idx_ = 0;
  return OkStatus();
}

Status PrefixedEntryRingBufferMulti::InternalPopFront(Reader& reader) {
  if (buffer_ == nullptr) {
    return Status::FailedPrecondition();
  }
  if (reader.entry_count_ == 0) {
    return Status::OutOfRange();
  }

  // Advance the read pointer past the front entry to the next one.
  EntryInfo info = FrontEntryInfo(reader);
  size_t entry_bytes = info.preamble_bytes + info.data_bytes;
  size_t prev_read_idx = reader.read_idx_;
  reader.read_idx_ = IncrementIndex(prev_read_idx, entry_bytes);
  reader.entry_count_--;
  return OkStatus();
}

size_t PrefixedEntryRingBufferMulti::InternalFrontEntryDataSizeBytes(
    const Reader& reader) const {
  if (reader.entry_count_ == 0) {
    return 0;
  }
  return FrontEntryInfo(reader).data_bytes;
}

size_t PrefixedEntryRingBufferMulti::InternalFrontEntryTotalSizeBytes(
    const Reader& reader) const {
  if (reader.entry_count_ == 0) {
    return 0;
  }
  EntryInfo info = FrontEntryInfo(reader);
  return info.preamble_bytes + info.data_bytes;
}

PrefixedEntryRingBufferMulti::EntryInfo
PrefixedEntryRingBufferMulti::FrontEntryInfo(const Reader& reader) const {
  Result<PrefixedEntryRingBufferMulti::EntryInfo> entry_info =
      RawFrontEntryInfo(reader.read_idx_);
  PW_CHECK_OK(entry_info.status());
  return entry_info.value();
}

Result<PrefixedEntryRingBufferMulti::EntryInfo>
PrefixedEntryRingBufferMulti::RawFrontEntryInfo(size_t source_idx) const {
  // Entry headers consist of: (optional user preamble varint, payload size
  // varint, data...)

  // If a preamble exists, extract the varint and the number of bytes it
  // occupies.
  size_t user_preamble_bytes = 0;
  uint64_t user_preamble_data = 0;
  byte varint_buf[varint::kMaxVarint32SizeBytes];
  if (user_preamble_) {
    RawRead(varint_buf, source_idx, varint::kMaxVarint32SizeBytes);
    user_preamble_bytes = varint::Decode(varint_buf, &user_preamble_data);
    if (user_preamble_bytes == 0u) {
      return Status::DataLoss();
    }
  }

  // Read the entry header; extract the length varint and the number of bytes
  // it occupies.
  RawRead(varint_buf,
          IncrementIndex(source_idx, user_preamble_bytes),
          varint::kMaxVarint32SizeBytes);
  uint64_t entry_bytes;
  size_t length_bytes = varint::Decode(varint_buf, &entry_bytes);
  if (length_bytes == 0u) {
    return Status::DataLoss();
  }

  EntryInfo info = {};
  info.preamble_bytes = user_preamble_bytes + length_bytes;
  info.user_preamble = static_cast<uint32_t>(user_preamble_data);
  info.data_bytes = entry_bytes;
  return info;
}

// Comparisons ordered for more probable early exits, assuming the reader is
// not far behind the writer compared to the size of the ring.
size_t PrefixedEntryRingBufferMulti::RawAvailableBytes() const {
  // Compute slowest reader. If no readers exist, the entire buffer can be
  // written.
  if (readers_.empty()) {
    return buffer_bytes_;
  }

  size_t read_idx = GetSlowestReader().read_idx_;
  // Case: Not wrapped.
  if (read_idx < write_idx_) {
    return buffer_bytes_ - (write_idx_ - read_idx);
  }
  // Case: Wrapped
  if (read_idx > write_idx_) {
    return read_idx - write_idx_;
  }
  // Case: Matched read and write heads; empty or full.
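  // If any reader at this index still has unread entries, the buffer is full
  // and no bytes are available; otherwise the buffer is empty.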
  for (const Reader& reader : readers_) {
    if (reader.read_idx_ == read_idx && reader.entry_count_ != 0) {
      return 0;
    }
  }
  return buffer_bytes_;
}

void PrefixedEntryRingBufferMulti::RawWrite(std::span<const std::byte> source) {
  if (source.size_bytes() == 0) {
    return;
  }

  // Write until the end of the source or the backing buffer.
  size_t bytes_until_wrap = buffer_bytes_ - write_idx_;
  size_t bytes_to_copy = std::min(source.size(), bytes_until_wrap);
  memcpy(buffer_ + write_idx_, source.data(), bytes_to_copy);

  // If there wasn't space in the backing buffer, wrap to the front.
  if (bytes_to_copy < source.size()) {
    memcpy(
        buffer_, source.data() + bytes_to_copy, source.size() - bytes_to_copy);
  }
  write_idx_ = IncrementIndex(write_idx_, source.size());
}

void PrefixedEntryRingBufferMulti::RawRead(byte* destination,
                                           size_t source_idx,
                                           size_t length_bytes) const {
  if (length_bytes == 0) {
    return;
  }

  // Read the pre-wrap bytes.
  size_t bytes_until_wrap = buffer_bytes_ - source_idx;
  size_t bytes_to_copy = std::min(length_bytes, bytes_until_wrap);
  memcpy(destination, buffer_ + source_idx, bytes_to_copy);

  // Read the post-wrap bytes, if needed.
  if (bytes_to_copy < length_bytes) {
    memcpy(destination + bytes_to_copy, buffer_, length_bytes - bytes_to_copy);
  }
}

size_t PrefixedEntryRingBufferMulti::IncrementIndex(size_t index,
                                                    size_t count) const {
  // Note: This doesn't use modulus (%) since the branch is cheaper, and we
  // guarantee that count will never be greater than buffer_bytes_.
  index += count;
  if (index > buffer_bytes_) {
    index -= buffer_bytes_;
  }
  return index;
}

Status PrefixedEntryRingBufferMulti::Reader::PeekFrontWithPreamble(
    std::span<byte> data,
    uint32_t& user_preamble_out,
    size_t& entry_bytes_read_out) const {
  entry_bytes_read_out = 0;
  return buffer_->InternalRead(
      *this, GetOutput(data, &entry_bytes_read_out), false, &user_preamble_out);
}

iterator& iterator::operator++() {
  PW_DCHECK_OK(iteration_status_);
  PW_DCHECK_INT_NE(entry_count_, 0);

  Result<EntryInfo> info = ring_buffer_->RawFrontEntryInfo(read_idx_);
  if (!info.status().ok()) {
    SkipToEnd(info.status());
    return *this;
  }

  // It is guaranteed that the buffer is deringed at this point.
  read_idx_ += info.value().preamble_bytes + info.value().data_bytes;
  entry_count_--;

  if (entry_count_ == 0) {
    SkipToEnd(OkStatus());
    return *this;
  }

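  // If the advanced index lands beyond the used portion of the buffer, the
  // previous entry's header must have been corrupt; end iteration with
  // DATA_LOSS.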
  if (read_idx_ >= ring_buffer_->TotalUsedBytes()) {
    SkipToEnd(Status::DataLoss());
    return *this;
  }

  info = ring_buffer_->RawFrontEntryInfo(read_idx_);
  if (!info.status().ok()) {
    SkipToEnd(info.status());
    return *this;
  }
  return *this;
}

const Entry& iterator::operator*() const {
  PW_DCHECK_OK(iteration_status_);
  PW_DCHECK_INT_NE(entry_count_, 0);

  Result<EntryInfo> info = ring_buffer_->RawFrontEntryInfo(read_idx_);
  PW_DCHECK_OK(info.status());

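  // Iteration only runs over a deringed buffer, so the entry payload is
  // contiguous and can be referenced in place without copying.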
  entry_ = {
      .buffer = std::span<const byte>(
          ring_buffer_->buffer_ + read_idx_ + info.value().preamble_bytes,
          info.value().data_bytes),
      .preamble = info.value().user_preamble,
  };
  return entry_;
}

}  // namespace ring_buffer
}  // namespace pw