1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "taskpool.h"
17
18 #include "async_runner_manager.h"
19 #include "helper/hitrace_helper.h"
20 #include "sequence_runner_manager.h"
21 #include "task_group_manager.h"
22
23 namespace Commonlibrary::Concurrent::TaskPoolModule {
24 using namespace Commonlibrary::Concurrent::Common::Helper;
25
InitTaskPool(napi_env env,napi_value exports)26 napi_value TaskPool::InitTaskPool(napi_env env, napi_value exports)
27 {
28 HILOG_INFO("taskpool:: Import taskpool");
29 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
30 napi_value taskClass = nullptr;
31 NAPI_CALL(env, napi_define_class(env, "Task", NAPI_AUTO_LENGTH, Task::TaskConstructor,
32 nullptr, 0, nullptr, &taskClass));
33 napi_value longTaskClass = nullptr;
34 NAPI_CALL(env, napi_define_class(env, "LongTask", NAPI_AUTO_LENGTH, Task::LongTaskConstructor,
35 nullptr, 0, nullptr, &longTaskClass));
36 napi_value genericsTaskClass = nullptr;
37 NAPI_CALL(env, napi_define_class(env, "GenericsTask", NAPI_AUTO_LENGTH, Task::TaskConstructor,
38 nullptr, 0, nullptr, &genericsTaskClass));
39 napi_value isCanceledFunc = nullptr;
40 NAPI_CALL(env, napi_create_function(env, "isCanceled", NAPI_AUTO_LENGTH, Task::IsCanceled, NULL, &isCanceledFunc));
41 napi_set_named_property(env, taskClass, "isCanceled", isCanceledFunc);
42 napi_set_named_property(env, longTaskClass, "isCanceled", isCanceledFunc);
43 napi_value sendDataFunc = nullptr;
44 NAPI_CALL(env, napi_create_function(env, "sendData", NAPI_AUTO_LENGTH, Task::SendData, NULL, &sendDataFunc));
45 napi_set_named_property(env, taskClass, "sendData", sendDataFunc);
46 napi_set_named_property(env, longTaskClass, "sendData", sendDataFunc);
47 napi_value taskGroupClass = nullptr;
48 NAPI_CALL(env, napi_define_class(env, "TaskGroup", NAPI_AUTO_LENGTH, TaskGroup::TaskGroupConstructor,
49 nullptr, 0, nullptr, &taskGroupClass));
50 napi_value seqRunnerClass = nullptr;
51 NAPI_CALL(env, napi_define_class(env, "SequenceRunner", NAPI_AUTO_LENGTH, SequenceRunner::SeqRunnerConstructor,
52 nullptr, 0, nullptr, &seqRunnerClass));
53 napi_value asyncRunnerClass = nullptr;
54 NAPI_CALL(env, napi_define_class(env, "AsyncRunner", NAPI_AUTO_LENGTH, AsyncRunner::AsyncRunnerConstructor,
55 nullptr, 0, nullptr, &asyncRunnerClass));
56
57 // define priority
58 napi_value priorityObj = NapiHelper::CreateObject(env);
59 napi_value highPriority = NapiHelper::CreateUint32(env, Priority::HIGH);
60 napi_value mediumPriority = NapiHelper::CreateUint32(env, Priority::MEDIUM);
61 napi_value lowPriority = NapiHelper::CreateUint32(env, Priority::LOW);
62 napi_value idlePriority = NapiHelper::CreateUint32(env, Priority::IDLE);
63 napi_property_descriptor exportPriority[] = {
64 DECLARE_NAPI_PROPERTY("HIGH", highPriority),
65 DECLARE_NAPI_PROPERTY("MEDIUM", mediumPriority),
66 DECLARE_NAPI_PROPERTY("LOW", lowPriority),
67 DECLARE_NAPI_PROPERTY("IDLE", idlePriority),
68 };
69 napi_define_properties(env, priorityObj, sizeof(exportPriority) / sizeof(exportPriority[0]), exportPriority);
70
71 // define State
72 napi_value stateObj = NapiHelper::CreateObject(env);
73 napi_value waitingState = NapiHelper::CreateUint32(env, ExecuteState::WAITING);
74 napi_value runningState = NapiHelper::CreateUint32(env, ExecuteState::RUNNING);
75 napi_value canceledState = NapiHelper::CreateUint32(env, ExecuteState::CANCELED);
76 napi_property_descriptor exportState[] = {
77 DECLARE_NAPI_PROPERTY("WAITING", waitingState),
78 DECLARE_NAPI_PROPERTY("RUNNING", runningState),
79 DECLARE_NAPI_PROPERTY("CANCELED", canceledState),
80 };
81 napi_define_properties(env, stateObj, sizeof(exportState) / sizeof(exportState[0]), exportState);
82
83 napi_property_descriptor properties[] = {
84 DECLARE_NAPI_PROPERTY("Task", taskClass),
85 DECLARE_NAPI_PROPERTY("LongTask", longTaskClass),
86 DECLARE_NAPI_PROPERTY("GenericsTask", genericsTaskClass),
87 DECLARE_NAPI_PROPERTY("TaskGroup", taskGroupClass),
88 DECLARE_NAPI_PROPERTY("SequenceRunner", seqRunnerClass),
89 DECLARE_NAPI_PROPERTY("AsyncRunner", asyncRunnerClass),
90 DECLARE_NAPI_PROPERTY("Priority", priorityObj),
91 DECLARE_NAPI_PROPERTY("State", stateObj),
92 DECLARE_NAPI_FUNCTION("execute", Execute),
93 DECLARE_NAPI_FUNCTION("executeDelayed", ExecuteDelayed),
94 DECLARE_NAPI_FUNCTION("cancel", Cancel),
95 DECLARE_NAPI_FUNCTION("getTaskPoolInfo", GetTaskPoolInfo),
96 DECLARE_NAPI_FUNCTION("terminateTask", TerminateTask),
97 DECLARE_NAPI_FUNCTION("isConcurrent", IsConcurrent),
98 DECLARE_NAPI_FUNCTION("executePeriodically", ExecutePeriodically),
99 };
100 napi_define_properties(env, exports, sizeof(properties) / sizeof(properties[0]), properties);
101
102 TaskManager::GetInstance().InitTaskManager(env);
103 return exports;
104 }
105
106 // ---------------------------------- SendData ---------------------------------------
ExecuteCallback(const uv_async_t * req)107 void TaskPool::ExecuteCallback(const uv_async_t* req)
108 {
109 auto* msgQueue = TaskManager::GetInstance().GetMessageQueue(req);
110 if (msgQueue == nullptr) {
111 HILOG_WARN("taskpool:: msgQueue is nullptr");
112 return;
113 }
114 ExecuteCallbackInner(*msgQueue);
115 }
116
ExecuteCallbackTask(CallbackInfo * callbackInfo)117 void TaskPool::ExecuteCallbackTask(CallbackInfo* callbackInfo)
118 {
119 auto* msgQueue = TaskManager::GetInstance().GetMessageQueueFromCallbackInfo(callbackInfo);
120 if (msgQueue == nullptr) {
121 HILOG_WARN("taskpool:: msgQueue is nullptr");
122 return;
123 }
124 ExecuteCallbackInner(*msgQueue);
125 }
126
ExecuteCallbackInner(MsgQueue & msgQueue)127 void TaskPool::ExecuteCallbackInner(MsgQueue& msgQueue)
128 {
129 while (!msgQueue.IsEmpty()) {
130 auto resultInfo = msgQueue.DeQueue();
131 if (resultInfo == nullptr) {
132 HILOG_DEBUG("taskpool:: resultInfo is nullptr");
133 continue;
134 }
135 ObjectScope<TaskResultInfo> resultInfoScope(resultInfo, false);
136 napi_status status = napi_ok;
137 CallbackScope callbackScope(resultInfo->hostEnv, resultInfo->workerEnv, resultInfo->taskId, status);
138 if (status != napi_ok) {
139 HILOG_ERROR("napi_open_handle_scope failed");
140 return;
141 }
142 auto env = resultInfo->hostEnv;
143 auto callbackInfo = TaskManager::GetInstance().GetCallbackInfo(resultInfo->taskId);
144 if (callbackInfo == nullptr) {
145 HILOG_ERROR("taskpool:: the callback in SendData is not registered on the host side");
146 ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_REGISTERED);
147 continue;
148 }
149 TaskManager::GetInstance().ResetCallbackInfoWorker(callbackInfo);
150 auto func = NapiHelper::GetReferenceValue(env, callbackInfo->callbackRef);
151 napi_value args;
152 napi_value result;
153 status = napi_deserialize(env, resultInfo->serializationArgs, &args);
154 napi_delete_serialization_data(env, resultInfo->serializationArgs);
155 if (status != napi_ok || args == nullptr) {
156 std::string errMessage = "taskpool:: failed to serialize function";
157 HILOG_ERROR("%{public}s in SendData", errMessage.c_str());
158 ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
159 continue;
160 }
161 uint32_t argsNum = NapiHelper::GetArrayLength(env, args);
162 napi_value argsArray[argsNum];
163 for (size_t i = 0; i < argsNum; i++) {
164 argsArray[i] = NapiHelper::GetElement(env, args, i);
165 }
166 napi_call_function(env, NapiHelper::GetGlobalObject(env), func, argsNum, argsArray, &result);
167 if (NapiHelper::IsExceptionPending(env)) {
168 napi_value exception = nullptr;
169 napi_get_and_clear_last_exception(env, &exception);
170 HILOG_ERROR("taskpool:: an exception has occurred in napi_call_function");
171 }
172 }
173 }
174 // ---------------------------------- SendData ---------------------------------------
175
GetTaskPoolInfo(napi_env env,napi_callback_info cbinfo)176 napi_value TaskPool::GetTaskPoolInfo(napi_env env, [[maybe_unused]] napi_callback_info cbinfo)
177 {
178 napi_value result = nullptr;
179 napi_create_object(env, &result);
180 napi_value threadInfos = TaskManager::GetInstance().GetThreadInfos(env);
181 napi_value taskInfos = TaskManager::GetInstance().GetTaskInfos(env);
182 napi_set_named_property(env, result, "threadInfos", threadInfos);
183 napi_set_named_property(env, result, "taskInfos", taskInfos);
184 return result;
185 }
186
TerminateTask(napi_env env,napi_callback_info cbinfo)187 napi_value TaskPool::TerminateTask(napi_env env, napi_callback_info cbinfo)
188 {
189 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
190 size_t argc = 1; // 1: long task
191 napi_value args[1];
192 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
193 if (argc < 1) {
194 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of the params must be one.");
195 return nullptr;
196 }
197 if (!NapiHelper::IsObject(env, args[0])) {
198 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be object.");
199 return nullptr;
200 }
201 napi_value napiTaskId = NapiHelper::GetNameProperty(env, args[0], TASKID_STR);
202 uint32_t taskId = NapiHelper::GetUint32Value(env, napiTaskId);
203 auto task = TaskManager::GetInstance().GetTask(taskId);
204 if (task == nullptr || !task->IsLongTask()) {
205 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be long task.");
206 return nullptr;
207 }
208 TaskManager::GetInstance().TerminateTask(taskId);
209 return nullptr;
210 }
211
Execute(napi_env env,napi_callback_info cbinfo)212 napi_value TaskPool::Execute(napi_env env, napi_callback_info cbinfo)
213 {
214 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
215 size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
216 if (argc < 1) {
217 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be at least one.");
218 return nullptr;
219 }
220 napi_value* args = new napi_value[argc];
221 ObjectScope<napi_value> scope(args, true);
222 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
223 napi_valuetype type = napi_undefined;
224 napi_typeof(env, args[0], &type);
225 if (type == napi_object) {
226 uint32_t priority = Priority::DEFAULT; // DEFAULT priority is MEDIUM
227 if (argc > 1) {
228 if (!NapiHelper::IsNumber(env, args[1])) {
229 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be number.");
230 return nullptr;
231 }
232 priority = NapiHelper::GetUint32Value(env, args[1]);
233 if (priority >= Priority::NUMBER) {
234 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value is error");
235 return nullptr;
236 }
237 }
238 if (NapiHelper::HasNameProperty(env, args[0], GROUP_ID_STR)) {
239 return ExecuteGroup(env, args[0], static_cast<Priority>(priority));
240 }
241 Task* task = nullptr;
242 napi_unwrap(env, args[0], reinterpret_cast<void**>(&task));
243 if (task == nullptr) {
244 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be task.");
245 return nullptr;
246 }
247 if (!task->CanExecute(env)) {
248 return nullptr;
249 }
250 napi_value promise = task->GetTaskInfoPromise(env, args[0], TaskType::COMMON_TASK,
251 static_cast<Priority>(priority));
252 if (promise == nullptr) {
253 return nullptr;
254 }
255 ExecuteTask(env, task, static_cast<Priority>(priority));
256 return promise;
257 }
258 if (type != napi_function) {
259 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
260 "the type of the first param must be object or function.");
261 return nullptr;
262 }
263 Task* task = Task::GenerateFunctionTask(env, args[0], args + 1, argc - 1, TaskType::FUNCTION_TASK);
264 if (task == nullptr) {
265 HILOG_ERROR("taskpool:: GenerateFunctionTask failed");
266 return nullptr;
267 }
268 napi_value promise = NapiHelper::CreatePromise(env, &task->currentTaskInfo_->deferred);
269 ExecuteTask(env, task);
270 return promise;
271 }
272
DelayTask(uv_timer_t * handle)273 void TaskPool::DelayTask(uv_timer_t* handle)
274 {
275 TaskMessage* taskMessage = static_cast<TaskMessage*>(handle->data);
276 auto task = TaskManager::GetInstance().GetTask(taskMessage->taskId);
277 napi_status status = napi_ok;
278 if (task == nullptr) {
279 HILOG_DEBUG("taskpool:: task is nullptr");
280 } else if (task->taskState_ == ExecuteState::CANCELED) {
281 HILOG_DEBUG("taskpool:: DelayTask task has been canceled");
282 HandleScope scope(task->env_, status);
283 if (status != napi_ok) {
284 HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
285 return;
286 }
287 napi_value error = ErrorHelper::NewError(task->env_, 0, "taskpool:: task has been canceled");
288 napi_reject_deferred(task->env_, taskMessage->deferred, error);
289 } else {
290 HILOG_INFO("taskpool:: DelayTask taskId %{public}s", std::to_string(taskMessage->taskId).c_str());
291 HandleScope scope(task->env_, status);
292 if (status != napi_ok) {
293 HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
294 return;
295 }
296 TaskManager::GetInstance().IncreaseRefCount(taskMessage->taskId);
297 task->IncreaseRefCount();
298 napi_value napiTask = NapiHelper::GetReferenceValue(task->env_, task->taskRef_);
299 TaskInfo* taskInfo = task->GetTaskInfo(task->env_, napiTask, taskMessage->priority);
300 if (taskInfo != nullptr) {
301 taskInfo->deferred = taskMessage->deferred;
302 if (task->taskState_ == ExecuteState::DELAYED || task->taskState_ == ExecuteState::FINISHED) {
303 task->taskState_ = ExecuteState::WAITING;
304 TaskManager::GetInstance().EnqueueTaskId(taskMessage->taskId, Priority(taskMessage->priority));
305 }
306 } else {
307 napi_value execption = nullptr;
308 napi_get_and_clear_last_exception(task->env_, &execption);
309 if (execption != nullptr) {
310 napi_reject_deferred(task->env_, taskMessage->deferred, execption);
311 }
312 }
313 }
314 if (task != nullptr) {
315 std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
316 task->delayedTimers_.erase(handle);
317 }
318 uv_timer_stop(handle);
319 ConcurrentHelper::UvHandleClose(handle);
320 delete taskMessage;
321 taskMessage = nullptr;
322 }
323
ExecuteDelayed(napi_env env,napi_callback_info cbinfo)324 napi_value TaskPool::ExecuteDelayed(napi_env env, napi_callback_info cbinfo)
325 {
326 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
327 uint32_t priority = Priority::DEFAULT; // DEFAULT priority is MEDIUM
328 int32_t delayTime = 0;
329 Task* task = nullptr;
330 if (!CheckDelayedParams(env, cbinfo, priority, delayTime, task)) {
331 return nullptr;
332 }
333
334 if (!task->IsExecuted() || task->taskState_ == ExecuteState::CANCELED ||
335 task->taskState_ == ExecuteState::FINISHED) {
336 task->taskState_ = ExecuteState::DELAYED;
337 }
338 task->UpdateTaskType(TaskType::COMMON_TASK);
339
340 uv_loop_t* loop = NapiHelper::GetLibUV(env);
341 uv_update_time(loop);
342 uv_timer_t* timer = new uv_timer_t;
343 uv_timer_init(loop, timer);
344 TaskMessage* taskMessage = new TaskMessage();
345 taskMessage->priority = static_cast<Priority>(priority);
346 taskMessage->taskId = task->taskId_;
347 napi_value promise = NapiHelper::CreatePromise(env, &taskMessage->deferred);
348 timer->data = taskMessage;
349
350 std::string strTrace = "ExecuteDelayed: taskId: " + std::to_string(task->taskId_);
351 strTrace += ", priority: " + std::to_string(priority);
352 strTrace += ", delayTime " + std::to_string(delayTime);
353 HITRACE_HELPER_METER_NAME(strTrace);
354 HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
355
356 uv_timer_start(timer, reinterpret_cast<uv_timer_cb>(DelayTask), delayTime, 0);
357 {
358 std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
359 task->delayedTimers_.insert(timer);
360 }
361 NativeEngine* engine = reinterpret_cast<NativeEngine*>(env);
362 if (engine->IsMainThread()) {
363 uv_async_send(&loop->wq_async);
364 }
365 return promise;
366 }
367
ExecuteGroup(napi_env env,napi_value napiTaskGroup,Priority priority)368 napi_value TaskPool::ExecuteGroup(napi_env env, napi_value napiTaskGroup, Priority priority)
369 {
370 napi_value napiGroupId = NapiHelper::GetNameProperty(env, napiTaskGroup, GROUP_ID_STR);
371 uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId);
372 HILOG_INFO("taskpool::ExecuteGroup groupId %{public}s", std::to_string(groupId).c_str());
373 auto taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(groupId);
374 if (taskGroup == nullptr) {
375 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "taskGroup is nullptr.");
376 return nullptr;
377 }
378 napi_reference_ref(env, taskGroup->groupRef_, nullptr);
379 if (taskGroup->groupState_ == ExecuteState::NOT_FOUND || taskGroup->groupState_ == ExecuteState::FINISHED ||
380 taskGroup->groupState_ == ExecuteState::CANCELED) {
381 taskGroup->groupState_ = ExecuteState::WAITING;
382 }
383 GroupInfo* groupInfo = new GroupInfo();
384 groupInfo->priority = priority;
385 napi_value resArr;
386 napi_create_array_with_length(env, taskGroup->taskIds_.size(), &resArr);
387 napi_ref arrRef = NapiHelper::CreateReference(env, resArr, 1);
388 groupInfo->resArr = arrRef;
389 napi_value promise = NapiHelper::CreatePromise(env, &groupInfo->deferred);
390 {
391 std::lock_guard<std::recursive_mutex> lock(taskGroup->taskGroupMutex_);
392 if (taskGroup->taskNum_ == 0) {
393 napi_resolve_deferred(env, groupInfo->deferred, resArr);
394 taskGroup->groupState_ = ExecuteState::FINISHED;
395 napi_delete_reference(env, groupInfo->resArr);
396 napi_reference_unref(env, taskGroup->groupRef_, nullptr);
397 delete groupInfo;
398 taskGroup->currentGroupInfo_ = nullptr;
399 return promise;
400 }
401 if (taskGroup->currentGroupInfo_ == nullptr) {
402 taskGroup->currentGroupInfo_ = groupInfo;
403 for (auto iter = taskGroup->taskRefs_.begin(); iter != taskGroup->taskRefs_.end(); iter++) {
404 napi_value napiTask = NapiHelper::GetReferenceValue(env, *iter);
405 Task* task = nullptr;
406 napi_unwrap(env, napiTask, reinterpret_cast<void**>(&task));
407 if (task == nullptr) {
408 HILOG_ERROR("taskpool::ExecuteGroup task is nullptr");
409 return nullptr;
410 }
411 napi_reference_ref(env, task->taskRef_, nullptr);
412 if (task->IsGroupCommonTask()) {
413 task->GetTaskInfo(env, napiTask, static_cast<Priority>(priority));
414 }
415 ExecuteTask(env, task, static_cast<Priority>(priority));
416 }
417 } else {
418 taskGroup->pendingGroupInfos_.push_back(groupInfo);
419 }
420 }
421 return promise;
422 }
423
HandleTaskResult(const uv_async_t * req)424 void TaskPool::HandleTaskResult(const uv_async_t* req)
425 {
426 HILOG_DEBUG("taskpool:: HandleTaskResult task");
427 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
428 auto task = static_cast<Task*>(req->data);
429 if (task == nullptr) { // LCOV_EXCL_BR_LINE
430 HILOG_FATAL("taskpool:: HandleTaskResult task is null");
431 return;
432 }
433 if (!task->IsMainThreadTask()) {
434 if (task->ShouldDeleteTask(false)) {
435 delete task;
436 return;
437 }
438 if (task->IsFunctionTask()) {
439 napi_remove_env_cleanup_hook(task->env_, Task::CleanupHookFunc, task);
440 }
441 }
442 task->DecreaseTaskRefCount();
443 HandleTaskResultCallback(task);
444 }
445
HandleTaskResultCallback(Task * task)446 void TaskPool::HandleTaskResultCallback(Task* task)
447 {
448 napi_handle_scope scope = nullptr;
449 NAPI_CALL_RETURN_VOID(task->env_, napi_open_handle_scope(task->env_, &scope));
450 napi_value napiTaskResult = nullptr;
451 napi_status status = napi_deserialize(task->env_, task->result_, &napiTaskResult);
452 napi_delete_serialization_data(task->env_, task->result_);
453
454 // tag for trace parse: Task PerformTask End
455 std::string strTrace = "Task PerformTask End: taskId : " + std::to_string(task->taskId_);
456 std::string taskLog = "Task PerformTask End: " + std::to_string(task->taskId_);
457 if (task->taskState_ == ExecuteState::CANCELED) {
458 strTrace += ", performResult : IsCanceled";
459 napiTaskResult = task->IsAsyncRunnerTask() ?
460 ErrorHelper::NewError(task->env_, ErrorHelper::ERR_ASYNCRUNNER_TASK_CANCELED) :
461 ErrorHelper::NewError(task->env_, 0, "taskpool:: task has been canceled");
462 } else if (status != napi_ok) {
463 strTrace += ", performResult : DeserializeFailed";
464 taskLog += ", DeserializeFailed";
465 } else if (task->success_) {
466 strTrace += ", performResult : Successful";
467 } else {
468 strTrace += ", performResult : Unsuccessful";
469 taskLog += ", Unsuccessful";
470 }
471 HITRACE_HELPER_METER_NAME(strTrace);
472 HILOG_TASK_INFO("taskpool:: %{public}s", taskLog.c_str());
473 if (napiTaskResult == nullptr) {
474 napi_get_undefined(task->env_, &napiTaskResult);
475 }
476 reinterpret_cast<NativeEngine*>(task->env_)->DecreaseSubEnvCounter();
477 bool success = ((status == napi_ok) && (task->taskState_ != ExecuteState::CANCELED)) && (task->success_);
478 task->taskState_ = ExecuteState::ENDING;
479 task->isCancelToFinish_ = false;
480 if (task->IsGroupTask()) {
481 UpdateGroupInfoByResult(task->env_, task, napiTaskResult, success);
482 } else if (!task->IsPeriodicTask()) {
483 if (success) {
484 napi_resolve_deferred(task->env_, task->currentTaskInfo_->deferred, napiTaskResult);
485 if (task->onExecutionSucceededCallBackInfo_ != nullptr) {
486 task->ExecuteListenerCallback(task->onExecutionSucceededCallBackInfo_);
487 }
488 } else {
489 napi_reject_deferred(task->env_, task->currentTaskInfo_->deferred, napiTaskResult);
490 if (task->onExecutionFailedCallBackInfo_ != nullptr) {
491 task->onExecutionFailedCallBackInfo_->taskError_ = napiTaskResult;
492 task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_);
493 }
494 }
495 }
496 NAPI_CALL_RETURN_VOID(task->env_, napi_close_handle_scope(task->env_, scope));
497 TriggerTask(task);
498 }
499
TriggerTask(Task * task)500 void TaskPool::TriggerTask(Task* task)
501 {
502 HILOG_DEBUG("taskpool:: task:%{public}s TriggerTask", std::to_string(task->taskId_).c_str());
503 if (task->IsGroupTask()) {
504 return;
505 }
506 TaskManager::GetInstance().DecreaseRefCount(task->env_, task->taskId_);
507 task->taskState_ = ExecuteState::FINISHED;
508 // seqRunnerTask will trigger the next
509 if (task->IsSeqRunnerTask()) {
510 if (!SequenceRunnerManager::GetInstance().TriggerSeqRunner(task->env_, task)) {
511 HILOG_ERROR("taskpool:: task %{public}s trigger in seqRunner %{public}s failed",
512 std::to_string(task->taskId_).c_str(), std::to_string(task->seqRunnerId_).c_str());
513 }
514 } else if (task->IsCommonTask()) {
515 task->NotifyPendingTask();
516 } else if (task->IsAsyncRunnerTask()) {
517 if (!AsyncRunnerManager::GetInstance().TriggerAsyncRunner(task->env_, task)) {
518 HILOG_ERROR("taskpool:: task %{public}s trigger in asyncRunner %{public}s failed",
519 std::to_string(task->taskId_).c_str(), std::to_string(task->asyncRunnerId_).c_str());
520 }
521 }
522 if (task->IsPeriodicTask()) {
523 return;
524 }
525 if (!task->IsFunctionTask()) {
526 napi_reference_unref(task->env_, task->taskRef_, nullptr);
527 return;
528 }
529 // function task need release data
530 task->ReleaseData();
531 TaskManager::GetInstance().RemoveTask(task->taskId_);
532 delete task;
533 }
534
UpdateGroupInfoByResult(napi_env env,Task * task,napi_value res,bool success)535 void TaskPool::UpdateGroupInfoByResult(napi_env env, Task* task, napi_value res, bool success)
536 {
537 HILOG_DEBUG("taskpool:: task:%{public}s UpdateGroupInfoByResult", std::to_string(task->taskId_).c_str());
538 TaskManager::GetInstance().DecreaseRefCount(task->env_, task->taskId_);
539 task->taskState_ = ExecuteState::FINISHED;
540 napi_reference_unref(env, task->taskRef_, nullptr);
541 if (task->IsGroupCommonTask()) {
542 delete task->currentTaskInfo_;
543 task->currentTaskInfo_ = nullptr;
544 }
545 TaskGroup* taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(task->groupId_);
546 if (taskGroup == nullptr || taskGroup->currentGroupInfo_ == nullptr) {
547 HILOG_DEBUG("taskpool:: taskGroup may have been released or canceled");
548 return;
549 }
550 // store the result
551 uint32_t index = taskGroup->GetTaskIndex(task->taskId_);
552 auto groupInfo = taskGroup->currentGroupInfo_;
553 napi_ref arrRef = groupInfo->resArr;
554 napi_value resArr = NapiHelper::GetReferenceValue(env, arrRef);
555 napi_set_element(env, resArr, index, res);
556 groupInfo->finishedTaskNum++;
557 // store the index when the first exception occurs
558 if (!success && !groupInfo->HasException()) {
559 groupInfo->SetFailedIndex(index);
560 }
561 // we will not handle the result until all tasks are finished
562 if (groupInfo->finishedTaskNum < taskGroup->taskNum_) {
563 return;
564 }
565 // if there is no exception, just resolve
566 if (!groupInfo->HasException()) {
567 HILOG_INFO("taskpool:: taskGroup perform end, taskGroupId %{public}s", std::to_string(task->groupId_).c_str());
568 napi_resolve_deferred(env, groupInfo->deferred, resArr);
569 for (uint32_t taskId : taskGroup->taskIds_) {
570 auto task = TaskManager::GetInstance().GetTask(taskId);
571 if (task != nullptr && task->onExecutionSucceededCallBackInfo_ != nullptr) {
572 task->ExecuteListenerCallback(task->onExecutionSucceededCallBackInfo_);
573 }
574 }
575 } else {
576 napi_value res = nullptr;
577 napi_get_element(env, resArr, groupInfo->GetFailedIndex(), &res);
578 napi_reject_deferred(env, groupInfo->deferred, res);
579 auto iter = taskGroup->taskIds_.begin();
580 std::advance(iter, groupInfo->GetFailedIndex());
581 auto task = iter != taskGroup->taskIds_.end() ? TaskManager::GetInstance().GetTask(*iter) : nullptr;
582 if (task != nullptr && task->onExecutionFailedCallBackInfo_ != nullptr) {
583 task->onExecutionFailedCallBackInfo_->taskError_ = res;
584 task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_);
585 }
586 }
587 taskGroup->groupState_ = ExecuteState::FINISHED;
588 napi_delete_reference(env, groupInfo->resArr);
589 napi_reference_unref(env, taskGroup->groupRef_, nullptr);
590 delete groupInfo;
591 taskGroup->currentGroupInfo_ = nullptr;
592 taskGroup->NotifyGroupTask(env);
593 }
594
ExecuteTask(napi_env env,Task * task,Priority priority)595 void TaskPool::ExecuteTask(napi_env env, Task* task, Priority priority)
596 {
597 // tag for trace parse: Task Allocation
598 std::string strTrace = "Task Allocation: taskId : " + std::to_string(task->taskId_)
599 + ", priority : " + std::to_string(priority)
600 + ", executeState : " + std::to_string(ExecuteState::WAITING);
601 HITRACE_HELPER_METER_NAME(strTrace);
602 std::string taskLog = "Task Allocation: " + std::to_string(task->taskId_)
603 + ", " + std::to_string(priority);
604 HILOG_TASK_INFO("taskpool:: %{public}s", taskLog.c_str());
605 task->IncreaseRefCount();
606 TaskManager::GetInstance().IncreaseRefCount(task->taskId_);
607 if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED ||
608 (task->taskState_ == ExecuteState::CANCELED && task->isCancelToFinish_)) {
609 task->taskState_ = ExecuteState::WAITING;
610 task->isCancelToFinish_ = false;
611 TaskManager::GetInstance().EnqueueTaskId(task->taskId_, priority);
612 }
613 }
614
Cancel(napi_env env,napi_callback_info cbinfo)615 napi_value TaskPool::Cancel(napi_env env, napi_callback_info cbinfo)
616 {
617 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
618 size_t argc = 1;
619 napi_value args[1];
620 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
621 if (argc < 1) {
622 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of the params must be 1.");
623 return nullptr;
624 }
625
626 if (NapiHelper::IsNumber(env, args[0])) {
627 uint32_t taskId = NapiHelper::GetUint32Value(env, args[0]);
628 TaskManager::GetInstance().CancelTask(env, taskId);
629 return nullptr;
630 }
631
632 if (!NapiHelper::IsObject(env, args[0])) {
633 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be object.");
634 return nullptr;
635 }
636
637 if (!NapiHelper::HasNameProperty(env, args[0], GROUP_ID_STR)) {
638 napi_value napiTaskId = NapiHelper::GetNameProperty(env, args[0], TASKID_STR);
639 if (napiTaskId == nullptr) {
640 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be task.");
641 return nullptr;
642 }
643 uint32_t taskId = NapiHelper::GetUint32Value(env, napiTaskId);
644 TaskManager::GetInstance().CancelTask(env, taskId);
645 } else {
646 napi_value napiGroupId = NapiHelper::GetNameProperty(env, args[0], GROUP_ID_STR);
647 if (napiGroupId == nullptr) {
648 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be taskGroup.");
649 return nullptr;
650 }
651 uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId);
652 TaskGroupManager::GetInstance().CancelGroup(env, groupId);
653 }
654 return nullptr;
655 }
656
IsConcurrent(napi_env env,napi_callback_info cbinfo)657 napi_value TaskPool::IsConcurrent(napi_env env, napi_callback_info cbinfo)
658 {
659 size_t argc = 1;
660 napi_value args[1];
661 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
662 if (argc != 1) {
663 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of the params must be 1.");
664 return nullptr;
665 }
666
667 if (!NapiHelper::IsFunction(env, args[0])) {
668 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be function.");
669 return nullptr;
670 }
671
672 bool isConcurrent = NapiHelper::IsConcurrentFunction(env, args[0]);
673 return NapiHelper::CreateBooleanValue(env, isConcurrent);
674 }
675
PeriodicTaskCallback(uv_timer_t * handle)676 void TaskPool::PeriodicTaskCallback(uv_timer_t* handle)
677 {
678 Task* task = reinterpret_cast<Task*>(handle->data);
679 if (task == nullptr) {
680 HILOG_DEBUG("taskpool:: the task is nullptr");
681 return;
682 } else if (!task->IsPeriodicTask()) {
683 HILOG_DEBUG("taskpool:: the current task is not a periodic task");
684 return;
685 } else if (task->taskState_ == ExecuteState::CANCELED) {
686 HILOG_DEBUG("taskpool:: the periodic task has been canceled");
687 napi_reference_unref(task->env_, task->taskRef_, nullptr);
688 task->CancelPendingTask(task->env_);
689 uv_timer_stop(task->timer_);
690 ConcurrentHelper::UvHandleClose(task->timer_);
691 return;
692 }
693 TaskManager::GetInstance().IncreaseRefCount(task->taskId_);
694
695 if (!task->isFirstTaskInfo_) {
696 napi_status status = napi_ok;
697 HandleScope scope(task->env_, status);
698 if (status != napi_ok) {
699 HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
700 return;
701 }
702 napi_value napiTask = NapiHelper::GetReferenceValue(task->env_, task->taskRef_);
703 TaskInfo* taskInfo = task->GetTaskInfo(task->env_, napiTask, task->periodicTaskPriority_);
704 if (taskInfo == nullptr) {
705 HILOG_DEBUG("taskpool:: the periodic task taskInfo is nullptr");
706 return;
707 }
708 }
709 task->isFirstTaskInfo_ = false;
710
711 task->IncreaseRefCount();
712 HILOG_INFO("taskpool:: PeriodicTaskCallback taskId %{public}s", std::to_string(task->taskId_).c_str());
713 if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED) {
714 task->taskState_ = ExecuteState::WAITING;
715 TaskManager::GetInstance().EnqueueTaskId(task->taskId_, task->periodicTaskPriority_);
716 }
717 }
718
ExecutePeriodically(napi_env env,napi_callback_info cbinfo)719 napi_value TaskPool::ExecutePeriodically(napi_env env, napi_callback_info cbinfo)
720 {
721 int32_t period = 0;
722 uint32_t priority = Priority::DEFAULT;
723 Task* periodicTask = nullptr;
724 if (!CheckPeriodicallyParams(env, cbinfo, period, priority, periodicTask)) {
725 return nullptr;
726 }
727
728 if (!periodicTask->CanExecutePeriodically(env)) {
729 return nullptr;
730 }
731 periodicTask->UpdatePeriodicTask();
732
733 periodicTask->periodicTaskPriority_ = static_cast<Priority>(priority);
734 napi_value napiTask = NapiHelper::GetReferenceValue(env, periodicTask->taskRef_);
735 TaskInfo* taskInfo = periodicTask->GetTaskInfo(env, napiTask, periodicTask->periodicTaskPriority_);
736 if (taskInfo == nullptr) {
737 return nullptr;
738 }
739
740 periodicTask->isFirstTaskInfo_ = true; // periodic task first Generate TaskInfo
741
742 TriggerTimer(env, periodicTask, period);
743 return nullptr;
744 }
745
TriggerTimer(napi_env env,Task * task,int32_t period)746 void TaskPool::TriggerTimer(napi_env env, Task* task, int32_t period)
747 {
748 HILOG_INFO("taskpool::TriggerTimer taskId %{public}s", std::to_string(task->taskId_).c_str());
749 uv_loop_t* loop = NapiHelper::GetLibUV(env);
750 task->timer_ = new uv_timer_t;
751 uv_timer_init(loop, task->timer_);
752 task->timer_->data = task;
753 uv_update_time(loop);
754 uv_timer_start(task->timer_, PeriodicTaskCallback, period, period);
755 NativeEngine* engine = reinterpret_cast<NativeEngine*>(env);
756 if (engine->IsMainThread()) {
757 uv_async_send(&loop->wq_async);
758 }
759 }
760
CheckDelayedParams(napi_env env,napi_callback_info cbinfo,uint32_t & priority,int32_t & delayTime,Task * & task)761 bool TaskPool::CheckDelayedParams(napi_env env, napi_callback_info cbinfo, uint32_t &priority, int32_t &delayTime,
762 Task* &task)
763 {
764 size_t argc = 3; // 3: delayTime, task and priority
765 napi_value args[3]; // 3: delayTime, task and priority
766 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
767 if (argc < 2 || argc > 3) { // 2: delayTime and task 3: delayTime, task and priority
768 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be two or three.");
769 return false;
770 }
771
772 if (!NapiHelper::IsNumber(env, args[0])) {
773 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be number.");
774 return false;
775 }
776
777 if (!NapiHelper::IsObject(env, args[1])) {
778 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be object.");
779 return false;
780 }
781
782 delayTime = NapiHelper::GetInt32Value(env, args[0]);
783 if (delayTime < 0) {
784 ErrorHelper::ThrowError(env, ErrorHelper::ERR_DELAY_TIME_ERROR, "The delayTime is less than zero");
785 return false;
786 }
787
788 if (argc > 2) { // 2: the params might have priority
789 if (!NapiHelper::IsNumber(env, args[2])) {
790 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the third param must be number.");
791 return false;
792 }
793 priority = NapiHelper::GetUint32Value(env, args[2]); // 2: get task priority
794 if (priority >= Priority::NUMBER) {
795 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value is error.");
796 return false;
797 }
798 }
799
800 napi_unwrap(env, args[1], reinterpret_cast<void**>(&task));
801 if (task == nullptr) {
802 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of second param must be task");
803 return false;
804 }
805 if (!task->CanExecuteDelayed(env)) {
806 return false;
807 }
808 return true;
809 }
810
CheckPeriodicallyParams(napi_env env,napi_callback_info cbinfo,int32_t & period,uint32_t & priority,Task * & periodicTask)811 bool TaskPool::CheckPeriodicallyParams(napi_env env, napi_callback_info cbinfo, int32_t &period,
812 uint32_t &priority, Task* &periodicTask)
813 {
814 size_t argc = 3; // 3 : period, task, priority
815 napi_value args[3]; // 3 : period, task, priority
816 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
817 if (argc < 2 || argc > 3) { // 2 : period, task and 3 : period, task, priority
818 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be two or three.");
819 return false;
820 }
821 if (!NapiHelper::IsNumber(env, args[0])) {
822 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be number.");
823 return false;
824 }
825 period = NapiHelper::GetInt32Value(env, args[0]);
826 if (period < 0) {
827 ErrorHelper::ThrowError(env, ErrorHelper::ERR_DELAY_TIME_ERROR, "The period value is less than zero.");
828 return false;
829 }
830 if (!NapiHelper::IsObject(env, args[1])) {
831 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be task.");
832 return false;
833 }
834
835 if (argc >= 3) { // 3 : third param maybe priority
836 if (!NapiHelper::IsNumber(env, args[2])) { // 2 : priority
837 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the third param must be priority.");
838 return false;
839 }
840 priority = NapiHelper::GetUint32Value(env, args[2]); // 2 : priority
841 if (priority >= Priority::NUMBER) {
842 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the value of the priority is invalid.");
843 return false;
844 }
845 }
846
847 napi_unwrap(env, args[1], reinterpret_cast<void**>(&periodicTask));
848 if (periodicTask == nullptr) {
849 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be task.");
850 return false;
851 }
852
853 return true;
854 }
855 } // namespace Commonlibrary::Concurrent::TaskPoolModule
856