• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_LATCH_H
16 #define GRPC_SRC_CORE_LIB_PROMISE_LATCH_H
17 
18 #include <grpc/support/port_platform.h>
19 #include <stdint.h>
20 
21 #include <atomic>
22 #include <string>
23 #include <utility>
24 
25 #include "absl/log/check.h"
26 #include "absl/log/log.h"
27 #include "absl/strings/str_cat.h"
28 #include "src/core/lib/debug/trace.h"
29 #include "src/core/lib/promise/activity.h"
30 #include "src/core/lib/promise/poll.h"
31 
32 namespace grpc_core {
33 
34 // Latch provides a single set waitable object.
35 // Initially the Latch is unset.
36 // It can be waited upon by the Wait method, which produces a Promise that
37 // resolves when the Latch is Set to a value of type T.
38 // Latches only work correctly within a single activity.
39 template <typename T>
40 class Latch {
41  public:
42   Latch() = default;
43   Latch(const Latch&) = delete;
Latch(T value)44   explicit Latch(T value) : value_(std::move(value)), has_value_(true) {}
45   Latch& operator=(const Latch&) = delete;
Latch(Latch && other)46   Latch(Latch&& other) noexcept
47       : value_(std::move(other.value_)), has_value_(other.has_value_) {
48 #ifndef NDEBUG
49     DCHECK(!other.has_had_waiters_);
50 #endif
51   }
52   Latch& operator=(Latch&& other) noexcept {
53 #ifndef NDEBUG
54     DCHECK(!other.has_had_waiters_);
55 #endif
56     value_ = std::move(other.value_);
57     has_value_ = other.has_value_;
58     return *this;
59   }
60 
61   // Produce a promise to wait for a value from this latch.
62   // Moves the result out of the latch.
Wait()63   auto Wait() {
64 #ifndef NDEBUG
65     has_had_waiters_ = true;
66 #endif
67     return [this]() -> Poll<T> {
68       GRPC_TRACE_LOG(promise_primitives, INFO)
69           << DebugTag() << "Wait " << StateString();
70       if (has_value_) {
71         return std::move(value_);
72       } else {
73         return waiter_.pending();
74       }
75     };
76   }
77 
78   // Produce a promise to wait for a value from this latch.
79   // Copies the result out of the latch.
WaitAndCopy()80   auto WaitAndCopy() {
81 #ifndef NDEBUG
82     has_had_waiters_ = true;
83 #endif
84     return [this]() -> Poll<T> {
85       GRPC_TRACE_LOG(promise_primitives, INFO)
86           << DebugTag() << "WaitAndCopy " << StateString();
87       if (has_value_) {
88         return value_;
89       } else {
90         return waiter_.pending();
91       }
92     };
93   }
94 
95   // Set the value of the latch. Can only be called once.
Set(T value)96   void Set(T value) {
97     GRPC_TRACE_LOG(promise_primitives, INFO)
98         << DebugTag() << "Set " << StateString();
99     DCHECK(!has_value_);
100     value_ = std::move(value);
101     has_value_ = true;
102     waiter_.Wake();
103   }
104 
is_set()105   bool is_set() const { return has_value_; }
106 
107  private:
DebugTag()108   std::string DebugTag() {
109     return absl::StrCat(GetContext<Activity>()->DebugTag(), " LATCH[0x",
110                         reinterpret_cast<uintptr_t>(this), "]: ");
111   }
112 
StateString()113   std::string StateString() {
114     return absl::StrCat("has_value:", has_value_ ? "true" : "false",
115                         " waiter:", waiter_.DebugString());
116   }
117 
118   // The value stored (if has_value_ is true), otherwise some random value, we
119   // don't care.
120   // Why not absl::optional<>? Writing things this way lets us compress
121   // has_value_ with waiter_ and leads to some significant memory savings for
122   // some scenarios.
123   GPR_NO_UNIQUE_ADDRESS T value_;
124   // True if we have a value set, false otherwise.
125   bool has_value_ = false;
126 #ifndef NDEBUG
127   // Has this latch ever had waiters.
128   bool has_had_waiters_ = false;
129 #endif
130   IntraActivityWaiter waiter_;
131 };
132 
133 // Specialization for void.
134 template <>
135 class Latch<void> {
136  public:
137   Latch() = default;
138   Latch(const Latch&) = delete;
139   Latch& operator=(const Latch&) = delete;
Latch(Latch && other)140   Latch(Latch&& other) noexcept : is_set_(other.is_set_) {
141 #ifndef NDEBUG
142     DCHECK(!other.has_had_waiters_);
143 #endif
144   }
145   Latch& operator=(Latch&& other) noexcept {
146 #ifndef NDEBUG
147     DCHECK(!other.has_had_waiters_);
148 #endif
149     is_set_ = other.is_set_;
150     return *this;
151   }
152 
153   // Produce a promise to wait for this latch.
Wait()154   auto Wait() {
155 #ifndef NDEBUG
156     has_had_waiters_ = true;
157 #endif
158     return [this]() -> Poll<Empty> {
159       GRPC_TRACE_LOG(promise_primitives, INFO)
160           << DebugTag() << "PollWait " << StateString();
161       if (is_set_) {
162         return Empty{};
163       } else {
164         return waiter_.pending();
165       }
166     };
167   }
168 
169   // Set the latch. Can only be called once.
Set()170   void Set() {
171     GRPC_TRACE_LOG(promise_primitives, INFO)
172         << DebugTag() << "Set " << StateString();
173     DCHECK(!is_set_);
174     is_set_ = true;
175     waiter_.Wake();
176   }
177 
is_set()178   bool is_set() const { return is_set_; }
179 
180  private:
DebugTag()181   std::string DebugTag() {
182     return absl::StrCat(GetContext<Activity>()->DebugTag(), " LATCH(void)[0x",
183                         reinterpret_cast<uintptr_t>(this), "]: ");
184   }
185 
StateString()186   std::string StateString() {
187     return absl::StrCat("is_set:", is_set_ ? "true" : "false",
188                         " waiter:", waiter_.DebugString());
189   }
190 
191   // True if we have a value set, false otherwise.
192   bool is_set_ = false;
193 #ifndef NDEBUG
194   // Has this latch ever had waiters.
195   bool has_had_waiters_ = false;
196 #endif
197   IntraActivityWaiter waiter_;
198 };
199 
200 template <typename T>
201 using LatchWaitPromise = decltype(std::declval<Latch<T>>().Wait());
202 
203 // A Latch that can have its value observed by outside threads, but only waited
204 // upon from inside a single activity.
205 template <typename T>
206 class ExternallyObservableLatch;
207 
208 template <>
209 class ExternallyObservableLatch<void> {
210  public:
211   ExternallyObservableLatch() = default;
212   ExternallyObservableLatch(const ExternallyObservableLatch&) = delete;
213   ExternallyObservableLatch& operator=(const ExternallyObservableLatch&) =
214       delete;
215 
216   // Produce a promise to wait for this latch.
Wait()217   auto Wait() {
218     return [this]() -> Poll<Empty> {
219       GRPC_TRACE_LOG(promise_primitives, INFO)
220           << DebugTag() << "PollWait " << StateString();
221       if (IsSet()) {
222         return Empty{};
223       } else {
224         return waiter_.pending();
225       }
226     };
227   }
228 
229   // Set the latch.
Set()230   void Set() {
231     GRPC_TRACE_LOG(promise_primitives, INFO)
232         << DebugTag() << "Set " << StateString();
233     is_set_.store(true, std::memory_order_relaxed);
234     waiter_.Wake();
235   }
236 
IsSet()237   bool IsSet() const { return is_set_.load(std::memory_order_relaxed); }
238 
Reset()239   void Reset() {
240     GRPC_TRACE_LOG(promise_primitives, INFO)
241         << DebugTag() << "Reset " << StateString();
242     is_set_.store(false, std::memory_order_relaxed);
243   }
244 
245  private:
DebugTag()246   std::string DebugTag() {
247     return absl::StrCat(GetContext<Activity>()->DebugTag(), " LATCH(void)[0x",
248                         reinterpret_cast<uintptr_t>(this), "]: ");
249   }
250 
StateString()251   std::string StateString() {
252     return absl::StrCat(
253         "is_set:", is_set_.load(std::memory_order_relaxed) ? "true" : "false",
254         " waiter:", waiter_.DebugString());
255   }
256 
257   // True if we have a value set, false otherwise.
258   std::atomic<bool> is_set_{false};
259   IntraActivityWaiter waiter_;
260 };
261 
262 }  // namespace grpc_core
263 
264 #endif  // GRPC_SRC_CORE_LIB_PROMISE_LATCH_H
265