1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include "task_manager.h"

#include <cinttypes>
#include <securec.h>
#include <thread>
#include <vector>

#include "commonlibrary/ets_utils/js_sys_module/timer/timer.h"
#include "helper/concurrent_helper.h"
#include "helper/error_helper.h"
#include "helper/hitrace_helper.h"
#include "taskpool.h"
#include "utils/log.h"
#include "worker.h"
29
30 namespace Commonlibrary::Concurrent::TaskPoolModule {
31 using namespace OHOS::JsSysModule;
32
// Quotas used by DequeueTaskId: once this many HIGH (resp. MEDIUM) priority tasks have been
// handed out in a row, the counter is reset and the next lower queue gets a turn.
static constexpr int8_t HIGH_PRIORITY_TASK_COUNT = 5;
static constexpr int8_t MEDIUM_PRIORITY_TASK_COUNT = 5;
// Divisor used by ComputeSuitableThreadNum when converting the average task duration
// into a number of threads.
static constexpr int32_t MAX_TASK_DURATION = 100; // 100: 100ms
// Upper bound for one shrink step and for the thread estimate made before any task has finished.
static constexpr uint32_t STEP_SIZE = 2;
// Lower bound applied to ConcurrentHelper::GetMaxThreads() when TryExpand computes the thread cap.
static constexpr uint32_t DEFAULT_THREADS = 3;
static constexpr uint32_t MIN_THREADS = 1; // 1: minimum thread num when idle
// Running time after which a worker is examined as blocked: the shorter value is used when the
// pool is at its thread limit with no idle worker, the longer one otherwise.
static constexpr uint32_t MIN_TIMEOUT_TIME = 180000; // 180000: 3min
static constexpr uint32_t MAX_TIMEOUT_TIME = 600000; // 600000: 10min
// Idle time a worker must exceed before TriggerShrink may release it.
static constexpr int32_t MAX_IDLE_TIME = 50000; // 50000: 50s
// Number of consecutive idle observations (GetIdleWorkersList, OHOS build only) before a worker
// is put on the free list; the load-balance timer repeats every minute.
[[maybe_unused]] static constexpr uint32_t IDLE_THRESHOLD = 2; // 2: 2min later will release the thread
43
44 // ----------------------------------- TaskManager ----------------------------------------
GetInstance()45 TaskManager& TaskManager::GetInstance()
46 {
47 static TaskManager manager;
48 return manager;
49 }
50
TaskManager()51 TaskManager::TaskManager()
52 {
53 for (size_t i = 0; i < taskQueues_.size(); i++) {
54 std::unique_ptr<ExecuteQueue> taskQueue = std::make_unique<ExecuteQueue>();
55 taskQueues_[i] = std::move(taskQueue);
56 }
57 }
58
~TaskManager()59 TaskManager::~TaskManager()
60 {
61 if (timer_ == nullptr) {
62 HILOG_ERROR("taskpool:: timer_ is nullptr");
63 } else {
64 uv_timer_stop(timer_);
65 ConcurrentHelper::UvHandleClose(timer_);
66 ConcurrentHelper::UvHandleClose(expandHandle_);
67 }
68
69 if (loop_ != nullptr) {
70 uv_stop(loop_);
71 }
72
73 {
74 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
75 for (auto& worker : workers_) {
76 delete worker;
77 }
78 workers_.clear();
79 }
80
81 {
82 std::unique_lock<std::shared_mutex> lock(tasksMutex_);
83 for (auto& [_, task] : tasks_) {
84 delete task;
85 task = nullptr;
86 }
87 tasks_.clear();
88 }
89 CountTraceForWorker();
90 }
91
CountTraceForWorker()92 void TaskManager::CountTraceForWorker()
93 {
94 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
95 int64_t threadNum = static_cast<int64_t>(workers_.size());
96 int64_t idleWorkers = static_cast<int64_t>(idleWorkers_.size());
97 int64_t timeoutWorkers = static_cast<int64_t>(timeoutWorkers_.size());
98 HITRACE_HELPER_COUNT_TRACE("timeoutThreadNum", timeoutWorkers);
99 HITRACE_HELPER_COUNT_TRACE("threadNum", threadNum);
100 HITRACE_HELPER_COUNT_TRACE("runningThreadNum", threadNum - idleWorkers);
101 HITRACE_HELPER_COUNT_TRACE("idleThreadNum", idleWorkers);
102 }
103
GetThreadInfos(napi_env env)104 napi_value TaskManager::GetThreadInfos(napi_env env)
105 {
106 napi_value threadInfos = nullptr;
107 napi_create_array(env, &threadInfos);
108 {
109 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
110 int32_t i = 0;
111 for (auto& worker : workers_) {
112 if (worker->workerEnv_ == nullptr) {
113 continue;
114 }
115 napi_value tid = nullptr;
116 napi_value priority = nullptr;
117 napi_create_int32(env, static_cast<int32_t>(worker->tid_), &tid);
118 napi_create_int32(env, static_cast<int32_t>(worker->priority_), &priority);
119
120 napi_value taskId = nullptr;
121 napi_create_array(env, &taskId);
122 int32_t j = 0;
123 {
124 std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_);
125 for (auto& currentId : worker->currentTaskId_) {
126 napi_value id = NapiHelper::CreateUint64(env, currentId);
127 napi_set_element(env, taskId, j, id);
128 j++;
129 }
130 }
131 napi_value threadInfo = nullptr;
132 napi_create_object(env, &threadInfo);
133 napi_set_named_property(env, threadInfo, "tid", tid);
134 napi_set_named_property(env, threadInfo, "priority", priority);
135 napi_set_named_property(env, threadInfo, "taskIds", taskId);
136 napi_set_element(env, threadInfos, i, threadInfo);
137 i++;
138 }
139 }
140 return threadInfos;
141 }
142
GetTaskInfos(napi_env env)143 napi_value TaskManager::GetTaskInfos(napi_env env)
144 {
145 napi_value taskInfos = nullptr;
146 napi_create_array(env, &taskInfos);
147 {
148 std::unique_lock<std::shared_mutex> lock(tasksMutex_);
149 int32_t i = 0;
150 for (const auto& [_, task] : tasks_) {
151 if (task->taskState_ == ExecuteState::NOT_FOUND) {
152 continue;
153 }
154 napi_value taskInfoValue = nullptr;
155 napi_create_object(env, &taskInfoValue);
156 std::unique_lock<std::shared_mutex> lock(task->taskMutex_);
157 napi_value taskId = NapiHelper::CreateUint64(env, task->taskId_);
158 napi_value name = nullptr;
159 napi_create_string_utf8(env, task->name_.c_str(), task->name_.size(), &name);
160 napi_set_named_property(env, taskInfoValue, "name", name);
161 ExecuteState state;
162 uint64_t duration = 0;
163 if (task->taskState_ == ExecuteState::WAITING) {
164 state = ExecuteState::WAITING;
165 } else {
166 duration = ConcurrentHelper::GetMilliseconds() - task->startTime_;
167 state = ExecuteState::RUNNING;
168 }
169 napi_value stateValue = nullptr;
170 napi_create_int32(env, state, &stateValue);
171 napi_set_named_property(env, taskInfoValue, "taskId", taskId);
172 napi_set_named_property(env, taskInfoValue, "state", stateValue);
173 napi_value durationValue = NapiHelper::CreateUint32(env, duration);
174 napi_set_named_property(env, taskInfoValue, "duration", durationValue);
175 napi_set_element(env, taskInfos, i, taskInfoValue);
176 i++;
177 }
178 }
179 return taskInfos;
180 }
181
UpdateExecutedInfo(uint64_t duration)182 void TaskManager::UpdateExecutedInfo(uint64_t duration)
183 {
184 totalExecTime_ += duration;
185 totalExecCount_++;
186 }
187
ComputeSuitableThreadNum()188 uint32_t TaskManager::ComputeSuitableThreadNum()
189 {
190 uint32_t targetNum = 0;
191 if (GetTaskNum() != 0 && totalExecCount_ == 0) {
192 // this branch is used for avoiding time-consuming works that may block the taskpool
193 targetNum = std::min(STEP_SIZE, GetTaskNum());
194 }
195 uint32_t result = 0;
196 if (totalExecCount_ != 0) {
197 auto durationPerTask = static_cast<double>(totalExecTime_) / totalExecCount_;
198 result = std::ceil(durationPerTask * GetTaskNum() / MAX_TASK_DURATION);
199 targetNum += std::min(result, GetTaskNum());
200 }
201 targetNum += GetRunningWorkers();
202 targetNum |= 1;
203 return targetNum;
204 }
205
CheckForBlockedWorkers()206 void TaskManager::CheckForBlockedWorkers()
207 {
208 // the threshold will be dynamically modified to provide more flexibility in detecting exceptions
209 // if the thread num has reached the limit and the idle worker is not available, a short time will be used,
210 // else we will choose the longer one
211 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
212 bool needChecking = false;
213 bool state = (GetThreadNum() == ConcurrentHelper::GetMaxThreads()) && (GetIdleWorkers() == 0);
214 uint64_t threshold = state ? MIN_TIMEOUT_TIME : MAX_TIMEOUT_TIME;
215 for (auto iter = workers_.begin(); iter != workers_.end(); iter++) {
216 auto worker = *iter;
217 // if the worker thread is idle, just skip it, and only the worker in running state can be marked as timeout
218 if ((worker->state_ == WorkerState::IDLE) ||
219 (ConcurrentHelper::GetMilliseconds() - worker->startTime_ < threshold) ||
220 !worker->UpdateWorkerState(WorkerState::RUNNING, WorkerState::BLOCKED)) {
221 continue;
222 }
223 // When executing the promise task, the worker state may not be updated and will be
224 // marked as 'BLOCKED', so we should exclude this situation.
225 // Besides, if the worker is not executing sync tasks or micro tasks, it may handle
226 // the task like I/O in uv threads, we should also exclude this situation.
227 auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
228 if (worker->idleState_ && !workerEngine->IsExecutingPendingJob()) {
229 if (!workerEngine->HasWaitingRequest()) {
230 worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::IDLE);
231 } else {
232 worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::RUNNING);
233 worker->startTime_ = ConcurrentHelper::GetMilliseconds();
234 }
235 continue;
236 }
237
238 HILOG_INFO("taskpool:: The worker has been marked as timeout.");
239 needChecking = true;
240 workerEngine->TerminateExecution();
241
242 idleWorkers_.erase(worker);
243 timeoutWorkers_.insert(worker);
244 }
245 // should trigger the check when we have marked and removed workers
246 if (UNLIKELY(needChecking)) {
247 TryExpand();
248 }
249 }
250
TryTriggerExpand()251 void TaskManager::TryTriggerExpand()
252 {
253 // post the signal to notify the monitor thread to expand
254 if (UNLIKELY(expandHandle_ == nullptr)) {
255 needChecking_ = true;
256 HILOG_DEBUG("taskpool:: the expandHandle_ is nullptr");
257 return;
258 }
259 uv_async_send(expandHandle_);
260 }
261
262 #if defined(OHOS_PLATFORM)
263 // read /proc/[pid]/task/[tid]/stat to get the number of idle threads.
ReadThreadInfo(Worker * worker,char * buf,uint32_t size)264 bool TaskManager::ReadThreadInfo(Worker* worker, char* buf, uint32_t size)
265 {
266 char path[128]; // 128: buffer for path
267 pid_t pid = getpid();
268 pid_t tid = worker->tid_;
269 ssize_t bytesLen = -1;
270 int ret = snprintf_s(path, sizeof(path), sizeof(path) - 1, "/proc/%d/task/%d/stat", pid, tid);
271 if (ret < 0) {
272 HILOG_ERROR("snprintf_s failed");
273 return false;
274 }
275 int fd = open(path, O_RDONLY | O_NONBLOCK);
276 if (UNLIKELY(fd == -1)) {
277 return false;
278 }
279 bytesLen = read(fd, buf, size - 1);
280 close(fd);
281 if (bytesLen <= 0) {
282 HILOG_ERROR("taskpool:: failed to read %{public}s", path);
283 return false;
284 }
285 buf[bytesLen] = '\0';
286 return true;
287 }
288
GetIdleWorkers()289 uint32_t TaskManager::GetIdleWorkers()
290 {
291 char buf[4096]; // 4096: buffer for thread info
292 uint32_t idleCount = 0;
293 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
294 for (auto& worker : idleWorkers_) {
295 if (!ReadThreadInfo(worker, buf, sizeof(buf))) {
296 continue;
297 }
298 char state;
299 if (sscanf_s(buf, "%*d %*s %c", &state, sizeof(state)) != 1) { // 1: state
300 HILOG_ERROR("taskpool: sscanf_s of state failed for %{public}c", state);
301 return 0;
302 }
303 if (state == 'S') {
304 idleCount++;
305 }
306 }
307 return idleCount;
308 }
309
GetIdleWorkersList(uint32_t step)310 void TaskManager::GetIdleWorkersList(uint32_t step)
311 {
312 char buf[4096]; // 4096: buffer for thread info
313 for (auto& worker : idleWorkers_) {
314 if (!ReadThreadInfo(worker, buf, sizeof(buf))) {
315 continue;
316 }
317 char state;
318 uint64_t utime;
319 if (sscanf_s(buf, "%*d %*s %c %*d %*d %*d %*d %*d %*u %*lu %*lu %*lu %*lu %llu",
320 &state, sizeof(state), &utime) != 2) { // 2: state and utime
321 HILOG_ERROR("taskpool: sscanf_s of state failed for %{public}d", worker->tid_);
322 return;
323 }
324 if (state != 'S' || utime != worker->lastCpuTime_) {
325 worker->idleCount_ = 0;
326 worker->lastCpuTime_ = utime;
327 continue;
328 }
329 if (++worker->idleCount_ >= IDLE_THRESHOLD) {
330 freeList_.emplace_back(worker);
331 }
332 }
333 }
334
TriggerShrink(uint32_t step)335 void TaskManager::TriggerShrink(uint32_t step)
336 {
337 GetIdleWorkersList(step);
338 step = std::min(step, static_cast<uint32_t>(freeList_.size()));
339 uint32_t count = 0;
340 for (size_t i = 0; i < freeList_.size(); i++) {
341 auto worker = freeList_[i];
342 if (worker->state_ != WorkerState::IDLE) { // may in I/O
343 continue;
344 }
345 auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
346 if (idleTime < MAX_IDLE_TIME || worker->runningCount_ != 0) {
347 continue;
348 }
349 idleWorkers_.erase(worker);
350 HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
351 uv_async_send(worker->clearWorkerSignal_);
352 if (++count == step) {
353 break;
354 }
355 }
356 freeList_.clear();
357 }
358 #else
GetIdleWorkers()359 uint32_t TaskManager::GetIdleWorkers()
360 {
361 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
362 return idleWorkers_.size();
363 }
364
TriggerShrink(uint32_t step)365 void TaskManager::TriggerShrink(uint32_t step)
366 {
367 for (uint32_t i = 0; i < step; i++) {
368 // try to free the worker that idle time meets the requirement
369 auto iter = std::find_if(idleWorkers_.begin(), idleWorkers_.end(), [](Worker *worker) {
370 auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
371 return idleTime > MAX_IDLE_TIME && worker->runningCount_ == 0;
372 });
373 // remove it from all sets
374 if (iter != idleWorkers_.end()) {
375 auto worker = *iter;
376 idleWorkers_.erase(worker);
377 HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
378 uv_async_send(worker->clearWorkerSignal_);
379 }
380 }
381 }
382 #endif
383
NotifyShrink(uint32_t targetNum)384 void TaskManager::NotifyShrink(uint32_t targetNum)
385 {
386 uint32_t workerCount = GetThreadNum();
387 if (workerCount > MIN_THREADS && workerCount > targetNum) {
388 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
389 targetNum = std::max(MIN_THREADS, targetNum);
390 uint32_t step = std::min(workerCount - targetNum, STEP_SIZE);
391 TriggerShrink(step);
392 }
393 // remove all timeout workers
394 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
395 for (auto iter = timeoutWorkers_.begin(); iter != timeoutWorkers_.end();) {
396 HILOG_DEBUG("taskpool:: try to release timeout thread: %{public}d", (*iter)->tid_);
397 uv_async_send((*iter)->clearWorkerSignal_);
398 timeoutWorkers_.erase(iter++);
399 }
400 // stop the timer
401 if ((workers_.size() == idleWorkers_.size() && workers_.size() == MIN_THREADS) && timeoutWorkers_.empty()) {
402 suspend_ = true;
403 uv_timer_stop(timer_);
404 HILOG_DEBUG("taskpool:: timer will be suspended");
405 }
406 }
407
TriggerLoadBalance(const uv_timer_t * req)408 void TaskManager::TriggerLoadBalance(const uv_timer_t* req)
409 {
410 TaskManager& taskManager = TaskManager::GetInstance();
411 // do not check when try to expand
412 if (taskManager.expandingCount_ != 0) {
413 return;
414 }
415
416 taskManager.CheckForBlockedWorkers();
417 uint32_t targetNum = taskManager.ComputeSuitableThreadNum();
418 taskManager.NotifyShrink(targetNum);
419 taskManager.CountTraceForWorker();
420 }
421
TryExpand()422 void TaskManager::TryExpand()
423 {
424 if (GetIdleWorkers() != 0) {
425 return;
426 }
427 // for accuracy, if worker is being created, we will not trigger expansion,
428 // and the expansion will be triggered until all workers are created
429 if (expandingCount_ != 0) {
430 needChecking_ = true;
431 return;
432 }
433 needChecking_ = false; // do not need to check
434 uint32_t targetNum = ComputeSuitableThreadNum();
435 targetNum |= 1;
436 uint32_t workerCount = GetThreadNum();
437 const uint32_t maxThreads = std::max(ConcurrentHelper::GetMaxThreads(), DEFAULT_THREADS);
438 if (workerCount < maxThreads && workerCount < targetNum) {
439 uint32_t step = std::min(maxThreads, targetNum) - workerCount;
440 CreateWorkers(hostEnv_, step);
441 HILOG_INFO("taskpool:: maxThreads: %{public}u, created num: %{public}u, total num: %{public}u",
442 maxThreads, step, GetThreadNum());
443 }
444 if (UNLIKELY(suspend_)) {
445 suspend_ = false;
446 uv_timer_again(timer_);
447 }
448 }
449
NotifyExpand(const uv_async_t * req)450 void TaskManager::NotifyExpand(const uv_async_t* req)
451 {
452 TaskManager& taskManager = TaskManager::GetInstance();
453 taskManager.TryExpand();
454 }
455
RunTaskManager()456 void TaskManager::RunTaskManager()
457 {
458 loop_ = uv_default_loop();
459 timer_ = new uv_timer_t;
460 uv_timer_init(loop_, timer_);
461 expandHandle_ = new uv_async_t;
462 uv_timer_start(timer_, reinterpret_cast<uv_timer_cb>(TaskManager::TriggerLoadBalance), 0, 60000); // 60000: 1min
463 uv_async_init(loop_, expandHandle_, reinterpret_cast<uv_async_cb>(TaskManager::NotifyExpand));
464 #if defined IOS_PLATFORM || defined MAC_PLATFORM
465 pthread_setname_np("OS_TaskManager");
466 #else
467 pthread_setname_np(pthread_self(), "OS_TaskManager");
468 #endif
469 if (UNLIKELY(needChecking_)) {
470 needChecking_ = false;
471 uv_async_send(expandHandle_);
472 }
473 uv_run(loop_, UV_RUN_DEFAULT);
474 uv_loop_close(loop_);
475 }
476
CancelTask(napi_env env,uint64_t taskId)477 void TaskManager::CancelTask(napi_env env, uint64_t taskId)
478 {
479 // 1. Cannot find taskInfo by executeId, throw error
480 // 2. Find executing taskInfo, skip it
481 // 3. Find waiting taskInfo, cancel it
482 // 4. Find canceled taskInfo, skip it
483 Task* task = GetTask(taskId);
484 if (task == nullptr) {
485 return;
486 }
487 std::unique_lock<std::shared_mutex> lock(task->taskMutex_);
488 if (task->currentTaskInfo_ == nullptr) {
489 HILOG_ERROR("taskpool:: cancel non-existent task");
490 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK);
491 return;
492 }
493 ExecuteState state = task->taskState_;
494 switch (state) {
495 case ExecuteState::NOT_FOUND:
496 HILOG_ERROR("taskpool:: cancel non-existent task");
497 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK);
498 return;
499 case ExecuteState::RUNNING:
500 task->taskState_ = ExecuteState::CANCELED;
501 task->CancelPendingTask(env, ExecuteState::RUNNING);
502 break;
503 case ExecuteState::WAITING:
504 task->taskState_ = ExecuteState::CANCELED;
505 task->CancelPendingTask(env, ExecuteState::WAITING);
506 break;
507 default: // Default is CANCELED, means task isCanceled, do not need to mark again.
508 break;
509 }
510 }
511
NotifyWorkerIdle(Worker * worker)512 void TaskManager::NotifyWorkerIdle(Worker* worker)
513 {
514 {
515 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
516 if (worker->state_ == WorkerState::BLOCKED) {
517 return;
518 }
519 idleWorkers_.insert(worker);
520 }
521 if (GetTaskNum() != 0) {
522 NotifyExecuteTask();
523 }
524 CountTraceForWorker();
525 }
526
NotifyWorkerCreated(Worker * worker)527 void TaskManager::NotifyWorkerCreated(Worker* worker)
528 {
529 NotifyWorkerIdle(worker);
530 expandingCount_--;
531 if (UNLIKELY(needChecking_ && expandingCount_ == 0 && expandHandle_ != nullptr)) {
532 needChecking_ = false;
533 uv_async_send(expandHandle_);
534 }
535 }
536
NotifyWorkerAdded(Worker * worker)537 void TaskManager::NotifyWorkerAdded(Worker* worker)
538 {
539 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
540 workers_.insert(worker);
541 HILOG_DEBUG("taskpool:: a new worker has been added and the current num is %{public}zu", workers_.size());
542 }
543
NotifyWorkerRunning(Worker * worker)544 void TaskManager::NotifyWorkerRunning(Worker* worker)
545 {
546 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
547 idleWorkers_.erase(worker);
548 CountTraceForWorker();
549 }
550
GetRunningWorkers()551 uint32_t TaskManager::GetRunningWorkers()
552 {
553 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
554 return std::count_if(workers_.begin(), workers_.end(), [](const auto& worker) {
555 return worker->runningCount_ != 0;
556 });
557 }
558
GetTimeoutWorkers()559 uint32_t TaskManager::GetTimeoutWorkers()
560 {
561 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
562 return timeoutWorkers_.size();
563 }
564
GetTaskNum()565 uint32_t TaskManager::GetTaskNum()
566 {
567 std::lock_guard<std::mutex> lock(taskQueuesMutex_);
568 return taskQueues_[Priority::HIGH]->GetTaskNum() + taskQueues_[Priority::MEDIUM]->GetTaskNum() +
569 taskQueues_[Priority::LOW]->GetTaskNum();
570 }
571
GetThreadNum()572 uint32_t TaskManager::GetThreadNum()
573 {
574 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
575 return workers_.size();
576 }
577
EnqueueTaskId(uint64_t taskId,Priority priority)578 void TaskManager::EnqueueTaskId(uint64_t taskId, Priority priority)
579 {
580 {
581 std::lock_guard<std::mutex> lock(taskQueuesMutex_);
582 taskQueues_[priority]->EnqueueTaskId(taskId);
583 }
584 NotifyExecuteTask();
585 }
586
DequeueTaskId()587 std::pair<uint64_t, Priority> TaskManager::DequeueTaskId()
588 {
589 std::lock_guard<std::mutex> lock(taskQueuesMutex_);
590 auto& highTaskQueue = taskQueues_[Priority::HIGH];
591 if (!highTaskQueue->IsEmpty() && highPrioExecuteCount_ < HIGH_PRIORITY_TASK_COUNT) {
592 auto& highTaskQueue = taskQueues_[Priority::HIGH];
593 highPrioExecuteCount_++;
594 uint64_t taskId = highTaskQueue->DequeueTaskId();
595 if (IsDependendByTaskId(taskId)) {
596 EnqueuePendingTaskInfo(taskId, Priority::HIGH);
597 return std::make_pair(0, Priority::HIGH);
598 }
599 return std::make_pair(taskId, Priority::HIGH);
600 }
601 highPrioExecuteCount_ = 0;
602
603 auto& mediumTaskQueue = taskQueues_[Priority::MEDIUM];
604 if (!mediumTaskQueue->IsEmpty() && mediumPrioExecuteCount_ < MEDIUM_PRIORITY_TASK_COUNT) {
605 mediumPrioExecuteCount_++;
606 uint64_t taskId = mediumTaskQueue->DequeueTaskId();
607 if (IsDependendByTaskId(taskId)) {
608 EnqueuePendingTaskInfo(taskId, Priority::MEDIUM);
609 return std::make_pair(0, Priority::MEDIUM);
610 }
611 return std::make_pair(taskId, Priority::MEDIUM);
612 }
613 mediumPrioExecuteCount_ = 0;
614
615 auto& lowTaskQueue = taskQueues_[Priority::LOW];
616 uint64_t taskId = lowTaskQueue->DequeueTaskId();
617 if (IsDependendByTaskId(taskId)) {
618 EnqueuePendingTaskInfo(taskId, Priority::LOW);
619 return std::make_pair(0, Priority::LOW);
620 }
621 return std::make_pair(taskId, Priority::LOW);
622 }
623
NotifyExecuteTask()624 void TaskManager::NotifyExecuteTask()
625 {
626 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
627 for (auto& worker : idleWorkers_) {
628 worker->NotifyExecuteTask();
629 }
630 }
631
InitTaskManager(napi_env env)632 void TaskManager::InitTaskManager(napi_env env)
633 {
634 HITRACE_HELPER_METER_NAME("InitTaskManager");
635 auto hostEngine = reinterpret_cast<NativeEngine*>(env);
636 while (hostEngine != nullptr && !hostEngine->IsMainThread()) {
637 hostEngine = hostEngine->GetHostEngine();
638 }
639 if (!isInitialized_.exchange(true, std::memory_order_relaxed)) {
640 hostEnv_ = reinterpret_cast<napi_env>(hostEngine);
641 // Add a reserved thread for taskpool
642 CreateWorkers(hostEnv_);
643 // Create a timer to manage worker threads
644 std::thread workerManager(&TaskManager::RunTaskManager, this);
645 workerManager.detach();
646 }
647 }
648
CreateWorkers(napi_env env,uint32_t num)649 void TaskManager::CreateWorkers(napi_env env, uint32_t num)
650 {
651 for (uint32_t i = 0; i < num; i++) {
652 expandingCount_++;
653 auto worker = Worker::WorkerConstructor(env);
654 NotifyWorkerAdded(worker);
655 }
656 CountTraceForWorker();
657 }
658
RemoveWorker(Worker * worker)659 void TaskManager::RemoveWorker(Worker* worker)
660 {
661 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
662 idleWorkers_.erase(worker);
663 workers_.erase(worker);
664 }
665
RestoreWorker(Worker * worker)666 void TaskManager::RestoreWorker(Worker* worker)
667 {
668 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
669 if (UNLIKELY(suspend_)) {
670 suspend_ = false;
671 uv_timer_again(timer_);
672 }
673 if (worker->state_ == WorkerState::BLOCKED) {
674 // since the worker is blocked, we should add it to the timeout set
675 timeoutWorkers_.insert(worker);
676 return;
677 }
678 // Since the worker may be executing some tasks in IO thread, we should add it to the
679 // worker sets and call the 'NotifyWorkerIdle', which can still execute some tasks in its own thread.
680 HILOG_DEBUG("taskpool:: worker has been restored and the current num is: %{public}zu", workers_.size());
681 idleWorkers_.emplace_hint(idleWorkers_.end(), worker);
682 if (GetTaskNum() != 0) {
683 NotifyExecuteTask();
684 }
685 }
686
RegisterCallback(napi_env env,uint64_t taskId,std::shared_ptr<CallbackInfo> callbackInfo)687 void TaskManager::RegisterCallback(napi_env env, uint64_t taskId, std::shared_ptr<CallbackInfo> callbackInfo)
688 {
689 std::lock_guard<std::mutex> lock(callbackMutex_);
690 callbackTable_[taskId] = callbackInfo;
691 }
692
GetCallbackInfo(uint64_t taskId)693 std::shared_ptr<CallbackInfo> TaskManager::GetCallbackInfo(uint64_t taskId)
694 {
695 std::lock_guard<std::mutex> lock(callbackMutex_);
696 auto iter = callbackTable_.find(taskId);
697 if (iter == callbackTable_.end() || iter->second == nullptr) {
698 HILOG_ERROR("taskpool:: the callback does not exist");
699 return nullptr;
700 }
701 return iter->second;
702 }
703
IncreaseRefCount(uint64_t taskId)704 void TaskManager::IncreaseRefCount(uint64_t taskId)
705 {
706 if (taskId == 0) { // do not support func
707 return;
708 }
709 std::lock_guard<std::mutex> lock(callbackMutex_);
710 auto iter = callbackTable_.find(taskId);
711 if (iter == callbackTable_.end() || iter->second == nullptr) {
712 return;
713 }
714 iter->second->refCount++;
715 }
716
DecreaseRefCount(napi_env env,uint64_t taskId)717 void TaskManager::DecreaseRefCount(napi_env env, uint64_t taskId)
718 {
719 if (taskId == 0) { // do not support func
720 return;
721 }
722 std::lock_guard<std::mutex> lock(callbackMutex_);
723 auto iter = callbackTable_.find(taskId);
724 if (iter == callbackTable_.end() || iter->second == nullptr) {
725 return;
726 }
727 iter->second->refCount--;
728 if (iter->second->refCount == 0) {
729 callbackTable_.erase(iter);
730 }
731 }
732
NotifyCallbackExecute(napi_env env,TaskResultInfo * resultInfo,Task * task)733 napi_value TaskManager::NotifyCallbackExecute(napi_env env, TaskResultInfo* resultInfo, Task* task)
734 {
735 std::lock_guard<std::mutex> lock(callbackMutex_);
736 auto iter = callbackTable_.find(task->taskId_);
737 if (iter == callbackTable_.end() || iter->second == nullptr) {
738 HILOG_ERROR("taskpool:: the callback in SendData is not registered on the host side");
739 ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_REGISTERED);
740 delete resultInfo;
741 return nullptr;
742 }
743 Worker* worker = static_cast<Worker*>(task->worker_);
744 worker->Enqueue(resultInfo);
745 auto callbackInfo = iter->second;
746 callbackInfo->refCount++;
747 callbackInfo->onCallbackSignal->data = worker;
748 uv_async_send(callbackInfo->onCallbackSignal);
749 return nullptr;
750 }
751
NotifyDependencyTaskInfo(uint64_t taskId)752 void TaskManager::NotifyDependencyTaskInfo(uint64_t taskId)
753 {
754 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
755 auto iter = dependentTaskInfos_.find(taskId);
756 if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
757 return;
758 }
759 for (auto taskIdIter = iter->second.begin(); taskIdIter != iter->second.end();) {
760 {
761 std::unique_lock<std::shared_mutex> lock(tasksMutex_);
762 auto taskIter = tasks_.find(*taskIdIter);
763 if (taskIter == tasks_.end()) {
764 taskIdIter = iter->second.erase(taskIdIter);
765 continue;
766 }
767 }
768 auto taskInfo = DequeuePendingTaskInfo(*taskIdIter);
769 if (taskInfo.first == 0) {
770 taskIdIter = iter->second.erase(taskIdIter);
771 continue;
772 }
773 EnqueueTaskId(taskInfo.first, taskInfo.second);
774 auto dependTaskIter = dependTaskInfos_.find(*taskIdIter);
775 if (dependTaskIter != dependTaskInfos_.end()) {
776 auto dependTaskInnerIter = dependTaskIter->second.find(taskId);
777 if (dependTaskInnerIter != dependTaskIter->second.end()) {
778 dependTaskIter->second.erase(dependTaskInnerIter);
779 }
780 }
781 taskIdIter = iter->second.erase(taskIdIter);
782 }
783 }
784
IsDependendByTaskId(uint64_t taskId)785 bool TaskManager::IsDependendByTaskId(uint64_t taskId)
786 {
787 {
788 std::unique_lock<std::shared_mutex> lock(tasksMutex_);
789 auto taskIter = tasks_.find(taskId);
790 if (taskIter == tasks_.end()) {
791 return false;
792 }
793 auto task = reinterpret_cast<Task*>(taskIter->second);
794 if (!task->IsCommonTask()) {
795 return false;
796 }
797 }
798 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
799 auto iter = dependTaskInfos_.find(taskId);
800 if (iter == dependTaskInfos_.end() || iter->second.empty()) {
801 return false;
802 }
803 return true;
804 }
805
StoreTaskDependency(uint64_t taskId,std::set<uint64_t> taskIdSet)806 bool TaskManager::StoreTaskDependency(uint64_t taskId, std::set<uint64_t> taskIdSet)
807 {
808 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
809 StoreDependentTaskInfo(taskIdSet, taskId);
810 auto iter = dependTaskInfos_.find(taskId);
811 if (iter == dependTaskInfos_.end()) {
812 for (const auto& dependentId : taskIdSet) {
813 auto idIter = dependTaskInfos_.find(dependentId);
814 if (idIter == dependTaskInfos_.end()) {
815 continue;
816 }
817 if (!CheckCircularDependency(taskIdSet, idIter->second, taskId)) {
818 return false;
819 }
820 }
821 dependTaskInfos_.emplace(taskId, std::move(taskIdSet));
822 return true;
823 }
824
825 for (const auto& dependentId : iter->second) {
826 auto idIter = dependTaskInfos_.find(dependentId);
827 if (idIter == dependTaskInfos_.end()) {
828 continue;
829 }
830 if (!CheckCircularDependency(iter->second, idIter->second, taskId)) {
831 return false;
832 }
833 }
834 iter->second.insert(taskIdSet.begin(), taskIdSet.end());
835 return true;
836 }
837
CheckCircularDependency(std::set<uint64_t> dependentIdSet,std::set<uint64_t> idSet,uint64_t taskId)838 bool TaskManager::CheckCircularDependency(std::set<uint64_t> dependentIdSet, std::set<uint64_t> idSet, uint64_t taskId)
839 {
840 for (const auto& id : idSet) {
841 if (id == taskId) {
842 return false;
843 }
844 auto iter = dependentIdSet.find(id);
845 if (iter != dependentIdSet.end()) {
846 continue;
847 }
848 auto dIter = dependTaskInfos_.find(id);
849 if (dIter == dependTaskInfos_.end()) {
850 continue;
851 }
852 if (!CheckCircularDependency(dependentIdSet, dIter->second, taskId)) {
853 return false;
854 }
855 }
856 return true;
857 }
858
RemoveTaskDependency(uint64_t taskId,uint64_t dependentId)859 bool TaskManager::RemoveTaskDependency(uint64_t taskId, uint64_t dependentId)
860 {
861 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
862 RemoveDependentTaskInfo(dependentId, taskId);
863 auto iter = dependTaskInfos_.find(taskId);
864 if (iter == dependTaskInfos_.end()) {
865 return false;
866 }
867 auto dependIter = iter->second.find(dependentId);
868 if (dependIter == iter->second.end()) {
869 return false;
870 }
871 iter->second.erase(dependIter);
872 return true;
873 }
874
EnqueuePendingTaskInfo(uint64_t taskId,Priority priority)875 void TaskManager::EnqueuePendingTaskInfo(uint64_t taskId, Priority priority)
876 {
877 if (taskId == 0) {
878 return;
879 }
880 std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
881 pendingTaskInfos_.emplace(taskId, priority);
882 }
883
DequeuePendingTaskInfo(uint64_t taskId)884 std::pair<uint64_t, Priority> TaskManager::DequeuePendingTaskInfo(uint64_t taskId)
885 {
886 std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
887 if (pendingTaskInfos_.empty()) {
888 return std::make_pair(0, Priority::DEFAULT);
889 }
890 std::pair<uint64_t, Priority> result;
891 for (auto it = pendingTaskInfos_.begin(); it != pendingTaskInfos_.end(); ++it) {
892 if (it->first == taskId) {
893 result = std::make_pair(it->first, it->second);
894 it = pendingTaskInfos_.erase(it);
895 break;
896 }
897 }
898 return result;
899 }
900
RemovePendingTaskInfo(uint64_t taskId)901 void TaskManager::RemovePendingTaskInfo(uint64_t taskId)
902 {
903 std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
904 pendingTaskInfos_.erase(taskId);
905 }
906
StoreDependentTaskInfo(std::set<uint64_t> dependentTaskIdSet,uint64_t taskId)907 void TaskManager::StoreDependentTaskInfo(std::set<uint64_t> dependentTaskIdSet, uint64_t taskId)
908 {
909 for (const auto& id : dependentTaskIdSet) {
910 auto iter = dependentTaskInfos_.find(id);
911 if (iter == dependentTaskInfos_.end()) {
912 std::set<uint64_t> set{taskId};
913 dependentTaskInfos_.emplace(id, std::move(set));
914 } else {
915 iter->second.emplace(taskId);
916 }
917 }
918 }
919
RemoveDependentTaskInfo(uint64_t dependentTaskId,uint64_t taskId)920 void TaskManager::RemoveDependentTaskInfo(uint64_t dependentTaskId, uint64_t taskId)
921 {
922 auto iter = dependentTaskInfos_.find(dependentTaskId);
923 if (iter == dependentTaskInfos_.end()) {
924 return;
925 }
926 auto taskIter = iter->second.find(taskId);
927 if (taskIter == iter->second.end()) {
928 return;
929 }
930 iter->second.erase(taskIter);
931 }
932
StoreTaskDuration(uint64_t taskId,uint64_t totalDuration,uint64_t cpuDuration)933 void TaskManager::StoreTaskDuration(uint64_t taskId, uint64_t totalDuration, uint64_t cpuDuration)
934 {
935 std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
936 auto iter = taskDurationInfos_.find(taskId);
937 if (iter == taskDurationInfos_.end()) {
938 std::pair<uint64_t, uint64_t> durationData = std::make_pair(totalDuration, cpuDuration);
939 taskDurationInfos_.emplace(taskId, std::move(durationData));
940 } else {
941 if (totalDuration != 0) {
942 iter->second.first = totalDuration;
943 }
944 if (cpuDuration != 0) {
945 iter->second.second = cpuDuration;
946 }
947 }
948 }
949
GetTaskDuration(uint64_t taskId,std::string durationType)950 uint64_t TaskManager::GetTaskDuration(uint64_t taskId, std::string durationType)
951 {
952 std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
953 auto iter = taskDurationInfos_.find(taskId);
954 if (iter == taskDurationInfos_.end()) {
955 return 0;
956 }
957 if (durationType == TASK_TOTAL_TIME) {
958 return iter->second.first;
959 } else if (durationType == TASK_CPU_TIME) {
960 return iter->second.second;
961 } else if (iter->second.first == 0) {
962 return 0;
963 }
964 return iter->second.first - iter->second.second;
965 }
966
RemoveTaskDuration(uint64_t taskId)967 void TaskManager::RemoveTaskDuration(uint64_t taskId)
968 {
969 std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
970 auto iter = taskDurationInfos_.find(taskId);
971 if (iter != taskDurationInfos_.end()) {
972 taskDurationInfos_.erase(iter);
973 }
974 }
975
ReleaseTaskData(napi_env env,Task * task)976 void TaskManager::ReleaseTaskData(napi_env env, Task* task)
977 {
978 uint64_t taskId = task->taskId_;
979 RemoveTask(taskId);
980 if (task->IsFunctionTask() || task->IsGroupFunctionTask()) {
981 return;
982 }
983 DecreaseRefCount(env, taskId);
984 RemoveTaskDuration(taskId);
985 if (!task->IsCommonTask()) {
986 return;
987 }
988 RemovePendingTaskInfo(taskId);
989 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
990 for (auto dependentTaskIter = dependentTaskInfos_.begin(); dependentTaskIter != dependentTaskInfos_.end();) {
991 if (dependentTaskIter->second.find(taskId) != dependentTaskIter->second.end()) {
992 dependentTaskIter = dependentTaskInfos_.erase(dependentTaskIter);
993 } else {
994 ++dependentTaskIter;
995 }
996 }
997 auto dependTaskIter = dependTaskInfos_.find(taskId);
998 if (dependTaskIter != dependTaskInfos_.end()) {
999 dependTaskInfos_.erase(dependTaskIter);
1000 }
1001 }
1002
StoreTask(uint64_t taskId,Task * task)1003 void TaskManager::StoreTask(uint64_t taskId, Task* task)
1004 {
1005 std::unique_lock<std::shared_mutex> lock(tasksMutex_);
1006 tasks_.emplace(taskId, task);
1007 }
1008
RemoveTask(uint64_t taskId)1009 void TaskManager::RemoveTask(uint64_t taskId)
1010 {
1011 std::unique_lock<std::shared_mutex> lock(tasksMutex_);
1012 tasks_.erase(taskId);
1013 }
1014
GetTask(uint64_t taskId)1015 Task* TaskManager::GetTask(uint64_t taskId)
1016 {
1017 std::unique_lock<std::shared_mutex> lock(tasksMutex_);
1018 auto iter = tasks_.find(taskId);
1019 if (iter == tasks_.end()) {
1020 return nullptr;
1021 }
1022 return iter->second;
1023 }
1024
1025 // ----------------------------------- TaskGroupManager ----------------------------------------
GetInstance()1026 TaskGroupManager& TaskGroupManager::GetInstance()
1027 {
1028 static TaskGroupManager groupManager;
1029 return groupManager;
1030 }
1031
AddTask(uint64_t groupId,napi_ref taskRef,uint64_t taskId)1032 void TaskGroupManager::AddTask(uint64_t groupId, napi_ref taskRef, uint64_t taskId)
1033 {
1034 auto groupIter = taskGroups_.find(groupId);
1035 if (groupIter == taskGroups_.end()) {
1036 return;
1037 }
1038 auto taskGroup = reinterpret_cast<TaskGroup*>(groupIter->second);
1039 taskGroup->taskRefs_.push_back(taskRef);
1040 taskGroup->taskNum_++;
1041 taskGroup->taskIds_.push_back(taskId);
1042 }
1043
ReleaseTaskGroupData(napi_env env,TaskGroup * group)1044 void TaskGroupManager::ReleaseTaskGroupData(napi_env env, TaskGroup* group)
1045 {
1046 TaskGroupManager::GetInstance().RemoveTaskGroup(group->groupId_);
1047 for (uint64_t taskId : group->taskIds_) {
1048 Task* task = TaskManager::GetInstance().GetTask(taskId);
1049 if (task == nullptr) {
1050 continue;
1051 }
1052 napi_reference_unref(task->env_, task->taskRef_, nullptr);
1053 }
1054 }
1055
CancelGroup(napi_env env,uint64_t groupId)1056 void TaskGroupManager::CancelGroup(napi_env env, uint64_t groupId)
1057 {
1058 TaskGroup* taskGroup = GetTaskGroup(groupId);
1059 if (taskGroup == nullptr) {
1060 HILOG_ERROR("taskpool:: CancelGroup group is nullptr");
1061 return;
1062 }
1063 {
1064 std::unique_lock<std::shared_mutex> lock(taskGroup->taskGroupMutex_);
1065 if (taskGroup->currentGroupInfo_ == nullptr) {
1066 HILOG_ERROR("taskpool:: cancel non-existent task group");
1067 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK_GROUP);
1068 return;
1069 }
1070 taskGroup->CancelPendingGroup(env);
1071 }
1072 if (taskGroup->groupState_ == ExecuteState::NOT_FOUND) {
1073 HILOG_ERROR("taskpool:: cancel non-existent task group");
1074 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK_GROUP);
1075 return;
1076 }
1077 if (taskGroup->groupState_ == ExecuteState::CANCELED) {
1078 return;
1079 }
1080 if (taskGroup->currentGroupInfo_->finishedTask != taskGroup->taskNum_) {
1081 for (uint64_t taskId : taskGroup->taskIds_) {
1082 CancelGroupTask(env, taskId, taskGroup);
1083 }
1084 }
1085 taskGroup->groupState_ = ExecuteState::CANCELED;
1086 }
1087
CancelGroupTask(napi_env env,uint64_t taskId,TaskGroup * group)1088 void TaskGroupManager::CancelGroupTask(napi_env env, uint64_t taskId, TaskGroup* group)
1089 {
1090 auto task = TaskManager::GetInstance().GetTask(taskId);
1091 if (task == nullptr) {
1092 HILOG_INFO("taskpool:: CancelGroupTask task is nullptr");
1093 return;
1094 }
1095 std::unique_lock<std::shared_mutex> lock(task->taskMutex_);
1096 ExecuteState state = task->taskState_;
1097 switch (state) {
1098 case ExecuteState::NOT_FOUND:
1099 return;
1100 case ExecuteState::RUNNING:
1101 task->taskState_ = ExecuteState::CANCELED;
1102 break;
1103 case ExecuteState::WAITING:
1104 task->taskState_ = ExecuteState::CANCELED;
1105 if (group->groupState_ == ExecuteState::WAITING) {
1106 group->groupState_ = ExecuteState::CANCELED;
1107 break;
1108 }
1109 if (task->currentTaskInfo_ != nullptr) {
1110 delete task->currentTaskInfo_;
1111 task->currentTaskInfo_ = nullptr;
1112 }
1113 break;
1114 default:
1115 break;
1116 }
1117 }
1118
StoreSequenceRunner(uint64_t seqRunnerId,SequenceRunner * seqRunner)1119 void TaskGroupManager::StoreSequenceRunner(uint64_t seqRunnerId, SequenceRunner* seqRunner)
1120 {
1121 std::unique_lock<std::mutex> lock(seqRunnersMutex_);
1122 seqRunners_.emplace(seqRunnerId, seqRunner);
1123 }
1124
RemoveSequenceRunner(uint64_t seqRunnerId)1125 void TaskGroupManager::RemoveSequenceRunner(uint64_t seqRunnerId)
1126 {
1127 std::unique_lock<std::mutex> lock(seqRunnersMutex_);
1128 seqRunners_.erase(seqRunnerId);
1129 }
1130
GetSeqRunner(uint64_t seqRunnerId)1131 SequenceRunner* TaskGroupManager::GetSeqRunner(uint64_t seqRunnerId)
1132 {
1133 std::unique_lock<std::mutex> lock(seqRunnersMutex_);
1134 auto iter = seqRunners_.find(seqRunnerId);
1135 if (iter != seqRunners_.end()) {
1136 return iter->second;
1137 }
1138 HILOG_ERROR("taskpool:: seqRunner not exist.");
1139 return nullptr;
1140 }
1141
AddTaskToSeqRunner(uint64_t seqRunnerId,Task * task)1142 void TaskGroupManager::AddTaskToSeqRunner(uint64_t seqRunnerId, Task* task)
1143 {
1144 std::unique_lock<std::mutex> lock(seqRunnersMutex_);
1145 auto iter = seqRunners_.find(seqRunnerId);
1146 if (iter == seqRunners_.end()) {
1147 HILOG_ERROR("seqRunner:: seqRunner not found.");
1148 return;
1149 } else {
1150 std::unique_lock<std::shared_mutex> seqRunnerLock(iter->second->seqRunnerMutex_);
1151 iter->second->seqRunnerTasks_.push(task);
1152 }
1153 }
1154
TriggerSeqRunner(napi_env env,Task * lastTask)1155 bool TaskGroupManager::TriggerSeqRunner(napi_env env, Task* lastTask)
1156 {
1157 uint64_t seqRunnerId = lastTask->seqRunnerId_;
1158 SequenceRunner* seqRunner = GetSeqRunner(seqRunnerId);
1159 if (seqRunner == nullptr) {
1160 HILOG_ERROR("seqRunner:: trigger seqRunner not exist.");
1161 return false;
1162 }
1163 napi_reference_unref(env, seqRunner->seqRunnerRef_, nullptr);
1164 if (seqRunner->currentTaskId_ != lastTask->taskId_) {
1165 HILOG_ERROR("seqRunner:: only front task can trigger seqRunner.");
1166 return false;
1167 }
1168 {
1169 std::unique_lock<std::shared_mutex> lock(seqRunner->seqRunnerMutex_);
1170 if (seqRunner->seqRunnerTasks_.empty()) {
1171 HILOG_DEBUG("seqRunner:: seqRunner %" PRIu64 " empty.", seqRunnerId);
1172 seqRunner->currentTaskId_ = 0;
1173 return true;
1174 }
1175 Task* task = seqRunner->seqRunnerTasks_.front();
1176 seqRunner->seqRunnerTasks_.pop();
1177 while (task->taskState_ == ExecuteState::CANCELED) {
1178 if (seqRunner->seqRunnerTasks_.empty()) {
1179 HILOG_DEBUG("seqRunner:: seqRunner %" PRIu64 " empty in cancel loop.", seqRunnerId);
1180 seqRunner->currentTaskId_ = 0;
1181 return true;
1182 }
1183 task = seqRunner->seqRunnerTasks_.front();
1184 seqRunner->seqRunnerTasks_.pop();
1185 }
1186 seqRunner->currentTaskId_ = task->taskId_;
1187 task->IncreaseRefCount();
1188 HILOG_DEBUG("seqRunner:: Trig task %" PRIu64 " in seqRunner %" PRIu64 ".", task->taskId_, seqRunnerId);
1189 TaskManager::GetInstance().EnqueueTaskId(task->taskId_, seqRunner->priority_);
1190 TaskManager::GetInstance().TryTriggerExpand();
1191 }
1192 return true;
1193 }
1194
StoreTaskGroup(uint64_t groupId,TaskGroup * taskGroup)1195 void TaskGroupManager::StoreTaskGroup(uint64_t groupId, TaskGroup* taskGroup)
1196 {
1197 std::lock_guard<std::mutex> lock(taskGroupsMutex_);
1198 taskGroups_.emplace(groupId, taskGroup);
1199 }
1200
RemoveTaskGroup(uint64_t groupId)1201 void TaskGroupManager::RemoveTaskGroup(uint64_t groupId)
1202 {
1203 std::lock_guard<std::mutex> lock(taskGroupsMutex_);
1204 taskGroups_.erase(groupId);
1205 }
1206
GetTaskGroup(uint64_t groupId)1207 TaskGroup* TaskGroupManager::GetTaskGroup(uint64_t groupId)
1208 {
1209 std::lock_guard<std::mutex> lock(taskGroupsMutex_);
1210 auto groupIter = taskGroups_.find(groupId);
1211 if (groupIter == taskGroups_.end()) {
1212 return nullptr;
1213 }
1214 return reinterpret_cast<TaskGroup*>(groupIter->second);
1215 }
1216
UpdateGroupState(uint64_t groupId)1217 void TaskGroupManager::UpdateGroupState(uint64_t groupId)
1218 {
1219 TaskGroup* group = GetTaskGroup(groupId);
1220 if (group == nullptr) {
1221 HILOG_ERROR("taskpool:: UpdateGroupState group is nullptr");
1222 return;
1223 }
1224 group->groupState_ = ExecuteState::RUNNING;
1225 }
1226 } // namespace Commonlibrary::Concurrent::TaskPoolModule
1227