• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  Map,
5  ObjectAssign,
6} = primordials;
7
8const assert = require('internal/assert');
9const path = require('path');
10const EventEmitter = require('events');
11const { owner_symbol } = require('internal/async_hooks').symbols;
12const Worker = require('internal/cluster/worker');
13const { internal, sendHelper } = require('internal/cluster/utils');
14const cluster = new EventEmitter();
15const handles = new Map();
16const indexes = new Map();
17const noop = () => {};
18
19module.exports = cluster;
20
21cluster.isWorker = true;
22cluster.isMaster = false;
23cluster.worker = null;
24cluster.Worker = Worker;
25
26cluster._setupWorker = function() {
27  const worker = new Worker({
28    id: +process.env.NODE_UNIQUE_ID | 0,
29    process: process,
30    state: 'online'
31  });
32
33  cluster.worker = worker;
34
35  process.once('disconnect', () => {
36    worker.emit('disconnect');
37
38    if (!worker.exitedAfterDisconnect) {
39      // Unexpected disconnect, master exited, or some such nastiness, so
40      // worker exits immediately.
41      process.exit(0);
42    }
43  });
44
45  process.on('internalMessage', internal(worker, onmessage));
46  send({ act: 'online' });
47
48  function onmessage(message, handle) {
49    if (message.act === 'newconn')
50      onconnection(message, handle);
51    else if (message.act === 'disconnect')
52      _disconnect.call(worker, true);
53  }
54};
55
56// `obj` is a net#Server or a dgram#Socket object.
57cluster._getServer = function(obj, options, cb) {
58  let address = options.address;
59
60  // Resolve unix socket paths to absolute paths
61  if (options.port < 0 && typeof address === 'string' &&
62      process.platform !== 'win32')
63    address = path.resolve(address);
64
65  const indexesKey = [address,
66                      options.port,
67                      options.addressType,
68                      options.fd ].join(':');
69
70  let index = indexes.get(indexesKey);
71
72  if (index === undefined)
73    index = 0;
74  else
75    index++;
76
77  indexes.set(indexesKey, index);
78
79  const message = {
80    act: 'queryServer',
81    index,
82    data: null,
83    ...options
84  };
85
86  message.address = address;
87
88  // Set custom data on handle (i.e. tls tickets key)
89  if (obj._getServerData)
90    message.data = obj._getServerData();
91
92  send(message, (reply, handle) => {
93    if (typeof obj._setServerData === 'function')
94      obj._setServerData(reply.data);
95
96    if (handle)
97      shared(reply, handle, indexesKey, cb);  // Shared listen socket.
98    else
99      rr(reply, indexesKey, cb);              // Round-robin.
100  });
101
102  obj.once('listening', () => {
103    cluster.worker.state = 'listening';
104    const address = obj.address();
105    message.act = 'listening';
106    message.port = (address && address.port) || options.port;
107    send(message);
108  });
109};
110
111// Shared listen socket.
112function shared(message, handle, indexesKey, cb) {
113  const key = message.key;
114  // Monkey-patch the close() method so we can keep track of when it's
115  // closed. Avoids resource leaks when the handle is short-lived.
116  const close = handle.close;
117
118  handle.close = function() {
119    send({ act: 'close', key });
120    handles.delete(key);
121    indexes.delete(indexesKey);
122    return close.apply(handle, arguments);
123  };
124  assert(handles.has(key) === false);
125  handles.set(key, handle);
126  cb(message.errno, handle);
127}
128
129// Round-robin. Master distributes handles across workers.
130function rr(message, indexesKey, cb) {
131  if (message.errno)
132    return cb(message.errno, null);
133
134  let key = message.key;
135
136  function listen(backlog) {
137    // TODO(bnoordhuis) Send a message to the master that tells it to
138    // update the backlog size. The actual backlog should probably be
139    // the largest requested size by any worker.
140    return 0;
141  }
142
143  function close() {
144    // lib/net.js treats server._handle.close() as effectively synchronous.
145    // That means there is a time window between the call to close() and
146    // the ack by the master process in which we can still receive handles.
147    // onconnection() below handles that by sending those handles back to
148    // the master.
149    if (key === undefined)
150      return;
151
152    send({ act: 'close', key });
153    handles.delete(key);
154    indexes.delete(indexesKey);
155    key = undefined;
156  }
157
158  function getsockname(out) {
159    if (key)
160      ObjectAssign(out, message.sockname);
161
162    return 0;
163  }
164
165  // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
166  // with it. Fools net.Server into thinking that it's backed by a real
167  // handle. Use a noop function for ref() and unref() because the control
168  // channel is going to keep the worker alive anyway.
169  const handle = { close, listen, ref: noop, unref: noop };
170
171  if (message.sockname) {
172    handle.getsockname = getsockname;  // TCP handles only.
173  }
174
175  assert(handles.has(key) === false);
176  handles.set(key, handle);
177  cb(0, handle);
178}
179
180// Round-robin connection.
181function onconnection(message, handle) {
182  const key = message.key;
183  const server = handles.get(key);
184  const accepted = server !== undefined;
185
186  send({ ack: message.seq, accepted });
187
188  if (accepted)
189    server.onconnection(0, handle);
190}
191
192function send(message, cb) {
193  return sendHelper(process, message, null, cb);
194}
195
196function _disconnect(masterInitiated) {
197  this.exitedAfterDisconnect = true;
198  let waitingCount = 1;
199
200  function checkWaitingCount() {
201    waitingCount--;
202
203    if (waitingCount === 0) {
204      // If disconnect is worker initiated, wait for ack to be sure
205      // exitedAfterDisconnect is properly set in the master, otherwise, if
206      // it's master initiated there's no need to send the
207      // exitedAfterDisconnect message
208      if (masterInitiated) {
209        process.disconnect();
210      } else {
211        send({ act: 'exitedAfterDisconnect' }, () => process.disconnect());
212      }
213    }
214  }
215
216  handles.forEach((handle) => {
217    waitingCount++;
218
219    if (handle[owner_symbol])
220      handle[owner_symbol].close(checkWaitingCount);
221    else
222      handle.close(checkWaitingCount);
223  });
224
225  handles.clear();
226  checkWaitingCount();
227}
228
229// Extend generic Worker with methods specific to worker processes.
230Worker.prototype.disconnect = function() {
231  if (![ 'disconnecting', 'destroying' ].includes(this.state)) {
232    this.state = 'disconnecting';
233    _disconnect.call(this);
234  }
235
236  return this;
237};
238
239Worker.prototype.destroy = function() {
240  if (this.state === 'destroying')
241    return;
242
243  this.exitedAfterDisconnect = true;
244  if (!this.isConnected()) {
245    process.exit(0);
246  } else {
247    this.state = 'destroying';
248    send({ act: 'exitedAfterDisconnect' }, () => process.disconnect());
249    process.once('disconnect', () => process.exit(0));
250  }
251};
252