1'use strict'; 2 3const { 4 ArrayPrototypeForEach, 5 ArrayPrototypeMap, 6 ArrayPrototypePush, 7 Float64Array, 8 FunctionPrototypeBind, 9 JSONStringify, 10 MathMax, 11 ObjectCreate, 12 ObjectEntries, 13 Promise, 14 PromiseResolve, 15 ReflectApply, 16 RegExpPrototypeExec, 17 SafeArrayIterator, 18 SafeMap, 19 String, 20 StringPrototypeTrim, 21 Symbol, 22 SymbolFor, 23 TypedArrayPrototypeFill, 24 Uint32Array, 25 globalThis: { Atomics, SharedArrayBuffer }, 26} = primordials; 27 28const EventEmitter = require('events'); 29const assert = require('internal/assert'); 30const path = require('path'); 31const { now } = require('internal/perf/utils'); 32 33const errorCodes = require('internal/errors').codes; 34const { 35 ERR_WORKER_NOT_RUNNING, 36 ERR_WORKER_PATH, 37 ERR_WORKER_UNSERIALIZABLE_ERROR, 38 ERR_WORKER_INVALID_EXEC_ARGV, 39 ERR_INVALID_ARG_TYPE, 40 ERR_INVALID_ARG_VALUE, 41} = errorCodes; 42const { getOptionValue } = require('internal/options'); 43 44const workerIo = require('internal/worker/io'); 45const { 46 drainMessagePort, 47 MessageChannel, 48 messageTypes, 49 kPort, 50 kIncrementsPortRef, 51 kWaitingStreams, 52 kStdioWantsMoreDataCallback, 53 setupPortReferencing, 54 ReadableWorkerStdio, 55 WritableWorkerStdio, 56} = workerIo; 57const { deserializeError } = require('internal/error_serdes'); 58const { fileURLToPath, isURL, pathToFileURL } = require('internal/url'); 59const { kEmptyObject } = require('internal/util'); 60const { validateArray, validateString } = require('internal/validators'); 61 62const { 63 ownsProcessState, 64 isMainThread, 65 resourceLimits: resourceLimitsRaw, 66 threadId, 67 Worker: WorkerImpl, 68 kMaxYoungGenerationSizeMb, 69 kMaxOldGenerationSizeMb, 70 kCodeRangeSizeMb, 71 kStackSizeMb, 72 kTotalResourceLimitCount, 73} = internalBinding('worker'); 74 75const kHandle = Symbol('kHandle'); 76const kPublicPort = Symbol('kPublicPort'); 77const kDispose = Symbol('kDispose'); 78const kOnExit = Symbol('kOnExit'); 79const kOnMessage = Symbol('kOnMessage'); 80const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr'); 81const kOnErrorMessage = Symbol('kOnErrorMessage'); 82const kParentSideStdio = Symbol('kParentSideStdio'); 83const kLoopStartTime = Symbol('kLoopStartTime'); 84const kIsOnline = Symbol('kIsOnline'); 85 86const SHARE_ENV = SymbolFor('nodejs.worker_threads.SHARE_ENV'); 87let debug = require('internal/util/debuglog').debuglog('worker', (fn) => { 88 debug = fn; 89}); 90 91let cwdCounter; 92 93const environmentData = new SafeMap(); 94 95// SharedArrayBuffers can be disabled with --no-harmony-sharedarraybuffer. 96// Atomics can be disabled with --no-harmony-atomics. 97if (isMainThread && SharedArrayBuffer !== undefined && Atomics !== undefined) { 98 cwdCounter = new Uint32Array(new SharedArrayBuffer(4)); 99 const originalChdir = process.chdir; 100 process.chdir = function(path) { 101 Atomics.add(cwdCounter, 0, 1); 102 originalChdir(path); 103 }; 104} 105 106function setEnvironmentData(key, value) { 107 if (value === undefined) 108 environmentData.delete(key); 109 else 110 environmentData.set(key, value); 111} 112 113function getEnvironmentData(key) { 114 return environmentData.get(key); 115} 116 117function assignEnvironmentData(data) { 118 if (data === undefined) return; 119 data.forEach((value, key) => { 120 environmentData.set(key, value); 121 }); 122} 123 124class Worker extends EventEmitter { 125 constructor(filename, options = kEmptyObject) { 126 super(); 127 debug(`[${threadId}] create new worker`, filename, options); 128 if (options.execArgv) 129 validateArray(options.execArgv, 'options.execArgv'); 130 131 let argv; 132 if (options.argv) { 133 validateArray(options.argv, 'options.argv'); 134 argv = ArrayPrototypeMap(options.argv, String); 135 } 136 137 let url, doEval; 138 if (options.eval) { 139 if (typeof filename !== 'string') { 140 throw new ERR_INVALID_ARG_VALUE( 141 'options.eval', 142 options.eval, 143 'must be false when \'filename\' is not a string', 144 ); 145 } 146 url = null; 147 doEval = 'classic'; 148 } else if (isURL(filename) && filename.protocol === 'data:') { 149 url = null; 150 doEval = 'module'; 151 filename = `import ${JSONStringify(`${filename}`)}`; 152 } else { 153 doEval = false; 154 if (isURL(filename)) { 155 url = filename; 156 filename = fileURLToPath(filename); 157 } else if (typeof filename !== 'string') { 158 throw new ERR_INVALID_ARG_TYPE( 159 'filename', 160 ['string', 'URL'], 161 filename, 162 ); 163 } else if (path.isAbsolute(filename) || 164 RegExpPrototypeExec(/^\.\.?[\\/]/, filename) !== null) { 165 filename = path.resolve(filename); 166 url = pathToFileURL(filename); 167 } else { 168 throw new ERR_WORKER_PATH(filename); 169 } 170 } 171 172 let env; 173 if (typeof options.env === 'object' && options.env !== null) { 174 env = ObjectCreate(null); 175 ArrayPrototypeForEach( 176 ObjectEntries(options.env), 177 ({ 0: key, 1: value }) => { env[key] = `${value}`; }, 178 ); 179 } else if (options.env == null) { 180 env = process.env; 181 } else if (options.env !== SHARE_ENV) { 182 throw new ERR_INVALID_ARG_TYPE( 183 'options.env', 184 ['object', 'undefined', 'null', 'worker_threads.SHARE_ENV'], 185 options.env); 186 } 187 188 let name = ''; 189 if (options.name) { 190 validateString(options.name, 'options.name'); 191 name = StringPrototypeTrim(options.name); 192 } 193 194 // Set up the C++ handle for the worker, as well as some internal wiring. 195 this[kHandle] = new WorkerImpl(url, 196 env === process.env ? null : env, 197 options.execArgv, 198 parseResourceLimits(options.resourceLimits), 199 !!(options.trackUnmanagedFds ?? true), 200 name); 201 if (this[kHandle].invalidExecArgv) { 202 throw new ERR_WORKER_INVALID_EXEC_ARGV(this[kHandle].invalidExecArgv); 203 } 204 if (this[kHandle].invalidNodeOptions) { 205 throw new ERR_WORKER_INVALID_EXEC_ARGV( 206 this[kHandle].invalidNodeOptions, 'invalid NODE_OPTIONS env variable'); 207 } 208 this[kHandle].onexit = (code, customErr, customErrReason) => { 209 this[kOnExit](code, customErr, customErrReason); 210 }; 211 this[kPort] = this[kHandle].messagePort; 212 this[kPort].on('message', (data) => this[kOnMessage](data)); 213 this[kPort].start(); 214 this[kPort].unref(); 215 this[kPort][kWaitingStreams] = 0; 216 debug(`[${threadId}] created Worker with ID ${this.threadId}`); 217 218 let stdin = null; 219 if (options.stdin) 220 stdin = new WritableWorkerStdio(this[kPort], 'stdin'); 221 const stdout = new ReadableWorkerStdio(this[kPort], 'stdout'); 222 if (!options.stdout) { 223 stdout[kIncrementsPortRef] = false; 224 pipeWithoutWarning(stdout, process.stdout); 225 } 226 const stderr = new ReadableWorkerStdio(this[kPort], 'stderr'); 227 if (!options.stderr) { 228 stderr[kIncrementsPortRef] = false; 229 pipeWithoutWarning(stderr, process.stderr); 230 } 231 232 this[kParentSideStdio] = { stdin, stdout, stderr }; 233 234 const { port1, port2 } = new MessageChannel(); 235 const transferList = [port2]; 236 // If transferList is provided. 237 if (options.transferList) 238 ArrayPrototypePush(transferList, 239 ...new SafeArrayIterator(options.transferList)); 240 241 this[kPublicPort] = port1; 242 ArrayPrototypeForEach(['message', 'messageerror'], (event) => { 243 this[kPublicPort].on(event, (message) => this.emit(event, message)); 244 }); 245 setupPortReferencing(this[kPublicPort], this, 'message'); 246 this[kPort].postMessage({ 247 argv, 248 type: messageTypes.LOAD_SCRIPT, 249 filename, 250 doEval, 251 cwdCounter: cwdCounter || workerIo.sharedCwdCounter, 252 workerData: options.workerData, 253 environmentData, 254 publicPort: port2, 255 manifestURL: getOptionValue('--experimental-policy') ? 256 require('internal/process/policy').url : 257 null, 258 manifestSrc: getOptionValue('--experimental-policy') ? 259 require('internal/process/policy').src : 260 null, 261 hasStdin: !!options.stdin, 262 }, transferList); 263 // Use this to cache the Worker's loopStart value once available. 264 this[kLoopStartTime] = -1; 265 this[kIsOnline] = false; 266 this.performance = { 267 eventLoopUtilization: FunctionPrototypeBind(eventLoopUtilization, this), 268 }; 269 // Actually start the new thread now that everything is in place. 270 this[kHandle].startThread(); 271 272 process.nextTick(() => process.emit('worker', this)); 273 } 274 275 [kOnExit](code, customErr, customErrReason) { 276 debug(`[${threadId}] hears end event for Worker ${this.threadId}`); 277 drainMessagePort(this[kPublicPort]); 278 drainMessagePort(this[kPort]); 279 this.removeAllListeners('message'); 280 this.removeAllListeners('messageerrors'); 281 this[kPublicPort].unref(); 282 this[kPort].unref(); 283 this[kDispose](); 284 if (customErr) { 285 debug(`[${threadId}] failing with custom error ${customErr} \ 286 and with reason ${customErrReason}`); 287 this.emit('error', new errorCodes[customErr](customErrReason)); 288 } 289 this.emit('exit', code); 290 this.removeAllListeners(); 291 } 292 293 [kOnCouldNotSerializeErr]() { 294 this.emit('error', new ERR_WORKER_UNSERIALIZABLE_ERROR()); 295 } 296 297 [kOnErrorMessage](serialized) { 298 // This is what is called for uncaught exceptions. 299 const error = deserializeError(serialized); 300 this.emit('error', error); 301 } 302 303 [kOnMessage](message) { 304 switch (message.type) { 305 case messageTypes.UP_AND_RUNNING: 306 this[kIsOnline] = true; 307 return this.emit('online'); 308 case messageTypes.COULD_NOT_SERIALIZE_ERROR: 309 return this[kOnCouldNotSerializeErr](); 310 case messageTypes.ERROR_MESSAGE: 311 return this[kOnErrorMessage](message.error); 312 case messageTypes.STDIO_PAYLOAD: 313 { 314 const { stream, chunks } = message; 315 const readable = this[kParentSideStdio][stream]; 316 ArrayPrototypeForEach(chunks, ({ chunk, encoding }) => { 317 readable.push(chunk, encoding); 318 }); 319 return; 320 } 321 case messageTypes.STDIO_WANTS_MORE_DATA: 322 { 323 const { stream } = message; 324 return this[kParentSideStdio][stream][kStdioWantsMoreDataCallback](); 325 } 326 } 327 328 assert.fail(`Unknown worker message type ${message.type}`); 329 } 330 331 [kDispose]() { 332 this[kHandle].onexit = null; 333 this[kHandle] = null; 334 this[kPort] = null; 335 this[kPublicPort] = null; 336 337 const { stdout, stderr } = this[kParentSideStdio]; 338 339 if (!stdout.readableEnded) { 340 debug(`[${threadId}] explicitly closes stdout for ${this.threadId}`); 341 stdout.push(null); 342 } 343 if (!stderr.readableEnded) { 344 debug(`[${threadId}] explicitly closes stderr for ${this.threadId}`); 345 stderr.push(null); 346 } 347 } 348 349 postMessage(...args) { 350 if (this[kPublicPort] === null) return; 351 352 ReflectApply(this[kPublicPort].postMessage, this[kPublicPort], args); 353 } 354 355 terminate(callback) { 356 debug(`[${threadId}] terminates Worker with ID ${this.threadId}`); 357 358 this.ref(); 359 360 if (typeof callback === 'function') { 361 process.emitWarning( 362 'Passing a callback to worker.terminate() is deprecated. ' + 363 'It returns a Promise instead.', 364 'DeprecationWarning', 'DEP0132'); 365 if (this[kHandle] === null) return PromiseResolve(); 366 this.once('exit', (exitCode) => callback(null, exitCode)); 367 } 368 369 if (this[kHandle] === null) return PromiseResolve(); 370 371 this[kHandle].stopThread(); 372 373 // Do not use events.once() here, because the 'exit' event will always be 374 // emitted regardless of any errors, and the point is to only resolve 375 // once the thread has actually stopped. 376 return new Promise((resolve) => { 377 this.once('exit', resolve); 378 }); 379 } 380 381 ref() { 382 if (this[kHandle] === null) return; 383 384 this[kHandle].ref(); 385 this[kPublicPort].ref(); 386 } 387 388 unref() { 389 if (this[kHandle] === null) return; 390 391 this[kHandle].unref(); 392 this[kPublicPort].unref(); 393 } 394 395 get threadId() { 396 if (this[kHandle] === null) return -1; 397 398 return this[kHandle].threadId; 399 } 400 401 get stdin() { 402 return this[kParentSideStdio].stdin; 403 } 404 405 get stdout() { 406 return this[kParentSideStdio].stdout; 407 } 408 409 get stderr() { 410 return this[kParentSideStdio].stderr; 411 } 412 413 get resourceLimits() { 414 if (this[kHandle] === null) return {}; 415 416 return makeResourceLimits(this[kHandle].getResourceLimits()); 417 } 418 419 getHeapSnapshot() { 420 const heapSnapshotTaker = this[kHandle] && this[kHandle].takeHeapSnapshot(); 421 return new Promise((resolve, reject) => { 422 if (!heapSnapshotTaker) return reject(new ERR_WORKER_NOT_RUNNING()); 423 heapSnapshotTaker.ondone = (handle) => { 424 const { HeapSnapshotStream } = require('internal/heap_utils'); 425 resolve(new HeapSnapshotStream(handle)); 426 }; 427 }); 428 } 429} 430 431function pipeWithoutWarning(source, dest) { 432 const sourceMaxListeners = source._maxListeners; 433 const destMaxListeners = dest._maxListeners; 434 source.setMaxListeners(Infinity); 435 dest.setMaxListeners(Infinity); 436 437 source.pipe(dest); 438 439 source._maxListeners = sourceMaxListeners; 440 dest._maxListeners = destMaxListeners; 441} 442 443const resourceLimitsArray = new Float64Array(kTotalResourceLimitCount); 444function parseResourceLimits(obj) { 445 const ret = resourceLimitsArray; 446 TypedArrayPrototypeFill(ret, -1); 447 if (typeof obj !== 'object' || obj === null) return ret; 448 449 if (typeof obj.maxOldGenerationSizeMb === 'number') 450 ret[kMaxOldGenerationSizeMb] = MathMax(obj.maxOldGenerationSizeMb, 2); 451 if (typeof obj.maxYoungGenerationSizeMb === 'number') 452 ret[kMaxYoungGenerationSizeMb] = obj.maxYoungGenerationSizeMb; 453 if (typeof obj.codeRangeSizeMb === 'number') 454 ret[kCodeRangeSizeMb] = obj.codeRangeSizeMb; 455 if (typeof obj.stackSizeMb === 'number') 456 ret[kStackSizeMb] = obj.stackSizeMb; 457 return ret; 458} 459 460function makeResourceLimits(float64arr) { 461 return { 462 maxYoungGenerationSizeMb: float64arr[kMaxYoungGenerationSizeMb], 463 maxOldGenerationSizeMb: float64arr[kMaxOldGenerationSizeMb], 464 codeRangeSizeMb: float64arr[kCodeRangeSizeMb], 465 stackSizeMb: float64arr[kStackSizeMb], 466 }; 467} 468 469function eventLoopUtilization(util1, util2) { 470 // TODO(trevnorris): Works to solve the thread-safe read/write issue of 471 // loopTime, but has the drawback that it can't be set until the event loop 472 // has had a chance to turn. So it will be impossible to read the ELU of 473 // a worker thread immediately after it's been created. 474 if (!this[kIsOnline] || !this[kHandle]) { 475 return { idle: 0, active: 0, utilization: 0 }; 476 } 477 478 // Cache loopStart, since it's only written to once. 479 if (this[kLoopStartTime] === -1) { 480 this[kLoopStartTime] = this[kHandle].loopStartTime(); 481 if (this[kLoopStartTime] === -1) 482 return { idle: 0, active: 0, utilization: 0 }; 483 } 484 485 if (util2) { 486 const idle = util1.idle - util2.idle; 487 const active = util1.active - util2.active; 488 return { idle, active, utilization: active / (idle + active) }; 489 } 490 491 const idle = this[kHandle].loopIdleTime(); 492 493 // Using performance.now() here is fine since it's always the time from 494 // the beginning of the process, and is why it needs to be offset by the 495 // loopStart time (which is also calculated from the beginning of the 496 // process). 497 const active = now() - this[kLoopStartTime] - idle; 498 499 if (!util1) { 500 return { idle, active, utilization: active / (idle + active) }; 501 } 502 503 const idle_delta = idle - util1.idle; 504 const active_delta = active - util1.active; 505 const utilization = active_delta / (idle_delta + active_delta); 506 return { idle: idle_delta, active: active_delta, utilization }; 507} 508 509module.exports = { 510 ownsProcessState, 511 isMainThread, 512 SHARE_ENV, 513 resourceLimits: 514 !isMainThread ? makeResourceLimits(resourceLimitsRaw) : {}, 515 setEnvironmentData, 516 getEnvironmentData, 517 assignEnvironmentData, 518 threadId, 519 Worker, 520}; 521