1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "native_safe_async_work.h"
17
18 #include "napi/native_api.h"
19 #include "native_async_work.h"
20 #include "native_engine.h"
21 #include "native_value.h"
22 #include "securec.h"
23 #include "utils/log.h"
24
25 // static methods start
AsyncCallback(uv_async_t * asyncHandler)26 void NativeSafeAsyncWork::AsyncCallback(uv_async_t* asyncHandler)
27 {
28 HILOG_INFO("NativeSafeAsyncWork::AsyncCallback called");
29 NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::asyncHandler_, asyncHandler);
30
31 that->ProcessAsyncHandle();
32 }
33
CallJs(NativeEngine * engine,NativeValue * js_call_func,void * context,void * data)34 void NativeSafeAsyncWork::CallJs(NativeEngine* engine, NativeValue* js_call_func, void* context, void* data)
35 {
36 HILOG_INFO("NativeSafeAsyncWork::CallJs called");
37
38 if (engine == nullptr || js_call_func == nullptr) {
39 HILOG_ERROR("CallJs failed. engine or js_call_func is nullptr!");
40 return;
41 }
42
43 auto value = engine->CreateUndefined();
44 if (value == nullptr) {
45 HILOG_ERROR("CreateUndefined failed");
46 return;
47 }
48
49 auto resultValue = engine->CallFunction(value, js_call_func, nullptr, 0);
50 if (resultValue == nullptr) {
51 HILOG_ERROR("CallFunction failed");
52 }
53 }
54 // static methods end
55
NativeSafeAsyncWork(NativeEngine * engine,NativeValue * func,NativeValue * asyncResource,NativeValue * asyncResourceName,size_t maxQueueSize,size_t threadCount,void * finalizeData,NativeFinalize finalizeCallback,void * context,NativeThreadSafeFunctionCallJs callJsCallback)56 NativeSafeAsyncWork::NativeSafeAsyncWork(NativeEngine* engine,
57 NativeValue* func,
58 NativeValue* asyncResource,
59 NativeValue* asyncResourceName,
60 size_t maxQueueSize,
61 size_t threadCount,
62 void* finalizeData,
63 NativeFinalize finalizeCallback,
64 void* context,
65 NativeThreadSafeFunctionCallJs callJsCallback)
66 :engine_(engine), maxQueueSize_(maxQueueSize),
67 threadCount_(threadCount), finalizeData_(finalizeData), finalizeCallback_(finalizeCallback),
68 context_(context), callJsCallback_(callJsCallback)
69 {
70 errno_t err = EOK;
71 err = memset_s(&asyncContext_, sizeof(asyncContext_), 0, sizeof(asyncContext_));
72 if (err != EOK) {
73 return;
74 }
75
76 asyncContext_.asyncResource = asyncResource;
77 asyncContext_.asyncResourceName = asyncResourceName;
78 if (func != nullptr) {
79 uint32_t initialRefcount = 1;
80 ref_ = engine->CreateReference(func, initialRefcount);
81 }
82 }
83
~NativeSafeAsyncWork()84 NativeSafeAsyncWork::~NativeSafeAsyncWork()
85 {
86 if (ref_ != nullptr) {
87 delete ref_;
88 ref_ = nullptr;
89 }
90 }
91
Init()92 bool NativeSafeAsyncWork::Init()
93 {
94 HILOG_INFO("NativeSafeAsyncWork::Init called");
95
96 uv_loop_t* loop = engine_->GetUVLoop();
97 if (loop == nullptr) {
98 HILOG_ERROR("Get loop failed");
99 return false;
100 }
101
102 int ret = uv_async_init(loop, &asyncHandler_, AsyncCallback);
103 if (ret != 0) {
104 HILOG_ERROR("uv async init failed %d", ret);
105 uv_close(reinterpret_cast<uv_handle_t*>(&asyncHandler_), nullptr);
106 return false;
107 }
108
109 status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_INTE;
110 return true;
111 }
112
IsMaxQueueSize()113 bool NativeSafeAsyncWork::IsMaxQueueSize()
114 {
115 return (queue_.size() > maxQueueSize_ &&
116 maxQueueSize_ > 0 &&
117 status_ != SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING &&
118 status_ != SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED);
119 }
120
Send(void * data,NativeThreadSafeFunctionCallMode mode)121 SafeAsyncCode NativeSafeAsyncWork::Send(void* data, NativeThreadSafeFunctionCallMode mode)
122 {
123 HILOG_INFO("NativeSafeAsyncWork::Send called");
124
125 std::unique_lock<std::mutex> lock(mutex_);
126 if (IsMaxQueueSize()) {
127 HILOG_INFO("queue size bigger than max queue size");
128 if (mode == NATIVE_TSFUNC_BLOCKING) {
129 while (IsMaxQueueSize()) {
130 condition_.wait(lock);
131 }
132 } else {
133 return SafeAsyncCode::SAFE_ASYNC_QUEUE_FULL;
134 }
135 }
136
137 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
138 status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
139 if (threadCount_ == 0) {
140 return SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS;
141 } else {
142 threadCount_--;
143 return SafeAsyncCode::SAFE_ASYNC_CLOSED;
144 }
145 } else {
146 queue_.push(data);
147 auto ret = uv_async_send(&asyncHandler_);
148 if (ret != 0) {
149 HILOG_ERROR("uv async send failed %d", ret);
150 return SafeAsyncCode::SAFE_ASYNC_FAILED;
151 }
152 }
153
154 return SafeAsyncCode::SAFE_ASYNC_OK;
155 }
156
Acquire()157 SafeAsyncCode NativeSafeAsyncWork::Acquire()
158 {
159 HILOG_INFO("NativeSafeAsyncWork::Acquire called");
160
161 std::unique_lock<std::mutex> lock(mutex_);
162
163 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
164 status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
165 HILOG_WARN("Do not acquire, thread is closed!");
166 return SafeAsyncCode::SAFE_ASYNC_CLOSED;
167 }
168
169 // increase thread count
170 threadCount_++;
171
172 return SafeAsyncCode::SAFE_ASYNC_OK;
173 }
174
Release(NativeThreadSafeFunctionReleaseMode mode)175 SafeAsyncCode NativeSafeAsyncWork::Release(NativeThreadSafeFunctionReleaseMode mode)
176 {
177 HILOG_INFO("NativeSafeAsyncWork::Release called");
178
179 std::unique_lock<std::mutex> lock(mutex_);
180
181 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
182 status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
183 HILOG_WARN("Do not release, thread is closed!");
184 return SafeAsyncCode::SAFE_ASYNC_CLOSED;
185 }
186
187 if (threadCount_ == 0) {
188 HILOG_ERROR("Do not release, thread count is zero.");
189 return SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS;
190 }
191
192 // decrease thread count
193 threadCount_--;
194
195 if (mode == NativeThreadSafeFunctionReleaseMode::NATIVE_TSFUNC_ABORT) {
196 status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING;
197 if (maxQueueSize_ > 0) {
198 condition_.notify_one();
199 }
200 }
201
202 if (threadCount_ == 0 ||
203 mode == NativeThreadSafeFunctionReleaseMode::NATIVE_TSFUNC_ABORT) {
204 // trigger async handle
205 auto ret = uv_async_send(&asyncHandler_);
206 if (ret != 0) {
207 HILOG_ERROR("uv async send failed %d", ret);
208 return SafeAsyncCode::SAFE_ASYNC_FAILED;
209 }
210 }
211
212 return SafeAsyncCode::SAFE_ASYNC_OK;
213 }
214
Ref()215 bool NativeSafeAsyncWork::Ref()
216 {
217 if (!IsSameTid()) {
218 HILOG_ERROR("tid not same");
219 return false;
220 }
221
222 uv_ref(reinterpret_cast<uv_handle_t*>(&asyncHandler_));
223
224 return true;
225 }
226
Unref()227 bool NativeSafeAsyncWork::Unref()
228 {
229 if (!IsSameTid()) {
230 HILOG_ERROR("tid not same");
231 return false;
232 }
233
234 uv_unref(reinterpret_cast<uv_handle_t*>(&asyncHandler_));
235
236 return true;
237 }
238
GetContext()239 void* NativeSafeAsyncWork::GetContext()
240 {
241 return context_;
242 }
243
ProcessAsyncHandle()244 void NativeSafeAsyncWork::ProcessAsyncHandle()
245 {
246 HILOG_INFO("NativeSafeAsyncWork::ProcessAsyncHandle called");
247
248 auto scopeManager = engine_->GetScopeManager();
249 if (scopeManager == nullptr) {
250 HILOG_ERROR("scope manager is null");
251 return;
252 }
253
254 std::unique_lock<std::mutex> lock(mutex_);
255 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
256 HILOG_ERROR("Process failed, thread is closed!");
257 return;
258 }
259
260 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
261 HILOG_ERROR("thread is closing!");
262 CloseHandles();
263 return;
264 }
265
266 size_t size = queue_.size();
267 void* data = nullptr;
268
269 HILOG_INFO("queue size %d", (int32_t)size);
270
271 auto nativeScope = scopeManager->Open();
272
273 while (size > 0) {
274 data = queue_.front();
275
276 // when queue is full, notify send.
277 if (size == maxQueueSize_ && maxQueueSize_ > 0) {
278 condition_.notify_one();
279 }
280
281 NativeValue* func_ = (ref_ == nullptr) ? nullptr : ref_->Get();
282 if (callJsCallback_ != nullptr) {
283 callJsCallback_(engine_, func_, context_, data);
284 } else {
285 CallJs(engine_, func_, context_, data);
286 }
287 queue_.pop();
288 size--;
289 }
290
291 if (size == 0 && threadCount_ == 0) {
292 CloseHandles();
293 }
294
295 scopeManager->Close(nativeScope);
296 }
297
CloseHandles()298 SafeAsyncCode NativeSafeAsyncWork::CloseHandles()
299 {
300 HILOG_INFO("NativeSafeAsyncWork::CloseHandles called");
301
302 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
303 HILOG_INFO("Close failed, thread is closed!");
304 return SafeAsyncCode::SAFE_ASYNC_CLOSED;
305 }
306
307 status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED;
308
309 // close async handler
310 uv_close(reinterpret_cast<uv_handle_t*>(&asyncHandler_), [](uv_handle_t* handle) {
311 NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::asyncHandler_,
312 reinterpret_cast<uv_async_t*>(handle));
313 that->CleanUp();
314 });
315
316 return SafeAsyncCode::SAFE_ASYNC_OK;
317 }
318
CleanUp()319 void NativeSafeAsyncWork::CleanUp()
320 {
321 HILOG_INFO("NativeSafeAsyncWork::CleanUp called");
322
323 if (finalizeCallback_ != nullptr) {
324 finalizeCallback_(engine_, finalizeData_, context_);
325 }
326
327 // clean data
328 while (!queue_.empty()) {
329 if (callJsCallback_ != nullptr) {
330 callJsCallback_(nullptr, nullptr, context_, queue_.front());
331 } else {
332 CallJs(nullptr, nullptr, context_, queue_.front());
333 }
334 queue_.pop();
335 }
336 }
337
IsSameTid()338 bool NativeSafeAsyncWork::IsSameTid()
339 {
340 auto tid = pthread_self();
341 return (tid == engine_->GetTid()) ? true : false;
342 }
343