• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
// For the purpose of this test we use libuv's threading library. When deciding
// on a threading library for a new project it bears remembering that in the
// future libuv may introduce API changes which may render it non-ABI-stable,
// which, in turn, may affect the ABI stability of the project despite its use
// of N-API.
6 #include <uv.h>
7 #include <node_api.h>
8 #include "../../js-native-api/common.h"
9 
10 #define ARRAY_LENGTH 10000
11 #define MAX_QUEUE_SIZE 2
12 
13 static uv_thread_t uv_threads[2];
14 static napi_threadsafe_function ts_fn;
15 
16 typedef struct {
17   napi_threadsafe_function_call_mode block_on_full;
18   napi_threadsafe_function_release_mode abort;
19   bool start_secondary;
20   napi_ref js_finalize_cb;
21   uint32_t max_queue_size;
22 } ts_fn_hint;
23 
24 static ts_fn_hint ts_info;
25 
26 // Thread data to transmit to JS
27 static int ints[ARRAY_LENGTH];
28 
secondary_thread(void * data)29 static void secondary_thread(void* data) {
30   napi_threadsafe_function ts_fn = data;
31 
32   if (napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) {
33     napi_fatal_error("secondary_thread", NAPI_AUTO_LENGTH,
34         "napi_release_threadsafe_function failed", NAPI_AUTO_LENGTH);
35   }
36 }
37 
38 // Source thread producing the data
data_source_thread(void * data)39 static void data_source_thread(void* data) {
40   napi_threadsafe_function ts_fn = data;
41   int index;
42   void* hint;
43   ts_fn_hint *ts_fn_info;
44   napi_status status;
45   bool queue_was_full = false;
46   bool queue_was_closing = false;
47 
48   if (napi_get_threadsafe_function_context(ts_fn, &hint) != napi_ok) {
49     napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
50         "napi_get_threadsafe_function_context failed", NAPI_AUTO_LENGTH);
51   }
52 
53   ts_fn_info = (ts_fn_hint *)hint;
54 
55   if (ts_fn_info != &ts_info) {
56     napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
57       "thread-safe function hint is not as expected", NAPI_AUTO_LENGTH);
58   }
59 
60   if (ts_fn_info->start_secondary) {
61     if (napi_acquire_threadsafe_function(ts_fn) != napi_ok) {
62       napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
63         "napi_acquire_threadsafe_function failed", NAPI_AUTO_LENGTH);
64     }
65 
66     if (uv_thread_create(&uv_threads[1], secondary_thread, ts_fn) != 0) {
67       napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
68         "failed to start secondary thread", NAPI_AUTO_LENGTH);
69     }
70   }
71 
72   for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) {
73     status = napi_call_threadsafe_function(ts_fn, &ints[index],
74         ts_fn_info->block_on_full);
75     if (ts_fn_info->max_queue_size == 0 && (index % 1000 == 0)) {
76       // Let's make this thread really busy for 200 ms to give the main thread a
77       // chance to abort.
78       uint64_t start = uv_hrtime();
79       for (; uv_hrtime() - start < 200000000;);
80     }
81     switch (status) {
82       case napi_queue_full:
83         queue_was_full = true;
84         index++;
85         // fall through
86 
87       case napi_ok:
88         continue;
89 
90       case napi_closing:
91         queue_was_closing = true;
92         break;
93 
94       default:
95         napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
96             "napi_call_threadsafe_function failed", NAPI_AUTO_LENGTH);
97     }
98   }
99 
100   // Assert that the enqueuing of a value was refused at least once, if this is
101   // a non-blocking test run.
102   if (!ts_fn_info->block_on_full && !queue_was_full) {
103     napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
104         "queue was never full", NAPI_AUTO_LENGTH);
105   }
106 
107   // Assert that the queue was marked as closing at least once, if this is an
108   // aborting test run.
109   if (ts_fn_info->abort == napi_tsfn_abort && !queue_was_closing) {
110     napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
111       "queue was never closing", NAPI_AUTO_LENGTH);
112   }
113 
114   if (!queue_was_closing &&
115       napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) {
116     napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
117         "napi_release_threadsafe_function failed", NAPI_AUTO_LENGTH);
118   }
119 }
120 
121 // Getting the data into JS
call_js(napi_env env,napi_value cb,void * hint,void * data)122 static void call_js(napi_env env, napi_value cb, void* hint, void* data) {
123   if (!(env == NULL || cb == NULL)) {
124     napi_value argv, undefined;
125     NAPI_CALL_RETURN_VOID(env, napi_create_int32(env, *(int*)data, &argv));
126     NAPI_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined));
127     NAPI_CALL_RETURN_VOID(env, napi_call_function(env, undefined, cb, 1, &argv,
128         NULL));
129   }
130 }
131 
132 static napi_ref alt_ref;
133 // Getting the data into JS with the alternative reference
call_ref(napi_env env,napi_value _,void * hint,void * data)134 static void call_ref(napi_env env, napi_value _, void* hint, void* data) {
135   if (!(env == NULL || alt_ref == NULL)) {
136     napi_value fn, argv, undefined;
137     NAPI_CALL_RETURN_VOID(env, napi_get_reference_value(env, alt_ref, &fn));
138     NAPI_CALL_RETURN_VOID(env, napi_create_int32(env, *(int*)data, &argv));
139     NAPI_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined));
140     NAPI_CALL_RETURN_VOID(env, napi_call_function(env, undefined, fn, 1, &argv,
141         NULL));
142   }
143 }
144 
145 // Cleanup
StopThread(napi_env env,napi_callback_info info)146 static napi_value StopThread(napi_env env, napi_callback_info info) {
147   size_t argc = 2;
148   napi_value argv[2];
149   NAPI_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL));
150   napi_valuetype value_type;
151   NAPI_CALL(env, napi_typeof(env, argv[0], &value_type));
152   NAPI_ASSERT(env, value_type == napi_function,
153       "StopThread argument is a function");
154   NAPI_ASSERT(env, (ts_fn != NULL), "Existing threadsafe function");
155   NAPI_CALL(env,
156       napi_create_reference(env, argv[0], 1, &(ts_info.js_finalize_cb)));
157   bool abort;
158   NAPI_CALL(env, napi_get_value_bool(env, argv[1], &abort));
159   NAPI_CALL(env,
160       napi_release_threadsafe_function(ts_fn,
161           abort ? napi_tsfn_abort : napi_tsfn_release));
162   ts_fn = NULL;
163   return NULL;
164 }
165 
166 // Join the thread and inform JS that we're done.
join_the_threads(napi_env env,void * data,void * hint)167 static void join_the_threads(napi_env env, void *data, void *hint) {
168   uv_thread_t *the_threads = data;
169   ts_fn_hint *the_hint = hint;
170   napi_value js_cb, undefined;
171 
172   uv_thread_join(&the_threads[0]);
173   if (the_hint->start_secondary) {
174     uv_thread_join(&the_threads[1]);
175   }
176 
177   NAPI_CALL_RETURN_VOID(env,
178       napi_get_reference_value(env, the_hint->js_finalize_cb, &js_cb));
179   NAPI_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined));
180   NAPI_CALL_RETURN_VOID(env,
181       napi_call_function(env, undefined, js_cb, 0, NULL, NULL));
182   NAPI_CALL_RETURN_VOID(env, napi_delete_reference(env,
183       the_hint->js_finalize_cb));
184   if (alt_ref != NULL) {
185     NAPI_CALL_RETURN_VOID(env, napi_delete_reference(env, alt_ref));
186     alt_ref = NULL;
187   }
188 }
189 
StartThreadInternal(napi_env env,napi_callback_info info,napi_threadsafe_function_call_js cb,bool block_on_full,bool alt_ref_js_cb)190 static napi_value StartThreadInternal(napi_env env,
191                                       napi_callback_info info,
192                                       napi_threadsafe_function_call_js cb,
193                                       bool block_on_full,
194                                       bool alt_ref_js_cb) {
195 
196   size_t argc = 4;
197   napi_value argv[4];
198 
199   NAPI_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL));
200   if (alt_ref_js_cb) {
201     NAPI_CALL(env, napi_create_reference(env, argv[0], 1, &alt_ref));
202     argv[0] = NULL;
203   }
204 
205   ts_info.block_on_full =
206       (block_on_full ? napi_tsfn_blocking : napi_tsfn_nonblocking);
207 
208   NAPI_ASSERT(env, (ts_fn == NULL), "Existing thread-safe function");
209   napi_value async_name;
210   NAPI_CALL(env, napi_create_string_utf8(env, "N-API Thread-safe Function Test",
211       NAPI_AUTO_LENGTH, &async_name));
212   NAPI_CALL(env, napi_get_value_uint32(env, argv[3], &ts_info.max_queue_size));
213   NAPI_CALL(env, napi_create_threadsafe_function(env,
214                                                  argv[0],
215                                                  NULL,
216                                                  async_name,
217                                                  ts_info.max_queue_size,
218                                                  2,
219                                                  uv_threads,
220                                                  join_the_threads,
221                                                  &ts_info,
222                                                  cb,
223                                                  &ts_fn));
224   bool abort;
225   NAPI_CALL(env, napi_get_value_bool(env, argv[1], &abort));
226   ts_info.abort = abort ? napi_tsfn_abort : napi_tsfn_release;
227   NAPI_CALL(env, napi_get_value_bool(env, argv[2], &(ts_info.start_secondary)));
228 
229   NAPI_ASSERT(env,
230       (uv_thread_create(&uv_threads[0], data_source_thread, ts_fn) == 0),
231       "Thread creation");
232 
233   return NULL;
234 }
235 
Unref(napi_env env,napi_callback_info info)236 static napi_value Unref(napi_env env, napi_callback_info info) {
237   NAPI_ASSERT(env, ts_fn != NULL, "No existing thread-safe function");
238   NAPI_CALL(env, napi_unref_threadsafe_function(env, ts_fn));
239   return NULL;
240 }
241 
Release(napi_env env,napi_callback_info info)242 static napi_value Release(napi_env env, napi_callback_info info) {
243   NAPI_ASSERT(env, ts_fn != NULL, "No existing thread-safe function");
244   NAPI_CALL(env, napi_release_threadsafe_function(ts_fn, napi_tsfn_release));
245   return NULL;
246 }
247 
248 // Startup
StartThread(napi_env env,napi_callback_info info)249 static napi_value StartThread(napi_env env, napi_callback_info info) {
250   return StartThreadInternal(env, info, call_js,
251     /** block_on_full */true, /** alt_ref_js_cb */false);
252 }
253 
StartThreadNonblocking(napi_env env,napi_callback_info info)254 static napi_value StartThreadNonblocking(napi_env env,
255                                          napi_callback_info info) {
256   return StartThreadInternal(env, info, call_js,
257     /** block_on_full */false, /** alt_ref_js_cb */false);
258 }
259 
StartThreadNoNative(napi_env env,napi_callback_info info)260 static napi_value StartThreadNoNative(napi_env env, napi_callback_info info) {
261   return StartThreadInternal(env, info, NULL,
262     /** block_on_full */true, /** alt_ref_js_cb */false);
263 }
264 
StartThreadNoJsFunc(napi_env env,napi_callback_info info)265 static napi_value StartThreadNoJsFunc(napi_env env, napi_callback_info info) {
266   return StartThreadInternal(env, info, call_ref,
267     /** block_on_full */true, /** alt_ref_js_cb */true);
268 }
269 
270 // Module init
Init(napi_env env,napi_value exports)271 static napi_value Init(napi_env env, napi_value exports) {
272   size_t index;
273   for (index = 0; index < ARRAY_LENGTH; index++) {
274     ints[index] = index;
275   }
276   napi_value js_array_length, js_max_queue_size;
277   napi_create_uint32(env, ARRAY_LENGTH, &js_array_length);
278   napi_create_uint32(env, MAX_QUEUE_SIZE, &js_max_queue_size);
279 
280   napi_property_descriptor properties[] = {
281     {
282       "ARRAY_LENGTH",
283       NULL,
284       NULL,
285       NULL,
286       NULL,
287       js_array_length,
288       napi_enumerable,
289       NULL
290     },
291     {
292       "MAX_QUEUE_SIZE",
293       NULL,
294       NULL,
295       NULL,
296       NULL,
297       js_max_queue_size,
298       napi_enumerable,
299       NULL
300     },
301     DECLARE_NAPI_PROPERTY("StartThread", StartThread),
302     DECLARE_NAPI_PROPERTY("StartThreadNoNative", StartThreadNoNative),
303     DECLARE_NAPI_PROPERTY("StartThreadNonblocking", StartThreadNonblocking),
304     DECLARE_NAPI_PROPERTY("StartThreadNoJsFunc", StartThreadNoJsFunc),
305     DECLARE_NAPI_PROPERTY("StopThread", StopThread),
306     DECLARE_NAPI_PROPERTY("Unref", Unref),
307     DECLARE_NAPI_PROPERTY("Release", Release),
308   };
309 
310   NAPI_CALL(env, napi_define_properties(env, exports,
311     sizeof(properties)/sizeof(properties[0]), properties));
312 
313   return exports;
314 }
315 NAPI_MODULE(NODE_GYP_MODULE_NAME, Init)
316