• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2022 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "worker.h"
17 
18 #include "hitrace_meter.h"
19 #include "plugin/timer.h"
20 #include "task_manager.h"
21 #include "utils/log.h"
22 
23 namespace Commonlibrary::Concurrent::TaskPoolModule {
24 using namespace Commonlibrary::Concurrent::Common::Plugin;
25 
Worker(napi_env env)26 Worker::Worker(napi_env env) : hostEnv_(env) {}
27 
WorkerConstructor(napi_env env)28 Worker* Worker::WorkerConstructor(napi_env env)
29 {
30     HITRACE_METER_NAME(HITRACE_TAG_COMMONLIBRARY, __PRETTY_FUNCTION__);
31     Worker* worker = new Worker(env);
32     worker->StartExecuteInThread();
33     return worker;
34 }
35 
~Worker()36 Worker::~Worker()
37 {
38     // when there is no active handle, worker loop will stop automatic.
39     uv_close(reinterpret_cast<uv_handle_t*>(performTaskSignal_), [](uv_handle_t* handle) {
40         if (handle != nullptr) {
41             delete reinterpret_cast<uv_async_t*>(handle);
42             handle = nullptr;
43         }
44     });
45 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
46     uv_close(reinterpret_cast<uv_handle_t*>(debuggerOnPostTaskSignal_), [](uv_handle_t* handle) {
47         if (handle != nullptr) {
48             delete reinterpret_cast<uv_async_t*>(handle);
49             handle = nullptr;
50         }
51     });
52 #endif
53     uv_loop_t* loop = GetWorkerLoop();
54     if (loop != nullptr) {
55         uv_stop(loop);
56     }
57 }
58 
StartExecuteInThread()59 void Worker::StartExecuteInThread()
60 {
61     if (!runner_) {
62         runner_ = std::make_unique<TaskRunner>(TaskStartCallback(ExecuteInThread, this));
63     }
64     if (runner_) {
65         runner_->Execute(); // start a new thread
66     } else {
67         HILOG_ERROR("taskpool:: runner_ is nullptr");
68     }
69 }
70 
71 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
HandleDebuggerTask(const uv_async_t * req)72 void Worker::HandleDebuggerTask(const uv_async_t* req)
73 {
74     Worker* worker = reinterpret_cast<Worker*>(req->data);
75     if (worker == nullptr) {
76         HILOG_ERROR("taskpool:: worker is null");
77         return;
78     }
79     worker->debuggerTask_();
80 }
81 
DebuggerOnPostTask(std::function<void ()> && task)82 void Worker::DebuggerOnPostTask(std::function<void()>&& task)
83 {
84     if (uv_is_active(reinterpret_cast<uv_handle_t*>(debuggerOnPostTaskSignal_))) {
85         debuggerTask_ = std::move(task);
86         uv_async_send(debuggerOnPostTaskSignal_);
87     }
88 }
89 #endif
90 
ExecuteInThread(const void * data)91 void Worker::ExecuteInThread(const void* data)
92 {
93     StartTrace(HITRACE_TAG_COMMONLIBRARY, __PRETTY_FUNCTION__);
94     auto worker = reinterpret_cast<Worker*>(const_cast<void*>(data));
95     {
96         napi_create_runtime(worker->hostEnv_, &worker->workerEnv_);
97         if (worker->workerEnv_ == nullptr) {
98             HILOG_ERROR("taskpool:: workerEnv is nullptr");
99             return;
100         }
101         auto workerEngine = reinterpret_cast<NativeEngine*>(worker->workerEnv_);
102         workerEngine->MarkSubThread();
103         workerEngine->InitTaskPoolThread(workerEngine, Worker::TaskResultCallback);
104     }
105     uv_loop_t* loop = worker->GetWorkerLoop();
106     if (loop == nullptr) {
107         HILOG_ERROR("taskpool:: loop is nullptr");
108         return;
109     }
110 
111     // Init worker task execute signal
112     worker->performTaskSignal_ = new uv_async_t;
113     worker->performTaskSignal_->data = worker;
114     uv_async_init(loop, worker->performTaskSignal_, reinterpret_cast<uv_async_cb>(Worker::PerformTask));
115     FinishTrace(HITRACE_TAG_COMMONLIBRARY);
116 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
117     // Init debugger task post signal
118     worker->debuggerOnPostTaskSignal_ = new uv_async_t;
119     worker->debuggerOnPostTaskSignal_->data = worker;
120     uv_async_init(loop, worker->debuggerOnPostTaskSignal_, reinterpret_cast<uv_async_cb>(Worker::HandleDebuggerTask));
121 #endif
122     if (worker->PrepareForWorkerInstance()) {
123         // Call after uv_async_init
124         worker->NotifyIdle();
125         worker->RunLoop();
126     } else {
127         HILOG_ERROR("taskpool:: Worker PrepareForWorkerInstance failure");
128     }
129     worker->ReleaseWorkerThreadContent();
130     delete worker;
131     worker = nullptr;
132 }
133 
PrepareForWorkerInstance()134 bool Worker::PrepareForWorkerInstance()
135 {
136     HITRACE_METER_NAME(HITRACE_TAG_COMMONLIBRARY, __PRETTY_FUNCTION__);
137     auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
138     auto hostEngine = reinterpret_cast<NativeEngine*>(hostEnv_);
139 #if !defined(WINDOWS_PLATFORM) && !defined(MAC_PLATFORM)
140     workerEngine->SetDebuggerPostTaskFunc(
141         std::bind(&Worker::DebuggerOnPostTask, this, std::placeholders::_1));
142 #endif
143     if (!hostEngine->CallInitWorkerFunc(workerEngine)) {
144         HILOG_ERROR("taskpool:: Worker CallInitWorkerFunc failure");
145         return false;
146     }
147     // register timer interface
148     Timer::RegisterTime(workerEnv_);
149 
150     return true;
151 }
152 
ReleaseWorkerThreadContent()153 void Worker::ReleaseWorkerThreadContent()
154 {
155     HITRACE_METER_NAME(HITRACE_TAG_COMMONLIBRARY, __PRETTY_FUNCTION__);
156     auto workerEngine = reinterpret_cast<NativeEngine*>(workerEnv_);
157     if (workerEngine == nullptr) {
158         HILOG_ERROR("taskpool:: workerEngine is nullptr");
159         return;
160     }
161 
162     Timer::ClearEnvironmentTimer(workerEnv_);
163 
164     // 2. delete NativeEngine created in worker thread
165     workerEngine->DeleteEngine();
166     delete workerEngine;
167     workerEnv_ = nullptr;
168 }
169 
NotifyExecuteTask()170 void Worker::NotifyExecuteTask()
171 {
172     if (uv_is_active(reinterpret_cast<uv_handle_t*>(performTaskSignal_))) {
173         uv_async_send(performTaskSignal_);
174     }
175 }
176 
NotifyIdle()177 void Worker::NotifyIdle()
178 {
179     TaskManager::GetInstance().NotifyWorkerIdle(this);
180 }
181 
PerformTask(const uv_async_t * req)182 void Worker::PerformTask(const uv_async_t* req)
183 {
184     HITRACE_METER_NAME(HITRACE_TAG_COMMONLIBRARY, __PRETTY_FUNCTION__);
185     auto worker = static_cast<Worker*>(req->data);
186     napi_env env = worker->workerEnv_;
187     std::unique_ptr<Task> task = TaskManager::GetInstance().DequeueTask();
188     if (task == nullptr) {
189         worker->NotifyIdle();
190         return;
191     }
192 
193     TaskInfo* taskInfo = TaskManager::GetInstance().PopTaskInfo(task->executeId_);
194     if (taskInfo == nullptr) { // task may have been canceled
195         worker->NotifyIdle();
196         HILOG_ERROR("taskpool::PerformTask taskInfo is null");
197         return;
198     }
199     taskInfo->worker = worker;
200     TaskManager::GetInstance().UpdateState(taskInfo->executeId, TaskState::RUNNING);
201     napi_value undefined;
202     napi_get_undefined(env, &undefined);
203     napi_value taskData;
204     napi_status status = napi_deserialize(env, taskInfo->serializationData, &taskData);
205     if (status != napi_ok || taskData == nullptr) {
206         HILOG_ERROR("taskpool::PerformTask napi_deserialize fail");
207         napi_value err = ErrorHelper::NewError(env, ErrorHelper::WORKERSERIALIZATION_ERROR,
208             "taskpool: failed to deserialize message.");
209         taskInfo->success = false;
210         NotifyTaskResult(env, taskInfo, err);
211         return;
212     }
213     napi_value func;
214     napi_get_named_property(env, taskData, "func", &func);
215     auto funcVal = reinterpret_cast<NativeValue*>(func);
216     auto workerEngine = reinterpret_cast<NativeEngine*>(env);
217     workerEngine->InitTaskPoolFunc(workerEngine, funcVal);
218     napi_value args;
219     napi_get_named_property(env, taskData, "args", &args);
220     uint32_t argsNum = 0;
221     napi_get_array_length(env, args, &argsNum);
222     napi_value argsArray[argsNum];
223     napi_value val;
224     for (size_t i = 0; i < argsNum; i++) {
225         napi_get_element(env, args, i, &val);
226         argsArray[i] = val;
227     }
228     // Store taskinfo in last argument
229     napi_value data;
230     napi_create_external(env, taskInfo, nullptr, nullptr, &data);
231 
232     napi_value result;
233     napi_call_function(env, data, func, argsNum, argsArray, &result);
234 
235     napi_value exception;
236     napi_get_and_clear_last_exception(env, &exception);
237     if (exception != nullptr) {
238         HILOG_ERROR("taskpool::PerformTask occur exception");
239         taskInfo->success = false;
240         NotifyTaskResult(env, taskInfo, exception);
241     } else {
242         worker->NotifyIdle();
243     }
244 }
245 
NotifyTaskResult(napi_env env,TaskInfo * taskInfo,napi_value result)246 void Worker::NotifyTaskResult(napi_env env, TaskInfo* taskInfo, napi_value result)
247 {
248     HITRACE_METER_NAME(HITRACE_TAG_COMMONLIBRARY, __PRETTY_FUNCTION__);
249     napi_value undefined;
250     napi_get_undefined(env, &undefined);
251 
252     napi_value resultData;
253     napi_status status = napi_serialize(env, result, undefined, &resultData);
254     if ((status != napi_ok || resultData == nullptr) && taskInfo->success) {
255         taskInfo->success = false;
256         napi_value err = ErrorHelper::NewError(env, ErrorHelper::WORKERSERIALIZATION_ERROR,
257             "taskpool: failed to serialize result.");
258         NotifyTaskResult(env, taskInfo, err);
259         return;
260     }
261     taskInfo->result = resultData;
262 
263     TaskManager::GetInstance().UpdateState(taskInfo->executeId, TaskState::TERMINATED);
264     TaskManager::GetInstance().PopRunningInfo(taskInfo->taskId, taskInfo->executeId);
265     uv_async_send(taskInfo->onResultSignal);
266 
267     // Warning: The worker thread maybe released for future taskpool shrinkage strategy?
268     Worker* worker = reinterpret_cast<Worker*>(taskInfo->worker);
269     worker->NotifyIdle();
270 }
271 
TaskResultCallback(NativeEngine * engine,NativeValue * value,NativeValue * data)272 void Worker::TaskResultCallback(NativeEngine* engine, NativeValue* value, NativeValue* data)
273 {
274     HITRACE_METER_NAME(HITRACE_TAG_COMMONLIBRARY, __PRETTY_FUNCTION__);
275     if (engine == nullptr) {
276         HILOG_FATAL("taskpool::TaskResultCallback engine is null");
277         return;
278     }
279 
280     auto env = reinterpret_cast<napi_env>(engine);
281     auto result = reinterpret_cast<napi_value>(value);
282 
283     napi_valuetype type;
284     napi_typeof(env, reinterpret_cast<napi_value>(data), &type);
285     if (type != napi_external) {
286         HILOG_INFO("taskpool::TaskResultCallback Concurrent func not called by taskpool.execute");
287         return;
288     }
289 
290     TaskInfo* taskInfo = nullptr;
291     napi_get_value_external(env, reinterpret_cast<napi_value>(data), reinterpret_cast<void**>(&taskInfo));
292     if (taskInfo == nullptr) {
293         HILOG_FATAL("taskpool::TaskResultCallback taskInfo is null");
294         return;
295     }
296 
297     NotifyTaskResult(env, taskInfo, result);
298 }
299 } // namespace Commonlibrary::Concurrent::TaskPoolModule
300