1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "task_manager.h"
17 
18 #include <cinttypes>
19 #include <securec.h>
20 #include <thread>
21 
22 #if defined(ENABLE_TASKPOOL_FFRT)
23 #include "bundle_info.h"
24 #include "bundle_mgr_interface.h"
25 #include "bundle_mgr_proxy.h"
26 #include "iservice_registry.h"
27 #include "parameters.h"
28 #include "status_receiver_interface.h"
29 #include "system_ability_definition.h"
30 #include "c/executor_task.h"
31 #include "ffrt_inner.h"
32 #endif
33 #include "commonlibrary/ets_utils/js_sys_module/timer/timer.h"
34 #include "helper/concurrent_helper.h"
35 #include "helper/error_helper.h"
36 #include "helper/hitrace_helper.h"
37 #include "taskpool.h"
38 #include "tools/log.h"
39 #include "worker.h"
40 
41 namespace Commonlibrary::Concurrent::TaskPoolModule {
42 using namespace OHOS::JsSysModule;
43 
44 static constexpr int8_t HIGH_PRIORITY_TASK_COUNT = 5;
45 static constexpr int8_t MEDIUM_PRIORITY_TASK_COUNT = 5;
46 static constexpr int32_t MAX_TASK_DURATION = 100; // 100: 100ms
47 static constexpr uint32_t STEP_SIZE = 2;
48 static constexpr uint32_t DEFAULT_THREADS = 3;
49 static constexpr uint32_t DEFAULT_MIN_THREADS = 1; // 1: minimum thread num when idle
50 static constexpr uint32_t MIN_TIMEOUT_TIME = 180000; // 180000: 3min
51 static constexpr uint32_t MAX_TIMEOUT_TIME = 600000; // 600000: 10min
52 static constexpr int32_t MAX_IDLE_TIME = 30000; // 30000: 30s
53 static constexpr uint32_t TRIGGER_INTERVAL = 30000; // 30000: 30s
54 static constexpr uint32_t SHRINK_STEP = 4; // 4: try to release 4 threads every time
55 [[maybe_unused]] static constexpr uint32_t IDLE_THRESHOLD = 2; // 2: release the thread after 2 idle intervals
56 
57 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
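// Maps taskpool priorities to EventHandler queue priorities; note that MEDIUM and HIGH are promoted to HIGH and IMMEDIATE.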
58 static const std::map<Priority, OHOS::AppExecFwk::EventQueue::Priority> TASK_EVENTHANDLER_PRIORITY_MAP = {
59     {Priority::IDLE, OHOS::AppExecFwk::EventQueue::Priority::IDLE},
60     {Priority::LOW, OHOS::AppExecFwk::EventQueue::Priority::LOW},
61     {Priority::MEDIUM, OHOS::AppExecFwk::EventQueue::Priority::HIGH},
62     {Priority::HIGH, OHOS::AppExecFwk::EventQueue::Priority::IMMEDIATE},
63 };
64 #endif
65 
66 // ----------------------------------- TaskManager ----------------------------------------
67 TaskManager& TaskManager::GetInstance()
68 {
69     static TaskManager manager;
70     return manager;
71 }
72 
73 TaskManager::TaskManager()
74 {
75     for (size_t i = 0; i < taskQueues_.size(); i++) {
76         std::unique_ptr<ExecuteQueue> taskQueue = std::make_unique<ExecuteQueue>();
77         taskQueues_[i] = std::move(taskQueue);
78     }
79 }
80 
81 TaskManager::~TaskManager()
82 {
83     HILOG_INFO("taskpool:: ~TaskManager");
84     if (timer_ == nullptr) {
85         HILOG_ERROR("taskpool:: timer_ is nullptr");
86     } else {
87         uv_timer_stop(timer_);
88         ConcurrentHelper::UvHandleClose(timer_);
89         ConcurrentHelper::UvHandleClose(expandHandle_);
90     }
91 
92     if (loop_ != nullptr) {
93         uv_stop(loop_);
94     }
95 
96     {
97         std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
98         for (auto& worker : workers_) {
99             delete worker;
100         }
101         workers_.clear();
102     }
103 
104     {
105         std::lock_guard<std::mutex> lock(callbackMutex_);
106         for (auto& [_, callbackPtr] : callbackTable_) {
107             if (callbackPtr == nullptr) {
108                 continue;
109             }
110             callbackPtr.reset();
111         }
112         callbackTable_.clear();
113     }
114 
115     {
116         std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
117         for (auto& [_, task] : tasks_) {
118             delete task;
119             task = nullptr;
120         }
121         tasks_.clear();
122     }
123     CountTraceForWorker();
124 }
125 
126 void TaskManager::CountTraceForWorker()
127 {
128     std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
129     int64_t threadNum = static_cast<int64_t>(workers_.size());
130     int64_t idleWorkers = static_cast<int64_t>(idleWorkers_.size());
131     int64_t timeoutWorkers = static_cast<int64_t>(timeoutWorkers_.size());
132     HITRACE_HELPER_COUNT_TRACE("timeoutThreadNum", timeoutWorkers);
133     HITRACE_HELPER_COUNT_TRACE("threadNum", threadNum);
134     HITRACE_HELPER_COUNT_TRACE("runningThreadNum", threadNum - idleWorkers);
135     HITRACE_HELPER_COUNT_TRACE("idleThreadNum", idleWorkers);
136 }
137 
138 napi_value TaskManager::GetThreadInfos(napi_env env)
139 {
140     napi_value threadInfos = nullptr;
141     napi_create_array(env, &threadInfos);
142     {
143         std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
144         int32_t i = 0;
145         for (auto& worker : workers_) {
146             if (worker->workerEnv_ == nullptr) {
147                 continue;
148             }
149             napi_value tid = NapiHelper::CreateUint32(env, static_cast<uint32_t>(worker->tid_));
150             napi_value priority = NapiHelper::CreateUint32(env, static_cast<uint32_t>(worker->priority_));
151 
152             napi_value taskId = nullptr;
153             napi_create_array(env, &taskId);
154             int32_t j = 0;
155             {
156                 std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_);
157                 for (auto& currentId : worker->currentTaskId_) {
158                     napi_value id = NapiHelper::CreateUint32(env, currentId);
159                     napi_set_element(env, taskId, j, id);
160                     j++;
161                 }
162             }
163             napi_value threadInfo = nullptr;
164             napi_create_object(env, &threadInfo);
165             napi_set_named_property(env, threadInfo, "tid", tid);
166             napi_set_named_property(env, threadInfo, "priority", priority);
167             napi_set_named_property(env, threadInfo, "taskIds", taskId);
168             napi_set_element(env, threadInfos, i, threadInfo);
169             i++;
170         }
171     }
172     return threadInfos;
173 }
174 
175 napi_value TaskManager::GetTaskInfos(napi_env env)
176 {
177     napi_value taskInfos = nullptr;
178     napi_create_array(env, &taskInfos);
179     {
180         std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
181         int32_t i = 0;
182         for (const auto& [_, task] : tasks_) {
183             if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::DELAYED ||
184                 task->taskState_ == ExecuteState::FINISHED) {
185                 continue;
186             }
187             napi_value taskInfoValue = NapiHelper::CreateObject(env);
188             std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
189             napi_value taskId = NapiHelper::CreateUint32(env, task->taskId_);
190             napi_value name = nullptr;
191             napi_create_string_utf8(env, task->name_.c_str(), task->name_.size(), &name);
192             napi_set_named_property(env, taskInfoValue, "name", name);
193             ExecuteState state = task->taskState_;
194             uint64_t duration = 0;
195             if (state == ExecuteState::RUNNING || state == ExecuteState::ENDING) {
196                 duration = ConcurrentHelper::GetMilliseconds() - task->startTime_;
197             }
198             napi_value stateValue = NapiHelper::CreateUint32(env, static_cast<uint32_t>(state));
199             napi_set_named_property(env, taskInfoValue, "taskId", taskId);
200             napi_set_named_property(env, taskInfoValue, "state", stateValue);
201             napi_value durationValue = NapiHelper::CreateUint32(env, duration);
202             napi_set_named_property(env, taskInfoValue, "duration", durationValue);
203             napi_set_element(env, taskInfos, i, taskInfoValue);
204             i++;
205         }
206     }
207     return taskInfos;
208 }
209 
210 void TaskManager::UpdateExecutedInfo(uint64_t duration)
211 {
212     totalExecTime_ += duration;
213     totalExecCount_++;
214 }
215 
216 uint32_t TaskManager::ComputeSuitableThreadNum()
217 {
218     uint32_t targetNum = ComputeSuitableIdleNum() + GetRunningWorkers();
219     return targetNum;
220 }
221 
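// Estimate how many idle workers are needed: scale the average task duration against MAX_TASK_DURATION,
// capping the result by the number of pending non-idle tasks.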
222 uint32_t TaskManager::ComputeSuitableIdleNum()
223 {
224     uint32_t targetNum = 0;
225     if (GetNonIdleTaskNum() != 0 && totalExecCount_ == 0) {
226         // this branch is used for avoiding time-consuming tasks that may block the taskpool
227         targetNum = std::min(STEP_SIZE, GetNonIdleTaskNum());
228     } else if (totalExecCount_ != 0) {
229         auto durationPerTask = static_cast<double>(totalExecTime_) / totalExecCount_;
230         uint32_t result = std::ceil(durationPerTask * GetNonIdleTaskNum() / MAX_TASK_DURATION);
231         targetNum = std::min(result, GetNonIdleTaskNum());
232     }
233     return targetNum;
234 }
235 
236 void TaskManager::CheckForBlockedWorkers()
237 {
238     // the threshold will be dynamically modified to provide more flexibility in detecting exceptions
239     // if the thread num has reached the limit and no idle worker is available, the shorter threshold is used;
240     // otherwise we choose the longer one
241     std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
242     bool needChecking = false;
243     bool state = (GetThreadNum() == ConcurrentHelper::GetMaxThreads()) && (GetIdleWorkers() == 0);
244     uint64_t threshold = state ? MIN_TIMEOUT_TIME : MAX_TIMEOUT_TIME;
245     for (auto iter = workers_.begin(); iter != workers_.end(); iter++) {
246         auto worker = *iter;
247         // if the worker thread is idle, just skip it, and only the worker in running state can be marked as timeout
248         // if the worker is executing the longTask, we will not do the check
249         if ((worker->state_ == WorkerState::IDLE) || (worker->IsExecutingLongTask()) ||
250             (ConcurrentHelper::GetMilliseconds() - worker->startTime_ < threshold) ||
251             !worker->UpdateWorkerState(WorkerState::RUNNING, WorkerState::BLOCKED)) {
252             continue;
253         }
254         // When executing the promise task, the worker state may not be updated and will be
255         // marked as 'BLOCKED', so we should exclude this situation.
256         // Besides, if the worker is not executing sync tasks or micro tasks, it may be handling
257         // tasks like I/O in uv threads, so we should also exclude this situation.
258         auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
259         if (worker->idleState_ && !workerEngine->IsExecutingPendingJob()) {
260             if (!workerEngine->HasWaitingRequest()) {
261                 worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::IDLE);
262             } else {
263                 worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::RUNNING);
264                 worker->startTime_ = ConcurrentHelper::GetMilliseconds();
265             }
266             continue;
267         }
268 
269         HILOG_INFO("taskpool:: The worker has been marked as timeout.");
270         // If the current worker has a longTask and is not executing, we will only interrupt it.
271         if (worker->HasLongTask()) {
272             continue;
273         }
274         needChecking = true;
275         idleWorkers_.erase(worker);
276         timeoutWorkers_.insert(worker);
277     }
278     // should trigger the check when we have marked and removed workers
279     if (UNLIKELY(needChecking)) {
280         TryExpand();
281     }
282 }
283 
284 void TaskManager::TryTriggerExpand()
285 {
286     // post the signal to notify the monitor thread to expand
287     if (UNLIKELY(!isHandleInited_)) {
288         NotifyExecuteTask();
289         needChecking_ = true;
290         HILOG_DEBUG("taskpool:: the expandHandle_ is nullptr");
291         return;
292     }
293     uv_async_send(expandHandle_);
294 }
295 
296 #if defined(OHOS_PLATFORM)
297 // read /proc/[pid]/task/[tid]/stat to get a thread's state; used to count idle (sleeping) threads.
298 bool TaskManager::ReadThreadInfo(pid_t tid, char* buf, uint32_t size)
299 {
300     char path[128]; // 128: buffer for path
301     pid_t pid = getpid();
302     ssize_t bytesLen = -1;
303     int ret = snprintf_s(path, sizeof(path), sizeof(path) - 1, "/proc/%d/task/%d/stat", pid, tid);
304     if (ret < 0) {
305         HILOG_ERROR("snprintf_s failed");
306         return false;
307     }
308     int fd = open(path, O_RDONLY | O_NONBLOCK);
309     if (UNLIKELY(fd == -1)) {
310         return false;
311     }
312     bytesLen = read(fd, buf, size - 1);
313     close(fd);
314     if (bytesLen <= 0) {
315         HILOG_ERROR("taskpool:: failed to read %{public}s", path);
316         return false;
317     }
318     buf[bytesLen] = '\0';
319     return true;
320 }
321 
322 uint32_t TaskManager::GetIdleWorkers()
323 {
324     char buf[4096]; // 4096: buffer for thread info
325     uint32_t idleCount = 0;
326     std::unordered_set<pid_t> tids {};
327     {
328         std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
329         for (auto& worker : idleWorkers_) {
330 #if defined(ENABLE_TASKPOOL_FFRT)
331             if (worker->ffrtTaskHandle_ != nullptr) {
332                 if (worker->GetWaitTime() > 0) {
333                     idleCount++;
334                 }
335                 continue;
336             }
337 #endif
338             tids.emplace(worker->tid_);
339         }
340     }
341     // The ffrt thread does not read thread info
342     for (auto tid : tids) {
343         if (!ReadThreadInfo(tid, buf, sizeof(buf))) {
344             continue;
345         }
346         char state;
347         if (sscanf_s(buf, "%*d %*s %c", &state, sizeof(state)) != 1) { // 1: state
348             HILOG_ERROR("taskpool:: sscanf_s of state failed for %{public}c", state);
349             return 0;
350         }
351         if (state == 'S') {
352             idleCount++;
353         }
354     }
355     return idleCount;
356 }
357 
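// Collect workers that are candidates for release: FFRT workers that have been parked in epoll wait,
// and native workers whose /proc stat shows a sleeping state with unchanged CPU time for IDLE_THRESHOLD checks.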
358 void TaskManager::GetIdleWorkersList(uint32_t step)
359 {
360     char buf[4096]; // 4096: buffer for thread info
361     for (auto& worker : idleWorkers_) {
362 #if defined(ENABLE_TASKPOOL_FFRT)
363         if (worker->ffrtTaskHandle_ != nullptr) {
364             uint64_t workerWaitTime = worker->GetWaitTime();
365             bool isWorkerLoopActive = worker->IsLoopActive();
366             if (workerWaitTime == 0) {
367                 continue;
368             }
369             uint64_t currTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::seconds>(
370                 std::chrono::steady_clock::now().time_since_epoch()).count());
371             if (!isWorkerLoopActive) {
372                 freeList_.emplace_back(worker);
373             } else if ((currTime - workerWaitTime) > IDLE_THRESHOLD * TRIGGER_INTERVAL) {
374                 freeList_.emplace_back(worker);
375                 HILOG_INFO("taskpool:: worker in ffrt epoll wait more than 2 intervals, force to free.");
376             } else {
377                 auto waitTime = std::to_string(workerWaitTime);
378                 HILOG_INFO("taskpool:: worker uv loop alive, will be freed after 2 intervals, wait time: %{public}s.", waitTime.c_str());
379             }
380             continue;
381         }
382 #endif
383         if (!ReadThreadInfo(worker->tid_, buf, sizeof(buf))) {
384             continue;
385         }
386         char state;
387         uint64_t utime;
388         if (sscanf_s(buf, "%*d %*s %c %*d %*d %*d %*d %*d %*u %*lu %*lu %*lu %*lu %llu",
389             &state, sizeof(state), &utime) != 2) { // 2: state and utime
390             HILOG_ERROR("taskpool:: sscanf_s of state failed for %{public}d", worker->tid_);
391             return;
392         }
393         if (state != 'S' || utime != worker->lastCpuTime_) {
394             worker->idleCount_ = 0;
395             worker->lastCpuTime_ = utime;
396             continue;
397         }
398         if (++worker->idleCount_ >= IDLE_THRESHOLD) {
399             freeList_.emplace_back(worker);
400         }
401     }
402 }
403 
404 void TaskManager::TriggerShrink(uint32_t step)
405 {
406     GetIdleWorkersList(step);
407     step = std::min(step, static_cast<uint32_t>(freeList_.size()));
408     uint32_t count = 0;
409     for (size_t i = 0; i < freeList_.size(); i++) {
410         auto worker = freeList_[i];
411         if (worker->state_ != WorkerState::IDLE || worker->HasLongTask()) {
412             continue;
413         }
414         auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
415         if (idleTime < MAX_IDLE_TIME || worker->runningCount_ != 0) {
416             continue;
417         }
418         idleWorkers_.erase(worker);
419         HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
420         uv_async_send(worker->clearWorkerSignal_);
421         if (++count == step) {
422             break;
423         }
424     }
425     freeList_.clear();
426 }
427 #else
428 uint32_t TaskManager::GetIdleWorkers()
429 {
430     std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
431     return idleWorkers_.size();
432 }
433 
434 void TaskManager::TriggerShrink(uint32_t step)
435 {
436     for (uint32_t i = 0; i < step; i++) {
437         // try to free a worker whose idle time meets the requirement
438         auto iter = std::find_if(idleWorkers_.begin(), idleWorkers_.end(), [](Worker *worker) {
439             auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
440             return idleTime > MAX_IDLE_TIME && worker->runningCount_ == 0 && !worker->HasLongTask();
441         });
442         // remove it from all sets
443         if (iter != idleWorkers_.end()) {
444             auto worker = *iter;
445             idleWorkers_.erase(worker);
446             HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
447             uv_async_send(worker->clearWorkerSignal_);
448         }
449     }
450 }
451 #endif
452 
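// Shrink policy: release at most SHRINK_STEP idle workers per timer tick (never dropping below the minimum),
// clean up timed-out workers, and suspend the timer once only the reserved idle workers remain.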
453 void TaskManager::NotifyShrink(uint32_t targetNum)
454 {
455     std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
456     uint32_t workerCount = workers_.size();
457     uint32_t minThread = ConcurrentHelper::IsLowMemory() ? 0 : DEFAULT_MIN_THREADS;
458     if (minThread == 0) {
459         HILOG_INFO("taskpool:: the system now is under low memory");
460     }
461     if (workerCount > minThread && workerCount > targetNum) {
462         targetNum = std::max(minThread, targetNum);
463         uint32_t step = std::min(workerCount - targetNum, SHRINK_STEP);
464         TriggerShrink(step);
465     }
466     // remove all timeout workers
467     for (auto iter = timeoutWorkers_.begin(); iter != timeoutWorkers_.end();) {
468         if (workers_.find(*iter) == workers_.end()) {
469             HILOG_WARN("taskpool:: current worker may have been released");
470             iter = timeoutWorkers_.erase(iter);
471         } else if ((*iter)->runningCount_ == 0) {
472             HILOG_DEBUG("taskpool:: try to release timeout thread: %{public}d", (*iter)->tid_);
473             uv_async_send((*iter)->clearWorkerSignal_);
474             timeoutWorkers_.erase(iter++);
475             return;
476         } else {
477             iter++;
478         }
479     }
480     uint32_t idleNum = idleWorkers_.size();
481     // System memory state is moderate and the worker has executed tasks, so we will try to release it
482     if (ConcurrentHelper::IsModerateMemory() && workerCount == idleNum && workerCount == DEFAULT_MIN_THREADS) {
483         auto worker = *(idleWorkers_.begin());
484         if (worker == nullptr || worker->clearWorkerSignal_ == nullptr) {
485             return;
486         }
487         if (worker->HasLongTask()) { // worker that has longTask should not be released
488             return;
489         }
490         if (worker->hasExecuted_) { // a worker that hasn't executed any tasks should not be released
491             TriggerShrink(DEFAULT_MIN_THREADS);
492             return;
493         }
494     }
495 
496     // Create a worker for performance
497     if (!ConcurrentHelper::IsLowMemory() && workers_.empty()) {
498         CreateWorkers(hostEnv_);
499     }
500     // stop the timer
501     if ((workerCount == idleNum && workerCount <= minThread) && timeoutWorkers_.empty()) {
502         suspend_ = true;
503         uv_timer_stop(timer_);
504         HILOG_DEBUG("taskpool:: timer will be suspended");
505     }
506 }
507 
508 void TaskManager::TriggerLoadBalance(const uv_timer_t* req)
509 {
510     TaskManager& taskManager = TaskManager::GetInstance();
511     taskManager.CheckForBlockedWorkers();
512     uint32_t targetNum = taskManager.ComputeSuitableThreadNum();
513     taskManager.NotifyShrink(targetNum);
514     taskManager.CountTraceForWorker();
515 }
516 
517 void TaskManager::TryExpand()
518 {
519     // dispatch task in the TaskPoolManager thread
520     NotifyExecuteTask();
521     // do not trigger when there are more idleWorkers than tasks
522     uint32_t idleNum = GetIdleWorkers();
523     if (idleNum > GetNonIdleTaskNum()) {
524         return;
525     }
526     needChecking_ = false; // do not need to check
527     uint32_t targetNum = ComputeSuitableIdleNum();
528     uint32_t workerCount = 0;
529     uint32_t idleCount = 0;
530     uint32_t timeoutWorkers = 0;
531     {
532         std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
533         idleCount = std::min(idleNum, static_cast<uint32_t>(idleWorkers_.size()));
534         workerCount = workers_.size();
535         timeoutWorkers = timeoutWorkers_.size();
536     }
537     uint32_t maxThreads = std::max(ConcurrentHelper::GetMaxThreads(), DEFAULT_THREADS);
538     maxThreads = (timeoutWorkers == 0) ? maxThreads : maxThreads + 2; // 2: extra threads
539     if (workerCount < maxThreads && idleCount < targetNum) {
540         uint32_t step = std::min(maxThreads, targetNum) - idleCount;
541         // Prevent the total number of expanded threads from exceeding maxThreads
542         if (step + workerCount > maxThreads) {
543             step = maxThreads - workerCount;
544         }
545         CreateWorkers(hostEnv_, step);
546         HILOG_INFO("taskpool:: maxThreads: %{public}u, created num: %{public}u, total num: %{public}u",
547             maxThreads, step, GetThreadNum());
548     }
549     if (UNLIKELY(suspend_)) {
550         suspend_ = false;
551         uv_timer_again(timer_);
552     }
553 }
554 
555 void TaskManager::NotifyExpand(const uv_async_t* req)
556 {
557     TaskManager& taskManager = TaskManager::GetInstance();
558     taskManager.TryExpand();
559 }
560 
561 void TaskManager::RunTaskManager()
562 {
563     loop_ = uv_loop_new();
564     if (loop_ == nullptr) { // LCOV_EXCL_BR_LINE
565         HILOG_FATAL("taskpool:: new loop failed.");
566         return;
567     }
568     ConcurrentHelper::UvHandleInit(loop_, expandHandle_, TaskManager::NotifyExpand);
569     timer_ = new uv_timer_t;
570     uv_timer_init(loop_, timer_);
571     uv_timer_start(timer_, reinterpret_cast<uv_timer_cb>(TaskManager::TriggerLoadBalance), 0, TRIGGER_INTERVAL);
572     isHandleInited_ = true;
573 #if defined IOS_PLATFORM || defined MAC_PLATFORM
574     pthread_setname_np("OS_TaskManager");
575 #else
576     pthread_setname_np(pthread_self(), "OS_TaskManager");
577 #endif
578     if (UNLIKELY(needChecking_)) {
579         needChecking_ = false;
580         uv_async_send(expandHandle_);
581     }
582     uv_run(loop_, UV_RUN_DEFAULT);
583     if (loop_ != nullptr) {
584         uv_loop_delete(loop_);
585     }
586 }
587 
588 void TaskManager::CancelTask(napi_env env, uint64_t taskId)
589 {
590     // 1. Cannot find taskInfo by executeId, throw error
591     // 2. Find executing taskInfo, skip it
592     // 3. Find waiting taskInfo, cancel it
593     // 4. Find canceled taskInfo, skip it
594     std::string strTrace = "CancelTask: taskId: " + std::to_string(taskId);
595     HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
596     HITRACE_HELPER_METER_NAME(strTrace);
597     Task* task = GetTask(taskId);
598     if (task == nullptr) {
599         std::string errMsg = "taskpool:: the task may not exist";
600         HILOG_ERROR("%{public}s", errMsg.c_str());
601         ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
602         return;
603     }
604     if (task->taskState_ == ExecuteState::CANCELED) {
605         HILOG_DEBUG("taskpool:: task has been canceled");
606         return;
607     }
608     if (task->IsGroupCommonTask()) {
609         // when task is a group common task, still check the state
610         if (task->currentTaskInfo_ == nullptr || task->taskState_ == ExecuteState::NOT_FOUND ||
611             task->taskState_ == ExecuteState::FINISHED || task->taskState_ == ExecuteState::ENDING) {
612             std::string errMsg = "taskpool:: task is not executed or has been executed";
613             HILOG_ERROR("%{public}s", errMsg.c_str());
614             ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
615             return;
616         }
617         TaskGroup* taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(task->groupId_);
618         if (taskGroup == nullptr) {
619             return;
620         }
621         return taskGroup->CancelGroupTask(env, task->taskId_);
622     }
623 
624     if (task->IsPeriodicTask()) {
625         napi_reference_unref(env, task->taskRef_, nullptr);
626         task->CancelPendingTask(env);
627         uv_timer_stop(task->timer_);
628         ConcurrentHelper::UvHandleClose(task->timer_);
629         return;
630     } else if (task->IsSeqRunnerTask()) {
631         CancelSeqRunnerTask(env, task);
632         return;
633     }
634     ExecuteState state = ExecuteState::NOT_FOUND;
635     {
636         std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
637         if ((task->currentTaskInfo_ == nullptr && task->taskState_ != ExecuteState::DELAYED) ||
638             task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED ||
639             task->taskState_ == ExecuteState::ENDING) {
640             std::string errMsg = "taskpool:: task is not executed or has been executed";
641             HILOG_ERROR("%{public}s", errMsg.c_str());
642             ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
643             return;
644         }
645         state = task->taskState_.exchange(ExecuteState::CANCELED);
646     }
647     task->ClearDelayedTimers();
648     task->CancelPendingTask(env);
649     std::list<napi_deferred> deferreds {};
650     {
651         std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
652         if (state == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr &&
653             EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority)) {
654             reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter();
655             task->DecreaseTaskRefCount();
656             DecreaseRefCount(env, task->taskId_);
657             deferreds.push_back(task->currentTaskInfo_->deferred);
658             napi_reference_unref(env, task->taskRef_, nullptr);
659             delete task->currentTaskInfo_;
660             task->currentTaskInfo_ = nullptr;
661             task->isCancelToFinish_ = true;
662         }
663         if (state == ExecuteState::DELAYED) {
664             task->isCancelToFinish_ = true;
665         }
666     }
667     std::string error = "taskpool:: task has been canceled";
668     BatchRejectDeferred(env, deferreds, error);
669 }
670 
671 void TaskManager::CancelSeqRunnerTask(napi_env env, Task* task)
672 {
673     if (task->taskState_ == ExecuteState::FINISHED) {
674         std::string errMsg = "taskpool:: sequenceRunner task has been executed";
675         HILOG_ERROR("%{public}s", errMsg.c_str());
676         ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
677     } else {
678         task->taskState_ = ExecuteState::CANCELED;
679     }
680 }
681 
682 void TaskManager::NotifyWorkerIdle(Worker* worker)
683 {
684     {
685         std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
686         if (worker->state_ == WorkerState::BLOCKED) {
687             return;
688         }
689         idleWorkers_.insert(worker);
690     }
691     if (GetTaskNum() != 0) {
692         NotifyExecuteTask();
693     }
694     CountTraceForWorker();
695 }
696 
697 void TaskManager::NotifyWorkerCreated(Worker* worker)
698 {
699     NotifyWorkerIdle(worker);
700 }
701 
702 void TaskManager::NotifyWorkerAdded(Worker* worker)
703 {
704     std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
705     workers_.insert(worker);
706     HILOG_DEBUG("taskpool:: a new worker has been added and the current num is %{public}zu", workers_.size());
707 }
708 
709 void TaskManager::NotifyWorkerRunning(Worker* worker)
710 {
711     std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
712     idleWorkers_.erase(worker);
713     CountTraceForWorker();
714 }
715 
716 uint32_t TaskManager::GetRunningWorkers()
717 {
718     std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
719     return std::count_if(workers_.begin(), workers_.end(), [](const auto& worker) {
720         return worker->runningCount_ != 0;
721     });
722 }
723 
724 uint32_t TaskManager::GetTimeoutWorkers()
725 {
726     std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
727     return timeoutWorkers_.size();
728 }
729 
730 uint32_t TaskManager::GetTaskNum()
731 {
732     std::lock_guard<std::mutex> lock(taskQueuesMutex_);
733     uint32_t sum = 0;
734     for (const auto& elements : taskQueues_) {
735         sum += elements->GetTaskNum();
736     }
737     return sum;
738 }
739 
740 uint32_t TaskManager::GetNonIdleTaskNum()
741 {
742     return nonIdleTaskNum_;
743 }
744 
745 void TaskManager::IncreaseNumIfNoIdle(Priority priority)
746 {
747     if (priority != Priority::IDLE) {
748         ++nonIdleTaskNum_;
749     }
750 }
751 
752 void TaskManager::DecreaseNumIfNoIdle(Priority priority)
753 {
754     if (priority != Priority::IDLE) {
755         --nonIdleTaskNum_;
756     }
757 }
758 
759 uint32_t TaskManager::GetThreadNum()
760 {
761     std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
762     return workers_.size();
763 }
764 
765 void TaskManager::EnqueueTaskId(uint64_t taskId, Priority priority)
766 {
767     {
768         std::lock_guard<std::mutex> lock(taskQueuesMutex_);
769         IncreaseNumIfNoIdle(priority);
770         taskQueues_[priority]->EnqueueTaskId(taskId);
771     }
772     TryTriggerExpand();
773     Task* task = GetTask(taskId);
774     if (task == nullptr) {
775         HILOG_FATAL("taskpool:: task is nullptr");
776         return;
777     }
778     task->IncreaseTaskRefCount();
779     if (task->onEnqueuedCallBackInfo_ != nullptr) {
780         task->ExecuteListenerCallback(task->onEnqueuedCallBackInfo_);
781     }
782 }
783 
784 bool TaskManager::EraseWaitingTaskId(uint64_t taskId, Priority priority)
785 {
786     std::lock_guard<std::mutex> lock(taskQueuesMutex_);
787     if (!taskQueues_[priority]->EraseWaitingTaskId(taskId)) {
788         HILOG_WARN("taskpool:: taskId is not in executeQueue when cancel");
789         return false;
790     }
791     return true;
792 }
793 
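// Dequeue policy: up to HIGH_PRIORITY_TASK_COUNT high-priority tasks are dispatched consecutively,
// then up to MEDIUM_PRIORITY_TASK_COUNT medium-priority tasks, then low-priority tasks;
// IDLE tasks are chosen only when the other queues are empty and no worker is busy (see IsChooseIdle).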
794 std::pair<uint64_t, Priority> TaskManager::DequeueTaskId()
795 {
796     std::lock_guard<std::mutex> lock(taskQueuesMutex_);
797     auto& highTaskQueue = taskQueues_[Priority::HIGH];
798     if (!highTaskQueue->IsEmpty() && highPrioExecuteCount_ < HIGH_PRIORITY_TASK_COUNT) {
799         highPrioExecuteCount_++;
800         return GetTaskByPriority(highTaskQueue, Priority::HIGH);
801     }
802     highPrioExecuteCount_ = 0;
803 
804     auto& mediumTaskQueue = taskQueues_[Priority::MEDIUM];
805     if (!mediumTaskQueue->IsEmpty() && mediumPrioExecuteCount_ < MEDIUM_PRIORITY_TASK_COUNT) {
806         mediumPrioExecuteCount_++;
807         return GetTaskByPriority(mediumTaskQueue, Priority::MEDIUM);
808     }
809     mediumPrioExecuteCount_ = 0;
810 
811     auto& lowTaskQueue = taskQueues_[Priority::LOW];
812     if (!lowTaskQueue->IsEmpty()) {
813         return GetTaskByPriority(lowTaskQueue, Priority::LOW);
814     }
815 
816     auto& idleTaskQueue = taskQueues_[Priority::IDLE];
817     if (highTaskQueue->IsEmpty() && mediumTaskQueue->IsEmpty() && !idleTaskQueue->IsEmpty() && IsChooseIdle()) {
818         return GetTaskByPriority(idleTaskQueue, Priority::IDLE);
819     }
820     return std::make_pair(0, Priority::LOW);
821 }
822 
823 bool TaskManager::IsChooseIdle()
824 {
825     std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
826     for (auto& worker : workers_) {
827         if (worker->state_ == WorkerState::IDLE) {
828             // If worker->state_ is WorkerState::IDLE, it means that the worker is free
829             continue;
830         }
831         // If there is a worker running a task, do not take the idle task.
832         return false;
833     }
834     // Only when all workers are free, will idle task be taken.
835     return true;
836 }
837 
838 std::pair<uint64_t, Priority> TaskManager::GetTaskByPriority(const std::unique_ptr<ExecuteQueue>& taskQueue,
839     Priority priority)
840 {
841     uint64_t taskId = taskQueue->DequeueTaskId();
842     if (IsDependendByTaskId(taskId)) {
843         EnqueuePendingTaskInfo(taskId, priority);
844         return std::make_pair(0, priority);
845     }
846     DecreaseNumIfNoIdle(priority);
847     return std::make_pair(taskId, priority);
848 }
849 
850 void TaskManager::NotifyExecuteTask()
851 {
852     std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
853     if (GetNonIdleTaskNum() == 0 && workers_.size() != idleWorkers_.size()) {
854         // When there are only idle tasks and workers executing them, it is not triggered
855         return;
856     }
857 
858     for (auto& worker : idleWorkers_) {
859         worker->NotifyExecuteTask();
860     }
861 }
862 
863 void TaskManager::InitTaskManager(napi_env env)
864 {
865     HITRACE_HELPER_METER_NAME("InitTaskManager");
866     if (!isInitialized_.exchange(true, std::memory_order_relaxed)) {
867 #if defined(ENABLE_TASKPOOL_FFRT)
868         globalEnableFfrtFlag_ = OHOS::system::GetIntParameter<int>("persist.commonlibrary.taskpoolglobalenableffrt", 0);
869         if (!globalEnableFfrtFlag_) {
870             UpdateSystemAppFlag();
871             if (IsSystemApp()) {
872                 disableFfrtFlag_ = OHOS::system::GetIntParameter<int>("persist.commonlibrary.taskpooldisableffrt", 0);
873             }
874         }
875         if (EnableFfrt()) {
876             HILOG_INFO("taskpool:: apps use ffrt");
877         } else {
878             HILOG_INFO("taskpool:: apps do not use ffrt");
879         }
880 #endif
881 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
882         mainThreadHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(
883             OHOS::AppExecFwk::EventRunner::GetMainEventRunner());
884 #endif
885         auto mainThreadEngine = NativeEngine::GetMainThreadEngine();
886         if (mainThreadEngine == nullptr) {
887             HILOG_FATAL("taskpool:: mainThreadEngine is nullptr");
888             return;
889         }
890         hostEnv_ = reinterpret_cast<napi_env>(mainThreadEngine);
891         // Add a reserved thread for taskpool
892         CreateWorkers(hostEnv_);
893         // Create a timer to manage worker threads
894         std::thread workerManager([this] {this->RunTaskManager();});
895         workerManager.detach();
896     }
897 }
898 
899 void TaskManager::CreateWorkers(napi_env env, uint32_t num)
900 {
901     HILOG_DEBUG("taskpool:: CreateWorkers, num:%{public}u", num);
902     for (uint32_t i = 0; i < num; i++) {
903         auto worker = Worker::WorkerConstructor(env);
904         NotifyWorkerAdded(worker);
905     }
906     CountTraceForWorker();
907 }
908 
909 void TaskManager::RemoveWorker(Worker* worker)
910 {
911     std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
912     idleWorkers_.erase(worker);
913     timeoutWorkers_.erase(worker);
914     workers_.erase(worker);
915 }
916 
917 void TaskManager::RestoreWorker(Worker* worker)
918 {
919     std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
920     if (UNLIKELY(suspend_)) {
921         suspend_ = false;
922         uv_timer_again(timer_);
923     }
924     if (worker->state_ == WorkerState::BLOCKED) {
925         // since the worker is blocked, we should add it to the timeout set
926         timeoutWorkers_.insert(worker);
927         return;
928     }
929     // Since the worker may be executing some tasks in IO thread, we should add it to the
930     // worker sets and call the 'NotifyWorkerIdle', which can still execute some tasks in its own thread.
931     HILOG_DEBUG("taskpool:: worker has been restored and the current num is: %{public}zu", workers_.size());
932     idleWorkers_.emplace_hint(idleWorkers_.end(), worker);
933     if (GetTaskNum() != 0) {
934         NotifyExecuteTask();
935     }
936 }
937 
938 // ---------------------------------- SendData ---------------------------------------
939 void TaskManager::RegisterCallback(napi_env env, uint64_t taskId, std::shared_ptr<CallbackInfo> callbackInfo)
940 {
941     std::lock_guard<std::mutex> lock(callbackMutex_);
942     callbackTable_[taskId] = callbackInfo;
943 }
944 
945 std::shared_ptr<CallbackInfo> TaskManager::GetCallbackInfo(uint64_t taskId)
946 {
947     std::lock_guard<std::mutex> lock(callbackMutex_);
948     auto iter = callbackTable_.find(taskId);
949     if (iter == callbackTable_.end() || iter->second == nullptr) {
950         HILOG_ERROR("taskpool:: the callback does not exist");
951         return nullptr;
952     }
953     return iter->second;
954 }
955 
956 void TaskManager::IncreaseRefCount(uint64_t taskId)
957 {
958     if (taskId == 0) { // do not support func
959         return;
960     }
961     std::lock_guard<std::mutex> lock(callbackMutex_);
962     auto iter = callbackTable_.find(taskId);
963     if (iter == callbackTable_.end() || iter->second == nullptr) {
964         return;
965     }
966     iter->second->refCount++;
967 }
968 
969 void TaskManager::DecreaseRefCount(napi_env env, uint64_t taskId)
970 {
971     if (taskId == 0) { // do not support func
972         return;
973     }
974     std::lock_guard<std::mutex> lock(callbackMutex_);
975     auto iter = callbackTable_.find(taskId);
976     if (iter == callbackTable_.end() || iter->second == nullptr) {
977         return;
978     }
979 
980     auto task = reinterpret_cast<Task*>(taskId);
981     if (!task->IsValid()) {
982         callbackTable_.erase(iter);
983         return;
984     }
985 
986     iter->second->refCount--;
987     if (iter->second->refCount == 0) {
988         callbackTable_.erase(iter);
989     }
990 }
991 
992 void TaskManager::ResetCallbackInfoWorker(const std::shared_ptr<CallbackInfo>& callbackInfo)
993 {
994     std::lock_guard<std::mutex> lock(callbackMutex_);
995     callbackInfo->worker = nullptr;
996 }
997 
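// Called on a worker thread when a task sends data back to the host: the result is queued on the worker,
// and the host is woken either by posting to the main-thread EventHandler or via the onCallbackSignal uv_async handle.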
998 napi_value TaskManager::NotifyCallbackExecute(napi_env env, TaskResultInfo* resultInfo, Task* task)
999 {
1000     HILOG_DEBUG("taskpool:: task:%{public}s NotifyCallbackExecute", std::to_string(task->taskId_).c_str());
1001     std::lock_guard<std::mutex> lock(callbackMutex_);
1002     auto iter = callbackTable_.find(task->taskId_);
1003     if (iter == callbackTable_.end() || iter->second == nullptr) {
1004         HILOG_ERROR("taskpool:: the callback in SendData is not registered on the host side");
1005         ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_REGISTERED);
1006         delete resultInfo;
1007         return nullptr;
1008     }
1009     Worker* worker = static_cast<Worker*>(task->worker_);
1010     worker->Enqueue(task->env_, resultInfo);
1011     auto callbackInfo = iter->second;
1012     callbackInfo->refCount++;
1013     callbackInfo->worker = worker;
1014     auto workerEngine = reinterpret_cast<NativeEngine*>(env);
1015     workerEngine->IncreaseListeningCounter();
1016 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
1017     if (task->IsMainThreadTask()) {
1018         HITRACE_HELPER_METER_NAME("NotifyCallbackExecute: PostTask");
1019         auto onCallbackTask = [callbackInfo]() {
1020             TaskPool::ExecuteCallbackTask(callbackInfo.get());
1021         };
1022         TaskManager::GetInstance().PostTask(onCallbackTask, "TaskPoolOnCallbackTask", worker->priority_);
1023     } else {
1024         callbackInfo->onCallbackSignal->data = callbackInfo.get();
1025         uv_async_send(callbackInfo->onCallbackSignal);
1026     }
1027 #else
1028     callbackInfo->onCallbackSignal->data = callbackInfo.get();
1029     uv_async_send(callbackInfo->onCallbackSignal);
1030 #endif
1031     return nullptr;
1032 }
1033 
1034 MsgQueue* TaskManager::GetMessageQueue(const uv_async_t* req)
1035 {
1036     std::lock_guard<std::mutex> lock(callbackMutex_);
1037     auto info = static_cast<CallbackInfo*>(req->data);
1038     if (info == nullptr || info->worker == nullptr) {
1039         HILOG_WARN("taskpool:: info or worker is nullptr");
1040         return nullptr;
1041     }
1042     auto worker = info->worker;
1043     MsgQueue* queue = nullptr;
1044     worker->Dequeue(info->hostEnv, queue);
1045     return queue;
1046 }
1047 
1048 MsgQueue* TaskManager::GetMessageQueueFromCallbackInfo(CallbackInfo* callbackInfo)
1049 {
1050     std::lock_guard<std::mutex> lock(callbackMutex_);
1051     if (callbackInfo == nullptr || callbackInfo->worker == nullptr) {
1052         HILOG_WARN("taskpool:: callbackInfo or worker is nullptr");
1053         return nullptr;
1054     }
1055     auto worker = callbackInfo->worker;
1056     MsgQueue* queue = nullptr;
1057     worker->Dequeue(callbackInfo->hostEnv, queue);
1058     return queue;
1059 }
1060 // ---------------------------------- SendData ---------------------------------------
1061 
1062 void TaskManager::NotifyDependencyTaskInfo(uint64_t taskId)
1063 {
1064     HILOG_DEBUG("taskpool:: task:%{public}s NotifyDependencyTaskInfo", std::to_string(taskId).c_str());
1065     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
1066     std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1067     auto iter = dependentTaskInfos_.find(taskId);
1068     if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
1069         HILOG_DEBUG("taskpool:: dependentTaskInfo empty");
1070         return;
1071     }
1072     for (auto taskIdIter = iter->second.begin(); taskIdIter != iter->second.end();) {
1073         auto taskInfo = DequeuePendingTaskInfo(*taskIdIter);
1074         RemoveDependencyById(taskId, *taskIdIter);
1075         taskIdIter = iter->second.erase(taskIdIter);
1076         if (taskInfo.first != 0) {
1077             EnqueueTaskId(taskInfo.first, taskInfo.second);
1078         }
1079     }
1080 }
1081 
1082 void TaskManager::RemoveDependencyById(uint64_t dependentTaskId, uint64_t taskId)
1083 {
1084     HILOG_DEBUG("taskpool::task:%{public}s RemoveDependencyById", std::to_string(taskId).c_str());
1085     // remove the dependency after the task has executed
1086     std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1087     auto dependTaskIter = dependTaskInfos_.find(taskId);
1088     if (dependTaskIter != dependTaskInfos_.end()) {
1089         auto dependTaskInnerIter = dependTaskIter->second.find(dependentTaskId);
1090         if (dependTaskInnerIter != dependTaskIter->second.end()) {
1091             dependTaskIter->second.erase(dependTaskInnerIter);
1092         }
1093     }
1094 }
1095 
1096 bool TaskManager::IsDependendByTaskId(uint64_t taskId)
1097 {
1098     std::shared_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1099     auto iter = dependTaskInfos_.find(taskId);
1100     if (iter == dependTaskInfos_.end() || iter->second.empty()) {
1101         return false;
1102     }
1103     return true;
1104 }
1105 
1106 bool TaskManager::IsDependentByTaskId(uint64_t dependentTaskId)
1107 {
1108     std::shared_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1109     auto iter = dependentTaskInfos_.find(dependentTaskId);
1110     if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
1111         return false;
1112     }
1113     return true;
1114 }
1115 
1116 bool TaskManager::StoreTaskDependency(uint64_t taskId, std::set<uint64_t> taskIdSet)
1117 {
1118     HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDependency", std::to_string(taskId).c_str());
1119     StoreDependentTaskInfo(taskIdSet, taskId);
1120     std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1121     auto iter = dependTaskInfos_.find(taskId);
1122     if (iter == dependTaskInfos_.end()) {
1123         for (const auto& dependentId : taskIdSet) {
1124             auto idIter = dependTaskInfos_.find(dependentId);
1125             if (idIter == dependTaskInfos_.end()) {
1126                 continue;
1127             }
1128             if (!CheckCircularDependency(taskIdSet, idIter->second, taskId)) {
1129                 return false;
1130             }
1131         }
1132         dependTaskInfos_.emplace(taskId, std::move(taskIdSet));
1133         return true;
1134     }
1135 
1136     for (const auto& dependentId : iter->second) {
1137         auto idIter = dependTaskInfos_.find(dependentId);
1138         if (idIter == dependTaskInfos_.end()) {
1139             continue;
1140         }
1141         if (!CheckCircularDependency(iter->second, idIter->second, taskId)) {
1142             return false;
1143         }
1144     }
1145     iter->second.insert(taskIdSet.begin(), taskIdSet.end());
1146     return true;
1147 }
1148 
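// Depth-first walk of the dependency graph: returns false if taskId is reachable from idSet,
// i.e. adding the pending dependency would introduce a cycle.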
1149 bool TaskManager::CheckCircularDependency(std::set<uint64_t> dependentIdSet, std::set<uint64_t> idSet, uint64_t taskId)
1150 {
1151     for (const auto& id : idSet) {
1152         if (id == taskId) {
1153             return false;
1154         }
1155         auto iter = dependentIdSet.find(id);
1156         if (iter != dependentIdSet.end()) {
1157             continue;
1158         }
1159         auto dIter = dependTaskInfos_.find(id);
1160         if (dIter == dependTaskInfos_.end()) {
1161             continue;
1162         }
1163         if (!CheckCircularDependency(dependentIdSet, dIter->second, taskId)) {
1164             return false;
1165         }
1166     }
1167     return true;
1168 }
1169 
1170 bool TaskManager::RemoveTaskDependency(uint64_t taskId, uint64_t dependentId)
1171 {
1172     HILOG_DEBUG("taskpool:: task:%{public}s RemoveTaskDependency", std::to_string(taskId).c_str());
1173     RemoveDependentTaskInfo(dependentId, taskId);
1174     std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1175     auto iter = dependTaskInfos_.find(taskId);
1176     if (iter == dependTaskInfos_.end()) {
1177         return false;
1178     }
1179     auto dependIter = iter->second.find(dependentId);
1180         if (dependIter == iter->second.end()) {
1181         return false;
1182     }
1183     iter->second.erase(dependIter);
1184     return true;
1185 }
1186 
1187 void TaskManager::EnqueuePendingTaskInfo(uint64_t taskId, Priority priority)
1188 {
1189     if (taskId == 0) {
1190         return;
1191     }
1192     std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
1193     pendingTaskInfos_.emplace(taskId, priority);
1194 }
1195 
1196 std::pair<uint64_t, Priority> TaskManager::DequeuePendingTaskInfo(uint64_t taskId)
1197 {
1198     std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
1199     if (pendingTaskInfos_.empty()) {
1200         return std::make_pair(0, Priority::DEFAULT);
1201     }
1202     std::pair<uint64_t, Priority> result;
1203     for (auto it = pendingTaskInfos_.begin(); it != pendingTaskInfos_.end(); ++it) {
1204         if (it->first == taskId) {
1205             result = std::make_pair(it->first, it->second);
1206             it = pendingTaskInfos_.erase(it);
1207             break;
1208         }
1209     }
1210     return result;
1211 }
1212 
1213 void TaskManager::RemovePendingTaskInfo(uint64_t taskId)
1214 {
1215     HILOG_DEBUG("taskpool:: task:%{public}s RemovePendingTaskInfo", std::to_string(taskId).c_str());
1216     std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
1217     pendingTaskInfos_.erase(taskId);
1218 }
1219 
1220 void TaskManager::StoreDependentTaskInfo(std::set<uint64_t> dependentTaskIdSet, uint64_t taskId)
1221 {
1222     HILOG_DEBUG("taskpool:: task:%{public}s StoreDependentTaskInfo", std::to_string(taskId).c_str());
1223     std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1224     for (const auto& id : dependentTaskIdSet) {
1225         auto iter = dependentTaskInfos_.find(id);
1226         if (iter == dependentTaskInfos_.end()) {
1227             std::set<uint64_t> set{taskId};
1228             dependentTaskInfos_.emplace(id, std::move(set));
1229         } else {
1230             iter->second.emplace(taskId);
1231         }
1232     }
1233 }
1234 
1235 void TaskManager::RemoveDependentTaskInfo(uint64_t dependentTaskId, uint64_t taskId)
1236 {
1237     HILOG_DEBUG("taskpool:: task:%{public}s RemoveDependentTaskInfo", std::to_string(taskId).c_str());
1238     std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1239     auto iter = dependentTaskInfos_.find(dependentTaskId);
1240     if (iter == dependentTaskInfos_.end()) {
1241         return;
1242     }
1243     auto taskIter = iter->second.find(taskId);
1244     if (taskIter == iter->second.end()) {
1245         return;
1246     }
1247     iter->second.erase(taskIter);
1248 }
1249 
1250 std::string TaskManager::GetTaskDependInfoToString(uint64_t taskId)
1251 {
1252     std::shared_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1253     std::string str = "TaskInfos: taskId: " + std::to_string(taskId) + ", dependTaskId:";
1254     auto iter = dependTaskInfos_.find(taskId);
1255     if (iter != dependTaskInfos_.end()) {
1256         for (const auto& id : iter->second) {
1257             str += " " + std::to_string(id);
1258         }
1259     }
1260     return str;
1261 }
1262 
1263 void TaskManager::StoreTaskDuration(uint64_t taskId, uint64_t totalDuration, uint64_t cpuDuration)
1264 {
1265     HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDuration", std::to_string(taskId).c_str());
1266     std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
1267     auto iter = taskDurationInfos_.find(taskId);
1268     if (iter == taskDurationInfos_.end()) {
1269         std::pair<uint64_t, uint64_t> durationData = std::make_pair(totalDuration, cpuDuration);
1270         taskDurationInfos_.emplace(taskId, std::move(durationData));
1271     } else {
1272         if (totalDuration != 0) {
1273             iter->second.first = totalDuration;
1274         }
1275         if (cpuDuration != 0) {
1276             iter->second.second = cpuDuration;
1277         }
1278     }
1279 }
1280 
1281 uint64_t TaskManager::GetTaskDuration(uint64_t taskId, std::string durationType)
1282 {
1283     std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
1284     auto iter = taskDurationInfos_.find(taskId);
1285     if (iter == taskDurationInfos_.end()) {
1286         return 0;
1287     }
1288     if (durationType == TASK_TOTAL_TIME) {
1289         return iter->second.first;
1290     } else if (durationType == TASK_CPU_TIME) {
1291         return iter->second.second;
1292     } else if (iter->second.first == 0) {
1293         return 0;
1294     }
1295     return iter->second.first - iter->second.second;
1296 }
1297 
1298 std::string TaskManager::GetTaskName(uint64_t taskId)
1299 {
1300     std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
1301     auto iter = tasks_.find(taskId);
1302     if (iter == tasks_.end()) {
1303         return "";
1304     }
1305     return iter->second->name_;
1306 }
1307 
1308 void TaskManager::RemoveTaskDuration(uint64_t taskId)
1309 {
1310     HILOG_DEBUG("taskpool:: task:%{public}s RemoveTaskDuration", std::to_string(taskId).c_str());
1311     std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
1312     auto iter = taskDurationInfos_.find(taskId);
1313     if (iter != taskDurationInfos_.end()) {
1314         taskDurationInfos_.erase(iter);
1315     }
1316 }
1317 
1318 void TaskManager::StoreLongTaskInfo(uint64_t taskId, Worker* worker)
1319 {
1320     std::unique_lock<std::shared_mutex> lock(longTasksMutex_);
1321     longTasksMap_.emplace(taskId, worker);
1322 }
1323 
1324 void TaskManager::RemoveLongTaskInfo(uint64_t taskId)
1325 {
1326     std::unique_lock<std::shared_mutex> lock(longTasksMutex_);
1327     longTasksMap_.erase(taskId);
1328 }
1329 
1330 Worker* TaskManager::GetLongTaskInfo(uint64_t taskId)
1331 {
1332     std::shared_lock<std::shared_mutex> lock(longTasksMutex_);
1333     auto iter = longTasksMap_.find(taskId);
1334     return iter != longTasksMap_.end() ? iter->second : nullptr;
1335 }
1336 
1337 void TaskManager::TerminateTask(uint64_t taskId)
1338 {
1339     HILOG_DEBUG("taskpool:: task:%{public}s TerminateTask", std::to_string(taskId).c_str());
1340     auto worker = GetLongTaskInfo(taskId);
1341     if (UNLIKELY(worker == nullptr)) {
1342         return;
1343     }
1344     worker->TerminateTask(taskId);
1345     RemoveLongTaskInfo(taskId);
1346 }
1347 
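// Release all per-task bookkeeping: optionally drop the task from tasks_, close or free the
// result signal, delete the current task info, cancel pending executions and delayed timers,
// and (for non-function tasks) clear duration, pending-task and dependency records.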
void TaskManager::ReleaseTaskData(napi_env env, Task* task, bool shouldDeleteTask)
{
    uint64_t taskId = task->taskId_;
    if (shouldDeleteTask) {
        RemoveTask(taskId);
    }
    if (task->onResultSignal_ != nullptr) {
        if (!uv_is_closing((uv_handle_t*)task->onResultSignal_)) {
            ConcurrentHelper::UvHandleClose(task->onResultSignal_);
        } else {
            delete task->onResultSignal_;
        }
        task->onResultSignal_ = nullptr;
    }

    if (task->currentTaskInfo_ != nullptr) {
        delete task->currentTaskInfo_;
        task->currentTaskInfo_ = nullptr;
    }

    task->CancelPendingTask(env);

    task->ClearDelayedTimers();

    if (task->IsFunctionTask() || task->IsGroupFunctionTask()) {
        return;
    }
    DecreaseRefCount(env, taskId);
    RemoveTaskDuration(taskId);
    RemovePendingTaskInfo(taskId);
    ReleaseCallBackInfo(task);
    {
        std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
        for (auto dependentTaskIter = dependentTaskInfos_.begin(); dependentTaskIter != dependentTaskInfos_.end();) {
            if (dependentTaskIter->second.find(taskId) != dependentTaskIter->second.end()) {
                dependentTaskIter = dependentTaskInfos_.erase(dependentTaskIter);
            } else {
                ++dependentTaskIter;
            }
        }
    }
    std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
    auto dependTaskIter = dependTaskInfos_.find(taskId);
    if (dependTaskIter != dependTaskInfos_.end()) {
        dependTaskInfos_.erase(dependTaskIter);
    }
}

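// Free the enqueue/start/failure/success callback info attached to a task and close the
// onStartExecution uv signal; when the event-handler path is enabled, main-thread tasks
// skip the signal handling.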
void TaskManager::ReleaseCallBackInfo(Task* task)
{
    HILOG_DEBUG("taskpool:: ReleaseCallBackInfo task:%{public}s", std::to_string(task->taskId_).c_str());
    if (task->onEnqueuedCallBackInfo_ != nullptr) {
        delete task->onEnqueuedCallBackInfo_;
        task->onEnqueuedCallBackInfo_ = nullptr;
    }

    if (task->onStartExecutionCallBackInfo_ != nullptr) {
        delete task->onStartExecutionCallBackInfo_;
        task->onStartExecutionCallBackInfo_ = nullptr;
    }

    if (task->onExecutionFailedCallBackInfo_ != nullptr) {
        delete task->onExecutionFailedCallBackInfo_;
        task->onExecutionFailedCallBackInfo_ = nullptr;
    }

    if (task->onExecutionSucceededCallBackInfo_ != nullptr) {
        delete task->onExecutionSucceededCallBackInfo_;
        task->onExecutionSucceededCallBackInfo_ = nullptr;
    }

#if defined(ENABLE_TASKPOOL_EVENTHANDLER)
    if (!task->IsMainThreadTask() && task->onStartExecutionSignal_ != nullptr) {
        if (!uv_is_closing((uv_handle_t*)task->onStartExecutionSignal_)) {
            ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_);
        } else {
            delete task->onStartExecutionSignal_;
        }
        task->onStartExecutionSignal_ = nullptr;
    }
#else
    if (task->onStartExecutionSignal_ != nullptr) {
        if (!uv_is_closing((uv_handle_t*)task->onStartExecutionSignal_)) {
            ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_);
        } else {
            delete task->onStartExecutionSignal_;
        }
        task->onStartExecutionSignal_ = nullptr;
    }
#endif
}

void TaskManager::StoreTask(uint64_t taskId, Task* task)
{
    std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
    tasks_.emplace(taskId, task);
}

void TaskManager::RemoveTask(uint64_t taskId)
{
    std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
    tasks_.erase(taskId);
}

Task* TaskManager::GetTask(uint64_t taskId)
{
    std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
    auto iter = tasks_.find(taskId);
    if (iter == tasks_.end()) {
        return nullptr;
    }
    return iter->second;
}

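// Query the bundle manager through the system ability manager to learn whether the current
// bundle is a system application; the result is cached in isSystemApp_. Only compiled in
// when the FFRT backend is enabled.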
#if defined(ENABLE_TASKPOOL_FFRT)
void TaskManager::UpdateSystemAppFlag()
{
    auto abilityManager = OHOS::SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
    if (abilityManager == nullptr) {
        HILOG_ERROR("taskpool:: fail to GetSystemAbility, abilityManager is nullptr.");
        return;
    }
    auto bundleObj = abilityManager->GetSystemAbility(OHOS::BUNDLE_MGR_SERVICE_SYS_ABILITY_ID);
    if (bundleObj == nullptr) {
        HILOG_ERROR("taskpool:: fail to get bundle manager service.");
        return;
    }
    auto bundleMgr = OHOS::iface_cast<OHOS::AppExecFwk::IBundleMgr>(bundleObj);
    if (bundleMgr == nullptr) {
        HILOG_ERROR("taskpool:: Bundle manager is nullptr.");
        return;
    }
    OHOS::AppExecFwk::BundleInfo bundleInfo;
    if (bundleMgr->GetBundleInfoForSelf(
        static_cast<int32_t>(OHOS::AppExecFwk::GetBundleInfoFlag::GET_BUNDLE_INFO_WITH_APPLICATION), bundleInfo)
        != OHOS::ERR_OK) {
        HILOG_ERROR("taskpool:: fail to GetBundleInfoForSelf");
        return;
    }
    isSystemApp_ = bundleInfo.applicationInfo.isSystemApp;
}
#endif

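// Post a closure to the main-thread EventHandler, translating the taskpool Priority into the
// corresponding EventQueue priority via TASK_EVENTHANDLER_PRIORITY_MAP. Illustrative call
// (assumes the event-handler build and an initialized mainThreadHandler_):
//     TaskManager::GetInstance().PostTask([] { /* run on main thread */ }, "taskName", Priority::HIGH);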
#if defined(ENABLE_TASKPOOL_EVENTHANDLER)
bool TaskManager::PostTask(std::function<void()> task, const char* taskName, Priority priority)
{
    return mainThreadHandler_->PostTask(task, taskName, 0, TASK_EVENTHANDLER_PRIORITY_MAP.at(priority));
}
#endif

bool TaskManager::CheckTask(uint64_t taskId)
{
    std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
    auto item = tasks_.find(taskId);
    return item != tasks_.end();
}

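// Reject every deferred in the list with a single error object built from the given message,
// typically used when a batch of pending promises has to fail for the same reason.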
void TaskManager::BatchRejectDeferred(napi_env env, std::list<napi_deferred> deferreds, std::string error)
{
    if (deferreds.empty()) {
        return;
    }
    napi_value message = ErrorHelper::NewError(env, 0, error.c_str());
    for (auto deferred : deferreds) {
        napi_reject_deferred(env, deferred, message);
    }
}

// ----------------------------------- TaskGroupManager ----------------------------------------
TaskGroupManager& TaskGroupManager::GetInstance()
{
    static TaskGroupManager groupManager;
    return groupManager;
}

void TaskGroupManager::AddTask(uint64_t groupId, napi_ref taskRef, uint64_t taskId)
{
    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
    auto groupIter = taskGroups_.find(groupId);
    if (groupIter == taskGroups_.end()) {
        HILOG_DEBUG("taskpool:: taskGroup has been released");
        return;
    }
    auto taskGroup = reinterpret_cast<TaskGroup*>(groupIter->second);
    if (taskGroup == nullptr) {
        HILOG_ERROR("taskpool:: taskGroup is null");
        return;
    }
    taskGroup->taskRefs_.push_back(taskRef);
    taskGroup->taskNum_++;
    taskGroup->taskIds_.push_back(taskId);
}

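// Drop the group from the manager, release the per-task references held by a still-valid
// group, free the current group info, and cancel any pending group executions.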
void TaskGroupManager::ReleaseTaskGroupData(napi_env env, TaskGroup* group)
{
    HILOG_DEBUG("taskpool:: ReleaseTaskGroupData group");
    TaskGroupManager::GetInstance().RemoveTaskGroup(group->groupId_);
    {
        std::lock_guard<RECURSIVE_MUTEX> lock(group->taskGroupMutex_);
        if (group->isValid_) {
            for (uint64_t taskId : group->taskIds_) {
                Task* task = TaskManager::GetInstance().GetTask(taskId);
                if (task == nullptr || !task->IsValid()) {
                    continue;
                }
                napi_reference_unref(task->env_, task->taskRef_, nullptr);
            }
        }

        if (group->currentGroupInfo_ != nullptr) {
            delete group->currentGroupInfo_;
            group->currentGroupInfo_ = nullptr;
        }
    }
    group->CancelPendingGroup(env);
}

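// Cancel a running or waiting task group: mark it CANCELED, cancel its pending executions,
// try to cancel each member task that is still waiting, and reject the group promise once no
// unfinished tasks remain. Groups that never started or already finished throw
// ERR_CANCEL_NONEXIST_TASK_GROUP instead.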
void TaskGroupManager::CancelGroup(napi_env env, uint64_t groupId)
{
    std::string strTrace = "CancelGroup: groupId: " + std::to_string(groupId);
    HITRACE_HELPER_METER_NAME(strTrace);
    HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
    TaskGroup* taskGroup = GetTaskGroup(groupId);
    if (taskGroup == nullptr) {
        HILOG_ERROR("taskpool:: CancelGroup group is nullptr");
        return;
    }
    if (taskGroup->groupState_ == ExecuteState::CANCELED) {
        return;
    }
    std::lock_guard<RECURSIVE_MUTEX> lock(taskGroup->taskGroupMutex_);
    if (taskGroup->currentGroupInfo_ == nullptr || taskGroup->groupState_ == ExecuteState::NOT_FOUND ||
        taskGroup->groupState_ == ExecuteState::FINISHED) {
        std::string errMsg = "taskpool:: taskGroup is not executed or has been executed";
        HILOG_ERROR("%{public}s", errMsg.c_str());
        ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK_GROUP, errMsg.c_str());
        return;
    }
    ExecuteState groupState = taskGroup->groupState_;
    taskGroup->groupState_ = ExecuteState::CANCELED;
    taskGroup->CancelPendingGroup(env);
    if (taskGroup->currentGroupInfo_->finishedTaskNum != taskGroup->taskNum_) {
        for (uint64_t taskId : taskGroup->taskIds_) {
            CancelGroupTask(env, taskId, taskGroup);
        }
        if (taskGroup->currentGroupInfo_->finishedTaskNum == taskGroup->taskNum_) {
            napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: taskGroup has been canceled");
            taskGroup->RejectResult(env, error);
            return;
        }
    }
    if (groupState == ExecuteState::WAITING && taskGroup->currentGroupInfo_ != nullptr) {
        auto engine = reinterpret_cast<NativeEngine*>(env);
        for (size_t i = 0; i < taskGroup->taskIds_.size(); i++) {
            engine->DecreaseSubEnvCounter();
        }
        napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: taskGroup has been canceled");
        taskGroup->RejectResult(env, error);
    }
}

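// Cancel a single member task of a group: if it is still waiting in the execute queue,
// remove it, drop its references and task info, and count it as finished for the group;
// in every case the task state ends up CANCELED.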
void TaskGroupManager::CancelGroupTask(napi_env env, uint64_t taskId, TaskGroup* group)
{
    HILOG_DEBUG("taskpool:: CancelGroupTask task:%{public}s", std::to_string(taskId).c_str());
    auto task = TaskManager::GetInstance().GetTask(taskId);
    if (task == nullptr) {
        HILOG_INFO("taskpool:: CancelGroupTask task is nullptr");
        return;
    }
    std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
    if (task->taskState_ == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr &&
        TaskManager::GetInstance().EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority)) {
        reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter();
        task->DecreaseTaskRefCount();
        TaskManager::GetInstance().DecreaseRefCount(env, taskId);
        delete task->currentTaskInfo_;
        task->currentTaskInfo_ = nullptr;
        if (group->currentGroupInfo_ != nullptr) {
            group->currentGroupInfo_->finishedTaskNum++;
        }
    }
    task->taskState_ = ExecuteState::CANCELED;
}

void TaskGroupManager::StoreSequenceRunner(uint64_t seqRunnerId, SequenceRunner* seqRunner)
{
    std::unique_lock<std::mutex> lock(seqRunnersMutex_);
    seqRunners_.emplace(seqRunnerId, seqRunner);
}

void TaskGroupManager::RemoveSequenceRunner(uint64_t seqRunnerId)
{
    std::unique_lock<std::mutex> lock(seqRunnersMutex_);
    seqRunners_.erase(seqRunnerId);
}

SequenceRunner* TaskGroupManager::GetSeqRunner(uint64_t seqRunnerId)
{
    std::unique_lock<std::mutex> lock(seqRunnersMutex_);
    auto iter = seqRunners_.find(seqRunnerId);
    if (iter != seqRunners_.end()) {
        return iter->second;
    }
    HILOG_DEBUG("taskpool:: sequenceRunner has been released.");
    return nullptr;
}

void TaskGroupManager::AddTaskToSeqRunner(uint64_t seqRunnerId, Task* task)
{
    std::unique_lock<std::mutex> lock(seqRunnersMutex_);
    auto iter = seqRunners_.find(seqRunnerId);
    if (iter == seqRunners_.end()) {
        HILOG_ERROR("seqRunner:: seqRunner not found.");
        return;
    }
    std::unique_lock<std::shared_mutex> seqRunnerLock(iter->second->seqRunnerMutex_);
    iter->second->seqRunnerTasks_.push_back(task);
}

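// Called after a sequence-runner task finishes: only the task recorded as currently running
// may trigger the runner. Canceled tasks at the head of the queue are disposed of until a
// runnable task is found, which is then marked WAITING and enqueued with the runner's priority.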
bool TaskGroupManager::TriggerSeqRunner(napi_env env, Task* lastTask)
{
    uint64_t seqRunnerId = lastTask->seqRunnerId_;
    SequenceRunner* seqRunner = GetSeqRunner(seqRunnerId);
    if (seqRunner == nullptr) {
        HILOG_ERROR("seqRunner:: seqRunner to trigger does not exist.");
        return false;
    }
    if (!SequenceRunnerManager::GetInstance().TriggerGlobalSeqRunner(env, seqRunner)) {
        HILOG_ERROR("seqRunner:: failed to trigger globalSeqRunner.");
        return false;
    }
    if (seqRunner->currentTaskId_ != lastTask->taskId_) {
        HILOG_ERROR("seqRunner:: only the front task can trigger the seqRunner.");
        return false;
    }
    {
        std::unique_lock<std::shared_mutex> lock(seqRunner->seqRunnerMutex_);
        if (seqRunner->seqRunnerTasks_.empty()) {
            HILOG_DEBUG("seqRunner:: seqRunner %{public}s empty.", std::to_string(seqRunnerId).c_str());
            seqRunner->currentTaskId_ = 0;
            return true;
        }
        Task* task = seqRunner->seqRunnerTasks_.front();
        seqRunner->seqRunnerTasks_.pop_front();
        while (task->taskState_ == ExecuteState::CANCELED) {
            DisposeCanceledTask(env, task);
            if (seqRunner->seqRunnerTasks_.empty()) {
                HILOG_DEBUG("seqRunner:: seqRunner %{public}s empty in cancel loop.",
                            std::to_string(seqRunnerId).c_str());
                seqRunner->currentTaskId_ = 0;
                return true;
            }
            task = seqRunner->seqRunnerTasks_.front();
            seqRunner->seqRunnerTasks_.pop_front();
        }
        seqRunner->currentTaskId_ = task->taskId_;
        task->IncreaseRefCount();
        task->taskState_ = ExecuteState::WAITING;
        HILOG_DEBUG("seqRunner:: Trigger task %{public}s in seqRunner %{public}s.",
                    std::to_string(task->taskId_).c_str(), std::to_string(seqRunnerId).c_str());
        TaskManager::GetInstance().EnqueueTaskId(task->taskId_, seqRunner->priority_);
    }
    return true;
}

void TaskGroupManager::DisposeCanceledTask(napi_env env, Task* task)
{
    napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: sequenceRunner task has been canceled");
    napi_reject_deferred(env, task->currentTaskInfo_->deferred, error);
    reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter();
    napi_reference_unref(env, task->taskRef_, nullptr);
    delete task->currentTaskInfo_;
    task->currentTaskInfo_ = nullptr;
}

void TaskGroupManager::StoreTaskGroup(uint64_t groupId, TaskGroup* taskGroup)
{
    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
    taskGroups_.emplace(groupId, taskGroup);
}

void TaskGroupManager::RemoveTaskGroup(uint64_t groupId)
{
    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
    taskGroups_.erase(groupId);
}

TaskGroup* TaskGroupManager::GetTaskGroup(uint64_t groupId)
{
    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
    auto groupIter = taskGroups_.find(groupId);
    if (groupIter == taskGroups_.end()) {
        return nullptr;
    }
    return reinterpret_cast<TaskGroup*>(groupIter->second);
}

bool TaskGroupManager::UpdateGroupState(uint64_t groupId)
{
    HILOG_DEBUG("taskpool:: UpdateGroupState groupId:%{public}s", std::to_string(groupId).c_str());
    // Hold the lock while the group state is updated so that other worker threads cannot
    // release or modify the group pointer concurrently and invalidate the update.
    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
    auto groupIter = taskGroups_.find(groupId);
    if (groupIter == taskGroups_.end()) {
        return false;
    }
    TaskGroup* group = reinterpret_cast<TaskGroup*>(groupIter->second);
    if (group == nullptr || group->groupState_ == ExecuteState::CANCELED) {
        HILOG_DEBUG("taskpool:: UpdateGroupState taskGroup has been released or canceled");
        return false;
    }
    group->groupState_ = ExecuteState::RUNNING;
    return true;
}

// ----------------------------------- SequenceRunnerManager ----------------------------------------
SequenceRunnerManager& SequenceRunnerManager::GetInstance()
{
    static SequenceRunnerManager sequenceRunnerManager;
    return sequenceRunnerManager;
}

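// Return the named global SequenceRunner, creating it on first use. The priority is only
// applied on creation (when two arguments were passed); requesting an existing runner with a
// different priority throws a type error. Each napi_env gets its own napi reference
// (initial refcount 0) so the runner can be addressed from multiple environments.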
SequenceRunner* SequenceRunnerManager::CreateOrGetGlobalRunner(napi_env env, napi_value thisVar, size_t argc,
                                                               const std::string &name, uint32_t priority)
{
    std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
    SequenceRunner *seqRunner = nullptr;
    auto iter = globalSeqRunner_.find(name);
    if (iter == globalSeqRunner_.end()) {
        seqRunner = new SequenceRunner();
        // override the default priority only when it is passed explicitly
        if (argc == 2) { // 2: The number of parameters is 2.
            seqRunner->priority_ = static_cast<Priority>(priority);
        }
        seqRunner->isGlobalRunner_ = true;
        seqRunner->seqName_ = name;
        globalSeqRunner_.emplace(name, seqRunner);
    } else {
        seqRunner = iter->second;
        if (priority != seqRunner->priority_) {
            ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "seqRunner:: priority cannot be changed.");
            return nullptr;
        }
    }
    seqRunner->count_++;
    auto tmpIter = seqRunner->globalSeqRunnerRef_.find(env);
    if (tmpIter == seqRunner->globalSeqRunnerRef_.end()) {
        napi_ref globalSeqRunnerRef = nullptr;
        napi_create_reference(env, thisVar, 0, &globalSeqRunnerRef);
        seqRunner->globalSeqRunnerRef_.emplace(env, globalSeqRunnerRef);
    }

    return seqRunner;
}

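// Unref the runner reference associated with this env: global runners keep one reference per
// env (returning false when none is registered for env), non-global runners use seqRunnerRef_.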
bool SequenceRunnerManager::TriggerGlobalSeqRunner(napi_env env, SequenceRunner* seqRunner)
{
    if (seqRunner->isGlobalRunner_) {
        std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
        auto iter = seqRunner->globalSeqRunnerRef_.find(env);
        if (iter == seqRunner->globalSeqRunnerRef_.end()) {
            return false;
        }
        napi_reference_unref(env, iter->second, nullptr);
    } else {
        napi_reference_unref(env, seqRunner->seqRunnerRef_, nullptr);
    }
    return true;
}

uint64_t SequenceRunnerManager::DecreaseSeqCount(SequenceRunner* seqRunner)
{
    std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
    return --(seqRunner->count_);
}

void SequenceRunnerManager::RemoveGlobalSeqRunnerRef(napi_env env, SequenceRunner* seqRunner)
{
    std::lock_guard<std::mutex> lock(globalSeqRunnerMutex_);
    auto iter = seqRunner->globalSeqRunnerRef_.find(env);
    if (iter != seqRunner->globalSeqRunnerRef_.end()) {
        napi_delete_reference(env, iter->second);
        seqRunner->globalSeqRunnerRef_.erase(iter);
    }
}

void SequenceRunnerManager::RemoveSequenceRunner(const std::string &name)
{
    std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
    auto iter = globalSeqRunner_.find(name);
    if (iter != globalSeqRunner_.end()) {
        globalSeqRunner_.erase(iter);
    }
}

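// Destructor hook for a global sequence runner: remove this env's reference, and once the
// usage count drops to zero remove the runner from both managers and delete it.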
void SequenceRunnerManager::GlobalSequenceRunnerDestructor(napi_env env, SequenceRunner *seqRunner)
{
    RemoveGlobalSeqRunnerRef(env, seqRunner);
    if (DecreaseSeqCount(seqRunner) == 0) {
        RemoveSequenceRunner(seqRunner->seqName_);
        TaskGroupManager::GetInstance().RemoveSequenceRunner(seqRunner->seqRunnerId_);
        delete seqRunner;
    }
}

bool SequenceRunnerManager::IncreaseGlobalSeqRunner(napi_env env, SequenceRunner* seqRunner)
{
    std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
    if (seqRunner->isGlobalRunner_) {
        auto iter = seqRunner->globalSeqRunnerRef_.find(env);
        if (iter == seqRunner->globalSeqRunnerRef_.end()) {
            return false;
        }
        napi_reference_ref(env, iter->second, nullptr);
    } else {
        napi_reference_ref(env, seqRunner->seqRunnerRef_, nullptr);
    }
    return true;
}

void SequenceRunnerManager::RemoveWaitingTask(Task* task)
{
    auto seqRunner = TaskGroupManager::GetInstance().GetSeqRunner(task->seqRunnerId_);
    if (seqRunner != nullptr) {
        seqRunner->RemoveWaitingTask(task);
    }
}
} // namespace Commonlibrary::Concurrent::TaskPoolModule