• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Flags: --expose-internals
2'use strict';
3const common = require('../common');
4if (common.isWindows)
5  common.skip('dgram clustering is currently not supported on Windows.');
6
7const NUM_WORKERS = 4;
8const PACKETS_PER_WORKER = 10;
9
10const assert = require('assert');
11const cluster = require('cluster');
12const dgram = require('dgram');
13
14if (cluster.isPrimary)
15  primary();
16else
17  worker();
18
19
20function primary() {
21  const { internalBinding } = require('internal/test/binding');
22  const { UDP } = internalBinding('udp_wrap');
23
24  // Create a handle and use its fd.
25  const rawHandle = new UDP();
26  const err = rawHandle.bind(common.localhostIPv4, 0, 0);
27  assert(err >= 0, String(err));
28  assert.notStrictEqual(rawHandle.fd, -1);
29
30  const fd = rawHandle.fd;
31
32  let listening = 0;
33
34  // Fork 4 workers.
35  for (let i = 0; i < NUM_WORKERS; i++)
36    cluster.fork();
37
38  // Wait until all workers are listening.
39  cluster.on('listening', common.mustCall((worker, address) => {
40    if (++listening < NUM_WORKERS)
41      return;
42
43    // Start sending messages.
44    const buf = Buffer.from('hello world');
45    const socket = dgram.createSocket('udp4');
46    let sent = 0;
47    doSend();
48
49    function doSend() {
50      socket.send(buf, 0, buf.length, address.port, address.address, afterSend);
51    }
52
53    function afterSend() {
54      sent++;
55      if (sent < NUM_WORKERS * PACKETS_PER_WORKER) {
56        doSend();
57      } else {
58        socket.close();
59      }
60    }
61  }, NUM_WORKERS));
62
63  // Set up event handlers for every worker. Each worker sends a message when
64  // it has received the expected number of packets. After that it disconnects.
65  for (const key in cluster.workers) {
66    if (Object.hasOwn(cluster.workers, key))
67      setupWorker(cluster.workers[key]);
68  }
69
70  function setupWorker(worker) {
71    let received = 0;
72
73    worker.send({
74      fd,
75    });
76
77    worker.on('message', common.mustCall((msg) => {
78      received = msg.received;
79      worker.disconnect();
80    }));
81
82    worker.on('exit', common.mustCall(() => {
83      assert.strictEqual(received, PACKETS_PER_WORKER);
84    }));
85  }
86}
87
88
89function worker() {
90  let received = 0;
91
92  process.on('message', common.mustCall((data) => {
93    const { fd } = data;
94    // Create udp socket and start listening.
95    const socket = dgram.createSocket('udp4');
96
97    socket.on('message', common.mustCall((data, info) => {
98      received++;
99
100      // Every 10 messages, notify the primary.
101      if (received === PACKETS_PER_WORKER) {
102        process.send({ received });
103        socket.close();
104      }
105    }, PACKETS_PER_WORKER));
106
107    socket.bind({
108      fd,
109    });
110  }));
111}
112