// Test the speed of .pipe() with sockets
//
// A local TCP server pushes a fixed chunk to every accepted connection as
// fast as the socket accepts it. The client either pipes the socket into a
// byte-counting sink, or (when `recvbuflen` > 0) receives through the
// socket's `onread` option using caller-supplied buffer(s). After `dur`
// seconds the number of received gigabits is reported.
'use strict';

const common = require('../common.js');
const net = require('net');
const PORT = common.PORT;

const bench = common.createBenchmark(main, {
  sendchunklen: [256, 32 * 1024, 128 * 1024, 16 * 1024 * 1024],
  type: ['utf', 'asc', 'buf'],
  recvbuflen: [0, 64 * 1024, 1024 * 1024],
  recvbufgenfn: ['true', 'false'],
  dur: [5]
}, {
  test: { sendchunklen: 256 }
});

// Shared between main(), flow() and Writer.prototype.write().
let chunk;         // Payload written by the server on every flow() call.
let encoding;      // Encoding passed with string chunks; undefined for 'buf'.
let recvbuf;       // Receive buffer for `onread`; undefined => use pipe().
let received = 0;  // Total bytes (or string length units) seen by the client.

/**
 * Benchmark entry point, invoked by the harness with one configuration.
 *
 * @param {object} conf
 * @param {number} conf.dur - Measurement duration in seconds.
 * @param {number} conf.sendchunklen - Size of each chunk the server writes.
 * @param {string} conf.type - 'buf', 'utf' or 'asc'; selects the chunk kind.
 * @param {number} conf.recvbuflen - Size of the `onread` buffer; 0 selects
 *   the pipe()-into-Writer path instead.
 * @param {string} conf.recvbufgenfn - 'true' to hand `onread` a function that
 *   rotates through a small pool of buffers rather than a single buffer.
 * @throws {Error} If `type` is not one of the supported values.
 */
function main({ dur, sendchunklen, type, recvbuflen, recvbufgenfn }) {
  if (isFinite(recvbuflen) && recvbuflen > 0)
    recvbuf = Buffer.alloc(recvbuflen);

  switch (type) {
    case 'buf':
      chunk = Buffer.alloc(sendchunklen, 'x');
      break;
    case 'utf':
      encoding = 'utf8';
      // Half as many characters as for 'asc'.
      chunk = 'ü'.repeat(sendchunklen / 2);
      break;
    case 'asc':
      encoding = 'ascii';
      chunk = 'x'.repeat(sendchunklen);
      break;
    default:
      throw new Error(`invalid type: ${type}`);
  }

  const reader = new Reader();
  let writer;
  let socketOpts;
  if (recvbuf === undefined) {
    // No receive buffer configured: the client socket is piped into a Writer.
    writer = new Writer();
    socketOpts = { port: PORT };
  } else {
    let buffer = recvbuf;
    if (recvbufgenfn === 'true') {
      // Supply a function instead of a buffer; each call returns the next
      // buffer from a three-entry pool, cycling back to the first.
      let bufidx = -1;
      const bufpool = [
        recvbuf,
        Buffer.from(recvbuf),
        Buffer.from(recvbuf),
      ];
      buffer = () => {
        bufidx = (bufidx + 1) % bufpool.length;
        return bufpool[bufidx];
      };
    }
    socketOpts = {
      port: PORT,
      onread: {
        buffer,
        // Only the byte count is needed; the buffer argument is ignored.
        callback: function(nread) {
          received += nread;
        }
      }
    };
  }

  // The actual benchmark.
  const server = net.createServer((socket) => {
    reader.pipe(socket);
  });

  server.listen(PORT, () => {
    const socket = net.connect(socketOpts);
    socket.on('connect', () => {
      bench.start();

      if (recvbuf === undefined)
        socket.pipe(writer);

      setTimeout(() => {
        const bytes = received;
        const gbits = (bytes * 8) / (1024 * 1024 * 1024);
        bench.end(gbits);
        // The server and sockets are still open; exit explicitly.
        process.exit(0);
      }, dur * 1000);
    });
  });
}

// Minimal pipe() destination: counts what it is given and acknowledges at
// once, so it never asks the source to slow down.
function Writer() {
  this.writable = true;
}

Writer.prototype.write = function(chunk, encoding, cb) {
  received += chunk.length;

  // Accept both write(chunk, cb) and write(chunk, encoding, cb).
  if (typeof encoding === 'function')
    encoding();
  else if (typeof cb === 'function')
    cb();

  return true;
};

// Doesn't matter, never emits anything.
Writer.prototype.on = function() {};
Writer.prototype.once = function() {};
Writer.prototype.emit = function() {};
Writer.prototype.prependListener = function() {};


// Writes the shared chunk to this.dest once, then schedules itself again:
// on 'drain' when write() returned false, otherwise on the next tick.
// Called with `this` bound to a Reader (see the Reader constructor).
function flow() {
  const dest = this.dest;
  const res = dest.write(chunk, encoding);
  if (!res)
    dest.once('drain', this.flow);
  else
    process.nextTick(this.flow);
}

// Minimal pipe() source: keeps writing the shared chunk to its destination.
function Reader() {
  this.flow = flow.bind(this);
  this.readable = true;
}

// Remembers `dest`, starts the write loop and returns `dest`.
Reader.prototype.pipe = function(dest) {
  this.dest = dest;
  this.flow();
  return dest;
};