• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
16 #include "pw_bytes/span.h"
17 #include "pw_result/result.h"
18 #include "pw_ring_buffer/prefixed_entry_ring_buffer.h"
19 #include "pw_status/status.h"
20 
21 namespace pw {
22 namespace multisink {
23 class Drain;
24 
25 // An asynchronous single-writer multi-reader queue that ensures readers can
26 // poll for dropped message counts, which is useful for logging or similar
27 // scenarios where readers need to be aware of the input message sequence.
28 // TODO(pwbug/342): Support notifying readers when the queue is readable,
29 // rather than requiring them to poll to check for new entries.
30 // TODO(pwbug/343): Add thread-safety, separate from the thread-safety work
31 // planned for the underlying ring buffer.
32 class MultiSink {
33  public:
34   // Constructs a multisink using a ring buffer backed by the provided buffer.
MultiSink(ByteSpan buffer)35   MultiSink(ByteSpan buffer) : ring_buffer_(true), sequence_id_(0) {
36     ring_buffer_.SetBuffer(buffer);
37   }
38 
39   // Write an entry to the multisink. If available space is less than the
40   // size of the entry, the internal ring buffer will push the oldest entries
41   // out to make space, so long as the entry is not larger than the buffer.
42   // The sequence ID of the multisink will always increment as a result of
43   // calling HandleEntry, regardless of whether pushing the entry succeeds.
44   //
45   // Return values:
46   // Ok - Entry was successfully pushed to the ring buffer.
47   // InvalidArgument - Size of data to write is zero bytes.
48   // OutOfRange - Size of data is greater than buffer size.
49   // FailedPrecondition - Buffer was not initialized.
HandleEntry(ConstByteSpan entry)50   Status HandleEntry(ConstByteSpan entry) {
51     return ring_buffer_.PushBack(entry, sequence_id_++);
52   }
53 
54   // Notifies the multisink of messages dropped before ingress. The writer
55   // may use this to signal to readers that an entry (or entries) failed
56   // before being sent to the multisink (e.g. the writer failed to encode
57   // the message). This API increments the sequence ID of the multisink by
58   // the provided `drop_count`.
59   void HandleDropped(uint32_t drop_count = 1) { sequence_id_ += drop_count; }
60 
61   // Attach a drain to the multisink. Drains may not be associated with more
62   // than one multisink at a time. Entries pushed before the drain was attached
63   // are not seen by the drain, so drains should be attached before entries
64   // are pushed.
65   //
66   // Return values:
67   // Ok - Drain was successfully attached.
68   // InvalidArgument - Drain is currently associated with another multisink.
69   Status AttachDrain(Drain& drain);
70 
71   // Detaches a drain from the multisink. Drains may only be detached if they
72   // were previously attached to this multisink.
73   //
74   // Return values:
75   // Ok - Drain was successfully detached.
76   // InvalidArgument - Drain is not currently associated with this multisink.
77   Status DetachDrain(Drain& drain);
78 
79   // Removes all data from the internal buffer. The multisink's sequence ID is
80   // not modified, so readers may interpret this event as droppping entries.
Clear()81   void Clear() { ring_buffer_.Clear(); }
82 
83  protected:
84   friend Drain;
85   // Gets an entry from the provided drain and unpacks sequence ID information.
86   // Drains use this API to strip away sequence ID information for drop
87   // calculation.
88   //
89   // Returns:
90   // Ok - An entry was successfully read from the multisink. The `sequence_id`
91   // is set to the ID encoded in the oldest entry.
92   // FailedPrecondition - The drain is not attached to a multisink.
93   // ResourceExhausted - The provided buffer was not large enough to store
94   // the next available entry.
95   // DataLoss - An entry was read from the multisink, but did not contains an
96   // encoded sequence ID.
97   static Result<ConstByteSpan> GetEntry(Drain& drain,
98                                         ByteSpan buffer,
99                                         uint32_t& sequence_id_out);
100 
101  private:
102   ring_buffer::PrefixedEntryRingBufferMulti ring_buffer_;
103   uint32_t sequence_id_ = 0;
104 };
105 
106 }  // namespace multisink
107 }  // namespace pw
108