1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "taskpool.h"
17
18 #include "helper/error_helper.h"
19 #include "helper/hitrace_helper.h"
20 #include "helper/napi_helper.h"
21 #include "helper/object_helper.h"
22 #include "task_manager.h"
23 #include "utils/log.h"
24
25 namespace Commonlibrary::Concurrent::TaskPoolModule {
26 using namespace Commonlibrary::Concurrent::Common::Helper;
27
InitTaskPool(napi_env env,napi_value exports)28 napi_value TaskPool::InitTaskPool(napi_env env, napi_value exports)
29 {
30 HILOG_INFO("taskpool:: Import taskpool");
31 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
32 napi_value taskClass = nullptr;
33 napi_define_class(env, "Task", NAPI_AUTO_LENGTH, Task::TaskConstructor, nullptr, 0, nullptr, &taskClass);
34 napi_value taskGroupClass = nullptr;
35 napi_define_class(env, "TaskGroup", NAPI_AUTO_LENGTH, TaskGroup::TaskGroupConstructor, nullptr, 0, nullptr,
36 &taskGroupClass);
37 napi_value isCanceledFunc;
38 napi_create_function(env, "isCanceled", NAPI_AUTO_LENGTH, TaskManager::IsCanceled, NULL, &isCanceledFunc);
39 napi_set_named_property(env, taskClass, "isCanceled", isCanceledFunc);
40
41 // define priority
42 napi_value priorityObj = NapiHelper::CreateObject(env);
43 napi_value highPriority = NapiHelper::CreateUint32(env, Priority::HIGH);
44 napi_value mediumPriority = NapiHelper::CreateUint32(env, Priority::MEDIUM);
45 napi_value lowPriority = NapiHelper::CreateUint32(env, Priority::LOW);
46 napi_property_descriptor exportPriority[] = {
47 DECLARE_NAPI_PROPERTY("HIGH", highPriority),
48 DECLARE_NAPI_PROPERTY("MEDIUM", mediumPriority),
49 DECLARE_NAPI_PROPERTY("LOW", lowPriority),
50 };
51 napi_define_properties(env, priorityObj, sizeof(exportPriority) / sizeof(exportPriority[0]), exportPriority);
52
53 napi_property_descriptor properties[] = {
54 DECLARE_NAPI_PROPERTY("Task", taskClass),
55 DECLARE_NAPI_PROPERTY("TaskGroup", taskGroupClass),
56 DECLARE_NAPI_PROPERTY("Priority", priorityObj),
57 DECLARE_NAPI_FUNCTION("execute", Execute),
58 DECLARE_NAPI_FUNCTION("cancel", Cancel),
59 DECLARE_NAPI_FUNCTION("getTaskPoolInfo", GetTaskPoolInfo),
60 };
61 napi_define_properties(env, exports, sizeof(properties) / sizeof(properties[0]), properties);
62
63 TaskManager::GetInstance().InitTaskManager(env);
64 return exports;
65 }
66
GetTaskPoolInfo(napi_env env,napi_callback_info cbinfo)67 napi_value TaskPool::GetTaskPoolInfo(napi_env env, [[maybe_unused]] napi_callback_info cbinfo)
68 {
69 napi_value result = nullptr;
70 napi_create_object(env, &result);
71 napi_value threadInfos = TaskManager::GetInstance().GetThreadInfos(env);
72 napi_value taskInfos = TaskManager::GetInstance().GetTaskInfos(env);
73 napi_set_named_property(env, result, "threadInfos", threadInfos);
74 napi_set_named_property(env, result, "taskInfos", taskInfos);
75 return result;
76 }
77
Execute(napi_env env,napi_callback_info cbinfo)78 napi_value TaskPool::Execute(napi_env env, napi_callback_info cbinfo)
79 {
80 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
81 // check the argc
82 size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
83 if (argc < 1) {
84 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "taskpool:: the number of params must be at least one");
85 return nullptr;
86 }
87
88 // check the first param is object or func
89 napi_value* args = new napi_value[argc];
90 ObjectScope<napi_value> scope(args, true);
91 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
92 napi_valuetype type;
93 napi_typeof(env, args[0], &type);
94
95 uint32_t priority = Priority::DEFAULT; // DEFAULT priority is MEDIUM
96 if (type == napi_object) {
97 // Get execution priority
98 if (argc > 1) {
99 if (!NapiHelper::IsNumber(args[1])) {
100 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "taskpool:: priority type is error");
101 return nullptr;
102 }
103 priority = NapiHelper::GetUint32Value(env, args[1]);
104 if (priority >= Priority::NUMBER) {
105 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "taskpool:: priority value is error");
106 return nullptr;
107 }
108 }
109 if (NapiHelper::HasNameProperty(env, args[0], GROUP_ID_STR)) {
110 return ExecuteGroup(env, args[0], Priority(priority));
111 }
112 uint32_t executeId = TaskManager::GetInstance().GenerateExecuteId();
113 TaskInfo* taskInfo = TaskManager::GetInstance().GenerateTaskInfoFromTask(env, args[0], executeId);
114 if (taskInfo == nullptr) {
115 HILOG_ERROR("taskpool::ExecuteTask taskInfo is nullptr");
116 return nullptr;
117 }
118 napi_value promise = NapiHelper::CreatePromise(env, &taskInfo->deferred);
119 TaskManager::GetInstance().StoreRunningInfo(taskInfo->taskId, executeId);
120 ExecuteFunction(env, taskInfo, Priority(priority));
121 return promise;
122 }
123 if (type != napi_function) {
124 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "taskpool:: first param must be object or function");
125 return nullptr;
126 }
127 // Type is napi_function, execute from func directly
128 napi_value argsArray;
129 napi_create_array_with_length(env, argc - 1, &argsArray);
130 for (size_t i = 0; i < argc - 1; i++) {
131 napi_set_element(env, argsArray, i, args[i + 1]);
132 }
133 uint32_t executeId = TaskManager::GetInstance().GenerateExecuteId();
134 // Set task id to 0 when execute from func directly
135 TaskInfo* taskInfo = TaskManager::GetInstance().GenerateTaskInfo(env, args[0], argsArray, 0, executeId);
136 if (taskInfo == nullptr) {
137 HILOG_ERROR("taskpool::ExecuteFunction taskInfo is nullptr");
138 return nullptr;
139 }
140 napi_value promise = NapiHelper::CreatePromise(env, &taskInfo->deferred);
141 TaskManager::GetInstance().StoreRunningInfo(0, executeId);
142 ExecuteFunction(env, taskInfo);
143 return promise;
144 }
145
ExecuteGroup(napi_env env,napi_value taskGroup,Priority priority)146 napi_value TaskPool::ExecuteGroup(napi_env env, napi_value taskGroup, Priority priority)
147 {
148 napi_value groupIdVal = NapiHelper::GetNameProperty(env, taskGroup, GROUP_ID_STR);
149 uint32_t groupId = NapiHelper::GetUint32Value(env, groupIdVal);
150 TaskGroupManager& groupManager = TaskGroupManager::GetInstance();
151 const std::list<napi_ref>& taskRefs = groupManager.GetTasksByGroup(groupId);
152 uint32_t groupExecuteId = groupManager.GenerateGroupExecuteId();
153 GroupInfo* groupInfo = groupManager.GenerateGroupInfo(env, taskRefs.size(), groupId, groupExecuteId);
154 napi_value promise = NapiHelper::CreatePromise(env, &groupInfo->deferred);
155 for (auto iter = taskRefs.begin(); iter != taskRefs.end(); iter++) {
156 uint32_t executeId = TaskManager::GetInstance().GenerateExecuteId();
157 groupInfo->executeIds.push_back(executeId);
158 napi_value task = NapiHelper::GetReferenceValue(env, *iter);
159 TaskInfo* taskInfo = TaskManager::GetInstance().GenerateTaskInfoFromTask(env, task, executeId);
160 if (taskInfo == nullptr) {
161 HILOG_ERROR("taskpool::ExecuteGroup taskInfo is nullptr");
162 return nullptr;
163 }
164 taskInfo->groupExecuteId = groupExecuteId;
165 ExecuteFunction(env, taskInfo, Priority(priority));
166 }
167 return promise;
168 }
169
HandleTaskResult(const uv_async_t * req)170 void TaskPool::HandleTaskResult(const uv_async_t* req)
171 {
172 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
173 auto taskInfo = static_cast<TaskInfo*>(req->data);
174 if (taskInfo == nullptr) {
175 HILOG_FATAL("taskpool::HandleTaskResult taskInfo is null");
176 return;
177 }
178 napi_handle_scope scope = nullptr;
179 NAPI_CALL_RETURN_VOID(taskInfo->env, napi_open_handle_scope(taskInfo->env, &scope));
180 napi_value taskData = nullptr;
181 napi_status status = napi_deserialize(taskInfo->env, taskInfo->result, &taskData);
182
183 // tag for trace parse: Task PerformTask End
184 std::string strTrace = "Task PerformTask End: taskId : " + std::to_string(taskInfo->taskId);
185 strTrace += ", executeId : " + std::to_string(taskInfo->executeId);
186 if (taskInfo->isCanceled) {
187 strTrace += ", performResult : IsCanceled";
188 } else if (status != napi_ok) {
189 strTrace += ", performResult : DeserializeFailed";
190 } else if (taskInfo->success) {
191 strTrace += ", performResult : Successful";
192 } else {
193 strTrace += ", performResult : Unsuccessful";
194 }
195 HITRACE_HELPER_METER_NAME(strTrace);
196
197 bool success = status == napi_ok && !taskInfo->isCanceled && taskInfo->success;
198 if (taskData == nullptr) {
199 napi_get_undefined(taskInfo->env, &taskData);
200 }
201 if (taskInfo->groupExecuteId == 0) {
202 if (success) {
203 napi_resolve_deferred(taskInfo->env, taskInfo->deferred, taskData);
204 } else {
205 napi_reject_deferred(taskInfo->env, taskInfo->deferred, taskData);
206 }
207 } else {
208 UpdateGroupInfoByResult(taskInfo->env, taskInfo, taskData, success);
209 }
210 NAPI_CALL_RETURN_VOID(taskInfo->env, napi_close_handle_scope(taskInfo->env, scope));
211 TaskManager::GetInstance().ReleaseTaskContent(taskInfo);
212 }
213
UpdateGroupInfoByResult(napi_env env,TaskInfo * taskInfo,napi_value res,bool success)214 void TaskPool::UpdateGroupInfoByResult(napi_env env, TaskInfo* taskInfo, napi_value res, bool success)
215 {
216 uint32_t groupExecuteId = taskInfo->groupExecuteId;
217 bool isRunning = TaskGroupManager::GetInstance().IsRunning(groupExecuteId);
218 if (!isRunning) {
219 return;
220 }
221 GroupInfo* groupInfo = TaskGroupManager::GetInstance().GetGroupInfoByExecutionId(groupExecuteId);
222 if (groupInfo == nullptr) {
223 return;
224 }
225 uint32_t headId = *groupInfo->executeIds.begin();
226 uint32_t index = taskInfo->executeId - headId;
227 if (success) {
228 // Update res at resArr
229 napi_ref arrRef = groupInfo->resArr;
230 napi_value resArr = NapiHelper::GetReferenceValue(env, arrRef);
231 napi_set_element(env, resArr, index, res);
232
233 groupInfo->finishedTask++;
234 if (groupInfo->finishedTask < groupInfo->taskNum) {
235 return;
236 }
237 napi_resolve_deferred(env, groupInfo->deferred, resArr);
238 } else {
239 napi_value undefined = nullptr;
240 napi_get_undefined(env, &undefined);
241 napi_reject_deferred(env, groupInfo->deferred, undefined);
242 }
243 TaskGroupManager::GetInstance().RemoveExecuteId(groupInfo->groupId, groupExecuteId);
244 TaskGroupManager::GetInstance().ClearGroupInfo(env, groupExecuteId, groupInfo);
245 }
246
ExecuteFunction(napi_env env,TaskInfo * taskInfo,Priority priority)247 void TaskPool::ExecuteFunction(napi_env env, TaskInfo* taskInfo, Priority priority)
248 {
249 uint32_t executeId = taskInfo->executeId;
250 taskInfo->priority = priority;
251 // tag for trace parse: Task Allocation
252 std::string strTrace = "Task Allocation: taskId : " + std::to_string(taskInfo->taskId)
253 + ", executeId : " + std::to_string(executeId)
254 + ", priority : " + std::to_string(priority)
255 + ", executeState : " + std::to_string(ExecuteState::WAITING);
256 HITRACE_HELPER_METER_NAME(strTrace);
257 TaskManager::GetInstance().AddExecuteState(executeId);
258 TaskManager::GetInstance().EnqueueExecuteId(executeId, priority);
259 TaskManager::GetInstance().TryTriggerLoadBalance();
260 }
261
Cancel(napi_env env,napi_callback_info cbinfo)262 napi_value TaskPool::Cancel(napi_env env, napi_callback_info cbinfo)
263 {
264 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
265 size_t argc = 1;
266 napi_value args[1];
267 napi_get_cb_info(env, cbinfo, &argc, args, nullptr, nullptr);
268 if (argc < 1) {
269 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "taskpool:: the number of the params must be one");
270 return nullptr;
271 }
272
273 if (!NapiHelper::IsObject(args[0])) {
274 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "taskpool:: the type of the params must be object");
275 return nullptr;
276 }
277
278 if (!NapiHelper::HasNameProperty(env, args[0], GROUP_ID_STR)) {
279 napi_value taskId = NapiHelper::GetNameProperty(env, args[0], TASKID_STR);
280 if (taskId == nullptr) {
281 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "taskpool:: the type of the params must be task");
282 return nullptr;
283 }
284 uint32_t id = NapiHelper::GetUint32Value(env, taskId);
285 TaskManager::GetInstance().CancelTask(env, id);
286 } else {
287 napi_value groupIdVal = NapiHelper::GetNameProperty(env, args[0], GROUP_ID_STR);
288 if (groupIdVal == nullptr) {
289 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR,
290 "taskpool:: the type of the params must be taskGroup");
291 return nullptr;
292 }
293 uint32_t groupId = NapiHelper::GetUint32Value(env, groupIdVal);
294 TaskGroupManager::GetInstance().CancelGroup(env, groupId);
295 TaskGroupManager::GetInstance().ClearExecuteId(groupId);
296 }
297 return nullptr;
298 }
299 } // namespace Commonlibrary::Concurrent::TaskPoolModule