'use strict';

const {
  ArrayPrototypeJoin,
  FunctionPrototype,
  ObjectAssign,
  ReflectApply,
  SafeMap,
} = primordials;

const assert = require('internal/assert');
const path = require('path');
const EventEmitter = require('events');
const { owner_symbol } = require('internal/async_hooks').symbols;
const Worker = require('internal/cluster/worker');
const { internal, sendHelper } = require('internal/cluster/utils');
const cluster = new EventEmitter();
const handles = new SafeMap();
const indexes = new SafeMap();
const noop = FunctionPrototype;

module.exports = cluster;

cluster.isWorker = true;
cluster.isMaster = false;
cluster.worker = null;
cluster.Worker = Worker;

/**
 * Builds the Worker object for the current process, publishes it as
 * `cluster.worker`, installs the 'disconnect' and 'internalMessage'
 * listeners on `process`, and finally sends an `online` message.
 */
cluster._setupWorker = function() {
  const self = new Worker({
    id: +process.env.NODE_UNIQUE_ID | 0,
    process,
    state: 'online'
  });

  cluster.worker = self;

  process.once('disconnect', () => {
    self.emit('disconnect');

    // Unexpected disconnect, master exited, or some such nastiness, so
    // worker exits immediately.
    if (!self.exitedAfterDisconnect)
      process.exit(0);
  });

  process.on('internalMessage', internal(self, dispatch));
  send({ act: 'online' });

  // Routes an internal control message by its `act` field; any other
  // value is ignored.
  function dispatch(message, handle) {
    switch (message.act) {
      case 'newconn':
        onconnection(message, handle);
        break;
      case 'disconnect':
        // `true` marks the disconnect as initiated by the master.
        ReflectApply(_disconnect, self, [true]);
        break;
    }
  }
};

// `obj` is a net#Server or a dgram#Socket object.
/**
 * Sends a `queryServer` message describing the requested listen target and
 * hands the resulting handle to `cb` through `shared()` (when the reply
 * carries a handle) or `rr()` (when it does not). Once `obj` emits
 * 'listening', a follow-up `listening` message is sent.
 *
 * @param {object} obj Object that emits 'listening' and exposes `address()`;
 *   it may also provide `_getServerData()` / `_setServerData()`.
 * @param {object} options Listen options; `address`, `port`, `addressType`
 *   and `fd` form the bookkeeping key.
 * @param {Function} cb Invoked as `cb(errno, handle)`.
 */
cluster._getServer = function(obj, options, cb) {
  const requested = options.address;

  // Resolve unix socket paths to absolute paths
  const isUnixPath = options.port < 0 &&
                     typeof requested === 'string' &&
                     process.platform !== 'win32';
  const address = isUnixPath ? path.resolve(requested) : requested;

  const indexesKey = ArrayPrototypeJoin(
    [address, options.port, options.addressType, options.fd],
    ':');

  // First request for this key gets index 0; each later one gets the
  // previously stored index plus one.
  const previous = indexes.get(indexesKey);
  const index = previous === undefined ? 0 : previous + 1;
  indexes.set(indexesKey, index);

  // The (possibly resolved) address goes last so it overrides the one
  // copied from `options`.
  const message = {
    act: 'queryServer',
    index,
    data: null,
    ...options,
    address,
  };

  // Set custom data on handle (i.e. tls tickets key)
  if (obj._getServerData)
    message.data = obj._getServerData();

  send(message, (reply, handle) => {
    if (typeof obj._setServerData === 'function')
      obj._setServerData(reply.data);

    if (!handle) {
      rr(reply, indexesKey, cb); // Round-robin.
      return;
    }

    shared(reply, handle, indexesKey, cb); // Shared listen socket.
  });

  obj.once('listening', () => {
    cluster.worker.state = 'listening';
    const bound = obj.address();
    message.act = 'listening';
    // Fall back to the requested port when no bound port is reported.
    message.port = (bound && bound.port) || options.port;
    send(message);
  });
};

/**
 * Shared listen socket. Registers `handle` under `message.key` and passes it
 * to `cb` together with `message.errno`.
 */
function shared(message, handle, indexesKey, cb) {
  const { key } = message;
  // Monkey-patch the close() method so we can keep track of when it's
  // closed. Avoids resource leaks when the handle is short-lived.
  const originalClose = handle.close;

  handle.close = function(...args) {
    send({ act: 'close', key });
    handles.delete(key);
    indexes.delete(indexesKey);
    return ReflectApply(originalClose, handle, args);
  };

  assert(!handles.has(key));
  handles.set(key, handle);
  cb(message.errno, handle);
}

// Round-robin. Master distributes handles across workers.
/**
 * Builds a stand-in handle for a round-robin listen request, registers it
 * under `message.key` and passes it to `cb`. When `message.errno` is set,
 * `cb(message.errno, null)` is called instead and nothing is registered.
 */
