• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  ArrayPrototypeForEach,
5  ArrayPrototypeMap,
6  ArrayPrototypePush,
7  Float64Array,
8  FunctionPrototypeBind,
9  JSONStringify,
10  MathMax,
11  ObjectCreate,
12  ObjectEntries,
13  Promise,
14  PromiseResolve,
15  ReflectApply,
16  RegExpPrototypeExec,
17  SafeArrayIterator,
18  SafeMap,
19  String,
20  StringPrototypeTrim,
21  Symbol,
22  SymbolFor,
23  TypedArrayPrototypeFill,
24  Uint32Array,
25  globalThis: { Atomics, SharedArrayBuffer },
26} = primordials;
27
28const EventEmitter = require('events');
29const assert = require('internal/assert');
30const path = require('path');
31const { now } = require('internal/perf/utils');
32
33const errorCodes = require('internal/errors').codes;
34const {
35  ERR_WORKER_NOT_RUNNING,
36  ERR_WORKER_PATH,
37  ERR_WORKER_UNSERIALIZABLE_ERROR,
38  ERR_WORKER_INVALID_EXEC_ARGV,
39  ERR_INVALID_ARG_TYPE,
40  ERR_INVALID_ARG_VALUE,
41} = errorCodes;
42const { getOptionValue } = require('internal/options');
43
44const workerIo = require('internal/worker/io');
45const {
46  drainMessagePort,
47  MessageChannel,
48  messageTypes,
49  kPort,
50  kIncrementsPortRef,
51  kWaitingStreams,
52  kStdioWantsMoreDataCallback,
53  setupPortReferencing,
54  ReadableWorkerStdio,
55  WritableWorkerStdio,
56} = workerIo;
57const { deserializeError } = require('internal/error_serdes');
58const { fileURLToPath, isURL, pathToFileURL } = require('internal/url');
59const { kEmptyObject } = require('internal/util');
60const { validateArray, validateString } = require('internal/validators');
61
62const {
63  ownsProcessState,
64  isMainThread,
65  resourceLimits: resourceLimitsRaw,
66  threadId,
67  Worker: WorkerImpl,
68  kMaxYoungGenerationSizeMb,
69  kMaxOldGenerationSizeMb,
70  kCodeRangeSizeMb,
71  kStackSizeMb,
72  kTotalResourceLimitCount,
73} = internalBinding('worker');
74
// Symbol keys under which each Worker instance keeps its internal state.
const kHandle = Symbol('kHandle');
const kPublicPort = Symbol('kPublicPort');
const kParentSideStdio = Symbol('kParentSideStdio');
const kLoopStartTime = Symbol('kLoopStartTime');
const kIsOnline = Symbol('kIsOnline');

// Symbol keys naming the Worker class's internal (non-public) methods.
const kDispose = Symbol('kDispose');
const kOnExit = Symbol('kOnExit');
const kOnMessage = Symbol('kOnMessage');
const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr');
const kOnErrorMessage = Symbol('kOnErrorMessage');

// Sentinel accepted as `options.env` by the Worker constructor. It comes from
// the global symbol registry, so the same value is obtained everywhere.
const SHARE_ENV = SymbolFor('nodejs.worker_threads.SHARE_ENV');

// The callback handed to debuglog() swaps `debug` for the function it
// receives, so later calls go straight to that function.
const debuglogUtil = require('internal/util/debuglog');
let debug = debuglogUtil.debuglog('worker', (resolvedDebug) => {
  debug = resolvedDebug;
});

// Only assigned on the main thread, and only when both SharedArrayBuffer and
// Atomics are available; it stays undefined otherwise.
let cwdCounter;

// Backing store for setEnvironmentData() / getEnvironmentData(); it is also
// included in the LOAD_SCRIPT message posted to every new worker.
const environmentData = new SafeMap();
94
// SharedArrayBuffers can be disabled with --no-harmony-sharedarraybuffer.
// Atomics can be disabled with --no-harmony-atomics.
// When both are available on the main thread, wrap process.chdir() so that
// every call bumps a shared counter before delegating to the original
// implementation. The counter is handed to workers in the LOAD_SCRIPT message.
if (isMainThread && SharedArrayBuffer !== undefined && Atomics !== undefined) {
  const counterStorage = new SharedArrayBuffer(4);
  cwdCounter = new Uint32Array(counterStorage);

  const { chdir: originalChdir } = process;
  process.chdir = function(directory) {
    // Increment first, so the counter changes even if the call below throws.
    Atomics.add(cwdCounter, 0, 1);
    originalChdir(directory);
  };
}
105
/**
 * Stores `value` under `key` in the module-level environment data map.
 * Passing `undefined` as the value removes the entry instead.
 * @param {any} key
 * @param {any} [value]
 * @returns {void}
 */
