1 /*
2 * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "serial_task_dispatcher.h"
17 #include "hilog_wrapper.h"
18 #include "appexecfwk_errors.h"
19 namespace OHOS {
20 namespace AppExecFwk {
21 std::string SerialTaskDispatcher::DISPATCHER_TAG = "SerialTaskDispatcher";
22 std::string SerialTaskDispatcher::ASYNC_DISPATCHER_TAG = DISPATCHER_TAG + "::asyncDispatch";
23 std::string SerialTaskDispatcher::SYNC_DISPATCHER_TAG = DISPATCHER_TAG + "::syncDispatch";
24 std::string SerialTaskDispatcher::DELAY_DISPATCHER_TAG = DISPATCHER_TAG + "::delayDispatch";
25
SerialTaskDispatcher(const std::string & dispatcherName,const TaskPriority priority,const std::shared_ptr<TaskExecutor> & executor)26 SerialTaskDispatcher::SerialTaskDispatcher(
27 const std::string &dispatcherName, const TaskPriority priority, const std::shared_ptr<TaskExecutor> &executor)
28 : BaseTaskDispatcher(dispatcherName, priority)
29 {
30 running_ = false;
31 executor_ = executor;
32 }
33
GetWorkingTasksSize()34 int SerialTaskDispatcher::GetWorkingTasksSize()
35 {
36 return workingTasks_.Size();
37 }
38
GetDispatcherName()39 std::string SerialTaskDispatcher::GetDispatcherName()
40 {
41 return dispatcherName_;
42 }
43
SyncDispatch(const std::shared_ptr<Runnable> & runnable)44 ErrCode SerialTaskDispatcher::SyncDispatch(const std::shared_ptr<Runnable> &runnable)
45 {
46 HILOG_INFO("SerialTaskDispatcher::SyncDispatch start");
47 if (Check(runnable) != ERR_OK) {
48 HILOG_ERROR("SerialTaskDispatcher::SyncDispatch check failed");
49 return ERR_APPEXECFWK_CHECK_FAILED;
50 }
51
52 std::shared_ptr<SyncTask> innerSyncTask = std::make_shared<SyncTask>(runnable, GetPriority(), shared_from_this());
53 if (innerSyncTask == nullptr) {
54 HILOG_ERROR("SerialTaskDispatcher::SyncDispatch innerSyncTask is nullptr");
55 return ERR_APPEXECFWK_CHECK_FAILED;
56 }
57 std::shared_ptr<Task> innerTask = std::static_pointer_cast<Task>(innerSyncTask);
58 if (innerTask == nullptr) {
59 HILOG_ERROR("SerialTaskDispatcher::SyncDispatch innerTask is nullptr");
60 return ERR_APPEXECFWK_CHECK_FAILED;
61 }
62 TracePointBeforePost(innerTask, false, SYNC_DISPATCHER_TAG);
63 OnNewTaskIn(innerTask);
64 innerSyncTask->WaitTask();
65 TracePointAfterPost(innerTask, false, DISPATCHER_TAG);
66
67 HILOG_INFO("SerialTaskDispatcher::SyncDispatch end");
68 return ERR_OK;
69 }
70
AsyncDispatch(const std::shared_ptr<Runnable> & runnable)71 std::shared_ptr<Revocable> SerialTaskDispatcher::AsyncDispatch(const std::shared_ptr<Runnable> &runnable)
72 {
73 HILOG_INFO("SerialTaskDispatcher::AsyncDispatch start");
74 if (Check(runnable) != ERR_OK) {
75 HILOG_ERROR("SerialTaskDispatcher::AsyncDispatch Check failed");
76 return nullptr;
77 }
78
79 std::shared_ptr<Task> innerTask = std::make_shared<Task>(runnable, GetPriority(), shared_from_this());
80 if (innerTask == nullptr) {
81 HILOG_ERROR("SerialTaskDispatcher::AsyncDispatch innerTask is nullptr");
82 return nullptr;
83 }
84 TracePointBeforePost(innerTask, true, ASYNC_DISPATCHER_TAG);
85 HILOG_INFO("SerialTaskDispatcher::AsyncDispatch into new async task");
86 OnNewTaskIn(innerTask);
87 HILOG_INFO("SerialTaskDispatcher::AsyncDispatch end");
88 return innerTask;
89 }
90
DelayDispatch(const std::shared_ptr<Runnable> & runnable,long delayMs)91 std::shared_ptr<Revocable> SerialTaskDispatcher::DelayDispatch(const std::shared_ptr<Runnable> &runnable, long delayMs)
92 {
93 HILOG_INFO("SerialTaskDispatcher::DelayDispatch start");
94 if (executor_ == nullptr) {
95 HILOG_ERROR("SerialTaskDispatcher::DelayDispatch executor_ is nullptr");
96 return nullptr;
97 }
98 if (Check(runnable) != ERR_OK) {
99 HILOG_ERROR("SerialTaskDispatcher::DelayDispatch Check failed");
100 return nullptr;
101 }
102
103 std::shared_ptr<Task> innerTask = std::make_shared<Task>(runnable, GetPriority(), shared_from_this());
104 if (innerTask == nullptr) {
105 HILOG_ERROR("SerialTaskDispatcher::DelayDispatch innerTask is nullptr");
106 return nullptr;
107 }
108 TracePointBeforePost(innerTask, true, DELAY_DISPATCHER_TAG);
109 // bind parameter to avoid deconstruct.
110 std::function<void()> callback = std::bind(&SerialTaskDispatcher::OnNewTaskIn, this, innerTask);
111 bool executeFlag = executor_->DelayExecute(callback, delayMs);
112 if (!executeFlag) {
113 HILOG_ERROR("SerialTaskDispatcher::DelayDispatch execute failed");
114 return nullptr;
115 }
116 HILOG_INFO("SerialTaskDispatcher::DelayDispatch end");
117 return innerTask;
118 }
119
OnNewTaskIn(std::shared_ptr<Task> & task)120 ErrCode SerialTaskDispatcher::OnNewTaskIn(std::shared_ptr<Task> &task)
121 {
122 HILOG_INFO("SerialTaskDispatcher::OnNewTaskIn start");
123 ErrCode code = Prepare(task);
124 if (code != ERR_OK) {
125 HILOG_ERROR("SerialTaskDispatcher::OnNewTaskIn Prepare failed");
126 return ERR_APPEXECFWK_CHECK_FAILED;
127 }
128 {
129 std::unique_lock<std::mutex> lock(mutex_);
130 if (workingTasks_.Offer(task) == false) {
131 HILOG_WARN("SerialTaskDispatcher.onNewTaskIn exceed the maximum capacity of Queue");
132 }
133 }
134
135 Schedule();
136 HILOG_INFO("SerialTaskDispatcher::OnNewTaskIn end");
137 return ERR_OK;
138 }
139
Prepare(std::shared_ptr<Task> & task)140 ErrCode SerialTaskDispatcher::Prepare(std::shared_ptr<Task> &task)
141 {
142 HILOG_INFO("SerialTaskDispatcher::Prepare start");
143 if (task == nullptr) {
144 HILOG_ERROR("SerialTaskDispatcher::Prepare task is nullptr");
145 return ERR_APPEXECFWK_CHECK_FAILED;
146 }
147 // inline class
148 class MyTaskListener : public TaskListener {
149 private:
150 std::function<void()> callback_;
151
152 public:
153 void OnChanged(const TaskStage &stage)
154 {
155 if (stage.IsDone()) {
156 callback_();
157 }
158 }
159 // set callback function
160 void Callback(const std::function<void()> &callbackFunction)
161 {
162 callback_ = std::move(callbackFunction);
163 }
164 };
165
166 // set inline listener
167 std::shared_ptr<MyTaskListener> ptrlistener = std::make_shared<MyTaskListener>();
168 if (ptrlistener == nullptr) {
169 HILOG_ERROR("SerialTaskDispatcher::Prepare MyTaskListener is nullptr");
170 return ERR_APPEXECFWK_CHECK_FAILED;
171 }
172 const std::function<void()> onTaskDone = [&]() { OnTaskDone(); };
173 ptrlistener->Callback(onTaskDone);
174 task->AddTaskListener(ptrlistener);
175 HILOG_INFO("SerialTaskDispatcher::Prepare end");
176 return ERR_OK;
177 }
178
OnTaskDone()179 void SerialTaskDispatcher::OnTaskDone()
180 {
181 HILOG_INFO("SerialTaskDispatcher::OnTaskDone start");
182 bool isExhausted = workingTasks_.Empty();
183 DoNext(isExhausted);
184 HILOG_INFO("SerialTaskDispatcher::OnTaskDone end");
185 }
186
Schedule()187 bool SerialTaskDispatcher::Schedule()
188 {
189 bool init = false;
190 if (!running_.compare_exchange_strong(init, true)) {
191 HILOG_WARN("SerialTaskDispatcher::schedule already running");
192 return false;
193 }
194 HILOG_INFO("SerialTaskDispatcher::Schedule do next");
195 return DoNext(false);
196 }
197
DoNext(bool isExhausted)198 bool SerialTaskDispatcher::DoNext(bool isExhausted)
199 {
200 HILOG_INFO("SerialTaskDispatcher::DoNext start");
201 std::shared_ptr<Task> nextptr = nullptr;
202 {
203 std::unique_lock<std::mutex> lock(mutex_);
204 nextptr = workingTasks_.Poll();
205 if (nextptr == nullptr) {
206 running_.store(false);
207 HILOG_WARN("SerialTaskDispatcher::DoNext no more task");
208 return false;
209 }
210 }
211
212 DoWork(nextptr);
213 HILOG_INFO("SerialTaskDispatcher::DoNext end");
214 return true;
215 }
216
DoWork(std::shared_ptr<Task> & task)217 void SerialTaskDispatcher::DoWork(std::shared_ptr<Task> &task)
218 {
219 HILOG_INFO("SerialTaskDispatcher::DoWork called.");
220 // |task| mustn't be null
221 executor_->Execute(task);
222 }
223 } // namespace AppExecFwk
224 } // namespace OHOS