/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/call_combiner.h"

#include <inttypes.h>

#include <grpc/support/log.h>

#include "src/core/lib/debug/stats.h"
#include "src/core/lib/profiling/timers.h"

namespace grpc_core {

DebugOnlyTraceFlag grpc_call_combiner_trace(false, "call_combiner");

namespace {

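// cancel_state_ holds either a grpc_closure* registered via
// SetNotifyOnCancel() (low bit clear) or a grpc_error_handle recording a
// cancellation (low bit set). Tagging bit 0 relies on both pointer types
// being at least 2-byte aligned, so that bit is always free.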
grpc_error_handle DecodeCancelStateError(gpr_atm cancel_state) {
  if (cancel_state & 1) {
    return reinterpret_cast<grpc_error_handle>(cancel_state &
                                               ~static_cast<gpr_atm>(1));
  }
  return GRPC_ERROR_NONE;
}

gpr_atm EncodeCancelStateError(grpc_error_handle error) {
  return static_cast<gpr_atm>(1) | reinterpret_cast<gpr_atm>(error);
}

}  // namespace

CallCombiner::CallCombiner() {
  gpr_atm_no_barrier_store(&cancel_state_, 0);
  gpr_atm_no_barrier_store(&size_, 0);
#ifdef GRPC_TSAN_ENABLED
  GRPC_CLOSURE_INIT(&tsan_closure_, TsanClosure, this,
                    grpc_schedule_on_exec_ctx);
#endif
}

CallCombiner::~CallCombiner() {
  GRPC_ERROR_UNREF(DecodeCancelStateError(cancel_state_));
}

#ifdef GRPC_TSAN_ENABLED
void CallCombiner::TsanClosure(void* arg, grpc_error_handle error) {
  CallCombiner* self = static_cast<CallCombiner*>(arg);
  // We ref-count the lock and check whether it's already taken.
  // If it was taken, we do nothing. Otherwise, we mark it as
  // locked. Note that if two different threads try to do this, only one of
  // them will be able to mark the lock as acquired, while they both run their
  // callbacks. In such cases (which should never happen for call_combiner),
  // TSAN will correctly produce an error.
  //
  // TODO(soheil): This only covers the callbacks scheduled by
  // CallCombiner::Start() and CallCombiner::Stop().
  // If, in the future, a callback gets scheduled using other
  // mechanisms, we will need to add APIs to externally lock
  // call combiners.
  RefCountedPtr<TsanLock> lock = self->tsan_lock_;
  bool prev = false;
  if (lock->taken.compare_exchange_strong(prev, true)) {
    TSAN_ANNOTATE_RWLOCK_ACQUIRED(&lock->taken, true);
  } else {
    lock.reset();
  }
  grpc_core::Closure::Run(DEBUG_LOCATION, self->original_closure_,
                          GRPC_ERROR_REF(error));
  if (lock != nullptr) {
    TSAN_ANNOTATE_RWLOCK_RELEASED(&lock->taken, true);
    bool prev = true;
    GPR_ASSERT(lock->taken.compare_exchange_strong(prev, false));
  }
}
#endif

void CallCombiner::ScheduleClosure(grpc_closure* closure,
                                   grpc_error_handle error) {
#ifdef GRPC_TSAN_ENABLED
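  // Under TSAN, route the closure through tsan_closure_ so that the lock
  // annotations in TsanClosure() wrap the user's callback.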
  original_closure_ = closure;
  ExecCtx::Run(DEBUG_LOCATION, &tsan_closure_, error);
#else
  ExecCtx::Run(DEBUG_LOCATION, closure, error);
#endif
}

#ifndef NDEBUG
#define DEBUG_ARGS const char *file, int line,
#define DEBUG_FMT_STR "%s:%d: "
#define DEBUG_FMT_ARGS , file, line
#else
#define DEBUG_ARGS
#define DEBUG_FMT_STR
#define DEBUG_FMT_ARGS
#endif

void CallCombiner::Start(grpc_closure* closure, grpc_error_handle error,
                         DEBUG_ARGS const char* reason) {
  GPR_TIMER_SCOPE("CallCombiner::Start", 0);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
    gpr_log(GPR_INFO,
            "==> CallCombiner::Start() [%p] closure=%p [" DEBUG_FMT_STR
            "%s] error=%s",
            this, closure DEBUG_FMT_ARGS, reason,
            grpc_error_std_string(error).c_str());
  }
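  // size_ counts the closure currently holding the combiner plus any queued
  // closures, so the previous value tells us whether the combiner was idle.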
  size_t prev_size =
      static_cast<size_t>(gpr_atm_full_fetch_add(&size_, (gpr_atm)1));
  if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
    gpr_log(GPR_INFO, "  size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
            prev_size + 1);
  }
  GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS();
  if (prev_size == 0) {
    GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED();
    GPR_TIMER_MARK("call_combiner_initiate", 0);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
      gpr_log(GPR_INFO, "  EXECUTING IMMEDIATELY");
    }
    // Queue was empty, so execute this closure immediately.
    ScheduleClosure(closure, error);
  } else {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
      gpr_log(GPR_INFO, "  QUEUING");
    }
    // Queue was not empty, so add closure to queue.
    closure->error_data.error = error;
    queue_.Push(
        reinterpret_cast<MultiProducerSingleConsumerQueue::Node*>(closure));
  }
}

void CallCombiner::Stop(DEBUG_ARGS const char* reason) {
  GPR_TIMER_SCOPE("CallCombiner::Stop", 0);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
    gpr_log(GPR_INFO, "==> CallCombiner::Stop() [%p] [" DEBUG_FMT_STR "%s]",
            this DEBUG_FMT_ARGS, reason);
  }
  size_t prev_size =
      static_cast<size_t>(gpr_atm_full_fetch_add(&size_, (gpr_atm)-1));
  if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
    gpr_log(GPR_INFO, "  size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
            prev_size - 1);
  }
  GPR_ASSERT(prev_size >= 1);
  if (prev_size > 1) {
    while (true) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
        gpr_log(GPR_INFO, "  checking queue");
      }
      bool empty;
      grpc_closure* closure =
          reinterpret_cast<grpc_closure*>(queue_.PopAndCheckEnd(&empty));
      if (closure == nullptr) {
        // This can happen either due to a race condition within the mpscq
        // code or because of a race with Start().
        if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
          gpr_log(GPR_INFO, "  queue returned no result; checking again");
        }
        continue;
      }
      if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
        gpr_log(GPR_INFO, "  EXECUTING FROM QUEUE: closure=%p error=%s",
                closure,
                grpc_error_std_string(closure->error_data.error).c_str());
      }
      ScheduleClosure(closure, closure->error_data.error);
      break;
    }
  } else if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
    gpr_log(GPR_INFO, "  queue empty");
  }
}
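
// Illustrative usage sketch (not part of this file): callers normally go
// through the GRPC_CALL_COMBINER_START/STOP macros from call_combiner.h,
// which supply the file/line debug arguments in debug builds. A typical
// pattern, using hypothetical names "calld" and "on_do_work":
//
//   GRPC_CALL_COMBINER_START(calld->call_combiner, &calld->on_do_work,
//                            GRPC_ERROR_NONE, "starting work");
//   // ...later, from within the closure that now holds the combiner:
//   GRPC_CALL_COMBINER_STOP(calld->call_combiner, "work complete");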

void CallCombiner::SetNotifyOnCancel(grpc_closure* closure) {
  GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL();
  while (true) {
    // Decode original state.
    gpr_atm original_state = gpr_atm_acq_load(&cancel_state_);
    grpc_error_handle original_error = DecodeCancelStateError(original_state);
    // If error is set, invoke the cancellation closure immediately.
    // Otherwise, store the new closure.
    if (original_error != GRPC_ERROR_NONE) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
        gpr_log(GPR_INFO,
                "call_combiner=%p: scheduling notify_on_cancel callback=%p "
                "for pre-existing cancellation",
                this, closure);
      }
      ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_REF(original_error));
      break;
    } else {
      if (gpr_atm_full_cas(&cancel_state_, original_state,
                           reinterpret_cast<gpr_atm>(closure))) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
          gpr_log(GPR_INFO, "call_combiner=%p: setting notify_on_cancel=%p",
                  this, closure);
        }
        // If we replaced an earlier closure, invoke the original
        // closure with GRPC_ERROR_NONE. This allows callers to clean
        // up any resources they may be holding for the callback.
        if (original_state != 0) {
          closure = reinterpret_cast<grpc_closure*>(original_state);
          if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
            gpr_log(GPR_INFO,
                    "call_combiner=%p: scheduling old cancel callback=%p",
                    this, closure);
          }
          ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
        }
        break;
      }
    }
    // cas failed, try again.
  }
}
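
// Note: passing a null closure deregisters the previously-set closure:
// reinterpret_cast<gpr_atm>(nullptr) stores 0 into cancel_state_, and the
// branch above then runs the old closure with GRPC_ERROR_NONE.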

void CallCombiner::Cancel(grpc_error_handle error) {
  GRPC_STATS_INC_CALL_COMBINER_CANCELLED();
  while (true) {
    gpr_atm original_state = gpr_atm_acq_load(&cancel_state_);
    grpc_error_handle original_error = DecodeCancelStateError(original_state);
    if (original_error != GRPC_ERROR_NONE) {
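      // Cancel() takes ownership of the caller's ref on error; a
      // cancellation was already recorded, so release that ref here.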
      GRPC_ERROR_UNREF(error);
      break;
    }
    if (gpr_atm_full_cas(&cancel_state_, original_state,
                         EncodeCancelStateError(error))) {
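      // The CAS succeeded, so cancel_state_ now owns the error ref (released
      // in the destructor). If a notify_on_cancel closure was registered,
      // schedule it with a fresh ref.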
      if (original_state != 0) {
        grpc_closure* notify_on_cancel =
            reinterpret_cast<grpc_closure*>(original_state);
        if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
          gpr_log(GPR_INFO,
                  "call_combiner=%p: scheduling notify_on_cancel callback=%p",
                  this, notify_on_cancel);
        }
        ExecCtx::Run(DEBUG_LOCATION, notify_on_cancel, GRPC_ERROR_REF(error));
      }
      break;
    }
    // cas failed, try again.
  }
}

}  // namespace grpc_core