'use strict';

const {
  ArrayPrototypePush,
  ArrayPrototypeSlice,
  ArrayPrototypeSome,
  ObjectKeys,
  ObjectValues,
  SafeMap,
  StringPrototypeStartsWith,
} = primordials;
const {
  codes: {
    ERR_SOCKET_BAD_PORT,
  },
} = require('internal/errors');

const assert = require('internal/assert');
const { fork } = require('child_process');
const path = require('path');
const EventEmitter = require('events');
const RoundRobinHandle = require('internal/cluster/round_robin_handle');
const SharedHandle = require('internal/cluster/shared_handle');
const Worker = require('internal/cluster/worker');
const { getInspectPort, isUsingInspector } = require('internal/util/inspector');
const { internal, sendHelper } = require('internal/cluster/utils');

// `cluster` is the public module object; `intercom` is a private emitter used
// only to signal that the last worker has been removed (see removeWorker()).
const cluster = new EventEmitter();
const intercom = new EventEmitter();
const SCHED_NONE = 1;
const SCHED_RR = 2;

module.exports = cluster;

// Registry of server handles, keyed by the string built in queryServer().
const handles = new SafeMap();
cluster.isWorker = false;
cluster.isMaster = true; // Deprecated alias. Must be same as isPrimary.
cluster.isPrimary = true;
cluster.Worker = Worker;
cluster.workers = {};
cluster.settings = {};
cluster.SCHED_NONE = SCHED_NONE; // Leave it to the operating system.
cluster.SCHED_RR = SCHED_RR; // Primary distributes connections.

// Counter used to hand out worker ids; incremented before use in fork().
let ids = 0;
// Set by the first setupPrimary() call so one-time setup runs only once.
let initialized = false;

// XXX(bnoordhuis) Fold cluster.schedulingPolicy into cluster.settings?
let schedulingPolicy = process.env.NODE_CLUSTER_SCHED_POLICY;
if (schedulingPolicy === 'rr')
  schedulingPolicy = SCHED_RR;
else if (schedulingPolicy === 'none')
  schedulingPolicy = SCHED_NONE;
else if (process.platform === 'win32') {
  // Round-robin doesn't perform well on
  // Windows due to the way IOCP is wired up.
  schedulingPolicy = SCHED_NONE;
} else
  schedulingPolicy = SCHED_RR;

cluster.schedulingPolicy = schedulingPolicy;

/**
 * Computes and stores the settings used to spawn workers.
 *
 * The new settings object is built from defaults taken from `process`, then
 * the current `cluster.settings`, then `options`; later sources win. A
 * 'setup' event carrying the settings is emitted on the next tick for every
 * call. The first call additionally freezes `cluster.schedulingPolicy` into
 * the module-local `schedulingPolicy` (asserting that it is SCHED_NONE or
 * SCHED_RR) and installs the 'internalMessage' listener on `process`.
 * @param {object} [options] Overrides merged on top of the current settings.
 */
cluster.setupPrimary = function(options) {
  const settings = {
    args: ArrayPrototypeSlice(process.argv, 2),
    exec: process.argv[1],
    execArgv: process.execArgv,
    silent: false,
    ...cluster.settings,
    ...options,
  };

  // Tell V8 to write profile data for each process to a separate file.
  // Without --logfile=v8-%p.log, everything ends up in a single, unusable
  // file. (Unusable because what V8 logs are memory addresses and each
  // process has its own memory mappings.)
  if (ArrayPrototypeSome(settings.execArgv,
                         (s) => StringPrototypeStartsWith(s, '--prof')) &&
      !ArrayPrototypeSome(settings.execArgv,
                          (s) => StringPrototypeStartsWith(s, '--logfile='))) {
    // Build a new array so the caller's (or process.execArgv's) array is not
    // mutated.
    settings.execArgv = [...settings.execArgv, '--logfile=v8-%p.log'];
  }

  cluster.settings = settings;

  // Everything below is one-time initialization.
  if (initialized === true)
    return process.nextTick(setupSettingsNT, settings);

  initialized = true;
  schedulingPolicy = cluster.schedulingPolicy; // Freeze policy.
  assert(schedulingPolicy === SCHED_NONE || schedulingPolicy === SCHED_RR,
         `Bad cluster.schedulingPolicy: ${schedulingPolicy}`);

  process.nextTick(setupSettingsNT, settings);

  process.on('internalMessage', (message) => {
    if (message.cmd !== 'NODE_DEBUG_ENABLED')
      return;

    // Forward the request to every worker: immediately for workers that are
    // already 'online' or 'listening', otherwise once they report 'online'.
    for (const worker of ObjectValues(cluster.workers)) {
      if (worker.state === 'online' || worker.state === 'listening') {
        process._debugProcess(worker.process.pid);
      } else {
        worker.once('online', function() {
          process._debugProcess(this.process.pid);
        });
      }
    }
  });
};

