/*
 * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 #include "worker_pool.h"
16
17 #include <memory>
18 #include "default_thread_factory.h"
19
20 namespace OHOS {
21 namespace AppExecFwk {
22 const int WorkerPool::THREAD_UPPER_LIMIT = 256;
23 const int WorkerPool::MAX_THREAD_LOWER_LIMIT = 1;
24 const int WorkerPool::CORE_THREAD_LOWER_LIMIT = 0;
25 const int WorkerPool::COUNT_BITS = sizeof(int) * __CHAR_BIT__ - 3;
26 const unsigned int WorkerPool::CAPACITY = (1 << COUNT_BITS) - 1;
27 const int WorkerPool::RUNNING = (-(1 << COUNT_BITS));
28 const int WorkerPool::CLOSING = (0 << COUNT_BITS);
29 const int WorkerPool::INTERRUPT = (1 << COUNT_BITS);
30 const int WorkerPool::CLEANED = (2 << COUNT_BITS);
31 const int WorkerPool::CLOSED = (3 << COUNT_BITS);
32
WorkerPool(const std::shared_ptr<WorkerPoolConfig> & config)33 WorkerPool::WorkerPool(const std::shared_ptr<WorkerPoolConfig> &config)
34 {
35 control_ = CombineToControl(RUNNING, 0);
36 pool_.clear();
37 exitPool_.clear();
38 WorkerPool::factory_ = std::make_shared<DefaultThreadFactory>();
39 initFlag_.store(Init(config));
40 stop_.store(false);
41 }
42
~WorkerPool()43 WorkerPool::~WorkerPool()
44 {
45 control_ = 0;
46
47 HILOG_INFO("WorkerPool::~WorkerPool");
48 }
49
Init(const std::shared_ptr<WorkerPoolConfig> & config)50 bool WorkerPool::Init(const std::shared_ptr<WorkerPoolConfig> &config)
51 {
52 if (!CheckConfigParams(config)) {
53 HILOG_ERROR("WorkerPool::checkConfigParams parameters are illegal");
54 return false;
55 }
56
57 thread_limit_ = config->GetMaxThreadCount();
58 core_thread_limit_ = config->GetCoreThreadCount();
59 long keepAliveTime = config->GetKeepAliveTime();
60 alive_time_Limit_ = keepAliveTime > 0 ? keepAliveTime : 0;
61
62 return true;
63 }
64
CheckConfigParams(const std::shared_ptr<WorkerPoolConfig> & config)65 bool WorkerPool::CheckConfigParams(const std::shared_ptr<WorkerPoolConfig> &config)
66 {
67 if (config == nullptr) {
68 HILOG_ERROR("WorkerPool::CheckConfigParams config is nullptr");
69 return false;
70 }
71
72 int maxThreadCount = config->GetMaxThreadCount();
73 int coreThreadCount = config->GetCoreThreadCount();
74
75 if (!CheckThreadCount(maxThreadCount, coreThreadCount)) {
76 HILOG_ERROR("WorkerPool::CheckConfigParams parameters are illegal, maxThreadCount %{public}d is less than "
77 "coreThreadCount %{public}d",
78 maxThreadCount,
79 coreThreadCount);
80 return false;
81 }
82 if (!CheckMaxThreadCount(maxThreadCount)) {
83 HILOG_ERROR("WorkerPool::CheckConfigParams maxThreadCount %{public}d is illegal", maxThreadCount);
84 return false;
85 }
86 if (!CheckCoreThreadCount(coreThreadCount)) {
87 HILOG_ERROR("WorkerPool::CheckConfigParams coreThreadCount %{public}d is illegal", coreThreadCount);
88 return false;
89 }
90 return true;
91 }
92
CheckThreadCount(int maxThreadCount,int coreThreadCount)93 bool WorkerPool::CheckThreadCount(int maxThreadCount, int coreThreadCount)
94 {
95 return maxThreadCount >= coreThreadCount;
96 }
97
CheckMaxThreadCount(int maxThreadCount)98 bool WorkerPool::CheckMaxThreadCount(int maxThreadCount)
99 {
100 return (maxThreadCount <= THREAD_UPPER_LIMIT) && (maxThreadCount >= MAX_THREAD_LOWER_LIMIT);
101 }
102
CheckCoreThreadCount(int coreThreadCount)103 bool WorkerPool::CheckCoreThreadCount(int coreThreadCount)
104 {
105 return (coreThreadCount <= THREAD_UPPER_LIMIT) && (coreThreadCount >= CORE_THREAD_LOWER_LIMIT);
106 }
107
GetKeepAliveTime(void) const108 long WorkerPool::GetKeepAliveTime(void) const
109 {
110 return alive_time_Limit_;
111 }
112
GetCoreThreadCount(void) const113 int WorkerPool::GetCoreThreadCount(void) const
114 {
115 return core_thread_limit_;
116 }
117
GetMaxThreadCount(void) const118 int WorkerPool::GetMaxThreadCount(void) const
119 {
120 return thread_limit_;
121 }
122
GetWorkCount(void) const123 int WorkerPool::GetWorkCount(void) const
124 {
125 unsigned int value = control_.load();
126 return GetWorkingThreadNum(value);
127 }
128
GetWorkerThreadsInfo(void)129 std::map<std::string, long> WorkerPool::GetWorkerThreadsInfo(void)
130 {
131 std::unique_lock<std::mutex> mLock(poolLock_);
132 std::map<std::string, long> workerThreadsInfo;
133
134 for (auto it = pool_.begin(); it != pool_.end(); it++) {
135 if ((*it) != nullptr) {
136 workerThreadsInfo.emplace((*it)->GetThreadName(), (*it)->GetTaskCounter());
137 }
138 }
139 return workerThreadsInfo;
140 }
141
ClosePool(bool interrupt)142 void WorkerPool::ClosePool(bool interrupt)
143 {
144 HILOG_INFO("WorkerPool::ClosePool begin interrupt=%{public}d", interrupt);
145 std::unique_lock<std::mutex> mLock(poolLock_);
146
147 AdvanceStateTo(CLOSING);
148 InterruptWorkers();
149
150 HILOG_INFO("WorkerPool::ClosePool end");
151 }
152
InterruptWorkers(void)153 void WorkerPool::InterruptWorkers(void)
154 {
155 HILOG_INFO("WorkerPool::InterruptWorkers begin");
156 if (guardThread_ == nullptr) {
157 HILOG_ERROR("WorkerPool::InterruptWorkers guardThread is nullptr");
158 return;
159 }
160 poolLock_.unlock();
161
162 {
163 std::unique_lock<std::mutex> lock(exitPoolLock_);
164 stop_.store(true);
165 exit_.notify_all();
166 }
167
168 {
169 std::unique_lock<std::mutex> lock(exitPoolLock_);
170 exitGuard_.wait(lock);
171 if (guardThread_->joinable()) {
172 HILOG_INFO("WorkerPool::InterruptWorkers guardThread_ joinable");
173 guardThread_->join();
174 // Prevent manual call again
175 guardThread_ = nullptr;
176 }
177 }
178
179 HILOG_INFO("WorkerPool::InterruptWorkers end");
180 }
181
CreateGuardThread()182 void WorkerPool::CreateGuardThread()
183 {
184 HILOG_INFO("WorkerPool::CreateGuardThread START");
185 if (guardThread_ != nullptr) {
186 HILOG_WARN("WorkerPool::CreateGuardThread guardThread_ is not nullptr");
187 return;
188 }
189 auto guardTask = [&]() {
190 while (true) {
191 {
192 std::unique_lock<std::mutex> lock(exitPoolLock_);
193 if (!exitPool_.empty()) {
194 exitPool_.front()->Join();
195 exitPool_.erase(exitPool_.begin());
196 } else {
197 exit_.wait(lock, [this] {
198 return this->stop_.load() || !this->exitPool_.empty();
199 }); // return 防止先notify 后wait
200 }
201 }
202 if (stop_.load() && exitPool_.empty() && pool_.empty()) {
203 exitGuard_.notify_all();
204 HILOG_INFO("WorkerPool::CreateGuardThread break while");
205 break;
206 }
207 }
208 HILOG_INFO("WorkerPool::CreateGuardThread STOP");
209 };
210
211 guardThread_ = std::make_shared<std::thread>(guardTask);
212 }
213
AddWorker(const std::shared_ptr<Delegate> & delegate,const std::shared_ptr<Task> & task)214 bool WorkerPool::AddWorker(const std::shared_ptr<Delegate> &delegate, const std::shared_ptr<Task> &task)
215 {
216 bool added = false;
217 if (!initFlag_.load()) {
218 HILOG_ERROR("WorkerPool::AddWorker workPool init failed");
219 return added;
220 }
221 if (factory_ == nullptr) {
222 HILOG_ERROR("WorkerPool::AddWorker factory_ is nullptr");
223 return added;
224 }
225 if (task == nullptr) {
226 HILOG_ERROR("WorkerPool::AddWorker task is nullptr");
227 return added;
228 }
229 if (delegate == nullptr) {
230 HILOG_ERROR("WorkerPool::AddWorker delegate is nullptr");
231 return added;
232 }
233 std::unique_lock<std::mutex> mLock(poolLock_);
234 std::shared_ptr<WorkerThread> newThread = nullptr;
235
236 for (;;) {
237 unsigned int value = control_.load();
238 int num = GetWorkingThreadNum(value);
239 if (num >= thread_limit_) {
240 HILOG_INFO("WorkerPool::AddWorker thread count exceed limits, num=%{public}d, limits=%{public}d", num,
241 thread_limit_);
242 break;
243 }
244 if (!IsRunning(value)) {
245 HILOG_INFO("WorkerPool::AddWorker thread pool is not running. value=%{public}d, closing=%{public}d, "
246 "count_bits=%{public}d",
247 value,
248 CLOSING,
249 COUNT_BITS);
250 break;
251 }
252
253 if (CompareAndIncThreadNum(num)) {
254 newThread = std::make_shared<WorkerThread>(delegate, task, factory_);
255 if (newThread == nullptr) {
256 HILOG_ERROR("WorkerPool::AddWorker create thread fail");
257 break;
258 }
259
260 newThread->CreateThread();
261
262 HILOG_INFO("WorkerPool::AddWorker create new thread");
263
264 pool_.emplace_back(newThread);
265 HILOG_INFO("WorkerPool::AddWorker pool_ add thread ,POOL SIZE: %{public}zu", pool_.size());
266
267 added = true;
268 break;
269 }
270
271 HILOG_WARN("WorkerPool::AddWorker set thread state error. retry. ");
272 }
273 return added;
274 }
275
OnWorkerExit(const std::shared_ptr<WorkerThread> & worker,bool isInterrupted)276 void WorkerPool::OnWorkerExit(const std::shared_ptr<WorkerThread> &worker, bool isInterrupted)
277 {
278 std::unique_lock<std::mutex> mLock(poolLock_);
279 HILOG_INFO("WorkerPool::OnWorkerExit start, pool size: %{public}zu", pool_.size());
280 for (auto it = pool_.begin(); it != pool_.end(); it++) {
281 if ((*it).get() == worker.get()) {
282 HILOG_INFO("WorkerPool::OnWorkerExit erase current, size=%{public}zu, threads=%{public}d",
283 pool_.size(),
284 GetWorkingThreadNum(control_.load()));
285 {
286 std::unique_lock<std::mutex> lock(exitPoolLock_);
287 exitPool_.emplace_back(worker);
288 HILOG_INFO("WorkerPool::OnWorkerExit exit notify all");
289 exit_.notify_all();
290 }
291 pool_.erase(it);
292
293 break;
294 }
295 }
296 HILOG_INFO("WorkerPool::OnWorkerExit end");
297 }
298
AfterRun(const std::shared_ptr<Task> & task)299 void WorkerPool::AfterRun(const std::shared_ptr<Task> &task)
300 {}
301
BeforeRun(const std::shared_ptr<Task> & task)302 void WorkerPool::BeforeRun(const std::shared_ptr<Task> &task)
303 {}
304
GetWorkingThreadNum(unsigned int ctl)305 unsigned int WorkerPool::GetWorkingThreadNum(unsigned int ctl)
306 {
307 return ctl & CAPACITY;
308 }
309
IsRunning(int ctl)310 bool WorkerPool::IsRunning(int ctl)
311 {
312 return ctl < CLOSING;
313 }
314
GetStateFromControl(unsigned int ctl)315 int WorkerPool::GetStateFromControl(unsigned int ctl)
316 {
317 return ctl & ~CAPACITY;
318 }
319
AdvanceStateTo(unsigned int target)320 void WorkerPool::AdvanceStateTo(unsigned int target)
321 {
322 HILOG_INFO("WorkerPool::AdvanceStateTo begin");
323 for (;;) {
324 unsigned int current = control_.load();
325 if ((current >= target) ||
326 CompareAndSet(control_, current, CombineToControl(target, GetWorkingThreadNum(current)))) {
327 HILOG_INFO("WorkerPool::AdvanceStateTo break");
328 break;
329 }
330 }
331 HILOG_INFO("WorkerPool::AdvanceStateTo end");
332 }
333
CombineToControl(unsigned int state,unsigned int count)334 int WorkerPool::CombineToControl(unsigned int state, unsigned int count)
335 {
336 return state | count;
337 }
338
CompareAndIncThreadNum(int expect)339 bool WorkerPool::CompareAndIncThreadNum(int expect)
340 {
341 unsigned int ctl = control_.load();
342 int state = GetStateFromControl(ctl);
343 return CompareAndSet(control_, ctl, CombineToControl(state, expect + 1));
344 }
345
DecrementThread(void)346 void WorkerPool::DecrementThread(void)
347 {
348 HILOG_INFO("WorkerPool::DecrementThread begin");
349 int curr = control_.load();
350 while (!CompareAndDecThreadNum(curr)) {
351 curr = control_.load();
352 }
353 HILOG_INFO("WorkerPool::DecrementThread end");
354 }
355
CompareAndDecThreadNum(int expect)356 bool WorkerPool::CompareAndDecThreadNum(int expect)
357 {
358 return CompareAndSet(control_, expect, expect - 1);
359 }
360
CompareAndDecNum(int expectCount)361 bool WorkerPool::CompareAndDecNum(int expectCount)
362 {
363 unsigned int curr = control_.load();
364 int state = GetStateFromControl(curr);
365 int expectControl = CombineToControl(state, expectCount);
366 return CompareAndDecThreadNum(expectControl);
367 }
368
CompareAndSet(std::atomic<int> & atomicInt,int expect,int desire)369 bool WorkerPool::CompareAndSet(std::atomic<int> &atomicInt, int expect, int desire)
370 {
371 return atomicInt.compare_exchange_strong(expect, desire);
372 }
373 } // namespace AppExecFwk
374 } // namespace OHOS