• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "taskpool.h"
17 
18 #include "async_runner_manager.h"
19 #include "helper/hitrace_helper.h"
20 #include "sequence_runner_manager.h"
21 #include "task_group_manager.h"
22 
23 namespace Commonlibrary::Concurrent::TaskPoolModule {
24 using namespace Commonlibrary::Concurrent::Common::Helper;
25 
InitTaskPool(napi_env env,napi_value exports)26 napi_value TaskPool::InitTaskPool(napi_env env, napi_value exports)
27 {
28     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
29     napi_value taskClass = nullptr;
30     NAPI_CALL(env, napi_define_class(env, "Task", NAPI_AUTO_LENGTH, Task::TaskConstructor,
31               nullptr, 0, nullptr, &taskClass));
32     napi_value longTaskClass = nullptr;
33     NAPI_CALL(env, napi_define_class(env, "LongTask", NAPI_AUTO_LENGTH, Task::LongTaskConstructor,
34               nullptr, 0, nullptr, &longTaskClass));
35     napi_value genericsTaskClass = nullptr;
36     NAPI_CALL(env, napi_define_class(env, "GenericsTask", NAPI_AUTO_LENGTH, Task::TaskConstructor,
37               nullptr, 0, nullptr, &genericsTaskClass));
38     napi_value isCanceledFunc = nullptr;
39     NAPI_CALL(env, napi_create_function(env, "isCanceled", NAPI_AUTO_LENGTH, Task::IsCanceled, NULL, &isCanceledFunc));
40     napi_set_named_property(env, taskClass, "isCanceled", isCanceledFunc);
41     napi_set_named_property(env, longTaskClass, "isCanceled", isCanceledFunc);
42     napi_value sendDataFunc = nullptr;
43     NAPI_CALL(env, napi_create_function(env, "sendData", NAPI_AUTO_LENGTH, Task::SendData, NULL, &sendDataFunc));
44     napi_set_named_property(env, taskClass, "sendData", sendDataFunc);
45     napi_set_named_property(env, longTaskClass, "sendData", sendDataFunc);
46     napi_value taskGroupClass = nullptr;
47     NAPI_CALL(env, napi_define_class(env, "TaskGroup", NAPI_AUTO_LENGTH, TaskGroup::TaskGroupConstructor,
48               nullptr, 0, nullptr, &taskGroupClass));
49     napi_value seqRunnerClass = nullptr;
50     NAPI_CALL(env, napi_define_class(env, "SequenceRunner", NAPI_AUTO_LENGTH, SequenceRunner::SeqRunnerConstructor,
51               nullptr, 0, nullptr, &seqRunnerClass));
52     napi_value asyncRunnerClass = nullptr;
53     NAPI_CALL(env, napi_define_class(env, "AsyncRunner", NAPI_AUTO_LENGTH, AsyncRunner::AsyncRunnerConstructor,
54               nullptr, 0, nullptr, &asyncRunnerClass));
55 
56     // define priority
57     napi_value priorityObj = NapiHelper::CreateObject(env);
58     napi_value highPriority = NapiHelper::CreateUint32(env, Priority::HIGH);
59     napi_value mediumPriority = NapiHelper::CreateUint32(env, Priority::MEDIUM);
60     napi_value lowPriority = NapiHelper::CreateUint32(env, Priority::LOW);
61     napi_value idlePriority = NapiHelper::CreateUint32(env, Priority::IDLE);
62     napi_property_descriptor exportPriority[] = {
63         DECLARE_NAPI_PROPERTY("HIGH", highPriority),
64         DECLARE_NAPI_PROPERTY("MEDIUM", mediumPriority),
65         DECLARE_NAPI_PROPERTY("LOW", lowPriority),
66         DECLARE_NAPI_PROPERTY("IDLE", idlePriority),
67     };
68     napi_define_properties(env, priorityObj, sizeof(exportPriority) / sizeof(exportPriority[0]), exportPriority);
69 
70     // define State
71     napi_value stateObj = NapiHelper::CreateObject(env);
72     napi_value waitingState = NapiHelper::CreateUint32(env, ExecuteState::WAITING);
73     napi_value runningState = NapiHelper::CreateUint32(env, ExecuteState::RUNNING);
74     napi_value canceledState = NapiHelper::CreateUint32(env, ExecuteState::CANCELED);
75     napi_property_descriptor exportState[] = {
76         DECLARE_NAPI_PROPERTY("WAITING", waitingState),
77         DECLARE_NAPI_PROPERTY("RUNNING", runningState),
78         DECLARE_NAPI_PROPERTY("CANCELED", canceledState),
79     };
80     napi_define_properties(env, stateObj, sizeof(exportState) / sizeof(exportState[0]), exportState);
81 
82     napi_property_descriptor properties[] = {
83         DECLARE_NAPI_PROPERTY("Task", taskClass),
84         DECLARE_NAPI_PROPERTY("LongTask", longTaskClass),
85         DECLARE_NAPI_PROPERTY("GenericsTask", genericsTaskClass),
86         DECLARE_NAPI_PROPERTY("TaskGroup", taskGroupClass),
87         DECLARE_NAPI_PROPERTY("SequenceRunner", seqRunnerClass),
88         DECLARE_NAPI_PROPERTY("AsyncRunner", asyncRunnerClass),
89         DECLARE_NAPI_PROPERTY("Priority", priorityObj),
90         DECLARE_NAPI_PROPERTY("State", stateObj),
91         DECLARE_NAPI_FUNCTION("execute", Execute),
92         DECLARE_NAPI_FUNCTION("executeDelayed", ExecuteDelayed),
93         DECLARE_NAPI_FUNCTION("cancel", Cancel),
94         DECLARE_NAPI_FUNCTION("getTaskPoolInfo", GetTaskPoolInfo),
95         DECLARE_NAPI_FUNCTION("terminateTask", TerminateTask),
96         DECLARE_NAPI_FUNCTION("isConcurrent", IsConcurrent),
97         DECLARE_NAPI_FUNCTION("executePeriodically", ExecutePeriodically),
98     };
99     napi_define_properties(env, exports, sizeof(properties) / sizeof(properties[0]), properties);
100 
101     TaskManager::GetInstance().InitTaskManager(env);
102     return exports;
103 }
104 
105 // ---------------------------------- SendData ---------------------------------------
ExecuteOnReceiveDataCallback(CallbackInfo * callbackInfo,TaskResultInfo * resultInfo)106 void TaskPool::ExecuteOnReceiveDataCallback(CallbackInfo* callbackInfo, TaskResultInfo* resultInfo)
107 {
108     ObjectScope<TaskResultInfo> resultInfoScope(resultInfo, false);
109     napi_status status = napi_ok;
110     std::string traceLabel = "ExecuteOnReceiveDataCallback type: " + callbackInfo->type
111         + ", taskId: " + std::to_string(resultInfo->taskId);
112     HITRACE_HELPER_METER_NAME(traceLabel);
113     auto env = callbackInfo->hostEnv;
114     CallbackScope callbackScope(env, resultInfo, status);
115     if (status != napi_ok) {
116         HILOG_ERROR("napi_open_handle_scope failed");
117         return;
118     }
119     auto func = NapiHelper::GetReferenceValue(env, callbackInfo->callbackRef);
120     napi_value args;
121     napi_value result;
122     status = napi_deserialize(env, resultInfo->serializationArgs, &args);
123     napi_delete_serialization_data(env, resultInfo->serializationArgs);
124     if (status != napi_ok || args == nullptr) {
125         std::string errMessage = "taskpool:: failed to serialize function";
126         HILOG_ERROR("%{public}s in SendData", errMessage.c_str());
127         ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
128         return;
129     }
130     uint32_t argsNum = NapiHelper::GetArrayLength(env, args);
131     napi_value argsArray[argsNum];
132     for (size_t i = 0; i < argsNum; i++) {
133         argsArray[i] = NapiHelper::GetElement(env, args, i);
134     }
135     napi_call_function(env, NapiHelper::GetGlobalObject(env), func, argsNum, argsArray, &result);
136     if (NapiHelper::IsExceptionPending(env)) {
137         napi_value exception = nullptr;
138         napi_get_and_clear_last_exception(env, &exception);
139         HILOG_ERROR("taskpool:: an exception has occurred in napi_call_function");
140     }
141 }
142 // ---------------------------------- SendData ---------------------------------------
143 
GetTaskPoolInfo(napi_env env,napi_callback_info cbinfo)144 napi_value TaskPool::GetTaskPoolInfo(napi_env env, [[maybe_unused]] napi_callback_info cbinfo)
145 {
146     napi_value result = nullptr;
147     napi_create_object(env, &result);
148     napi_value threadInfos = TaskManager::GetInstance().GetThreadInfos(env);
149     napi_value taskInfos = TaskManager::GetInstance().GetTaskInfos(env);
150     napi_set_named_property(env, result, "threadInfos", threadInfos);
151     napi_set_named_property(env, result, "taskInfos", taskInfos);
152     return result;
153 }
154 
TerminateTask(napi_env env,napi_callback_info cbinfo)155 napi_value TaskPool::TerminateTask(napi_env env, napi_callback_info cbinfo)
156 {
157     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
158     size_t argc = 1; // 1: long task
159     napi_value args[1];
160     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
161     if (argc < 1) {
162         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of the params must be one.");
163         return nullptr;
164     }
165     if (!NapiHelper::IsObject(env, args[0])) {
166         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be object.");
167         return nullptr;
168     }
169     napi_value napiTaskId = NapiHelper::GetNameProperty(env, args[0], TASKID_STR);
170     uint32_t taskId = NapiHelper::GetUint32Value(env, napiTaskId);
171     auto task = TaskManager::GetInstance().GetTask(taskId);
172     if (task == nullptr || !task->IsLongTask()) {
173         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be long task.");
174         return nullptr;
175     }
176     TaskManager::GetInstance().TerminateTask(taskId);
177     return nullptr;
178 }
179 
Execute(napi_env env,napi_callback_info cbinfo)180 napi_value TaskPool::Execute(napi_env env, napi_callback_info cbinfo)
181 {
182     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
183     size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
184     if (argc < 1) {
185         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be at least one.");
186         return nullptr;
187     }
188     napi_value* args = new napi_value[argc];
189     ObjectScope<napi_value> scope(args, true);
190     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
191     napi_valuetype type = napi_undefined;
192     napi_typeof(env, args[0], &type);
193     if (type == napi_object) {
194         uint32_t priority = Priority::DEFAULT; // DEFAULT priority is MEDIUM
195         if (argc > 1) {
196             if (!NapiHelper::IsNumber(env, args[1])) {
197                 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be number.");
198                 return nullptr;
199             }
200             priority = NapiHelper::GetUint32Value(env, args[1]);
201             if (priority >= Priority::NUMBER) {
202                 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value is error");
203                 return nullptr;
204             }
205         }
206         if (NapiHelper::HasNameProperty(env, args[0], GROUP_ID_STR)) {
207             return ExecuteGroup(env, args[0], static_cast<Priority>(priority));
208         }
209         Task* task = nullptr;
210         napi_unwrap(env, args[0], reinterpret_cast<void**>(&task));
211         if (task == nullptr) {
212             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be task.");
213             return nullptr;
214         }
215         if (!task->CanExecute(env)) {
216             return nullptr;
217         }
218         napi_value promise = task->GetTaskInfoPromise(env, args[0], TaskType::COMMON_TASK,
219                                                       static_cast<Priority>(priority));
220         if (promise == nullptr) {
221             return nullptr;
222         }
223         ExecuteTask(env, task, static_cast<Priority>(priority));
224         return promise;
225     }
226     if (type != napi_function) {
227         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
228             "the type of the first param must be object or function.");
229         return nullptr;
230     }
231     Task* task = Task::GenerateFunctionTask(env, args[0], args + 1, argc - 1, TaskType::FUNCTION_TASK);
232     if (task == nullptr) {
233         HILOG_ERROR("taskpool:: GenerateFunctionTask failed");
234         return nullptr;
235     }
236     napi_value promise = NapiHelper::CreatePromise(env, &task->currentTaskInfo_->deferred);
237     ExecuteTask(env, task);
238     return promise;
239 }
240 
DelayTask(uv_timer_t * handle)241 void TaskPool::DelayTask(uv_timer_t* handle)
242 {
243     TaskMessage* taskMessage = static_cast<TaskMessage*>(handle->data);
244     auto task = TaskManager::GetInstance().GetTask(taskMessage->taskId);
245     napi_status status = napi_ok;
246     if (task == nullptr) {
247         HILOG_DEBUG("taskpool:: task is nullptr");
248     } else if (task->taskState_ == ExecuteState::CANCELED) {
249         HILOG_DEBUG("taskpool:: DelayTask task has been canceled");
250         HandleScope scope(task->env_, status);
251         if (status != napi_ok) {
252             HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
253             return;
254         }
255         napi_value error = TaskManager::GetInstance().CancelError(task->env_, 0, "taskpool:: task has been canceled");
256         napi_reject_deferred(task->env_, taskMessage->deferred, error);
257     } else {
258         HILOG_INFO("taskpool:: DelayTask taskId %{public}s", std::to_string(taskMessage->taskId).c_str());
259         HandleScope scope(task->env_, status);
260         if (status != napi_ok) {
261             HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
262             return;
263         }
264         TaskManager::GetInstance().IncreaseSendDataRefCount(taskMessage->taskId);
265         task->IncreaseRefCount();
266         napi_value napiTask = NapiHelper::GetReferenceValue(task->env_, task->taskRef_);
267         TaskInfo* taskInfo = task->GetTaskInfo(task->env_, napiTask, taskMessage->priority);
268         if (taskInfo != nullptr) {
269             taskInfo->deferred = taskMessage->deferred;
270             if (task->taskState_ == ExecuteState::DELAYED || task->taskState_ == ExecuteState::FINISHED) {
271                 task->taskState_ = ExecuteState::WAITING;
272                 TaskManager::GetInstance().EnqueueTaskId(taskMessage->taskId, Priority(taskMessage->priority));
273             }
274         } else {
275             napi_value execption = nullptr;
276             napi_get_and_clear_last_exception(task->env_, &execption);
277             if (execption != nullptr) {
278                 napi_reject_deferred(task->env_, taskMessage->deferred, execption);
279             }
280         }
281     }
282     if (task != nullptr) {
283         std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
284         task->delayedTimers_.erase(handle);
285     }
286     uv_timer_stop(handle);
287     ConcurrentHelper::UvHandleClose(handle);
288     delete taskMessage;
289     taskMessage = nullptr;
290 }
291 
ExecuteDelayed(napi_env env,napi_callback_info cbinfo)292 napi_value TaskPool::ExecuteDelayed(napi_env env, napi_callback_info cbinfo)
293 {
294     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
295     uint32_t priority = Priority::DEFAULT; // DEFAULT priority is MEDIUM
296     int32_t delayTime = 0;
297     Task* task = nullptr;
298     if (!CheckDelayedParams(env, cbinfo, priority, delayTime, task)) {
299         return nullptr;
300     }
301 
302     task->UpdateTaskStateToDelayed();
303     task->UpdateTaskType(TaskType::COMMON_TASK);
304 
305     uv_loop_t* loop = NapiHelper::GetLibUV(env);
306     uv_update_time(loop);
307     uv_timer_t* timer = new uv_timer_t;
308     uv_timer_init(loop, timer);
309     TaskMessage* taskMessage = new TaskMessage();
310     taskMessage->priority = static_cast<Priority>(priority);
311     taskMessage->taskId = task->taskId_;
312     napi_value promise = NapiHelper::CreatePromise(env, &taskMessage->deferred);
313     timer->data = taskMessage;
314 
315     std::string strTrace = "ExecuteDelayed: taskId: " + std::to_string(task->taskId_);
316     strTrace += ", priority: " + std::to_string(priority);
317     strTrace += ", delayTime " + std::to_string(delayTime);
318     HITRACE_HELPER_METER_NAME(strTrace);
319     HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
320 
321     uv_timer_start(timer, reinterpret_cast<uv_timer_cb>(DelayTask), delayTime, 0);
322     {
323         std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
324         task->delayedTimers_.insert(timer);
325     }
326     NativeEngine* engine = reinterpret_cast<NativeEngine*>(env);
327     if (engine->IsMainThread()) {
328         uv_async_send(&loop->wq_async);
329     }
330     return promise;
331 }
332 
ExecuteGroup(napi_env env,napi_value napiTaskGroup,Priority priority)333 napi_value TaskPool::ExecuteGroup(napi_env env, napi_value napiTaskGroup, Priority priority)
334 {
335     napi_value napiGroupId = NapiHelper::GetNameProperty(env, napiTaskGroup, GROUP_ID_STR);
336     uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId);
337     HILOG_INFO("taskpool::ExecuteGroup groupId %{public}s", std::to_string(groupId).c_str());
338     auto taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(groupId);
339     if (taskGroup == nullptr) {
340         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "taskGroup is nullptr.");
341         return nullptr;
342     }
343     napi_reference_ref(env, taskGroup->groupRef_, nullptr);
344     if (taskGroup->groupState_ == ExecuteState::NOT_FOUND || taskGroup->groupState_ == ExecuteState::FINISHED ||
345         taskGroup->groupState_ == ExecuteState::CANCELED) {
346         taskGroup->groupState_ = ExecuteState::WAITING;
347     }
348     GroupInfo* groupInfo = new GroupInfo();
349     groupInfo->priority = priority;
350     napi_value resArr;
351     napi_create_array_with_length(env, taskGroup->taskIds_.size(), &resArr);
352     napi_ref arrRef = NapiHelper::CreateReference(env, resArr, 1);
353     groupInfo->resArr = arrRef;
354     napi_value promise = NapiHelper::CreatePromise(env, &groupInfo->deferred);
355     {
356         std::lock_guard<std::recursive_mutex> lock(taskGroup->taskGroupMutex_);
357         if (taskGroup->taskNum_ == 0) {
358             napi_resolve_deferred(env, groupInfo->deferred, resArr);
359             taskGroup->groupState_ = ExecuteState::FINISHED;
360             napi_delete_reference(env, groupInfo->resArr);
361             napi_reference_unref(env, taskGroup->groupRef_, nullptr);
362             delete groupInfo;
363             taskGroup->currentGroupInfo_ = nullptr;
364             return promise;
365         }
366         if (taskGroup->currentGroupInfo_ == nullptr) {
367             taskGroup->currentGroupInfo_ = groupInfo;
368             for (auto iter = taskGroup->taskRefs_.begin(); iter != taskGroup->taskRefs_.end(); iter++) {
369                 napi_value napiTask = NapiHelper::GetReferenceValue(env, *iter);
370                 Task* task = nullptr;
371                 napi_unwrap(env, napiTask, reinterpret_cast<void**>(&task));
372                 if (task == nullptr) {
373                     HILOG_ERROR("taskpool::ExecuteGroup task is nullptr");
374                     return nullptr;
375                 }
376                 napi_reference_ref(env, task->taskRef_, nullptr);
377                 if (task->IsGroupCommonTask()) {
378                     task->GetTaskInfo(env, napiTask, static_cast<Priority>(priority));
379                 }
380                 ExecuteTask(env, task, static_cast<Priority>(priority));
381             }
382         } else {
383             taskGroup->pendingGroupInfos_.push_back(groupInfo);
384         }
385     }
386     return promise;
387 }
388 
HandleTaskResult(Task * task)389 void TaskPool::HandleTaskResult(Task* task)
390 {
391     HILOG_DEBUG("taskpool:: HandleTaskResult task");
392     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
393     if (!task->IsMainThreadTask()) {
394         if (task->ShouldDeleteTask(false)) {
395             delete task;
396             return;
397         }
398         if (task->IsFunctionTask()) {
399             napi_remove_env_cleanup_hook(task->env_, Task::CleanupHookFunc, task);
400         }
401     }
402     task->DecreaseTaskLifecycleCount();
403     HandleTaskResultInner(task);
404 }
405 
HandleTaskResultInner(Task * task)406 void TaskPool::HandleTaskResultInner(Task* task)
407 {
408     napi_handle_scope scope = nullptr;
409     NAPI_CALL_RETURN_VOID(task->env_, napi_open_handle_scope(task->env_, &scope));
410     napi_value napiTaskResult = nullptr;
411     napi_status status = napi_deserialize(task->env_, task->result_, &napiTaskResult);
412     napi_delete_serialization_data(task->env_, task->result_);
413 
414     bool isCancel = false;
415     RecordTaskResultLog(task, status, napiTaskResult, isCancel);
416 
417     if (napiTaskResult == nullptr) {
418         napi_get_undefined(task->env_, &napiTaskResult);
419     }
420     reinterpret_cast<NativeEngine*>(task->env_)->DecreaseSubEnvCounter();
421     bool success = ((status == napi_ok) && (task->taskState_ != ExecuteState::CANCELED)) && (task->success_);
422     task->UpdateTaskStateToEnding();
423     task->isCancelToFinish_ = false;
424     if (task->IsGroupTask()) {
425         UpdateGroupInfoByResult(task->env_, task, napiTaskResult, success);
426     } else if (!task->IsPeriodicTask()) {
427         if (success) {
428             napi_resolve_deferred(task->env_, task->currentTaskInfo_->deferred, napiTaskResult);
429             if (task->onExecutionSucceededCallBackInfo_ != nullptr) {
430                 task->ExecuteListenerCallback(task->onExecutionSucceededCallBackInfo_, task->taskId_);
431             }
432         } else {
433             napi_reject_deferred(task->env_, task->currentTaskInfo_->deferred, napiTaskResult);
434             if (task->onExecutionFailedCallBackInfo_ != nullptr) {
435                 task->onExecutionFailedCallBackInfo_->taskError_ = napiTaskResult;
436                 task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_, task->taskId_);
437             }
438         }
439     }
440     NAPI_CALL_RETURN_VOID(task->env_, napi_close_handle_scope(task->env_, scope));
441     TriggerTask(task, isCancel);
442 }
443 
TriggerTask(Task * task,bool isCancel)444 void TaskPool::TriggerTask(Task* task, bool isCancel)
445 {
446     HILOG_DEBUG("taskpool:: task:%{public}s TriggerTask", std::to_string(task->taskId_).c_str());
447     if (task->IsGroupTask()) {
448         return;
449     }
450     TaskManager::GetInstance().DecreaseSendDataRefCount(task->env_, task->taskId_);
451     task->UpdateTaskStateToFinished();
452     // seqRunnerTask will trigger the next
453     if (task->IsSeqRunnerTask()) {
454         if (!SequenceRunnerManager::GetInstance().TriggerSeqRunner(task->env_, task)) {
455             HILOG_WARN("taskpool:: task %{public}s trigger in seqRunner %{public}s failed",
456                 std::to_string(task->taskId_).c_str(), std::to_string(task->seqRunnerId_).c_str());
457         }
458     } else if (task->IsCommonTask()) {
459         if (!isCancel) {
460             TaskManager::GetInstance().NotifyDependencyTaskInfo(task->taskId_);
461         }
462         task->NotifyPendingTask();
463     } else if (task->IsAsyncRunnerTask()) {
464         if (!AsyncRunnerManager::GetInstance().TriggerAsyncRunner(task->env_, task)) {
465             HILOG_ERROR("taskpool:: task %{public}s trigger in asyncRunner %{public}s failed",
466                         std::to_string(task->taskId_).c_str(), std::to_string(task->asyncRunnerId_).c_str());
467         }
468     }
469     if (task->IsPeriodicTask()) {
470         return;
471     }
472     if (!task->IsFunctionTask()) {
473         napi_reference_unref(task->env_, task->taskRef_, nullptr);
474         return;
475     }
476     // function task need release data
477     task->ReleaseData();
478     TaskManager::GetInstance().RemoveTask(task->taskId_);
479     delete task;
480 }
481 
UpdateGroupInfoByResult(napi_env env,Task * task,napi_value res,bool success)482 void TaskPool::UpdateGroupInfoByResult(napi_env env, Task* task, napi_value res, bool success)
483 {
484     HILOG_DEBUG("taskpool:: task:%{public}s UpdateGroupInfoByResult", std::to_string(task->taskId_).c_str());
485     TaskManager::GetInstance().DecreaseSendDataRefCount(task->env_, task->taskId_);
486     task->UpdateTaskStateToFinished();
487     napi_reference_unref(env, task->taskRef_, nullptr);
488     if (task->IsGroupCommonTask()) {
489         delete task->currentTaskInfo_;
490         task->currentTaskInfo_ = nullptr;
491     }
492     TaskGroup* taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(task->groupId_);
493     if (taskGroup == nullptr || taskGroup->currentGroupInfo_ == nullptr) {
494         HILOG_DEBUG("taskpool:: taskGroup may have been released or canceled");
495         return;
496     }
497     // store the result
498     uint32_t index = taskGroup->GetTaskIndex(task->taskId_);
499     auto groupInfo = taskGroup->currentGroupInfo_;
500     napi_ref arrRef = groupInfo->resArr;
501     napi_value resArr = NapiHelper::GetReferenceValue(env, arrRef);
502     napi_set_element(env, resArr, index, res);
503     groupInfo->finishedTaskNum++;
504     // store the index when the first exception occurs
505     if (!success && !groupInfo->HasException()) {
506         groupInfo->SetFailedIndex(index);
507     }
508     // we will not handle the result until all tasks are finished
509     if (groupInfo->finishedTaskNum < taskGroup->taskNum_) {
510         return;
511     }
512     // if there is no exception, just resolve
513     if (!groupInfo->HasException()) {
514         HILOG_INFO("taskpool:: taskGroup perform end, taskGroupId %{public}s", std::to_string(task->groupId_).c_str());
515         napi_resolve_deferred(env, groupInfo->deferred, resArr);
516         for (uint32_t taskId : taskGroup->taskIds_) {
517             auto task = TaskManager::GetInstance().GetTask(taskId);
518             if (task != nullptr && task->onExecutionSucceededCallBackInfo_ != nullptr) {
519                 task->ExecuteListenerCallback(task->onExecutionSucceededCallBackInfo_, task->taskId_);
520             }
521         }
522     } else { // LOCV_EXCL_BR_LINE
523         napi_value res = nullptr;
524         napi_get_element(env, resArr, groupInfo->GetFailedIndex(), &res);
525         napi_reject_deferred(env, groupInfo->deferred, res);
526         auto iter = taskGroup->taskIds_.begin();
527         std::advance(iter, groupInfo->GetFailedIndex());
528         auto task = iter != taskGroup->taskIds_.end() ? TaskManager::GetInstance().GetTask(*iter) : nullptr;
529         if (task != nullptr && task->onExecutionFailedCallBackInfo_ != nullptr) {
530             task->onExecutionFailedCallBackInfo_->taskError_ = res;
531             task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_, task->taskId_);
532         }
533     }
534     taskGroup->groupState_ = ExecuteState::FINISHED;
535     napi_delete_reference(env, groupInfo->resArr);
536     napi_reference_unref(env, taskGroup->groupRef_, nullptr);
537     delete groupInfo;
538     taskGroup->currentGroupInfo_ = nullptr;
539     taskGroup->NotifyGroupTask(env);
540 }
541 
ExecuteTask(napi_env env,Task * task,Priority priority)542 void TaskPool::ExecuteTask(napi_env env, Task* task, Priority priority)
543 {
544     // tag for trace parse: Task Allocation
545     std::string strTrace = "Task Allocation: taskId : " + std::to_string(task->taskId_)
546         + ", priority : " + std::to_string(priority)
547         + ", executeState : " + std::to_string(ExecuteState::WAITING);
548     HITRACE_HELPER_METER_NAME(strTrace);
549     std::string taskLog = "Task Allocation: " + std::to_string(task->taskId_)
550         + ", " + std::to_string(priority);
551     task->IncreaseRefCount();
552     TaskManager::GetInstance().IncreaseSendDataRefCount(task->taskId_);
553     if (task->UpdateTaskStateToWaiting()) {
554         HILOG_TASK_INFO("taskpool:: %{public}s", taskLog.c_str());
555         task->isCancelToFinish_ = false;
556         TaskManager::GetInstance().EnqueueTaskId(task->taskId_, priority);
557     } else {
558         HILOG_WARN("taskpool:: %{public}s, not enqueue", taskLog.c_str());
559     }
560 }
561 
Cancel(napi_env env,napi_callback_info cbinfo)562 napi_value TaskPool::Cancel(napi_env env, napi_callback_info cbinfo)
563 {
564     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
565     size_t argc = 1;
566     napi_value args[1];
567     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
568     if (argc < 1) {
569         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of the params must be 1.");
570         return nullptr;
571     }
572 
573     if (NapiHelper::IsNumber(env, args[0])) {
574         uint32_t taskId = NapiHelper::GetUint32Value(env, args[0]);
575         TaskManager::GetInstance().CancelTask(env, taskId);
576         return nullptr;
577     }
578 
579     if (!NapiHelper::IsObject(env, args[0])) {
580         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be object.");
581         return nullptr;
582     }
583 
584     if (!NapiHelper::HasNameProperty(env, args[0], GROUP_ID_STR)) {
585         napi_value napiTaskId = NapiHelper::GetNameProperty(env, args[0], TASKID_STR);
586         if (napiTaskId == nullptr) {
587             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be task.");
588             return nullptr;
589         }
590         uint32_t taskId = NapiHelper::GetUint32Value(env, napiTaskId);
591         TaskManager::GetInstance().CancelTask(env, taskId);
592     } else {
593         napi_value napiGroupId = NapiHelper::GetNameProperty(env, args[0], GROUP_ID_STR);
594         if (napiGroupId == nullptr) {
595             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be taskGroup.");
596             return nullptr;
597         }
598         uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId);
599         TaskGroupManager::GetInstance().CancelGroup(env, groupId);
600     }
601     return nullptr;
602 }
603 
IsConcurrent(napi_env env,napi_callback_info cbinfo)604 napi_value TaskPool::IsConcurrent(napi_env env, napi_callback_info cbinfo)
605 {
606     size_t argc = 1;
607     napi_value args[1];
608     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
609     if (argc != 1) {
610         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of the params must be 1.");
611         return nullptr;
612     }
613 
614     if (!NapiHelper::IsFunction(env, args[0])) {
615         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be function.");
616         return nullptr;
617     }
618 
619     bool isConcurrent = NapiHelper::IsConcurrentFunction(env, args[0]);
620     return NapiHelper::CreateBooleanValue(env, isConcurrent);
621 }
622 
PeriodicTaskCallback(uv_timer_t * handle)623 void TaskPool::PeriodicTaskCallback(uv_timer_t* handle)
624 {
625     Task* task = reinterpret_cast<Task*>(handle->data);
626     if (task == nullptr) {
627         HILOG_DEBUG("taskpool:: the task is nullptr");
628         return;
629     } else if (!task->IsPeriodicTask()) {
630         HILOG_DEBUG("taskpool:: the current task is not a periodic task");
631         return;
632     } else if (task->taskState_ == ExecuteState::CANCELED) {
633         if (task->currentTaskInfo_ == nullptr) {
634             HILOG_DEBUG("taskpool:: the periodic task has been canceled");
635             napi_reference_unref(task->env_, task->taskRef_, nullptr);
636             task->CancelPendingTask(task->env_);
637             uv_timer_stop(task->timer_);
638             ConcurrentHelper::UvHandleClose(task->timer_);
639         }
640         return;
641     }
642     TaskManager::GetInstance().IncreaseSendDataRefCount(task->taskId_);
643 
644     napi_status status = napi_ok;
645     HandleScope scope(task->env_, status);
646     if (status != napi_ok) {
647         HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
648         return;
649     }
650     napi_value napiTask = NapiHelper::GetReferenceValue(task->env_, task->taskRef_);
651     TaskInfo* taskInfo = task->GetTaskInfo(task->env_, napiTask, task->periodicTaskPriority_);
652     if (taskInfo == nullptr) {
653         HILOG_DEBUG("taskpool:: the periodic task taskInfo is nullptr");
654         return;
655     }
656 
657     task->IncreaseRefCount();
658     HILOG_INFO("taskpool:: PeriodicTaskCallback taskId %{public}s", std::to_string(task->taskId_).c_str());
659     if (task->UpdateTaskStateToWaiting()) {
660         TaskManager::GetInstance().EnqueueTaskId(task->taskId_, task->periodicTaskPriority_);
661     }
662 }
663 
ExecutePeriodically(napi_env env,napi_callback_info cbinfo)664 napi_value TaskPool::ExecutePeriodically(napi_env env, napi_callback_info cbinfo)
665 {
666     int32_t period = 0;
667     uint32_t priority = Priority::DEFAULT;
668     Task* periodicTask = nullptr;
669     if (!CheckPeriodicallyParams(env, cbinfo, period, priority, periodicTask)) {
670         return nullptr;
671     }
672 
673     if (!periodicTask->CanExecutePeriodically(env)) {
674         return nullptr;
675     }
676     periodicTask->UpdatePeriodicTask();
677 
678     periodicTask->periodicTaskPriority_ = static_cast<Priority>(priority);
679     napi_value napiTask = NapiHelper::GetReferenceValue(env, periodicTask->taskRef_);
680     auto [func, args, transferList, cloneList] = Task::GetSerializeParams(env, napiTask);
681     if (func == nullptr || args == nullptr) {
682         return nullptr;
683     }
684     std::tuple<napi_value, napi_value, bool, bool> params = {
685         transferList, cloneList, periodicTask->defaultTransfer_, periodicTask->defaultCloneSendable_
686     };
687     auto [serFunction, serArguments] = Task::GetSerializeResult(env, func, args, params);
688     if (serFunction == nullptr || serArguments == nullptr) { // LOCV_EXCL_BR_LINE
689         return nullptr;
690     }
691 
692     TriggerTimer(env, periodicTask, period);
693     return nullptr;
694 }
695 
TriggerTimer(napi_env env,Task * task,int32_t period)696 void TaskPool::TriggerTimer(napi_env env, Task* task, int32_t period)
697 {
698     HILOG_INFO("taskpool::TriggerTimer taskId %{public}s", std::to_string(task->taskId_).c_str());
699     uv_loop_t* loop = NapiHelper::GetLibUV(env);
700     task->timer_ = new uv_timer_t;
701     uv_timer_init(loop, task->timer_);
702     task->timer_->data = task;
703     uv_update_time(loop);
704     uv_timer_start(task->timer_, PeriodicTaskCallback, period, period);
705     NativeEngine* engine = reinterpret_cast<NativeEngine*>(env);
706     if (engine->IsMainThread()) {
707         uv_async_send(&loop->wq_async);
708     }
709 }
710 
CheckDelayedParams(napi_env env,napi_callback_info cbinfo,uint32_t & priority,int32_t & delayTime,Task * & task)711 bool TaskPool::CheckDelayedParams(napi_env env, napi_callback_info cbinfo, uint32_t &priority, int32_t &delayTime,
712                                   Task* &task)
713 {
714     size_t argc = 3; // 3: delayTime, task and priority
715     napi_value args[3]; // 3: delayTime, task and priority
716     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
717     if (argc < 2 || argc > 3) { // 2: delayTime and task 3: delayTime, task and priority
718         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be two or three.");
719         return false;
720     }
721 
722     if (!NapiHelper::IsNumber(env, args[0])) {
723         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be number.");
724         return false;
725     }
726 
727     if (!NapiHelper::IsObject(env, args[1])) {
728         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be object.");
729         return false;
730     }
731 
732     delayTime = NapiHelper::GetInt32Value(env, args[0]);
733     if (delayTime < 0) {
734         ErrorHelper::ThrowError(env, ErrorHelper::ERR_DELAY_TIME_ERROR, "The delayTime is less than zero");
735         return false;
736     }
737 
738     if (argc > 2) { // 2: the params might have priority
739         if (!NapiHelper::IsNumber(env, args[2])) {
740             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the third param must be number.");
741             return false;
742         }
743         priority = NapiHelper::GetUint32Value(env, args[2]); // 2: get task priority
744         if (priority >= Priority::NUMBER) {
745             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value is error.");
746             return false;
747         }
748     }
749 
750     napi_unwrap(env, args[1], reinterpret_cast<void**>(&task));
751     if (task == nullptr) {
752         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of second param must be task");
753         return false;
754     }
755     if (!task->CanExecuteDelayed(env)) {
756         return false;
757     }
758     return true;
759 }
760 
CheckPeriodicallyParams(napi_env env,napi_callback_info cbinfo,int32_t & period,uint32_t & priority,Task * & periodicTask)761 bool TaskPool::CheckPeriodicallyParams(napi_env env, napi_callback_info cbinfo, int32_t &period,
762                                        uint32_t &priority, Task* &periodicTask)
763 {
764     size_t argc = 3; // 3 : period, task, priority
765     napi_value args[3]; // 3 : period, task, priority
766     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
767     if (argc < 2 || argc > 3) { // 2 : period, task and 3 : period, task, priority
768         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be two or three.");
769         return false;
770     }
771     if (!NapiHelper::IsNumber(env, args[0])) {
772         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be number.");
773         return false;
774     }
775     period = NapiHelper::GetInt32Value(env, args[0]);
776     if (period < 0) {
777         ErrorHelper::ThrowError(env, ErrorHelper::ERR_DELAY_TIME_ERROR, "The period value is less than zero.");
778         return false;
779     }
780     if (!NapiHelper::IsObject(env, args[1])) {
781         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be task.");
782         return false;
783     }
784 
785     if (argc >= 3) { // 3 : third param maybe priority
786         if (!NapiHelper::IsNumber(env, args[2])) { // 2 : priority
787             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the third param must be priority.");
788             return false;
789         }
790         priority = NapiHelper::GetUint32Value(env, args[2]); // 2 : priority
791         if (priority >= Priority::NUMBER) {
792             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the value of the priority is invalid.");
793             return false;
794         }
795     }
796 
797     napi_unwrap(env, args[1], reinterpret_cast<void**>(&periodicTask));
798     if (periodicTask == nullptr) {
799         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be task.");
800         return false;
801     }
802 
803     return true;
804 }
805 
RecordTaskResultLog(Task * task,napi_status status,napi_value & napiTaskResult,bool & isCancel)806 void TaskPool::RecordTaskResultLog(Task* task, napi_status status, napi_value& napiTaskResult, bool& isCancel)
807 {
808     // tag for trace parse: Task PerformTask End
809     std::string strTrace = "Task PerformTask End: taskId : " + std::to_string(task->taskId_);
810     std::string taskLog = "Task PerformTask End: " + std::to_string(task->taskId_);
811     if (task->taskState_ == ExecuteState::CANCELED) {
812         strTrace += ", performResult : IsCanceled";
813         napiTaskResult = task->IsAsyncRunnerTask() ? TaskManager::GetInstance().CancelError(task->env_,
814             ErrorHelper::ERR_ASYNCRUNNER_TASK_CANCELED, nullptr, napiTaskResult, task->success_) :
815             TaskManager::GetInstance().CancelError(task->env_, 0, nullptr, napiTaskResult, task->success_);
816         isCancel = true;
817     } else if (status != napi_ok) {
818         strTrace += ", performResult : DeserializeFailed";
819         taskLog += ", DeserializeFailed";
820     } else if (task->success_) {
821         strTrace += ", performResult : Successful";
822     } else {
823         strTrace += ", performResult : Unsuccessful";
824         taskLog += ", Unsuccessful";
825     }
826     HITRACE_HELPER_METER_NAME(strTrace);
827     HILOG_TASK_INFO("taskpool:: %{public}s", taskLog.c_str());
828 }
829 } // namespace Commonlibrary::Concurrent::TaskPoolModule
830