• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3// In worker threads, execute the script sent through the
4// message port.
5
6const {
7  ArrayPrototypeConcat,
8  ArrayPrototypeForEach,
9  ArrayPrototypeSplice,
10  ObjectDefineProperty,
11  PromisePrototypeCatch,
12  globalThis: { Atomics },
13} = primordials;
14
15const {
16  patchProcessObject,
17  setupCoverageHooks,
18  setupInspectorHooks,
19  setupWarningHandler,
20  setupDebugEnv,
21  initializeAbortController,
22  initializeDeprecations,
23  initializeWASI,
24  initializeCJSLoader,
25  initializeESMLoader,
26  initializeFrozenIntrinsics,
27  initializeReport,
28  initializeSourceMapsHandlers,
29  loadPreloadModules,
30  setupTraceCategoryState
31} = require('internal/bootstrap/pre_execution');
32
33const {
34  threadId,
35  getEnvMessagePort
36} = internalBinding('worker');
37
38const workerIo = require('internal/worker/io');
39const {
40  messageTypes: {
41    // Messages that may be received by workers
42    LOAD_SCRIPT,
43    // Messages that may be posted from workers
44    UP_AND_RUNNING,
45    ERROR_MESSAGE,
46    COULD_NOT_SERIALIZE_ERROR,
47    // Messages that may be either received or posted
48    STDIO_PAYLOAD,
49    STDIO_WANTS_MORE_DATA,
50  },
51  kStdioWantsMoreDataCallback
52} = workerIo;
53
54const {
55  onGlobalUncaughtException
56} = require('internal/process/execution');
57
58const publicWorker = require('worker_threads');
59let debug = require('internal/util/debuglog').debuglog('worker', (fn) => {
60  debug = fn;
61});
62
63const assert = require('internal/assert');
64
// Pre-execution steps that run as soon as the worker script is loaded,
// i.e. before any LOAD_SCRIPT message has been received.
patchProcessObject();
setupInspectorHooks();
setupDebugEnv();
setupWarningHandler();
initializeSourceMapsHandlers();

// Worker threads cannot switch cwd, so the value of
// process.env.NODE_V8_COVERAGE is used as-is and never overwritten.
const coverageDirectory = process.env.NODE_V8_COVERAGE;
if (coverageDirectory) {
  setupCoverageHooks(coverageDirectory);
}

debug(`[${threadId}] is setting up worker child environment`);

