• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021-2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "serial_task_dispatcher.h"
17 #include "hilog_wrapper.h"
18 #include "appexecfwk_errors.h"
19 namespace OHOS {
20 namespace AppExecFwk {
21 std::string SerialTaskDispatcher::DISPATCHER_TAG = "SerialTaskDispatcher";
22 std::string SerialTaskDispatcher::ASYNC_DISPATCHER_TAG = DISPATCHER_TAG + "::asyncDispatch";
23 std::string SerialTaskDispatcher::SYNC_DISPATCHER_TAG = DISPATCHER_TAG + "::syncDispatch";
24 std::string SerialTaskDispatcher::DELAY_DISPATCHER_TAG = DISPATCHER_TAG + "::delayDispatch";
25 
SerialTaskDispatcher(const std::string & dispatcherName,const TaskPriority priority,const std::shared_ptr<TaskExecutor> & executor)26 SerialTaskDispatcher::SerialTaskDispatcher(
27     const std::string &dispatcherName, const TaskPriority priority, const std::shared_ptr<TaskExecutor> &executor)
28     : BaseTaskDispatcher(dispatcherName, priority)
29 {
30     running_ = false;
31     executor_ = executor;
32 }
33 
GetWorkingTasksSize()34 int SerialTaskDispatcher::GetWorkingTasksSize()
35 {
36     return workingTasks_.Size();
37 }
38 
GetDispatcherName()39 std::string SerialTaskDispatcher::GetDispatcherName()
40 {
41     return dispatcherName_;
42 }
43 
SyncDispatch(const std::shared_ptr<Runnable> & runnable)44 ErrCode SerialTaskDispatcher::SyncDispatch(const std::shared_ptr<Runnable> &runnable)
45 {
46     HILOG_INFO("SerialTaskDispatcher::SyncDispatch start");
47     if (Check(runnable) != ERR_OK) {
48         HILOG_ERROR("SerialTaskDispatcher::SyncDispatch check failed");
49         return ERR_APPEXECFWK_CHECK_FAILED;
50     }
51 
52     std::shared_ptr<SyncTask> innerSyncTask = std::make_shared<SyncTask>(runnable, GetPriority(), shared_from_this());
53     if (innerSyncTask == nullptr) {
54         HILOG_ERROR("SerialTaskDispatcher::SyncDispatch innerSyncTask is nullptr");
55         return ERR_APPEXECFWK_CHECK_FAILED;
56     }
57     std::shared_ptr<Task> innerTask = std::static_pointer_cast<Task>(innerSyncTask);
58     if (innerTask == nullptr) {
59         HILOG_ERROR("SerialTaskDispatcher::SyncDispatch innerTask is nullptr");
60         return ERR_APPEXECFWK_CHECK_FAILED;
61     }
62     TracePointBeforePost(innerTask, false, SYNC_DISPATCHER_TAG);
63     OnNewTaskIn(innerTask);
64     innerSyncTask->WaitTask();
65     TracePointAfterPost(innerTask, false, DISPATCHER_TAG);
66 
67     HILOG_INFO("SerialTaskDispatcher::SyncDispatch end");
68     return ERR_OK;
69 }
70 
AsyncDispatch(const std::shared_ptr<Runnable> & runnable)71 std::shared_ptr<Revocable> SerialTaskDispatcher::AsyncDispatch(const std::shared_ptr<Runnable> &runnable)
72 {
73     HILOG_INFO("SerialTaskDispatcher::AsyncDispatch start");
74     if (Check(runnable) != ERR_OK) {
75         HILOG_ERROR("SerialTaskDispatcher::AsyncDispatch Check failed");
76         return nullptr;
77     }
78 
79     std::shared_ptr<Task> innerTask = std::make_shared<Task>(runnable, GetPriority(), shared_from_this());
80     if (innerTask == nullptr) {
81         HILOG_ERROR("SerialTaskDispatcher::AsyncDispatch innerTask is nullptr");
82         return nullptr;
83     }
84     TracePointBeforePost(innerTask, true, ASYNC_DISPATCHER_TAG);
85     HILOG_INFO("SerialTaskDispatcher::AsyncDispatch into new async task");
86     OnNewTaskIn(innerTask);
87     HILOG_INFO("SerialTaskDispatcher::AsyncDispatch end");
88     return innerTask;
89 }
90 
DelayDispatch(const std::shared_ptr<Runnable> & runnable,long delayMs)91 std::shared_ptr<Revocable> SerialTaskDispatcher::DelayDispatch(const std::shared_ptr<Runnable> &runnable, long delayMs)
92 {
93     HILOG_INFO("SerialTaskDispatcher::DelayDispatch start");
94     if (executor_ == nullptr) {
95         HILOG_ERROR("SerialTaskDispatcher::DelayDispatch executor_ is nullptr");
96         return nullptr;
97     }
98     if (Check(runnable) != ERR_OK) {
99         HILOG_ERROR("SerialTaskDispatcher::DelayDispatch Check failed");
100         return nullptr;
101     }
102 
103     std::shared_ptr<Task> innerTask = std::make_shared<Task>(runnable, GetPriority(), shared_from_this());
104     if (innerTask == nullptr) {
105         HILOG_ERROR("SerialTaskDispatcher::DelayDispatch innerTask is nullptr");
106         return nullptr;
107     }
108     TracePointBeforePost(innerTask, true, DELAY_DISPATCHER_TAG);
109     // bind parameter to avoid deconstruct.
110     std::function<void()> callback = std::bind(&SerialTaskDispatcher::OnNewTaskIn, this, innerTask);
111     bool executeFlag = executor_->DelayExecute(callback, delayMs);
112     if (!executeFlag) {
113         HILOG_ERROR("SerialTaskDispatcher::DelayDispatch execute failed");
114         return nullptr;
115     }
116     HILOG_INFO("SerialTaskDispatcher::DelayDispatch end");
117     return innerTask;
118 }
119 
OnNewTaskIn(std::shared_ptr<Task> & task)120 ErrCode SerialTaskDispatcher::OnNewTaskIn(std::shared_ptr<Task> &task)
121 {
122     HILOG_INFO("SerialTaskDispatcher::OnNewTaskIn start");
123     ErrCode code = Prepare(task);
124     if (code != ERR_OK) {
125         HILOG_ERROR("SerialTaskDispatcher::OnNewTaskIn Prepare failed");
126         return ERR_APPEXECFWK_CHECK_FAILED;
127     }
128     {
129         std::unique_lock<std::mutex> lock(mutex_);
130         if (workingTasks_.Offer(task) == false) {
131             HILOG_WARN("SerialTaskDispatcher.onNewTaskIn exceed the maximum capacity of Queue");
132         }
133     }
134 
135     Schedule();
136     HILOG_INFO("SerialTaskDispatcher::OnNewTaskIn end");
137     return ERR_OK;
138 }
139 
Prepare(std::shared_ptr<Task> & task)140 ErrCode SerialTaskDispatcher::Prepare(std::shared_ptr<Task> &task)
141 {
142     HILOG_INFO("SerialTaskDispatcher::Prepare start");
143     if (task == nullptr) {
144         HILOG_ERROR("SerialTaskDispatcher::Prepare task is nullptr");
145         return ERR_APPEXECFWK_CHECK_FAILED;
146     }
147     // inline class
148     class MyTaskListener : public TaskListener {
149     private:
150         std::function<void()> callback_;
151 
152     public:
153         void OnChanged(const TaskStage &stage)
154         {
155             if (stage.IsDone()) {
156                 callback_();
157             }
158         }
159         // set callback function
160         void Callback(const std::function<void()> &callbackFunction)
161         {
162             callback_ = std::move(callbackFunction);
163         }
164     };
165 
166     // set inline listener
167     std::shared_ptr<MyTaskListener> ptrlistener = std::make_shared<MyTaskListener>();
168     if (ptrlistener == nullptr) {
169         HILOG_ERROR("SerialTaskDispatcher::Prepare MyTaskListener is nullptr");
170         return ERR_APPEXECFWK_CHECK_FAILED;
171     }
172     const std::function<void()> onTaskDone = [&]() { OnTaskDone(); };
173     ptrlistener->Callback(onTaskDone);
174     task->AddTaskListener(ptrlistener);
175     HILOG_INFO("SerialTaskDispatcher::Prepare end");
176     return ERR_OK;
177 }
178 
OnTaskDone()179 void SerialTaskDispatcher::OnTaskDone()
180 {
181     HILOG_INFO("SerialTaskDispatcher::OnTaskDone start");
182     bool isExhausted = workingTasks_.Empty();
183     DoNext(isExhausted);
184     HILOG_INFO("SerialTaskDispatcher::OnTaskDone end");
185 }
186 
Schedule()187 bool SerialTaskDispatcher::Schedule()
188 {
189     bool init = false;
190     if (!running_.compare_exchange_strong(init, true)) {
191         HILOG_WARN("SerialTaskDispatcher::schedule already running");
192         return false;
193     }
194     HILOG_INFO("SerialTaskDispatcher::Schedule do next");
195     return DoNext(false);
196 }
197 
DoNext(bool isExhausted)198 bool SerialTaskDispatcher::DoNext(bool isExhausted)
199 {
200     HILOG_INFO("SerialTaskDispatcher::DoNext start");
201     std::shared_ptr<Task> nextptr = nullptr;
202     {
203         std::unique_lock<std::mutex> lock(mutex_);
204         nextptr = workingTasks_.Poll();
205         if (nextptr == nullptr) {
206             running_.store(false);
207             HILOG_WARN("SerialTaskDispatcher::DoNext no more task");
208             return false;
209         }
210     }
211 
212     DoWork(nextptr);
213     HILOG_INFO("SerialTaskDispatcher::DoNext end");
214     return true;
215 }
216 
DoWork(std::shared_ptr<Task> & task)217 void SerialTaskDispatcher::DoWork(std::shared_ptr<Task> &task)
218 {
219     HILOG_INFO("SerialTaskDispatcher::DoWork called.");
220     // |task| mustn't be null
221     executor_->Execute(task);
222 }
223 }  // namespace AppExecFwk
224 }  // namespace OHOS