1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #ifndef OHOS_CLOUD_SYNC_SERVICE_TASK_H
17 #define OHOS_CLOUD_SYNC_SERVICE_TASK_H
18
#include <atomic>
#include <condition_variable>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <nocopyable.h>
#include <singleton.h>
#include <shared_mutex>
#include <string>
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>

#include "thread_pool.h"

#include "data_handler.h"
#include "dfs_error.h"
#include "sdk_helper.h"
#include "utils_log.h"
34
35 namespace OHOS {
36 namespace FileManagement {
37 namespace CloudSync {
38 class TaskContext : public DriveKit::DKContext {
39 public:
TaskContext(std::shared_ptr<DataHandler> handler)40 TaskContext(std::shared_ptr<DataHandler> handler) : handler_(handler) {}
41
GetHandler()42 std::shared_ptr<DataHandler> GetHandler()
43 {
44 return handler_;
45 }
46
47 private:
48 std::shared_ptr<DataHandler> handler_;
49 };
50
51 class DownloadTaskContext : public TaskContext {
52 public:
DownloadTaskContext(std::shared_ptr<DataHandler> handler,int32_t batchNo)53 DownloadTaskContext(std::shared_ptr<DataHandler> handler, int32_t batchNo)
54 : TaskContext(std::move(handler)), batchNo_(batchNo) {}
55
GetBatchNo()56 int32_t GetBatchNo() const
57 {
58 return batchNo_;
59 }
60
61 private:
62 int32_t batchNo_;
63 };
64
65 constexpr int32_t INVALID_ID = -1;
66
67 using TaskAction = std::function<void(std::shared_ptr<TaskContext>)>;
68 class Task {
69 public:
Task(std::shared_ptr<TaskContext> context,TaskAction action)70 Task(std::shared_ptr<TaskContext> context, TaskAction action) : id_(INVALID_ID),
71 context_(context), action_(action) {}
72 virtual ~Task() = default;
73
Run()74 virtual void Run()
75 {
76 action_(context_);
77 }
78
SetAction(TaskAction action)79 void SetAction(TaskAction action)
80 {
81 action_ = action;
82 }
83
SetId(int32_t id)84 void SetId(int32_t id)
85 {
86 id_ = id;
87 }
88
GetId()89 int32_t GetId()
90 {
91 return id_;
92 }
93
94 private:
95 int32_t id_;
96 std::shared_ptr<TaskContext> context_;
97 TaskAction action_;
98 };
99
100 class TaskRunner : public std::enable_shared_from_this<TaskRunner> {
101 public:
102 TaskRunner(std::function<void()> callback);
103 virtual ~TaskRunner();
104
105 /* async */
106 template<typename T>
107 int32_t AsyncRun(std::shared_ptr<TaskContext> context, void(T::*f)(std::shared_ptr<TaskContext>),
108 T *ptr);
109 template<typename T, typename RET, typename... ARGS>
110 std::function<RET(ARGS...)> AsyncCallback(RET(T::*f)(ARGS...), T *ptr);
111
112 /* dummy */
113 void CommitDummyTask();
114 void CompleteDummyTask();
115
116 /* reset and stop */
117 void Reset();
118 bool StopAndWaitFor();
119
120 private:
121 int32_t GenerateTaskId();
122
123 /* task operations */
124 int32_t AddTask(std::shared_ptr<Task> t);
125 int32_t StartTask(std::shared_ptr<Task> t, TaskAction action);
126 int32_t CommitTask(std::shared_ptr<Task> t);
127 void CompleteTask(int32_t id);
128
129 /* set commit func */
130 friend class TaskManager;
131 void SetCommitFunc(std::function<int32_t(std::shared_ptr<TaskRunner>,
132 std::shared_ptr<Task>)> func);
133 std::function<int32_t(std::shared_ptr<TaskRunner>, std::shared_ptr<Task>)> commitFunc_;
134
135 /* stop */
136 bool stopFlag_ = false;
137 const int32_t WAIT_FOR_SECOND = 30;
138
139 /* id */
140 std::atomic<int32_t> currentId_ = 0;
141
142 /* task list */
143 std::mutex mutex_;
144 std::list<std::shared_ptr<Task>> taskList_;
145 std::condition_variable cv_;
146
147 /* data syncer callback */
148 std::function<void()> callback_;
149 };
150
151 template<typename T>
AsyncRun(std::shared_ptr<TaskContext> context,void (T::* f)(std::shared_ptr<TaskContext>),T * ptr)152 inline int32_t TaskRunner::AsyncRun(std::shared_ptr<TaskContext> context,
153 void(T::*f)(std::shared_ptr<TaskContext>), T *ptr)
154 {
155 std::shared_ptr<Task> task = std::make_shared<Task>(context,
156 [ptr, f](std::shared_ptr<TaskContext> ctx) {
157 (ptr->*f)(ctx);
158 }
159 );
160
161 int32_t ret = CommitTask(task);
162 if (ret != E_OK) {
163 LOGE("async run commit task err %{public}d", ret);
164 return ret;
165 }
166
167 return E_OK;
168 }
169
170 /*
171 * About ARGS...
172 * <1> async execute requires value-copy or shared_ptr like input parameters,
173 * but no reference for lifecycle consideration.
174 * <2> In addition, [=] requires the wrapped function with const parameters.
175 */
176 template<typename T, typename RET, typename... ARGS>
AsyncCallback(RET (T::* f)(ARGS...),T * ptr)177 inline std::function<RET(ARGS...)> TaskRunner::AsyncCallback(RET(T::*f)(ARGS...), T *ptr)
178 {
179 std::shared_ptr<Task> task = std::make_shared<Task>(nullptr, nullptr);
180
181 int32_t ret = AddTask(task);
182 if (ret != E_OK) {
183 LOGE("async callback add task err %{public}d", ret);
184 return nullptr;
185 }
186
187 return [ptr, f, task, this](ARGS... args) -> RET {
188 int32_t ret = this->StartTask(task, [ptr, f, args...](std::shared_ptr<TaskContext>) {
189 (ptr->*f)(args...);
190 });
191 if (ret != E_OK) {
192 LOGE("async callback start task err %{public}d", ret);
193 }
194 };
195 }
196
197 class TaskManager : public NoCopyable {
198 DECLARE_DELAYED_SINGLETON(TaskManager);
199
200 public:
201 std::shared_ptr<TaskRunner> AllocRunner(int32_t userId, const std::string &bundleName,
202 std::function<void()> callback);
203 std::shared_ptr<TaskRunner> GetRunner(int32_t userId, const std::string &bundleName);
204 void ReleaseRunner(int32_t userId, const std::string &bundleName);
205
206 int32_t CommitTask(std::shared_ptr<TaskRunner> runner, std::shared_ptr<Task> t);
207
208 private:
209 std::string GetKey(int32_t userId, const std::string &bundleName);
210 void InitRunner(TaskRunner &runner);
211
212 /* runners */
213 std::unordered_map<std::string, std::shared_ptr<TaskRunner>> map_;
214 std::shared_mutex mapMutex_;
215
216 /* thread pool */
217 ThreadPool pool_ = ThreadPool("TaskManager");
218 const int32_t MAX_THREAD_NUM = 12;
219 };
220 } // namespace CloudSync
221 } // namespace FileManagement
222 } // namespace OHOS
223 #endif // OHOS_CLOUD_SYNC_SERVICE_TASK_H
224