• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  ArrayIsArray,
5  ArrayPrototypeMap,
6  ArrayPrototypePush,
7  Float64Array,
8  FunctionPrototypeBind,
9  JSONStringify,
10  MathMax,
11  ObjectCreate,
12  ObjectEntries,
13  Promise,
14  PromiseResolve,
15  RegExpPrototypeTest,
16  SafeMap,
17  String,
18  Symbol,
19  SymbolFor,
20  TypedArrayPrototypeFill,
21  Uint32Array,
22  globalThis: { Atomics, SharedArrayBuffer },
23} = primordials;
24
25const EventEmitter = require('events');
26const assert = require('internal/assert');
27const path = require('path');
28const { timeOrigin } = internalBinding('performance');
29
30const errorCodes = require('internal/errors').codes;
31const {
32  ERR_WORKER_NOT_RUNNING,
33  ERR_WORKER_PATH,
34  ERR_WORKER_UNSERIALIZABLE_ERROR,
35  ERR_WORKER_UNSUPPORTED_EXTENSION,
36  ERR_WORKER_INVALID_EXEC_ARGV,
37  ERR_INVALID_ARG_TYPE,
38  ERR_INVALID_ARG_VALUE,
39} = errorCodes;
40const { getOptionValue } = require('internal/options');
41
42const workerIo = require('internal/worker/io');
43const {
44  drainMessagePort,
45  MessageChannel,
46  messageTypes,
47  kPort,
48  kIncrementsPortRef,
49  kWaitingStreams,
50  kStdioWantsMoreDataCallback,
51  setupPortReferencing,
52  ReadableWorkerStdio,
53  WritableWorkerStdio
54} = workerIo;
55const { deserializeError } = require('internal/error_serdes');
56const { fileURLToPath, isURLInstance, pathToFileURL } = require('internal/url');
57
58const {
59  ownsProcessState,
60  isMainThread,
61  resourceLimits: resourceLimitsRaw,
62  threadId,
63  Worker: WorkerImpl,
64  kMaxYoungGenerationSizeMb,
65  kMaxOldGenerationSizeMb,
66  kCodeRangeSizeMb,
67  kStackSizeMb,
68  kTotalResourceLimitCount
69} = internalBinding('worker');
70
71const kHandle = Symbol('kHandle');
72const kPublicPort = Symbol('kPublicPort');
73const kDispose = Symbol('kDispose');
74const kOnExit = Symbol('kOnExit');
75const kOnMessage = Symbol('kOnMessage');
76const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr');
77const kOnErrorMessage = Symbol('kOnErrorMessage');
78const kParentSideStdio = Symbol('kParentSideStdio');
79const kLoopStartTime = Symbol('kLoopStartTime');
80const kIsOnline = Symbol('kIsOnline');
81
82const SHARE_ENV = SymbolFor('nodejs.worker_threads.SHARE_ENV');
83let debug = require('internal/util/debuglog').debuglog('worker', (fn) => {
84  debug = fn;
85});
86
87let cwdCounter;
88
89const environmentData = new SafeMap();
90
91if (isMainThread) {
92  cwdCounter = new Uint32Array(new SharedArrayBuffer(4));
93  const originalChdir = process.chdir;
94  process.chdir = function(path) {
95    Atomics.add(cwdCounter, 0, 1);
96    originalChdir(path);
97  };
98}
99
100function setEnvironmentData(key, value) {
101  if (value === undefined)
102    environmentData.delete(key);
103  else
104    environmentData.set(key, value);
105}
106
107function getEnvironmentData(key) {
108  return environmentData.get(key);
109}
110
111function assignEnvironmentData(data) {
112  if (data === undefined) return;
113  data.forEach((value, key) => {
114    environmentData.set(key, value);
115  });
116}
117
118class Worker extends EventEmitter {
119  constructor(filename, options = {}) {
120    super();
121    debug(`[${threadId}] create new worker`, filename, options);
122    if (options.execArgv && !ArrayIsArray(options.execArgv)) {
123      throw new ERR_INVALID_ARG_TYPE('options.execArgv',
124                                     'Array',
125                                     options.execArgv);
126    }
127    let argv;
128    if (options.argv) {
129      if (!ArrayIsArray(options.argv)) {
130        throw new ERR_INVALID_ARG_TYPE('options.argv', 'Array', options.argv);
131      }
132      argv = ArrayPrototypeMap(options.argv, String);
133    }
134
135    let url, doEval;
136    if (options.eval) {
137      if (typeof filename !== 'string') {
138        throw new ERR_INVALID_ARG_VALUE(
139          'options.eval',
140          options.eval,
141          'must be false when \'filename\' is not a string'
142        );
143      }
144      url = null;
145      doEval = 'classic';
146    } else if (isURLInstance(filename) && filename.protocol === 'data:') {
147      url = null;
148      doEval = 'module';
149      filename = `import ${JSONStringify(`${filename}`)}`;
150    } else {
151      doEval = false;
152      if (isURLInstance(filename)) {
153        url = filename;
154        filename = fileURLToPath(filename);
155      } else if (typeof filename !== 'string') {
156        throw new ERR_INVALID_ARG_TYPE(
157          'filename',
158          ['string', 'URL'],
159          filename
160        );
161      } else if (path.isAbsolute(filename) ||
162                 RegExpPrototypeTest(/^\.\.?[\\/]/, filename)) {
163        filename = path.resolve(filename);
164        url = pathToFileURL(filename);
165      } else {
166        throw new ERR_WORKER_PATH(filename);
167      }
168
169      const ext = path.extname(filename);
170      if (ext !== '.js' && ext !== '.mjs' && ext !== '.cjs') {
171        throw new ERR_WORKER_UNSUPPORTED_EXTENSION(ext);
172      }
173    }
174
175    let env;
176    if (typeof options.env === 'object' && options.env !== null) {
177      env = ObjectCreate(null);
178      for (const [ key, value ] of ObjectEntries(options.env))
179        env[key] = `${value}`;
180    } else if (options.env == null) {
181      env = process.env;
182    } else if (options.env !== SHARE_ENV) {
183      throw new ERR_INVALID_ARG_TYPE(
184        'options.env',
185        ['object', 'undefined', 'null', 'worker_threads.SHARE_ENV'],
186        options.env);
187    }
188
189    // Set up the C++ handle for the worker, as well as some internal wiring.
190    this[kHandle] = new WorkerImpl(url,
191                                   env === process.env ? null : env,
192                                   options.execArgv,
193                                   parseResourceLimits(options.resourceLimits),
194                                   !!options.trackUnmanagedFds);
195    if (this[kHandle].invalidExecArgv) {
196      throw new ERR_WORKER_INVALID_EXEC_ARGV(this[kHandle].invalidExecArgv);
197    }
198    if (this[kHandle].invalidNodeOptions) {
199      throw new ERR_WORKER_INVALID_EXEC_ARGV(
200        this[kHandle].invalidNodeOptions, 'invalid NODE_OPTIONS env variable');
201    }
202    this[kHandle].onexit = (code, customErr, customErrReason) => {
203      this[kOnExit](code, customErr, customErrReason);
204    };
205    this[kPort] = this[kHandle].messagePort;
206    this[kPort].on('message', (data) => this[kOnMessage](data));
207    this[kPort].start();
208    this[kPort].unref();
209    this[kPort][kWaitingStreams] = 0;
210    debug(`[${threadId}] created Worker with ID ${this.threadId}`);
211
212    let stdin = null;
213    if (options.stdin)
214      stdin = new WritableWorkerStdio(this[kPort], 'stdin');
215    const stdout = new ReadableWorkerStdio(this[kPort], 'stdout');
216    if (!options.stdout) {
217      stdout[kIncrementsPortRef] = false;
218      pipeWithoutWarning(stdout, process.stdout);
219    }
220    const stderr = new ReadableWorkerStdio(this[kPort], 'stderr');
221    if (!options.stderr) {
222      stderr[kIncrementsPortRef] = false;
223      pipeWithoutWarning(stderr, process.stderr);
224    }
225
226    this[kParentSideStdio] = { stdin, stdout, stderr };
227
228    const { port1, port2 } = new MessageChannel();
229    const transferList = [port2];
230    // If transferList is provided.
231    if (options.transferList)
232      ArrayPrototypePush(transferList, ...options.transferList);
233
234    this[kPublicPort] = port1;
235    for (const event of ['message', 'messageerror']) {
236      this[kPublicPort].on(event, (message) => this.emit(event, message));
237    }
238    setupPortReferencing(this[kPublicPort], this, 'message');
239    this[kPort].postMessage({
240      argv,
241      type: messageTypes.LOAD_SCRIPT,
242      filename,
243      doEval,
244      cwdCounter: cwdCounter || workerIo.sharedCwdCounter,
245      workerData: options.workerData,
246      environmentData,
247      publicPort: port2,
248      manifestURL: getOptionValue('--experimental-policy') ?
249        require('internal/process/policy').url :
250        null,
251      manifestSrc: getOptionValue('--experimental-policy') ?
252        require('internal/process/policy').src :
253        null,
254      hasStdin: !!options.stdin
255    }, transferList);
256    // Use this to cache the Worker's loopStart value once available.
257    this[kLoopStartTime] = -1;
258    this[kIsOnline] = false;
259    this.performance = {
260      eventLoopUtilization: FunctionPrototypeBind(eventLoopUtilization, this),
261    };
262    // Actually start the new thread now that everything is in place.
263    this[kHandle].startThread();
264
265    process.nextTick(() => process.emit('worker', this));
266  }
267
268  [kOnExit](code, customErr, customErrReason) {
269    debug(`[${threadId}] hears end event for Worker ${this.threadId}`);
270    drainMessagePort(this[kPublicPort]);
271    drainMessagePort(this[kPort]);
272    this[kDispose]();
273    if (customErr) {
274      debug(`[${threadId}] failing with custom error ${customErr} \
275        and with reason ${customErrReason}`);
276      this.emit('error', new errorCodes[customErr](customErrReason));
277    }
278    this.emit('exit', code);
279    this.removeAllListeners();
280  }
281
282  [kOnCouldNotSerializeErr]() {
283    this.emit('error', new ERR_WORKER_UNSERIALIZABLE_ERROR());
284  }
285
286  [kOnErrorMessage](serialized) {
287    // This is what is called for uncaught exceptions.
288    const error = deserializeError(serialized);
289    this.emit('error', error);
290  }
291
292  [kOnMessage](message) {
293    switch (message.type) {
294      case messageTypes.UP_AND_RUNNING:
295        this[kIsOnline] = true;
296        return this.emit('online');
297      case messageTypes.COULD_NOT_SERIALIZE_ERROR:
298        return this[kOnCouldNotSerializeErr]();
299      case messageTypes.ERROR_MESSAGE:
300        return this[kOnErrorMessage](message.error);
301      case messageTypes.STDIO_PAYLOAD:
302      {
303        const { stream, chunks } = message;
304        const readable = this[kParentSideStdio][stream];
305        for (const { chunk, encoding } of chunks)
306          readable.push(chunk, encoding);
307        return;
308      }
309      case messageTypes.STDIO_WANTS_MORE_DATA:
310      {
311        const { stream } = message;
312        return this[kParentSideStdio][stream][kStdioWantsMoreDataCallback]();
313      }
314    }
315
316    assert.fail(`Unknown worker message type ${message.type}`);
317  }
318
319  [kDispose]() {
320    this[kHandle].onexit = null;
321    this[kHandle] = null;
322    this[kPort] = null;
323    this[kPublicPort] = null;
324
325    const { stdout, stderr } = this[kParentSideStdio];
326
327    if (!stdout.readableEnded) {
328      debug(`[${threadId}] explicitly closes stdout for ${this.threadId}`);
329      stdout.push(null);
330    }
331    if (!stderr.readableEnded) {
332      debug(`[${threadId}] explicitly closes stderr for ${this.threadId}`);
333      stderr.push(null);
334    }
335  }
336
337  postMessage(...args) {
338    if (this[kPublicPort] === null) return;
339
340    this[kPublicPort].postMessage(...args);
341  }
342
343  terminate(callback) {
344    debug(`[${threadId}] terminates Worker with ID ${this.threadId}`);
345
346    this.ref();
347
348    if (typeof callback === 'function') {
349      process.emitWarning(
350        'Passing a callback to worker.terminate() is deprecated. ' +
351        'It returns a Promise instead.',
352        'DeprecationWarning', 'DEP0132');
353      if (this[kHandle] === null) return PromiseResolve();
354      this.once('exit', (exitCode) => callback(null, exitCode));
355    }
356
357    if (this[kHandle] === null) return PromiseResolve();
358
359    this[kHandle].stopThread();
360
361    // Do not use events.once() here, because the 'exit' event will always be
362    // emitted regardless of any errors, and the point is to only resolve
363    // once the thread has actually stopped.
364    return new Promise((resolve) => {
365      this.once('exit', resolve);
366    });
367  }
368
369  ref() {
370    if (this[kHandle] === null) return;
371
372    this[kHandle].ref();
373    this[kPublicPort].ref();
374  }
375
376  unref() {
377    if (this[kHandle] === null) return;
378
379    this[kHandle].unref();
380    this[kPublicPort].unref();
381  }
382
383  get threadId() {
384    if (this[kHandle] === null) return -1;
385
386    return this[kHandle].threadId;
387  }
388
389  get stdin() {
390    return this[kParentSideStdio].stdin;
391  }
392
393  get stdout() {
394    return this[kParentSideStdio].stdout;
395  }
396
397  get stderr() {
398    return this[kParentSideStdio].stderr;
399  }
400
401  get resourceLimits() {
402    if (this[kHandle] === null) return {};
403
404    return makeResourceLimits(this[kHandle].getResourceLimits());
405  }
406
407  getHeapSnapshot() {
408    const heapSnapshotTaker = this[kHandle] && this[kHandle].takeHeapSnapshot();
409    return new Promise((resolve, reject) => {
410      if (!heapSnapshotTaker) return reject(new ERR_WORKER_NOT_RUNNING());
411      heapSnapshotTaker.ondone = (handle) => {
412        const { HeapSnapshotStream } = require('internal/heap_utils');
413        resolve(new HeapSnapshotStream(handle));
414      };
415    });
416  }
417}
418
419function pipeWithoutWarning(source, dest) {
420  const sourceMaxListeners = source._maxListeners;
421  const destMaxListeners = dest._maxListeners;
422  source.setMaxListeners(Infinity);
423  dest.setMaxListeners(Infinity);
424
425  source.pipe(dest);
426
427  source._maxListeners = sourceMaxListeners;
428  dest._maxListeners = destMaxListeners;
429}
430
431const resourceLimitsArray = new Float64Array(kTotalResourceLimitCount);
432function parseResourceLimits(obj) {
433  const ret = resourceLimitsArray;
434  TypedArrayPrototypeFill(ret, -1);
435  if (typeof obj !== 'object' || obj === null) return ret;
436
437  if (typeof obj.maxOldGenerationSizeMb === 'number')
438    ret[kMaxOldGenerationSizeMb] = MathMax(obj.maxOldGenerationSizeMb, 2);
439  if (typeof obj.maxYoungGenerationSizeMb === 'number')
440    ret[kMaxYoungGenerationSizeMb] = obj.maxYoungGenerationSizeMb;
441  if (typeof obj.codeRangeSizeMb === 'number')
442    ret[kCodeRangeSizeMb] = obj.codeRangeSizeMb;
443  if (typeof obj.stackSizeMb === 'number')
444    ret[kStackSizeMb] = obj.stackSizeMb;
445  return ret;
446}
447
448function makeResourceLimits(float64arr) {
449  return {
450    maxYoungGenerationSizeMb: float64arr[kMaxYoungGenerationSizeMb],
451    maxOldGenerationSizeMb: float64arr[kMaxOldGenerationSizeMb],
452    codeRangeSizeMb: float64arr[kCodeRangeSizeMb],
453    stackSizeMb: float64arr[kStackSizeMb]
454  };
455}
456
457function eventLoopUtilization(util1, util2) {
458  // TODO(trevnorris): Works to solve the thread-safe read/write issue of
459  // loopTime, but has the drawback that it can't be set until the event loop
460  // has had a chance to turn. So it will be impossible to read the ELU of
461  // a worker thread immediately after it's been created.
462  if (!this[kIsOnline] || !this[kHandle]) {
463    return { idle: 0, active: 0, utilization: 0 };
464  }
465
466  // Cache loopStart, since it's only written to once.
467  if (this[kLoopStartTime] === -1) {
468    this[kLoopStartTime] = this[kHandle].loopStartTime();
469    if (this[kLoopStartTime] === -1)
470      return { idle: 0, active: 0, utilization: 0 };
471  }
472
473  if (util2) {
474    const idle = util1.idle - util2.idle;
475    const active = util1.active - util2.active;
476    return { idle, active, utilization: active / (idle + active) };
477  }
478
479  const idle = this[kHandle].loopIdleTime();
480
481  // Using performance.now() here is fine since it's always the time from
482  // the beginning of the process, and is why it needs to be offset by the
483  // loopStart time (which is also calculated from the beginning of the
484  // process).
485  const active = now() - this[kLoopStartTime] - idle;
486
487  if (!util1) {
488    return { idle, active, utilization: active / (idle + active) };
489  }
490
491  const idle_delta = idle - util1.idle;
492  const active_delta = active - util1.active;
493  const utilization = active_delta / (idle_delta + active_delta);
494  return { idle: idle_delta, active: active_delta, utilization };
495}
496
497// Duplicate code from performance.now() so don't need to require perf_hooks.
498function now() {
499  const hr = process.hrtime();
500  return (hr[0] * 1000 + hr[1] / 1e6) - timeOrigin;
501}
502
503module.exports = {
504  ownsProcessState,
505  isMainThread,
506  SHARE_ENV,
507  resourceLimits:
508    !isMainThread ? makeResourceLimits(resourceLimitsRaw) : {},
509  setEnvironmentData,
510  getEnvironmentData,
511  assignEnvironmentData,
512  threadId,
513  Worker,
514};
515