• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  ArrayPrototypeJoin,
5  FunctionPrototype,
6  ObjectAssign,
7  ReflectApply,
8  SafeMap,
9  SafeSet,
10} = primordials;
11
12const assert = require('internal/assert');
13const path = require('path');
14const EventEmitter = require('events');
15const { owner_symbol } = require('internal/async_hooks').symbols;
16const Worker = require('internal/cluster/worker');
17const { internal, sendHelper } = require('internal/cluster/utils');
18const { TIMEOUT_MAX } = require('internal/timers');
19const { setInterval, clearInterval } = require('timers');
20
21const cluster = new EventEmitter();
22const handles = new SafeMap();
23const indexes = new SafeMap();
24const noop = FunctionPrototype;
25
26module.exports = cluster;
27
28cluster.isWorker = true;
29cluster.isMaster = false; // Deprecated alias. Must be same as isPrimary.
30cluster.isPrimary = false;
31cluster.worker = null;
32cluster.Worker = Worker;
33
34cluster._setupWorker = function() {
35  const worker = new Worker({
36    id: +process.env.NODE_UNIQUE_ID | 0,
37    process: process,
38    state: 'online',
39  });
40
41  cluster.worker = worker;
42
43  process.once('disconnect', () => {
44    worker.emit('disconnect');
45
46    if (!worker.exitedAfterDisconnect) {
47      // Unexpected disconnect, primary exited, or some such nastiness, so
48      // worker exits immediately.
49      process.exit(0);
50    }
51  });
52
53  process.on('internalMessage', internal(worker, onmessage));
54  send({ act: 'online' });
55
56  function onmessage(message, handle) {
57    if (message.act === 'newconn')
58      onconnection(message, handle);
59    else if (message.act === 'disconnect')
60      ReflectApply(_disconnect, worker, [true]);
61  }
62};
63
64// `obj` is a net#Server or a dgram#Socket object.
65cluster._getServer = function(obj, options, cb) {
66  let address = options.address;
67
68  // Resolve unix socket paths to absolute paths
69  if (options.port < 0 && typeof address === 'string' &&
70      process.platform !== 'win32')
71    address = path.resolve(address);
72
73  const indexesKey = ArrayPrototypeJoin(
74    [
75      address,
76      options.port,
77      options.addressType,
78      options.fd,
79    ], ':');
80
81  let indexSet = indexes.get(indexesKey);
82
83  if (indexSet === undefined) {
84    indexSet = { nextIndex: 0, set: new SafeSet() };
85    indexes.set(indexesKey, indexSet);
86  }
87  const index = indexSet.nextIndex++;
88  indexSet.set.add(index);
89
90  const message = {
91    act: 'queryServer',
92    index,
93    data: null,
94    ...options,
95  };
96
97  message.address = address;
98
99  // Set custom data on handle (i.e. tls tickets key)
100  if (obj._getServerData)
101    message.data = obj._getServerData();
102
103  send(message, (reply, handle) => {
104    if (typeof obj._setServerData === 'function')
105      obj._setServerData(reply.data);
106
107    if (handle) {
108      // Shared listen socket
109      shared(reply, { handle, indexesKey, index }, cb);
110    } else {
111      // Round-robin.
112      rr(reply, { indexesKey, index }, cb);
113    }
114  });
115
116  obj.once('listening', () => {
117    // short-lived sockets might have been closed
118    if (!indexes.has(indexesKey)) {
119      return;
120    }
121    cluster.worker.state = 'listening';
122    const address = obj.address();
123    message.act = 'listening';
124    message.port = (address && address.port) || options.port;
125    send(message);
126  });
127};
128
129function removeIndexesKey(indexesKey, index) {
130  const indexSet = indexes.get(indexesKey);
131  if (!indexSet) {
132    return;
133  }
134
135  indexSet.set.delete(index);
136  if (indexSet.set.size === 0) {
137    indexes.delete(indexesKey);
138  }
139}
140
141// Shared listen socket.
142function shared(message, { handle, indexesKey, index }, cb) {
143  const key = message.key;
144  // Monkey-patch the close() method so we can keep track of when it's
145  // closed. Avoids resource leaks when the handle is short-lived.
146  const close = handle.close;
147
148  handle.close = function() {
149    send({ act: 'close', key });
150    handles.delete(key);
151    removeIndexesKey(indexesKey, index);
152    return ReflectApply(close, handle, arguments);
153  };
154  assert(handles.has(key) === false);
155  handles.set(key, handle);
156  cb(message.errno, handle);
157}
158
159// Round-robin. Master distributes handles across workers.
160function rr(message, { indexesKey, index }, cb) {
161  if (message.errno)
162    return cb(message.errno, null);
163
164  let key = message.key;
165
166  let fakeHandle = null;
167
168  function ref() {
169    if (!fakeHandle) {
170      fakeHandle = setInterval(noop, TIMEOUT_MAX);
171    }
172  }
173
174  function unref() {
175    if (fakeHandle) {
176      clearInterval(fakeHandle);
177      fakeHandle = null;
178    }
179  }
180
181  function listen(backlog) {
182    // TODO(bnoordhuis) Send a message to the primary that tells it to
183    // update the backlog size. The actual backlog should probably be
184    // the largest requested size by any worker.
185    return 0;
186  }
187
188  function close() {
189    // lib/net.js treats server._handle.close() as effectively synchronous.
190    // That means there is a time window between the call to close() and
191    // the ack by the primary process in which we can still receive handles.
192    // onconnection() below handles that by sending those handles back to
193    // the primary.
194    if (key === undefined)
195      return;
196    unref();
197    // If the handle is the last handle in process,
198    // the parent process will delete the handle when worker process exits.
199    // So it is ok if the close message get lost.
200    // See the comments of https://github.com/nodejs/node/pull/46161
201    send({ act: 'close', key });
202    handles.delete(key);
203    removeIndexesKey(indexesKey, index);
204    key = undefined;
205  }
206
207  function getsockname(out) {
208    if (key)
209      ObjectAssign(out, message.sockname);
210
211    return 0;
212  }
213
214  // Faux handle. net.Server is not associated with handle,
215  // so we control its state(ref or unref) by setInterval.
216  const handle = { close, listen, ref, unref };
217  handle.ref();
218  if (message.sockname) {
219    handle.getsockname = getsockname;  // TCP handles only.
220  }
221
222  assert(handles.has(key) === false);
223  handles.set(key, handle);
224  cb(0, handle);
225}
226
227// Round-robin connection.
228function onconnection(message, handle) {
229  const key = message.key;
230  const server = handles.get(key);
231  let accepted = server !== undefined;
232
233  if (accepted && server[owner_symbol]) {
234    const self = server[owner_symbol];
235    if (self.maxConnections && self._connections >= self.maxConnections) {
236      accepted = false;
237    }
238  }
239
240  send({ ack: message.seq, accepted });
241
242  if (accepted)
243    server.onconnection(0, handle);
244  else
245    handle.close();
246}
247
248function send(message, cb) {
249  return sendHelper(process, message, null, cb);
250}
251
252function _disconnect(primaryInitiated) {
253  this.exitedAfterDisconnect = true;
254  let waitingCount = 1;
255
256  function checkWaitingCount() {
257    waitingCount--;
258
259    if (waitingCount === 0) {
260      // If disconnect is worker initiated, wait for ack to be sure
261      // exitedAfterDisconnect is properly set in the primary, otherwise, if
262      // it's primary initiated there's no need to send the
263      // exitedAfterDisconnect message
264      if (primaryInitiated) {
265        process.disconnect();
266      } else {
267        send({ act: 'exitedAfterDisconnect' }, () => process.disconnect());
268      }
269    }
270  }
271
272  handles.forEach((handle) => {
273    waitingCount++;
274
275    if (handle[owner_symbol])
276      handle[owner_symbol].close(checkWaitingCount);
277    else
278      handle.close(checkWaitingCount);
279  });
280
281  handles.clear();
282  checkWaitingCount();
283}
284
285// Extend generic Worker with methods specific to worker processes.
286Worker.prototype.disconnect = function() {
287  if (this.state !== 'disconnecting' && this.state !== 'destroying') {
288    this.state = 'disconnecting';
289    ReflectApply(_disconnect, this, []);
290  }
291
292  return this;
293};
294
295Worker.prototype.destroy = function() {
296  if (this.state === 'destroying')
297    return;
298
299  this.exitedAfterDisconnect = true;
300  if (!this.isConnected()) {
301    process.exit(0);
302  } else {
303    this.state = 'destroying';
304    send({ act: 'exitedAfterDisconnect' }, () => process.disconnect());
305    process.once('disconnect', () => process.exit(0));
306  }
307};
308