• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "task_manager.h"
17 
18 #include <thread>
19 
20 #include "commonlibrary/ets_utils/js_sys_module/timer/timer.h"
21 #include "helper/concurrent_helper.h"
22 #include "helper/error_helper.h"
23 #include "helper/hitrace_helper.h"
24 #include "taskpool.h"
25 #include "utils/log.h"
26 #include "worker.h"
27 
28 namespace Commonlibrary::Concurrent::TaskPoolModule {
29 using namespace OHOS::JsSysModule;
30 
31 static constexpr int8_t HIGH_PRIORITY_TASK_COUNT = 5;
32 static constexpr int8_t MEDIUM_PRIORITY_TASK_COUNT = 5;
33 static constexpr int32_t MAX_TASK_DURATION = 100; // 100: 100ms
34 static constexpr int32_t MAX_IDLE_TIME = 30000; // 30000: 30s
35 static constexpr int32_t MAX_RETRY_COUNT = 40; // 40: counter for stopping timer
36 static constexpr uint32_t STEP_SIZE = 2;
37 static constexpr uint32_t DEFAULT_THREADS = 3;
38 static constexpr uint32_t MIN_THREADS = 1; // 1: minimum thread num when idle
39 static constexpr uint32_t CHECK_INTERVAL = 60000; // 60000: 1min
40 static constexpr uint32_t MIN_TIMEOUT_TIME = 180000; // 180000: 3min
41 static constexpr uint32_t MAX_TIMEOUT_TIME = 600000; // 6000000: 10min
42 
43 // ----------------------------------- TaskManager ----------------------------------------
GetInstance()44 TaskManager& TaskManager::GetInstance()
45 {
46     static TaskManager manager;
47     return manager;
48 }
49 
TaskManager()50 TaskManager::TaskManager()
51 {
52     for (size_t i = 0; i < taskQueues_.size(); i++) {
53         std::unique_ptr<ExecuteQueue> taskQueue = std::make_unique<ExecuteQueue>();
54         taskQueues_[i] = std::move(taskQueue);
55     }
56 }
57 
~TaskManager()58 TaskManager::~TaskManager()
59 {
60     if (timer_ == nullptr) {
61         HILOG_ERROR("taskpool:: timer_ is nullptr");
62     } else {
63         uv_timer_stop(timer_);
64         uv_close(reinterpret_cast<uv_handle_t*>(timer_), [](uv_handle_t* handle) {
65             if (handle != nullptr) {
66                 delete reinterpret_cast<uv_timer_t*>(handle);
67                 handle = nullptr;
68             }
69         });
70 
71         uv_close(reinterpret_cast<uv_handle_t*>(notifyRestartTimer_), [](uv_handle_t* handle) {
72             if (handle != nullptr) {
73                 delete reinterpret_cast<uv_async_t*>(handle);
74                 handle = nullptr;
75             }
76         });
77     }
78 
79     if (loop_ != nullptr) {
80         uv_stop(loop_);
81     }
82 
83     {
84         std::lock_guard<std::recursive_mutex> lock(workersMutex_);
85         for (auto& worker : workers_) {
86             delete worker;
87         }
88         workers_.clear();
89     }
90 
91     {
92         std::unique_lock<std::shared_mutex> lock(taskInfosMutex_);
93         for (auto& [_, taskInfo] : taskInfos_) {
94             delete taskInfo;
95         }
96         taskInfos_.clear();
97     }
98 }
99 
GetThreadInfos(napi_env env)100 napi_value TaskManager::GetThreadInfos(napi_env env)
101 {
102     napi_value threadInfos = nullptr;
103     napi_create_array(env, &threadInfos);
104     {
105         std::lock_guard<std::recursive_mutex> lock(workersMutex_);
106         int32_t i = 0;
107         for (auto& worker : workers_) {
108             if (worker->workerEnv_ == nullptr) {
109                 continue;
110             }
111             napi_value tid = nullptr;
112             napi_value priority = nullptr;
113             napi_create_int32(env, static_cast<int32_t>(worker->tid_), &tid);
114             napi_create_int32(env, static_cast<int32_t>(worker->priority_), &priority);
115 
116             napi_value taskId = nullptr;
117             napi_create_array(env, &taskId);
118             int32_t j = 0;
119             {
120                 std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_);
121                 for (auto& currentId : worker->currentTaskId_) {
122                     napi_value id = nullptr;
123                     napi_create_uint32(env, currentId, &id);
124                     napi_set_element(env, taskId, j, id);
125                     j++;
126                 }
127             }
128             napi_value threadInfo = nullptr;
129             napi_create_object(env, &threadInfo);
130             napi_set_named_property(env, threadInfo, "tid", tid);
131             napi_set_named_property(env, threadInfo, "priority", priority);
132             napi_set_named_property(env, threadInfo, "taskIds", taskId);
133 
134             napi_set_element(env, threadInfos, i, threadInfo);
135             i++;
136         }
137     }
138     return threadInfos;
139 }
140 
GetTaskInfos(napi_env env)141 napi_value TaskManager::GetTaskInfos(napi_env env)
142 {
143     napi_value taskInfos = nullptr;
144     napi_create_array(env, &taskInfos);
145     {
146         std::unique_lock<std::shared_mutex> lock(taskInfosMutex_);
147         int32_t i = 0;
148         for (auto& [_, taskInfo] : taskInfos_) {
149             napi_value taskInfoValue = nullptr;
150             napi_create_object(env, &taskInfoValue);
151             napi_value taskId = nullptr;
152             napi_create_uint32(env, taskInfo->taskId, &taskId);
153             napi_value stateValue = nullptr;
154             ExecuteState state;
155             uint64_t duration = 0;
156             if (taskInfo->isCanceled) {
157                 state = ExecuteState::CANCELED;
158             } else if (taskInfo->worker != nullptr) {
159                 Worker* worker = reinterpret_cast<Worker*>(taskInfo->worker);
160                 duration = ConcurrentHelper::GetMilliseconds() - worker->startTime_;
161                 state = ExecuteState::RUNNING;
162             } else {
163                 state = ExecuteState::WAITING;
164             }
165             napi_create_int32(env, state, &stateValue);
166             napi_set_named_property(env, taskInfoValue, "taskId", taskId);
167             napi_set_named_property(env, taskInfoValue, "state", stateValue);
168             napi_value durationValue = nullptr;
169             napi_create_uint32(env, static_cast<uint32_t>(duration), &durationValue);
170             napi_set_named_property(env, taskInfoValue, "duration", durationValue);
171             napi_set_element(env, taskInfos, i, taskInfoValue);
172             i++;
173         }
174     }
175     return taskInfos;
176 }
177 
UpdateExecutedInfo(uint64_t duration)178 void TaskManager::UpdateExecutedInfo(uint64_t duration)
179 {
180     totalExecTime_ += duration;
181     totalExecCount_++;
182 }
183 
ComputeSuitableThreadNum()184 uint32_t TaskManager::ComputeSuitableThreadNum()
185 {
186     if (GetTaskNum() != 0 && totalExecCount_ == 0) {
187         // this branch is used for avoiding time-consuming works that may block the taskpool
188         return STEP_SIZE;
189     } else if (totalExecCount_ == 0) {
190         return 0; // no task since created
191     }
192 
193     auto durationPerTask = static_cast<double>(totalExecTime_) / totalExecCount_;
194     return std::ceil(durationPerTask * GetTaskNum() / MAX_TASK_DURATION);
195 }
196 
CheckForBlockedWorkers()197 void TaskManager::CheckForBlockedWorkers()
198 {
199     // monitor the running state
200     uint64_t now = ConcurrentHelper::GetMilliseconds();
201     if (UNLIKELY(nextCheckTime_ < now)) {
202         // the threshold will be dynamically modified to provide more flexibility in detecting exceptions
203         // if the thread num has reached the limit and the idle worker is not available, a short time will be used,
204         // else we will choose the longer one
205         std::lock_guard<std::recursive_mutex> lock(workersMutex_);
206         bool state = GetThreadNum() == ConcurrentHelper::GetActiveCpus() - 1 && GetIdleWorkers() == 0;
207         uint64_t threshold = state ? MIN_TIMEOUT_TIME : MAX_TIMEOUT_TIME;
208         for (auto iter = workers_.begin(); iter != workers_.end();) {
209             auto worker = *iter;
210             std::lock_guard<std::mutex> stateLock(worker->stateMutex_);
211             // if the worker thread is idle, just skip it
212             if (worker->state_ == WorkerState::IDLE) {
213                 iter++;
214                 continue;
215             }
216 
217             if (now - worker->startTime_ >= threshold) {
218                 HILOG_DEBUG("taskpool:: The worker is marked for timeout.");
219                 worker->state_ = WorkerState::BLOCKED;
220                 timeoutWorkers_.insert(worker);
221                 idleWorkers_.erase(worker);
222                 workers_.erase(iter++);
223             } else {
224                 iter++;
225             }
226         }
227         nextCheckTime_ = now + CHECK_INTERVAL;
228     }
229 }
230 
CreateOrDeleteWorkers(uint32_t targetNum)231 void TaskManager::CreateOrDeleteWorkers(uint32_t targetNum)
232 {
233     // uv_timer_start should not run on the background frequently when there is no task
234     if (targetNum == 0 && retryCount_ >= MAX_RETRY_COUNT) {
235         uv_timer_stop(timer_);
236         suspend_ = true;
237         return;
238     } else if (GetTimeoutWorkers() == 0 && targetNum == 0) {
239         retryCount_++;
240     } else {
241         retryCount_ = 0;
242     }
243 
244     uint32_t workerCount = GetThreadNum();
245     const uint32_t maxThreads = std::max(ConcurrentHelper::GetActiveCpus() - 1, DEFAULT_THREADS);
246     targetNum |= 1;
247     if (workerCount < maxThreads && workerCount < targetNum) {
248         uint32_t step = std::min(maxThreads, targetNum) - workerCount;
249         CreateWorkers(hostEnv_, step);
250     } else if (workerCount > MIN_THREADS && workerCount > targetNum) {
251         std::lock_guard<std::recursive_mutex> lock(workersMutex_);
252         uint32_t maxNum = std::max(MIN_THREADS, targetNum);
253         uint32_t step = std::min(workerCount - maxNum, STEP_SIZE);
254         for (uint32_t i = 0; i < step; i++) {
255             auto iter = std::find_if(idleWorkers_.begin(), idleWorkers_.end(), [this](Worker *worker) {
256                 auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
257                 return idleTime > MAX_IDLE_TIME && worker->runningCount_ == 0 &&
258                     !Timer::HasTimer(worker->workerEnv_) && !HasTaskEnvInfo(worker->workerEnv_);
259             });
260             if (iter != idleWorkers_.end()) {
261                 workers_.erase(*iter);
262                 uv_async_send((*iter)->clearWorkerSignal_);
263                 idleWorkers_.erase(iter);
264             }
265         }
266     }
267     if (UNLIKELY(!timeoutWorkers_.empty())) {
268         for (auto iter = timeoutWorkers_.begin(); iter != timeoutWorkers_.end();) {
269             auto worker = *iter;
270             if (worker->runningCount_ == 0 && worker->state_ == WorkerState::BLOCKED &&
271                 !Timer::HasTimer(worker->workerEnv_) && !HasTaskEnvInfo(worker->workerEnv_)) {
272                 uv_async_send(worker->clearWorkerSignal_);
273                 timeoutWorkers_.erase(iter++);
274             } else {
275                 iter++;
276             }
277         }
278     }
279 }
280 
TriggerLoadBalance(const uv_timer_t * req)281 void TaskManager::TriggerLoadBalance(const uv_timer_t* req)
282 {
283     // Now, we will call triggerLoadBalance when enqueue or by monitor,
284     // and taking the time used to create worker threads into consideration,
285     // so we should ensure the the process is atomic.
286     TaskManager& taskManager = TaskManager::GetInstance();
287 
288     HITRACE_HELPER_COUNT_TRACE("threadNum", static_cast<int64_t>(taskManager.GetThreadNum()));
289     HITRACE_HELPER_COUNT_TRACE("runningThreadNum", static_cast<int64_t>(taskManager.GetRunningWorkers()));
290     HITRACE_HELPER_COUNT_TRACE("idleThreadNum", static_cast<int64_t>(taskManager.GetIdleWorkers()));
291     HITRACE_HELPER_COUNT_TRACE("timeoutThreadNum", static_cast<int64_t>(taskManager.GetTimeoutWorkers()));
292 
293     if (taskManager.expandingCount_ != 0) {
294         return;
295     }
296 
297     taskManager.CheckForBlockedWorkers();
298     uint32_t targetNum = taskManager.ComputeSuitableThreadNum();
299     if (targetNum != 0) {
300         // We have tasks in the queue, and all workers may be running.
301         // Therefore the target runnable threads should be the sum of runnig workers and the calculated result.
302         targetNum = std::min(targetNum, taskManager.GetTaskNum());
303         targetNum += taskManager.GetRunningWorkers();
304     } else {
305         // We have no task in the queue. Therefore we do not need extra threads.
306         // But, tasks may still be executed in workers or microtask queue,
307         // so we should return the num of running workers.
308         targetNum = taskManager.GetRunningWorkers();
309     }
310     taskManager.CreateOrDeleteWorkers(targetNum);
311 }
312 
RestartTimer(const uv_async_t * req)313 void TaskManager::RestartTimer(const uv_async_t* req)
314 {
315     TaskManager& taskManager = TaskManager::GetInstance();
316     uv_timer_again(taskManager.timer_);
317 }
318 
RunTaskManager()319 void TaskManager::RunTaskManager()
320 {
321     loop_ = uv_default_loop();
322     timer_ = new uv_timer_t;
323     uv_timer_init(loop_, timer_);
324     notifyRestartTimer_ = new uv_async_t;
325     uv_timer_start(timer_, reinterpret_cast<uv_timer_cb>(TaskManager::TriggerLoadBalance), 0, 1000); // 1000: 1s
326     uv_async_init(loop_, notifyRestartTimer_, reinterpret_cast<uv_async_cb>(TaskManager::RestartTimer));
327 #if defined IOS_PLATFORM || defined MAC_PLATFORM
328     pthread_setname_np("TaskMgrThread");
329 #else
330     pthread_setname_np(pthread_self(), "TaskMgrThread");
331 #endif
332     uv_run(loop_, UV_RUN_DEFAULT);
333     uv_loop_close(loop_);
334 }
335 
GenerateTaskId()336 uint32_t TaskManager::GenerateTaskId()
337 {
338     return currentTaskId_++;
339 }
340 
GenerateExecuteId()341 uint32_t TaskManager::GenerateExecuteId()
342 {
343     return currentExecuteId_++;
344 }
345 
StoreTaskEnvInfo(napi_env env)346 void TaskManager::StoreTaskEnvInfo(napi_env env)
347 {
348     std::unique_lock<std::shared_mutex> lock(taskEnvInfoMutex_);
349     auto iter = taskEnvInfo_.find(env);
350     if (iter == taskEnvInfo_.end()) {
351         taskEnvInfo_.emplace(env, 1); // 1: default value
352     } else {
353         iter->second++;
354     }
355 }
356 
PopTaskEnvInfo(napi_env env)357 void TaskManager::PopTaskEnvInfo(napi_env env)
358 {
359     std::unique_lock<std::shared_mutex> lock(taskEnvInfoMutex_);
360     auto iter = taskEnvInfo_.find(env);
361     if (iter == taskEnvInfo_.end()) {
362         return;
363     } else if (--iter->second == 0) {
364         taskEnvInfo_.erase(iter);
365     }
366 }
367 
HasTaskEnvInfo(napi_env env)368 bool TaskManager::HasTaskEnvInfo(napi_env env)
369 {
370     std::shared_lock<std::shared_mutex> lock(taskEnvInfoMutex_);
371     return taskEnvInfo_.find(env) != taskEnvInfo_.end();
372 }
373 
StoreTaskInfo(uint32_t executeId,TaskInfo * taskInfo)374 void TaskManager::StoreTaskInfo(uint32_t executeId, TaskInfo* taskInfo)
375 {
376     std::unique_lock<std::shared_mutex> lock(taskInfosMutex_);
377     taskInfos_.emplace(executeId, taskInfo);
378 }
379 
StoreRunningInfo(uint32_t taskId,uint32_t executeId)380 void TaskManager::StoreRunningInfo(uint32_t taskId, uint32_t executeId)
381 {
382     std::unique_lock<std::shared_mutex> lock(runningInfosMutex_);
383     auto iter = runningInfos_.find(taskId);
384     if (iter == runningInfos_.end()) {
385         std::list<uint32_t> list {executeId};
386         runningInfos_.emplace(taskId, list);
387     } else {
388         iter->second.push_front(executeId);
389     }
390 }
391 
PopTaskInfo(uint32_t executeId)392 TaskInfo* TaskManager::PopTaskInfo(uint32_t executeId)
393 {
394     std::unique_lock<std::shared_mutex> lock(taskInfosMutex_);
395     auto iter = taskInfos_.find(executeId);
396     if (iter == taskInfos_.end() || iter->second == nullptr) {
397         return nullptr;
398     }
399 
400     TaskInfo* taskInfo = iter->second;
401     // remove the the taskInfo after call function
402     taskInfos_.erase(iter);
403     return taskInfo;
404 }
405 
GetTaskInfo(uint32_t executeId)406 TaskInfo* TaskManager::GetTaskInfo(uint32_t executeId)
407 {
408     std::unique_lock<std::shared_mutex> lock(taskInfosMutex_);
409     auto iter = taskInfos_.find(executeId);
410     if (iter == taskInfos_.end() || iter->second == nullptr) {
411         return nullptr;
412     }
413     return iter->second;
414 }
415 
MarkCanceledState(uint32_t executeId)416 bool TaskManager::MarkCanceledState(uint32_t executeId)
417 {
418     std::unique_lock<std::shared_mutex> lock(taskInfosMutex_);
419     auto iter = taskInfos_.find(executeId);
420     if (iter == taskInfos_.end() || iter->second == nullptr) {
421         return false;
422     }
423     if (!UpdateExecuteState(executeId, ExecuteState::CANCELED)) {
424         return false;
425     }
426     iter->second->isCanceled = true;
427     return true;
428 }
429 
PopRunningInfo(uint32_t taskId,uint32_t executeId)430 void TaskManager::PopRunningInfo(uint32_t taskId, uint32_t executeId)
431 {
432     std::unique_lock<std::shared_mutex> lock(runningInfosMutex_);
433     auto iter = runningInfos_.find(taskId);
434     if (iter == runningInfos_.end()) {
435         return;
436     }
437     iter->second.remove(executeId);
438 }
439 
AddExecuteState(uint32_t executeId)440 void TaskManager::AddExecuteState(uint32_t executeId)
441 {
442     std::unique_lock<std::shared_mutex> lock(executeStatesMutex_);
443     executeStates_.emplace(executeId, ExecuteState::WAITING);
444 }
445 
UpdateExecuteState(uint32_t executeId,ExecuteState state)446 bool TaskManager::UpdateExecuteState(uint32_t executeId, ExecuteState state)
447 {
448     std::unique_lock<std::shared_mutex> lock(executeStatesMutex_);
449     auto iter = executeStates_.find(executeId);
450     if (iter == executeStates_.end()) {
451         return false;
452     }
453     std::string traceInfo = "UpdateExecuteState: executeId : " + std::to_string(executeId) +
454                             ", executeState : " + std::to_string(state);
455     HITRACE_HELPER_METER_NAME(traceInfo);
456     iter->second = state;
457     return true;
458 }
459 
RemoveExecuteState(uint32_t executeId)460 void TaskManager::RemoveExecuteState(uint32_t executeId)
461 {
462     std::unique_lock<std::shared_mutex> lock(executeStatesMutex_);
463     executeStates_.erase(executeId);
464 }
465 
QueryExecuteState(uint32_t executeId)466 ExecuteState TaskManager::QueryExecuteState(uint32_t executeId)
467 {
468     std::shared_lock<std::shared_mutex> lock(executeStatesMutex_);
469     auto iter = executeStates_.find(executeId);
470     if (iter == executeStates_.end()) {
471         HILOG_DEBUG("taskpool:: Can not find the target task");
472         return ExecuteState::NOT_FOUND;
473     }
474     return iter->second;
475 }
476 
CancelTask(napi_env env,uint32_t taskId)477 void TaskManager::CancelTask(napi_env env, uint32_t taskId)
478 {
479     // Cannot find task by taskId, throw error
480     std::unique_lock<std::shared_mutex> lock(runningInfosMutex_);
481     auto iter = runningInfos_.find(taskId);
482     if (iter == runningInfos_.end() || iter->second.empty()) {
483         ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK);
484         HILOG_ERROR("taskpool:: query nonexist task");
485         return;
486     }
487     for (uint32_t executeId : iter->second) {
488         CancelExecution(env, executeId);
489     }
490 }
491 
CancelExecution(napi_env env,uint32_t executeId)492 void TaskManager::CancelExecution(napi_env env, uint32_t executeId)
493 {
494     // 1. Cannot find taskInfo by executeId, throw error
495     // 2. Find executing taskInfo, skip it
496     // 3. Find waiting taskInfo, cancel it
497     // 4. Find canceled taskInfo, skip it
498     ExecuteState state = QueryExecuteState(executeId);
499     TaskInfo* taskInfo = nullptr;
500     switch (state) {
501         case ExecuteState::NOT_FOUND:
502             ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK);
503             HILOG_ERROR("taskpool:: cancel nonexist task");
504             return;
505         case ExecuteState::RUNNING:
506             if (!MarkCanceledState(executeId)) {
507                 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK,
508                                         "taskpool:: fail to mark cancel state");
509                 return;
510             }
511             break;
512         case ExecuteState::WAITING:
513             HILOG_DEBUG("taskpool:: Cancel waiting task");
514             taskInfo = PopTaskInfo(executeId);
515             if (taskInfo != nullptr) {
516                 napi_value undefined = NapiHelper::GetUndefinedValue(taskInfo->env);
517                 napi_reject_deferred(taskInfo->env, taskInfo->deferred, undefined);
518                 ReleaseTaskContent(taskInfo);
519             }
520             RemoveExecuteState(executeId);
521             break;
522         default: // Default is CANCELED, means task isCanceled, do not need to mark again.
523             break;
524     }
525 }
526 
GenerateTaskInfo(napi_env env,napi_value func,napi_value args,uint32_t taskId,uint32_t executeId,napi_value transferList)527 TaskInfo* TaskManager::GenerateTaskInfo(napi_env env, napi_value func, napi_value args,
528                                         uint32_t taskId, uint32_t executeId, napi_value transferList)
529 {
530     napi_value undefined = NapiHelper::GetUndefinedValue(env);
531     napi_value serializationFunction;
532     napi_status status = napi_serialize(env, func, undefined, &serializationFunction);
533     if (status != napi_ok || serializationFunction == nullptr) {
534         ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, "taskpool: failed to serialize function.");
535         return nullptr;
536     }
537     napi_value serializationArguments;
538     if (transferList == nullptr) {
539         status = napi_serialize(env, args, undefined, &serializationArguments);
540     } else {
541         status = napi_serialize(env, args, transferList, &serializationArguments);
542     }
543     if (status != napi_ok || serializationArguments == nullptr) {
544         ErrorHelper::ThrowError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, "taskpool: failed to serialize arguments.");
545         return nullptr;
546     }
547     TaskInfo* taskInfo = new TaskInfo();
548     taskInfo->env = env;
549     taskInfo->executeId = executeId;
550     taskInfo->serializationFunction = serializationFunction;
551     taskInfo->serializationArguments = serializationArguments;
552     taskInfo->taskId = taskId;
553     taskInfo->onResultSignal = new uv_async_t;
554     uv_loop_t* loop = NapiHelper::GetLibUV(env);
555     uv_async_init(loop, taskInfo->onResultSignal, reinterpret_cast<uv_async_cb>(TaskPool::HandleTaskResult));
556     taskInfo->onResultSignal->data = taskInfo;
557 
558     StoreTaskInfo(executeId, taskInfo);
559     StoreTaskEnvInfo(env);
560     return taskInfo;
561 }
562 
GenerateTaskInfoFromTask(napi_env env,napi_value task,uint32_t executeId)563 TaskInfo* TaskManager::GenerateTaskInfoFromTask(napi_env env, napi_value task, uint32_t executeId)
564 {
565     napi_value function = NapiHelper::GetNameProperty(env, task, FUNCTION_STR);
566     napi_value arguments = NapiHelper::GetNameProperty(env, task, ARGUMENTS_STR);
567     napi_value taskId = NapiHelper::GetNameProperty(env, task, TASKID_STR);
568     if (function == nullptr || arguments == nullptr || taskId == nullptr) {
569         ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "taskpool:: task value is error");
570         return nullptr;
571     }
572     napi_value transferList = NapiHelper::GetNameProperty(env, task, TRANSFERLIST_STR);
573     uint32_t id = NapiHelper::GetUint32Value(env, taskId);
574     TaskInfo* taskInfo = GenerateTaskInfo(env, function, arguments, id, executeId, transferList);
575     return taskInfo;
576 }
577 
ReleaseTaskContent(TaskInfo * taskInfo)578 void TaskManager::ReleaseTaskContent(TaskInfo* taskInfo)
579 {
580     PopTaskEnvInfo(taskInfo->env);
581     if (taskInfo->onResultSignal != nullptr &&
582         !uv_is_closing(reinterpret_cast<uv_handle_t*>(taskInfo->onResultSignal))) {
583         uv_close(reinterpret_cast<uv_handle_t*>(taskInfo->onResultSignal), [](uv_handle_t* handle) {
584             if (handle != nullptr) {
585                 delete reinterpret_cast<uv_async_t*>(handle);
586                 handle = nullptr;
587             }
588         });
589     }
590     delete taskInfo;
591     taskInfo = nullptr;
592 }
593 
NotifyWorkerIdle(Worker * worker)594 void TaskManager::NotifyWorkerIdle(Worker* worker)
595 {
596     {
597         std::lock_guard<std::recursive_mutex> lock(workersMutex_);
598         idleWorkers_.insert(worker);
599     }
600     if (GetTaskNum() != 0) {
601         NotifyExecuteTask();
602     }
603 }
604 
NotifyWorkerCreated(Worker * worker)605 void TaskManager::NotifyWorkerCreated(Worker* worker)
606 {
607     NotifyWorkerIdle(worker);
608     expandingCount_--;
609 }
610 
NotifyWorkerAdded(Worker * worker)611 void TaskManager::NotifyWorkerAdded(Worker* worker)
612 {
613     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
614     workers_.insert(worker);
615 }
616 
TryTriggerLoadBalance()617 void TaskManager::TryTriggerLoadBalance()
618 {
619     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
620     if (idleWorkers_.empty()) {
621         TriggerLoadBalance();
622     }
623 }
624 
GetIdleWorkers()625 uint32_t TaskManager::GetIdleWorkers()
626 {
627     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
628     return idleWorkers_.size();
629 }
630 
GetRunningWorkers()631 uint32_t TaskManager::GetRunningWorkers()
632 {
633     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
634     return std::count_if(workers_.begin(), workers_.end(), [](const auto& worker) {
635         return worker->runningCount_ != 0 || Timer::HasTimer(worker->workerEnv_);
636     });
637 }
638 
GetTimeoutWorkers()639 uint32_t TaskManager::GetTimeoutWorkers()
640 {
641     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
642     return timeoutWorkers_.size();
643 }
644 
GetTaskNum()645 uint32_t TaskManager::GetTaskNum()
646 {
647     std::lock_guard<std::mutex> lock(taskQueuesMutex_);
648     return taskQueues_[Priority::HIGH]->GetTaskNum() + taskQueues_[Priority::MEDIUM]->GetTaskNum() +
649         taskQueues_[Priority::LOW]->GetTaskNum();
650 }
651 
GetThreadNum()652 uint32_t TaskManager::GetThreadNum()
653 {
654     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
655     return workers_.size();
656 }
657 
EnqueueExecuteId(uint32_t executeId,Priority priority)658 void TaskManager::EnqueueExecuteId(uint32_t executeId, Priority priority)
659 {
660     // once enqueued, reset the counter to make threads released at given time
661     // if timer is stopped and then new tasks enqueue, restart it
662     retryCount_ = 0;
663     if (suspend_) {
664         suspend_ = false;
665         uv_async_send(notifyRestartTimer_);
666     }
667 
668     {
669         std::lock_guard<std::mutex> lock(taskQueuesMutex_);
670         taskQueues_[priority]->EnqueueExecuteId(executeId);
671     }
672     NotifyExecuteTask();
673 }
674 
DequeueExecuteId()675 std::pair<uint32_t, Priority> TaskManager::DequeueExecuteId()
676 {
677     std::lock_guard<std::mutex> lock(taskQueuesMutex_);
678     if (highPrioExecuteCount_ < HIGH_PRIORITY_TASK_COUNT) {
679         auto& highTaskQueue = taskQueues_[Priority::HIGH];
680         highPrioExecuteCount_++;
681         return std::make_pair(highTaskQueue->DequeueExecuteId(), Priority::HIGH);
682     }
683     highPrioExecuteCount_ = 0;
684 
685     if (mediumPrioExecuteCount_ < MEDIUM_PRIORITY_TASK_COUNT) {
686         auto& mediumTaskQueue = taskQueues_[Priority::MEDIUM];
687         mediumPrioExecuteCount_++;
688         return std::make_pair(mediumTaskQueue->DequeueExecuteId(), Priority::MEDIUM);
689     }
690     mediumPrioExecuteCount_ = 0;
691 
692     auto& lowTaskQueue = taskQueues_[Priority::LOW];
693     return std::make_pair(lowTaskQueue->DequeueExecuteId(), Priority::LOW);
694 }
695 
NotifyExecuteTask()696 void TaskManager::NotifyExecuteTask()
697 {
698     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
699     if (idleWorkers_.empty()) {
700         return;
701     }
702     auto candidator = idleWorkers_.begin();
703     Worker* worker = *candidator;
704     idleWorkers_.erase(candidator);
705     worker->NotifyExecuteTask();
706 }
707 
InitTaskManager(napi_env env)708 void TaskManager::InitTaskManager(napi_env env)
709 {
710     HITRACE_HELPER_METER_NAME("InitTaskManager");
711     auto hostEngine = reinterpret_cast<NativeEngine*>(env);
712     while (hostEngine != nullptr && !hostEngine->IsMainThread()) {
713         hostEngine = hostEngine->GetHostEngine();
714     }
715     if (!isInitialized_.exchange(true, std::memory_order_relaxed)) {
716         hostEnv_ = reinterpret_cast<napi_env>(hostEngine);
717         // Add a reserved thread for taskpool
718         CreateWorkers(hostEnv_);
719         // Create a timer to manage worker threads
720         std::thread workerManager(&TaskManager::RunTaskManager, this);
721         workerManager.detach();
722     }
723 }
724 
CreateWorkers(napi_env env,uint32_t num)725 void TaskManager::CreateWorkers(napi_env env, uint32_t num)
726 {
727     for (uint32_t i = 0; i < num; i++) {
728         expandingCount_++;
729         auto worker = Worker::WorkerConstructor(env);
730         NotifyWorkerAdded(worker);
731     }
732 }
733 
RemoveWorker(Worker * worker)734 void TaskManager::RemoveWorker(Worker* worker)
735 {
736     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
737     idleWorkers_.erase(worker);
738     workers_.erase(worker);
739 }
740 
IsCanceled(napi_env env,napi_callback_info cbinfo)741 napi_value TaskManager::IsCanceled(napi_env env, napi_callback_info cbinfo)
742 {
743     bool isCanceled = false;
744     auto engine = reinterpret_cast<NativeEngine*>(env);
745     if (!engine->IsTaskPoolThread()) {
746         HILOG_ERROR("taskpool:: call isCanceled not in taskpool thread");
747         return NapiHelper::CreateBooleanValue(env, isCanceled);
748     }
749     // Get taskInfo and query task cancel state
750     void* data = engine->GetCurrentTaskInfo();
751     if (data == nullptr) {
752         HILOG_ERROR("taskpool:: call isCanceled not in Concurrent function");
753     } else {
754         TaskInfo* taskInfo = static_cast<TaskInfo*>(data);
755         isCanceled = taskInfo->isCanceled;
756     }
757     return NapiHelper::CreateBooleanValue(env, isCanceled);
758 }
759 
760 // ----------------------------------- TaskGroupManager ----------------------------------------
GetInstance()761 TaskGroupManager &TaskGroupManager::GetInstance()
762 {
763     static TaskGroupManager groupManager;
764     return groupManager;
765 }
766 
GenerateGroupId()767 uint32_t TaskGroupManager::GenerateGroupId()
768 {
769     return groupId_++;
770 }
771 
GenerateGroupInfo(napi_env env,uint32_t taskNum,uint32_t groupId,uint32_t groupExecuteId)772 GroupInfo* TaskGroupManager::GenerateGroupInfo(napi_env env, uint32_t taskNum, uint32_t groupId,
773                                                uint32_t groupExecuteId)
774 {
775     GroupInfo* groupInfo = new GroupInfo();
776     groupInfo->taskNum = taskNum;
777     groupInfo->groupId = groupId;
778     napi_value resArr;
779     napi_create_array_with_length(env, taskNum, &resArr);
780     napi_ref arrRef = NapiHelper::CreateReference(env, resArr, 1);
781     groupInfo->resArr = arrRef;
782     StoreExecuteId(groupId, groupExecuteId);
783     StoreRunningExecuteId(groupExecuteId);
784     AddGroupInfoById(groupExecuteId, groupInfo);
785     return groupInfo;
786 }
787 
ClearGroupInfo(napi_env env,uint32_t groupExecuteId,GroupInfo * groupInfo)788 void TaskGroupManager::ClearGroupInfo(napi_env env, uint32_t groupExecuteId, GroupInfo* groupInfo)
789 {
790     RemoveRunningExecuteId(groupExecuteId);
791     RemoveGroupInfoById(groupExecuteId);
792     napi_delete_reference(env, groupInfo->resArr);
793     delete groupInfo;
794     groupInfo = nullptr;
795 }
796 
AddTask(uint32_t groupId,napi_ref task)797 void TaskGroupManager::AddTask(uint32_t groupId, napi_ref task)
798 {
799     std::unique_lock<std::shared_mutex> lock(tasksMutex_);
800     auto iter = tasks_.find(groupId);
801     if (iter == tasks_.end()) {
802         std::list<napi_ref> list {task};
803         tasks_.emplace(groupId, list);
804     } else {
805         iter->second.push_back(task);
806     }
807 }
808 
GetTasksByGroup(uint32_t groupId)809 const std::list<napi_ref>& TaskGroupManager::GetTasksByGroup(uint32_t groupId)
810 {
811     std::shared_lock<std::shared_mutex> lock(tasksMutex_);
812     auto iter = tasks_.find(groupId);
813     if (iter == tasks_.end()) {
814         static const std::list<napi_ref> EMPTY_TASK_LIST {};
815         return EMPTY_TASK_LIST;
816     }
817     return iter->second;
818 }
819 
ClearTasks(napi_env env,uint32_t groupId)820 void TaskGroupManager::ClearTasks(napi_env env, uint32_t groupId)
821 {
822     std::unique_lock<std::shared_mutex> lock(tasksMutex_);
823     auto iter = tasks_.find(groupId);
824     if (iter == tasks_.end()) {
825         return;
826     }
827     for (napi_ref task : iter->second) {
828         napi_delete_reference(env, task);
829     }
830     tasks_.erase(iter);
831 }
832 
StoreExecuteId(uint32_t groupId,uint32_t groupExecuteId)833 void TaskGroupManager::StoreExecuteId(uint32_t groupId, uint32_t groupExecuteId)
834 {
835     std::lock_guard<std::mutex> lock(groupExecuteIdsMutex_);
836     auto iter = groupExecuteIds_.find(groupId);
837     if (iter == groupExecuteIds_.end()) {
838         std::list<uint32_t> list {groupExecuteId};
839         groupExecuteIds_.emplace(groupId, std::move(list));
840     } else {
841         iter->second.push_back(groupExecuteId);
842     }
843 }
844 
RemoveExecuteId(uint32_t groupId,uint32_t groupExecuteId)845 void TaskGroupManager::RemoveExecuteId(uint32_t groupId, uint32_t groupExecuteId)
846 {
847     std::lock_guard<std::mutex> lock(groupExecuteIdsMutex_);
848     auto iter = groupExecuteIds_.find(groupId);
849     if (iter != groupExecuteIds_.end()) {
850         iter->second.remove(groupExecuteId);
851     }
852     if (iter->second.empty()) {
853         groupExecuteIds_.erase(iter);
854     }
855 }
856 
ClearExecuteId(uint32_t groupId)857 void TaskGroupManager::ClearExecuteId(uint32_t groupId)
858 {
859     std::lock_guard<std::mutex> lock(groupExecuteIdsMutex_);
860     groupExecuteIds_.erase(groupId);
861 }
862 
CancelGroup(napi_env env,uint32_t groupId)863 void TaskGroupManager::CancelGroup(napi_env env, uint32_t groupId)
864 {
865     std::lock_guard<std::mutex> lock(groupExecuteIdsMutex_);
866     auto iter = groupExecuteIds_.find(groupId);
867     if (iter == groupExecuteIds_.end() || iter->second.empty()) {
868         ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK_GROUP);
869         HILOG_ERROR("taskpool:: cancel nonexist task group");
870         return;
871     }
872     for (uint32_t groupExecuteId : iter->second) {
873         bool isRunning = IsRunning(groupExecuteId);
874         if (!isRunning) {
875             continue;
876         }
877         GroupInfo* info = GetGroupInfoByExecutionId(groupExecuteId);
878         if (info == nullptr) {
879             continue;
880         }
881         const std::list<uint32_t>& executeList = info->executeIds;
882         for (uint32_t executeId : executeList) {
883             CancelGroupExecution(executeId);
884         }
885         napi_value undefined = NapiHelper::GetUndefinedValue(env);
886         napi_reject_deferred(env, info->deferred, undefined);
887         TaskGroupManager::GetInstance().ClearGroupInfo(env, groupExecuteId, info);
888     }
889 }
890 
GenerateGroupExecuteId()891 uint32_t TaskGroupManager::GenerateGroupExecuteId()
892 {
893     return groupExecuteId_++;
894 }
895 
StoreRunningExecuteId(uint32_t groupExecuteId)896 void TaskGroupManager::StoreRunningExecuteId(uint32_t groupExecuteId)
897 {
898     std::unique_lock<std::shared_mutex> lock(groupExecutionsMutex_);
899     runningGroupExecutions_.insert(groupExecuteId);
900 }
901 
RemoveRunningExecuteId(uint32_t groupExecuteId)902 void TaskGroupManager::RemoveRunningExecuteId(uint32_t groupExecuteId)
903 {
904     std::unique_lock<std::shared_mutex> lock(groupExecutionsMutex_);
905     runningGroupExecutions_.erase(groupExecuteId);
906 }
907 
IsRunning(uint32_t groupExecuteId)908 bool TaskGroupManager::IsRunning(uint32_t groupExecuteId)
909 {
910     std::shared_lock<std::shared_mutex> lock(groupExecutionsMutex_);
911     bool isRunning = runningGroupExecutions_.find(groupExecuteId) != runningGroupExecutions_.end();
912     return isRunning;
913 }
914 
AddGroupInfoById(uint32_t groupExecuteId,GroupInfo * info)915 void TaskGroupManager::AddGroupInfoById(uint32_t groupExecuteId, GroupInfo* info)
916 {
917     std::unique_lock<std::shared_mutex> lock(groupInfoMapMutex_);
918     groupInfoMap_.emplace(groupExecuteId, info);
919 }
920 
RemoveGroupInfoById(uint32_t groupExecuteId)921 void TaskGroupManager::RemoveGroupInfoById(uint32_t groupExecuteId)
922 {
923     std::unique_lock<std::shared_mutex> lock(groupInfoMapMutex_);
924     groupInfoMap_.erase(groupExecuteId);
925 }
926 
GetGroupInfoByExecutionId(uint32_t groupExecuteId)927 GroupInfo* TaskGroupManager::GetGroupInfoByExecutionId(uint32_t groupExecuteId)
928 {
929     std::shared_lock<std::shared_mutex> lock(groupInfoMapMutex_);
930     auto iter = groupInfoMap_.find(groupExecuteId);
931     if (iter != groupInfoMap_.end()) {
932         return iter->second;
933     }
934     return nullptr;
935 }
936 
CancelGroupExecution(uint32_t executeId)937 void TaskGroupManager::CancelGroupExecution(uint32_t executeId)
938 {
939     ExecuteState state = TaskManager::GetInstance().QueryExecuteState(executeId);
940     TaskInfo* taskInfo = nullptr;
941     switch (state) {
942         case ExecuteState::NOT_FOUND:
943             break;
944         case ExecuteState::RUNNING:
945             TaskManager::GetInstance().MarkCanceledState(executeId);
946             break;
947         case ExecuteState::WAITING:
948             HILOG_DEBUG("taskpool:: Cancel waiting task in group");
949             taskInfo = TaskManager::GetInstance().PopTaskInfo(executeId);
950             if (taskInfo == nullptr) {
951                 HILOG_ERROR("taskpool:: taskInfo is nullptr when cancel waiting execution");
952                 return;
953             }
954             TaskManager::GetInstance().RemoveExecuteState(executeId);
955             TaskManager::GetInstance().ReleaseTaskContent(taskInfo);
956             break;
957         default: // Default is CANCELED, means task isCanceled, do not need to mark again.
958             break;
959     }
960 }
961 } // namespace Commonlibrary::Concurrent::TaskPoolModule
962