• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "task_pool_impl.h"
16 #include "db_errno.h"
17 #include "log_print.h"
18 
19 namespace DistributedDB {
20 constexpr int TaskPoolImpl::IDLE_WAIT_PERIOD;
21 
TaskPoolImpl(int maxThreads,int minThreads)22 TaskPoolImpl::TaskPoolImpl(int maxThreads, int minThreads)
23     : genericTasks_(false),
24       genericTaskCount_(0),
25       queuedTaskCount_(0),
26       isStarted_(false),
27       isStopping_(false),
28       maxThreads_(maxThreads),
29       minThreads_(minThreads),
30       curThreads_(0),
31       idleThreads_(0)
32 {}
33 
~TaskPoolImpl()34 TaskPoolImpl::~TaskPoolImpl()
35 {}
36 
Start()37 int TaskPoolImpl::Start()
38 {
39     if (maxThreads_ < minThreads_) {
40         LOGE("Start task pool failed, maxThreads(%d) < minThreads(%d).",
41             maxThreads_, minThreads_);
42         return -E_INVALID_ARGS;
43     }
44     if (maxThreads_ <= 0) {
45         LOGE("Start task pool failed, maxThreads(%d) <= 0.", maxThreads_);
46         return -E_INVALID_ARGS;
47     }
48     if (minThreads_ < 0) {
49         LOGE("Start task pool failed, minThreads(%d) < 0.", minThreads_);
50         return -E_INVALID_ARGS;
51     }
52     LOGI("Start task pool min:%d, max:%d", minThreads_, maxThreads_);
53     std::lock_guard<std::mutex> guard(tasksMutex_);
54     isStarted_ = true; // parameters checked ok.
55     isStopping_ = false;
56     int errCode = SpawnThreads(true);
57     if (errCode != E_OK) {
58         LOGW("Spawn threads failed when starting the task pool.");
59         // ignore the error, we will try when schedule().
60     }
61     return E_OK;
62 }
63 
Stop()64 void TaskPoolImpl::Stop()
65 {
66     std::unique_lock<std::mutex> lock(tasksMutex_);
67     if (!isStarted_) {
68         return;
69     }
70     isStopping_ = true;
71     hasTasks_.notify_all();
72     allThreadsExited_.wait(lock, [this]() {
73         return this->curThreads_ <= 0;
74     });
75     isStarted_ = false;
76 }
77 
Schedule(const Task & task)78 int TaskPoolImpl::Schedule(const Task &task)
79 {
80     if (!task) {
81         return -E_INVALID_ARGS;
82     }
83     std::lock_guard<std::mutex> guard(tasksMutex_);
84     if (!isStarted_) {
85         LOGE("Schedule failed, the task pool is not started.");
86         return -E_NOT_PERMIT;
87     }
88     if (isStopping_) {
89         LOGI("Schedule failed, the task pool is stopping.");
90         return -E_STALE;
91     }
92     genericTasks_.PutTask(task);
93     ++genericTaskCount_;
94     hasTasks_.notify_one();
95     TryToSpawnThreads();
96     return E_OK;
97 }
98 
Schedule(const std::string & queueTag,const Task & task)99 int TaskPoolImpl::Schedule(const std::string &queueTag, const Task &task)
100 {
101     if (!task) {
102         return -E_INVALID_ARGS;
103     }
104     std::lock_guard<std::mutex> guard(tasksMutex_);
105     if (!isStarted_) {
106         LOGE("Schedule failed, the task pool is not started.");
107         return -E_NOT_PERMIT;
108     }
109     if (isStopping_) {
110         LOGI("Schedule failed, the task pool is stopping.");
111         return -E_STALE;
112     }
113     queuedTasks_[queueTag].PutTask(task);
114     ++queuedTaskCount_;
115     hasTasks_.notify_all();
116     TryToSpawnThreads();
117     return E_OK;
118 }
119 
ShrinkMemory(const std::string & tag)120 void TaskPoolImpl::ShrinkMemory(const std::string &tag)
121 {
122     std::lock_guard<std::mutex> guard(tasksMutex_);
123     auto iter = queuedTasks_.find(tag);
124     if (iter != queuedTasks_.end()) {
125         if (iter->second.IsEmptyAndUnlocked()) {
126             queuedTasks_.erase(iter);
127         }
128     }
129 }
130 
IdleExit(std::unique_lock<std::mutex> & lock)131 bool TaskPoolImpl::IdleExit(std::unique_lock<std::mutex> &lock)
132 {
133     if (isStopping_) {
134         return true;
135     }
136     ++idleThreads_;
137     bool isGenericWorker = IsGenericWorker();
138     if (!isGenericWorker && (curThreads_ > minThreads_)) {
139         std::cv_status status = hasTasks_.wait_for(lock,
140             std::chrono::seconds(IDLE_WAIT_PERIOD));
141         if (status == std::cv_status::timeout &&
142             genericTaskCount_ <= 0) {
143             --idleThreads_;
144             return true;
145         }
146     } else {
147         if (isGenericWorker) {
148             hasTasks_.notify_all();
149         }
150         hasTasks_.wait(lock);
151     }
152     --idleThreads_;
153     return false;
154 }
155 
SetThreadFree()156 void TaskPoolImpl::SetThreadFree()
157 {
158     for (auto &pair : queuedTasks_) {
159         TaskQueue *tq = &pair.second;
160         tq->ReleaseLock();
161     }
162 }
163 
ReapTask(TaskQueue * & queue)164 Task TaskPoolImpl::ReapTask(TaskQueue *&queue)
165 {
166     Task task = genericTasks_.GetTaskAutoLock();
167     if (task != nullptr) {
168         queue = nullptr;
169         return task;
170     }
171 
172     queue = nullptr;
173     if (IsGenericWorker() && (curThreads_ > 1)) { // 1 indicates self.
174         SetThreadFree();
175         return nullptr;
176     }
177     for (auto &pair : queuedTasks_) {
178         TaskQueue *tq = &pair.second;
179         task = tq->GetTaskAutoLock();
180         if (task != nullptr) {
181             queue = tq;
182             return task;
183         }
184     }
185     return nullptr;
186 }
187 
GetTask(Task & task,TaskQueue * & queue)188 void TaskPoolImpl::GetTask(Task &task, TaskQueue *&queue)
189 {
190     std::unique_lock<std::mutex> lock(tasksMutex_);
191 
192     while (true) {
193         task = ReapTask(queue);
194         if (task != nullptr) {
195             return;
196         }
197 
198         if (IdleExit(lock)) {
199             break;
200         }
201     }
202     if (task == nullptr) {
203         // Idle thread exit.
204         if (IsGenericWorker()) {
205             genericThread_ = std::thread::id();
206         }
207         --curThreads_;
208     }
209 }
210 
SpawnThreads(bool isStart)211 int TaskPoolImpl::SpawnThreads(bool isStart)
212 {
213     if (!isStarted_) {
214         LOGE("Spawn task pool threads failed, pool is not started.");
215         return -E_NOT_PERMIT;
216     }
217     if (curThreads_ >= maxThreads_) {
218         // the pool is full of threads.
219         return E_OK;
220     }
221 
222     int limits = isStart ? minThreads_ : (curThreads_ + 1);
223     while (curThreads_ < limits) {
224         ++curThreads_;
225         std::thread thread([this]() {
226             TaskWorker();
227         });
228         LOGI("Task pool spawn cur:%d idle:%d.", curThreads_, idleThreads_);
229         thread.detach();
230     }
231     return E_OK;
232 }
233 
IsGenericWorker() const234 bool TaskPoolImpl::IsGenericWorker() const
235 {
236     return genericThread_ == std::this_thread::get_id();
237 }
238 
BecomeGenericWorker()239 void TaskPoolImpl::BecomeGenericWorker()
240 {
241     std::lock_guard<std::mutex> guard(tasksMutex_);
242     if (genericThread_ == std::thread::id()) {
243         genericThread_ = std::this_thread::get_id();
244     }
245 }
246 
ExitWorker()247 void TaskPoolImpl::ExitWorker()
248 {
249     std::lock_guard<std::mutex> guard(tasksMutex_);
250     allThreadsExited_.notify_all();
251     LOGI("Task pool thread exit, cur:%d idle:%d, genericTaskCount:%d, queuedTaskCount:%d.",
252         curThreads_, idleThreads_, genericTaskCount_, queuedTaskCount_);
253 }
254 
TaskWorker()255 void TaskPoolImpl::TaskWorker()
256 {
257     BecomeGenericWorker();
258 
259     while (true) {
260         TaskQueue *taskQueue = nullptr;
261         Task task = nullptr;
262 
263         GetTask(task, taskQueue);
264         if (task == nullptr) {
265             // Idle thread exit.
266             break;
267         }
268 
269         task();
270         FinishExecuteTask(taskQueue);
271     }
272 
273     ExitWorker();
274 }
275 
FinishExecuteTask(TaskQueue * taskQueue)276 void TaskPoolImpl::FinishExecuteTask(TaskQueue *taskQueue)
277 {
278     std::lock_guard<std::mutex> guard(tasksMutex_);
279     if (taskQueue != nullptr) {
280         taskQueue->ReleaseLock();
281         --queuedTaskCount_;
282     } else {
283         --genericTaskCount_;
284     }
285 }
286 
TryToSpawnThreads()287 void TaskPoolImpl::TryToSpawnThreads()
288 {
289     if ((curThreads_ >= maxThreads_) ||
290         (curThreads_ >= (queuedTaskCount_ + genericTaskCount_))) {
291         return;
292     }
293     (void)(SpawnThreads(false));
294 }
295 } // namespace DistributedDB
296