1 // Copyright 2023 The Pigweed Authors
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License"); you may not
4 // use this file except in compliance with the License. You may obtain a copy of
5 // the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11 // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12 // License for the specific language governing permissions and limitations under
13 // the License.
14
15 #include "pw_stream/mpsc_stream.h"
16
17 #include <cstring>
18
19 #include "pw_assert/check.h"
20
21 namespace pw::stream {
22 namespace {
23
24 // Wait to receive a thread notification with an optional timeout.
Await(sync::TimedThreadNotification & notification,const std::optional<chrono::SystemClock::duration> & timeout)25 bool Await(sync::TimedThreadNotification& notification,
26 const std::optional<chrono::SystemClock::duration>& timeout) {
27 if (timeout.has_value()) {
28 return notification.try_acquire_for(*timeout);
29 }
30 // Block indefinitely.
31 notification.acquire();
32 return true;
33 }
34
35 } // namespace
36
CreateMpscStream(MpscReader & reader,MpscWriter & writer)37 void CreateMpscStream(MpscReader& reader, MpscWriter& writer) {
38 reader.Close();
39 std::lock_guard rlock(reader.mutex_);
40 PW_CHECK(reader.writers_.empty());
41 std::lock_guard wlock(writer.mutex_);
42 writer.CloseLocked();
43 reader.writers_.push_front(writer);
44 reader.IncreaseLimitLocked(Stream::kUnlimited);
45 writer.reader_ = &reader;
46 }
47
48 ////////////////////////////////////////////////////////////////////////////////
49 // MpscWriter methods.
50
// Copy constructor: implemented in terms of copy assignment, which closes this
// writer and then takes on `other`'s reader, timeout, limit and last-write
// count.
MpscWriter::MpscWriter(const MpscWriter& other) {
  operator=(other);
}
52
// Copy assignment: closes this writer, then copies `other`'s reader, timeout,
// limit and last-write count, and registers this writer with that reader.
//
// Self-assignment is a no-op. Without the guard, the initial Close() would
// disconnect the writer and the disconnected state would then be copied back
// onto itself.
//
// Returns a reference to this writer.
MpscWriter& MpscWriter::operator=(const MpscWriter& other) {
  if (this == &other) {
    return *this;
  }
  Close();

  // Snapshot the other writer's state. The two writer mutexes are taken one
  // after the other so that they are never held at the same time.
  other.mutex_.lock();
  MpscReader* reader = other.reader_;
  duration timeout = other.timeout_;
  size_t limit = other.limit_;
  size_t last_write = other.last_write_;
  other.mutex_.unlock();

  // Apply the snapshot to this writer.
  mutex_.lock();
  reader_ = reader;
  timeout_ = timeout;
  limit_ = limit;
  last_write_ = last_write;
  mutex_.unlock();

  // Register with the reader after releasing this writer's lock, and account
  // for this writer's limit in the reader's total.
  // NOTE(review): `reader` was read under `other`'s lock only; nothing here
  // re-checks that the reader is still open before registering. Confirm that
  // callers keep the reader alive and connected across a copy.
  if (reader != nullptr) {
    std::lock_guard lock(reader->mutex_);
    reader->writers_.push_front(*this);
    reader->IncreaseLimitLocked(limit);
  }
  return *this;
}
81
MpscWriter(MpscWriter && other)82 MpscWriter::MpscWriter(MpscWriter&& other) : MpscWriter() {
83 *this = std::move(other);
84 }
85
operator =(MpscWriter && other)86 MpscWriter& MpscWriter::operator=(MpscWriter&& other) {
87 *this = other;
88 other.Close();
89 return *this;
90 }
91
~MpscWriter()92 MpscWriter::~MpscWriter() { Close(); }
93
connected() const94 bool MpscWriter::connected() const {
95 std::lock_guard lock(mutex_);
96 return reader_ != nullptr;
97 }
98
last_write() const99 size_t MpscWriter::last_write() const {
100 std::lock_guard lock(mutex_);
101 return last_write_;
102 }
103
SetTimeout(const duration & timeout)104 void MpscWriter::SetTimeout(const duration& timeout) {
105 std::lock_guard lock(mutex_);
106 timeout_ = timeout;
107 }
108
SetLimit(size_t limit)109 void MpscWriter::SetLimit(size_t limit) {
110 std::lock_guard lock(mutex_);
111 if (reader_) {
112 reader_->DecreaseLimit(limit_);
113 reader_->IncreaseLimit(limit);
114 }
115 limit_ = limit;
116 if (limit_ == 0) {
117 CloseLocked();
118 }
119 }
120
ConservativeLimit(LimitType type) const121 size_t MpscWriter::ConservativeLimit(LimitType type) const {
122 std::lock_guard lock(mutex_);
123 return reader_ != nullptr && type == LimitType::kWrite ? limit_ : 0;
124 }
125
DoWrite(ConstByteSpan data)126 Status MpscWriter::DoWrite(ConstByteSpan data) {
127 // Check some conditions to see if an early exit is possible.
128 if (data.empty()) {
129 return OkStatus();
130 }
131 std::lock_guard lock(mutex_);
132 if (reader_ == nullptr) {
133 return Status::OutOfRange();
134 }
135 if (limit_ < data.size()) {
136 return Status::ResourceExhausted();
137 }
138 if (!write_request_.unlisted()) {
139 return Status::FailedPrecondition();
140 }
141 // Subscribe to the reader. This will enqueue this object's write request,
142 // which will be used to notify the writer when the reader has space available
143 // or has closed.
144 reader_->RequestWrite(write_request_);
145 last_write_ = 0;
146
147 Status status;
148 while (!data.empty()) {
149 // Wait to be notified by the reader.
150 // Note: This manually unlocks and relocks the mutex currently held by the
151 // lock guard. It must not return while the mutex is not locked.
152 duration timeout = timeout_;
153 mutex_.unlock();
154 bool writeable = Await(write_request_.notification, timeout);
155 mutex_.lock();
156
157 // Conditions may have changed while waiting; check again.
158 if (reader_ == nullptr) {
159 return Status::OutOfRange();
160 }
161 if (!writeable || limit_ < data.size()) {
162 status = Status::ResourceExhausted();
163 break;
164 }
165
166 // Attempt to write data.
167 StatusWithSize result = reader_->WriteData(data, limit_);
168 last_write_ += result.size();
169 if (limit_ != kUnlimited) {
170 limit_ -= result.size();
171 }
172
173 // WriteData() only returns an error if the reader is closed. In that case,
174 // or if the writer has written all of its data, the writer should close.
175 if (!result.ok() || limit_ == 0) {
176 CloseLocked();
177 return result.status();
178 }
179 data = data.subspan(result.size());
180 }
181
182 // Unsubscribe from the reader.
183 reader_->CompleteWrite(write_request_);
184 return status;
185 }
186
Close()187 void MpscWriter::Close() {
188 std::lock_guard lock(mutex_);
189 CloseLocked();
190 }
191
CloseLocked()192 void MpscWriter::CloseLocked() {
193 if (reader_ != nullptr) {
194 std::lock_guard lock(reader_->mutex_);
195 reader_->CompleteWriteLocked(write_request_);
196 write_request_.notification.release();
197 if (reader_->writers_.remove(*this)) {
198 reader_->DecreaseLimitLocked(limit_);
199 }
200 if (reader_->writers_.empty()) {
201 reader_->readable_.release();
202 }
203 reader_ = nullptr;
204 }
205 limit_ = kUnlimited;
206 }
207
208 ////////////////////////////////////////////////////////////////////////////////
209 // MpscReader methods.
210
MpscReader()211 MpscReader::MpscReader() { last_request_ = write_requests_.begin(); }
212
~MpscReader()213 MpscReader::~MpscReader() { Close(); }
214
connected() const215 bool MpscReader::connected() const {
216 std::lock_guard lock(mutex_);
217 return !writers_.empty();
218 }
219
SetBuffer(ByteSpan buffer)220 void MpscReader::SetBuffer(ByteSpan buffer) {
221 std::lock_guard lock(mutex_);
222 PW_CHECK(length_ == 0);
223 buffer_ = buffer;
224 offset_ = 0;
225 }
226
SetTimeout(const duration & timeout)227 void MpscReader::SetTimeout(const duration& timeout) {
228 std::lock_guard lock(mutex_);
229 timeout_ = timeout;
230 }
231
IncreaseLimit(size_t delta)232 void MpscReader::IncreaseLimit(size_t delta) {
233 std::lock_guard lock(mutex_);
234 IncreaseLimitLocked(delta);
235 }
236
IncreaseLimitLocked(size_t delta)237 void MpscReader::IncreaseLimitLocked(size_t delta) {
238 if (delta == kUnlimited) {
239 ++num_unlimited_;
240 PW_CHECK_UINT_NE(num_unlimited_, 0);
241 } else if (limit_ != kUnlimited) {
242 PW_CHECK_UINT_LT(limit_, kUnlimited - delta);
243 limit_ += delta;
244 }
245 }
246
DecreaseLimit(size_t delta)247 void MpscReader::DecreaseLimit(size_t delta) {
248 std::lock_guard lock(mutex_);
249 DecreaseLimitLocked(delta);
250 }
251
DecreaseLimitLocked(size_t delta)252 void MpscReader::DecreaseLimitLocked(size_t delta) {
253 if (delta == kUnlimited) {
254 PW_CHECK_UINT_NE(num_unlimited_, 0);
255 --num_unlimited_;
256 } else if (limit_ != kUnlimited) {
257 PW_CHECK_UINT_LE(delta, limit_);
258 limit_ -= delta;
259 }
260 }
261
ConservativeLimit(LimitType type) const262 size_t MpscReader::ConservativeLimit(LimitType type) const {
263 std::lock_guard lock(mutex_);
264 if (type != LimitType::kRead) {
265 return 0;
266 }
267 if (writers_.empty()) {
268 return length_;
269 }
270 if (num_unlimited_ != 0) {
271 return kUnlimited;
272 }
273 return limit_;
274 }
275
RequestWrite(MpscWriter::Request & write_request)276 void MpscReader::RequestWrite(MpscWriter::Request& write_request) {
277 std::lock_guard lock(mutex_);
278 last_request_ = write_requests_.insert_after(last_request_, write_request);
279 CheckWriteableLocked();
280 }
281
CheckWriteableLocked()282 void MpscReader::CheckWriteableLocked() {
283 if (write_requests_.empty()) {
284 return;
285 }
286 if (writers_.empty() || written_ < destination_.size() ||
287 length_ < buffer_.size()) {
288 MpscWriter::Request& write_request = write_requests_.front();
289 write_request.notification.release();
290 }
291 }
292
WriteData(ConstByteSpan data,size_t limit)293 StatusWithSize MpscReader::WriteData(ConstByteSpan data, size_t limit) {
294 std::lock_guard lock(mutex_);
295 if (writers_.empty()) {
296 return StatusWithSize::OutOfRange(0);
297 }
298 size_t length = 0;
299 size_t available = buffer_.size() - length_;
300 if (written_ < destination_.size()) {
301 // A read is pending; copy directly into its buffer.
302 // Note: this condition is only true when the buffer is empty, so data
303 // order is preserved.
304 length = std::min(destination_.size() - written_, data.size());
305 memcpy(&destination_[written_], &data[0], length);
306 written_ += length;
307 } else if (available > 0) {
308 // The buffer has space for more data.
309 length = std::min(available, data.size());
310 size_t offset = (offset_ + length_) % buffer_.size();
311 size_t contiguous = buffer_.size() - offset;
312 if (length <= contiguous) {
313 memcpy(&buffer_[offset], &data[0], length);
314 } else {
315 memcpy(&buffer_[offset], &data[0], contiguous);
316 memcpy(&buffer_[0], &data[contiguous], length - contiguous);
317 }
318 length_ += length;
319 } else {
320 // If there is no space available, a write request can only be notified when
321 // its writer is closing. Do not notify the reader that data is available.
322 return StatusWithSize(0);
323 }
324 data = data.subspan(length);
325 // For unlimited writers, increase the read limit as needed.
326 // Do this before waking the reader and releasing the lock.
327 if (limit == kUnlimited) {
328 IncreaseLimitLocked(length);
329 }
330 readable_.release();
331 return StatusWithSize(length);
332 }
333
CompleteWrite(MpscWriter::Request & write_request)334 void MpscReader::CompleteWrite(MpscWriter::Request& write_request) {
335 std::lock_guard lock(mutex_);
336 CompleteWriteLocked(write_request);
337 }
338
CompleteWriteLocked(MpscWriter::Request & write_request)339 void MpscReader::CompleteWriteLocked(MpscWriter::Request& write_request) {
340 MpscWriter::Request& last_request = *last_request_;
341 write_requests_.remove(write_request);
342
343 // If the last request is removed, find the new last request. This is O(n),
344 // but the oremoved element is first unless a request is being canceled due to
345 // its writer closing. Thus in the typical case of a successful write, this is
346 // O(1).
347 if (&last_request == &write_request) {
348 last_request_ = write_requests_.begin();
349 for (size_t i = 1; i < write_requests_.size(); ++i) {
350 ++last_request_;
351 }
352 }
353
354 // The reader may have signaled this writer that it had space between the last
355 // call to WriteData() and this call. Check if that signal should be forwarded
356 // to the next write request.
357 CheckWriteableLocked();
358 }
359
DoRead(ByteSpan destination)360 StatusWithSize MpscReader::DoRead(ByteSpan destination) {
361 if (destination.empty()) {
362 return StatusWithSize(0);
363 }
364 mutex_.lock();
365 PW_CHECK(!reading_, "All reads must happen from the same thread.");
366 reading_ = true;
367 Status status = OkStatus();
368 size_t length = 0;
369
370 // Check for buffered data. Do this before checking if the reader is still
371 // connected in order to deliver data sent from a now-closed writer.
372 if (length_ != 0) {
373 length = std::min(length_, destination.size());
374 size_t contiguous = buffer_.size() - offset_;
375 if (length < contiguous) {
376 memcpy(&destination[0], &buffer_[offset_], length);
377 offset_ += length;
378 } else if (length == contiguous) {
379 memcpy(&destination[0], &buffer_[offset_], length);
380 offset_ = 0;
381 } else {
382 memcpy(&destination[0], &buffer_[offset_], contiguous);
383 offset_ = length - contiguous;
384 memcpy(&destination[contiguous], &buffer_[0], offset_);
385 }
386 length_ -= length;
387 DecreaseLimitLocked(length);
388 CheckWriteableLocked();
389
390 } else {
391 // Register the output buffer to and wait for Write() to bypass the buffer
392 // and write directly into it. Note that the buffer is only bypassed when
393 // empty, so data order is preserved.
394 PW_CHECK(written_ == 0);
395 destination_ = destination;
396 CheckWriteableLocked();
397
398 // The reader state may change while waiting, or even between acquiring the
399 // notification and acquiring the lock. As an example, the following
400 // sequence of events is possible:
401 //
402 // 1. A writer partially fills the output buffer and releases the
403 // notification.
404 // 2. The reader acquires the notification.
405 // 3. Another writer fills the remainder of the buffer and releass the
406 // notification *again*.
407 // 4. The reader acquires the lock.
408 //
409 // In this case, on the *next* read, the notification will be acquired
410 // immediately even if no data is available. As a result, this code loops
411 // until data is available.
412 while (status.ok()) {
413 bool readable = true;
414 if (!writers_.empty()) {
415 // Wait for a writer to provide data, or the reader to be closed.
416 duration timeout = timeout_;
417 mutex_.unlock();
418 readable = Await(readable_, timeout);
419 mutex_.lock();
420 }
421 if (!readable) {
422 status = Status::ResourceExhausted();
423 } else if (written_ != 0) {
424 break;
425 } else if (writers_.empty()) {
426 status = Status::OutOfRange();
427 }
428 }
429 destination_ = ByteSpan();
430 length = written_;
431 written_ = 0;
432 DecreaseLimitLocked(length);
433 CheckWriteableLocked();
434 }
435
436 reading_ = false;
437 if (writers_.empty()) {
438 closeable_.release();
439 }
440 mutex_.unlock();
441 return StatusWithSize(status, length);
442 }
443
ReadAll(ReadAllCallback callback)444 Status MpscReader::ReadAll(ReadAllCallback callback) {
445 mutex_.lock();
446 if (buffer_.empty()) {
447 mutex_.unlock();
448 return Status::FailedPrecondition();
449 }
450 PW_CHECK(!reading_, "All reads must happen from the same thread.");
451 reading_ = true;
452
453 Status status = Status::OutOfRange();
454 while (true) {
455 // Check for buffered data. Do this before checking if the reader still has
456 // writers in order to deliver data sent from a now-closed writer.
457 if (length_ != 0) {
458 size_t length = std::min(buffer_.size() - offset_, length_);
459 ConstByteSpan data(&buffer_[offset_], length);
460 offset_ = (offset_ + length_) % buffer_.size();
461 length_ -= length;
462 DecreaseLimitLocked(data.size());
463 CheckWriteableLocked();
464 status = callback(data);
465 if (!status.ok()) {
466 break;
467 }
468 }
469 if (writers_.empty()) {
470 break;
471 }
472 // Wait for a writer to provide data.
473 duration timeout = timeout_;
474 mutex_.unlock();
475 bool readable = Await(readable_, timeout);
476 mutex_.lock();
477 if (!readable) {
478 status = Status::ResourceExhausted();
479 break;
480 }
481 }
482 reading_ = false;
483 if (writers_.empty()) {
484 closeable_.release();
485 }
486 mutex_.unlock();
487 return status;
488 }
489
Close()490 void MpscReader::Close() {
491 mutex_.lock();
492 if (writers_.empty()) {
493 mutex_.unlock();
494 return;
495 }
496 IntrusiveList<MpscWriter> writers;
497 while (!writers_.empty()) {
498 MpscWriter& writer = writers_.front();
499 writers_.pop_front();
500 writers.push_front(writer);
501 }
502
503 // Wait for any pending read to finish.
504 if (reading_) {
505 mutex_.unlock();
506 readable_.release();
507 closeable_.acquire();
508 mutex_.lock();
509 }
510
511 num_unlimited_ = 0;
512 limit_ = 0;
513 written_ = 0;
514 offset_ = 0;
515 length_ = 0;
516 mutex_.unlock();
517
518 for (auto& writer : writers) {
519 writer.Close();
520 }
521 }
522
523 } // namespace pw::stream
524