• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
#include "worker.h"

#include <vector>

#include "helper/concurrent_helper.h"

#if defined(ENABLE_TASKPOOL_FFRT)
#include "c/executor_task.h"
#include "ffrt_inner.h"
#endif
#include "sys_timer.h"
#include "helper/hitrace_helper.h"
#include "process_helper.h"
#include "taskpool.h"
#include "task_group_manager.h"
#include "native_engine.h"
29 
30 namespace Commonlibrary::Concurrent::TaskPoolModule {
31 using namespace OHOS::JsSysModule;
32 using namespace Commonlibrary::Platform;
// Type tag handed to NapiErrorManager together with uncaught exceptions / unhandled rejections.
static constexpr uint32_t TASKPOOL_TYPE = 2;
// Reporting interval unit used by IsNeedReport(): 30 min expressed in milliseconds.
static constexpr uint32_t WORKER_ALIVE_TIME = 30 * 60 * 1000;
// Once reportCount_ reaches this value IsNeedReport() always answers false.
static constexpr int32_t MAX_REPORT_TIMES = 3;
36 
PriorityScope(Worker * worker,Priority taskPriority)37 Worker::PriorityScope::PriorityScope(Worker* worker, Priority taskPriority) : worker_(worker)
38 {
39     if (taskPriority != worker->priority_) {
40         HILOG_DEBUG("taskpool:: reset worker priority to match task priority");
41         if (TaskManager::GetInstance().EnableFfrt()) {
42 #if defined(ENABLE_TASKPOOL_FFRT)
43             if (ffrt::this_task::update_qos(WORKERPRIORITY_FFRTQOS_MAP.at(taskPriority)) != 0) {
44                 SetWorkerPriority(taskPriority);
45             }
46 #endif
47         } else {
48             SetWorkerPriority(taskPriority);
49         }
50         worker->priority_ = taskPriority;
51     }
52 }
53 
~RunningScope()54 Worker::RunningScope::~RunningScope()
55 {
56     HILOG_DEBUG("taskpool:: RunningScope destruction");
57     if (scope_ != nullptr) {
58         napi_close_handle_scope(worker_->workerEnv_, scope_);
59     }
60     worker_->NotifyIdle();
61     worker_->idleState_ = true;
62 }
63 
WorkerConstructor(napi_env env)64 Worker* Worker::WorkerConstructor(napi_env env)
65 {
66     HITRACE_HELPER_METER_NAME("TaskWorkerConstructor: [Add Thread]");
67     Worker* worker = new Worker(env);
68     worker->StartExecuteInThread();
69     return worker;
70 }
71 
CloseHandles()72 void Worker::CloseHandles()
73 {
74     // set all handles to nullptr so that they can not be used even when the loop is re-running
75     ConcurrentHelper::UvHandleClose(performTaskSignal_);
76 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
77     ConcurrentHelper::UvHandleClose(debuggerOnPostTaskSignal_);
78 #endif
79     ConcurrentHelper::UvHandleClose(clearWorkerSignal_);
80     ConcurrentHelper::UvHandleClose(triggerGCCheckSignal_);
81 }
82 
ReleaseWorkerHandles(const uv_async_t * req)83 void Worker::ReleaseWorkerHandles(const uv_async_t* req)
84 {
85     auto worker = static_cast<Worker*>(req->data);
86     HILOG_DEBUG("taskpool:: enter the worker loop and try to release thread: %{public}d", worker->tid_);
87     if (!worker->CheckFreeConditions()) {
88         return;
89     }
90 
91     TaskManager::GetInstance().RemoveWorker(worker);
92     HITRACE_HELPER_METER_NAME("ReleaseWorkerHandles: [Release Thread]");
93     HILOG_INFO("taskpool:: release idle thread, total num is %{public}u",
94         TaskManager::GetInstance().GetThreadNum());
95     // when there is no active handle, worker loop will stop automatically.
96     worker->CloseHandles();
97 
98     uv_loop_t* loop = worker->GetWorkerLoop();
99     if (loop != nullptr) {
100         uv_stop(loop);
101     }
102 }
103 
CheckFreeConditions()104 bool Worker::CheckFreeConditions()
105 {
106     auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
107     // only when all conditions are met can the worker be freed
108     if (HasRunningTasks()) {
109         HILOG_DEBUG("taskpool:: async callbacks may exist, the worker thread will not exit");
110     } else if (workerEngine->HasListeningCounter()) {
111         HILOG_DEBUG("taskpool:: listening operation exists, the worker thread will not exit");
112     } else if (Timer::HasTimer(workerEnv_)) {
113         HILOG_DEBUG("taskpool:: timer exists, the worker thread will not exit");
114     } else if (workerEngine->HasWaitingRequest()) {
115         HILOG_DEBUG("taskpool:: waiting request exists, the worker thread will not exit");
116     } else if (workerEngine->HasSubEnv()) {
117         HILOG_DEBUG("taskpool:: sub env exists, the worker thread will not exit");
118     } else if (workerEngine->HasPendingJob()) {
119         HILOG_DEBUG("taskpool:: pending job exists, the worker thread will not exit");
120     } else if (workerEngine->IsProfiling()) {
121         HILOG_DEBUG("taskpool:: the worker thread will not exit during profiling");
122     } else {
123         return true;
124     }
125     HILOG_DEBUG("taskpool:: the worker %{public}d can't be released due to not meeting the conditions", tid_);
126     TaskManager& taskManager = TaskManager::GetInstance();
127     taskManager.RestoreWorker(this);
128     taskManager.CountTraceForWorker();
129     return false;
130 }
131 
StartExecuteInThread()132 void Worker::StartExecuteInThread()
133 {
134     if (!runner_) {
135         runner_ = std::make_unique<TaskRunner>(TaskStartCallback(ExecuteInThread, this));
136     }
137     if (runner_) {
138         runner_->Execute(); // start a new thread
139     } else {
140         HILOG_ERROR("taskpool:: runner_ is nullptr");
141     }
142 }
143 
144 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
HandleDebuggerTask(const uv_async_t * req)145 void Worker::HandleDebuggerTask(const uv_async_t* req)
146 {
147     Worker* worker = reinterpret_cast<Worker*>(req->data);
148     if (worker == nullptr) {
149         HILOG_ERROR("taskpool:: worker is null");
150         return;
151     }
152     worker->debuggerMutex_.lock();
153     auto task = std::move(worker->debuggerQueue_.front());
154     worker->debuggerQueue_.pop();
155     worker->debuggerMutex_.unlock();
156     task();
157 }
158 
DebuggerOnPostTask(std::function<void ()> && task)159 void Worker::DebuggerOnPostTask(std::function<void()>&& task)
160 {
161     if (ConcurrentHelper::IsUvActive(debuggerOnPostTaskSignal_)) {
162         std::lock_guard<std::mutex> lock(debuggerMutex_);
163         debuggerQueue_.push(std::move(task));
164         uv_async_send(debuggerOnPostTaskSignal_);
165     }
166 }
167 #endif
168 
169 #if defined(ENABLE_TASKPOOL_FFRT)
InitFfrtInfo()170 void Worker::InitFfrtInfo()
171 {
172     if (TaskManager::GetInstance().EnableFfrt()) {
173         static const std::map<int, Priority> FFRTQOS_WORKERPRIORITY_MAP = {
174             {ffrt::qos_background, Priority::IDLE},
175             {ffrt::qos_utility, Priority::LOW},
176             {ffrt::qos_default, Priority::DEFAULT},
177             {ffrt::qos_user_initiated, Priority::HIGH},
178         };
179         ffrt_qos_t qos = ffrt_this_task_get_qos();
180         priority_ = FFRTQOS_WORKERPRIORITY_MAP.at(qos);
181         ffrtTaskHandle_ = ffrt_get_cur_task();
182     }
183 }
184 
InitLoopHandleNum()185 void Worker::InitLoopHandleNum()
186 {
187     if (ffrtTaskHandle_ == nullptr) {
188         return;
189     }
190 
191     uv_loop_t* loop = GetWorkerLoop();
192     if (loop != nullptr) {
193         initActiveHandleNum_ = loop->active_handles;
194     } else {
195         HILOG_ERROR("taskpool:: worker loop is nullptr when init loop handle num.");
196     }
197 }
198 
IsLoopActive()199 bool Worker::IsLoopActive()
200 {
201     uv_loop_t* loop = GetWorkerLoop();
202     if (loop != nullptr) {
203         return uv_loop_alive_taskpool(loop, initActiveHandleNum_);
204     } else {
205         HILOG_ERROR("taskpool:: worker loop is nullptr when judge loop alive.");
206         return false;
207     }
208 }
209 
GetWaitTime()210 uint64_t Worker::GetWaitTime()
211 {
212     return ffrt_epoll_get_wait_time(ffrtTaskHandle_);
213 }
214 #endif
215 
ExecuteInThread(const void * data)216 void Worker::ExecuteInThread(const void* data)
217 {
218     HITRACE_HELPER_START_TRACE(__PRETTY_FUNCTION__);
219     auto worker = reinterpret_cast<Worker*>(const_cast<void*>(data));
220     {
221         napi_create_runtime(worker->hostEnv_, &worker->workerEnv_);
222         if (worker->workerEnv_ == nullptr) {
223             HILOG_ERROR("taskpool:: worker create runtime failed");
224             return;
225         }
226         auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
227         // mark worker env is taskpoolThread
228         workerEngine->MarkTaskPoolThread();
229         workerEngine->InitTaskPoolThread(worker->workerEnv_, Worker::TaskResultCallback);
230     }
231     uv_loop_t* loop = worker->GetWorkerLoop();
232     if (loop == nullptr) {
233         HILOG_ERROR("taskpool:: loop is nullptr");
234         return;
235     }
236     // save the worker tid
237     worker->tid_ = GetThreadId();
238 
239     // Init worker task execute signal
240     ConcurrentHelper::UvHandleInit(loop, worker->performTaskSignal_, Worker::PerformTask, worker);
241     ConcurrentHelper::UvHandleInit(loop, worker->clearWorkerSignal_, Worker::ReleaseWorkerHandles, worker);
242     ConcurrentHelper::UvHandleInit(loop, worker->triggerGCCheckSignal_, Worker::TriggerGCCheck, worker);
243 
244     HITRACE_HELPER_FINISH_TRACE;
245 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
246     // Init debugger task post signal
247     ConcurrentHelper::UvHandleInit(loop, worker->debuggerOnPostTaskSignal_, Worker::HandleDebuggerTask, worker);
248 #endif
249     if (worker->PrepareForWorkerInstance()) {
250         // Call after uv_async_init
251         worker->NotifyWorkerCreated();
252 #if defined(ENABLE_TASKPOOL_FFRT)
253         worker->InitFfrtInfo();
254         worker->InitLoopHandleNum();
255 #endif
256         worker->RunLoop();
257     } else {
258         HILOG_ERROR("taskpool:: Worker PrepareForWorkerInstance fail");
259     }
260     TaskManager::GetInstance().RemoveWorker(worker);
261     TaskManager::GetInstance().CountTraceForWorker();
262     worker->ReleaseWorkerThreadContent();
263     delete worker;
264     worker = nullptr;
265 }
266 
PrepareForWorkerInstance()267 bool Worker::PrepareForWorkerInstance()
268 {
269     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
270     auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
271 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
272     workerEngine->SetDebuggerPostTaskFunc([this](std::function<void()>&& task) {
273         this->DebuggerOnPostTask(std::move(task));
274     });
275 #endif
276     reinterpret_cast<NativeEngine*>(workerEnv_)->RegisterNapiUncaughtExceptionHandler(
277         [workerEngine] (napi_value exception) -> void {
278         if (!NativeEngine::IsAlive(workerEngine)) {
279             HILOG_WARN("napi_env has been destoryed!");
280             return;
281         }
282         std::string name = "";
283         void* data = workerEngine->GetCurrentTaskInfo();
284         if (data != nullptr) {
285             Task* task = static_cast<Task*>(data);
286             name = task->name_;
287         }
288         NapiErrorManager::GetInstance()->NotifyUncaughtException(reinterpret_cast<napi_env>(workerEngine),
289             exception, name, TASKPOOL_TYPE);
290     });
291     reinterpret_cast<NativeEngine*>(workerEnv_)->RegisterAllPromiseCallback(
292         [workerEngine] (napi_value* args) -> void {
293         if (!NativeEngine::IsAlive(workerEngine)) {
294             HILOG_WARN("napi_env has been destoryed!");
295             return;
296         }
297         std::string name = "";
298         void* data = workerEngine->GetCurrentTaskInfo();
299         if (data != nullptr) {
300             Task* task = static_cast<Task*>(data);
301             name = task->name_;
302         }
303         NapiErrorManager::GetInstance()->NotifyUnhandledRejection(reinterpret_cast<napi_env>(workerEngine),
304             args, name, TASKPOOL_TYPE);
305     });
306 
307     if (!workerEngine->CallInitWorkerFunc(workerEngine)) {
308         HILOG_ERROR("taskpool:: Worker CallInitWorkerFunc fail");
309         return false;
310     }
311     // register timer interface
312     Timer::RegisterTime(workerEnv_);
313 
314     // Check exception after worker construction
315     if (NapiHelper::IsExceptionPending(workerEnv_)) {
316         HILOG_ERROR("taskpool:: Worker construction occur exception");
317         return false;
318     }
319     return true;
320 }
321 
ReleaseWorkerThreadContent()322 void Worker::ReleaseWorkerThreadContent()
323 {
324     auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
325     auto hostEngine = reinterpret_cast<NativeEngine*>(hostEnv_);
326     if (workerEngine == nullptr) {
327         HILOG_ERROR("taskpool:: workerEngine is nullptr");
328         return;
329     }
330     if (hostEngine != nullptr) {
331         if (!hostEngine->DeleteWorker(workerEngine)) {
332             HILOG_ERROR("taskpool:: DeleteWorker fail");
333         }
334     }
335     if (state_ == WorkerState::BLOCKED) {
336         HITRACE_HELPER_METER_NAME("Thread Timeout Exit");
337     } else {
338         HITRACE_HELPER_METER_NAME("Thread Exit");
339     }
340 
341     Timer::ClearEnvironmentTimer(workerEnv_);
342     // 2. delete NativeEngine created in worker thread
343     if (!workerEngine->CallOffWorkerFunc(workerEngine)) {
344         HILOG_ERROR("worker:: CallOffWorkerFunc error");
345     }
346     delete workerEngine;
347     workerEnv_ = nullptr;
348 }
349 
NotifyExecuteTask()350 void Worker::NotifyExecuteTask()
351 {
352     if (LIKELY(performTaskSignal_ != nullptr && !uv_is_closing(reinterpret_cast<uv_handle_t*>(performTaskSignal_)))) {
353         int ret = uv_async_send(performTaskSignal_);
354         if (ret != 0) {
355             HILOG_ERROR("taskpool:: worker NotifyExecuteTask uv send failed");
356             TaskManager::GetInstance().UvReportHisysEvent(this, "NotifyExecuteTask", "uv_async_send",
357                 "uv send performTaskSignal_ failed", ret);
358         }
359     } else {
360         HILOG_ERROR("taskpool:: performTaskSignal_ is invalid");
361     }
362 }
363 
NotifyIdle()364 void Worker::NotifyIdle()
365 {
366     TaskManager::GetInstance().NotifyWorkerIdle(this);
367 }
368 
NotifyWorkerCreated()369 void Worker::NotifyWorkerCreated()
370 {
371     TaskManager::GetInstance().NotifyWorkerCreated(this);
372 }
373 
NotifyTaskBegin()374 void Worker::NotifyTaskBegin()
375 {
376     auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
377     workerEngine->NotifyTaskBegin();
378 }
379 
TriggerGCCheck(const uv_async_t * req)380 void Worker::TriggerGCCheck(const uv_async_t* req)
381 {
382     if (req == nullptr || req->data == nullptr) {
383         HILOG_ERROR("taskpool:: req handle is invalid");
384         return;
385     }
386     auto worker = reinterpret_cast<Worker*>(req->data);
387     auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
388     workerEngine->NotifyTaskFinished();
389 }
390 
NotifyTaskFinished()391 void Worker::NotifyTaskFinished()
392 {
393     // trigger gc check by uv and return immediately if the handle is invalid
394     if (UNLIKELY(!ConcurrentHelper::IsUvActive(triggerGCCheckSignal_))) {
395         HILOG_ERROR("taskpool:: triggerGCCheckSignal_ is nullptr or closed");
396         return;
397     } else {
398         uv_async_send(triggerGCCheckSignal_);
399     }
400 
401     auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
402     if (--runningCount_ != 0 || workerEngine->HasPendingJob()) {
403         // the worker state is still RUNNING and the start time will be updated
404         startTime_ = ConcurrentHelper::GetMilliseconds();
405     } else {
406         UpdateWorkerState(WorkerState::RUNNING, WorkerState::IDLE);
407     }
408     idlePoint_ = ConcurrentHelper::GetMilliseconds();
409 }
410 
UpdateWorkerState(WorkerState expect,WorkerState desired)411 bool Worker::UpdateWorkerState(WorkerState expect, WorkerState desired)
412 {
413     return state_.compare_exchange_strong(expect, desired);
414 }
415 
PerformTask(const uv_async_t * req)416 void Worker::PerformTask(const uv_async_t* req)
417 {
418     auto worker = static_cast<Worker*>(req->data);
419     auto taskInfo = TaskManager::GetInstance().DequeueTaskId();
420     if (taskInfo.first == 0) {
421         if (TaskManager::GetInstance().GetTotalTaskNum() != 0) {
422             worker->NotifyExecuteTask();
423         }
424         return;
425     }
426     uint64_t startTime = ConcurrentHelper::GetMilliseconds();
427     worker->UpdateWorkerWakeUpTime();
428     napi_env env = worker->workerEnv_;
429     TaskManager::GetInstance().NotifyWorkerRunning(worker);
430     RunningScope runningScope(worker);
431     WorkerRunningScope workerRunningScope(env);
432     PriorityScope priorityScope(worker, taskInfo.second);
433     Task* task = TaskManager::GetInstance().GetTask(taskInfo.first);
434     if (task == nullptr) {
435         HILOG_DEBUG("taskpool:: task has been released");
436         return;
437     } else if (!task->IsValid() && task->ShouldDeleteTask(false)) {
438         HILOG_WARN("taskpool:: task is invalid");
439         delete task;
440         return;
441     }
442     // try to record the memory data for gc
443     worker->NotifyTaskBegin();
444 
445     if (!task->UpdateTask(startTime, worker)) {
446         worker->NotifyTaskFinished();
447         return;
448     }
449     if (task->IsGroupTask()) {
450         TaskGroupManager::GetInstance().UpdateGroupState(task->groupId_);
451     }
452     if (task->IsLongTask()) {
453         worker->UpdateLongTaskInfo(task);
454     }
455     worker->StoreTaskId(task->taskId_);
456     // tag for trace parse: Task Perform
457     auto loop = worker->GetWorkerLoop();
458     uint64_t loopAddress = reinterpret_cast<uint64_t>(loop);
459     std::string strTrace = "Task Perform: name : "  + task->name_ + ", taskId : " + std::to_string(task->taskId_)
460                             + ", priority : " + std::to_string(taskInfo.second);
461     std::string taskLog = "Task Perform: "  + task->name_ + ", " + std::to_string(task->taskId_) + ", "
462                           "runningLoop: " + std::to_string(loopAddress);
463     HITRACE_HELPER_METER_NAME(strTrace);
464     HILOG_TASK_INFO("taskpool:: %{public}s", taskLog.c_str());
465 
466     napi_value func = nullptr;
467     napi_value args = nullptr;
468     napi_value errorInfo = task->DeserializeValue(env, &func, &args);
469     if (UNLIKELY(func == nullptr || args == nullptr)) {
470         if (errorInfo != nullptr) {
471             worker->NotifyTaskResult(env, task, errorInfo);
472         }
473         return;
474     }
475     if (!worker->InitTaskPoolFunc(env, func, task)) {
476         return;
477     }
478     worker->hasExecuted_ = true;
479     uint32_t argsNum = NapiHelper::GetArrayLength(env, args);
480     napi_value argsArray[argsNum];
481     for (size_t i = 0; i < argsNum; i++) {
482         argsArray[i] = NapiHelper::GetElement(env, args, i);
483     }
484 
485     if (!task->CheckStartExecution(taskInfo.second)) {
486         if (task->ShouldDeleteTask()) {
487             delete task;
488         }
489         return;
490     }
491     napi_call_function(env, NapiHelper::GetGlobalObject(env), func, argsNum, argsArray, nullptr);
492     auto workerEngine = reinterpret_cast<NativeEngine*>(env);
493     workerEngine->ClearCurrentTaskInfo();
494     task->DecreaseRefCount();
495     task->StoreTaskDuration();
496     worker->UpdateExecutedInfo();
497     HandleFunctionResult(env, task);
498 }
499 
NotifyTaskResult(napi_env env,Task * task,napi_value result)500 void Worker::NotifyTaskResult(napi_env env, Task* task, napi_value result)
501 {
502     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
503     HILOG_DEBUG("taskpool:: NotifyTaskResult task:%{public}s", std::to_string(task->taskId_).c_str());
504     void* resultData = nullptr;
505     napi_value undefined = NapiHelper::GetUndefinedValue(env);
506     bool defaultTransfer = true;
507     bool defaultCloneSendable = false;
508     std::string errString = "";
509     napi_status status = napi_serialize_inner_with_error(env, result, undefined, undefined, defaultTransfer,
510                                                          defaultCloneSendable, &resultData, errString);
511     if ((status != napi_ok || resultData == nullptr) && task->success_) {
512         task->success_ = false;
513         std::string errMessage = "taskpool: failed to serialize result.\nSerialize error: " + errString;
514         HILOG_ERROR("%{public}s", errMessage.c_str());
515         napi_value err = ErrorHelper::NewError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
516         NotifyTaskResult(env, task, err);
517         return;
518     }
519     task->result_ = resultData;
520     NotifyHandleTaskResult(task);
521 }
522 
NotifyHandleTaskResult(Task * task)523 void Worker::NotifyHandleTaskResult(Task* task)
524 {
525     if (!task->IsReadyToHandle()) {
526         return;
527     }
528     Worker* worker = reinterpret_cast<Worker*>(task->GetWorker());
529     if (worker == nullptr) {
530         TaskManager::GetInstance().UvReportHisysEvent(nullptr, "NotifyHandleTaskResult", "", "worker is nullptr", -1);
531         HILOG_FATAL("taskpool:: worker is nullptr");
532         return;
533     }
534     worker->EraseRunningTaskId(task->GetTaskId());
535     auto priority = worker->GetPriority();
536     if (!Task::VerifyAndPostResult(task, priority)) {
537         if (task->ShouldDeleteTask()) {
538             delete task;
539         }
540     }
541     worker->NotifyTaskFinished();
542 }
543 
TaskResultCallback(napi_env env,napi_value result,bool success,void * data)544 void Worker::TaskResultCallback(napi_env env, napi_value result, bool success, void* data)
545 {
546     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
547     if (env == nullptr) { // LCOV_EXCL_BR_LINE
548         std::string error = "TaskResultCallback engine is null";
549         TaskManager::GetInstance().UvReportHisysEvent(nullptr, "TaskResultCallback", "", error, -1);
550         HILOG_FATAL("taskpool:: %{public}s", error.c_str());
551         return;
552     }
553     if (data == nullptr) { // LCOV_EXCL_BR_LINE
554         std::string error = "data is nullptr";
555         TaskManager::GetInstance().UvReportHisysEvent(nullptr, "TaskResultCallback", "", error, -1);
556         HILOG_FATAL("taskpool:: %{public}s", error.c_str());
557         return;
558     }
559     Task* task = static_cast<Task*>(data);
560     if (TaskManager::GetInstance().GetTask(task->taskId_) == nullptr) {
561         std::string error = "task is nullptr, taskId: " + std::to_string(task->taskId_);
562         TaskManager::GetInstance().UvReportHisysEvent(nullptr, "TaskResultCallback", "", error, -1);
563         HILOG_FATAL("taskpool:: task is nullptr");
564         return;
565     }
566     auto worker = static_cast<Worker*>(task->worker_);
567     worker->isExecutingLongTask_ = task->IsLongTask();
568     task->DecreaseRefCount();
569     task->ioTime_ = ConcurrentHelper::GetMilliseconds();
570     if (task->cpuTime_ != 0) {
571         uint64_t ioDuration = task->ioTime_ - task->startTime_;
572         uint64_t cpuDuration = task->cpuTime_ - task->startTime_;
573         TaskManager::GetInstance().StoreTaskDuration(task->taskId_, std::max(ioDuration, cpuDuration), cpuDuration);
574     }
575     napi_value exception = nullptr;
576     napi_get_and_clear_last_exception(env, &exception);
577     if (exception != nullptr) {
578         HILOG_ERROR("taskpool::TaskResultCallback occur exception");
579         reinterpret_cast<NativeEngine*>(env)->HandleTaskpoolException(exception);
580         task->success_ = false;
581         napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception);
582         NotifyTaskResult(env, task, errorEvent);
583         return;
584     }
585 
586     task->success_ = success;
587     NotifyTaskResult(env, task, result);
588 }
589 
590 // reset qos_user_initiated after perform task
ResetWorkerPriority()591 void Worker::ResetWorkerPriority()
592 {
593     if (priority_ != Priority::HIGH) {
594         if (TaskManager::GetInstance().EnableFfrt()) {
595 #if defined(ENABLE_TASKPOOL_FFRT)
596             if (ffrt::this_task::update_qos(WORKERPRIORITY_FFRTQOS_MAP.at(Priority::HIGH)) != 0) {
597                 SetWorkerPriority(Priority::HIGH);
598             }
599             priority_ = Priority::HIGH;
600 #endif
601         }
602     }
603 }
604 
StoreTaskId(uint32_t taskId)605 void Worker::StoreTaskId(uint32_t taskId)
606 {
607     std::lock_guard<std::mutex> lock(currentTaskIdMutex_);
608     currentTaskId_.emplace_back(taskId);
609 }
610 
InitTaskPoolFunc(napi_env env,napi_value func,Task * task)611 bool Worker::InitTaskPoolFunc(napi_env env, napi_value func, Task* task)
612 {
613     auto workerEngine = reinterpret_cast<NativeEngine*>(env);
614     bool success = workerEngine->InitTaskPoolFunc(env, func, task);
615     napi_value exception;
616     napi_get_and_clear_last_exception(env, &exception);
617     if (exception != nullptr) {
618         HILOG_ERROR("taskpool:: InitTaskPoolFunc occur exception");
619         task->success_ = false;
620         napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception);
621         NotifyTaskResult(env, task, errorEvent);
622         return false;
623     }
624     if (!success) {
625         HILOG_ERROR("taskpool:: InitTaskPoolFunc fail");
626         napi_value err = ErrorHelper::NewError(env, ErrorHelper::TYPE_ERROR,
627                                                "taskpool:: function may not be concurrent.");
628         task->success_ = false;
629         NotifyTaskResult(env, task, err);
630         return false;
631     }
632     return true;
633 }
634 
UpdateExecutedInfo()635 void Worker::UpdateExecutedInfo()
636 {
637     // if the worker is blocked, just skip
638     if (LIKELY(state_ != WorkerState::BLOCKED)) {
639         uint64_t duration = ConcurrentHelper::GetMilliseconds() - startTime_;
640         TaskManager::GetInstance().UpdateExecutedInfo(duration);
641     }
642 }
643 
644 // Only when the worker has no longTask can it be released.
TerminateTask(uint32_t taskId)645 void Worker::TerminateTask(uint32_t taskId)
646 {
647     HILOG_DEBUG("taskpool:: TerminateTask task:%{public}s", std::to_string(taskId).c_str());
648     std::lock_guard<std::mutex> lock(longMutex_);
649     longTasksSet_.erase(taskId);
650     if (longTasksSet_.empty()) {
651         hasLongTask_ = false;
652     }
653 }
654 
655 // to store longTasks' state
UpdateLongTaskInfo(Task * task)656 void Worker::UpdateLongTaskInfo(Task* task)
657 {
658     HILOG_DEBUG("taskpool:: UpdateLongTaskInfo task:%{public}s", std::to_string(task->taskId_).c_str());
659     TaskManager::GetInstance().StoreLongTaskInfo(task->taskId_, this);
660     std::lock_guard<std::mutex> lock(longMutex_);
661     hasLongTask_ = true;
662     isExecutingLongTask_ = true;
663     longTasksSet_.emplace(task->taskId_);
664 }
665 
IsExecutingLongTask()666 bool Worker::IsExecutingLongTask()
667 {
668     return isExecutingLongTask_;
669 }
670 
HasLongTask()671 bool Worker::HasLongTask()
672 {
673     return hasLongTask_;
674 }
675 
HandleFunctionResult(napi_env env,Task * task)676 void Worker::HandleFunctionResult(napi_env env, Task* task)
677 {
678     napi_value exception = nullptr;
679     napi_get_and_clear_last_exception(env, &exception);
680     if (exception != nullptr) {
681         HILOG_ERROR("taskpool::PerformTask occur exception");
682         reinterpret_cast<NativeEngine*>(env)->HandleTaskpoolException(exception);
683         task->DecreaseRefCount();
684         task->success_ = false;
685         napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception);
686         NotifyTaskResult(env, task, errorEvent);
687         return;
688     }
689     NotifyHandleTaskResult(task);
690 }
691 
PostReleaseSignal()692 void Worker::PostReleaseSignal()
693 {
694     if (UNLIKELY(!ConcurrentHelper::IsUvActive(clearWorkerSignal_))) {
695         HILOG_ERROR("taskpool:: clearWorkerSignal_ is nullptr or closed");
696         return;
697     }
698     uv_async_send(clearWorkerSignal_);
699 }
700 
IsRunnable(uint64_t currTime) const701 bool Worker::IsRunnable(uint64_t currTime) const
702 {
703     bool res = true;
704     if (currTime > wakeUpTime_) {
705         res = (currTime - wakeUpTime_ < 15); // 15: ms
706     }
707     return res;
708 }
709 
UpdateWorkerWakeUpTime()710 void Worker::UpdateWorkerWakeUpTime()
711 {
712     wakeUpTime_ = ConcurrentHelper::GetMilliseconds();
713 }
714 
GetPriority() const715 Priority Worker::GetPriority() const
716 {
717     return priority_;
718 }
719 
EraseRunningTaskId(uint32_t taskId)720 void Worker::EraseRunningTaskId(uint32_t taskId)
721 {
722     std::lock_guard<std::mutex> lock(currentTaskIdMutex_);
723     auto iter = std::find(currentTaskId_.begin(), currentTaskId_.end(), taskId);
724     if (iter != currentTaskId_.end()) {
725         currentTaskId_.erase(iter);
726     }
727 }
728 
729 #if defined(ENABLE_TASKPOOL_HISYSEVENT)
IsNeedReport(uint64_t intervalTime)730 bool Worker::IsNeedReport(uint64_t intervalTime)
731 {
732     if (reportCount_ >= MAX_REPORT_TIMES) {
733         return false;
734     }
735     if (intervalTime < static_cast<uint64_t>(reportCount_.load() + 1) * WORKER_ALIVE_TIME) {
736         return false;
737     }
738     return true;
739 }
740 
IncreaseReportCount()741 void Worker::IncreaseReportCount()
742 {
743     reportCount_++;
744 }
745 #endif
746 
ResetPerformIdleState()747 void Worker::ResetPerformIdleState()
748 {
749     if (priority_ == Priority::IDLE) {
750         TaskManager::GetInstance().SetIsPerformIdle(false);
751     }
752 }
753 } // namespace Commonlibrary::Concurrent::TaskPoolModule
754