1'use strict'; 2 3const { 4 ArrayIsArray, 5 ArrayPrototypeMap, 6 ArrayPrototypePush, 7 Float64Array, 8 FunctionPrototypeBind, 9 JSONStringify, 10 MathMax, 11 ObjectCreate, 12 ObjectEntries, 13 Promise, 14 PromiseResolve, 15 RegExpPrototypeTest, 16 SafeMap, 17 String, 18 Symbol, 19 SymbolFor, 20 TypedArrayPrototypeFill, 21 Uint32Array, 22 globalThis: { Atomics, SharedArrayBuffer }, 23} = primordials; 24 25const EventEmitter = require('events'); 26const assert = require('internal/assert'); 27const path = require('path'); 28const { timeOrigin } = internalBinding('performance'); 29 30const errorCodes = require('internal/errors').codes; 31const { 32 ERR_WORKER_NOT_RUNNING, 33 ERR_WORKER_PATH, 34 ERR_WORKER_UNSERIALIZABLE_ERROR, 35 ERR_WORKER_UNSUPPORTED_EXTENSION, 36 ERR_WORKER_INVALID_EXEC_ARGV, 37 ERR_INVALID_ARG_TYPE, 38 ERR_INVALID_ARG_VALUE, 39} = errorCodes; 40const { getOptionValue } = require('internal/options'); 41 42const workerIo = require('internal/worker/io'); 43const { 44 drainMessagePort, 45 MessageChannel, 46 messageTypes, 47 kPort, 48 kIncrementsPortRef, 49 kWaitingStreams, 50 kStdioWantsMoreDataCallback, 51 setupPortReferencing, 52 ReadableWorkerStdio, 53 WritableWorkerStdio 54} = workerIo; 55const { deserializeError } = require('internal/error_serdes'); 56const { fileURLToPath, isURLInstance, pathToFileURL } = require('internal/url'); 57 58const { 59 ownsProcessState, 60 isMainThread, 61 resourceLimits: resourceLimitsRaw, 62 threadId, 63 Worker: WorkerImpl, 64 kMaxYoungGenerationSizeMb, 65 kMaxOldGenerationSizeMb, 66 kCodeRangeSizeMb, 67 kStackSizeMb, 68 kTotalResourceLimitCount 69} = internalBinding('worker'); 70 71const kHandle = Symbol('kHandle'); 72const kPublicPort = Symbol('kPublicPort'); 73const kDispose = Symbol('kDispose'); 74const kOnExit = Symbol('kOnExit'); 75const kOnMessage = Symbol('kOnMessage'); 76const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr'); 77const kOnErrorMessage = Symbol('kOnErrorMessage'); 78const kParentSideStdio = Symbol('kParentSideStdio'); 79const kLoopStartTime = Symbol('kLoopStartTime'); 80const kIsOnline = Symbol('kIsOnline'); 81 82const SHARE_ENV = SymbolFor('nodejs.worker_threads.SHARE_ENV'); 83let debug = require('internal/util/debuglog').debuglog('worker', (fn) => { 84 debug = fn; 85}); 86 87let cwdCounter; 88 89const environmentData = new SafeMap(); 90 91if (isMainThread) { 92 cwdCounter = new Uint32Array(new SharedArrayBuffer(4)); 93 const originalChdir = process.chdir; 94 process.chdir = function(path) { 95 Atomics.add(cwdCounter, 0, 1); 96 originalChdir(path); 97 }; 98} 99 100function setEnvironmentData(key, value) { 101 if (value === undefined) 102 environmentData.delete(key); 103 else 104 environmentData.set(key, value); 105} 106 107function getEnvironmentData(key) { 108 return environmentData.get(key); 109} 110 111function assignEnvironmentData(data) { 112 if (data === undefined) return; 113 data.forEach((value, key) => { 114 environmentData.set(key, value); 115 }); 116} 117 118class Worker extends EventEmitter { 119 constructor(filename, options = {}) { 120 super(); 121 debug(`[${threadId}] create new worker`, filename, options); 122 if (options.execArgv && !ArrayIsArray(options.execArgv)) { 123 throw new ERR_INVALID_ARG_TYPE('options.execArgv', 124 'Array', 125 options.execArgv); 126 } 127 let argv; 128 if (options.argv) { 129 if (!ArrayIsArray(options.argv)) { 130 throw new ERR_INVALID_ARG_TYPE('options.argv', 'Array', options.argv); 131 } 132 argv = ArrayPrototypeMap(options.argv, String); 133 } 134 135 let url, doEval; 136 if (options.eval) { 137 if (typeof filename !== 'string') { 138 throw new ERR_INVALID_ARG_VALUE( 139 'options.eval', 140 options.eval, 141 'must be false when \'filename\' is not a string' 142 ); 143 } 144 url = null; 145 doEval = 'classic'; 146 } else if (isURLInstance(filename) && filename.protocol === 'data:') { 147 url = null; 148 doEval = 'module'; 149 filename = `import ${JSONStringify(`${filename}`)}`; 150 } else { 151 doEval = false; 152 if (isURLInstance(filename)) { 153 url = filename; 154 filename = fileURLToPath(filename); 155 } else if (typeof filename !== 'string') { 156 throw new ERR_INVALID_ARG_TYPE( 157 'filename', 158 ['string', 'URL'], 159 filename 160 ); 161 } else if (path.isAbsolute(filename) || 162 RegExpPrototypeTest(/^\.\.?[\\/]/, filename)) { 163 filename = path.resolve(filename); 164 url = pathToFileURL(filename); 165 } else { 166 throw new ERR_WORKER_PATH(filename); 167 } 168 169 const ext = path.extname(filename); 170 if (ext !== '.js' && ext !== '.mjs' && ext !== '.cjs') { 171 throw new ERR_WORKER_UNSUPPORTED_EXTENSION(ext); 172 } 173 } 174 175 let env; 176 if (typeof options.env === 'object' && options.env !== null) { 177 env = ObjectCreate(null); 178 for (const [ key, value ] of ObjectEntries(options.env)) 179 env[key] = `${value}`; 180 } else if (options.env == null) { 181 env = process.env; 182 } else if (options.env !== SHARE_ENV) { 183 throw new ERR_INVALID_ARG_TYPE( 184 'options.env', 185 ['object', 'undefined', 'null', 'worker_threads.SHARE_ENV'], 186 options.env); 187 } 188 189 // Set up the C++ handle for the worker, as well as some internal wiring. 190 this[kHandle] = new WorkerImpl(url, 191 env === process.env ? null : env, 192 options.execArgv, 193 parseResourceLimits(options.resourceLimits), 194 !!options.trackUnmanagedFds); 195 if (this[kHandle].invalidExecArgv) { 196 throw new ERR_WORKER_INVALID_EXEC_ARGV(this[kHandle].invalidExecArgv); 197 } 198 if (this[kHandle].invalidNodeOptions) { 199 throw new ERR_WORKER_INVALID_EXEC_ARGV( 200 this[kHandle].invalidNodeOptions, 'invalid NODE_OPTIONS env variable'); 201 } 202 this[kHandle].onexit = (code, customErr, customErrReason) => { 203 this[kOnExit](code, customErr, customErrReason); 204 }; 205 this[kPort] = this[kHandle].messagePort; 206 this[kPort].on('message', (data) => this[kOnMessage](data)); 207 this[kPort].start(); 208 this[kPort].unref(); 209 this[kPort][kWaitingStreams] = 0; 210 debug(`[${threadId}] created Worker with ID ${this.threadId}`); 211 212 let stdin = null; 213 if (options.stdin) 214 stdin = new WritableWorkerStdio(this[kPort], 'stdin'); 215 const stdout = new ReadableWorkerStdio(this[kPort], 'stdout'); 216 if (!options.stdout) { 217 stdout[kIncrementsPortRef] = false; 218 pipeWithoutWarning(stdout, process.stdout); 219 } 220 const stderr = new ReadableWorkerStdio(this[kPort], 'stderr'); 221 if (!options.stderr) { 222 stderr[kIncrementsPortRef] = false; 223 pipeWithoutWarning(stderr, process.stderr); 224 } 225 226 this[kParentSideStdio] = { stdin, stdout, stderr }; 227 228 const { port1, port2 } = new MessageChannel(); 229 const transferList = [port2]; 230 // If transferList is provided. 231 if (options.transferList) 232 ArrayPrototypePush(transferList, ...options.transferList); 233 234 this[kPublicPort] = port1; 235 for (const event of ['message', 'messageerror']) { 236 this[kPublicPort].on(event, (message) => this.emit(event, message)); 237 } 238 setupPortReferencing(this[kPublicPort], this, 'message'); 239 this[kPort].postMessage({ 240 argv, 241 type: messageTypes.LOAD_SCRIPT, 242 filename, 243 doEval, 244 cwdCounter: cwdCounter || workerIo.sharedCwdCounter, 245 workerData: options.workerData, 246 environmentData, 247 publicPort: port2, 248 manifestURL: getOptionValue('--experimental-policy') ? 249 require('internal/process/policy').url : 250 null, 251 manifestSrc: getOptionValue('--experimental-policy') ? 252 require('internal/process/policy').src : 253 null, 254 hasStdin: !!options.stdin 255 }, transferList); 256 // Use this to cache the Worker's loopStart value once available. 257 this[kLoopStartTime] = -1; 258 this[kIsOnline] = false; 259 this.performance = { 260 eventLoopUtilization: FunctionPrototypeBind(eventLoopUtilization, this), 261 }; 262 // Actually start the new thread now that everything is in place. 263 this[kHandle].startThread(); 264 265 process.nextTick(() => process.emit('worker', this)); 266 } 267 268 [kOnExit](code, customErr, customErrReason) { 269 debug(`[${threadId}] hears end event for Worker ${this.threadId}`); 270 drainMessagePort(this[kPublicPort]); 271 drainMessagePort(this[kPort]); 272 this[kDispose](); 273 if (customErr) { 274 debug(`[${threadId}] failing with custom error ${customErr} \ 275 and with reason ${customErrReason}`); 276 this.emit('error', new errorCodes[customErr](customErrReason)); 277 } 278 this.emit('exit', code); 279 this.removeAllListeners(); 280 } 281 282 [kOnCouldNotSerializeErr]() { 283 this.emit('error', new ERR_WORKER_UNSERIALIZABLE_ERROR()); 284 } 285 286 [kOnErrorMessage](serialized) { 287 // This is what is called for uncaught exceptions. 288 const error = deserializeError(serialized); 289 this.emit('error', error); 290 } 291 292 [kOnMessage](message) { 293 switch (message.type) { 294 case messageTypes.UP_AND_RUNNING: 295 this[kIsOnline] = true; 296 return this.emit('online'); 297 case messageTypes.COULD_NOT_SERIALIZE_ERROR: 298 return this[kOnCouldNotSerializeErr](); 299 case messageTypes.ERROR_MESSAGE: 300 return this[kOnErrorMessage](message.error); 301 case messageTypes.STDIO_PAYLOAD: 302 { 303 const { stream, chunks } = message; 304 const readable = this[kParentSideStdio][stream]; 305 for (const { chunk, encoding } of chunks) 306 readable.push(chunk, encoding); 307 return; 308 } 309 case messageTypes.STDIO_WANTS_MORE_DATA: 310 { 311 const { stream } = message; 312 return this[kParentSideStdio][stream][kStdioWantsMoreDataCallback](); 313 } 314 } 315 316 assert.fail(`Unknown worker message type ${message.type}`); 317 } 318 319 [kDispose]() { 320 this[kHandle].onexit = null; 321 this[kHandle] = null; 322 this[kPort] = null; 323 this[kPublicPort] = null; 324 325 const { stdout, stderr } = this[kParentSideStdio]; 326 327 if (!stdout.readableEnded) { 328 debug(`[${threadId}] explicitly closes stdout for ${this.threadId}`); 329 stdout.push(null); 330 } 331 if (!stderr.readableEnded) { 332 debug(`[${threadId}] explicitly closes stderr for ${this.threadId}`); 333 stderr.push(null); 334 } 335 } 336 337 postMessage(...args) { 338 if (this[kPublicPort] === null) return; 339 340 this[kPublicPort].postMessage(...args); 341 } 342 343 terminate(callback) { 344 debug(`[${threadId}] terminates Worker with ID ${this.threadId}`); 345 346 this.ref(); 347 348 if (typeof callback === 'function') { 349 process.emitWarning( 350 'Passing a callback to worker.terminate() is deprecated. ' + 351 'It returns a Promise instead.', 352 'DeprecationWarning', 'DEP0132'); 353 if (this[kHandle] === null) return PromiseResolve(); 354 this.once('exit', (exitCode) => callback(null, exitCode)); 355 } 356 357 if (this[kHandle] === null) return PromiseResolve(); 358 359 this[kHandle].stopThread(); 360 361 // Do not use events.once() here, because the 'exit' event will always be 362 // emitted regardless of any errors, and the point is to only resolve 363 // once the thread has actually stopped. 364 return new Promise((resolve) => { 365 this.once('exit', resolve); 366 }); 367 } 368 369 ref() { 370 if (this[kHandle] === null) return; 371 372 this[kHandle].ref(); 373 this[kPublicPort].ref(); 374 } 375 376 unref() { 377 if (this[kHandle] === null) return; 378 379 this[kHandle].unref(); 380 this[kPublicPort].unref(); 381 } 382 383 get threadId() { 384 if (this[kHandle] === null) return -1; 385 386 return this[kHandle].threadId; 387 } 388 389 get stdin() { 390 return this[kParentSideStdio].stdin; 391 } 392 393 get stdout() { 394 return this[kParentSideStdio].stdout; 395 } 396 397 get stderr() { 398 return this[kParentSideStdio].stderr; 399 } 400 401 get resourceLimits() { 402 if (this[kHandle] === null) return {}; 403 404 return makeResourceLimits(this[kHandle].getResourceLimits()); 405 } 406 407 getHeapSnapshot() { 408 const heapSnapshotTaker = this[kHandle] && this[kHandle].takeHeapSnapshot(); 409 return new Promise((resolve, reject) => { 410 if (!heapSnapshotTaker) return reject(new ERR_WORKER_NOT_RUNNING()); 411 heapSnapshotTaker.ondone = (handle) => { 412 const { HeapSnapshotStream } = require('internal/heap_utils'); 413 resolve(new HeapSnapshotStream(handle)); 414 }; 415 }); 416 } 417} 418 419function pipeWithoutWarning(source, dest) { 420 const sourceMaxListeners = source._maxListeners; 421 const destMaxListeners = dest._maxListeners; 422 source.setMaxListeners(Infinity); 423 dest.setMaxListeners(Infinity); 424 425 source.pipe(dest); 426 427 source._maxListeners = sourceMaxListeners; 428 dest._maxListeners = destMaxListeners; 429} 430 431const resourceLimitsArray = new Float64Array(kTotalResourceLimitCount); 432function parseResourceLimits(obj) { 433 const ret = resourceLimitsArray; 434 TypedArrayPrototypeFill(ret, -1); 435 if (typeof obj !== 'object' || obj === null) return ret; 436 437 if (typeof obj.maxOldGenerationSizeMb === 'number') 438 ret[kMaxOldGenerationSizeMb] = MathMax(obj.maxOldGenerationSizeMb, 2); 439 if (typeof obj.maxYoungGenerationSizeMb === 'number') 440 ret[kMaxYoungGenerationSizeMb] = obj.maxYoungGenerationSizeMb; 441 if (typeof obj.codeRangeSizeMb === 'number') 442 ret[kCodeRangeSizeMb] = obj.codeRangeSizeMb; 443 if (typeof obj.stackSizeMb === 'number') 444 ret[kStackSizeMb] = obj.stackSizeMb; 445 return ret; 446} 447 448function makeResourceLimits(float64arr) { 449 return { 450 maxYoungGenerationSizeMb: float64arr[kMaxYoungGenerationSizeMb], 451 maxOldGenerationSizeMb: float64arr[kMaxOldGenerationSizeMb], 452 codeRangeSizeMb: float64arr[kCodeRangeSizeMb], 453 stackSizeMb: float64arr[kStackSizeMb] 454 }; 455} 456 457function eventLoopUtilization(util1, util2) { 458 // TODO(trevnorris): Works to solve the thread-safe read/write issue of 459 // loopTime, but has the drawback that it can't be set until the event loop 460 // has had a chance to turn. So it will be impossible to read the ELU of 461 // a worker thread immediately after it's been created. 462 if (!this[kIsOnline] || !this[kHandle]) { 463 return { idle: 0, active: 0, utilization: 0 }; 464 } 465 466 // Cache loopStart, since it's only written to once. 467 if (this[kLoopStartTime] === -1) { 468 this[kLoopStartTime] = this[kHandle].loopStartTime(); 469 if (this[kLoopStartTime] === -1) 470 return { idle: 0, active: 0, utilization: 0 }; 471 } 472 473 if (util2) { 474 const idle = util1.idle - util2.idle; 475 const active = util1.active - util2.active; 476 return { idle, active, utilization: active / (idle + active) }; 477 } 478 479 const idle = this[kHandle].loopIdleTime(); 480 481 // Using performance.now() here is fine since it's always the time from 482 // the beginning of the process, and is why it needs to be offset by the 483 // loopStart time (which is also calculated from the beginning of the 484 // process). 485 const active = now() - this[kLoopStartTime] - idle; 486 487 if (!util1) { 488 return { idle, active, utilization: active / (idle + active) }; 489 } 490 491 const idle_delta = idle - util1.idle; 492 const active_delta = active - util1.active; 493 const utilization = active_delta / (idle_delta + active_delta); 494 return { idle: idle_delta, active: active_delta, utilization }; 495} 496 497// Duplicate code from performance.now() so don't need to require perf_hooks. 498function now() { 499 const hr = process.hrtime(); 500 return (hr[0] * 1000 + hr[1] / 1e6) - timeOrigin; 501} 502 503module.exports = { 504 ownsProcessState, 505 isMainThread, 506 SHARE_ENV, 507 resourceLimits: 508 !isMainThread ? makeResourceLimits(resourceLimitsRaw) : {}, 509 setEnvironmentData, 510 getEnvironmentData, 511 assignEnvironmentData, 512 threadId, 513 Worker, 514}; 515