'use strict';

const {
  ArrayPrototypePush,
  ArrayPrototypeSlice,
  ArrayPrototypeSome,
  ObjectKeys,
  ObjectValues,
  RegExpPrototypeTest,
  SafeMap,
  StringPrototypeStartsWith,
} = primordials;

const assert = require('internal/assert');
const { fork } = require('child_process');
const path = require('path');
const EventEmitter = require('events');
const RoundRobinHandle = require('internal/cluster/round_robin_handle');
const SharedHandle = require('internal/cluster/shared_handle');
const Worker = require('internal/cluster/worker');
const { internal, sendHelper } = require('internal/cluster/utils');
const cluster = new EventEmitter();
// Private emitter: 'disconnect' is emitted on it once no workers remain.
const intercom = new EventEmitter();
const SCHED_NONE = 1;
const SCHED_RR = 2;
const [ minPort, maxPort ] = [ 1024, 65535 ];
const { validatePort } = require('internal/validators');

module.exports = cluster;

// Maps a listen key (built in queryServer()) to the handle object that is
// shared between the workers listening on that key.
const handles = new SafeMap();
cluster.isWorker = false;
cluster.isMaster = true;
cluster.Worker = Worker;
cluster.workers = {};
cluster.settings = {};
cluster.SCHED_NONE = SCHED_NONE;  // Leave it to the operating system.
cluster.SCHED_RR = SCHED_RR;      // Master distributes connections.

let ids = 0;               // Last worker id handed out by cluster.fork().
let debugPortOffset = 1;   // Added to process.debugPort for each new worker.
let initialized = false;   // Set by the first cluster.setupMaster() call.

// XXX(bnoordhuis) Fold cluster.schedulingPolicy into cluster.settings?
let schedulingPolicy = process.env.NODE_CLUSTER_SCHED_POLICY;
if (schedulingPolicy === 'rr')
  schedulingPolicy = SCHED_RR;
else if (schedulingPolicy === 'none')
  schedulingPolicy = SCHED_NONE;
else if (process.platform === 'win32') {
  // Round-robin doesn't perform well on
  // Windows due to the way IOCP is wired up.
  schedulingPolicy = SCHED_NONE;
} else
  schedulingPolicy = SCHED_RR;

cluster.schedulingPolicy = schedulingPolicy;

/**
 * Builds the settings used to fork workers and stores them on
 * `cluster.settings`. Built-in defaults are overlaid with the current
 * `cluster.settings` and then with `options` (later sources win).
 *
 * A 'setup' event carrying the new settings is emitted on the next tick on
 * every call. The first call additionally freezes the scheduling policy and
 * installs the 'NODE_DEBUG_ENABLED' internal-message listener on `process`.
 * @param {object} [options] Overrides for the worker settings.
 */
cluster.setupMaster = function(options) {
  const settings = {
    args: ArrayPrototypeSlice(process.argv, 2),
    exec: process.argv[1],
    execArgv: process.execArgv,
    silent: false,
    ...cluster.settings,
    ...options
  };

  // Tell V8 to write profile data for each process to a separate file.
  // Without --logfile=v8-%p.log, everything ends up in a single, unusable
  // file. (Unusable because what V8 logs are memory addresses and each
  // process has its own memory mappings.)
  if (ArrayPrototypeSome(settings.execArgv,
                         (s) => StringPrototypeStartsWith(s, '--prof')) &&
      !ArrayPrototypeSome(settings.execArgv,
                          (s) => StringPrototypeStartsWith(s, '--logfile='))) {
    // Copy rather than push so the array taken from process.execArgv or
    // from the caller's options is not mutated.
    settings.execArgv = [...settings.execArgv, '--logfile=v8-%p.log'];
  }

  cluster.settings = settings;

  // Subsequent calls only refresh the settings and announce them.
  if (initialized === true)
    return process.nextTick(setupSettingsNT, settings);

  initialized = true;
  schedulingPolicy = cluster.schedulingPolicy;  // Freeze policy.
  assert(schedulingPolicy === SCHED_NONE || schedulingPolicy === SCHED_RR,
         `Bad cluster.schedulingPolicy: ${schedulingPolicy}`);

  process.nextTick(setupSettingsNT, settings);

  process.on('internalMessage', (message) => {
    if (message.cmd !== 'NODE_DEBUG_ENABLED')
      return;

    // Workers that are already up get the debug request immediately; the
    // others get it once they report 'online'.
    for (const worker of ObjectValues(cluster.workers)) {
      if (worker.state === 'online' || worker.state === 'listening') {
        process._debugProcess(worker.process.pid);
      } else {
        worker.once('online', function() {
          process._debugProcess(this.process.pid);
        });
      }
    }
  });
};

