• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "serial_task_dispatcher.h"
17 #include "app_log_wrapper.h"
18 #include "appexecfwk_errors.h"
19 namespace OHOS {
20 namespace AppExecFwk {
21 
22 std::string SerialTaskDispatcher::DISPATCHER_TAG = "SerialTaskDispatcher";
23 std::string SerialTaskDispatcher::ASYNC_DISPATCHER_TAG = DISPATCHER_TAG + "::asyncDispatch";
24 std::string SerialTaskDispatcher::SYNC_DISPATCHER_TAG = DISPATCHER_TAG + "::syncDispatch";
25 std::string SerialTaskDispatcher::DELAY_DISPATCHER_TAG = DISPATCHER_TAG + "::delayDispatch";
26 
SerialTaskDispatcher(const std::string & dispatcherName,const TaskPriority priority,const std::shared_ptr<TaskExecutor> & executor)27 SerialTaskDispatcher::SerialTaskDispatcher(
28     const std::string &dispatcherName, const TaskPriority priority, const std::shared_ptr<TaskExecutor> &executor)
29     : BaseTaskDispatcher(dispatcherName, priority)
30 {
31     running_ = false;
32     executor_ = executor;
33 }
34 
GetWorkingTasksSize()35 int SerialTaskDispatcher::GetWorkingTasksSize()
36 {
37     return workingTasks_.Size();
38 }
39 
GetDispatcherName()40 std::string SerialTaskDispatcher::GetDispatcherName()
41 {
42     return dispatcherName_;
43 }
44 
SyncDispatch(const std::shared_ptr<Runnable> & runnable)45 ErrCode SerialTaskDispatcher::SyncDispatch(const std::shared_ptr<Runnable> &runnable)
46 {
47     APP_LOGI("SerialTaskDispatcher::SyncDispatch start");
48     if (Check(runnable) != ERR_OK) {
49         APP_LOGE("SerialTaskDispatcher::SyncDispatch check failed");
50         return ERR_APPEXECFWK_CHECK_FAILED;
51     }
52 
53     std::shared_ptr<SyncTask> innerSyncTask = std::make_shared<SyncTask>(runnable, GetPriority(), shared_from_this());
54     if (innerSyncTask == nullptr) {
55         APP_LOGE("SerialTaskDispatcher::SyncDispatch innerSyncTask is nullptr");
56         return ERR_APPEXECFWK_CHECK_FAILED;
57     }
58     std::shared_ptr<Task> innerTask = std::static_pointer_cast<Task>(innerSyncTask);
59     if (innerTask == nullptr) {
60         APP_LOGE("SerialTaskDispatcher::SyncDispatch innerTask is nullptr");
61         return ERR_APPEXECFWK_CHECK_FAILED;
62     }
63     TracePointBeforePost(innerTask, false, SYNC_DISPATCHER_TAG);
64     OnNewTaskIn(innerTask);
65     innerSyncTask->WaitTask();
66     TracePointAfterPost(innerTask, false, DISPATCHER_TAG);
67 
68     APP_LOGI("SerialTaskDispatcher::SyncDispatch end");
69     return ERR_OK;
70 }
71 
AsyncDispatch(const std::shared_ptr<Runnable> & runnable)72 std::shared_ptr<Revocable> SerialTaskDispatcher::AsyncDispatch(const std::shared_ptr<Runnable> &runnable)
73 {
74     APP_LOGI("SerialTaskDispatcher::AsyncDispatch start");
75     if (Check(runnable) != ERR_OK) {
76         APP_LOGE("SerialTaskDispatcher::AsyncDispatch Check failed");
77         return nullptr;
78     }
79 
80     std::shared_ptr<Task> innerTask = std::make_shared<Task>(runnable, GetPriority(), shared_from_this());
81     if (innerTask == nullptr) {
82         APP_LOGE("SerialTaskDispatcher::AsyncDispatch innerTask is nullptr");
83         return nullptr;
84     }
85     TracePointBeforePost(innerTask, true, ASYNC_DISPATCHER_TAG);
86     APP_LOGI("SerialTaskDispatcher::AsyncDispatch into new async task");
87     OnNewTaskIn(innerTask);
88     APP_LOGI("SerialTaskDispatcher::AsyncDispatch end");
89     return innerTask;
90 }
91 
DelayDispatch(const std::shared_ptr<Runnable> & runnable,long delayMs)92 std::shared_ptr<Revocable> SerialTaskDispatcher::DelayDispatch(const std::shared_ptr<Runnable> &runnable, long delayMs)
93 {
94     APP_LOGI("SerialTaskDispatcher::DelayDispatch start");
95     if (executor_ == nullptr) {
96         APP_LOGE("SerialTaskDispatcher::DelayDispatch executor_ is nullptr");
97         return nullptr;
98     }
99     if (Check(runnable) != ERR_OK) {
100         APP_LOGE("SerialTaskDispatcher::DelayDispatch Check failed");
101         return nullptr;
102     }
103 
104     std::shared_ptr<Task> innerTask = std::make_shared<Task>(runnable, GetPriority(), shared_from_this());
105     if (innerTask == nullptr) {
106         APP_LOGE("SerialTaskDispatcher::DelayDispatch innerTask is nullptr");
107         return nullptr;
108     }
109     TracePointBeforePost(innerTask, true, DELAY_DISPATCHER_TAG);
110     // bind parameter to avoid deconstruct.
111     std::function<void()> callback = std::bind(&SerialTaskDispatcher::OnNewTaskIn, this, innerTask);
112     bool executeFlag = executor_->DelayExecute(callback, delayMs);
113     if (!executeFlag) {
114         APP_LOGE("SerialTaskDispatcher::DelayDispatch execute failed");
115         return nullptr;
116     }
117     APP_LOGI("SerialTaskDispatcher::DelayDispatch end");
118     return innerTask;
119 }
120 
OnNewTaskIn(std::shared_ptr<Task> & task)121 ErrCode SerialTaskDispatcher::OnNewTaskIn(std::shared_ptr<Task> &task)
122 {
123     APP_LOGI("SerialTaskDispatcher::OnNewTaskIn start");
124     ErrCode code = Prepare(task);
125     if (code != ERR_OK) {
126         APP_LOGE("SerialTaskDispatcher::OnNewTaskIn Prepare failed");
127         return ERR_APPEXECFWK_CHECK_FAILED;
128     }
129     {
130         std::unique_lock<std::mutex> lock(mutex_);
131         if (workingTasks_.Offer(task) == false) {
132             APP_LOGW("SerialTaskDispatcher.onNewTaskIn exceed the maximum capacity of Queue");
133         }
134     }
135 
136     Schedule();
137     APP_LOGI("SerialTaskDispatcher::OnNewTaskIn end");
138     return ERR_OK;
139 }
140 
Prepare(std::shared_ptr<Task> & task)141 ErrCode SerialTaskDispatcher::Prepare(std::shared_ptr<Task> &task)
142 {
143     APP_LOGI("SerialTaskDispatcher::Prepare start");
144     if (task == nullptr) {
145         APP_LOGE("SerialTaskDispatcher::Prepare task is nullptr");
146         return ERR_APPEXECFWK_CHECK_FAILED;
147     }
148     // inline class
149     class MyTaskListener : public TaskListener {
150     private:
151         std::function<void()> callback_;
152 
153     public:
154         void OnChanged(const TaskStage &stage)
155         {
156             if (stage.IsDone()) {
157                 callback_();
158             }
159         }
160         // set callback function
161         void Callback(const std::function<void()> &callbackFunction)
162         {
163             callback_ = std::move(callbackFunction);
164         }
165     };
166 
167     // set inline listener
168     std::shared_ptr<MyTaskListener> ptrlistener = std::make_shared<MyTaskListener>();
169     if (ptrlistener == nullptr) {
170         APP_LOGE("SerialTaskDispatcher::Prepare MyTaskListener is nullptr");
171         return ERR_APPEXECFWK_CHECK_FAILED;
172     }
173     const std::function<void()> onTaskDone = [&]() { OnTaskDone(); };
174     ptrlistener->Callback(onTaskDone);
175     task->AddTaskListener(ptrlistener);
176     APP_LOGI("SerialTaskDispatcher::Prepare end");
177     return ERR_OK;
178 }
179 
OnTaskDone()180 void SerialTaskDispatcher::OnTaskDone()
181 {
182     APP_LOGI("SerialTaskDispatcher::OnTaskDone start");
183     bool isExhausted = workingTasks_.Empty();
184     DoNext(isExhausted);
185     APP_LOGI("SerialTaskDispatcher::OnTaskDone end");
186 }
187 
Schedule()188 bool SerialTaskDispatcher::Schedule()
189 {
190     bool init = false;
191     if (!running_.compare_exchange_strong(init, true)) {
192         APP_LOGW("SerialTaskDispatcher::schedule already running");
193         return false;
194     }
195     APP_LOGI("SerialTaskDispatcher::Schedule do next");
196     return DoNext(false);
197 }
198 
DoNext(bool isExhausted)199 bool SerialTaskDispatcher::DoNext(bool isExhausted)
200 {
201     APP_LOGI("SerialTaskDispatcher::DoNext start");
202     std::shared_ptr<Task> nextptr = nullptr;
203     {
204         std::unique_lock<std::mutex> lock(mutex_);
205         nextptr = workingTasks_.Poll();
206         if (nextptr == nullptr) {
207             running_.store(false);
208             APP_LOGW("SerialTaskDispatcher::DoNext no more task");
209             return false;
210         }
211     }
212 
213     DoWork(nextptr);
214     APP_LOGI("SerialTaskDispatcher::DoNext end");
215     return true;
216 }
217 
DoWork(std::shared_ptr<Task> & task)218 void SerialTaskDispatcher::DoWork(std::shared_ptr<Task> &task)
219 {
220     APP_LOGI("SerialTaskDispatcher::DoWork called.");
221     // |task| mustn't be null
222     executor_->Execute(task);
223 }
224 }  // namespace AppExecFwk
225 }  // namespace OHOS