// Obtain the environment's message port. The 'message' listener is attached
// further down and the port is started at the very end of this file.
const port = getEnvMessagePort();
82
// When the main thread was started with NODE_CHANNEL_FD set it was most
// likely spawned through our child_process module. IPC is not available in
// worker threads, so replace the related process properties and methods with
// the stubs produced by workerThreadSetup.unavailable().
if (process.env.NODE_CHANNEL_FD) {
  const workerThreadSetup = require('internal/process/worker_thread_only');

  // Installs a non-enumerable accessor on `process` whose getter is the
  // "unavailable" stub created for `label`.
  const defineUnavailableGetter = (key, label) => {
    ObjectDefineProperty(process, key, {
      enumerable: false,
      get: workerThreadSetup.unavailable(label)
    });
  };

  defineUnavailableGetter('channel', 'process.channel');
  defineUnavailableGetter('connected', 'process.connected');

  process.send = workerThreadSetup.unavailable('process.send()');
  process.disconnect =
    workerThreadSetup.unavailable('process.disconnect()');
}
102
// Dispatches every message received on the environment message port.
//
// Three message types are handled:
//  - LOAD_SCRIPT: finishes initialization using the options carried by the
//    message and then starts the worker's script (or eval source).
//  - STDIO_PAYLOAD: pushes the received chunks into the named process stream.
//  - STDIO_WANTS_MORE_DATA: invokes the kStdioWantsMoreDataCallback of the
//    named process stream.
// Any other type trips the assertion in the final branch.
port.on('message', (message) => {
  if (message.type === LOAD_SCRIPT) {
    // NOTE(review): presumably unref'd so that the port alone does not keep
    // the worker alive once the script has been loaded - confirm.
    port.unref();
    const {
      argv,
      cwdCounter,
      filename,
      doEval,
      workerData,
      environmentData,
      publicPort,
      manifestSrc,
      manifestURL,
      hasStdin
    } = message;

    setupTraceCategoryState();
    initializeReport();
    if (manifestSrc) {
      require('internal/process/policy').setup(manifestSrc, manifestURL);
    }
    initializeAbortController();
    initializeDeprecations();
    initializeWASI();
    initializeCJSLoader();
    initializeESMLoader();

    // Extra arguments supplied for this worker are appended to process.argv.
    if (argv !== undefined) {
      process.argv = ArrayPrototypeConcat(process.argv, argv);
    }
    // Expose the parent port and worker data through the public
    // worker_threads module.
    publicWorker.parentPort = publicPort;
    publicWorker.workerData = workerData;

    require('internal/worker').assignEnvironmentData(environmentData);

    // The counter is only passed to the workers created by the main thread, not
    // to workers created by other workers.
    // NOTE(review): the code below uses cwdCounter unconditionally, which
    // looks at odds with the comment above - confirm whether nested workers
    // also receive the counter.
    //
    // process.cwd() is wrapped so that the result of the original call is
    // cached and only refreshed when the value at index 0 of the shared
    // counter differs from the one seen on the previous call.
    let cachedCwd = '';
    let lastCounter = -1;
    const originalCwd = process.cwd;

    process.cwd = function() {
      const currentCounter = Atomics.load(cwdCounter, 0);
      if (currentCounter === lastCounter)
        return cachedCwd;
      lastCounter = currentCounter;
      cachedCwd = originalCwd();
      return cachedCwd;
    };
    workerIo.sharedCwdCounter = cwdCounter;

    const CJSLoader = require('internal/modules/cjs/loader');
    // No user CommonJS module may have been loaded before preload modules run.
    assert(!CJSLoader.hasLoadedAnyUserCJSModule);
    loadPreloadModules();
    initializeFrozenIntrinsics();

    // Without a stdin stream from the parent, end process.stdin right away.
    if (!hasStdin)
      process.stdin.push(null);

    // Log the eval mode carried by the message (`doEval`); interpolating the
    // global `eval` here would print the built-in function instead.
    debug(`[${threadId}] starts worker script ${filename} ` +
          `(eval = ${doEval}) at cwd = ${process.cwd()}`);
    port.postMessage({ type: UP_AND_RUNNING });
    if (doEval === 'classic') {
      // `filename` holds the source text to evaluate as a classic script.
      const { evalScript } = require('internal/process/execution');
      const name = '[worker eval]';
      // This is necessary for CJS module compilation.
      // TODO: pass this with something really internal.
      ObjectDefineProperty(process, '_eval', {
        configurable: true,
        enumerable: true,
        value: filename,
      });
      ArrayPrototypeSplice(process.argv, 1, 0, name);
      evalScript(name, filename);
    } else if (doEval === 'module') {
      // `filename` holds the source text to evaluate as a module. A rejection
      // is routed to the worker's uncaught exception handler.
      const { evalModule } = require('internal/process/execution');
      PromisePrototypeCatch(evalModule(filename), (e) => {
        workerOnGlobalUncaughtException(e, true);
      });
    } else {
      // script filename
      // runMain here might be monkey-patched by users in --require.
      // XXX: the monkey-patchability here should probably be deprecated.
      ArrayPrototypeSplice(process.argv, 1, 0, filename);
      CJSLoader.Module.runMain(filename);
    }
  } else if (message.type === STDIO_PAYLOAD) {
    // Feed each received chunk into the targeted process stream.
    const { stream, chunks } = message;
    ArrayPrototypeForEach(chunks, ({ chunk, encoding }) => {
      process[stream].push(chunk, encoding);
    });
  } else {
    // The only remaining valid type is STDIO_WANTS_MORE_DATA.
    assert(
      message.type === STDIO_WANTS_MORE_DATA,
      `Unknown worker message type ${message.type}`
    );
    const { stream } = message;
    process[stream][kStdioWantsMoreDataCallback]();
  }
});
203
/**
 * Uncaught exception handler for worker threads.
 *
 * Delegates to onGlobalUncaughtException() first. If that reports the error
 * as handled, returns true. Otherwise the 'exit' event is emitted (unless
 * the process is already exiting or the delegate itself threw), the error is
 * posted on the message port - serialized when possible - the async id stack
 * is cleared, and process.exit() is called.
 * @param {any} error The uncaught value.
 * @param {boolean} fromPromise Forwarded unchanged to
 *   onGlobalUncaughtException().
 * @returns {boolean|undefined} `true` when the error was handled.
 */
function workerOnGlobalUncaughtException(error, fromPromise) {
  debug(`[${threadId}] gets uncaught exception`);

  // If the delegate throws, the thrown value replaces the original error in
  // the report posted on the port.
  let reportedError = error;
  let wasHandled = false;
  let delegateThrew = false;
  try {
    wasHandled = onGlobalUncaughtException(error, fromPromise);
  } catch (delegateError) {
    reportedError = delegateError;
    delegateThrew = true;
  }
  debug(`[${threadId}] uncaught exception handled = ${wasHandled}`);

  if (wasHandled) {
    return true;
  }

  if (!process._exiting) {
    // Best effort: failures while flagging the exit or while running 'exit'
    // listeners are deliberately ignored.
    try {
      process._exiting = true;
      process.exitCode = 1;
      if (!delegateThrew) {
        process.emit('exit', process.exitCode);
      }
    } catch {}
  }

  // Serialization is also best effort; on failure a generic marker message
  // is posted instead of the error.
  let payload;
  try {
    const { serializeError } = require('internal/error_serdes');
    payload = serializeError(reportedError);
  } catch {}
  debug(`[${threadId}] uncaught exception serialized = ${!!payload}`);

  const report = payload ?
    { type: ERROR_MESSAGE, error: payload } :
    { type: COULD_NOT_SERIALIZE_ERROR };
  port.postMessage(report);

  const { clearAsyncIdStack } = require('internal/async_hooks');
  clearAsyncIdStack();

  process.exit();
}
249
// Patch the global uncaught exception handler so it gets picked up by
// node::errors::TriggerUncaughtException().
process._fatalException = workerOnGlobalUncaughtException;

// NOTE(review): markBootstrapComplete is not defined or imported in this
// file; presumably it is provided by the internal module wrapper - confirm.
markBootstrapComplete();

// Start the port last, after the 'message' listener and the exception
// handler above have been installed.
port.start();
257