• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  ArrayPrototypeForEach,
5  ArrayPrototypeMap,
6  ArrayPrototypePush,
7  Float64Array,
8  FunctionPrototypeBind,
9  JSONStringify,
10  MathMax,
11  ObjectCreate,
12  ObjectEntries,
13  Promise,
14  PromiseResolve,
15  ReflectApply,
16  RegExpPrototypeExec,
17  SafeArrayIterator,
18  SafeMap,
19  String,
20  StringPrototypeTrim,
21  Symbol,
22  SymbolFor,
23  TypedArrayPrototypeFill,
24  Uint32Array,
25  globalThis: { Atomics, SharedArrayBuffer },
26} = primordials;
27
28const EventEmitter = require('events');
29const assert = require('internal/assert');
30const path = require('path');
31const { now } = require('internal/perf/utils');
32
33const errorCodes = require('internal/errors').codes;
34const {
35  ERR_WORKER_NOT_RUNNING,
36  ERR_WORKER_PATH,
37  ERR_WORKER_UNSERIALIZABLE_ERROR,
38  ERR_WORKER_INVALID_EXEC_ARGV,
39  ERR_INVALID_ARG_TYPE,
40  ERR_INVALID_ARG_VALUE,
41} = errorCodes;
42const { getOptionValue } = require('internal/options');
43
44const workerIo = require('internal/worker/io');
45const {
46  drainMessagePort,
47  receiveMessageOnPort,
48  MessageChannel,
49  messageTypes,
50  kPort,
51  kIncrementsPortRef,
52  kWaitingStreams,
53  kStdioWantsMoreDataCallback,
54  setupPortReferencing,
55  ReadableWorkerStdio,
56  WritableWorkerStdio,
57} = workerIo;
58const { deserializeError } = require('internal/error_serdes');
59const { fileURLToPath, isURL, pathToFileURL } = require('internal/url');
60const { kEmptyObject } = require('internal/util');
61const { validateArray, validateString } = require('internal/validators');
62
63const {
64  ownsProcessState,
65  isMainThread,
66  resourceLimits: resourceLimitsRaw,
67  threadId,
68  Worker: WorkerImpl,
69  kMaxYoungGenerationSizeMb,
70  kMaxOldGenerationSizeMb,
71  kCodeRangeSizeMb,
72  kStackSizeMb,
73  kTotalResourceLimitCount,
74} = internalBinding('worker');
75
76const kHandle = Symbol('kHandle');
77const kPublicPort = Symbol('kPublicPort');
78const kDispose = Symbol('kDispose');
79const kOnExit = Symbol('kOnExit');
80const kOnMessage = Symbol('kOnMessage');
81const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr');
82const kOnErrorMessage = Symbol('kOnErrorMessage');
83const kParentSideStdio = Symbol('kParentSideStdio');
84const kLoopStartTime = Symbol('kLoopStartTime');
85const kIsInternal = Symbol('kIsInternal');
86const kIsOnline = Symbol('kIsOnline');
87
88const SHARE_ENV = SymbolFor('nodejs.worker_threads.SHARE_ENV');
89let debug = require('internal/util/debuglog').debuglog('worker', (fn) => {
90  debug = fn;
91});
92
93let cwdCounter;
94
95const environmentData = new SafeMap();
96
97// SharedArrayBuffers can be disabled with --no-harmony-sharedarraybuffer.
98// Atomics can be disabled with --no-harmony-atomics.
99if (isMainThread && SharedArrayBuffer !== undefined && Atomics !== undefined) {
100  cwdCounter = new Uint32Array(new SharedArrayBuffer(4));
101  const originalChdir = process.chdir;
102  process.chdir = function(path) {
103    Atomics.add(cwdCounter, 0, 1);
104    originalChdir(path);
105  };
106}
107
108function setEnvironmentData(key, value) {
109  if (value === undefined)
110    environmentData.delete(key);
111  else
112    environmentData.set(key, value);
113}
114
115function getEnvironmentData(key) {
116  return environmentData.get(key);
117}
118
119function assignEnvironmentData(data) {
120  if (data === undefined) return;
121  data.forEach((value, key) => {
122    environmentData.set(key, value);
123  });
124}
125
126class Worker extends EventEmitter {
127  constructor(filename, options = kEmptyObject) {
128    super();
129    const isInternal = arguments[2] === kIsInternal;
130    debug(
131      `[${threadId}] create new worker`,
132      filename,
133      options,
134      `isInternal: ${isInternal}`,
135    );
136    if (options.execArgv)
137      validateArray(options.execArgv, 'options.execArgv');
138
139    let argv;
140    if (options.argv) {
141      validateArray(options.argv, 'options.argv');
142      argv = ArrayPrototypeMap(options.argv, String);
143    }
144
145    let url, doEval;
146    if (isInternal) {
147      doEval = 'internal';
148      url = `node:${filename}`;
149    } else if (options.eval) {
150      if (typeof filename !== 'string') {
151        throw new ERR_INVALID_ARG_VALUE(
152          'options.eval',
153          options.eval,
154          'must be false when \'filename\' is not a string',
155        );
156      }
157      url = null;
158      doEval = 'classic';
159    } else if (isURL(filename) && filename.protocol === 'data:') {
160      url = null;
161      doEval = 'module';
162      filename = `import ${JSONStringify(`${filename}`)}`;
163    } else {
164      doEval = false;
165      if (isURL(filename)) {
166        url = filename;
167        filename = fileURLToPath(filename);
168      } else if (typeof filename !== 'string') {
169        throw new ERR_INVALID_ARG_TYPE(
170          'filename',
171          ['string', 'URL'],
172          filename,
173        );
174      } else if (path.isAbsolute(filename) ||
175                 RegExpPrototypeExec(/^\.\.?[\\/]/, filename) !== null) {
176        filename = path.resolve(filename);
177        url = pathToFileURL(filename);
178      } else {
179        throw new ERR_WORKER_PATH(filename);
180      }
181    }
182
183    let env;
184    if (typeof options.env === 'object' && options.env !== null) {
185      env = ObjectCreate(null);
186      ArrayPrototypeForEach(
187        ObjectEntries(options.env),
188        ({ 0: key, 1: value }) => { env[key] = `${value}`; },
189      );
190    } else if (options.env == null) {
191      env = process.env;
192    } else if (options.env !== SHARE_ENV) {
193      throw new ERR_INVALID_ARG_TYPE(
194        'options.env',
195        ['object', 'undefined', 'null', 'worker_threads.SHARE_ENV'],
196        options.env);
197    }
198
199    let name = '';
200    if (options.name) {
201      validateString(options.name, 'options.name');
202      name = StringPrototypeTrim(options.name);
203    }
204
205    debug('instantiating Worker.', `url: ${url}`, `doEval: ${doEval}`);
206    // Set up the C++ handle for the worker, as well as some internal wiring.
207    this[kHandle] = new WorkerImpl(url,
208                                   env === process.env ? null : env,
209                                   options.execArgv,
210                                   parseResourceLimits(options.resourceLimits),
211                                   !!(options.trackUnmanagedFds ?? true),
212                                   isInternal,
213                                   name);
214    if (this[kHandle].invalidExecArgv) {
215      throw new ERR_WORKER_INVALID_EXEC_ARGV(this[kHandle].invalidExecArgv);
216    }
217    if (this[kHandle].invalidNodeOptions) {
218      throw new ERR_WORKER_INVALID_EXEC_ARGV(
219        this[kHandle].invalidNodeOptions, 'invalid NODE_OPTIONS env variable');
220    }
221    this[kHandle].onexit = (code, customErr, customErrReason) => {
222      this[kOnExit](code, customErr, customErrReason);
223    };
224    this[kPort] = this[kHandle].messagePort;
225    this[kPort].on('message', (data) => this[kOnMessage](data));
226    this[kPort].start();
227    this[kPort].unref();
228    this[kPort][kWaitingStreams] = 0;
229    debug(`[${threadId}] created Worker with ID ${this.threadId}`);
230
231    let stdin = null;
232    if (options.stdin)
233      stdin = new WritableWorkerStdio(this[kPort], 'stdin');
234    const stdout = new ReadableWorkerStdio(this[kPort], 'stdout');
235    if (!options.stdout) {
236      stdout[kIncrementsPortRef] = false;
237      pipeWithoutWarning(stdout, process.stdout);
238    }
239    const stderr = new ReadableWorkerStdio(this[kPort], 'stderr');
240    if (!options.stderr) {
241      stderr[kIncrementsPortRef] = false;
242      pipeWithoutWarning(stderr, process.stderr);
243    }
244
245    this[kParentSideStdio] = { stdin, stdout, stderr };
246
247    const { port1, port2 } = new MessageChannel();
248    const transferList = [port2];
249    // If transferList is provided.
250    if (options.transferList)
251      ArrayPrototypePush(transferList,
252                         ...new SafeArrayIterator(options.transferList));
253
254    this[kPublicPort] = port1;
255    ArrayPrototypeForEach(['message', 'messageerror'], (event) => {
256      this[kPublicPort].on(event, (message) => this.emit(event, message));
257    });
258    setupPortReferencing(this[kPublicPort], this, 'message');
259    this[kPort].postMessage({
260      argv,
261      type: messageTypes.LOAD_SCRIPT,
262      filename,
263      doEval,
264      isInternal,
265      cwdCounter: cwdCounter || workerIo.sharedCwdCounter,
266      workerData: options.workerData,
267      environmentData,
268      publicPort: port2,
269      manifestURL: getOptionValue('--experimental-policy') ?
270        require('internal/process/policy').url :
271        null,
272      manifestSrc: getOptionValue('--experimental-policy') ?
273        require('internal/process/policy').src :
274        null,
275      hasStdin: !!options.stdin,
276    }, transferList);
277    // Use this to cache the Worker's loopStart value once available.
278    this[kLoopStartTime] = -1;
279    this[kIsOnline] = false;
280    this.performance = {
281      eventLoopUtilization: FunctionPrototypeBind(eventLoopUtilization, this),
282    };
283    // Actually start the new thread now that everything is in place.
284    this[kHandle].startThread();
285
286    process.nextTick(() => process.emit('worker', this));
287  }
288
289  [kOnExit](code, customErr, customErrReason) {
290    debug(`[${threadId}] hears end event for Worker ${this.threadId}`);
291    drainMessagePort(this[kPublicPort]);
292    drainMessagePort(this[kPort]);
293    this.removeAllListeners('message');
294    this.removeAllListeners('messageerrors');
295    this[kPublicPort].unref();
296    this[kPort].unref();
297    this[kDispose]();
298    if (customErr) {
299      debug(`[${threadId}] failing with custom error ${customErr} \
300        and with reason ${customErrReason}`);
301      this.emit('error', new errorCodes[customErr](customErrReason));
302    }
303    this.emit('exit', code);
304    this.removeAllListeners();
305  }
306
307  [kOnCouldNotSerializeErr]() {
308    this.emit('error', new ERR_WORKER_UNSERIALIZABLE_ERROR());
309  }
310
311  [kOnErrorMessage](serialized) {
312    // This is what is called for uncaught exceptions.
313    const error = deserializeError(serialized);
314    this.emit('error', error);
315  }
316
317  [kOnMessage](message) {
318    switch (message.type) {
319      case messageTypes.UP_AND_RUNNING:
320        this[kIsOnline] = true;
321        return this.emit('online');
322      case messageTypes.COULD_NOT_SERIALIZE_ERROR:
323        return this[kOnCouldNotSerializeErr]();
324      case messageTypes.ERROR_MESSAGE:
325        return this[kOnErrorMessage](message.error);
326      case messageTypes.STDIO_PAYLOAD:
327      {
328        const { stream, chunks } = message;
329        const readable = this[kParentSideStdio][stream];
330        ArrayPrototypeForEach(chunks, ({ chunk, encoding }) => {
331          readable.push(chunk, encoding);
332        });
333        return;
334      }
335      case messageTypes.STDIO_WANTS_MORE_DATA:
336      {
337        const { stream } = message;
338        return this[kParentSideStdio][stream][kStdioWantsMoreDataCallback]();
339      }
340    }
341
342    assert.fail(`Unknown worker message type ${message.type}`);
343  }
344
345  [kDispose]() {
346    this[kHandle].onexit = null;
347    this[kHandle] = null;
348    this[kPort] = null;
349    this[kPublicPort] = null;
350
351    const { stdout, stderr } = this[kParentSideStdio];
352
353    if (!stdout.readableEnded) {
354      debug(`[${threadId}] explicitly closes stdout for ${this.threadId}`);
355      stdout.push(null);
356    }
357    if (!stderr.readableEnded) {
358      debug(`[${threadId}] explicitly closes stderr for ${this.threadId}`);
359      stderr.push(null);
360    }
361  }
362
363  postMessage(...args) {
364    if (this[kPublicPort] === null) return;
365
366    ReflectApply(this[kPublicPort].postMessage, this[kPublicPort], args);
367  }
368
369  terminate(callback) {
370    debug(`[${threadId}] terminates Worker with ID ${this.threadId}`);
371
372    this.ref();
373
374    if (typeof callback === 'function') {
375      process.emitWarning(
376        'Passing a callback to worker.terminate() is deprecated. ' +
377        'It returns a Promise instead.',
378        'DeprecationWarning', 'DEP0132');
379      if (this[kHandle] === null) return PromiseResolve();
380      this.once('exit', (exitCode) => callback(null, exitCode));
381    }
382
383    if (this[kHandle] === null) return PromiseResolve();
384
385    this[kHandle].stopThread();
386
387    // Do not use events.once() here, because the 'exit' event will always be
388    // emitted regardless of any errors, and the point is to only resolve
389    // once the thread has actually stopped.
390    return new Promise((resolve) => {
391      this.once('exit', resolve);
392    });
393  }
394
395  ref() {
396    if (this[kHandle] === null) return;
397
398    this[kHandle].ref();
399    this[kPublicPort].ref();
400  }
401
402  unref() {
403    if (this[kHandle] === null) return;
404
405    this[kHandle].unref();
406    this[kPublicPort].unref();
407  }
408
409  get threadId() {
410    if (this[kHandle] === null) return -1;
411
412    return this[kHandle].threadId;
413  }
414
415  get stdin() {
416    return this[kParentSideStdio].stdin;
417  }
418
419  get stdout() {
420    return this[kParentSideStdio].stdout;
421  }
422
423  get stderr() {
424    return this[kParentSideStdio].stderr;
425  }
426
427  get resourceLimits() {
428    if (this[kHandle] === null) return {};
429
430    return makeResourceLimits(this[kHandle].getResourceLimits());
431  }
432
433  getHeapSnapshot() {
434    const heapSnapshotTaker = this[kHandle] && this[kHandle].takeHeapSnapshot();
435    return new Promise((resolve, reject) => {
436      if (!heapSnapshotTaker) return reject(new ERR_WORKER_NOT_RUNNING());
437      heapSnapshotTaker.ondone = (handle) => {
438        const { HeapSnapshotStream } = require('internal/heap_utils');
439        resolve(new HeapSnapshotStream(handle));
440      };
441    });
442  }
443}
444
445/**
446 * A worker which has an internal module for entry point (e.g. internal/module/esm/worker).
447 * Internal workers bypass the permission model.
448 */
449class InternalWorker extends Worker {
450  constructor(filename, options) {
451    super(filename, options, kIsInternal);
452  }
453
454  receiveMessageSync() {
455    return receiveMessageOnPort(this[kPublicPort]);
456  }
457}
458
459function pipeWithoutWarning(source, dest) {
460  const sourceMaxListeners = source._maxListeners;
461  const destMaxListeners = dest._maxListeners;
462  source.setMaxListeners(Infinity);
463  dest.setMaxListeners(Infinity);
464
465  source.pipe(dest);
466
467  source._maxListeners = sourceMaxListeners;
468  dest._maxListeners = destMaxListeners;
469}
470
471const resourceLimitsArray = new Float64Array(kTotalResourceLimitCount);
472function parseResourceLimits(obj) {
473  const ret = resourceLimitsArray;
474  TypedArrayPrototypeFill(ret, -1);
475  if (typeof obj !== 'object' || obj === null) return ret;
476
477  if (typeof obj.maxOldGenerationSizeMb === 'number')
478    ret[kMaxOldGenerationSizeMb] = MathMax(obj.maxOldGenerationSizeMb, 2);
479  if (typeof obj.maxYoungGenerationSizeMb === 'number')
480    ret[kMaxYoungGenerationSizeMb] = obj.maxYoungGenerationSizeMb;
481  if (typeof obj.codeRangeSizeMb === 'number')
482    ret[kCodeRangeSizeMb] = obj.codeRangeSizeMb;
483  if (typeof obj.stackSizeMb === 'number')
484    ret[kStackSizeMb] = obj.stackSizeMb;
485  return ret;
486}
487
488function makeResourceLimits(float64arr) {
489  return {
490    maxYoungGenerationSizeMb: float64arr[kMaxYoungGenerationSizeMb],
491    maxOldGenerationSizeMb: float64arr[kMaxOldGenerationSizeMb],
492    codeRangeSizeMb: float64arr[kCodeRangeSizeMb],
493    stackSizeMb: float64arr[kStackSizeMb],
494  };
495}
496
497function eventLoopUtilization(util1, util2) {
498  // TODO(trevnorris): Works to solve the thread-safe read/write issue of
499  // loopTime, but has the drawback that it can't be set until the event loop
500  // has had a chance to turn. So it will be impossible to read the ELU of
501  // a worker thread immediately after it's been created.
502  if (!this[kIsOnline] || !this[kHandle]) {
503    return { idle: 0, active: 0, utilization: 0 };
504  }
505
506  // Cache loopStart, since it's only written to once.
507  if (this[kLoopStartTime] === -1) {
508    this[kLoopStartTime] = this[kHandle].loopStartTime();
509    if (this[kLoopStartTime] === -1)
510      return { idle: 0, active: 0, utilization: 0 };
511  }
512
513  if (util2) {
514    const idle = util1.idle - util2.idle;
515    const active = util1.active - util2.active;
516    return { idle, active, utilization: active / (idle + active) };
517  }
518
519  const idle = this[kHandle].loopIdleTime();
520
521  // Using performance.now() here is fine since it's always the time from
522  // the beginning of the process, and is why it needs to be offset by the
523  // loopStart time (which is also calculated from the beginning of the
524  // process).
525  const active = now() - this[kLoopStartTime] - idle;
526
527  if (!util1) {
528    return { idle, active, utilization: active / (idle + active) };
529  }
530
531  const idle_delta = idle - util1.idle;
532  const active_delta = active - util1.active;
533  const utilization = active_delta / (idle_delta + active_delta);
534  return { idle: idle_delta, active: active_delta, utilization };
535}
536
537module.exports = {
538  ownsProcessState,
539  kIsOnline,
540  isMainThread,
541  SHARE_ENV,
542  resourceLimits:
543    !isMainThread ? makeResourceLimits(resourceLimitsRaw) : {},
544  setEnvironmentData,
545  getEnvironmentData,
546  assignEnvironmentData,
547  threadId,
548  InternalWorker,
549  Worker,
550};
551