1 /*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19 #include "src/cpp/server/dynamic_thread_pool.h"
20
21 #include <grpc/support/log.h>
22 #include <grpcpp/impl/codegen/sync.h>
23
24 #include "src/core/lib/gprpp/thd.h"
25
26 namespace grpc {
27
DynamicThread(DynamicThreadPool * pool)28 DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool)
29 : pool_(pool),
30 thd_("grpcpp_dynamic_pool",
31 [](void* th) {
32 static_cast<DynamicThreadPool::DynamicThread*>(th)->ThreadFunc();
33 },
34 this) {
35 thd_.Start();
36 }
~DynamicThread()37 DynamicThreadPool::DynamicThread::~DynamicThread() { thd_.Join(); }
38
ThreadFunc()39 void DynamicThreadPool::DynamicThread::ThreadFunc() {
40 pool_->ThreadFunc();
41 // Now that we have killed ourselves, we should reduce the thread count
42 grpc_core::MutexLock lock(&pool_->mu_);
43 pool_->nthreads_--;
44 // Move ourselves to dead list
45 pool_->dead_threads_.push_back(this);
46
47 if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) {
48 pool_->shutdown_cv_.Signal();
49 }
50 }
51
ThreadFunc()52 void DynamicThreadPool::ThreadFunc() {
53 for (;;) {
54 // Wait until work is available or we are shutting down.
55 grpc_core::ReleasableMutexLock lock(&mu_);
56 if (!shutdown_ && callbacks_.empty()) {
57 // If there are too many threads waiting, then quit this thread
58 if (threads_waiting_ >= reserve_threads_) {
59 break;
60 }
61 threads_waiting_++;
62 cv_.Wait(&mu_);
63 threads_waiting_--;
64 }
65 // Drain callbacks before considering shutdown to ensure all work
66 // gets completed.
67 if (!callbacks_.empty()) {
68 auto cb = callbacks_.front();
69 callbacks_.pop();
70 lock.Unlock();
71 cb();
72 } else if (shutdown_) {
73 break;
74 }
75 }
76 }
77
DynamicThreadPool(int reserve_threads)78 DynamicThreadPool::DynamicThreadPool(int reserve_threads)
79 : shutdown_(false),
80 reserve_threads_(reserve_threads),
81 nthreads_(0),
82 threads_waiting_(0) {
83 for (int i = 0; i < reserve_threads_; i++) {
84 grpc_core::MutexLock lock(&mu_);
85 nthreads_++;
86 new DynamicThread(this);
87 }
88 }
89
ReapThreads(std::list<DynamicThread * > * tlist)90 void DynamicThreadPool::ReapThreads(std::list<DynamicThread*>* tlist) {
91 for (auto t = tlist->begin(); t != tlist->end(); t = tlist->erase(t)) {
92 delete *t;
93 }
94 }
95
~DynamicThreadPool()96 DynamicThreadPool::~DynamicThreadPool() {
97 grpc_core::MutexLock lock(&mu_);
98 shutdown_ = true;
99 cv_.Broadcast();
100 while (nthreads_ != 0) {
101 shutdown_cv_.Wait(&mu_);
102 }
103 ReapThreads(&dead_threads_);
104 }
105
Add(const std::function<void ()> & callback)106 void DynamicThreadPool::Add(const std::function<void()>& callback) {
107 grpc_core::MutexLock lock(&mu_);
108 // Add works to the callbacks list
109 callbacks_.push(callback);
110 // Increase pool size or notify as needed
111 if (threads_waiting_ == 0) {
112 // Kick off a new thread
113 nthreads_++;
114 new DynamicThread(this);
115 } else {
116 cv_.Signal();
117 }
118 // Also use this chance to harvest dead threads
119 if (!dead_threads_.empty()) {
120 ReapThreads(&dead_threads_);
121 }
122 }
123
124 } // namespace grpc
125