1 // Copyright 2021 gRPC authors. 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 #ifndef GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_LATCH_H 16 #define GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_LATCH_H 17 18 #include <grpc/support/port_platform.h> 19 #include <stdint.h> 20 21 #include <string> 22 23 #include "absl/base/thread_annotations.h" 24 #include "absl/log/log.h" 25 #include "absl/strings/str_cat.h" 26 #include "src/core/lib/debug/trace.h" 27 #include "src/core/lib/promise/activity.h" 28 #include "src/core/lib/promise/poll.h" 29 #include "src/core/lib/promise/wait_set.h" 30 #include "src/core/util/sync.h" 31 32 namespace grpc_core { 33 34 // A latch providing true cross activity wakeups 35 template <typename T> 36 class InterActivityLatch { 37 public: 38 InterActivityLatch() = default; 39 InterActivityLatch(const InterActivityLatch&) = delete; 40 InterActivityLatch& operator=(const InterActivityLatch&) = delete; 41 42 // Produce a promise to wait for this latch. Wait()43 auto Wait() { 44 return [this]() -> Poll<T> { 45 MutexLock lock(&mu_); 46 GRPC_TRACE_LOG(promise_primitives, INFO) 47 << DebugTag() << "PollWait " << StateString(); 48 if (is_set_) { 49 return std::move(value_); 50 } else { 51 return waiters_.AddPending( 52 GetContext<Activity>()->MakeNonOwningWaker()); 53 } 54 }; 55 } 56 57 // Set the latch. Set(T value)58 void Set(T value) { 59 MutexLock lock(&mu_); 60 GRPC_TRACE_LOG(promise_primitives, INFO) 61 << DebugTag() << "Set " << StateString(); 62 is_set_ = true; 63 value_ = std::move(value); 64 waiters_.WakeupAsync(); 65 } 66 IsSet()67 bool IsSet() const ABSL_LOCKS_EXCLUDED(mu_) { 68 MutexLock lock(&mu_); 69 return is_set_; 70 } 71 72 private: DebugTag()73 std::string DebugTag() { 74 return absl::StrCat(GetContext<Activity>()->DebugTag(), 75 " INTER_ACTIVITY_LATCH[0x", 76 reinterpret_cast<uintptr_t>(this), "]: "); 77 } 78 StateString()79 std::string StateString() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { 80 return absl::StrCat("is_set:", is_set_); 81 } 82 83 mutable Mutex mu_; 84 // True if we have a value set, false otherwise. 85 bool is_set_ ABSL_GUARDED_BY(mu_) = false; 86 WaitSet waiters_ ABSL_GUARDED_BY(mu_); 87 T value_ ABSL_GUARDED_BY(mu_); 88 }; 89 90 template <> 91 class InterActivityLatch<void> { 92 public: 93 InterActivityLatch() = default; 94 InterActivityLatch(const InterActivityLatch&) = delete; 95 InterActivityLatch& operator=(const InterActivityLatch&) = delete; 96 97 // Produce a promise to wait for this latch. Wait()98 auto Wait() { 99 return [this]() -> Poll<Empty> { 100 MutexLock lock(&mu_); 101 GRPC_TRACE_LOG(promise_primitives, INFO) 102 << DebugTag() << "PollWait " << StateString(); 103 if (is_set_) { 104 return Empty{}; 105 } else { 106 return waiters_.AddPending( 107 GetContext<Activity>()->MakeNonOwningWaker()); 108 } 109 }; 110 } 111 112 // Set the latch. Set()113 void Set() { 114 MutexLock lock(&mu_); 115 GRPC_TRACE_LOG(promise_primitives, INFO) 116 << DebugTag() << "Set " << StateString(); 117 is_set_ = true; 118 waiters_.WakeupAsync(); 119 } 120 IsSet()121 bool IsSet() const ABSL_LOCKS_EXCLUDED(mu_) { 122 MutexLock lock(&mu_); 123 return is_set_; 124 } 125 126 private: DebugTag()127 std::string DebugTag() { 128 return absl::StrCat( 129 HasContext<Activity>() ? GetContext<Activity>()->DebugTag() 130 : "NO_ACTIVITY:", 131 " INTER_ACTIVITY_LATCH[0x", reinterpret_cast<uintptr_t>(this), "]: "); 132 } 133 StateString()134 std::string StateString() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_) { 135 return absl::StrCat("is_set:", is_set_); 136 } 137 138 mutable Mutex mu_; 139 // True if we have a value set, false otherwise. 140 bool is_set_ ABSL_GUARDED_BY(mu_) = false; 141 WaitSet waiters_ ABSL_GUARDED_BY(mu_); 142 }; 143 144 } // namespace grpc_core 145 146 #endif // GRPC_SRC_CORE_LIB_PROMISE_INTER_ACTIVITY_LATCH_H 147