// Source-viewer navigation residue: Home | Line# | Scopes# | Navigate# | Raw | Download
'use strict';
const common = require('../common');
// Skip test in FreeBSD jails.
if (common.inFreeBSDJail)
  common.skip('In a FreeBSD jail');

const assert = require('assert');
const dgram = require('dgram');
const fork = require('child_process').fork;
const networkInterfaces = require('os').networkInterfaces();

// IPv6 multicast group the parent sends to and every child joins.
const GROUP_ADDRESS = 'ff3e::1234';
// Deadline (ms, platform-scaled) after which the parent fails the test.
const TIMEOUT = common.platformTimeout(5000);
// Payloads the parent sends, in order; every child must receive all of them.
const messages = [
  Buffer.from('First message to send'),
  Buffer.from('Second message to send'),
  Buffer.from('Third message to send'),
  Buffer.from('Fourth message to send'),
];
// Forked child processes, keyed by pid.
const workers = {};
// Number of child processes to fork.
const listeners = 3;
// Parent-side state, initialised in the parent branch below:
//   listening  - children that reported `{ listening: true }`
//   sendSocket - the UDP socket used to send `messages`
//   done       - children that reported every message
//   timer      - handle of the TIMEOUT watchdog
//   dead       - children that exited abnormally before finishing
let listening, sendSocket, done, timer, dead;

// Source address used for the source-specific membership; set below.
let sourceAddress = null;
24
// Take the first non-internal interface as the IPv6 address for binding.
// Ideally, this should favor internal/private interfaces.
const firstExternalIPv6 = Object.values(networkInterfaces)
  .flat()
  .find((iface) => !iface.internal && iface.family === 'IPv6');
if (firstExternalIPv6 !== undefined) {
  sourceAddress = firstExternalIPv6.address;
}
// Both the parent and the children need a source address; fail loudly with a
// readable reason when the host has none.
assert.ok(sourceAddress, 'No non-internal IPv6 interface address found');
38
/**
 * Forks one listener child (this same file, run with the 'child' argument)
 * and wires up the parent-side bookkeeping for it.
 *
 * The worker is stored in `workers` under its pid. Its 'exit' events feed the
 * `dead` counter; its IPC messages feed the `listening` and `done` counters.
 * When the last worker reports its final message, every worker's received
 * payloads are checked against `messages`, the watchdog `timer` is cleared
 * and all children are killed.
 */
function launchChildProcess() {
  const worker = fork(__filename, ['child']);
  workers[worker.pid] = worker;

  worker.messagesReceived = [];

  // Handle the death of workers.
  worker.on('exit', (code) => {
    // Don't consider this the true death if the worker has finished
    // successfully or if the exit code is 0.
    if (worker.isDone || code === 0) {
      return;
    }

    dead += 1;
    console.error('[PARENT] Worker %d died. %d dead of %d',
                  worker.pid,
                  dead,
                  listeners);

    if (dead === listeners) {
      console.error('[PARENT] All workers have died.');
      console.error('[PARENT] Fail');
      assert.fail('All workers have died');
    }
  });

  worker.on('message', (msg) => {
    if (msg.listening) {
      listening += 1;

      if (listening === listeners) {
        // All child process are listening, so start sending.
        sendSocket.sendNext();
      }
      return;
    }

    if (!msg.message) {
      return;
    }

    worker.messagesReceived.push(msg.message);

    // Only the message that completes this worker's set can change `done`.
    // Returning here for every other message keeps a late or duplicate
    // message from re-running the final comparison below.
    if (worker.messagesReceived.length !== messages.length) {
      return;
    }

    done += 1;
    worker.isDone = true;
    console.error('[PARENT] %d received %d messages total.',
                  worker.pid,
                  worker.messagesReceived.length);

    if (done !== listeners) {
      return;
    }

    console.error('[PARENT] All workers have received the ' +
                  'required number of messages. Will now compare.');

    // Count, per worker, how many received payloads match a sent message.
    const expected = new Set(messages.map((sent) => sent.toString()));
    for (const finished of Object.values(workers)) {
      const count = finished.messagesReceived
        .filter((received) => expected.has(received.toString()))
        .length;

      console.error('[PARENT] %d received %d matching messages.',
                    finished.pid, count);

      assert.strictEqual(count, messages.length);
    }

    clearTimeout(timer);
    console.error('[PARENT] Success');
    killChildren(workers);
  });
}
118
/**
 * Calls `kill()` on every child process held in `children`.
 *
 * @param {Object} children - Map of key (the caller uses pids) to an object
 *   exposing a `kill()` method.
 */
