• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  ArrayPrototypePush,
5  ArrayPrototypeSlice,
6  ArrayPrototypeSome,
7  ObjectKeys,
8  ObjectValues,
9  SafeMap,
10  StringPrototypeStartsWith,
11} = primordials;
12const {
13  codes: {
14    ERR_SOCKET_BAD_PORT,
15  },
16} = require('internal/errors');
17
18const assert = require('internal/assert');
19const { fork } = require('child_process');
20const path = require('path');
21const EventEmitter = require('events');
22const RoundRobinHandle = require('internal/cluster/round_robin_handle');
23const SharedHandle = require('internal/cluster/shared_handle');
24const Worker = require('internal/cluster/worker');
25const { getInspectPort, isUsingInspector } = require('internal/util/inspector');
26const { internal, sendHelper } = require('internal/cluster/utils');
27const cluster = new EventEmitter();
28const intercom = new EventEmitter();
29const SCHED_NONE = 1;
30const SCHED_RR = 2;
31
32module.exports = cluster;
33
34const handles = new SafeMap();
35cluster.isWorker = false;
36cluster.isMaster = true; // Deprecated alias. Must be same as isPrimary.
37cluster.isPrimary = true;
38cluster.Worker = Worker;
39cluster.workers = {};
40cluster.settings = {};
41cluster.SCHED_NONE = SCHED_NONE;  // Leave it to the operating system.
42cluster.SCHED_RR = SCHED_RR;      // Primary distributes connections.
43
44let ids = 0;
45let initialized = false;
46
47// XXX(bnoordhuis) Fold cluster.schedulingPolicy into cluster.settings?
48let schedulingPolicy = process.env.NODE_CLUSTER_SCHED_POLICY;
49if (schedulingPolicy === 'rr')
50  schedulingPolicy = SCHED_RR;
51else if (schedulingPolicy === 'none')
52  schedulingPolicy = SCHED_NONE;
53else if (process.platform === 'win32') {
54  // Round-robin doesn't perform well on
55  // Windows due to the way IOCP is wired up.
56  schedulingPolicy = SCHED_NONE;
57} else
58  schedulingPolicy = SCHED_RR;
59
60cluster.schedulingPolicy = schedulingPolicy;
61
62cluster.setupPrimary = function(options) {
63  const settings = {
64    args: ArrayPrototypeSlice(process.argv, 2),
65    exec: process.argv[1],
66    execArgv: process.execArgv,
67    silent: false,
68    ...cluster.settings,
69    ...options,
70  };
71
72  // Tell V8 to write profile data for each process to a separate file.
73  // Without --logfile=v8-%p.log, everything ends up in a single, unusable
74  // file. (Unusable because what V8 logs are memory addresses and each
75  // process has its own memory mappings.)
76  if (ArrayPrototypeSome(settings.execArgv,
77                         (s) => StringPrototypeStartsWith(s, '--prof')) &&
78      !ArrayPrototypeSome(settings.execArgv,
79                          (s) => StringPrototypeStartsWith(s, '--logfile='))) {
80    settings.execArgv = [...settings.execArgv, '--logfile=v8-%p.log'];
81  }
82
83  cluster.settings = settings;
84
85  if (initialized === true)
86    return process.nextTick(setupSettingsNT, settings);
87
88  initialized = true;
89  schedulingPolicy = cluster.schedulingPolicy;  // Freeze policy.
90  assert(schedulingPolicy === SCHED_NONE || schedulingPolicy === SCHED_RR,
91         `Bad cluster.schedulingPolicy: ${schedulingPolicy}`);
92
93  process.nextTick(setupSettingsNT, settings);
94
95  process.on('internalMessage', (message) => {
96    if (message.cmd !== 'NODE_DEBUG_ENABLED')
97      return;
98
99    for (const worker of ObjectValues(cluster.workers)) {
100      if (worker.state === 'online' || worker.state === 'listening') {
101        process._debugProcess(worker.process.pid);
102      } else {
103        worker.once('online', function() {
104          process._debugProcess(this.process.pid);
105        });
106      }
107    }
108  });
109};
110
111// Deprecated alias must be same as setupPrimary
112cluster.setupMaster = cluster.setupPrimary;
113
114function setupSettingsNT(settings) {
115  cluster.emit('setup', settings);
116}
117
118function createWorkerProcess(id, env) {
119  const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
120  const execArgv = [...cluster.settings.execArgv];
121
122  if (cluster.settings.inspectPort === null) {
123    throw new ERR_SOCKET_BAD_PORT('Port', null, true);
124  }
125  if (isUsingInspector(cluster.settings.execArgv)) {
126    ArrayPrototypePush(execArgv, `--inspect-port=${getInspectPort(cluster.settings.inspectPort)}`);
127  }
128
129  return fork(cluster.settings.exec, cluster.settings.args, {
130    cwd: cluster.settings.cwd,
131    env: workerEnv,
132    serialization: cluster.settings.serialization,
133    silent: cluster.settings.silent,
134    windowsHide: cluster.settings.windowsHide,
135    execArgv: execArgv,
136    stdio: cluster.settings.stdio,
137    gid: cluster.settings.gid,
138    uid: cluster.settings.uid,
139  });
140}
141
142function removeWorker(worker) {
143  assert(worker);
144  delete cluster.workers[worker.id];
145
146  if (ObjectKeys(cluster.workers).length === 0) {
147    assert(handles.size === 0, 'Resource leak detected.');
148    intercom.emit('disconnect');
149  }
150}
151
152function removeHandlesForWorker(worker) {
153  assert(worker);
154
155  handles.forEach((handle, key) => {
156    if (handle.remove(worker))
157      handles.delete(key);
158  });
159}
160
161cluster.fork = function(env) {
162  cluster.setupPrimary();
163  const id = ++ids;
164  const workerProcess = createWorkerProcess(id, env);
165  const worker = new Worker({
166    id: id,
167    process: workerProcess,
168  });
169
170  worker.on('message', function(message, handle) {
171    cluster.emit('message', this, message, handle);
172  });
173
174  worker.process.once('exit', (exitCode, signalCode) => {
175    /*
176     * Remove the worker from the workers list only
177     * if it has disconnected, otherwise we might
178     * still want to access it.
179     */
180    if (!worker.isConnected()) {
181      removeHandlesForWorker(worker);
182      removeWorker(worker);
183    }
184
185    worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
186    worker.state = 'dead';
187    worker.emit('exit', exitCode, signalCode);
188    cluster.emit('exit', worker, exitCode, signalCode);
189  });
190
191  worker.process.once('disconnect', () => {
192    /*
193     * Now is a good time to remove the handles
194     * associated with this worker because it is
195     * not connected to the primary anymore.
196     */
197    removeHandlesForWorker(worker);
198
199    /*
200     * Remove the worker from the workers list only
201     * if its process has exited. Otherwise, we might
202     * still want to access it.
203     */
204    if (worker.isDead())
205      removeWorker(worker);
206
207    worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
208    worker.state = 'disconnected';
209    worker.emit('disconnect');
210    cluster.emit('disconnect', worker);
211  });
212
213  worker.process.on('internalMessage', internal(worker, onmessage));
214  process.nextTick(emitForkNT, worker);
215  cluster.workers[worker.id] = worker;
216  return worker;
217};
218
219function emitForkNT(worker) {
220  cluster.emit('fork', worker);
221}
222
223cluster.disconnect = function(cb) {
224  const workers = ObjectKeys(cluster.workers);
225
226  if (workers.length === 0) {
227    process.nextTick(() => intercom.emit('disconnect'));
228  } else {
229    for (const worker of ObjectValues(cluster.workers)) {
230      if (worker.isConnected()) {
231        worker.disconnect();
232      }
233    }
234  }
235
236  if (typeof cb === 'function')
237    intercom.once('disconnect', cb);
238};
239
240const methodMessageMapping = {
241  close,
242  exitedAfterDisconnect,
243  listening,
244  online,
245  queryServer,
246};
247
248function onmessage(message, handle) {
249  const worker = this;
250
251  const fn = methodMessageMapping[message.act];
252
253  if (typeof fn === 'function')
254    fn(worker, message);
255}
256
257function online(worker) {
258  worker.state = 'online';
259  worker.emit('online');
260  cluster.emit('online', worker);
261}
262
263function exitedAfterDisconnect(worker, message) {
264  worker.exitedAfterDisconnect = true;
265  send(worker, { ack: message.seq });
266}
267
268function queryServer(worker, message) {
269  // Stop processing if worker already disconnecting
270  if (worker.exitedAfterDisconnect)
271    return;
272
273  const key = `${message.address}:${message.port}:${message.addressType}:` +
274              `${message.fd}:${message.index}`;
275  let handle = handles.get(key);
276
277  if (handle === undefined) {
278    let address = message.address;
279
280    // Find shortest path for unix sockets because of the ~100 byte limit
281    if (message.port < 0 && typeof address === 'string' &&
282        process.platform !== 'win32') {
283
284      address = path.relative(process.cwd(), address);
285
286      if (message.address.length < address.length)
287        address = message.address;
288    }
289
290    // UDP is exempt from round-robin connection balancing for what should
291    // be obvious reasons: it's connectionless. There is nothing to send to
292    // the workers except raw datagrams and that's pointless.
293    if (schedulingPolicy !== SCHED_RR ||
294        message.addressType === 'udp4' ||
295        message.addressType === 'udp6') {
296      handle = new SharedHandle(key, address, message);
297    } else {
298      handle = new RoundRobinHandle(key, address, message);
299    }
300
301    handles.set(key, handle);
302  }
303
304  if (!handle.data)
305    handle.data = message.data;
306
307  // Set custom server data
308  handle.add(worker, (errno, reply, handle) => {
309    const { data } = handles.get(key);
310
311    if (errno)
312      handles.delete(key);  // Gives other workers a chance to retry.
313
314    send(worker, {
315      errno,
316      key,
317      ack: message.seq,
318      data,
319      ...reply,
320    }, handle);
321  });
322}
323
324function listening(worker, message) {
325  const info = {
326    addressType: message.addressType,
327    address: message.address,
328    port: message.port,
329    fd: message.fd,
330  };
331
332  worker.state = 'listening';
333  worker.emit('listening', info);
334  cluster.emit('listening', worker, info);
335}
336
337// Server in worker is closing, remove from list. The handle may have been
338// removed by a prior call to removeHandlesForWorker() so guard against that.
339function close(worker, message) {
340  const key = message.key;
341  const handle = handles.get(key);
342
343  if (handle && handle.remove(worker))
344    handles.delete(key);
345}
346
347function send(worker, message, handle, cb) {
348  return sendHelper(worker.process, message, handle, cb);
349}
350
351// Extend generic Worker with methods specific to the primary process.
352Worker.prototype.disconnect = function() {
353  this.exitedAfterDisconnect = true;
354  send(this, { act: 'disconnect' });
355  removeHandlesForWorker(this);
356  removeWorker(this);
357  return this;
358};
359
360Worker.prototype.destroy = function(signo) {
361  const proc = this.process;
362  const signal = signo || 'SIGTERM';
363
364  proc.kill(signal);
365};
366