// Deprecated alias must be same as setupPrimary
cluster.setupMaster = cluster.setupPrimary;

/**
 * Next-tick helper: emits 'setup' on `cluster` with the given settings.
 * @param {object} settings The settings object stored by setupPrimary().
 */
function setupSettingsNT(settings) {
  cluster.emit('setup', settings);
}

/**
 * Forks the child process for a new worker using `cluster.settings`.
 *
 * The child's environment is `process.env` overlaid with `env` and with
 * NODE_UNIQUE_ID set to the stringified `id`.
 * @param {number} id Worker id assigned by cluster.fork().
 * @param {object} [env] Extra environment entries for the child.
 * @returns {object} Whatever `fork()` returns for the child.
 * @throws {ERR_SOCKET_BAD_PORT} When `cluster.settings.inspectPort` is null.
 */
function createWorkerProcess(id, env) {
  const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
  // Copy so that pushing the inspect port does not alter the shared settings.
  const execArgv = [...cluster.settings.execArgv];

  if (cluster.settings.inspectPort === null) {
    throw new ERR_SOCKET_BAD_PORT('Port', null, true);
  }
  if (isUsingInspector(cluster.settings.execArgv)) {
    ArrayPrototypePush(execArgv, `--inspect-port=${getInspectPort(cluster.settings.inspectPort)}`);
  }

  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    serialization: cluster.settings.serialization,
    silent: cluster.settings.silent,
    windowsHide: cluster.settings.windowsHide,
    execArgv: execArgv,
    stdio: cluster.settings.stdio,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid,
  });
}

/**
 * Removes `worker` from `cluster.workers`. When no workers remain, asserts
 * that the handle registry is empty and emits 'disconnect' on `intercom`
 * (which is what cluster.disconnect() callbacks wait for).
 * @param {Worker} worker The worker to remove.
 */
function removeWorker(worker) {
  assert(worker);
  delete cluster.workers[worker.id];

  if (ObjectKeys(cluster.workers).length === 0) {
    assert(handles.size === 0, 'Resource leak detected.');
    intercom.emit('disconnect');
  }
}

/**
 * Detaches `worker` from every registered handle and drops each handle whose
 * `remove()` call returns a truthy value.
 * @param {Worker} worker The worker whose handles are released.
 */
function removeHandlesForWorker(worker) {
  assert(worker);

  handles.forEach((handle, key) => {
    if (handle.remove(worker))
      handles.delete(key);
  });
}

/**
 * Spawns a new worker, wires up its lifecycle events and registers it in
 * `cluster.workers`. A 'fork' event is emitted on the next tick.
 * @param {object} [env] Extra environment entries for the worker process.
 * @returns {Worker} The newly created worker.
 */
