1 /*
2 * Copyright (c) 2021 Huawei Device Co., Ltd.
3 * Licensed under the Apache License, Version 2.0 (the "License");
4 * you may not use this file except in compliance with the License.
5 * You may obtain a copy of the License at
6 *
7 * http://www.apache.org/licenses/LICENSE-2.0
8 *
9 * Unless required by applicable law or agreed to in writing, software
10 * distributed under the License is distributed on an "AS IS" BASIS,
11 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 * See the License for the specific language governing permissions and
13 * limitations under the License.
14 */
15
16 #include <cinttypes>
17
18 #include "native_safe_async_work.h"
19
20 #include "native_api_internal.h"
21 #include <securec.h>
22
23 #ifdef ENABLE_HITRACE
24 #include "hitrace_meter.h"
25 #include "parameter.h"
26 #endif
27
28 #ifdef ENABLE_CONTAINER_SCOPE
29 #include "core/common/container_scope.h"
30 #endif
31
32 #ifdef ENABLE_CONTAINER_SCOPE
33 using OHOS::Ace::ContainerScope;
34 #endif
35
36 #if defined(ENABLE_EVENT_HANDLER)
37 #include "event_handler.h"
38 using namespace OHOS::AppExecFwk;
39 #endif
40
41 #ifdef ENABLE_HITRACE
42 std::atomic<bool> g_SafeWorkTraceIdEnabled(false);
43 std::once_flag g_SafeWorkParamUpdated;
44 constexpr size_t TRACEID_PARAM_SIZE = 10;
45 using namespace OHOS::HiviewDFX;
46 #endif
47
48 // static methods start
AsyncCallback(uv_async_t * asyncHandler)49 void NativeSafeAsyncWork::AsyncCallback(uv_async_t* asyncHandler)
50 {
51 HILOG_DEBUG("NativeSafeAsyncWork::AsyncCallback called");
52 NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::asyncHandler_, asyncHandler);
53 if (that == nullptr) {
54 HILOG_ERROR("NativeSafeAsyncWork:: DereferenceOf failed!");
55 return;
56 }
57 that->ProcessAsyncHandle();
58 }
59
CallJs(NativeEngine * engine,napi_value js_call_func,void * context,void * data)60 void NativeSafeAsyncWork::CallJs(NativeEngine* engine, napi_value js_call_func, void* context, void* data)
61 {
62 if (engine == nullptr || js_call_func == nullptr) {
63 HILOG_ERROR("CallJs failed. engine or js_call_func is nullptr!");
64 return;
65 }
66 napi_value value = nullptr;
67 napi_get_undefined(reinterpret_cast<napi_env>(engine), &value);
68 if (value == nullptr) {
69 HILOG_ERROR("CreateUndefined failed");
70 return;
71 }
72
73 auto resultValue = engine->CallFunction(value, js_call_func, nullptr, 0);
74 if (resultValue == nullptr) {
75 HILOG_ERROR("CallFunction failed");
76 }
77 }
78
// Constructs a thread-safe function wrapper bound to |engine|.
// |func| is the optional JS callback to invoke on the loop thread; when
// non-null a strong reference (refcount 1) is taken so it outlives callers.
// |maxQueueSize| == 0 means the data queue is unbounded.
// |callJsCallback| overrides the default CallJs invocation when non-null.
// NOTE: the uv handle is only zeroed here; Init() must be called before use.
NativeSafeAsyncWork::NativeSafeAsyncWork(NativeEngine* engine,
                                         napi_value func,
                                         napi_value asyncResource,
                                         napi_value asyncResourceName,
                                         size_t maxQueueSize,
                                         size_t threadCount,
                                         void* finalizeData,
                                         NativeFinalize finalizeCallback,
                                         void* context,
                                         NativeThreadSafeFunctionCallJs callJsCallback)
    :engine_(engine), engineId_(engine->GetId()), maxQueueSize_(maxQueueSize),
    threadCount_(threadCount), finalizeData_(finalizeData), finalizeCallback_(finalizeCallback),
    context_(context), callJsCallback_(callJsCallback)
{
    asyncContext_.napiAsyncResource = asyncResource;
    asyncContext_.napiAsyncResourceName = asyncResourceName;

    // Zero the uv handle so a failed/never-called Init() leaves it inert.
    errno_t err = EOK;
    err = memset_s(&asyncHandler_, sizeof(asyncHandler_), 0, sizeof(asyncHandler_));
    if (err != EOK) {
        HILOG_ERROR("faild to init asyncHandler_");
        return;
    }

    if (func != nullptr) {
        uint32_t initialRefcount = 1;
        ref_ = engine->CreateReference(func, initialRefcount);
    }

#ifdef ENABLE_CONTAINER_SCOPE
    // Capture the creating thread's ArkUI container scope so JS callbacks
    // later run under the same UI instance.
    if (engine->IsContainerScopeEnabled()) {
        containerScopeId_ = ContainerScope::CurrentId();
    }
#endif

#if defined(ENABLE_EVENT_HANDLER)
    // If the creating thread has an event runner, remember a handler on it so
    // PostTask() can dispatch work back to this thread.
    std::shared_ptr<EventRunner> runner = EventRunner::Current();
    if (runner != nullptr) {
        eventHandler_ = std::make_shared<EventHandler>(runner);
    }
#endif

    InitSafeAsyncWorkTraceId();
}
123
NativeSafeAsyncWork::~NativeSafeAsyncWork()
{
    // Drop the strong reference to the JS callback taken in the constructor.
    if (ref_ != nullptr) {
        delete ref_;
        ref_ = nullptr;
    }

    // Counterpart of IncreaseActiveTsfnCounter() in Init().
    // NOTE(review): if Init() failed (or was never called) the counter was
    // never increased, so this decrement looks unbalanced — confirm whether
    // the engine tolerates that.
    engine_->DecreaseActiveTsfnCounter();
}
133
Init()134 bool NativeSafeAsyncWork::Init()
135 {
136 HILOG_DEBUG("NativeSafeAsyncWork::Init called");
137
138 uv_loop_t* loop = nullptr;
139 if (engine_->IsMainEnvContext()) {
140 loop = engine_->GetUVLoop();
141 } else {
142 loop = engine_->GetParent()->GetUVLoop();
143 }
144
145 if (loop == nullptr) {
146 HILOG_ERROR("Get loop failed");
147 return false;
148 }
149
150 int ret = uv_async_init(loop, &asyncHandler_, AsyncCallback);
151 if (ret != 0) {
152 HILOG_ERROR("uv async send failed in Init ret = %{public}d", ret);
153 return false;
154 }
155
156 // only uv_async init successd will leak to uv_loop
157 engine_->IncreaseActiveTsfnCounter();
158
159 status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_INTE;
160 return true;
161 }
162
IsMaxQueueSize()163 bool NativeSafeAsyncWork::IsMaxQueueSize()
164 {
165 return (queue_.size() > maxQueueSize_ &&
166 maxQueueSize_ > 0 &&
167 status_ != SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING &&
168 status_ != SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED);
169 }
170
ValidEngineCheck()171 SafeAsyncCode NativeSafeAsyncWork::ValidEngineCheck()
172 {
173 if (!NativeEngine::IsAlive(engine_)) {
174 HILOG_WARN("napi_env has been destoryed");
175 return SafeAsyncCode::SAFE_ASYNC_FAILED;
176 } else if (engineId_ != engine_->GetId()) {
177 LOG_IF_SPECIAL(engine_, UNLIKELY(engine_->IsCrossThreadCheckEnabled()),
178 "current tsfn was created by dead env, "
179 "owner id: %{public}" PRIu64 ", current env id: %{public}" PRIu64,
180 engineId_, engine_->GetId());
181 return SafeAsyncCode::SAFE_ASYNC_CLOSED;
182 }
183 return SafeAsyncCode::SAFE_ASYNC_OK;
184 }
185
// Queues |data| for consumption on the loop thread and wakes the loop.
// In NATIVE_TSFUNC_BLOCKING mode the caller sleeps on the condvar while the
// queue is over capacity; otherwise SAFE_ASYNC_QUEUE_FULL is returned.
// Safe to call from any thread.
SafeAsyncCode NativeSafeAsyncWork::Send(void* data, NativeThreadSafeFunctionCallMode mode)
{
    std::unique_lock<std::mutex> lock(mutex_);
    if (IsMaxQueueSize()) {
        HILOG_INFO("queue size bigger than max queue size");
        if (mode == NATIVE_TSFUNC_BLOCKING) {
            // Wait until ProcessAsyncHandle (or shutdown) drains the queue;
            // IsMaxQueueSize() also turns false once the work starts closing.
            while (IsMaxQueueSize()) {
                condition_.wait(lock);
            }
        } else {
            return SafeAsyncCode::SAFE_ASYNC_QUEUE_FULL;
        }
    }

    if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
        status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
        // Work is shutting down: deregister this sender instead of queueing.
        if (threadCount_ == 0) {
            return SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS;
        } else {
            threadCount_--;
            return SafeAsyncCode::SAFE_ASYNC_CLOSED;
        }
    } else {
        // Reject sends from a dead or re-created environment.
        SafeAsyncCode checkRet = ValidEngineCheck();
        if (checkRet != SafeAsyncCode::SAFE_ASYNC_OK) {
            return checkRet;
        }
        queue_.emplace_back(data);
        // Wake the loop thread so ProcessAsyncHandle drains the queue.
        auto ret = uv_async_send(&asyncHandler_);
        if (ret != 0) {
            HILOG_ERROR("uv async send failed in Send ret = %{public}d", ret);
            return SafeAsyncCode::SAFE_ASYNC_FAILED;
        }
    }

    return SafeAsyncCode::SAFE_ASYNC_OK;
}
223
Acquire()224 SafeAsyncCode NativeSafeAsyncWork::Acquire()
225 {
226 HILOG_DEBUG("NativeSafeAsyncWork::Acquire called");
227
228 std::unique_lock<std::mutex> lock(mutex_);
229
230 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
231 status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
232 HILOG_WARN("Do not acquire, thread is closed!");
233 return SafeAsyncCode::SAFE_ASYNC_CLOSED;
234 }
235
236 // increase thread count
237 threadCount_++;
238
239 return SafeAsyncCode::SAFE_ASYNC_OK;
240 }
241
// Deregisters one sender thread. NATIVE_TSFUNC_ABORT immediately moves the
// work to CLOSING regardless of remaining senders. When the last sender
// leaves (or on abort), the loop thread is woken so ProcessAsyncHandle can
// drain the queue and close the handle.
SafeAsyncCode NativeSafeAsyncWork::Release(NativeThreadSafeFunctionReleaseMode mode)
{
    HILOG_DEBUG("NativeSafeAsyncWork::Release called");

    std::unique_lock<std::mutex> lock(mutex_);

    if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
        status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
        HILOG_WARN("Do not release, thread is closed!");
        return SafeAsyncCode::SAFE_ASYNC_CLOSED;
    }

    if (threadCount_ == 0) {
        // More releases than acquires: caller bug.
        HILOG_ERROR("Do not release, thread count is zero.");
        return SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS;
    }

    // decrease thread count
    threadCount_--;

    if (mode == NativeThreadSafeFunctionReleaseMode::NATIVE_TSFUNC_ABORT) {
        status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING;
        // Wake a sender blocked in Send(); with status now CLOSING,
        // IsMaxQueueSize() returns false and the sender stops waiting.
        if (maxQueueSize_ > 0) {
            condition_.notify_one();
        }
    }

    if (threadCount_ == 0 ||
        mode == NativeThreadSafeFunctionReleaseMode::NATIVE_TSFUNC_ABORT) {
        // Do not touch the uv handle if the owning env died or was re-created.
        SafeAsyncCode checkRet = ValidEngineCheck();
        if (checkRet != SafeAsyncCode::SAFE_ASYNC_OK) {
            return checkRet;
        }
        // trigger async handle
        auto ret = uv_async_send(&asyncHandler_);
        if (ret != 0) {
            HILOG_ERROR("uv async send failed in Release ret = %{public}d", ret);
            return SafeAsyncCode::SAFE_ASYNC_FAILED;
        }
    }

    return SafeAsyncCode::SAFE_ASYNC_OK;
}
285
Ref()286 bool NativeSafeAsyncWork::Ref()
287 {
288 if (!IsSameTid()) {
289 HILOG_ERROR("tid not same");
290 return false;
291 }
292
293 uv_ref(reinterpret_cast<uv_handle_t*>(&asyncHandler_));
294
295 return true;
296 }
297
Unref()298 bool NativeSafeAsyncWork::Unref()
299 {
300 if (!IsSameTid()) {
301 HILOG_ERROR("tid not same");
302 return false;
303 }
304
305 uv_unref(reinterpret_cast<uv_handle_t*>(&asyncHandler_));
306
307 return true;
308 }
309
// Returns the user-supplied context pointer captured at construction time.
void* NativeSafeAsyncWork::GetContext()
{
    return context_;
}
314
// Loop-thread consumer: drains the queued data items, invoking the JS
// callback for each. The mutex is dropped around each JS call so senders can
// keep queueing; closes the handle once the work is CLOSING or fully drained
// with no remaining senders. Note: CloseHandles() may end in `delete this`,
// so nothing touches members after those calls.
void NativeSafeAsyncWork::ProcessAsyncHandle()
{
    std::unique_lock<std::mutex> lock(mutex_);
    if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
        HILOG_ERROR("Process failed, thread is closed!");
        return;
    }

    if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
        HILOG_DEBUG("threadsafe function is closing!");
        CloseHandles();
        return;
    }

    // Snapshot the size: only items present now are processed in this pass;
    // anything queued during the JS calls is handled by the re-send below.
    size_t size = queue_.size();
    void* data = nullptr;

    auto vm = engine_->GetEcmaVm();
    panda::LocalScope scope(vm);
