• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_PIPE_H
16 #define GRPC_SRC_CORE_LIB_PROMISE_PIPE_H
17 
18 #include <grpc/support/port_platform.h>
19 
20 #include <stdint.h>
21 #include <stdlib.h>
22 
23 #include <memory>
24 #include <string>
25 #include <utility>
26 
27 #include "absl/strings/str_cat.h"
28 #include "absl/types/optional.h"
29 #include "absl/types/variant.h"
30 
31 #include <grpc/support/log.h>
32 
33 #include "src/core/lib/gprpp/debug_location.h"
34 #include "src/core/lib/gprpp/ref_counted_ptr.h"
35 #include "src/core/lib/promise/activity.h"
36 #include "src/core/lib/promise/context.h"
37 #include "src/core/lib/promise/if.h"
38 #include "src/core/lib/promise/interceptor_list.h"
39 #include "src/core/lib/promise/map.h"
40 #include "src/core/lib/promise/poll.h"
41 #include "src/core/lib/promise/seq.h"
42 #include "src/core/lib/promise/trace.h"
43 #include "src/core/lib/resource_quota/arena.h"
44 
45 namespace grpc_core {
46 
47 namespace pipe_detail {
48 template <typename T>
49 class Center;
50 }
51 
52 template <typename T>
53 struct Pipe;
54 
55 // Result of Pipe::Next - represents a received value.
56 // If has_value() is false, the pipe was closed by the time we polled for the
57 // next value. No value was received, nor will there ever be.
58 // This type is movable but not copyable.
59 // Once the final move is destroyed the pipe will ack the read and unblock the
60 // send.
61 template <typename T>
62 class NextResult final {
63  public:
NextResult()64   NextResult() : center_(nullptr) {}
NextResult(RefCountedPtr<pipe_detail::Center<T>> center)65   explicit NextResult(RefCountedPtr<pipe_detail::Center<T>> center)
66       : center_(std::move(center)) {
67     GPR_ASSERT(center_ != nullptr);
68   }
NextResult(bool cancelled)69   explicit NextResult(bool cancelled)
70       : center_(nullptr), cancelled_(cancelled) {}
71   ~NextResult();
72   NextResult(const NextResult&) = delete;
73   NextResult& operator=(const NextResult&) = delete;
74   NextResult(NextResult&& other) noexcept = default;
75   NextResult& operator=(NextResult&& other) noexcept = default;
76 
77   using value_type = T;
78 
79   void reset();
80   bool has_value() const;
81   // Only valid if has_value()
value()82   const T& value() const {
83     GPR_ASSERT(has_value());
84     return **this;
85   }
value()86   T& value() {
87     GPR_ASSERT(has_value());
88     return **this;
89   }
90   const T& operator*() const;
91   T& operator*();
92   // Only valid if !has_value()
cancelled()93   bool cancelled() { return cancelled_; }
94 
95  private:
96   RefCountedPtr<pipe_detail::Center<T>> center_;
97   bool cancelled_;
98 };
99 
100 namespace pipe_detail {
101 
102 template <typename T>
103 class Push;
104 template <typename T>
105 class Next;
106 
107 // Center sits between a sender and a receiver to provide a one-deep buffer of
108 // Ts
109 template <typename T>
110 class Center : public InterceptorList<T> {
111  public:
112   // Initialize with one send ref (held by PipeSender) and one recv ref (held by
113   // PipeReceiver)
Center()114   Center() {
115     refs_ = 2;
116     value_state_ = ValueState::kEmpty;
117   }
118 
119   // Add one ref to this object, and return this.
IncrementRefCount()120   void IncrementRefCount() {
121     if (grpc_trace_promise_primitives.enabled()) {
122       gpr_log(GPR_DEBUG, "%s", DebugOpString("IncrementRefCount").c_str());
123     }
124     refs_++;
125     GPR_DEBUG_ASSERT(refs_ != 0);
126   }
127 
Ref()128   RefCountedPtr<Center> Ref() {
129     IncrementRefCount();
130     return RefCountedPtr<Center>(this);
131   }
132 
133   // Drop a ref
134   // If no refs remain, destroy this object
Unref()135   void Unref() {
136     if (grpc_trace_promise_primitives.enabled()) {
137       gpr_log(GPR_DEBUG, "%s", DebugOpString("Unref").c_str());
138     }
139     GPR_DEBUG_ASSERT(refs_ > 0);
140     refs_--;
141     if (0 == refs_) {
142       this->~Center();
143     }
144   }
145 
146   // Try to push *value into the pipe.
147   // Return Pending if there is no space.
148   // Return true if the value was pushed.
149   // Return false if the recv end is closed.
Push(T * value)150   Poll<bool> Push(T* value) {
151     if (grpc_trace_promise_primitives.enabled()) {
152       gpr_log(GPR_INFO, "%s", DebugOpString("Push").c_str());
153     }
154     GPR_DEBUG_ASSERT(refs_ != 0);
155     switch (value_state_) {
156       case ValueState::kClosed:
157       case ValueState::kReadyClosed:
158       case ValueState::kCancelled:
159       case ValueState::kWaitingForAckAndClosed:
160         return false;
161       case ValueState::kReady:
162       case ValueState::kAcked:
163       case ValueState::kWaitingForAck:
164         return on_empty_.pending();
165       case ValueState::kEmpty:
166         value_state_ = ValueState::kReady;
167         value_ = std::move(*value);
168         on_full_.Wake();
169         return true;
170     }
171     GPR_UNREACHABLE_CODE(return false);
172   }
173 
PollAck()174   Poll<bool> PollAck() {
175     if (grpc_trace_promise_primitives.enabled()) {
176       gpr_log(GPR_INFO, "%s", DebugOpString("PollAck").c_str());
177     }
178     GPR_DEBUG_ASSERT(refs_ != 0);
179     switch (value_state_) {
180       case ValueState::kClosed:
181         return true;
182       case ValueState::kCancelled:
183         return false;
184       case ValueState::kReady:
185       case ValueState::kReadyClosed:
186       case ValueState::kEmpty:
187       case ValueState::kWaitingForAck:
188       case ValueState::kWaitingForAckAndClosed:
189         return on_empty_.pending();
190       case ValueState::kAcked:
191         value_state_ = ValueState::kEmpty;
192         on_empty_.Wake();
193         return true;
194     }
195     return true;
196   }
197 
198   // Try to receive a value from the pipe.
199   // Return Pending if there is no value.
200   // Return the value if one was retrieved.
201   // Return nullopt if the send end is closed and no value had been pushed.
Next()202   Poll<absl::optional<T>> Next() {
203     if (grpc_trace_promise_primitives.enabled()) {
204       gpr_log(GPR_INFO, "%s", DebugOpString("Next").c_str());
205     }
206     GPR_DEBUG_ASSERT(refs_ != 0);
207     switch (value_state_) {
208       case ValueState::kEmpty:
209       case ValueState::kAcked:
210       case ValueState::kWaitingForAck:
211       case ValueState::kWaitingForAckAndClosed:
212         return on_full_.pending();
213       case ValueState::kReadyClosed:
214         value_state_ = ValueState::kWaitingForAckAndClosed;
215         return std::move(value_);
216       case ValueState::kReady:
217         value_state_ = ValueState::kWaitingForAck;
218         return std::move(value_);
219       case ValueState::kClosed:
220       case ValueState::kCancelled:
221         return absl::nullopt;
222     }
223     GPR_UNREACHABLE_CODE(return absl::nullopt);
224   }
225 
226   // Check if the pipe is closed for sending (if there is a value still queued
227   // but the pipe is closed, reports closed).
PollClosedForSender()228   Poll<bool> PollClosedForSender() {
229     if (grpc_trace_promise_primitives.enabled()) {
230       gpr_log(GPR_INFO, "%s", DebugOpString("PollClosedForSender").c_str());
231     }
232     GPR_DEBUG_ASSERT(refs_ != 0);
233     switch (value_state_) {
234       case ValueState::kEmpty:
235       case ValueState::kAcked:
236       case ValueState::kReady:
237       case ValueState::kWaitingForAck:
238         return on_closed_.pending();
239       case ValueState::kWaitingForAckAndClosed:
240       case ValueState::kReadyClosed:
241       case ValueState::kClosed:
242         return false;
243       case ValueState::kCancelled:
244         return true;
245     }
246     GPR_UNREACHABLE_CODE(return true);
247   }
248 
249   // Check if the pipe is closed for receiving (if there is a value still queued
250   // but the pipe is closed, reports open).
PollClosedForReceiver()251   Poll<bool> PollClosedForReceiver() {
252     if (grpc_trace_promise_primitives.enabled()) {
253       gpr_log(GPR_INFO, "%s", DebugOpString("PollClosedForReceiver").c_str());
254     }
255     GPR_DEBUG_ASSERT(refs_ != 0);
256     switch (value_state_) {
257       case ValueState::kEmpty:
258       case ValueState::kAcked:
259       case ValueState::kReady:
260       case ValueState::kReadyClosed:
261       case ValueState::kWaitingForAck:
262       case ValueState::kWaitingForAckAndClosed:
263         return on_closed_.pending();
264       case ValueState::kClosed:
265         return false;
266       case ValueState::kCancelled:
267         return true;
268     }
269     GPR_UNREACHABLE_CODE(return true);
270   }
271 
PollEmpty()272   Poll<Empty> PollEmpty() {
273     if (grpc_trace_promise_primitives.enabled()) {
274       gpr_log(GPR_INFO, "%s", DebugOpString("PollEmpty").c_str());
275     }
276     GPR_DEBUG_ASSERT(refs_ != 0);
277     switch (value_state_) {
278       case ValueState::kReady:
279       case ValueState::kReadyClosed:
280         return on_empty_.pending();
281       case ValueState::kWaitingForAck:
282       case ValueState::kWaitingForAckAndClosed:
283       case ValueState::kAcked:
284       case ValueState::kEmpty:
285       case ValueState::kClosed:
286       case ValueState::kCancelled:
287         return Empty{};
288     }
289     GPR_UNREACHABLE_CODE(return Empty{});
290   }
291 
AckNext()292   void AckNext() {
293     if (grpc_trace_promise_primitives.enabled()) {
294       gpr_log(GPR_INFO, "%s", DebugOpString("AckNext").c_str());
295     }
296     switch (value_state_) {
297       case ValueState::kReady:
298       case ValueState::kWaitingForAck:
299         value_state_ = ValueState::kAcked;
300         on_empty_.Wake();
301         break;
302       case ValueState::kReadyClosed:
303       case ValueState::kWaitingForAckAndClosed:
304         this->ResetInterceptorList();
305         value_state_ = ValueState::kClosed;
306         on_closed_.Wake();
307         on_empty_.Wake();
308         on_full_.Wake();
309         break;
310       case ValueState::kClosed:
311       case ValueState::kCancelled:
312         break;
313       case ValueState::kEmpty:
314       case ValueState::kAcked:
315         abort();
316     }
317   }
318 
MarkClosed()319   void MarkClosed() {
320     if (grpc_trace_promise_primitives.enabled()) {
321       gpr_log(GPR_INFO, "%s", DebugOpString("MarkClosed").c_str());
322     }
323     switch (value_state_) {
324       case ValueState::kEmpty:
325       case ValueState::kAcked:
326         this->ResetInterceptorList();
327         value_state_ = ValueState::kClosed;
328         on_empty_.Wake();
329         on_full_.Wake();
330         on_closed_.Wake();
331         break;
332       case ValueState::kReady:
333         value_state_ = ValueState::kReadyClosed;
334         on_closed_.Wake();
335         break;
336       case ValueState::kWaitingForAck:
337         value_state_ = ValueState::kWaitingForAckAndClosed;
338         on_closed_.Wake();
339         break;
340       case ValueState::kReadyClosed:
341       case ValueState::kClosed:
342       case ValueState::kCancelled:
343       case ValueState::kWaitingForAckAndClosed:
344         break;
345     }
346   }
347 
MarkCancelled()348   void MarkCancelled() {
349     if (grpc_trace_promise_primitives.enabled()) {
350       gpr_log(GPR_INFO, "%s", DebugOpString("MarkCancelled").c_str());
351     }
352     switch (value_state_) {
353       case ValueState::kEmpty:
354       case ValueState::kAcked:
355       case ValueState::kReady:
356       case ValueState::kReadyClosed:
357       case ValueState::kWaitingForAck:
358       case ValueState::kWaitingForAckAndClosed:
359         this->ResetInterceptorList();
360         value_state_ = ValueState::kCancelled;
361         on_empty_.Wake();
362         on_full_.Wake();
363         on_closed_.Wake();
364         break;
365       case ValueState::kClosed:
366       case ValueState::kCancelled:
367         break;
368     }
369   }
370 
cancelled()371   bool cancelled() { return value_state_ == ValueState::kCancelled; }
372 
value()373   T& value() { return value_; }
value()374   const T& value() const { return value_; }
375 
DebugTag()376   std::string DebugTag() {
377     if (auto* activity = GetContext<Activity>()) {
378       return absl::StrCat(activity->DebugTag(), " PIPE[0x", absl::Hex(this),
379                           "]: ");
380     } else {
381       return absl::StrCat("PIPE[0x", reinterpret_cast<uintptr_t>(this), "]: ");
382     }
383   }
384 
385  private:
386   // State of value_.
387   enum class ValueState : uint8_t {
388     // No value is set, it's possible to send.
389     kEmpty,
390     // Value has been pushed but not acked, it's possible to receive.
391     kReady,
392     // Value has been read and not acked, both send/receive blocked until ack.
393     kWaitingForAck,
394     // Value has been received and acked, we can unblock senders and transition
395     // to empty.
396     kAcked,
397     // Pipe is closed successfully, no more values can be sent
398     kClosed,
399     // Pipe is closed successfully, no more values can be sent
400     // (but one value is queued and ready to be received)
401     kReadyClosed,
402     // Pipe is closed successfully, no more values can be sent
403     // (but one value is queued and waiting to be acked)
404     kWaitingForAckAndClosed,
405     // Pipe is closed unsuccessfully, no more values can be sent
406     kCancelled,
407   };
408 
DebugOpString(std::string op)409   std::string DebugOpString(std::string op) {
410     return absl::StrCat(DebugTag(), op, " refs=", refs_,
411                         " value_state=", ValueStateName(value_state_),
412                         " on_empty=", on_empty_.DebugString().c_str(),
413                         " on_full=", on_full_.DebugString().c_str(),
414                         " on_closed=", on_closed_.DebugString().c_str());
415   }
416 
ValueStateName(ValueState state)417   static const char* ValueStateName(ValueState state) {
418     switch (state) {
419       case ValueState::kEmpty:
420         return "Empty";
421       case ValueState::kReady:
422         return "Ready";
423       case ValueState::kAcked:
424         return "Acked";
425       case ValueState::kClosed:
426         return "Closed";
427       case ValueState::kReadyClosed:
428         return "ReadyClosed";
429       case ValueState::kWaitingForAck:
430         return "WaitingForAck";
431       case ValueState::kWaitingForAckAndClosed:
432         return "WaitingForAckAndClosed";
433       case ValueState::kCancelled:
434         return "Cancelled";
435     }
436     GPR_UNREACHABLE_CODE(return "unknown");
437   }
438 
439   T value_;
440   // Number of refs
441   uint8_t refs_;
442   // Current state of the value.
443   ValueState value_state_;
444   IntraActivityWaiter on_empty_;
445   IntraActivityWaiter on_full_;
446   IntraActivityWaiter on_closed_;
447 
448   // Make failure to destruct show up in ASAN builds.
449 #ifndef NDEBUG
450   std::unique_ptr<int> asan_canary_ = std::make_unique<int>(0);
451 #endif
452 };
453 
454 }  // namespace pipe_detail
455 
456 // Send end of a Pipe.
457 template <typename T>
458 class PipeSender {
459  public:
460   using PushType = pipe_detail::Push<T>;
461 
462   PipeSender(const PipeSender&) = delete;
463   PipeSender& operator=(const PipeSender&) = delete;
464   PipeSender(PipeSender&& other) noexcept = default;
465   PipeSender& operator=(PipeSender&& other) noexcept = default;
466 
~PipeSender()467   ~PipeSender() {
468     if (center_ != nullptr) center_->MarkClosed();
469   }
470 
Close()471   void Close() {
472     if (center_ != nullptr) {
473       center_->MarkClosed();
474       center_.reset();
475     }
476   }
477 
CloseWithError()478   void CloseWithError() {
479     if (center_ != nullptr) {
480       center_->MarkCancelled();
481       center_.reset();
482     }
483   }
484 
Swap(PipeSender<T> * other)485   void Swap(PipeSender<T>* other) { std::swap(center_, other->center_); }
486 
487   // Send a single message along the pipe.
488   // Returns a promise that will resolve to a bool - true if the message was
489   // sent, false if it could never be sent. Blocks the promise until the
490   // receiver is either closed or able to receive another message.
491   PushType Push(T value);
492 
493   // Return a promise that resolves when the receiver is closed.
494   // The resolved value is a bool - true if the pipe was cancelled, false if it
495   // was closed successfully.
496   // Checks closed from the senders perspective: that is, if there is a value in
497   // the pipe but the pipe is closed, reports closed.
AwaitClosed()498   auto AwaitClosed() {
499     return [center = center_]() { return center->PollClosedForSender(); };
500   }
501 
502   // Interject PromiseFactory f into the pipeline.
503   // f will be called with the current value traversing the pipe, and should
504   // return a value to replace it with.
505   // Interjects at the Push end of the pipe.
506   template <typename Fn>
507   void InterceptAndMap(Fn f, DebugLocation from = {}) {
508     center_->PrependMap(std::move(f), from);
509   }
510 
511   // Per above, but calls cleanup_fn when the pipe is closed.
512   template <typename Fn, typename OnHalfClose>
513   void InterceptAndMap(Fn f, OnHalfClose cleanup_fn, DebugLocation from = {}) {
514     center_->PrependMapWithCleanup(std::move(f), std::move(cleanup_fn), from);
515   }
516 
517  private:
518   friend struct Pipe<T>;
519   explicit PipeSender(pipe_detail::Center<T>* center) : center_(center) {}
520   RefCountedPtr<pipe_detail::Center<T>> center_;
521 
522   // Make failure to destruct show up in ASAN builds.
523 #ifndef NDEBUG
524   std::unique_ptr<int> asan_canary_ = std::make_unique<int>(0);
525 #endif
526 };
527 
528 template <typename T>
529 class PipeReceiver;
530 
531 namespace pipe_detail {
532 
533 // Implementation of PipeReceiver::Next promise.
534 template <typename T>
535 class Next {
536  public:
537   Next(const Next&) = delete;
538   Next& operator=(const Next&) = delete;
539   Next(Next&& other) noexcept = default;
540   Next& operator=(Next&& other) noexcept = default;
541 
542   Poll<absl::optional<T>> operator()() {
543     return center_ == nullptr ? absl::nullopt : center_->Next();
544   }
545 
546  private:
547   friend class PipeReceiver<T>;
548   explicit Next(RefCountedPtr<Center<T>> center) : center_(std::move(center)) {}
549 
550   RefCountedPtr<Center<T>> center_;
551 };
552 
553 }  // namespace pipe_detail
554 
555 // Receive end of a Pipe.
556 template <typename T>
557 class PipeReceiver {
558  public:
559   PipeReceiver(const PipeReceiver&) = delete;
560   PipeReceiver& operator=(const PipeReceiver&) = delete;
561   PipeReceiver(PipeReceiver&& other) noexcept = default;
562   PipeReceiver& operator=(PipeReceiver&& other) noexcept = default;
563   ~PipeReceiver() {
564     if (center_ != nullptr) center_->MarkCancelled();
565   }
566 
567   void Swap(PipeReceiver<T>* other) { std::swap(center_, other->center_); }
568 
569   // Receive a single message from the pipe.
570   // Returns a promise that will resolve to an optional<T> - with a value if a
571   // message was received, or no value if the other end of the pipe was closed.
572   // Blocks the promise until the receiver is either closed or a message is
573   // available.
574   auto Next() {
575     return Seq(pipe_detail::Next<T>(center_), [center = center_](
576                                                   absl::optional<T> value) {
577       bool open = value.has_value();
578       bool cancelled = center == nullptr ? true : center->cancelled();
579       return If(
580           open,
581           [center = std::move(center), value = std::move(value)]() mutable {
582             auto run = center->Run(std::move(value));
583             return Map(std::move(run), [center = std::move(center)](
584                                            absl::optional<T> value) mutable {
585               if (value.has_value()) {
586                 center->value() = std::move(*value);
587                 return NextResult<T>(std::move(center));
588               } else {
589                 center->MarkCancelled();
590                 return NextResult<T>(true);
591               }
592             });
593           },
594           [cancelled]() { return NextResult<T>(cancelled); });
595     });
596   }
597 
598   // Return a promise that resolves when the receiver is closed.
599   // The resolved value is a bool - true if the pipe was cancelled, false if it
600   // was closed successfully.
601   // Checks closed from the receivers perspective: that is, if there is a value
602   // in the pipe but the pipe is closed, reports open until that value is read.
603   auto AwaitClosed() {
604     return [center = center_]() -> Poll<bool> {
605       if (center == nullptr) return false;
606       return center->PollClosedForReceiver();
607     };
608   }
609 
610   auto AwaitEmpty() {
611     return [center = center_]() { return center->PollEmpty(); };
612   }
613 
614   void CloseWithError() {
615     if (center_ != nullptr) {
616       center_->MarkCancelled();
617       center_.reset();
618     }
619   }
620 
621   // Interject PromiseFactory f into the pipeline.
622   // f will be called with the current value traversing the pipe, and should
623   // return a value to replace it with.
624   // Interjects at the Next end of the pipe.
625   template <typename Fn>
626   void InterceptAndMap(Fn f, DebugLocation from = {}) {
627     center_->AppendMap(std::move(f), from);
628   }
629 
630   // Per above, but calls cleanup_fn when the pipe is closed.
631   template <typename Fn, typename OnHalfClose>
632   void InterceptAndMapWithHalfClose(Fn f, OnHalfClose cleanup_fn,
633                                     DebugLocation from = {}) {
634     center_->AppendMapWithCleanup(std::move(f), std::move(cleanup_fn), from);
635   }
636 
637  private:
638   friend struct Pipe<T>;
639   explicit PipeReceiver(pipe_detail::Center<T>* center) : center_(center) {}
640   RefCountedPtr<pipe_detail::Center<T>> center_;
641 };
642 
643 namespace pipe_detail {
644 
645 // Implementation of PipeSender::Push promise.
646 template <typename T>
647 class Push {
648  public:
649   Push(const Push&) = delete;
650 
651   Push& operator=(const Push&) = delete;
652   Push(Push&& other) noexcept = default;
653   Push& operator=(Push&& other) noexcept = default;
654 
655   Poll<bool> operator()() {
656     if (center_ == nullptr) {
657       if (grpc_trace_promise_primitives.enabled()) {
658         gpr_log(GPR_DEBUG, "%s Pipe push has a null center",
659                 GetContext<Activity>()->DebugTag().c_str());
660       }
661       return false;
662     }
663     if (auto* p = absl::get_if<T>(&state_)) {
664       auto r = center_->Push(p);
665       if (auto* ok = r.value_if_ready()) {
666         state_.template emplace<AwaitingAck>();
667         if (!*ok) return false;
668       } else {
669         return Pending{};
670       }
671     }
672     GPR_DEBUG_ASSERT(absl::holds_alternative<AwaitingAck>(state_));
673     return center_->PollAck();
674   }
675 
676  private:
677   struct AwaitingAck {};
678 
679   friend class PipeSender<T>;
680   explicit Push(RefCountedPtr<pipe_detail::Center<T>> center, T push)
681       : center_(std::move(center)), state_(std::move(push)) {}
682 
683   RefCountedPtr<Center<T>> center_;
684   absl::variant<T, AwaitingAck> state_;
685 };
686 
687 }  // namespace pipe_detail
688 
689 template <typename T>
690 pipe_detail::Push<T> PipeSender<T>::Push(T value) {
691   return pipe_detail::Push<T>(center_ == nullptr ? nullptr : center_->Ref(),
692                               std::move(value));
693 }
694 
695 template <typename T>
696 using PipeReceiverNextType = decltype(std::declval<PipeReceiver<T>>().Next());
697 
698 template <typename T>
699 bool NextResult<T>::has_value() const {
700   return center_ != nullptr;
701 }
702 
703 template <typename T>
704 T& NextResult<T>::operator*() {
705   return center_->value();
706 }
707 
708 template <typename T>
709 const T& NextResult<T>::operator*() const {
710   return center_->value();
711 }
712 
713 template <typename T>
714 NextResult<T>::~NextResult() {
715   if (center_ != nullptr) center_->AckNext();
716 }
717 
718 template <typename T>
719 void NextResult<T>::reset() {
720   if (center_ != nullptr) {
721     center_->AckNext();
722     center_.reset();
723   }
724 }
725 
726 // A Pipe is an intra-Activity communications channel that transmits T's from
727 // one end to the other.
728 // It is only safe to use a Pipe within the context of a single Activity.
729 // No synchronization is performed internally.
730 // The primary Pipe data structure is allocated from an arena, so the activity
731 // must have an arena as part of its context.
732 // By performing that allocation we can ensure stable pointer to shared data
733 // allowing PipeSender/PipeReceiver/Push/Next to be relatively simple in their
734 // implementation.
735 // This type has been optimized with the expectation that there are relatively
736 // few pipes per activity. If this assumption does not hold then a design
737 // allowing inline filtering of pipe contents (instead of connecting pipes with
738 // polling code) would likely be more appropriate.
739 template <typename T>
740 struct Pipe {
741   Pipe() : Pipe(GetContext<Arena>()) {}
742   explicit Pipe(Arena* arena) : Pipe(arena->New<pipe_detail::Center<T>>()) {}
743   Pipe(const Pipe&) = delete;
744   Pipe& operator=(const Pipe&) = delete;
745   Pipe(Pipe&&) noexcept = default;
746   Pipe& operator=(Pipe&&) noexcept = default;
747 
748   PipeSender<T> sender;
749   PipeReceiver<T> receiver;
750 
751  private:
752   explicit Pipe(pipe_detail::Center<T>* center)
753       : sender(center), receiver(center) {}
754 };
755 
756 }  // namespace grpc_core
757 
758 #endif  // GRPC_SRC_CORE_LIB_PROMISE_PIPE_H
759