• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
16 #include <limits>
17 #include <mutex>
18 
19 #include "pw_bytes/span.h"
20 #include "pw_function/function.h"
21 #include "pw_multisink/config.h"
22 #include "pw_result/result.h"
23 #include "pw_ring_buffer/prefixed_entry_ring_buffer.h"
24 #include "pw_status/status.h"
25 #include "pw_sync/lock_annotations.h"
26 
27 namespace pw {
28 namespace multisink {
29 
30 // An asynchronous single-writer multi-reader queue that ensures readers can
31 // poll for dropped message counts, which is useful for logging or similar
32 // scenarios where readers need to be aware of the input message sequence.
33 //
34 // This class is thread-safe but NOT IRQ-safe when
35 // PW_MULTISINK_LOCK_INTERRUPT_SAFE is disabled.
36 class MultiSink {
37  public:
38   // An asynchronous reader which is attached to a MultiSink via AttachDrain.
39   // Each Drain holds a PrefixedEntryRingBufferMulti::Reader and abstracts away
40   // entry sequence information for clients when popping.
41   class Drain {
42    public:
43     // Holds the context for a peeked entry, tha the user may pass to `PopEntry`
44     // to advance the drain.
45     class PeekedEntry {
46      public:
47       // Provides access to the peeked entry's data.
entry()48       ConstByteSpan entry() const { return entry_; }
49 
50      private:
51       friend MultiSink;
52       friend MultiSink::Drain;
53 
PeekedEntry(ConstByteSpan entry,uint32_t sequence_id)54       constexpr PeekedEntry(ConstByteSpan entry, uint32_t sequence_id)
55           : entry_(entry), sequence_id_(sequence_id) {}
56 
sequence_id()57       uint32_t sequence_id() const { return sequence_id_; }
58 
59       const ConstByteSpan entry_;
60       const uint32_t sequence_id_;
61     };
62 
Drain()63     constexpr Drain()
64         : last_handled_sequence_id_(0),
65           last_peek_sequence_id_(0),
66           last_handled_ingress_drop_count_(0),
67           multisink_(nullptr) {}
68 
69     // Returns the next available entry if it exists and acquires the latest
70     // drop count in parallel.
71     //
72     // If the read operation was successful or returned OutOfRange (i.e. no
73     // entries to read) then the `drop_count_out` is set to the number of
74     // entries that were dropped since the last call to PopEntry due to
75     // advancing the drain, and `ingress_drop_count_out` is set to the number of
76     // logs that were dropped before being added to the MultiSink. Otherwise,
77     // the drop counts are set to zero, so should always be processed.
78     //
79     // Drop counts are internally maintained with a 32-bit counter. If
80     // UINT32_MAX entries have been handled by the attached multisink between
81     // subsequent calls to PopEntry, the drop count will overflow and will
82     // report a lower count erroneously. Users should ensure that sinks call
83     // PopEntry at least once every UINT32_MAX entries.
84     //
85     // Example Usage:
86     //
87     // void ProcessEntriesFromDrain(Drain& drain) {
88     //   std::array<std::byte, kEntryBufferSize> buffer;
89     //   uint32_t drop_count = 0;
90     //
91     //   // Example#1: Request the drain for a new entry.
92     //   {
93     //     const Result<ConstByteSpan> result = drain.PopEntry(buffer,
94     //                                                         drop_count);
95     //
96     //     // If a non-zero drop count is received, process them.
97     //     if (drop_count > 0) {
98     //       ProcessDropCount(drop_count);
99     //     }
100     //
101     //     // If the call was successful, process the entry that was received.
102     //     if (result.ok()) {
103     //       ProcessEntry(result.value());
104     //     }
105     //   }
106     //
107     //   // Example#2: Drain out all messages.
108     //   {
109     //     Result<ConstByteSpan> result = Status::OutOfRange();
110     //     do {
111     //       result = drain.PopEntry(buffer, drop_count);
112     //
113     //       if (drop_count > 0) {
114     //         ProcessDropCount(drop_count);
115     //       }
116     //
117     //       if (result.ok()) {
118     //         ProcessEntry(result.value());
119     //       }
120     //
121     //       // Keep trying until we hit OutOfRange. Note that a new entry may
122     //       // have arrived after the PopEntry call.
123     //     } while (!result.IsOutOfRange());
124     //   }
125     // }
126     // Precondition: the buffer data must not be corrupt, otherwise there will
127     // be a crash.
128     //
129     // Return values:
130     // OK - An entry was successfully read from the multisink.
131     // OUT_OF_RANGE - No entries were available.
132     // FAILED_PRECONDITION - The drain must be attached to a sink.
133     // RESOURCE_EXHAUSTED - The provided buffer was not large enough to store
134     // the next available entry, which was discarded.
135     Result<ConstByteSpan> PopEntry(ByteSpan buffer,
136                                    uint32_t& drop_count_out,
137                                    uint32_t& ingress_drop_count)
138         PW_LOCKS_EXCLUDED(multisink_->lock_);
139     // Overload that combines drop counts.
140     // TODO(cachinchilla): remove when downstream projects migrated to new API.
141     [[deprecated("Use PopEntry with different drop count outputs")]] Result<
142         ConstByteSpan>
PopEntry(ByteSpan buffer,uint32_t & drop_count_out)143     PopEntry(ByteSpan buffer, uint32_t& drop_count_out)
144         PW_LOCKS_EXCLUDED(multisink_->lock_) {
145       uint32_t ingress_drop_count = 0;
146       Result<ConstByteSpan> result =
147           PopEntry(buffer, drop_count_out, ingress_drop_count);
148       drop_count_out += ingress_drop_count;
149       return result;
150     }
151 
152     // Removes the previously peeked entry from the multisink.
153     //
154     // Example Usage:
155     //
156     //  // Peek entry to send it, and remove entry from multisink on success.
157     //  uint32_t drop_count;
158     //  const Result<PeekedEntry> peek_result =
159     //      PeekEntry(out_buffer, drop_count);
160     //  if (!peek_result.ok()) {
161     //    return peek_result.status();
162     //  }
163     //  Status send_status = UserSendFunction(peek_result.value().entry())
164     //  if (!send_status.ok())
165     //    return send_status;
166     //  }
167     //  PW_CHECK_OK(PopEntry(peek_result.value());
168     //
169     // Precondition: the buffer data must not be corrupt, otherwise there will
170     // be a crash.
171     //
172     // Return values:
173     // OK - the entry or entries were removed from the multisink succesfully.
174     // FAILED_PRECONDITION - The drain must be attached to a sink.
175     Status PopEntry(const PeekedEntry& entry)
176         PW_LOCKS_EXCLUDED(multisink_->lock_);
177 
178     // Returns a copy of the next available entry if it exists and acquires the
179     // latest drop count if the drain was advanced, and the latest ingress drop
180     // count, without moving the drain forward, except if there is a
181     // RESOURCE_EXHAUSTED error when peeking, in which case the drain is
182     // automatically advanced.
183     // The `drop_count_out` follows the same logic as `PopEntry`. The user must
184     // call `PopEntry` once the data in peek was used successfully.
185     //
186     // Precondition: the buffer data must not be corrupt, otherwise there will
187     // be a crash.
188     //
189     // Return values:
190     // OK - An entry was successfully read from the multisink.
191     // OUT_OF_RANGE - No entries were available.
192     // FAILED_PRECONDITION - The drain must be attached to a sink.
193     // RESOURCE_EXHAUSTED - The provided buffer was not large enough to store
194     // the next available entry, which was discarded.
195     Result<PeekedEntry> PeekEntry(ByteSpan buffer,
196                                   uint32_t& drop_count_out,
197                                   uint32_t& ingress_drop_count)
198         PW_LOCKS_EXCLUDED(multisink_->lock_);
199 
200     // Drains are not copyable or movable.
201     Drain(const Drain&) = delete;
202     Drain& operator=(const Drain&) = delete;
203     Drain(Drain&&) = delete;
204     Drain& operator=(Drain&&) = delete;
205 
206    protected:
207     friend MultiSink;
208 
209     // The `reader_` and `last_handled_sequence_id_` are managed by attached
210     // multisink and are guarded by `multisink_->lock_` when used.
211     ring_buffer::PrefixedEntryRingBufferMulti::Reader reader_;
212     uint32_t last_handled_sequence_id_;
213     uint32_t last_peek_sequence_id_;
214     uint32_t last_handled_ingress_drop_count_;
215     MultiSink* multisink_;
216   };
217 
218   // A pure-virtual listener of a MultiSink, attached via AttachListener.
219   // MultiSink's invoke listeners when new data arrives, allowing them to
220   // schedule the draining of messages out of the MultiSink.
221   class Listener : public IntrusiveList<Listener>::Item {
222    public:
Listener()223     constexpr Listener() {}
224     virtual ~Listener() = default;
225 
226     // Listeners are not copyable or movable.
227     Listener(const Listener&) = delete;
228     Listener& operator=(const Drain&) = delete;
229     Listener(Listener&&) = delete;
230     Listener& operator=(Drain&&) = delete;
231 
232    protected:
233     friend MultiSink;
234 
235     // Invoked by the attached multisink when a new entry or drop count is
236     // available. The multisink lock is held during this call, so neither the
237     // multisink nor it's drains can be used during this callback.
238     virtual void OnNewEntryAvailable() = 0;
239   };
240 
241   class iterator {
242    public:
243     iterator& operator++() {
244       it_++;
245       return *this;
246     }
247     iterator operator++(int) {
248       iterator original = *this;
249       ++*this;
250       return original;
251     }
252 
253     ConstByteSpan& operator*() {
254       entry_ = (*it_).buffer;
255       return entry_;
256     }
257     ConstByteSpan* operator->() { return &operator*(); }
258 
259     constexpr bool operator==(const iterator& rhs) const {
260       return it_ == rhs.it_;
261     }
262 
263     constexpr bool operator!=(const iterator& rhs) const {
264       return it_ != rhs.it_;
265     }
266 
267     // Returns the status of the last iteration operation. If the iterator
268     // fails to read an entry, it will move to iterator::end() and indicate
269     // the failure reason here.
270     //
271     // Return values:
272     // OK - iteration is successful and iterator points to the next entry.
273     // DATA_LOSS - Failed to read the metadata at this location.
status()274     Status status() const { return it_.status(); }
275 
276    private:
277     friend class MultiSink;
278 
iterator(ring_buffer::PrefixedEntryRingBufferMulti::Reader & reader)279     iterator(ring_buffer::PrefixedEntryRingBufferMulti::Reader& reader)
280         : it_(reader) {}
iterator()281     iterator() {}
282 
283     ring_buffer::PrefixedEntryRingBufferMulti::iterator it_;
284     ConstByteSpan entry_;
285   };
286 
287   class UnsafeIterationWrapper {
288    public:
289     using element_type = ConstByteSpan;
290     using value_type = std::remove_cv_t<ConstByteSpan>;
291     using pointer = ConstByteSpan*;
292     using reference = ConstByteSpan&;
293     using const_iterator = iterator;  // Standard alias for iterable types.
294 
begin()295     iterator begin() const { return iterator(*reader_); }
end()296     iterator end() const { return iterator(); }
cbegin()297     const_iterator cbegin() const { return begin(); }
cend()298     const_iterator cend() const { return end(); }
299 
300    private:
301     friend class MultiSink;
UnsafeIterationWrapper(ring_buffer::PrefixedEntryRingBufferMulti::Reader & reader)302     UnsafeIterationWrapper(
303         ring_buffer::PrefixedEntryRingBufferMulti::Reader& reader)
304         : reader_(&reader) {}
305     ring_buffer::PrefixedEntryRingBufferMulti::Reader* reader_;
306   };
307 
UnsafeIteration()308   UnsafeIterationWrapper UnsafeIteration() PW_NO_LOCK_SAFETY_ANALYSIS {
309     return UnsafeIterationWrapper(oldest_entry_drain_.reader_);
310   }
311 
312   // Constructs a multisink using a ring buffer backed by the provided buffer.
MultiSink(ByteSpan buffer)313   MultiSink(ByteSpan buffer)
314       : ring_buffer_(true), sequence_id_(0), total_ingress_drops_(0) {
315     ring_buffer_.SetBuffer(buffer)
316         .IgnoreError();  // TODO(pwbug/387): Handle Status properly
317     AttachDrain(oldest_entry_drain_);
318   }
319 
320   // Write an entry to the multisink. If available space is less than the
321   // size of the entry, the internal ring buffer will push the oldest entries
322   // out to make space, so long as the entry is not larger than the buffer.
323   // The sequence ID of the multisink will always increment as a result of
324   // calling HandleEntry, regardless of whether pushing the entry succeeds.
325   //
326   // Precondition: If PW_MULTISINK_LOCK_INTERRUPT_SAFE is disabled, this
327   // function must not be called from an interrupt context.
328   // Precondition: entry.size() <= `ring_buffer_` size
329   void HandleEntry(ConstByteSpan entry) PW_LOCKS_EXCLUDED(lock_);
330 
331   // Notifies the multisink of messages dropped before ingress. The writer
332   // may use this to signal to readers that an entry (or entries) failed
333   // before being sent to the multisink (e.g. the writer failed to encode
334   // the message). This API increments the sequence ID of the multisink by
335   // the provided `drop_count`.
336   void HandleDropped(uint32_t drop_count = 1) PW_LOCKS_EXCLUDED(lock_);
337 
338   // Attach a drain to the multisink. Drains may not be associated with more
339   // than one multisink at a time. Drains can consume entries pushed before
340   // the drain was attached, so long as they have not yet been evicted from
341   // the underlying ring buffer.
342   //
343   // Precondition: The drain must not be attached to a multisink.
344   void AttachDrain(Drain& drain) PW_LOCKS_EXCLUDED(lock_);
345 
346   // Detaches a drain from the multisink. Drains may only be detached if they
347   // were previously attached to this multisink.
348   //
349   // Precondition: The drain must be attached to this multisink.
350   void DetachDrain(Drain& drain) PW_LOCKS_EXCLUDED(lock_);
351 
352   // Attach a listener to the multisink. The listener will be notified
353   // immediately when attached, to allow late drain users to consume existing
354   // entries. If draining in response to the notification, ensure that the drain
355   // is attached prior to registering the listener; attempting to drain when
356   // unattached will crash. Once attached, listeners are invoked on all new
357   // messages.
358   //
359   // Precondition: The listener must not be attached to a multisink.
360   void AttachListener(Listener& listener) PW_LOCKS_EXCLUDED(lock_);
361 
362   // Detaches a listener from the multisink.
363   //
364   // Precondition: The listener must be attached to this multisink.
365   void DetachListener(Listener& listener) PW_LOCKS_EXCLUDED(lock_);
366 
367   // Removes all data from the internal buffer. The multisink's sequence ID is
368   // not modified, so readers may interpret this event as droppping entries.
369   void Clear() PW_LOCKS_EXCLUDED(lock_);
370 
371   // Uses MultiSink's unsafe iteration to dump the contents to a user-provided
372   // callback. max_num_entries can be used to limit the dump to the N most
373   // recent entries.
374   //
375   // Returns:
376   //   OK - Successfully dumped entire multisink.
377   //   DATA_LOSS - Corruption detected, some entries may have been lost.
378   Status UnsafeForEachEntry(
379       const Function<void(ConstByteSpan)>& callback,
380       size_t max_num_entries = std::numeric_limits<size_t>::max());
381 
382  protected:
383   friend Drain;
384 
385   enum class Request { kPop, kPeek };
386   // Removes the previously peeked entry from the front of the multisink.
387   Status PopEntry(Drain& drain, const Drain::PeekedEntry& entry)
388       PW_LOCKS_EXCLUDED(lock_);
389 
390   // Gets a copy of the entry from the provided drain and unpacks sequence ID
391   // information. The entry is removed from the multisink when `request` is set
392   // to `Request::kPop`. Drains use this API to strip away sequence ID
393   // information for drop calculation.
394   //
395   // Precondition: the buffer data must not be corrupt, otherwise there will
396   // be a crash.
397   //
398   // Returns:
399   // OK - An entry was successfully read from the multisink. The
400   // `drop_count_out` is set to the difference between the current sequence ID
401   // and the last handled ID.
402   // FAILED_PRECONDITION - The drain is not attached to
403   // a multisink.
404   // RESOURCE_EXHAUSTED - The provided buffer was not large enough to store
405   // the next available entry, which was discarded.
406   Result<ConstByteSpan> PeekOrPopEntry(Drain& drain,
407                                        ByteSpan buffer,
408                                        Request request,
409                                        uint32_t& drop_count_out,
410                                        uint32_t& ingress_drop_count_out,
411                                        uint32_t& entry_sequence_id_out)
412       PW_LOCKS_EXCLUDED(lock_);
413 
414  private:
415   // Notifies attached listeners of new entries or an updated drop count.
416   void NotifyListeners() PW_EXCLUSIVE_LOCKS_REQUIRED(lock_);
417 
418   IntrusiveList<Listener> listeners_ PW_GUARDED_BY(lock_);
419   ring_buffer::PrefixedEntryRingBufferMulti ring_buffer_ PW_GUARDED_BY(lock_);
420   Drain oldest_entry_drain_ PW_GUARDED_BY(lock_);
421   uint32_t sequence_id_ PW_GUARDED_BY(lock_);
422   uint32_t total_ingress_drops_ PW_GUARDED_BY(lock_);
423   LockType lock_;
424 };
425 
426 }  // namespace multisink
427 }  // namespace pw
428