• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 //     https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14 
15 #include "pw_stream/mpsc_stream.h"
16 
17 #include <cstring>
18 
19 #include "pw_assert/check.h"
20 
21 namespace pw::stream {
22 namespace {
23 
24 // Wait to receive a thread notification with an optional timeout.
Await(sync::TimedThreadNotification & notification,const std::optional<chrono::SystemClock::duration> & timeout)25 bool Await(sync::TimedThreadNotification& notification,
26            const std::optional<chrono::SystemClock::duration>& timeout) {
27   if (timeout.has_value()) {
28     return notification.try_acquire_for(*timeout);
29   }
30   // Block indefinitely.
31   notification.acquire();
32   return true;
33 }
34 
35 }  // namespace
36 
CreateMpscStream(MpscReader & reader,MpscWriter & writer)37 void CreateMpscStream(MpscReader& reader, MpscWriter& writer) {
38   reader.Close();
39   std::lock_guard rlock(reader.mutex_);
40   PW_CHECK(reader.writers_.empty());
41   std::lock_guard wlock(writer.mutex_);
42   writer.CloseLocked();
43   reader.writers_.push_front(writer);
44   reader.IncreaseLimitLocked(Stream::kUnlimited);
45   writer.reader_ = &reader;
46 }
47 
48 ////////////////////////////////////////////////////////////////////////////////
49 // MpscWriter methods.
50 
MpscWriter(const MpscWriter & other)51 MpscWriter::MpscWriter(const MpscWriter& other) { *this = other; }
52 
operator =(const MpscWriter & other)53 MpscWriter& MpscWriter::operator=(const MpscWriter& other) {
54   Close();
55 
56   // Read the other object's internal state. Avoid holding both locks at once.
57   other.mutex_.lock();
58   MpscReader* reader = other.reader_;
59   duration timeout = other.timeout_;
60   size_t limit = other.limit_;
61   size_t last_write = other.last_write_;
62   other.mutex_.unlock();
63 
64   // Now update this object with the other's state.
65   mutex_.lock();
66   reader_ = reader;
67   timeout_ = timeout;
68   limit_ = limit;
69   last_write_ = last_write;
70   mutex_.unlock();
71 
72   // Add the writer to the reader outside the lock. If the reader was closed
73   // concurrently, this will close the writer.
74   if (reader != nullptr) {
75     std::lock_guard lock(reader->mutex_);
76     reader->writers_.push_front(*this);
77     reader->IncreaseLimitLocked(limit);
78   }
79   return *this;
80 }
81 
MpscWriter(MpscWriter && other)82 MpscWriter::MpscWriter(MpscWriter&& other) : MpscWriter() {
83   *this = std::move(other);
84 }
85 
operator =(MpscWriter && other)86 MpscWriter& MpscWriter::operator=(MpscWriter&& other) {
87   *this = other;
88   other.Close();
89   return *this;
90 }
91 
~MpscWriter()92 MpscWriter::~MpscWriter() { Close(); }
93 
connected() const94 bool MpscWriter::connected() const {
95   std::lock_guard lock(mutex_);
96   return reader_ != nullptr;
97 }
98 
last_write() const99 size_t MpscWriter::last_write() const {
100   std::lock_guard lock(mutex_);
101   return last_write_;
102 }
103 
SetTimeout(const duration & timeout)104 void MpscWriter::SetTimeout(const duration& timeout) {
105   std::lock_guard lock(mutex_);
106   timeout_ = timeout;
107 }
108 
SetLimit(size_t limit)109 void MpscWriter::SetLimit(size_t limit) {
110   std::lock_guard lock(mutex_);
111   if (reader_) {
112     reader_->DecreaseLimit(limit_);
113     reader_->IncreaseLimit(limit);
114   }
115   limit_ = limit;
116   if (limit_ == 0) {
117     CloseLocked();
118   }
119 }
120 
ConservativeLimit(LimitType type) const121 size_t MpscWriter::ConservativeLimit(LimitType type) const {
122   std::lock_guard lock(mutex_);
123   return reader_ != nullptr && type == LimitType::kWrite ? limit_ : 0;
124 }
125 
DoWrite(ConstByteSpan data)126 Status MpscWriter::DoWrite(ConstByteSpan data) {
127   // Check some conditions to see if an early exit is possible.
128   if (data.empty()) {
129     return OkStatus();
130   }
131   std::lock_guard lock(mutex_);
132   if (reader_ == nullptr) {
133     return Status::OutOfRange();
134   }
135   if (limit_ < data.size()) {
136     return Status::ResourceExhausted();
137   }
138   if (!write_request_.unlisted()) {
139     return Status::FailedPrecondition();
140   }
141   // Subscribe to the reader. This will enqueue this object's write request,
142   // which will be used to notify the writer when the reader has space available
143   // or has closed.
144   reader_->RequestWrite(write_request_);
145   last_write_ = 0;
146 
147   Status status;
148   while (!data.empty()) {
149     // Wait to be notified by the reader.
150     // Note: This manually unlocks and relocks the mutex currently held by the
151     // lock guard. It must not return while the mutex is not locked.
152     duration timeout = timeout_;
153     mutex_.unlock();
154     bool writeable = Await(write_request_.notification, timeout);
155     mutex_.lock();
156 
157     // Conditions may have changed while waiting; check again.
158     if (reader_ == nullptr) {
159       return Status::OutOfRange();
160     }
161     if (!writeable || limit_ < data.size()) {
162       status = Status::ResourceExhausted();
163       break;
164     }
165 
166     // Attempt to write data.
167     StatusWithSize result = reader_->WriteData(data, limit_);
168     last_write_ += result.size();
169     if (limit_ != kUnlimited) {
170       limit_ -= result.size();
171     }
172 
173     // WriteData() only returns an error if the reader is closed. In that case,
174     // or if the writer has written all of its data, the writer should close.
175     if (!result.ok() || limit_ == 0) {
176       CloseLocked();
177       return result.status();
178     }
179     data = data.subspan(result.size());
180   }
181 
182   // Unsubscribe from the reader.
183   reader_->CompleteWrite(write_request_);
184   return status;
185 }
186 
Close()187 void MpscWriter::Close() {
188   std::lock_guard lock(mutex_);
189   CloseLocked();
190 }
191 
CloseLocked()192 void MpscWriter::CloseLocked() {
193   if (reader_ != nullptr) {
194     std::lock_guard lock(reader_->mutex_);
195     reader_->CompleteWriteLocked(write_request_);
196     write_request_.notification.release();
197     if (reader_->writers_.remove(*this)) {
198       reader_->DecreaseLimitLocked(limit_);
199     }
200     if (reader_->writers_.empty()) {
201       reader_->readable_.release();
202     }
203     reader_ = nullptr;
204   }
205   limit_ = kUnlimited;
206 }
207 
208 ////////////////////////////////////////////////////////////////////////////////
209 // MpscReader methods.
210 
MpscReader()211 MpscReader::MpscReader() { last_request_ = write_requests_.begin(); }
212 
~MpscReader()213 MpscReader::~MpscReader() { Close(); }
214 
connected() const215 bool MpscReader::connected() const {
216   std::lock_guard lock(mutex_);
217   return !writers_.empty();
218 }
219 
SetBuffer(ByteSpan buffer)220 void MpscReader::SetBuffer(ByteSpan buffer) {
221   std::lock_guard lock(mutex_);
222   PW_CHECK(length_ == 0);
223   buffer_ = buffer;
224   offset_ = 0;
225 }
226 
SetTimeout(const duration & timeout)227 void MpscReader::SetTimeout(const duration& timeout) {
228   std::lock_guard lock(mutex_);
229   timeout_ = timeout;
230 }
231 
IncreaseLimit(size_t delta)232 void MpscReader::IncreaseLimit(size_t delta) {
233   std::lock_guard lock(mutex_);
234   IncreaseLimitLocked(delta);
235 }
236 
IncreaseLimitLocked(size_t delta)237 void MpscReader::IncreaseLimitLocked(size_t delta) {
238   if (delta == kUnlimited) {
239     ++num_unlimited_;
240     PW_CHECK_UINT_NE(num_unlimited_, 0);
241   } else if (limit_ != kUnlimited) {
242     PW_CHECK_UINT_LT(limit_, kUnlimited - delta);
243     limit_ += delta;
244   }
245 }
246 
DecreaseLimit(size_t delta)247 void MpscReader::DecreaseLimit(size_t delta) {
248   std::lock_guard lock(mutex_);
249   DecreaseLimitLocked(delta);
250 }
251 
DecreaseLimitLocked(size_t delta)252 void MpscReader::DecreaseLimitLocked(size_t delta) {
253   if (delta == kUnlimited) {
254     PW_CHECK_UINT_NE(num_unlimited_, 0);
255     --num_unlimited_;
256   } else if (limit_ != kUnlimited) {
257     PW_CHECK_UINT_LE(delta, limit_);
258     limit_ -= delta;
259   }
260 }
261 
ConservativeLimit(LimitType type) const262 size_t MpscReader::ConservativeLimit(LimitType type) const {
263   std::lock_guard lock(mutex_);
264   if (type != LimitType::kRead) {
265     return 0;
266   }
267   if (writers_.empty()) {
268     return length_;
269   }
270   if (num_unlimited_ != 0) {
271     return kUnlimited;
272   }
273   return limit_;
274 }
275 
RequestWrite(MpscWriter::Request & write_request)276 void MpscReader::RequestWrite(MpscWriter::Request& write_request) {
277   std::lock_guard lock(mutex_);
278   last_request_ = write_requests_.insert_after(last_request_, write_request);
279   CheckWriteableLocked();
280 }
281 
CheckWriteableLocked()282 void MpscReader::CheckWriteableLocked() {
283   if (write_requests_.empty()) {
284     return;
285   }
286   if (writers_.empty() || written_ < destination_.size() ||
287       length_ < buffer_.size()) {
288     MpscWriter::Request& write_request = write_requests_.front();
289     write_request.notification.release();
290   }
291 }
292 
WriteData(ConstByteSpan data,size_t limit)293 StatusWithSize MpscReader::WriteData(ConstByteSpan data, size_t limit) {
294   std::lock_guard lock(mutex_);
295   if (writers_.empty()) {
296     return StatusWithSize::OutOfRange(0);
297   }
298   size_t length = 0;
299   size_t available = buffer_.size() - length_;
300   if (written_ < destination_.size()) {
301     // A read is pending; copy directly into its buffer.
302     // Note: this condition is only true when the buffer is empty, so data
303     // order is preserved.
304     length = std::min(destination_.size() - written_, data.size());
305     memcpy(&destination_[written_], &data[0], length);
306     written_ += length;
307   } else if (available > 0) {
308     // The buffer has space for more data.
309     length = std::min(available, data.size());
310     size_t offset = (offset_ + length_) % buffer_.size();
311     size_t contiguous = buffer_.size() - offset;
312     if (length <= contiguous) {
313       memcpy(&buffer_[offset], &data[0], length);
314     } else {
315       memcpy(&buffer_[offset], &data[0], contiguous);
316       memcpy(&buffer_[0], &data[contiguous], length - contiguous);
317     }
318     length_ += length;
319   } else {
320     // If there is no space available, a write request can only be notified when
321     // its writer is closing. Do not notify the reader that data is available.
322     return StatusWithSize(0);
323   }
324   data = data.subspan(length);
325   // For unlimited writers, increase the read limit as needed.
326   // Do this before waking the reader and releasing the lock.
327   if (limit == kUnlimited) {
328     IncreaseLimitLocked(length);
329   }
330   readable_.release();
331   return StatusWithSize(length);
332 }
333 
CompleteWrite(MpscWriter::Request & write_request)334 void MpscReader::CompleteWrite(MpscWriter::Request& write_request) {
335   std::lock_guard lock(mutex_);
336   CompleteWriteLocked(write_request);
337 }
338 
CompleteWriteLocked(MpscWriter::Request & write_request)339 void MpscReader::CompleteWriteLocked(MpscWriter::Request& write_request) {
340   MpscWriter::Request& last_request = *last_request_;
341   write_requests_.remove(write_request);
342 
343   // If the last request is removed, find the new last request. This is O(n),
344   // but the oremoved element is first unless a request is being canceled due to
345   // its writer closing. Thus in the typical case of a successful write, this is
346   // O(1).
347   if (&last_request == &write_request) {
348     last_request_ = write_requests_.begin();
349     for (size_t i = 1; i < write_requests_.size(); ++i) {
350       ++last_request_;
351     }
352   }
353 
354   // The reader may have signaled this writer that it had space between the last
355   // call to WriteData() and this call. Check if that signal should be forwarded
356   // to the next write request.
357   CheckWriteableLocked();
358 }
359 
DoRead(ByteSpan destination)360 StatusWithSize MpscReader::DoRead(ByteSpan destination) {
361   if (destination.empty()) {
362     return StatusWithSize(0);
363   }
364   mutex_.lock();
365   PW_CHECK(!reading_, "All reads must happen from the same thread.");
366   reading_ = true;
367   Status status = OkStatus();
368   size_t length = 0;
369 
370   // Check for buffered data. Do this before checking if the reader is still
371   // connected in order to deliver data sent from a now-closed writer.
372   if (length_ != 0) {
373     length = std::min(length_, destination.size());
374     size_t contiguous = buffer_.size() - offset_;
375     if (length < contiguous) {
376       memcpy(&destination[0], &buffer_[offset_], length);
377       offset_ += length;
378     } else if (length == contiguous) {
379       memcpy(&destination[0], &buffer_[offset_], length);
380       offset_ = 0;
381     } else {
382       memcpy(&destination[0], &buffer_[offset_], contiguous);
383       offset_ = length - contiguous;
384       memcpy(&destination[contiguous], &buffer_[0], offset_);
385     }
386     length_ -= length;
387     DecreaseLimitLocked(length);
388     CheckWriteableLocked();
389 
390   } else {
391     // Register the output buffer to and wait for Write() to bypass the buffer
392     // and write directly into it. Note that the buffer is only bypassed when
393     // empty, so data order is preserved.
394     PW_CHECK(written_ == 0);
395     destination_ = destination;
396     CheckWriteableLocked();
397 
398     // The reader state may change while waiting, or even between acquiring the
399     // notification and acquiring the lock. As an example, the following
400     // sequence of events is possible:
401     //
402     //   1. A writer partially fills the output buffer and releases the
403     //      notification.
404     //   2. The reader acquires the notification.
405     //   3. Another writer fills the remainder of the buffer and releass the
406     //      notification *again*.
407     //   4. The reader acquires the lock.
408     //
409     // In this case, on the *next* read, the notification will be acquired
410     // immediately even if no data is available. As a result, this code loops
411     // until data is available.
412     while (status.ok()) {
413       bool readable = true;
414       if (!writers_.empty()) {
415         // Wait for a writer to provide data, or the reader to be closed.
416         duration timeout = timeout_;
417         mutex_.unlock();
418         readable = Await(readable_, timeout);
419         mutex_.lock();
420       }
421       if (!readable) {
422         status = Status::ResourceExhausted();
423       } else if (written_ != 0) {
424         break;
425       } else if (writers_.empty()) {
426         status = Status::OutOfRange();
427       }
428     }
429     destination_ = ByteSpan();
430     length = written_;
431     written_ = 0;
432     DecreaseLimitLocked(length);
433     CheckWriteableLocked();
434   }
435 
436   reading_ = false;
437   if (writers_.empty()) {
438     closeable_.release();
439   }
440   mutex_.unlock();
441   return StatusWithSize(status, length);
442 }
443 
ReadAll(ReadAllCallback callback)444 Status MpscReader::ReadAll(ReadAllCallback callback) {
445   mutex_.lock();
446   if (buffer_.empty()) {
447     mutex_.unlock();
448     return Status::FailedPrecondition();
449   }
450   PW_CHECK(!reading_, "All reads must happen from the same thread.");
451   reading_ = true;
452 
453   Status status = Status::OutOfRange();
454   while (true) {
455     // Check for buffered data. Do this before checking if the reader still has
456     // writers in order to deliver data sent from a now-closed writer.
457     if (length_ != 0) {
458       size_t length = std::min(buffer_.size() - offset_, length_);
459       ConstByteSpan data(&buffer_[offset_], length);
460       offset_ = (offset_ + length_) % buffer_.size();
461       length_ -= length;
462       DecreaseLimitLocked(data.size());
463       CheckWriteableLocked();
464       status = callback(data);
465       if (!status.ok()) {
466         break;
467       }
468     }
469     if (writers_.empty()) {
470       break;
471     }
472     // Wait for a writer to provide data.
473     duration timeout = timeout_;
474     mutex_.unlock();
475     bool readable = Await(readable_, timeout);
476     mutex_.lock();
477     if (!readable) {
478       status = Status::ResourceExhausted();
479       break;
480     }
481   }
482   reading_ = false;
483   if (writers_.empty()) {
484     closeable_.release();
485   }
486   mutex_.unlock();
487   return status;
488 }
489 
Close()490 void MpscReader::Close() {
491   mutex_.lock();
492   if (writers_.empty()) {
493     mutex_.unlock();
494     return;
495   }
496   IntrusiveList<MpscWriter> writers;
497   while (!writers_.empty()) {
498     MpscWriter& writer = writers_.front();
499     writers_.pop_front();
500     writers.push_front(writer);
501   }
502 
503   // Wait for any pending read to finish.
504   if (reading_) {
505     mutex_.unlock();
506     readable_.release();
507     closeable_.acquire();
508     mutex_.lock();
509   }
510 
511   num_unlimited_ = 0;
512   limit_ = 0;
513   written_ = 0;
514   offset_ = 0;
515   length_ = 0;
516   mutex_.unlock();
517 
518   for (auto& writer : writers) {
519     writer.Close();
520   }
521 }
522 
523 }  // namespace pw::stream
524