• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2023 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "task.h"
17 
18 #include "helper/error_helper.h"
19 #include "helper/napi_helper.h"
20 #include "helper/object_helper.h"
21 #include "task_manager.h"
22 #include "taskpool.h"
23 #include "utils/log.h"
24 #include "worker.h"
25 
26 namespace Commonlibrary::Concurrent::TaskPoolModule {
27 static constexpr char ONRECEIVEDATA_STR[] = "onReceiveData";
28 static constexpr char SETTRANSFERLIST_STR[] = "setTransferList";
29 static constexpr char SET_CLONE_LIST_STR[] = "setCloneList";
30 
31 using namespace Commonlibrary::Concurrent::Common::Helper;
32 
Task(napi_env env,TaskType taskType,std::string name)33 Task::Task(napi_env env, TaskType taskType, std::string name) : env_(env), taskType_(taskType), name_(name) {}
34 
TaskConstructor(napi_env env,napi_callback_info cbinfo)35 napi_value Task::TaskConstructor(napi_env env, napi_callback_info cbinfo)
36 {
37     // check argv count
38     size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
39     std::string errMessage = "";
40     if (argc < 1) {
41         errMessage = "taskpool:: create task need more than one param";
42         HILOG_ERROR("%{public}s", errMessage.c_str());
43         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
44         return nullptr;
45     }
46 
47     napi_value* args = new napi_value[argc];
48     ObjectScope<napi_value> scope(args, true);
49     napi_value thisVar;
50     napi_value func = nullptr;
51     napi_value name = nullptr;
52     napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
53     // if the first is task name, the second might be func
54     if (argc > 1 && NapiHelper::IsString(env, args[0])) {
55         name = args[0];
56         func = args[1];
57         args += 2; // 2: name and func
58         argc -= 2; // 2: name and func
59     } else {
60         func = args[0];
61         args += 1; // 1: func
62         argc -= 1; // 1: func
63     }
64     if (!NapiHelper::IsFunction(env, func)) {
65         errMessage = "taskpool:: the first or second param of task must be function";
66         HILOG_ERROR("%{public}s", errMessage.c_str());
67         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
68         return nullptr;
69     }
70 
71     Task* task = GenerateTask(env, thisVar, func, name, args, argc);
72     TaskManager::GetInstance().StoreTask(task->taskId_, task);
73     napi_wrap(env, thisVar, task, TaskDestructor, nullptr, nullptr);
74     napi_create_reference(env, thisVar, 0, &task->taskRef_);
75     return thisVar;
76 }
77 
TaskDestructor(napi_env env,void * data,void * hint)78 void Task::TaskDestructor(napi_env env, void* data, [[maybe_unused]] void* hint)
79 {
80     Task* task = static_cast<Task*>(data);
81     TaskManager::GetInstance().ReleaseTaskData(env, task);
82     delete task;
83 }
84 
GenerateTask(napi_env env,napi_value napiTask,napi_value func,napi_value name,napi_value * args,size_t argc)85 Task* Task::GenerateTask(napi_env env, napi_value napiTask, napi_value func,
86                          napi_value name, napi_value* args, size_t argc)
87 {
88     napi_value argsArray;
89     napi_create_array_with_length(env, argc, &argsArray);
90     for (size_t i = 0; i < argc; i++) {
91         napi_set_element(env, argsArray, i, args[i]);
92     }
93     if (name == nullptr) {
94         name = NapiHelper::GetNameProperty(env, func, NAME);
95     }
96     char* nameStr = NapiHelper::GetString(env, name);
97     Task* task = new Task(env, TaskType::TASK, nameStr);
98     delete[] nameStr;
99     task->taskId_ = reinterpret_cast<uint64_t>(task);
100     uv_loop_t* loop = NapiHelper::GetLibUV(env);
101     task->onResultSignal_ = new uv_async_t;
102     uv_async_init(loop, task->onResultSignal_, reinterpret_cast<uv_async_cb>(TaskPool::HandleTaskResult));
103     task->onResultSignal_->data = task;
104     task->increaseRefSignal_ = new uv_async_t;
105     uv_async_init(loop, task->increaseRefSignal_, reinterpret_cast<uv_async_cb>(Task::IncreaseTaskRef));
106     task->increaseRefSignal_->data = task;
107     napi_value taskId = NapiHelper::CreateUint64(env, task->taskId_);
108     napi_value napiTrue = NapiHelper::CreateBooleanValue(env, true);
109     napi_value napiFalse = NapiHelper::CreateBooleanValue(env, false);
110     // add task name to task
111     napi_set_named_property(env, napiTask, NAME, name);
112     napi_set_named_property(env, napiTask, FUNCTION_STR, func);
113     napi_set_named_property(env, napiTask, TASKID_STR, taskId);
114     napi_set_named_property(env, napiTask, ARGUMENTS_STR, argsArray);
115     napi_set_named_property(env, napiTask, DEFAULT_TRANSFER_STR, napiTrue);
116     napi_set_named_property(env, napiTask, DEFAULT_CLONE_SENDABLE_STR, napiFalse);
117     napi_property_descriptor properties[] = {
118         DECLARE_NAPI_FUNCTION(SETTRANSFERLIST_STR, SetTransferList),
119         DECLARE_NAPI_FUNCTION(SET_CLONE_LIST_STR, SetCloneList),
120         DECLARE_NAPI_FUNCTION(ONRECEIVEDATA_STR, OnReceiveData),
121         DECLARE_NAPI_FUNCTION(ADD_DEPENDENCY_STR, AddDependency),
122         DECLARE_NAPI_FUNCTION(REMOVE_DEPENDENCY_STR, RemoveDependency),
123         DECLARE_NAPI_GETTER(TASK_TOTAL_TIME, GetTotalDuration),
124         DECLARE_NAPI_GETTER(TASK_CPU_TIME, GetCPUDuration),
125         DECLARE_NAPI_GETTER(TASK_IO_TIME, GetIODuration)
126     };
127     napi_define_properties(env, napiTask, sizeof(properties) / sizeof(properties[0]), properties);
128 
129     return task;
130 }
131 
GenerateFunctionTask(napi_env env,napi_value func,napi_value * args,size_t argc,TaskType type)132 Task* Task::GenerateFunctionTask(napi_env env, napi_value func, napi_value* args, size_t argc, TaskType type)
133 {
134     napi_value argsArray;
135     napi_create_array_with_length(env, argc, &argsArray);
136     for (size_t i = 0; i < argc; i++) {
137         napi_set_element(env, argsArray, i, args[i]);
138     }
139     napi_value undefined = NapiHelper::GetUndefinedValue(env);
140     TaskInfo* taskInfo = GenerateTaskInfo(env, func, argsArray, undefined, undefined, Priority::DEFAULT);
141     if (taskInfo == nullptr) {
142         return nullptr;
143     }
144     napi_value napiFuncName = NapiHelper::GetNameProperty(env, func, NAME);
145     char* nameStr = NapiHelper::GetString(env, napiFuncName);
146     Task* task = new Task(env, type, nameStr);
147     delete[] nameStr;
148     task->taskId_ = reinterpret_cast<uint64_t>(task);
149     task->currentTaskInfo_ = taskInfo;
150     task->onResultSignal_ = new uv_async_t;
151     uv_loop_t* loop = NapiHelper::GetLibUV(env);
152     uv_async_init(loop, task->onResultSignal_, reinterpret_cast<uv_async_cb>(TaskPool::HandleTaskResult));
153     task->onResultSignal_->data = task;
154     return task;
155 }
156 
GetTaskInfoPromise(napi_env env,napi_value task,TaskType taskType,Priority priority)157 napi_value Task::GetTaskInfoPromise(napi_env env, napi_value task, TaskType taskType, Priority priority)
158 {
159     TaskInfo* taskInfo = GetTaskInfo(env, task, taskType, priority);
160     if (taskInfo == nullptr) {
161         return nullptr;
162     }
163     return NapiHelper::CreatePromise(env, &taskInfo->deferred);
164 }
165 
GetTaskInfo(napi_env env,napi_value task,TaskType taskType,Priority priority)166 TaskInfo* Task::GetTaskInfo(napi_env env, napi_value task, TaskType taskType, Priority priority)
167 {
168     std::string errMessage = "";
169     if (!IsInitialized() && taskType_ != taskType) {
170         errMessage = "taskpool:: task type is error";
171         HILOG_ERROR("%{public}s", errMessage.c_str());
172         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
173         return nullptr;
174     }
175     taskType_ = taskType;
176     napi_value func = NapiHelper::GetNameProperty(env, task, FUNCTION_STR);
177     napi_value args = NapiHelper::GetNameProperty(env, task, ARGUMENTS_STR);
178     napi_value taskName = NapiHelper::GetNameProperty(env, task, NAME);
179     napi_value napiDefaultTransfer = NapiHelper::GetNameProperty(env, task, DEFAULT_TRANSFER_STR);
180     napi_value napiDefaultClone = NapiHelper::GetNameProperty(env, task, DEFAULT_CLONE_SENDABLE_STR);
181     if (func == nullptr || args == nullptr || napiDefaultTransfer == nullptr || napiDefaultClone == nullptr) {
182         errMessage = "taskpool:: task value is error";
183         HILOG_ERROR("%{public}s", errMessage.c_str());
184         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
185         return nullptr;
186     }
187     napi_value transferList = NapiHelper::GetUndefinedValue(env);
188     if (NapiHelper::HasNameProperty(env, task, TRANSFERLIST_STR)) {
189         transferList = NapiHelper::GetNameProperty(env, task, TRANSFERLIST_STR);
190     }
191     napi_value cloneList = NapiHelper::GetUndefinedValue(env);
192     if (NapiHelper::HasNameProperty(env, task, CLONE_LIST_STR)) {
193         cloneList = NapiHelper::GetNameProperty(env, task, CLONE_LIST_STR);
194     }
195     bool defaultTransfer = NapiHelper::GetBooleanValue(env, napiDefaultTransfer);
196     bool defaultCloneSendable = NapiHelper::GetBooleanValue(env, napiDefaultClone);
197     TaskInfo* pendingInfo = GenerateTaskInfo(env, func, args, transferList, cloneList, priority,
198                                              defaultTransfer, defaultCloneSendable);
199     if (pendingInfo == nullptr) {
200         return nullptr;
201     }
202     {
203         std::unique_lock<std::shared_mutex> lock(taskMutex_);
204         if (currentTaskInfo_ == nullptr) {
205             currentTaskInfo_ = pendingInfo;
206         } else {
207             pendingTaskInfos_.push_back(pendingInfo);
208         }
209         napi_reference_ref(env, taskRef_, nullptr);
210     }
211     char* name = NapiHelper::GetString(env, taskName);
212     if (strlen(name) == 0) {
213         napi_value funcName = NapiHelper::GetNameProperty(env, func, NAME);
214         name = NapiHelper::GetString(env, funcName);
215     }
216     name_ = std::string(name);
217     delete[] name;
218     reinterpret_cast<NativeEngine*>(env)->IncreaseSubEnvCounter();
219     return pendingInfo;
220 }
221 
SetTransferList(napi_env env,napi_callback_info cbinfo)222 napi_value Task::SetTransferList(napi_env env, napi_callback_info cbinfo)
223 {
224     size_t argc = 1;
225     napi_value args[1];
226     napi_value thisVar;
227     napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
228     // Check whether clone list has been set
229     if (NapiHelper::HasNameProperty(env, thisVar, CLONE_LIST_STR)) {
230         ErrorHelper::ThrowError(env, ErrorHelper::ERR_IN_BOTH_CLONE_AND_TRANSFER);
231         return nullptr;
232     }
233     if (argc > 1) {
234         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
235                                 "taskpool:: the number of setTransferList parma must be less than 2");
236         return nullptr;
237     }
238     Task* task = nullptr;
239     napi_unwrap(env, thisVar, reinterpret_cast<void**>(&task));
240     if (task == nullptr) {
241         HILOG_ERROR("taskpool:: task is nullptr");
242         return nullptr;
243     }
244     napi_value undefined = NapiHelper::GetUndefinedValue(env);
245     napi_value falseVal = NapiHelper::CreateBooleanValue(env, false);
246     if (argc == 0) {
247         HILOG_DEBUG("taskpool:: set task params not transfer");
248         napi_set_named_property(env, thisVar, TRANSFERLIST_STR, undefined);
249         // set task.defaultTransfer false
250         napi_set_named_property(env, thisVar, DEFAULT_TRANSFER_STR, falseVal);
251         return nullptr;
252     }
253     if (!NapiHelper::IsArray(env, args[0])) {
254         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "taskpool:: setTransferList first param must be array");
255         return nullptr;
256     }
257     // set task.defaultTransfer false
258     napi_set_named_property(env, thisVar, DEFAULT_TRANSFER_STR, falseVal);
259     uint32_t arrayLength = NapiHelper::GetArrayLength(env, args[0]);
260     if (arrayLength == 0) {
261         HILOG_DEBUG("taskpool:: set task params not transfer");
262         napi_set_named_property(env, thisVar, TRANSFERLIST_STR, undefined);
263         return nullptr;
264     }
265     for (size_t i = 0; i < arrayLength; i++) {
266         napi_value transferVal = NapiHelper::GetElement(env, args[0], i);
267         if (!NapiHelper::IsArrayBuffer(env, transferVal)) {
268             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
269                                     "taskpool:: the element in array must be arraybuffer");
270             return nullptr;
271         }
272     }
273     HILOG_DEBUG("taskpool:: check setTransferList param success");
274     napi_set_named_property(env, thisVar, TRANSFERLIST_STR, args[0]);
275     return nullptr;
276 }
277 
SetCloneList(napi_env env,napi_callback_info cbinfo)278 napi_value Task::SetCloneList(napi_env env, napi_callback_info cbinfo)
279 {
280     size_t argc = 1;
281     napi_value args[1];
282     napi_value thisVar;
283     napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
284     // Check whether transfer list has been set
285     if (NapiHelper::HasNameProperty(env, thisVar, TRANSFERLIST_STR)) {
286         ErrorHelper::ThrowError(env, ErrorHelper::ERR_IN_BOTH_CLONE_AND_TRANSFER);
287         return nullptr;
288     }
289     if (argc != 1) {
290         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "taskpool:: the number of setCloneList parma must be 1");
291         return nullptr;
292     }
293     if (!NapiHelper::IsArray(env, args[0])) {
294         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "taskpool:: setCloneList first param must be array");
295         return nullptr;
296     }
297     Task* task = nullptr;
298     napi_unwrap(env, thisVar, reinterpret_cast<void**>(&task));
299     if (task == nullptr) {
300         HILOG_ERROR("taskpool:: task is nullptr");
301         return nullptr;
302     }
303     napi_value undefined = NapiHelper::GetUndefinedValue(env);
304     uint32_t arrayLength = NapiHelper::GetArrayLength(env, args[0]);
305     if (arrayLength == 0) {
306         HILOG_DEBUG("taskpool:: clone list is empty");
307         napi_set_named_property(env, thisVar, CLONE_LIST_STR, undefined);
308         return nullptr;
309     }
310     for (size_t i = 0; i < arrayLength; i++) {
311         napi_value cloneVal = NapiHelper::GetElement(env, args[0], i);
312         if (!NapiHelper::IsArrayBuffer(env, cloneVal) && !NapiHelper::IsSendablObject(env, cloneVal)) {
313             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
314                 "taskpool:: setCloneList elements in array must be ArrayBuffer or Sendable Class instance");
315             return nullptr;
316         }
317     }
318     napi_set_named_property(env, thisVar, CLONE_LIST_STR, args[0]);
319     return nullptr;
320 }
321 
IsCanceled(napi_env env,napi_callback_info cbinfo)322 napi_value Task::IsCanceled(napi_env env, napi_callback_info cbinfo)
323 {
324     bool isCanceled = false;
325     auto engine = reinterpret_cast<NativeEngine*>(env);
326     if (!engine->IsTaskPoolThread()) {
327         HILOG_ERROR("taskpool:: call isCanceled not in taskpool thread");
328         return NapiHelper::CreateBooleanValue(env, isCanceled);
329     }
330     // Get task and query task cancel state
331     void* data = engine->GetCurrentTaskInfo();
332     if (data == nullptr) {
333         HILOG_ERROR("taskpool:: call isCanceled not in Concurrent function");
334     } else {
335         Task* task = static_cast<Task*>(data);
336         isCanceled = task->taskState_ == ExecuteState::CANCELED ? true : false;
337     }
338     return NapiHelper::CreateBooleanValue(env, isCanceled);
339 }
340 
OnReceiveData(napi_env env,napi_callback_info cbinfo)341 napi_value Task::OnReceiveData(napi_env env, napi_callback_info cbinfo)
342 {
343     size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
344     if (argc >= 2) { // 2: the number of parmas
345         HILOG_ERROR("taskpool:: the number of OnReceiveData parma must be less than 2");
346         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
347             "taskpool:: the number of OnReceiveData parma must be less than 2");
348         return nullptr;
349     }
350 
351     napi_value thisVar;
352     if (argc == 0) {
353         HILOG_INFO("taskpool:: Set taskpool.Task.onReceiveData to undefined");
354         napi_get_cb_info(env, cbinfo, &argc, nullptr, &thisVar, nullptr);
355         napi_value id = NapiHelper::GetNameProperty(env, thisVar, "taskId");
356         uint64_t taskId = NapiHelper::GetUint64Value(env, id);
357         TaskManager::GetInstance().RegisterCallback(env, taskId, nullptr);
358         return nullptr;
359     }
360 
361     napi_value args[1];
362     napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
363     napi_valuetype type;
364     NAPI_CALL(env, napi_typeof(env, args[0], &type));
365     if (type != napi_function) {
366         HILOG_ERROR("taskpool:: OnReceiveData's parameter should be function");
367         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
368             "taskpool:: OnReceiveData's parameter should be function");
369         return nullptr;
370     }
371     // store callbackInfo
372     napi_value napiTaskId = NapiHelper::GetNameProperty(env, thisVar, "taskId");
373     uint64_t taskId = NapiHelper::GetUint64Value(env, napiTaskId);
374     napi_ref callbackRef = Helper::NapiHelper::CreateReference(env, args[0], 1);
375     std::shared_ptr<CallbackInfo> callbackInfo = std::make_shared<CallbackInfo>(env, 1, callbackRef);
376     callbackInfo->onCallbackSignal = new uv_async_t;
377     auto loop = NapiHelper::GetLibUV(env);
378     uv_async_init(loop, callbackInfo->onCallbackSignal, TaskPool::ExecuteCallback);
379     TaskManager::GetInstance().RegisterCallback(env, taskId, callbackInfo);
380     return nullptr;
381 }
382 
SendData(napi_env env,napi_callback_info cbinfo)383 napi_value Task::SendData(napi_env env, napi_callback_info cbinfo)
384 {
385     size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
386     napi_value args[argc];
387     napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
388 
389     napi_value argsArray;
390     napi_create_array_with_length(env, argc, &argsArray);
391     for (size_t i = 0; i < argc; i++) {
392         napi_set_element(env, argsArray, i, args[i]);
393     }
394 
395     auto engine = reinterpret_cast<NativeEngine*>(env);
396     if (!engine->IsTaskPoolThread()) {
397         HILOG_ERROR("taskpool:: SendData is not called in the taskpool thread");
398         ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_IN_TASKPOOL_THREAD);
399         return nullptr;
400     }
401     Task* task = nullptr;
402     void* data = engine->GetCurrentTaskInfo();
403     if (data == nullptr) {
404         HILOG_ERROR("taskpool:: SendData is not called in the concurrent function");
405         ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_IN_CONCURRENT_FUNCTION);
406         return nullptr;
407     } else {
408         task = static_cast<Task*>(data);
409     }
410 
411     napi_value undefined = NapiHelper::GetUndefinedValue(env);
412     napi_value serializationArgs;
413     bool defaultClone = true;
414     bool defaultTransfer = true;
415     napi_status status = napi_serialize(env, argsArray, undefined, argsArray,
416                                         defaultTransfer, defaultClone, &serializationArgs);
417     if (status != napi_ok || serializationArgs == nullptr) {
418         std::string errMessage = "taskpool:: failed to serialize function";
419         HILOG_ERROR("%{public}s in SendData", errMessage.c_str());
420         ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
421         return nullptr;
422     }
423     uv_async_send(task->increaseRefSignal_);
424     TaskResultInfo* resultInfo = new TaskResultInfo(task->env_, task->taskId_, serializationArgs);
425     return TaskManager::GetInstance().NotifyCallbackExecute(env, resultInfo, task);
426 }
427 
AddDependency(napi_env env,napi_callback_info cbinfo)428 napi_value Task::AddDependency(napi_env env, napi_callback_info cbinfo)
429 {
430     size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
431     if (argc == 0) {
432         std::string errMessage = "taskpool:: addDependency has no params";
433         HILOG_ERROR("%{public}s", errMessage.c_str());
434         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
435         return nullptr;
436     }
437 
438     napi_status status = napi_ok;
439     HandleScope scope(env, status);
440     napi_value args[argc];
441     napi_value napiTask;
442     napi_get_cb_info(env, cbinfo, &argc, args, &napiTask, nullptr);
443     Task* task = nullptr;
444     napi_unwrap(env, napiTask, reinterpret_cast<void**>(&task));
445     std::string errMessage = "";
446     if (task->IsCommonTask() || task->IsSeqRunnerTask()) {
447         errMessage = "taskpool:: seqRunnerTask or executedTask cannot addDependency";
448         HILOG_ERROR("%{public}s", errMessage.c_str());
449         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
450         return nullptr;
451     }
452     if (task->IsGroupCommonTask()) {
453         errMessage = "taskpool:: groupTask cannot addDependency";
454         HILOG_ERROR("%{public}s", errMessage.c_str());
455         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
456         return nullptr;
457     }
458     std::set<uint64_t> idSet;
459     for (size_t i = 0; i < argc; i++) {
460         if (!NapiHelper::HasNameProperty(env, args[i], TASKID_STR)) {
461             errMessage = "taskpool:: addDependency param is not task";
462             HILOG_ERROR("%{public}s", errMessage.c_str());
463             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
464             return nullptr;
465         } else {
466             Task* dependentTask = nullptr;
467             napi_unwrap(env, args[i], reinterpret_cast<void**>(&dependentTask));
468             if (dependentTask->taskId_ == task->taskId_) {
469                 HILOG_ERROR("taskpool:: there is a circular dependency");
470                 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CIRCULAR_DEPENDENCY);
471                 return nullptr;
472             }
473             if (dependentTask->IsCommonTask() || dependentTask->IsSeqRunnerTask()) {
474                 errMessage = "taskpool:: seqRunnerTask or executedTask cannot be relied on";
475                 HILOG_ERROR("%{public}s", errMessage.c_str());
476                 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
477                 return nullptr;
478             }
479             if (dependentTask->IsGroupCommonTask()) {
480                 errMessage = "taskpool:: groupTask cannot be relied on";
481                 HILOG_ERROR("%{public}s", errMessage.c_str());
482                 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
483                 return nullptr;
484             }
485             idSet.emplace(dependentTask->taskId_);
486         }
487     }
488     if (!TaskManager::GetInstance().StoreTaskDependency(task->taskId_, idSet)) {
489         HILOG_ERROR("taskpool:: there is a circular dependency");
490         ErrorHelper::ThrowError(env, ErrorHelper::ERR_CIRCULAR_DEPENDENCY);
491     }
492     return nullptr;
493 }
494 
RemoveDependency(napi_env env,napi_callback_info cbinfo)495 napi_value Task::RemoveDependency(napi_env env, napi_callback_info cbinfo)
496 {
497     size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
498     if (argc == 0) {
499         std::string errMessage = "taskpool:: removeDependency has no params";
500         HILOG_ERROR("%{public}s", errMessage.c_str());
501         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
502         return nullptr;
503     }
504 
505     napi_status status = napi_ok;
506     HandleScope scope(env, status);
507     napi_value args[argc];
508     napi_value napiTask;
509     napi_get_cb_info(env, cbinfo, &argc, args, &napiTask, nullptr);
510     Task* task = nullptr;
511     napi_unwrap(env, napiTask, reinterpret_cast<void**>(&task));
512     for (size_t i = 0; i < argc; i++) {
513         if (!NapiHelper::HasNameProperty(env, args[i], TASKID_STR)) {
514             std::string errMessage = "taskpool:: removeDependency param is not task";
515             HILOG_ERROR("%{public}s", errMessage.c_str());
516             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
517             return nullptr;
518         }
519         Task* dependentTask = nullptr;
520         napi_unwrap(env, args[i], reinterpret_cast<void**>(&dependentTask));
521         if (!TaskManager::GetInstance().RemoveTaskDependency(task->taskId_, dependentTask->taskId_)) {
522             HILOG_ERROR("taskpool:: the dependency does not exist");
523             ErrorHelper::ThrowError(env, ErrorHelper::ERR_INEXISTENT_DEPENDENCY);
524             return nullptr;
525         }
526     }
527     return nullptr;
528 }
529 
GetTaskDuration(napi_env env,napi_callback_info & cbinfo,std::string durationType)530 napi_value Task::GetTaskDuration(napi_env env, napi_callback_info& cbinfo, std::string durationType)
531 {
532     napi_value thisVar = nullptr;
533     napi_get_cb_info(env, cbinfo, nullptr, nullptr, &thisVar, nullptr);
534     napi_value napiTaskId = NapiHelper::GetNameProperty(env, thisVar, TASKID_STR);
535     uint64_t taskId = NapiHelper::GetUint64Value(env, napiTaskId);
536     uint64_t totalDuration = TaskManager::GetInstance().GetTaskDuration(taskId, durationType);
537     return NapiHelper::CreateUint32(env, totalDuration);
538 }
539 
GetTotalDuration(napi_env env,napi_callback_info cbinfo)540 napi_value Task::GetTotalDuration(napi_env env, napi_callback_info cbinfo)
541 {
542     return GetTaskDuration(env, cbinfo, TASK_TOTAL_TIME);
543 }
544 
GetCPUDuration(napi_env env,napi_callback_info cbinfo)545 napi_value Task::GetCPUDuration(napi_env env, napi_callback_info cbinfo)
546 {
547     return GetTaskDuration(env, cbinfo, TASK_CPU_TIME);
548 }
549 
GetIODuration(napi_env env,napi_callback_info cbinfo)550 napi_value Task::GetIODuration(napi_env env, napi_callback_info cbinfo)
551 {
552     return GetTaskDuration(env, cbinfo, TASK_IO_TIME);
553 }
554 
IsRepeatableTask()555 bool Task::IsRepeatableTask()
556 {
557     return IsCommonTask() || IsGroupCommonTask() || IsGroupFunctionTask();
558 }
559 
IsGroupTask()560 bool Task::IsGroupTask()
561 {
562     return IsGroupCommonTask() || IsGroupFunctionTask();
563 }
564 
IsGroupCommonTask()565 bool Task::IsGroupCommonTask()
566 {
567     return taskType_ == TaskType::GROUP_COMMON_TASK;
568 }
569 
IsGroupFunctionTask()570 bool Task::IsGroupFunctionTask()
571 {
572     return taskType_ == TaskType::GROUP_FUNCTION_TASK;
573 }
574 
IsCommonTask()575 bool Task::IsCommonTask()
576 {
577     return taskType_ == TaskType::COMMON_TASK;
578 }
579 
IsSeqRunnerTask()580 bool Task::IsSeqRunnerTask()
581 {
582     return taskType_ == TaskType::SEQRUNNER_TASK;
583 }
584 
IsFunctionTask()585 bool Task::IsFunctionTask()
586 {
587     return taskType_ == TaskType::FUNCTION_TASK;
588 }
589 
IsInitialized()590 bool Task::IsInitialized()
591 {
592     return taskType_ == TaskType::TASK;
593 }
594 
GenerateTaskInfo(napi_env env,napi_value func,napi_value args,napi_value transferList,napi_value cloneList,Priority priority,bool defaultTransfer,bool defaultCloneSendable)595 TaskInfo* Task::GenerateTaskInfo(napi_env env, napi_value func, napi_value args,
596                                  napi_value transferList, napi_value cloneList, Priority priority,
597                                  bool defaultTransfer, bool defaultCloneSendable)
598 {
599     napi_value undefined = NapiHelper::GetUndefinedValue(env);
600     napi_value serializationFunction;
601     napi_status status = napi_serialize(env, func, undefined, undefined,
602                                         defaultTransfer, defaultCloneSendable, &serializationFunction);
603     std::string errMessage = "";
604     if (status != napi_ok || serializationFunction == nullptr) {
605         errMessage = "taskpool: failed to serialize function.";
606         HILOG_ERROR("%{public}s", errMessage.c_str());
607         ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_CONCURRENT_FUNCTION, errMessage.c_str());
608         return nullptr;
609     }
610     napi_value serializationArguments;
611     status = napi_serialize(env, args, transferList, cloneList,
612                             defaultTransfer, defaultCloneSendable, &serializationArguments);
613     if (status != napi_ok || serializationArguments == nullptr) {
614         errMessage = "taskpool: failed to serialize arguments.";
615         HILOG_ERROR("%{public}s", errMessage.c_str());
616         ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
617         return nullptr;
618     }
619 
620     TaskInfo* taskInfo = new TaskInfo();
621     taskInfo->serializationFunction = serializationFunction;
622     taskInfo->serializationArguments = serializationArguments;
623     taskInfo->priority = priority;
624     return taskInfo;
625 }
626 
IncreaseRefCount()627 void Task::IncreaseRefCount()
628 {
629     taskRefCount_.fetch_add(2); // 2 : for PerformTask and TaskResultCallback
630 }
631 
DecreaseRefCount()632 void Task::DecreaseRefCount()
633 {
634     taskRefCount_.fetch_sub(1);
635 }
636 
IsReadyToHandle()637 bool Task::IsReadyToHandle()
638 {
639     return (taskRefCount_ & 1) == 0;
640 }
641 
NotifyPendingTask()642 void Task::NotifyPendingTask()
643 {
644     TaskManager::GetInstance().NotifyDependencyTaskInfo(taskId_);
645     std::unique_lock<std::shared_mutex> lock(taskMutex_);
646     auto finishedTaskInfo = currentTaskInfo_;
647     currentTaskInfo_ = nullptr;
648     delete finishedTaskInfo;
649     if (pendingTaskInfos_.size() == 0) {
650         return;
651     }
652     currentTaskInfo_ = pendingTaskInfos_.front();
653     pendingTaskInfos_.pop_front();
654     TaskManager::GetInstance().EnqueueTaskId(taskId_, currentTaskInfo_->priority);
655 }
656 
CancelPendingTask(napi_env env,ExecuteState state)657 void Task::CancelPendingTask(napi_env env, ExecuteState state)
658 {
659     napi_value undefined = NapiHelper::GetUndefinedValue(env);
660     if (state == ExecuteState::WAITING && currentTaskInfo_ != nullptr) {
661         napi_reject_deferred(env, currentTaskInfo_->deferred, undefined);
662         napi_reference_unref(env, taskRef_, nullptr);
663         delete currentTaskInfo_;
664         currentTaskInfo_ = nullptr;
665     }
666     if (pendingTaskInfos_.size() == 0) {
667         return;
668     }
669     auto pendingIter = pendingTaskInfos_.begin();
670     for (; pendingIter != pendingTaskInfos_.end(); ++pendingIter) {
671         TaskInfo* info = *pendingIter;
672         napi_reject_deferred(env, info->deferred, undefined);
673         napi_reference_unref(env, taskRef_, nullptr);
674         delete info;
675     }
676     pendingIter = pendingTaskInfos_.begin();
677     pendingTaskInfos_.erase(pendingIter, pendingTaskInfos_.end());
678 }
679 
UpdateTask(uint64_t startTime,void * worker)680 bool Task::UpdateTask(uint64_t startTime, void* worker)
681 {
682     if (taskState_ == ExecuteState::CANCELED) { // task may have been canceled
683         static_cast<Worker*>(worker)->NotifyTaskFinished();
684         HILOG_DEBUG("taskpool::PerformTask task is null");
685         return false;
686     }
687     taskState_ = ExecuteState::RUNNING;
688     startTime_ = startTime;
689     worker_ = worker;
690     return true;
691 }
692 
DeserializeValue(napi_env env,bool isFunc,bool isArgs)693 napi_value Task::DeserializeValue(napi_env env, bool isFunc, bool isArgs)
694 {
695     napi_value result = nullptr;
696     napi_status status = napi_ok;
697     std::string errMessage = "";
698     if (isFunc) {
699         status = napi_deserialize(env, currentTaskInfo_->serializationFunction, &result);
700         if (!IsGroupFunctionTask()) {
701             napi_delete_serialization_data(env, currentTaskInfo_->serializationFunction);
702         }
703         if (status != napi_ok || result == nullptr) {
704             errMessage = "taskpool:: failed to deserialize function.";
705             HILOG_ERROR("%{public}s", errMessage.c_str());
706             napi_value err = ErrorHelper::NewError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
707             success_ = false;
708             static_cast<Worker*>(worker_)->NotifyTaskResult(env, this, err);
709             return nullptr;
710         }
711         return result;
712     } else if (isArgs) {
713         status = napi_deserialize(env, currentTaskInfo_->serializationArguments, &result);
714         if (!IsGroupFunctionTask()) {
715             napi_delete_serialization_data(env, currentTaskInfo_->serializationArguments);
716         }
717         if (status != napi_ok || result == nullptr) {
718             errMessage = "taskpool:: failed to deserialize arguments.";
719             HILOG_ERROR("%{public}s", errMessage.c_str());
720             napi_value err = ErrorHelper::NewError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
721             success_ = false;
722             static_cast<Worker*>(worker_)->NotifyTaskResult(env, this, err);
723             return nullptr;
724         }
725         return result;
726     }
727     return nullptr;
728 }
729 
StoreTaskDuration()730 void Task::StoreTaskDuration()
731 {
732     cpuTime_ = ConcurrentHelper::GetMilliseconds();
733     uint64_t cpuDuration = cpuTime_ - startTime_;
734     if (ioTime_ != 0) {
735         uint64_t ioDuration = ioTime_ - startTime_;
736         TaskManager::GetInstance().StoreTaskDuration(taskId_, std::max(cpuDuration, ioDuration), cpuDuration);
737     } else {
738         TaskManager::GetInstance().StoreTaskDuration(taskId_, 0, cpuDuration);
739     }
740 }
741 
CanForSequenceRunner(napi_env env)742 bool Task::CanForSequenceRunner(napi_env env)
743 {
744     std::string errMessage = "";
745     // task with dependence is not allowed
746     if (TaskManager::GetInstance().IsDependendByTaskId(taskId_)) {
747         errMessage = "seqRunner:: dependent task not allowed.";
748         HILOG_ERROR("%{public}s", errMessage.c_str());
749         ErrorHelper::ThrowError(env, ErrorHelper::ERR_ADD_DEPENDENT_TASK_TO_SEQRUNNER, errMessage.c_str());
750         return false;
751     }
752     if (IsCommonTask() || IsSeqRunnerTask()) {
753         errMessage = "taskpool:: SequenceRunner cannot execute seqRunnerTask or executedTask";
754         HILOG_ERROR("%{public}s", errMessage.c_str());
755         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
756         return false;
757     }
758     if (IsGroupCommonTask()) {
759         errMessage = "taskpool:: SequenceRunner cannot execute groupTask";
760         HILOG_ERROR("%{public}s", errMessage.c_str());
761         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
762         return false;
763     }
764     return true;
765 }
766 
CanForTaskGroup(napi_env env)767 bool Task::CanForTaskGroup(napi_env env)
768 {
769     std::string errMessage = "";
770     if (TaskManager::GetInstance().IsDependendByTaskId(taskId_)) {
771         errMessage = "taskpool:: dependent task not allowed.";
772         HILOG_ERROR("%{public}s", errMessage.c_str());
773         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
774         return false;
775     }
776     if (IsCommonTask() || IsSeqRunnerTask()) {
777         errMessage = "taskpool:: taskGroup cannot add seqRunnerTask or executedTask";
778         HILOG_ERROR("%{public}s", errMessage.c_str());
779         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
780         return false;
781     }
782     if (IsGroupCommonTask()) {
783         errMessage = "taskpool:: taskGroup cannot add groupTask";
784         HILOG_ERROR("%{public}s", errMessage.c_str());
785         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
786         return false;
787     }
788     taskType_ = TaskType::GROUP_COMMON_TASK;
789     return true;
790 }
791 
CanExecute(napi_env env)792 bool Task::CanExecute(napi_env env)
793 {
794     std::string errMessage = "";
795     if (IsGroupCommonTask()) {
796         errMessage = "taskpool:: groupTask cannot execute outside";
797         HILOG_ERROR("%{public}s", errMessage.c_str());
798         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
799         return false;
800     }
801     if (IsSeqRunnerTask()) {
802         errMessage = "taskpool:: seqRunnerTask cannot execute outside";
803         HILOG_ERROR("%{public}s", errMessage.c_str());
804         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
805         return false;
806     }
807     return true;
808 }
809 
CanExecuteDelayed(napi_env env)810 bool Task::CanExecuteDelayed(napi_env env)
811 {
812     std::string errMessage = "";
813     if (IsGroupCommonTask()) {
814         errMessage = "taskpool:: groupTask cannot executeDelayed outside";
815         HILOG_ERROR("%{public}s", errMessage.c_str());
816         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
817         return false;
818     }
819     if (IsSeqRunnerTask()) {
820         errMessage = "taskpool:: seqRunnerTask cannot executeDelayed outside";
821         HILOG_ERROR("%{public}s", errMessage.c_str());
822         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
823         return false;
824     }
825     return true;
826 }
827 
IncreaseTaskRef(const uv_async_t * req)828 void Task::IncreaseTaskRef(const uv_async_t* req)
829 {
830     auto task = static_cast<Task*>(req->data);
831     if (task == nullptr) {
832         HILOG_FATAL("taskpool:: IncreaseTaskRef task is nullptr");
833         return;
834     }
835     napi_reference_ref(task->env_, task->taskRef_, nullptr);
836 }
837 } // namespace Commonlibrary::Concurrent::TaskPoolModule