'use strict';

const {
  ArrayPrototypeJoin,
  FunctionPrototype,
  ObjectAssign,
  ReflectApply,
  SafeMap,
  SafeSet,
} = primordials;

const assert = require('internal/assert');
const path = require('path');
const EventEmitter = require('events');
const { owner_symbol } = require('internal/async_hooks').symbols;
const Worker = require('internal/cluster/worker');
const { internal, sendHelper } = require('internal/cluster/utils');
const { TIMEOUT_MAX } = require('internal/timers');
const { setInterval, clearInterval } = require('timers');

// Module-wide state shared by the functions in this file.
const cluster = new EventEmitter();
// Handles currently tracked by this worker, looked up by their `key`.
const handles = new SafeMap();
// For each listen target: the next index to hand out and the indexes in use.
const indexes = new SafeMap();
const noop = FunctionPrototype;

module.exports = cluster;

cluster.isWorker = true;
cluster.isMaster = false; // Deprecated alias. Must be same as isPrimary.
cluster.isPrimary = false;
cluster.worker = null;
cluster.Worker = Worker;

/**
 * Builds the Worker object for this process, publishes it as
 * `cluster.worker`, installs the `disconnect` and `internalMessage`
 * listeners on `process`, and finally sends an `online` message.
 */
cluster._setupWorker = function() {
  // The worker id is taken from the environment and coerced to an integer.
  const uniqueId = +process.env.NODE_UNIQUE_ID | 0;
  const worker = new Worker({
    id: uniqueId,
    process: process,
    state: 'online',
  });

  cluster.worker = worker;

  // Reacts to the two message kinds handled here; any other `act` value
  // is ignored.
  function onmessage(message, handle) {
    switch (message.act) {
      case 'newconn':
        onconnection(message, handle);
        break;
      case 'disconnect':
        // `true` flags the disconnect as initiated by the primary.
        ReflectApply(_disconnect, worker, [true]);
        break;
    }
  }

  process.once('disconnect', () => {
    worker.emit('disconnect');

    // Unexpected disconnect, primary exited, or some such nastiness, so
    // worker exits immediately.
    if (!worker.exitedAfterDisconnect)
      process.exit(0);
  });

  process.on('internalMessage', internal(worker, onmessage));
  send({ act: 'online' });
};

// `obj` is a net#Server or a dgram#Socket object.
/**
 * Sends a `queryServer` request describing where `obj` wants to listen and,
 * once the reply arrives, hands the resulting handle to `cb` through either
 * `shared()` (a handle came with the reply) or `rr()` (no handle).
 * When `obj` later emits `listening`, a `listening` message is sent as well.
 *
 * @param {object} obj Object that will listen; its optional
 *   `_getServerData()` / `_setServerData()` hooks carry custom data.
 * @param {object} options Listen options; `address`, `port`, `addressType`
 *   and `fd` are read here and every own property is copied into the message.
 * @param {Function} cb Invoked as `cb(errno, handle)`.
 */
cluster._getServer = function(obj, options, cb) {
  let address = options.address;

  // Resolve unix socket paths to absolute paths
  const needsResolving = options.port < 0 &&
                         typeof address === 'string' &&
                         process.platform !== 'win32';
  if (needsResolving)
    address = path.resolve(address);

  const indexesKey = ArrayPrototypeJoin(
    [address, options.port, options.addressType, options.fd],
    ':');

  // Create the bookkeeping entry on first use of this key, then reserve the
  // next free index for this listen request.
  if (!indexes.has(indexesKey))
    indexes.set(indexesKey, { nextIndex: 0, set: new SafeSet() });
  const indexSet = indexes.get(indexesKey);

  const index = indexSet.nextIndex++;
  indexSet.set.add(index);

  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options,
    // Listed after the spread so the (possibly resolved) address wins over
    // `options.address`.
    address,
  };

  // Set custom data on handle (i.e. tls tickets key)
  if (obj._getServerData)
    message.data = obj._getServerData();

  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (!handle) {
      // Round-robin.
      rr(reply, { indexesKey, index }, cb);
      return;
    }

    // Shared listen socket
    shared(reply, { handle, indexesKey, index }, cb);
  });

  obj.once('listening', () => {
    // short-lived sockets might have been closed
    if (!indexes.has(indexesKey))
      return;

    cluster.worker.state = 'listening';
    const boundAddress = obj.address();
    // The same message object is reused, now flagged as `listening` and
    // carrying the bound port when `obj.address()` reports one.
    message.act = 'listening';
    message.port = (boundAddress && boundAddress.port) || options.port;
    send(message);
  });
};

/**
 * Marks `index` as no longer in use for `indexesKey`; when that was the last
 * index in use, the whole entry is removed from `indexes`.
 * Does nothing when `indexesKey` has no entry.
 */
function removeIndexesKey(indexesKey, index) {
  const entry = indexes.get(indexesKey);
  if (!entry)
    return;

  const inUse = entry.set;
  inUse.delete(index);
  if (inUse.size === 0)
    indexes.delete(indexesKey);
}

// Shared listen socket.
/**
 * Registers a handle that arrived with the reply under `message.key` and
 * passes it to `cb` together with `message.errno`.
 */
function shared(message, { handle, indexesKey, index }, cb) {
  const { key } = message;
  // Monkey-patch the close() method so we can keep track of when it's
  // closed. Avoids resource leaks when the handle is short-lived.
  const originalClose = handle.close;

  handle.close = function(...args) {
    send({ act: 'close', key });
    handles.delete(key);
    removeIndexesKey(indexesKey, index);
    return ReflectApply(originalClose, handle, args);
  };
  assert(!handles.has(key));
  handles.set(key, handle);
  cb(message.errno, handle);
}

