'use strict';
const common = require('../common');
const stream = require('stream');
const assert = require('assert');

// This is very similar to test-stream-pipe-cleanup-pause.js.
//
// A single Readable is piped into three Writables and one oversized chunk is
// pushed. Along the way the test inspects how many writers the reader has
// recorded in `_readableState.awaitDrainWriters`.

// 560000 is chosen here because it is larger than the (default) highWaterMark
// and will cause `.write()` to return false
// See: https://github.com/nodejs/node/issues/5820
const CHUNK_SIZE = 560000;
const buffer = Buffer.allocUnsafe(CHUNK_SIZE);

const reader = new stream.Readable();
const writer1 = new stream.Writable();
const writer2 = new stream.Writable();
const writer3 = new stream.Writable();

// Data only enters the reader through explicit `reader.push()` calls below.
reader._read = () => {};

// Current size of the reader's internal `awaitDrainWriters` collection.
const awaitDrainCount = () => reader._readableState.awaitDrainWriters.size;

// The only consumer that acknowledges its chunk: it announces receipt
// synchronously and completes the write on the next tick.
writer1._write = common.mustCall((chunk, encoding, done) => {
  writer1.emit('chunk-received');
  process.nextTick(done);
}, 1);

writer1.once('chunk-received', () => {
  assert.strictEqual(
    awaitDrainCount(),
    0,
    `awaitDrain initial value should be 0, actual is ${awaitDrainCount()}`
  );
  setImmediate(() => {
    // This one should *not* get through to writer1 because writer2 is not
    // "done" processing.
    reader.push(buffer);
  });
});

// A "slow" consumer:
writer2._write = common.mustCall((chunk, encoding, cb) => {
  assert.strictEqual(
    awaitDrainCount(),
    1,
    `awaitDrain should be 1 after first push, actual is ${awaitDrainCount()}`
  );
  // Not calling cb here to "simulate" slow stream.
  // This should be called exactly once, since the first .write() call
  // will return false.
}, 1);

// A second "slow" consumer:
writer3._write = common.mustCall((chunk, encoding, cb) => {
  assert.strictEqual(
    awaitDrainCount(),
    2,
    `awaitDrain should be 2 after second push, actual is ${awaitDrainCount()}`
  );
  // Not calling cb here to "simulate" slow stream.
  // This should be called exactly once, since the first .write() call
  // will return false.
}, 1);

// Attach the writers in order (the expected counts above rely on it), then
// push the first chunk.
for (const writer of [writer1, writer2, writer3]) {
  reader.pipe(writer);
}
reader.push(buffer);