• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3// In worker threads, execute the script sent through the
4// message port.
5
6const {
7  ArrayPrototypeForEach,
8  ArrayPrototypePushApply,
9  ArrayPrototypeSplice,
10  ObjectDefineProperty,
11  PromisePrototypeThen,
12  RegExpPrototypeExec,
13  SafeWeakMap,
14  globalThis: {
15    Atomics,
16    SharedArrayBuffer,
17  },
18} = primordials;
19
20const {
21  prepareWorkerThreadExecution,
22  setupUserModules,
23  markBootstrapComplete,
24} = require('internal/process/pre_execution');
25
26const {
27  threadId,
28  getEnvMessagePort,
29} = internalBinding('worker');
30
31const workerIo = require('internal/worker/io');
32const {
33  messageTypes: {
34    // Messages that may be received by workers
35    LOAD_SCRIPT,
36    // Messages that may be posted from workers
37    UP_AND_RUNNING,
38    ERROR_MESSAGE,
39    COULD_NOT_SERIALIZE_ERROR,
40    // Messages that may be either received or posted
41    STDIO_PAYLOAD,
42    STDIO_WANTS_MORE_DATA,
43  },
44  kStdioWantsMoreDataCallback,
45} = workerIo;
46
47const {
48  onGlobalUncaughtException,
49} = require('internal/process/execution');
50
51let debug = require('internal/util/debuglog').debuglog('worker', (fn) => {
52  debug = fn;
53});
54
55const assert = require('internal/assert');
56
57prepareWorkerThreadExecution();
58
59debug(`[${threadId}] is setting up worker child environment`);
60
61// Set up the message port and start listening
62const port = getEnvMessagePort();
63
64// If the main thread is spawned with env NODE_CHANNEL_FD, it's probably
65// spawned by our child_process module. In the work threads, mark the
66// related IPC properties as unavailable.
67if (process.env.NODE_CHANNEL_FD) {
68  const workerThreadSetup = require('internal/process/worker_thread_only');
69  ObjectDefineProperty(process, 'channel', {
70    __proto__: null,
71    enumerable: false,
72    get: workerThreadSetup.unavailable('process.channel'),
73  });
74
75  ObjectDefineProperty(process, 'connected', {
76    __proto__: null,
77    enumerable: false,
78    get: workerThreadSetup.unavailable('process.connected'),
79  });
80
81  process.send = workerThreadSetup.unavailable('process.send()');
82  process.disconnect =
83    workerThreadSetup.unavailable('process.disconnect()');
84}
85
86port.on('message', (message) => {
87  if (message.type === LOAD_SCRIPT) {
88    port.unref();
89    const {
90      argv,
91      cwdCounter,
92      doEval,
93      environmentData,
94      filename,
95      hasStdin,
96      manifestSrc,
97      manifestURL,
98      publicPort,
99      workerData,
100    } = message;
101
102    if (doEval !== 'internal') {
103      if (argv !== undefined) {
104        ArrayPrototypePushApply(process.argv, argv);
105      }
106
107      const publicWorker = require('worker_threads');
108      publicWorker.parentPort = publicPort;
109      publicWorker.workerData = workerData;
110    }
111
112    require('internal/worker').assignEnvironmentData(environmentData);
113
114    if (SharedArrayBuffer !== undefined && Atomics !== undefined) {
115      // The counter is only passed to the workers created by the main thread,
116      // not to workers created by other workers.
117      let cachedCwd = '';
118      let lastCounter = -1;
119      const originalCwd = process.cwd;
120
121      process.cwd = function() {
122        const currentCounter = Atomics.load(cwdCounter, 0);
123        if (currentCounter === lastCounter)
124          return cachedCwd;
125        lastCounter = currentCounter;
126        cachedCwd = originalCwd();
127        return cachedCwd;
128      };
129      workerIo.sharedCwdCounter = cwdCounter;
130    }
131
132    if (manifestSrc) {
133      require('internal/process/policy').setup(manifestSrc, manifestURL);
134    }
135    const isLoaderWorker =
136      doEval === 'internal' &&
137      filename === require('internal/modules/esm/utils').loaderWorkerId;
138    setupUserModules(isLoaderWorker);
139
140    if (!hasStdin)
141      process.stdin.push(null);
142
143    debug(`[${threadId}] starts worker script ${filename} ` +
144          `(eval = ${doEval}) at cwd = ${process.cwd()}`);
145    port.postMessage({ type: UP_AND_RUNNING });
146    switch (doEval) {
147      case 'internal': {
148        // Create this WeakMap in js-land because V8 has no C++ API for WeakMap.
149        internalBinding('module_wrap').callbackMap = new SafeWeakMap();
150        require(filename)(workerData, publicPort);
151        break;
152      }
153
154      case 'classic': {
155        const { evalScript } = require('internal/process/execution');
156        const name = '[worker eval]';
157        // This is necessary for CJS module compilation.
158        // TODO: pass this with something really internal.
159        ObjectDefineProperty(process, '_eval', {
160          __proto__: null,
161          configurable: true,
162          enumerable: true,
163          value: filename,
164        });
165        ArrayPrototypeSplice(process.argv, 1, 0, name);
166        evalScript(name, filename);
167        break;
168      }
169
170      case 'module': {
171        const { evalModule } = require('internal/process/execution');
172        PromisePrototypeThen(evalModule(filename), undefined, (e) => {
173          workerOnGlobalUncaughtException(e, true);
174        });
175        break;
176      }
177
178      default: {
179        // script filename
180        // runMain here might be monkey-patched by users in --require.
181        // XXX: the monkey-patchability here should probably be deprecated.
182        ArrayPrototypeSplice(process.argv, 1, 0, filename);
183        const CJSLoader = require('internal/modules/cjs/loader');
184        CJSLoader.Module.runMain(filename);
185        break;
186      }
187    }
188  } else if (message.type === STDIO_PAYLOAD) {
189    const { stream, chunks } = message;
190    ArrayPrototypeForEach(chunks, ({ chunk, encoding }) => {
191      process[stream].push(chunk, encoding);
192    });
193  } else {
194    assert(
195      message.type === STDIO_WANTS_MORE_DATA,
196      `Unknown worker message type ${message.type}`,
197    );
198    const { stream } = message;
199    process[stream][kStdioWantsMoreDataCallback]();
200  }
201});
202
203function workerOnGlobalUncaughtException(error, fromPromise) {
204  debug(`[${threadId}] gets uncaught exception`);
205  let handled = false;
206  let handlerThrew = false;
207  try {
208    handled = onGlobalUncaughtException(error, fromPromise);
209  } catch (e) {
210    error = e;
211    handlerThrew = true;
212  }
213  debug(`[${threadId}] uncaught exception handled = ${handled}`);
214
215  if (handled) {
216    return true;
217  }
218
219  if (!process._exiting) {
220    try {
221      process._exiting = true;
222      process.exitCode = 1;
223      if (!handlerThrew) {
224        process.emit('exit', process.exitCode);
225      }
226    } catch {
227      // Continue regardless of error.
228    }
229  }
230
231  let serialized;
232  try {
233    const { serializeError } = require('internal/error_serdes');
234    serialized = serializeError(error);
235  } catch {
236    // Continue regardless of error.
237  }
238  debug(`[${threadId}] uncaught exception serialized = ${!!serialized}`);
239  if (serialized)
240    port.postMessage({
241      type: ERROR_MESSAGE,
242      error: serialized,
243    });
244  else
245    port.postMessage({ type: COULD_NOT_SERIALIZE_ERROR });
246
247  const { clearAsyncIdStack } = require('internal/async_hooks');
248  clearAsyncIdStack();
249
250  process.exit();
251}
252
253// Patch the global uncaught exception handler so it gets picked up by
254// node::errors::TriggerUncaughtException().
255process._fatalException = workerOnGlobalUncaughtException;
256
257markBootstrapComplete();
258
259// Necessary to reset RegExp statics before user code runs.
260RegExpPrototypeExec(/^/, '');
261
262port.start();
263