1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "task.h"
17
18 #include "async_runner_manager.h"
19 #include "helper/concurrent_helper.h"
20 #include "helper/hitrace_helper.h"
21 #include "sequence_runner_manager.h"
22 #include "taskpool.h"
23 #include "worker.h"
24
25 namespace Commonlibrary::Concurrent::TaskPoolModule {
// Property names under which GenerateTask registers the matching native
// methods (OnReceiveData, SetTransferList, SetCloneList, OnEnqueued, ...)
// on a JS task object.
static constexpr char ONRECEIVEDATA_STR[] = "onReceiveData";
static constexpr char SETTRANSFERLIST_STR[] = "setTransferList";
static constexpr char SET_CLONE_LIST_STR[] = "setCloneList";
static constexpr char ONENQUEUED_STR[] = "onEnqueued";
static constexpr char ONSTARTEXECUTION_STR[] = "onStartExecution";
static constexpr char ONEXECUTIONFAILED_STR[] = "onExecutionFailed";
static constexpr char ONEXECUTIONSUCCEEDED_STR[] = "onExecutionSucceeded";
static constexpr char ISDONE_STR[] = "isDone";
34
35 using namespace Commonlibrary::Concurrent::Common::Helper;
36
Task(napi_env env,TaskType taskType,std::string name)37 Task::Task(napi_env env, TaskType taskType, std::string name) : env_(env), taskType_(taskType), name_(name) {}
38
TaskConstructor(napi_env env,napi_callback_info cbinfo)39 napi_value Task::TaskConstructor(napi_env env, napi_callback_info cbinfo)
40 {
41 // check argv count
42 size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
43 std::string errMessage = "";
44 if (argc < 1) {
45 errMessage = "taskpool:: create task need more than one param";
46 HILOG_ERROR("%{public}s", errMessage.c_str());
47 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
48 return nullptr;
49 }
50 napi_value* args = new napi_value[argc];
51 ObjectScope<napi_value> scope(args, true);
52 napi_value thisVar = nullptr;
53 napi_value func = nullptr;
54 napi_value name = nullptr;
55 napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
56 // if the first is task name, the second might be func
57 if (argc > 1 && NapiHelper::IsString(env, args[0])) {
58 name = args[0];
59 func = args[1];
60 args += 2; // 2: name and func
61 argc -= 2; // 2: name and func
62 } else {
63 func = args[0];
64 args += 1; // 1: func
65 argc -= 1; // 1: func
66 }
67 if (!NapiHelper::IsFunction(env, func)) {
68 errMessage = "taskpool:: the first or second param of task must be function";
69 HILOG_ERROR("%{public}s", errMessage.c_str());
70 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
71 "the type of the first or second param of task must be function.");
72 return nullptr;
73 }
74
75 Task* task = GenerateTask(env, thisVar, func, name, args, argc);
76 napi_status status = napi_wrap(env, thisVar, task, TaskDestructor, nullptr, nullptr);
77 if (status != napi_ok) {
78 HILOG_ERROR("taskpool::TaskConstructor napi_wrap return value is %{public}d", status);
79 TaskManager::GetInstance().RemoveTask(task->taskId_);
80 delete task;
81 task = nullptr;
82 return nullptr;
83 }
84 napi_create_reference(env, thisVar, 0, &task->taskRef_);
85 if (!task->IsMainThreadTask()) {
86 napi_add_env_cleanup_hook(env, Task::CleanupHookFunc, task);
87 }
88 return thisVar;
89 }
90
LongTaskConstructor(napi_env env,napi_callback_info cbinfo)91 napi_value Task::LongTaskConstructor(napi_env env, napi_callback_info cbinfo)
92 {
93 auto thisVar = TaskConstructor(env, cbinfo);
94 if (thisVar == nullptr) {
95 return nullptr;
96 }
97 Task* task;
98 napi_unwrap(env, thisVar, reinterpret_cast<void**>(&task));
99 task->isLongTask_ = true;
100 return thisVar;
101 }
102
TaskDestructor(napi_env env,void * data,void * hint)103 void Task::TaskDestructor(napi_env env, void* data, [[maybe_unused]] void* hint)
104 {
105 Task* task = static_cast<Task*>(data);
106 HILOG_DEBUG("taskpool:: taskId:%{public}s TaskDestructor", std::to_string(task->taskId_).c_str());
107 if (!task->IsMainThreadTask()) {
108 napi_remove_env_cleanup_hook(env, Task::CleanupHookFunc, task);
109 }
110 // for performance, do not lock first
111 if (task->IsMainThreadTask() || task->refCount_ == 0) {
112 TaskManager::GetInstance().ReleaseTaskData(env, task);
113 napi_delete_reference(env, task->taskRef_);
114 delete task;
115 return;
116 }
117 bool shouldDelete = false;
118 {
119 std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
120 task->SetValid(false);
121 if (task->refCount_ == 0) {
122 shouldDelete = true;
123 }
124 TaskManager::GetInstance().ReleaseTaskData(env, task, shouldDelete);
125 napi_delete_reference(env, task->taskRef_);
126 }
127 if (shouldDelete) {
128 delete task;
129 }
130 }
131
CleanupHookFunc(void * arg)132 void Task::CleanupHookFunc(void* arg)
133 {
134 if (arg == nullptr) {
135 HILOG_ERROR("taskpool:: cleanupHook arg is nullptr");
136 return;
137 }
138 Task* task = static_cast<Task*>(arg);
139 {
140 std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
141 ConcurrentHelper::UvHandleClose(task->onResultSignal_);
142 ConcurrentHelper::UvHandleClose(task->onStartCancelSignal_);
143 ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_);
144 ConcurrentHelper::UvHandleClose(task->onStartDiscardSignal_);
145 if (task->IsFunctionTask()) {
146 task->SetValid(false);
147 }
148 }
149 if (task->IsAsyncRunnerTask()) {
150 AsyncRunnerManager::GetInstance().RemoveWaitingTask(task);
151 }
152 if (task->IsSeqRunnerTask()) {
153 SequenceRunnerManager::GetInstance().RemoveWaitingTask(task);
154 }
155 }
156
Cancel(const uv_async_t * req)157 void Task::Cancel(const uv_async_t* req)
158 {
159 auto message = static_cast<CancelTaskMessage*>(req->data);
160 if (message == nullptr) {
161 HILOG_DEBUG("taskpool:: cancel message is nullptr");
162 return;
163 }
164 Task* task = TaskManager::GetInstance().GetTask(message->taskId);
165 if (task == nullptr) {
166 HILOG_DEBUG("taskpool:: cancel task is nullptr");
167 CloseHelp::DeletePointer(message, false);
168 return;
169 }
170 napi_status status = napi_ok;
171 HandleScope scope(task->env_, status);
172 if (status != napi_ok) {
173 HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
174 CloseHelp::DeletePointer(message, false);
175 return;
176 }
177 task->CancelInner(message->state);
178 CloseHelp::DeletePointer(message, false);
179 }
180
GenerateTask(napi_env env,napi_value napiTask,napi_value func,napi_value name,napi_value * args,size_t argc)181 Task* Task::GenerateTask(napi_env env, napi_value napiTask, napi_value func,
182 napi_value name, napi_value* args, size_t argc)
183 {
184 HILOG_DEBUG("taskpool:: task GenerateTask");
185 napi_value argsArray = NapiHelper::CreateArrayWithLength(env, argc);
186 for (size_t i = 0; i < argc; i++) {
187 napi_set_element(env, argsArray, i, args[i]);
188 }
189 if (name == nullptr) {
190 name = NapiHelper::GetNameProperty(env, func, NAME);
191 }
192 char* nameStr = NapiHelper::GetChars(env, name);
193 Task* task = new Task(env, TaskType::TASK, nameStr);
194 delete[] nameStr;
195 TaskManager::GetInstance().StoreTask(task);
196 task->InitHandle(env);
197
198 napi_value taskId = NapiHelper::CreateUint32(env, task->taskId_);
199 napi_set_named_property(env, napiTask, FUNCTION_STR, func);
200 napi_set_named_property(env, napiTask, TASKID_STR, taskId);
201 napi_set_named_property(env, napiTask, ARGUMENTS_STR, argsArray);
202 napi_property_descriptor properties[] = {
203 DECLARE_NAPI_FUNCTION(SETTRANSFERLIST_STR, SetTransferList),
204 DECLARE_NAPI_FUNCTION(SET_CLONE_LIST_STR, SetCloneList),
205 DECLARE_NAPI_FUNCTION(ONRECEIVEDATA_STR, OnReceiveData),
206 DECLARE_NAPI_FUNCTION(ADD_DEPENDENCY_STR, AddDependency),
207 DECLARE_NAPI_FUNCTION(REMOVE_DEPENDENCY_STR, RemoveDependency),
208 DECLARE_NAPI_FUNCTION(ONENQUEUED_STR, OnEnqueued),
209 DECLARE_NAPI_FUNCTION(ONSTARTEXECUTION_STR, OnStartExecution),
210 DECLARE_NAPI_FUNCTION(ONEXECUTIONFAILED_STR, OnExecutionFailed),
211 DECLARE_NAPI_FUNCTION(ONEXECUTIONSUCCEEDED_STR, OnExecutionSucceeded),
212 DECLARE_NAPI_FUNCTION(ISDONE_STR, IsDone),
213 DECLARE_NAPI_GETTER(TASK_TOTAL_TIME, GetTotalDuration),
214 DECLARE_NAPI_GETTER(TASK_CPU_TIME, GetCPUDuration),
215 DECLARE_NAPI_GETTER(TASK_IO_TIME, GetIODuration),
216 DECLARE_NAPI_GETTER(NAME, GetName),
217 DECLARE_NAPI_GETTER(TASKID_STR, GetTaskId)
218 };
219 napi_define_properties(env, napiTask, sizeof(properties) / sizeof(properties[0]), properties);
220 return task;
221 }
222
GenerateFunctionTask(napi_env env,napi_value func,napi_value * args,size_t argc,TaskType type)223 Task* Task::GenerateFunctionTask(napi_env env, napi_value func, napi_value* args, size_t argc, TaskType type)
224 {
225 HILOG_DEBUG("taskpool:: task GenerateFunctionTask");
226 napi_value argsArray;
227 napi_create_array_with_length(env, argc, &argsArray);
228 for (size_t i = 0; i < argc; i++) {
229 napi_set_element(env, argsArray, i, args[i]);
230 }
231 napi_value undefined = NapiHelper::GetUndefinedValue(env);
232 TaskInfo* taskInfo = GenerateTaskInfo(env, func, argsArray, undefined, undefined, Priority::DEFAULT);
233 if (taskInfo == nullptr) {
234 HILOG_ERROR("taskpool:: task GenerateFunctionTask end, taskInfo is nullptr");
235 return nullptr;
236 }
237 napi_value napiFuncName = NapiHelper::GetNameProperty(env, func, NAME);
238 char* nameStr = NapiHelper::GetChars(env, napiFuncName);
239 Task* task = new Task(env, type, nameStr);
240 delete[] nameStr;
241 task->currentTaskInfo_ = taskInfo;
242 task->InitHandle(env);
243 if (!task->IsMainThreadTask()) {
244 napi_add_env_cleanup_hook(env, CleanupHookFunc, task);
245 }
246 TaskManager::GetInstance().StoreTask(task);
247 return task;
248 }
249
GetTaskInfoPromise(napi_env env,napi_value task,TaskType taskType,Priority priority)250 napi_value Task::GetTaskInfoPromise(napi_env env, napi_value task, TaskType taskType, Priority priority)
251 {
252 TaskInfo* taskInfo = GetTaskInfo(env, task, priority);
253 if (taskInfo == nullptr) {
254 return nullptr;
255 }
256 UpdateTaskType(taskType);
257 return NapiHelper::CreatePromise(env, &taskInfo->deferred);
258 }
259
GetTaskInfo(napi_env env,napi_value napiTask,Priority priority)260 TaskInfo* Task::GetTaskInfo(napi_env env, napi_value napiTask, Priority priority)
261 {
262 napi_value func = NapiHelper::GetNameProperty(env, napiTask, FUNCTION_STR);
263 napi_value args = NapiHelper::GetNameProperty(env, napiTask, ARGUMENTS_STR);
264 if (func == nullptr || args == nullptr) {
265 std::string errMessage = "taskpool:: task value is error";
266 HILOG_ERROR("%{public}s", errMessage.c_str());
267 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
268 return nullptr;
269 }
270 napi_value transferList = NapiHelper::GetUndefinedValue(env);
271 if (NapiHelper::HasNameProperty(env, napiTask, TRANSFERLIST_STR)) {
272 transferList = NapiHelper::GetNameProperty(env, napiTask, TRANSFERLIST_STR);
273 }
274 napi_value cloneList = NapiHelper::GetUndefinedValue(env);
275 if (NapiHelper::HasNameProperty(env, napiTask, CLONE_LIST_STR)) {
276 cloneList = NapiHelper::GetNameProperty(env, napiTask, CLONE_LIST_STR);
277 }
278 TaskInfo* pendingInfo = GenerateTaskInfo(env, func, args, transferList, cloneList, priority,
279 defaultTransfer_, defaultCloneSendable_);
280 if (pendingInfo == nullptr) {
281 return nullptr;
282 }
283 {
284 std::lock_guard<std::recursive_mutex> lock(taskMutex_);
285 if (currentTaskInfo_ == nullptr) {
286 currentTaskInfo_ = pendingInfo;
287 } else {
288 pendingTaskInfos_.push_back(pendingInfo);
289 }
290 }
291 if (name_.empty()) {
292 napi_value funcName = NapiHelper::GetNameProperty(env, func, NAME);
293 name_ = NapiHelper::GetString(env, funcName);
294 }
295 return pendingInfo;
296 }
297
SetTransferList(napi_env env,napi_callback_info cbinfo)298 napi_value Task::SetTransferList(napi_env env, napi_callback_info cbinfo)
299 {
300 size_t argc = 1;
301 napi_value args[1];
302 napi_value thisVar;
303 napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
304 // Check whether clone list has been set
305 if (NapiHelper::HasNameProperty(env, thisVar, CLONE_LIST_STR)) {
306 ErrorHelper::ThrowError(env, ErrorHelper::ERR_IN_BOTH_CLONE_AND_TRANSFER);
307 return nullptr;
308 }
309 if (argc > 1) {
310 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
311 "the number of setTransferList parma must be less than 2.");
312 return nullptr;
313 }
314 Task* task = nullptr;
315 napi_unwrap(env, thisVar, reinterpret_cast<void**>(&task));
316 if (task == nullptr) {
317 HILOG_ERROR("taskpool:: task is nullptr");
318 return nullptr;
319 }
320 napi_value undefined = NapiHelper::GetUndefinedValue(env);
321 napi_value falseVal = NapiHelper::CreateBooleanValue(env, false);
322 if (argc == 0) {
323 HILOG_DEBUG("taskpool:: set task params not transfer");
324 napi_set_named_property(env, thisVar, TRANSFERLIST_STR, undefined);
325 // set task.defaultTransfer false
326 task->defaultTransfer_ = false;
327 return nullptr;
328 }
329 if (!NapiHelper::IsArray(env, args[0])) {
330 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
331 "the type of setTransferList first param must be array.");
332 return nullptr;
333 }
334 // set task.defaultTransfer false
335 task->defaultTransfer_ = false;
336 uint32_t arrayLength = NapiHelper::GetArrayLength(env, args[0]);
337 if (arrayLength == 0) {
338 HILOG_DEBUG("taskpool:: set task params not transfer");
339 napi_set_named_property(env, thisVar, TRANSFERLIST_STR, undefined);
340 return nullptr;
341 }
342 for (size_t i = 0; i < arrayLength; i++) {
343 napi_value transferVal = NapiHelper::GetElement(env, args[0], i);
344 if (!NapiHelper::IsArrayBuffer(env, transferVal)) {
345 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
346 "the type of the element in array must be arraybuffer.");
347 return nullptr;
348 }
349 }
350 HILOG_DEBUG("taskpool:: check setTransferList param success");
351 napi_set_named_property(env, thisVar, TRANSFERLIST_STR, args[0]);
352 return nullptr;
353 }
354
SetCloneList(napi_env env,napi_callback_info cbinfo)355 napi_value Task::SetCloneList(napi_env env, napi_callback_info cbinfo)
356 {
357 size_t argc = 1;
358 napi_value args[1];
359 napi_value thisVar;
360 napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
361 // Check whether transfer list has been set
362 if (NapiHelper::HasNameProperty(env, thisVar, TRANSFERLIST_STR)) {
363 ErrorHelper::ThrowError(env, ErrorHelper::ERR_IN_BOTH_CLONE_AND_TRANSFER);
364 return nullptr;
365 }
366 if (argc != 1) {
367 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of setCloneList parma must be 1.");
368 return nullptr;
369 }
370 if (!NapiHelper::IsArray(env, args[0])) {
371 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of setCloneList first param must be array.");
372 return nullptr;
373 }
374 Task* task = nullptr;
375 napi_unwrap(env, thisVar, reinterpret_cast<void**>(&task));
376 if (task == nullptr) {
377 HILOG_ERROR("taskpool:: task is nullptr");
378 return nullptr;
379 }
380 napi_value undefined = NapiHelper::GetUndefinedValue(env);
381 uint32_t arrayLength = NapiHelper::GetArrayLength(env, args[0]);
382 if (arrayLength == 0) {
383 HILOG_DEBUG("taskpool:: clone list is empty");
384 napi_set_named_property(env, thisVar, CLONE_LIST_STR, undefined);
385 return nullptr;
386 }
387 for (size_t i = 0; i < arrayLength; i++) {
388 napi_value cloneVal = NapiHelper::GetElement(env, args[0], i);
389 if (NapiHelper::IsBitVector(env, cloneVal)) {
390 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "BitVector not support setCloneList.");
391 return nullptr;
392 }
393 if (!NapiHelper::IsArrayBuffer(env, cloneVal) && !NapiHelper::IsSendable(env, cloneVal)) {
394 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
395 "the type of setCloneList elements in array must be arraybuffer or sendable.");
396 return nullptr;
397 }
398 }
399 napi_set_named_property(env, thisVar, CLONE_LIST_STR, args[0]);
400 return nullptr;
401 }
402
IsCanceled(napi_env env,napi_callback_info cbinfo)403 napi_value Task::IsCanceled(napi_env env, napi_callback_info cbinfo)
404 {
405 bool isCanceled = false;
406 auto engine = reinterpret_cast<NativeEngine*>(env);
407 if (!engine->IsTaskPoolThread()) {
408 HILOG_ERROR("taskpool:: call isCanceled not in taskpool thread");
409 return NapiHelper::CreateBooleanValue(env, isCanceled);
410 }
411 // Get task and query task cancel state
412 void* data = engine->GetCurrentTaskInfo();
413 if (data == nullptr) {
414 HILOG_ERROR("taskpool:: call isCanceled not in Concurrent function");
415 } else {
416 Task* task = static_cast<Task*>(data);
417 isCanceled = task->taskState_ == ExecuteState::CANCELED ? true : false;
418 }
419 return NapiHelper::CreateBooleanValue(env, isCanceled);
420 }
421
OnReceiveData(napi_env env,napi_callback_info cbinfo)422 napi_value Task::OnReceiveData(napi_env env, napi_callback_info cbinfo)
423 {
424 size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
425 if (argc >= 2) { // 2: the number of parmas
426 HILOG_ERROR("taskpool:: the number of OnReceiveData parma must be less than 2");
427 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
428 "the number of OnReceiveData parma must be less than 2.");
429 return nullptr;
430 }
431
432 napi_value thisVar;
433 if (argc == 0) {
434 HILOG_INFO("taskpool:: Set taskpool.Task.onReceiveData to undefined");
435 napi_get_cb_info(env, cbinfo, &argc, nullptr, &thisVar, nullptr);
436 napi_value id = NapiHelper::GetNameProperty(env, thisVar, "taskId");
437 uint32_t taskId = NapiHelper::GetUint32Value(env, id);
438 TaskManager::GetInstance().RegisterCallback(env, taskId, nullptr);
439 return nullptr;
440 }
441
442 napi_value args[1];
443 napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
444 napi_valuetype type;
445 NAPI_CALL(env, napi_typeof(env, args[0], &type));
446 if (type != napi_function) {
447 HILOG_ERROR("taskpool:: OnReceiveData's parameter should be function");
448 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
449 "the type of onReceiveData's parameter must be function.");
450 return nullptr;
451 }
452 // store callbackInfo
453 napi_value napiTaskId = NapiHelper::GetNameProperty(env, thisVar, "taskId");
454 uint32_t taskId = NapiHelper::GetUint32Value(env, napiTaskId);
455 auto task = TaskManager::GetInstance().GetTask(taskId);
456 if (task == nullptr) {
457 HILOG_ERROR("taskpool:: OnReceiveData's task is nullptr");
458 return nullptr;
459 }
460 napi_ref callbackRef = Helper::NapiHelper::CreateReference(env, args[0], 1);
461 std::shared_ptr<CallbackInfo> callbackInfo = std::make_shared<CallbackInfo>(env, 1, callbackRef, task);
462 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
463 if (!task->IsMainThreadTask()) {
464 auto loop = NapiHelper::GetLibUV(env);
465 ConcurrentHelper::UvHandleInit(loop, callbackInfo->onCallbackSignal, TaskPool::ExecuteCallback);
466 }
467 #else
468 auto loop = NapiHelper::GetLibUV(env);
469 ConcurrentHelper::UvHandleInit(loop, callbackInfo->onCallbackSignal, TaskPool::ExecuteCallback);
470 #endif
471 TaskManager::GetInstance().RegisterCallback(env, taskId, callbackInfo);
472 return nullptr;
473 }
474
SendData(napi_env env,napi_callback_info cbinfo)475 napi_value Task::SendData(napi_env env, napi_callback_info cbinfo)
476 {
477 size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
478 napi_value args[argc];
479 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
480
481 napi_value argsArray;
482 napi_create_array_with_length(env, argc, &argsArray);
483 for (size_t i = 0; i < argc; i++) {
484 napi_set_element(env, argsArray, i, args[i]);
485 }
486
487 auto engine = reinterpret_cast<NativeEngine*>(env);
488 if (!engine->IsTaskPoolThread()) {
489 HILOG_ERROR("taskpool:: SendData is not called in the taskpool thread");
490 ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_IN_TASKPOOL_THREAD);
491 return nullptr;
492 }
493 Task* task = nullptr;
494 void* data = engine->GetCurrentTaskInfo();
495 if (data == nullptr) {
496 HILOG_ERROR("taskpool:: SendData is not called in the concurrent function");
497 ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_IN_CONCURRENT_FUNCTION);
498 return nullptr;
499 } else {
500 task = static_cast<Task*>(data);
501 }
502
503 napi_value undefined = NapiHelper::GetUndefinedValue(env);
504 void* serializationArgs = nullptr;
505 bool defaultClone = false;
506 bool defaultTransfer = true;
507 napi_status status = napi_serialize_inner(env, argsArray, undefined, undefined,
508 defaultTransfer, defaultClone, &serializationArgs);
509 if (status != napi_ok || serializationArgs == nullptr) {
510 std::string errMessage = "taskpool:: failed to serialize function";
511 HILOG_ERROR("%{public}s in SendData", errMessage.c_str());
512 ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
513 return nullptr;
514 }
515
516 TaskResultInfo* resultInfo = new TaskResultInfo(task->env_, env, task->taskId_, serializationArgs);
517 return TaskManager::GetInstance().NotifyCallbackExecute(env, resultInfo, task);
518 }
519
AddDependency(napi_env env,napi_callback_info cbinfo)520 napi_value Task::AddDependency(napi_env env, napi_callback_info cbinfo)
521 {
522 size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
523 if (argc == 0) {
524 std::string errMessage = "taskpool:: addDependency has no params";
525 HILOG_ERROR("%{public}s", errMessage.c_str());
526 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "addDependency has no params.");
527 return nullptr;
528 }
529
530 napi_status status = napi_ok;
531 HandleScope scope(env, status);
532 napi_value args[argc];
533 napi_value napiTask;
534 napi_get_cb_info(env, cbinfo, &argc, args, &napiTask, nullptr);
535 Task* task = nullptr;
536 napi_unwrap(env, napiTask, reinterpret_cast<void**>(&task));
537 if (task == nullptr) {
538 HILOG_ERROR("taskpool:: task is nullptr");
539 return nullptr;
540 }
541 std::string errMessage = "";
542 if (task->IsPeriodicTask()) {
543 HILOG_ERROR("taskpool:: the periodic task cannot have a dependency");
544 ErrorHelper::ThrowError(env, ErrorHelper::ERR_TASK_HAVE_DEPENDENCY);
545 return nullptr;
546 }
547 if (task->IsCommonTask() || task->IsSeqRunnerTask()) {
548 errMessage = "taskpool:: seqRunnerTask or executedTask cannot addDependency";
549 HILOG_ERROR("%{public}s", errMessage.c_str());
550 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
551 return nullptr;
552 }
553 if (task->IsAsyncRunnerTask()) {
554 HILOG_ERROR("taskpool:: AsyncRunnerTask cannot addDependency.");
555 ErrorHelper::ThrowError(env, ErrorHelper::ERR_ASYNCRUNNER_TASK_HAVE_DEPENDENCY);
556 return nullptr;
557 }
558 if (task->IsGroupCommonTask()) {
559 errMessage = "taskpool:: groupTask cannot addDependency";
560 HILOG_ERROR("%{public}s", errMessage.c_str());
561 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
562 return nullptr;
563 }
564 task->SetHasDependency(true);
565 std::set<uint32_t> idSet;
566 for (size_t i = 0; i < argc; i++) {
567 if (!NapiHelper::HasNameProperty(env, args[i], TASKID_STR)) {
568 errMessage = "taskpool:: addDependency param is not task";
569 HILOG_ERROR("%{public}s", errMessage.c_str());
570 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the addDependency param must be task.");
571 return nullptr;
572 } else {
573 Task* dependentTask = nullptr;
574 napi_unwrap(env, args[i], reinterpret_cast<void**>(&dependentTask));
575 if (dependentTask == nullptr) {
576 HILOG_ERROR("taskpool:: dependentTask is nullptr");
577 return nullptr;
578 }
579 if (dependentTask->taskId_ == task->taskId_) {
580 HILOG_ERROR("taskpool:: there is a circular dependency");
581 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CIRCULAR_DEPENDENCY);
582 return nullptr;
583 }
584 if (dependentTask->IsPeriodicTask()) {
585 HILOG_ERROR("taskpool:: the periodic task cannot have a dependency");
586 ErrorHelper::ThrowError(env, ErrorHelper::ERR_TASK_HAVE_DEPENDENCY);
587 return nullptr;
588 }
589 if (dependentTask->IsCommonTask() || dependentTask->IsSeqRunnerTask()) {
590 errMessage = "taskpool:: seqRunnerTask or executedTask cannot be relied on";
591 HILOG_ERROR("%{public}s", errMessage.c_str());
592 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
593 return nullptr;
594 }
595 if (dependentTask->IsAsyncRunnerTask()) {
596 HILOG_ERROR("taskpool:: AsyncRunnerTask cannot be relied on.");
597 ErrorHelper::ThrowError(env, ErrorHelper::ERR_ASYNCRUNNER_TASK_HAVE_DEPENDENCY);
598 return nullptr;
599 }
600 if (dependentTask->IsGroupCommonTask()) {
601 errMessage = "taskpool:: groupTask cannot be relied on";
602 HILOG_ERROR("%{public}s", errMessage.c_str());
603 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
604 return nullptr;
605 }
606 idSet.emplace(dependentTask->taskId_);
607 dependentTask->SetHasDependency(true);
608 }
609 }
610 if (!TaskManager::GetInstance().StoreTaskDependency(task->taskId_, idSet)) {
611 HILOG_ERROR("taskpool:: there is a circular dependency");
612 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CIRCULAR_DEPENDENCY);
613 }
614 std::string strTrace = "Task::AddDependency: ";
615 HITRACE_HELPER_METER_NAME(strTrace + TaskManager::GetInstance().GetTaskDependInfoToString(task->taskId_));
616 return nullptr;
617 }
618
RemoveDependency(napi_env env,napi_callback_info cbinfo)619 napi_value Task::RemoveDependency(napi_env env, napi_callback_info cbinfo)
620 {
621 size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
622 if (argc == 0) {
623 std::string errMessage = "taskpool:: removeDependency has no params";
624 HILOG_ERROR("%{public}s", errMessage.c_str());
625 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "removeDependency has no params.");
626 return nullptr;
627 }
628 napi_status status = napi_ok;
629 HandleScope scope(env, status);
630 napi_value args[argc];
631 napi_value napiTask;
632 napi_get_cb_info(env, cbinfo, &argc, args, &napiTask, nullptr);
633 Task* task = nullptr;
634 napi_unwrap(env, napiTask, reinterpret_cast<void**>(&task));
635 if (task == nullptr) {
636 HILOG_ERROR("taskpool:: the task is nullptr");
637 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the task is nullptr");
638 return nullptr;
639 }
640 if (!task->HasDependency()) {
641 ThrowNoDependencyError(env);
642 return nullptr;
643 }
644 if (task->IsPeriodicTask()) {
645 HILOG_ERROR("taskpool:: the periodic task cannot call removeDependency");
646 ErrorHelper::ThrowError(env, ErrorHelper::ERR_TASK_HAVE_DEPENDENCY);
647 return nullptr;
648 }
649 if (task->IsCommonTask()) {
650 std::string errMessage = "taskpool:: executedTask cannot removeDependency";
651 HILOG_ERROR("%{public}s", errMessage.c_str());
652 ErrorHelper::ThrowError(env, ErrorHelper::ERR_INEXISTENT_DEPENDENCY, errMessage.c_str());
653 return nullptr;
654 }
655 if (task->IsAsyncRunnerTask()) {
656 HILOG_ERROR("taskpool:: AsyncRunnerTask cannot call removeDependency.");
657 ErrorHelper::ThrowError(env, ErrorHelper::ERR_ASYNCRUNNER_TASK_HAVE_DEPENDENCY);
658 return nullptr;
659 }
660 for (size_t i = 0; i < argc; i++) {
661 if (!NapiHelper::HasNameProperty(env, args[i], TASKID_STR)) {
662 std::string errMessage = "taskpool:: removeDependency param is not task";
663 HILOG_ERROR("%{public}s", errMessage.c_str());
664 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of removeDependency param must be task.");
665 return nullptr;
666 }
667 Task* dependentTask = nullptr;
668 napi_unwrap(env, args[i], reinterpret_cast<void**>(&dependentTask));
669 if (dependentTask == nullptr) {
670 HILOG_ERROR("taskpool:: the dependent task is nullptr");
671 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the dependent task is nullptr");
672 return nullptr;
673 }
674 if (!dependentTask->HasDependency()) {
675 ThrowNoDependencyError(env);
676 return nullptr;
677 }
678 if (dependentTask->IsPeriodicTask()) {
679 HILOG_ERROR("taskpool:: the periodic task cannot call removeDependency");
680 ErrorHelper::ThrowError(env, ErrorHelper::ERR_TASK_HAVE_DEPENDENCY);
681 return nullptr;
682 }
683 if (dependentTask->IsCommonTask()) {
684 std::string errMessage = "taskpool:: cannot removeDependency on a dependent and executed task";
685 HILOG_ERROR("%{public}s", errMessage.c_str());
686 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
687 return nullptr;
688 }
689 if (dependentTask->IsAsyncRunnerTask()) {
690 HILOG_ERROR("taskpool:: AsyncRunnerTask cannot call removeDependency.");
691 ErrorHelper::ThrowError(env, ErrorHelper::ERR_ASYNCRUNNER_TASK_HAVE_DEPENDENCY);
692 return nullptr;
693 }
694 if (!TaskManager::GetInstance().RemoveTaskDependency(task->taskId_, dependentTask->taskId_)) {
695 HILOG_ERROR("taskpool:: the dependency does not exist");
696 ErrorHelper::ThrowError(env, ErrorHelper::ERR_INEXISTENT_DEPENDENCY);
697 return nullptr;
698 }
699 dependentTask->TryClearHasDependency();
700 }
701 task->TryClearHasDependency();
702 std::string strTrace = "Task::RemoveDependency: ";
703 HITRACE_HELPER_METER_NAME(strTrace + TaskManager::GetInstance().GetTaskDependInfoToString(task->taskId_));
704 return nullptr;
705 }
706
StartExecutionCallback(const uv_async_t * req)707 void Task::StartExecutionCallback(const uv_async_t* req)
708 {
709 HILOG_DEBUG("taskpool:: task StartExecutionCallback");
710 auto listenerCallBackInfo = static_cast<ListenerCallBackInfo*>(req->data);
711 if (listenerCallBackInfo == nullptr) { // LCOV_EXCL_BR_LINE
712 HILOG_FATAL("taskpool:: StartExecutionCallBackInfo is null");
713 return;
714 }
715 StartExecutionTask(listenerCallBackInfo);
716 }
717
StartExecutionTask(ListenerCallBackInfo * listenerCallBackInfo)718 void Task::StartExecutionTask(ListenerCallBackInfo* listenerCallBackInfo)
719 {
720 auto env = listenerCallBackInfo->env_;
721 napi_status status = napi_ok;
722 HandleScope scope(env, status);
723 if (status != napi_ok) {
724 HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
725 return;
726 }
727 auto func = NapiHelper::GetReferenceValue(env, listenerCallBackInfo->callbackRef_);
728 if (func == nullptr) {
729 HILOG_INFO("taskpool:: StartExecutionCallback func is null");
730 return;
731 }
732
733 napi_value result;
734 napi_call_function(env, NapiHelper::GetGlobalObject(env), func, 0, nullptr, &result);
735 if (NapiHelper::IsExceptionPending(env)) {
736 napi_value exception = nullptr;
737 napi_get_and_clear_last_exception(env, &exception);
738 std::string funcStr = NapiHelper::GetPrintString(env, func);
739 HILOG_ERROR("taskpool:: an exception has occurred napi_call_function, func is %{public}s", funcStr.c_str());
740 }
741 }
742
ExecuteListenerCallback(ListenerCallBackInfo * listenerCallBackInfo)743 void Task::ExecuteListenerCallback(ListenerCallBackInfo* listenerCallBackInfo)
744 {
745 HILOG_DEBUG("taskpool:: task ExecuteListenerCallback");
746 if (listenerCallBackInfo == nullptr) { // LCOV_EXCL_BR_LINE
747 HILOG_FATAL("taskpool:: listenerCallBackInfo is null");
748 return;
749 }
750
751 napi_env env = listenerCallBackInfo->env_;
752 napi_value func = NapiHelper::GetReferenceValue(env, listenerCallBackInfo->callbackRef_);
753 if (func == nullptr) {
754 HILOG_INFO("taskpool:: ExecuteListenerCallback func is null");
755 return;
756 }
757
758 napi_value result;
759 napi_value args = listenerCallBackInfo->taskError_;
760 if (args != nullptr) {
761 napi_call_function(env, NapiHelper::GetGlobalObject(env), func, 1, &args, &result);
762 } else {
763 napi_call_function(env, NapiHelper::GetGlobalObject(env), func, 0, nullptr, &result);
764 }
765
766 if (NapiHelper::IsExceptionPending(env)) {
767 napi_value exception = nullptr;
768 napi_get_and_clear_last_exception(env, &exception);
769 std::string funcStr = NapiHelper::GetPrintString(env, func);
770 HILOG_ERROR("taskpool:: an exception has occurred napi_call_function, func is %{public}s", funcStr.c_str());
771 }
772 }
773
OnEnqueued(napi_env env,napi_callback_info cbinfo)774 napi_value Task::OnEnqueued(napi_env env, napi_callback_info cbinfo)
775 {
776 size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
777 napi_value thisVar;
778 if (argc == 0) {
779 HILOG_INFO("taskpool:: the number of the params must be one");
780 return nullptr;
781 }
782
783 napi_value args[1];
784 napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
785 napi_valuetype type;
786 NAPI_CALL(env, napi_typeof(env, args[0], &type));
787 if (type != napi_function) {
788 HILOG_ERROR("taskpool:: OnEnqueued's parameter should be function");
789 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of OnEnqueued's parameter must be function.");
790 return nullptr;
791 }
792
793 Task* task = nullptr;
794 napi_unwrap(env, thisVar, reinterpret_cast<void**>(&task));
795 if (task == nullptr) {
796 HILOG_ERROR("taskpool:: task is nullptr");
797 return nullptr;
798 }
799
800 if (task->taskState_ != ExecuteState::NOT_FOUND) {
801 HILOG_ERROR("taskpool:: The executed task does not support the registration of listeners.");
802 ErrorHelper::ThrowError(env, ErrorHelper::ERR_REGISTRATION_OF_LISTENERS);
803 return nullptr;
804 }
805
806 napi_ref callbackRef = Helper::NapiHelper::CreateReference(env, args[0], 1);
807 task->onEnqueuedCallBackInfo_ = new ListenerCallBackInfo(env, callbackRef, nullptr);
808 return nullptr;
809 }
810
OnStartExecution(napi_env env,napi_callback_info cbinfo)811 napi_value Task::OnStartExecution(napi_env env, napi_callback_info cbinfo)
812 {
813 size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
814 napi_value thisVar;
815 if (argc == 0) {
816 HILOG_INFO("taskpool:: the number of the params must be one");
817 return nullptr;
818 }
819
820 napi_value args[1];
821 napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
822 napi_valuetype type;
823 NAPI_CALL(env, napi_typeof(env, args[0], &type));
824 if (type != napi_function) {
825 HILOG_ERROR("taskpool:: OnStartExecution's parameter should be function");
826 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
827 "the type of OnStartExecution's parameter must be function.");
828 return nullptr;
829 }
830
831 Task* task = nullptr;
832 napi_unwrap(env, thisVar, reinterpret_cast<void**>(&task));
833 if (task == nullptr) {
834 HILOG_ERROR("taskpool:: task is nullptr");
835 return nullptr;
836 }
837
838 if (task->taskState_ != ExecuteState::NOT_FOUND) {
839 HILOG_ERROR("taskpool:: The executed task does not support the registration of listeners.");
840 ErrorHelper::ThrowError(env, ErrorHelper::ERR_REGISTRATION_OF_LISTENERS);
841 return nullptr;
842 }
843
844 napi_ref callbackRef = Helper::NapiHelper::CreateReference(env, args[0], 1);
845 task->onStartExecutionCallBackInfo_ = new ListenerCallBackInfo(env, callbackRef, nullptr);
846 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
847 if (!task->IsMainThreadTask()) {
848 auto loop = NapiHelper::GetLibUV(env);
849 ConcurrentHelper::UvHandleInit(loop, task->onStartExecutionSignal_,
850 Task::StartExecutionCallback, task->onStartExecutionCallBackInfo_);
851 }
852 #else
853 auto loop = NapiHelper::GetLibUV(env);
854 ConcurrentHelper::UvHandleInit(loop, task->onStartExecutionSignal_,
855 Task::StartExecutionCallback, task->onStartExecutionCallBackInfo_);
856 #endif
857
858 return nullptr;
859 }
860
OnExecutionFailed(napi_env env,napi_callback_info cbinfo)861 napi_value Task::OnExecutionFailed(napi_env env, napi_callback_info cbinfo)
862 {
863 size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
864 napi_value thisVar;
865 if (argc == 0) {
866 HILOG_INFO("taskpool:: the number of the params must be one");
867 return nullptr;
868 }
869
870 napi_value args[1];
871 napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
872 napi_valuetype type;
873 NAPI_CALL(env, napi_typeof(env, args[0], &type));
874 if (type != napi_function) {
875 HILOG_ERROR("taskpool:: OnExecutionFailed's parameter should be function");
876 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
877 "the type of OnExecutionFailed's parameter must be function.");
878 return nullptr;
879 }
880
881 Task* task = nullptr;
882 napi_unwrap(env, thisVar, reinterpret_cast<void**>(&task));
883 if (task == nullptr) {
884 HILOG_ERROR("taskpool:: task is nullptr");
885 return nullptr;
886 }
887
888 if (task->taskState_ != ExecuteState::NOT_FOUND) {
889 HILOG_ERROR("taskpool:: The executed task does not support the registration of listeners.");
890 ErrorHelper::ThrowError(env, ErrorHelper::ERR_REGISTRATION_OF_LISTENERS);
891 return nullptr;
892 }
893
894 napi_ref callbackRef = Helper::NapiHelper::CreateReference(env, args[0], 1);
895 task->onExecutionFailedCallBackInfo_ = new ListenerCallBackInfo(env, callbackRef, nullptr);
896 return nullptr;
897 }
898
OnExecutionSucceeded(napi_env env,napi_callback_info cbinfo)899 napi_value Task::OnExecutionSucceeded(napi_env env, napi_callback_info cbinfo)
900 {
901 size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
902 napi_value thisVar;
903 if (argc == 0) {
904 HILOG_INFO("taskpool:: the number of the params must be one");
905 return nullptr;
906 }
907
908 napi_value args[1];
909 napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
910 napi_valuetype type;
911 NAPI_CALL(env, napi_typeof(env, args[0], &type));
912 if (type != napi_function) {
913 HILOG_ERROR("taskpool:: OnExecutionSucceeded's parameter should be function");
914 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
915 "the type of OnExecutionSucceeded's parameter must be function");
916 return nullptr;
917 }
918
919 Task* task = nullptr;
920 napi_unwrap(env, thisVar, reinterpret_cast<void**>(&task));
921 if (task == nullptr) {
922 HILOG_ERROR("taskpool:: task is nullptr");
923 return nullptr;
924 }
925
926 if (task->taskState_ != ExecuteState::NOT_FOUND) {
927 HILOG_ERROR("taskpool:: The executed task does not support the registration of listeners.");
928 ErrorHelper::ThrowError(env, ErrorHelper::ERR_REGISTRATION_OF_LISTENERS);
929 return nullptr;
930 }
931
932 napi_ref callbackRef = Helper::NapiHelper::CreateReference(env, args[0], 1);
933 task->onExecutionSucceededCallBackInfo_ = new ListenerCallBackInfo(env, callbackRef, nullptr);
934 return nullptr;
935 }
936
IsDone(napi_env env,napi_callback_info cbinfo)937 napi_value Task::IsDone(napi_env env, napi_callback_info cbinfo)
938 {
939 napi_value thisVar = nullptr;
940 napi_get_cb_info(env, cbinfo, nullptr, nullptr, &thisVar, nullptr);
941 Task* task = nullptr;
942 napi_unwrap(env, thisVar, reinterpret_cast<void**>(&task));
943 if (task == nullptr) {
944 HILOG_ERROR("taskpool:: task is nullptr");
945 return NapiHelper::CreateBooleanValue(env, false);
946 }
947
948 if (task->taskState_ == ExecuteState::FINISHED || task->taskState_ == ExecuteState::ENDING) {
949 return NapiHelper::CreateBooleanValue(env, true);
950 }
951 return NapiHelper::CreateBooleanValue(env, false);
952 }
953
GetTaskDuration(napi_env env,napi_callback_info & cbinfo,std::string durationType)954 napi_value Task::GetTaskDuration(napi_env env, napi_callback_info& cbinfo, std::string durationType)
955 {
956 napi_value thisVar = nullptr;
957 Task* task = nullptr;
958 napi_get_cb_info(env, cbinfo, nullptr, nullptr, &thisVar, nullptr);
959 napi_unwrap(env, thisVar, reinterpret_cast<void**>(&task));
960 if (task == nullptr) {
961 uint64_t totalDuration = 0;
962 return NapiHelper::CreateUint32(env, totalDuration);
963 }
964 uint64_t totalDuration = TaskManager::GetInstance().GetTaskDuration(task->taskId_, durationType);
965 return NapiHelper::CreateUint32(env, totalDuration);
966 }
967
GetTotalDuration(napi_env env,napi_callback_info cbinfo)968 napi_value Task::GetTotalDuration(napi_env env, napi_callback_info cbinfo)
969 {
970 return GetTaskDuration(env, cbinfo, TASK_TOTAL_TIME);
971 }
972
GetCPUDuration(napi_env env,napi_callback_info cbinfo)973 napi_value Task::GetCPUDuration(napi_env env, napi_callback_info cbinfo)
974 {
975 return GetTaskDuration(env, cbinfo, TASK_CPU_TIME);
976 }
977
GetIODuration(napi_env env,napi_callback_info cbinfo)978 napi_value Task::GetIODuration(napi_env env, napi_callback_info cbinfo)
979 {
980 return GetTaskDuration(env, cbinfo, TASK_IO_TIME);
981 }
982
GetName(napi_env env,napi_callback_info cbinfo)983 napi_value Task::GetName(napi_env env, napi_callback_info cbinfo)
984 {
985 napi_value thisVar = nullptr;
986 Task* task = nullptr;
987 napi_get_cb_info(env, cbinfo, nullptr, nullptr, &thisVar, nullptr);
988 napi_unwrap(env, thisVar, reinterpret_cast<void**>(&task));
989 if (task == nullptr) {
990 return NapiHelper::CreateEmptyString(env);
991 }
992 napi_value name = nullptr;
993 napi_create_string_utf8(env, task->name_.c_str(), NAPI_AUTO_LENGTH, &name);
994 return name;
995 }
996
GetTaskId(napi_env env,napi_callback_info cbinfo)997 napi_value Task::GetTaskId(napi_env env, napi_callback_info cbinfo)
998 {
999 napi_value thisVar = nullptr;
1000 Task* task = nullptr;
1001 napi_get_cb_info(env, cbinfo, nullptr, nullptr, &thisVar, nullptr);
1002 napi_unwrap(env, thisVar, reinterpret_cast<void**>(&task));
1003 if (task == nullptr) {
1004 return NapiHelper::CreateUint32(env, 0); // 0 : default value
1005 }
1006 return NapiHelper::CreateUint32(env, task->taskId_);
1007 }
1008
UpdateTaskType(TaskType taskType)1009 void Task::UpdateTaskType(TaskType taskType)
1010 {
1011 taskType_ = taskType;
1012 napi_reference_ref(env_, taskRef_, nullptr);
1013 }
1014
IsRepeatableTask() const1015 bool Task::IsRepeatableTask() const
1016 {
1017 return IsCommonTask() || IsGroupCommonTask() || IsGroupFunctionTask();
1018 }
1019
IsGroupTask() const1020 bool Task::IsGroupTask() const
1021 {
1022 return IsGroupCommonTask() || IsGroupFunctionTask();
1023 }
1024
IsGroupCommonTask() const1025 bool Task::IsGroupCommonTask() const
1026 {
1027 return taskType_ == TaskType::GROUP_COMMON_TASK;
1028 }
1029
IsGroupFunctionTask() const1030 bool Task::IsGroupFunctionTask() const
1031 {
1032 return taskType_ == TaskType::GROUP_FUNCTION_TASK;
1033 }
1034
IsCommonTask() const1035 bool Task::IsCommonTask() const
1036 {
1037 return taskType_ == TaskType::COMMON_TASK;
1038 }
1039
IsSeqRunnerTask() const1040 bool Task::IsSeqRunnerTask() const
1041 {
1042 return taskType_ == TaskType::SEQRUNNER_TASK;
1043 }
1044
IsFunctionTask() const1045 bool Task::IsFunctionTask() const
1046 {
1047 return taskType_ == TaskType::FUNCTION_TASK;
1048 }
1049
IsLongTask() const1050 bool Task::IsLongTask() const
1051 {
1052 return isLongTask_;
1053 }
1054
IsPeriodicTask() const1055 bool Task::IsPeriodicTask() const
1056 {
1057 return isPeriodicTask_;
1058 }
1059
IsMainThreadTask() const1060 bool Task::IsMainThreadTask() const
1061 {
1062 return isMainThreadTask_;
1063 }
1064
1065 // The uninitialized state is Task, and then taskType_ will be updated based on the task type.
IsExecuted() const1066 bool Task::IsExecuted() const
1067 {
1068 return taskType_ != TaskType::TASK;
1069 }
1070
GenerateTaskInfo(napi_env env,napi_value func,napi_value args,napi_value transferList,napi_value cloneList,Priority priority,bool defaultTransfer,bool defaultCloneSendable)1071 TaskInfo* Task::GenerateTaskInfo(napi_env env, napi_value func, napi_value args,
1072 napi_value transferList, napi_value cloneList, Priority priority,
1073 bool defaultTransfer, bool defaultCloneSendable)
1074 {
1075 HILOG_DEBUG("taskpool:: task GenerateTaskInfo");
1076 napi_value undefined = NapiHelper::GetUndefinedValue(env);
1077 void* serializationFunction = nullptr;
1078 napi_status status = napi_serialize_inner(env, func, undefined, undefined,
1079 defaultTransfer, defaultCloneSendable, &serializationFunction);
1080 std::string errMessage = "";
1081 if (status != napi_ok || serializationFunction == nullptr) {
1082 errMessage = "taskpool: failed to serialize function.";
1083 HILOG_ERROR("%{public}s", errMessage.c_str());
1084 ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_CONCURRENT_FUNCTION, errMessage.c_str());
1085 return nullptr;
1086 }
1087 void* serializationArguments = nullptr;
1088 status = napi_serialize_inner(env, args, transferList, cloneList,
1089 defaultTransfer, defaultCloneSendable, &serializationArguments);
1090 if (status != napi_ok || serializationArguments == nullptr) {
1091 errMessage = "taskpool: failed to serialize arguments.";
1092 HILOG_ERROR("%{public}s", errMessage.c_str());
1093 ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
1094 return nullptr;
1095 }
1096
1097 TaskInfo* taskInfo = new TaskInfo();
1098 taskInfo->serializationFunction = serializationFunction;
1099 taskInfo->serializationArguments = serializationArguments;
1100 taskInfo->priority = priority;
1101 reinterpret_cast<NativeEngine*>(env)->IncreaseSubEnvCounter();
1102 return taskInfo;
1103 }
1104
IncreaseRefCount()1105 void Task::IncreaseRefCount()
1106 {
1107 taskRefCount_.fetch_add(2); // 2 : for PerformTask and TaskResultCallback
1108 }
1109
DecreaseRefCount()1110 void Task::DecreaseRefCount()
1111 {
1112 taskRefCount_.fetch_sub(1);
1113 }
1114
IsReadyToHandle() const1115 bool Task::IsReadyToHandle() const
1116 {
1117 return (taskRefCount_ & 1) == 0;
1118 }
1119
NotifyPendingTask()1120 void Task::NotifyPendingTask()
1121 {
1122 HILOG_DEBUG("taskpool:: task:%{public}s NotifyPendingTask", std::to_string(taskId_).c_str());
1123 TaskManager::GetInstance().NotifyDependencyTaskInfo(taskId_);
1124 std::lock_guard<std::recursive_mutex> lock(taskMutex_);
1125 delete currentTaskInfo_;
1126 if (pendingTaskInfos_.empty()) {
1127 currentTaskInfo_ = nullptr;
1128 HILOG_DEBUG("taskpool:: task:%{public}s NotifyPendingTask end, currentTaskInfo_ nullptr",
1129 std::to_string(taskId_).c_str());
1130 return;
1131 }
1132 currentTaskInfo_ = pendingTaskInfos_.front();
1133 pendingTaskInfos_.pop_front();
1134 taskState_ = ExecuteState::WAITING;
1135 TaskManager::GetInstance().EnqueueTaskId(taskId_, currentTaskInfo_->priority);
1136 }
1137
CancelPendingTask(napi_env env)1138 void Task::CancelPendingTask(napi_env env)
1139 {
1140 HILOG_DEBUG("taskpool:: task:%{public}s CancelPendingTask", std::to_string(taskId_).c_str());
1141 std::list<napi_deferred> deferreds {};
1142 {
1143 std::lock_guard<std::recursive_mutex> lock(taskMutex_);
1144 if (pendingTaskInfos_.empty()) {
1145 HILOG_DEBUG("taskpool:: task CancelPendingTask end, pendingTaskInfos_ nullptr");
1146 return;
1147 }
1148 auto engine = reinterpret_cast<NativeEngine*>(env);
1149 for (const auto& info : pendingTaskInfos_) {
1150 engine->DecreaseSubEnvCounter();
1151 if (!IsPeriodicTask()) {
1152 deferreds.push_back(info->deferred);
1153 }
1154 napi_reference_unref(env, taskRef_, nullptr);
1155 delete info;
1156 }
1157 pendingTaskInfos_.clear();
1158 }
1159 std::string error = "taskpool:: task has been canceled";
1160 TaskManager::GetInstance().BatchRejectDeferred(env_, deferreds, error);
1161 }
1162
UpdateTask(uint64_t startTime,void * worker)1163 bool Task::UpdateTask(uint64_t startTime, void* worker)
1164 {
1165 HILOG_DEBUG("taskpool:: task:%{public}s UpdateTask", std::to_string(taskId_).c_str());
1166 if (taskState_ != ExecuteState::CANCELED) {
1167 taskState_ = ExecuteState::RUNNING;
1168 }
1169 startTime_ = startTime;
1170 worker_ = worker;
1171 return true;
1172 }
1173
DeserializeValue(napi_env env,napi_value * func,napi_value * args)1174 napi_value Task::DeserializeValue(napi_env env, napi_value* func, napi_value* args)
1175 {
1176 void* serializationFunction = nullptr;
1177 void* serializationArguments = nullptr;
1178 {
1179 std::lock_guard<std::recursive_mutex> lock(taskMutex_);
1180 if (UNLIKELY(currentTaskInfo_ == nullptr)) {
1181 HILOG_ERROR("taskpool:: the currentTaskInfo is nullptr, the task may have been cancelled");
1182 return nullptr;
1183 }
1184 serializationFunction = currentTaskInfo_->serializationFunction;
1185 serializationArguments = currentTaskInfo_->serializationArguments;
1186 }
1187 napi_status status = napi_ok;
1188 std::string errMessage = "";
1189 status = napi_deserialize(env, serializationFunction, func);
1190 if (!IsGroupFunctionTask()) {
1191 napi_delete_serialization_data(env, serializationFunction);
1192 }
1193 if (status != napi_ok || func == nullptr) {
1194 errMessage = "taskpool:: failed to deserialize function.";
1195 HILOG_ERROR("%{public}s", errMessage.c_str());
1196 napi_value err = ErrorHelper::NewError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
1197 success_ = false;
1198 return err;
1199 }
1200
1201 status = napi_deserialize(env, serializationArguments, args);
1202 if (!IsGroupFunctionTask()) {
1203 napi_delete_serialization_data(env, serializationArguments);
1204 }
1205 if (status != napi_ok || args == nullptr) {
1206 errMessage = "taskpool:: failed to deserialize function.";
1207 HILOG_ERROR("%{public}s", errMessage.c_str());
1208 napi_value err = ErrorHelper::NewError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
1209 success_ = false;
1210 return err;
1211 }
1212 return nullptr;
1213 }
1214
StoreTaskDuration()1215 void Task::StoreTaskDuration()
1216 {
1217 HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDuration", std::to_string(taskId_).c_str());
1218 cpuTime_ = ConcurrentHelper::GetMilliseconds();
1219 uint64_t cpuDuration = cpuTime_ - startTime_;
1220 if (ioTime_ != 0) {
1221 uint64_t ioDuration = ioTime_ - startTime_;
1222 TaskManager::GetInstance().StoreTaskDuration(taskId_, std::max(cpuDuration, ioDuration), cpuDuration);
1223 } else {
1224 TaskManager::GetInstance().StoreTaskDuration(taskId_, 0, cpuDuration);
1225 }
1226 }
1227
CanForSequenceRunner(napi_env env)1228 bool Task::CanForSequenceRunner(napi_env env)
1229 {
1230 std::string errMessage = "";
1231 // task with dependence is not allowed
1232 if (HasDependency()) {
1233 errMessage = "seqRunner:: dependent task not allowed.";
1234 HILOG_ERROR("%{public}s", errMessage.c_str());
1235 ErrorHelper::ThrowError(env, ErrorHelper::ERR_ADD_DEPENDENT_TASK_TO_SEQRUNNER, errMessage.c_str());
1236 return false;
1237 }
1238 if (IsPeriodicTask()) {
1239 errMessage = "taskpool:: SequenceRunner cannot execute the periodicTask";
1240 HILOG_ERROR("%{public}s", errMessage.c_str());
1241 ErrorHelper::ThrowError(env, ErrorHelper::ERR_TASK_EXECUTE_AGAIN, errMessage.c_str());
1242 return false;
1243 }
1244 if (IsCommonTask() || IsSeqRunnerTask()) {
1245 errMessage = "taskpool:: SequenceRunner cannot execute seqRunnerTask or executedTask";
1246 HILOG_ERROR("%{public}s", errMessage.c_str());
1247 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
1248 return false;
1249 }
1250 if (IsGroupCommonTask()) {
1251 errMessage = "taskpool:: SequenceRunner cannot execute groupTask";
1252 HILOG_ERROR("%{public}s", errMessage.c_str());
1253 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
1254 return false;
1255 }
1256 if (IsAsyncRunnerTask()) {
1257 errMessage = "SequenceRunner cannot execute asyncRunnerTask.";
1258 HILOG_ERROR("taskpool:: %{public}s", errMessage.c_str());
1259 ErrorHelper::ThrowError(env, ErrorHelper::ERR_TASK_CANNOT_EXECUTED, errMessage.c_str());
1260 return false;
1261 }
1262 return true;
1263 }
1264
CanForTaskGroup(napi_env env)1265 bool Task::CanForTaskGroup(napi_env env)
1266 {
1267 std::string errMessage = "";
1268 if (HasDependency()) {
1269 errMessage = "taskpool:: dependent task not allowed.";
1270 HILOG_ERROR("%{public}s", errMessage.c_str());
1271 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
1272 return false;
1273 }
1274 if (IsPeriodicTask()) {
1275 errMessage = "taskpool:: The interface does not support the periodicTask";
1276 HILOG_ERROR("%{public}s", errMessage.c_str());
1277 ErrorHelper::ThrowError(env, ErrorHelper::ERR_TASK_EXECUTE_AGAIN, errMessage.c_str());
1278 return false;
1279 }
1280 if (IsCommonTask() || IsSeqRunnerTask()) {
1281 errMessage = "taskpool:: taskGroup cannot add seqRunnerTask or executedTask";
1282 HILOG_ERROR("%{public}s", errMessage.c_str());
1283 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
1284 return false;
1285 }
1286 if (IsGroupCommonTask()) {
1287 errMessage = "taskpool:: taskGroup cannot add groupTask";
1288 HILOG_ERROR("%{public}s", errMessage.c_str());
1289 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
1290 return false;
1291 }
1292 if (IsLongTask()) {
1293 errMessage = "taskpool:: The interface does not support the long task";
1294 HILOG_ERROR("%{public}s", errMessage.c_str());
1295 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
1296 return false;
1297 }
1298 if (IsAsyncRunnerTask()) {
1299 errMessage = "TaskGroup cannot execute asyncRunnerTask.";
1300 HILOG_ERROR("taskpool:: %{public}s", errMessage.c_str());
1301 ErrorHelper::ThrowError(env, ErrorHelper::ERR_TASK_CANNOT_EXECUTED, errMessage.c_str());
1302 return false;
1303 }
1304 taskType_ = TaskType::GROUP_COMMON_TASK;
1305 return true;
1306 }
1307
CanExecute(napi_env env)1308 bool Task::CanExecute(napi_env env)
1309 {
1310 std::string errMessage = "";
1311 if (IsGroupCommonTask()) {
1312 errMessage = "taskpool:: groupTask cannot execute outside";
1313 HILOG_ERROR("%{public}s", errMessage.c_str());
1314 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
1315 return false;
1316 }
1317 if (IsSeqRunnerTask()) {
1318 errMessage = "taskpool:: seqRunnerTask cannot execute outside";
1319 HILOG_ERROR("%{public}s", errMessage.c_str());
1320 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
1321 return false;
1322 }
1323 if (IsCommonTask() && HasDependency()) {
1324 errMessage = "taskpool:: executedTask with dependency cannot execute again";
1325 HILOG_ERROR("%{public}s", errMessage.c_str());
1326 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
1327 return false;
1328 }
1329 if (IsExecuted() && IsLongTask()) {
1330 errMessage = "taskpool:: The long task can only be executed once";
1331 HILOG_ERROR("%{public}s", errMessage.c_str());
1332 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
1333 return false;
1334 }
1335 if (IsPeriodicTask()) {
1336 errMessage = "taskpool:: the periodicTask cannot execute again";
1337 HILOG_ERROR("%{public}s", errMessage.c_str());
1338 ErrorHelper::ThrowError(env, ErrorHelper::ERR_TASK_EXECUTE_AGAIN, errMessage.c_str());
1339 return false;
1340 }
1341 if (IsAsyncRunnerTask()) {
1342 errMessage = "AsyncRunnerTask cannot execute outside.";
1343 HILOG_ERROR("taskpool:: %{public}s", errMessage.c_str());
1344 ErrorHelper::ThrowError(env, ErrorHelper::ERR_TASK_CANNOT_EXECUTED, errMessage.c_str());
1345 return false;
1346 }
1347 return true;
1348 }
1349
CanExecuteDelayed(napi_env env)1350 bool Task::CanExecuteDelayed(napi_env env)
1351 {
1352 std::string errMessage = "";
1353 if (IsGroupCommonTask()) {
1354 errMessage = "taskpool:: groupTask cannot executeDelayed outside";
1355 HILOG_ERROR("%{public}s", errMessage.c_str());
1356 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
1357 return false;
1358 }
1359 if (IsSeqRunnerTask()) {
1360 errMessage = "taskpool:: seqRunnerTask cannot executeDelayed outside";
1361 HILOG_ERROR("%{public}s", errMessage.c_str());
1362 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
1363 return false;
1364 }
1365 if (IsCommonTask() && HasDependency()) {
1366 errMessage = "taskpool:: executedTask with dependency cannot executeDelayed again";
1367 HILOG_ERROR("%{public}s", errMessage.c_str());
1368 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
1369 return false;
1370 }
1371 if (IsExecuted() && IsLongTask()) {
1372 errMessage = "taskpool:: Multiple executions of longTask are not supported in the executeDelayed";
1373 HILOG_ERROR("%{public}s", errMessage.c_str());
1374 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
1375 return false;
1376 }
1377 if (IsPeriodicTask()) {
1378 errMessage = "taskpool:: the periodicTask cannot executeDelayed";
1379 HILOG_ERROR("%{public}s", errMessage.c_str());
1380 ErrorHelper::ThrowError(env, ErrorHelper::ERR_TASK_EXECUTE_AGAIN, errMessage.c_str());
1381 return false;
1382 }
1383 if (IsAsyncRunnerTask()) {
1384 errMessage = "AsyncRunnerTask cannot executeDelayed.";
1385 HILOG_ERROR("taskpool:: %{public}s", errMessage.c_str());
1386 ErrorHelper::ThrowError(env, ErrorHelper::ERR_TASK_CANNOT_EXECUTED, errMessage.c_str());
1387 return false;
1388 }
1389 return true;
1390 }
1391
CanExecutePeriodically(napi_env env)1392 bool Task::CanExecutePeriodically(napi_env env)
1393 {
1394 if (IsAsyncRunnerTask()) {
1395 std::string errMessage = "AsyncRunnerTask cannot executePeriodically.";
1396 ErrorHelper::ThrowError(env, ErrorHelper::ERR_TASK_CANNOT_EXECUTED, errMessage.c_str());
1397 return false;
1398 }
1399 if (IsExecuted() || IsPeriodicTask()) {
1400 ErrorHelper::ThrowError(env, ErrorHelper::ERR_TASK_EXECUTE_PERIODICALLY);
1401 return false;
1402 }
1403 if (HasDependency()) {
1404 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
1405 "taskpool:: the task with dependency cannot executePeriodically");
1406 return false;
1407 }
1408 return true;
1409 }
1410
SetHasDependency(bool hasDependency)1411 void Task::SetHasDependency(bool hasDependency)
1412 {
1413 hasDependency_ = hasDependency;
1414 }
1415
HasDependency() const1416 bool Task::HasDependency() const
1417 {
1418 return hasDependency_;
1419 }
1420
TryClearHasDependency()1421 void Task::TryClearHasDependency()
1422 {
1423 HILOG_DEBUG("taskpool:: task:%{public}s TryClearHasDependency", std::to_string(taskId_).c_str());
1424 if (IsExecuted()) {
1425 HILOG_DEBUG("taskpool:: task TryClearHasDependency end, task is executed");
1426 return;
1427 }
1428 if ((!TaskManager::GetInstance().IsDependentByTaskId(taskId_)) &&
1429 (!TaskManager::GetInstance().IsDependendByTaskId(taskId_))) {
1430 SetHasDependency(false);
1431 }
1432 }
1433
ThrowNoDependencyError(napi_env env)1434 void Task::ThrowNoDependencyError(napi_env env)
1435 {
1436 std::string errMessage = "taskpool:: task has no dependency";
1437 HILOG_ERROR("%{public}s", errMessage.c_str());
1438 ErrorHelper::ThrowError(env, ErrorHelper::ERR_INEXISTENT_DEPENDENCY, errMessage.c_str());
1439 }
1440
UpdatePeriodicTask()1441 void Task::UpdatePeriodicTask()
1442 {
1443 taskType_ = TaskType::COMMON_TASK;
1444 napi_reference_ref(env_, taskRef_, nullptr);
1445 isPeriodicTask_ = true;
1446 }
1447
InitHandle(napi_env env)1448 void Task::InitHandle(napi_env env)
1449 {
1450 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
1451 if (!OHOS::AppExecFwk::EventRunner::IsAppMainThread()) {
1452 uv_loop_t* loop = NapiHelper::GetLibUV(env);
1453 ConcurrentHelper::UvHandleInit(loop, onResultSignal_, TaskPool::HandleTaskResult, this);
1454 ConcurrentHelper::UvHandleInit(loop, onStartCancelSignal_, Task::Cancel);
1455 ConcurrentHelper::UvHandleInit(loop, onStartDiscardSignal_, Task::DiscardTask);
1456 } else {
1457 isMainThreadTask_ = true;
1458 HILOG_DEBUG("taskpool:: eventrunner should be nullptr if the current thread is not the main thread");
1459 }
1460 #else
1461 uv_loop_t* loop = NapiHelper::GetLibUV(env);
1462 ConcurrentHelper::UvHandleInit(loop, onResultSignal_, TaskPool::HandleTaskResult, this);
1463 ConcurrentHelper::UvHandleInit(loop, onStartCancelSignal_, Task::Cancel);
1464 ConcurrentHelper::UvHandleInit(loop, onStartDiscardSignal_, Task::DiscardTask);
1465 auto engine = reinterpret_cast<NativeEngine*>(env);
1466 isMainThreadTask_ = engine->IsMainThread();
1467 #endif
1468 }
1469
ClearDelayedTimers()1470 void Task::ClearDelayedTimers()
1471 {
1472 HILOG_DEBUG("taskpool:: task ClearDelayedTimers");
1473 std::list<napi_deferred> deferreds {};
1474 {
1475 std::lock_guard<std::recursive_mutex> lock(taskMutex_);
1476 TaskMessage* taskMessage = nullptr;
1477 for (auto t: delayedTimers_) {
1478 if (t == nullptr) {
1479 continue;
1480 }
1481 taskMessage = static_cast<TaskMessage*>(t->data);
1482 deferreds.push_back(taskMessage->deferred);
1483 uv_timer_stop(t);
1484 uv_close(reinterpret_cast<uv_handle_t*>(t), [](uv_handle_t* handle) {
1485 delete (uv_timer_t*)handle;
1486 handle = nullptr;
1487 });
1488 delete taskMessage;
1489 taskMessage = nullptr;
1490 }
1491 delayedTimers_.clear();
1492 }
1493 std::string error = "taskpool:: task has been canceled";
1494 TaskManager::GetInstance().BatchRejectDeferred(env_, deferreds, error);
1495 }
1496
VerifyAndPostResult(Priority priority)1497 bool Task::VerifyAndPostResult(Priority priority)
1498 {
1499 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
1500 if (IsMainThreadTask()) {
1501 HITRACE_HELPER_METER_NAME("VerifyAndPostResult: PostTask");
1502 uint32_t taskId = taskId_;
1503 auto onResultTask = [taskId]() {
1504 Task* task = TaskManager::GetInstance().GetTask(taskId);
1505 if (task == nullptr) {
1506 return;
1507 }
1508 TaskPool::HandleTaskResultCallback(task);
1509 };
1510 TaskManager::GetInstance().PostTask(onResultTask, "TaskPoolOnResultTask", priority);
1511 return true;
1512 } else {
1513 std::lock_guard<std::recursive_mutex> lock(taskMutex_);
1514 if (!IsValid() || !ConcurrentHelper::IsUvActive(onResultSignal_)) {
1515 return false;
1516 }
1517 uv_async_send(onResultSignal_);
1518 return true;
1519 }
1520 #else
1521 std::lock_guard<std::recursive_mutex> lock(taskMutex_);
1522 if (!IsValid() || !ConcurrentHelper::IsUvActive(onResultSignal_)) {
1523 return false;
1524 }
1525 uv_async_send(onResultSignal_);
1526 return true;
1527 #endif
1528 }
1529
IncreaseTaskRefCount()1530 void Task::IncreaseTaskRefCount()
1531 {
1532 refCount_++; // when tasks are created or executed, refCount_ will increment
1533 }
1534
DecreaseTaskRefCount()1535 void Task::DecreaseTaskRefCount()
1536 {
1537 refCount_--; // when tasks finished, refCount_ will decrement
1538 }
1539
ShouldDeleteTask(bool needUnref)1540 bool Task::ShouldDeleteTask(bool needUnref)
1541 {
1542 std::lock_guard<std::recursive_mutex> lock(taskMutex_);
1543 if (!IsValid()) {
1544 HILOG_WARN("taskpool:: task is invalid");
1545 if (IsAsyncRunnerTask()) {
1546 AsyncRunnerManager::GetInstance().DecreaseRunningCount(asyncRunnerId_);
1547 }
1548 TaskManager::GetInstance().RemoveTask(taskId_);
1549 return true;
1550 }
1551 if (needUnref) {
1552 DecreaseTaskRefCount();
1553 }
1554 return false;
1555 }
1556
CheckStartExecution(Priority priority)1557 bool Task::CheckStartExecution(Priority priority)
1558 {
1559 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
1560 if (IsMainThreadTask()) {
1561 if (onStartExecutionCallBackInfo_ == nullptr) {
1562 return true;
1563 }
1564 HITRACE_HELPER_METER_NAME("PerformTask: PostTask");
1565 uint32_t taskId = taskId_;
1566 auto onStartExecutionTask = [taskId]() {
1567 Task* task = TaskManager::GetInstance().GetTask(taskId);
1568 if (task == nullptr || task->onStartExecutionCallBackInfo_ == nullptr) {
1569 return;
1570 }
1571 Task::StartExecutionTask(task->onStartExecutionCallBackInfo_);
1572 };
1573 TaskManager::GetInstance().PostTask(onStartExecutionTask, "TaskPoolOnStartExecutionTask", priority);
1574 } else {
1575 if (onStartExecutionSignal_ == nullptr) {
1576 return true;
1577 }
1578 std::lock_guard<std::recursive_mutex> lock(taskMutex_);
1579 if (!IsValid()) {
1580 return false;
1581 }
1582 ConcurrentHelper::UvCheckAndAsyncSend(onStartExecutionSignal_);
1583 }
1584 return true;
1585 #else
1586 if (onStartExecutionSignal_ == nullptr) {
1587 return true;
1588 }
1589 std::lock_guard<std::recursive_mutex> lock(taskMutex_);
1590 if (!IsValid()) {
1591 return false;
1592 }
1593 ConcurrentHelper::UvCheckAndAsyncSend(onStartExecutionSignal_);
1594 return true;
1595 #endif
1596 }
1597
SetValid(bool isValid)1598 void Task::SetValid(bool isValid)
1599 {
1600 isValid_.store(isValid);
1601 }
1602
IsValid()1603 bool Task::IsValid()
1604 {
1605 return isValid_.load();
1606 }
1607
CanForAsyncRunner(napi_env env)1608 bool Task::CanForAsyncRunner(napi_env env)
1609 {
1610 std::string errMessage = "";
1611 if (HasDependency()) {
1612 errMessage = "AsyncRunner:: dependent task not allowed.";
1613 HILOG_ERROR("%{public}s", errMessage.c_str());
1614 ErrorHelper::ThrowError(env, ErrorHelper::ERR_ADD_DEPENDENT_TASK_TO_SEQRUNNER, errMessage.c_str());
1615 return false;
1616 }
1617 if (IsPeriodicTask()) {
1618 errMessage = "AsyncRunner cannot execute the periodicTask.";
1619 HILOG_ERROR("taskpool:: %{public}s", errMessage.c_str());
1620 ErrorHelper::ThrowError(env, ErrorHelper::ERR_TASK_EXECUTE_AGAIN, errMessage.c_str());
1621 return false;
1622 }
1623 if (IsCommonTask() || IsSeqRunnerTask()) {
1624 errMessage = "AsyncRunner cannot execute seqRunnerTask or executedTask.";
1625 HILOG_ERROR("taskpool:: %{public}s", errMessage.c_str());
1626 ErrorHelper::ThrowError(env, ErrorHelper::ERR_TASK_CANNOT_EXECUTED, errMessage.c_str());
1627 return false;
1628 }
1629 if (IsGroupCommonTask()) {
1630 errMessage = "AsyncRunner cannot execute groupTask.";
1631 HILOG_ERROR("taskpool:: %{public}s", errMessage.c_str());
1632 ErrorHelper::ThrowError(env, ErrorHelper::ERR_TASK_CANNOT_EXECUTED, errMessage.c_str());
1633 return false;
1634 }
1635 if (IsAsyncRunnerTask()) {
1636 errMessage = "AsyncRunner cannot execute asyncRunnerTask.";
1637 HILOG_ERROR("taskpool:: %{public}s", errMessage.c_str());
1638 ErrorHelper::ThrowError(env, ErrorHelper::ERR_TASK_CANNOT_EXECUTED, errMessage.c_str());
1639 return false;
1640 }
1641 return true;
1642 }
1643
IsAsyncRunnerTask()1644 bool Task::IsAsyncRunnerTask()
1645 {
1646 return taskType_ == TaskType::ASYNCRUNNER_TASK;
1647 }
1648
SetTaskId(uint32_t taskId)1649 void Task::SetTaskId(uint32_t taskId)
1650 {
1651 taskId_ = taskId;
1652 }
1653
TriggerCancel(CancelTaskMessage * message)1654 void Task::TriggerCancel(CancelTaskMessage* message)
1655 {
1656 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
1657 if (IsMainThreadTask()) {
1658 HITRACE_HELPER_METER_NAME("TriggerCancel: PostTask");
1659 auto onCancelTask = [message]() {
1660 Task* task = TaskManager::GetInstance().GetTask(message->taskId);
1661 if (task == nullptr) {
1662 CloseHelp::DeletePointer(message, false);
1663 return;
1664 }
1665 napi_status status = napi_ok;
1666 HandleScope scope(task->env_, status);
1667 if (status != napi_ok) {
1668 HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
1669 CloseHelp::DeletePointer(message, false);
1670 return;
1671 }
1672 task->CancelInner(message->state);
1673 CloseHelp::DeletePointer(message, false);
1674 };
1675 TaskManager::GetInstance().PostTask(onCancelTask, "TaskOnCancelTask", Priority::DEFAULT);
1676 } else {
1677 std::lock_guard<std::recursive_mutex> lock(taskMutex_);
1678 if (!IsValid() || !ConcurrentHelper::IsUvActive(onStartCancelSignal_)) {
1679 return;
1680 }
1681 onStartCancelSignal_->data = message;
1682 uv_async_send(onStartCancelSignal_);
1683 }
1684 #else
1685 std::lock_guard<std::recursive_mutex> lock(taskMutex_);
1686 if (!IsValid() || !ConcurrentHelper::IsUvActive(onStartCancelSignal_)) {
1687 CloseHelp::DeletePointer(message, false);
1688 return;
1689 }
1690 onStartCancelSignal_->data = message;
1691 uv_async_send(onStartCancelSignal_);
1692 #endif
1693 }
1694
CancelInner(ExecuteState state)1695 void Task::CancelInner(ExecuteState state)
1696 {
1697 ClearDelayedTimers();
1698 CancelPendingTask(env_);
1699 if (HasDependency()) {
1700 TaskManager::GetInstance().ClearDependentTask(taskId_);
1701 }
1702 std::list<napi_deferred> deferreds {};
1703 {
1704 std::lock_guard<std::recursive_mutex> lock(taskMutex_);
1705 if (state == ExecuteState::WAITING && currentTaskInfo_ != nullptr &&
1706 TaskManager::GetInstance().EraseWaitingTaskId(taskId_, currentTaskInfo_->priority)) {
1707 reinterpret_cast<NativeEngine*>(env_)->DecreaseSubEnvCounter();
1708 DecreaseTaskRefCount();
1709 TaskManager::GetInstance().DecreaseRefCount(env_, taskId_);
1710 deferreds.push_back(currentTaskInfo_->deferred);
1711 napi_reference_unref(env_, taskRef_, nullptr);
1712 delete currentTaskInfo_;
1713 currentTaskInfo_ = nullptr;
1714 isCancelToFinish_ = true;
1715 }
1716 if (IsSeqRunnerTask() && state == ExecuteState::CANCELED) {
1717 DisposeCanceledTask();
1718 return;
1719 }
1720 if (state == ExecuteState::DELAYED) {
1721 isCancelToFinish_ = true;
1722 }
1723 }
1724 std::string error = "taskpool:: task has been canceled";
1725 TaskManager::GetInstance().BatchRejectDeferred(env_, deferreds, error);
1726 }
1727
IsSameEnv(napi_env env)1728 bool Task::IsSameEnv(napi_env env)
1729 {
1730 return env_ == env;
1731 }
1732
DiscardAsyncRunnerTask(DiscardTaskMessage * message)1733 void Task::DiscardAsyncRunnerTask(DiscardTaskMessage* message)
1734 {
1735 if (message == nullptr || !IsAsyncRunnerTask() || !IsValid()) {
1736 CloseHelp::DeletePointer(message, false);
1737 return;
1738 }
1739 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
1740 if (IsMainThreadTask()) {
1741 HITRACE_HELPER_METER_NAME("DiscardAsyncRunnerTask: PostTask");
1742 auto onDiscardTask = [message]() {
1743 Task* task = TaskManager::GetInstance().GetTask(message->taskId);
1744 if (task == nullptr) {
1745 CloseHelp::DeletePointer(message, false);
1746 return;
1747 }
1748 napi_status status = napi_ok;
1749 HandleScope scope(task->env_, status);
1750 if (status != napi_ok) {
1751 CloseHelp::DeletePointer(message, false);
1752 HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
1753 return;
1754 }
1755 task->DiscardInner(message);
1756 };
1757 TaskManager::GetInstance().PostTask(onDiscardTask, "TaskOnDiscardTask", Priority::DEFAULT);
1758 } else {
1759 std::lock_guard<std::recursive_mutex> lock(taskMutex_);
1760 if (ConcurrentHelper::IsUvActive(onStartDiscardSignal_)) {
1761 onStartDiscardSignal_->data = message;
1762 uv_async_send(onStartDiscardSignal_);
1763 }
1764 }
1765 #else
1766 std::lock_guard<std::recursive_mutex> lock(taskMutex_);
1767 if (ConcurrentHelper::IsUvActive(onStartDiscardSignal_)) {
1768 onStartDiscardSignal_->data = message;
1769 uv_async_send(onStartDiscardSignal_);
1770 }
1771 #endif
1772 }
1773
DiscardInner(DiscardTaskMessage * message)1774 void Task::DiscardInner(DiscardTaskMessage* message)
1775 {
1776 if (message == nullptr) {
1777 CloseHelp::DeletePointer(message, false);
1778 return;
1779 }
1780 auto task = TaskManager::GetInstance().GetTask(message->taskId);
1781 if (task == nullptr || !task->IsValid() || message->env != task->env_) {
1782 CloseHelp::DeletePointer(message, false);
1783 HILOG_DEBUG("taskpool:: discard task is nullptr.");
1784 return;
1785 }
1786 napi_value error = ErrorHelper::NewError(task->env_, message->errCode);
1787 napi_reject_deferred(task->env_, task->currentTaskInfo_->deferred, error);
1788 DisposeCanceledTask();
1789 TaskManager::GetInstance().RemoveTask(message->taskId);
1790
1791 CloseHelp::DeletePointer(message, false);
1792 }
1793
DiscardTask(const uv_async_t * req)1794 void Task::DiscardTask(const uv_async_t* req)
1795 {
1796 auto message = static_cast<DiscardTaskMessage*>(req->data);
1797 if (message == nullptr) {
1798 return;
1799 }
1800 auto task = TaskManager::GetInstance().GetTask(message->taskId);
1801 if (task == nullptr || task->env_ != message->env) {
1802 CloseHelp::DeletePointer(message, false);
1803 HILOG_DEBUG("taskpool:: task is nullptr.");
1804 return;
1805 }
1806 napi_status status = napi_ok;
1807 HandleScope scope(task->env_, status);
1808 if (status != napi_ok) {
1809 CloseHelp::DeletePointer(message, false);
1810 HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
1811 return;
1812 }
1813
1814 task->DiscardInner(message);
1815 }
1816
ReleaseData()1817 void Task::ReleaseData()
1818 {
1819 std::lock_guard<std::recursive_mutex> lock(taskMutex_);
1820 if (onResultSignal_ != nullptr) {
1821 if (!ConcurrentHelper::IsUvClosing(onResultSignal_)) {
1822 ConcurrentHelper::UvHandleClose(onResultSignal_);
1823 } else {
1824 delete onResultSignal_;
1825 onResultSignal_ = nullptr;
1826 }
1827 }
1828
1829 if (onStartCancelSignal_ != nullptr) {
1830 if (!ConcurrentHelper::IsUvClosing(onStartCancelSignal_)) {
1831 ConcurrentHelper::UvHandleClose(onStartCancelSignal_);
1832 } else {
1833 delete onStartCancelSignal_;
1834 onStartCancelSignal_ = nullptr;
1835 }
1836 }
1837
1838 if (onStartDiscardSignal_ != nullptr) {
1839 if (!ConcurrentHelper::IsUvClosing(onStartDiscardSignal_)) {
1840 ConcurrentHelper::UvHandleClose(onStartDiscardSignal_);
1841 } else {
1842 delete onStartDiscardSignal_;
1843 onStartDiscardSignal_ = nullptr;
1844 }
1845 }
1846
1847 if (currentTaskInfo_ != nullptr) {
1848 delete currentTaskInfo_;
1849 currentTaskInfo_ = nullptr;
1850 }
1851 }
1852
DisposeCanceledTask()1853 void Task::DisposeCanceledTask()
1854 {
1855 reinterpret_cast<NativeEngine*>(env_)->DecreaseSubEnvCounter();
1856 napi_reference_unref(env_, taskRef_, nullptr);
1857 delete currentTaskInfo_;
1858 currentTaskInfo_ = nullptr;
1859 }
1860 } // namespace Commonlibrary::Concurrent::TaskPoolModule