1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "task_pool.h"
17 #include <climits>
18 #include "logger.h"
19 #include "preferences_errno.h"
20
21 namespace OHOS {
22 namespace NativePreferences {
23 constexpr int TaskPool::IDLE_WAIT_PERIOD;
24
TaskPool(int maxThreads,int minThreads)25 TaskPool::TaskPool(int maxThreads, int minThreads)
26 : genericTasks_(false), genericTaskCount_(0), queuedTaskCount_(0), isStarted_(false), isStopping_(false),
27 isGenericThreadIdle_(false), maxThreads_(maxThreads), minThreads_(minThreads), curThreads_(0), idleThreads_(0)
28 {
29 }
30
~TaskPool()31 TaskPool::~TaskPool()
32 {
33 }
34
Start()35 int TaskPool::Start()
36 {
37 if (maxThreads_ < minThreads_) {
38 LOG_ERROR("Start task pool failed, maxThreads(%d) < minThreads(%d).", maxThreads_, minThreads_);
39 return E_INVALID_ARGS;
40 }
41 if (maxThreads_ <= 0) {
42 LOG_ERROR("Start task pool failed, maxThreads(%d) <= 0.", maxThreads_);
43 return E_INVALID_ARGS;
44 }
45 if (minThreads_ < 0) {
46 LOG_ERROR("Start task pool failed, minThreads(%d) < 0.", minThreads_);
47 return E_INVALID_ARGS;
48 }
49
50 std::lock_guard<std::mutex> guard(tasksMutex_);
51 isStarted_ = true; // parameters checked ok.
52 isStopping_ = false;
53 int errCode = SpawnThreads(true);
54 if (errCode != E_OK) {
55 LOG_WARN("Spawn threads failed when starting the task pool.");
56 // ignore the error, we will try when schedule().
57 }
58 return E_OK;
59 }
60
Stop()61 void TaskPool::Stop()
62 {
63 std::unique_lock<std::mutex> lock(tasksMutex_);
64 if (!isStarted_) {
65 return;
66 }
67 isStopping_ = true;
68 hasTasks_.notify_all();
69 allThreadsExited_.wait(lock, [this]() { return this->curThreads_ <= 0; });
70 isStarted_ = false;
71 }
72
Schedule(const Task & task)73 int TaskPool::Schedule(const Task &task)
74 {
75 if (!task) {
76 return E_INVALID_ARGS;
77 }
78 std::lock_guard<std::mutex> guard(tasksMutex_);
79 if (!isStarted_) {
80 LOG_ERROR("Schedule failed, the task pool is not started.");
81 return E_NOT_PERMIT;
82 }
83 if (isStopping_) {
84 LOG_ERROR("Schedule failed, the task pool is stopping.");
85 return E_STALE;
86 }
87 if (genericTaskCount_ == INT_MAX) {
88 LOG_ERROR("Schedule failed, the task pool is full.");
89 return E_ERROR;
90 }
91 genericTasks_.PutTask(task);
92 ++genericTaskCount_;
93 hasTasks_.notify_one();
94 TryToSpawnThreads();
95 return E_OK;
96 }
97
Schedule(const std::string & queueTag,const Task & task)98 int TaskPool::Schedule(const std::string &queueTag, const Task &task)
99 {
100 if (!task) {
101 return E_INVALID_ARGS;
102 }
103 std::lock_guard<std::mutex> guard(tasksMutex_);
104 if (!isStarted_) {
105 LOG_ERROR("Schedule failed, the task pool is not started.");
106 return E_NOT_PERMIT;
107 }
108 if (isStopping_) {
109 LOG_ERROR("Schedule failed, the task pool is stopping.");
110 return E_STALE;
111 }
112 if (queuedTaskCount_ == INT_MAX) {
113 LOG_ERROR("Schedule failed, the task pool is full.");
114 return E_ERROR;
115 }
116 queuedTasks_[queueTag].PutTask(task);
117 ++queuedTaskCount_;
118 hasTasks_.notify_all();
119 TryToSpawnThreads();
120 return E_OK;
121 }
122
ShrinkMemory(const std::string & tag)123 void TaskPool::ShrinkMemory(const std::string &tag)
124 {
125 std::lock_guard<std::mutex> guard(tasksMutex_);
126 auto iter = queuedTasks_.find(tag);
127 if (iter != queuedTasks_.end()) {
128 if (iter->second.IsEmptyAndUnlocked()) {
129 queuedTasks_.erase(iter);
130 }
131 }
132 }
133
Report()134 void TaskPool::Report()
135 {
136 std::lock_guard<std::mutex> guard(tasksMutex_);
137 LOG_INFO("[Task pool report:1] maxThreads:%d, minThreads:%d, curThreads:%d, "
138 "idleThreads:%d, genericTaskCount:%d, queuedTaskCount:%d.",
139 maxThreads_, minThreads_, curThreads_, idleThreads_, genericTaskCount_, queuedTaskCount_);
140 LOG_INFO("[Task pool report:2] taskQueueCount:%zu.", queuedTasks_.size());
141 }
142
IdleExit(std::unique_lock<std::mutex> & lock)143 bool TaskPool::IdleExit(std::unique_lock<std::mutex> &lock)
144 {
145 if (isStopping_) {
146 return true;
147 }
148 ++idleThreads_;
149 bool isGenericWorker = IsGenericWorker();
150 if (!isGenericWorker && (curThreads_ > minThreads_)) {
151 std::cv_status status = hasTasks_.wait_for(lock, std::chrono::seconds(IDLE_WAIT_PERIOD));
152 if (status == std::cv_status::timeout && genericTaskCount_ <= 0) {
153 --idleThreads_;
154 return true;
155 }
156 } else {
157 if (isGenericWorker) {
158 isGenericThreadIdle_ = true;
159 hasTasks_.notify_all();
160 }
161 hasTasks_.wait(lock);
162 if (isGenericWorker) {
163 isGenericThreadIdle_ = false;
164 }
165 }
166 --idleThreads_;
167 return false;
168 }
169
SetThreadFree()170 void TaskPool::SetThreadFree()
171 {
172 for (auto &pair : queuedTasks_) {
173 TaskQueue *tq = &pair.second;
174 tq->ReleaseLock();
175 }
176 }
177
ReapTask(TaskQueue * & queue)178 Task TaskPool::ReapTask(TaskQueue *&queue)
179 {
180 Task task = genericTasks_.GetTaskAutoLock();
181 if (task != nullptr) {
182 queue = nullptr;
183 return task;
184 }
185
186 queue = nullptr;
187 if (IsGenericWorker() && (curThreads_ > 1)) { // 1 indicates self.
188 SetThreadFree();
189 return nullptr;
190 }
191
192 for (auto &pair : queuedTasks_) {
193 TaskQueue *tq = &pair.second;
194 task = tq->GetTaskAutoLock();
195 if (task != nullptr) {
196 queue = tq;
197 return task;
198 }
199 }
200 return nullptr;
201 }
202
GetTask(Task & task,TaskQueue * & queue)203 int TaskPool::GetTask(Task &task, TaskQueue *&queue)
204 {
205 std::unique_lock<std::mutex> lock(tasksMutex_);
206
207 while (true) {
208 task = ReapTask(queue);
209 if (task != nullptr) {
210 return E_OK;
211 }
212
213 if (IdleExit(lock)) {
214 break;
215 }
216 }
217 return E_OK;
218 }
219
SpawnThreads(bool isStart)220 int TaskPool::SpawnThreads(bool isStart)
221 {
222 if (!isStarted_) {
223 LOG_ERROR("Spawn task pool threads failed, pool is not started.");
224 return E_NOT_PERMIT;
225 }
226 if (curThreads_ >= maxThreads_) {
227 // the pool is full of threads.
228 return E_OK;
229 }
230
231 int limits = isStart ? minThreads_ : (curThreads_ + 1);
232 while (curThreads_ < limits) {
233 ++curThreads_;
234 std::thread thread([this]() { TaskWorker(); });
235 LOG_INFO("Spawn task pool threads, min:%d cur:%d max:%d", minThreads_, curThreads_, maxThreads_);
236 thread.detach();
237 }
238 return E_OK;
239 }
240
IsGenericWorker() const241 bool TaskPool::IsGenericWorker() const
242 {
243 return genericThread_ == std::this_thread::get_id();
244 }
245
BecomeGenericWorker()246 void TaskPool::BecomeGenericWorker()
247 {
248 std::lock_guard<std::mutex> guard(tasksMutex_);
249 if (genericThread_ == std::thread::id()) {
250 genericThread_ = std::this_thread::get_id();
251 }
252 }
253
ExitGenericWorker()254 void TaskPool::ExitGenericWorker()
255 {
256 std::lock_guard<std::mutex> guard(tasksMutex_);
257 if (IsGenericWorker()) {
258 genericThread_ = std::thread::id();
259 }
260 --curThreads_;
261 allThreadsExited_.notify_all();
262 LOG_INFO("Task pool thread exit, min:%d cur:%d max:%d, genericTaskCount:%d, queuedTaskCount:%d.", minThreads_,
263 curThreads_, maxThreads_, genericTaskCount_, queuedTaskCount_);
264 }
265
TaskWorker()266 void TaskPool::TaskWorker()
267 {
268 BecomeGenericWorker();
269
270 while (true) {
271 TaskQueue *taskQueue = nullptr;
272 Task task = nullptr;
273
274 int errCode = GetTask(task, taskQueue);
275 if (errCode != E_OK) {
276 LOG_ERROR("Thread worker gets task failed, err:'%d'.", errCode);
277 break;
278 }
279 if (task == nullptr) {
280 // Idle thread exit.
281 break;
282 }
283
284 task();
285 FinishExecuteTask(taskQueue);
286 }
287
288 ExitGenericWorker();
289 }
290
FinishExecuteTask(TaskQueue * taskQueue)291 void TaskPool::FinishExecuteTask(TaskQueue *taskQueue)
292 {
293 std::lock_guard<std::mutex> guard(tasksMutex_);
294 if (taskQueue != nullptr) {
295 taskQueue->ReleaseLock();
296 --queuedTaskCount_;
297 } else {
298 --genericTaskCount_;
299 }
300 }
301
TryToSpawnThreads()302 void TaskPool::TryToSpawnThreads()
303 {
304 if ((curThreads_ >= maxThreads_) || (curThreads_ >= (queuedTaskCount_ + genericTaskCount_))) {
305 return;
306 }
307 SpawnThreads(false);
308 }
309 } // namespace NativePreferences
310 } // namespace OHOS
311