1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "task.h"
17
18 #include <chrono>
19 #include <functional>
20
21 #include "dfsu_memory_guard.h"
22 #include "sdk_helper.h"
23 #include "xcollie_helper.h"
24
25 namespace OHOS {
26 namespace FileManagement {
27 namespace CloudSync {
28 using namespace std;
29
30 /* task runner */
TaskRunner(function<void ()> callback)31 TaskRunner::TaskRunner(function<void()> callback) : callback_(callback) {}
32
~TaskRunner()33 TaskRunner::~TaskRunner() {}
34
GenerateTaskId()35 int32_t TaskRunner::GenerateTaskId()
36 {
37 return currentId_.fetch_add(1);
38 }
39
AddTask(shared_ptr<Task> t)40 int32_t TaskRunner::AddTask(shared_ptr<Task> t)
41 {
42 unique_lock<mutex> lock(mutex_);
43
44 /* insert task */
45 t->SetId(GenerateTaskId());
46 taskList_.emplace_back(t);
47
48 return E_OK;
49 }
50
StartTask(shared_ptr<Task> t,TaskAction action)51 int32_t TaskRunner::StartTask(shared_ptr<Task> t, TaskAction action)
52 {
53 /*
54 * Try to execute the previous callback even in stop process.
55 * Yet no new task could be added in the callback.
56 */
57 LOGI("StartTask begin, Task id: %{public}d", t->GetId());
58 t->SetAction(action);
59 int32_t ret = commitFunc_(shared_from_this(), t);
60 if (ret != E_OK) {
61 LOGE("commit task err %{public}d", ret);
62 return ret;
63 }
64 LOGI("StartTask end, Task id: %{public}d", t->GetId());
65 return E_OK;
66 }
67
CommitTask(shared_ptr<Task> t)68 int32_t TaskRunner::CommitTask(shared_ptr<Task> t)
69 {
70 /* add task */
71 LOGI("start CommitTask, Task id: %{public}d", t->GetId());
72 int32_t ret = AddTask(t);
73 if (ret != E_OK) {
74 LOGE("add task err %{public}d", ret);
75 return ret;
76 }
77
78 /* launch */
79 ret = commitFunc_(shared_from_this(), t);
80 if (ret != E_OK) {
81 LOGE("commit task err %{public}d", ret);
82 return ret;
83 }
84
85 LOGI("CommitTask success,Task id: %{public}d", t->GetId());
86 return E_OK;
87 }
88
CompleteTask(int32_t id)89 void TaskRunner::CompleteTask(int32_t id)
90 {
91 /* remove task */
92 unique_lock<mutex> lock(mutex_);
93 for (auto entry = taskList_.begin(); entry != taskList_.end();) {
94 if (entry->get()->GetId() == id) {
95 (void)taskList_.erase(entry);
96 break;
97 } else {
98 entry++;
99 }
100 }
101
102 LOGI("taskList size: %{public}zu, Task id: %{public}d", taskList_.size(), id);
103 if (taskList_.empty()) {
104 if (!(*stopFlag_)) {
105 const int32_t DFX_DELAY_S = 60;
106 int32_t xcollieId = XCollieHelper::SetTimer("CloudSyncService_CompleteTask",
107 DFX_DELAY_S, nullptr, nullptr, true);
108 lock.unlock();
109 XCollieHelper::CancelTimer(xcollieId);
110 /* otherwise just run its callback */
111 callback_();
112 }
113 }
114 }
115
ReleaseTask()116 bool TaskRunner::ReleaseTask()
117 {
118 unique_lock<mutex> lock(mutex_);
119 LOGI("task manager stop");
120 taskList_.clear();
121 return taskList_.empty();
122 }
123
Reset()124 void TaskRunner::Reset()
125 {
126 currentId_.store(0);
127 }
128
SetCommitFunc(function<int32_t (shared_ptr<TaskRunner>,shared_ptr<Task>)> func)129 void TaskRunner::SetCommitFunc(function<int32_t(shared_ptr<TaskRunner>, shared_ptr<Task>)> func)
130 {
131 commitFunc_ = func;
132 }
133
CommitDummyTask()134 void TaskRunner::CommitDummyTask()
135 {
136 auto task = make_shared<Task>(nullptr, nullptr);
137 task->SetId(INVALID_ID);
138
139 unique_lock<mutex> lock(mutex_);
140 taskList_.emplace_back(task);
141 }
142
CompleteDummyTask()143 void TaskRunner::CompleteDummyTask()
144 {
145 CompleteTask(INVALID_ID);
146 }
147
GetStopFlag()148 std::shared_ptr<bool> TaskRunner::GetStopFlag()
149 {
150 return stopFlag_;
151 }
152
SetStopFlag(std::shared_ptr<bool> stopFlag)153 void TaskRunner::SetStopFlag(std::shared_ptr<bool> stopFlag)
154 {
155 stopFlag_ = stopFlag;
156 }
157
NeedRun(shared_ptr<Task> t)158 bool TaskRunner::NeedRun(shared_ptr<Task> t)
159 {
160 unique_lock<mutex> lock(mutex_);
161 for (auto it = taskList_.begin(); it != taskList_.end();) {
162 if (it->get()->GetId() == t->GetId()) {
163 return true;
164 } else {
165 it++;
166 }
167 }
168 return false;
169 }
170
171 /* TaskManager */
TaskManager()172 TaskManager::TaskManager()
173 {
174 pool_.SetMaxTaskNum(MAX_THREAD_NUM);
175 pool_.Start(MAX_THREAD_NUM);
176 }
177
~TaskManager()178 TaskManager::~TaskManager()
179 {
180 pool_.Stop();
181 }
182
AllocRunner(int32_t userId,const std::string & bundleName,function<void ()> callback)183 shared_ptr<TaskRunner> TaskManager::AllocRunner(int32_t userId, const std::string &bundleName,
184 function<void()> callback)
185 {
186 string key = GetKey(userId, bundleName);
187 unique_lock wlock(mapMutex_);
188 if (map_.find(key) == map_.end()) {
189 auto runner = make_shared<TaskRunner>(callback);
190 InitRunner(*runner);
191 map_.insert({ key, runner });
192 }
193 return map_[key];
194 }
195
ReleaseRunner(int32_t userId,const std::string & bundleName)196 void TaskManager::ReleaseRunner(int32_t userId, const std::string &bundleName)
197 {
198 string key = GetKey(userId, bundleName);
199 unique_lock wlock(mapMutex_);
200 map_.erase(key);
201 }
202
GetRunner(int32_t userId,const std::string & bundleName)203 shared_ptr<TaskRunner> TaskManager::GetRunner(int32_t userId, const std::string &bundleName)
204 {
205 string key = GetKey(userId, bundleName);
206 shared_lock rlock(mapMutex_);
207 if (map_.find(key) == map_.end()) {
208 return nullptr;
209 }
210 return map_[key];
211 }
212
InitRunner(TaskRunner & runner)213 void TaskManager::InitRunner(TaskRunner &runner)
214 {
215 runner.SetCommitFunc(bind(&TaskManager::CommitTask, this, placeholders::_1, placeholders::_2));
216 }
217
GetKey(int32_t userId,const string & bundleName)218 string TaskManager::GetKey(int32_t userId, const string &bundleName)
219 {
220 return to_string(userId) + bundleName;
221 }
222
CommitTask(shared_ptr<TaskRunner> runner,shared_ptr<Task> t)223 int32_t TaskManager::CommitTask(shared_ptr<TaskRunner> runner, shared_ptr<Task> t)
224 {
225 LOGI("CommitTask begin, Task id: %{public}d", t->GetId());
226 if (runner->NeedRun(t)) {
227 LOGI("task need run");
228 pool_.AddTask([t, runner]() {
229 DfsuMemoryGuard cacheGuard;
230 t->Run();
231 runner->CompleteTask(t->GetId());
232 });
233 }
234 LOGI("CommitTask end, Task id: %{public}d", t->GetId());
235 return E_OK;
236 }
237 } // namespace CloudSync
238 } // namespace FileManagement
239 } // namespace OHOS
240