1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "task_manager.h"
17
18 #include <securec.h>
19 #include <thread>
20
21 #include "async_runner_manager.h"
22 #if defined(ENABLE_TASKPOOL_FFRT)
23 #include "bundle_info.h"
24 #include "bundle_mgr_interface.h"
25 #include "bundle_mgr_proxy.h"
26 #include "iservice_registry.h"
27 #include "parameters.h"
28 #include "status_receiver_interface.h"
29 #include "system_ability_definition.h"
30 #include "c/executor_task.h"
31 #include "ffrt_inner.h"
32 #endif
33 #include "sys_timer.h"
34 #include "helper/hitrace_helper.h"
35 #include "taskpool.h"
36 #include "task_group_manager.h"
37
38 namespace Commonlibrary::Concurrent::TaskPoolModule {
39 using namespace OHOS::JsSysModule;
40 template void TaskManager::TryExpandWithCheckIdle<true>();
41 template void TaskManager::TryExpandWithCheckIdle<false>();
42
43 static constexpr int8_t HIGH_PRIORITY_TASK_COUNT = 5;
44 static constexpr int8_t MEDIUM_PRIORITY_TASK_COUNT = 5;
45 static constexpr int32_t MAX_TASK_DURATION = 100; // 100: 100ms
46 static constexpr uint32_t STEP_SIZE = 2;
47 static constexpr uint32_t DEFAULT_THREADS = 3;
48 static constexpr uint32_t DEFAULT_MIN_THREADS = 1; // 1: minimum thread num when idle
49 static constexpr uint32_t MIN_TIMEOUT_TIME = 180000; // 180000: 3min
50 static constexpr uint32_t MAX_TIMEOUT_TIME = 600000; // 600000: 10min
51 static constexpr int32_t MAX_IDLE_TIME = 30000; // 30000: 30s
52 static constexpr uint32_t TRIGGER_INTERVAL = 30000; // 30000: 30s
53 static constexpr uint32_t SHRINK_STEP = 4; // 4: try to release 4 threads every time
54 static constexpr uint32_t MAX_UINT32_T = 0xFFFFFFFF; // 0xFFFFFFFF: max uint32_t
55 static constexpr uint32_t TRIGGER_EXPAND_INTERVAL = 10; // 10: ms, trigger recheck expansion interval
56 [[maybe_unused]] static constexpr uint32_t IDLE_THRESHOLD = 2; // 2: 2 intervals later will release the thread
57 static constexpr char ON_CALLBACK_STR[] = "TaskPoolOnCallbackTask";
58 static constexpr uint32_t UNEXECUTE_TASK_TIME = 60000; // 60000: 1min
59
60 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
61 static const std::map<Priority, OHOS::AppExecFwk::EventQueue::Priority> TASK_EVENTHANDLER_PRIORITY_MAP = {
62 {Priority::IDLE, OHOS::AppExecFwk::EventQueue::Priority::IDLE},
63 {Priority::LOW, OHOS::AppExecFwk::EventQueue::Priority::LOW},
64 {Priority::MEDIUM, OHOS::AppExecFwk::EventQueue::Priority::HIGH},
65 {Priority::HIGH, OHOS::AppExecFwk::EventQueue::Priority::IMMEDIATE},
66 };
67 #endif
68
69 // ----------------------------------- TaskManager ----------------------------------------
70 TaskManager& TaskManager::GetInstance()
71 {
72 static TaskManager manager;
73 return manager;
74 }
75
76 TaskManager::TaskManager()
77 {
78 for (size_t i = 0; i < taskQueues_.size(); i++) {
79 std::unique_ptr<ExecuteQueue> taskQueue = std::make_unique<ExecuteQueue>();
80 taskQueues_[i] = std::move(taskQueue);
81 }
82 }
83
84 TaskManager::~TaskManager()
85 {
86 HILOG_INFO("taskpool:: ~TaskManager");
87 if (balanceTimer_ == nullptr) {
88 HILOG_ERROR("taskpool:: balanceTimer_ is nullptr");
89 } else {
90 uv_timer_stop(balanceTimer_);
91 ConcurrentHelper::UvHandleClose(balanceTimer_);
92 }
93
94 if (expandTimer_ == nullptr) {
95 HILOG_ERROR("taskpool:: expandTimer_ is nullptr");
96 } else {
97 uv_timer_stop(expandTimer_);
98 ConcurrentHelper::UvHandleClose(expandTimer_);
99 }
100
101 ConcurrentHelper::UvHandleClose(dispatchHandle_);
102
103 if (loop_ != nullptr) {
104 uv_stop(loop_);
105 }
106
107 {
108 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
109 for (auto& worker : workers_) {
110 delete worker;
111 }
112 workers_.clear();
113 }
114
115 {
116 std::lock_guard<std::mutex> lock(callbackMutex_);
117 for (auto& [_, callbackPtr] : callbackTable_) {
118 if (callbackPtr == nullptr) {
119 continue;
120 }
121 callbackPtr.reset();
122 }
123 callbackTable_.clear();
124 }
125
126 {
127 std::lock_guard<std::recursive_mutex> lock(tasksMutex_);
128 for (auto& [_, task] : tasks_) {
129 delete task;
130 task = nullptr;
131 }
132 tasks_.clear();
133 }
134 CountTraceForWorker();
135 }
136
137 void TaskManager::CountTraceForWorker(bool needLog)
138 {
139 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
140 CountTraceForWorkerWithoutLock(needLog);
141 }
142
143 inline void TaskManager::CountTraceForWorkerWithoutLock(bool needLog)
144 {
145 int64_t threadNum = static_cast<int64_t>(workers_.size());
146 int64_t idleWorkers = static_cast<int64_t>(idleWorkers_.size());
147 int64_t timeoutWorkers = static_cast<int64_t>(timeoutWorkers_.size());
148 HITRACE_HELPER_COUNT_TRACE("timeoutThreadNum", timeoutWorkers);
149 HITRACE_HELPER_COUNT_TRACE("threadNum", threadNum);
150 HITRACE_HELPER_COUNT_TRACE("runningThreadNum", threadNum - idleWorkers);
151 HITRACE_HELPER_COUNT_TRACE("idleThreadNum", idleWorkers);
152 AddCountTraceForWorkerLog(needLog, threadNum, idleWorkers, timeoutWorkers);
153 }
154
155 napi_value TaskManager::GetThreadInfos(napi_env env)
156 {
157 napi_value threadInfos = nullptr;
158 napi_create_array(env, &threadInfos);
159 std::unordered_set<std::unique_ptr<ThreadInfo>> threadInfoSet {};
160 {
161 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
162 for (auto& worker : workers_) {
163 if (worker->workerEnv_ == nullptr) {
164 continue;
165 }
166 auto threadInfo = std::make_unique<ThreadInfo>();
167 threadInfo->tid = worker->tid_;
168 threadInfo->priority = worker->priority_;
169 for (auto& id : worker->currentTaskId_) {
170 threadInfo->currentTaskId.emplace_back(id);
171 }
172 threadInfoSet.emplace(std::move(threadInfo));
173 }
174 }
175 int32_t i = 0;
176 for (auto& info : threadInfoSet) {
177 napi_value tid = NapiHelper::CreateUint32(env, static_cast<uint32_t>(info->tid));
178 napi_value priority = NapiHelper::CreateUint32(env, static_cast<uint32_t>(info->priority));
179
180 napi_value taskId = nullptr;
181 napi_create_array(env, &taskId);
182 int32_t j = 0;
183 for (auto& currentId : info->currentTaskId) {
184 napi_value id = NapiHelper::CreateUint32(env, currentId);
185 napi_set_element(env, taskId, j, id);
186 j++;
187 }
188 napi_value threadInfo = nullptr;
189 napi_create_object(env, &threadInfo);
190 napi_set_named_property(env, threadInfo, "tid", tid);
191 napi_set_named_property(env, threadInfo, "priority", priority);
192 napi_set_named_property(env, threadInfo, "taskIds", taskId);
193 napi_set_element(env, threadInfos, i, threadInfo);
194 i++;
195 }
196 return threadInfos;
197 }
198
199 napi_value TaskManager::GetTaskInfos(napi_env env)
200 {
201 napi_value taskInfos = nullptr;
202 napi_create_array(env, &taskInfos);
203 std::unordered_set<std::unique_ptr<TaskCurrentInfo>> taskCurrentInfoSet {};
204 {
205 std::lock_guard<std::recursive_mutex> lock(tasksMutex_);
206 for (const auto& [_, task] : tasks_) {
207 if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::DELAYED ||
208 task->taskState_ == ExecuteState::FINISHED) {
209 continue;
210 }
211 auto taskCurrentInfo = std::make_unique<TaskCurrentInfo>();
212 taskCurrentInfo->taskId = task->taskId_;
213 taskCurrentInfo->name = task->name_;
214 taskCurrentInfo->taskState = task->taskState_;
215 taskCurrentInfo->startTime = task->startTime_;
216 taskCurrentInfoSet.emplace(std::move(taskCurrentInfo));
217 }
218 }
219 int32_t index = 0;
220 for (auto& info : taskCurrentInfoSet) {
221 napi_value taskInfoValue = NapiHelper::CreateObject(env);
222 napi_value taskId = NapiHelper::CreateUint32(env, info->taskId);
223 napi_value name = nullptr;
224 napi_create_string_utf8(env, info->name.c_str(), info->name.size(), &name);
225 napi_set_named_property(env, taskInfoValue, "name", name);
226 ExecuteState state = info->taskState;
227 uint64_t duration = 0;
228 if (state == ExecuteState::RUNNING || state == ExecuteState::ENDING) {
229 duration = ConcurrentHelper::GetMilliseconds() - info->startTime;
230 }
231 napi_value stateValue = NapiHelper::CreateUint32(env, static_cast<uint32_t>(state));
232 napi_set_named_property(env, taskInfoValue, "taskId", taskId);
233 napi_set_named_property(env, taskInfoValue, "state", stateValue);
234 napi_value durationValue = NapiHelper::CreateUint32(env, duration);
235 napi_set_named_property(env, taskInfoValue, "duration", durationValue);
236 napi_set_element(env, taskInfos, index, taskInfoValue);
237 index++;
238 }
239 return taskInfos;
240 }
241
242 void TaskManager::UpdateExecutedInfo(uint64_t duration)
243 {
244 totalExecTime_ += duration;
245 totalExecCount_++;
246 }
247
248 uint32_t TaskManager::ComputeSuitableThreadNum()
249 {
250 uint32_t targetNum = ComputeSuitableIdleNum() + GetRunningWorkers();
251 return targetNum;
252 }
253
254 uint32_t TaskManager::ComputeSuitableIdleNum()
255 {
256 uint32_t targetNum = 0;
257 if (GetNonIdleTaskNum() != 0 && totalExecCount_ == 0) {
258 // this branch is used for avoiding time-consuming tasks that may block the taskpool
259 targetNum = std::min(STEP_SIZE, GetNonIdleTaskNum());
260 } else if (totalExecCount_ != 0) {
261 auto durationPerTask = static_cast<double>(totalExecTime_) / totalExecCount_;
262 uint32_t result = std::ceil(durationPerTask * GetNonIdleTaskNum() / MAX_TASK_DURATION);
263 targetNum = std::min(result, GetNonIdleTaskNum());
264 }
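// ensure the result is at least 1 (and odd), so one idle worker is always reserved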
265 return targetNum | 1;
266 }
267
268 void TaskManager::CheckForBlockedWorkers()
269 {
270 // the threshold will be dynamically modified to provide more flexibility in detecting exceptions
271 // if the thread num has reached the limit and the idle worker is not available, a short time will be used,
272 // else we will choose the longer one
273 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
274 bool needChecking = false;
275 bool state = (GetThreadNum() == ConcurrentHelper::GetMaxThreads()) && (GetIdleWorkers() == 0);
276 uint64_t threshold = state ? MIN_TIMEOUT_TIME : MAX_TIMEOUT_TIME;
277 for (auto iter = workers_.begin(); iter != workers_.end(); iter++) {
278 auto worker = *iter;
279 // if the worker thread is idle, just skip it, and only the worker in running state can be marked as timeout
280 // if the worker is executing the longTask, we will not do the check
281 if ((worker->state_ == WorkerState::IDLE) || (worker->IsExecutingLongTask()) ||
282 (ConcurrentHelper::GetMilliseconds() - worker->startTime_ < threshold) ||
283 !worker->UpdateWorkerState(WorkerState::RUNNING, WorkerState::BLOCKED)) {
284 continue;
285 }
286 // When executing the promise task, the worker state may not be updated and will be
287 // marked as 'BLOCKED', so we should exclude this situation.
288 // Besides, if the worker is not executing sync tasks or micro tasks, it may handle
289 // the task like I/O in uv threads, we should also exclude this situation.
290 auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
291 if (worker->idleState_ && !workerEngine->IsExecutingPendingJob()) {
292 if (!workerEngine->HasWaitingRequest()) {
293 worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::IDLE);
294 } else {
295 worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::RUNNING);
296 worker->startTime_ = ConcurrentHelper::GetMilliseconds();
297 }
298 continue;
299 }
300
301 HILOG_INFO("taskpool:: The worker has been marked as timeout.");
302 // If the current worker has a longTask and is not executing, we will only interrupt it.
303 if (worker->HasLongTask()) {
304 continue;
305 }
306 needChecking = true;
307 idleWorkers_.erase(worker);
308 timeoutWorkers_.insert(worker);
309 }
310 // should trigger the check when we have marked and removed workers
311 if (UNLIKELY(needChecking)) {
312 TryExpand();
313 }
314 }
315
316 void TaskManager::TryTriggerExpand()
317 {
318 // post the signal to notify the monitor thread to expand
319 if (UNLIKELY(!isHandleInited_)) {
320 NotifyExecuteTask();
321 needChecking_ = true;
322 HILOG_WARN("taskpool:: the dispatchHandle_ is nullptr");
323 return;
324 }
325 uv_async_send(dispatchHandle_);
326 }
327
328 #if defined(OHOS_PLATFORM)
329 // read /proc/[pid]/task/[tid]/stat, used to determine how many worker threads are idle.
330 bool TaskManager::ReadThreadInfo(pid_t tid, char* buf, uint32_t size)
331 {
332 char path[128]; // 128: buffer for path
333 pid_t pid = getpid();
334 ssize_t bytesLen = -1;
335 int ret = snprintf_s(path, sizeof(path), sizeof(path) - 1, "/proc/%d/task/%d/stat", pid, tid);
336 if (ret < 0) {
337 HILOG_ERROR("snprintf_s failed");
338 return false;
339 }
340 int fd = open(path, O_RDONLY | O_NONBLOCK);
341 if (UNLIKELY(fd == -1)) {
342 return false;
343 }
344 bytesLen = read(fd, buf, size - 1);
345 close(fd);
346 if (bytesLen <= 0) {
347 HILOG_ERROR("taskpool:: failed to read %{public}s", path);
348 return false;
349 }
350 buf[bytesLen] = '\0';
351 return true;
352 }
353
354 uint32_t TaskManager::GetIdleWorkers()
355 {
356 char buf[4096]; // 4096: buffer for thread info
357 uint32_t idleCount = 0;
358 std::unordered_set<pid_t> tids {};
359 {
360 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
361 for (auto& worker : idleWorkers_) {
362 #if defined(ENABLE_TASKPOOL_FFRT)
363 if (worker->ffrtTaskHandle_ != nullptr) {
364 if (worker->GetWaitTime() > 0) {
365 idleCount++;
366 }
367 continue;
368 }
369 #endif
370 tids.emplace(worker->tid_);
371 }
372 }
373 // The ffrt thread does not read thread info
374 for (auto tid : tids) {
375 if (!ReadThreadInfo(tid, buf, sizeof(buf))) {
376 continue;
377 }
378 char state = '\0';
379 if (sscanf_s(buf, "%*d %*s %c", &state, sizeof(state)) != 1) { // 1: state
380 HILOG_ERROR("taskpool:: sscanf_s of state failed for tid %{public}d", tid);
381 return 0;
382 }
383 if (state == 'S') {
384 idleCount++;
385 }
386 }
387 return idleCount;
388 }
389
390 void TaskManager::GetIdleWorkersList(uint32_t step)
391 {
392 char buf[4096]; // 4096: buffer for thread info
393 for (auto& worker : idleWorkers_) {
394 #if defined(ENABLE_TASKPOOL_FFRT)
395 if (worker->ffrtTaskHandle_ != nullptr) {
396 uint64_t workerWaitTime = worker->GetWaitTime();
397 bool isWorkerLoopActive = worker->IsLoopActive();
398 if (workerWaitTime == 0) {
399 continue;
400 }
401 uint64_t currTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::seconds>(
402 std::chrono::steady_clock::now().time_since_epoch()).count());
403 if (!isWorkerLoopActive) {
404 freeList_.emplace_back(worker);
405 } else {
406 // the worker's uv loop is still active; it will be released after 2 idle intervals if it is not woken
407 WorkerAliveAndReport(worker);
408 }
409 continue;
410 }
411 #endif
412 if (!ReadThreadInfo(worker->tid_, buf, sizeof(buf))) {
413 continue;
414 }
415 char state;
416 uint64_t utime;
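// parse field 3 (state) and field 14 (utime) of /proc/[pid]/task/[tid]/stat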
417 if (sscanf_s(buf, "%*d %*s %c %*d %*d %*d %*d %*d %*u %*lu %*lu %*lu %*lu %llu",
418 &state, sizeof(state), &utime) != 2) { // 2: state and utime
419 HILOG_ERROR("taskpool: sscanf_s of state failed for %{public}d", worker->tid_);
420 return;
421 }
422 if (state != 'S' || utime != worker->lastCpuTime_) {
423 worker->idleCount_ = 0;
424 worker->lastCpuTime_ = utime;
425 continue;
426 }
427 if (++worker->idleCount_ >= IDLE_THRESHOLD) {
428 freeList_.emplace_back(worker);
429 }
430 }
431 }
432
433 void TaskManager::TriggerShrink(uint32_t step)
434 {
435 GetIdleWorkersList(step);
436 step = std::min(step, static_cast<uint32_t>(freeList_.size()));
437 uint32_t count = 0;
438 for (size_t i = 0; i < freeList_.size(); i++) {
439 auto worker = freeList_[i];
440 if (worker->state_ != WorkerState::IDLE || worker->HasLongTask()) {
441 continue;
442 }
443 auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
444 if (idleTime < MAX_IDLE_TIME || worker->HasRunningTasks()) {
445 continue;
446 }
447 idleWorkers_.erase(worker);
448 HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
449 worker->PostReleaseSignal();
450 if (++count == step) {
451 break;
452 }
453 }
454 freeList_.clear();
455 }
456 #else
457 uint32_t TaskManager::GetIdleWorkers()
458 {
459 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
460 return idleWorkers_.size();
461 }
462
463 void TaskManager::TriggerShrink(uint32_t step)
464 {
465 for (uint32_t i = 0; i < step; i++) {
466 // try to free the worker that idle time meets the requirement
467 auto iter = std::find_if(idleWorkers_.begin(), idleWorkers_.end(), [](Worker* worker) {
468 auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
469 return idleTime > MAX_IDLE_TIME && !worker->HasRunningTasks() && !worker->HasLongTask();
470 });
471 // remove it from all sets
472 if (iter != idleWorkers_.end()) {
473 auto worker = *iter;
474 idleWorkers_.erase(worker);
475 HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
476 worker->PostReleaseSignal();
477 }
478 }
479 }
480 #endif
481
482 void TaskManager::NotifyShrink(uint32_t targetNum)
483 {
484 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
485 uint32_t workerCount = workers_.size();
486 uint32_t minThread = ConcurrentHelper::IsLowMemory() ? 0 : DEFAULT_MIN_THREADS;
487 // update the maxThreads_ periodically
488 maxThreads_ = ConcurrentHelper::GetMaxThreads();
489 if (minThread == 0) {
490 HILOG_INFO("taskpool:: low memory");
491 }
492 if (workerCount > minThread && workerCount > targetNum) {
493 targetNum = std::max(minThread, targetNum);
494 uint32_t step = std::min(workerCount - targetNum, SHRINK_STEP);
495 TriggerShrink(step);
496 }
497 // clean up timeout workers: erase stale entries, and release at most one idle timeout worker per check
498 for (auto iter = timeoutWorkers_.begin(); iter != timeoutWorkers_.end();) {
499 auto worker = *iter;
500 if (workers_.find(worker) == workers_.end()) {
501 HILOG_WARN("taskpool:: current worker maybe release");
502 iter = timeoutWorkers_.erase(iter);
503 } else if (!worker->HasRunningTasks()) {
504 HILOG_DEBUG("taskpool:: try to release timeout thread: %{public}d", worker->tid_);
505 worker->PostReleaseSignal();
506 timeoutWorkers_.erase(iter++);
507 return;
508 } else {
509 iter++;
510 }
511 }
512 uint32_t idleNum = idleWorkers_.size();
513 // System memory state is moderate and the worker has executed tasks, we will try to release it
514 if (ConcurrentHelper::IsModerateMemory() && workerCount == idleNum && workerCount == DEFAULT_MIN_THREADS) {
515 auto worker = *(idleWorkers_.begin());
516 // worker that has longTask should not be released
517 if (worker == nullptr || worker->HasLongTask()) {
518 return;
519 }
520 if (worker->hasExecuted_) { // worker that hasn't execute any tasks should not be released
521 TriggerShrink(DEFAULT_MIN_THREADS);
522 return;
523 }
524 }
525
526 // Create a worker for performance
527 if (!ConcurrentHelper::IsLowMemory() && workers_.empty()) {
528 CreateWorkers(hostEnv_);
529 }
530 // stop the timer
531 if ((workerCount == idleNum && workerCount <= minThread) && timeoutWorkers_.empty()) {
532 suspend_ = true;
533 uv_timer_stop(balanceTimer_);
534 HILOG_DEBUG("taskpool:: timer will be suspended");
535 }
536 }
537
538 void TaskManager::TriggerLoadBalance([[maybe_unused]] const uv_timer_t* req)
539 {
540 TaskManager& taskManager = TaskManager::GetInstance();
541 taskManager.CheckForBlockedWorkers();
542 uint32_t targetNum = taskManager.ComputeSuitableThreadNum();
543 taskManager.NotifyShrink(targetNum);
544 taskManager.CountTraceForWorker(true);
545 }
546
547 void TaskManager::DispatchAndTryExpand([[maybe_unused]] const uv_async_t* req)
548 {
549 TaskManager& taskManager = TaskManager::GetInstance();
550 taskManager.DispatchAndTryExpandInner();
551 }
552
553 void TaskManager::DispatchAndTryExpandInner()
554 {
555 // dispatch task in the TaskPoolManager thread
556 NotifyExecuteTask();
557 // do not check the worker idleTime first
558 TryExpandWithCheckIdle<false>();
559 if (!timerTriggered_ && GetNonIdleTaskNum() != 0) {
560 timerTriggered_ = true;
561 // use timer to check worker idle time after dispatch task
562 // if worker has been blocked or fd is broken, taskpool can expand automatically
563 uv_timer_start(expandTimer_, reinterpret_cast<uv_timer_cb>(TryExpand), TRIGGER_EXPAND_INTERVAL, 0);
564 }
565 }
566
567 void TaskManager::TryExpand([[maybe_unused]] const uv_timer_t* req)
568 {
569 TaskManager& taskManager = TaskManager::GetInstance();
570 taskManager.TryExpandWithCheckIdle<true>();
571 }
572
573 template <bool needCheckIdle>
574 void TaskManager::TryExpandWithCheckIdle()
575 {
576 if (GetNonIdleTaskNum() == 0) {
577 HILOG_INFO("taskpool:: no need to create worker");
578 return;
579 }
580
581 needChecking_ = false;
582 timerTriggered_ = false;
583 uint32_t targetNum = ComputeSuitableIdleNum();
584 uint32_t workerCount = 0;
585 uint32_t idleCount = 0;
586 uint32_t timeoutWorkers = 0;
587 {
588 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
589 workerCount = workers_.size();
590 timeoutWorkers = timeoutWorkers_.size();
591 if constexpr (needCheckIdle) {
592 uint64_t currTime = ConcurrentHelper::GetMilliseconds();
593 idleCount = static_cast<uint32_t>(std::count_if(idleWorkers_.begin(), idleWorkers_.end(),
594 [currTime](const auto& worker) {
595 return worker->IsRunnable(currTime);
596 }));
597 } else {
598 idleCount = idleWorkers_.size();
599 }
600 }
601 uint32_t maxThreads = std::max(maxThreads_, DEFAULT_THREADS);
602 maxThreads = (timeoutWorkers == 0) ? maxThreads : maxThreads + 2; // 2: extra threads
603 if (workerCount < maxThreads && idleCount < targetNum) {
604 // Prevent the total number of workers from exceeding maxThreads
605 uint32_t step = std::min(std::min(maxThreads, targetNum) - idleCount, maxThreads - workerCount);
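// exclude workers whose creation is already in flight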
606 step = step >= expandingCount_ ? step - expandingCount_ : 0;
607 if (step == 0) {
608 return;
609 }
610 CreateWorkers(hostEnv_, step);
611 HILOG_INFO("taskpool:: maxThreads: %{public}u, created num: %{public}u, total num: %{public}u",
612 maxThreads, step, GetThreadNum());
613 }
614 if (UNLIKELY(suspend_)) {
615 suspend_ = false;
616 uv_timer_again(balanceTimer_);
617 }
618 }
619
620 void TaskManager::RunTaskManager()
621 {
622 loop_ = uv_loop_new();
623 if (loop_ == nullptr) { // LCOV_EXCL_BR_LINE
624 HILOG_FATAL("taskpool:: new loop failed.");
625 return;
626 }
627 ConcurrentHelper::UvHandleInit(loop_, dispatchHandle_, TaskManager::DispatchAndTryExpand);
628 balanceTimer_ = new uv_timer_t;
629 uv_timer_init(loop_, balanceTimer_);
630 uv_timer_start(balanceTimer_, reinterpret_cast<uv_timer_cb>(TaskManager::TriggerLoadBalance), 0, TRIGGER_INTERVAL);
631
632 expandTimer_ = new uv_timer_t;
633 uv_timer_init(loop_, expandTimer_);
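// the expand timer is started on demand by DispatchAndTryExpandInner when tasks are pending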
634
635 isHandleInited_ = true;
636 #if defined IOS_PLATFORM || defined MAC_PLATFORM
637 pthread_setname_np("OS_TaskManager");
638 #else
639 pthread_setname_np(pthread_self(), "OS_TaskManager");
640 #endif
641 if (UNLIKELY(needChecking_)) {
642 needChecking_ = false;
643 uv_async_send(dispatchHandle_);
644 }
645 uv_run(loop_, UV_RUN_DEFAULT);
646 if (loop_ != nullptr) {
647 uv_loop_delete(loop_);
648 }
649 }
650
651 void TaskManager::CancelTask(napi_env env, uint32_t taskId)
652 {
653 // 1. Cannot find taskInfo by taskId, throw error
654 // 2. Find executing taskInfo, skip it
655 // 3. Find waiting taskInfo, cancel it
656 // 4. Find canceled taskInfo, skip it
657 std::string strTrace = "CancelTask: taskId: " + std::to_string(taskId);
658 HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
659 HITRACE_HELPER_METER_NAME(strTrace);
660 Task* task = GetTask(taskId);
661 if (task == nullptr) {
662 std::string errMsg = "taskpool:: the task may not exist";
663 HILOG_ERROR("%{public}s", errMsg.c_str());
664 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
665 return;
666 }
667 if (task->taskState_ == ExecuteState::CANCELED) {
668 HILOG_DEBUG("taskpool:: task has been canceled");
669 return;
670 }
671 if (task->IsGroupCommonTask()) {
672 // when task is a group common task, still check the state
673 if (task->currentTaskInfo_ == nullptr || task->taskState_ == ExecuteState::NOT_FOUND ||
674 task->taskState_ == ExecuteState::FINISHED || task->taskState_ == ExecuteState::ENDING) {
675 std::string errMsg = "taskpool:: task is not executed or has been executed";
676 HILOG_ERROR("%{public}s", errMsg.c_str());
677 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
678 return;
679 }
680 TaskGroup* taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(task->groupId_);
681 if (taskGroup == nullptr) {
682 return;
683 }
684 return taskGroup->CancelGroupTask(env, task->taskId_);
685 }
686 if (task->IsPeriodicTask()) {
687 task->UpdateTaskStateToCanceled();
688 return;
689 }
690 if (task->IsSeqRunnerTask()) {
691 CancelSeqRunnerTask(env, task);
692 return;
693 }
694 if (task->IsAsyncRunnerTask()) {
695 AsyncRunnerManager::GetInstance().CancelAsyncRunnerTask(env, task);
696 return;
697 }
698 ExecuteState state = ExecuteState::NOT_FOUND;
699 {
700 std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
701 if (task->taskState_ == ExecuteState::CANCELED) {
702 HILOG_DEBUG("taskpool:: task has been canceled");
703 return;
704 }
705 if ((task->currentTaskInfo_ == nullptr && task->taskState_ != ExecuteState::DELAYED) ||
706 task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED ||
707 task->taskState_ == ExecuteState::ENDING) {
708 std::string errMsg = "taskpool:: task is not executed or has been executed";
709 HILOG_ERROR("%{public}s", errMsg.c_str());
710 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
711 return;
712 }
713 state = task->taskState_.exchange(ExecuteState::CANCELED);
714 }
715 if (task->IsSameEnv(env)) {
716 task->CancelInner(state);
717 return;
718 }
719 CancelTaskMessage* message = new CancelTaskMessage(state, task->taskId_);
720 task->TriggerCancel(message);
721 }
722
723 void TaskManager::CancelSeqRunnerTask(napi_env env, Task* task)
724 {
725 {
726 std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
727 if (task->taskState_ != ExecuteState::FINISHED) {
728 task->taskState_ = ExecuteState::CANCELED;
729 return;
730 }
731 }
732 std::string errMsg = "taskpool:: sequenceRunner task has been executed";
733 HILOG_ERROR("%{public}s", errMsg.c_str());
734 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
735 }
736
737 void TaskManager::NotifyWorkerIdle(Worker* worker)
738 {
739 {
740 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
741 if (worker->state_ == WorkerState::BLOCKED) {
742 return;
743 }
744 idleWorkers_.insert(worker);
745 }
746 if (GetTaskNum() != 0) {
747 NotifyExecuteTask();
748 }
749 CountTraceForWorker();
750 }
751
752 void TaskManager::NotifyWorkerCreated(Worker* worker)
753 {
754 NotifyWorkerIdle(worker);
755 expandingCount_--;
756 }
757
758 void TaskManager::NotifyWorkerAdded(Worker* worker)
759 {
760 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
761 workers_.insert(worker);
762 HILOG_DEBUG("taskpool:: a new worker has been added and the current num is %{public}zu", workers_.size());
763 }
764
765 void TaskManager::NotifyWorkerRunning(Worker* worker)
766 {
767 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
768 idleWorkers_.erase(worker);
769 CountTraceForWorkerWithoutLock();
770 }
771
772 uint32_t TaskManager::GetRunningWorkers()
773 {
774 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
775 return std::count_if(workers_.begin(), workers_.end(), [](const auto& worker) {
776 return worker->HasRunningTasks();
777 });
778 }
779
780 uint32_t TaskManager::GetTimeoutWorkers()
781 {
782 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
783 return timeoutWorkers_.size();
784 }
785
786 uint32_t TaskManager::GetTaskNum()
787 {
788 std::lock_guard<std::mutex> lock(taskQueuesMutex_);
789 uint32_t sum = 0;
790 for (const auto& elements : taskQueues_) {
791 sum += elements->GetTaskNum();
792 }
793 return sum;
794 }
795
796 uint32_t TaskManager::GetNonIdleTaskNum()
797 {
798 return nonIdleTaskNum_;
799 }
800
801 void TaskManager::IncreaseTaskNum(Priority priority)
802 {
803 totalTaskNum_.fetch_add(1);
804 if (priority != Priority::IDLE) {
805 nonIdleTaskNum_.fetch_add(1);
806 }
807 }
808
809 void TaskManager::DecreaseTaskNum(Priority priority)
810 {
811 totalTaskNum_.fetch_sub(1);
812 if (priority != Priority::IDLE) {
813 nonIdleTaskNum_.fetch_sub(1);
814 }
815 }
816
817 uint32_t TaskManager::GetThreadNum()
818 {
819 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
820 return workers_.size();
821 }
822
823 void TaskManager::EnqueueTaskId(uint32_t taskId, Priority priority)
824 {
825 {
826 std::lock_guard<std::mutex> lock(taskQueuesMutex_);
827 IncreaseTaskNum(priority);
828 taskQueues_[priority]->EnqueueTaskId(taskId);
829 }
830 TryTriggerExpand();
831 Task* task = GetTask(taskId);
832 if (task == nullptr) {
833 HILOG_FATAL("taskpool:: task is nullptr");
834 return;
835 }
836 task->IncreaseTaskLifecycleCount();
837 ListenerCallBackInfo* info = nullptr;
838 {
839 std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
840 info = task->onEnqueuedCallBackInfo_;
841 }
842 if (info != nullptr) {
843 task->ExecuteListenerCallback(info, taskId);
844 } else { // LCOV_EXCL_BR_LINE
845 HILOG_WARN("taskpool:: onEnqueuedCallBackInfo is null");
846 }
847 }
848
849 bool TaskManager::EraseWaitingTaskId(uint32_t taskId, Priority priority)
850 {
851 std::lock_guard<std::mutex> lock(taskQueuesMutex_);
852 if (!taskQueues_[priority]->EraseWaitingTaskId(taskId)) {
853 HILOG_WARN("taskpool:: taskId is not in executeQueue when cancel");
854 return false;
855 }
856 DecreaseTaskNum(priority);
857 return true;
858 }
859
860 std::pair<uint32_t, Priority> TaskManager::DequeueTaskId()
861 {
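// scheduling policy: serve up to HIGH_PRIORITY_TASK_COUNT high-priority tasks in a row, then up to
// MEDIUM_PRIORITY_TASK_COUNT medium tasks, then low; IDLE tasks are picked only when all workers are idle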
862 bool isChoose = IsChooseIdle();
863 {
864 std::lock_guard<std::mutex> lock(taskQueuesMutex_);
865 auto& highTaskQueue = taskQueues_[Priority::HIGH];
866 if (!highTaskQueue->IsEmpty() && highPrioExecuteCount_ < HIGH_PRIORITY_TASK_COUNT) {
867 highPrioExecuteCount_++;
868 return GetTaskByPriority(highTaskQueue, Priority::HIGH);
869 }
870 highPrioExecuteCount_ = 0;
871
872 auto& mediumTaskQueue = taskQueues_[Priority::MEDIUM];
873 if (!mediumTaskQueue->IsEmpty() && mediumPrioExecuteCount_ < MEDIUM_PRIORITY_TASK_COUNT) {
874 mediumPrioExecuteCount_++;
875 return GetTaskByPriority(mediumTaskQueue, Priority::MEDIUM);
876 }
877 mediumPrioExecuteCount_ = 0;
878
879 auto& lowTaskQueue = taskQueues_[Priority::LOW];
880 if (!lowTaskQueue->IsEmpty()) {
881 return GetTaskByPriority(lowTaskQueue, Priority::LOW);
882 }
883
884 auto& idleTaskQueue = taskQueues_[Priority::IDLE];
885 if (!IsPerformIdle() && highTaskQueue->IsEmpty() && mediumTaskQueue->IsEmpty() && !idleTaskQueue->IsEmpty() &&
886 isChoose) {
887 return GetTaskByPriority(idleTaskQueue, Priority::IDLE);
888 }
889 }
890 return std::make_pair(0, Priority::LOW);
891 }
892
893 bool TaskManager::IsChooseIdle()
894 {
895 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
896 if (workers_.size() != idleWorkers_.size()) {
897 return false;
898 }
899 return true;
900 }
901
902 std::pair<uint32_t, Priority> TaskManager::GetTaskByPriority(const std::unique_ptr<ExecuteQueue>& taskQueue,
903 Priority priority)
904 {
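// if the dequeued task still has unfinished dependencies, park it in the pending queue and return 0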
905 uint32_t taskId = taskQueue->DequeueTaskId();
906 DecreaseTaskNum(priority);
907 if (IsDependendByTaskId(taskId)) {
908 EnqueuePendingTaskInfo(taskId, priority);
909 return std::make_pair(0, priority);
910 }
911 preDequeneTime_ = ConcurrentHelper::GetMilliseconds();
912 if (priority == Priority::IDLE && taskId != 0) {
913 SetIsPerformIdle(true);
914 }
915 return std::make_pair(taskId, priority);
916 }
917
918 void TaskManager::NotifyExecuteTask()
919 {
920 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
921 if (GetNonIdleTaskNum() == 0 && workers_.size() != idleWorkers_.size()) {
922 // only idle-priority tasks remain and some workers are still busy, so there is no need to notify
923 HILOG_INFO("taskpool:: not notify execute task");
924 return;
925 }
926 if (idleWorkers_.size() == 0) {
927 HILOG_INFO("taskpool:: idleWorkers is 0");
928 return;
929 }
930
931 for (auto& worker : idleWorkers_) {
932 worker->NotifyExecuteTask();
933 }
934 }
935
936 void TaskManager::InitTaskManager(napi_env env)
937 {
938 HITRACE_HELPER_METER_NAME("InitTaskManager");
939 if (!isInitialized_.exchange(true, std::memory_order_relaxed)) {
940 #if defined(ENABLE_TASKPOOL_FFRT)
941 globalEnableFfrtFlag_ = OHOS::system::GetIntParameter<int>("persist.commonlibrary.taskpoolglobalenableffrt", 0);
942 if (!globalEnableFfrtFlag_) {
943 UpdateSystemAppFlag();
944 if (IsSystemApp()) {
945 disableFfrtFlag_ = OHOS::system::GetIntParameter<int>("persist.commonlibrary.taskpooldisableffrt", 0);
946 }
947 }
948 if (EnableFfrt()) {
949 HILOG_INFO("taskpool:: apps use ffrt");
950 } else {
951 HILOG_INFO("taskpool:: apps do not use ffrt");
952 }
953 #endif
954 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
955 mainThreadHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(
956 OHOS::AppExecFwk::EventRunner::GetMainEventRunner());
957 #endif
958 auto mainThreadEngine = NativeEngine::GetMainThreadEngine();
959 if (mainThreadEngine == nullptr) {
960 HILOG_FATAL("taskpool:: mainThreadEngine is nullptr");
961 return;
962 }
963 hostEnv_ = reinterpret_cast<napi_env>(mainThreadEngine);
964 // Add a reserved thread for taskpool
965 CreateWorkers(hostEnv_);
966 // Create a timer to manage worker threads
967 std::thread workerManager([this] {this->RunTaskManager();});
968 workerManager.detach();
969 }
970 }
971
972 void TaskManager::CreateWorkers(napi_env env, uint32_t num)
973 {
974 HILOG_DEBUG("taskpool:: CreateWorkers, num:%{public}u", num);
975 for (uint32_t i = 0; i < num; i++) {
976 expandingCount_++;
977 auto worker = Worker::WorkerConstructor(env);
978 NotifyWorkerAdded(worker);
979 }
980 CountTraceForWorker();
981 }
982
983 void TaskManager::RemoveWorker(Worker* worker)
984 {
985 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
986 idleWorkers_.erase(worker);
987 timeoutWorkers_.erase(worker);
988 workers_.erase(worker);
989 }
990
991 void TaskManager::RestoreWorker(Worker* worker)
992 {
993 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
994 if (UNLIKELY(suspend_)) {
995 suspend_ = false;
996 uv_timer_again(balanceTimer_);
997 }
998 if (worker->state_ == WorkerState::BLOCKED) {
999 // since the worker is blocked, we should add it to the timeout set
1000 timeoutWorkers_.insert(worker);
1001 return;
1002 }
1003 // Since the worker may still be executing I/O tasks on the uv thread, add it back to the
1004 // idle set so it can be scheduled again, and notify it to pick up any pending tasks.
1005 HILOG_DEBUG("taskpool:: worker has been restored and the current num is: %{public}zu", workers_.size());
1006 idleWorkers_.emplace_hint(idleWorkers_.end(), worker);
1007 if (GetTaskNum() != 0) {
1008 NotifyExecuteTask();
1009 }
1010 }
1011
1012 // ---------------------------------- SendData ---------------------------------------
1013 void TaskManager::RegisterCallback(napi_env env, uint32_t taskId, std::shared_ptr<CallbackInfo> callbackInfo,
1014 const std::string& type)
1015 {
1016 std::lock_guard<std::mutex> lock(callbackMutex_);
1017 if (callbackInfo != nullptr) {
1018 callbackInfo->type = type;
1019 } else { // LCOV_EXCL_BR_LINE
1020 HILOG_WARN("taskpool:: callbackInfo is null.");
1021 }
1022 callbackTable_[taskId] = callbackInfo;
1023 }
1024
1025 void TaskManager::IncreaseSendDataRefCount(uint32_t taskId)
1026 {
1027 if (taskId == 0) { // do not support func
1028 return;
1029 }
1030 std::lock_guard<std::mutex> lock(callbackMutex_);
1031 auto iter = callbackTable_.find(taskId);
1032 if (iter == callbackTable_.end() || iter->second == nullptr) {
1033 return;
1034 }
1035 iter->second->refCount++;
1036 }
1037
1038 void TaskManager::DecreaseSendDataRefCount(napi_env env, uint32_t taskId, Task* task)
1039 {
1040 if (taskId == 0) { // do not support func
1041 return;
1042 }
1043 std::lock_guard<std::mutex> lock(callbackMutex_);
1044 auto iter = callbackTable_.find(taskId);
1045 if (iter == callbackTable_.end() || iter->second == nullptr) {
1046 return;
1047 }
1048
1049 if (task != nullptr && !task->IsValid()) {
1050 callbackTable_.erase(iter);
1051 return;
1052 }
1053
1054 if (--iter->second->refCount == 0) {
1055 callbackTable_.erase(iter);
1056 }
1057 }
1058
1059 void TaskManager::ExecuteSendData(napi_env env, TaskResultInfo* resultInfo, Task* task)
1060 {
1061 std::lock_guard<std::mutex> lock(callbackMutex_);
1062 auto iter = callbackTable_.find(task->GetTaskId());
1063 if (iter == callbackTable_.end() || iter->second == nullptr) {
1064 HILOG_ERROR("taskpool:: the callback in SendData is not registered on the host side");
1065 ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_REGISTERED);
1066 delete resultInfo;
1067 return;
1068 }
1069 auto callbackInfo = iter->second;
1070 ++callbackInfo->refCount;
1071 auto workerEngine = reinterpret_cast<NativeEngine*>(env);
1072 workerEngine->IncreaseListeningCounter();
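// hold a weak reference so the posted callback does not extend the CallbackInfo lifetime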
1073 std::weak_ptr<CallbackInfo> info = callbackInfo;
1074 auto onCallbackTask = [info, resultInfo]([[maybe_unused]] void* data) {
1075 auto callbackInfo = info.lock();
1076 if (callbackInfo == nullptr) {
1077 HILOG_ERROR("taskpool:: callbackInfo may have been released");
1078 delete resultInfo;
1079 return;
1080 }
1081 TaskPool::ExecuteOnReceiveDataCallback(callbackInfo.get(), resultInfo);
1082 };
1083 auto worker = task->GetWorker();
1084 auto hostEnv = task->GetEnv();
1085 auto priority = g_napiPriorityMap.at(worker->GetPriority());
1086 uint64_t handleId = 0;
1087 napi_status status = napi_send_cancelable_event(hostEnv, onCallbackTask, nullptr, priority,
1088 &handleId, ON_CALLBACK_STR);
1089 if (status != napi_ok) {
1090 HILOG_ERROR("taskpool:: failed to send event to the host side");
1091 --callbackInfo->refCount;
1092 workerEngine->DecreaseListeningCounter();
1093 delete resultInfo;
1094 }
1095 }
1096 // ---------------------------------- SendData ---------------------------------------
1097
1098 void TaskManager::NotifyDependencyTaskInfo(uint32_t taskId)
1099 {
1100 HILOG_DEBUG("taskpool:: task:%{public}s NotifyDependencyTaskInfo", std::to_string(taskId).c_str());
1101 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
1102 std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1103 auto iter = dependentTaskInfos_.find(taskId);
1104 if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
1105 HILOG_DEBUG("taskpool:: dependentTaskInfo empty");
1106 return;
1107 }
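// re-enqueue every task that was waiting on taskId and drop the corresponding dependency records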
1108 for (auto taskIdIter = iter->second.begin(); taskIdIter != iter->second.end();) {
1109 auto taskInfo = DequeuePendingTaskInfo(*taskIdIter);
1110 RemoveDependencyById(taskId, *taskIdIter);
1111 taskIdIter = iter->second.erase(taskIdIter);
1112 if (taskInfo.first != 0) {
1113 EnqueueTaskId(taskInfo.first, taskInfo.second);
1114 }
1115 }
1116 }
1117
1118 void TaskManager::RemoveDependencyById(uint32_t dependentTaskId, uint32_t taskId)
1119 {
1120 HILOG_DEBUG("taskpool::task:%{public}s RemoveDependencyById", std::to_string(taskId).c_str());
1121 // remove dependency after task execute
1122 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1123 auto dependTaskIter = dependTaskInfos_.find(taskId);
1124 if (dependTaskIter != dependTaskInfos_.end()) {
1125 auto dependTaskInnerIter = dependTaskIter->second.find(dependentTaskId);
1126 if (dependTaskInnerIter != dependTaskIter->second.end()) {
1127 dependTaskIter->second.erase(dependTaskInnerIter);
1128 }
1129 }
1130 }
1131
1132 bool TaskManager::IsDependendByTaskId(uint32_t taskId)
1133 {
1134 std::shared_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1135 auto iter = dependTaskInfos_.find(taskId);
1136 if (iter == dependTaskInfos_.end() || iter->second.empty()) {
1137 return false;
1138 }
1139 return true;
1140 }
1141
1142 bool TaskManager::IsDependentByTaskId(uint32_t dependentTaskId)
1143 {
1144 std::shared_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1145 auto iter = dependentTaskInfos_.find(dependentTaskId);
1146 if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
1147 return false;
1148 }
1149 return true;
1150 }
1151
1152 bool TaskManager::StoreTaskDependency(uint32_t taskId, std::set<uint32_t> taskIdSet)
1153 {
1154 HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDependency", std::to_string(taskId).c_str());
1155 StoreDependentTaskInfo(taskIdSet, taskId);
1156 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1157 auto iter = dependTaskInfos_.find(taskId);
1158 if (iter == dependTaskInfos_.end()) {
1159 for (const auto& dependentId : taskIdSet) {
1160 auto idIter = dependTaskInfos_.find(dependentId);
1161 if (idIter == dependTaskInfos_.end()) {
1162 continue;
1163 }
1164 if (!CheckCircularDependency(taskIdSet, idIter->second, taskId)) {
1165 return false;
1166 }
1167 }
1168 dependTaskInfos_.emplace(taskId, std::move(taskIdSet));
1169 return true;
1170 }
1171
1172 for (const auto& dependentId : iter->second) {
1173 auto idIter = dependTaskInfos_.find(dependentId);
1174 if (idIter == dependTaskInfos_.end()) {
1175 continue;
1176 }
1177 if (!CheckCircularDependency(iter->second, idIter->second, taskId)) {
1178 return false;
1179 }
1180 }
1181 iter->second.insert(taskIdSet.begin(), taskIdSet.end());
1182 return true;
1183 }
1184
1185 bool TaskManager::CheckCircularDependency(std::set<uint32_t> dependentIdSet, std::set<uint32_t> idSet, uint32_t taskId)
1186 {
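// depth-first walk over the dependency graph; returns false if taskId can reach itself, i.e. a cycle exists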
1187 for (const auto& id : idSet) {
1188 if (id == taskId) {
1189 return false;
1190 }
1191 auto iter = dependentIdSet.find(id);
1192 if (iter != dependentIdSet.end()) {
1193 continue;
1194 }
1195 auto dIter = dependTaskInfos_.find(id);
1196 if (dIter == dependTaskInfos_.end()) {
1197 continue;
1198 }
1199 if (!CheckCircularDependency(dependentIdSet, dIter->second, taskId)) {
1200 return false;
1201 }
1202 }
1203 return true;
1204 }
1205
1206 bool TaskManager::RemoveTaskDependency(uint32_t taskId, uint32_t dependentId)
1207 {
1208 HILOG_DEBUG("taskpool:: task:%{public}s RemoveTaskDependency", std::to_string(taskId).c_str());
1209 RemoveDependentTaskInfo(dependentId, taskId);
1210 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1211 auto iter = dependTaskInfos_.find(taskId);
1212 if (iter == dependTaskInfos_.end()) {
1213 return false;
1214 }
1215 auto dependIter = iter->second.find(dependentId);
1216 if (dependIter == iter->second.end()) {
1217 return false;
1218 }
1219 iter->second.erase(dependIter);
1220 return true;
1221 }
1222
1223 void TaskManager::EnqueuePendingTaskInfo(uint32_t taskId, Priority priority)
1224 {
1225 if (taskId == 0) {
1226 return;
1227 }
1228 std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
1229 pendingTaskInfos_.emplace(taskId, priority);
1230 }
1231
1232 std::pair<uint32_t, Priority> TaskManager::DequeuePendingTaskInfo(uint32_t taskId)
1233 {
1234 std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
1235 if (pendingTaskInfos_.empty()) {
1236 return std::make_pair(0, Priority::DEFAULT);
1237 }
1238 std::pair<uint32_t, Priority> result;
1239 for (auto it = pendingTaskInfos_.begin(); it != pendingTaskInfos_.end(); ++it) {
1240 if (it->first == taskId) {
1241 result = std::make_pair(it->first, it->second);
1242 it = pendingTaskInfos_.erase(it);
1243 break;
1244 }
1245 }
1246 return result;
1247 }
1248
1249 void TaskManager::RemovePendingTaskInfo(uint32_t taskId)
1250 {
1251 HILOG_DEBUG("taskpool:: task:%{public}s RemovePendingTaskInfo", std::to_string(taskId).c_str());
1252 std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
1253 pendingTaskInfos_.erase(taskId);
1254 }
1255
1256 void TaskManager::StoreDependentTaskInfo(std::set<uint32_t> dependentTaskIdSet, uint32_t taskId)
1257 {
1258 HILOG_DEBUG("taskpool:: task:%{public}s StoreDependentTaskInfo", std::to_string(taskId).c_str());
1259 std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1260 for (const auto& id : dependentTaskIdSet) {
1261 auto iter = dependentTaskInfos_.find(id);
1262 if (iter == dependentTaskInfos_.end()) {
1263 std::set<uint32_t> set{taskId};
1264 dependentTaskInfos_.emplace(id, std::move(set));
1265 } else {
1266 iter->second.emplace(taskId);
1267 }
1268 }
1269 }
1270
1271 void TaskManager::RemoveDependentTaskInfo(uint32_t dependentTaskId, uint32_t taskId)
1272 {
1273 HILOG_DEBUG("taskpool:: task:%{public}s RemoveDependentTaskInfo", std::to_string(taskId).c_str());
1274 std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1275 auto iter = dependentTaskInfos_.find(dependentTaskId);
1276 if (iter == dependentTaskInfos_.end()) {
1277 return;
1278 }
1279 auto taskIter = iter->second.find(taskId);
1280 if (taskIter == iter->second.end()) {
1281 return;
1282 }
1283 iter->second.erase(taskIter);
1284 }
1285
1286 std::string TaskManager::GetTaskDependInfoToString(uint32_t taskId)
1287 {
1288 std::shared_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1289 std::string str = "TaskInfos: taskId: " + std::to_string(taskId) + ", dependTaskId:";
1290 auto iter = dependTaskInfos_.find(taskId);
1291 if (iter != dependTaskInfos_.end()) {
1292 for (const auto& id : iter->second) {
1293 str += " " + std::to_string(id);
1294 }
1295 }
1296 return str;
1297 }
1298
1299 void TaskManager::StoreTaskDuration(uint32_t taskId, uint64_t totalDuration, uint64_t cpuDuration)
1300 {
1301 HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDuration", std::to_string(taskId).c_str());
1302 std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
1303 auto iter = taskDurationInfos_.find(taskId);
1304 if (iter == taskDurationInfos_.end()) {
1305 std::pair<uint64_t, uint64_t> durationData = std::make_pair(totalDuration, cpuDuration);
1306 taskDurationInfos_.emplace(taskId, std::move(durationData));
1307 } else {
1308 if (totalDuration != 0) {
1309 iter->second.first = totalDuration;
1310 }
1311 if (cpuDuration != 0) {
1312 iter->second.second = cpuDuration;
1313 }
1314 }
1315 }
1316
1317 uint64_t TaskManager::GetTaskDuration(uint32_t taskId, std::string durationType)
1318 {
1319 std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
1320 auto iter = taskDurationInfos_.find(taskId);
1321 if (iter == taskDurationInfos_.end()) {
1322 return 0;
1323 }
1324 if (durationType == TASK_TOTAL_TIME) {
1325 return iter->second.first;
1326 } else if (durationType == TASK_CPU_TIME) {
1327 return iter->second.second;
1328 } else if (iter->second.first == 0) {
1329 return 0;
1330 }
1331 return iter->second.first - iter->second.second;
1332 }
1333
1334 void TaskManager::RemoveTaskDuration(uint32_t taskId)
1335 {
1336 HILOG_DEBUG("taskpool:: task:%{public}s RemoveTaskDuration", std::to_string(taskId).c_str());
1337 std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
1338 auto iter = taskDurationInfos_.find(taskId);
1339 if (iter != taskDurationInfos_.end()) {
1340 taskDurationInfos_.erase(iter);
1341 }
1342 }
1343
1344 void TaskManager::StoreLongTaskInfo(uint32_t taskId, Worker* worker)
1345 {
1346 std::unique_lock<std::shared_mutex> lock(longTasksMutex_);
1347 longTasksMap_.emplace(taskId, worker);
1348 }
1349
1350 void TaskManager::RemoveLongTaskInfo(uint32_t taskId)
1351 {
1352 std::unique_lock<std::shared_mutex> lock(longTasksMutex_);
1353 longTasksMap_.erase(taskId);
1354 }
1355
1356 Worker* TaskManager::GetLongTaskInfo(uint32_t taskId)
1357 {
1358 std::shared_lock<std::shared_mutex> lock(longTasksMutex_);
1359 auto iter = longTasksMap_.find(taskId);
1360 return iter != longTasksMap_.end() ? iter->second : nullptr;
1361 }
1362
1363 void TaskManager::TerminateTask(uint32_t taskId)
1364 {
1365 HILOG_DEBUG("taskpool:: task:%{public}s TerminateTask", std::to_string(taskId).c_str());
1366 auto worker = GetLongTaskInfo(taskId);
1367 if (UNLIKELY(worker == nullptr)) {
1368 return;
1369 }
1370 worker->TerminateTask(taskId);
1371 RemoveLongTaskInfo(taskId);
1372 }
1373
1374 void TaskManager::ReleaseTaskData(napi_env env, Task* task, bool shouldDeleteTask)
1375 {
1376 uint32_t taskId = task->taskId_;
1377 if (shouldDeleteTask) {
1378 RemoveTask(taskId);
1379 }
1380
1381 task->ReleaseData();
1382 task->CancelPendingTask(env);
1383
1384 task->ClearDelayedTimers();
1385
1386 if (task->IsFunctionTask() || task->IsGroupFunctionTask()) {
1387 return;
1388 }
1389 if (!task->IsMainThreadTask()) {
1390 task->SetValid(false);
1391 }
1392 DecreaseSendDataRefCount(env, taskId, task);
1393 RemoveTaskDuration(taskId);
1394 RemovePendingTaskInfo(taskId);
1395 ReleaseCallBackInfo(task);
1396 {
1397 std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1398 for (auto dependentTaskIter = dependentTaskInfos_.begin(); dependentTaskIter != dependentTaskInfos_.end();) {
1399 if (dependentTaskIter->second.find(taskId) != dependentTaskIter->second.end()) {
1400 dependentTaskIter = dependentTaskInfos_.erase(dependentTaskIter);
1401 } else {
1402 ++dependentTaskIter;
1403 }
1404 }
1405 }
1406 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1407 auto dependTaskIter = dependTaskInfos_.find(taskId);
1408 if (dependTaskIter != dependTaskInfos_.end()) {
1409 dependTaskInfos_.erase(dependTaskIter);
1410 }
1411 }
1412
1413 void TaskManager::ReleaseCallBackInfo(Task* task)
1414 {
1415 HILOG_DEBUG("taskpool:: ReleaseCallBackInfo task:%{public}s", std::to_string(task->taskId_).c_str());
1416 std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
1417 if (task->onEnqueuedCallBackInfo_ != nullptr) {
1418 delete task->onEnqueuedCallBackInfo_;
1419 task->onEnqueuedCallBackInfo_ = nullptr;
1420 }
1421
1422 if (task->onStartExecutionCallBackInfo_ != nullptr) {
1423 delete task->onStartExecutionCallBackInfo_;
1424 task->onStartExecutionCallBackInfo_ = nullptr;
1425 }
1426
1427 if (task->onExecutionFailedCallBackInfo_ != nullptr) {
1428 delete task->onExecutionFailedCallBackInfo_;
1429 task->onExecutionFailedCallBackInfo_ = nullptr;
1430 }
1431
1432 if (task->onExecutionSucceededCallBackInfo_ != nullptr) {
1433 delete task->onExecutionSucceededCallBackInfo_;
1434 task->onExecutionSucceededCallBackInfo_ = nullptr;
1435 }
1436
1437 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
1438 if (!task->IsMainThreadTask() && task->onStartExecutionSignal_ != nullptr) {
1439 if (!ConcurrentHelper::IsUvClosing(task->onStartExecutionSignal_)) {
1440 ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_);
1441 } else {
1442 delete task->onStartExecutionSignal_;
1443 task->onStartExecutionSignal_ = nullptr;
1444 }
1445 }
1446 #else
1447 if (task->onStartExecutionSignal_ != nullptr) {
1448 if (!ConcurrentHelper::IsUvClosing(task->onStartExecutionSignal_)) {
1449 ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_);
1450 } else {
1451 delete task->onStartExecutionSignal_;
1452 task->onStartExecutionSignal_ = nullptr;
1453 }
1454 }
1455 #endif
1456 }
1457
1458 void TaskManager::StoreTask(Task* task)
1459 {
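// derive the task id from the hash of the task pointer and probe forward on collision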
1460 uint64_t id = reinterpret_cast<uint64_t>(task);
1461 uint32_t taskId = CalculateTaskId(id);
1462 std::lock_guard<std::recursive_mutex> lock(tasksMutex_);
1463 while (tasks_.find(taskId) != tasks_.end()) {
1464 id++;
1465 taskId = CalculateTaskId(id);
1466 }
1467 task->SetTaskId(taskId);
1468 tasks_.emplace(taskId, task);
1469 }
1470
1471 void TaskManager::RemoveTask(uint32_t taskId)
1472 {
1473 std::lock_guard<std::recursive_mutex> lock(tasksMutex_);
1474 tasks_.erase(taskId);
1475 }
1476
1477 Task* TaskManager::GetTask(uint32_t taskId)
1478 {
1479 std::lock_guard<std::recursive_mutex> lock(tasksMutex_);
1480 auto iter = tasks_.find(taskId);
1481 if (iter == tasks_.end()) {
1482 return nullptr;
1483 }
1484 return iter->second;
1485 }
1486
1487 #if defined(ENABLE_TASKPOOL_FFRT)
1488 void TaskManager::UpdateSystemAppFlag()
1489 {
1490 auto abilityManager = OHOS::SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
1491 if (abilityManager == nullptr) {
1492 HILOG_ERROR("taskpool:: fail to GetSystemAbility abilityManager is nullptr.");
1493 return;
1494 }
1495 auto bundleObj = abilityManager->GetSystemAbility(OHOS::BUNDLE_MGR_SERVICE_SYS_ABILITY_ID);
1496 if (bundleObj == nullptr) {
1497 HILOG_ERROR("taskpool:: fail to get bundle manager service.");
1498 return;
1499 }
1500 auto bundleMgr = OHOS::iface_cast<OHOS::AppExecFwk::IBundleMgr>(bundleObj);
1501 if (bundleMgr == nullptr) {
1502 HILOG_ERROR("taskpool:: Bundle manager is nullptr.");
1503 return;
1504 }
1505 OHOS::AppExecFwk::BundleInfo bundleInfo;
1506 if (bundleMgr->GetBundleInfoForSelf(
1507 static_cast<int32_t>(OHOS::AppExecFwk::GetBundleInfoFlag::GET_BUNDLE_INFO_WITH_APPLICATION), bundleInfo)
1508 != OHOS::ERR_OK) {
1509 HILOG_ERROR("taskpool:: fail to GetBundleInfoForSelf");
1510 return;
1511 }
1512 isSystemApp_ = bundleInfo.applicationInfo.isSystemApp;
1513 }
1514 #endif
1515
1516 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
1517 bool TaskManager::PostTask(std::function<void()> task, const char* taskName, Priority priority)
1518 {
1519 return mainThreadHandler_->PostTask(task, taskName, 0, TASK_EVENTHANDLER_PRIORITY_MAP.at(priority));
1520 }
1521 #endif
1522
1523 void TaskManager::BatchRejectDeferred(napi_env env, std::list<napi_deferred> deferreds, std::string error)
1524 {
1525 if (deferreds.empty()) {
1526 return;
1527 }
1528 napi_value message = CancelError(env, 0, error.c_str());
1529 for (auto deferred : deferreds) {
1530 napi_reject_deferred(env, deferred, message);
1531 }
1532 }
1533
1534 uint32_t TaskManager::CalculateTaskId(uint64_t id)
1535 {
1536 size_t hash = std::hash<uint64_t>{}(id);
1537 uint64_t taskId = static_cast<uint64_t>(hash & MAX_UINT32_T);
1538 while (taskId == 0) {
1539 ++taskId;
1540 hash = std::hash<uint64_t>{}(taskId);
1541 taskId = static_cast<uint64_t>(hash & MAX_UINT32_T);
1542 }
1543 return static_cast<uint32_t>(taskId);
1544 }
1545
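// Clears both dependency directions for the task: the tasks it depends on (dependTaskInfos_) and the
// tasks that depend on it (dependentTaskInfos_), and drops any of its pending task info.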
1546 void TaskManager::ClearDependentTask(uint32_t taskId)
1547 {
1548 HILOG_DEBUG("taskpool:: task:%{public}s ClearDependentTask", std::to_string(taskId).c_str());
1549 RemoveDependTaskByTaskId(taskId);
1550 DequeuePendingTaskInfo(taskId);
1551 std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1552 RemoveDependentTaskByTaskId(taskId);
1553 }
1554
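// Erases taskId from dependTaskInfos_ under its mutex, then detaches taskId from the dependent lists
// of every task it used to depend on.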
1555 void TaskManager::RemoveDependTaskByTaskId(uint32_t taskId)
1556 {
1557 std::set<uint32_t> dependTaskIds;
1558 {
1559 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1560 auto iter = dependTaskInfos_.find(taskId);
1561 if (iter == dependTaskInfos_.end()) {
1562 return;
1563 }
1564 dependTaskIds.insert(iter->second.begin(), iter->second.end());
1565 dependTaskInfos_.erase(iter);
1566 }
1567 for (auto dependTaskId : dependTaskIds) {
1568 RemoveDependentTaskInfo(dependTaskId, taskId);
1569 }
1570 }
1571
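// Recursively disposes tasks that depend on taskId. This helper touches dependentTaskInfos_ without
// locking; the caller (e.g. ClearDependentTask above) is expected to hold dependentTaskInfosMutex_.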
1572 void TaskManager::RemoveDependentTaskByTaskId(uint32_t taskId)
1573 {
1574 auto iter = dependentTaskInfos_.find(taskId);
1575 if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
1576 HILOG_DEBUG("taskpool:: dependentTaskInfo empty");
1577 return;
1578 }
1579 for (auto taskIdIter = iter->second.begin(); taskIdIter != iter->second.end();) {
1580 auto taskInfo = DequeuePendingTaskInfo(*taskIdIter);
1581 RemoveDependencyById(taskId, *taskIdIter);
1582 auto id = *taskIdIter;
1583 taskIdIter = iter->second.erase(taskIdIter);
1584 auto task = GetTask(id);
1585 if (task == nullptr) {
1586 continue;
1587 }
1588 if (task->currentTaskInfo_ != nullptr) {
1589 EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority);
1590 }
1591 task->DisposeCanceledTask();
1592 RemoveDependentTaskByTaskId(task->taskId_);
1593 }
1594 }
1595
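// When hisysevent reporting and FFRT are enabled, enriches the event message with the worker's loop
// address, pid/tid and current worker counts, then writes it through DfxHisysEvent::WriteFfrtAndUv.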
1596 void TaskManager::WriteHisysForFfrtAndUv(Worker* worker, HisyseventParams* hisyseventParams)
1597 {
1598 #if defined(ENABLE_TASKPOOL_HISYSEVENT)
1599 if (!EnableFfrt() || hisyseventParams == nullptr) {
1600 return;
1601 }
1602 int32_t tid = 0;
1603 if (worker != nullptr) {
1604 auto loop = worker->GetWorkerLoop();
1605 uint64_t loopAddress = reinterpret_cast<uint64_t>(loop);
1606 hisyseventParams->message += "; current runningLoop: " + std::to_string(loopAddress);
1607 tid = worker->tid_;
1608 }
1609 hisyseventParams->tid = tid;
1610 hisyseventParams->pid = getpid();
1611 hisyseventParams->message += "; idleWorkers num: " + std::to_string(idleWorkers_.size());
1612 hisyseventParams->message += "; workers num: " + std::to_string(workers_.size());
1613 int32_t ret = DfxHisysEvent::WriteFfrtAndUv(hisyseventParams);
1614 if (ret < 0) {
1615 HILOG_WARN("taskpool:: HisyseventWrite failed, ret: %{public}s", std::to_string(ret).c_str());
1616 }
1617 #endif
1618 }
1619
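// Reports a TASK_TIMEOUT event when queued tasks have been waiting longer than UNEXECUTE_TASK_TIME since
// the last dequeue; reportTime_ throttles repeated reports for the same backlog.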
1620 void TaskManager::CheckTasksAndReportHisysEvent()
1621 {
1622 #if defined(ENABLE_TASKPOOL_HISYSEVENT)
1623 if (!EnableFfrt()) {
1624 return;
1625 }
1626 uint64_t currTime = ConcurrentHelper::GetMilliseconds();
1627 auto taskNum = GetTaskNum();
1628 if (taskNum == 0 || currTime - preDequeneTime_ < UNEXECUTE_TASK_TIME || preDequeneTime_ < reportTime_) {
1629 return;
1630 }
1631 std::string message = "Task scheduling timeout, total task num:" + std::to_string(taskNum);
1632 HisyseventParams* hisyseventParams = new HisyseventParams();
1633 hisyseventParams->methodName = "CheckTasksAndReportHisysEvent";
1634 hisyseventParams->funName = "";
1635 hisyseventParams->logType = "ffrt";
1636 hisyseventParams->code = DfxHisysEvent::TASK_TIMEOUT;
1637 hisyseventParams->message = message;
1638 WriteHisysForFfrtAndUv(nullptr, hisyseventParams);
1639 reportTime_ = currTime;
1640 #endif
1641 }
1642
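// Reports a WORKER_ALIVE event (possible uv handle leak) when the worker has been waiting long enough
// that Worker::IsNeedReport returns true for the measured interval.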
1643 void TaskManager::WorkerAliveAndReport(Worker* worker)
1644 {
1645 #if defined(ENABLE_TASKPOOL_HISYSEVENT)
1646 uint64_t workerWaitTime = worker->GetWaitTime();
1647 uint64_t currTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::seconds>(
1648 std::chrono::steady_clock::now().time_since_epoch()).count());
1649 uint64_t intervalTime = currTime - workerWaitTime;
1650 if (!worker->IsNeedReport(intervalTime)) {
1651 return;
1652 }
1653 std::string message = "worker uv alive, may have handle leak";
1654 HisyseventParams* hisyseventParams = new HisyseventParams();
1655 hisyseventParams->methodName = "WorkerAliveAndReport";
1656 hisyseventParams->funName = "";
1657 hisyseventParams->logType = "ffrt";
1658 hisyseventParams->code = DfxHisysEvent::WORKER_ALIVE;
1659 hisyseventParams->message = message;
1660 hisyseventParams->waitTime = intervalTime;
1661 WriteHisysForFfrtAndUv(worker, hisyseventParams);
1662 worker->IncreaseReportCount();
1663 #endif
1664 }
1665
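// Thin wrapper that fills a HisyseventParams with logType "uv" and forwards it to WriteHisysForFfrtAndUv.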
1666 void TaskManager::UvReportHisysEvent(Worker* worker, std::string methodName, std::string funName, std::string message,
1667 int32_t code)
1668 {
1669 #if defined(ENABLE_TASKPOOL_HISYSEVENT)
1670 HisyseventParams* hisyseventParams = new HisyseventParams();
1671 hisyseventParams->methodName = methodName;
1672 hisyseventParams->funName = funName;
1673 hisyseventParams->logType = "uv";
1674 hisyseventParams->code = code;
1675 hisyseventParams->message = message;
1676 WriteHisysForFfrtAndUv(worker, hisyseventParams);
1677 #endif
1678 }
1679
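// Builds a BusinessError-style napi object for task cancellation with the rough shape
// { name: "BusinessError", code: errCode, data: { result, error } }. A supplied result value is stored
// in data.result, unless it carries an "error" property or is itself an Error on failure, in which case
// the error side of data is filled instead.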
1680 napi_value TaskManager::CancelError(napi_env env, int32_t errCode, const char* errMessage, napi_value result,
1681 bool success)
1682 {
1683 std::string errTitle = "";
1684 napi_value concurrentError = nullptr;
1685
1686 napi_value code = nullptr;
1687 napi_create_uint32(env, errCode, &code);
1688
1689 napi_value name = nullptr;
1690 std::string errName = "BusinessError";
1691
1692 if (errCode == ErrorHelper::ERR_ASYNCRUNNER_TASK_CANCELED) {
1693 errTitle = "The asyncRunner task has been canceled.";
1694 }
1695
1696 if (errCode == 0 && errMessage == nullptr) {
1697 errTitle = "taskpool:: task has been canceled";
1698 }
1699
1700 napi_create_string_utf8(env, errName.c_str(), NAPI_AUTO_LENGTH, &name);
1701 napi_value msg = nullptr;
1702 if (errMessage == nullptr) {
1703 napi_create_string_utf8(env, errTitle.c_str(), NAPI_AUTO_LENGTH, &msg);
1704 } else {
1705 napi_create_string_utf8(env, (errTitle + std::string(errMessage)).c_str(), NAPI_AUTO_LENGTH, &msg);
1706 }
1707
1708 napi_create_error(env, nullptr, msg, &concurrentError);
1709 napi_set_named_property(env, concurrentError, "code", code);
1710 napi_set_named_property(env, concurrentError, "name", name);
1711 napi_value data = nullptr;
1712 napi_create_object(env, &data);
1713 napi_value resultValue = nullptr;
1714 napi_value errorValue = msg;
1715 napi_get_undefined(env, &resultValue);
1716 if (result != nullptr) {
1717 bool isError = false;
1718 napi_is_error(env, result, &isError);
1719 napi_value error = NapiHelper::GetNameProperty(env, result, "error");
1720 if (NapiHelper::IsNotUndefined(env, error)) {
1721 errorValue = error;
1722 } else if (isError && !success) {
1723 napi_value errMsg = NapiHelper::GetNameProperty(env, result, "message");
1724 errorValue = errMsg;
1725 } else {
1726 resultValue = result;
1727 }
1728 }
1729 napi_set_named_property(env, data, "result", resultValue);
1730 napi_set_named_property(env, data, "error", errorValue);
1731 napi_set_named_property(env, concurrentError, "data", data);
1732 return concurrentError;
1733 }
1734
1735 void TaskManager::SetIsPerformIdle(bool performIdle)
1736 {
1737 isPerformIdle_ = performIdle;
1738 }
1739
1740 bool TaskManager::IsPerformIdle() const
1741 {
1742 return isPerformIdle_;
1743 }
1744
1745 uint32_t TaskManager::GetTotalTaskNum() const
1746 {
1747 return totalTaskNum_;
1748 }
1749
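// Prints total/running/idle/timeout worker thread counts to hilog when needLog is set.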
1750 void TaskManager::AddCountTraceForWorkerLog(bool needLog, int64_t threadNum, int64_t idleThreadNum,
1751 int64_t timeoutThreadNum)
1752 {
1753 if (needLog) {
1754 HILOG_INFO("taskpool:: threads: %{public}s, running: %{public}s, idle: %{public}s, timeout: %{public}s",
1755 std::to_string(threadNum).c_str(), std::to_string(threadNum - idleThreadNum).c_str(),
1756 std::to_string(idleThreadNum).c_str(), std::to_string(timeoutThreadNum).c_str());
1757 }
1758 }
1759 } // namespace Commonlibrary::Concurrent::TaskPoolModule