#ifdef ENABLE_CONTAINER_SCOPE
    // Restore the ArkUI container scope captured at construction.
    ContainerScope containerScope(containerScopeId_, engine_->IsContainerScopeEnabled());
#endif
    TryCatch tryCatch(reinterpret_cast<napi_env>(engine_));

    bool isValidTraceId = SaveAndSetTraceId();
    while (size > 0) {
        data = queue_.front();

        // when queue is full, notify send.
        if (size == maxQueueSize_ && maxQueueSize_ > 0) {
            condition_.notify_one();
        }

        napi_value func_ = (ref_ == nullptr) ? nullptr : ref_->Get(engine_);
        // Release the lock while running JS so senders are not blocked and
        // re-entrant Send() calls cannot deadlock.
        lock.unlock();
        if (callJsCallback_ != nullptr) {
            callJsCallback_(engine_, func_, context_, data);
        } else {
            CallJs(engine_, func_, context_, data);
        }
        lock.lock();

        if (tryCatch.HasCaught()) {
            engine_->HandleUncaughtException();
        }
        // Pop only after the call: the item stays owned by the queue while JS runs.
        queue_.pop_front();
        size--;
    }
    RestoreTraceId(isValidTraceId);

    // Items queued while the lock was released: schedule another pass.
    if (!queue_.empty()) {
        auto ret = uv_async_send(&asyncHandler_);
        if (ret != 0) {
            HILOG_ERROR("uv async send failed in ProcessAsyncHandle ret = %{public}d", ret);
        }
    }

    if (queue_.empty() && threadCount_ == 0) {
        CloseHandles();
    }
}
376
CloseHandles()377 SafeAsyncCode NativeSafeAsyncWork::CloseHandles()
378 {
379 HILOG_DEBUG("NativeSafeAsyncWork::CloseHandles called");
380
381 if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
382 HILOG_INFO("Close failed, thread is closed!");
383 return SafeAsyncCode::SAFE_ASYNC_CLOSED;
384 }
385
386 status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED;
387
388 // close async handler
389 uv_close(reinterpret_cast<uv_handle_t*>(&asyncHandler_), [](uv_handle_t* handle) {
390 NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::asyncHandler_,
391 reinterpret_cast<uv_async_t*>(handle));
392 that->CleanUp();
393 });
394
395 return SafeAsyncCode::SAFE_ASYNC_OK;
396 }
397
// Final teardown, invoked from the uv_close callback: runs the user
// finalizer, hands every still-queued item to the callback with a null
// env/func (so callers can free their data), then destroys this object.
// Nothing may touch the instance after `delete this`.
void NativeSafeAsyncWork::CleanUp()
{
    HILOG_DEBUG("NativeSafeAsyncWork::CleanUp called");
    bool isValidTraceId = SaveAndSetTraceId();
    if (finalizeCallback_ != nullptr) {
        finalizeCallback_(engine_, finalizeData_, context_);
    }

    // clean data
    // Null engine/func signals "closing, do not call into JS; just release data".
    while (!queue_.empty()) {
        if (callJsCallback_ != nullptr) {
            callJsCallback_(nullptr, nullptr, context_, queue_.front());
        } else {
            CallJs(nullptr, nullptr, context_, queue_.front());
        }
        queue_.pop_front();
    }
    ClearTraceId(isValidTraceId);

    delete this;
}
419
IsSameTid()420 bool NativeSafeAsyncWork::IsSameTid()
421 {
422 auto tid = pthread_self();
423 return (tid == engine_->GetTid()) ? true : false;
424 }
425
// Dispatches |data| to the creating thread via its EventHandler (captured in
// the constructor) instead of the uv queue. |priority| maps onto
// EventQueue::Priority; |isTail| selects append vs. front-of-queue posting.
// Returns napi_generic_failure when EventHandler support is unavailable.
// NOTE(review): the task captures `this` raw — assumes this object outlives
// the posted task; confirm against the callers' lifetime guarantees.
napi_status NativeSafeAsyncWork::PostTask(void *data, int32_t priority, bool isTail)
{
#if defined(ENABLE_EVENT_HANDLER)
    HILOG_DEBUG("NativeSafeAsyncWork::PostTask called");
    std::unique_lock<std::mutex> lock(eventHandlerMutex_);
    if (engine_ == nullptr || eventHandler_ == nullptr) {
        HILOG_ERROR("post task failed due to nullptr engine or eventHandler");
        return napi_status::napi_generic_failure;
    }
    // the task will be execute at main thread or worker thread
    auto task = [this, data]() {
        HILOG_DEBUG("The task is executing in main thread or worker thread");
        panda::LocalScope scope(this->engine_->GetEcmaVm());
        napi_value func_ = (this->ref_ == nullptr) ? nullptr : this->ref_->Get(engine_);
        bool isValidTraceId = SaveAndSetTraceId();
        if (this->callJsCallback_ != nullptr) {
            this->callJsCallback_(engine_, func_, context_, data);
        } else {
            CallJs(engine_, func_, context_, data);
        }
        RestoreTraceId(isValidTraceId);
    };

    bool res = false;
    if (isTail) {
        HILOG_DEBUG("The task is posted from tail");
        res = eventHandler_->PostTask(task, static_cast<EventQueue::Priority>(priority));
    } else {
        HILOG_DEBUG("The task is posted from head");
        res = eventHandler_->PostTaskAtFront(task, std::string(), static_cast<EventQueue::Priority>(priority));
    }

    return res ? napi_status::napi_ok : napi_status::napi_generic_failure;
#else
    HILOG_WARN("EventHandler feature is not supported");
    return napi_status::napi_generic_failure;
#endif
}
464
// Captures a HiTrace span id for this work so later callbacks can be
// correlated in traces. The system parameter gating trace-id creation is
// read once per process (std::call_once). No-op unless ENABLE_HITRACE.
void NativeSafeAsyncWork::InitSafeAsyncWorkTraceId()
{
#ifdef ENABLE_HITRACE
    // One-time, process-wide read of the trace-id feature switch.
    std::call_once(g_SafeWorkParamUpdated, []() {
        char napiTraceIdEnabled[TRACEID_PARAM_SIZE] = {0};
        int ret = GetParameter("persist.hiviewdfx.napitraceid.enabled", "false",
                               napiTraceIdEnabled, sizeof(napiTraceIdEnabled));
        if (ret > 0 && strcmp(napiTraceIdEnabled, "true") == 0) {
            g_SafeWorkTraceIdEnabled.store(true);
        }
    });
    bool createdTraceId = false;
    HiTraceId thisId = HiTraceChain::GetId();
    // When enabled and the calling thread has no active chain, begin a
    // temporary one just to derive a span id from it.
    if (g_SafeWorkTraceIdEnabled.load() && (!thisId.IsValid())) {
        thisId = HiTraceChain::Begin("New NativeAsyncWork", 0);
        createdTraceId = true;
    }
    if (thisId.IsValid()) {
        taskTraceId_ = HiTraceChain::CreateSpan();
    }
    // Drop the temporary chain so the calling thread's trace state is unchanged.
    if (createdTraceId) {
        OHOS::HiviewDFX::HiTraceChain::ClearId();
    }
#endif
}
490