• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  DataViewPrototypeGetBuffer,
5  Int32Array,
6  PromisePrototypeThen,
7  ReflectApply,
8  SafeSet,
9  TypedArrayPrototypeGetBuffer,
10  globalThis: {
11    Atomics: {
12      add: AtomicsAdd,
13      notify: AtomicsNotify,
14    },
15  },
16} = primordials;
17const assert = require('internal/assert');
18const { clearImmediate, setImmediate } = require('timers');
19const {
20  hasUncaughtExceptionCaptureCallback,
21} = require('internal/process/execution');
22const {
23  isArrayBuffer,
24  isDataView,
25  isTypedArray,
26} = require('util/types');
27
28const { receiveMessageOnPort } = require('internal/worker/io');
29const {
30  WORKER_TO_MAIN_THREAD_NOTIFICATION,
31} = require('internal/modules/esm/shared_constants');
32const { initializeHooks } = require('internal/modules/esm/utils');
33
34
/**
 * Builds the transfer list that accompanies a hook response when it is posted.
 * @param {boolean} hasError - Whether the hook failed; nothing is transferred in that case.
 * @param {ArrayBuffer | TypedArray | DataView | null | undefined} source - The `source` of the
 * response. Values of any other kind produce no transfer list.
 * @returns {Array | undefined} A one-element list holding `source` itself (for an ArrayBuffer)
 * or the buffer backing it (for a TypedArray or DataView); `undefined` otherwise.
 */
function transferArrayBuffer(hasError, source) {
  const nothingToTransfer = hasError || source == null;
  if (nothingToTransfer) {
    return undefined;
  }

  let backingBuffer;
  if (isArrayBuffer(source)) {
    backingBuffer = source;
  } else if (isTypedArray(source)) {
    backingBuffer = TypedArrayPrototypeGetBuffer(source);
  } else if (isDataView(source)) {
    backingBuffer = DataViewPrototypeGetBuffer(source);
  } else {
    // Strings and other values are posted without a transfer list.
    return undefined;
  }

  return [backingBuffer];
}
46
/**
 * Builds the `{ status, body }` envelope exchanged with the main thread.
 *
 * The body is passed through untouched for `'success'` messages, for `null`, and for every
 * primitive except symbols. Any other body (object, function or symbol) is run through
 * `serializeError` and replaced by `{ serialized, serializationFailed }`.
 * @param {string} status - The status of the message.
 * @param {unknown} body - The body of the message.
 * @returns {{status: string, body: unknown}} The wrapped message.
 */
function wrapMessage(status, body) {
  const bodyType = typeof body;
  const needsSerialization =
    status !== 'success' &&
    body !== null &&
    (bodyType === 'object' || bodyType === 'function' || bodyType === 'symbol');

  if (!needsSerialization) {
    return { status, body };
  }

  // Both keys are always present; whichever one does not apply stays `undefined`.
  const wrappedBody = { serialized: undefined, serializationFailed: undefined };
  try {
    // Loaded lazily, inside the `try`, so that a failure to load is reported like any other
    // serialization failure.
    const { serializeError } = require('internal/error_serdes');
    wrappedBody.serialized = serializeError(body);
  } catch {
    wrappedBody.serializationFailed = true;
  }

  return { status, body: wrappedBody };
}
77
/**
 * Runs the hooks worker: loads the customization hooks, reports the outcome of that
 * initialization to the main thread, and then serves the hook calls received on `syncCommPort`.
 * @param {Int32Array} lock - Shared lock whose `WORKER_TO_MAIN_THREAD_NOTIFICATION` slot is
 * incremented and notified every time the main thread has to be woken up.
 * @param {MessagePort} syncCommPort - The message port used for synchronous communication
 * between the worker and the main thread.
 * @param {(err: Error, origin?: string) => void} errorHandler - Installed as the
 * `uncaughtException` listener while a synchronous request is pending, and used as the rejection
 * handler for messages picked up by polling.
 * @returns {Promise<void>} A promise that resolves when the worker thread has been initialized.
 */
