1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "serial_task_dispatcher.h"
17 #include "app_log_wrapper.h"
18 #include "appexecfwk_errors.h"
19 namespace OHOS {
20 namespace AppExecFwk {
21
22 std::string SerialTaskDispatcher::DISPATCHER_TAG = "SerialTaskDispatcher";
23 std::string SerialTaskDispatcher::ASYNC_DISPATCHER_TAG = DISPATCHER_TAG + "::asyncDispatch";
24 std::string SerialTaskDispatcher::SYNC_DISPATCHER_TAG = DISPATCHER_TAG + "::syncDispatch";
25 std::string SerialTaskDispatcher::DELAY_DISPATCHER_TAG = DISPATCHER_TAG + "::delayDispatch";
26
SerialTaskDispatcher(const std::string & dispatcherName,const TaskPriority priority,const std::shared_ptr<TaskExecutor> & executor)27 SerialTaskDispatcher::SerialTaskDispatcher(
28 const std::string &dispatcherName, const TaskPriority priority, const std::shared_ptr<TaskExecutor> &executor)
29 : BaseTaskDispatcher(dispatcherName, priority)
30 {
31 running_ = false;
32 executor_ = executor;
33 }
34
GetWorkingTasksSize()35 int SerialTaskDispatcher::GetWorkingTasksSize()
36 {
37 return workingTasks_.Size();
38 }
39
GetDispatcherName()40 std::string SerialTaskDispatcher::GetDispatcherName()
41 {
42 return dispatcherName_;
43 }
44
SyncDispatch(const std::shared_ptr<Runnable> & runnable)45 ErrCode SerialTaskDispatcher::SyncDispatch(const std::shared_ptr<Runnable> &runnable)
46 {
47 APP_LOGI("SerialTaskDispatcher::SyncDispatch start");
48 if (Check(runnable) != ERR_OK) {
49 APP_LOGE("SerialTaskDispatcher::SyncDispatch check failed");
50 return ERR_APPEXECFWK_CHECK_FAILED;
51 }
52
53 std::shared_ptr<SyncTask> innerSyncTask = std::make_shared<SyncTask>(runnable, GetPriority(), shared_from_this());
54 if (innerSyncTask == nullptr) {
55 APP_LOGE("SerialTaskDispatcher::SyncDispatch innerSyncTask is nullptr");
56 return ERR_APPEXECFWK_CHECK_FAILED;
57 }
58 std::shared_ptr<Task> innerTask = std::static_pointer_cast<Task>(innerSyncTask);
59 if (innerTask == nullptr) {
60 APP_LOGE("SerialTaskDispatcher::SyncDispatch innerTask is nullptr");
61 return ERR_APPEXECFWK_CHECK_FAILED;
62 }
63 TracePointBeforePost(innerTask, false, SYNC_DISPATCHER_TAG);
64 OnNewTaskIn(innerTask);
65 innerSyncTask->WaitTask();
66 TracePointAfterPost(innerTask, false, DISPATCHER_TAG);
67
68 APP_LOGI("SerialTaskDispatcher::SyncDispatch end");
69 return ERR_OK;
70 }
71
AsyncDispatch(const std::shared_ptr<Runnable> & runnable)72 std::shared_ptr<Revocable> SerialTaskDispatcher::AsyncDispatch(const std::shared_ptr<Runnable> &runnable)
73 {
74 APP_LOGI("SerialTaskDispatcher::AsyncDispatch start");
75 if (Check(runnable) != ERR_OK) {
76 APP_LOGE("SerialTaskDispatcher::AsyncDispatch Check failed");
77 return nullptr;
78 }
79
80 std::shared_ptr<Task> innerTask = std::make_shared<Task>(runnable, GetPriority(), shared_from_this());
81 if (innerTask == nullptr) {
82 APP_LOGE("SerialTaskDispatcher::AsyncDispatch innerTask is nullptr");
83 return nullptr;
84 }
85 TracePointBeforePost(innerTask, true, ASYNC_DISPATCHER_TAG);
86 APP_LOGI("SerialTaskDispatcher::AsyncDispatch into new async task");
87 OnNewTaskIn(innerTask);
88 APP_LOGI("SerialTaskDispatcher::AsyncDispatch end");
89 return innerTask;
90 }
91
DelayDispatch(const std::shared_ptr<Runnable> & runnable,long delayMs)92 std::shared_ptr<Revocable> SerialTaskDispatcher::DelayDispatch(const std::shared_ptr<Runnable> &runnable, long delayMs)
93 {
94 APP_LOGI("SerialTaskDispatcher::DelayDispatch start");
95 if (executor_ == nullptr) {
96 APP_LOGE("SerialTaskDispatcher::DelayDispatch executor_ is nullptr");
97 return nullptr;
98 }
99 if (Check(runnable) != ERR_OK) {
100 APP_LOGE("SerialTaskDispatcher::DelayDispatch Check failed");
101 return nullptr;
102 }
103
104 std::shared_ptr<Task> innerTask = std::make_shared<Task>(runnable, GetPriority(), shared_from_this());
105 if (innerTask == nullptr) {
106 APP_LOGE("SerialTaskDispatcher::DelayDispatch innerTask is nullptr");
107 return nullptr;
108 }
109 TracePointBeforePost(innerTask, true, DELAY_DISPATCHER_TAG);
110 // bind parameter to avoid deconstruct.
111 std::function<void()> callback = std::bind(&SerialTaskDispatcher::OnNewTaskIn, this, innerTask);
112 bool executeFlag = executor_->DelayExecute(callback, delayMs);
113 if (!executeFlag) {
114 APP_LOGE("SerialTaskDispatcher::DelayDispatch execute failed");
115 return nullptr;
116 }
117 APP_LOGI("SerialTaskDispatcher::DelayDispatch end");
118 return innerTask;
119 }
120
OnNewTaskIn(std::shared_ptr<Task> & task)121 ErrCode SerialTaskDispatcher::OnNewTaskIn(std::shared_ptr<Task> &task)
122 {
123 APP_LOGI("SerialTaskDispatcher::OnNewTaskIn start");
124 ErrCode code = Prepare(task);
125 if (code != ERR_OK) {
126 APP_LOGE("SerialTaskDispatcher::OnNewTaskIn Prepare failed");
127 return ERR_APPEXECFWK_CHECK_FAILED;
128 }
129 {
130 std::unique_lock<std::mutex> lock(mutex_);
131 if (workingTasks_.Offer(task) == false) {
132 APP_LOGW("SerialTaskDispatcher.onNewTaskIn exceed the maximum capacity of Queue");
133 }
134 }
135
136 Schedule();
137 APP_LOGI("SerialTaskDispatcher::OnNewTaskIn end");
138 return ERR_OK;
139 }
140
Prepare(std::shared_ptr<Task> & task)141 ErrCode SerialTaskDispatcher::Prepare(std::shared_ptr<Task> &task)
142 {
143 APP_LOGI("SerialTaskDispatcher::Prepare start");
144 if (task == nullptr) {
145 APP_LOGE("SerialTaskDispatcher::Prepare task is nullptr");
146 return ERR_APPEXECFWK_CHECK_FAILED;
147 }
148 // inline class
149 class MyTaskListener : public TaskListener {
150 private:
151 std::function<void()> callback_;
152
153 public:
154 void OnChanged(const TaskStage &stage)
155 {
156 if (stage.IsDone()) {
157 callback_();
158 }
159 }
160 // set callback function
161 void Callback(const std::function<void()> &callbackFunction)
162 {
163 callback_ = std::move(callbackFunction);
164 }
165 };
166
167 // set inline listener
168 std::shared_ptr<MyTaskListener> ptrlistener = std::make_shared<MyTaskListener>();
169 if (ptrlistener == nullptr) {
170 APP_LOGE("SerialTaskDispatcher::Prepare MyTaskListener is nullptr");
171 return ERR_APPEXECFWK_CHECK_FAILED;
172 }
173 const std::function<void()> onTaskDone = [&]() { OnTaskDone(); };
174 ptrlistener->Callback(onTaskDone);
175 task->AddTaskListener(ptrlistener);
176 APP_LOGI("SerialTaskDispatcher::Prepare end");
177 return ERR_OK;
178 }
179
OnTaskDone()180 void SerialTaskDispatcher::OnTaskDone()
181 {
182 APP_LOGI("SerialTaskDispatcher::OnTaskDone start");
183 bool isExhausted = workingTasks_.Empty();
184 DoNext(isExhausted);
185 APP_LOGI("SerialTaskDispatcher::OnTaskDone end");
186 }
187
Schedule()188 bool SerialTaskDispatcher::Schedule()
189 {
190 bool init = false;
191 if (!running_.compare_exchange_strong(init, true)) {
192 APP_LOGW("SerialTaskDispatcher::schedule already running");
193 return false;
194 }
195 APP_LOGI("SerialTaskDispatcher::Schedule do next");
196 return DoNext(false);
197 }
198
DoNext(bool isExhausted)199 bool SerialTaskDispatcher::DoNext(bool isExhausted)
200 {
201 APP_LOGI("SerialTaskDispatcher::DoNext start");
202 std::shared_ptr<Task> nextptr = nullptr;
203 {
204 std::unique_lock<std::mutex> lock(mutex_);
205 nextptr = workingTasks_.Poll();
206 if (nextptr == nullptr) {
207 running_.store(false);
208 APP_LOGW("SerialTaskDispatcher::DoNext no more task");
209 return false;
210 }
211 }
212
213 DoWork(nextptr);
214 APP_LOGI("SerialTaskDispatcher::DoNext end");
215 return true;
216 }
217
DoWork(std::shared_ptr<Task> & task)218 void SerialTaskDispatcher::DoWork(std::shared_ptr<Task> &task)
219 {
220 APP_LOGI("SerialTaskDispatcher::DoWork called.");
221 // |task| mustn't be null
222 executor_->Execute(task);
223 }
224 } // namespace AppExecFwk
225 } // namespace OHOS