1 // Copyright 2021 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 #include "pw_bytes/span.h" 17 #include "pw_result/result.h" 18 #include "pw_ring_buffer/prefixed_entry_ring_buffer.h" 19 #include "pw_status/status.h" 20 21 namespace pw { 22 namespace multisink { 23 class Drain; 24 25 // An asynchronous single-writer multi-reader queue that ensures readers can 26 // poll for dropped message counts, which is useful for logging or similar 27 // scenarios where readers need to be aware of the input message sequence. 28 // TODO(pwbug/342): Support notifying readers when the queue is readable, 29 // rather than requiring them to poll to check for new entries. 30 // TODO(pwbug/343): Add thread-safety, separate from the thread-safety work 31 // planned for the underlying ring buffer. 32 class MultiSink { 33 public: 34 // Constructs a multisink using a ring buffer backed by the provided buffer. MultiSink(ByteSpan buffer)35 MultiSink(ByteSpan buffer) : ring_buffer_(true), sequence_id_(0) { 36 ring_buffer_.SetBuffer(buffer); 37 } 38 39 // Write an entry to the multisink. If available space is less than the 40 // size of the entry, the internal ring buffer will push the oldest entries 41 // out to make space, so long as the entry is not larger than the buffer. 42 // The sequence ID of the multisink will always increment as a result of 43 // calling HandleEntry, regardless of whether pushing the entry succeeds. 44 // 45 // Return values: 46 // Ok - Entry was successfully pushed to the ring buffer. 47 // InvalidArgument - Size of data to write is zero bytes. 48 // OutOfRange - Size of data is greater than buffer size. 49 // FailedPrecondition - Buffer was not initialized. HandleEntry(ConstByteSpan entry)50 Status HandleEntry(ConstByteSpan entry) { 51 return ring_buffer_.PushBack(entry, sequence_id_++); 52 } 53 54 // Notifies the multisink of messages dropped before ingress. The writer 55 // may use this to signal to readers that an entry (or entries) failed 56 // before being sent to the multisink (e.g. the writer failed to encode 57 // the message). This API increments the sequence ID of the multisink by 58 // the provided `drop_count`. 59 void HandleDropped(uint32_t drop_count = 1) { sequence_id_ += drop_count; } 60 61 // Attach a drain to the multisink. Drains may not be associated with more 62 // than one multisink at a time. Entries pushed before the drain was attached 63 // are not seen by the drain, so drains should be attached before entries 64 // are pushed. 65 // 66 // Return values: 67 // Ok - Drain was successfully attached. 68 // InvalidArgument - Drain is currently associated with another multisink. 69 Status AttachDrain(Drain& drain); 70 71 // Detaches a drain from the multisink. Drains may only be detached if they 72 // were previously attached to this multisink. 73 // 74 // Return values: 75 // Ok - Drain was successfully detached. 76 // InvalidArgument - Drain is not currently associated with this multisink. 77 Status DetachDrain(Drain& drain); 78 79 // Removes all data from the internal buffer. The multisink's sequence ID is 80 // not modified, so readers may interpret this event as droppping entries. Clear()81 void Clear() { ring_buffer_.Clear(); } 82 83 protected: 84 friend Drain; 85 // Gets an entry from the provided drain and unpacks sequence ID information. 86 // Drains use this API to strip away sequence ID information for drop 87 // calculation. 88 // 89 // Returns: 90 // Ok - An entry was successfully read from the multisink. The `sequence_id` 91 // is set to the ID encoded in the oldest entry. 92 // FailedPrecondition - The drain is not attached to a multisink. 93 // ResourceExhausted - The provided buffer was not large enough to store 94 // the next available entry. 95 // DataLoss - An entry was read from the multisink, but did not contains an 96 // encoded sequence ID. 97 static Result<ConstByteSpan> GetEntry(Drain& drain, 98 ByteSpan buffer, 99 uint32_t& sequence_id_out); 100 101 private: 102 ring_buffer::PrefixedEntryRingBufferMulti ring_buffer_; 103 uint32_t sequence_id_ = 0; 104 }; 105 106 } // namespace multisink 107 } // namespace pw 108