function setEnvironmentData(key, value) {
  if (value !== undefined) {
    environmentData.set(key, value);
    return;
  }

  environmentData.delete(key);
}
112
/**
 * Looks up `key` in the module-level environment data map.
 * @param {any} key
 * @returns {any} The stored value, or `undefined` when there is no entry.
 */
function getEnvironmentData(key) {
  const stored = environmentData.get(key);
  return stored;
}
116
/**
 * Copies every entry of `data` into the module-level environment data map,
 * overwriting entries that share a key. Does nothing when `data` is
 * `undefined`.
 * @param {{ forEach: Function }} [data] Anything whose `forEach` invokes its
 *   callback as `(value, key)`, such as a Map.
 * @returns {void}
 */
function assignEnvironmentData(data) {
  if (data !== undefined) {
    data.forEach((entryValue, entryKey) => {
      environmentData.set(entryKey, entryValue);
    });
  }
}
123
/**
 * Parent-side object for a worker thread. It wraps the native `WorkerImpl`
 * handle, owns the internal control port (`kPort`) and the user-facing port
 * (`kPublicPort`), exposes the worker's stdio as streams, and re-emits the
 * worker's lifecycle as 'online', 'message', 'messageerror', 'error' and
 * 'exit' events.
 */
class Worker extends EventEmitter {
  /**
   * Validates the arguments, creates the native handle, wires up the ports
   * and stdio streams, posts the LOAD_SCRIPT message and starts the thread.
   * @param {string|URL} filename A URL, or a string that is an absolute path
   *   or starts with `./` or `../`. When `options.eval` is truthy it must be
   *   a string and is passed through as-is with `doEval` set to 'classic'.
   * @param {object} [options] Fields read here: `execArgv`, `argv`, `eval`,
   *   `env`, `name`, `resourceLimits`, `trackUnmanagedFds`, `stdin`,
   *   `stdout`, `stderr`, `transferList`, `workerData`.
   * @throws {ERR_INVALID_ARG_VALUE} If `options.eval` is truthy but
   *   `filename` is not a string.
   * @throws {ERR_INVALID_ARG_TYPE} If `filename` is neither a string nor a
   *   URL, or `options.env` is not an object, nullish or `SHARE_ENV`.
   * @throws {ERR_WORKER_PATH} If a string `filename` is neither absolute nor
   *   explicitly relative.
   * @throws {ERR_WORKER_INVALID_EXEC_ARGV} If the handle reports invalid
   *   `execArgv` or invalid NODE_OPTIONS.
   */
  constructor(filename, options = kEmptyObject) {
    super();
    debug(`[${threadId}] create new worker`, filename, options);
    if (options.execArgv)
      validateArray(options.execArgv, 'options.execArgv');

    let argv;
    if (options.argv) {
      validateArray(options.argv, 'options.argv');
      argv = ArrayPrototypeMap(options.argv, String);
    }

    // Work out what gets loaded: `url` goes to the native handle, while
    // `filename` and `doEval` travel to the worker in the LOAD_SCRIPT message.
    let url, doEval;
    if (options.eval) {
      if (typeof filename !== 'string') {
        throw new ERR_INVALID_ARG_VALUE(
          'options.eval',
          options.eval,
          'must be false when \'filename\' is not a string',
        );
      }
      url = null;
      doEval = 'classic';
    } else if (isURL(filename) && filename.protocol === 'data:') {
      // A data: URL is turned into an `import` statement of that URL.
      url = null;
      doEval = 'module';
      filename = `import ${JSONStringify(`${filename}`)}`;
    } else {
      doEval = false;
      if (isURL(filename)) {
        url = filename;
        filename = fileURLToPath(filename);
      } else if (typeof filename !== 'string') {
        throw new ERR_INVALID_ARG_TYPE(
          'filename',
          ['string', 'URL'],
          filename,
        );
      } else if (path.isAbsolute(filename) ||
                 RegExpPrototypeExec(/^\.\.?[\\/]/, filename) !== null) {
        filename = path.resolve(filename);
        url = pathToFileURL(filename);
      } else {
        throw new ERR_WORKER_PATH(filename);
      }
    }

    // An object `env` is copied into a prototype-less object with every value
    // stringified. A nullish `env` means process.env. SHARE_ENV leaves `env`
    // undefined.
    let env;
    if (typeof options.env === 'object' && options.env !== null) {
      env = ObjectCreate(null);
      ArrayPrototypeForEach(
        ObjectEntries(options.env),
        ({ 0: key, 1: value }) => { env[key] = `${value}`; },
      );
    } else if (options.env == null) {
      env = process.env;
    } else if (options.env !== SHARE_ENV) {
      throw new ERR_INVALID_ARG_TYPE(
        'options.env',
        ['object', 'undefined', 'null', 'worker_threads.SHARE_ENV'],
        options.env);
    }

    let name = '';
    if (options.name) {
      validateString(options.name, 'options.name');
      name = StringPrototypeTrim(options.name);
    }

    // Set up the C++ handle for the worker, as well as some internal wiring.
    this[kHandle] = new WorkerImpl(url,
                                   env === process.env ? null : env,
                                   options.execArgv,
                                   parseResourceLimits(options.resourceLimits),
                                   !!(options.trackUnmanagedFds ?? true),
                                   name);
    if (this[kHandle].invalidExecArgv) {
      throw new ERR_WORKER_INVALID_EXEC_ARGV(this[kHandle].invalidExecArgv);
    }
    if (this[kHandle].invalidNodeOptions) {
      throw new ERR_WORKER_INVALID_EXEC_ARGV(
        this[kHandle].invalidNodeOptions, 'invalid NODE_OPTIONS env variable');
    }
    this[kHandle].onexit = (code, customErr, customErrReason) => {
      this[kOnExit](code, customErr, customErrReason);
    };
    // Internal control port: carries the typed messages that [kOnMessage]
    // dispatches on.
    this[kPort] = this[kHandle].messagePort;
    this[kPort].on('message', (data) => this[kOnMessage](data));
    this[kPort].start();
    this[kPort].unref();
    this[kPort][kWaitingStreams] = 0;
    debug(`[${threadId}] created Worker with ID ${this.threadId}`);

    // stdin is only created on request. stdout and stderr always exist and
    // are piped to the parent's own streams unless the caller asked for them.
    let stdin = null;
    if (options.stdin)
      stdin = new WritableWorkerStdio(this[kPort], 'stdin');
    const stdout = new ReadableWorkerStdio(this[kPort], 'stdout');
    if (!options.stdout) {
      stdout[kIncrementsPortRef] = false;
      pipeWithoutWarning(stdout, process.stdout);
    }
    const stderr = new ReadableWorkerStdio(this[kPort], 'stderr');
    if (!options.stderr) {
      stderr[kIncrementsPortRef] = false;
      pipeWithoutWarning(stderr, process.stderr);
    }

    this[kParentSideStdio] = { stdin, stdout, stderr };

    // User-facing channel: port1 stays here, port2 is transferred to the
    // worker together with anything in options.transferList.
    const { port1, port2 } = new MessageChannel();
    const transferList = [port2];
    // If transferList is provided.
    if (options.transferList)
      ArrayPrototypePush(transferList,
                         ...new SafeArrayIterator(options.transferList));

    this[kPublicPort] = port1;
    // Re-emit the public port's events on the Worker itself.
    ArrayPrototypeForEach(['message', 'messageerror'], (event) => {
      this[kPublicPort].on(event, (message) => this.emit(event, message));
    });
    setupPortReferencing(this[kPublicPort], this, 'message');
    this[kPort].postMessage({
      argv,
      type: messageTypes.LOAD_SCRIPT,
      filename,
      doEval,
      cwdCounter: cwdCounter || workerIo.sharedCwdCounter,
      workerData: options.workerData,
      environmentData,
      publicPort: port2,
      manifestURL: getOptionValue('--experimental-policy') ?
        require('internal/process/policy').url :
        null,
      manifestSrc: getOptionValue('--experimental-policy') ?
        require('internal/process/policy').src :
        null,
      hasStdin: !!options.stdin,
    }, transferList);
    // Use this to cache the Worker's loopStart value once available.
    this[kLoopStartTime] = -1;
    this[kIsOnline] = false;
    this.performance = {
      eventLoopUtilization: FunctionPrototypeBind(eventLoopUtilization, this),
    };
    // Actually start the new thread now that everything is in place.
    this[kHandle].startThread();

    process.nextTick(() => process.emit('worker', this));
  }

