// Copyright 2023 The Pigweed Authors // // Licensed under the Apache License, Version 2.0 (the "License"); you may not // use this file except in compliance with the License. You may obtain a copy of // the License at // // https://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the // License for the specific language governing permissions and limitations under // the License. #include "pw_stream/mpsc_stream.h" #include #include #include "pw_assert/check.h" namespace pw::stream { namespace { // Wait to receive a thread notification with an optional timeout. bool Await(sync::TimedThreadNotification& notification, const std::optional& timeout) { if (timeout.has_value()) { return notification.try_acquire_for(*timeout); } // Block indefinitely. notification.acquire(); return true; } } // namespace void CreateMpscStream(MpscReader& reader, MpscWriter& writer) { reader.Close(); std::lock_guard rlock(reader.mutex_); PW_CHECK(reader.writers_.empty()); std::lock_guard wlock(writer.mutex_); writer.CloseLocked(); reader.writers_.push_front(writer); reader.IncreaseLimitLocked(Stream::kUnlimited); writer.reader_ = &reader; } //////////////////////////////////////////////////////////////////////////////// // MpscWriter methods. MpscWriter::MpscWriter(const MpscWriter& other) : MpscWriter() { *this = other; } MpscWriter& MpscWriter::operator=(const MpscWriter& other) { Close(); // Read the other object's internal state. Avoid holding both locks at once. other.mutex_.lock(); MpscReader* reader = other.reader_; duration timeout = other.timeout_; size_t limit = other.limit_; size_t last_write = other.last_write_; other.mutex_.unlock(); // Now update this object with the other's state. mutex_.lock(); reader_ = reader; timeout_ = timeout; limit_ = limit; last_write_ = last_write; mutex_.unlock(); // Add the writer to the reader outside the lock. If the reader was closed // concurrently, this will close the writer. if (reader != nullptr) { std::lock_guard lock(reader->mutex_); reader->writers_.push_front(*this); reader->IncreaseLimitLocked(limit); } return *this; } MpscWriter::MpscWriter(MpscWriter&& other) : MpscWriter() { *this = std::move(other); } MpscWriter& MpscWriter::operator=(MpscWriter&& other) { *this = other; other.Close(); return *this; } MpscWriter::~MpscWriter() { Close(); } bool MpscWriter::connected() const { std::lock_guard lock(mutex_); return reader_ != nullptr; } size_t MpscWriter::last_write() const { std::lock_guard lock(mutex_); return last_write_; } void MpscWriter::SetTimeout(const duration& timeout) { std::lock_guard lock(mutex_); timeout_ = timeout; } void MpscWriter::SetLimit(size_t limit) { std::lock_guard lock(mutex_); if (reader_) { reader_->DecreaseLimit(limit_); reader_->IncreaseLimit(limit); } limit_ = limit; if (limit_ == 0) { CloseLocked(); } } size_t MpscWriter::ConservativeLimit(LimitType type) const { std::lock_guard lock(mutex_); return reader_ != nullptr && type == LimitType::kWrite ? limit_ : 0; } Status MpscWriter::DoWrite(ConstByteSpan data) { // Check some conditions to see if an early exit is possible. if (data.empty()) { return OkStatus(); } std::lock_guard lock(mutex_); if (reader_ == nullptr) { return Status::OutOfRange(); } if (limit_ < data.size()) { return Status::ResourceExhausted(); } if (!write_request_.unlisted()) { return Status::FailedPrecondition(); } // Subscribe to the reader. This will enqueue this object's write request, // which will be used to notify the writer when the reader has space available // or has closed. reader_->RequestWrite(write_request_); last_write_ = 0; Status status; while (!data.empty()) { // Wait to be notified by the reader. // Note: This manually unlocks and relocks the mutex currently held by the // lock guard. It must not return while the mutex is not locked. duration timeout = timeout_; mutex_.unlock(); bool writeable = Await(write_request_.notification, timeout); mutex_.lock(); // Conditions may have changed while waiting; check again. if (reader_ == nullptr) { return Status::OutOfRange(); } if (!writeable || limit_ < data.size()) { status = Status::ResourceExhausted(); break; } // Attempt to write data. StatusWithSize result = reader_->WriteData(data, limit_); last_write_ += result.size(); if (limit_ != kUnlimited) { limit_ -= result.size(); } // WriteData() only returns an error if the reader is closed. In that case, // or if the writer has written all of its data, the writer should close. if (!result.ok() || limit_ == 0) { CloseLocked(); return result.status(); } data = data.subspan(result.size()); } // Unsubscribe from the reader. reader_->CompleteWrite(write_request_); return status; } void MpscWriter::Close() { std::lock_guard lock(mutex_); CloseLocked(); } void MpscWriter::CloseLocked() { if (reader_ != nullptr) { std::lock_guard lock(reader_->mutex_); reader_->CompleteWriteLocked(write_request_); write_request_.notification.release(); if (reader_->writers_.remove(*this)) { reader_->DecreaseLimitLocked(limit_); } if (reader_->writers_.empty()) { reader_->readable_.release(); } reader_ = nullptr; } limit_ = kUnlimited; } //////////////////////////////////////////////////////////////////////////////// // MpscReader methods. MpscReader::MpscReader() { last_request_ = write_requests_.begin(); } MpscReader::~MpscReader() { Close(); } bool MpscReader::connected() const { std::lock_guard lock(mutex_); return !writers_.empty(); } void MpscReader::SetBuffer(ByteSpan buffer) { std::lock_guard lock(mutex_); PW_CHECK(length_ == 0); buffer_ = buffer; offset_ = 0; } void MpscReader::SetTimeout(const duration& timeout) { std::lock_guard lock(mutex_); timeout_ = timeout; } void MpscReader::IncreaseLimit(size_t delta) { std::lock_guard lock(mutex_); IncreaseLimitLocked(delta); } void MpscReader::IncreaseLimitLocked(size_t delta) { if (delta == kUnlimited) { ++num_unlimited_; PW_CHECK_UINT_NE(num_unlimited_, 0); } else if (limit_ != kUnlimited) { PW_CHECK_UINT_LT(limit_, kUnlimited - delta); limit_ += delta; } } void MpscReader::DecreaseLimit(size_t delta) { std::lock_guard lock(mutex_); DecreaseLimitLocked(delta); } void MpscReader::DecreaseLimitLocked(size_t delta) { if (delta == kUnlimited) { PW_CHECK_UINT_NE(num_unlimited_, 0); --num_unlimited_; } else if (limit_ != kUnlimited) { PW_CHECK_UINT_LE(delta, limit_); limit_ -= delta; } } size_t MpscReader::ConservativeLimit(LimitType type) const { std::lock_guard lock(mutex_); if (type != LimitType::kRead) { return 0; } if (writers_.empty()) { return length_; } if (num_unlimited_ != 0) { return kUnlimited; } return limit_; } void MpscReader::RequestWrite(MpscWriter::Request& write_request) { std::lock_guard lock(mutex_); last_request_ = write_requests_.insert_after(last_request_, write_request); CheckWriteableLocked(); } void MpscReader::CheckWriteableLocked() { if (write_requests_.empty()) { return; } if (writers_.empty() || written_ < destination_.size() || length_ < buffer_.size()) { MpscWriter::Request& write_request = write_requests_.front(); write_request.notification.release(); } } StatusWithSize MpscReader::WriteData(ConstByteSpan data, size_t limit) { std::lock_guard lock(mutex_); if (writers_.empty()) { return StatusWithSize::OutOfRange(0); } size_t length = 0; size_t available = buffer_.size() - length_; if (written_ < destination_.size()) { // A read is pending; copy directly into its buffer. // Note: this condition is only true when the buffer is empty, so data // order is preserved. length = std::min(destination_.size() - written_, data.size()); memcpy(&destination_[written_], &data[0], length); written_ += length; } else if (available > 0) { // The buffer has space for more data. length = std::min(available, data.size()); size_t offset = (offset_ + length_) % buffer_.size(); size_t contiguous = buffer_.size() - offset; if (length <= contiguous) { memcpy(&buffer_[offset], &data[0], length); } else { memcpy(&buffer_[offset], &data[0], contiguous); memcpy(&buffer_[0], &data[contiguous], length - contiguous); } length_ += length; } else { // If there is no space available, a write request can only be notified when // its writer is closing. Do not notify the reader that data is available. return StatusWithSize(0); } data = data.subspan(length); // For unlimited writers, increase the read limit as needed. // Do this before waking the reader and releasing the lock. if (limit == kUnlimited) { IncreaseLimitLocked(length); } readable_.release(); return StatusWithSize(length); } void MpscReader::CompleteWrite(MpscWriter::Request& write_request) { std::lock_guard lock(mutex_); CompleteWriteLocked(write_request); } void MpscReader::CompleteWriteLocked(MpscWriter::Request& write_request) { MpscWriter::Request& last_request = *last_request_; write_requests_.remove(write_request); // If the last request is removed, find the new last request. This is O(n), // but the oremoved element is first unless a request is being canceled due to // its writer closing. Thus in the typical case of a successful write, this is // O(1). if (&last_request == &write_request) { last_request_ = write_requests_.begin(); for (size_t i = 1; i < write_requests_.size(); ++i) { ++last_request_; } } // The reader may have signaled this writer that it had space between the last // call to WriteData() and this call. Check if that signal should be forwarded // to the next write request. CheckWriteableLocked(); } StatusWithSize MpscReader::DoRead(ByteSpan destination) { if (destination.empty()) { return StatusWithSize(0); } mutex_.lock(); PW_CHECK(!reading_, "All reads must happen from the same thread."); reading_ = true; Status status = OkStatus(); size_t length = 0; // Check for buffered data. Do this before checking if the reader is still // connected in order to deliver data sent from a now-closed writer. if (length_ != 0) { length = std::min(length_, destination.size()); size_t contiguous = buffer_.size() - offset_; if (length < contiguous) { memcpy(&destination[0], &buffer_[offset_], length); offset_ += length; } else if (length == contiguous) { memcpy(&destination[0], &buffer_[offset_], length); offset_ = 0; } else { memcpy(&destination[0], &buffer_[offset_], contiguous); offset_ = length - contiguous; memcpy(&destination[contiguous], &buffer_[0], offset_); } length_ -= length; DecreaseLimitLocked(length); CheckWriteableLocked(); } else { // Register the output buffer to and wait for Write() to bypass the buffer // and write directly into it. Note that the buffer is only bypassed when // empty, so data order is preserved. PW_CHECK(written_ == 0); destination_ = destination; CheckWriteableLocked(); // The reader state may change while waiting, or even between acquiring the // notification and acquiring the lock. As an example, the following // sequence of events is possible: // // 1. A writer partially fills the output buffer and releases the // notification. // 2. The reader acquires the notification. // 3. Another writer fills the remainder of the buffer and releass the // notification *again*. // 4. The reader acquires the lock. // // In this case, on the *next* read, the notification will be acquired // immediately even if no data is available. As a result, this code loops // until data is available. while (status.ok()) { bool readable = true; if (!writers_.empty()) { // Wait for a writer to provide data, or the reader to be closed. duration timeout = timeout_; mutex_.unlock(); readable = Await(readable_, timeout); mutex_.lock(); } if (!readable) { status = Status::ResourceExhausted(); } else if (written_ != 0) { break; } else if (writers_.empty()) { status = Status::OutOfRange(); } } destination_ = ByteSpan(); length = written_; written_ = 0; DecreaseLimitLocked(length); CheckWriteableLocked(); } reading_ = false; if (writers_.empty()) { closeable_.release(); } mutex_.unlock(); return StatusWithSize(status, length); } Status MpscReader::ReadAll(ReadAllCallback callback) { mutex_.lock(); if (buffer_.empty()) { mutex_.unlock(); return Status::FailedPrecondition(); } PW_CHECK(!reading_, "All reads must happen from the same thread."); reading_ = true; Status status = Status::OutOfRange(); while (true) { // Check for buffered data. Do this before checking if the reader still has // writers in order to deliver data sent from a now-closed writer. if (length_ != 0) { size_t length = std::min(buffer_.size() - offset_, length_); ConstByteSpan data(&buffer_[offset_], length); offset_ = (offset_ + length_) % buffer_.size(); length_ -= length; DecreaseLimitLocked(data.size()); CheckWriteableLocked(); status = callback(data); if (!status.ok()) { break; } } if (writers_.empty()) { break; } // Wait for a writer to provide data. duration timeout = timeout_; mutex_.unlock(); bool readable = Await(readable_, timeout); mutex_.lock(); if (!readable) { status = Status::ResourceExhausted(); break; } } reading_ = false; if (writers_.empty()) { closeable_.release(); } mutex_.unlock(); return status; } void MpscReader::Close() { mutex_.lock(); if (writers_.empty()) { mutex_.unlock(); return; } IntrusiveList writers; while (!writers_.empty()) { MpscWriter& writer = writers_.front(); writers_.pop_front(); writers.push_front(writer); } // Wait for any pending read to finish. if (reading_) { mutex_.unlock(); readable_.release(); closeable_.acquire(); mutex_.lock(); } num_unlimited_ = 0; limit_ = 0; written_ = 0; offset_ = 0; length_ = 0; mutex_.unlock(); for (auto& writer : writers) { writer.Close(); } } } // namespace pw::stream