• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "worker_pool.h"
16 
17 #include <memory>
18 #include "default_thread_factory.h"
19 
20 namespace OHOS {
21 namespace AppExecFwk {
22 const int WorkerPool::THREAD_UPPER_LIMIT = 256;
23 const int WorkerPool::MAX_THREAD_LOWER_LIMIT = 1;
24 const int WorkerPool::CORE_THREAD_LOWER_LIMIT = 0;
25 const int WorkerPool::COUNT_BITS = sizeof(int) * __CHAR_BIT__ - 3;
26 const unsigned int WorkerPool::CAPACITY = (1 << COUNT_BITS) - 1;
27 const int WorkerPool::RUNNING = (-(1 << COUNT_BITS));
28 const int WorkerPool::CLOSING = (0 << COUNT_BITS);
29 const int WorkerPool::INTERRUPT = (1 << COUNT_BITS);
30 const int WorkerPool::CLEANED = (2 << COUNT_BITS);
31 const int WorkerPool::CLOSED = (3 << COUNT_BITS);
32 
WorkerPool(const std::shared_ptr<WorkerPoolConfig> & config)33 WorkerPool::WorkerPool(const std::shared_ptr<WorkerPoolConfig> &config)
34 {
35     control_ = CombineToControl(RUNNING, 0);
36     pool_.clear();
37     exitPool_.clear();
38     WorkerPool::factory_ = std::make_shared<DefaultThreadFactory>();
39     initFlag_.store(Init(config));
40     stop_.store(false);
41 }
42 
~WorkerPool()43 WorkerPool::~WorkerPool()
44 {
45     control_ = 0;
46 
47     HILOG_INFO("WorkerPool::~WorkerPool");
48 }
49 
Init(const std::shared_ptr<WorkerPoolConfig> & config)50 bool WorkerPool::Init(const std::shared_ptr<WorkerPoolConfig> &config)
51 {
52     if (!CheckConfigParams(config)) {
53         HILOG_ERROR("WorkerPool::checkConfigParams  parameters are illegal");
54         return false;
55     }
56 
57     thread_limit_ = config->GetMaxThreadCount();
58     core_thread_limit_ = config->GetCoreThreadCount();
59     long keepAliveTime = config->GetKeepAliveTime();
60     alive_time_Limit_ = keepAliveTime > 0 ? keepAliveTime : 0;
61 
62     return true;
63 }
64 
CheckConfigParams(const std::shared_ptr<WorkerPoolConfig> & config)65 bool WorkerPool::CheckConfigParams(const std::shared_ptr<WorkerPoolConfig> &config)
66 {
67     if (config == nullptr) {
68         HILOG_ERROR("WorkerPool::CheckConfigParams config is nullptr");
69         return false;
70     }
71 
72     int maxThreadCount = config->GetMaxThreadCount();
73     int coreThreadCount = config->GetCoreThreadCount();
74 
75     if (!CheckThreadCount(maxThreadCount, coreThreadCount)) {
76         HILOG_ERROR("WorkerPool::CheckConfigParams parameters are illegal, maxThreadCount %{public}d is less than "
77                  "coreThreadCount %{public}d",
78             maxThreadCount,
79             coreThreadCount);
80         return false;
81     }
82     if (!CheckMaxThreadCount(maxThreadCount)) {
83         HILOG_ERROR("WorkerPool::CheckConfigParams maxThreadCount %{public}d is illegal", maxThreadCount);
84         return false;
85     }
86     if (!CheckCoreThreadCount(coreThreadCount)) {
87         HILOG_ERROR("WorkerPool::CheckConfigParams coreThreadCount %{public}d is illegal", coreThreadCount);
88         return false;
89     }
90     return true;
91 }
92 
CheckThreadCount(int maxThreadCount,int coreThreadCount)93 bool WorkerPool::CheckThreadCount(int maxThreadCount, int coreThreadCount)
94 {
95     return maxThreadCount >= coreThreadCount;
96 }
97 
CheckMaxThreadCount(int maxThreadCount)98 bool WorkerPool::CheckMaxThreadCount(int maxThreadCount)
99 {
100     return (maxThreadCount <= THREAD_UPPER_LIMIT) && (maxThreadCount >= MAX_THREAD_LOWER_LIMIT);
101 }
102 
CheckCoreThreadCount(int coreThreadCount)103 bool WorkerPool::CheckCoreThreadCount(int coreThreadCount)
104 {
105     return (coreThreadCount <= THREAD_UPPER_LIMIT) && (coreThreadCount >= CORE_THREAD_LOWER_LIMIT);
106 }
107 
GetKeepAliveTime(void) const108 long WorkerPool::GetKeepAliveTime(void) const
109 {
110     return alive_time_Limit_;
111 }
112 
GetCoreThreadCount(void) const113 int WorkerPool::GetCoreThreadCount(void) const
114 {
115     return core_thread_limit_;
116 }
117 
GetMaxThreadCount(void) const118 int WorkerPool::GetMaxThreadCount(void) const
119 {
120     return thread_limit_;
121 }
122 
GetWorkCount(void) const123 int WorkerPool::GetWorkCount(void) const
124 {
125     unsigned int value = control_.load();
126     return GetWorkingThreadNum(value);
127 }
128 
GetWorkerThreadsInfo(void)129 std::map<std::string, long> WorkerPool::GetWorkerThreadsInfo(void)
130 {
131     std::unique_lock<std::mutex> mLock(poolLock_);
132     std::map<std::string, long> workerThreadsInfo;
133 
134     for (auto it = pool_.begin(); it != pool_.end(); it++) {
135         if ((*it) != nullptr) {
136             workerThreadsInfo.emplace((*it)->GetThreadName(), (*it)->GetTaskCounter());
137         }
138     }
139     return workerThreadsInfo;
140 }
141 
ClosePool(bool interrupt)142 void WorkerPool::ClosePool(bool interrupt)
143 {
144     HILOG_INFO("WorkerPool::ClosePool begin interrupt=%{public}d", interrupt);
145     std::unique_lock<std::mutex> mLock(poolLock_);
146 
147     AdvanceStateTo(CLOSING);
148     InterruptWorkers();
149 
150     HILOG_INFO("WorkerPool::ClosePool end");
151 }
152 
InterruptWorkers(void)153 void WorkerPool::InterruptWorkers(void)
154 {
155     HILOG_INFO("WorkerPool::InterruptWorkers begin");
156     if (guardThread_ == nullptr) {
157         HILOG_ERROR("WorkerPool::InterruptWorkers guardThread is nullptr");
158         return;
159     }
160     poolLock_.unlock();
161 
162     {
163         std::unique_lock<std::mutex> lock(exitPoolLock_);
164         stop_.store(true);
165         exit_.notify_all();
166     }
167 
168     {
169         std::unique_lock<std::mutex> lock(exitPoolLock_);
170         exitGuard_.wait(lock);
171         if (guardThread_->joinable()) {
172             HILOG_INFO("WorkerPool::InterruptWorkers guardThread_ joinable");
173             guardThread_->join();
174             // Prevent manual call again
175             guardThread_ = nullptr;
176         }
177     }
178 
179     HILOG_INFO("WorkerPool::InterruptWorkers end");
180 }
181 
CreateGuardThread()182 void WorkerPool::CreateGuardThread()
183 {
184     HILOG_INFO("WorkerPool::CreateGuardThread START");
185     if (guardThread_ != nullptr) {
186         HILOG_WARN("WorkerPool::CreateGuardThread guardThread_ is not nullptr");
187         return;
188     }
189     auto guardTask = [&]() {
190         while (true) {
191             {
192                 std::unique_lock<std::mutex> lock(exitPoolLock_);
193                 if (!exitPool_.empty()) {
194                     exitPool_.front()->Join();
195                     exitPool_.erase(exitPool_.begin());
196                 } else {
197                     exit_.wait(lock, [this] {
198                         return this->stop_.load() || !this->exitPool_.empty();
199                     });  // return 防止先notify 后wait
200                 }
201             }
202             if (stop_.load() && exitPool_.empty() && pool_.empty()) {
203                 exitGuard_.notify_all();
204                 HILOG_INFO("WorkerPool::CreateGuardThread break while");
205                 break;
206             }
207         }
208         HILOG_INFO("WorkerPool::CreateGuardThread STOP");
209     };
210 
211     guardThread_ = std::make_shared<std::thread>(guardTask);
212 }
213 
AddWorker(const std::shared_ptr<Delegate> & delegate,const std::shared_ptr<Task> & task)214 bool WorkerPool::AddWorker(const std::shared_ptr<Delegate> &delegate, const std::shared_ptr<Task> &task)
215 {
216     bool added = false;
217     if (!initFlag_.load()) {
218         HILOG_ERROR("WorkerPool::AddWorker workPool init failed");
219         return added;
220     }
221     if (factory_ == nullptr) {
222         HILOG_ERROR("WorkerPool::AddWorker factory_ is nullptr");
223         return added;
224     }
225     if (task == nullptr) {
226         HILOG_ERROR("WorkerPool::AddWorker task is nullptr");
227         return added;
228     }
229     if (delegate == nullptr) {
230         HILOG_ERROR("WorkerPool::AddWorker delegate is nullptr");
231         return added;
232     }
233     std::unique_lock<std::mutex> mLock(poolLock_);
234     std::shared_ptr<WorkerThread> newThread = nullptr;
235 
236     for (;;) {
237         unsigned int value = control_.load();
238         int num = GetWorkingThreadNum(value);
239         if (num >= thread_limit_) {
240             HILOG_INFO("WorkerPool::AddWorker thread count exceed limits, num=%{public}d, limits=%{public}d", num,
241                 thread_limit_);
242             break;
243         }
244         if (!IsRunning(value)) {
245             HILOG_INFO("WorkerPool::AddWorker thread pool is not running. value=%{public}d, closing=%{public}d, "
246                      "count_bits=%{public}d",
247                 value,
248                 CLOSING,
249                 COUNT_BITS);
250             break;
251         }
252 
253         if (CompareAndIncThreadNum(num)) {
254             newThread = std::make_shared<WorkerThread>(delegate, task, factory_);
255             if (newThread == nullptr) {
256                 HILOG_ERROR("WorkerPool::AddWorker create thread fail");
257                 break;
258             }
259 
260             newThread->CreateThread();
261 
262             HILOG_INFO("WorkerPool::AddWorker create new thread");
263 
264             pool_.emplace_back(newThread);
265             HILOG_INFO("WorkerPool::AddWorker pool_ add thread ,POOL SIZE: %{public}zu", pool_.size());
266 
267             added = true;
268             break;
269         }
270 
271         HILOG_WARN("WorkerPool::AddWorker set thread state error. retry. ");
272     }
273     return added;
274 }
275 
OnWorkerExit(const std::shared_ptr<WorkerThread> & worker,bool isInterrupted)276 void WorkerPool::OnWorkerExit(const std::shared_ptr<WorkerThread> &worker, bool isInterrupted)
277 {
278     std::unique_lock<std::mutex> mLock(poolLock_);
279     HILOG_INFO("WorkerPool::OnWorkerExit start, pool size: %{public}zu", pool_.size());
280     for (auto it = pool_.begin(); it != pool_.end(); it++) {
281         if ((*it).get() == worker.get()) {
282             HILOG_INFO("WorkerPool::OnWorkerExit erase current, size=%{public}zu, threads=%{public}d",
283                 pool_.size(),
284                 GetWorkingThreadNum(control_.load()));
285             {
286                 std::unique_lock<std::mutex> lock(exitPoolLock_);
287                 exitPool_.emplace_back(worker);
288                 HILOG_INFO("WorkerPool::OnWorkerExit exit notify all");
289                 exit_.notify_all();
290             }
291             pool_.erase(it);
292 
293             break;
294         }
295     }
296     HILOG_INFO("WorkerPool::OnWorkerExit end");
297 }
298 
AfterRun(const std::shared_ptr<Task> & task)299 void WorkerPool::AfterRun(const std::shared_ptr<Task> &task)
300 {}
301 
BeforeRun(const std::shared_ptr<Task> & task)302 void WorkerPool::BeforeRun(const std::shared_ptr<Task> &task)
303 {}
304 
GetWorkingThreadNum(unsigned int ctl)305 unsigned int WorkerPool::GetWorkingThreadNum(unsigned int ctl)
306 {
307     return ctl & CAPACITY;
308 }
309 
IsRunning(int ctl)310 bool WorkerPool::IsRunning(int ctl)
311 {
312     return ctl < CLOSING;
313 }
314 
GetStateFromControl(unsigned int ctl)315 int WorkerPool::GetStateFromControl(unsigned int ctl)
316 {
317     return ctl & ~CAPACITY;
318 }
319 
AdvanceStateTo(unsigned int target)320 void WorkerPool::AdvanceStateTo(unsigned int target)
321 {
322     HILOG_INFO("WorkerPool::AdvanceStateTo begin");
323     for (;;) {
324         unsigned int current = control_.load();
325         if ((current >= target) ||
326             CompareAndSet(control_, current, CombineToControl(target, GetWorkingThreadNum(current)))) {
327             HILOG_INFO("WorkerPool::AdvanceStateTo break");
328             break;
329         }
330     }
331     HILOG_INFO("WorkerPool::AdvanceStateTo end");
332 }
333 
CombineToControl(unsigned int state,unsigned int count)334 int WorkerPool::CombineToControl(unsigned int state, unsigned int count)
335 {
336     return state | count;
337 }
338 
CompareAndIncThreadNum(int expect)339 bool WorkerPool::CompareAndIncThreadNum(int expect)
340 {
341     unsigned int ctl = control_.load();
342     int state = GetStateFromControl(ctl);
343     return CompareAndSet(control_, ctl, CombineToControl(state, expect + 1));
344 }
345 
DecrementThread(void)346 void WorkerPool::DecrementThread(void)
347 {
348     HILOG_INFO("WorkerPool::DecrementThread begin");
349     int curr = control_.load();
350     while (!CompareAndDecThreadNum(curr)) {
351         curr = control_.load();
352     }
353     HILOG_INFO("WorkerPool::DecrementThread end");
354 }
355 
CompareAndDecThreadNum(int expect)356 bool WorkerPool::CompareAndDecThreadNum(int expect)
357 {
358     return CompareAndSet(control_, expect, expect - 1);
359 }
360 
CompareAndDecNum(int expectCount)361 bool WorkerPool::CompareAndDecNum(int expectCount)
362 {
363     unsigned int curr = control_.load();
364     int state = GetStateFromControl(curr);
365     int expectControl = CombineToControl(state, expectCount);
366     return CompareAndDecThreadNum(expectControl);
367 }
368 
CompareAndSet(std::atomic<int> & atomicInt,int expect,int desire)369 bool WorkerPool::CompareAndSet(std::atomic<int> &atomicInt, int expect, int desire)
370 {
371     return atomicInt.compare_exchange_strong(expect, desire);
372 }
373 }  // namespace AppExecFwk
374 }  // namespace OHOS