• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  ArrayPrototypeJoin,
5  FunctionPrototype,
6  ObjectAssign,
7  ReflectApply,
8  SafeMap,
9} = primordials;
10
11const assert = require('internal/assert');
12const path = require('path');
13const EventEmitter = require('events');
14const { owner_symbol } = require('internal/async_hooks').symbols;
15const Worker = require('internal/cluster/worker');
16const { internal, sendHelper } = require('internal/cluster/utils');
// The module's export: an EventEmitter carrying the worker-side cluster API.
const cluster = new EventEmitter();
// Listening handles owned by this worker, keyed by `message.key` from the
// master's reply.
const handles = new SafeMap();
// Per-address counters used to build the `index` sent with `queryServer`.
const indexes = new SafeMap();
// Shared do-nothing function, used for ref()/unref() on faux handles.
const noop = FunctionPrototype;

module.exports = cluster;

// This file is the worker-side implementation, so the role flags are fixed.
// `cluster.worker` stays null until `cluster._setupWorker()` runs.
ObjectAssign(cluster, {
  isWorker: true,
  isMaster: false,
  worker: null,
  Worker,
});
28
/**
 * Initializes the worker side of the cluster.
 *
 * Creates the `Worker` object for this process (id taken from the
 * `NODE_UNIQUE_ID` environment variable), stores it on `cluster.worker`,
 * hooks the IPC channel events and finally tells the master that the worker
 * is online.
 */
cluster._setupWorker = function() {
  const worker = new Worker({
    id: +process.env.NODE_UNIQUE_ID | 0,
    process,
    state: 'online',
  });

  cluster.worker = worker;

  // Runs once when the IPC channel goes away.
  function onDisconnect() {
    worker.emit('disconnect');

    if (worker.exitedAfterDisconnect)
      return;

    // Unexpected disconnect, master exited, or some such nastiness, so
    // worker exits immediately.
    process.exit(0);
  }

  // Dispatches the internal control messages this worker understands.
  function onmessage(message, handle) {
    switch (message.act) {
      case 'newconn':
        onconnection(message, handle);
        break;
      case 'disconnect':
        // Master-initiated disconnect: run _disconnect with the worker as
        // `this` and masterInitiated = true.
        ReflectApply(_disconnect, worker, [true]);
        break;
    }
  }

  process.once('disconnect', onDisconnect);
  process.on('internalMessage', internal(worker, onmessage));
  send({ act: 'online' });
};
58
/**
 * Asks the master for a listening handle on behalf of `obj`.
 *
 * `obj` is a net#Server or a dgram#Socket object.
 *
 * Sends a `queryServer` message built from `options`. When the reply carries
 * a real handle it is treated as a shared listen socket; otherwise a
 * round-robin faux handle is created. Once `obj` emits 'listening', the
 * master is notified with a `listening` message.
 *
 * @param {object} obj Server-like object; must provide once() and address().
 * @param {object} options Listen options (address, port, addressType, fd are
 *   read here; all own properties are forwarded to the master).
 * @param {Function} cb Called as cb(errno, handle) by shared() or rr().
 */
cluster._getServer = function(obj, options, cb) {
  const requestedAddress = options.address;

  // Resolve unix socket paths to absolute paths
  const isUnixSocketPath = options.port < 0 &&
                           typeof requestedAddress === 'string' &&
                           process.platform !== 'win32';
  const address = isUnixSocketPath ?
    path.resolve(requestedAddress) :
    requestedAddress;

  const indexesKey = ArrayPrototypeJoin(
    [address, options.port, options.addressType, options.fd],
    ':'
  );

  // First request for this key gets index 0, later ones count upwards.
  const previousIndex = indexes.get(indexesKey);
  const index = previousIndex === undefined ? 0 : previousIndex + 1;
  indexes.set(indexesKey, index);

  // `address` comes last so the (possibly resolved) value wins over the one
  // copied from `options`.
  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options,
    address,
  };

  // Set custom data on handle (i.e. tls tickets key)
  if (obj._getServerData)
    message.data = obj._getServerData();

  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (handle) {
      // Shared listen socket.
      shared(reply, handle, indexesKey, cb);
    } else {
      // Round-robin.
      rr(reply, indexesKey, cb);
    }
  });

  obj.once('listening', () => {
    cluster.worker.state = 'listening';

    // Report the port actually bound when available, else the requested one.
    const boundAddress = obj.address();
    message.act = 'listening';
    message.port = (boundAddress && boundAddress.port) || options.port;
    send(message);
  });
};
116
/**
 * Shared listen socket.
 *
 * Registers a real handle received from the master and wraps its close()
 * so the master and the local bookkeeping learn when it is closed.
 *
 * @param {object} message Reply from the master; `key` and `errno` are read.
 * @param {object} handle Handle delivered with the reply.
 * @param {string} indexesKey Key of the entry to drop from `indexes` on close.
 * @param {Function} cb Called as cb(message.errno, handle).
 */
function shared(message, handle, indexesKey, cb) {
  const { key } = message;
  const originalClose = handle.close;

  // Monkey-patch the close() method so we can keep track of when it's
  // closed. Avoids resource leaks when the handle is short-lived.
  handle.close = function(...args) {
    send({ act: 'close', key });
    handles.delete(key);
    indexes.delete(indexesKey);
    return ReflectApply(originalClose, handle, args);
  };

  assert(!handles.has(key));
  handles.set(key, handle);
  cb(message.errno, handle);
}
134
/**
 * Round-robin. Master distributes handles across workers.
 *
 * Builds a faux handle for the server, registers it under the key from the
 * master's reply and passes it to `cb`. On a non-zero `message.errno` the
 * callback receives that errno and a null handle instead.
 *
 * @param {object} message Reply from the master; `errno`, `key` and
 *   `sockname` are read.
 * @param {string} indexesKey Key of the entry to drop from `indexes` on close.
 * @param {Function} cb Called as cb(errno, null) or cb(0, handle).
 */
