• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  ArrayIsArray,
5  Boolean,
6  Map,
7} = primordials;
8
9const assert = require('internal/assert');
10const net = require('net');
11const { sendHelper } = require('internal/cluster/utils');
12const uv = internalBinding('uv');
13const { constants } = internalBinding('tcp_wrap');
14
15module.exports = RoundRobinHandle;
16
17function RoundRobinHandle(key, address, { port, fd, flags }) {
18  this.key = key;
19  this.all = new Map();
20  this.free = new Map();
21  this.handles = [];
22  this.handle = null;
23  this.server = net.createServer(assert.fail);
24
25  if (fd >= 0)
26    this.server.listen({ fd });
27  else if (port >= 0) {
28    this.server.listen({
29      port,
30      host: address,
31      // Currently, net module only supports `ipv6Only` option in `flags`.
32      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
33    });
34  } else
35    this.server.listen(address);  // UNIX socket path.
36
37  this.server.once('listening', () => {
38    this.handle = this.server._handle;
39    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
40    this.server._handle = null;
41    this.server = null;
42  });
43}
44
45RoundRobinHandle.prototype.add = function(worker, send) {
46  assert(this.all.has(worker.id) === false);
47  this.all.set(worker.id, worker);
48
49  const done = () => {
50    if (this.handle.getsockname) {
51      const out = {};
52      this.handle.getsockname(out);
53      // TODO(bnoordhuis) Check err.
54      send(null, { sockname: out }, null);
55    } else {
56      send(null, null, null);  // UNIX socket.
57    }
58
59    this.handoff(worker);  // In case there are connections pending.
60  };
61
62  if (this.server === null)
63    return done();
64
65  // Still busy binding.
66  this.server.once('listening', done);
67  this.server.once('error', (err) => {
68    // Hack: translate 'EADDRINUSE' error string back to numeric error code.
69    // It works but ideally we'd have some backchannel between the net and
70    // cluster modules for stuff like this.
71    send(uv[`UV_${err.errno}`], null);
72  });
73};
74
75RoundRobinHandle.prototype.remove = function(worker) {
76  const existed = this.all.delete(worker.id);
77
78  if (!existed)
79    return false;
80
81  this.free.delete(worker.id);
82
83  if (this.all.size !== 0)
84    return false;
85
86  for (const handle of this.handles) {
87    handle.close();
88  }
89  this.handles = [];
90
91  this.handle.close();
92  this.handle = null;
93  return true;
94};
95
96RoundRobinHandle.prototype.distribute = function(err, handle) {
97  this.handles.push(handle);
98  const [ workerEntry ] = this.free;
99
100  if (ArrayIsArray(workerEntry)) {
101    const [ workerId, worker ] = workerEntry;
102    this.free.delete(workerId);
103    this.handoff(worker);
104  }
105};
106
107RoundRobinHandle.prototype.handoff = function(worker) {
108  if (!this.all.has(worker.id)) {
109    return;  // Worker is closing (or has closed) the server.
110  }
111
112  const handle = this.handles.shift();
113
114  if (handle === undefined) {
115    this.free.set(worker.id, worker);  // Add to ready queue again.
116    return;
117  }
118
119  const message = { act: 'newconn', key: this.key };
120
121  sendHelper(worker.process, message, handle, (reply) => {
122    if (reply.accepted)
123      handle.close();
124    else
125      this.distribute(0, handle);  // Worker is shutting down. Send to another.
126
127    this.handoff(worker);
128  });
129};
130