1 // For the purpose of this test we use libuv's threading library. When deciding
2 // on a threading library for a new project it bears remembering that in the
3 // future libuv may introduce API changes which may render it non-ABI-stable,
4 // which, in turn, may affect the ABI stability of the project despite its use
5 // of N-API.
6 #include <uv.h>
7 #include <node_api.h>
8 #include "../../js-native-api/common.h"
9
10 #define ARRAY_LENGTH 10000
11 #define MAX_QUEUE_SIZE 2
12
13 static uv_thread_t uv_threads[2];
14 static napi_threadsafe_function ts_fn;
15
16 typedef struct {
17 napi_threadsafe_function_call_mode block_on_full;
18 napi_threadsafe_function_release_mode abort;
19 bool start_secondary;
20 napi_ref js_finalize_cb;
21 uint32_t max_queue_size;
22 } ts_fn_hint;
23
24 static ts_fn_hint ts_info;
25
26 // Thread data to transmit to JS
27 static int ints[ARRAY_LENGTH];
28
secondary_thread(void * data)29 static void secondary_thread(void* data) {
30 napi_threadsafe_function ts_fn = data;
31
32 if (napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) {
33 napi_fatal_error("secondary_thread", NAPI_AUTO_LENGTH,
34 "napi_release_threadsafe_function failed", NAPI_AUTO_LENGTH);
35 }
36 }
37
38 // Source thread producing the data
data_source_thread(void * data)39 static void data_source_thread(void* data) {
40 napi_threadsafe_function ts_fn = data;
41 int index;
42 void* hint;
43 ts_fn_hint *ts_fn_info;
44 napi_status status;
45 bool queue_was_full = false;
46 bool queue_was_closing = false;
47
48 if (napi_get_threadsafe_function_context(ts_fn, &hint) != napi_ok) {
49 napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
50 "napi_get_threadsafe_function_context failed", NAPI_AUTO_LENGTH);
51 }
52
53 ts_fn_info = (ts_fn_hint *)hint;
54
55 if (ts_fn_info != &ts_info) {
56 napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
57 "thread-safe function hint is not as expected", NAPI_AUTO_LENGTH);
58 }
59
60 if (ts_fn_info->start_secondary) {
61 if (napi_acquire_threadsafe_function(ts_fn) != napi_ok) {
62 napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
63 "napi_acquire_threadsafe_function failed", NAPI_AUTO_LENGTH);
64 }
65
66 if (uv_thread_create(&uv_threads[1], secondary_thread, ts_fn) != 0) {
67 napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
68 "failed to start secondary thread", NAPI_AUTO_LENGTH);
69 }
70 }
71
72 for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) {
73 status = napi_call_threadsafe_function(ts_fn, &ints[index],
74 ts_fn_info->block_on_full);
75 if (ts_fn_info->max_queue_size == 0 && (index % 1000 == 0)) {
76 // Let's make this thread really busy for 200 ms to give the main thread a
77 // chance to abort.
78 uint64_t start = uv_hrtime();
79 for (; uv_hrtime() - start < 200000000;);
80 }
81 switch (status) {
82 case napi_queue_full:
83 queue_was_full = true;
84 index++;
85 // fall through
86
87 case napi_ok:
88 continue;
89
90 case napi_closing:
91 queue_was_closing = true;
92 break;
93
94 default:
95 napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
96 "napi_call_threadsafe_function failed", NAPI_AUTO_LENGTH);
97 }
98 }
99
100 // Assert that the enqueuing of a value was refused at least once, if this is
101 // a non-blocking test run.
102 if (!ts_fn_info->block_on_full && !queue_was_full) {
103 napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
104 "queue was never full", NAPI_AUTO_LENGTH);
105 }
106
107 // Assert that the queue was marked as closing at least once, if this is an
108 // aborting test run.
109 if (ts_fn_info->abort == napi_tsfn_abort && !queue_was_closing) {
110 napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
111 "queue was never closing", NAPI_AUTO_LENGTH);
112 }
113
114 if (!queue_was_closing &&
115 napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) {
116 napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
117 "napi_release_threadsafe_function failed", NAPI_AUTO_LENGTH);
118 }
119 }
120
121 // Getting the data into JS
call_js(napi_env env,napi_value cb,void * hint,void * data)122 static void call_js(napi_env env, napi_value cb, void* hint, void* data) {
123 if (!(env == NULL || cb == NULL)) {
124 napi_value argv, undefined;
125 NODE_API_CALL_RETURN_VOID(env, napi_create_int32(env, *(int*)data, &argv));
126 NODE_API_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined));
127 NODE_API_CALL_RETURN_VOID(env, napi_call_function(env, undefined, cb, 1, &argv,
128 NULL));
129 }
130 }
131
132 static napi_ref alt_ref;
133 // Getting the data into JS with the alternative reference
call_ref(napi_env env,napi_value _,void * hint,void * data)134 static void call_ref(napi_env env, napi_value _, void* hint, void* data) {
135 if (!(env == NULL || alt_ref == NULL)) {
136 napi_value fn, argv, undefined;
137 NODE_API_CALL_RETURN_VOID(env, napi_get_reference_value(env, alt_ref, &fn));
138 NODE_API_CALL_RETURN_VOID(env, napi_create_int32(env, *(int*)data, &argv));
139 NODE_API_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined));
140 NODE_API_CALL_RETURN_VOID(env, napi_call_function(env, undefined, fn, 1, &argv,
141 NULL));
142 }
143 }
144
145 // Cleanup
StopThread(napi_env env,napi_callback_info info)146 static napi_value StopThread(napi_env env, napi_callback_info info) {
147 size_t argc = 2;
148 napi_value argv[2];
149 NODE_API_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL));
150 napi_valuetype value_type;
151 NODE_API_CALL(env, napi_typeof(env, argv[0], &value_type));
152 NODE_API_ASSERT(env, value_type == napi_function,
153 "StopThread argument is a function");
154 NODE_API_ASSERT(env, (ts_fn != NULL), "Existing threadsafe function");
155 NODE_API_CALL(env,
156 napi_create_reference(env, argv[0], 1, &(ts_info.js_finalize_cb)));
157 bool abort;
158 NODE_API_CALL(env, napi_get_value_bool(env, argv[1], &abort));
159 NODE_API_CALL(env,
160 napi_release_threadsafe_function(ts_fn,
161 abort ? napi_tsfn_abort : napi_tsfn_release));
162 ts_fn = NULL;
163 return NULL;
164 }
165
166 // Join the thread and inform JS that we're done.
join_the_threads(napi_env env,void * data,void * hint)167 static void join_the_threads(napi_env env, void *data, void *hint) {
168 uv_thread_t *the_threads = data;
169 ts_fn_hint *the_hint = hint;
170 napi_value js_cb, undefined;
171
172 uv_thread_join(&the_threads[0]);
173 if (the_hint->start_secondary) {
174 uv_thread_join(&the_threads[1]);
175 }
176
177 NODE_API_CALL_RETURN_VOID(env,
178 napi_get_reference_value(env, the_hint->js_finalize_cb, &js_cb));
179 NODE_API_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined));
180 NODE_API_CALL_RETURN_VOID(env,
181 napi_call_function(env, undefined, js_cb, 0, NULL, NULL));
182 NODE_API_CALL_RETURN_VOID(env, napi_delete_reference(env,
183 the_hint->js_finalize_cb));
184 if (alt_ref != NULL) {
185 NODE_API_CALL_RETURN_VOID(env, napi_delete_reference(env, alt_ref));
186 alt_ref = NULL;
187 }
188 }
189
StartThreadInternal(napi_env env,napi_callback_info info,napi_threadsafe_function_call_js cb,bool block_on_full,bool alt_ref_js_cb)190 static napi_value StartThreadInternal(napi_env env,
191 napi_callback_info info,
192 napi_threadsafe_function_call_js cb,
193 bool block_on_full,
194 bool alt_ref_js_cb) {
195
196 size_t argc = 4;
197 napi_value argv[4];
198
199 NODE_API_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL));
200 if (alt_ref_js_cb) {
201 NODE_API_CALL(env, napi_create_reference(env, argv[0], 1, &alt_ref));
202 argv[0] = NULL;
203 }
204
205 ts_info.block_on_full =
206 (block_on_full ? napi_tsfn_blocking : napi_tsfn_nonblocking);
207
208 NODE_API_ASSERT(env, (ts_fn == NULL), "Existing thread-safe function");
209 napi_value async_name;
210 NODE_API_CALL(env, napi_create_string_utf8(env,
211 "N-API Thread-safe Function Test", NAPI_AUTO_LENGTH, &async_name));
212 NODE_API_CALL(env,
213 napi_get_value_uint32(env, argv[3], &ts_info.max_queue_size));
214 NODE_API_CALL(env, napi_create_threadsafe_function(env,
215 argv[0],
216 NULL,
217 async_name,
218 ts_info.max_queue_size,
219 2,
220 uv_threads,
221 join_the_threads,
222 &ts_info,
223 cb,
224 &ts_fn));
225 bool abort;
226 NODE_API_CALL(env, napi_get_value_bool(env, argv[1], &abort));
227 ts_info.abort = abort ? napi_tsfn_abort : napi_tsfn_release;
228 NODE_API_CALL(env,
229 napi_get_value_bool(env, argv[2], &(ts_info.start_secondary)));
230
231 NODE_API_ASSERT(env,
232 (uv_thread_create(&uv_threads[0], data_source_thread, ts_fn) == 0),
233 "Thread creation");
234
235 return NULL;
236 }
237
Unref(napi_env env,napi_callback_info info)238 static napi_value Unref(napi_env env, napi_callback_info info) {
239 NODE_API_ASSERT(env, ts_fn != NULL, "No existing thread-safe function");
240 NODE_API_CALL(env, napi_unref_threadsafe_function(env, ts_fn));
241 return NULL;
242 }
243
Release(napi_env env,napi_callback_info info)244 static napi_value Release(napi_env env, napi_callback_info info) {
245 NODE_API_ASSERT(env, ts_fn != NULL, "No existing thread-safe function");
246 NODE_API_CALL(env, napi_release_threadsafe_function(ts_fn, napi_tsfn_release));
247 return NULL;
248 }
249
250 // Startup
StartThread(napi_env env,napi_callback_info info)251 static napi_value StartThread(napi_env env, napi_callback_info info) {
252 return StartThreadInternal(env, info, call_js,
253 /** block_on_full */true, /** alt_ref_js_cb */false);
254 }
255
StartThreadNonblocking(napi_env env,napi_callback_info info)256 static napi_value StartThreadNonblocking(napi_env env,
257 napi_callback_info info) {
258 return StartThreadInternal(env, info, call_js,
259 /** block_on_full */false, /** alt_ref_js_cb */false);
260 }
261
StartThreadNoNative(napi_env env,napi_callback_info info)262 static napi_value StartThreadNoNative(napi_env env, napi_callback_info info) {
263 return StartThreadInternal(env, info, NULL,
264 /** block_on_full */true, /** alt_ref_js_cb */false);
265 }
266
StartThreadNoJsFunc(napi_env env,napi_callback_info info)267 static napi_value StartThreadNoJsFunc(napi_env env, napi_callback_info info) {
268 return StartThreadInternal(env, info, call_ref,
269 /** block_on_full */true, /** alt_ref_js_cb */true);
270 }
271
272 // Testing calling into JavaScript
ThreadSafeFunctionFinalize(napi_env env,void * finalize_data,void * finalize_hint)273 static void ThreadSafeFunctionFinalize(napi_env env,
274 void* finalize_data,
275 void* finalize_hint) {
276 napi_ref js_func_ref = (napi_ref) finalize_data;
277 napi_value js_func;
278 napi_value recv;
279 NODE_API_CALL_RETURN_VOID(env, napi_get_reference_value(env, js_func_ref, &js_func));
280 NODE_API_CALL_RETURN_VOID(env, napi_get_global(env, &recv));
281 NODE_API_CALL_RETURN_VOID(env, napi_call_function(env, recv, js_func, 0, NULL, NULL));
282 NODE_API_CALL_RETURN_VOID(env, napi_delete_reference(env, js_func_ref));
283 }
284
285 // Testing calling into JavaScript
CallIntoModule(napi_env env,napi_callback_info info)286 static napi_value CallIntoModule(napi_env env, napi_callback_info info) {
287 size_t argc = 4;
288 napi_value argv[4];
289 NODE_API_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL));
290
291 napi_ref finalize_func;
292 NODE_API_CALL(env, napi_create_reference(env, argv[3], 1, &finalize_func));
293
294 napi_threadsafe_function tsfn;
295 NODE_API_CALL(env, napi_create_threadsafe_function(env, argv[0], argv[1], argv[2], 0, 1, finalize_func, ThreadSafeFunctionFinalize, NULL, NULL, &tsfn));
296 NODE_API_CALL(env, napi_call_threadsafe_function(tsfn, NULL, napi_tsfn_blocking));
297 NODE_API_CALL(env, napi_release_threadsafe_function(tsfn, napi_tsfn_release));
298 return NULL;
299 }
300
301 // Module init
Init(napi_env env,napi_value exports)302 static napi_value Init(napi_env env, napi_value exports) {
303 size_t index;
304 for (index = 0; index < ARRAY_LENGTH; index++) {
305 ints[index] = index;
306 }
307 napi_value js_array_length, js_max_queue_size;
308 napi_create_uint32(env, ARRAY_LENGTH, &js_array_length);
309 napi_create_uint32(env, MAX_QUEUE_SIZE, &js_max_queue_size);
310
311 napi_property_descriptor properties[] = {
312 {
313 "ARRAY_LENGTH",
314 NULL,
315 NULL,
316 NULL,
317 NULL,
318 js_array_length,
319 napi_enumerable,
320 NULL
321 },
322 {
323 "MAX_QUEUE_SIZE",
324 NULL,
325 NULL,
326 NULL,
327 NULL,
328 js_max_queue_size,
329 napi_enumerable,
330 NULL
331 },
332 DECLARE_NODE_API_PROPERTY("StartThread", StartThread),
333 DECLARE_NODE_API_PROPERTY("StartThreadNoNative", StartThreadNoNative),
334 DECLARE_NODE_API_PROPERTY("StartThreadNonblocking", StartThreadNonblocking),
335 DECLARE_NODE_API_PROPERTY("StartThreadNoJsFunc", StartThreadNoJsFunc),
336 DECLARE_NODE_API_PROPERTY("StopThread", StopThread),
337 DECLARE_NODE_API_PROPERTY("Unref", Unref),
338 DECLARE_NODE_API_PROPERTY("Release", Release),
339 DECLARE_NODE_API_PROPERTY("CallIntoModule", CallIntoModule),
340 };
341
342 NODE_API_CALL(env, napi_define_properties(env, exports,
343 sizeof(properties)/sizeof(properties[0]), properties));
344
345 return exports;
346 }
347 NAPI_MODULE(NODE_GYP_MODULE_NAME, Init)
348