1 // Copyright 2021 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_LATCH_H 16 #define GRPC_SRC_CORE_LIB_PROMISE_LATCH_H 17 18 #include <grpc/support/port_platform.h> 19 20 #include <stdint.h> 21 22 #include <atomic> 23 #include <string> 24 #include <utility> 25 26 #include "absl/strings/str_cat.h" 27 28 #include <grpc/support/log.h> 29 30 #include "src/core/lib/promise/activity.h" 31 #include "src/core/lib/promise/poll.h" 32 #include "src/core/lib/promise/trace.h" 33 34 namespace grpc_core { 35 36 // Latch provides a single set waitable object. 37 // Initially the Latch is unset. 38 // It can be waited upon by the Wait method, which produces a Promise that 39 // resolves when the Latch is Set to a value of type T. 40 // Latches only work correctly within a single activity. 41 template <typename T> 42 class Latch { 43 public: 44 Latch() = default; 45 Latch(const Latch&) = delete; Latch(T value)46 explicit Latch(T value) : value_(std::move(value)), has_value_(true) {} 47 Latch& operator=(const Latch&) = delete; Latch(Latch && other)48 Latch(Latch&& other) noexcept 49 : value_(std::move(other.value_)), has_value_(other.has_value_) { 50 #ifndef NDEBUG 51 GPR_DEBUG_ASSERT(!other.has_had_waiters_); 52 #endif 53 } 54 Latch& operator=(Latch&& other) noexcept { 55 #ifndef NDEBUG 56 GPR_DEBUG_ASSERT(!other.has_had_waiters_); 57 #endif 58 value_ = std::move(other.value_); 59 has_value_ = other.has_value_; 60 return *this; 61 } 62 63 // Produce a promise to wait for a value from this latch. 64 // Moves the result out of the latch. Wait()65 auto Wait() { 66 #ifndef NDEBUG 67 has_had_waiters_ = true; 68 #endif 69 return [this]() -> Poll<T> { 70 if (grpc_trace_promise_primitives.enabled()) { 71 gpr_log(GPR_INFO, "%sWait %s", DebugTag().c_str(), 72 StateString().c_str()); 73 } 74 if (has_value_) { 75 return std::move(value_); 76 } else { 77 return waiter_.pending(); 78 } 79 }; 80 } 81 82 // Produce a promise to wait for a value from this latch. 83 // Copies the result out of the latch. WaitAndCopy()84 auto WaitAndCopy() { 85 #ifndef NDEBUG 86 has_had_waiters_ = true; 87 #endif 88 return [this]() -> Poll<T> { 89 if (grpc_trace_promise_primitives.enabled()) { 90 gpr_log(GPR_INFO, "%sWaitAndCopy %s", DebugTag().c_str(), 91 StateString().c_str()); 92 } 93 if (has_value_) { 94 return value_; 95 } else { 96 return waiter_.pending(); 97 } 98 }; 99 } 100 101 // Set the value of the latch. Can only be called once. Set(T value)102 void Set(T value) { 103 if (grpc_trace_promise_primitives.enabled()) { 104 gpr_log(GPR_INFO, "%sSet %s", DebugTag().c_str(), StateString().c_str()); 105 } 106 GPR_DEBUG_ASSERT(!has_value_); 107 value_ = std::move(value); 108 has_value_ = true; 109 waiter_.Wake(); 110 } 111 is_set()112 bool is_set() const { return has_value_; } 113 114 private: DebugTag()115 std::string DebugTag() { 116 return absl::StrCat(GetContext<Activity>()->DebugTag(), " LATCH[0x", 117 reinterpret_cast<uintptr_t>(this), "]: "); 118 } 119 StateString()120 std::string StateString() { 121 return absl::StrCat("has_value:", has_value_ ? "true" : "false", 122 " waiter:", waiter_.DebugString()); 123 } 124 125 // The value stored (if has_value_ is true), otherwise some random value, we 126 // don't care. 127 // Why not absl::optional<>? Writing things this way lets us compress 128 // has_value_ with waiter_ and leads to some significant memory savings for 129 // some scenarios. 130 GPR_NO_UNIQUE_ADDRESS T value_; 131 // True if we have a value set, false otherwise. 132 bool has_value_ = false; 133 #ifndef NDEBUG 134 // Has this latch ever had waiters. 135 bool has_had_waiters_ = false; 136 #endif 137 IntraActivityWaiter waiter_; 138 }; 139 140 // Specialization for void. 141 template <> 142 class Latch<void> { 143 public: 144 Latch() = default; 145 Latch(const Latch&) = delete; 146 Latch& operator=(const Latch&) = delete; Latch(Latch && other)147 Latch(Latch&& other) noexcept : is_set_(other.is_set_) { 148 #ifndef NDEBUG 149 GPR_DEBUG_ASSERT(!other.has_had_waiters_); 150 #endif 151 } 152 Latch& operator=(Latch&& other) noexcept { 153 #ifndef NDEBUG 154 GPR_DEBUG_ASSERT(!other.has_had_waiters_); 155 #endif 156 is_set_ = other.is_set_; 157 return *this; 158 } 159 160 // Produce a promise to wait for this latch. Wait()161 auto Wait() { 162 #ifndef NDEBUG 163 has_had_waiters_ = true; 164 #endif 165 return [this]() -> Poll<Empty> { 166 if (grpc_trace_promise_primitives.enabled()) { 167 gpr_log(GPR_INFO, "%sPollWait %s", DebugTag().c_str(), 168 StateString().c_str()); 169 } 170 if (is_set_) { 171 return Empty{}; 172 } else { 173 return waiter_.pending(); 174 } 175 }; 176 } 177 178 // Set the latch. Can only be called once. Set()179 void Set() { 180 if (grpc_trace_promise_primitives.enabled()) { 181 gpr_log(GPR_INFO, "%sSet %s", DebugTag().c_str(), StateString().c_str()); 182 } 183 GPR_DEBUG_ASSERT(!is_set_); 184 is_set_ = true; 185 waiter_.Wake(); 186 } 187 is_set()188 bool is_set() const { return is_set_; } 189 190 private: DebugTag()191 std::string DebugTag() { 192 return absl::StrCat(GetContext<Activity>()->DebugTag(), " LATCH(void)[0x", 193 reinterpret_cast<uintptr_t>(this), "]: "); 194 } 195 StateString()196 std::string StateString() { 197 return absl::StrCat("is_set:", is_set_ ? "true" : "false", 198 " waiter:", waiter_.DebugString()); 199 } 200 201 // True if we have a value set, false otherwise. 202 bool is_set_ = false; 203 #ifndef NDEBUG 204 // Has this latch ever had waiters. 205 bool has_had_waiters_ = false; 206 #endif 207 IntraActivityWaiter waiter_; 208 }; 209 210 template <typename T> 211 using LatchWaitPromise = decltype(std::declval<Latch<T>>().Wait()); 212 213 // A Latch that can have its value observed by outside threads, but only waited 214 // upon from inside a single activity. 215 template <typename T> 216 class ExternallyObservableLatch; 217 218 template <> 219 class ExternallyObservableLatch<void> { 220 public: 221 ExternallyObservableLatch() = default; 222 ExternallyObservableLatch(const ExternallyObservableLatch&) = delete; 223 ExternallyObservableLatch& operator=(const ExternallyObservableLatch&) = 224 delete; 225 226 // Produce a promise to wait for this latch. Wait()227 auto Wait() { 228 return [this]() -> Poll<Empty> { 229 if (grpc_trace_promise_primitives.enabled()) { 230 gpr_log(GPR_INFO, "%sPollWait %s", DebugTag().c_str(), 231 StateString().c_str()); 232 } 233 if (IsSet()) { 234 return Empty{}; 235 } else { 236 return waiter_.pending(); 237 } 238 }; 239 } 240 241 // Set the latch. Set()242 void Set() { 243 if (grpc_trace_promise_primitives.enabled()) { 244 gpr_log(GPR_INFO, "%sSet %s", DebugTag().c_str(), StateString().c_str()); 245 } 246 is_set_.store(true, std::memory_order_relaxed); 247 waiter_.Wake(); 248 } 249 IsSet()250 bool IsSet() const { return is_set_.load(std::memory_order_relaxed); } 251 Reset()252 void Reset() { 253 if (grpc_trace_promise_primitives.enabled()) { 254 gpr_log(GPR_INFO, "%sReset %s", DebugTag().c_str(), 255 StateString().c_str()); 256 } 257 is_set_.store(false, std::memory_order_relaxed); 258 } 259 260 private: DebugTag()261 std::string DebugTag() { 262 return absl::StrCat(GetContext<Activity>()->DebugTag(), " LATCH(void)[0x", 263 reinterpret_cast<uintptr_t>(this), "]: "); 264 } 265 StateString()266 std::string StateString() { 267 return absl::StrCat( 268 "is_set:", is_set_.load(std::memory_order_relaxed) ? "true" : "false", 269 " waiter:", waiter_.DebugString()); 270 } 271 272 // True if we have a value set, false otherwise. 273 std::atomic<bool> is_set_{false}; 274 IntraActivityWaiter waiter_; 275 }; 276 277 } // namespace grpc_core 278 279 #endif // GRPC_SRC_CORE_LIB_PROMISE_LATCH_H 280