1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "thread_pool.h"
17 #include "errors.h"
18 #include "utils_log.h"
19
20 #include <memory>
21 #include <pthread.h>
22
23 namespace OHOS {
24
ThreadPool(const std::string & name)25 ThreadPool::ThreadPool(const std::string& name)
26 : myName_(name), maxTaskNum_(0), running_(false)
27 {
28 }
29
~ThreadPool()30 ThreadPool::~ThreadPool()
31 {
32 if (running_) {
33 Stop();
34 }
35 }
36
Start(int numThreads)37 uint32_t ThreadPool::Start(int numThreads)
38 {
39 if (!threads_.empty()) {
40 return ERR_INVALID_OPERATION;
41 }
42
43 if (numThreads <= 0) {
44 return ERR_INVALID_VALUE;
45 }
46 running_ = true;
47 threads_.reserve(numThreads);
48
49 for (int i = 0; i < numThreads; ++i) {
50 std::thread t(&ThreadPool::WorkInThread, this);
51 // Give the name of ThreadPool to threads created by the ThreadPool.
52 int err = pthread_setname_np(t.native_handle(), (myName_ + std::to_string(i)).c_str());
53 if (err != 0) {
54 UTILS_LOGW("Failed to set name to thread. %{public}s", strerror(err));
55 }
56 threads_.push_back(std::move(t));
57 }
58 return ERR_OK;
59 }
60
Stop()61 void ThreadPool::Stop()
62 {
63 {
64 std::unique_lock<std::mutex> lock(mutex_);
65 running_ = false;
66 hasTaskToDo_.notify_all();
67 }
68
69 for (auto& e : threads_) {
70 e.join();
71 }
72 }
73
AddTask(const Task & f)74 void ThreadPool::AddTask(const Task &f)
75 {
76 if (threads_.empty()) {
77 f();
78 } else {
79 std::unique_lock<std::mutex> lock(mutex_);
80 while (Overloaded()) {
81 acceptNewTask_.wait(lock);
82 }
83
84 tasks_.push_back(f);
85 hasTaskToDo_.notify_one();
86 }
87 }
88
GetCurTaskNum()89 size_t ThreadPool::GetCurTaskNum()
90 {
91 std::unique_lock<std::mutex> lock(mutex_);
92 return tasks_.size();
93 }
94
95
ScheduleTask()96 ThreadPool::Task ThreadPool::ScheduleTask()
97 {
98 std::unique_lock<std::mutex> lock(mutex_);
99 while (tasks_.empty() && running_) {
100 hasTaskToDo_.wait(lock);
101 }
102
103 Task task;
104 if (!tasks_.empty()) {
105 task = tasks_.front();
106 tasks_.pop_front();
107
108 if (maxTaskNum_ > 0) {
109 acceptNewTask_.notify_one();
110 }
111 }
112 return task;
113 }
114
Overloaded() const115 bool ThreadPool::Overloaded() const
116 {
117 return (maxTaskNum_ > 0) && (tasks_.size() >= maxTaskNum_);
118 }
119
WorkInThread()120 void ThreadPool::WorkInThread()
121 {
122 while (running_) {
123 Task task = ScheduleTask();
124 if (task) {
125 task();
126 }
127 }
128 }
129
130 } // namespace OHOS
131