1 /*
2 * Copyright (c) 2025 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "task_group_manager.h"
17
18 #include "helper/hitrace_helper.h"
19
20 namespace Commonlibrary::Concurrent::TaskPoolModule {
GetInstance()21 TaskGroupManager& TaskGroupManager::GetInstance()
22 {
23 static TaskGroupManager groupManager;
24 return groupManager;
25 }
26
AddTask(uint64_t groupId,napi_ref taskRef,uint32_t taskId)27 void TaskGroupManager::AddTask(uint64_t groupId, napi_ref taskRef, uint32_t taskId)
28 {
29 std::lock_guard<std::mutex> lock(taskGroupsMutex_);
30 auto groupIter = taskGroups_.find(groupId);
31 if (groupIter == taskGroups_.end()) {
32 HILOG_DEBUG("taskpool:: taskGroup has been released");
33 return;
34 }
35 auto taskGroup = reinterpret_cast<TaskGroup*>(groupIter->second);
36 if (taskGroup == nullptr) {
37 HILOG_ERROR("taskpool:: taskGroup is null");
38 return;
39 }
40 taskGroup->taskRefs_.push_back(taskRef);
41 taskGroup->taskNum_++;
42 taskGroup->taskIds_.push_back(taskId);
43 }
44
ReleaseTaskGroupData(napi_env env,TaskGroup * group)45 void TaskGroupManager::ReleaseTaskGroupData(napi_env env, TaskGroup* group)
46 {
47 HILOG_DEBUG("taskpool:: ReleaseTaskGroupData group");
48 TaskGroupManager::GetInstance().RemoveTaskGroup(group->groupId_);
49 {
50 std::lock_guard<std::recursive_mutex> lock(group->taskGroupMutex_);
51 if (group->onRejectResultSignal_ != nullptr) {
52 if (!ConcurrentHelper::IsUvClosing(group->onRejectResultSignal_)) {
53 ConcurrentHelper::UvHandleClose(group->onRejectResultSignal_);
54 } else {
55 delete group->onRejectResultSignal_;
56 group->onRejectResultSignal_ = nullptr;
57 }
58 }
59 if (group->isValid_) {
60 for (uint32_t taskId : group->taskIds_) {
61 Task* task = TaskManager::GetInstance().GetTask(taskId);
62 if (task == nullptr || !task->IsValid()) {
63 continue;
64 }
65 napi_reference_unref(task->env_, task->taskRef_, nullptr);
66 }
67 }
68 if (group->currentGroupInfo_ != nullptr) {
69 delete group->currentGroupInfo_;
70 group->currentGroupInfo_ = nullptr;
71 }
72 }
73 group->CancelPendingGroup(env);
74 }
75
CancelGroup(napi_env env,uint64_t groupId)76 void TaskGroupManager::CancelGroup(napi_env env, uint64_t groupId)
77 {
78 std::string strTrace = "CancelGroup: groupId: " + std::to_string(groupId);
79 HITRACE_HELPER_METER_NAME(strTrace);
80 HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
81 TaskGroup* taskGroup = GetTaskGroup(groupId);
82 if (taskGroup == nullptr) {
83 HILOG_ERROR("taskpool:: CancelGroup group is nullptr");
84 return;
85 }
86 if (taskGroup->groupState_ == ExecuteState::CANCELED) {
87 return;
88 }
89 {
90 std::lock_guard<std::recursive_mutex> lock(taskGroup->taskGroupMutex_);
91 if (taskGroup->currentGroupInfo_ == nullptr || taskGroup->groupState_ == ExecuteState::NOT_FOUND ||
92 taskGroup->groupState_ == ExecuteState::FINISHED) {
93 std::string errMsg = "taskpool:: taskGroup is not executed or has been executed";
94 HILOG_ERROR("%{public}s", errMsg.c_str());
95 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK_GROUP, errMsg.c_str());
96 return;
97 }
98 }
99 ExecuteState groupState = taskGroup->groupState_;
100 taskGroup->groupState_ = ExecuteState::CANCELED;
101 taskGroup->CancelPendingGroup(env);
102 std::lock_guard<std::recursive_mutex> lock(taskGroup->taskGroupMutex_);
103 if (taskGroup->currentGroupInfo_->finishedTaskNum != taskGroup->taskNum_) {
104 for (uint32_t taskId : taskGroup->taskIds_) {
105 CancelGroupTask(env, taskId, taskGroup);
106 }
107 if (taskGroup->currentGroupInfo_->finishedTaskNum == taskGroup->taskNum_) {
108 napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: taskGroup has been canceled");
109 taskGroup->RejectResult(env, error);
110 return;
111 }
112 }
113 if (groupState == ExecuteState::WAITING && taskGroup->currentGroupInfo_ != nullptr) {
114 auto engine = reinterpret_cast<NativeEngine*>(env);
115 for (size_t i = 0; i < taskGroup->taskIds_.size(); i++) {
116 engine->DecreaseSubEnvCounter();
117 }
118 napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: taskGroup has been canceled");
119 taskGroup->RejectResult(env, error);
120 }
121 }
122
CancelGroupTask(napi_env env,uint32_t taskId,TaskGroup * group)123 void TaskGroupManager::CancelGroupTask(napi_env env, uint32_t taskId, TaskGroup* group)
124 {
125 HILOG_DEBUG("taskpool:: CancelGroupTask task:%{public}s", std::to_string(taskId).c_str());
126 auto task = TaskManager::GetInstance().GetTask(taskId);
127 if (task == nullptr) {
128 HILOG_INFO("taskpool:: CancelGroupTask task is nullptr");
129 return;
130 }
131 std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
132 if (task->taskState_ == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr &&
133 TaskManager::GetInstance().EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority)) {
134 reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter();
135 task->DecreaseTaskRefCount();
136 TaskManager::GetInstance().DecreaseRefCount(env, taskId);
137 delete task->currentTaskInfo_;
138 task->currentTaskInfo_ = nullptr;
139 if (group->currentGroupInfo_ != nullptr) {
140 group->currentGroupInfo_->finishedTaskNum++;
141 }
142 }
143 task->taskState_ = ExecuteState::CANCELED;
144 }
145
StoreTaskGroup(uint64_t groupId,TaskGroup * taskGroup)146 void TaskGroupManager::StoreTaskGroup(uint64_t groupId, TaskGroup* taskGroup)
147 {
148 std::lock_guard<std::mutex> lock(taskGroupsMutex_);
149 taskGroups_.emplace(groupId, taskGroup);
150 }
151
RemoveTaskGroup(uint64_t groupId)152 void TaskGroupManager::RemoveTaskGroup(uint64_t groupId)
153 {
154 std::lock_guard<std::mutex> lock(taskGroupsMutex_);
155 taskGroups_.erase(groupId);
156 }
157
GetTaskGroup(uint64_t groupId)158 TaskGroup* TaskGroupManager::GetTaskGroup(uint64_t groupId)
159 {
160 std::lock_guard<std::mutex> lock(taskGroupsMutex_);
161 auto groupIter = taskGroups_.find(groupId);
162 if (groupIter == taskGroups_.end()) {
163 return nullptr;
164 }
165 return reinterpret_cast<TaskGroup*>(groupIter->second);
166 }
167
UpdateGroupState(uint64_t groupId)168 bool TaskGroupManager::UpdateGroupState(uint64_t groupId)
169 {
170 HILOG_DEBUG("taskpool:: UpdateGroupState groupId:%{public}s", std::to_string(groupId).c_str());
171 // During the modification process of the group, prevent other sub threads from performing other
172 // operations on the group pointer, which may cause the modification to fail.
173 std::lock_guard<std::mutex> lock(taskGroupsMutex_);
174 auto groupIter = taskGroups_.find(groupId);
175 if (groupIter == taskGroups_.end()) {
176 return false;
177 }
178 TaskGroup* group = reinterpret_cast<TaskGroup*>(groupIter->second);
179 if (group == nullptr || group->groupState_ == ExecuteState::CANCELED) {
180 HILOG_DEBUG("taskpool:: UpdateGroupState taskGroup has been released or canceled");
181 return false;
182 }
183 group->groupState_ = ExecuteState::RUNNING;
184 return true;
185 }
186 } // namespace Commonlibrary::Concurrent::TaskPoolModule