• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright (c) 2021 Huawei Device Co., Ltd.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 
16 #include "native_safe_async_work.h"
17 
18 #include "napi/native_api.h"
19 #include "native_async_work.h"
20 #include "native_engine.h"
21 #include "native_value.h"
22 #include "securec.h"
23 #include "utils/log.h"
24 
25 // static methods start
AsyncCallback(uv_async_t * asyncHandler)26 void NativeSafeAsyncWork::AsyncCallback(uv_async_t* asyncHandler)
27 {
28     HILOG_INFO("NativeSafeAsyncWork::AsyncCallback called");
29 
30     NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::asyncHandler_, asyncHandler);
31     auto ret = uv_idle_start(&that->idleHandler_, IdleCallback);
32     if (ret != 0) {
33         HILOG_ERROR("uv idle start failed %d", ret);
34         return;
35     }
36 }
37 
IdleCallback(uv_idle_t * idleHandler)38 void NativeSafeAsyncWork::IdleCallback(uv_idle_t* idleHandler)
39 {
40     HILOG_INFO("NativeSafeAsyncWork::IdleCallback called");
41     NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::idleHandler_, idleHandler);
42 
43     that->ProcessAsyncHandle();
44 }
45 
CallJs(NativeEngine * engine,NativeValue * js_call_func,void * context,void * data)46 void NativeSafeAsyncWork::CallJs(NativeEngine* engine, NativeValue* js_call_func, void* context, void* data)
47 {
48     HILOG_INFO("NativeSafeAsyncWork::CallJs called");
49 
50     if (engine == nullptr || js_call_func == nullptr) {
51         HILOG_ERROR("CallJs failed. engine or js_call_func is nullptr!");
52         return;
53     }
54 
55     auto value = engine->CreateUndefined();
56     if (value == nullptr) {
57         HILOG_ERROR("CreateUndefined failed");
58         return;
59     }
60 
61     auto resultValue = engine->CallFunction(value, js_call_func, nullptr, 0);
62     if (resultValue == nullptr) {
63         HILOG_ERROR("CallFunction failed");
64     }
65 }
66 // static methods end
67 
NativeSafeAsyncWork(NativeEngine * engine,NativeValue * func,NativeValue * asyncResource,NativeValue * asyncResourceName,size_t maxQueueSize,size_t threadCount,void * finalizeData,NativeFinalize finalizeCallback,void * context,NativeThreadSafeFunctionCallJs callJsCallback)68 NativeSafeAsyncWork::NativeSafeAsyncWork(NativeEngine* engine,
69                                          NativeValue* func,
70                                          NativeValue* asyncResource,
71                                          NativeValue* asyncResourceName,
72                                          size_t maxQueueSize,
73                                          size_t threadCount,
74                                          void* finalizeData,
75                                          NativeFinalize finalizeCallback,
76                                          void* context,
77                                          NativeThreadSafeFunctionCallJs callJsCallback)
78     :engine_(engine), maxQueueSize_(maxQueueSize),
79     threadCount_(threadCount), finalizeData_(finalizeData), finalizeCallback_(finalizeCallback),
80     context_(context), callJsCallback_(callJsCallback)
81 {
82     errno_t err = EOK;
83     err = memset_s(&asyncContext_, sizeof(asyncContext_), 0, sizeof(asyncContext_));
84     if (err != EOK) {
85         return;
86     }
87 
88     asyncContext_.asyncResource = asyncResource;
89     asyncContext_.asyncResourceName = asyncResourceName;
90     if (func != nullptr) {
91         uint32_t initialRefcount = 1;
92         ref_ = engine->CreateReference(func, initialRefcount);
93     }
94 }
95 
~NativeSafeAsyncWork()96 NativeSafeAsyncWork::~NativeSafeAsyncWork()
97 {
98     if (ref_ != nullptr) {
99         delete ref_;
100         ref_ = nullptr;
101     }
102 }
103 
Init()104 bool NativeSafeAsyncWork::Init()
105 {
106     HILOG_INFO("NativeSafeAsyncWork::Init called");
107 
108     uv_loop_t* loop = engine_->GetUVLoop();
109     if (loop == nullptr) {
110         HILOG_ERROR("Get loop failed");
111         return false;
112     }
113 
114     int ret = uv_async_init(loop, &asyncHandler_, AsyncCallback);
115     if (ret != 0) {
116         HILOG_ERROR("uv async init failed %d", ret);
117         return false;
118     }
119 
120     ret = uv_idle_init(loop, &idleHandler_);
121     if (ret != 0) {
122         HILOG_ERROR("uv idle init failed %d", ret);
123         uv_close(reinterpret_cast<uv_handle_t*>(&asyncHandler_), nullptr);
124         return false;
125     }
126 
127     status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_INTE;
128     return true;
129 }
130 
IsMaxQueueSize()131 bool NativeSafeAsyncWork::IsMaxQueueSize()
132 {
133     return (queue_.size() > maxQueueSize_ &&
134            maxQueueSize_ > 0 &&
135            status_ != SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING &&
136            status_ != SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED);
137 }
138 
Send(void * data,NativeThreadSafeFunctionCallMode mode)139 SafeAsyncCode NativeSafeAsyncWork::Send(void* data, NativeThreadSafeFunctionCallMode mode)
140 {
141     HILOG_INFO("NativeSafeAsyncWork::Send called");
142 
143     std::unique_lock<std::mutex> lock(mutex_);
144     if (IsMaxQueueSize()) {
145         HILOG_INFO("queue size bigger than max queue size");
146         if (mode == NATIVE_TSFUNC_BLOCKING) {
147             while (IsMaxQueueSize()) {
148                 condition_.wait(lock);
149             }
150         } else {
151             return SafeAsyncCode::SAFE_ASYNC_QUEUE_FULL;
152         }
153     }
154 
155     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
156         status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
157         if (threadCount_ == 0) {
158             return SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS;
159         } else {
160             threadCount_--;
161             return SafeAsyncCode::SAFE_ASYNC_CLOSED;
162         }
163     } else {
164         queue_.push(data);
165         auto ret = uv_async_send(&asyncHandler_);
166         if (ret != 0) {
167             HILOG_ERROR("uv async send failed %d", ret);
168             return SafeAsyncCode::SAFE_ASYNC_FAILED;
169         }
170     }
171 
172     return SafeAsyncCode::SAFE_ASYNC_OK;
173 }
174 
Acquire()175 SafeAsyncCode NativeSafeAsyncWork::Acquire()
176 {
177     HILOG_INFO("NativeSafeAsyncWork::Acquire called");
178 
179     std::unique_lock<std::mutex> lock(mutex_);
180 
181     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
182         status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
183         HILOG_WARN("Do not acquire, thread is closed!");
184         return SafeAsyncCode::SAFE_ASYNC_CLOSED;
185     }
186 
187     // increase thread count
188     threadCount_++;
189 
190     return SafeAsyncCode::SAFE_ASYNC_OK;
191 }
192 
Release(NativeThreadSafeFunctionReleaseMode mode)193 SafeAsyncCode NativeSafeAsyncWork::Release(NativeThreadSafeFunctionReleaseMode mode)
194 {
195     HILOG_INFO("NativeSafeAsyncWork::Release called");
196 
197     std::unique_lock<std::mutex> lock(mutex_);
198 
199     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED ||
200         status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
201         HILOG_WARN("Do not release, thread is closed!");
202         return SafeAsyncCode::SAFE_ASYNC_CLOSED;
203     }
204 
205     if (threadCount_ == 0) {
206         HILOG_ERROR("Do not release, thread count is zero.");
207         return SafeAsyncCode::SAFE_ASYNC_INVALID_ARGS;
208     }
209 
210     // decrease thread count
211     threadCount_--;
212 
213     if (mode == NativeThreadSafeFunctionReleaseMode::NATIVE_TSFUNC_ABORT) {
214         status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING;
215         if (maxQueueSize_ > 0) {
216             condition_.notify_one();
217         }
218     }
219 
220     if (threadCount_ == 0 ||
221         mode == NativeThreadSafeFunctionReleaseMode::NATIVE_TSFUNC_ABORT) {
222         // trigger async handle
223         auto ret = uv_async_send(&asyncHandler_);
224         if (ret != 0) {
225             HILOG_ERROR("uv async send failed %d", ret);
226             return SafeAsyncCode::SAFE_ASYNC_FAILED;
227         }
228     }
229 
230     return SafeAsyncCode::SAFE_ASYNC_OK;
231 }
232 
Ref()233 bool NativeSafeAsyncWork::Ref()
234 {
235     if (!IsSameTid()) {
236         HILOG_ERROR("tid not same");
237         return false;
238     }
239 
240     uv_ref(reinterpret_cast<uv_handle_t*>(&asyncHandler_));
241     uv_ref(reinterpret_cast<uv_handle_t*>(&idleHandler_));
242 
243     return true;
244 }
245 
Unref()246 bool NativeSafeAsyncWork::Unref()
247 {
248     if (!IsSameTid()) {
249         HILOG_ERROR("tid not same");
250         return false;
251     }
252 
253     uv_unref(reinterpret_cast<uv_handle_t*>(&asyncHandler_));
254     uv_unref(reinterpret_cast<uv_handle_t*>(&idleHandler_));
255 
256     return true;
257 }
258 
GetContext()259 void* NativeSafeAsyncWork::GetContext()
260 {
261     return context_;
262 }
263 
ProcessAsyncHandle()264 void NativeSafeAsyncWork::ProcessAsyncHandle()
265 {
266     HILOG_INFO("NativeSafeAsyncWork::ProcessAsyncHandle called");
267 
268     std::unique_lock<std::mutex> lock(mutex_);
269     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
270         HILOG_ERROR("Process failed, thread is closed!");
271         return;
272     }
273 
274     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSING) {
275         HILOG_ERROR("thread is closing!");
276 
277         if (uv_idle_stop(&idleHandler_) != 0) {
278             HILOG_ERROR("uv idle stop failed");
279         }
280 
281         CloseHandles();
282         return;
283     }
284 
285     size_t size = queue_.size();
286     void* data = nullptr;
287 
288     HILOG_INFO("queue size %d", (int32_t)size);
289     while (size > 0) {
290         data = queue_.front();
291 
292         // when queue is full, notify send.
293         if (size == maxQueueSize_ && maxQueueSize_ > 0) {
294             condition_.notify_one();
295         }
296 
297         NativeValue* func_ = (ref_ == nullptr) ? nullptr : ref_->Get();
298         if (callJsCallback_ != nullptr) {
299             callJsCallback_(engine_, func_, context_, data);
300         } else {
301             CallJs(engine_, func_, context_, data);
302         }
303         queue_.pop();
304         size--;
305     }
306 
307     if (size == 0) {
308         if (uv_idle_stop(&idleHandler_) != 0) {
309             HILOG_ERROR("uv idle stop failed");
310         }
311         if (threadCount_ == 0) {
312             CloseHandles();
313         }
314     }
315 }
316 
CloseHandles()317 SafeAsyncCode NativeSafeAsyncWork::CloseHandles()
318 {
319     HILOG_INFO("NativeSafeAsyncWork::CloseHandles called");
320 
321     if (status_ == SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED) {
322         HILOG_INFO("Close failed, thread is closed!");
323         return SafeAsyncCode::SAFE_ASYNC_CLOSED;
324     }
325 
326     status_ = SafeAsyncStatus::SAFE_ASYNC_STATUS_CLOSED;
327 
328     // close async handler
329     uv_close(reinterpret_cast<uv_handle_t*>(&asyncHandler_), [](uv_handle_t* handle) {
330         NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::asyncHandler_,
331             reinterpret_cast<uv_async_t*>(handle));
332         // close idle handler
333         uv_close(reinterpret_cast<uv_handle_t*>(&that->idleHandler_), [](uv_handle_t* handle) {
334             NativeSafeAsyncWork* that = NativeAsyncWork::DereferenceOf(&NativeSafeAsyncWork::idleHandler_,
335                 reinterpret_cast<uv_idle_t*>(handle));
336             that->CleanUp();
337         });
338     });
339 
340     return SafeAsyncCode::SAFE_ASYNC_OK;
341 }
342 
CleanUp()343 void NativeSafeAsyncWork::CleanUp()
344 {
345     HILOG_INFO("NativeSafeAsyncWork::CleanUp called");
346 
347     if (finalizeCallback_ != nullptr) {
348         finalizeCallback_(engine_, finalizeData_, context_);
349     }
350 
351     // clean data
352     while (!queue_.empty()) {
353         if (callJsCallback_ != nullptr) {
354             callJsCallback_(nullptr, nullptr, context_, queue_.front());
355         } else {
356             CallJs(nullptr, nullptr, context_, queue_.front());
357         }
358         queue_.pop();
359     }
360 }
361 
IsSameTid()362 bool NativeSafeAsyncWork::IsSameTid()
363 {
364     auto tid = pthread_self();
365     return (tid == engine_->GetTid()) ? true : false;
366 }
367