• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2022 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_MPSC_H
16 #define GRPC_SRC_CORE_LIB_PROMISE_MPSC_H
17 
18 #include <grpc/support/port_platform.h>
19 #include <stddef.h>
20 
21 #include <algorithm>
22 #include <cstdint>
23 #include <limits>
24 #include <utility>
25 #include <vector>
26 
27 #include "absl/base/thread_annotations.h"
28 #include "absl/log/check.h"
29 #include "src/core/lib/promise/activity.h"
30 #include "src/core/lib/promise/poll.h"
31 #include "src/core/lib/promise/status_flag.h"
32 #include "src/core/lib/promise/wait_set.h"
33 #include "src/core/util/dump_args.h"
34 #include "src/core/util/ref_counted.h"
35 #include "src/core/util/ref_counted_ptr.h"
36 #include "src/core/util/sync.h"
37 
38 // Multi producer single consumer inter-activity comms.
39 
40 namespace grpc_core {
41 
42 namespace mpscpipe_detail {
43 
44 // "Center" of the communication pipe.
45 // Contains sent but not received messages, and open/close state.
46 template <typename T>
47 class Center : public RefCounted<Center<T>> {
48  public:
49   // Construct the center with a maximum queue size.
Center(size_t max_queued)50   explicit Center(size_t max_queued) : max_queued_(max_queued) {}
51 
52   static constexpr const uint64_t kClosedBatch =
53       std::numeric_limits<uint64_t>::max();
54 
55   // Poll for new items.
56   // - Returns true if new items were obtained, in which case they are contained
57   //   in dest in the order they were added. Wakes up all pending senders since
58   //   there will now be space to send.
59   // - If receives have been closed, returns false.
60   // - If no new items are available, returns
61   //   Pending and sets up a waker to be awoken when more items are available.
62   // TODO(ctiller): consider the problem of thundering herds here. There may be
63   // more senders than there are queue spots, and so the strategy of waking up
64   // all senders is ill-advised.
65   // That said, some senders may have been cancelled by the time we wake them,
66   // and so waking a subset could cause starvation.
PollReceiveBatch(std::vector<T> & dest)67   Poll<bool> PollReceiveBatch(std::vector<T>& dest) {
68     ReleasableMutexLock lock(&mu_);
69     GRPC_TRACE_LOG(promise_primitives, INFO)
70         << "MPSC::PollReceiveBatch: "
71         << GRPC_DUMP_ARGS(this, batch_, queue_.size());
72     if (queue_.empty()) {
73       if (batch_ == kClosedBatch) return false;
74       receive_waker_ = GetContext<Activity>()->MakeNonOwningWaker();
75       return Pending{};
76     }
77     dest.swap(queue_);
78     queue_.clear();
79     if (batch_ != kClosedBatch) ++batch_;
80     auto wakeups = send_wakers_.TakeWakeupSet();
81     lock.Release();
82     wakeups.Wakeup();
83     return true;
84   }
85 
86   // Return value:
87   //  - if the pipe is closed, returns kClosedBatch
88   //  - if await_receipt is false, returns the batch number the item was sent
89   //  in.
90   //  - if await_receipt is true, returns the first sending batch number that
91   //  guarantees the item has been received.
Send(T t,bool await_receipt)92   uint64_t Send(T t, bool await_receipt) {
93     ReleasableMutexLock lock(&mu_);
94     if (batch_ == kClosedBatch) return kClosedBatch;
95     queue_.push_back(std::move(t));
96     auto receive_waker = std::move(receive_waker_);
97     const uint64_t batch =
98         (!await_receipt && queue_.size() <= max_queued_) ? batch_ : batch_ + 1;
99     lock.Release();
100     receive_waker.Wakeup();
101     return batch;
102   }
103 
104   // Poll until a particular batch number is received.
PollReceiveBatch(uint64_t batch)105   Poll<Empty> PollReceiveBatch(uint64_t batch) {
106     ReleasableMutexLock lock(&mu_);
107     GRPC_TRACE_LOG(promise_primitives, INFO)
108         << "MPSC::PollReceiveBatch: " << GRPC_DUMP_ARGS(this, batch_, batch);
109     if (batch_ >= batch) return Empty{};
110     send_wakers_.AddPending(GetContext<Activity>()->MakeNonOwningWaker());
111     return Pending{};
112   }
113 
114   // Mark that the receiver is closed.
ReceiverClosed(bool wake_receiver)115   void ReceiverClosed(bool wake_receiver) {
116     ReleasableMutexLock lock(&mu_);
117     GRPC_TRACE_LOG(promise_primitives, INFO)
118         << "MPSC::ReceiverClosed: " << GRPC_DUMP_ARGS(this, batch_);
119     if (batch_ == kClosedBatch) return;
120     batch_ = kClosedBatch;
121     auto wakeups = send_wakers_.TakeWakeupSet();
122     auto receive_waker = std::move(receive_waker_);
123     lock.Release();
124     if (wake_receiver) receive_waker.Wakeup();
125     wakeups.Wakeup();
126   }
127 
128  private:
129   Mutex mu_;
130   const size_t max_queued_;
131   std::vector<T> queue_ ABSL_GUARDED_BY(mu_);
132   // Every time we give queue_ to the receiver, we increment batch_.
133   // When the receiver is closed we set batch_ to kClosedBatch.
134   uint64_t batch_ ABSL_GUARDED_BY(mu_) = 1;
135   Waker receive_waker_ ABSL_GUARDED_BY(mu_);
136   WaitSet send_wakers_ ABSL_GUARDED_BY(mu_);
137 };
138 
139 }  // namespace mpscpipe_detail
140 
141 template <typename T>
142 class MpscReceiver;
143 
144 // Send half of an mpsc pipe.
145 template <typename T>
146 class MpscSender {
147  public:
148   MpscSender(const MpscSender&) = default;
149   MpscSender& operator=(const MpscSender&) = default;
150   MpscSender(MpscSender&&) noexcept = default;
151   MpscSender& operator=(MpscSender&&) noexcept = default;
152 
153   // Return a promise that will send one item.
154   // Resolves to true if sent, false if the receiver was closed (and the value
155   // will never be successfully sent).
Send(T t)156   auto Send(T t) { return SendGeneric<false>(std::move(t)); }
157 
158   // Per send, but do not resolve until the item has been received by the
159   // receiver.
SendAcked(T t)160   auto SendAcked(T t) { return SendGeneric<true>(std::move(t)); }
161 
UnbufferedImmediateSend(T t)162   bool UnbufferedImmediateSend(T t) {
163     return center_->Send(std::move(t), false) !=
164            mpscpipe_detail::Center<T>::kClosedBatch;
165   }
166 
167  private:
168   template <bool kAwaitReceipt>
SendGeneric(T t)169   auto SendGeneric(T t) {
170     return [center = center_, t = std::move(t),
171             batch = uint64_t(0)]() mutable -> Poll<bool> {
172       if (center == nullptr) return false;
173       if (batch == 0) {
174         batch = center->Send(std::move(t), kAwaitReceipt);
175         CHECK_NE(batch, 0u);
176         if (batch == mpscpipe_detail::Center<T>::kClosedBatch) return false;
177       }
178       auto p = center->PollReceiveBatch(batch);
179       if (p.pending()) return Pending{};
180       return true;
181     };
182   }
183 
184   friend class MpscReceiver<T>;
MpscSender(RefCountedPtr<mpscpipe_detail::Center<T>> center)185   explicit MpscSender(RefCountedPtr<mpscpipe_detail::Center<T>> center)
186       : center_(std::move(center)) {}
187   RefCountedPtr<mpscpipe_detail::Center<T>> center_;
188 };
189 
190 // Receive half of an mpsc pipe.
191 template <typename T>
192 class MpscReceiver {
193  public:
194   // max_buffer_hint is the maximum number of elements we'd like to buffer.
195   // We half this before passing to Center so that the number there is the
196   // maximum number of elements that can be queued in the center of the pipe.
197   // The receiver also holds some of the buffered elements (up to half of them!)
198   // so the total outstanding is equal to max_buffer_hint (unless it's 1 in
199   // which case instantaneosly we may have two elements buffered).
MpscReceiver(size_t max_buffer_hint)200   explicit MpscReceiver(size_t max_buffer_hint)
201       : center_(MakeRefCounted<mpscpipe_detail::Center<T>>(
202             std::max(static_cast<size_t>(1), max_buffer_hint / 2))) {}
~MpscReceiver()203   ~MpscReceiver() {
204     if (center_ != nullptr) center_->ReceiverClosed(false);
205   }
MarkClosed()206   void MarkClosed() {
207     if (center_ != nullptr) center_->ReceiverClosed(true);
208   }
209   MpscReceiver(const MpscReceiver&) = delete;
210   MpscReceiver& operator=(const MpscReceiver&) = delete;
211   // Only movable until it's first polled, and so we don't need to contend with
212   // a non-empty buffer during a legal move!
MpscReceiver(MpscReceiver && other)213   MpscReceiver(MpscReceiver&& other) noexcept
214       : center_(std::move(other.center_)) {
215     DCHECK(other.buffer_.empty());
216   }
217   MpscReceiver& operator=(MpscReceiver&& other) noexcept {
218     DCHECK(other.buffer_.empty());
219     center_ = std::move(other.center_);
220     return *this;
221   }
222 
223   // Construct a new sender for this receiver.
MakeSender()224   MpscSender<T> MakeSender() { return MpscSender<T>(center_); }
225 
226   // Return a promise that will resolve to ValueOrFailure<T>.
227   // If receiving is closed, it will resolve to failure.
228   // Otherwise, resolves to the next item (and removes said item).
Next()229   auto Next() {
230     return [this]() -> Poll<ValueOrFailure<T>> {
231       if (buffer_it_ != buffer_.end()) {
232         return Poll<ValueOrFailure<T>>(std::move(*buffer_it_++));
233       }
234       auto p = center_->PollReceiveBatch(buffer_);
235       if (bool* r = p.value_if_ready()) {
236         if (!*r) return Failure{};
237         buffer_it_ = buffer_.begin();
238         return Poll<ValueOrFailure<T>>(std::move(*buffer_it_++));
239       }
240       return Pending{};
241     };
242   }
243 
244  private:
245   // Received items. We move out of here one by one, but don't resize the
246   // vector. Instead, when we run out of items, we poll the center for more -
247   // which swaps this buffer in for the new receive queue and clears it.
248   // In this way, upon hitting a steady state the queue ought to be allocation
249   // free.
250   std::vector<T> buffer_;
251   typename std::vector<T>::iterator buffer_it_ = buffer_.end();
252   RefCountedPtr<mpscpipe_detail::Center<T>> center_;
253 };
254 
255 }  // namespace grpc_core
256 
257 #endif  // GRPC_SRC_CORE_LIB_PROMISE_MPSC_H
258