• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 //
2 //
3 // Copyright 2017 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 //     http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18 
19 #include "src/core/lib/iomgr/call_combiner.h"
20 
21 #include <grpc/support/port_platform.h>
22 #include <inttypes.h>
23 
24 #include "absl/log/check.h"
25 #include "absl/log/log.h"
26 #include "src/core/telemetry/stats.h"
27 #include "src/core/telemetry/stats_data.h"
28 #include "src/core/util/crash.h"
29 
30 namespace grpc_core {
31 
32 namespace {
33 
34 // grpc_error LSB can be used
35 constexpr intptr_t kErrorBit = 1;
36 
DecodeCancelStateError(gpr_atm cancel_state)37 grpc_error_handle DecodeCancelStateError(gpr_atm cancel_state) {
38   if (cancel_state & kErrorBit) {
39     return internal::StatusGetFromHeapPtr(cancel_state & ~kErrorBit);
40   }
41   return absl::OkStatus();
42 }
43 
44 }  // namespace
45 
CallCombiner()46 CallCombiner::CallCombiner() {
47   gpr_atm_no_barrier_store(&cancel_state_, 0);
48   gpr_atm_no_barrier_store(&size_, 0);
49 #ifdef GRPC_TSAN_ENABLED
50   GRPC_CLOSURE_INIT(&tsan_closure_, TsanClosure, this,
51                     grpc_schedule_on_exec_ctx);
52 #endif
53 }
54 
~CallCombiner()55 CallCombiner::~CallCombiner() {
56   if (cancel_state_ & kErrorBit) {
57     internal::StatusFreeHeapPtr(cancel_state_ & ~kErrorBit);
58   }
59 }
60 
#ifdef GRPC_TSAN_ENABLED
// Trampoline used in TSAN builds: runs original_closure_ while annotating the
// combiner as a lock so TSAN can see the mutual exclusion it provides.
// `arg` is the CallCombiner registered in the constructor; `error` is
// forwarded unchanged to the wrapped closure.
void CallCombiner::TsanClosure(void* arg, grpc_error_handle error) {
  auto* combiner = static_cast<CallCombiner*>(arg);
  // The lock object is ref-counted. We try to flip `taken` from false to
  // true; if it was already taken we do nothing, otherwise we mark it as
  // acquired. If two different threads attempt this at once, only one of
  // them manages to mark the lock while both still run their callbacks. In
  // that situation (which should never happen for call_combiner) TSAN will
  // correctly produce an error.
  //
  // TODO(soheil): This only covers the callbacks scheduled by
  //               CallCombiner::Start() and CallCombiner::Stop().
  //               If in the future, a callback gets scheduled using other
  //               mechanisms, we will need to add APIs to externally lock
  //               call combiners.
  RefCountedPtr<TsanLock> tsan_lock = combiner->tsan_lock_;
  bool expected_free = false;
  const bool acquired =
      tsan_lock->taken.compare_exchange_strong(expected_free, true);
  if (acquired) {
    TSAN_ANNOTATE_RWLOCK_ACQUIRED(&tsan_lock->taken, true);
  } else {
    // Someone else holds it; drop our reference so we skip the release below.
    tsan_lock.reset();
  }
  Closure::Run(DEBUG_LOCATION, combiner->original_closure_, error);
  if (tsan_lock != nullptr) {
    TSAN_ANNOTATE_RWLOCK_RELEASED(&tsan_lock->taken, true);
    bool expected_held = true;
    CHECK(tsan_lock->taken.compare_exchange_strong(expected_held, false));
  }
}
#endif
91 
ScheduleClosure(grpc_closure * closure,grpc_error_handle error)92 void CallCombiner::ScheduleClosure(grpc_closure* closure,
93                                    grpc_error_handle error) {
94 #ifdef GRPC_TSAN_ENABLED
95   original_closure_ = closure;
96   ExecCtx::Run(DEBUG_LOCATION, &tsan_closure_, error);
97 #else
98   ExecCtx::Run(DEBUG_LOCATION, closure, error);
99 #endif
100 }
101 
// Helpers for threading the caller's source location through Start()/Stop().
// In release builds (NDEBUG) all three expand to nothing; otherwise they add
// `file`/`line` parameters, a "file:line: " format prefix, and the matching
// format arguments.
#ifdef NDEBUG
#define DEBUG_ARGS
#define DEBUG_FMT_STR
#define DEBUG_FMT_ARGS
#else
#define DEBUG_ARGS const char *file, int line,
#define DEBUG_FMT_STR "%s:%d: "
#define DEBUG_FMT_ARGS , file, line
#endif
111 
Start(grpc_closure * closure,grpc_error_handle error,DEBUG_ARGS const char * reason)112 void CallCombiner::Start(grpc_closure* closure, grpc_error_handle error,
113                          DEBUG_ARGS const char* reason) {
114   GRPC_TRACE_LOG(call_combiner, INFO) << absl::StrFormat(
115       "==> CallCombiner::Start() [%p] closure=%s [" DEBUG_FMT_STR
116       "%s] error=%s",
117       this, closure->DebugString().c_str() DEBUG_FMT_ARGS, reason,
118       StatusToString(error).c_str());
119   size_t prev_size =
120       static_cast<size_t>(gpr_atm_full_fetch_add(&size_, (gpr_atm)1));
121   GRPC_TRACE_LOG(call_combiner, INFO)
122       << "  size: " << prev_size << " -> " << prev_size + 1;
123   if (prev_size == 0) {
124     GRPC_TRACE_LOG(call_combiner, INFO) << "  EXECUTING IMMEDIATELY";
125     // Queue was empty, so execute this closure immediately.
126     ScheduleClosure(closure, error);
127   } else {
128     GRPC_TRACE_LOG(call_combiner, INFO) << "  QUEUING";
129     // Queue was not empty, so add closure to queue.
130     closure->error_data.error = internal::StatusAllocHeapPtr(error);
131     queue_.Push(
132         reinterpret_cast<MultiProducerSingleConsumerQueue::Node*>(closure));
133   }
134 }
135 
Stop(DEBUG_ARGS const char * reason)136 void CallCombiner::Stop(DEBUG_ARGS const char* reason) {
137   GRPC_TRACE_LOG(call_combiner, INFO)
138       << absl::StrFormat("==> CallCombiner::Stop() [%p] [" DEBUG_FMT_STR "%s]",
139                          this DEBUG_FMT_ARGS, reason);
140   size_t prev_size =
141       static_cast<size_t>(gpr_atm_full_fetch_add(&size_, (gpr_atm)-1));
142   GRPC_TRACE_LOG(call_combiner, INFO)
143       << "  size: " << prev_size << " -> " << prev_size - 1;
144   CHECK_GE(prev_size, 1u);
145   if (prev_size > 1) {
146     while (true) {
147       GRPC_TRACE_LOG(call_combiner, INFO) << "  checking queue";
148       bool empty;
149       grpc_closure* closure =
150           reinterpret_cast<grpc_closure*>(queue_.PopAndCheckEnd(&empty));
151       if (closure == nullptr) {
152         // This can happen either due to a race condition within the mpscq
153         // code or because of a race with Start().
154         GRPC_TRACE_LOG(call_combiner, INFO)
155             << "  queue returned no result; checking again";
156         continue;
157       }
158       grpc_error_handle error =
159           internal::StatusMoveFromHeapPtr(closure->error_data.error);
160       closure->error_data.error = 0;
161       GRPC_TRACE_LOG(call_combiner, INFO)
162           << "  EXECUTING FROM QUEUE: closure=" << closure->DebugString()
163           << " error=" << StatusToString(error);
164       ScheduleClosure(closure, error);
165       break;
166     }
167     GRPC_TRACE_LOG(call_combiner, INFO) << "  queue empty";
168   }
169 }
170 
SetNotifyOnCancel(grpc_closure * closure)171 void CallCombiner::SetNotifyOnCancel(grpc_closure* closure) {
172   while (true) {
173     // Decode original state.
174     gpr_atm original_state = gpr_atm_acq_load(&cancel_state_);
175     grpc_error_handle original_error = DecodeCancelStateError(original_state);
176     // If error is set, invoke the cancellation closure immediately.
177     // Otherwise, store the new closure.
178     if (!original_error.ok()) {
179       GRPC_TRACE_LOG(call_combiner, INFO)
180           << "call_combiner=" << this
181           << ": scheduling notify_on_cancel callback=" << closure
182           << " for pre-existing cancellation";
183       ExecCtx::Run(DEBUG_LOCATION, closure, original_error);
184       break;
185     } else {
186       if (gpr_atm_full_cas(&cancel_state_, original_state,
187                            reinterpret_cast<gpr_atm>(closure))) {
188         GRPC_TRACE_LOG(call_combiner, INFO)
189             << "call_combiner=" << this
190             << ": setting notify_on_cancel=" << closure;
191         // If we replaced an earlier closure, invoke the original
192         // closure with absl::OkStatus().  This allows callers to clean
193         // up any resources they may be holding for the callback.
194         if (original_state != 0) {
195           closure = reinterpret_cast<grpc_closure*>(original_state);
196           GRPC_TRACE_LOG(call_combiner, INFO)
197               << "call_combiner=" << this
198               << ": scheduling old cancel callback=" << closure;
199           ExecCtx::Run(DEBUG_LOCATION, closure, absl::OkStatus());
200         }
201         break;
202       }
203     }
204     // cas failed, try again.
205   }
206 }
207 
Cancel(grpc_error_handle error)208 void CallCombiner::Cancel(grpc_error_handle error) {
209   intptr_t status_ptr = internal::StatusAllocHeapPtr(error);
210   gpr_atm new_state = kErrorBit | status_ptr;
211   while (true) {
212     gpr_atm original_state = gpr_atm_acq_load(&cancel_state_);
213     grpc_error_handle original_error = DecodeCancelStateError(original_state);
214     if (!original_error.ok()) {
215       internal::StatusFreeHeapPtr(status_ptr);
216       break;
217     }
218     if (gpr_atm_full_cas(&cancel_state_, original_state, new_state)) {
219       if (original_state != 0) {
220         grpc_closure* notify_on_cancel =
221             reinterpret_cast<grpc_closure*>(original_state);
222         GRPC_TRACE_LOG(call_combiner, INFO)
223             << "call_combiner=" << this
224             << ": scheduling notify_on_cancel callback=" << notify_on_cancel;
225         ExecCtx::Run(DEBUG_LOCATION, notify_on_cancel, error);
226       }
227       break;
228     }
229     // cas failed, try again.
230   }
231 }
232 
233 }  // namespace grpc_core
234