1 //
2 //
3 // Copyright 2016 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/lib/iomgr/combiner.h"
20
21 #include <assert.h>
22 #include <grpc/support/alloc.h>
23 #include <grpc/support/port_platform.h>
24 #include <inttypes.h>
25 #include <string.h>
26
27 #include "absl/log/check.h"
28 #include "absl/log/log.h"
29 #include "src/core/lib/experiments/experiments.h"
30 #include "src/core/lib/iomgr/executor.h"
31 #include "src/core/lib/iomgr/iomgr_internal.h"
32 #include "src/core/util/crash.h"
33 #include "src/core/util/mpscq.h"
34
// Layout of Combiner::state as it is used in this file:
//   bit 0     - STATE_UNORPHANED: set by grpc_combiner_create() and subtracted
//               by start_destroy() once the last reference is dropped.
//   bits 1..  - number of queued elements, counted in units of
//               STATE_ELEM_COUNT_LOW_BIT (readers recover it with `state >> 1`).
#define STATE_UNORPHANED 1
#define STATE_ELEM_COUNT_LOW_BIT 2

// Forward declarations; both functions are defined further down in this file.
static void combiner_exec(grpc_core::Combiner* lock, grpc_closure* closure,
                          grpc_error_handle error);
static void combiner_finally_exec(grpc_core::Combiner* lock,
                                  grpc_closure* closure,
                                  grpc_error_handle error);
43
grpc_combiner_create(std::shared_ptr<grpc_event_engine::experimental::EventEngine> event_engine)44 grpc_core::Combiner* grpc_combiner_create(
45 std::shared_ptr<grpc_event_engine::experimental::EventEngine>
46 event_engine) {
47 grpc_core::Combiner* lock = new grpc_core::Combiner();
48 lock->event_engine = event_engine;
49 gpr_ref_init(&lock->refs, 1);
50 gpr_atm_no_barrier_store(&lock->state, STATE_UNORPHANED);
51 grpc_closure_list_init(&lock->final_list);
52 GRPC_TRACE_LOG(combiner, INFO) << "C:" << lock << " create";
53 return lock;
54 }
55
really_destroy(grpc_core::Combiner * lock)56 static void really_destroy(grpc_core::Combiner* lock) {
57 GRPC_TRACE_LOG(combiner, INFO) << "C:" << lock << " really_destroy";
58 CHECK_EQ(gpr_atm_no_barrier_load(&lock->state), 0);
59 delete lock;
60 }
61
start_destroy(grpc_core::Combiner * lock)62 static void start_destroy(grpc_core::Combiner* lock) {
63 gpr_atm old_state = gpr_atm_full_fetch_add(&lock->state, -STATE_UNORPHANED);
64 GRPC_TRACE_LOG(combiner, INFO)
65 << "C:" << lock << " really_destroy old_state=" << old_state;
66 if (old_state == 1) {
67 really_destroy(lock);
68 }
69 }
70
// GRPC_COMBINER_DEBUG_SPAM(op, delta): ref-count tracing helper.
//
// When NDEBUG is not defined, it emits a verbose (level 2) combiner trace line
// attributed to (file, line), showing `op`, the current reference count, the
// count after applying `delta`, and `reason`. The expansion site must have
// `lock`, `file`, `line` and `reason` in scope.
// When NDEBUG is defined, it expands to nothing.
#ifndef NDEBUG
#define GRPC_COMBINER_DEBUG_SPAM(op, delta)                        \
  GRPC_TRACE_VLOG(combiner, 2).AtLocation(file, line)              \
      << "C:" << lock << " " << (op) << " "                        \
      << gpr_atm_no_barrier_load(&lock->refs.count) << " --> "     \
      << gpr_atm_no_barrier_load(&lock->refs.count) + (delta) << " " \
      << reason;
