1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "native_safe_async_work.h"
17
18 #include "ecmascript/napi/include/jsnapi.h"
19 #include "napi/native_api.h"
20 #include "native_async_work.h"
21 #include "native_engine.h"
22 #include "native_value.h"
23 #include "securec.h"
24 #include "utils/log.h"
25
26 // static methods start
AsyncCallback(uv_async_t * asyncHandler)27 void NativeSafeAsyncWork::AsyncCallback(uv_async_t* asyncHandler)
28 {
29 HILOG_DEBUG("NativeSafeAsyncWork::AsyncCallback called");
30 NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::asyncHandler_, asyncHandler);
31
32 that->ProcessAsyncHandle();
33 }
34
CallJs(NativeEngine * engine,napi_value js_call_func,void * context,void * data)35 void NativeSafeAsyncWork::CallJs(NativeEngine* engine, napi_value js_call_func, void* context, void* data)
36 {
37 if (engine == nullptr || js_call_func == nullptr) {
38 HILOG_ERROR("CallJs failed. engine or js_call_func is nullptr!");
39 return;
40 }
41 napi_value value = nullptr;
42 napi_get_undefined(reinterpret_cast<napi_env>(engine), &value);
43 if (value == nullptr) {
44 HILOG_ERROR("CreateUndefined failed");
45 return;
46 }
47
48 auto resultValue = engine->CallFunction(value, js_call_func, nullptr, 0);
49 if (resultValue == nullptr) {
50 HILOG_ERROR("CallFunction failed");
51 }
52 }
53
NativeSafeAsyncWork(NativeEngine * engine,napi_value func,napi_value asyncResource,napi_value asyncResourceName,size_t maxQueueSize,size_t threadCount,void * finalizeData,NativeFinalize finalizeCallback,void * context,NativeThreadSafeFunctionCallJs callJsCallback)54 NativeSafeAsyncWork::NativeSafeAsyncWork(NativeEngine* engine,
55 napi_value func,
56 napi_value asyncResource,
57 napi_value asyncResourceName,
58 size_t maxQueueSize,
59 size_t threadCount,
60 void* finalizeData,
61 NativeFinalize finalizeCallback,
62 void* context,
63 NativeThreadSafeFunctionCallJs callJsCallback)
64 :engine_(engine), maxQueueSize_(maxQueueSize),
65 threadCount_(threadCount), finalizeData_(finalizeData), finalizeCallback_(finalizeCallback),
66 context_(context), callJsCallback_(callJsCallback)
67 {
68 errno_t err = EOK;
69 err = memset_s(&asyncContext_, sizeof(asyncContext_), 0, sizeof(asyncContext_));
70 if (err != EOK) {
71 return;
72 }
73
74 asyncContext_.napiAsyncResource = asyncResource;
75 asyncContext_.napiAsyncResourceName = asyncResourceName;
76 if (func != nullptr) {
77 uint32_t initialRefcount = 1;
78 ref_ = engine->CreateReference(func, initialRefcount);
79 }
80 }
81
~NativeSafeAsyncWork()82 NativeSafeAsyncWork::~NativeSafeAsyncWork()
83 {
84 if (ref_ != nullptr) {
85 delete ref_;
86 ref_ = nullptr;
87 }
88 }
89
Init()90 bool NativeSafeAsyncWork::Init()
91 {
92 HILOG_DEBUG("NativeSafeAsyncWork::Init called");
93
94 uv_loop_t* loop = engine_->GetUVLoop();
95 if (loop == nullptr) {
96 HILOG_ERROR("Get loop failed");
97 return false;
98 }
99
100 int ret = uv_async_init(loop, &asyncHandler_, AsyncCallback);
101 if (ret != 0) {
102 HILOG_ERROR("uv async init failed %d", ret);
103 uv_close(reinterpret_cast<uv_handle_t*>(&asyncHandler_), nullptr);
104 return false;
105 }
106
107 status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_INTE;
108 return true;
109 }
110
IsMaxQueueSize()111 bool NativeSafeAsyncWork::IsMaxQueueSize()
112 {
113 return (queue_.size() > maxQueueSize_ &&
114 maxQueueSize_ > 0 &&
115 status_ != SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING &&
116 status_ != SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED);
117 }
118
Send(void * data,NativeThreadSafeFunctionCallMode mode)119 SafeAsyncCode NativeSafeAsyncWork::Send(void* data, NativeThreadSafeFunctionCallMode mode)
120 {
121 std::unique_lock<std::mutex> lock(mutex_);
122 if (IsMaxQueueSize()) {
123 HILOG_INFO("queue size bigger than max queue size");
124 if (mode == NATIVE_TSFUNC_BLOCKING) {
125 while (IsMaxQueueSize()) {
126 condition_.wait(lock);
127 }
128 } else {
129 return SafeAsyncCode::SAFE_ASYNC_QUEUE_FULL;
130 }
131 }
132
133 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
134 status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
135 if (threadCount_ == 0) {
136 return SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS;
137 } else {
138 threadCount_--;
139 return SafeAsyncCode::SAFE_ASYNC_CLOSED;
140 }
141 } else {
142 queue_.emplace(data);
143 auto ret = uv_async_send(&asyncHandler_);
144 if (ret != 0) {
145 HILOG_ERROR("uv async send failed %d", ret);
146 return SafeAsyncCode::SAFE_ASYNC_FAILED;
147 }
148 }
149
150 return SafeAsyncCode::SAFE_ASYNC_OK;
151 }
152
Acquire()153 SafeAsyncCode NativeSafeAsyncWork::Acquire()
154 {
155 HILOG_DEBUG("NativeSafeAsyncWork::Acquire called");
156
157 std::unique_lock<std::mutex> lock(mutex_);
158
159 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
160 status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
161 HILOG_WARN("Do not acquire, thread is closed!");
162 return SafeAsyncCode::SAFE_ASYNC_CLOSED;
163 }
164
165 // increase thread count
166 threadCount_++;
167
168 return SafeAsyncCode::SAFE_ASYNC_OK;
169 }
170
Release(NativeThreadSafeFunctionReleaseMode mode)171 SafeAsyncCode NativeSafeAsyncWork::Release(NativeThreadSafeFunctionReleaseMode mode)
172 {
173 HILOG_DEBUG("NativeSafeAsyncWork::Release called");
174
175 std::unique_lock<std::mutex> lock(mutex_);
176
177 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
178 status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
179 HILOG_WARN("Do not release, thread is closed!");
180 return SafeAsyncCode::SAFE_ASYNC_CLOSED;
181 }
182
183 if (threadCount_ == 0) {
184 HILOG_ERROR("Do not release, thread count is zero.");
185 return SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS;
186 }
187
188 // decrease thread count
189 threadCount_--;
190
191 if (mode == NativeThreadSafeFunctionReleaseMode::NATIVE_TSFUNC_ABORT) {
192 status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING;
193 if (maxQueueSize_ > 0) {
194 condition_.notify_one();
195 }
196 }
197
198 if (threadCount_ == 0 ||
199 mode == NativeThreadSafeFunctionReleaseMode::NATIVE_TSFUNC_ABORT) {
200 // trigger async handle
201 auto ret = uv_async_send(&asyncHandler_);
202 if (ret != 0) {
203 HILOG_ERROR("uv async send failed %d", ret);
204 return SafeAsyncCode::SAFE_ASYNC_FAILED;
205 }
206 }
207
208 return SafeAsyncCode::SAFE_ASYNC_OK;
209 }
210
Ref()211 bool NativeSafeAsyncWork::Ref()
212 {
213 if (!IsSameTid()) {
214 HILOG_ERROR("tid not same");
215 return false;
216 }
217
218 uv_ref(reinterpret_cast<uv_handle_t*>(&asyncHandler_));
219
220 return true;
221 }
222
Unref()223 bool NativeSafeAsyncWork::Unref()
224 {
225 if (!IsSameTid()) {
226 HILOG_ERROR("tid not same");
227 return false;
228 }
229
230 uv_unref(reinterpret_cast<uv_handle_t*>(&asyncHandler_));
231
232 return true;
233 }
234
GetContext()235 void* NativeSafeAsyncWork::GetContext()
236 {
237 return context_;
238 }
239
ProcessAsyncHandle()240 void NativeSafeAsyncWork::ProcessAsyncHandle()
241 {
242 std::unique_lock<std::mutex> lock(mutex_);
243 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
244 HILOG_ERROR("Process failed, thread is closed!");
245 return;
246 }
247
248 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
249 HILOG_ERROR("thread is closing!");
250 CloseHandles();
251 return;
252 }
253
254 size_t size = queue_.size();
255 void* data = nullptr;
256
257 auto vm = engine_->GetEcmaVm();
258 panda::LocalScope scope(vm);
259 TryCatch tryCatch(reinterpret_cast<napi_env>(engine_));
260 while (size > 0) {
261 data = queue_.front();
262
263 // when queue is full, notify send.
264 if (size == maxQueueSize_ && maxQueueSize_ > 0) {
265 condition_.notify_one();
266 }
267
268 napi_value func_ = (ref_ == nullptr) ? nullptr : ref_->Get();
269 if (callJsCallback_ != nullptr) {
270 callJsCallback_(engine_, func_, context_, data);
271 } else {
272 CallJs(engine_, func_, context_, data);
273 }
274
275 if (tryCatch.HasCaught()) {
276 engine_->HandleUncaughtException();
277 }
278 queue_.pop();
279 size--;
280 }
281
282 if (size == 0 && threadCount_ == 0) {
283 CloseHandles();
284 }
285 }
286
CloseHandles()287 SafeAsyncCode NativeSafeAsyncWork::CloseHandles()
288 {
289 HILOG_DEBUG("NativeSafeAsyncWork::CloseHandles called");
290
291 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
292 HILOG_INFO("Close failed, thread is closed!");
293 return SafeAsyncCode::SAFE_ASYNC_CLOSED;
294 }
295
296 status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED;
297
298 // close async handler
299 uv_close(reinterpret_cast<uv_handle_t*>(&asyncHandler_), [](uv_handle_t* handle) {
300 NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::asyncHandler_,
301 reinterpret_cast<uv_async_t*>(handle));
302 that->CleanUp();
303 });
304
305 return SafeAsyncCode::SAFE_ASYNC_OK;
306 }
307
CleanUp()308 void NativeSafeAsyncWork::CleanUp()
309 {
310 HILOG_DEBUG("NativeSafeAsyncWork::CleanUp called");
311
312 if (finalizeCallback_ != nullptr) {
313 finalizeCallback_(engine_, finalizeData_, context_);
314 }
315
316 // clean data
317 while (!queue_.empty()) {
318 if (callJsCallback_ != nullptr) {
319 callJsCallback_(nullptr, nullptr, context_, queue_.front());
320 } else {
321 CallJs(nullptr, nullptr, context_, queue_.front());
322 }
323 queue_.pop();
324 }
325 }
326
IsSameTid()327 bool NativeSafeAsyncWork::IsSameTid()
328 {
329 auto tid = pthread_self();
330 return (tid == engine_->GetTid()) ? true : false;
331 }
332