• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "task_pool_impl.h"
16 #include "db_errno.h"
17 #include "log_print.h"
18 
19 namespace DistributedDB {
// Namespace-scope definition of the static constexpr member (its initializer
// stays with the in-class declaration). IdleExit() passes the constant to
// std::chrono::seconds by reference, which is an ODR-use and therefore needs
// this definition when building as pre-C++17.
constexpr int TaskPoolImpl::IDLE_WAIT_PERIOD;
21 
TaskPoolImpl(int maxThreads,int minThreads)22 TaskPoolImpl::TaskPoolImpl(int maxThreads, int minThreads)
23     : genericTasks_(false),
24       genericTaskCount_(0),
25       queuedTaskCount_(0),
26       isStarted_(false),
27       isStopping_(false),
28       maxThreads_(maxThreads),
29       minThreads_(minThreads),
30       curThreads_(0),
31       idleThreads_(0)
32 {}
33 
~TaskPoolImpl()34 TaskPoolImpl::~TaskPoolImpl()
35 {}
36 
Start()37 int TaskPoolImpl::Start()
38 {
39     if (maxThreads_ < minThreads_) {
40         LOGE("Start task pool failed, maxThreads(%d) < minThreads(%d).",
41             maxThreads_, minThreads_);
42         return -E_INVALID_ARGS;
43     }
44     if (maxThreads_ <= 0) {
45         LOGE("Start task pool failed, maxThreads(%d) <= 0.", maxThreads_);
46         return -E_INVALID_ARGS;
47     }
48     if (minThreads_ < 0) {
49         LOGE("Start task pool failed, minThreads(%d) < 0.", minThreads_);
50         return -E_INVALID_ARGS;
51     }
52     LOGI("Start task pool min:%d, max:%d", minThreads_, maxThreads_);
53     std::lock_guard<std::mutex> guard(tasksMutex_);
54     isStarted_ = true; // parameters checked ok.
55     isStopping_ = false;
56     int errCode = SpawnThreads(true);
57     if (errCode != E_OK) {
58         LOGW("Spawn threads failed when starting the task pool.");
59         // ignore the error, we will try when schedule().
60     }
61     return E_OK;
62 }
63 
Stop()64 void TaskPoolImpl::Stop()
65 {
66     std::unique_lock<std::mutex> lock(tasksMutex_);
67     if (!isStarted_) {
68         return;
69     }
70     isStopping_ = true;
71     hasTasks_.notify_all();
72     allThreadsExited_.wait(lock, [this]() {
73         return this->curThreads_ <= 0;
74     });
75     isStarted_ = false;
76 }
77 
Schedule(const Task & task)78 int TaskPoolImpl::Schedule(const Task &task)
79 {
80     if (!task) {
81         return -E_INVALID_ARGS;
82     }
83     std::lock_guard<std::mutex> guard(tasksMutex_);
84     if (!isStarted_) {
85         LOGE("Schedule failed, the task pool is not started.");
86         return -E_NOT_PERMIT;
87     }
88     if (isStopping_) {
89         LOGI("Schedule failed, the task pool is stopping.");
90         return -E_STALE;
91     }
92     genericTasks_.PutTask(task);
93     ++genericTaskCount_;
94     hasTasks_.notify_one();
95     TryToSpawnThreads();
96     return E_OK;
97 }
98 
Schedule(const std::string & queueTag,const Task & task)99 int TaskPoolImpl::Schedule(const std::string &queueTag, const Task &task)
100 {
101     if (!task) {
102         return -E_INVALID_ARGS;
103     }
104     std::lock_guard<std::mutex> guard(tasksMutex_);
105     if (!isStarted_) {
106         LOGE("Schedule failed, the task pool is not started.");
107         return -E_NOT_PERMIT;
108     }
109     if (isStopping_) {
110         LOGI("Schedule failed, the task pool is stopping.");
111         return -E_STALE;
112     }
113     queuedTasks_[queueTag].PutTask(task);
114     ++queuedTaskCount_;
115     hasTasks_.notify_all();
116     TryToSpawnThreads();
117     return E_OK;
118 }
119 
ShrinkMemory(const std::string & tag)120 void TaskPoolImpl::ShrinkMemory(const std::string &tag)
121 {
122     std::lock_guard<std::mutex> guard(tasksMutex_);
123     auto iter = queuedTasks_.find(tag);
124     if (iter != queuedTasks_.end()) {
125         if (iter->second.IsEmptyAndUnlocked()) {
126             queuedTasks_.erase(iter);
127         }
128     }
129 }
130 
IdleExit(std::unique_lock<std::mutex> & lock)131 bool TaskPoolImpl::IdleExit(std::unique_lock<std::mutex> &lock)
132 {
133     if (isStopping_) {
134         return true;
135     }
136     ++idleThreads_;
137     bool isGenericWorker = IsGenericWorker();
138     if (!isGenericWorker && (curThreads_ > minThreads_)) {
139         std::cv_status status = hasTasks_.wait_for(lock,
140             std::chrono::seconds(IDLE_WAIT_PERIOD));
141         if (status == std::cv_status::timeout &&
142             genericTaskCount_ <= 0) {
143             --idleThreads_;
144             return true;
145         }
146     } else {
147         if (isGenericWorker) {
148             hasTasks_.notify_all();
149         }
150         hasTasks_.wait(lock);
151     }
152     --idleThreads_;
153     return false;
154 }
155 
SetThreadFree()156 void TaskPoolImpl::SetThreadFree()
157 {
158     for (auto &pair : queuedTasks_) {
159         TaskQueue *tq = &pair.second;
160         tq->ReleaseLock();
161     }
162 }
163 
ReapTask(TaskQueue * & queue)164 Task TaskPoolImpl::ReapTask(TaskQueue *&queue)
165 {
166     Task task = genericTasks_.GetTaskAutoLock();
167     if (task != nullptr) {
168         queue = nullptr;
169         return task;
170     }
171 
172     queue = nullptr;
173     if (IsGenericWorker() && (curThreads_ > 1)) { // 1 indicates self.
174         SetThreadFree();
175         return nullptr;
176     }
177     for (auto &pair : queuedTasks_) {
178         TaskQueue *tq = &pair.second;
179         task = tq->GetTaskAutoLock();
180         if (task != nullptr) {
181             queue = tq;
182             return task;
183         }
184     }
185     return nullptr;
186 }
187 
GetTask(Task & task,TaskQueue * & queue)188 int TaskPoolImpl::GetTask(Task &task, TaskQueue *&queue)
189 {
190     std::unique_lock<std::mutex> lock(tasksMutex_);
191 
192     while (true) {
193         task = ReapTask(queue);
194         if (task != nullptr) {
195             return E_OK;
196         }
197 
198         if (IdleExit(lock)) {
199             break;
200         }
201     }
202     return E_OK;
203 }
204 
SpawnThreads(bool isStart)205 int TaskPoolImpl::SpawnThreads(bool isStart)
206 {
207     if (!isStarted_) {
208         LOGE("Spawn task pool threads failed, pool is not started.");
209         return -E_NOT_PERMIT;
210     }
211     if (curThreads_ >= maxThreads_) {
212         // the pool is full of threads.
213         return E_OK;
214     }
215 
216     int limits = isStart ? minThreads_ : (curThreads_ + 1);
217     while (curThreads_ < limits) {
218         ++curThreads_;
219         std::thread thread([this]() {
220             TaskWorker();
221         });
222         LOGI("Task pool spawn cur:%d idle:%d.", curThreads_, idleThreads_);
223         thread.detach();
224     }
225     return E_OK;
226 }
227 
IsGenericWorker() const228 bool TaskPoolImpl::IsGenericWorker() const
229 {
230     return genericThread_ == std::this_thread::get_id();
231 }
232 
BecomeGenericWorker()233 void TaskPoolImpl::BecomeGenericWorker()
234 {
235     std::lock_guard<std::mutex> guard(tasksMutex_);
236     if (genericThread_ == std::thread::id()) {
237         genericThread_ = std::this_thread::get_id();
238     }
239 }
240 
ExitWorker()241 void TaskPoolImpl::ExitWorker()
242 {
243     std::lock_guard<std::mutex> guard(tasksMutex_);
244     if (IsGenericWorker()) {
245         genericThread_ = std::thread::id();
246     }
247     --curThreads_;
248     allThreadsExited_.notify_all();
249     LOGI("Task pool thread exit, cur:%d idle:%d, genericTaskCount:%d, queuedTaskCount:%d.",
250         curThreads_, idleThreads_, genericTaskCount_, queuedTaskCount_);
251 }
252 
TaskWorker()253 void TaskPoolImpl::TaskWorker()
254 {
255     BecomeGenericWorker();
256 
257     while (true) {
258         TaskQueue *taskQueue = nullptr;
259         Task task = nullptr;
260 
261         int errCode = GetTask(task, taskQueue);
262         if (errCode != E_OK) {
263             LOGE("Thread worker gets task failed, err:'%d'.", errCode);
264             break;
265         }
266         if (task == nullptr) {
267             // Idle thread exit.
268             break;
269         }
270 
271         task();
272         FinishExecuteTask(taskQueue);
273     }
274 
275     ExitWorker();
276 }
277 
FinishExecuteTask(TaskQueue * taskQueue)278 void TaskPoolImpl::FinishExecuteTask(TaskQueue *taskQueue)
279 {
280     std::lock_guard<std::mutex> guard(tasksMutex_);
281     if (taskQueue != nullptr) {
282         taskQueue->ReleaseLock();
283         --queuedTaskCount_;
284     } else {
285         --genericTaskCount_;
286     }
287 }
288 
TryToSpawnThreads()289 void TaskPoolImpl::TryToSpawnThreads()
290 {
291     if ((curThreads_ >= maxThreads_) ||
292         (curThreads_ >= (queuedTaskCount_ + genericTaskCount_))) {
293         return;
294     }
295     (void)(SpawnThreads(false));
296 }
297 } // namespace DistributedDB
298