• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "task_pool.h"
17 #include <climits>
18 #include "logger.h"
19 #include "preferences_errno.h"
20 
21 namespace OHOS {
22 namespace NativePreferences {
23 constexpr int TaskPool::IDLE_WAIT_PERIOD;
24 
TaskPool(int maxThreads,int minThreads)25 TaskPool::TaskPool(int maxThreads, int minThreads)
26     : genericTasks_(false), genericTaskCount_(0), queuedTaskCount_(0), isStarted_(false), isStopping_(false),
27       isGenericThreadIdle_(false), maxThreads_(maxThreads), minThreads_(minThreads), curThreads_(0), idleThreads_(0)
28 {
29 }
30 
~TaskPool()31 TaskPool::~TaskPool()
32 {
33 }
34 
Start()35 int TaskPool::Start()
36 {
37     if (maxThreads_ < minThreads_) {
38         LOG_ERROR("Start task pool failed, maxThreads(%d) < minThreads(%d).", maxThreads_, minThreads_);
39         return E_INVALID_ARGS;
40     }
41     if (maxThreads_ <= 0) {
42         LOG_ERROR("Start task pool failed, maxThreads(%d) <= 0.", maxThreads_);
43         return E_INVALID_ARGS;
44     }
45     if (minThreads_ < 0) {
46         LOG_ERROR("Start task pool failed, minThreads(%d) < 0.", minThreads_);
47         return E_INVALID_ARGS;
48     }
49 
50     std::lock_guard<std::mutex> guard(tasksMutex_);
51     isStarted_ = true; // parameters checked ok.
52     isStopping_ = false;
53     int errCode = SpawnThreads(true);
54     if (errCode != E_OK) {
55         LOG_WARN("Spawn threads failed when starting the task pool.");
56         // ignore the error, we will try when schedule().
57     }
58     return E_OK;
59 }
60 
Stop()61 void TaskPool::Stop()
62 {
63     std::unique_lock<std::mutex> lock(tasksMutex_);
64     if (!isStarted_) {
65         return;
66     }
67     isStopping_ = true;
68     hasTasks_.notify_all();
69     allThreadsExited_.wait(lock, [this]() { return this->curThreads_ <= 0; });
70     isStarted_ = false;
71 }
72 
Schedule(const Task & task)73 int TaskPool::Schedule(const Task &task)
74 {
75     if (!task) {
76         return E_INVALID_ARGS;
77     }
78     std::lock_guard<std::mutex> guard(tasksMutex_);
79     if (!isStarted_) {
80         LOG_ERROR("Schedule failed, the task pool is not started.");
81         return E_NOT_PERMIT;
82     }
83     if (isStopping_) {
84         LOG_ERROR("Schedule failed, the task pool is stopping.");
85         return E_STALE;
86     }
87     if (genericTaskCount_ == INT_MAX) {
88         LOG_ERROR("Schedule failed, the task pool is full.");
89         return E_ERROR;
90     }
91     genericTasks_.PutTask(task);
92     ++genericTaskCount_;
93     hasTasks_.notify_one();
94     TryToSpawnThreads();
95     return E_OK;
96 }
97 
Schedule(const std::string & queueTag,const Task & task)98 int TaskPool::Schedule(const std::string &queueTag, const Task &task)
99 {
100     if (!task) {
101         return E_INVALID_ARGS;
102     }
103     std::lock_guard<std::mutex> guard(tasksMutex_);
104     if (!isStarted_) {
105         LOG_ERROR("Schedule failed, the task pool is not started.");
106         return E_NOT_PERMIT;
107     }
108     if (isStopping_) {
109         LOG_ERROR("Schedule failed, the task pool is stopping.");
110         return E_STALE;
111     }
112     if (queuedTaskCount_ == INT_MAX) {
113         LOG_ERROR("Schedule failed, the task pool is full.");
114         return E_ERROR;
115     }
116     queuedTasks_[queueTag].PutTask(task);
117     ++queuedTaskCount_;
118     hasTasks_.notify_all();
119     TryToSpawnThreads();
120     return E_OK;
121 }
122 
ShrinkMemory(const std::string & tag)123 void TaskPool::ShrinkMemory(const std::string &tag)
124 {
125     std::lock_guard<std::mutex> guard(tasksMutex_);
126     auto iter = queuedTasks_.find(tag);
127     if (iter != queuedTasks_.end()) {
128         if (iter->second.IsEmptyAndUnlocked()) {
129             queuedTasks_.erase(iter);
130         }
131     }
132 }
133 
Report()134 void TaskPool::Report()
135 {
136     std::lock_guard<std::mutex> guard(tasksMutex_);
137     LOG_INFO("[Task pool report:1] maxThreads:%d, minThreads:%d, curThreads:%d, "
138              "idleThreads:%d, genericTaskCount:%d, queuedTaskCount:%d.",
139         maxThreads_, minThreads_, curThreads_, idleThreads_, genericTaskCount_, queuedTaskCount_);
140     LOG_INFO("[Task pool report:2] taskQueueCount:%zu.", queuedTasks_.size());
141 }
142 
IdleExit(std::unique_lock<std::mutex> & lock)143 bool TaskPool::IdleExit(std::unique_lock<std::mutex> &lock)
144 {
145     if (isStopping_) {
146         return true;
147     }
148     ++idleThreads_;
149     bool isGenericWorker = IsGenericWorker();
150     if (!isGenericWorker && (curThreads_ > minThreads_)) {
151         std::cv_status status = hasTasks_.wait_for(lock, std::chrono::seconds(IDLE_WAIT_PERIOD));
152         if (status == std::cv_status::timeout && genericTaskCount_ <= 0) {
153             --idleThreads_;
154             return true;
155         }
156     } else {
157         if (isGenericWorker) {
158             isGenericThreadIdle_ = true;
159             hasTasks_.notify_all();
160         }
161         hasTasks_.wait(lock);
162         if (isGenericWorker) {
163             isGenericThreadIdle_ = false;
164         }
165     }
166     --idleThreads_;
167     return false;
168 }
169 
SetThreadFree()170 void TaskPool::SetThreadFree()
171 {
172     for (auto &pair : queuedTasks_) {
173         TaskQueue *tq = &pair.second;
174         tq->ReleaseLock();
175     }
176 }
177 
ReapTask(TaskQueue * & queue)178 Task TaskPool::ReapTask(TaskQueue *&queue)
179 {
180     Task task = genericTasks_.GetTaskAutoLock();
181     if (task != nullptr) {
182         queue = nullptr;
183         return task;
184     }
185 
186     queue = nullptr;
187     if (IsGenericWorker() && (curThreads_ > 1)) { // 1 indicates self.
188         SetThreadFree();
189         return nullptr;
190     }
191 
192     for (auto &pair : queuedTasks_) {
193         TaskQueue *tq = &pair.second;
194         task = tq->GetTaskAutoLock();
195         if (task != nullptr) {
196             queue = tq;
197             return task;
198         }
199     }
200     return nullptr;
201 }
202 
GetTask(Task & task,TaskQueue * & queue)203 int TaskPool::GetTask(Task &task, TaskQueue *&queue)
204 {
205     std::unique_lock<std::mutex> lock(tasksMutex_);
206 
207     while (true) {
208         task = ReapTask(queue);
209         if (task != nullptr) {
210             return E_OK;
211         }
212 
213         if (IdleExit(lock)) {
214             break;
215         }
216     }
217     return E_OK;
218 }
219 
SpawnThreads(bool isStart)220 int TaskPool::SpawnThreads(bool isStart)
221 {
222     if (!isStarted_) {
223         LOG_ERROR("Spawn task pool threads failed, pool is not started.");
224         return E_NOT_PERMIT;
225     }
226     if (curThreads_ >= maxThreads_) {
227         // the pool is full of threads.
228         return E_OK;
229     }
230 
231     int limits = isStart ? minThreads_ : (curThreads_ + 1);
232     while (curThreads_ < limits) {
233         ++curThreads_;
234         std::thread thread([this]() { TaskWorker(); });
235         LOG_INFO("Spawn task pool threads, min:%d cur:%d max:%d", minThreads_, curThreads_, maxThreads_);
236         thread.detach();
237     }
238     return E_OK;
239 }
240 
IsGenericWorker() const241 bool TaskPool::IsGenericWorker() const
242 {
243     return genericThread_ == std::this_thread::get_id();
244 }
245 
BecomeGenericWorker()246 void TaskPool::BecomeGenericWorker()
247 {
248     std::lock_guard<std::mutex> guard(tasksMutex_);
249     if (genericThread_ == std::thread::id()) {
250         genericThread_ = std::this_thread::get_id();
251     }
252 }
253 
ExitGenericWorker()254 void TaskPool::ExitGenericWorker()
255 {
256     std::lock_guard<std::mutex> guard(tasksMutex_);
257     if (IsGenericWorker()) {
258         genericThread_ = std::thread::id();
259     }
260     --curThreads_;
261     allThreadsExited_.notify_all();
262     LOG_INFO("Task pool thread exit, min:%d cur:%d max:%d, genericTaskCount:%d, queuedTaskCount:%d.", minThreads_,
263         curThreads_, maxThreads_, genericTaskCount_, queuedTaskCount_);
264 }
265 
TaskWorker()266 void TaskPool::TaskWorker()
267 {
268     BecomeGenericWorker();
269 
270     while (true) {
271         TaskQueue *taskQueue = nullptr;
272         Task task = nullptr;
273 
274         int errCode = GetTask(task, taskQueue);
275         if (errCode != E_OK) {
276             LOG_ERROR("Thread worker gets task failed, err:'%d'.", errCode);
277             break;
278         }
279         if (task == nullptr) {
280             // Idle thread exit.
281             break;
282         }
283 
284         task();
285         FinishExecuteTask(taskQueue);
286     }
287 
288     ExitGenericWorker();
289 }
290 
FinishExecuteTask(TaskQueue * taskQueue)291 void TaskPool::FinishExecuteTask(TaskQueue *taskQueue)
292 {
293     std::lock_guard<std::mutex> guard(tasksMutex_);
294     if (taskQueue != nullptr) {
295         taskQueue->ReleaseLock();
296         --queuedTaskCount_;
297     } else {
298         --genericTaskCount_;
299     }
300 }
301 
TryToSpawnThreads()302 void TaskPool::TryToSpawnThreads()
303 {
304     if ((curThreads_ >= maxThreads_) || (curThreads_ >= (queuedTaskCount_ + genericTaskCount_))) {
305         return;
306     }
307     SpawnThreads(false);
308 }
309 } // namespace NativePreferences
310 } // namespace OHOS
311