• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  *
3  * Copyright 2017 gRPC authors.
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  *     http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  */
18 
19 #include <grpc/support/port_platform.h>
20 
21 #include "src/core/lib/iomgr/call_combiner.h"
22 
23 #include <inttypes.h>
24 
25 #include <grpc/support/log.h>
26 #include "src/core/lib/debug/stats.h"
27 #include "src/core/lib/profiling/timers.h"
28 
29 namespace grpc_core {
30 
// Trace flag registered under the name "call_combiner". Every gpr_log() call
// in this file is guarded by GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace).
// NOTE(review): the `false` argument is presumably the default-enabled state;
// confirm against the DebugOnlyTraceFlag constructor.
31 DebugOnlyTraceFlag grpc_call_combiner_trace(false, "call_combiner");
32 
33 namespace {
34 
DecodeCancelStateError(gpr_atm cancel_state)35 grpc_error_handle DecodeCancelStateError(gpr_atm cancel_state) {
36   if (cancel_state & 1) {
37     return reinterpret_cast<grpc_error_handle>(cancel_state &
38                                                ~static_cast<gpr_atm>(1));
39   }
40   return GRPC_ERROR_NONE;
41 }
42 
EncodeCancelStateError(grpc_error_handle error)43 gpr_atm EncodeCancelStateError(grpc_error_handle error) {
44   return static_cast<gpr_atm>(1) | reinterpret_cast<gpr_atm>(error);
45 }
46 
47 }  // namespace
48 
CallCombiner()49 CallCombiner::CallCombiner() {
50   gpr_atm_no_barrier_store(&cancel_state_, 0);
51   gpr_atm_no_barrier_store(&size_, 0);
52 #ifdef GRPC_TSAN_ENABLED
53   GRPC_CLOSURE_INIT(&tsan_closure_, TsanClosure, this,
54                     grpc_schedule_on_exec_ctx);
55 #endif
56 }
57 
~CallCombiner()58 CallCombiner::~CallCombiner() {
59   GRPC_ERROR_UNREF(DecodeCancelStateError(cancel_state_));
60 }
61 
#ifdef GRPC_TSAN_ENABLED
// Wrapper closure used under TSAN: runs original_closure_ while annotating
// tsan_lock_ as held, so that TSAN can report concurrent execution of
// call-combiner callbacks.
//
// `arg` is the CallCombiner; `error` is forwarded (with an extra ref) to the
// wrapped closure.
void CallCombiner::TsanClosure(void* arg, grpc_error_handle error) {
  auto* combiner = static_cast<CallCombiner*>(arg);
  // The lock is ref-counted. Try to flip it from "free" to "taken": if it is
  // already taken we do nothing further with it, otherwise we mark it as
  // locked. If two different threads attempt this, only one of them manages
  // to mark the lock as acquired while both still run their callbacks. In
  // such cases (which should never happen for call_combiner), TSAN will
  // correctly produce an error.
  //
  // TODO(soheil): This only covers the callbacks scheduled by
  //               CallCombiner::Start() and CallCombiner::Stop().
  //               If in the future, a callback gets scheduled using other
  //               mechanisms, we will need to add APIs to externally lock
  //               call combiners.
  RefCountedPtr<TsanLock> held_lock = combiner->tsan_lock_;
  bool expected_free = false;
  const bool acquired =
      held_lock->taken.compare_exchange_strong(expected_free, true);
  if (acquired) {
    TSAN_ANNOTATE_RWLOCK_ACQUIRED(&held_lock->taken, true);
  } else {
    // Someone else holds it; drop our ref so we skip the release step below.
    held_lock.reset();
  }
  grpc_core::Closure::Run(DEBUG_LOCATION, combiner->original_closure_,
                          GRPC_ERROR_REF(error));
  if (held_lock != nullptr) {
    TSAN_ANNOTATE_RWLOCK_RELEASED(&held_lock->taken, true);
    bool expected_taken = true;
    GPR_ASSERT(
        held_lock->taken.compare_exchange_strong(expected_taken, false));
  }
}
#endif
93 
ScheduleClosure(grpc_closure * closure,grpc_error_handle error)94 void CallCombiner::ScheduleClosure(grpc_closure* closure,
95                                    grpc_error_handle error) {
96 #ifdef GRPC_TSAN_ENABLED
97   original_closure_ = closure;
98   ExecCtx::Run(DEBUG_LOCATION, &tsan_closure_, error);
99 #else
100   ExecCtx::Run(DEBUG_LOCATION, closure, error);
101 #endif
102 }
103 
// Helper macros for the Start()/Stop() definitions below.
//
// When NDEBUG is not defined, the functions take leading `file`/`line`
// parameters (DEBUG_ARGS) and their trace messages are prefixed with
// "file:line: " (DEBUG_FMT_STR / DEBUG_FMT_ARGS). When NDEBUG is defined, all
// three macros expand to nothing.
#ifdef NDEBUG
#define DEBUG_ARGS
#define DEBUG_FMT_STR
#define DEBUG_FMT_ARGS
#else
#define DEBUG_ARGS const char *file, int line,
#define DEBUG_FMT_STR "%s:%d: "
#define DEBUG_FMT_ARGS , file, line
#endif
113 
Start(grpc_closure * closure,grpc_error_handle error,DEBUG_ARGS const char * reason)114 void CallCombiner::Start(grpc_closure* closure, grpc_error_handle error,
115                          DEBUG_ARGS const char* reason) {
116   GPR_TIMER_SCOPE("CallCombiner::Start", 0);
117   if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
118     gpr_log(GPR_INFO,
119             "==> CallCombiner::Start() [%p] closure=%p [" DEBUG_FMT_STR
120             "%s] error=%s",
121             this, closure DEBUG_FMT_ARGS, reason,
122             grpc_error_std_string(error).c_str());
123   }
124   size_t prev_size =
125       static_cast<size_t>(gpr_atm_full_fetch_add(&size_, (gpr_atm)1));
126   if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
127     gpr_log(GPR_INFO, "  size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
128             prev_size + 1);
129   }
130   GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS();
131   if (prev_size == 0) {
132     GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED();
133     GPR_TIMER_MARK("call_combiner_initiate", 0);
134     if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
135       gpr_log(GPR_INFO, "  EXECUTING IMMEDIATELY");
136     }
137     // Queue was empty, so execute this closure immediately.
138     ScheduleClosure(closure, error);
139   } else {
140     if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
141       gpr_log(GPR_INFO, "  QUEUING");
142     }
143     // Queue was not empty, so add closure to queue.
144     closure->error_data.error = error;
145     queue_.Push(
146         reinterpret_cast<MultiProducerSingleConsumerQueue::Node*>(closure));
147   }
148 }
149 
Stop(DEBUG_ARGS const char * reason)150 void CallCombiner::Stop(DEBUG_ARGS const char* reason) {
151   GPR_TIMER_SCOPE("CallCombiner::Stop", 0);
152   if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
153     gpr_log(GPR_INFO, "==> CallCombiner::Stop() [%p] [" DEBUG_FMT_STR "%s]",
154             this DEBUG_FMT_ARGS, reason);
155   }
156   size_t prev_size =
157       static_cast<size_t>(gpr_atm_full_fetch_add(&size_, (gpr_atm)-1));
158   if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
159     gpr_log(GPR_INFO, "  size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
160             prev_size - 1);
161   }
162   GPR_ASSERT(prev_size >= 1);
163   if (prev_size > 1) {
164     while (true) {
165       if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
166         gpr_log(GPR_INFO, "  checking queue");
167       }
168       bool empty;
169       grpc_closure* closure =
170           reinterpret_cast<grpc_closure*>(queue_.PopAndCheckEnd(&empty));
171       if (closure == nullptr) {
172         // This can happen either due to a race condition within the mpscq
173         // code or because of a race with Start().
174         if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
175           gpr_log(GPR_INFO, "  queue returned no result; checking again");
176         }
177         continue;
178       }
179       if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
180         gpr_log(GPR_INFO, "  EXECUTING FROM QUEUE: closure=%p error=%s",
181                 closure,
182                 grpc_error_std_string(closure->error_data.error).c_str());
183       }
184       ScheduleClosure(closure, closure->error_data.error);
185       break;
186     }
187   } else if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
188     gpr_log(GPR_INFO, "  queue empty");
189   }
190 }
191 
SetNotifyOnCancel(grpc_closure * closure)192 void CallCombiner::SetNotifyOnCancel(grpc_closure* closure) {
193   GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL();
194   while (true) {
195     // Decode original state.
196     gpr_atm original_state = gpr_atm_acq_load(&cancel_state_);
197     grpc_error_handle original_error = DecodeCancelStateError(original_state);
198     // If error is set, invoke the cancellation closure immediately.
199     // Otherwise, store the new closure.
200     if (original_error != GRPC_ERROR_NONE) {
201       if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
202         gpr_log(GPR_INFO,
203                 "call_combiner=%p: scheduling notify_on_cancel callback=%p "
204                 "for pre-existing cancellation",
205                 this, closure);
206       }
207       ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_REF(original_error));
208       break;
209     } else {
210       if (gpr_atm_full_cas(&cancel_state_, original_state,
211                            reinterpret_cast<gpr_atm>(closure))) {
212         if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
213           gpr_log(GPR_INFO, "call_combiner=%p: setting notify_on_cancel=%p",
214                   this, closure);
215         }
216         // If we replaced an earlier closure, invoke the original
217         // closure with GRPC_ERROR_NONE.  This allows callers to clean
218         // up any resources they may be holding for the callback.
219         if (original_state != 0) {
220           closure = reinterpret_cast<grpc_closure*>(original_state);
221           if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
222             gpr_log(GPR_INFO,
223                     "call_combiner=%p: scheduling old cancel callback=%p", this,
224                     closure);
225           }
226           ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
227         }
228         break;
229       }
230     }
231     // cas failed, try again.
232   }
233 }
234 
Cancel(grpc_error_handle error)235 void CallCombiner::Cancel(grpc_error_handle error) {
236   GRPC_STATS_INC_CALL_COMBINER_CANCELLED();
237   while (true) {
238     gpr_atm original_state = gpr_atm_acq_load(&cancel_state_);
239     grpc_error_handle original_error = DecodeCancelStateError(original_state);
240     if (original_error != GRPC_ERROR_NONE) {
241       GRPC_ERROR_UNREF(error);
242       break;
243     }
244     if (gpr_atm_full_cas(&cancel_state_, original_state,
245                          EncodeCancelStateError(error))) {
246       if (original_state != 0) {
247         grpc_closure* notify_on_cancel =
248             reinterpret_cast<grpc_closure*>(original_state);
249         if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
250           gpr_log(GPR_INFO,
251                   "call_combiner=%p: scheduling notify_on_cancel callback=%p",
252                   this, notify_on_cancel);
253         }
254         ExecCtx::Run(DEBUG_LOCATION, notify_on_cancel, GRPC_ERROR_REF(error));
255       }
256       break;
257     }
258     // cas failed, try again.
259   }
260 }
261 
262 }  // namespace grpc_core
263