1'use strict'; 2 3// In worker threads, execute the script sent through the 4// message port. 5 6const { 7 ArrayPrototypeForEach, 8 ArrayPrototypePushApply, 9 ArrayPrototypeSplice, 10 ObjectDefineProperty, 11 PromisePrototypeThen, 12 RegExpPrototypeExec, 13 SafeWeakMap, 14 globalThis: { 15 Atomics, 16 SharedArrayBuffer, 17 }, 18} = primordials; 19 20const { 21 prepareWorkerThreadExecution, 22 setupUserModules, 23 markBootstrapComplete, 24} = require('internal/process/pre_execution'); 25 26const { 27 threadId, 28 getEnvMessagePort, 29} = internalBinding('worker'); 30 31const workerIo = require('internal/worker/io'); 32const { 33 messageTypes: { 34 // Messages that may be received by workers 35 LOAD_SCRIPT, 36 // Messages that may be posted from workers 37 UP_AND_RUNNING, 38 ERROR_MESSAGE, 39 COULD_NOT_SERIALIZE_ERROR, 40 // Messages that may be either received or posted 41 STDIO_PAYLOAD, 42 STDIO_WANTS_MORE_DATA, 43 }, 44 kStdioWantsMoreDataCallback, 45} = workerIo; 46 47const { 48 onGlobalUncaughtException, 49} = require('internal/process/execution'); 50 51let debug = require('internal/util/debuglog').debuglog('worker', (fn) => { 52 debug = fn; 53}); 54 55const assert = require('internal/assert'); 56 57prepareWorkerThreadExecution(); 58 59debug(`[${threadId}] is setting up worker child environment`); 60 61// Set up the message port and start listening 62const port = getEnvMessagePort(); 63 64// If the main thread is spawned with env NODE_CHANNEL_FD, it's probably 65// spawned by our child_process module. In the work threads, mark the 66// related IPC properties as unavailable. 67if (process.env.NODE_CHANNEL_FD) { 68 const workerThreadSetup = require('internal/process/worker_thread_only'); 69 ObjectDefineProperty(process, 'channel', { 70 __proto__: null, 71 enumerable: false, 72 get: workerThreadSetup.unavailable('process.channel'), 73 }); 74 75 ObjectDefineProperty(process, 'connected', { 76 __proto__: null, 77 enumerable: false, 78 get: workerThreadSetup.unavailable('process.connected'), 79 }); 80 81 process.send = workerThreadSetup.unavailable('process.send()'); 82 process.disconnect = 83 workerThreadSetup.unavailable('process.disconnect()'); 84} 85 86port.on('message', (message) => { 87 if (message.type === LOAD_SCRIPT) { 88 port.unref(); 89 const { 90 argv, 91 cwdCounter, 92 doEval, 93 environmentData, 94 filename, 95 hasStdin, 96 manifestSrc, 97 manifestURL, 98 publicPort, 99 workerData, 100 } = message; 101 102 if (doEval !== 'internal') { 103 if (argv !== undefined) { 104 ArrayPrototypePushApply(process.argv, argv); 105 } 106 107 const publicWorker = require('worker_threads'); 108 publicWorker.parentPort = publicPort; 109 publicWorker.workerData = workerData; 110 } 111 112 require('internal/worker').assignEnvironmentData(environmentData); 113 114 if (SharedArrayBuffer !== undefined && Atomics !== undefined) { 115 // The counter is only passed to the workers created by the main thread, 116 // not to workers created by other workers. 117 let cachedCwd = ''; 118 let lastCounter = -1; 119 const originalCwd = process.cwd; 120 121 process.cwd = function() { 122 const currentCounter = Atomics.load(cwdCounter, 0); 123 if (currentCounter === lastCounter) 124 return cachedCwd; 125 lastCounter = currentCounter; 126 cachedCwd = originalCwd(); 127 return cachedCwd; 128 }; 129 workerIo.sharedCwdCounter = cwdCounter; 130 } 131 132 if (manifestSrc) { 133 require('internal/process/policy').setup(manifestSrc, manifestURL); 134 } 135 const isLoaderWorker = 136 doEval === 'internal' && 137 filename === require('internal/modules/esm/utils').loaderWorkerId; 138 setupUserModules(isLoaderWorker); 139 140 if (!hasStdin) 141 process.stdin.push(null); 142 143 debug(`[${threadId}] starts worker script ${filename} ` + 144 `(eval = ${doEval}) at cwd = ${process.cwd()}`); 145 port.postMessage({ type: UP_AND_RUNNING }); 146 switch (doEval) { 147 case 'internal': { 148 // Create this WeakMap in js-land because V8 has no C++ API for WeakMap. 149 internalBinding('module_wrap').callbackMap = new SafeWeakMap(); 150 require(filename)(workerData, publicPort); 151 break; 152 } 153 154 case 'classic': { 155 const { evalScript } = require('internal/process/execution'); 156 const name = '[worker eval]'; 157 // This is necessary for CJS module compilation. 158 // TODO: pass this with something really internal. 159 ObjectDefineProperty(process, '_eval', { 160 __proto__: null, 161 configurable: true, 162 enumerable: true, 163 value: filename, 164 }); 165 ArrayPrototypeSplice(process.argv, 1, 0, name); 166 evalScript(name, filename); 167 break; 168 } 169 170 case 'module': { 171 const { evalModule } = require('internal/process/execution'); 172 PromisePrototypeThen(evalModule(filename), undefined, (e) => { 173 workerOnGlobalUncaughtException(e, true); 174 }); 175 break; 176 } 177 178 default: { 179 // script filename 180 // runMain here might be monkey-patched by users in --require. 181 // XXX: the monkey-patchability here should probably be deprecated. 182 ArrayPrototypeSplice(process.argv, 1, 0, filename); 183 const CJSLoader = require('internal/modules/cjs/loader'); 184 CJSLoader.Module.runMain(filename); 185 break; 186 } 187 } 188 } else if (message.type === STDIO_PAYLOAD) { 189 const { stream, chunks } = message; 190 ArrayPrototypeForEach(chunks, ({ chunk, encoding }) => { 191 process[stream].push(chunk, encoding); 192 }); 193 } else { 194 assert( 195 message.type === STDIO_WANTS_MORE_DATA, 196 `Unknown worker message type ${message.type}`, 197 ); 198 const { stream } = message; 199 process[stream][kStdioWantsMoreDataCallback](); 200 } 201}); 202 203function workerOnGlobalUncaughtException(error, fromPromise) { 204 debug(`[${threadId}] gets uncaught exception`); 205 let handled = false; 206 let handlerThrew = false; 207 try { 208 handled = onGlobalUncaughtException(error, fromPromise); 209 } catch (e) { 210 error = e; 211 handlerThrew = true; 212 } 213 debug(`[${threadId}] uncaught exception handled = ${handled}`); 214 215 if (handled) { 216 return true; 217 } 218 219 if (!process._exiting) { 220 try { 221 process._exiting = true; 222 process.exitCode = 1; 223 if (!handlerThrew) { 224 process.emit('exit', process.exitCode); 225 } 226 } catch { 227 // Continue regardless of error. 228 } 229 } 230 231 let serialized; 232 try { 233 const { serializeError } = require('internal/error_serdes'); 234 serialized = serializeError(error); 235 } catch { 236 // Continue regardless of error. 237 } 238 debug(`[${threadId}] uncaught exception serialized = ${!!serialized}`); 239 if (serialized) 240 port.postMessage({ 241 type: ERROR_MESSAGE, 242 error: serialized, 243 }); 244 else 245 port.postMessage({ type: COULD_NOT_SERIALIZE_ERROR }); 246 247 const { clearAsyncIdStack } = require('internal/async_hooks'); 248 clearAsyncIdStack(); 249 250 process.exit(); 251} 252 253// Patch the global uncaught exception handler so it gets picked up by 254// node::errors::TriggerUncaughtException(). 255process._fatalException = workerOnGlobalUncaughtException; 256 257markBootstrapComplete(); 258 259// Necessary to reset RegExp statics before user code runs. 260RegExpPrototypeExec(/^/, ''); 261 262port.start(); 263