/*
 * Copyright (c) 2022 Huawei Device Co., Ltd.
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "task_manager.h"

#include <cinttypes>
#include <securec.h>
#include <thread>

#include "commonlibrary/ets_utils/js_sys_module/timer/timer.h"
#include "helper/concurrent_helper.h"
#include "helper/error_helper.h"
#include "helper/hitrace_helper.h"
#include "taskpool.h"
#include "utils/log.h"
#include "worker.h"

namespace Commonlibrary::Concurrent::TaskPoolModule {
using namespace OHOS::JsSysModule;

static constexpr int8_t HIGH_PRIORITY_TASK_COUNT = 5;
static constexpr int8_t MEDIUM_PRIORITY_TASK_COUNT = 5;
static constexpr int32_t MAX_TASK_DURATION = 100; // 100: 100ms
static constexpr uint32_t STEP_SIZE = 2;
static constexpr uint32_t DEFAULT_THREADS = 3;
static constexpr uint32_t MIN_THREADS = 1; // 1: minimum thread num when idle
static constexpr uint32_t MIN_TIMEOUT_TIME = 180000; // 180000: 3min
static constexpr uint32_t MAX_TIMEOUT_TIME = 600000; // 600000: 10min
static constexpr int32_t MAX_IDLE_TIME = 50000; // 50000: 50s
[[maybe_unused]] static constexpr uint32_t IDLE_THRESHOLD = 2; // 2: the thread is released after two consecutive idle checks (about 2min)

// ----------------------------------- TaskManager ----------------------------------------
TaskManager& TaskManager::GetInstance()
{
    static TaskManager manager;
    return manager;
}

TaskManager::TaskManager()
{
    for (size_t i = 0; i < taskQueues_.size(); i++) {
        std::unique_ptr<ExecuteQueue> taskQueue = std::make_unique<ExecuteQueue>();
        taskQueues_[i] = std::move(taskQueue);
    }
}

TaskManager::~TaskManager()
{
    if (timer_ == nullptr) {
        HILOG_ERROR("taskpool:: timer_ is nullptr");
    } else {
        uv_timer_stop(timer_);
        ConcurrentHelper::UvHandleClose(timer_);
        ConcurrentHelper::UvHandleClose(expandHandle_);
    }

    if (loop_ != nullptr) {
        uv_stop(loop_);
    }

    {
        std::lock_guard<std::recursive_mutex> lock(workersMutex_);
        for (auto& worker : workers_) {
            delete worker;
        }
        workers_.clear();
    }

    {
        std::unique_lock<std::shared_mutex> lock(tasksMutex_);
        for (auto& [_, task] : tasks_) {
            delete task;
            task = nullptr;
        }
        tasks_.clear();
    }
    CountTraceForWorker();
}

void TaskManager::CountTraceForWorker()
{
    std::lock_guard<std::recursive_mutex> lock(workersMutex_);
    int64_t threadNum = static_cast<int64_t>(workers_.size());
    int64_t idleWorkers = static_cast<int64_t>(idleWorkers_.size());
    int64_t timeoutWorkers = static_cast<int64_t>(timeoutWorkers_.size());
    HITRACE_HELPER_COUNT_TRACE("timeoutThreadNum", timeoutWorkers);
    HITRACE_HELPER_COUNT_TRACE("threadNum", threadNum);
    HITRACE_HELPER_COUNT_TRACE("runningThreadNum", threadNum - idleWorkers);
    HITRACE_HELPER_COUNT_TRACE("idleThreadNum", idleWorkers);
}

napi_value TaskManager::GetThreadInfos(napi_env env)
{
    napi_value threadInfos = nullptr;
    napi_create_array(env, &threadInfos);
    {
        std::lock_guard<std::recursive_mutex> lock(workersMutex_);
        int32_t i = 0;
        for (auto& worker : workers_) {
            if (worker->workerEnv_ == nullptr) {
                continue;
            }
            napi_value tid = nullptr;
            napi_value priority = nullptr;
            napi_create_int32(env, static_cast<int32_t>(worker->tid_), &tid);
            napi_create_int32(env, static_cast<int32_t>(worker->priority_), &priority);

            napi_value taskId = nullptr;
            napi_create_array(env, &taskId);
            int32_t j = 0;
            {
                std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_);
                for (auto& currentId : worker->currentTaskId_) {
                    napi_value id = NapiHelper::CreateUint64(env, currentId);
                    napi_set_element(env, taskId, j, id);
                    j++;
                }
            }
            napi_value threadInfo = nullptr;
            napi_create_object(env, &threadInfo);
            napi_set_named_property(env, threadInfo, "tid", tid);
            napi_set_named_property(env, threadInfo, "priority", priority);
            napi_set_named_property(env, threadInfo, "taskIds", taskId);
            napi_set_element(env, threadInfos, i, threadInfo);
            i++;
        }
    }
    return threadInfos;
}

napi_value TaskManager::GetTaskInfos(napi_env env)
{
    napi_value taskInfos = nullptr;
    napi_create_array(env, &taskInfos);
    {
        std::unique_lock<std::shared_mutex> lock(tasksMutex_);
        int32_t i = 0;
        for (const auto& [_, task] : tasks_) {
            if (task->taskState_ == ExecuteState::NOT_FOUND) {
                continue;
            }
            napi_value taskInfoValue = nullptr;
            napi_create_object(env, &taskInfoValue);
            std::unique_lock<std::shared_mutex> taskLock(task->taskMutex_);
            napi_value taskId = NapiHelper::CreateUint64(env, task->taskId_);
            napi_value name = nullptr;
            napi_create_string_utf8(env, task->name_.c_str(), task->name_.size(), &name);
            napi_set_named_property(env, taskInfoValue, "name", name);
            ExecuteState state;
            uint64_t duration = 0;
            if (task->taskState_ == ExecuteState::WAITING) {
                state = ExecuteState::WAITING;
            } else {
                duration = ConcurrentHelper::GetMilliseconds() - task->startTime_;
                state = ExecuteState::RUNNING;
            }
            napi_value stateValue = nullptr;
            napi_create_int32(env, state, &stateValue);
            napi_set_named_property(env, taskInfoValue, "taskId", taskId);
            napi_set_named_property(env, taskInfoValue, "state", stateValue);
            napi_value durationValue = NapiHelper::CreateUint32(env, duration);
            napi_set_named_property(env, taskInfoValue, "duration", durationValue);
            napi_set_element(env, taskInfos, i, taskInfoValue);
            i++;
        }
    }
    return taskInfos;
}

void TaskManager::UpdateExecutedInfo(uint64_t duration)
{
    totalExecTime_ += duration;
    totalExecCount_++;
}

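// Heuristic for the preferred number of worker threads:
// - with no execution history yet, request at most STEP_SIZE workers so a long first task cannot stall the pool;
// - otherwise, estimate ceil(avgTaskDuration * pendingTasks / MAX_TASK_DURATION), capped at the pending task count;
// - add the currently running workers and force the result to be odd (targetNum |= 1), presumably to keep
//   a spare thread available besides the busy ones.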
uint32_t TaskManager::ComputeSuitableThreadNum()
{
    uint32_t targetNum = 0;
    if (GetTaskNum() != 0 && totalExecCount_ == 0) {
        // this branch is used for avoiding time-consuming works that may block the taskpool
        targetNum = std::min(STEP_SIZE, GetTaskNum());
    }
    uint32_t result = 0;
    if (totalExecCount_ != 0) {
        auto durationPerTask = static_cast<double>(totalExecTime_) / totalExecCount_;
        result = std::ceil(durationPerTask * GetTaskNum() / MAX_TASK_DURATION);
        targetNum += std::min(result, GetTaskNum());
    }
    targetNum += GetRunningWorkers();
    targetNum |= 1;
    return targetNum;
}

void TaskManager::CheckForBlockedWorkers()
{
    // The timeout threshold is chosen dynamically to provide more flexibility in detecting exceptions:
    // if the thread num has reached the limit and no idle worker is available, the short timeout is used;
    // otherwise the longer one is chosen.
    std::lock_guard<std::recursive_mutex> lock(workersMutex_);
    bool needChecking = false;
    bool state = (GetThreadNum() == ConcurrentHelper::GetMaxThreads()) && (GetIdleWorkers() == 0);
    uint64_t threshold = state ? MIN_TIMEOUT_TIME : MAX_TIMEOUT_TIME;
    for (auto iter = workers_.begin(); iter != workers_.end(); iter++) {
        auto worker = *iter;
        // Skip idle workers; only a worker in the running state can be marked as timed out.
        if ((worker->state_ == WorkerState::IDLE) ||
            (ConcurrentHelper::GetMilliseconds() - worker->startTime_ < threshold) ||
            !worker->UpdateWorkerState(WorkerState::RUNNING, WorkerState::BLOCKED)) {
            continue;
        }
        // While a promise task is executing, the worker state may not be updated and the worker would be
        // wrongly marked as 'BLOCKED', so this situation is excluded.
        // Likewise, if the worker is not executing sync tasks or micro tasks, it may be handling work
        // such as I/O on uv threads, so this situation is excluded as well.
        auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
        if (worker->idleState_ && !workerEngine->IsExecutingPendingJob()) {
            if (!workerEngine->HasWaitingRequest()) {
                worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::IDLE);
            } else {
                worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::RUNNING);
                worker->startTime_ = ConcurrentHelper::GetMilliseconds();
            }
            continue;
        }

        HILOG_INFO("taskpool:: The worker has been marked as timeout.");
        needChecking = true;
        workerEngine->TerminateExecution();

        idleWorkers_.erase(worker);
        timeoutWorkers_.insert(worker);
    }
    // the check should be triggered when workers have been marked and removed
    if (UNLIKELY(needChecking)) {
        TryExpand();
    }
}

void TaskManager::TryTriggerExpand()
{
    // post the signal to notify the monitor thread to expand
    if (UNLIKELY(expandHandle_ == nullptr)) {
        needChecking_ = true;
        HILOG_DEBUG("taskpool:: the expandHandle_ is nullptr");
        return;
    }
    uv_async_send(expandHandle_);
}

#if defined(OHOS_PLATFORM)
// Read /proc/[pid]/task/[tid]/stat to obtain the scheduling info of a worker thread.
bool TaskManager::ReadThreadInfo(Worker* worker, char* buf, uint32_t size)
{
    char path[128]; // 128: buffer for path
    pid_t pid = getpid();
    pid_t tid = worker->tid_;
    ssize_t bytesLen = -1;
    int ret = snprintf_s(path, sizeof(path), sizeof(path) - 1, "/proc/%d/task/%d/stat", pid, tid);
    if (ret < 0) {
        HILOG_ERROR("snprintf_s failed");
        return false;
    }
    int fd = open(path, O_RDONLY | O_NONBLOCK);
    if (UNLIKELY(fd == -1)) {
        return false;
    }
    bytesLen = read(fd, buf, size - 1);
    close(fd);
    if (bytesLen <= 0) {
        HILOG_ERROR("taskpool:: failed to read %{public}s", path);
        return false;
    }
    buf[bytesLen] = '\0';
    return true;
}

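// On OHOS, idleness is decided from the kernel's view of the thread rather than from bookkeeping alone:
// field 3 of /proc/<pid>/task/<tid>/stat is the thread state, and 'S' (interruptible sleep) is counted
// as idle. See proc(5) for the stat file layout.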
uint32_t TaskManager::GetIdleWorkers()
{
    char buf[4096]; // 4096: buffer for thread info
    uint32_t idleCount = 0;
    std::lock_guard<std::recursive_mutex> lock(workersMutex_);
    for (auto& worker : idleWorkers_) {
        if (!ReadThreadInfo(worker, buf, sizeof(buf))) {
            continue;
        }
        char state = '\0';
        if (sscanf_s(buf, "%*d %*s %c", &state, sizeof(state)) != 1) { // 1: state
            HILOG_ERROR("taskpool: sscanf_s of state failed for %{public}c", state);
            return 0;
        }
        if (state == 'S') {
            idleCount++;
        }
    }
    return idleCount;
}

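// A worker is considered releasable only when it stays quiescent across consecutive monitor ticks:
// its kernel state must be 'S' and its user-mode CPU time (utime, field 14 of the stat file) must not
// have advanced since the previous check. Once idleCount_ reaches IDLE_THRESHOLD the worker is put on
// freeList_ as a shrink candidate; any activity resets the counter.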
void TaskManager::GetIdleWorkersList(uint32_t step)
{
    char buf[4096]; // 4096: buffer for thread info
    for (auto& worker : idleWorkers_) {
        if (!ReadThreadInfo(worker, buf, sizeof(buf))) {
            continue;
        }
        char state = '\0';
        uint64_t utime = 0;
        if (sscanf_s(buf, "%*d %*s %c %*d %*d %*d %*d %*d %*u %*lu %*lu %*lu %*lu %llu",
            &state, sizeof(state), &utime) != 2) { // 2: state and utime
            HILOG_ERROR("taskpool: sscanf_s of state failed for %{public}d", worker->tid_);
            return;
        }
        if (state != 'S' || utime != worker->lastCpuTime_) {
            worker->idleCount_ = 0;
            worker->lastCpuTime_ = utime;
            continue;
        }
        if (++worker->idleCount_ >= IDLE_THRESHOLD) {
            freeList_.emplace_back(worker);
        }
    }
}

void TaskManager::TriggerShrink(uint32_t step)
{
    GetIdleWorkersList(step);
    step = std::min(step, static_cast<uint32_t>(freeList_.size()));
    uint32_t count = 0;
    for (size_t i = 0; i < freeList_.size(); i++) {
        auto worker = freeList_[i];
        if (worker->state_ != WorkerState::IDLE) { // may be handling I/O
            continue;
        }
        auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
        if (idleTime < MAX_IDLE_TIME || worker->runningCount_ != 0) {
            continue;
        }
        idleWorkers_.erase(worker);
        HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
        uv_async_send(worker->clearWorkerSignal_);
        if (++count == step) {
            break;
        }
    }
    freeList_.clear();
}
#else
uint32_t TaskManager::GetIdleWorkers()
{
    std::lock_guard<std::recursive_mutex> lock(workersMutex_);
    return idleWorkers_.size();
}

void TaskManager::TriggerShrink(uint32_t step)
{
    for (uint32_t i = 0; i < step; i++) {
        // try to free a worker whose idle time meets the requirement
        auto iter = std::find_if(idleWorkers_.begin(), idleWorkers_.end(), [](Worker *worker) {
            auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
            return idleTime > MAX_IDLE_TIME && worker->runningCount_ == 0;
        });
        // remove it from all sets
        if (iter != idleWorkers_.end()) {
            auto worker = *iter;
            idleWorkers_.erase(worker);
            HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
            uv_async_send(worker->clearWorkerSignal_);
        }
    }
}
#endif

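// Shrink policy: keep at least MIN_THREADS workers, never release more than STEP_SIZE workers per tick,
// always release every timed-out worker, and suspend the monitor timer once the pool has settled at
// MIN_THREADS fully-idle workers with no timed-out workers left.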
void TaskManager::NotifyShrink(uint32_t targetNum)
{
    uint32_t workerCount = GetThreadNum();
    if (workerCount > MIN_THREADS && workerCount > targetNum) {
        std::lock_guard<std::recursive_mutex> lock(workersMutex_);
        targetNum = std::max(MIN_THREADS, targetNum);
        uint32_t step = std::min(workerCount - targetNum, STEP_SIZE);
        TriggerShrink(step);
    }
    // remove all timeout workers
    std::lock_guard<std::recursive_mutex> lock(workersMutex_);
    for (auto iter = timeoutWorkers_.begin(); iter != timeoutWorkers_.end();) {
        HILOG_DEBUG("taskpool:: try to release timeout thread: %{public}d", (*iter)->tid_);
        uv_async_send((*iter)->clearWorkerSignal_);
        timeoutWorkers_.erase(iter++);
    }
    // stop the timer
    if ((workers_.size() == idleWorkers_.size() && workers_.size() == MIN_THREADS) && timeoutWorkers_.empty()) {
        suspend_ = true;
        uv_timer_stop(timer_);
        HILOG_DEBUG("taskpool:: timer will be suspended");
    }
}

void TaskManager::TriggerLoadBalance(const uv_timer_t* req)
{
    TaskManager& taskManager = TaskManager::GetInstance();
    // do not check while an expansion is in progress
    if (taskManager.expandingCount_ != 0) {
        return;
    }

    taskManager.CheckForBlockedWorkers();
    uint32_t targetNum = taskManager.ComputeSuitableThreadNum();
    taskManager.NotifyShrink(targetNum);
    taskManager.CountTraceForWorker();
}

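// Expansion is only attempted when no idle worker is available and no worker creation is in flight.
// The target is recomputed, forced to an odd value, and capped at max(GetMaxThreads(), DEFAULT_THREADS);
// the difference to the current worker count is created in one batch. If the monitor timer was suspended,
// it is resumed here so the new workers are supervised again.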
void TaskManager::TryExpand()
{
    if (GetIdleWorkers() != 0) {
        return;
    }
    // For accuracy, expansion is not triggered while workers are still being created;
    // it is deferred until all pending workers have been created.
    if (expandingCount_ != 0) {
        needChecking_ = true;
        return;
    }
    needChecking_ = false; // no further check is needed
    uint32_t targetNum = ComputeSuitableThreadNum();
    targetNum |= 1;
    uint32_t workerCount = GetThreadNum();
    const uint32_t maxThreads = std::max(ConcurrentHelper::GetMaxThreads(), DEFAULT_THREADS);
    if (workerCount < maxThreads && workerCount < targetNum) {
        uint32_t step = std::min(maxThreads, targetNum) - workerCount;
        CreateWorkers(hostEnv_, step);
        HILOG_INFO("taskpool:: maxThreads: %{public}u, created num: %{public}u, total num: %{public}u",
            maxThreads, step, GetThreadNum());
    }
    if (UNLIKELY(suspend_)) {
        suspend_ = false;
        uv_timer_again(timer_);
    }
}

void TaskManager::NotifyExpand(const uv_async_t* req)
{
    TaskManager& taskManager = TaskManager::GetInstance();
    taskManager.TryExpand();
}

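// RunTaskManager drives the load-balance loop on the monitor thread: a repeating 1-minute uv timer
// invokes TriggerLoadBalance (blocked-worker detection, shrink and tracing), and an async handle lets
// other threads request an expansion via TryTriggerExpand/NotifyExpand without touching the loop directly.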
void TaskManager::RunTaskManager()
{
    loop_ = uv_default_loop();
    timer_ = new uv_timer_t;
    uv_timer_init(loop_, timer_);
    expandHandle_ = new uv_async_t;
    uv_timer_start(timer_, reinterpret_cast<uv_timer_cb>(TaskManager::TriggerLoadBalance), 0, 60000); // 60000: 1min
    uv_async_init(loop_, expandHandle_, reinterpret_cast<uv_async_cb>(TaskManager::NotifyExpand));
#if defined IOS_PLATFORM || defined MAC_PLATFORM
    pthread_setname_np("OS_TaskManager");
#else
    pthread_setname_np(pthread_self(), "OS_TaskManager");
#endif
    if (UNLIKELY(needChecking_)) {
        needChecking_ = false;
        uv_async_send(expandHandle_);
    }
    uv_run(loop_, UV_RUN_DEFAULT);
    uv_loop_close(loop_);
}

void TaskManager::CancelTask(napi_env env, uint64_t taskId)
{
    // 1. The task cannot be found by taskId: throw an error.
    // 2. The task is running or waiting: mark it canceled and cancel its pending executions.
    // 3. The task is already canceled: skip it.
    Task* task = GetTask(taskId);
    if (task == nullptr) {
        return;
    }
    std::unique_lock<std::shared_mutex> lock(task->taskMutex_);
    if (task->currentTaskInfo_ == nullptr) {
        HILOG_ERROR("taskpool:: cancel non-existent task");
        ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK);
        return;
    }
    ExecuteState state = task->taskState_;
    switch (state) {
        case ExecuteState::NOT_FOUND:
            HILOG_ERROR("taskpool:: cancel non-existent task");
            ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK);
            return;
        case ExecuteState::RUNNING:
            task->taskState_ = ExecuteState::CANCELED;
            task->CancelPendingTask(env, ExecuteState::RUNNING);
            break;
        case ExecuteState::WAITING:
            task->taskState_ = ExecuteState::CANCELED;
            task->CancelPendingTask(env, ExecuteState::WAITING);
            break;
        default: // Default is CANCELED: the task is already canceled and does not need to be marked again.
            break;
    }
}

void TaskManager::NotifyWorkerIdle(Worker* worker)
{
    {
        std::lock_guard<std::recursive_mutex> lock(workersMutex_);
        if (worker->state_ == WorkerState::BLOCKED) {
            return;
        }
        idleWorkers_.insert(worker);
    }
    if (GetTaskNum() != 0) {
        NotifyExecuteTask();
    }
    CountTraceForWorker();
}

void TaskManager::NotifyWorkerCreated(Worker* worker)
{
    NotifyWorkerIdle(worker);
    expandingCount_--;
    if (UNLIKELY(needChecking_ && expandingCount_ == 0 && expandHandle_ != nullptr)) {
        needChecking_ = false;
        uv_async_send(expandHandle_);
    }
}

void TaskManager::NotifyWorkerAdded(Worker* worker)
{
    std::lock_guard<std::recursive_mutex> lock(workersMutex_);
    workers_.insert(worker);
    HILOG_DEBUG("taskpool:: a new worker has been added and the current num is %{public}zu", workers_.size());
}

void TaskManager::NotifyWorkerRunning(Worker* worker)
{
    std::lock_guard<std::recursive_mutex> lock(workersMutex_);
    idleWorkers_.erase(worker);
    CountTraceForWorker();
}

uint32_t TaskManager::GetRunningWorkers()
{
    std::lock_guard<std::recursive_mutex> lock(workersMutex_);
    return std::count_if(workers_.begin(), workers_.end(), [](const auto& worker) {
        return worker->runningCount_ != 0;
    });
}

uint32_t TaskManager::GetTimeoutWorkers()
{
    std::lock_guard<std::recursive_mutex> lock(workersMutex_);
    return timeoutWorkers_.size();
}

uint32_t TaskManager::GetTaskNum()
{
    std::lock_guard<std::mutex> lock(taskQueuesMutex_);
    return taskQueues_[Priority::HIGH]->GetTaskNum() + taskQueues_[Priority::MEDIUM]->GetTaskNum() +
        taskQueues_[Priority::LOW]->GetTaskNum();
}

uint32_t TaskManager::GetThreadNum()
{
    std::lock_guard<std::recursive_mutex> lock(workersMutex_);
    return workers_.size();
}

void TaskManager::EnqueueTaskId(uint64_t taskId, Priority priority)
{
    {
        std::lock_guard<std::mutex> lock(taskQueuesMutex_);
        taskQueues_[priority]->EnqueueTaskId(taskId);
    }
    NotifyExecuteTask();
}

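// Dequeue order implements a weighted round-robin over the three priority queues: up to
// HIGH_PRIORITY_TASK_COUNT (5) consecutive HIGH tasks are served, then up to MEDIUM_PRIORITY_TASK_COUNT (5)
// MEDIUM tasks, then a LOW task, after which the counters reset. A task whose dependencies have not yet
// completed is not handed to a worker; it is parked in pendingTaskInfos_ and a taskId of 0 is returned so
// the caller treats this round as a no-op.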
std::pair<uint64_t, Priority> TaskManager::DequeueTaskId()
{
    std::lock_guard<std::mutex> lock(taskQueuesMutex_);
    auto& highTaskQueue = taskQueues_[Priority::HIGH];
    if (!highTaskQueue->IsEmpty() && highPrioExecuteCount_ < HIGH_PRIORITY_TASK_COUNT) {
        highPrioExecuteCount_++;
        uint64_t taskId = highTaskQueue->DequeueTaskId();
        if (IsDependendByTaskId(taskId)) {
            EnqueuePendingTaskInfo(taskId, Priority::HIGH);
            return std::make_pair(0, Priority::HIGH);
        }
        return std::make_pair(taskId, Priority::HIGH);
    }
    highPrioExecuteCount_ = 0;

    auto& mediumTaskQueue = taskQueues_[Priority::MEDIUM];
    if (!mediumTaskQueue->IsEmpty() && mediumPrioExecuteCount_ < MEDIUM_PRIORITY_TASK_COUNT) {
        mediumPrioExecuteCount_++;
        uint64_t taskId = mediumTaskQueue->DequeueTaskId();
        if (IsDependendByTaskId(taskId)) {
            EnqueuePendingTaskInfo(taskId, Priority::MEDIUM);
            return std::make_pair(0, Priority::MEDIUM);
        }
        return std::make_pair(taskId, Priority::MEDIUM);
    }
    mediumPrioExecuteCount_ = 0;

    auto& lowTaskQueue = taskQueues_[Priority::LOW];
    uint64_t taskId = lowTaskQueue->DequeueTaskId();
    if (IsDependendByTaskId(taskId)) {
        EnqueuePendingTaskInfo(taskId, Priority::LOW);
        return std::make_pair(0, Priority::LOW);
    }
    return std::make_pair(taskId, Priority::LOW);
}

void TaskManager::NotifyExecuteTask()
{
    std::lock_guard<std::recursive_mutex> lock(workersMutex_);
    for (auto& worker : idleWorkers_) {
        worker->NotifyExecuteTask();
    }
}

void TaskManager::InitTaskManager(napi_env env)
{
    HITRACE_HELPER_METER_NAME("InitTaskManager");
    auto hostEngine = reinterpret_cast<NativeEngine*>(env);
    while (hostEngine != nullptr && !hostEngine->IsMainThread()) {
        hostEngine = hostEngine->GetHostEngine();
    }
    if (!isInitialized_.exchange(true, std::memory_order_relaxed)) {
        hostEnv_ = reinterpret_cast<napi_env>(hostEngine);
        // Add a reserved thread for taskpool
        CreateWorkers(hostEnv_);
        // Create a timer to manage worker threads
        std::thread workerManager(&TaskManager::RunTaskManager, this);
        workerManager.detach();
    }
}

void TaskManager::CreateWorkers(napi_env env, uint32_t num)
{
    for (uint32_t i = 0; i < num; i++) {
        expandingCount_++;
        auto worker = Worker::WorkerConstructor(env);
        NotifyWorkerAdded(worker);
    }
    CountTraceForWorker();
}

void TaskManager::RemoveWorker(Worker* worker)
{
    std::lock_guard<std::recursive_mutex> lock(workersMutex_);
    idleWorkers_.erase(worker);
    workers_.erase(worker);
}

void TaskManager::RestoreWorker(Worker* worker)
{
    std::lock_guard<std::recursive_mutex> lock(workersMutex_);
    if (UNLIKELY(suspend_)) {
        suspend_ = false;
        uv_timer_again(timer_);
    }
    if (worker->state_ == WorkerState::BLOCKED) {
        // since the worker is blocked, we should add it to the timeout set
        timeoutWorkers_.insert(worker);
        return;
    }
    // The worker may still be handling tasks on the IO thread, so add it back to the idle set and
    // notify it so that tasks can continue to run on its own thread.
    HILOG_DEBUG("taskpool:: worker has been restored and the current num is: %{public}zu", workers_.size());
    idleWorkers_.emplace_hint(idleWorkers_.end(), worker);
    if (GetTaskNum() != 0) {
        NotifyExecuteTask();
    }
}

void TaskManager::RegisterCallback(napi_env env, uint64_t taskId, std::shared_ptr<CallbackInfo> callbackInfo)
{
    std::lock_guard<std::mutex> lock(callbackMutex_);
    callbackTable_[taskId] = callbackInfo;
}

std::shared_ptr<CallbackInfo> TaskManager::GetCallbackInfo(uint64_t taskId)
{
    std::lock_guard<std::mutex> lock(callbackMutex_);
    auto iter = callbackTable_.find(taskId);
    if (iter == callbackTable_.end() || iter->second == nullptr) {
        HILOG_ERROR("taskpool:: the callback does not exist");
        return nullptr;
    }
    return iter->second;
}

void TaskManager::IncreaseRefCount(uint64_t taskId)
{
    if (taskId == 0) { // func tasks are not supported
        return;
    }
    std::lock_guard<std::mutex> lock(callbackMutex_);
    auto iter = callbackTable_.find(taskId);
    if (iter == callbackTable_.end() || iter->second == nullptr) {
        return;
    }
    iter->second->refCount++;
}

void TaskManager::DecreaseRefCount(napi_env env, uint64_t taskId)
{
    if (taskId == 0) { // func tasks are not supported
        return;
    }
    std::lock_guard<std::mutex> lock(callbackMutex_);
    auto iter = callbackTable_.find(taskId);
    if (iter == callbackTable_.end() || iter->second == nullptr) {
        return;
    }
    iter->second->refCount--;
    if (iter->second->refCount == 0) {
        callbackTable_.erase(iter);
    }
}

napi_value TaskManager::NotifyCallbackExecute(napi_env env, TaskResultInfo* resultInfo, Task* task)
{
    std::lock_guard<std::mutex> lock(callbackMutex_);
    auto iter = callbackTable_.find(task->taskId_);
    if (iter == callbackTable_.end() || iter->second == nullptr) {
        HILOG_ERROR("taskpool:: the callback in SendData is not registered on the host side");
        ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_REGISTERED);
        delete resultInfo;
        return nullptr;
    }
    Worker* worker = static_cast<Worker*>(task->worker_);
    worker->Enqueue(resultInfo);
    auto callbackInfo = iter->second;
    callbackInfo->refCount++;
    callbackInfo->onCallbackSignal->data = worker;
    uv_async_send(callbackInfo->onCallbackSignal);
    return nullptr;
}

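// Presumably invoked when the task identified by taskId completes: every task registered in
// dependentTaskInfos_ as waiting on it is looked up, its parked entry is moved from pendingTaskInfos_
// back into the execute queue, and the finished task is removed from that task's own dependency set.
// Entries for tasks that no longer exist are simply dropped.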
void TaskManager::NotifyDependencyTaskInfo(uint64_t taskId)
{
    std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
    auto iter = dependentTaskInfos_.find(taskId);
    if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
        return;
    }
    for (auto taskIdIter = iter->second.begin(); taskIdIter != iter->second.end();) {
        {
            std::unique_lock<std::shared_mutex> lock(tasksMutex_);
            auto taskIter = tasks_.find(*taskIdIter);
            if (taskIter == tasks_.end()) {
                taskIdIter = iter->second.erase(taskIdIter);
                continue;
            }
        }
        auto taskInfo = DequeuePendingTaskInfo(*taskIdIter);
        if (taskInfo.first == 0) {
            taskIdIter = iter->second.erase(taskIdIter);
            continue;
        }
        EnqueueTaskId(taskInfo.first, taskInfo.second);
        auto dependTaskIter = dependTaskInfos_.find(*taskIdIter);
        if (dependTaskIter != dependTaskInfos_.end()) {
            auto dependTaskInnerIter = dependTaskIter->second.find(taskId);
            if (dependTaskInnerIter != dependTaskIter->second.end()) {
                dependTaskIter->second.erase(dependTaskInnerIter);
            }
        }
        taskIdIter = iter->second.erase(taskIdIter);
    }
}

bool TaskManager::IsDependendByTaskId(uint64_t taskId)
{
    {
        std::unique_lock<std::shared_mutex> lock(tasksMutex_);
        auto taskIter = tasks_.find(taskId);
        if (taskIter == tasks_.end()) {
            return false;
        }
        auto task = reinterpret_cast<Task*>(taskIter->second);
        if (!task->IsCommonTask()) {
            return false;
        }
    }
    std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
    auto iter = dependTaskInfos_.find(taskId);
    if (iter == dependTaskInfos_.end() || iter->second.empty()) {
        return false;
    }
    return true;
}

bool TaskManager::StoreTaskDependency(uint64_t taskId, std::set<uint64_t> taskIdSet)
{
    std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
    StoreDependentTaskInfo(taskIdSet, taskId);
    auto iter = dependTaskInfos_.find(taskId);
    if (iter == dependTaskInfos_.end()) {
        for (const auto& dependentId : taskIdSet) {
            auto idIter = dependTaskInfos_.find(dependentId);
            if (idIter == dependTaskInfos_.end()) {
                continue;
            }
            if (!CheckCircularDependency(taskIdSet, idIter->second, taskId)) {
                return false;
            }
        }
        dependTaskInfos_.emplace(taskId, std::move(taskIdSet));
        return true;
    }

    for (const auto& dependentId : iter->second) {
        auto idIter = dependTaskInfos_.find(dependentId);
        if (idIter == dependTaskInfos_.end()) {
            continue;
        }
        if (!CheckCircularDependency(iter->second, idIter->second, taskId)) {
            return false;
        }
    }
    iter->second.insert(taskIdSet.begin(), taskIdSet.end());
    return true;
}

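// Depth-first walk over the dependency graph starting from the candidate dependencies (idSet); it returns
// false as soon as taskId itself is reached, which would close a cycle. Ids already present in
// dependentIdSet are skipped to avoid revisiting branches.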
bool TaskManager::CheckCircularDependency(std::set<uint64_t> dependentIdSet, std::set<uint64_t> idSet, uint64_t taskId)
{
    for (const auto& id : idSet) {
        if (id == taskId) {
            return false;
        }
        auto iter = dependentIdSet.find(id);
        if (iter != dependentIdSet.end()) {
            continue;
        }
        auto dIter = dependTaskInfos_.find(id);
        if (dIter == dependTaskInfos_.end()) {
            continue;
        }
        if (!CheckCircularDependency(dependentIdSet, dIter->second, taskId)) {
            return false;
        }
    }
    return true;
}

bool TaskManager::RemoveTaskDependency(uint64_t taskId, uint64_t dependentId)
{
    std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
    RemoveDependentTaskInfo(dependentId, taskId);
    auto iter = dependTaskInfos_.find(taskId);
    if (iter == dependTaskInfos_.end()) {
        return false;
    }
    auto dependIter = iter->second.find(dependentId);
    if (dependIter == iter->second.end()) {
        return false;
    }
    iter->second.erase(dependIter);
    return true;
}

void TaskManager::EnqueuePendingTaskInfo(uint64_t taskId, Priority priority)
{
    if (taskId == 0) {
        return;
    }
    std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
    pendingTaskInfos_.emplace(taskId, priority);
}

std::pair<uint64_t, Priority> TaskManager::DequeuePendingTaskInfo(uint64_t taskId)
{
    std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
    if (pendingTaskInfos_.empty()) {
        return std::make_pair(0, Priority::DEFAULT);
    }
    std::pair<uint64_t, Priority> result;
    for (auto it = pendingTaskInfos_.begin(); it != pendingTaskInfos_.end(); ++it) {
        if (it->first == taskId) {
            result = std::make_pair(it->first, it->second);
            it = pendingTaskInfos_.erase(it);
            break;
        }
    }
    return result;
}

void TaskManager::RemovePendingTaskInfo(uint64_t taskId)
{
    std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
    pendingTaskInfos_.erase(taskId);
}

void TaskManager::StoreDependentTaskInfo(std::set<uint64_t> dependentTaskIdSet, uint64_t taskId)
{
    for (const auto& id : dependentTaskIdSet) {
        auto iter = dependentTaskInfos_.find(id);
        if (iter == dependentTaskInfos_.end()) {
            std::set<uint64_t> set{taskId};
            dependentTaskInfos_.emplace(id, std::move(set));
        } else {
            iter->second.emplace(taskId);
        }
    }
}

void TaskManager::RemoveDependentTaskInfo(uint64_t dependentTaskId, uint64_t taskId)
{
    auto iter = dependentTaskInfos_.find(dependentTaskId);
    if (iter == dependentTaskInfos_.end()) {
        return;
    }
    auto taskIter = iter->second.find(taskId);
    if (taskIter == iter->second.end()) {
        return;
    }
    iter->second.erase(taskIter);
}

void TaskManager::StoreTaskDuration(uint64_t taskId, uint64_t totalDuration, uint64_t cpuDuration)
{
    std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
    auto iter = taskDurationInfos_.find(taskId);
    if (iter == taskDurationInfos_.end()) {
        std::pair<uint64_t, uint64_t> durationData = std::make_pair(totalDuration, cpuDuration);
        taskDurationInfos_.emplace(taskId, std::move(durationData));
    } else {
        if (totalDuration != 0) {
            iter->second.first = totalDuration;
        }
        if (cpuDuration != 0) {
            iter->second.second = cpuDuration;
        }
    }
}

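// durationType selects the value reported for a task: TASK_TOTAL_TIME returns the wall-clock duration,
// TASK_CPU_TIME returns the CPU time, and any other type yields total minus CPU time (0 while no total
// has been recorded), i.e. the portion presumably spent off-CPU such as waiting on I/O.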
uint64_t TaskManager::GetTaskDuration(uint64_t taskId, std::string durationType)
{
    std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
    auto iter = taskDurationInfos_.find(taskId);
    if (iter == taskDurationInfos_.end()) {
        return 0;
    }
    if (durationType == TASK_TOTAL_TIME) {
        return iter->second.first;
    } else if (durationType == TASK_CPU_TIME) {
        return iter->second.second;
    } else if (iter->second.first == 0) {
        return 0;
    }
    return iter->second.first - iter->second.second;
}

void TaskManager::RemoveTaskDuration(uint64_t taskId)
{
    std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
    auto iter = taskDurationInfos_.find(taskId);
    if (iter != taskDurationInfos_.end()) {
        taskDurationInfos_.erase(iter);
    }
}

void TaskManager::ReleaseTaskData(napi_env env, Task* task)
{
    uint64_t taskId = task->taskId_;
    RemoveTask(taskId);
    if (task->IsFunctionTask() || task->IsGroupFunctionTask()) {
        return;
    }
    DecreaseRefCount(env, taskId);
    RemoveTaskDuration(taskId);
    if (!task->IsCommonTask()) {
        return;
    }
    RemovePendingTaskInfo(taskId);
    std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
    for (auto dependentTaskIter = dependentTaskInfos_.begin(); dependentTaskIter != dependentTaskInfos_.end();) {
        if (dependentTaskIter->second.find(taskId) != dependentTaskIter->second.end()) {
            dependentTaskIter = dependentTaskInfos_.erase(dependentTaskIter);
        } else {
            ++dependentTaskIter;
        }
    }
    auto dependTaskIter = dependTaskInfos_.find(taskId);
    if (dependTaskIter != dependTaskInfos_.end()) {
        dependTaskInfos_.erase(dependTaskIter);
    }
}

void TaskManager::StoreTask(uint64_t taskId, Task* task)
{
    std::unique_lock<std::shared_mutex> lock(tasksMutex_);
    tasks_.emplace(taskId, task);
}

void TaskManager::RemoveTask(uint64_t taskId)
{
    std::unique_lock<std::shared_mutex> lock(tasksMutex_);
    tasks_.erase(taskId);
}

Task* TaskManager::GetTask(uint64_t taskId)
{
    std::unique_lock<std::shared_mutex> lock(tasksMutex_);
    auto iter = tasks_.find(taskId);
    if (iter == tasks_.end()) {
        return nullptr;
    }
    return iter->second;
}

// ----------------------------------- TaskGroupManager ----------------------------------------
TaskGroupManager& TaskGroupManager::GetInstance()
{
    static TaskGroupManager groupManager;
    return groupManager;
}

void TaskGroupManager::AddTask(uint64_t groupId, napi_ref taskRef, uint64_t taskId)
{
    auto groupIter = taskGroups_.find(groupId);
    if (groupIter == taskGroups_.end()) {
        return;
    }
    auto taskGroup = reinterpret_cast<TaskGroup*>(groupIter->second);
    taskGroup->taskRefs_.push_back(taskRef);
    taskGroup->taskNum_++;
    taskGroup->taskIds_.push_back(taskId);
}

void TaskGroupManager::ReleaseTaskGroupData(napi_env env, TaskGroup* group)
{
    TaskGroupManager::GetInstance().RemoveTaskGroup(group->groupId_);
    for (uint64_t taskId : group->taskIds_) {
        Task* task = TaskManager::GetInstance().GetTask(taskId);
        if (task == nullptr) {
            continue;
        }
        napi_reference_unref(task->env_, task->taskRef_, nullptr);
    }
}

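// Cancelling a group first cancels its pending (not yet dispatched) group executions; if the group is
// unknown or already canceled nothing more is done, otherwise every member task is canceled via
// CancelGroupTask (skipped when all tasks have already finished) and the group is marked CANCELED.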
void TaskGroupManager::CancelGroup(napi_env env, uint64_t groupId)
{
    TaskGroup* taskGroup = GetTaskGroup(groupId);
    if (taskGroup == nullptr) {
        HILOG_ERROR("taskpool:: CancelGroup group is nullptr");
        return;
    }
    {
        std::unique_lock<std::shared_mutex> lock(taskGroup->taskGroupMutex_);
        if (taskGroup->currentGroupInfo_ == nullptr) {
            HILOG_ERROR("taskpool:: cancel non-existent task group");
            ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK_GROUP);
            return;
        }
        taskGroup->CancelPendingGroup(env);
    }
    if (taskGroup->groupState_ == ExecuteState::NOT_FOUND) {
        HILOG_ERROR("taskpool:: cancel non-existent task group");
        ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK_GROUP);
        return;
    }
    if (taskGroup->groupState_ == ExecuteState::CANCELED) {
        return;
    }
    if (taskGroup->currentGroupInfo_->finishedTask != taskGroup->taskNum_) {
        for (uint64_t taskId : taskGroup->taskIds_) {
            CancelGroupTask(env, taskId, taskGroup);
        }
    }
    taskGroup->groupState_ = ExecuteState::CANCELED;
}

void TaskGroupManager::CancelGroupTask(napi_env env, uint64_t taskId, TaskGroup* group)
{
    auto task = TaskManager::GetInstance().GetTask(taskId);
    if (task == nullptr) {
        HILOG_INFO("taskpool:: CancelGroupTask task is nullptr");
        return;
    }
    std::unique_lock<std::shared_mutex> lock(task->taskMutex_);
    ExecuteState state = task->taskState_;
    switch (state) {
        case ExecuteState::NOT_FOUND:
            return;
        case ExecuteState::RUNNING:
            task->taskState_ = ExecuteState::CANCELED;
            break;
        case ExecuteState::WAITING:
            task->taskState_ = ExecuteState::CANCELED;
            if (group->groupState_ == ExecuteState::WAITING) {
                group->groupState_ = ExecuteState::CANCELED;
                break;
            }
            if (task->currentTaskInfo_ != nullptr) {
                delete task->currentTaskInfo_;
                task->currentTaskInfo_ = nullptr;
            }
            break;
        default:
            break;
    }
}

void TaskGroupManager::StoreSequenceRunner(uint64_t seqRunnerId, SequenceRunner* seqRunner)
{
    std::unique_lock<std::mutex> lock(seqRunnersMutex_);
    seqRunners_.emplace(seqRunnerId, seqRunner);
}

void TaskGroupManager::RemoveSequenceRunner(uint64_t seqRunnerId)
{
    std::unique_lock<std::mutex> lock(seqRunnersMutex_);
    seqRunners_.erase(seqRunnerId);
}

SequenceRunner* TaskGroupManager::GetSeqRunner(uint64_t seqRunnerId)
{
    std::unique_lock<std::mutex> lock(seqRunnersMutex_);
    auto iter = seqRunners_.find(seqRunnerId);
    if (iter != seqRunners_.end()) {
        return iter->second;
    }
    HILOG_ERROR("taskpool:: seqRunner not exist.");
    return nullptr;
}

void TaskGroupManager::AddTaskToSeqRunner(uint64_t seqRunnerId, Task* task)
{
    std::unique_lock<std::mutex> lock(seqRunnersMutex_);
    auto iter = seqRunners_.find(seqRunnerId);
    if (iter == seqRunners_.end()) {
        HILOG_ERROR("seqRunner:: seqRunner not found.");
        return;
    } else {
        std::unique_lock<std::shared_mutex> seqRunnerLock(iter->second->seqRunnerMutex_);
        iter->second->seqRunnerTasks_.push(task);
    }
}

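// A sequence runner executes its tasks strictly one at a time: only the task recorded in currentTaskId_
// (the one that just finished) may trigger the next dispatch. Canceled tasks at the head of the queue are
// skipped, and the first remaining task is enqueued with the runner's priority; when the queue drains,
// currentTaskId_ is reset to 0.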
bool TaskGroupManager::TriggerSeqRunner(napi_env env, Task* lastTask)
{
    uint64_t seqRunnerId = lastTask->seqRunnerId_;
    SequenceRunner* seqRunner = GetSeqRunner(seqRunnerId);
    if (seqRunner == nullptr) {
        HILOG_ERROR("seqRunner:: trigger seqRunner not exist.");
        return false;
    }
    napi_reference_unref(env, seqRunner->seqRunnerRef_, nullptr);
    if (seqRunner->currentTaskId_ != lastTask->taskId_) {
        HILOG_ERROR("seqRunner:: only front task can trigger seqRunner.");
        return false;
    }
    {
        std::unique_lock<std::shared_mutex> lock(seqRunner->seqRunnerMutex_);
        if (seqRunner->seqRunnerTasks_.empty()) {
            HILOG_DEBUG("seqRunner:: seqRunner %" PRIu64 " empty.", seqRunnerId);
            seqRunner->currentTaskId_ = 0;
            return true;
        }
        Task* task = seqRunner->seqRunnerTasks_.front();
        seqRunner->seqRunnerTasks_.pop();
        while (task->taskState_ == ExecuteState::CANCELED) {
            if (seqRunner->seqRunnerTasks_.empty()) {
                HILOG_DEBUG("seqRunner:: seqRunner %" PRIu64 " empty in cancel loop.", seqRunnerId);
                seqRunner->currentTaskId_ = 0;
                return true;
            }
            task = seqRunner->seqRunnerTasks_.front();
            seqRunner->seqRunnerTasks_.pop();
        }
        seqRunner->currentTaskId_ = task->taskId_;
        task->IncreaseRefCount();
        HILOG_DEBUG("seqRunner:: Trig task %" PRIu64 " in seqRunner %" PRIu64 ".", task->taskId_, seqRunnerId);
        TaskManager::GetInstance().EnqueueTaskId(task->taskId_, seqRunner->priority_);
        TaskManager::GetInstance().TryTriggerExpand();
    }
    return true;
}

void TaskGroupManager::StoreTaskGroup(uint64_t groupId, TaskGroup* taskGroup)
{
    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
    taskGroups_.emplace(groupId, taskGroup);
}

void TaskGroupManager::RemoveTaskGroup(uint64_t groupId)
{
    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
    taskGroups_.erase(groupId);
}

TaskGroup* TaskGroupManager::GetTaskGroup(uint64_t groupId)
{
    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
    auto groupIter = taskGroups_.find(groupId);
    if (groupIter == taskGroups_.end()) {
        return nullptr;
    }
    return reinterpret_cast<TaskGroup*>(groupIter->second);
}

void TaskGroupManager::UpdateGroupState(uint64_t groupId)
{
    TaskGroup* group = GetTaskGroup(groupId);
    if (group == nullptr) {
        HILOG_ERROR("taskpool:: UpdateGroupState group is nullptr");
        return;
    }
    group->groupState_ = ExecuteState::RUNNING;
}
} // namespace Commonlibrary::Concurrent::TaskPoolModule