• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const { mustCall } = require('../common');
4const { once } = require('events');
5const { Readable } = require('stream');
6const { strictEqual, throws } = require('assert');
7const common = require('../common');
8
9{
10  throws(() => {
11    Readable.from(null);
12  }, /ERR_INVALID_ARG_TYPE/);
13}
14
15async function toReadableBasicSupport() {
16  async function* generate() {
17    yield 'a';
18    yield 'b';
19    yield 'c';
20  }
21
22  const stream = Readable.from(generate());
23
24  const expected = ['a', 'b', 'c'];
25
26  for await (const chunk of stream) {
27    strictEqual(chunk, expected.shift());
28  }
29}
30
31async function toReadableSyncIterator() {
32  function* generate() {
33    yield 'a';
34    yield 'b';
35    yield 'c';
36  }
37
38  const stream = Readable.from(generate());
39
40  const expected = ['a', 'b', 'c'];
41
42  for await (const chunk of stream) {
43    strictEqual(chunk, expected.shift());
44  }
45}
46
47async function toReadablePromises() {
48  const promises = [
49    Promise.resolve('a'),
50    Promise.resolve('b'),
51    Promise.resolve('c'),
52  ];
53
54  const stream = Readable.from(promises);
55
56  const expected = ['a', 'b', 'c'];
57
58  for await (const chunk of stream) {
59    strictEqual(chunk, expected.shift());
60  }
61}
62
63async function toReadableString() {
64  const stream = Readable.from('abc');
65
66  const expected = ['abc'];
67
68  for await (const chunk of stream) {
69    strictEqual(chunk, expected.shift());
70  }
71}
72
73async function toReadableBuffer() {
74  const stream = Readable.from(Buffer.from('abc'));
75
76  const expected = ['abc'];
77
78  for await (const chunk of stream) {
79    strictEqual(chunk.toString(), expected.shift());
80  }
81}
82
83async function toReadableOnData() {
84  async function* generate() {
85    yield 'a';
86    yield 'b';
87    yield 'c';
88  }
89
90  const stream = Readable.from(generate());
91
92  let iterations = 0;
93  const expected = ['a', 'b', 'c'];
94
95  stream.on('data', (chunk) => {
96    iterations++;
97    strictEqual(chunk, expected.shift());
98  });
99
100  await once(stream, 'end');
101
102  strictEqual(iterations, 3);
103}
104
105async function toReadableOnDataNonObject() {
106  async function* generate() {
107    yield 'a';
108    yield 'b';
109    yield 'c';
110  }
111
112  const stream = Readable.from(generate(), { objectMode: false });
113
114  let iterations = 0;
115  const expected = ['a', 'b', 'c'];
116
117  stream.on('data', (chunk) => {
118    iterations++;
119    strictEqual(chunk instanceof Buffer, true);
120    strictEqual(chunk.toString(), expected.shift());
121  });
122
123  await once(stream, 'end');
124
125  strictEqual(iterations, 3);
126}
127
128async function destroysTheStreamWhenThrowing() {
129  async function* generate() { // eslint-disable-line require-yield
130    throw new Error('kaboom');
131  }
132
133  const stream = Readable.from(generate());
134
135  stream.read();
136
137  const [err] = await once(stream, 'error');
138  strictEqual(err.message, 'kaboom');
139  strictEqual(stream.destroyed, true);
140
141}
142
143async function asTransformStream() {
144  async function* generate(stream) {
145    for await (const chunk of stream) {
146      yield chunk.toUpperCase();
147    }
148  }
149
150  const source = new Readable({
151    objectMode: true,
152    read() {
153      this.push('a');
154      this.push('b');
155      this.push('c');
156      this.push(null);
157    }
158  });
159
160  const stream = Readable.from(generate(source));
161
162  const expected = ['A', 'B', 'C'];
163
164  for await (const chunk of stream) {
165    strictEqual(chunk, expected.shift());
166  }
167}
168
169async function endWithError() {
170  async function* generate() {
171    yield 1;
172    yield 2;
173    yield Promise.reject('Boum');
174  }
175
176  const stream = Readable.from(generate());
177
178  const expected = [1, 2];
179
180  try {
181    for await (const chunk of stream) {
182      strictEqual(chunk, expected.shift());
183    }
184    throw new Error();
185  } catch (err) {
186    strictEqual(expected.length, 0);
187    strictEqual(err, 'Boum');
188  }
189}
190
191async function destroyingStreamWithErrorThrowsInGenerator() {
192  const validateError = common.mustCall((e) => {
193    strictEqual(e, 'Boum');
194  });
195  async function* generate() {
196    try {
197      yield 1;
198      yield 2;
199      yield 3;
200      throw new Error();
201    } catch (e) {
202      validateError(e);
203    }
204  }
205  const stream = Readable.from(generate());
206  stream.read();
207  stream.once('error', common.mustCall());
208  stream.destroy('Boum');
209}
210
211Promise.all([
212  toReadableBasicSupport(),
213  toReadableSyncIterator(),
214  toReadablePromises(),
215  toReadableString(),
216  toReadableBuffer(),
217  toReadableOnData(),
218  toReadableOnDataNonObject(),
219  destroysTheStreamWhenThrowing(),
220  asTransformStream(),
221  endWithError(),
222  destroyingStreamWithErrorThrowsInGenerator(),
223]).then(mustCall());
224