• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include <cinttypes>
17 
18 #include "native_safe_async_work.h"
19 
20 #include "native_api_internal.h"
21 #include <securec.h>
22 
23 #ifdef ENABLE_CONTAINER_SCOPE
24 #include "core/common/container_scope.h"
25 #endif
26 
27 #ifdef ENABLE_CONTAINER_SCOPE
28 using OHOS::Ace::ContainerScope;
29 #endif
30 
31 #if defined(ENABLE_EVENT_HANDLER)
32 #include "event_handler.h"
33 using namespace OHOS::AppExecFwk;
34 static constexpr int32_t API_VERSION_SIXTEEN = 16;
35 #endif
36 
37 // static methods start
AsyncCallback(uv_async_t * asyncHandler)38 void NativeSafeAsyncWork::AsyncCallback(uv_async_t* asyncHandler)
39 {
40     HILOG_DEBUG("NativeSafeAsyncWork::AsyncCallback called");
41     NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::asyncHandler_, asyncHandler);
42     if (that == nullptr) {
43         HILOG_ERROR("NativeSafeAsyncWork:: DereferenceOf failed!");
44         return;
45     }
46     that->ProcessAsyncHandle();
47 }
48 
CallJs(NativeEngine * engine,napi_value js_call_func,void * context,void * data)49 void NativeSafeAsyncWork::CallJs(NativeEngine* engine, napi_value js_call_func, void* context, void* data)
50 {
51     if (engine == nullptr || js_call_func == nullptr) {
52         HILOG_ERROR("CallJs failed. engine or js_call_func is nullptr!");
53         return;
54     }
55     napi_value value = nullptr;
56     napi_get_undefined(reinterpret_cast<napi_env>(engine), &value);
57     if (value == nullptr) {
58         HILOG_ERROR("CreateUndefined failed");
59         return;
60     }
61 
62     auto resultValue = engine->CallFunction(value, js_call_func, nullptr, 0);
63     if (resultValue == nullptr) {
64         HILOG_ERROR("CallFunction failed");
65     }
66 }
67 
NativeSafeAsyncWork(NativeEngine * engine,napi_value func,napi_value asyncResource,napi_value asyncResourceName,size_t maxQueueSize,size_t threadCount,void * finalizeData,NativeFinalize finalizeCallback,void * context,NativeThreadSafeFunctionCallJs callJsCallback)68 NativeSafeAsyncWork::NativeSafeAsyncWork(NativeEngine* engine,
69                                          napi_value func,
70                                          napi_value asyncResource,
71                                          napi_value asyncResourceName,
72                                          size_t maxQueueSize,
73                                          size_t threadCount,
74                                          void* finalizeData,
75                                          NativeFinalize finalizeCallback,
76                                          void* context,
77                                          NativeThreadSafeFunctionCallJs callJsCallback)
78     :engine_(engine), engineId_(engine->GetId()), maxQueueSize_(maxQueueSize),
79     threadCount_(threadCount), finalizeData_(finalizeData), finalizeCallback_(finalizeCallback),
80     context_(context), callJsCallback_(callJsCallback)
81 {
82     asyncContext_.napiAsyncResource = asyncResource;
83     asyncContext_.napiAsyncResourceName = asyncResourceName;
84 
85     errno_t err = EOK;
86     err = memset_s(&asyncHandler_, sizeof(asyncHandler_), 0, sizeof(asyncHandler_));
87     if (err != EOK) {
88         HILOG_ERROR("faild to init asyncHandler_");
89         return;
90     }
91 
92     if (func != nullptr) {
93         uint32_t initialRefcount = 1;
94         ref_ = engine->CreateReference(func, initialRefcount);
95     }
96 
97 #ifdef ENABLE_CONTAINER_SCOPE
98     containerScopeId_ = ContainerScope::CurrentId();
99 #endif
100 
101 #if defined(ENABLE_EVENT_HANDLER)
102     runner_ = EventRunner::Current();
103 #endif
104 }
105 
~NativeSafeAsyncWork()106 NativeSafeAsyncWork::~NativeSafeAsyncWork()
107 {
108     if (ref_ != nullptr) {
109         delete ref_;
110         ref_ = nullptr;
111     }
112 }
113 
Init()114 bool NativeSafeAsyncWork::Init()
115 {
116     HILOG_DEBUG("NativeSafeAsyncWork::Init called");
117 
118     uv_loop_t* loop = engine_->GetUVLoop();
119     if (loop == nullptr) {
120         HILOG_ERROR("Get loop failed");
121         return false;
122     }
123 
124     int ret = uv_async_init(loop, &asyncHandler_, AsyncCallback);
125     if (ret != 0) {
126         HILOG_ERROR("uv async send failed in Init ret = %{public}d", ret);
127         return false;
128     }
129 
130     status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_INTE;
131     return true;
132 }
133 
IsMaxQueueSize()134 bool NativeSafeAsyncWork::IsMaxQueueSize()
135 {
136     return (queue_.size() > maxQueueSize_ &&
137            maxQueueSize_ > 0 &&
138            status_ != SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING &&
139            status_ != SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED);
140 }
141 
ValidEngineCheck()142 SafeAsyncCode NativeSafeAsyncWork::ValidEngineCheck()
143 {
144     if (!NativeEngine::IsAlive(engine_)) {
145         HILOG_ERROR("napi_env has been destoryed");
146         return SafeAsyncCode::SAFE_ASYNC_FAILED;
147     } else if (engineId_ != engine_->GetId()) {
148         LOG_IF_SPECIAL(engine_, UNLIKELY(engine_->IsCrossThreadCheckEnabled()),
149                        "current tsfn was created by dead env, "
150                        "owner id: %{public}" PRIu64 ", current env id: %{public}" PRIu64,
151                        engineId_, engine_->GetId());
152         return SafeAsyncCode::SAFE_ASYNC_CLOSED;
153     }
154     return SafeAsyncCode::SAFE_ASYNC_OK;
155 }
156 
Send(void * data,NativeThreadSafeFunctionCallMode mode)157 SafeAsyncCode NativeSafeAsyncWork::Send(void* data, NativeThreadSafeFunctionCallMode mode)
158 {
159     std::unique_lock<std::mutex> lock(mutex_);
160     if (IsMaxQueueSize()) {
161         HILOG_INFO("queue size bigger than max queue size");
162         if (mode == NATIVE_TSFUNC_BLOCKING) {
163             while (IsMaxQueueSize()) {
164                 condition_.wait(lock);
165             }
166         } else {
167             return SafeAsyncCode::SAFE_ASYNC_QUEUE_FULL;
168         }
169     }
170 
171     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
172         status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
173         if (threadCount_ == 0) {
174             return SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS;
175         } else {
176             threadCount_--;
177             return SafeAsyncCode::SAFE_ASYNC_CLOSED;
178         }
179     } else {
180         SafeAsyncCode checkRet = ValidEngineCheck();
181         if (checkRet != SafeAsyncCode::SAFE_ASYNC_OK) {
182             return checkRet;
183         }
184         queue_.emplace_back(data);
185         auto ret = uv_async_send(&asyncHandler_);
186         if (ret != 0) {
187             HILOG_ERROR("uv async send failed in Send ret = %{public}d", ret);
188             return SafeAsyncCode::SAFE_ASYNC_FAILED;
189         }
190     }
191 
192     return SafeAsyncCode::SAFE_ASYNC_OK;
193 }
194 
Acquire()195 SafeAsyncCode NativeSafeAsyncWork::Acquire()
196 {
197     HILOG_DEBUG("NativeSafeAsyncWork::Acquire called");
198 
199     std::unique_lock<std::mutex> lock(mutex_);
200 
201     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
202         status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
203         HILOG_WARN("Do not acquire, thread is closed!");
204         return SafeAsyncCode::SAFE_ASYNC_CLOSED;
205     }
206 
207     // increase thread count
208     threadCount_++;
209 
210     return SafeAsyncCode::SAFE_ASYNC_OK;
211 }
212 
Release(NativeThreadSafeFunctionReleaseMode mode)213 SafeAsyncCode NativeSafeAsyncWork::Release(NativeThreadSafeFunctionReleaseMode mode)
214 {
215     HILOG_DEBUG("NativeSafeAsyncWork::Release called");
216 
217     std::unique_lock<std::mutex> lock(mutex_);
218 
219     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
220         status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
221         HILOG_WARN("Do not release, thread is closed!");
222         return SafeAsyncCode::SAFE_ASYNC_CLOSED;
223     }
224 
225     if (threadCount_ == 0) {
226         HILOG_ERROR("Do not release, thread count is zero.");
227         return SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS;
228     }
229 
230     // decrease thread count
231     threadCount_--;
232 
233     if (mode == NativeThreadSafeFunctionReleaseMode::NATIVE_TSFUNC_ABORT) {
234         status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING;
235         if (maxQueueSize_ > 0) {
236             condition_.notify_one();
237         }
238     }
239 
240     if (threadCount_ == 0 ||
241         mode == NativeThreadSafeFunctionReleaseMode::NATIVE_TSFUNC_ABORT) {
242         SafeAsyncCode checkRet = ValidEngineCheck();
243         if (checkRet != SafeAsyncCode::SAFE_ASYNC_OK) {
244             return checkRet;
245         }
246         // trigger async handle
247         auto ret = uv_async_send(&asyncHandler_);
248         if (ret != 0) {
249             HILOG_ERROR("uv async send failed in Release ret = %{public}d", ret);
250             return SafeAsyncCode::SAFE_ASYNC_FAILED;
251         }
252     }
253 
254     return SafeAsyncCode::SAFE_ASYNC_OK;
255 }
256 
Ref()257 bool NativeSafeAsyncWork::Ref()
258 {
259     if (!IsSameTid()) {
260         HILOG_ERROR("tid not same");
261         return false;
262     }
263 
264     uv_ref(reinterpret_cast<uv_handle_t*>(&asyncHandler_));
265 
266     return true;
267 }
268 
Unref()269 bool NativeSafeAsyncWork::Unref()
270 {
271     if (!IsSameTid()) {
272         HILOG_ERROR("tid not same");
273         return false;
274     }
275 
276     uv_unref(reinterpret_cast<uv_handle_t*>(&asyncHandler_));
277 
278     return true;
279 }
280 
GetContext()281 void* NativeSafeAsyncWork::GetContext()
282 {
283     return context_;
284 }
285 
ProcessAsyncHandle()286 void NativeSafeAsyncWork::ProcessAsyncHandle()
287 {
288     std::unique_lock<std::mutex> lock(mutex_);
289     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
290         HILOG_ERROR("Process failed, thread is closed!");
291         return;
292     }
293 
294     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
295         HILOG_ERROR("thread is closing!");
296         CloseHandles();
297         return;
298     }
299 
300     size_t size = queue_.size();
301     void* data = nullptr;
302     auto vm = engine_->GetEcmaVm();
303     panda::LocalScope scope(vm);
304 #ifdef ENABLE_CONTAINER_SCOPE
305     ContainerScope containerScope(containerScopeId_);
306 #endif
307     TryCatch tryCatch(reinterpret_cast<napi_env>(engine_));
308     while (size > 0) {
309         data = queue_.front();
310         // when queue is full, notify send.
311         if (size == maxQueueSize_ && maxQueueSize_ > 0) {
312             condition_.notify_one();
313         }
314         napi_value func_ = (ref_ == nullptr) ? nullptr : ref_->Get(engine_);
315         lock.unlock();
316         if (callJsCallback_ != nullptr) {
317             callJsCallback_(engine_, func_, context_, data);
318         } else {
319             CallJs(engine_, func_, context_, data);
320         }
321         lock.lock();
322         if (tryCatch.HasCaught()) {
323             engine_->HandleUncaughtException();
324         }
325         queue_.pop_front();
326         size--;
327     }
328     if (!queue_.empty()) {
329         auto ret = uv_async_send(&asyncHandler_);
330         if (ret != 0) {
331             HILOG_ERROR("uv async send failed in ProcessAsyncHandle ret = %{public}d", ret);
332         }
333     }
334     if (queue_.empty() && threadCount_ == 0) {
335 #if defined(ENABLE_EVENT_HANDLER)
336         if (engine_->GetRealApiVersion() < API_VERSION_SIXTEEN) {
337             CloseHandles();
338             return;
339         }
340         if (taskSize_.load() == 0) {
341             CloseHandles();
342         }
343 #else
344         CloseHandles();
345 #endif
346     }
347 }
348 
CloseHandles()349 SafeAsyncCode NativeSafeAsyncWork::CloseHandles()
350 {
351     HILOG_DEBUG("NativeSafeAsyncWork::CloseHandles called");
352 
353     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
354         HILOG_INFO("Close failed, thread is closed!");
355         return SafeAsyncCode::SAFE_ASYNC_CLOSED;
356     }
357 
358     status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED;
359 
360     // close async handler
361     uv_close(reinterpret_cast<uv_handle_t*>(&asyncHandler_), [](uv_handle_t* handle) {
362         NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::asyncHandler_,
363             reinterpret_cast<uv_async_t*>(handle));
364         that->CleanUp();
365     });
366 
367     return SafeAsyncCode::SAFE_ASYNC_OK;
368 }
369 
CleanUp()370 void NativeSafeAsyncWork::CleanUp()
371 {
372     HILOG_DEBUG("NativeSafeAsyncWork::CleanUp called");
373 
374     if (finalizeCallback_ != nullptr) {
375         finalizeCallback_(engine_, finalizeData_, context_);
376     }
377 
378     // clean data
379     while (!queue_.empty()) {
380         if (callJsCallback_ != nullptr) {
381             callJsCallback_(nullptr, nullptr, context_, queue_.front());
382         } else {
383             CallJs(nullptr, nullptr, context_, queue_.front());
384         }
385         queue_.pop_front();
386     }
387 #if defined(ENABLE_EVENT_HANDLER)
388     if (engine_->GetRealApiVersion() < API_VERSION_SIXTEEN) {
389         delete this;
390         return;
391     }
392     if (taskSize_.load() == 0) {
393         delete this;
394     }
395 #else
396     delete this;
397 #endif
398 }
399 
IsSameTid()400 bool NativeSafeAsyncWork::IsSameTid()
401 {
402     auto tid = pthread_self();
403     return (tid == engine_->GetTid()) ? true : false;
404 }
405 
PostTask(void * data,int32_t priority,bool isTail)406 napi_status NativeSafeAsyncWork::PostTask(void *data, int32_t priority, bool isTail)
407 {
408 #if defined(ENABLE_EVENT_HANDLER)
409     HILOG_DEBUG("NativeSafeAsyncWork::PostTask called");
410     if (runner_ == nullptr || engine_ == nullptr) {
411         HILOG_ERROR("post task failed due to nullptr engine or eventRunner");
412         return napi_status::napi_generic_failure;
413     }
414     // the task will be execute at main thread or worker thread
415     auto task = [this, data]() {
416         HILOG_DEBUG("The task is executing in main thread or worker thread");
417         panda::LocalScope scope(this->engine_->GetEcmaVm());
418         napi_value func_ = (this->ref_ == nullptr) ? nullptr : this->ref_->Get(engine_);
419         if (this->callJsCallback_ != nullptr) {
420             this->callJsCallback_(engine_, func_, context_, data);
421         } else {
422             CallJs(engine_, func_, context_, data);
423         }
424         if (engine_->GetRealApiVersion() < API_VERSION_SIXTEEN) {
425             return;
426         }
427         taskSize_--;
428         std::unique_lock<std::mutex> lock(mutex_);
429         if (taskSize_.load() == 0 && queue_.empty()) {
430             if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
431                 HILOG_DEBUG("NativeSafeAsyncWork is closed!");
432                 delete this;
433                 return;
434             }
435             auto ret = uv_async_send(&asyncHandler_);
436             if (ret != 0) {
437                 HILOG_ERROR("uv async send failed in PostTask ret = %{public}d", ret);
438             }
439         }
440     };
441 
442     if (UNLIKELY(eventHandler_ == nullptr)) {
443         eventHandler_ = std::make_shared<EventHandler>(runner_);
444     }
445     bool res = false;
446     if (isTail) {
447         HILOG_DEBUG("The task is posted from tail");
448         res = eventHandler_->PostTask(task, static_cast<EventQueue::Priority>(priority));
449     } else {
450         HILOG_DEBUG("The task is posted from head");
451         res = eventHandler_->PostTaskAtFront(task, std::string(), static_cast<EventQueue::Priority>(priority));
452     }
453     if (res) {
454         if (engine_->GetRealApiVersion() >= API_VERSION_SIXTEEN) {
455             taskSize_++;
456         }
457         return napi_status::napi_ok;
458     }
459 #else
460     HILOG_WARN("EventHandler feature is not supported");
461 #endif
462     return napi_status::napi_generic_failure;
463 }
464