1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "task.h"
17
18 #include <chrono>
19 #include <functional>
20
21 #include "sdk_helper.h"
22
23 namespace OHOS {
24 namespace FileManagement {
25 namespace CloudSync {
26 using namespace std;
27
28 /* task runner */
TaskRunner(function<void ()> callback)29 TaskRunner::TaskRunner(function<void()> callback) : callback_(callback)
30 {
31 }
32
~TaskRunner()33 TaskRunner::~TaskRunner()
34 {
35 }
36
GenerateTaskId()37 int32_t TaskRunner::GenerateTaskId()
38 {
39 return currentId_.fetch_add(1);
40 }
41
AddTask(shared_ptr<Task> t)42 int32_t TaskRunner::AddTask(shared_ptr<Task> t)
43 {
44 unique_lock<mutex> lock(mutex_);
45
46 /* If stopped, no more tasks can be added */
47 if (stopFlag_) {
48 LOGI("add task fail since stop");
49 return E_STOP;
50 }
51
52 /* insert task */
53 t->SetId(GenerateTaskId());
54 taskList_.emplace_back(t);
55
56 return E_OK;
57 }
58
StartTask(shared_ptr<Task> t,TaskAction action)59 int32_t TaskRunner::StartTask(shared_ptr<Task> t, TaskAction action)
60 {
61 /*
62 * Try to execute the previous callback even in stop process.
63 * Yet no new task could be added in the callback.
64 */
65 t->SetAction(action);
66 int32_t ret = commitFunc_(shared_from_this(), t);
67 if (ret != E_OK) {
68 LOGE("commit task err %{public}d", ret);
69 return ret;
70 }
71 return E_OK;
72 }
73
CommitTask(shared_ptr<Task> t)74 int32_t TaskRunner::CommitTask(shared_ptr<Task> t)
75 {
76 /* add task */
77 int32_t ret = AddTask(t);
78 if (ret != E_OK) {
79 LOGE("add task err %{public}d", ret);
80 return ret;
81 }
82
83 /* launch */
84 ret = commitFunc_(shared_from_this(), t);
85 if (ret != E_OK) {
86 LOGE("commit task err %{public}d", ret);
87 return ret;
88 }
89
90 return E_OK;
91 }
92
CompleteTask(int32_t id)93 void TaskRunner::CompleteTask(int32_t id)
94 {
95 /* remove task */
96 unique_lock<mutex> lock(mutex_);
97 for (auto entry = taskList_.begin(); entry != taskList_.end();) {
98 if (entry->get()->GetId() == id) {
99 (void)taskList_.erase(entry);
100 break;
101 } else {
102 entry++;
103 }
104 }
105
106 if (taskList_.empty()) {
107 if (stopFlag_) {
108 /* if it has been stopped, notify the blocking caller */
109 stopFlag_ = false;
110 cv_.notify_all();
111 } else {
112 lock.unlock();
113 Reset();
114 /* otherwise just run its callback */
115 callback_();
116 }
117 }
118 }
119
StopAndWaitFor()120 bool TaskRunner::StopAndWaitFor()
121 {
122 unique_lock<mutex> lock(mutex_);
123 LOGI("task manager stop");
124 if (taskList_.empty()) {
125 return true;
126 }
127
128 stopFlag_ = true;
129 return cv_.wait_for(lock, chrono::seconds(WAIT_FOR_SECOND), [this] {
130 return taskList_.empty();
131 });
132 }
133
Reset()134 void TaskRunner::Reset()
135 {
136 currentId_.store(0);
137 }
138
SetCommitFunc(function<int32_t (shared_ptr<TaskRunner>,shared_ptr<Task>)> func)139 void TaskRunner::SetCommitFunc(function<int32_t(shared_ptr<TaskRunner>, shared_ptr<Task>)> func)
140 {
141 commitFunc_ = func;
142 }
143
CommitDummyTask()144 void TaskRunner::CommitDummyTask()
145 {
146 auto task = make_shared<Task>(nullptr, nullptr);
147 task->SetId(INVALID_ID);
148
149 unique_lock<mutex> lock(mutex_);
150 taskList_.emplace_back(task);
151 }
152
CompleteDummyTask()153 void TaskRunner::CompleteDummyTask()
154 {
155 CompleteTask(INVALID_ID);
156 }
157
158 /* TaskManager */
TaskManager()159 TaskManager::TaskManager()
160 {
161 pool_.SetMaxTaskNum(MAX_THREAD_NUM);
162 pool_.Start(MAX_THREAD_NUM);
163 }
164
~TaskManager()165 TaskManager::~TaskManager()
166 {
167 pool_.Stop();
168 }
169
AllocRunner(int32_t userId,const std::string & bundleName,function<void ()> callback)170 shared_ptr<TaskRunner> TaskManager::AllocRunner(int32_t userId, const std::string &bundleName,
171 function<void()> callback)
172 {
173 string key = GetKey(userId, bundleName);
174 unique_lock wlock(mapMutex_);
175 if (map_.find(key) == map_.end()) {
176 auto runner = make_shared<TaskRunner>(callback);
177 InitRunner(*runner);
178 map_.insert({ key, runner });
179 }
180 return map_[key];
181 }
182
ReleaseRunner(int32_t userId,const std::string & bundleName)183 void TaskManager::ReleaseRunner(int32_t userId, const std::string &bundleName)
184 {
185 string key = GetKey(userId, bundleName);
186 unique_lock wlock(mapMutex_);
187 map_.erase(key);
188 }
189
GetRunner(int32_t userId,const std::string & bundleName)190 shared_ptr<TaskRunner> TaskManager::GetRunner(int32_t userId, const std::string &bundleName)
191 {
192 string key = GetKey(userId, bundleName);
193 shared_lock rlock(mapMutex_);
194 if (map_.find(key) == map_.end()) {
195 return nullptr;
196 }
197 return map_[key];
198 }
199
InitRunner(TaskRunner & runner)200 void TaskManager::InitRunner(TaskRunner &runner)
201 {
202 runner.SetCommitFunc(bind(&TaskManager::CommitTask, this, placeholders::_1, placeholders::_2));
203 }
204
CommitTask(shared_ptr<TaskRunner> runner,shared_ptr<Task> t)205 int32_t TaskManager::CommitTask(shared_ptr<TaskRunner> runner, shared_ptr<Task> t)
206 {
207 pool_.AddTask([t, runner]() {
208 t->Run();
209 runner->CompleteTask(t->GetId());
210 });
211 return E_OK;
212 }
213
/*
 * Build the map key for a runner: the decimal text of `userId` immediately
 * followed by `bundleName`, with no separator.
 */
string TaskManager::GetKey(int32_t userId, const string &bundleName)
{
    string key = to_string(userId);
    key += bundleName;
    return key;
}
218 } // namespace CloudSync
219 } // namespace FileManagement
220 } // namespace OHOS
221