'use strict';

const {
  ArrayIsArray,
  Boolean,
  Map,
} = primordials;

const assert = require('internal/assert');
const net = require('net');
const { sendHelper } = require('internal/cluster/utils');
const uv = internalBinding('uv');
const { constants } = internalBinding('tcp_wrap');

module.exports = RoundRobinHandle;

/**
 * Owns one listening server and hands its incoming connection handles to
 * registered workers, one at a time.
 *
 * State kept on the instance:
 *   - `all`:     worker.id -> worker, every worker registered via `add()`.
 *   - `free`:    worker.id -> worker, workers currently waiting for a
 *                connection.
 *   - `handles`: connection handles that have not been handed off yet.
 *   - `handle`:  the server's underlying handle; `null` until the server
 *                emits 'listening'.
 *   - `server`:  the `net.Server` used for binding; set to `null` once the
 *                underlying handle has been taken over.
 *
 * @param {*} key Identifier echoed back to workers in 'newconn' messages.
 * @param {string} address Host name, or the path to listen on when neither
 *   `fd` nor `port` is usable.
 * @param {object} options
 * @param {number} [options.port] Port to listen on (used when `>= 0`).
 * @param {number} [options.fd] Existing descriptor to listen on (used when
 *   `>= 0`; takes precedence over `port`).
 * @param {number} [options.flags] Bit flags; only `UV_TCP_IPV6ONLY` is read.
 */
function RoundRobinHandle(key, address, { port, fd, flags }) {
  this.key = key;
  this.all = new Map();
  this.free = new Map();
  this.handles = [];
  this.handle = null;
  // `assert.fail` is the connection listener: `onconnection` is replaced
  // below once the server is listening, so connections are routed to
  // `distribute()` instead.
  this.server = net.createServer(assert.fail);

  if (fd >= 0)
    this.server.listen({ fd });
  else if (port >= 0) {
    this.server.listen({
      port,
      host: address,
      // Currently, net module only supports `ipv6Only` option in `flags`.
      ipv6Only: Boolean(flags & constants.UV_TCP_IPV6ONLY),
    });
  } else
    this.server.listen(address);  // UNIX socket path.

  this.server.once('listening', () => {
    // Take over the underlying handle and detach it from the net.Server so
    // that incoming connections are routed through `distribute()`.
    this.handle = this.server._handle;
    this.handle.onconnection = (err, handle) => this.distribute(err, handle);
    this.server._handle = null;
    this.server = null;
  });
}

/**
 * Registers a worker and reports the bound address back through `send` once
 * the server is listening (immediately if it already is).
 *
 * @param {object} worker Worker to register; `worker.id` must not already
 *   be registered.
 * @param {Function} send Callback invoked as `send(errno, reply, handle)`.
 * @returns {undefined}
 */
RoundRobinHandle.prototype.add = function(worker, send) {
  assert(this.all.has(worker.id) === false);
  this.all.set(worker.id, worker);

  const done = () => {
    if (this.handle.getsockname) {
      const out = {};
      this.handle.getsockname(out);
      // TODO(bnoordhuis) Check err.
      send(null, { sockname: out }, null);
    } else {
      send(null, null, null);  // UNIX socket.
    }

    this.handoff(worker);  // In case there are connections pending.
  };

  // The constructor clears `this.server` once the handle is listening.
  if (this.server === null)
    return done();

  // Still busy binding.
  this.server.once('listening', done);
  this.server.once('error', (err) => {
    // Hack: translate 'EADDRINUSE' error string back to numeric error code.
    // It works but ideally we'd have some backchannel between the net and
    // cluster modules for stuff like this.
    send(uv[`UV_${err.errno}`], null);
  });
};

/**
 * Unregisters a worker. When the last worker is removed, every queued
 * connection handle and the listening handle are closed.
 *
 * @param {object} worker Worker to unregister.
 * @returns {boolean} `true` only when this call removed the last registered
 *   worker; `false` if the worker was unknown or other workers remain.
 */
RoundRobinHandle.prototype.remove = function(worker) {
  const existed = this.all.delete(worker.id);

  if (!existed)
    return false;

  this.free.delete(worker.id);

  if (this.all.size !== 0)
    return false;

  for (const handle of this.handles) {
    handle.close();
  }
  this.handles = [];

  // `this.handle` stays null until the server emits 'listening', so the last
  // worker can be removed while the bind is still in flight.
  // NOTE(review): in that case the still-binding `this.server` is left
  // untouched here - confirm whether it should be closed as well.
  if (this.handle !== null) {
    this.handle.close();
    this.handle = null;
  }
  return true;
};

/**
 * Queues an incoming connection handle and, if a worker is waiting, starts
 * handing the oldest queued connection to it.
 *
 * @param {number} err Status reported with the connection; any truthy value
 *   causes the call to be ignored.
 * @param {object} handle Connection handle to queue.
 * @returns {undefined}
 */
RoundRobinHandle.prototype.distribute = function(err, handle) {
  // A failed accept carries no usable handle; queueing it would later break
  // `remove()`, which calls `close()` on every queued entry.
  if (err)
    return;

  this.handles.push(handle);
  // First entry of the Map in insertion order, i.e. the worker that has been
  // waiting the longest. `workerEntry` is undefined when no worker is free.
  const [ workerEntry ] = this.free;

  if (ArrayIsArray(workerEntry)) {
    const [ workerId, worker ] = workerEntry;
    this.free.delete(workerId);
    this.handoff(worker);
  }
};

/**
 * Sends the oldest queued connection to `worker`, or marks the worker as
 * free when nothing is queued. After the worker replies, the connection is
 * either closed locally (accepted) or re-queued for another worker, and the
 * worker is offered the next queued connection.
 *
 * @param {object} worker Worker to receive a connection.
 * @returns {undefined}
 */
RoundRobinHandle.prototype.handoff = function(worker) {
  if (!this.all.has(worker.id)) {
    return;  // Worker is closing (or has closed) the server.
  }

  const handle = this.handles.shift();

  if (handle === undefined) {
    this.free.set(worker.id, worker);  // Add to ready queue again.
    return;
  }

  const message = { act: 'newconn', key: this.key };

  sendHelper(worker.process, message, handle, (reply) => {
    if (reply.accepted)
      handle.close();
    else
      this.distribute(0, handle);  // Worker is shutting down. Send to another.

    this.handoff(worker);
  });
};