• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 #pragma once
15 
16 /// @file
17 /// This file defines types related to a multi-producer, single-consumer stream.
18 ///
19 /// The single readers must be constructed in place, while writers can be moved.
20 /// A reader and writer may be connected using `CreateMpscStream()`. Additional
21 /// writers may be connected by copying a previously connected writer.
22 ///
23 /// Example:
24 ///
25 /// @code{.cpp}
26 ///    void WriteThreadRoutine(void* arg) {
27 ///      auto *writer = static_cast<MpscWriter *>(arg);
28 ///      ConstByteSpan data = GenerateSomeData();
29 ///      Status status = writer->Write(data);
30 ///      ...
31 ///    }
32 ///    ...
33 ///    MpscReader reader;
34 ///    MpscWriter writer;
35 ///    CreateMpscStream(reader, writer);
36 ///    thread::Thread t(MakeThreadOptions(), WriteThreadRoutine, &writer);
37 ///    std::byte buffer[kBufSize];
38 ///    if (auto status = reader.Read(ByteSpan(buffer)); status.ok()) {
39 ///      ProcessSomeData(buffer);
40 ///    }
41 /// @endcode
42 ///
43 /// See the `MpscReader::ReadAll()` for additional examples.
44 ///
45 /// The types in the files are designed to be used across different threads,
46 /// but are not completely thread-safe. Data must only be written by an
47 /// MpscWriter using a single thread, and data must only be read by an
48 /// MpscReader using a single thread. In other words, multiple calls to
49 /// `Write()` must not be made concurrently, and multiple calls to `Read()` and
50 /// `ReadAll()` must not be made concurrently. Calls to other methods, e.g.
51 /// `Close()`, are thread-safe and may be made from any thread.
52 
53 #include <cstddef>
54 
55 #include "pw_bytes/span.h"
56 #include "pw_chrono/system_clock.h"
57 #include "pw_containers/intrusive_list.h"
58 #include "pw_function/function.h"
59 #include "pw_status/status.h"
60 #include "pw_status/status_with_size.h"
61 #include "pw_stream/stream.h"
62 #include "pw_sync/lock_annotations.h"
63 #include "pw_sync/mutex.h"
64 #include "pw_sync/timed_thread_notification.h"
65 
66 namespace pw::stream {
67 
68 // Forward declaration.
69 class MpscReader;
70 class MpscWriter;
71 
72 /// Creates a multi-producer, single consumer stream.
73 ///
74 /// This method creates a stream by associating a reader and writer. Both are
75 /// reset before being connected. This is the only way to connect a reader.
76 /// Additional writers may be connected by copying the given writer after it is
77 /// connected.
78 ///
79 /// This method is thread-safe with respect to other MpscReader and MpscWriter
80 /// methods. It is not thread-safe with respect to itself, i.e. callers must
81 /// not make concurrent calls to `CreateMpscStream()` from different threads
82 /// with the same objects.
83 ///
84 /// @param[out]   reader  The reader to connect.
85 /// @param[out]   writer  The writer to connect.
86 void CreateMpscStream(MpscReader& reader, MpscWriter& writer);
87 
88 /// Writer for a multi-producer, single consumer stream.
89 ///
90 /// This class has a default constructor that only produces disconnected
91 /// writers. To connect writers, use `CreateMpscStream()`. Additional connected
92 /// writers can be created by copying an existing one.
93 ///
94 /// Each thread should have its own dedicated writer. This class is thread-safe
95 /// with respect to the reader, but not with respect to itself. In particular,
96 /// attempting to call `Write()` concurrently on different threads may cause
97 /// result in a failure.
98 class MpscWriter : public NonSeekableWriter,
99                    public IntrusiveList<MpscWriter>::Item {
100  public:
101   using duration = std::optional<chrono::SystemClock::duration>;
102 
103   /// A per-writer thread notification that can be added to a reader's list.
104   ///
105   /// The reader maintains a list of outstanding requests to write data. As
106   /// data is read, and space to write data becomes available, it uses these
107   /// requests to signal the waiting the writers.
108   struct Request : public IntrusiveList<Request>::Item {
109     sync::TimedThreadNotification notification;
110     using IntrusiveList<Request>::Item::unlisted;
111   };
112 
113   MpscWriter() = default;
114   MpscWriter(const MpscWriter& other);
115   MpscWriter& operator=(const MpscWriter& other);
116   MpscWriter(MpscWriter&& other);
117   MpscWriter& operator=(MpscWriter&& other);
118   ~MpscWriter() override;
119 
120   /// Returns whether this object is connected to a reader.
121   bool connected() const PW_LOCKS_EXCLUDED(mutex_);
122 
123   /// Indicates how much data was sent in the last call to `Write()`.
124   size_t last_write() const PW_LOCKS_EXCLUDED(mutex_);
125 
126   /// Returns the optional maximum time elapsed before a `Write()` fails.
127   const duration& timeout() const PW_LOCKS_EXCLUDED(mutex_);
128 
129   /// Set the timeout for writing to this stream.
130   ///
131   /// After setting a timeout, if the given duration elapses while making a call
132   /// to `Write()`, @pw_status{RESOURCE_EXHAUSTED} will be returned. If desired,
133   /// a timeout should be set before calling `Write()`. Setting a timeout when a
134   /// writer is awaiting notification from a reader will not affect the duration
135   /// of that wait.
136   ///
137   /// Note that setting a write timeout makes partial writes possible. For
138   /// example, if a call to `Write()` of some length corresponds to 2 calls to
139   /// `Read()` of half that length with an sufficient delay between the calls
140   /// will result in the first half being written and read, but not the second.
141   /// This differs from `Stream::Write()` which stipulates that no data is
142   /// written on failure. If this happens, the length of the data written can be
143   /// retrieved using `last_write()`.
144   ///
145   /// Generally, callers should use one of three approaches:
146   ///  1. Do not set a write timeout, and let writers block arbitrarily long
147   ///     until space is available or the reader is disconnected.
148   ///  2. Use only a single writer, and use `last_write()` to resend data.
149   ///  3. Structure the data being sent so that the reader can always read
150   ///     complete messages and avoid blocking or performing complex work
151   ///     mid-message.
152   ///
153   /// @param[in]  timeout   The duration to wait before returning an error.
154   void SetTimeout(const duration& timeout) PW_LOCKS_EXCLUDED(mutex_);
155 
156   /// Sets the maximum amount that can be written by this writer.
157   ///
158   /// By default, writers can write an unlimited amount of data. This method can
159   /// be used to set a limit, or remove it by providing a value of
160   /// Stream::kUnlimited.
161   ///
162   /// If a limit is set, the writer will automatically close once it has written
163   /// that much data. The current number of bytes remaining until the limit is
164   /// reached can be retrieved using `ConservativeWriteLimit()`.
165   ///
166   /// @param[in]  limit   The maximum amount that can be written by this writer.
167   void SetLimit(size_t limit) PW_LOCKS_EXCLUDED(mutex_);
168 
169   /// Disconnects this writer from its reader.
170   ///
171   /// This method does nothing if the writer is not connected.
172   void Close() PW_LOCKS_EXCLUDED(mutex_);
173 
174  private:
175   // The factory method is allowed to directly modify a writer to connect it
176   // to the reader.
177   friend void CreateMpscStream(MpscReader&, MpscWriter&);
178 
179   /// @copydoc Stream::ConservativeLimit
180   size_t ConservativeLimit(LimitType type) const override;
181 
182   /// @copydoc Stream::DoWrite
183   ///
184   /// This method is *not* thread-safe with respect to itself. If multiple
185   /// threads attempt to write concurrently using the same writer, those calls
186   /// may fail. Instead, each thread should have its own writer.
187   ///
188   /// @pre No other thread has called `Write()` on this object.
189   Status DoWrite(ConstByteSpan data) override;
190 
191   /// Locked implementation of `Close()`.
192   void CloseLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
193 
194   mutable sync::Mutex mutex_;
195   MpscReader* reader_ PW_GUARDED_BY(mutex_) = nullptr;
196   size_t limit_ PW_GUARDED_BY(mutex_) = kUnlimited;
197   Request write_request_;
198   duration timeout_ PW_GUARDED_BY(mutex_);
199   size_t last_write_ PW_GUARDED_BY(mutex_) = 0;
200 };
201 
202 /// Reader of a multi-producer, single-consumer stream.
203 ///
204 /// The reader manages 3 aspects of the stream:
205 ///   * The storage used to hold written data that is to be read.
206 ///   * The list of connected writers.
207 ///   * Accounting for how much data has and can be written.
208 ///
209 /// This class has a default constructor that can only produce a disconnected
210 /// reader. To connect a reader, use `CreateMpscStream()`.
211 class MpscReader : public NonSeekableReader {
212  public:
213   using duration = std::optional<chrono::SystemClock::duration>;
214 
215   MpscReader();
216   ~MpscReader() override;
217 
218   /// Returns whether this object has any connected writers.
219   bool connected() const PW_LOCKS_EXCLUDED(mutex_);
220 
221   /// Set the timeout for reading from this stream.
222   ///
223   /// After setting a timeout, if the given duration elapses while making a call
224   /// to `Read()`, RESOURCE_EXHAUSTED will be returned. If desired, a timeout
225   /// should be set before calling `Read()` or `ReadAll()`. Setting a timeout
226   /// when a reader is awaiting notification from a writer will not affect the
227   /// duration of that wait. `ReadUntilClose()` ignores timeouts entirely.
228   ///
229   /// @param[in]  timeout   The duration to wait before returning an error.
230   void SetTimeout(const duration& timeout) PW_LOCKS_EXCLUDED(mutex_);
231 
232   /// Associates the reader with storage to buffer written data to be read.
233   ///
234   /// If desired, callers can use this method to buffer written data. This can
235   /// improve writer performance by allowing calls to `WriteData()` to avoid
236   /// waiting for the reader, albeit at the cost of increased memory. This can
237   /// be useful when the reader needs time to process the data it reads, or when
238   /// the volume of writes varies over time, i.e. is "bursty".
239   ///
240   /// The reader does not take ownership of the storage, which must be valid
241   /// until a call to the destructor or another call to `SetBuffer()`.
242   ///
243   /// @param[in]  buffer  A view to the storage.
244   void SetBuffer(ByteSpan buffer) PW_LOCKS_EXCLUDED(mutex_);
245 
246   /// @fn ReadAll
247   /// Reads data in a loop and passes it to a provided callback.
248   ///
249   /// This will read continuously until all connected writers close.
250   ///
251   /// Example usage:
252   ///
253   /// @code(.cpp}
254   ///    MpscReader reader;
255   ///    MpscWriter writer;
256   ///    MpscStreamCreate(reader, writer);
257   ///    thread::Thread t(MakeThreadOptions(), [] (void*arg) {
258   ///      auto *writer = static_cast<MpscWriter *>(arg);
259   ///      writer->Write(GenerateSomeData()).IgnoreError();
260   ///    }, &writer);
261   ///    auto status = reader.ReadAll([] (ConstByteSpan data) {
262   ///      return ProcessSomeData();
263   ///    });
264   ///    t.join();
265   /// @endcode
266   ///
267   /// @param[in]  callback  A callable object to invoke on data as it is read.
268   /// @retval     OK                  Successfully read until writers closed.
269   /// @retval     FAILED_PRECONDITION The object does not have a buffer.
270   /// @retval     RESOURCE_EXHAUSTED  Timed out when reading data. This can only
271   ///                                 occur if a timeout has been set.
272   /// @retval     Any other error as returned by the callback.
273   using ReadAllCallback = Function<Status(ConstByteSpan data)>;
274   Status ReadAll(ReadAllCallback callback) PW_LOCKS_EXCLUDED(mutex_);
275 
276   /// Disconnects all writers and drops any unread data.
277   void Close() PW_LOCKS_EXCLUDED(mutex_);
278 
279  private:
280   // The factory method is allowed to directly modify the reader to connect it
281   // to a writer.
282   friend void CreateMpscStream(MpscReader&, MpscWriter&);
283 
284   // The writer is allowed to call directly into the reader to:
285   //  * Add/remove itself to the reader's list of writer.
286   //  * Request space to write data, and to write to that space.
287   friend class MpscWriter;
288 
289   /// @fn IncreaseLimit
290   /// @fn IncreaseLimitLocked
291   /// Increases the number of remaining bytes to be written.
292   ///
293   /// Used by `MpscWriter::SetLimit()` and `MpscWriter::WriteData()`.
294   ///
295   /// @param[in]  delta   How much to increase the number of remaining bytes.
296   void IncreaseLimit(size_t delta) PW_LOCKS_EXCLUDED(mutex_);
297   void IncreaseLimitLocked(size_t delta) PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
298 
299   /// @fn DecreaseLimit
300   /// @fn DecreaseLimitLocked
301   /// Decreases the number of remaining bytes to be written.
302   ///
303   /// Used by `MpscWriter::SetLimit()` and `MpscWriter::RemoveWriter()`.
304   ///
305   /// @param[in]  delta   How much to decrease the number of remaining bytes.
306   void DecreaseLimit(size_t delta) PW_LOCKS_EXCLUDED(mutex_);
307   void DecreaseLimitLocked(size_t delta) PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
308 
309   /// @copydoc Stream::ConservativeLimit
310   size_t ConservativeLimit(Stream::LimitType type) const override
311       PW_LOCKS_EXCLUDED(mutex_);
312 
313   /// Adds the write request to the reader's list of pending requests.
314   ///
315   /// Used by `MpscWriter::WriteData()`.
316   ///
317   /// @param[in]  write_request   A writer's request object.
318   void RequestWrite(MpscWriter::Request& write_request)
319       PW_LOCKS_EXCLUDED(mutex_);
320 
321   /// Checks if a writer can write data, and signals it if so.
322   ///
323   /// A reader may signal a writer because:
324   ///   * Space to write data has become available.
325   ///   * The queue of write requests has changed.
326   ///   * The reader is closing. `WriteData()` will return OUT_OF_RANGE.
327   void CheckWriteableLocked() PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
328 
329   /// Adds data from a writer to the buffer to be read.
330   ///
331   /// @param[in]  data            The data to be written.
332   /// @param[in]  limit           The writer's current write limit.
333   ///
334   /// @retval OK                  Data was written to the buffer.
335   /// @retval RESOURCE_EXHAUSTED  Buffer has insufficent space for data.
336   /// @retval OUT_OF_RANGE        Stream is shut down or closed.
337   StatusWithSize WriteData(ConstByteSpan data, size_t limit)
338       PW_LOCKS_EXCLUDED(mutex_);
339 
340   /// @fn CompleteWrite
341   /// @fn CompleteWriteLocked
342   /// Removes the write request from the reader's list of pending requests.
343   ///
344   /// Used by `MpscWriter::WriteData()` and `MpscWriter::CloseLocked()`.
345   ///
346   /// @param[in]  write_request   A writer's request object.
347   void CompleteWrite(MpscWriter::Request& write_request)
348       PW_LOCKS_EXCLUDED(mutex_);
349   void CompleteWriteLocked(MpscWriter::Request& write_request)
350       PW_EXCLUSIVE_LOCKS_REQUIRED(mutex_);
351 
352   /// @copydoc Stream::DoRead
353   StatusWithSize DoRead(ByteSpan destination) override
354       PW_LOCKS_EXCLUDED(mutex_);
355 
356   // Locked implementations.
357 
358   mutable sync::Mutex mutex_;
359   IntrusiveList<MpscWriter> writers_ PW_GUARDED_BY(mutex_);
360   IntrusiveList<MpscWriter::Request> write_requests_ PW_GUARDED_BY(mutex_);
361   IntrusiveList<MpscWriter::Request>::iterator last_request_
362       PW_GUARDED_BY(mutex_);
363 
364   size_t num_unlimited_ PW_GUARDED_BY(mutex_) = 0;
365   size_t limit_ PW_GUARDED_BY(mutex_) = 0;
366 
367   bool reading_ PW_GUARDED_BY(mutex_) = false;
368   sync::TimedThreadNotification readable_;
369   sync::ThreadNotification closeable_;
370   duration timeout_ PW_GUARDED_BY(mutex_);
371 
372   ByteSpan destination_ PW_GUARDED_BY(mutex_);
373   size_t written_ PW_GUARDED_BY(mutex_) = 0;
374 
375   ByteSpan buffer_ PW_GUARDED_BY(mutex_);
376   size_t offset_ PW_GUARDED_BY(mutex_) = 0;
377   size_t length_ PW_GUARDED_BY(mutex_) = 0;
378 };
379 
380 /// Reader for a multi-producer, single consumer stream.
381 ///
382 /// This class includes an explicitly-sized buffer. It has a default constructor
383 /// that can only produce a disconnected reader. To connect a reader, use
384 /// `CreateMpscStream()`.
385 template <size_t kCapacity>
386 class BufferedMpscReader : public MpscReader {
387  public:
BufferedMpscReader()388   BufferedMpscReader() { SetBuffer(buffer_); }
389 
390  private:
391   std::array<std::byte, kCapacity> buffer_;
392 };
393 
394 }  // namespace pw::stream
395