1'use strict'; 2const common = require('../common'); 3const assert = require('assert'); 4const dgram = require('dgram'); 5const util = require('util'); 6 7if (common.inFreeBSDJail) { 8 common.skip('in a FreeBSD jail'); 9 return; 10} 11 12// All SunOS systems must be able to pass this manual test before the 13// following barrier can be removed: 14// $ socat UDP-RECVFROM:12356,ip-add-membership=224.0.0.115:127.0.0.1,fork \ 15// EXEC:hostname & 16// $ echo hi |socat STDIO \ 17// UDP4-DATAGRAM:224.0.0.115:12356,ip-multicast-if=127.0.0.1 18 19if (common.isSunOS) { 20 common.skip('SunOs is not correctly delivering to loopback multicast.'); 21 return; 22} 23 24const networkInterfaces = require('os').networkInterfaces(); 25const fork = require('child_process').fork; 26const MULTICASTS = { 27 IPv4: ['224.0.0.115', '224.0.0.116', '224.0.0.117'], 28 IPv6: ['ff02::1:115', 'ff02::1:116', 'ff02::1:117'], 29}; 30const LOOPBACK = { IPv4: '127.0.0.1', IPv6: '::1' }; 31const ANY = { IPv4: '0.0.0.0', IPv6: '::' }; 32const FAM = 'IPv4'; 33 34// Windows wont bind on multicasts so its filtering is by port. 35const PORTS = {}; 36for (let i = 0; i < MULTICASTS[FAM].length; i++) { 37 PORTS[MULTICASTS[FAM][i]] = common.PORT + (common.isWindows ? i : 0); 38} 39 40const UDP = { IPv4: 'udp4', IPv6: 'udp6' }; 41 42const TIMEOUT = common.platformTimeout(5000); 43const NOW = Date.now(); 44const TMPL = (tail) => `${NOW} - ${tail}`; 45 46// Take the first non-internal interface as the other interface to isolate 47// from loopback. Ideally, this should check for whether or not this interface 48// and the loopback have the MULTICAST flag. 49const interfaceAddress = ((networkInterfaces) => { 50 for (const name in networkInterfaces) { 51 for (const localInterface of networkInterfaces[name]) { 52 if (!localInterface.internal && localInterface.family === FAM) { 53 let interfaceAddress = localInterface.address; 54 // On Windows, IPv6 would need: `%${localInterface.scopeid}` 55 if (FAM === 'IPv6') 56 interfaceAddress += `${interfaceAddress}%${name}`; 57 return interfaceAddress; 58 } 59 } 60 } 61})(networkInterfaces); 62 63assert.ok(interfaceAddress); 64 65const messages = [ 66 { tail: 'First message to send', mcast: MULTICASTS[FAM][0], rcv: true }, 67 { tail: 'Second message to send', mcast: MULTICASTS[FAM][0], rcv: true }, 68 { tail: 'Third message to send', mcast: MULTICASTS[FAM][1], rcv: true, 69 newAddr: interfaceAddress }, 70 { tail: 'Fourth message to send', mcast: MULTICASTS[FAM][2] }, 71 { tail: 'Fifth message to send', mcast: MULTICASTS[FAM][1], rcv: true }, 72 { tail: 'Sixth message to send', mcast: MULTICASTS[FAM][2], rcv: true, 73 newAddr: LOOPBACK[FAM] }, 74]; 75 76 77if (process.argv[2] !== 'child') { 78 const IFACES = [ANY[FAM], interfaceAddress, LOOPBACK[FAM]]; 79 const workers = {}; 80 const listeners = MULTICASTS[FAM].length * 2; 81 let listening = 0; 82 let dead = 0; 83 let i = 0; 84 let done = 0; 85 let timer = null; 86 87 const killSubprocesses = (subprocesses) => { 88 for (const i in subprocesses) 89 subprocesses[i].kill(); 90 }; 91 92 // Exit the test if it doesn't succeed within the TIMEOUT. 93 timer = setTimeout(() => { 94 console.error('[PARENT] Responses were not received within %d ms.', 95 TIMEOUT); 96 console.error('[PARENT] Skip'); 97 98 killSubprocesses(workers); 99 common.skip('Check filter policy'); 100 101 process.exit(1); 102 }, TIMEOUT); 103 104 // Launch the child processes. 105 for (let i = 0; i < listeners; i++) { 106 const IFACE = IFACES[i % IFACES.length]; 107 const MULTICAST = MULTICASTS[FAM][i % MULTICASTS[FAM].length]; 108 109 const messagesNeeded = messages.filter((m) => m.rcv && 110 m.mcast === MULTICAST) 111 .map((m) => TMPL(m.tail)); 112 const worker = fork(process.argv[1], 113 ['child', 114 IFACE, 115 MULTICAST, 116 messagesNeeded.length, 117 NOW]); 118 workers[worker.pid] = worker; 119 120 worker.messagesReceived = []; 121 worker.messagesNeeded = messagesNeeded; 122 123 // Handle the death of workers. 124 worker.on('exit', (code) => { 125 // Don't consider this a true death if the worker has finished 126 // successfully or if the exit code is 0. 127 if (worker.isDone || code === 0) { 128 return; 129 } 130 131 dead += 1; 132 console.error('[PARENT] Worker %d died. %d dead of %d', 133 worker.pid, 134 dead, 135 listeners); 136 137 if (dead === listeners) { 138 console.error('[PARENT] All workers have died.'); 139 console.error('[PARENT] Fail'); 140 141 killSubprocesses(workers); 142 143 process.exit(1); 144 } 145 }); 146 147 worker.on('message', (msg) => { 148 if (msg.listening) { 149 listening += 1; 150 151 if (listening === listeners) { 152 // All child process are listening, so start sending. 153 sendSocket.sendNext(); 154 } 155 } else if (msg.message) { 156 worker.messagesReceived.push(msg.message); 157 158 if (worker.messagesReceived.length === worker.messagesNeeded.length) { 159 done += 1; 160 worker.isDone = true; 161 console.error('[PARENT] %d received %d messages total.', 162 worker.pid, 163 worker.messagesReceived.length); 164 } 165 166 if (done === listeners) { 167 console.error('[PARENT] All workers have received the ' + 168 'required number of ' + 169 'messages. Will now compare.'); 170 171 Object.keys(workers).forEach((pid) => { 172 const worker = workers[pid]; 173 174 let count = 0; 175 176 worker.messagesReceived.forEach((buf) => { 177 for (let i = 0; i < worker.messagesNeeded.length; ++i) { 178 if (buf.toString() === worker.messagesNeeded[i]) { 179 count++; 180 break; 181 } 182 } 183 }); 184 185 console.error('[PARENT] %d received %d matching messages.', 186 worker.pid, 187 count); 188 189 assert.strictEqual(count, worker.messagesNeeded.length, 190 'A worker received ' + 191 'an invalid multicast message'); 192 }); 193 194 clearTimeout(timer); 195 console.error('[PARENT] Success'); 196 killSubprocesses(workers); 197 } 198 } 199 }); 200 } 201 202 const sendSocket = dgram.createSocket({ 203 type: UDP[FAM], 204 reuseAddr: true, 205 }); 206 207 // Don't bind the address explicitly when sending and start with 208 // the OSes default multicast interface selection. 209 sendSocket.bind(common.PORT, ANY[FAM]); 210 sendSocket.on('listening', () => { 211 console.error(`outgoing iface ${interfaceAddress}`); 212 }); 213 214 sendSocket.on('close', () => { 215 console.error('[PARENT] sendSocket closed'); 216 }); 217 218 sendSocket.sendNext = () => { 219 const msg = messages[i++]; 220 221 if (!msg) { 222 sendSocket.close(); 223 return; 224 } 225 console.error(TMPL(NOW, msg.tail)); 226 const buf = Buffer.from(TMPL(msg.tail)); 227 if (msg.newAddr) { 228 console.error(`changing outgoing multicast ${msg.newAddr}`); 229 sendSocket.setMulticastInterface(msg.newAddr); 230 } 231 sendSocket.send( 232 buf, 233 0, 234 buf.length, 235 PORTS[msg.mcast], 236 msg.mcast, 237 (err) => { 238 assert.ifError(err); 239 console.error('[PARENT] sent %s to %s:%s', 240 util.inspect(buf.toString()), 241 msg.mcast, PORTS[msg.mcast]); 242 243 process.nextTick(sendSocket.sendNext); 244 }, 245 ); 246 }; 247} 248 249if (process.argv[2] === 'child') { 250 const IFACE = process.argv[3]; 251 const MULTICAST = process.argv[4]; 252 const NEEDEDMSGS = Number(process.argv[5]); 253 const SESSION = Number(process.argv[6]); 254 const receivedMessages = []; 255 256 console.error(`pid ${process.pid} iface ${IFACE} MULTICAST ${MULTICAST}`); 257 const listenSocket = dgram.createSocket({ 258 type: UDP[FAM], 259 reuseAddr: true, 260 }); 261 262 listenSocket.on('message', (buf, rinfo) => { 263 // Examine udp messages only when they were sent by the parent. 264 if (!buf.toString().startsWith(SESSION)) return; 265 266 console.error('[CHILD] %s received %s from %j', 267 process.pid, 268 util.inspect(buf.toString()), 269 rinfo); 270 271 receivedMessages.push(buf); 272 273 let closecb; 274 275 if (receivedMessages.length === NEEDEDMSGS) { 276 listenSocket.close(); 277 closecb = () => process.exit(); 278 } 279 280 process.send({ message: buf.toString() }, closecb); 281 }); 282 283 284 listenSocket.on('listening', () => { 285 listenSocket.addMembership(MULTICAST, IFACE); 286 process.send({ listening: true }); 287 }); 288 289 if (common.isWindows) 290 listenSocket.bind(PORTS[MULTICAST], ANY[FAM]); 291 else 292 listenSocket.bind(common.PORT, MULTICAST); 293} 294