1 // Copyright 2021 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_LATCH_H 16 #define GRPC_SRC_CORE_LIB_PROMISE_LATCH_H 17 18 #include <grpc/support/port_platform.h> 19 #include <stdint.h> 20 21 #include <atomic> 22 #include <string> 23 #include <utility> 24 25 #include "absl/log/check.h" 26 #include "absl/log/log.h" 27 #include "absl/strings/str_cat.h" 28 #include "src/core/lib/debug/trace.h" 29 #include "src/core/lib/promise/activity.h" 30 #include "src/core/lib/promise/poll.h" 31 32 namespace grpc_core { 33 34 // Latch provides a single set waitable object. 35 // Initially the Latch is unset. 36 // It can be waited upon by the Wait method, which produces a Promise that 37 // resolves when the Latch is Set to a value of type T. 38 // Latches only work correctly within a single activity. 39 template <typename T> 40 class Latch { 41 public: 42 Latch() = default; 43 Latch(const Latch&) = delete; Latch(T value)44 explicit Latch(T value) : value_(std::move(value)), has_value_(true) {} 45 Latch& operator=(const Latch&) = delete; Latch(Latch && other)46 Latch(Latch&& other) noexcept 47 : value_(std::move(other.value_)), has_value_(other.has_value_) { 48 #ifndef NDEBUG 49 DCHECK(!other.has_had_waiters_); 50 #endif 51 } 52 Latch& operator=(Latch&& other) noexcept { 53 #ifndef NDEBUG 54 DCHECK(!other.has_had_waiters_); 55 #endif 56 value_ = std::move(other.value_); 57 has_value_ = other.has_value_; 58 return *this; 59 } 60 61 // Produce a promise to wait for a value from this latch. 62 // Moves the result out of the latch. Wait()63 auto Wait() { 64 #ifndef NDEBUG 65 has_had_waiters_ = true; 66 #endif 67 return [this]() -> Poll<T> { 68 GRPC_TRACE_LOG(promise_primitives, INFO) 69 << DebugTag() << "Wait " << StateString(); 70 if (has_value_) { 71 return std::move(value_); 72 } else { 73 return waiter_.pending(); 74 } 75 }; 76 } 77 78 // Produce a promise to wait for a value from this latch. 79 // Copies the result out of the latch. WaitAndCopy()80 auto WaitAndCopy() { 81 #ifndef NDEBUG 82 has_had_waiters_ = true; 83 #endif 84 return [this]() -> Poll<T> { 85 GRPC_TRACE_LOG(promise_primitives, INFO) 86 << DebugTag() << "WaitAndCopy " << StateString(); 87 if (has_value_) { 88 return value_; 89 } else { 90 return waiter_.pending(); 91 } 92 }; 93 } 94 95 // Set the value of the latch. Can only be called once. Set(T value)96 void Set(T value) { 97 GRPC_TRACE_LOG(promise_primitives, INFO) 98 << DebugTag() << "Set " << StateString(); 99 DCHECK(!has_value_); 100 value_ = std::move(value); 101 has_value_ = true; 102 waiter_.Wake(); 103 } 104 is_set()105 bool is_set() const { return has_value_; } 106 107 private: DebugTag()108 std::string DebugTag() { 109 return absl::StrCat(GetContext<Activity>()->DebugTag(), " LATCH[0x", 110 reinterpret_cast<uintptr_t>(this), "]: "); 111 } 112 StateString()113 std::string StateString() { 114 return absl::StrCat("has_value:", has_value_ ? "true" : "false", 115 " waiter:", waiter_.DebugString()); 116 } 117 118 // The value stored (if has_value_ is true), otherwise some random value, we 119 // don't care. 120 // Why not absl::optional<>? Writing things this way lets us compress 121 // has_value_ with waiter_ and leads to some significant memory savings for 122 // some scenarios. 123 GPR_NO_UNIQUE_ADDRESS T value_; 124 // True if we have a value set, false otherwise. 125 bool has_value_ = false; 126 #ifndef NDEBUG 127 // Has this latch ever had waiters. 128 bool has_had_waiters_ = false; 129 #endif 130 IntraActivityWaiter waiter_; 131 }; 132 133 // Specialization for void. 134 template <> 135 class Latch<void> { 136 public: 137 Latch() = default; 138 Latch(const Latch&) = delete; 139 Latch& operator=(const Latch&) = delete; Latch(Latch && other)140 Latch(Latch&& other) noexcept : is_set_(other.is_set_) { 141 #ifndef NDEBUG 142 DCHECK(!other.has_had_waiters_); 143 #endif 144 } 145 Latch& operator=(Latch&& other) noexcept { 146 #ifndef NDEBUG 147 DCHECK(!other.has_had_waiters_); 148 #endif 149 is_set_ = other.is_set_; 150 return *this; 151 } 152 153 // Produce a promise to wait for this latch. Wait()154 auto Wait() { 155 #ifndef NDEBUG 156 has_had_waiters_ = true; 157 #endif 158 return [this]() -> Poll<Empty> { 159 GRPC_TRACE_LOG(promise_primitives, INFO) 160 << DebugTag() << "PollWait " << StateString(); 161 if (is_set_) { 162 return Empty{}; 163 } else { 164 return waiter_.pending(); 165 } 166 }; 167 } 168 169 // Set the latch. Can only be called once. Set()170 void Set() { 171 GRPC_TRACE_LOG(promise_primitives, INFO) 172 << DebugTag() << "Set " << StateString(); 173 DCHECK(!is_set_); 174 is_set_ = true; 175 waiter_.Wake(); 176 } 177 is_set()178 bool is_set() const { return is_set_; } 179 180 private: DebugTag()181 std::string DebugTag() { 182 return absl::StrCat(GetContext<Activity>()->DebugTag(), " LATCH(void)[0x", 183 reinterpret_cast<uintptr_t>(this), "]: "); 184 } 185 StateString()186 std::string StateString() { 187 return absl::StrCat("is_set:", is_set_ ? "true" : "false", 188 " waiter:", waiter_.DebugString()); 189 } 190 191 // True if we have a value set, false otherwise. 192 bool is_set_ = false; 193 #ifndef NDEBUG 194 // Has this latch ever had waiters. 195 bool has_had_waiters_ = false; 196 #endif 197 IntraActivityWaiter waiter_; 198 }; 199 200 template <typename T> 201 using LatchWaitPromise = decltype(std::declval<Latch<T>>().Wait()); 202 203 // A Latch that can have its value observed by outside threads, but only waited 204 // upon from inside a single activity. 205 template <typename T> 206 class ExternallyObservableLatch; 207 208 template <> 209 class ExternallyObservableLatch<void> { 210 public: 211 ExternallyObservableLatch() = default; 212 ExternallyObservableLatch(const ExternallyObservableLatch&) = delete; 213 ExternallyObservableLatch& operator=(const ExternallyObservableLatch&) = 214 delete; 215 216 // Produce a promise to wait for this latch. Wait()217 auto Wait() { 218 return [this]() -> Poll<Empty> { 219 GRPC_TRACE_LOG(promise_primitives, INFO) 220 << DebugTag() << "PollWait " << StateString(); 221 if (IsSet()) { 222 return Empty{}; 223 } else { 224 return waiter_.pending(); 225 } 226 }; 227 } 228 229 // Set the latch. Set()230 void Set() { 231 GRPC_TRACE_LOG(promise_primitives, INFO) 232 << DebugTag() << "Set " << StateString(); 233 is_set_.store(true, std::memory_order_relaxed); 234 waiter_.Wake(); 235 } 236 IsSet()237 bool IsSet() const { return is_set_.load(std::memory_order_relaxed); } 238 Reset()239 void Reset() { 240 GRPC_TRACE_LOG(promise_primitives, INFO) 241 << DebugTag() << "Reset " << StateString(); 242 is_set_.store(false, std::memory_order_relaxed); 243 } 244 245 private: DebugTag()246 std::string DebugTag() { 247 return absl::StrCat(GetContext<Activity>()->DebugTag(), " LATCH(void)[0x", 248 reinterpret_cast<uintptr_t>(this), "]: "); 249 } 250 StateString()251 std::string StateString() { 252 return absl::StrCat( 253 "is_set:", is_set_.load(std::memory_order_relaxed) ? "true" : "false", 254 " waiter:", waiter_.DebugString()); 255 } 256 257 // True if we have a value set, false otherwise. 258 std::atomic<bool> is_set_{false}; 259 IntraActivityWaiter waiter_; 260 }; 261 262 } // namespace grpc_core 263 264 #endif // GRPC_SRC_CORE_LIB_PROMISE_LATCH_H 265