1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "taskpool.h"
17
18 #include "async_runner_manager.h"
19 #include "helper/hitrace_helper.h"
20 #include "sequence_runner_manager.h"
21 #include "task_group_manager.h"
22
23 namespace Commonlibrary::Concurrent::TaskPoolModule {
24 using namespace Commonlibrary::Concurrent::Common::Helper;
25
InitTaskPool(napi_env env,napi_value exports)26 napi_value TaskPool::InitTaskPool(napi_env env, napi_value exports)
27 {
28 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
29 napi_value taskClass = nullptr;
30 NAPI_CALL(env, napi_define_class(env, "Task", NAPI_AUTO_LENGTH, Task::TaskConstructor,
31 nullptr, 0, nullptr, &taskClass));
32 napi_value longTaskClass = nullptr;
33 NAPI_CALL(env, napi_define_class(env, "LongTask", NAPI_AUTO_LENGTH, Task::LongTaskConstructor,
34 nullptr, 0, nullptr, &longTaskClass));
35 napi_value genericsTaskClass = nullptr;
36 NAPI_CALL(env, napi_define_class(env, "GenericsTask", NAPI_AUTO_LENGTH, Task::TaskConstructor,
37 nullptr, 0, nullptr, &genericsTaskClass));
38 napi_value isCanceledFunc = nullptr;
39 NAPI_CALL(env, napi_create_function(env, "isCanceled", NAPI_AUTO_LENGTH, Task::IsCanceled, NULL, &isCanceledFunc));
40 napi_set_named_property(env, taskClass, "isCanceled", isCanceledFunc);
41 napi_set_named_property(env, longTaskClass, "isCanceled", isCanceledFunc);
42 napi_value sendDataFunc = nullptr;
43 NAPI_CALL(env, napi_create_function(env, "sendData", NAPI_AUTO_LENGTH, Task::SendData, NULL, &sendDataFunc));
44 napi_set_named_property(env, taskClass, "sendData", sendDataFunc);
45 napi_set_named_property(env, longTaskClass, "sendData", sendDataFunc);
46 napi_value taskGroupClass = nullptr;
47 NAPI_CALL(env, napi_define_class(env, "TaskGroup", NAPI_AUTO_LENGTH, TaskGroup::TaskGroupConstructor,
48 nullptr, 0, nullptr, &taskGroupClass));
49 napi_value seqRunnerClass = nullptr;
50 NAPI_CALL(env, napi_define_class(env, "SequenceRunner", NAPI_AUTO_LENGTH, SequenceRunner::SeqRunnerConstructor,
51 nullptr, 0, nullptr, &seqRunnerClass));
52 napi_value asyncRunnerClass = nullptr;
53 NAPI_CALL(env, napi_define_class(env, "AsyncRunner", NAPI_AUTO_LENGTH, AsyncRunner::AsyncRunnerConstructor,
54 nullptr, 0, nullptr, &asyncRunnerClass));
55
56 // define priority
57 napi_value priorityObj = NapiHelper::CreateObject(env);
58 napi_value highPriority = NapiHelper::CreateUint32(env, Priority::HIGH);
59 napi_value mediumPriority = NapiHelper::CreateUint32(env, Priority::MEDIUM);
60 napi_value lowPriority = NapiHelper::CreateUint32(env, Priority::LOW);
61 napi_value idlePriority = NapiHelper::CreateUint32(env, Priority::IDLE);
62 napi_property_descriptor exportPriority[] = {
63 DECLARE_NAPI_PROPERTY("HIGH", highPriority),
64 DECLARE_NAPI_PROPERTY("MEDIUM", mediumPriority),
65 DECLARE_NAPI_PROPERTY("LOW", lowPriority),
66 DECLARE_NAPI_PROPERTY("IDLE", idlePriority),
67 };
68 napi_define_properties(env, priorityObj, sizeof(exportPriority) / sizeof(exportPriority[0]), exportPriority);
69
70 // define State
71 napi_value stateObj = NapiHelper::CreateObject(env);
72 napi_value waitingState = NapiHelper::CreateUint32(env, ExecuteState::WAITING);
73 napi_value runningState = NapiHelper::CreateUint32(env, ExecuteState::RUNNING);
74 napi_value canceledState = NapiHelper::CreateUint32(env, ExecuteState::CANCELED);
75 napi_property_descriptor exportState[] = {
76 DECLARE_NAPI_PROPERTY("WAITING", waitingState),
77 DECLARE_NAPI_PROPERTY("RUNNING", runningState),
78 DECLARE_NAPI_PROPERTY("CANCELED", canceledState),
79 };
80 napi_define_properties(env, stateObj, sizeof(exportState) / sizeof(exportState[0]), exportState);
81
82 napi_property_descriptor properties[] = {
83 DECLARE_NAPI_PROPERTY("Task", taskClass),
84 DECLARE_NAPI_PROPERTY("LongTask", longTaskClass),
85 DECLARE_NAPI_PROPERTY("GenericsTask", genericsTaskClass),
86 DECLARE_NAPI_PROPERTY("TaskGroup", taskGroupClass),
87 DECLARE_NAPI_PROPERTY("SequenceRunner", seqRunnerClass),
88 DECLARE_NAPI_PROPERTY("AsyncRunner", asyncRunnerClass),
89 DECLARE_NAPI_PROPERTY("Priority", priorityObj),
90 DECLARE_NAPI_PROPERTY("State", stateObj),
91 DECLARE_NAPI_FUNCTION("execute", Execute),
92 DECLARE_NAPI_FUNCTION("executeDelayed", ExecuteDelayed),
93 DECLARE_NAPI_FUNCTION("cancel", Cancel),
94 DECLARE_NAPI_FUNCTION("getTaskPoolInfo", GetTaskPoolInfo),
95 DECLARE_NAPI_FUNCTION("terminateTask", TerminateTask),
96 DECLARE_NAPI_FUNCTION("isConcurrent", IsConcurrent),
97 DECLARE_NAPI_FUNCTION("executePeriodically", ExecutePeriodically),
98 };
99 napi_define_properties(env, exports, sizeof(properties) / sizeof(properties[0]), properties);
100
101 TaskManager::GetInstance().InitTaskManager(env);
102 return exports;
103 }
104
105 // ---------------------------------- SendData ---------------------------------------
ExecuteOnReceiveDataCallback(CallbackInfo * callbackInfo,TaskResultInfo * resultInfo)106 void TaskPool::ExecuteOnReceiveDataCallback(CallbackInfo* callbackInfo, TaskResultInfo* resultInfo)
107 {
108 ObjectScope<TaskResultInfo> resultInfoScope(resultInfo, false);
109 napi_status status = napi_ok;
110 std::string traceLabel = "ExecuteOnReceiveDataCallback type: " + callbackInfo->type
111 + ", taskId: " + std::to_string(resultInfo->taskId);
112 HITRACE_HELPER_METER_NAME(traceLabel);
113 auto env = callbackInfo->hostEnv;
114 CallbackScope callbackScope(env, resultInfo, status);
115 if (status != napi_ok) {
116 HILOG_ERROR("napi_open_handle_scope failed");
117 return;
118 }
119 auto func = NapiHelper::GetReferenceValue(env, callbackInfo->callbackRef);
120 napi_value args;
121 napi_value result;
122 status = napi_deserialize(env, resultInfo->serializationArgs, &args);
123 napi_delete_serialization_data(env, resultInfo->serializationArgs);
124 if (status != napi_ok || args == nullptr) {
125 std::string errMessage = "taskpool:: failed to serialize function";
126 HILOG_ERROR("%{public}s in SendData", errMessage.c_str());
127 ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
128 return;
129 }
130 uint32_t argsNum = NapiHelper::GetArrayLength(env, args);
131 napi_value argsArray[argsNum];
132 for (size_t i = 0; i < argsNum; i++) {
133 argsArray[i] = NapiHelper::GetElement(env, args, i);
134 }
135 napi_call_function(env, NapiHelper::GetGlobalObject(env), func, argsNum, argsArray, &result);
136 if (NapiHelper::IsExceptionPending(env)) {
137 napi_value exception = nullptr;
138 napi_get_and_clear_last_exception(env, &exception);
139 HILOG_ERROR("taskpool:: an exception has occurred in napi_call_function");
140 }
141 }
142 // ---------------------------------- SendData ---------------------------------------
143
GetTaskPoolInfo(napi_env env,napi_callback_info cbinfo)144 napi_value TaskPool::GetTaskPoolInfo(napi_env env, [[maybe_unused]] napi_callback_info cbinfo)
145 {
146 napi_value result = nullptr;
147 napi_create_object(env, &result);
148 napi_value threadInfos = TaskManager::GetInstance().GetThreadInfos(env);
149 napi_value taskInfos = TaskManager::GetInstance().GetTaskInfos(env);
150 napi_set_named_property(env, result, "threadInfos", threadInfos);
151 napi_set_named_property(env, result, "taskInfos", taskInfos);
152 return result;
153 }
154
TerminateTask(napi_env env,napi_callback_info cbinfo)155 napi_value TaskPool::TerminateTask(napi_env env, napi_callback_info cbinfo)
156 {
157 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
158 size_t argc = 1; // 1: long task
159 napi_value args[1];
160 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
161 if (argc < 1) {
162 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of the params must be one.");
163 return nullptr;
164 }
165 if (!NapiHelper::IsObject(env, args[0])) {
166 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be object.");
167 return nullptr;
168 }
169 napi_value napiTaskId = NapiHelper::GetNameProperty(env, args[0], TASKID_STR);
170 uint32_t taskId = NapiHelper::GetUint32Value(env, napiTaskId);
171 auto task = TaskManager::GetInstance().GetTask(taskId);
172 if (task == nullptr || !task->IsLongTask()) {
173 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be long task.");
174 return nullptr;
175 }
176 TaskManager::GetInstance().TerminateTask(taskId);
177 return nullptr;
178 }
179
Execute(napi_env env,napi_callback_info cbinfo)180 napi_value TaskPool::Execute(napi_env env, napi_callback_info cbinfo)
181 {
182 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
183 size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
184 if (argc < 1) {
185 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be at least one.");
186 return nullptr;
187 }
188 napi_value* args = new napi_value[argc];
189 ObjectScope<napi_value> scope(args, true);
190 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
191 napi_valuetype type = napi_undefined;
192 napi_typeof(env, args[0], &type);
193 if (type == napi_object) {
194 uint32_t priority = Priority::DEFAULT; // DEFAULT priority is MEDIUM
195 if (argc > 1) {
196 if (!NapiHelper::IsNumber(env, args[1])) {
197 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be number.");
198 return nullptr;
199 }
200 priority = NapiHelper::GetUint32Value(env, args[1]);
201 if (priority >= Priority::NUMBER) {
202 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value is error");
203 return nullptr;
204 }
205 }
206 if (NapiHelper::HasNameProperty(env, args[0], GROUP_ID_STR)) {
207 return ExecuteGroup(env, args[0], static_cast<Priority>(priority));
208 }
209 Task* task = nullptr;
210 napi_unwrap(env, args[0], reinterpret_cast<void**>(&task));
211 if (task == nullptr) {
212 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be task.");
213 return nullptr;
214 }
215 if (!task->CanExecute(env)) {
216 return nullptr;
217 }
218 napi_value promise = task->GetTaskInfoPromise(env, args[0], TaskType::COMMON_TASK,
219 static_cast<Priority>(priority));
220 if (promise == nullptr) {
221 return nullptr;
222 }
223 ExecuteTask(env, task, static_cast<Priority>(priority));
224 return promise;
225 }
226 if (type != napi_function) {
227 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
228 "the type of the first param must be object or function.");
229 return nullptr;
230 }
231 Task* task = Task::GenerateFunctionTask(env, args[0], args + 1, argc - 1, TaskType::FUNCTION_TASK);
232 if (task == nullptr) {
233 HILOG_ERROR("taskpool:: GenerateFunctionTask failed");
234 return nullptr;
235 }
236 napi_value promise = NapiHelper::CreatePromise(env, &task->currentTaskInfo_->deferred);
237 ExecuteTask(env, task);
238 return promise;
239 }
240
DelayTask(uv_timer_t * handle)241 void TaskPool::DelayTask(uv_timer_t* handle)
242 {
243 TaskMessage* taskMessage = static_cast<TaskMessage*>(handle->data);
244 auto task = TaskManager::GetInstance().GetTask(taskMessage->taskId);
245 napi_status status = napi_ok;
246 if (task == nullptr) {
247 HILOG_DEBUG("taskpool:: task is nullptr");
248 } else if (task->taskState_ == ExecuteState::CANCELED) {
249 HILOG_DEBUG("taskpool:: DelayTask task has been canceled");
250 HandleScope scope(task->env_, status);
251 if (status != napi_ok) {
252 HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
253 return;
254 }
255 napi_value error = TaskManager::GetInstance().CancelError(task->env_, 0, "taskpool:: task has been canceled");
256 napi_reject_deferred(task->env_, taskMessage->deferred, error);
257 } else {
258 HILOG_INFO("taskpool:: DelayTask taskId %{public}s", std::to_string(taskMessage->taskId).c_str());
259 HandleScope scope(task->env_, status);
260 if (status != napi_ok) {
261 HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
262 return;
263 }
264 TaskManager::GetInstance().IncreaseSendDataRefCount(taskMessage->taskId);
265 task->IncreaseRefCount();
266 napi_value napiTask = NapiHelper::GetReferenceValue(task->env_, task->taskRef_);
267 TaskInfo* taskInfo = task->GetTaskInfo(task->env_, napiTask, taskMessage->priority);
268 if (taskInfo != nullptr) {
269 taskInfo->deferred = taskMessage->deferred;
270 if (task->taskState_ == ExecuteState::DELAYED || task->taskState_ == ExecuteState::FINISHED) {
271 task->taskState_ = ExecuteState::WAITING;
272 TaskManager::GetInstance().EnqueueTaskId(taskMessage->taskId, Priority(taskMessage->priority));
273 }
274 } else {
275 napi_value execption = nullptr;
276 napi_get_and_clear_last_exception(task->env_, &execption);
277 if (execption != nullptr) {
278 napi_reject_deferred(task->env_, taskMessage->deferred, execption);
279 }
280 }
281 }
282 if (task != nullptr) {
283 std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
284 task->delayedTimers_.erase(handle);
285 }
286 uv_timer_stop(handle);
287 ConcurrentHelper::UvHandleClose(handle);
288 delete taskMessage;
289 taskMessage = nullptr;
290 }
291
ExecuteDelayed(napi_env env,napi_callback_info cbinfo)292 napi_value TaskPool::ExecuteDelayed(napi_env env, napi_callback_info cbinfo)
293 {
294 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
295 uint32_t priority = Priority::DEFAULT; // DEFAULT priority is MEDIUM
296 int32_t delayTime = 0;
297 Task* task = nullptr;
298 if (!CheckDelayedParams(env, cbinfo, priority, delayTime, task)) {
299 return nullptr;
300 }
301
302 task->UpdateTaskStateToDelayed();
303 task->UpdateTaskType(TaskType::COMMON_TASK);
304
305 uv_loop_t* loop = NapiHelper::GetLibUV(env);
306 uv_update_time(loop);
307 uv_timer_t* timer = new uv_timer_t;
308 uv_timer_init(loop, timer);
309 TaskMessage* taskMessage = new TaskMessage();
310 taskMessage->priority = static_cast<Priority>(priority);
311 taskMessage->taskId = task->taskId_;
312 napi_value promise = NapiHelper::CreatePromise(env, &taskMessage->deferred);
313 timer->data = taskMessage;
314
315 std::string strTrace = "ExecuteDelayed: taskId: " + std::to_string(task->taskId_);
316 strTrace += ", priority: " + std::to_string(priority);
317 strTrace += ", delayTime " + std::to_string(delayTime);
318 HITRACE_HELPER_METER_NAME(strTrace);
319 HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
320
321 uv_timer_start(timer, reinterpret_cast<uv_timer_cb>(DelayTask), delayTime, 0);
322 {
323 std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
324 task->delayedTimers_.insert(timer);
325 }
326 NativeEngine* engine = reinterpret_cast<NativeEngine*>(env);
327 if (engine->IsMainThread()) {
328 uv_async_send(&loop->wq_async);
329 }
330 return promise;
331 }
332
ExecuteGroup(napi_env env,napi_value napiTaskGroup,Priority priority)333 napi_value TaskPool::ExecuteGroup(napi_env env, napi_value napiTaskGroup, Priority priority)
334 {
335 napi_value napiGroupId = NapiHelper::GetNameProperty(env, napiTaskGroup, GROUP_ID_STR);
336 uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId);
337 HILOG_INFO("taskpool::ExecuteGroup groupId %{public}s", std::to_string(groupId).c_str());
338 auto taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(groupId);
339 if (taskGroup == nullptr) {
340 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "taskGroup is nullptr.");
341 return nullptr;
342 }
343 napi_reference_ref(env, taskGroup->groupRef_, nullptr);
344 if (taskGroup->groupState_ == ExecuteState::NOT_FOUND || taskGroup->groupState_ == ExecuteState::FINISHED ||
345 taskGroup->groupState_ == ExecuteState::CANCELED) {
346 taskGroup->groupState_ = ExecuteState::WAITING;
347 }
348 GroupInfo* groupInfo = new GroupInfo();
349 groupInfo->priority = priority;
350 napi_value resArr;
351 napi_create_array_with_length(env, taskGroup->taskIds_.size(), &resArr);
352 napi_ref arrRef = NapiHelper::CreateReference(env, resArr, 1);
353 groupInfo->resArr = arrRef;
354 napi_value promise = NapiHelper::CreatePromise(env, &groupInfo->deferred);
355 {
356 std::lock_guard<std::recursive_mutex> lock(taskGroup->taskGroupMutex_);
357 if (taskGroup->taskNum_ == 0) {
358 napi_resolve_deferred(env, groupInfo->deferred, resArr);
359 taskGroup->groupState_ = ExecuteState::FINISHED;
360 napi_delete_reference(env, groupInfo->resArr);
361 napi_reference_unref(env, taskGroup->groupRef_, nullptr);
362 delete groupInfo;
363 taskGroup->currentGroupInfo_ = nullptr;
364 return promise;
365 }
366 if (taskGroup->currentGroupInfo_ == nullptr) {
367 taskGroup->currentGroupInfo_ = groupInfo;
368 for (auto iter = taskGroup->taskRefs_.begin(); iter != taskGroup->taskRefs_.end(); iter++) {
369 napi_value napiTask = NapiHelper::GetReferenceValue(env, *iter);
370 Task* task = nullptr;
371 napi_unwrap(env, napiTask, reinterpret_cast<void**>(&task));
372 if (task == nullptr) {
373 HILOG_ERROR("taskpool::ExecuteGroup task is nullptr");
374 return nullptr;
375 }
376 napi_reference_ref(env, task->taskRef_, nullptr);
377 if (task->IsGroupCommonTask()) {
378 task->GetTaskInfo(env, napiTask, static_cast<Priority>(priority));
379 }
380 ExecuteTask(env, task, static_cast<Priority>(priority));
381 }
382 } else {
383 taskGroup->pendingGroupInfos_.push_back(groupInfo);
384 }
385 }
386 return promise;
387 }
388
HandleTaskResult(Task * task)389 void TaskPool::HandleTaskResult(Task* task)
390 {
391 HILOG_DEBUG("taskpool:: HandleTaskResult task");
392 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
393 if (!task->IsMainThreadTask()) {
394 if (task->ShouldDeleteTask(false)) {
395 delete task;
396 return;
397 }
398 if (task->IsFunctionTask()) {
399 napi_remove_env_cleanup_hook(task->env_, Task::CleanupHookFunc, task);
400 }
401 }
402 task->DecreaseTaskLifecycleCount();
403 HandleTaskResultInner(task);
404 }
405
HandleTaskResultInner(Task * task)406 void TaskPool::HandleTaskResultInner(Task* task)
407 {
408 napi_handle_scope scope = nullptr;
409 NAPI_CALL_RETURN_VOID(task->env_, napi_open_handle_scope(task->env_, &scope));
410 napi_value napiTaskResult = nullptr;
411 napi_status status = napi_deserialize(task->env_, task->result_, &napiTaskResult);
412 napi_delete_serialization_data(task->env_, task->result_);
413
414 bool isCancel = false;
415 RecordTaskResultLog(task, status, napiTaskResult, isCancel);
416
417 if (napiTaskResult == nullptr) {
418 napi_get_undefined(task->env_, &napiTaskResult);
419 }
420 reinterpret_cast<NativeEngine*>(task->env_)->DecreaseSubEnvCounter();
421 bool success = ((status == napi_ok) && (task->taskState_ != ExecuteState::CANCELED)) && (task->success_);
422 task->UpdateTaskStateToEnding();
423 task->isCancelToFinish_ = false;
424 if (task->IsGroupTask()) {
425 UpdateGroupInfoByResult(task->env_, task, napiTaskResult, success);
426 } else if (!task->IsPeriodicTask()) {
427 if (success) {
428 napi_resolve_deferred(task->env_, task->currentTaskInfo_->deferred, napiTaskResult);
429 if (task->onExecutionSucceededCallBackInfo_ != nullptr) {
430 task->ExecuteListenerCallback(task->onExecutionSucceededCallBackInfo_, task->taskId_);
431 }
432 } else {
433 napi_reject_deferred(task->env_, task->currentTaskInfo_->deferred, napiTaskResult);
434 if (task->onExecutionFailedCallBackInfo_ != nullptr) {
435 task->onExecutionFailedCallBackInfo_->taskError_ = napiTaskResult;
436 task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_, task->taskId_);
437 }
438 }
439 }
440 NAPI_CALL_RETURN_VOID(task->env_, napi_close_handle_scope(task->env_, scope));
441 TriggerTask(task, isCancel);
442 }
443
TriggerTask(Task * task,bool isCancel)444 void TaskPool::TriggerTask(Task* task, bool isCancel)
445 {
446 HILOG_DEBUG("taskpool:: task:%{public}s TriggerTask", std::to_string(task->taskId_).c_str());
447 if (task->IsGroupTask()) {
448 return;
449 }
450 TaskManager::GetInstance().DecreaseSendDataRefCount(task->env_, task->taskId_);
451 task->UpdateTaskStateToFinished();
452 // seqRunnerTask will trigger the next
453 if (task->IsSeqRunnerTask()) {
454 if (!SequenceRunnerManager::GetInstance().TriggerSeqRunner(task->env_, task)) {
455 HILOG_WARN("taskpool:: task %{public}s trigger in seqRunner %{public}s failed",
456 std::to_string(task->taskId_).c_str(), std::to_string(task->seqRunnerId_).c_str());
457 }
458 } else if (task->IsCommonTask()) {
459 if (!isCancel) {
460 TaskManager::GetInstance().NotifyDependencyTaskInfo(task->taskId_);
461 }
462 task->NotifyPendingTask();
463 } else if (task->IsAsyncRunnerTask()) {
464 if (!AsyncRunnerManager::GetInstance().TriggerAsyncRunner(task->env_, task)) {
465 HILOG_ERROR("taskpool:: task %{public}s trigger in asyncRunner %{public}s failed",
466 std::to_string(task->taskId_).c_str(), std::to_string(task->asyncRunnerId_).c_str());
467 }
468 }
469 if (task->IsPeriodicTask()) {
470 return;
471 }
472 if (!task->IsFunctionTask()) {
473 napi_reference_unref(task->env_, task->taskRef_, nullptr);
474 return;
475 }
476 // function task need release data
477 task->ReleaseData();
478 TaskManager::GetInstance().RemoveTask(task->taskId_);
479 delete task;
480 }
481
UpdateGroupInfoByResult(napi_env env,Task * task,napi_value res,bool success)482 void TaskPool::UpdateGroupInfoByResult(napi_env env, Task* task, napi_value res, bool success)
483 {
484 HILOG_DEBUG("taskpool:: task:%{public}s UpdateGroupInfoByResult", std::to_string(task->taskId_).c_str());
485 TaskManager::GetInstance().DecreaseSendDataRefCount(task->env_, task->taskId_);
486 task->UpdateTaskStateToFinished();
487 napi_reference_unref(env, task->taskRef_, nullptr);
488 if (task->IsGroupCommonTask()) {
489 delete task->currentTaskInfo_;
490 task->currentTaskInfo_ = nullptr;
491 }
492 TaskGroup* taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(task->groupId_);
493 if (taskGroup == nullptr || taskGroup->currentGroupInfo_ == nullptr) {
494 HILOG_DEBUG("taskpool:: taskGroup may have been released or canceled");
495 return;
496 }
497 // store the result
498 uint32_t index = taskGroup->GetTaskIndex(task->taskId_);
499 auto groupInfo = taskGroup->currentGroupInfo_;
500 napi_ref arrRef = groupInfo->resArr;
501 napi_value resArr = NapiHelper::GetReferenceValue(env, arrRef);
502 napi_set_element(env, resArr, index, res);
503 groupInfo->finishedTaskNum++;
504 // store the index when the first exception occurs
505 if (!success && !groupInfo->HasException()) {
506 groupInfo->SetFailedIndex(index);
507 }
508 // we will not handle the result until all tasks are finished
509 if (groupInfo->finishedTaskNum < taskGroup->taskNum_) {
510 return;
511 }
512 // if there is no exception, just resolve
513 if (!groupInfo->HasException()) {
514 HILOG_INFO("taskpool:: taskGroup perform end, taskGroupId %{public}s", std::to_string(task->groupId_).c_str());
515 napi_resolve_deferred(env, groupInfo->deferred, resArr);
516 for (uint32_t taskId : taskGroup->taskIds_) {
517 auto task = TaskManager::GetInstance().GetTask(taskId);
518 if (task != nullptr && task->onExecutionSucceededCallBackInfo_ != nullptr) {
519 task->ExecuteListenerCallback(task->onExecutionSucceededCallBackInfo_, task->taskId_);
520 }
521 }
522 } else { // LOCV_EXCL_BR_LINE
523 napi_value res = nullptr;
524 napi_get_element(env, resArr, groupInfo->GetFailedIndex(), &res);
525 napi_reject_deferred(env, groupInfo->deferred, res);
526 auto iter = taskGroup->taskIds_.begin();
527 std::advance(iter, groupInfo->GetFailedIndex());
528 auto task = iter != taskGroup->taskIds_.end() ? TaskManager::GetInstance().GetTask(*iter) : nullptr;
529 if (task != nullptr && task->onExecutionFailedCallBackInfo_ != nullptr) {
530 task->onExecutionFailedCallBackInfo_->taskError_ = res;
531 task->ExecuteListenerCallback(task->onExecutionFailedCallBackInfo_, task->taskId_);
532 }
533 }
534 taskGroup->groupState_ = ExecuteState::FINISHED;
535 napi_delete_reference(env, groupInfo->resArr);
536 napi_reference_unref(env, taskGroup->groupRef_, nullptr);
537 delete groupInfo;
538 taskGroup->currentGroupInfo_ = nullptr;
539 taskGroup->NotifyGroupTask(env);
540 }
541
ExecuteTask(napi_env env,Task * task,Priority priority)542 void TaskPool::ExecuteTask(napi_env env, Task* task, Priority priority)
543 {
544 // tag for trace parse: Task Allocation
545 std::string strTrace = "Task Allocation: taskId : " + std::to_string(task->taskId_)
546 + ", priority : " + std::to_string(priority)
547 + ", executeState : " + std::to_string(ExecuteState::WAITING);
548 HITRACE_HELPER_METER_NAME(strTrace);
549 std::string taskLog = "Task Allocation: " + std::to_string(task->taskId_)
550 + ", " + std::to_string(priority);
551 task->IncreaseRefCount();
552 TaskManager::GetInstance().IncreaseSendDataRefCount(task->taskId_);
553 if (task->UpdateTaskStateToWaiting()) {
554 HILOG_TASK_INFO("taskpool:: %{public}s", taskLog.c_str());
555 task->isCancelToFinish_ = false;
556 TaskManager::GetInstance().EnqueueTaskId(task->taskId_, priority);
557 } else {
558 HILOG_WARN("taskpool:: %{public}s, not enqueue", taskLog.c_str());
559 }
560 }
561
Cancel(napi_env env,napi_callback_info cbinfo)562 napi_value TaskPool::Cancel(napi_env env, napi_callback_info cbinfo)
563 {
564 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
565 size_t argc = 1;
566 napi_value args[1];
567 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
568 if (argc < 1) {
569 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of the params must be 1.");
570 return nullptr;
571 }
572
573 if (NapiHelper::IsNumber(env, args[0])) {
574 uint32_t taskId = NapiHelper::GetUint32Value(env, args[0]);
575 TaskManager::GetInstance().CancelTask(env, taskId);
576 return nullptr;
577 }
578
579 if (!NapiHelper::IsObject(env, args[0])) {
580 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be object.");
581 return nullptr;
582 }
583
584 if (!NapiHelper::HasNameProperty(env, args[0], GROUP_ID_STR)) {
585 napi_value napiTaskId = NapiHelper::GetNameProperty(env, args[0], TASKID_STR);
586 if (napiTaskId == nullptr) {
587 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be task.");
588 return nullptr;
589 }
590 uint32_t taskId = NapiHelper::GetUint32Value(env, napiTaskId);
591 TaskManager::GetInstance().CancelTask(env, taskId);
592 } else {
593 napi_value napiGroupId = NapiHelper::GetNameProperty(env, args[0], GROUP_ID_STR);
594 if (napiGroupId == nullptr) {
595 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be taskGroup.");
596 return nullptr;
597 }
598 uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId);
599 TaskGroupManager::GetInstance().CancelGroup(env, groupId);
600 }
601 return nullptr;
602 }
603
IsConcurrent(napi_env env,napi_callback_info cbinfo)604 napi_value TaskPool::IsConcurrent(napi_env env, napi_callback_info cbinfo)
605 {
606 size_t argc = 1;
607 napi_value args[1];
608 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
609 if (argc != 1) {
610 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of the params must be 1.");
611 return nullptr;
612 }
613
614 if (!NapiHelper::IsFunction(env, args[0])) {
615 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be function.");
616 return nullptr;
617 }
618
619 bool isConcurrent = NapiHelper::IsConcurrentFunction(env, args[0]);
620 return NapiHelper::CreateBooleanValue(env, isConcurrent);
621 }
622
PeriodicTaskCallback(uv_timer_t * handle)623 void TaskPool::PeriodicTaskCallback(uv_timer_t* handle)
624 {
625 Task* task = reinterpret_cast<Task*>(handle->data);
626 if (task == nullptr) {
627 HILOG_DEBUG("taskpool:: the task is nullptr");
628 return;
629 } else if (!task->IsPeriodicTask()) {
630 HILOG_DEBUG("taskpool:: the current task is not a periodic task");
631 return;
632 } else if (task->taskState_ == ExecuteState::CANCELED) {
633 if (task->currentTaskInfo_ == nullptr) {
634 HILOG_DEBUG("taskpool:: the periodic task has been canceled");
635 napi_reference_unref(task->env_, task->taskRef_, nullptr);
636 task->CancelPendingTask(task->env_);
637 uv_timer_stop(task->timer_);
638 ConcurrentHelper::UvHandleClose(task->timer_);
639 }
640 return;
641 }
642 TaskManager::GetInstance().IncreaseSendDataRefCount(task->taskId_);
643
644 napi_status status = napi_ok;
645 HandleScope scope(task->env_, status);
646 if (status != napi_ok) {
647 HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
648 return;
649 }
650 napi_value napiTask = NapiHelper::GetReferenceValue(task->env_, task->taskRef_);
651 TaskInfo* taskInfo = task->GetTaskInfo(task->env_, napiTask, task->periodicTaskPriority_);
652 if (taskInfo == nullptr) {
653 HILOG_DEBUG("taskpool:: the periodic task taskInfo is nullptr");
654 return;
655 }
656
657 task->IncreaseRefCount();
658 HILOG_INFO("taskpool:: PeriodicTaskCallback taskId %{public}s", std::to_string(task->taskId_).c_str());
659 if (task->UpdateTaskStateToWaiting()) {
660 TaskManager::GetInstance().EnqueueTaskId(task->taskId_, task->periodicTaskPriority_);
661 }
662 }
663
ExecutePeriodically(napi_env env,napi_callback_info cbinfo)664 napi_value TaskPool::ExecutePeriodically(napi_env env, napi_callback_info cbinfo)
665 {
666 int32_t period = 0;
667 uint32_t priority = Priority::DEFAULT;
668 Task* periodicTask = nullptr;
669 if (!CheckPeriodicallyParams(env, cbinfo, period, priority, periodicTask)) {
670 return nullptr;
671 }
672
673 if (!periodicTask->CanExecutePeriodically(env)) {
674 return nullptr;
675 }
676 periodicTask->UpdatePeriodicTask();
677
678 periodicTask->periodicTaskPriority_ = static_cast<Priority>(priority);
679 napi_value napiTask = NapiHelper::GetReferenceValue(env, periodicTask->taskRef_);
680 auto [func, args, transferList, cloneList] = Task::GetSerializeParams(env, napiTask);
681 if (func == nullptr || args == nullptr) {
682 return nullptr;
683 }
684 std::tuple<napi_value, napi_value, bool, bool> params = {
685 transferList, cloneList, periodicTask->defaultTransfer_, periodicTask->defaultCloneSendable_
686 };
687 auto [serFunction, serArguments] = Task::GetSerializeResult(env, func, args, params);
688 if (serFunction == nullptr || serArguments == nullptr) { // LOCV_EXCL_BR_LINE
689 return nullptr;
690 }
691
692 TriggerTimer(env, periodicTask, period);
693 return nullptr;
694 }
695
TriggerTimer(napi_env env,Task * task,int32_t period)696 void TaskPool::TriggerTimer(napi_env env, Task* task, int32_t period)
697 {
698 HILOG_INFO("taskpool::TriggerTimer taskId %{public}s", std::to_string(task->taskId_).c_str());
699 uv_loop_t* loop = NapiHelper::GetLibUV(env);
700 task->timer_ = new uv_timer_t;
701 uv_timer_init(loop, task->timer_);
702 task->timer_->data = task;
703 uv_update_time(loop);
704 uv_timer_start(task->timer_, PeriodicTaskCallback, period, period);
705 NativeEngine* engine = reinterpret_cast<NativeEngine*>(env);
706 if (engine->IsMainThread()) {
707 uv_async_send(&loop->wq_async);
708 }
709 }
710
CheckDelayedParams(napi_env env,napi_callback_info cbinfo,uint32_t & priority,int32_t & delayTime,Task * & task)711 bool TaskPool::CheckDelayedParams(napi_env env, napi_callback_info cbinfo, uint32_t &priority, int32_t &delayTime,
712 Task* &task)
713 {
714 size_t argc = 3; // 3: delayTime, task and priority
715 napi_value args[3]; // 3: delayTime, task and priority
716 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
717 if (argc < 2 || argc > 3) { // 2: delayTime and task 3: delayTime, task and priority
718 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be two or three.");
719 return false;
720 }
721
722 if (!NapiHelper::IsNumber(env, args[0])) {
723 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be number.");
724 return false;
725 }
726
727 if (!NapiHelper::IsObject(env, args[1])) {
728 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be object.");
729 return false;
730 }
731
732 delayTime = NapiHelper::GetInt32Value(env, args[0]);
733 if (delayTime < 0) {
734 ErrorHelper::ThrowError(env, ErrorHelper::ERR_DELAY_TIME_ERROR, "The delayTime is less than zero");
735 return false;
736 }
737
738 if (argc > 2) { // 2: the params might have priority
739 if (!NapiHelper::IsNumber(env, args[2])) {
740 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the third param must be number.");
741 return false;
742 }
743 priority = NapiHelper::GetUint32Value(env, args[2]); // 2: get task priority
744 if (priority >= Priority::NUMBER) {
745 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "priority value is error.");
746 return false;
747 }
748 }
749
750 napi_unwrap(env, args[1], reinterpret_cast<void**>(&task));
751 if (task == nullptr) {
752 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of second param must be task");
753 return false;
754 }
755 if (!task->CanExecuteDelayed(env)) {
756 return false;
757 }
758 return true;
759 }
760
CheckPeriodicallyParams(napi_env env,napi_callback_info cbinfo,int32_t & period,uint32_t & priority,Task * & periodicTask)761 bool TaskPool::CheckPeriodicallyParams(napi_env env, napi_callback_info cbinfo, int32_t &period,
762 uint32_t &priority, Task* &periodicTask)
763 {
764 size_t argc = 3; // 3 : period, task, priority
765 napi_value args[3]; // 3 : period, task, priority
766 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
767 if (argc < 2 || argc > 3) { // 2 : period, task and 3 : period, task, priority
768 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be two or three.");
769 return false;
770 }
771 if (!NapiHelper::IsNumber(env, args[0])) {
772 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be number.");
773 return false;
774 }
775 period = NapiHelper::GetInt32Value(env, args[0]);
776 if (period < 0) {
777 ErrorHelper::ThrowError(env, ErrorHelper::ERR_DELAY_TIME_ERROR, "The period value is less than zero.");
778 return false;
779 }
780 if (!NapiHelper::IsObject(env, args[1])) {
781 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be task.");
782 return false;
783 }
784
785 if (argc >= 3) { // 3 : third param maybe priority
786 if (!NapiHelper::IsNumber(env, args[2])) { // 2 : priority
787 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the third param must be priority.");
788 return false;
789 }
790 priority = NapiHelper::GetUint32Value(env, args[2]); // 2 : priority
791 if (priority >= Priority::NUMBER) {
792 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the value of the priority is invalid.");
793 return false;
794 }
795 }
796
797 napi_unwrap(env, args[1], reinterpret_cast<void**>(&periodicTask));
798 if (periodicTask == nullptr) {
799 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the second param must be task.");
800 return false;
801 }
802
803 return true;
804 }
805
RecordTaskResultLog(Task * task,napi_status status,napi_value & napiTaskResult,bool & isCancel)806 void TaskPool::RecordTaskResultLog(Task* task, napi_status status, napi_value& napiTaskResult, bool& isCancel)
807 {
808 // tag for trace parse: Task PerformTask End
809 std::string strTrace = "Task PerformTask End: taskId : " + std::to_string(task->taskId_);
810 std::string taskLog = "Task PerformTask End: " + std::to_string(task->taskId_);
811 if (task->taskState_ == ExecuteState::CANCELED) {
812 strTrace += ", performResult : IsCanceled";
813 napiTaskResult = task->IsAsyncRunnerTask() ? TaskManager::GetInstance().CancelError(task->env_,
814 ErrorHelper::ERR_ASYNCRUNNER_TASK_CANCELED, nullptr, napiTaskResult, task->success_) :
815 TaskManager::GetInstance().CancelError(task->env_, 0, nullptr, napiTaskResult, task->success_);
816 isCancel = true;
817 } else if (status != napi_ok) {
818 strTrace += ", performResult : DeserializeFailed";
819 taskLog += ", DeserializeFailed";
820 } else if (task->success_) {
821 strTrace += ", performResult : Successful";
822 } else {
823 strTrace += ", performResult : Unsuccessful";
824 taskLog += ", Unsuccessful";
825 }
826 HITRACE_HELPER_METER_NAME(strTrace);
827 HILOG_TASK_INFO("taskpool:: %{public}s", taskLog.c_str());
828 }
829 } // namespace Commonlibrary::Concurrent::TaskPoolModule
830