• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  * http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include "task_executor.h"
16 
17 #include "app_log_wrapper.h"
18 
19 namespace OHOS {
20 namespace AppExecFwk {
21 std::atomic<long> TaskExecutor::sequence(0);
22 
TaskExecutor(const std::shared_ptr<WorkerPoolConfig> & config)23 TaskExecutor::TaskExecutor(const std::shared_ptr<WorkerPoolConfig> &config) : WorkerPool(config)
24 {
25     terminated_ = false;
26     taskCounter_ = 0;
27     delayTasks_ = std::make_shared<DelayQueue>();
28     pendingTasks_ = std::make_shared<BlockingQueue>();
29 }
~TaskExecutor()30 TaskExecutor::~TaskExecutor()
31 {
32     if ((consumer_) && consumer_->joinable()) {
33         APP_LOGI("TaskExecutor::~TaskExecutor consumer is running");
34         consumer_->join();
35     }
36     APP_LOGI("TaskExecutor::~TaskExecutor");
37 }
38 
Execute(const std::shared_ptr<Task> & task)39 void TaskExecutor::Execute(const std::shared_ptr<Task> &task)
40 {
41     APP_LOGI("TaskExecutor::Execute begin");
42     task->SetSequence(GetAndIncrement(sequence));
43 
44     std::shared_ptr<TaskExecutor> executor = shared_from_this();
45 
46     if (AddWorker(executor, task) == false) {
47         std::shared_ptr<PriorityTaskWrapper> priorityTaskWrapper =
48             std::make_shared<PriorityTaskWrapper>(task->GetPriority(), task);
49         if (pendingTasks_->Offer(priorityTaskWrapper) == false) {
50             APP_LOGW("TaskExecutor::Execute rejected a task");
51         }
52     }
53     APP_LOGI("TaskExecutor::Execute end");
54 }
55 
DoWorks(const std::shared_ptr<WorkerThread> & worker)56 ErrCode TaskExecutor::DoWorks(const std::shared_ptr<WorkerThread> &worker)
57 {
58     APP_LOGI("TaskExecutor::DoWorks begin");
59     if (worker == nullptr) {
60         APP_LOGE("TaskExecutor::DoWorks worker is nullptr");
61         return ERR_APPEXECFWK_CHECK_FAILED;
62     }
63     if (worker->GetThread() == nullptr) {
64         APP_LOGE("TaskExecutor::DoWorks worker GetThread is nullptr");
65         return ERR_APPEXECFWK_CHECK_FAILED;
66     }
67 
68     std::shared_ptr<Task> task = worker->PollFirstTask();
69 
70     bool isInterrupted = false;
71     bool done = false;
72     while (((task != nullptr && done == false) || ((task = GetTask(worker)) != nullptr))) {
73         APP_LOGI("TaskExecutor::DoWorks loop tasks.");
74 
75         BeforeRun(task);
76 
77         task->Run();
78 
79         AfterRun(task);
80 
81         worker->IncTaskCount();
82         IncrementAndGet(taskCounter_);
83 
84         // loop condition
85         done = true;
86     }
87     OnWorkerExit(worker, isInterrupted);
88     APP_LOGI("TaskExecutor::DoWorks end");
89     return ERR_OK;
90 }
GetTask(const std::shared_ptr<WorkerThread> & workerThread)91 std::shared_ptr<Task> TaskExecutor::GetTask(const std::shared_ptr<WorkerThread> &workerThread)
92 {
93     bool isTimeout = false;
94     std::shared_ptr<Task> nullRunnable = nullptr;
95     std::shared_ptr<Task> next = nullptr;
96 
97     for(;;){
98         if (terminated_.load() && pendingTasks_->Empty()) {
99             APP_LOGI("TaskExecutor::GetTask end: loop thread %{public}s is terminated",
100                 workerThread->GetThreadName().c_str());
101             DecrementThread();
102             return nullRunnable;
103         }
104 
105         int workerCount = GetWorkCount();
106         APP_LOGI("TaskExecutor::GetTask  workerCount:%{public}d, GetCoreThreadCount: %{public}d",
107             workerCount,
108             GetCoreThreadCount());
109         bool needCheckTimeout = (workerCount > GetCoreThreadCount());
110         if (isTimeout && needCheckTimeout && pendingTasks_->Empty()) {
111             APP_LOGI("TaskExecutor::GetTask isTimeout is true");
112             if (CompareAndDecNum(workerCount)) {
113                 APP_LOGI("TaskExecutor::GetTask end: loop thread %{public}s is timeout",
114                     workerThread->GetThreadName().c_str());
115                 return nullRunnable;
116             }
117             continue;
118         }
119 
120         APP_LOGI("TaskExecutor::GetTask need timeout=%{public}d", needCheckTimeout);
121         std::shared_ptr<PriorityTaskWrapper> next =
122             needCheckTimeout ? pendingTasks_->Poll(GetKeepAliveTime()) : pendingTasks_->Take();
123 
124         if (next != nullptr && next->task_ != nullptr) {
125             APP_LOGI("TaskExecutor::GetTask end: loop thread %{public}s get next task",
126                 workerThread->GetThreadName().c_str());
127             return next->task_;
128         }
129         isTimeout = true;
130     }
131 }
132 
Terminate(bool force)133 void TaskExecutor::Terminate(bool force)
134 {
135     APP_LOGI("TaskExecutor::Terminate begin");
136     TerminateConsumer();
137     ClosePool(force);
138     APP_LOGI("TaskExecutor::Terminate end");
139 }
140 
AfterRun(const std::shared_ptr<Task> & task)141 void TaskExecutor::AfterRun(const std::shared_ptr<Task> &task)
142 {
143     task->AfterTaskExecute();
144 }
145 
BeforeRun(const std::shared_ptr<Task> & task)146 void TaskExecutor::BeforeRun(const std::shared_ptr<Task> &task)
147 {
148     task->BeforeTaskExecute();
149 }
150 
DelayExecute(const Runnable & task,long delayMs)151 bool TaskExecutor::DelayExecute(const Runnable &task, long delayMs)
152 {
153     if (delayMs <= 0) {
154         task();
155         APP_LOGI("TaskExecutor::DelayExecute end and delayMs less than 0");
156         return true;
157     }
158     if (terminated_.load()) {
159         APP_LOGI("TaskExecutor::DelayExecute end and terminate");
160         return false;
161     }
162     std::shared_ptr<DelayTaskWrapper> delayTaskWrapper = std::make_shared<DelayTaskWrapper>(delayMs, task);
163     if (delayTaskWrapper == nullptr) {
164         APP_LOGI("TaskExecutor::DelayExecute end and delayTaskWrapper is nullptr");
165         return false;
166     }
167     delayTasks_->Offer(delayTaskWrapper);
168     return EnsureConsumeStarted();
169 }
170 
TerminateConsumer()171 void TaskExecutor::TerminateConsumer()
172 {
173 
174     std::unique_lock<std::mutex> lock(dataMutex_);
175     terminated_.store(true);
176     pendingTasks_->Stop();
177     delayTasks_->Stop();
178 
179     if (consumer_ != nullptr) {
180         if (consumer_->joinable()) {
181             consumer_->join();
182         }
183         consumer_ = nullptr;
184     }
185 }
186 
EnsureConsumeStarted()187 bool TaskExecutor::EnsureConsumeStarted()
188 {
189     if (consumer_ == nullptr) {
190         {
191             std::unique_lock<std::mutex> lock(dataMutex_);
192             if (consumer_ == nullptr) {
193                 consumer_ = std::make_shared<std::thread>(&TaskExecutor::Consume, this);
194                 if (consumer_ == nullptr) {
195                     APP_LOGE("TaskExecutor::EnsureConsumeStarted consumer_ is nullptr");
196                     return false;
197                 }
198                 APP_LOGI("TaskExecutor::EnsureConsumeStarted start a delay task consumer");
199             }
200         }
201     }
202     return true;
203 }
204 
Consume()205 void TaskExecutor::Consume()
206 {
207     for(;;){
208         if (terminated_.load() && delayTasks_->Empty()) {
209             APP_LOGI("TaskExecutor::Consume delay task is empty");
210             break;
211         }
212         std::shared_ptr<DelayTaskWrapper> delayTaskWrapper = delayTasks_->Take();
213         if (delayTaskWrapper == nullptr || delayTaskWrapper->runnable_ == nullptr) {
214             APP_LOGE("TaskExecutor::Consume delayTaskWrapper is nullptr");
215             return;
216         };
217         (delayTaskWrapper->runnable_)();
218         APP_LOGI("TaskExecutor::Consume after run");
219     }
220 }
221 
GetPendingTasksSize()222 int TaskExecutor::GetPendingTasksSize()
223 {
224     return pendingTasks_->Size();
225 }
226 
GetTaskCounter()227 long TaskExecutor::GetTaskCounter()
228 {
229     return taskCounter_.load();
230 }
231 
GetAndIncrement(std::atomic<long> & atomiclong)232 long TaskExecutor::GetAndIncrement(std::atomic<long> &atomiclong)
233 {
234     long ret = atomiclong.load();
235     atomiclong.fetch_add(1, std::memory_order_relaxed);
236 
237     return ret;
238 }
239 
IncrementAndGet(std::atomic<long> & atomiclong)240 long TaskExecutor::IncrementAndGet(std::atomic<long> &atomiclong)
241 {
242     atomiclong.fetch_add(1, std::memory_order_relaxed);
243 
244     return atomiclong;
245 }
246 }  // namespace AppExecFwk
247 }  // namespace OHOS