cluster.fork = function(env) {
  cluster.setupPrimary();
  const id = ++ids;
  const workerProcess = createWorkerProcess(id, env);
  const worker = new Worker({
    id: id,
    process: workerProcess,
  });

  // Re-emit worker messages on the cluster object, with the worker prepended.
  worker.on('message', function(message, handle) {
    cluster.emit('message', this, message, handle);
  });

  worker.process.once('exit', (exitCode, signalCode) => {
    /*
     * Remove the worker from the workers list only
     * if it has disconnected, otherwise we might
     * still want to access it.
     */
    if (!worker.isConnected()) {
      removeHandlesForWorker(worker);
      removeWorker(worker);
    }

    // Normalize to a strict boolean before listeners observe it.
    worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
    worker.state = 'dead';
    worker.emit('exit', exitCode, signalCode);
    cluster.emit('exit', worker, exitCode, signalCode);
  });

  worker.process.once('disconnect', () => {
    /*
     * Now is a good time to remove the handles
     * associated with this worker because it is
     * not connected to the primary anymore.
     */
    removeHandlesForWorker(worker);

    /*
     * Remove the worker from the workers list only
     * if its process has exited. Otherwise, we might
     * still want to access it.
     */
    if (worker.isDead())
      removeWorker(worker);

    worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
    worker.state = 'disconnected';
    worker.emit('disconnect');
    cluster.emit('disconnect', worker);
  });

  worker.process.on('internalMessage', internal(worker, onmessage));
  process.nextTick(emitForkNT, worker);
  cluster.workers[worker.id] = worker;
  return worker;
};

/**
 * Next-tick helper: emits 'fork' on `cluster` for a freshly created worker.
 * @param {Worker} worker The worker returned by cluster.fork().
 */
function emitForkNT(worker) {
  cluster.emit('fork', worker);
}

/**
 * Disconnects every connected worker. If `cb` is a function it is registered
 * as a one-shot listener for the `intercom` 'disconnect' event, which is
 * emitted by removeWorker() once no workers remain, or on the next tick when
 * there are no workers to begin with.
 * @param {Function} [cb] Called once after the 'disconnect' signal.
 */
cluster.disconnect = function(cb) {
  const workers = ObjectKeys(cluster.workers);

  if (workers.length === 0) {
    // Deferred so that the `cb` listener registered below is in place first.
    process.nextTick(() => intercom.emit('disconnect'));
  } else {
    for (const worker of ObjectValues(cluster.workers)) {
      if (worker.isConnected()) {
        worker.disconnect();
      }
    }
  }

  if (typeof cb === 'function')
    intercom.once('disconnect', cb);
};

// Dispatch table for internal messages: `message.act` selects the handler.
const methodMessageMapping = {
  close,
  exitedAfterDisconnect,
  listening,
  online,
  queryServer,
};

/**
 * Internal-message entry point: looks up `message.act` in the dispatch table
 * and, when a handler exists, calls it with the worker and the message.
 * Unknown actions are ignored. `this` is used as the worker (presumably bound
 * by `internal(worker, onmessage)` in cluster.fork() - confirm in
 * internal/cluster/utils).
 * @param {object} message The internal message received from the worker.
 * @param {object} [handle] Not used by this function.
 */
function onmessage(message, handle) {
  const worker = this;

  const fn = methodMessageMapping[message.act];

  if (typeof fn === 'function')
    fn(worker, message);
}

/**
 * Handles the 'online' action: marks the worker online and emits 'online' on
 * both the worker and the cluster.
 * @param {Worker} worker The worker that reported in.
 */
function online(worker) {
  worker.state = 'online';
  worker.emit('online');
  cluster.emit('online', worker);
}

/**
 * Handles the 'exitedAfterDisconnect' action: records the flag on the worker
 * and acknowledges the message by echoing its `seq` as `ack`.
 * @param {Worker} worker The worker that sent the message.
 * @param {object} message The message; `message.seq` is acknowledged.
 */
function exitedAfterDisconnect(worker, message) {
  worker.exitedAfterDisconnect = true;
  send(worker, { ack: message.seq });
}

/**
 * Handles the 'queryServer' action: finds or creates the handle for the
 * requested address and adds the worker to it. The handle's callback sends
 * the reply to the worker, acknowledging `message.seq`.
 * @param {Worker} worker The worker asking for a server handle.
 * @param {object} message Request carrying `address`, `port`, `addressType`,
 *   `fd`, `index`, `seq` and optional `data`.
 */
