• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_LATCH_H
16 #define GRPC_SRC_CORE_LIB_PROMISE_LATCH_H
17 
18 #include <grpc/support/port_platform.h>
19 
20 #include <stdint.h>
21 
22 #include <atomic>
23 #include <string>
24 #include <utility>
25 
26 #include "absl/strings/str_cat.h"
27 
28 #include <grpc/support/log.h>
29 
30 #include "src/core/lib/promise/activity.h"
31 #include "src/core/lib/promise/poll.h"
32 #include "src/core/lib/promise/trace.h"
33 
34 namespace grpc_core {
35 
36 // Latch provides a single set waitable object.
37 // Initially the Latch is unset.
38 // It can be waited upon by the Wait method, which produces a Promise that
39 // resolves when the Latch is Set to a value of type T.
40 // Latches only work correctly within a single activity.
41 template <typename T>
42 class Latch {
43  public:
44   Latch() = default;
45   Latch(const Latch&) = delete;
Latch(T value)46   explicit Latch(T value) : value_(std::move(value)), has_value_(true) {}
47   Latch& operator=(const Latch&) = delete;
Latch(Latch && other)48   Latch(Latch&& other) noexcept
49       : value_(std::move(other.value_)), has_value_(other.has_value_) {
50 #ifndef NDEBUG
51     GPR_DEBUG_ASSERT(!other.has_had_waiters_);
52 #endif
53   }
54   Latch& operator=(Latch&& other) noexcept {
55 #ifndef NDEBUG
56     GPR_DEBUG_ASSERT(!other.has_had_waiters_);
57 #endif
58     value_ = std::move(other.value_);
59     has_value_ = other.has_value_;
60     return *this;
61   }
62 
63   // Produce a promise to wait for a value from this latch.
64   // Moves the result out of the latch.
Wait()65   auto Wait() {
66 #ifndef NDEBUG
67     has_had_waiters_ = true;
68 #endif
69     return [this]() -> Poll<T> {
70       if (grpc_trace_promise_primitives.enabled()) {
71         gpr_log(GPR_INFO, "%sWait %s", DebugTag().c_str(),
72                 StateString().c_str());
73       }
74       if (has_value_) {
75         return std::move(value_);
76       } else {
77         return waiter_.pending();
78       }
79     };
80   }
81 
82   // Produce a promise to wait for a value from this latch.
83   // Copies the result out of the latch.
WaitAndCopy()84   auto WaitAndCopy() {
85 #ifndef NDEBUG
86     has_had_waiters_ = true;
87 #endif
88     return [this]() -> Poll<T> {
89       if (grpc_trace_promise_primitives.enabled()) {
90         gpr_log(GPR_INFO, "%sWaitAndCopy %s", DebugTag().c_str(),
91                 StateString().c_str());
92       }
93       if (has_value_) {
94         return value_;
95       } else {
96         return waiter_.pending();
97       }
98     };
99   }
100 
101   // Set the value of the latch. Can only be called once.
Set(T value)102   void Set(T value) {
103     if (grpc_trace_promise_primitives.enabled()) {
104       gpr_log(GPR_INFO, "%sSet %s", DebugTag().c_str(), StateString().c_str());
105     }
106     GPR_DEBUG_ASSERT(!has_value_);
107     value_ = std::move(value);
108     has_value_ = true;
109     waiter_.Wake();
110   }
111 
is_set()112   bool is_set() const { return has_value_; }
113 
114  private:
DebugTag()115   std::string DebugTag() {
116     return absl::StrCat(GetContext<Activity>()->DebugTag(), " LATCH[0x",
117                         reinterpret_cast<uintptr_t>(this), "]: ");
118   }
119 
StateString()120   std::string StateString() {
121     return absl::StrCat("has_value:", has_value_ ? "true" : "false",
122                         " waiter:", waiter_.DebugString());
123   }
124 
125   // The value stored (if has_value_ is true), otherwise some random value, we
126   // don't care.
127   // Why not absl::optional<>? Writing things this way lets us compress
128   // has_value_ with waiter_ and leads to some significant memory savings for
129   // some scenarios.
130   GPR_NO_UNIQUE_ADDRESS T value_;
131   // True if we have a value set, false otherwise.
132   bool has_value_ = false;
133 #ifndef NDEBUG
134   // Has this latch ever had waiters.
135   bool has_had_waiters_ = false;
136 #endif
137   IntraActivityWaiter waiter_;
138 };
139 
140 // Specialization for void.
141 template <>
142 class Latch<void> {
143  public:
144   Latch() = default;
145   Latch(const Latch&) = delete;
146   Latch& operator=(const Latch&) = delete;
Latch(Latch && other)147   Latch(Latch&& other) noexcept : is_set_(other.is_set_) {
148 #ifndef NDEBUG
149     GPR_DEBUG_ASSERT(!other.has_had_waiters_);
150 #endif
151   }
152   Latch& operator=(Latch&& other) noexcept {
153 #ifndef NDEBUG
154     GPR_DEBUG_ASSERT(!other.has_had_waiters_);
155 #endif
156     is_set_ = other.is_set_;
157     return *this;
158   }
159 
160   // Produce a promise to wait for this latch.
Wait()161   auto Wait() {
162 #ifndef NDEBUG
163     has_had_waiters_ = true;
164 #endif
165     return [this]() -> Poll<Empty> {
166       if (grpc_trace_promise_primitives.enabled()) {
167         gpr_log(GPR_INFO, "%sPollWait %s", DebugTag().c_str(),
168                 StateString().c_str());
169       }
170       if (is_set_) {
171         return Empty{};
172       } else {
173         return waiter_.pending();
174       }
175     };
176   }
177 
178   // Set the latch. Can only be called once.
Set()179   void Set() {
180     if (grpc_trace_promise_primitives.enabled()) {
181       gpr_log(GPR_INFO, "%sSet %s", DebugTag().c_str(), StateString().c_str());
182     }
183     GPR_DEBUG_ASSERT(!is_set_);
184     is_set_ = true;
185     waiter_.Wake();
186   }
187 
is_set()188   bool is_set() const { return is_set_; }
189 
190  private:
DebugTag()191   std::string DebugTag() {
192     return absl::StrCat(GetContext<Activity>()->DebugTag(), " LATCH(void)[0x",
193                         reinterpret_cast<uintptr_t>(this), "]: ");
194   }
195 
StateString()196   std::string StateString() {
197     return absl::StrCat("is_set:", is_set_ ? "true" : "false",
198                         " waiter:", waiter_.DebugString());
199   }
200 
201   // True if we have a value set, false otherwise.
202   bool is_set_ = false;
203 #ifndef NDEBUG
204   // Has this latch ever had waiters.
205   bool has_had_waiters_ = false;
206 #endif
207   IntraActivityWaiter waiter_;
208 };
209 
210 template <typename T>
211 using LatchWaitPromise = decltype(std::declval<Latch<T>>().Wait());
212 
213 // A Latch that can have its value observed by outside threads, but only waited
214 // upon from inside a single activity.
215 template <typename T>
216 class ExternallyObservableLatch;
217 
218 template <>
219 class ExternallyObservableLatch<void> {
220  public:
221   ExternallyObservableLatch() = default;
222   ExternallyObservableLatch(const ExternallyObservableLatch&) = delete;
223   ExternallyObservableLatch& operator=(const ExternallyObservableLatch&) =
224       delete;
225 
226   // Produce a promise to wait for this latch.
Wait()227   auto Wait() {
228     return [this]() -> Poll<Empty> {
229       if (grpc_trace_promise_primitives.enabled()) {
230         gpr_log(GPR_INFO, "%sPollWait %s", DebugTag().c_str(),
231                 StateString().c_str());
232       }
233       if (IsSet()) {
234         return Empty{};
235       } else {
236         return waiter_.pending();
237       }
238     };
239   }
240 
241   // Set the latch.
Set()242   void Set() {
243     if (grpc_trace_promise_primitives.enabled()) {
244       gpr_log(GPR_INFO, "%sSet %s", DebugTag().c_str(), StateString().c_str());
245     }
246     is_set_.store(true, std::memory_order_relaxed);
247     waiter_.Wake();
248   }
249 
IsSet()250   bool IsSet() const { return is_set_.load(std::memory_order_relaxed); }
251 
Reset()252   void Reset() {
253     if (grpc_trace_promise_primitives.enabled()) {
254       gpr_log(GPR_INFO, "%sReset %s", DebugTag().c_str(),
255               StateString().c_str());
256     }
257     is_set_.store(false, std::memory_order_relaxed);
258   }
259 
260  private:
DebugTag()261   std::string DebugTag() {
262     return absl::StrCat(GetContext<Activity>()->DebugTag(), " LATCH(void)[0x",
263                         reinterpret_cast<uintptr_t>(this), "]: ");
264   }
265 
StateString()266   std::string StateString() {
267     return absl::StrCat(
268         "is_set:", is_set_.load(std::memory_order_relaxed) ? "true" : "false",
269         " waiter:", waiter_.DebugString());
270   }
271 
272   // True if we have a value set, false otherwise.
273   std::atomic<bool> is_set_{false};
274   IntraActivityWaiter waiter_;
275 };
276 
277 }  // namespace grpc_core
278 
279 #endif  // GRPC_SRC_CORE_LIB_PROMISE_LATCH_H
280