• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const { mustCall } = require('../common');
4const { once } = require('events');
5const { Readable } = require('stream');
6const { strictEqual } = require('assert');
7
/**
 * Checks that Readable.from() accepts an async generator and that async
 * iteration over the resulting stream yields 'a', 'b', 'c' in that order.
 *
 * @returns {Promise<void>} Resolves when every expected chunk has been
 *   observed; rejects with an AssertionError on a mismatch or when the
 *   stream ends before all expected chunks were delivered.
 */
async function toReadableBasicSupport() {
  async function* generate() {
    yield 'a';
    yield 'b';
    yield 'c';
  }

  const stream = Readable.from(generate());

  const expected = ['a', 'b', 'c'];

  for await (const chunk of stream) {
    strictEqual(chunk, expected.shift());
  }

  // The loop only checks chunks that actually arrive; without this the test
  // would pass vacuously if the stream ended early or produced nothing.
  strictEqual(expected.length, 0);
}
23
/**
 * Checks that Readable.from() accepts a synchronous generator and that async
 * iteration over the resulting stream yields 'a', 'b', 'c' in that order.
 *
 * @returns {Promise<void>} Resolves when every expected chunk has been
 *   observed; rejects with an AssertionError on a mismatch or when the
 *   stream ends before all expected chunks were delivered.
 */
async function toReadableSyncIterator() {
  function* generate() {
    yield 'a';
    yield 'b';
    yield 'c';
  }

  const stream = Readable.from(generate());

  const expected = ['a', 'b', 'c'];

  for await (const chunk of stream) {
    strictEqual(chunk, expected.shift());
  }

  // Guard against a vacuous pass: every expected chunk must have been seen.
  strictEqual(expected.length, 0);
}
39
/**
 * Checks that Readable.from() accepts an array of promises and that the
 * stream delivers each promise's fulfilment value ('a', 'b', 'c'), in array
 * order, rather than the promise objects themselves.
 *
 * @returns {Promise<void>} Resolves when every expected chunk has been
 *   observed; rejects with an AssertionError on a mismatch or when the
 *   stream ends before all expected chunks were delivered.
 */
async function toReadablePromises() {
  const promises = [
    Promise.resolve('a'),
    Promise.resolve('b'),
    Promise.resolve('c'),
  ];

  const stream = Readable.from(promises);

  const expected = ['a', 'b', 'c'];

  for await (const chunk of stream) {
    strictEqual(chunk, expected.shift());
  }

  // Guard against a vacuous pass: every expected chunk must have been seen.
  strictEqual(expected.length, 0);
}
55
/**
 * Checks that Readable.from() given a string emits that string as a single
 * chunk ('abc') instead of iterating it character by character.
 *
 * @returns {Promise<void>} Resolves when the single expected chunk has been
 *   observed; rejects with an AssertionError on a mismatch or when the
 *   stream ends without delivering it.
 */
async function toReadableString() {
  const stream = Readable.from('abc');

  const expected = ['abc'];

  for await (const chunk of stream) {
    strictEqual(chunk, expected.shift());
  }

  // Guard against a vacuous pass: the chunk must actually have been emitted.
  strictEqual(expected.length, 0);
}
65
/**
 * Checks that Readable.from() given a Buffer emits it as a single chunk whose
 * string form is 'abc', instead of iterating it byte by byte.
 *
 * @returns {Promise<void>} Resolves when the single expected chunk has been
 *   observed; rejects with an AssertionError on a mismatch or when the
 *   stream ends without delivering it.
 */
async function toReadableBuffer() {
  const stream = Readable.from(Buffer.from('abc'));

  const expected = ['abc'];

  for await (const chunk of stream) {
    strictEqual(chunk.toString(), expected.shift());
  }

  // Guard against a vacuous pass: the chunk must actually have been emitted.
  strictEqual(expected.length, 0);
}
75
/**
 * Checks that a stream built by Readable.from() from an async generator
 * delivers 'a', 'b', 'c' through 'data' events, in order, and that exactly
 * three 'data' events have fired by the time 'end' is emitted.
 *
 * @returns {Promise<void>} Resolves after 'end' once the event count matches.
 */
