• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "worker.h"
17 
18 #include "commonlibrary/ets_utils/js_sys_module/timer/timer.h"
19 #include "helper/hitrace_helper.h"
20 #include "process_helper.h"
21 #include "task_group.h"
22 #include "task_manager.h"
23 #include "utils/log.h"
24 
25 namespace Commonlibrary::Concurrent::TaskPoolModule {
26 using namespace OHOS::JsSysModule;
27 using namespace Commonlibrary::Platform;
28 
~RunningScope()29 Worker::RunningScope::~RunningScope()
30 {
31     if (scope_ != nullptr) {
32         napi_close_handle_scope(worker_->workerEnv_, scope_);
33     }
34     // only when worker is not blocked can it be inserted
35     {
36         std::lock_guard<std::mutex> lock(worker_->stateMutex_);
37         if (UNLIKELY(worker_->state_ == WorkerState::BLOCKED)) {
38             return;
39         }
40     }
41     worker_->NotifyIdle();
42 }
43 
WorkerConstructor(napi_env env)44 Worker* Worker::WorkerConstructor(napi_env env)
45 {
46     HITRACE_HELPER_METER_NAME("WorkerConstructor: [Add Thread]");
47     Worker* worker = new Worker(env);
48     worker->StartExecuteInThread();
49     return worker;
50 }
51 
ReleaseWorkerHandles(const uv_async_t * req)52 void Worker::ReleaseWorkerHandles(const uv_async_t* req)
53 {
54     HITRACE_HELPER_METER_NAME("ReleaseWorkerHandles: [Release Thread]");
55     auto worker = static_cast<Worker*>(req->data);
56     // when there is no active handle, worker loop will stop automatically.
57     uv_close(reinterpret_cast<uv_handle_t*>(worker->performTaskSignal_), [](uv_handle_t* handle) {
58         if (handle != nullptr) {
59             delete reinterpret_cast<uv_async_t*>(handle);
60             handle = nullptr;
61         }
62     });
63 
64 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
65     uv_close(reinterpret_cast<uv_handle_t*>(worker->debuggerOnPostTaskSignal_), [](uv_handle_t* handle) {
66         if (handle != nullptr) {
67             delete reinterpret_cast<uv_async_t*>(handle);
68             handle = nullptr;
69         }
70     });
71 #endif
72 
73     uv_close(reinterpret_cast<uv_handle_t*>(worker->clearWorkerSignal_), [](uv_handle_t* handle) {
74         if (handle != nullptr) {
75             delete reinterpret_cast<uv_async_t*>(handle);
76             handle = nullptr;
77         }
78     });
79 
80     uv_loop_t* loop = worker->GetWorkerLoop();
81     if (loop != nullptr) {
82         uv_stop(loop);
83     }
84 }
85 
StartExecuteInThread()86 void Worker::StartExecuteInThread()
87 {
88     if (!runner_) {
89         runner_ = std::make_unique<TaskRunner>(TaskStartCallback(ExecuteInThread, this));
90     }
91     if (runner_) {
92         runner_->Execute(); // start a new thread
93     } else {
94         HILOG_ERROR("taskpool:: runner_ is nullptr");
95     }
96 }
97 
98 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
HandleDebuggerTask(const uv_async_t * req)99 void Worker::HandleDebuggerTask(const uv_async_t* req)
100 {
101     Worker* worker = reinterpret_cast<Worker*>(req->data);
102     if (worker == nullptr) {
103         HILOG_ERROR("taskpool:: worker is null");
104         return;
105     }
106     worker->debuggerTask_();
107 }
108 
DebuggerOnPostTask(std::function<void ()> && task)109 void Worker::DebuggerOnPostTask(std::function<void()>&& task)
110 {
111     if (uv_is_active(reinterpret_cast<uv_handle_t*>(debuggerOnPostTaskSignal_))) {
112         debuggerTask_ = std::move(task);
113         uv_async_send(debuggerOnPostTaskSignal_);
114     }
115 }
116 #endif
117 
ExecuteInThread(const void * data)118 void Worker::ExecuteInThread(const void* data)
119 {
120     HITRACE_HELPER_START_TRACE(__PRETTY_FUNCTION__);
121     auto worker = reinterpret_cast<Worker*>(const_cast<void*>(data));
122     {
123         napi_create_runtime(worker->hostEnv_, &worker->workerEnv_);
124         if (worker->workerEnv_ == nullptr) {
125             HILOG_ERROR("taskpool:: workerEnv is nullptr");
126             return;
127         }
128         auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
129         // mark worker env is taskpoolThread
130         workerEngine->MarkTaskPoolThread();
131         workerEngine->InitTaskPoolThread(workerEngine, Worker::TaskResultCallback);
132     }
133     uv_loop_t* loop = worker->GetWorkerLoop();
134     if (loop == nullptr) {
135         HILOG_ERROR("taskpool:: loop is nullptr");
136         return;
137     }
138     // save the worker tid
139     worker->tid_ = GetThreadId();
140 
141     // Init worker task execute signal
142     worker->performTaskSignal_ = new uv_async_t;
143     worker->performTaskSignal_->data = worker;
144     uv_async_init(loop, worker->performTaskSignal_, reinterpret_cast<uv_async_cb>(Worker::PerformTask));
145 
146     worker->clearWorkerSignal_ = new uv_async_t;
147     worker->clearWorkerSignal_->data = worker;
148     uv_async_init(loop, worker->clearWorkerSignal_, reinterpret_cast<uv_async_cb>(Worker::ReleaseWorkerHandles));
149 
150     HITRACE_HELPER_FINISH_TRACE;
151 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
152     // Init debugger task post signal
153     worker->debuggerOnPostTaskSignal_ = new uv_async_t;
154     worker->debuggerOnPostTaskSignal_->data = worker;
155     uv_async_init(loop, worker->debuggerOnPostTaskSignal_, reinterpret_cast<uv_async_cb>(Worker::HandleDebuggerTask));
156 #endif
157     if (worker->PrepareForWorkerInstance()) {
158         // Call after uv_async_init
159         worker->NotifyWorkerCreated();
160         worker->RunLoop();
161     } else {
162         HILOG_ERROR("taskpool:: Worker PrepareForWorkerInstance fail");
163     }
164     TaskManager::GetInstance().RemoveWorker(worker);
165     worker->ReleaseWorkerThreadContent();
166     delete worker;
167     worker = nullptr;
168 }
169 
PrepareForWorkerInstance()170 bool Worker::PrepareForWorkerInstance()
171 {
172     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
173     auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
174 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
175     workerEngine->SetDebuggerPostTaskFunc(
176         std::bind(&Worker::DebuggerOnPostTask, this, std::placeholders::_1));
177 #endif
178     if (!workerEngine->CallInitWorkerFunc(workerEngine)) {
179         HILOG_ERROR("taskpool:: Worker CallInitWorkerFunc fail");
180         return false;
181     }
182     // register timer interface
183     Timer::RegisterTime(workerEnv_);
184 
185     // Check exception after worker construction
186     if (NapiHelper::IsExceptionPending(workerEnv_)) {
187         HILOG_ERROR("taskpool:: Worker construction occur exception");
188         return false;
189     }
190     return true;
191 }
192 
ReleaseWorkerThreadContent()193 void Worker::ReleaseWorkerThreadContent()
194 {
195     auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
196     if (workerEngine == nullptr) {
197         HILOG_ERROR("taskpool:: workerEngine is nullptr");
198         return;
199     }
200     if (state_ == WorkerState::BLOCKED) {
201         HITRACE_HELPER_METER_NAME("Thread Timeout Exit");
202     } else {
203         HITRACE_HELPER_METER_NAME("Thread Exit");
204     }
205 
206     Timer::ClearEnvironmentTimer(workerEnv_);
207 
208     // 2. delete NativeEngine created in worker thread
209     if (!workerEngine->CallOffWorkerFunc(workerEngine)) {
210         HILOG_ERROR("worker:: CallOffWorkerFunc error");
211     }
212     delete workerEngine;
213     workerEnv_ = nullptr;
214 }
215 
NotifyExecuteTask()216 void Worker::NotifyExecuteTask()
217 {
218     if (uv_is_active(reinterpret_cast<uv_handle_t*>(performTaskSignal_))) {
219         uv_async_send(performTaskSignal_);
220     }
221 }
222 
NotifyIdle()223 void Worker::NotifyIdle()
224 {
225     TaskManager::GetInstance().NotifyWorkerIdle(this);
226 }
227 
NotifyWorkerCreated()228 void Worker::NotifyWorkerCreated()
229 {
230     TaskManager::GetInstance().NotifyWorkerCreated(this);
231 }
232 
NotifyTaskFinished()233 void Worker::NotifyTaskFinished()
234 {
235     if (--runningCount_ != 0) {
236         // the worker state is still RUNNING and the start time will be updated
237         startTime_ = ConcurrentHelper::GetMilliseconds();
238     } else {
239         std::lock_guard<std::mutex> lock(stateMutex_);
240         if (state_ != WorkerState::BLOCKED) {
241             state_ = WorkerState::IDLE;
242         }
243     }
244     idlePoint_ = ConcurrentHelper::GetMilliseconds();
245 }
246 
PerformTask(const uv_async_t * req)247 void Worker::PerformTask(const uv_async_t* req)
248 {
249     auto worker = static_cast<Worker*>(req->data);
250     napi_env env = worker->workerEnv_;
251     napi_status status = napi_ok;
252     RunningScope runningScope(worker, status);
253     NAPI_CALL_RETURN_VOID(env, status);
254     auto executeIdAndPriority = TaskManager::GetInstance().DequeueExecuteId();
255     if (executeIdAndPriority.first == 0) {
256         worker->NotifyTaskFinished();
257         return;
258     }
259 
260     PriorityScope priorityScope(worker, executeIdAndPriority.second);
261     TaskInfo* taskInfo = TaskManager::GetInstance().GetTaskInfo(executeIdAndPriority.first);
262     if (taskInfo == nullptr) { // task may have been canceled
263         worker->NotifyTaskFinished();
264         HILOG_DEBUG("taskpool::PerformTask taskInfo is null");
265         return;
266     }
267     {
268         std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_);
269         worker->currentTaskId_.emplace_back(taskInfo->taskId);
270     }
271     // tag for trace parse: Task Perform
272     std::string strTrace = "Task Perform: taskId : " + std::to_string(taskInfo->taskId) + ", executeId : " +
273                            std::to_string(taskInfo->executeId);
274     HITRACE_HELPER_METER_NAME(strTrace);
275 
276     taskInfo->worker = worker;
277     TaskManager::GetInstance().UpdateExecuteState(taskInfo->executeId, ExecuteState::RUNNING);
278     napi_value func;
279     status = napi_deserialize(env, taskInfo->serializationFunction, &func);
280     if (status != napi_ok || func == nullptr) {
281         HILOG_ERROR("taskpool:: PerformTask deserialize function fail");
282         napi_value err = ErrorHelper::NewError(env, ErrorHelper::ERR_WORKER_SERIALIZATION,
283                                                "taskpool: failed to deserialize function.");
284         taskInfo->success = false;
285         NotifyTaskResult(env, taskInfo, err);
286         return;
287     }
288     napi_value args;
289     status = napi_deserialize(env, taskInfo->serializationArguments, &args);
290     if (status != napi_ok || args == nullptr) {
291         HILOG_ERROR("taskpool:: PerformTask deserialize arguments fail");
292         napi_value err = ErrorHelper::NewError(env, ErrorHelper::ERR_WORKER_SERIALIZATION,
293                                                "taskpool: failed to deserialize arguments.");
294         taskInfo->success = false;
295         NotifyTaskResult(env, taskInfo, err);
296         return;
297     }
298 
299     auto funcVal = reinterpret_cast<NativeValue*>(func);
300     auto workerEngine = reinterpret_cast<NativeEngine*>(env);
301     // Store taskinfo in function
302     bool success = workerEngine->InitTaskPoolFunc(workerEngine, funcVal, taskInfo);
303     napi_value exception;
304     napi_get_and_clear_last_exception(env, &exception);
305     if (exception != nullptr) {
306         HILOG_ERROR("taskpool:: InitTaskPoolFunc occur exception");
307         taskInfo->success = false;
308         napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception);
309         NotifyTaskResult(env, taskInfo, errorEvent);
310         return;
311     }
312     if (!success) {
313         HILOG_ERROR("taskpool:: InitTaskPoolFunc fail");
314         napi_value err = ErrorHelper::NewError(env, ErrorHelper::TYPE_ERROR,
315                                                "taskpool: function may not be concurrent.");
316         taskInfo->success = false;
317         NotifyTaskResult(env, taskInfo, err);
318         return;
319     }
320 
321     uint32_t argsNum = NapiHelper::GetArrayLength(env, args);
322     napi_value argsArray[argsNum];
323     napi_value val;
324     for (size_t i = 0; i < argsNum; i++) {
325         napi_get_element(env, args, i, &val);
326         argsArray[i] = val;
327     }
328 
329     napi_value result;
330     napi_value undefined = NapiHelper::GetUndefinedValue(env);
331     napi_call_function(env, undefined, func, argsNum, argsArray, &result);
332     {
333         std::lock_guard<std::mutex> lock(worker->stateMutex_);
334         if (LIKELY(worker->state_ == WorkerState::RUNNING)) {
335             uint64_t duration = ConcurrentHelper::GetMilliseconds() - worker->startTime_;
336             TaskManager::GetInstance().UpdateExecutedInfo(duration);
337         }
338     }
339     napi_get_and_clear_last_exception(env, &exception);
340     if (exception != nullptr) {
341         HILOG_ERROR("taskpool::PerformTask occur exception");
342         taskInfo->success = false;
343         napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception);
344         NotifyTaskResult(env, taskInfo, errorEvent);
345     }
346 }
347 
NotifyTaskResult(napi_env env,TaskInfo * taskInfo,napi_value result)348 void Worker::NotifyTaskResult(napi_env env, TaskInfo* taskInfo, napi_value result)
349 {
350     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
351     napi_value undefined = NapiHelper::GetUndefinedValue(env);
352     napi_value resultData;
353     napi_status status = napi_serialize(env, result, undefined, &resultData);
354     if ((status != napi_ok || resultData == nullptr) && taskInfo->success) {
355         taskInfo->success = false;
356         napi_value err = ErrorHelper::NewError(env, ErrorHelper::ERR_WORKER_SERIALIZATION,
357                                                "taskpool: failed to serialize result.");
358         NotifyTaskResult(env, taskInfo, err);
359         return;
360     }
361     taskInfo->result = resultData;
362 
363     TaskManager::GetInstance().RemoveExecuteState(taskInfo->executeId);
364     if (taskInfo->groupExecuteId == 0) {
365         TaskManager::GetInstance().PopRunningInfo(taskInfo->taskId, taskInfo->executeId);
366     }
367     TaskManager::GetInstance().PopTaskInfo(taskInfo->executeId);
368     Worker* worker = reinterpret_cast<Worker*>(taskInfo->worker);
369     {
370         std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_);
371         worker->currentTaskId_.erase(std::find(worker->currentTaskId_.begin(),
372                                                worker->currentTaskId_.end(),
373                                                taskInfo->taskId));
374     }
375     uv_async_send(taskInfo->onResultSignal);
376     worker->NotifyTaskFinished();
377 }
378 
TaskResultCallback(NativeEngine * engine,NativeValue * result,bool success,void * data)379 void Worker::TaskResultCallback(NativeEngine* engine, NativeValue* result, bool success, void* data)
380 {
381     HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
382     if (engine == nullptr) {
383         HILOG_FATAL("taskpool::TaskResultCallback engine is null");
384         return;
385     }
386     if (data == nullptr) {
387         HILOG_FATAL("taskpool:: taskInfo is nullptr");
388         return;
389     }
390     TaskInfo* taskInfo = static_cast<TaskInfo*>(data);
391     auto env = reinterpret_cast<napi_env>(engine);
392     taskInfo->success = success;
393     NotifyTaskResult(env, taskInfo, reinterpret_cast<napi_value>(result));
394 }
395 
396 // reset qos_user_initiated after perform task
ResetWorkerPriority()397 void Worker::ResetWorkerPriority()
398 {
399     if (priority_ != Priority::HIGH) {
400         SetWorkerPriority(Priority::HIGH);
401         priority_ = Priority::HIGH;
402     }
403 }
404 } // namespace Commonlibrary::Concurrent::TaskPoolModule
405