• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 // For the purpose of this test we use libuv's threading library. When deciding
2 // on a threading library for a new project it bears remembering that in the
3 // future libuv may introduce API changes which may render it non-ABI-stable,
4 // which, in turn, may affect the ABI stability of the project despite its use
5 // of N-API.
6 #include <uv.h>
7 #include <node_api.h>
8 #include "../../js-native-api/common.h"
9 
10 #define ARRAY_LENGTH 10000
11 #define MAX_QUEUE_SIZE 2
12 
13 static uv_thread_t uv_threads[2];
14 static napi_threadsafe_function ts_fn;
15 
16 typedef struct {
17   napi_threadsafe_function_call_mode block_on_full;
18   napi_threadsafe_function_release_mode abort;
19   bool start_secondary;
20   napi_ref js_finalize_cb;
21   uint32_t max_queue_size;
22 } ts_fn_hint;
23 
24 static ts_fn_hint ts_info;
25 
26 // Thread data to transmit to JS
27 static int ints[ARRAY_LENGTH];
28 
secondary_thread(void * data)29 static void secondary_thread(void* data) {
30   napi_threadsafe_function ts_fn = data;
31 
32   if (napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) {
33     napi_fatal_error("secondary_thread", NAPI_AUTO_LENGTH,
34         "napi_release_threadsafe_function failed", NAPI_AUTO_LENGTH);
35   }
36 }
37 
38 // Source thread producing the data
data_source_thread(void * data)39 static void data_source_thread(void* data) {
40   napi_threadsafe_function ts_fn = data;
41   int index;
42   void* hint;
43   ts_fn_hint *ts_fn_info;
44   napi_status status;
45   bool queue_was_full = false;
46   bool queue_was_closing = false;
47 
48   if (napi_get_threadsafe_function_context(ts_fn, &hint) != napi_ok) {
49     napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
50         "napi_get_threadsafe_function_context failed", NAPI_AUTO_LENGTH);
51   }
52 
53   ts_fn_info = (ts_fn_hint *)hint;
54 
55   if (ts_fn_info != &ts_info) {
56     napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
57       "thread-safe function hint is not as expected", NAPI_AUTO_LENGTH);
58   }
59 
60   if (ts_fn_info->start_secondary) {
61     if (napi_acquire_threadsafe_function(ts_fn) != napi_ok) {
62       napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
63         "napi_acquire_threadsafe_function failed", NAPI_AUTO_LENGTH);
64     }
65 
66     if (uv_thread_create(&uv_threads[1], secondary_thread, ts_fn) != 0) {
67       napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
68         "failed to start secondary thread", NAPI_AUTO_LENGTH);
69     }
70   }
71 
72   for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) {
73     status = napi_call_threadsafe_function(ts_fn, &ints[index],
74         ts_fn_info->block_on_full);
75     if (ts_fn_info->max_queue_size == 0 && (index % 1000 == 0)) {
76       // Let's make this thread really busy for 200 ms to give the main thread a
77       // chance to abort.
78       uint64_t start = uv_hrtime();
79       for (; uv_hrtime() - start < 200000000;);
80     }
81     switch (status) {
82       case napi_queue_full:
83         queue_was_full = true;
84         index++;
85         // fall through
86 
87       case napi_ok:
88         continue;
89 
90       case napi_closing:
91         queue_was_closing = true;
92         break;
93 
94       default:
95         napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
96             "napi_call_threadsafe_function failed", NAPI_AUTO_LENGTH);
97     }
98   }
99 
100   // Assert that the enqueuing of a value was refused at least once, if this is
101   // a non-blocking test run.
102   if (!ts_fn_info->block_on_full && !queue_was_full) {
103     napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
104         "queue was never full", NAPI_AUTO_LENGTH);
105   }
106 
107   // Assert that the queue was marked as closing at least once, if this is an
108   // aborting test run.
109   if (ts_fn_info->abort == napi_tsfn_abort && !queue_was_closing) {
110     napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
111       "queue was never closing", NAPI_AUTO_LENGTH);
112   }
113 
114   if (!queue_was_closing &&
115       napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) {
116     napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
117         "napi_release_threadsafe_function failed", NAPI_AUTO_LENGTH);
118   }
119 }
120 
121 // Getting the data into JS
call_js(napi_env env,napi_value cb,void * hint,void * data)122 static void call_js(napi_env env, napi_value cb, void* hint, void* data) {
123   if (!(env == NULL || cb == NULL)) {
124     napi_value argv, undefined;
125     NODE_API_CALL_RETURN_VOID(env, napi_create_int32(env, *(int*)data, &argv));
126     NODE_API_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined));
127     NODE_API_CALL_RETURN_VOID(env, napi_call_function(env, undefined, cb, 1, &argv,
128         NULL));
129   }
130 }
131 
132 static napi_ref alt_ref;
133 // Getting the data into JS with the alternative reference
call_ref(napi_env env,napi_value _,void * hint,void * data)134 static void call_ref(napi_env env, napi_value _, void* hint, void* data) {
135   if (!(env == NULL || alt_ref == NULL)) {
136     napi_value fn, argv, undefined;
137     NODE_API_CALL_RETURN_VOID(env, napi_get_reference_value(env, alt_ref, &fn));
138     NODE_API_CALL_RETURN_VOID(env, napi_create_int32(env, *(int*)data, &argv));
139     NODE_API_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined));
140     NODE_API_CALL_RETURN_VOID(env, napi_call_function(env, undefined, fn, 1, &argv,
141         NULL));
142   }
143 }
144 
145 // Cleanup
StopThread(napi_env env,napi_callback_info info)146 static napi_value StopThread(napi_env env, napi_callback_info info) {
147   size_t argc = 2;
148   napi_value argv[2];
149   NODE_API_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL));
150   napi_valuetype value_type;
151   NODE_API_CALL(env, napi_typeof(env, argv[0], &value_type));
152   NODE_API_ASSERT(env, value_type == napi_function,
153       "StopThread argument is a function");
154   NODE_API_ASSERT(env, (ts_fn != NULL), "Existing threadsafe function");
155   NODE_API_CALL(env,
156       napi_create_reference(env, argv[0], 1, &(ts_info.js_finalize_cb)));
157   bool abort;
158   NODE_API_CALL(env, napi_get_value_bool(env, argv[1], &abort));
159   NODE_API_CALL(env,
160       napi_release_threadsafe_function(ts_fn,
161           abort ? napi_tsfn_abort : napi_tsfn_release));
162   ts_fn = NULL;
163   return NULL;
164 }
165 
166 // Join the thread and inform JS that we're done.
join_the_threads(napi_env env,void * data,void * hint)167 static void join_the_threads(napi_env env, void *data, void *hint) {
168   uv_thread_t *the_threads = data;
169   ts_fn_hint *the_hint = hint;
170   napi_value js_cb, undefined;
171 
172   uv_thread_join(&the_threads[0]);
173   if (the_hint->start_secondary) {
174     uv_thread_join(&the_threads[1]);
175   }
176 
177   NODE_API_CALL_RETURN_VOID(env,
178       napi_get_reference_value(env, the_hint->js_finalize_cb, &js_cb));
179   NODE_API_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined));
180   NODE_API_CALL_RETURN_VOID(env,
181       napi_call_function(env, undefined, js_cb, 0, NULL, NULL));
182   NODE_API_CALL_RETURN_VOID(env, napi_delete_reference(env,
183       the_hint->js_finalize_cb));
184   if (alt_ref != NULL) {
185     NODE_API_CALL_RETURN_VOID(env, napi_delete_reference(env, alt_ref));
186     alt_ref = NULL;
187   }
188 }
189 
StartThreadInternal(napi_env env,napi_callback_info info,napi_threadsafe_function_call_js cb,bool block_on_full,bool alt_ref_js_cb)190 static napi_value StartThreadInternal(napi_env env,
191                                       napi_callback_info info,
192                                       napi_threadsafe_function_call_js cb,
193                                       bool block_on_full,
194                                       bool alt_ref_js_cb) {
195 
196   size_t argc = 4;
197   napi_value argv[4];
198 
199   NODE_API_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL));
200   if (alt_ref_js_cb) {
201     NODE_API_CALL(env, napi_create_reference(env, argv[0], 1, &alt_ref));
202     argv[0] = NULL;
203   }
204 
205   ts_info.block_on_full =
206       (block_on_full ? napi_tsfn_blocking : napi_tsfn_nonblocking);
207 
208   NODE_API_ASSERT(env, (ts_fn == NULL), "Existing thread-safe function");
209   napi_value async_name;
210   NODE_API_CALL(env, napi_create_string_utf8(env,
211       "N-API Thread-safe Function Test", NAPI_AUTO_LENGTH, &async_name));
212   NODE_API_CALL(env,
213       napi_get_value_uint32(env, argv[3], &ts_info.max_queue_size));
214   NODE_API_CALL(env, napi_create_threadsafe_function(env,
215                                                      argv[0],
216                                                      NULL,
217                                                      async_name,
218                                                      ts_info.max_queue_size,
219                                                      2,
220                                                      uv_threads,
221                                                      join_the_threads,
222                                                      &ts_info,
223                                                      cb,
224                                                      &ts_fn));
225   bool abort;
226   NODE_API_CALL(env, napi_get_value_bool(env, argv[1], &abort));
227   ts_info.abort = abort ? napi_tsfn_abort : napi_tsfn_release;
228   NODE_API_CALL(env,
229       napi_get_value_bool(env, argv[2], &(ts_info.start_secondary)));
230 
231   NODE_API_ASSERT(env,
232       (uv_thread_create(&uv_threads[0], data_source_thread, ts_fn) == 0),
233       "Thread creation");
234 
235   return NULL;
236 }
237 
Unref(napi_env env,napi_callback_info info)238 static napi_value Unref(napi_env env, napi_callback_info info) {
239   NODE_API_ASSERT(env, ts_fn != NULL, "No existing thread-safe function");
240   NODE_API_CALL(env, napi_unref_threadsafe_function(env, ts_fn));
241   return NULL;
242 }
243 
Release(napi_env env,napi_callback_info info)244 static napi_value Release(napi_env env, napi_callback_info info) {
245   NODE_API_ASSERT(env, ts_fn != NULL, "No existing thread-safe function");
246   NODE_API_CALL(env, napi_release_threadsafe_function(ts_fn, napi_tsfn_release));
247   return NULL;
248 }
249 
250 // Startup
StartThread(napi_env env,napi_callback_info info)251 static napi_value StartThread(napi_env env, napi_callback_info info) {
252   return StartThreadInternal(env, info, call_js,
253     /** block_on_full */true, /** alt_ref_js_cb */false);
254 }
255 
StartThreadNonblocking(napi_env env,napi_callback_info info)256 static napi_value StartThreadNonblocking(napi_env env,
257                                          napi_callback_info info) {
258   return StartThreadInternal(env, info, call_js,
259     /** block_on_full */false, /** alt_ref_js_cb */false);
260 }
261 
StartThreadNoNative(napi_env env,napi_callback_info info)262 static napi_value StartThreadNoNative(napi_env env, napi_callback_info info) {
263   return StartThreadInternal(env, info, NULL,
264     /** block_on_full */true, /** alt_ref_js_cb */false);
265 }
266 
StartThreadNoJsFunc(napi_env env,napi_callback_info info)267 static napi_value StartThreadNoJsFunc(napi_env env, napi_callback_info info) {
268   return StartThreadInternal(env, info, call_ref,
269     /** block_on_full */true, /** alt_ref_js_cb */true);
270 }
271 
272 // Testing calling into JavaScript
ThreadSafeFunctionFinalize(napi_env env,void * finalize_data,void * finalize_hint)273 static void ThreadSafeFunctionFinalize(napi_env env,
274                               void* finalize_data,
275                               void* finalize_hint) {
276   napi_ref js_func_ref = (napi_ref) finalize_data;
277   napi_value js_func;
278   napi_value recv;
279   NODE_API_CALL_RETURN_VOID(env, napi_get_reference_value(env, js_func_ref, &js_func));
280   NODE_API_CALL_RETURN_VOID(env, napi_get_global(env, &recv));
281   NODE_API_CALL_RETURN_VOID(env, napi_call_function(env, recv, js_func, 0, NULL, NULL));
282   NODE_API_CALL_RETURN_VOID(env, napi_delete_reference(env, js_func_ref));
283 }
284 
285 // Testing calling into JavaScript
CallIntoModule(napi_env env,napi_callback_info info)286 static napi_value CallIntoModule(napi_env env, napi_callback_info info) {
287   size_t argc = 4;
288   napi_value argv[4];
289   NODE_API_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL));
290 
291   napi_ref finalize_func;
292   NODE_API_CALL(env, napi_create_reference(env, argv[3], 1, &finalize_func));
293 
294   napi_threadsafe_function tsfn;
295   NODE_API_CALL(env, napi_create_threadsafe_function(env, argv[0], argv[1], argv[2], 0, 1, finalize_func, ThreadSafeFunctionFinalize, NULL, NULL, &tsfn));
296   NODE_API_CALL(env, napi_call_threadsafe_function(tsfn, NULL, napi_tsfn_blocking));
297   NODE_API_CALL(env, napi_release_threadsafe_function(tsfn, napi_tsfn_release));
298   return NULL;
299 }
300 
301 // Module init
Init(napi_env env,napi_value exports)302 static napi_value Init(napi_env env, napi_value exports) {
303   size_t index;
304   for (index = 0; index < ARRAY_LENGTH; index++) {
305     ints[index] = index;
306   }
307   napi_value js_array_length, js_max_queue_size;
308   napi_create_uint32(env, ARRAY_LENGTH, &js_array_length);
309   napi_create_uint32(env, MAX_QUEUE_SIZE, &js_max_queue_size);
310 
311   napi_property_descriptor properties[] = {
312     {
313       "ARRAY_LENGTH",
314       NULL,
315       NULL,
316       NULL,
317       NULL,
318       js_array_length,
319       napi_enumerable,
320       NULL
321     },
322     {
323       "MAX_QUEUE_SIZE",
324       NULL,
325       NULL,
326       NULL,
327       NULL,
328       js_max_queue_size,
329       napi_enumerable,
330       NULL
331     },
332     DECLARE_NODE_API_PROPERTY("StartThread", StartThread),
333     DECLARE_NODE_API_PROPERTY("StartThreadNoNative", StartThreadNoNative),
334     DECLARE_NODE_API_PROPERTY("StartThreadNonblocking", StartThreadNonblocking),
335     DECLARE_NODE_API_PROPERTY("StartThreadNoJsFunc", StartThreadNoJsFunc),
336     DECLARE_NODE_API_PROPERTY("StopThread", StopThread),
337     DECLARE_NODE_API_PROPERTY("Unref", Unref),
338     DECLARE_NODE_API_PROPERTY("Release", Release),
339     DECLARE_NODE_API_PROPERTY("CallIntoModule", CallIntoModule),
340   };
341 
342   NODE_API_CALL(env, napi_define_properties(env, exports,
343     sizeof(properties)/sizeof(properties[0]), properties));
344 
345   return exports;
346 }
347 NAPI_MODULE(NODE_GYP_MODULE_NAME, Init)
348