• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "task_manager.h"
17 
18 #include <securec.h>
19 #include <thread>
20 
21 #include "async_runner_manager.h"
22 #if defined(ENABLE_TASKPOOL_FFRT)
23 #include "bundle_info.h"
24 #include "bundle_mgr_interface.h"
25 #include "bundle_mgr_proxy.h"
26 #include "iservice_registry.h"
27 #include "parameters.h"
28 #include "status_receiver_interface.h"
29 #include "system_ability_definition.h"
30 #include "c/executor_task.h"
31 #include "ffrt_inner.h"
32 #endif
33 #include "sys_timer.h"
34 #include "helper/hitrace_helper.h"
35 #include "taskpool.h"
36 #include "task_group_manager.h"
37 
38 namespace Commonlibrary::Concurrent::TaskPoolModule {
39 using namespace OHOS::JsSysModule;
40 template void TaskManager::TryExpandWithCheckIdle<true>();
41 template void TaskManager::TryExpandWithCheckIdle<false>();
42 
// ---- task scheduling ----
static constexpr int8_t HIGH_PRIORITY_TASK_COUNT = 5;
static constexpr int8_t MEDIUM_PRIORITY_TASK_COUNT = 5;
// 100: 100ms
static constexpr int32_t MAX_TASK_DURATION = 100;
// 60000: 1min
static constexpr uint32_t UNEXECUTE_TASK_TIME = 60000;

// ---- thread-count tuning ----
static constexpr uint32_t STEP_SIZE = 2;
static constexpr uint32_t DEFAULT_THREADS = 3;
// 1: minimum thread num when idle
static constexpr uint32_t DEFAULT_MIN_THREADS = 1;
// 4: try to release 4 threads every time
static constexpr uint32_t SHRINK_STEP = 4;
// 2: 2 intervals later will release the thread
[[maybe_unused]] static constexpr uint32_t IDLE_THRESHOLD = 2;

// ---- time thresholds, all in milliseconds ----
// 180000: 3min
static constexpr uint32_t MIN_TIMEOUT_TIME = 180000;
// 600000: 10min
static constexpr uint32_t MAX_TIMEOUT_TIME = 600000;
// 30000: 30s
static constexpr int32_t MAX_IDLE_TIME = 30000;
// 30000: 30s
static constexpr uint32_t TRIGGER_INTERVAL = 30000;
// 10: ms, trigger recheck expansion interval
static constexpr uint32_t TRIGGER_EXPAND_INTERVAL = 10;

// ---- misc ----
// 0xFFFFFFFF: max uint32_t
static constexpr uint32_t MAX_UINT32_T = 0xFFFFFFFF;
58 
#if defined(ENABLE_TASKPOOL_EVENTHANDLER)
// Lookup table from a taskpool Priority to the EventQueue priority used for it (HIGH listed first).
static const std::map<Priority, OHOS::AppExecFwk::EventQueue::Priority> TASK_EVENTHANDLER_PRIORITY_MAP = {
    {Priority::HIGH, OHOS::AppExecFwk::EventQueue::Priority::IMMEDIATE},
    {Priority::MEDIUM, OHOS::AppExecFwk::EventQueue::Priority::HIGH},
    {Priority::LOW, OHOS::AppExecFwk::EventQueue::Priority::LOW},
    {Priority::IDLE, OHOS::AppExecFwk::EventQueue::Priority::IDLE},
};
#endif
67 
68 // ----------------------------------- TaskManager ----------------------------------------
GetInstance()69 TaskManager& TaskManager::GetInstance()
70 {
71     static TaskManager manager;
72     return manager;
73 }
74 
TaskManager()75 TaskManager::TaskManager()
76 {
77     for (size_t i = 0; i < taskQueues_.size(); i++) {
78         std::unique_ptr<ExecuteQueue> taskQueue = std::make_unique<ExecuteQueue>();
79         taskQueues_[i] = std::move(taskQueue);
80     }
81 }
82 
~TaskManager()83 TaskManager::~TaskManager()
84 {
85     HILOG_INFO("taskpool:: ~TaskManager");
86     if (balanceTimer_ == nullptr) {
87         HILOG_ERROR("taskpool:: balanceTimer_ is nullptr");
88     } else {
89         uv_timer_stop(balanceTimer_);
90         ConcurrentHelper::UvHandleClose(balanceTimer_);
91     }
92 
93     if (expandTimer_ == nullptr) {
94         HILOG_ERROR("taskpool:: expandTimer_ is nullptr");
95     } else {
96         uv_timer_stop(expandTimer_);
97         ConcurrentHelper::UvHandleClose(expandTimer_);
98     }
99 
100     ConcurrentHelper::UvHandleClose(dispatchHandle_);
101 
102     if (loop_ != nullptr) {
103         uv_stop(loop_);
104     }
105 
106     {
107         std::lock_guard<std::recursive_mutex> lock(workersMutex_);
108         for (auto& worker : workers_) {
109             delete worker;
110         }
111         workers_.clear();
112     }
113 
114     {
115         std::lock_guard<std::mutex> lock(callbackMutex_);
116         for (auto& [_, callbackPtr] : callbackTable_) {
117             if (callbackPtr == nullptr) {
118                 continue;
119             }
120             callbackPtr.reset();
121         }
122         callbackTable_.clear();
123     }
124 
125     {
126         std::lock_guard<std::recursive_mutex> lock(tasksMutex_);
127         for (auto& [_, task] : tasks_) {
128             delete task;
129             task = nullptr;
130         }
131         tasks_.clear();
132     }
133     CountTraceForWorker();
134 }
135 
CountTraceForWorker()136 void TaskManager::CountTraceForWorker()
137 {
138     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
139     int64_t threadNum = static_cast<int64_t>(workers_.size());
140     int64_t idleWorkers = static_cast<int64_t>(idleWorkers_.size());
141     int64_t timeoutWorkers = static_cast<int64_t>(timeoutWorkers_.size());
142     HITRACE_HELPER_COUNT_TRACE("timeoutThreadNum", timeoutWorkers);
143     HITRACE_HELPER_COUNT_TRACE("threadNum", threadNum);
144     HITRACE_HELPER_COUNT_TRACE("runningThreadNum", threadNum - idleWorkers);
145     HITRACE_HELPER_COUNT_TRACE("idleThreadNum", idleWorkers);
146 }
147 
GetThreadInfos(napi_env env)148 napi_value TaskManager::GetThreadInfos(napi_env env)
149 {
150     napi_value threadInfos = nullptr;
151     napi_create_array(env, &threadInfos);
152     {
153         std::lock_guard<std::recursive_mutex> lock(workersMutex_);
154         int32_t i = 0;
155         for (auto& worker : workers_) {
156             if (worker->workerEnv_ == nullptr) {
157                 continue;
158             }
159             napi_value tid = NapiHelper::CreateUint32(env, static_cast<uint32_t>(worker->tid_));
160             napi_value priority = NapiHelper::CreateUint32(env, static_cast<uint32_t>(worker->priority_));
161 
162             napi_value taskId = nullptr;
163             napi_create_array(env, &taskId);
164             int32_t j = 0;
165             {
166                 std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_);
167                 for (auto& currentId : worker->currentTaskId_) {
168                     napi_value id = NapiHelper::CreateUint32(env, currentId);
169                     napi_set_element(env, taskId, j, id);
170                     j++;
171                 }
172             }
173             napi_value threadInfo = nullptr;
174             napi_create_object(env, &threadInfo);
175             napi_set_named_property(env, threadInfo, "tid", tid);
176             napi_set_named_property(env, threadInfo, "priority", priority);
177             napi_set_named_property(env, threadInfo, "taskIds", taskId);
178             napi_set_element(env, threadInfos, i, threadInfo);
179             i++;
180         }
181     }
182     return threadInfos;
183 }
184 
GetTaskInfos(napi_env env)185 napi_value TaskManager::GetTaskInfos(napi_env env)
186 {
187     napi_value taskInfos = nullptr;
188     napi_create_array(env, &taskInfos);
189     {
190         std::lock_guard<std::recursive_mutex> lock(tasksMutex_);
191         int32_t i = 0;
192         for (const auto& [_, task] : tasks_) {
193             if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::DELAYED ||
194                 task->taskState_ == ExecuteState::FINISHED) {
195                 continue;
196             }
197             napi_value taskInfoValue = NapiHelper::CreateObject(env);
198             napi_value taskId = NapiHelper::CreateUint32(env, task->taskId_);
199             napi_value name = nullptr;
200             napi_create_string_utf8(env, task->name_.c_str(), task->name_.size(), &name);
201             napi_set_named_property(env, taskInfoValue, "name", name);
202             ExecuteState state = task->taskState_;
203             uint64_t duration = 0;
204             if (state == ExecuteState::RUNNING || state == ExecuteState::ENDING) {
205                 duration = ConcurrentHelper::GetMilliseconds() - task->startTime_;
206             }
207             napi_value stateValue = NapiHelper::CreateUint32(env, static_cast<uint32_t>(state));
208             napi_set_named_property(env, taskInfoValue, "taskId", taskId);
209             napi_set_named_property(env, taskInfoValue, "state", stateValue);
210             napi_value durationValue = NapiHelper::CreateUint32(env, duration);
211             napi_set_named_property(env, taskInfoValue, "duration", durationValue);
212             napi_set_element(env, taskInfos, i, taskInfoValue);
213             i++;
214         }
215     }
216     return taskInfos;
217 }
218 
UpdateExecutedInfo(uint64_t duration)219 void TaskManager::UpdateExecutedInfo(uint64_t duration)
220 {
221     totalExecTime_ += duration;
222     totalExecCount_++;
223 }
224 
ComputeSuitableThreadNum()225 uint32_t TaskManager::ComputeSuitableThreadNum()
226 {
227     uint32_t targetNum = ComputeSuitableIdleNum() + GetRunningWorkers();
228     return targetNum;
229 }
230 
ComputeSuitableIdleNum()231 uint32_t TaskManager::ComputeSuitableIdleNum()
232 {
233     uint32_t targetNum = 0;
234     if (GetNonIdleTaskNum() != 0 && totalExecCount_ == 0) {
235         // this branch is used for avoiding time-consuming tasks that may block the taskpool
236         targetNum = std::min(STEP_SIZE, GetNonIdleTaskNum());
237     } else if (totalExecCount_ != 0) {
238         auto durationPerTask = static_cast<double>(totalExecTime_) / totalExecCount_;
239         uint32_t result = std::ceil(durationPerTask * GetNonIdleTaskNum() / MAX_TASK_DURATION);
240         targetNum = std::min(result, GetNonIdleTaskNum());
241     }
242     return targetNum;
243 }
244 
CheckForBlockedWorkers()245 void TaskManager::CheckForBlockedWorkers()
246 {
247     // the threshold will be dynamically modified to provide more flexibility in detecting exceptions
248     // if the thread num has reached the limit and the idle worker is not available, a short time will be used,
249     // else we will choose the longer one
250     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
251     bool needChecking = false;
252     bool state = (GetThreadNum() == ConcurrentHelper::GetMaxThreads()) && (GetIdleWorkers() == 0);
253     uint64_t threshold = state ? MIN_TIMEOUT_TIME : MAX_TIMEOUT_TIME;
254     for (auto iter = workers_.begin(); iter != workers_.end(); iter++) {
255         auto worker = *iter;
256         // if the worker thread is idle, just skip it, and only the worker in running state can be marked as timeout
257         // if the worker is executing the longTask, we will not do the check
258         if ((worker->state_ == WorkerState::IDLE) || (worker->IsExecutingLongTask()) ||
259             (ConcurrentHelper::GetMilliseconds() - worker->startTime_ < threshold) ||
260             !worker->UpdateWorkerState(WorkerState::RUNNING, WorkerState::BLOCKED)) {
261             continue;
262         }
263         // When executing the promise task, the worker state may not be updated and will be
264         // marked as 'BLOCKED', so we should exclude this situation.
265         // Besides, if the worker is not executing sync tasks or micro tasks, it may handle
266         // the task like I/O in uv threads, we should also exclude this situation.
267         auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
268         if (worker->idleState_ && !workerEngine->IsExecutingPendingJob()) {
269             if (!workerEngine->HasWaitingRequest()) {
270                 worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::IDLE);
271             } else {
272                 worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::RUNNING);
273                 worker->startTime_ = ConcurrentHelper::GetMilliseconds();
274             }
275             continue;
276         }
277 
278         HILOG_INFO("taskpool:: The worker has been marked as timeout.");
279         // If the current worker has a longTask and is not executing, we will only interrupt it.
280         if (worker->HasLongTask()) {
281             continue;
282         }
283         needChecking = true;
284         idleWorkers_.erase(worker);
285         timeoutWorkers_.insert(worker);
286     }
287     // should trigger the check when we have marked and removed workers
288     if (UNLIKELY(needChecking)) {
289         TryExpand();
290     }
291 }
292 
TryTriggerExpand()293 void TaskManager::TryTriggerExpand()
294 {
295     // post the signal to notify the monitor thread to expand
296     if (UNLIKELY(!isHandleInited_)) {
297         NotifyExecuteTask();
298         needChecking_ = true;
299         HILOG_WARN("taskpool:: the dispatchHandle_ is nullptr");
300         return;
301     }
302     uv_async_send(dispatchHandle_);
303 }
304 
305 #if defined(OHOS_PLATFORM)
306 // read /proc/[pid]/task/[tid]/stat to get the number of idle threads.
ReadThreadInfo(pid_t tid,char * buf,uint32_t size)307 bool TaskManager::ReadThreadInfo(pid_t tid, char* buf, uint32_t size)
308 {
309     char path[128]; // 128: buffer for path
310     pid_t pid = getpid();
311     ssize_t bytesLen = -1;
312     int ret = snprintf_s(path, sizeof(path), sizeof(path) - 1, "/proc/%d/task/%d/stat", pid, tid);
313     if (ret < 0) {
314         HILOG_ERROR("snprintf_s failed");
315         return false;
316     }
317     int fd = open(path, O_RDONLY | O_NONBLOCK);
318     if (UNLIKELY(fd == -1)) {
319         return false;
320     }
321     bytesLen = read(fd, buf, size - 1);
322     close(fd);
323     if (bytesLen <= 0) {
324         HILOG_ERROR("taskpool:: failed to read %{public}s", path);
325         return false;
326     }
327     buf[bytesLen] = '\0';
328     return true;
329 }
330 
GetIdleWorkers()331 uint32_t TaskManager::GetIdleWorkers()
332 {
333     char buf[4096]; // 4096: buffer for thread info
334     uint32_t idleCount = 0;
335     std::unordered_set<pid_t> tids {};
336     {
337         std::lock_guard<std::recursive_mutex> lock(workersMutex_);
338         for (auto& worker : idleWorkers_) {
339 #if defined(ENABLE_TASKPOOL_FFRT)
340             if (worker->ffrtTaskHandle_ != nullptr) {
341                 if (worker->GetWaitTime() > 0) {
342                     idleCount++;
343                 }
344                 continue;
345             }
346 #endif
347             tids.emplace(worker->tid_);
348         }
349     }
350     // The ffrt thread does not read thread info
351     for (auto tid : tids) {
352         if (!ReadThreadInfo(tid, buf, sizeof(buf))) {
353             continue;
354         }
355         char state;
356         if (sscanf_s(buf, "%*d %*s %c", &state, sizeof(state)) != 1) { // 1: state
357             HILOG_ERROR("taskpool: sscanf_s of state failed for %{public}c", state);
358             return 0;
359         }
360         if (state == 'S') {
361             idleCount++;
362         }
363     }
364     return idleCount;
365 }
366 
GetIdleWorkersList(uint32_t step)367 void TaskManager::GetIdleWorkersList(uint32_t step)
368 {
369     char buf[4096]; // 4096: buffer for thread info
370     for (auto& worker : idleWorkers_) {
371 #if defined(ENABLE_TASKPOOL_FFRT)
372         if (worker->ffrtTaskHandle_ != nullptr) {
373             uint64_t workerWaitTime = worker->GetWaitTime();
374             bool isWorkerLoopActive = worker->IsLoopActive();
375             if (workerWaitTime == 0) {
376                 continue;
377             }
378             uint64_t currTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::seconds>(
379                 std::chrono::steady_clock::now().time_since_epoch()).count());
380             if (!isWorkerLoopActive) {
381                 freeList_.emplace_back(worker);
382             } else {
383                 // worker uv alive, and will be free in 2 intervals if not wake
384                 WorkerAliveAndReport(worker);
385             }
386             continue;
387         }
388 #endif
389         if (!ReadThreadInfo(worker->tid_, buf, sizeof(buf))) {
390             continue;
391         }
392         char state;
393         uint64_t utime;
394         if (sscanf_s(buf, "%*d %*s %c %*d %*d %*d %*d %*d %*u %*lu %*lu %*lu %*lu %llu",
395             &state, sizeof(state), &utime) != 2) { // 2: state and utime
396             HILOG_ERROR("taskpool: sscanf_s of state failed for %{public}d", worker->tid_);
397             return;
398         }
399         if (state != 'S' || utime != worker->lastCpuTime_) {
400             worker->idleCount_ = 0;
401             worker->lastCpuTime_ = utime;
402             continue;
403         }
404         if (++worker->idleCount_ >= IDLE_THRESHOLD) {
405             freeList_.emplace_back(worker);
406         }
407     }
408 }
409 
TriggerShrink(uint32_t step)410 void TaskManager::TriggerShrink(uint32_t step)
411 {
412     GetIdleWorkersList(step);
413     step = std::min(step, static_cast<uint32_t>(freeList_.size()));
414     uint32_t count = 0;
415     for (size_t i = 0; i < freeList_.size(); i++) {
416         auto worker = freeList_[i];
417         if (worker->state_ != WorkerState::IDLE || worker->HasLongTask()) {
418             continue;
419         }
420         auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
421         if (idleTime < MAX_IDLE_TIME || worker->HasRunningTasks()) {
422             continue;
423         }
424         idleWorkers_.erase(worker);
425         HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
426         worker->PostReleaseSignal();
427         if (++count == step) {
428             break;
429         }
430     }
431     freeList_.clear();
432 }
433 #else
GetIdleWorkers()434 uint32_t TaskManager::GetIdleWorkers()
435 {
436     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
437     return idleWorkers_.size();
438 }
439 
TriggerShrink(uint32_t step)440 void TaskManager::TriggerShrink(uint32_t step)
441 {
442     for (uint32_t i = 0; i < step; i++) {
443         // try to free the worker that idle time meets the requirement
444         auto iter = std::find_if(idleWorkers_.begin(), idleWorkers_.end(), [](Worker* worker) {
445             auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
446             return idleTime > MAX_IDLE_TIME && !worker->HasRunningTasks() && !worker->HasLongTask();
447         });
448         // remove it from all sets
449         if (iter != idleWorkers_.end()) {
450             auto worker = *iter;
451             idleWorkers_.erase(worker);
452             HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
453             worker->PostReleaseSignal();
454         }
455     }
456 }
457 #endif
458 
NotifyShrink(uint32_t targetNum)459 void TaskManager::NotifyShrink(uint32_t targetNum)
460 {
461     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
462     uint32_t workerCount = workers_.size();
463     uint32_t minThread = ConcurrentHelper::IsLowMemory() ? 0 : DEFAULT_MIN_THREADS;
464     CheckTasksAndReportHisysEvent();
465     // update the maxThreads_ periodically
466     maxThreads_ = ConcurrentHelper::GetMaxThreads();
467     if (minThread == 0) {
468         HILOG_INFO("taskpool:: the system now is under low memory");
469     }
470     if (workerCount > minThread && workerCount > targetNum) {
471         targetNum = std::max(minThread, targetNum);
472         uint32_t step = std::min(workerCount - targetNum, SHRINK_STEP);
473         TriggerShrink(step);
474     }
475     // remove all timeout workers
476     for (auto iter = timeoutWorkers_.begin(); iter != timeoutWorkers_.end();) {
477         auto worker = *iter;
478         if (workers_.find(worker) == workers_.end()) {
479             HILOG_WARN("taskpool:: current worker maybe release");
480             iter = timeoutWorkers_.erase(iter);
481         } else if (!worker->HasRunningTasks()) {
482             HILOG_DEBUG("taskpool:: try to release timeout thread: %{public}d", worker->tid_);
483             worker->PostReleaseSignal();
484             timeoutWorkers_.erase(iter++);
485             return;
486         } else {
487             iter++;
488         }
489     }
490     uint32_t idleNum = idleWorkers_.size();
491     // System memory state is moderate and the worker has exeuted tasks, we will try to release it
492     if (ConcurrentHelper::IsModerateMemory() && workerCount == idleNum && workerCount == DEFAULT_MIN_THREADS) {
493         auto worker = *(idleWorkers_.begin());
494         // worker that has longTask should not be released
495         if (worker == nullptr || worker->HasLongTask()) {
496             return;
497         }
498         if (worker->hasExecuted_) { // worker that hasn't execute any tasks should not be released
499             TriggerShrink(DEFAULT_MIN_THREADS);
500             return;
501         }
502     }
503 
504     // Create a worker for performance
505     if (!ConcurrentHelper::IsLowMemory() && workers_.empty()) {
506         CreateWorkers(hostEnv_);
507     }
508     // stop the timer
509     if ((workerCount == idleNum && workerCount <= minThread) && timeoutWorkers_.empty()) {
510         suspend_ = true;
511         uv_timer_stop(balanceTimer_);
512         HILOG_DEBUG("taskpool:: timer will be suspended");
513     }
514 }
515 
TriggerLoadBalance(const uv_timer_t * req)516 void TaskManager::TriggerLoadBalance([[maybe_unused]] const uv_timer_t* req)
517 {
518     TaskManager& taskManager = TaskManager::GetInstance();
519     taskManager.CheckForBlockedWorkers();
520     uint32_t targetNum = taskManager.ComputeSuitableThreadNum();
521     taskManager.NotifyShrink(targetNum);
522     taskManager.CountTraceForWorker();
523 }
524 
DispatchAndTryExpand(const uv_async_t * req)525 void TaskManager::DispatchAndTryExpand([[maybe_unused]] const uv_async_t* req)
526 {
527     TaskManager& taskManager = TaskManager::GetInstance();
528     taskManager.DispatchAndTryExpandInner();
529 }
530 
DispatchAndTryExpandInner()531 void TaskManager::DispatchAndTryExpandInner()
532 {
533     // dispatch task in the TaskPoolManager thread
534     NotifyExecuteTask();
535     // do not check the worker idleTime first
536     TryExpandWithCheckIdle<false>();
537     if (!timerTriggered_ && GetNonIdleTaskNum() != 0) {
538         timerTriggered_ = true;
539         // use timer to check worker idle time after dispatch task
540         // if worker has been blocked or fd is broken, taskpool can expand automatically
541         uv_timer_start(expandTimer_, reinterpret_cast<uv_timer_cb>(TryExpand), TRIGGER_EXPAND_INTERVAL, 0);
542     }
543 }
544 
TryExpand(const uv_timer_t * req)545 void TaskManager::TryExpand([[maybe_unused]] const uv_timer_t* req)
546 {
547     TaskManager& taskManager = TaskManager::GetInstance();
548     taskManager.TryExpandWithCheckIdle<true>();
549 }
550 
551 template <bool needCheckIdle>
TryExpandWithCheckIdle()552 void TaskManager::TryExpandWithCheckIdle()
553 {
554     if (GetNonIdleTaskNum() == 0) {
555         return;
556     }
557 
558     needChecking_ = false;
559     timerTriggered_ = false;
560     uint32_t targetNum = ComputeSuitableIdleNum();
561     uint32_t workerCount = 0;
562     uint32_t idleCount = 0;
563     uint32_t timeoutWorkers = 0;
564     {
565         std::lock_guard<std::recursive_mutex> lock(workersMutex_);
566         workerCount = workers_.size();
567         timeoutWorkers = timeoutWorkers_.size();
568         if constexpr (needCheckIdle) {
569             uint64_t currTime = ConcurrentHelper::GetMilliseconds();
570             idleCount = static_cast<uint32_t>(std::count_if(idleWorkers_.begin(), idleWorkers_.end(),
571                 [currTime](const auto& worker) {
572                     return worker->IsRunnable(currTime);
573                 }));
574         } else {
575             idleCount = idleWorkers_.size();
576         }
577     }
578     uint32_t maxThreads = std::max(maxThreads_, DEFAULT_THREADS);
579     maxThreads = (timeoutWorkers == 0) ? maxThreads : maxThreads + 2; // 2: extra threads
580     if (workerCount < maxThreads && idleCount < targetNum) {
581         // Prevent the total number of workers from exceeding maxThreads
582         uint32_t step = std::min(std::min(maxThreads, targetNum) - idleCount, maxThreads - workerCount);
583         step = step >= expandingCount_ ? step - expandingCount_ : 0;
584         if (step == 0) {
585             return;
586         }
587         CreateWorkers(hostEnv_, step);
588         HILOG_INFO("taskpool:: maxThreads: %{public}u, created num: %{public}u, total num: %{public}u",
589             maxThreads, step, GetThreadNum());
590     }
591     if (UNLIKELY(suspend_)) {
592         suspend_ = false;
593         uv_timer_again(balanceTimer_);
594     }
595 }
596 
RunTaskManager()597 void TaskManager::RunTaskManager()
598 {
599     loop_ = uv_loop_new();
600     if (loop_ == nullptr) { // LCOV_EXCL_BR_LINE
601         HILOG_FATAL("taskpool:: new loop failed.");
602         return;
603     }
604     ConcurrentHelper::UvHandleInit(loop_, dispatchHandle_, TaskManager::DispatchAndTryExpand);
605     balanceTimer_ = new uv_timer_t;
606     uv_timer_init(loop_, balanceTimer_);
607     uv_timer_start(balanceTimer_, reinterpret_cast<uv_timer_cb>(TaskManager::TriggerLoadBalance), 0, TRIGGER_INTERVAL);
608 
609     expandTimer_ = new uv_timer_t;
610     uv_timer_init(loop_, expandTimer_);
611 
612     isHandleInited_ = true;
613 #if defined IOS_PLATFORM || defined MAC_PLATFORM
614     pthread_setname_np("OS_TaskManager");
615 #else
616     pthread_setname_np(pthread_self(), "OS_TaskManager");
617 #endif
618     if (UNLIKELY(needChecking_)) {
619         needChecking_ = false;
620         uv_async_send(dispatchHandle_);
621     }
622     uv_run(loop_, UV_RUN_DEFAULT);
623     if (loop_ != nullptr) {
624         uv_loop_delete(loop_);
625     }
626 }
627 
CancelTask(napi_env env,uint32_t taskId)628 void TaskManager::CancelTask(napi_env env, uint32_t taskId)
629 {
630     // 1. Cannot find taskInfo by executeId, throw error
631     // 2. Find executing taskInfo, skip it
632     // 3. Find waiting taskInfo, cancel it
633     // 4. Find canceled taskInfo, skip it
634     std::string strTrace = "CancelTask: taskId: " + std::to_string(taskId);
635     HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
636     HITRACE_HELPER_METER_NAME(strTrace);
637     Task* task = GetTask(taskId);
638     if (task == nullptr) {
639         std::string errMsg = "taskpool:: the task may not exist";
640         HILOG_ERROR("%{public}s", errMsg.c_str());
641         ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
642         return;
643     }
644     if (task->taskState_ == ExecuteState::CANCELED) {
645         HILOG_DEBUG("taskpool:: task has been canceled");
646         return;
647     }
648     if (task->IsGroupCommonTask()) {
649         // when task is a group common task, still check the state
650         if (task->currentTaskInfo_ == nullptr || task->taskState_ == ExecuteState::NOT_FOUND ||
651             task->taskState_ == ExecuteState::FINISHED || task->taskState_ == ExecuteState::ENDING) {
652             std::string errMsg = "taskpool:: task is not executed or has been executed";
653             HILOG_ERROR("%{public}s", errMsg.c_str());
654             ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
655             return;
656         }
657         TaskGroup* taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(task->groupId_);
658         if (taskGroup == nullptr) {
659             return;
660         }
661         return taskGroup->CancelGroupTask(env, task->taskId_);
662     }
663     if (task->IsPeriodicTask()) {
664         task->taskState_.exchange(ExecuteState::CANCELED);
665         return;
666     }
667     if (task->IsSeqRunnerTask()) {
668         CancelSeqRunnerTask(env, task);
669         return;
670     }
671     if (task->IsAsyncRunnerTask()) {
672         AsyncRunnerManager::GetInstance().CancelAsyncRunnerTask(env, task);
673         return;
674     }
675     ExecuteState state = ExecuteState::NOT_FOUND;
676     {
677         std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
678         if (task->taskState_ == ExecuteState::CANCELED) {
679             HILOG_DEBUG("taskpool:: task has been canceled");
680             return;
681         }
682         if ((task->currentTaskInfo_ == nullptr && task->taskState_ != ExecuteState::DELAYED) ||
683             task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED ||
684             task->taskState_ == ExecuteState::ENDING) {
685             std::string errMsg = "taskpool:: task is not executed or has been executed";
686             HILOG_ERROR("%{public}s", errMsg.c_str());
687             ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
688             return;
689         }
690         state = task->taskState_.exchange(ExecuteState::CANCELED);
691     }
692     if (task->IsSameEnv(env)) {
693         task->CancelInner(state);
694         return;
695     }
696     CancelTaskMessage* message = new CancelTaskMessage(state, task->taskId_);
697     task->TriggerCancel(message);
698 }
699 
CancelSeqRunnerTask(napi_env env,Task * task)700 void TaskManager::CancelSeqRunnerTask(napi_env env, Task* task)
701 {
702     {
703         std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
704         if (task->taskState_ != ExecuteState::FINISHED) {
705             task->taskState_ = ExecuteState::CANCELED;
706             return;
707         }
708     }
709     std::string errMsg = "taskpool:: sequenceRunner task has been executed";
710     HILOG_ERROR("%{public}s", errMsg.c_str());
711     ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
712 }
713 
NotifyWorkerIdle(Worker * worker)714 void TaskManager::NotifyWorkerIdle(Worker* worker)
715 {
716     {
717         std::lock_guard<std::recursive_mutex> lock(workersMutex_);
718         if (worker->state_ == WorkerState::BLOCKED) {
719             return;
720         }
721         idleWorkers_.insert(worker);
722     }
723     if (GetTaskNum() != 0) {
724         NotifyExecuteTask();
725     }
726     CountTraceForWorker();
727 }
728 
NotifyWorkerCreated(Worker * worker)729 void TaskManager::NotifyWorkerCreated(Worker* worker)
730 {
731     NotifyWorkerIdle(worker);
732     expandingCount_--;
733 }
734 
NotifyWorkerAdded(Worker * worker)735 void TaskManager::NotifyWorkerAdded(Worker* worker)
736 {
737     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
738     workers_.insert(worker);
739     HILOG_DEBUG("taskpool:: a new worker has been added and the current num is %{public}zu", workers_.size());
740 }
741 
NotifyWorkerRunning(Worker * worker)742 void TaskManager::NotifyWorkerRunning(Worker* worker)
743 {
744     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
745     idleWorkers_.erase(worker);
746     CountTraceForWorker();
747 }
748 
GetRunningWorkers()749 uint32_t TaskManager::GetRunningWorkers()
750 {
751     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
752     return std::count_if(workers_.begin(), workers_.end(), [](const auto& worker) {
753         return worker->HasRunningTasks();
754     });
755 }
756 
GetTimeoutWorkers()757 uint32_t TaskManager::GetTimeoutWorkers()
758 {
759     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
760     return timeoutWorkers_.size();
761 }
762 
GetTaskNum()763 uint32_t TaskManager::GetTaskNum()
764 {
765     std::lock_guard<std::mutex> lock(taskQueuesMutex_);
766     uint32_t sum = 0;
767     for (const auto& elements : taskQueues_) {
768         sum += elements->GetTaskNum();
769     }
770     return sum;
771 }
772 
GetNonIdleTaskNum()773 uint32_t TaskManager::GetNonIdleTaskNum()
774 {
775     return nonIdleTaskNum_;
776 }
777 
IncreaseNumIfNoIdle(Priority priority)778 void TaskManager::IncreaseNumIfNoIdle(Priority priority)
779 {
780     if (priority != Priority::IDLE) {
781         ++nonIdleTaskNum_;
782     }
783 }
784 
DecreaseNumIfNoIdle(Priority priority)785 void TaskManager::DecreaseNumIfNoIdle(Priority priority)
786 {
787     if (priority != Priority::IDLE) {
788         --nonIdleTaskNum_;
789     }
790 }
791 
GetThreadNum()792 uint32_t TaskManager::GetThreadNum()
793 {
794     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
795     return workers_.size();
796 }
797 
EnqueueTaskId(uint32_t taskId,Priority priority)798 void TaskManager::EnqueueTaskId(uint32_t taskId, Priority priority)
799 {
800     {
801         std::lock_guard<std::mutex> lock(taskQueuesMutex_);
802         IncreaseNumIfNoIdle(priority);
803         taskQueues_[priority]->EnqueueTaskId(taskId);
804     }
805     TryTriggerExpand();
806     Task* task = GetTask(taskId);
807     if (task == nullptr) {
808         HILOG_FATAL("taskpool:: task is nullptr");
809         return;
810     }
811     task->IncreaseTaskRefCount();
812     if (task->onEnqueuedCallBackInfo_ != nullptr) {
813         task->ExecuteListenerCallback(task->onEnqueuedCallBackInfo_);
814     }
815 }
816 
EraseWaitingTaskId(uint32_t taskId,Priority priority)817 bool TaskManager::EraseWaitingTaskId(uint32_t taskId, Priority priority)
818 {
819     std::lock_guard<std::mutex> lock(taskQueuesMutex_);
820     if (!taskQueues_[priority]->EraseWaitingTaskId(taskId)) {
821         HILOG_WARN("taskpool:: taskId is not in executeQueue when cancel");
822         return false;
823     }
824     return true;
825 }
826 
DequeueTaskId()827 std::pair<uint32_t, Priority> TaskManager::DequeueTaskId()
828 {
829     std::lock_guard<std::mutex> lock(taskQueuesMutex_);
830     auto& highTaskQueue = taskQueues_[Priority::HIGH];
831     if (!highTaskQueue->IsEmpty() && highPrioExecuteCount_ < HIGH_PRIORITY_TASK_COUNT) {
832         highPrioExecuteCount_++;
833         return GetTaskByPriority(highTaskQueue, Priority::HIGH);
834     }
835     highPrioExecuteCount_ = 0;
836 
837     auto& mediumTaskQueue = taskQueues_[Priority::MEDIUM];
838     if (!mediumTaskQueue->IsEmpty() && mediumPrioExecuteCount_ < MEDIUM_PRIORITY_TASK_COUNT) {
839         mediumPrioExecuteCount_++;
840         return GetTaskByPriority(mediumTaskQueue, Priority::MEDIUM);
841     }
842     mediumPrioExecuteCount_ = 0;
843 
844     auto& lowTaskQueue = taskQueues_[Priority::LOW];
845     if (!lowTaskQueue->IsEmpty()) {
846         return GetTaskByPriority(lowTaskQueue, Priority::LOW);
847     }
848 
849     auto& idleTaskQueue = taskQueues_[Priority::IDLE];
850     if (highTaskQueue->IsEmpty() && mediumTaskQueue->IsEmpty() && !idleTaskQueue->IsEmpty() && IsChooseIdle()) {
851         return GetTaskByPriority(idleTaskQueue, Priority::IDLE);
852     }
853     return std::make_pair(0, Priority::LOW);
854 }
855 
IsChooseIdle()856 bool TaskManager::IsChooseIdle()
857 {
858     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
859     for (auto& worker : workers_) {
860         if (worker->state_ == WorkerState::IDLE) {
861             // If worker->state_ is WorkerState::IDLE, it means that the worker is free
862             continue;
863         }
864         // If there is a worker running a task, do not take the idle task.
865         return false;
866     }
867     // Only when all workers are free, will idle task be taken.
868     return true;
869 }
870 
GetTaskByPriority(const std::unique_ptr<ExecuteQueue> & taskQueue,Priority priority)871 std::pair<uint32_t, Priority> TaskManager::GetTaskByPriority(const std::unique_ptr<ExecuteQueue>& taskQueue,
872     Priority priority)
873 {
874     uint32_t taskId = taskQueue->DequeueTaskId();
875     if (IsDependendByTaskId(taskId)) {
876         EnqueuePendingTaskInfo(taskId, priority);
877         return std::make_pair(0, priority);
878     }
879     DecreaseNumIfNoIdle(priority);
880     preDequeneTime_ = ConcurrentHelper::GetMilliseconds();
881     return std::make_pair(taskId, priority);
882 }
883 
NotifyExecuteTask()884 void TaskManager::NotifyExecuteTask()
885 {
886     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
887     if (GetNonIdleTaskNum() == 0 && workers_.size() != idleWorkers_.size()) {
888         // When there are only idle tasks and workers executing them, it is not triggered
889         return;
890     }
891 
892     for (auto& worker : idleWorkers_) {
893         worker->NotifyExecuteTask();
894     }
895 }
896 
InitTaskManager(napi_env env)897 void TaskManager::InitTaskManager(napi_env env)
898 {
899     HITRACE_HELPER_METER_NAME("InitTaskManager");
900     if (!isInitialized_.exchange(true, std::memory_order_relaxed)) {
901 #if defined(ENABLE_TASKPOOL_FFRT)
902         globalEnableFfrtFlag_ = OHOS::system::GetIntParameter<int>("persist.commonlibrary.taskpoolglobalenableffrt", 0);
903         if (!globalEnableFfrtFlag_) {
904             UpdateSystemAppFlag();
905             if (IsSystemApp()) {
906                 disableFfrtFlag_ = OHOS::system::GetIntParameter<int>("persist.commonlibrary.taskpooldisableffrt", 0);
907             }
908         }
909         if (EnableFfrt()) {
910             HILOG_INFO("taskpool:: apps use ffrt");
911         } else {
912             HILOG_INFO("taskpool:: apps do not use ffrt");
913         }
914 #endif
915 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
916         mainThreadHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(
917             OHOS::AppExecFwk::EventRunner::GetMainEventRunner());
918 #endif
919         auto mainThreadEngine = NativeEngine::GetMainThreadEngine();
920         if (mainThreadEngine == nullptr) {
921             HILOG_FATAL("taskpool:: mainThreadEngine is nullptr");
922             return;
923         }
924         hostEnv_ = reinterpret_cast<napi_env>(mainThreadEngine);
925         // Add a reserved thread for taskpool
926         CreateWorkers(hostEnv_);
927         // Create a timer to manage worker threads
928         std::thread workerManager([this] {this->RunTaskManager();});
929         workerManager.detach();
930     }
931 }
932 
CreateWorkers(napi_env env,uint32_t num)933 void TaskManager::CreateWorkers(napi_env env, uint32_t num)
934 {
935     HILOG_DEBUG("taskpool:: CreateWorkers, num:%{public}u", num);
936     for (uint32_t i = 0; i < num; i++) {
937         expandingCount_++;
938         auto worker = Worker::WorkerConstructor(env);
939         NotifyWorkerAdded(worker);
940     }
941     CountTraceForWorker();
942 }
943 
RemoveWorker(Worker * worker)944 void TaskManager::RemoveWorker(Worker* worker)
945 {
946     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
947     idleWorkers_.erase(worker);
948     timeoutWorkers_.erase(worker);
949     workers_.erase(worker);
950 }
951 
RestoreWorker(Worker * worker)952 void TaskManager::RestoreWorker(Worker* worker)
953 {
954     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
955     if (UNLIKELY(suspend_)) {
956         suspend_ = false;
957         uv_timer_again(balanceTimer_);
958     }
959     if (worker->state_ == WorkerState::BLOCKED) {
960         // since the worker is blocked, we should add it to the timeout set
961         timeoutWorkers_.insert(worker);
962         return;
963     }
964     // Since the worker may be executing some tasks in IO thread, we should add it to the
965     // worker sets and call the 'NotifyWorkerIdle', which can still execute some tasks in its own thread.
966     HILOG_DEBUG("taskpool:: worker has been restored and the current num is: %{public}zu", workers_.size());
967     idleWorkers_.emplace_hint(idleWorkers_.end(), worker);
968     if (GetTaskNum() != 0) {
969         NotifyExecuteTask();
970     }
971 }
972 
973 // ---------------------------------- SendData ---------------------------------------
RegisterCallback(napi_env env,uint32_t taskId,std::shared_ptr<CallbackInfo> callbackInfo)974 void TaskManager::RegisterCallback(napi_env env, uint32_t taskId, std::shared_ptr<CallbackInfo> callbackInfo)
975 {
976     std::lock_guard<std::mutex> lock(callbackMutex_);
977     callbackTable_[taskId] = callbackInfo;
978 }
979 
GetCallbackInfo(uint32_t taskId)980 std::shared_ptr<CallbackInfo> TaskManager::GetCallbackInfo(uint32_t taskId)
981 {
982     std::lock_guard<std::mutex> lock(callbackMutex_);
983     auto iter = callbackTable_.find(taskId);
984     if (iter == callbackTable_.end() || iter->second == nullptr) {
985         HILOG_ERROR("taskpool:: the callback does not exist");
986         return nullptr;
987     }
988     return iter->second;
989 }
990 
IncreaseRefCount(uint32_t taskId)991 void TaskManager::IncreaseRefCount(uint32_t taskId)
992 {
993     if (taskId == 0) { // do not support func
994         return;
995     }
996     std::lock_guard<std::mutex> lock(callbackMutex_);
997     auto iter = callbackTable_.find(taskId);
998     if (iter == callbackTable_.end() || iter->second == nullptr) {
999         return;
1000     }
1001     iter->second->refCount++;
1002 }
1003 
DecreaseRefCount(napi_env env,uint32_t taskId)1004 void TaskManager::DecreaseRefCount(napi_env env, uint32_t taskId)
1005 {
1006     if (taskId == 0) { // do not support func
1007         return;
1008     }
1009     std::lock_guard<std::mutex> lock(callbackMutex_);
1010     auto iter = callbackTable_.find(taskId);
1011     if (iter == callbackTable_.end() || iter->second == nullptr) {
1012         return;
1013     }
1014 
1015     auto task = GetTask(taskId);
1016     if (task != nullptr && !task->IsValid()) {
1017         callbackTable_.erase(iter);
1018         return;
1019     }
1020 
1021     iter->second->refCount--;
1022     if (iter->second->refCount == 0) {
1023         callbackTable_.erase(iter);
1024     }
1025 }
1026 
ResetCallbackInfoWorker(const std::shared_ptr<CallbackInfo> & callbackInfo)1027 void TaskManager::ResetCallbackInfoWorker(const std::shared_ptr<CallbackInfo>& callbackInfo)
1028 {
1029     std::lock_guard<std::mutex> lock(callbackMutex_);
1030     callbackInfo->worker = nullptr;
1031 }
1032 
NotifyCallbackExecute(napi_env env,TaskResultInfo * resultInfo,Task * task)1033 napi_value TaskManager::NotifyCallbackExecute(napi_env env, TaskResultInfo* resultInfo, Task* task)
1034 {
1035     HILOG_DEBUG("taskpool:: task:%{public}s NotifyCallbackExecute", std::to_string(task->taskId_).c_str());
1036     std::lock_guard<std::mutex> lock(callbackMutex_);
1037     auto iter = callbackTable_.find(task->taskId_);
1038     if (iter == callbackTable_.end() || iter->second == nullptr) {
1039         HILOG_ERROR("taskpool:: the callback in SendData is not registered on the host side");
1040         ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_REGISTERED);
1041         delete resultInfo;
1042         return nullptr;
1043     }
1044     Worker* worker = static_cast<Worker*>(task->worker_);
1045     worker->Enqueue(task->env_, resultInfo);
1046     auto callbackInfo = iter->second;
1047     callbackInfo->refCount++;
1048     callbackInfo->worker = worker;
1049     auto workerEngine = reinterpret_cast<NativeEngine*>(env);
1050     workerEngine->IncreaseListeningCounter();
1051 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
1052     if (task->IsMainThreadTask()) {
1053         HITRACE_HELPER_METER_NAME("NotifyCallbackExecute: PostTask");
1054         auto onCallbackTask = [callbackInfo]() {
1055             TaskPool::ExecuteCallbackTask(callbackInfo.get());
1056         };
1057         TaskManager::GetInstance().PostTask(onCallbackTask, "TaskPoolOnCallbackTask", worker->priority_);
1058     } else {
1059         callbackInfo->onCallbackSignal->data = callbackInfo.get();
1060         uv_async_send(callbackInfo->onCallbackSignal);
1061     }
1062 #else
1063     callbackInfo->onCallbackSignal->data = callbackInfo.get();
1064     uv_async_send(callbackInfo->onCallbackSignal);
1065 #endif
1066     return nullptr;
1067 }
1068 
GetMessageQueue(const uv_async_t * req)1069 MsgQueue* TaskManager::GetMessageQueue(const uv_async_t* req)
1070 {
1071     std::lock_guard<std::mutex> lock(callbackMutex_);
1072     auto info = static_cast<CallbackInfo*>(req->data);
1073     if (info == nullptr || info->worker == nullptr) {
1074         HILOG_WARN("taskpool:: info or worker is nullptr");
1075         return nullptr;
1076     }
1077     auto worker = info->worker;
1078     MsgQueue* queue = nullptr;
1079     worker->Dequeue(info->hostEnv, queue);
1080     return queue;
1081 }
1082 
GetMessageQueueFromCallbackInfo(CallbackInfo * callbackInfo)1083 MsgQueue* TaskManager::GetMessageQueueFromCallbackInfo(CallbackInfo* callbackInfo)
1084 {
1085     std::lock_guard<std::mutex> lock(callbackMutex_);
1086     if (callbackInfo == nullptr || callbackInfo->worker == nullptr) {
1087         HILOG_WARN("taskpool:: callbackInfo or worker is nullptr");
1088         return nullptr;
1089     }
1090     auto worker = callbackInfo->worker;
1091     MsgQueue* queue = nullptr;
1092     worker->Dequeue(callbackInfo->hostEnv, queue);
1093     return queue;
1094 }
1095 // ---------------------------------- SendData ---------------------------------------
1096 
NotifyDependencyTaskInfo(uint32_t taskId)1097 void TaskManager::NotifyDependencyTaskInfo(uint32_t taskId)
1098 {
1099     HILOG_DEBUG("taskpool:: task:%{public}s NotifyDependencyTaskInfo", std::to_string(taskId).c_str());
1100     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
1101     std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1102     auto iter = dependentTaskInfos_.find(taskId);
1103     if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
1104         HILOG_DEBUG("taskpool:: dependentTaskInfo empty");
1105         return;
1106     }
1107     for (auto taskIdIter = iter->second.begin(); taskIdIter != iter->second.end();) {
1108         auto taskInfo = DequeuePendingTaskInfo(*taskIdIter);
1109         RemoveDependencyById(taskId, *taskIdIter);
1110         taskIdIter = iter->second.erase(taskIdIter);
1111         if (taskInfo.first != 0) {
1112             EnqueueTaskId(taskInfo.first, taskInfo.second);
1113         }
1114     }
1115 }
1116 
RemoveDependencyById(uint32_t dependentTaskId,uint32_t taskId)1117 void TaskManager::RemoveDependencyById(uint32_t dependentTaskId, uint32_t taskId)
1118 {
1119     HILOG_DEBUG("taskpool::task:%{public}s RemoveDependencyById", std::to_string(taskId).c_str());
1120     // remove dependency after task execute
1121     std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1122     auto dependTaskIter = dependTaskInfos_.find(taskId);
1123     if (dependTaskIter != dependTaskInfos_.end()) {
1124         auto dependTaskInnerIter = dependTaskIter->second.find(dependentTaskId);
1125         if (dependTaskInnerIter != dependTaskIter->second.end()) {
1126             dependTaskIter->second.erase(dependTaskInnerIter);
1127         }
1128     }
1129 }
1130 
IsDependendByTaskId(uint32_t taskId)1131 bool TaskManager::IsDependendByTaskId(uint32_t taskId)
1132 {
1133     std::shared_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1134     auto iter = dependTaskInfos_.find(taskId);
1135     if (iter == dependTaskInfos_.end() || iter->second.empty()) {
1136         return false;
1137     }
1138     return true;
1139 }
1140 
IsDependentByTaskId(uint32_t dependentTaskId)1141 bool TaskManager::IsDependentByTaskId(uint32_t dependentTaskId)
1142 {
1143     std::shared_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1144     auto iter = dependentTaskInfos_.find(dependentTaskId);
1145     if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
1146         return false;
1147     }
1148     return true;
1149 }
1150 
StoreTaskDependency(uint32_t taskId,std::set<uint32_t> taskIdSet)1151 bool TaskManager::StoreTaskDependency(uint32_t taskId, std::set<uint32_t> taskIdSet)
1152 {
1153     HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDependency", std::to_string(taskId).c_str());
1154     StoreDependentTaskInfo(taskIdSet, taskId);
1155     std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1156     auto iter = dependTaskInfos_.find(taskId);
1157     if (iter == dependTaskInfos_.end()) {
1158         for (const auto& dependentId : taskIdSet) {
1159             auto idIter = dependTaskInfos_.find(dependentId);
1160             if (idIter == dependTaskInfos_.end()) {
1161                 continue;
1162             }
1163             if (!CheckCircularDependency(taskIdSet, idIter->second, taskId)) {
1164                 return false;
1165             }
1166         }
1167         dependTaskInfos_.emplace(taskId, std::move(taskIdSet));
1168         return true;
1169     }
1170 
1171     for (const auto& dependentId : iter->second) {
1172         auto idIter = dependTaskInfos_.find(dependentId);
1173         if (idIter == dependTaskInfos_.end()) {
1174             continue;
1175         }
1176         if (!CheckCircularDependency(iter->second, idIter->second, taskId)) {
1177             return false;
1178         }
1179     }
1180     iter->second.insert(taskIdSet.begin(), taskIdSet.end());
1181     return true;
1182 }
1183 
CheckCircularDependency(std::set<uint32_t> dependentIdSet,std::set<uint32_t> idSet,uint32_t taskId)1184 bool TaskManager::CheckCircularDependency(std::set<uint32_t> dependentIdSet, std::set<uint32_t> idSet, uint32_t taskId)
1185 {
1186     for (const auto& id : idSet) {
1187         if (id == taskId) {
1188             return false;
1189         }
1190         auto iter = dependentIdSet.find(id);
1191         if (iter != dependentIdSet.end()) {
1192             continue;
1193         }
1194         auto dIter = dependTaskInfos_.find(id);
1195         if (dIter == dependTaskInfos_.end()) {
1196             continue;
1197         }
1198         if (!CheckCircularDependency(dependentIdSet, dIter->second, taskId)) {
1199             return false;
1200         }
1201     }
1202     return true;
1203 }
1204 
RemoveTaskDependency(uint32_t taskId,uint32_t dependentId)1205 bool TaskManager::RemoveTaskDependency(uint32_t taskId, uint32_t dependentId)
1206 {
1207     HILOG_DEBUG("taskpool:: task:%{public}s RemoveTaskDependency", std::to_string(taskId).c_str());
1208     RemoveDependentTaskInfo(dependentId, taskId);
1209     std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1210     auto iter = dependTaskInfos_.find(taskId);
1211     if (iter == dependTaskInfos_.end()) {
1212         return false;
1213     }
1214     auto dependIter = iter->second.find(dependentId);
1215     if (dependIter ==  iter->second.end()) {
1216         return false;
1217     }
1218     iter->second.erase(dependIter);
1219     return true;
1220 }
1221 
EnqueuePendingTaskInfo(uint32_t taskId,Priority priority)1222 void TaskManager::EnqueuePendingTaskInfo(uint32_t taskId, Priority priority)
1223 {
1224     if (taskId == 0) {
1225         return;
1226     }
1227     std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
1228     pendingTaskInfos_.emplace(taskId, priority);
1229 }
1230 
DequeuePendingTaskInfo(uint32_t taskId)1231 std::pair<uint32_t, Priority> TaskManager::DequeuePendingTaskInfo(uint32_t taskId)
1232 {
1233     std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
1234     if (pendingTaskInfos_.empty()) {
1235         return std::make_pair(0, Priority::DEFAULT);
1236     }
1237     std::pair<uint32_t, Priority> result;
1238     for (auto it = pendingTaskInfos_.begin(); it != pendingTaskInfos_.end(); ++it) {
1239         if (it->first == taskId) {
1240             result = std::make_pair(it->first, it->second);
1241             it = pendingTaskInfos_.erase(it);
1242             break;
1243         }
1244     }
1245     return result;
1246 }
1247 
RemovePendingTaskInfo(uint32_t taskId)1248 void TaskManager::RemovePendingTaskInfo(uint32_t taskId)
1249 {
1250     HILOG_DEBUG("taskpool:: task:%{public}s RemovePendingTaskInfo", std::to_string(taskId).c_str());
1251     std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
1252     pendingTaskInfos_.erase(taskId);
1253 }
1254 
StoreDependentTaskInfo(std::set<uint32_t> dependentTaskIdSet,uint32_t taskId)1255 void TaskManager::StoreDependentTaskInfo(std::set<uint32_t> dependentTaskIdSet, uint32_t taskId)
1256 {
1257     HILOG_DEBUG("taskpool:: task:%{public}s StoreDependentTaskInfo", std::to_string(taskId).c_str());
1258     std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1259     for (const auto& id : dependentTaskIdSet) {
1260         auto iter = dependentTaskInfos_.find(id);
1261         if (iter == dependentTaskInfos_.end()) {
1262             std::set<uint32_t> set{taskId};
1263             dependentTaskInfos_.emplace(id, std::move(set));
1264         } else {
1265             iter->second.emplace(taskId);
1266         }
1267     }
1268 }
1269 
RemoveDependentTaskInfo(uint32_t dependentTaskId,uint32_t taskId)1270 void TaskManager::RemoveDependentTaskInfo(uint32_t dependentTaskId, uint32_t taskId)
1271 {
1272     HILOG_DEBUG("taskpool:: task:%{public}s RemoveDependentTaskInfo", std::to_string(taskId).c_str());
1273     std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1274     auto iter = dependentTaskInfos_.find(dependentTaskId);
1275     if (iter == dependentTaskInfos_.end()) {
1276         return;
1277     }
1278     auto taskIter = iter->second.find(taskId);
1279     if (taskIter == iter->second.end()) {
1280         return;
1281     }
1282     iter->second.erase(taskIter);
1283 }
1284 
GetTaskDependInfoToString(uint32_t taskId)1285 std::string TaskManager::GetTaskDependInfoToString(uint32_t taskId)
1286 {
1287     std::shared_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1288     std::string str = "TaskInfos: taskId: " + std::to_string(taskId) + ", dependTaskId:";
1289     auto iter = dependTaskInfos_.find(taskId);
1290     if (iter != dependTaskInfos_.end()) {
1291         for (const auto& id : iter->second) {
1292             str += " " + std::to_string(id);
1293         }
1294     }
1295     return str;
1296 }
1297 
StoreTaskDuration(uint32_t taskId,uint64_t totalDuration,uint64_t cpuDuration)1298 void TaskManager::StoreTaskDuration(uint32_t taskId, uint64_t totalDuration, uint64_t cpuDuration)
1299 {
1300     HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDuration", std::to_string(taskId).c_str());
1301     std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
1302     auto iter = taskDurationInfos_.find(taskId);
1303     if (iter == taskDurationInfos_.end()) {
1304         std::pair<uint64_t, uint64_t> durationData = std::make_pair(totalDuration, cpuDuration);
1305         taskDurationInfos_.emplace(taskId, std::move(durationData));
1306     } else {
1307         if (totalDuration != 0) {
1308             iter->second.first = totalDuration;
1309         }
1310         if (cpuDuration != 0) {
1311             iter->second.second = cpuDuration;
1312         }
1313     }
1314 }
1315 
GetTaskDuration(uint32_t taskId,std::string durationType)1316 uint64_t TaskManager::GetTaskDuration(uint32_t taskId, std::string durationType)
1317 {
1318     std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
1319     auto iter = taskDurationInfos_.find(taskId);
1320     if (iter == taskDurationInfos_.end()) {
1321         return 0;
1322     }
1323     if (durationType == TASK_TOTAL_TIME) {
1324         return iter->second.first;
1325     } else if (durationType == TASK_CPU_TIME) {
1326         return iter->second.second;
1327     } else if (iter->second.first == 0) {
1328         return 0;
1329     }
1330     return iter->second.first - iter->second.second;
1331 }
1332 
RemoveTaskDuration(uint32_t taskId)1333 void TaskManager::RemoveTaskDuration(uint32_t taskId)
1334 {
1335     HILOG_DEBUG("taskpool:: task:%{public}s RemoveTaskDuration", std::to_string(taskId).c_str());
1336     std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
1337     auto iter = taskDurationInfos_.find(taskId);
1338     if (iter != taskDurationInfos_.end()) {
1339         taskDurationInfos_.erase(iter);
1340     }
1341 }
1342 
StoreLongTaskInfo(uint32_t taskId,Worker * worker)1343 void TaskManager::StoreLongTaskInfo(uint32_t taskId, Worker* worker)
1344 {
1345     std::unique_lock<std::shared_mutex> lock(longTasksMutex_);
1346     longTasksMap_.emplace(taskId, worker);
1347 }
1348 
RemoveLongTaskInfo(uint32_t taskId)1349 void TaskManager::RemoveLongTaskInfo(uint32_t taskId)
1350 {
1351     std::unique_lock<std::shared_mutex> lock(longTasksMutex_);
1352     longTasksMap_.erase(taskId);
1353 }
1354 
GetLongTaskInfo(uint32_t taskId)1355 Worker* TaskManager::GetLongTaskInfo(uint32_t taskId)
1356 {
1357     std::shared_lock<std::shared_mutex> lock(longTasksMutex_);
1358     auto iter = longTasksMap_.find(taskId);
1359     return iter != longTasksMap_.end() ? iter->second : nullptr;
1360 }
1361 
TerminateTask(uint32_t taskId)1362 void TaskManager::TerminateTask(uint32_t taskId)
1363 {
1364     HILOG_DEBUG("taskpool:: task:%{public}s TerminateTask", std::to_string(taskId).c_str());
1365     auto worker = GetLongTaskInfo(taskId);
1366     if (UNLIKELY(worker == nullptr)) {
1367         return;
1368     }
1369     worker->TerminateTask(taskId);
1370     RemoveLongTaskInfo(taskId);
1371 }
1372 
ReleaseTaskData(napi_env env,Task * task,bool shouldDeleteTask)1373 void TaskManager::ReleaseTaskData(napi_env env, Task* task, bool shouldDeleteTask)
1374 {
1375     uint32_t taskId = task->taskId_;
1376     if (shouldDeleteTask) {
1377         RemoveTask(taskId);
1378     }
1379 
1380     task->ReleaseData();
1381     task->CancelPendingTask(env);
1382 
1383     task->ClearDelayedTimers();
1384 
1385     if (task->IsFunctionTask() || task->IsGroupFunctionTask()) {
1386         return;
1387     }
1388     DecreaseRefCount(env, taskId);
1389     RemoveTaskDuration(taskId);
1390     RemovePendingTaskInfo(taskId);
1391     ReleaseCallBackInfo(task);
1392     {
1393         std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1394         for (auto dependentTaskIter = dependentTaskInfos_.begin(); dependentTaskIter != dependentTaskInfos_.end();) {
1395             if (dependentTaskIter->second.find(taskId) != dependentTaskIter->second.end()) {
1396                 dependentTaskIter = dependentTaskInfos_.erase(dependentTaskIter);
1397             } else {
1398                 ++dependentTaskIter;
1399             }
1400         }
1401     }
1402     std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1403     auto dependTaskIter = dependTaskInfos_.find(taskId);
1404     if (dependTaskIter != dependTaskInfos_.end()) {
1405         dependTaskInfos_.erase(dependTaskIter);
1406     }
1407 }
1408 
ReleaseCallBackInfo(Task * task)1409 void TaskManager::ReleaseCallBackInfo(Task* task)
1410 {
1411     HILOG_DEBUG("taskpool:: ReleaseCallBackInfo task:%{public}s", std::to_string(task->taskId_).c_str());
1412     std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
1413     if (task->onEnqueuedCallBackInfo_ != nullptr) {
1414         delete task->onEnqueuedCallBackInfo_;
1415         task->onEnqueuedCallBackInfo_ = nullptr;
1416     }
1417 
1418     if (task->onStartExecutionCallBackInfo_ != nullptr) {
1419         delete task->onStartExecutionCallBackInfo_;
1420         task->onStartExecutionCallBackInfo_ = nullptr;
1421     }
1422 
1423     if (task->onExecutionFailedCallBackInfo_ != nullptr) {
1424         delete task->onExecutionFailedCallBackInfo_;
1425         task->onExecutionFailedCallBackInfo_ = nullptr;
1426     }
1427 
1428     if (task->onExecutionSucceededCallBackInfo_ != nullptr) {
1429         delete task->onExecutionSucceededCallBackInfo_;
1430         task->onExecutionSucceededCallBackInfo_ = nullptr;
1431     }
1432 
1433 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
1434     if (!task->IsMainThreadTask() && task->onStartExecutionSignal_ != nullptr) {
1435         if (!ConcurrentHelper::IsUvClosing(task->onStartExecutionSignal_)) {
1436             ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_);
1437         } else {
1438             delete task->onStartExecutionSignal_;
1439             task->onStartExecutionSignal_ = nullptr;
1440         }
1441     }
1442 #else
1443     if (task->onStartExecutionSignal_ != nullptr) {
1444         if (!ConcurrentHelper::IsUvClosing(task->onStartExecutionSignal_)) {
1445             ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_);
1446         } else {
1447             delete task->onStartExecutionSignal_;
1448             task->onStartExecutionSignal_ = nullptr;
1449         }
1450     }
1451 #endif
1452 }
1453 
StoreTask(Task * task)1454 void TaskManager::StoreTask(Task* task)
1455 {
1456     uint64_t id = reinterpret_cast<uint64_t>(task);
1457     uint32_t taskId = CalculateTaskId(id);
1458     std::lock_guard<std::recursive_mutex> lock(tasksMutex_);
1459     while (tasks_.find(taskId) != tasks_.end()) {
1460         id++;
1461         taskId = CalculateTaskId(id);
1462     }
1463     task->SetTaskId(taskId);
1464     tasks_.emplace(taskId, task);
1465 }
1466 
RemoveTask(uint32_t taskId)1467 void TaskManager::RemoveTask(uint32_t taskId)
1468 {
1469     std::lock_guard<std::recursive_mutex> lock(tasksMutex_);
1470     tasks_.erase(taskId);
1471 }
1472 
GetTask(uint32_t taskId)1473 Task* TaskManager::GetTask(uint32_t taskId)
1474 {
1475     std::lock_guard<std::recursive_mutex> lock(tasksMutex_);
1476     auto iter = tasks_.find(taskId);
1477     if (iter == tasks_.end()) {
1478         return nullptr;
1479     }
1480     return iter->second;
1481 }
1482 
#if defined(ENABLE_TASKPOOL_FFRT)
// Queries the bundle manager service for this application's bundle info and copies
// applicationInfo.isSystemApp into isSystemApp_.
// On any failure along the way an error is logged and isSystemApp_ is left unchanged.
void TaskManager::UpdateSystemAppFlag()
{
    auto saManager = OHOS::SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
    if (saManager == nullptr) {
        HILOG_ERROR("taskpool:: fail to GetSystemAbility abilityManager is nullptr.");
        return;
    }
    auto bundleService = saManager->GetSystemAbility(OHOS::BUNDLE_MGR_SERVICE_SYS_ABILITY_ID);
    if (bundleService == nullptr) {
        HILOG_ERROR("taskpool:: fail to get bundle manager service.");
        return;
    }
    auto bundleManager = OHOS::iface_cast<OHOS::AppExecFwk::IBundleMgr>(bundleService);
    if (bundleManager == nullptr) {
        HILOG_ERROR("taskpool:: Bundle manager is nullptr.");
        return;
    }
    OHOS::AppExecFwk::BundleInfo selfInfo;
    const auto queryFlags =
        static_cast<int32_t>(OHOS::AppExecFwk::GetBundleInfoFlag::GET_BUNDLE_INFO_WITH_APPLICATION);
    if (bundleManager->GetBundleInfoForSelf(queryFlags, selfInfo) != OHOS::ERR_OK) {
        HILOG_ERROR("taskpool:: fail to GetBundleInfoForSelf");
        return;
    }
    isSystemApp_ = selfInfo.applicationInfo.isSystemApp;
}
#endif
1511 
#if defined(ENABLE_TASKPOOL_EVENTHANDLER)
// Posts `task` to mainThreadHandler_ under the name `taskName` with a delay argument of 0.
// The handler priority is looked up from `priority` in TASK_EVENTHANDLER_PRIORITY_MAP.
// Returns the handler's PostTask() result.
bool TaskManager::PostTask(std::function<void()> task, const char* taskName, Priority priority)
{
    const bool posted =
        mainThreadHandler_->PostTask(task, taskName, 0, TASK_EVENTHANDLER_PRIORITY_MAP.at(priority));
    return posted;
}
#endif
1518 
CheckTask(uint32_t taskId)1519 bool TaskManager::CheckTask(uint32_t taskId)
1520 {
1521     std::lock_guard<std::recursive_mutex> lock(tasksMutex_);
1522     auto item = tasks_.find(taskId);
1523     return item != tasks_.end();
1524 }
1525 
BatchRejectDeferred(napi_env env,std::list<napi_deferred> deferreds,std::string error)1526 void TaskManager::BatchRejectDeferred(napi_env env, std::list<napi_deferred> deferreds, std::string error)
1527 {
1528     if (deferreds.empty()) {
1529         return;
1530     }
1531     napi_value message = ErrorHelper::NewError(env, 0, error.c_str());
1532     for (auto deferred : deferreds) {
1533         napi_reject_deferred(env, deferred, message);
1534     }
1535 }
1536 
CalculateTaskId(uint64_t id)1537 uint32_t TaskManager::CalculateTaskId(uint64_t id)
1538 {
1539     size_t hash = std::hash<uint64_t>{}(id);
1540     return static_cast<uint32_t>(hash & MAX_UINT32_T);
1541 }
1542 
ClearDependentTask(uint32_t taskId)1543 void TaskManager::ClearDependentTask(uint32_t taskId)
1544 {
1545     HILOG_DEBUG("taskpool:: task:%{public}s ClearDependentTask", std::to_string(taskId).c_str());
1546     RemoveDependTaskByTaskId(taskId);
1547     DequeuePendingTaskInfo(taskId);
1548     std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1549     RemoveDependentTaskByTaskId(taskId);
1550 }
1551 
RemoveDependTaskByTaskId(uint32_t taskId)1552 void TaskManager::RemoveDependTaskByTaskId(uint32_t taskId)
1553 {
1554     std::set<uint32_t> dependTaskIds;
1555     {
1556         std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1557         auto iter = dependTaskInfos_.find(taskId);
1558         if (iter == dependTaskInfos_.end()) {
1559             return;
1560         }
1561         dependTaskIds.insert(iter->second.begin(), iter->second.end());
1562         dependTaskInfos_.erase(iter);
1563     }
1564     for (auto dependTaskId : dependTaskIds) {
1565         RemoveDependentTaskInfo(dependTaskId, taskId);
1566     }
1567 }
1568 
RemoveDependentTaskByTaskId(uint32_t taskId)1569 void TaskManager::RemoveDependentTaskByTaskId(uint32_t taskId)
1570 {
1571     auto iter = dependentTaskInfos_.find(taskId);
1572     if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
1573         HILOG_DEBUG("taskpool:: dependentTaskInfo empty");
1574         return;
1575     }
1576     for (auto taskIdIter = iter->second.begin(); taskIdIter != iter->second.end();) {
1577         auto taskInfo = DequeuePendingTaskInfo(*taskIdIter);
1578         RemoveDependencyById(taskId, *taskIdIter);
1579         auto id = *taskIdIter;
1580         taskIdIter = iter->second.erase(taskIdIter);
1581         auto task = GetTask(id);
1582         if (task == nullptr) {
1583             continue;
1584         }
1585         if (task->currentTaskInfo_ != nullptr) {
1586             EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority);
1587         }
1588         task->DisposeCanceledTask();
1589         RemoveDependentTaskByTaskId(task->taskId_);
1590     }
1591 }
1592 
WriteHisysForFfrtAndUv(Worker * worker,HisyseventParams * hisyseventParams)1593 void TaskManager::WriteHisysForFfrtAndUv(Worker* worker, HisyseventParams* hisyseventParams)
1594 {
1595 #if defined(ENABLE_TASKPOOL_HISYSEVENT)
1596     if (!EnableFfrt() || hisyseventParams == nullptr) {
1597         return;
1598     }
1599     int32_t tid = 0;
1600     if (worker != nullptr) {
1601         auto loop = worker->GetWorkerLoop();
1602         uint64_t loopAddress = reinterpret_cast<uint64_t>(loop);
1603         hisyseventParams->message += "; current runningLoop: " + std::to_string(loopAddress);
1604         tid = worker->tid_;
1605     }
1606     hisyseventParams->tid = tid;
1607     hisyseventParams->pid = getpid();
1608     hisyseventParams->message += "; idleWorkers num: " + std::to_string(idleWorkers_.size());
1609     hisyseventParams->message += "; workers num: " + std::to_string(workers_.size());
1610     int32_t ret = DfxHisysEvent::WriteFfrtAndUv(hisyseventParams);
1611     if (ret < 0) {
1612         HILOG_WARN("taskpool:: HisyseventWrite failed, ret: %{public}s", std::to_string(ret).c_str());
1613     }
1614 #endif
1615 }
1616 
CheckTasksAndReportHisysEvent()1617 void TaskManager::CheckTasksAndReportHisysEvent()
1618 {
1619 #if defined(ENABLE_TASKPOOL_HISYSEVENT)
1620     if (!EnableFfrt()) {
1621         return;
1622     }
1623     uint64_t currTime = ConcurrentHelper::GetMilliseconds();
1624     auto taskNum = GetTaskNum();
1625     if (taskNum == 0 || currTime - preDequeneTime_ < UNEXECUTE_TASK_TIME || preDequeneTime_ < reportTime_) {
1626         return;
1627     }
1628     std::string message = "Task scheduling timeout, total task num:" + std::to_string(taskNum);
1629     HisyseventParams* hisyseventParams = new HisyseventParams();
1630     hisyseventParams->methodName = "CheckTasksAndReportHisysEvent";
1631     hisyseventParams->funName = "";
1632     hisyseventParams->logType = "ffrt";
1633     hisyseventParams->code = DfxHisysEvent::TASK_TIMEOUT;
1634     hisyseventParams->message = message;
1635     WriteHisysForFfrtAndUv(nullptr, hisyseventParams);
1636     reportTime_ = currTime;
1637 #endif
1638 }
1639 
WorkerAliveAndReport(Worker * worker)1640 void TaskManager::WorkerAliveAndReport(Worker* worker)
1641 {
1642 #if defined(ENABLE_TASKPOOL_HISYSEVENT)
1643     uint64_t workerWaitTime = worker->GetWaitTime();
1644     uint64_t currTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::seconds>(
1645         std::chrono::steady_clock::now().time_since_epoch()).count());
1646     uint64_t intervalTime = currTime - workerWaitTime;
1647     if (!worker->IsNeedReport(intervalTime)) {
1648         return;
1649     }
1650     std::string message = "worker uv alive, may have handle leak";
1651     HisyseventParams* hisyseventParams = new HisyseventParams();
1652     hisyseventParams->methodName = "WorkerAliveAndReport";
1653     hisyseventParams->funName = "";
1654     hisyseventParams->logType = "ffrt";
1655     hisyseventParams->code = DfxHisysEvent::WORKER_ALIVE;
1656     hisyseventParams->message = message;
1657     hisyseventParams->waitTime = intervalTime;
1658     WriteHisysForFfrtAndUv(worker, hisyseventParams);
1659     worker->IncreaseReportCount();
1660 #endif
1661 }
1662 
UvReportHisysEvent(Worker * worker,std::string methodName,std::string funName,std::string message,int32_t code)1663 void TaskManager::UvReportHisysEvent(Worker* worker, std::string methodName, std::string funName, std::string message,
1664                                      int32_t code)
1665 {
1666 #if defined(ENABLE_TASKPOOL_HISYSEVENT)
1667     HisyseventParams* hisyseventParams = new HisyseventParams();
1668     hisyseventParams->methodName = methodName;
1669     hisyseventParams->funName = funName;
1670     hisyseventParams->logType = "uv";
1671     hisyseventParams->code = code;
1672     hisyseventParams->message = message;
1673     WriteHisysForFfrtAndUv(worker, hisyseventParams);
1674 #endif
1675 }
1676 } // namespace Commonlibrary::Concurrent::TaskPoolModule