/**
 * Next-tick helper: emits 'setup' on `cluster` with the given settings.
 * @param {object} settings The settings object built by setupMaster().
 */
function setupSettingsNT(settings) {
  cluster.emit('setup', settings);
}

/**
 * Forks the child process for a worker using `cluster.settings`.
 *
 * The child's environment is `process.env` overlaid with `env`, plus
 * NODE_UNIQUE_ID set to the stringified `id`. When a debug/inspect flag is
 * found in `cluster.settings.execArgv` or in NODE_OPTIONS, an
 * `--inspect-port=<port>` argument is appended to the child's execArgv.
 * @param {number} id Worker id.
 * @param {object} [env] Extra environment variables for the worker.
 * @returns {object} The value returned by `fork()`.
 */
function createWorkerProcess(id, env) {
  const workerEnv = { ...process.env, ...env, NODE_UNIQUE_ID: `${id}` };
  const execArgv = [...cluster.settings.execArgv];
  const debugArgRegex = /--inspect(?:-brk|-port)?|--debug-port/;
  const nodeOptions = process.env.NODE_OPTIONS ?
    process.env.NODE_OPTIONS : '';

  if (ArrayPrototypeSome(execArgv,
                         (arg) => RegExpPrototypeTest(debugArgRegex, arg)) ||
      RegExpPrototypeTest(debugArgRegex, nodeOptions)) {
    let inspectPort;
    if ('inspectPort' in cluster.settings) {
      // An explicit setting wins; it may be a value or a function that
      // returns one.
      if (typeof cluster.settings.inspectPort === 'function')
        inspectPort = cluster.settings.inspectPort();
      else
        inspectPort = cluster.settings.inspectPort;

      validatePort(inspectPort);
    } else {
      // Derive a port from this process's debug port and wrap around into
      // the [minPort, maxPort] range when the sum exceeds maxPort.
      inspectPort = process.debugPort + debugPortOffset;
      if (inspectPort > maxPort)
        inspectPort = inspectPort - maxPort + minPort - 1;
      debugPortOffset++;
    }

    ArrayPrototypePush(execArgv, `--inspect-port=${inspectPort}`);
  }

  return fork(cluster.settings.exec, cluster.settings.args, {
    cwd: cluster.settings.cwd,
    env: workerEnv,
    serialization: cluster.settings.serialization,
    silent: cluster.settings.silent,
    windowsHide: cluster.settings.windowsHide,
    execArgv: execArgv,
    stdio: cluster.settings.stdio,
    gid: cluster.settings.gid,
    uid: cluster.settings.uid
  });
}

/**
 * Removes `worker` from `cluster.workers`. When that leaves no workers,
 * asserts that no handles remain and emits 'disconnect' on `intercom`.
 * @param {Worker} worker
 */
function removeWorker(worker) {
  assert(worker);
  delete cluster.workers[worker.id];

  if (ObjectKeys(cluster.workers).length === 0) {
    assert(handles.size === 0, 'Resource leak detected.');
    intercom.emit('disconnect');
  }
}

/**
 * Detaches `worker` from every tracked handle and drops each handle whose
 * `remove()` call returns a truthy value.
 * @param {Worker} worker
 */
function removeHandlesForWorker(worker) {
  assert(worker);

  handles.forEach((handle, key) => {
    if (handle.remove(worker))
      handles.delete(key);
  });
}

/**
 * Spawns a new worker process, wraps it in a `Worker`, wires up its
 * message/exit/disconnect handling and registers it in `cluster.workers`.
 * A 'fork' event is emitted on `cluster` on the next tick.
 * @param {object} [env] Extra environment variables for the worker.
 * @returns {Worker} The newly created worker.
 */
