• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2const common = require('../common');
3const assert = require('assert');
4
5const { Readable, Writable } = require('stream');
6
7const source = Readable({ read: () => {} });
8const dest1 = Writable({ write: () => {} });
9const dest2 = Writable({ write: () => {} });
10
11source.pipe(dest1);
12source.pipe(dest2);
13
14dest1.on('unpipe', common.mustCall());
15dest2.on('unpipe', common.mustCall());
16
17assert.strictEqual(source._readableState.pipes[0], dest1);
18assert.strictEqual(source._readableState.pipes[1], dest2);
19assert.strictEqual(source._readableState.pipes.length, 2);
20
21// Should be able to unpipe them in the reverse order that they were piped.
22
23source.unpipe(dest2);
24
25assert.deepStrictEqual(source._readableState.pipes, [dest1]);
26assert.notStrictEqual(source._readableState.pipes, dest2);
27
28dest2.on('unpipe', common.mustNotCall());
29source.unpipe(dest2);
30
31source.unpipe(dest1);
32
33assert.strictEqual(source._readableState.pipes.length, 0);
34
35{
36  // Test `cleanup()` if we unpipe all streams.
37  const source = Readable({ read: () => {} });
38  const dest1 = Writable({ write: () => {} });
39  const dest2 = Writable({ write: () => {} });
40
41  let destCount = 0;
42  const srcCheckEventNames = ['end', 'data'];
43  const destCheckEventNames = ['close', 'finish', 'drain', 'error', 'unpipe'];
44
45  const checkSrcCleanup = common.mustCall(() => {
46    assert.strictEqual(source._readableState.pipes.length, 0);
47    assert.strictEqual(source._readableState.flowing, false);
48
49    srcCheckEventNames.forEach((eventName) => {
50      assert.strictEqual(
51        source.listenerCount(eventName), 0,
52        `source's '${eventName}' event listeners not removed`
53      );
54    });
55  });
56
57  function checkDestCleanup(dest) {
58    const currentDestId = ++destCount;
59    source.pipe(dest);
60
61    const unpipeChecker = common.mustCall(() => {
62      assert.deepStrictEqual(
63        dest.listeners('unpipe'), [unpipeChecker],
64        `destination{${currentDestId}} should have a 'unpipe' event ` +
65        'listener which is `unpipeChecker`'
66      );
67      dest.removeListener('unpipe', unpipeChecker);
68      destCheckEventNames.forEach((eventName) => {
69        assert.strictEqual(
70          dest.listenerCount(eventName), 0,
71          `destination{${currentDestId}}'s '${eventName}' event ` +
72          'listeners not removed'
73        );
74      });
75
76      if (--destCount === 0)
77        checkSrcCleanup();
78    });
79
80    dest.on('unpipe', unpipeChecker);
81  }
82
83  checkDestCleanup(dest1);
84  checkDestCleanup(dest2);
85  source.unpipe();
86}
87
88{
89  const src = Readable({ read: () => {} });
90  const dst = Writable({ write: () => {} });
91  src.pipe(dst);
92  src.on('resume', common.mustCall(() => {
93    src.on('pause', common.mustCall());
94    src.unpipe(dst);
95  }));
96}
97