1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef OHOS_CLOUD_SYNC_SERVICE_TASK_H
17 #define OHOS_CLOUD_SYNC_SERVICE_TASK_H
18
#include <condition_variable>
#include <list>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <utility>
#include <vector>

#include <nocopyable.h>
#include <singleton.h>

#include "thread_pool.h"

#include "data_handler.h"
#include "dfs_error.h"
#include "sdk_helper.h"
#include "utils_log.h"
34
35 namespace OHOS {
36 namespace FileManagement {
37 namespace CloudSync {
38 class TaskContext : public DriveKit::DKContext {
39 public:
TaskContext(std::shared_ptr<DataHandler> handler)40 TaskContext(std::shared_ptr<DataHandler> handler) : handler_(handler),
41 assets_(0)
42 {
43 }
44
GetHandler()45 std::shared_ptr<DataHandler> GetHandler()
46 {
47 return handler_;
48 }
49
SetAssets(std::vector<DriveKit::DKDownloadAsset> & assets)50 void SetAssets(std::vector<DriveKit::DKDownloadAsset> &assets)
51 {
52 assets_ = std::move(assets);
53 }
54
GetAssets()55 std::vector<DriveKit::DKDownloadAsset> GetAssets()
56 {
57 return std::move(assets_);
58 }
59
60 private:
61 std::shared_ptr<DataHandler> handler_;
62 /* not final, might put this in its derived class */
63 std::vector<DriveKit::DKDownloadAsset> assets_;
64 };
65
66 class DownloadTaskContext : public TaskContext {
67 public:
DownloadTaskContext(std::shared_ptr<DataHandler> handler,int32_t batchNo)68 DownloadTaskContext(std::shared_ptr<DataHandler> handler, int32_t batchNo)
69 : TaskContext(std::move(handler)), batchNo_(batchNo) {}
70
GetBatchNo()71 int32_t GetBatchNo() const
72 {
73 return batchNo_;
74 }
75
76 private:
77 int32_t batchNo_;
78 };
79
80 constexpr int32_t INVALID_ID = -1;
81
82 using TaskAction = std::function<void(std::shared_ptr<TaskContext>)>;
83 class Task {
84 public:
Task(std::shared_ptr<TaskContext> context,TaskAction action)85 Task(std::shared_ptr<TaskContext> context, TaskAction action) : id_(INVALID_ID),
86 context_(context), action_(action) {}
87 virtual ~Task() = default;
88
Run()89 virtual void Run()
90 {
91 action_(context_);
92 }
93
SetAction(TaskAction action)94 void SetAction(TaskAction action)
95 {
96 action_ = action;
97 }
98
SetId(int32_t id)99 void SetId(int32_t id)
100 {
101 id_ = id;
102 }
103
GetId()104 int32_t GetId()
105 {
106 return id_;
107 }
108
109 private:
110 int32_t id_;
111 std::shared_ptr<TaskContext> context_;
112 TaskAction action_;
113 };
114
115 class TaskRunner : public std::enable_shared_from_this<TaskRunner> {
116 public:
117 TaskRunner(std::function<void()> callback);
118 virtual ~TaskRunner();
119
120 /* async */
121 template<typename T>
122 int32_t AsyncRun(std::shared_ptr<TaskContext> context, void(T::*f)(std::shared_ptr<TaskContext>),
123 T *ptr);
124 template<typename T, typename RET, typename... ARGS>
125 std::function<RET(ARGS...)> AsyncCallback(RET(T::*f)(ARGS...), T *ptr);
126
127 /* dummy */
128 void CommitDummyTask();
129 void CompleteDummyTask();
130
131 /* reset and stop */
132 void Reset();
133 bool ReleaseTask();
134
135 bool NeedRun(std::shared_ptr<Task> t);
136
137 std::shared_ptr<bool> GetStopFlag();
138 void SetStopFlag(std::shared_ptr<bool> stopFlag);
139
140 private:
141 int32_t GenerateTaskId();
142
143 /* task operations */
144 int32_t AddTask(std::shared_ptr<Task> t);
145 int32_t StartTask(std::shared_ptr<Task> t, TaskAction action);
146 int32_t CommitTask(std::shared_ptr<Task> t);
147 void CompleteTask(int32_t id);
148
149 /* set commit func */
150 friend class TaskManager;
151 void SetCommitFunc(std::function<int32_t(std::shared_ptr<TaskRunner>,
152 std::shared_ptr<Task>)> func);
153 std::function<int32_t(std::shared_ptr<TaskRunner>, std::shared_ptr<Task>)> commitFunc_;
154
155 /* stop */
156 std::shared_ptr<bool> stopFlag_ = nullptr;
157
158 /* id */
159 std::atomic<int32_t> currentId_ = 0;
160
161 /* task list */
162 std::mutex mutex_;
163 std::list<std::shared_ptr<Task>> taskList_;
164
165 /* data syncer callback */
166 std::function<void()> callback_;
167 };
168
169 template<typename T>
AsyncRun(std::shared_ptr<TaskContext> context,void (T::* f)(std::shared_ptr<TaskContext>),T * ptr)170 inline int32_t TaskRunner::AsyncRun(std::shared_ptr<TaskContext> context,
171 void(T::*f)(std::shared_ptr<TaskContext>), T *ptr)
172 {
173 std::shared_ptr<Task> task = std::make_shared<Task>(context,
174 [ptr, f](std::shared_ptr<TaskContext> ctx) {
175 (ptr->*f)(ctx);
176 }
177 );
178
179 int32_t ret = CommitTask(task);
180 if (ret != E_OK) {
181 LOGE("async run commit task err %{public}d", ret);
182 return ret;
183 }
184
185 return E_OK;
186 }
187
188 /*
189 * About ARGS...
190 * <1> async execute requires value-copy or shared_ptr like input parameters,
191 * but no reference for lifecycle consideration.
192 * <2> In addition, [=] requires the wrapped function with const parameters.
193 */
194 template<typename T, typename RET, typename... ARGS>
AsyncCallback(RET (T::* f)(ARGS...),T * ptr)195 inline std::function<RET(ARGS...)> TaskRunner::AsyncCallback(RET(T::*f)(ARGS...), T *ptr)
196 {
197 std::shared_ptr<Task> task = std::make_shared<Task>(nullptr, nullptr);
198
199 int32_t ret = AddTask(task);
200 if (ret != E_OK) {
201 LOGE("async callback add task err %{public}d", ret);
202 return nullptr;
203 }
204
205 return [ptr, f, task, this](ARGS... args) -> RET {
206 int32_t ret = this->StartTask(task, [ptr, f, args...](std::shared_ptr<TaskContext>) {
207 (ptr->*f)(args...);
208 });
209 if (ret != E_OK) {
210 LOGE("async callback start task err %{public}d", ret);
211 }
212 };
213 }
214
215 class TaskManager : public NoCopyable {
216 DECLARE_DELAYED_SINGLETON(TaskManager);
217
218 public:
219 std::shared_ptr<TaskRunner> AllocRunner(int32_t userId, const std::string &bundleName,
220 std::function<void()> callback);
221 std::shared_ptr<TaskRunner> GetRunner(int32_t userId, const std::string &bundleName);
222 void ReleaseRunner(int32_t userId, const std::string &bundleName);
223
224 int32_t CommitTask(std::shared_ptr<TaskRunner> runner, std::shared_ptr<Task> t);
225
226 private:
227 std::string GetKey(int32_t userId, const std::string &bundleName);
228 void InitRunner(TaskRunner &runner);
229
230 /* runners */
231 std::unordered_map<std::string, std::shared_ptr<TaskRunner>> map_;
232 std::shared_mutex mapMutex_;
233
234 /* thread pool */
235 ThreadPool pool_ = ThreadPool("TaskManager");
236 const int32_t MAX_THREAD_NUM = 12;
237 };
238 } // namespace CloudSync
239 } // namespace FileManagement
240 } // namespace OHOS
241 #endif // OHOS_CLOUD_SYNC_SERVICE_TASK_H
242