1 // Copyright 2023 The Pigweed Authors 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not 4 // use this file except in compliance with the License. You may obtain a copy of 5 // the License at 6 // 7 // https://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12 // License for the specific language governing permissions and limitations under 13 // the License. 14 #pragma once 15 16 /// @file 17 /// This file defines types related to a multi-producer, single-consumer stream. 18 /// 19 /// The single readers must be constructed in place, while writers can be moved. 20 /// A reader and writer may be connected using `CreateMpscStream()`. Additional 21 /// writers may be connected by copying a previously connected writer. 22 /// 23 /// Example: 24 /// 25 /// @code{.cpp} 26 /// void WriteThreadRoutine(void* arg) { 27 /// auto *writer = static_cast<MpscWriter *>(arg); 28 /// ConstByteSpan data = GenerateSomeData(); 29 /// Status status = writer->Write(data); 30 /// ... 31 /// } 32 /// ... 33 /// MpscReader reader; 34 /// MpscWriter writer; 35 /// CreateMpscStream(reader, writer); 36 /// thread::Thread t(MakeThreadOptions(), WriteThreadRoutine, &writer); 37 /// std::byte buffer[kBufSize]; 38 /// if (auto status = reader.Read(ByteSpan(buffer)); status.ok()) { 39 /// ProcessSomeData(buffer); 40 /// } 41 /// @endcode 42 /// 43 /// See the `MpscReader::ReadAll()` for additional examples. 44 /// 45 /// The types in the files are designed to be used across different threads, 46 /// but are not completely thread-safe. Data must only be written by an 47 /// MpscWriter using a single thread, and data must only be read by an 48 /// MpscReader using a single thread. In other words, multiple calls to 49 /// `Write()` must not be made concurrently, and multiple calls to `Read()` and 50 /// `ReadAll()` must not be made concurrently. Calls to other methods, e.g. 51 /// `Close()`, are thread-safe and may be made from any thread. 52 53 #include <cstddef> 54 55 #include "pw_bytes/span.h" 56 #include "pw_chrono/system_clock.h" 57 #include "pw_containers/intrusive_list.h" 58 #include "pw_function/function.h" 59 #include "pw_status/status.h" 60 #include "pw_status/status_with_size.h" 61 #include "pw_stream/stream.h" 62 #include "pw_sync/lock_annotations.h" 63 #include "pw_sync/mutex.h" 64 #include "pw_sync/timed_thread_notification.h" 65 66 namespace pw::stream { 67 68 // Forward declaration. 69 class MpscReader; 70 class MpscWriter; 71 72 /// Creates a multi-producer, single consumer stream. 73 /// 74 /// This method creates a stream by associating a reader and writer. Both are 75 /// reset before being connected. This is the only way to connect a reader. 76 /// Additional writers may be connected by copying the given writer after it is 77 /// connected. 78 /// 79 /// This method is thread-safe with respect to other MpscReader and MpscWriter 80 /// methods. It is not thread-safe with respect to itself, i.e. callers must 81 /// not make concurrent calls to `CreateMpscStream()` from different threads 82 /// with the same objects. 83 /// 84 /// @param[out] reader The reader to connect. 85 /// @param[out] writer The writer to connect. 86 void CreateMpscStream(MpscReader& reader, MpscWriter& writer); 87 88 /// Writer for a multi-producer, single consumer stream. 89 /// 90 /// This class has a default constructor that only produces disconnected 91 /// writers. To connect writers, use `CreateMpscStream()`. Additional connected 92 /// writers can be created by copying an existing one. 93 /// 94 /// Each thread should have its own dedicated writer. This class is thread-safe 95 /// with respect to the reader, but not with respect to itself. In particular, 96 /// attempting to call `Write()` concurrently on different threads may cause 97 /// result in a failure. 98 class MpscWriter : public NonSeekableWriter, 99 public IntrusiveList<MpscWriter>::Item { 100 public: 101 using duration = std::optional<chrono::SystemClock::duration>; 102 103 /// A per-writer thread notification that can be added to a reader's list. 104 /// 105 /// The reader maintains a list of outstanding requests to write data. As 106 /// data is read, and space to write data becomes available, it uses these 107 /// requests to signal the waiting the writers. 108 struct Request : public IntrusiveList<Request>::Item { 109 sync::TimedThreadNotification notification; 110 using IntrusiveList<Request>::Item::unlisted; 111 }; 112 113 MpscWriter() = default; 114 MpscWriter(const MpscWriter& other); 115 MpscWriter& operator=(const MpscWriter& other); 116 MpscWriter(MpscWriter&& other); 117 MpscWriter& operator=(MpscWriter&& other); 118 ~MpscWriter() override; 119 120 /// Returns whether this object is connected to a reader. 121 bool connected() const PW_LOCKS_EXCLUDED(mutex_); 122 123 /// Indicates how much data was sent in the last call to `Write()`. 124 size_t last_write() const PW_LOCKS_EXCLUDED(mutex_); 125 126 /// Returns the optional maximum time elapsed before a `Write()` fails. 127 const duration& timeout() const PW_LOCKS_EXCLUDED(mutex_); 128 129 /// Set the timeout for writing to this stream. 130 /// 131 /// After setting a timeout, if the given duration elapses while making a call 132 /// to `Write()`, @pw_status{RESOURCE_EXHAUSTED} will be returned. If desired, 133 /// a timeout should be set before calling `Write()`. Setting a timeout when a 134 /// writer is awaiting notification from a reader will not affect the duration 135 /// of that wait. 136 /// 137 /// Note that setting a write timeout makes partial writes possible. For 138 /// example, if a call to `Write()` of some length corresponds to 2 calls to 139 /// `Read()` of half that length with an sufficient delay between the calls 140 /// will result in the first half being written and read, but not the second. 141 /// This differs from `Stream::Write()` which stipulates that no data is 142 /// written on failure. If this happens, the length of the data written can be 143 /// retrieved using `last_write()`. 144 /// 145 /// Generally, callers should use one of three approaches: 146 /// 1. Do not set a write timeout, and let writers block arbitrarily long 147 /// until space is available or the reader is disconnected. 148 /// 2. Use only a single writer, and use `last_write()` to resend data. 149 /// 3. Structure the data being sent so that the reader can always read 150 /// complete messages and avoid blocking or performing complex work 151 /// mid-message. 152 /// 153 /// @param[in] timeout The duration to wait before returning an error. 154 void SetTimeout(const duration& timeout) PW_LOCKS_EXCLUDED(mutex_); 155 156 /// Sets the maximum amount that can be written by this writer. 157 /// 158 /// By default, writers can write an unlimited amount of data. This method can 159 /// be used to set a limit, or remove it by providing a value of 160 /// Stream::kUnlimited. 161 /// 162 /// If a limit is set, the writer will automatically close once it has written 163 /// that much data. The current number of bytes remaining until the limit is 164 /// reached can be retrieved using `ConservativeWriteLimit()`. 165 /// 166 /// @param[in] limit The maximum amount that can be written by this writer. 167 void SetLimit(size_t limit) PW_LOCKS_EXCLUDED(mutex_); 168 169 /// Disconnects this writer from its reader. 170 /// 171 /// This method does nothing if the writer is not connected. 172 void Close() PW_LOCKS_EXCLUDED(mutex_); 173 174 private: 175 // The factory method is allowed to directly modify a writer to connect it 176 // to the reader. 177 friend void CreateMpscStream(MpscReader&, MpscWriter&); 178 179 /// @copydoc Stream::ConservativeLimit 180 size_t ConservativeLimit(LimitType type) const override; 181 182 /// @copydoc Stream::DoWrite 183 /// 184 /// This method is *not* thread-safe with respect to itself. If multiple 185 /// threads attempt to write concurrently using the same writer, those calls 186 /// may fail. Instead, each thread should have its own writer. 187 /// 188 /// @pre No other thread has called `Write()` on this object. 189 Status DoWrite(ConstByteSpan data) override; 190 191 /// Locked implementation of `Close()`. 192 void CloseLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_); 193 194 mutable sync::Mutex mutex_; 195 MpscReader* reader_ PW_GUARDED_BY(mutex_) = nullptr; 196 size_t limit_ PW_GUARDED_BY(mutex_) = kUnlimited; 197 Request write_request_; 198 duration timeout_ PW_GUARDED_BY(mutex_); 199 size_t last_write_ PW_GUARDED_BY(mutex_) = 0; 200 }; 201 202 /// Reader of a multi-producer, single-consumer stream. 203 /// 204 /// The reader manages 3 aspects of the stream: 205 /// * The storage used to hold written data that is to be read. 206 /// * The list of connected writers. 207 /// * Accounting for how much data has and can be written. 208 /// 209 /// This class has a default constructor that can only produce a disconnected 210 /// reader. To connect a reader, use `CreateMpscStream()`. 211 class MpscReader : public NonSeekableReader { 212 public: 213 using duration = std::optional<chrono::SystemClock::duration>; 214 215 MpscReader(); 216 ~MpscReader() override; 217 218 /// Returns whether this object has any connected writers. 219 bool connected() const PW_LOCKS_EXCLUDED(mutex_); 220 221 /// Set the timeout for reading from this stream. 222 /// 223 /// After setting a timeout, if the given duration elapses while making a call 224 /// to `Read()`, RESOURCE_EXHAUSTED will be returned. If desired, a timeout 225 /// should be set before calling `Read()` or `ReadAll()`. Setting a timeout 226 /// when a reader is awaiting notification from a writer will not affect the 227 /// duration of that wait. `ReadUntilClose()` ignores timeouts entirely. 228 /// 229 /// @param[in] timeout The duration to wait before returning an error. 230 void SetTimeout(const duration& timeout) PW_LOCKS_EXCLUDED(mutex_); 231 232 /// Associates the reader with storage to buffer written data to be read. 233 /// 234 /// If desired, callers can use this method to buffer written data. This can 235 /// improve writer performance by allowing calls to `WriteData()` to avoid 236 /// waiting for the reader, albeit at the cost of increased memory. This can 237 /// be useful when the reader needs time to process the data it reads, or when 238 /// the volume of writes varies over time, i.e. is "bursty". 239 /// 240 /// The reader does not take ownership of the storage, which must be valid 241 /// until a call to the destructor or another call to `SetBuffer()`. 242 /// 243 /// @param[in] buffer A view to the storage. 244 void SetBuffer(ByteSpan buffer) PW_LOCKS_EXCLUDED(mutex_); 245 246 /// @fn ReadAll 247 /// Reads data in a loop and passes it to a provided callback. 248 /// 249 /// This will read continuously until all connected writers close. 250 /// 251 /// Example usage: 252 /// 253 /// @code(.cpp} 254 /// MpscReader reader; 255 /// MpscWriter writer; 256 /// MpscStreamCreate(reader, writer); 257 /// thread::Thread t(MakeThreadOptions(), [] (void*arg) { 258 /// auto *writer = static_cast<MpscWriter *>(arg); 259 /// writer->Write(GenerateSomeData()).IgnoreError(); 260 /// }, &writer); 261 /// auto status = reader.ReadAll([] (ConstByteSpan data) { 262 /// return ProcessSomeData(); 263 /// }); 264 /// t.join(); 265 /// @endcode 266 /// 267 /// @param[in] callback A callable object to invoke on data as it is read. 268 /// @retval OK Successfully read until writers closed. 269 /// @retval FAILED_PRECONDITION The object does not have a buffer. 270 /// @retval RESOURCE_EXHAUSTED Timed out when reading data. This can only 271 /// occur if a timeout has been set. 272 /// @retval Any other error as returned by the callback. 273 using ReadAllCallback = Function<Status(ConstByteSpan data)>; 274 Status ReadAll(ReadAllCallback callback) PW_LOCKS_EXCLUDED(mutex_); 275 276 /// Disconnects all writers and drops any unread data. 277 void Close() PW_LOCKS_EXCLUDED(mutex_); 278 279 private: 280 // The factory method is allowed to directly modify the reader to connect it 281 // to a writer. 282 friend void CreateMpscStream(MpscReader&, MpscWriter&); 283 284 // The writer is allowed to call directly into the reader to: 285 // * Add/remove itself to the reader's list of writer. 286 // * Request space to write data, and to write to that space. 287 friend class MpscWriter; 288 289 /// @fn IncreaseLimit 290 /// @fn IncreaseLimitLocked 291 /// Increases the number of remaining bytes to be written. 292 /// 293 /// Used by `MpscWriter::SetLimit()` and `MpscWriter::WriteData()`. 294 /// 295 /// @param[in] delta How much to increase the number of remaining bytes. 296 void IncreaseLimit(size_t delta) PW_LOCKS_EXCLUDED(mutex_); 297 void IncreaseLimitLocked(size_t delta) PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_); 298 299 /// @fn DecreaseLimit 300 /// @fn DecreaseLimitLocked 301 /// Decreases the number of remaining bytes to be written. 302 /// 303 /// Used by `MpscWriter::SetLimit()` and `MpscWriter::RemoveWriter()`. 304 /// 305 /// @param[in] delta How much to decrease the number of remaining bytes. 306 void DecreaseLimit(size_t delta) PW_LOCKS_EXCLUDED(mutex_); 307 void DecreaseLimitLocked(size_t delta) PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_); 308 309 /// @copydoc Stream::ConservativeLimit 310 size_t ConservativeLimit(Stream::LimitType type) const override 311 PW_LOCKS_EXCLUDED(mutex_); 312 313 /// Adds the write request to the reader's list of pending requests. 314 /// 315 /// Used by `MpscWriter::WriteData()`. 316 /// 317 /// @param[in] write_request A writer's request object. 318 void RequestWrite(MpscWriter::Request& write_request) 319 PW_LOCKS_EXCLUDED(mutex_); 320 321 /// Checks if a writer can write data, and signals it if so. 322 /// 323 /// A reader may signal a writer because: 324 /// * Space to write data has become available. 325 /// * The queue of write requests has changed. 326 /// * The reader is closing. `WriteData()` will return OUT_OF_RANGE. 327 void CheckWriteableLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_); 328 329 /// Adds data from a writer to the buffer to be read. 330 /// 331 /// @param[in] data The data to be written. 332 /// @param[in] limit The writer's current write limit. 333 /// 334 /// @retval OK Data was written to the buffer. 335 /// @retval RESOURCE_EXHAUSTED Buffer has insufficent space for data. 336 /// @retval OUT_OF_RANGE Stream is shut down or closed. 337 StatusWithSize WriteData(ConstByteSpan data, size_t limit) 338 PW_LOCKS_EXCLUDED(mutex_); 339 340 /// @fn CompleteWrite 341 /// @fn CompleteWriteLocked 342 /// Removes the write request from the reader's list of pending requests. 343 /// 344 /// Used by `MpscWriter::WriteData()` and `MpscWriter::CloseLocked()`. 345 /// 346 /// @param[in] write_request A writer's request object. 347 void CompleteWrite(MpscWriter::Request& write_request) 348 PW_LOCKS_EXCLUDED(mutex_); 349 void CompleteWriteLocked(MpscWriter::Request& write_request) 350 PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_); 351 352 /// @copydoc Stream::DoRead 353 StatusWithSize DoRead(ByteSpan destination) override 354 PW_LOCKS_EXCLUDED(mutex_); 355 356 // Locked implementations. 357 358 mutable sync::Mutex mutex_; 359 IntrusiveList<MpscWriter> writers_ PW_GUARDED_BY(mutex_); 360 IntrusiveList<MpscWriter::Request> write_requests_ PW_GUARDED_BY(mutex_); 361 IntrusiveList<MpscWriter::Request>::iterator last_request_ 362 PW_GUARDED_BY(mutex_); 363 364 size_t num_unlimited_ PW_GUARDED_BY(mutex_) = 0; 365 size_t limit_ PW_GUARDED_BY(mutex_) = 0; 366 367 bool reading_ PW_GUARDED_BY(mutex_) = false; 368 sync::TimedThreadNotification readable_; 369 sync::ThreadNotification closeable_; 370 duration timeout_ PW_GUARDED_BY(mutex_); 371 372 ByteSpan destination_ PW_GUARDED_BY(mutex_); 373 size_t written_ PW_GUARDED_BY(mutex_) = 0; 374 375 ByteSpan buffer_ PW_GUARDED_BY(mutex_); 376 size_t offset_ PW_GUARDED_BY(mutex_) = 0; 377 size_t length_ PW_GUARDED_BY(mutex_) = 0; 378 }; 379 380 /// Reader for a multi-producer, single consumer stream. 381 /// 382 /// This class includes an explicitly-sized buffer. It has a default constructor 383 /// that can only produce a disconnected reader. To connect a reader, use 384 /// `CreateMpscStream()`. 385 template <size_t kCapacity> 386 class BufferedMpscReader : public MpscReader { 387 public: BufferedMpscReader()388 BufferedMpscReader() { SetBuffer(buffer_); } 389 390 private: 391 std::array<std::byte, kCapacity> buffer_; 392 }; 393 394 } // namespace pw::stream 395