• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const common = require('../common');
4const { pipeline, Duplex, PassThrough, Writable } = require('stream');
5const assert = require('assert');
6
7process.on('uncaughtException', common.mustCall((err) => {
8  assert.strictEqual(err.message, 'no way');
9}, 2));
10
11// Ensure that listeners is removed if last stream is readable
12// And other stream's listeners unchanged
13const a = new PassThrough();
14a.end('foobar');
15const b = new Duplex({
16  write(chunk, encoding, callback) {
17    callback();
18  }
19});
20pipeline(a, b, common.mustCall((error) => {
21  if (error) {
22    assert.ifError(error);
23  }
24
25  assert(a.listenerCount('error') > 0);
26  assert.strictEqual(b.listenerCount('error'), 0);
27  setTimeout(() => {
28    assert.strictEqual(b.listenerCount('error'), 0);
29    b.destroy(new Error('no way'));
30  }, 100);
31}));
32
33// Async generators
34const c = new PassThrough();
35c.end('foobar');
36const d = pipeline(
37  c,
38  async function* (source) {
39    for await (const chunk of source) {
40      yield String(chunk).toUpperCase();
41    }
42  },
43  common.mustCall((error) => {
44    if (error) {
45      assert.ifError(error);
46    }
47
48    assert(c.listenerCount('error') > 0);
49    assert.strictEqual(d.listenerCount('error'), 0);
50    setTimeout(() => {
51      assert.strictEqual(b.listenerCount('error'), 0);
52      d.destroy(new Error('no way'));
53    }, 100);
54  })
55);
56
57// If last stream is not readable, will not throw and remove listeners
58const e = new PassThrough();
59e.end('foobar');
60const f = new Writable({
61  write(chunk, encoding, callback) {
62    callback();
63  }
64});
65pipeline(e, f, common.mustCall((error) => {
66  if (error) {
67    assert.ifError(error);
68  }
69
70  assert(e.listenerCount('error') > 0);
71  assert(f.listenerCount('error') > 0);
72  setTimeout(() => {
73    assert(f.listenerCount('error') > 0);
74    f.destroy(new Error('no way'));
75  }, 100);
76}));
77