// Copyright 2022 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef GRPC_SRC_CORE_LIB_PROMISE_MPSC_H #define GRPC_SRC_CORE_LIB_PROMISE_MPSC_H #include #include #include #include #include #include "absl/base/thread_annotations.h" #include #include "src/core/lib/gprpp/ref_counted.h" #include "src/core/lib/gprpp/ref_counted_ptr.h" #include "src/core/lib/gprpp/sync.h" #include "src/core/lib/promise/activity.h" #include "src/core/lib/promise/poll.h" #include "src/core/lib/promise/wait_set.h" // Multi producer single consumer inter-activity comms. namespace grpc_core { namespace mpscpipe_detail { // "Center" of the communication pipe. // Contains sent but not received messages, and open/close state. template class Center : public RefCounted> { public: // Construct the center with a maximum queue size. explicit Center(size_t max_queued) : max_queued_(max_queued) {} // Poll for new items. // - Returns true if new items were obtained, in which case they are contained // in dest in the order they were added. Wakes up all pending senders since // there will now be space to send. // - If no new items are available, returns // false and sets up a waker to be awoken when more items are available. // TODO(ctiller): consider the problem of thundering herds here. There may be // more senders than there are queue spots, and so the strategy of waking up // all senders is ill-advised. // That said, some senders may have been cancelled by the time we wake them, // and so waking a subset could cause starvation. bool PollReceiveBatch(std::vector& dest) { ReleasableMutexLock lock(&mu_); if (queue_.empty()) { receive_waker_ = GetContext()->MakeNonOwningWaker(); return false; } dest.swap(queue_); queue_.clear(); auto wakeups = send_wakers_.TakeWakeupSet(); lock.Release(); wakeups.Wakeup(); return true; } // Poll to send one item. // Returns pending if no send slot was available. // Returns true if the item was sent. // Returns false if the receiver has been closed. Poll PollSend(T& t) { ReleasableMutexLock lock(&mu_); if (receiver_closed_) return Poll(false); if (queue_.size() < max_queued_) { queue_.push_back(std::move(t)); auto receive_waker = std::move(receive_waker_); lock.Release(); receive_waker.Wakeup(); return Poll(true); } send_wakers_.AddPending(GetContext()->MakeNonOwningWaker()); return Pending{}; } bool ImmediateSend(T t) { ReleasableMutexLock lock(&mu_); if (receiver_closed_) return false; queue_.push_back(std::move(t)); auto receive_waker = std::move(receive_waker_); lock.Release(); receive_waker.Wakeup(); return true; } // Mark that the receiver is closed. void ReceiverClosed() { ReleasableMutexLock lock(&mu_); if (receiver_closed_) return; receiver_closed_ = true; auto wakeups = send_wakers_.TakeWakeupSet(); lock.Release(); wakeups.Wakeup(); } private: Mutex mu_; const size_t max_queued_; std::vector queue_ ABSL_GUARDED_BY(mu_); bool receiver_closed_ ABSL_GUARDED_BY(mu_) = false; Waker receive_waker_ ABSL_GUARDED_BY(mu_); WaitSet send_wakers_ ABSL_GUARDED_BY(mu_); }; } // namespace mpscpipe_detail template class MpscReceiver; // Send half of an mpsc pipe. template class MpscSender { public: MpscSender(const MpscSender&) = default; MpscSender& operator=(const MpscSender&) = default; MpscSender(MpscSender&&) noexcept = default; MpscSender& operator=(MpscSender&&) noexcept = default; // Return a promise that will send one item. // Resolves to true if sent, false if the receiver was closed (and the value // will never be successfully sent). auto Send(T t) { return [center = center_, t = std::move(t)]() mutable -> Poll { if (center == nullptr) return false; return center->PollSend(t); }; } bool UnbufferedImmediateSend(T t) { return center_->ImmediateSend(std::move(t)); } private: friend class MpscReceiver; explicit MpscSender(RefCountedPtr> center) : center_(std::move(center)) {} RefCountedPtr> center_; }; // Receive half of an mpsc pipe. template class MpscReceiver { public: // max_buffer_hint is the maximum number of elements we'd like to buffer. // We half this before passing to Center so that the number there is the // maximum number of elements that can be queued in the center of the pipe. // The receiver also holds some of the buffered elements (up to half of them!) // so the total outstanding is equal to max_buffer_hint (unless it's 1 in // which case instantaneosly we may have two elements buffered). explicit MpscReceiver(size_t max_buffer_hint) : center_(MakeRefCounted>( std::max(static_cast(1), max_buffer_hint / 2))) {} ~MpscReceiver() { if (center_ != nullptr) center_->ReceiverClosed(); } void MarkClosed() { if (center_ != nullptr) center_->ReceiverClosed(); } MpscReceiver(const MpscReceiver&) = delete; MpscReceiver& operator=(const MpscReceiver&) = delete; // Only movable until it's first polled, and so we don't need to contend with // a non-empty buffer during a legal move! MpscReceiver(MpscReceiver&& other) noexcept : center_(std::move(other.center_)) { GPR_DEBUG_ASSERT(other.buffer_.empty()); } MpscReceiver& operator=(MpscReceiver&& other) noexcept { GPR_DEBUG_ASSERT(other.buffer_.empty()); center_ = std::move(other.center_); return *this; } // Construct a new sender for this receiver. MpscSender MakeSender() { return MpscSender(center_); } // Return a promise that will resolve to the next item (and remove said item). auto Next() { return [this]() -> Poll { if (buffer_it_ != buffer_.end()) { return Poll(std::move(*buffer_it_++)); } if (center_->PollReceiveBatch(buffer_)) { buffer_it_ = buffer_.begin(); return Poll(std::move(*buffer_it_++)); } return Pending{}; }; } private: // Received items. We move out of here one by one, but don't resize the // vector. Instead, when we run out of items, we poll the center for more - // which swaps this buffer in for the new receive queue and clears it. // In this way, upon hitting a steady state the queue ought to be allocation // free. std::vector buffer_; typename std::vector::iterator buffer_it_ = buffer_.end(); RefCountedPtr> center_; }; } // namespace grpc_core #endif // GRPC_SRC_CORE_LIB_PROMISE_MPSC_H