• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "task.h"
17 
18 #include <chrono>
19 #include <functional>
20 
21 #include "sdk_helper.h"
22 
23 namespace OHOS {
24 namespace FileManagement {
25 namespace CloudSync {
26 using namespace std;
27 
28 /* task runner */
TaskRunner(function<void ()> callback)29 TaskRunner::TaskRunner(function<void()> callback) : callback_(callback)
30 {
31 }
32 
~TaskRunner()33 TaskRunner::~TaskRunner()
34 {
35 }
36 
GenerateTaskId()37 int32_t TaskRunner::GenerateTaskId()
38 {
39     return currentId_.fetch_add(1);
40 }
41 
AddTask(shared_ptr<Task> t)42 int32_t TaskRunner::AddTask(shared_ptr<Task> t)
43 {
44     unique_lock<mutex> lock(mutex_);
45 
46     /* If stopped, no more tasks can be added */
47     if (stopFlag_) {
48         LOGI("add task fail since stop");
49         return E_STOP;
50     }
51 
52     /* insert task */
53     t->SetId(GenerateTaskId());
54     taskList_.emplace_back(t);
55 
56     return E_OK;
57 }
58 
StartTask(shared_ptr<Task> t,TaskAction action)59 int32_t TaskRunner::StartTask(shared_ptr<Task> t, TaskAction action)
60 {
61     /*
62      * Try to execute the previous callback even in stop process.
63      * Yet no new task could be added in the callback.
64      */
65     t->SetAction(action);
66     int32_t ret = commitFunc_(shared_from_this(), t);
67     if (ret != E_OK) {
68         LOGE("commit task err %{public}d", ret);
69         return ret;
70     }
71     return E_OK;
72 }
73 
CommitTask(shared_ptr<Task> t)74 int32_t TaskRunner::CommitTask(shared_ptr<Task> t)
75 {
76     /* add task */
77     int32_t ret = AddTask(t);
78     if (ret != E_OK) {
79         LOGE("add task err %{public}d", ret);
80         return ret;
81     }
82 
83     /* launch */
84     ret = commitFunc_(shared_from_this(), t);
85     if (ret != E_OK) {
86         LOGE("commit task err %{public}d", ret);
87         return ret;
88     }
89 
90     return E_OK;
91 }
92 
CompleteTask(int32_t id)93 void TaskRunner::CompleteTask(int32_t id)
94 {
95     /* remove task */
96     unique_lock<mutex> lock(mutex_);
97     for (auto entry = taskList_.begin(); entry != taskList_.end();) {
98         if (entry->get()->GetId() == id) {
99             (void)taskList_.erase(entry);
100             break;
101         } else {
102             entry++;
103         }
104     }
105 
106     if (taskList_.empty()) {
107         if (stopFlag_) {
108             /* if it has been stopped, notify the blocking caller */
109             stopFlag_ = false;
110             cv_.notify_all();
111         } else {
112             lock.unlock();
113             Reset();
114             /* otherwise just run its callback */
115             callback_();
116         }
117     }
118 }
119 
StopAndWaitFor()120 bool TaskRunner::StopAndWaitFor()
121 {
122     unique_lock<mutex> lock(mutex_);
123     LOGI("task manager stop");
124     if (taskList_.empty()) {
125         return true;
126     }
127 
128     stopFlag_ = true;
129     return cv_.wait_for(lock, chrono::seconds(WAIT_FOR_SECOND), [this] {
130         return taskList_.empty();
131     });
132 }
133 
Reset()134 void TaskRunner::Reset()
135 {
136     currentId_.store(0);
137 }
138 
SetCommitFunc(function<int32_t (shared_ptr<TaskRunner>,shared_ptr<Task>)> func)139 void TaskRunner::SetCommitFunc(function<int32_t(shared_ptr<TaskRunner>, shared_ptr<Task>)> func)
140 {
141     commitFunc_ = func;
142 }
143 
CommitDummyTask()144 void TaskRunner::CommitDummyTask()
145 {
146     auto task = make_shared<Task>(nullptr, nullptr);
147     task->SetId(INVALID_ID);
148 
149     unique_lock<mutex> lock(mutex_);
150     taskList_.emplace_back(task);
151 }
152 
CompleteDummyTask()153 void TaskRunner::CompleteDummyTask()
154 {
155     CompleteTask(INVALID_ID);
156 }
157 
158 /* TaskManager */
TaskManager()159 TaskManager::TaskManager()
160 {
161     pool_.SetMaxTaskNum(MAX_THREAD_NUM);
162     pool_.Start(MAX_THREAD_NUM);
163 }
164 
~TaskManager()165 TaskManager::~TaskManager()
166 {
167     pool_.Stop();
168 }
169 
AllocRunner(int32_t userId,const std::string & bundleName,function<void ()> callback)170 shared_ptr<TaskRunner> TaskManager::AllocRunner(int32_t userId, const std::string &bundleName,
171     function<void()> callback)
172 {
173     string key = GetKey(userId, bundleName);
174     unique_lock wlock(mapMutex_);
175     if (map_.find(key) == map_.end()) {
176         auto runner = make_shared<TaskRunner>(callback);
177         InitRunner(*runner);
178         map_.insert({ key, runner });
179     }
180     return map_[key];
181 }
182 
ReleaseRunner(int32_t userId,const std::string & bundleName)183 void TaskManager::ReleaseRunner(int32_t userId, const std::string &bundleName)
184 {
185     string key = GetKey(userId, bundleName);
186     unique_lock wlock(mapMutex_);
187     map_.erase(key);
188 }
189 
GetRunner(int32_t userId,const std::string & bundleName)190 shared_ptr<TaskRunner> TaskManager::GetRunner(int32_t userId, const std::string &bundleName)
191 {
192     string key = GetKey(userId, bundleName);
193     shared_lock rlock(mapMutex_);
194     if (map_.find(key) == map_.end()) {
195         return nullptr;
196     }
197     return map_[key];
198 }
199 
InitRunner(TaskRunner & runner)200 void TaskManager::InitRunner(TaskRunner &runner)
201 {
202     runner.SetCommitFunc(bind(&TaskManager::CommitTask, this, placeholders::_1, placeholders::_2));
203 }
204 
CommitTask(shared_ptr<TaskRunner> runner,shared_ptr<Task> t)205 int32_t TaskManager::CommitTask(shared_ptr<TaskRunner> runner, shared_ptr<Task> t)
206 {
207     pool_.AddTask([t, runner]() {
208         t->Run();
209         runner->CompleteTask(t->GetId());
210     });
211     return E_OK;
212 }
213 
/*
 * Build the map key for a (userId, bundleName) pair: the decimal user id
 * immediately followed by the bundle name, with no separator.
 * NOTE(review): without a separator, distinct pairs can yield the same key
 * if a bundle name may start with a digit - confirm that cannot happen.
 */
string TaskManager::GetKey(int32_t userId, const string &bundleName)
{
    string key = to_string(userId);
    key += bundleName;
    return key;
}
218 } // namespace CloudSync
219 } // namespace FileManagement
220 } // namespace OHOS
221