• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "task_manager.h"
17 
18 #include <securec.h>
19 #include <thread>
20 
21 #include "async_runner_manager.h"
22 #if defined(ENABLE_TASKPOOL_FFRT)
23 #include "bundle_info.h"
24 #include "bundle_mgr_interface.h"
25 #include "bundle_mgr_proxy.h"
26 #include "iservice_registry.h"
27 #include "parameters.h"
28 #include "status_receiver_interface.h"
29 #include "system_ability_definition.h"
30 #include "c/executor_task.h"
31 #include "ffrt_inner.h"
32 #endif
33 #include "sys_timer.h"
34 #include "helper/hitrace_helper.h"
35 #include "taskpool.h"
36 #include "task_group_manager.h"
37 
38 namespace Commonlibrary::Concurrent::TaskPoolModule {
39 using namespace OHOS::JsSysModule;
40 template void TaskManager::TryExpandWithCheckIdle<true>();
41 template void TaskManager::TryExpandWithCheckIdle<false>();
42 
// ----------------------------------- tuning constants ------------------------------------
static constexpr int8_t HIGH_PRIORITY_TASK_COUNT = 5;
static constexpr int8_t MEDIUM_PRIORITY_TASK_COUNT = 5;
// Divisor (in ms) used when estimating how many idle threads the pending tasks need.
static constexpr int32_t MAX_TASK_DURATION = 100;
// Upper bound on the idle-thread target while no task has finished executing yet.
static constexpr uint32_t STEP_SIZE = 2;
// Lower bound applied to maxThreads_ when deciding whether the pool may expand.
static constexpr uint32_t DEFAULT_THREADS = 3;
// Minimum thread num kept when idle (unless memory is low).
static constexpr uint32_t DEFAULT_MIN_THREADS = 1;
// Blocked-worker threshold used when the pool is full and nothing is idle: 3min.
static constexpr uint32_t MIN_TIMEOUT_TIME = 3 * 60 * 1000;
// Blocked-worker threshold used otherwise: 10min.
static constexpr uint32_t MAX_TIMEOUT_TIME = 10 * 60 * 1000;
// Idle time (ms) a worker is compared against before it may be released: 30s.
static constexpr int32_t MAX_IDLE_TIME = 30 * 1000;
// Repeat interval (ms) of the load-balance timer: 30s.
static constexpr uint32_t TRIGGER_INTERVAL = 30 * 1000;
// Try to release at most 4 threads every time.
static constexpr uint32_t SHRINK_STEP = 4;
// Max value of uint32_t.
static constexpr uint32_t MAX_UINT32_T = 0xFFFFFFFFU;
// Delay (ms) before the expansion re-check timer fires.
static constexpr uint32_t TRIGGER_EXPAND_INTERVAL = 10;
// Number of consecutive idle observations after which a thread is queued for release.
[[maybe_unused]] static constexpr uint32_t IDLE_THRESHOLD = 2;
static constexpr char ON_CALLBACK_STR[] = "TaskPoolOnCallbackTask";
// 1min.
static constexpr uint32_t UNEXECUTE_TASK_TIME = 60 * 1000;

#if defined(ENABLE_TASKPOOL_EVENTHANDLER)
// Translation from taskpool priorities to event-queue priorities.
// Note that MEDIUM maps to HIGH and HIGH maps to IMMEDIATE.
static const std::map<Priority, OHOS::AppExecFwk::EventQueue::Priority> TASK_EVENTHANDLER_PRIORITY_MAP = {
    {Priority::IDLE, OHOS::AppExecFwk::EventQueue::Priority::IDLE},
    {Priority::LOW, OHOS::AppExecFwk::EventQueue::Priority::LOW},
    {Priority::MEDIUM, OHOS::AppExecFwk::EventQueue::Priority::HIGH},
    {Priority::HIGH, OHOS::AppExecFwk::EventQueue::Priority::IMMEDIATE},
};
#endif
68 
69 // ----------------------------------- TaskManager ----------------------------------------
GetInstance()70 TaskManager& TaskManager::GetInstance()
71 {
72     static TaskManager manager;
73     return manager;
74 }
75 
TaskManager()76 TaskManager::TaskManager()
77 {
78     for (size_t i = 0; i < taskQueues_.size(); i++) {
79         std::unique_ptr<ExecuteQueue> taskQueue = std::make_unique<ExecuteQueue>();
80         taskQueues_[i] = std::move(taskQueue);
81     }
82 }
83 
~TaskManager()84 TaskManager::~TaskManager()
85 {
86     HILOG_INFO("taskpool:: ~TaskManager");
87     if (balanceTimer_ == nullptr) {
88         HILOG_ERROR("taskpool:: balanceTimer_ is nullptr");
89     } else {
90         uv_timer_stop(balanceTimer_);
91         ConcurrentHelper::UvHandleClose(balanceTimer_);
92     }
93 
94     if (expandTimer_ == nullptr) {
95         HILOG_ERROR("taskpool:: expandTimer_ is nullptr");
96     } else {
97         uv_timer_stop(expandTimer_);
98         ConcurrentHelper::UvHandleClose(expandTimer_);
99     }
100 
101     ConcurrentHelper::UvHandleClose(dispatchHandle_);
102 
103     if (loop_ != nullptr) {
104         uv_stop(loop_);
105     }
106 
107     {
108         std::lock_guard<std::recursive_mutex> lock(workersMutex_);
109         for (auto& worker : workers_) {
110             delete worker;
111         }
112         workers_.clear();
113     }
114 
115     {
116         std::lock_guard<std::mutex> lock(callbackMutex_);
117         for (auto& [_, callbackPtr] : callbackTable_) {
118             if (callbackPtr == nullptr) {
119                 continue;
120             }
121             callbackPtr.reset();
122         }
123         callbackTable_.clear();
124     }
125 
126     {
127         std::lock_guard<std::recursive_mutex> lock(tasksMutex_);
128         for (auto& [_, task] : tasks_) {
129             delete task;
130             task = nullptr;
131         }
132         tasks_.clear();
133     }
134     CountTraceForWorker();
135 }
136 
CountTraceForWorker(bool needLog)137 void TaskManager::CountTraceForWorker(bool needLog)
138 {
139     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
140     CountTraceForWorkerWithoutLock(needLog);
141 }
142 
CountTraceForWorkerWithoutLock(bool needLog)143 inline void TaskManager::CountTraceForWorkerWithoutLock(bool needLog)
144 {
145     int64_t threadNum = static_cast<int64_t>(workers_.size());
146     int64_t idleWorkers = static_cast<int64_t>(idleWorkers_.size());
147     int64_t timeoutWorkers = static_cast<int64_t>(timeoutWorkers_.size());
148     HITRACE_HELPER_COUNT_TRACE("timeoutThreadNum", timeoutWorkers);
149     HITRACE_HELPER_COUNT_TRACE("threadNum", threadNum);
150     HITRACE_HELPER_COUNT_TRACE("runningThreadNum", threadNum - idleWorkers);
151     HITRACE_HELPER_COUNT_TRACE("idleThreadNum", idleWorkers);
152     AddCountTraceForWorkerLog(needLog, threadNum, idleWorkers, timeoutWorkers);
153 }
154 
GetThreadInfos(napi_env env)155 napi_value TaskManager::GetThreadInfos(napi_env env)
156 {
157     napi_value threadInfos = nullptr;
158     napi_create_array(env, &threadInfos);
159     std::unordered_set<std::unique_ptr<ThreadInfo>> threadInfoSet {};
160     {
161         std::lock_guard<std::recursive_mutex> lock(workersMutex_);
162         for (auto& worker : workers_) {
163             if (worker->workerEnv_ == nullptr) {
164                 continue;
165             }
166             auto threadInfo = std::make_unique<ThreadInfo>();
167             threadInfo->tid = worker->tid_;
168             threadInfo->priority = worker->priority_;
169             for (auto& id : worker->currentTaskId_) {
170                 threadInfo->currentTaskId.emplace_back(id);
171             }
172             threadInfoSet.emplace(std::move(threadInfo));
173         }
174     }
175     int32_t i = 0;
176     for (auto& info : threadInfoSet) {
177         napi_value tid = NapiHelper::CreateUint32(env, static_cast<uint32_t>(info->tid));
178         napi_value priority = NapiHelper::CreateUint32(env, static_cast<uint32_t>(info->priority));
179 
180         napi_value taskId = nullptr;
181         napi_create_array(env, &taskId);
182         int32_t j = 0;
183         for (auto& currentId : info->currentTaskId) {
184             napi_value id = NapiHelper::CreateUint32(env, currentId);
185             napi_set_element(env, taskId, j, id);
186             j++;
187         }
188         napi_value threadInfo = nullptr;
189         napi_create_object(env, &threadInfo);
190         napi_set_named_property(env, threadInfo, "tid", tid);
191         napi_set_named_property(env, threadInfo, "priority", priority);
192         napi_set_named_property(env, threadInfo, "taskIds", taskId);
193         napi_set_element(env, threadInfos, i, threadInfo);
194         i++;
195     }
196     return threadInfos;
197 }
198 
GetTaskInfos(napi_env env)199 napi_value TaskManager::GetTaskInfos(napi_env env)
200 {
201     napi_value taskInfos = nullptr;
202     napi_create_array(env, &taskInfos);
203     std::unordered_set<std::unique_ptr<TaskCurrentInfo>> taskCurrentInfoSet {};
204     {
205         std::lock_guard<std::recursive_mutex> lock(tasksMutex_);
206         for (const auto& [_, task] : tasks_) {
207             if (task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::DELAYED ||
208                 task->taskState_ == ExecuteState::FINISHED) {
209                 continue;
210             }
211             auto taskCurrentInfo = std::make_unique<TaskCurrentInfo>();
212             taskCurrentInfo->taskId = task->taskId_;
213             taskCurrentInfo->name = task->name_;
214             taskCurrentInfo->taskState = task->taskState_;
215             taskCurrentInfo->startTime = task->startTime_;
216             taskCurrentInfoSet.emplace(std::move(taskCurrentInfo));
217         }
218     }
219     int32_t index = 0;
220     for (auto& info : taskCurrentInfoSet) {
221         napi_value taskInfoValue = NapiHelper::CreateObject(env);
222         napi_value taskId = NapiHelper::CreateUint32(env, info->taskId);
223         napi_value name = nullptr;
224         napi_create_string_utf8(env, info->name.c_str(), info->name.size(), &name);
225         napi_set_named_property(env, taskInfoValue, "name", name);
226         ExecuteState state = info->taskState;
227         uint64_t duration = 0;
228         if (state == ExecuteState::RUNNING || state == ExecuteState::ENDING) {
229             duration = ConcurrentHelper::GetMilliseconds() - info->startTime;
230         }
231         napi_value stateValue = NapiHelper::CreateUint32(env, static_cast<uint32_t>(state));
232         napi_set_named_property(env, taskInfoValue, "taskId", taskId);
233         napi_set_named_property(env, taskInfoValue, "state", stateValue);
234         napi_value durationValue = NapiHelper::CreateUint32(env, duration);
235         napi_set_named_property(env, taskInfoValue, "duration", durationValue);
236         napi_set_element(env, taskInfos, index, taskInfoValue);
237         index++;
238     }
239     return taskInfos;
240 }
241 
UpdateExecutedInfo(uint64_t duration)242 void TaskManager::UpdateExecutedInfo(uint64_t duration)
243 {
244     totalExecTime_ += duration;
245     totalExecCount_++;
246 }
247 
ComputeSuitableThreadNum()248 uint32_t TaskManager::ComputeSuitableThreadNum()
249 {
250     uint32_t targetNum = ComputeSuitableIdleNum() + GetRunningWorkers();
251     return targetNum;
252 }
253 
ComputeSuitableIdleNum()254 uint32_t TaskManager::ComputeSuitableIdleNum()
255 {
256     uint32_t targetNum = 0;
257     if (GetNonIdleTaskNum() != 0 && totalExecCount_ == 0) {
258         // this branch is used for avoiding time-consuming tasks that may block the taskpool
259         targetNum = std::min(STEP_SIZE, GetNonIdleTaskNum());
260     } else if (totalExecCount_ != 0) {
261         auto durationPerTask = static_cast<double>(totalExecTime_) / totalExecCount_;
262         uint32_t result = std::ceil(durationPerTask * GetNonIdleTaskNum() / MAX_TASK_DURATION);
263         targetNum = std::min(result, GetNonIdleTaskNum());
264     }
265     return targetNum | 1;
266 }
267 
CheckForBlockedWorkers()268 void TaskManager::CheckForBlockedWorkers()
269 {
270     // the threshold will be dynamically modified to provide more flexibility in detecting exceptions
271     // if the thread num has reached the limit and the idle worker is not available, a short time will be used,
272     // else we will choose the longer one
273     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
274     bool needChecking = false;
275     bool state = (GetThreadNum() == ConcurrentHelper::GetMaxThreads()) && (GetIdleWorkers() == 0);
276     uint64_t threshold = state ? MIN_TIMEOUT_TIME : MAX_TIMEOUT_TIME;
277     for (auto iter = workers_.begin(); iter != workers_.end(); iter++) {
278         auto worker = *iter;
279         // if the worker thread is idle, just skip it, and only the worker in running state can be marked as timeout
280         // if the worker is executing the longTask, we will not do the check
281         if ((worker->state_ == WorkerState::IDLE) || (worker->IsExecutingLongTask()) ||
282             (ConcurrentHelper::GetMilliseconds() - worker->startTime_ < threshold) ||
283             !worker->UpdateWorkerState(WorkerState::RUNNING, WorkerState::BLOCKED)) {
284             continue;
285         }
286         // When executing the promise task, the worker state may not be updated and will be
287         // marked as 'BLOCKED', so we should exclude this situation.
288         // Besides, if the worker is not executing sync tasks or micro tasks, it may handle
289         // the task like I/O in uv threads, we should also exclude this situation.
290         auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
291         if (worker->idleState_ && !workerEngine->IsExecutingPendingJob()) {
292             if (!workerEngine->HasWaitingRequest()) {
293                 worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::IDLE);
294             } else {
295                 worker->UpdateWorkerState(WorkerState::BLOCKED, WorkerState::RUNNING);
296                 worker->startTime_ = ConcurrentHelper::GetMilliseconds();
297             }
298             continue;
299         }
300 
301         HILOG_INFO("taskpool:: The worker has been marked as timeout.");
302         // If the current worker has a longTask and is not executing, we will only interrupt it.
303         if (worker->HasLongTask()) {
304             continue;
305         }
306         needChecking = true;
307         idleWorkers_.erase(worker);
308         timeoutWorkers_.insert(worker);
309     }
310     // should trigger the check when we have marked and removed workers
311     if (UNLIKELY(needChecking)) {
312         TryExpand();
313     }
314 }
315 
TryTriggerExpand()316 void TaskManager::TryTriggerExpand()
317 {
318     // post the signal to notify the monitor thread to expand
319     if (UNLIKELY(!isHandleInited_)) {
320         NotifyExecuteTask();
321         needChecking_ = true;
322         HILOG_WARN("taskpool:: the dispatchHandle_ is nullptr");
323         return;
324     }
325     uv_async_send(dispatchHandle_);
326 }
327 
328 #if defined(OHOS_PLATFORM)
329 // read /proc/[pid]/task/[tid]/stat to get the number of idle threads.
ReadThreadInfo(pid_t tid,char * buf,uint32_t size)330 bool TaskManager::ReadThreadInfo(pid_t tid, char* buf, uint32_t size)
331 {
332     char path[128]; // 128: buffer for path
333     pid_t pid = getpid();
334     ssize_t bytesLen = -1;
335     int ret = snprintf_s(path, sizeof(path), sizeof(path) - 1, "/proc/%d/task/%d/stat", pid, tid);
336     if (ret < 0) {
337         HILOG_ERROR("snprintf_s failed");
338         return false;
339     }
340     int fd = open(path, O_RDONLY | O_NONBLOCK);
341     if (UNLIKELY(fd == -1)) {
342         return false;
343     }
344     bytesLen = read(fd, buf, size - 1);
345     close(fd);
346     if (bytesLen <= 0) {
347         HILOG_ERROR("taskpool:: failed to read %{public}s", path);
348         return false;
349     }
350     buf[bytesLen] = '\0';
351     return true;
352 }
353 
GetIdleWorkers()354 uint32_t TaskManager::GetIdleWorkers()
355 {
356     char buf[4096]; // 4096: buffer for thread info
357     uint32_t idleCount = 0;
358     std::unordered_set<pid_t> tids {};
359     {
360         std::lock_guard<std::recursive_mutex> lock(workersMutex_);
361         for (auto& worker : idleWorkers_) {
362 #if defined(ENABLE_TASKPOOL_FFRT)
363             if (worker->ffrtTaskHandle_ != nullptr) {
364                 if (worker->GetWaitTime() > 0) {
365                     idleCount++;
366                 }
367                 continue;
368             }
369 #endif
370             tids.emplace(worker->tid_);
371         }
372     }
373     // The ffrt thread does not read thread info
374     for (auto tid : tids) {
375         if (!ReadThreadInfo(tid, buf, sizeof(buf))) {
376             continue;
377         }
378         char state;
379         if (sscanf_s(buf, "%*d %*s %c", &state, sizeof(state)) != 1) { // 1: state
380             HILOG_ERROR("taskpool: sscanf_s of state failed for %{public}c", state);
381             return 0;
382         }
383         if (state == 'S') {
384             idleCount++;
385         }
386     }
387     return idleCount;
388 }
389 
GetIdleWorkersList(uint32_t step)390 void TaskManager::GetIdleWorkersList(uint32_t step)
391 {
392     char buf[4096]; // 4096: buffer for thread info
393     for (auto& worker : idleWorkers_) {
394 #if defined(ENABLE_TASKPOOL_FFRT)
395         if (worker->ffrtTaskHandle_ != nullptr) {
396             uint64_t workerWaitTime = worker->GetWaitTime();
397             bool isWorkerLoopActive = worker->IsLoopActive();
398             if (workerWaitTime == 0) {
399                 continue;
400             }
401             uint64_t currTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::seconds>(
402                 std::chrono::steady_clock::now().time_since_epoch()).count());
403             if (!isWorkerLoopActive) {
404                 freeList_.emplace_back(worker);
405             } else {
406                 // worker uv alive, and will be free in 2 intervals if not wake
407                 WorkerAliveAndReport(worker);
408             }
409             continue;
410         }
411 #endif
412         if (!ReadThreadInfo(worker->tid_, buf, sizeof(buf))) {
413             continue;
414         }
415         char state;
416         uint64_t utime;
417         if (sscanf_s(buf, "%*d %*s %c %*d %*d %*d %*d %*d %*u %*lu %*lu %*lu %*lu %llu",
418             &state, sizeof(state), &utime) != 2) { // 2: state and utime
419             HILOG_ERROR("taskpool: sscanf_s of state failed for %{public}d", worker->tid_);
420             return;
421         }
422         if (state != 'S' || utime != worker->lastCpuTime_) {
423             worker->idleCount_ = 0;
424             worker->lastCpuTime_ = utime;
425             continue;
426         }
427         if (++worker->idleCount_ >= IDLE_THRESHOLD) {
428             freeList_.emplace_back(worker);
429         }
430     }
431 }
432 
TriggerShrink(uint32_t step)433 void TaskManager::TriggerShrink(uint32_t step)
434 {
435     GetIdleWorkersList(step);
436     step = std::min(step, static_cast<uint32_t>(freeList_.size()));
437     uint32_t count = 0;
438     for (size_t i = 0; i < freeList_.size(); i++) {
439         auto worker = freeList_[i];
440         if (worker->state_ != WorkerState::IDLE || worker->HasLongTask()) {
441             continue;
442         }
443         auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
444         if (idleTime < MAX_IDLE_TIME || worker->HasRunningTasks()) {
445             continue;
446         }
447         idleWorkers_.erase(worker);
448         HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
449         worker->PostReleaseSignal();
450         if (++count == step) {
451             break;
452         }
453     }
454     freeList_.clear();
455 }
456 #else
GetIdleWorkers()457 uint32_t TaskManager::GetIdleWorkers()
458 {
459     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
460     return idleWorkers_.size();
461 }
462 
TriggerShrink(uint32_t step)463 void TaskManager::TriggerShrink(uint32_t step)
464 {
465     for (uint32_t i = 0; i < step; i++) {
466         // try to free the worker that idle time meets the requirement
467         auto iter = std::find_if(idleWorkers_.begin(), idleWorkers_.end(), [](Worker* worker) {
468             auto idleTime = ConcurrentHelper::GetMilliseconds() - worker->idlePoint_;
469             return idleTime > MAX_IDLE_TIME && !worker->HasRunningTasks() && !worker->HasLongTask();
470         });
471         // remove it from all sets
472         if (iter != idleWorkers_.end()) {
473             auto worker = *iter;
474             idleWorkers_.erase(worker);
475             HILOG_DEBUG("taskpool:: try to release idle thread: %{public}d", worker->tid_);
476             worker->PostReleaseSignal();
477         }
478     }
479 }
480 #endif
481 
NotifyShrink(uint32_t targetNum)482 void TaskManager::NotifyShrink(uint32_t targetNum)
483 {
484     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
485     uint32_t workerCount = workers_.size();
486     uint32_t minThread = ConcurrentHelper::IsLowMemory() ? 0 : DEFAULT_MIN_THREADS;
487     // update the maxThreads_ periodically
488     maxThreads_ = ConcurrentHelper::GetMaxThreads();
489     if (minThread == 0) {
490         HILOG_INFO("taskpool:: low memory");
491     }
492     if (workerCount > minThread && workerCount > targetNum) {
493         targetNum = std::max(minThread, targetNum);
494         uint32_t step = std::min(workerCount - targetNum, SHRINK_STEP);
495         TriggerShrink(step);
496     }
497     // remove all timeout workers
498     for (auto iter = timeoutWorkers_.begin(); iter != timeoutWorkers_.end();) {
499         auto worker = *iter;
500         if (workers_.find(worker) == workers_.end()) {
501             HILOG_WARN("taskpool:: current worker maybe release");
502             iter = timeoutWorkers_.erase(iter);
503         } else if (!worker->HasRunningTasks()) {
504             HILOG_DEBUG("taskpool:: try to release timeout thread: %{public}d", worker->tid_);
505             worker->PostReleaseSignal();
506             timeoutWorkers_.erase(iter++);
507             return;
508         } else {
509             iter++;
510         }
511     }
512     uint32_t idleNum = idleWorkers_.size();
513     // System memory state is moderate and the worker has exeuted tasks, we will try to release it
514     if (ConcurrentHelper::IsModerateMemory() && workerCount == idleNum && workerCount == DEFAULT_MIN_THREADS) {
515         auto worker = *(idleWorkers_.begin());
516         // worker that has longTask should not be released
517         if (worker == nullptr || worker->HasLongTask()) {
518             return;
519         }
520         if (worker->hasExecuted_) { // worker that hasn't execute any tasks should not be released
521             TriggerShrink(DEFAULT_MIN_THREADS);
522             return;
523         }
524     }
525 
526     // Create a worker for performance
527     if (!ConcurrentHelper::IsLowMemory() && workers_.empty()) {
528         CreateWorkers(hostEnv_);
529     }
530     // stop the timer
531     if ((workerCount == idleNum && workerCount <= minThread) && timeoutWorkers_.empty()) {
532         suspend_ = true;
533         uv_timer_stop(balanceTimer_);
534         HILOG_DEBUG("taskpool:: timer will be suspended");
535     }
536 }
537 
TriggerLoadBalance(const uv_timer_t * req)538 void TaskManager::TriggerLoadBalance([[maybe_unused]] const uv_timer_t* req)
539 {
540     TaskManager& taskManager = TaskManager::GetInstance();
541     taskManager.CheckForBlockedWorkers();
542     uint32_t targetNum = taskManager.ComputeSuitableThreadNum();
543     taskManager.NotifyShrink(targetNum);
544     taskManager.CountTraceForWorker(true);
545 }
546 
DispatchAndTryExpand(const uv_async_t * req)547 void TaskManager::DispatchAndTryExpand([[maybe_unused]] const uv_async_t* req)
548 {
549     TaskManager& taskManager = TaskManager::GetInstance();
550     taskManager.DispatchAndTryExpandInner();
551 }
552 
DispatchAndTryExpandInner()553 void TaskManager::DispatchAndTryExpandInner()
554 {
555     // dispatch task in the TaskPoolManager thread
556     NotifyExecuteTask();
557     // do not check the worker idleTime first
558     TryExpandWithCheckIdle<false>();
559     if (!timerTriggered_ && GetNonIdleTaskNum() != 0) {
560         timerTriggered_ = true;
561         // use timer to check worker idle time after dispatch task
562         // if worker has been blocked or fd is broken, taskpool can expand automatically
563         uv_timer_start(expandTimer_, reinterpret_cast<uv_timer_cb>(TryExpand), TRIGGER_EXPAND_INTERVAL, 0);
564     }
565 }
566 
TryExpand(const uv_timer_t * req)567 void TaskManager::TryExpand([[maybe_unused]] const uv_timer_t* req)
568 {
569     TaskManager& taskManager = TaskManager::GetInstance();
570     taskManager.TryExpandWithCheckIdle<true>();
571 }
572 
573 template <bool needCheckIdle>
TryExpandWithCheckIdle()574 void TaskManager::TryExpandWithCheckIdle()
575 {
576     if (GetNonIdleTaskNum() == 0) {
577         HILOG_INFO("taskpool:: no need to create worker");
578         return;
579     }
580 
581     needChecking_ = false;
582     timerTriggered_ = false;
583     uint32_t targetNum = ComputeSuitableIdleNum();
584     uint32_t workerCount = 0;
585     uint32_t idleCount = 0;
586     uint32_t timeoutWorkers = 0;
587     {
588         std::lock_guard<std::recursive_mutex> lock(workersMutex_);
589         workerCount = workers_.size();
590         timeoutWorkers = timeoutWorkers_.size();
591         if constexpr (needCheckIdle) {
592             uint64_t currTime = ConcurrentHelper::GetMilliseconds();
593             idleCount = static_cast<uint32_t>(std::count_if(idleWorkers_.begin(), idleWorkers_.end(),
594                 [currTime](const auto& worker) {
595                     return worker->IsRunnable(currTime);
596                 }));
597         } else {
598             idleCount = idleWorkers_.size();
599         }
600     }
601     uint32_t maxThreads = std::max(maxThreads_, DEFAULT_THREADS);
602     maxThreads = (timeoutWorkers == 0) ? maxThreads : maxThreads + 2; // 2: extra threads
603     if (workerCount < maxThreads && idleCount < targetNum) {
604         // Prevent the total number of workers from exceeding maxThreads
605         uint32_t step = std::min(std::min(maxThreads, targetNum) - idleCount, maxThreads - workerCount);
606         step = step >= expandingCount_ ? step - expandingCount_ : 0;
607         if (step == 0) {
608             return;
609         }
610         CreateWorkers(hostEnv_, step);
611         HILOG_INFO("taskpool:: maxThreads: %{public}u, created num: %{public}u, total num: %{public}u",
612             maxThreads, step, GetThreadNum());
613     }
614     if (UNLIKELY(suspend_)) {
615         suspend_ = false;
616         uv_timer_again(balanceTimer_);
617     }
618 }
619 
RunTaskManager()620 void TaskManager::RunTaskManager()
621 {
622     loop_ = uv_loop_new();
623     if (loop_ == nullptr) { // LCOV_EXCL_BR_LINE
624         HILOG_FATAL("taskpool:: new loop failed.");
625         return;
626     }
627     ConcurrentHelper::UvHandleInit(loop_, dispatchHandle_, TaskManager::DispatchAndTryExpand);
628     balanceTimer_ = new uv_timer_t;
629     uv_timer_init(loop_, balanceTimer_);
630     uv_timer_start(balanceTimer_, reinterpret_cast<uv_timer_cb>(TaskManager::TriggerLoadBalance), 0, TRIGGER_INTERVAL);
631 
632     expandTimer_ = new uv_timer_t;
633     uv_timer_init(loop_, expandTimer_);
634 
635     isHandleInited_ = true;
636 #if defined IOS_PLATFORM || defined MAC_PLATFORM
637     pthread_setname_np("OS_TaskManager");
638 #else
639     pthread_setname_np(pthread_self(), "OS_TaskManager");
640 #endif
641     if (UNLIKELY(needChecking_)) {
642         needChecking_ = false;
643         uv_async_send(dispatchHandle_);
644     }
645     uv_run(loop_, UV_RUN_DEFAULT);
646     if (loop_ != nullptr) {
647         uv_loop_delete(loop_);
648     }
649 }
650 
CancelTask(napi_env env,uint32_t taskId)651 void TaskManager::CancelTask(napi_env env, uint32_t taskId)
652 {
653     // 1. Cannot find taskInfo by executeId, throw error
654     // 2. Find executing taskInfo, skip it
655     // 3. Find waiting taskInfo, cancel it
656     // 4. Find canceled taskInfo, skip it
657     std::string strTrace = "CancelTask: taskId: " + std::to_string(taskId);
658     HILOG_INFO("taskpool:: %{public}s", strTrace.c_str());
659     HITRACE_HELPER_METER_NAME(strTrace);
660     Task* task = GetTask(taskId);
661     if (task == nullptr) {
662         std::string errMsg = "taskpool:: the task may not exist";
663         HILOG_ERROR("%{public}s", errMsg.c_str());
664         ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
665         return;
666     }
667     if (task->taskState_ == ExecuteState::CANCELED) {
668         HILOG_DEBUG("taskpool:: task has been canceled");
669         return;
670     }
671     if (task->IsGroupCommonTask()) {
672         // when task is a group common task, still check the state
673         if (task->currentTaskInfo_ == nullptr || task->taskState_ == ExecuteState::NOT_FOUND ||
674             task->taskState_ == ExecuteState::FINISHED || task->taskState_ == ExecuteState::ENDING) {
675             std::string errMsg = "taskpool:: task is not executed or has been executed";
676             HILOG_ERROR("%{public}s", errMsg.c_str());
677             ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
678             return;
679         }
680         TaskGroup* taskGroup = TaskGroupManager::GetInstance().GetTaskGroup(task->groupId_);
681         if (taskGroup == nullptr) {
682             return;
683         }
684         return taskGroup->CancelGroupTask(env, task->taskId_);
685     }
686     if (task->IsPeriodicTask()) {
687         task->UpdateTaskStateToCanceled();
688         return;
689     }
690     if (task->IsSeqRunnerTask()) {
691         CancelSeqRunnerTask(env, task);
692         return;
693     }
694     if (task->IsAsyncRunnerTask()) {
695         AsyncRunnerManager::GetInstance().CancelAsyncRunnerTask(env, task);
696         return;
697     }
698     ExecuteState state = ExecuteState::NOT_FOUND;
699     {
700         std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
701         if (task->taskState_ == ExecuteState::CANCELED) {
702             HILOG_DEBUG("taskpool:: task has been canceled");
703             return;
704         }
705         if ((task->currentTaskInfo_ == nullptr && task->taskState_ != ExecuteState::DELAYED) ||
706             task->taskState_ == ExecuteState::NOT_FOUND || task->taskState_ == ExecuteState::FINISHED ||
707             task->taskState_ == ExecuteState::ENDING) {
708             std::string errMsg = "taskpool:: task is not executed or has been executed";
709             HILOG_ERROR("%{public}s", errMsg.c_str());
710             ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
711             return;
712         }
713         state = task->taskState_.exchange(ExecuteState::CANCELED);
714     }
715     if (task->IsSameEnv(env)) {
716         task->CancelInner(state);
717         return;
718     }
719     CancelTaskMessage* message = new CancelTaskMessage(state, task->taskId_);
720     task->TriggerCancel(message);
721 }
722 
CancelSeqRunnerTask(napi_env env,Task * task)723 void TaskManager::CancelSeqRunnerTask(napi_env env, Task* task)
724 {
725     {
726         std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
727         if (task->taskState_ != ExecuteState::FINISHED) {
728             task->taskState_ = ExecuteState::CANCELED;
729             return;
730         }
731     }
732     std::string errMsg = "taskpool:: sequenceRunner task has been executed";
733     HILOG_ERROR("%{public}s", errMsg.c_str());
734     ErrorHelper::ThrowError(env, ErrorHelper::ERR_CANCEL_NONEXIST_TASK, errMsg.c_str());
735 }
736 
NotifyWorkerIdle(Worker * worker)737 void TaskManager::NotifyWorkerIdle(Worker* worker)
738 {
739     {
740         std::lock_guard<std::recursive_mutex> lock(workersMutex_);
741         if (worker->state_ == WorkerState::BLOCKED) {
742             return;
743         }
744         idleWorkers_.insert(worker);
745     }
746     if (GetTaskNum() != 0) {
747         NotifyExecuteTask();
748     }
749     CountTraceForWorker();
750 }
751 
NotifyWorkerCreated(Worker * worker)752 void TaskManager::NotifyWorkerCreated(Worker* worker)
753 {
754     NotifyWorkerIdle(worker);
755     expandingCount_--;
756 }
757 
NotifyWorkerAdded(Worker * worker)758 void TaskManager::NotifyWorkerAdded(Worker* worker)
759 {
760     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
761     workers_.insert(worker);
762     HILOG_DEBUG("taskpool:: a new worker has been added and the current num is %{public}zu", workers_.size());
763 }
764 
NotifyWorkerRunning(Worker * worker)765 void TaskManager::NotifyWorkerRunning(Worker* worker)
766 {
767     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
768     idleWorkers_.erase(worker);
769     CountTraceForWorkerWithoutLock();
770 }
771 
GetRunningWorkers()772 uint32_t TaskManager::GetRunningWorkers()
773 {
774     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
775     return std::count_if(workers_.begin(), workers_.end(), [](const auto& worker) {
776         return worker->HasRunningTasks();
777     });
778 }
779 
GetTimeoutWorkers()780 uint32_t TaskManager::GetTimeoutWorkers()
781 {
782     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
783     return timeoutWorkers_.size();
784 }
785 
GetTaskNum()786 uint32_t TaskManager::GetTaskNum()
787 {
788     std::lock_guard<std::mutex> lock(taskQueuesMutex_);
789     uint32_t sum = 0;
790     for (const auto& elements : taskQueues_) {
791         sum += elements->GetTaskNum();
792     }
793     return sum;
794 }
795 
GetNonIdleTaskNum()796 uint32_t TaskManager::GetNonIdleTaskNum()
797 {
798     return nonIdleTaskNum_;
799 }
800 
IncreaseTaskNum(Priority priority)801 void TaskManager::IncreaseTaskNum(Priority priority)
802 {
803     totalTaskNum_.fetch_add(1);
804     if (priority != Priority::IDLE) {
805         nonIdleTaskNum_.fetch_add(1);
806     }
807 }
808 
DecreaseTaskNum(Priority priority)809 void TaskManager::DecreaseTaskNum(Priority priority)
810 {
811     totalTaskNum_.fetch_sub(1);
812     if (priority != Priority::IDLE) {
813         nonIdleTaskNum_.fetch_sub(1);
814     }
815 }
816 
GetThreadNum()817 uint32_t TaskManager::GetThreadNum()
818 {
819     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
820     return workers_.size();
821 }
822 
EnqueueTaskId(uint32_t taskId,Priority priority)823 void TaskManager::EnqueueTaskId(uint32_t taskId, Priority priority)
824 {
825     {
826         std::lock_guard<std::mutex> lock(taskQueuesMutex_);
827         IncreaseTaskNum(priority);
828         taskQueues_[priority]->EnqueueTaskId(taskId);
829     }
830     TryTriggerExpand();
831     Task* task = GetTask(taskId);
832     if (task == nullptr) {
833         HILOG_FATAL("taskpool:: task is nullptr");
834         return;
835     }
836     task->IncreaseTaskLifecycleCount();
837     ListenerCallBackInfo* info = nullptr;
838     {
839         std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
840         info = task->onEnqueuedCallBackInfo_;
841     }
842     if (info != nullptr) {
843         task->ExecuteListenerCallback(info, taskId);
844     } else { // LOCV_EXCL_BR_LINE
845         HILOG_WARN("taskpool:: onEnqueuedCallBackInfo is null");
846     }
847 }
848 
EraseWaitingTaskId(uint32_t taskId,Priority priority)849 bool TaskManager::EraseWaitingTaskId(uint32_t taskId, Priority priority)
850 {
851     std::lock_guard<std::mutex> lock(taskQueuesMutex_);
852     if (!taskQueues_[priority]->EraseWaitingTaskId(taskId)) {
853         HILOG_WARN("taskpool:: taskId is not in executeQueue when cancel");
854         return false;
855     }
856     DecreaseTaskNum(priority);
857     return true;
858 }
859 
DequeueTaskId()860 std::pair<uint32_t, Priority> TaskManager::DequeueTaskId()
861 {
862     bool isChoose = IsChooseIdle();
863     {
864         std::lock_guard<std::mutex> lock(taskQueuesMutex_);
865         auto& highTaskQueue = taskQueues_[Priority::HIGH];
866         if (!highTaskQueue->IsEmpty() && highPrioExecuteCount_ < HIGH_PRIORITY_TASK_COUNT) {
867             highPrioExecuteCount_++;
868             return GetTaskByPriority(highTaskQueue, Priority::HIGH);
869         }
870         highPrioExecuteCount_ = 0;
871 
872         auto& mediumTaskQueue = taskQueues_[Priority::MEDIUM];
873         if (!mediumTaskQueue->IsEmpty() && mediumPrioExecuteCount_ < MEDIUM_PRIORITY_TASK_COUNT) {
874             mediumPrioExecuteCount_++;
875             return GetTaskByPriority(mediumTaskQueue, Priority::MEDIUM);
876         }
877         mediumPrioExecuteCount_ = 0;
878 
879         auto& lowTaskQueue = taskQueues_[Priority::LOW];
880         if (!lowTaskQueue->IsEmpty()) {
881             return GetTaskByPriority(lowTaskQueue, Priority::LOW);
882         }
883 
884         auto& idleTaskQueue = taskQueues_[Priority::IDLE];
885         if (!IsPerformIdle() && highTaskQueue->IsEmpty() && mediumTaskQueue->IsEmpty() && !idleTaskQueue->IsEmpty() &&
886             isChoose) {
887             return GetTaskByPriority(idleTaskQueue, Priority::IDLE);
888         }
889     }
890     return std::make_pair(0, Priority::LOW);
891 }
892 
IsChooseIdle()893 bool TaskManager::IsChooseIdle()
894 {
895     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
896     if (workers_.size() != idleWorkers_.size()) {
897         return false;
898     }
899     return true;
900 }
901 
GetTaskByPriority(const std::unique_ptr<ExecuteQueue> & taskQueue,Priority priority)902 std::pair<uint32_t, Priority> TaskManager::GetTaskByPriority(const std::unique_ptr<ExecuteQueue>& taskQueue,
903     Priority priority)
904 {
905     uint32_t taskId = taskQueue->DequeueTaskId();
906     DecreaseTaskNum(priority);
907     if (IsDependendByTaskId(taskId)) {
908         EnqueuePendingTaskInfo(taskId, priority);
909         return std::make_pair(0, priority);
910     }
911     preDequeneTime_ = ConcurrentHelper::GetMilliseconds();
912     if (priority == Priority::IDLE && taskId != 0) {
913         SetIsPerformIdle(true);
914     }
915     return std::make_pair(taskId, priority);
916 }
917 
NotifyExecuteTask()918 void TaskManager::NotifyExecuteTask()
919 {
920     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
921     if (GetNonIdleTaskNum() == 0 && workers_.size() != idleWorkers_.size()) {
922         // When there are only idle tasks and workers executing them, it is not triggered
923         HILOG_INFO("taskpool:: not notify execute task");
924         return;
925     }
926     if (idleWorkers_.size() == 0) {
927         HILOG_INFO("taskpool:: idleWorkers is 0");
928         return;
929     }
930 
931     for (auto& worker : idleWorkers_) {
932         worker->NotifyExecuteTask();
933     }
934 }
935 
InitTaskManager(napi_env env)936 void TaskManager::InitTaskManager(napi_env env)
937 {
938     HITRACE_HELPER_METER_NAME("InitTaskManager");
939     if (!isInitialized_.exchange(true, std::memory_order_relaxed)) {
940 #if defined(ENABLE_TASKPOOL_FFRT)
941         globalEnableFfrtFlag_ = OHOS::system::GetIntParameter<int>("persist.commonlibrary.taskpoolglobalenableffrt", 0);
942         if (!globalEnableFfrtFlag_) {
943             UpdateSystemAppFlag();
944             if (IsSystemApp()) {
945                 disableFfrtFlag_ = OHOS::system::GetIntParameter<int>("persist.commonlibrary.taskpooldisableffrt", 0);
946             }
947         }
948         if (EnableFfrt()) {
949             HILOG_INFO("taskpool:: apps use ffrt");
950         } else {
951             HILOG_INFO("taskpool:: apps do not use ffrt");
952         }
953 #endif
954 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
955         mainThreadHandler_ = std::make_shared<OHOS::AppExecFwk::EventHandler>(
956             OHOS::AppExecFwk::EventRunner::GetMainEventRunner());
957 #endif
958         auto mainThreadEngine = NativeEngine::GetMainThreadEngine();
959         if (mainThreadEngine == nullptr) {
960             HILOG_FATAL("taskpool:: mainThreadEngine is nullptr");
961             return;
962         }
963         hostEnv_ = reinterpret_cast<napi_env>(mainThreadEngine);
964         // Add a reserved thread for taskpool
965         CreateWorkers(hostEnv_);
966         // Create a timer to manage worker threads
967         std::thread workerManager([this] {this->RunTaskManager();});
968         workerManager.detach();
969     }
970 }
971 
CreateWorkers(napi_env env,uint32_t num)972 void TaskManager::CreateWorkers(napi_env env, uint32_t num)
973 {
974     HILOG_DEBUG("taskpool:: CreateWorkers, num:%{public}u", num);
975     for (uint32_t i = 0; i < num; i++) {
976         expandingCount_++;
977         auto worker = Worker::WorkerConstructor(env);
978         NotifyWorkerAdded(worker);
979     }
980     CountTraceForWorker();
981 }
982 
RemoveWorker(Worker * worker)983 void TaskManager::RemoveWorker(Worker* worker)
984 {
985     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
986     idleWorkers_.erase(worker);
987     timeoutWorkers_.erase(worker);
988     workers_.erase(worker);
989 }
990 
RestoreWorker(Worker * worker)991 void TaskManager::RestoreWorker(Worker* worker)
992 {
993     std::lock_guard<std::recursive_mutex> lock(workersMutex_);
994     if (UNLIKELY(suspend_)) {
995         suspend_ = false;
996         uv_timer_again(balanceTimer_);
997     }
998     if (worker->state_ == WorkerState::BLOCKED) {
999         // since the worker is blocked, we should add it to the timeout set
1000         timeoutWorkers_.insert(worker);
1001         return;
1002     }
1003     // Since the worker may be executing some tasks in IO thread, we should add it to the
1004     // worker sets and call the 'NotifyWorkerIdle', which can still execute some tasks in its own thread.
1005     HILOG_DEBUG("taskpool:: worker has been restored and the current num is: %{public}zu", workers_.size());
1006     idleWorkers_.emplace_hint(idleWorkers_.end(), worker);
1007     if (GetTaskNum() != 0) {
1008         NotifyExecuteTask();
1009     }
1010 }
1011 
1012 // ---------------------------------- SendData ---------------------------------------
RegisterCallback(napi_env env,uint32_t taskId,std::shared_ptr<CallbackInfo> callbackInfo,const std::string & type)1013 void TaskManager::RegisterCallback(napi_env env, uint32_t taskId, std::shared_ptr<CallbackInfo> callbackInfo,
1014                                    const std::string& type)
1015 {
1016     std::lock_guard<std::mutex> lock(callbackMutex_);
1017     if (callbackInfo != nullptr) {
1018         callbackInfo->type = type;
1019     } else { // LOCV_EXCL_BR_LINE
1020         HILOG_WARN("taskpool:: callbackInfo is null.");
1021     }
1022     callbackTable_[taskId] = callbackInfo;
1023 }
1024 
IncreaseSendDataRefCount(uint32_t taskId)1025 void TaskManager::IncreaseSendDataRefCount(uint32_t taskId)
1026 {
1027     if (taskId == 0) { // do not support func
1028         return;
1029     }
1030     std::lock_guard<std::mutex> lock(callbackMutex_);
1031     auto iter = callbackTable_.find(taskId);
1032     if (iter == callbackTable_.end() || iter->second == nullptr) {
1033         return;
1034     }
1035     iter->second->refCount++;
1036 }
1037 
DecreaseSendDataRefCount(napi_env env,uint32_t taskId,Task * task)1038 void TaskManager::DecreaseSendDataRefCount(napi_env env, uint32_t taskId, Task* task)
1039 {
1040     if (taskId == 0) { // do not support func
1041         return;
1042     }
1043     std::lock_guard<std::mutex> lock(callbackMutex_);
1044     auto iter = callbackTable_.find(taskId);
1045     if (iter == callbackTable_.end() || iter->second == nullptr) {
1046         return;
1047     }
1048 
1049     if (task != nullptr && !task->IsValid()) {
1050         callbackTable_.erase(iter);
1051         return;
1052     }
1053 
1054     if (--iter->second->refCount == 0) {
1055         callbackTable_.erase(iter);
1056     }
1057 }
1058 
ExecuteSendData(napi_env env,TaskResultInfo * resultInfo,Task * task)1059 void TaskManager::ExecuteSendData(napi_env env, TaskResultInfo* resultInfo, Task* task)
1060 {
1061     std::lock_guard<std::mutex> lock(callbackMutex_);
1062     auto iter = callbackTable_.find(task->GetTaskId());
1063     if (iter == callbackTable_.end() || iter->second == nullptr) {
1064         HILOG_ERROR("taskpool:: the callback in SendData is not registered on the host side");
1065         ErrorHelper::ThrowError(env, ErrorHelper::ERR_NOT_REGISTERED);
1066         delete resultInfo;
1067         return;
1068     }
1069     auto callbackInfo = iter->second;
1070     ++callbackInfo->refCount;
1071     auto workerEngine = reinterpret_cast<NativeEngine*>(env);
1072     workerEngine->IncreaseListeningCounter();
1073     std::weak_ptr<CallbackInfo> info = callbackInfo;
1074     auto onCallbackTask = [info, resultInfo]([[maybe_unused]] void* data) {
1075         auto callbackInfo = info.lock();
1076         if (callbackInfo == nullptr) {
1077             HILOG_ERROR("taskpool:: callbackInfo may have been released");
1078             delete resultInfo;
1079             return;
1080         }
1081         TaskPool::ExecuteOnReceiveDataCallback(callbackInfo.get(), resultInfo);
1082     };
1083     auto worker = task->GetWorker();
1084     auto hostEnv = task->GetEnv();
1085     auto priority = g_napiPriorityMap.at(worker->GetPriority());
1086     uint64_t handleId = 0;
1087     napi_status status = napi_send_cancelable_event(hostEnv, onCallbackTask, nullptr, priority,
1088                                                     &handleId, ON_CALLBACK_STR);
1089     if (status != napi_ok) {
1090         HILOG_ERROR("taskpool:: failed to send event to the host side");
1091         --callbackInfo->refCount;
1092         workerEngine->DecreaseListeningCounter();
1093         delete resultInfo;
1094     }
1095 }
1096 // ---------------------------------- SendData ---------------------------------------
1097 
NotifyDependencyTaskInfo(uint32_t taskId)1098 void TaskManager::NotifyDependencyTaskInfo(uint32_t taskId)
1099 {
1100     HILOG_DEBUG("taskpool:: task:%{public}s NotifyDependencyTaskInfo", std::to_string(taskId).c_str());
1101     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
1102     std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1103     auto iter = dependentTaskInfos_.find(taskId);
1104     if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
1105         HILOG_DEBUG("taskpool:: dependentTaskInfo empty");
1106         return;
1107     }
1108     for (auto taskIdIter = iter->second.begin(); taskIdIter != iter->second.end();) {
1109         auto taskInfo = DequeuePendingTaskInfo(*taskIdIter);
1110         RemoveDependencyById(taskId, *taskIdIter);
1111         taskIdIter = iter->second.erase(taskIdIter);
1112         if (taskInfo.first != 0) {
1113             EnqueueTaskId(taskInfo.first, taskInfo.second);
1114         }
1115     }
1116 }
1117 
RemoveDependencyById(uint32_t dependentTaskId,uint32_t taskId)1118 void TaskManager::RemoveDependencyById(uint32_t dependentTaskId, uint32_t taskId)
1119 {
1120     HILOG_DEBUG("taskpool::task:%{public}s RemoveDependencyById", std::to_string(taskId).c_str());
1121     // remove dependency after task execute
1122     std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1123     auto dependTaskIter = dependTaskInfos_.find(taskId);
1124     if (dependTaskIter != dependTaskInfos_.end()) {
1125         auto dependTaskInnerIter = dependTaskIter->second.find(dependentTaskId);
1126         if (dependTaskInnerIter != dependTaskIter->second.end()) {
1127             dependTaskIter->second.erase(dependTaskInnerIter);
1128         }
1129     }
1130 }
1131 
IsDependendByTaskId(uint32_t taskId)1132 bool TaskManager::IsDependendByTaskId(uint32_t taskId)
1133 {
1134     std::shared_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1135     auto iter = dependTaskInfos_.find(taskId);
1136     if (iter == dependTaskInfos_.end() || iter->second.empty()) {
1137         return false;
1138     }
1139     return true;
1140 }
1141 
IsDependentByTaskId(uint32_t dependentTaskId)1142 bool TaskManager::IsDependentByTaskId(uint32_t dependentTaskId)
1143 {
1144     std::shared_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1145     auto iter = dependentTaskInfos_.find(dependentTaskId);
1146     if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
1147         return false;
1148     }
1149     return true;
1150 }
1151 
StoreTaskDependency(uint32_t taskId,std::set<uint32_t> taskIdSet)1152 bool TaskManager::StoreTaskDependency(uint32_t taskId, std::set<uint32_t> taskIdSet)
1153 {
1154     HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDependency", std::to_string(taskId).c_str());
1155     StoreDependentTaskInfo(taskIdSet, taskId);
1156     std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1157     auto iter = dependTaskInfos_.find(taskId);
1158     if (iter == dependTaskInfos_.end()) {
1159         for (const auto& dependentId : taskIdSet) {
1160             auto idIter = dependTaskInfos_.find(dependentId);
1161             if (idIter == dependTaskInfos_.end()) {
1162                 continue;
1163             }
1164             if (!CheckCircularDependency(taskIdSet, idIter->second, taskId)) {
1165                 return false;
1166             }
1167         }
1168         dependTaskInfos_.emplace(taskId, std::move(taskIdSet));
1169         return true;
1170     }
1171 
1172     for (const auto& dependentId : iter->second) {
1173         auto idIter = dependTaskInfos_.find(dependentId);
1174         if (idIter == dependTaskInfos_.end()) {
1175             continue;
1176         }
1177         if (!CheckCircularDependency(iter->second, idIter->second, taskId)) {
1178             return false;
1179         }
1180     }
1181     iter->second.insert(taskIdSet.begin(), taskIdSet.end());
1182     return true;
1183 }
1184 
CheckCircularDependency(std::set<uint32_t> dependentIdSet,std::set<uint32_t> idSet,uint32_t taskId)1185 bool TaskManager::CheckCircularDependency(std::set<uint32_t> dependentIdSet, std::set<uint32_t> idSet, uint32_t taskId)
1186 {
1187     for (const auto& id : idSet) {
1188         if (id == taskId) {
1189             return false;
1190         }
1191         auto iter = dependentIdSet.find(id);
1192         if (iter != dependentIdSet.end()) {
1193             continue;
1194         }
1195         auto dIter = dependTaskInfos_.find(id);
1196         if (dIter == dependTaskInfos_.end()) {
1197             continue;
1198         }
1199         if (!CheckCircularDependency(dependentIdSet, dIter->second, taskId)) {
1200             return false;
1201         }
1202     }
1203     return true;
1204 }
1205 
RemoveTaskDependency(uint32_t taskId,uint32_t dependentId)1206 bool TaskManager::RemoveTaskDependency(uint32_t taskId, uint32_t dependentId)
1207 {
1208     HILOG_DEBUG("taskpool:: task:%{public}s RemoveTaskDependency", std::to_string(taskId).c_str());
1209     RemoveDependentTaskInfo(dependentId, taskId);
1210     std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1211     auto iter = dependTaskInfos_.find(taskId);
1212     if (iter == dependTaskInfos_.end()) {
1213         return false;
1214     }
1215     auto dependIter = iter->second.find(dependentId);
1216     if (dependIter ==  iter->second.end()) {
1217         return false;
1218     }
1219     iter->second.erase(dependIter);
1220     return true;
1221 }
1222 
EnqueuePendingTaskInfo(uint32_t taskId,Priority priority)1223 void TaskManager::EnqueuePendingTaskInfo(uint32_t taskId, Priority priority)
1224 {
1225     if (taskId == 0) {
1226         return;
1227     }
1228     std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
1229     pendingTaskInfos_.emplace(taskId, priority);
1230 }
1231 
DequeuePendingTaskInfo(uint32_t taskId)1232 std::pair<uint32_t, Priority> TaskManager::DequeuePendingTaskInfo(uint32_t taskId)
1233 {
1234     std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
1235     if (pendingTaskInfos_.empty()) {
1236         return std::make_pair(0, Priority::DEFAULT);
1237     }
1238     std::pair<uint32_t, Priority> result;
1239     for (auto it = pendingTaskInfos_.begin(); it != pendingTaskInfos_.end(); ++it) {
1240         if (it->first == taskId) {
1241             result = std::make_pair(it->first, it->second);
1242             it = pendingTaskInfos_.erase(it);
1243             break;
1244         }
1245     }
1246     return result;
1247 }
1248 
RemovePendingTaskInfo(uint32_t taskId)1249 void TaskManager::RemovePendingTaskInfo(uint32_t taskId)
1250 {
1251     HILOG_DEBUG("taskpool:: task:%{public}s RemovePendingTaskInfo", std::to_string(taskId).c_str());
1252     std::unique_lock<std::shared_mutex> lock(pendingTaskInfosMutex_);
1253     pendingTaskInfos_.erase(taskId);
1254 }
1255 
StoreDependentTaskInfo(std::set<uint32_t> dependentTaskIdSet,uint32_t taskId)1256 void TaskManager::StoreDependentTaskInfo(std::set<uint32_t> dependentTaskIdSet, uint32_t taskId)
1257 {
1258     HILOG_DEBUG("taskpool:: task:%{public}s StoreDependentTaskInfo", std::to_string(taskId).c_str());
1259     std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1260     for (const auto& id : dependentTaskIdSet) {
1261         auto iter = dependentTaskInfos_.find(id);
1262         if (iter == dependentTaskInfos_.end()) {
1263             std::set<uint32_t> set{taskId};
1264             dependentTaskInfos_.emplace(id, std::move(set));
1265         } else {
1266             iter->second.emplace(taskId);
1267         }
1268     }
1269 }
1270 
RemoveDependentTaskInfo(uint32_t dependentTaskId,uint32_t taskId)1271 void TaskManager::RemoveDependentTaskInfo(uint32_t dependentTaskId, uint32_t taskId)
1272 {
1273     HILOG_DEBUG("taskpool:: task:%{public}s RemoveDependentTaskInfo", std::to_string(taskId).c_str());
1274     std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1275     auto iter = dependentTaskInfos_.find(dependentTaskId);
1276     if (iter == dependentTaskInfos_.end()) {
1277         return;
1278     }
1279     auto taskIter = iter->second.find(taskId);
1280     if (taskIter == iter->second.end()) {
1281         return;
1282     }
1283     iter->second.erase(taskIter);
1284 }
1285 
GetTaskDependInfoToString(uint32_t taskId)1286 std::string TaskManager::GetTaskDependInfoToString(uint32_t taskId)
1287 {
1288     std::shared_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1289     std::string str = "TaskInfos: taskId: " + std::to_string(taskId) + ", dependTaskId:";
1290     auto iter = dependTaskInfos_.find(taskId);
1291     if (iter != dependTaskInfos_.end()) {
1292         for (const auto& id : iter->second) {
1293             str += " " + std::to_string(id);
1294         }
1295     }
1296     return str;
1297 }
1298 
StoreTaskDuration(uint32_t taskId,uint64_t totalDuration,uint64_t cpuDuration)1299 void TaskManager::StoreTaskDuration(uint32_t taskId, uint64_t totalDuration, uint64_t cpuDuration)
1300 {
1301     HILOG_DEBUG("taskpool:: task:%{public}s StoreTaskDuration", std::to_string(taskId).c_str());
1302     std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
1303     auto iter = taskDurationInfos_.find(taskId);
1304     if (iter == taskDurationInfos_.end()) {
1305         std::pair<uint64_t, uint64_t> durationData = std::make_pair(totalDuration, cpuDuration);
1306         taskDurationInfos_.emplace(taskId, std::move(durationData));
1307     } else {
1308         if (totalDuration != 0) {
1309             iter->second.first = totalDuration;
1310         }
1311         if (cpuDuration != 0) {
1312             iter->second.second = cpuDuration;
1313         }
1314     }
1315 }
1316 
GetTaskDuration(uint32_t taskId,std::string durationType)1317 uint64_t TaskManager::GetTaskDuration(uint32_t taskId, std::string durationType)
1318 {
1319     std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
1320     auto iter = taskDurationInfos_.find(taskId);
1321     if (iter == taskDurationInfos_.end()) {
1322         return 0;
1323     }
1324     if (durationType == TASK_TOTAL_TIME) {
1325         return iter->second.first;
1326     } else if (durationType == TASK_CPU_TIME) {
1327         return iter->second.second;
1328     } else if (iter->second.first == 0) {
1329         return 0;
1330     }
1331     return iter->second.first - iter->second.second;
1332 }
1333 
RemoveTaskDuration(uint32_t taskId)1334 void TaskManager::RemoveTaskDuration(uint32_t taskId)
1335 {
1336     HILOG_DEBUG("taskpool:: task:%{public}s RemoveTaskDuration", std::to_string(taskId).c_str());
1337     std::unique_lock<std::shared_mutex> lock(taskDurationInfosMutex_);
1338     auto iter = taskDurationInfos_.find(taskId);
1339     if (iter != taskDurationInfos_.end()) {
1340         taskDurationInfos_.erase(iter);
1341     }
1342 }
1343 
StoreLongTaskInfo(uint32_t taskId,Worker * worker)1344 void TaskManager::StoreLongTaskInfo(uint32_t taskId, Worker* worker)
1345 {
1346     std::unique_lock<std::shared_mutex> lock(longTasksMutex_);
1347     longTasksMap_.emplace(taskId, worker);
1348 }
1349 
RemoveLongTaskInfo(uint32_t taskId)1350 void TaskManager::RemoveLongTaskInfo(uint32_t taskId)
1351 {
1352     std::unique_lock<std::shared_mutex> lock(longTasksMutex_);
1353     longTasksMap_.erase(taskId);
1354 }
1355 
GetLongTaskInfo(uint32_t taskId)1356 Worker* TaskManager::GetLongTaskInfo(uint32_t taskId)
1357 {
1358     std::shared_lock<std::shared_mutex> lock(longTasksMutex_);
1359     auto iter = longTasksMap_.find(taskId);
1360     return iter != longTasksMap_.end() ? iter->second : nullptr;
1361 }
1362 
TerminateTask(uint32_t taskId)1363 void TaskManager::TerminateTask(uint32_t taskId)
1364 {
1365     HILOG_DEBUG("taskpool:: task:%{public}s TerminateTask", std::to_string(taskId).c_str());
1366     auto worker = GetLongTaskInfo(taskId);
1367     if (UNLIKELY(worker == nullptr)) {
1368         return;
1369     }
1370     worker->TerminateTask(taskId);
1371     RemoveLongTaskInfo(taskId);
1372 }
1373 
ReleaseTaskData(napi_env env,Task * task,bool shouldDeleteTask)1374 void TaskManager::ReleaseTaskData(napi_env env, Task* task, bool shouldDeleteTask)
1375 {
1376     uint32_t taskId = task->taskId_;
1377     if (shouldDeleteTask) {
1378         RemoveTask(taskId);
1379     }
1380 
1381     task->ReleaseData();
1382     task->CancelPendingTask(env);
1383 
1384     task->ClearDelayedTimers();
1385 
1386     if (task->IsFunctionTask() || task->IsGroupFunctionTask()) {
1387         return;
1388     }
1389     if (!task->IsMainThreadTask()) {
1390         task->SetValid(false);
1391     }
1392     DecreaseSendDataRefCount(env, taskId, task);
1393     RemoveTaskDuration(taskId);
1394     RemovePendingTaskInfo(taskId);
1395     ReleaseCallBackInfo(task);
1396     {
1397         std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1398         for (auto dependentTaskIter = dependentTaskInfos_.begin(); dependentTaskIter != dependentTaskInfos_.end();) {
1399             if (dependentTaskIter->second.find(taskId) != dependentTaskIter->second.end()) {
1400                 dependentTaskIter = dependentTaskInfos_.erase(dependentTaskIter);
1401             } else {
1402                 ++dependentTaskIter;
1403             }
1404         }
1405     }
1406     std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1407     auto dependTaskIter = dependTaskInfos_.find(taskId);
1408     if (dependTaskIter != dependTaskInfos_.end()) {
1409         dependTaskInfos_.erase(dependTaskIter);
1410     }
1411 }
1412 
ReleaseCallBackInfo(Task * task)1413 void TaskManager::ReleaseCallBackInfo(Task* task)
1414 {
1415     HILOG_DEBUG("taskpool:: ReleaseCallBackInfo task:%{public}s", std::to_string(task->taskId_).c_str());
1416     std::lock_guard<std::recursive_mutex> lock(task->taskMutex_);
1417     if (task->onEnqueuedCallBackInfo_ != nullptr) {
1418         delete task->onEnqueuedCallBackInfo_;
1419         task->onEnqueuedCallBackInfo_ = nullptr;
1420     }
1421 
1422     if (task->onStartExecutionCallBackInfo_ != nullptr) {
1423         delete task->onStartExecutionCallBackInfo_;
1424         task->onStartExecutionCallBackInfo_ = nullptr;
1425     }
1426 
1427     if (task->onExecutionFailedCallBackInfo_ != nullptr) {
1428         delete task->onExecutionFailedCallBackInfo_;
1429         task->onExecutionFailedCallBackInfo_ = nullptr;
1430     }
1431 
1432     if (task->onExecutionSucceededCallBackInfo_ != nullptr) {
1433         delete task->onExecutionSucceededCallBackInfo_;
1434         task->onExecutionSucceededCallBackInfo_ = nullptr;
1435     }
1436 
1437 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
1438     if (!task->IsMainThreadTask() && task->onStartExecutionSignal_ != nullptr) {
1439         if (!ConcurrentHelper::IsUvClosing(task->onStartExecutionSignal_)) {
1440             ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_);
1441         } else {
1442             delete task->onStartExecutionSignal_;
1443             task->onStartExecutionSignal_ = nullptr;
1444         }
1445     }
1446 #else
1447     if (task->onStartExecutionSignal_ != nullptr) {
1448         if (!ConcurrentHelper::IsUvClosing(task->onStartExecutionSignal_)) {
1449             ConcurrentHelper::UvHandleClose(task->onStartExecutionSignal_);
1450         } else {
1451             delete task->onStartExecutionSignal_;
1452             task->onStartExecutionSignal_ = nullptr;
1453         }
1454     }
1455 #endif
1456 }
1457 
StoreTask(Task * task)1458 void TaskManager::StoreTask(Task* task)
1459 {
1460     uint64_t id = reinterpret_cast<uint64_t>(task);
1461     uint32_t taskId = CalculateTaskId(id);
1462     std::lock_guard<std::recursive_mutex> lock(tasksMutex_);
1463     while (tasks_.find(taskId) != tasks_.end()) {
1464         id++;
1465         taskId = CalculateTaskId(id);
1466     }
1467     task->SetTaskId(taskId);
1468     tasks_.emplace(taskId, task);
1469 }
1470 
RemoveTask(uint32_t taskId)1471 void TaskManager::RemoveTask(uint32_t taskId)
1472 {
1473     std::lock_guard<std::recursive_mutex> lock(tasksMutex_);
1474     tasks_.erase(taskId);
1475 }
1476 
GetTask(uint32_t taskId)1477 Task* TaskManager::GetTask(uint32_t taskId)
1478 {
1479     std::lock_guard<std::recursive_mutex> lock(tasksMutex_);
1480     auto iter = tasks_.find(taskId);
1481     if (iter == tasks_.end()) {
1482         return nullptr;
1483     }
1484     return iter->second;
1485 }
1486 
1487 #if defined(ENABLE_TASKPOOL_FFRT)
UpdateSystemAppFlag()1488 void TaskManager::UpdateSystemAppFlag()
1489 {
1490     auto abilityManager = OHOS::SystemAbilityManagerClient::GetInstance().GetSystemAbilityManager();
1491     if (abilityManager == nullptr) {
1492         HILOG_ERROR("taskpool:: fail to GetSystemAbility abilityManager is nullptr.");
1493         return;
1494     }
1495     auto bundleObj = abilityManager->GetSystemAbility(OHOS::BUNDLE_MGR_SERVICE_SYS_ABILITY_ID);
1496     if (bundleObj == nullptr) {
1497         HILOG_ERROR("taskpool:: fail to get bundle manager service.");
1498         return;
1499     }
1500     auto bundleMgr = OHOS::iface_cast<OHOS::AppExecFwk::IBundleMgr>(bundleObj);
1501     if (bundleMgr == nullptr) {
1502         HILOG_ERROR("taskpool:: Bundle manager is nullptr.");
1503         return;
1504     }
1505     OHOS::AppExecFwk::BundleInfo bundleInfo;
1506     if (bundleMgr->GetBundleInfoForSelf(
1507         static_cast<int32_t>(OHOS::AppExecFwk::GetBundleInfoFlag::GET_BUNDLE_INFO_WITH_APPLICATION), bundleInfo)
1508         != OHOS::ERR_OK) {
1509         HILOG_ERROR("taskpool:: fail to GetBundleInfoForSelf");
1510         return;
1511     }
1512     isSystemApp_ = bundleInfo.applicationInfo.isSystemApp;
1513 }
1514 #endif
1515 
1516 #if defined(ENABLE_TASKPOOL_EVENTHANDLER)
PostTask(std::function<void ()> task,const char * taskName,Priority priority)1517 bool TaskManager::PostTask(std::function<void()> task, const char* taskName, Priority priority)
1518 {
1519     return mainThreadHandler_->PostTask(task, taskName, 0, TASK_EVENTHANDLER_PRIORITY_MAP.at(priority));
1520 }
1521 #endif
1522 
BatchRejectDeferred(napi_env env,std::list<napi_deferred> deferreds,std::string error)1523 void TaskManager::BatchRejectDeferred(napi_env env, std::list<napi_deferred> deferreds, std::string error)
1524 {
1525     if (deferreds.empty()) {
1526         return;
1527     }
1528     napi_value message = CancelError(env, 0, error.c_str());
1529     for (auto deferred : deferreds) {
1530         napi_reject_deferred(env, deferred, message);
1531     }
1532 }
1533 
CalculateTaskId(uint64_t id)1534 uint32_t TaskManager::CalculateTaskId(uint64_t id)
1535 {
1536     size_t hash = std::hash<uint64_t>{}(id);
1537     uint64_t taskId = static_cast<uint64_t>(hash & MAX_UINT32_T);
1538     while (taskId == 0) {
1539         ++taskId;
1540         hash = std::hash<uint64_t>{}(taskId);
1541         taskId = static_cast<uint64_t>(hash & MAX_UINT32_T);
1542     }
1543     return static_cast<uint32_t>(taskId);
1544 }
1545 
ClearDependentTask(uint32_t taskId)1546 void TaskManager::ClearDependentTask(uint32_t taskId)
1547 {
1548     HILOG_DEBUG("taskpool:: task:%{public}s ClearDependentTask", std::to_string(taskId).c_str());
1549     RemoveDependTaskByTaskId(taskId);
1550     DequeuePendingTaskInfo(taskId);
1551     std::unique_lock<std::shared_mutex> lock(dependentTaskInfosMutex_);
1552     RemoveDependentTaskByTaskId(taskId);
1553 }
1554 
RemoveDependTaskByTaskId(uint32_t taskId)1555 void TaskManager::RemoveDependTaskByTaskId(uint32_t taskId)
1556 {
1557     std::set<uint32_t> dependTaskIds;
1558     {
1559         std::unique_lock<std::shared_mutex> lock(dependTaskInfosMutex_);
1560         auto iter = dependTaskInfos_.find(taskId);
1561         if (iter == dependTaskInfos_.end()) {
1562             return;
1563         }
1564         dependTaskIds.insert(iter->second.begin(), iter->second.end());
1565         dependTaskInfos_.erase(iter);
1566     }
1567     for (auto dependTaskId : dependTaskIds) {
1568         RemoveDependentTaskInfo(dependTaskId, taskId);
1569     }
1570 }
1571 
RemoveDependentTaskByTaskId(uint32_t taskId)1572 void TaskManager::RemoveDependentTaskByTaskId(uint32_t taskId)
1573 {
1574     auto iter = dependentTaskInfos_.find(taskId);
1575     if (iter == dependentTaskInfos_.end() || iter->second.empty()) {
1576         HILOG_DEBUG("taskpool:: dependentTaskInfo empty");
1577         return;
1578     }
1579     for (auto taskIdIter = iter->second.begin(); taskIdIter != iter->second.end();) {
1580         auto taskInfo = DequeuePendingTaskInfo(*taskIdIter);
1581         RemoveDependencyById(taskId, *taskIdIter);
1582         auto id = *taskIdIter;
1583         taskIdIter = iter->second.erase(taskIdIter);
1584         auto task = GetTask(id);
1585         if (task == nullptr) {
1586             continue;
1587         }
1588         if (task->currentTaskInfo_ != nullptr) {
1589             EraseWaitingTaskId(task->taskId_, task->currentTaskInfo_->priority);
1590         }
1591         task->DisposeCanceledTask();
1592         RemoveDependentTaskByTaskId(task->taskId_);
1593     }
1594 }
1595 
WriteHisysForFfrtAndUv(Worker * worker,HisyseventParams * hisyseventParams)1596 void TaskManager::WriteHisysForFfrtAndUv(Worker* worker, HisyseventParams* hisyseventParams)
1597 {
1598 #if defined(ENABLE_TASKPOOL_HISYSEVENT)
1599     if (!EnableFfrt() || hisyseventParams == nullptr) {
1600         return;
1601     }
1602     int32_t tid = 0;
1603     if (worker != nullptr) {
1604         auto loop = worker->GetWorkerLoop();
1605         uint64_t loopAddress = reinterpret_cast<uint64_t>(loop);
1606         hisyseventParams->message += "; current runningLoop: " + std::to_string(loopAddress);
1607         tid = worker->tid_;
1608     }
1609     hisyseventParams->tid = tid;
1610     hisyseventParams->pid = getpid();
1611     hisyseventParams->message += "; idleWorkers num: " + std::to_string(idleWorkers_.size());
1612     hisyseventParams->message += "; workers num: " + std::to_string(workers_.size());
1613     int32_t ret = DfxHisysEvent::WriteFfrtAndUv(hisyseventParams);
1614     if (ret < 0) {
1615         HILOG_WARN("taskpool:: HisyseventWrite failed, ret: %{public}s", std::to_string(ret).c_str());
1616     }
1617 #endif
1618 }
1619 
CheckTasksAndReportHisysEvent()1620 void TaskManager::CheckTasksAndReportHisysEvent()
1621 {
1622 #if defined(ENABLE_TASKPOOL_HISYSEVENT)
1623     if (!EnableFfrt()) {
1624         return;
1625     }
1626     uint64_t currTime = ConcurrentHelper::GetMilliseconds();
1627     auto taskNum = GetTaskNum();
1628     if (taskNum == 0 || currTime - preDequeneTime_ < UNEXECUTE_TASK_TIME || preDequeneTime_ < reportTime_) {
1629         return;
1630     }
1631     std::string message = "Task scheduling timeout, total task num:" + std::to_string(taskNum);
1632     HisyseventParams* hisyseventParams = new HisyseventParams();
1633     hisyseventParams->methodName = "CheckTasksAndReportHisysEvent";
1634     hisyseventParams->funName = "";
1635     hisyseventParams->logType = "ffrt";
1636     hisyseventParams->code = DfxHisysEvent::TASK_TIMEOUT;
1637     hisyseventParams->message = message;
1638     WriteHisysForFfrtAndUv(nullptr, hisyseventParams);
1639     reportTime_ = currTime;
1640 #endif
1641 }
1642 
WorkerAliveAndReport(Worker * worker)1643 void TaskManager::WorkerAliveAndReport(Worker* worker)
1644 {
1645 #if defined(ENABLE_TASKPOOL_HISYSEVENT)
1646     uint64_t workerWaitTime = worker->GetWaitTime();
1647     uint64_t currTime = static_cast<uint64_t>(std::chrono::duration_cast<std::chrono::seconds>(
1648         std::chrono::steady_clock::now().time_since_epoch()).count());
1649     uint64_t intervalTime = currTime - workerWaitTime;
1650     if (!worker->IsNeedReport(intervalTime)) {
1651         return;
1652     }
1653     std::string message = "worker uv alive, may have handle leak";
1654     HisyseventParams* hisyseventParams = new HisyseventParams();
1655     hisyseventParams->methodName = "WorkerAliveAndReport";
1656     hisyseventParams->funName = "";
1657     hisyseventParams->logType = "ffrt";
1658     hisyseventParams->code = DfxHisysEvent::WORKER_ALIVE;
1659     hisyseventParams->message = message;
1660     hisyseventParams->waitTime = intervalTime;
1661     WriteHisysForFfrtAndUv(worker, hisyseventParams);
1662     worker->IncreaseReportCount();
1663 #endif
1664 }
1665 
UvReportHisysEvent(Worker * worker,std::string methodName,std::string funName,std::string message,int32_t code)1666 void TaskManager::UvReportHisysEvent(Worker* worker, std::string methodName, std::string funName, std::string message,
1667                                      int32_t code)
1668 {
1669 #if defined(ENABLE_TASKPOOL_HISYSEVENT)
1670     HisyseventParams* hisyseventParams = new HisyseventParams();
1671     hisyseventParams->methodName = methodName;
1672     hisyseventParams->funName = funName;
1673     hisyseventParams->logType = "uv";
1674     hisyseventParams->code = code;
1675     hisyseventParams->message = message;
1676     WriteHisysForFfrtAndUv(worker, hisyseventParams);
1677 #endif
1678 }
1679 
CancelError(napi_env env,int32_t errCode,const char * errMessage,napi_value result,bool success)1680 napi_value TaskManager::CancelError(napi_env env, int32_t errCode, const char* errMessage, napi_value result,
1681                                     bool success)
1682 {
1683     std::string errTitle = "";
1684     napi_value concurrentError = nullptr;
1685 
1686     napi_value code = nullptr;
1687     napi_create_uint32(env, errCode, &code);
1688 
1689     napi_value name = nullptr;
1690     std::string errName = "BusinessError";
1691 
1692     if (errCode == ErrorHelper::ERR_ASYNCRUNNER_TASK_CANCELED) {
1693         errTitle = "The asyncRunner task has been canceled.";
1694     }
1695 
1696     if (errCode == 0 && errMessage == nullptr) {
1697         errTitle = "taskpool:: task has been canceled";
1698     }
1699 
1700     napi_create_string_utf8(env, errName.c_str(), NAPI_AUTO_LENGTH, &name);
1701     napi_value msg = nullptr;
1702     if (errMessage == nullptr) {
1703         napi_create_string_utf8(env, errTitle.c_str(), NAPI_AUTO_LENGTH, &msg);
1704     } else {
1705         napi_create_string_utf8(env, (errTitle + std::string(errMessage)).c_str(), NAPI_AUTO_LENGTH, &msg);
1706     }
1707 
1708     napi_create_error(env, nullptr, msg, &concurrentError);
1709     napi_set_named_property(env, concurrentError, "code", code);
1710     napi_set_named_property(env, concurrentError, "name", name);
1711     napi_value data = nullptr;
1712     napi_create_object(env, &data);
1713     napi_value resultValue = nullptr;
1714     napi_value errorValue = msg;
1715     napi_get_undefined(env, &resultValue);
1716     if (result != nullptr) {
1717         bool isError = false;
1718         napi_is_error(env, result, &isError);
1719         napi_value error = NapiHelper::GetNameProperty(env, result, "error");
1720         if (NapiHelper::IsNotUndefined(env, error)) {
1721             errorValue = error;
1722         } else if (isError && !success) {
1723             napi_value errMsg = NapiHelper::GetNameProperty(env, result, "message");
1724             errorValue = errMsg;
1725         } else {
1726             resultValue = result;
1727         }
1728     }
1729     napi_set_named_property(env, data, "result", resultValue);
1730     napi_set_named_property(env, data, "error", errorValue);
1731     napi_set_named_property(env, concurrentError, "data", data);
1732     return concurrentError;
1733 }
1734 
SetIsPerformIdle(bool performIdle)1735 void TaskManager::SetIsPerformIdle(bool performIdle)
1736 {
1737     isPerformIdle_ = performIdle;
1738 }
1739 
IsPerformIdle() const1740 bool TaskManager::IsPerformIdle() const
1741 {
1742     return isPerformIdle_;
1743 }
1744 
GetTotalTaskNum() const1745 uint32_t TaskManager::GetTotalTaskNum() const
1746 {
1747     return totalTaskNum_;
1748 }
1749 
AddCountTraceForWorkerLog(bool needLog,int64_t threadNum,int64_t idleThreadNum,int64_t timeoutThreadNum)1750 void TaskManager::AddCountTraceForWorkerLog(bool needLog, int64_t threadNum, int64_t idleThreadNum,
1751                                             int64_t timeoutThreadNum)
1752 {
1753     if (needLog) {
1754         HILOG_INFO("taskpool:: threads: %{public}s, running: %{public}s, idle: %{public}s, timeout: %{public}s",
1755             std::to_string(threadNum).c_str(), std::to_string(threadNum - idleThreadNum).c_str(),
1756             std::to_string(idleThreadNum).c_str(), std::to_string(timeoutThreadNum).c_str());
1757     }
1758 }
1759 } // namespace Commonlibrary::Concurrent::TaskPoolModule