async function toReadableOnData() {
  async function* generate() {
    for (const letter of ['a', 'b', 'c']) {
      yield letter;
    }
  }

  const wanted = ['a', 'b', 'c'];
  let seen = 0;

  const stream = Readable.from(generate());

  const onData = (chunk) => {
    // An index past the end reads as undefined, so surplus chunks still fail.
    const want = wanted[seen];
    seen += 1;
    strictEqual(chunk, want);
  };
  stream.on('data', onData);

  await once(stream, 'end');

  strictEqual(seen, 3);
}
97
/**
 * Same scenario as the 'data'-event test, but with `objectMode: false`
 * passed to Readable.from(): each 'data' chunk must be a Buffer whose string
 * form is 'a', 'b', 'c' in order, and exactly three events must fire before
 * 'end'.
 *
 * @returns {Promise<void>} Resolves after 'end' once the event count matches.
 */
async function toReadableOnDataNonObject() {
  async function* generate() {
    for (const letter of ['a', 'b', 'c']) {
      yield letter;
    }
  }

  const wanted = ['a', 'b', 'c'];
  let seen = 0;

  const options = { objectMode: false };
  const stream = Readable.from(generate(), options);

  const onData = (chunk) => {
    // An index past the end reads as undefined, so surplus chunks still fail.
    const want = wanted[seen];
    seen += 1;
    strictEqual(chunk instanceof Buffer, true);
    strictEqual(chunk.toString(), want);
  };
  stream.on('data', onData);

  await once(stream, 'end');

  strictEqual(seen, 3);
}
120
/**
 * Checks that when the async generator given to Readable.from() throws, the
 * stream emits 'error' with that exception and reports itself as destroyed.
 *
 * @returns {Promise<void>} Resolves once both properties have been verified.
 */
async function destroysTheStreamWhenThrowing() {
  const failing = (async function* () {
    throw new Error('kaboom');
  })();

  const stream = Readable.from(failing);

  // Nothing else in this test consumes the stream, so request a read
  // explicitly before waiting for the error.
  stream.read();

  const [failure] = await once(stream, 'error');

  strictEqual(failure.message, 'kaboom');
  strictEqual(stream.destroyed, true);
}
135
/**
 * Checks that an async generator which consumes one stream and yields
 * transformed values can be wrapped with Readable.from(), producing a stream
 * of the transformed chunks: 'a', 'b', 'c' in, 'A', 'B', 'C' out.
 *
 * @returns {Promise<void>} Resolves when every expected chunk has been
 *   observed; rejects with an AssertionError on a mismatch or when the
 *   stream ends before all expected chunks were delivered.
 */
async function asTransformStream() {
  async function* generate(stream) {
    for await (const chunk of stream) {
      yield chunk.toUpperCase();
    }
  }

  const source = new Readable({
    objectMode: true,
    read() {
      // Everything, including the end-of-stream marker, is pushed on the
      // first read() call.
      this.push('a');
      this.push('b');
      this.push('c');
      this.push(null);
    }
  });

  const stream = Readable.from(generate(source));

  const expected = ['A', 'B', 'C'];

  for await (const chunk of stream) {
    strictEqual(chunk, expected.shift());
  }

  // Guard against a vacuous pass: every expected chunk must have been seen.
  strictEqual(expected.length, 0);
}
161
/**
 * Checks that when the generator yields a rejected promise after two normal
 * values, async iteration delivers 1 and 2 and then throws the rejection
 * reason ('Boum') unchanged.
 *
 * @returns {Promise<void>} Resolves once the chunks and the thrown value have
 *   been verified.
 */
async function endWithError() {
  async function* generate() {
    for (const value of [1, 2]) {
      yield value;
    }
    yield Promise.reject('Boum');
  }

  const pending = [1, 2];
  const stream = Readable.from(generate());

  try {
    for await (const value of stream) {
      const want = pending.shift();
      strictEqual(value, want);
    }
    // Reaching this point means iteration finished without throwing; the
    // sentinel lands in the catch below and fails the 'Boum' comparison.
    throw new Error();
  } catch (reason) {
    strictEqual(pending.length, 0);
    strictEqual(reason, 'Boum');
  }
}
183
184
// Start every scenario (in the listed order) and wait for all of them.
// NOTE(review): mustCall() comes from ../common; presumably it fails the test
// if the fulfilment handler is never invoked — confirm against that helper.
const scenarios = [
  toReadableBasicSupport,
  toReadableSyncIterator,
  toReadablePromises,
  toReadableString,
  toReadableBuffer,
  toReadableOnData,
  toReadableOnDataNonObject,
  destroysTheStreamWhenThrowing,
  asTransformStream,
  endWithError,
];

Promise.all(scenarios.map((scenario) => scenario())).then(mustCall());
197