1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "task_manager.h"
17
18 #include <thread>
19
20 #include "commonlibrary/ets_utils/js_sys_module/timer/timer.h"
21 #include "helper/concurrent_helper.h"
22 #include "helper/error_helper.h"
23 #include "helper/hitrace_helper.h"
24 #include "taskpool.h"
25 #include "utils/log.h"
26 #include "worker.h"
27
28 namespace Commonlibrary::Concurrent::TaskPoolModule {
29 using namespace OHOS::JsSysModule;
30
31 static constexpr int8_t HIGH_PRIORITY_TASK_COUNT = 5;
32 static constexpr int8_t MEDIUM_PRIORITY_TASK_COUNT = 5;
33 static constexpr int32_t MAX_TASK_DURATION = 100; // 100: 100ms
34 static constexpr int32_t MAX_IDLE_TIME = 30000; // 30000: 30s
35 static constexpr int32_t MAX_RETRY_COUNT = 40; // 40: counter for stopping timer
36 static constexpr uint32_t STEP_SIZE = 2;
37 static constexpr uint32_t DEFAULT_THREADS = 3;
38 static constexpr uint32_t MIN_THREADS = 1; // 1: minimum thread num when idle
39 static constexpr uint32_t CHECK_INTERVAL = 60000; // 60000: 1min
40 static constexpr uint32_t MIN_TIMEOUT_TIME = 180000; // 180000: 3min
41 static constexpr uint32_t MAX_TIMEOUT_TIME = 600000; // 6000000: 10min
42
43 // ----------------------------------- TaskManager ----------------------------------------
GetInstance()44 TaskManager& TaskManager::GetInstance()
45 {
46 static TaskManager manager;
47 return manager;
48 }
49
TaskManager()50 TaskManager::TaskManager()
51 {
52 for (size_t i = 0; i < taskQueues_.size(); i++) {
53 std::unique_ptr<ExecuteQueue> taskQueue = std::make_unique<ExecuteQueue>();
54 taskQueues_[i] = std::move(taskQueue);
55 }
56 }
57
~TaskManager()58 TaskManager::~TaskManager()
59 {
60 if (timer_ == nullptr) {
61 HILOG_ERROR("taskpool:: timer_ is nullptr");
62 } else {
63 uv_timer_stop(timer_);
64 uv_close(reinterpret_cast<uv_handle_t*>(timer_), [](uv_handle_t* handle) {
65 if (handle != nullptr) {
66 delete reinterpret_cast<uv_timer_t*>(handle);
67 handle = nullptr;
68 }
69 });
70
71 uv_close(reinterpret_cast<uv_handle_t*>(notifyRestartTimer_), [](uv_handle_t* handle) {
72 if (handle != nullptr) {
73 delete reinterpret_cast<uv_async_t*>(handle);
74 handle = nullptr;
75 }
76 });
77 }
78
79 if (loop_ != nullptr) {
80 uv_stop(loop_);
81 }
82
83 {
84 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
85 for (auto& worker : workers_) {
86 delete worker;
87 }
88 workers_.clear();
89 }
90
91 {
92 std::unique_lock<std::shared_mutex> lock(taskInfosMutex_);
93 for (auto& [_, taskInfo] : taskInfos_) {
94 delete taskInfo;
95 }
96 taskInfos_.clear();
97 }
98 }
99
// Builds a JS array of {tid, priority, taskIds} objects, one per worker
// that has a valid workerEnv_. Used for thread diagnostics.
napi_value TaskManager::GetThreadInfos(napi_env env)
{
    napi_value threadInfos = nullptr;
    napi_create_array(env, &threadInfos);
    {
        std::lock_guard<std::recursive_mutex> lock(workersMutex_);
        int32_t i = 0;
        for (auto& worker : workers_) {
            // Skip workers whose environment has not been set up yet.
            if (worker->workerEnv_ == nullptr) {
                continue;
            }
            napi_value tid = nullptr;
            napi_value priority = nullptr;
            napi_create_int32(env, static_cast<int32_t>(worker->tid_), &tid);
            napi_create_int32(env, static_cast<int32_t>(worker->priority_), &priority);

            // Collect the ids of tasks currently held by this worker.
            napi_value taskId = nullptr;
            napi_create_array(env, &taskId);
            int32_t j = 0;
            {
                std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_);
                for (auto& currentId : worker->currentTaskId_) {
                    napi_value id = nullptr;
                    napi_create_uint32(env, currentId, &id);
                    napi_set_element(env, taskId, j, id);
                    j++;
                }
            }
            napi_value threadInfo = nullptr;
            napi_create_object(env, &threadInfo);
            napi_set_named_property(env, threadInfo, "tid", tid);
            napi_set_named_property(env, threadInfo, "priority", priority);
            napi_set_named_property(env, threadInfo, "taskIds", taskId);

            napi_set_element(env, threadInfos, i, threadInfo);
            i++;
        }
    }
    return threadInfos;
}
140
GetTaskInfos(napi_env env)141 napi_value TaskManager::GetTaskInfos(napi_env env)
142 {
143 napi_value taskInfos = nullptr;
144 napi_create_array(env, &taskInfos);
145 {
146 std::unique_lock<std::shared_mutex> lock(taskInfosMutex_);
147 int32_t i = 0;
148 for (auto& [_, taskInfo] : taskInfos_) {
149 napi_value taskInfoValue = nullptr;
150 napi_create_object(env, &taskInfoValue);
151 napi_value taskId = nullptr;
152 napi_create_uint32(env, taskInfo->taskId, &taskId);
153 napi_value stateValue = nullptr;
154 ExecuteState state;
155 uint64_t duration = 0;
156 if (taskInfo->isCanceled) {
157 state = ExecuteState::CANCELED;
158 } else if (taskInfo->worker != nullptr) {
159 Worker* worker = reinterpret_cast<Worker*>(taskInfo->worker);
160 duration = ConcurrentHelper::GetMilliseconds() - worker->startTime_;
161 state = ExecuteState::RUNNING;
162 } else {
163 state = ExecuteState::WAITING;
164 }
165 napi_create_int32(env, state, &stateValue);
166 napi_set_named_property(env, taskInfoValue, "taskId", taskId);
167 napi_set_named_property(env, taskInfoValue, "state", stateValue);
168 napi_value durationValue = nullptr;
169 napi_create_uint32(env, static_cast<uint32_t>(duration), &durationValue);
170 napi_set_named_property(env, taskInfoValue, "duration", durationValue);
171 napi_set_element(env, taskInfos, i, taskInfoValue);
172 i++;
173 }
174 }
175 return taskInfos;
176 }
177
UpdateExecutedInfo(uint64_t duration)178 void TaskManager::UpdateExecutedInfo(uint64_t duration)
179 {
180 totalExecTime_ += duration;
181 totalExecCount_++;
182 }
183
ComputeSuitableThreadNum()184 uint32_t TaskManager::ComputeSuitableThreadNum()
185 {
186 if (GetTaskNum() != 0 && totalExecCount_ == 0) {
187 // this branch is used for avoiding time-consuming works that may block the taskpool
188 return STEP_SIZE;
189 } else if (totalExecCount_ == 0) {
190 return 0; // no task since created
191 }
192
193 auto durationPerTask = static_cast<double>(totalExecTime_) / totalExecCount_;
194 return std::ceil(durationPerTask * GetTaskNum() / MAX_TASK_DURATION);
195 }
196
// Periodically scans non-idle workers and moves any that have been running
// longer than a dynamic threshold into timeoutWorkers_ (marked BLOCKED).
// Runs at most once per CHECK_INTERVAL.
void TaskManager::CheckForBlockedWorkers()
{
    // monitor the running state
    uint64_t now = ConcurrentHelper::GetMilliseconds();
    if (UNLIKELY(nextCheckTime_ < now)) {
        // the threshold will be dynamically modified to provide more flexibility in detecting exceptions
        // if the thread num has reached the limit and the idle worker is not available, a short time will be used,
        // else we will choose the longer one
        std::lock_guard<std::recursive_mutex> lock(workersMutex_);
        bool state = GetThreadNum() == ConcurrentHelper::GetActiveCpus() - 1 && GetIdleWorkers() == 0;
        uint64_t threshold = state ? MIN_TIMEOUT_TIME : MAX_TIMEOUT_TIME;
        for (auto iter = workers_.begin(); iter != workers_.end();) {
            auto worker = *iter;
            std::lock_guard<std::mutex> stateLock(worker->stateMutex_);
            // if the worker thread is idle, just skip it
            if (worker->state_ == WorkerState::IDLE) {
                iter++;
                continue;
            }

            if (now - worker->startTime_ >= threshold) {
                HILOG_DEBUG("taskpool:: The worker is marked for timeout.");
                worker->state_ = WorkerState::BLOCKED;
                // Move the worker out of the active/idle sets; post-increment
                // erase keeps the iterator valid while removing the element.
                timeoutWorkers_.insert(worker);
                idleWorkers_.erase(worker);
                workers_.erase(iter++);
            } else {
                iter++;
            }
        }
        nextCheckTime_ = now + CHECK_INTERVAL;
    }
}
230
// Grows or shrinks the worker pool toward targetNum, bounded by
// [MIN_THREADS, activeCpus - 1]. Also reaps timed-out workers that have
// become safe to release, and suspends the balance timer when idle long
// enough (MAX_RETRY_COUNT consecutive no-op rounds).
void TaskManager::CreateOrDeleteWorkers(uint32_t targetNum)
{
    // uv_timer_start should not run on the background frequently when there is no task
    if (targetNum == 0 && retryCount_ >= MAX_RETRY_COUNT) {
        uv_timer_stop(timer_);
        suspend_ = true;
        return;
    } else if (GetTimeoutWorkers() == 0 && targetNum == 0) {
        retryCount_++;
    } else {
        retryCount_ = 0;
    }

    uint32_t workerCount = GetThreadNum();
    const uint32_t maxThreads = std::max(ConcurrentHelper::GetActiveCpus() - 1, DEFAULT_THREADS);
    // Round targetNum up to odd, guaranteeing it is at least 1.
    targetNum |= 1;
    if (workerCount < maxThreads && workerCount < targetNum) {
        uint32_t step = std::min(maxThreads, targetNum) - workerCount;
        CreateWorkers(hostEnv_, step);
    } else if (workerCount > MIN_THREADS && workerCount > targetNum) {
        // Shrink by at most STEP_SIZE per round, releasing only workers that
        // have been idle beyond MAX_IDLE_TIME and hold no timers or env state.
        std::lock_guard<std::recursive_mutex> lock(workersMutex_);
        uint32_t maxNum = std::max(MIN_THREADS, targetNum);
        uint32_t step = std::min(workerCount - maxNum, STEP_SIZE);
        for (uint32_t i = 0; i < step; i++) {
            auto iter = std::find_if(idleWorkers_.begin(), idleWorkers_.end(), [this](Worker *worker) {
                auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
                return idleTime > MAX_IDLE_TIME && worker->runningCount_ == 0 &&
                    !Timer::HasTimer(worker->workerEnv_) && !HasTaskEnvInfo(worker->workerEnv_);
            });
            if (iter != idleWorkers_.end()) {
                // The worker frees itself in response to clearWorkerSignal_.
                workers_.erase(*iter);
                uv_async_send((*iter)->clearWorkerSignal_);
                idleWorkers_.erase(iter);
            }
        }
    }
    if (UNLIKELY(!timeoutWorkers_.empty())) {
        // Release BLOCKED workers once they are fully quiescent.
        for (auto iter = timeoutWorkers_.begin(); iter != timeoutWorkers_.end();) {
            auto worker = *iter;
            if (worker->runningCount_ == 0 && worker->state_ == WorkerState::BLOCKED &&
                !Timer::HasTimer(worker->workerEnv_) && !HasTaskEnvInfo(worker->workerEnv_)) {
                uv_async_send(worker->clearWorkerSignal_);
                timeoutWorkers_.erase(iter++);
            } else {
                iter++;
            }
        }
    }
}
280
// Timer callback: computes the desired worker count from queue depth and
// running workers, then asks CreateOrDeleteWorkers to converge toward it.
void TaskManager::TriggerLoadBalance(const uv_timer_t* req)
{
    // Now, we will call triggerLoadBalance when enqueue or by monitor,
    // and taking the time used to create worker threads into consideration,
    // so we should ensure the process is atomic.
    TaskManager& taskManager = TaskManager::GetInstance();

    HITRACE_HELPER_COUNT_TRACE("threadNum", static_cast<int64_t>(taskManager.GetThreadNum()));
    HITRACE_HELPER_COUNT_TRACE("runningThreadNum", static_cast<int64_t>(taskManager.GetRunningWorkers()));
    HITRACE_HELPER_COUNT_TRACE("idleThreadNum", static_cast<int64_t>(taskManager.GetIdleWorkers()));
    HITRACE_HELPER_COUNT_TRACE("timeoutThreadNum", static_cast<int64_t>(taskManager.GetTimeoutWorkers()));

    // Skip this round while workers are still being constructed.
    if (taskManager.expandingCount_ != 0) {
        return;
    }

    taskManager.CheckForBlockedWorkers();
    uint32_t targetNum = taskManager.ComputeSuitableThreadNum();
    if (targetNum != 0) {
        // We have tasks in the queue, and all workers may be running.
        // Therefore the target runnable threads should be the sum of running workers and the calculated result.
        targetNum = std::min(targetNum, taskManager.GetTaskNum());
        targetNum += taskManager.GetRunningWorkers();
    } else {
        // We have no task in the queue. Therefore we do not need extra threads.
        // But, tasks may still be executed in workers or microtask queue,
        // so we should return the num of running workers.
        targetNum = taskManager.GetRunningWorkers();
    }
    taskManager.CreateOrDeleteWorkers(targetNum);
}
312
// Async callback: restarts the load-balance timer after it was suspended by
// CreateOrDeleteWorkers (triggered from EnqueueExecuteId when suspend_ is set).
void TaskManager::RestartTimer(const uv_async_t* req)
{
    TaskManager& taskManager = TaskManager::GetInstance();
    uv_timer_again(taskManager.timer_);
}
318
// Entry point of the dedicated manager thread: sets up the load-balance timer
// (1s period) and the restart signal on the default loop, then runs the loop
// until uv_stop is called from the destructor.
void TaskManager::RunTaskManager()
{
    loop_ = uv_default_loop();
    timer_ = new uv_timer_t;
    uv_timer_init(loop_, timer_);
    notifyRestartTimer_ = new uv_async_t;
    uv_timer_start(timer_, reinterpret_cast<uv_timer_cb>(TaskManager::TriggerLoadBalance), 0, 1000); // 1000: 1s
    uv_async_init(loop_, notifyRestartTimer_, reinterpret_cast<uv_async_cb>(TaskManager::RestartTimer));
#if defined IOS_PLATFORM || defined MAC_PLATFORM
    pthread_setname_np("TaskMgrThread");
#else
    pthread_setname_np(pthread_self(), "TaskMgrThread");
#endif
    uv_run(loop_, UV_RUN_DEFAULT);
    uv_loop_close(loop_);
}
335
// Returns the next task id (monotonically increasing counter).
uint32_t TaskManager::GenerateTaskId()
{
    return currentTaskId_++;
}
340
// Returns the next execution id; each run of a task gets a fresh executeId.
uint32_t TaskManager::GenerateExecuteId()
{
    return currentExecuteId_++;
}
345
StoreTaskEnvInfo(napi_env env)346 void TaskManager::StoreTaskEnvInfo(napi_env env)
347 {
348 std::unique_lock<std::shared_mutex> lock(taskEnvInfoMutex_);
349 auto iter = taskEnvInfo_.find(env);
350 if (iter == taskEnvInfo_.end()) {
351 taskEnvInfo_.emplace(env, 1); // 1: default value
352 } else {
353 iter->second++;
354 }
355 }
356
PopTaskEnvInfo(napi_env env)357 void TaskManager::PopTaskEnvInfo(napi_env env)
358 {
359 std::unique_lock<std::shared_mutex> lock(taskEnvInfoMutex_);
360 auto iter = taskEnvInfo_.find(env);
361 if (iter == taskEnvInfo_.end()) {
362 return;
363 } else if (--iter->second == 0) {
364 taskEnvInfo_.erase(iter);
365 }
366 }
367
HasTaskEnvInfo(napi_env env)368 bool TaskManager::HasTaskEnvInfo(napi_env env)
369 {
370 std::shared_lock<std::shared_mutex> lock(taskEnvInfoMutex_);
371 return taskEnvInfo_.find(env) != taskEnvInfo_.end();
372 }
373
StoreTaskInfo(uint32_t executeId,TaskInfo * taskInfo)374 void TaskManager::StoreTaskInfo(uint32_t executeId, TaskInfo* taskInfo)
375 {
376 std::unique_lock<std::shared_mutex> lock(taskInfosMutex_);
377 taskInfos_.emplace(executeId, taskInfo);
378 }
379
StoreRunningInfo(uint32_t taskId,uint32_t executeId)380 void TaskManager::StoreRunningInfo(uint32_t taskId, uint32_t executeId)
381 {
382 std::unique_lock<std::shared_mutex> lock(runningInfosMutex_);
383 auto iter = runningInfos_.find(taskId);
384 if (iter == runningInfos_.end()) {
385 std::list<uint32_t> list {executeId};
386 runningInfos_.emplace(taskId, list);
387 } else {
388 iter->second.push_front(executeId);
389 }
390 }
391
PopTaskInfo(uint32_t executeId)392 TaskInfo* TaskManager::PopTaskInfo(uint32_t executeId)
393 {
394 std::unique_lock<std::shared_mutex> lock(taskInfosMutex_);
395 auto iter = taskInfos_.find(executeId);
396 if (iter == taskInfos_.end() || iter->second == nullptr) {
397 return nullptr;
398 }
399
400 TaskInfo* taskInfo = iter->second;
401 // remove the the taskInfo after call function
402 taskInfos_.erase(iter);
403 return taskInfo;
404 }
405
GetTaskInfo(uint32_t executeId)406 TaskInfo* TaskManager::GetTaskInfo(uint32_t executeId)
407 {
408 std::unique_lock<std::shared_mutex> lock(taskInfosMutex_);
409 auto iter = taskInfos_.find(executeId);
410 if (iter == taskInfos_.end() || iter->second == nullptr) {
411 return nullptr;
412 }
413 return iter->second;
414 }
415
// Marks the execution as canceled: flips the execute state to CANCELED and
// sets isCanceled on the stored TaskInfo. Returns false when either the
// TaskInfo or the execute state entry is missing.
bool TaskManager::MarkCanceledState(uint32_t executeId)
{
    std::unique_lock<std::shared_mutex> lock(taskInfosMutex_);
    auto iter = taskInfos_.find(executeId);
    if (iter == taskInfos_.end() || iter->second == nullptr) {
        return false;
    }
    // NOTE(review): UpdateExecuteState acquires executeStatesMutex_ while
    // taskInfosMutex_ is held here — verify all call paths take these two
    // locks in the same order to rule out deadlock.
    if (!UpdateExecuteState(executeId, ExecuteState::CANCELED)) {
        return false;
    }
    iter->second->isCanceled = true;
    return true;
}
429
PopRunningInfo(uint32_t taskId,uint32_t executeId)430 void TaskManager::PopRunningInfo(uint32_t taskId, uint32_t executeId)
431 {
432 std::unique_lock<std::shared_mutex> lock(runningInfosMutex_);
433 auto iter = runningInfos_.find(taskId);
434 if (iter == runningInfos_.end()) {
435 return;
436 }
437 iter->second.remove(executeId);
438 }
439
AddExecuteState(uint32_t executeId)440 void TaskManager::AddExecuteState(uint32_t executeId)
441 {
442 std::unique_lock<std::shared_mutex> lock(executeStatesMutex_);
443 executeStates_.emplace(executeId, ExecuteState::WAITING);
444 }
445
UpdateExecuteState(uint32_t executeId,ExecuteState state)446 bool TaskManager::UpdateExecuteState(uint32_t executeId, ExecuteState state)
447 {
448 std::unique_lock<std::shared_mutex> lock(executeStatesMutex_);
449 auto iter = executeStates_.find(executeId);
450 if (iter == executeStates_.end()) {
451 return false;
452 }
453 std::string traceInfo = "UpdateExecuteState: executeId : " + std::to_string(executeId) +
454 ", executeState : " + std::to_string(state);
455 HITRACE_HELPER_METER_NAME(traceInfo);
456 iter->second = state;
457 return true;
458 }
459
// Forgets the state entry for a finished or canceled execution.
void TaskManager::RemoveExecuteState(uint32_t executeId)
{
    std::unique_lock<std::shared_mutex> lock(executeStatesMutex_);
    executeStates_.erase(executeId);
}
465
QueryExecuteState(uint32_t executeId)466 ExecuteState TaskManager::QueryExecuteState(uint32_t executeId)
467 {
468 std::shared_lock<std::shared_mutex> lock(executeStatesMutex_);
469 auto iter = executeStates_.find(executeId);
470 if (iter == executeStates_.end()) {
471 HILOG_DEBUG("taskpool:: Can not find the target task");
472 return ExecuteState::NOT_FOUND;
473 }
474 return iter->second;
475 }
476
CancelTask(napi_env env,uint32_t taskId)477 void TaskManager::CancelTask(napi_env env, uint32_t taskId)
478 {
479 // Cannot find task by taskId, throw error
480 std::unique_lock<std::shared_mutex> lock(runningInfosMutex_);
481 auto iter = runningInfos_.find(taskId);
482 if (iter == runningInfos_.end() || iter->second.empty()) {
483 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK);
484 HILOG_ERROR("taskpool:: query nonexist task");
485 return;
486 }
487 for (uint32_t executeId : iter->second) {
488 CancelExecution(env, executeId);
489 }
490 }
491
// Cancels a single execution according to its current state:
// NOT_FOUND -> throw; RUNNING -> mark canceled (worker notices later);
// WAITING -> reject the promise and release resources; CANCELED -> no-op.
void TaskManager::CancelExecution(napi_env env, uint32_t executeId)
{
    // 1. Cannot find taskInfo by executeId, throw error
    // 2. Find executing taskInfo, skip it
    // 3. Find waiting taskInfo, cancel it
    // 4. Find canceled taskInfo, skip it
    ExecuteState state = QueryExecuteState(executeId);
    TaskInfo* taskInfo = nullptr;
    switch (state) {
        case ExecuteState::NOT_FOUND:
            ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK);
            HILOG_ERROR("taskpool:: cancel nonexist task");
            return;
        case ExecuteState::RUNNING:
            if (!MarkCanceledState(executeId)) {
                ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK,
                                        "taskpool:: fail to mark cancel state");
                return;
            }
            break;
        case ExecuteState::WAITING:
            HILOG_DEBUG("taskpool:: Cancel waiting task");
            // Still queued: pop it, reject its promise, and free its content.
            taskInfo = PopTaskInfo(executeId);
            if (taskInfo != nullptr) {
                napi_value undefined = NapiHelper::GetUndefinedValue(taskInfo->env);
                napi_reject_deferred(taskInfo->env, taskInfo->deferred, undefined);
                ReleaseTaskContent(taskInfo);
            }
            RemoveExecuteState(executeId);
            break;
        default: // Default is CANCELED, means task isCanceled, do not need to mark again.
            break;
    }
}
526
// Serializes the task function and arguments (optionally with a transfer
// list), wraps them in a heap-allocated TaskInfo registered under executeId,
// and wires an async handle through which the worker reports the result.
// Returns nullptr (with a pending JS exception) on serialization failure.
TaskInfo* TaskManager::GenerateTaskInfo(napi_env env, napi_value func, napi_value args,
                                        uint32_t taskId, uint32_t executeId, napi_value transferList)
{
    napi_value undefined = NapiHelper::GetUndefinedValue(env);
    napi_value serializationFunction;
    napi_status status = napi_serialize(env, func, undefined, &serializationFunction);
    if (status != napi_ok || serializationFunction == nullptr) {
        ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, "taskpool: failed to serialize function.");
        return nullptr;
    }
    napi_value serializationArguments;
    if (transferList == nullptr) {
        status = napi_serialize(env, args, undefined, &serializationArguments);
    } else {
        status = napi_serialize(env, args, transferList, &serializationArguments);
    }
    if (status != napi_ok || serializationArguments == nullptr) {
        ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, "taskpool: failed to serialize arguments.");
        return nullptr;
    }
    TaskInfo* taskInfo = new TaskInfo();
    taskInfo->env = env;
    taskInfo->executeId = executeId;
    taskInfo->serializationFunction = serializationFunction;
    taskInfo->serializationArguments = serializationArguments;
    taskInfo->taskId = taskId;
    // onResultSignal runs HandleTaskResult on the host loop; it is closed and
    // freed in ReleaseTaskContent.
    taskInfo->onResultSignal = new uv_async_t;
    uv_loop_t* loop = NapiHelper::GetLibUV(env);
    uv_async_init(loop, taskInfo->onResultSignal, reinterpret_cast<uv_async_cb>(TaskPool::HandleTaskResult));
    taskInfo->onResultSignal->data = taskInfo;

    StoreTaskInfo(executeId, taskInfo);
    StoreTaskEnvInfo(env);
    return taskInfo;
}
562
GenerateTaskInfoFromTask(napi_env env,napi_value task,uint32_t executeId)563 TaskInfo* TaskManager::GenerateTaskInfoFromTask(napi_env env, napi_value task, uint32_t executeId)
564 {
565 napi_value function = NapiHelper::GetNameProperty(env, task, FUNCTION_STR);
566 napi_value arguments = NapiHelper::GetNameProperty(env, task, ARGUMENTS_STR);
567 napi_value taskId = NapiHelper::GetNameProperty(env, task, TASKID_STR);
568 if (function == nullptr || arguments == nullptr || taskId == nullptr) {
569 ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "taskpool:: task value is error");
570 return nullptr;
571 }
572 napi_value transferList = NapiHelper::GetNameProperty(env, task, TRANSFERLIST_STR);
573 uint32_t id = NapiHelper::GetUint32Value(env, taskId);
574 TaskInfo* taskInfo = GenerateTaskInfo(env, function, arguments, id, executeId, transferList);
575 return taskInfo;
576 }
577
// Releases a TaskInfo: drops its env refcount, closes the result signal
// (the uv_async_t is freed in the close callback), then deletes the info.
void TaskManager::ReleaseTaskContent(TaskInfo* taskInfo)
{
    PopTaskEnvInfo(taskInfo->env);
    // Only close once; uv_is_closing guards against double close.
    if (taskInfo->onResultSignal != nullptr &&
        !uv_is_closing(reinterpret_cast<uv_handle_t*>(taskInfo->onResultSignal))) {
        uv_close(reinterpret_cast<uv_handle_t*>(taskInfo->onResultSignal), [](uv_handle_t* handle) {
            if (handle != nullptr) {
                delete reinterpret_cast<uv_async_t*>(handle);
                handle = nullptr;
            }
        });
    }
    delete taskInfo;
    taskInfo = nullptr;
}
593
NotifyWorkerIdle(Worker * worker)594 void TaskManager::NotifyWorkerIdle(Worker* worker)
595 {
596 {
597 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
598 idleWorkers_.insert(worker);
599 }
600 if (GetTaskNum() != 0) {
601 NotifyExecuteTask();
602 }
603 }
604
// Called once a new worker is ready: registers it as idle and decrements the
// in-flight expansion counter set by CreateWorkers.
void TaskManager::NotifyWorkerCreated(Worker* worker)
{
    NotifyWorkerIdle(worker);
    expandingCount_--;
}
610
// Adds a newly constructed worker to the pool's worker set.
void TaskManager::NotifyWorkerAdded(Worker* worker)
{
    std::lock_guard<std::recursive_mutex> lock(workersMutex_);
    workers_.insert(worker);
}
616
// Runs a load-balance pass only when no worker is idle (i.e. more threads may
// be needed). NOTE(review): TriggerLoadBalance is called with no argument here
// while its definition takes a uv_timer_t* — presumably the declaration
// provides a default argument; confirm against task_manager.h.
void TaskManager::TryTriggerLoadBalance()
{
    std::lock_guard<std::recursive_mutex> lock(workersMutex_);
    if (idleWorkers_.empty()) {
        TriggerLoadBalance();
    }
}
624
GetIdleWorkers()625 uint32_t TaskManager::GetIdleWorkers()
626 {
627 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
628 return idleWorkers_.size();
629 }
630
GetRunningWorkers()631 uint32_t TaskManager::GetRunningWorkers()
632 {
633 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
634 return std::count_if(workers_.begin(), workers_.end(), [](const auto& worker) {
635 return worker->runningCount_ != 0 || Timer::HasTimer(worker->workerEnv_);
636 });
637 }
638
GetTimeoutWorkers()639 uint32_t TaskManager::GetTimeoutWorkers()
640 {
641 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
642 return timeoutWorkers_.size();
643 }
644
GetTaskNum()645 uint32_t TaskManager::GetTaskNum()
646 {
647 std::lock_guard<std::mutex> lock(taskQueuesMutex_);
648 return taskQueues_[Priority::HIGH]->GetTaskNum() + taskQueues_[Priority::MEDIUM]->GetTaskNum() +
649 taskQueues_[Priority::LOW]->GetTaskNum();
650 }
651
GetThreadNum()652 uint32_t TaskManager::GetThreadNum()
653 {
654 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
655 return workers_.size();
656 }
657
// Queues an execution at the given priority and wakes an idle worker. Also
// resets the idle-shutdown counter and restarts the balance timer if it was
// suspended by CreateOrDeleteWorkers.
void TaskManager::EnqueueExecuteId(uint32_t executeId, Priority priority)
{
    // once enqueued, reset the counter to make threads released at given time
    // if timer is stopped and then new tasks enqueue, restart it
    retryCount_ = 0;
    if (suspend_) {
        suspend_ = false;
        uv_async_send(notifyRestartTimer_);
    }

    {
        std::lock_guard<std::mutex> lock(taskQueuesMutex_);
        taskQueues_[priority]->EnqueueExecuteId(executeId);
    }
    NotifyExecuteTask();
}
674
// Pops the next execution using weighted round-robin: up to
// HIGH_PRIORITY_TASK_COUNT high-priority dequeues, then up to
// MEDIUM_PRIORITY_TASK_COUNT medium ones, then one low-priority dequeue,
// which prevents starvation of the lower queues.
std::pair<uint32_t, Priority> TaskManager::DequeueExecuteId()
{
    std::lock_guard<std::mutex> lock(taskQueuesMutex_);
    if (highPrioExecuteCount_ < HIGH_PRIORITY_TASK_COUNT) {
        auto& highTaskQueue = taskQueues_[Priority::HIGH];
        highPrioExecuteCount_++;
        return std::make_pair(highTaskQueue->DequeueExecuteId(), Priority::HIGH);
    }
    highPrioExecuteCount_ = 0;

    if (mediumPrioExecuteCount_ < MEDIUM_PRIORITY_TASK_COUNT) {
        auto& mediumTaskQueue = taskQueues_[Priority::MEDIUM];
        mediumPrioExecuteCount_++;
        return std::make_pair(mediumTaskQueue->DequeueExecuteId(), Priority::MEDIUM);
    }
    mediumPrioExecuteCount_ = 0;

    auto& lowTaskQueue = taskQueues_[Priority::LOW];
    return std::make_pair(lowTaskQueue->DequeueExecuteId(), Priority::LOW);
}
695
NotifyExecuteTask()696 void TaskManager::NotifyExecuteTask()
697 {
698 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
699 if (idleWorkers_.empty()) {
700 return;
701 }
702 auto candidator = idleWorkers_.begin();
703 Worker* worker = *candidator;
704 idleWorkers_.erase(candidator);
705 worker->NotifyExecuteTask();
706 }
707
// One-time initialization: resolves the main-thread host env, creates the
// reserved worker, and starts the detached manager thread that runs the
// load-balance loop. Subsequent calls are no-ops (guarded by isInitialized_).
void TaskManager::InitTaskManager(napi_env env)
{
    HITRACE_HELPER_METER_NAME("InitTaskManager");
    // Walk up to the top-level (main-thread) engine for the host environment.
    auto hostEngine = reinterpret_cast<NativeEngine*>(env);
    while (hostEngine != nullptr && !hostEngine->IsMainThread()) {
        hostEngine = hostEngine->GetHostEngine();
    }
    if (!isInitialized_.exchange(true, std::memory_order_relaxed)) {
        hostEnv_ = reinterpret_cast<napi_env>(hostEngine);
        // Add a reserved thread for taskpool
        CreateWorkers(hostEnv_);
        // Create a timer to manage worker threads
        std::thread workerManager(&TaskManager::RunTaskManager, this);
        workerManager.detach();
    }
}
724
CreateWorkers(napi_env env,uint32_t num)725 void TaskManager::CreateWorkers(napi_env env, uint32_t num)
726 {
727 for (uint32_t i = 0; i < num; i++) {
728 expandingCount_++;
729 auto worker = Worker::WorkerConstructor(env);
730 NotifyWorkerAdded(worker);
731 }
732 }
733
// Drops a worker from both the idle set and the pool (called when a worker
// shuts down).
void TaskManager::RemoveWorker(Worker* worker)
{
    std::lock_guard<std::recursive_mutex> lock(workersMutex_);
    idleWorkers_.erase(worker);
    workers_.erase(worker);
}
740
IsCanceled(napi_env env,napi_callback_info cbinfo)741 napi_value TaskManager::IsCanceled(napi_env env, napi_callback_info cbinfo)
742 {
743 bool isCanceled = false;
744 auto engine = reinterpret_cast<NativeEngine*>(env);
745 if (!engine->IsTaskPoolThread()) {
746 HILOG_ERROR("taskpool:: call isCanceled not in taskpool thread");
747 return NapiHelper::CreateBooleanValue(env, isCanceled);
748 }
749 // Get taskInfo and query task cancel state
750 void* data = engine->GetCurrentTaskInfo();
751 if (data == nullptr) {
752 HILOG_ERROR("taskpool:: call isCanceled not in Concurrent function");
753 } else {
754 TaskInfo* taskInfo = static_cast<TaskInfo*>(data);
755 isCanceled = taskInfo->isCanceled;
756 }
757 return NapiHelper::CreateBooleanValue(env, isCanceled);
758 }
759
760 // ----------------------------------- TaskGroupManager ----------------------------------------
GetInstance()761 TaskGroupManager &TaskGroupManager::GetInstance()
762 {
763 static TaskGroupManager groupManager;
764 return groupManager;
765 }
766
// Returns the next group id (monotonically increasing counter).
uint32_t TaskGroupManager::GenerateGroupId()
{
    return groupId_++;
}
771
// Allocates a GroupInfo for one execution of a task group: pre-creates the
// JS result array (held via a strong reference) and registers the execution
// in the bookkeeping maps. Ownership is released later by ClearGroupInfo.
GroupInfo* TaskGroupManager::GenerateGroupInfo(napi_env env, uint32_t taskNum, uint32_t groupId,
                                               uint32_t groupExecuteId)
{
    GroupInfo* groupInfo = new GroupInfo();
    groupInfo->taskNum = taskNum;
    groupInfo->groupId = groupId;
    napi_value resArr;
    napi_create_array_with_length(env, taskNum, &resArr);
    // Strong ref (count 1) keeps the result array alive across task callbacks.
    napi_ref arrRef = NapiHelper::CreateReference(env, resArr, 1);
    groupInfo->resArr = arrRef;
    StoreExecuteId(groupId, groupExecuteId);
    StoreRunningExecuteId(groupExecuteId);
    AddGroupInfoById(groupExecuteId, groupInfo);
    return groupInfo;
}
787
ClearGroupInfo(napi_env env,uint32_t groupExecuteId,GroupInfo * groupInfo)788 void TaskGroupManager::ClearGroupInfo(napi_env env, uint32_t groupExecuteId, GroupInfo* groupInfo)
789 {
790 RemoveRunningExecuteId(groupExecuteId);
791 RemoveGroupInfoById(groupExecuteId);
792 napi_delete_reference(env, groupInfo->resArr);
793 delete groupInfo;
794 groupInfo = nullptr;
795 }
796
AddTask(uint32_t groupId,napi_ref task)797 void TaskGroupManager::AddTask(uint32_t groupId, napi_ref task)
798 {
799 std::unique_lock<std::shared_mutex> lock(tasksMutex_);
800 auto iter = tasks_.find(groupId);
801 if (iter == tasks_.end()) {
802 std::list<napi_ref> list {task};
803 tasks_.emplace(groupId, list);
804 } else {
805 iter->second.push_back(task);
806 }
807 }
808
GetTasksByGroup(uint32_t groupId)809 const std::list<napi_ref>& TaskGroupManager::GetTasksByGroup(uint32_t groupId)
810 {
811 std::shared_lock<std::shared_mutex> lock(tasksMutex_);
812 auto iter = tasks_.find(groupId);
813 if (iter == tasks_.end()) {
814 static const std::list<napi_ref> EMPTY_TASK_LIST {};
815 return EMPTY_TASK_LIST;
816 }
817 return iter->second;
818 }
819
ClearTasks(napi_env env,uint32_t groupId)820 void TaskGroupManager::ClearTasks(napi_env env, uint32_t groupId)
821 {
822 std::unique_lock<std::shared_mutex> lock(tasksMutex_);
823 auto iter = tasks_.find(groupId);
824 if (iter == tasks_.end()) {
825 return;
826 }
827 for (napi_ref task : iter->second) {
828 napi_delete_reference(env, task);
829 }
830 tasks_.erase(iter);
831 }
832
StoreExecuteId(uint32_t groupId,uint32_t groupExecuteId)833 void TaskGroupManager::StoreExecuteId(uint32_t groupId, uint32_t groupExecuteId)
834 {
835 std::lock_guard<std::mutex> lock(groupExecuteIdsMutex_);
836 auto iter = groupExecuteIds_.find(groupId);
837 if (iter == groupExecuteIds_.end()) {
838 std::list<uint32_t> list {groupExecuteId};
839 groupExecuteIds_.emplace(groupId, std::move(list));
840 } else {
841 iter->second.push_back(groupExecuteId);
842 }
843 }
844
RemoveExecuteId(uint32_t groupId,uint32_t groupExecuteId)845 void TaskGroupManager::RemoveExecuteId(uint32_t groupId, uint32_t groupExecuteId)
846 {
847 std::lock_guard<std::mutex> lock(groupExecuteIdsMutex_);
848 auto iter = groupExecuteIds_.find(groupId);
849 if (iter != groupExecuteIds_.end()) {
850 iter->second.remove(groupExecuteId);
851 }
852 if (iter->second.empty()) {
853 groupExecuteIds_.erase(iter);
854 }
855 }
856
// Drops all recorded execution ids for a group.
void TaskGroupManager::ClearExecuteId(uint32_t groupId)
{
    std::lock_guard<std::mutex> lock(groupExecuteIdsMutex_);
    groupExecuteIds_.erase(groupId);
}
862
// Cancels every running execution of a task group: cancels each member
// execution, rejects the group's promise, and releases its GroupInfo.
// Throws a JS error when the group has no recorded executions.
void TaskGroupManager::CancelGroup(napi_env env, uint32_t groupId)
{
    std::lock_guard<std::mutex> lock(groupExecuteIdsMutex_);
    auto iter = groupExecuteIds_.find(groupId);
    if (iter == groupExecuteIds_.end() || iter->second.empty()) {
        ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK_GROUP);
        HILOG_ERROR("taskpool:: cancel nonexist task group");
        return;
    }
    for (uint32_t groupExecuteId : iter->second) {
        // Already-finished executions have been removed from the running set.
        bool isRunning = IsRunning(groupExecuteId);
        if (!isRunning) {
            continue;
        }
        GroupInfo* info = GetGroupInfoByExecutionId(groupExecuteId);
        if (info == nullptr) {
            continue;
        }
        // Cancel each member task of this group execution.
        const std::list<uint32_t>& executeList = info->executeIds;
        for (uint32_t executeId : executeList) {
            CancelGroupExecution(executeId);
        }
        napi_value undefined = NapiHelper::GetUndefinedValue(env);
        napi_reject_deferred(env, info->deferred, undefined);
        TaskGroupManager::GetInstance().ClearGroupInfo(env, groupExecuteId, info);
    }
}
890
// Returns the next group-execution id (monotonically increasing counter).
uint32_t TaskGroupManager::GenerateGroupExecuteId()
{
    return groupExecuteId_++;
}
895
StoreRunningExecuteId(uint32_t groupExecuteId)896 void TaskGroupManager::StoreRunningExecuteId(uint32_t groupExecuteId)
897 {
898 std::unique_lock<std::shared_mutex> lock(groupExecutionsMutex_);
899 runningGroupExecutions_.insert(groupExecuteId);
900 }
901
// Marks a group execution as no longer running.
void TaskGroupManager::RemoveRunningExecuteId(uint32_t groupExecuteId)
{
    std::unique_lock<std::shared_mutex> lock(groupExecutionsMutex_);
    runningGroupExecutions_.erase(groupExecuteId);
}
907
IsRunning(uint32_t groupExecuteId)908 bool TaskGroupManager::IsRunning(uint32_t groupExecuteId)
909 {
910 std::shared_lock<std::shared_mutex> lock(groupExecutionsMutex_);
911 bool isRunning = runningGroupExecutions_.find(groupExecuteId) != runningGroupExecutions_.end();
912 return isRunning;
913 }
914
AddGroupInfoById(uint32_t groupExecuteId,GroupInfo * info)915 void TaskGroupManager::AddGroupInfoById(uint32_t groupExecuteId, GroupInfo* info)
916 {
917 std::unique_lock<std::shared_mutex> lock(groupInfoMapMutex_);
918 groupInfoMap_.emplace(groupExecuteId, info);
919 }
920
// Unregisters the GroupInfo entry for a group execution (does not free it;
// see ClearGroupInfo for the owning cleanup path).
void TaskGroupManager::RemoveGroupInfoById(uint32_t groupExecuteId)
{
    std::unique_lock<std::shared_mutex> lock(groupInfoMapMutex_);
    groupInfoMap_.erase(groupExecuteId);
}
926
GetGroupInfoByExecutionId(uint32_t groupExecuteId)927 GroupInfo* TaskGroupManager::GetGroupInfoByExecutionId(uint32_t groupExecuteId)
928 {
929 std::shared_lock<std::shared_mutex> lock(groupInfoMapMutex_);
930 auto iter = groupInfoMap_.find(groupExecuteId);
931 if (iter != groupInfoMap_.end()) {
932 return iter->second;
933 }
934 return nullptr;
935 }
936
// Cancels one member execution of a group, mirroring
// TaskManager::CancelExecution but without throwing JS errors:
// RUNNING -> mark canceled; WAITING -> drop its TaskInfo and state;
// NOT_FOUND / CANCELED -> no-op.
void TaskGroupManager::CancelGroupExecution(uint32_t executeId)
{
    ExecuteState state = TaskManager::GetInstance().QueryExecuteState(executeId);
    TaskInfo* taskInfo = nullptr;
    switch (state) {
        case ExecuteState::NOT_FOUND:
            break;
        case ExecuteState::RUNNING:
            TaskManager::GetInstance().MarkCanceledState(executeId);
            break;
        case ExecuteState::WAITING:
            HILOG_DEBUG("taskpool:: Cancel waiting task in group");
            taskInfo = TaskManager::GetInstance().PopTaskInfo(executeId);
            if (taskInfo == nullptr) {
                HILOG_ERROR("taskpool:: taskInfo is nullptr when cancel waiting execution");
                return;
            }
            TaskManager::GetInstance().RemoveExecuteState(executeId);
            TaskManager::GetInstance().ReleaseTaskContent(taskInfo);
            break;
        default: // Default is CANCELED, means task isCanceled, do not need to mark again.
            break;
    }
}
961 } // namespace Commonlibrary::Concurrent::TaskPoolModule
962