• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2025 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "task_group_manager.h"
17 
18 #include "helper/hitrace_helper.h"
19 
20 namespace Commonlibrary::Concurrent::TaskPoolModule {
GetInstance()21 TaskGroupManager& TaskGroupManager::GetInstance()
22 {
23     static TaskGroupManager groupManager;
24     return groupManager;
25 }
26 
AddTask(uint64_t groupId,napi_ref taskRef,uint32_t taskId)27 void TaskGroupManager::AddTask(uint64_t groupId, napi_ref taskRef, uint32_t taskId)
28 {
29     std::lock_guard<std::mutex> lock(taskGroupsMutex_);
30     auto groupIter = taskGroups_.find(groupId);
31     if (groupIter == taskGroups_.end()) {
32         HILOG_DEBUG("taskpool:: taskGroup has been released");
33         return;
34     }
35     auto taskGroup = reinterpret_cast<TaskGroup*>(groupIter->second);
36     if (taskGroup == nullptr) {
37         HILOG_ERROR("taskpool:: taskGroup is null");
38         return;
39     }
40     taskGroup->taskRefs_.push_back(taskRef);
41     taskGroup->taskNum_++;
42     taskGroup->taskIds_.push_back(taskId);
43 }
44 
ReleaseTaskGroupData(napi_env env,TaskGroup * group)45 void TaskGroupManager::ReleaseTaskGroupData(napi_env env, TaskGroup* group)
46 {
47     HILOG_DEBUG("taskpool:: ReleaseTaskGroupData group");
48     TaskGroupManager::GetInstance().RemoveTaskGroup(group->groupId_);
49     {
50         std::lock_guard<std::recursive_mutex> lock(group->taskGroupMutex_);
51         if (group->onRejectResultSignal_ != nullptr) {
52             if (!ConcurrentHelper::IsUvClosing(group->onRejectResultSignal_)) {
53                 ConcurrentHelper::UvHandleClose(group->onRejectResultSignal_);
54             } else {
55                 delete group->onRejectResultSignal_;
56                 group->onRejectResultSignal_ = nullptr;
57             }
58         }
59         if (group->isValid_) {
60             for (uint32_t taskId : group->taskIds_) {
61                 Task* task = TaskManager::GetInstance().GetTask(taskId);
62                 if (task == nullptr || !task->IsValid()) {
63                     continue;
64                 }
65                 napi_reference_unref(task->env_, task->taskRef_, nullptr);
66             }
67         }
68         if (group->currentGroupInfo_ != nullptr) {
69             delete group->currentGroupInfo_;
70             group->currentGroupInfo_ = nullptr;
71         }
72     }
73     group->CancelPendingGroup(env);
74 }
75 
CancelGroup(napi_env env,uint64_t groupId)76 void TaskGroupManager::CancelGroup(napi_env env, uint64_t groupId)
77 {
78     std::string strTrace = "CancelGroup: groupId: " + std::to_string(groupId);
79     HITRACE_HELPER_METER_NAME(strTrace);
80     HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
81     TaskGroup* taskGroup = GetTaskGroup(groupId);
82     if (taskGroup == nullptr) {
83         HILOG_ERROR("taskpool:: CancelGroup group is nullptr");
84         return;
85     }
86     if (taskGroup->groupState_ == ExecuteState::CANCELED) {
87         return;
88     }
89     {
90         std::lock_guard<std::recursive_mutex> lock(taskGroup->taskGroupMutex_);
91         if (taskGroup->currentGroupInfo_ == nullptr || taskGroup->groupState_ == ExecuteState::NOT_FOUND ||
92             taskGroup->groupState_ == ExecuteState::FINISHED) {
93             std::string errMsg = "taskpool:: taskGroup is not executed or has been executed";
94             HILOG_ERROR("%{public}s", errMsg.c_str());
95             ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK_GROUP, errMsg.c_str());
96             return;
97         }
98     }
99     ExecuteState groupState = taskGroup->groupState_;
100     taskGroup->groupState_ = ExecuteState::CANCELED;
101     taskGroup->CancelPendingGroup(env);
102     std::lock_guard<std::recursive_mutex> lock(taskGroup->taskGroupMutex_);
103     if (taskGroup->currentGroupInfo_->finishedTaskNum != taskGroup->taskNum_) {
104         for (uint32_t taskId : taskGroup->taskIds_) {
105             CancelGroupTask(env, taskId, taskGroup);
106         }
107         if (taskGroup->currentGroupInfo_->finishedTaskNum == taskGroup->taskNum_) {
108             napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: taskGroup has been canceled");
109             taskGroup->RejectResult(env, error);
110             return;
111         }
112     }
113     if (groupState == ExecuteState::WAITING && taskGroup->currentGroupInfo_ != nullptr) {
114         auto engine = reinterpret_cast<NativeEngine*>(env);
115         for (size_t i = 0; i < taskGroup->taskIds_.size(); i++) {
116             engine->DecreaseSubEnvCounter();
117         }
118         napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: taskGroup has been canceled");
119         taskGroup->RejectResult(env, error);
120     }
121 }
122 
CancelGroupTask(napi_env env,uint32_t taskId,TaskGroup * group)123 void TaskGroupManager::CancelGroupTask(napi_env env, uint32_t taskId, TaskGroup* group)
124 {
125     HILOG_DEBUG("taskpool:: CancelGroupTask task:%{public}s", std::to_string(taskId).c_str());
126     auto task = TaskManager::GetInstance().GetTask(taskId);
127     if (task == nullptr) {
128         HILOG_INFO("taskpool:: CancelGroupTask task is nullptr");
129         return;
130     }
131     std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
132     if (task->taskState_ == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr &&
133         TaskManager::GetInstance().EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority)) {
134         reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter();
135         task->DecreaseTaskRefCount();
136         TaskManager::GetInstance().DecreaseRefCount(env, taskId);
137         delete task->currentTaskInfo_;
138         task->currentTaskInfo_ = nullptr;
139         if (group->currentGroupInfo_ != nullptr) {
140             group->currentGroupInfo_->finishedTaskNum++;
141         }
142     }
143     task->taskState_ = ExecuteState::CANCELED;
144 }
145 
StoreTaskGroup(uint64_t groupId,TaskGroup * taskGroup)146 void TaskGroupManager::StoreTaskGroup(uint64_t groupId, TaskGroup* taskGroup)
147 {
148     std::lock_guard<std::mutex> lock(taskGroupsMutex_);
149     taskGroups_.emplace(groupId, taskGroup);
150 }
151 
RemoveTaskGroup(uint64_t groupId)152 void TaskGroupManager::RemoveTaskGroup(uint64_t groupId)
153 {
154     std::lock_guard<std::mutex> lock(taskGroupsMutex_);
155     taskGroups_.erase(groupId);
156 }
157 
GetTaskGroup(uint64_t groupId)158 TaskGroup* TaskGroupManager::GetTaskGroup(uint64_t groupId)
159 {
160     std::lock_guard<std::mutex> lock(taskGroupsMutex_);
161     auto groupIter = taskGroups_.find(groupId);
162     if (groupIter == taskGroups_.end()) {
163         return nullptr;
164     }
165     return reinterpret_cast<TaskGroup*>(groupIter->second);
166 }
167 
UpdateGroupState(uint64_t groupId)168 bool TaskGroupManager::UpdateGroupState(uint64_t groupId)
169 {
170     HILOG_DEBUG("taskpool:: UpdateGroupState groupId:%{public}s", std::to_string(groupId).c_str());
171     // During the modification process of the group, prevent other sub threads from performing other
172     // operations on the group pointer, which may cause the modification to fail.
173     std::lock_guard<std::mutex> lock(taskGroupsMutex_);
174     auto groupIter = taskGroups_.find(groupId);
175     if (groupIter == taskGroups_.end()) {
176         return false;
177     }
178     TaskGroup* group = reinterpret_cast<TaskGroup*>(groupIter->second);
179     if (group == nullptr || group->groupState_ == ExecuteState::CANCELED) {
180         HILOG_DEBUG("taskpool:: UpdateGroupState taskGroup has been released or canceled");
181         return false;
182     }
183     group->groupState_ = ExecuteState::RUNNING;
184     return true;
185 }
186 } // namespace Commonlibrary::Concurrent::TaskPoolModule