1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "task_manager.h"
17
18 #include <securec.h>
19 #include <thread>
20
21 #include "async_runner_manager.h"
22 #if defined(ENABLE_TASKPOOL_FFRT)
23 #include "bundle_info.h"
24 #include "bundle_mgr_interface.h"
25 #include "bundle_mgr_proxy.h"
26 #include "iservice_registry.h"
27 #include "parameters.h"
28 #include "status_receiver_interface.h"
29 #include "system_ability_definition.h"
30 #include "c/executor_task.h"
31 #include "ffrt_inner.h"
32 #endif
33 #include "sys_timer.h"
34 #include "helper/hitrace_helper.h"
35 #include "taskpool.h"
36 #include "task_group_manager.h"
37
38 namespace Commonlibrary::Concurrent::TaskPoolModule {
39 using namespace OHOS::JsSysModule;
40 template void TaskManager::TryExpandWithCheckIdle<true>();
41 template void TaskManager::TryExpandWithCheckIdle<false>();
42
// Tuning constants used by the TaskManager scheduler in this file.
// Time values are in milliseconds unless stated otherwise.
static constexpr int8_t HIGH_PRIORITY_TASK_COUNT = 5; // 5: HIGH dequeues allowed before the HIGH counter is reset
static constexpr int8_t MEDIUM_PRIORITY_TASK_COUNT = 5; // 5: MEDIUM dequeues allowed before the MEDIUM counter is reset
static constexpr int32_t MAX_TASK_DURATION = 100; // 100: 100ms
static constexpr uint32_t STEP_SIZE = 2; // 2: cap on the idle-worker target while no task has been executed yet
static constexpr uint32_t DEFAULT_THREADS = 3; // 3: lower bound applied to maxThreads_ when expanding
static constexpr uint32_t DEFAULT_MIN_THREADS = 1; // 1: minimum thread num when idle
static constexpr uint32_t MIN_TIMEOUT_TIME = 180000; // 180000: 3min
static constexpr uint32_t MAX_TIMEOUT_TIME = 600000; // 600000: 10min
static constexpr int32_t MAX_IDLE_TIME = 30000; // 30000: 30s
static constexpr uint32_t TRIGGER_INTERVAL = 30000; // 30000: 30s
static constexpr uint32_t SHRINK_STEP = 4; // 4: try to release 4 threads every time
static constexpr uint32_t MAX_UINT32_T = 0xFFFFFFFF; // 0xFFFFFFFF: max uint32_t
static constexpr uint32_t TRIGGER_EXPAND_INTERVAL = 10; // 10: ms, trigger recheck expansion interval
[[maybe_unused]] static constexpr uint32_t IDLE_THRESHOLD = 2; // 2: 2 intervals later will release the thread
static constexpr uint32_t UNEXECUTE_TASK_TIME = 60000; // 60000: 1min
58
59 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
// Translation table from taskpool Priority to EventQueue::Priority.
// Note the mapping is shifted at the top end: MEDIUM maps to HIGH and HIGH maps to IMMEDIATE.
static const std::map<Priority, OHOS::AppExecFwk::EventQueue::Priority> TASK_EVENTHANDLER_PRIORITY_MAP = {
    {Priority::IDLE, OHOS::AppExecFwk::EventQueue::Priority::IDLE},
    {Priority::LOW, OHOS::AppExecFwk::EventQueue::Priority::LOW},
    {Priority::MEDIUM, OHOS::AppExecFwk::EventQueue::Priority::HIGH},
    {Priority::HIGH, OHOS::AppExecFwk::EventQueue::Priority::IMMEDIATE},
};
66 #endif
67
68 // ----------------------------------- TaskManager ----------------------------------------
GetInstance()69 TaskManager& TaskManager::GetInstance()
70 {
71 static TaskManager manager;
72 return manager;
73 }
74
TaskManager()75 TaskManager::TaskManager()
76 {
77 for (size_t i = 0; i < taskQueues_.size(); i++) {
78 std::unique_ptr<ExecuteQueue> taskQueue = std::make_unique<ExecuteQueue>();
79 taskQueues_[i] = std::move(taskQueue);
80 }
81 }
82
~TaskManager()83 TaskManager::~TaskManager()
84 {
85 HILOG_INFO("taskpool:: ~TaskManager");
86 if (balanceTimer_ == nullptr) {
87 HILOG_ERROR("taskpool:: balanceTimer_ is nullptr");
88 } else {
89 uv_timer_stop(balanceTimer_);
90 ConcurrentHelper::UvHandleClose(balanceTimer_);
91 }
92
93 if (expandTimer_ == nullptr) {
94 HILOG_ERROR("taskpool:: expandTimer_ is nullptr");
95 } else {
96 uv_timer_stop(expandTimer_);
97 ConcurrentHelper::UvHandleClose(expandTimer_);
98 }
99
100 ConcurrentHelper::UvHandleClose(dispatchHandle_);
101
102 if (loop_ != nullptr) {
103 uv_stop(loop_);
104 }
105
106 {
107 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
108 for (auto& worker : workers_) {
109 delete worker;
110 }
111 workers_.clear();
112 }
113
114 {
115 std::lock_guard<std::mutex> lock(callbackMutex_);
116 for (auto& [_, callbackPtr] : callbackTable_) {
117 if (callbackPtr == nullptr) {
118 continue;
119 }
120 callbackPtr.reset();
121 }
122 callbackTable_.clear();
123 }
124
125 {
126 std::lock_guard<std::recursive_mutex> lock(tasksMutex_);
127 for (auto& [_, task] : tasks_) {
128 delete task;
129 task = nullptr;
130 }
131 tasks_.clear();
132 }
133 CountTraceForWorker();
134 }
135
CountTraceForWorker()136 void TaskManager::CountTraceForWorker()
137 {
138 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
139 int64_t threadNum = static_cast<int64_t>(workers_.size());
140 int64_t idleWorkers = static_cast<int64_t>(idleWorkers_.size());
141 int64_t timeoutWorkers = static_cast<int64_t>(timeoutWorkers_.size());
142 HITRACE_HELPER_COUNT_TRACE("timeoutThreadNum", timeoutWorkers);
143 HITRACE_HELPER_COUNT_TRACE("threadNum", threadNum);
144 HITRACE_HELPER_COUNT_TRACE("runningThreadNum", threadNum - idleWorkers);
145 HITRACE_HELPER_COUNT_TRACE("idleThreadNum", idleWorkers);
146 }
147
GetThreadInfos(napi_env env)148 napi_value TaskManager::GetThreadInfos(napi_env env)
149 {
150 napi_value threadInfos = nullptr;
151 napi_create_array(env, &threadInfos);
152 {
153 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
154 int32_t i = 0;
155 for (auto& worker : workers_) {
156 if (worker->workerEnv_ == nullptr) {
157 continue;
158 }
159 napi_value tid = NapiHelper::CreateUint32(env, static_cast<uint32_t>(worker->tid_));
160 napi_value priority = NapiHelper::CreateUint32(env, static_cast<uint32_t>(worker->priority_));
161
162 napi_value taskId = nullptr;
163 napi_create_array(env, &taskId);
164 int32_t j = 0;
165 {
166 std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_);
167 for (auto& currentId : worker->currentTaskId_) {
168 napi_value id = NapiHelper::CreateUint32(env, currentId);
169 napi_set_element(env, taskId, j, id);
170 j++;
171 }
172 }
173 napi_value threadInfo = nullptr;
174 napi_create_object(env, &threadInfo);
175 napi_set_named_property(env, threadInfo, "tid", tid);
176 napi_set_named_property(env, threadInfo, "priority", priority);
177 napi_set_named_property(env, threadInfo, "taskIds", taskId);
178 napi_set_element(env, threadInfos, i, threadInfo);
179 i++;
180 }
181 }
182 return threadInfos;
183 }
184
GetTaskInfos(napi_env env)185 napi_value TaskManager::GetTaskInfos(napi_env env)
186 {
187 napi_value taskInfos = nullptr;
188 napi_create_array(env, &taskInfos);
189 {
190 std::lock_guard<std::recursive_mutex> lock(tasksMutex_);
191 int32_t i = 0;
192 for (const auto& [_, task] : tasks_) {
193 if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::DELAYED ||
194 task->taskState_ == ExecuteState::FINISHED) {
195 continue;
196 }
197 napi_value taskInfoValue = NapiHelper::CreateObject(env);
198 napi_value taskId = NapiHelper::CreateUint32(env, task->taskId_);
199 napi_value name = nullptr;
200 napi_create_string_utf8(env, task->name_.c_str(), task->name_.size(), &name);
201 napi_set_named_property(env, taskInfoValue, "name", name);
202 ExecuteState state = task->taskState_;
203 uint64_t duration = 0;
204 if (state == ExecuteState::RUNNING || state == ExecuteState::ENDING) {
205 duration = ConcurrentHelper::GetMilliseconds() - task->startTime_;
206 }
207 napi_value stateValue = NapiHelper::CreateUint32(env, static_cast<uint32_t>(state));
208 napi_set_named_property(env, taskInfoValue, "taskId", taskId);
209 napi_set_named_property(env, taskInfoValue, "state", stateValue);
210 napi_value durationValue = NapiHelper::CreateUint32(env, duration);
211 napi_set_named_property(env, taskInfoValue, "duration", durationValue);
212 napi_set_element(env, taskInfos, i, taskInfoValue);
213 i++;
214 }
215 }
216 return taskInfos;
217 }
218
UpdateExecutedInfo(uint64_t duration)219 void TaskManager::UpdateExecutedInfo(uint64_t duration)
220 {
221 totalExecTime_ += duration;
222 totalExecCount_++;
223 }
224
ComputeSuitableThreadNum()225 uint32_t TaskManager::ComputeSuitableThreadNum()
226 {
227 uint32_t targetNum = ComputeSuitableIdleNum() + GetRunningWorkers();
228 return targetNum;
229 }
230
ComputeSuitableIdleNum()231 uint32_t TaskManager::ComputeSuitableIdleNum()
232 {
233 uint32_t targetNum = 0;
234 if (GetNonIdleTaskNum() != 0 && totalExecCount_ == 0) {
235 // this branch is used for avoiding time-consuming tasks that may block the taskpool
236 targetNum = std::min(STEP_SIZE, GetNonIdleTaskNum());
237 } else if (totalExecCount_ != 0) {
238 auto durationPerTask = static_cast<double>(totalExecTime_) / totalExecCount_;
239 uint32_t result = std::ceil(durationPerTask * GetNonIdleTaskNum() / MAX_TASK_DURATION);
240 targetNum = std::min(result, GetNonIdleTaskNum());
241 }
242 return targetNum;
243 }
244
// Scans all workers and moves those that have been running longer than the timeout
// threshold into timeoutWorkers_. If at least one worker was moved, TryExpand() is
// called. Holds workersMutex_ for the whole scan.
void TaskManager::CheckForBlockedWorkers()
{
    // the threshold will be dynamically modified to provide more flexibility in detecting exceptions
    // if the thread num has reached the limit and the idle worker is not available, a short time will be used,
    // else we will choose the longer one
    std::lock_guard<std::recursive_mutex> lock(workersMutex_);
    bool needChecking = false;
    bool state = (GetThreadNum() == ConcurrentHelper::GetMaxThreads()) && (GetIdleWorkers() == 0);
    uint64_t threshold = state ? MIN_TIMEOUT_TIME : MAX_TIMEOUT_TIME;
    for (auto iter = workers_.begin(); iter != workers_.end(); iter++) {
        auto worker = *iter;
        // if the worker thread is idle, just skip it, and only the worker in running state can be marked as timeout
        // if the worker is executing the longTask, we will not do the check
        // NOTE: the last operand performs the RUNNING -> BLOCKED transition as a side effect and is only
        // reached when every earlier operand is false, so the order of the operands matters.
        if ((worker->state_ == WorkerState::IDLE) || (worker->IsExecutingLongTask()) ||
            (ConcurrentHelper::GetMilliseconds() - worker->startTime_ < threshold) ||
            !worker->UpdateWorkerState(WorkerState::RUNNING, WorkerState::BLOCKED)) {
            continue;
        }
        // When executing the promise task, the worker state may not be updated and will be
        // marked as 'BLOCKED', so we should exclude this situation.
        // Besides, if the worker is not executing sync tasks or micro tasks, it may handle
        // the task like I/O in uv threads, we should also exclude this situation.
        // NOTE(review): workerEnv_ is dereferenced without a null check here — confirm that a
        // worker in RUNNING state always has a valid env.
        auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
        if (worker->idleState_ && !workerEngine->IsExecutingPendingJob()) {
            if (!workerEngine->HasWaitingRequest()) {
                // nothing is pending: undo the BLOCKED mark and treat the worker as idle
                worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::IDLE);
            } else {
                // requests are still pending: put the worker back to RUNNING and restart its timeout window
                worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::RUNNING);
                worker->startTime_ = ConcurrentHelper::GetMilliseconds();
            }
            continue;
        }

        HILOG_INFO("taskpool:: The worker has been marked as timeout.");
        // If the current worker has a longTask and is not executing, we will only interrupt it.
        // (the worker keeps the BLOCKED state but is not moved into timeoutWorkers_)
        if (worker->HasLongTask()) {
            continue;
        }
        needChecking = true;
        idleWorkers_.erase(worker);
        timeoutWorkers_.insert(worker);
    }
    // should trigger the check when we have marked and removed workers
    if (UNLIKELY(needChecking)) {
        TryExpand();
    }
}
292
TryTriggerExpand()293 void TaskManager::TryTriggerExpand()
294 {
295 // post the signal to notify the monitor thread to expand
296 if (UNLIKELY(!isHandleInited_)) {
297 NotifyExecuteTask();
298 needChecking_ = true;
299 HILOG_WARN("taskpool:: the dispatchHandle_ is nullptr");
300 return;
301 }
302 uv_async_send(dispatchHandle_);
303 }
304
305 #if defined(OHOS_PLATFORM)
306 // read /proc/[pid]/task/[tid]/stat to get the number of idle threads.
ReadThreadInfo(pid_t tid,char * buf,uint32_t size)307 bool TaskManager::ReadThreadInfo(pid_t tid, char* buf, uint32_t size)
308 {
309 char path[128]; // 128: buffer for path
310 pid_t pid = getpid();
311 ssize_t bytesLen = -1;
312 int ret = snprintf_s(path, sizeof(path), sizeof(path) - 1, "/proc/%d/task/%d/stat", pid, tid);
313 if (ret < 0) {
314 HILOG_ERROR("snprintf_s failed");
315 return false;
316 }
317 int fd = open(path, O_RDONLY | O_NONBLOCK);
318 if (UNLIKELY(fd == -1)) {
319 return false;
320 }
321 bytesLen = read(fd, buf, size - 1);
322 close(fd);
323 if (bytesLen <= 0) {
324 HILOG_ERROR("taskpool:: failed to read %{public}s", path);
325 return false;
326 }
327 buf[bytesLen] = '\0';
328 return true;
329 }
330
GetIdleWorkers()331 uint32_t TaskManager::GetIdleWorkers()
332 {
333 char buf[4096]; // 4096: buffer for thread info
334 uint32_t idleCount = 0;
335 std::unordered_set<pid_t> tids {};
336 {
337 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
338 for (auto& worker : idleWorkers_) {
339 #if defined(ENABLE_TASKPOOL_FFRT)
340 if (worker->ffrtTaskHandle_ != nullptr) {
341 if (worker->GetWaitTime() > 0) {
342 idleCount++;
343 }
344 continue;
345 }
346 #endif
347 tids.emplace(worker->tid_);
348 }
349 }
350 // The ffrt thread does not read thread info
351 for (auto tid : tids) {
352 if (!ReadThreadInfo(tid, buf, sizeof(buf))) {
353 continue;
354 }
355 char state;
356 if (sscanf_s(buf, "%*d %*s %c", &state, sizeof(state)) != 1) { // 1: state
357 HILOG_ERROR("taskpool: sscanf_s of state failed for %{public}c", state);
358 return 0;
359 }
360 if (state == 'S') {
361 idleCount++;
362 }
363 }
364 return idleCount;
365 }
366
GetIdleWorkersList(uint32_t step)367 void TaskManager::GetIdleWorkersList(uint32_t step)
368 {
369 char buf[4096]; // 4096: buffer for thread info
370 for (auto& worker : idleWorkers_) {
371 #if defined(ENABLE_TASKPOOL_FFRT)
372 if (worker->ffrtTaskHandle_ != nullptr) {
373 uint64_t workerWaitTime = worker->GetWaitTime();
374 bool isWorkerLoopActive = worker->IsLoopActive();
375 if (workerWaitTime == 0) {
376 continue;
377 }
378 uint64_t currTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::seconds>(
379 std::chrono::steady_clock::now().time_since_epoch()).count());
380 if (!isWorkerLoopActive) {
381 freeList_.emplace_back(worker);
382 } else {
383 // worker uv alive, and will be free in 2 intervals if not wake
384 WorkerAliveAndReport(worker);
385 }
386 continue;
387 }
388 #endif
389 if (!ReadThreadInfo(worker->tid_, buf, sizeof(buf))) {
390 continue;
391 }
392 char state;
393 uint64_t utime;
394 if (sscanf_s(buf, "%*d %*s %c %*d %*d %*d %*d %*d %*u %*lu %*lu %*lu %*lu %llu",
395 &state, sizeof(state), &utime) != 2) { // 2: state and utime
396 HILOG_ERROR("taskpool: sscanf_s of state failed for %{public}d", worker->tid_);
397 return;
398 }
399 if (state != 'S' || utime != worker->lastCpuTime_) {
400 worker->idleCount_ = 0;
401 worker->lastCpuTime_ = utime;
402 continue;
403 }
404 if (++worker->idleCount_ >= IDLE_THRESHOLD) {
405 freeList_.emplace_back(worker);
406 }
407 }
408 }
409
TriggerShrink(uint32_t step)410 void TaskManager::TriggerShrink(uint32_t step)
411 {
412 GetIdleWorkersList(step);
413 step = std::min(step, static_cast<uint32_t>(freeList_.size()));
414 uint32_t count = 0;
415 for (size_t i = 0; i < freeList_.size(); i++) {
416 auto worker = freeList_[i];
417 if (worker->state_ != WorkerState::IDLE || worker->HasLongTask()) {
418 continue;
419 }
420 auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
421 if (idleTime < MAX_IDLE_TIME || worker->HasRunningTasks()) {
422 continue;
423 }
424 idleWorkers_.erase(worker);
425 HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
426 worker->PostReleaseSignal();
427 if (++count == step) {
428 break;
429 }
430 }
431 freeList_.clear();
432 }
433 #else
GetIdleWorkers()434 uint32_t TaskManager::GetIdleWorkers()
435 {
436 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
437 return idleWorkers_.size();
438 }
439
TriggerShrink(uint32_t step)440 void TaskManager::TriggerShrink(uint32_t step)
441 {
442 for (uint32_t i = 0; i < step; i++) {
443 // try to free the worker that idle time meets the requirement
444 auto iter = std::find_if(idleWorkers_.begin(), idleWorkers_.end(), [](Worker* worker) {
445 auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
446 return idleTime > MAX_IDLE_TIME && !worker->HasRunningTasks() && !worker->HasLongTask();
447 });
448 // remove it from all sets
449 if (iter != idleWorkers_.end()) {
450 auto worker = *iter;
451 idleWorkers_.erase(worker);
452 HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
453 worker->PostReleaseSignal();
454 }
455 }
456 }
457 #endif
458
// Periodic shrink step, called from TriggerLoadBalance() with the desired thread number.
// Holding workersMutex_, it refreshes maxThreads_, releases surplus idle workers, handles
// timeout workers, may release the last worker under moderate memory pressure, re-creates
// a worker when none is left, and finally suspends the balance timer when nothing is left to do.
// @param targetNum desired number of worker threads
void TaskManager::NotifyShrink(uint32_t targetNum)
{
    std::lock_guard<std::recursive_mutex> lock(workersMutex_);
    uint32_t workerCount = workers_.size();
    // under low memory the pool may shrink down to zero threads
    uint32_t minThread = ConcurrentHelper::IsLowMemory() ? 0 : DEFAULT_MIN_THREADS;
    CheckTasksAndReportHisysEvent();
    // update the maxThreads_ periodically
    maxThreads_ = ConcurrentHelper::GetMaxThreads();
    if (minThread == 0) {
        HILOG_INFO("taskpool:: the system now is under low memory");
    }
    if (workerCount > minThread && workerCount > targetNum) {
        // never shrink below minThread and release at most SHRINK_STEP workers per call
        targetNum = std::max(minThread, targetNum);
        uint32_t step = std::min(workerCount - targetNum, SHRINK_STEP);
        TriggerShrink(step);
    }
    // handle timeout workers: entries that are no longer in workers_ are dropped; the first
    // timeout worker without running tasks gets the release signal and the function returns
    // right after it, so at most one timeout worker is released per call
    for (auto iter = timeoutWorkers_.begin(); iter != timeoutWorkers_.end();) {
        auto worker = *iter;
        if (workers_.find(worker) == workers_.end()) {
            HILOG_WARN("taskpool:: current worker maybe release");
            iter = timeoutWorkers_.erase(iter);
        } else if (!worker->HasRunningTasks()) {
            HILOG_DEBUG("taskpool:: try to release timeout thread: %{public}d", worker->tid_);
            worker->PostReleaseSignal();
            timeoutWorkers_.erase(iter++);
            return;
        } else {
            iter++;
        }
    }
    uint32_t idleNum = idleWorkers_.size();
    // System memory state is moderate and the worker has executed tasks, we will try to release it
    if (ConcurrentHelper::IsModerateMemory() && workerCount == idleNum && workerCount == DEFAULT_MIN_THREADS) {
        // idleNum == workerCount == DEFAULT_MIN_THREADS here, so idleWorkers_ is not empty
        auto worker = *(idleWorkers_.begin());
        // worker that has longTask should not be released
        if (worker == nullptr || worker->HasLongTask()) {
            return;
        }
        if (worker->hasExecuted_) { // worker that hasn't execute any tasks should not be released
            TriggerShrink(DEFAULT_MIN_THREADS);
            return;
        }
    }

    // Create a worker for performance
    if (!ConcurrentHelper::IsLowMemory() && workers_.empty()) {
        CreateWorkers(hostEnv_);
    }
    // stop the timer
    // (workerCount and idleNum are the values sampled above, before any release in this call;
    // TryExpandWithCheckIdle() restarts the timer when suspend_ is set)
    if ((workerCount == idleNum && workerCount <= minThread) && timeoutWorkers_.empty()) {
        suspend_ = true;
        uv_timer_stop(balanceTimer_);
        HILOG_DEBUG("taskpool:: timer will be suspended");
    }
}
515
TriggerLoadBalance(const uv_timer_t * req)516 void TaskManager::TriggerLoadBalance([[maybe_unused]] const uv_timer_t* req)
517 {
518 TaskManager& taskManager = TaskManager::GetInstance();
519 taskManager.CheckForBlockedWorkers();
520 uint32_t targetNum = taskManager.ComputeSuitableThreadNum();
521 taskManager.NotifyShrink(targetNum);
522 taskManager.CountTraceForWorker();
523 }
524
DispatchAndTryExpand(const uv_async_t * req)525 void TaskManager::DispatchAndTryExpand([[maybe_unused]] const uv_async_t* req)
526 {
527 TaskManager& taskManager = TaskManager::GetInstance();
528 taskManager.DispatchAndTryExpandInner();
529 }
530
DispatchAndTryExpandInner()531 void TaskManager::DispatchAndTryExpandInner()
532 {
533 // dispatch task in the TaskPoolManager thread
534 NotifyExecuteTask();
535 // do not check the worker idleTime first
536 TryExpandWithCheckIdle<false>();
537 if (!timerTriggered_ && GetNonIdleTaskNum() != 0) {
538 timerTriggered_ = true;
539 // use timer to check worker idle time after dispatch task
540 // if worker has been blocked or fd is broken, taskpool can expand automatically
541 uv_timer_start(expandTimer_, reinterpret_cast<uv_timer_cb>(TryExpand), TRIGGER_EXPAND_INTERVAL, 0);
542 }
543 }
544
TryExpand(const uv_timer_t * req)545 void TaskManager::TryExpand([[maybe_unused]] const uv_timer_t* req)
546 {
547 TaskManager& taskManager = TaskManager::GetInstance();
548 taskManager.TryExpandWithCheckIdle<true>();
549 }
550
551 template <bool needCheckIdle>
TryExpandWithCheckIdle()552 void TaskManager::TryExpandWithCheckIdle()
553 {
554 if (GetNonIdleTaskNum() == 0) {
555 return;
556 }
557
558 needChecking_ = false;
559 timerTriggered_ = false;
560 uint32_t targetNum = ComputeSuitableIdleNum();
561 uint32_t workerCount = 0;
562 uint32_t idleCount = 0;
563 uint32_t timeoutWorkers = 0;
564 {
565 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
566 workerCount = workers_.size();
567 timeoutWorkers = timeoutWorkers_.size();
568 if constexpr (needCheckIdle) {
569 uint64_t currTime = ConcurrentHelper::GetMilliseconds();
570 idleCount = static_cast<uint32_t>(std::count_if(idleWorkers_.begin(), idleWorkers_.end(),
571 [currTime](const auto& worker) {
572 return worker->IsRunnable(currTime);
573 }));
574 } else {
575 idleCount = idleWorkers_.size();
576 }
577 }
578 uint32_t maxThreads = std::max(maxThreads_, DEFAULT_THREADS);
579 maxThreads = (timeoutWorkers == 0) ? maxThreads : maxThreads + 2; // 2: extra threads
580 if (workerCount < maxThreads && idleCount < targetNum) {
581 // Prevent the total number of workers from exceeding maxThreads
582 uint32_t step = std::min(std::min(maxThreads, targetNum) - idleCount, maxThreads - workerCount);
583 step = step >= expandingCount_ ? step - expandingCount_ : 0;
584 if (step == 0) {
585 return;
586 }
587 CreateWorkers(hostEnv_, step);
588 HILOG_INFO("taskpool:: maxThreads: %{public}u, created num: %{public}u, total num: %{public}u",
589 maxThreads, step, GetThreadNum());
590 }
591 if (UNLIKELY(suspend_)) {
592 suspend_ = false;
593 uv_timer_again(balanceTimer_);
594 }
595 }
596
// Body of the manager thread started by InitTaskManager(): creates the uv loop, the
// dispatch async handle and the balance/expand timers, then runs the loop until it stops.
void TaskManager::RunTaskManager()
{
    loop_ = uv_loop_new();
    if (loop_ == nullptr) { // LCOV_EXCL_BR_LINE
        HILOG_FATAL("taskpool:: new loop failed.");
        return;
    }
    ConcurrentHelper::UvHandleInit(loop_, dispatchHandle_, TaskManager::DispatchAndTryExpand);
    // the balance timer fires immediately and then every TRIGGER_INTERVAL ms
    balanceTimer_ = new uv_timer_t;
    uv_timer_init(loop_, balanceTimer_);
    uv_timer_start(balanceTimer_, reinterpret_cast<uv_timer_cb>(TaskManager::TriggerLoadBalance), 0, TRIGGER_INTERVAL);

    // the expand timer is only initialized here; DispatchAndTryExpandInner() starts it on demand
    expandTimer_ = new uv_timer_t;
    uv_timer_init(loop_, expandTimer_);

    // from now on TryTriggerExpand() uses dispatchHandle_ instead of its fallback path
    isHandleInited_ = true;
#if defined IOS_PLATFORM || defined MAC_PLATFORM
    pthread_setname_np("OS_TaskManager");
#else
    pthread_setname_np(pthread_self(), "OS_TaskManager");
#endif
    // a trigger arrived before the handle was ready: replay it now
    if (UNLIKELY(needChecking_)) {
        needChecking_ = false;
        uv_async_send(dispatchHandle_);
    }
    uv_run(loop_, UV_RUN_DEFAULT);
    if (loop_ != nullptr) {
        uv_loop_delete(loop_);
    }
}
627
// Cancels the task with the given id. Depending on the task kind the request is forwarded
// to the task group, sequence runner or async runner; a plain task is switched to CANCELED
// and cancelled either directly (same env) or through a CancelTaskMessage.
// Throws ERR_CANCEL_NONEXIST_TASK into env when the task is unknown, not executed yet or
// already executed.
// @param env    env of the caller, used for throwing and for the same-env check
// @param taskId id of the task to cancel
void TaskManager::CancelTask(napi_env env, uint32_t taskId)
{
    // 1. Cannot find the task by taskId, throw error
    // 2. Find a task that has been executed (or is ending), throw error
    // 3. Find a waiting or delayed task, cancel it
    // 4. Find a canceled task, skip it
    std::string strTrace = "CancelTask: taskId: " + std::to_string(taskId);
    HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
    HITRACE_HELPER_METER_NAME(strTrace);
    Task* task = GetTask(taskId);
    if (task == nullptr) {
        std::string errMsg = "taskpool:: the task may not exist";
        HILOG_ERROR("%{public}s", errMsg.c_str());
        ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
        return;
    }
    // unlocked fast path; the state is checked again under taskMutex_ further below
    if (task->taskState_ == ExecuteState::CANCELED) {
        HILOG_DEBUG("taskpool:: task has been canceled");
        return;
    }
    if (task->IsGroupCommonTask()) {
        // when task is a group common task, still check the state
        if (task->currentTaskInfo_ == nullptr || task->taskState_ == ExecuteState::NOT_FOUND ||
            task->taskState_ == ExecuteState::FINISHED || task->taskState_ == ExecuteState::ENDING) {
            std::string errMsg = "taskpool:: task is not executed or has been executed";
            HILOG_ERROR("%{public}s", errMsg.c_str());
            ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
            return;
        }
        TaskGroup* taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(task->groupId_);
        if (taskGroup == nullptr) {
            return;
        }
        return taskGroup->CancelGroupTask(env, task->taskId_);
    }
    if (task->IsPeriodicTask()) {
        // a periodic task is only marked as canceled here
        task->taskState_.exchange(ExecuteState::CANCELED);
        return;
    }
    if (task->IsSeqRunnerTask()) {
        CancelSeqRunnerTask(env, task);
        return;
    }
    if (task->IsAsyncRunnerTask()) {
        AsyncRunnerManager::GetInstance().CancelAsyncRunnerTask(env, task);
        return;
    }
    // previous state of the task, captured when it is switched to CANCELED
    ExecuteState state = ExecuteState::NOT_FOUND;
    {
        std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
        if (task->taskState_ == ExecuteState::CANCELED) {
            HILOG_DEBUG("taskpool:: task has been canceled");
            return;
        }
        // a DELAYED task may be cancelled even though it has no currentTaskInfo_
        if ((task->currentTaskInfo_ == nullptr && task->taskState_ != ExecuteState::DELAYED) ||
            task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED ||
            task->taskState_ == ExecuteState::ENDING) {
            std::string errMsg = "taskpool:: task is not executed or has been executed";
            HILOG_ERROR("%{public}s", errMsg.c_str());
            ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
            return;
        }
        state = task->taskState_.exchange(ExecuteState::CANCELED);
    }
    if (task->IsSameEnv(env)) {
        task->CancelInner(state);
        return;
    }
    // different env: hand the cancellation over through a message
    // NOTE(review): ownership of message presumably passes to TriggerCancel() — confirm
    CancelTaskMessage* message = new CancelTaskMessage(state, task->taskId_);
    task->TriggerCancel(message);
}
699
CancelSeqRunnerTask(napi_env env,Task * task)700 void TaskManager::CancelSeqRunnerTask(napi_env env, Task* task)
701 {
702 {
703 std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
704 if (task->taskState_ != ExecuteState::FINISHED) {
705 task->taskState_ = ExecuteState::CANCELED;
706 return;
707 }
708 }
709 std::string errMsg = "taskpool:: sequenceRunner task has been executed";
710 HILOG_ERROR("%{public}s", errMsg.c_str());
711 ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
712 }
713
NotifyWorkerIdle(Worker * worker)714 void TaskManager::NotifyWorkerIdle(Worker* worker)
715 {
716 {
717 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
718 if (worker->state_ == WorkerState::BLOCKED) {
719 return;
720 }
721 idleWorkers_.insert(worker);
722 }
723 if (GetTaskNum() != 0) {
724 NotifyExecuteTask();
725 }
726 CountTraceForWorker();
727 }
728
NotifyWorkerCreated(Worker * worker)729 void TaskManager::NotifyWorkerCreated(Worker* worker)
730 {
731 NotifyWorkerIdle(worker);
732 expandingCount_--;
733 }
734
NotifyWorkerAdded(Worker * worker)735 void TaskManager::NotifyWorkerAdded(Worker* worker)
736 {
737 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
738 workers_.insert(worker);
739 HILOG_DEBUG("taskpool:: a new worker has been added and the current num is %{public}zu", workers_.size());
740 }
741
NotifyWorkerRunning(Worker * worker)742 void TaskManager::NotifyWorkerRunning(Worker* worker)
743 {
744 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
745 idleWorkers_.erase(worker);
746 CountTraceForWorker();
747 }
748
GetRunningWorkers()749 uint32_t TaskManager::GetRunningWorkers()
750 {
751 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
752 return std::count_if(workers_.begin(), workers_.end(), [](const auto& worker) {
753 return worker->HasRunningTasks();
754 });
755 }
756
GetTimeoutWorkers()757 uint32_t TaskManager::GetTimeoutWorkers()
758 {
759 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
760 return timeoutWorkers_.size();
761 }
762
GetTaskNum()763 uint32_t TaskManager::GetTaskNum()
764 {
765 std::lock_guard<std::mutex> lock(taskQueuesMutex_);
766 uint32_t sum = 0;
767 for (const auto& elements : taskQueues_) {
768 sum += elements->GetTaskNum();
769 }
770 return sum;
771 }
772
GetNonIdleTaskNum()773 uint32_t TaskManager::GetNonIdleTaskNum()
774 {
775 return nonIdleTaskNum_;
776 }
777
IncreaseNumIfNoIdle(Priority priority)778 void TaskManager::IncreaseNumIfNoIdle(Priority priority)
779 {
780 if (priority != Priority::IDLE) {
781 ++nonIdleTaskNum_;
782 }
783 }
784
DecreaseNumIfNoIdle(Priority priority)785 void TaskManager::DecreaseNumIfNoIdle(Priority priority)
786 {
787 if (priority != Priority::IDLE) {
788 --nonIdleTaskNum_;
789 }
790 }
791
GetThreadNum()792 uint32_t TaskManager::GetThreadNum()
793 {
794 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
795 return workers_.size();
796 }
797
EnqueueTaskId(uint32_t taskId,Priority priority)798 void TaskManager::EnqueueTaskId(uint32_t taskId, Priority priority)
799 {
800 {
801 std::lock_guard<std::mutex> lock(taskQueuesMutex_);
802 IncreaseNumIfNoIdle(priority);
803 taskQueues_[priority]->EnqueueTaskId(taskId);
804 }
805 TryTriggerExpand();
806 Task* task = GetTask(taskId);
807 if (task == nullptr) {
808 HILOG_FATAL("taskpool:: task is nullptr");
809 return;
810 }
811 task->IncreaseTaskRefCount();
812 if (task->onEnqueuedCallBackInfo_ != nullptr) {
813 task->ExecuteListenerCallback(task->onEnqueuedCallBackInfo_);
814 }
815 }
816
EraseWaitingTaskId(uint32_t taskId,Priority priority)817 bool TaskManager::EraseWaitingTaskId(uint32_t taskId, Priority priority)
818 {
819 std::lock_guard<std::mutex> lock(taskQueuesMutex_);
820 if (!taskQueues_[priority]->EraseWaitingTaskId(taskId)) {
821 HILOG_WARN("taskpool:: taskId is not in executeQueue when cancel");
822 return false;
823 }
824 return true;
825 }
826
// Picks the next task to execute, preferring HIGH over MEDIUM over LOW over IDLE.
// Per-priority counters bound how many HIGH (and MEDIUM) tasks are taken before the
// counter is reset and the lower priorities get a turn.
// @return (taskId, priority) of the chosen task; the id is 0 when GetTaskByPriority()
//         deferred the task, and (0, Priority::LOW) is returned when nothing was chosen
std::pair<uint32_t, Priority> TaskManager::DequeueTaskId()
{
    std::lock_guard<std::mutex> lock(taskQueuesMutex_);
    auto& highTaskQueue = taskQueues_[Priority::HIGH];
    if (!highTaskQueue->IsEmpty() && highPrioExecuteCount_ < HIGH_PRIORITY_TASK_COUNT) {
        highPrioExecuteCount_++;
        return GetTaskByPriority(highTaskQueue, Priority::HIGH);
    }
    // the HIGH queue is empty or its quota is used up: reset the quota and fall through
    highPrioExecuteCount_ = 0;

    auto& mediumTaskQueue = taskQueues_[Priority::MEDIUM];
    if (!mediumTaskQueue->IsEmpty() && mediumPrioExecuteCount_ < MEDIUM_PRIORITY_TASK_COUNT) {
        mediumPrioExecuteCount_++;
        return GetTaskByPriority(mediumTaskQueue, Priority::MEDIUM);
    }
    // same for MEDIUM
    mediumPrioExecuteCount_ = 0;

    auto& lowTaskQueue = taskQueues_[Priority::LOW];
    if (!lowTaskQueue->IsEmpty()) {
        return GetTaskByPriority(lowTaskQueue, Priority::LOW);
    }

    // an IDLE task is only taken when the HIGH and MEDIUM queues are empty (LOW is empty at
    // this point) and IsChooseIdle() reports that every worker is idle
    auto& idleTaskQueue = taskQueues_[Priority::IDLE];
    if (highTaskQueue->IsEmpty() && mediumTaskQueue->IsEmpty() && !idleTaskQueue->IsEmpty() && IsChooseIdle()) {
        return GetTaskByPriority(idleTaskQueue, Priority::IDLE);
    }
    return std::make_pair(0, Priority::LOW);
}
855
IsChooseIdle()856 bool TaskManager::IsChooseIdle()
857 {
858 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
859 for (auto& worker : workers_) {
860 if (worker->state_ == WorkerState::IDLE) {
861 // If worker->state_ is WorkerState::IDLE, it means that the worker is free
862 continue;
863 }
864 // If there is a worker running a task, do not take the idle task.
865 return false;
866 }
867 // Only when all workers are free, will idle task be taken.
868 return true;
869 }
870
GetTaskByPriority(const std::unique_ptr<ExecuteQueue> & taskQueue,Priority priority)871 std::pair<uint32_t, Priority> TaskManager::GetTaskByPriority(const std::unique_ptr<ExecuteQueue>& taskQueue,
872 Priority priority)
873 {
874 uint32_t taskId = taskQueue->DequeueTaskId();
875 if (IsDependendByTaskId(taskId)) {
876 EnqueuePendingTaskInfo(taskId, priority);
877 return std::make_pair(0, priority);
878 }
879 DecreaseNumIfNoIdle(priority);
880 preDequeneTime_ = ConcurrentHelper::GetMilliseconds();
881 return std::make_pair(taskId, priority);
882 }
883
NotifyExecuteTask()884 void TaskManager::NotifyExecuteTask()
885 {
886 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
887 if (GetNonIdleTaskNum() == 0 && workers_.size() != idleWorkers_.size()) {
888 // When there are only idle tasks and workers executing them, it is not triggered
889 return;
890 }
891
892 for (auto& worker : idleWorkers_) {
893 worker->NotifyExecuteTask();
894 }
895 }
896
InitTaskManager(napi_env env)897 void TaskManager::InitTaskManager(napi_env env)
898 {
899 HITRACE_HELPER_METER_NAME("InitTaskManager");
900 if (!isInitialized_.exchange(true, std::memory_order_relaxed)) {
901 #if defined(ENABLE_TASKPOOL_FFRT)
902 globalEnableFfrtFlag_ = OHOS::system::GetIntParameter<int>("persist.commonlibrary.taskpoolglobalenableffrt", 0);
903 if (!globalEnableFfrtFlag_) {
904 UpdateSystemAppFlag();
905 if (IsSystemApp()) {
906 disableFfrtFlag_ = OHOS::system::GetIntParameter<int>("persist.commonlibrary.taskpooldisableffrt", 0);
907 }
908 }
909 if (EnableFfrt()) {
910 HILOG_INFO("taskpool:: apps use ffrt");
911 } else {
912 HILOG_INFO("taskpool:: apps do not use ffrt");
913 }
914 #endif
915 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
916 mainThreadHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(
917 OHOS::AppExecFwk::EventRunner::GetMainEventRunner());
918 #endif
919 auto mainThreadEngine = NativeEngine::GetMainThreadEngine();
920 if (mainThreadEngine == nullptr) {
921 HILOG_FATAL("taskpool:: mainThreadEngine is nullptr");
922 return;
923 }
924 hostEnv_ = reinterpret_cast<napi_env>(mainThreadEngine);
925 // Add a reserved thread for taskpool
926 CreateWorkers(hostEnv_);
927 // Create a timer to manage worker threads
928 std::thread workerManager([this] {this->RunTaskManager();});
929 workerManager.detach();
930 }
931 }
932
CreateWorkers(napi_env env,uint32_t num)933 void TaskManager::CreateWorkers(napi_env env, uint32_t num)
934 {
935 HILOG_DEBUG("taskpool:: CreateWorkers, num:%{public}u", num);
936 for (uint32_t i = 0; i < num; i++) {
937 expandingCount_++;
938 auto worker = Worker::WorkerConstructor(env);
939 NotifyWorkerAdded(worker);
940 }
941 CountTraceForWorker();
942 }
943
RemoveWorker(Worker * worker)944 void TaskManager::RemoveWorker(Worker* worker)
945 {
946 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
947 idleWorkers_.erase(worker);
948 timeoutWorkers_.erase(worker);
949 workers_.erase(worker);
950 }
951
RestoreWorker(Worker * worker)952 void TaskManager::RestoreWorker(Worker* worker)
953 {
954 std::lock_guard<std::recursive_mutex> lock(workersMutex_);
955 if (UNLIKELY(suspend_)) {
956 suspend_ = false;
957 uv_timer_again(balanceTimer_);
958 }
959 if (worker->state_ == WorkerState::BLOCKED) {
960 // since the worker is blocked, we should add it to the timeout set
961 timeoutWorkers_.insert(worker);
962 return;
963 }
964 // Since the worker may be executing some tasks in IO thread, we should add it to the
965 // worker sets and call the 'NotifyWorkerIdle', which can still execute some tasks in its own thread.
966 HILOG_DEBUG("taskpool:: worker has been restored and the current num is: %{public}zu", workers_.size());
967 idleWorkers_.emplace_hint(idleWorkers_.end(), worker);
968 if (GetTaskNum() != 0) {
969 NotifyExecuteTask();
970 }
971 }
972
973 // ---------------------------------- SendData ---------------------------------------
RegisterCallback(napi_env env,uint32_t taskId,std::shared_ptr<CallbackInfo> callbackInfo)974 void TaskManager::RegisterCallback(napi_env env, uint32_t taskId, std::shared_ptr<CallbackInfo> callbackInfo)
975 {
976 std::lock_guard<std::mutex> lock(callbackMutex_);
977 callbackTable_[taskId] = callbackInfo;
978 }
979
GetCallbackInfo(uint32_t taskId)980 std::shared_ptr<CallbackInfo> TaskManager::GetCallbackInfo(uint32_t taskId)
981 {
982 std::lock_guard<std::mutex> lock(callbackMutex_);
983 auto iter = callbackTable_.find(taskId);
984 if (iter == callbackTable_.end() || iter->second == nullptr) {
985 HILOG_ERROR("taskpool:: the callback does not exist");
986 return nullptr;
987 }
988 return iter->second;
989 }
990
IncreaseRefCount(uint32_t taskId)991 void TaskManager::IncreaseRefCount(uint32_t taskId)
992 {
993 if (taskId == 0) { // do not support func
994 return;
995 }
996 std::lock_guard<std::mutex> lock(callbackMutex_);
997 auto iter = callbackTable_.find(taskId);
998 if (iter == callbackTable_.end() || iter->second == nullptr) {
999 return;
1000 }
1001 iter->second->refCount++;
1002 }
1003
DecreaseRefCount(napi_env env,uint32_t taskId)1004 void TaskManager::DecreaseRefCount(napi_env env, uint32_t taskId)
1005 {
1006 if (taskId == 0) { // do not support func
1007 return;
1008 }
1009 std::lock_guard<std::mutex> lock(callbackMutex_);
1010 auto iter = callbackTable_.find(taskId);
1011 if (iter == callbackTable_.end() || iter->second == nullptr) {
1012 return;
1013 }
1014
1015 auto task = GetTask(taskId);
1016 if (task != nullptr && !task->IsValid()) {
1017 callbackTable_.erase(iter);
1018 return;
1019 }
1020
1021 iter->second->refCount--;
1022 if (iter->second->refCount == 0) {
1023 callbackTable_.erase(iter);
1024 }
1025 }
1026
ResetCallbackInfoWorker(const std::shared_ptr<CallbackInfo> & callbackInfo)1027 void TaskManager::ResetCallbackInfoWorker(const std::shared_ptr<CallbackInfo>& callbackInfo)
1028 {
1029 std::lock_guard<std::mutex> lock(callbackMutex_);
1030 callbackInfo->worker = nullptr;
1031 }
1032
NotifyCallbackExecute(napi_env env,TaskResultInfo * resultInfo,Task * task)1033 napi_value TaskManager::NotifyCallbackExecute(napi_env env, TaskResultInfo* resultInfo, Task* task)
1034 {
1035 HILOG_DEBUG("taskpool:: task:%{public}s NotifyCallbackExecute", std::to_string(task->taskId_).c_str());
1036 std::lock_guard<std::mutex> lock(callbackMutex_);
1037 auto iter = callbackTable_.find(task->taskId_);
1038 if (iter == callbackTable_.end() || iter->second == nullptr) {
1039 HILOG_ERROR("taskpool:: the callback in SendData is not registered on the host side");
1040 ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_REGISTERED);
1041 delete resultInfo;
1042 return nullptr;
1043 }
1044 Worker* worker = static_cast<Worker*>(task->worker_);
1045 worker->Enqueue(task->env_, resultInfo);
1046 auto callbackInfo = iter->second;
1047 callbackInfo->refCount++;
1048 callbackInfo->worker = worker;
1049 auto workerEngine = reinterpret_cast<NativeEngine*>(env);
1050 workerEngine->IncreaseListeningCounter();
1051 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
1052 if (task->IsMainThreadTask()) {
1053 HITRACE_HELPER_METER_NAME("NotifyCallbackExecute: PostTask");
1054 auto onCallbackTask = [callbackInfo]() {
1055 TaskPool::ExecuteCallbackTask(callbackInfo.get());
1056 };
1057 TaskManager::GetInstance().PostTask(onCallbackTask, "TaskPoolOnCallbackTask", worker->priority_);
1058 } else {
1059 callbackInfo->onCallbackSignal->data = callbackInfo.get();
1060 uv_async_send(callbackInfo->onCallbackSignal);
1061 }
1062 #else
1063 callbackInfo->onCallbackSignal->data = callbackInfo.get();
1064 uv_async_send(callbackInfo->onCallbackSignal);
1065 #endif
1066 return nullptr;
1067 }
1068
GetMessageQueue(const uv_async_t * req)1069 MsgQueue* TaskManager::GetMessageQueue(const uv_async_t* req)
1070 {
1071 std::lock_guard<std::mutex> lock(callbackMutex_);
1072 auto info = static_cast<CallbackInfo*>(req->data);
1073 if (info == nullptr || info->worker == nullptr) {
1074 HILOG_WARN("taskpool:: info or worker is nullptr");
1075 return nullptr;
1076 }
1077 auto worker = info->worker;
1078 MsgQueue* queue = nullptr;
1079 worker->Dequeue(info->hostEnv, queue);
1080 return queue;
1081 }
1082
GetMessageQueueFromCallbackInfo(CallbackInfo * callbackInfo)1083 MsgQueue* TaskManager::GetMessageQueueFromCallbackInfo(CallbackInfo* callbackInfo)
1084 {
1085 std::lock_guard<std::mutex> lock(callbackMutex_);
1086 if (callbackInfo == nullptr || callbackInfo->worker == nullptr) {
1087 HILOG_WARN("taskpool:: callbackInfo or worker is nullptr");
1088 return nullptr;
1089 }
1090 auto worker = callbackInfo->worker;
1091 MsgQueue* queue = nullptr;
1092 worker->Dequeue(callbackInfo->hostEnv, queue);
1093 return queue;
1094 }
1095 // ---------------------------------- SendData ---------------------------------------
1096
NotifyDependencyTaskInfo(uint32_t taskId)1097 void TaskManager::NotifyDependencyTaskInfo(uint32_t taskId)
1098 {
1099 HILOG_DEBUG("taskpool:: task:%{public}s NotifyDependencyTaskInfo", std::to_string(taskId).c_str());
1100 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
1101 std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1102 auto iter = dependentTaskInfos_.find(taskId);
1103 if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
1104 HILOG_DEBUG("taskpool:: dependentTaskInfo empty");
1105 return;
1106 }
1107 for (auto taskIdIter = iter->second.begin(); taskIdIter != iter->second.end();) {
1108 auto taskInfo = DequeuePendingTaskInfo(*taskIdIter);
1109 RemoveDependencyById(taskId, *taskIdIter);
1110 taskIdIter = iter->second.erase(taskIdIter);
1111 if (taskInfo.first != 0) {
1112 EnqueueTaskId(taskInfo.first, taskInfo.second);
1113 }
1114 }
1115 }
1116
RemoveDependencyById(uint32_t dependentTaskId,uint32_t taskId)1117 void TaskManager::RemoveDependencyById(uint32_t dependentTaskId, uint32_t taskId)
1118 {
1119 HILOG_DEBUG("taskpool::task:%{public}s RemoveDependencyById", std::to_string(taskId).c_str());
1120 // remove dependency after task execute
1121 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1122 auto dependTaskIter = dependTaskInfos_.find(taskId);
1123 if (dependTaskIter != dependTaskInfos_.end()) {
1124 auto dependTaskInnerIter = dependTaskIter->second.find(dependentTaskId);
1125 if (dependTaskInnerIter != dependTaskIter->second.end()) {
1126 dependTaskIter->second.erase(dependTaskInnerIter);
1127 }
1128 }
1129 }
1130
IsDependendByTaskId(uint32_t taskId)1131 bool TaskManager::IsDependendByTaskId(uint32_t taskId)
1132 {
1133 std::shared_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1134 auto iter = dependTaskInfos_.find(taskId);
1135 if (iter == dependTaskInfos_.end() || iter->second.empty()) {
1136 return false;
1137 }
1138 return true;
1139 }
1140
IsDependentByTaskId(uint32_t dependentTaskId)1141 bool TaskManager::IsDependentByTaskId(uint32_t dependentTaskId)
1142 {
1143 std::shared_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1144 auto iter = dependentTaskInfos_.find(dependentTaskId);
1145 if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
1146 return false;
1147 }
1148 return true;
1149 }
1150
StoreTaskDependency(uint32_t taskId,std::set<uint32_t> taskIdSet)1151 bool TaskManager::StoreTaskDependency(uint32_t taskId, std::set<uint32_t> taskIdSet)
1152 {
1153 HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDependency", std::to_string(taskId).c_str());
1154 StoreDependentTaskInfo(taskIdSet, taskId);
1155 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1156 auto iter = dependTaskInfos_.find(taskId);
1157 if (iter == dependTaskInfos_.end()) {
1158 for (const auto& dependentId : taskIdSet) {
1159 auto idIter = dependTaskInfos_.find(dependentId);
1160 if (idIter == dependTaskInfos_.end()) {
1161 continue;
1162 }
1163 if (!CheckCircularDependency(taskIdSet, idIter->second, taskId)) {
1164 return false;
1165 }
1166 }
1167 dependTaskInfos_.emplace(taskId, std::move(taskIdSet));
1168 return true;
1169 }
1170
1171 for (const auto& dependentId : iter->second) {
1172 auto idIter = dependTaskInfos_.find(dependentId);
1173 if (idIter == dependTaskInfos_.end()) {
1174 continue;
1175 }
1176 if (!CheckCircularDependency(iter->second, idIter->second, taskId)) {
1177 return false;
1178 }
1179 }
1180 iter->second.insert(taskIdSet.begin(), taskIdSet.end());
1181 return true;
1182 }
1183
CheckCircularDependency(std::set<uint32_t> dependentIdSet,std::set<uint32_t> idSet,uint32_t taskId)1184 bool TaskManager::CheckCircularDependency(std::set<uint32_t> dependentIdSet, std::set<uint32_t> idSet, uint32_t taskId)
1185 {
1186 for (const auto& id : idSet) {
1187 if (id == taskId) {
1188 return false;
1189 }
1190 auto iter = dependentIdSet.find(id);
1191 if (iter != dependentIdSet.end()) {
1192 continue;
1193 }
1194 auto dIter = dependTaskInfos_.find(id);
1195 if (dIter == dependTaskInfos_.end()) {
1196 continue;
1197 }
1198 if (!CheckCircularDependency(dependentIdSet, dIter->second, taskId)) {
1199 return false;
1200 }
1201 }
1202 return true;
1203 }
1204
RemoveTaskDependency(uint32_t taskId,uint32_t dependentId)1205 bool TaskManager::RemoveTaskDependency(uint32_t taskId, uint32_t dependentId)
1206 {
1207 HILOG_DEBUG("taskpool:: task:%{public}s RemoveTaskDependency", std::to_string(taskId).c_str());
1208 RemoveDependentTaskInfo(dependentId, taskId);
1209 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1210 auto iter = dependTaskInfos_.find(taskId);
1211 if (iter == dependTaskInfos_.end()) {
1212 return false;
1213 }
1214 auto dependIter = iter->second.find(dependentId);
1215 if (dependIter == iter->second.end()) {
1216 return false;
1217 }
1218 iter->second.erase(dependIter);
1219 return true;
1220 }
1221
EnqueuePendingTaskInfo(uint32_t taskId,Priority priority)1222 void TaskManager::EnqueuePendingTaskInfo(uint32_t taskId, Priority priority)
1223 {
1224 if (taskId == 0) {
1225 return;
1226 }
1227 std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
1228 pendingTaskInfos_.emplace(taskId, priority);
1229 }
1230
DequeuePendingTaskInfo(uint32_t taskId)1231 std::pair<uint32_t, Priority> TaskManager::DequeuePendingTaskInfo(uint32_t taskId)
1232 {
1233 std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
1234 if (pendingTaskInfos_.empty()) {
1235 return std::make_pair(0, Priority::DEFAULT);
1236 }
1237 std::pair<uint32_t, Priority> result;
1238 for (auto it = pendingTaskInfos_.begin(); it != pendingTaskInfos_.end(); ++it) {
1239 if (it->first == taskId) {
1240 result = std::make_pair(it->first, it->second);
1241 it = pendingTaskInfos_.erase(it);
1242 break;
1243 }
1244 }
1245 return result;
1246 }
1247
RemovePendingTaskInfo(uint32_t taskId)1248 void TaskManager::RemovePendingTaskInfo(uint32_t taskId)
1249 {
1250 HILOG_DEBUG("taskpool:: task:%{public}s RemovePendingTaskInfo", std::to_string(taskId).c_str());
1251 std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
1252 pendingTaskInfos_.erase(taskId);
1253 }
1254
StoreDependentTaskInfo(std::set<uint32_t> dependentTaskIdSet,uint32_t taskId)1255 void TaskManager::StoreDependentTaskInfo(std::set<uint32_t> dependentTaskIdSet, uint32_t taskId)
1256 {
1257 HILOG_DEBUG("taskpool:: task:%{public}s StoreDependentTaskInfo", std::to_string(taskId).c_str());
1258 std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1259 for (const auto& id : dependentTaskIdSet) {
1260 auto iter = dependentTaskInfos_.find(id);
1261 if (iter == dependentTaskInfos_.end()) {
1262 std::set<uint32_t> set{taskId};
1263 dependentTaskInfos_.emplace(id, std::move(set));
1264 } else {
1265 iter->second.emplace(taskId);
1266 }
1267 }
1268 }
1269
RemoveDependentTaskInfo(uint32_t dependentTaskId,uint32_t taskId)1270 void TaskManager::RemoveDependentTaskInfo(uint32_t dependentTaskId, uint32_t taskId)
1271 {
1272 HILOG_DEBUG("taskpool:: task:%{public}s RemoveDependentTaskInfo", std::to_string(taskId).c_str());
1273 std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1274 auto iter = dependentTaskInfos_.find(dependentTaskId);
1275 if (iter == dependentTaskInfos_.end()) {
1276 return;
1277 }
1278 auto taskIter = iter->second.find(taskId);
1279 if (taskIter == iter->second.end()) {
1280 return;
1281 }
1282 iter->second.erase(taskIter);
1283 }
1284
GetTaskDependInfoToString(uint32_t taskId)1285 std::string TaskManager::GetTaskDependInfoToString(uint32_t taskId)
1286 {
1287 std::shared_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1288 std::string str = "TaskInfos: taskId: " + std::to_string(taskId) + ", dependTaskId:";
1289 auto iter = dependTaskInfos_.find(taskId);
1290 if (iter != dependTaskInfos_.end()) {
1291 for (const auto& id : iter->second) {
1292 str += " " + std::to_string(id);
1293 }
1294 }
1295 return str;
1296 }
1297
StoreTaskDuration(uint32_t taskId,uint64_t totalDuration,uint64_t cpuDuration)1298 void TaskManager::StoreTaskDuration(uint32_t taskId, uint64_t totalDuration, uint64_t cpuDuration)
1299 {
1300 HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDuration", std::to_string(taskId).c_str());
1301 std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
1302 auto iter = taskDurationInfos_.find(taskId);
1303 if (iter == taskDurationInfos_.end()) {
1304 std::pair<uint64_t, uint64_t> durationData = std::make_pair(totalDuration, cpuDuration);
1305 taskDurationInfos_.emplace(taskId, std::move(durationData));
1306 } else {
1307 if (totalDuration != 0) {
1308 iter->second.first = totalDuration;
1309 }
1310 if (cpuDuration != 0) {
1311 iter->second.second = cpuDuration;
1312 }
1313 }
1314 }
1315
GetTaskDuration(uint32_t taskId,std::string durationType)1316 uint64_t TaskManager::GetTaskDuration(uint32_t taskId, std::string durationType)
1317 {
1318 std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
1319 auto iter = taskDurationInfos_.find(taskId);
1320 if (iter == taskDurationInfos_.end()) {
1321 return 0;
1322 }
1323 if (durationType == TASK_TOTAL_TIME) {
1324 return iter->second.first;
1325 } else if (durationType == TASK_CPU_TIME) {
1326 return iter->second.second;
1327 } else if (iter->second.first == 0) {
1328 return 0;
1329 }
1330 return iter->second.first - iter->second.second;
1331 }
1332
RemoveTaskDuration(uint32_t taskId)1333 void TaskManager::RemoveTaskDuration(uint32_t taskId)
1334 {
1335 HILOG_DEBUG("taskpool:: task:%{public}s RemoveTaskDuration", std::to_string(taskId).c_str());
1336 std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
1337 auto iter = taskDurationInfos_.find(taskId);
1338 if (iter != taskDurationInfos_.end()) {
1339 taskDurationInfos_.erase(iter);
1340 }
1341 }
1342
StoreLongTaskInfo(uint32_t taskId,Worker * worker)1343 void TaskManager::StoreLongTaskInfo(uint32_t taskId, Worker* worker)
1344 {
1345 std::unique_lock<std::shared_mutex> lock(longTasksMutex_);
1346 longTasksMap_.emplace(taskId, worker);
1347 }
1348
RemoveLongTaskInfo(uint32_t taskId)1349 void TaskManager::RemoveLongTaskInfo(uint32_t taskId)
1350 {
1351 std::unique_lock<std::shared_mutex> lock(longTasksMutex_);
1352 longTasksMap_.erase(taskId);
1353 }
1354
GetLongTaskInfo(uint32_t taskId)1355 Worker* TaskManager::GetLongTaskInfo(uint32_t taskId)
1356 {
1357 std::shared_lock<std::shared_mutex> lock(longTasksMutex_);
1358 auto iter = longTasksMap_.find(taskId);
1359 return iter != longTasksMap_.end() ? iter->second : nullptr;
1360 }
1361
TerminateTask(uint32_t taskId)1362 void TaskManager::TerminateTask(uint32_t taskId)
1363 {
1364 HILOG_DEBUG("taskpool:: task:%{public}s TerminateTask", std::to_string(taskId).c_str());
1365 auto worker = GetLongTaskInfo(taskId);
1366 if (UNLIKELY(worker == nullptr)) {
1367 return;
1368 }
1369 worker->TerminateTask(taskId);
1370 RemoveLongTaskInfo(taskId);
1371 }
1372
ReleaseTaskData(napi_env env,Task * task,bool shouldDeleteTask)1373 void TaskManager::ReleaseTaskData(napi_env env, Task* task, bool shouldDeleteTask)
1374 {
1375 uint32_t taskId = task->taskId_;
1376 if (shouldDeleteTask) {
1377 RemoveTask(taskId);
1378 }
1379
1380 task->ReleaseData();
1381 task->CancelPendingTask(env);
1382
1383 task->ClearDelayedTimers();
1384
1385 if (task->IsFunctionTask() || task->IsGroupFunctionTask()) {
1386 return;
1387 }
1388 DecreaseRefCount(env, taskId);
1389 RemoveTaskDuration(taskId);
1390 RemovePendingTaskInfo(taskId);
1391 ReleaseCallBackInfo(task);
1392 {
1393 std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1394 for (auto dependentTaskIter = dependentTaskInfos_.begin(); dependentTaskIter != dependentTaskInfos_.end();) {
1395 if (dependentTaskIter->second.find(taskId) != dependentTaskIter->second.end()) {
1396 dependentTaskIter = dependentTaskInfos_.erase(dependentTaskIter);
1397 } else {
1398 ++dependentTaskIter;
1399 }
1400 }
1401 }
1402 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1403 auto dependTaskIter = dependTaskInfos_.find(taskId);
1404 if (dependTaskIter != dependTaskInfos_.end()) {
1405 dependTaskInfos_.erase(dependTaskIter);
1406 }
1407 }
1408
ReleaseCallBackInfo(Task * task)1409 void TaskManager::ReleaseCallBackInfo(Task* task)
1410 {
1411 HILOG_DEBUG("taskpool:: ReleaseCallBackInfo task:%{public}s", std::to_string(task->taskId_).c_str());
1412 std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
1413 if (task->onEnqueuedCallBackInfo_ != nullptr) {
1414 delete task->onEnqueuedCallBackInfo_;
1415 task->onEnqueuedCallBackInfo_ = nullptr;
1416 }
1417
1418 if (task->onStartExecutionCallBackInfo_ != nullptr) {
1419 delete task->onStartExecutionCallBackInfo_;
1420 task->onStartExecutionCallBackInfo_ = nullptr;
1421 }
1422
1423 if (task->onExecutionFailedCallBackInfo_ != nullptr) {
1424 delete task->onExecutionFailedCallBackInfo_;
1425 task->onExecutionFailedCallBackInfo_ = nullptr;
1426 }
1427
1428 if (task->onExecutionSucceededCallBackInfo_ != nullptr) {
1429 delete task->onExecutionSucceededCallBackInfo_;
1430 task->onExecutionSucceededCallBackInfo_ = nullptr;
1431 }
1432
1433 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
1434 if (!task->IsMainThreadTask() && task->onStartExecutionSignal_ != nullptr) {
1435 if (!ConcurrentHelper::IsUvClosing(task->onStartExecutionSignal_)) {
1436 ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_);
1437 } else {
1438 delete task->onStartExecutionSignal_;
1439 task->onStartExecutionSignal_ = nullptr;
1440 }
1441 }
1442 #else
1443 if (task->onStartExecutionSignal_ != nullptr) {
1444 if (!ConcurrentHelper::IsUvClosing(task->onStartExecutionSignal_)) {
1445 ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_);
1446 } else {
1447 delete task->onStartExecutionSignal_;
1448 task->onStartExecutionSignal_ = nullptr;
1449 }
1450 }
1451 #endif
1452 }
1453
StoreTask(Task * task)1454 void TaskManager::StoreTask(Task* task)
1455 {
1456 uint64_t id = reinterpret_cast<uint64_t>(task);
1457 uint32_t taskId = CalculateTaskId(id);
1458 std::lock_guard<std::recursive_mutex> lock(tasksMutex_);
1459 while (tasks_.find(taskId) != tasks_.end()) {
1460 id++;
1461 taskId = CalculateTaskId(id);
1462 }
1463 task->SetTaskId(taskId);
1464 tasks_.emplace(taskId, task);
1465 }
1466
RemoveTask(uint32_t taskId)1467 void TaskManager::RemoveTask(uint32_t taskId)
1468 {
1469 std::lock_guard<std::recursive_mutex> lock(tasksMutex_);
1470 tasks_.erase(taskId);
1471 }
1472
GetTask(uint32_t taskId)1473 Task* TaskManager::GetTask(uint32_t taskId)
1474 {
1475 std::lock_guard<std::recursive_mutex> lock(tasksMutex_);
1476 auto iter = tasks_.find(taskId);
1477 if (iter == tasks_.end()) {
1478 return nullptr;
1479 }
1480 return iter->second;
1481 }
1482
#if defined(ENABLE_TASKPOOL_FFRT)
// Queries the bundle manager service for the bundle info of the current application and
// stores its applicationInfo.isSystemApp value in isSystemApp_.
// On any failure along the way (no system ability manager, no bundle manager service, failed
// cast or failed query) an error is logged and isSystemApp_ is left unchanged.
void TaskManager::UpdateSystemAppFlag()
{
    auto systemAbilityMgr = OHOS::SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
    if (systemAbilityMgr == nullptr) {
        HILOG_ERROR("taskpool:: fail to GetSystemAbility abilityManager is nullptr.");
        return;
    }
    auto remoteObject = systemAbilityMgr->GetSystemAbility(OHOS::BUNDLE_MGR_SERVICE_SYS_ABILITY_ID);
    if (remoteObject == nullptr) {
        HILOG_ERROR("taskpool:: fail to get bundle manager service.");
        return;
    }
    auto bundleManager = OHOS::iface_cast<OHOS::AppExecFwk::IBundleMgr>(remoteObject);
    if (bundleManager == nullptr) {
        HILOG_ERROR("taskpool:: Bundle manager is nullptr.");
        return;
    }
    const auto queryFlags = static_cast<int32_t>(
        OHOS::AppExecFwk::GetBundleInfoFlag::GET_BUNDLE_INFO_WITH_APPLICATION);
    OHOS::AppExecFwk::BundleInfo selfInfo;
    const auto queryResult = bundleManager->GetBundleInfoForSelf(queryFlags, selfInfo);
    if (queryResult != OHOS::ERR_OK) {
        HILOG_ERROR("taskpool:: fail to GetBundleInfoForSelf");
        return;
    }
    isSystemApp_ = selfInfo.applicationInfo.isSystemApp;
}
#endif
1511
#if defined(ENABLE_TASKPOOL_EVENTHANDLER)
// Posts task to mainThreadHandler_ with a delay argument of 0, under the name taskName and
// with the handler priority looked up for priority in TASK_EVENTHANDLER_PRIORITY_MAP.
// Returns whatever the handler's PostTask() returns.
// mainThreadHandler_ is dereferenced without a null check.
bool TaskManager::PostTask(std::function<void()> task, const char* taskName, Priority priority)
{
    const auto handlerPriority = TASK_EVENTHANDLER_PRIORITY_MAP.at(priority);
    return mainThreadHandler_->PostTask(task, taskName, 0, handlerPriority);
}
#endif
1518
CheckTask(uint32_t taskId)1519 bool TaskManager::CheckTask(uint32_t taskId)
1520 {
1521 std::lock_guard<std::recursive_mutex> lock(tasksMutex_);
1522 auto item = tasks_.find(taskId);
1523 return item != tasks_.end();
1524 }
1525
BatchRejectDeferred(napi_env env,std::list<napi_deferred> deferreds,std::string error)1526 void TaskManager::BatchRejectDeferred(napi_env env, std::list<napi_deferred> deferreds, std::string error)
1527 {
1528 if (deferreds.empty()) {
1529 return;
1530 }
1531 napi_value message = ErrorHelper::NewError(env, 0, error.c_str());
1532 for (auto deferred : deferreds) {
1533 napi_reject_deferred(env, deferred, message);
1534 }
1535 }
1536
CalculateTaskId(uint64_t id)1537 uint32_t TaskManager::CalculateTaskId(uint64_t id)
1538 {
1539 size_t hash = std::hash<uint64_t>{}(id);
1540 return static_cast<uint32_t>(hash & MAX_UINT32_T);
1541 }
1542
ClearDependentTask(uint32_t taskId)1543 void TaskManager::ClearDependentTask(uint32_t taskId)
1544 {
1545 HILOG_DEBUG("taskpool:: task:%{public}s ClearDependentTask", std::to_string(taskId).c_str());
1546 RemoveDependTaskByTaskId(taskId);
1547 DequeuePendingTaskInfo(taskId);
1548 std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1549 RemoveDependentTaskByTaskId(taskId);
1550 }
1551
RemoveDependTaskByTaskId(uint32_t taskId)1552 void TaskManager::RemoveDependTaskByTaskId(uint32_t taskId)
1553 {
1554 std::set<uint32_t> dependTaskIds;
1555 {
1556 std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1557 auto iter = dependTaskInfos_.find(taskId);
1558 if (iter == dependTaskInfos_.end()) {
1559 return;
1560 }
1561 dependTaskIds.insert(iter->second.begin(), iter->second.end());
1562 dependTaskInfos_.erase(iter);
1563 }
1564 for (auto dependTaskId : dependTaskIds) {
1565 RemoveDependentTaskInfo(dependTaskId, taskId);
1566 }
1567 }
1568
RemoveDependentTaskByTaskId(uint32_t taskId)1569 void TaskManager::RemoveDependentTaskByTaskId(uint32_t taskId)
1570 {
1571 auto iter = dependentTaskInfos_.find(taskId);
1572 if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
1573 HILOG_DEBUG("taskpool:: dependentTaskInfo empty");
1574 return;
1575 }
1576 for (auto taskIdIter = iter->second.begin(); taskIdIter != iter->second.end();) {
1577 auto taskInfo = DequeuePendingTaskInfo(*taskIdIter);
1578 RemoveDependencyById(taskId, *taskIdIter);
1579 auto id = *taskIdIter;
1580 taskIdIter = iter->second.erase(taskIdIter);
1581 auto task = GetTask(id);
1582 if (task == nullptr) {
1583 continue;
1584 }
1585 if (task->currentTaskInfo_ != nullptr) {
1586 EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority);
1587 }
1588 task->DisposeCanceledTask();
1589 RemoveDependentTaskByTaskId(task->taskId_);
1590 }
1591 }
1592
WriteHisysForFfrtAndUv(Worker * worker,HisyseventParams * hisyseventParams)1593 void TaskManager::WriteHisysForFfrtAndUv(Worker* worker, HisyseventParams* hisyseventParams)
1594 {
1595 #if defined(ENABLE_TASKPOOL_HISYSEVENT)
1596 if (!EnableFfrt() || hisyseventParams == nullptr) {
1597 return;
1598 }
1599 int32_t tid = 0;
1600 if (worker != nullptr) {
1601 auto loop = worker->GetWorkerLoop();
1602 uint64_t loopAddress = reinterpret_cast<uint64_t>(loop);
1603 hisyseventParams->message += "; current runningLoop: " + std::to_string(loopAddress);
1604 tid = worker->tid_;
1605 }
1606 hisyseventParams->tid = tid;
1607 hisyseventParams->pid = getpid();
1608 hisyseventParams->message += "; idleWorkers num: " + std::to_string(idleWorkers_.size());
1609 hisyseventParams->message += "; workers num: " + std::to_string(workers_.size());
1610 int32_t ret = DfxHisysEvent::WriteFfrtAndUv(hisyseventParams);
1611 if (ret < 0) {
1612 HILOG_WARN("taskpool:: HisyseventWrite failed, ret: %{public}s", std::to_string(ret).c_str());
1613 }
1614 #endif
1615 }
1616
CheckTasksAndReportHisysEvent()1617 void TaskManager::CheckTasksAndReportHisysEvent()
1618 {
1619 #if defined(ENABLE_TASKPOOL_HISYSEVENT)
1620 if (!EnableFfrt()) {
1621 return;
1622 }
1623 uint64_t currTime = ConcurrentHelper::GetMilliseconds();
1624 auto taskNum = GetTaskNum();
1625 if (taskNum == 0 || currTime - preDequeneTime_ < UNEXECUTE_TASK_TIME || preDequeneTime_ < reportTime_) {
1626 return;
1627 }
1628 std::string message = "Task scheduling timeout, total task num:" + std::to_string(taskNum);
1629 HisyseventParams* hisyseventParams = new HisyseventParams();
1630 hisyseventParams->methodName = "CheckTasksAndReportHisysEvent";
1631 hisyseventParams->funName = "";
1632 hisyseventParams->logType = "ffrt";
1633 hisyseventParams->code = DfxHisysEvent::TASK_TIMEOUT;
1634 hisyseventParams->message = message;
1635 WriteHisysForFfrtAndUv(nullptr, hisyseventParams);
1636 reportTime_ = currTime;
1637 #endif
1638 }
1639
WorkerAliveAndReport(Worker * worker)1640 void TaskManager::WorkerAliveAndReport(Worker* worker)
1641 {
1642 #if defined(ENABLE_TASKPOOL_HISYSEVENT)
1643 uint64_t workerWaitTime = worker->GetWaitTime();
1644 uint64_t currTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::seconds>(
1645 std::chrono::steady_clock::now().time_since_epoch()).count());
1646 uint64_t intervalTime = currTime - workerWaitTime;
1647 if (!worker->IsNeedReport(intervalTime)) {
1648 return;
1649 }
1650 std::string message = "worker uv alive, may have handle leak";
1651 HisyseventParams* hisyseventParams = new HisyseventParams();
1652 hisyseventParams->methodName = "WorkerAliveAndReport";
1653 hisyseventParams->funName = "";
1654 hisyseventParams->logType = "ffrt";
1655 hisyseventParams->code = DfxHisysEvent::WORKER_ALIVE;
1656 hisyseventParams->message = message;
1657 hisyseventParams->waitTime = intervalTime;
1658 WriteHisysForFfrtAndUv(worker, hisyseventParams);
1659 worker->IncreaseReportCount();
1660 #endif
1661 }
1662
UvReportHisysEvent(Worker * worker,std::string methodName,std::string funName,std::string message,int32_t code)1663 void TaskManager::UvReportHisysEvent(Worker* worker, std::string methodName, std::string funName, std::string message,
1664 int32_t code)
1665 {
1666 #if defined(ENABLE_TASKPOOL_HISYSEVENT)
1667 HisyseventParams* hisyseventParams = new HisyseventParams();
1668 hisyseventParams->methodName = methodName;
1669 hisyseventParams->funName = funName;
1670 hisyseventParams->logType = "uv";
1671 hisyseventParams->code = code;
1672 hisyseventParams->message = message;
1673 WriteHisysForFfrtAndUv(worker, hisyseventParams);
1674 #endif
1675 }
1676 } // namespace Commonlibrary::Concurrent::TaskPoolModule