'use strict';

const {
  Map,
  ObjectKeys,
  ObjectValues,
} = primordials;

const assert = require('internal/assert');
const { fork } = require('child_process');
const path = require('path');
const EventEmitter = require('events');
const RoundRobinHandle = require('internal/cluster/round_robin_handle');
const SharedHandle = require('internal/cluster/shared_handle');
const Worker = require('internal/cluster/worker');
const { internal, sendHelper } = require('internal/cluster/utils');
const cluster = new EventEmitter();
// Private emitter used to signal that no workers remain (see removeWorker()
// and cluster.disconnect()).
const intercom = new EventEmitter();
const SCHED_NONE = 1;
const SCHED_RR = 2;
// Bounds used when wrapping automatically assigned inspector ports.
const [ minPort, maxPort ] = [ 1024, 65535 ];
const { validatePort } = require('internal/validators');

module.exports = cluster;

// Maps a listen key (built in queryServer()) to its RoundRobinHandle or
// SharedHandle.
const handles = new Map();
cluster.isWorker = false;
cluster.isMaster = true;
cluster.Worker = Worker;
cluster.workers = {};
cluster.settings = {};
cluster.SCHED_NONE = SCHED_NONE;  // Leave it to the operating system.
cluster.SCHED_RR = SCHED_RR;      // Master distributes connections.

let ids = 0;                 // Last worker id handed out by cluster.fork().
let debugPortOffset = 1;     // Added to process.debugPort for each worker.
let initialized = false;     // Set by the first cluster.setupMaster() call.

// XXX(bnoordhuis) Fold cluster.schedulingPolicy into cluster.settings?
// The NODE_CLUSTER_SCHED_POLICY environment variable ('rr' or 'none') takes
// precedence; any other value falls through to the platform default.
let schedulingPolicy = process.env.NODE_CLUSTER_SCHED_POLICY;
if (schedulingPolicy === 'rr')
  schedulingPolicy = SCHED_RR;
else if (schedulingPolicy === 'none')
  schedulingPolicy = SCHED_NONE;
else if (process.platform === 'win32') {
  // Round-robin doesn't perform well on
  // Windows due to the way IOCP is wired up.
  schedulingPolicy = SCHED_NONE;
} else
  schedulingPolicy = SCHED_RR;

cluster.schedulingPolicy = schedulingPolicy;

/**
 * Computes and stores `cluster.settings`, then schedules a 'setup' event.
 *
 * Settings are layered: built-in defaults, then the current
 * `cluster.settings`, then `options` (later layers win). The first call
 * additionally freezes the scheduling policy and installs the
 * 'internalMessage' listener that reacts to NODE_DEBUG_ENABLED.
 *
 * @param {object} [options] Overrides merged on top of the current settings.
 */
cluster.setupMaster = function(options) {
  const settings = {
    args: process.argv.slice(2),
    exec: process.argv[1],
    execArgv: process.execArgv,
    silent: false,
    ...cluster.settings,
    ...options
  };

  // Tell V8 to write profile data for each process to a separate file.
  // Without --logfile=v8-%p.log, everything ends up in a single, unusable
  // file. (Unusable because what V8 logs are memory addresses and each
  // process has its own memory mappings.)
  if (settings.execArgv.some((s) => s.startsWith('--prof')) &&
      !settings.execArgv.some((s) => s.startsWith('--logfile='))) {
    // Build a new array so the caller's (or process.execArgv's) array is
    // not mutated.
    settings.execArgv = [...settings.execArgv, '--logfile=v8-%p.log'];
  }

  cluster.settings = settings;

  // Subsequent calls only update the settings and announce them.
  if (initialized === true)
    return process.nextTick(setupSettingsNT, settings);

  initialized = true;
  schedulingPolicy = cluster.schedulingPolicy;  // Freeze policy.
  assert(schedulingPolicy === SCHED_NONE || schedulingPolicy === SCHED_RR,
         `Bad cluster.schedulingPolicy: ${schedulingPolicy}`);

  process.nextTick(setupSettingsNT, settings);

  process.on('internalMessage', (message) => {
    if (message.cmd !== 'NODE_DEBUG_ENABLED')
      return;

    for (const worker of ObjectValues(cluster.workers)) {
      if (worker.state === 'online' || worker.state === 'listening') {
        process._debugProcess(worker.process.pid);
      } else {
        // Not online yet: defer until the worker reports 'online'.
        worker.once('online', function() {
          process._debugProcess(this.process.pid);
        });
      }
    }
  });
};

// process.nextTick() target: emits 'setup' on the cluster object.
function setupSettingsNT(settings) {
  cluster.emit('setup', settings);
}

/**
 * Forks the child process for a new worker using `cluster.settings`.
 *
 * @param {number} id Worker id; exposed to the child as NODE_UNIQUE_ID.
 * @param {object} [env] Extra environment variables layered over process.env.
 * @returns {ChildProcess} The value returned by child_process.fork().
 */
