1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "async_runner_manager.h"
17
18 #include <cinttypes>
19
20 #include "helper/error_helper.h"
21 #include "task_manager.h"
22 #include "tools/log.h"
23
24 namespace Commonlibrary::Concurrent::TaskPoolModule {
25
GetInstance()26 AsyncRunnerManager& AsyncRunnerManager::GetInstance()
27 {
28 static AsyncRunnerManager asyncRunnerManager;
29 return asyncRunnerManager;
30 }
31
CreateOrGetGlobalRunner(napi_env env,napi_value thisVar,const std::string & name,uint32_t runningCapacity,uint32_t waitingCapacity)32 AsyncRunner* AsyncRunnerManager::CreateOrGetGlobalRunner(napi_env env, napi_value thisVar, const std::string& name,
33 uint32_t runningCapacity, uint32_t waitingCapacity)
34 {
35 AsyncRunner *asyncRunner = nullptr;
36 uint64_t asyncRunnerId = 0;
37 {
38 std::unique_lock<std::mutex> lock(globalAsyncRunnerMutex_);
39 auto iter = globalAsyncRunner_.find(name);
40 if (iter == globalAsyncRunner_.end()) {
41 asyncRunner = AsyncRunner::CreateGlobalRunner(name, runningCapacity, waitingCapacity);
42 globalAsyncRunner_.emplace(name, asyncRunner);
43 return asyncRunner;
44 } else {
45 asyncRunner = iter->second;
46 bool res = asyncRunner->CheckGlobalRunnerParams(env, runningCapacity, waitingCapacity);
47 if (!res) {
48 return nullptr;
49 }
50 asyncRunnerId = asyncRunner->asyncRunnerId_;
51 }
52 }
53 if (!FindRunnerAndRef(asyncRunnerId)) {
54 return nullptr;
55 }
56
57 return asyncRunner;
58 }
59
StoreAsyncRunner(uint64_t asyncRunnerId,AsyncRunner * asyncRunner)60 void AsyncRunnerManager::StoreAsyncRunner(uint64_t asyncRunnerId, AsyncRunner* asyncRunner)
61 {
62 std::unique_lock<std::mutex> lock(asyncRunnersMutex_);
63 asyncRunners_.emplace(asyncRunnerId, asyncRunner);
64 }
65
RemoveAsyncRunner(uint64_t asyncRunnerId)66 void AsyncRunnerManager::RemoveAsyncRunner(uint64_t asyncRunnerId)
67 {
68 asyncRunners_.erase(asyncRunnerId);
69 }
70
GetAsyncRunner(uint64_t asyncRunnerId)71 AsyncRunner* AsyncRunnerManager::GetAsyncRunner(uint64_t asyncRunnerId)
72 {
73 std::unique_lock<std::mutex> lock(asyncRunnersMutex_);
74 auto iter = asyncRunners_.find(asyncRunnerId);
75 if (iter != asyncRunners_.end()) {
76 return iter->second;
77 }
78 return nullptr;
79 }
80
TriggerAsyncRunner(napi_env env,Task * lastTask)81 bool AsyncRunnerManager::TriggerAsyncRunner(napi_env env, Task* lastTask)
82 {
83 uint64_t asyncRunnerId = lastTask->asyncRunnerId_;
84 AsyncRunner* asyncRunner = GetAsyncRunner(asyncRunnerId);
85 if (asyncRunner == nullptr) {
86 HILOG_ERROR("taskpool:: trigger asyncRunner not exist.");
87 return false;
88 }
89 if (UnrefAndDestroyRunner(asyncRunner)) {
90 HILOG_ERROR("taskpool:: trigger asyncRunner is remove.");
91 return false;
92 }
93 asyncRunner->TriggerWaitingTask();
94 return true;
95 }
96
RemoveGlobalAsyncRunner(const std::string & name)97 void AsyncRunnerManager::RemoveGlobalAsyncRunner(const std::string& name)
98 {
99 std::unique_lock<std::mutex> lock(globalAsyncRunnerMutex_);
100 auto iter = globalAsyncRunner_.find(name);
101 if (iter != globalAsyncRunner_.end()) {
102 globalAsyncRunner_.erase(iter);
103 }
104 }
105
CancelAsyncRunnerTask(napi_env env,Task * task)106 void AsyncRunnerManager::CancelAsyncRunnerTask(napi_env env, Task* task)
107 {
108 std::string errMsg = "";
109 ExecuteState state = ExecuteState::NOT_FOUND;
110 {
111 std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
112 if (task->taskState_ == ExecuteState::FINISHED || task->taskState_ == ExecuteState::ENDING) {
113 errMsg = "AsyncRunner task has been executed.";
114 HILOG_ERROR("taskpool:: %{public}s", errMsg.c_str());
115 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
116 return;
117 }
118
119 state = task->taskState_.exchange(ExecuteState::CANCELED);
120 }
121 task->CancelPendingTask(env);
122 auto asyncRunner = GetAsyncRunner(task->asyncRunnerId_);
123 if (state == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr &&
124 TaskManager::GetInstance().EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority)) {
125 task->DecreaseTaskLifecycleCount();
126 TaskManager::GetInstance().DecreaseSendDataRefCount(task->env_, task->taskId_);
127 if (asyncRunner != nullptr) {
128 asyncRunner->TriggerRejectErrorTimer(task, ErrorHelper::ERR_ASYNCRUNNER_TASK_CANCELED, true);
129 }
130 TriggerAsyncRunner(env, task);
131 }
132
133 if (asyncRunner != nullptr) {
134 asyncRunner->RemoveWaitingTask(task);
135 }
136 }
137
RemoveWaitingTask(Task * task)138 void AsyncRunnerManager::RemoveWaitingTask(Task* task)
139 {
140 auto asyncRunner = GetAsyncRunner(task->asyncRunnerId_);
141 if (asyncRunner != nullptr) {
142 asyncRunner->RemoveWaitingTask(task, false);
143 }
144 }
145
FindRunnerAndRef(uint64_t asyncRunnerId)146 bool AsyncRunnerManager::FindRunnerAndRef(uint64_t asyncRunnerId)
147 {
148 std::unique_lock<std::mutex> lock(asyncRunnersMutex_);
149 auto iter = asyncRunners_.find(asyncRunnerId);
150 if (iter == asyncRunners_.end()) {
151 HILOG_ERROR("taskpool:: asyncRunner not exist.");
152 return false;
153 }
154 iter->second->IncreaseAsyncCount();
155 return true;
156 }
157
UnrefAndDestroyRunner(AsyncRunner * asyncRunner)158 bool AsyncRunnerManager::UnrefAndDestroyRunner(AsyncRunner* asyncRunner)
159 {
160 {
161 std::unique_lock<std::mutex> lock(asyncRunnersMutex_);
162 if (asyncRunner->DecreaseAsyncCount() != 0) {
163 return false;
164 }
165 RemoveAsyncRunner(asyncRunner->asyncRunnerId_);
166 }
167 if (asyncRunner->isGlobalRunner_) {
168 RemoveGlobalAsyncRunner(asyncRunner->name_);
169 }
170 delete asyncRunner;
171 asyncRunner = nullptr;
172 return true;
173 }
174
DecreaseRunningCount(uint64_t asyncRunnerId)175 void AsyncRunnerManager::DecreaseRunningCount(uint64_t asyncRunnerId)
176 {
177 std::unique_lock<std::mutex> lock(asyncRunnersMutex_);
178 auto iter = asyncRunners_.find(asyncRunnerId);
179 if (iter == asyncRunners_.end()) {
180 return;
181 }
182 iter->second->DecreaseRunningCount();
183 }
184 } // namespace Commonlibrary::Concurrent::TaskPoolModule