1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "worker.h"
17 #include "helper/concurrent_helper.h"
18
19 #if defined(ENABLE_TASKPOOL_FFRT)
20 #include "c/executor_task.h"
21 #include "ffrt_inner.h"
22 #endif
23 #include "sys_timer.h"
24 #include "helper/hitrace_helper.h"
25 #include "process_helper.h"
26 #include "taskpool.h"
27 #include "task_group_manager.h"
28 #include "native_engine.h"
29
30 namespace Commonlibrary::Concurrent::TaskPoolModule {
31 using namespace OHOS::JsSysModule;
32 using namespace Commonlibrary::Platform;
33 static constexpr uint32_t TASKPOOL_TYPE = 2;
34 static constexpr uint32_t WORKER_ALIVE_TIME = 1800000; // 1800000: 30min
35 static constexpr int32_t MAX_REPORT_TIMES = 3;
36
PriorityScope(Worker * worker,Priority taskPriority)37 Worker::PriorityScope::PriorityScope(Worker* worker, Priority taskPriority) : worker_(worker)
38 {
39 if (taskPriority != worker->priority_) {
40 HILOG_DEBUG("taskpool:: reset worker priority to match task priority");
41 if (TaskManager::GetInstance().EnableFfrt()) {
42 #if defined(ENABLE_TASKPOOL_FFRT)
43 if (ffrt::this_task::update_qos(WORKERPRIORITY_FFRTQOS_MAP.at(taskPriority)) != 0) {
44 SetWorkerPriority(taskPriority);
45 }
46 #endif
47 } else {
48 SetWorkerPriority(taskPriority);
49 }
50 worker->priority_ = taskPriority;
51 }
52 }
53
~RunningScope()54 Worker::RunningScope::~RunningScope()
55 {
56 HILOG_DEBUG("taskpool:: RunningScope destruction");
57 if (scope_ != nullptr) {
58 napi_close_handle_scope(worker_->workerEnv_, scope_);
59 }
60 worker_->NotifyIdle();
61 worker_->idleState_ = true;
62 }
63
WorkerConstructor(napi_env env)64 Worker* Worker::WorkerConstructor(napi_env env)
65 {
66 HITRACE_HELPER_METER_NAME("TaskWorkerConstructor: [Add Thread]");
67 Worker* worker = new Worker(env);
68 worker->StartExecuteInThread();
69 return worker;
70 }
71
CloseHandles()72 void Worker::CloseHandles()
73 {
74 // set all handles to nullptr so that they can not be used even when the loop is re-running
75 ConcurrentHelper::UvHandleClose(performTaskSignal_);
76 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
77 ConcurrentHelper::UvHandleClose(debuggerOnPostTaskSignal_);
78 #endif
79 ConcurrentHelper::UvHandleClose(clearWorkerSignal_);
80 ConcurrentHelper::UvHandleClose(triggerGCCheckSignal_);
81 }
82
ReleaseWorkerHandles(const uv_async_t * req)83 void Worker::ReleaseWorkerHandles(const uv_async_t* req)
84 {
85 auto worker = static_cast<Worker*>(req->data);
86 HILOG_DEBUG("taskpool:: enter the worker loop and try to release thread: %{public}d", worker->tid_);
87 if (!worker->CheckFreeConditions()) {
88 return;
89 }
90
91 TaskManager::GetInstance().RemoveWorker(worker);
92 HITRACE_HELPER_METER_NAME("ReleaseWorkerHandles: [Release Thread]");
93 HILOG_INFO("taskpool:: the thread is idle and will be released, and the total num is %{public}u now",
94 TaskManager::GetInstance().GetThreadNum());
95 // when there is no active handle, worker loop will stop automatically.
96 worker->CloseHandles();
97
98 uv_loop_t* loop = worker->GetWorkerLoop();
99 if (loop != nullptr) {
100 uv_stop(loop);
101 }
102 }
103
CheckFreeConditions()104 bool Worker::CheckFreeConditions()
105 {
106 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
107 // only when all conditions are met can the worker be freed
108 if (HasRunningTasks()) {
109 HILOG_DEBUG("taskpool:: async callbacks may exist, the worker thread will not exit");
110 } else if (workerEngine->HasListeningCounter()) {
111 HILOG_DEBUG("taskpool:: listening operation exists, the worker thread will not exit");
112 } else if (Timer::HasTimer(workerEnv_)) {
113 HILOG_DEBUG("taskpool:: timer exists, the worker thread will not exit");
114 } else if (workerEngine->HasWaitingRequest()) {
115 HILOG_DEBUG("taskpool:: waiting request exists, the worker thread will not exit");
116 } else if (workerEngine->HasSubEnv()) {
117 HILOG_DEBUG("taskpool:: sub env exists, the worker thread will not exit");
118 } else if (workerEngine->HasPendingJob()) {
119 HILOG_DEBUG("taskpool:: pending job exists, the worker thread will not exit");
120 } else if (workerEngine->IsProfiling()) {
121 HILOG_DEBUG("taskpool:: the worker thread will not exit during profiling");
122 } else {
123 return true;
124 }
125 HILOG_DEBUG("taskpool:: the worker %{public}d can't be released due to not meeting the conditions", tid_);
126 TaskManager& taskManager = TaskManager::GetInstance();
127 taskManager.RestoreWorker(this);
128 taskManager.CountTraceForWorker();
129 return false;
130 }
131
StartExecuteInThread()132 void Worker::StartExecuteInThread()
133 {
134 if (!runner_) {
135 runner_ = std::make_unique<TaskRunner>(TaskStartCallback(ExecuteInThread, this));
136 }
137 if (runner_) {
138 runner_->Execute(); // start a new thread
139 } else {
140 HILOG_ERROR("taskpool:: runner_ is nullptr");
141 }
142 }
143
144 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
HandleDebuggerTask(const uv_async_t * req)145 void Worker::HandleDebuggerTask(const uv_async_t* req)
146 {
147 Worker* worker = reinterpret_cast<Worker*>(req->data);
148 if (worker == nullptr) {
149 HILOG_ERROR("taskpool:: worker is null");
150 return;
151 }
152 worker->debuggerMutex_.lock();
153 auto task = std::move(worker->debuggerQueue_.front());
154 worker->debuggerQueue_.pop();
155 worker->debuggerMutex_.unlock();
156 task();
157 }
158
DebuggerOnPostTask(std::function<void ()> && task)159 void Worker::DebuggerOnPostTask(std::function<void()>&& task)
160 {
161 if (ConcurrentHelper::IsUvActive(debuggerOnPostTaskSignal_)) {
162 std::lock_guard<std::mutex> lock(debuggerMutex_);
163 debuggerQueue_.push(std::move(task));
164 uv_async_send(debuggerOnPostTaskSignal_);
165 }
166 }
167 #endif
168
169 #if defined(ENABLE_TASKPOOL_FFRT)
InitFfrtInfo()170 void Worker::InitFfrtInfo()
171 {
172 if (TaskManager::GetInstance().EnableFfrt()) {
173 static const std::map<int, Priority> FFRTQOS_WORKERPRIORITY_MAP = {
174 {ffrt::qos_background, Priority::IDLE},
175 {ffrt::qos_utility, Priority::LOW},
176 {ffrt::qos_default, Priority::DEFAULT},
177 {ffrt::qos_user_initiated, Priority::HIGH},
178 };
179 ffrt_qos_t qos = ffrt_this_task_get_qos();
180 priority_ = FFRTQOS_WORKERPRIORITY_MAP.at(qos);
181 ffrtTaskHandle_ = ffrt_get_cur_task();
182 }
183 }
184
InitLoopHandleNum()185 void Worker::InitLoopHandleNum()
186 {
187 if (ffrtTaskHandle_ == nullptr) {
188 return;
189 }
190
191 uv_loop_t* loop = GetWorkerLoop();
192 if (loop != nullptr) {
193 initActiveHandleNum_ = loop->active_handles;
194 } else {
195 HILOG_ERROR("taskpool:: worker loop is nullptr when init loop handle num.");
196 }
197 }
198
IsLoopActive()199 bool Worker::IsLoopActive()
200 {
201 uv_loop_t* loop = GetWorkerLoop();
202 if (loop != nullptr) {
203 return uv_loop_alive_taskpool(loop, initActiveHandleNum_);
204 } else {
205 HILOG_ERROR("taskpool:: worker loop is nullptr when judge loop alive.");
206 return false;
207 }
208 }
209
GetWaitTime()210 uint64_t Worker::GetWaitTime()
211 {
212 return ffrt_epoll_get_wait_time(ffrtTaskHandle_);
213 }
214 #endif
215
ExecuteInThread(const void * data)216 void Worker::ExecuteInThread(const void* data)
217 {
218 HITRACE_HELPER_START_TRACE(__PRETTY_FUNCTION__);
219 auto worker = reinterpret_cast<Worker*>(const_cast<void*>(data));
220 {
221 napi_create_runtime(worker->hostEnv_, &worker->workerEnv_);
222 if (worker->workerEnv_ == nullptr) {
223 HILOG_ERROR("taskpool:: worker create runtime failed");
224 return;
225 }
226 auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
227 // mark worker env is taskpoolThread
228 workerEngine->MarkTaskPoolThread();
229 workerEngine->InitTaskPoolThread(worker->workerEnv_, Worker::TaskResultCallback);
230 }
231 uv_loop_t* loop = worker->GetWorkerLoop();
232 if (loop == nullptr) {
233 HILOG_ERROR("taskpool:: loop is nullptr");
234 return;
235 }
236 // save the worker tid
237 worker->tid_ = GetThreadId();
238
239 // Init worker task execute signal
240 ConcurrentHelper::UvHandleInit(loop, worker->performTaskSignal_, Worker::PerformTask, worker);
241 ConcurrentHelper::UvHandleInit(loop, worker->clearWorkerSignal_, Worker::ReleaseWorkerHandles, worker);
242 ConcurrentHelper::UvHandleInit(loop, worker->triggerGCCheckSignal_, Worker::TriggerGCCheck, worker);
243
244 HITRACE_HELPER_FINISH_TRACE;
245 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
246 // Init debugger task post signal
247 ConcurrentHelper::UvHandleInit(loop, worker->debuggerOnPostTaskSignal_, Worker::HandleDebuggerTask, worker);
248 #endif
249 if (worker->PrepareForWorkerInstance()) {
250 // Call after uv_async_init
251 worker->NotifyWorkerCreated();
252 #if defined(ENABLE_TASKPOOL_FFRT)
253 worker->InitFfrtInfo();
254 worker->InitLoopHandleNum();
255 #endif
256 worker->RunLoop();
257 } else {
258 HILOG_ERROR("taskpool:: Worker PrepareForWorkerInstance fail");
259 }
260 TaskManager::GetInstance().RemoveWorker(worker);
261 TaskManager::GetInstance().CountTraceForWorker();
262 worker->ReleaseWorkerThreadContent();
263 delete worker;
264 worker = nullptr;
265 }
266
PrepareForWorkerInstance()267 bool Worker::PrepareForWorkerInstance()
268 {
269 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
270 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
271 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
272 workerEngine->SetDebuggerPostTaskFunc([this](std::function<void()>&& task) {
273 this->DebuggerOnPostTask(std::move(task));
274 });
275 #endif
276 reinterpret_cast<NativeEngine*>(workerEnv_)->RegisterNapiUncaughtExceptionHandler(
277 [workerEngine] (napi_value exception) -> void {
278 if (!NativeEngine::IsAlive(workerEngine)) {
279 HILOG_WARN("napi_env has been destoryed!");
280 return;
281 }
282 std::string name = "";
283 void* data = workerEngine->GetCurrentTaskInfo();
284 if (data != nullptr) {
285 Task* task = static_cast<Task*>(data);
286 name = task->name_;
287 }
288 NapiErrorManager::GetInstance()->NotifyUncaughtException(reinterpret_cast<napi_env>(workerEngine),
289 exception, name, TASKPOOL_TYPE);
290 });
291 reinterpret_cast<NativeEngine*>(workerEnv_)->RegisterAllPromiseCallback(
292 [workerEngine] (napi_value* args) -> void {
293 if (!NativeEngine::IsAlive(workerEngine)) {
294 HILOG_WARN("napi_env has been destoryed!");
295 return;
296 }
297 std::string name = "";
298 void* data = workerEngine->GetCurrentTaskInfo();
299 if (data != nullptr) {
300 Task* task = static_cast<Task*>(data);
301 name = task->name_;
302 }
303 NapiErrorManager::GetInstance()->NotifyUnhandledRejection(reinterpret_cast<napi_env>(workerEngine),
304 args, name, TASKPOOL_TYPE);
305 });
306
307 if (!workerEngine->CallInitWorkerFunc(workerEngine)) {
308 HILOG_ERROR("taskpool:: Worker CallInitWorkerFunc fail");
309 return false;
310 }
311 // register timer interface
312 Timer::RegisterTime(workerEnv_);
313
314 // Check exception after worker construction
315 if (NapiHelper::IsExceptionPending(workerEnv_)) {
316 HILOG_ERROR("taskpool:: Worker construction occur exception");
317 return false;
318 }
319 return true;
320 }
321
ReleaseWorkerThreadContent()322 void Worker::ReleaseWorkerThreadContent()
323 {
324 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
325 auto hostEngine = reinterpret_cast<NativeEngine*>(hostEnv_);
326 if (workerEngine == nullptr) {
327 HILOG_ERROR("taskpool:: workerEngine is nullptr");
328 return;
329 }
330 if (hostEngine != nullptr) {
331 if (!hostEngine->DeleteWorker(workerEngine)) {
332 HILOG_ERROR("taskpool:: DeleteWorker fail");
333 }
334 }
335 if (state_ == WorkerState::BLOCKED) {
336 HITRACE_HELPER_METER_NAME("Thread Timeout Exit");
337 } else {
338 HITRACE_HELPER_METER_NAME("Thread Exit");
339 }
340
341 Timer::ClearEnvironmentTimer(workerEnv_);
342 // 2. delete NativeEngine created in worker thread
343 if (!workerEngine->CallOffWorkerFunc(workerEngine)) {
344 HILOG_ERROR("worker:: CallOffWorkerFunc error");
345 }
346 delete workerEngine;
347 workerEnv_ = nullptr;
348 }
349
NotifyExecuteTask()350 void Worker::NotifyExecuteTask()
351 {
352 if (LIKELY(performTaskSignal_ != nullptr && !uv_is_closing(reinterpret_cast<uv_handle_t*>(performTaskSignal_)))) {
353 int ret = uv_async_send(performTaskSignal_);
354 if (ret != 0) {
355 TaskManager::GetInstance().UvReportHisysEvent(this, "NotifyExecuteTask", "uv_async_send",
356 "uv send performTaskSignal_ failed", ret);
357 }
358 }
359 }
360
NotifyIdle()361 void Worker::NotifyIdle()
362 {
363 TaskManager::GetInstance().NotifyWorkerIdle(this);
364 }
365
NotifyWorkerCreated()366 void Worker::NotifyWorkerCreated()
367 {
368 TaskManager::GetInstance().NotifyWorkerCreated(this);
369 }
370
NotifyTaskBegin()371 void Worker::NotifyTaskBegin()
372 {
373 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
374 workerEngine->NotifyTaskBegin();
375 }
376
TriggerGCCheck(const uv_async_t * req)377 void Worker::TriggerGCCheck(const uv_async_t* req)
378 {
379 if (req == nullptr || req->data == nullptr) {
380 HILOG_ERROR("taskpool:: req handle is invalid");
381 return;
382 }
383 auto worker = reinterpret_cast<Worker*>(req->data);
384 auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
385 workerEngine->NotifyTaskFinished();
386 }
387
NotifyTaskFinished()388 void Worker::NotifyTaskFinished()
389 {
390 // trigger gc check by uv and return immediately if the handle is invalid
391 if (UNLIKELY(!ConcurrentHelper::IsUvActive(triggerGCCheckSignal_))) {
392 HILOG_ERROR("taskpool:: triggerGCCheckSignal_ is nullptr or closed");
393 return;
394 } else {
395 uv_async_send(triggerGCCheckSignal_);
396 }
397
398 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
399 if (--runningCount_ != 0 || workerEngine->HasPendingJob()) {
400 // the worker state is still RUNNING and the start time will be updated
401 startTime_ = ConcurrentHelper::GetMilliseconds();
402 } else {
403 UpdateWorkerState(WorkerState::RUNNING, WorkerState::IDLE);
404 }
405 idlePoint_ = ConcurrentHelper::GetMilliseconds();
406 }
407
UpdateWorkerState(WorkerState expect,WorkerState desired)408 bool Worker::UpdateWorkerState(WorkerState expect, WorkerState desired)
409 {
410 return state_.compare_exchange_strong(expect, desired);
411 }
412
PerformTask(const uv_async_t * req)413 void Worker::PerformTask(const uv_async_t* req)
414 {
415 uint64_t startTime = ConcurrentHelper::GetMilliseconds();
416 auto worker = static_cast<Worker*>(req->data);
417 worker->UpdateWorkerWakeUpTime();
418 napi_env env = worker->workerEnv_;
419 TaskManager::GetInstance().NotifyWorkerRunning(worker);
420 auto taskInfo = TaskManager::GetInstance().DequeueTaskId();
421 if (taskInfo.first == 0) {
422 worker->NotifyIdle();
423 return;
424 }
425 RunningScope runningScope(worker);
426 WorkerRunningScope workerRunningScope(env);
427 PriorityScope priorityScope(worker, taskInfo.second);
428 Task* task = TaskManager::GetInstance().GetTask(taskInfo.first);
429 if (task == nullptr) {
430 HILOG_DEBUG("taskpool:: task has been released");
431 return;
432 } else if (!task->IsValid() && task->ShouldDeleteTask(false)) {
433 HILOG_WARN("taskpool:: task is invalid");
434 delete task;
435 return;
436 }
437 // try to record the memory data for gc
438 worker->NotifyTaskBegin();
439
440 if (!task->UpdateTask(startTime, worker)) {
441 worker->NotifyTaskFinished();
442 return;
443 }
444 if (task->IsGroupTask() && (!TaskGroupManager::GetInstance().UpdateGroupState(task->groupId_))) {
445 return;
446 }
447 if (task->IsLongTask()) {
448 worker->UpdateLongTaskInfo(task);
449 }
450 worker->StoreTaskId(task->taskId_);
451 // tag for trace parse: Task Perform
452 auto loop = worker->GetWorkerLoop();
453 uint64_t loopAddress = reinterpret_cast<uint64_t>(loop);
454 std::string strTrace = "Task Perform: name : " + task->name_ + ", taskId : " + std::to_string(task->taskId_)
455 + ", priority : " + std::to_string(taskInfo.second);
456 std::string taskLog = "Task Perform: " + task->name_ + ", " + std::to_string(task->taskId_) + ", "
457 "runningLoop: " + std::to_string(loopAddress);
458 HITRACE_HELPER_METER_NAME(strTrace);
459 HILOG_TASK_INFO("taskpool:: %{public}s", taskLog.c_str());
460
461 napi_value func = nullptr;
462 napi_value args = nullptr;
463 napi_value errorInfo = task->DeserializeValue(env, &func, &args);
464 if (UNLIKELY(func == nullptr || args == nullptr)) {
465 if (errorInfo != nullptr) {
466 worker->NotifyTaskResult(env, task, errorInfo);
467 }
468 return;
469 }
470 if (!worker->InitTaskPoolFunc(env, func, task)) {
471 return;
472 }
473 worker->hasExecuted_ = true;
474 uint32_t argsNum = NapiHelper::GetArrayLength(env, args);
475 napi_value argsArray[argsNum];
476 for (size_t i = 0; i < argsNum; i++) {
477 argsArray[i] = NapiHelper::GetElement(env, args, i);
478 }
479
480 if (!task->CheckStartExecution(taskInfo.second)) {
481 if (task->ShouldDeleteTask()) {
482 delete task;
483 }
484 return;
485 }
486 napi_call_function(env, NapiHelper::GetGlobalObject(env), func, argsNum, argsArray, nullptr);
487 auto workerEngine = reinterpret_cast<NativeEngine*>(env);
488 workerEngine->ClearCurrentTaskInfo();
489 task->DecreaseRefCount();
490 task->StoreTaskDuration();
491 worker->UpdateExecutedInfo();
492 HandleFunctionException(env, task);
493 }
494
NotifyTaskResult(napi_env env,Task * task,napi_value result)495 void Worker::NotifyTaskResult(napi_env env, Task* task, napi_value result)
496 {
497 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
498 HILOG_DEBUG("taskpool:: NotifyTaskResult task:%{public}s", std::to_string(task->taskId_).c_str());
499 void* resultData = nullptr;
500 napi_value undefined = NapiHelper::GetUndefinedValue(env);
501 bool defaultTransfer = true;
502 bool defaultCloneSendable = false;
503 napi_status status = napi_serialize_inner(env, result, undefined, undefined,
504 defaultTransfer, defaultCloneSendable, &resultData);
505 if ((status != napi_ok || resultData == nullptr) && task->success_) {
506 task->success_ = false;
507 std::string errMessage = "taskpool: failed to serialize result.";
508 HILOG_ERROR("%{public}s", errMessage.c_str());
509 napi_value err = ErrorHelper::NewError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
510 NotifyTaskResult(env, task, err);
511 return;
512 }
513 task->result_ = resultData;
514 NotifyHandleTaskResult(task);
515 }
516
NotifyHandleTaskResult(Task * task)517 void Worker::NotifyHandleTaskResult(Task* task)
518 {
519 if (!task->IsReadyToHandle()) {
520 return;
521 }
522 Worker* worker = reinterpret_cast<Worker*>(task->worker_);
523 if (worker != nullptr) {
524 std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_);
525 auto iter = std::find(worker->currentTaskId_.begin(), worker->currentTaskId_.end(), task->taskId_);
526 if (iter != worker->currentTaskId_.end()) {
527 worker->currentTaskId_.erase(iter);
528 }
529 } else {
530 TaskManager::GetInstance().UvReportHisysEvent(nullptr, "NotifyHandleTaskResult", "", "worker is nullptr", -1);
531 HILOG_FATAL("taskpool:: worker is nullptr");
532 return;
533 }
534 if (!task->VerifyAndPostResult(worker->priority_)) {
535 if (task->ShouldDeleteTask()) {
536 delete task;
537 }
538 }
539 worker->NotifyTaskFinished();
540 }
541
TaskResultCallback(napi_env env,napi_value result,bool success,void * data)542 void Worker::TaskResultCallback(napi_env env, napi_value result, bool success, void* data)
543 {
544 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
545 if (env == nullptr) { // LCOV_EXCL_BR_LINE
546 std::string error = "TaskResultCallback engine is null";
547 TaskManager::GetInstance().UvReportHisysEvent(nullptr, "TaskResultCallback", "", error, -1);
548 HILOG_FATAL("taskpool:: %{public}s", error.c_str());
549 return;
550 }
551 if (data == nullptr) { // LCOV_EXCL_BR_LINE
552 std::string error = "data is nullptr";
553 TaskManager::GetInstance().UvReportHisysEvent(nullptr, "TaskResultCallback", "", error, -1);
554 HILOG_FATAL("taskpool:: %{public}s", error.c_str());
555 return;
556 }
557 Task* task = static_cast<Task*>(data);
558 if (TaskManager::GetInstance().GetTask(task->taskId_) == nullptr) {
559 std::string error = "task is nullptr, taskId: " + std::to_string(task->taskId_);
560 TaskManager::GetInstance().UvReportHisysEvent(nullptr, "TaskResultCallback", "", error, -1);
561 HILOG_FATAL("taskpool:: task is nullptr");
562 return;
563 }
564 auto worker = static_cast<Worker*>(task->worker_);
565 worker->isExecutingLongTask_ = task->IsLongTask();
566 task->DecreaseRefCount();
567 task->ioTime_ = ConcurrentHelper::GetMilliseconds();
568 if (task->cpuTime_ != 0) {
569 uint64_t ioDuration = task->ioTime_ - task->startTime_;
570 uint64_t cpuDuration = task->cpuTime_ - task->startTime_;
571 TaskManager::GetInstance().StoreTaskDuration(task->taskId_, std::max(ioDuration, cpuDuration), cpuDuration);
572 }
573 napi_value exception = nullptr;
574 napi_get_and_clear_last_exception(env, &exception);
575 if (exception != nullptr) {
576 HILOG_ERROR("taskpool::TaskResultCallback occur exception");
577 reinterpret_cast<NativeEngine*>(env)->HandleTaskpoolException(exception);
578 task->success_ = false;
579 napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception);
580 NotifyTaskResult(env, task, errorEvent);
581 return;
582 }
583
584 task->success_ = success;
585 NotifyTaskResult(env, task, result);
586 }
587
588 // reset qos_user_initiated after perform task
ResetWorkerPriority()589 void Worker::ResetWorkerPriority()
590 {
591 if (priority_ != Priority::HIGH) {
592 if (TaskManager::GetInstance().EnableFfrt()) {
593 #if defined(ENABLE_TASKPOOL_FFRT)
594 if (ffrt::this_task::update_qos(WORKERPRIORITY_FFRTQOS_MAP.at(Priority::HIGH)) != 0) {
595 SetWorkerPriority(Priority::HIGH);
596 }
597 #endif
598 } else {
599 SetWorkerPriority(Priority::HIGH);
600 }
601 priority_ = Priority::HIGH;
602 }
603 }
604
StoreTaskId(uint32_t taskId)605 void Worker::StoreTaskId(uint32_t taskId)
606 {
607 std::lock_guard<std::mutex> lock(currentTaskIdMutex_);
608 currentTaskId_.emplace_back(taskId);
609 }
610
InitTaskPoolFunc(napi_env env,napi_value func,Task * task)611 bool Worker::InitTaskPoolFunc(napi_env env, napi_value func, Task* task)
612 {
613 auto workerEngine = reinterpret_cast<NativeEngine*>(env);
614 bool success = workerEngine->InitTaskPoolFunc(env, func, task);
615 napi_value exception;
616 napi_get_and_clear_last_exception(env, &exception);
617 if (exception != nullptr) {
618 HILOG_ERROR("taskpool:: InitTaskPoolFunc occur exception");
619 task->success_ = false;
620 napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception);
621 NotifyTaskResult(env, task, errorEvent);
622 return false;
623 }
624 if (!success) {
625 HILOG_ERROR("taskpool:: InitTaskPoolFunc fail");
626 napi_value err = ErrorHelper::NewError(env, ErrorHelper::TYPE_ERROR,
627 "taskpool:: function may not be concurrent.");
628 task->success_ = false;
629 NotifyTaskResult(env, task, err);
630 return false;
631 }
632 return true;
633 }
634
UpdateExecutedInfo()635 void Worker::UpdateExecutedInfo()
636 {
637 // if the worker is blocked, just skip
638 if (LIKELY(state_ != WorkerState::BLOCKED)) {
639 uint64_t duration = ConcurrentHelper::GetMilliseconds() - startTime_;
640 TaskManager::GetInstance().UpdateExecutedInfo(duration);
641 }
642 }
643
644 // Only when the worker has no longTask can it be released.
TerminateTask(uint32_t taskId)645 void Worker::TerminateTask(uint32_t taskId)
646 {
647 HILOG_DEBUG("taskpool:: TerminateTask task:%{public}s", std::to_string(taskId).c_str());
648 std::lock_guard<std::mutex> lock(longMutex_);
649 longTasksSet_.erase(taskId);
650 if (longTasksSet_.empty()) {
651 hasLongTask_ = false;
652 }
653 }
654
655 // to store longTasks' state
UpdateLongTaskInfo(Task * task)656 void Worker::UpdateLongTaskInfo(Task* task)
657 {
658 HILOG_DEBUG("taskpool:: UpdateLongTaskInfo task:%{public}s", std::to_string(task->taskId_).c_str());
659 TaskManager::GetInstance().StoreLongTaskInfo(task->taskId_, this);
660 std::lock_guard<std::mutex> lock(longMutex_);
661 hasLongTask_ = true;
662 isExecutingLongTask_ = true;
663 longTasksSet_.emplace(task->taskId_);
664 }
665
IsExecutingLongTask()666 bool Worker::IsExecutingLongTask()
667 {
668 return isExecutingLongTask_;
669 }
670
HasLongTask()671 bool Worker::HasLongTask()
672 {
673 return hasLongTask_;
674 }
675
HandleFunctionException(napi_env env,Task * task)676 void Worker::HandleFunctionException(napi_env env, Task* task)
677 {
678 napi_value exception = nullptr;
679 napi_get_and_clear_last_exception(env, &exception);
680 if (exception != nullptr) {
681 HILOG_ERROR("taskpool::PerformTask occur exception");
682 reinterpret_cast<NativeEngine*>(env)->HandleTaskpoolException(exception);
683 task->DecreaseRefCount();
684 task->success_ = false;
685 napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception);
686 NotifyTaskResult(env, task, errorEvent);
687 return;
688 }
689 NotifyHandleTaskResult(task);
690 }
691
PostReleaseSignal()692 void Worker::PostReleaseSignal()
693 {
694 if (UNLIKELY(!ConcurrentHelper::IsUvActive(clearWorkerSignal_))) {
695 HILOG_ERROR("taskpool:: clearWorkerSignal_ is nullptr or closed");
696 return;
697 }
698 uv_async_send(clearWorkerSignal_);
699 }
700
IsRunnable(uint64_t currTime) const701 bool Worker::IsRunnable(uint64_t currTime) const
702 {
703 bool res = true;
704 if (currTime > wakeUpTime_) {
705 res = (currTime - wakeUpTime_ < 15); // 15: ms
706 }
707 return res;
708 }
709
UpdateWorkerWakeUpTime()710 void Worker::UpdateWorkerWakeUpTime()
711 {
712 wakeUpTime_ = ConcurrentHelper::GetMilliseconds();
713 }
714
715 #if defined(ENABLE_TASKPOOL_HISYSEVENT)
IsNeedReport(uint64_t intervalTime)716 bool Worker::IsNeedReport(uint64_t intervalTime)
717 {
718 if (reportCount_ >= MAX_REPORT_TIMES) {
719 return false;
720 }
721 if (intervalTime < (reportCount_ + 1) * WORKER_ALIVE_TIME) {
722 return false;
723 }
724 return true;
725 }
726
IncreaseReportCount()727 void Worker::IncreaseReportCount()
728 {
729 reportCount_++;
730 }
731 #endif
732 } // namespace Commonlibrary::Concurrent::TaskPoolModule
733