• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3// In worker threads, execute the script sent through the
4// message port.
5
6const {
7  ArrayPrototypeForEach,
8  ArrayPrototypePushApply,
9  ArrayPrototypeSplice,
10  ObjectDefineProperty,
11  PromisePrototypeThen,
12  RegExpPrototypeExec,
13  globalThis: {
14    Atomics,
15    SharedArrayBuffer,
16  },
17} = primordials;
18
19const {
20  prepareWorkerThreadExecution,
21  setupUserModules,
22  markBootstrapComplete,
23} = require('internal/process/pre_execution');
24
25const {
26  threadId,
27  getEnvMessagePort,
28} = internalBinding('worker');
29
30const workerIo = require('internal/worker/io');
31const {
32  messageTypes: {
33    // Messages that may be received by workers
34    LOAD_SCRIPT,
35    // Messages that may be posted from workers
36    UP_AND_RUNNING,
37    ERROR_MESSAGE,
38    COULD_NOT_SERIALIZE_ERROR,
39    // Messages that may be either received or posted
40    STDIO_PAYLOAD,
41    STDIO_WANTS_MORE_DATA,
42  },
43  kStdioWantsMoreDataCallback,
44} = workerIo;
45
46const {
47  onGlobalUncaughtException,
48} = require('internal/process/execution');
49
50let debug = require('internal/util/debuglog').debuglog('worker', (fn) => {
51  debug = fn;
52});
53
54const assert = require('internal/assert');
55
// Run the worker-thread pre-execution steps before touching anything else.
prepareWorkerThreadExecution();

debug(`[${threadId}] is setting up worker child environment`);

// Fetch the message port of this environment. The 'message' listener is
// attached to it further down in this script.
const port = getEnvMessagePort();

// A main thread that carries NODE_CHANNEL_FD in its environment was most
// likely spawned through the child_process module. Inside worker threads the
// IPC-related members of `process` are marked as unavailable by replacing
// them with whatever `unavailable()` returns for each name.
if (process.env.NODE_CHANNEL_FD) {
  const workerOnly = require('internal/process/worker_thread_only');

  const channelDescriptor = {
    __proto__: null,
    enumerable: false,
    get: workerOnly.unavailable('process.channel'),
  };
  ObjectDefineProperty(process, 'channel', channelDescriptor);

  const connectedDescriptor = {
    __proto__: null,
    enumerable: false,
    get: workerOnly.unavailable('process.connected'),
  };
  ObjectDefineProperty(process, 'connected', connectedDescriptor);

  process.send = workerOnly.unavailable('process.send()');
  process.disconnect = workerOnly.unavailable('process.disconnect()');
}
84
// Dispatch each message received on the environment port by its `type`.
port.on('message', (message) => {
  const messageType = message.type;

  switch (messageType) {
    case LOAD_SCRIPT:
      startWorkerScript(message);
      break;

    case STDIO_PAYLOAD: {
      // Feed every received chunk into the matching process stdio stream.
      const { stream, chunks } = message;
      ArrayPrototypeForEach(chunks, (entry) => {
        process[stream].push(entry.chunk, entry.encoding);
      });
      break;
    }

    default: {
      // The only remaining message type a worker is expected to receive.
      assert(
        messageType === STDIO_WANTS_MORE_DATA,
        `Unknown worker message type ${messageType}`,
      );
      const { stream } = message;
      process[stream][kStdioWantsMoreDataCallback]();
    }
  }
});

/**
 * Replaces `process.cwd` with a wrapper that only calls the original
 * implementation when the value stored at index 0 of `cwdCounter` differs
 * from the one seen on the previous call, and records the counter on
 * `workerIo.sharedCwdCounter`. Does nothing when `SharedArrayBuffer` or
 * `Atomics` is undefined.
 * @param {any} cwdCounter Counter taken from the LOAD_SCRIPT message; it is
 *   read with `Atomics.load(cwdCounter, 0)`.
 */
function installSharedCwdCache(cwdCounter) {
  if (SharedArrayBuffer === undefined || Atomics === undefined) {
    return;
  }

  // The counter is only passed to the workers created by the main thread,
  // not to workers created by other workers.
  let cwdCache = '';
  let lastSeenCounter = -1;
  const originalCwd = process.cwd;

  process.cwd = function() {
    const counterNow = Atomics.load(cwdCounter, 0);
    if (counterNow !== lastSeenCounter) {
      // Remember the counter first, then refresh the cached directory.
      lastSeenCounter = counterNow;
      cwdCache = originalCwd();
    }
    return cwdCache;
  };
  workerIo.sharedCwdCounter = cwdCounter;
}

/**
 * Handles a LOAD_SCRIPT message: applies the settings carried by the message
 * to `process` and the `worker_threads` module, posts UP_AND_RUNNING on the
 * port, and then runs the worker's code according to `doEval`.
 * @param {object} message The LOAD_SCRIPT message received on the port.
 */
