1 //
2 //
3 // Copyright 2015 gRPC authors.
4 //
5 // Licensed under the Apache License, Version 2.0 (the "License");
6 // you may not use this file except in compliance with the License.
7 // You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing, software
12 // distributed under the License is distributed on an "AS IS" BASIS,
13 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 // See the License for the specific language governing permissions and
15 // limitations under the License.
16 //
17 //
18
19 #include "src/core/lib/iomgr/executor.h"
20
21 #include <grpc/support/alloc.h>
22 #include <grpc/support/cpu.h>
23 #include <grpc/support/port_platform.h>
24 #include <grpc/support/sync.h>
25 #include <string.h>
26
27 #include "absl/log/check.h"
28 #include "absl/log/log.h"
29 #include "absl/strings/str_format.h"
30 #include "src/core/lib/debug/trace_impl.h"
31 #include "src/core/lib/iomgr/exec_ctx.h"
32 #include "src/core/lib/iomgr/iomgr_internal.h"
33 #include "src/core/util/crash.h"
34 #include "src/core/util/memory.h"
35 #include "src/core/util/useful.h"
36
37 #define MAX_DEPTH 2
38
39 namespace grpc_core {
40 namespace {
41
42 thread_local ThreadState* g_this_thread_state;
43
44 Executor* executors[static_cast<size_t>(ExecutorType::NUM_EXECUTORS)];
45
default_enqueue_short(grpc_closure * closure,grpc_error_handle error)46 void default_enqueue_short(grpc_closure* closure, grpc_error_handle error) {
47 executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Enqueue(
48 closure, error, true /* is_short */);
49 }
50
default_enqueue_long(grpc_closure * closure,grpc_error_handle error)51 void default_enqueue_long(grpc_closure* closure, grpc_error_handle error) {
52 executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Enqueue(
53 closure, error, false /* is_short */);
54 }
55
resolver_enqueue_short(grpc_closure * closure,grpc_error_handle error)56 void resolver_enqueue_short(grpc_closure* closure, grpc_error_handle error) {
57 executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Enqueue(
58 closure, error, true /* is_short */);
59 }
60
resolver_enqueue_long(grpc_closure * closure,grpc_error_handle error)61 void resolver_enqueue_long(grpc_closure* closure, grpc_error_handle error) {
62 executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Enqueue(
63 closure, error, false /* is_short */);
64 }
65
66 using EnqueueFunc = void (*)(grpc_closure* closure, grpc_error_handle error);
67
68 const EnqueueFunc
69 executor_enqueue_fns_[static_cast<size_t>(ExecutorType::NUM_EXECUTORS)]
70 [static_cast<size_t>(ExecutorJobType::NUM_JOB_TYPES)] =
71 {{default_enqueue_short, default_enqueue_long},
72 {resolver_enqueue_short, resolver_enqueue_long}};
73
74 } // namespace
75
Executor(const char * name)76 Executor::Executor(const char* name) : name_(name) {
77 adding_thread_lock_ = GPR_SPINLOCK_STATIC_INITIALIZER;
78 gpr_atm_rel_store(&num_threads_, 0);
79 max_threads_ = std::max(1u, 2 * gpr_cpu_num_cores());
80 }
81
Init()82 void Executor::Init() { SetThreading(true); }
83
RunClosures(const char * executor_name,grpc_closure_list list)84 size_t Executor::RunClosures(const char* executor_name,
85 grpc_closure_list list) {
86 size_t n = 0;
87
88 // In the executor, the ExecCtx for the thread is declared in the executor
89 // thread itself, but this is the point where we could start seeing
90 // application-level callbacks. No need to create a new ExecCtx, though,
91 // since there already is one and it is flushed (but not destructed) in this
92 // function itself. The ApplicationCallbackExecCtx will have its callbacks
93 // invoked on its destruction, which will be after completing any closures in
94 // the executor's closure list (which were explicitly scheduled onto the
95 // executor).
96 ApplicationCallbackExecCtx callback_exec_ctx(
97 GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);
98
99 grpc_closure* c = list.head;
100 while (c != nullptr) {
101 grpc_closure* next = c->next_data.next;
102 #ifndef NDEBUG
103 GRPC_TRACE_LOG(executor, INFO)
104 << "EXECUTOR (" << executor_name << ") run " << c << " [created by "
105 << c->file_created << ":" << c->line_created << "]";
106 c->scheduled = false;
107 #else
108 GRPC_TRACE_LOG(executor, INFO)
109 << "EXECUTOR (" << executor_name << ") run " << c;
110 #endif
111 grpc_error_handle error =
112 internal::StatusMoveFromHeapPtr(c->error_data.error);
113 c->error_data.error = 0;
114 c->cb(c->cb_arg, std::move(error));
115 c = next;
116 n++;
117 ExecCtx::Get()->Flush();
118 }
119
120 return n;
121 }
122
IsThreaded() const123 bool Executor::IsThreaded() const {
124 return gpr_atm_acq_load(&num_threads_) > 0;
125 }
126
SetThreading(bool threading)127 void Executor::SetThreading(bool threading) {
128 gpr_atm curr_num_threads = gpr_atm_acq_load(&num_threads_);
129 GRPC_TRACE_LOG(executor, INFO)
130 << "EXECUTOR (" << name_ << ") SetThreading(" << threading << ") begin";
131
132 if (threading) {
133 if (curr_num_threads > 0) {
134 GRPC_TRACE_LOG(executor, INFO)
135 << "EXECUTOR (" << name_
136 << ") SetThreading(true). curr_num_threads > 0";
137 return;
138 }
139
140 CHECK_EQ(num_threads_, 0);
141 gpr_atm_rel_store(&num_threads_, 1);
142 thd_state_ = static_cast<ThreadState*>(
143 gpr_zalloc(sizeof(ThreadState) * max_threads_));
144
145 for (size_t i = 0; i < max_threads_; i++) {
146 gpr_mu_init(&thd_state_[i].mu);
147 gpr_cv_init(&thd_state_[i].cv);
148 thd_state_[i].id = i;
149 thd_state_[i].name = name_;
150 thd_state_[i].thd = Thread();
151 thd_state_[i].elems = GRPC_CLOSURE_LIST_INIT;
152 }
153
154 thd_state_[0].thd = Thread(name_, &Executor::ThreadMain, &thd_state_[0]);
155 thd_state_[0].thd.Start();
156 } else { // !threading
157 if (curr_num_threads == 0) {
158 GRPC_TRACE_LOG(executor, INFO)
159 << "EXECUTOR (" << name_
160 << ") SetThreading(false). curr_num_threads == 0";
161 return;
162 }
163
164 for (size_t i = 0; i < max_threads_; i++) {
165 gpr_mu_lock(&thd_state_[i].mu);
166 thd_state_[i].shutdown = true;
167 gpr_cv_signal(&thd_state_[i].cv);
168 gpr_mu_unlock(&thd_state_[i].mu);
169 }
170
171 // Ensure no thread is adding a new thread. Once this is past, then no
172 // thread will try to add a new one either (since shutdown is true)
173 gpr_spinlock_lock(&adding_thread_lock_);
174 gpr_spinlock_unlock(&adding_thread_lock_);
175
176 curr_num_threads = gpr_atm_no_barrier_load(&num_threads_);
177 for (gpr_atm i = 0; i < curr_num_threads; i++) {
178 thd_state_[i].thd.Join();
179 GRPC_TRACE_LOG(executor, INFO)
180 << "EXECUTOR (" << name_ << ") Thread " << i + 1 << " of "
181 << curr_num_threads << " joined";
182 }
183
184 gpr_atm_rel_store(&num_threads_, 0);
185 for (size_t i = 0; i < max_threads_; i++) {
186 gpr_mu_destroy(&thd_state_[i].mu);
187 gpr_cv_destroy(&thd_state_[i].cv);
188 RunClosures(thd_state_[i].name, thd_state_[i].elems);
189 }
190
191 gpr_free(thd_state_);
192
193 // grpc_iomgr_shutdown_background_closure() will close all the registered
194 // fds in the background poller, and wait for all pending closures to
195 // finish. Thus, never call Executor::SetThreading(false) in the middle of
196 // an application.
197 // TODO(guantaol): create another method to finish all the pending closures
198 // registered in the background poller by Executor.
199 grpc_iomgr_platform_shutdown_background_closure();
200 }
201
202 GRPC_TRACE_LOG(executor, INFO)
203 << "EXECUTOR (" << name_ << ") SetThreading(" << threading << ") done";
204 }
205
Shutdown()206 void Executor::Shutdown() { SetThreading(false); }
207
ThreadMain(void * arg)208 void Executor::ThreadMain(void* arg) {
209 ThreadState* ts = static_cast<ThreadState*>(arg);
210 g_this_thread_state = ts;
211
212 ExecCtx exec_ctx(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);
213
214 size_t subtract_depth = 0;
215 for (;;) {
216 GRPC_TRACE_LOG(executor, INFO)
217 << "EXECUTOR (" << ts->name << ") [" << ts->id
218 << "]: step (sub_depth=" << subtract_depth << ")";
219
220 gpr_mu_lock(&ts->mu);
221 ts->depth -= subtract_depth;
222 // Wait for closures to be enqueued or for the executor to be shutdown
223 while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
224 ts->queued_long_job = false;
225 gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
226 }
227
228 if (ts->shutdown) {
229 GRPC_TRACE_LOG(executor, INFO)
230 << "EXECUTOR (" << ts->name << ") [" << ts->id << "]: shutdown";
231 gpr_mu_unlock(&ts->mu);
232 break;
233 }
234
235 grpc_closure_list closures = ts->elems;
236 ts->elems = GRPC_CLOSURE_LIST_INIT;
237 gpr_mu_unlock(&ts->mu);
238
239 GRPC_TRACE_LOG(executor, INFO)
240 << "EXECUTOR (" << ts->name << ") [" << ts->id << "]: execute";
241
242 ExecCtx::Get()->InvalidateNow();
243 subtract_depth = RunClosures(ts->name, closures);
244 }
245
246 g_this_thread_state = nullptr;
247 }
248
Enqueue(grpc_closure * closure,grpc_error_handle error,bool is_short)249 void Executor::Enqueue(grpc_closure* closure, grpc_error_handle error,
250 bool is_short) {
251 bool retry_push;
252
253 do {
254 retry_push = false;
255 size_t cur_thread_count =
256 static_cast<size_t>(gpr_atm_acq_load(&num_threads_));
257
258 // If the number of threads is zero(i.e either the executor is not threaded
259 // or already shutdown), then queue the closure on the exec context itself
260 if (cur_thread_count == 0) {
261 #ifndef NDEBUG
262 GRPC_TRACE_LOG(executor, INFO)
263 << "EXECUTOR (" << name_ << ") schedule " << closure << " (created "
264 << closure->file_created << ":" << closure->line_created
265 << ") inline";
266 #else
267 GRPC_TRACE_LOG(executor, INFO)
268 << "EXECUTOR (" << name_ << ") schedule " << closure << " inline";
269 #endif
270 grpc_closure_list_append(ExecCtx::Get()->closure_list(), closure, error);
271 return;
272 }
273
274 if (grpc_iomgr_platform_add_closure_to_background_poller(closure, error)) {
275 return;
276 }
277
278 ThreadState* ts = g_this_thread_state;
279 if (ts == nullptr) {
280 ts = &thd_state_[HashPointer(ExecCtx::Get(), cur_thread_count)];
281 }
282
283 ThreadState* orig_ts = ts;
284 bool try_new_thread = false;
285
286 for (;;) {
287 #ifndef NDEBUG
288 GRPC_TRACE_LOG(executor, INFO)
289 << "EXECUTOR (" << name_ << ") try to schedule " << closure << " ("
290 << (is_short ? "short" : "long") << ") (created "
291 << closure->file_created << ":" << closure->line_created
292 << ") to thread " << ts->id;
293 #else
294 GRPC_TRACE_LOG(executor, INFO)
295 << "EXECUTOR (" << name_ << ") try to schedule " << closure << " ("
296 << (is_short ? "short" : "long") << ") to thread " << ts->id;
297 #endif
298
299 gpr_mu_lock(&ts->mu);
300 if (ts->queued_long_job) {
301 // if there's a long job queued, we never queue anything else to this
302 // queue (since long jobs can take 'infinite' time and we need to
303 // guarantee no starvation). Spin through queues and try again
304 gpr_mu_unlock(&ts->mu);
305 size_t idx = ts->id;
306 ts = &thd_state_[(idx + 1) % cur_thread_count];
307 if (ts == orig_ts) {
308 // We cycled through all the threads. Retry enqueue again by creating
309 // a new thread
310 //
311 // TODO (sreek): There is a potential issue here. We are
312 // unconditionally setting try_new_thread to true here. What if the
313 // executor is shutdown OR if cur_thread_count is already equal to
314 // max_threads ?
315 // (Fortunately, this is not an issue yet (as of july 2018) because
316 // there is only one instance of long job in gRPC and hence we will
317 // not hit this code path)
318 retry_push = true;
319 try_new_thread = true;
320 break;
321 }
322
323 continue; // Try the next thread-state
324 }
325
326 // == Found the thread state (i.e thread) to enqueue this closure! ==
327
328 // Also, if this thread has been waiting for closures, wake it up.
329 // - If grpc_closure_list_empty() is true and the Executor is not
330 // shutdown, it means that the thread must be waiting in ThreadMain()
331 // - Note that gpr_cv_signal() won't immediately wakeup the thread. That
332 // happens after we release the mutex &ts->mu a few lines below
333 if (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
334 gpr_cv_signal(&ts->cv);
335 }
336
337 grpc_closure_list_append(&ts->elems, closure, error);
338
339 // If we already queued more than MAX_DEPTH number of closures on this
340 // thread, use this as a hint to create more threads
341 ts->depth++;
342 try_new_thread = ts->depth > MAX_DEPTH &&
343 cur_thread_count < max_threads_ && !ts->shutdown;
344
345 ts->queued_long_job = !is_short;
346
347 gpr_mu_unlock(&ts->mu);
348 break;
349 }
350
351 if (try_new_thread && gpr_spinlock_trylock(&adding_thread_lock_)) {
352 cur_thread_count = static_cast<size_t>(gpr_atm_acq_load(&num_threads_));
353 if (cur_thread_count < max_threads_) {
354 // Increment num_threads (safe to do a store instead of a cas because we
355 // always increment num_threads under the 'adding_thread_lock')
356 gpr_atm_rel_store(&num_threads_, cur_thread_count + 1);
357
358 thd_state_[cur_thread_count].thd =
359 Thread(name_, &Executor::ThreadMain, &thd_state_[cur_thread_count]);
360 thd_state_[cur_thread_count].thd.Start();
361 }
362 gpr_spinlock_unlock(&adding_thread_lock_);
363 }
364 } while (retry_push);
365 }
366
367 // Executor::InitAll() and Executor::ShutdownAll() functions are called in the
368 // the grpc_init() and grpc_shutdown() code paths which are protected by a
369 // global mutex. So it is okay to assume that these functions are thread-safe
InitAll()370 void Executor::InitAll() {
371 GRPC_TRACE_LOG(executor, INFO) << "Executor::InitAll() enter";
372
373 // Return if Executor::InitAll() is already called earlier
374 if (executors[static_cast<size_t>(ExecutorType::DEFAULT)] != nullptr) {
375 CHECK(executors[static_cast<size_t>(ExecutorType::RESOLVER)] != nullptr);
376 return;
377 }
378
379 executors[static_cast<size_t>(ExecutorType::DEFAULT)] =
380 new Executor("default-executor");
381 executors[static_cast<size_t>(ExecutorType::RESOLVER)] =
382 new Executor("resolver-executor");
383
384 executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Init();
385 executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Init();
386
387 GRPC_TRACE_LOG(executor, INFO) << "Executor::InitAll() done";
388 }
389
Run(grpc_closure * closure,grpc_error_handle error,ExecutorType executor_type,ExecutorJobType job_type)390 void Executor::Run(grpc_closure* closure, grpc_error_handle error,
391 ExecutorType executor_type, ExecutorJobType job_type) {
392 executor_enqueue_fns_[static_cast<size_t>(executor_type)]
393 [static_cast<size_t>(job_type)](closure, error);
394 }
395
ShutdownAll()396 void Executor::ShutdownAll() {
397 GRPC_TRACE_LOG(executor, INFO) << "Executor::ShutdownAll() enter";
398
399 // Return if Executor:SshutdownAll() is already called earlier
400 if (executors[static_cast<size_t>(ExecutorType::DEFAULT)] == nullptr) {
401 CHECK(executors[static_cast<size_t>(ExecutorType::RESOLVER)] == nullptr);
402 return;
403 }
404
405 executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Shutdown();
406 executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Shutdown();
407
408 // Delete the executor objects.
409 //
410 // NOTE: It is important to call Shutdown() on all executors first before
411 // calling delete because it is possible for one executor (that is not
412 // shutdown yet) to call Enqueue() on a different executor which is already
413 // shutdown. This is legal and in such cases, the Enqueue() operation
414 // effectively "fails" and enqueues that closure on the calling thread's
415 // exec_ctx.
416 //
417 // By ensuring that all executors are shutdown first, we are also ensuring
418 // that no thread is active across all executors.
419
420 delete executors[static_cast<size_t>(ExecutorType::DEFAULT)];
421 delete executors[static_cast<size_t>(ExecutorType::RESOLVER)];
422 executors[static_cast<size_t>(ExecutorType::DEFAULT)] = nullptr;
423 executors[static_cast<size_t>(ExecutorType::RESOLVER)] = nullptr;
424
425 GRPC_TRACE_LOG(executor, INFO) << "Executor::ShutdownAll() done";
426 }
427
IsThreaded(ExecutorType executor_type)428 bool Executor::IsThreaded(ExecutorType executor_type) {
429 CHECK(executor_type < ExecutorType::NUM_EXECUTORS);
430 return executors[static_cast<size_t>(executor_type)]->IsThreaded();
431 }
432
IsThreadedDefault()433 bool Executor::IsThreadedDefault() {
434 return Executor::IsThreaded(ExecutorType::DEFAULT);
435 }
436
SetThreadingAll(bool enable)437 void Executor::SetThreadingAll(bool enable) {
438 GRPC_TRACE_LOG(executor, INFO)
439 << "EXECUTOR Executor::SetThreadingAll(" << enable << ") called";
440 for (size_t i = 0; i < static_cast<size_t>(ExecutorType::NUM_EXECUTORS);
441 i++) {
442 executors[i]->SetThreading(enable);
443 }
444 }
445
SetThreadingDefault(bool enable)446 void Executor::SetThreadingDefault(bool enable) {
447 GRPC_TRACE_LOG(executor, INFO)
448 << "EXECUTOR Executor::SetThreadingDefault(" << enable << ") called";
449 executors[static_cast<size_t>(ExecutorType::DEFAULT)]->SetThreading(enable);
450 }
451
452 } // namespace grpc_core
453