/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <grpc/support/port_platform.h>

#include "src/core/lib/iomgr/executor.h"

#include <string.h>

#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/log.h>
#include <grpc/support/sync.h>

#include "src/core/lib/debug/stats.h"
#include "src/core/lib/gpr/tls.h"
#include "src/core/lib/gpr/useful.h"
#include "src/core/lib/gprpp/memory.h"
#include "src/core/lib/iomgr/exec_ctx.h"

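// Per-thread queue depth (closures enqueued but not yet run) beyond which
// Enqueue() takes it as a hint to spin up an additional executor thread.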
#define MAX_DEPTH 2

#define EXECUTOR_TRACE(format, ...)                     \
  if (executor_trace.enabled()) {                       \
    gpr_log(GPR_INFO, "EXECUTOR " format, __VA_ARGS__); \
  }

#define EXECUTOR_TRACE0(str)          \
  if (executor_trace.enabled()) {     \
    gpr_log(GPR_INFO, "EXECUTOR " str); \
  }

grpc_core::TraceFlag executor_trace(false, "executor");

GPR_TLS_DECL(g_this_thread_state);

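// The executor starts out unthreaded; Init()/SetThreading(true) allocates the
// per-thread state and starts the first worker. The thread pool is capped at
// twice the number of CPU cores (and at least one thread).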
GrpcExecutor::GrpcExecutor(const char* name) : name_(name) {
  adding_thread_lock_ = GPR_SPINLOCK_STATIC_INITIALIZER;
  gpr_atm_rel_store(&num_threads_, 0);
  max_threads_ = GPR_MAX(1, 2 * gpr_cpu_num_cores());
}

void GrpcExecutor::Init() { SetThreading(true); }

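// Runs every closure in 'list' on the calling thread, flushing the ExecCtx
// after each one, and returns the number of closures executed (the caller uses
// this count to decrement its queue-depth counter).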
size_t GrpcExecutor::RunClosures(const char* executor_name,
                                 grpc_closure_list list) {
  size_t n = 0;

  grpc_closure* c = list.head;
  while (c != nullptr) {
    grpc_closure* next = c->next_data.next;
    grpc_error* error = c->error_data.error;
#ifndef NDEBUG
    EXECUTOR_TRACE("(%s) run %p [created by %s:%d]", executor_name, c,
                   c->file_created, c->line_created);
    c->scheduled = false;
#else
    EXECUTOR_TRACE("(%s) run %p", executor_name, c);
#endif
    c->cb(c->cb_arg, error);
    GRPC_ERROR_UNREF(error);
    c = next;
    n++;
    grpc_core::ExecCtx::Get()->Flush();
  }

  return n;
}

bool GrpcExecutor::IsThreaded() const {
  return gpr_atm_acq_load(&num_threads_) > 0;
}

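// SetThreading(true) allocates the ThreadState array and starts the first
// worker thread; additional threads are started lazily by Enqueue().
// SetThreading(false) signals every worker to shut down, joins the threads
// that were started, runs any closures still queued on their lists inline,
// and frees the per-thread state.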
void GrpcExecutor::SetThreading(bool threading) {
  gpr_atm curr_num_threads = gpr_atm_acq_load(&num_threads_);
  EXECUTOR_TRACE("(%s) SetThreading(%d) begin", name_, threading);

  if (threading) {
    if (curr_num_threads > 0) {
      EXECUTOR_TRACE("(%s) SetThreading(true). curr_num_threads > 0", name_);
      return;
    }

    GPR_ASSERT(num_threads_ == 0);
    gpr_atm_rel_store(&num_threads_, 1);
    gpr_tls_init(&g_this_thread_state);
    thd_state_ = static_cast<ThreadState*>(
        gpr_zalloc(sizeof(ThreadState) * max_threads_));

    for (size_t i = 0; i < max_threads_; i++) {
      gpr_mu_init(&thd_state_[i].mu);
      gpr_cv_init(&thd_state_[i].cv);
      thd_state_[i].id = i;
      thd_state_[i].name = name_;
      thd_state_[i].thd = grpc_core::Thread();
      thd_state_[i].elems = GRPC_CLOSURE_LIST_INIT;
    }

    thd_state_[0].thd =
        grpc_core::Thread(name_, &GrpcExecutor::ThreadMain, &thd_state_[0]);
    thd_state_[0].thd.Start();
  } else {  // !threading
    if (curr_num_threads == 0) {
      EXECUTOR_TRACE("(%s) SetThreading(false). curr_num_threads == 0", name_);
      return;
    }

    for (size_t i = 0; i < max_threads_; i++) {
      gpr_mu_lock(&thd_state_[i].mu);
      thd_state_[i].shutdown = true;
      gpr_cv_signal(&thd_state_[i].cv);
      gpr_mu_unlock(&thd_state_[i].mu);
    }

    /* Ensure no thread is adding a new thread. Once this is past, then no
     * thread will try to add a new one either (since shutdown is true) */
    gpr_spinlock_lock(&adding_thread_lock_);
    gpr_spinlock_unlock(&adding_thread_lock_);

    curr_num_threads = gpr_atm_no_barrier_load(&num_threads_);
    for (gpr_atm i = 0; i < curr_num_threads; i++) {
      thd_state_[i].thd.Join();
      EXECUTOR_TRACE("(%s) Thread %" PRIdPTR " of %" PRIdPTR " joined", name_,
                     i + 1, curr_num_threads);
    }

    gpr_atm_rel_store(&num_threads_, 0);
    for (size_t i = 0; i < max_threads_; i++) {
      gpr_mu_destroy(&thd_state_[i].mu);
      gpr_cv_destroy(&thd_state_[i].cv);
      RunClosures(thd_state_[i].name, thd_state_[i].elems);
    }

    gpr_free(thd_state_);
    gpr_tls_destroy(&g_this_thread_state);
  }

  EXECUTOR_TRACE("(%s) SetThreading(%d) done", name_, threading);
}

void GrpcExecutor::Shutdown() { SetThreading(false); }

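// Main loop for each executor worker thread: block on the condition variable
// until closures are queued on this thread's list (or shutdown is requested),
// then take the whole list and run it outside the lock. The number of closures
// just run is subtracted from the thread's queue depth on the next iteration.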
void GrpcExecutor::ThreadMain(void* arg) {
  ThreadState* ts = static_cast<ThreadState*>(arg);
  gpr_tls_set(&g_this_thread_state, reinterpret_cast<intptr_t>(ts));

  grpc_core::ExecCtx exec_ctx(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);

  size_t subtract_depth = 0;
  for (;;) {
    EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: step (sub_depth=%" PRIdPTR ")",
                   ts->name, ts->id, subtract_depth);

    gpr_mu_lock(&ts->mu);
    ts->depth -= subtract_depth;
    // Wait for closures to be enqueued or for the executor to be shutdown
    while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
      ts->queued_long_job = false;
      gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
    }

    if (ts->shutdown) {
      EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: shutdown", ts->name, ts->id);
      gpr_mu_unlock(&ts->mu);
      break;
    }

    GRPC_STATS_INC_EXECUTOR_QUEUE_DRAINED();
    grpc_closure_list closures = ts->elems;
    ts->elems = GRPC_CLOSURE_LIST_INIT;
    gpr_mu_unlock(&ts->mu);

    EXECUTOR_TRACE("(%s) [%" PRIdPTR "]: execute", ts->name, ts->id);

    grpc_core::ExecCtx::Get()->InvalidateNow();
    subtract_depth = RunClosures(ts->name, closures);
  }
}

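// Enqueues a closure on one of the executor's threads. If the executor has no
// threads (unthreaded or already shut down), the closure is appended to the
// calling ExecCtx's closure list instead. Otherwise the target thread is the
// caller's own executor thread (if any) or one picked by hashing the current
// ExecCtx; threads that already hold a queued long job are skipped so that a
// long job cannot starve the closures behind it.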
void GrpcExecutor::Enqueue(grpc_closure* closure, grpc_error* error,
                           bool is_short) {
  bool retry_push;
  if (is_short) {
    GRPC_STATS_INC_EXECUTOR_SCHEDULED_SHORT_ITEMS();
  } else {
    GRPC_STATS_INC_EXECUTOR_SCHEDULED_LONG_ITEMS();
  }

  do {
    retry_push = false;
    size_t cur_thread_count =
        static_cast<size_t>(gpr_atm_acq_load(&num_threads_));

    // If the number of threads is zero (i.e. either the executor is not
    // threaded or already shutdown), then queue the closure on the exec
    // context itself
    if (cur_thread_count == 0) {
#ifndef NDEBUG
      EXECUTOR_TRACE("(%s) schedule %p (created %s:%d) inline", name_, closure,
                     closure->file_created, closure->line_created);
#else
      EXECUTOR_TRACE("(%s) schedule %p inline", name_, closure);
#endif
      grpc_closure_list_append(grpc_core::ExecCtx::Get()->closure_list(),
                               closure, error);
      return;
    }

    ThreadState* ts = (ThreadState*)gpr_tls_get(&g_this_thread_state);
    if (ts == nullptr) {
      ts = &thd_state_[GPR_HASH_POINTER(grpc_core::ExecCtx::Get(),
                                        cur_thread_count)];
    } else {
      GRPC_STATS_INC_EXECUTOR_SCHEDULED_TO_SELF();
    }

    ThreadState* orig_ts = ts;
    bool try_new_thread = false;

    for (;;) {
#ifndef NDEBUG
      EXECUTOR_TRACE(
          "(%s) try to schedule %p (%s) (created %s:%d) to thread "
          "%" PRIdPTR,
          name_, closure, is_short ? "short" : "long", closure->file_created,
          closure->line_created, ts->id);
#else
      EXECUTOR_TRACE("(%s) try to schedule %p (%s) to thread %" PRIdPTR, name_,
                     closure, is_short ? "short" : "long", ts->id);
#endif

      gpr_mu_lock(&ts->mu);
      if (ts->queued_long_job) {
        // if there's a long job queued, we never queue anything else to this
        // queue (since long jobs can take 'infinite' time and we need to
        // guarantee no starvation). Spin through queues and try again
        gpr_mu_unlock(&ts->mu);
        size_t idx = ts->id;
        ts = &thd_state_[(idx + 1) % cur_thread_count];
        if (ts == orig_ts) {
          // We cycled through all the threads. Retry enqueue again by creating
          // a new thread
          //
          // TODO (sreek): There is a potential issue here. We are
          // unconditionally setting try_new_thread to true here. What if the
          // executor is shutdown OR if cur_thread_count is already equal to
          // max_threads?
          // (Fortunately, this is not an issue yet (as of July 2018) because
          // there is only one instance of a long job in gRPC and hence we will
          // not hit this code path)
          retry_push = true;
          try_new_thread = true;
          break;
        }

        continue;  // Try the next thread-state
      }

      // == Found the thread state (i.e. thread) to enqueue this closure! ==

      // Also, if this thread has been waiting for closures, wake it up.
      // - If grpc_closure_list_empty() is true and the executor is not
      //   shutdown, it means that the thread must be waiting in ThreadMain()
      // - Note that gpr_cv_signal() won't immediately wake up the thread. That
      //   happens after we release the mutex &ts->mu a few lines below
      if (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
        GRPC_STATS_INC_EXECUTOR_WAKEUP_INITIATED();
        gpr_cv_signal(&ts->cv);
      }

      grpc_closure_list_append(&ts->elems, closure, error);

      // If we already queued more than MAX_DEPTH number of closures on this
      // thread, use this as a hint to create more threads
      ts->depth++;
      try_new_thread = ts->depth > MAX_DEPTH &&
                       cur_thread_count < max_threads_ && !ts->shutdown;

      ts->queued_long_job = !is_short;

      gpr_mu_unlock(&ts->mu);
      break;
    }

    if (try_new_thread && gpr_spinlock_trylock(&adding_thread_lock_)) {
      cur_thread_count = static_cast<size_t>(gpr_atm_acq_load(&num_threads_));
      if (cur_thread_count < max_threads_) {
        // Increment num_threads (safe to do a store instead of a cas because
        // we always increment num_threads under the 'adding_thread_lock')
        gpr_atm_rel_store(&num_threads_, cur_thread_count + 1);

        thd_state_[cur_thread_count].thd = grpc_core::Thread(
            name_, &GrpcExecutor::ThreadMain, &thd_state_[cur_thread_count]);
        thd_state_[cur_thread_count].thd.Start();
      }
      gpr_spinlock_unlock(&adding_thread_lock_);
    }

    if (retry_push) {
      GRPC_STATS_INC_EXECUTOR_PUSH_RETRIES();
    }
  } while (retry_push);
}

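// One executor instance per GrpcExecutorType; the instances are created in
// grpc_executor_init() and torn down in grpc_executor_shutdown().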
static GrpcExecutor* executors[GRPC_NUM_EXECUTORS];

void default_enqueue_short(grpc_closure* closure, grpc_error* error) {
  executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error,
                                            true /* is_short */);
}

void default_enqueue_long(grpc_closure* closure, grpc_error* error) {
  executors[GRPC_DEFAULT_EXECUTOR]->Enqueue(closure, error,
                                            false /* is_short */);
}

void resolver_enqueue_short(grpc_closure* closure, grpc_error* error) {
  executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error,
                                             true /* is_short */);
}

void resolver_enqueue_long(grpc_closure* closure, grpc_error* error) {
  executors[GRPC_RESOLVER_EXECUTOR]->Enqueue(closure, error,
                                             false /* is_short */);
}

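// Static scheduler tables: one grpc_closure_scheduler per
// (executor type, job type) pair. Each vtable points both of its callbacks at
// the matching enqueue function above.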
static const grpc_closure_scheduler_vtable
    vtables_[GRPC_NUM_EXECUTORS][GRPC_NUM_EXECUTOR_JOB_TYPES] = {
        {{&default_enqueue_short, &default_enqueue_short, "def-ex-short"},
         {&default_enqueue_long, &default_enqueue_long, "def-ex-long"}},
        {{&resolver_enqueue_short, &resolver_enqueue_short, "res-ex-short"},
         {&resolver_enqueue_long, &resolver_enqueue_long, "res-ex-long"}}};

static grpc_closure_scheduler
    schedulers_[GRPC_NUM_EXECUTORS][GRPC_NUM_EXECUTOR_JOB_TYPES] = {
        {{&vtables_[GRPC_DEFAULT_EXECUTOR][GRPC_EXECUTOR_SHORT]},
         {&vtables_[GRPC_DEFAULT_EXECUTOR][GRPC_EXECUTOR_LONG]}},
        {{&vtables_[GRPC_RESOLVER_EXECUTOR][GRPC_EXECUTOR_SHORT]},
         {&vtables_[GRPC_RESOLVER_EXECUTOR][GRPC_EXECUTOR_LONG]}}};

// grpc_executor_init() and grpc_executor_shutdown() are called in the
// grpc_init() and grpc_shutdown() code paths, which are protected by a global
// mutex, so it is okay to assume that these functions are thread-safe.
void grpc_executor_init() {
  EXECUTOR_TRACE0("grpc_executor_init() enter");

  // Return if grpc_executor_init() was already called earlier
  if (executors[GRPC_DEFAULT_EXECUTOR] != nullptr) {
    GPR_ASSERT(executors[GRPC_RESOLVER_EXECUTOR] != nullptr);
    return;
  }

  executors[GRPC_DEFAULT_EXECUTOR] =
      grpc_core::New<GrpcExecutor>("default-executor");
  executors[GRPC_RESOLVER_EXECUTOR] =
      grpc_core::New<GrpcExecutor>("resolver-executor");

  executors[GRPC_DEFAULT_EXECUTOR]->Init();
  executors[GRPC_RESOLVER_EXECUTOR]->Init();

  EXECUTOR_TRACE0("grpc_executor_init() done");
}

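// Illustrative usage (not part of this file): a caller would typically fetch a
// scheduler here and attach it to a closure, e.g.
//
//   GRPC_CLOSURE_INIT(&closure, my_cb, my_arg,
//                     grpc_executor_scheduler(GRPC_EXECUTOR_SHORT));
//   GRPC_CLOSURE_SCHED(&closure, GRPC_ERROR_NONE);
//
// where 'my_cb' and 'my_arg' are hypothetical caller-provided values.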
grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorType executor_type,
                                                GrpcExecutorJobType job_type) {
  return &schedulers_[executor_type][job_type];
}

grpc_closure_scheduler* grpc_executor_scheduler(GrpcExecutorJobType job_type) {
  return grpc_executor_scheduler(GRPC_DEFAULT_EXECUTOR, job_type);
}

void grpc_executor_shutdown() {
  EXECUTOR_TRACE0("grpc_executor_shutdown() enter");

  // Return if grpc_executor_shutdown() was already called earlier
  if (executors[GRPC_DEFAULT_EXECUTOR] == nullptr) {
    GPR_ASSERT(executors[GRPC_RESOLVER_EXECUTOR] == nullptr);
    return;
  }

  executors[GRPC_DEFAULT_EXECUTOR]->Shutdown();
  executors[GRPC_RESOLVER_EXECUTOR]->Shutdown();

  // Delete the executor objects.
  //
  // NOTE: It is important to call Shutdown() on all executors first before
  // calling Delete() because it is possible for one executor (that is not
  // shutdown yet) to call Enqueue() on a different executor which is already
  // shutdown. This is legal and in such cases, the Enqueue() operation
  // effectively "fails" and enqueues that closure on the calling thread's
  // exec_ctx.
  //
  // By ensuring that all executors are shutdown first, we are also ensuring
  // that no thread is active across all executors.

  grpc_core::Delete<GrpcExecutor>(executors[GRPC_DEFAULT_EXECUTOR]);
  grpc_core::Delete<GrpcExecutor>(executors[GRPC_RESOLVER_EXECUTOR]);
  executors[GRPC_DEFAULT_EXECUTOR] = nullptr;
  executors[GRPC_RESOLVER_EXECUTOR] = nullptr;

  EXECUTOR_TRACE0("grpc_executor_shutdown() done");
}

bool grpc_executor_is_threaded(GrpcExecutorType executor_type) {
  GPR_ASSERT(executor_type < GRPC_NUM_EXECUTORS);
  return executors[executor_type]->IsThreaded();
}

bool grpc_executor_is_threaded() {
  return grpc_executor_is_threaded(GRPC_DEFAULT_EXECUTOR);
}

void grpc_executor_set_threading(bool enable) {
  EXECUTOR_TRACE("grpc_executor_set_threading(%d) called", enable);
  for (int i = 0; i < GRPC_NUM_EXECUTORS; i++) {
    executors[i]->SetThreading(enable);
  }
}