function killChildren(children) {
  for (const child of Object.values(children)) {
    child.kill();
  }
}
125
// Parent mode: fork the listeners, then send `messages` to GROUP_ADDRESS one
// at a time once every listener has reported that it is listening.
if (process.argv[2] !== 'child') {
  listening = 0;
  dead = 0;
  done = 0;
  // Index into `messages` of the next payload to send.
  let nextMessageIndex = 0;

  // Exit the test if it doesn't succeed within TIMEOUT.
  timer = setTimeout(() => {
    console.error('[PARENT] Responses were not received within %d ms.',
                  TIMEOUT);
    console.error('[PARENT] Fail');

    killChildren(workers);

    assert.fail(`Responses were not received within ${TIMEOUT} ms`);
  }, TIMEOUT);

  // Launch child processes.
  for (let x = 0; x < listeners; x++) {
    launchChildProcess();
  }

  sendSocket = dgram.createSocket('udp6');

  // The socket is actually created async now.
  sendSocket.on('listening', () => {
    sendSocket.setTTL(1);
    sendSocket.setBroadcast(true);
    sendSocket.setMulticastTTL(1);
    sendSocket.setMulticastLoopback(true);
    sendSocket.addSourceSpecificMembership(sourceAddress, GROUP_ADDRESS);
  });

  sendSocket.on('close', () => {
    console.error('[PARENT] sendSocket closed');
  });

  // Sends the next pending message and re-schedules itself from the send
  // callback; closes the socket once `messages` is exhausted.
  sendSocket.sendNext = function() {
    const buf = messages[nextMessageIndex++];

    if (!buf) {
      // Nothing left to send. Closing is deliberately best effort: an error
      // thrown by close() here is ignored.
      try { sendSocket.close(); } catch {}
      return;
    }

    sendSocket.send(
      buf,
      0,
      buf.length,
      common.PORT,
      GROUP_ADDRESS,
      (err) => {
        assert.ifError(err);
        console.error('[PARENT] sent "%s" to %s:%s',
                      buf.toString(),
                      GROUP_ADDRESS, common.PORT);
        process.nextTick(sendSocket.sendNext);
      }
    );
  };
}
187
// Child mode: join the source-specific group, forward every received
// datagram to the parent over IPC, and shut down after the last one.
if (process.argv[2] === 'child') {
  const receivedMessages = [];
  const listenSocket = dgram.createSocket({
    type: 'udp6',
    reuseAddr: true
  });

  listenSocket.on('listening', () => {
    listenSocket.setMulticastLoopback(true);
    listenSocket.addSourceSpecificMembership(sourceAddress, GROUP_ADDRESS);

    listenSocket.on('message', (buf, rinfo) => {
      console.error('[CHILD] %s received "%s" from %j', process.pid,
                    buf.toString(), rinfo);

      receivedMessages.push(buf);

      // Report the payload to the parent as a string.
      process.send({ message: buf.toString() });

      if (receivedMessages.length === messages.length) {
        // .dropSourceSpecificMembership() not strictly needed,
        // it is here as a sanity check.
        listenSocket.dropSourceSpecificMembership(sourceAddress, GROUP_ADDRESS);
        process.nextTick(() => {
          listenSocket.close();
        });
      }
    });

    listenSocket.on('close', () => {
      // HACK: Wait to exit the process to ensure that the parent
      // process has had time to receive all messages via process.send()
      // This may be indicative of some other issue.
      setTimeout(() => {
        process.exit();
      }, common.platformTimeout(1000));
    });

    // Tell the parent this listener is ready; the parent starts sending once
    // all listeners have reported.
    process.send({ listening: true });
  });

  listenSocket.bind(common.PORT);
}
230