• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "task.h"
17 
18 #include <chrono>
19 #include <functional>
20 
21 #include "dfsu_memory_guard.h"
22 #include "sdk_helper.h"
23 #include "xcollie_helper.h"
24 
25 namespace OHOS {
26 namespace FileManagement {
27 namespace CloudSync {
28 using namespace std;
29 
30 /* task runner */
TaskRunner(function<void ()> callback)31 TaskRunner::TaskRunner(function<void()> callback) : callback_(callback) {}
32 
~TaskRunner()33 TaskRunner::~TaskRunner() {}
34 
GenerateTaskId()35 int32_t TaskRunner::GenerateTaskId()
36 {
37     return currentId_.fetch_add(1);
38 }
39 
AddTask(shared_ptr<Task> t)40 int32_t TaskRunner::AddTask(shared_ptr<Task> t)
41 {
42     unique_lock<mutex> lock(mutex_);
43 
44     /* insert task */
45     t->SetId(GenerateTaskId());
46     taskList_.emplace_back(t);
47 
48     return E_OK;
49 }
50 
StartTask(shared_ptr<Task> t,TaskAction action)51 int32_t TaskRunner::StartTask(shared_ptr<Task> t, TaskAction action)
52 {
53     /*
54      * Try to execute the previous callback even in stop process.
55      * Yet no new task could be added in the callback.
56      */
57     LOGI("StartTask begin, Task id: %{public}d", t->GetId());
58     t->SetAction(action);
59     int32_t ret = commitFunc_(shared_from_this(), t);
60     if (ret != E_OK) {
61         LOGE("commit task err %{public}d", ret);
62         return ret;
63     }
64     LOGI("StartTask end, Task id: %{public}d", t->GetId());
65     return E_OK;
66 }
67 
CommitTask(shared_ptr<Task> t)68 int32_t TaskRunner::CommitTask(shared_ptr<Task> t)
69 {
70     /* add task */
71     LOGI("start CommitTask, Task id: %{public}d", t->GetId());
72     int32_t ret = AddTask(t);
73     if (ret != E_OK) {
74         LOGE("add task err %{public}d", ret);
75         return ret;
76     }
77 
78     /* launch */
79     ret = commitFunc_(shared_from_this(), t);
80     if (ret != E_OK) {
81         LOGE("commit task err %{public}d", ret);
82         return ret;
83     }
84 
85     LOGI("CommitTask success,Task id: %{public}d", t->GetId());
86     return E_OK;
87 }
88 
CompleteTask(int32_t id)89 void TaskRunner::CompleteTask(int32_t id)
90 {
91     /* remove task */
92     unique_lock<mutex> lock(mutex_);
93     for (auto entry = taskList_.begin(); entry != taskList_.end();) {
94         if (entry->get()->GetId() == id) {
95             (void)taskList_.erase(entry);
96             break;
97         } else {
98             entry++;
99         }
100     }
101 
102     LOGI("taskList size: %{public}zu, Task id: %{public}d", taskList_.size(), id);
103     if (taskList_.empty()) {
104         if (!(*stopFlag_)) {
105             const int32_t DFX_DELAY_S = 60;
106             int32_t xcollieId = XCollieHelper::SetTimer("CloudSyncService_CompleteTask",
107                 DFX_DELAY_S, nullptr, nullptr, true);
108             lock.unlock();
109             XCollieHelper::CancelTimer(xcollieId);
110             /* otherwise just run its callback */
111             callback_();
112         }
113     }
114 }
115 
ReleaseTask()116 bool TaskRunner::ReleaseTask()
117 {
118     unique_lock<mutex> lock(mutex_);
119     LOGI("task manager stop");
120     taskList_.clear();
121     return taskList_.empty();
122 }
123 
Reset()124 void TaskRunner::Reset()
125 {
126     currentId_.store(0);
127 }
128 
SetCommitFunc(function<int32_t (shared_ptr<TaskRunner>,shared_ptr<Task>)> func)129 void TaskRunner::SetCommitFunc(function<int32_t(shared_ptr<TaskRunner>, shared_ptr<Task>)> func)
130 {
131     commitFunc_ = func;
132 }
133 
CommitDummyTask()134 void TaskRunner::CommitDummyTask()
135 {
136     auto task = make_shared<Task>(nullptr, nullptr);
137     task->SetId(INVALID_ID);
138 
139     unique_lock<mutex> lock(mutex_);
140     taskList_.emplace_back(task);
141 }
142 
CompleteDummyTask()143 void TaskRunner::CompleteDummyTask()
144 {
145     CompleteTask(INVALID_ID);
146 }
147 
GetStopFlag()148 std::shared_ptr<bool> TaskRunner::GetStopFlag()
149 {
150     return stopFlag_;
151 }
152 
SetStopFlag(std::shared_ptr<bool> stopFlag)153 void TaskRunner::SetStopFlag(std::shared_ptr<bool> stopFlag)
154 {
155     stopFlag_ = stopFlag;
156 }
157 
NeedRun(shared_ptr<Task> t)158 bool TaskRunner::NeedRun(shared_ptr<Task> t)
159 {
160     unique_lock<mutex> lock(mutex_);
161     for (auto it = taskList_.begin(); it != taskList_.end();) {
162         if (it->get()->GetId() == t->GetId()) {
163             return true;
164         } else {
165             it++;
166         }
167     }
168     return false;
169 }
170 
171 /* TaskManager */
TaskManager()172 TaskManager::TaskManager()
173 {
174     pool_.SetMaxTaskNum(MAX_THREAD_NUM);
175     pool_.Start(MAX_THREAD_NUM);
176 }
177 
~TaskManager()178 TaskManager::~TaskManager()
179 {
180     pool_.Stop();
181 }
182 
AllocRunner(int32_t userId,const std::string & bundleName,function<void ()> callback)183 shared_ptr<TaskRunner> TaskManager::AllocRunner(int32_t userId, const std::string &bundleName,
184                                                 function<void()> callback)
185 {
186     string key = GetKey(userId, bundleName);
187     unique_lock wlock(mapMutex_);
188     if (map_.find(key) == map_.end()) {
189         auto runner = make_shared<TaskRunner>(callback);
190         InitRunner(*runner);
191         map_.insert({ key, runner });
192     }
193     return map_[key];
194 }
195 
ReleaseRunner(int32_t userId,const std::string & bundleName)196 void TaskManager::ReleaseRunner(int32_t userId, const std::string &bundleName)
197 {
198     string key = GetKey(userId, bundleName);
199     unique_lock wlock(mapMutex_);
200     map_.erase(key);
201 }
202 
GetRunner(int32_t userId,const std::string & bundleName)203 shared_ptr<TaskRunner> TaskManager::GetRunner(int32_t userId, const std::string &bundleName)
204 {
205     string key = GetKey(userId, bundleName);
206     shared_lock rlock(mapMutex_);
207     if (map_.find(key) == map_.end()) {
208         return nullptr;
209     }
210     return map_[key];
211 }
212 
InitRunner(TaskRunner & runner)213 void TaskManager::InitRunner(TaskRunner &runner)
214 {
215     runner.SetCommitFunc(bind(&TaskManager::CommitTask, this, placeholders::_1, placeholders::_2));
216 }
217 
/* Build the map key: the decimal user id immediately followed by the bundle name. */
string TaskManager::GetKey(int32_t userId, const string &bundleName)
{
    string key = to_string(userId);
    key += bundleName;
    return key;
}
222 
CommitTask(shared_ptr<TaskRunner> runner,shared_ptr<Task> t)223 int32_t TaskManager::CommitTask(shared_ptr<TaskRunner> runner, shared_ptr<Task> t)
224 {
225     LOGI("CommitTask begin, Task id: %{public}d", t->GetId());
226     if (runner->NeedRun(t)) {
227         LOGI("task need run");
228         pool_.AddTask([t, runner]() {
229             DfsuMemoryGuard cacheGuard;
230             t->Run();
231             runner->CompleteTask(t->GetId());
232         });
233     }
234     LOGI("CommitTask end, Task id: %{public}d", t->GetId());
235     return E_OK;
236 }
237 } // namespace CloudSync
238 } // namespace FileManagement
239 } // namespace OHOS
240