• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "worker.h"
17 
18 #include "commonlibrary/ets_utils/js_sys_module/timer/timer.h"
19 #include "helper/hitrace_helper.h"
20 #include "process_helper.h"
21 #include "task_group.h"
22 #include "task_manager.h"
23 #include "utils/log.h"
24 
25 namespace Commonlibrary::Concurrent::TaskPoolModule {
26 using namespace OHOS::JsSysModule;
27 using namespace Commonlibrary::Platform;
28 
~RunningScope()29 Worker::RunningScope::~RunningScope()
30 {
31     if (scope_ != nullptr) {
32         napi_close_handle_scope(worker_->workerEnv_, scope_);
33     }
34     worker_->NotifyIdle();
35     worker_->idleState_ = true;
36 }
37 
WorkerConstructor(napi_env env)38 Worker* Worker::WorkerConstructor(napi_env env)
39 {
40     HITRACE_HELPER_METER_NAME("TaskWorkerConstructor: [Add Thread]");
41     Worker* worker = new Worker(env);
42     worker->StartExecuteInThread();
43     return worker;
44 }
45 
ReleaseWorkerHandles(const uv_async_t * req)46 void Worker::ReleaseWorkerHandles(const uv_async_t* req)
47 {
48     auto worker = static_cast<Worker*>(req->data);
49     HILOG_DEBUG("taskpool:: enter the worker loop and try to release thread: %{public}d", worker->tid_);
50     if (!worker->CheckFreeConditions()) {
51         return;
52     }
53 
54     TaskManager::GetInstance().RemoveWorker(worker);
55     HITRACE_HELPER_METER_NAME("ReleaseWorkerHandles: [Release Thread]");
56     HILOG_INFO("taskpool:: the thread is idle and will be released, and the total num is %{public}u now",
57         TaskManager::GetInstance().GetThreadNum());
58     // when there is no active handle, worker loop will stop automatically.
59     ConcurrentHelper::UvHandleClose(worker->performTaskSignal_);
60 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
61     ConcurrentHelper::UvHandleClose(worker->debuggerOnPostTaskSignal_);
62 #endif
63     ConcurrentHelper::UvHandleClose(worker->clearWorkerSignal_);
64 
65     uv_loop_t* loop = worker->GetWorkerLoop();
66     if (loop != nullptr) {
67         uv_stop(loop);
68     }
69 }
70 
CheckFreeConditions()71 bool Worker::CheckFreeConditions()
72 {
73     auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
74     // only when all conditions are met can the worker be freed
75     if (workerEngine->HasListeningCounter()) {
76         HILOG_DEBUG("taskpool:: listening operation exits, the worker thread will not exit");
77     } else if (Timer::HasTimer(workerEnv_)) {
78         HILOG_DEBUG("taskpool:: timer exits, the worker thread will not exit");
79     } else if (workerEngine->HasWaitingRequest()) {
80         HILOG_DEBUG("taskpool:: waiting request exits, the worker thread will not exit");
81     } else if (workerEngine->HasSubEnv()) {
82         HILOG_DEBUG("taskpool:: sub env exits, the worker thread will not exit");
83     } else if (workerEngine->HasPendingJob()) {
84         HILOG_DEBUG("taskpool:: pending job exits, the worker thread will not exit");
85     } else if (workerEngine->IsProfiling()) {
86         HILOG_DEBUG("taskpool:: the worker thread will not exit during profiling");
87     } else {
88         return true;
89     }
90     HILOG_DEBUG("taskpool:: the worker %{public}d can't be released due to not meeting the conditions", tid_);
91     TaskManager& taskManager = TaskManager::GetInstance();
92     taskManager.RestoreWorker(this);
93     taskManager.CountTraceForWorker();
94     return false;
95 }
96 
StartExecuteInThread()97 void Worker::StartExecuteInThread()
98 {
99     if (!runner_) {
100         runner_ = std::make_unique<TaskRunner>(TaskStartCallback(ExecuteInThread, this));
101     }
102     if (runner_) {
103         runner_->Execute(); // start a new thread
104     } else {
105         HILOG_ERROR("taskpool:: runner_ is nullptr");
106     }
107 }
108 
109 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
HandleDebuggerTask(const uv_async_t * req)110 void Worker::HandleDebuggerTask(const uv_async_t* req)
111 {
112     Worker* worker = reinterpret_cast<Worker*>(req->data);
113     if (worker == nullptr) {
114         HILOG_ERROR("taskpool:: worker is null");
115         return;
116     }
117     worker->debuggerMutex_.lock();
118     auto task = std::move(worker->debuggerQueue_.front());
119     worker->debuggerQueue_.pop();
120     worker->debuggerMutex_.unlock();
121     task();
122 }
123 
DebuggerOnPostTask(std::function<void ()> && task)124 void Worker::DebuggerOnPostTask(std::function<void()>&& task)
125 {
126     if (uv_is_active(reinterpret_cast<uv_handle_t*>(debuggerOnPostTaskSignal_))) {
127         std::lock_guard<std::mutex> lock(debuggerMutex_);
128         debuggerQueue_.push(std::move(task));
129         uv_async_send(debuggerOnPostTaskSignal_);
130     }
131 }
132 #endif
133 
ExecuteInThread(const void * data)134 void Worker::ExecuteInThread(const void* data)
135 {
136     HITRACE_HELPER_START_TRACE(__PRETTY_FUNCTION__);
137     auto worker = reinterpret_cast<Worker*>(const_cast<void*>(data));
138     {
139         napi_create_runtime(worker->hostEnv_, &worker->workerEnv_);
140         if (worker->workerEnv_ == nullptr) {
141             HILOG_ERROR("taskpool:: workerEnv is nullptr");
142             return;
143         }
144         auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
145         // mark worker env is taskpoolThread
146         workerEngine->MarkTaskPoolThread();
147         workerEngine->InitTaskPoolThread(worker->workerEnv_, Worker::TaskResultCallback);
148     }
149     uv_loop_t* loop = worker->GetWorkerLoop();
150     if (loop == nullptr) {
151         HILOG_ERROR("taskpool:: loop is nullptr");
152         return;
153     }
154     // save the worker tid
155     worker->tid_ = GetThreadId();
156 
157     // Init worker task execute signal
158     worker->performTaskSignal_ = new uv_async_t;
159     worker->performTaskSignal_->data = worker;
160     uv_async_init(loop, worker->performTaskSignal_, reinterpret_cast<uv_async_cb>(Worker::PerformTask));
161 
162     worker->clearWorkerSignal_ = new uv_async_t;
163     worker->clearWorkerSignal_->data = worker;
164     uv_async_init(loop, worker->clearWorkerSignal_, reinterpret_cast<uv_async_cb>(Worker::ReleaseWorkerHandles));
165 
166     HITRACE_HELPER_FINISH_TRACE;
167 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
168     // Init debugger task post signal
169     worker->debuggerOnPostTaskSignal_ = new uv_async_t;
170     worker->debuggerOnPostTaskSignal_->data = worker;
171     uv_async_init(loop, worker->debuggerOnPostTaskSignal_, reinterpret_cast<uv_async_cb>(Worker::HandleDebuggerTask));
172 #endif
173     if (worker->PrepareForWorkerInstance()) {
174         // Call after uv_async_init
175         worker->NotifyWorkerCreated();
176         worker->RunLoop();
177     } else {
178         HILOG_ERROR("taskpool:: Worker PrepareForWorkerInstance fail");
179     }
180     TaskManager::GetInstance().RemoveWorker(worker);
181     TaskManager::GetInstance().CountTraceForWorker();
182     worker->ReleaseWorkerThreadContent();
183     delete worker;
184     worker = nullptr;
185 }
186 
PrepareForWorkerInstance()187 bool Worker::PrepareForWorkerInstance()
188 {
189     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
190     auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
191 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
192     workerEngine->SetDebuggerPostTaskFunc(
193         std::bind(&Worker::DebuggerOnPostTask, this, std::placeholders::_1));
194 #endif
195     if (!workerEngine->CallInitWorkerFunc(workerEngine)) {
196         HILOG_ERROR("taskpool:: Worker CallInitWorkerFunc fail");
197         return false;
198     }
199     // register timer interface
200     Timer::RegisterTime(workerEnv_);
201 
202     // Check exception after worker construction
203     if (NapiHelper::IsExceptionPending(workerEnv_)) {
204         HILOG_ERROR("taskpool:: Worker construction occur exception");
205         return false;
206     }
207     return true;
208 }
209 
ReleaseWorkerThreadContent()210 void Worker::ReleaseWorkerThreadContent()
211 {
212     auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
213     auto hostEngine = reinterpret_cast<NativeEngine*>(hostEnv_);
214     if (workerEngine == nullptr) {
215         HILOG_ERROR("taskpool:: workerEngine is nullptr");
216         return;
217     }
218     if (hostEngine != nullptr) {
219         if (!hostEngine->DeleteWorker(workerEngine)) {
220             HILOG_ERROR("taskpool:: DeleteWorker fail");
221         }
222     }
223     if (state_ == WorkerState::BLOCKED) {
224         HITRACE_HELPER_METER_NAME("Thread Timeout Exit");
225     } else {
226         HITRACE_HELPER_METER_NAME("Thread Exit");
227     }
228 
229     Timer::ClearEnvironmentTimer(workerEnv_);
230     // 2. delete NativeEngine created in worker thread
231     if (!workerEngine->CallOffWorkerFunc(workerEngine)) {
232         HILOG_ERROR("worker:: CallOffWorkerFunc error");
233     }
234     delete workerEngine;
235     workerEnv_ = nullptr;
236 }
237 
NotifyExecuteTask()238 void Worker::NotifyExecuteTask()
239 {
240     if (LIKELY(uv_is_active(reinterpret_cast<uv_handle_t*>(performTaskSignal_)))) {
241         uv_async_send(performTaskSignal_);
242     }
243 }
244 
NotifyIdle()245 void Worker::NotifyIdle()
246 {
247     TaskManager::GetInstance().NotifyWorkerIdle(this);
248 }
249 
NotifyWorkerCreated()250 void Worker::NotifyWorkerCreated()
251 {
252     TaskManager::GetInstance().NotifyWorkerCreated(this);
253 }
254 
NotifyTaskFinished()255 void Worker::NotifyTaskFinished()
256 {
257     auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
258     if (--runningCount_ != 0 || workerEngine->HasPendingJob()) {
259         // the worker state is still RUNNING and the start time will be updated
260         startTime_ = ConcurrentHelper::GetMilliseconds();
261     } else {
262         UpdateWorkerState(WorkerState::RUNNING, WorkerState::IDLE);
263     }
264     idlePoint_ = ConcurrentHelper::GetMilliseconds();
265 }
266 
UpdateWorkerState(WorkerState expect,WorkerState desired)267 bool Worker::UpdateWorkerState(WorkerState expect, WorkerState desired)
268 {
269     return state_.compare_exchange_strong(expect, desired);
270 }
271 
PerformTask(const uv_async_t * req)272 void Worker::PerformTask(const uv_async_t* req)
273 {
274     uint64_t startTime = ConcurrentHelper::GetMilliseconds();
275     auto worker = static_cast<Worker*>(req->data);
276     napi_env env = worker->workerEnv_;
277     TaskManager::GetInstance().NotifyWorkerRunning(worker);
278     auto taskInfo = TaskManager::GetInstance().DequeueTaskId();
279     if (taskInfo.first == 0) {
280         worker->NotifyIdle();
281         return;
282     }
283     RunningScope runningScope(worker);
284     PriorityScope priorityScope(worker, taskInfo.second);
285     Task* task = TaskManager::GetInstance().GetTask(taskInfo.first);
286     if (task == nullptr) {
287         HILOG_ERROR("taskpool:: task is null");
288         return;
289     }
290     if (!task->UpdateTask(startTime, worker)) {
291         return;
292     }
293     if (task->IsGroupTask()) {
294         TaskGroupManager::GetInstance().UpdateGroupState(task->groupId_);
295     }
296     worker->StoreTaskId(task->taskId_);
297     // tag for trace parse: Task Perform
298     std::string strTrace = "Task Perform: name : "  + task->name_ + ", taskId : " + std::to_string(task->taskId_);
299     HITRACE_HELPER_METER_NAME(strTrace);
300     napi_value func = task->DeserializeValue(env, true, false);
301     if (func == nullptr) {
302         return;
303     }
304     napi_value args = task->DeserializeValue(env, false, true);
305     if (args == nullptr) {
306         return;
307     }
308     if (!worker->InitTaskPoolFunc(env, func, task)) {
309         return;
310     }
311     uint32_t argsNum = NapiHelper::GetArrayLength(env, args);
312     napi_value argsArray[argsNum];
313     for (size_t i = 0; i < argsNum; i++) {
314         argsArray[i] = NapiHelper::GetElement(env, args, i);
315     }
316     napi_call_function(env, NapiHelper::GetGlobalObject(env), func, argsNum, argsArray, nullptr);
317     task->DecreaseRefCount();
318     task->StoreTaskDuration();
319     worker->UpdateExecutedInfo();
320     HandleFunctionException(env, task);
321 }
322 
NotifyTaskResult(napi_env env,Task * task,napi_value result)323 void Worker::NotifyTaskResult(napi_env env, Task* task, napi_value result)
324 {
325     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
326     napi_value resultData;
327     napi_value undefined = NapiHelper::GetUndefinedValue(env);
328     bool defaultTransfer = true;
329     bool defaultCloneSendable = true;
330     napi_status status = napi_serialize(env, result, undefined, undefined,
331                                         defaultTransfer, defaultCloneSendable, &resultData);
332     if ((status != napi_ok || resultData == nullptr) && task->success_) {
333         task->success_ = false;
334         std::string errMessage = "taskpool: failed to serialize result.";
335         HILOG_ERROR("%{public}s", errMessage.c_str());
336         napi_value err = ErrorHelper::NewError(env, ErrorHelper::ERR_WORKER_SERIALIZATION, errMessage.c_str());
337         NotifyTaskResult(env, task, err);
338         return;
339     }
340     task->result_ = resultData;
341     NotifyHandleTaskResult(task);
342 }
343 
NotifyHandleTaskResult(Task * task)344 void Worker::NotifyHandleTaskResult(Task* task)
345 {
346     if (!task->IsReadyToHandle()) {
347         return;
348     }
349     Worker* worker = reinterpret_cast<Worker*>(task->worker_);
350     if (worker != nullptr) {
351         std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_);
352         auto iter = std::find(worker->currentTaskId_.begin(), worker->currentTaskId_.end(), task->taskId_);
353         if (iter != worker->currentTaskId_.end()) {
354             worker->currentTaskId_.erase(iter);
355         }
356     }
357     uv_async_send(task->onResultSignal_);
358     worker->NotifyTaskFinished();
359 }
360 
TaskResultCallback(napi_env env,napi_value result,bool success,void * data)361 void Worker::TaskResultCallback(napi_env env, napi_value result, bool success, void* data)
362 {
363     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
364     if (env == nullptr) {
365         HILOG_FATAL("taskpool:: TaskResultCallback engine is null");
366         return;
367     }
368     if (data == nullptr) {
369         HILOG_FATAL("taskpool:: task is nullptr");
370         return;
371     }
372     Task* task = static_cast<Task*>(data);
373     task->DecreaseRefCount();
374     task->ioTime_ = ConcurrentHelper::GetMilliseconds();
375     if (task->cpuTime_ != 0) {
376         uint64_t ioDuration = task->ioTime_ - task->startTime_;
377         uint64_t cpuDuration = task->cpuTime_ - task->startTime_;
378         TaskManager::GetInstance().StoreTaskDuration(task->taskId_, std::max(ioDuration, cpuDuration), cpuDuration);
379     }
380     task->success_ = success;
381     NotifyTaskResult(env, task, result);
382 }
383 
384 // reset qos_user_initiated after perform task
ResetWorkerPriority()385 void Worker::ResetWorkerPriority()
386 {
387     if (priority_ != Priority::HIGH) {
388         SetWorkerPriority(Priority::HIGH);
389         priority_ = Priority::HIGH;
390     }
391 }
392 
Enqueue(TaskResultInfo * resultInfo)393 void Worker::Enqueue(TaskResultInfo* resultInfo)
394 {
395     hostMessageQueue_.EnQueue(resultInfo);
396 }
397 
Dequeue()398 TaskResultInfo* Worker::Dequeue()
399 {
400     return hostMessageQueue_.DeQueue();
401 }
402 
IsQueueEmpty()403 bool Worker::IsQueueEmpty()
404 {
405     return hostMessageQueue_.IsEmpty();
406 }
407 
StoreTaskId(uint64_t taskId)408 void Worker::StoreTaskId(uint64_t taskId)
409 {
410     std::lock_guard<std::mutex> lock(currentTaskIdMutex_);
411     currentTaskId_.emplace_back(taskId);
412 }
413 
InitTaskPoolFunc(napi_env env,napi_value func,Task * task)414 bool Worker::InitTaskPoolFunc(napi_env env, napi_value func, Task* task)
415 {
416     auto workerEngine = reinterpret_cast<NativeEngine*>(env);
417     bool success = workerEngine->InitTaskPoolFunc(env, func, task);
418     napi_value exception;
419     napi_get_and_clear_last_exception(env, &exception);
420     if (exception != nullptr) {
421         HILOG_ERROR("taskpool:: InitTaskPoolFunc occur exception");
422         task->success_ = false;
423         napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception);
424         NotifyTaskResult(env, task, errorEvent);
425         return false;
426     }
427     if (!success) {
428         HILOG_ERROR("taskpool:: InitTaskPoolFunc fail");
429         napi_value err = ErrorHelper::NewError(env, ErrorHelper::TYPE_ERROR,
430                                                "taskpool:: function may not be concurrent.");
431         task->success_ = false;
432         NotifyTaskResult(env, task, err);
433         return false;
434     }
435     return true;
436 }
437 
UpdateExecutedInfo()438 void Worker::UpdateExecutedInfo()
439 {
440     // if the worker is blocked, just skip
441     if (LIKELY(state_ != WorkerState::BLOCKED)) {
442         uint64_t duration = ConcurrentHelper::GetMilliseconds() - startTime_;
443         TaskManager::GetInstance().UpdateExecutedInfo(duration);
444     }
445 }
446 
HandleFunctionException(napi_env env,Task * task)447 void Worker::HandleFunctionException(napi_env env, Task* task)
448 {
449     napi_value exception;
450     napi_get_and_clear_last_exception(env, &exception);
451     if (exception != nullptr) {
452         HILOG_ERROR("taskpool::PerformTask occur exception");
453         task->success_ = false;
454         napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception);
455         NotifyTaskResult(env, task, errorEvent);
456         return;
457     }
458     NotifyHandleTaskResult(task);
459 }
460 } // namespace Commonlibrary::Concurrent::TaskPoolModule
461