1'use strict'; 2 3const { mustCall } = require('../common'); 4const { once } = require('events'); 5const { Readable } = require('stream'); 6const { strictEqual, throws } = require('assert'); 7const common = require('../common'); 8 9{ 10 throws(() => { 11 Readable.from(null); 12 }, /ERR_INVALID_ARG_TYPE/); 13} 14 15async function toReadableBasicSupport() { 16 async function* generate() { 17 yield 'a'; 18 yield 'b'; 19 yield 'c'; 20 } 21 22 const stream = Readable.from(generate()); 23 24 const expected = ['a', 'b', 'c']; 25 26 for await (const chunk of stream) { 27 strictEqual(chunk, expected.shift()); 28 } 29} 30 31async function toReadableSyncIterator() { 32 function* generate() { 33 yield 'a'; 34 yield 'b'; 35 yield 'c'; 36 } 37 38 const stream = Readable.from(generate()); 39 40 const expected = ['a', 'b', 'c']; 41 42 for await (const chunk of stream) { 43 strictEqual(chunk, expected.shift()); 44 } 45} 46 47async function toReadablePromises() { 48 const promises = [ 49 Promise.resolve('a'), 50 Promise.resolve('b'), 51 Promise.resolve('c'), 52 ]; 53 54 const stream = Readable.from(promises); 55 56 const expected = ['a', 'b', 'c']; 57 58 for await (const chunk of stream) { 59 strictEqual(chunk, expected.shift()); 60 } 61} 62 63async function toReadableString() { 64 const stream = Readable.from('abc'); 65 66 const expected = ['abc']; 67 68 for await (const chunk of stream) { 69 strictEqual(chunk, expected.shift()); 70 } 71} 72 73async function toReadableBuffer() { 74 const stream = Readable.from(Buffer.from('abc')); 75 76 const expected = ['abc']; 77 78 for await (const chunk of stream) { 79 strictEqual(chunk.toString(), expected.shift()); 80 } 81} 82 83async function toReadableOnData() { 84 async function* generate() { 85 yield 'a'; 86 yield 'b'; 87 yield 'c'; 88 } 89 90 const stream = Readable.from(generate()); 91 92 let iterations = 0; 93 const expected = ['a', 'b', 'c']; 94 95 stream.on('data', (chunk) => { 96 iterations++; 97 strictEqual(chunk, expected.shift()); 98 }); 99 100 await once(stream, 'end'); 101 102 strictEqual(iterations, 3); 103} 104 105async function toReadableOnDataNonObject() { 106 async function* generate() { 107 yield 'a'; 108 yield 'b'; 109 yield 'c'; 110 } 111 112 const stream = Readable.from(generate(), { objectMode: false }); 113 114 let iterations = 0; 115 const expected = ['a', 'b', 'c']; 116 117 stream.on('data', (chunk) => { 118 iterations++; 119 strictEqual(chunk instanceof Buffer, true); 120 strictEqual(chunk.toString(), expected.shift()); 121 }); 122 123 await once(stream, 'end'); 124 125 strictEqual(iterations, 3); 126} 127 128async function destroysTheStreamWhenThrowing() { 129 async function* generate() { // eslint-disable-line require-yield 130 throw new Error('kaboom'); 131 } 132 133 const stream = Readable.from(generate()); 134 135 stream.read(); 136 137 const [err] = await once(stream, 'error'); 138 strictEqual(err.message, 'kaboom'); 139 strictEqual(stream.destroyed, true); 140 141} 142 143async function asTransformStream() { 144 async function* generate(stream) { 145 for await (const chunk of stream) { 146 yield chunk.toUpperCase(); 147 } 148 } 149 150 const source = new Readable({ 151 objectMode: true, 152 read() { 153 this.push('a'); 154 this.push('b'); 155 this.push('c'); 156 this.push(null); 157 } 158 }); 159 160 const stream = Readable.from(generate(source)); 161 162 const expected = ['A', 'B', 'C']; 163 164 for await (const chunk of stream) { 165 strictEqual(chunk, expected.shift()); 166 } 167} 168 169async function endWithError() { 170 async function* generate() { 171 yield 1; 172 yield 2; 173 yield Promise.reject('Boum'); 174 } 175 176 const stream = Readable.from(generate()); 177 178 const expected = [1, 2]; 179 180 try { 181 for await (const chunk of stream) { 182 strictEqual(chunk, expected.shift()); 183 } 184 throw new Error(); 185 } catch (err) { 186 strictEqual(expected.length, 0); 187 strictEqual(err, 'Boum'); 188 } 189} 190 191async function destroyingStreamWithErrorThrowsInGenerator() { 192 const validateError = common.mustCall((e) => { 193 strictEqual(e, 'Boum'); 194 }); 195 async function* generate() { 196 try { 197 yield 1; 198 yield 2; 199 yield 3; 200 throw new Error(); 201 } catch (e) { 202 validateError(e); 203 } 204 } 205 const stream = Readable.from(generate()); 206 stream.read(); 207 stream.once('error', common.mustCall()); 208 stream.destroy('Boum'); 209} 210 211Promise.all([ 212 toReadableBasicSupport(), 213 toReadableSyncIterator(), 214 toReadablePromises(), 215 toReadableString(), 216 toReadableBuffer(), 217 toReadableOnData(), 218 toReadableOnDataNonObject(), 219 destroysTheStreamWhenThrowing(), 220 asTransformStream(), 221 endWithError(), 222 destroyingStreamWithErrorThrowsInGenerator(), 223]).then(mustCall()); 224