'use strict';

const common = require('../common');
const assert = require('assert');
const { PassThrough, Readable } = require('stream');

// The source below produces 19 objects in total: 18 pushed asynchronously
// (one per read() call, via process.nextTick) followed by a final object that
// is pushed synchronously together with the end-of-stream marker.
let ticks = 18;
// Number of 'data' events still expected; must match the object count above.
let expectedData = 19;

const rs = new Readable({
  objectMode: true,
  read: () => {
    if (ticks-- > 0)
      return process.nextTick(() => rs.push({}));
    // Out of ticks: emit the last object and signal end-of-stream.
    rs.push({});
    rs.push(null);
  }
});

rs.on('end', common.mustCall());
readAndPause();

/**
 * Runs one iteration of the on('data') -> pause -> wait -> resume loop.
 *
 * Registers a fresh 'data' listener that must fire exactly once. The listener
 * pauses the stream right away; if 'data' were emitted again while the stream
 * is paused, the mustCall(…, 1) guard would fail the test. Unless this was the
 * last expected chunk, the next iteration is scheduled with setImmediate: the
 * old listener is removed, a new one is installed, and the stream is resumed.
 */
function readAndPause() {
  const ondata = common.mustCall(() => {
    rs.pause();

    expectedData--;
    if (expectedData <= 0)
      return;

    setImmediate(() => {
      rs.removeListener('data', ondata);
      readAndPause();
      rs.resume();
    });
  }, 1); // Only call ondata once

  rs.on('data', ondata);
}

{
  // Adding and then removing a 'readable' listener before calling pause()
  // must leave the stream paused on the next tick.
  const readable = new Readable({
    read() {}
  });

  const onReadable = () => {};

  readable.setEncoding('utf8');
  readable.on('readable', onReadable);
  readable.removeListener('readable', onReadable);
  readable.pause();

  // Guard the callback with mustCall so the assertion cannot be skipped
  // silently if the callback never runs.
  process.nextTick(common.mustCall(() => {
    assert(readable.isPaused());
  }));
}

{
  // Piping into a destination that is already exerting backpressure: once the
  // destination emits 'drain', the source must not be left paused.
  const source3 = new PassThrough();
  const target3 = new PassThrough();

  // Write until write() returns false, i.e. the destination asks us to wait.
  const chunk = Buffer.allocUnsafe(1000);
  while (target3.write(chunk));

  source3.pipe(target3);
  target3.on('drain', common.mustCall(() => {
    assert(!source3.isPaused());
  }));
  // Consume the destination so its buffer can empty.
  target3.on('data', () => {});
}