• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_ring_buffer/prefixed_entry_ring_buffer.h"
16 
17 #include <algorithm>
18 #include <cstring>
19 
20 #include "pw_assert/assert.h"
21 #include "pw_assert/check.h"
22 #include "pw_status/try.h"
23 #include "pw_varint/varint.h"
24 
25 namespace pw {
26 namespace ring_buffer {
27 
28 using std::byte;
29 using Entry = PrefixedEntryRingBufferMulti::Entry;
30 using Reader = PrefixedEntryRingBufferMulti::Reader;
31 using iterator = PrefixedEntryRingBufferMulti::iterator;
32 
Clear()33 void PrefixedEntryRingBufferMulti::Clear() {
34   write_idx_ = 0;
35   for (Reader& reader : readers_) {
36     reader.read_idx_ = 0;
37     reader.entry_count_ = 0;
38   }
39 }
40 
SetBuffer(span<byte> buffer)41 Status PrefixedEntryRingBufferMulti::SetBuffer(span<byte> buffer) {
42   if ((buffer.data() == nullptr) ||  //
43       (buffer.size_bytes() == 0) ||  //
44       (buffer.size_bytes() > kMaxBufferBytes)) {
45     return Status::InvalidArgument();
46   }
47 
48   buffer_ = buffer.data();
49   buffer_bytes_ = buffer.size_bytes();
50 
51   Clear();
52   return OkStatus();
53 }
54 
AttachReader(Reader & reader)55 Status PrefixedEntryRingBufferMulti::AttachReader(Reader& reader) {
56   if (reader.buffer_ != nullptr) {
57     return Status::InvalidArgument();
58   }
59   reader.buffer_ = this;
60 
61   if (readers_.empty()) {
62     reader.read_idx_ = write_idx_;
63     reader.entry_count_ = 0;
64   } else {
65     const Reader& slowest_reader = GetSlowestReader();
66     reader.read_idx_ = slowest_reader.read_idx_;
67     reader.entry_count_ = slowest_reader.entry_count_;
68   }
69 
70   readers_.push_back(reader);
71   return OkStatus();
72 }
73 
DetachReader(Reader & reader)74 Status PrefixedEntryRingBufferMulti::DetachReader(Reader& reader) {
75   if (reader.buffer_ != this) {
76     return Status::InvalidArgument();
77   }
78   reader.buffer_ = nullptr;
79   reader.read_idx_ = 0;
80   reader.entry_count_ = 0;
81   readers_.remove(reader);
82   return OkStatus();
83 }
84 
InternalPushBack(span<const byte> data,uint32_t user_preamble_data,bool pop_front_if_needed)85 Status PrefixedEntryRingBufferMulti::InternalPushBack(
86     span<const byte> data,
87     uint32_t user_preamble_data,
88     bool pop_front_if_needed) {
89   if (buffer_ == nullptr) {
90     return Status::FailedPrecondition();
91   }
92 
93   // Prepare a single buffer that can hold both the user preamble and entry
94   // length.
95   byte preamble_buf[varint::kMaxVarint32SizeBytes * 2];
96   size_t user_preamble_bytes = 0;
97   if (user_preamble_) {
98     user_preamble_bytes =
99         varint::Encode<uint32_t>(user_preamble_data, preamble_buf);
100   }
101   size_t length_bytes = varint::Encode<uint32_t>(
102       data.size_bytes(), span(preamble_buf).subspan(user_preamble_bytes));
103   size_t total_write_bytes =
104       user_preamble_bytes + length_bytes + data.size_bytes();
105   if (buffer_bytes_ < total_write_bytes) {
106     return Status::OutOfRange();
107   }
108 
109   if (pop_front_if_needed) {
110     // PushBack() case: evict items as needed.
111     // Drop old entries until we have space for the new entry.
112     while (RawAvailableBytes() < total_write_bytes) {
113       InternalPopFrontAll();
114     }
115   } else if (RawAvailableBytes() < total_write_bytes) {
116     // TryPushBack() case: don't evict items.
117     return Status::ResourceExhausted();
118   }
119 
120   // Write the new entry into the ring buffer.
121   RawWrite(span(preamble_buf, user_preamble_bytes + length_bytes));
122   RawWrite(data);
123 
124   // Update all readers of the new count.
125   for (Reader& reader : readers_) {
126     reader.entry_count_++;
127   }
128   return OkStatus();
129 }
130 
GetOutput(span<byte> data_out,size_t * write_index)131 auto GetOutput(span<byte> data_out, size_t* write_index) {
132   return [data_out, write_index](span<const byte> src) -> Status {
133     size_t copy_size = std::min(data_out.size_bytes(), src.size_bytes());
134 
135     memcpy(data_out.data() + *write_index, src.data(), copy_size);
136     *write_index += copy_size;
137 
138     return (copy_size == src.size_bytes()) ? OkStatus()
139                                            : Status::ResourceExhausted();
140   };
141 }
142 
InternalPeekFront(const Reader & reader,span<byte> data,size_t * bytes_read_out) const143 Status PrefixedEntryRingBufferMulti::InternalPeekFront(
144     const Reader& reader, span<byte> data, size_t* bytes_read_out) const {
145   *bytes_read_out = 0;
146   return InternalRead(reader, GetOutput(data, bytes_read_out), false);
147 }
148 
InternalPeekFront(const Reader & reader,ReadOutput output) const149 Status PrefixedEntryRingBufferMulti::InternalPeekFront(
150     const Reader& reader, ReadOutput output) const {
151   return InternalRead(reader, output, false);
152 }
153 
InternalPeekFrontWithPreamble(const Reader & reader,span<byte> data,size_t * bytes_read_out) const154 Status PrefixedEntryRingBufferMulti::InternalPeekFrontWithPreamble(
155     const Reader& reader, span<byte> data, size_t* bytes_read_out) const {
156   *bytes_read_out = 0;
157   return InternalRead(reader, GetOutput(data, bytes_read_out), true);
158 }
159 
InternalPeekFrontWithPreamble(const Reader & reader,ReadOutput output) const160 Status PrefixedEntryRingBufferMulti::InternalPeekFrontWithPreamble(
161     const Reader& reader, ReadOutput output) const {
162   return InternalRead(reader, output, true);
163 }
164 
InternalPeekFrontPreamble(const Reader & reader,uint32_t & user_preamble_out) const165 Status PrefixedEntryRingBufferMulti::InternalPeekFrontPreamble(
166     const Reader& reader, uint32_t& user_preamble_out) const {
167   if (reader.entry_count_ == 0) {
168     return Status::OutOfRange();
169   }
170   // Figure out where to start reading (wrapped); accounting for preamble.
171   EntryInfo info = FrontEntryInfo(reader);
172   user_preamble_out = info.user_preamble;
173   return OkStatus();
174 }
175 
176 // TODO(b/235351046): Consider whether this internal templating is required, or
177 // if we can simply promote GetOutput to a static function and remove the
178 // template. T should be similar to Status (*read_output)(span<const byte>)
179 template <typename T>
InternalRead(const Reader & reader,T read_output,bool include_preamble_in_output,uint32_t * user_preamble_out) const180 Status PrefixedEntryRingBufferMulti::InternalRead(
181     const Reader& reader,
182     T read_output,
183     bool include_preamble_in_output,
184     uint32_t* user_preamble_out) const {
185   if (buffer_ == nullptr) {
186     return Status::FailedPrecondition();
187   }
188   if (reader.entry_count_ == 0) {
189     return Status::OutOfRange();
190   }
191 
192   // Figure out where to start reading (wrapped); accounting for preamble.
193   EntryInfo info = FrontEntryInfo(reader);
194   size_t read_bytes = info.data_bytes;
195   size_t data_read_idx = reader.read_idx_;
196   if (user_preamble_out) {
197     *user_preamble_out = info.user_preamble;
198   }
199   if (include_preamble_in_output) {
200     read_bytes += info.preamble_bytes;
201   } else {
202     data_read_idx = IncrementIndex(data_read_idx, info.preamble_bytes);
203   }
204 
205   // Read bytes, stopping at the end of the buffer if this entry wraps.
206   size_t bytes_until_wrap = buffer_bytes_ - data_read_idx;
207   size_t bytes_to_copy = std::min(read_bytes, bytes_until_wrap);
208   Status status = read_output(span(buffer_ + data_read_idx, bytes_to_copy));
209 
210   // If the entry wrapped, read the remaining bytes.
211   if (status.ok() && (bytes_to_copy < read_bytes)) {
212     status = read_output(span(buffer_, read_bytes - bytes_to_copy));
213   }
214   return status;
215 }
216 
InternalPopFrontAll()217 void PrefixedEntryRingBufferMulti::InternalPopFrontAll() {
218   // Forcefully pop all readers. Find the slowest reader, which must have
219   // the highest entry count, then pop all readers that have the same count.
220   //
221   // It is expected that InternalPopFrontAll is called only when there is
222   // something to pop from at least one reader. If no readers exist, or all
223   // readers are caught up, this function will assert.
224   size_t entry_count = GetSlowestReader().entry_count_;
225   PW_DCHECK_INT_NE(entry_count, 0);
226   // Otherwise, pop the readers that have the largest value.
227   for (Reader& reader : readers_) {
228     if (reader.entry_count_ == entry_count) {
229       reader.PopFront()
230           .IgnoreError();  // TODO(b/242598609): Handle Status properly
231     }
232   }
233 }
234 
GetSlowestReader() const235 const Reader& PrefixedEntryRingBufferMulti::GetSlowestReader() const {
236   PW_DCHECK_INT_GT(readers_.size(), 0);
237   const Reader* slowest_reader = &(*readers_.begin());
238   for (const Reader& reader : readers_) {
239     if (reader.entry_count_ > slowest_reader->entry_count_) {
240       slowest_reader = &reader;
241     }
242   }
243   return *slowest_reader;
244 }
245 
Dering()246 Status PrefixedEntryRingBufferMulti::Dering() {
247   if (buffer_ == nullptr || readers_.empty()) {
248     return Status::FailedPrecondition();
249   }
250 
251   // Check if by luck we're already deringed.
252   Reader& slowest_reader = GetSlowestReaderWritable();
253   if (slowest_reader.read_idx_ == 0) {
254     return OkStatus();
255   }
256 
257   return InternalDering(slowest_reader);
258 }
259 
InternalDering(Reader & dering_reader)260 Status PrefixedEntryRingBufferMulti::InternalDering(Reader& dering_reader) {
261   if (buffer_ == nullptr || readers_.empty()) {
262     return Status::FailedPrecondition();
263   }
264 
265   auto buffer_span = span(buffer_, buffer_bytes_);
266   std::rotate(buffer_span.begin(),
267               buffer_span.begin() + dering_reader.read_idx_,
268               buffer_span.end());
269 
270   // If the new index is past the end of the buffer,
271   // alias it back (wrap) to the start of the buffer.
272   if (write_idx_ < dering_reader.read_idx_) {
273     write_idx_ += buffer_bytes_;
274   }
275   write_idx_ -= dering_reader.read_idx_;
276 
277   for (Reader& reader : readers_) {
278     if (&reader == &dering_reader) {
279       continue;
280     }
281     if (reader.read_idx_ < dering_reader.read_idx_) {
282       reader.read_idx_ += buffer_bytes_;
283     }
284     reader.read_idx_ -= dering_reader.read_idx_;
285   }
286 
287   dering_reader.read_idx_ = 0;
288   return OkStatus();
289 }
290 
InternalPopFront(Reader & reader)291 Status PrefixedEntryRingBufferMulti::InternalPopFront(Reader& reader) {
292   if (buffer_ == nullptr) {
293     return Status::FailedPrecondition();
294   }
295   if (reader.entry_count_ == 0) {
296     return Status::OutOfRange();
297   }
298 
299   // Advance the read pointer past the front entry to the next one.
300   EntryInfo info = FrontEntryInfo(reader);
301   size_t entry_bytes = info.preamble_bytes + info.data_bytes;
302   size_t prev_read_idx = reader.read_idx_;
303   reader.read_idx_ = IncrementIndex(prev_read_idx, entry_bytes);
304   reader.entry_count_--;
305   return OkStatus();
306 }
307 
InternalFrontEntryDataSizeBytes(const Reader & reader) const308 size_t PrefixedEntryRingBufferMulti::InternalFrontEntryDataSizeBytes(
309     const Reader& reader) const {
310   if (reader.entry_count_ == 0) {
311     return 0;
312   }
313   return FrontEntryInfo(reader).data_bytes;
314 }
315 
InternalFrontEntryTotalSizeBytes(const Reader & reader) const316 size_t PrefixedEntryRingBufferMulti::InternalFrontEntryTotalSizeBytes(
317     const Reader& reader) const {
318   if (reader.entry_count_ == 0) {
319     return 0;
320   }
321   EntryInfo info = FrontEntryInfo(reader);
322   return info.preamble_bytes + info.data_bytes;
323 }
324 
325 PrefixedEntryRingBufferMulti::EntryInfo
FrontEntryInfo(const Reader & reader) const326 PrefixedEntryRingBufferMulti::FrontEntryInfo(const Reader& reader) const {
327   Result<PrefixedEntryRingBufferMulti::EntryInfo> entry_info =
328       RawFrontEntryInfo(reader.read_idx_);
329   PW_CHECK_OK(entry_info.status());
330   return entry_info.value();
331 }
332 
333 Result<PrefixedEntryRingBufferMulti::EntryInfo>
RawFrontEntryInfo(size_t source_idx) const334 PrefixedEntryRingBufferMulti::RawFrontEntryInfo(size_t source_idx) const {
335   // Entry headers consists of: (optional prefix byte, varint size, data...)
336 
337   // If a preamble exists, extract the varint and it's bytes in bytes.
338   size_t user_preamble_bytes = 0;
339   uint64_t user_preamble_data = 0;
340   byte varint_buf[varint::kMaxVarint32SizeBytes];
341   if (user_preamble_) {
342     RawRead(varint_buf, source_idx, varint::kMaxVarint32SizeBytes);
343     user_preamble_bytes = varint::Decode(varint_buf, &user_preamble_data);
344     if (user_preamble_bytes == 0u) {
345       return Status::DataLoss();
346     }
347   }
348 
349   // Read the entry header; extract the varint and it's bytes in bytes.
350   RawRead(varint_buf,
351           IncrementIndex(source_idx, user_preamble_bytes),
352           varint::kMaxVarint32SizeBytes);
353   uint64_t entry_bytes;
354   size_t length_bytes = varint::Decode(varint_buf, &entry_bytes);
355   if (length_bytes == 0u) {
356     return Status::DataLoss();
357   }
358 
359   EntryInfo info = {};
360   info.preamble_bytes = user_preamble_bytes + length_bytes;
361   info.user_preamble = static_cast<uint32_t>(user_preamble_data);
362   info.data_bytes = entry_bytes;
363   return info;
364 }
365 
366 // Comparisons ordered for more probable early exits, assuming the reader is
367 // not far behind the writer compared to the size of the ring.
RawAvailableBytes() const368 size_t PrefixedEntryRingBufferMulti::RawAvailableBytes() const {
369   // Compute slowest reader. If no readers exist, the entire buffer can be
370   // written.
371   if (readers_.empty()) {
372     return buffer_bytes_;
373   }
374 
375   size_t read_idx = GetSlowestReader().read_idx_;
376   // Case: Not wrapped.
377   if (read_idx < write_idx_) {
378     return buffer_bytes_ - (write_idx_ - read_idx);
379   }
380   // Case: Wrapped
381   if (read_idx > write_idx_) {
382     return read_idx - write_idx_;
383   }
384   // Case: Matched read and write heads; empty or full.
385   for (const Reader& reader : readers_) {
386     if (reader.read_idx_ == read_idx && reader.entry_count_ != 0) {
387       return 0;
388     }
389   }
390   return buffer_bytes_;
391 }
392 
RawWrite(span<const std::byte> source)393 void PrefixedEntryRingBufferMulti::RawWrite(span<const std::byte> source) {
394   if (source.size_bytes() == 0) {
395     return;
396   }
397 
398   // Write until the end of the source or the backing buffer.
399   size_t bytes_until_wrap = buffer_bytes_ - write_idx_;
400   size_t bytes_to_copy = std::min(source.size(), bytes_until_wrap);
401   memcpy(buffer_ + write_idx_, source.data(), bytes_to_copy);
402 
403   // If there wasn't space in the backing buffer, wrap to the front.
404   if (bytes_to_copy < source.size()) {
405     memcpy(
406         buffer_, source.data() + bytes_to_copy, source.size() - bytes_to_copy);
407   }
408   write_idx_ = IncrementIndex(write_idx_, source.size());
409 }
410 
RawRead(byte * destination,size_t source_idx,size_t length_bytes) const411 void PrefixedEntryRingBufferMulti::RawRead(byte* destination,
412                                            size_t source_idx,
413                                            size_t length_bytes) const {
414   if (length_bytes == 0) {
415     return;
416   }
417 
418   // Read the pre-wrap bytes.
419   size_t bytes_until_wrap = buffer_bytes_ - source_idx;
420   size_t bytes_to_copy = std::min(length_bytes, bytes_until_wrap);
421   memcpy(destination, buffer_ + source_idx, bytes_to_copy);
422 
423   // Read the post-wrap bytes, if needed.
424   if (bytes_to_copy < length_bytes) {
425     memcpy(destination + bytes_to_copy, buffer_, length_bytes - bytes_to_copy);
426   }
427 }
428 
IncrementIndex(size_t index,size_t count) const429 size_t PrefixedEntryRingBufferMulti::IncrementIndex(size_t index,
430                                                     size_t count) const {
431   // Note: This doesn't use modulus (%) since the branch is cheaper, and we
432   // guarantee that count will never be greater than buffer_bytes_.
433   index += count;
434   if (index > buffer_bytes_) {
435     index -= buffer_bytes_;
436   }
437   return index;
438 }
439 
PeekFrontWithPreamble(span<byte> data,uint32_t & user_preamble_out,size_t & entry_bytes_read_out) const440 Status PrefixedEntryRingBufferMulti::Reader::PeekFrontWithPreamble(
441     span<byte> data,
442     uint32_t& user_preamble_out,
443     size_t& entry_bytes_read_out) const {
444   entry_bytes_read_out = 0;
445   return buffer_->InternalRead(
446       *this, GetOutput(data, &entry_bytes_read_out), false, &user_preamble_out);
447 }
448 
operator ++()449 iterator& iterator::operator++() {
450   PW_DCHECK_OK(iteration_status_);
451   PW_DCHECK_INT_NE(entry_count_, 0);
452 
453   Result<EntryInfo> info = ring_buffer_->RawFrontEntryInfo(read_idx_);
454   if (!info.status().ok()) {
455     SkipToEnd(info.status());
456     return *this;
457   }
458 
459   // It is guaranteed that the buffer is deringed at this point.
460   read_idx_ += info.value().preamble_bytes + info.value().data_bytes;
461   entry_count_--;
462 
463   if (entry_count_ == 0) {
464     SkipToEnd(OkStatus());
465     return *this;
466   }
467 
468   if (read_idx_ >= ring_buffer_->TotalUsedBytes()) {
469     SkipToEnd(Status::DataLoss());
470     return *this;
471   }
472 
473   info = ring_buffer_->RawFrontEntryInfo(read_idx_);
474   if (!info.status().ok()) {
475     SkipToEnd(info.status());
476     return *this;
477   }
478   return *this;
479 }
480 
operator *() const481 const Entry& iterator::operator*() const {
482   PW_DCHECK_OK(iteration_status_);
483   PW_DCHECK_INT_NE(entry_count_, 0);
484 
485   Result<EntryInfo> info = ring_buffer_->RawFrontEntryInfo(read_idx_);
486   PW_DCHECK_OK(info.status());
487 
488   entry_ = {
489       .buffer = span<const byte>(
490           ring_buffer_->buffer_ + read_idx_ + info.value().preamble_bytes,
491           info.value().data_bytes),
492       .preamble = info.value().user_preamble,
493   };
494   return entry_;
495 }
496 
497 }  // namespace ring_buffer
498 }  // namespace pw
499