• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright (c) 2023 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
15 
16 #include "task_group.h"
17 
18 #include "helper/concurrent_helper.h"
19 #include "task_group_manager.h"
20 
21 namespace Commonlibrary::Concurrent::TaskPoolModule {
22 using namespace Commonlibrary::Concurrent::Common::Helper;
23 
TaskGroupConstructor(napi_env env,napi_callback_info cbinfo)24 napi_value TaskGroup::TaskGroupConstructor(napi_env env, napi_callback_info cbinfo)
25 {
26     size_t argc = 1;
27     napi_value args[1];
28     napi_value thisVar;
29     napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
30     if (argc > 1) {
31         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be zero or one.");
32         return nullptr;
33     }
34     napi_value name;
35     if (argc == 1) {
36         // check 1st param is taskGroupName
37         if (!NapiHelper::IsString(env, args[0])) {
38             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the first param must be string.");
39             return nullptr;
40         }
41         name = args[0];
42     } else {
43         name = NapiHelper::CreateEmptyString(env);
44     }
45     TaskGroup* group = new TaskGroup(env);
46     uint64_t groupId = reinterpret_cast<uint64_t>(group);
47     group->groupId_ = groupId;
48     TaskGroupManager::GetInstance().StoreTaskGroup(groupId, group);
49     group->InitHandle(env);
50     napi_value napiGroupId = NapiHelper::CreateUint64(env, groupId);
51     napi_property_descriptor properties[] = {
52         DECLARE_NAPI_PROPERTY(GROUP_ID_STR, napiGroupId),
53         DECLARE_NAPI_FUNCTION_WITH_DATA("addTask", AddTask, thisVar),
54     };
55     napi_set_named_property(env, thisVar, NAME, name);
56     napi_define_properties(env, thisVar, sizeof(properties) / sizeof(properties[0]), properties);
57     napi_status status = napi_wrap(env, thisVar, group, TaskGroupDestructor, nullptr, nullptr);
58     if (status != napi_ok) {
59         HILOG_ERROR("taskpool::TaskGroupConstructor napi_wrap return value is %{public}d", status);
60         TaskGroupManager::GetInstance().RemoveTaskGroup(group->groupId_);
61         delete group;
62         group = nullptr;
63         return nullptr;
64     }
65     napi_create_reference(env, thisVar, 0, &group->groupRef_);
66     napi_add_env_cleanup_hook(env, TaskGroup::HostEnvCleanupHook, group);
67     return thisVar;
68 }
69 
TaskGroupDestructor(napi_env env,void * data,void * hint)70 void TaskGroup::TaskGroupDestructor(napi_env env, void* data, [[maybe_unused]] void* hint)
71 {
72     HILOG_DEBUG("taskpool::TaskGroupDestructor");
73     TaskGroup* group = static_cast<TaskGroup*>(data);
74     napi_remove_env_cleanup_hook(env, TaskGroup::HostEnvCleanupHook, group);
75     TaskGroupManager::GetInstance().ReleaseTaskGroupData(env, group);
76     napi_delete_reference(env, group->groupRef_);
77     delete group;
78 }
79 
AddTask(napi_env env,napi_callback_info cbinfo)80 napi_value TaskGroup::AddTask(napi_env env, napi_callback_info cbinfo)
81 {
82     size_t argc = NapiHelper::GetCallbackInfoArgc(env, cbinfo);
83     std::string errMessage = "";
84     if (argc < 1) {
85         errMessage = "taskGroup:: the number of params must be at least one";
86         HILOG_ERROR("%{public}s", errMessage.c_str());
87         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the number of params must be at least one.");
88         return nullptr;
89     }
90     napi_value* args = new napi_value[argc];
91     ObjectScope<napi_value> scope(args, true);
92     napi_value thisVar;
93     napi_get_cb_info(env, cbinfo, &argc, args, &thisVar, nullptr);
94     napi_value napiGroupId = NapiHelper::GetNameProperty(env, thisVar, GROUP_ID_STR);
95     uint64_t groupId = NapiHelper::GetUint64Value(env, napiGroupId);
96     TaskGroup* group = TaskGroupManager::GetInstance().GetTaskGroup(groupId);
97     if (group == nullptr || group->groupState_ != ExecuteState::NOT_FOUND) {
98         errMessage = "taskpool:: executed taskGroup cannot addTask";
99         HILOG_ERROR("%{public}s", errMessage.c_str());
100         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, errMessage.c_str());
101         return nullptr;
102     }
103     napi_valuetype type = napi_undefined;
104     napi_typeof(env, args[0], &type);
105     if (type == napi_object) {
106         Task* task = nullptr;
107         napi_unwrap(env, args[0], reinterpret_cast<void**>(&task));
108         if (task == nullptr) {
109             ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the params must be task.");
110             return nullptr;
111         }
112         if (!task->CanForTaskGroup(env)) {
113             return nullptr;
114         }
115         task->taskType_ = TaskType::GROUP_COMMON_TASK;
116         task->groupId_ = groupId;
117         napi_reference_ref(env, task->taskRef_, nullptr);
118         TaskGroupManager::GetInstance().AddTask(groupId, task->taskRef_, task->taskId_);
119         return nullptr;
120     } else if (type == napi_function) {
121         napi_value napiTask = NapiHelper::CreateObject(env);
122         Task* task = Task::GenerateFunctionTask(env, args[0], args + 1, argc - 1, TaskType::GROUP_FUNCTION_TASK);
123         if (task == nullptr) {
124             return nullptr;
125         }
126         task->groupId_ = groupId;
127         napi_status status = napi_wrap(env, napiTask, task, Task::TaskDestructor, nullptr, nullptr);
128         if (status != napi_ok) {
129             HILOG_ERROR("taskpool::AddTask napi_wrap return value is %{public}d", status);
130             TaskManager::GetInstance().RemoveTask(task->taskId_);
131             delete task;
132             task = nullptr;
133             return nullptr;
134         }
135         napi_create_reference(env, napiTask, 1, &task->taskRef_);
136         TaskGroupManager::GetInstance().AddTask(groupId, task->taskRef_, task->taskId_);
137         return nullptr;
138     }
139     ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "the type of the first param must be object or function.");
140     return nullptr;
141 }
142 
HostEnvCleanupHook(void * data)143 void TaskGroup::HostEnvCleanupHook(void* data)
144 {
145     if (data == nullptr) {
146         HILOG_ERROR("taskpool:: taskGroup cleanupHook arg is nullptr");
147         return;
148     }
149     TaskGroup* group = static_cast<TaskGroup*>(data);
150     std::lock_guard<std::recursive_mutex> lock(group->taskGroupMutex_);
151     ConcurrentHelper::UvHandleClose(group->onRejectResultSignal_);
152     group->isValid_ = false;
153 }
154 
StartRejectResult(const uv_async_t * req)155 void TaskGroup::StartRejectResult(const uv_async_t* req)
156 {
157     auto* group = static_cast<TaskGroup*>(req->data);
158     if (group == nullptr) {
159         HILOG_DEBUG("taskpool::StartRejectResult group is nullptr");
160         return;
161     }
162     napi_status status = napi_ok;
163     HandleScope scope(group->env_, status);
164     if (status != napi_ok) {
165         HILOG_ERROR("taskpool:: napi_open_handle_scope failed");
166         return;
167     }
168     group->RejectResult(group->env_);
169 }
170 
GetTaskIndex(uint32_t taskId)171 uint32_t TaskGroup::GetTaskIndex(uint32_t taskId)
172 {
173     uint32_t index = 0;
174     for (uint32_t id : taskIds_) {
175         if (taskId == id) {
176             break;
177         }
178         index++;
179     }
180     return index;
181 }
182 
NotifyGroupTask(napi_env env)183 void TaskGroup::NotifyGroupTask(napi_env env)
184 {
185     HILOG_DEBUG("taskpool:: NotifyGroupTask");
186     std::lock_guard<std::recursive_mutex> lock(taskGroupMutex_);
187     if (pendingGroupInfos_.empty()) {
188         return;
189     }
190     groupState_ = ExecuteState::WAITING;
191     currentGroupInfo_ = pendingGroupInfos_.front();
192     pendingGroupInfos_.pop_front();
193     for (auto iter = taskRefs_.begin(); iter != taskRefs_.end(); iter++) {
194         napi_value napiTask = NapiHelper::GetReferenceValue(env, *iter);
195         Task* task = nullptr;
196         napi_unwrap(env, napiTask, reinterpret_cast<void**>(&task));
197         if (task == nullptr) {
198             HILOG_ERROR("taskpool::ExecuteGroup task is nullptr");
199             return;
200         }
201         napi_reference_ref(env, task->taskRef_, nullptr);
202         Priority priority = currentGroupInfo_->priority;
203         if (task->IsGroupCommonTask()) {
204             task->GetTaskInfo(env, napiTask, priority);
205         } else {
206             reinterpret_cast<NativeEngine*>(env)->IncreaseSubEnvCounter();
207         }
208         task->IncreaseRefCount();
209         TaskManager::GetInstance().IncreaseRefCount(task->taskId_);
210         task->taskState_ = ExecuteState::WAITING;
211         TaskManager::GetInstance().EnqueueTaskId(task->taskId_, priority);
212     }
213 }
214 
CancelPendingGroup(napi_env env)215 void TaskGroup::CancelPendingGroup(napi_env env)
216 {
217     HILOG_DEBUG("taskpool:: CancelPendingGroup");
218     std::list<napi_deferred> deferreds {};
219     {
220         std::lock_guard<std::recursive_mutex> lock(taskGroupMutex_);
221         if (pendingGroupInfos_.empty()) {
222             return;
223         }
224         auto pendingIter = pendingGroupInfos_.begin();
225         auto engine = reinterpret_cast<NativeEngine*>(env);
226         for (; pendingIter != pendingGroupInfos_.end(); ++pendingIter) {
227             for (size_t i = 0; i < taskIds_.size(); i++) {
228                 engine->DecreaseSubEnvCounter();
229             }
230             GroupInfo* info = *pendingIter;
231             deferreds.push_back(info->deferred);
232             napi_reference_unref(env, groupRef_, nullptr);
233             delete info;
234         }
235         pendingIter = pendingGroupInfos_.begin();
236         pendingGroupInfos_.erase(pendingIter, pendingGroupInfos_.end());
237     }
238     TaskManager::GetInstance().BatchRejectDeferred(env, deferreds, "taskpool:: taskGroup has been canceled");
239 }
240 
CancelGroupTask(napi_env env,uint32_t taskId)241 void TaskGroup::CancelGroupTask(napi_env env, uint32_t taskId)
242 {
243     TaskGroupManager::GetInstance().CancelGroupTask(env_, taskId, this);
244     if (IsSameEnv(env)) {
245         RejectResult(env);
246         return;
247     }
248     TriggerRejectResult();
249 }
250 
RejectResult(napi_env env,napi_value res)251 void TaskGroup::RejectResult(napi_env env, napi_value res)
252 {
253     napi_reject_deferred(env, currentGroupInfo_->deferred, res);
254     napi_delete_reference(env, currentGroupInfo_->resArr);
255     napi_reference_unref(env, groupRef_, nullptr);
256     delete currentGroupInfo_;
257     currentGroupInfo_ = nullptr;
258 }
259 
RejectResult(napi_env env)260 void TaskGroup::RejectResult(napi_env env)
261 {
262     std::list<napi_deferred> deferreds {};
263     {
264         std::lock_guard<std::recursive_mutex> lock(taskGroupMutex_);
265         if (currentGroupInfo_ != nullptr && currentGroupInfo_->finishedTaskNum == taskNum_) {
266             deferreds.push_back(currentGroupInfo_->deferred);
267             napi_delete_reference(env, currentGroupInfo_->resArr);
268             napi_reference_unref(env, groupRef_, nullptr);
269             delete currentGroupInfo_;
270             currentGroupInfo_ = nullptr;
271         }
272     }
273     std::string error = "taskpool:: taskGroup has been canceled";
274     TaskManager::GetInstance().BatchRejectDeferred(env, deferreds, error);
275 }
276 
InitHandle(napi_env env)277 void TaskGroup::InitHandle(napi_env env)
278 {
279     uv_loop_t* loop = NapiHelper::GetLibUV(env);
280     ConcurrentHelper::UvHandleInit(loop, onRejectResultSignal_, TaskGroup::StartRejectResult, this);
281 }
282 
TriggerRejectResult()283 void TaskGroup::TriggerRejectResult()
284 {
285     std::lock_guard<std::recursive_mutex> lock(taskGroupMutex_);
286     ConcurrentHelper::UvCheckAndAsyncSend(onRejectResultSignal_);
287 }
288 
IsSameEnv(napi_env env)289 bool TaskGroup::IsSameEnv(napi_env env)
290 {
291     return env_ == env;
292 }
293 } // namespace Commonlibrary::Concurrent::TaskPoolModule