• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3/* global SharedArrayBuffer */
4
5const {
6  ArrayIsArray,
7  MathMax,
8  ObjectCreate,
9  ObjectEntries,
10  Promise,
11  PromiseResolve,
12  Symbol,
13  SymbolFor,
14  Uint32Array,
15} = primordials;
16
17const EventEmitter = require('events');
18const assert = require('internal/assert');
19const path = require('path');
20
21const errorCodes = require('internal/errors').codes;
22const {
23  ERR_WORKER_NOT_RUNNING,
24  ERR_WORKER_PATH,
25  ERR_WORKER_UNSERIALIZABLE_ERROR,
26  ERR_WORKER_UNSUPPORTED_EXTENSION,
27  ERR_WORKER_INVALID_EXEC_ARGV,
28  ERR_INVALID_ARG_TYPE,
29  ERR_INVALID_ARG_VALUE,
30} = errorCodes;
31const { getOptionValue } = require('internal/options');
32
33const workerIo = require('internal/worker/io');
34const {
35  drainMessagePort,
36  MessageChannel,
37  messageTypes,
38  kPort,
39  kIncrementsPortRef,
40  kWaitingStreams,
41  kStdioWantsMoreDataCallback,
42  setupPortReferencing,
43  ReadableWorkerStdio,
44  WritableWorkerStdio
45} = workerIo;
46const { deserializeError } = require('internal/error_serdes');
47const { fileURLToPath, isURLInstance, pathToFileURL } = require('internal/url');
48
49const {
50  ownsProcessState,
51  isMainThread,
52  resourceLimits: resourceLimitsRaw,
53  threadId,
54  Worker: WorkerImpl,
55  kMaxYoungGenerationSizeMb,
56  kMaxOldGenerationSizeMb,
57  kCodeRangeSizeMb,
58  kStackSizeMb,
59  kTotalResourceLimitCount
60} = internalBinding('worker');
61
62const kHandle = Symbol('kHandle');
63const kPublicPort = Symbol('kPublicPort');
64const kDispose = Symbol('kDispose');
65const kOnExit = Symbol('kOnExit');
66const kOnMessage = Symbol('kOnMessage');
67const kOnCouldNotSerializeErr = Symbol('kOnCouldNotSerializeErr');
68const kOnErrorMessage = Symbol('kOnErrorMessage');
69const kParentSideStdio = Symbol('kParentSideStdio');
70
71const SHARE_ENV = SymbolFor('nodejs.worker_threads.SHARE_ENV');
72let debug = require('internal/util/debuglog').debuglog('worker', (fn) => {
73  debug = fn;
74});
75
76let cwdCounter;
77
78if (isMainThread) {
79  cwdCounter = new Uint32Array(new SharedArrayBuffer(4));
80  const originalChdir = process.chdir;
81  process.chdir = function(path) {
82    Atomics.add(cwdCounter, 0, 1);
83    originalChdir(path);
84  };
85}
86
87class Worker extends EventEmitter {
88  constructor(filename, options = {}) {
89    super();
90    debug(`[${threadId}] create new worker`, filename, options);
91    if (options.execArgv && !ArrayIsArray(options.execArgv)) {
92      throw new ERR_INVALID_ARG_TYPE('options.execArgv',
93                                     'Array',
94                                     options.execArgv);
95    }
96    let argv;
97    if (options.argv) {
98      if (!ArrayIsArray(options.argv)) {
99        throw new ERR_INVALID_ARG_TYPE('options.argv', 'Array', options.argv);
100      }
101      argv = options.argv.map(String);
102    }
103
104    let url;
105    if (options.eval) {
106      if (typeof filename !== 'string') {
107        throw new ERR_INVALID_ARG_VALUE(
108          'options.eval',
109          options.eval,
110          'must be false when \'filename\' is not a string'
111        );
112      }
113      url = null;
114    } else {
115      if (isURLInstance(filename)) {
116        url = filename;
117        filename = fileURLToPath(filename);
118      } else if (typeof filename !== 'string') {
119        throw new ERR_INVALID_ARG_TYPE(
120          'filename',
121          ['string', 'URL'],
122          filename
123        );
124      } else if (path.isAbsolute(filename) || /^\.\.?[\\/]/.test(filename)) {
125        filename = path.resolve(filename);
126        url = pathToFileURL(filename);
127      } else {
128        throw new ERR_WORKER_PATH(filename);
129      }
130
131      const ext = path.extname(filename);
132      if (ext !== '.js' && ext !== '.mjs' && ext !== '.cjs') {
133        throw new ERR_WORKER_UNSUPPORTED_EXTENSION(ext);
134      }
135    }
136
137    let env;
138    if (typeof options.env === 'object' && options.env !== null) {
139      env = ObjectCreate(null);
140      for (const [ key, value ] of ObjectEntries(options.env))
141        env[key] = `${value}`;
142    } else if (options.env == null) {
143      env = process.env;
144    } else if (options.env !== SHARE_ENV) {
145      throw new ERR_INVALID_ARG_TYPE(
146        'options.env',
147        ['object', 'undefined', 'null', 'worker_threads.SHARE_ENV'],
148        options.env);
149    }
150
151    // Set up the C++ handle for the worker, as well as some internal wiring.
152    this[kHandle] = new WorkerImpl(url,
153                                   env === process.env ? null : env,
154                                   options.execArgv,
155                                   parseResourceLimits(options.resourceLimits),
156                                   !!options.trackUnmanagedFds);
157    if (this[kHandle].invalidExecArgv) {
158      throw new ERR_WORKER_INVALID_EXEC_ARGV(this[kHandle].invalidExecArgv);
159    }
160    if (this[kHandle].invalidNodeOptions) {
161      throw new ERR_WORKER_INVALID_EXEC_ARGV(
162        this[kHandle].invalidNodeOptions, 'invalid NODE_OPTIONS env variable');
163    }
164    this[kHandle].onexit = (code, customErr, customErrReason) => {
165      this[kOnExit](code, customErr, customErrReason);
166    };
167    this[kPort] = this[kHandle].messagePort;
168    this[kPort].on('message', (data) => this[kOnMessage](data));
169    this[kPort].start();
170    this[kPort].unref();
171    this[kPort][kWaitingStreams] = 0;
172    debug(`[${threadId}] created Worker with ID ${this.threadId}`);
173
174    let stdin = null;
175    if (options.stdin)
176      stdin = new WritableWorkerStdio(this[kPort], 'stdin');
177    const stdout = new ReadableWorkerStdio(this[kPort], 'stdout');
178    if (!options.stdout) {
179      stdout[kIncrementsPortRef] = false;
180      pipeWithoutWarning(stdout, process.stdout);
181    }
182    const stderr = new ReadableWorkerStdio(this[kPort], 'stderr');
183    if (!options.stderr) {
184      stderr[kIncrementsPortRef] = false;
185      pipeWithoutWarning(stderr, process.stderr);
186    }
187
188    this[kParentSideStdio] = { stdin, stdout, stderr };
189
190    const { port1, port2 } = new MessageChannel();
191    const transferList = [port2];
192    // If transferList is provided.
193    if (options.transferList)
194      transferList.push(...options.transferList);
195
196    this[kPublicPort] = port1;
197    for (const event of ['message', 'messageerror']) {
198      this[kPublicPort].on(event, (message) => this.emit(event, message));
199    }
200    setupPortReferencing(this[kPublicPort], this, 'message');
201    this[kPort].postMessage({
202      argv,
203      type: messageTypes.LOAD_SCRIPT,
204      filename,
205      doEval: !!options.eval,
206      cwdCounter: cwdCounter || workerIo.sharedCwdCounter,
207      workerData: options.workerData,
208      publicPort: port2,
209      manifestURL: getOptionValue('--experimental-policy') ?
210        require('internal/process/policy').url :
211        null,
212      manifestSrc: getOptionValue('--experimental-policy') ?
213        require('internal/process/policy').src :
214        null,
215      hasStdin: !!options.stdin
216    }, transferList);
217    // Actually start the new thread now that everything is in place.
218    this[kHandle].startThread();
219  }
220
221  [kOnExit](code, customErr, customErrReason) {
222    debug(`[${threadId}] hears end event for Worker ${this.threadId}`);
223    drainMessagePort(this[kPublicPort]);
224    drainMessagePort(this[kPort]);
225    this[kDispose]();
226    if (customErr) {
227      debug(`[${threadId}] failing with custom error ${customErr} \
228        and with reason ${customErrReason}`);
229      this.emit('error', new errorCodes[customErr](customErrReason));
230    }
231    this.emit('exit', code);
232    this.removeAllListeners();
233  }
234
235  [kOnCouldNotSerializeErr]() {
236    this.emit('error', new ERR_WORKER_UNSERIALIZABLE_ERROR());
237  }
238
239  [kOnErrorMessage](serialized) {
240    // This is what is called for uncaught exceptions.
241    const error = deserializeError(serialized);
242    this.emit('error', error);
243  }
244
245  [kOnMessage](message) {
246    switch (message.type) {
247      case messageTypes.UP_AND_RUNNING:
248        return this.emit('online');
249      case messageTypes.COULD_NOT_SERIALIZE_ERROR:
250        return this[kOnCouldNotSerializeErr]();
251      case messageTypes.ERROR_MESSAGE:
252        return this[kOnErrorMessage](message.error);
253      case messageTypes.STDIO_PAYLOAD:
254      {
255        const { stream, chunks } = message;
256        const readable = this[kParentSideStdio][stream];
257        for (const { chunk, encoding } of chunks)
258          readable.push(chunk, encoding);
259        return;
260      }
261      case messageTypes.STDIO_WANTS_MORE_DATA:
262      {
263        const { stream } = message;
264        return this[kParentSideStdio][stream][kStdioWantsMoreDataCallback]();
265      }
266    }
267
268    assert.fail(`Unknown worker message type ${message.type}`);
269  }
270
271  [kDispose]() {
272    this[kHandle].onexit = null;
273    this[kHandle] = null;
274    this[kPort] = null;
275    this[kPublicPort] = null;
276
277    const { stdout, stderr } = this[kParentSideStdio];
278
279    if (!stdout.readableEnded) {
280      debug(`[${threadId}] explicitly closes stdout for ${this.threadId}`);
281      stdout.push(null);
282    }
283    if (!stderr.readableEnded) {
284      debug(`[${threadId}] explicitly closes stderr for ${this.threadId}`);
285      stderr.push(null);
286    }
287  }
288
289  postMessage(...args) {
290    if (this[kPublicPort] === null) return;
291
292    this[kPublicPort].postMessage(...args);
293  }
294
295  terminate(callback) {
296    debug(`[${threadId}] terminates Worker with ID ${this.threadId}`);
297
298    this.ref();
299
300    if (typeof callback === 'function') {
301      process.emitWarning(
302        'Passing a callback to worker.terminate() is deprecated. ' +
303        'It returns a Promise instead.',
304        'DeprecationWarning', 'DEP0132');
305      if (this[kHandle] === null) return PromiseResolve();
306      this.once('exit', (exitCode) => callback(null, exitCode));
307    }
308
309    if (this[kHandle] === null) return PromiseResolve();
310
311    this[kHandle].stopThread();
312
313    // Do not use events.once() here, because the 'exit' event will always be
314    // emitted regardless of any errors, and the point is to only resolve
315    // once the thread has actually stopped.
316    return new Promise((resolve) => {
317      this.once('exit', resolve);
318    });
319  }
320
321  ref() {
322    if (this[kHandle] === null) return;
323
324    this[kHandle].ref();
325    this[kPublicPort].ref();
326  }
327
328  unref() {
329    if (this[kHandle] === null) return;
330
331    this[kHandle].unref();
332    this[kPublicPort].unref();
333  }
334
335  get threadId() {
336    if (this[kHandle] === null) return -1;
337
338    return this[kHandle].threadId;
339  }
340
341  get stdin() {
342    return this[kParentSideStdio].stdin;
343  }
344
345  get stdout() {
346    return this[kParentSideStdio].stdout;
347  }
348
349  get stderr() {
350    return this[kParentSideStdio].stderr;
351  }
352
353  get resourceLimits() {
354    if (this[kHandle] === null) return {};
355
356    return makeResourceLimits(this[kHandle].getResourceLimits());
357  }
358
359  getHeapSnapshot() {
360    const heapSnapshotTaker = this[kHandle] && this[kHandle].takeHeapSnapshot();
361    return new Promise((resolve, reject) => {
362      if (!heapSnapshotTaker) return reject(new ERR_WORKER_NOT_RUNNING());
363      heapSnapshotTaker.ondone = (handle) => {
364        const { HeapSnapshotStream } = require('internal/heap_utils');
365        resolve(new HeapSnapshotStream(handle));
366      };
367    });
368  }
369}
370
371function pipeWithoutWarning(source, dest) {
372  const sourceMaxListeners = source._maxListeners;
373  const destMaxListeners = dest._maxListeners;
374  source.setMaxListeners(Infinity);
375  dest.setMaxListeners(Infinity);
376
377  source.pipe(dest);
378
379  source._maxListeners = sourceMaxListeners;
380  dest._maxListeners = destMaxListeners;
381}
382
383const resourceLimitsArray = new Float64Array(kTotalResourceLimitCount);
384function parseResourceLimits(obj) {
385  const ret = resourceLimitsArray;
386  ret.fill(-1);
387  if (typeof obj !== 'object' || obj === null) return ret;
388
389  if (typeof obj.maxOldGenerationSizeMb === 'number')
390    ret[kMaxOldGenerationSizeMb] = MathMax(obj.maxOldGenerationSizeMb, 2);
391  if (typeof obj.maxYoungGenerationSizeMb === 'number')
392    ret[kMaxYoungGenerationSizeMb] = obj.maxYoungGenerationSizeMb;
393  if (typeof obj.codeRangeSizeMb === 'number')
394    ret[kCodeRangeSizeMb] = obj.codeRangeSizeMb;
395  if (typeof obj.stackSizeMb === 'number')
396    ret[kStackSizeMb] = obj.stackSizeMb;
397  return ret;
398}
399
400function makeResourceLimits(float64arr) {
401  return {
402    maxYoungGenerationSizeMb: float64arr[kMaxYoungGenerationSizeMb],
403    maxOldGenerationSizeMb: float64arr[kMaxOldGenerationSizeMb],
404    codeRangeSizeMb: float64arr[kCodeRangeSizeMb],
405    stackSizeMb: float64arr[kStackSizeMb]
406  };
407}
408
409module.exports = {
410  ownsProcessState,
411  isMainThread,
412  SHARE_ENV,
413  resourceLimits:
414    !isMainThread ? makeResourceLimits(resourceLimitsRaw) : {},
415  threadId,
416  Worker,
417};
418