• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  ArrayPrototypePush,
5  ArrayPrototypeSlice,
6  ArrayPrototypeSome,
7  ObjectKeys,
8  ObjectValues,
9  RegExpPrototypeTest,
10  SafeMap,
11  StringPrototypeStartsWith,
12} = primordials;
13
14const assert = require('internal/assert');
15const { fork } = require('child_process');
16const path = require('path');
17const EventEmitter = require('events');
18const RoundRobinHandle = require('internal/cluster/round_robin_handle');
19const SharedHandle = require('internal/cluster/shared_handle');
20const Worker = require('internal/cluster/worker');
21const { internal, sendHelper } = require('internal/cluster/utils');
22const cluster = new EventEmitter();
23const intercom = new EventEmitter();
24const SCHED_NONE = 1;
25const SCHED_RR = 2;
26const [ minPort, maxPort ] = [ 1024, 65535 ];
27const { validatePort } = require('internal/validators');
28
29module.exports = cluster;
30
31const handles = new SafeMap();
32cluster.isWorker = false;
33cluster.isMaster = true;
34cluster.Worker = Worker;
35cluster.workers = {};
36cluster.settings = {};
37cluster.SCHED_NONE = SCHED_NONE;  // Leave it to the operating system.
38cluster.SCHED_RR = SCHED_RR;      // Master distributes connections.
39
40let ids = 0;
41let debugPortOffset = 1;
42let initialized = false;
43
44// XXX(bnoordhuis) Fold cluster.schedulingPolicy into cluster.settings?
45let schedulingPolicy = process.env.NODE_CLUSTER_SCHED_POLICY;
46if (schedulingPolicy === 'rr')
47  schedulingPolicy = SCHED_RR;
48else if (schedulingPolicy === 'none')
49  schedulingPolicy = SCHED_NONE;
50else if (process.platform === 'win32') {
51  // Round-robin doesn't perform well on
52  // Windows due to the way IOCP is wired up.
53  schedulingPolicy = SCHED_NONE;
54} else
55  schedulingPolicy = SCHED_RR;
56
57cluster.schedulingPolicy = schedulingPolicy;
58
59cluster.setupMaster = function(options) {
60  const settings = {
61    args: ArrayPrototypeSlice(process.argv, 2),
62    exec: process.argv[1],
63    execArgv: process.execArgv,
64    silent: false,
65    ...cluster.settings,
66    ...options
67  };
68
69  // Tell V8 to write profile data for each process to a separate file.
70  // Without --logfile=v8-%p.log, everything ends up in a single, unusable
71  // file. (Unusable because what V8 logs are memory addresses and each
72  // process has its own memory mappings.)
73  if (ArrayPrototypeSome(settings.execArgv,
74                         (s) => StringPrototypeStartsWith(s, '--prof')) &&
75      !ArrayPrototypeSome(settings.execArgv,
76                          (s) => StringPrototypeStartsWith(s, '--logfile='))) {
77    settings.execArgv = [...settings.execArgv, '--logfile=v8-%p.log'];
78  }
79
80  cluster.settings = settings;
81
82  if (initialized === true)
83    return process.nextTick(setupSettingsNT, settings);
84
85  initialized = true;
86  schedulingPolicy = cluster.schedulingPolicy;  // Freeze policy.
87  assert(schedulingPolicy === SCHED_NONE || schedulingPolicy === SCHED_RR,
88         `Bad cluster.schedulingPolicy: ${schedulingPolicy}`);
89
90  process.nextTick(setupSettingsNT, settings);
91
92  process.on('internalMessage', (message) => {
93    if (message.cmd !== 'NODE_DEBUG_ENABLED')
94      return;
95
96    for (const worker of ObjectValues(cluster.workers)) {
97      if (worker.state === 'online' || worker.state === 'listening') {
98        process._debugProcess(worker.process.pid);
99      } else {
100        worker.once('online', function() {
101          process._debugProcess(this.process.pid);
102        });
103      }
104    }
105  });
106};
107
108function setupSettingsNT(settings) {
109  cluster.emit('setup', settings);
110}
111
112function createWorkerProcess(id, env) {
113  const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
114  const execArgv = [...cluster.settings.execArgv];
115  const debugArgRegex = /--inspect(?:-brk|-port)?|--debug-port/;
116  const nodeOptions = process.env.NODE_OPTIONS ?
117    process.env.NODE_OPTIONS : '';
118
119  if (ArrayPrototypeSome(execArgv,
120                         (arg) => RegExpPrototypeTest(debugArgRegex, arg)) ||
121      RegExpPrototypeTest(debugArgRegex, nodeOptions)) {
122    let inspectPort;
123    if ('inspectPort' in cluster.settings) {
124      if (typeof cluster.settings.inspectPort === 'function')
125        inspectPort = cluster.settings.inspectPort();
126      else
127        inspectPort = cluster.settings.inspectPort;
128
129      validatePort(inspectPort);
130    } else {
131      inspectPort = process.debugPort + debugPortOffset;
132      if (inspectPort > maxPort)
133        inspectPort = inspectPort - maxPort + minPort - 1;
134      debugPortOffset++;
135    }
136
137    ArrayPrototypePush(execArgv, `--inspect-port=${inspectPort}`);
138  }
139
140  return fork(cluster.settings.exec, cluster.settings.args, {
141    cwd: cluster.settings.cwd,
142    env: workerEnv,
143    serialization: cluster.settings.serialization,
144    silent: cluster.settings.silent,
145    windowsHide: cluster.settings.windowsHide,
146    execArgv: execArgv,
147    stdio: cluster.settings.stdio,
148    gid: cluster.settings.gid,
149    uid: cluster.settings.uid
150  });
151}
152
153function removeWorker(worker) {
154  assert(worker);
155  delete cluster.workers[worker.id];
156
157  if (ObjectKeys(cluster.workers).length === 0) {
158    assert(handles.size === 0, 'Resource leak detected.');
159    intercom.emit('disconnect');
160  }
161}
162
163function removeHandlesForWorker(worker) {
164  assert(worker);
165
166  handles.forEach((handle, key) => {
167    if (handle.remove(worker))
168      handles.delete(key);
169  });
170}
171
172cluster.fork = function(env) {
173  cluster.setupMaster();
174  const id = ++ids;
175  const workerProcess = createWorkerProcess(id, env);
176  const worker = new Worker({
177    id: id,
178    process: workerProcess
179  });
180
181  worker.on('message', function(message, handle) {
182    cluster.emit('message', this, message, handle);
183  });
184
185  worker.process.once('exit', (exitCode, signalCode) => {
186    /*
187     * Remove the worker from the workers list only
188     * if it has disconnected, otherwise we might
189     * still want to access it.
190     */
191    if (!worker.isConnected()) {
192      removeHandlesForWorker(worker);
193      removeWorker(worker);
194    }
195
196    worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
197    worker.state = 'dead';
198    worker.emit('exit', exitCode, signalCode);
199    cluster.emit('exit', worker, exitCode, signalCode);
200  });
201
202  worker.process.once('disconnect', () => {
203    /*
204     * Now is a good time to remove the handles
205     * associated with this worker because it is
206     * not connected to the master anymore.
207     */
208    removeHandlesForWorker(worker);
209
210    /*
211     * Remove the worker from the workers list only
212     * if its process has exited. Otherwise, we might
213     * still want to access it.
214     */
215    if (worker.isDead())
216      removeWorker(worker);
217
218    worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
219    worker.state = 'disconnected';
220    worker.emit('disconnect');
221    cluster.emit('disconnect', worker);
222  });
223
224  worker.process.on('internalMessage', internal(worker, onmessage));
225  process.nextTick(emitForkNT, worker);
226  cluster.workers[worker.id] = worker;
227  return worker;
228};
229
230function emitForkNT(worker) {
231  cluster.emit('fork', worker);
232}
233
234cluster.disconnect = function(cb) {
235  const workers = ObjectKeys(cluster.workers);
236
237  if (workers.length === 0) {
238    process.nextTick(() => intercom.emit('disconnect'));
239  } else {
240    for (const worker of ObjectValues(cluster.workers)) {
241      if (worker.isConnected()) {
242        worker.disconnect();
243      }
244    }
245  }
246
247  if (typeof cb === 'function')
248    intercom.once('disconnect', cb);
249};
250
251function onmessage(message, handle) {
252  const worker = this;
253
254  if (message.act === 'online')
255    online(worker);
256  else if (message.act === 'queryServer')
257    queryServer(worker, message);
258  else if (message.act === 'listening')
259    listening(worker, message);
260  else if (message.act === 'exitedAfterDisconnect')
261    exitedAfterDisconnect(worker, message);
262  else if (message.act === 'close')
263    close(worker, message);
264}
265
266function online(worker) {
267  worker.state = 'online';
268  worker.emit('online');
269  cluster.emit('online', worker);
270}
271
272function exitedAfterDisconnect(worker, message) {
273  worker.exitedAfterDisconnect = true;
274  send(worker, { ack: message.seq });
275}
276
277function queryServer(worker, message) {
278  // Stop processing if worker already disconnecting
279  if (worker.exitedAfterDisconnect)
280    return;
281
282  const key = `${message.address}:${message.port}:${message.addressType}:` +
283              `${message.fd}:${message.index}`;
284  let handle = handles.get(key);
285
286  if (handle === undefined) {
287    let address = message.address;
288
289    // Find shortest path for unix sockets because of the ~100 byte limit
290    if (message.port < 0 && typeof address === 'string' &&
291        process.platform !== 'win32') {
292
293      address = path.relative(process.cwd(), address);
294
295      if (message.address.length < address.length)
296        address = message.address;
297    }
298
299    let constructor = RoundRobinHandle;
300    // UDP is exempt from round-robin connection balancing for what should
301    // be obvious reasons: it's connectionless. There is nothing to send to
302    // the workers except raw datagrams and that's pointless.
303    if (schedulingPolicy !== SCHED_RR ||
304        message.addressType === 'udp4' ||
305        message.addressType === 'udp6') {
306      constructor = SharedHandle;
307    }
308
309    handle = new constructor(key, address, message);
310    handles.set(key, handle);
311  }
312
313  if (!handle.data)
314    handle.data = message.data;
315
316  // Set custom server data
317  handle.add(worker, (errno, reply, handle) => {
318    const { data } = handles.get(key);
319
320    if (errno)
321      handles.delete(key);  // Gives other workers a chance to retry.
322
323    send(worker, {
324      errno,
325      key,
326      ack: message.seq,
327      data,
328      ...reply
329    }, handle);
330  });
331}
332
333function listening(worker, message) {
334  const info = {
335    addressType: message.addressType,
336    address: message.address,
337    port: message.port,
338    fd: message.fd
339  };
340
341  worker.state = 'listening';
342  worker.emit('listening', info);
343  cluster.emit('listening', worker, info);
344}
345
346// Server in worker is closing, remove from list. The handle may have been
347// removed by a prior call to removeHandlesForWorker() so guard against that.
348function close(worker, message) {
349  const key = message.key;
350  const handle = handles.get(key);
351
352  if (handle && handle.remove(worker))
353    handles.delete(key);
354}
355
356function send(worker, message, handle, cb) {
357  return sendHelper(worker.process, message, handle, cb);
358}
359
360// Extend generic Worker with methods specific to the master process.
361Worker.prototype.disconnect = function() {
362  this.exitedAfterDisconnect = true;
363  send(this, { act: 'disconnect' });
364  removeHandlesForWorker(this);
365  removeWorker(this);
366  return this;
367};
368
369Worker.prototype.destroy = function(signo) {
370  const proc = this.process;
371
372  signo = signo || 'SIGTERM';
373
374  if (this.isConnected()) {
375    this.once('disconnect', () => proc.kill(signo));
376    this.disconnect();
377    return;
378  }
379
380  proc.kill(signo);
381};
382