/**
 * Round-robin. The primary distributes handles across workers, so no real
 * handle exists here; a faux handle is registered under `message.key` and
 * passed to `cb` instead. On `message.errno`, `cb` receives the error and
 * `null`.
 */
function rr(message, { indexesKey, index }, cb) {
  if (message.errno)
    return cb(message.errno, null);

  // Reset to `undefined` by close() so a second close() is a no-op.
  let key = message.key;

  // Interval that stands in for the handle's ref'd state; null when unref'd.
  let keepAliveTimer = null;

  function ref() {
    if (keepAliveTimer)
      return;
    keepAliveTimer = setInterval(noop, TIMEOUT_MAX);
  }

  function unref() {
    if (!keepAliveTimer)
      return;
    clearInterval(keepAliveTimer);
    keepAliveTimer = null;
  }

  function listen(backlog) {
    // TODO(bnoordhuis) Send a message to the primary that tells it to
    // update the backlog size. The actual backlog should probably be
    // the largest requested size by any worker.
    return 0;
  }

  function close() {
    // lib/net.js treats server._handle.close() as effectively synchronous.
    // That means there is a time window between the call to close() and
    // the ack by the primary process in which we can still receive handles.
    // onconnection() below handles that by sending those handles back to
    // the primary.
    if (key === undefined)
      return;
    unref();
    // If the handle is the last handle in process,
    // the parent process will delete the handle when worker process exits.
    // So it is ok if the close message get lost.
    // See the comments of https://github.com/nodejs/node/pull/46161
    send({ act: 'close', key });
    handles.delete(key);
    removeIndexesKey(indexesKey, index);
    key = undefined;
  }

  function getsockname(out) {
    // Only fill in `out` while the handle has not been closed.
    if (key)
      ObjectAssign(out, message.sockname);

    return 0;
  }

  // Faux handle. net.Server is not associated with handle,
  // so we control its state(ref or unref) by setInterval.
  const handle = { close, listen, ref, unref };
  handle.ref();
  if (message.sockname)
    handle.getsockname = getsockname; // TCP handles only.

  assert(!handles.has(key));
  handles.set(key, handle);
  cb(0, handle);
}

/**
 * Round-robin connection. Looks up the handle registered under
 * `message.key`, acknowledges `message.seq` with an `accepted` flag, then
 * either passes the connection on or closes it.
 */
function onconnection(message, handle) {
  const server = handles.get(message.key);
  let accepted = false;

  if (server !== undefined) {
    // Refuse when the owner has a connection limit and has reached it.
    const owner = server[owner_symbol];
    accepted = !(owner &&
                 owner.maxConnections &&
                 owner._connections >= owner.maxConnections);
  }

  send({ ack: message.seq, accepted });

  if (!accepted) {
    handle.close();
    return;
  }

  server.onconnection(0, handle);
}

/**
 * Forwards `message` (and the optional reply callback `cb`) to `sendHelper`
 * using this process and no handle.
 */
function send(message, cb) {
  return sendHelper(process, message, null, cb);
}

/**
 * Closes every tracked handle and disconnects the process once all of them
 * have reported back. Must be called with the Worker as `this`.
 *
 * @param {boolean} [primaryInitiated] Truthy when the primary asked for the
 *   disconnect, in which case no `exitedAfterDisconnect` message is sent.
 */
function _disconnect(primaryInitiated) {
  this.exitedAfterDisconnect = true;

  // Starts at one so the counter cannot reach zero until the final call at
  // the bottom of this function, after every handle was asked to close.
  let pending = 1;

  function releaseOne() {
    pending--;

    if (pending !== 0)
      return;

    // If disconnect is worker initiated, wait for ack to be sure
    // exitedAfterDisconnect is properly set in the primary, otherwise, if
    // it's primary initiated there's no need to send the
    // exitedAfterDisconnect message
    if (primaryInitiated) {
      process.disconnect();
      return;
    }

    send({ act: 'exitedAfterDisconnect' }, () => process.disconnect());
  }

  handles.forEach((handle) => {
    pending++;

    // Prefer closing through the owning object when the handle has one.
    const owner = handle[owner_symbol];
    if (owner)
      owner.close(releaseOne);
    else
      handle.close(releaseOne);
  });

  handles.clear();
  releaseOne();
}

// Extend generic Worker with methods specific to worker processes.

/**
 * Starts a worker-initiated disconnect unless one is already in progress or
 * the worker is being destroyed.
 * @returns {Worker} This worker.
 */
Worker.prototype.disconnect = function() {
  if (this.state === 'disconnecting' || this.state === 'destroying')
    return this;

  this.state = 'disconnecting';
  ReflectApply(_disconnect, this, []);

  return this;
};

/**
 * Terminates this worker process with exit code 0. When still connected,
 * an `exitedAfterDisconnect` message is sent first and the exit happens
 * after the process has disconnected.
 */
Worker.prototype.destroy = function() {
  if (this.state === 'destroying')
    return;

  this.exitedAfterDisconnect = true;

  if (!this.isConnected()) {
    process.exit(0);
    return;
  }

  this.state = 'destroying';
  send({ act: 'exitedAfterDisconnect' }, () => process.disconnect());
  process.once('disconnect', () => process.exit(0));
};