function createWorkerProcess(id, env) {
  const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
  // Copy so the per-worker --inspect-port flag does not leak into settings.
  const execArgv = [...cluster.settings.execArgv];
  const debugArgRegex = /--inspect(?:-brk|-port)?|--debug-port/;
  const nodeOptions = process.env.NODE_OPTIONS ?
    process.env.NODE_OPTIONS : '';

  // Only assign an inspector port when a debug flag is present, either in
  // execArgv or in NODE_OPTIONS.
  if (execArgv.some((arg) => arg.match(debugArgRegex)) ||
      nodeOptions.match(debugArgRegex)) {
    let inspectPort;
    if ('inspectPort' in cluster.settings) {
      // A user-supplied port may be a value or a function producing one.
      if (typeof cluster.settings.inspectPort === 'function')
        inspectPort = cluster.settings.inspectPort();
      else
        inspectPort = cluster.settings.inspectPort;

      validatePort(inspectPort);
    } else {
      inspectPort = process.debugPort + debugPortOffset;
      // Wrap around into [minPort, maxPort] once the top of the range is
      // exceeded.
      if (inspectPort > maxPort)
        inspectPort = inspectPort - maxPort + minPort - 1;
      debugPortOffset++;
    }

    execArgv.push(`--inspect-port=${inspectPort}`);
  }

  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    serialization: cluster.settings.serialization,
    silent: cluster.settings.silent,
    windowsHide: cluster.settings.windowsHide,
    execArgv: execArgv,
    stdio: cluster.settings.stdio,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid
  });
}

// Drops `worker` from cluster.workers. When it was the last one, asserts
// that no handles are left and emits 'disconnect' on the private intercom.
function removeWorker(worker) {
  assert(worker);
  delete cluster.workers[worker.id];

  if (ObjectKeys(cluster.workers).length === 0) {
    assert(handles.size === 0, 'Resource leak detected.');
    intercom.emit('disconnect');
  }
}

// Detaches `worker` from every handle; a handle whose remove() returns a
// truthy value is deleted from the `handles` map.
function removeHandlesForWorker(worker) {
  assert(worker);

  handles.forEach((handle, key) => {
    if (handle.remove(worker))
      handles.delete(key);
  });
}

/**
 * Spawns a new worker process and registers it in `cluster.workers`.
 *
 * Calls cluster.setupMaster() first, wires the 'message', 'exit',
 * 'disconnect' and 'internalMessage' listeners, and schedules a 'fork'
 * event on the cluster object.
 *
 * @param {object} [env] Extra environment variables for the worker.
 * @returns {Worker} The newly created worker.
 */
cluster.fork = function(env) {
  cluster.setupMaster();
  const id = ++ids;
  const workerProcess = createWorkerProcess(id, env);
  const worker = new Worker({
    id: id,
    process: workerProcess
  });

  // Re-emit worker messages on the cluster object, with the worker first.
  worker.on('message', function(message, handle) {
    cluster.emit('message', this, message, handle);
  });

  worker.process.once('exit', (exitCode, signalCode) => {
    /*
     * Remove the worker from the workers list only
     * if it has disconnected, otherwise we might
     * still want to access it.
     */
    if (!worker.isConnected()) {
      removeHandlesForWorker(worker);
      removeWorker(worker);
    }

    // Normalize to a boolean (it may still be undefined at this point).
    worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
    worker.state = 'dead';
    worker.emit('exit', exitCode, signalCode);
    cluster.emit('exit', worker, exitCode, signalCode);
  });

  worker.process.once('disconnect', () => {
    /*
     * Now is a good time to remove the handles
     * associated with this worker because it is
     * not connected to the master anymore.
     */
    removeHandlesForWorker(worker);

    /*
     * Remove the worker from the workers list only
     * if its process has exited. Otherwise, we might
     * still want to access it.
     */
    if (worker.isDead())
      removeWorker(worker);

    worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
    worker.state = 'disconnected';
    worker.emit('disconnect');
    cluster.emit('disconnect', worker);
  });

  worker.process.on('internalMessage', internal(worker, onmessage));
  process.nextTick(emitForkNT, worker);
  cluster.workers[worker.id] = worker;
  return worker;
};

// process.nextTick() target: emits 'fork' on the cluster object.
function emitForkNT(worker) {
  cluster.emit('fork', worker);
}

/**
 * Disconnects every connected worker.
 *
 * @param {Function} [cb] Registered as a one-time listener for the private
 *   'disconnect' event, which removeWorker() emits once no workers remain.
 */
cluster.disconnect = function(cb) {
  const workers = ObjectKeys(cluster.workers);

  if (workers.length === 0) {
    // Nothing to disconnect: emit on the next tick so that the `cb`
    // registered below is already listening.
    process.nextTick(() => intercom.emit('disconnect'));
  } else {
    for (const worker of ObjectValues(cluster.workers)) {
      if (worker.isConnected()) {
        worker.disconnect();
      }
    }
  }

  if (typeof cb === 'function')
    intercom.once('disconnect', cb);
};

