1'use strict'; 2 3// In worker threads, execute the script sent through the 4// message port. 5 6const { 7 ArrayPrototypeForEach, 8 ArrayPrototypePushApply, 9 ArrayPrototypeSplice, 10 ObjectDefineProperty, 11 PromisePrototypeThen, 12 RegExpPrototypeExec, 13 globalThis: { 14 Atomics, 15 SharedArrayBuffer, 16 }, 17} = primordials; 18 19const { 20 prepareWorkerThreadExecution, 21 setupUserModules, 22 markBootstrapComplete, 23} = require('internal/process/pre_execution'); 24 25const { 26 threadId, 27 getEnvMessagePort, 28} = internalBinding('worker'); 29 30const workerIo = require('internal/worker/io'); 31const { 32 messageTypes: { 33 // Messages that may be received by workers 34 LOAD_SCRIPT, 35 // Messages that may be posted from workers 36 UP_AND_RUNNING, 37 ERROR_MESSAGE, 38 COULD_NOT_SERIALIZE_ERROR, 39 // Messages that may be either received or posted 40 STDIO_PAYLOAD, 41 STDIO_WANTS_MORE_DATA, 42 }, 43 kStdioWantsMoreDataCallback, 44} = workerIo; 45 46const { 47 onGlobalUncaughtException, 48} = require('internal/process/execution'); 49 50let debug = require('internal/util/debuglog').debuglog('worker', (fn) => { 51 debug = fn; 52}); 53 54const assert = require('internal/assert'); 55 56prepareWorkerThreadExecution(); 57 58debug(`[${threadId}] is setting up worker child environment`); 59 60// Set up the message port and start listening 61const port = getEnvMessagePort(); 62 63// If the main thread is spawned with env NODE_CHANNEL_FD, it's probably 64// spawned by our child_process module. In the work threads, mark the 65// related IPC properties as unavailable. 66if (process.env.NODE_CHANNEL_FD) { 67 const workerThreadSetup = require('internal/process/worker_thread_only'); 68 ObjectDefineProperty(process, 'channel', { 69 __proto__: null, 70 enumerable: false, 71 get: workerThreadSetup.unavailable('process.channel'), 72 }); 73 74 ObjectDefineProperty(process, 'connected', { 75 __proto__: null, 76 enumerable: false, 77 get: workerThreadSetup.unavailable('process.connected'), 78 }); 79 80 process.send = workerThreadSetup.unavailable('process.send()'); 81 process.disconnect = 82 workerThreadSetup.unavailable('process.disconnect()'); 83} 84 85port.on('message', (message) => { 86 if (message.type === LOAD_SCRIPT) { 87 port.unref(); 88 const { 89 argv, 90 cwdCounter, 91 filename, 92 doEval, 93 workerData, 94 environmentData, 95 publicPort, 96 manifestSrc, 97 manifestURL, 98 hasStdin, 99 } = message; 100 101 if (argv !== undefined) { 102 ArrayPrototypePushApply(process.argv, argv); 103 } 104 105 const publicWorker = require('worker_threads'); 106 publicWorker.parentPort = publicPort; 107 publicWorker.workerData = workerData; 108 109 require('internal/worker').assignEnvironmentData(environmentData); 110 111 if (SharedArrayBuffer !== undefined && Atomics !== undefined) { 112 // The counter is only passed to the workers created by the main thread, 113 // not to workers created by other workers. 114 let cachedCwd = ''; 115 let lastCounter = -1; 116 const originalCwd = process.cwd; 117 118 process.cwd = function() { 119 const currentCounter = Atomics.load(cwdCounter, 0); 120 if (currentCounter === lastCounter) 121 return cachedCwd; 122 lastCounter = currentCounter; 123 cachedCwd = originalCwd(); 124 return cachedCwd; 125 }; 126 workerIo.sharedCwdCounter = cwdCounter; 127 } 128 129 if (manifestSrc) { 130 require('internal/process/policy').setup(manifestSrc, manifestURL); 131 } 132 setupUserModules(); 133 134 if (!hasStdin) 135 process.stdin.push(null); 136 137 debug(`[${threadId}] starts worker script ${filename} ` + 138 `(eval = ${doEval}) at cwd = ${process.cwd()}`); 139 port.postMessage({ type: UP_AND_RUNNING }); 140 if (doEval === 'classic') { 141 const { evalScript } = require('internal/process/execution'); 142 const name = '[worker eval]'; 143 // This is necessary for CJS module compilation. 144 // TODO: pass this with something really internal. 145 ObjectDefineProperty(process, '_eval', { 146 __proto__: null, 147 configurable: true, 148 enumerable: true, 149 value: filename, 150 }); 151 ArrayPrototypeSplice(process.argv, 1, 0, name); 152 evalScript(name, filename); 153 } else if (doEval === 'module') { 154 const { evalModule } = require('internal/process/execution'); 155 PromisePrototypeThen(evalModule(filename), undefined, (e) => { 156 workerOnGlobalUncaughtException(e, true); 157 }); 158 } else { 159 // script filename 160 // runMain here might be monkey-patched by users in --require. 161 // XXX: the monkey-patchability here should probably be deprecated. 162 ArrayPrototypeSplice(process.argv, 1, 0, filename); 163 const CJSLoader = require('internal/modules/cjs/loader'); 164 CJSLoader.Module.runMain(filename); 165 } 166 } else if (message.type === STDIO_PAYLOAD) { 167 const { stream, chunks } = message; 168 ArrayPrototypeForEach(chunks, ({ chunk, encoding }) => { 169 process[stream].push(chunk, encoding); 170 }); 171 } else { 172 assert( 173 message.type === STDIO_WANTS_MORE_DATA, 174 `Unknown worker message type ${message.type}`, 175 ); 176 const { stream } = message; 177 process[stream][kStdioWantsMoreDataCallback](); 178 } 179}); 180 181function workerOnGlobalUncaughtException(error, fromPromise) { 182 debug(`[${threadId}] gets uncaught exception`); 183 let handled = false; 184 let handlerThrew = false; 185 try { 186 handled = onGlobalUncaughtException(error, fromPromise); 187 } catch (e) { 188 error = e; 189 handlerThrew = true; 190 } 191 debug(`[${threadId}] uncaught exception handled = ${handled}`); 192 193 if (handled) { 194 return true; 195 } 196 197 if (!process._exiting) { 198 try { 199 process._exiting = true; 200 process.exitCode = 1; 201 if (!handlerThrew) { 202 process.emit('exit', process.exitCode); 203 } 204 } catch { 205 // Continue regardless of error. 206 } 207 } 208 209 let serialized; 210 try { 211 const { serializeError } = require('internal/error_serdes'); 212 serialized = serializeError(error); 213 } catch { 214 // Continue regardless of error. 215 } 216 debug(`[${threadId}] uncaught exception serialized = ${!!serialized}`); 217 if (serialized) 218 port.postMessage({ 219 type: ERROR_MESSAGE, 220 error: serialized, 221 }); 222 else 223 port.postMessage({ type: COULD_NOT_SERIALIZE_ERROR }); 224 225 const { clearAsyncIdStack } = require('internal/async_hooks'); 226 clearAsyncIdStack(); 227 228 process.exit(); 229} 230 231// Patch the global uncaught exception handler so it gets picked up by 232// node::errors::TriggerUncaughtException(). 233process._fatalException = workerOnGlobalUncaughtException; 234 235markBootstrapComplete(); 236 237// Necessary to reset RegExp statics before user code runs. 238RegExpPrototypeExec(/^/, ''); 239 240port.start(); 241