1 // Copyright 2022 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_MPSC_H 16 #define GRPC_SRC_CORE_LIB_PROMISE_MPSC_H 17 18 #include <grpc/support/port_platform.h> 19 #include <stddef.h> 20 21 #include <algorithm> 22 #include <cstdint> 23 #include <limits> 24 #include <utility> 25 #include <vector> 26 27 #include "absl/base/thread_annotations.h" 28 #include "absl/log/check.h" 29 #include "src/core/lib/promise/activity.h" 30 #include "src/core/lib/promise/poll.h" 31 #include "src/core/lib/promise/status_flag.h" 32 #include "src/core/lib/promise/wait_set.h" 33 #include "src/core/util/dump_args.h" 34 #include "src/core/util/ref_counted.h" 35 #include "src/core/util/ref_counted_ptr.h" 36 #include "src/core/util/sync.h" 37 38 // Multi producer single consumer inter-activity comms. 39 40 namespace grpc_core { 41 42 namespace mpscpipe_detail { 43 44 // "Center" of the communication pipe. 45 // Contains sent but not received messages, and open/close state. 46 template <typename T> 47 class Center : public RefCounted<Center<T>> { 48 public: 49 // Construct the center with a maximum queue size. Center(size_t max_queued)50 explicit Center(size_t max_queued) : max_queued_(max_queued) {} 51 52 static constexpr const uint64_t kClosedBatch = 53 std::numeric_limits<uint64_t>::max(); 54 55 // Poll for new items. 56 // - Returns true if new items were obtained, in which case they are contained 57 // in dest in the order they were added. Wakes up all pending senders since 58 // there will now be space to send. 59 // - If receives have been closed, returns false. 60 // - If no new items are available, returns 61 // Pending and sets up a waker to be awoken when more items are available. 62 // TODO(ctiller): consider the problem of thundering herds here. There may be 63 // more senders than there are queue spots, and so the strategy of waking up 64 // all senders is ill-advised. 65 // That said, some senders may have been cancelled by the time we wake them, 66 // and so waking a subset could cause starvation. PollReceiveBatch(std::vector<T> & dest)67 Poll<bool> PollReceiveBatch(std::vector<T>& dest) { 68 ReleasableMutexLock lock(&mu_); 69 GRPC_TRACE_LOG(promise_primitives, INFO) 70 << "MPSC::PollReceiveBatch: " 71 << GRPC_DUMP_ARGS(this, batch_, queue_.size()); 72 if (queue_.empty()) { 73 if (batch_ == kClosedBatch) return false; 74 receive_waker_ = GetContext<Activity>()->MakeNonOwningWaker(); 75 return Pending{}; 76 } 77 dest.swap(queue_); 78 queue_.clear(); 79 if (batch_ != kClosedBatch) ++batch_; 80 auto wakeups = send_wakers_.TakeWakeupSet(); 81 lock.Release(); 82 wakeups.Wakeup(); 83 return true; 84 } 85 86 // Return value: 87 // - if the pipe is closed, returns kClosedBatch 88 // - if await_receipt is false, returns the batch number the item was sent 89 // in. 90 // - if await_receipt is true, returns the first sending batch number that 91 // guarantees the item has been received. Send(T t,bool await_receipt)92 uint64_t Send(T t, bool await_receipt) { 93 ReleasableMutexLock lock(&mu_); 94 if (batch_ == kClosedBatch) return kClosedBatch; 95 queue_.push_back(std::move(t)); 96 auto receive_waker = std::move(receive_waker_); 97 const uint64_t batch = 98 (!await_receipt && queue_.size() <= max_queued_) ? batch_ : batch_ + 1; 99 lock.Release(); 100 receive_waker.Wakeup(); 101 return batch; 102 } 103 104 // Poll until a particular batch number is received. PollReceiveBatch(uint64_t batch)105 Poll<Empty> PollReceiveBatch(uint64_t batch) { 106 ReleasableMutexLock lock(&mu_); 107 GRPC_TRACE_LOG(promise_primitives, INFO) 108 << "MPSC::PollReceiveBatch: " << GRPC_DUMP_ARGS(this, batch_, batch); 109 if (batch_ >= batch) return Empty{}; 110 send_wakers_.AddPending(GetContext<Activity>()->MakeNonOwningWaker()); 111 return Pending{}; 112 } 113 114 // Mark that the receiver is closed. ReceiverClosed(bool wake_receiver)115 void ReceiverClosed(bool wake_receiver) { 116 ReleasableMutexLock lock(&mu_); 117 GRPC_TRACE_LOG(promise_primitives, INFO) 118 << "MPSC::ReceiverClosed: " << GRPC_DUMP_ARGS(this, batch_); 119 if (batch_ == kClosedBatch) return; 120 batch_ = kClosedBatch; 121 auto wakeups = send_wakers_.TakeWakeupSet(); 122 auto receive_waker = std::move(receive_waker_); 123 lock.Release(); 124 if (wake_receiver) receive_waker.Wakeup(); 125 wakeups.Wakeup(); 126 } 127 128 private: 129 Mutex mu_; 130 const size_t max_queued_; 131 std::vector<T> queue_ ABSL_GUARDED_BY(mu_); 132 // Every time we give queue_ to the receiver, we increment batch_. 133 // When the receiver is closed we set batch_ to kClosedBatch. 134 uint64_t batch_ ABSL_GUARDED_BY(mu_) = 1; 135 Waker receive_waker_ ABSL_GUARDED_BY(mu_); 136 WaitSet send_wakers_ ABSL_GUARDED_BY(mu_); 137 }; 138 139 } // namespace mpscpipe_detail 140 141 template <typename T> 142 class MpscReceiver; 143 144 // Send half of an mpsc pipe. 145 template <typename T> 146 class MpscSender { 147 public: 148 MpscSender(const MpscSender&) = default; 149 MpscSender& operator=(const MpscSender&) = default; 150 MpscSender(MpscSender&&) noexcept = default; 151 MpscSender& operator=(MpscSender&&) noexcept = default; 152 153 // Return a promise that will send one item. 154 // Resolves to true if sent, false if the receiver was closed (and the value 155 // will never be successfully sent). Send(T t)156 auto Send(T t) { return SendGeneric<false>(std::move(t)); } 157 158 // Per send, but do not resolve until the item has been received by the 159 // receiver. SendAcked(T t)160 auto SendAcked(T t) { return SendGeneric<true>(std::move(t)); } 161 UnbufferedImmediateSend(T t)162 bool UnbufferedImmediateSend(T t) { 163 return center_->Send(std::move(t), false) != 164 mpscpipe_detail::Center<T>::kClosedBatch; 165 } 166 167 private: 168 template <bool kAwaitReceipt> SendGeneric(T t)169 auto SendGeneric(T t) { 170 return [center = center_, t = std::move(t), 171 batch = uint64_t(0)]() mutable -> Poll<bool> { 172 if (center == nullptr) return false; 173 if (batch == 0) { 174 batch = center->Send(std::move(t), kAwaitReceipt); 175 CHECK_NE(batch, 0u); 176 if (batch == mpscpipe_detail::Center<T>::kClosedBatch) return false; 177 } 178 auto p = center->PollReceiveBatch(batch); 179 if (p.pending()) return Pending{}; 180 return true; 181 }; 182 } 183 184 friend class MpscReceiver<T>; MpscSender(RefCountedPtr<mpscpipe_detail::Center<T>> center)185 explicit MpscSender(RefCountedPtr<mpscpipe_detail::Center<T>> center) 186 : center_(std::move(center)) {} 187 RefCountedPtr<mpscpipe_detail::Center<T>> center_; 188 }; 189 190 // Receive half of an mpsc pipe. 191 template <typename T> 192 class MpscReceiver { 193 public: 194 // max_buffer_hint is the maximum number of elements we'd like to buffer. 195 // We half this before passing to Center so that the number there is the 196 // maximum number of elements that can be queued in the center of the pipe. 197 // The receiver also holds some of the buffered elements (up to half of them!) 198 // so the total outstanding is equal to max_buffer_hint (unless it's 1 in 199 // which case instantaneosly we may have two elements buffered). MpscReceiver(size_t max_buffer_hint)200 explicit MpscReceiver(size_t max_buffer_hint) 201 : center_(MakeRefCounted<mpscpipe_detail::Center<T>>( 202 std::max(static_cast<size_t>(1), max_buffer_hint / 2))) {} ~MpscReceiver()203 ~MpscReceiver() { 204 if (center_ != nullptr) center_->ReceiverClosed(false); 205 } MarkClosed()206 void MarkClosed() { 207 if (center_ != nullptr) center_->ReceiverClosed(true); 208 } 209 MpscReceiver(const MpscReceiver&) = delete; 210 MpscReceiver& operator=(const MpscReceiver&) = delete; 211 // Only movable until it's first polled, and so we don't need to contend with 212 // a non-empty buffer during a legal move! MpscReceiver(MpscReceiver && other)213 MpscReceiver(MpscReceiver&& other) noexcept 214 : center_(std::move(other.center_)) { 215 DCHECK(other.buffer_.empty()); 216 } 217 MpscReceiver& operator=(MpscReceiver&& other) noexcept { 218 DCHECK(other.buffer_.empty()); 219 center_ = std::move(other.center_); 220 return *this; 221 } 222 223 // Construct a new sender for this receiver. MakeSender()224 MpscSender<T> MakeSender() { return MpscSender<T>(center_); } 225 226 // Return a promise that will resolve to ValueOrFailure<T>. 227 // If receiving is closed, it will resolve to failure. 228 // Otherwise, resolves to the next item (and removes said item). Next()229 auto Next() { 230 return [this]() -> Poll<ValueOrFailure<T>> { 231 if (buffer_it_ != buffer_.end()) { 232 return Poll<ValueOrFailure<T>>(std::move(*buffer_it_++)); 233 } 234 auto p = center_->PollReceiveBatch(buffer_); 235 if (bool* r = p.value_if_ready()) { 236 if (!*r) return Failure{}; 237 buffer_it_ = buffer_.begin(); 238 return Poll<ValueOrFailure<T>>(std::move(*buffer_it_++)); 239 } 240 return Pending{}; 241 }; 242 } 243 244 private: 245 // Received items. We move out of here one by one, but don't resize the 246 // vector. Instead, when we run out of items, we poll the center for more - 247 // which swaps this buffer in for the new receive queue and clears it. 248 // In this way, upon hitting a steady state the queue ought to be allocation 249 // free. 250 std::vector<T> buffer_; 251 typename std::vector<T>::iterator buffer_it_ = buffer_.end(); 252 RefCountedPtr<mpscpipe_detail::Center<T>> center_; 253 }; 254 255 } // namespace grpc_core 256 257 #endif // GRPC_SRC_CORE_LIB_PROMISE_MPSC_H 258