/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "task.h"

/*
 * NOTE(review): these are the standard headers this file uses directly;
 * confirm the list against the project's original include block.
 */
#include <algorithm>
#include <chrono>
#include <functional>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <string>
#include <utility>

#include "sdk_helper.h"

namespace OHOS {
namespace FileManagement {
namespace CloudSync {
using namespace std;

/*
 * NOTE(review): the template arguments used in the signatures below
 * (function<void()>, shared_ptr<Task>, shared_ptr<TaskRunner>) must match
 * the declarations in task.h exactly - confirm.
 */

/* task runner */

/*
 * Store the callback that CompleteTask() runs once the task list drains
 * while the runner is not being stopped.
 */
TaskRunner::TaskRunner(function<void()> callback) : callback_(std::move(callback))
{
}

TaskRunner::~TaskRunner()
{
}

/* Return the current value of the id counter and advance it by one. */
int32_t TaskRunner::GenerateTaskId()
{
    return currentId_.fetch_add(1);
}

/*
 * Assign a fresh id to the task and append it to the task list.
 * Returns E_STOP (and leaves the task untouched) once a stop is in
 * progress, E_OK otherwise.
 */
int32_t TaskRunner::AddTask(shared_ptr<Task> t)
{
    unique_lock lock(mutex_);

    /* If stopped, no more tasks can be added */
    if (stopFlag_) {
        LOGI("add task fail since stop");
        return E_STOP;
    }

    /* insert task; the id is generated while mutex_ is held */
    t->SetId(GenerateTaskId());
    taskList_.emplace_back(t);

    return E_OK;
}

/*
 * Set the action on a task and hand it to the commit function without
 * going through AddTask(), so the stop flag is not consulted here.
 * Returns the commit function's result.
 */
int32_t TaskRunner::StartTask(shared_ptr<Task> t, TaskAction action)
{
    /*
     * Try to execute the previous callback even in stop process.
     * Yet no new task could be added in the callback.
     */
    t->SetAction(action);

    int32_t ret = commitFunc_(shared_from_this(), t);
    if (ret != E_OK) {
        LOGE("commit task err %{public}d", ret);
        return ret;
    }

    return E_OK;
}

/*
 * Register the task via AddTask() and then launch it through the commit
 * function. Returns the first non-E_OK result, or E_OK on success.
 *
 * NOTE(review): when the commit function fails, the task has already been
 * appended to taskList_ and is not removed here - confirm this is intended.
 */
int32_t TaskRunner::CommitTask(shared_ptr<Task> t)
{
    /* add task */
    int32_t ret = AddTask(t);
    if (ret != E_OK) {
        LOGE("add task err %{public}d", ret);
        return ret;
    }

    /* launch */
    ret = commitFunc_(shared_from_this(), t);
    if (ret != E_OK) {
        LOGE("commit task err %{public}d", ret);
        return ret;
    }

    return E_OK;
}

/*
 * Remove the first listed task whose id equals `id` (if any). When the
 * list is empty afterwards, either wake the caller blocked in
 * StopAndWaitFor() or reset the id counter and run the completion callback.
 */
void TaskRunner::CompleteTask(int32_t id)
{
    unique_lock lock(mutex_);

    /* remove task */
    auto entry = find_if(taskList_.begin(), taskList_.end(),
                         [id](const auto &task) { return task->GetId() == id; });
    if (entry != taskList_.end()) {
        (void)taskList_.erase(entry);
    }

    if (!taskList_.empty()) {
        return;
    }

    if (stopFlag_) {
        /* if it has been stopped, notify the blocking caller */
        stopFlag_ = false;
        cv_.notify_all();
        return;
    }

    /*
     * Reset the id counter while mutex_ is still held: AddTask() generates
     * ids under the same mutex, so no id can be handed out between the list
     * becoming empty and the counter restarting.
     */
    Reset();

    /* run the callback unlocked so that it may call back into the runner */
    lock.unlock();
    if (callback_) {
        callback_();
    }
}

/*
 * Block new tasks and wait for the listed ones to finish.
 * Returns true immediately when the list is already empty; otherwise sets
 * the stop flag and waits up to WAIT_FOR_SECOND seconds, returning whether
 * the list was empty when the wait ended.
 */
bool TaskRunner::StopAndWaitFor()
{
    unique_lock lock(mutex_);
    LOGI("task manager stop");

    if (taskList_.empty()) {
        return true;
    }

    stopFlag_ = true;

    return cv_.wait_for(lock, chrono::seconds(WAIT_FOR_SECOND), [this] {
        return taskList_.empty();
    });
}

/* Restart the id counter at zero. */
void TaskRunner::Reset()
{
    currentId_.store(0);
}

/* Install the function used by StartTask() and CommitTask() to launch a task. */
void TaskRunner::SetCommitFunc(function<int32_t(shared_ptr<TaskRunner>, shared_ptr<Task>)> func)
{
    commitFunc_ = std::move(func);
}

/*
 * Append a placeholder task carrying INVALID_ID to the task list.
 * Unlike AddTask(), this neither checks the stop flag nor consumes an id.
 */
void TaskRunner::CommitDummyTask()
{
    auto task = make_shared<Task>(nullptr, nullptr);
    task->SetId(INVALID_ID);

    unique_lock lock(mutex_);
    taskList_.emplace_back(task);
}

/* Complete the placeholder added by CommitDummyTask(). */
void TaskRunner::CompleteDummyTask()
{
    CompleteTask(INVALID_ID);
}

/* TaskManager */

/* Configure pool_ with MAX_THREAD_NUM as its task limit and start it. */
TaskManager::TaskManager()
{
    pool_.SetMaxTaskNum(MAX_THREAD_NUM);
    pool_.Start(MAX_THREAD_NUM);
}

TaskManager::~TaskManager()
{
    pool_.Stop();
}

/*
 * Return the runner registered for (userId, bundleName), creating and
 * registering one with `callback` if none exists yet. An existing runner
 * is returned as is; `callback` is ignored in that case.
 */
shared_ptr<TaskRunner> TaskManager::AllocRunner(int32_t userId, const std::string &bundleName,
    function<void()> callback)
{
    string key = GetKey(userId, bundleName);

    unique_lock wlock(mapMutex_);
    auto it = map_.find(key);
    if (it == map_.end()) {
        auto runner = make_shared<TaskRunner>(callback);
        InitRunner(*runner);
        it = map_.insert({ key, runner }).first;
    }

    return it->second;
}

/* Drop the runner registered for (userId, bundleName), if any. */
void TaskManager::ReleaseRunner(int32_t userId, const std::string &bundleName)
{
    string key = GetKey(userId, bundleName);

    unique_lock wlock(mapMutex_);
    map_.erase(key);
}

/* Return the runner registered for (userId, bundleName), or nullptr. */
shared_ptr<TaskRunner> TaskManager::GetRunner(int32_t userId, const std::string &bundleName)
{
    string key = GetKey(userId, bundleName);

    /*
     * Only the shared (reader) lock is held here, so the map is accessed
     * through find() alone; operator[] is a non-const operation.
     */
    shared_lock rlock(mapMutex_);
    auto it = map_.find(key);
    if (it == map_.end()) {
        return nullptr;
    }

    return it->second;
}

/*
 * Route the runner's commits to TaskManager::CommitTask.
 * The stored function refers to this manager through a raw pointer.
 */
void TaskManager::InitRunner(TaskRunner &runner)
{
    runner.SetCommitFunc([this](shared_ptr<TaskRunner> owner, shared_ptr<Task> task) {
        return CommitTask(std::move(owner), std::move(task));
    });
}

/*
 * Queue the task on pool_. The queued job runs the task and then reports
 * its completion to the runner. Always returns E_OK.
 */
int32_t TaskManager::CommitTask(shared_ptr<TaskRunner> runner, shared_ptr<Task> t)
{
    pool_.AddTask([t, runner]() {
        t->Run();
        runner->CompleteTask(t->GetId());
    });

    return E_OK;
}

/*
 * Build the map key by concatenating the decimal user id and the bundle name.
 * NOTE(review): there is no separator, so keys are only unique if bundle
 * names never start with a digit - confirm.
 */
string TaskManager::GetKey(int32_t userId, const string &bundleName)
{
    return to_string(userId) + bundleName;
}
} // namespace CloudSync
} // namespace FileManagement
} // namespace OHOS