//
//
// Copyright 2015 gRPC authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
//

#include "src/core/lib/iomgr/executor.h"

#include <grpc/support/alloc.h>
#include <grpc/support/cpu.h>
#include <grpc/support/port_platform.h>
#include <grpc/support/sync.h>
#include <string.h>

#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/strings/str_format.h"
#include "src/core/lib/debug/trace_impl.h"
#include "src/core/lib/iomgr/exec_ctx.h"
#include "src/core/lib/iomgr/iomgr_internal.h"
#include "src/core/util/crash.h"
#include "src/core/util/memory.h"
#include "src/core/util/useful.h"

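// Per-thread queue-depth threshold: once a thread has more than MAX_DEPTH
// closures pending, Enqueue() takes that as a hint to spawn another thread.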
#define MAX_DEPTH 2

namespace grpc_core {
namespace {

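// Points to the ThreadState of the executor thread currently running on this
// thread (nullptr otherwise), letting Enqueue() detect calls made from an
// executor thread.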
thread_local ThreadState* g_this_thread_state;

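// Global executor instances, indexed by ExecutorType and created by
// Executor::InitAll().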
Executor* executors[static_cast<size_t>(ExecutorType::NUM_EXECUTORS)];

void default_enqueue_short(grpc_closure* closure, grpc_error_handle error) {
  executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Enqueue(
      closure, error, true /* is_short */);
}

void default_enqueue_long(grpc_closure* closure, grpc_error_handle error) {
  executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Enqueue(
      closure, error, false /* is_short */);
}

void resolver_enqueue_short(grpc_closure* closure, grpc_error_handle error) {
  executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Enqueue(
      closure, error, true /* is_short */);
}

void resolver_enqueue_long(grpc_closure* closure, grpc_error_handle error) {
  executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Enqueue(
      closure, error, false /* is_short */);
}

using EnqueueFunc = void (*)(grpc_closure* closure, grpc_error_handle error);

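// Dispatch table used by Executor::Run(): the first index is the executor
// type (DEFAULT or RESOLVER), the second is the job type (short or long).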
const EnqueueFunc
    executor_enqueue_fns_[static_cast<size_t>(ExecutorType::NUM_EXECUTORS)]
                         [static_cast<size_t>(ExecutorJobType::NUM_JOB_TYPES)] =
                             {{default_enqueue_short, default_enqueue_long},
                              {resolver_enqueue_short, resolver_enqueue_long}};

}  // namespace

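// Threads are created lazily: SetThreading(true) starts a single thread, and
// Enqueue() may add more, up to max_threads_ (twice the core count, at least
// one).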
Executor::Executor(const char* name) : name_(name) {
  adding_thread_lock_ = GPR_SPINLOCK_STATIC_INITIALIZER;
  gpr_atm_rel_store(&num_threads_, 0);
  max_threads_ = std::max(1u, 2 * gpr_cpu_num_cores());
}

void Executor::Init() { SetThreading(true); }

size_t Executor::RunClosures(const char* executor_name,
                             grpc_closure_list list) {
  size_t n = 0;

  // In the executor, the ExecCtx for the thread is declared in the executor
  // thread itself, but this is the point where we could start seeing
  // application-level callbacks. No need to create a new ExecCtx, though,
  // since there already is one and it is flushed (but not destructed) in this
  // function itself. The ApplicationCallbackExecCtx will have its callbacks
  // invoked on its destruction, which will be after completing any closures in
  // the executor's closure list (which were explicitly scheduled onto the
  // executor).
  ApplicationCallbackExecCtx callback_exec_ctx(
      GRPC_APP_CALLBACK_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);

  grpc_closure* c = list.head;
  while (c != nullptr) {
    grpc_closure* next = c->next_data.next;
#ifndef NDEBUG
    GRPC_TRACE_LOG(executor, INFO)
        << "EXECUTOR (" << executor_name << ") run " << c << " [created by "
        << c->file_created << ":" << c->line_created << "]";
    c->scheduled = false;
#else
    GRPC_TRACE_LOG(executor, INFO)
        << "EXECUTOR (" << executor_name << ") run " << c;
#endif
    grpc_error_handle error =
        internal::StatusMoveFromHeapPtr(c->error_data.error);
    c->error_data.error = 0;
    c->cb(c->cb_arg, std::move(error));
    c = next;
    n++;
    ExecCtx::Get()->Flush();
  }

  return n;
}

bool Executor::IsThreaded() const {
  return gpr_atm_acq_load(&num_threads_) > 0;
}

void Executor::SetThreading(bool threading) {
  gpr_atm curr_num_threads = gpr_atm_acq_load(&num_threads_);
  GRPC_TRACE_LOG(executor, INFO)
      << "EXECUTOR (" << name_ << ") SetThreading(" << threading << ") begin";

  if (threading) {
    if (curr_num_threads > 0) {
      GRPC_TRACE_LOG(executor, INFO)
          << "EXECUTOR (" << name_
          << ") SetThreading(true). curr_num_threads > 0";
      return;
    }

    CHECK_EQ(num_threads_, 0);
    gpr_atm_rel_store(&num_threads_, 1);
    thd_state_ = static_cast<ThreadState*>(
        gpr_zalloc(sizeof(ThreadState) * max_threads_));

    for (size_t i = 0; i < max_threads_; i++) {
      gpr_mu_init(&thd_state_[i].mu);
      gpr_cv_init(&thd_state_[i].cv);
      thd_state_[i].id = i;
      thd_state_[i].name = name_;
      thd_state_[i].thd = Thread();
      thd_state_[i].elems = GRPC_CLOSURE_LIST_INIT;
    }

    thd_state_[0].thd = Thread(name_, &Executor::ThreadMain, &thd_state_[0]);
    thd_state_[0].thd.Start();
  } else {  // !threading
    if (curr_num_threads == 0) {
      GRPC_TRACE_LOG(executor, INFO)
          << "EXECUTOR (" << name_
          << ") SetThreading(false). curr_num_threads == 0";
      return;
    }

    for (size_t i = 0; i < max_threads_; i++) {
      gpr_mu_lock(&thd_state_[i].mu);
      thd_state_[i].shutdown = true;
      gpr_cv_signal(&thd_state_[i].cv);
      gpr_mu_unlock(&thd_state_[i].mu);
    }

    // Ensure no thread is in the middle of adding a new thread. Once we are
    // past this point, no thread will try to add a new one either (since
    // shutdown is true).
    gpr_spinlock_lock(&adding_thread_lock_);
    gpr_spinlock_unlock(&adding_thread_lock_);

    curr_num_threads = gpr_atm_no_barrier_load(&num_threads_);
    for (gpr_atm i = 0; i < curr_num_threads; i++) {
      thd_state_[i].thd.Join();
      GRPC_TRACE_LOG(executor, INFO)
          << "EXECUTOR (" << name_ << ") Thread " << i + 1 << " of "
          << curr_num_threads << " joined";
    }

    gpr_atm_rel_store(&num_threads_, 0);
    for (size_t i = 0; i < max_threads_; i++) {
      gpr_mu_destroy(&thd_state_[i].mu);
      gpr_cv_destroy(&thd_state_[i].cv);
      RunClosures(thd_state_[i].name, thd_state_[i].elems);
    }

    gpr_free(thd_state_);

    // grpc_iomgr_shutdown_background_closure() will close all the registered
    // fds in the background poller and wait for all pending closures to
    // finish. Thus, never call Executor::SetThreading(false) in the middle of
    // an application.
    // TODO(guantaol): create another method to finish all the pending closures
    // registered in the background poller by Executor.
    grpc_iomgr_platform_shutdown_background_closure();
  }

  GRPC_TRACE_LOG(executor, INFO)
      << "EXECUTOR (" << name_ << ") SetThreading(" << threading << ") done";
}

void Executor::Shutdown() { SetThreading(false); }

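// Main loop of each executor thread: wait for closures to arrive (or for
// shutdown), swap the queue out under the lock, then run the closures with
// the lock released.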
void Executor::ThreadMain(void* arg) {
  ThreadState* ts = static_cast<ThreadState*>(arg);
  g_this_thread_state = ts;

  ExecCtx exec_ctx(GRPC_EXEC_CTX_FLAG_IS_INTERNAL_THREAD);

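  // Number of closures executed in the previous loop iteration; subtracted
  // from ts->depth (which Enqueue() increments) at the top of each iteration.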
  size_t subtract_depth = 0;
  for (;;) {
    GRPC_TRACE_LOG(executor, INFO)
        << "EXECUTOR (" << ts->name << ") [" << ts->id
        << "]: step (sub_depth=" << subtract_depth << ")";

    gpr_mu_lock(&ts->mu);
    ts->depth -= subtract_depth;
    // Wait for closures to be enqueued or for the executor to be shut down
    while (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
      ts->queued_long_job = false;
      gpr_cv_wait(&ts->cv, &ts->mu, gpr_inf_future(GPR_CLOCK_MONOTONIC));
    }

    if (ts->shutdown) {
      GRPC_TRACE_LOG(executor, INFO)
          << "EXECUTOR (" << ts->name << ") [" << ts->id << "]: shutdown";
      gpr_mu_unlock(&ts->mu);
      break;
    }

    grpc_closure_list closures = ts->elems;
    ts->elems = GRPC_CLOSURE_LIST_INIT;
    gpr_mu_unlock(&ts->mu);

    GRPC_TRACE_LOG(executor, INFO)
        << "EXECUTOR (" << ts->name << ") [" << ts->id << "]: execute";

    ExecCtx::Get()->InvalidateNow();
    subtract_depth = RunClosures(ts->name, closures);
  }

  g_this_thread_state = nullptr;
}

void Executor::Enqueue(grpc_closure* closure, grpc_error_handle error,
                       bool is_short) {
  bool retry_push;

  do {
    retry_push = false;
    size_t cur_thread_count =
        static_cast<size_t>(gpr_atm_acq_load(&num_threads_));

    // If the number of threads is zero (i.e. either the executor is not
    // threaded or it has already been shut down), queue the closure on the
    // exec context itself.
    if (cur_thread_count == 0) {
#ifndef NDEBUG
      GRPC_TRACE_LOG(executor, INFO)
          << "EXECUTOR (" << name_ << ") schedule " << closure << " (created "
          << closure->file_created << ":" << closure->line_created
          << ") inline";
#else
      GRPC_TRACE_LOG(executor, INFO)
          << "EXECUTOR (" << name_ << ") schedule " << closure << " inline";
#endif
      grpc_closure_list_append(ExecCtx::Get()->closure_list(), closure, error);
      return;
    }

    if (grpc_iomgr_platform_add_closure_to_background_poller(closure, error)) {
      return;
    }

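    // If called from an executor thread, prefer that thread's own queue;
    // otherwise pick a starting thread by hashing the current ExecCtx pointer
    // across the live threads.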
    ThreadState* ts = g_this_thread_state;
    if (ts == nullptr) {
      ts = &thd_state_[HashPointer(ExecCtx::Get(), cur_thread_count)];
    }

    ThreadState* orig_ts = ts;
    bool try_new_thread = false;

    for (;;) {
#ifndef NDEBUG
      GRPC_TRACE_LOG(executor, INFO)
          << "EXECUTOR (" << name_ << ") try to schedule " << closure << " ("
          << (is_short ? "short" : "long") << ") (created "
          << closure->file_created << ":" << closure->line_created
          << ") to thread " << ts->id;
#else
      GRPC_TRACE_LOG(executor, INFO)
          << "EXECUTOR (" << name_ << ") try to schedule " << closure << " ("
          << (is_short ? "short" : "long") << ") to thread " << ts->id;
#endif

      gpr_mu_lock(&ts->mu);
      if (ts->queued_long_job) {
        // If there's a long job queued, we never queue anything else to this
        // queue (since long jobs can take 'infinite' time and we need to
        // guarantee no starvation). Spin through the queues and try again.
        gpr_mu_unlock(&ts->mu);
        size_t idx = ts->id;
        ts = &thd_state_[(idx + 1) % cur_thread_count];
        if (ts == orig_ts) {
          // We cycled through all the threads. Retry the enqueue by creating
          // a new thread.
          //
          // TODO (sreek): There is a potential issue here. We are
          // unconditionally setting try_new_thread to true here. What if the
          // executor is shutdown OR if cur_thread_count is already equal to
          // max_threads?
          // (Fortunately, this is not an issue yet (as of July 2018) because
          // there is only one instance of a long job in gRPC and hence we
          // will not hit this code path.)
          retry_push = true;
          try_new_thread = true;
          break;
        }

        continue;  // Try the next thread-state
      }

      // == Found the thread state (i.e. thread) to enqueue this closure! ==

      // Also, if this thread has been waiting for closures, wake it up.
      // - If grpc_closure_list_empty() is true and the Executor is not
      //   shutdown, it means that the thread must be waiting in ThreadMain()
      // - Note that gpr_cv_signal() won't immediately wake up the thread. That
      //   happens after we release the mutex &ts->mu a few lines below
      if (grpc_closure_list_empty(ts->elems) && !ts->shutdown) {
        gpr_cv_signal(&ts->cv);
      }

      grpc_closure_list_append(&ts->elems, closure, error);

      // If we have already queued more than MAX_DEPTH closures on this
      // thread, use that as a hint to create more threads
      ts->depth++;
      try_new_thread = ts->depth > MAX_DEPTH &&
                       cur_thread_count < max_threads_ && !ts->shutdown;

      ts->queued_long_job = !is_short;

      gpr_mu_unlock(&ts->mu);
      break;
    }

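    // Spawn a new thread if the depth heuristic (or a fully-cycled scan
    // above) asked for one. The trylock keeps Enqueue() non-blocking: if
    // another thread is already adding a thread, we simply skip it.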
    if (try_new_thread && gpr_spinlock_trylock(&adding_thread_lock_)) {
      cur_thread_count = static_cast<size_t>(gpr_atm_acq_load(&num_threads_));
      if (cur_thread_count < max_threads_) {
        // Increment num_threads (safe to do a store instead of a cas because
        // we always increment num_threads under the 'adding_thread_lock')
        gpr_atm_rel_store(&num_threads_, cur_thread_count + 1);

        thd_state_[cur_thread_count].thd =
            Thread(name_, &Executor::ThreadMain, &thd_state_[cur_thread_count]);
        thd_state_[cur_thread_count].thd.Start();
      }
      gpr_spinlock_unlock(&adding_thread_lock_);
    }
  } while (retry_push);
}

// Executor::InitAll() and Executor::ShutdownAll() are called in the
// grpc_init() and grpc_shutdown() code paths, which are protected by a global
// mutex, so it is okay to assume that these functions are thread-safe.
void Executor::InitAll() {
  GRPC_TRACE_LOG(executor, INFO) << "Executor::InitAll() enter";

  // Return if Executor::InitAll() was already called earlier
  if (executors[static_cast<size_t>(ExecutorType::DEFAULT)] != nullptr) {
    CHECK(executors[static_cast<size_t>(ExecutorType::RESOLVER)] != nullptr);
    return;
  }

  executors[static_cast<size_t>(ExecutorType::DEFAULT)] =
      new Executor("default-executor");
  executors[static_cast<size_t>(ExecutorType::RESOLVER)] =
      new Executor("resolver-executor");

  executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Init();
  executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Init();

  GRPC_TRACE_LOG(executor, INFO) << "Executor::InitAll() done";
}

void Executor::Run(grpc_closure* closure, grpc_error_handle error,
                   ExecutorType executor_type, ExecutorJobType job_type) {
  executor_enqueue_fns_[static_cast<size_t>(executor_type)]
                       [static_cast<size_t>(job_type)](closure, error);
}

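// Usage sketch (illustrative only, not part of this file): a caller wanting
// to run a short job on the default executor would typically do something
// like the following; my_cb and my_arg are hypothetical stand-ins for the
// caller's callback and its argument.
//
//   grpc_closure closure;
//   GRPC_CLOSURE_INIT(&closure, my_cb, my_arg, grpc_schedule_on_exec_ctx);
//   Executor::Run(&closure, absl::OkStatus(), ExecutorType::DEFAULT,
//                 ExecutorJobType::SHORT);
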
void Executor::ShutdownAll() {
  GRPC_TRACE_LOG(executor, INFO) << "Executor::ShutdownAll() enter";

  // Return if Executor::ShutdownAll() was already called earlier
  if (executors[static_cast<size_t>(ExecutorType::DEFAULT)] == nullptr) {
    CHECK(executors[static_cast<size_t>(ExecutorType::RESOLVER)] == nullptr);
    return;
  }

  executors[static_cast<size_t>(ExecutorType::DEFAULT)]->Shutdown();
  executors[static_cast<size_t>(ExecutorType::RESOLVER)]->Shutdown();

  // Delete the executor objects.
  //
  // NOTE: It is important to call Shutdown() on all executors before calling
  // delete, because it is possible for one executor (that is not shut down
  // yet) to call Enqueue() on a different executor which is already shut
  // down. This is legal; in such cases, the Enqueue() operation effectively
  // "fails" and enqueues that closure on the calling thread's exec_ctx.
  //
  // By ensuring that all executors are shut down first, we are also ensuring
  // that no thread is active across all executors.

  delete executors[static_cast<size_t>(ExecutorType::DEFAULT)];
  delete executors[static_cast<size_t>(ExecutorType::RESOLVER)];
  executors[static_cast<size_t>(ExecutorType::DEFAULT)] = nullptr;
  executors[static_cast<size_t>(ExecutorType::RESOLVER)] = nullptr;

  GRPC_TRACE_LOG(executor, INFO) << "Executor::ShutdownAll() done";
}

bool Executor::IsThreaded(ExecutorType executor_type) {
  CHECK(executor_type < ExecutorType::NUM_EXECUTORS);
  return executors[static_cast<size_t>(executor_type)]->IsThreaded();
}

bool Executor::IsThreadedDefault() {
  return Executor::IsThreaded(ExecutorType::DEFAULT);
}

void Executor::SetThreadingAll(bool enable) {
  GRPC_TRACE_LOG(executor, INFO)
      << "EXECUTOR Executor::SetThreadingAll(" << enable << ") called";
  for (size_t i = 0; i < static_cast<size_t>(ExecutorType::NUM_EXECUTORS);
       i++) {
    executors[i]->SetThreading(enable);
  }
}

void Executor::SetThreadingDefault(bool enable) {
  GRPC_TRACE_LOG(executor, INFO)
      << "EXECUTOR Executor::SetThreadingDefault(" << enable << ") called";
  executors[static_cast<size_t>(ExecutorType::DEFAULT)]->SetThreading(enable);
}

}  // namespace grpc_core