• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OHOS_CLOUD_SYNC_SERVICE_TASK_H
17 #define OHOS_CLOUD_SYNC_SERVICE_TASK_H
18 
19 #include <condition_variable>
20 #include <list>
21 #include <memory>
22 #include <mutex>
23 #include <nocopyable.h>
24 #include <singleton.h>
25 #include <shared_mutex>
26 #include <vector>
27 
28 #include "thread_pool.h"
29 
30 #include "data_handler.h"
31 #include "dfs_error.h"
32 #include "sdk_helper.h"
33 #include "utils_log.h"
34 
35 namespace OHOS {
36 namespace FileManagement {
37 namespace CloudSync {
38 class TaskContext : public DriveKit::DKContext {
39 public:
TaskContext(std::shared_ptr<DataHandler> handler)40     TaskContext(std::shared_ptr<DataHandler> handler) : handler_(handler),
41         assets_(0)
42     {
43     }
44 
GetHandler()45     std::shared_ptr<DataHandler> GetHandler()
46     {
47         return handler_;
48     }
49 
SetAssets(std::vector<DriveKit::DKDownloadAsset> & assets)50     void SetAssets(std::vector<DriveKit::DKDownloadAsset> &assets)
51     {
52         assets_ = std::move(assets);
53     }
54 
GetAssets()55     std::vector<DriveKit::DKDownloadAsset> GetAssets()
56     {
57         return std::move(assets_);
58     }
59 
60 private:
61     std::shared_ptr<DataHandler> handler_;
62     /* not final, might put this in its derived class */
63     std::vector<DriveKit::DKDownloadAsset> assets_;
64 };
65 
66 class DownloadTaskContext : public TaskContext {
67 public:
DownloadTaskContext(std::shared_ptr<DataHandler> handler,int32_t batchNo)68     DownloadTaskContext(std::shared_ptr<DataHandler> handler, int32_t batchNo)
69         : TaskContext(std::move(handler)), batchNo_(batchNo) {}
70 
GetBatchNo()71     int32_t GetBatchNo() const
72     {
73         return batchNo_;
74     }
75 
76 private:
77     int32_t batchNo_;
78 };
79 
80 constexpr int32_t INVALID_ID = -1;
81 
82 using TaskAction = std::function<void(std::shared_ptr<TaskContext>)>;
83 class Task {
84 public:
Task(std::shared_ptr<TaskContext> context,TaskAction action)85     Task(std::shared_ptr<TaskContext> context, TaskAction action) : id_(INVALID_ID),
86         context_(context), action_(action) {}
87     virtual ~Task() = default;
88 
Run()89     virtual void Run()
90     {
91         action_(context_);
92     }
93 
SetAction(TaskAction action)94     void SetAction(TaskAction action)
95     {
96         action_ = action;
97     }
98 
SetId(int32_t id)99     void SetId(int32_t id)
100     {
101         id_ = id;
102     }
103 
GetId()104     int32_t GetId()
105     {
106         return id_;
107     }
108 
109 private:
110     int32_t id_;
111     std::shared_ptr<TaskContext> context_;
112     TaskAction action_;
113 };
114 
115 class TaskRunner : public std::enable_shared_from_this<TaskRunner> {
116 public:
117     TaskRunner(std::function<void()> callback);
118     virtual ~TaskRunner();
119 
120     /* async */
121     template<typename T>
122     int32_t AsyncRun(std::shared_ptr<TaskContext> context, void(T::*f)(std::shared_ptr<TaskContext>),
123         T *ptr);
124     template<typename T, typename RET, typename... ARGS>
125     std::function<RET(ARGS...)> AsyncCallback(RET(T::*f)(ARGS...), T *ptr);
126 
127     /* dummy */
128     void CommitDummyTask();
129     void CompleteDummyTask();
130 
131     /* reset and stop */
132     void Reset();
133     bool ReleaseTask();
134 
135     bool NeedRun(std::shared_ptr<Task> t);
136 
137     std::shared_ptr<bool> GetStopFlag();
138     void SetStopFlag(std::shared_ptr<bool> stopFlag);
139 
140 private:
141     int32_t GenerateTaskId();
142 
143     /* task operations */
144     int32_t AddTask(std::shared_ptr<Task> t);
145     int32_t StartTask(std::shared_ptr<Task> t, TaskAction action);
146     int32_t CommitTask(std::shared_ptr<Task> t);
147     void CompleteTask(int32_t id);
148 
149     /* set commit func */
150     friend class TaskManager;
151     void SetCommitFunc(std::function<int32_t(std::shared_ptr<TaskRunner>,
152         std::shared_ptr<Task>)> func);
153     std::function<int32_t(std::shared_ptr<TaskRunner>, std::shared_ptr<Task>)> commitFunc_;
154 
155     /* stop */
156     std::shared_ptr<bool> stopFlag_ = nullptr;
157 
158     /* id */
159     std::atomic<int32_t> currentId_ = 0;
160 
161     /* task list */
162     std::mutex mutex_;
163     std::list<std::shared_ptr<Task>> taskList_;
164 
165     /* data syncer callback */
166     std::function<void()> callback_;
167 };
168 
169 template<typename T>
AsyncRun(std::shared_ptr<TaskContext> context,void (T::* f)(std::shared_ptr<TaskContext>),T * ptr)170 inline int32_t TaskRunner::AsyncRun(std::shared_ptr<TaskContext> context,
171     void(T::*f)(std::shared_ptr<TaskContext>), T *ptr)
172 {
173     std::shared_ptr<Task> task = std::make_shared<Task>(context,
174         [ptr, f](std::shared_ptr<TaskContext> ctx) {
175             (ptr->*f)(ctx);
176         }
177     );
178 
179     int32_t ret = CommitTask(task);
180     if (ret != E_OK) {
181         LOGE("async run commit task err %{public}d", ret);
182         return ret;
183     }
184 
185     return E_OK;
186 }
187 
188 /*
189  * About ARGS...
190  * <1> async execute requires value-copy or shared_ptr like input parameters,
191  *     but no reference for lifecycle consideration.
192  * <2> In addition, [=] requires the wrapped function with const parameters.
193  */
194 template<typename T, typename RET, typename... ARGS>
AsyncCallback(RET (T::* f)(ARGS...),T * ptr)195 inline std::function<RET(ARGS...)> TaskRunner::AsyncCallback(RET(T::*f)(ARGS...), T *ptr)
196 {
197     std::shared_ptr<Task> task = std::make_shared<Task>(nullptr, nullptr);
198 
199     int32_t ret = AddTask(task);
200     if (ret != E_OK) {
201         LOGE("async callback add task err %{public}d", ret);
202         return nullptr;
203     }
204 
205     return [ptr, f, task, this](ARGS... args) -> RET {
206         int32_t ret = this->StartTask(task, [ptr, f, args...](std::shared_ptr<TaskContext>) {
207             (ptr->*f)(args...);
208         });
209         if (ret != E_OK) {
210             LOGE("async callback start task err %{public}d", ret);
211         }
212     };
213 }
214 
215 class TaskManager : public NoCopyable {
216 DECLARE_DELAYED_SINGLETON(TaskManager);
217 
218 public:
219     std::shared_ptr<TaskRunner> AllocRunner(int32_t userId, const std::string &bundleName,
220         std::function<void()> callback);
221     std::shared_ptr<TaskRunner> GetRunner(int32_t userId, const std::string &bundleName);
222     void ReleaseRunner(int32_t userId, const std::string &bundleName);
223 
224     int32_t CommitTask(std::shared_ptr<TaskRunner> runner, std::shared_ptr<Task> t);
225 
226 private:
227     std::string GetKey(int32_t userId, const std::string &bundleName);
228     void InitRunner(TaskRunner &runner);
229 
230     /* runners */
231     std::unordered_map<std::string, std::shared_ptr<TaskRunner>> map_;
232     std::shared_mutex mapMutex_;
233 
234     /* thread pool */
235     ThreadPool pool_ = ThreadPool("TaskManager");
236     const int32_t MAX_THREAD_NUM = 12;
237 };
238 } // namespace CloudSync
239 } // namespace FileManagement
240 } // namespace OHOS
241 #endif // OHOS_CLOUD_SYNC_SERVICE_TASK_H
242