function startWorkerScript(message) {
  port.unref();
  const {
    argv,
    cwdCounter,
    filename,
    doEval,
    workerData,
    environmentData,
    publicPort,
    manifestSrc,
    manifestURL,
    hasStdin,
  } = message;

  if (argv !== undefined) {
    ArrayPrototypePushApply(process.argv, argv);
  }

  // Expose the parent port and the worker data on the public module.
  const workerThreads = require('worker_threads');
  workerThreads.parentPort = publicPort;
  workerThreads.workerData = workerData;

  require('internal/worker').assignEnvironmentData(environmentData);

  installSharedCwdCache(cwdCounter);

  if (manifestSrc) {
    require('internal/process/policy').setup(manifestSrc, manifestURL);
  }
  setupUserModules();

  // Without a stdin from the parent, end the stream right away.
  if (!hasStdin) {
    process.stdin.push(null);
  }

  debug(`[${threadId}] starts worker script ${filename} ` +
        `(eval = ${doEval}) at cwd = ${process.cwd()}`);
  port.postMessage({ type: UP_AND_RUNNING });

  switch (doEval) {
    case 'classic': {
      const { evalScript } = require('internal/process/execution');
      const evalName = '[worker eval]';
      // CJS module compilation needs `process._eval` to be set.
      // TODO: pass this with something really internal.
      ObjectDefineProperty(process, '_eval', {
        __proto__: null,
        configurable: true,
        enumerable: true,
        value: filename,
      });
      ArrayPrototypeSplice(process.argv, 1, 0, evalName);
      evalScript(evalName, filename);
      break;
    }

    case 'module': {
      const { evalModule } = require('internal/process/execution');
      // Route a rejection of the module evaluation to the uncaught
      // exception handler, flagged as coming from a promise.
      PromisePrototypeThen(evalModule(filename), undefined, (reason) => {
        workerOnGlobalUncaughtException(reason, true);
      });
      break;
    }

    default: {
      // `filename` names a script file.
      // Users may have monkey-patched runMain via --require.
      // XXX: the monkey-patchability here should probably be deprecated.
      ArrayPrototypeSplice(process.argv, 1, 0, filename);
      const { Module } = require('internal/modules/cjs/loader');
      Module.runMain(filename);
    }
  }
}
180
/**
 * Uncaught exception handler used inside worker threads. It first delegates
 * to `onGlobalUncaughtException()`; when that reports the error as handled,
 * nothing else happens. Otherwise the 'exit' event is emitted (unless the
 * process is already exiting or the delegate itself threw), the error is
 * serialized and posted on the port, and `process.exit()` is called.
 * @param {any} error The uncaught value.
 * @param {boolean} fromPromise Forwarded unchanged to
 *   `onGlobalUncaughtException()`.
 * @returns {boolean|undefined} `true` when the error was handled; on the
 *   unhandled path the function ends by calling `process.exit()`.
 */
function workerOnGlobalUncaughtException(error, fromPromise) {
  debug(`[${threadId}] gets uncaught exception`);

  // The value that is eventually reported; replaced by the delegate's own
  // exception when the delegate throws.
  let failure = error;
  let wasHandled = false;
  let delegateThrew = false;
  try {
    wasHandled = onGlobalUncaughtException(error, fromPromise);
  } catch (delegateError) {
    failure = delegateError;
    delegateThrew = true;
  }
  debug(`[${threadId}] uncaught exception handled = ${wasHandled}`);

  if (wasHandled) {
    return true;
  }

  if (!process._exiting) {
    try {
      process._exiting = true;
      process.exitCode = 1;
      if (!delegateThrew) {
        process.emit('exit', process.exitCode);
      }
    } catch {
      // Errors raised here are deliberately ignored; keep going.
    }
  }

  let payload;
  try {
    const { serializeError } = require('internal/error_serdes');
    payload = serializeError(failure);
  } catch {
    // Serialization failures are deliberately ignored; keep going.
  }
  debug(`[${threadId}] uncaught exception serialized = ${!!payload}`);

  // Post the serialized error if there is one, otherwise a marker message.
  const outgoing = payload ?
    { type: ERROR_MESSAGE, error: payload } :
    { type: COULD_NOT_SERIALIZE_ERROR };
  port.postMessage(outgoing);

  const { clearAsyncIdStack } = require('internal/async_hooks');
  clearAsyncIdStack();

  process.exit();
}
230
// Install the worker-specific handler as the global uncaught exception
// handler; node::errors::TriggerUncaughtException() picks it up from
// process._fatalException.
process._fatalException = workerOnGlobalUncaughtException;

markBootstrapComplete();

// Run a trivial match so the RegExp statics are reset before any user code
// runs.
RegExpPrototypeExec(/^/, '');

// Start the port now that the 'message' listener is attached.
port.start();
241