• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "taskpool.h"
17 
18 #include "async_runner_manager.h"
19 #include "helper/hitrace_helper.h"
20 #include "sequence_runner_manager.h"
21 #include "task_group_manager.h"
22 
23 namespace Commonlibrary::Concurrent::TaskPoolModule {
24 using namespace Commonlibrary::Concurrent::Common::Helper;
25 
InitTaskPool(napi_env env,napi_value exports)26 napi_value TaskPool::InitTaskPool(napi_env env, napi_value exports)
27 {
28     HILOG_INFO("taskpool:: Import taskpool");
29     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
30     napi_value taskClass = nullptr;
31     NAPI_CALL(env, napi_define_class(env, "Task", NAPI_AUTO_LENGTH, Task::TaskConstructor,
32               nullptr, 0, nullptr, &taskClass));
33     napi_value longTaskClass = nullptr;
34     NAPI_CALL(env, napi_define_class(env, "LongTask", NAPI_AUTO_LENGTH, Task::LongTaskConstructor,
35               nullptr, 0, nullptr, &longTaskClass));
36     napi_value genericsTaskClass = nullptr;
37     NAPI_CALL(env, napi_define_class(env, "GenericsTask", NAPI_AUTO_LENGTH, Task::TaskConstructor,
38               nullptr, 0, nullptr, &genericsTaskClass));
39     napi_value isCanceledFunc = nullptr;
40     NAPI_CALL(env, napi_create_function(env, "isCanceled", NAPI_AUTO_LENGTH, Task::IsCanceled, NULL, &isCanceledFunc));
41     napi_set_named_property(env, taskClass, "isCanceled", isCanceledFunc);
42     napi_set_named_property(env, longTaskClass, "isCanceled", isCanceledFunc);
43     napi_value sendDataFunc = nullptr;
44     NAPI_CALL(env, napi_create_function(env, "sendData", NAPI_AUTO_LENGTH, Task::SendData, NULL, &sendDataFunc));
45     napi_set_named_property(env, taskClass, "sendData", sendDataFunc);
46     napi_set_named_property(env, longTaskClass, "sendData", sendDataFunc);
47     napi_value taskGroupClass = nullptr;
48     NAPI_CALL(env, napi_define_class(env, "TaskGroup", NAPI_AUTO_LENGTH, TaskGroup::TaskGroupConstructor,
49               nullptr, 0, nullptr, &taskGroupClass));
50     napi_value seqRunnerClass = nullptr;
51     NAPI_CALL(env, napi_define_class(env, "SequenceRunner", NAPI_AUTO_LENGTH, SequenceRunner::SeqRunnerConstructor,
52               nullptr, 0, nullptr, &seqRunnerClass));
53     napi_value asyncRunnerClass = nullptr;
54     NAPI_CALL(env, napi_define_class(env, "AsyncRunner", NAPI_AUTO_LENGTH, AsyncRunner::AsyncRunnerConstructor,
55               nullptr, 0, nullptr, &asyncRunnerClass));
56 
57     // define priority
58     napi_value priorityObj = NapiHelper::CreateObject(env);
59     napi_value highPriority = NapiHelper::CreateUint32(env, Priority::HIGH);
60     napi_value mediumPriority = NapiHelper::CreateUint32(env, Priority::MEDIUM);
61     napi_value lowPriority = NapiHelper::CreateUint32(env, Priority::LOW);
62     napi_value idlePriority = NapiHelper::CreateUint32(env, Priority::IDLE);
63     napi_property_descriptor exportPriority[] = {
64         DECLARE_NAPI_PROPERTY("HIGH", highPriority),
65         DECLARE_NAPI_PROPERTY("MEDIUM", mediumPriority),
66         DECLARE_NAPI_PROPERTY("LOW", lowPriority),
67         DECLARE_NAPI_PROPERTY("IDLE", idlePriority),
68     };
69     napi_define_properties(env, priorityObj, sizeof(exportPriority) / sizeof(exportPriority[0]), exportPriority);
70 
71     // define State
72     napi_value stateObj = NapiHelper::CreateObject(env);
73     napi_value waitingState = NapiHelper::CreateUint32(env, ExecuteState::WAITING);
74     napi_value runningState = NapiHelper::CreateUint32(env, ExecuteState::RUNNING);
75     napi_value canceledState = NapiHelper::CreateUint32(env, ExecuteState::CANCELED);
76     napi_property_descriptor exportState[] = {
77         DECLARE_NAPI_PROPERTY("WAITING", waitingState),
78         DECLARE_NAPI_PROPERTY("RUNNING", runningState),
79         DECLARE_NAPI_PROPERTY("CANCELED", canceledState),
80     };
81     napi_define_properties(env, stateObj, sizeof(exportState) / sizeof(exportState[0]), exportState);
82 
83     napi_property_descriptor properties[] = {
84         DECLARE_NAPI_PROPERTY("Task", taskClass),
85         DECLARE_NAPI_PROPERTY("LongTask", longTaskClass),
86         DECLARE_NAPI_PROPERTY("GenericsTask", genericsTaskClass),
87         DECLARE_NAPI_PROPERTY("TaskGroup", taskGroupClass),
88         DECLARE_NAPI_PROPERTY("SequenceRunner", seqRunnerClass),
89         DECLARE_NAPI_PROPERTY("AsyncRunner", asyncRunnerClass),
90         DECLARE_NAPI_PROPERTY("Priority", priorityObj),
91         DECLARE_NAPI_PROPERTY("State", stateObj),
92         DECLARE_NAPI_FUNCTION("execute", Execute),
93         DECLARE_NAPI_FUNCTION("executeDelayed", ExecuteDelayed),
94         DECLARE_NAPI_FUNCTION("cancel", Cancel),
95         DECLARE_NAPI_FUNCTION("getTaskPoolInfo", GetTaskPoolInfo),
96         DECLARE_NAPI_FUNCTION("terminateTask", TerminateTask),
97         DECLARE_NAPI_FUNCTION("isConcurrent", IsConcurrent),
98         DECLARE_NAPI_FUNCTION("executePeriodically", ExecutePeriodically),
99     };
100     napi_define_properties(env, exports, sizeof(properties) / sizeof(properties[0]), properties);
101 
102     TaskManager::GetInstance().InitTaskManager(env);
103     return exports;
104 }
105 
106 // ---------------------------------- SendData ---------------------------------------
ExecuteCallback(const uv_async_t * req)107 void TaskPool::ExecuteCallback(const uv_async_t* req)
108 {
109     auto* msgQueue = TaskManager::GetInstance().GetMessageQueue(req);
110     if (msgQueue == nullptr) {
111         HILOG_WARN("taskpool:: msgQueue is nullptr");
112         return;
113     }
114     ExecuteCallbackInner(*msgQueue);
115 }
116 
ExecuteCallbackTask(CallbackInfo * callbackInfo)117 void TaskPool::ExecuteCallbackTask(CallbackInfo* callbackInfo)
118 {
119     auto* msgQueue = TaskManager::GetInstance().GetMessageQueueFromCallbackInfo(callbackInfo);
120     if (msgQueue == nullptr) {
121         HILOG_WARN("taskpool:: msgQueue is nullptr");
122         return;
123     }
124     ExecuteCallbackInner(*msgQueue);
125 }
126 
ExecuteCallbackInner(MsgQueue & msgQueue)127 void TaskPool::ExecuteCallbackInner(MsgQueue& msgQueue)
128 {
129     while (!msgQueue.IsEmpty()) {
130         auto resultInfo = msgQueue.DeQueue();
131         if (resultInfo == nullptr) {
132             HILOG_DEBUG("taskpool:: resultInfo is nullptr");
133             continue;
134         }
135         ObjectScope<TaskResultInfo> resultInfoScope(resultInfo, false);
136         napi_status status = napi_ok;
137         CallbackScope callbackScope(resultInfo->hostEnv, resultInfo->workerEnv, resultInfo->taskId, status);
138         if (status != napi_ok) {
139             HILOG_ERROR("napi_open_handle_scope failed");
140             return;
141         }
142         auto env = resultInfo->hostEnv;
143         auto callbackInfo = TaskManager::GetInstance().GetCallbackInfo(resultInfo->taskId);
144         if (callbackInfo == nullptr) {
145             HILOG_ERROR("taskpool:: the callback in SendData is not registered on the host side");
146             ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_REGISTERED);
147             continue;
148         }
149         TaskManager::GetInstance().ResetCallbackInfoWorker(callbackInfo);
150         auto func = NapiHelper::GetReferenceValue(env, callbackInfo->callbackRef);
151         napi_value args;
152         napi_value result;
153         status = napi_deserialize(env, resultInfo->serializationArgs, &args);
154         napi_delete_serialization_data(env, resultInfo->serializationArgs);
155         if (status != napi_ok || args == nullptr) {
156             std::string errMessage = "taskpool:: failed to serialize function";
157             HILOG_ERROR("%{public}s in SendData", errMessage.c_str());
158             ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
159             continue;
160         }
161         uint32_t argsNum = NapiHelper::GetArrayLength(env, args);
162         napi_value argsArray[argsNum];
163         for (size_t i = 0; i < argsNum; i++) {
164             argsArray[i] = NapiHelper::GetElement(env, args, i);
165         }
166         napi_call_function(env, NapiHelper::GetGlobalObject(env), func, argsNum, argsArray, &result);
167         if (NapiHelper::IsExceptionPending(env)) {
168             napi_value exception = nullptr;
169             napi_get_and_clear_last_exception(env, &exception);
170             HILOG_ERROR("taskpool:: an exception has occurred in napi_call_function");
171         }
172     }
173 }
174 // ---------------------------------- SendData ---------------------------------------
175 
GetTaskPoolInfo(napi_env env,napi_callback_info cbinfo)176 napi_value TaskPool::GetTaskPoolInfo(napi_env env, [[maybe_unused]] napi_callback_info cbinfo)
177 {
178     napi_value result = nullptr;
179     napi_create_object(env, &result);
180     napi_value threadInfos = TaskManager::GetInstance().GetThreadInfos(env);
181     napi_value taskInfos = TaskManager::GetInstance().GetTaskInfos(env);
182     napi_set_named_property(env, result, "threadInfos", threadInfos);
183     napi_set_named_property(env, result, "taskInfos", taskInfos);
184     return result;
185 }
186 
TerminateTask(napi_env env,napi_callback_info cbinfo)187 napi_value TaskPool::TerminateTask(napi_env env, napi_callback_info cbinfo)
188 {
189     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
190     size_t argc = 1; // 1: long task
191     napi_value args[1];
192     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
193     if (argc < 1) {
194         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of the params must be one.");
195         return nullptr;
196     }
197     if (!NapiHelper::IsObject(env, args[0])) {
198         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be object.");
199         return nullptr;
200     }
201     napi_value napiTaskId = NapiHelper::GetNameProperty(env, args[0], TASKID_STR);
202     uint32_t taskId = NapiHelper::GetUint32Value(env, napiTaskId);
203     auto task = TaskManager::GetInstance().GetTask(taskId);
204     if (task == nullptr || !task->IsLongTask()) {
205         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be long task.");
206         return nullptr;
207     }
208     TaskManager::GetInstance().TerminateTask(taskId);
209     return nullptr;
210 }
211 
Execute(napi_env env,napi_callback_info cbinfo)212 napi_value TaskPool::Execute(napi_env env, napi_callback_info cbinfo)
213 {
214     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
215     size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
216     if (argc < 1) {
217         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be at least one.");
218         return nullptr;
219     }
220     napi_value* args = new napi_value[argc];
221     ObjectScope<napi_value> scope(args, true);
222     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
223     napi_valuetype type = napi_undefined;
224     napi_typeof(env, args[0], &type);
225     if (type == napi_object) {
226         uint32_t priority = Priority::DEFAULT; // DEFAULT priority is MEDIUM
227         if (argc > 1) {
228             if (!NapiHelper::IsNumber(env, args[1])) {
229                 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be number.");
230                 return nullptr;
231             }
232             priority = NapiHelper::GetUint32Value(env, args[1]);
233             if (priority >= Priority::NUMBER) {
234                 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value is error");
235                 return nullptr;
236             }
237         }
238         if (NapiHelper::HasNameProperty(env, args[0], GROUP_ID_STR)) {
239             return ExecuteGroup(env, args[0], static_cast<Priority>(priority));
240         }
241         Task* task = nullptr;
242         napi_unwrap(env, args[0], reinterpret_cast<void**>(&task));
243         if (task == nullptr) {
244             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be task.");
245             return nullptr;
246         }
247         if (!task->CanExecute(env)) {
248             return nullptr;
249         }
250         napi_value promise = task->GetTaskInfoPromise(env, args[0], TaskType::COMMON_TASK,
251                                                       static_cast<Priority>(priority));
252         if (promise == nullptr) {
253             return nullptr;
254         }
255         ExecuteTask(env, task, static_cast<Priority>(priority));
256         return promise;
257     }
258     if (type != napi_function) {
259         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
260             "the type of the first param must be object or function.");
261         return nullptr;
262     }
263     Task* task = Task::GenerateFunctionTask(env, args[0], args + 1, argc - 1, TaskType::FUNCTION_TASK);
264     if (task == nullptr) {
265         HILOG_ERROR("taskpool:: GenerateFunctionTask failed");
266         return nullptr;
267     }
268     napi_value promise = NapiHelper::CreatePromise(env, &task->currentTaskInfo_->deferred);
269     ExecuteTask(env, task);
270     return promise;
271 }
272 
DelayTask(uv_timer_t * handle)273 void TaskPool::DelayTask(uv_timer_t* handle)
274 {
275     TaskMessage* taskMessage = static_cast<TaskMessage*>(handle->data);
276     auto task = TaskManager::GetInstance().GetTask(taskMessage->taskId);
277     napi_status status = napi_ok;
278     if (task == nullptr) {
279         HILOG_DEBUG("taskpool:: task is nullptr");
280     } else if (task->taskState_ == ExecuteState::CANCELED) {
281         HILOG_DEBUG("taskpool:: DelayTask task has been canceled");
282         HandleScope scope(task->env_, status);
283         if (status != napi_ok) {
284             HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
285             return;
286         }
287         napi_value error = ErrorHelper::NewError(task->env_, 0, "taskpool:: task has been canceled");
288         napi_reject_deferred(task->env_, taskMessage->deferred, error);
289     } else {
290         HILOG_INFO("taskpool:: DelayTask taskId %{public}s", std::to_string(taskMessage->taskId).c_str());
291         HandleScope scope(task->env_, status);
292         if (status != napi_ok) {
293             HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
294             return;
295         }
296         TaskManager::GetInstance().IncreaseRefCount(taskMessage->taskId);
297         task->IncreaseRefCount();
298         napi_value napiTask = NapiHelper::GetReferenceValue(task->env_, task->taskRef_);
299         TaskInfo* taskInfo = task->GetTaskInfo(task->env_, napiTask, taskMessage->priority);
300         if (taskInfo != nullptr) {
301             taskInfo->deferred = taskMessage->deferred;
302             if (task->taskState_ == ExecuteState::DELAYED || task->taskState_ == ExecuteState::FINISHED) {
303                 task->taskState_ = ExecuteState::WAITING;
304                 TaskManager::GetInstance().EnqueueTaskId(taskMessage->taskId, Priority(taskMessage->priority));
305             }
306         } else {
307             napi_value execption = nullptr;
308             napi_get_and_clear_last_exception(task->env_, &execption);
309             if (execption != nullptr) {
310                 napi_reject_deferred(task->env_, taskMessage->deferred, execption);
311             }
312         }
313     }
314     if (task != nullptr) {
315         std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
316         task->delayedTimers_.erase(handle);
317     }
318     uv_timer_stop(handle);
319     ConcurrentHelper::UvHandleClose(handle);
320     delete taskMessage;
321     taskMessage = nullptr;
322 }
323 
ExecuteDelayed(napi_env env,napi_callback_info cbinfo)324 napi_value TaskPool::ExecuteDelayed(napi_env env, napi_callback_info cbinfo)
325 {
326     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
327     uint32_t priority = Priority::DEFAULT; // DEFAULT priority is MEDIUM
328     int32_t delayTime = 0;
329     Task* task = nullptr;
330     if (!CheckDelayedParams(env, cbinfo, priority, delayTime, task)) {
331         return nullptr;
332     }
333 
334     if (!task->IsExecuted() || task->taskState_ == ExecuteState::CANCELED ||
335         task->taskState_ == ExecuteState::FINISHED) {
336         task->taskState_ = ExecuteState::DELAYED;
337     }
338     task->UpdateTaskType(TaskType::COMMON_TASK);
339 
340     uv_loop_t* loop = NapiHelper::GetLibUV(env);
341     uv_update_time(loop);
342     uv_timer_t* timer = new uv_timer_t;
343     uv_timer_init(loop, timer);
344     TaskMessage* taskMessage = new TaskMessage();
345     taskMessage->priority = static_cast<Priority>(priority);
346     taskMessage->taskId = task->taskId_;
347     napi_value promise = NapiHelper::CreatePromise(env, &taskMessage->deferred);
348     timer->data = taskMessage;
349 
350     std::string strTrace = "ExecuteDelayed: taskId: " + std::to_string(task->taskId_);
351     strTrace += ", priority: " + std::to_string(priority);
352     strTrace += ", delayTime " + std::to_string(delayTime);
353     HITRACE_HELPER_METER_NAME(strTrace);
354     HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
355 
356     uv_timer_start(timer, reinterpret_cast<uv_timer_cb>(DelayTask), delayTime, 0);
357     {
358         std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
359         task->delayedTimers_.insert(timer);
360     }
361     NativeEngine* engine = reinterpret_cast<NativeEngine*>(env);
362     if (engine->IsMainThread()) {
363         uv_async_send(&loop->wq_async);
364     }
365     return promise;
366 }
367 
ExecuteGroup(napi_env env,napi_value napiTaskGroup,Priority priority)368 napi_value TaskPool::ExecuteGroup(napi_env env, napi_value napiTaskGroup, Priority priority)
369 {
370     napi_value napiGroupId = NapiHelper::GetNameProperty(env, napiTaskGroup, GROUP_ID_STR);
371     uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId);
372     HILOG_INFO("taskpool::ExecuteGroup groupId %{public}s", std::to_string(groupId).c_str());
373     auto taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(groupId);
374     if (taskGroup == nullptr) {
375         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "taskGroup is nullptr.");
376         return nullptr;
377     }
378     napi_reference_ref(env, taskGroup->groupRef_, nullptr);
379     if (taskGroup->groupState_ == ExecuteState::NOT_FOUND || taskGroup->groupState_ == ExecuteState::FINISHED ||
380         taskGroup->groupState_ == ExecuteState::CANCELED) {
381         taskGroup->groupState_ = ExecuteState::WAITING;
382     }
383     GroupInfo* groupInfo = new GroupInfo();
384     groupInfo->priority = priority;
385     napi_value resArr;
386     napi_create_array_with_length(env, taskGroup->taskIds_.size(), &resArr);
387     napi_ref arrRef = NapiHelper::CreateReference(env, resArr, 1);
388     groupInfo->resArr = arrRef;
389     napi_value promise = NapiHelper::CreatePromise(env, &groupInfo->deferred);
390     {
391         std::lock_guard<std::recursive_mutex> lock(taskGroup->taskGroupMutex_);
392         if (taskGroup->taskNum_ == 0) {
393             napi_resolve_deferred(env, groupInfo->deferred, resArr);
394             taskGroup->groupState_ = ExecuteState::FINISHED;
395             napi_delete_reference(env, groupInfo->resArr);
396             napi_reference_unref(env, taskGroup->groupRef_, nullptr);
397             delete groupInfo;
398             taskGroup->currentGroupInfo_ = nullptr;
399             return promise;
400         }
401         if (taskGroup->currentGroupInfo_ == nullptr) {
402             taskGroup->currentGroupInfo_ = groupInfo;
403             for (auto iter = taskGroup->taskRefs_.begin(); iter != taskGroup->taskRefs_.end(); iter++) {
404                 napi_value napiTask = NapiHelper::GetReferenceValue(env, *iter);
405                 Task* task = nullptr;
406                 napi_unwrap(env, napiTask, reinterpret_cast<void**>(&task));
407                 if (task == nullptr) {
408                     HILOG_ERROR("taskpool::ExecuteGroup task is nullptr");
409                     return nullptr;
410                 }
411                 napi_reference_ref(env, task->taskRef_, nullptr);
412                 if (task->IsGroupCommonTask()) {
413                     task->GetTaskInfo(env, napiTask, static_cast<Priority>(priority));
414                 }
415                 ExecuteTask(env, task, static_cast<Priority>(priority));
416             }
417         } else {
418             taskGroup->pendingGroupInfos_.push_back(groupInfo);
419         }
420     }
421     return promise;
422 }
423 
HandleTaskResult(const uv_async_t * req)424 void TaskPool::HandleTaskResult(const uv_async_t* req)
425 {
426     HILOG_DEBUG("taskpool:: HandleTaskResult task");
427     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
428     auto task = static_cast<Task*>(req->data);
429     if (task == nullptr) { // LCOV_EXCL_BR_LINE
430         HILOG_FATAL("taskpool:: HandleTaskResult task is null");
431         return;
432     }
433     if (!task->IsMainThreadTask()) {
434         if (task->ShouldDeleteTask(false)) {
435             delete task;
436             return;
437         }
438         if (task->IsFunctionTask()) {
439             napi_remove_env_cleanup_hook(task->env_, Task::CleanupHookFunc, task);
440         }
441     }
442     task->DecreaseTaskRefCount();
443     HandleTaskResultCallback(task);
444 }
445 
HandleTaskResultCallback(Task * task)446 void TaskPool::HandleTaskResultCallback(Task* task)
447 {
448     napi_handle_scope scope = nullptr;
449     NAPI_CALL_RETURN_VOID(task->env_, napi_open_handle_scope(task->env_, &scope));
450     napi_value napiTaskResult = nullptr;
451     napi_status status = napi_deserialize(task->env_, task->result_, &napiTaskResult);
452     napi_delete_serialization_data(task->env_, task->result_);
453 
454     // tag for trace parse: Task PerformTask End
455     std::string strTrace = "Task PerformTask End: taskId : " + std::to_string(task->taskId_);
456     std::string taskLog = "Task PerformTask End: " + std::to_string(task->taskId_);
457     if (task->taskState_ == ExecuteState::CANCELED) {
458         strTrace += ", performResult : IsCanceled";
459         napiTaskResult = task->IsAsyncRunnerTask() ?
460             ErrorHelper::NewError(task->env_, ErrorHelper::ERR_ASYNCRUNNER_TASK_CANCELED) :
461             ErrorHelper::NewError(task->env_, 0, "taskpool:: task has been canceled");
462     } else if (status != napi_ok) {
463         strTrace += ", performResult : DeserializeFailed";
464         taskLog += ", DeserializeFailed";
465     } else if (task->success_) {
466         strTrace += ", performResult : Successful";
467     } else {
468         strTrace += ", performResult : Unsuccessful";
469         taskLog += ", Unsuccessful";
470     }
471     HITRACE_HELPER_METER_NAME(strTrace);
472     HILOG_TASK_INFO("taskpool:: %{public}s", taskLog.c_str());
473     if (napiTaskResult == nullptr) {
474         napi_get_undefined(task->env_, &napiTaskResult);
475     }
476     reinterpret_cast<NativeEngine*>(task->env_)->DecreaseSubEnvCounter();
477     bool success = ((status == napi_ok) && (task->taskState_ != ExecuteState::CANCELED)) && (task->success_);
478     task->taskState_ = ExecuteState::ENDING;
479     task->isCancelToFinish_ = false;
480     if (task->IsGroupTask()) {
481         UpdateGroupInfoByResult(task->env_, task, napiTaskResult, success);
482     } else if (!task->IsPeriodicTask()) {
483         if (success) {
484             napi_resolve_deferred(task->env_, task->currentTaskInfo_->deferred, napiTaskResult);
485             if (task->onExecutionSucceededCallBackInfo_ != nullptr) {
486                 task->ExecuteListenerCallback(task->onExecutionSucceededCallBackInfo_);
487             }
488         } else {
489             napi_reject_deferred(task->env_, task->currentTaskInfo_->deferred, napiTaskResult);
490             if (task->onExecutionFailedCallBackInfo_ != nullptr) {
491                 task->onExecutionFailedCallBackInfo_->taskError_ = napiTaskResult;
492                 task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_);
493             }
494         }
495     }
496     NAPI_CALL_RETURN_VOID(task->env_, napi_close_handle_scope(task->env_, scope));
497     TriggerTask(task);
498 }
499 
TriggerTask(Task * task)500 void TaskPool::TriggerTask(Task* task)
501 {
502     HILOG_DEBUG("taskpool:: task:%{public}s TriggerTask", std::to_string(task->taskId_).c_str());
503     if (task->IsGroupTask()) {
504         return;
505     }
506     TaskManager::GetInstance().DecreaseRefCount(task->env_, task->taskId_);
507     task->taskState_ = ExecuteState::FINISHED;
508     // seqRunnerTask will trigger the next
509     if (task->IsSeqRunnerTask()) {
510         if (!SequenceRunnerManager::GetInstance().TriggerSeqRunner(task->env_, task)) {
511             HILOG_ERROR("taskpool:: task %{public}s trigger in seqRunner %{public}s failed",
512                         std::to_string(task->taskId_).c_str(), std::to_string(task->seqRunnerId_).c_str());
513         }
514     } else if (task->IsCommonTask()) {
515         task->NotifyPendingTask();
516     } else if (task->IsAsyncRunnerTask()) {
517         if (!AsyncRunnerManager::GetInstance().TriggerAsyncRunner(task->env_, task)) {
518             HILOG_ERROR("taskpool:: task %{public}s trigger in asyncRunner %{public}s failed",
519                         std::to_string(task->taskId_).c_str(), std::to_string(task->asyncRunnerId_).c_str());
520         }
521     }
522     if (task->IsPeriodicTask()) {
523         return;
524     }
525     if (!task->IsFunctionTask()) {
526         napi_reference_unref(task->env_, task->taskRef_, nullptr);
527         return;
528     }
529     // function task need release data
530     task->ReleaseData();
531     TaskManager::GetInstance().RemoveTask(task->taskId_);
532     delete task;
533 }
534 
UpdateGroupInfoByResult(napi_env env,Task * task,napi_value res,bool success)535 void TaskPool::UpdateGroupInfoByResult(napi_env env, Task* task, napi_value res, bool success)
536 {
537     HILOG_DEBUG("taskpool:: task:%{public}s UpdateGroupInfoByResult", std::to_string(task->taskId_).c_str());
538     TaskManager::GetInstance().DecreaseRefCount(task->env_, task->taskId_);
539     task->taskState_ = ExecuteState::FINISHED;
540     napi_reference_unref(env, task->taskRef_, nullptr);
541     if (task->IsGroupCommonTask()) {
542         delete task->currentTaskInfo_;
543         task->currentTaskInfo_ = nullptr;
544     }
545     TaskGroup* taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(task->groupId_);
546     if (taskGroup == nullptr || taskGroup->currentGroupInfo_ == nullptr) {
547         HILOG_DEBUG("taskpool:: taskGroup may have been released or canceled");
548         return;
549     }
550     // store the result
551     uint32_t index = taskGroup->GetTaskIndex(task->taskId_);
552     auto groupInfo = taskGroup->currentGroupInfo_;
553     napi_ref arrRef = groupInfo->resArr;
554     napi_value resArr = NapiHelper::GetReferenceValue(env, arrRef);
555     napi_set_element(env, resArr, index, res);
556     groupInfo->finishedTaskNum++;
557     // store the index when the first exception occurs
558     if (!success && !groupInfo->HasException()) {
559         groupInfo->SetFailedIndex(index);
560     }
561     // we will not handle the result until all tasks are finished
562     if (groupInfo->finishedTaskNum < taskGroup->taskNum_) {
563         return;
564     }
565     // if there is no exception, just resolve
566     if (!groupInfo->HasException()) {
567         HILOG_INFO("taskpool:: taskGroup perform end, taskGroupId %{public}s", std::to_string(task->groupId_).c_str());
568         napi_resolve_deferred(env, groupInfo->deferred, resArr);
569         for (uint32_t taskId : taskGroup->taskIds_) {
570             auto task = TaskManager::GetInstance().GetTask(taskId);
571             if (task != nullptr && task->onExecutionSucceededCallBackInfo_ != nullptr) {
572                 task->ExecuteListenerCallback(task->onExecutionSucceededCallBackInfo_);
573             }
574         }
575     } else {
576         napi_value res = nullptr;
577         napi_get_element(env, resArr, groupInfo->GetFailedIndex(), &res);
578         napi_reject_deferred(env, groupInfo->deferred, res);
579         auto iter = taskGroup->taskIds_.begin();
580         std::advance(iter, groupInfo->GetFailedIndex());
581         auto task = iter != taskGroup->taskIds_.end() ? TaskManager::GetInstance().GetTask(*iter) : nullptr;
582         if (task != nullptr && task->onExecutionFailedCallBackInfo_ != nullptr) {
583             task->onExecutionFailedCallBackInfo_->taskError_ = res;
584             task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_);
585         }
586     }
587     taskGroup->groupState_ = ExecuteState::FINISHED;
588     napi_delete_reference(env, groupInfo->resArr);
589     napi_reference_unref(env, taskGroup->groupRef_, nullptr);
590     delete groupInfo;
591     taskGroup->currentGroupInfo_ = nullptr;
592     taskGroup->NotifyGroupTask(env);
593 }
594 
ExecuteTask(napi_env env,Task * task,Priority priority)595 void TaskPool::ExecuteTask(napi_env env, Task* task, Priority priority)
596 {
597     // tag for trace parse: Task Allocation
598     std::string strTrace = "Task Allocation: taskId : " + std::to_string(task->taskId_)
599         + ", priority : " + std::to_string(priority)
600         + ", executeState : " + std::to_string(ExecuteState::WAITING);
601     HITRACE_HELPER_METER_NAME(strTrace);
602     std::string taskLog = "Task Allocation: " + std::to_string(task->taskId_)
603         + ", " + std::to_string(priority);
604     HILOG_TASK_INFO("taskpool:: %{public}s", taskLog.c_str());
605     task->IncreaseRefCount();
606     TaskManager::GetInstance().IncreaseRefCount(task->taskId_);
607     if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED ||
608         (task->taskState_ == ExecuteState::CANCELED && task->isCancelToFinish_)) {
609         task->taskState_ = ExecuteState::WAITING;
610         task->isCancelToFinish_ = false;
611         TaskManager::GetInstance().EnqueueTaskId(task->taskId_, priority);
612     }
613 }
614 
Cancel(napi_env env,napi_callback_info cbinfo)615 napi_value TaskPool::Cancel(napi_env env, napi_callback_info cbinfo)
616 {
617     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
618     size_t argc = 1;
619     napi_value args[1];
620     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
621     if (argc < 1) {
622         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of the params must be 1.");
623         return nullptr;
624     }
625 
626     if (NapiHelper::IsNumber(env, args[0])) {
627         uint32_t taskId = NapiHelper::GetUint32Value(env, args[0]);
628         TaskManager::GetInstance().CancelTask(env, taskId);
629         return nullptr;
630     }
631 
632     if (!NapiHelper::IsObject(env, args[0])) {
633         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be object.");
634         return nullptr;
635     }
636 
637     if (!NapiHelper::HasNameProperty(env, args[0], GROUP_ID_STR)) {
638         napi_value napiTaskId = NapiHelper::GetNameProperty(env, args[0], TASKID_STR);
639         if (napiTaskId == nullptr) {
640             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be task.");
641             return nullptr;
642         }
643         uint32_t taskId = NapiHelper::GetUint32Value(env, napiTaskId);
644         TaskManager::GetInstance().CancelTask(env, taskId);
645     } else {
646         napi_value napiGroupId = NapiHelper::GetNameProperty(env, args[0], GROUP_ID_STR);
647         if (napiGroupId == nullptr) {
648             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be taskGroup.");
649             return nullptr;
650         }
651         uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId);
652         TaskGroupManager::GetInstance().CancelGroup(env, groupId);
653     }
654     return nullptr;
655 }
656 
IsConcurrent(napi_env env,napi_callback_info cbinfo)657 napi_value TaskPool::IsConcurrent(napi_env env, napi_callback_info cbinfo)
658 {
659     size_t argc = 1;
660     napi_value args[1];
661     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
662     if (argc != 1) {
663         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of the params must be 1.");
664         return nullptr;
665     }
666 
667     if (!NapiHelper::IsFunction(env, args[0])) {
668         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be function.");
669         return nullptr;
670     }
671 
672     bool isConcurrent = NapiHelper::IsConcurrentFunction(env, args[0]);
673     return NapiHelper::CreateBooleanValue(env, isConcurrent);
674 }
675 
PeriodicTaskCallback(uv_timer_t * handle)676 void TaskPool::PeriodicTaskCallback(uv_timer_t* handle)
677 {
678     Task* task = reinterpret_cast<Task*>(handle->data);
679     if (task == nullptr) {
680         HILOG_DEBUG("taskpool:: the task is nullptr");
681         return;
682     } else if (!task->IsPeriodicTask()) {
683         HILOG_DEBUG("taskpool:: the current task is not a periodic task");
684         return;
685     } else if (task->taskState_ == ExecuteState::CANCELED) {
686         HILOG_DEBUG("taskpool:: the periodic task has been canceled");
687         napi_reference_unref(task->env_, task->taskRef_, nullptr);
688         task->CancelPendingTask(task->env_);
689         uv_timer_stop(task->timer_);
690         ConcurrentHelper::UvHandleClose(task->timer_);
691         return;
692     }
693     TaskManager::GetInstance().IncreaseRefCount(task->taskId_);
694 
695     if (!task->isFirstTaskInfo_) {
696         napi_status status = napi_ok;
697         HandleScope scope(task->env_, status);
698         if (status != napi_ok) {
699             HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
700             return;
701         }
702         napi_value napiTask = NapiHelper::GetReferenceValue(task->env_, task->taskRef_);
703         TaskInfo* taskInfo = task->GetTaskInfo(task->env_, napiTask, task->periodicTaskPriority_);
704         if (taskInfo == nullptr) {
705             HILOG_DEBUG("taskpool:: the periodic task taskInfo is nullptr");
706             return;
707         }
708     }
709     task->isFirstTaskInfo_ = false;
710 
711     task->IncreaseRefCount();
712     HILOG_INFO("taskpool:: PeriodicTaskCallback taskId %{public}s", std::to_string(task->taskId_).c_str());
713     if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED) {
714         task->taskState_ = ExecuteState::WAITING;
715         TaskManager::GetInstance().EnqueueTaskId(task->taskId_, task->periodicTaskPriority_);
716     }
717 }
718 
ExecutePeriodically(napi_env env,napi_callback_info cbinfo)719 napi_value TaskPool::ExecutePeriodically(napi_env env, napi_callback_info cbinfo)
720 {
721     int32_t period = 0;
722     uint32_t priority = Priority::DEFAULT;
723     Task* periodicTask = nullptr;
724     if (!CheckPeriodicallyParams(env, cbinfo, period, priority, periodicTask)) {
725         return nullptr;
726     }
727 
728     if (!periodicTask->CanExecutePeriodically(env)) {
729         return nullptr;
730     }
731     periodicTask->UpdatePeriodicTask();
732 
733     periodicTask->periodicTaskPriority_ = static_cast<Priority>(priority);
734     napi_value napiTask = NapiHelper::GetReferenceValue(env, periodicTask->taskRef_);
735     TaskInfo* taskInfo = periodicTask->GetTaskInfo(env, napiTask, periodicTask->periodicTaskPriority_);
736     if (taskInfo == nullptr) {
737         return nullptr;
738     }
739 
740     periodicTask->isFirstTaskInfo_ = true; // periodic task first Generate TaskInfo
741 
742     TriggerTimer(env, periodicTask, period);
743     return nullptr;
744 }
745 
TriggerTimer(napi_env env,Task * task,int32_t period)746 void TaskPool::TriggerTimer(napi_env env, Task* task, int32_t period)
747 {
748     HILOG_INFO("taskpool::TriggerTimer taskId %{public}s", std::to_string(task->taskId_).c_str());
749     uv_loop_t* loop = NapiHelper::GetLibUV(env);
750     task->timer_ = new uv_timer_t;
751     uv_timer_init(loop, task->timer_);
752     task->timer_->data = task;
753     uv_update_time(loop);
754     uv_timer_start(task->timer_, PeriodicTaskCallback, period, period);
755     NativeEngine* engine = reinterpret_cast<NativeEngine*>(env);
756     if (engine->IsMainThread()) {
757         uv_async_send(&loop->wq_async);
758     }
759 }
760 
CheckDelayedParams(napi_env env,napi_callback_info cbinfo,uint32_t & priority,int32_t & delayTime,Task * & task)761 bool TaskPool::CheckDelayedParams(napi_env env, napi_callback_info cbinfo, uint32_t &priority, int32_t &delayTime,
762                                   Task* &task)
763 {
764     size_t argc = 3; // 3: delayTime, task and priority
765     napi_value args[3]; // 3: delayTime, task and priority
766     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
767     if (argc < 2 || argc > 3) { // 2: delayTime and task 3: delayTime, task and priority
768         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be two or three.");
769         return false;
770     }
771 
772     if (!NapiHelper::IsNumber(env, args[0])) {
773         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be number.");
774         return false;
775     }
776 
777     if (!NapiHelper::IsObject(env, args[1])) {
778         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be object.");
779         return false;
780     }
781 
782     delayTime = NapiHelper::GetInt32Value(env, args[0]);
783     if (delayTime < 0) {
784         ErrorHelper::ThrowError(env, ErrorHelper::ERR_DELAY_TIME_ERROR, "The delayTime is less than zero");
785         return false;
786     }
787 
788     if (argc > 2) { // 2: the params might have priority
789         if (!NapiHelper::IsNumber(env, args[2])) {
790             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the third param must be number.");
791             return false;
792         }
793         priority = NapiHelper::GetUint32Value(env, args[2]); // 2: get task priority
794         if (priority >= Priority::NUMBER) {
795             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value is error.");
796             return false;
797         }
798     }
799 
800     napi_unwrap(env, args[1], reinterpret_cast<void**>(&task));
801     if (task == nullptr) {
802         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of second param must be task");
803         return false;
804     }
805     if (!task->CanExecuteDelayed(env)) {
806         return false;
807     }
808     return true;
809 }
810 
CheckPeriodicallyParams(napi_env env,napi_callback_info cbinfo,int32_t & period,uint32_t & priority,Task * & periodicTask)811 bool TaskPool::CheckPeriodicallyParams(napi_env env, napi_callback_info cbinfo, int32_t &period,
812                                        uint32_t &priority, Task* &periodicTask)
813 {
814     size_t argc = 3; // 3 : period, task, priority
815     napi_value args[3]; // 3 : period, task, priority
816     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
817     if (argc < 2 || argc > 3) { // 2 : period, task and 3 : period, task, priority
818         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be two or three.");
819         return false;
820     }
821     if (!NapiHelper::IsNumber(env, args[0])) {
822         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be number.");
823         return false;
824     }
825     period = NapiHelper::GetInt32Value(env, args[0]);
826     if (period < 0) {
827         ErrorHelper::ThrowError(env, ErrorHelper::ERR_DELAY_TIME_ERROR, "The period value is less than zero.");
828         return false;
829     }
830     if (!NapiHelper::IsObject(env, args[1])) {
831         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be task.");
832         return false;
833     }
834 
835     if (argc >= 3) { // 3 : third param maybe priority
836         if (!NapiHelper::IsNumber(env, args[2])) { // 2 : priority
837             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the third param must be priority.");
838             return false;
839         }
840         priority = NapiHelper::GetUint32Value(env, args[2]); // 2 : priority
841         if (priority >= Priority::NUMBER) {
842             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the value of the priority is invalid.");
843             return false;
844         }
845     }
846 
847     napi_unwrap(env, args[1], reinterpret_cast<void**>(&periodicTask));
848     if (periodicTask == nullptr) {
849         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be task.");
850         return false;
851     }
852 
853     return true;
854 }
855 } // namespace Commonlibrary::Concurrent::TaskPoolModule
856