• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Test the speed of .pipe() with sockets
2'use strict';
3
4const common = require('../common.js');
5const PORT = common.PORT;
6
// Byte-size units used to spell out the chunk / buffer sizes below.
const KIB = 1024;
const MIB = 1024 * KIB;

// Parameter matrix for the benchmark; `main` is invoked with one combination
// of these values. The third argument narrows `sendchunklen` for test runs.
const bench = common.createBenchmark(
  main,
  {
    sendchunklen: [256, 32 * KIB, 128 * KIB, 16 * MIB],
    type: ['utf', 'asc', 'buf'],
    recvbuflen: [0, 64 * KIB, 1 * MIB],
    recvbufgenfn: ['true', 'false'],
    dur: [5],
  },
  {
    test: { sendchunklen: 256 },
  },
);

// State shared between main(), Writer and the Reader's flow loop.
let chunk;        // Payload written repeatedly by the reader side.
let encoding;     // Encoding passed along with string payloads.
let recvbuf;      // Receive buffer; stays undefined unless main() allocates it.
let received = 0; // Running total reported when the benchmark ends.
21
/**
 * Runs one benchmark configuration: a local server streams `chunk` to a
 * client socket for `dur` seconds, then the received total is reported.
 *
 * @param {object} conf
 * @param {number} conf.dur           Measurement time in seconds.
 * @param {number} conf.sendchunklen  Size of each chunk the server writes.
 * @param {string} conf.type          Payload kind: 'buf', 'utf' or 'asc'.
 * @param {number} conf.recvbuflen    Receive buffer size; a non-positive or
 *                                    non-finite value selects the pipe path.
 * @param {string} conf.recvbufgenfn  'true' to supply the receive buffer
 *                                    through a function cycling over a pool.
 * @throws {Error} If `type` is not one of the supported payload kinds.
 */
function main({ dur, sendchunklen, type, recvbuflen, recvbufgenfn }) {
  const wantsRecvBuf = isFinite(recvbuflen) && recvbuflen > 0;
  if (wantsRecvBuf)
    recvbuf = Buffer.alloc(recvbuflen);

  // Build the payload that the reader writes over and over.
  if (type === 'buf') {
    chunk = Buffer.alloc(sendchunklen, 'x');
  } else if (type === 'utf') {
    encoding = 'utf8';
    chunk = 'ü'.repeat(sendchunklen / 2);
  } else if (type === 'asc') {
    encoding = 'ascii';
    chunk = 'x'.repeat(sendchunklen);
  } else {
    throw new Error(`invalid type: ${type}`);
  }

  const reader = new Reader();
  const usesPipe = recvbuf === undefined;
  const socketOpts = { port: PORT };
  let writer;

  if (usesPipe) {
    // No receive buffer: the client socket is piped into a counting Writer.
    writer = new Writer();
  } else {
    let buffer = recvbuf;
    if (recvbufgenfn === 'true') {
      // Hand out three same-sized buffers in round-robin order.
      const bufpool = [recvbuf, Buffer.from(recvbuf), Buffer.from(recvbuf)];
      let next = 0;
      buffer = () => {
        const current = bufpool[next];
        next = (next + 1) % bufpool.length;
        return current;
      };
    }
    socketOpts.onread = {
      buffer,
      callback: (nread) => {
        received += nread;
      },
    };
  }

  // Stops the measurement: converts the received total to gigabits and exits.
  const finish = () => {
    const gbits = (received * 8) / (1024 * 1024 * 1024);
    bench.end(gbits);
    process.exit(0);
  };

  // The actual benchmark.
  const server = net.createServer((socket) => {
    reader.pipe(socket);
  });

  server.listen(PORT, () => {
    const socket = net.connect(socketOpts);
    socket.on('connect', () => {
      bench.start();

      if (usesPipe)
        socket.pipe(writer);

      setTimeout(finish, dur * 1000);
    });
  });
}
95
96const net = require('net');
97
/**
 * Bare-bones pipe destination: it only tallies what it is given and never
 * applies backpressure.
 */
function Writer() {
  this.writable = true;
}

/**
 * Adds `data.length` to the module-level `received` counter and invokes the
 * completion callback, which may arrive as the second or the third argument.
 *
 * @param {Buffer|string} data  Chunk handed over by the source.
 * @param {string|Function} [enc]  Encoding, or the callback when no encoding
 *                                 is given.
 * @param {Function} [done]  Callback used when `enc` is not a function.
 * @returns {boolean} Always `true`.
 */
Writer.prototype.write = function(data, enc, done) {
  received += data.length;

  const callback = typeof enc === 'function' ? enc : done;
  if (typeof callback === 'function')
    callback();

  return true;
};

// Event-emitter surface expected by pipe(); this sink never emits anything,
// so every method is an empty stub.
Writer.prototype.on = function() {};
Writer.prototype.once = function() {};
Writer.prototype.emit = function() {};
Writer.prototype.prependListener = function() {};
118
119
/**
 * One step of the write loop; runs with `this` set to a Reader. Writes the
 * shared `chunk` to the destination, then re-arms itself: on the next tick
 * when the write was accepted, or on 'drain' when it was not.
 */
function flow() {
  const { dest } = this;

  if (dest.write(chunk, encoding)) {
    process.nextTick(this.flow);
    return;
  }

  dest.once('drain', this.flow);
}

/**
 * Minimal pipe source that keeps writing the shared `chunk` to whatever
 * destination it is piped into.
 */
function Reader() {
  // Pre-bound so it can be passed directly as a nextTick / 'drain' callback.
  this.flow = flow.bind(this);
  this.readable = true;
}

/**
 * Remembers `dest` and starts the write loop immediately.
 *
 * @param {object} dest  Destination exposing `write()` and `once()`.
 * @returns {object} The same `dest`.
 */
Reader.prototype.pipe = function(dest) {
  this.dest = dest;
  this.flow();
  return dest;
};
139