1'use strict'; 2 3const { 4 Map, 5 ObjectAssign, 6} = primordials; 7 8const assert = require('internal/assert'); 9const path = require('path'); 10const EventEmitter = require('events'); 11const { owner_symbol } = require('internal/async_hooks').symbols; 12const Worker = require('internal/cluster/worker'); 13const { internal, sendHelper } = require('internal/cluster/utils'); 14const cluster = new EventEmitter(); 15const handles = new Map(); 16const indexes = new Map(); 17const noop = () => {}; 18 19module.exports = cluster; 20 21cluster.isWorker = true; 22cluster.isMaster = false; 23cluster.worker = null; 24cluster.Worker = Worker; 25 26cluster._setupWorker = function() { 27 const worker = new Worker({ 28 id: +process.env.NODE_UNIQUE_ID | 0, 29 process: process, 30 state: 'online' 31 }); 32 33 cluster.worker = worker; 34 35 process.once('disconnect', () => { 36 worker.emit('disconnect'); 37 38 if (!worker.exitedAfterDisconnect) { 39 // Unexpected disconnect, master exited, or some such nastiness, so 40 // worker exits immediately. 41 process.exit(0); 42 } 43 }); 44 45 process.on('internalMessage', internal(worker, onmessage)); 46 send({ act: 'online' }); 47 48 function onmessage(message, handle) { 49 if (message.act === 'newconn') 50 onconnection(message, handle); 51 else if (message.act === 'disconnect') 52 _disconnect.call(worker, true); 53 } 54}; 55 56// `obj` is a net#Server or a dgram#Socket object. 57cluster._getServer = function(obj, options, cb) { 58 let address = options.address; 59 60 // Resolve unix socket paths to absolute paths 61 if (options.port < 0 && typeof address === 'string' && 62 process.platform !== 'win32') 63 address = path.resolve(address); 64 65 const indexesKey = [address, 66 options.port, 67 options.addressType, 68 options.fd ].join(':'); 69 70 let index = indexes.get(indexesKey); 71 72 if (index === undefined) 73 index = 0; 74 else 75 index++; 76 77 indexes.set(indexesKey, index); 78 79 const message = { 80 act: 'queryServer', 81 index, 82 data: null, 83 ...options 84 }; 85 86 message.address = address; 87 88 // Set custom data on handle (i.e. tls tickets key) 89 if (obj._getServerData) 90 message.data = obj._getServerData(); 91 92 send(message, (reply, handle) => { 93 if (typeof obj._setServerData === 'function') 94 obj._setServerData(reply.data); 95 96 if (handle) 97 shared(reply, handle, indexesKey, cb); // Shared listen socket. 98 else 99 rr(reply, indexesKey, cb); // Round-robin. 100 }); 101 102 obj.once('listening', () => { 103 cluster.worker.state = 'listening'; 104 const address = obj.address(); 105 message.act = 'listening'; 106 message.port = (address && address.port) || options.port; 107 send(message); 108 }); 109}; 110 111// Shared listen socket. 112function shared(message, handle, indexesKey, cb) { 113 const key = message.key; 114 // Monkey-patch the close() method so we can keep track of when it's 115 // closed. Avoids resource leaks when the handle is short-lived. 116 const close = handle.close; 117 118 handle.close = function() { 119 send({ act: 'close', key }); 120 handles.delete(key); 121 indexes.delete(indexesKey); 122 return close.apply(handle, arguments); 123 }; 124 assert(handles.has(key) === false); 125 handles.set(key, handle); 126 cb(message.errno, handle); 127} 128 129// Round-robin. Master distributes handles across workers. 130function rr(message, indexesKey, cb) { 131 if (message.errno) 132 return cb(message.errno, null); 133 134 let key = message.key; 135 136 function listen(backlog) { 137 // TODO(bnoordhuis) Send a message to the master that tells it to 138 // update the backlog size. The actual backlog should probably be 139 // the largest requested size by any worker. 140 return 0; 141 } 142 143 function close() { 144 // lib/net.js treats server._handle.close() as effectively synchronous. 145 // That means there is a time window between the call to close() and 146 // the ack by the master process in which we can still receive handles. 147 // onconnection() below handles that by sending those handles back to 148 // the master. 149 if (key === undefined) 150 return; 151 152 send({ act: 'close', key }); 153 handles.delete(key); 154 indexes.delete(indexesKey); 155 key = undefined; 156 } 157 158 function getsockname(out) { 159 if (key) 160 ObjectAssign(out, message.sockname); 161 162 return 0; 163 } 164 165 // Faux handle. Mimics a TCPWrap with just enough fidelity to get away 166 // with it. Fools net.Server into thinking that it's backed by a real 167 // handle. Use a noop function for ref() and unref() because the control 168 // channel is going to keep the worker alive anyway. 169 const handle = { close, listen, ref: noop, unref: noop }; 170 171 if (message.sockname) { 172 handle.getsockname = getsockname; // TCP handles only. 173 } 174 175 assert(handles.has(key) === false); 176 handles.set(key, handle); 177 cb(0, handle); 178} 179 180// Round-robin connection. 181function onconnection(message, handle) { 182 const key = message.key; 183 const server = handles.get(key); 184 const accepted = server !== undefined; 185 186 send({ ack: message.seq, accepted }); 187 188 if (accepted) 189 server.onconnection(0, handle); 190} 191 192function send(message, cb) { 193 return sendHelper(process, message, null, cb); 194} 195 196function _disconnect(masterInitiated) { 197 this.exitedAfterDisconnect = true; 198 let waitingCount = 1; 199 200 function checkWaitingCount() { 201 waitingCount--; 202 203 if (waitingCount === 0) { 204 // If disconnect is worker initiated, wait for ack to be sure 205 // exitedAfterDisconnect is properly set in the master, otherwise, if 206 // it's master initiated there's no need to send the 207 // exitedAfterDisconnect message 208 if (masterInitiated) { 209 process.disconnect(); 210 } else { 211 send({ act: 'exitedAfterDisconnect' }, () => process.disconnect()); 212 } 213 } 214 } 215 216 handles.forEach((handle) => { 217 waitingCount++; 218 219 if (handle[owner_symbol]) 220 handle[owner_symbol].close(checkWaitingCount); 221 else 222 handle.close(checkWaitingCount); 223 }); 224 225 handles.clear(); 226 checkWaitingCount(); 227} 228 229// Extend generic Worker with methods specific to worker processes. 230Worker.prototype.disconnect = function() { 231 if (![ 'disconnecting', 'destroying' ].includes(this.state)) { 232 this.state = 'disconnecting'; 233 _disconnect.call(this); 234 } 235 236 return this; 237}; 238 239Worker.prototype.destroy = function() { 240 if (this.state === 'destroying') 241 return; 242 243 this.exitedAfterDisconnect = true; 244 if (!this.isConnected()) { 245 process.exit(0); 246 } else { 247 this.state = 'destroying'; 248 send({ act: 'exitedAfterDisconnect' }, () => process.disconnect()); 249 process.once('disconnect', () => process.exit(0)); 250 } 251}; 252