1 /*
2 * Copyright (c) 2023 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "task_group.h"
17
18 #include "helper/concurrent_helper.h"
19 #include "task_group_manager.h"
20
21 namespace Commonlibrary::Concurrent::TaskPoolModule {
22 using namespace Commonlibrary::Concurrent::Common::Helper;
23
TaskGroupConstructor(napi_env env,napi_callback_info cbinfo)24 napi_value TaskGroup::TaskGroupConstructor(napi_env env, napi_callback_info cbinfo)
25 {
26 size_t argc = 1;
27 napi_value args[1];
28 napi_value thisVar;
29 napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
30 if (argc > 1) {
31 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be zero or one.");
32 return nullptr;
33 }
34 napi_value name;
35 if (argc == 1) {
36 // check 1st param is taskGroupName
37 if (!NapiHelper::IsString(env, args[0])) {
38 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the first param must be string.");
39 return nullptr;
40 }
41 name = args[0];
42 } else {
43 name = NapiHelper::CreateEmptyString(env);
44 }
45 TaskGroup* group = new TaskGroup(env);
46 uint64_t groupId = reinterpret_cast<uint64_t>(group);
47 group->groupId_ = groupId;
48 TaskGroupManager::GetInstance().StoreTaskGroup(groupId, group);
49 group->InitHandle(env);
50 napi_value napiGroupId = NapiHelper::CreateUint64(env, groupId);
51 napi_property_descriptor properties[] = {
52 DECLARE_NAPI_PROPERTY(GROUP_ID_STR, napiGroupId),
53 DECLARE_NAPI_FUNCTION_WITH_DATA("addTask", AddTask, thisVar),
54 };
55 napi_set_named_property(env, thisVar, NAME, name);
56 napi_define_properties(env, thisVar, sizeof(properties) / sizeof(properties[0]), properties);
57 napi_status status = napi_wrap(env, thisVar, group, TaskGroupDestructor, nullptr, nullptr);
58 if (status != napi_ok) {
59 HILOG_ERROR("taskpool::TaskGroupConstructor napi_wrap return value is %{public}d", status);
60 TaskGroupManager::GetInstance().RemoveTaskGroup(group->groupId_);
61 delete group;
62 group = nullptr;
63 return nullptr;
64 }
65 napi_create_reference(env, thisVar, 0, &group->groupRef_);
66 napi_add_env_cleanup_hook(env, TaskGroup::HostEnvCleanupHook, group);
67 return thisVar;
68 }
69
TaskGroupDestructor(napi_env env,void * data,void * hint)70 void TaskGroup::TaskGroupDestructor(napi_env env, void* data, [[maybe_unused]] void* hint)
71 {
72 HILOG_DEBUG("taskpool::TaskGroupDestructor");
73 TaskGroup* group = static_cast<TaskGroup*>(data);
74 napi_remove_env_cleanup_hook(env, TaskGroup::HostEnvCleanupHook, group);
75 TaskGroupManager::GetInstance().ReleaseTaskGroupData(env, group);
76 napi_delete_reference(env, group->groupRef_);
77 delete group;
78 }
79
AddTask(napi_env env,napi_callback_info cbinfo)80 napi_value TaskGroup::AddTask(napi_env env, napi_callback_info cbinfo)
81 {
82 size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
83 std::string errMessage = "";
84 if (argc < 1) {
85 errMessage = "taskGroup:: the number of params must be at least one";
86 HILOG_ERROR("%{public}s", errMessage.c_str());
87 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be at least one.");
88 return nullptr;
89 }
90 napi_value* args = new napi_value[argc];
91 ObjectScope<napi_value> scope(args, true);
92 napi_value thisVar;
93 napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
94 napi_value napiGroupId = NapiHelper::GetNameProperty(env, thisVar, GROUP_ID_STR);
95 uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId);
96 TaskGroup* group = TaskGroupManager::GetInstance().GetTaskGroup(groupId);
97 if (group == nullptr || group->groupState_ != ExecuteState::NOT_FOUND) {
98 errMessage = "taskpool:: executed taskGroup cannot addTask";
99 HILOG_ERROR("%{public}s", errMessage.c_str());
100 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
101 return nullptr;
102 }
103 napi_valuetype type = napi_undefined;
104 napi_typeof(env, args[0], &type);
105 if (type == napi_object) {
106 Task* task = nullptr;
107 napi_unwrap(env, args[0], reinterpret_cast<void**>(&task));
108 if (task == nullptr) {
109 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be task.");
110 return nullptr;
111 }
112 if (!task->CanForTaskGroup(env)) {
113 return nullptr;
114 }
115 task->taskType_ = TaskType::GROUP_COMMON_TASK;
116 task->groupId_ = groupId;
117 napi_reference_ref(env, task->taskRef_, nullptr);
118 TaskGroupManager::GetInstance().AddTask(groupId, task->taskRef_, task->taskId_);
119 return nullptr;
120 } else if (type == napi_function) {
121 napi_value napiTask = NapiHelper::CreateObject(env);
122 Task* task = Task::GenerateFunctionTask(env, args[0], args + 1, argc - 1, TaskType::GROUP_FUNCTION_TASK);
123 if (task == nullptr) {
124 return nullptr;
125 }
126 task->groupId_ = groupId;
127 napi_status status = napi_wrap(env, napiTask, task, Task::TaskDestructor, nullptr, nullptr);
128 if (status != napi_ok) {
129 HILOG_ERROR("taskpool::AddTask napi_wrap return value is %{public}d", status);
130 TaskManager::GetInstance().RemoveTask(task->taskId_);
131 delete task;
132 task = nullptr;
133 return nullptr;
134 }
135 napi_create_reference(env, napiTask, 1, &task->taskRef_);
136 TaskGroupManager::GetInstance().AddTask(groupId, task->taskRef_, task->taskId_);
137 return nullptr;
138 }
139 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be object or function.");
140 return nullptr;
141 }
142
HostEnvCleanupHook(void * data)143 void TaskGroup::HostEnvCleanupHook(void* data)
144 {
145 if (data == nullptr) {
146 HILOG_ERROR("taskpool:: taskGroup cleanupHook arg is nullptr");
147 return;
148 }
149 TaskGroup* group = static_cast<TaskGroup*>(data);
150 std::lock_guard<std::recursive_mutex> lock(group->taskGroupMutex_);
151 ConcurrentHelper::UvHandleClose(group->onRejectResultSignal_);
152 group->isValid_ = false;
153 }
154
StartRejectResult(const uv_async_t * req)155 void TaskGroup::StartRejectResult(const uv_async_t* req)
156 {
157 auto* group = static_cast<TaskGroup*>(req->data);
158 if (group == nullptr) {
159 HILOG_DEBUG("taskpool::StartRejectResult group is nullptr");
160 return;
161 }
162 napi_status status = napi_ok;
163 HandleScope scope(group->env_, status);
164 if (status != napi_ok) {
165 HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
166 return;
167 }
168 group->RejectResult(group->env_);
169 }
170
GetTaskIndex(uint32_t taskId)171 uint32_t TaskGroup::GetTaskIndex(uint32_t taskId)
172 {
173 uint32_t index = 0;
174 for (uint32_t id : taskIds_) {
175 if (taskId == id) {
176 break;
177 }
178 index++;
179 }
180 return index;
181 }
182
NotifyGroupTask(napi_env env)183 void TaskGroup::NotifyGroupTask(napi_env env)
184 {
185 HILOG_DEBUG("taskpool:: NotifyGroupTask");
186 std::lock_guard<std::recursive_mutex> lock(taskGroupMutex_);
187 if (pendingGroupInfos_.empty()) {
188 return;
189 }
190 groupState_ = ExecuteState::WAITING;
191 currentGroupInfo_ = pendingGroupInfos_.front();
192 pendingGroupInfos_.pop_front();
193 for (auto iter = taskRefs_.begin(); iter != taskRefs_.end(); iter++) {
194 napi_value napiTask = NapiHelper::GetReferenceValue(env, *iter);
195 Task* task = nullptr;
196 napi_unwrap(env, napiTask, reinterpret_cast<void**>(&task));
197 if (task == nullptr) {
198 HILOG_ERROR("taskpool::ExecuteGroup task is nullptr");
199 return;
200 }
201 napi_reference_ref(env, task->taskRef_, nullptr);
202 Priority priority = currentGroupInfo_->priority;
203 if (task->IsGroupCommonTask()) {
204 task->GetTaskInfo(env, napiTask, priority);
205 } else {
206 reinterpret_cast<NativeEngine*>(env)->IncreaseSubEnvCounter();
207 }
208 task->IncreaseRefCount();
209 TaskManager::GetInstance().IncreaseSendDataRefCount(task->taskId_);
210 task->taskState_ = ExecuteState::WAITING;
211 TaskManager::GetInstance().EnqueueTaskId(task->taskId_, priority);
212 }
213 }
214
CancelPendingGroup(napi_env env)215 void TaskGroup::CancelPendingGroup(napi_env env)
216 {
217 HILOG_DEBUG("taskpool:: CancelPendingGroup");
218 std::list<napi_deferred> deferreds {};
219 {
220 std::lock_guard<std::recursive_mutex> lock(taskGroupMutex_);
221 if (pendingGroupInfos_.empty()) {
222 return;
223 }
224 auto pendingIter = pendingGroupInfos_.begin();
225 auto engine = reinterpret_cast<NativeEngine*>(env);
226 for (; pendingIter != pendingGroupInfos_.end(); ++pendingIter) {
227 for (size_t i = 0; i < taskIds_.size(); i++) {
228 engine->DecreaseSubEnvCounter();
229 }
230 GroupInfo* info = *pendingIter;
231 deferreds.push_back(info->deferred);
232 napi_reference_unref(env, groupRef_, nullptr);
233 delete info;
234 }
235 pendingIter = pendingGroupInfos_.begin();
236 pendingGroupInfos_.erase(pendingIter, pendingGroupInfos_.end());
237 }
238 TaskManager::GetInstance().BatchRejectDeferred(env, deferreds, "taskpool:: taskGroup has been canceled");
239 }
240
CancelGroupTask(napi_env env,uint32_t taskId)241 void TaskGroup::CancelGroupTask(napi_env env, uint32_t taskId)
242 {
243 TaskGroupManager::GetInstance().CancelGroupTask(env_, taskId, this);
244 if (IsSameEnv(env)) {
245 RejectResult(env);
246 return;
247 }
248 TriggerRejectResult();
249 }
250
RejectResult(napi_env env,napi_value res)251 void TaskGroup::RejectResult(napi_env env, napi_value res)
252 {
253 napi_reject_deferred(env, currentGroupInfo_->deferred, res);
254 napi_delete_reference(env, currentGroupInfo_->resArr);
255 napi_reference_unref(env, groupRef_, nullptr);
256 delete currentGroupInfo_;
257 currentGroupInfo_ = nullptr;
258 }
259
RejectResult(napi_env env)260 void TaskGroup::RejectResult(napi_env env)
261 {
262 std::list<napi_deferred> deferreds {};
263 {
264 std::lock_guard<std::recursive_mutex> lock(taskGroupMutex_);
265 if (currentGroupInfo_ != nullptr && currentGroupInfo_->finishedTaskNum == taskNum_) {
266 deferreds.push_back(currentGroupInfo_->deferred);
267 napi_delete_reference(env, currentGroupInfo_->resArr);
268 napi_reference_unref(env, groupRef_, nullptr);
269 delete currentGroupInfo_;
270 currentGroupInfo_ = nullptr;
271 }
272 }
273 std::string error = "taskpool:: taskGroup has been canceled";
274 TaskManager::GetInstance().BatchRejectDeferred(env, deferreds, error);
275 }
276
InitHandle(napi_env env)277 void TaskGroup::InitHandle(napi_env env)
278 {
279 uv_loop_t* loop = NapiHelper::GetLibUV(env);
280 ConcurrentHelper::UvHandleInit(loop, onRejectResultSignal_, TaskGroup::StartRejectResult, this);
281 }
282
TriggerRejectResult()283 void TaskGroup::TriggerRejectResult()
284 {
285 std::lock_guard<std::recursive_mutex> lock(taskGroupMutex_);
286 ConcurrentHelper::UvCheckAndAsyncSend(onRejectResultSignal_);
287 }
288
IsSameEnv(napi_env env)289 bool TaskGroup::IsSameEnv(napi_env env)
290 {
291 return env_ == env;
292 }
293 } // namespace Commonlibrary::Concurrent::TaskPoolModule