// Code-viewer navigation residue (not part of the test): Home | Line# | Scopes# | Navigate# | Raw | Download
'use strict';

const common = require('../common');
const { Readable, Transform, PassThrough, pipeline } = require('stream');
const assert = require('assert');
6
/**
 * Runs the Readable async-iterator scenarios sequentially.
 *
 * Every scenario lives in its own block scope so that stream instances and
 * counters cannot leak from one scenario into the next. Most scenarios log a
 * label before running, which makes it easy to see which one was in progress
 * if the run stops.
 *
 * @returns {Promise<void>} Settles after the last scenario; rejects if an
 *   awaited assertion throws.
 */
async function tests() {
  {
    // The object returned by Readable's Symbol.asyncIterator must have the
    // same grandparent prototype as an async generator function's prototype.
    const AsyncIteratorPrototype = Object.getPrototypeOf(
      Object.getPrototypeOf(async function* () {}).prototype);
    const rs = new Readable({});
    assert.strictEqual(
      Object.getPrototypeOf(Object.getPrototypeOf(rs[Symbol.asyncIterator]())),
      AsyncIteratorPrototype);
  }

  {
    // An iterator advanced once by hand can then be consumed with for await.
    const readable = new Readable({ objectMode: true, read() {} });
    readable.push(0);
    readable.push(1);
    readable.push(null);

    const iter = readable[Symbol.asyncIterator]();
    assert.strictEqual((await iter.next()).value, 0);
    for await (const d of iter) {
      assert.strictEqual(d, 1);
    }
  }

  {
    console.log('read without for..await');
    const max = 5;
    const readable = new Readable({
      objectMode: true,
      read() {}
    });

    const iter = readable[Symbol.asyncIterator]();
    assert.strictEqual(iter.stream, readable);

    // Queue all next() calls before any data has been pushed.
    const values = [];
    for (let i = 0; i < max; i++) {
      values.push(iter.next());
    }
    // Keep a handle on the verification chain so it can be awaited below
    // instead of being left floating.
    const verified = Promise.all(values).then(common.mustCall((results) => {
      results.forEach(common.mustCall(
        (item, i) => assert.strictEqual(item.value, `hello-${i}`), max));
    }));

    for (let i = 0; i < max; i++) {
      readable.push(`hello-${i}`);
    }
    readable.push(null);

    const last = await iter.next();
    assert.strictEqual(last.done, true);
    await verified;
  }

  {
    console.log('read without for..await deferred');
    const readable = new Readable({
      objectMode: true,
      read() {}
    });

    const iter = readable[Symbol.asyncIterator]();
    assert.strictEqual(iter.stream, readable);

    // First batch: three queued reads, then three pushes.
    let values = [];
    for (let i = 0; i < 3; i++) {
      values.push(iter.next());
    }

    readable.push('hello-0');
    readable.push('hello-1');
    readable.push('hello-2');

    // k runs across both batches so the second batch continues at hello-3.
    let k = 0;
    const results1 = await Promise.all(values);
    results1.forEach(common.mustCall(
      (item) => assert.strictEqual(item.value, `hello-${k++}`), 3));

    // Second batch: two queued reads, two pushes, then end of stream.
    values = [];
    for (let i = 0; i < 2; i++) {
      values.push(iter.next());
    }

    readable.push('hello-3');
    readable.push('hello-4');
    readable.push(null);

    const results2 = await Promise.all(values);
    results2.forEach(common.mustCall(
      (item) => assert.strictEqual(item.value, `hello-${k++}`), 2));

    const last = await iter.next();
    assert.strictEqual(last.done, true);
  }

  {
    console.log('read without for..await with errors');
    const max = 3;
    const readable = new Readable({
      objectMode: true,
      read() {}
    });

    const iter = readable[Symbol.asyncIterator]();
    assert.strictEqual(iter.stream, readable);

    // The first `max` reads are satisfied by pushed data; the two extra
    // reads are still pending when the stream is destroyed with an error.
    const values = [];
    const errors = [];
    for (let i = 0; i < max; i++) {
      values.push(iter.next());
    }
    for (let i = 0; i < 2; i++) {
      errors.push(iter.next());
    }

    for (let i = 0; i < max; i++) {
      readable.push(`hello-${i}`);
    }

    const resolved = await Promise.all(values);

    resolved.forEach(common.mustCall(
      (item, i) => assert.strictEqual(item.value, `hello-${i}`), max));

    // Attach the rejection handlers before destroying, then wait for them so
    // this scenario is fully finished before the next one starts.
    const rejections = errors.map((promise) =>
      promise.catch(common.mustCall((err) => {
        assert.strictEqual(err.message, 'kaboom');
      })));

    readable.destroy(new Error('kaboom'));
    await Promise.all(rejections);
  }

  {
    console.log('call next() after error');
    const readable = new Readable({
      read() {}
    });
    const iterator = readable[Symbol.asyncIterator]();

    // Destroy with the very error object that the rejection is checked
    // against, rather than a second look-alike instance.
    const err = new Error('kaboom');
    readable.destroy(err);
    await assert.rejects(iterator.next.bind(iterator), err);
  }

  {
    console.log('read object mode');
    const max = 42;
    let pushCount = 0;
    let received = 0;
    const readable = new Readable({
      objectMode: true,
      read() {
        this.push('hello');
        if (++pushCount === max) {
          this.push(null);
        }
      }
    });

    for await (const k of readable) {
      received++;
      assert.strictEqual(k, 'hello');
    }

    assert.strictEqual(pushCount, received);
  }

  {
    console.log('destroy sync');
    const readable = new Readable({
      objectMode: true,
      read() {
        this.destroy(new Error('kaboom from read'));
      }
    });

    let err;
    try {
      // eslint-disable-next-line no-unused-vars
      for await (const k of readable) {}
    } catch (e) {
      err = e;
    }
    assert.strictEqual(err.message, 'kaboom from read');
  }

  {
    console.log('destroy async');
    const readable = new Readable({
      objectMode: true,
      read() {
        // Push exactly one chunk, then destroy on a later tick.
        if (!this.pushed) {
          this.push('hello');
          this.pushed = true;

          setImmediate(() => {
            this.destroy(new Error('kaboom'));
          });
        }
      }
    });

    let received = 0;

    let err = null;
    try {
      // eslint-disable-next-line no-unused-vars
      for await (const k of readable) {
        received++;
      }
    } catch (e) {
      err = e;
    }

    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(received, 1);
  }

  {
    console.log('destroyed by throw');
    const readable = new Readable({
      objectMode: true,
      read() {
        this.push('hello');
      }
    });

    let err = null;
    try {
      for await (const k of readable) {
        assert.strictEqual(k, 'hello');
        throw new Error('kaboom');
      }
    } catch (e) {
      err = e;
    }

    // Throwing out of the loop body must leave the stream destroyed.
    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(readable.destroyed, true);
  }

  {
    console.log('destroyed sync after push');
    const readable = new Readable({
      objectMode: true,
      read() {
        this.push('hello');
        this.destroy(new Error('kaboom'));
      }
    });

    let received = 0;

    let err = null;
    try {
      for await (const k of readable) {
        assert.strictEqual(k, 'hello');
        received++;
      }
    } catch (e) {
      err = e;
    }

    assert.strictEqual(err.message, 'kaboom');
    assert.strictEqual(received, 1);
  }

  {
    console.log('push async');
    const max = 42;
    let pushCount = 0;
    let received = 0;
    const readable = new Readable({
      objectMode: true,
      read() {
        setImmediate(() => {
          this.push('hello');
          if (++pushCount === max) {
            this.push(null);
          }
        });
      }
    });

    for await (const k of readable) {
      received++;
      assert.strictEqual(k, 'hello');
    }

    assert.strictEqual(pushCount, received);
  }

  {
    console.log('push binary async');
    const max = 42;
    let pushCount = 0;
    const readable = new Readable({
      read() {
        setImmediate(() => {
          this.push('hello');
          if (++pushCount === max) {
            this.push(null);
          }
        });
      }
    });

    // Collect whatever a 'data' listener sees and compare it with what the
    // async iterator yields.
    let expected = '';
    readable.setEncoding('utf8');
    readable.pause();
    readable.on('data', (chunk) => {
      expected += chunk;
    });

    let data = '';
    for await (const k of readable) {
      data += k;
    }

    assert.strictEqual(data, expected);
  }

  {
    console.log('.next() on destroyed stream');
    const readable = new Readable({
      read() {
        // no-op
      }
    });

    readable.destroy();

    const { done } = await readable[Symbol.asyncIterator]().next();
    assert.strictEqual(done, true);
  }

  {
    console.log('.next() on pipelined stream');
    const readable = new Readable({
      read() {
        // no-op
      }
    });

    const passthrough = new PassThrough();
    const err = new Error('kaboom');
    pipeline(readable, passthrough, common.mustCall((e) => {
      assert.strictEqual(e, err);
    }));
    readable.destroy(err);
    await assert.rejects(
      readable[Symbol.asyncIterator]().next(),
      (e) => {
        assert.strictEqual(e, err);
        return true;
      }
    );
  }

  {
    console.log('iterating on an ended stream completes');
    const r = new Readable({
      objectMode: true,
      read() {
        this.push('asdf');
        this.push('hehe');
        this.push(null);
      }
    });
    // eslint-disable-next-line no-unused-vars
    for await (const a of r) {
    }
    // A second pass over the already-ended stream must also terminate.
    // eslint-disable-next-line no-unused-vars
    for await (const b of r) {
    }
  }

  {
    console.log('destroy mid-stream does not error');
    const r = new Readable({
      objectMode: true,
      read() {
        this.push('asdf');
        this.push('hehe');
      }
    });

    // eslint-disable-next-line no-unused-vars
    for await (const a of r) {
      r.destroy(null);
    }
  }

  {
    console.log('readable side of a transform stream pushes null');
    const transform = new Transform({
      objectMode: true,
      transform: (chunk, enc, cb) => { cb(null, chunk); }
    });
    transform.push(0);
    transform.push(1);
    process.nextTick(() => {
      transform.push(null);
    });

    // One marker for the loop body and one for the code after the loop.
    const mustReach = [ common.mustCall(), common.mustCall() ];

    const iter = transform[Symbol.asyncIterator]();
    assert.strictEqual((await iter.next()).value, 0);

    for await (const d of iter) {
      assert.strictEqual(d, 1);
      mustReach[0]();
    }
    mustReach[1]();
  }

  {
    console.log('all next promises must be resolved on end');
    const r = new Readable({
      objectMode: true,
      read() {
      }
    });

    const b = r[Symbol.asyncIterator]();
    const c = b.next();
    const d = b.next();
    r.push(null);
    assert.deepStrictEqual(await c, { done: true, value: undefined });
    assert.deepStrictEqual(await d, { done: true, value: undefined });
  }

  {
    console.log('all next promises must be resolved on destroy');
    const r = new Readable({
      objectMode: true,
      read() {
      }
    });

    const b = r[Symbol.asyncIterator]();
    const c = b.next();
    const d = b.next();
    r.destroy();
    assert.deepStrictEqual(await c, { done: true, value: undefined });
    assert.deepStrictEqual(await d, { done: true, value: undefined });
  }

  {
    console.log('all next promises must be resolved on destroy with error');
    const r = new Readable({
      objectMode: true,
      read() {
      }
    });

    const b = r[Symbol.asyncIterator]();
    const c = b.next();
    const d = b.next();
    const err = new Error('kaboom');
    r.destroy(err);

    // Both pending reads are awaited concurrently; each must reject with the
    // exact error passed to destroy().
    await Promise.all([(async () => {
      let e;
      try {
        await c;
      } catch (_e) {
        e = _e;
      }
      assert.strictEqual(e, err);
    })(), (async () => {
      let e;
      try {
        await d;
      } catch (_e) {
        e = _e;
      }
      assert.strictEqual(e, err);
    })()]);
  }
}
488
{
  // AsyncIterator return should end even when destroy
  // does not implement the callback API.

  const r = new Readable({
    objectMode: true,
    read() {
    }
  });

  // Replace destroy() with a wrapper that forwards only the error argument,
  // i.e. one that drops any callback a caller might pass.
  const originalDestroy = r.destroy;
  r.destroy = (err) => {
    originalDestroy.call(r, err);
  };
  const it = r[Symbol.asyncIterator]();
  const p = it.return();
  r.push(null);
  p.then(common.mustCall());
}
508
509
{
  // AsyncIterator return should not error with
  // premature close.

  const r = new Readable({
    objectMode: true,
    read() {
    }
  });

  // Replace destroy() with a wrapper that forwards only the error argument.
  const originalDestroy = r.destroy;
  r.destroy = (err) => {
    originalDestroy.call(r, err);
  };
  const it = r[Symbol.asyncIterator]();
  const p = it.return();
  // Emit 'close' by hand, without the stream having ended.
  r.emit('close');
  p.then(common.mustCall()).catch(common.mustNotCall());
}
529
{
  // AsyncIterator should finish correctly if destroyed.

  const r = new Readable({
    objectMode: true,
    read() {
    }
  });

  r.destroy();
  // Wrapped in mustCall so the test fails if 'close' is never emitted;
  // otherwise the checks inside the listener would be skipped silently.
  r.on('close', common.mustCall(() => {
    const it = r[Symbol.asyncIterator]();
    const next = it.next();
    next
      .then(common.mustCall(({ done }) => assert.strictEqual(done, true)))
      .catch(common.mustNotCall());
  }));
}
548
// To avoid missing some tests if a promise does not resolve:
// mustCall fails the run unless the promise returned by tests() fulfills.
tests().then(common.mustCall());
551