• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "task_manager.h"
17 
18 #include "helper/error_helper.h"
19 #include "taskpool.h"
20 #include "utils/log.h"
21 #include "worker.h"
22 
23 namespace Commonlibrary::Concurrent::TaskPoolModule {
24 const static int MAX_THREADPOOL_SIZE = 4;
25 
26 using namespace Commonlibrary::Concurrent::Common::Helper;
27 
GetInstance()28 TaskManager &TaskManager::GetInstance()
29 {
30     static TaskManager manager;
31     return manager;
32 }
33 
~TaskManager()34 TaskManager::~TaskManager()
35 {
36     {
37         std::lock_guard<std::mutex> lock(workersMutex_);
38         for (auto &worker : workers_) {
39             delete worker;
40         }
41         workers_.clear();
42     }
43     {
44         std::unique_lock<std::shared_mutex> lock(taskInfosMutex_);
45         for (auto iter = taskInfos_.begin(); iter != taskInfos_.end(); iter++) {
46             delete iter->second;
47             iter->second = nullptr;
48         }
49         taskInfos_.clear();
50     }
51 }
52 
GenerateTaskId()53 uint32_t TaskManager::GenerateTaskId()
54 {
55     std::unique_lock<std::mutex> lock(idMutex_);
56     return currentTaskId_++;
57 }
58 
GenerateExecuteId()59 uint32_t TaskManager::GenerateExecuteId()
60 {
61     std::unique_lock<std::mutex> lock(idMutex_);
62     return currentExecuteId_++;
63 }
64 
StoreTaskInfo(uint32_t executeId,TaskInfo * taskInfo)65 void TaskManager::StoreTaskInfo(uint32_t executeId, TaskInfo* taskInfo)
66 {
67     std::unique_lock<std::shared_mutex> lock(taskInfosMutex_);
68     taskInfos_.emplace(executeId, taskInfo);
69 }
70 
StoreStateInfo(uint32_t executeId,TaskState state)71 void TaskManager::StoreStateInfo(uint32_t executeId, TaskState state)
72 {
73     std::unique_lock<std::shared_mutex> lock(taskStatesMutex_);
74     taskStates_.emplace(executeId, state);
75 }
76 
StoreRunningInfo(uint32_t taskId,uint32_t executeId)77 void TaskManager::StoreRunningInfo(uint32_t taskId, uint32_t executeId)
78 {
79     std::unique_lock<std::shared_mutex> lock(runningInfosMutex_);
80     auto iter = runningInfos_.find(taskId);
81     if (iter == runningInfos_.end()) {
82         std::list<uint32_t> list {executeId};
83         runningInfos_.emplace(taskId, list);
84     } else {
85         iter->second.push_front(executeId);
86     }
87 }
88 
PopTaskInfo(uint32_t executeId)89 TaskInfo* TaskManager::PopTaskInfo(uint32_t executeId)
90 {
91     std::unique_lock<std::shared_mutex> lock(taskInfosMutex_);
92     auto iter = taskInfos_.find(executeId);
93     if (iter == taskInfos_.end() || iter->second == nullptr) {
94         return nullptr;
95     }
96 
97     TaskInfo* taskInfo = iter->second;
98     // remove the the taskInfo when executed
99     taskInfos_.erase(iter);
100     return taskInfo;
101 }
102 
PopRunningInfo(uint32_t taskId,uint32_t executeId)103 void TaskManager::PopRunningInfo(uint32_t taskId, uint32_t executeId)
104 {
105     std::unique_lock<std::shared_mutex> lock(runningInfosMutex_);
106     auto iter = runningInfos_.find(taskId);
107     if (iter == runningInfos_.end()) {
108         return;
109     }
110     iter->second.remove(executeId);
111 }
112 
QueryState(uint32_t executeId)113 TaskState TaskManager::QueryState(uint32_t executeId)
114 {
115     std::shared_lock<std::shared_mutex> lock(taskStatesMutex_);
116     auto iter = taskStates_.find(executeId);
117     if (iter == taskStates_.end()) {
118         HILOG_ERROR("taskpool:: failed to find the target task");
119         return TaskState::NOT_FOUND;
120     }
121     return iter->second;
122 }
123 
UpdateState(uint32_t executeId,TaskState state)124 bool TaskManager::UpdateState(uint32_t executeId, TaskState state)
125 {
126     std::unique_lock<std::shared_mutex> lock(taskStatesMutex_);
127     auto iter = taskStates_.find(executeId);
128     if (iter == taskStates_.end()) {
129         return false;
130     }
131     if (state == TaskState::RUNNING) {
132         iter->second = state;
133     } else {
134         taskStates_.erase(iter);
135     }
136     return true;
137 }
138 
CancelTask(napi_env env,uint32_t taskId)139 void TaskManager::CancelTask(napi_env env, uint32_t taskId)
140 {
141     std::unique_lock<std::shared_mutex> lock(runningInfosMutex_);
142     auto iter = runningInfos_.find(taskId);
143     if (iter == runningInfos_.end() || iter->second.empty()) {
144         ErrorHelper::ThrowError(env, ErrorHelper::NOTEXIST_ERROR, "taskpool:: can not find the task");
145         return;
146     }
147     int32_t result;
148     for (auto item : iter->second) {
149         TaskState state = QueryState(item);
150         if (state == TaskState::NOT_FOUND) {
151             result = ErrorHelper::NOTEXIST_ERROR;
152             break;
153         }
154         UpdateState(item, TaskState::CANCELED);
155         if (state == TaskState::WAITING) {
156             TaskInfo* taskInfo = PopTaskInfo(item);
157             ReleaseTaskContent(taskInfo);
158         } else {
159             result = ErrorHelper::RUNNING_ERROR;
160         }
161     }
162 
163     if (result == ErrorHelper::NOTEXIST_ERROR) {
164         ErrorHelper::ThrowError(env, ErrorHelper::NOTEXIST_ERROR, "taskpool:: can not find the task");
165     } else if (result == ErrorHelper::RUNNING_ERROR) {
166         ErrorHelper::ThrowError(env, ErrorHelper::RUNNING_ERROR, "taskpool:: can not cancel the running task");
167     } else {
168         runningInfos_.erase(iter);
169     }
170 }
171 
GenerateTaskInfo(napi_env env,napi_value object,uint32_t taskId,uint32_t executeId)172 TaskInfo* TaskManager::GenerateTaskInfo(napi_env env, napi_value object, uint32_t taskId, uint32_t executeId)
173 {
174     napi_value undefined;
175     napi_get_undefined(env, &undefined);
176     napi_value taskData;
177     napi_status serializeStatus = napi_ok;
178     serializeStatus = napi_serialize(env, object, undefined, &taskData);
179     if (serializeStatus != napi_ok || taskData == nullptr) {
180         ErrorHelper::ThrowError(env, ErrorHelper::WORKERSERIALIZATION_ERROR,
181             "taskpool: failed to serialize message.");
182         return nullptr;
183     }
184     TaskInfo* taskInfo = new (std::nothrow) TaskInfo();
185     taskInfo->env = env;
186     taskInfo->executeId = executeId;
187     taskInfo->serializationData = taskData;
188     taskInfo->taskId = taskId;
189     taskInfo->onResultSignal = new uv_async_t;
190     uv_loop_t* loop = NapiHelper::GetLibUV(env);
191     uv_async_init(loop, taskInfo->onResultSignal, reinterpret_cast<uv_async_cb>(TaskPool::HandleTaskResult));
192     taskInfo->onResultSignal->data = taskInfo;
193 
194     StoreTaskInfo(executeId, taskInfo);
195     return taskInfo;
196 }
197 
ReleaseTaskContent(TaskInfo * taskInfo)198 void TaskManager::ReleaseTaskContent(TaskInfo* taskInfo)
199 {
200     if (taskInfo == nullptr) {
201         return;
202     }
203     if (taskInfo->onResultSignal != nullptr &&
204         !uv_is_closing(reinterpret_cast<uv_handle_t*>(taskInfo->onResultSignal))) {
205         uv_close(reinterpret_cast<uv_handle_t*>(taskInfo->onResultSignal), [](uv_handle_t* handle) {
206             if (handle != nullptr) {
207                 delete reinterpret_cast<uv_async_t*>(handle);
208                 handle = nullptr;
209             }
210         });
211     }
212     delete taskInfo;
213     taskInfo = nullptr;
214 }
215 
NeedExpandWorker()216 bool TaskManager::NeedExpandWorker()
217 {
218     std::unique_lock<std::mutex> lock(workersMutex_);
219     return workers_.size() < MAX_THREADPOOL_SIZE;
220 }
221 
NotifyWorkerIdle(Worker * worker)222 void TaskManager::NotifyWorkerIdle(Worker *worker)
223 {
224     {
225         std::unique_lock<std::mutex> lock(workersMutex_);
226         idleWorkers_.insert(worker);
227     }
228     NotifyExecuteTask();
229 }
230 
NotifyWorkerAdded(Worker * worker)231 void TaskManager::NotifyWorkerAdded(Worker *worker)
232 {
233     std::unique_lock<std::mutex> lock(workersMutex_);
234     workers_.insert(worker);
235 }
236 
EnqueueTask(std::unique_ptr<Task> task)237 void TaskManager::EnqueueTask(std::unique_ptr<Task> task)
238 {
239     taskQueue_.EnqueueTask(std::move(task));
240     NotifyExecuteTask();
241 }
242 
DequeueTask()243 std::unique_ptr<Task> TaskManager::DequeueTask()
244 {
245     return taskQueue_.DequeueTask();
246 }
247 
NotifyExecuteTask()248 void TaskManager::NotifyExecuteTask()
249 {
250     if (taskQueue_.IsEmpty()) {
251         return;
252     }
253 
254     std::unique_lock<std::mutex> lock(workersMutex_);
255     if (idleWorkers_.empty()) {
256         return;
257     }
258     auto candidator = idleWorkers_.begin();
259     Worker *worker = *candidator;
260     idleWorkers_.erase(candidator);
261     worker->NotifyExecuteTask();
262 }
263 
InitTaskRunner(napi_env env)264 void TaskManager::InitTaskRunner(napi_env env)
265 {
266     if (NeedExpandWorker()) {
267         auto worker = Worker::WorkerConstructor(env);
268         NotifyWorkerAdded(worker);
269     }
270 }
271 } // namespace Commonlibrary::Concurrent::TaskPoolModule