'use strict';

// In worker threads, execute the script sent through the
// message port.

// Captured built-ins, taken from `primordials` rather than looked up on the
// (user-mutable) globals at call time.
const {
  ArrayPrototypeConcat,
  ArrayPrototypeForEach,
  ArrayPrototypeSplice,
  ObjectDefineProperty,
  PromisePrototypeCatch,
  globalThis: { Atomics },
} = primordials;

// Pre-execution helpers. Some are invoked at load time below; the rest are
// invoked once the LOAD_SCRIPT message arrives.
const {
  patchProcessObject,
  setupCoverageHooks,
  setupInspectorHooks,
  setupWarningHandler,
  setupDebugEnv,
  initializeAbortController,
  initializeDeprecations,
  initializeWASI,
  initializeCJSLoader,
  initializeESMLoader,
  initializeFrozenIntrinsics,
  initializeReport,
  initializeSourceMapsHandlers,
  loadPreloadModules,
  setupTraceCategoryState
} = require('internal/bootstrap/pre_execution');

// `threadId` is used to tag debug output; `getEnvMessagePort` returns the
// port this script listens on for messages.
const {
  threadId,
  getEnvMessagePort
} = internalBinding('worker');

const workerIo = require('internal/worker/io');
const {
  messageTypes: {
    // Messages that may be received by workers
    LOAD_SCRIPT,
    // Messages that may be posted from workers
    UP_AND_RUNNING,
    ERROR_MESSAGE,
    COULD_NOT_SERIALIZE_ERROR,
    // Messages that may be either received or posted
    STDIO_PAYLOAD,
    STDIO_WANTS_MORE_DATA,
  },
  kStdioWantsMoreDataCallback
} = workerIo;

const {
  onGlobalUncaughtException
} = require('internal/process/execution');

// The public module object; `parentPort` and `workerData` are assigned onto
// it when the LOAD_SCRIPT message is handled.
const publicWorker = require('worker_threads');
// `debuglog` is handed a callback that rebinds `debug` to the function it
// supplies, so later calls go through that replacement.
let debug = require('internal/util/debuglog').debuglog('worker', (fn) => {
  debug = fn;
});

const assert = require('internal/assert');

// Load-time setup: these run immediately, before any message is received.
patchProcessObject();
setupInspectorHooks();
setupDebugEnv();

setupWarningHandler();
initializeSourceMapsHandlers();

// Since worker threads cannot switch cwd, we do not need to
// overwrite the process.env.NODE_V8_COVERAGE variable.
if (process.env.NODE_V8_COVERAGE) {
  setupCoverageHooks(process.env.NODE_V8_COVERAGE);
}

debug(`[${threadId}] is setting up worker child environment`);

// Set up the message port and start listening
const port = getEnvMessagePort();

// If the main thread is spawned with env NODE_CHANNEL_FD, it's probably
// spawned by our child_process module. In the worker threads, mark the
// related IPC properties as unavailable.
if (process.env.NODE_CHANNEL_FD) {
  const workerThreadSetup = require('internal/process/worker_thread_only');
  ObjectDefineProperty(process, 'channel', {
    enumerable: false,
    get: workerThreadSetup.unavailable('process.channel')
  });

  ObjectDefineProperty(process, 'connected', {
    enumerable: false,
    get: workerThreadSetup.unavailable('process.connected')
  });

  process.send = workerThreadSetup.unavailable('process.send()');
  process.disconnect =
    workerThreadSetup.unavailable('process.disconnect()');
}