  /**
   * Invoked through the handle's `onexit` callback. Drains both ports, drops
   * the message-related listeners, disposes of the internal state, then emits
   * 'error' (only when a custom error code was supplied) followed by 'exit',
   * and finally removes every remaining listener.
   * @param {number} code Exit code passed on to the 'exit' event.
   * @param {string} [customErr] Key into `errorCodes` naming the error class
   *   to instantiate.
   * @param {string} [customErrReason] Argument for that error's constructor.
   */
  [kOnExit](code, customErr, customErrReason) {
    debug(`[${threadId}] hears end event for Worker ${this.threadId}`);
    drainMessagePort(this[kPublicPort]);
    drainMessagePort(this[kPort]);
    this.removeAllListeners('message');
    // The event forwarded from the public port in the constructor is
    // 'messageerror' (singular); removing any other name matches nothing.
    this.removeAllListeners('messageerror');
    this[kPublicPort].unref();
    this[kPort].unref();
    this[kDispose]();
    if (customErr) {
      debug(`[${threadId}] failing with custom error ${customErr} \
        and with reason ${customErrReason}`);
      this.emit('error', new errorCodes[customErr](customErrReason));
    }
    this.emit('exit', code);
    this.removeAllListeners();
  }

  /**
   * Handles COULD_NOT_SERIALIZE_ERROR by emitting 'error' with a fresh
   * ERR_WORKER_UNSERIALIZABLE_ERROR.
   */
  [kOnCouldNotSerializeErr]() {
    this.emit('error', new ERR_WORKER_UNSERIALIZABLE_ERROR());
  }

