• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  ArrayIsArray,
5  ArrayPrototypePush,
6  ArrayPrototypeShift,
7  Boolean,
8  SafeMap,
9} = primordials;
10
11const assert = require('internal/assert');
12const net = require('net');
13const { sendHelper } = require('internal/cluster/utils');
14const { constants } = internalBinding('tcp_wrap');
15
16module.exports = RoundRobinHandle;
17
18function RoundRobinHandle(key, address, { port, fd, flags }) {
19  this.key = key;
20  this.all = new SafeMap();
21  this.free = new SafeMap();
22  this.handles = [];
23  this.handle = null;
24  this.server = net.createServer(assert.fail);
25
26  if (fd >= 0)
27    this.server.listen({ fd });
28  else if (port >= 0) {
29    this.server.listen({
30      port,
31      host: address,
32      // Currently, net module only supports `ipv6Only` option in `flags`.
33      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
34    });
35  } else
36    this.server.listen(address);  // UNIX socket path.
37
38  this.server.once('listening', () => {
39    this.handle = this.server._handle;
40    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
41    this.server._handle = null;
42    this.server = null;
43  });
44}
45
46RoundRobinHandle.prototype.add = function(worker, send) {
47  assert(this.all.has(worker.id) === false);
48  this.all.set(worker.id, worker);
49
50  const done = () => {
51    if (this.handle.getsockname) {
52      const out = {};
53      this.handle.getsockname(out);
54      // TODO(bnoordhuis) Check err.
55      send(null, { sockname: out }, null);
56    } else {
57      send(null, null, null);  // UNIX socket.
58    }
59
60    this.handoff(worker);  // In case there are connections pending.
61  };
62
63  if (this.server === null)
64    return done();
65
66  // Still busy binding.
67  this.server.once('listening', done);
68  this.server.once('error', (err) => {
69    send(err.errno, null);
70  });
71};
72
73RoundRobinHandle.prototype.remove = function(worker) {
74  const existed = this.all.delete(worker.id);
75
76  if (!existed)
77    return false;
78
79  this.free.delete(worker.id);
80
81  if (this.all.size !== 0)
82    return false;
83
84  for (const handle of this.handles) {
85    handle.close();
86  }
87  this.handles = [];
88
89  this.handle.close();
90  this.handle = null;
91  return true;
92};
93
94RoundRobinHandle.prototype.distribute = function(err, handle) {
95  ArrayPrototypePush(this.handles, handle);
96  const [ workerEntry ] = this.free; // this.free is a SafeMap
97
98  if (ArrayIsArray(workerEntry)) {
99    const { 0: workerId, 1: worker } = workerEntry;
100    this.free.delete(workerId);
101    this.handoff(worker);
102  }
103};
104
105RoundRobinHandle.prototype.handoff = function(worker) {
106  if (!this.all.has(worker.id)) {
107    return;  // Worker is closing (or has closed) the server.
108  }
109
110  const handle = ArrayPrototypeShift(this.handles);
111
112  if (handle === undefined) {
113    this.free.set(worker.id, worker);  // Add to ready queue again.
114    return;
115  }
116
117  const message = { act: 'newconn', key: this.key };
118
119  sendHelper(worker.process, message, handle, (reply) => {
120    if (reply.accepted)
121      handle.close();
122    else
123      this.distribute(0, handle);  // Worker is shutting down. Send to another.
124
125    this.handoff(worker);
126  });
127};
128