/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/call_combiner.h"

#include <inttypes.h>

#include <grpc/support/log.h>
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/profiling/timers.h"

namespace grpc_core {

DebugOnlyTraceFlag grpc_call_combiner_trace(false, "call_combiner");

namespace {

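// A quick sketch of the cancel_state_ encoding, as implied by the two
// helpers below (this summary is ours, not a comment from the original
// file):
//
//   0                    -> neither cancelled nor notify_on_cancel set
//   closure pointer      -> the notify_on_cancel closure (low bit clear)
//   error pointer | 1    -> the cancellation error (low bit set as a tag)
//
// This relies on grpc_closure* and grpc_error* values being at least
// 2-byte aligned, which leaves the low bit free for tagging.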
grpc_error* DecodeCancelStateError(gpr_atm cancel_state) {
  if (cancel_state & 1) {
    return reinterpret_cast<grpc_error*>(cancel_state &
                                         ~static_cast<gpr_atm>(1));
  }
  return GRPC_ERROR_NONE;
}

gpr_atm EncodeCancelStateError(grpc_error* error) {
  return static_cast<gpr_atm>(1) | reinterpret_cast<gpr_atm>(error);
}

}  // namespace

CallCombiner::CallCombiner() {
  gpr_atm_no_barrier_store(&cancel_state_, 0);
  gpr_atm_no_barrier_store(&size_, 0);
#ifdef GRPC_TSAN_ENABLED
  GRPC_CLOSURE_INIT(&tsan_closure_, TsanClosure, this,
                    grpc_schedule_on_exec_ctx);
#endif
}

CallCombiner::~CallCombiner() {
  GRPC_ERROR_UNREF(DecodeCancelStateError(cancel_state_));
}

#ifdef GRPC_TSAN_ENABLED
void CallCombiner::TsanClosure(void* arg, grpc_error* error) {
  CallCombiner* self = static_cast<CallCombiner*>(arg);
  // We ref-count the lock, and check if it's already taken.
  // If it was taken, we should do nothing. Otherwise, we will mark it as
  // locked. Note that if two different threads try to do this, only one of
  // them will be able to mark the lock as acquired, while they both run their
  // callbacks. In such cases (which should never happen for call_combiner),
  // TSAN will correctly produce an error.
  //
  // TODO(soheil): This only covers the callbacks scheduled by
  //               CallCombiner::Start() and CallCombiner::Stop().
  //               If, in the future, a callback gets scheduled using other
  //               mechanisms, we will need to add APIs to externally lock
  //               call combiners.
  RefCountedPtr<TsanLock> lock = self->tsan_lock_;
  bool prev = false;
  if (lock->taken.compare_exchange_strong(prev, true)) {
    TSAN_ANNOTATE_RWLOCK_ACQUIRED(&lock->taken, true);
  } else {
    lock.reset();
  }
  grpc_core::Closure::Run(DEBUG_LOCATION, self->original_closure_,
                          GRPC_ERROR_REF(error));
  if (lock != nullptr) {
    TSAN_ANNOTATE_RWLOCK_RELEASED(&lock->taken, true);
    bool prev = true;
    GPR_ASSERT(lock->taken.compare_exchange_strong(prev, false));
  }
}
#endif

void CallCombiner::ScheduleClosure(grpc_closure* closure, grpc_error* error) {
#ifdef GRPC_TSAN_ENABLED
  // Under TSAN, funnel every closure through tsan_closure_ so the
  // combiner's serialization is visible to the race detector.
  original_closure_ = closure;
  ExecCtx::Run(DEBUG_LOCATION, &tsan_closure_, error);
#else
  ExecCtx::Run(DEBUG_LOCATION, closure, error);
#endif
}

#ifndef NDEBUG
#define DEBUG_ARGS const char *file, int line,
#define DEBUG_FMT_STR "%s:%d: "
#define DEBUG_FMT_ARGS , file, line
#else
#define DEBUG_ARGS
#define DEBUG_FMT_STR
#define DEBUG_FMT_ARGS
#endif
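
// Sketch of the debug-build expansion (our illustration, not original text):
//
//   void CallCombiner::Start(grpc_closure* closure, grpc_error* error,
//                            const char* file, int line, const char* reason);
//
// In debug builds the entry points gain the caller's file/line and the trace
// output is prefixed with "%s:%d: "; with NDEBUG defined, all three macros
// expand to nothing and the extra arguments compile away.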

void CallCombiner::Start(grpc_closure* closure, grpc_error* error,
                         DEBUG_ARGS const char* reason) {
  GPR_TIMER_SCOPE("CallCombiner::Start", 0);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
    gpr_log(GPR_INFO,
            "==> CallCombiner::Start() [%p] closure=%p [" DEBUG_FMT_STR
            "%s] error=%s",
            this, closure DEBUG_FMT_ARGS, reason, grpc_error_string(error));
  }
  size_t prev_size =
      static_cast<size_t>(gpr_atm_full_fetch_add(&size_, (gpr_atm)1));
  if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
    gpr_log(GPR_INFO, "  size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
            prev_size + 1);
  }
  GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS();
  if (prev_size == 0) {
    GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED();
    GPR_TIMER_MARK("call_combiner_initiate", 0);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
      gpr_log(GPR_INFO, "  EXECUTING IMMEDIATELY");
    }
    // Queue was empty, so execute this closure immediately.
    ScheduleClosure(closure, error);
  } else {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
      gpr_log(GPR_INFO, "  QUEUING");
    }
    // Queue was not empty, so add closure to queue.
    closure->error_data.error = error;
    queue_.Push(
        reinterpret_cast<MultiProducerSingleConsumerQueue::Node*>(closure));
  }
}

void CallCombiner::Stop(DEBUG_ARGS const char* reason) {
  GPR_TIMER_SCOPE("CallCombiner::Stop", 0);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
    gpr_log(GPR_INFO, "==> CallCombiner::Stop() [%p] [" DEBUG_FMT_STR "%s]",
            this DEBUG_FMT_ARGS, reason);
  }
  size_t prev_size =
      static_cast<size_t>(gpr_atm_full_fetch_add(&size_, (gpr_atm)-1));
  if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
    gpr_log(GPR_INFO, "  size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
            prev_size - 1);
  }
  GPR_ASSERT(prev_size >= 1);
  if (prev_size > 1) {
    while (true) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
        gpr_log(GPR_INFO, "  checking queue");
      }
      bool empty;
      grpc_closure* closure =
          reinterpret_cast<grpc_closure*>(queue_.PopAndCheckEnd(&empty));
      if (closure == nullptr) {
        // This can happen either due to a race condition within the mpscq
        // code or because of a race with Start().
        if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
          gpr_log(GPR_INFO, "  queue returned no result; checking again");
        }
        continue;
      }
      if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
        gpr_log(GPR_INFO, "  EXECUTING FROM QUEUE: closure=%p error=%s",
                closure, grpc_error_string(closure->error_data.error));
      }
      ScheduleClosure(closure, closure->error_data.error);
      break;
    }
  } else if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
    gpr_log(GPR_INFO, "  queue empty");
  }
}
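
// Illustrative usage (a sketch, not code from this file): callers typically
// go through the GRPC_CALL_COMBINER_START/STOP macros declared in
// call_combiner.h, which supply the file/line/reason arguments. A closure
// scheduled via Start() owns the combiner until it (or work it triggers)
// calls Stop(); `combiner` and `my_closure` below are placeholder names:
//
//   GRPC_CALL_COMBINER_START(combiner, &my_closure, GRPC_ERROR_NONE,
//                            "starting op");
//   // ... my_closure runs exclusively under the combiner ...
//   GRPC_CALL_COMBINER_STOP(combiner, "op complete");  // yields to the queue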

void CallCombiner::SetNotifyOnCancel(grpc_closure* closure) {
  GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL();
  while (true) {
    // Decode original state.
    gpr_atm original_state = gpr_atm_acq_load(&cancel_state_);
    grpc_error* original_error = DecodeCancelStateError(original_state);
    // If error is set, invoke the cancellation closure immediately.
    // Otherwise, store the new closure.
    if (original_error != GRPC_ERROR_NONE) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
        gpr_log(GPR_INFO,
                "call_combiner=%p: scheduling notify_on_cancel callback=%p "
                "for pre-existing cancellation",
                this, closure);
      }
      ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_REF(original_error));
      break;
    } else {
      if (gpr_atm_full_cas(&cancel_state_, original_state,
                           reinterpret_cast<gpr_atm>(closure))) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
          gpr_log(GPR_INFO, "call_combiner=%p: setting notify_on_cancel=%p",
                  this, closure);
        }
        // If we replaced an earlier closure, invoke the original
        // closure with GRPC_ERROR_NONE. This allows callers to clean
        // up any resources they may be holding for the callback.
        if (original_state != 0) {
          closure = reinterpret_cast<grpc_closure*>(original_state);
          if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
            gpr_log(GPR_INFO,
                    "call_combiner=%p: scheduling old cancel callback=%p", this,
                    closure);
          }
          ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
        }
        break;
      }
    }
    // The CAS failed, meaning another thread changed the state; try again.
  }
}
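
// Note (our summary, not a comment from the original file): a closure
// displaced by a newer SetNotifyOnCancel() call is run with GRPC_ERROR_NONE
// only so its owner can release resources held for the callback; it must not
// be interpreted as a cancellation. Passing nullptr appears to clear the
// notification entirely, since the CAS then stores 0.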

void CallCombiner::Cancel(grpc_error* error) {
  GRPC_STATS_INC_CALL_COMBINER_CANCELLED();
  while (true) {
    gpr_atm original_state = gpr_atm_acq_load(&cancel_state_);
    grpc_error* original_error = DecodeCancelStateError(original_state);
    if (original_error != GRPC_ERROR_NONE) {
      // Already cancelled; drop the new error.
      GRPC_ERROR_UNREF(error);
      break;
    }
    if (gpr_atm_full_cas(&cancel_state_, original_state,
                         EncodeCancelStateError(error))) {
      if (original_state != 0) {
        // A notify_on_cancel closure was set; schedule it with the error.
        grpc_closure* notify_on_cancel =
            reinterpret_cast<grpc_closure*>(original_state);
        if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
          gpr_log(GPR_INFO,
                  "call_combiner=%p: scheduling notify_on_cancel callback=%p",
                  this, notify_on_cancel);
        }
        ExecCtx::Run(DEBUG_LOCATION, notify_on_cancel, GRPC_ERROR_REF(error));
      }
      break;
    }
    // The CAS failed, meaning another thread changed the state; try again.
  }
}

}  // namespace grpc_core