#else
#define GRPC_COMBINER_DEBUG_SPAM(op, delta)
#endif
81
grpc_combiner_unref(grpc_core::Combiner * lock GRPC_COMBINER_DEBUG_ARGS)82 void grpc_combiner_unref(grpc_core::Combiner* lock GRPC_COMBINER_DEBUG_ARGS) {
83 GRPC_COMBINER_DEBUG_SPAM("UNREF", -1);
84 if (gpr_unref(&lock->refs)) {
85 start_destroy(lock);
86 }
87 }
88
// Takes an additional reference on `lock` and returns the same pointer, so the
// call can be used inline where a combiner pointer is expected.
grpc_core::Combiner* grpc_combiner_ref(
    grpc_core::Combiner* lock GRPC_COMBINER_DEBUG_ARGS) {
  // The trace macro reads the current count, so it runs before the increment.
  GRPC_COMBINER_DEBUG_SPAM(" REF", 1);
  gpr_ref(&lock->refs);
  return lock;
}
95
push_last_on_exec_ctx(grpc_core::Combiner * lock)96 static void push_last_on_exec_ctx(grpc_core::Combiner* lock) {
97 lock->next_combiner_on_this_exec_ctx = nullptr;
98 if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == nullptr) {
99 grpc_core::ExecCtx::Get()->combiner_data()->active_combiner =
100 grpc_core::ExecCtx::Get()->combiner_data()->last_combiner = lock;
101 } else {
102 grpc_core::ExecCtx::Get()
103 ->combiner_data()
104 ->last_combiner->next_combiner_on_this_exec_ctx = lock;
105 grpc_core::ExecCtx::Get()->combiner_data()->last_combiner = lock;
106 }
107 }
108
push_first_on_exec_ctx(grpc_core::Combiner * lock)109 static void push_first_on_exec_ctx(grpc_core::Combiner* lock) {
110 lock->next_combiner_on_this_exec_ctx =
111 grpc_core::ExecCtx::Get()->combiner_data()->active_combiner;
112 grpc_core::ExecCtx::Get()->combiner_data()->active_combiner = lock;
113 if (lock->next_combiner_on_this_exec_ctx == nullptr) {
114 grpc_core::ExecCtx::Get()->combiner_data()->last_combiner = lock;
115 }
116 }
117
combiner_exec(grpc_core::Combiner * lock,grpc_closure * cl,grpc_error_handle error)118 static void combiner_exec(grpc_core::Combiner* lock, grpc_closure* cl,
119 grpc_error_handle error) {
120 gpr_atm last = gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
121 GRPC_TRACE_LOG(combiner, INFO)
122 << "C:" << lock << " grpc_combiner_execute c=" << cl << " last=" << last;
123 if (last == 1) {
124 gpr_atm_no_barrier_store(
125 &lock->initiating_exec_ctx_or_null,
126 reinterpret_cast<gpr_atm>(grpc_core::ExecCtx::Get()));
127 // first element on this list: add it to the list of combiner locks
128 // executing within this exec_ctx
129 push_last_on_exec_ctx(lock);
130 } else {
131 // there may be a race with setting here: if that happens, we may delay
132 // offload for one or two actions, and that's fine
133 gpr_atm initiator =
134 gpr_atm_no_barrier_load(&lock->initiating_exec_ctx_or_null);
135 if (initiator != 0 &&
136 initiator != reinterpret_cast<gpr_atm>(grpc_core::ExecCtx::Get())) {
137 gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, 0);
138 }
139 }
140 CHECK(last & STATE_UNORPHANED); // ensure lock has not been destroyed
141 assert(cl->cb);
142 cl->error_data.error = grpc_core::internal::StatusAllocHeapPtr(error);
143 lock->queue.Push(cl->next_data.mpscq_node.get());
144 }
145
move_next()146 static void move_next() {
147 grpc_core::ExecCtx::Get()->combiner_data()->active_combiner =
148 grpc_core::ExecCtx::Get()
149 ->combiner_data()
150 ->active_combiner->next_combiner_on_this_exec_ctx;
151 if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner == nullptr) {
152 grpc_core::ExecCtx::Get()->combiner_data()->last_combiner = nullptr;
153 }
154 }
155
queue_offload(grpc_core::Combiner * lock)156 static void queue_offload(grpc_core::Combiner* lock) {
157 move_next();
158 // Make the combiner look uncontended by storing a non-null value here, so
159 // that we don't immediately offload again.
160 gpr_atm_no_barrier_store(&lock->initiating_exec_ctx_or_null, 1);
161 GRPC_TRACE_LOG(combiner, INFO) << "C:" << lock << " queue_offload";
162 lock->event_engine->Run([lock] {
163 grpc_core::ApplicationCallbackExecCtx callback_exec_ctx;
164 grpc_core::ExecCtx exec_ctx(0);
165 push_last_on_exec_ctx(lock);
166 exec_ctx.Flush();
167 });
168 }
169
grpc_combiner_continue_exec_ctx()170 bool grpc_combiner_continue_exec_ctx() {
171 grpc_core::Combiner* lock =
172 grpc_core::ExecCtx::Get()->combiner_data()->active_combiner;
173 if (lock == nullptr) {
174 return false;
175 }
176
177 bool contended =
178 gpr_atm_no_barrier_load(&lock->initiating_exec_ctx_or_null) == 0;
179
180 GRPC_TRACE_LOG(combiner, INFO)
181 << "C:" << lock << " grpc_combiner_continue_exec_ctx "
182 << "contended=" << contended << " exec_ctx_ready_to_finish="
183 << grpc_core::ExecCtx::Get()->IsReadyToFinish()
184 << " time_to_execute_final_list=" << lock->time_to_execute_final_list;
185
186 // offload only if both (1) the combiner is contended and has more than one
187 // closure to execute, and (2) the current execution context needs to finish
188 // as soon as possible
189 if (contended && grpc_core::ExecCtx::Get()->IsReadyToFinish()) {
190 // this execution context wants to move on: schedule remaining work to be
191 // picked up on the executor
192 queue_offload(lock);
193 return true;
194 }
195
196 if (!lock->time_to_execute_final_list ||
197 // peek to see if something new has shown up, and execute that with
198 // priority
199 (gpr_atm_acq_load(&lock->state) >> 1) > 1) {
200 grpc_core::MultiProducerSingleConsumerQueue::Node* n = lock->queue.Pop();
201 GRPC_TRACE_LOG(combiner, INFO)
202 << "C:" << lock << " maybe_finish_one n=" << n;
203
204 if (n == nullptr) {
205 // queue is in an inconsistent state: use this as a cue that we should
206 // go off and do something else for a while (and come back later)
207 queue_offload(lock);
208 return true;
209 }
210 grpc_closure* cl = reinterpret_cast<grpc_closure*>(n);
211 #ifndef NDEBUG
212 cl->scheduled = false;
213 #endif
214 grpc_error_handle cl_err =
215 grpc_core::internal::StatusMoveFromHeapPtr(cl->error_data.error);
216 cl->error_data.error = 0;
217 cl->cb(cl->cb_arg, std::move(cl_err));
218 } else {
219 grpc_closure* c = lock->final_list.head;
220 CHECK_NE(c, nullptr);
221 grpc_closure_list_init(&lock->final_list);
222 int loops = 0;
223 while (c != nullptr) {
224 GRPC_TRACE_LOG(combiner, INFO)
225 << "C:" << lock << " execute_final[" << loops << "] c=" << c;
226 grpc_closure* next = c->next_data.next;
227 #ifndef NDEBUG
228 c->scheduled = false;
229 #endif
230 grpc_error_handle error =
231 grpc_core::internal::StatusMoveFromHeapPtr(c->error_data.error);
232 c->error_data.error = 0;
233 c->cb(c->cb_arg, std::move(error));
234 c = next;
235 }
236 }
237
238 move_next();
239 lock->time_to_execute_final_list = false;
240 gpr_atm old_state =
241 gpr_atm_full_fetch_add(&lock->state, -STATE_ELEM_COUNT_LOW_BIT);
242 GRPC_TRACE_LOG(combiner, INFO)
243 << "C:" << lock << " finish old_state=" << old_state;
244 // Define a macro to ease readability of the following switch statement.
245 #define OLD_STATE_WAS(orphaned, elem_count) \
246 (((orphaned) ? 0 : STATE_UNORPHANED) | \
247 ((elem_count) * STATE_ELEM_COUNT_LOW_BIT))
248 // Depending on what the previous state was, we need to perform different
249 // actions.
250 switch (old_state) {
251 default:
252 // we have multiple queued work items: just continue executing them
253 break;
254 case OLD_STATE_WAS(false, 2):
255 case OLD_STATE_WAS(true, 2):
256 // we're down to one queued item: if it's the final list we should do that
257 if (!grpc_closure_list_empty(lock->final_list)) {
258 lock->time_to_execute_final_list = true;
259 }
260 break;
261 case OLD_STATE_WAS(false, 1):
262 // had one count, one unorphaned --> unlocked unorphaned
263 return true;
264 case OLD_STATE_WAS(true, 1):
265 // and one count, one orphaned --> unlocked and orphaned
266 really_destroy(lock);
267 return true;
268 case OLD_STATE_WAS(false, 0):
269 case OLD_STATE_WAS(true, 0):
270 // these values are illegal - representing an already unlocked or
271 // deleted lock
272 GPR_UNREACHABLE_CODE(return true);
273 }
274 push_first_on_exec_ctx(lock);
275 return true;
276 }
277
278 static void enqueue_finally(void* closure, grpc_error_handle error);
279
combiner_finally_exec(grpc_core::Combiner * lock,grpc_closure * closure,grpc_error_handle error)280 static void combiner_finally_exec(grpc_core::Combiner* lock,
281 grpc_closure* closure,
282 grpc_error_handle error) {
283 CHECK_NE(lock, nullptr);
284 GRPC_TRACE_LOG(combiner, INFO)
285 << "C:" << lock << " grpc_combiner_execute_finally c=" << closure
286 << "; ac=" << grpc_core::ExecCtx::Get()->combiner_data()->active_combiner;
287 if (grpc_core::ExecCtx::Get()->combiner_data()->active_combiner != lock) {
288 // Using error_data.scratch to store the combiner so that it can be accessed
289 // in enqueue_finally.
290 closure->error_data.scratch = reinterpret_cast<uintptr_t>(lock);
291 lock->Run(GRPC_CLOSURE_CREATE(enqueue_finally, closure, nullptr), error);
292 return;
293 }
294
295 if (grpc_closure_list_empty(lock->final_list)) {
296 gpr_atm_full_fetch_add(&lock->state, STATE_ELEM_COUNT_LOW_BIT);
297 }
298 grpc_closure_list_append(&lock->final_list, closure, error);
299 }
300
enqueue_finally(void * closure,grpc_error_handle error)301 static void enqueue_finally(void* closure, grpc_error_handle error) {
302 grpc_closure* cl = static_cast<grpc_closure*>(closure);
303 grpc_core::Combiner* lock =
304 reinterpret_cast<grpc_core::Combiner*>(cl->error_data.scratch);
305 cl->error_data.scratch = 0;
306 combiner_finally_exec(lock, cl, error);
307 }
308
309 namespace grpc_core {
Run(grpc_closure * closure,grpc_error_handle error)310 void Combiner::Run(grpc_closure* closure, grpc_error_handle error) {
311 combiner_exec(this, closure, error);
312 }
313
FinallyRun(grpc_closure * closure,grpc_error_handle error)314 void Combiner::FinallyRun(grpc_closure* closure, grpc_error_handle error) {
315 combiner_finally_exec(this, closure, error);
316 }
317
ForceOffload()318 void Combiner::ForceOffload() {
319 gpr_atm_no_barrier_store(&initiating_exec_ctx_or_null, 0);
320 ExecCtx::Get()->SetReadyToFinishFlag();
321 }
322
323 } // namespace grpc_core
324