'use strict';
const common = require('../common');
const assert = require('assert');
const { Readable } = require('stream');

// Scenario 1: a stream with a 'readable' listener that is filled and ended
// by synchronous push() calls.
const readable = new Readable({
  read() {}
});

// A freshly constructed stream starts with needReadable === false.
assert.strictEqual(readable._readableState.needReadable, false);

readable.on('readable', common.mustCall(() => {
  // By the time the 'readable' event fires, needReadable has been reset.
  const { needReadable } = readable._readableState;
  assert.strictEqual(needReadable, false);
  readable.read();
}));

// Attaching a 'readable' listener means a 'readable' event is now needed.
assert.strictEqual(readable._readableState.needReadable, true);

readable.push('foo');
readable.push(null);

readable.on('end', common.mustCall(() => {
  // Once the stream has ended, no further 'readable' event is needed.
  const { needReadable } = readable._readableState;
  assert.strictEqual(needReadable, false);
}));

// Scenario 2: the data is pushed asynchronously, after the listener exists.
const asyncReadable = new Readable({
  read() {}
});

asyncReadable.on('readable', common.mustCall(() => {
  const chunk = asyncReadable.read();
  if (chunk === null) {
    return;
  }
  // After each read() the buffer is empty. If the stream does not end now,
  // the reader has to be notified about future changes.
  assert.strictEqual(asyncReadable._readableState.needReadable, true);
}, 2));

process.nextTick(common.mustCall(() => {
  asyncReadable.push('foooo');
}));
process.nextTick(common.mustCall(() => {
  asyncReadable.push('bar');
}));
setImmediate(common.mustCall(() => {
  asyncReadable.push(null);
  assert.strictEqual(asyncReadable._readableState.needReadable, false);
}));

// Scenario 3: a stream whose buffer is filled before it is consumed.
const flowing = new Readable({
  read() {}
});

// Note: these pushes must happen before the on('data') call.
flowing.push('foooo');
flowing.push('bar');
flowing.push('quo');
process.nextTick(common.mustCall(() => {
  flowing.push(null);
}));

// When the buffer already has enough data, and the stream is
// in flowing mode, there is no need for the readable event.
flowing.on('data', common.mustCall(() => {
  // The chunks were buffered before this listener was attached, so no
  // 'readable' notification should be pending while they are delivered.
  assert.strictEqual(flowing._readableState.needReadable, false);
}, 3));

// A producer that delivers 3 bytes per tick to a reader asking for 8 bytes
// at a time.
const slowProducer = new Readable({
  read: () => {}
});

slowProducer.on('readable', common.mustCall(() => {
  const chunk = slowProducer.read(8);
  const state = slowProducer._readableState;
  if (chunk === null) {
    // The buffer doesn't have enough data yet and the stream has not ended,
    // so the reader must be notified when more data arrives.
    assert.strictEqual(state.needReadable, true);
  } else {
    assert.strictEqual(state.needReadable, false);
  }
}, 4));

// Pushes the first entry of `pending` on the next tick and, from inside that
// same tick callback, schedules the remaining entries the same way. Each push
// therefore happens one nested process.nextTick() after the previous one.
const pushOnePerTick = (pending) => {
  process.nextTick(common.mustCall(() => {
    const [chunk, ...rest] = pending;
    slowProducer.push(chunk);
    if (rest.length > 0) {
      pushOnePerTick(rest);
    }
  }));
};

// Three small chunks followed by the end-of-stream marker.
pushOnePerTick(['foo', 'foo', 'foo', null]);