1 // For the purpose of this test we use libuv's threading library. When deciding
2 // on a threading library for a new project it bears remembering that in the
3 // future libuv may introduce API changes which may render it non-ABI-stable,
4 // which, in turn, may affect the ABI stability of the project despite its use
5 // of N-API.
6 #include <uv.h>
7 #include <node_api.h>
8 #include "../../js-native-api/common.h"
9
10 #define ARRAY_LENGTH 10000
11 #define MAX_QUEUE_SIZE 2
12
13 static uv_thread_t uv_threads[2];
14 static napi_threadsafe_function ts_fn;
15
16 typedef struct {
17 napi_threadsafe_function_call_mode block_on_full;
18 napi_threadsafe_function_release_mode abort;
19 bool start_secondary;
20 napi_ref js_finalize_cb;
21 uint32_t max_queue_size;
22 } ts_fn_hint;
23
24 static ts_fn_hint ts_info;
25
26 // Thread data to transmit to JS
27 static int ints[ARRAY_LENGTH];
28
secondary_thread(void * data)29 static void secondary_thread(void* data) {
30 napi_threadsafe_function ts_fn = data;
31
32 if (napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) {
33 napi_fatal_error("secondary_thread", NAPI_AUTO_LENGTH,
34 "napi_release_threadsafe_function failed", NAPI_AUTO_LENGTH);
35 }
36 }
37
38 // Source thread producing the data
data_source_thread(void * data)39 static void data_source_thread(void* data) {
40 napi_threadsafe_function ts_fn = data;
41 int index;
42 void* hint;
43 ts_fn_hint *ts_fn_info;
44 napi_status status;
45 bool queue_was_full = false;
46 bool queue_was_closing = false;
47
48 if (napi_get_threadsafe_function_context(ts_fn, &hint) != napi_ok) {
49 napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
50 "napi_get_threadsafe_function_context failed", NAPI_AUTO_LENGTH);
51 }
52
53 ts_fn_info = (ts_fn_hint *)hint;
54
55 if (ts_fn_info != &ts_info) {
56 napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
57 "thread-safe function hint is not as expected", NAPI_AUTO_LENGTH);
58 }
59
60 if (ts_fn_info->start_secondary) {
61 if (napi_acquire_threadsafe_function(ts_fn) != napi_ok) {
62 napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
63 "napi_acquire_threadsafe_function failed", NAPI_AUTO_LENGTH);
64 }
65
66 if (uv_thread_create(&uv_threads[1], secondary_thread, ts_fn) != 0) {
67 napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
68 "failed to start secondary thread", NAPI_AUTO_LENGTH);
69 }
70 }
71
72 for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) {
73 status = napi_call_threadsafe_function(ts_fn, &ints[index],
74 ts_fn_info->block_on_full);
75 if (ts_fn_info->max_queue_size == 0 && (index % 1000 == 0)) {
76 // Let's make this thread really busy for 200 ms to give the main thread a
77 // chance to abort.
78 uint64_t start = uv_hrtime();
79 for (; uv_hrtime() - start < 200000000;);
80 }
81 switch (status) {
82 case napi_queue_full:
83 queue_was_full = true;
84 index++;
85 // fall through
86
87 case napi_ok:
88 continue;
89
90 case napi_closing:
91 queue_was_closing = true;
92 break;
93
94 default:
95 napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
96 "napi_call_threadsafe_function failed", NAPI_AUTO_LENGTH);
97 }
98 }
99
100 // Assert that the enqueuing of a value was refused at least once, if this is
101 // a non-blocking test run.
102 if (!ts_fn_info->block_on_full && !queue_was_full) {
103 napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
104 "queue was never full", NAPI_AUTO_LENGTH);
105 }
106
107 // Assert that the queue was marked as closing at least once, if this is an
108 // aborting test run.
109 if (ts_fn_info->abort == napi_tsfn_abort && !queue_was_closing) {
110 napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
111 "queue was never closing", NAPI_AUTO_LENGTH);
112 }
113
114 if (!queue_was_closing &&
115 napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) {
116 napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
117 "napi_release_threadsafe_function failed", NAPI_AUTO_LENGTH);
118 }
119 }
120
121 // Getting the data into JS
call_js(napi_env env,napi_value cb,void * hint,void * data)122 static void call_js(napi_env env, napi_value cb, void* hint, void* data) {
123 if (!(env == NULL || cb == NULL)) {
124 napi_value argv, undefined;
125 NAPI_CALL_RETURN_VOID(env, napi_create_int32(env, *(int*)data, &argv));
126 NAPI_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined));
127 NAPI_CALL_RETURN_VOID(env, napi_call_function(env, undefined, cb, 1, &argv,
128 NULL));
129 }
130 }
131
132 static napi_ref alt_ref;
133 // Getting the data into JS with the alternative reference
call_ref(napi_env env,napi_value _,void * hint,void * data)134 static void call_ref(napi_env env, napi_value _, void* hint, void* data) {
135 if (!(env == NULL || alt_ref == NULL)) {
136 napi_value fn, argv, undefined;
137 NAPI_CALL_RETURN_VOID(env, napi_get_reference_value(env, alt_ref, &fn));
138 NAPI_CALL_RETURN_VOID(env, napi_create_int32(env, *(int*)data, &argv));
139 NAPI_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined));
140 NAPI_CALL_RETURN_VOID(env, napi_call_function(env, undefined, fn, 1, &argv,
141 NULL));
142 }
143 }
144
145 // Cleanup
StopThread(napi_env env,napi_callback_info info)146 static napi_value StopThread(napi_env env, napi_callback_info info) {
147 size_t argc = 2;
148 napi_value argv[2];
149 NAPI_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL));
150 napi_valuetype value_type;
151 NAPI_CALL(env, napi_typeof(env, argv[0], &value_type));
152 NAPI_ASSERT(env, value_type == napi_function,
153 "StopThread argument is a function");
154 NAPI_ASSERT(env, (ts_fn != NULL), "Existing threadsafe function");
155 NAPI_CALL(env,
156 napi_create_reference(env, argv[0], 1, &(ts_info.js_finalize_cb)));
157 bool abort;
158 NAPI_CALL(env, napi_get_value_bool(env, argv[1], &abort));
159 NAPI_CALL(env,
160 napi_release_threadsafe_function(ts_fn,
161 abort ? napi_tsfn_abort : napi_tsfn_release));
162 ts_fn = NULL;
163 return NULL;
164 }
165
166 // Join the thread and inform JS that we're done.
join_the_threads(napi_env env,void * data,void * hint)167 static void join_the_threads(napi_env env, void *data, void *hint) {
168 uv_thread_t *the_threads = data;
169 ts_fn_hint *the_hint = hint;
170 napi_value js_cb, undefined;
171
172 uv_thread_join(&the_threads[0]);
173 if (the_hint->start_secondary) {
174 uv_thread_join(&the_threads[1]);
175 }
176
177 NAPI_CALL_RETURN_VOID(env,
178 napi_get_reference_value(env, the_hint->js_finalize_cb, &js_cb));
179 NAPI_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined));
180 NAPI_CALL_RETURN_VOID(env,
181 napi_call_function(env, undefined, js_cb, 0, NULL, NULL));
182 NAPI_CALL_RETURN_VOID(env, napi_delete_reference(env,
183 the_hint->js_finalize_cb));
184 if (alt_ref != NULL) {
185 NAPI_CALL_RETURN_VOID(env, napi_delete_reference(env, alt_ref));
186 alt_ref = NULL;
187 }
188 }
189
StartThreadInternal(napi_env env,napi_callback_info info,napi_threadsafe_function_call_js cb,bool block_on_full,bool alt_ref_js_cb)190 static napi_value StartThreadInternal(napi_env env,
191 napi_callback_info info,
192 napi_threadsafe_function_call_js cb,
193 bool block_on_full,
194 bool alt_ref_js_cb) {
195
196 size_t argc = 4;
197 napi_value argv[4];
198
199 NAPI_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL));
200 if (alt_ref_js_cb) {
201 NAPI_CALL(env, napi_create_reference(env, argv[0], 1, &alt_ref));
202 argv[0] = NULL;
203 }
204
205 ts_info.block_on_full =
206 (block_on_full ? napi_tsfn_blocking : napi_tsfn_nonblocking);
207
208 NAPI_ASSERT(env, (ts_fn == NULL), "Existing thread-safe function");
209 napi_value async_name;
210 NAPI_CALL(env, napi_create_string_utf8(env, "N-API Thread-safe Function Test",
211 NAPI_AUTO_LENGTH, &async_name));
212 NAPI_CALL(env, napi_get_value_uint32(env, argv[3], &ts_info.max_queue_size));
213 NAPI_CALL(env, napi_create_threadsafe_function(env,
214 argv[0],
215 NULL,
216 async_name,
217 ts_info.max_queue_size,
218 2,
219 uv_threads,
220 join_the_threads,
221 &ts_info,
222 cb,
223 &ts_fn));
224 bool abort;
225 NAPI_CALL(env, napi_get_value_bool(env, argv[1], &abort));
226 ts_info.abort = abort ? napi_tsfn_abort : napi_tsfn_release;
227 NAPI_CALL(env, napi_get_value_bool(env, argv[2], &(ts_info.start_secondary)));
228
229 NAPI_ASSERT(env,
230 (uv_thread_create(&uv_threads[0], data_source_thread, ts_fn) == 0),
231 "Thread creation");
232
233 return NULL;
234 }
235
Unref(napi_env env,napi_callback_info info)236 static napi_value Unref(napi_env env, napi_callback_info info) {
237 NAPI_ASSERT(env, ts_fn != NULL, "No existing thread-safe function");
238 NAPI_CALL(env, napi_unref_threadsafe_function(env, ts_fn));
239 return NULL;
240 }
241
Release(napi_env env,napi_callback_info info)242 static napi_value Release(napi_env env, napi_callback_info info) {
243 NAPI_ASSERT(env, ts_fn != NULL, "No existing thread-safe function");
244 NAPI_CALL(env, napi_release_threadsafe_function(ts_fn, napi_tsfn_release));
245 return NULL;
246 }
247
248 // Startup
StartThread(napi_env env,napi_callback_info info)249 static napi_value StartThread(napi_env env, napi_callback_info info) {
250 return StartThreadInternal(env, info, call_js,
251 /** block_on_full */true, /** alt_ref_js_cb */false);
252 }
253
StartThreadNonblocking(napi_env env,napi_callback_info info)254 static napi_value StartThreadNonblocking(napi_env env,
255 napi_callback_info info) {
256 return StartThreadInternal(env, info, call_js,
257 /** block_on_full */false, /** alt_ref_js_cb */false);
258 }
259
StartThreadNoNative(napi_env env,napi_callback_info info)260 static napi_value StartThreadNoNative(napi_env env, napi_callback_info info) {
261 return StartThreadInternal(env, info, NULL,
262 /** block_on_full */true, /** alt_ref_js_cb */false);
263 }
264
StartThreadNoJsFunc(napi_env env,napi_callback_info info)265 static napi_value StartThreadNoJsFunc(napi_env env, napi_callback_info info) {
266 return StartThreadInternal(env, info, call_ref,
267 /** block_on_full */true, /** alt_ref_js_cb */true);
268 }
269
270 // Module init
Init(napi_env env,napi_value exports)271 static napi_value Init(napi_env env, napi_value exports) {
272 size_t index;
273 for (index = 0; index < ARRAY_LENGTH; index++) {
274 ints[index] = index;
275 }
276 napi_value js_array_length, js_max_queue_size;
277 napi_create_uint32(env, ARRAY_LENGTH, &js_array_length);
278 napi_create_uint32(env, MAX_QUEUE_SIZE, &js_max_queue_size);
279
280 napi_property_descriptor properties[] = {
281 {
282 "ARRAY_LENGTH",
283 NULL,
284 NULL,
285 NULL,
286 NULL,
287 js_array_length,
288 napi_enumerable,
289 NULL
290 },
291 {
292 "MAX_QUEUE_SIZE",
293 NULL,
294 NULL,
295 NULL,
296 NULL,
297 js_max_queue_size,
298 napi_enumerable,
299 NULL
300 },
301 DECLARE_NAPI_PROPERTY("StartThread", StartThread),
302 DECLARE_NAPI_PROPERTY("StartThreadNoNative", StartThreadNoNative),
303 DECLARE_NAPI_PROPERTY("StartThreadNonblocking", StartThreadNonblocking),
304 DECLARE_NAPI_PROPERTY("StartThreadNoJsFunc", StartThreadNoJsFunc),
305 DECLARE_NAPI_PROPERTY("StopThread", StopThread),
306 DECLARE_NAPI_PROPERTY("Unref", Unref),
307 DECLARE_NAPI_PROPERTY("Release", Release),
308 };
309
310 NAPI_CALL(env, napi_define_properties(env, exports,
311 sizeof(properties)/sizeof(properties[0]), properties));
312
313 return exports;
314 }
315 NAPI_MODULE(NODE_GYP_MODULE_NAME, Init)
316