1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "task_manager.h"
17
18 #include <cinttypes>
19 #include <securec.h>
20 #include <thread>
21
22 #if defined(ENABLE_TASKPOOL_FFRT)
23 #include "bundle_info.h"
24 #include "bundle_mgr_interface.h"
25 #include "bundle_mgr_proxy.h"
26 #include "iservice_registry.h"
27 #include "parameters.h"
28 #include "status_receiver_interface.h"
29 #include "system_ability_definition.h"
30 #include "c/executor_task.h"
31 #include "ffrt_inner.h"
32 #endif
33 #include "commonlibrary/ets_utils/js_sys_module/timer/timer.h"
34 #include "helper/concurrent_helper.h"
35 #include "helper/error_helper.h"
36 #include "helper/hitrace_helper.h"
37 #include "taskpool.h"
38 #include "tools/log.h"
39 #include "worker.h"
40
41 namespace Commonlibrary::Concurrent::TaskPoolModule {
42 using namespace OHOS::JsSysModule;
43
44 static constexpr int8_t HIGH_PRIORITY_TASK_COUNT = 5;
45 static constexpr int8_t MEDIUM_PRIORITY_TASK_COUNT = 5;
46 static constexpr int32_t MAX_TASK_DURATION = 100; // 100: 100ms
47 static constexpr uint32_t STEP_SIZE = 2;
48 static constexpr uint32_t DEFAULT_THREADS = 3;
49 static constexpr uint32_t DEFAULT_MIN_THREADS = 1; // 1: minimum thread num when idle
50 static constexpr uint32_t MIN_TIMEOUT_TIME = 180000; // 180000: 3min
51 static constexpr uint32_t MAX_TIMEOUT_TIME = 600000; // 600000: 10min
52 static constexpr int32_t MAX_IDLE_TIME = 30000; // 30000: 30s
53 static constexpr uint32_t TRIGGER_INTERVAL = 30000; // 30000: 30s
54 static constexpr uint32_t SHRINK_STEP = 4; // 4: try to release 4 threads every time
55 [[maybe_unused]] static constexpr uint32_t IDLE_THRESHOLD = 2; // 2: the thread will be released after 2 idle intervals
56
57 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
58 static const std::map<Priority, OHOS::AppExecFwk::EventQueue::Priority> TASK_EVENTHANDLER_PRIORITY_MAP = {
59 {Priority::IDLE, OHOS::AppExecFwk::EventQueue::Priority::IDLE},
60 {Priority::LOW, OHOS::AppExecFwk::EventQueue::Priority::LOW},
61 {Priority::MEDIUM, OHOS::AppExecFwk::EventQueue::Priority::HIGH},
62 {Priority::HIGH, OHOS::AppExecFwk::EventQueue::Priority::IMMEDIATE},
63 };
64 #endif
65
66 // ----------------------------------- TaskManager ----------------------------------------
67 TaskManager& TaskManager::GetInstance()
68 {
69 static TaskManager manager;
70 return manager;
71 }
72
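// Pre-create one ExecuteQueue per priority level; tasks are later enqueued into taskQueues_[priority].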
73 TaskManager::TaskManager()
74 {
75 for (size_t i = 0; i < taskQueues_.size(); i++) {
76 std::unique_ptr<ExecuteQueue> taskQueue = std::make_unique<ExecuteQueue>();
77 taskQueues_[i] = std::move(taskQueue);
78 }
79 }
80
81 TaskManager::~TaskManager()
82 {
83 HILOG_INFO("taskpool:: ~TaskManager");
84 if (timer_ == nullptr) {
85 HILOG_ERROR("taskpool:: timer_ is nullptr");
86 } else {
87 uv_timer_stop(timer_);
88 ConcurrentHelper::UvHandleClose(timer_);
89 ConcurrentHelper::UvHandleClose(expandHandle_);
90 }
91
92 if (loop_ != nullptr) {
93 uv_stop(loop_);
94 }
95
96 {
97 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
98 for (auto& worker : workers_) {
99 delete worker;
100 }
101 workers_.clear();
102 }
103
104 {
105 std::lock_guard<std::mutex> lock(callbackMutex_);
106 for (auto& [_, callbackPtr] : callbackTable_) {
107 if (callbackPtr == nullptr) {
108 continue;
109 }
110 callbackPtr.reset();
111 }
112 callbackTable_.clear();
113 }
114
115 {
116 std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
117 for (auto& [_, task] : tasks_) {
118 delete task;
119 task = nullptr;
120 }
121 tasks_.clear();
122 }
123 CountTraceForWorker();
124 }
125
126 void TaskManager::CountTraceForWorker()
127 {
128 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
129 int64_t threadNum = static_cast<int64_t>(workers_.size());
130 int64_t idleWorkers = static_cast<int64_t>(idleWorkers_.size());
131 int64_t timeoutWorkers = static_cast<int64_t>(timeoutWorkers_.size());
132 HITRACE_HELPER_COUNT_TRACE("timeoutThreadNum", timeoutWorkers);
133 HITRACE_HELPER_COUNT_TRACE("threadNum", threadNum);
134 HITRACE_HELPER_COUNT_TRACE("runningThreadNum", threadNum - idleWorkers);
135 HITRACE_HELPER_COUNT_TRACE("idleThreadNum", idleWorkers);
136 }
137
138 napi_value TaskManager::GetThreadInfos(napi_env env)
139 {
140 napi_value threadInfos = nullptr;
141 napi_create_array(env, &threadInfos);
142 {
143 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
144 int32_t i = 0;
145 for (auto& worker : workers_) {
146 if (worker->workerEnv_ == nullptr) {
147 continue;
148 }
149 napi_value tid = NapiHelper::CreateUint32(env, static_cast<uint32_t>(worker->tid_));
150 napi_value priority = NapiHelper::CreateUint32(env, static_cast<uint32_t>(worker->priority_));
151
152 napi_value taskId = nullptr;
153 napi_create_array(env, &taskId);
154 int32_t j = 0;
155 {
156 std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_);
157 for (auto& currentId : worker->currentTaskId_) {
158 napi_value id = NapiHelper::CreateUint32(env, currentId);
159 napi_set_element(env, taskId, j, id);
160 j++;
161 }
162 }
163 napi_value threadInfo = nullptr;
164 napi_create_object(env, &threadInfo);
165 napi_set_named_property(env, threadInfo, "tid", tid);
166 napi_set_named_property(env, threadInfo, "priority", priority);
167 napi_set_named_property(env, threadInfo, "taskIds", taskId);
168 napi_set_element(env, threadInfos, i, threadInfo);
169 i++;
170 }
171 }
172 return threadInfos;
173 }
174
175 napi_value TaskManager::GetTaskInfos(napi_env env)
176 {
177 napi_value taskInfos = nullptr;
178 napi_create_array(env, &taskInfos);
179 {
180 std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
181 int32_t i = 0;
182 for (const auto& [_, task] : tasks_) {
183 if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::DELAYED ||
184 task->taskState_ == ExecuteState::FINISHED) {
185 continue;
186 }
187 napi_value taskInfoValue = NapiHelper::CreateObject(env);
188 std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
189 napi_value taskId = NapiHelper::CreateUint32(env, task->taskId_);
190 napi_value name = nullptr;
191 napi_create_string_utf8(env, task->name_.c_str(), task->name_.size(), &name);
192 napi_set_named_property(env, taskInfoValue, "name", name);
193 ExecuteState state = task->taskState_;
194 uint64_t duration = 0;
195 if (state == ExecuteState::RUNNING || state == ExecuteState::ENDING) {
196 duration = ConcurrentHelper::GetMilliseconds() - task->startTime_;
197 }
198 napi_value stateValue = NapiHelper::CreateUint32(env, static_cast<uint32_t>(state));
199 napi_set_named_property(env, taskInfoValue, "taskId", taskId);
200 napi_set_named_property(env, taskInfoValue, "state", stateValue);
201 napi_value durationValue = NapiHelper::CreateUint32(env, duration);
202 napi_set_named_property(env, taskInfoValue, "duration", durationValue);
203 napi_set_element(env, taskInfos, i, taskInfoValue);
204 i++;
205 }
206 }
207 return taskInfos;
208 }
209
210 void TaskManager::UpdateExecutedInfo(uint64_t duration)
211 {
212 totalExecTime_ += duration;
213 totalExecCount_++;
214 }
215
216 uint32_t TaskManager::ComputeSuitableThreadNum()
217 {
218 uint32_t targetNum = ComputeSuitableIdleNum() + GetRunningWorkers();
219 return targetNum;
220 }
221
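// Estimate how many idle workers are needed: with no execution statistics yet, take at most STEP_SIZE;
// otherwise scale the pending non-idle task count by the average task duration relative to
// MAX_TASK_DURATION, capped at the pending count.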
222 uint32_t TaskManager::ComputeSuitableIdleNum()
223 {
224 uint32_t targetNum = 0;
225 if (GetNonIdleTaskNum() != 0 && totalExecCount_ == 0) {
226 // this branch avoids letting time-consuming tasks block the whole taskpool
227 targetNum = std::min(STEP_SIZE, GetNonIdleTaskNum());
228 } else if (totalExecCount_ != 0) {
229 auto durationPerTask = static_cast<double>(totalExecTime_) / totalExecCount_;
230 uint32_t result = std::ceil(durationPerTask * GetNonIdleTaskNum() / MAX_TASK_DURATION);
231 targetNum = std::min(result, GetNonIdleTaskNum());
232 }
233 return targetNum;
234 }
235
236 void TaskManager::CheckForBlockedWorkers()
237 {
238 // the threshold is adjusted dynamically to provide more flexibility in detecting exceptions:
239 // if the thread num has reached the limit and no idle worker is available, the shorter timeout is used,
240 // otherwise the longer one is chosen
241 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
242 bool needChecking = false;
243 bool state = (GetThreadNum() == ConcurrentHelper::GetMaxThreads()) && (GetIdleWorkers() == 0);
244 uint64_t threshold = state ? MIN_TIMEOUT_TIME : MAX_TIMEOUT_TIME;
245 for (auto iter = workers_.begin(); iter != workers_.end(); iter++) {
246 auto worker = *iter;
247 // if the worker thread is idle, just skip it; only a worker in the running state can be marked as timeout
248 // if the worker is executing a longTask, we will not do the check
249 if ((worker->state_ == WorkerState::IDLE) || (worker->IsExecutingLongTask()) ||
250 (ConcurrentHelper::GetMilliseconds() - worker->startTime_ < threshold) ||
251 !worker->UpdateWorkerState(WorkerState::RUNNING, WorkerState::BLOCKED)) {
252 continue;
253 }
254 // When executing the promise task, the worker state may not be updated and will be
255 // marked as 'BLOCKED', so we should exclude this situation.
256 // Besides, if the worker is not executing sync tasks or micro tasks, it may be handling
257 // tasks like I/O in uv threads, so we should also exclude this situation.
258 auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
259 if (worker->idleState_ && !workerEngine->IsExecutingPendingJob()) {
260 if (!workerEngine->HasWaitingRequest()) {
261 worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::IDLE);
262 } else {
263 worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::RUNNING);
264 worker->startTime_ = ConcurrentHelper::GetMilliseconds();
265 }
266 continue;
267 }
268
269 HILOG_INFO("taskpool:: The worker has been marked as timeout.");
270 // If the current worker has a longTask and is not executing, we will only interrupt it.
271 if (worker->HasLongTask()) {
272 continue;
273 }
274 needChecking = true;
275 idleWorkers_.erase(worker);
276 timeoutWorkers_.insert(worker);
277 }
278 // should trigger the check when we have marked and removed workers
279 if (UNLIKELY(needChecking)) {
280 TryExpand();
281 }
282 }
283
284 void TaskManager::TryTriggerExpand()
285 {
286 // post the signal to notify the monitor thread to expand
287 if (UNLIKELY(!isHandleInited_)) {
288 NotifyExecuteTask();
289 needChecking_ = true;
290 HILOG_DEBUG("taskpool:: the expandHandle_ is nullptr");
291 return;
292 }
293 uv_async_send(expandHandle_);
294 }
295
296 #if defined(OHOS_PLATFORM)
297 // read /proc/[pid]/task/[tid]/stat to obtain a thread's state, which is used to count idle threads.
298 bool TaskManager::ReadThreadInfo(pid_t tid, char* buf, uint32_t size)
299 {
300 char path[128]; // 128: buffer for path
301 pid_t pid = getpid();
302 ssize_t bytesLen = -1;
303 int ret = snprintf_s(path, sizeof(path), sizeof(path) - 1, "/proc/%d/task/%d/stat", pid, tid);
304 if (ret < 0) {
305 HILOG_ERROR("snprintf_s failed");
306 return false;
307 }
308 int fd = open(path, O_RDONLY | O_NONBLOCK);
309 if (UNLIKELY(fd == -1)) {
310 return false;
311 }
312 bytesLen = read(fd, buf, size - 1);
313 close(fd);
314 if (bytesLen <= 0) {
315 HILOG_ERROR("taskpool:: failed to read %{public}s", path);
316 return false;
317 }
318 buf[bytesLen] = '\0';
319 return true;
320 }
321
322 uint32_t TaskManager::GetIdleWorkers()
323 {
324 char buf[4096]; // 4096: buffer for thread info
325 uint32_t idleCount = 0;
326 std::unordered_set<pid_t> tids {};
327 {
328 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
329 for (auto& worker : idleWorkers_) {
330 #if defined(ENABLE_TASKPOOL_FFRT)
331 if (worker->ffrtTaskHandle_ != nullptr) {
332 if (worker->GetWaitTime() > 0) {
333 idleCount++;
334 }
335 continue;
336 }
337 #endif
338 tids.emplace(worker->tid_);
339 }
340 }
341 // The ffrt thread does not read thread info
342 for (auto tid : tids) {
343 if (!ReadThreadInfo(tid, buf, sizeof(buf))) {
344 continue;
345 }
346 char state = '\0';
347 if (sscanf_s(buf, "%*d %*s %c", &state, sizeof(state)) != 1) { // 1: state
348 HILOG_ERROR("taskpool:: sscanf_s of state failed for tid %{public}d", tid);
349 return 0;
350 }
351 if (state == 'S') {
352 idleCount++;
353 }
354 }
355 return idleCount;
356 }
357
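// Collect release candidates into freeList_: FFRT workers whose uv loop is inactive or that have been
// waiting too long, and native workers whose /proc state stayed sleeping with unchanged CPU time for
// IDLE_THRESHOLD consecutive checks.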
358 void TaskManager::GetIdleWorkersList(uint32_t step)
359 {
360 char buf[4096]; // 4096: buffer for thread info
361 for (auto& worker : idleWorkers_) {
362 #if defined(ENABLE_TASKPOOL_FFRT)
363 if (worker->ffrtTaskHandle_ != nullptr) {
364 uint64_t workerWaitTime = worker->GetWaitTime();
365 bool isWorkerLoopActive = worker->IsLoopActive();
366 if (workerWaitTime == 0) {
367 continue;
368 }
369 uint64_t currTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::seconds>(
370 std::chrono::steady_clock::now().time_since_epoch()).count());
371 if (!isWorkerLoopActive) {
372 freeList_.emplace_back(worker);
373 } else if ((currTime - workerWaitTime) > IDLE_THRESHOLD * TRIGGER_INTERVAL) {
374 freeList_.emplace_back(worker);
375 HILOG_INFO("taskpool:: worker has been in ffrt epoll wait for more than 2 intervals, forcing release.");
376 } else {
377 HILOG_INFO("taskpool:: worker uv loop is alive; it will be released after 2 intervals if not woken.");
378 }
379 continue;
380 }
381 #endif
382 if (!ReadThreadInfo(worker->tid_, buf, sizeof(buf))) {
383 continue;
384 }
385 char state;
386 uint64_t utime;
387 if (sscanf_s(buf, "%*d %*s %c %*d %*d %*d %*d %*d %*u %*lu %*lu %*lu %*lu %llu",
388 &state, sizeof(state), &utime) != 2) { // 2: state and utime
389 HILOG_ERROR("taskpool:: sscanf_s of state failed for %{public}d", worker->tid_);
390 return;
391 }
392 if (state != 'S' || utime != worker->lastCpuTime_) {
393 worker->idleCount_ = 0;
394 worker->lastCpuTime_ = utime;
395 continue;
396 }
397 if (++worker->idleCount_ >= IDLE_THRESHOLD) {
398 freeList_.emplace_back(worker);
399 }
400 }
401 }
402
403 void TaskManager::TriggerShrink(uint32_t step)
404 {
405 GetIdleWorkersList(step);
406 step = std::min(step, static_cast<uint32_t>(freeList_.size()));
407 uint32_t count = 0;
408 for (size_t i = 0; i < freeList_.size(); i++) {
409 auto worker = freeList_[i];
410 if (worker->state_ != WorkerState::IDLE || worker->HasLongTask()) {
411 continue;
412 }
413 auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
414 if (idleTime < MAX_IDLE_TIME || worker->runningCount_ != 0) {
415 continue;
416 }
417 idleWorkers_.erase(worker);
418 HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
419 uv_async_send(worker->clearWorkerSignal_);
420 if (++count == step) {
421 break;
422 }
423 }
424 freeList_.clear();
425 }
426 #else
427 uint32_t TaskManager::GetIdleWorkers()
428 {
429 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
430 return idleWorkers_.size();
431 }
432
433 void TaskManager::TriggerShrink(uint32_t step)
434 {
435 for (uint32_t i = 0; i < step; i++) {
436 // try to free the worker that idle time meets the requirement
437 auto iter = std::find_if(idleWorkers_.begin(), idleWorkers_.end(), [](Worker *worker) {
438 auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
439 return idleTime > MAX_IDLE_TIME && worker->runningCount_ == 0 && !worker->HasLongTask();
440 });
441 // remove it from all sets
442 if (iter != idleWorkers_.end()) {
443 auto worker = *iter;
444 idleWorkers_.erase(worker);
445 HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
446 uv_async_send(worker->clearWorkerSignal_);
447 }
448 }
449 }
450 #endif
451
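// Shrink the pool toward targetNum without going below the minimum thread count, release timeout workers
// that are no longer running, and suspend the load-balance timer once only the minimum idle workers remain.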
452 void TaskManager::NotifyShrink(uint32_t targetNum)
453 {
454 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
455 uint32_t workerCount = workers_.size();
456 uint32_t minThread = ConcurrentHelper::IsLowMemory() ? 0 : DEFAULT_MIN_THREADS;
457 if (minThread == 0) {
458 HILOG_INFO("taskpool:: the system is currently under low memory");
459 }
460 if (workerCount > minThread && workerCount > targetNum) {
461 targetNum = std::max(minThread, targetNum);
462 uint32_t step = std::min(workerCount - targetNum, SHRINK_STEP);
463 TriggerShrink(step);
464 }
465 // remove all timeout workers
466 for (auto iter = timeoutWorkers_.begin(); iter != timeoutWorkers_.end();) {
467 if (workers_.find(*iter) == workers_.end()) {
468 HILOG_WARN("taskpool:: current worker may have been released");
469 iter = timeoutWorkers_.erase(iter);
470 } else if ((*iter)->runningCount_ == 0) {
471 HILOG_DEBUG("taskpool:: try to release timeout thread: %{public}d", (*iter)->tid_);
472 uv_async_send((*iter)->clearWorkerSignal_);
473 timeoutWorkers_.erase(iter++);
474 return;
475 } else {
476 iter++;
477 }
478 }
479 uint32_t idleNum = idleWorkers_.size();
480 // If the system memory state is moderate and the worker has executed tasks, we will try to release it
481 if (ConcurrentHelper::IsModerateMemory() && workerCount == idleNum && workerCount == DEFAULT_MIN_THREADS) {
482 auto worker = *(idleWorkers_.begin());
483 if (worker == nullptr || worker->clearWorkerSignal_ == nullptr) {
484 return;
485 }
486 if (worker->HasLongTask()) { // worker that has longTask should not be released
487 return;
488 }
489 if (worker->hasExecuted_) { // a worker that hasn't executed any task should not be released
490 TriggerShrink(DEFAULT_MIN_THREADS);
491 return;
492 }
493 }
494
495 // Create a worker for performance
496 if (!ConcurrentHelper::IsLowMemory() && workers_.empty()) {
497 CreateWorkers(hostEnv_);
498 }
499 // stop the timer
500 if ((workerCount == idleNum && workerCount <= minThread) && timeoutWorkers_.empty()) {
501 suspend_ = true;
502 uv_timer_stop(timer_);
503 HILOG_DEBUG("taskpool:: timer will be suspended");
504 }
505 }
506
507 void TaskManager::TriggerLoadBalance(const uv_timer_t* req)
508 {
509 TaskManager& taskManager = TaskManager::GetInstance();
510 taskManager.CheckForBlockedWorkers();
511 uint32_t targetNum = taskManager.ComputeSuitableThreadNum();
512 taskManager.NotifyShrink(targetNum);
513 taskManager.CountTraceForWorker();
514 }
515
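// Expand the pool when the estimated demand for idle workers exceeds the current idle count, never exceeding
// the allowed maximum number of threads; also resume the load-balance timer if it was suspended.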
516 void TaskManager::TryExpand()
517 {
518 // dispatch task in the TaskPoolManager thread
519 NotifyExecuteTask();
520 // do not trigger when there are more idleWorkers than tasks
521 uint32_t idleNum = GetIdleWorkers();
522 if (idleNum > GetNonIdleTaskNum()) {
523 return;
524 }
525 needChecking_ = false; // do not need to check
526 uint32_t targetNum = ComputeSuitableIdleNum();
527 uint32_t workerCount = 0;
528 uint32_t idleCount = 0;
529 uint32_t timeoutWorkers = 0;
530 {
531 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
532 idleCount = idleWorkers_.size();
533 workerCount = workers_.size();
534 timeoutWorkers = timeoutWorkers_.size();
535 }
536 uint32_t maxThreads = std::max(ConcurrentHelper::GetMaxThreads(), DEFAULT_THREADS);
537 maxThreads = (timeoutWorkers == 0) ? maxThreads : maxThreads + 2; // 2: extra threads
538 if (workerCount < maxThreads && idleCount < targetNum) {
539 uint32_t step = std::min(maxThreads, targetNum) - idleCount;
540 // Prevent the total number of expanded threads from exceeding maxThreads
541 if (step + workerCount > maxThreads) {
542 step = maxThreads - workerCount;
543 }
544 CreateWorkers(hostEnv_, step);
545 HILOG_INFO("taskpool:: maxThreads: %{public}u, created num: %{public}u, total num: %{public}u",
546 maxThreads, step, GetThreadNum());
547 }
548 if (UNLIKELY(suspend_)) {
549 suspend_ = false;
550 uv_timer_again(timer_);
551 }
552 }
553
554 void TaskManager::NotifyExpand(const uv_async_t* req)
555 {
556 TaskManager& taskManager = TaskManager::GetInstance();
557 taskManager.TryExpand();
558 }
559
560 void TaskManager::RunTaskManager()
561 {
562 loop_ = uv_loop_new();
563 if (loop_ == nullptr) { // LCOV_EXCL_BR_LINE
564 HILOG_FATAL("taskpool:: new loop failed.");
565 return;
566 }
567 ConcurrentHelper::UvHandleInit(loop_, expandHandle_, TaskManager::NotifyExpand);
568 timer_ = new uv_timer_t;
569 uv_timer_init(loop_, timer_);
570 uv_timer_start(timer_, reinterpret_cast<uv_timer_cb>(TaskManager::TriggerLoadBalance), 0, TRIGGER_INTERVAL);
571 isHandleInited_ = true;
572 #if defined IOS_PLATFORM || defined MAC_PLATFORM
573 pthread_setname_np("OS_TaskManager");
574 #else
575 pthread_setname_np(pthread_self(), "OS_TaskManager");
576 #endif
577 if (UNLIKELY(needChecking_)) {
578 needChecking_ = false;
579 uv_async_send(expandHandle_);
580 }
581 uv_run(loop_, UV_RUN_DEFAULT);
582 if (loop_ != nullptr) {
583 uv_loop_delete(loop_);
584 }
585 }
586
587 void TaskManager::CancelTask(napi_env env, uint64_t taskId)
588 {
589 // 1. Cannot find the task by taskId, throw error
590 // 2. Find executing taskInfo, skip it
591 // 3. Find waiting taskInfo, cancel it
592 // 4. Find canceled taskInfo, skip it
593 std::string strTrace = "CancelTask: taskId: " + std::to_string(taskId);
594 HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
595 HITRACE_HELPER_METER_NAME(strTrace);
596 Task* task = GetTask(taskId);
597 if (task == nullptr) {
598 std::string errMsg = "taskpool:: the task may not exist";
599 HILOG_ERROR("%{public}s", errMsg.c_str());
600 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
601 return;
602 }
603 if (task->taskState_ == ExecuteState::CANCELED) {
604 HILOG_DEBUG("taskpool:: task has been canceled");
605 return;
606 }
607 if (task->IsGroupCommonTask()) {
608 // when task is a group common task, still check the state
609 if (task->currentTaskInfo_ == nullptr || task->taskState_ == ExecuteState::NOT_FOUND ||
610 task->taskState_ == ExecuteState::FINISHED || task->taskState_ == ExecuteState::ENDING) {
611 std::string errMsg = "taskpool:: task is not executed or has been executed";
612 HILOG_ERROR("%{public}s", errMsg.c_str());
613 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
614 return;
615 }
616 TaskGroup* taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(task->groupId_);
617 if (taskGroup == nullptr) {
618 return;
619 }
620 return taskGroup->CancelGroupTask(env, task->taskId_);
621 }
622
623 std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
624 if (task->IsPeriodicTask()) {
625 napi_reference_unref(env, task->taskRef_, nullptr);
626 task->CancelPendingTask(env);
627 uv_timer_stop(task->timer_);
628 ConcurrentHelper::UvHandleClose(task->timer_);
629 return;
630 } else if (task->IsSeqRunnerTask()) {
631 CancelSeqRunnerTask(env, task);
632 return;
633 }
634 if ((task->currentTaskInfo_ == nullptr && task->taskState_ != ExecuteState::DELAYED) ||
635 task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED ||
636 task->taskState_ == ExecuteState::ENDING) {
637 std::string errMsg = "taskpool:: task is not executed or has been executed";
638 HILOG_ERROR("%{public}s", errMsg.c_str());
639 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
640 return;
641 }
642
643 task->ClearDelayedTimers();
644 ExecuteState state = task->taskState_.exchange(ExecuteState::CANCELED);
645 task->CancelPendingTask(env);
646 if (state == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr) {
647 reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter();
648 task->DecreaseTaskRefCount();
649 EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority);
650 napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: task has been canceled");
651 napi_reject_deferred(env, task->currentTaskInfo_->deferred, error);
652 napi_reference_unref(env, task->taskRef_, nullptr);
653 delete task->currentTaskInfo_;
654 task->currentTaskInfo_ = nullptr;
655 }
656 }
657
658 void TaskManager::CancelSeqRunnerTask(napi_env env, Task *task)
659 {
660 if (task->taskState_ == ExecuteState::FINISHED) {
661 std::string errMsg = "taskpool:: sequenceRunner task has been executed";
662 HILOG_ERROR("%{public}s", errMsg.c_str());
663 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
664 } else {
665 task->taskState_ = ExecuteState::CANCELED;
666 }
667 }
668
669 void TaskManager::NotifyWorkerIdle(Worker* worker)
670 {
671 {
672 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
673 if (worker->state_ == WorkerState::BLOCKED) {
674 return;
675 }
676 idleWorkers_.insert(worker);
677 }
678 if (GetTaskNum() != 0) {
679 NotifyExecuteTask();
680 }
681 CountTraceForWorker();
682 }
683
684 void TaskManager::NotifyWorkerCreated(Worker* worker)
685 {
686 NotifyWorkerIdle(worker);
687 }
688
689 void TaskManager::NotifyWorkerAdded(Worker* worker)
690 {
691 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
692 workers_.insert(worker);
693 HILOG_DEBUG("taskpool:: a new worker has been added and the current num is %{public}zu", workers_.size());
694 }
695
696 void TaskManager::NotifyWorkerRunning(Worker* worker)
697 {
698 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
699 idleWorkers_.erase(worker);
700 CountTraceForWorker();
701 }
702
703 uint32_t TaskManager::GetRunningWorkers()
704 {
705 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
706 return std::count_if(workers_.begin(), workers_.end(), [](const auto& worker) {
707 return worker->runningCount_ != 0;
708 });
709 }
710
711 uint32_t TaskManager::GetTimeoutWorkers()
712 {
713 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
714 return timeoutWorkers_.size();
715 }
716
717 uint32_t TaskManager::GetTaskNum()
718 {
719 std::lock_guard<std::mutex> lock(taskQueuesMutex_);
720 uint32_t sum = 0;
721 for (const auto& elements : taskQueues_) {
722 sum += elements->GetTaskNum();
723 }
724 return sum;
725 }
726
727 uint32_t TaskManager::GetNonIdleTaskNum()
728 {
729 return nonIdleTaskNum_;
730 }
731
732 void TaskManager::IncreaseNumIfNoIdle(Priority priority)
733 {
734 if (priority != Priority::IDLE) {
735 ++nonIdleTaskNum_;
736 }
737 }
738
739 void TaskManager::DecreaseNumIfNoIdle(Priority priority)
740 {
741 if (priority != Priority::IDLE) {
742 --nonIdleTaskNum_;
743 }
744 }
745
746 uint32_t TaskManager::GetThreadNum()
747 {
748 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
749 return workers_.size();
750 }
751
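// Enqueue the task id into the queue of its priority, trigger a possible pool expansion, increase the
// task's ref count and fire its onEnqueued callback if one is registered.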
752 void TaskManager::EnqueueTaskId(uint64_t taskId, Priority priority)
753 {
754 {
755 std::lock_guard<std::mutex> lock(taskQueuesMutex_);
756 IncreaseNumIfNoIdle(priority);
757 taskQueues_[priority]->EnqueueTaskId(taskId);
758 }
759 TryTriggerExpand();
760 Task* task = GetTask(taskId);
761 if (task == nullptr) {
762 HILOG_FATAL("taskpool:: task is nullptr");
763 return;
764 }
765 task->IncreaseTaskRefCount();
766 if (task->onEnqueuedCallBackInfo_ != nullptr) {
767 task->ExecuteListenerCallback(task->onEnqueuedCallBackInfo_);
768 }
769 }
770
771 void TaskManager::EraseWaitingTaskId(uint64_t taskId, Priority priority)
772 {
773 std::lock_guard<std::mutex> lock(taskQueuesMutex_);
774 if (!taskQueues_[priority]->EraseWaitingTaskId(taskId)) {
775 HILOG_WARN("taskpool:: taskId is not in executeQueue when cancel");
776 }
777 }
778
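// Scheduling policy: serve up to HIGH_PRIORITY_TASK_COUNT high-priority tasks in a row, then up to
// MEDIUM_PRIORITY_TASK_COUNT medium ones, then low-priority tasks; idle-priority tasks are only chosen
// when the other queues are empty and every worker is free.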
779 std::pair<uint64_t, Priority> TaskManager::DequeueTaskId()
780 {
781 std::lock_guard<std::mutex> lock(taskQueuesMutex_);
782 auto& highTaskQueue = taskQueues_[Priority::HIGH];
783 if (!highTaskQueue->IsEmpty() && highPrioExecuteCount_ < HIGH_PRIORITY_TASK_COUNT) {
784 highPrioExecuteCount_++;
785 return GetTaskByPriority(highTaskQueue, Priority::HIGH);
786 }
787 highPrioExecuteCount_ = 0;
788
789 auto& mediumTaskQueue = taskQueues_[Priority::MEDIUM];
790 if (!mediumTaskQueue->IsEmpty() && mediumPrioExecuteCount_ < MEDIUM_PRIORITY_TASK_COUNT) {
791 mediumPrioExecuteCount_++;
792 return GetTaskByPriority(mediumTaskQueue, Priority::MEDIUM);
793 }
794 mediumPrioExecuteCount_ = 0;
795
796 auto& lowTaskQueue = taskQueues_[Priority::LOW];
797 if (!lowTaskQueue->IsEmpty()) {
798 return GetTaskByPriority(lowTaskQueue, Priority::LOW);
799 }
800
801 auto& idleTaskQueue = taskQueues_[Priority::IDLE];
802 if (highTaskQueue->IsEmpty() && mediumTaskQueue->IsEmpty() && !idleTaskQueue->IsEmpty() && IsChooseIdle()) {
803 return GetTaskByPriority(idleTaskQueue, Priority::IDLE);
804 }
805 return std::make_pair(0, Priority::LOW);
806 }
807
808 bool TaskManager::IsChooseIdle()
809 {
810 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
811 for (auto& worker : workers_) {
812 if (worker->state_ == WorkerState::IDLE) {
813 // If worker->state_ is WorkerState::IDLE, it means that the worker is free
814 continue;
815 }
816 // If there is a worker running a task, do not take the idle task.
817 return false;
818 }
819 // Only when all workers are free, will idle task be taken.
820 return true;
821 }
822
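// Dequeue a task id from the given queue; if the task still has unfinished dependencies, park it in the
// pending list and return 0 so that no task is scheduled.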
823 std::pair<uint64_t, Priority> TaskManager::GetTaskByPriority(const std::unique_ptr<ExecuteQueue>& taskQueue,
824 Priority priority)
825 {
826 uint64_t taskId = taskQueue->DequeueTaskId();
827 if (IsDependendByTaskId(taskId)) {
828 EnqueuePendingTaskInfo(taskId, priority);
829 return std::make_pair(0, priority);
830 }
831 DecreaseNumIfNoIdle(priority);
832 return std::make_pair(taskId, priority);
833 }
834
835 void TaskManager::NotifyExecuteTask()
836 {
837 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
838 if (GetNonIdleTaskNum() == 0 && workers_.size() != idleWorkers_.size()) {
839 // When only idle-priority tasks remain and some workers are still busy, do not notify
840 return;
841 }
842
843 for (auto& worker : idleWorkers_) {
844 worker->NotifyExecuteTask();
845 }
846 }
847
848 void TaskManager::InitTaskManager(napi_env env)
849 {
850 HITRACE_HELPER_METER_NAME("InitTaskManager");
851 if (!isInitialized_.exchange(true, std::memory_order_relaxed)) {
852 #if defined(ENABLE_TASKPOOL_FFRT)
853 globalEnableFfrtFlag_ = OHOS::system::GetIntParameter<int>("persist.commonlibrary.taskpoolglobalenableffrt", 0);
854 if (!globalEnableFfrtFlag_) {
855 UpdateSystemAppFlag();
856 if (IsSystemApp()) {
857 disableFfrtFlag_ = OHOS::system::GetIntParameter<int>("persist.commonlibrary.taskpooldisableffrt", 0);
858 }
859 }
860 if (EnableFfrt()) {
861 HILOG_INFO("taskpool:: apps use ffrt");
862 ffrt_set_cpu_worker_max_num(ffrt::qos_background, 1);
863 ffrt_set_cpu_worker_max_num(ffrt::qos_utility, 12); // 12 : worker max num
864 ffrt_set_cpu_worker_max_num(ffrt::qos_default, 12); // 12 : worker max num
865 ffrt_set_cpu_worker_max_num(ffrt::qos_user_initiated, 12); // 12 : worker max num
866 } else {
867 HILOG_INFO("taskpool:: apps do not use ffrt");
868 }
869 #endif
870 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
871 mainThreadHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(
872 OHOS::AppExecFwk::EventRunner::GetMainEventRunner());
873 #endif
874 auto mainThreadEngine = NativeEngine::GetMainThreadEngine();
875 if (mainThreadEngine == nullptr) {
876 HILOG_FATAL("taskpool:: mainThreadEngine is nullptr");
877 return;
878 }
879 hostEnv_ = reinterpret_cast<napi_env>(mainThreadEngine);
880 // Add a reserved thread for taskpool
881 CreateWorkers(hostEnv_);
882 // Create a timer to manage worker threads
883 std::thread workerManager([this] {this->RunTaskManager();});
884 workerManager.detach();
885 }
886 }
887
888 void TaskManager::CreateWorkers(napi_env env, uint32_t num)
889 {
890 HILOG_DEBUG("taskpool:: CreateWorkers, num:%{public}u", num);
891 for (uint32_t i = 0; i < num; i++) {
892 auto worker = Worker::WorkerConstructor(env);
893 NotifyWorkerAdded(worker);
894 }
895 CountTraceForWorker();
896 }
897
898 void TaskManager::RemoveWorker(Worker* worker)
899 {
900 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
901 idleWorkers_.erase(worker);
902 timeoutWorkers_.erase(worker);
903 workers_.erase(worker);
904 }
905
906 void TaskManager::RestoreWorker(Worker* worker)
907 {
908 std::lock_guard<RECURSIVE_MUTEX> lock(workersMutex_);
909 if (UNLIKELY(suspend_)) {
910 suspend_ = false;
911 uv_timer_again(timer_);
912 }
913 if (worker->state_ == WorkerState::BLOCKED) {
914 // since the worker is blocked, we should add it to the timeout set
915 timeoutWorkers_.insert(worker);
916 return;
917 }
918 // Since the worker may still be handling tasks such as I/O in its own uv thread, add it back to the
919 // idle set and notify task execution so that it can continue to pick up tasks.
920 HILOG_DEBUG("taskpool:: worker has been restored and the current num is: %{public}zu", workers_.size());
921 idleWorkers_.emplace_hint(idleWorkers_.end(), worker);
922 if (GetTaskNum() != 0) {
923 NotifyExecuteTask();
924 }
925 }
926
927 // ---------------------------------- SendData ---------------------------------------
928 void TaskManager::RegisterCallback(napi_env env, uint64_t taskId, std::shared_ptr<CallbackInfo> callbackInfo)
929 {
930 std::lock_guard<std::mutex> lock(callbackMutex_);
931 callbackTable_[taskId] = callbackInfo;
932 }
933
934 std::shared_ptr<CallbackInfo> TaskManager::GetCallbackInfo(uint64_t taskId)
935 {
936 std::lock_guard<std::mutex> lock(callbackMutex_);
937 auto iter = callbackTable_.find(taskId);
938 if (iter == callbackTable_.end() || iter->second == nullptr) {
939 HILOG_ERROR("taskpool:: the callback does not exist");
940 return nullptr;
941 }
942 return iter->second;
943 }
944
945 void TaskManager::IncreaseRefCount(uint64_t taskId)
946 {
947 if (taskId == 0) { // do not support func
948 return;
949 }
950 std::lock_guard<std::mutex> lock(callbackMutex_);
951 auto iter = callbackTable_.find(taskId);
952 if (iter == callbackTable_.end() || iter->second == nullptr) {
953 return;
954 }
955 iter->second->refCount++;
956 }
957
958 void TaskManager::DecreaseRefCount(napi_env env, uint64_t taskId)
959 {
960 if (taskId == 0) { // do not support func
961 return;
962 }
963 std::lock_guard<std::mutex> lock(callbackMutex_);
964 auto iter = callbackTable_.find(taskId);
965 if (iter == callbackTable_.end() || iter->second == nullptr) {
966 return;
967 }
968
969 auto task = reinterpret_cast<Task*>(taskId);
970 if (!task->IsValid()) {
971 callbackTable_.erase(iter);
972 return;
973 }
974
975 iter->second->refCount--;
976 if (iter->second->refCount == 0) {
977 callbackTable_.erase(iter);
978 }
979 }
980
981 void TaskManager::ResetCallbackInfoWorker(const std::shared_ptr<CallbackInfo>& callbackInfo)
982 {
983 std::lock_guard<std::mutex> lock(callbackMutex_);
984 callbackInfo->worker = nullptr;
985 }
986
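// Deliver a sendData result to the host side: enqueue it on the worker's message queue, bump the callback
// ref count, and wake the host either by posting to the main-thread event handler or via the uv signal.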
987 napi_value TaskManager::NotifyCallbackExecute(napi_env env, TaskResultInfo* resultInfo, Task* task)
988 {
989 HILOG_DEBUG("taskpool:: task:%{public}s NotifyCallbackExecute", std::to_string(task->taskId_).c_str());
990 std::lock_guard<std::mutex> lock(callbackMutex_);
991 auto iter = callbackTable_.find(task->taskId_);
992 if (iter == callbackTable_.end() || iter->second == nullptr) {
993 HILOG_ERROR("taskpool:: the callback in SendData is not registered on the host side");
994 ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_REGISTERED);
995 delete resultInfo;
996 return nullptr;
997 }
998 Worker* worker = static_cast<Worker*>(task->worker_);
999 worker->Enqueue(task->env_, resultInfo);
1000 auto callbackInfo = iter->second;
1001 callbackInfo->refCount++;
1002 callbackInfo->worker = worker;
1003 auto workerEngine = reinterpret_cast<NativeEngine*>(env);
1004 workerEngine->IncreaseListeningCounter();
1005 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
1006 if (task->IsMainThreadTask()) {
1007 HITRACE_HELPER_METER_NAME("NotifyCallbackExecute: PostTask");
1008 auto onCallbackTask = [callbackInfo]() {
1009 TaskPool::ExecuteCallbackTask(callbackInfo.get());
1010 };
1011 TaskManager::GetInstance().PostTask(onCallbackTask, "TaskPoolOnCallbackTask", worker->priority_);
1012 } else {
1013 callbackInfo->onCallbackSignal->data = callbackInfo.get();
1014 uv_async_send(callbackInfo->onCallbackSignal);
1015 }
1016 #else
1017 callbackInfo->onCallbackSignal->data = callbackInfo.get();
1018 uv_async_send(callbackInfo->onCallbackSignal);
1019 #endif
1020 return nullptr;
1021 }
1022
1023 MsgQueue* TaskManager::GetMessageQueue(const uv_async_t* req)
1024 {
1025 std::lock_guard<std::mutex> lock(callbackMutex_);
1026 auto info = static_cast<CallbackInfo*>(req->data);
1027 if (info == nullptr || info->worker == nullptr) {
1028 HILOG_WARN("taskpool:: info or worker is nullptr");
1029 return nullptr;
1030 }
1031 auto worker = info->worker;
1032 MsgQueue* queue = nullptr;
1033 worker->Dequeue(info->hostEnv, queue);
1034 return queue;
1035 }
1036
1037 MsgQueue* TaskManager::GetMessageQueueFromCallbackInfo(CallbackInfo* callbackInfo)
1038 {
1039 std::lock_guard<std::mutex> lock(callbackMutex_);
1040 if (callbackInfo == nullptr || callbackInfo->worker == nullptr) {
1041 HILOG_WARN("taskpool:: callbackInfo or worker is nullptr");
1042 return nullptr;
1043 }
1044 auto worker = callbackInfo->worker;
1045 MsgQueue* queue = nullptr;
1046 worker->Dequeue(callbackInfo->hostEnv, queue);
1047 return queue;
1048 }
1049 // ---------------------------------- SendData ---------------------------------------
1050
1051 void TaskManager::NotifyDependencyTaskInfo(uint64_t taskId)
1052 {
1053 HILOG_DEBUG("taskpool:: task:%{public}s NotifyDependencyTaskInfo", std::to_string(taskId).c_str());
1054 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
1055 std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1056 auto iter = dependentTaskInfos_.find(taskId);
1057 if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
1058 HILOG_DEBUG("taskpool:: dependentTaskInfo empty");
1059 return;
1060 }
1061 for (auto taskIdIter = iter->second.begin(); taskIdIter != iter->second.end();) {
1062 auto taskInfo = DequeuePendingTaskInfo(*taskIdIter);
1063 RemoveDependencyById(taskId, *taskIdIter);
1064 taskIdIter = iter->second.erase(taskIdIter);
1065 if (taskInfo.first != 0) {
1066 EnqueueTaskId(taskInfo.first, taskInfo.second);
1067 }
1068 }
1069 }
1070
1071 void TaskManager::RemoveDependencyById(uint64_t dependentTaskId, uint64_t taskId)
1072 {
1073 HILOG_DEBUG("taskpool::task:%{public}s RemoveDependencyById", std::to_string(taskId).c_str());
1074 // remove the dependency after the task executes
1075 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1076 auto dependTaskIter = dependTaskInfos_.find(taskId);
1077 if (dependTaskIter != dependTaskInfos_.end()) {
1078 auto dependTaskInnerIter = dependTaskIter->second.find(dependentTaskId);
1079 if (dependTaskInnerIter != dependTaskIter->second.end()) {
1080 dependTaskIter->second.erase(dependTaskInnerIter);
1081 }
1082 }
1083 }
1084
1085 bool TaskManager::IsDependendByTaskId(uint64_t taskId)
1086 {
1087 std::shared_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1088 auto iter = dependTaskInfos_.find(taskId);
1089 if (iter == dependTaskInfos_.end() || iter->second.empty()) {
1090 return false;
1091 }
1092 return true;
1093 }
1094
1095 bool TaskManager::IsDependentByTaskId(uint64_t dependentTaskId)
1096 {
1097 std::shared_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1098 auto iter = dependentTaskInfos_.find(dependentTaskId);
1099 if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
1100 return false;
1101 }
1102 return true;
1103 }
1104
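// Record that taskId depends on every id in taskIdSet; the update is rejected (returns false) when it
// would introduce a circular dependency.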
1105 bool TaskManager::StoreTaskDependency(uint64_t taskId, std::set<uint64_t> taskIdSet)
1106 {
1107 HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDependency", std::to_string(taskId).c_str());
1108 StoreDependentTaskInfo(taskIdSet, taskId);
1109 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1110 auto iter = dependTaskInfos_.find(taskId);
1111 if (iter == dependTaskInfos_.end()) {
1112 for (const auto& dependentId : taskIdSet) {
1113 auto idIter = dependTaskInfos_.find(dependentId);
1114 if (idIter == dependTaskInfos_.end()) {
1115 continue;
1116 }
1117 if (!CheckCircularDependency(taskIdSet, idIter->second, taskId)) {
1118 return false;
1119 }
1120 }
1121 dependTaskInfos_.emplace(taskId, std::move(taskIdSet));
1122 return true;
1123 }
1124
1125 for (const auto& dependentId : iter->second) {
1126 auto idIter = dependTaskInfos_.find(dependentId);
1127 if (idIter == dependTaskInfos_.end()) {
1128 continue;
1129 }
1130 if (!CheckCircularDependency(iter->second, idIter->second, taskId)) {
1131 return false;
1132 }
1133 }
1134 iter->second.insert(taskIdSet.begin(), taskIdSet.end());
1135 return true;
1136 }
1137
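// Recursively walk the dependency chains of idSet; return false if taskId is reachable, which would form a cycle.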
1138 bool TaskManager::CheckCircularDependency(std::set<uint64_t> dependentIdSet, std::set<uint64_t> idSet, uint64_t taskId)
1139 {
1140 for (const auto& id : idSet) {
1141 if (id == taskId) {
1142 return false;
1143 }
1144 auto iter = dependentIdSet.find(id);
1145 if (iter != dependentIdSet.end()) {
1146 continue;
1147 }
1148 auto dIter = dependTaskInfos_.find(id);
1149 if (dIter == dependTaskInfos_.end()) {
1150 continue;
1151 }
1152 if (!CheckCircularDependency(dependentIdSet, dIter->second, taskId)) {
1153 return false;
1154 }
1155 }
1156 return true;
1157 }
1158
1159 bool TaskManager::RemoveTaskDependency(uint64_t taskId, uint64_t dependentId)
1160 {
1161 HILOG_DEBUG("taskpool:: task:%{public}s RemoveTaskDependency", std::to_string(taskId).c_str());
1162 RemoveDependentTaskInfo(dependentId, taskId);
1163 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1164 auto iter = dependTaskInfos_.find(taskId);
1165 if (iter == dependTaskInfos_.end()) {
1166 return false;
1167 }
1168 auto dependIter = iter->second.find(dependentId);
1169 if (dependIter == iter->second.end()) {
1170 return false;
1171 }
1172 iter->second.erase(dependIter);
1173 return true;
1174 }
1175
1176 void TaskManager::EnqueuePendingTaskInfo(uint64_t taskId, Priority priority)
1177 {
1178 if (taskId == 0) {
1179 return;
1180 }
1181 std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
1182 pendingTaskInfos_.emplace(taskId, priority);
1183 }
1184
1185 std::pair<uint64_t, Priority> TaskManager::DequeuePendingTaskInfo(uint64_t taskId)
1186 {
1187 std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
1188 if (pendingTaskInfos_.empty()) {
1189 return std::make_pair(0, Priority::DEFAULT);
1190 }
1191 std::pair<uint64_t, Priority> result;
1192 for (auto it = pendingTaskInfos_.begin(); it != pendingTaskInfos_.end(); ++it) {
1193 if (it->first == taskId) {
1194 result = std::make_pair(it->first, it->second);
1195 it = pendingTaskInfos_.erase(it);
1196 break;
1197 }
1198 }
1199 return result;
1200 }
1201
1202 void TaskManager::RemovePendingTaskInfo(uint64_t taskId)
1203 {
1204 HILOG_DEBUG("taskpool:: task:%{public}s RemovePendingTaskInfo", std::to_string(taskId).c_str());
1205 std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
1206 pendingTaskInfos_.erase(taskId);
1207 }
1208
1209 void TaskManager::StoreDependentTaskInfo(std::set<uint64_t> dependentTaskIdSet, uint64_t taskId)
1210 {
1211 HILOG_DEBUG("taskpool:: task:%{public}s StoreDependentTaskInfo", std::to_string(taskId).c_str());
1212 std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1213 for (const auto& id : dependentTaskIdSet) {
1214 auto iter = dependentTaskInfos_.find(id);
1215 if (iter == dependentTaskInfos_.end()) {
1216 std::set<uint64_t> set{taskId};
1217 dependentTaskInfos_.emplace(id, std::move(set));
1218 } else {
1219 iter->second.emplace(taskId);
1220 }
1221 }
1222 }
1223
1224 void TaskManager::RemoveDependentTaskInfo(uint64_t dependentTaskId, uint64_t taskId)
1225 {
1226 HILOG_DEBUG("taskpool:: task:%{public}s RemoveDependentTaskInfo", std::to_string(taskId).c_str());
1227 std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1228 auto iter = dependentTaskInfos_.find(dependentTaskId);
1229 if (iter == dependentTaskInfos_.end()) {
1230 return;
1231 }
1232 auto taskIter = iter->second.find(taskId);
1233 if (taskIter == iter->second.end()) {
1234 return;
1235 }
1236 iter->second.erase(taskIter);
1237 }
1238
1239 std::string TaskManager::GetTaskDependInfoToString(uint64_t taskId)
1240 {
1241 std::shared_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1242 std::string str = "TaskInfos: taskId: " + std::to_string(taskId) + ", dependTaskId:";
1243 auto iter = dependTaskInfos_.find(taskId);
1244 if (iter != dependTaskInfos_.end()) {
1245 for (const auto& id : iter->second) {
1246 str += " " + std::to_string(id);
1247 }
1248 }
1249 return str;
1250 }
1251
1252 void TaskManager::StoreTaskDuration(uint64_t taskId, uint64_t totalDuration, uint64_t cpuDuration)
1253 {
1254 HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDuration", std::to_string(taskId).c_str());
1255 std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
1256 auto iter = taskDurationInfos_.find(taskId);
1257 if (iter == taskDurationInfos_.end()) {
1258 std::pair<uint64_t, uint64_t> durationData = std::make_pair(totalDuration, cpuDuration);
1259 taskDurationInfos_.emplace(taskId, std::move(durationData));
1260 } else {
1261 if (totalDuration != 0) {
1262 iter->second.first = totalDuration;
1263 }
1264 if (cpuDuration != 0) {
1265 iter->second.second = cpuDuration;
1266 }
1267 }
1268 }
1269
1270 uint64_t TaskManager::GetTaskDuration(uint64_t taskId, std::string durationType)
1271 {
1272 std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
1273 auto iter = taskDurationInfos_.find(taskId);
1274 if (iter == taskDurationInfos_.end()) {
1275 return 0;
1276 }
1277 if (durationType == TASK_TOTAL_TIME) {
1278 return iter->second.first;
1279 } else if (durationType == TASK_CPU_TIME) {
1280 return iter->second.second;
1281 } else if (iter->second.first == 0) {
1282 return 0;
1283 }
1284 return iter->second.first - iter->second.second;
1285 }
1286
1287 std::string TaskManager::GetTaskName(uint64_t taskId)
1288 {
1289 std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
1290 auto iter = tasks_.find(taskId);
1291 if (iter == tasks_.end()) {
1292 return "";
1293 }
1294 return iter->second->name_;
1295 }
1296
1297 void TaskManager::RemoveTaskDuration(uint64_t taskId)
1298 {
1299 HILOG_DEBUG("taskpool:: task:%{public}s RemoveTaskDuration", std::to_string(taskId).c_str());
1300 std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
1301 auto iter = taskDurationInfos_.find(taskId);
1302 if (iter != taskDurationInfos_.end()) {
1303 taskDurationInfos_.erase(iter);
1304 }
1305 }
1306
1307 void TaskManager::StoreLongTaskInfo(uint64_t taskId, Worker* worker)
1308 {
1309 std::unique_lock<std::shared_mutex> lock(longTasksMutex_);
1310 longTasksMap_.emplace(taskId, worker);
1311 }
1312
1313 void TaskManager::RemoveLongTaskInfo(uint64_t taskId)
1314 {
1315 std::unique_lock<std::shared_mutex> lock(longTasksMutex_);
1316 longTasksMap_.erase(taskId);
1317 }
1318
1319 Worker* TaskManager::GetLongTaskInfo(uint64_t taskId)
1320 {
1321 std::shared_lock<std::shared_mutex> lock(longTasksMutex_);
1322 auto iter = longTasksMap_.find(taskId);
1323 return iter != longTasksMap_.end() ? iter->second : nullptr;
1324 }
1325
1326 void TaskManager::TerminateTask(uint64_t taskId)
1327 {
1328 HILOG_DEBUG("taskpool:: task:%{public}s TerminateTask", std::to_string(taskId).c_str());
1329 auto worker = GetLongTaskInfo(taskId);
1330 if (UNLIKELY(worker == nullptr)) {
1331 return;
1332 }
1333 worker->TerminateTask(taskId);
1334 RemoveLongTaskInfo(taskId);
1335 }
1336
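// Release the bookkeeping attached to a task: its result signal handle, current task info, pending and
// delayed work, and (for non-function tasks) its durations, callbacks and dependency records.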
1337 void TaskManager::ReleaseTaskData(napi_env env, Task* task, bool shouldDeleteTask)
1338 {
1339 uint64_t taskId = task->taskId_;
1340 if (shouldDeleteTask) {
1341 RemoveTask(taskId);
1342 }
1343 if (task->onResultSignal_ != nullptr) {
1344 if (!uv_is_closing((uv_handle_t*)task->onResultSignal_)) {
1345 ConcurrentHelper::UvHandleClose(task->onResultSignal_);
1346 } else {
1347 delete task->onResultSignal_;
1348 }
1349 task->onResultSignal_ = nullptr;
1350 }
1351
1352 if (task->currentTaskInfo_ != nullptr) {
1353 delete task->currentTaskInfo_;
1354 task->currentTaskInfo_ = nullptr;
1355 }
1356
1357 task->CancelPendingTask(env);
1358
1359 task->ClearDelayedTimers();
1360
1361 if (task->IsFunctionTask() || task->IsGroupFunctionTask()) {
1362 return;
1363 }
1364 DecreaseRefCount(env, taskId);
1365 RemoveTaskDuration(taskId);
1366 RemovePendingTaskInfo(taskId);
1367 ReleaseCallBackInfo(task);
1368 {
1369 std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1370 for (auto dependentTaskIter = dependentTaskInfos_.begin(); dependentTaskIter != dependentTaskInfos_.end();) {
1371 if (dependentTaskIter->second.find(taskId) != dependentTaskIter->second.end()) {
1372 dependentTaskIter = dependentTaskInfos_.erase(dependentTaskIter);
1373 } else {
1374 ++dependentTaskIter;
1375 }
1376 }
1377 }
1378 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1379 auto dependTaskIter = dependTaskInfos_.find(taskId);
1380 if (dependTaskIter != dependTaskInfos_.end()) {
1381 dependTaskInfos_.erase(dependTaskIter);
1382 }
1383 }
1384
1385 void TaskManager::ReleaseCallBackInfo(Task* task)
1386 {
1387 HILOG_DEBUG("taskpool:: ReleaseCallBackInfo task:%{public}s", std::to_string(task->taskId_).c_str());
1388 if (task->onEnqueuedCallBackInfo_ != nullptr) {
1389 delete task->onEnqueuedCallBackInfo_;
1390 task->onEnqueuedCallBackInfo_ = nullptr;
1391 }
1392
1393 if (task->onStartExecutionCallBackInfo_ != nullptr) {
1394 delete task->onStartExecutionCallBackInfo_;
1395 task->onStartExecutionCallBackInfo_ = nullptr;
1396 }
1397
1398 if (task->onExecutionFailedCallBackInfo_ != nullptr) {
1399 delete task->onExecutionFailedCallBackInfo_;
1400 task->onExecutionFailedCallBackInfo_ = nullptr;
1401 }
1402
1403 if (task->onExecutionSucceededCallBackInfo_ != nullptr) {
1404 delete task->onExecutionSucceededCallBackInfo_;
1405 task->onExecutionSucceededCallBackInfo_ = nullptr;
1406 }
1407
1408 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
1409 if (!task->IsMainThreadTask() && task->onStartExecutionSignal_ != nullptr) {
1410 if (!uv_is_closing((uv_handle_t*)task->onStartExecutionSignal_)) {
1411 ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_);
1412 } else {
1413 delete task->onStartExecutionSignal_;
1414 }
1415 task->onStartExecutionSignal_ = nullptr;
1416 }
1417 #else
1418 if (task->onStartExecutionSignal_ != nullptr) {
1419 if (!uv_is_closing((uv_handle_t*)task->onStartExecutionSignal_)) {
1420 ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_);
1421 } else {
1422 delete task->onStartExecutionSignal_;
1423 }
1424 task->onStartExecutionSignal_ = nullptr;
1425 }
1426 #endif
1427 }
1428
1429 void TaskManager::StoreTask(uint64_t taskId, Task* task)
1430 {
1431 std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
1432 tasks_.emplace(taskId, task);
1433 }
1434
1435 void TaskManager::RemoveTask(uint64_t taskId)
1436 {
1437 std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
1438 tasks_.erase(taskId);
1439 }
1440
1441 Task* TaskManager::GetTask(uint64_t taskId)
1442 {
1443 std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
1444 auto iter = tasks_.find(taskId);
1445 if (iter == tasks_.end()) {
1446 return nullptr;
1447 }
1448 return iter->second;
1449 }
1450
1451 #if defined(ENABLE_TASKPOOL_FFRT)
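// Query the bundle manager for the application's own bundle info and cache whether this process belongs
// to a system app; the result is used when deciding whether FFRT can be disabled.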
1452 void TaskManager::UpdateSystemAppFlag()
1453 {
1454 auto abilityManager = OHOS::SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
1455 if (abilityManager == nullptr) {
1456 HILOG_ERROR("taskpool:: fail to GetSystemAbility abilityManager is nullptr.");
1457 return;
1458 }
1459 auto bundleObj = abilityManager->GetSystemAbility(OHOS::BUNDLE_MGR_SERVICE_SYS_ABILITY_ID);
1460 if (bundleObj == nullptr) {
1461 HILOG_ERROR("taskpool:: fail to get bundle manager service.");
1462 return;
1463 }
1464 auto bundleMgr = OHOS::iface_cast<OHOS::AppExecFwk::IBundleMgr>(bundleObj);
1465 if (bundleMgr == nullptr) {
1466 HILOG_ERROR("taskpool:: Bundle manager is nullptr.");
1467 return;
1468 }
1469 OHOS::AppExecFwk::BundleInfo bundleInfo;
1470 if (bundleMgr->GetBundleInfoForSelf(
1471 static_cast<int32_t>(OHOS::AppExecFwk::GetBundleInfoFlag::GET_BUNDLE_INFO_WITH_APPLICATION), bundleInfo)
1472 != OHOS::ERR_OK) {
1473 HILOG_ERROR("taskpool:: fail to GetBundleInfoForSelf");
1474 return;
1475 }
1476 isSystemApp_ = bundleInfo.applicationInfo.isSystemApp;
1477 }
1478 #endif
1479
1480 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
1481 bool TaskManager::PostTask(std::function<void()> task, const char* taskName, Priority priority)
1482 {
1483 return mainThreadHandler_->PostTask(task, taskName, 0, TASK_EVENTHANDLER_PRIORITY_MAP.at(priority));
1484 }
1485 #endif
1486
1487 bool TaskManager::CheckTask(uint64_t taskId)
1488 {
1489 std::lock_guard<RECURSIVE_MUTEX> lock(tasksMutex_);
1490 auto item = tasks_.find(taskId);
1491 return item != tasks_.end();
1492 }
1493
1494 // ----------------------------------- TaskGroupManager ----------------------------------------
1495 TaskGroupManager& TaskGroupManager::GetInstance()
1496 {
1497 static TaskGroupManager groupManager;
1498 return groupManager;
1499 }
1500
1501 void TaskGroupManager::AddTask(uint64_t groupId, napi_ref taskRef, uint64_t taskId)
1502 {
1503 std::lock_guard<std::mutex> lock(taskGroupsMutex_);
1504 auto groupIter = taskGroups_.find(groupId);
1505 if (groupIter == taskGroups_.end()) {
1506 HILOG_DEBUG("taskpool:: taskGroup has been released");
1507 return;
1508 }
1509 auto taskGroup = reinterpret_cast<TaskGroup*>(groupIter->second);
1510 if (taskGroup == nullptr) {
1511 HILOG_ERROR("taskpool:: taskGroup is null");
1512 return;
1513 }
1514 taskGroup->taskRefs_.push_back(taskRef);
1515 taskGroup->taskNum_++;
1516 taskGroup->taskIds_.push_back(taskId);
1517 }
1518
1519 void TaskGroupManager::ReleaseTaskGroupData(napi_env env, TaskGroup* group)
1520 {
1521 HILOG_DEBUG("taskpool:: ReleaseTaskGroupData group");
1522 TaskGroupManager::GetInstance().RemoveTaskGroup(group->groupId_);
1523 for (uint64_t taskId : group->taskIds_) {
1524 Task* task = TaskManager::GetInstance().GetTask(taskId);
1525 if (task == nullptr || !task->IsValid()) {
1526 continue;
1527 }
1528 napi_reference_unref(task->env_, task->taskRef_, nullptr);
1529 }
1530
1531 if (group->currentGroupInfo_ != nullptr) {
1532 delete group->currentGroupInfo_;
1533 }
1534
1535 group->CancelPendingGroup(env);
1536 }
1537
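// Cancels a whole task group. Throws ERR_CANCEL_NONEXIST_TASK_GROUP when the group is not
// currently executing; otherwise marks the group CANCELED, cancels its pending runs and every
// still-waiting member task, and rejects the group's promise once no tasks remain unfinished.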
void TaskGroupManager::CancelGroup(napi_env env, uint64_t groupId)
{
    std::string strTrace = "CancelGroup: groupId: " + std::to_string(groupId);
    HITRACE_HELPER_METER_NAME(strTrace);
    HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
    TaskGroup* taskGroup = GetTaskGroup(groupId);
    if (taskGroup == nullptr) {
        HILOG_ERROR("taskpool:: CancelGroup group is nullptr");
        return;
    }
    if (taskGroup->groupState_ == ExecuteState::CANCELED) {
        return;
    }
    std::lock_guard<RECURSIVE_MUTEX> lock(taskGroup->taskGroupMutex_);
    if (taskGroup->currentGroupInfo_ == nullptr || taskGroup->groupState_ == ExecuteState::NOT_FOUND ||
        taskGroup->groupState_ == ExecuteState::FINISHED) {
        std::string errMsg = "taskpool:: taskGroup has not been executed or has already finished";
        HILOG_ERROR("%{public}s", errMsg.c_str());
        ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK_GROUP, errMsg.c_str());
        return;
    }
    ExecuteState groupState = taskGroup->groupState_;
    taskGroup->groupState_ = ExecuteState::CANCELED;
    taskGroup->CancelPendingGroup(env);
    if (taskGroup->currentGroupInfo_->finishedTaskNum != taskGroup->taskNum_) {
        for (uint64_t taskId : taskGroup->taskIds_) {
            CancelGroupTask(env, taskId, taskGroup);
        }
        if (taskGroup->currentGroupInfo_->finishedTaskNum == taskGroup->taskNum_) {
            napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: taskGroup has been canceled");
            taskGroup->RejectResult(env, error);
            return;
        }
    }
    if (groupState == ExecuteState::WAITING && taskGroup->currentGroupInfo_ != nullptr) {
        auto engine = reinterpret_cast<NativeEngine*>(env);
        for (size_t i = 0; i < taskGroup->taskIds_.size(); i++) {
            engine->DecreaseSubEnvCounter();
        }
        napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: taskGroup has been canceled");
        taskGroup->RejectResult(env, error);
    }
}

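// Cancels a single member task of a group. A task that is still WAITING is pulled out of the
// execution queue, its pending info is released and it is counted as finished for the group;
// in every case the task ends up in the CANCELED state.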
void TaskGroupManager::CancelGroupTask(napi_env env, uint64_t taskId, TaskGroup* group)
{
    HILOG_DEBUG("taskpool:: CancelGroupTask task:%{public}s", std::to_string(taskId).c_str());
    auto task = TaskManager::GetInstance().GetTask(taskId);
    if (task == nullptr) {
        HILOG_INFO("taskpool:: CancelGroupTask task is nullptr");
        return;
    }
    std::lock_guard<RECURSIVE_MUTEX> lock(task->taskMutex_);
    if (task->taskState_ == ExecuteState::WAITING && task->currentTaskInfo_ != nullptr) {
        reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter();
        task->DecreaseTaskRefCount();
        TaskManager::GetInstance().EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority);
        delete task->currentTaskInfo_;
        task->currentTaskInfo_ = nullptr;
        if (group->currentGroupInfo_ != nullptr) {
            group->currentGroupInfo_->finishedTaskNum++;
        }
    }
    task->taskState_ = ExecuteState::CANCELED;
}

void TaskGroupManager::StoreSequenceRunner(uint64_t seqRunnerId, SequenceRunner* seqRunner)
{
    std::unique_lock<std::mutex> lock(seqRunnersMutex_);
    seqRunners_.emplace(seqRunnerId, seqRunner);
}

void TaskGroupManager::RemoveSequenceRunner(uint64_t seqRunnerId)
{
    std::unique_lock<std::mutex> lock(seqRunnersMutex_);
    seqRunners_.erase(seqRunnerId);
}

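// Looks up a sequence runner by id; returns nullptr when the runner has already been released.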
SequenceRunner* TaskGroupManager::GetSeqRunner(uint64_t seqRunnerId)
{
    std::unique_lock<std::mutex> lock(seqRunnersMutex_);
    auto iter = seqRunners_.find(seqRunnerId);
    if (iter != seqRunners_.end()) {
        return iter->second;
    }
    HILOG_DEBUG("taskpool:: sequenceRunner has been released.");
    return nullptr;
}

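// Appends a task to the FIFO queue of the given sequence runner, taking the runner's own lock
// while the queue is modified.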
void TaskGroupManager::AddTaskToSeqRunner(uint64_t seqRunnerId, Task* task)
{
    std::unique_lock<std::mutex> lock(seqRunnersMutex_);
    auto iter = seqRunners_.find(seqRunnerId);
    if (iter == seqRunners_.end()) {
        HILOG_ERROR("seqRunner:: seqRunner not found.");
        return;
    }
    std::unique_lock<std::shared_mutex> seqRunnerLock(iter->second->seqRunnerMutex_);
    iter->second->seqRunnerTasks_.push(task);
}

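// Called when a task that ran on a sequence runner completes: verifies that the finished task
// was the runner's front task, releases the runner reference, discards any canceled tasks at
// the head of the queue and enqueues the next runnable task at the runner's priority.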
bool TaskGroupManager::TriggerSeqRunner(napi_env env, Task* lastTask)
{
    uint64_t seqRunnerId = lastTask->seqRunnerId_;
    SequenceRunner* seqRunner = GetSeqRunner(seqRunnerId);
    if (seqRunner == nullptr) {
        HILOG_ERROR("seqRunner:: seqRunner to trigger does not exist.");
        return false;
    }
    if (!SequenceRunnerManager::GetInstance().TriggerGlobalSeqRunner(env, seqRunner)) {
        HILOG_ERROR("seqRunner:: globalSeqRunner to trigger does not exist.");
        return false;
    }
    if (seqRunner->currentTaskId_ != lastTask->taskId_) {
        HILOG_ERROR("seqRunner:: only the front task can trigger the seqRunner.");
        return false;
    }
    {
        std::unique_lock<std::shared_mutex> lock(seqRunner->seqRunnerMutex_);
        if (seqRunner->seqRunnerTasks_.empty()) {
            HILOG_DEBUG("seqRunner:: seqRunner %{public}s empty.", std::to_string(seqRunnerId).c_str());
            seqRunner->currentTaskId_ = 0;
            return true;
        }
        Task* task = seqRunner->seqRunnerTasks_.front();
        seqRunner->seqRunnerTasks_.pop();
        while (task->taskState_ == ExecuteState::CANCELED) {
            DisposeCanceledTask(env, task);
            if (seqRunner->seqRunnerTasks_.empty()) {
                HILOG_DEBUG("seqRunner:: seqRunner %{public}s empty in cancel loop.",
                    std::to_string(seqRunnerId).c_str());
                seqRunner->currentTaskId_ = 0;
                return true;
            }
            task = seqRunner->seqRunnerTasks_.front();
            seqRunner->seqRunnerTasks_.pop();
        }
        seqRunner->currentTaskId_ = task->taskId_;
        task->IncreaseRefCount();
        task->taskState_ = ExecuteState::WAITING;
        HILOG_DEBUG("seqRunner:: Trigger task %{public}s in seqRunner %{public}s.",
            std::to_string(task->taskId_).c_str(), std::to_string(seqRunnerId).c_str());
        TaskManager::GetInstance().EnqueueTaskId(task->taskId_, seqRunner->priority_);
    }
    return true;
}

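// Rejects the promise of a canceled sequence-runner task, releases its env counter and napi
// reference, and frees its pending task info.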
void TaskGroupManager::DisposeCanceledTask(napi_env env, Task* task)
{
    napi_value error = ErrorHelper::NewError(env, 0, "taskpool:: sequenceRunner task has been canceled");
    napi_reject_deferred(env, task->currentTaskInfo_->deferred, error);
    reinterpret_cast<NativeEngine*>(env)->DecreaseSubEnvCounter();
    napi_reference_unref(env, task->taskRef_, nullptr);
    delete task->currentTaskInfo_;
    task->currentTaskInfo_ = nullptr;
}

void TaskGroupManager::StoreTaskGroup(uint64_t groupId, TaskGroup* taskGroup)
{
    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
    taskGroups_.emplace(groupId, taskGroup);
}

void TaskGroupManager::RemoveTaskGroup(uint64_t groupId)
{
    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
    taskGroups_.erase(groupId);
}

TaskGroup* TaskGroupManager::GetTaskGroup(uint64_t groupId)
{
    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
    auto groupIter = taskGroups_.find(groupId);
    if (groupIter == taskGroups_.end()) {
        return nullptr;
    }
    return reinterpret_cast<TaskGroup*>(groupIter->second);
}

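// Marks a group as RUNNING when it is dispatched. Returns false if the group has already been
// released or canceled, so the caller can skip executing it.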
bool TaskGroupManager::UpdateGroupState(uint64_t groupId)
{
    HILOG_DEBUG("taskpool:: UpdateGroupState groupId:%{public}s", std::to_string(groupId).c_str());
    // Hold taskGroupsMutex_ while the group is updated so that other threads cannot release or
    // modify the group pointer concurrently and invalidate this state change.
    std::lock_guard<std::mutex> lock(taskGroupsMutex_);
    auto groupIter = taskGroups_.find(groupId);
    if (groupIter == taskGroups_.end()) {
        return false;
    }
    TaskGroup* group = reinterpret_cast<TaskGroup*>(groupIter->second);
    if (group == nullptr || group->groupState_ == ExecuteState::CANCELED) {
        HILOG_DEBUG("taskpool:: UpdateGroupState taskGroup has been released or canceled");
        return false;
    }
    group->groupState_ = ExecuteState::RUNNING;
    return true;
}

// ----------------------------------- SequenceRunnerManager ----------------------------------------
SequenceRunnerManager& SequenceRunnerManager::GetInstance()
{
    static SequenceRunnerManager sequenceRunnerManager;
    return sequenceRunnerManager;
}

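// Returns the process-wide SequenceRunner registered under the given name, creating it on first
// use. The priority is only honored on creation; requesting an existing runner with a different
// priority throws a type error. Each calling env gets a weak napi reference to the runner object
// so it can be released per-environment later.
// A minimal calling sketch (argument values are illustrative only):
//     SequenceRunner* runner = SequenceRunnerManager::GetInstance().CreateOrGetGlobalRunner(
//         env, thisVar, 2, "logRunner", static_cast<uint32_t>(Priority::LOW)); // 2: name + priority passed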
SequenceRunner* SequenceRunnerManager::CreateOrGetGlobalRunner(napi_env env, napi_value thisVar, size_t argc,
                                                               const std::string &name, uint32_t priority)
{
    std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
    SequenceRunner *seqRunner = nullptr;
    auto iter = globalSeqRunner_.find(name);
    if (iter == globalSeqRunner_.end()) {
        seqRunner = new SequenceRunner();
        // override the default priority on first creation when one was passed
        if (argc == 2) { // 2: The number of parameters is 2.
            seqRunner->priority_ = static_cast<Priority>(priority);
        }
        seqRunner->isGlobalRunner_ = true;
        seqRunner->seqName_ = name;
        globalSeqRunner_.emplace(name, seqRunner);
    } else {
        seqRunner = iter->second;
        if (priority != seqRunner->priority_) {
            ErrorHelper::ThrowError(env, ErrorHelper::TYPE_ERROR, "seqRunner:: priority cannot be changed.");
            return nullptr;
        }
    }
    seqRunner->count_++;
    auto tmpIter = seqRunner->globalSeqRunnerRef_.find(env);
    if (tmpIter == seqRunner->globalSeqRunnerRef_.end()) {
        napi_ref globalSeqRunnerRef = nullptr;
        napi_create_reference(env, thisVar, 0, &globalSeqRunnerRef);
        seqRunner->globalSeqRunnerRef_.emplace(env, globalSeqRunnerRef);
    }

    return seqRunner;
}

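// Releases one reference on the runner object after a task completes: for a global runner the
// per-env reference recorded in CreateOrGetGlobalRunner, otherwise the runner's own reference.
// Returns false when no reference is registered for the given env.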
bool SequenceRunnerManager::TriggerGlobalSeqRunner(napi_env env, SequenceRunner* seqRunner)
{
    if (seqRunner->isGlobalRunner_) {
        std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
        auto iter = seqRunner->globalSeqRunnerRef_.find(env);
        if (iter == seqRunner->globalSeqRunnerRef_.end()) {
            return false;
        }
        napi_reference_unref(env, iter->second, nullptr);
    } else {
        napi_reference_unref(env, seqRunner->seqRunnerRef_, nullptr);
    }
    return true;
}

uint64_t SequenceRunnerManager::DecreaseSeqCount(SequenceRunner* seqRunner)
{
    std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
    return --(seqRunner->count_);
}

void SequenceRunnerManager::RemoveGlobalSeqRunnerRef(napi_env env, SequenceRunner* seqRunner)
{
    std::lock_guard<std::mutex> lock(globalSeqRunnerMutex_);
    auto iter = seqRunner->globalSeqRunnerRef_.find(env);
    if (iter != seqRunner->globalSeqRunnerRef_.end()) {
        napi_delete_reference(env, iter->second);
        seqRunner->globalSeqRunnerRef_.erase(iter);
    }
}

void SequenceRunnerManager::RemoveSequenceRunner(const std::string &name)
{
    std::unique_lock<std::mutex> lock(globalSeqRunnerMutex_);
    auto iter = globalSeqRunner_.find(name);
    if (iter != globalSeqRunner_.end()) {
        globalSeqRunner_.erase(iter);
    }
}

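// Destructor hook for a global sequence runner bound to one env: drops that env's reference and,
// once the shared usage count reaches zero, removes the runner from both the name- and id-indexed
// registries and deletes it.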
void SequenceRunnerManager::GlobalSequenceRunnerDestructor(napi_env env, SequenceRunner *seqRunner)
{
    RemoveGlobalSeqRunnerRef(env, seqRunner);
    if (DecreaseSeqCount(seqRunner) == 0) {
        RemoveSequenceRunner(seqRunner->seqName_);
        TaskGroupManager::GetInstance().RemoveSequenceRunner(seqRunner->seqRunnerId_);
        delete seqRunner;
    }
}
} // namespace Commonlibrary::Concurrent::TaskPoolModule