1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "native_safe_async_work.h"
17
18 #include "napi/native_api.h"
19 #include "native_async_work.h"
20 #include "native_engine.h"
21 #include "native_value.h"
22 #include "securec.h"
23 #include "utils/log.h"
24
25 // static methods start
AsyncCallback(uv_async_t * asyncHandler)26 void NativeSafeAsyncWork::AsyncCallback(uv_async_t* asyncHandler)
27 {
28 HILOG_INFO("NativeSafeAsyncWork::AsyncCallback called");
29
30 NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::asyncHandler_, asyncHandler);
31 auto ret = uv_idle_start(&that->idleHandler_, IdleCallback);
32 if (ret != 0) {
33 HILOG_ERROR("uv idle start failed %d", ret);
34 return;
35 }
36 }
37
IdleCallback(uv_idle_t * idleHandler)38 void NativeSafeAsyncWork::IdleCallback(uv_idle_t* idleHandler)
39 {
40 HILOG_INFO("NativeSafeAsyncWork::IdleCallback called");
41 NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::idleHandler_, idleHandler);
42
43 that->ProcessAsyncHandle();
44 }
45
CallJs(NativeEngine * engine,NativeValue * js_call_func,void * context,void * data)46 void NativeSafeAsyncWork::CallJs(NativeEngine* engine, NativeValue* js_call_func, void* context, void* data)
47 {
48 HILOG_INFO("NativeSafeAsyncWork::CallJs called");
49
50 if (engine == nullptr || js_call_func == nullptr) {
51 HILOG_ERROR("CallJs failed. engine or js_call_func is nullptr!");
52 return;
53 }
54
55 auto value = engine->CreateUndefined();
56 if (value == nullptr) {
57 HILOG_ERROR("CreateUndefined failed");
58 return;
59 }
60
61 auto resultValue = engine->CallFunction(value, js_call_func, nullptr, 0);
62 if (resultValue == nullptr) {
63 HILOG_ERROR("CallFunction failed");
64 }
65 }
66 // static methods end
67
NativeSafeAsyncWork(NativeEngine * engine,NativeValue * func,NativeValue * asyncResource,NativeValue * asyncResourceName,size_t maxQueueSize,size_t threadCount,void * finalizeData,NativeFinalize finalizeCallback,void * context,NativeThreadSafeFunctionCallJs callJsCallback)68 NativeSafeAsyncWork::NativeSafeAsyncWork(NativeEngine* engine,
69 NativeValue* func,
70 NativeValue* asyncResource,
71 NativeValue* asyncResourceName,
72 size_t maxQueueSize,
73 size_t threadCount,
74 void* finalizeData,
75 NativeFinalize finalizeCallback,
76 void* context,
77 NativeThreadSafeFunctionCallJs callJsCallback)
78 :engine_(engine), maxQueueSize_(maxQueueSize),
79 threadCount_(threadCount), finalizeData_(finalizeData), finalizeCallback_(finalizeCallback),
80 context_(context), callJsCallback_(callJsCallback)
81 {
82 errno_t err = EOK;
83 err = memset_s(&asyncContext_, sizeof(asyncContext_), 0, sizeof(asyncContext_));
84 if (err != EOK) {
85 return;
86 }
87
88 asyncContext_.asyncResource = asyncResource;
89 asyncContext_.asyncResourceName = asyncResourceName;
90 if (func != nullptr) {
91 uint32_t initialRefcount = 1;
92 ref_ = engine->CreateReference(func, initialRefcount);
93 }
94 }
95
~NativeSafeAsyncWork()96 NativeSafeAsyncWork::~NativeSafeAsyncWork()
97 {
98 if (ref_ != nullptr) {
99 delete ref_;
100 ref_ = nullptr;
101 }
102 }
103
Init()104 bool NativeSafeAsyncWork::Init()
105 {
106 HILOG_INFO("NativeSafeAsyncWork::Init called");
107
108 uv_loop_t* loop = engine_->GetUVLoop();
109 if (loop == nullptr) {
110 HILOG_ERROR("Get loop failed");
111 return false;
112 }
113
114 int ret = uv_async_init(loop, &asyncHandler_, AsyncCallback);
115 if (ret != 0) {
116 HILOG_ERROR("uv async init failed %d", ret);
117 return false;
118 }
119
120 ret = uv_idle_init(loop, &idleHandler_);
121 if (ret != 0) {
122 HILOG_ERROR("uv idle init failed %d", ret);
123 uv_close(reinterpret_cast<uv_handle_t*>(&asyncHandler_), nullptr);
124 return false;
125 }
126
127 status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_INTE;
128 return true;
129 }
130
IsMaxQueueSize()131 bool NativeSafeAsyncWork::IsMaxQueueSize()
132 {
133 return (queue_.size() > maxQueueSize_ &&
134 maxQueueSize_ > 0 &&
135 status_ != SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING &&
136 status_ != SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED);
137 }
138
Send(void * data,NativeThreadSafeFunctionCallMode mode)139 SafeAsyncCode NativeSafeAsyncWork::Send(void* data, NativeThreadSafeFunctionCallMode mode)
140 {
141 HILOG_INFO("NativeSafeAsyncWork::Send called");
142
143 std::unique_lock<std::mutex> lock(mutex_);
144 if (IsMaxQueueSize()) {
145 HILOG_INFO("queue size bigger than max queue size");
146 if (mode == NATIVE_TSFUNC_BLOCKING) {
147 while (IsMaxQueueSize()) {
148 condition_.wait(lock);
149 }
150 } else {
151 return SafeAsyncCode::SAFE_ASYNC_QUEUE_FULL;
152 }
153 }
154
155 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
156 status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
157 if (threadCount_ == 0) {
158 return SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS;
159 } else {
160 threadCount_--;
161 return SafeAsyncCode::SAFE_ASYNC_CLOSED;
162 }
163 } else {
164 queue_.push(data);
165 auto ret = uv_async_send(&asyncHandler_);
166 if (ret != 0) {
167 HILOG_ERROR("uv async send failed %d", ret);
168 return SafeAsyncCode::SAFE_ASYNC_FAILED;
169 }
170 }
171
172 return SafeAsyncCode::SAFE_ASYNC_OK;
173 }
174
Acquire()175 SafeAsyncCode NativeSafeAsyncWork::Acquire()
176 {
177 HILOG_INFO("NativeSafeAsyncWork::Acquire called");
178
179 std::unique_lock<std::mutex> lock(mutex_);
180
181 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
182 status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
183 HILOG_WARN("Do not acquire, thread is closed!");
184 return SafeAsyncCode::SAFE_ASYNC_CLOSED;
185 }
186
187 // increase thread count
188 threadCount_++;
189
190 return SafeAsyncCode::SAFE_ASYNC_OK;
191 }
192
Release(NativeThreadSafeFunctionReleaseMode mode)193 SafeAsyncCode NativeSafeAsyncWork::Release(NativeThreadSafeFunctionReleaseMode mode)
194 {
195 HILOG_INFO("NativeSafeAsyncWork::Release called");
196
197 std::unique_lock<std::mutex> lock(mutex_);
198
199 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
200 status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
201 HILOG_WARN("Do not release, thread is closed!");
202 return SafeAsyncCode::SAFE_ASYNC_CLOSED;
203 }
204
205 if (threadCount_ == 0) {
206 HILOG_ERROR("Do not release, thread count is zero.");
207 return SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS;
208 }
209
210 // decrease thread count
211 threadCount_--;
212
213 if (mode == NativeThreadSafeFunctionReleaseMode::NATIVE_TSFUNC_ABORT) {
214 status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING;
215 if (maxQueueSize_ > 0) {
216 condition_.notify_one();
217 }
218 }
219
220 if (threadCount_ == 0 ||
221 mode == NativeThreadSafeFunctionReleaseMode::NATIVE_TSFUNC_ABORT) {
222 // trigger async handle
223 auto ret = uv_async_send(&asyncHandler_);
224 if (ret != 0) {
225 HILOG_ERROR("uv async send failed %d", ret);
226 return SafeAsyncCode::SAFE_ASYNC_FAILED;
227 }
228 }
229
230 return SafeAsyncCode::SAFE_ASYNC_OK;
231 }
232
Ref()233 bool NativeSafeAsyncWork::Ref()
234 {
235 if (!IsSameTid()) {
236 HILOG_ERROR("tid not same");
237 return false;
238 }
239
240 uv_ref(reinterpret_cast<uv_handle_t*>(&asyncHandler_));
241 uv_ref(reinterpret_cast<uv_handle_t*>(&idleHandler_));
242
243 return true;
244 }
245
Unref()246 bool NativeSafeAsyncWork::Unref()
247 {
248 if (!IsSameTid()) {
249 HILOG_ERROR("tid not same");
250 return false;
251 }
252
253 uv_unref(reinterpret_cast<uv_handle_t*>(&asyncHandler_));
254 uv_unref(reinterpret_cast<uv_handle_t*>(&idleHandler_));
255
256 return true;
257 }
258
GetContext()259 void* NativeSafeAsyncWork::GetContext()
260 {
261 return context_;
262 }
263
ProcessAsyncHandle()264 void NativeSafeAsyncWork::ProcessAsyncHandle()
265 {
266 HILOG_INFO("NativeSafeAsyncWork::ProcessAsyncHandle called");
267
268 std::unique_lock<std::mutex> lock(mutex_);
269 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
270 HILOG_ERROR("Process failed, thread is closed!");
271 return;
272 }
273
274 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
275 HILOG_ERROR("thread is closing!");
276
277 if (uv_idle_stop(&idleHandler_) != 0) {
278 HILOG_ERROR("uv idle stop failed");
279 }
280
281 CloseHandles();
282 return;
283 }
284
285 size_t size = queue_.size();
286 void* data = nullptr;
287
288 HILOG_INFO("queue size %d", (int32_t)size);
289 while (size > 0) {
290 data = queue_.front();
291
292 // when queue is full, notify send.
293 if (size == maxQueueSize_ && maxQueueSize_ > 0) {
294 condition_.notify_one();
295 }
296
297 NativeValue* func_ = (ref_ == nullptr) ? nullptr : ref_->Get();
298 if (callJsCallback_ != nullptr) {
299 callJsCallback_(engine_, func_, context_, data);
300 } else {
301 CallJs(engine_, func_, context_, data);
302 }
303 queue_.pop();
304 size--;
305 }
306
307 if (size == 0) {
308 if (uv_idle_stop(&idleHandler_) != 0) {
309 HILOG_ERROR("uv idle stop failed");
310 }
311 if (threadCount_ == 0) {
312 CloseHandles();
313 }
314 }
315 }
316
CloseHandles()317 SafeAsyncCode NativeSafeAsyncWork::CloseHandles()
318 {
319 HILOG_INFO("NativeSafeAsyncWork::CloseHandles called");
320
321 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
322 HILOG_INFO("Close failed, thread is closed!");
323 return SafeAsyncCode::SAFE_ASYNC_CLOSED;
324 }
325
326 status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED;
327
328 // close async handler
329 uv_close(reinterpret_cast<uv_handle_t*>(&asyncHandler_), [](uv_handle_t* handle) {
330 NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::asyncHandler_,
331 reinterpret_cast<uv_async_t*>(handle));
332 // close idle handler
333 uv_close(reinterpret_cast<uv_handle_t*>(&that->idleHandler_), [](uv_handle_t* handle) {
334 NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::idleHandler_,
335 reinterpret_cast<uv_idle_t*>(handle));
336 that->CleanUp();
337 });
338 });
339
340 return SafeAsyncCode::SAFE_ASYNC_OK;
341 }
342
CleanUp()343 void NativeSafeAsyncWork::CleanUp()
344 {
345 HILOG_INFO("NativeSafeAsyncWork::CleanUp called");
346
347 if (finalizeCallback_ != nullptr) {
348 finalizeCallback_(engine_, finalizeData_, context_);
349 }
350
351 // clean data
352 while (!queue_.empty()) {
353 if (callJsCallback_ != nullptr) {
354 callJsCallback_(nullptr, nullptr, context_, queue_.front());
355 } else {
356 CallJs(nullptr, nullptr, context_, queue_.front());
357 }
358 queue_.pop();
359 }
360 }
361
IsSameTid()362 bool NativeSafeAsyncWork::IsSameTid()
363 {
364 auto tid = pthread_self();
365 return (tid == engine_->GetTid()) ? true : false;
366 }
367