• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2const common = require('../common');
3const assert = require('assert');
4const Readable = require('stream').Readable;
5
6const readable = new Readable({
7  read: () => {}
8});
9
10// Initialized to false.
11assert.strictEqual(readable._readableState.needReadable, false);
12
13readable.on('readable', common.mustCall(() => {
14  // When the readable event fires, needReadable is reset.
15  assert.strictEqual(readable._readableState.needReadable, false);
16  readable.read();
17}));
18
19// If a readable listener is attached, then a readable event is needed.
20assert.strictEqual(readable._readableState.needReadable, true);
21
22readable.push('foo');
23readable.push(null);
24
25readable.on('end', common.mustCall(() => {
26  // No need to emit readable anymore when the stream ends.
27  assert.strictEqual(readable._readableState.needReadable, false);
28}));
29
30const asyncReadable = new Readable({
31  read: () => {}
32});
33
34asyncReadable.on('readable', common.mustCall(() => {
35  if (asyncReadable.read() !== null) {
36    // After each read(), the buffer is empty.
37    // If the stream doesn't end now,
38    // then we need to notify the reader on future changes.
39    assert.strictEqual(asyncReadable._readableState.needReadable, true);
40  }
41}, 2));
42
43process.nextTick(common.mustCall(() => {
44  asyncReadable.push('foooo');
45}));
46process.nextTick(common.mustCall(() => {
47  asyncReadable.push('bar');
48}));
49setImmediate(common.mustCall(() => {
50  asyncReadable.push(null);
51  assert.strictEqual(asyncReadable._readableState.needReadable, false);
52}));
53
54const flowing = new Readable({
55  read: () => {}
56});
57
58// Notice this must be above the on('data') call.
59flowing.push('foooo');
60flowing.push('bar');
61flowing.push('quo');
62process.nextTick(common.mustCall(() => {
63  flowing.push(null);
64}));
65
66// When the buffer already has enough data, and the stream is
67// in flowing mode, there is no need for the readable event.
68flowing.on('data', common.mustCall(function(data) {
69  assert.strictEqual(flowing._readableState.needReadable, false);
70}, 3));
71
72const slowProducer = new Readable({
73  read: () => {}
74});
75
76slowProducer.on('readable', common.mustCall(() => {
77  const chunk = slowProducer.read(8);
78  const state = slowProducer._readableState;
79  if (chunk === null) {
80    // The buffer doesn't have enough data, and the stream is not need,
81    // we need to notify the reader when data arrives.
82    assert.strictEqual(state.needReadable, true);
83  } else {
84    assert.strictEqual(state.needReadable, false);
85  }
86}, 4));
87
88process.nextTick(common.mustCall(() => {
89  slowProducer.push('foo');
90  process.nextTick(common.mustCall(() => {
91    slowProducer.push('foo');
92    process.nextTick(common.mustCall(() => {
93      slowProducer.push('foo');
94      process.nextTick(common.mustCall(() => {
95        slowProducer.push(null);
96      }));
97    }));
98  }));
99}));
100