• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "native_safe_async_work.h"
17 
18 #include "napi/native_api.h"
19 #include "native_async_work.h"
20 #include "native_engine.h"
21 #include "native_value.h"
22 #include "securec.h"
23 #include "utils/log.h"
24 
25 // static methods start
AsyncCallback(uv_async_t * asyncHandler)26 void NativeSafeAsyncWork::AsyncCallback(uv_async_t* asyncHandler)
27 {
28     HILOG_INFO("NativeSafeAsyncWork::AsyncCallback called");
29 
30     NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::asyncHandler_, asyncHandler);
31     auto ret = uv_idle_start(&that->idleHandler_, IdleCallback);
32     if (ret != 0) {
33         HILOG_ERROR("uv idle start failed %d", ret);
34         return;
35     }
36 }
37 
IdleCallback(uv_idle_t * idleHandler)38 void NativeSafeAsyncWork::IdleCallback(uv_idle_t* idleHandler)
39 {
40     HILOG_INFO("NativeSafeAsyncWork::IdleCallback called");
41     NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::idleHandler_, idleHandler);
42 
43     that->ProcessAsyncHandle();
44 }
45 
CallJs(NativeEngine * engine,NativeValue * js_call_func,void * context,void * data)46 void NativeSafeAsyncWork::CallJs(NativeEngine* engine, NativeValue* js_call_func, void* context, void* data)
47 {
48     HILOG_INFO("NativeSafeAsyncWork::CallJs called");
49 
50     if (engine == nullptr || js_call_func == nullptr) {
51         HILOG_ERROR("CallJs failed. engine or js_call_func is nullptr!");
52         return;
53     }
54 
55     auto value = engine->CreateUndefined();
56     if (value == nullptr) {
57         HILOG_ERROR("CreateUndefined failed");
58         return;
59     }
60 
61     auto resultValue = engine->CallFunction(value, js_call_func, nullptr, 0);
62     if (resultValue == nullptr) {
63         HILOG_ERROR("CallFunction failed");
64     }
65 }
66 // static methods end
67 
NativeSafeAsyncWork(NativeEngine * engine,NativeValue * func,NativeValue * asyncResource,NativeValue * asyncResourceName,size_t maxQueueSize,size_t threadCount,void * finalizeData,NativeFinalize finalizeCallback,void * context,NativeThreadSafeFunctionCallJs callJsCallback)68 NativeSafeAsyncWork::NativeSafeAsyncWork(NativeEngine* engine,
69                                          NativeValue* func,
70                                          NativeValue* asyncResource,
71                                          NativeValue* asyncResourceName,
72                                          size_t maxQueueSize,
73                                          size_t threadCount,
74                                          void* finalizeData,
75                                          NativeFinalize finalizeCallback,
76                                          void* context,
77                                          NativeThreadSafeFunctionCallJs callJsCallback)
78     :engine_(engine), func_(func), maxQueueSize_(maxQueueSize),
79     threadCount_(threadCount), finalizeData_(finalizeData), finalizeCallback_(finalizeCallback),
80     context_(context), callJsCallback_(callJsCallback)
81 {
82     errno_t err = EOK;
83     err = memset_s(&asyncContext_, sizeof(asyncContext_), 0, sizeof(asyncContext_));
84     if (err != EOK) {
85         return;
86     }
87 
88     asyncContext_.asyncResource = asyncResource;
89     asyncContext_.asyncResourceName = asyncResourceName;
90 }
91 
~NativeSafeAsyncWork()92 NativeSafeAsyncWork::~NativeSafeAsyncWork() {}
93 
Init()94 bool NativeSafeAsyncWork::Init()
95 {
96     HILOG_INFO("NativeSafeAsyncWork::Init called");
97 
98     uv_loop_t* loop = engine_->GetUVLoop();
99     if (loop == nullptr) {
100         HILOG_ERROR("Get loop failed");
101         return false;
102     }
103 
104     int ret = uv_async_init(loop, &asyncHandler_, AsyncCallback);
105     if (ret != 0) {
106         HILOG_ERROR("uv async init failed %d", ret);
107         return false;
108     }
109 
110     ret = uv_idle_init(loop, &idleHandler_);
111     if (ret != 0) {
112         HILOG_ERROR("uv idle init failed %d", ret);
113         uv_close(reinterpret_cast<uv_handle_t*>(&asyncHandler_), nullptr);
114         return false;
115     }
116 
117     status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_INTE;
118     return true;
119 }
120 
Send(void * data,NativeThreadSafeFunctionCallMode mode)121 SafeAsyncCode NativeSafeAsyncWork::Send(void* data, NativeThreadSafeFunctionCallMode mode)
122 {
123     HILOG_INFO("NativeSafeAsyncWork::Send called");
124 
125     std::unique_lock<std::mutex> lock(mutex_);
126 
127     if (queue_.size() > maxQueueSize_ &&
128         maxQueueSize_ > 0 &&
129         status_ != SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING &&
130         status_ != SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
131         HILOG_INFO("queue size bigger than max queue size");
132 
133         if (mode == NATIVE_TSFUNC_NONBLOCKING) {
134             return SafeAsyncCode::SAFE_ASYNC_QUEUE_FULL;
135         }
136         condition_.wait(lock);
137     }
138 
139     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
140         status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
141         if (threadCount_ == 0) {
142             return SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS;
143         } else {
144             threadCount_--;
145             return SafeAsyncCode::SAFE_ASYNC_CLOSED;
146         }
147     } else {
148         queue_.push(data);
149         auto ret = uv_async_send(&asyncHandler_);
150         if (ret != 0) {
151             HILOG_ERROR("uv async send failed %d", ret);
152             return SafeAsyncCode::SAFE_ASYNC_FAILED;
153         }
154     }
155 
156     return SafeAsyncCode::SAFE_ASYNC_OK;
157 }
158 
Acquire()159 SafeAsyncCode NativeSafeAsyncWork::Acquire()
160 {
161     HILOG_INFO("NativeSafeAsyncWork::Acquire called");
162 
163     std::unique_lock<std::mutex> lock(mutex_);
164 
165     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
166         status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
167         HILOG_WARN("Do not acquire, thread is closed!");
168         return SafeAsyncCode::SAFE_ASYNC_CLOSED;
169     }
170 
171     // increase thread count
172     threadCount_++;
173 
174     return SafeAsyncCode::SAFE_ASYNC_OK;
175 }
176 
Release(NativeThreadSafeFunctionReleaseMode mode)177 SafeAsyncCode NativeSafeAsyncWork::Release(NativeThreadSafeFunctionReleaseMode mode)
178 {
179     HILOG_INFO("NativeSafeAsyncWork::Release called");
180 
181     std::unique_lock<std::mutex> lock(mutex_);
182 
183     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
184         status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
185         HILOG_WARN("Do not release, thread is closed!");
186         return SafeAsyncCode::SAFE_ASYNC_CLOSED;
187     }
188 
189     if (threadCount_ == 0) {
190         HILOG_ERROR("Do not release, thread count is zero.");
191         return SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS;
192     }
193 
194     // decrease thread count
195     threadCount_--;
196 
197     if (mode == NativeThreadSafeFunctionReleaseMode::NATIVE_TSFUNC_ABORT) {
198         status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING;
199         if (maxQueueSize_ > 0) {
200             condition_.notify_one();
201         }
202     }
203 
204     if (threadCount_ == 0 ||
205         mode == NativeThreadSafeFunctionReleaseMode::NATIVE_TSFUNC_ABORT) {
206         // trigger async handle
207         auto ret = uv_async_send(&asyncHandler_);
208         if (ret != 0) {
209             HILOG_ERROR("uv async send failed %d", ret);
210             return SafeAsyncCode::SAFE_ASYNC_FAILED;
211         }
212     }
213 
214     return SafeAsyncCode::SAFE_ASYNC_OK;
215 }
216 
Ref()217 bool NativeSafeAsyncWork::Ref()
218 {
219     if (!IsSameTid()) {
220         HILOG_ERROR("tid not same");
221         return false;
222     }
223 
224     uv_ref(reinterpret_cast<uv_handle_t*>(&asyncHandler_));
225     uv_ref(reinterpret_cast<uv_handle_t*>(&idleHandler_));
226 
227     return true;
228 }
229 
Unref()230 bool NativeSafeAsyncWork::Unref()
231 {
232     if (!IsSameTid()) {
233         HILOG_ERROR("tid not same");
234         return false;
235     }
236 
237     uv_unref(reinterpret_cast<uv_handle_t*>(&asyncHandler_));
238     uv_unref(reinterpret_cast<uv_handle_t*>(&idleHandler_));
239 
240     return true;
241 }
242 
GetContext()243 void* NativeSafeAsyncWork::GetContext()
244 {
245     return context_;
246 }
247 
ProcessAsyncHandle()248 void NativeSafeAsyncWork::ProcessAsyncHandle()
249 {
250     HILOG_INFO("NativeSafeAsyncWork::ProcessAsyncHandle called");
251 
252     std::unique_lock<std::mutex> lock(mutex_);
253     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
254         HILOG_ERROR("Process failed, thread is closed!");
255         return;
256     }
257 
258     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
259         HILOG_ERROR("thread is closing!");
260 
261         if (uv_idle_stop(&idleHandler_) != 0) {
262             HILOG_ERROR("uv idle stop failed");
263         }
264 
265         CloseHandles();
266         return;
267     }
268 
269     size_t size = queue_.size();
270     void* data = nullptr;
271 
272     HILOG_INFO("queue size %d", (int32_t)size);
273     if (size > 0) {
274         data = queue_.front();
275         queue_.pop();
276 
277         // when queue is full, notify send.
278         if (size == maxQueueSize_ && maxQueueSize_ > 0) {
279             condition_.notify_one();
280         }
281 
282         if (callJsCallback_ != nullptr) {
283             callJsCallback_(engine_, func_, context_, data);
284         } else {
285             CallJs(engine_, func_, context_, data);
286         }
287         size--;
288     }
289 
290     if (size == 0) {
291         if (uv_idle_stop(&idleHandler_) != 0) {
292             HILOG_ERROR("uv idle stop failed");
293         }
294         if (threadCount_ == 0) {
295             CloseHandles();
296         }
297     }
298 }
299 
CloseHandles()300 SafeAsyncCode NativeSafeAsyncWork::CloseHandles()
301 {
302     HILOG_INFO("NativeSafeAsyncWork::CloseHandles called");
303 
304     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
305         HILOG_INFO("Close failed, thread is closed!");
306         return SafeAsyncCode::SAFE_ASYNC_CLOSED;
307     }
308 
309     status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED;
310 
311     // close async handler
312     uv_close(reinterpret_cast<uv_handle_t*>(&asyncHandler_), [](uv_handle_t* handle) {
313         NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::asyncHandler_,
314             reinterpret_cast<uv_async_t*>(handle));
315         // close idle handler
316         uv_close(reinterpret_cast<uv_handle_t*>(&that->idleHandler_), [](uv_handle_t* handle) {
317             NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::idleHandler_,
318                 reinterpret_cast<uv_idle_t*>(handle));
319             that->CleanUp();
320         });
321     });
322 
323     return SafeAsyncCode::SAFE_ASYNC_OK;
324 }
325 
CleanUp()326 void NativeSafeAsyncWork::CleanUp()
327 {
328     HILOG_INFO("NativeSafeAsyncWork::CleanUp called");
329 
330     if (finalizeCallback_ != nullptr) {
331         finalizeCallback_(engine_, finalizeData_, context_);
332     }
333 
334     // clean data
335     while (!queue_.empty()) {
336         if (callJsCallback_ != nullptr) {
337             callJsCallback_(nullptr, nullptr, context_, queue_.front());
338         } else {
339             CallJs(nullptr, nullptr, context_, queue_.front());
340         }
341         queue_.pop();
342     }
343 }
344 
IsSameTid()345 bool NativeSafeAsyncWork::IsSameTid()
346 {
347     auto tid = pthread_self();
348     return (tid == engine_->GetTid()) ? true : false;
349 }
350