1'use strict'; 2 3const { 4 DataViewPrototypeGetBuffer, 5 Int32Array, 6 PromisePrototypeThen, 7 ReflectApply, 8 SafeSet, 9 TypedArrayPrototypeGetBuffer, 10 globalThis: { 11 Atomics: { 12 add: AtomicsAdd, 13 notify: AtomicsNotify, 14 }, 15 }, 16} = primordials; 17const assert = require('internal/assert'); 18const { clearImmediate, setImmediate } = require('timers'); 19const { 20 hasUncaughtExceptionCaptureCallback, 21} = require('internal/process/execution'); 22const { 23 isArrayBuffer, 24 isDataView, 25 isTypedArray, 26} = require('util/types'); 27 28const { receiveMessageOnPort } = require('internal/worker/io'); 29const { 30 WORKER_TO_MAIN_THREAD_NOTIFICATION, 31} = require('internal/modules/esm/shared_constants'); 32const { initializeHooks } = require('internal/modules/esm/utils'); 33 34 35/** 36 * Transfers an ArrayBuffer, TypedArray, or DataView to a worker thread. 37 * @param {boolean} hasError - Whether an error occurred during transfer. 38 * @param {ArrayBuffer | TypedArray | DataView} source - The data to transfer. 39 */ 40function transferArrayBuffer(hasError, source) { 41 if (hasError || source == null) { return; } 42 if (isArrayBuffer(source)) { return [source]; } 43 if (isTypedArray(source)) { return [TypedArrayPrototypeGetBuffer(source)]; } 44 if (isDataView(source)) { return [DataViewPrototypeGetBuffer(source)]; } 45} 46 47/** 48 * Wraps a message with a status and body, and serializes the body if necessary. 49 * @param {string} status - The status of the message. 50 * @param {unknown} body - The body of the message. 51 */ 52function wrapMessage(status, body) { 53 if (status === 'success' || body === null || 54 (typeof body !== 'object' && 55 typeof body !== 'function' && 56 typeof body !== 'symbol')) { 57 return { status, body }; 58 } 59 60 let serialized; 61 let serializationFailed; 62 try { 63 const { serializeError } = require('internal/error_serdes'); 64 serialized = serializeError(body); 65 } catch { 66 serializationFailed = true; 67 } 68 69 return { 70 status, 71 body: { 72 serialized, 73 serializationFailed, 74 }, 75 }; 76} 77 78/** 79 * Initializes a worker thread for a customized module loader. 80 * @param {SharedArrayBuffer} lock - The lock used to synchronize communication between the worker and the main thread. 81 * @param {MessagePort} syncCommPort - The message port used for synchronous communication between the worker and the 82 * main thread. 83 * @param {(err: Error, origin?: string) => void} errorHandler - The function to use for uncaught exceptions. 84 * @returns {Promise<void>} A promise that resolves when the worker thread has been initialized. 85 */ 86async function customizedModuleWorker(lock, syncCommPort, errorHandler) { 87 let hooks, preloadScripts, initializationError; 88 let hasInitializationError = false; 89 90 { 91 // If a custom hook is calling `process.exit`, we should wake up the main thread 92 // so it can detect the exit event. 93 const { exit } = process; 94 process.exit = function(code) { 95 syncCommPort.postMessage(wrapMessage('exit', code ?? process.exitCode)); 96 AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1); 97 AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION); 98 return ReflectApply(exit, this, arguments); 99 }; 100 } 101 102 103 try { 104 const initResult = await initializeHooks(); 105 hooks = initResult.hooks; 106 preloadScripts = initResult.preloadScripts; 107 } catch (exception) { 108 // If there was an error while parsing and executing a user loader, for example if because a 109 // loader contained a syntax error, then we need to send the error to the main thread so it can 110 // be thrown and printed. 111 hasInitializationError = true; 112 initializationError = exception; 113 } 114 115 syncCommPort.on('message', handleMessage); 116 117 if (hasInitializationError) { 118 syncCommPort.postMessage(wrapMessage('error', initializationError)); 119 } else { 120 syncCommPort.postMessage(wrapMessage('success', { preloadScripts }), preloadScripts.map(({ port }) => port)); 121 } 122 123 // We're ready, so unlock the main thread. 124 AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1); 125 AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION); 126 127 let immediate; 128 /** 129 * Checks for messages on the syncCommPort and handles them asynchronously. 130 */ 131 function checkForMessages() { 132 immediate = setImmediate(checkForMessages).unref(); 133 // We need to let the event loop tick a few times to give the main thread a chance to send 134 // follow-up messages. 135 const response = receiveMessageOnPort(syncCommPort); 136 137 if (response !== undefined) { 138 PromisePrototypeThen(handleMessage(response.message), undefined, errorHandler); 139 } 140 } 141 142 const unsettledResponsePorts = new SafeSet(); 143 144 process.on('beforeExit', () => { 145 for (const port of unsettledResponsePorts) { 146 port.postMessage(wrapMessage('never-settle')); 147 } 148 unsettledResponsePorts.clear(); 149 150 AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1); 151 AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION); 152 153 // Attach back the event handler. 154 syncCommPort.on('message', handleMessage); 155 // Also check synchronously for a message, in case it's already there. 156 clearImmediate(immediate); 157 checkForMessages(); 158 // We don't need the sync check after this tick, as we already have added the event handler. 159 clearImmediate(immediate); 160 // Add some work for next tick so the worker cannot exit. 161 setImmediate(() => {}); 162 }); 163 164 /** 165 * Handles incoming messages from the main thread or other workers. 166 * @param {object} options - The options object. 167 * @param {string} options.method - The name of the hook. 168 * @param {Array} options.args - The arguments to pass to the method. 169 * @param {MessagePort} options.port - The message port to use for communication. 170 */ 171 async function handleMessage({ method, args, port }) { 172 // Each potential exception needs to be caught individually so that the correct error is sent to 173 // the main thread. 174 let hasError = false; 175 let shouldRemoveGlobalErrorHandler = false; 176 assert(typeof hooks[method] === 'function'); 177 if (port == null && !hasUncaughtExceptionCaptureCallback()) { 178 // When receiving sync messages, we want to unlock the main thread when there's an exception. 179 process.on('uncaughtException', errorHandler); 180 shouldRemoveGlobalErrorHandler = true; 181 } 182 183 // We are about to yield the execution with `await ReflectApply` below. In case the code 184 // following the `await` never runs, we remove the message handler so the `beforeExit` event 185 // can be triggered. 186 syncCommPort.off('message', handleMessage); 187 188 // We keep checking for new messages to not miss any. 189 clearImmediate(immediate); 190 immediate = setImmediate(checkForMessages).unref(); 191 192 unsettledResponsePorts.add(port ?? syncCommPort); 193 194 let response; 195 try { 196 response = await ReflectApply(hooks[method], hooks, args); 197 } catch (exception) { 198 hasError = true; 199 response = exception; 200 } 201 202 unsettledResponsePorts.delete(port ?? syncCommPort); 203 204 // Send the method response (or exception) to the main thread. 205 try { 206 (port ?? syncCommPort).postMessage( 207 wrapMessage(hasError ? 'error' : 'success', response), 208 transferArrayBuffer(hasError, response?.source), 209 ); 210 } catch (exception) { 211 // Or send the exception thrown when trying to send the response. 212 (port ?? syncCommPort).postMessage(wrapMessage('error', exception)); 213 } 214 215 AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1); 216 AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION); 217 if (shouldRemoveGlobalErrorHandler) { 218 process.off('uncaughtException', errorHandler); 219 } 220 221 syncCommPort.off('message', handleMessage); 222 // We keep checking for new messages to not miss any. 223 clearImmediate(immediate); 224 immediate = setImmediate(checkForMessages).unref(); 225 } 226} 227 228/** 229 * Initializes a worker thread for a module with customized hooks. 230 * ! Run everything possible within this function so errors get reported. 231 * @param {{lock: SharedArrayBuffer}} workerData - The lock used to synchronize with the main thread. 232 * @param {MessagePort} syncCommPort - The communication port used to communicate with the main thread. 233 */ 234module.exports = function setupModuleWorker(workerData, syncCommPort) { 235 const lock = new Int32Array(workerData.lock); 236 237 /** 238 * Handles errors that occur in the worker thread. 239 * @param {Error} err - The error that occurred. 240 * @param {string} [origin='unhandledRejection'] - The origin of the error. 241 */ 242 function errorHandler(err, origin = 'unhandledRejection') { 243 AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1); 244 AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION); 245 process.off('uncaughtException', errorHandler); 246 if (hasUncaughtExceptionCaptureCallback()) { 247 process._fatalException(err); 248 return; 249 } 250 internalBinding('errors').triggerUncaughtException( 251 err, 252 origin === 'unhandledRejection', 253 ); 254 } 255 256 return PromisePrototypeThen( 257 customizedModuleWorker(lock, syncCommPort, errorHandler), 258 undefined, 259 errorHandler, 260 ); 261}; 262