1'use strict'; 2 3const { 4 ArrayIsArray, 5 ArrayPrototypePush, 6 ArrayPrototypeShift, 7 Boolean, 8 SafeMap, 9} = primordials; 10 11const assert = require('internal/assert'); 12const net = require('net'); 13const { sendHelper } = require('internal/cluster/utils'); 14const { constants } = internalBinding('tcp_wrap'); 15 16module.exports = RoundRobinHandle; 17 18function RoundRobinHandle(key, address, { port, fd, flags }) { 19 this.key = key; 20 this.all = new SafeMap(); 21 this.free = new SafeMap(); 22 this.handles = []; 23 this.handle = null; 24 this.server = net.createServer(assert.fail); 25 26 if (fd >= 0) 27 this.server.listen({ fd }); 28 else if (port >= 0) { 29 this.server.listen({ 30 port, 31 host: address, 32 // Currently, net module only supports `ipv6Only` option in `flags`. 33 ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY), 34 }); 35 } else 36 this.server.listen(address); // UNIX socket path. 37 38 this.server.once('listening', () => { 39 this.handle = this.server._handle; 40 this.handle.onconnection = (err, handle) => this.distribute(err, handle); 41 this.server._handle = null; 42 this.server = null; 43 }); 44} 45 46RoundRobinHandle.prototype.add = function(worker, send) { 47 assert(this.all.has(worker.id) === false); 48 this.all.set(worker.id, worker); 49 50 const done = () => { 51 if (this.handle.getsockname) { 52 const out = {}; 53 this.handle.getsockname(out); 54 // TODO(bnoordhuis) Check err. 55 send(null, { sockname: out }, null); 56 } else { 57 send(null, null, null); // UNIX socket. 58 } 59 60 this.handoff(worker); // In case there are connections pending. 61 }; 62 63 if (this.server === null) 64 return done(); 65 66 // Still busy binding. 67 this.server.once('listening', done); 68 this.server.once('error', (err) => { 69 send(err.errno, null); 70 }); 71}; 72 73RoundRobinHandle.prototype.remove = function(worker) { 74 const existed = this.all.delete(worker.id); 75 76 if (!existed) 77 return false; 78 79 this.free.delete(worker.id); 80 81 if (this.all.size !== 0) 82 return false; 83 84 for (const handle of this.handles) { 85 handle.close(); 86 } 87 this.handles = []; 88 89 this.handle.close(); 90 this.handle = null; 91 return true; 92}; 93 94RoundRobinHandle.prototype.distribute = function(err, handle) { 95 ArrayPrototypePush(this.handles, handle); 96 const [ workerEntry ] = this.free; // this.free is a SafeMap 97 98 if (ArrayIsArray(workerEntry)) { 99 const { 0: workerId, 1: worker } = workerEntry; 100 this.free.delete(workerId); 101 this.handoff(worker); 102 } 103}; 104 105RoundRobinHandle.prototype.handoff = function(worker) { 106 if (!this.all.has(worker.id)) { 107 return; // Worker is closing (or has closed) the server. 108 } 109 110 const handle = ArrayPrototypeShift(this.handles); 111 112 if (handle === undefined) { 113 this.free.set(worker.id, worker); // Add to ready queue again. 114 return; 115 } 116 117 const message = { act: 'newconn', key: this.key }; 118 119 sendHelper(worker.process, message, handle, (reply) => { 120 if (reply.accepted) 121 handle.close(); 122 else 123 this.distribute(0, handle); // Worker is shutting down. Send to another. 124 125 this.handoff(worker); 126 }); 127}; 128