• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
18 
#include "src/cpp/server/dynamic_thread_pool.h"

#include <utility>

#include <grpc/support/log.h>
#include <grpcpp/impl/codegen/sync.h>

#include "src/core/lib/gprpp/thd.h"
25 
26 namespace grpc {
27 
DynamicThread(DynamicThreadPool * pool)28 DynamicThreadPool::DynamicThread::DynamicThread(DynamicThreadPool* pool)
29     : pool_(pool),
30       thd_("grpcpp_dynamic_pool",
31            [](void* th) {
32              static_cast<DynamicThreadPool::DynamicThread*>(th)->ThreadFunc();
33            },
34            this) {
35   thd_.Start();
36 }
~DynamicThread()37 DynamicThreadPool::DynamicThread::~DynamicThread() { thd_.Join(); }
38 
ThreadFunc()39 void DynamicThreadPool::DynamicThread::ThreadFunc() {
40   pool_->ThreadFunc();
41   // Now that we have killed ourselves, we should reduce the thread count
42   grpc_core::MutexLock lock(&pool_->mu_);
43   pool_->nthreads_--;
44   // Move ourselves to dead list
45   pool_->dead_threads_.push_back(this);
46 
47   if ((pool_->shutdown_) && (pool_->nthreads_ == 0)) {
48     pool_->shutdown_cv_.Signal();
49   }
50 }
51 
ThreadFunc()52 void DynamicThreadPool::ThreadFunc() {
53   for (;;) {
54     // Wait until work is available or we are shutting down.
55     grpc_core::ReleasableMutexLock lock(&mu_);
56     if (!shutdown_ && callbacks_.empty()) {
57       // If there are too many threads waiting, then quit this thread
58       if (threads_waiting_ >= reserve_threads_) {
59         break;
60       }
61       threads_waiting_++;
62       cv_.Wait(&mu_);
63       threads_waiting_--;
64     }
65     // Drain callbacks before considering shutdown to ensure all work
66     // gets completed.
67     if (!callbacks_.empty()) {
68       auto cb = callbacks_.front();
69       callbacks_.pop();
70       lock.Unlock();
71       cb();
72     } else if (shutdown_) {
73       break;
74     }
75   }
76 }
77 
DynamicThreadPool(int reserve_threads)78 DynamicThreadPool::DynamicThreadPool(int reserve_threads)
79     : shutdown_(false),
80       reserve_threads_(reserve_threads),
81       nthreads_(0),
82       threads_waiting_(0) {
83   for (int i = 0; i < reserve_threads_; i++) {
84     grpc_core::MutexLock lock(&mu_);
85     nthreads_++;
86     new DynamicThread(this);
87   }
88 }
89 
ReapThreads(std::list<DynamicThread * > * tlist)90 void DynamicThreadPool::ReapThreads(std::list<DynamicThread*>* tlist) {
91   for (auto t = tlist->begin(); t != tlist->end(); t = tlist->erase(t)) {
92     delete *t;
93   }
94 }
95 
~DynamicThreadPool()96 DynamicThreadPool::~DynamicThreadPool() {
97   grpc_core::MutexLock lock(&mu_);
98   shutdown_ = true;
99   cv_.Broadcast();
100   while (nthreads_ != 0) {
101     shutdown_cv_.Wait(&mu_);
102   }
103   ReapThreads(&dead_threads_);
104 }
105 
Add(const std::function<void ()> & callback)106 void DynamicThreadPool::Add(const std::function<void()>& callback) {
107   grpc_core::MutexLock lock(&mu_);
108   // Add works to the callbacks list
109   callbacks_.push(callback);
110   // Increase pool size or notify as needed
111   if (threads_waiting_ == 0) {
112     // Kick off a new thread
113     nthreads_++;
114     new DynamicThread(this);
115   } else {
116     cv_.Signal();
117   }
118   // Also use this chance to harvest dead threads
119   if (!dead_threads_.empty()) {
120     ReapThreads(&dead_threads_);
121   }
122 }
123 
124 }  // namespace grpc
125