  /**
   * Handles ERROR_MESSAGE by deserializing the payload and emitting it as an
   * 'error' event.
   * @param {any} serialized Serialized error taken from the message.
   */
  [kOnErrorMessage](serialized) {
    // This is what is called for uncaught exceptions.
    const error = deserializeError(serialized);
    this.emit('error', error);
  }

  /**
   * Dispatches a message received on the internal control port according to
   * its `type`. Every recognised type returns from inside the switch; an
   * unrecognised type reaches the assertion failure at the bottom.
   * @param {{ type: any }} message
   */
  [kOnMessage](message) {
    switch (message.type) {
      case messageTypes.UP_AND_RUNNING:
        this[kIsOnline] = true;
        return this.emit('online');
      case messageTypes.COULD_NOT_SERIALIZE_ERROR:
        return this[kOnCouldNotSerializeErr]();
      case messageTypes.ERROR_MESSAGE:
        return this[kOnErrorMessage](message.error);
      case messageTypes.STDIO_PAYLOAD:
      {
        // Push each received chunk into the parent-side stream named by
        // `stream`.
        const { stream, chunks } = message;
        const readable = this[kParentSideStdio][stream];
        ArrayPrototypeForEach(chunks, ({ chunk, encoding }) => {
          readable.push(chunk, encoding);
        });
        return;
      }
      case messageTypes.STDIO_WANTS_MORE_DATA:
      {
        const { stream } = message;
        return this[kParentSideStdio][stream][kStdioWantsMoreDataCallback]();
      }
    }

    assert.fail(`Unknown worker message type ${message.type}`);
  }

  /**
   * Drops the references to the handle and both ports (the accessors and
   * methods below treat a `null` handle as "worker gone") and ends the stdout
   * and stderr streams if they have not ended yet.
   */
  [kDispose]() {
    this[kHandle].onexit = null;
    this[kHandle] = null;
    this[kPort] = null;
    this[kPublicPort] = null;

    const { stdout, stderr } = this[kParentSideStdio];

    if (!stdout.readableEnded) {
      debug(`[${threadId}] explicitly closes stdout for ${this.threadId}`);
      stdout.push(null);
    }
    if (!stderr.readableEnded) {
      debug(`[${threadId}] explicitly closes stderr for ${this.threadId}`);
      stderr.push(null);
    }
  }

  /**
   * Forwards all arguments to the public port's `postMessage()`. Silently
   * does nothing once the worker has been disposed of.
   * @param {...any} args
   */
  postMessage(...args) {
    if (this[kPublicPort] === null) return;

    ReflectApply(this[kPublicPort].postMessage, this[kPublicPort], args);
  }

