/* * Copyright (c) 2023 Huawei Device Co., Ltd. * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef OHOS_CLOUD_SYNC_SERVICE_TASK_H #define OHOS_CLOUD_SYNC_SERVICE_TASK_H #include #include #include #include #include #include #include #include #include "thread_pool.h" #include "data_handler.h" #include "dfs_error.h" #include "sdk_helper.h" #include "utils_log.h" namespace OHOS { namespace FileManagement { namespace CloudSync { class TaskContext : public DriveKit::DKContext { public: TaskContext(std::shared_ptr handler) : handler_(handler), assets_(0) { } std::shared_ptr GetHandler() { return handler_; } void SetAssets(std::vector &assets) { assets_ = std::move(assets); } std::vector GetAssets() { return std::move(assets_); } private: std::shared_ptr handler_; /* not final, might put this in its derived class */ std::vector assets_; }; class DownloadTaskContext : public TaskContext { public: DownloadTaskContext(std::shared_ptr handler, int32_t batchNo) : TaskContext(std::move(handler)), batchNo_(batchNo) {} int32_t GetBatchNo() const { return batchNo_; } private: int32_t batchNo_; }; constexpr int32_t INVALID_ID = -1; using TaskAction = std::function)>; class Task { public: Task(std::shared_ptr context, TaskAction action) : id_(INVALID_ID), context_(context), action_(action) {} virtual ~Task() = default; virtual void Run() { action_(context_); } void SetAction(TaskAction action) { action_ = action; } void SetId(int32_t id) { id_ = id; } int32_t GetId() { return id_; } private: int32_t id_; std::shared_ptr context_; TaskAction action_; }; class TaskRunner : public std::enable_shared_from_this { public: TaskRunner(std::function callback); virtual ~TaskRunner(); /* async */ template int32_t AsyncRun(std::shared_ptr context, void(T::*f)(std::shared_ptr), T *ptr); template std::function AsyncCallback(RET(T::*f)(ARGS...), T *ptr); /* dummy */ void CommitDummyTask(); void CompleteDummyTask(); /* reset and stop */ void Reset(); bool ReleaseTask(); bool NeedRun(std::shared_ptr t); std::shared_ptr GetStopFlag(); void SetStopFlag(std::shared_ptr stopFlag); private: int32_t GenerateTaskId(); /* task operations */ int32_t AddTask(std::shared_ptr t); int32_t StartTask(std::shared_ptr t, TaskAction action); int32_t CommitTask(std::shared_ptr t); void CompleteTask(int32_t id); /* set commit func */ friend class TaskManager; void SetCommitFunc(std::function, std::shared_ptr)> func); std::function, std::shared_ptr)> commitFunc_; /* stop */ std::shared_ptr stopFlag_ = nullptr; /* id */ std::atomic currentId_ = 0; /* task list */ std::mutex mutex_; std::list> taskList_; /* data syncer callback */ std::function callback_; }; template inline int32_t TaskRunner::AsyncRun(std::shared_ptr context, void(T::*f)(std::shared_ptr), T *ptr) { std::shared_ptr task = std::make_shared(context, [ptr, f](std::shared_ptr ctx) { (ptr->*f)(ctx); } ); int32_t ret = CommitTask(task); if (ret != E_OK) { LOGE("async run commit task err %{public}d", ret); return ret; } return E_OK; } /* * About ARGS... * <1> async execute requires value-copy or shared_ptr like input parameters, * but no reference for lifecycle consideration. * <2> In addition, [=] requires the wrapped function with const parameters. */ template inline std::function TaskRunner::AsyncCallback(RET(T::*f)(ARGS...), T *ptr) { std::shared_ptr task = std::make_shared(nullptr, nullptr); int32_t ret = AddTask(task); if (ret != E_OK) { LOGE("async callback add task err %{public}d", ret); return nullptr; } return [ptr, f, task, this](ARGS... args) -> RET { int32_t ret = this->StartTask(task, [ptr, f, args...](std::shared_ptr) { (ptr->*f)(args...); }); if (ret != E_OK) { LOGE("async callback start task err %{public}d", ret); } }; } class TaskManager : public NoCopyable { DECLARE_DELAYED_SINGLETON(TaskManager); public: std::shared_ptr AllocRunner(int32_t userId, const std::string &bundleName, std::function callback); std::shared_ptr GetRunner(int32_t userId, const std::string &bundleName); void ReleaseRunner(int32_t userId, const std::string &bundleName); int32_t CommitTask(std::shared_ptr runner, std::shared_ptr t); private: std::string GetKey(int32_t userId, const std::string &bundleName); void InitRunner(TaskRunner &runner); /* runners */ std::unordered_map> map_; std::shared_mutex mapMutex_; /* thread pool */ ThreadPool pool_ = ThreadPool("TaskManager"); const int32_t MAX_THREAD_NUM = 12; }; } // namespace CloudSync } // namespace FileManagement } // namespace OHOS #endif // OHOS_CLOUD_SYNC_SERVICE_TASK_H