1 /*
2 * Copyright (c) 2022 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "worker.h"
17
18 #include "hitrace_meter.h"
19 #include "plugin/timer.h"
20 #include "task_manager.h"
21 #include "utils/log.h"
22
23 namespace Commonlibrary::Concurrent::TaskPoolModule {
24 using namespace Commonlibrary::Concurrent::Common::Plugin;
25
Worker(napi_env env)26 Worker::Worker(napi_env env) : hostEnv_(env) {}
27
WorkerConstructor(napi_env env)28 Worker* Worker::WorkerConstructor(napi_env env)
29 {
30 HITRACE_METER_NAME(HITRACE_TAG_COMMONLIBRARY, __PRETTY_FUNCTION__);
31 Worker* worker = new Worker(env);
32 worker->StartExecuteInThread();
33 return worker;
34 }
35
~Worker()36 Worker::~Worker()
37 {
38 // when there is no active handle, worker loop will stop automatic.
39 uv_close(reinterpret_cast<uv_handle_t*>(performTaskSignal_), [](uv_handle_t* handle) {
40 if (handle != nullptr) {
41 delete reinterpret_cast<uv_async_t*>(handle);
42 handle = nullptr;
43 }
44 });
45 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
46 uv_close(reinterpret_cast<uv_handle_t*>(debuggerOnPostTaskSignal_), [](uv_handle_t* handle) {
47 if (handle != nullptr) {
48 delete reinterpret_cast<uv_async_t*>(handle);
49 handle = nullptr;
50 }
51 });
52 #endif
53 uv_loop_t* loop = GetWorkerLoop();
54 if (loop != nullptr) {
55 uv_stop(loop);
56 }
57 }
58
StartExecuteInThread()59 void Worker::StartExecuteInThread()
60 {
61 if (!runner_) {
62 runner_ = std::make_unique<TaskRunner>(TaskStartCallback(ExecuteInThread, this));
63 }
64 if (runner_) {
65 runner_->Execute(); // start a new thread
66 } else {
67 HILOG_ERROR("taskpool:: runner_ is nullptr");
68 }
69 }
70
71 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
HandleDebuggerTask(const uv_async_t * req)72 void Worker::HandleDebuggerTask(const uv_async_t* req)
73 {
74 Worker* worker = reinterpret_cast<Worker*>(req->data);
75 if (worker == nullptr) {
76 HILOG_ERROR("taskpool:: worker is null");
77 return;
78 }
79 worker->debuggerTask_();
80 }
81
DebuggerOnPostTask(std::function<void ()> && task)82 void Worker::DebuggerOnPostTask(std::function<void()>&& task)
83 {
84 if (uv_is_active(reinterpret_cast<uv_handle_t*>(debuggerOnPostTaskSignal_))) {
85 debuggerTask_ = std::move(task);
86 uv_async_send(debuggerOnPostTaskSignal_);
87 }
88 }
89 #endif
90
ExecuteInThread(const void * data)91 void Worker::ExecuteInThread(const void* data)
92 {
93 StartTrace(HITRACE_TAG_COMMONLIBRARY, __PRETTY_FUNCTION__);
94 auto worker = reinterpret_cast<Worker*>(const_cast<void*>(data));
95 {
96 napi_create_runtime(worker->hostEnv_, &worker->workerEnv_);
97 if (worker->workerEnv_ == nullptr) {
98 HILOG_ERROR("taskpool:: workerEnv is nullptr");
99 return;
100 }
101 auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
102 workerEngine->MarkSubThread();
103 workerEngine->InitTaskPoolThread(workerEngine, Worker::TaskResultCallback);
104 }
105 uv_loop_t* loop = worker->GetWorkerLoop();
106 if (loop == nullptr) {
107 HILOG_ERROR("taskpool:: loop is nullptr");
108 return;
109 }
110
111 // Init worker task execute signal
112 worker->performTaskSignal_ = new uv_async_t;
113 worker->performTaskSignal_->data = worker;
114 uv_async_init(loop, worker->performTaskSignal_, reinterpret_cast<uv_async_cb>(Worker::PerformTask));
115 FinishTrace(HITRACE_TAG_COMMONLIBRARY);
116 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
117 // Init debugger task post signal
118 worker->debuggerOnPostTaskSignal_ = new uv_async_t;
119 worker->debuggerOnPostTaskSignal_->data = worker;
120 uv_async_init(loop, worker->debuggerOnPostTaskSignal_, reinterpret_cast<uv_async_cb>(Worker::HandleDebuggerTask));
121 #endif
122 if (worker->PrepareForWorkerInstance()) {
123 // Call after uv_async_init
124 worker->NotifyIdle();
125 worker->RunLoop();
126 } else {
127 HILOG_ERROR("taskpool:: Worker PrepareForWorkerInstance failure");
128 }
129 worker->ReleaseWorkerThreadContent();
130 delete worker;
131 worker = nullptr;
132 }
133
PrepareForWorkerInstance()134 bool Worker::PrepareForWorkerInstance()
135 {
136 HITRACE_METER_NAME(HITRACE_TAG_COMMONLIBRARY, __PRETTY_FUNCTION__);
137 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
138 auto hostEngine = reinterpret_cast<NativeEngine*>(hostEnv_);
139 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
140 workerEngine->SetDebuggerPostTaskFunc(
141 std::bind(&Worker::DebuggerOnPostTask, this, std::placeholders::_1));
142 #endif
143 if (!hostEngine->CallInitWorkerFunc(workerEngine)) {
144 HILOG_ERROR("taskpool:: Worker CallInitWorkerFunc failure");
145 return false;
146 }
147 // register timer interface
148 Timer::RegisterTime(workerEnv_);
149
150 return true;
151 }
152
ReleaseWorkerThreadContent()153 void Worker::ReleaseWorkerThreadContent()
154 {
155 HITRACE_METER_NAME(HITRACE_TAG_COMMONLIBRARY, __PRETTY_FUNCTION__);
156 auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
157 if (workerEngine == nullptr) {
158 HILOG_ERROR("taskpool:: workerEngine is nullptr");
159 return;
160 }
161
162 Timer::ClearEnvironmentTimer(workerEnv_);
163
164 // 2. delete NativeEngine created in worker thread
165 workerEngine->DeleteEngine();
166 delete workerEngine;
167 workerEnv_ = nullptr;
168 }
169
NotifyExecuteTask()170 void Worker::NotifyExecuteTask()
171 {
172 if (uv_is_active(reinterpret_cast<uv_handle_t*>(performTaskSignal_))) {
173 uv_async_send(performTaskSignal_);
174 }
175 }
176
NotifyIdle()177 void Worker::NotifyIdle()
178 {
179 TaskManager::GetInstance().NotifyWorkerIdle(this);
180 }
181
PerformTask(const uv_async_t * req)182 void Worker::PerformTask(const uv_async_t* req)
183 {
184 HITRACE_METER_NAME(HITRACE_TAG_COMMONLIBRARY, __PRETTY_FUNCTION__);
185 auto worker = static_cast<Worker*>(req->data);
186 napi_env env = worker->workerEnv_;
187 std::unique_ptr<Task> task = TaskManager::GetInstance().DequeueTask();
188 if (task == nullptr) {
189 worker->NotifyIdle();
190 return;
191 }
192
193 TaskInfo* taskInfo = TaskManager::GetInstance().PopTaskInfo(task->executeId_);
194 if (taskInfo == nullptr) { // task may have been canceled
195 worker->NotifyIdle();
196 HILOG_ERROR("taskpool::PerformTask taskInfo is null");
197 return;
198 }
199 taskInfo->worker = worker;
200 TaskManager::GetInstance().UpdateState(taskInfo->executeId, TaskState::RUNNING);
201 napi_value undefined;
202 napi_get_undefined(env, &undefined);
203 napi_value taskData;
204 napi_status status = napi_deserialize(env, taskInfo->serializationData, &taskData);
205 if (status != napi_ok || taskData == nullptr) {
206 HILOG_ERROR("taskpool::PerformTask napi_deserialize fail");
207 napi_value err = ErrorHelper::NewError(env, ErrorHelper::WORKERSERIALIZATION_ERROR,
208 "taskpool: failed to deserialize message.");
209 taskInfo->success = false;
210 NotifyTaskResult(env, taskInfo, err);
211 return;
212 }
213 napi_value func;
214 napi_get_named_property(env, taskData, "func", &func);
215 auto funcVal = reinterpret_cast<NativeValue*>(func);
216 auto workerEngine = reinterpret_cast<NativeEngine*>(env);
217 workerEngine->InitTaskPoolFunc(workerEngine, funcVal);
218 napi_value args;
219 napi_get_named_property(env, taskData, "args", &args);
220 uint32_t argsNum = 0;
221 napi_get_array_length(env, args, &argsNum);
222 napi_value argsArray[argsNum];
223 napi_value val;
224 for (size_t i = 0; i < argsNum; i++) {
225 napi_get_element(env, args, i, &val);
226 argsArray[i] = val;
227 }
228 // Store taskinfo in last argument
229 napi_value data;
230 napi_create_external(env, taskInfo, nullptr, nullptr, &data);
231
232 napi_value result;
233 napi_call_function(env, data, func, argsNum, argsArray, &result);
234
235 napi_value exception;
236 napi_get_and_clear_last_exception(env, &exception);
237 if (exception != nullptr) {
238 HILOG_ERROR("taskpool::PerformTask occur exception");
239 taskInfo->success = false;
240 NotifyTaskResult(env, taskInfo, exception);
241 } else {
242 worker->NotifyIdle();
243 }
244 }
245
NotifyTaskResult(napi_env env,TaskInfo * taskInfo,napi_value result)246 void Worker::NotifyTaskResult(napi_env env, TaskInfo* taskInfo, napi_value result)
247 {
248 HITRACE_METER_NAME(HITRACE_TAG_COMMONLIBRARY, __PRETTY_FUNCTION__);
249 napi_value undefined;
250 napi_get_undefined(env, &undefined);
251
252 napi_value resultData;
253 napi_status status = napi_serialize(env, result, undefined, &resultData);
254 if ((status != napi_ok || resultData == nullptr) && taskInfo->success) {
255 taskInfo->success = false;
256 napi_value err = ErrorHelper::NewError(env, ErrorHelper::WORKERSERIALIZATION_ERROR,
257 "taskpool: failed to serialize result.");
258 NotifyTaskResult(env, taskInfo, err);
259 return;
260 }
261 taskInfo->result = resultData;
262
263 TaskManager::GetInstance().UpdateState(taskInfo->executeId, TaskState::TERMINATED);
264 TaskManager::GetInstance().PopRunningInfo(taskInfo->taskId, taskInfo->executeId);
265 uv_async_send(taskInfo->onResultSignal);
266
267 // Warning: The worker thread maybe released for future taskpool shrinkage strategy?
268 Worker* worker = reinterpret_cast<Worker*>(taskInfo->worker);
269 worker->NotifyIdle();
270 }
271
TaskResultCallback(NativeEngine * engine,NativeValue * value,NativeValue * data)272 void Worker::TaskResultCallback(NativeEngine* engine, NativeValue* value, NativeValue* data)
273 {
274 HITRACE_METER_NAME(HITRACE_TAG_COMMONLIBRARY, __PRETTY_FUNCTION__);
275 if (engine == nullptr) {
276 HILOG_FATAL("taskpool::TaskResultCallback engine is null");
277 return;
278 }
279
280 auto env = reinterpret_cast<napi_env>(engine);
281 auto result = reinterpret_cast<napi_value>(value);
282
283 napi_valuetype type;
284 napi_typeof(env, reinterpret_cast<napi_value>(data), &type);
285 if (type != napi_external) {
286 HILOG_INFO("taskpool::TaskResultCallback Concurrent func not called by taskpool.execute");
287 return;
288 }
289
290 TaskInfo* taskInfo = nullptr;
291 napi_get_value_external(env, reinterpret_cast<napi_value>(data), reinterpret_cast<void**>(&taskInfo));
292 if (taskInfo == nullptr) {
293 HILOG_FATAL("taskpool::TaskResultCallback taskInfo is null");
294 return;
295 }
296
297 NotifyTaskResult(env, taskInfo, result);
298 }
299 } // namespace Commonlibrary::Concurrent::TaskPoolModule
300