1 // Copyright 2021 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 #include <limits> 17 #include <mutex> 18 19 #include "pw_bytes/span.h" 20 #include "pw_function/function.h" 21 #include "pw_multisink/config.h" 22 #include "pw_result/result.h" 23 #include "pw_ring_buffer/prefixed_entry_ring_buffer.h" 24 #include "pw_status/status.h" 25 #include "pw_sync/lock_annotations.h" 26 27 namespace pw { 28 namespace multisink { 29 30 // An asynchronous single-writer multi-reader queue that ensures readers can 31 // poll for dropped message counts, which is useful for logging or similar 32 // scenarios where readers need to be aware of the input message sequence. 33 // 34 // This class is thread-safe but NOT IRQ-safe when 35 // PW_MULTISINK_LOCK_INTERRUPT_SAFE is disabled. 36 class MultiSink { 37 public: 38 // An asynchronous reader which is attached to a MultiSink via AttachDrain. 39 // Each Drain holds a PrefixedEntryRingBufferMulti::Reader and abstracts away 40 // entry sequence information for clients when popping. 41 class Drain { 42 public: 43 // Holds the context for a peeked entry, tha the user may pass to `PopEntry` 44 // to advance the drain. 45 class PeekedEntry { 46 public: 47 // Provides access to the peeked entry's data. entry()48 ConstByteSpan entry() const { return entry_; } 49 50 private: 51 friend MultiSink; 52 friend MultiSink::Drain; 53 PeekedEntry(ConstByteSpan entry,uint32_t sequence_id)54 constexpr PeekedEntry(ConstByteSpan entry, uint32_t sequence_id) 55 : entry_(entry), sequence_id_(sequence_id) {} 56 sequence_id()57 uint32_t sequence_id() const { return sequence_id_; } 58 59 const ConstByteSpan entry_; 60 const uint32_t sequence_id_; 61 }; 62 Drain()63 constexpr Drain() 64 : last_handled_sequence_id_(0), 65 last_peek_sequence_id_(0), 66 last_handled_ingress_drop_count_(0), 67 multisink_(nullptr) {} 68 69 // Returns the next available entry if it exists and acquires the latest 70 // drop count in parallel. 71 // 72 // If the read operation was successful or returned OutOfRange (i.e. no 73 // entries to read) then the `drop_count_out` is set to the number of 74 // entries that were dropped since the last call to PopEntry due to 75 // advancing the drain, and `ingress_drop_count_out` is set to the number of 76 // logs that were dropped before being added to the MultiSink. Otherwise, 77 // the drop counts are set to zero, so should always be processed. 78 // 79 // Drop counts are internally maintained with a 32-bit counter. If 80 // UINT32_MAX entries have been handled by the attached multisink between 81 // subsequent calls to PopEntry, the drop count will overflow and will 82 // report a lower count erroneously. Users should ensure that sinks call 83 // PopEntry at least once every UINT32_MAX entries. 84 // 85 // Example Usage: 86 // 87 // void ProcessEntriesFromDrain(Drain& drain) { 88 // std::array<std::byte, kEntryBufferSize> buffer; 89 // uint32_t drop_count = 0; 90 // 91 // // Example#1: Request the drain for a new entry. 92 // { 93 // const Result<ConstByteSpan> result = drain.PopEntry(buffer, 94 // drop_count); 95 // 96 // // If a non-zero drop count is received, process them. 97 // if (drop_count > 0) { 98 // ProcessDropCount(drop_count); 99 // } 100 // 101 // // If the call was successful, process the entry that was received. 102 // if (result.ok()) { 103 // ProcessEntry(result.value()); 104 // } 105 // } 106 // 107 // // Example#2: Drain out all messages. 108 // { 109 // Result<ConstByteSpan> result = Status::OutOfRange(); 110 // do { 111 // result = drain.PopEntry(buffer, drop_count); 112 // 113 // if (drop_count > 0) { 114 // ProcessDropCount(drop_count); 115 // } 116 // 117 // if (result.ok()) { 118 // ProcessEntry(result.value()); 119 // } 120 // 121 // // Keep trying until we hit OutOfRange. Note that a new entry may 122 // // have arrived after the PopEntry call. 123 // } while (!result.IsOutOfRange()); 124 // } 125 // } 126 // Precondition: the buffer data must not be corrupt, otherwise there will 127 // be a crash. 128 // 129 // Return values: 130 // OK - An entry was successfully read from the multisink. 131 // OUT_OF_RANGE - No entries were available. 132 // FAILED_PRECONDITION - The drain must be attached to a sink. 133 // RESOURCE_EXHAUSTED - The provided buffer was not large enough to store 134 // the next available entry, which was discarded. 135 Result<ConstByteSpan> PopEntry(ByteSpan buffer, 136 uint32_t& drop_count_out, 137 uint32_t& ingress_drop_count) 138 PW_LOCKS_EXCLUDED(multisink_->lock_); 139 // Overload that combines drop counts. 140 // TODO(cachinchilla): remove when downstream projects migrated to new API. 141 [[deprecated("Use PopEntry with different drop count outputs")]] Result< 142 ConstByteSpan> PopEntry(ByteSpan buffer,uint32_t & drop_count_out)143 PopEntry(ByteSpan buffer, uint32_t& drop_count_out) 144 PW_LOCKS_EXCLUDED(multisink_->lock_) { 145 uint32_t ingress_drop_count = 0; 146 Result<ConstByteSpan> result = 147 PopEntry(buffer, drop_count_out, ingress_drop_count); 148 drop_count_out += ingress_drop_count; 149 return result; 150 } 151 152 // Removes the previously peeked entry from the multisink. 153 // 154 // Example Usage: 155 // 156 // // Peek entry to send it, and remove entry from multisink on success. 157 // uint32_t drop_count; 158 // const Result<PeekedEntry> peek_result = 159 // PeekEntry(out_buffer, drop_count); 160 // if (!peek_result.ok()) { 161 // return peek_result.status(); 162 // } 163 // Status send_status = UserSendFunction(peek_result.value().entry()) 164 // if (!send_status.ok()) 165 // return send_status; 166 // } 167 // PW_CHECK_OK(PopEntry(peek_result.value()); 168 // 169 // Precondition: the buffer data must not be corrupt, otherwise there will 170 // be a crash. 171 // 172 // Return values: 173 // OK - the entry or entries were removed from the multisink succesfully. 174 // FAILED_PRECONDITION - The drain must be attached to a sink. 175 Status PopEntry(const PeekedEntry& entry) 176 PW_LOCKS_EXCLUDED(multisink_->lock_); 177 178 // Returns a copy of the next available entry if it exists and acquires the 179 // latest drop count if the drain was advanced, and the latest ingress drop 180 // count, without moving the drain forward, except if there is a 181 // RESOURCE_EXHAUSTED error when peeking, in which case the drain is 182 // automatically advanced. 183 // The `drop_count_out` follows the same logic as `PopEntry`. The user must 184 // call `PopEntry` once the data in peek was used successfully. 185 // 186 // Precondition: the buffer data must not be corrupt, otherwise there will 187 // be a crash. 188 // 189 // Return values: 190 // OK - An entry was successfully read from the multisink. 191 // OUT_OF_RANGE - No entries were available. 192 // FAILED_PRECONDITION - The drain must be attached to a sink. 193 // RESOURCE_EXHAUSTED - The provided buffer was not large enough to store 194 // the next available entry, which was discarded. 195 Result<PeekedEntry> PeekEntry(ByteSpan buffer, 196 uint32_t& drop_count_out, 197 uint32_t& ingress_drop_count) 198 PW_LOCKS_EXCLUDED(multisink_->lock_); 199 200 // Drains are not copyable or movable. 201 Drain(const Drain&) = delete; 202 Drain& operator=(const Drain&) = delete; 203 Drain(Drain&&) = delete; 204 Drain& operator=(Drain&&) = delete; 205 206 protected: 207 friend MultiSink; 208 209 // The `reader_` and `last_handled_sequence_id_` are managed by attached 210 // multisink and are guarded by `multisink_->lock_` when used. 211 ring_buffer::PrefixedEntryRingBufferMulti::Reader reader_; 212 uint32_t last_handled_sequence_id_; 213 uint32_t last_peek_sequence_id_; 214 uint32_t last_handled_ingress_drop_count_; 215 MultiSink* multisink_; 216 }; 217 218 // A pure-virtual listener of a MultiSink, attached via AttachListener. 219 // MultiSink's invoke listeners when new data arrives, allowing them to 220 // schedule the draining of messages out of the MultiSink. 221 class Listener : public IntrusiveList<Listener>::Item { 222 public: Listener()223 constexpr Listener() {} 224 virtual ~Listener() = default; 225 226 // Listeners are not copyable or movable. 227 Listener(const Listener&) = delete; 228 Listener& operator=(const Drain&) = delete; 229 Listener(Listener&&) = delete; 230 Listener& operator=(Drain&&) = delete; 231 232 protected: 233 friend MultiSink; 234 235 // Invoked by the attached multisink when a new entry or drop count is 236 // available. The multisink lock is held during this call, so neither the 237 // multisink nor it's drains can be used during this callback. 238 virtual void OnNewEntryAvailable() = 0; 239 }; 240 241 class iterator { 242 public: 243 iterator& operator++() { 244 it_++; 245 return *this; 246 } 247 iterator operator++(int) { 248 iterator original = *this; 249 ++*this; 250 return original; 251 } 252 253 ConstByteSpan& operator*() { 254 entry_ = (*it_).buffer; 255 return entry_; 256 } 257 ConstByteSpan* operator->() { return &operator*(); } 258 259 constexpr bool operator==(const iterator& rhs) const { 260 return it_ == rhs.it_; 261 } 262 263 constexpr bool operator!=(const iterator& rhs) const { 264 return it_ != rhs.it_; 265 } 266 267 // Returns the status of the last iteration operation. If the iterator 268 // fails to read an entry, it will move to iterator::end() and indicate 269 // the failure reason here. 270 // 271 // Return values: 272 // OK - iteration is successful and iterator points to the next entry. 273 // DATA_LOSS - Failed to read the metadata at this location. status()274 Status status() const { return it_.status(); } 275 276 private: 277 friend class MultiSink; 278 iterator(ring_buffer::PrefixedEntryRingBufferMulti::Reader & reader)279 iterator(ring_buffer::PrefixedEntryRingBufferMulti::Reader& reader) 280 : it_(reader) {} iterator()281 iterator() {} 282 283 ring_buffer::PrefixedEntryRingBufferMulti::iterator it_; 284 ConstByteSpan entry_; 285 }; 286 287 class UnsafeIterationWrapper { 288 public: 289 using element_type = ConstByteSpan; 290 using value_type = std::remove_cv_t<ConstByteSpan>; 291 using pointer = ConstByteSpan*; 292 using reference = ConstByteSpan&; 293 using const_iterator = iterator; // Standard alias for iterable types. 294 begin()295 iterator begin() const { return iterator(*reader_); } end()296 iterator end() const { return iterator(); } cbegin()297 const_iterator cbegin() const { return begin(); } cend()298 const_iterator cend() const { return end(); } 299 300 private: 301 friend class MultiSink; UnsafeIterationWrapper(ring_buffer::PrefixedEntryRingBufferMulti::Reader & reader)302 UnsafeIterationWrapper( 303 ring_buffer::PrefixedEntryRingBufferMulti::Reader& reader) 304 : reader_(&reader) {} 305 ring_buffer::PrefixedEntryRingBufferMulti::Reader* reader_; 306 }; 307 UnsafeIteration()308 UnsafeIterationWrapper UnsafeIteration() PW_NO_LOCK_SAFETY_ANALYSIS { 309 return UnsafeIterationWrapper(oldest_entry_drain_.reader_); 310 } 311 312 // Constructs a multisink using a ring buffer backed by the provided buffer. MultiSink(ByteSpan buffer)313 MultiSink(ByteSpan buffer) 314 : ring_buffer_(true), sequence_id_(0), total_ingress_drops_(0) { 315 ring_buffer_.SetBuffer(buffer) 316 .IgnoreError(); // TODO(pwbug/387): Handle Status properly 317 AttachDrain(oldest_entry_drain_); 318 } 319 320 // Write an entry to the multisink. If available space is less than the 321 // size of the entry, the internal ring buffer will push the oldest entries 322 // out to make space, so long as the entry is not larger than the buffer. 323 // The sequence ID of the multisink will always increment as a result of 324 // calling HandleEntry, regardless of whether pushing the entry succeeds. 325 // 326 // Precondition: If PW_MULTISINK_LOCK_INTERRUPT_SAFE is disabled, this 327 // function must not be called from an interrupt context. 328 // Precondition: entry.size() <= `ring_buffer_` size 329 void HandleEntry(ConstByteSpan entry) PW_LOCKS_EXCLUDED(lock_); 330 331 // Notifies the multisink of messages dropped before ingress. The writer 332 // may use this to signal to readers that an entry (or entries) failed 333 // before being sent to the multisink (e.g. the writer failed to encode 334 // the message). This API increments the sequence ID of the multisink by 335 // the provided `drop_count`. 336 void HandleDropped(uint32_t drop_count = 1) PW_LOCKS_EXCLUDED(lock_); 337 338 // Attach a drain to the multisink. Drains may not be associated with more 339 // than one multisink at a time. Drains can consume entries pushed before 340 // the drain was attached, so long as they have not yet been evicted from 341 // the underlying ring buffer. 342 // 343 // Precondition: The drain must not be attached to a multisink. 344 void AttachDrain(Drain& drain) PW_LOCKS_EXCLUDED(lock_); 345 346 // Detaches a drain from the multisink. Drains may only be detached if they 347 // were previously attached to this multisink. 348 // 349 // Precondition: The drain must be attached to this multisink. 350 void DetachDrain(Drain& drain) PW_LOCKS_EXCLUDED(lock_); 351 352 // Attach a listener to the multisink. The listener will be notified 353 // immediately when attached, to allow late drain users to consume existing 354 // entries. If draining in response to the notification, ensure that the drain 355 // is attached prior to registering the listener; attempting to drain when 356 // unattached will crash. Once attached, listeners are invoked on all new 357 // messages. 358 // 359 // Precondition: The listener must not be attached to a multisink. 360 void AttachListener(Listener& listener) PW_LOCKS_EXCLUDED(lock_); 361 362 // Detaches a listener from the multisink. 363 // 364 // Precondition: The listener must be attached to this multisink. 365 void DetachListener(Listener& listener) PW_LOCKS_EXCLUDED(lock_); 366 367 // Removes all data from the internal buffer. The multisink's sequence ID is 368 // not modified, so readers may interpret this event as droppping entries. 369 void Clear() PW_LOCKS_EXCLUDED(lock_); 370 371 // Uses MultiSink's unsafe iteration to dump the contents to a user-provided 372 // callback. max_num_entries can be used to limit the dump to the N most 373 // recent entries. 374 // 375 // Returns: 376 // OK - Successfully dumped entire multisink. 377 // DATA_LOSS - Corruption detected, some entries may have been lost. 378 Status UnsafeForEachEntry( 379 const Function<void(ConstByteSpan)>& callback, 380 size_t max_num_entries = std::numeric_limits<size_t>::max()); 381 382 protected: 383 friend Drain; 384 385 enum class Request { kPop, kPeek }; 386 // Removes the previously peeked entry from the front of the multisink. 387 Status PopEntry(Drain& drain, const Drain::PeekedEntry& entry) 388 PW_LOCKS_EXCLUDED(lock_); 389 390 // Gets a copy of the entry from the provided drain and unpacks sequence ID 391 // information. The entry is removed from the multisink when `request` is set 392 // to `Request::kPop`. Drains use this API to strip away sequence ID 393 // information for drop calculation. 394 // 395 // Precondition: the buffer data must not be corrupt, otherwise there will 396 // be a crash. 397 // 398 // Returns: 399 // OK - An entry was successfully read from the multisink. The 400 // `drop_count_out` is set to the difference between the current sequence ID 401 // and the last handled ID. 402 // FAILED_PRECONDITION - The drain is not attached to 403 // a multisink. 404 // RESOURCE_EXHAUSTED - The provided buffer was not large enough to store 405 // the next available entry, which was discarded. 406 Result<ConstByteSpan> PeekOrPopEntry(Drain& drain, 407 ByteSpan buffer, 408 Request request, 409 uint32_t& drop_count_out, 410 uint32_t& ingress_drop_count_out, 411 uint32_t& entry_sequence_id_out) 412 PW_LOCKS_EXCLUDED(lock_); 413 414 private: 415 // Notifies attached listeners of new entries or an updated drop count. 416 void NotifyListeners() PW_EXCLUSIVE_LOCKS_REQUIRED(lock_); 417 418 IntrusiveList<Listener> listeners_ PW_GUARDED_BY(lock_); 419 ring_buffer::PrefixedEntryRingBufferMulti ring_buffer_ PW_GUARDED_BY(lock_); 420 Drain oldest_entry_drain_ PW_GUARDED_BY(lock_); 421 uint32_t sequence_id_ PW_GUARDED_BY(lock_); 422 uint32_t total_ingress_drops_ PW_GUARDED_BY(lock_); 423 LockType lock_; 424 }; 425 426 } // namespace multisink 427 } // namespace pw 428