1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include "task.h"

#include <utility>
#include <vector>

#include "helper/error_helper.h"
#include "helper/napi_helper.h"
#include "helper/object_helper.h"
#include "task_manager.h"
#include "taskpool.h"
#include "utils/log.h"
#include "worker.h"
25
26 namespace Commonlibrary::Concurrent::TaskPoolModule {
// Property names under which GenerateTask registers the native OnReceiveData, SetTransferList and
// SetCloneList callbacks on each task object.
static constexpr char ONRECEIVEDATA_STR[] = "onReceiveData";
static constexpr char SETTRANSFERLIST_STR[] = "setTransferList";
static constexpr char SET_CLONE_LIST_STR[] = "setCloneList";
30
31 using namespace Commonlibrary::Concurrent::Common::Helper;
32
Task(napi_env env,TaskType taskType,std::string name)33 Task::Task(napi_env env, TaskType taskType, std::string name) : env_(env), taskType_(taskType), name_(name) {}
34
TaskConstructor(napi_env env,napi_callback_info cbinfo)35 napi_value Task::TaskConstructor(napi_env env, napi_callback_info cbinfo)
36 {
37 // check argv count
38 size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
39 std::string errMessage = "";
40 if (argc < 1) {
41 errMessage = "taskpool:: create task need more than one param";
42 HILOG_ERROR("%{public}s", errMessage.c_str());
43 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
44 return nullptr;
45 }
46
47 napi_value* args = new napi_value[argc];
48 ObjectScope<napi_value> scope(args, true);
49 napi_value thisVar;
50 napi_value func = nullptr;
51 napi_value name = nullptr;
52 napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
53 // if the first is task name, the second might be func
54 if (argc > 1 && NapiHelper::IsString(env, args[0])) {
55 name = args[0];
56 func = args[1];
57 args += 2; // 2: name and func
58 argc -= 2; // 2: name and func
59 } else {
60 func = args[0];
61 args += 1; // 1: func
62 argc -= 1; // 1: func
63 }
64 if (!NapiHelper::IsFunction(env, func)) {
65 errMessage = "taskpool:: the first or second param of task must be function";
66 HILOG_ERROR("%{public}s", errMessage.c_str());
67 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
68 return nullptr;
69 }
70
71 Task* task = GenerateTask(env, thisVar, func, name, args, argc);
72 TaskManager::GetInstance().StoreTask(task->taskId_, task);
73 napi_wrap(env, thisVar, task, TaskDestructor, nullptr, nullptr);
74 napi_create_reference(env, thisVar, 0, &task->taskRef_);
75 return thisVar;
76 }
77
TaskDestructor(napi_env env,void * data,void * hint)78 void Task::TaskDestructor(napi_env env, void* data, [[maybe_unused]] void* hint)
79 {
80 Task* task = static_cast<Task*>(data);
81 TaskManager::GetInstance().ReleaseTaskData(env, task);
82 delete task;
83 }
84
GenerateTask(napi_env env,napi_value napiTask,napi_value func,napi_value name,napi_value * args,size_t argc)85 Task* Task::GenerateTask(napi_env env, napi_value napiTask, napi_value func,
86 napi_value name, napi_value* args, size_t argc)
87 {
88 napi_value argsArray;
89 napi_create_array_with_length(env, argc, &argsArray);
90 for (size_t i = 0; i < argc; i++) {
91 napi_set_element(env, argsArray, i, args[i]);
92 }
93 if (name == nullptr) {
94 name = NapiHelper::GetNameProperty(env, func, NAME);
95 }
96 char* nameStr = NapiHelper::GetString(env, name);
97 Task* task = new Task(env, TaskType::TASK, nameStr);
98 delete[] nameStr;
99 task->taskId_ = reinterpret_cast<uint64_t>(task);
100 uv_loop_t* loop = NapiHelper::GetLibUV(env);
101 task->onResultSignal_ = new uv_async_t;
102 uv_async_init(loop, task->onResultSignal_, reinterpret_cast<uv_async_cb>(TaskPool::HandleTaskResult));
103 task->onResultSignal_->data = task;
104 task->increaseRefSignal_ = new uv_async_t;
105 uv_async_init(loop, task->increaseRefSignal_, reinterpret_cast<uv_async_cb>(Task::IncreaseTaskRef));
106 task->increaseRefSignal_->data = task;
107 napi_value taskId = NapiHelper::CreateUint64(env, task->taskId_);
108 napi_value napiTrue = NapiHelper::CreateBooleanValue(env, true);
109 napi_value napiFalse = NapiHelper::CreateBooleanValue(env, false);
110 // add task name to task
111 napi_set_named_property(env, napiTask, NAME, name);
112 napi_set_named_property(env, napiTask, FUNCTION_STR, func);
113 napi_set_named_property(env, napiTask, TASKID_STR, taskId);
114 napi_set_named_property(env, napiTask, ARGUMENTS_STR, argsArray);
115 napi_set_named_property(env, napiTask, DEFAULT_TRANSFER_STR, napiTrue);
116 napi_set_named_property(env, napiTask, DEFAULT_CLONE_SENDABLE_STR, napiFalse);
117 napi_property_descriptor properties[] = {
118 DECLARE_NAPI_FUNCTION(SETTRANSFERLIST_STR, SetTransferList),
119 DECLARE_NAPI_FUNCTION(SET_CLONE_LIST_STR, SetCloneList),
120 DECLARE_NAPI_FUNCTION(ONRECEIVEDATA_STR, OnReceiveData),
121 DECLARE_NAPI_FUNCTION(ADD_DEPENDENCY_STR, AddDependency),
122 DECLARE_NAPI_FUNCTION(REMOVE_DEPENDENCY_STR, RemoveDependency),
123 DECLARE_NAPI_GETTER(TASK_TOTAL_TIME, GetTotalDuration),
124 DECLARE_NAPI_GETTER(TASK_CPU_TIME, GetCPUDuration),
125 DECLARE_NAPI_GETTER(TASK_IO_TIME, GetIODuration)
126 };
127 napi_define_properties(env, napiTask, sizeof(properties) / sizeof(properties[0]), properties);
128
129 return task;
130 }
131
GenerateFunctionTask(napi_env env,napi_value func,napi_value * args,size_t argc,TaskType type)132 Task* Task::GenerateFunctionTask(napi_env env, napi_value func, napi_value* args, size_t argc, TaskType type)
133 {
134 napi_value argsArray;
135 napi_create_array_with_length(env, argc, &argsArray);
136 for (size_t i = 0; i < argc; i++) {
137 napi_set_element(env, argsArray, i, args[i]);
138 }
139 napi_value undefined = NapiHelper::GetUndefinedValue(env);
140 TaskInfo* taskInfo = GenerateTaskInfo(env, func, argsArray, undefined, undefined, Priority::DEFAULT);
141 if (taskInfo == nullptr) {
142 return nullptr;
143 }
144 napi_value napiFuncName = NapiHelper::GetNameProperty(env, func, NAME);
145 char* nameStr = NapiHelper::GetString(env, napiFuncName);
146 Task* task = new Task(env, type, nameStr);
147 delete[] nameStr;
148 task->taskId_ = reinterpret_cast<uint64_t>(task);
149 task->currentTaskInfo_ = taskInfo;
150 task->onResultSignal_ = new uv_async_t;
151 uv_loop_t* loop = NapiHelper::GetLibUV(env);
152 uv_async_init(loop, task->onResultSignal_, reinterpret_cast<uv_async_cb>(TaskPool::HandleTaskResult));
153 task->onResultSignal_->data = task;
154 return task;
155 }
156
GetTaskInfoPromise(napi_env env,napi_value task,TaskType taskType,Priority priority)157 napi_value Task::GetTaskInfoPromise(napi_env env, napi_value task, TaskType taskType, Priority priority)
158 {
159 TaskInfo* taskInfo = GetTaskInfo(env, task, taskType, priority);
160 if (taskInfo == nullptr) {
161 return nullptr;
162 }
163 return NapiHelper::CreatePromise(env, &taskInfo->deferred);
164 }
165
GetTaskInfo(napi_env env,napi_value task,TaskType taskType,Priority priority)166 TaskInfo* Task::GetTaskInfo(napi_env env, napi_value task, TaskType taskType, Priority priority)
167 {
168 std::string errMessage = "";
169 if (!IsInitialized() && taskType_ != taskType) {
170 errMessage = "taskpool:: task type is error";
171 HILOG_ERROR("%{public}s", errMessage.c_str());
172 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
173 return nullptr;
174 }
175 taskType_ = taskType;
176 napi_value func = NapiHelper::GetNameProperty(env, task, FUNCTION_STR);
177 napi_value args = NapiHelper::GetNameProperty(env, task, ARGUMENTS_STR);
178 napi_value taskName = NapiHelper::GetNameProperty(env, task, NAME);
179 napi_value napiDefaultTransfer = NapiHelper::GetNameProperty(env, task, DEFAULT_TRANSFER_STR);
180 napi_value napiDefaultClone = NapiHelper::GetNameProperty(env, task, DEFAULT_CLONE_SENDABLE_STR);
181 if (func == nullptr || args == nullptr || napiDefaultTransfer == nullptr || napiDefaultClone == nullptr) {
182 errMessage = "taskpool:: task value is error";
183 HILOG_ERROR("%{public}s", errMessage.c_str());
184 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
185 return nullptr;
186 }
187 napi_value transferList = NapiHelper::GetUndefinedValue(env);
188 if (NapiHelper::HasNameProperty(env, task, TRANSFERLIST_STR)) {
189 transferList = NapiHelper::GetNameProperty(env, task, TRANSFERLIST_STR);
190 }
191 napi_value cloneList = NapiHelper::GetUndefinedValue(env);
192 if (NapiHelper::HasNameProperty(env, task, CLONE_LIST_STR)) {
193 cloneList = NapiHelper::GetNameProperty(env, task, CLONE_LIST_STR);
194 }
195 bool defaultTransfer = NapiHelper::GetBooleanValue(env, napiDefaultTransfer);
196 bool defaultCloneSendable = NapiHelper::GetBooleanValue(env, napiDefaultClone);
197 TaskInfo* pendingInfo = GenerateTaskInfo(env, func, args, transferList, cloneList, priority,
198 defaultTransfer, defaultCloneSendable);
199 if (pendingInfo == nullptr) {
200 return nullptr;
201 }
202 {
203 std::unique_lock<std::shared_mutex> lock(taskMutex_);
204 if (currentTaskInfo_ == nullptr) {
205 currentTaskInfo_ = pendingInfo;
206 } else {
207 pendingTaskInfos_.push_back(pendingInfo);
208 }
209 napi_reference_ref(env, taskRef_, nullptr);
210 }
211 char* name = NapiHelper::GetString(env, taskName);
212 if (strlen(name) == 0) {
213 napi_value funcName = NapiHelper::GetNameProperty(env, func, NAME);
214 name = NapiHelper::GetString(env, funcName);
215 }
216 name_ = std::string(name);
217 delete[] name;
218 reinterpret_cast<NativeEngine*>(env)->IncreaseSubEnvCounter();
219 return pendingInfo;
220 }
221
SetTransferList(napi_env env,napi_callback_info cbinfo)222 napi_value Task::SetTransferList(napi_env env, napi_callback_info cbinfo)
223 {
224 size_t argc = 1;
225 napi_value args[1];
226 napi_value thisVar;
227 napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
228 // Check whether clone list has been set
229 if (NapiHelper::HasNameProperty(env, thisVar, CLONE_LIST_STR)) {
230 ErrorHelper::ThrowError(env, ErrorHelper::ERR_IN_BOTH_CLONE_AND_TRANSFER);
231 return nullptr;
232 }
233 if (argc > 1) {
234 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
235 "taskpool:: the number of setTransferList parma must be less than 2");
236 return nullptr;
237 }
238 Task* task = nullptr;
239 napi_unwrap(env, thisVar, reinterpret_cast<void**>(&task));
240 if (task == nullptr) {
241 HILOG_ERROR("taskpool:: task is nullptr");
242 return nullptr;
243 }
244 napi_value undefined = NapiHelper::GetUndefinedValue(env);
245 napi_value falseVal = NapiHelper::CreateBooleanValue(env, false);
246 if (argc == 0) {
247 HILOG_DEBUG("taskpool:: set task params not transfer");
248 napi_set_named_property(env, thisVar, TRANSFERLIST_STR, undefined);
249 // set task.defaultTransfer false
250 napi_set_named_property(env, thisVar, DEFAULT_TRANSFER_STR, falseVal);
251 return nullptr;
252 }
253 if (!NapiHelper::IsArray(env, args[0])) {
254 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "taskpool:: setTransferList first param must be array");
255 return nullptr;
256 }
257 // set task.defaultTransfer false
258 napi_set_named_property(env, thisVar, DEFAULT_TRANSFER_STR, falseVal);
259 uint32_t arrayLength = NapiHelper::GetArrayLength(env, args[0]);
260 if (arrayLength == 0) {
261 HILOG_DEBUG("taskpool:: set task params not transfer");
262 napi_set_named_property(env, thisVar, TRANSFERLIST_STR, undefined);
263 return nullptr;
264 }
265 for (size_t i = 0; i < arrayLength; i++) {
266 napi_value transferVal = NapiHelper::GetElement(env, args[0], i);
267 if (!NapiHelper::IsArrayBuffer(env, transferVal)) {
268 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
269 "taskpool:: the element in array must be arraybuffer");
270 return nullptr;
271 }
272 }
273 HILOG_DEBUG("taskpool:: check setTransferList param success");
274 napi_set_named_property(env, thisVar, TRANSFERLIST_STR, args[0]);
275 return nullptr;
276 }
277
SetCloneList(napi_env env,napi_callback_info cbinfo)278 napi_value Task::SetCloneList(napi_env env, napi_callback_info cbinfo)
279 {
280 size_t argc = 1;
281 napi_value args[1];
282 napi_value thisVar;
283 napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
284 // Check whether transfer list has been set
285 if (NapiHelper::HasNameProperty(env, thisVar, TRANSFERLIST_STR)) {
286 ErrorHelper::ThrowError(env, ErrorHelper::ERR_IN_BOTH_CLONE_AND_TRANSFER);
287 return nullptr;
288 }
289 if (argc != 1) {
290 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "taskpool:: the number of setCloneList parma must be 1");
291 return nullptr;
292 }
293 if (!NapiHelper::IsArray(env, args[0])) {
294 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "taskpool:: setCloneList first param must be array");
295 return nullptr;
296 }
297 Task* task = nullptr;
298 napi_unwrap(env, thisVar, reinterpret_cast<void**>(&task));
299 if (task == nullptr) {
300 HILOG_ERROR("taskpool:: task is nullptr");
301 return nullptr;
302 }
303 napi_value undefined = NapiHelper::GetUndefinedValue(env);
304 uint32_t arrayLength = NapiHelper::GetArrayLength(env, args[0]);
305 if (arrayLength == 0) {
306 HILOG_DEBUG("taskpool:: clone list is empty");
307 napi_set_named_property(env, thisVar, CLONE_LIST_STR, undefined);
308 return nullptr;
309 }
310 for (size_t i = 0; i < arrayLength; i++) {
311 napi_value cloneVal = NapiHelper::GetElement(env, args[0], i);
312 if (!NapiHelper::IsArrayBuffer(env, cloneVal) && !NapiHelper::IsSendablObject(env, cloneVal)) {
313 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
314 "taskpool:: setCloneList elements in array must be ArrayBuffer or Sendable Class instance");
315 return nullptr;
316 }
317 }
318 napi_set_named_property(env, thisVar, CLONE_LIST_STR, args[0]);
319 return nullptr;
320 }
321
IsCanceled(napi_env env,napi_callback_info cbinfo)322 napi_value Task::IsCanceled(napi_env env, napi_callback_info cbinfo)
323 {
324 bool isCanceled = false;
325 auto engine = reinterpret_cast<NativeEngine*>(env);
326 if (!engine->IsTaskPoolThread()) {
327 HILOG_ERROR("taskpool:: call isCanceled not in taskpool thread");
328 return NapiHelper::CreateBooleanValue(env, isCanceled);
329 }
330 // Get task and query task cancel state
331 void* data = engine->GetCurrentTaskInfo();
332 if (data == nullptr) {
333 HILOG_ERROR("taskpool:: call isCanceled not in Concurrent function");
334 } else {
335 Task* task = static_cast<Task*>(data);
336 isCanceled = task->taskState_ == ExecuteState::CANCELED ? true : false;
337 }
338 return NapiHelper::CreateBooleanValue(env, isCanceled);
339 }
340
OnReceiveData(napi_env env,napi_callback_info cbinfo)341 napi_value Task::OnReceiveData(napi_env env, napi_callback_info cbinfo)
342 {
343 size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
344 if (argc >= 2) { // 2: the number of parmas
345 HILOG_ERROR("taskpool:: the number of OnReceiveData parma must be less than 2");
346 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
347 "taskpool:: the number of OnReceiveData parma must be less than 2");
348 return nullptr;
349 }
350
351 napi_value thisVar;
352 if (argc == 0) {
353 HILOG_INFO("taskpool:: Set taskpool.Task.onReceiveData to undefined");
354 napi_get_cb_info(env, cbinfo, &argc, nullptr, &thisVar, nullptr);
355 napi_value id = NapiHelper::GetNameProperty(env, thisVar, "taskId");
356 uint64_t taskId = NapiHelper::GetUint64Value(env, id);
357 TaskManager::GetInstance().RegisterCallback(env, taskId, nullptr);
358 return nullptr;
359 }
360
361 napi_value args[1];
362 napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
363 napi_valuetype type;
364 NAPI_CALL(env, napi_typeof(env, args[0], &type));
365 if (type != napi_function) {
366 HILOG_ERROR("taskpool:: OnReceiveData's parameter should be function");
367 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
368 "taskpool:: OnReceiveData's parameter should be function");
369 return nullptr;
370 }
371 // store callbackInfo
372 napi_value napiTaskId = NapiHelper::GetNameProperty(env, thisVar, "taskId");
373 uint64_t taskId = NapiHelper::GetUint64Value(env, napiTaskId);
374 napi_ref callbackRef = Helper::NapiHelper::CreateReference(env, args[0], 1);
375 std::shared_ptr<CallbackInfo> callbackInfo = std::make_shared<CallbackInfo>(env, 1, callbackRef);
376 callbackInfo->onCallbackSignal = new uv_async_t;
377 auto loop = NapiHelper::GetLibUV(env);
378 uv_async_init(loop, callbackInfo->onCallbackSignal, TaskPool::ExecuteCallback);
379 TaskManager::GetInstance().RegisterCallback(env, taskId, callbackInfo);
380 return nullptr;
381 }
382
SendData(napi_env env,napi_callback_info cbinfo)383 napi_value Task::SendData(napi_env env, napi_callback_info cbinfo)
384 {
385 size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
386 napi_value args[argc];
387 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
388
389 napi_value argsArray;
390 napi_create_array_with_length(env, argc, &argsArray);
391 for (size_t i = 0; i < argc; i++) {
392 napi_set_element(env, argsArray, i, args[i]);
393 }
394
395 auto engine = reinterpret_cast<NativeEngine*>(env);
396 if (!engine->IsTaskPoolThread()) {
397 HILOG_ERROR("taskpool:: SendData is not called in the taskpool thread");
398 ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_IN_TASKPOOL_THREAD);
399 return nullptr;
400 }
401 Task* task = nullptr;
402 void* data = engine->GetCurrentTaskInfo();
403 if (data == nullptr) {
404 HILOG_ERROR("taskpool:: SendData is not called in the concurrent function");
405 ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_IN_CONCURRENT_FUNCTION);
406 return nullptr;
407 } else {
408 task = static_cast<Task*>(data);
409 }
410
411 napi_value undefined = NapiHelper::GetUndefinedValue(env);
412 napi_value serializationArgs;
413 bool defaultClone = true;
414 bool defaultTransfer = true;
415 napi_status status = napi_serialize(env, argsArray, undefined, argsArray,
416 defaultTransfer, defaultClone, &serializationArgs);
417 if (status != napi_ok || serializationArgs == nullptr) {
418 std::string errMessage = "taskpool:: failed to serialize function";
419 HILOG_ERROR("%{public}s in SendData", errMessage.c_str());
420 ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
421 return nullptr;
422 }
423 uv_async_send(task->increaseRefSignal_);
424 TaskResultInfo* resultInfo = new TaskResultInfo(task->env_, task->taskId_, serializationArgs);
425 return TaskManager::GetInstance().NotifyCallbackExecute(env, resultInfo, task);
426 }
427
AddDependency(napi_env env,napi_callback_info cbinfo)428 napi_value Task::AddDependency(napi_env env, napi_callback_info cbinfo)
429 {
430 size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
431 if (argc == 0) {
432 std::string errMessage = "taskpool:: addDependency has no params";
433 HILOG_ERROR("%{public}s", errMessage.c_str());
434 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
435 return nullptr;
436 }
437
438 napi_status status = napi_ok;
439 HandleScope scope(env, status);
440 napi_value args[argc];
441 napi_value napiTask;
442 napi_get_cb_info(env, cbinfo, &argc, args, &napiTask, nullptr);
443 Task* task = nullptr;
444 napi_unwrap(env, napiTask, reinterpret_cast<void**>(&task));
445 std::string errMessage = "";
446 if (task->IsCommonTask() || task->IsSeqRunnerTask()) {
447 errMessage = "taskpool:: seqRunnerTask or executedTask cannot addDependency";
448 HILOG_ERROR("%{public}s", errMessage.c_str());
449 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
450 return nullptr;
451 }
452 if (task->IsGroupCommonTask()) {
453 errMessage = "taskpool:: groupTask cannot addDependency";
454 HILOG_ERROR("%{public}s", errMessage.c_str());
455 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
456 return nullptr;
457 }
458 std::set<uint64_t> idSet;
459 for (size_t i = 0; i < argc; i++) {
460 if (!NapiHelper::HasNameProperty(env, args[i], TASKID_STR)) {
461 errMessage = "taskpool:: addDependency param is not task";
462 HILOG_ERROR("%{public}s", errMessage.c_str());
463 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
464 return nullptr;
465 } else {
466 Task* dependentTask = nullptr;
467 napi_unwrap(env, args[i], reinterpret_cast<void**>(&dependentTask));
468 if (dependentTask->taskId_ == task->taskId_) {
469 HILOG_ERROR("taskpool:: there is a circular dependency");
470 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CIRCULAR_DEPENDENCY);
471 return nullptr;
472 }
473 if (dependentTask->IsCommonTask() || dependentTask->IsSeqRunnerTask()) {
474 errMessage = "taskpool:: seqRunnerTask or executedTask cannot be relied on";
475 HILOG_ERROR("%{public}s", errMessage.c_str());
476 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
477 return nullptr;
478 }
479 if (dependentTask->IsGroupCommonTask()) {
480 errMessage = "taskpool:: groupTask cannot be relied on";
481 HILOG_ERROR("%{public}s", errMessage.c_str());
482 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
483 return nullptr;
484 }
485 idSet.emplace(dependentTask->taskId_);
486 }
487 }
488 if (!TaskManager::GetInstance().StoreTaskDependency(task->taskId_, idSet)) {
489 HILOG_ERROR("taskpool:: there is a circular dependency");
490 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CIRCULAR_DEPENDENCY);
491 }
492 return nullptr;
493 }
494
RemoveDependency(napi_env env,napi_callback_info cbinfo)495 napi_value Task::RemoveDependency(napi_env env, napi_callback_info cbinfo)
496 {
497 size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
498 if (argc == 0) {
499 std::string errMessage = "taskpool:: removeDependency has no params";
500 HILOG_ERROR("%{public}s", errMessage.c_str());
501 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
502 return nullptr;
503 }
504
505 napi_status status = napi_ok;
506 HandleScope scope(env, status);
507 napi_value args[argc];
508 napi_value napiTask;
509 napi_get_cb_info(env, cbinfo, &argc, args, &napiTask, nullptr);
510 Task* task = nullptr;
511 napi_unwrap(env, napiTask, reinterpret_cast<void**>(&task));
512 for (size_t i = 0; i < argc; i++) {
513 if (!NapiHelper::HasNameProperty(env, args[i], TASKID_STR)) {
514 std::string errMessage = "taskpool:: removeDependency param is not task";
515 HILOG_ERROR("%{public}s", errMessage.c_str());
516 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
517 return nullptr;
518 }
519 Task* dependentTask = nullptr;
520 napi_unwrap(env, args[i], reinterpret_cast<void**>(&dependentTask));
521 if (!TaskManager::GetInstance().RemoveTaskDependency(task->taskId_, dependentTask->taskId_)) {
522 HILOG_ERROR("taskpool:: the dependency does not exist");
523 ErrorHelper::ThrowError(env, ErrorHelper::ERR_INEXISTENT_DEPENDENCY);
524 return nullptr;
525 }
526 }
527 return nullptr;
528 }
529
GetTaskDuration(napi_env env,napi_callback_info & cbinfo,std::string durationType)530 napi_value Task::GetTaskDuration(napi_env env, napi_callback_info& cbinfo, std::string durationType)
531 {
532 napi_value thisVar = nullptr;
533 napi_get_cb_info(env, cbinfo, nullptr, nullptr, &thisVar, nullptr);
534 napi_value napiTaskId = NapiHelper::GetNameProperty(env, thisVar, TASKID_STR);
535 uint64_t taskId = NapiHelper::GetUint64Value(env, napiTaskId);
536 uint64_t totalDuration = TaskManager::GetInstance().GetTaskDuration(taskId, durationType);
537 return NapiHelper::CreateUint32(env, totalDuration);
538 }
539
GetTotalDuration(napi_env env,napi_callback_info cbinfo)540 napi_value Task::GetTotalDuration(napi_env env, napi_callback_info cbinfo)
541 {
542 return GetTaskDuration(env, cbinfo, TASK_TOTAL_TIME);
543 }
544
GetCPUDuration(napi_env env,napi_callback_info cbinfo)545 napi_value Task::GetCPUDuration(napi_env env, napi_callback_info cbinfo)
546 {
547 return GetTaskDuration(env, cbinfo, TASK_CPU_TIME);
548 }
549
GetIODuration(napi_env env,napi_callback_info cbinfo)550 napi_value Task::GetIODuration(napi_env env, napi_callback_info cbinfo)
551 {
552 return GetTaskDuration(env, cbinfo, TASK_IO_TIME);
553 }
554
IsRepeatableTask()555 bool Task::IsRepeatableTask()
556 {
557 return IsCommonTask() || IsGroupCommonTask() || IsGroupFunctionTask();
558 }
559
IsGroupTask()560 bool Task::IsGroupTask()
561 {
562 return IsGroupCommonTask() || IsGroupFunctionTask();
563 }
564
IsGroupCommonTask()565 bool Task::IsGroupCommonTask()
566 {
567 return taskType_ == TaskType::GROUP_COMMON_TASK;
568 }
569
IsGroupFunctionTask()570 bool Task::IsGroupFunctionTask()
571 {
572 return taskType_ == TaskType::GROUP_FUNCTION_TASK;
573 }
574
IsCommonTask()575 bool Task::IsCommonTask()
576 {
577 return taskType_ == TaskType::COMMON_TASK;
578 }
579
IsSeqRunnerTask()580 bool Task::IsSeqRunnerTask()
581 {
582 return taskType_ == TaskType::SEQRUNNER_TASK;
583 }
584
IsFunctionTask()585 bool Task::IsFunctionTask()
586 {
587 return taskType_ == TaskType::FUNCTION_TASK;
588 }
589
IsInitialized()590 bool Task::IsInitialized()
591 {
592 return taskType_ == TaskType::TASK;
593 }
594
GenerateTaskInfo(napi_env env,napi_value func,napi_value args,napi_value transferList,napi_value cloneList,Priority priority,bool defaultTransfer,bool defaultCloneSendable)595 TaskInfo* Task::GenerateTaskInfo(napi_env env, napi_value func, napi_value args,
596 napi_value transferList, napi_value cloneList, Priority priority,
597 bool defaultTransfer, bool defaultCloneSendable)
598 {
599 napi_value undefined = NapiHelper::GetUndefinedValue(env);
600 napi_value serializationFunction;
601 napi_status status = napi_serialize(env, func, undefined, undefined,
602 defaultTransfer, defaultCloneSendable, &serializationFunction);
603 std::string errMessage = "";
604 if (status != napi_ok || serializationFunction == nullptr) {
605 errMessage = "taskpool: failed to serialize function.";
606 HILOG_ERROR("%{public}s", errMessage.c_str());
607 ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_CONCURRENT_FUNCTION, errMessage.c_str());
608 return nullptr;
609 }
610 napi_value serializationArguments;
611 status = napi_serialize(env, args, transferList, cloneList,
612 defaultTransfer, defaultCloneSendable, &serializationArguments);
613 if (status != napi_ok || serializationArguments == nullptr) {
614 errMessage = "taskpool: failed to serialize arguments.";
615 HILOG_ERROR("%{public}s", errMessage.c_str());
616 ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
617 return nullptr;
618 }
619
620 TaskInfo* taskInfo = new TaskInfo();
621 taskInfo->serializationFunction = serializationFunction;
622 taskInfo->serializationArguments = serializationArguments;
623 taskInfo->priority = priority;
624 return taskInfo;
625 }
626
IncreaseRefCount()627 void Task::IncreaseRefCount()
628 {
629 taskRefCount_.fetch_add(2); // 2 : for PerformTask and TaskResultCallback
630 }
631
DecreaseRefCount()632 void Task::DecreaseRefCount()
633 {
634 taskRefCount_.fetch_sub(1);
635 }
636
IsReadyToHandle()637 bool Task::IsReadyToHandle()
638 {
639 return (taskRefCount_ & 1) == 0;
640 }
641
NotifyPendingTask()642 void Task::NotifyPendingTask()
643 {
644 TaskManager::GetInstance().NotifyDependencyTaskInfo(taskId_);
645 std::unique_lock<std::shared_mutex> lock(taskMutex_);
646 auto finishedTaskInfo = currentTaskInfo_;
647 currentTaskInfo_ = nullptr;
648 delete finishedTaskInfo;
649 if (pendingTaskInfos_.size() == 0) {
650 return;
651 }
652 currentTaskInfo_ = pendingTaskInfos_.front();
653 pendingTaskInfos_.pop_front();
654 TaskManager::GetInstance().EnqueueTaskId(taskId_, currentTaskInfo_->priority);
655 }
656
CancelPendingTask(napi_env env,ExecuteState state)657 void Task::CancelPendingTask(napi_env env, ExecuteState state)
658 {
659 napi_value undefined = NapiHelper::GetUndefinedValue(env);
660 if (state == ExecuteState::WAITING && currentTaskInfo_ != nullptr) {
661 napi_reject_deferred(env, currentTaskInfo_->deferred, undefined);
662 napi_reference_unref(env, taskRef_, nullptr);
663 delete currentTaskInfo_;
664 currentTaskInfo_ = nullptr;
665 }
666 if (pendingTaskInfos_.size() == 0) {
667 return;
668 }
669 auto pendingIter = pendingTaskInfos_.begin();
670 for (; pendingIter != pendingTaskInfos_.end(); ++pendingIter) {
671 TaskInfo* info = *pendingIter;
672 napi_reject_deferred(env, info->deferred, undefined);
673 napi_reference_unref(env, taskRef_, nullptr);
674 delete info;
675 }
676 pendingIter = pendingTaskInfos_.begin();
677 pendingTaskInfos_.erase(pendingIter, pendingTaskInfos_.end());
678 }
679
UpdateTask(uint64_t startTime,void * worker)680 bool Task::UpdateTask(uint64_t startTime, void* worker)
681 {
682 if (taskState_ == ExecuteState::CANCELED) { // task may have been canceled
683 static_cast<Worker*>(worker)->NotifyTaskFinished();
684 HILOG_DEBUG("taskpool::PerformTask task is null");
685 return false;
686 }
687 taskState_ = ExecuteState::RUNNING;
688 startTime_ = startTime;
689 worker_ = worker;
690 return true;
691 }
692
DeserializeValue(napi_env env,bool isFunc,bool isArgs)693 napi_value Task::DeserializeValue(napi_env env, bool isFunc, bool isArgs)
694 {
695 napi_value result = nullptr;
696 napi_status status = napi_ok;
697 std::string errMessage = "";
698 if (isFunc) {
699 status = napi_deserialize(env, currentTaskInfo_->serializationFunction, &result);
700 if (!IsGroupFunctionTask()) {
701 napi_delete_serialization_data(env, currentTaskInfo_->serializationFunction);
702 }
703 if (status != napi_ok || result == nullptr) {
704 errMessage = "taskpool:: failed to deserialize function.";
705 HILOG_ERROR("%{public}s", errMessage.c_str());
706 napi_value err = ErrorHelper::NewError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
707 success_ = false;
708 static_cast<Worker*>(worker_)->NotifyTaskResult(env, this, err);
709 return nullptr;
710 }
711 return result;
712 } else if (isArgs) {
713 status = napi_deserialize(env, currentTaskInfo_->serializationArguments, &result);
714 if (!IsGroupFunctionTask()) {
715 napi_delete_serialization_data(env, currentTaskInfo_->serializationArguments);
716 }
717 if (status != napi_ok || result == nullptr) {
718 errMessage = "taskpool:: failed to deserialize arguments.";
719 HILOG_ERROR("%{public}s", errMessage.c_str());
720 napi_value err = ErrorHelper::NewError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
721 success_ = false;
722 static_cast<Worker*>(worker_)->NotifyTaskResult(env, this, err);
723 return nullptr;
724 }
725 return result;
726 }
727 return nullptr;
728 }
729
StoreTaskDuration()730 void Task::StoreTaskDuration()
731 {
732 cpuTime_ = ConcurrentHelper::GetMilliseconds();
733 uint64_t cpuDuration = cpuTime_ - startTime_;
734 if (ioTime_ != 0) {
735 uint64_t ioDuration = ioTime_ - startTime_;
736 TaskManager::GetInstance().StoreTaskDuration(taskId_, std::max(cpuDuration, ioDuration), cpuDuration);
737 } else {
738 TaskManager::GetInstance().StoreTaskDuration(taskId_, 0, cpuDuration);
739 }
740 }
741
CanForSequenceRunner(napi_env env)742 bool Task::CanForSequenceRunner(napi_env env)
743 {
744 std::string errMessage = "";
745 // task with dependence is not allowed
746 if (TaskManager::GetInstance().IsDependendByTaskId(taskId_)) {
747 errMessage = "seqRunner:: dependent task not allowed.";
748 HILOG_ERROR("%{public}s", errMessage.c_str());
749 ErrorHelper::ThrowError(env, ErrorHelper::ERR_ADD_DEPENDENT_TASK_TO_SEQRUNNER, errMessage.c_str());
750 return false;
751 }
752 if (IsCommonTask() || IsSeqRunnerTask()) {
753 errMessage = "taskpool:: SequenceRunner cannot execute seqRunnerTask or executedTask";
754 HILOG_ERROR("%{public}s", errMessage.c_str());
755 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
756 return false;
757 }
758 if (IsGroupCommonTask()) {
759 errMessage = "taskpool:: SequenceRunner cannot execute groupTask";
760 HILOG_ERROR("%{public}s", errMessage.c_str());
761 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
762 return false;
763 }
764 return true;
765 }
766
CanForTaskGroup(napi_env env)767 bool Task::CanForTaskGroup(napi_env env)
768 {
769 std::string errMessage = "";
770 if (TaskManager::GetInstance().IsDependendByTaskId(taskId_)) {
771 errMessage = "taskpool:: dependent task not allowed.";
772 HILOG_ERROR("%{public}s", errMessage.c_str());
773 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
774 return false;
775 }
776 if (IsCommonTask() || IsSeqRunnerTask()) {
777 errMessage = "taskpool:: taskGroup cannot add seqRunnerTask or executedTask";
778 HILOG_ERROR("%{public}s", errMessage.c_str());
779 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
780 return false;
781 }
782 if (IsGroupCommonTask()) {
783 errMessage = "taskpool:: taskGroup cannot add groupTask";
784 HILOG_ERROR("%{public}s", errMessage.c_str());
785 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
786 return false;
787 }
788 taskType_ = TaskType::GROUP_COMMON_TASK;
789 return true;
790 }
791
CanExecute(napi_env env)792 bool Task::CanExecute(napi_env env)
793 {
794 std::string errMessage = "";
795 if (IsGroupCommonTask()) {
796 errMessage = "taskpool:: groupTask cannot execute outside";
797 HILOG_ERROR("%{public}s", errMessage.c_str());
798 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
799 return false;
800 }
801 if (IsSeqRunnerTask()) {
802 errMessage = "taskpool:: seqRunnerTask cannot execute outside";
803 HILOG_ERROR("%{public}s", errMessage.c_str());
804 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
805 return false;
806 }
807 return true;
808 }
809
CanExecuteDelayed(napi_env env)810 bool Task::CanExecuteDelayed(napi_env env)
811 {
812 std::string errMessage = "";
813 if (IsGroupCommonTask()) {
814 errMessage = "taskpool:: groupTask cannot executeDelayed outside";
815 HILOG_ERROR("%{public}s", errMessage.c_str());
816 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
817 return false;
818 }
819 if (IsSeqRunnerTask()) {
820 errMessage = "taskpool:: seqRunnerTask cannot executeDelayed outside";
821 HILOG_ERROR("%{public}s", errMessage.c_str());
822 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
823 return false;
824 }
825 return true;
826 }
827
IncreaseTaskRef(const uv_async_t * req)828 void Task::IncreaseTaskRef(const uv_async_t* req)
829 {
830 auto task = static_cast<Task*>(req->data);
831 if (task == nullptr) {
832 HILOG_FATAL("taskpool:: IncreaseTaskRef task is nullptr");
833 return;
834 }
835 napi_reference_ref(task->env_, task->taskRef_, nullptr);
836 }
837 } // namespace Commonlibrary::Concurrent::TaskPoolModule