• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2020 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_ring_buffer/prefixed_entry_ring_buffer.h"
16 
17 #include <algorithm>
18 #include <cstring>
19 
20 #include "pw_assert/light.h"
21 #include "pw_varint/varint.h"
22 
23 namespace pw {
24 namespace ring_buffer {
25 
26 using std::byte;
27 using Reader = PrefixedEntryRingBufferMulti::Reader;
28 
Clear()29 void PrefixedEntryRingBufferMulti::Clear() {
30   write_idx_ = 0;
31   for (Reader& reader : readers_) {
32     reader.read_idx = 0;
33     reader.entry_count = 0;
34   }
35 }
36 
SetBuffer(std::span<byte> buffer)37 Status PrefixedEntryRingBufferMulti::SetBuffer(std::span<byte> buffer) {
38   if ((buffer.data() == nullptr) ||  //
39       (buffer.size_bytes() == 0) ||  //
40       (buffer.size_bytes() > kMaxBufferBytes)) {
41     return Status::InvalidArgument();
42   }
43 
44   buffer_ = buffer.data();
45   buffer_bytes_ = buffer.size_bytes();
46 
47   Clear();
48   return OkStatus();
49 }
50 
AttachReader(Reader & reader)51 Status PrefixedEntryRingBufferMulti::AttachReader(Reader& reader) {
52   if (reader.buffer != nullptr) {
53     return Status::InvalidArgument();
54   }
55   reader.buffer = this;
56 
57   // Note that a newly attached reader sees the buffer as empty,
58   // and is not privy to entries pushed before being attached.
59   reader.read_idx = write_idx_;
60   reader.entry_count = 0;
61   readers_.push_back(reader);
62   return OkStatus();
63 }
64 
DetachReader(Reader & reader)65 Status PrefixedEntryRingBufferMulti::DetachReader(Reader& reader) {
66   if (reader.buffer != this) {
67     return Status::InvalidArgument();
68   }
69   reader.buffer = nullptr;
70   reader.read_idx = 0;
71   reader.entry_count = 0;
72   readers_.remove(reader);
73   return OkStatus();
74 }
75 
InternalPushBack(std::span<const byte> data,uint32_t user_preamble_data,bool drop_elements_if_needed)76 Status PrefixedEntryRingBufferMulti::InternalPushBack(
77     std::span<const byte> data,
78     uint32_t user_preamble_data,
79     bool drop_elements_if_needed) {
80   if (buffer_ == nullptr) {
81     return Status::FailedPrecondition();
82   }
83   if (data.size_bytes() == 0) {
84     return Status::InvalidArgument();
85   }
86 
87   // Prepare a single buffer that can hold both the user preamble and entry
88   // length.
89   byte preamble_buf[varint::kMaxVarint32SizeBytes * 2];
90   size_t user_preamble_bytes = 0;
91   if (user_preamble_) {
92     user_preamble_bytes =
93         varint::Encode<uint32_t>(user_preamble_data, preamble_buf);
94   }
95   size_t length_bytes = varint::Encode<uint32_t>(
96       data.size_bytes(), std::span(preamble_buf).subspan(user_preamble_bytes));
97   size_t total_write_bytes =
98       user_preamble_bytes + length_bytes + data.size_bytes();
99   if (buffer_bytes_ < total_write_bytes) {
100     return Status::OutOfRange();
101   }
102 
103   if (drop_elements_if_needed) {
104     // PushBack() case: evict items as needed.
105     // Drop old entries until we have space for the new entry.
106     while (RawAvailableBytes() < total_write_bytes) {
107       InternalPopFrontAll();
108     }
109   } else if (RawAvailableBytes() < total_write_bytes) {
110     // TryPushBack() case: don't evict items.
111     return Status::ResourceExhausted();
112   }
113 
114   // Write the new entry into the ring buffer.
115   RawWrite(std::span(preamble_buf, user_preamble_bytes + length_bytes));
116   RawWrite(data);
117 
118   // Update all readers of the new count.
119   for (Reader& reader : readers_) {
120     reader.entry_count++;
121   }
122   return OkStatus();
123 }
124 
GetOutput(std::span<byte> data_out,size_t * write_index)125 auto GetOutput(std::span<byte> data_out, size_t* write_index) {
126   return [data_out, write_index](std::span<const byte> src) -> Status {
127     size_t copy_size = std::min(data_out.size_bytes(), src.size_bytes());
128 
129     memcpy(data_out.data() + *write_index, src.data(), copy_size);
130     *write_index += copy_size;
131 
132     return (copy_size == src.size_bytes()) ? OkStatus()
133                                            : Status::ResourceExhausted();
134   };
135 }
136 
InternalPeekFront(Reader & reader,std::span<byte> data,size_t * bytes_read_out)137 Status PrefixedEntryRingBufferMulti::InternalPeekFront(Reader& reader,
138                                                        std::span<byte> data,
139                                                        size_t* bytes_read_out) {
140   *bytes_read_out = 0;
141   return InternalRead(reader, GetOutput(data, bytes_read_out), false);
142 }
143 
InternalPeekFront(Reader & reader,ReadOutput output)144 Status PrefixedEntryRingBufferMulti::InternalPeekFront(Reader& reader,
145                                                        ReadOutput output) {
146   return InternalRead(reader, output, false);
147 }
148 
InternalPeekFrontWithPreamble(Reader & reader,std::span<byte> data,size_t * bytes_read_out)149 Status PrefixedEntryRingBufferMulti::InternalPeekFrontWithPreamble(
150     Reader& reader, std::span<byte> data, size_t* bytes_read_out) {
151   *bytes_read_out = 0;
152   return InternalRead(reader, GetOutput(data, bytes_read_out), true);
153 }
154 
InternalPeekFrontWithPreamble(Reader & reader,ReadOutput output)155 Status PrefixedEntryRingBufferMulti::InternalPeekFrontWithPreamble(
156     Reader& reader, ReadOutput output) {
157   return InternalRead(reader, output, true);
158 }
159 
160 // TODO(pwbug/339): Consider whether this internal templating is required, or if
161 // we can simply promote GetOutput to a static function and remove the template.
162 // T should be similar to Status (*read_output)(std::span<const byte>)
163 template <typename T>
InternalRead(Reader & reader,T read_output,bool include_preamble_in_output,uint32_t * user_preamble_out)164 Status PrefixedEntryRingBufferMulti::InternalRead(
165     Reader& reader,
166     T read_output,
167     bool include_preamble_in_output,
168     uint32_t* user_preamble_out) {
169   if (buffer_ == nullptr) {
170     return Status::FailedPrecondition();
171   }
172   if (reader.entry_count == 0) {
173     return Status::OutOfRange();
174   }
175 
176   // Figure out where to start reading (wrapped); accounting for preamble.
177   EntryInfo info = FrontEntryInfo(reader);
178   size_t read_bytes = info.data_bytes;
179   size_t data_read_idx = reader.read_idx;
180   if (user_preamble_out) {
181     *user_preamble_out = info.user_preamble;
182   }
183   if (include_preamble_in_output) {
184     read_bytes += info.preamble_bytes;
185   } else {
186     data_read_idx = IncrementIndex(data_read_idx, info.preamble_bytes);
187   }
188 
189   // Read bytes, stopping at the end of the buffer if this entry wraps.
190   size_t bytes_until_wrap = buffer_bytes_ - data_read_idx;
191   size_t bytes_to_copy = std::min(read_bytes, bytes_until_wrap);
192   Status status =
193       read_output(std::span(buffer_ + data_read_idx, bytes_to_copy));
194 
195   // If the entry wrapped, read the remaining bytes.
196   if (status.ok() && (bytes_to_copy < read_bytes)) {
197     status = read_output(std::span(buffer_, read_bytes - bytes_to_copy));
198   }
199   return status;
200 }
201 
InternalPopFrontAll()202 void PrefixedEntryRingBufferMulti::InternalPopFrontAll() {
203   // Forcefully pop all readers. Find the slowest reader, which must have
204   // the highest entry count, then pop all readers that have the same count.
205   //
206   // It is expected that InternalPopFrontAll is called only when there is
207   // something to pop from at least one reader. If no readers exist, or all
208   // readers are caught up, this function will assert.
209   size_t entry_count = GetSlowestReader().entry_count;
210   PW_DASSERT(entry_count != 0);
211   // Otherwise, pop the readers that have the largest value.
212   for (Reader& reader : readers_) {
213     if (reader.entry_count == entry_count) {
214       reader.PopFront();
215     }
216   }
217 }
218 
GetSlowestReader()219 Reader& PrefixedEntryRingBufferMulti::GetSlowestReader() {
220   // Readers are guaranteed to be before the writer pointer (the class enforces
221   // this on every read/write operation that forces the write pointer ahead of
222   // an existing reader). To determine the slowest reader, we consider three
223   // scenarios:
224   //
225   // In all below cases, WH is the write-head, and R# are readers, with R1
226   // representing the slowest reader.
227   // [[R1 R2 R3 WH]] => Right-hand writer, slowest reader is left-most reader.
228   // [[WH R1 R2 R3]] => Left-hand writer, slowest reader is left-most reader.
229   // [[R3 WH R1 R2]] => Middle-writer, slowest reader is left-most reader after
230   // writer.
231   //
232   // Formally, choose the left-most reader after the writer (ex.2,3), but if
233   // that doesn't exist, choose the left-most reader before the writer (ex.1).
234   PW_DASSERT(readers_.size() > 0);
235   Reader* slowest_reader_after_writer = nullptr;
236   Reader* slowest_reader_before_writer = nullptr;
237   for (Reader& reader : readers_) {
238     if (reader.read_idx < write_idx_) {
239       if (!slowest_reader_before_writer ||
240           reader.read_idx < slowest_reader_before_writer->read_idx) {
241         slowest_reader_before_writer = &reader;
242       }
243     } else {
244       if (!slowest_reader_after_writer ||
245           reader.read_idx < slowest_reader_after_writer->read_idx) {
246         slowest_reader_after_writer = &reader;
247       }
248     }
249   }
250   return *(slowest_reader_after_writer ? slowest_reader_after_writer
251                                        : slowest_reader_before_writer);
252 }
253 
Dering()254 Status PrefixedEntryRingBufferMulti::Dering() {
255   if (buffer_ == nullptr || readers_.size() == 0) {
256     return Status::FailedPrecondition();
257   }
258 
259   // Check if by luck we're already deringed.
260   Reader* slowest_reader = &GetSlowestReader();
261   if (slowest_reader->read_idx == 0) {
262     return OkStatus();
263   }
264 
265   auto buffer_span = std::span(buffer_, buffer_bytes_);
266   std::rotate(buffer_span.begin(),
267               buffer_span.begin() + slowest_reader->read_idx,
268               buffer_span.end());
269 
270   // If the new index is past the end of the buffer,
271   // alias it back (wrap) to the start of the buffer.
272   if (write_idx_ < slowest_reader->read_idx) {
273     write_idx_ += buffer_bytes_;
274   }
275   write_idx_ -= slowest_reader->read_idx;
276 
277   for (Reader& reader : readers_) {
278     if (&reader == slowest_reader) {
279       continue;
280     }
281     if (reader.read_idx < slowest_reader->read_idx) {
282       reader.read_idx += buffer_bytes_;
283     }
284     reader.read_idx -= slowest_reader->read_idx;
285   }
286 
287   slowest_reader->read_idx = 0;
288   return OkStatus();
289 }
290 
InternalPopFront(Reader & reader)291 Status PrefixedEntryRingBufferMulti::InternalPopFront(Reader& reader) {
292   if (buffer_ == nullptr) {
293     return Status::FailedPrecondition();
294   }
295   if (reader.entry_count == 0) {
296     return Status::OutOfRange();
297   }
298 
299   // Advance the read pointer past the front entry to the next one.
300   EntryInfo info = FrontEntryInfo(reader);
301   size_t entry_bytes = info.preamble_bytes + info.data_bytes;
302   size_t prev_read_idx = reader.read_idx;
303   reader.read_idx = IncrementIndex(prev_read_idx, entry_bytes);
304   reader.entry_count--;
305   return OkStatus();
306 }
307 
InternalFrontEntryDataSizeBytes(Reader & reader)308 size_t PrefixedEntryRingBufferMulti::InternalFrontEntryDataSizeBytes(
309     Reader& reader) {
310   if (reader.entry_count == 0) {
311     return 0;
312   }
313   return FrontEntryInfo(reader).data_bytes;
314 }
315 
InternalFrontEntryTotalSizeBytes(Reader & reader)316 size_t PrefixedEntryRingBufferMulti::InternalFrontEntryTotalSizeBytes(
317     Reader& reader) {
318   if (reader.entry_count == 0) {
319     return 0;
320   }
321   EntryInfo info = FrontEntryInfo(reader);
322   return info.preamble_bytes + info.data_bytes;
323 }
324 
325 PrefixedEntryRingBufferMulti::EntryInfo
FrontEntryInfo(Reader & reader)326 PrefixedEntryRingBufferMulti::FrontEntryInfo(Reader& reader) {
327   // Entry headers consists of: (optional prefix byte, varint size, data...)
328 
329   // If a preamble exists, extract the varint and it's bytes in bytes.
330   size_t user_preamble_bytes = 0;
331   uint64_t user_preamble_data = 0;
332   byte varint_buf[varint::kMaxVarint32SizeBytes];
333   if (user_preamble_) {
334     RawRead(varint_buf, reader.read_idx, varint::kMaxVarint32SizeBytes);
335     user_preamble_bytes = varint::Decode(varint_buf, &user_preamble_data);
336     PW_DASSERT(user_preamble_bytes != 0u);
337   }
338 
339   // Read the entry header; extract the varint and it's bytes in bytes.
340   RawRead(varint_buf,
341           IncrementIndex(reader.read_idx, user_preamble_bytes),
342           varint::kMaxVarint32SizeBytes);
343   uint64_t entry_bytes;
344   size_t length_bytes = varint::Decode(varint_buf, &entry_bytes);
345   PW_DASSERT(length_bytes != 0u);
346 
347   EntryInfo info = {};
348   info.preamble_bytes = user_preamble_bytes + length_bytes;
349   info.user_preamble = static_cast<uint32_t>(user_preamble_data);
350   info.data_bytes = entry_bytes;
351   return info;
352 }
353 
354 // Comparisons ordered for more probable early exits, assuming the reader is
355 // not far behind the writer compared to the size of the ring.
RawAvailableBytes()356 size_t PrefixedEntryRingBufferMulti::RawAvailableBytes() {
357   // Compute slowest reader.
358   // TODO: Alternatively, the slowest reader could be actively mantained on
359   // every read operation, but reads are more likely than writes.
360   if (readers_.size() == 0) {
361     return buffer_bytes_;
362   }
363 
364   size_t read_idx = GetSlowestReader().read_idx;
365   // Case: Not wrapped.
366   if (read_idx < write_idx_) {
367     return buffer_bytes_ - (write_idx_ - read_idx);
368   }
369   // Case: Wrapped
370   if (read_idx > write_idx_) {
371     return read_idx - write_idx_;
372   }
373   // Case: Matched read and write heads; empty or full.
374   for (Reader& reader : readers_) {
375     if (reader.read_idx == read_idx && reader.entry_count != 0) {
376       return 0;
377     }
378   }
379   return buffer_bytes_;
380 }
381 
RawWrite(std::span<const std::byte> source)382 void PrefixedEntryRingBufferMulti::RawWrite(std::span<const std::byte> source) {
383   // Write until the end of the source or the backing buffer.
384   size_t bytes_until_wrap = buffer_bytes_ - write_idx_;
385   size_t bytes_to_copy = std::min(source.size(), bytes_until_wrap);
386   memcpy(buffer_ + write_idx_, source.data(), bytes_to_copy);
387 
388   // If there wasn't space in the backing buffer, wrap to the front.
389   if (bytes_to_copy < source.size()) {
390     memcpy(
391         buffer_, source.data() + bytes_to_copy, source.size() - bytes_to_copy);
392   }
393   write_idx_ = IncrementIndex(write_idx_, source.size());
394 }
395 
RawRead(byte * destination,size_t source_idx,size_t length_bytes)396 void PrefixedEntryRingBufferMulti::RawRead(byte* destination,
397                                            size_t source_idx,
398                                            size_t length_bytes) {
399   // Read the pre-wrap bytes.
400   size_t bytes_until_wrap = buffer_bytes_ - source_idx;
401   size_t bytes_to_copy = std::min(length_bytes, bytes_until_wrap);
402   memcpy(destination, buffer_ + source_idx, bytes_to_copy);
403 
404   // Read the post-wrap bytes, if needed.
405   if (bytes_to_copy < length_bytes) {
406     memcpy(destination + bytes_to_copy, buffer_, length_bytes - bytes_to_copy);
407   }
408 }
409 
IncrementIndex(size_t index,size_t count)410 size_t PrefixedEntryRingBufferMulti::IncrementIndex(size_t index,
411                                                     size_t count) {
412   // Note: This doesn't use modulus (%) since the branch is cheaper, and we
413   // guarantee that count will never be greater than buffer_bytes_.
414   index += count;
415   if (index > buffer_bytes_) {
416     index -= buffer_bytes_;
417   }
418   return index;
419 }
420 
PeekFrontWithPreamble(std::span<byte> data,uint32_t & user_preamble_out,size_t & entry_bytes_read_out)421 Status PrefixedEntryRingBufferMulti::Reader::PeekFrontWithPreamble(
422     std::span<byte> data,
423     uint32_t& user_preamble_out,
424     size_t& entry_bytes_read_out) {
425   entry_bytes_read_out = 0;
426   return buffer->InternalRead(
427       *this, GetOutput(data, &entry_bytes_read_out), false, &user_preamble_out);
428 }
429 
430 }  // namespace ring_buffer
431 }  // namespace pw
432