function queryServer(worker, message) {
  // Stop processing if worker already disconnecting
  if (worker.exitedAfterDisconnect)
    return;

  const key = `${message.address}:${message.port}:${message.addressType}:` +
              `${message.fd}:${message.index}`;
  let handle = handles.get(key);

  if (handle === undefined) {
    let address = message.address;

    // Find shortest path for unix sockets because of the ~100 byte limit
    if (message.port < 0 && typeof address === 'string' &&
        process.platform !== 'win32') {

      address = path.relative(process.cwd(), address);

      // Keep the original address when the relative form is not shorter.
      if (message.address.length < address.length)
        address = message.address;
    }

    // UDP is exempt from round-robin connection balancing for what should
    // be obvious reasons: it's connectionless. There is nothing to send to
    // the workers except raw datagrams and that's pointless.
    if (schedulingPolicy !== SCHED_RR ||
        message.addressType === 'udp4' ||
        message.addressType === 'udp6') {
      handle = new SharedHandle(key, address, message);
    } else {
      handle = new RoundRobinHandle(key, address, message);
    }

    handles.set(key, handle);
  }

  // The first request that carries data for this handle wins.
  if (!handle.data)
    handle.data = message.data;

  // The reply callback's `handle` parameter shadows the variable above, so
  // keep a separate reference to the handle this request was registered on.
  const registeredHandle = handle;

  // Set custom server data
  handle.add(worker, (errno, reply, handle) => {
    // The registry entry for `key` may already be gone by the time the handle
    // calls back: the errno branch below deletes it, and so do close() and
    // removeHandlesForWorker(). Destructuring `undefined` would throw in the
    // primary, so fall back to the handle the worker was added to.
    const current = handles.get(key);
    const { data } = current === undefined ? registeredHandle : current;

    if (errno)
      handles.delete(key); // Gives other workers a chance to retry.

    send(worker, {
      errno,
      key,
      ack: message.seq,
      data,
      ...reply,
    }, handle);
  });
}

/**
 * Handles the 'listening' action: marks the worker as listening and emits
 * 'listening' on both the worker and the cluster with the address info.
 * @param {Worker} worker The worker whose server started listening.
 * @param {object} message Carries `addressType`, `address`, `port` and `fd`.
 */
function listening(worker, message) {
  const info = {
    addressType: message.addressType,
    address: message.address,
    port: message.port,
    fd: message.fd,
  };

  worker.state = 'listening';
  worker.emit('listening', info);
  cluster.emit('listening', worker, info);
}

// Server in worker is closing, remove from list. The handle may have been
// removed by a prior call to removeHandlesForWorker() so guard against that.
function close(worker, message) {
  const key = message.key;
  const handle = handles.get(key);

  if (handle && handle.remove(worker))
    handles.delete(key);
}

/**
 * Sends an internal message to a worker by delegating to `sendHelper` with
 * the worker's process.
 * @param {Worker} worker Destination worker.
 * @param {object} message The message to send.
 * @param {object} [handle] Optional handle passed along with the message.
 * @param {Function} [cb] Optional callback forwarded to `sendHelper`.
 * @returns {*} Whatever `sendHelper` returns.
 */
function send(worker, message, handle, cb) {
  return sendHelper(worker.process, message, handle, cb);
}

// Extend generic Worker with methods specific to the primary process.
/**
 * Primary-side disconnect: flags the worker as intentionally disconnected,
 * sends it the 'disconnect' action, then releases its handles and removes it
 * from the worker list.
 * @returns {Worker} This worker, to allow chaining.
 */
Worker.prototype.disconnect = function() {
  const self = this;

  // The flag is set before anything else so that it is already in place when
  // the message is sent and the bookkeeping below runs.
  self.exitedAfterDisconnect = true;
  send(self, { act: 'disconnect' });
  removeHandlesForWorker(self);
  removeWorker(self);

  return self;
};

/**
 * Kills the worker's process.
 * @param {string|number} [signo] Signal to deliver; any falsy value selects
 *   'SIGTERM'.
 */
Worker.prototype.destroy = function(signo) {
  this.process.kill(signo || 'SIGTERM');
};