• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // Copyright 2021 gRPC authors.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include <grpc/support/port_platform.h>
16 
17 #include "src/core/lib/promise/activity.h"
18 
19 #include <stddef.h>
20 
21 #include <vector>
22 
23 #include "absl/strings/str_cat.h"
24 #include "absl/strings/str_format.h"
25 #include "absl/strings/str_join.h"
26 
27 #include "src/core/lib/gprpp/atomic_utils.h"
28 
29 namespace grpc_core {
30 
31 ///////////////////////////////////////////////////////////////////////////////
32 // GLOBALS
33 
34 thread_local Activity* Activity::g_current_activity_{nullptr};
35 
36 namespace promise_detail {
37 
38 ///////////////////////////////////////////////////////////////////////////////
39 // HELPER TYPES
40 
ActivityDebugTag(WakeupMask) const41 std::string Unwakeable::ActivityDebugTag(WakeupMask) const {
42   return "<unknown>";
43 }
44 
45 // Weak handle to an Activity.
46 // Handle can persist while Activity goes away.
47 class FreestandingActivity::Handle final : public Wakeable {
48  public:
Handle(FreestandingActivity * activity)49   explicit Handle(FreestandingActivity* activity) : activity_(activity) {}
50 
51   // Ref the Handle (not the activity).
Ref()52   void Ref() { refs_.fetch_add(1, std::memory_order_relaxed); }
53 
54   // Activity is going away... drop its reference and sever the connection back.
DropActivity()55   void DropActivity() ABSL_LOCKS_EXCLUDED(mu_) {
56     mu_.Lock();
57     GPR_ASSERT(activity_ != nullptr);
58     activity_ = nullptr;
59     mu_.Unlock();
60     Unref();
61   }
62 
63   // Activity needs to wake up (if it still exists!) - wake it up, and drop the
64   // ref that was kept for this handle.
Wakeup(WakeupMask)65   void Wakeup(WakeupMask) override ABSL_LOCKS_EXCLUDED(mu_) {
66     mu_.Lock();
67     // Note that activity refcount can drop to zero, but we could win the lock
68     // against DropActivity, so we need to only increase activities refcount if
69     // it is non-zero.
70     if (activity_ && activity_->RefIfNonzero()) {
71       FreestandingActivity* activity = activity_;
72       mu_.Unlock();
73       // Activity still exists and we have a reference: wake it up, which will
74       // drop the ref.
75       activity->Wakeup(0);
76     } else {
77       // Could not get the activity - it's either gone or going. No need to wake
78       // it up!
79       mu_.Unlock();
80     }
81     // Drop the ref to the handle (we have one ref = one wakeup semantics).
82     Unref();
83   }
84 
WakeupAsync(WakeupMask)85   void WakeupAsync(WakeupMask) override ABSL_LOCKS_EXCLUDED(mu_) {
86     mu_.Lock();
87     // Note that activity refcount can drop to zero, but we could win the lock
88     // against DropActivity, so we need to only increase activities refcount if
89     // it is non-zero.
90     if (activity_ && activity_->RefIfNonzero()) {
91       FreestandingActivity* activity = activity_;
92       mu_.Unlock();
93       // Activity still exists and we have a reference: wake it up, which will
94       // drop the ref.
95       activity->WakeupAsync(0);
96     } else {
97       // Could not get the activity - it's either gone or going. No need to wake
98       // it up!
99       mu_.Unlock();
100     }
101     // Drop the ref to the handle (we have one ref = one wakeup semantics).
102     Unref();
103   }
104 
Drop(WakeupMask)105   void Drop(WakeupMask) override { Unref(); }
106 
ActivityDebugTag(WakeupMask) const107   std::string ActivityDebugTag(WakeupMask) const override {
108     MutexLock lock(&mu_);
109     return activity_ == nullptr ? "<unknown>" : activity_->DebugTag();
110   }
111 
112  private:
113   // Unref the Handle (not the activity).
Unref()114   void Unref() {
115     if (1 == refs_.fetch_sub(1, std::memory_order_acq_rel)) {
116       delete this;
117     }
118   }
119 
120   // Two initial refs: one for the waiter that caused instantiation, one for the
121   // activity.
122   std::atomic<size_t> refs_{2};
123   mutable Mutex mu_ ABSL_ACQUIRED_AFTER(activity_->mu_);
124   FreestandingActivity* activity_ ABSL_GUARDED_BY(mu_);
125 };
126 
127 ///////////////////////////////////////////////////////////////////////////////
128 // ACTIVITY IMPLEMENTATION
129 
RefIfNonzero()130 bool FreestandingActivity::RefIfNonzero() { return IncrementIfNonzero(&refs_); }
131 
RefHandle()132 FreestandingActivity::Handle* FreestandingActivity::RefHandle() {
133   if (handle_ == nullptr) {
134     // No handle created yet - construct it and return it.
135     handle_ = new Handle(this);
136     return handle_;
137   } else {
138     // Already had to create a handle, ref & return it.
139     handle_->Ref();
140     return handle_;
141   }
142 }
143 
DropHandle()144 void FreestandingActivity::DropHandle() {
145   handle_->DropActivity();
146   handle_ = nullptr;
147 }
148 
MakeNonOwningWaker()149 Waker FreestandingActivity::MakeNonOwningWaker() {
150   mu_.AssertHeld();
151   return Waker(RefHandle(), 0);
152 }
153 
154 }  // namespace promise_detail
155 
DebugTag() const156 std::string Activity::DebugTag() const {
157   return absl::StrFormat("ACTIVITY[%p]", this);
158 }
159 
160 ///////////////////////////////////////////////////////////////////////////////
161 // INTRA ACTIVITY WAKER IMPLEMENTATION
162 
DebugString() const163 std::string IntraActivityWaiter::DebugString() const {
164   std::vector<int> bits;
165   for (size_t i = 0; i < 8 * sizeof(WakeupMask); i++) {
166     if (wakeups_ & (1 << i)) bits.push_back(i);
167   }
168   return absl::StrCat("{", absl::StrJoin(bits, ","), "}");
169 }
170 
171 }  // namespace grpc_core
172