1 /*
2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "task_executor.h"
16
17 #include "hilog_wrapper.h"
18
19 namespace OHOS {
20 namespace AppExecFwk {
21 std::atomic<long> TaskExecutor::sequence(0);
22
TaskExecutor(const std::shared_ptr<WorkerPoolConfig> & config)23 TaskExecutor::TaskExecutor(const std::shared_ptr<WorkerPoolConfig> &config) : WorkerPool(config)
24 {
25 terminated_ = false;
26 taskCounter_ = 0;
27 delayTasks_ = std::make_shared<DelayQueue>();
28 pendingTasks_ = std::make_shared<BlockingQueue>();
29 }
~TaskExecutor()30 TaskExecutor::~TaskExecutor()
31 {
32 if ((consumer_) && consumer_->joinable()) {
33 HILOG_INFO("TaskExecutor::~TaskExecutor consumer is running");
34 consumer_->join();
35 }
36 HILOG_INFO("TaskExecutor::~TaskExecutor");
37 }
38
Execute(const std::shared_ptr<Task> & task)39 void TaskExecutor::Execute(const std::shared_ptr<Task> &task)
40 {
41 HILOG_INFO("TaskExecutor::Execute begin");
42 task->SetSequence(GetAndIncrement(sequence));
43
44 std::shared_ptr<TaskExecutor> executor = shared_from_this();
45
46 if (AddWorker(executor, task) == false) {
47 std::shared_ptr<PriorityTaskWrapper> priorityTaskWrapper =
48 std::make_shared<PriorityTaskWrapper>(task->GetPriority(), task);
49 if (pendingTasks_->Offer(priorityTaskWrapper) == false) {
50 HILOG_WARN("TaskExecutor::Execute rejected a task");
51 }
52 }
53 HILOG_INFO("TaskExecutor::Execute end");
54 }
55
DoWorks(const std::shared_ptr<WorkerThread> & worker)56 ErrCode TaskExecutor::DoWorks(const std::shared_ptr<WorkerThread> &worker)
57 {
58 HILOG_INFO("TaskExecutor::DoWorks begin");
59 if (worker == nullptr) {
60 HILOG_ERROR("TaskExecutor::DoWorks worker is nullptr");
61 return ERR_APPEXECFWK_CHECK_FAILED;
62 }
63 if (worker->GetThread() == nullptr) {
64 HILOG_ERROR("TaskExecutor::DoWorks worker GetThread is nullptr");
65 return ERR_APPEXECFWK_CHECK_FAILED;
66 }
67
68 std::shared_ptr<Task> task = worker->PollFirstTask();
69 bool isInterrupted = false;
70 bool done = false;
71 while (((task != nullptr && done == false) || ((task = GetTask(worker)) != nullptr))) {
72 HILOG_INFO("TaskExecutor::DoWorks loop tasks.");
73
74 if (task) {
75 BeforeRun(task);
76 task->Run();
77 AfterRun(task);
78 }
79
80 worker->IncTaskCount();
81 IncrementAndGet(taskCounter_);
82
83 // loop condition
84 done = true;
85 }
86 OnWorkerExit(worker, isInterrupted);
87 HILOG_INFO("TaskExecutor::DoWorks end");
88 return ERR_OK;
89 }
GetTask(const std::shared_ptr<WorkerThread> & workerThread)90 std::shared_ptr<Task> TaskExecutor::GetTask(const std::shared_ptr<WorkerThread> &workerThread)
91 {
92 bool isTimeout = false;
93 std::shared_ptr<Task> nullRunnable = nullptr;
94 std::shared_ptr<Task> next = nullptr;
95
96 for (;;) {
97 if (terminated_.load() && pendingTasks_->Empty()) {
98 HILOG_INFO("TaskExecutor::GetTask end: loop thread %{public}s is terminated",
99 workerThread->GetThreadName().c_str());
100 DecrementThread();
101 return nullRunnable;
102 }
103
104 int workerCount = GetWorkCount();
105 HILOG_INFO("TaskExecutor::GetTask workerCount:%{public}d, GetCoreThreadCount: %{public}d",
106 workerCount,
107 GetCoreThreadCount());
108 bool needCheckTimeout = (workerCount > GetCoreThreadCount());
109 if (isTimeout && needCheckTimeout && pendingTasks_->Empty()) {
110 HILOG_INFO("TaskExecutor::GetTask isTimeout is true");
111 if (CompareAndDecNum(workerCount)) {
112 HILOG_INFO("TaskExecutor::GetTask end: loop thread %{public}s is timeout",
113 workerThread->GetThreadName().c_str());
114 return nullRunnable;
115 }
116 continue;
117 }
118
119 HILOG_INFO("TaskExecutor::GetTask need timeout=%{public}d", needCheckTimeout);
120 std::shared_ptr<PriorityTaskWrapper> next =
121 needCheckTimeout ? pendingTasks_->Poll(GetKeepAliveTime()) : pendingTasks_->Take();
122
123 if (next != nullptr && next->task_ != nullptr) {
124 HILOG_INFO("TaskExecutor::GetTask end: loop thread %{public}s get next task",
125 workerThread->GetThreadName().c_str());
126 return next->task_;
127 }
128 isTimeout = true;
129 }
130 }
131
Terminate(bool force)132 void TaskExecutor::Terminate(bool force)
133 {
134 HILOG_INFO("TaskExecutor::Terminate begin");
135 TerminateConsumer();
136 ClosePool(force);
137 HILOG_INFO("TaskExecutor::Terminate end");
138 }
139
AfterRun(const std::shared_ptr<Task> & task)140 void TaskExecutor::AfterRun(const std::shared_ptr<Task> &task)
141 {
142 task->AfterTaskExecute();
143 }
144
BeforeRun(const std::shared_ptr<Task> & task)145 void TaskExecutor::BeforeRun(const std::shared_ptr<Task> &task)
146 {
147 task->BeforeTaskExecute();
148 }
149
DelayExecute(const Runnable & task,long delayMs)150 bool TaskExecutor::DelayExecute(const Runnable &task, long delayMs)
151 {
152 if (delayMs <= 0) {
153 task();
154 HILOG_INFO("TaskExecutor::DelayExecute end and delayMs less than 0");
155 return true;
156 }
157 if (terminated_.load()) {
158 HILOG_INFO("TaskExecutor::DelayExecute end and terminate");
159 return false;
160 }
161 std::shared_ptr<DelayTaskWrapper> delayTaskWrapper = std::make_shared<DelayTaskWrapper>(delayMs, task);
162 if (delayTaskWrapper == nullptr) {
163 HILOG_INFO("TaskExecutor::DelayExecute end and delayTaskWrapper is nullptr");
164 return false;
165 }
166 delayTasks_->Offer(delayTaskWrapper);
167 return EnsureConsumeStarted();
168 }
169
TerminateConsumer()170 void TaskExecutor::TerminateConsumer()
171 {
172 std::unique_lock<std::mutex> lock(dataMutex_);
173 terminated_.store(true);
174 pendingTasks_->Stop();
175 delayTasks_->Stop();
176
177 if (consumer_ != nullptr) {
178 if (consumer_->joinable()) {
179 consumer_->join();
180 }
181 consumer_ = nullptr;
182 }
183 }
184
EnsureConsumeStarted()185 bool TaskExecutor::EnsureConsumeStarted()
186 {
187 if (consumer_ == nullptr) {
188 {
189 std::unique_lock<std::mutex> lock(dataMutex_);
190 if (consumer_ == nullptr) {
191 consumer_ = std::make_shared<std::thread>(&TaskExecutor::Consume, this);
192 if (consumer_ == nullptr) {
193 HILOG_ERROR("TaskExecutor::EnsureConsumeStarted consumer_ is nullptr");
194 return false;
195 }
196 HILOG_INFO("TaskExecutor::EnsureConsumeStarted start a delay task consumer");
197 }
198 }
199 }
200 return true;
201 }
202
Consume()203 void TaskExecutor::Consume()
204 {
205 for (;;) {
206 if (terminated_.load() && delayTasks_->Empty()) {
207 HILOG_INFO("TaskExecutor::Consume delay task is empty");
208 return;
209 }
210 std::shared_ptr<DelayTaskWrapper> delayTaskWrapper = delayTasks_->Take();
211 if (delayTaskWrapper == nullptr || delayTaskWrapper->runnable_ == nullptr) {
212 HILOG_ERROR("TaskExecutor::Consume delayTaskWrapper is nullptr");
213 return;
214 }
215 (delayTaskWrapper->runnable_)();
216 HILOG_INFO("TaskExecutor::Consume after run");
217 }
218 }
219
GetPendingTasksSize()220 int TaskExecutor::GetPendingTasksSize()
221 {
222 return pendingTasks_->Size();
223 }
224
GetTaskCounter()225 long TaskExecutor::GetTaskCounter()
226 {
227 return taskCounter_.load();
228 }
229
GetAndIncrement(std::atomic<long> & atomiclong)230 long TaskExecutor::GetAndIncrement(std::atomic<long> &atomiclong)
231 {
232 long ret = atomiclong.load();
233 atomiclong.fetch_add(1, std::memory_order_relaxed);
234
235 return ret;
236 }
237
IncrementAndGet(std::atomic<long> & atomiclong)238 long TaskExecutor::IncrementAndGet(std::atomic<long> &atomiclong)
239 {
240 atomiclong.fetch_add(1, std::memory_order_relaxed);
241
242 return atomiclong;
243 }
244 } // namespace AppExecFwk
245 } // namespace OHOS