• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2const common = require('../common');
3const assert = require('assert');
4const dgram = require('dgram');
5const util = require('util');
6
7if (common.inFreeBSDJail) {
8  common.skip('in a FreeBSD jail');
9  return;
10}
11
12// All SunOS systems must be able to pass this manual test before the
13//   following barrier can be removed:
14// $ socat UDP-RECVFROM:12356,ip-add-membership=224.0.0.115:127.0.0.1,fork \
15//   EXEC:hostname &
16// $ echo hi |socat STDIO \
17//   UDP4-DATAGRAM:224.0.0.115:12356,ip-multicast-if=127.0.0.1
18
19if (common.isSunOS) {
20  common.skip('SunOs is not correctly delivering to loopback multicast.');
21  return;
22}
23
24const networkInterfaces = require('os').networkInterfaces();
25const fork = require('child_process').fork;
26const MULTICASTS = {
27  IPv4: ['224.0.0.115', '224.0.0.116', '224.0.0.117'],
28  IPv6: ['ff02::1:115', 'ff02::1:116', 'ff02::1:117'],
29};
30const LOOPBACK = { IPv4: '127.0.0.1', IPv6: '::1' };
31const ANY = { IPv4: '0.0.0.0', IPv6: '::' };
32const FAM = 'IPv4';
33
34// Windows wont bind on multicasts so its filtering is by port.
35const PORTS = {};
36for (let i = 0; i < MULTICASTS[FAM].length; i++) {
37  PORTS[MULTICASTS[FAM][i]] = common.PORT + (common.isWindows ? i : 0);
38}
39
40const UDP = { IPv4: 'udp4', IPv6: 'udp6' };
41
42const TIMEOUT = common.platformTimeout(5000);
43const NOW = Date.now();
44const TMPL = (tail) => `${NOW} - ${tail}`;
45
46// Take the first non-internal interface as the other interface to isolate
47// from loopback. Ideally, this should check for whether or not this interface
48// and the loopback have the MULTICAST flag.
49const interfaceAddress = ((networkInterfaces) => {
50  for (const name in networkInterfaces) {
51    for (const localInterface of networkInterfaces[name]) {
52      if (!localInterface.internal && localInterface.family === FAM) {
53        let interfaceAddress = localInterface.address;
54        // On Windows, IPv6 would need: `%${localInterface.scopeid}`
55        if (FAM === 'IPv6')
56          interfaceAddress += `${interfaceAddress}%${name}`;
57        return interfaceAddress;
58      }
59    }
60  }
61})(networkInterfaces);
62
63assert.ok(interfaceAddress);
64
65const messages = [
66  { tail: 'First message to send', mcast: MULTICASTS[FAM][0], rcv: true },
67  { tail: 'Second message to send', mcast: MULTICASTS[FAM][0], rcv: true },
68  { tail: 'Third message to send', mcast: MULTICASTS[FAM][1], rcv: true,
69    newAddr: interfaceAddress },
70  { tail: 'Fourth message to send', mcast: MULTICASTS[FAM][2] },
71  { tail: 'Fifth message to send', mcast: MULTICASTS[FAM][1], rcv: true },
72  { tail: 'Sixth message to send', mcast: MULTICASTS[FAM][2], rcv: true,
73    newAddr: LOOPBACK[FAM] },
74];
75
76
77if (process.argv[2] !== 'child') {
78  const IFACES = [ANY[FAM], interfaceAddress, LOOPBACK[FAM]];
79  const workers = {};
80  const listeners = MULTICASTS[FAM].length * 2;
81  let listening = 0;
82  let dead = 0;
83  let i = 0;
84  let done = 0;
85  let timer = null;
86
87  const killSubprocesses = (subprocesses) => {
88    for (const i in subprocesses)
89      subprocesses[i].kill();
90  };
91
92  // Exit the test if it doesn't succeed within the TIMEOUT.
93  timer = setTimeout(() => {
94    console.error('[PARENT] Responses were not received within %d ms.',
95                  TIMEOUT);
96    console.error('[PARENT] Skip');
97
98    killSubprocesses(workers);
99    common.skip('Check filter policy');
100
101    process.exit(1);
102  }, TIMEOUT);
103
104  // Launch the child processes.
105  for (let i = 0; i < listeners; i++) {
106    const IFACE = IFACES[i % IFACES.length];
107    const MULTICAST = MULTICASTS[FAM][i % MULTICASTS[FAM].length];
108
109    const messagesNeeded = messages.filter((m) => m.rcv &&
110                                                  m.mcast === MULTICAST)
111                                   .map((m) => TMPL(m.tail));
112    const worker = fork(process.argv[1],
113                        ['child',
114                         IFACE,
115                         MULTICAST,
116                         messagesNeeded.length,
117                         NOW]);
118    workers[worker.pid] = worker;
119
120    worker.messagesReceived = [];
121    worker.messagesNeeded = messagesNeeded;
122
123    // Handle the death of workers.
124    worker.on('exit', (code) => {
125      // Don't consider this a true death if the worker has finished
126      // successfully or if the exit code is 0.
127      if (worker.isDone || code === 0) {
128        return;
129      }
130
131      dead += 1;
132      console.error('[PARENT] Worker %d died. %d dead of %d',
133                    worker.pid,
134                    dead,
135                    listeners);
136
137      if (dead === listeners) {
138        console.error('[PARENT] All workers have died.');
139        console.error('[PARENT] Fail');
140
141        killSubprocesses(workers);
142
143        process.exit(1);
144      }
145    });
146
147    worker.on('message', (msg) => {
148      if (msg.listening) {
149        listening += 1;
150
151        if (listening === listeners) {
152          // All child process are listening, so start sending.
153          sendSocket.sendNext();
154        }
155      } else if (msg.message) {
156        worker.messagesReceived.push(msg.message);
157
158        if (worker.messagesReceived.length === worker.messagesNeeded.length) {
159          done += 1;
160          worker.isDone = true;
161          console.error('[PARENT] %d received %d messages total.',
162                        worker.pid,
163                        worker.messagesReceived.length);
164        }
165
166        if (done === listeners) {
167          console.error('[PARENT] All workers have received the ' +
168                        'required number of ' +
169                        'messages. Will now compare.');
170
171          Object.keys(workers).forEach((pid) => {
172            const worker = workers[pid];
173
174            let count = 0;
175
176            worker.messagesReceived.forEach((buf) => {
177              for (let i = 0; i < worker.messagesNeeded.length; ++i) {
178                if (buf.toString() === worker.messagesNeeded[i]) {
179                  count++;
180                  break;
181                }
182              }
183            });
184
185            console.error('[PARENT] %d received %d matching messages.',
186                          worker.pid,
187                          count);
188
189            assert.strictEqual(count, worker.messagesNeeded.length,
190                               'A worker received ' +
191                               'an invalid multicast message');
192          });
193
194          clearTimeout(timer);
195          console.error('[PARENT] Success');
196          killSubprocesses(workers);
197        }
198      }
199    });
200  }
201
202  const sendSocket = dgram.createSocket({
203    type: UDP[FAM],
204    reuseAddr: true,
205  });
206
207  // Don't bind the address explicitly when sending and start with
208  // the OSes default multicast interface selection.
209  sendSocket.bind(common.PORT, ANY[FAM]);
210  sendSocket.on('listening', () => {
211    console.error(`outgoing iface ${interfaceAddress}`);
212  });
213
214  sendSocket.on('close', () => {
215    console.error('[PARENT] sendSocket closed');
216  });
217
218  sendSocket.sendNext = () => {
219    const msg = messages[i++];
220
221    if (!msg) {
222      sendSocket.close();
223      return;
224    }
225    console.error(TMPL(NOW, msg.tail));
226    const buf = Buffer.from(TMPL(msg.tail));
227    if (msg.newAddr) {
228      console.error(`changing outgoing multicast ${msg.newAddr}`);
229      sendSocket.setMulticastInterface(msg.newAddr);
230    }
231    sendSocket.send(
232      buf,
233      0,
234      buf.length,
235      PORTS[msg.mcast],
236      msg.mcast,
237      (err) => {
238        assert.ifError(err);
239        console.error('[PARENT] sent %s to %s:%s',
240                      util.inspect(buf.toString()),
241                      msg.mcast, PORTS[msg.mcast]);
242
243        process.nextTick(sendSocket.sendNext);
244      },
245    );
246  };
247}
248
249if (process.argv[2] === 'child') {
250  const IFACE = process.argv[3];
251  const MULTICAST = process.argv[4];
252  const NEEDEDMSGS = Number(process.argv[5]);
253  const SESSION = Number(process.argv[6]);
254  const receivedMessages = [];
255
256  console.error(`pid ${process.pid} iface ${IFACE} MULTICAST ${MULTICAST}`);
257  const listenSocket = dgram.createSocket({
258    type: UDP[FAM],
259    reuseAddr: true,
260  });
261
262  listenSocket.on('message', (buf, rinfo) => {
263    // Examine udp messages only when they were sent by the parent.
264    if (!buf.toString().startsWith(SESSION)) return;
265
266    console.error('[CHILD] %s received %s from %j',
267                  process.pid,
268                  util.inspect(buf.toString()),
269                  rinfo);
270
271    receivedMessages.push(buf);
272
273    let closecb;
274
275    if (receivedMessages.length === NEEDEDMSGS) {
276      listenSocket.close();
277      closecb = () => process.exit();
278    }
279
280    process.send({ message: buf.toString() }, closecb);
281  });
282
283
284  listenSocket.on('listening', () => {
285    listenSocket.addMembership(MULTICAST, IFACE);
286    process.send({ listening: true });
287  });
288
289  if (common.isWindows)
290    listenSocket.bind(PORTS[MULTICAST], ANY[FAM]);
291  else
292    listenSocket.bind(common.PORT, MULTICAST);
293}
294