'use strict';

const common = require('../common');
const { pipeline, Duplex, PassThrough, Writable } = require('stream');
const assert = require('assert');

// Scenarios 1 and 2 below destroy a stream (`b`, then `d`) after asserting it
// has no 'error' listener left, so each destroy is expected to surface as an
// uncaught exception. Scenario 3 asserts `f` still has a listener, so it
// should not add a third one. Hence the expected call count of 2.
process.on('uncaughtException', common.mustCall((err) => {
  assert.strictEqual(err.message, 'no way');
}, 2));

// Scenario 1: ensure that listeners are removed if the last stream is
// readable, and that the other streams' listeners are left unchanged.
const a = new PassThrough();
a.end('foobar');
const b = new Duplex({
  write(chunk, encoding, callback) {
    callback();
  }
});
pipeline(a, b, common.mustCall((error) => {
  if (error) {
    assert.ifError(error);
  }

  assert(a.listenerCount('error') > 0);
  assert.strictEqual(b.listenerCount('error'), 0);
  setTimeout(() => {
    // Re-check after a delay, right before destroying `b` with an error.
    assert.strictEqual(b.listenerCount('error'), 0);
    b.destroy(new Error('no way'));
  }, 100);
}));

// Scenario 2: async generators. `d` is the stream returned by pipeline()
// when its last stage is an async generator function.
const c = new PassThrough();
c.end('foobar');
const d = pipeline(
  c,
  async function* (source) {
    for await (const chunk of source) {
      yield String(chunk).toUpperCase();
    }
  },
  common.mustCall((error) => {
    if (error) {
      assert.ifError(error);
    }

    assert(c.listenerCount('error') > 0);
    assert.strictEqual(d.listenerCount('error'), 0);
    setTimeout(() => {
      // Re-check the stream under test in this scenario (`d`) right before
      // destroying it, mirroring the delayed check done for `b` above.
      assert.strictEqual(d.listenerCount('error'), 0);
      d.destroy(new Error('no way'));
    }, 100);
  })
);

// Scenario 3: if the last stream is not readable, its listeners are not
// removed, so destroying it with an error will not throw.
const e = new PassThrough();
e.end('foobar');
const f = new Writable({
  write(chunk, encoding, callback) {
    callback();
  }
});
pipeline(e, f, common.mustCall((error) => {
  if (error) {
    assert.ifError(error);
  }

  assert(e.listenerCount('error') > 0);
  assert(f.listenerCount('error') > 0);
  setTimeout(() => {
    // The 'error' listener is still attached after the delay, so the
    // destroy below is handled instead of becoming an uncaught exception.
    assert(f.listenerCount('error') > 0);
    f.destroy(new Error('no way'));
  }, 100);
}));