• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "task_executor.h"
16 
17 #include "hilog_wrapper.h"
18 
19 namespace OHOS {
20 namespace AppExecFwk {
21 std::atomic<long> TaskExecutor::sequence(0);
22 
TaskExecutor(const std::shared_ptr<WorkerPoolConfig> & config)23 TaskExecutor::TaskExecutor(const std::shared_ptr<WorkerPoolConfig> &config) : WorkerPool(config)
24 {
25     terminated_ = false;
26     taskCounter_ = 0;
27     delayTasks_ = std::make_shared<DelayQueue>();
28     pendingTasks_ = std::make_shared<BlockingQueue>();
29 }
~TaskExecutor()30 TaskExecutor::~TaskExecutor()
31 {
32     if ((consumer_) && consumer_->joinable()) {
33         HILOG_INFO("TaskExecutor::~TaskExecutor consumer is running");
34         consumer_->join();
35     }
36     HILOG_INFO("TaskExecutor::~TaskExecutor");
37 }
38 
Execute(const std::shared_ptr<Task> & task)39 void TaskExecutor::Execute(const std::shared_ptr<Task> &task)
40 {
41     HILOG_INFO("TaskExecutor::Execute begin");
42     task->SetSequence(GetAndIncrement(sequence));
43 
44     std::shared_ptr<TaskExecutor> executor = shared_from_this();
45 
46     if (AddWorker(executor, task) == false) {
47         std::shared_ptr<PriorityTaskWrapper> priorityTaskWrapper =
48             std::make_shared<PriorityTaskWrapper>(task->GetPriority(), task);
49         if (pendingTasks_->Offer(priorityTaskWrapper) == false) {
50             HILOG_WARN("TaskExecutor::Execute rejected a task");
51         }
52     }
53     HILOG_INFO("TaskExecutor::Execute end");
54 }
55 
DoWorks(const std::shared_ptr<WorkerThread> & worker)56 ErrCode TaskExecutor::DoWorks(const std::shared_ptr<WorkerThread> &worker)
57 {
58     HILOG_INFO("TaskExecutor::DoWorks begin");
59     if (worker == nullptr) {
60         HILOG_ERROR("TaskExecutor::DoWorks worker is nullptr");
61         return ERR_APPEXECFWK_CHECK_FAILED;
62     }
63     if (worker->GetThread() == nullptr) {
64         HILOG_ERROR("TaskExecutor::DoWorks worker GetThread is nullptr");
65         return ERR_APPEXECFWK_CHECK_FAILED;
66     }
67 
68     std::shared_ptr<Task> task = worker->PollFirstTask();
69     bool isInterrupted = false;
70     bool done = false;
71     while (((task != nullptr && done == false) || ((task = GetTask(worker)) != nullptr))) {
72         HILOG_INFO("TaskExecutor::DoWorks loop tasks.");
73 
74         if (task) {
75             BeforeRun(task);
76             task->Run();
77             AfterRun(task);
78         }
79 
80         worker->IncTaskCount();
81         IncrementAndGet(taskCounter_);
82 
83         // loop condition
84         done = true;
85     }
86     OnWorkerExit(worker, isInterrupted);
87     HILOG_INFO("TaskExecutor::DoWorks end");
88     return ERR_OK;
89 }
GetTask(const std::shared_ptr<WorkerThread> & workerThread)90 std::shared_ptr<Task> TaskExecutor::GetTask(const std::shared_ptr<WorkerThread> &workerThread)
91 {
92     bool isTimeout = false;
93     std::shared_ptr<Task> nullRunnable = nullptr;
94     std::shared_ptr<Task> next = nullptr;
95 
96     for (;;) {
97         if (terminated_.load() && pendingTasks_->Empty()) {
98             HILOG_INFO("TaskExecutor::GetTask end: loop thread %{public}s is terminated",
99                 workerThread->GetThreadName().c_str());
100             DecrementThread();
101             return nullRunnable;
102         }
103 
104         int workerCount = GetWorkCount();
105         HILOG_INFO("TaskExecutor::GetTask  workerCount:%{public}d, GetCoreThreadCount: %{public}d",
106             workerCount,
107             GetCoreThreadCount());
108         bool needCheckTimeout = (workerCount > GetCoreThreadCount());
109         if (isTimeout && needCheckTimeout && pendingTasks_->Empty()) {
110             HILOG_INFO("TaskExecutor::GetTask isTimeout is true");
111             if (CompareAndDecNum(workerCount)) {
112                 HILOG_INFO("TaskExecutor::GetTask end: loop thread %{public}s is timeout",
113                     workerThread->GetThreadName().c_str());
114                 return nullRunnable;
115             }
116             continue;
117         }
118 
119         HILOG_INFO("TaskExecutor::GetTask need timeout=%{public}d", needCheckTimeout);
120         std::shared_ptr<PriorityTaskWrapper> next =
121             needCheckTimeout ? pendingTasks_->Poll(GetKeepAliveTime()) : pendingTasks_->Take();
122 
123         if (next != nullptr && next->task_ != nullptr) {
124             HILOG_INFO("TaskExecutor::GetTask end: loop thread %{public}s get next task",
125                 workerThread->GetThreadName().c_str());
126             return next->task_;
127         }
128         isTimeout = true;
129     }
130 }
131 
Terminate(bool force)132 void TaskExecutor::Terminate(bool force)
133 {
134     HILOG_INFO("TaskExecutor::Terminate begin");
135     TerminateConsumer();
136     ClosePool(force);
137     HILOG_INFO("TaskExecutor::Terminate end");
138 }
139 
AfterRun(const std::shared_ptr<Task> & task)140 void TaskExecutor::AfterRun(const std::shared_ptr<Task> &task)
141 {
142     task->AfterTaskExecute();
143 }
144 
BeforeRun(const std::shared_ptr<Task> & task)145 void TaskExecutor::BeforeRun(const std::shared_ptr<Task> &task)
146 {
147     task->BeforeTaskExecute();
148 }
149 
DelayExecute(const Runnable & task,long delayMs)150 bool TaskExecutor::DelayExecute(const Runnable &task, long delayMs)
151 {
152     if (delayMs <= 0) {
153         task();
154         HILOG_INFO("TaskExecutor::DelayExecute end and delayMs less than 0");
155         return true;
156     }
157     if (terminated_.load()) {
158         HILOG_INFO("TaskExecutor::DelayExecute end and terminate");
159         return false;
160     }
161     std::shared_ptr<DelayTaskWrapper> delayTaskWrapper = std::make_shared<DelayTaskWrapper>(delayMs, task);
162     if (delayTaskWrapper == nullptr) {
163         HILOG_INFO("TaskExecutor::DelayExecute end and delayTaskWrapper is nullptr");
164         return false;
165     }
166     delayTasks_->Offer(delayTaskWrapper);
167     return EnsureConsumeStarted();
168 }
169 
TerminateConsumer()170 void TaskExecutor::TerminateConsumer()
171 {
172     std::unique_lock<std::mutex> lock(dataMutex_);
173     terminated_.store(true);
174     pendingTasks_->Stop();
175     delayTasks_->Stop();
176 
177     if (consumer_ != nullptr) {
178         if (consumer_->joinable()) {
179             consumer_->join();
180         }
181         consumer_ = nullptr;
182     }
183 }
184 
EnsureConsumeStarted()185 bool TaskExecutor::EnsureConsumeStarted()
186 {
187     if (consumer_ == nullptr) {
188         {
189             std::unique_lock<std::mutex> lock(dataMutex_);
190             if (consumer_ == nullptr) {
191                 consumer_ = std::make_shared<std::thread>(&TaskExecutor::Consume, this);
192                 if (consumer_ == nullptr) {
193                     HILOG_ERROR("TaskExecutor::EnsureConsumeStarted consumer_ is nullptr");
194                     return false;
195                 }
196                 HILOG_INFO("TaskExecutor::EnsureConsumeStarted start a delay task consumer");
197             }
198         }
199     }
200     return true;
201 }
202 
Consume()203 void TaskExecutor::Consume()
204 {
205     for (;;) {
206         if (terminated_.load() && delayTasks_->Empty()) {
207             HILOG_INFO("TaskExecutor::Consume delay task is empty");
208             return;
209         }
210         std::shared_ptr<DelayTaskWrapper> delayTaskWrapper = delayTasks_->Take();
211         if (delayTaskWrapper == nullptr || delayTaskWrapper->runnable_ == nullptr) {
212             HILOG_ERROR("TaskExecutor::Consume delayTaskWrapper is nullptr");
213             return;
214         }
215         (delayTaskWrapper->runnable_)();
216         HILOG_INFO("TaskExecutor::Consume after run");
217     }
218 }
219 
GetPendingTasksSize()220 int TaskExecutor::GetPendingTasksSize()
221 {
222     return pendingTasks_->Size();
223 }
224 
GetTaskCounter()225 long TaskExecutor::GetTaskCounter()
226 {
227     return taskCounter_.load();
228 }
229 
GetAndIncrement(std::atomic<long> & atomiclong)230 long TaskExecutor::GetAndIncrement(std::atomic<long> &atomiclong)
231 {
232     long ret = atomiclong.load();
233     atomiclong.fetch_add(1, std::memory_order_relaxed);
234 
235     return ret;
236 }
237 
IncrementAndGet(std::atomic<long> & atomiclong)238 long TaskExecutor::IncrementAndGet(std::atomic<long> &atomiclong)
239 {
240     atomiclong.fetch_add(1, std::memory_order_relaxed);
241 
242     return atomiclong;
243 }
244 }  // namespace AppExecFwk
245 }  // namespace OHOS