1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "task_manager.h"
17
18 #include "helper/error_helper.h"
19 #include "taskpool.h"
20 #include "utils/log.h"
21 #include "worker.h"
22
23 namespace Commonlibrary::Concurrent::TaskPoolModule {
24 const static int MAX_THREADPOOL_SIZE = 4;
25
26 using namespace Commonlibrary::Concurrent::Common::Helper;
27
GetInstance()28 TaskManager &TaskManager::GetInstance()
29 {
30 static TaskManager manager;
31 return manager;
32 }
33
~TaskManager()34 TaskManager::~TaskManager()
35 {
36 {
37 std::lock_guard<std::mutex> lock(workersMutex_);
38 for (auto &worker : workers_) {
39 delete worker;
40 }
41 workers_.clear();
42 }
43 {
44 std::unique_lock<std::shared_mutex> lock(taskInfosMutex_);
45 for (auto iter = taskInfos_.begin(); iter != taskInfos_.end(); iter++) {
46 delete iter->second;
47 iter->second = nullptr;
48 }
49 taskInfos_.clear();
50 }
51 }
52
GenerateTaskId()53 uint32_t TaskManager::GenerateTaskId()
54 {
55 std::unique_lock<std::mutex> lock(idMutex_);
56 return currentTaskId_++;
57 }
58
GenerateExecuteId()59 uint32_t TaskManager::GenerateExecuteId()
60 {
61 std::unique_lock<std::mutex> lock(idMutex_);
62 return currentExecuteId_++;
63 }
64
StoreTaskInfo(uint32_t executeId,TaskInfo * taskInfo)65 void TaskManager::StoreTaskInfo(uint32_t executeId, TaskInfo* taskInfo)
66 {
67 std::unique_lock<std::shared_mutex> lock(taskInfosMutex_);
68 taskInfos_.emplace(executeId, taskInfo);
69 }
70
StoreStateInfo(uint32_t executeId,TaskState state)71 void TaskManager::StoreStateInfo(uint32_t executeId, TaskState state)
72 {
73 std::unique_lock<std::shared_mutex> lock(taskStatesMutex_);
74 taskStates_.emplace(executeId, state);
75 }
76
StoreRunningInfo(uint32_t taskId,uint32_t executeId)77 void TaskManager::StoreRunningInfo(uint32_t taskId, uint32_t executeId)
78 {
79 std::unique_lock<std::shared_mutex> lock(runningInfosMutex_);
80 auto iter = runningInfos_.find(taskId);
81 if (iter == runningInfos_.end()) {
82 std::list<uint32_t> list {executeId};
83 runningInfos_.emplace(taskId, list);
84 } else {
85 iter->second.push_front(executeId);
86 }
87 }
88
PopTaskInfo(uint32_t executeId)89 TaskInfo* TaskManager::PopTaskInfo(uint32_t executeId)
90 {
91 std::unique_lock<std::shared_mutex> lock(taskInfosMutex_);
92 auto iter = taskInfos_.find(executeId);
93 if (iter == taskInfos_.end() || iter->second == nullptr) {
94 return nullptr;
95 }
96
97 TaskInfo* taskInfo = iter->second;
98 // remove the the taskInfo when executed
99 taskInfos_.erase(iter);
100 return taskInfo;
101 }
102
PopRunningInfo(uint32_t taskId,uint32_t executeId)103 void TaskManager::PopRunningInfo(uint32_t taskId, uint32_t executeId)
104 {
105 std::unique_lock<std::shared_mutex> lock(runningInfosMutex_);
106 auto iter = runningInfos_.find(taskId);
107 if (iter == runningInfos_.end()) {
108 return;
109 }
110 iter->second.remove(executeId);
111 }
112
QueryState(uint32_t executeId)113 TaskState TaskManager::QueryState(uint32_t executeId)
114 {
115 std::shared_lock<std::shared_mutex> lock(taskStatesMutex_);
116 auto iter = taskStates_.find(executeId);
117 if (iter == taskStates_.end()) {
118 HILOG_ERROR("taskpool:: failed to find the target task");
119 return TaskState::NOT_FOUND;
120 }
121 return iter->second;
122 }
123
UpdateState(uint32_t executeId,TaskState state)124 bool TaskManager::UpdateState(uint32_t executeId, TaskState state)
125 {
126 std::unique_lock<std::shared_mutex> lock(taskStatesMutex_);
127 auto iter = taskStates_.find(executeId);
128 if (iter == taskStates_.end()) {
129 return false;
130 }
131 if (state == TaskState::RUNNING) {
132 iter->second = state;
133 } else {
134 taskStates_.erase(iter);
135 }
136 return true;
137 }
138
CancelTask(napi_env env,uint32_t taskId)139 void TaskManager::CancelTask(napi_env env, uint32_t taskId)
140 {
141 std::unique_lock<std::shared_mutex> lock(runningInfosMutex_);
142 auto iter = runningInfos_.find(taskId);
143 if (iter == runningInfos_.end() || iter->second.empty()) {
144 ErrorHelper::ThrowError(env, ErrorHelper::NOTEXIST_ERROR, "taskpool:: can not find the task");
145 return;
146 }
147 int32_t result;
148 for (auto item : iter->second) {
149 TaskState state = QueryState(item);
150 if (state == TaskState::NOT_FOUND) {
151 result = ErrorHelper::NOTEXIST_ERROR;
152 break;
153 }
154 UpdateState(item, TaskState::CANCELED);
155 if (state == TaskState::WAITING) {
156 TaskInfo* taskInfo = PopTaskInfo(item);
157 ReleaseTaskContent(taskInfo);
158 } else {
159 result = ErrorHelper::RUNNING_ERROR;
160 }
161 }
162
163 if (result == ErrorHelper::NOTEXIST_ERROR) {
164 ErrorHelper::ThrowError(env, ErrorHelper::NOTEXIST_ERROR, "taskpool:: can not find the task");
165 } else if (result == ErrorHelper::RUNNING_ERROR) {
166 ErrorHelper::ThrowError(env, ErrorHelper::RUNNING_ERROR, "taskpool:: can not cancel the running task");
167 } else {
168 runningInfos_.erase(iter);
169 }
170 }
171
GenerateTaskInfo(napi_env env,napi_value object,uint32_t taskId,uint32_t executeId)172 TaskInfo* TaskManager::GenerateTaskInfo(napi_env env, napi_value object, uint32_t taskId, uint32_t executeId)
173 {
174 napi_value undefined;
175 napi_get_undefined(env, &undefined);
176 napi_value taskData;
177 napi_status serializeStatus = napi_ok;
178 serializeStatus = napi_serialize(env, object, undefined, &taskData);
179 if (serializeStatus != napi_ok || taskData == nullptr) {
180 ErrorHelper::ThrowError(env, ErrorHelper::WORKERSERIALIZATION_ERROR,
181 "taskpool: failed to serialize message.");
182 return nullptr;
183 }
184 TaskInfo* taskInfo = new (std::nothrow) TaskInfo();
185 taskInfo->env = env;
186 taskInfo->executeId = executeId;
187 taskInfo->serializationData = taskData;
188 taskInfo->taskId = taskId;
189 taskInfo->onResultSignal = new uv_async_t;
190 uv_loop_t* loop = NapiHelper::GetLibUV(env);
191 uv_async_init(loop, taskInfo->onResultSignal, reinterpret_cast<uv_async_cb>(TaskPool::HandleTaskResult));
192 taskInfo->onResultSignal->data = taskInfo;
193
194 StoreTaskInfo(executeId, taskInfo);
195 return taskInfo;
196 }
197
ReleaseTaskContent(TaskInfo * taskInfo)198 void TaskManager::ReleaseTaskContent(TaskInfo* taskInfo)
199 {
200 if (taskInfo == nullptr) {
201 return;
202 }
203 if (taskInfo->onResultSignal != nullptr &&
204 !uv_is_closing(reinterpret_cast<uv_handle_t*>(taskInfo->onResultSignal))) {
205 uv_close(reinterpret_cast<uv_handle_t*>(taskInfo->onResultSignal), [](uv_handle_t* handle) {
206 if (handle != nullptr) {
207 delete reinterpret_cast<uv_async_t*>(handle);
208 handle = nullptr;
209 }
210 });
211 }
212 delete taskInfo;
213 taskInfo = nullptr;
214 }
215
NeedExpandWorker()216 bool TaskManager::NeedExpandWorker()
217 {
218 std::unique_lock<std::mutex> lock(workersMutex_);
219 return workers_.size() < MAX_THREADPOOL_SIZE;
220 }
221
NotifyWorkerIdle(Worker * worker)222 void TaskManager::NotifyWorkerIdle(Worker *worker)
223 {
224 {
225 std::unique_lock<std::mutex> lock(workersMutex_);
226 idleWorkers_.insert(worker);
227 }
228 NotifyExecuteTask();
229 }
230
NotifyWorkerAdded(Worker * worker)231 void TaskManager::NotifyWorkerAdded(Worker *worker)
232 {
233 std::unique_lock<std::mutex> lock(workersMutex_);
234 workers_.insert(worker);
235 }
236
EnqueueTask(std::unique_ptr<Task> task)237 void TaskManager::EnqueueTask(std::unique_ptr<Task> task)
238 {
239 taskQueue_.EnqueueTask(std::move(task));
240 NotifyExecuteTask();
241 }
242
DequeueTask()243 std::unique_ptr<Task> TaskManager::DequeueTask()
244 {
245 return taskQueue_.DequeueTask();
246 }
247
NotifyExecuteTask()248 void TaskManager::NotifyExecuteTask()
249 {
250 if (taskQueue_.IsEmpty()) {
251 return;
252 }
253
254 std::unique_lock<std::mutex> lock(workersMutex_);
255 if (idleWorkers_.empty()) {
256 return;
257 }
258 auto candidator = idleWorkers_.begin();
259 Worker *worker = *candidator;
260 idleWorkers_.erase(candidator);
261 worker->NotifyExecuteTask();
262 }
263
InitTaskRunner(napi_env env)264 void TaskManager::InitTaskRunner(napi_env env)
265 {
266 if (NeedExpandWorker()) {
267 auto worker = Worker::WorkerConstructor(env);
268 NotifyWorkerAdded(worker);
269 }
270 }
271 } // namespace Commonlibrary::Concurrent::TaskPoolModule