cluster.fork = function(env) {
  cluster.setupMaster();
  const id = ++ids;
  const workerProcess = createWorkerProcess(id, env);
  const worker = new Worker({
    id: id,
    process: workerProcess
  });

  // Re-emit every worker message on the cluster object.
  worker.on('message', function(message, handle) {
    cluster.emit('message', this, message, handle);
  });

  worker.process.once('exit', (exitCode, signalCode) => {
    /*
     * Remove the worker from the workers list only
     * if it has disconnected, otherwise we might
     * still want to access it.
     */
    if (!worker.isConnected()) {
      removeHandlesForWorker(worker);
      removeWorker(worker);
    }

    // Normalize to a boolean before announcing the exit.
    worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
    worker.state = 'dead';
    worker.emit('exit', exitCode, signalCode);
    cluster.emit('exit', worker, exitCode, signalCode);
  });

  worker.process.once('disconnect', () => {
    /*
     * Now is a good time to remove the handles
     * associated with this worker because it is
     * not connected to the master anymore.
     */
    removeHandlesForWorker(worker);

    /*
     * Remove the worker from the workers list only
     * if its process has exited. Otherwise, we might
     * still want to access it.
     */
    if (worker.isDead())
      removeWorker(worker);

    // Normalize to a boolean before announcing the disconnect.
    worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
    worker.state = 'disconnected';
    worker.emit('disconnect');
    cluster.emit('disconnect', worker);
  });

  worker.process.on('internalMessage', internal(worker, onmessage));
  process.nextTick(emitForkNT, worker);
  cluster.workers[worker.id] = worker;
  return worker;
};

/**
 * Next-tick helper: emits 'fork' on `cluster` for the given worker.
 * @param {Worker} worker
 */
function emitForkNT(worker) {
  cluster.emit('fork', worker);
}

/**
 * Calls `disconnect()` on every connected worker. If `cb` is a function it
 * is registered as a one-time 'disconnect' listener on `intercom`. When
 * there are no workers at all, that event is emitted on the next tick.
 * @param {Function} [cb] Invoked once when `intercom` emits 'disconnect'.
 */
cluster.disconnect = function(cb) {
  const workers = ObjectKeys(cluster.workers);

  if (workers.length === 0) {
    // Deferred so that the listener registered below is in place first.
    process.nextTick(() => intercom.emit('disconnect'));
  } else {
    for (const worker of ObjectValues(cluster.workers)) {
      if (worker.isConnected()) {
        worker.disconnect();
      }
    }
  }

  if (typeof cb === 'function')
    intercom.once('disconnect', cb);
};

/**
 * Dispatches an internal message from a worker based on `message.act`.
 * Messages with any other `act` value are ignored.
 * NOTE(review): `this` is expected to be the worker; the binding is done by
 * `internal(worker, onmessage)` in cluster.fork() - confirm against
 * internal/cluster/utils.
 * @param {object} message
 * @param {object} [handle] Unused here.
 */
function onmessage(message, handle) {
  const worker = this;

  if (message.act === 'online')
    online(worker);
  else if (message.act === 'queryServer')
    queryServer(worker, message);
  else if (message.act === 'listening')
    listening(worker, message);
  else if (message.act === 'exitedAfterDisconnect')
    exitedAfterDisconnect(worker, message);
  else if (message.act === 'close')
    close(worker, message);
}

/**
 * Marks the worker as online and emits 'online' on the worker and on
 * `cluster`.
 * @param {Worker} worker
 */
function online(worker) {
  worker.state = 'online';
  worker.emit('online');
  cluster.emit('online', worker);
}

/**
 * Flags the worker as exiting after a disconnect and acknowledges the
 * message by replying with its sequence number.
 * @param {Worker} worker
 * @param {object} message Message carrying the `seq` to acknowledge.
 */
function exitedAfterDisconnect(worker, message) {
  worker.exitedAfterDisconnect = true;
  send(worker, { ack: message.seq });
}

/**
 * Handles a worker's request for a listening handle. Looks up (or creates)
 * the handle for the requested address/port/addressType/fd/index key, adds
 * the worker to it and replies to the worker from the `add()` callback.
 * @param {Worker} worker
 * @param {object} message The 'queryServer' message sent by the worker.
 */
