1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "startup_task_dispatcher.h"
17
18 #include "event_handler.h"
19 #include "hilog_tag_wrapper.h"
20 #include "startup_manager.h"
21
22 namespace OHOS {
23 namespace AbilityRuntime {
StartupTaskDispatcher(const std::map<std::string,std::shared_ptr<StartupTask>> & tasks,const std::shared_ptr<StartupSortResult> & sortResult)24 StartupTaskDispatcher::StartupTaskDispatcher(const std::map<std::string, std::shared_ptr<StartupTask>> &tasks,
25 const std::shared_ptr<StartupSortResult> &sortResult) : tasks_(tasks), sortResult_(sortResult)
26 {}
27
~StartupTaskDispatcher()28 StartupTaskDispatcher::~StartupTaskDispatcher()
29 {
30 TAG_LOGD(AAFwkTag::STARTUP, "deconstruct");
31 }
32
Run(const std::shared_ptr<OnCompletedCallback> & completedCallback,const std::shared_ptr<OnCompletedCallback> & mainThreadAwaitCallback)33 int32_t StartupTaskDispatcher::Run(const std::shared_ptr<OnCompletedCallback> &completedCallback,
34 const std::shared_ptr<OnCompletedCallback> &mainThreadAwaitCallback)
35 {
36 if (sortResult_ == nullptr) {
37 TAG_LOGE(AAFwkTag::STARTUP, "sortResult null");
38 return ERR_STARTUP_INTERNAL_ERROR;
39 }
40 for (auto &iter : tasks_) {
41 if (iter.second == nullptr) {
42 TAG_LOGE(AAFwkTag::STARTUP, "startup task %{public}s null", iter.first.c_str());
43 return ERR_STARTUP_INTERNAL_ERROR;
44 }
45 inDegreeMap_.emplace(iter.first, iter.second->GetDependenciesCount());
46 if (iter.second->GetWaitOnMainThread()) {
47 mainThreadAwaitCount_++;
48 }
49 }
50 tasksCount_ = tasks_.size();
51 completedCallback_ = completedCallback;
52 mainThreadAwaitCallback_ = mainThreadAwaitCallback;
53
54 if (mainThreadAwaitCount_ == 0) {
55 TAG_LOGD(AAFwkTag::STARTUP, "no main thread await task");
56 if (mainThreadAwaitCallback_ != nullptr) {
57 auto result = std::make_shared<StartupTaskResult>();
58 mainThreadAwaitCallback_->Call(result);
59 }
60 }
61
62 for (auto &iter : sortResult_->zeroDequeResult_) {
63 auto findResult = tasks_.find(iter);
64 if (findResult == tasks_.end()) {
65 TAG_LOGE(AAFwkTag::STARTUP, "startup task not found %{public}s", iter.c_str());
66 return ERR_STARTUP_INTERNAL_ERROR;
67 }
68 if (findResult->second == nullptr) {
69 TAG_LOGE(AAFwkTag::STARTUP, "startup task %{public}s null", iter.c_str());
70 return ERR_STARTUP_INTERNAL_ERROR;
71 }
72 if (isTimeoutStopped_) {
73 TAG_LOGD(AAFwkTag::STARTUP, "startup task dispatch timeout, stop running %{public}s", iter.c_str());
74 return ERR_STARTUP_TIMEOUT;
75 }
76 int32_t result = RunTaskInit(iter, findResult->second);
77 if (result != ERR_OK) {
78 return result;
79 }
80 }
81 return ERR_OK;
82 }
83
TimeoutStop()84 void StartupTaskDispatcher::TimeoutStop()
85 {
86 isTimeoutStopped_ = true;
87 }
88
Dispatch(const std::string & name,const std::shared_ptr<StartupTaskResult> & result)89 void StartupTaskDispatcher::Dispatch(const std::string &name, const std::shared_ptr<StartupTaskResult> &result)
90 {
91 TAG_LOGD(AAFwkTag::STARTUP, "run startup task %{public}s dispatch", name.c_str());
92 if (result == nullptr) {
93 OnError(ERR_STARTUP_INTERNAL_ERROR, name + ": result is null");
94 return;
95 }
96 if (result->GetResultCode() != ERR_OK) {
97 OnError(name, result);
98 return;
99 }
100 auto findResult = tasks_.find(name);
101 if (findResult == tasks_.end() || findResult->second == nullptr) {
102 OnError(ERR_STARTUP_INTERNAL_ERROR, name + " not found");
103 return;
104 }
105 if (NotifyChildren(name, result) != ERR_OK) {
106 return;
107 }
108
109 if (findResult->second->GetWaitOnMainThread()) {
110 mainThreadAwaitCount_--;
111 TAG_LOGD(AAFwkTag::STARTUP, "mainThreadAwaitCount %{public}d", mainThreadAwaitCount_);
112 if (mainThreadAwaitCount_ == 0) {
113 if (mainThreadAwaitCallback_ != nullptr) {
114 mainThreadAwaitCallback_->Call(result);
115 }
116 }
117 }
118 tasksCount_--;
119 TAG_LOGD(AAFwkTag::STARTUP, "tasksCount %{public}d", tasksCount_);
120 if (tasksCount_ == 0) {
121 if (completedCallback_ != nullptr) {
122 completedCallback_->Call(result);
123 }
124 }
125 }
126
NotifyChildren(const std::string & name,const std::shared_ptr<StartupTaskResult> & result)127 int32_t StartupTaskDispatcher::NotifyChildren(const std::string &name, const std::shared_ptr<StartupTaskResult> &result)
128 {
129 if (sortResult_ == nullptr) {
130 OnError(ERR_STARTUP_INTERNAL_ERROR, name + ": sort result is null");
131 return ERR_STARTUP_INTERNAL_ERROR;
132 }
133 auto findResult = sortResult_->startupChildrenMap_.find(name);
134 if (findResult == sortResult_->startupChildrenMap_.end()) {
135 OnError(ERR_STARTUP_INTERNAL_ERROR, name + " is not found");
136 return ERR_STARTUP_INTERNAL_ERROR;
137 }
138 std::vector<std::shared_ptr<StartupTask>> zeroInDegree;
139 for (auto &child : findResult->second) {
140 auto childFindResult = inDegreeMap_.find(child);
141 if (childFindResult == inDegreeMap_.end()) {
142 OnError(ERR_STARTUP_INTERNAL_ERROR, child + "is not found in inDegreeMap_.");
143 return ERR_STARTUP_INTERNAL_ERROR;
144 }
145 auto childStartupTask = tasks_.find(child);
146 if (childStartupTask == tasks_.end()) {
147 OnError(ERR_STARTUP_INTERNAL_ERROR, child + "is not found in tasks_.");
148 return ERR_STARTUP_INTERNAL_ERROR;
149 }
150 if (childStartupTask->second == nullptr) {
151 OnError(ERR_STARTUP_INTERNAL_ERROR, child + " task is null.");
152 return ERR_STARTUP_INTERNAL_ERROR;
153 }
154 childStartupTask->second->RunTaskOnDependencyCompleted(name, result);
155 childFindResult->second--;
156 if (childFindResult->second == 0) {
157 zeroInDegree.emplace_back(childStartupTask->second);
158 }
159 }
160 for (auto &iter : zeroInDegree) {
161 if (isTimeoutStopped_) {
162 TAG_LOGD(AAFwkTag::STARTUP, "startup task dispatch timeout, stop running %{public}s",
163 iter->GetName().c_str());
164 return ERR_STARTUP_TIMEOUT;
165 }
166 int32_t runResult = RunTaskInit(iter->GetName(), iter);
167 if (runResult != ERR_OK) {
168 return runResult;
169 }
170 }
171 return ERR_OK;
172 }
173
RunTaskInit(const std::string & name,const std::shared_ptr<StartupTask> & task)174 int32_t StartupTaskDispatcher::RunTaskInit(const std::string &name, const std::shared_ptr<StartupTask> &task)
175 {
176 TAG_LOGD(AAFwkTag::STARTUP, "%{public}s init", name.c_str());
177 std::unique_ptr<StartupTaskResultCallback> callback = std::make_unique<StartupTaskResultCallback>();
178 callback->Push([weak = weak_from_this(), name](const std::shared_ptr<StartupTaskResult> &result) {
179 auto startupTaskDispatcher = weak.lock();
180 if (startupTaskDispatcher == nullptr) {
181 TAG_LOGD(AAFwkTag::STARTUP, "startupTaskDispatcher may have been release due to previous error");
182 return;
183 }
184 startupTaskDispatcher->Dispatch(name, result);
185 });
186 StartupTask::State state = task->GetState();
187 if (state == StartupTask::State::CREATED) {
188 int32_t result = task->RunTaskPreInit(callback);
189 if (result != ERR_OK) {
190 return result;
191 }
192 return task->RunTaskInit(std::move(callback));
193 } else if (state == StartupTask::State::INITIALIZED) {
194 callback->Call(task->GetResult());
195 return ERR_OK;
196 } else if (state == StartupTask::State::INITIALIZING) {
197 return task->AddExtraCallback(std::move(callback));
198 } else {
199 // state: INVALID
200 TAG_LOGE(AAFwkTag::STARTUP, "%{public}s task state: INVALID", name.c_str());
201 return ERR_STARTUP_INTERNAL_ERROR;
202 }
203 }
204
OnError(const std::string & name,const std::shared_ptr<StartupTaskResult> & result)205 void StartupTaskDispatcher::OnError(const std::string &name, const std::shared_ptr<StartupTaskResult> &result)
206 {
207 TAG_LOGE(AAFwkTag::STARTUP, "%{public}s failed, %{public}d", name.c_str(), result->GetResultCode());
208 std::string resultMessage = name + ": " + result->GetResultMessage();
209 result->SetResultMessage(resultMessage);
210 if (completedCallback_ != nullptr) {
211 completedCallback_->Call(result);
212 }
213 DelayedSingleton<StartupManager>::GetInstance()->PostMainThreadTask(
214 [mainThreadAwaitCallback = mainThreadAwaitCallback_, result]() {
215 if (mainThreadAwaitCallback != nullptr) {
216 mainThreadAwaitCallback->Call(result);
217 }
218 });
219 }
220
OnError(int32_t errorCode,const std::string & errorMessage)221 void StartupTaskDispatcher::OnError(int32_t errorCode, const std::string &errorMessage)
222 {
223 TAG_LOGE(AAFwkTag::STARTUP, "%{public}s", errorMessage.c_str());
224 auto result = std::make_shared<StartupTaskResult>(errorCode, errorMessage);
225 if (completedCallback_ != nullptr) {
226 completedCallback_->Call(result);
227 }
228 DelayedSingleton<StartupManager>::GetInstance()->PostMainThreadTask(
229 [mainThreadAwaitCallback = mainThreadAwaitCallback_, result]() {
230 if (mainThreadAwaitCallback != nullptr) {
231 mainThreadAwaitCallback->Call(result);
232 }
233 });
234 }
235 } // namespace AbilityRuntime
236 } // namespace OHOS
237