1 // Copyright 2023 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_PIPE_H 16 #define GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_PIPE_H 17 18 #include <grpc/support/port_platform.h> 19 #include <stdint.h> 20 21 #include <array> 22 #include <utility> 23 24 #include "absl/base/thread_annotations.h" 25 #include "absl/types/optional.h" 26 #include "src/core/lib/promise/activity.h" 27 #include "src/core/lib/promise/poll.h" 28 #include "src/core/util/ref_counted.h" 29 #include "src/core/util/ref_counted_ptr.h" 30 #include "src/core/util/sync.h" 31 32 namespace grpc_core { 33 34 template <typename T, uint8_t kQueueSize> 35 class InterActivityPipe { 36 public: 37 class NextResult { 38 public: 39 template <typename... Args> NextResult(Args &&...args)40 explicit NextResult(Args&&... args) : value_(std::forward<Args>(args)...) {} 41 using value_type = T; reset()42 void reset() { value_.reset(); } cancelled()43 bool cancelled() const { return false; } has_value()44 bool has_value() const { return value_.has_value(); } value()45 const T& value() const { return value_.value(); } value()46 T& value() { return value_.value(); } 47 const T& operator*() const { return *value_; } 48 T& operator*() { return *value_; } 49 50 private: 51 absl::optional<T> value_; 52 }; 53 54 private: 55 class Center : public RefCounted<Center, NonPolymorphicRefCount> { 56 public: Push(T & value)57 Poll<bool> Push(T& value) { 58 ReleasableMutexLock lock(&mu_); 59 if (closed_) return false; 60 if (count_ == kQueueSize) { 61 on_available_ = GetContext<Activity>()->MakeNonOwningWaker(); 62 return Pending{}; 63 } 64 queue_[(first_ + count_) % kQueueSize] = std::move(value); 65 ++count_; 66 if (count_ == 1) { 67 auto on_occupied = std::move(on_occupied_); 68 lock.Release(); 69 on_occupied.Wakeup(); 70 } 71 return true; 72 } 73 Next()74 Poll<NextResult> Next() { 75 ReleasableMutexLock lock(&mu_); 76 if (count_ == 0) { 77 if (closed_) return absl::nullopt; 78 on_occupied_ = GetContext<Activity>()->MakeNonOwningWaker(); 79 return Pending{}; 80 } 81 auto value = std::move(queue_[first_]); 82 first_ = (first_ + 1) % kQueueSize; 83 --count_; 84 if (count_ == kQueueSize - 1) { 85 auto on_available = std::move(on_available_); 86 lock.Release(); 87 on_available.Wakeup(); 88 } 89 return std::move(value); 90 } 91 MarkClosed()92 void MarkClosed() { 93 ReleasableMutexLock lock(&mu_); 94 if (std::exchange(closed_, true)) return; 95 auto on_occupied = std::move(on_occupied_); 96 auto on_available = std::move(on_available_); 97 lock.Release(); 98 on_occupied.Wakeup(); 99 on_available.Wakeup(); 100 } 101 IsClosed()102 bool IsClosed() { 103 MutexLock lock(&mu_); 104 return closed_; 105 } 106 107 private: 108 Mutex mu_; 109 std::array<T, kQueueSize> queue_ ABSL_GUARDED_BY(mu_); 110 bool closed_ ABSL_GUARDED_BY(mu_) = false; 111 uint8_t first_ ABSL_GUARDED_BY(mu_) = 0; 112 uint8_t count_ ABSL_GUARDED_BY(mu_) = 0; 113 Waker on_occupied_ ABSL_GUARDED_BY(mu_); 114 Waker on_available_ ABSL_GUARDED_BY(mu_); 115 }; 116 RefCountedPtr<Center> center_{MakeRefCounted<Center>()}; 117 118 public: 119 class Sender { 120 public: Sender(RefCountedPtr<Center> center)121 explicit Sender(RefCountedPtr<Center> center) 122 : center_(std::move(center)) {} 123 Sender(const Sender&) = delete; 124 Sender& operator=(const Sender&) = delete; 125 Sender(Sender&&) noexcept = default; 126 Sender& operator=(Sender&&) noexcept = default; 127 ~Sender()128 ~Sender() { 129 if (center_ != nullptr) center_->MarkClosed(); 130 } 131 IsClosed()132 bool IsClosed() { return center_->IsClosed(); } 133 MarkClosed()134 void MarkClosed() { 135 if (center_ != nullptr) center_->MarkClosed(); 136 } 137 Push(T value)138 auto Push(T value) { 139 return [center = center_, value = std::move(value)]() mutable { 140 return center->Push(value); 141 }; 142 } 143 144 private: 145 RefCountedPtr<Center> center_; 146 }; 147 148 class Receiver { 149 public: Receiver(RefCountedPtr<Center> center)150 explicit Receiver(RefCountedPtr<Center> center) 151 : center_(std::move(center)) {} 152 Receiver(const Receiver&) = delete; 153 Receiver& operator=(const Receiver&) = delete; 154 Receiver(Receiver&&) noexcept = default; 155 Receiver& operator=(Receiver&&) noexcept = default; 156 ~Receiver()157 ~Receiver() { 158 if (center_ != nullptr) center_->MarkClosed(); 159 } 160 Next()161 auto Next() { 162 return [center = center_]() { return center->Next(); }; 163 } 164 IsClose()165 bool IsClose() { return center_->IsClosed(); } 166 MarkClose()167 void MarkClose() { 168 if (center_ != nullptr) center_->MarkClosed(); 169 } 170 171 private: 172 RefCountedPtr<Center> center_; 173 }; 174 175 Sender sender{center_}; 176 Receiver receiver{center_}; 177 }; 178 179 } // namespace grpc_core 180 181 #endif // GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_PIPE_H 182