1 // Copyright 2021 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_PIPE_H 16 #define GRPC_SRC_CORE_LIB_PROMISE_PIPE_H 17 18 #include <grpc/support/port_platform.h> 19 #include <stdint.h> 20 #include <stdlib.h> 21 22 #include <memory> 23 #include <string> 24 #include <utility> 25 26 #include "absl/log/check.h" 27 #include "absl/log/log.h" 28 #include "absl/strings/str_cat.h" 29 #include "absl/types/optional.h" 30 #include "absl/types/variant.h" 31 #include "src/core/lib/promise/activity.h" 32 #include "src/core/lib/promise/context.h" 33 #include "src/core/lib/promise/if.h" 34 #include "src/core/lib/promise/interceptor_list.h" 35 #include "src/core/lib/promise/map.h" 36 #include "src/core/lib/promise/poll.h" 37 #include "src/core/lib/promise/seq.h" 38 #include "src/core/lib/resource_quota/arena.h" 39 #include "src/core/util/debug_location.h" 40 #include "src/core/util/ref_counted_ptr.h" 41 42 namespace grpc_core { 43 44 namespace pipe_detail { 45 template <typename T> 46 class Center; 47 } 48 49 template <typename T> 50 struct Pipe; 51 52 // Result of Pipe::Next - represents a received value. 53 // If has_value() is false, the pipe was closed by the time we polled for the 54 // next value. No value was received, nor will there ever be. 55 // This type is movable but not copyable. 56 // Once the final move is destroyed the pipe will ack the read and unblock the 57 // send. 58 template <typename T> 59 class NextResult final { 60 public: NextResult()61 NextResult() : center_(nullptr) {} NextResult(RefCountedPtr<pipe_detail::Center<T>> center)62 explicit NextResult(RefCountedPtr<pipe_detail::Center<T>> center) 63 : center_(std::move(center)) { 64 CHECK(center_ != nullptr); 65 } NextResult(bool cancelled)66 explicit NextResult(bool cancelled) 67 : center_(nullptr), cancelled_(cancelled) {} 68 ~NextResult(); 69 NextResult(const NextResult&) = delete; 70 NextResult& operator=(const NextResult&) = delete; 71 NextResult(NextResult&& other) noexcept = default; 72 NextResult& operator=(NextResult&& other) noexcept = default; 73 74 using value_type = T; 75 76 void reset(); 77 bool has_value() const; 78 // Only valid if has_value() value()79 const T& value() const { 80 CHECK(has_value()); 81 return **this; 82 } value()83 T& value() { 84 CHECK(has_value()); 85 return **this; 86 } 87 const T& operator*() const; 88 T& operator*(); 89 // Only valid if !has_value() cancelled()90 bool cancelled() const { return cancelled_; } 91 92 private: 93 RefCountedPtr<pipe_detail::Center<T>> center_; 94 bool cancelled_; 95 }; 96 97 namespace pipe_detail { 98 99 template <typename T> 100 class Push; 101 template <typename T> 102 class Next; 103 104 // Center sits between a sender and a receiver to provide a one-deep buffer of 105 // Ts 106 template <typename T> 107 class Center : public InterceptorList<T> { 108 public: 109 // Initialize with one send ref (held by PipeSender) and one recv ref (held by 110 // PipeReceiver) Center()111 Center() { 112 refs_ = 2; 113 value_state_ = ValueState::kEmpty; 114 } 115 116 // Add one ref to this object, and return this. IncrementRefCount()117 void IncrementRefCount() { 118 GRPC_TRACE_VLOG(promise_primitives, 2) 119 << DebugOpString("IncrementRefCount"); 120 refs_++; 121 DCHECK_NE(refs_, 0); 122 } 123 Ref()124 RefCountedPtr<Center> Ref() { 125 IncrementRefCount(); 126 return RefCountedPtr<Center>(this); 127 } 128 129 // Drop a ref 130 // If no refs remain, destroy this object Unref()131 void Unref() { 132 GRPC_TRACE_VLOG(promise_primitives, 2) << DebugOpString("Unref"); 133 DCHECK_GT(refs_, 0); 134 refs_--; 135 if (0 == refs_) { 136 this->~Center(); 137 } 138 } 139 140 // Try to push *value into the pipe. 141 // Return Pending if there is no space. 142 // Return true if the value was pushed. 143 // Return false if the recv end is closed. Push(T * value)144 Poll<bool> Push(T* value) { 145 GRPC_TRACE_LOG(promise_primitives, INFO) << DebugOpString("Push"); 146 DCHECK_NE(refs_, 0); 147 switch (value_state_) { 148 case ValueState::kClosed: 149 case ValueState::kReadyClosed: 150 case ValueState::kCancelled: 151 case ValueState::kWaitingForAckAndClosed: 152 return false; 153 case ValueState::kReady: 154 case ValueState::kAcked: 155 case ValueState::kWaitingForAck: 156 return on_empty_.pending(); 157 case ValueState::kEmpty: 158 value_state_ = ValueState::kReady; 159 value_ = std::move(*value); 160 on_full_.Wake(); 161 return true; 162 } 163 GPR_UNREACHABLE_CODE(return false); 164 } 165 PollAck()166 Poll<bool> PollAck() { 167 GRPC_TRACE_LOG(promise_primitives, INFO) << DebugOpString("PollAck"); 168 DCHECK_NE(refs_, 0); 169 switch (value_state_) { 170 case ValueState::kClosed: 171 return true; 172 case ValueState::kCancelled: 173 return false; 174 case ValueState::kReady: 175 case ValueState::kReadyClosed: 176 case ValueState::kEmpty: 177 case ValueState::kWaitingForAck: 178 case ValueState::kWaitingForAckAndClosed: 179 return on_empty_.pending(); 180 case ValueState::kAcked: 181 value_state_ = ValueState::kEmpty; 182 on_empty_.Wake(); 183 return true; 184 } 185 return true; 186 } 187 188 // Try to receive a value from the pipe. 189 // Return Pending if there is no value. 190 // Return the value if one was retrieved. 191 // Return nullopt if the send end is closed and no value had been pushed. Next()192 Poll<absl::optional<T>> Next() { 193 GRPC_TRACE_LOG(promise_primitives, INFO) << DebugOpString("Next"); 194 DCHECK_NE(refs_, 0); 195 switch (value_state_) { 196 case ValueState::kEmpty: 197 case ValueState::kAcked: 198 case ValueState::kWaitingForAck: 199 case ValueState::kWaitingForAckAndClosed: 200 return on_full_.pending(); 201 case ValueState::kReadyClosed: 202 value_state_ = ValueState::kWaitingForAckAndClosed; 203 return std::move(value_); 204 case ValueState::kReady: 205 value_state_ = ValueState::kWaitingForAck; 206 return std::move(value_); 207 case ValueState::kClosed: 208 case ValueState::kCancelled: 209 return absl::nullopt; 210 } 211 GPR_UNREACHABLE_CODE(return absl::nullopt); 212 } 213 214 // Check if the pipe is closed for sending (if there is a value still queued 215 // but the pipe is closed, reports closed). PollClosedForSender()216 Poll<bool> PollClosedForSender() { 217 GRPC_TRACE_LOG(promise_primitives, INFO) 218 << DebugOpString("PollClosedForSender"); 219 DCHECK_NE(refs_, 0); 220 switch (value_state_) { 221 case ValueState::kEmpty: 222 case ValueState::kAcked: 223 case ValueState::kReady: 224 case ValueState::kWaitingForAck: 225 return on_closed_.pending(); 226 case ValueState::kWaitingForAckAndClosed: 227 case ValueState::kReadyClosed: 228 case ValueState::kClosed: 229 return false; 230 case ValueState::kCancelled: 231 return true; 232 } 233 GPR_UNREACHABLE_CODE(return true); 234 } 235 236 // Check if the pipe is closed for receiving (if there is a value still queued 237 // but the pipe is closed, reports open). PollClosedForReceiver()238 Poll<bool> PollClosedForReceiver() { 239 GRPC_TRACE_LOG(promise_primitives, INFO) 240 << DebugOpString("PollClosedForReceiver"); 241 DCHECK_NE(refs_, 0); 242 switch (value_state_) { 243 case ValueState::kEmpty: 244 case ValueState::kAcked: 245 case ValueState::kReady: 246 case ValueState::kReadyClosed: 247 case ValueState::kWaitingForAck: 248 case ValueState::kWaitingForAckAndClosed: 249 return on_closed_.pending(); 250 case ValueState::kClosed: 251 return false; 252 case ValueState::kCancelled: 253 return true; 254 } 255 GPR_UNREACHABLE_CODE(return true); 256 } 257 PollEmpty()258 Poll<Empty> PollEmpty() { 259 GRPC_TRACE_LOG(promise_primitives, INFO) << DebugOpString("PollEmpty"); 260 DCHECK_NE(refs_, 0); 261 switch (value_state_) { 262 case ValueState::kReady: 263 case ValueState::kReadyClosed: 264 return on_empty_.pending(); 265 case ValueState::kWaitingForAck: 266 case ValueState::kWaitingForAckAndClosed: 267 case ValueState::kAcked: 268 case ValueState::kEmpty: 269 case ValueState::kClosed: 270 case ValueState::kCancelled: 271 return Empty{}; 272 } 273 GPR_UNREACHABLE_CODE(return Empty{}); 274 } 275 AckNext()276 void AckNext() { 277 GRPC_TRACE_LOG(promise_primitives, INFO) << DebugOpString("AckNext"); 278 switch (value_state_) { 279 case ValueState::kReady: 280 case ValueState::kWaitingForAck: 281 value_state_ = ValueState::kAcked; 282 on_empty_.Wake(); 283 break; 284 case ValueState::kReadyClosed: 285 case ValueState::kWaitingForAckAndClosed: 286 this->ResetInterceptorList(); 287 value_state_ = ValueState::kClosed; 288 on_closed_.Wake(); 289 on_empty_.Wake(); 290 on_full_.Wake(); 291 break; 292 case ValueState::kClosed: 293 case ValueState::kCancelled: 294 break; 295 case ValueState::kEmpty: 296 case ValueState::kAcked: 297 abort(); 298 } 299 } 300 MarkClosed()301 void MarkClosed() { 302 GRPC_TRACE_LOG(promise_primitives, INFO) << DebugOpString("MarkClosed"); 303 switch (value_state_) { 304 case ValueState::kEmpty: 305 case ValueState::kAcked: 306 this->ResetInterceptorList(); 307 value_state_ = ValueState::kClosed; 308 on_empty_.Wake(); 309 on_full_.Wake(); 310 on_closed_.Wake(); 311 break; 312 case ValueState::kReady: 313 value_state_ = ValueState::kReadyClosed; 314 on_closed_.Wake(); 315 break; 316 case ValueState::kWaitingForAck: 317 value_state_ = ValueState::kWaitingForAckAndClosed; 318 on_closed_.Wake(); 319 break; 320 case ValueState::kReadyClosed: 321 case ValueState::kClosed: 322 case ValueState::kCancelled: 323 case ValueState::kWaitingForAckAndClosed: 324 break; 325 } 326 } 327 MarkCancelled()328 void MarkCancelled() { 329 GRPC_TRACE_LOG(promise_primitives, INFO) << DebugOpString("MarkCancelled"); 330 switch (value_state_) { 331 case ValueState::kEmpty: 332 case ValueState::kAcked: 333 case ValueState::kReady: 334 case ValueState::kReadyClosed: 335 case ValueState::kWaitingForAck: 336 case ValueState::kWaitingForAckAndClosed: 337 this->ResetInterceptorList(); 338 value_state_ = ValueState::kCancelled; 339 on_empty_.Wake(); 340 on_full_.Wake(); 341 on_closed_.Wake(); 342 break; 343 case ValueState::kClosed: 344 case ValueState::kCancelled: 345 break; 346 } 347 } 348 cancelled()349 bool cancelled() { return value_state_ == ValueState::kCancelled; } 350 value()351 T& value() { return value_; } value()352 const T& value() const { return value_; } 353 DebugTag()354 std::string DebugTag() { 355 if (auto* activity = GetContext<Activity>()) { 356 return absl::StrCat(activity->DebugTag(), " PIPE[0x", absl::Hex(this), 357 "]: "); 358 } else { 359 return absl::StrCat("PIPE[0x", reinterpret_cast<uintptr_t>(this), "]: "); 360 } 361 } 362 363 private: 364 // State of value_. 365 enum class ValueState : uint8_t { 366 // No value is set, it's possible to send. 367 kEmpty, 368 // Value has been pushed but not acked, it's possible to receive. 369 kReady, 370 // Value has been read and not acked, both send/receive blocked until ack. 371 kWaitingForAck, 372 // Value has been received and acked, we can unblock senders and transition 373 // to empty. 374 kAcked, 375 // Pipe is closed successfully, no more values can be sent 376 kClosed, 377 // Pipe is closed successfully, no more values can be sent 378 // (but one value is queued and ready to be received) 379 kReadyClosed, 380 // Pipe is closed successfully, no more values can be sent 381 // (but one value is queued and waiting to be acked) 382 kWaitingForAckAndClosed, 383 // Pipe is closed unsuccessfully, no more values can be sent 384 kCancelled, 385 }; 386 DebugOpString(std::string op)387 std::string DebugOpString(std::string op) { 388 return absl::StrCat(DebugTag(), op, " refs=", refs_, 389 " value_state=", ValueStateName(value_state_), 390 " on_empty=", on_empty_.DebugString().c_str(), 391 " on_full=", on_full_.DebugString().c_str(), 392 " on_closed=", on_closed_.DebugString().c_str()); 393 } 394 ValueStateName(ValueState state)395 static const char* ValueStateName(ValueState state) { 396 switch (state) { 397 case ValueState::kEmpty: 398 return "Empty"; 399 case ValueState::kReady: 400 return "Ready"; 401 case ValueState::kAcked: 402 return "Acked"; 403 case ValueState::kClosed: 404 return "Closed"; 405 case ValueState::kReadyClosed: 406 return "ReadyClosed"; 407 case ValueState::kWaitingForAck: 408 return "WaitingForAck"; 409 case ValueState::kWaitingForAckAndClosed: 410 return "WaitingForAckAndClosed"; 411 case ValueState::kCancelled: 412 return "Cancelled"; 413 } 414 GPR_UNREACHABLE_CODE(return "unknown"); 415 } 416 417 T value_; 418 // Number of refs 419 uint8_t refs_; 420 // Current state of the value. 421 ValueState value_state_; 422 IntraActivityWaiter on_empty_; 423 IntraActivityWaiter on_full_; 424 IntraActivityWaiter on_closed_; 425 426 // Make failure to destruct show up in ASAN builds. 427 #ifndef NDEBUG 428 std::unique_ptr<int> asan_canary_ = std::make_unique<int>(0); 429 #endif 430 }; 431 432 } // namespace pipe_detail 433 434 // Send end of a Pipe. 435 template <typename T> 436 class PipeSender { 437 public: 438 using PushType = pipe_detail::Push<T>; 439 440 PipeSender(const PipeSender&) = delete; 441 PipeSender& operator=(const PipeSender&) = delete; 442 PipeSender(PipeSender&& other) noexcept = default; 443 PipeSender& operator=(PipeSender&& other) noexcept = default; 444 ~PipeSender()445 ~PipeSender() { 446 if (center_ != nullptr) center_->MarkClosed(); 447 } 448 Close()449 void Close() { 450 if (center_ != nullptr) { 451 center_->MarkClosed(); 452 center_.reset(); 453 } 454 } 455 CloseWithError()456 void CloseWithError() { 457 if (center_ != nullptr) { 458 center_->MarkCancelled(); 459 center_.reset(); 460 } 461 } 462 Swap(PipeSender<T> * other)463 void Swap(PipeSender<T>* other) { std::swap(center_, other->center_); } 464 465 // Send a single message along the pipe. 466 // Returns a promise that will resolve to a bool - true if the message was 467 // sent, false if it could never be sent. Blocks the promise until the 468 // receiver is either closed or able to receive another message. 469 PushType Push(T value); 470 471 // Return a promise that resolves when the receiver is closed. 472 // The resolved value is a bool - true if the pipe was cancelled, false if it 473 // was closed successfully. 474 // Checks closed from the senders perspective: that is, if there is a value in 475 // the pipe but the pipe is closed, reports closed. AwaitClosed()476 auto AwaitClosed() { 477 return [center = center_]() { return center->PollClosedForSender(); }; 478 } 479 480 // Interject PromiseFactory f into the pipeline. 481 // f will be called with the current value traversing the pipe, and should 482 // return a value to replace it with. 483 // Interjects at the Push end of the pipe. 484 template <typename Fn> 485 void InterceptAndMap(Fn f, DebugLocation from = {}) { 486 center_->PrependMap(std::move(f), from); 487 } 488 489 // Per above, but calls cleanup_fn when the pipe is closed. 490 template <typename Fn, typename OnHalfClose> 491 void InterceptAndMap(Fn f, OnHalfClose cleanup_fn, DebugLocation from = {}) { 492 center_->PrependMapWithCleanup(std::move(f), std::move(cleanup_fn), from); 493 } 494 495 private: 496 friend struct Pipe<T>; 497 explicit PipeSender(pipe_detail::Center<T>* center) : center_(center) {} 498 RefCountedPtr<pipe_detail::Center<T>> center_; 499 500 // Make failure to destruct show up in ASAN builds. 501 #ifndef NDEBUG 502 std::unique_ptr<int> asan_canary_ = std::make_unique<int>(0); 503 #endif 504 }; 505 506 template <typename T> 507 class PipeReceiver; 508 509 namespace pipe_detail { 510 511 // Implementation of PipeReceiver::Next promise. 512 template <typename T> 513 class Next { 514 public: 515 Next(const Next&) = delete; 516 Next& operator=(const Next&) = delete; 517 Next(Next&& other) noexcept = default; 518 Next& operator=(Next&& other) noexcept = default; 519 520 Poll<absl::optional<T>> operator()() { 521 return center_ == nullptr ? absl::nullopt : center_->Next(); 522 } 523 524 private: 525 friend class PipeReceiver<T>; 526 explicit Next(RefCountedPtr<Center<T>> center) : center_(std::move(center)) {} 527 528 RefCountedPtr<Center<T>> center_; 529 }; 530 531 } // namespace pipe_detail 532 533 // Receive end of a Pipe. 534 template <typename T> 535 class PipeReceiver { 536 public: 537 PipeReceiver(const PipeReceiver&) = delete; 538 PipeReceiver& operator=(const PipeReceiver&) = delete; 539 PipeReceiver(PipeReceiver&& other) noexcept = default; 540 PipeReceiver& operator=(PipeReceiver&& other) noexcept = default; 541 ~PipeReceiver() { 542 if (center_ != nullptr) center_->MarkCancelled(); 543 } 544 545 void Swap(PipeReceiver<T>* other) { std::swap(center_, other->center_); } 546 547 // Receive a single message from the pipe. 548 // Returns a promise that will resolve to an optional<T> - with a value if a 549 // message was received, or no value if the other end of the pipe was closed. 550 // Blocks the promise until the receiver is either closed or a message is 551 // available. 552 auto Next() { 553 return Seq(pipe_detail::Next<T>(center_), [center = center_]( 554 absl::optional<T> value) { 555 bool open = value.has_value(); 556 bool cancelled = center == nullptr ? true : center->cancelled(); 557 return If( 558 open, 559 [center = std::move(center), value = std::move(value)]() mutable { 560 auto run = center->Run(std::move(value)); 561 return Map(std::move(run), [center = std::move(center)]( 562 absl::optional<T> value) mutable { 563 if (value.has_value()) { 564 center->value() = std::move(*value); 565 return NextResult<T>(std::move(center)); 566 } else { 567 center->MarkCancelled(); 568 return NextResult<T>(true); 569 } 570 }); 571 }, 572 [cancelled]() { return NextResult<T>(cancelled); }); 573 }); 574 } 575 576 // Return a promise that resolves when the receiver is closed. 577 // The resolved value is a bool - true if the pipe was cancelled, false if it 578 // was closed successfully. 579 // Checks closed from the receivers perspective: that is, if there is a value 580 // in the pipe but the pipe is closed, reports open until that value is read. 581 auto AwaitClosed() { 582 return [center = center_]() -> Poll<bool> { 583 if (center == nullptr) return false; 584 return center->PollClosedForReceiver(); 585 }; 586 } 587 588 auto AwaitEmpty() { 589 return [center = center_]() { return center->PollEmpty(); }; 590 } 591 592 void CloseWithError() { 593 if (center_ != nullptr) { 594 center_->MarkCancelled(); 595 center_.reset(); 596 } 597 } 598 599 // Interject PromiseFactory f into the pipeline. 600 // f will be called with the current value traversing the pipe, and should 601 // return a value to replace it with. 602 // Interjects at the Next end of the pipe. 603 template <typename Fn> 604 void InterceptAndMap(Fn f, DebugLocation from = {}) { 605 center_->AppendMap(std::move(f), from); 606 } 607 608 // Per above, but calls cleanup_fn when the pipe is closed. 609 template <typename Fn, typename OnHalfClose> 610 void InterceptAndMapWithHalfClose(Fn f, OnHalfClose cleanup_fn, 611 DebugLocation from = {}) { 612 center_->AppendMapWithCleanup(std::move(f), std::move(cleanup_fn), from); 613 } 614 615 private: 616 friend struct Pipe<T>; 617 explicit PipeReceiver(pipe_detail::Center<T>* center) : center_(center) {} 618 RefCountedPtr<pipe_detail::Center<T>> center_; 619 }; 620 621 namespace pipe_detail { 622 623 // Implementation of PipeSender::Push promise. 624 template <typename T> 625 class Push { 626 public: 627 Push(const Push&) = delete; 628 629 Push& operator=(const Push&) = delete; 630 Push(Push&& other) noexcept = default; 631 Push& operator=(Push&& other) noexcept = default; 632 633 Poll<bool> operator()() { 634 if (center_ == nullptr) { 635 GRPC_TRACE_VLOG(promise_primitives, 2) 636 << GetContext<Activity>()->DebugTag() 637 << " Pipe push has a null center"; 638 return false; 639 } 640 if (auto* p = absl::get_if<T>(&state_)) { 641 auto r = center_->Push(p); 642 if (auto* ok = r.value_if_ready()) { 643 state_.template emplace<AwaitingAck>(); 644 if (!*ok) return false; 645 } else { 646 return Pending{}; 647 } 648 } 649 DCHECK(absl::holds_alternative<AwaitingAck>(state_)); 650 return center_->PollAck(); 651 } 652 653 private: 654 struct AwaitingAck {}; 655 656 friend class PipeSender<T>; 657 explicit Push(RefCountedPtr<pipe_detail::Center<T>> center, T push) 658 : center_(std::move(center)), state_(std::move(push)) {} 659 660 RefCountedPtr<Center<T>> center_; 661 absl::variant<T, AwaitingAck> state_; 662 }; 663 664 } // namespace pipe_detail 665 666 template <typename T> 667 pipe_detail::Push<T> PipeSender<T>::Push(T value) { 668 return pipe_detail::Push<T>(center_ == nullptr ? nullptr : center_->Ref(), 669 std::move(value)); 670 } 671 672 template <typename T> 673 using PipeReceiverNextType = decltype(std::declval<PipeReceiver<T>>().Next()); 674 675 template <typename T> 676 bool NextResult<T>::has_value() const { 677 return center_ != nullptr; 678 } 679 680 template <typename T> 681 T& NextResult<T>::operator*() { 682 return center_->value(); 683 } 684 685 template <typename T> 686 const T& NextResult<T>::operator*() const { 687 return center_->value(); 688 } 689 690 template <typename T> 691 NextResult<T>::~NextResult() { 692 if (center_ != nullptr) center_->AckNext(); 693 } 694 695 template <typename T> 696 void NextResult<T>::reset() { 697 if (center_ != nullptr) { 698 center_->AckNext(); 699 center_.reset(); 700 } 701 } 702 703 // A Pipe is an intra-Activity communications channel that transmits T's from 704 // one end to the other. 705 // It is only safe to use a Pipe within the context of a single Activity. 706 // No synchronization is performed internally. 707 // The primary Pipe data structure is allocated from an arena, so the activity 708 // must have an arena as part of its context. 709 // By performing that allocation we can ensure stable pointer to shared data 710 // allowing PipeSender/PipeReceiver/Push/Next to be relatively simple in their 711 // implementation. 712 // This type has been optimized with the expectation that there are relatively 713 // few pipes per activity. If this assumption does not hold then a design 714 // allowing inline filtering of pipe contents (instead of connecting pipes with 715 // polling code) would likely be more appropriate. 716 template <typename T> 717 struct Pipe { 718 Pipe() : Pipe(GetContext<Arena>()) {} 719 explicit Pipe(Arena* arena) : Pipe(arena->New<pipe_detail::Center<T>>()) {} 720 Pipe(const Pipe&) = delete; 721 Pipe& operator=(const Pipe&) = delete; 722 Pipe(Pipe&&) noexcept = default; 723 Pipe& operator=(Pipe&&) noexcept = default; 724 725 PipeSender<T> sender; 726 PipeReceiver<T> receiver; 727 728 private: 729 explicit Pipe(pipe_detail::Center<T>* center) 730 : sender(center), receiver(center) {} 731 }; 732 733 } // namespace grpc_core 734 735 #endif // GRPC_SRC_CORE_LIB_PROMISE_PIPE_H 736