• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "worker.h"
17 #include "helper/concurrent_helper.h"
18 
19 #if defined(ENABLE_TASKPOOL_FFRT)
20 #include "c/executor_task.h"
21 #include "ffrt_inner.h"
22 #endif
23 #include "sys_timer.h"
24 #include "helper/hitrace_helper.h"
25 #include "process_helper.h"
26 #include "taskpool.h"
27 #include "task_group_manager.h"
28 #include "native_engine.h"
29 
30 namespace Commonlibrary::Concurrent::TaskPoolModule {
31 using namespace OHOS::JsSysModule;
32 using namespace Commonlibrary::Platform;
33 static constexpr uint32_t TASKPOOL_TYPE = 2;
34 static constexpr uint32_t WORKER_ALIVE_TIME = 1800000; // 1800000: 30min
35 static constexpr int32_t MAX_REPORT_TIMES = 3;
36 
PriorityScope(Worker * worker,Priority taskPriority)37 Worker::PriorityScope::PriorityScope(Worker* worker, Priority taskPriority) : worker_(worker)
38 {
39     if (taskPriority != worker->priority_) {
40         HILOG_DEBUG("taskpool:: reset worker priority to match task priority");
41         if (TaskManager::GetInstance().EnableFfrt()) {
42 #if defined(ENABLE_TASKPOOL_FFRT)
43             if (ffrt::this_task::update_qos(WORKERPRIORITY_FFRTQOS_MAP.at(taskPriority)) != 0) {
44                 SetWorkerPriority(taskPriority);
45             }
46 #endif
47         } else {
48             SetWorkerPriority(taskPriority);
49         }
50         worker->priority_ = taskPriority;
51     }
52 }
53 
~RunningScope()54 Worker::RunningScope::~RunningScope()
55 {
56     HILOG_DEBUG("taskpool:: RunningScope destruction");
57     if (scope_ != nullptr) {
58         napi_close_handle_scope(worker_->workerEnv_, scope_);
59     }
60     worker_->NotifyIdle();
61     worker_->idleState_ = true;
62 }
63 
WorkerConstructor(napi_env env)64 Worker* Worker::WorkerConstructor(napi_env env)
65 {
66     HITRACE_HELPER_METER_NAME("TaskWorkerConstructor: [Add Thread]");
67     Worker* worker = new Worker(env);
68     worker->StartExecuteInThread();
69     return worker;
70 }
71 
CloseHandles()72 void Worker::CloseHandles()
73 {
74     // set all handles to nullptr so that they can not be used even when the loop is re-running
75     ConcurrentHelper::UvHandleClose(performTaskSignal_);
76 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
77     ConcurrentHelper::UvHandleClose(debuggerOnPostTaskSignal_);
78 #endif
79     ConcurrentHelper::UvHandleClose(clearWorkerSignal_);
80     ConcurrentHelper::UvHandleClose(triggerGCCheckSignal_);
81 }
82 
ReleaseWorkerHandles(const uv_async_t * req)83 void Worker::ReleaseWorkerHandles(const uv_async_t* req)
84 {
85     auto worker = static_cast<Worker*>(req->data);
86     HILOG_DEBUG("taskpool:: enter the worker loop and try to release thread: %{public}d", worker->tid_);
87     if (!worker->CheckFreeConditions()) {
88         return;
89     }
90 
91     TaskManager::GetInstance().RemoveWorker(worker);
92     HITRACE_HELPER_METER_NAME("ReleaseWorkerHandles: [Release Thread]");
93     HILOG_INFO("taskpool:: the thread is idle and will be released, and the total num is %{public}u now",
94         TaskManager::GetInstance().GetThreadNum());
95     // when there is no active handle, worker loop will stop automatically.
96     worker->CloseHandles();
97 
98     uv_loop_t* loop = worker->GetWorkerLoop();
99     if (loop != nullptr) {
100         uv_stop(loop);
101     }
102 }
103 
CheckFreeConditions()104 bool Worker::CheckFreeConditions()
105 {
106     auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
107     // only when all conditions are met can the worker be freed
108     if (HasRunningTasks()) {
109         HILOG_DEBUG("taskpool:: async callbacks may exist, the worker thread will not exit");
110     } else if (workerEngine->HasListeningCounter()) {
111         HILOG_DEBUG("taskpool:: listening operation exists, the worker thread will not exit");
112     } else if (Timer::HasTimer(workerEnv_)) {
113         HILOG_DEBUG("taskpool:: timer exists, the worker thread will not exit");
114     } else if (workerEngine->HasWaitingRequest()) {
115         HILOG_DEBUG("taskpool:: waiting request exists, the worker thread will not exit");
116     } else if (workerEngine->HasSubEnv()) {
117         HILOG_DEBUG("taskpool:: sub env exists, the worker thread will not exit");
118     } else if (workerEngine->HasPendingJob()) {
119         HILOG_DEBUG("taskpool:: pending job exists, the worker thread will not exit");
120     } else if (workerEngine->IsProfiling()) {
121         HILOG_DEBUG("taskpool:: the worker thread will not exit during profiling");
122     } else {
123         return true;
124     }
125     HILOG_DEBUG("taskpool:: the worker %{public}d can't be released due to not meeting the conditions", tid_);
126     TaskManager& taskManager = TaskManager::GetInstance();
127     taskManager.RestoreWorker(this);
128     taskManager.CountTraceForWorker();
129     return false;
130 }
131 
StartExecuteInThread()132 void Worker::StartExecuteInThread()
133 {
134     if (!runner_) {
135         runner_ = std::make_unique<TaskRunner>(TaskStartCallback(ExecuteInThread, this));
136     }
137     if (runner_) {
138         runner_->Execute(); // start a new thread
139     } else {
140         HILOG_ERROR("taskpool:: runner_ is nullptr");
141     }
142 }
143 
144 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
HandleDebuggerTask(const uv_async_t * req)145 void Worker::HandleDebuggerTask(const uv_async_t* req)
146 {
147     Worker* worker = reinterpret_cast<Worker*>(req->data);
148     if (worker == nullptr) {
149         HILOG_ERROR("taskpool:: worker is null");
150         return;
151     }
152     worker->debuggerMutex_.lock();
153     auto task = std::move(worker->debuggerQueue_.front());
154     worker->debuggerQueue_.pop();
155     worker->debuggerMutex_.unlock();
156     task();
157 }
158 
DebuggerOnPostTask(std::function<void ()> && task)159 void Worker::DebuggerOnPostTask(std::function<void()>&& task)
160 {
161     if (ConcurrentHelper::IsUvActive(debuggerOnPostTaskSignal_)) {
162         std::lock_guard<std::mutex> lock(debuggerMutex_);
163         debuggerQueue_.push(std::move(task));
164         uv_async_send(debuggerOnPostTaskSignal_);
165     }
166 }
167 #endif
168 
169 #if defined(ENABLE_TASKPOOL_FFRT)
InitFfrtInfo()170 void Worker::InitFfrtInfo()
171 {
172     if (TaskManager::GetInstance().EnableFfrt()) {
173         static const std::map<int, Priority> FFRTQOS_WORKERPRIORITY_MAP = {
174             {ffrt::qos_background, Priority::IDLE},
175             {ffrt::qos_utility, Priority::LOW},
176             {ffrt::qos_default, Priority::DEFAULT},
177             {ffrt::qos_user_initiated, Priority::HIGH},
178         };
179         ffrt_qos_t qos = ffrt_this_task_get_qos();
180         priority_ = FFRTQOS_WORKERPRIORITY_MAP.at(qos);
181         ffrtTaskHandle_ = ffrt_get_cur_task();
182     }
183 }
184 
InitLoopHandleNum()185 void Worker::InitLoopHandleNum()
186 {
187     if (ffrtTaskHandle_ == nullptr) {
188         return;
189     }
190 
191     uv_loop_t* loop = GetWorkerLoop();
192     if (loop != nullptr) {
193         initActiveHandleNum_ = loop->active_handles;
194     } else {
195         HILOG_ERROR("taskpool:: worker loop is nullptr when init loop handle num.");
196     }
197 }
198 
IsLoopActive()199 bool Worker::IsLoopActive()
200 {
201     uv_loop_t* loop = GetWorkerLoop();
202     if (loop != nullptr) {
203         return uv_loop_alive_taskpool(loop, initActiveHandleNum_);
204     } else {
205         HILOG_ERROR("taskpool:: worker loop is nullptr when judge loop alive.");
206         return false;
207     }
208 }
209 
GetWaitTime()210 uint64_t Worker::GetWaitTime()
211 {
212     return ffrt_epoll_get_wait_time(ffrtTaskHandle_);
213 }
214 #endif
215 
ExecuteInThread(const void * data)216 void Worker::ExecuteInThread(const void* data)
217 {
218     HITRACE_HELPER_START_TRACE(__PRETTY_FUNCTION__);
219     auto worker = reinterpret_cast<Worker*>(const_cast<void*>(data));
220     {
221         napi_create_runtime(worker->hostEnv_, &worker->workerEnv_);
222         if (worker->workerEnv_ == nullptr) {
223             HILOG_ERROR("taskpool:: worker create runtime failed");
224             return;
225         }
226         auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
227         // mark worker env is taskpoolThread
228         workerEngine->MarkTaskPoolThread();
229         workerEngine->InitTaskPoolThread(worker->workerEnv_, Worker::TaskResultCallback);
230     }
231     uv_loop_t* loop = worker->GetWorkerLoop();
232     if (loop == nullptr) {
233         HILOG_ERROR("taskpool:: loop is nullptr");
234         return;
235     }
236     // save the worker tid
237     worker->tid_ = GetThreadId();
238 
239     // Init worker task execute signal
240     ConcurrentHelper::UvHandleInit(loop, worker->performTaskSignal_, Worker::PerformTask, worker);
241     ConcurrentHelper::UvHandleInit(loop, worker->clearWorkerSignal_, Worker::ReleaseWorkerHandles, worker);
242     ConcurrentHelper::UvHandleInit(loop, worker->triggerGCCheckSignal_, Worker::TriggerGCCheck, worker);
243 
244     HITRACE_HELPER_FINISH_TRACE;
245 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
246     // Init debugger task post signal
247     ConcurrentHelper::UvHandleInit(loop, worker->debuggerOnPostTaskSignal_, Worker::HandleDebuggerTask, worker);
248 #endif
249     if (worker->PrepareForWorkerInstance()) {
250         // Call after uv_async_init
251         worker->NotifyWorkerCreated();
252 #if defined(ENABLE_TASKPOOL_FFRT)
253         worker->InitFfrtInfo();
254         worker->InitLoopHandleNum();
255 #endif
256         worker->RunLoop();
257     } else {
258         HILOG_ERROR("taskpool:: Worker PrepareForWorkerInstance fail");
259     }
260     TaskManager::GetInstance().RemoveWorker(worker);
261     TaskManager::GetInstance().CountTraceForWorker();
262     worker->ReleaseWorkerThreadContent();
263     delete worker;
264     worker = nullptr;
265 }
266 
PrepareForWorkerInstance()267 bool Worker::PrepareForWorkerInstance()
268 {
269     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
270     auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
271 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
272     workerEngine->SetDebuggerPostTaskFunc([this](std::function<void()>&& task) {
273         this->DebuggerOnPostTask(std::move(task));
274     });
275 #endif
276     reinterpret_cast<NativeEngine*>(workerEnv_)->RegisterNapiUncaughtExceptionHandler(
277         [workerEngine] (napi_value exception) -> void {
278         if (!NativeEngine::IsAlive(workerEngine)) {
279             HILOG_WARN("napi_env has been destoryed!");
280             return;
281         }
282         std::string name = "";
283         void* data = workerEngine->GetCurrentTaskInfo();
284         if (data != nullptr) {
285             Task* task = static_cast<Task*>(data);
286             name = task->name_;
287         }
288         NapiErrorManager::GetInstance()->NotifyUncaughtException(reinterpret_cast<napi_env>(workerEngine),
289             exception, name, TASKPOOL_TYPE);
290     });
291     reinterpret_cast<NativeEngine*>(workerEnv_)->RegisterAllPromiseCallback(
292         [workerEngine] (napi_value* args) -> void {
293         if (!NativeEngine::IsAlive(workerEngine)) {
294             HILOG_WARN("napi_env has been destoryed!");
295             return;
296         }
297         std::string name = "";
298         void* data = workerEngine->GetCurrentTaskInfo();
299         if (data != nullptr) {
300             Task* task = static_cast<Task*>(data);
301             name = task->name_;
302         }
303         NapiErrorManager::GetInstance()->NotifyUnhandledRejection(reinterpret_cast<napi_env>(workerEngine),
304             args, name, TASKPOOL_TYPE);
305     });
306 
307     if (!workerEngine->CallInitWorkerFunc(workerEngine)) {
308         HILOG_ERROR("taskpool:: Worker CallInitWorkerFunc fail");
309         return false;
310     }
311     // register timer interface
312     Timer::RegisterTime(workerEnv_);
313 
314     // Check exception after worker construction
315     if (NapiHelper::IsExceptionPending(workerEnv_)) {
316         HILOG_ERROR("taskpool:: Worker construction occur exception");
317         return false;
318     }
319     return true;
320 }
321 
ReleaseWorkerThreadContent()322 void Worker::ReleaseWorkerThreadContent()
323 {
324     auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
325     auto hostEngine = reinterpret_cast<NativeEngine*>(hostEnv_);
326     if (workerEngine == nullptr) {
327         HILOG_ERROR("taskpool:: workerEngine is nullptr");
328         return;
329     }
330     if (hostEngine != nullptr) {
331         if (!hostEngine->DeleteWorker(workerEngine)) {
332             HILOG_ERROR("taskpool:: DeleteWorker fail");
333         }
334     }
335     if (state_ == WorkerState::BLOCKED) {
336         HITRACE_HELPER_METER_NAME("Thread Timeout Exit");
337     } else {
338         HITRACE_HELPER_METER_NAME("Thread Exit");
339     }
340 
341     Timer::ClearEnvironmentTimer(workerEnv_);
342     // 2. delete NativeEngine created in worker thread
343     if (!workerEngine->CallOffWorkerFunc(workerEngine)) {
344         HILOG_ERROR("worker:: CallOffWorkerFunc error");
345     }
346     delete workerEngine;
347     workerEnv_ = nullptr;
348 }
349 
NotifyExecuteTask()350 void Worker::NotifyExecuteTask()
351 {
352     if (LIKELY(performTaskSignal_ != nullptr && !uv_is_closing(reinterpret_cast<uv_handle_t*>(performTaskSignal_)))) {
353         int ret = uv_async_send(performTaskSignal_);
354         if (ret != 0) {
355             TaskManager::GetInstance().UvReportHisysEvent(this, "NotifyExecuteTask", "uv_async_send",
356                 "uv send performTaskSignal_ failed", ret);
357         }
358     }
359 }
360 
NotifyIdle()361 void Worker::NotifyIdle()
362 {
363     TaskManager::GetInstance().NotifyWorkerIdle(this);
364 }
365 
NotifyWorkerCreated()366 void Worker::NotifyWorkerCreated()
367 {
368     TaskManager::GetInstance().NotifyWorkerCreated(this);
369 }
370 
NotifyTaskBegin()371 void Worker::NotifyTaskBegin()
372 {
373     auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
374     workerEngine->NotifyTaskBegin();
375 }
376 
TriggerGCCheck(const uv_async_t * req)377 void Worker::TriggerGCCheck(const uv_async_t* req)
378 {
379     if (req == nullptr || req->data == nullptr) {
380         HILOG_ERROR("taskpool:: req handle is invalid");
381         return;
382     }
383     auto worker = reinterpret_cast<Worker*>(req->data);
384     auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
385     workerEngine->NotifyTaskFinished();
386 }
387 
NotifyTaskFinished()388 void Worker::NotifyTaskFinished()
389 {
390     // trigger gc check by uv and return immediately if the handle is invalid
391     if (UNLIKELY(!ConcurrentHelper::IsUvActive(triggerGCCheckSignal_))) {
392         HILOG_ERROR("taskpool:: triggerGCCheckSignal_ is nullptr or closed");
393         return;
394     } else {
395         uv_async_send(triggerGCCheckSignal_);
396     }
397 
398     auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
399     if (--runningCount_ != 0 || workerEngine->HasPendingJob()) {
400         // the worker state is still RUNNING and the start time will be updated
401         startTime_ = ConcurrentHelper::GetMilliseconds();
402     } else {
403         UpdateWorkerState(WorkerState::RUNNING, WorkerState::IDLE);
404     }
405     idlePoint_ = ConcurrentHelper::GetMilliseconds();
406 }
407 
UpdateWorkerState(WorkerState expect,WorkerState desired)408 bool Worker::UpdateWorkerState(WorkerState expect, WorkerState desired)
409 {
410     return state_.compare_exchange_strong(expect, desired);
411 }
412 
PerformTask(const uv_async_t * req)413 void Worker::PerformTask(const uv_async_t* req)
414 {
415     uint64_t startTime = ConcurrentHelper::GetMilliseconds();
416     auto worker = static_cast<Worker*>(req->data);
417     worker->UpdateWorkerWakeUpTime();
418     napi_env env = worker->workerEnv_;
419     TaskManager::GetInstance().NotifyWorkerRunning(worker);
420     auto taskInfo = TaskManager::GetInstance().DequeueTaskId();
421     if (taskInfo.first == 0) {
422         worker->NotifyIdle();
423         return;
424     }
425     RunningScope runningScope(worker);
426     WorkerRunningScope workerRunningScope(env);
427     PriorityScope priorityScope(worker, taskInfo.second);
428     Task* task = TaskManager::GetInstance().GetTask(taskInfo.first);
429     if (task == nullptr) {
430         HILOG_DEBUG("taskpool:: task has been released");
431         return;
432     } else if (!task->IsValid() && task->ShouldDeleteTask(false)) {
433         HILOG_WARN("taskpool:: task is invalid");
434         delete task;
435         return;
436     }
437     // try to record the memory data for gc
438     worker->NotifyTaskBegin();
439 
440     if (!task->UpdateTask(startTime, worker)) {
441         worker->NotifyTaskFinished();
442         return;
443     }
444     if (task->IsGroupTask() && (!TaskGroupManager::GetInstance().UpdateGroupState(task->groupId_))) {
445         return;
446     }
447     if (task->IsLongTask()) {
448         worker->UpdateLongTaskInfo(task);
449     }
450     worker->StoreTaskId(task->taskId_);
451     // tag for trace parse: Task Perform
452     auto loop = worker->GetWorkerLoop();
453     uint64_t loopAddress = reinterpret_cast<uint64_t>(loop);
454     std::string strTrace = "Task Perform: name : "  + task->name_ + ", taskId : " + std::to_string(task->taskId_)
455                             + ", priority : " + std::to_string(taskInfo.second);
456     std::string taskLog = "Task Perform: "  + task->name_ + ", " + std::to_string(task->taskId_) + ", "
457                           "runningLoop: " + std::to_string(loopAddress);
458     HITRACE_HELPER_METER_NAME(strTrace);
459     HILOG_TASK_INFO("taskpool:: %{public}s", taskLog.c_str());
460 
461     napi_value func = nullptr;
462     napi_value args = nullptr;
463     napi_value errorInfo = task->DeserializeValue(env, &func, &args);
464     if (UNLIKELY(func == nullptr || args == nullptr)) {
465         if (errorInfo != nullptr) {
466             worker->NotifyTaskResult(env, task, errorInfo);
467         }
468         return;
469     }
470     if (!worker->InitTaskPoolFunc(env, func, task)) {
471         return;
472     }
473     worker->hasExecuted_ = true;
474     uint32_t argsNum = NapiHelper::GetArrayLength(env, args);
475     napi_value argsArray[argsNum];
476     for (size_t i = 0; i < argsNum; i++) {
477         argsArray[i] = NapiHelper::GetElement(env, args, i);
478     }
479 
480     if (!task->CheckStartExecution(taskInfo.second)) {
481         if (task->ShouldDeleteTask()) {
482             delete task;
483         }
484         return;
485     }
486     napi_call_function(env, NapiHelper::GetGlobalObject(env), func, argsNum, argsArray, nullptr);
487     auto workerEngine = reinterpret_cast<NativeEngine*>(env);
488     workerEngine->ClearCurrentTaskInfo();
489     task->DecreaseRefCount();
490     task->StoreTaskDuration();
491     worker->UpdateExecutedInfo();
492     HandleFunctionException(env, task);
493 }
494 
NotifyTaskResult(napi_env env,Task * task,napi_value result)495 void Worker::NotifyTaskResult(napi_env env, Task* task, napi_value result)
496 {
497     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
498     HILOG_DEBUG("taskpool:: NotifyTaskResult task:%{public}s", std::to_string(task->taskId_).c_str());
499     void* resultData = nullptr;
500     napi_value undefined = NapiHelper::GetUndefinedValue(env);
501     bool defaultTransfer = true;
502     bool defaultCloneSendable = false;
503     napi_status status = napi_serialize_inner(env, result, undefined, undefined,
504                                               defaultTransfer, defaultCloneSendable, &resultData);
505     if ((status != napi_ok || resultData == nullptr) && task->success_) {
506         task->success_ = false;
507         std::string errMessage = "taskpool: failed to serialize result.";
508         HILOG_ERROR("%{public}s", errMessage.c_str());
509         napi_value err = ErrorHelper::NewError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
510         NotifyTaskResult(env, task, err);
511         return;
512     }
513     task->result_ = resultData;
514     NotifyHandleTaskResult(task);
515 }
516 
NotifyHandleTaskResult(Task * task)517 void Worker::NotifyHandleTaskResult(Task* task)
518 {
519     if (!task->IsReadyToHandle()) {
520         return;
521     }
522     Worker* worker = reinterpret_cast<Worker*>(task->worker_);
523     if (worker != nullptr) {
524         std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_);
525         auto iter = std::find(worker->currentTaskId_.begin(), worker->currentTaskId_.end(), task->taskId_);
526         if (iter != worker->currentTaskId_.end()) {
527             worker->currentTaskId_.erase(iter);
528         }
529     } else {
530         TaskManager::GetInstance().UvReportHisysEvent(nullptr, "NotifyHandleTaskResult", "", "worker is nullptr", -1);
531         HILOG_FATAL("taskpool:: worker is nullptr");
532         return;
533     }
534     if (!task->VerifyAndPostResult(worker->priority_)) {
535         if (task->ShouldDeleteTask()) {
536             delete task;
537         }
538     }
539     worker->NotifyTaskFinished();
540 }
541 
TaskResultCallback(napi_env env,napi_value result,bool success,void * data)542 void Worker::TaskResultCallback(napi_env env, napi_value result, bool success, void* data)
543 {
544     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
545     if (env == nullptr) { // LCOV_EXCL_BR_LINE
546         std::string error = "TaskResultCallback engine is null";
547         TaskManager::GetInstance().UvReportHisysEvent(nullptr, "TaskResultCallback", "", error, -1);
548         HILOG_FATAL("taskpool:: %{public}s", error.c_str());
549         return;
550     }
551     if (data == nullptr) { // LCOV_EXCL_BR_LINE
552         std::string error = "data is nullptr";
553         TaskManager::GetInstance().UvReportHisysEvent(nullptr, "TaskResultCallback", "", error, -1);
554         HILOG_FATAL("taskpool:: %{public}s", error.c_str());
555         return;
556     }
557     Task* task = static_cast<Task*>(data);
558     if (TaskManager::GetInstance().GetTask(task->taskId_) == nullptr) {
559         std::string error = "task is nullptr, taskId: " + std::to_string(task->taskId_);
560         TaskManager::GetInstance().UvReportHisysEvent(nullptr, "TaskResultCallback", "", error, -1);
561         HILOG_FATAL("taskpool:: task is nullptr");
562         return;
563     }
564     auto worker = static_cast<Worker*>(task->worker_);
565     worker->isExecutingLongTask_ = task->IsLongTask();
566     task->DecreaseRefCount();
567     task->ioTime_ = ConcurrentHelper::GetMilliseconds();
568     if (task->cpuTime_ != 0) {
569         uint64_t ioDuration = task->ioTime_ - task->startTime_;
570         uint64_t cpuDuration = task->cpuTime_ - task->startTime_;
571         TaskManager::GetInstance().StoreTaskDuration(task->taskId_, std::max(ioDuration, cpuDuration), cpuDuration);
572     }
573     napi_value exception = nullptr;
574     napi_get_and_clear_last_exception(env, &exception);
575     if (exception != nullptr) {
576         HILOG_ERROR("taskpool::TaskResultCallback occur exception");
577         reinterpret_cast<NativeEngine*>(env)->HandleTaskpoolException(exception);
578         task->success_ = false;
579         napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception);
580         NotifyTaskResult(env, task, errorEvent);
581         return;
582     }
583 
584     task->success_ = success;
585     NotifyTaskResult(env, task, result);
586 }
587 
588 // reset qos_user_initiated after perform task
ResetWorkerPriority()589 void Worker::ResetWorkerPriority()
590 {
591     if (priority_ != Priority::HIGH) {
592         if (TaskManager::GetInstance().EnableFfrt()) {
593 #if defined(ENABLE_TASKPOOL_FFRT)
594             if (ffrt::this_task::update_qos(WORKERPRIORITY_FFRTQOS_MAP.at(Priority::HIGH)) != 0) {
595                 SetWorkerPriority(Priority::HIGH);
596             }
597 #endif
598         } else {
599             SetWorkerPriority(Priority::HIGH);
600         }
601         priority_ = Priority::HIGH;
602     }
603 }
604 
StoreTaskId(uint32_t taskId)605 void Worker::StoreTaskId(uint32_t taskId)
606 {
607     std::lock_guard<std::mutex> lock(currentTaskIdMutex_);
608     currentTaskId_.emplace_back(taskId);
609 }
610 
InitTaskPoolFunc(napi_env env,napi_value func,Task * task)611 bool Worker::InitTaskPoolFunc(napi_env env, napi_value func, Task* task)
612 {
613     auto workerEngine = reinterpret_cast<NativeEngine*>(env);
614     bool success = workerEngine->InitTaskPoolFunc(env, func, task);
615     napi_value exception;
616     napi_get_and_clear_last_exception(env, &exception);
617     if (exception != nullptr) {
618         HILOG_ERROR("taskpool:: InitTaskPoolFunc occur exception");
619         task->success_ = false;
620         napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception);
621         NotifyTaskResult(env, task, errorEvent);
622         return false;
623     }
624     if (!success) {
625         HILOG_ERROR("taskpool:: InitTaskPoolFunc fail");
626         napi_value err = ErrorHelper::NewError(env, ErrorHelper::TYPE_ERROR,
627                                                "taskpool:: function may not be concurrent.");
628         task->success_ = false;
629         NotifyTaskResult(env, task, err);
630         return false;
631     }
632     return true;
633 }
634 
UpdateExecutedInfo()635 void Worker::UpdateExecutedInfo()
636 {
637     // if the worker is blocked, just skip
638     if (LIKELY(state_ != WorkerState::BLOCKED)) {
639         uint64_t duration = ConcurrentHelper::GetMilliseconds() - startTime_;
640         TaskManager::GetInstance().UpdateExecutedInfo(duration);
641     }
642 }
643 
644 // Only when the worker has no longTask can it be released.
TerminateTask(uint32_t taskId)645 void Worker::TerminateTask(uint32_t taskId)
646 {
647     HILOG_DEBUG("taskpool:: TerminateTask task:%{public}s", std::to_string(taskId).c_str());
648     std::lock_guard<std::mutex> lock(longMutex_);
649     longTasksSet_.erase(taskId);
650     if (longTasksSet_.empty()) {
651         hasLongTask_ = false;
652     }
653 }
654 
655 // to store longTasks' state
UpdateLongTaskInfo(Task * task)656 void Worker::UpdateLongTaskInfo(Task* task)
657 {
658     HILOG_DEBUG("taskpool:: UpdateLongTaskInfo task:%{public}s", std::to_string(task->taskId_).c_str());
659     TaskManager::GetInstance().StoreLongTaskInfo(task->taskId_, this);
660     std::lock_guard<std::mutex> lock(longMutex_);
661     hasLongTask_ = true;
662     isExecutingLongTask_ = true;
663     longTasksSet_.emplace(task->taskId_);
664 }
665 
IsExecutingLongTask()666 bool Worker::IsExecutingLongTask()
667 {
668     return isExecutingLongTask_;
669 }
670 
HasLongTask()671 bool Worker::HasLongTask()
672 {
673     return hasLongTask_;
674 }
675 
HandleFunctionException(napi_env env,Task * task)676 void Worker::HandleFunctionException(napi_env env, Task* task)
677 {
678     napi_value exception = nullptr;
679     napi_get_and_clear_last_exception(env, &exception);
680     if (exception != nullptr) {
681         HILOG_ERROR("taskpool::PerformTask occur exception");
682         reinterpret_cast<NativeEngine*>(env)->HandleTaskpoolException(exception);
683         task->DecreaseRefCount();
684         task->success_ = false;
685         napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception);
686         NotifyTaskResult(env, task, errorEvent);
687         return;
688     }
689     NotifyHandleTaskResult(task);
690 }
691 
PostReleaseSignal()692 void Worker::PostReleaseSignal()
693 {
694     if (UNLIKELY(!ConcurrentHelper::IsUvActive(clearWorkerSignal_))) {
695         HILOG_ERROR("taskpool:: clearWorkerSignal_ is nullptr or closed");
696         return;
697     }
698     uv_async_send(clearWorkerSignal_);
699 }
700 
IsRunnable(uint64_t currTime) const701 bool Worker::IsRunnable(uint64_t currTime) const
702 {
703     bool res = true;
704     if (currTime > wakeUpTime_) {
705         res = (currTime - wakeUpTime_ < 15); // 15: ms
706     }
707     return res;
708 }
709 
UpdateWorkerWakeUpTime()710 void Worker::UpdateWorkerWakeUpTime()
711 {
712     wakeUpTime_ = ConcurrentHelper::GetMilliseconds();
713 }
714 
715 #if defined(ENABLE_TASKPOOL_HISYSEVENT)
IsNeedReport(uint64_t intervalTime)716 bool Worker::IsNeedReport(uint64_t intervalTime)
717 {
718     if (reportCount_ >= MAX_REPORT_TIMES) {
719         return false;
720     }
721     if (intervalTime < (reportCount_ + 1) * WORKER_ALIVE_TIME) {
722         return false;
723     }
724     return true;
725 }
726 
IncreaseReportCount()727 void Worker::IncreaseReportCount()
728 {
729     reportCount_++;
730 }
731 #endif
732 } // namespace Commonlibrary::Concurrent::TaskPoolModule
733