1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
#include "worker.h"

#include <vector>

#include "helper/concurrent_helper.h"

#if defined(ENABLE_TASKPOOL_FFRT)
#include "c/executor_task.h"
#include "ffrt_inner.h"
#endif
#include "sys_timer.h"
#include "helper/hitrace_helper.h"
#include "process_helper.h"
#include "taskpool.h"
#include "task_group_manager.h"
#include "native_engine.h"
29
30 namespace Commonlibrary::Concurrent::TaskPoolModule {
31 using namespace OHOS::JsSysModule;
32 using namespace Commonlibrary::Platform;
// Type tag handed to NapiErrorManager when an error is reported from this file.
static constexpr uint32_t TASKPOOL_TYPE = 2;
// Base interval used by Worker::IsNeedReport: 30 minutes expressed in milliseconds.
static constexpr uint32_t WORKER_ALIVE_TIME = 30 * 60 * 1000;
// Worker::IsNeedReport stops returning true once reportCount_ reaches this value.
static constexpr int32_t MAX_REPORT_TIMES = 3;
36
PriorityScope(Worker * worker,Priority taskPriority)37 Worker::PriorityScope::PriorityScope(Worker* worker, Priority taskPriority) : worker_(worker)
38 {
39 if (taskPriority != worker->priority_) {
40 HILOG_DEBUG("taskpool:: reset worker priority to match task priority");
41 if (TaskManager::GetInstance().EnableFfrt()) {
42 #if defined(ENABLE_TASKPOOL_FFRT)
43 if (ffrt::this_task::update_qos(WORKERPRIORITY_FFRTQOS_MAP.at(taskPriority)) != 0) {
44 SetWorkerPriority(taskPriority);
45 }
46 #endif
47 } else {
48 SetWorkerPriority(taskPriority);
49 }
50 worker->priority_ = taskPriority;
51 }
52 }
53
~RunningScope()54 Worker::RunningScope::~RunningScope()
55 {
56 HILOG_DEBUG("taskpool:: RunningScope destruction");
57 if (scope_ != nullptr) {
58 napi_close_handle_scope(worker_->workerEnv_, scope_);
59 }
60 worker_->NotifyIdle();
61 worker_->idleState_ = true;
62 }
63
WorkerConstructor(napi_env env)64 Worker* Worker::WorkerConstructor(napi_env env)
65 {
66 HITRACE_HELPER_METER_NAME("TaskWorkerConstructor: [Add Thread]");
67 Worker* worker = new Worker(env);
68 worker->StartExecuteInThread();
69 return worker;
70 }
71
CloseHandles()72 void Worker::CloseHandles()
73 {
74 // set all handles to nullptr so that they can not be used even when the loop is re-running
75 ConcurrentHelper::UvHandleClose(performTaskSignal_);
76 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
77 ConcurrentHelper::UvHandleClose(debuggerOnPostTaskSignal_);
78 #endif
79 ConcurrentHelper::UvHandleClose(clearWorkerSignal_);
80 ConcurrentHelper::UvHandleClose(triggerGCCheckSignal_);
81 }
82
ReleaseWorkerHandles(const uv_async_t * req)83 void Worker::ReleaseWorkerHandles(const uv_async_t* req)
84 {
85 auto worker = static_cast<Worker*>(req->data);
86 HILOG_DEBUG("taskpool:: enter the worker loop and try to release thread: %{public}d", worker->tid_);
87 if (!worker->CheckFreeConditions()) {
88 return;
89 }
90
91 TaskManager::GetInstance().RemoveWorker(worker);
92 HITRACE_HELPER_METER_NAME("ReleaseWorkerHandles: [Release Thread]");
93 HILOG_INFO("taskpool:: release idle thread, total num is %{public}u",
94 TaskManager::GetInstance().GetThreadNum());
95 // when there is no active handle, worker loop will stop automatically.
96 worker->CloseHandles();
97
98 uv_loop_t* loop = worker->GetWorkerLoop();
99 if (loop != nullptr) {
100 uv_stop(loop);
101 }
102 }
103
CheckFreeConditions()104 bool Worker::CheckFreeConditions()
105 {
106 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
107 // only when all conditions are met can the worker be freed
108 if (HasRunningTasks()) {
109 HILOG_DEBUG("taskpool:: async callbacks may exist, the worker thread will not exit");
110 } else if (workerEngine->HasListeningCounter()) {
111 HILOG_DEBUG("taskpool:: listening operation exists, the worker thread will not exit");
112 } else if (Timer::HasTimer(workerEnv_)) {
113 HILOG_DEBUG("taskpool:: timer exists, the worker thread will not exit");
114 } else if (workerEngine->HasWaitingRequest()) {
115 HILOG_DEBUG("taskpool:: waiting request exists, the worker thread will not exit");
116 } else if (workerEngine->HasSubEnv()) {
117 HILOG_DEBUG("taskpool:: sub env exists, the worker thread will not exit");
118 } else if (workerEngine->HasPendingJob()) {
119 HILOG_DEBUG("taskpool:: pending job exists, the worker thread will not exit");
120 } else if (workerEngine->IsProfiling()) {
121 HILOG_DEBUG("taskpool:: the worker thread will not exit during profiling");
122 } else {
123 return true;
124 }
125 HILOG_DEBUG("taskpool:: the worker %{public}d can't be released due to not meeting the conditions", tid_);
126 TaskManager& taskManager = TaskManager::GetInstance();
127 taskManager.RestoreWorker(this);
128 taskManager.CountTraceForWorker();
129 return false;
130 }
131
StartExecuteInThread()132 void Worker::StartExecuteInThread()
133 {
134 if (!runner_) {
135 runner_ = std::make_unique<TaskRunner>(TaskStartCallback(ExecuteInThread, this));
136 }
137 if (runner_) {
138 runner_->Execute(); // start a new thread
139 } else {
140 HILOG_ERROR("taskpool:: runner_ is nullptr");
141 }
142 }
143
144 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
HandleDebuggerTask(const uv_async_t * req)145 void Worker::HandleDebuggerTask(const uv_async_t* req)
146 {
147 Worker* worker = reinterpret_cast<Worker*>(req->data);
148 if (worker == nullptr) {
149 HILOG_ERROR("taskpool:: worker is null");
150 return;
151 }
152 worker->debuggerMutex_.lock();
153 auto task = std::move(worker->debuggerQueue_.front());
154 worker->debuggerQueue_.pop();
155 worker->debuggerMutex_.unlock();
156 task();
157 }
158
DebuggerOnPostTask(std::function<void ()> && task)159 void Worker::DebuggerOnPostTask(std::function<void()>&& task)
160 {
161 if (ConcurrentHelper::IsUvActive(debuggerOnPostTaskSignal_)) {
162 std::lock_guard<std::mutex> lock(debuggerMutex_);
163 debuggerQueue_.push(std::move(task));
164 uv_async_send(debuggerOnPostTaskSignal_);
165 }
166 }
167 #endif
168
169 #if defined(ENABLE_TASKPOOL_FFRT)
InitFfrtInfo()170 void Worker::InitFfrtInfo()
171 {
172 if (TaskManager::GetInstance().EnableFfrt()) {
173 static const std::map<int, Priority> FFRTQOS_WORKERPRIORITY_MAP = {
174 {ffrt::qos_background, Priority::IDLE},
175 {ffrt::qos_utility, Priority::LOW},
176 {ffrt::qos_default, Priority::DEFAULT},
177 {ffrt::qos_user_initiated, Priority::HIGH},
178 };
179 ffrt_qos_t qos = ffrt_this_task_get_qos();
180 priority_ = FFRTQOS_WORKERPRIORITY_MAP.at(qos);
181 ffrtTaskHandle_ = ffrt_get_cur_task();
182 }
183 }
184
InitLoopHandleNum()185 void Worker::InitLoopHandleNum()
186 {
187 if (ffrtTaskHandle_ == nullptr) {
188 return;
189 }
190
191 uv_loop_t* loop = GetWorkerLoop();
192 if (loop != nullptr) {
193 initActiveHandleNum_ = loop->active_handles;
194 } else {
195 HILOG_ERROR("taskpool:: worker loop is nullptr when init loop handle num.");
196 }
197 }
198
IsLoopActive()199 bool Worker::IsLoopActive()
200 {
201 uv_loop_t* loop = GetWorkerLoop();
202 if (loop != nullptr) {
203 return uv_loop_alive_taskpool(loop, initActiveHandleNum_);
204 } else {
205 HILOG_ERROR("taskpool:: worker loop is nullptr when judge loop alive.");
206 return false;
207 }
208 }
209
GetWaitTime()210 uint64_t Worker::GetWaitTime()
211 {
212 return ffrt_epoll_get_wait_time(ffrtTaskHandle_);
213 }
214 #endif
215
ExecuteInThread(const void * data)216 void Worker::ExecuteInThread(const void* data)
217 {
218 HITRACE_HELPER_START_TRACE(__PRETTY_FUNCTION__);
219 auto worker = reinterpret_cast<Worker*>(const_cast<void*>(data));
220 {
221 napi_create_runtime(worker->hostEnv_, &worker->workerEnv_);
222 if (worker->workerEnv_ == nullptr) {
223 HILOG_ERROR("taskpool:: worker create runtime failed");
224 return;
225 }
226 auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
227 // mark worker env is taskpoolThread
228 workerEngine->MarkTaskPoolThread();
229 workerEngine->InitTaskPoolThread(worker->workerEnv_, Worker::TaskResultCallback);
230 }
231 uv_loop_t* loop = worker->GetWorkerLoop();
232 if (loop == nullptr) {
233 HILOG_ERROR("taskpool:: loop is nullptr");
234 return;
235 }
236 // save the worker tid
237 worker->tid_ = GetThreadId();
238
239 // Init worker task execute signal
240 ConcurrentHelper::UvHandleInit(loop, worker->performTaskSignal_, Worker::PerformTask, worker);
241 ConcurrentHelper::UvHandleInit(loop, worker->clearWorkerSignal_, Worker::ReleaseWorkerHandles, worker);
242 ConcurrentHelper::UvHandleInit(loop, worker->triggerGCCheckSignal_, Worker::TriggerGCCheck, worker);
243
244 HITRACE_HELPER_FINISH_TRACE;
245 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
246 // Init debugger task post signal
247 ConcurrentHelper::UvHandleInit(loop, worker->debuggerOnPostTaskSignal_, Worker::HandleDebuggerTask, worker);
248 #endif
249 if (worker->PrepareForWorkerInstance()) {
250 // Call after uv_async_init
251 worker->NotifyWorkerCreated();
252 #if defined(ENABLE_TASKPOOL_FFRT)
253 worker->InitFfrtInfo();
254 worker->InitLoopHandleNum();
255 #endif
256 worker->RunLoop();
257 } else {
258 HILOG_ERROR("taskpool:: Worker PrepareForWorkerInstance fail");
259 }
260 TaskManager::GetInstance().RemoveWorker(worker);
261 TaskManager::GetInstance().CountTraceForWorker();
262 worker->ReleaseWorkerThreadContent();
263 delete worker;
264 worker = nullptr;
265 }
266
PrepareForWorkerInstance()267 bool Worker::PrepareForWorkerInstance()
268 {
269 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
270 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
271 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
272 workerEngine->SetDebuggerPostTaskFunc([this](std::function<void()>&& task) {
273 this->DebuggerOnPostTask(std::move(task));
274 });
275 #endif
276 reinterpret_cast<NativeEngine*>(workerEnv_)->RegisterNapiUncaughtExceptionHandler(
277 [workerEngine] (napi_value exception) -> void {
278 if (!NativeEngine::IsAlive(workerEngine)) {
279 HILOG_WARN("napi_env has been destoryed!");
280 return;
281 }
282 std::string name = "";
283 void* data = workerEngine->GetCurrentTaskInfo();
284 if (data != nullptr) {
285 Task* task = static_cast<Task*>(data);
286 name = task->name_;
287 }
288 NapiErrorManager::GetInstance()->NotifyUncaughtException(reinterpret_cast<napi_env>(workerEngine),
289 exception, name, TASKPOOL_TYPE);
290 });
291 reinterpret_cast<NativeEngine*>(workerEnv_)->RegisterAllPromiseCallback(
292 [workerEngine] (napi_value* args) -> void {
293 if (!NativeEngine::IsAlive(workerEngine)) {
294 HILOG_WARN("napi_env has been destoryed!");
295 return;
296 }
297 std::string name = "";
298 void* data = workerEngine->GetCurrentTaskInfo();
299 if (data != nullptr) {
300 Task* task = static_cast<Task*>(data);
301 name = task->name_;
302 }
303 NapiErrorManager::GetInstance()->NotifyUnhandledRejection(reinterpret_cast<napi_env>(workerEngine),
304 args, name, TASKPOOL_TYPE);
305 });
306
307 if (!workerEngine->CallInitWorkerFunc(workerEngine)) {
308 HILOG_ERROR("taskpool:: Worker CallInitWorkerFunc fail");
309 return false;
310 }
311 // register timer interface
312 Timer::RegisterTime(workerEnv_);
313
314 // Check exception after worker construction
315 if (NapiHelper::IsExceptionPending(workerEnv_)) {
316 HILOG_ERROR("taskpool:: Worker construction occur exception");
317 return false;
318 }
319 return true;
320 }
321
ReleaseWorkerThreadContent()322 void Worker::ReleaseWorkerThreadContent()
323 {
324 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
325 auto hostEngine = reinterpret_cast<NativeEngine*>(hostEnv_);
326 if (workerEngine == nullptr) {
327 HILOG_ERROR("taskpool:: workerEngine is nullptr");
328 return;
329 }
330 if (hostEngine != nullptr) {
331 if (!hostEngine->DeleteWorker(workerEngine)) {
332 HILOG_ERROR("taskpool:: DeleteWorker fail");
333 }
334 }
335 if (state_ == WorkerState::BLOCKED) {
336 HITRACE_HELPER_METER_NAME("Thread Timeout Exit");
337 } else {
338 HITRACE_HELPER_METER_NAME("Thread Exit");
339 }
340
341 Timer::ClearEnvironmentTimer(workerEnv_);
342 // 2. delete NativeEngine created in worker thread
343 if (!workerEngine->CallOffWorkerFunc(workerEngine)) {
344 HILOG_ERROR("worker:: CallOffWorkerFunc error");
345 }
346 delete workerEngine;
347 workerEnv_ = nullptr;
348 }
349
NotifyExecuteTask()350 void Worker::NotifyExecuteTask()
351 {
352 if (LIKELY(performTaskSignal_ != nullptr && !uv_is_closing(reinterpret_cast<uv_handle_t*>(performTaskSignal_)))) {
353 int ret = uv_async_send(performTaskSignal_);
354 if (ret != 0) {
355 HILOG_ERROR("taskpool:: worker NotifyExecuteTask uv send failed");
356 TaskManager::GetInstance().UvReportHisysEvent(this, "NotifyExecuteTask", "uv_async_send",
357 "uv send performTaskSignal_ failed", ret);
358 }
359 } else {
360 HILOG_ERROR("taskpool:: performTaskSignal_ is invalid");
361 }
362 }
363
NotifyIdle()364 void Worker::NotifyIdle()
365 {
366 TaskManager::GetInstance().NotifyWorkerIdle(this);
367 }
368
NotifyWorkerCreated()369 void Worker::NotifyWorkerCreated()
370 {
371 TaskManager::GetInstance().NotifyWorkerCreated(this);
372 }
373
NotifyTaskBegin()374 void Worker::NotifyTaskBegin()
375 {
376 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
377 workerEngine->NotifyTaskBegin();
378 }
379
TriggerGCCheck(const uv_async_t * req)380 void Worker::TriggerGCCheck(const uv_async_t* req)
381 {
382 if (req == nullptr || req->data == nullptr) {
383 HILOG_ERROR("taskpool:: req handle is invalid");
384 return;
385 }
386 auto worker = reinterpret_cast<Worker*>(req->data);
387 auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
388 workerEngine->NotifyTaskFinished();
389 }
390
NotifyTaskFinished()391 void Worker::NotifyTaskFinished()
392 {
393 // trigger gc check by uv and return immediately if the handle is invalid
394 if (UNLIKELY(!ConcurrentHelper::IsUvActive(triggerGCCheckSignal_))) {
395 HILOG_ERROR("taskpool:: triggerGCCheckSignal_ is nullptr or closed");
396 return;
397 } else {
398 uv_async_send(triggerGCCheckSignal_);
399 }
400
401 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
402 if (--runningCount_ != 0 || workerEngine->HasPendingJob()) {
403 // the worker state is still RUNNING and the start time will be updated
404 startTime_ = ConcurrentHelper::GetMilliseconds();
405 } else {
406 UpdateWorkerState(WorkerState::RUNNING, WorkerState::IDLE);
407 }
408 idlePoint_ = ConcurrentHelper::GetMilliseconds();
409 }
410
UpdateWorkerState(WorkerState expect,WorkerState desired)411 bool Worker::UpdateWorkerState(WorkerState expect, WorkerState desired)
412 {
413 return state_.compare_exchange_strong(expect, desired);
414 }
415
PerformTask(const uv_async_t * req)416 void Worker::PerformTask(const uv_async_t* req)
417 {
418 auto worker = static_cast<Worker*>(req->data);
419 auto taskInfo = TaskManager::GetInstance().DequeueTaskId();
420 if (taskInfo.first == 0) {
421 if (TaskManager::GetInstance().GetTotalTaskNum() != 0) {
422 worker->NotifyExecuteTask();
423 }
424 return;
425 }
426 uint64_t startTime = ConcurrentHelper::GetMilliseconds();
427 worker->UpdateWorkerWakeUpTime();
428 napi_env env = worker->workerEnv_;
429 TaskManager::GetInstance().NotifyWorkerRunning(worker);
430 RunningScope runningScope(worker);
431 WorkerRunningScope workerRunningScope(env);
432 PriorityScope priorityScope(worker, taskInfo.second);
433 Task* task = TaskManager::GetInstance().GetTask(taskInfo.first);
434 if (task == nullptr) {
435 HILOG_DEBUG("taskpool:: task has been released");
436 return;
437 } else if (!task->IsValid() && task->ShouldDeleteTask(false)) {
438 HILOG_WARN("taskpool:: task is invalid");
439 delete task;
440 return;
441 }
442 // try to record the memory data for gc
443 worker->NotifyTaskBegin();
444
445 if (!task->UpdateTask(startTime, worker)) {
446 worker->NotifyTaskFinished();
447 return;
448 }
449 if (task->IsGroupTask()) {
450 TaskGroupManager::GetInstance().UpdateGroupState(task->groupId_);
451 }
452 if (task->IsLongTask()) {
453 worker->UpdateLongTaskInfo(task);
454 }
455 worker->StoreTaskId(task->taskId_);
456 // tag for trace parse: Task Perform
457 auto loop = worker->GetWorkerLoop();
458 uint64_t loopAddress = reinterpret_cast<uint64_t>(loop);
459 std::string strTrace = "Task Perform: name : " + task->name_ + ", taskId : " + std::to_string(task->taskId_)
460 + ", priority : " + std::to_string(taskInfo.second);
461 std::string taskLog = "Task Perform: " + task->name_ + ", " + std::to_string(task->taskId_) + ", "
462 "runningLoop: " + std::to_string(loopAddress);
463 HITRACE_HELPER_METER_NAME(strTrace);
464 HILOG_TASK_INFO("taskpool:: %{public}s", taskLog.c_str());
465
466 napi_value func = nullptr;
467 napi_value args = nullptr;
468 napi_value errorInfo = task->DeserializeValue(env, &func, &args);
469 if (UNLIKELY(func == nullptr || args == nullptr)) {
470 if (errorInfo != nullptr) {
471 worker->NotifyTaskResult(env, task, errorInfo);
472 }
473 return;
474 }
475 if (!worker->InitTaskPoolFunc(env, func, task)) {
476 return;
477 }
478 worker->hasExecuted_ = true;
479 uint32_t argsNum = NapiHelper::GetArrayLength(env, args);
480 napi_value argsArray[argsNum];
481 for (size_t i = 0; i < argsNum; i++) {
482 argsArray[i] = NapiHelper::GetElement(env, args, i);
483 }
484
485 if (!task->CheckStartExecution(taskInfo.second)) {
486 if (task->ShouldDeleteTask()) {
487 delete task;
488 }
489 return;
490 }
491 napi_call_function(env, NapiHelper::GetGlobalObject(env), func, argsNum, argsArray, nullptr);
492 auto workerEngine = reinterpret_cast<NativeEngine*>(env);
493 workerEngine->ClearCurrentTaskInfo();
494 task->DecreaseRefCount();
495 task->StoreTaskDuration();
496 worker->UpdateExecutedInfo();
497 HandleFunctionResult(env, task);
498 }
499
NotifyTaskResult(napi_env env,Task * task,napi_value result)500 void Worker::NotifyTaskResult(napi_env env, Task* task, napi_value result)
501 {
502 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
503 HILOG_DEBUG("taskpool:: NotifyTaskResult task:%{public}s", std::to_string(task->taskId_).c_str());
504 void* resultData = nullptr;
505 napi_value undefined = NapiHelper::GetUndefinedValue(env);
506 bool defaultTransfer = true;
507 bool defaultCloneSendable = false;
508 std::string errString = "";
509 napi_status status = napi_serialize_inner_with_error(env, result, undefined, undefined, defaultTransfer,
510 defaultCloneSendable, &resultData, errString);
511 if ((status != napi_ok || resultData == nullptr) && task->success_) {
512 task->success_ = false;
513 std::string errMessage = "taskpool: failed to serialize result.\nSerialize error: " + errString;
514 HILOG_ERROR("%{public}s", errMessage.c_str());
515 napi_value err = ErrorHelper::NewError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
516 NotifyTaskResult(env, task, err);
517 return;
518 }
519 task->result_ = resultData;
520 NotifyHandleTaskResult(task);
521 }
522
NotifyHandleTaskResult(Task * task)523 void Worker::NotifyHandleTaskResult(Task* task)
524 {
525 if (!task->IsReadyToHandle()) {
526 return;
527 }
528 Worker* worker = reinterpret_cast<Worker*>(task->GetWorker());
529 if (worker == nullptr) {
530 TaskManager::GetInstance().UvReportHisysEvent(nullptr, "NotifyHandleTaskResult", "", "worker is nullptr", -1);
531 HILOG_FATAL("taskpool:: worker is nullptr");
532 return;
533 }
534 worker->EraseRunningTaskId(task->GetTaskId());
535 auto priority = worker->GetPriority();
536 if (!Task::VerifyAndPostResult(task, priority)) {
537 if (task->ShouldDeleteTask()) {
538 delete task;
539 }
540 }
541 worker->NotifyTaskFinished();
542 }
543
TaskResultCallback(napi_env env,napi_value result,bool success,void * data)544 void Worker::TaskResultCallback(napi_env env, napi_value result, bool success, void* data)
545 {
546 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
547 if (env == nullptr) { // LCOV_EXCL_BR_LINE
548 std::string error = "TaskResultCallback engine is null";
549 TaskManager::GetInstance().UvReportHisysEvent(nullptr, "TaskResultCallback", "", error, -1);
550 HILOG_FATAL("taskpool:: %{public}s", error.c_str());
551 return;
552 }
553 if (data == nullptr) { // LCOV_EXCL_BR_LINE
554 std::string error = "data is nullptr";
555 TaskManager::GetInstance().UvReportHisysEvent(nullptr, "TaskResultCallback", "", error, -1);
556 HILOG_FATAL("taskpool:: %{public}s", error.c_str());
557 return;
558 }
559 Task* task = static_cast<Task*>(data);
560 if (TaskManager::GetInstance().GetTask(task->taskId_) == nullptr) {
561 std::string error = "task is nullptr, taskId: " + std::to_string(task->taskId_);
562 TaskManager::GetInstance().UvReportHisysEvent(nullptr, "TaskResultCallback", "", error, -1);
563 HILOG_FATAL("taskpool:: task is nullptr");
564 return;
565 }
566 auto worker = static_cast<Worker*>(task->worker_);
567 worker->isExecutingLongTask_ = task->IsLongTask();
568 task->DecreaseRefCount();
569 task->ioTime_ = ConcurrentHelper::GetMilliseconds();
570 if (task->cpuTime_ != 0) {
571 uint64_t ioDuration = task->ioTime_ - task->startTime_;
572 uint64_t cpuDuration = task->cpuTime_ - task->startTime_;
573 TaskManager::GetInstance().StoreTaskDuration(task->taskId_, std::max(ioDuration, cpuDuration), cpuDuration);
574 }
575 napi_value exception = nullptr;
576 napi_get_and_clear_last_exception(env, &exception);
577 if (exception != nullptr) {
578 HILOG_ERROR("taskpool::TaskResultCallback occur exception");
579 reinterpret_cast<NativeEngine*>(env)->HandleTaskpoolException(exception);
580 task->success_ = false;
581 napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception);
582 NotifyTaskResult(env, task, errorEvent);
583 return;
584 }
585
586 task->success_ = success;
587 NotifyTaskResult(env, task, result);
588 }
589
590 // reset qos_user_initiated after perform task
ResetWorkerPriority()591 void Worker::ResetWorkerPriority()
592 {
593 if (priority_ != Priority::HIGH) {
594 if (TaskManager::GetInstance().EnableFfrt()) {
595 #if defined(ENABLE_TASKPOOL_FFRT)
596 if (ffrt::this_task::update_qos(WORKERPRIORITY_FFRTQOS_MAP.at(Priority::HIGH)) != 0) {
597 SetWorkerPriority(Priority::HIGH);
598 }
599 priority_ = Priority::HIGH;
600 #endif
601 }
602 }
603 }
604
StoreTaskId(uint32_t taskId)605 void Worker::StoreTaskId(uint32_t taskId)
606 {
607 std::lock_guard<std::mutex> lock(currentTaskIdMutex_);
608 currentTaskId_.emplace_back(taskId);
609 }
610
InitTaskPoolFunc(napi_env env,napi_value func,Task * task)611 bool Worker::InitTaskPoolFunc(napi_env env, napi_value func, Task* task)
612 {
613 auto workerEngine = reinterpret_cast<NativeEngine*>(env);
614 bool success = workerEngine->InitTaskPoolFunc(env, func, task);
615 napi_value exception;
616 napi_get_and_clear_last_exception(env, &exception);
617 if (exception != nullptr) {
618 HILOG_ERROR("taskpool:: InitTaskPoolFunc occur exception");
619 task->success_ = false;
620 napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception);
621 NotifyTaskResult(env, task, errorEvent);
622 return false;
623 }
624 if (!success) {
625 HILOG_ERROR("taskpool:: InitTaskPoolFunc fail");
626 napi_value err = ErrorHelper::NewError(env, ErrorHelper::TYPE_ERROR,
627 "taskpool:: function may not be concurrent.");
628 task->success_ = false;
629 NotifyTaskResult(env, task, err);
630 return false;
631 }
632 return true;
633 }
634
UpdateExecutedInfo()635 void Worker::UpdateExecutedInfo()
636 {
637 // if the worker is blocked, just skip
638 if (LIKELY(state_ != WorkerState::BLOCKED)) {
639 uint64_t duration = ConcurrentHelper::GetMilliseconds() - startTime_;
640 TaskManager::GetInstance().UpdateExecutedInfo(duration);
641 }
642 }
643
644 // Only when the worker has no longTask can it be released.
TerminateTask(uint32_t taskId)645 void Worker::TerminateTask(uint32_t taskId)
646 {
647 HILOG_DEBUG("taskpool:: TerminateTask task:%{public}s", std::to_string(taskId).c_str());
648 std::lock_guard<std::mutex> lock(longMutex_);
649 longTasksSet_.erase(taskId);
650 if (longTasksSet_.empty()) {
651 hasLongTask_ = false;
652 }
653 }
654
655 // to store longTasks' state
UpdateLongTaskInfo(Task * task)656 void Worker::UpdateLongTaskInfo(Task* task)
657 {
658 HILOG_DEBUG("taskpool:: UpdateLongTaskInfo task:%{public}s", std::to_string(task->taskId_).c_str());
659 TaskManager::GetInstance().StoreLongTaskInfo(task->taskId_, this);
660 std::lock_guard<std::mutex> lock(longMutex_);
661 hasLongTask_ = true;
662 isExecutingLongTask_ = true;
663 longTasksSet_.emplace(task->taskId_);
664 }
665
IsExecutingLongTask()666 bool Worker::IsExecutingLongTask()
667 {
668 return isExecutingLongTask_;
669 }
670
HasLongTask()671 bool Worker::HasLongTask()
672 {
673 return hasLongTask_;
674 }
675
HandleFunctionResult(napi_env env,Task * task)676 void Worker::HandleFunctionResult(napi_env env, Task* task)
677 {
678 napi_value exception = nullptr;
679 napi_get_and_clear_last_exception(env, &exception);
680 if (exception != nullptr) {
681 HILOG_ERROR("taskpool::PerformTask occur exception");
682 reinterpret_cast<NativeEngine*>(env)->HandleTaskpoolException(exception);
683 task->DecreaseRefCount();
684 task->success_ = false;
685 napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception);
686 NotifyTaskResult(env, task, errorEvent);
687 return;
688 }
689 NotifyHandleTaskResult(task);
690 }
691
PostReleaseSignal()692 void Worker::PostReleaseSignal()
693 {
694 if (UNLIKELY(!ConcurrentHelper::IsUvActive(clearWorkerSignal_))) {
695 HILOG_ERROR("taskpool:: clearWorkerSignal_ is nullptr or closed");
696 return;
697 }
698 uv_async_send(clearWorkerSignal_);
699 }
700
IsRunnable(uint64_t currTime) const701 bool Worker::IsRunnable(uint64_t currTime) const
702 {
703 bool res = true;
704 if (currTime > wakeUpTime_) {
705 res = (currTime - wakeUpTime_ < 15); // 15: ms
706 }
707 return res;
708 }
709
UpdateWorkerWakeUpTime()710 void Worker::UpdateWorkerWakeUpTime()
711 {
712 wakeUpTime_ = ConcurrentHelper::GetMilliseconds();
713 }
714
GetPriority() const715 Priority Worker::GetPriority() const
716 {
717 return priority_;
718 }
719
EraseRunningTaskId(uint32_t taskId)720 void Worker::EraseRunningTaskId(uint32_t taskId)
721 {
722 std::lock_guard<std::mutex> lock(currentTaskIdMutex_);
723 auto iter = std::find(currentTaskId_.begin(), currentTaskId_.end(), taskId);
724 if (iter != currentTaskId_.end()) {
725 currentTaskId_.erase(iter);
726 }
727 }
728
729 #if defined(ENABLE_TASKPOOL_HISYSEVENT)
IsNeedReport(uint64_t intervalTime)730 bool Worker::IsNeedReport(uint64_t intervalTime)
731 {
732 if (reportCount_ >= MAX_REPORT_TIMES) {
733 return false;
734 }
735 if (intervalTime < static_cast<uint64_t>(reportCount_.load() + 1) * WORKER_ALIVE_TIME) {
736 return false;
737 }
738 return true;
739 }
740
IncreaseReportCount()741 void Worker::IncreaseReportCount()
742 {
743 reportCount_++;
744 }
745 #endif
746
ResetPerformIdleState()747 void Worker::ResetPerformIdleState()
748 {
749 if (priority_ == Priority::IDLE) {
750 TaskManager::GetInstance().SetIsPerformIdle(false);
751 }
752 }
753 } // namespace Commonlibrary::Concurrent::TaskPoolModule
754