• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const common = require('../common');
4const assert = require('assert');
5const { Readable } = require('stream');
6
7const MAX = 42;
8const BATCH = 10;
9
10{
11  const readable = new Readable({
12    objectMode: true,
13    read: common.mustCall(function() {
14      console.log('>> READ');
15      fetchData((err, data) => {
16        if (err) {
17          this.destroy(err);
18          return;
19        }
20
21        if (data.length === 0) {
22          console.log('pushing null');
23          this.push(null);
24          return;
25        }
26
27        console.log('pushing');
28        data.forEach((d) => this.push(d));
29      });
30    }, Math.floor(MAX / BATCH) + 2)
31  });
32
33  let i = 0;
34  function fetchData(cb) {
35    if (i > MAX) {
36      setTimeout(cb, 10, null, []);
37    } else {
38      const array = [];
39      const max = i + BATCH;
40      for (; i < max; i++) {
41        array.push(i);
42      }
43      setTimeout(cb, 10, null, array);
44    }
45  }
46
47  readable.on('readable', () => {
48    let data;
49    console.log('readable emitted');
50    while (data = readable.read()) {
51      console.log(data);
52    }
53  });
54
55  readable.on('end', common.mustCall(() => {
56    assert.strictEqual(i, (Math.floor(MAX / BATCH) + 1) * BATCH);
57  }));
58}
59
60{
61  const readable = new Readable({
62    objectMode: true,
63    read: common.mustCall(function() {
64      console.log('>> READ');
65      fetchData((err, data) => {
66        if (err) {
67          this.destroy(err);
68          return;
69        }
70
71        if (data.length === 0) {
72          console.log('pushing null');
73          this.push(null);
74          return;
75        }
76
77        console.log('pushing');
78        data.forEach((d) => this.push(d));
79      });
80    }, Math.floor(MAX / BATCH) + 2)
81  });
82
83  let i = 0;
84  function fetchData(cb) {
85    if (i > MAX) {
86      setTimeout(cb, 10, null, []);
87    } else {
88      const array = [];
89      const max = i + BATCH;
90      for (; i < max; i++) {
91        array.push(i);
92      }
93      setTimeout(cb, 10, null, array);
94    }
95  }
96
97  readable.on('data', (data) => {
98    console.log('data emitted', data);
99  });
100
101  readable.on('end', common.mustCall(() => {
102    assert.strictEqual(i, (Math.floor(MAX / BATCH) + 1) * BATCH);
103  }));
104}
105
106{
107  const readable = new Readable({
108    objectMode: true,
109    read: common.mustCall(function() {
110      console.log('>> READ');
111      fetchData((err, data) => {
112        if (err) {
113          this.destroy(err);
114          return;
115        }
116
117        console.log('pushing');
118        data.forEach((d) => this.push(d));
119
120        if (data[BATCH - 1] >= MAX) {
121          console.log('pushing null');
122          this.push(null);
123        }
124      });
125    }, Math.floor(MAX / BATCH) + 1)
126  });
127
128  let i = 0;
129  function fetchData(cb) {
130    const array = [];
131    const max = i + BATCH;
132    for (; i < max; i++) {
133      array.push(i);
134    }
135    setTimeout(cb, 10, null, array);
136  }
137
138  readable.on('data', (data) => {
139    console.log('data emitted', data);
140  });
141
142  readable.on('end', common.mustCall(() => {
143    assert.strictEqual(i, (Math.floor(MAX / BATCH) + 1) * BATCH);
144  }));
145}
146
147{
148  const readable = new Readable({
149    objectMode: true,
150    read: common.mustNotCall()
151  });
152
153  readable.on('data', common.mustNotCall());
154
155  readable.push(null);
156
157  let nextTickPassed = false;
158  process.nextTick(() => {
159    nextTickPassed = true;
160  });
161
162  readable.on('end', common.mustCall(() => {
163    assert.strictEqual(nextTickPassed, true);
164  }));
165}
166
167{
168  const readable = new Readable({
169    objectMode: true,
170    read: common.mustCall()
171  });
172
173  readable.on('data', (data) => {
174    console.log('data emitted', data);
175  });
176
177  readable.on('end', common.mustCall());
178
179  setImmediate(() => {
180    readable.push('aaa');
181    readable.push(null);
182  });
183}
184