• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const common = require('../common');
4const {
5  Stream,
6  Readable,
7  Transform,
8  PassThrough,
9  pipeline
10} = require('stream');
11const assert = require('assert');
12const http = require('http');
13
14async function tests() {
15  {
16    // v1 stream
17
18    const stream = new Stream();
19    stream.destroy = common.mustCall();
20    process.nextTick(() => {
21      stream.emit('data', 'hello');
22      stream.emit('data', 'world');
23      stream.emit('end');
24    });
25
26    let res = '';
27    stream[Symbol.asyncIterator] = Readable.prototype[Symbol.asyncIterator];
28    for await (const d of stream) {
29      res += d;
30    }
31    assert.strictEqual(res, 'helloworld');
32  }
33
34  {
35    // v1 stream error
36
37    const stream = new Stream();
38    stream.close = common.mustCall();
39    process.nextTick(() => {
40      stream.emit('data', 0);
41      stream.emit('data', 1);
42      stream.emit('error', new Error('asd'));
43    });
44
45    const iter = Readable.prototype[Symbol.asyncIterator].call(stream);
46    await iter.next()
47      .then(common.mustNotCall())
48      .catch(common.mustCall((err) => {
49        assert.strictEqual(err.message, 'asd');
50      }));
51  }
52
53  {
54    // Non standard stream cleanup
55
56    const readable = new Readable({ autoDestroy: false, read() {} });
57    readable.push('asd');
58    readable.push('asd');
59    readable.destroy = null;
60    readable.close = common.mustCall(() => {
61      readable.emit('close');
62    });
63
64    await (async () => {
65      for await (const d of readable) { // eslint-disable-line no-unused-vars
66        return;
67      }
68    })();
69  }
70
71  {
72    const readable = new Readable({ objectMode: true, read() {} });
73    readable.push(0);
74    readable.push(1);
75    readable.push(null);
76
77    const iter = readable[Symbol.asyncIterator]();
78    assert.strictEqual((await iter.next()).value, 0);
79    for await (const d of iter) {
80      assert.strictEqual(d, 1);
81    }
82  }
83
84  {
85    console.log('read without for..await');
86    const max = 5;
87    const readable = new Readable({
88      objectMode: true,
89      read() {}
90    });
91
92    const iter = readable[Symbol.asyncIterator]();
93    assert.strictEqual(iter.stream, readable);
94    const values = [];
95    for (let i = 0; i < max; i++) {
96      values.push(iter.next());
97    }
98    Promise.all(values).then(common.mustCall((values) => {
99      values.forEach(common.mustCall(
100        (item, i) => assert.strictEqual(item.value, 'hello-' + i), 5));
101    }));
102
103    readable.push('hello-0');
104    readable.push('hello-1');
105    readable.push('hello-2');
106    readable.push('hello-3');
107    readable.push('hello-4');
108    readable.push(null);
109
110    const last = await iter.next();
111    assert.strictEqual(last.done, true);
112  }
113
114  {
115    console.log('read without for..await deferred');
116    const readable = new Readable({
117      objectMode: true,
118      read() {}
119    });
120
121    const iter = readable[Symbol.asyncIterator]();
122    assert.strictEqual(iter.stream, readable);
123    let values = [];
124    for (let i = 0; i < 3; i++) {
125      values.push(iter.next());
126    }
127
128    readable.push('hello-0');
129    readable.push('hello-1');
130    readable.push('hello-2');
131
132    let k = 0;
133    const results1 = await Promise.all(values);
134    results1.forEach(common.mustCall(
135      (item) => assert.strictEqual(item.value, 'hello-' + k++), 3));
136
137    values = [];
138    for (let i = 0; i < 2; i++) {
139      values.push(iter.next());
140    }
141
142    readable.push('hello-3');
143    readable.push('hello-4');
144    readable.push(null);
145
146    const results2 = await Promise.all(values);
147    results2.forEach(common.mustCall(
148      (item) => assert.strictEqual(item.value, 'hello-' + k++), 2));
149
150    const last = await iter.next();
151    assert.strictEqual(last.done, true);
152  }
153
154  {
155    console.log('read without for..await with errors');
156    const max = 3;
157    const readable = new Readable({
158      objectMode: true,
159      read() {}
160    });
161
162    const iter = readable[Symbol.asyncIterator]();
163    assert.strictEqual(iter.stream, readable);
164    const values = [];
165    const errors = [];
166    let i;
167    for (i = 0; i < max; i++) {
168      values.push(iter.next());
169    }
170    for (i = 0; i < 2; i++) {
171      errors.push(iter.next());
172    }
173
174    readable.push('hello-0');
175    readable.push('hello-1');
176    readable.push('hello-2');
177
178    const resolved = await Promise.all(values);
179
180    resolved.forEach(common.mustCall(
181      (item, i) => assert.strictEqual(item.value, 'hello-' + i), max));
182
183    errors.slice(0, 1).forEach((promise) => {
184      promise.catch(common.mustCall((err) => {
185        assert.strictEqual(err.message, 'kaboom');
186      }));
187    });
188
189    errors.slice(1).forEach((promise) => {
190      promise.then(common.mustCall(({ done, value }) => {
191        assert.strictEqual(done, true);
192        assert.strictEqual(value, undefined);
193      }));
194    });
195
196    readable.destroy(new Error('kaboom'));
197  }
198
199  {
200    console.log('call next() after error');
201    const readable = new Readable({
202      read() {}
203    });
204    const iterator = readable[Symbol.asyncIterator]();
205
206    const err = new Error('kaboom');
207    readable.destroy(new Error('kaboom'));
208    await assert.rejects(iterator.next.bind(iterator), err);
209  }
210
211  {
212    console.log('read object mode');
213    const max = 42;
214    let readed = 0;
215    let received = 0;
216    const readable = new Readable({
217      objectMode: true,
218      read() {
219        this.push('hello');
220        if (++readed === max) {
221          this.push(null);
222        }
223      }
224    });
225
226    for await (const k of readable) {
227      received++;
228      assert.strictEqual(k, 'hello');
229    }
230
231    assert.strictEqual(readed, received);
232  }
233
234  {
235    console.log('destroy sync');
236    const readable = new Readable({
237      objectMode: true,
238      read() {
239        this.destroy(new Error('kaboom from read'));
240      }
241    });
242
243    let err;
244    try {
245      // eslint-disable-next-line no-unused-vars
246      for await (const k of readable) {}
247    } catch (e) {
248      err = e;
249    }
250    assert.strictEqual(err.message, 'kaboom from read');
251  }
252
253  {
254    console.log('destroy async');
255    const readable = new Readable({
256      objectMode: true,
257      read() {
258        if (!this.pushed) {
259          this.push('hello');
260          this.pushed = true;
261
262          setImmediate(() => {
263            this.destroy(new Error('kaboom'));
264          });
265        }
266      }
267    });
268
269    let received = 0;
270
271    let err = null;
272    try {
273      // eslint-disable-next-line no-unused-vars
274      for await (const k of readable) {
275        received++;
276      }
277    } catch (e) {
278      err = e;
279    }
280
281    assert.strictEqual(err.message, 'kaboom');
282    assert.strictEqual(received, 1);
283  }
284
285  {
286    console.log('destroyed by throw');
287    const readable = new Readable({
288      objectMode: true,
289      read() {
290        this.push('hello');
291      }
292    });
293
294    let err = null;
295    try {
296      for await (const k of readable) {
297        assert.strictEqual(k, 'hello');
298        throw new Error('kaboom');
299      }
300    } catch (e) {
301      err = e;
302    }
303
304    assert.strictEqual(err.message, 'kaboom');
305    assert.strictEqual(readable.destroyed, true);
306  }
307
308  {
309    console.log('destroyed sync after push');
310    const readable = new Readable({
311      objectMode: true,
312      read() {
313        this.push('hello');
314        this.destroy(new Error('kaboom'));
315      }
316    });
317
318    let received = 0;
319
320    let err = null;
321    try {
322      for await (const k of readable) {
323        assert.strictEqual(k, 'hello');
324        received++;
325      }
326    } catch (e) {
327      err = e;
328    }
329
330    assert.strictEqual(err.message, 'kaboom');
331    assert.strictEqual(received, 1);
332  }
333
334  {
335    console.log('destroyed will not deadlock');
336    const readable = new Readable();
337    readable.destroy();
338    process.nextTick(async () => {
339      readable.on('close', common.mustNotCall());
340      let received = 0;
341      for await (const k of readable) {
342        // Just make linting pass. This should never run.
343        assert.strictEqual(k, 'hello');
344        received++;
345      }
346      assert.strictEqual(received, 0);
347    });
348  }
349
350  {
351    console.log('push async');
352    const max = 42;
353    let readed = 0;
354    let received = 0;
355    const readable = new Readable({
356      objectMode: true,
357      read() {
358        setImmediate(() => {
359          this.push('hello');
360          if (++readed === max) {
361            this.push(null);
362          }
363        });
364      }
365    });
366
367    for await (const k of readable) {
368      received++;
369      assert.strictEqual(k, 'hello');
370    }
371
372    assert.strictEqual(readed, received);
373  }
374
375  {
376    console.log('push binary async');
377    const max = 42;
378    let readed = 0;
379    const readable = new Readable({
380      read() {
381        setImmediate(() => {
382          this.push('hello');
383          if (++readed === max) {
384            this.push(null);
385          }
386        });
387      }
388    });
389
390    let expected = '';
391    readable.setEncoding('utf8');
392    readable.pause();
393    readable.on('data', (chunk) => {
394      expected += chunk;
395    });
396
397    let data = '';
398    for await (const k of readable) {
399      data += k;
400    }
401
402    assert.strictEqual(data, expected);
403  }
404
405  {
406    console.log('.next() on destroyed stream');
407    const readable = new Readable({
408      read() {
409        // no-op
410      }
411    });
412
413    readable.destroy();
414
415    const { done } = await readable[Symbol.asyncIterator]().next();
416    assert.strictEqual(done, true);
417  }
418
419  {
420    console.log('.next() on pipelined stream');
421    const readable = new Readable({
422      read() {
423        // no-op
424      }
425    });
426
427    const passthrough = new PassThrough();
428    const err = new Error('kaboom');
429    pipeline(readable, passthrough, common.mustCall((e) => {
430      assert.strictEqual(e, err);
431    }));
432    readable.destroy(err);
433    await assert.rejects(
434      readable[Symbol.asyncIterator]().next(),
435      (e) => {
436        assert.strictEqual(e, err);
437        return true;
438      }
439    );
440  }
441
442  {
443    console.log('iterating on an ended stream completes');
444    const r = new Readable({
445      objectMode: true,
446      read() {
447        this.push('asdf');
448        this.push('hehe');
449        this.push(null);
450      }
451    });
452    // eslint-disable-next-line no-unused-vars
453    for await (const a of r) {
454    }
455    // eslint-disable-next-line no-unused-vars
456    for await (const b of r) {
457    }
458  }
459
460  {
461    console.log('destroy mid-stream does not error');
462    const r = new Readable({
463      objectMode: true,
464      read() {
465        this.push('asdf');
466        this.push('hehe');
467      }
468    });
469
470    // eslint-disable-next-line no-unused-vars
471    for await (const a of r) {
472      r.destroy(null);
473    }
474  }
475
476  {
477    console.log('readable side of a transform stream pushes null');
478    const transform = new Transform({
479      objectMode: true,
480      transform: (chunk, enc, cb) => { cb(null, chunk); }
481    });
482    transform.push(0);
483    transform.push(1);
484    process.nextTick(() => {
485      transform.push(null);
486    });
487
488    const mustReach = [ common.mustCall(), common.mustCall() ];
489
490    const iter = transform[Symbol.asyncIterator]();
491    assert.strictEqual((await iter.next()).value, 0);
492
493    for await (const d of iter) {
494      assert.strictEqual(d, 1);
495      mustReach[0]();
496    }
497    mustReach[1]();
498  }
499
500  {
501    console.log('all next promises must be resolved on end');
502    const r = new Readable({
503      objectMode: true,
504      read() {
505      }
506    });
507
508    const b = r[Symbol.asyncIterator]();
509    const c = b.next();
510    const d = b.next();
511    r.push(null);
512    assert.deepStrictEqual(await c, { done: true, value: undefined });
513    assert.deepStrictEqual(await d, { done: true, value: undefined });
514  }
515
516  {
517    console.log('all next promises must be resolved on destroy');
518    const r = new Readable({
519      objectMode: true,
520      read() {
521      }
522    });
523
524    const b = r[Symbol.asyncIterator]();
525    const c = b.next();
526    const d = b.next();
527    r.destroy();
528    assert.deepStrictEqual(await c, { done: true, value: undefined });
529    assert.deepStrictEqual(await d, { done: true, value: undefined });
530  }
531
532  {
533    console.log('all next promises must be resolved on destroy with error');
534    const r = new Readable({
535      objectMode: true,
536      read() {
537      }
538    });
539
540    const b = r[Symbol.asyncIterator]();
541    const c = b.next();
542    const d = b.next();
543    const err = new Error('kaboom');
544    r.destroy(err);
545
546    await Promise.all([(async () => {
547      let e;
548      try {
549        await c;
550      } catch (_e) {
551        e = _e;
552      }
553      assert.strictEqual(e, err);
554    })(), (async () => {
555      let e;
556      let x;
557      try {
558        x = await d;
559      } catch (_e) {
560        e = _e;
561      }
562      assert.strictEqual(e, undefined);
563      assert.strictEqual(x.done, true);
564      assert.strictEqual(x.value, undefined);
565    })()]);
566  }
567
568  {
569    const _err = new Error('asd');
570    const r = new Readable({
571      read() {
572      },
573      destroy(err, callback) {
574        setTimeout(() => callback(_err), 1);
575      }
576    });
577
578    r.destroy();
579    const it = r[Symbol.asyncIterator]();
580    it.next().catch(common.mustCall((err) => {
581      assert.strictEqual(err, _err);
582    }));
583  }
584
585  {
586    // Don't destroy if no auto destroy.
587    // https://github.com/nodejs/node/issues/35116
588
589    const r = new Readable({
590      autoDestroy: false,
591      read() {
592        this.push('asd');
593        this.push(null);
594      }
595    });
596
597    for await (const chunk of r) {} // eslint-disable-line no-unused-vars
598    assert.strictEqual(r.destroyed, false);
599  }
600
601  {
602    // Destroy if no auto destroy and premature break.
603    // https://github.com/nodejs/node/pull/35122/files#r485678318
604
605    const r = new Readable({
606      autoDestroy: false,
607      read() {
608        this.push('asd');
609      }
610    });
611
612    for await (const chunk of r) { // eslint-disable-line no-unused-vars
613      break;
614    }
615    assert.strictEqual(r.destroyed, true);
616  }
617
618  {
619    // Don't destroy before 'end'.
620
621    const r = new Readable({
622      read() {
623        this.push('asd');
624        this.push(null);
625      }
626    }).on('end', () => {
627      assert.strictEqual(r.destroyed, false);
628    });
629
630    for await (const chunk of r) {} // eslint-disable-line no-unused-vars
631
632    assert.strictEqual(r.destroyed, true);
633  }
634}
635
636{
637  // AsyncIterator return should end even when destroy
638  // does not implement the callback API.
639
640  const r = new Readable({
641    objectMode: true,
642    read() {
643    }
644  });
645
646  const originalDestroy = r.destroy;
647  r.destroy = (err) => {
648    originalDestroy.call(r, err);
649  };
650  const it = r[Symbol.asyncIterator]();
651  const p = it.return();
652  r.push(null);
653  p.then(common.mustCall());
654}
655
656
657{
658  // AsyncIterator return should not error with
659  // premature close.
660
661  const r = new Readable({
662    objectMode: true,
663    read() {
664    }
665  });
666
667  const originalDestroy = r.destroy;
668  r.destroy = (err) => {
669    originalDestroy.call(r, err);
670  };
671  const it = r[Symbol.asyncIterator]();
672  const p = it.return();
673  r.emit('close');
674  p.then(common.mustCall()).catch(common.mustNotCall());
675}
676
677{
678  // AsyncIterator should finish correctly if destroyed.
679
680  const r = new Readable({
681    objectMode: true,
682    read() {
683    }
684  });
685
686  r.destroy();
687  r.on('close', () => {
688    const it = r[Symbol.asyncIterator]();
689    const next = it.next();
690    next
691      .then(common.mustCall(({ done }) => assert.strictEqual(done, true)))
692      .catch(common.mustNotCall());
693  });
694}
695
696{
697  let _req;
698  const server = http.createServer((request, response) => {
699    response.statusCode = 404;
700    response.write('never ends');
701  });
702
703  server.listen(() => {
704    _req = http.request(`http://localhost:${server.address().port}`)
705      .on('response', common.mustCall(async (res) => {
706        setTimeout(() => {
707          _req.destroy(new Error('something happened'));
708        }, 100);
709
710        res.on('aborted', () => {
711          const err = new Error();
712          err.code = 'ECONNRESET';
713          res.emit('error', err);
714        });
715
716        res.on('error', common.mustCall());
717
718        let _err;
719        try {
720          // eslint-disable-next-line no-unused-vars
721          for await (const chunk of res) {}
722        } catch (err) {
723          _err = err;
724        }
725
726        assert.strictEqual(_err.code, 'ECONNRESET');
727        server.close();
728      }))
729      .on('error', common.mustCall())
730      .end();
731  });
732}
733
734{
735  async function getParsedBody(request) {
736    let body = '';
737
738    for await (const data of request) {
739      body += data;
740    }
741
742    try {
743      return JSON.parse(body);
744    } catch {
745      return {};
746    }
747  }
748
749  const str = JSON.stringify({ asd: true });
750  const server = http.createServer(async (request, response) => {
751    const body = await getParsedBody(request);
752    response.statusCode = 200;
753    assert.strictEqual(JSON.stringify(body), str);
754    response.end(JSON.stringify(body));
755  }).listen(() => {
756    http
757      .request({
758        method: 'POST',
759        hostname: 'localhost',
760        port: server.address().port,
761      })
762      .end(str)
763      .on('response', async (res) => {
764        let body = '';
765        for await (const chunk of res) {
766          body += chunk;
767        }
768        assert.strictEqual(body, str);
769        server.close();
770      });
771  });
772}
773
774// To avoid missing some tests if a promise does not resolve
775tests().then(common.mustCall());
776