• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  Map,
5  ObjectKeys,
6  ObjectValues,
7} = primordials;
8
9const assert = require('internal/assert');
10const { fork } = require('child_process');
11const path = require('path');
12const EventEmitter = require('events');
13const RoundRobinHandle = require('internal/cluster/round_robin_handle');
14const SharedHandle = require('internal/cluster/shared_handle');
15const Worker = require('internal/cluster/worker');
16const { internal, sendHelper } = require('internal/cluster/utils');
17const cluster = new EventEmitter();
18const intercom = new EventEmitter();
19const SCHED_NONE = 1;
20const SCHED_RR = 2;
21const [ minPort, maxPort ] = [ 1024, 65535 ];
22const { validatePort } = require('internal/validators');
23
24module.exports = cluster;
25
26const handles = new Map();
27cluster.isWorker = false;
28cluster.isMaster = true;
29cluster.Worker = Worker;
30cluster.workers = {};
31cluster.settings = {};
32cluster.SCHED_NONE = SCHED_NONE;  // Leave it to the operating system.
33cluster.SCHED_RR = SCHED_RR;      // Master distributes connections.
34
35let ids = 0;
36let debugPortOffset = 1;
37let initialized = false;
38
39// XXX(bnoordhuis) Fold cluster.schedulingPolicy into cluster.settings?
40let schedulingPolicy = process.env.NODE_CLUSTER_SCHED_POLICY;
41if (schedulingPolicy === 'rr')
42  schedulingPolicy = SCHED_RR;
43else if (schedulingPolicy === 'none')
44  schedulingPolicy = SCHED_NONE;
45else if (process.platform === 'win32') {
46  // Round-robin doesn't perform well on
47  // Windows due to the way IOCP is wired up.
48  schedulingPolicy = SCHED_NONE;
49} else
50  schedulingPolicy = SCHED_RR;
51
52cluster.schedulingPolicy = schedulingPolicy;
53
54cluster.setupMaster = function(options) {
55  const settings = {
56    args: process.argv.slice(2),
57    exec: process.argv[1],
58    execArgv: process.execArgv,
59    silent: false,
60    ...cluster.settings,
61    ...options
62  };
63
64  // Tell V8 to write profile data for each process to a separate file.
65  // Without --logfile=v8-%p.log, everything ends up in a single, unusable
66  // file. (Unusable because what V8 logs are memory addresses and each
67  // process has its own memory mappings.)
68  if (settings.execArgv.some((s) => s.startsWith('--prof')) &&
69      !settings.execArgv.some((s) => s.startsWith('--logfile='))) {
70    settings.execArgv = [...settings.execArgv, '--logfile=v8-%p.log'];
71  }
72
73  cluster.settings = settings;
74
75  if (initialized === true)
76    return process.nextTick(setupSettingsNT, settings);
77
78  initialized = true;
79  schedulingPolicy = cluster.schedulingPolicy;  // Freeze policy.
80  assert(schedulingPolicy === SCHED_NONE || schedulingPolicy === SCHED_RR,
81         `Bad cluster.schedulingPolicy: ${schedulingPolicy}`);
82
83  process.nextTick(setupSettingsNT, settings);
84
85  process.on('internalMessage', (message) => {
86    if (message.cmd !== 'NODE_DEBUG_ENABLED')
87      return;
88
89    for (const worker of ObjectValues(cluster.workers)) {
90      if (worker.state === 'online' || worker.state === 'listening') {
91        process._debugProcess(worker.process.pid);
92      } else {
93        worker.once('online', function() {
94          process._debugProcess(this.process.pid);
95        });
96      }
97    }
98  });
99};
100
101function setupSettingsNT(settings) {
102  cluster.emit('setup', settings);
103}
104
105function createWorkerProcess(id, env) {
106  const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
107  const execArgv = [...cluster.settings.execArgv];
108  const debugArgRegex = /--inspect(?:-brk|-port)?|--debug-port/;
109  const nodeOptions = process.env.NODE_OPTIONS ?
110    process.env.NODE_OPTIONS : '';
111
112  if (execArgv.some((arg) => arg.match(debugArgRegex)) ||
113      nodeOptions.match(debugArgRegex)) {
114    let inspectPort;
115    if ('inspectPort' in cluster.settings) {
116      if (typeof cluster.settings.inspectPort === 'function')
117        inspectPort = cluster.settings.inspectPort();
118      else
119        inspectPort = cluster.settings.inspectPort;
120
121      validatePort(inspectPort);
122    } else {
123      inspectPort = process.debugPort + debugPortOffset;
124      if (inspectPort > maxPort)
125        inspectPort = inspectPort - maxPort + minPort - 1;
126      debugPortOffset++;
127    }
128
129    execArgv.push(`--inspect-port=${inspectPort}`);
130  }
131
132  return fork(cluster.settings.exec, cluster.settings.args, {
133    cwd: cluster.settings.cwd,
134    env: workerEnv,
135    serialization: cluster.settings.serialization,
136    silent: cluster.settings.silent,
137    windowsHide: cluster.settings.windowsHide,
138    execArgv: execArgv,
139    stdio: cluster.settings.stdio,
140    gid: cluster.settings.gid,
141    uid: cluster.settings.uid
142  });
143}
144
145function removeWorker(worker) {
146  assert(worker);
147  delete cluster.workers[worker.id];
148
149  if (ObjectKeys(cluster.workers).length === 0) {
150    assert(handles.size === 0, 'Resource leak detected.');
151    intercom.emit('disconnect');
152  }
153}
154
155function removeHandlesForWorker(worker) {
156  assert(worker);
157
158  handles.forEach((handle, key) => {
159    if (handle.remove(worker))
160      handles.delete(key);
161  });
162}
163
164cluster.fork = function(env) {
165  cluster.setupMaster();
166  const id = ++ids;
167  const workerProcess = createWorkerProcess(id, env);
168  const worker = new Worker({
169    id: id,
170    process: workerProcess
171  });
172
173  worker.on('message', function(message, handle) {
174    cluster.emit('message', this, message, handle);
175  });
176
177  worker.process.once('exit', (exitCode, signalCode) => {
178    /*
179     * Remove the worker from the workers list only
180     * if it has disconnected, otherwise we might
181     * still want to access it.
182     */
183    if (!worker.isConnected()) {
184      removeHandlesForWorker(worker);
185      removeWorker(worker);
186    }
187
188    worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
189    worker.state = 'dead';
190    worker.emit('exit', exitCode, signalCode);
191    cluster.emit('exit', worker, exitCode, signalCode);
192  });
193
194  worker.process.once('disconnect', () => {
195    /*
196     * Now is a good time to remove the handles
197     * associated with this worker because it is
198     * not connected to the master anymore.
199     */
200    removeHandlesForWorker(worker);
201
202    /*
203     * Remove the worker from the workers list only
204     * if its process has exited. Otherwise, we might
205     * still want to access it.
206     */
207    if (worker.isDead())
208      removeWorker(worker);
209
210    worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
211    worker.state = 'disconnected';
212    worker.emit('disconnect');
213    cluster.emit('disconnect', worker);
214  });
215
216  worker.process.on('internalMessage', internal(worker, onmessage));
217  process.nextTick(emitForkNT, worker);
218  cluster.workers[worker.id] = worker;
219  return worker;
220};
221
222function emitForkNT(worker) {
223  cluster.emit('fork', worker);
224}
225
226cluster.disconnect = function(cb) {
227  const workers = ObjectKeys(cluster.workers);
228
229  if (workers.length === 0) {
230    process.nextTick(() => intercom.emit('disconnect'));
231  } else {
232    for (const worker of ObjectValues(cluster.workers)) {
233      if (worker.isConnected()) {
234        worker.disconnect();
235      }
236    }
237  }
238
239  if (typeof cb === 'function')
240    intercom.once('disconnect', cb);
241};
242
243function onmessage(message, handle) {
244  const worker = this;
245
246  if (message.act === 'online')
247    online(worker);
248  else if (message.act === 'queryServer')
249    queryServer(worker, message);
250  else if (message.act === 'listening')
251    listening(worker, message);
252  else if (message.act === 'exitedAfterDisconnect')
253    exitedAfterDisconnect(worker, message);
254  else if (message.act === 'close')
255    close(worker, message);
256}
257
258function online(worker) {
259  worker.state = 'online';
260  worker.emit('online');
261  cluster.emit('online', worker);
262}
263
264function exitedAfterDisconnect(worker, message) {
265  worker.exitedAfterDisconnect = true;
266  send(worker, { ack: message.seq });
267}
268
269function queryServer(worker, message) {
270  // Stop processing if worker already disconnecting
271  if (worker.exitedAfterDisconnect)
272    return;
273
274  const key = `${message.address}:${message.port}:${message.addressType}:` +
275              `${message.fd}:${message.index}`;
276  let handle = handles.get(key);
277
278  if (handle === undefined) {
279    let address = message.address;
280
281    // Find shortest path for unix sockets because of the ~100 byte limit
282    if (message.port < 0 && typeof address === 'string' &&
283        process.platform !== 'win32') {
284
285      address = path.relative(process.cwd(), address);
286
287      if (message.address.length < address.length)
288        address = message.address;
289    }
290
291    let constructor = RoundRobinHandle;
292    // UDP is exempt from round-robin connection balancing for what should
293    // be obvious reasons: it's connectionless. There is nothing to send to
294    // the workers except raw datagrams and that's pointless.
295    if (schedulingPolicy !== SCHED_RR ||
296        message.addressType === 'udp4' ||
297        message.addressType === 'udp6') {
298      constructor = SharedHandle;
299    }
300
301    handle = new constructor(key, address, message);
302    handles.set(key, handle);
303  }
304
305  if (!handle.data)
306    handle.data = message.data;
307
308  // Set custom server data
309  handle.add(worker, (errno, reply, handle) => {
310    const { data } = handles.get(key);
311
312    if (errno)
313      handles.delete(key);  // Gives other workers a chance to retry.
314
315    send(worker, {
316      errno,
317      key,
318      ack: message.seq,
319      data,
320      ...reply
321    }, handle);
322  });
323}
324
325function listening(worker, message) {
326  const info = {
327    addressType: message.addressType,
328    address: message.address,
329    port: message.port,
330    fd: message.fd
331  };
332
333  worker.state = 'listening';
334  worker.emit('listening', info);
335  cluster.emit('listening', worker, info);
336}
337
338// Server in worker is closing, remove from list. The handle may have been
339// removed by a prior call to removeHandlesForWorker() so guard against that.
340function close(worker, message) {
341  const key = message.key;
342  const handle = handles.get(key);
343
344  if (handle && handle.remove(worker))
345    handles.delete(key);
346}
347
348function send(worker, message, handle, cb) {
349  return sendHelper(worker.process, message, handle, cb);
350}
351
352// Extend generic Worker with methods specific to the master process.
353Worker.prototype.disconnect = function() {
354  this.exitedAfterDisconnect = true;
355  send(this, { act: 'disconnect' });
356  removeHandlesForWorker(this);
357  removeWorker(this);
358  return this;
359};
360
361Worker.prototype.destroy = function(signo) {
362  const proc = this.process;
363
364  signo = signo || 'SIGTERM';
365
366  if (this.isConnected()) {
367    this.once('disconnect', () => proc.kill(signo));
368    this.disconnect();
369    return;
370  }
371
372  proc.kill(signo);
373};
374