• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
16 #include <limits>
17 #include <mutex>
18 
19 #include "pw_assert/assert.h"
20 #include "pw_bytes/span.h"
21 #include "pw_function/function.h"
22 #include "pw_multisink/config.h"
23 #include "pw_result/result.h"
24 #include "pw_ring_buffer/prefixed_entry_ring_buffer.h"
25 #include "pw_status/status.h"
26 #include "pw_sync/lock_annotations.h"
27 
28 namespace pw {
29 namespace multisink {
30 
31 // An asynchronous single-writer multi-reader queue that ensures readers can
32 // poll for dropped message counts, which is useful for logging or similar
33 // scenarios where readers need to be aware of the input message sequence.
34 //
35 // This class is thread-safe but NOT IRQ-safe when
36 // PW_MULTISINK_LOCK_INTERRUPT_SAFE is disabled.
37 class MultiSink {
38  public:
39   // An asynchronous reader which is attached to a MultiSink via AttachDrain.
40   // Each Drain holds a PrefixedEntryRingBufferMulti::Reader and abstracts away
41   // entry sequence information for clients when popping.
42   class Drain {
43    public:
44     // Holds the context for a peeked entry, tha the user may pass to `PopEntry`
45     // to advance the drain.
46     class PeekedEntry {
47      public:
48       // Provides access to the peeked entry's data.
entry()49       ConstByteSpan entry() const { return entry_; }
50 
51      private:
52       friend MultiSink;
53       friend MultiSink::Drain;
54 
PeekedEntry(ConstByteSpan entry,uint32_t sequence_id)55       constexpr PeekedEntry(ConstByteSpan entry, uint32_t sequence_id)
56           : entry_(entry), sequence_id_(sequence_id) {}
57 
sequence_id()58       uint32_t sequence_id() const { return sequence_id_; }
59 
60       const ConstByteSpan entry_;
61       const uint32_t sequence_id_;
62     };
63 
Drain()64     constexpr Drain()
65         : last_handled_sequence_id_(0),
66           last_peek_sequence_id_(0),
67           last_handled_ingress_drop_count_(0),
68           multisink_(nullptr) {}
69 
70     // Returns the next available entry if it exists and acquires the latest
71     // drop count in parallel.
72     //
73     // If the read operation was successful or returned OutOfRange (i.e. no
74     // entries to read) then the `drain_drop_count_out` is set to the number of
75     // entries that were dropped since the last call to PopEntry due to
76     // advancing the drain, and `ingress_drop_count_out` is set to the number of
77     // logs that were dropped before being added to the MultiSink. Otherwise,
78     // the drop counts are set to zero, so should always be processed.
79     //
80     // Drop counts are internally maintained with a 32-bit counter. If
81     // UINT32_MAX entries have been handled by the attached multisink between
82     // subsequent calls to PopEntry, the drop count will overflow and will
83     // report a lower count erroneously. Users should ensure that sinks call
84     // PopEntry at least once every UINT32_MAX entries.
85     //
86     // Example Usage:
87     //
88     // void ProcessEntriesFromDrain(Drain& drain) {
89     //   std::array<std::byte, kEntryBufferSize> buffer;
90     //   uint32_t drop_count = 0;
91     //
92     //   // Example#1: Request the drain for a new entry.
93     //   {
94     //     const Result<ConstByteSpan> result = drain.PopEntry(buffer,
95     //                                                         drop_count);
96     //
97     //     // If a non-zero drop count is received, process them.
98     //     if (drop_count > 0) {
99     //       ProcessDropCount(drop_count);
100     //     }
101     //
102     //     // If the call was successful, process the entry that was received.
103     //     if (result.ok()) {
104     //       ProcessEntry(result.value());
105     //     }
106     //   }
107     //
108     //   // Example#2: Drain out all messages.
109     //   {
110     //     Result<ConstByteSpan> result = Status::OutOfRange();
111     //     do {
112     //       result = drain.PopEntry(buffer, drop_count);
113     //
114     //       if (drop_count > 0) {
115     //         ProcessDropCount(drop_count);
116     //       }
117     //
118     //       if (result.ok()) {
119     //         ProcessEntry(result.value());
120     //       }
121     //
122     //       // Keep trying until we hit OutOfRange. Note that a new entry may
123     //       // have arrived after the PopEntry call.
124     //     } while (!result.IsOutOfRange());
125     //   }
126     // }
127     // Precondition: the buffer data must not be corrupt, otherwise there will
128     // be a crash.
129     //
130     // Return values:
131     // OK - An entry was successfully read from the multisink.
132     // OUT_OF_RANGE - No entries were available.
133     // FAILED_PRECONDITION - The drain must be attached to a sink.
134     // RESOURCE_EXHAUSTED - The provided buffer was not large enough to store
135     // the next available entry, which was discarded.
136     Result<ConstByteSpan> PopEntry(ByteSpan buffer,
137                                    uint32_t& drain_drop_count_out,
138                                    uint32_t& ingress_drop_count_out)
139         PW_LOCKS_EXCLUDED(multisink_->lock_);
140     // Overload that combines drop counts.
141     // TODO(cachinchilla): remove when downstream projects migrated to new API.
142     [[deprecated("Use PopEntry with different drop count outputs")]] Result<
143         ConstByteSpan>
PopEntry(ByteSpan buffer,uint32_t & drop_count_out)144     PopEntry(ByteSpan buffer, uint32_t& drop_count_out)
145         PW_LOCKS_EXCLUDED(multisink_->lock_) {
146       uint32_t ingress_drop_count = 0;
147       Result<ConstByteSpan> result =
148           PopEntry(buffer, drop_count_out, ingress_drop_count);
149       drop_count_out += ingress_drop_count;
150       return result;
151     }
152 
153     // Removes the previously peeked entry from the multisink.
154     //
155     // Example Usage:
156     //
157     //  // Peek entry to send it, and remove entry from multisink on success.
158     //  uint32_t drop_count;
159     //  const Result<PeekedEntry> peek_result =
160     //      PeekEntry(out_buffer, drop_count);
161     //  if (!peek_result.ok()) {
162     //    return peek_result.status();
163     //  }
164     //  Status send_status = UserSendFunction(peek_result.value().entry())
165     //  if (!send_status.ok()) {
166     //    return send_status;
167     //  }
168     //  PW_CHECK_OK(PopEntry(peek_result.value());
169     //
170     // Precondition: the buffer data must not be corrupt, otherwise there will
171     // be a crash.
172     //
173     // Return values:
174     // OK - the entry or entries were removed from the multisink successfully.
175     // FAILED_PRECONDITION - The drain must be attached to a sink.
176     Status PopEntry(const PeekedEntry& entry)
177         PW_LOCKS_EXCLUDED(multisink_->lock_);
178 
179     // Returns a copy of the next available entry if it exists and acquires the
180     // latest drop count if the drain was advanced, and the latest ingress drop
181     // count, without moving the drain forward, except if there is a
182     // RESOURCE_EXHAUSTED error when peeking, in which case the drain is
183     // automatically advanced.
184     // The `drain_drop_count_out` follows the same logic as `PopEntry`. The user
185     // must call `PopEntry` once the data in peek was used successfully.
186     //
187     // Precondition: the buffer data must not be corrupt, otherwise there will
188     // be a crash.
189     //
190     // Return values:
191     // OK - An entry was successfully read from the multisink.
192     // OUT_OF_RANGE - No entries were available.
193     // FAILED_PRECONDITION - The drain must be attached to a sink.
194     // RESOURCE_EXHAUSTED - The provided buffer was not large enough to store
195     // the next available entry, which was discarded.
196     Result<PeekedEntry> PeekEntry(ByteSpan buffer,
197                                   uint32_t& drain_drop_count_out,
198                                   uint32_t& ingress_drop_count_out)
199         PW_LOCKS_EXCLUDED(multisink_->lock_);
200 
201     // Drains are not copyable or movable.
202     Drain(const Drain&) = delete;
203     Drain& operator=(const Drain&) = delete;
204     Drain(Drain&&) = delete;
205     Drain& operator=(Drain&&) = delete;
206 
207     // Returns size of unread entries in the sink for this drain. This is a
208     // thread-safe version and must not be used inside a
209     // Listener::OnNewEntryAvailable() to avoid deadlocks. Use
210     // UnsafeGetUnreadEntriesSize() instead in such case.
GetUnreadEntriesSize()211     size_t GetUnreadEntriesSize() const PW_LOCKS_EXCLUDED(multisink_->lock_) {
212       std::lock_guard lock(multisink_->lock_);
213       return UnsafeGetUnreadEntriesSize();
214     }
215 
216     // Returns size of unread entries in the sink for this drain.
217     // Marked unsafe because it requires external synchronization.
UnsafeGetUnreadEntriesSize()218     size_t UnsafeGetUnreadEntriesSize() const PW_NO_LOCK_SAFETY_ANALYSIS {
219       return reader_.EntriesSize();
220     }
221 
222    protected:
223     friend MultiSink;
224 
225     // The `reader_` and `last_handled_sequence_id_` are managed by attached
226     // multisink and are guarded by `multisink_->lock_` when used.
227     ring_buffer::PrefixedEntryRingBufferMulti::Reader reader_;
228     uint32_t last_handled_sequence_id_;
229     uint32_t last_peek_sequence_id_;
230     uint32_t last_handled_ingress_drop_count_;
231     MultiSink* multisink_;
232   };
233 
234   // A pure-virtual listener of a MultiSink, attached via AttachListener.
235   // MultiSink's invoke listeners when new data arrives, allowing them to
236   // schedule the draining of messages out of the MultiSink.
237   class Listener : public IntrusiveList<Listener>::Item {
238    public:
Listener()239     constexpr Listener() {}
240     virtual ~Listener() = default;
241 
242     // Listeners are not copyable or movable.
243     Listener(const Listener&) = delete;
244     Listener& operator=(const Drain&) = delete;
245     Listener(Listener&&) = delete;
246     Listener& operator=(Drain&&) = delete;
247 
248    protected:
249     friend MultiSink;
250 
251     // Invoked by the attached multisink when a new entry or drop count is
252     // available. The multisink lock is held during this call, so neither the
253     // multisink nor it's drains can be used during this callback.
254     virtual void OnNewEntryAvailable() = 0;
255   };
256 
257   class iterator {
258    public:
259     iterator& operator++() {
260       it_++;
261       return *this;
262     }
263     iterator operator++(int) {
264       iterator original = *this;
265       ++*this;
266       return original;
267     }
268 
269     ConstByteSpan& operator*() {
270       entry_ = (*it_).buffer;
271       return entry_;
272     }
273     ConstByteSpan* operator->() { return &operator*(); }
274 
275     constexpr bool operator==(const iterator& rhs) const {
276       return it_ == rhs.it_;
277     }
278 
279     constexpr bool operator!=(const iterator& rhs) const {
280       return it_ != rhs.it_;
281     }
282 
283     // Returns the status of the last iteration operation. If the iterator
284     // fails to read an entry, it will move to iterator::end() and indicate
285     // the failure reason here.
286     //
287     // Return values:
288     // OK - iteration is successful and iterator points to the next entry.
289     // DATA_LOSS - Failed to read the metadata at this location.
status()290     Status status() const { return it_.status(); }
291 
292    private:
293     friend class MultiSink;
294 
iterator(ring_buffer::PrefixedEntryRingBufferMulti::Reader & reader)295     iterator(ring_buffer::PrefixedEntryRingBufferMulti::Reader& reader)
296         : it_(reader) {}
iterator()297     iterator() {}
298 
299     ring_buffer::PrefixedEntryRingBufferMulti::iterator it_;
300     ConstByteSpan entry_;
301   };
302 
303   class UnsafeIterationWrapper {
304    public:
305     using element_type = ConstByteSpan;
306     using value_type = std::remove_cv_t<ConstByteSpan>;
307     using pointer = ConstByteSpan*;
308     using reference = ConstByteSpan&;
309     using const_iterator = iterator;  // Standard alias for iterable types.
310 
begin()311     iterator begin() const { return iterator(*reader_); }
end()312     iterator end() const { return iterator(); }
cbegin()313     const_iterator cbegin() const { return begin(); }
cend()314     const_iterator cend() const { return end(); }
315 
316    private:
317     friend class MultiSink;
UnsafeIterationWrapper(ring_buffer::PrefixedEntryRingBufferMulti::Reader & reader)318     UnsafeIterationWrapper(
319         ring_buffer::PrefixedEntryRingBufferMulti::Reader& reader)
320         : reader_(&reader) {}
321     ring_buffer::PrefixedEntryRingBufferMulti::Reader* reader_;
322   };
323 
UnsafeIteration()324   UnsafeIterationWrapper UnsafeIteration() PW_NO_LOCK_SAFETY_ANALYSIS {
325     return UnsafeIterationWrapper(oldest_entry_drain_.reader_);
326   }
327 
328   // Constructs a multisink using a ring buffer backed by the provided buffer.
329   // If we're using a virtual lock, then the lock needs to be passed in.
330 #if PW_MULTISINK_CONFIG_LOCK_TYPE == PW_MULTISINK_VIRTUAL_LOCK
MultiSink(ByteSpan buffer,LockType lock)331   MultiSink(ByteSpan buffer, LockType lock)
332       : lock_(lock),
333 #else
334   MultiSink(ByteSpan buffer)
335       :
336 #endif
337         ring_buffer_(true),
338         sequence_id_(0),
339         total_ingress_drops_(0) {
340     PW_ASSERT(ring_buffer_.SetBuffer(buffer).ok());
341     AttachDrain(oldest_entry_drain_);
342   }
343 
344   // Write an entry to the multisink. If available space is less than the
345   // size of the entry, the internal ring buffer will push the oldest entries
346   // out to make space, so long as the entry is not larger than the buffer.
347   // The sequence ID of the multisink will always increment as a result of
348   // calling HandleEntry, regardless of whether pushing the entry succeeds.
349   //
350   // Precondition: If PW_MULTISINK_LOCK_INTERRUPT_SAFE is disabled, this
351   // function must not be called from an interrupt context.
352   // Precondition: entry.size() <= `ring_buffer_` size
353   void HandleEntry(ConstByteSpan entry) PW_LOCKS_EXCLUDED(lock_);
354 
355   // Notifies the multisink of messages dropped before ingress. The writer
356   // may use this to signal to readers that an entry (or entries) failed
357   // before being sent to the multisink (e.g. the writer failed to encode
358   // the message). This API increments the sequence ID of the multisink by
359   // the provided `drop_count`.
360   void HandleDropped(uint32_t drop_count = 1) PW_LOCKS_EXCLUDED(lock_);
361 
362   // Attach a drain to the multisink. Drains may not be associated with more
363   // than one multisink at a time. Drains can consume entries pushed before
364   // the drain was attached, so long as they have not yet been evicted from
365   // the underlying ring buffer.
366   //
367   // Precondition: The drain must not be attached to a multisink.
368   void AttachDrain(Drain& drain) PW_LOCKS_EXCLUDED(lock_);
369 
370   // Detaches a drain from the multisink. Drains may only be detached if they
371   // were previously attached to this multisink.
372   //
373   // Precondition: The drain must be attached to this multisink.
374   void DetachDrain(Drain& drain) PW_LOCKS_EXCLUDED(lock_);
375 
376   // Attach a listener to the multisink. The listener will be notified
377   // immediately when attached, to allow late drain users to consume existing
378   // entries. If draining in response to the notification, ensure that the drain
379   // is attached prior to registering the listener; attempting to drain when
380   // unattached will crash. Once attached, listeners are invoked on all new
381   // messages.
382   //
383   // Precondition: The listener must not be attached to a multisink.
384   void AttachListener(Listener& listener) PW_LOCKS_EXCLUDED(lock_);
385 
386   // Detaches a listener from the multisink.
387   //
388   // Precondition: The listener must be attached to this multisink.
389   void DetachListener(Listener& listener) PW_LOCKS_EXCLUDED(lock_);
390 
391   // Removes all data from the internal buffer. The multisink's sequence ID is
392   // not modified, so readers may interpret this event as droppping entries.
393   void Clear() PW_LOCKS_EXCLUDED(lock_);
394 
395   // Uses MultiSink's unsafe iteration to dump the contents to a user-provided
396   // callback. max_num_entries can be used to limit the dump to the N most
397   // recent entries.
398   //
399   // Returns:
400   //   OK - Successfully dumped entire multisink.
401   //   DATA_LOSS - Corruption detected, some entries may have been lost.
402   Status UnsafeForEachEntry(
403       const Function<void(ConstByteSpan)>& callback,
404       size_t max_num_entries = std::numeric_limits<size_t>::max());
405 
406  protected:
407   friend Drain;
408 
409   enum class Request { kPop, kPeek };
410   // Removes the previously peeked entry from the front of the multisink.
411   Status PopEntry(Drain& drain, const Drain::PeekedEntry& entry)
412       PW_LOCKS_EXCLUDED(lock_);
413 
414   // Gets a copy of the entry from the provided drain and unpacks sequence ID
415   // information. The entry is removed from the multisink when `request` is set
416   // to `Request::kPop`. Drains use this API to strip away sequence ID
417   // information for drop calculation.
418   //
419   // Precondition: the buffer data must not be corrupt, otherwise there will
420   // be a crash.
421   //
422   // Returns:
423   // OK - An entry was successfully read from the multisink. The
424   // `drain_drop_count_out` is set to the difference between the current
425   // sequence ID and the last handled ID.
426   // FAILED_PRECONDITION - The drain is not attached to
427   // a multisink.
428   // RESOURCE_EXHAUSTED - The provided buffer was not large enough to store
429   // the next available entry, which was discarded.
430   Result<ConstByteSpan> PeekOrPopEntry(Drain& drain,
431                                        ByteSpan buffer,
432                                        Request request,
433                                        uint32_t& drain_drop_count_out,
434                                        uint32_t& ingress_drop_count_out,
435                                        uint32_t& entry_sequence_id_out)
436       PW_LOCKS_EXCLUDED(lock_);
437 
438  private:
439   // Notifies attached listeners of new entries or an updated drop count.
440   void NotifyListeners() PW_EXCLUSIVE_LOCKS_REQUIRED(lock_);
441 
442   LockType lock_;
443   IntrusiveList<Listener> listeners_ PW_GUARDED_BY(lock_);
444   ring_buffer::PrefixedEntryRingBufferMulti ring_buffer_ PW_GUARDED_BY(lock_);
445   Drain oldest_entry_drain_ PW_GUARDED_BY(lock_);
446   uint32_t sequence_id_ PW_GUARDED_BY(lock_);
447   uint32_t total_ingress_drops_ PW_GUARDED_BY(lock_);
448 };
449 
450 }  // namespace multisink
451 }  // namespace pw
452