  /**
   * Asks the thread to stop.
   * @param {Function} [callback] Deprecated (DEP0132). When given and the
   *   worker is still alive, it is called as `callback(null, exitCode)` on
   *   'exit'. It is not called when the worker is already gone.
   * @returns {Promise<number|undefined>} Resolves with the exit code once
   *   'exit' is emitted, or immediately with `undefined` when the worker is
   *   already gone.
   */
  terminate(callback) {
    debug(`[${threadId}] terminates Worker with ID ${this.threadId}`);

    this.ref();

    if (typeof callback === 'function') {
      process.emitWarning(
        'Passing a callback to worker.terminate() is deprecated. ' +
        'It returns a Promise instead.',
        'DeprecationWarning', 'DEP0132');
      if (this[kHandle] === null) return PromiseResolve();
      this.once('exit', (exitCode) => callback(null, exitCode));
    }

    if (this[kHandle] === null) return PromiseResolve();

    this[kHandle].stopThread();

    // Do not use events.once() here, because the 'exit' event will always be
    // emitted regardless of any errors, and the point is to only resolve
    // once the thread has actually stopped.
    return new Promise((resolve) => {
      this.once('exit', resolve);
    });
  }

  /**
   * Calls `ref()` on the handle and on the public port. No-op once the
   * worker has been disposed of.
   */
  ref() {
    if (this[kHandle] === null) return;

    this[kHandle].ref();
    this[kPublicPort].ref();
  }

  /**
   * Calls `unref()` on the handle and on the public port. No-op once the
   * worker has been disposed of.
   */
  unref() {
    if (this[kHandle] === null) return;

    this[kHandle].unref();
    this[kPublicPort].unref();
  }

  /**
   * @returns {number} The handle's thread id, or -1 once the worker has been
   *   disposed of.
   */
  get threadId() {
    if (this[kHandle] === null) return -1;

    return this[kHandle].threadId;
  }

  /**
   * @returns {object|null} The writable stdin stream, or `null` when
   *   `options.stdin` was not set at construction.
   */
  get stdin() {
    return this[kParentSideStdio].stdin;
  }

  /** @returns {object} The parent-side readable stream for worker stdout. */
  get stdout() {
    return this[kParentSideStdio].stdout;
  }

  /** @returns {object} The parent-side readable stream for worker stderr. */
  get stderr() {
    return this[kParentSideStdio].stderr;
  }

  /**
   * @returns {object} The limits reported by the handle, converted by
   *   makeResourceLimits(), or an empty object once the worker has been
   *   disposed of.
   */
  get resourceLimits() {
    if (this[kHandle] === null) return {};

    return makeResourceLimits(this[kHandle].getResourceLimits());
  }

  /**
   * Requests a heap snapshot from the worker.
   * @returns {Promise<object>} Resolves with a HeapSnapshotStream once the
   *   snapshot taker invokes `ondone`. Rejects with ERR_WORKER_NOT_RUNNING
   *   when there is no handle or the handle returns no snapshot taker.
   */
  getHeapSnapshot() {
    const heapSnapshotTaker = this[kHandle] && this[kHandle].takeHeapSnapshot();
    return new Promise((resolve, reject) => {
      if (!heapSnapshotTaker) return reject(new ERR_WORKER_NOT_RUNNING());
      heapSnapshotTaker.ondone = (handle) => {
        const { HeapSnapshotStream } = require('internal/heap_utils');
        resolve(new HeapSnapshotStream(handle));
      };
    });
  }
}
430
/**
 * Pipes `source` into `dest` with the max-listener limit of both temporarily
 * lifted to Infinity, then writes the previous `_maxListeners` values back.
 * The limits are not restored if `pipe()` throws.
 * @param {object} source Stream-like object providing `pipe()`.
 * @param {object} dest Destination handed to `source.pipe()`.
 * @returns {void}
 */
function pipeWithoutWarning(source, dest) {
  const { _maxListeners: savedSourceLimit } = source;
  const { _maxListeners: savedDestLimit } = dest;

  // Lift both limits for the duration of the pipe() call only.
  source.setMaxListeners(Infinity);
  dest.setMaxListeners(Infinity);
  source.pipe(dest);

  // Assign the raw field directly rather than calling setMaxListeners().
  source._maxListeners = savedSourceLimit;
  dest._maxListeners = savedDestLimit;
}
442
// Single scratch buffer reused by every parseResourceLimits() call.
const resourceLimitsArray = new Float64Array(kTotalResourceLimitCount);

/**
 * Converts a user-supplied resource limits object into the Float64Array
 * layout indexed by the k*SizeMb constants. Each call resets every slot to
 * -1 and then fills in only the fields that are numbers.
 * @param {object} [obj] May carry `maxOldGenerationSizeMb`,
 *   `maxYoungGenerationSizeMb`, `codeRangeSizeMb` and `stackSizeMb`.
 * @returns {Float64Array} The shared scratch buffer, overwritten by the next
 *   call.
 */
