1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "task_executor.h"
16
17 #include "app_log_wrapper.h"
18
19 namespace OHOS {
20 namespace AppExecFwk {
21 std::atomic<long> TaskExecutor::sequence(0);
22
TaskExecutor(const std::shared_ptr<WorkerPoolConfig> & config)23 TaskExecutor::TaskExecutor(const std::shared_ptr<WorkerPoolConfig> &config) : WorkerPool(config)
24 {
25 terminated_ = false;
26 taskCounter_ = 0;
27 delayTasks_ = std::make_shared<DelayQueue>();
28 pendingTasks_ = std::make_shared<BlockingQueue>();
29 }
~TaskExecutor()30 TaskExecutor::~TaskExecutor()
31 {
32 if ((consumer_) && consumer_->joinable()) {
33 APP_LOGI("TaskExecutor::~TaskExecutor consumer is running");
34 consumer_->join();
35 }
36 APP_LOGI("TaskExecutor::~TaskExecutor");
37 }
38
Execute(const std::shared_ptr<Task> & task)39 void TaskExecutor::Execute(const std::shared_ptr<Task> &task)
40 {
41 APP_LOGI("TaskExecutor::Execute begin");
42 task->SetSequence(GetAndIncrement(sequence));
43
44 std::shared_ptr<TaskExecutor> executor = shared_from_this();
45
46 if (AddWorker(executor, task) == false) {
47 std::shared_ptr<PriorityTaskWrapper> priorityTaskWrapper =
48 std::make_shared<PriorityTaskWrapper>(task->GetPriority(), task);
49 if (pendingTasks_->Offer(priorityTaskWrapper) == false) {
50 APP_LOGW("TaskExecutor::Execute rejected a task");
51 }
52 }
53 APP_LOGI("TaskExecutor::Execute end");
54 }
55
DoWorks(const std::shared_ptr<WorkerThread> & worker)56 ErrCode TaskExecutor::DoWorks(const std::shared_ptr<WorkerThread> &worker)
57 {
58 APP_LOGI("TaskExecutor::DoWorks begin");
59 if (worker == nullptr) {
60 APP_LOGE("TaskExecutor::DoWorks worker is nullptr");
61 return ERR_APPEXECFWK_CHECK_FAILED;
62 }
63 if (worker->GetThread() == nullptr) {
64 APP_LOGE("TaskExecutor::DoWorks worker GetThread is nullptr");
65 return ERR_APPEXECFWK_CHECK_FAILED;
66 }
67
68 std::shared_ptr<Task> task = worker->PollFirstTask();
69
70 bool isInterrupted = false;
71 bool done = false;
72 while (((task != nullptr && done == false) || ((task = GetTask(worker)) != nullptr))) {
73 APP_LOGI("TaskExecutor::DoWorks loop tasks.");
74
75 BeforeRun(task);
76
77 task->Run();
78
79 AfterRun(task);
80
81 worker->IncTaskCount();
82 IncrementAndGet(taskCounter_);
83
84 // loop condition
85 done = true;
86 }
87 OnWorkerExit(worker, isInterrupted);
88 APP_LOGI("TaskExecutor::DoWorks end");
89 return ERR_OK;
90 }
GetTask(const std::shared_ptr<WorkerThread> & workerThread)91 std::shared_ptr<Task> TaskExecutor::GetTask(const std::shared_ptr<WorkerThread> &workerThread)
92 {
93 bool isTimeout = false;
94 std::shared_ptr<Task> nullRunnable = nullptr;
95 std::shared_ptr<Task> next = nullptr;
96
97 for(;;){
98 if (terminated_.load() && pendingTasks_->Empty()) {
99 APP_LOGI("TaskExecutor::GetTask end: loop thread %{public}s is terminated",
100 workerThread->GetThreadName().c_str());
101 DecrementThread();
102 return nullRunnable;
103 }
104
105 int workerCount = GetWorkCount();
106 APP_LOGI("TaskExecutor::GetTask workerCount:%{public}d, GetCoreThreadCount: %{public}d",
107 workerCount,
108 GetCoreThreadCount());
109 bool needCheckTimeout = (workerCount > GetCoreThreadCount());
110 if (isTimeout && needCheckTimeout && pendingTasks_->Empty()) {
111 APP_LOGI("TaskExecutor::GetTask isTimeout is true");
112 if (CompareAndDecNum(workerCount)) {
113 APP_LOGI("TaskExecutor::GetTask end: loop thread %{public}s is timeout",
114 workerThread->GetThreadName().c_str());
115 return nullRunnable;
116 }
117 continue;
118 }
119
120 APP_LOGI("TaskExecutor::GetTask need timeout=%{public}d", needCheckTimeout);
121 std::shared_ptr<PriorityTaskWrapper> next =
122 needCheckTimeout ? pendingTasks_->Poll(GetKeepAliveTime()) : pendingTasks_->Take();
123
124 if (next != nullptr && next->task_ != nullptr) {
125 APP_LOGI("TaskExecutor::GetTask end: loop thread %{public}s get next task",
126 workerThread->GetThreadName().c_str());
127 return next->task_;
128 }
129 isTimeout = true;
130 }
131 }
132
Terminate(bool force)133 void TaskExecutor::Terminate(bool force)
134 {
135 APP_LOGI("TaskExecutor::Terminate begin");
136 TerminateConsumer();
137 ClosePool(force);
138 APP_LOGI("TaskExecutor::Terminate end");
139 }
140
AfterRun(const std::shared_ptr<Task> & task)141 void TaskExecutor::AfterRun(const std::shared_ptr<Task> &task)
142 {
143 task->AfterTaskExecute();
144 }
145
BeforeRun(const std::shared_ptr<Task> & task)146 void TaskExecutor::BeforeRun(const std::shared_ptr<Task> &task)
147 {
148 task->BeforeTaskExecute();
149 }
150
DelayExecute(const Runnable & task,long delayMs)151 bool TaskExecutor::DelayExecute(const Runnable &task, long delayMs)
152 {
153 if (delayMs <= 0) {
154 task();
155 APP_LOGI("TaskExecutor::DelayExecute end and delayMs less than 0");
156 return true;
157 }
158 if (terminated_.load()) {
159 APP_LOGI("TaskExecutor::DelayExecute end and terminate");
160 return false;
161 }
162 std::shared_ptr<DelayTaskWrapper> delayTaskWrapper = std::make_shared<DelayTaskWrapper>(delayMs, task);
163 if (delayTaskWrapper == nullptr) {
164 APP_LOGI("TaskExecutor::DelayExecute end and delayTaskWrapper is nullptr");
165 return false;
166 }
167 delayTasks_->Offer(delayTaskWrapper);
168 return EnsureConsumeStarted();
169 }
170
TerminateConsumer()171 void TaskExecutor::TerminateConsumer()
172 {
173
174 std::unique_lock<std::mutex> lock(dataMutex_);
175 terminated_.store(true);
176 pendingTasks_->Stop();
177 delayTasks_->Stop();
178
179 if (consumer_ != nullptr) {
180 if (consumer_->joinable()) {
181 consumer_->join();
182 }
183 consumer_ = nullptr;
184 }
185 }
186
EnsureConsumeStarted()187 bool TaskExecutor::EnsureConsumeStarted()
188 {
189 if (consumer_ == nullptr) {
190 {
191 std::unique_lock<std::mutex> lock(dataMutex_);
192 if (consumer_ == nullptr) {
193 consumer_ = std::make_shared<std::thread>(&TaskExecutor::Consume, this);
194 if (consumer_ == nullptr) {
195 APP_LOGE("TaskExecutor::EnsureConsumeStarted consumer_ is nullptr");
196 return false;
197 }
198 APP_LOGI("TaskExecutor::EnsureConsumeStarted start a delay task consumer");
199 }
200 }
201 }
202 return true;
203 }
204
Consume()205 void TaskExecutor::Consume()
206 {
207 for(;;){
208 if (terminated_.load() && delayTasks_->Empty()) {
209 APP_LOGI("TaskExecutor::Consume delay task is empty");
210 break;
211 }
212 std::shared_ptr<DelayTaskWrapper> delayTaskWrapper = delayTasks_->Take();
213 if (delayTaskWrapper == nullptr || delayTaskWrapper->runnable_ == nullptr) {
214 APP_LOGE("TaskExecutor::Consume delayTaskWrapper is nullptr");
215 return;
216 };
217 (delayTaskWrapper->runnable_)();
218 APP_LOGI("TaskExecutor::Consume after run");
219 }
220 }
221
GetPendingTasksSize()222 int TaskExecutor::GetPendingTasksSize()
223 {
224 return pendingTasks_->Size();
225 }
226
GetTaskCounter()227 long TaskExecutor::GetTaskCounter()
228 {
229 return taskCounter_.load();
230 }
231
GetAndIncrement(std::atomic<long> & atomiclong)232 long TaskExecutor::GetAndIncrement(std::atomic<long> &atomiclong)
233 {
234 long ret = atomiclong.load();
235 atomiclong.fetch_add(1, std::memory_order_relaxed);
236
237 return ret;
238 }
239
IncrementAndGet(std::atomic<long> & atomiclong)240 long TaskExecutor::IncrementAndGet(std::atomic<long> &atomiclong)
241 {
242 atomiclong.fetch_add(1, std::memory_order_relaxed);
243
244 return atomiclong;
245 }
246 } // namespace AppExecFwk
247 } // namespace OHOS