1'use strict'; 2 3const { 4 ArrayPrototypeForEach, 5 ArrayPrototypeMap, 6 ArrayPrototypePush, 7 Float64Array, 8 FunctionPrototypeBind, 9 JSONStringify, 10 MathMax, 11 ObjectCreate, 12 ObjectEntries, 13 Promise, 14 PromiseResolve, 15 ReflectApply, 16 RegExpPrototypeExec, 17 SafeArrayIterator, 18 SafeMap, 19 String, 20 StringPrototypeTrim, 21 Symbol, 22 SymbolFor, 23 TypedArrayPrototypeFill, 24 Uint32Array, 25 globalThis: { Atomics, SharedArrayBuffer }, 26} = primordials; 27 28const EventEmitter = require('events'); 29const assert = require('internal/assert'); 30const path = require('path'); 31const { now } = require('internal/perf/utils'); 32 33const errorCodes = require('internal/errors').codes; 34const { 35 ERR_WORKER_NOT_RUNNING, 36 ERR_WORKER_PATH, 37 ERR_WORKER_UNSERIALIZABLE_ERROR, 38 ERR_WORKER_INVALID_EXEC_ARGV, 39 ERR_INVALID_ARG_TYPE, 40 ERR_INVALID_ARG_VALUE, 41} = errorCodes; 42const { getOptionValue } = require('internal/options'); 43 44const workerIo = require('internal/worker/io'); 45const { 46 drainMessagePort, 47 receiveMessageOnPort, 48 MessageChannel, 49 messageTypes, 50 kPort, 51 kIncrementsPortRef, 52 kWaitingStreams, 53 kStdioWantsMoreDataCallback, 54 setupPortReferencing, 55 ReadableWorkerStdio, 56 WritableWorkerStdio, 57} = workerIo; 58const { deserializeError } = require('internal/error_serdes'); 59const { fileURLToPath, isURL, pathToFileURL } = require('internal/url'); 60const { kEmptyObject } = require('internal/util'); 61const { validateArray, validateString } = require('internal/validators'); 62 63const { 64 ownsProcessState, 65 isMainThread, 66 resourceLimits: resourceLimitsRaw, 67 threadId, 68 Worker: WorkerImpl, 69 kMaxYoungGenerationSizeMb, 70 kMaxOldGenerationSizeMb, 71 kCodeRangeSizeMb, 72 kStackSizeMb, 73 kTotalResourceLimitCount, 74} = internalBinding('worker'); 75 76const kHandle = Symbol('kHandle'); 77const kPublicPort = Symbol('kPublicPort'); 78const kDispose = Symbol('kDispose'); 79const kOnExit = Symbol('kOnExit'); 80const kOnMessage = Symbol('kOnMessage'); 81const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr'); 82const kOnErrorMessage = Symbol('kOnErrorMessage'); 83const kParentSideStdio = Symbol('kParentSideStdio'); 84const kLoopStartTime = Symbol('kLoopStartTime'); 85const kIsInternal = Symbol('kIsInternal'); 86const kIsOnline = Symbol('kIsOnline'); 87 88const SHARE_ENV = SymbolFor('nodejs.worker_threads.SHARE_ENV'); 89let debug = require('internal/util/debuglog').debuglog('worker', (fn) => { 90 debug = fn; 91}); 92 93let cwdCounter; 94 95const environmentData = new SafeMap(); 96 97// SharedArrayBuffers can be disabled with --no-harmony-sharedarraybuffer. 98// Atomics can be disabled with --no-harmony-atomics. 99if (isMainThread && SharedArrayBuffer !== undefined && Atomics !== undefined) { 100 cwdCounter = new Uint32Array(new SharedArrayBuffer(4)); 101 const originalChdir = process.chdir; 102 process.chdir = function(path) { 103 Atomics.add(cwdCounter, 0, 1); 104 originalChdir(path); 105 }; 106} 107 108function setEnvironmentData(key, value) { 109 if (value === undefined) 110 environmentData.delete(key); 111 else 112 environmentData.set(key, value); 113} 114 115function getEnvironmentData(key) { 116 return environmentData.get(key); 117} 118 119function assignEnvironmentData(data) { 120 if (data === undefined) return; 121 data.forEach((value, key) => { 122 environmentData.set(key, value); 123 }); 124} 125 126class Worker extends EventEmitter { 127 constructor(filename, options = kEmptyObject) { 128 super(); 129 const isInternal = arguments[2] === kIsInternal; 130 debug( 131 `[${threadId}] create new worker`, 132 filename, 133 options, 134 `isInternal: ${isInternal}`, 135 ); 136 if (options.execArgv) 137 validateArray(options.execArgv, 'options.execArgv'); 138 139 let argv; 140 if (options.argv) { 141 validateArray(options.argv, 'options.argv'); 142 argv = ArrayPrototypeMap(options.argv, String); 143 } 144 145 let url, doEval; 146 if (isInternal) { 147 doEval = 'internal'; 148 url = `node:${filename}`; 149 } else if (options.eval) { 150 if (typeof filename !== 'string') { 151 throw new ERR_INVALID_ARG_VALUE( 152 'options.eval', 153 options.eval, 154 'must be false when \'filename\' is not a string', 155 ); 156 } 157 url = null; 158 doEval = 'classic'; 159 } else if (isURL(filename) && filename.protocol === 'data:') { 160 url = null; 161 doEval = 'module'; 162 filename = `import ${JSONStringify(`${filename}`)}`; 163 } else { 164 doEval = false; 165 if (isURL(filename)) { 166 url = filename; 167 filename = fileURLToPath(filename); 168 } else if (typeof filename !== 'string') { 169 throw new ERR_INVALID_ARG_TYPE( 170 'filename', 171 ['string', 'URL'], 172 filename, 173 ); 174 } else if (path.isAbsolute(filename) || 175 RegExpPrototypeExec(/^\.\.?[\\/]/, filename) !== null) { 176 filename = path.resolve(filename); 177 url = pathToFileURL(filename); 178 } else { 179 throw new ERR_WORKER_PATH(filename); 180 } 181 } 182 183 let env; 184 if (typeof options.env === 'object' && options.env !== null) { 185 env = ObjectCreate(null); 186 ArrayPrototypeForEach( 187 ObjectEntries(options.env), 188 ({ 0: key, 1: value }) => { env[key] = `${value}`; }, 189 ); 190 } else if (options.env == null) { 191 env = process.env; 192 } else if (options.env !== SHARE_ENV) { 193 throw new ERR_INVALID_ARG_TYPE( 194 'options.env', 195 ['object', 'undefined', 'null', 'worker_threads.SHARE_ENV'], 196 options.env); 197 } 198 199 let name = ''; 200 if (options.name) { 201 validateString(options.name, 'options.name'); 202 name = StringPrototypeTrim(options.name); 203 } 204 205 debug('instantiating Worker.', `url: ${url}`, `doEval: ${doEval}`); 206 // Set up the C++ handle for the worker, as well as some internal wiring. 207 this[kHandle] = new WorkerImpl(url, 208 env === process.env ? null : env, 209 options.execArgv, 210 parseResourceLimits(options.resourceLimits), 211 !!(options.trackUnmanagedFds ?? true), 212 isInternal, 213 name); 214 if (this[kHandle].invalidExecArgv) { 215 throw new ERR_WORKER_INVALID_EXEC_ARGV(this[kHandle].invalidExecArgv); 216 } 217 if (this[kHandle].invalidNodeOptions) { 218 throw new ERR_WORKER_INVALID_EXEC_ARGV( 219 this[kHandle].invalidNodeOptions, 'invalid NODE_OPTIONS env variable'); 220 } 221 this[kHandle].onexit = (code, customErr, customErrReason) => { 222 this[kOnExit](code, customErr, customErrReason); 223 }; 224 this[kPort] = this[kHandle].messagePort; 225 this[kPort].on('message', (data) => this[kOnMessage](data)); 226 this[kPort].start(); 227 this[kPort].unref(); 228 this[kPort][kWaitingStreams] = 0; 229 debug(`[${threadId}] created Worker with ID ${this.threadId}`); 230 231 let stdin = null; 232 if (options.stdin) 233 stdin = new WritableWorkerStdio(this[kPort], 'stdin'); 234 const stdout = new ReadableWorkerStdio(this[kPort], 'stdout'); 235 if (!options.stdout) { 236 stdout[kIncrementsPortRef] = false; 237 pipeWithoutWarning(stdout, process.stdout); 238 } 239 const stderr = new ReadableWorkerStdio(this[kPort], 'stderr'); 240 if (!options.stderr) { 241 stderr[kIncrementsPortRef] = false; 242 pipeWithoutWarning(stderr, process.stderr); 243 } 244 245 this[kParentSideStdio] = { stdin, stdout, stderr }; 246 247 const { port1, port2 } = new MessageChannel(); 248 const transferList = [port2]; 249 // If transferList is provided. 250 if (options.transferList) 251 ArrayPrototypePush(transferList, 252 ...new SafeArrayIterator(options.transferList)); 253 254 this[kPublicPort] = port1; 255 ArrayPrototypeForEach(['message', 'messageerror'], (event) => { 256 this[kPublicPort].on(event, (message) => this.emit(event, message)); 257 }); 258 setupPortReferencing(this[kPublicPort], this, 'message'); 259 this[kPort].postMessage({ 260 argv, 261 type: messageTypes.LOAD_SCRIPT, 262 filename, 263 doEval, 264 isInternal, 265 cwdCounter: cwdCounter || workerIo.sharedCwdCounter, 266 workerData: options.workerData, 267 environmentData, 268 publicPort: port2, 269 manifestURL: getOptionValue('--experimental-policy') ? 270 require('internal/process/policy').url : 271 null, 272 manifestSrc: getOptionValue('--experimental-policy') ? 273 require('internal/process/policy').src : 274 null, 275 hasStdin: !!options.stdin, 276 }, transferList); 277 // Use this to cache the Worker's loopStart value once available. 278 this[kLoopStartTime] = -1; 279 this[kIsOnline] = false; 280 this.performance = { 281 eventLoopUtilization: FunctionPrototypeBind(eventLoopUtilization, this), 282 }; 283 // Actually start the new thread now that everything is in place. 284 this[kHandle].startThread(); 285 286 process.nextTick(() => process.emit('worker', this)); 287 } 288 289 [kOnExit](code, customErr, customErrReason) { 290 debug(`[${threadId}] hears end event for Worker ${this.threadId}`); 291 drainMessagePort(this[kPublicPort]); 292 drainMessagePort(this[kPort]); 293 this.removeAllListeners('message'); 294 this.removeAllListeners('messageerrors'); 295 this[kPublicPort].unref(); 296 this[kPort].unref(); 297 this[kDispose](); 298 if (customErr) { 299 debug(`[${threadId}] failing with custom error ${customErr} \ 300 and with reason ${customErrReason}`); 301 this.emit('error', new errorCodes[customErr](customErrReason)); 302 } 303 this.emit('exit', code); 304 this.removeAllListeners(); 305 } 306 307 [kOnCouldNotSerializeErr]() { 308 this.emit('error', new ERR_WORKER_UNSERIALIZABLE_ERROR()); 309 } 310 311 [kOnErrorMessage](serialized) { 312 // This is what is called for uncaught exceptions. 313 const error = deserializeError(serialized); 314 this.emit('error', error); 315 } 316 317 [kOnMessage](message) { 318 switch (message.type) { 319 case messageTypes.UP_AND_RUNNING: 320 this[kIsOnline] = true; 321 return this.emit('online'); 322 case messageTypes.COULD_NOT_SERIALIZE_ERROR: 323 return this[kOnCouldNotSerializeErr](); 324 case messageTypes.ERROR_MESSAGE: 325 return this[kOnErrorMessage](message.error); 326 case messageTypes.STDIO_PAYLOAD: 327 { 328 const { stream, chunks } = message; 329 const readable = this[kParentSideStdio][stream]; 330 ArrayPrototypeForEach(chunks, ({ chunk, encoding }) => { 331 readable.push(chunk, encoding); 332 }); 333 return; 334 } 335 case messageTypes.STDIO_WANTS_MORE_DATA: 336 { 337 const { stream } = message; 338 return this[kParentSideStdio][stream][kStdioWantsMoreDataCallback](); 339 } 340 } 341 342 assert.fail(`Unknown worker message type ${message.type}`); 343 } 344 345 [kDispose]() { 346 this[kHandle].onexit = null; 347 this[kHandle] = null; 348 this[kPort] = null; 349 this[kPublicPort] = null; 350 351 const { stdout, stderr } = this[kParentSideStdio]; 352 353 if (!stdout.readableEnded) { 354 debug(`[${threadId}] explicitly closes stdout for ${this.threadId}`); 355 stdout.push(null); 356 } 357 if (!stderr.readableEnded) { 358 debug(`[${threadId}] explicitly closes stderr for ${this.threadId}`); 359 stderr.push(null); 360 } 361 } 362 363 postMessage(...args) { 364 if (this[kPublicPort] === null) return; 365 366 ReflectApply(this[kPublicPort].postMessage, this[kPublicPort], args); 367 } 368 369 terminate(callback) { 370 debug(`[${threadId}] terminates Worker with ID ${this.threadId}`); 371 372 this.ref(); 373 374 if (typeof callback === 'function') { 375 process.emitWarning( 376 'Passing a callback to worker.terminate() is deprecated. ' + 377 'It returns a Promise instead.', 378 'DeprecationWarning', 'DEP0132'); 379 if (this[kHandle] === null) return PromiseResolve(); 380 this.once('exit', (exitCode) => callback(null, exitCode)); 381 } 382 383 if (this[kHandle] === null) return PromiseResolve(); 384 385 this[kHandle].stopThread(); 386 387 // Do not use events.once() here, because the 'exit' event will always be 388 // emitted regardless of any errors, and the point is to only resolve 389 // once the thread has actually stopped. 390 return new Promise((resolve) => { 391 this.once('exit', resolve); 392 }); 393 } 394 395 ref() { 396 if (this[kHandle] === null) return; 397 398 this[kHandle].ref(); 399 this[kPublicPort].ref(); 400 } 401 402 unref() { 403 if (this[kHandle] === null) return; 404 405 this[kHandle].unref(); 406 this[kPublicPort].unref(); 407 } 408 409 get threadId() { 410 if (this[kHandle] === null) return -1; 411 412 return this[kHandle].threadId; 413 } 414 415 get stdin() { 416 return this[kParentSideStdio].stdin; 417 } 418 419 get stdout() { 420 return this[kParentSideStdio].stdout; 421 } 422 423 get stderr() { 424 return this[kParentSideStdio].stderr; 425 } 426 427 get resourceLimits() { 428 if (this[kHandle] === null) return {}; 429 430 return makeResourceLimits(this[kHandle].getResourceLimits()); 431 } 432 433 getHeapSnapshot() { 434 const heapSnapshotTaker = this[kHandle] && this[kHandle].takeHeapSnapshot(); 435 return new Promise((resolve, reject) => { 436 if (!heapSnapshotTaker) return reject(new ERR_WORKER_NOT_RUNNING()); 437 heapSnapshotTaker.ondone = (handle) => { 438 const { HeapSnapshotStream } = require('internal/heap_utils'); 439 resolve(new HeapSnapshotStream(handle)); 440 }; 441 }); 442 } 443} 444 445/** 446 * A worker which has an internal module for entry point (e.g. internal/module/esm/worker). 447 * Internal workers bypass the permission model. 448 */ 449class InternalWorker extends Worker { 450 constructor(filename, options) { 451 super(filename, options, kIsInternal); 452 } 453 454 receiveMessageSync() { 455 return receiveMessageOnPort(this[kPublicPort]); 456 } 457} 458 459function pipeWithoutWarning(source, dest) { 460 const sourceMaxListeners = source._maxListeners; 461 const destMaxListeners = dest._maxListeners; 462 source.setMaxListeners(Infinity); 463 dest.setMaxListeners(Infinity); 464 465 source.pipe(dest); 466 467 source._maxListeners = sourceMaxListeners; 468 dest._maxListeners = destMaxListeners; 469} 470 471const resourceLimitsArray = new Float64Array(kTotalResourceLimitCount); 472function parseResourceLimits(obj) { 473 const ret = resourceLimitsArray; 474 TypedArrayPrototypeFill(ret, -1); 475 if (typeof obj !== 'object' || obj === null) return ret; 476 477 if (typeof obj.maxOldGenerationSizeMb === 'number') 478 ret[kMaxOldGenerationSizeMb] = MathMax(obj.maxOldGenerationSizeMb, 2); 479 if (typeof obj.maxYoungGenerationSizeMb === 'number') 480 ret[kMaxYoungGenerationSizeMb] = obj.maxYoungGenerationSizeMb; 481 if (typeof obj.codeRangeSizeMb === 'number') 482 ret[kCodeRangeSizeMb] = obj.codeRangeSizeMb; 483 if (typeof obj.stackSizeMb === 'number') 484 ret[kStackSizeMb] = obj.stackSizeMb; 485 return ret; 486} 487 488function makeResourceLimits(float64arr) { 489 return { 490 maxYoungGenerationSizeMb: float64arr[kMaxYoungGenerationSizeMb], 491 maxOldGenerationSizeMb: float64arr[kMaxOldGenerationSizeMb], 492 codeRangeSizeMb: float64arr[kCodeRangeSizeMb], 493 stackSizeMb: float64arr[kStackSizeMb], 494 }; 495} 496 497function eventLoopUtilization(util1, util2) { 498 // TODO(trevnorris): Works to solve the thread-safe read/write issue of 499 // loopTime, but has the drawback that it can't be set until the event loop 500 // has had a chance to turn. So it will be impossible to read the ELU of 501 // a worker thread immediately after it's been created. 502 if (!this[kIsOnline] || !this[kHandle]) { 503 return { idle: 0, active: 0, utilization: 0 }; 504 } 505 506 // Cache loopStart, since it's only written to once. 507 if (this[kLoopStartTime] === -1) { 508 this[kLoopStartTime] = this[kHandle].loopStartTime(); 509 if (this[kLoopStartTime] === -1) 510 return { idle: 0, active: 0, utilization: 0 }; 511 } 512 513 if (util2) { 514 const idle = util1.idle - util2.idle; 515 const active = util1.active - util2.active; 516 return { idle, active, utilization: active / (idle + active) }; 517 } 518 519 const idle = this[kHandle].loopIdleTime(); 520 521 // Using performance.now() here is fine since it's always the time from 522 // the beginning of the process, and is why it needs to be offset by the 523 // loopStart time (which is also calculated from the beginning of the 524 // process). 525 const active = now() - this[kLoopStartTime] - idle; 526 527 if (!util1) { 528 return { idle, active, utilization: active / (idle + active) }; 529 } 530 531 const idle_delta = idle - util1.idle; 532 const active_delta = active - util1.active; 533 const utilization = active_delta / (idle_delta + active_delta); 534 return { idle: idle_delta, active: active_delta, utilization }; 535} 536 537module.exports = { 538 ownsProcessState, 539 kIsOnline, 540 isMainThread, 541 SHARE_ENV, 542 resourceLimits: 543 !isMainThread ? makeResourceLimits(resourceLimitsRaw) : {}, 544 setEnvironmentData, 545 getEnvironmentData, 546 assignEnvironmentData, 547 threadId, 548 InternalWorker, 549 Worker, 550}; 551