1 // Copyright 2021 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_PIPE_H 16 #define GRPC_SRC_CORE_LIB_PROMISE_PIPE_H 17 18 #include <grpc/support/port_platform.h> 19 20 #include <stdint.h> 21 #include <stdlib.h> 22 23 #include <memory> 24 #include <string> 25 #include <utility> 26 27 #include "absl/strings/str_cat.h" 28 #include "absl/types/optional.h" 29 #include "absl/types/variant.h" 30 31 #include <grpc/support/log.h> 32 33 #include "src/core/lib/gprpp/debug_location.h" 34 #include "src/core/lib/gprpp/ref_counted_ptr.h" 35 #include "src/core/lib/promise/activity.h" 36 #include "src/core/lib/promise/context.h" 37 #include "src/core/lib/promise/if.h" 38 #include "src/core/lib/promise/interceptor_list.h" 39 #include "src/core/lib/promise/map.h" 40 #include "src/core/lib/promise/poll.h" 41 #include "src/core/lib/promise/seq.h" 42 #include "src/core/lib/promise/trace.h" 43 #include "src/core/lib/resource_quota/arena.h" 44 45 namespace grpc_core { 46 47 namespace pipe_detail { 48 template <typename T> 49 class Center; 50 } 51 52 template <typename T> 53 struct Pipe; 54 55 // Result of Pipe::Next - represents a received value. 56 // If has_value() is false, the pipe was closed by the time we polled for the 57 // next value. No value was received, nor will there ever be. 58 // This type is movable but not copyable. 59 // Once the final move is destroyed the pipe will ack the read and unblock the 60 // send. 61 template <typename T> 62 class NextResult final { 63 public: NextResult()64 NextResult() : center_(nullptr) {} NextResult(RefCountedPtr<pipe_detail::Center<T>> center)65 explicit NextResult(RefCountedPtr<pipe_detail::Center<T>> center) 66 : center_(std::move(center)) { 67 GPR_ASSERT(center_ != nullptr); 68 } NextResult(bool cancelled)69 explicit NextResult(bool cancelled) 70 : center_(nullptr), cancelled_(cancelled) {} 71 ~NextResult(); 72 NextResult(const NextResult&) = delete; 73 NextResult& operator=(const NextResult&) = delete; 74 NextResult(NextResult&& other) noexcept = default; 75 NextResult& operator=(NextResult&& other) noexcept = default; 76 77 using value_type = T; 78 79 void reset(); 80 bool has_value() const; 81 // Only valid if has_value() value()82 const T& value() const { 83 GPR_ASSERT(has_value()); 84 return **this; 85 } value()86 T& value() { 87 GPR_ASSERT(has_value()); 88 return **this; 89 } 90 const T& operator*() const; 91 T& operator*(); 92 // Only valid if !has_value() cancelled()93 bool cancelled() { return cancelled_; } 94 95 private: 96 RefCountedPtr<pipe_detail::Center<T>> center_; 97 bool cancelled_; 98 }; 99 100 namespace pipe_detail { 101 102 template <typename T> 103 class Push; 104 template <typename T> 105 class Next; 106 107 // Center sits between a sender and a receiver to provide a one-deep buffer of 108 // Ts 109 template <typename T> 110 class Center : public InterceptorList<T> { 111 public: 112 // Initialize with one send ref (held by PipeSender) and one recv ref (held by 113 // PipeReceiver) Center()114 Center() { 115 refs_ = 2; 116 value_state_ = ValueState::kEmpty; 117 } 118 119 // Add one ref to this object, and return this. IncrementRefCount()120 void IncrementRefCount() { 121 if (grpc_trace_promise_primitives.enabled()) { 122 gpr_log(GPR_DEBUG, "%s", DebugOpString("IncrementRefCount").c_str()); 123 } 124 refs_++; 125 GPR_DEBUG_ASSERT(refs_ != 0); 126 } 127 Ref()128 RefCountedPtr<Center> Ref() { 129 IncrementRefCount(); 130 return RefCountedPtr<Center>(this); 131 } 132 133 // Drop a ref 134 // If no refs remain, destroy this object Unref()135 void Unref() { 136 if (grpc_trace_promise_primitives.enabled()) { 137 gpr_log(GPR_DEBUG, "%s", DebugOpString("Unref").c_str()); 138 } 139 GPR_DEBUG_ASSERT(refs_ > 0); 140 refs_--; 141 if (0 == refs_) { 142 this->~Center(); 143 } 144 } 145 146 // Try to push *value into the pipe. 147 // Return Pending if there is no space. 148 // Return true if the value was pushed. 149 // Return false if the recv end is closed. Push(T * value)150 Poll<bool> Push(T* value) { 151 if (grpc_trace_promise_primitives.enabled()) { 152 gpr_log(GPR_INFO, "%s", DebugOpString("Push").c_str()); 153 } 154 GPR_DEBUG_ASSERT(refs_ != 0); 155 switch (value_state_) { 156 case ValueState::kClosed: 157 case ValueState::kReadyClosed: 158 case ValueState::kCancelled: 159 case ValueState::kWaitingForAckAndClosed: 160 return false; 161 case ValueState::kReady: 162 case ValueState::kAcked: 163 case ValueState::kWaitingForAck: 164 return on_empty_.pending(); 165 case ValueState::kEmpty: 166 value_state_ = ValueState::kReady; 167 value_ = std::move(*value); 168 on_full_.Wake(); 169 return true; 170 } 171 GPR_UNREACHABLE_CODE(return false); 172 } 173 PollAck()174 Poll<bool> PollAck() { 175 if (grpc_trace_promise_primitives.enabled()) { 176 gpr_log(GPR_INFO, "%s", DebugOpString("PollAck").c_str()); 177 } 178 GPR_DEBUG_ASSERT(refs_ != 0); 179 switch (value_state_) { 180 case ValueState::kClosed: 181 return true; 182 case ValueState::kCancelled: 183 return false; 184 case ValueState::kReady: 185 case ValueState::kReadyClosed: 186 case ValueState::kEmpty: 187 case ValueState::kWaitingForAck: 188 case ValueState::kWaitingForAckAndClosed: 189 return on_empty_.pending(); 190 case ValueState::kAcked: 191 value_state_ = ValueState::kEmpty; 192 on_empty_.Wake(); 193 return true; 194 } 195 return true; 196 } 197 198 // Try to receive a value from the pipe. 199 // Return Pending if there is no value. 200 // Return the value if one was retrieved. 201 // Return nullopt if the send end is closed and no value had been pushed. Next()202 Poll<absl::optional<T>> Next() { 203 if (grpc_trace_promise_primitives.enabled()) { 204 gpr_log(GPR_INFO, "%s", DebugOpString("Next").c_str()); 205 } 206 GPR_DEBUG_ASSERT(refs_ != 0); 207 switch (value_state_) { 208 case ValueState::kEmpty: 209 case ValueState::kAcked: 210 case ValueState::kWaitingForAck: 211 case ValueState::kWaitingForAckAndClosed: 212 return on_full_.pending(); 213 case ValueState::kReadyClosed: 214 value_state_ = ValueState::kWaitingForAckAndClosed; 215 return std::move(value_); 216 case ValueState::kReady: 217 value_state_ = ValueState::kWaitingForAck; 218 return std::move(value_); 219 case ValueState::kClosed: 220 case ValueState::kCancelled: 221 return absl::nullopt; 222 } 223 GPR_UNREACHABLE_CODE(return absl::nullopt); 224 } 225 226 // Check if the pipe is closed for sending (if there is a value still queued 227 // but the pipe is closed, reports closed). PollClosedForSender()228 Poll<bool> PollClosedForSender() { 229 if (grpc_trace_promise_primitives.enabled()) { 230 gpr_log(GPR_INFO, "%s", DebugOpString("PollClosedForSender").c_str()); 231 } 232 GPR_DEBUG_ASSERT(refs_ != 0); 233 switch (value_state_) { 234 case ValueState::kEmpty: 235 case ValueState::kAcked: 236 case ValueState::kReady: 237 case ValueState::kWaitingForAck: 238 return on_closed_.pending(); 239 case ValueState::kWaitingForAckAndClosed: 240 case ValueState::kReadyClosed: 241 case ValueState::kClosed: 242 return false; 243 case ValueState::kCancelled: 244 return true; 245 } 246 GPR_UNREACHABLE_CODE(return true); 247 } 248 249 // Check if the pipe is closed for receiving (if there is a value still queued 250 // but the pipe is closed, reports open). PollClosedForReceiver()251 Poll<bool> PollClosedForReceiver() { 252 if (grpc_trace_promise_primitives.enabled()) { 253 gpr_log(GPR_INFO, "%s", DebugOpString("PollClosedForReceiver").c_str()); 254 } 255 GPR_DEBUG_ASSERT(refs_ != 0); 256 switch (value_state_) { 257 case ValueState::kEmpty: 258 case ValueState::kAcked: 259 case ValueState::kReady: 260 case ValueState::kReadyClosed: 261 case ValueState::kWaitingForAck: 262 case ValueState::kWaitingForAckAndClosed: 263 return on_closed_.pending(); 264 case ValueState::kClosed: 265 return false; 266 case ValueState::kCancelled: 267 return true; 268 } 269 GPR_UNREACHABLE_CODE(return true); 270 } 271 PollEmpty()272 Poll<Empty> PollEmpty() { 273 if (grpc_trace_promise_primitives.enabled()) { 274 gpr_log(GPR_INFO, "%s", DebugOpString("PollEmpty").c_str()); 275 } 276 GPR_DEBUG_ASSERT(refs_ != 0); 277 switch (value_state_) { 278 case ValueState::kReady: 279 case ValueState::kReadyClosed: 280 return on_empty_.pending(); 281 case ValueState::kWaitingForAck: 282 case ValueState::kWaitingForAckAndClosed: 283 case ValueState::kAcked: 284 case ValueState::kEmpty: 285 case ValueState::kClosed: 286 case ValueState::kCancelled: 287 return Empty{}; 288 } 289 GPR_UNREACHABLE_CODE(return Empty{}); 290 } 291 AckNext()292 void AckNext() { 293 if (grpc_trace_promise_primitives.enabled()) { 294 gpr_log(GPR_INFO, "%s", DebugOpString("AckNext").c_str()); 295 } 296 switch (value_state_) { 297 case ValueState::kReady: 298 case ValueState::kWaitingForAck: 299 value_state_ = ValueState::kAcked; 300 on_empty_.Wake(); 301 break; 302 case ValueState::kReadyClosed: 303 case ValueState::kWaitingForAckAndClosed: 304 this->ResetInterceptorList(); 305 value_state_ = ValueState::kClosed; 306 on_closed_.Wake(); 307 on_empty_.Wake(); 308 on_full_.Wake(); 309 break; 310 case ValueState::kClosed: 311 case ValueState::kCancelled: 312 break; 313 case ValueState::kEmpty: 314 case ValueState::kAcked: 315 abort(); 316 } 317 } 318 MarkClosed()319 void MarkClosed() { 320 if (grpc_trace_promise_primitives.enabled()) { 321 gpr_log(GPR_INFO, "%s", DebugOpString("MarkClosed").c_str()); 322 } 323 switch (value_state_) { 324 case ValueState::kEmpty: 325 case ValueState::kAcked: 326 this->ResetInterceptorList(); 327 value_state_ = ValueState::kClosed; 328 on_empty_.Wake(); 329 on_full_.Wake(); 330 on_closed_.Wake(); 331 break; 332 case ValueState::kReady: 333 value_state_ = ValueState::kReadyClosed; 334 on_closed_.Wake(); 335 break; 336 case ValueState::kWaitingForAck: 337 value_state_ = ValueState::kWaitingForAckAndClosed; 338 on_closed_.Wake(); 339 break; 340 case ValueState::kReadyClosed: 341 case ValueState::kClosed: 342 case ValueState::kCancelled: 343 case ValueState::kWaitingForAckAndClosed: 344 break; 345 } 346 } 347 MarkCancelled()348 void MarkCancelled() { 349 if (grpc_trace_promise_primitives.enabled()) { 350 gpr_log(GPR_INFO, "%s", DebugOpString("MarkCancelled").c_str()); 351 } 352 switch (value_state_) { 353 case ValueState::kEmpty: 354 case ValueState::kAcked: 355 case ValueState::kReady: 356 case ValueState::kReadyClosed: 357 case ValueState::kWaitingForAck: 358 case ValueState::kWaitingForAckAndClosed: 359 this->ResetInterceptorList(); 360 value_state_ = ValueState::kCancelled; 361 on_empty_.Wake(); 362 on_full_.Wake(); 363 on_closed_.Wake(); 364 break; 365 case ValueState::kClosed: 366 case ValueState::kCancelled: 367 break; 368 } 369 } 370 cancelled()371 bool cancelled() { return value_state_ == ValueState::kCancelled; } 372 value()373 T& value() { return value_; } value()374 const T& value() const { return value_; } 375 DebugTag()376 std::string DebugTag() { 377 if (auto* activity = GetContext<Activity>()) { 378 return absl::StrCat(activity->DebugTag(), " PIPE[0x", absl::Hex(this), 379 "]: "); 380 } else { 381 return absl::StrCat("PIPE[0x", reinterpret_cast<uintptr_t>(this), "]: "); 382 } 383 } 384 385 private: 386 // State of value_. 387 enum class ValueState : uint8_t { 388 // No value is set, it's possible to send. 389 kEmpty, 390 // Value has been pushed but not acked, it's possible to receive. 391 kReady, 392 // Value has been read and not acked, both send/receive blocked until ack. 393 kWaitingForAck, 394 // Value has been received and acked, we can unblock senders and transition 395 // to empty. 396 kAcked, 397 // Pipe is closed successfully, no more values can be sent 398 kClosed, 399 // Pipe is closed successfully, no more values can be sent 400 // (but one value is queued and ready to be received) 401 kReadyClosed, 402 // Pipe is closed successfully, no more values can be sent 403 // (but one value is queued and waiting to be acked) 404 kWaitingForAckAndClosed, 405 // Pipe is closed unsuccessfully, no more values can be sent 406 kCancelled, 407 }; 408 DebugOpString(std::string op)409 std::string DebugOpString(std::string op) { 410 return absl::StrCat(DebugTag(), op, " refs=", refs_, 411 " value_state=", ValueStateName(value_state_), 412 " on_empty=", on_empty_.DebugString().c_str(), 413 " on_full=", on_full_.DebugString().c_str(), 414 " on_closed=", on_closed_.DebugString().c_str()); 415 } 416 ValueStateName(ValueState state)417 static const char* ValueStateName(ValueState state) { 418 switch (state) { 419 case ValueState::kEmpty: 420 return "Empty"; 421 case ValueState::kReady: 422 return "Ready"; 423 case ValueState::kAcked: 424 return "Acked"; 425 case ValueState::kClosed: 426 return "Closed"; 427 case ValueState::kReadyClosed: 428 return "ReadyClosed"; 429 case ValueState::kWaitingForAck: 430 return "WaitingForAck"; 431 case ValueState::kWaitingForAckAndClosed: 432 return "WaitingForAckAndClosed"; 433 case ValueState::kCancelled: 434 return "Cancelled"; 435 } 436 GPR_UNREACHABLE_CODE(return "unknown"); 437 } 438 439 T value_; 440 // Number of refs 441 uint8_t refs_; 442 // Current state of the value. 443 ValueState value_state_; 444 IntraActivityWaiter on_empty_; 445 IntraActivityWaiter on_full_; 446 IntraActivityWaiter on_closed_; 447 448 // Make failure to destruct show up in ASAN builds. 449 #ifndef NDEBUG 450 std::unique_ptr<int> asan_canary_ = std::make_unique<int>(0); 451 #endif 452 }; 453 454 } // namespace pipe_detail 455 456 // Send end of a Pipe. 457 template <typename T> 458 class PipeSender { 459 public: 460 using PushType = pipe_detail::Push<T>; 461 462 PipeSender(const PipeSender&) = delete; 463 PipeSender& operator=(const PipeSender&) = delete; 464 PipeSender(PipeSender&& other) noexcept = default; 465 PipeSender& operator=(PipeSender&& other) noexcept = default; 466 ~PipeSender()467 ~PipeSender() { 468 if (center_ != nullptr) center_->MarkClosed(); 469 } 470 Close()471 void Close() { 472 if (center_ != nullptr) { 473 center_->MarkClosed(); 474 center_.reset(); 475 } 476 } 477 CloseWithError()478 void CloseWithError() { 479 if (center_ != nullptr) { 480 center_->MarkCancelled(); 481 center_.reset(); 482 } 483 } 484 Swap(PipeSender<T> * other)485 void Swap(PipeSender<T>* other) { std::swap(center_, other->center_); } 486 487 // Send a single message along the pipe. 488 // Returns a promise that will resolve to a bool - true if the message was 489 // sent, false if it could never be sent. Blocks the promise until the 490 // receiver is either closed or able to receive another message. 491 PushType Push(T value); 492 493 // Return a promise that resolves when the receiver is closed. 494 // The resolved value is a bool - true if the pipe was cancelled, false if it 495 // was closed successfully. 496 // Checks closed from the senders perspective: that is, if there is a value in 497 // the pipe but the pipe is closed, reports closed. AwaitClosed()498 auto AwaitClosed() { 499 return [center = center_]() { return center->PollClosedForSender(); }; 500 } 501 502 // Interject PromiseFactory f into the pipeline. 503 // f will be called with the current value traversing the pipe, and should 504 // return a value to replace it with. 505 // Interjects at the Push end of the pipe. 506 template <typename Fn> 507 void InterceptAndMap(Fn f, DebugLocation from = {}) { 508 center_->PrependMap(std::move(f), from); 509 } 510 511 // Per above, but calls cleanup_fn when the pipe is closed. 512 template <typename Fn, typename OnHalfClose> 513 void InterceptAndMap(Fn f, OnHalfClose cleanup_fn, DebugLocation from = {}) { 514 center_->PrependMapWithCleanup(std::move(f), std::move(cleanup_fn), from); 515 } 516 517 private: 518 friend struct Pipe<T>; 519 explicit PipeSender(pipe_detail::Center<T>* center) : center_(center) {} 520 RefCountedPtr<pipe_detail::Center<T>> center_; 521 522 // Make failure to destruct show up in ASAN builds. 523 #ifndef NDEBUG 524 std::unique_ptr<int> asan_canary_ = std::make_unique<int>(0); 525 #endif 526 }; 527 528 template <typename T> 529 class PipeReceiver; 530 531 namespace pipe_detail { 532 533 // Implementation of PipeReceiver::Next promise. 534 template <typename T> 535 class Next { 536 public: 537 Next(const Next&) = delete; 538 Next& operator=(const Next&) = delete; 539 Next(Next&& other) noexcept = default; 540 Next& operator=(Next&& other) noexcept = default; 541 542 Poll<absl::optional<T>> operator()() { 543 return center_ == nullptr ? absl::nullopt : center_->Next(); 544 } 545 546 private: 547 friend class PipeReceiver<T>; 548 explicit Next(RefCountedPtr<Center<T>> center) : center_(std::move(center)) {} 549 550 RefCountedPtr<Center<T>> center_; 551 }; 552 553 } // namespace pipe_detail 554 555 // Receive end of a Pipe. 556 template <typename T> 557 class PipeReceiver { 558 public: 559 PipeReceiver(const PipeReceiver&) = delete; 560 PipeReceiver& operator=(const PipeReceiver&) = delete; 561 PipeReceiver(PipeReceiver&& other) noexcept = default; 562 PipeReceiver& operator=(PipeReceiver&& other) noexcept = default; 563 ~PipeReceiver() { 564 if (center_ != nullptr) center_->MarkCancelled(); 565 } 566 567 void Swap(PipeReceiver<T>* other) { std::swap(center_, other->center_); } 568 569 // Receive a single message from the pipe. 570 // Returns a promise that will resolve to an optional<T> - with a value if a 571 // message was received, or no value if the other end of the pipe was closed. 572 // Blocks the promise until the receiver is either closed or a message is 573 // available. 574 auto Next() { 575 return Seq(pipe_detail::Next<T>(center_), [center = center_]( 576 absl::optional<T> value) { 577 bool open = value.has_value(); 578 bool cancelled = center == nullptr ? true : center->cancelled(); 579 return If( 580 open, 581 [center = std::move(center), value = std::move(value)]() mutable { 582 auto run = center->Run(std::move(value)); 583 return Map(std::move(run), [center = std::move(center)]( 584 absl::optional<T> value) mutable { 585 if (value.has_value()) { 586 center->value() = std::move(*value); 587 return NextResult<T>(std::move(center)); 588 } else { 589 center->MarkCancelled(); 590 return NextResult<T>(true); 591 } 592 }); 593 }, 594 [cancelled]() { return NextResult<T>(cancelled); }); 595 }); 596 } 597 598 // Return a promise that resolves when the receiver is closed. 599 // The resolved value is a bool - true if the pipe was cancelled, false if it 600 // was closed successfully. 601 // Checks closed from the receivers perspective: that is, if there is a value 602 // in the pipe but the pipe is closed, reports open until that value is read. 603 auto AwaitClosed() { 604 return [center = center_]() -> Poll<bool> { 605 if (center == nullptr) return false; 606 return center->PollClosedForReceiver(); 607 }; 608 } 609 610 auto AwaitEmpty() { 611 return [center = center_]() { return center->PollEmpty(); }; 612 } 613 614 void CloseWithError() { 615 if (center_ != nullptr) { 616 center_->MarkCancelled(); 617 center_.reset(); 618 } 619 } 620 621 // Interject PromiseFactory f into the pipeline. 622 // f will be called with the current value traversing the pipe, and should 623 // return a value to replace it with. 624 // Interjects at the Next end of the pipe. 625 template <typename Fn> 626 void InterceptAndMap(Fn f, DebugLocation from = {}) { 627 center_->AppendMap(std::move(f), from); 628 } 629 630 // Per above, but calls cleanup_fn when the pipe is closed. 631 template <typename Fn, typename OnHalfClose> 632 void InterceptAndMapWithHalfClose(Fn f, OnHalfClose cleanup_fn, 633 DebugLocation from = {}) { 634 center_->AppendMapWithCleanup(std::move(f), std::move(cleanup_fn), from); 635 } 636 637 private: 638 friend struct Pipe<T>; 639 explicit PipeReceiver(pipe_detail::Center<T>* center) : center_(center) {} 640 RefCountedPtr<pipe_detail::Center<T>> center_; 641 }; 642 643 namespace pipe_detail { 644 645 // Implementation of PipeSender::Push promise. 646 template <typename T> 647 class Push { 648 public: 649 Push(const Push&) = delete; 650 651 Push& operator=(const Push&) = delete; 652 Push(Push&& other) noexcept = default; 653 Push& operator=(Push&& other) noexcept = default; 654 655 Poll<bool> operator()() { 656 if (center_ == nullptr) { 657 if (grpc_trace_promise_primitives.enabled()) { 658 gpr_log(GPR_DEBUG, "%s Pipe push has a null center", 659 GetContext<Activity>()->DebugTag().c_str()); 660 } 661 return false; 662 } 663 if (auto* p = absl::get_if<T>(&state_)) { 664 auto r = center_->Push(p); 665 if (auto* ok = r.value_if_ready()) { 666 state_.template emplace<AwaitingAck>(); 667 if (!*ok) return false; 668 } else { 669 return Pending{}; 670 } 671 } 672 GPR_DEBUG_ASSERT(absl::holds_alternative<AwaitingAck>(state_)); 673 return center_->PollAck(); 674 } 675 676 private: 677 struct AwaitingAck {}; 678 679 friend class PipeSender<T>; 680 explicit Push(RefCountedPtr<pipe_detail::Center<T>> center, T push) 681 : center_(std::move(center)), state_(std::move(push)) {} 682 683 RefCountedPtr<Center<T>> center_; 684 absl::variant<T, AwaitingAck> state_; 685 }; 686 687 } // namespace pipe_detail 688 689 template <typename T> 690 pipe_detail::Push<T> PipeSender<T>::Push(T value) { 691 return pipe_detail::Push<T>(center_ == nullptr ? nullptr : center_->Ref(), 692 std::move(value)); 693 } 694 695 template <typename T> 696 using PipeReceiverNextType = decltype(std::declval<PipeReceiver<T>>().Next()); 697 698 template <typename T> 699 bool NextResult<T>::has_value() const { 700 return center_ != nullptr; 701 } 702 703 template <typename T> 704 T& NextResult<T>::operator*() { 705 return center_->value(); 706 } 707 708 template <typename T> 709 const T& NextResult<T>::operator*() const { 710 return center_->value(); 711 } 712 713 template <typename T> 714 NextResult<T>::~NextResult() { 715 if (center_ != nullptr) center_->AckNext(); 716 } 717 718 template <typename T> 719 void NextResult<T>::reset() { 720 if (center_ != nullptr) { 721 center_->AckNext(); 722 center_.reset(); 723 } 724 } 725 726 // A Pipe is an intra-Activity communications channel that transmits T's from 727 // one end to the other. 728 // It is only safe to use a Pipe within the context of a single Activity. 729 // No synchronization is performed internally. 730 // The primary Pipe data structure is allocated from an arena, so the activity 731 // must have an arena as part of its context. 732 // By performing that allocation we can ensure stable pointer to shared data 733 // allowing PipeSender/PipeReceiver/Push/Next to be relatively simple in their 734 // implementation. 735 // This type has been optimized with the expectation that there are relatively 736 // few pipes per activity. If this assumption does not hold then a design 737 // allowing inline filtering of pipe contents (instead of connecting pipes with 738 // polling code) would likely be more appropriate. 739 template <typename T> 740 struct Pipe { 741 Pipe() : Pipe(GetContext<Arena>()) {} 742 explicit Pipe(Arena* arena) : Pipe(arena->New<pipe_detail::Center<T>>()) {} 743 Pipe(const Pipe&) = delete; 744 Pipe& operator=(const Pipe&) = delete; 745 Pipe(Pipe&&) noexcept = default; 746 Pipe& operator=(Pipe&&) noexcept = default; 747 748 PipeSender<T> sender; 749 PipeReceiver<T> receiver; 750 751 private: 752 explicit Pipe(pipe_detail::Center<T>* center) 753 : sender(center), receiver(center) {} 754 }; 755 756 } // namespace grpc_core 757 758 #endif // GRPC_SRC_CORE_LIB_PROMISE_PIPE_H 759