1'use strict'; 2 3const common = require('../common'); 4const assert = require('assert'); 5const { Readable } = require('stream'); 6 7const MAX = 42; 8const BATCH = 10; 9 10{ 11 const readable = new Readable({ 12 objectMode: true, 13 read: common.mustCall(function() { 14 console.log('>> READ'); 15 fetchData((err, data) => { 16 if (err) { 17 this.destroy(err); 18 return; 19 } 20 21 if (data.length === 0) { 22 console.log('pushing null'); 23 this.push(null); 24 return; 25 } 26 27 console.log('pushing'); 28 data.forEach((d) => this.push(d)); 29 }); 30 }, Math.floor(MAX / BATCH) + 2) 31 }); 32 33 let i = 0; 34 function fetchData(cb) { 35 if (i > MAX) { 36 setTimeout(cb, 10, null, []); 37 } else { 38 const array = []; 39 const max = i + BATCH; 40 for (; i < max; i++) { 41 array.push(i); 42 } 43 setTimeout(cb, 10, null, array); 44 } 45 } 46 47 readable.on('readable', () => { 48 let data; 49 console.log('readable emitted'); 50 while (data = readable.read()) { 51 console.log(data); 52 } 53 }); 54 55 readable.on('end', common.mustCall(() => { 56 assert.strictEqual(i, (Math.floor(MAX / BATCH) + 1) * BATCH); 57 })); 58} 59 60{ 61 const readable = new Readable({ 62 objectMode: true, 63 read: common.mustCall(function() { 64 console.log('>> READ'); 65 fetchData((err, data) => { 66 if (err) { 67 this.destroy(err); 68 return; 69 } 70 71 if (data.length === 0) { 72 console.log('pushing null'); 73 this.push(null); 74 return; 75 } 76 77 console.log('pushing'); 78 data.forEach((d) => this.push(d)); 79 }); 80 }, Math.floor(MAX / BATCH) + 2) 81 }); 82 83 let i = 0; 84 function fetchData(cb) { 85 if (i > MAX) { 86 setTimeout(cb, 10, null, []); 87 } else { 88 const array = []; 89 const max = i + BATCH; 90 for (; i < max; i++) { 91 array.push(i); 92 } 93 setTimeout(cb, 10, null, array); 94 } 95 } 96 97 readable.on('data', (data) => { 98 console.log('data emitted', data); 99 }); 100 101 readable.on('end', common.mustCall(() => { 102 assert.strictEqual(i, (Math.floor(MAX / BATCH) + 1) * BATCH); 103 })); 104} 105 106{ 107 const readable = new Readable({ 108 objectMode: true, 109 read: common.mustCall(function() { 110 console.log('>> READ'); 111 fetchData((err, data) => { 112 if (err) { 113 this.destroy(err); 114 return; 115 } 116 117 console.log('pushing'); 118 data.forEach((d) => this.push(d)); 119 120 if (data[BATCH - 1] >= MAX) { 121 console.log('pushing null'); 122 this.push(null); 123 } 124 }); 125 }, Math.floor(MAX / BATCH) + 1) 126 }); 127 128 let i = 0; 129 function fetchData(cb) { 130 const array = []; 131 const max = i + BATCH; 132 for (; i < max; i++) { 133 array.push(i); 134 } 135 setTimeout(cb, 10, null, array); 136 } 137 138 readable.on('data', (data) => { 139 console.log('data emitted', data); 140 }); 141 142 readable.on('end', common.mustCall(() => { 143 assert.strictEqual(i, (Math.floor(MAX / BATCH) + 1) * BATCH); 144 })); 145} 146 147{ 148 const readable = new Readable({ 149 objectMode: true, 150 read: common.mustNotCall() 151 }); 152 153 readable.on('data', common.mustNotCall()); 154 155 readable.push(null); 156 157 let nextTickPassed = false; 158 process.nextTick(() => { 159 nextTickPassed = true; 160 }); 161 162 readable.on('end', common.mustCall(() => { 163 assert.strictEqual(nextTickPassed, true); 164 })); 165} 166 167{ 168 const readable = new Readable({ 169 objectMode: true, 170 read: common.mustCall() 171 }); 172 173 readable.on('data', (data) => { 174 console.log('data emitted', data); 175 }); 176 177 readable.on('end', common.mustCall()); 178 179 setImmediate(() => { 180 readable.push('aaa'); 181 readable.push(null); 182 }); 183} 184