function rr(message, indexesKey, cb) {
  if (message.errno)
    return cb(message.errno, null);

  // Reset to undefined once the faux handle has been closed.
  let activeKey = message.key;

  // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
  // with it. Fools net.Server into thinking that it's backed by a real
  // handle. Use a noop function for ref() and unref() because the control
  // channel is going to keep the worker alive anyway.
  const handle = {
    close() {
      // lib/net.js treats server._handle.close() as effectively synchronous.
      // That means there is a time window between the call to close() and
      // the ack by the master process in which we can still receive handles.
      // onconnection() below handles that by sending those handles back to
      // the master.
      if (activeKey === undefined)
        return;

      send({ act: 'close', key: activeKey });
      handles.delete(activeKey);
      indexes.delete(indexesKey);
      activeKey = undefined;
    },

    listen(backlog) {
      // TODO(bnoordhuis) Send a message to the master that tells it to
      // update the backlog size. The actual backlog should probably be
      // the largest requested size by any worker.
      return 0;
    },

    ref: noop,
    unref: noop,
  };

  if (message.sockname) {
    // TCP handles only. Copies the socket name reported by the master into
    // `out` while the handle is still open; always reports success.
    handle.getsockname = function getsockname(out) {
      if (activeKey)
        ObjectAssign(out, message.sockname);

      return 0;
    };
  }

  assert(!handles.has(activeKey));
  handles.set(activeKey, handle);
  cb(0, handle);
}
185
/**
 * Round-robin connection.
 *
 * Handles a `newconn` message: looks up the local server handle registered
 * under `message.key`, acknowledges the message to the master (telling it
 * whether the connection was accepted) and, when accepted, passes the
 * connection handle to the server's onconnection().
 *
 * @param {object} message Control message; `key` and `seq` are read.
 * @param {object} [handle] Connection handle delivered with the message.
 */
function onconnection(message, handle) {
  const key = message.key;
  const server = handles.get(key);
  const accepted = server !== undefined;

  send({ ack: message.seq, accepted });

  if (accepted) {
    server.onconnection(0, handle);
  } else if (handle) {
    // No server is registered for this key (e.g. it was closed while the
    // message was in flight). The refusal has been reported above, so this
    // worker has no further use for its copy of the handle: close it instead
    // of leaving it open until garbage collection.
    // NOTE(review): assumes the master keeps its own copy and re-dispatches
    // the connection after a refusal - confirm against the master side.
    handle.close();
  }
}
197
/**
 * Sends an internal cluster message over this process's IPC channel.
 *
 * @param {object} message Message to send.
 * @param {Function} [cb] Optional callback forwarded to sendHelper().
 * @returns {*} Whatever sendHelper() returns.
 */
function send(message, cb) {
  // This wrapper never attaches a handle to outgoing messages.
  const noHandle = null;
  return sendHelper(process, message, noHandle, cb);
}
201
/**
 * Closes every registered handle and then disconnects the IPC channel.
 *
 * Must be invoked with the worker as `this`; marks it as
 * `exitedAfterDisconnect`.
 *
 * @param {boolean} [masterInitiated] True when the master requested the
 *   disconnect.
 */
function _disconnect(masterInitiated) {
  this.exitedAfterDisconnect = true;

  // Starts at 1 for the explicit checkWaitingCount() call at the end, so the
  // channel is closed even when there are no handles at all.
  let waitingCount = 1;

  function checkWaitingCount() {
    waitingCount--;

    if (waitingCount !== 0)
      return;

    // If disconnect is worker initiated, wait for ack to be sure
    // exitedAfterDisconnect is properly set in the master, otherwise, if
    // it's master initiated there's no need to send the
    // exitedAfterDisconnect message
    if (masterInitiated) {
      process.disconnect();
      return;
    }

    send({ act: 'exitedAfterDisconnect' }, () => process.disconnect());
  }

  handles.forEach((handle) => {
    waitingCount++;

    // Close through the owning object when the handle has one, otherwise
    // close the handle itself.
    const closable = handle[owner_symbol] ? handle[owner_symbol] : handle;
    closable.close(checkWaitingCount);
  });

  handles.clear();
  checkWaitingCount();
}
234
// Extend generic Worker with methods specific to worker processes.

/**
 * Starts a worker-initiated disconnect unless one is already under way
 * (state 'disconnecting') or the worker is being destroyed.
 *
 * @returns {Worker} This worker, for chaining.
 */
Worker.prototype.disconnect = function() {
  const alreadyShuttingDown = this.state === 'disconnecting' ||
                              this.state === 'destroying';
  if (alreadyShuttingDown)
    return this;

  this.state = 'disconnecting';
  ReflectApply(_disconnect, this, []);

  return this;
};
244
/**
 * Terminates this worker process.
 *
 * Does nothing when a destroy is already in progress. If the IPC channel is
 * still connected, the master is first told that the exit is intentional and
 * the process exits once the channel has disconnected; otherwise the process
 * exits immediately.
 */
Worker.prototype.destroy = function() {
  if (this.state === 'destroying')
    return;

  this.exitedAfterDisconnect = true;

  if (this.isConnected()) {
    this.state = 'destroying';
    send({ act: 'exitedAfterDisconnect' }, () => process.disconnect());
    process.once('disconnect', () => process.exit(0));
    return;
  }

  process.exit(0);
};
258