1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "task_pool_impl.h"
16 #include "db_errno.h"
17 #include "log_print.h"
18
19 namespace DistributedDB {
20 constexpr int TaskPoolImpl::IDLE_WAIT_PERIOD;
21
TaskPoolImpl(int maxThreads,int minThreads)22 TaskPoolImpl::TaskPoolImpl(int maxThreads, int minThreads)
23 : genericTasks_(false),
24 genericTaskCount_(0),
25 queuedTaskCount_(0),
26 isStarted_(false),
27 isStopping_(false),
28 maxThreads_(maxThreads),
29 minThreads_(minThreads),
30 curThreads_(0),
31 idleThreads_(0)
32 {}
33
~TaskPoolImpl()34 TaskPoolImpl::~TaskPoolImpl()
35 {}
36
Start()37 int TaskPoolImpl::Start()
38 {
39 if (maxThreads_ < minThreads_) {
40 LOGE("Start task pool failed, maxThreads(%d) < minThreads(%d).",
41 maxThreads_, minThreads_);
42 return -E_INVALID_ARGS;
43 }
44 if (maxThreads_ <= 0) {
45 LOGE("Start task pool failed, maxThreads(%d) <= 0.", maxThreads_);
46 return -E_INVALID_ARGS;
47 }
48 if (minThreads_ < 0) {
49 LOGE("Start task pool failed, minThreads(%d) < 0.", minThreads_);
50 return -E_INVALID_ARGS;
51 }
52 LOGI("Start task pool min:%d, max:%d", minThreads_, maxThreads_);
53 std::lock_guard<std::mutex> guard(tasksMutex_);
54 isStarted_ = true; // parameters checked ok.
55 isStopping_ = false;
56 int errCode = SpawnThreads(true);
57 if (errCode != E_OK) {
58 LOGW("Spawn threads failed when starting the task pool.");
59 // ignore the error, we will try when schedule().
60 }
61 return E_OK;
62 }
63
Stop()64 void TaskPoolImpl::Stop()
65 {
66 std::unique_lock<std::mutex> lock(tasksMutex_);
67 if (!isStarted_) {
68 return;
69 }
70 isStopping_ = true;
71 hasTasks_.notify_all();
72 allThreadsExited_.wait(lock, [this]() {
73 return this->curThreads_ <= 0;
74 });
75 isStarted_ = false;
76 }
77
Schedule(const Task & task)78 int TaskPoolImpl::Schedule(const Task &task)
79 {
80 if (!task) {
81 return -E_INVALID_ARGS;
82 }
83 std::lock_guard<std::mutex> guard(tasksMutex_);
84 if (!isStarted_) {
85 LOGE("Schedule failed, the task pool is not started.");
86 return -E_NOT_PERMIT;
87 }
88 if (isStopping_) {
89 LOGI("Schedule failed, the task pool is stopping.");
90 return -E_STALE;
91 }
92 genericTasks_.PutTask(task);
93 ++genericTaskCount_;
94 hasTasks_.notify_one();
95 TryToSpawnThreads();
96 return E_OK;
97 }
98
Schedule(const std::string & queueTag,const Task & task)99 int TaskPoolImpl::Schedule(const std::string &queueTag, const Task &task)
100 {
101 if (!task) {
102 return -E_INVALID_ARGS;
103 }
104 std::lock_guard<std::mutex> guard(tasksMutex_);
105 if (!isStarted_) {
106 LOGE("Schedule failed, the task pool is not started.");
107 return -E_NOT_PERMIT;
108 }
109 if (isStopping_) {
110 LOGI("Schedule failed, the task pool is stopping.");
111 return -E_STALE;
112 }
113 queuedTasks_[queueTag].PutTask(task);
114 ++queuedTaskCount_;
115 hasTasks_.notify_all();
116 TryToSpawnThreads();
117 return E_OK;
118 }
119
ShrinkMemory(const std::string & tag)120 void TaskPoolImpl::ShrinkMemory(const std::string &tag)
121 {
122 std::lock_guard<std::mutex> guard(tasksMutex_);
123 auto iter = queuedTasks_.find(tag);
124 if (iter != queuedTasks_.end()) {
125 if (iter->second.IsEmptyAndUnlocked()) {
126 queuedTasks_.erase(iter);
127 }
128 }
129 }
130
IdleExit(std::unique_lock<std::mutex> & lock)131 bool TaskPoolImpl::IdleExit(std::unique_lock<std::mutex> &lock)
132 {
133 if (isStopping_) {
134 return true;
135 }
136 ++idleThreads_;
137 bool isGenericWorker = IsGenericWorker();
138 if (!isGenericWorker && (curThreads_ > minThreads_)) {
139 std::cv_status status = hasTasks_.wait_for(lock,
140 std::chrono::seconds(IDLE_WAIT_PERIOD));
141 if (status == std::cv_status::timeout &&
142 genericTaskCount_ <= 0) {
143 --idleThreads_;
144 return true;
145 }
146 } else {
147 if (isGenericWorker) {
148 hasTasks_.notify_all();
149 }
150 hasTasks_.wait(lock);
151 }
152 --idleThreads_;
153 return false;
154 }
155
SetThreadFree()156 void TaskPoolImpl::SetThreadFree()
157 {
158 for (auto &pair : queuedTasks_) {
159 TaskQueue *tq = &pair.second;
160 tq->ReleaseLock();
161 }
162 }
163
ReapTask(TaskQueue * & queue)164 Task TaskPoolImpl::ReapTask(TaskQueue *&queue)
165 {
166 Task task = genericTasks_.GetTaskAutoLock();
167 if (task != nullptr) {
168 queue = nullptr;
169 return task;
170 }
171
172 queue = nullptr;
173 if (IsGenericWorker() && (curThreads_ > 1)) { // 1 indicates self.
174 SetThreadFree();
175 return nullptr;
176 }
177 for (auto &pair : queuedTasks_) {
178 TaskQueue *tq = &pair.second;
179 task = tq->GetTaskAutoLock();
180 if (task != nullptr) {
181 queue = tq;
182 return task;
183 }
184 }
185 return nullptr;
186 }
187
GetTask(Task & task,TaskQueue * & queue)188 int TaskPoolImpl::GetTask(Task &task, TaskQueue *&queue)
189 {
190 std::unique_lock<std::mutex> lock(tasksMutex_);
191
192 while (true) {
193 task = ReapTask(queue);
194 if (task != nullptr) {
195 return E_OK;
196 }
197
198 if (IdleExit(lock)) {
199 break;
200 }
201 }
202 return E_OK;
203 }
204
SpawnThreads(bool isStart)205 int TaskPoolImpl::SpawnThreads(bool isStart)
206 {
207 if (!isStarted_) {
208 LOGE("Spawn task pool threads failed, pool is not started.");
209 return -E_NOT_PERMIT;
210 }
211 if (curThreads_ >= maxThreads_) {
212 // the pool is full of threads.
213 return E_OK;
214 }
215
216 int limits = isStart ? minThreads_ : (curThreads_ + 1);
217 while (curThreads_ < limits) {
218 ++curThreads_;
219 std::thread thread([this]() {
220 TaskWorker();
221 });
222 LOGI("Task pool spawn cur:%d idle:%d.", curThreads_, idleThreads_);
223 thread.detach();
224 }
225 return E_OK;
226 }
227
IsGenericWorker() const228 bool TaskPoolImpl::IsGenericWorker() const
229 {
230 return genericThread_ == std::this_thread::get_id();
231 }
232
BecomeGenericWorker()233 void TaskPoolImpl::BecomeGenericWorker()
234 {
235 std::lock_guard<std::mutex> guard(tasksMutex_);
236 if (genericThread_ == std::thread::id()) {
237 genericThread_ = std::this_thread::get_id();
238 }
239 }
240
ExitWorker()241 void TaskPoolImpl::ExitWorker()
242 {
243 std::lock_guard<std::mutex> guard(tasksMutex_);
244 if (IsGenericWorker()) {
245 genericThread_ = std::thread::id();
246 }
247 --curThreads_;
248 allThreadsExited_.notify_all();
249 LOGI("Task pool thread exit, cur:%d idle:%d, genericTaskCount:%d, queuedTaskCount:%d.",
250 curThreads_, idleThreads_, genericTaskCount_, queuedTaskCount_);
251 }
252
TaskWorker()253 void TaskPoolImpl::TaskWorker()
254 {
255 BecomeGenericWorker();
256
257 while (true) {
258 TaskQueue *taskQueue = nullptr;
259 Task task = nullptr;
260
261 int errCode = GetTask(task, taskQueue);
262 if (errCode != E_OK) {
263 LOGE("Thread worker gets task failed, err:'%d'.", errCode);
264 break;
265 }
266 if (task == nullptr) {
267 // Idle thread exit.
268 break;
269 }
270
271 task();
272 FinishExecuteTask(taskQueue);
273 }
274
275 ExitWorker();
276 }
277
FinishExecuteTask(TaskQueue * taskQueue)278 void TaskPoolImpl::FinishExecuteTask(TaskQueue *taskQueue)
279 {
280 std::lock_guard<std::mutex> guard(tasksMutex_);
281 if (taskQueue != nullptr) {
282 taskQueue->ReleaseLock();
283 --queuedTaskCount_;
284 } else {
285 --genericTaskCount_;
286 }
287 }
288
TryToSpawnThreads()289 void TaskPoolImpl::TryToSpawnThreads()
290 {
291 if ((curThreads_ >= maxThreads_) ||
292 (curThreads_ >= (queuedTaskCount_ + genericTaskCount_))) {
293 return;
294 }
295 (void)(SpawnThreads(false));
296 }
297 } // namespace DistributedDB
298