• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_PIPE_H
16 #define GRPC_SRC_CORE_LIB_PROMISE_PIPE_H
17 
18 #include <grpc/support/port_platform.h>
19 #include <stdint.h>
20 #include <stdlib.h>
21 
22 #include <memory>
23 #include <string>
24 #include <utility>
25 
26 #include "absl/log/check.h"
27 #include "absl/log/log.h"
28 #include "absl/strings/str_cat.h"
29 #include "absl/types/optional.h"
30 #include "absl/types/variant.h"
31 #include "src/core/lib/promise/activity.h"
32 #include "src/core/lib/promise/context.h"
33 #include "src/core/lib/promise/if.h"
34 #include "src/core/lib/promise/interceptor_list.h"
35 #include "src/core/lib/promise/map.h"
36 #include "src/core/lib/promise/poll.h"
37 #include "src/core/lib/promise/seq.h"
38 #include "src/core/lib/resource_quota/arena.h"
39 #include "src/core/util/debug_location.h"
40 #include "src/core/util/ref_counted_ptr.h"
41 
42 namespace grpc_core {
43 
44 namespace pipe_detail {
45 template <typename T>
46 class Center;
47 }
48 
49 template <typename T>
50 struct Pipe;
51 
52 // Result of Pipe::Next - represents a received value.
53 // If has_value() is false, the pipe was closed by the time we polled for the
54 // next value. No value was received, nor will there ever be.
55 // This type is movable but not copyable.
56 // Once the final move is destroyed the pipe will ack the read and unblock the
57 // send.
58 template <typename T>
59 class NextResult final {
60  public:
NextResult()61   NextResult() : center_(nullptr) {}
NextResult(RefCountedPtr<pipe_detail::Center<T>> center)62   explicit NextResult(RefCountedPtr<pipe_detail::Center<T>> center)
63       : center_(std::move(center)) {
64     CHECK(center_ != nullptr);
65   }
NextResult(bool cancelled)66   explicit NextResult(bool cancelled)
67       : center_(nullptr), cancelled_(cancelled) {}
68   ~NextResult();
69   NextResult(const NextResult&) = delete;
70   NextResult& operator=(const NextResult&) = delete;
71   NextResult(NextResult&& other) noexcept = default;
72   NextResult& operator=(NextResult&& other) noexcept = default;
73 
74   using value_type = T;
75 
76   void reset();
77   bool has_value() const;
78   // Only valid if has_value()
value()79   const T& value() const {
80     CHECK(has_value());
81     return **this;
82   }
value()83   T& value() {
84     CHECK(has_value());
85     return **this;
86   }
87   const T& operator*() const;
88   T& operator*();
89   // Only valid if !has_value()
cancelled()90   bool cancelled() const { return cancelled_; }
91 
92  private:
93   RefCountedPtr<pipe_detail::Center<T>> center_;
94   bool cancelled_;
95 };
96 
97 namespace pipe_detail {
98 
99 template <typename T>
100 class Push;
101 template <typename T>
102 class Next;
103 
104 // Center sits between a sender and a receiver to provide a one-deep buffer of
105 // Ts
106 template <typename T>
107 class Center : public InterceptorList<T> {
108  public:
109   // Initialize with one send ref (held by PipeSender) and one recv ref (held by
110   // PipeReceiver)
Center()111   Center() {
112     refs_ = 2;
113     value_state_ = ValueState::kEmpty;
114   }
115 
116   // Add one ref to this object, and return this.
IncrementRefCount()117   void IncrementRefCount() {
118     GRPC_TRACE_VLOG(promise_primitives, 2)
119         << DebugOpString("IncrementRefCount");
120     refs_++;
121     DCHECK_NE(refs_, 0);
122   }
123 
Ref()124   RefCountedPtr<Center> Ref() {
125     IncrementRefCount();
126     return RefCountedPtr<Center>(this);
127   }
128 
129   // Drop a ref
130   // If no refs remain, destroy this object
Unref()131   void Unref() {
132     GRPC_TRACE_VLOG(promise_primitives, 2) << DebugOpString("Unref");
133     DCHECK_GT(refs_, 0);
134     refs_--;
135     if (0 == refs_) {
136       this->~Center();
137     }
138   }
139 
140   // Try to push *value into the pipe.
141   // Return Pending if there is no space.
142   // Return true if the value was pushed.
143   // Return false if the recv end is closed.
Push(T * value)144   Poll<bool> Push(T* value) {
145     GRPC_TRACE_LOG(promise_primitives, INFO) << DebugOpString("Push");
146     DCHECK_NE(refs_, 0);
147     switch (value_state_) {
148       case ValueState::kClosed:
149       case ValueState::kReadyClosed:
150       case ValueState::kCancelled:
151       case ValueState::kWaitingForAckAndClosed:
152         return false;
153       case ValueState::kReady:
154       case ValueState::kAcked:
155       case ValueState::kWaitingForAck:
156         return on_empty_.pending();
157       case ValueState::kEmpty:
158         value_state_ = ValueState::kReady;
159         value_ = std::move(*value);
160         on_full_.Wake();
161         return true;
162     }
163     GPR_UNREACHABLE_CODE(return false);
164   }
165 
PollAck()166   Poll<bool> PollAck() {
167     GRPC_TRACE_LOG(promise_primitives, INFO) << DebugOpString("PollAck");
168     DCHECK_NE(refs_, 0);
169     switch (value_state_) {
170       case ValueState::kClosed:
171         return true;
172       case ValueState::kCancelled:
173         return false;
174       case ValueState::kReady:
175       case ValueState::kReadyClosed:
176       case ValueState::kEmpty:
177       case ValueState::kWaitingForAck:
178       case ValueState::kWaitingForAckAndClosed:
179         return on_empty_.pending();
180       case ValueState::kAcked:
181         value_state_ = ValueState::kEmpty;
182         on_empty_.Wake();
183         return true;
184     }
185     return true;
186   }
187 
188   // Try to receive a value from the pipe.
189   // Return Pending if there is no value.
190   // Return the value if one was retrieved.
191   // Return nullopt if the send end is closed and no value had been pushed.
Next()192   Poll<absl::optional<T>> Next() {
193     GRPC_TRACE_LOG(promise_primitives, INFO) << DebugOpString("Next");
194     DCHECK_NE(refs_, 0);
195     switch (value_state_) {
196       case ValueState::kEmpty:
197       case ValueState::kAcked:
198       case ValueState::kWaitingForAck:
199       case ValueState::kWaitingForAckAndClosed:
200         return on_full_.pending();
201       case ValueState::kReadyClosed:
202         value_state_ = ValueState::kWaitingForAckAndClosed;
203         return std::move(value_);
204       case ValueState::kReady:
205         value_state_ = ValueState::kWaitingForAck;
206         return std::move(value_);
207       case ValueState::kClosed:
208       case ValueState::kCancelled:
209         return absl::nullopt;
210     }
211     GPR_UNREACHABLE_CODE(return absl::nullopt);
212   }
213 
214   // Check if the pipe is closed for sending (if there is a value still queued
215   // but the pipe is closed, reports closed).
PollClosedForSender()216   Poll<bool> PollClosedForSender() {
217     GRPC_TRACE_LOG(promise_primitives, INFO)
218         << DebugOpString("PollClosedForSender");
219     DCHECK_NE(refs_, 0);
220     switch (value_state_) {
221       case ValueState::kEmpty:
222       case ValueState::kAcked:
223       case ValueState::kReady:
224       case ValueState::kWaitingForAck:
225         return on_closed_.pending();
226       case ValueState::kWaitingForAckAndClosed:
227       case ValueState::kReadyClosed:
228       case ValueState::kClosed:
229         return false;
230       case ValueState::kCancelled:
231         return true;
232     }
233     GPR_UNREACHABLE_CODE(return true);
234   }
235 
236   // Check if the pipe is closed for receiving (if there is a value still queued
237   // but the pipe is closed, reports open).
PollClosedForReceiver()238   Poll<bool> PollClosedForReceiver() {
239     GRPC_TRACE_LOG(promise_primitives, INFO)
240         << DebugOpString("PollClosedForReceiver");
241     DCHECK_NE(refs_, 0);
242     switch (value_state_) {
243       case ValueState::kEmpty:
244       case ValueState::kAcked:
245       case ValueState::kReady:
246       case ValueState::kReadyClosed:
247       case ValueState::kWaitingForAck:
248       case ValueState::kWaitingForAckAndClosed:
249         return on_closed_.pending();
250       case ValueState::kClosed:
251         return false;
252       case ValueState::kCancelled:
253         return true;
254     }
255     GPR_UNREACHABLE_CODE(return true);
256   }
257 
PollEmpty()258   Poll<Empty> PollEmpty() {
259     GRPC_TRACE_LOG(promise_primitives, INFO) << DebugOpString("PollEmpty");
260     DCHECK_NE(refs_, 0);
261     switch (value_state_) {
262       case ValueState::kReady:
263       case ValueState::kReadyClosed:
264         return on_empty_.pending();
265       case ValueState::kWaitingForAck:
266       case ValueState::kWaitingForAckAndClosed:
267       case ValueState::kAcked:
268       case ValueState::kEmpty:
269       case ValueState::kClosed:
270       case ValueState::kCancelled:
271         return Empty{};
272     }
273     GPR_UNREACHABLE_CODE(return Empty{});
274   }
275 
AckNext()276   void AckNext() {
277     GRPC_TRACE_LOG(promise_primitives, INFO) << DebugOpString("AckNext");
278     switch (value_state_) {
279       case ValueState::kReady:
280       case ValueState::kWaitingForAck:
281         value_state_ = ValueState::kAcked;
282         on_empty_.Wake();
283         break;
284       case ValueState::kReadyClosed:
285       case ValueState::kWaitingForAckAndClosed:
286         this->ResetInterceptorList();
287         value_state_ = ValueState::kClosed;
288         on_closed_.Wake();
289         on_empty_.Wake();
290         on_full_.Wake();
291         break;
292       case ValueState::kClosed:
293       case ValueState::kCancelled:
294         break;
295       case ValueState::kEmpty:
296       case ValueState::kAcked:
297         abort();
298     }
299   }
300 
MarkClosed()301   void MarkClosed() {
302     GRPC_TRACE_LOG(promise_primitives, INFO) << DebugOpString("MarkClosed");
303     switch (value_state_) {
304       case ValueState::kEmpty:
305       case ValueState::kAcked:
306         this->ResetInterceptorList();
307         value_state_ = ValueState::kClosed;
308         on_empty_.Wake();
309         on_full_.Wake();
310         on_closed_.Wake();
311         break;
312       case ValueState::kReady:
313         value_state_ = ValueState::kReadyClosed;
314         on_closed_.Wake();
315         break;
316       case ValueState::kWaitingForAck:
317         value_state_ = ValueState::kWaitingForAckAndClosed;
318         on_closed_.Wake();
319         break;
320       case ValueState::kReadyClosed:
321       case ValueState::kClosed:
322       case ValueState::kCancelled:
323       case ValueState::kWaitingForAckAndClosed:
324         break;
325     }
326   }
327 
MarkCancelled()328   void MarkCancelled() {
329     GRPC_TRACE_LOG(promise_primitives, INFO) << DebugOpString("MarkCancelled");
330     switch (value_state_) {
331       case ValueState::kEmpty:
332       case ValueState::kAcked:
333       case ValueState::kReady:
334       case ValueState::kReadyClosed:
335       case ValueState::kWaitingForAck:
336       case ValueState::kWaitingForAckAndClosed:
337         this->ResetInterceptorList();
338         value_state_ = ValueState::kCancelled;
339         on_empty_.Wake();
340         on_full_.Wake();
341         on_closed_.Wake();
342         break;
343       case ValueState::kClosed:
344       case ValueState::kCancelled:
345         break;
346     }
347   }
348 
cancelled()349   bool cancelled() { return value_state_ == ValueState::kCancelled; }
350 
value()351   T& value() { return value_; }
value()352   const T& value() const { return value_; }
353 
DebugTag()354   std::string DebugTag() {
355     if (auto* activity = GetContext<Activity>()) {
356       return absl::StrCat(activity->DebugTag(), " PIPE[0x", absl::Hex(this),
357                           "]: ");
358     } else {
359       return absl::StrCat("PIPE[0x", reinterpret_cast<uintptr_t>(this), "]: ");
360     }
361   }
362 
363  private:
364   // State of value_.
365   enum class ValueState : uint8_t {
366     // No value is set, it's possible to send.
367     kEmpty,
368     // Value has been pushed but not acked, it's possible to receive.
369     kReady,
370     // Value has been read and not acked, both send/receive blocked until ack.
371     kWaitingForAck,
372     // Value has been received and acked, we can unblock senders and transition
373     // to empty.
374     kAcked,
375     // Pipe is closed successfully, no more values can be sent
376     kClosed,
377     // Pipe is closed successfully, no more values can be sent
378     // (but one value is queued and ready to be received)
379     kReadyClosed,
380     // Pipe is closed successfully, no more values can be sent
381     // (but one value is queued and waiting to be acked)
382     kWaitingForAckAndClosed,
383     // Pipe is closed unsuccessfully, no more values can be sent
384     kCancelled,
385   };
386 
DebugOpString(std::string op)387   std::string DebugOpString(std::string op) {
388     return absl::StrCat(DebugTag(), op, " refs=", refs_,
389                         " value_state=", ValueStateName(value_state_),
390                         " on_empty=", on_empty_.DebugString().c_str(),
391                         " on_full=", on_full_.DebugString().c_str(),
392                         " on_closed=", on_closed_.DebugString().c_str());
393   }
394 
ValueStateName(ValueState state)395   static const char* ValueStateName(ValueState state) {
396     switch (state) {
397       case ValueState::kEmpty:
398         return "Empty";
399       case ValueState::kReady:
400         return "Ready";
401       case ValueState::kAcked:
402         return "Acked";
403       case ValueState::kClosed:
404         return "Closed";
405       case ValueState::kReadyClosed:
406         return "ReadyClosed";
407       case ValueState::kWaitingForAck:
408         return "WaitingForAck";
409       case ValueState::kWaitingForAckAndClosed:
410         return "WaitingForAckAndClosed";
411       case ValueState::kCancelled:
412         return "Cancelled";
413     }
414     GPR_UNREACHABLE_CODE(return "unknown");
415   }
416 
417   T value_;
418   // Number of refs
419   uint8_t refs_;
420   // Current state of the value.
421   ValueState value_state_;
422   IntraActivityWaiter on_empty_;
423   IntraActivityWaiter on_full_;
424   IntraActivityWaiter on_closed_;
425 
426   // Make failure to destruct show up in ASAN builds.
427 #ifndef NDEBUG
428   std::unique_ptr<int> asan_canary_ = std::make_unique<int>(0);
429 #endif
430 };
431 
432 }  // namespace pipe_detail
433 
434 // Send end of a Pipe.
435 template <typename T>
436 class PipeSender {
437  public:
438   using PushType = pipe_detail::Push<T>;
439 
440   PipeSender(const PipeSender&) = delete;
441   PipeSender& operator=(const PipeSender&) = delete;
442   PipeSender(PipeSender&& other) noexcept = default;
443   PipeSender& operator=(PipeSender&& other) noexcept = default;
444 
~PipeSender()445   ~PipeSender() {
446     if (center_ != nullptr) center_->MarkClosed();
447   }
448 
Close()449   void Close() {
450     if (center_ != nullptr) {
451       center_->MarkClosed();
452       center_.reset();
453     }
454   }
455 
CloseWithError()456   void CloseWithError() {
457     if (center_ != nullptr) {
458       center_->MarkCancelled();
459       center_.reset();
460     }
461   }
462 
Swap(PipeSender<T> * other)463   void Swap(PipeSender<T>* other) { std::swap(center_, other->center_); }
464 
465   // Send a single message along the pipe.
466   // Returns a promise that will resolve to a bool - true if the message was
467   // sent, false if it could never be sent. Blocks the promise until the
468   // receiver is either closed or able to receive another message.
469   PushType Push(T value);
470 
471   // Return a promise that resolves when the receiver is closed.
472   // The resolved value is a bool - true if the pipe was cancelled, false if it
473   // was closed successfully.
474   // Checks closed from the senders perspective: that is, if there is a value in
475   // the pipe but the pipe is closed, reports closed.
AwaitClosed()476   auto AwaitClosed() {
477     return [center = center_]() { return center->PollClosedForSender(); };
478   }
479 
480   // Interject PromiseFactory f into the pipeline.
481   // f will be called with the current value traversing the pipe, and should
482   // return a value to replace it with.
483   // Interjects at the Push end of the pipe.
484   template <typename Fn>
485   void InterceptAndMap(Fn f, DebugLocation from = {}) {
486     center_->PrependMap(std::move(f), from);
487   }
488 
489   // Per above, but calls cleanup_fn when the pipe is closed.
490   template <typename Fn, typename OnHalfClose>
491   void InterceptAndMap(Fn f, OnHalfClose cleanup_fn, DebugLocation from = {}) {
492     center_->PrependMapWithCleanup(std::move(f), std::move(cleanup_fn), from);
493   }
494 
495  private:
496   friend struct Pipe<T>;
497   explicit PipeSender(pipe_detail::Center<T>* center) : center_(center) {}
498   RefCountedPtr<pipe_detail::Center<T>> center_;
499 
500   // Make failure to destruct show up in ASAN builds.
501 #ifndef NDEBUG
502   std::unique_ptr<int> asan_canary_ = std::make_unique<int>(0);
503 #endif
504 };
505 
506 template <typename T>
507 class PipeReceiver;
508 
509 namespace pipe_detail {
510 
511 // Implementation of PipeReceiver::Next promise.
512 template <typename T>
513 class Next {
514  public:
515   Next(const Next&) = delete;
516   Next& operator=(const Next&) = delete;
517   Next(Next&& other) noexcept = default;
518   Next& operator=(Next&& other) noexcept = default;
519 
520   Poll<absl::optional<T>> operator()() {
521     return center_ == nullptr ? absl::nullopt : center_->Next();
522   }
523 
524  private:
525   friend class PipeReceiver<T>;
526   explicit Next(RefCountedPtr<Center<T>> center) : center_(std::move(center)) {}
527 
528   RefCountedPtr<Center<T>> center_;
529 };
530 
531 }  // namespace pipe_detail
532 
533 // Receive end of a Pipe.
534 template <typename T>
535 class PipeReceiver {
536  public:
537   PipeReceiver(const PipeReceiver&) = delete;
538   PipeReceiver& operator=(const PipeReceiver&) = delete;
539   PipeReceiver(PipeReceiver&& other) noexcept = default;
540   PipeReceiver& operator=(PipeReceiver&& other) noexcept = default;
541   ~PipeReceiver() {
542     if (center_ != nullptr) center_->MarkCancelled();
543   }
544 
545   void Swap(PipeReceiver<T>* other) { std::swap(center_, other->center_); }
546 
547   // Receive a single message from the pipe.
548   // Returns a promise that will resolve to an optional<T> - with a value if a
549   // message was received, or no value if the other end of the pipe was closed.
550   // Blocks the promise until the receiver is either closed or a message is
551   // available.
552   auto Next() {
553     return Seq(pipe_detail::Next<T>(center_), [center = center_](
554                                                   absl::optional<T> value) {
555       bool open = value.has_value();
556       bool cancelled = center == nullptr ? true : center->cancelled();
557       return If(
558           open,
559           [center = std::move(center), value = std::move(value)]() mutable {
560             auto run = center->Run(std::move(value));
561             return Map(std::move(run), [center = std::move(center)](
562                                            absl::optional<T> value) mutable {
563               if (value.has_value()) {
564                 center->value() = std::move(*value);
565                 return NextResult<T>(std::move(center));
566               } else {
567                 center->MarkCancelled();
568                 return NextResult<T>(true);
569               }
570             });
571           },
572           [cancelled]() { return NextResult<T>(cancelled); });
573     });
574   }
575 
576   // Return a promise that resolves when the receiver is closed.
577   // The resolved value is a bool - true if the pipe was cancelled, false if it
578   // was closed successfully.
579   // Checks closed from the receivers perspective: that is, if there is a value
580   // in the pipe but the pipe is closed, reports open until that value is read.
581   auto AwaitClosed() {
582     return [center = center_]() -> Poll<bool> {
583       if (center == nullptr) return false;
584       return center->PollClosedForReceiver();
585     };
586   }
587 
588   auto AwaitEmpty() {
589     return [center = center_]() { return center->PollEmpty(); };
590   }
591 
592   void CloseWithError() {
593     if (center_ != nullptr) {
594       center_->MarkCancelled();
595       center_.reset();
596     }
597   }
598 
599   // Interject PromiseFactory f into the pipeline.
600   // f will be called with the current value traversing the pipe, and should
601   // return a value to replace it with.
602   // Interjects at the Next end of the pipe.
603   template <typename Fn>
604   void InterceptAndMap(Fn f, DebugLocation from = {}) {
605     center_->AppendMap(std::move(f), from);
606   }
607 
608   // Per above, but calls cleanup_fn when the pipe is closed.
609   template <typename Fn, typename OnHalfClose>
610   void InterceptAndMapWithHalfClose(Fn f, OnHalfClose cleanup_fn,
611                                     DebugLocation from = {}) {
612     center_->AppendMapWithCleanup(std::move(f), std::move(cleanup_fn), from);
613   }
614 
615  private:
616   friend struct Pipe<T>;
617   explicit PipeReceiver(pipe_detail::Center<T>* center) : center_(center) {}
618   RefCountedPtr<pipe_detail::Center<T>> center_;
619 };
620 
621 namespace pipe_detail {
622 
623 // Implementation of PipeSender::Push promise.
624 template <typename T>
625 class Push {
626  public:
627   Push(const Push&) = delete;
628 
629   Push& operator=(const Push&) = delete;
630   Push(Push&& other) noexcept = default;
631   Push& operator=(Push&& other) noexcept = default;
632 
633   Poll<bool> operator()() {
634     if (center_ == nullptr) {
635       GRPC_TRACE_VLOG(promise_primitives, 2)
636           << GetContext<Activity>()->DebugTag()
637           << " Pipe push has a null center";
638       return false;
639     }
640     if (auto* p = absl::get_if<T>(&state_)) {
641       auto r = center_->Push(p);
642       if (auto* ok = r.value_if_ready()) {
643         state_.template emplace<AwaitingAck>();
644         if (!*ok) return false;
645       } else {
646         return Pending{};
647       }
648     }
649     DCHECK(absl::holds_alternative<AwaitingAck>(state_));
650     return center_->PollAck();
651   }
652 
653  private:
654   struct AwaitingAck {};
655 
656   friend class PipeSender<T>;
657   explicit Push(RefCountedPtr<pipe_detail::Center<T>> center, T push)
658       : center_(std::move(center)), state_(std::move(push)) {}
659 
660   RefCountedPtr<Center<T>> center_;
661   absl::variant<T, AwaitingAck> state_;
662 };
663 
664 }  // namespace pipe_detail
665 
666 template <typename T>
667 pipe_detail::Push<T> PipeSender<T>::Push(T value) {
668   return pipe_detail::Push<T>(center_ == nullptr ? nullptr : center_->Ref(),
669                               std::move(value));
670 }
671 
672 template <typename T>
673 using PipeReceiverNextType = decltype(std::declval<PipeReceiver<T>>().Next());
674 
675 template <typename T>
676 bool NextResult<T>::has_value() const {
677   return center_ != nullptr;
678 }
679 
680 template <typename T>
681 T& NextResult<T>::operator*() {
682   return center_->value();
683 }
684 
685 template <typename T>
686 const T& NextResult<T>::operator*() const {
687   return center_->value();
688 }
689 
690 template <typename T>
691 NextResult<T>::~NextResult() {
692   if (center_ != nullptr) center_->AckNext();
693 }
694 
695 template <typename T>
696 void NextResult<T>::reset() {
697   if (center_ != nullptr) {
698     center_->AckNext();
699     center_.reset();
700   }
701 }
702 
703 // A Pipe is an intra-Activity communications channel that transmits T's from
704 // one end to the other.
705 // It is only safe to use a Pipe within the context of a single Activity.
706 // No synchronization is performed internally.
707 // The primary Pipe data structure is allocated from an arena, so the activity
708 // must have an arena as part of its context.
709 // By performing that allocation we can ensure stable pointer to shared data
710 // allowing PipeSender/PipeReceiver/Push/Next to be relatively simple in their
711 // implementation.
712 // This type has been optimized with the expectation that there are relatively
713 // few pipes per activity. If this assumption does not hold then a design
714 // allowing inline filtering of pipe contents (instead of connecting pipes with
715 // polling code) would likely be more appropriate.
716 template <typename T>
717 struct Pipe {
718   Pipe() : Pipe(GetContext<Arena>()) {}
719   explicit Pipe(Arena* arena) : Pipe(arena->New<pipe_detail::Center<T>>()) {}
720   Pipe(const Pipe&) = delete;
721   Pipe& operator=(const Pipe&) = delete;
722   Pipe(Pipe&&) noexcept = default;
723   Pipe& operator=(Pipe&&) noexcept = default;
724 
725   PipeSender<T> sender;
726   PipeReceiver<T> receiver;
727 
728  private:
729   explicit Pipe(pipe_detail::Center<T>* center)
730       : sender(center), receiver(center) {}
731 };
732 
733 }  // namespace grpc_core
734 
735 #endif  // GRPC_SRC_CORE_LIB_PROMISE_PIPE_H
736