async function customizedModuleWorker(lock, syncCommPort, errorHandler) {
  // Handle of the currently scheduled `setImmediate` poll, if any.
  let pollHandle;
  // Ports on which a reply is still owed because the corresponding hook has not settled yet.
  const pendingReplyPorts = new SafeSet();

  let hooks;
  let preloadScripts;
  let initFailed = false;
  let initFailure;

  /**
   * Bumps the notification slot of the lock and notifies the main thread waiting on it.
   */
  function wakeMainThread() {
    AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1);
    AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);
  }

  /**
   * Schedules the next poll of `syncCommPort`; the immediate is unref'ed.
   */
  function armPoll() {
    pollHandle = setImmediate(checkForMessages).unref();
  }

  /**
   * Cancels the scheduled poll, if any, and schedules a fresh one.
   */
  function rearmPoll() {
    clearImmediate(pollHandle);
    armPoll();
  }

  // A custom hook may call `process.exit`. Patch it so the main thread is told about the exit
  // and woken up before the original implementation runs.
  const originalExit = process.exit;
  process.exit = function(code) {
    syncCommPort.postMessage(wrapMessage('exit', code ?? process.exitCode));
    wakeMainThread();
    return ReflectApply(originalExit, this, arguments);
  };

  try {
    const initialized = await initializeHooks();
    hooks = initialized.hooks;
    preloadScripts = initialized.preloadScripts;
  } catch (exception) {
    // Parsing or executing a user loader failed (for example because the loader contained a
    // syntax error). Remember the error so it can be sent to the main thread, where it is
    // thrown and printed.
    initFailed = true;
    initFailure = exception;
  }

  syncCommPort.on('message', handleMessage);

  if (!initFailed) {
    const readyMessage = wrapMessage('success', { preloadScripts });
    // The port of every preload script is handed over as part of the transfer list.
    const preloadPorts = preloadScripts.map(({ port }) => port);
    syncCommPort.postMessage(readyMessage, preloadPorts);
  } else {
    syncCommPort.postMessage(wrapMessage('error', initFailure));
  }

  // Initialization has been reported, so unlock the main thread.
  wakeMainThread();

  /**
   * Polls `syncCommPort` once, and keeps polling on the following ticks so that follow-up
   * messages from the main thread are picked up even while the `message` listener is detached.
   */
  function checkForMessages() {
    // Re-arm before reading, so polling carries on whatever happens below.
    armPoll();

    const received = receiveMessageOnPort(syncCommPort);
    if (received === undefined) {
      return;
    }
    PromisePrototypeThen(handleMessage(received.message), undefined, errorHandler);
  }

  process.on('beforeExit', () => {
    // Tell every side still waiting for a reply that its hook call is not going to settle.
    for (const replyPort of pendingReplyPorts) {
      replyPort.postMessage(wrapMessage('never-settle'));
    }
    pendingReplyPorts.clear();

    wakeMainThread();

    // Start listening for messages again.
    syncCommPort.on('message', handleMessage);
    // A message may already be waiting, so also check once synchronously.
    clearImmediate(pollHandle);
    checkForMessages();
    // The listener is attached again, so the poll scheduled by the call above is not needed.
    clearImmediate(pollHandle);
    // Queue some work for the next tick so the worker cannot exit.
    setImmediate(() => {});
  });

  /**
   * Runs the requested hook and posts its result, or the error it threw, back to the requester.
   * @param {object} options - The request.
   * @param {string} options.method - The name of the hook.
   * @param {Array} options.args - The arguments to pass to the hook.
   * @param {MessagePort} [options.port] - The port to reply on. When it is nullish the reply
   * goes to `syncCommPort` and the request is treated as a synchronous one.
   */
  async function handleMessage({ method, args, port }) {
    const replyPort = port ?? syncCommPort;

    assert(typeof hooks[method] === 'function');

    // For synchronous requests, an uncaught exception must still unlock the main thread.
    const installedGlobalHandler = port == null && !hasUncaughtExceptionCaptureCallback();
    if (installedGlobalHandler) {
      process.on('uncaughtException', errorHandler);
    }

    // Execution is about to be yielded by the `await` below. The code after it may never run,
    // so detach the message listener now to let the `beforeExit` event fire in that case.
    syncCommPort.off('message', handleMessage);

    // With the listener gone, keep polling so that no message is missed.
    rearmPoll();

    pendingReplyPorts.add(replyPort);

    // The hook call is guarded on its own so that the error reported to the main thread is the
    // one thrown by the hook.
    let failed = false;
    let outcome;
    try {
      outcome = await ReflectApply(hooks[method], hooks, args);
    } catch (exception) {
      failed = true;
      outcome = exception;
    }

    pendingReplyPorts.delete(replyPort);

    try {
      // Reply with the hook result (or the exception it threw).
      const reply = wrapMessage(failed ? 'error' : 'success', outcome);
      const transferList = transferArrayBuffer(failed, outcome?.source);
      replyPort.postMessage(reply, transferList);
    } catch (exception) {
      // Posting the reply failed; report that failure instead.
      replyPort.postMessage(wrapMessage('error', exception));
    }

    wakeMainThread();
    if (installedGlobalHandler) {
      process.off('uncaughtException', errorHandler);
    }

    // Make sure the listener is detached (the `beforeExit` handler may have attached it again
    // in the meantime) and rely on polling for the next message.
    syncCommPort.off('message', handleMessage);
    rearmPoll();
  }
}
227
228/**
229 * Initializes a worker thread for a module with customized hooks.
230 * ! Run everything possible within this function so errors get reported.
231 * @param {{lock: SharedArrayBuffer}} workerData - The lock used to synchronize with the main thread.
232 * @param {MessagePort} syncCommPort - The communication port used to communicate with the main thread.
233 */
234module.exports = function setupModuleWorker(workerData, syncCommPort) {
235  const lock = new Int32Array(workerData.lock);
236
237  /**
238   * Handles errors that occur in the worker thread.
239   * @param {Error} err - The error that occurred.
240   * @param {string} [origin='unhandledRejection'] - The origin of the error.
241   */
242  function errorHandler(err, origin = 'unhandledRejection') {
243    AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1);
244    AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);
245    process.off('uncaughtException', errorHandler);
246    if (hasUncaughtExceptionCaptureCallback()) {
247      process._fatalException(err);
248      return;
249    }
250    internalBinding('errors').triggerUncaughtException(
251      err,
252      origin === 'unhandledRejection',
253    );
254  }
255
256  return PromisePrototypeThen(
257    customizedModuleWorker(lock, syncCommPort, errorHandler),
258    undefined,
259    errorHandler,
260  );
261};
262