function rr(message, indexesKey, cb) {
  if (message.errno)
    return cb(message.errno, null);

  // Reset to undefined once the faux handle has been closed.
  let key = message.key;

  // Faux handle. Mimics a TCPWrap with just enough fidelity to get away
  // with it. Fools net.Server into thinking that it's backed by a real
  // handle. Use a noop function for ref() and unref() because the control
  // channel is going to keep the worker alive anyway.
  const handle = {
    close() {
      // lib/net.js treats server._handle.close() as effectively synchronous.
      // That means there is a time window between the call to close() and
      // the ack by the master process in which we can still receive handles.
      // onconnection() below handles that by sending those handles back to
      // the master.
      if (key !== undefined) {
        send({ act: 'close', key });
        handles.delete(key);
        indexes.delete(indexesKey);
        key = undefined;
      }
    },

    listen(backlog) {
      // TODO(bnoordhuis) Send a message to the master that tells it to
      // update the backlog size. The actual backlog should probably be
      // the largest requested size by any worker.
      return 0;
    },

    ref: noop,
    unref: noop,
  };

  // TCP handles only.
  if (message.sockname) {
    handle.getsockname = function getsockname(out) {
      // Copies the socket name only while the key is still set.
      if (key)
        ObjectAssign(out, message.sockname);

      return 0;
    };
  }

  assert(!handles.has(key));
  handles.set(key, handle);
  cb(0, handle);
}

// Round-robin connection.
/**
 * Round-robin connection. Acknowledges a handed-off connection and, when a
 * server is registered under `message.key`, passes the handle to it.
 *
 * @param {object} message Control message carrying `key` and `seq`.
 * @param {object} [handle] Connection handle delivered with the message.
 */
function onconnection(message, handle) {
  const key = message.key;
  const server = handles.get(key);
  const accepted = server !== undefined;

  send({ ack: message.seq, accepted });

  if (accepted)
    server.onconnection(0, handle);
  else if (handle)
    // Nobody in this process will use the connection; close our copy of the
    // handle so its descriptor is not leaked.
    handle.close();
}

/**
 * Sends `message` over this process's channel via `sendHelper`, without a
 * handle.
 *
 * @param {object} message Message to send.
 * @param {Function} [cb] Callback forwarded to `sendHelper`.
 */
function send(message, cb) {
  return sendHelper(process, message, null, cb);
}

/**
 * Closes every tracked handle and disconnects the process once all of the
 * close callbacks have run. Must be invoked with the Worker as `this`.
 *
 * @param {boolean} [masterInitiated] True when the master requested the
 *   disconnect; in that case no `exitedAfterDisconnect` message is sent.
 */
function _disconnect(masterInitiated) {
  this.exitedAfterDisconnect = true;
  // Starts at 1 so the count cannot reach zero until the final
  // checkWaitingCount() call below, after every close has been issued.
  let waitingCount = 1;

  function checkWaitingCount() {
    waitingCount--;

    if (waitingCount === 0) {
      // If disconnect is worker initiated, wait for ack to be sure
      // exitedAfterDisconnect is properly set in the master, otherwise, if
      // it's master initiated there's no need to send the
      // exitedAfterDisconnect message
      if (masterInitiated) {
        process.disconnect();
      } else {
        send({ act: 'exitedAfterDisconnect' }, () => process.disconnect());
      }
    }
  }

  handles.forEach((handle) => {
    waitingCount++;

    // Prefer closing through the owning object when the handle has one.
    if (handle[owner_symbol])
      handle[owner_symbol].close(checkWaitingCount);
    else
      handle.close(checkWaitingCount);
  });

  handles.clear();
  checkWaitingCount();
}

// Extend generic Worker with methods specific to worker processes.

/**
 * Starts a worker-initiated disconnect unless the worker is already
 * disconnecting or being destroyed.
 *
 * @returns {Worker} This worker.
 */
Worker.prototype.disconnect = function() {
  if (this.state !== 'disconnecting' && this.state !== 'destroying') {
    this.state = 'disconnecting';
    ReflectApply(_disconnect, this, []);
  }

  return this;
};

/**
 * Terminates the worker process with exit code 0. When `isConnected()`
 * reports false the process exits at once; otherwise it sends
 * `exitedAfterDisconnect`, disconnects in the reply callback, and exits
 * when 'disconnect' fires. Does nothing if already in the 'destroying'
 * state.
 */
Worker.prototype.destroy = function() {
  if (this.state === 'destroying')
    return;

  this.exitedAfterDisconnect = true;
  if (!this.isConnected()) {
    process.exit(0);
  } else {
    this.state = 'destroying';
    send({ act: 'exitedAfterDisconnect' }, () => process.disconnect());
    process.once('disconnect', () => process.exit(0));
  }
};