• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
21
'use strict';
const common = require('../common');

// Nothing to exercise on platforms where dgram clustering is unavailable.
if (common.isWindows) {
  common.skip('dgram clustering is currently not supported on Windows.');
}

// Number of workers to fork, and how many datagrams each worker must receive.
const NUM_WORKERS = 4;
const PACKETS_PER_WORKER = 10;

const assert = require('assert');
const cluster = require('cluster');
const dgram = require('dgram');

// This file is executed both by the primary and by every forked worker;
// pick the matching half of the test and run it.
const entryPoint = cluster.isPrimary ? primary : worker;
entryPoint();
38
39
/**
 * Primary-process half of the test.
 *
 * Forks NUM_WORKERS workers and waits until the cluster 'listening' event has
 * fired NUM_WORKERS times. It then sends NUM_WORKERS * PACKETS_PER_WORKER
 * datagrams, one after another, to the address reported by the last
 * 'listening' event. Every worker must report exactly PACKETS_PER_WORKER
 * received packets before it exits.
 */
function primary() {
  const totalPackets = NUM_WORKERS * PACKETS_PER_WORKER;
  let listening = 0;

  // Fork the workers.
  for (let i = 0; i < NUM_WORKERS; i++)
    cluster.fork();

  // Wait until all workers are listening.
  cluster.on('listening', common.mustCall((worker, address) => {
    if (++listening < NUM_WORKERS)
      return;

    // Start sending messages. Sends are chained: the next datagram goes out
    // only from the previous send's callback.
    const buf = Buffer.from('hello world');
    const socket = dgram.createSocket('udp4');
    let sent = 0;
    doSend();

    function doSend() {
      socket.send(buf, 0, buf.length, address.port, address.address, afterSend);
    }

    function afterSend(err) {
      // When a callback is passed to send(), a failure is reported only
      // through its first argument. Surface it here instead of silently
      // counting a failed send as sent.
      assert.ifError(err);

      sent++;
      if (sent < totalPackets) {
        doSend();
      } else {
        socket.close();
      }
    }
  }, NUM_WORKERS));

  // Set up event handlers for every worker. Each worker sends a message when
  // it has received the expected number of packets. After that it disconnects.
  for (const forked of Object.values(cluster.workers))
    setupWorker(forked);

  /**
   * Attaches the per-worker checks.
   * @param {object} worker - An entry of `cluster.workers`.
   */
  function setupWorker(worker) {
    // Packet count reported by this worker; stays 0 until its message arrives.
    let received = 0;

    worker.on('message', common.mustCall((msg) => {
      received = msg.received;
      worker.disconnect();
    }));

    // By the time the worker exits it must have reported a full quota.
    worker.on('exit', common.mustCall(() => {
      assert.strictEqual(received, PACKETS_PER_WORKER);
    }));
  }
}
92
93
/**
 * Worker-process half of the test.
 *
 * Binds a UDP socket to port 0 and counts incoming datagrams. On the
 * PACKETS_PER_WORKER-th datagram it sends the count to the primary and
 * closes the socket.
 */
function worker() {
  const socket = dgram.createSocket('udp4');
  let packetCount = 0;

  const onMessage = () => {
    packetCount += 1;

    // Keep counting until the quota is reached.
    if (packetCount !== PACKETS_PER_WORKER) {
      return;
    }

    // Quota reached: notify the primary, then stop receiving.
    process.send({ received: packetCount });
    socket.close();
  };

  socket.on('message', common.mustCall(onMessage, PACKETS_PER_WORKER));

  // Start listening.
  socket.bind(0);
}
112