1'use strict'; 2const common = require('../common'); 3const assert = require('assert'); 4 5const { Readable, Writable } = require('stream'); 6 7const source = Readable({ read: () => {} }); 8const dest1 = Writable({ write: () => {} }); 9const dest2 = Writable({ write: () => {} }); 10 11source.pipe(dest1); 12source.pipe(dest2); 13 14dest1.on('unpipe', common.mustCall()); 15dest2.on('unpipe', common.mustCall()); 16 17assert.strictEqual(source._readableState.pipes[0], dest1); 18assert.strictEqual(source._readableState.pipes[1], dest2); 19assert.strictEqual(source._readableState.pipes.length, 2); 20 21// Should be able to unpipe them in the reverse order that they were piped. 22 23source.unpipe(dest2); 24 25assert.deepStrictEqual(source._readableState.pipes, [dest1]); 26assert.notStrictEqual(source._readableState.pipes, dest2); 27 28dest2.on('unpipe', common.mustNotCall()); 29source.unpipe(dest2); 30 31source.unpipe(dest1); 32 33assert.strictEqual(source._readableState.pipes.length, 0); 34 35{ 36 // Test `cleanup()` if we unpipe all streams. 37 const source = Readable({ read: () => {} }); 38 const dest1 = Writable({ write: () => {} }); 39 const dest2 = Writable({ write: () => {} }); 40 41 let destCount = 0; 42 const srcCheckEventNames = ['end', 'data']; 43 const destCheckEventNames = ['close', 'finish', 'drain', 'error', 'unpipe']; 44 45 const checkSrcCleanup = common.mustCall(() => { 46 assert.strictEqual(source._readableState.pipes.length, 0); 47 assert.strictEqual(source._readableState.flowing, false); 48 49 srcCheckEventNames.forEach((eventName) => { 50 assert.strictEqual( 51 source.listenerCount(eventName), 0, 52 `source's '${eventName}' event listeners not removed` 53 ); 54 }); 55 }); 56 57 function checkDestCleanup(dest) { 58 const currentDestId = ++destCount; 59 source.pipe(dest); 60 61 const unpipeChecker = common.mustCall(() => { 62 assert.deepStrictEqual( 63 dest.listeners('unpipe'), [unpipeChecker], 64 `destination{${currentDestId}} should have a 'unpipe' event ` + 65 'listener which is `unpipeChecker`' 66 ); 67 dest.removeListener('unpipe', unpipeChecker); 68 destCheckEventNames.forEach((eventName) => { 69 assert.strictEqual( 70 dest.listenerCount(eventName), 0, 71 `destination{${currentDestId}}'s '${eventName}' event ` + 72 'listeners not removed' 73 ); 74 }); 75 76 if (--destCount === 0) 77 checkSrcCleanup(); 78 }); 79 80 dest.on('unpipe', unpipeChecker); 81 } 82 83 checkDestCleanup(dest1); 84 checkDestCleanup(dest2); 85 source.unpipe(); 86} 87 88{ 89 const src = Readable({ read: () => {} }); 90 const dst = Writable({ write: () => {} }); 91 src.pipe(dst); 92 src.on('resume', common.mustCall(() => { 93 src.on('pause', common.mustCall()); 94 src.unpipe(dst); 95 })); 96} 97