1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "worker.h"
17
18 #include "commonlibrary/ets_utils/js_sys_module/timer/timer.h"
19 #include "helper/hitrace_helper.h"
20 #include "process_helper.h"
21 #include "task_group.h"
22 #include "task_manager.h"
23 #include "utils/log.h"
24
25 namespace Commonlibrary::Concurrent::TaskPoolModule {
26 using namespace OHOS::JsSysModule;
27 using namespace Commonlibrary::Platform;
28
~RunningScope()29 Worker::RunningScope::~RunningScope()
30 {
31 if (scope_ != nullptr) {
32 napi_close_handle_scope(worker_->workerEnv_, scope_);
33 }
34 // only when worker is not blocked can it be inserted
35 {
36 std::lock_guard<std::mutex> lock(worker_->stateMutex_);
37 if (UNLIKELY(worker_->state_ == WorkerState::BLOCKED)) {
38 return;
39 }
40 }
41 worker_->NotifyIdle();
42 }
43
WorkerConstructor(napi_env env)44 Worker* Worker::WorkerConstructor(napi_env env)
45 {
46 HITRACE_HELPER_METER_NAME("WorkerConstructor: [Add Thread]");
47 Worker* worker = new Worker(env);
48 worker->StartExecuteInThread();
49 return worker;
50 }
51
ReleaseWorkerHandles(const uv_async_t * req)52 void Worker::ReleaseWorkerHandles(const uv_async_t* req)
53 {
54 HITRACE_HELPER_METER_NAME("ReleaseWorkerHandles: [Release Thread]");
55 auto worker = static_cast<Worker*>(req->data);
56 // when there is no active handle, worker loop will stop automatically.
57 uv_close(reinterpret_cast<uv_handle_t*>(worker->performTaskSignal_), [](uv_handle_t* handle) {
58 if (handle != nullptr) {
59 delete reinterpret_cast<uv_async_t*>(handle);
60 handle = nullptr;
61 }
62 });
63
64 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
65 uv_close(reinterpret_cast<uv_handle_t*>(worker->debuggerOnPostTaskSignal_), [](uv_handle_t* handle) {
66 if (handle != nullptr) {
67 delete reinterpret_cast<uv_async_t*>(handle);
68 handle = nullptr;
69 }
70 });
71 #endif
72
73 uv_close(reinterpret_cast<uv_handle_t*>(worker->clearWorkerSignal_), [](uv_handle_t* handle) {
74 if (handle != nullptr) {
75 delete reinterpret_cast<uv_async_t*>(handle);
76 handle = nullptr;
77 }
78 });
79
80 uv_loop_t* loop = worker->GetWorkerLoop();
81 if (loop != nullptr) {
82 uv_stop(loop);
83 }
84 }
85
StartExecuteInThread()86 void Worker::StartExecuteInThread()
87 {
88 if (!runner_) {
89 runner_ = std::make_unique<TaskRunner>(TaskStartCallback(ExecuteInThread, this));
90 }
91 if (runner_) {
92 runner_->Execute(); // start a new thread
93 } else {
94 HILOG_ERROR("taskpool:: runner_ is nullptr");
95 }
96 }
97
98 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
HandleDebuggerTask(const uv_async_t * req)99 void Worker::HandleDebuggerTask(const uv_async_t* req)
100 {
101 Worker* worker = reinterpret_cast<Worker*>(req->data);
102 if (worker == nullptr) {
103 HILOG_ERROR("taskpool:: worker is null");
104 return;
105 }
106 worker->debuggerTask_();
107 }
108
DebuggerOnPostTask(std::function<void ()> && task)109 void Worker::DebuggerOnPostTask(std::function<void()>&& task)
110 {
111 if (uv_is_active(reinterpret_cast<uv_handle_t*>(debuggerOnPostTaskSignal_))) {
112 debuggerTask_ = std::move(task);
113 uv_async_send(debuggerOnPostTaskSignal_);
114 }
115 }
116 #endif
117
ExecuteInThread(const void * data)118 void Worker::ExecuteInThread(const void* data)
119 {
120 HITRACE_HELPER_START_TRACE(__PRETTY_FUNCTION__);
121 auto worker = reinterpret_cast<Worker*>(const_cast<void*>(data));
122 {
123 napi_create_runtime(worker->hostEnv_, &worker->workerEnv_);
124 if (worker->workerEnv_ == nullptr) {
125 HILOG_ERROR("taskpool:: workerEnv is nullptr");
126 return;
127 }
128 auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
129 // mark worker env is taskpoolThread
130 workerEngine->MarkTaskPoolThread();
131 workerEngine->InitTaskPoolThread(workerEngine, Worker::TaskResultCallback);
132 }
133 uv_loop_t* loop = worker->GetWorkerLoop();
134 if (loop == nullptr) {
135 HILOG_ERROR("taskpool:: loop is nullptr");
136 return;
137 }
138 // save the worker tid
139 worker->tid_ = GetThreadId();
140
141 // Init worker task execute signal
142 worker->performTaskSignal_ = new uv_async_t;
143 worker->performTaskSignal_->data = worker;
144 uv_async_init(loop, worker->performTaskSignal_, reinterpret_cast<uv_async_cb>(Worker::PerformTask));
145
146 worker->clearWorkerSignal_ = new uv_async_t;
147 worker->clearWorkerSignal_->data = worker;
148 uv_async_init(loop, worker->clearWorkerSignal_, reinterpret_cast<uv_async_cb>(Worker::ReleaseWorkerHandles));
149
150 HITRACE_HELPER_FINISH_TRACE;
151 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
152 // Init debugger task post signal
153 worker->debuggerOnPostTaskSignal_ = new uv_async_t;
154 worker->debuggerOnPostTaskSignal_->data = worker;
155 uv_async_init(loop, worker->debuggerOnPostTaskSignal_, reinterpret_cast<uv_async_cb>(Worker::HandleDebuggerTask));
156 #endif
157 if (worker->PrepareForWorkerInstance()) {
158 // Call after uv_async_init
159 worker->NotifyWorkerCreated();
160 worker->RunLoop();
161 } else {
162 HILOG_ERROR("taskpool:: Worker PrepareForWorkerInstance fail");
163 }
164 TaskManager::GetInstance().RemoveWorker(worker);
165 worker->ReleaseWorkerThreadContent();
166 delete worker;
167 worker = nullptr;
168 }
169
PrepareForWorkerInstance()170 bool Worker::PrepareForWorkerInstance()
171 {
172 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
173 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
174 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
175 workerEngine->SetDebuggerPostTaskFunc(
176 std::bind(&Worker::DebuggerOnPostTask, this, std::placeholders::_1));
177 #endif
178 if (!workerEngine->CallInitWorkerFunc(workerEngine)) {
179 HILOG_ERROR("taskpool:: Worker CallInitWorkerFunc fail");
180 return false;
181 }
182 // register timer interface
183 Timer::RegisterTime(workerEnv_);
184
185 // Check exception after worker construction
186 if (NapiHelper::IsExceptionPending(workerEnv_)) {
187 HILOG_ERROR("taskpool:: Worker construction occur exception");
188 return false;
189 }
190 return true;
191 }
192
ReleaseWorkerThreadContent()193 void Worker::ReleaseWorkerThreadContent()
194 {
195 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
196 if (workerEngine == nullptr) {
197 HILOG_ERROR("taskpool:: workerEngine is nullptr");
198 return;
199 }
200 if (state_ == WorkerState::BLOCKED) {
201 HITRACE_HELPER_METER_NAME("Thread Timeout Exit");
202 } else {
203 HITRACE_HELPER_METER_NAME("Thread Exit");
204 }
205
206 Timer::ClearEnvironmentTimer(workerEnv_);
207
208 // 2. delete NativeEngine created in worker thread
209 if (!workerEngine->CallOffWorkerFunc(workerEngine)) {
210 HILOG_ERROR("worker:: CallOffWorkerFunc error");
211 }
212 delete workerEngine;
213 workerEnv_ = nullptr;
214 }
215
NotifyExecuteTask()216 void Worker::NotifyExecuteTask()
217 {
218 if (uv_is_active(reinterpret_cast<uv_handle_t*>(performTaskSignal_))) {
219 uv_async_send(performTaskSignal_);
220 }
221 }
222
NotifyIdle()223 void Worker::NotifyIdle()
224 {
225 TaskManager::GetInstance().NotifyWorkerIdle(this);
226 }
227
NotifyWorkerCreated()228 void Worker::NotifyWorkerCreated()
229 {
230 TaskManager::GetInstance().NotifyWorkerCreated(this);
231 }
232
NotifyTaskFinished()233 void Worker::NotifyTaskFinished()
234 {
235 if (--runningCount_ != 0) {
236 // the worker state is still RUNNING and the start time will be updated
237 startTime_ = ConcurrentHelper::GetMilliseconds();
238 } else {
239 std::lock_guard<std::mutex> lock(stateMutex_);
240 if (state_ != WorkerState::BLOCKED) {
241 state_ = WorkerState::IDLE;
242 }
243 }
244 idlePoint_ = ConcurrentHelper::GetMilliseconds();
245 }
246
PerformTask(const uv_async_t * req)247 void Worker::PerformTask(const uv_async_t* req)
248 {
249 auto worker = static_cast<Worker*>(req->data);
250 napi_env env = worker->workerEnv_;
251 napi_status status = napi_ok;
252 RunningScope runningScope(worker, status);
253 NAPI_CALL_RETURN_VOID(env, status);
254 auto executeIdAndPriority = TaskManager::GetInstance().DequeueExecuteId();
255 if (executeIdAndPriority.first == 0) {
256 worker->NotifyTaskFinished();
257 return;
258 }
259
260 PriorityScope priorityScope(worker, executeIdAndPriority.second);
261 TaskInfo* taskInfo = TaskManager::GetInstance().GetTaskInfo(executeIdAndPriority.first);
262 if (taskInfo == nullptr) { // task may have been canceled
263 worker->NotifyTaskFinished();
264 HILOG_DEBUG("taskpool::PerformTask taskInfo is null");
265 return;
266 }
267 {
268 std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_);
269 worker->currentTaskId_.emplace_back(taskInfo->taskId);
270 }
271 // tag for trace parse: Task Perform
272 std::string strTrace = "Task Perform: taskId : " + std::to_string(taskInfo->taskId) + ", executeId : " +
273 std::to_string(taskInfo->executeId);
274 HITRACE_HELPER_METER_NAME(strTrace);
275
276 taskInfo->worker = worker;
277 TaskManager::GetInstance().UpdateExecuteState(taskInfo->executeId, ExecuteState::RUNNING);
278 napi_value func;
279 status = napi_deserialize(env, taskInfo->serializationFunction, &func);
280 if (status != napi_ok || func == nullptr) {
281 HILOG_ERROR("taskpool:: PerformTask deserialize function fail");
282 napi_value err = ErrorHelper::NewError(env, ErrorHelper::ERR_WORKER_SERIALIZATION,
283 "taskpool: failed to deserialize function.");
284 taskInfo->success = false;
285 NotifyTaskResult(env, taskInfo, err);
286 return;
287 }
288 napi_value args;
289 status = napi_deserialize(env, taskInfo->serializationArguments, &args);
290 if (status != napi_ok || args == nullptr) {
291 HILOG_ERROR("taskpool:: PerformTask deserialize arguments fail");
292 napi_value err = ErrorHelper::NewError(env, ErrorHelper::ERR_WORKER_SERIALIZATION,
293 "taskpool: failed to deserialize arguments.");
294 taskInfo->success = false;
295 NotifyTaskResult(env, taskInfo, err);
296 return;
297 }
298
299 auto funcVal = reinterpret_cast<NativeValue*>(func);
300 auto workerEngine = reinterpret_cast<NativeEngine*>(env);
301 // Store taskinfo in function
302 bool success = workerEngine->InitTaskPoolFunc(workerEngine, funcVal, taskInfo);
303 napi_value exception;
304 napi_get_and_clear_last_exception(env, &exception);
305 if (exception != nullptr) {
306 HILOG_ERROR("taskpool:: InitTaskPoolFunc occur exception");
307 taskInfo->success = false;
308 napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception);
309 NotifyTaskResult(env, taskInfo, errorEvent);
310 return;
311 }
312 if (!success) {
313 HILOG_ERROR("taskpool:: InitTaskPoolFunc fail");
314 napi_value err = ErrorHelper::NewError(env, ErrorHelper::TYPE_ERROR,
315 "taskpool: function may not be concurrent.");
316 taskInfo->success = false;
317 NotifyTaskResult(env, taskInfo, err);
318 return;
319 }
320
321 uint32_t argsNum = NapiHelper::GetArrayLength(env, args);
322 napi_value argsArray[argsNum];
323 napi_value val;
324 for (size_t i = 0; i < argsNum; i++) {
325 napi_get_element(env, args, i, &val);
326 argsArray[i] = val;
327 }
328
329 napi_value result;
330 napi_value undefined = NapiHelper::GetUndefinedValue(env);
331 napi_call_function(env, undefined, func, argsNum, argsArray, &result);
332 {
333 std::lock_guard<std::mutex> lock(worker->stateMutex_);
334 if (LIKELY(worker->state_ == WorkerState::RUNNING)) {
335 uint64_t duration = ConcurrentHelper::GetMilliseconds() - worker->startTime_;
336 TaskManager::GetInstance().UpdateExecutedInfo(duration);
337 }
338 }
339 napi_get_and_clear_last_exception(env, &exception);
340 if (exception != nullptr) {
341 HILOG_ERROR("taskpool::PerformTask occur exception");
342 taskInfo->success = false;
343 napi_value errorEvent = ErrorHelper::TranslateErrorEvent(env, exception);
344 NotifyTaskResult(env, taskInfo, errorEvent);
345 }
346 }
347
NotifyTaskResult(napi_env env,TaskInfo * taskInfo,napi_value result)348 void Worker::NotifyTaskResult(napi_env env, TaskInfo* taskInfo, napi_value result)
349 {
350 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
351 napi_value undefined = NapiHelper::GetUndefinedValue(env);
352 napi_value resultData;
353 napi_status status = napi_serialize(env, result, undefined, &resultData);
354 if ((status != napi_ok || resultData == nullptr) && taskInfo->success) {
355 taskInfo->success = false;
356 napi_value err = ErrorHelper::NewError(env, ErrorHelper::ERR_WORKER_SERIALIZATION,
357 "taskpool: failed to serialize result.");
358 NotifyTaskResult(env, taskInfo, err);
359 return;
360 }
361 taskInfo->result = resultData;
362
363 TaskManager::GetInstance().RemoveExecuteState(taskInfo->executeId);
364 if (taskInfo->groupExecuteId == 0) {
365 TaskManager::GetInstance().PopRunningInfo(taskInfo->taskId, taskInfo->executeId);
366 }
367 TaskManager::GetInstance().PopTaskInfo(taskInfo->executeId);
368 Worker* worker = reinterpret_cast<Worker*>(taskInfo->worker);
369 {
370 std::lock_guard<std::mutex> lock(worker->currentTaskIdMutex_);
371 worker->currentTaskId_.erase(std::find(worker->currentTaskId_.begin(),
372 worker->currentTaskId_.end(),
373 taskInfo->taskId));
374 }
375 uv_async_send(taskInfo->onResultSignal);
376 worker->NotifyTaskFinished();
377 }
378
TaskResultCallback(NativeEngine * engine,NativeValue * result,bool success,void * data)379 void Worker::TaskResultCallback(NativeEngine* engine, NativeValue* result, bool success, void* data)
380 {
381 HITRACE_HELPER_METER_NAME(__PRETTY_FUNCTION__);
382 if (engine == nullptr) {
383 HILOG_FATAL("taskpool::TaskResultCallback engine is null");
384 return;
385 }
386 if (data == nullptr) {
387 HILOG_FATAL("taskpool:: taskInfo is nullptr");
388 return;
389 }
390 TaskInfo* taskInfo = static_cast<TaskInfo*>(data);
391 auto env = reinterpret_cast<napi_env>(engine);
392 taskInfo->success = success;
393 NotifyTaskResult(env, taskInfo, reinterpret_cast<napi_value>(result));
394 }
395
396 // reset qos_user_initiated after perform task
ResetWorkerPriority()397 void Worker::ResetWorkerPriority()
398 {
399 if (priority_ != Priority::HIGH) {
400 SetWorkerPriority(Priority::HIGH);
401 priority_ = Priority::HIGH;
402 }
403 }
404 } // namespace Commonlibrary::Concurrent::TaskPoolModule
405