function parseResourceLimits(obj) {
  const limits = resourceLimitsArray;
  TypedArrayPrototypeFill(limits, -1);

  if (obj === null || typeof obj !== 'object') {
    return limits;
  }

  if (typeof obj.maxOldGenerationSizeMb === 'number') {
    // Values below 2 are raised to 2.
    limits[kMaxOldGenerationSizeMb] = MathMax(obj.maxOldGenerationSizeMb, 2);
  }
  if (typeof obj.maxYoungGenerationSizeMb === 'number') {
    limits[kMaxYoungGenerationSizeMb] = obj.maxYoungGenerationSizeMb;
  }
  if (typeof obj.codeRangeSizeMb === 'number') {
    limits[kCodeRangeSizeMb] = obj.codeRangeSizeMb;
  }
  if (typeof obj.stackSizeMb === 'number') {
    limits[kStackSizeMb] = obj.stackSizeMb;
  }

  return limits;
}
459
/**
 * Builds a plain resource limits object from the array layout indexed by the
 * k*SizeMb constants.
 * @param {ArrayLike<number>} float64arr
 * @returns {{
 *   maxYoungGenerationSizeMb: number,
 *   maxOldGenerationSizeMb: number,
 *   codeRangeSizeMb: number,
 *   stackSizeMb: number,
 * }}
 */
function makeResourceLimits(float64arr) {
  const {
    [kMaxYoungGenerationSizeMb]: maxYoungGenerationSizeMb,
    [kMaxOldGenerationSizeMb]: maxOldGenerationSizeMb,
    [kCodeRangeSizeMb]: codeRangeSizeMb,
    [kStackSizeMb]: stackSizeMb,
  } = float64arr;

  return {
    maxYoungGenerationSizeMb,
    maxOldGenerationSizeMb,
    codeRangeSizeMb,
    stackSizeMb,
  };
}
468
/**
 * Event loop utilization of a worker, called with the Worker as `this`.
 *
 * - With no arguments: totals measured from the cached loop start time.
 * - With `util1` only: the difference between the current totals and `util1`.
 * - With `util1` and `util2`: `util1 - util2`, without touching the handle.
 *
 * A zeroed result is returned while the worker is not online, has no handle,
 * or the handle still reports -1 as its loop start time.
 * @param {{ idle: number, active: number }} [util1]
 * @param {{ idle: number, active: number }} [util2]
 * @returns {{ idle: number, active: number, utilization: number }}
 */
function eventLoopUtilization(util1, util2) {
  const zeroed = () => ({ idle: 0, active: 0, utilization: 0 });
  const summarize = (idle, active) => ({
    idle,
    active,
    utilization: active / (idle + active),
  });

  // TODO(trevnorris): Works to solve the thread-safe read/write issue of
  // loopTime, but has the drawback that it can't be set until the event loop
  // has had a chance to turn. So it will be impossible to read the ELU of
  // a worker thread immediately after it's been created.
  if (!this[kIsOnline] || !this[kHandle]) {
    return zeroed();
  }

  // loopStart is only written to once, so cache it once a real value shows up.
  if (this[kLoopStartTime] === -1) {
    const loopStart = this[kHandle].loopStartTime();
    this[kLoopStartTime] = loopStart;
    if (loopStart === -1) {
      return zeroed();
    }
  }

  // Two snapshots supplied: plain subtraction.
  if (util2) {
    return summarize(util1.idle - util2.idle, util1.active - util2.active);
  }

  const idleTotal = this[kHandle].loopIdleTime();

  // Using performance.now() here is fine since it's always the time from
  // the beginning of the process, and is why it needs to be offset by the
  // loopStart time (which is also calculated from the beginning of the
  // process).
  const activeTotal = now() - this[kLoopStartTime] - idleTotal;

  if (util1) {
    return summarize(idleTotal - util1.idle, activeTotal - util1.active);
  }
  return summarize(idleTotal, activeTotal);
}
508
509module.exports = {
510  ownsProcessState,
511  isMainThread,
512  SHARE_ENV,
513  resourceLimits:
514    !isMainThread ? makeResourceLimits(resourceLimitsRaw) : {},
515  setEnvironmentData,
516  getEnvironmentData,
517  assignEnvironmentData,
518  threadId,
519  Worker,
520};
521