• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Test the speed of .pipe() with JSStream wrapping for PassThrough streams
2'use strict';
3
4const common = require('../common.js');
5const { PassThrough } = require('stream');
6
7const bench = common.createBenchmark(main, {
8  len: [64, 102400, 1024 * 1024 * 16],
9  type: ['utf', 'asc', 'buf'],
10  dur: [5],
11}, {
12  test: { len: 64 },
13  flags: ['--expose-internals']
14});
15
16let chunk;
17let encoding;
18
19function main({ dur, len, type }) {
20  // Can only require internals inside main().
21  const JSStreamWrap = require('internal/js_stream_socket');
22
23  switch (type) {
24    case 'buf':
25      chunk = Buffer.alloc(len, 'x');
26      break;
27    case 'utf':
28      encoding = 'utf8';
29      chunk = 'ü'.repeat(len / 2);
30      break;
31    case 'asc':
32      encoding = 'ascii';
33      chunk = 'x'.repeat(len);
34      break;
35    default:
36      throw new Error(`invalid type: ${type}`);
37  }
38
39  const reader = new Reader();
40  const writer = new Writer();
41
42  // The actual benchmark.
43  const fakeSocket = new JSStreamWrap(new PassThrough());
44  bench.start();
45  reader.pipe(fakeSocket);
46  fakeSocket.pipe(writer);
47
48  setTimeout(() => {
49    const bytes = writer.received;
50    const gbits = (bytes * 8) / (1024 * 1024 * 1024);
51    bench.end(gbits);
52    process.exit(0);
53  }, dur * 1000);
54}
55
56function Writer() {
57  this.received = 0;
58  this.writable = true;
59}
60
61Writer.prototype.write = function(chunk, encoding, cb) {
62  this.received += chunk.length;
63
64  if (typeof encoding === 'function')
65    encoding();
66  else if (typeof cb === 'function')
67    cb();
68
69  return true;
70};
71
72// Doesn't matter, never emits anything.
73Writer.prototype.on = function() {};
74Writer.prototype.once = function() {};
75Writer.prototype.emit = function() {};
76Writer.prototype.prependListener = function() {};
77
78
79function flow() {
80  const dest = this.dest;
81  const res = dest.write(chunk, encoding);
82  if (!res)
83    dest.once('drain', this.flow);
84  else
85    process.nextTick(this.flow);
86}
87
88function Reader() {
89  this.flow = flow.bind(this);
90  this.readable = true;
91}
92
93Reader.prototype.pipe = function(dest) {
94  this.dest = dest;
95  this.flow();
96  return dest;
97};
98