/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/call_combiner.h"

#include <inttypes.h>

#include <grpc/support/log.h>
#include "src/core/lib/debug/stats.h"
#include "src/core/lib/profiling/timers.h"

namespace grpc_core {

DebugOnlyTraceFlag grpc_call_combiner_trace(false, "call_combiner");

namespace {

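// cancel_state_ holds one of three values: 0 (no cancellation and no
// notify_on_cancel closure registered), a grpc_closure* registered via
// SetNotifyOnCancel() (low bit clear), or a grpc_error* tagged by setting
// the low bit.  The helpers below encode and decode the tagged-error form;
// the tagging assumes pointer alignment leaves the low bit free.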
grpc_error* DecodeCancelStateError(gpr_atm cancel_state) {
  if (cancel_state & 1) {
    return (grpc_error*)(cancel_state & ~static_cast<gpr_atm>(1));
  }
  return GRPC_ERROR_NONE;
}

gpr_atm EncodeCancelStateError(grpc_error* error) {
  return static_cast<gpr_atm>(1) | (gpr_atm)error;
}

}  // namespace

CallCombiner::CallCombiner() {
  gpr_atm_no_barrier_store(&cancel_state_, 0);
  gpr_atm_no_barrier_store(&size_, 0);
#ifdef GRPC_TSAN_ENABLED
  GRPC_CLOSURE_INIT(&tsan_closure_, TsanClosure, this,
                    grpc_schedule_on_exec_ctx);
#endif
}

CallCombiner::~CallCombiner() {
  GRPC_ERROR_UNREF(DecodeCancelStateError(cancel_state_));
}

#ifdef GRPC_TSAN_ENABLED
void CallCombiner::TsanClosure(void* arg, grpc_error* error) {
  CallCombiner* self = static_cast<CallCombiner*>(arg);
  // We ref-count the lock and check whether it's already taken.  If it was
  // taken, we do nothing; otherwise, we mark it as locked.  Note that if two
  // different threads try to do this, only one of them will be able to mark
  // the lock as acquired, while they both run their callbacks.  In such cases
  // (which should never happen for call_combiner), TSAN will correctly
  // produce an error.
  //
  // TODO(soheil): This only covers the callbacks scheduled by
  //               CallCombiner::Start() and CallCombiner::Stop().
  //               If, in the future, a callback gets scheduled using other
  //               mechanisms, we will need to add APIs to externally lock
  //               call combiners.
  RefCountedPtr<TsanLock> lock = self->tsan_lock_;
  bool prev = false;
  if (lock->taken.compare_exchange_strong(prev, true)) {
    TSAN_ANNOTATE_RWLOCK_ACQUIRED(&lock->taken, true);
  } else {
    lock.reset();
  }
  grpc_core::Closure::Run(DEBUG_LOCATION, self->original_closure_,
                          GRPC_ERROR_REF(error));
  if (lock != nullptr) {
    TSAN_ANNOTATE_RWLOCK_RELEASED(&lock->taken, true);
    bool prev = true;
    GPR_ASSERT(lock->taken.compare_exchange_strong(prev, false));
  }
}
#endif

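// Hands a closure off to the ExecCtx for execution.  Under TSAN, the closure
// is first stashed in original_closure_ and wrapped by tsan_closure_, so that
// TsanClosure() can annotate the (virtual) call combiner lock around its run.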
void CallCombiner::ScheduleClosure(grpc_closure* closure, grpc_error* error) {
#ifdef GRPC_TSAN_ENABLED
  original_closure_ = closure;
  ExecCtx::Run(DEBUG_LOCATION, &tsan_closure_, error);
#else
  ExecCtx::Run(DEBUG_LOCATION, closure, error);
#endif
}

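// In debug builds, Start() and Stop() also take the caller's file and line so
// that trace output can identify the call site; in NDEBUG builds these macros
// expand to nothing.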
#ifndef NDEBUG
#define DEBUG_ARGS const char *file, int line,
#define DEBUG_FMT_STR "%s:%d: "
#define DEBUG_FMT_ARGS , file, line
#else
#define DEBUG_ARGS
#define DEBUG_FMT_STR
#define DEBUG_FMT_ARGS
#endif

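// Schedules a closure under the call combiner.  size_ is incremented first:
// if the combiner was idle (previous size 0), the closure is scheduled
// immediately; otherwise it is pushed onto the MPSC queue and will be run by
// a later Stop() call.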
void CallCombiner::Start(grpc_closure* closure, grpc_error* error,
                         DEBUG_ARGS const char* reason) {
  GPR_TIMER_SCOPE("CallCombiner::Start", 0);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
    gpr_log(GPR_INFO,
            "==> CallCombiner::Start() [%p] closure=%p [" DEBUG_FMT_STR
            "%s] error=%s",
            this, closure DEBUG_FMT_ARGS, reason, grpc_error_string(error));
  }
  size_t prev_size =
      static_cast<size_t>(gpr_atm_full_fetch_add(&size_, (gpr_atm)1));
  if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
    gpr_log(GPR_INFO, "  size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
            prev_size + 1);
  }
  GRPC_STATS_INC_CALL_COMBINER_LOCKS_SCHEDULED_ITEMS();
  if (prev_size == 0) {
    GRPC_STATS_INC_CALL_COMBINER_LOCKS_INITIATED();
    GPR_TIMER_MARK("call_combiner_initiate", 0);
    if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
      gpr_log(GPR_INFO, "  EXECUTING IMMEDIATELY");
    }
    // Queue was empty, so execute this closure immediately.
    ScheduleClosure(closure, error);
  } else {
    if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
      gpr_log(GPR_INFO, "  QUEUING");
    }
    // Queue was not empty, so add closure to queue.
    closure->error_data.error = error;
    queue_.Push(
        reinterpret_cast<MultiProducerSingleConsumerQueue::Node*>(closure));
  }
}

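// Signals that the closure most recently scheduled by this combiner has
// finished.  If more closures are pending (previous size > 1), the next one
// is popped from the queue and scheduled; the pop is retried whenever the
// queue transiently returns nullptr due to a race with Start().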
void CallCombiner::Stop(DEBUG_ARGS const char* reason) {
  GPR_TIMER_SCOPE("CallCombiner::Stop", 0);
  if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
    gpr_log(GPR_INFO, "==> CallCombiner::Stop() [%p] [" DEBUG_FMT_STR "%s]",
            this DEBUG_FMT_ARGS, reason);
  }
  size_t prev_size =
      static_cast<size_t>(gpr_atm_full_fetch_add(&size_, (gpr_atm)-1));
  if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
    gpr_log(GPR_INFO, "  size: %" PRIdPTR " -> %" PRIdPTR, prev_size,
            prev_size - 1);
  }
  GPR_ASSERT(prev_size >= 1);
  if (prev_size > 1) {
    while (true) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
        gpr_log(GPR_INFO, "  checking queue");
      }
      bool empty;
      grpc_closure* closure =
          reinterpret_cast<grpc_closure*>(queue_.PopAndCheckEnd(&empty));
      if (closure == nullptr) {
        // This can happen either due to a race condition within the mpscq
        // code or because of a race with Start().
        if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
          gpr_log(GPR_INFO, "  queue returned no result; checking again");
        }
        continue;
      }
      if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
        gpr_log(GPR_INFO, "  EXECUTING FROM QUEUE: closure=%p error=%s",
                closure, grpc_error_string(closure->error_data.error));
      }
      ScheduleClosure(closure, closure->error_data.error);
      break;
    }
  } else if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
    gpr_log(GPR_INFO, "  queue empty");
  }
}

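// Registers a closure to be invoked when the call is cancelled.  If a
// cancellation error has already been recorded, the closure is scheduled
// immediately with a ref to that error.  Otherwise the closure is stored in
// cancel_state_, and any previously registered closure is scheduled with
// GRPC_ERROR_NONE so its owner can release resources held for it.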
void CallCombiner::SetNotifyOnCancel(grpc_closure* closure) {
  GRPC_STATS_INC_CALL_COMBINER_SET_NOTIFY_ON_CANCEL();
  while (true) {
    // Decode original state.
    gpr_atm original_state = gpr_atm_acq_load(&cancel_state_);
    grpc_error* original_error = DecodeCancelStateError(original_state);
    // If error is set, invoke the cancellation closure immediately.
    // Otherwise, store the new closure.
    if (original_error != GRPC_ERROR_NONE) {
      if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
        gpr_log(GPR_INFO,
                "call_combiner=%p: scheduling notify_on_cancel callback=%p "
                "for pre-existing cancellation",
                this, closure);
      }
      ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_REF(original_error));
      break;
    } else {
      if (gpr_atm_full_cas(&cancel_state_, original_state, (gpr_atm)closure)) {
        if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
          gpr_log(GPR_INFO, "call_combiner=%p: setting notify_on_cancel=%p",
                  this, closure);
        }
        // If we replaced an earlier closure, invoke the original
        // closure with GRPC_ERROR_NONE.  This allows callers to clean
        // up any resources they may be holding for the callback.
        if (original_state != 0) {
          closure = (grpc_closure*)original_state;
          if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
            gpr_log(GPR_INFO,
                    "call_combiner=%p: scheduling old cancel callback=%p", this,
                    closure);
          }
          ExecCtx::Run(DEBUG_LOCATION, closure, GRPC_ERROR_NONE);
        }
        break;
      }
    }
    // cas failed, try again.
  }
}

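// Records a cancellation error in cancel_state_.  If the call was already
// cancelled, the new error is unreffed and dropped.  Otherwise, if a
// notify_on_cancel closure had been registered, it is scheduled with a ref
// to the error.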
void CallCombiner::Cancel(grpc_error* error) {
  GRPC_STATS_INC_CALL_COMBINER_CANCELLED();
  while (true) {
    gpr_atm original_state = gpr_atm_acq_load(&cancel_state_);
    grpc_error* original_error = DecodeCancelStateError(original_state);
    if (original_error != GRPC_ERROR_NONE) {
      GRPC_ERROR_UNREF(error);
      break;
    }
    if (gpr_atm_full_cas(&cancel_state_, original_state,
                         EncodeCancelStateError(error))) {
      if (original_state != 0) {
        grpc_closure* notify_on_cancel = (grpc_closure*)original_state;
        if (GRPC_TRACE_FLAG_ENABLED(grpc_call_combiner_trace)) {
          gpr_log(GPR_INFO,
                  "call_combiner=%p: scheduling notify_on_cancel callback=%p",
                  this, notify_on_cancel);
        }
        ExecCtx::Run(DEBUG_LOCATION, notify_on_cancel, GRPC_ERROR_REF(error));
      }
      break;
    }
    // cas failed, try again.
  }
}

}  // namespace grpc_core