'use strict';
const common = require('../common');
const stream = require('stream');
const assert = require('assert');

// Checks how pipe() tracks the destination it is waiting on
// (`_readableState.awaitDrainWriters`) when the source is resumed by hand
// while the destination is still sitting on unacknowledged writes.

// While this flag is set, the writable parks each write callback in
// `parkedWrites` instead of invoking it, so no chunk is reported as consumed.
let holdingWrites = true;
const parkedWrites = [];

// Consumer with a very low highWaterMark; every chunk pushed below is larger
// than it.
const writable = new stream.Writable({
  highWaterMark: 5,
  write(chunk, encoding, cb) {
    if (!holdingWrites) {
      cb();
      return;
    }
    parkedWrites.push({ chunk, cb });
  }
});

const readable = new stream.Readable({
  read() {}
});

readable.pipe(writable);

// Second 'pause': the manual resume must not have cleared awaitDrainWriters.
// From here on the writable acknowledges writes immediately, and every
// callback parked so far is released so the pending chunks can flow.
function onSecondPause() {
  const { awaitDrainWriters } = readable._readableState;
  assert.strictEqual(
    awaitDrainWriters,
    writable,
    `.resume() should not reset the awaitDrainWriters, but instead got ${awaitDrainWriters}`
  );

  holdingWrites = false;
  for (const parked of parkedWrites) {
    parked.cb();
  }
}

// First 'pause': the readable should be waiting on `writable`. It is resumed
// manually on the next tick; because write callbacks are still being parked,
// a second 'pause' is expected afterwards.
function onFirstPause() {
  const { awaitDrainWriters } = readable._readableState;
  assert.strictEqual(
    awaitDrainWriters,
    writable,
    `Expected awaitDrainWriters to be a Writable but instead got ${awaitDrainWriters}`
  );

  process.nextTick(common.mustCall(() => readable.resume()));

  // Registered only once the first pause has happened, so the expectation
  // for the second handler is created at the same point as before.
  readable.once('pause', common.mustCall(onSecondPause));
}

readable.once('pause', common.mustCall(onFirstPause));

readable.push(Buffer.alloc(100)); // Exceeds the writable HWM: first 'pause'.
readable.push(Buffer.alloc(100)); // Triggers the second 'pause'.
// Third chunk: expected to get through to the writable once the write
// callbacks that were being held back have been invoked.
readable.push(Buffer.alloc(100));
// Signal end-of-stream so the piped writable can finish.
readable.push(null);

// Once the writable has finished, the readable must no longer be waiting on
// any writer. The failure message is built on a single logical line so that
// no source-file newlines or indentation leak into the assertion output.
writable.on('finish', common.mustCall(() => {
  const { awaitDrainWriters } = readable._readableState;
  assert.strictEqual(
    awaitDrainWriters,
    null,
    'awaitDrainWriters should be reset to null after all chunks are ' +
    `written but instead got ${awaitDrainWriters}`
  );
  // Everything okay, all chunks were written.
}));