• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_LATCH_H
16 #define GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_LATCH_H
17 
18 #include <grpc/support/port_platform.h>
19 #include <stdint.h>
20 
21 #include <string>
22 
23 #include "absl/base/thread_annotations.h"
24 #include "absl/log/log.h"
25 #include "absl/strings/str_cat.h"
26 #include "src/core/lib/debug/trace.h"
27 #include "src/core/lib/promise/activity.h"
28 #include "src/core/lib/promise/poll.h"
29 #include "src/core/lib/promise/wait_set.h"
30 #include "src/core/util/sync.h"
31 
32 namespace grpc_core {
33 
34 // A latch providing true cross activity wakeups
35 template <typename T>
36 class InterActivityLatch {
37  public:
38   InterActivityLatch() = default;
39   InterActivityLatch(const InterActivityLatch&) = delete;
40   InterActivityLatch& operator=(const InterActivityLatch&) = delete;
41 
42   // Produce a promise to wait for this latch.
Wait()43   auto Wait() {
44     return [this]() -> Poll<T> {
45       MutexLock lock(&mu_);
46       GRPC_TRACE_LOG(promise_primitives, INFO)
47           << DebugTag() << "PollWait " << StateString();
48       if (is_set_) {
49         return std::move(value_);
50       } else {
51         return waiters_.AddPending(
52             GetContext<Activity>()->MakeNonOwningWaker());
53       }
54     };
55   }
56 
57   // Set the latch.
Set(T value)58   void Set(T value) {
59     MutexLock lock(&mu_);
60     GRPC_TRACE_LOG(promise_primitives, INFO)
61         << DebugTag() << "Set " << StateString();
62     is_set_ = true;
63     value_ = std::move(value);
64     waiters_.WakeupAsync();
65   }
66 
IsSet()67   bool IsSet() const ABSL_LOCKS_EXCLUDED(mu_) {
68     MutexLock lock(&mu_);
69     return is_set_;
70   }
71 
72  private:
DebugTag()73   std::string DebugTag() {
74     return absl::StrCat(GetContext<Activity>()->DebugTag(),
75                         " INTER_ACTIVITY_LATCH[0x",
76                         reinterpret_cast<uintptr_t>(this), "]: ");
77   }
78 
StateString()79   std::string StateString() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
80     return absl::StrCat("is_set:", is_set_);
81   }
82 
83   mutable Mutex mu_;
84   // True if we have a value set, false otherwise.
85   bool is_set_ ABSL_GUARDED_BY(mu_) = false;
86   WaitSet waiters_ ABSL_GUARDED_BY(mu_);
87   T value_ ABSL_GUARDED_BY(mu_);
88 };
89 
90 template <>
91 class InterActivityLatch<void> {
92  public:
93   InterActivityLatch() = default;
94   InterActivityLatch(const InterActivityLatch&) = delete;
95   InterActivityLatch& operator=(const InterActivityLatch&) = delete;
96 
97   // Produce a promise to wait for this latch.
Wait()98   auto Wait() {
99     return [this]() -> Poll<Empty> {
100       MutexLock lock(&mu_);
101       GRPC_TRACE_LOG(promise_primitives, INFO)
102           << DebugTag() << "PollWait " << StateString();
103       if (is_set_) {
104         return Empty{};
105       } else {
106         return waiters_.AddPending(
107             GetContext<Activity>()->MakeNonOwningWaker());
108       }
109     };
110   }
111 
112   // Set the latch.
Set()113   void Set() {
114     MutexLock lock(&mu_);
115     GRPC_TRACE_LOG(promise_primitives, INFO)
116         << DebugTag() << "Set " << StateString();
117     is_set_ = true;
118     waiters_.WakeupAsync();
119   }
120 
IsSet()121   bool IsSet() const ABSL_LOCKS_EXCLUDED(mu_) {
122     MutexLock lock(&mu_);
123     return is_set_;
124   }
125 
126  private:
DebugTag()127   std::string DebugTag() {
128     return absl::StrCat(
129         HasContext<Activity>() ? GetContext<Activity>()->DebugTag()
130                                : "NO_ACTIVITY:",
131         " INTER_ACTIVITY_LATCH[0x", reinterpret_cast<uintptr_t>(this), "]: ");
132   }
133 
StateString()134   std::string StateString() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
135     return absl::StrCat("is_set:", is_set_);
136   }
137 
138   mutable Mutex mu_;
139   // True if we have a value set, false otherwise.
140   bool is_set_ ABSL_GUARDED_BY(mu_) = false;
141   WaitSet waiters_ ABSL_GUARDED_BY(mu_);
142 };
143 
144 }  // namespace grpc_core
145 
146 #endif  // GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_LATCH_H
147