1 /*
2 *
3 * Copyright 2019 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include <grpc/support/port_platform.h>
20
21 #include "src/core/lib/iomgr/executor/threadpool.h"
22
23 namespace grpc_core {
24
Run()25 void ThreadPoolWorker::Run() {
26 while (true) {
27 void* elem;
28
29 if (GRPC_TRACE_FLAG_ENABLED(grpc_thread_pool_trace)) {
30 // Updates stats and print
31 gpr_timespec wait_time = gpr_time_0(GPR_TIMESPAN);
32 elem = queue_->Get(&wait_time);
33 stats_.sleep_time = gpr_time_add(stats_.sleep_time, wait_time);
34 gpr_log(GPR_INFO,
35 "ThreadPool Worker [%s %d] Stats: sleep_time %f",
36 thd_name_, index_, gpr_timespec_to_micros(stats_.sleep_time));
37 } else {
38 elem = queue_->Get(nullptr);
39 }
40 if (elem == nullptr) {
41 break;
42 }
43 // Runs closure
44 auto* closure =
45 static_cast<grpc_experimental_completion_queue_functor*>(elem);
46 closure->functor_run(closure, closure->internal_success);
47 }
48 }
49
SharedThreadPoolConstructor()50 void ThreadPool::SharedThreadPoolConstructor() {
51 // All worker threads in thread pool must be joinable.
52 thread_options_.set_joinable(true);
53
54 // Create at least 1 worker thread.
55 if (num_threads_ <= 0) num_threads_ = 1;
56
57 queue_ = new InfLenFIFOQueue();
58 threads_ = static_cast<ThreadPoolWorker**>(
59 gpr_zalloc(num_threads_ * sizeof(ThreadPoolWorker*)));
60 for (int i = 0; i < num_threads_; ++i) {
61 threads_[i] = new ThreadPoolWorker(thd_name_, queue_, thread_options_, i);
62 threads_[i]->Start();
63 }
64 }
65
DefaultStackSize()66 size_t ThreadPool::DefaultStackSize() {
67 #if defined(__ANDROID__) || defined(__APPLE__)
68 return 1952 * 1024;
69 #else
70 return 64 * 1024;
71 #endif
72 }
73
AssertHasNotBeenShutDown()74 void ThreadPool::AssertHasNotBeenShutDown() {
75 // For debug checking purpose, using RELAXED order is sufficient.
76 GPR_DEBUG_ASSERT(!shut_down_.Load(MemoryOrder::RELAXED));
77 }
78
ThreadPool(int num_threads)79 ThreadPool::ThreadPool(int num_threads) : num_threads_(num_threads) {
80 thd_name_ = "ThreadPoolWorker";
81 thread_options_ = Thread::Options();
82 thread_options_.set_stack_size(DefaultStackSize());
83 SharedThreadPoolConstructor();
84 }
85
ThreadPool(int num_threads,const char * thd_name)86 ThreadPool::ThreadPool(int num_threads, const char* thd_name)
87 : num_threads_(num_threads), thd_name_(thd_name) {
88 thread_options_ = Thread::Options();
89 thread_options_.set_stack_size(DefaultStackSize());
90 SharedThreadPoolConstructor();
91 }
92
ThreadPool(int num_threads,const char * thd_name,const Thread::Options & thread_options)93 ThreadPool::ThreadPool(int num_threads, const char* thd_name,
94 const Thread::Options& thread_options)
95 : num_threads_(num_threads),
96 thd_name_(thd_name),
97 thread_options_(thread_options) {
98 if (thread_options_.stack_size() == 0) {
99 thread_options_.set_stack_size(DefaultStackSize());
100 }
101 SharedThreadPoolConstructor();
102 }
103
~ThreadPool()104 ThreadPool::~ThreadPool() {
105 // For debug checking purpose, using RELAXED order is sufficient.
106 shut_down_.Store(true, MemoryOrder::RELAXED);
107
108 for (int i = 0; i < num_threads_; ++i) {
109 queue_->Put(nullptr);
110 }
111
112 for (int i = 0; i < num_threads_; ++i) {
113 threads_[i]->Join();
114 }
115
116 for (int i = 0; i < num_threads_; ++i) {
117 delete threads_[i];
118 }
119 gpr_free(threads_);
120 delete queue_;
121 }
122
Add(grpc_experimental_completion_queue_functor * closure)123 void ThreadPool::Add(grpc_experimental_completion_queue_functor* closure) {
124 AssertHasNotBeenShutDown();
125 queue_->Put(static_cast<void*>(closure));
126 }
127
num_pending_closures() const128 int ThreadPool::num_pending_closures() const { return queue_->count(); }
129
pool_capacity() const130 int ThreadPool::pool_capacity() const { return num_threads_; }
131
thread_options() const132 const Thread::Options& ThreadPool::thread_options() const {
133 return thread_options_;
134 }
135
thread_name() const136 const char* ThreadPool::thread_name() const { return thd_name_; }
137 } // namespace grpc_core
138