• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright Joyent, Inc. and other Node contributors.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the
5// "Software"), to deal in the Software without restriction, including
6// without limitation the rights to use, copy, modify, merge, publish,
7// distribute, sublicense, and/or sell copies of the Software, and to permit
8// persons to whom the Software is furnished to do so, subject to the
9// following conditions:
10//
11// The above copyright notice and this permission notice shall be included
12// in all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20// USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22'use strict';
23const common = require('../common');
24if (common.inFreeBSDJail)
25  common.skip('in a FreeBSD jail');
26
27const assert = require('assert');
28const dgram = require('dgram');
29const util = require('util');
30const networkInterfaces = require('os').networkInterfaces();
31const { fork } = require('child_process');
32const LOCAL_BROADCAST_HOST = '255.255.255.255';
33const TIMEOUT = common.platformTimeout(5000);
34const messages = [
35  Buffer.from('First message to send'),
36  Buffer.from('Second message to send'),
37  Buffer.from('Third message to send'),
38  Buffer.from('Fourth message to send'),
39];
40
41let bindAddress = null;
42
43// Take the first non-internal interface as the address for binding.
44// Ideally, this should check for whether or not an interface is set up for
45// BROADCAST and favor internal/private interfaces.
46get_bindAddress: for (const name in networkInterfaces) {
47  const interfaces = networkInterfaces[name];
48  for (let i = 0; i < interfaces.length; i++) {
49    const localInterface = interfaces[i];
50    if (!localInterface.internal && localInterface.family === 'IPv4') {
51      bindAddress = localInterface.address;
52      break get_bindAddress;
53    }
54  }
55}
56assert.ok(bindAddress);
57
58if (process.argv[2] !== 'child') {
59  const workers = {};
60  const listeners = 3;
61  let listening = 0;
62  let dead = 0;
63  let i = 0;
64  let done = 0;
65  let timer = null;
66
67  // Exit the test if it doesn't succeed within TIMEOUT
68  timer = setTimeout(() => {
69    console.error('[PARENT] Responses were not received within %d ms.',
70                  TIMEOUT);
71    console.error('[PARENT] Fail');
72
73    killSubprocesses(workers);
74
75    process.exit(1);
76  }, TIMEOUT);
77
78  // Launch child processes
79  for (let x = 0; x < listeners; x++) {
80    (function() {
81      const worker = fork(process.argv[1], ['child']);
82      workers[worker.pid] = worker;
83
84      worker.messagesReceived = [];
85
86      // Handle the death of workers
87      worker.on('exit', (code, signal) => {
88        // Don't consider this the true death if the worker
89        // has finished successfully
90        // or if the exit code is 0
91        if (worker.isDone || code === 0) {
92          return;
93        }
94
95        dead += 1;
96        console.error('[PARENT] Worker %d died. %d dead of %d',
97                      worker.pid,
98                      dead,
99                      listeners);
100
101        assert.notStrictEqual(signal, null);
102
103        if (dead === listeners) {
104          console.error('[PARENT] All workers have died.');
105          console.error('[PARENT] Fail');
106
107          killSubprocesses(workers);
108
109          process.exit(1);
110        }
111      });
112
113      worker.on('message', (msg) => {
114        if (msg.listening) {
115          listening += 1;
116
117          if (listening === listeners) {
118            // All child process are listening, so start sending
119            sendSocket.sendNext();
120          }
121        } else if (msg.message) {
122          worker.messagesReceived.push(msg.message);
123
124          if (worker.messagesReceived.length === messages.length) {
125            done += 1;
126            worker.isDone = true;
127            console.error('[PARENT] %d received %d messages total.',
128                          worker.pid,
129                          worker.messagesReceived.length);
130          }
131
132          if (done === listeners) {
133            console.error('[PARENT] All workers have received the ' +
134                          'required number of ' +
135                          'messages. Will now compare.');
136
137            Object.keys(workers).forEach((pid) => {
138              const worker = workers[pid];
139
140              let count = 0;
141
142              worker.messagesReceived.forEach((buf) => {
143                for (let i = 0; i < messages.length; ++i) {
144                  if (buf.toString() === messages[i].toString()) {
145                    count++;
146                    break;
147                  }
148                }
149              });
150
151              console.error('[PARENT] %d received %d matching messages.',
152                            worker.pid,
153                            count);
154
155              assert.strictEqual(count, messages.length);
156            });
157
158            clearTimeout(timer);
159            console.error('[PARENT] Success');
160            killSubprocesses(workers);
161          }
162        }
163      });
164    })(x);
165  }
166
167  const sendSocket = dgram.createSocket({
168    type: 'udp4',
169    reuseAddr: true
170  });
171
172  // Bind the address explicitly for sending
173  // INADDR_BROADCAST to only one interface
174  sendSocket.bind(common.PORT, bindAddress);
175  sendSocket.on('listening', () => {
176    sendSocket.setBroadcast(true);
177  });
178
179  sendSocket.on('close', () => {
180    console.error('[PARENT] sendSocket closed');
181  });
182
183  sendSocket.sendNext = function() {
184    const buf = messages[i++];
185
186    if (!buf) {
187      try { sendSocket.close(); } catch {}
188      return;
189    }
190
191    sendSocket.send(
192      buf,
193      0,
194      buf.length,
195      common.PORT,
196      LOCAL_BROADCAST_HOST,
197      (err) => {
198        assert.ifError(err);
199        console.error('[PARENT] sent %s to %s:%s',
200                      util.inspect(buf.toString()),
201                      LOCAL_BROADCAST_HOST, common.PORT);
202
203        process.nextTick(sendSocket.sendNext);
204      }
205    );
206  };
207
208  function killSubprocesses(subprocesses) {
209    Object.keys(subprocesses).forEach((key) => {
210      const subprocess = subprocesses[key];
211      subprocess.kill();
212    });
213  }
214}
215
216if (process.argv[2] === 'child') {
217  const receivedMessages = [];
218  const listenSocket = dgram.createSocket({
219    type: 'udp4',
220    reuseAddr: true
221  });
222
223  listenSocket.on('message', (buf, rinfo) => {
224    // Receive udp messages only sent from parent
225    if (rinfo.address !== bindAddress) return;
226
227    console.error('[CHILD] %s received %s from %j',
228                  process.pid,
229                  util.inspect(buf.toString()),
230                  rinfo);
231
232    receivedMessages.push(buf);
233
234    process.send({ message: buf.toString() });
235
236    if (receivedMessages.length === messages.length) {
237      process.nextTick(() => { listenSocket.close(); });
238    }
239  });
240
241  listenSocket.on('close', () => {
242    // HACK: Wait to exit the process to ensure that the parent
243    // process has had time to receive all messages via process.send()
244    // This may be indicative of some other issue.
245    setTimeout(() => { process.exit(); }, 1000);
246  });
247
248  listenSocket.on('listening', () => { process.send({ listening: true }); });
249
250  listenSocket.bind(common.PORT);
251}
252