function queryServer(worker, message) {
  // Stop processing if worker already disconnecting
  if (worker.exitedAfterDisconnect)
    return;

  const key = `${message.address}:${message.port}:${message.addressType}:` +
              `${message.fd}:${message.index}`;
  let handle = handles.get(key);

  if (handle === undefined) {
    let address = message.address;

    // Find shortest path for unix sockets because of the ~100 byte limit
    if (message.port < 0 && typeof address === 'string' &&
        process.platform !== 'win32') {

      address = path.relative(process.cwd(), address);

      if (message.address.length < address.length)
        address = message.address;
    }

    let constructor = RoundRobinHandle;
    // UDP is exempt from round-robin connection balancing for what should
    // be obvious reasons: it's connectionless. There is nothing to send to
    // the workers except raw datagrams and that's pointless.
    if (schedulingPolicy !== SCHED_RR ||
        message.addressType === 'udp4' ||
        message.addressType === 'udp6') {
      constructor = SharedHandle;
    }

    handle = new constructor(key, address, message);
    handles.set(key, handle);
  }

  if (!handle.data)
    handle.data = message.data;

  // Set custom server data
  handle.add(worker, (errno, reply, serverHandle) => {
    // The entry for `key` can already be gone when this callback runs: the
    // errno branch below, close() and removeHandlesForWorker() all delete
    // entries. Fall back to the handle this worker was added to instead of
    // destructuring `undefined`, which would throw in the master.
    const current = handles.get(key);
    const { data } = current === undefined ? handle : current;

    if (errno)
      handles.delete(key);  // Gives other workers a chance to retry.

    send(worker, {
      errno,
      key,
      ack: message.seq,
      data,
      ...reply
    }, serverHandle);
  });
}

/**
 * Marks the worker as listening and emits 'listening' with the address
 * information on the worker and on `cluster`.
 * @param {Worker} worker
 * @param {object} message The 'listening' message sent by the worker.
 */
function listening(worker, message) {
  const info = {
    addressType: message.addressType,
    address: message.address,
    port: message.port,
    fd: message.fd
  };

  worker.state = 'listening';
  worker.emit('listening', info);
  cluster.emit('listening', worker, info);
}

// Server in worker is closing, remove from list.  The handle may have been
// removed by a prior call to removeHandlesForWorker() so guard against that.
/**
 * Handles a worker's 'close' message: detaches the worker from the handle
 * stored under `message.key` and forgets that handle when its `remove()`
 * call returns a truthy value. An unknown key is ignored.
 * @param {Worker} worker
 * @param {object} message Message carrying the handle `key`.
 */
function close(worker, message) {
  const { key } = message;
  const handle = handles.get(key);

  // Nothing is tracked under this key any more.
  if (!handle)
    return;

  if (handle.remove(worker))
    handles.delete(key);
}

/**
 * Sends an internal message to the worker's process via `sendHelper`.
 * @param {Worker} worker
 * @param {object} message
 * @param {object} [handle]
 * @param {Function} [cb]
 * @returns {*} Whatever `sendHelper` returns.
 */
function send(worker, message, handle, cb) {
  const { process: workerProcess } = worker;
  return sendHelper(workerProcess, message, handle, cb);
}

// Extend generic Worker with methods specific to the master process.

/**
 * Flags the worker as exiting after a disconnect, sends it a 'disconnect'
 * request, then drops its handles and removes it from the worker list.
 * @returns {Worker} This worker.
 */
Worker.prototype.disconnect = function() {
  this.exitedAfterDisconnect = true;
  send(this, { act: 'disconnect' });
  removeHandlesForWorker(this);
  removeWorker(this);
  return this;
};

/**
 * Kills the worker process with `signo`, or 'SIGTERM' when `signo` is
 * falsy. A worker that is still connected is disconnected first and killed
 * once it emits 'disconnect'.
 * @param {string|number} [signo] Signal passed to the process's `kill()`.
 */
Worker.prototype.destroy = function(signo) {
  const proc = this.process;
  const signal = signo || 'SIGTERM';

  if (!this.isConnected()) {
    proc.kill(signal);
    return;
  }

  // Register the listener before disconnecting so the kill runs when the
  // 'disconnect' event is emitted.
  this.once('disconnect', () => proc.kill(signal));
  this.disconnect();
};