// Dispatch on the type of each incoming message:
//  - LOAD_SCRIPT: finish bootstrapping and run the worker's code.
//  - STDIO_PAYLOAD: push the received chunks into the named process stream.
//  - STDIO_WANTS_MORE_DATA: invoke the named stream's "wants more data"
//    callback.
// Any other type fails the assertion in the final branch.
port.on('message', (message) => {
  if (message.type === LOAD_SCRIPT) {
    port.unref();
    const {
      argv,
      cwdCounter,
      filename,
      doEval,
      workerData,
      environmentData,
      publicPort,
      manifestSrc,
      manifestURL,
      hasStdin
    } = message;

    setupTraceCategoryState();
    initializeReport();
    if (manifestSrc) {
      require('internal/process/policy').setup(manifestSrc, manifestURL);
    }
    initializeAbortController();
    initializeDeprecations();
    initializeWASI();
    initializeCJSLoader();
    initializeESMLoader();

    if (argv !== undefined) {
      process.argv = ArrayPrototypeConcat(process.argv, argv);
    }
    publicWorker.parentPort = publicPort;
    publicWorker.workerData = workerData;

    require('internal/worker').assignEnvironmentData(environmentData);

    // The counter is only passed to the workers created by the main thread, not
    // to workers created by other workers.
    // NOTE(review): the code below reads `cwdCounter` unconditionally, so
    // presumably every LOAD_SCRIPT message carries one - confirm against the
    // sender.
    let cachedCwd = '';
    let lastCounter = -1;
    const originalCwd = process.cwd;

    // Cache the result of the original process.cwd() and only call it again
    // when the value stored in the shared counter has changed since the
    // last call.
    process.cwd = function() {
      const currentCounter = Atomics.load(cwdCounter, 0);
      if (currentCounter === lastCounter)
        return cachedCwd;
      lastCounter = currentCounter;
      cachedCwd = originalCwd();
      return cachedCwd;
    };
    workerIo.sharedCwdCounter = cwdCounter;

    const CJSLoader = require('internal/modules/cjs/loader');
    assert(!CJSLoader.hasLoadedAnyUserCJSModule);
    loadPreloadModules();
    initializeFrozenIntrinsics();

    // Without a stdin, end the stream right away.
    if (!hasStdin)
      process.stdin.push(null);

    // Log the requested eval mode (`doEval`). Interpolating the global `eval`
    // function here would print its source text instead of the mode.
    debug(`[${threadId}] starts worker script ${filename} ` +
          `(eval = ${doEval}) at cwd = ${process.cwd()}`);
    port.postMessage({ type: UP_AND_RUNNING });
    if (doEval === 'classic') {
      // In this mode `filename` is passed to evalScript() as the code to run.
      const { evalScript } = require('internal/process/execution');
      const name = '[worker eval]';
      // This is necessary for CJS module compilation.
      // TODO: pass this with something really internal.
      ObjectDefineProperty(process, '_eval', {
        configurable: true,
        enumerable: true,
        value: filename,
      });
      ArrayPrototypeSplice(process.argv, 1, 0, name);
      evalScript(name, filename);
    } else if (doEval === 'module') {
      const { evalModule } = require('internal/process/execution');
      // Route a rejection from evalModule() through the worker's uncaught
      // exception path, flagged as coming from a promise.
      PromisePrototypeCatch(evalModule(filename), (e) => {
        workerOnGlobalUncaughtException(e, true);
      });
    } else {
      // script filename
      // runMain here might be monkey-patched by users in --require.
      // XXX: the monkey-patchability here should probably be deprecated.
      ArrayPrototypeSplice(process.argv, 1, 0, filename);
      CJSLoader.Module.runMain(filename);
    }
  } else if (message.type === STDIO_PAYLOAD) {
    const { stream, chunks } = message;
    ArrayPrototypeForEach(chunks, ({ chunk, encoding }) => {
      process[stream].push(chunk, encoding);
    });
  } else {
    assert(
      message.type === STDIO_WANTS_MORE_DATA,
      `Unknown worker message type ${message.type}`
    );
    const { stream } = message;
    process[stream][kStdioWantsMoreDataCallback]();
  }
});

/**
 * Uncaught-exception handler for the worker thread.
 *
 * First delegates to `onGlobalUncaughtException()`. If that reports the error
 * as handled, returns `true`. Otherwise marks the process as exiting with
 * exit code 1, emits 'exit' (unless the delegate itself threw), posts the
 * error over the port (serialized if possible, else as a
 * COULD_NOT_SERIALIZE_ERROR marker) and calls `process.exit()`.
 *
 * @param {any} error The uncaught value.
 * @param {boolean} fromPromise Forwarded to `onGlobalUncaughtException()`.
 * @returns {boolean|undefined} `true` when the error was handled.
 */
function workerOnGlobalUncaughtException(error, fromPromise) {
  debug(`[${threadId}] gets uncaught exception`);
  let handled = false;
  let handlerThrew = false;
  try {
    handled = onGlobalUncaughtException(error, fromPromise);
  } catch (e) {
    // The delegate threw: report that error instead of the original one.
    error = e;
    handlerThrew = true;
  }
  debug(`[${threadId}] uncaught exception handled = ${handled}`);

  if (handled) {
    return true;
  }

  if (!process._exiting) {
    try {
      process._exiting = true;
      process.exitCode = 1;
      if (!handlerThrew) {
        process.emit('exit', process.exitCode);
      }
    } catch {}  // Best effort: errors thrown here are deliberately ignored.
  }

  let serialized;
  try {
    const { serializeError } = require('internal/error_serdes');
    serialized = serializeError(error);
  } catch {}  // Fall through to COULD_NOT_SERIALIZE_ERROR below.
  debug(`[${threadId}] uncaught exception serialized = ${!!serialized}`);
  if (serialized)
    port.postMessage({
      type: ERROR_MESSAGE,
      error: serialized
    });
  else
    port.postMessage({ type: COULD_NOT_SERIALIZE_ERROR });

  const { clearAsyncIdStack } = require('internal/async_hooks');
  clearAsyncIdStack();

  process.exit();
}

// Patch the global uncaught exception handler so it gets picked up by
// node::errors::TriggerUncaughtException().
process._fatalException = workerOnGlobalUncaughtException;

markBootstrapComplete();

port.start();