1'use strict'; 2 3/* global SharedArrayBuffer */ 4 5const { 6 ArrayIsArray, 7 MathMax, 8 ObjectCreate, 9 ObjectEntries, 10 Promise, 11 PromiseResolve, 12 Symbol, 13 SymbolFor, 14 Uint32Array, 15} = primordials; 16 17const EventEmitter = require('events'); 18const assert = require('internal/assert'); 19const path = require('path'); 20 21const errorCodes = require('internal/errors').codes; 22const { 23 ERR_WORKER_NOT_RUNNING, 24 ERR_WORKER_PATH, 25 ERR_WORKER_UNSERIALIZABLE_ERROR, 26 ERR_WORKER_UNSUPPORTED_EXTENSION, 27 ERR_WORKER_INVALID_EXEC_ARGV, 28 ERR_INVALID_ARG_TYPE, 29 ERR_INVALID_ARG_VALUE, 30} = errorCodes; 31const { getOptionValue } = require('internal/options'); 32 33const workerIo = require('internal/worker/io'); 34const { 35 drainMessagePort, 36 MessageChannel, 37 messageTypes, 38 kPort, 39 kIncrementsPortRef, 40 kWaitingStreams, 41 kStdioWantsMoreDataCallback, 42 setupPortReferencing, 43 ReadableWorkerStdio, 44 WritableWorkerStdio 45} = workerIo; 46const { deserializeError } = require('internal/error_serdes'); 47const { fileURLToPath, isURLInstance, pathToFileURL } = require('internal/url'); 48 49const { 50 ownsProcessState, 51 isMainThread, 52 resourceLimits: resourceLimitsRaw, 53 threadId, 54 Worker: WorkerImpl, 55 kMaxYoungGenerationSizeMb, 56 kMaxOldGenerationSizeMb, 57 kCodeRangeSizeMb, 58 kStackSizeMb, 59 kTotalResourceLimitCount 60} = internalBinding('worker'); 61 62const kHandle = Symbol('kHandle'); 63const kPublicPort = Symbol('kPublicPort'); 64const kDispose = Symbol('kDispose'); 65const kOnExit = Symbol('kOnExit'); 66const kOnMessage = Symbol('kOnMessage'); 67const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr'); 68const kOnErrorMessage = Symbol('kOnErrorMessage'); 69const kParentSideStdio = Symbol('kParentSideStdio'); 70 71const SHARE_ENV = SymbolFor('nodejs.worker_threads.SHARE_ENV'); 72let debug = require('internal/util/debuglog').debuglog('worker', (fn) => { 73 debug = fn; 74}); 75 76let cwdCounter; 77 78if (isMainThread) { 79 cwdCounter = new Uint32Array(new SharedArrayBuffer(4)); 80 const originalChdir = process.chdir; 81 process.chdir = function(path) { 82 Atomics.add(cwdCounter, 0, 1); 83 originalChdir(path); 84 }; 85} 86 87class Worker extends EventEmitter { 88 constructor(filename, options = {}) { 89 super(); 90 debug(`[${threadId}] create new worker`, filename, options); 91 if (options.execArgv && !ArrayIsArray(options.execArgv)) { 92 throw new ERR_INVALID_ARG_TYPE('options.execArgv', 93 'Array', 94 options.execArgv); 95 } 96 let argv; 97 if (options.argv) { 98 if (!ArrayIsArray(options.argv)) { 99 throw new ERR_INVALID_ARG_TYPE('options.argv', 'Array', options.argv); 100 } 101 argv = options.argv.map(String); 102 } 103 104 let url; 105 if (options.eval) { 106 if (typeof filename !== 'string') { 107 throw new ERR_INVALID_ARG_VALUE( 108 'options.eval', 109 options.eval, 110 'must be false when \'filename\' is not a string' 111 ); 112 } 113 url = null; 114 } else { 115 if (isURLInstance(filename)) { 116 url = filename; 117 filename = fileURLToPath(filename); 118 } else if (typeof filename !== 'string') { 119 throw new ERR_INVALID_ARG_TYPE( 120 'filename', 121 ['string', 'URL'], 122 filename 123 ); 124 } else if (path.isAbsolute(filename) || /^\.\.?[\\/]/.test(filename)) { 125 filename = path.resolve(filename); 126 url = pathToFileURL(filename); 127 } else { 128 throw new ERR_WORKER_PATH(filename); 129 } 130 131 const ext = path.extname(filename); 132 if (ext !== '.js' && ext !== '.mjs' && ext !== '.cjs') { 133 throw new ERR_WORKER_UNSUPPORTED_EXTENSION(ext); 134 } 135 } 136 137 let env; 138 if (typeof options.env === 'object' && options.env !== null) { 139 env = ObjectCreate(null); 140 for (const [ key, value ] of ObjectEntries(options.env)) 141 env[key] = `${value}`; 142 } else if (options.env == null) { 143 env = process.env; 144 } else if (options.env !== SHARE_ENV) { 145 throw new ERR_INVALID_ARG_TYPE( 146 'options.env', 147 ['object', 'undefined', 'null', 'worker_threads.SHARE_ENV'], 148 options.env); 149 } 150 151 // Set up the C++ handle for the worker, as well as some internal wiring. 152 this[kHandle] = new WorkerImpl(url, 153 env === process.env ? null : env, 154 options.execArgv, 155 parseResourceLimits(options.resourceLimits), 156 !!options.trackUnmanagedFds); 157 if (this[kHandle].invalidExecArgv) { 158 throw new ERR_WORKER_INVALID_EXEC_ARGV(this[kHandle].invalidExecArgv); 159 } 160 if (this[kHandle].invalidNodeOptions) { 161 throw new ERR_WORKER_INVALID_EXEC_ARGV( 162 this[kHandle].invalidNodeOptions, 'invalid NODE_OPTIONS env variable'); 163 } 164 this[kHandle].onexit = (code, customErr, customErrReason) => { 165 this[kOnExit](code, customErr, customErrReason); 166 }; 167 this[kPort] = this[kHandle].messagePort; 168 this[kPort].on('message', (data) => this[kOnMessage](data)); 169 this[kPort].start(); 170 this[kPort].unref(); 171 this[kPort][kWaitingStreams] = 0; 172 debug(`[${threadId}] created Worker with ID ${this.threadId}`); 173 174 let stdin = null; 175 if (options.stdin) 176 stdin = new WritableWorkerStdio(this[kPort], 'stdin'); 177 const stdout = new ReadableWorkerStdio(this[kPort], 'stdout'); 178 if (!options.stdout) { 179 stdout[kIncrementsPortRef] = false; 180 pipeWithoutWarning(stdout, process.stdout); 181 } 182 const stderr = new ReadableWorkerStdio(this[kPort], 'stderr'); 183 if (!options.stderr) { 184 stderr[kIncrementsPortRef] = false; 185 pipeWithoutWarning(stderr, process.stderr); 186 } 187 188 this[kParentSideStdio] = { stdin, stdout, stderr }; 189 190 const { port1, port2 } = new MessageChannel(); 191 const transferList = [port2]; 192 // If transferList is provided. 193 if (options.transferList) 194 transferList.push(...options.transferList); 195 196 this[kPublicPort] = port1; 197 for (const event of ['message', 'messageerror']) { 198 this[kPublicPort].on(event, (message) => this.emit(event, message)); 199 } 200 setupPortReferencing(this[kPublicPort], this, 'message'); 201 this[kPort].postMessage({ 202 argv, 203 type: messageTypes.LOAD_SCRIPT, 204 filename, 205 doEval: !!options.eval, 206 cwdCounter: cwdCounter || workerIo.sharedCwdCounter, 207 workerData: options.workerData, 208 publicPort: port2, 209 manifestURL: getOptionValue('--experimental-policy') ? 210 require('internal/process/policy').url : 211 null, 212 manifestSrc: getOptionValue('--experimental-policy') ? 213 require('internal/process/policy').src : 214 null, 215 hasStdin: !!options.stdin 216 }, transferList); 217 // Actually start the new thread now that everything is in place. 218 this[kHandle].startThread(); 219 } 220 221 [kOnExit](code, customErr, customErrReason) { 222 debug(`[${threadId}] hears end event for Worker ${this.threadId}`); 223 drainMessagePort(this[kPublicPort]); 224 drainMessagePort(this[kPort]); 225 this[kDispose](); 226 if (customErr) { 227 debug(`[${threadId}] failing with custom error ${customErr} \ 228 and with reason ${customErrReason}`); 229 this.emit('error', new errorCodes[customErr](customErrReason)); 230 } 231 this.emit('exit', code); 232 this.removeAllListeners(); 233 } 234 235 [kOnCouldNotSerializeErr]() { 236 this.emit('error', new ERR_WORKER_UNSERIALIZABLE_ERROR()); 237 } 238 239 [kOnErrorMessage](serialized) { 240 // This is what is called for uncaught exceptions. 241 const error = deserializeError(serialized); 242 this.emit('error', error); 243 } 244 245 [kOnMessage](message) { 246 switch (message.type) { 247 case messageTypes.UP_AND_RUNNING: 248 return this.emit('online'); 249 case messageTypes.COULD_NOT_SERIALIZE_ERROR: 250 return this[kOnCouldNotSerializeErr](); 251 case messageTypes.ERROR_MESSAGE: 252 return this[kOnErrorMessage](message.error); 253 case messageTypes.STDIO_PAYLOAD: 254 { 255 const { stream, chunks } = message; 256 const readable = this[kParentSideStdio][stream]; 257 for (const { chunk, encoding } of chunks) 258 readable.push(chunk, encoding); 259 return; 260 } 261 case messageTypes.STDIO_WANTS_MORE_DATA: 262 { 263 const { stream } = message; 264 return this[kParentSideStdio][stream][kStdioWantsMoreDataCallback](); 265 } 266 } 267 268 assert.fail(`Unknown worker message type ${message.type}`); 269 } 270 271 [kDispose]() { 272 this[kHandle].onexit = null; 273 this[kHandle] = null; 274 this[kPort] = null; 275 this[kPublicPort] = null; 276 277 const { stdout, stderr } = this[kParentSideStdio]; 278 279 if (!stdout.readableEnded) { 280 debug(`[${threadId}] explicitly closes stdout for ${this.threadId}`); 281 stdout.push(null); 282 } 283 if (!stderr.readableEnded) { 284 debug(`[${threadId}] explicitly closes stderr for ${this.threadId}`); 285 stderr.push(null); 286 } 287 } 288 289 postMessage(...args) { 290 if (this[kPublicPort] === null) return; 291 292 this[kPublicPort].postMessage(...args); 293 } 294 295 terminate(callback) { 296 debug(`[${threadId}] terminates Worker with ID ${this.threadId}`); 297 298 this.ref(); 299 300 if (typeof callback === 'function') { 301 process.emitWarning( 302 'Passing a callback to worker.terminate() is deprecated. ' + 303 'It returns a Promise instead.', 304 'DeprecationWarning', 'DEP0132'); 305 if (this[kHandle] === null) return PromiseResolve(); 306 this.once('exit', (exitCode) => callback(null, exitCode)); 307 } 308 309 if (this[kHandle] === null) return PromiseResolve(); 310 311 this[kHandle].stopThread(); 312 313 // Do not use events.once() here, because the 'exit' event will always be 314 // emitted regardless of any errors, and the point is to only resolve 315 // once the thread has actually stopped. 316 return new Promise((resolve) => { 317 this.once('exit', resolve); 318 }); 319 } 320 321 ref() { 322 if (this[kHandle] === null) return; 323 324 this[kHandle].ref(); 325 this[kPublicPort].ref(); 326 } 327 328 unref() { 329 if (this[kHandle] === null) return; 330 331 this[kHandle].unref(); 332 this[kPublicPort].unref(); 333 } 334 335 get threadId() { 336 if (this[kHandle] === null) return -1; 337 338 return this[kHandle].threadId; 339 } 340 341 get stdin() { 342 return this[kParentSideStdio].stdin; 343 } 344 345 get stdout() { 346 return this[kParentSideStdio].stdout; 347 } 348 349 get stderr() { 350 return this[kParentSideStdio].stderr; 351 } 352 353 get resourceLimits() { 354 if (this[kHandle] === null) return {}; 355 356 return makeResourceLimits(this[kHandle].getResourceLimits()); 357 } 358 359 getHeapSnapshot() { 360 const heapSnapshotTaker = this[kHandle] && this[kHandle].takeHeapSnapshot(); 361 return new Promise((resolve, reject) => { 362 if (!heapSnapshotTaker) return reject(new ERR_WORKER_NOT_RUNNING()); 363 heapSnapshotTaker.ondone = (handle) => { 364 const { HeapSnapshotStream } = require('internal/heap_utils'); 365 resolve(new HeapSnapshotStream(handle)); 366 }; 367 }); 368 } 369} 370 371function pipeWithoutWarning(source, dest) { 372 const sourceMaxListeners = source._maxListeners; 373 const destMaxListeners = dest._maxListeners; 374 source.setMaxListeners(Infinity); 375 dest.setMaxListeners(Infinity); 376 377 source.pipe(dest); 378 379 source._maxListeners = sourceMaxListeners; 380 dest._maxListeners = destMaxListeners; 381} 382 383const resourceLimitsArray = new Float64Array(kTotalResourceLimitCount); 384function parseResourceLimits(obj) { 385 const ret = resourceLimitsArray; 386 ret.fill(-1); 387 if (typeof obj !== 'object' || obj === null) return ret; 388 389 if (typeof obj.maxOldGenerationSizeMb === 'number') 390 ret[kMaxOldGenerationSizeMb] = MathMax(obj.maxOldGenerationSizeMb, 2); 391 if (typeof obj.maxYoungGenerationSizeMb === 'number') 392 ret[kMaxYoungGenerationSizeMb] = obj.maxYoungGenerationSizeMb; 393 if (typeof obj.codeRangeSizeMb === 'number') 394 ret[kCodeRangeSizeMb] = obj.codeRangeSizeMb; 395 if (typeof obj.stackSizeMb === 'number') 396 ret[kStackSizeMb] = obj.stackSizeMb; 397 return ret; 398} 399 400function makeResourceLimits(float64arr) { 401 return { 402 maxYoungGenerationSizeMb: float64arr[kMaxYoungGenerationSizeMb], 403 maxOldGenerationSizeMb: float64arr[kMaxOldGenerationSizeMb], 404 codeRangeSizeMb: float64arr[kCodeRangeSizeMb], 405 stackSizeMb: float64arr[kStackSizeMb] 406 }; 407} 408 409module.exports = { 410 ownsProcessState, 411 isMainThread, 412 SHARE_ENV, 413 resourceLimits: 414 !isMainThread ? makeResourceLimits(resourceLimitsRaw) : {}, 415 threadId, 416 Worker, 417}; 418