1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include "native_safe_async_work.h"
17
18 #include "napi/native_api.h"
19 #include "native_async_work.h"
20 #include "native_engine.h"
21 #include "native_value.h"
22 #include "securec.h"
23 #include "utils/log.h"
24
25 // static methods start
AsyncCallback(uv_async_t * asyncHandler)26 void NativeSafeAsyncWork::AsyncCallback(uv_async_t* asyncHandler)
27 {
28 HILOG_INFO("NativeSafeAsyncWork::AsyncCallback called");
29
30 NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::asyncHandler_, asyncHandler);
31 auto ret = uv_idle_start(&that->idleHandler_, IdleCallback);
32 if (ret != 0) {
33 HILOG_ERROR("uv idle start failed %d", ret);
34 return;
35 }
36 }
37
IdleCallback(uv_idle_t * idleHandler)38 void NativeSafeAsyncWork::IdleCallback(uv_idle_t* idleHandler)
39 {
40 HILOG_INFO("NativeSafeAsyncWork::IdleCallback called");
41 NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::idleHandler_, idleHandler);
42
43 that->ProcessAsyncHandle();
44 }
45
CallJs(NativeEngine * engine,NativeValue * js_call_func,void * context,void * data)46 void NativeSafeAsyncWork::CallJs(NativeEngine* engine, NativeValue* js_call_func, void* context, void* data)
47 {
48 HILOG_INFO("NativeSafeAsyncWork::CallJs called");
49
50 if (engine == nullptr || js_call_func == nullptr) {
51 HILOG_ERROR("CallJs failed. engine or js_call_func is nullptr!");
52 return;
53 }
54
55 auto value = engine->CreateUndefined();
56 if (value == nullptr) {
57 HILOG_ERROR("CreateUndefined failed");
58 return;
59 }
60
61 auto resultValue = engine->CallFunction(value, js_call_func, nullptr, 0);
62 if (resultValue == nullptr) {
63 HILOG_ERROR("CallFunction failed");
64 }
65 }
66 // static methods end
67
NativeSafeAsyncWork(NativeEngine * engine,NativeValue * func,NativeValue * asyncResource,NativeValue * asyncResourceName,size_t maxQueueSize,size_t threadCount,void * finalizeData,NativeFinalize finalizeCallback,void * context,NativeThreadSafeFunctionCallJs callJsCallback)68 NativeSafeAsyncWork::NativeSafeAsyncWork(NativeEngine* engine,
69 NativeValue* func,
70 NativeValue* asyncResource,
71 NativeValue* asyncResourceName,
72 size_t maxQueueSize,
73 size_t threadCount,
74 void* finalizeData,
75 NativeFinalize finalizeCallback,
76 void* context,
77 NativeThreadSafeFunctionCallJs callJsCallback)
78 :engine_(engine), func_(func), maxQueueSize_(maxQueueSize),
79 threadCount_(threadCount), finalizeData_(finalizeData), finalizeCallback_(finalizeCallback),
80 context_(context), callJsCallback_(callJsCallback)
81 {
82 errno_t err = EOK;
83 err = memset_s(&asyncContext_, sizeof(asyncContext_), 0, sizeof(asyncContext_));
84 if (err != EOK) {
85 return;
86 }
87
88 asyncContext_.asyncResource = asyncResource;
89 asyncContext_.asyncResourceName = asyncResourceName;
90 }
91
~NativeSafeAsyncWork()92 NativeSafeAsyncWork::~NativeSafeAsyncWork() {}
93
Init()94 bool NativeSafeAsyncWork::Init()
95 {
96 HILOG_INFO("NativeSafeAsyncWork::Init called");
97
98 uv_loop_t* loop = engine_->GetUVLoop();
99 if (loop == nullptr) {
100 HILOG_ERROR("Get loop failed");
101 return false;
102 }
103
104 int ret = uv_async_init(loop, &asyncHandler_, AsyncCallback);
105 if (ret != 0) {
106 HILOG_ERROR("uv async init failed %d", ret);
107 return false;
108 }
109
110 ret = uv_idle_init(loop, &idleHandler_);
111 if (ret != 0) {
112 HILOG_ERROR("uv idle init failed %d", ret);
113 uv_close(reinterpret_cast<uv_handle_t*>(&asyncHandler_), nullptr);
114 return false;
115 }
116
117 status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_INTE;
118 return true;
119 }
120
Send(void * data,NativeThreadSafeFunctionCallMode mode)121 SafeAsyncCode NativeSafeAsyncWork::Send(void* data, NativeThreadSafeFunctionCallMode mode)
122 {
123 HILOG_INFO("NativeSafeAsyncWork::Send called");
124
125 std::unique_lock<std::mutex> lock(mutex_);
126
127 if (queue_.size() > maxQueueSize_ &&
128 maxQueueSize_ > 0 &&
129 status_ != SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING &&
130 status_ != SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
131 HILOG_INFO("queue size bigger than max queue size");
132
133 if (mode == NATIVE_TSFUNC_NONBLOCKING) {
134 return SafeAsyncCode::SAFE_ASYNC_QUEUE_FULL;
135 }
136 condition_.wait(lock);
137 }
138
139 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
140 status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
141 if (threadCount_ == 0) {
142 return SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS;
143 } else {
144 threadCount_--;
145 return SafeAsyncCode::SAFE_ASYNC_CLOSED;
146 }
147 } else {
148 queue_.push(data);
149 auto ret = uv_async_send(&asyncHandler_);
150 if (ret != 0) {
151 HILOG_ERROR("uv async send failed %d", ret);
152 return SafeAsyncCode::SAFE_ASYNC_FAILED;
153 }
154 }
155
156 return SafeAsyncCode::SAFE_ASYNC_OK;
157 }
158
Acquire()159 SafeAsyncCode NativeSafeAsyncWork::Acquire()
160 {
161 HILOG_INFO("NativeSafeAsyncWork::Acquire called");
162
163 std::unique_lock<std::mutex> lock(mutex_);
164
165 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
166 status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
167 HILOG_WARN("Do not acquire, thread is closed!");
168 return SafeAsyncCode::SAFE_ASYNC_CLOSED;
169 }
170
171 // increase thread count
172 threadCount_++;
173
174 return SafeAsyncCode::SAFE_ASYNC_OK;
175 }
176
Release(NativeThreadSafeFunctionReleaseMode mode)177 SafeAsyncCode NativeSafeAsyncWork::Release(NativeThreadSafeFunctionReleaseMode mode)
178 {
179 HILOG_INFO("NativeSafeAsyncWork::Release called");
180
181 std::unique_lock<std::mutex> lock(mutex_);
182
183 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
184 status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
185 HILOG_WARN("Do not release, thread is closed!");
186 return SafeAsyncCode::SAFE_ASYNC_CLOSED;
187 }
188
189 if (threadCount_ == 0) {
190 HILOG_ERROR("Do not release, thread count is zero.");
191 return SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS;
192 }
193
194 // decrease thread count
195 threadCount_--;
196
197 if (mode == NativeThreadSafeFunctionReleaseMode::NATIVE_TSFUNC_ABORT) {
198 status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING;
199 if (maxQueueSize_ > 0) {
200 condition_.notify_one();
201 }
202 }
203
204 if (threadCount_ == 0 ||
205 mode == NativeThreadSafeFunctionReleaseMode::NATIVE_TSFUNC_ABORT) {
206 // trigger async handle
207 auto ret = uv_async_send(&asyncHandler_);
208 if (ret != 0) {
209 HILOG_ERROR("uv async send failed %d", ret);
210 return SafeAsyncCode::SAFE_ASYNC_FAILED;
211 }
212 }
213
214 return SafeAsyncCode::SAFE_ASYNC_OK;
215 }
216
Ref()217 bool NativeSafeAsyncWork::Ref()
218 {
219 if (!IsSameTid()) {
220 HILOG_ERROR("tid not same");
221 return false;
222 }
223
224 uv_ref(reinterpret_cast<uv_handle_t*>(&asyncHandler_));
225 uv_ref(reinterpret_cast<uv_handle_t*>(&idleHandler_));
226
227 return true;
228 }
229
Unref()230 bool NativeSafeAsyncWork::Unref()
231 {
232 if (!IsSameTid()) {
233 HILOG_ERROR("tid not same");
234 return false;
235 }
236
237 uv_unref(reinterpret_cast<uv_handle_t*>(&asyncHandler_));
238 uv_unref(reinterpret_cast<uv_handle_t*>(&idleHandler_));
239
240 return true;
241 }
242
GetContext()243 void* NativeSafeAsyncWork::GetContext()
244 {
245 return context_;
246 }
247
ProcessAsyncHandle()248 void NativeSafeAsyncWork::ProcessAsyncHandle()
249 {
250 HILOG_INFO("NativeSafeAsyncWork::ProcessAsyncHandle called");
251
252 std::unique_lock<std::mutex> lock(mutex_);
253 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
254 HILOG_ERROR("Process failed, thread is closed!");
255 return;
256 }
257
258 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
259 HILOG_ERROR("thread is closing!");
260
261 if (uv_idle_stop(&idleHandler_) != 0) {
262 HILOG_ERROR("uv idle stop failed");
263 }
264
265 CloseHandles();
266 return;
267 }
268
269 size_t size = queue_.size();
270 void* data = nullptr;
271
272 HILOG_INFO("queue size %d", (int32_t)size);
273 if (size > 0) {
274 data = queue_.front();
275 queue_.pop();
276
277 // when queue is full, notify send.
278 if (size == maxQueueSize_ && maxQueueSize_ > 0) {
279 condition_.notify_one();
280 }
281
282 if (callJsCallback_ != nullptr) {
283 callJsCallback_(engine_, func_, context_, data);
284 } else {
285 CallJs(engine_, func_, context_, data);
286 }
287 size--;
288 }
289
290 if (size == 0) {
291 if (uv_idle_stop(&idleHandler_) != 0) {
292 HILOG_ERROR("uv idle stop failed");
293 }
294 if (threadCount_ == 0) {
295 CloseHandles();
296 }
297 }
298 }
299
CloseHandles()300 SafeAsyncCode NativeSafeAsyncWork::CloseHandles()
301 {
302 HILOG_INFO("NativeSafeAsyncWork::CloseHandles called");
303
304 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
305 HILOG_INFO("Close failed, thread is closed!");
306 return SafeAsyncCode::SAFE_ASYNC_CLOSED;
307 }
308
309 status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED;
310
311 // close async handler
312 uv_close(reinterpret_cast<uv_handle_t*>(&asyncHandler_), [](uv_handle_t* handle) {
313 NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::asyncHandler_,
314 reinterpret_cast<uv_async_t*>(handle));
315 // close idle handler
316 uv_close(reinterpret_cast<uv_handle_t*>(&that->idleHandler_), [](uv_handle_t* handle) {
317 NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::idleHandler_,
318 reinterpret_cast<uv_idle_t*>(handle));
319 that->CleanUp();
320 });
321 });
322
323 return SafeAsyncCode::SAFE_ASYNC_OK;
324 }
325
CleanUp()326 void NativeSafeAsyncWork::CleanUp()
327 {
328 HILOG_INFO("NativeSafeAsyncWork::CleanUp called");
329
330 if (finalizeCallback_ != nullptr) {
331 finalizeCallback_(engine_, finalizeData_, context_);
332 }
333
334 // clean data
335 while (!queue_.empty()) {
336 if (callJsCallback_ != nullptr) {
337 callJsCallback_(nullptr, nullptr, context_, queue_.front());
338 } else {
339 CallJs(nullptr, nullptr, context_, queue_.front());
340 }
341 queue_.pop();
342 }
343 }
344
IsSameTid()345 bool NativeSafeAsyncWork::IsSameTid()
346 {
347 auto tid = pthread_self();
348 return (tid == engine_->GetTid()) ? true : false;
349 }
350