• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "native_safe_async_work.h"
17 
18 #include "napi/native_api.h"
19 #include "native_async_work.h"
20 #include "native_engine.h"
21 #include "native_value.h"
22 #include "securec.h"
23 #include "utils/log.h"
24 
25 // static methods start
AsyncCallback(uv_async_t * asyncHandler)26 void NativeSafeAsyncWork::AsyncCallback(uv_async_t* asyncHandler)
27 {
28     HILOG_INFO("NativeSafeAsyncWork::AsyncCallback called");
29     NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::asyncHandler_, asyncHandler);
30 
31     that->ProcessAsyncHandle();
32 }
33 
CallJs(NativeEngine * engine,NativeValue * js_call_func,void * context,void * data)34 void NativeSafeAsyncWork::CallJs(NativeEngine* engine, NativeValue* js_call_func, void* context, void* data)
35 {
36     HILOG_INFO("NativeSafeAsyncWork::CallJs called");
37 
38     if (engine == nullptr || js_call_func == nullptr) {
39         HILOG_ERROR("CallJs failed. engine or js_call_func is nullptr!");
40         return;
41     }
42 
43     auto value = engine->CreateUndefined();
44     if (value == nullptr) {
45         HILOG_ERROR("CreateUndefined failed");
46         return;
47     }
48 
49     auto resultValue = engine->CallFunction(value, js_call_func, nullptr, 0);
50     if (resultValue == nullptr) {
51         HILOG_ERROR("CallFunction failed");
52     }
53 }
54 // static methods end
55 
NativeSafeAsyncWork(NativeEngine * engine,NativeValue * func,NativeValue * asyncResource,NativeValue * asyncResourceName,size_t maxQueueSize,size_t threadCount,void * finalizeData,NativeFinalize finalizeCallback,void * context,NativeThreadSafeFunctionCallJs callJsCallback)56 NativeSafeAsyncWork::NativeSafeAsyncWork(NativeEngine* engine,
57                                          NativeValue* func,
58                                          NativeValue* asyncResource,
59                                          NativeValue* asyncResourceName,
60                                          size_t maxQueueSize,
61                                          size_t threadCount,
62                                          void* finalizeData,
63                                          NativeFinalize finalizeCallback,
64                                          void* context,
65                                          NativeThreadSafeFunctionCallJs callJsCallback)
66     :engine_(engine), maxQueueSize_(maxQueueSize),
67     threadCount_(threadCount), finalizeData_(finalizeData), finalizeCallback_(finalizeCallback),
68     context_(context), callJsCallback_(callJsCallback)
69 {
70     errno_t err = EOK;
71     err = memset_s(&asyncContext_, sizeof(asyncContext_), 0, sizeof(asyncContext_));
72     if (err != EOK) {
73         return;
74     }
75 
76     asyncContext_.asyncResource = asyncResource;
77     asyncContext_.asyncResourceName = asyncResourceName;
78     if (func != nullptr) {
79         uint32_t initialRefcount = 1;
80         ref_ = engine->CreateReference(func, initialRefcount);
81     }
82 }
83 
~NativeSafeAsyncWork()84 NativeSafeAsyncWork::~NativeSafeAsyncWork()
85 {
86     if (ref_ != nullptr) {
87         delete ref_;
88         ref_ = nullptr;
89     }
90 }
91 
Init()92 bool NativeSafeAsyncWork::Init()
93 {
94     HILOG_INFO("NativeSafeAsyncWork::Init called");
95 
96     uv_loop_t* loop = engine_->GetUVLoop();
97     if (loop == nullptr) {
98         HILOG_ERROR("Get loop failed");
99         return false;
100     }
101 
102     int ret = uv_async_init(loop, &asyncHandler_, AsyncCallback);
103     if (ret != 0) {
104         HILOG_ERROR("uv async init failed %d", ret);
105         uv_close(reinterpret_cast<uv_handle_t*>(&asyncHandler_), nullptr);
106         return false;
107     }
108 
109     status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_INTE;
110     return true;
111 }
112 
IsMaxQueueSize()113 bool NativeSafeAsyncWork::IsMaxQueueSize()
114 {
115     return (queue_.size() > maxQueueSize_ &&
116            maxQueueSize_ > 0 &&
117            status_ != SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING &&
118            status_ != SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED);
119 }
120 
Send(void * data,NativeThreadSafeFunctionCallMode mode)121 SafeAsyncCode NativeSafeAsyncWork::Send(void* data, NativeThreadSafeFunctionCallMode mode)
122 {
123     HILOG_INFO("NativeSafeAsyncWork::Send called");
124 
125     std::unique_lock<std::mutex> lock(mutex_);
126     if (IsMaxQueueSize()) {
127         HILOG_INFO("queue size bigger than max queue size");
128         if (mode == NATIVE_TSFUNC_BLOCKING) {
129             while (IsMaxQueueSize()) {
130                 condition_.wait(lock);
131             }
132         } else {
133             return SafeAsyncCode::SAFE_ASYNC_QUEUE_FULL;
134         }
135     }
136 
137     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
138         status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
139         if (threadCount_ == 0) {
140             return SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS;
141         } else {
142             threadCount_--;
143             return SafeAsyncCode::SAFE_ASYNC_CLOSED;
144         }
145     } else {
146         queue_.push(data);
147         auto ret = uv_async_send(&asyncHandler_);
148         if (ret != 0) {
149             HILOG_ERROR("uv async send failed %d", ret);
150             return SafeAsyncCode::SAFE_ASYNC_FAILED;
151         }
152     }
153 
154     return SafeAsyncCode::SAFE_ASYNC_OK;
155 }
156 
Acquire()157 SafeAsyncCode NativeSafeAsyncWork::Acquire()
158 {
159     HILOG_INFO("NativeSafeAsyncWork::Acquire called");
160 
161     std::unique_lock<std::mutex> lock(mutex_);
162 
163     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
164         status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
165         HILOG_WARN("Do not acquire, thread is closed!");
166         return SafeAsyncCode::SAFE_ASYNC_CLOSED;
167     }
168 
169     // increase thread count
170     threadCount_++;
171 
172     return SafeAsyncCode::SAFE_ASYNC_OK;
173 }
174 
Release(NativeThreadSafeFunctionReleaseMode mode)175 SafeAsyncCode NativeSafeAsyncWork::Release(NativeThreadSafeFunctionReleaseMode mode)
176 {
177     HILOG_INFO("NativeSafeAsyncWork::Release called");
178 
179     std::unique_lock<std::mutex> lock(mutex_);
180 
181     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
182         status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
183         HILOG_WARN("Do not release, thread is closed!");
184         return SafeAsyncCode::SAFE_ASYNC_CLOSED;
185     }
186 
187     if (threadCount_ == 0) {
188         HILOG_ERROR("Do not release, thread count is zero.");
189         return SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS;
190     }
191 
192     // decrease thread count
193     threadCount_--;
194 
195     if (mode == NativeThreadSafeFunctionReleaseMode::NATIVE_TSFUNC_ABORT) {
196         status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING;
197         if (maxQueueSize_ > 0) {
198             condition_.notify_one();
199         }
200     }
201 
202     if (threadCount_ == 0 ||
203         mode == NativeThreadSafeFunctionReleaseMode::NATIVE_TSFUNC_ABORT) {
204         // trigger async handle
205         auto ret = uv_async_send(&asyncHandler_);
206         if (ret != 0) {
207             HILOG_ERROR("uv async send failed %d", ret);
208             return SafeAsyncCode::SAFE_ASYNC_FAILED;
209         }
210     }
211 
212     return SafeAsyncCode::SAFE_ASYNC_OK;
213 }
214 
Ref()215 bool NativeSafeAsyncWork::Ref()
216 {
217     if (!IsSameTid()) {
218         HILOG_ERROR("tid not same");
219         return false;
220     }
221 
222     uv_ref(reinterpret_cast<uv_handle_t*>(&asyncHandler_));
223 
224     return true;
225 }
226 
Unref()227 bool NativeSafeAsyncWork::Unref()
228 {
229     if (!IsSameTid()) {
230         HILOG_ERROR("tid not same");
231         return false;
232     }
233 
234     uv_unref(reinterpret_cast<uv_handle_t*>(&asyncHandler_));
235 
236     return true;
237 }
238 
GetContext()239 void* NativeSafeAsyncWork::GetContext()
240 {
241     return context_;
242 }
243 
ProcessAsyncHandle()244 void NativeSafeAsyncWork::ProcessAsyncHandle()
245 {
246     HILOG_INFO("NativeSafeAsyncWork::ProcessAsyncHandle called");
247 
248     auto scopeManager = engine_->GetScopeManager();
249     if (scopeManager == nullptr) {
250         HILOG_ERROR("scope manager is null");
251         return;
252     }
253 
254     std::unique_lock<std::mutex> lock(mutex_);
255     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
256         HILOG_ERROR("Process failed, thread is closed!");
257         return;
258     }
259 
260     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
261         HILOG_ERROR("thread is closing!");
262         CloseHandles();
263         return;
264     }
265 
266     size_t size = queue_.size();
267     void* data = nullptr;
268 
269     HILOG_INFO("queue size %d", (int32_t)size);
270 
271     auto nativeScope = scopeManager->Open();
272 
273     while (size > 0) {
274         data = queue_.front();
275 
276         // when queue is full, notify send.
277         if (size == maxQueueSize_ && maxQueueSize_ > 0) {
278             condition_.notify_one();
279         }
280 
281         NativeValue* func_ = (ref_ == nullptr) ? nullptr : ref_->Get();
282         if (callJsCallback_ != nullptr) {
283             callJsCallback_(engine_, func_, context_, data);
284         } else {
285             CallJs(engine_, func_, context_, data);
286         }
287         queue_.pop();
288         size--;
289     }
290 
291     if (size == 0 && threadCount_ == 0) {
292         CloseHandles();
293     }
294 
295     scopeManager->Close(nativeScope);
296 }
297 
CloseHandles()298 SafeAsyncCode NativeSafeAsyncWork::CloseHandles()
299 {
300     HILOG_INFO("NativeSafeAsyncWork::CloseHandles called");
301 
302     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
303         HILOG_INFO("Close failed, thread is closed!");
304         return SafeAsyncCode::SAFE_ASYNC_CLOSED;
305     }
306 
307     status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED;
308 
309     // close async handler
310     uv_close(reinterpret_cast<uv_handle_t*>(&asyncHandler_), [](uv_handle_t* handle) {
311         NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::asyncHandler_,
312             reinterpret_cast<uv_async_t*>(handle));
313         that->CleanUp();
314     });
315 
316     return SafeAsyncCode::SAFE_ASYNC_OK;
317 }
318 
CleanUp()319 void NativeSafeAsyncWork::CleanUp()
320 {
321     HILOG_INFO("NativeSafeAsyncWork::CleanUp called");
322 
323     if (finalizeCallback_ != nullptr) {
324         finalizeCallback_(engine_, finalizeData_, context_);
325     }
326 
327     // clean data
328     while (!queue_.empty()) {
329         if (callJsCallback_ != nullptr) {
330             callJsCallback_(nullptr, nullptr, context_, queue_.front());
331         } else {
332             CallJs(nullptr, nullptr, context_, queue_.front());
333         }
334         queue_.pop();
335     }
336 }
337 
IsSameTid()338 bool NativeSafeAsyncWork::IsSameTid()
339 {
340     auto tid = pthread_self();
341     return (tid == engine_->GetTid()) ? true : false;
342 }
343