1 /*
2 * Copyright (c) 2024 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15 #include "async_runner.h"
16
17 #include <cinttypes>
18
19 #include "async_runner_manager.h"
20 #include "helper/error_helper.h"
21 #include "helper/napi_helper.h"
22 #include "task_manager.h"
23 #include "tools/log.h"
24
25 namespace Commonlibrary::Concurrent::TaskPoolModule {
26 using namespace Commonlibrary::Concurrent::Common::Helper;
27 static constexpr char EXECUTE_STR[] = "execute";
28
AsyncRunnerConstructor(napi_env env,napi_callback_info cbinfo)29 napi_value AsyncRunner::AsyncRunnerConstructor(napi_env env, napi_callback_info cbinfo)
30 {
31 size_t argc = 3; // 3 : name, runningCapacity, waitingCapacity
32 napi_value args[3];
33 napi_value thisVar = nullptr;
34 napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
35 napi_value runningCapacity = nullptr;
36 napi_value name = nullptr;
37 napi_value waitingCapacity = nullptr;
38 if (argc == 3) { // 3: AsyncRunner(name, runningCapacity, waitingCapacity)
39 name = args[0];
40 runningCapacity = args[1]; // 1: the index of argument runningCapacity
41 waitingCapacity = args[2]; // 2: the index of argument waitingCapacity
42 } else if (argc == 2) { // 2: AsyncRunner(name, runningCapacity) or AsyncRunner(runningCapacity, waitingCapacity)
43 if (NapiHelper::IsString(env, args[0])) {
44 name = args[0];
45 runningCapacity = args[1];
46 } else {
47 runningCapacity = args[0];
48 waitingCapacity = args[1];
49 }
50 } else if (argc == 1) { // 1: AsyncRunner(runningCapacity)
51 runningCapacity = args[0];
52 } else {
53 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "The numbers of params not more than three or less one.");
54 return nullptr;
55 }
56 AsyncRunner* asyncRunner = CheckAndCreateAsyncRunner(env, thisVar, name, runningCapacity, waitingCapacity);
57 if (asyncRunner == nullptr) {
58 HILOG_ERROR("taskpool:: create asyncRunner failed.");
59 return nullptr;
60 }
61
62 if (AsyncRunnerConstructorInner(env, thisVar, asyncRunner)) {
63 return thisVar;
64 }
65 return nullptr;
66 }
67
AsyncRunnerConstructorInner(napi_env env,napi_value & thisVar,AsyncRunner * asyncRunner)68 bool AsyncRunner::AsyncRunnerConstructorInner(napi_env env, napi_value& thisVar, AsyncRunner* asyncRunner)
69 {
70 uint64_t asyncRunnerId = reinterpret_cast<uint64_t>(asyncRunner);
71 asyncRunner->asyncRunnerId_ = asyncRunnerId;
72 napi_value napiAsyncRunnerId = NapiHelper::CreateUint64(env, asyncRunnerId);
73 AsyncRunnerManager::GetInstance().StoreAsyncRunner(asyncRunnerId, asyncRunner);
74 napi_property_descriptor properties[] = {
75 DECLARE_NAPI_FUNCTION(EXECUTE_STR, Execute),
76 };
77 napi_define_properties(env, thisVar, sizeof(properties) / sizeof(properties[0]), properties);
78 HILOG_INFO("taskpool:: construct asyncRunner name is %{public}s, asyncRunnerId %{public}s.",
79 asyncRunner->name_.c_str(), std::to_string(asyncRunnerId).c_str());
80 napi_status status = napi_wrap(env, thisVar, asyncRunner, AsyncRunnerDestructor, nullptr, nullptr);
81 if (status != napi_ok) {
82 HILOG_ERROR("taskpool:: AsyncRunnerConstructorInner napi_wrap return value is %{public}d.", status);
83 AsyncRunnerDestructor(env, asyncRunner, nullptr);
84 return false;
85 }
86 return true;
87 }
88
Execute(napi_env env,napi_callback_info cbinfo)89 napi_value AsyncRunner::Execute(napi_env env, napi_callback_info cbinfo)
90 {
91 size_t argc = 2; // 2 : task, Priority
92 napi_value args[2];
93 napi_value thisVar;
94 napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
95 if (argc < 1 || argc > 2) { // 2 : task, Priority
96 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "The numbers of params not more than two or less one.");
97 return nullptr;
98 }
99 napi_value napiTask = args[0];
100 napi_value napiPriority = nullptr;
101 if (argc > 1) {
102 napiPriority = args[1];
103 }
104 if (!CheckExecuteArgs(env, napiTask, napiPriority)) {
105 return nullptr;
106 }
107
108 AsyncRunner* asyncRunner = nullptr;
109 napi_unwrap(env, thisVar, reinterpret_cast<void**>(&asyncRunner));
110 if (asyncRunner == nullptr) {
111 return nullptr;
112 }
113 Task* task = nullptr;
114 napi_unwrap(env, napiTask, reinterpret_cast<void**>(&task));
115 if (task == nullptr) {
116 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "The type of param must be task.");
117 return nullptr;
118 }
119 if (!task->CanForAsyncRunner(env)) {
120 return nullptr;
121 }
122 if (napiPriority != nullptr) {
123 uint32_t priority = NapiHelper::GetUint32Value(env, napiPriority);
124 task->asyncTaskPriority_ = static_cast<Priority>(priority);
125 }
126 task->asyncRunnerId_ = asyncRunner->asyncRunnerId_;
127 napi_value promise = task->GetTaskInfoPromise(env, napiTask, TaskType::ASYNCRUNNER_TASK, task->asyncTaskPriority_);
128 if (promise == nullptr) {
129 return nullptr;
130 }
131 if (!AsyncRunnerManager::GetInstance().FindRunnerAndRef(asyncRunner->asyncRunnerId_)) {
132 return nullptr;
133 }
134 if (!AddTasksToAsyncRunner(asyncRunner, task)) {
135 ExecuteTaskImmediately(asyncRunner, task);
136 }
137
138 return promise;
139 }
140
CheckAndCreateAsyncRunner(napi_env env,napi_value & thisVar,napi_value name,napi_value runningCapacity,napi_value waitingCapacity)141 AsyncRunner* AsyncRunner::CheckAndCreateAsyncRunner(napi_env env, napi_value& thisVar, napi_value name,
142 napi_value runningCapacity, napi_value waitingCapacity)
143 {
144 std::string nameValue = "";
145 std::uint32_t runningCapacityVal = 0;
146 std::int32_t waitingCapacityValue = 0;
147 if (name != nullptr) {
148 if (!NapiHelper::IsString(env, name)) {
149 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "Name must be string.");
150 return nullptr;
151 }
152 nameValue = NapiHelper::GetString(env, name);
153 }
154 if (runningCapacity == nullptr) {
155 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "RunningCapacity cannot be empty.");
156 return nullptr;
157 }
158 if (!NapiHelper::IsNumber(env, runningCapacity)) {
159 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "RunningCapacity must be number.");
160 return nullptr;
161 }
162 runningCapacityVal = NapiHelper::GetUint32Value(env, runningCapacity);
163 if (runningCapacityVal < 1) {
164 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "RunningCapacity must be greater than zero.");
165 return nullptr;
166 }
167 if (waitingCapacity != nullptr) {
168 if (!NapiHelper::IsNumber(env, waitingCapacity)) {
169 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "WaitingCapacity must be number.");
170 return nullptr;
171 }
172 waitingCapacityValue = NapiHelper::GetInt32Value(env, waitingCapacity);
173 if (waitingCapacityValue < 0) {
174 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
175 "WaitingCapacity must be greater than or equal zero.");
176 return nullptr;
177 }
178 }
179 AsyncRunner* asyncRunner = nullptr;
180 std::uint32_t waitingCapacityVal = static_cast<std::uint32_t>(waitingCapacityValue);
181 if (nameValue != "") {
182 asyncRunner = AsyncRunnerManager::GetInstance().CreateOrGetGlobalRunner(env, thisVar, nameValue,
183 runningCapacityVal, waitingCapacityVal);
184 } else {
185 asyncRunner = new AsyncRunner();
186 asyncRunner->runningCapacity_ = runningCapacityVal;
187 asyncRunner->waitingCapacity_ = waitingCapacityVal;
188 }
189 return asyncRunner;
190 }
191
CheckExecuteArgs(napi_env env,napi_value napiTask,napi_value napiPriority)192 bool AsyncRunner::CheckExecuteArgs(napi_env env, napi_value napiTask, napi_value napiPriority)
193 {
194 if (!NapiHelper::IsObject(env, napiTask) || !NapiHelper::HasNameProperty(env, napiTask, TASKID_STR)) {
195 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "First param must be task.");
196 return false;
197 }
198 if (napiPriority != nullptr) {
199 if (!NapiHelper::IsNumber(env, napiPriority)) {
200 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "The type of the second param must be number.");
201 return false;
202 }
203 uint32_t priority = NapiHelper::GetUint32Value(env, napiPriority);
204 if (priority >= Priority::NUMBER) {
205 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "Priority value is error.");
206 return false;
207 }
208 }
209 return true;
210 }
211
ExecuteTaskImmediately(AsyncRunner * asyncRunner,Task * task)212 void AsyncRunner::ExecuteTaskImmediately(AsyncRunner* asyncRunner, Task* task)
213 {
214 HILOG_DEBUG("taskpool:: task %{public}s in asyncRunner %{public}s immediately.",
215 std::to_string(task->taskId_).c_str(), std::to_string(asyncRunner->asyncRunnerId_).c_str());
216 task->IncreaseRefCount();
217 TaskManager::GetInstance().IncreaseRefCount(task->taskId_);
218 task->taskState_ = ExecuteState::WAITING;
219 TaskManager::GetInstance().EnqueueTaskId(task->taskId_, task->asyncTaskPriority_);
220 }
221
AddTasksToAsyncRunner(AsyncRunner * asyncRunner,Task * task)222 bool AsyncRunner::AddTasksToAsyncRunner(AsyncRunner* asyncRunner, Task* task)
223 {
224 Task* frontTask = nullptr;
225 {
226 std::unique_lock<std::shared_mutex> asyncRunnerLock(asyncRunner->waitingTasksMutex_);
227 if (asyncRunner->runningCount_ < asyncRunner->runningCapacity_) {
228 asyncRunner->runningCount_.fetch_add(1);
229 return false;
230 }
231 if (asyncRunner->waitingCapacity_ && asyncRunner->waitingTasks_.size() == asyncRunner->waitingCapacity_) {
232 frontTask = asyncRunner->waitingTasks_.front();
233 asyncRunner->waitingTasks_.pop_front();
234 frontTask->taskState_ = ExecuteState::CANCELED;
235 }
236 asyncRunner->waitingTasks_.push_back(task);
237 }
238
239 if (frontTask != nullptr) {
240 asyncRunner->TriggerRejectErrorTimer(frontTask, ErrorHelper::ERR_ASYNCRUNNER_TASK_DISCARDED);
241 }
242 return true;
243 }
244
AsyncRunnerDestructor(napi_env env,void * data,void * hint)245 void AsyncRunner::AsyncRunnerDestructor(napi_env env, void* data, [[maybe_unused]] void* hint)
246 {
247 AsyncRunner* asyncRunner = static_cast<AsyncRunner*>(data);
248 if (env == nullptr || asyncRunner == nullptr) {
249 return;
250 }
251 auto runner = AsyncRunnerManager::GetInstance().GetAsyncRunner(asyncRunner->asyncRunnerId_);
252 if (runner == nullptr) {
253 return;
254 }
255 AsyncRunnerManager::GetInstance().UnrefAndDestroyRunner(asyncRunner);
256 }
257
RemoveWaitingTask(Task * task,bool isReject)258 bool AsyncRunner::RemoveWaitingTask(Task* task, bool isReject)
259 {
260 bool flag = false;
261 {
262 std::unique_lock<std::shared_mutex> lock(waitingTasksMutex_);
263 if (waitingTasks_.empty()) {
264 return flag;
265 }
266 auto iter = std::find(waitingTasks_.begin(), waitingTasks_.end(), task);
267 if (iter != waitingTasks_.end()) {
268 waitingTasks_.erase(iter);
269 flag = true;
270 }
271 }
272 if (flag) {
273 if (isReject) {
274 TriggerRejectErrorTimer(task, ErrorHelper::ERR_ASYNCRUNNER_TASK_CANCELED);
275 } else {
276 AsyncRunnerManager::GetInstance().UnrefAndDestroyRunner(this);
277 }
278 }
279 return flag;
280 }
281
TriggerRejectErrorTimer(Task * task,int32_t errCode,bool isWaiting)282 void AsyncRunner::TriggerRejectErrorTimer(Task* task, int32_t errCode, bool isWaiting)
283 {
284 if (task == nullptr || !task->IsValid()) {
285 return;
286 }
287 if (!isWaiting) {
288 AsyncRunnerManager::GetInstance().UnrefAndDestroyRunner(this);
289 }
290 DiscardTaskMessage* message = new DiscardTaskMessage(task->env_, task->taskId_, errCode, isWaiting);
291 task->DiscardAsyncRunnerTask(message);
292 }
293
TriggerWaitingTask()294 void AsyncRunner::TriggerWaitingTask()
295 {
296 std::unique_lock<std::shared_mutex> lock(waitingTasksMutex_);
297 DecreaseRunningCount();
298 Task* task = nullptr;
299 while (runningCount_ < runningCapacity_) {
300 if (waitingTasks_.empty()) {
301 HILOG_DEBUG("taskpool:: asyncRunner %{public}s empty.", std::to_string(asyncRunnerId_).c_str());
302 break;
303 }
304 task = waitingTasks_.front();
305 waitingTasks_.pop_front();
306 runningCount_.fetch_add(1);
307 task->IncreaseRefCount();
308 TaskManager::GetInstance().IncreaseRefCount(task->taskId_);
309 task->taskState_ = ExecuteState::WAITING;
310 HILOG_DEBUG("taskpool:: Trig task %{public}s in asyncRunner %{public}s.",
311 std::to_string(task->taskId_).c_str(), std::to_string(asyncRunnerId_).c_str());
312 TaskManager::GetInstance().EnqueueTaskId(task->taskId_, task->asyncTaskPriority_);
313 }
314 }
315
CreateGlobalRunner(const std::string & name,uint32_t runningCapacity,uint32_t waitingCapacity)316 AsyncRunner* AsyncRunner::CreateGlobalRunner(const std::string& name, uint32_t runningCapacity,
317 uint32_t waitingCapacity)
318 {
319 AsyncRunner* asyncRunner = new AsyncRunner();
320 asyncRunner->waitingCapacity_ = waitingCapacity;
321 asyncRunner->runningCapacity_ = runningCapacity;
322 asyncRunner->isGlobalRunner_ = true;
323 asyncRunner->name_ = name;
324 return asyncRunner;
325 }
326
DecreaseAsyncCount()327 uint64_t AsyncRunner::DecreaseAsyncCount()
328 {
329 if (refCount_ > 0) {
330 refCount_--;
331 }
332 return refCount_;
333 }
334
IncreaseAsyncCount()335 void AsyncRunner::IncreaseAsyncCount()
336 {
337 refCount_++;
338 }
339
CheckGlobalRunnerParams(napi_env env,uint32_t runningCapacity,uint32_t waitingCapacity)340 bool AsyncRunner::CheckGlobalRunnerParams(napi_env env, uint32_t runningCapacity, uint32_t waitingCapacity)
341 {
342 if (runningCapacity != runningCapacity_) {
343 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "AsyncRunner runningCapacity can not changed.");
344 return false;
345 }
346 if (waitingCapacity != waitingCapacity_) {
347 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "AsyncRunner waitingCapacity can not changed.");
348 return false;
349 }
350 return true;
351 }
352
DecreaseRunningCount()353 void AsyncRunner::DecreaseRunningCount()
354 {
355 if (runningCount_ > 0) {
356 runningCount_.fetch_sub(1);
357 }
358 }
359 } // namespace Commonlibrary::Concurrent::TaskPoolModule