// Dispatches an internal message from a worker (bound as `this`) according
// to its `act` field. Unknown actions are ignored.
function onmessage(message, handle) {
  const worker = this;

  if (message.act === 'online')
    online(worker);
  else if (message.act === 'queryServer')
    queryServer(worker, message);
  else if (message.act === 'listening')
    listening(worker, message);
  else if (message.act === 'exitedAfterDisconnect')
    exitedAfterDisconnect(worker, message);
  else if (message.act === 'close')
    close(worker, message);
}

// Marks the worker online and emits 'online' on the worker and the cluster.
function online(worker) {
  worker.state = 'online';
  worker.emit('online');
  cluster.emit('online', worker);
}

// Records that the worker is disconnecting on purpose and acknowledges the
// message by echoing its sequence number.
function exitedAfterDisconnect(worker, message) {
  worker.exitedAfterDisconnect = true;
  send(worker, { ack: message.seq });
}

/**
 * Handles a worker's request for a listening handle.
 *
 * Looks up (or creates) the handle for the requested
 * address/port/addressType/fd/index combination, adds the worker to it and
 * replies to the worker once the handle invokes the callback.
 *
 * @param {Worker} worker The requesting worker.
 * @param {object} message The 'queryServer' message sent by the worker.
 */
function queryServer(worker, message) {
  // Stop processing if worker already disconnecting
  if (worker.exitedAfterDisconnect)
    return;

  const key = `${message.address}:${message.port}:${message.addressType}:` +
              `${message.fd}:${message.index}`;
  let handle = handles.get(key);

  if (handle === undefined) {
    let address = message.address;

    // Find shortest path for unix sockets because of the ~100 byte limit
    if (message.port < 0 && typeof address === 'string' &&
        process.platform !== 'win32') {

      address = path.relative(process.cwd(), address);

      // Keep the original address when the relative form is not shorter.
      if (message.address.length < address.length)
        address = message.address;
    }

    let constructor = RoundRobinHandle;
    // UDP is exempt from round-robin connection balancing for what should
    // be obvious reasons: it's connectionless. There is nothing to send to
    // the workers except raw datagrams and that's pointless.
    if (schedulingPolicy !== SCHED_RR ||
        message.addressType === 'udp4' ||
        message.addressType === 'udp6') {
      constructor = SharedHandle;
    }

    handle = new constructor(key, address, message);
    handles.set(key, handle);
  }

  // Set custom server data (the first worker to supply it wins).
  if (!handle.data)
    handle.data = message.data;

  handle.add(worker, (errno, reply, serverHandle) => {
    // The entry may no longer be registered when this callback runs:
    // removeHandlesForWorker() and close() both delete entries from
    // `handles`. Fall back to the handle captured above rather than
    // destructuring undefined, mirroring the guard in close().
    const registered = handles.get(key);
    const { data } = registered === undefined ? handle : registered;

    if (errno)
      handles.delete(key);  // Gives other workers a chance to retry.

    send(worker, {
      errno,
      key,
      ack: message.seq,
      data,
      ...reply
    }, serverHandle);
  });
}

// Marks the worker as listening and emits 'listening' on the worker and the
// cluster with the address details taken from the message.
function listening(worker, message) {
  const info = {
    addressType: message.addressType,
    address: message.address,
    port: message.port,
    fd: message.fd
  };

  worker.state = 'listening';
  worker.emit('listening', info);
  cluster.emit('listening', worker, info);
}

// Server in worker is closing, remove from list.  The handle may have been
// removed by a prior call to removeHandlesForWorker() so guard against that.
function close(worker, message) {
  const key = message.key;
  const handle = handles.get(key);

  if (handle && handle.remove(worker))
    handles.delete(key);
}

// Sends an internal message (and optional handle) to the worker's process.
function send(worker, message, handle, cb) {
  return sendHelper(worker.process, message, handle, cb);
}

// Extend generic Worker with methods specific to the master process.

/**
 * Asks the worker to disconnect and immediately removes it, together with
 * its handles, from the master's bookkeeping.
 *
 * @returns {Worker} This worker, for chaining.
 */
Worker.prototype.disconnect = function() {
  this.exitedAfterDisconnect = true;
  send(this, { act: 'disconnect' });
  removeHandlesForWorker(this);
  removeWorker(this);
  return this;
};

/**
 * Kills the worker process. A still-connected worker is disconnected first
 * and killed once its 'disconnect' event fires.
 *
 * @param {string|number} [signo] Signal passed to kill(); any falsy value
 *   is replaced by 'SIGTERM'.
 */
Worker.prototype.destroy = function(signo) {
  const proc = this.process;

  signo = signo || 'SIGTERM';

  if (this.isConnected()) {
    this.once('disconnect', () => proc.kill(signo));
    this.disconnect();
    return;
  }

  proc.kill(signo);
};