• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <cinttypes>
17 
18 #include "native_safe_async_work.h"
19 
20 #include "native_api_internal.h"
21 #include <securec.h>
22 
23 #ifdef ENABLE_HITRACE
24 #include "hitrace_meter.h"
25 #include "parameter.h"
26 #endif
27 
28 #ifdef ENABLE_CONTAINER_SCOPE
29 #include "core/common/container_scope.h"
30 #endif
31 
32 #ifdef ENABLE_CONTAINER_SCOPE
33 using OHOS::Ace::ContainerScope;
34 #endif
35 
36 #if defined(ENABLE_EVENT_HANDLER)
37 #include "event_handler.h"
38 using namespace OHOS::AppExecFwk;
39 #endif
40 
#ifdef ENABLE_HITRACE
using namespace OHOS::HiviewDFX;

// Size of the buffer that receives the value of the trace-id system parameter.
constexpr size_t TRACEID_PARAM_SIZE = 10;
// Process-wide switch; it starts out false and is only ever set to true by InitSafeAsyncWorkTraceId().
std::atomic<bool> g_SafeWorkTraceIdEnabled { false };
// Guards the one-time read of the system parameter.
std::once_flag g_SafeWorkParamUpdated;
#endif
47 
48 // static methods start
AsyncCallback(uv_async_t * asyncHandler)49 void NativeSafeAsyncWork::AsyncCallback(uv_async_t* asyncHandler)
50 {
51     HILOG_DEBUG("NativeSafeAsyncWork::AsyncCallback called");
52     NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::asyncHandler_, asyncHandler);
53     if (that == nullptr) {
54         HILOG_ERROR("NativeSafeAsyncWork:: DereferenceOf failed!");
55         return;
56     }
57     that->ProcessAsyncHandle();
58 }
59 
CallJs(NativeEngine * engine,napi_value js_call_func,void * context,void * data)60 void NativeSafeAsyncWork::CallJs(NativeEngine* engine, napi_value js_call_func, void* context, void* data)
61 {
62     if (engine == nullptr || js_call_func == nullptr) {
63         HILOG_ERROR("CallJs failed. engine or js_call_func is nullptr!");
64         return;
65     }
66     napi_value value = nullptr;
67     napi_get_undefined(reinterpret_cast<napi_env>(engine), &value);
68     if (value == nullptr) {
69         HILOG_ERROR("CreateUndefined failed");
70         return;
71     }
72 
73     auto resultValue = engine->CallFunction(value, js_call_func, nullptr, 0);
74     if (resultValue == nullptr) {
75         HILOG_ERROR("CallFunction failed");
76     }
77 }
78 
NativeSafeAsyncWork(NativeEngine * engine,napi_value func,napi_value asyncResource,napi_value asyncResourceName,size_t maxQueueSize,size_t threadCount,void * finalizeData,NativeFinalize finalizeCallback,void * context,NativeThreadSafeFunctionCallJs callJsCallback)79 NativeSafeAsyncWork::NativeSafeAsyncWork(NativeEngine* engine,
80                                          napi_value func,
81                                          napi_value asyncResource,
82                                          napi_value asyncResourceName,
83                                          size_t maxQueueSize,
84                                          size_t threadCount,
85                                          void* finalizeData,
86                                          NativeFinalize finalizeCallback,
87                                          void* context,
88                                          NativeThreadSafeFunctionCallJs callJsCallback)
89     :engine_(engine), engineId_(engine->GetId()), maxQueueSize_(maxQueueSize),
90     threadCount_(threadCount), finalizeData_(finalizeData), finalizeCallback_(finalizeCallback),
91     context_(context), callJsCallback_(callJsCallback)
92 {
93     asyncContext_.napiAsyncResource = asyncResource;
94     asyncContext_.napiAsyncResourceName = asyncResourceName;
95 
96     errno_t err = EOK;
97     err = memset_s(&asyncHandler_, sizeof(asyncHandler_), 0, sizeof(asyncHandler_));
98     if (err != EOK) {
99         HILOG_ERROR("faild to init asyncHandler_");
100         return;
101     }
102 
103     if (func != nullptr) {
104         uint32_t initialRefcount = 1;
105         ref_ = engine->CreateReference(func, initialRefcount);
106     }
107 
108 #ifdef ENABLE_CONTAINER_SCOPE
109     if (engine->IsContainerScopeEnabled()) {
110         containerScopeId_ = ContainerScope::CurrentId();
111     }
112 #endif
113 
114 #if defined(ENABLE_EVENT_HANDLER)
115     std::shared_ptr<EventRunner> runner = EventRunner::Current();
116     if (runner != nullptr) {
117         eventHandler_ = std::make_shared<EventHandler>(runner);
118     }
119 #endif
120 
121     InitSafeAsyncWorkTraceId();
122 }
123 
~NativeSafeAsyncWork()124 NativeSafeAsyncWork::~NativeSafeAsyncWork()
125 {
126     if (ref_ != nullptr) {
127         delete ref_;
128         ref_ = nullptr;
129     }
130 
131     engine_->DecreaseActiveTsfnCounter();
132 }
133 
Init()134 bool NativeSafeAsyncWork::Init()
135 {
136     HILOG_DEBUG("NativeSafeAsyncWork::Init called");
137 
138     uv_loop_t* loop = nullptr;
139     if (engine_->IsMainEnvContext()) {
140         loop = engine_->GetUVLoop();
141     } else {
142         loop = engine_->GetParent()->GetUVLoop();
143     }
144 
145     if (loop == nullptr) {
146         HILOG_ERROR("Get loop failed");
147         return false;
148     }
149 
150     int ret = uv_async_init(loop, &asyncHandler_, AsyncCallback);
151     if (ret != 0) {
152         HILOG_ERROR("uv async send failed in Init ret = %{public}d", ret);
153         return false;
154     }
155 
156     // only uv_async init successd will leak to uv_loop
157     engine_->IncreaseActiveTsfnCounter();
158 
159     status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_INTE;
160     return true;
161 }
162 
IsMaxQueueSize()163 bool NativeSafeAsyncWork::IsMaxQueueSize()
164 {
165     return (queue_.size() > maxQueueSize_ &&
166            maxQueueSize_ > 0 &&
167            status_ != SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING &&
168            status_ != SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED);
169 }
170 
ValidEngineCheck()171 SafeAsyncCode NativeSafeAsyncWork::ValidEngineCheck()
172 {
173     if (!NativeEngine::IsAlive(engine_)) {
174         HILOG_WARN("napi_env has been destoryed");
175         return SafeAsyncCode::SAFE_ASYNC_FAILED;
176     } else if (engineId_ != engine_->GetId()) {
177         LOG_IF_SPECIAL(engine_, UNLIKELY(engine_->IsCrossThreadCheckEnabled()),
178                        "current tsfn was created by dead env, "
179                        "owner id: %{public}" PRIu64 ", current env id: %{public}" PRIu64,
180                        engineId_, engine_->GetId());
181         return SafeAsyncCode::SAFE_ASYNC_CLOSED;
182     }
183     return SafeAsyncCode::SAFE_ASYNC_OK;
184 }
185 
Send(void * data,NativeThreadSafeFunctionCallMode mode)186 SafeAsyncCode NativeSafeAsyncWork::Send(void* data, NativeThreadSafeFunctionCallMode mode)
187 {
188     std::unique_lock<std::mutex> lock(mutex_);
189     if (IsMaxQueueSize()) {
190         HILOG_INFO("queue size bigger than max queue size");
191         if (mode == NATIVE_TSFUNC_BLOCKING) {
192             while (IsMaxQueueSize()) {
193                 condition_.wait(lock);
194             }
195         } else {
196             return SafeAsyncCode::SAFE_ASYNC_QUEUE_FULL;
197         }
198     }
199 
200     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
201         status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
202         if (threadCount_ == 0) {
203             return SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS;
204         } else {
205             threadCount_--;
206             return SafeAsyncCode::SAFE_ASYNC_CLOSED;
207         }
208     } else {
209         SafeAsyncCode checkRet = ValidEngineCheck();
210         if (checkRet != SafeAsyncCode::SAFE_ASYNC_OK) {
211             return checkRet;
212         }
213         queue_.emplace_back(data);
214         auto ret = uv_async_send(&asyncHandler_);
215         if (ret != 0) {
216             HILOG_ERROR("uv async send failed in Send ret = %{public}d", ret);
217             return SafeAsyncCode::SAFE_ASYNC_FAILED;
218         }
219     }
220 
221     return SafeAsyncCode::SAFE_ASYNC_OK;
222 }
223 
Acquire()224 SafeAsyncCode NativeSafeAsyncWork::Acquire()
225 {
226     HILOG_DEBUG("NativeSafeAsyncWork::Acquire called");
227 
228     std::unique_lock<std::mutex> lock(mutex_);
229 
230     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
231         status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
232         HILOG_WARN("Do not acquire, thread is closed!");
233         return SafeAsyncCode::SAFE_ASYNC_CLOSED;
234     }
235 
236     // increase thread count
237     threadCount_++;
238 
239     return SafeAsyncCode::SAFE_ASYNC_OK;
240 }
241 
Release(NativeThreadSafeFunctionReleaseMode mode)242 SafeAsyncCode NativeSafeAsyncWork::Release(NativeThreadSafeFunctionReleaseMode mode)
243 {
244     HILOG_DEBUG("NativeSafeAsyncWork::Release called");
245 
246     std::unique_lock<std::mutex> lock(mutex_);
247 
248     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
249         status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
250         HILOG_WARN("Do not release, thread is closed!");
251         return SafeAsyncCode::SAFE_ASYNC_CLOSED;
252     }
253 
254     if (threadCount_ == 0) {
255         HILOG_ERROR("Do not release, thread count is zero.");
256         return SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS;
257     }
258 
259     // decrease thread count
260     threadCount_--;
261 
262     if (mode == NativeThreadSafeFunctionReleaseMode::NATIVE_TSFUNC_ABORT) {
263         status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING;
264         if (maxQueueSize_ > 0) {
265             condition_.notify_one();
266         }
267     }
268 
269     if (threadCount_ == 0 ||
270         mode == NativeThreadSafeFunctionReleaseMode::NATIVE_TSFUNC_ABORT) {
271         SafeAsyncCode checkRet = ValidEngineCheck();
272         if (checkRet != SafeAsyncCode::SAFE_ASYNC_OK) {
273             return checkRet;
274         }
275         // trigger async handle
276         auto ret = uv_async_send(&asyncHandler_);
277         if (ret != 0) {
278             HILOG_ERROR("uv async send failed in Release ret = %{public}d", ret);
279             return SafeAsyncCode::SAFE_ASYNC_FAILED;
280         }
281     }
282 
283     return SafeAsyncCode::SAFE_ASYNC_OK;
284 }
285 
Ref()286 bool NativeSafeAsyncWork::Ref()
287 {
288     if (!IsSameTid()) {
289         HILOG_ERROR("tid not same");
290         return false;
291     }
292 
293     uv_ref(reinterpret_cast<uv_handle_t*>(&asyncHandler_));
294 
295     return true;
296 }
297 
Unref()298 bool NativeSafeAsyncWork::Unref()
299 {
300     if (!IsSameTid()) {
301         HILOG_ERROR("tid not same");
302         return false;
303     }
304 
305     uv_unref(reinterpret_cast<uv_handle_t*>(&asyncHandler_));
306 
307     return true;
308 }
309 
GetContext()310 void* NativeSafeAsyncWork::GetContext()
311 {
312     return context_;
313 }
314 
ProcessAsyncHandle()315 void NativeSafeAsyncWork::ProcessAsyncHandle()
316 {
317     std::unique_lock<std::mutex> lock(mutex_);
318     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
319         HILOG_ERROR("Process failed, thread is closed!");
320         return;
321     }
322 
323     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
324         HILOG_DEBUG("threadsafe function is closing!");
325         CloseHandles();
326         return;
327     }
328 
329     size_t size = queue_.size();
330     void* data = nullptr;
331 
332     auto vm = engine_->GetEcmaVm();
333     panda::LocalScope scope(vm);
334 #ifdef ENABLE_CONTAINER_SCOPE
335     ContainerScope containerScope(containerScopeId_, engine_->IsContainerScopeEnabled());
336 #endif
337     TryCatch tryCatch(reinterpret_cast<napi_env>(engine_));
338 
339     bool isValidTraceId = SaveAndSetTraceId();
340     while (size > 0) {
341         data = queue_.front();
342 
343         // when queue is full, notify send.
344         if (size == maxQueueSize_ && maxQueueSize_ > 0) {
345             condition_.notify_one();
346         }
347 
348         napi_value func_ = (ref_ == nullptr) ? nullptr : ref_->Get(engine_);
349         lock.unlock();
350         if (callJsCallback_ != nullptr) {
351             callJsCallback_(engine_, func_, context_, data);
352         } else {
353             CallJs(engine_, func_, context_, data);
354         }
355         lock.lock();
356 
357         if (tryCatch.HasCaught()) {
358             engine_->HandleUncaughtException();
359         }
360         queue_.pop_front();
361         size--;
362     }
363     RestoreTraceId(isValidTraceId);
364 
365     if (!queue_.empty()) {
366         auto ret = uv_async_send(&asyncHandler_);
367         if (ret != 0) {
368             HILOG_ERROR("uv async send failed in ProcessAsyncHandle ret = %{public}d", ret);
369         }
370     }
371 
372     if (queue_.empty() && threadCount_ == 0) {
373         CloseHandles();
374     }
375 }
376 
CloseHandles()377 SafeAsyncCode NativeSafeAsyncWork::CloseHandles()
378 {
379     HILOG_DEBUG("NativeSafeAsyncWork::CloseHandles called");
380 
381     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
382         HILOG_INFO("Close failed, thread is closed!");
383         return SafeAsyncCode::SAFE_ASYNC_CLOSED;
384     }
385 
386     status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED;
387 
388     // close async handler
389     uv_close(reinterpret_cast<uv_handle_t*>(&asyncHandler_), [](uv_handle_t* handle) {
390         NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::asyncHandler_,
391             reinterpret_cast<uv_async_t*>(handle));
392         that->CleanUp();
393     });
394 
395     return SafeAsyncCode::SAFE_ASYNC_OK;
396 }
397 
CleanUp()398 void NativeSafeAsyncWork::CleanUp()
399 {
400     HILOG_DEBUG("NativeSafeAsyncWork::CleanUp called");
401     bool isValidTraceId = SaveAndSetTraceId();
402     if (finalizeCallback_ != nullptr) {
403         finalizeCallback_(engine_, finalizeData_, context_);
404     }
405 
406     // clean data
407     while (!queue_.empty()) {
408         if (callJsCallback_ != nullptr) {
409             callJsCallback_(nullptr, nullptr, context_, queue_.front());
410         } else {
411             CallJs(nullptr, nullptr, context_, queue_.front());
412         }
413         queue_.pop_front();
414     }
415     ClearTraceId(isValidTraceId);
416 
417     delete this;
418 }
419 
IsSameTid()420 bool NativeSafeAsyncWork::IsSameTid()
421 {
422     auto tid = pthread_self();
423     return (tid == engine_->GetTid()) ? true : false;
424 }
425 
PostTask(void * data,int32_t priority,bool isTail)426 napi_status NativeSafeAsyncWork::PostTask(void *data, int32_t priority, bool isTail)
427 {
428 #if defined(ENABLE_EVENT_HANDLER)
429     HILOG_DEBUG("NativeSafeAsyncWork::PostTask called");
430     std::unique_lock<std::mutex> lock(eventHandlerMutex_);
431     if (engine_ == nullptr || eventHandler_ == nullptr) {
432         HILOG_ERROR("post task failed due to nullptr engine or eventHandler");
433         return napi_status::napi_generic_failure;
434     }
435     // the task will be execute at main thread or worker thread
436     auto task = [this, data]() {
437         HILOG_DEBUG("The task is executing in main thread or worker thread");
438         panda::LocalScope scope(this->engine_->GetEcmaVm());
439         napi_value func_ = (this->ref_ == nullptr) ? nullptr : this->ref_->Get(engine_);
440         bool isValidTraceId = SaveAndSetTraceId();
441         if (this->callJsCallback_ != nullptr) {
442             this->callJsCallback_(engine_, func_, context_, data);
443         } else {
444             CallJs(engine_, func_, context_, data);
445         }
446         RestoreTraceId(isValidTraceId);
447     };
448 
449     bool res = false;
450     if (isTail) {
451         HILOG_DEBUG("The task is posted from tail");
452         res = eventHandler_->PostTask(task, static_cast<EventQueue::Priority>(priority));
453     } else {
454         HILOG_DEBUG("The task is posted from head");
455         res = eventHandler_->PostTaskAtFront(task, std::string(), static_cast<EventQueue::Priority>(priority));
456     }
457 
458     return res ? napi_status::napi_ok : napi_status::napi_generic_failure;
459 #else
460     HILOG_WARN("EventHandler feature is not supported");
461     return napi_status::napi_generic_failure;
462 #endif
463 }
464 
InitSafeAsyncWorkTraceId()465 void NativeSafeAsyncWork::InitSafeAsyncWorkTraceId()
466 {
467 #ifdef ENABLE_HITRACE
468     std::call_once(g_SafeWorkParamUpdated, []() {
469         char napiTraceIdEnabled[TRACEID_PARAM_SIZE] = {0};
470         int ret = GetParameter("persist.hiviewdfx.napitraceid.enabled", "false",
471             napiTraceIdEnabled, sizeof(napiTraceIdEnabled));
472         if (ret > 0 && strcmp(napiTraceIdEnabled, "true") == 0) {
473             g_SafeWorkTraceIdEnabled.store(true);
474         }
475     });
476     bool createdTraceId = false;
477     HiTraceId thisId = HiTraceChain::GetId();
478     if (g_SafeWorkTraceIdEnabled.load() && (!thisId.IsValid())) {
479         thisId = HiTraceChain::Begin("New NativeAsyncWork", 0);
480         createdTraceId = true;
481     }
482     if (thisId.IsValid()) {
483         taskTraceId_ = HiTraceChain::CreateSpan();
484     }
485     if (createdTraceId) {
486         OHOS::HiviewDFX::HiTraceChain::ClearId();
487     }
488 #endif
489 }
490