• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #ifndef OHOS_CLOUD_SYNC_SERVICE_TASK_H
17 #define OHOS_CLOUD_SYNC_SERVICE_TASK_H
18 
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <list>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include <nocopyable.h>
#include <singleton.h>

#include "thread_pool.h"

#include "data_handler.h"
#include "dfs_error.h"
#include "sdk_helper.h"
#include "utils_log.h"
34 
35 namespace OHOS {
36 namespace FileManagement {
37 namespace CloudSync {
38 class TaskContext : public DriveKit::DKContext {
39 public:
TaskContext(std::shared_ptr<DataHandler> handler)40     TaskContext(std::shared_ptr<DataHandler> handler) : handler_(handler) {}
41 
GetHandler()42     std::shared_ptr<DataHandler> GetHandler()
43     {
44         return handler_;
45     }
46 
47 private:
48     std::shared_ptr<DataHandler> handler_;
49 };
50 
51 class DownloadTaskContext : public TaskContext {
52 public:
DownloadTaskContext(std::shared_ptr<DataHandler> handler,int32_t batchNo)53     DownloadTaskContext(std::shared_ptr<DataHandler> handler, int32_t batchNo)
54         : TaskContext(std::move(handler)), batchNo_(batchNo) {}
55 
GetBatchNo()56     int32_t GetBatchNo() const
57     {
58         return batchNo_;
59     }
60 
61 private:
62     int32_t batchNo_;
63 };
64 
65 constexpr int32_t INVALID_ID = -1;
66 
67 using TaskAction = std::function<void(std::shared_ptr<TaskContext>)>;
68 class Task {
69 public:
Task(std::shared_ptr<TaskContext> context,TaskAction action)70     Task(std::shared_ptr<TaskContext> context, TaskAction action) : id_(INVALID_ID),
71         context_(context), action_(action) {}
72     virtual ~Task() = default;
73 
Run()74     virtual void Run()
75     {
76         action_(context_);
77     }
78 
SetAction(TaskAction action)79     void SetAction(TaskAction action)
80     {
81         action_ = action;
82     }
83 
SetId(int32_t id)84     void SetId(int32_t id)
85     {
86         id_ = id;
87     }
88 
GetId()89     int32_t GetId()
90     {
91         return id_;
92     }
93 
94 private:
95     int32_t id_;
96     std::shared_ptr<TaskContext> context_;
97     TaskAction action_;
98 };
99 
100 class TaskRunner : public std::enable_shared_from_this<TaskRunner> {
101 public:
102     TaskRunner(std::function<void()> callback);
103     virtual ~TaskRunner();
104 
105     /* async */
106     template<typename T>
107     int32_t AsyncRun(std::shared_ptr<TaskContext> context, void(T::*f)(std::shared_ptr<TaskContext>),
108         T *ptr);
109     template<typename T, typename RET, typename... ARGS>
110     std::function<RET(ARGS...)> AsyncCallback(RET(T::*f)(ARGS...), T *ptr);
111 
112     /* dummy */
113     void CommitDummyTask();
114     void CompleteDummyTask();
115 
116     /* reset and stop */
117     void Reset();
118     bool StopAndWaitFor();
119 
120 private:
121     int32_t GenerateTaskId();
122 
123     /* task operations */
124     int32_t AddTask(std::shared_ptr<Task> t);
125     int32_t StartTask(std::shared_ptr<Task> t, TaskAction action);
126     int32_t CommitTask(std::shared_ptr<Task> t);
127     void CompleteTask(int32_t id);
128 
129     /* set commit func */
130     friend class TaskManager;
131     void SetCommitFunc(std::function<int32_t(std::shared_ptr<TaskRunner>,
132         std::shared_ptr<Task>)> func);
133     std::function<int32_t(std::shared_ptr<TaskRunner>, std::shared_ptr<Task>)> commitFunc_;
134 
135     /* stop */
136     bool stopFlag_ = false;
137     const int32_t WAIT_FOR_SECOND = 30;
138 
139     /* id */
140     std::atomic<int32_t> currentId_ = 0;
141 
142     /* task list */
143     std::mutex mutex_;
144     std::list<std::shared_ptr<Task>> taskList_;
145     std::condition_variable cv_;
146 
147     /* data syncer callback */
148     std::function<void()> callback_;
149 };
150 
151 template<typename T>
AsyncRun(std::shared_ptr<TaskContext> context,void (T::* f)(std::shared_ptr<TaskContext>),T * ptr)152 inline int32_t TaskRunner::AsyncRun(std::shared_ptr<TaskContext> context,
153     void(T::*f)(std::shared_ptr<TaskContext>), T *ptr)
154 {
155     std::shared_ptr<Task> task = std::make_shared<Task>(context,
156         [ptr, f](std::shared_ptr<TaskContext> ctx) {
157             (ptr->*f)(ctx);
158         }
159     );
160 
161     int32_t ret = CommitTask(task);
162     if (ret != E_OK) {
163         LOGE("async run commit task err %{public}d", ret);
164         return ret;
165     }
166 
167     return E_OK;
168 }
169 
170 /*
171  * About ARGS...
172  * <1> async execute requires value-copy or shared_ptr like input parameters,
173  *     but no reference for lifecycle consideration.
174  * <2> In addition, [=] requires the wrapped function with const parameters.
175  */
176 template<typename T, typename RET, typename... ARGS>
AsyncCallback(RET (T::* f)(ARGS...),T * ptr)177 inline std::function<RET(ARGS...)> TaskRunner::AsyncCallback(RET(T::*f)(ARGS...), T *ptr)
178 {
179     std::shared_ptr<Task> task = std::make_shared<Task>(nullptr, nullptr);
180 
181     int32_t ret = AddTask(task);
182     if (ret != E_OK) {
183         LOGE("async callback add task err %{public}d", ret);
184         return nullptr;
185     }
186 
187     return [ptr, f, task, this](ARGS... args) -> RET {
188         int32_t ret = this->StartTask(task, [ptr, f, args...](std::shared_ptr<TaskContext>) {
189             (ptr->*f)(args...);
190         });
191         if (ret != E_OK) {
192             LOGE("async callback start task err %{public}d", ret);
193         }
194     };
195 }
196 
197 class TaskManager : public NoCopyable {
198 DECLARE_DELAYED_SINGLETON(TaskManager);
199 
200 public:
201     std::shared_ptr<TaskRunner> AllocRunner(int32_t userId, const std::string &bundleName,
202         std::function<void()> callback);
203     std::shared_ptr<TaskRunner> GetRunner(int32_t userId, const std::string &bundleName);
204     void ReleaseRunner(int32_t userId, const std::string &bundleName);
205 
206     int32_t CommitTask(std::shared_ptr<TaskRunner> runner, std::shared_ptr<Task> t);
207 
208 private:
209     std::string GetKey(int32_t userId, const std::string &bundleName);
210     void InitRunner(TaskRunner &runner);
211 
212     /* runners */
213     std::unordered_map<std::string, std::shared_ptr<TaskRunner>> map_;
214     std::shared_mutex mapMutex_;
215 
216     /* thread pool */
217     ThreadPool pool_ = ThreadPool("TaskManager");
218     const int32_t MAX_THREAD_NUM = 12;
219 };
220 } // namespace CloudSync
221 } // namespace FileManagement
222 } // namespace OHOS
223 #endif // OHOS_CLOUD_SYNC_SERVICE_TASK_H
224