'use strict';
const common = require('../common');
const assert = require('assert');
const { Readable, Writable, PassThrough } = require('stream');

{
  // Scenario 1: a Readable that answers most read() calls asynchronously is
  // piped into a Writable that completes each write on a later setImmediate.
  // Both 'end' on the source and 'finish' on the destination must be emitted.
  let ticks = 17;

  const rs = new Readable({
    objectMode: true,
    read: () => {
      // For the first 17 read() calls, push a single object on the next tick.
      if (ticks-- > 0)
        return process.nextTick(() => rs.push({}));
      // Afterwards push one last object synchronously and signal end-of-stream.
      rs.push({});
      rs.push(null);
    }
  });

  const ws = new Writable({
    // NOTE(review): a zero highWaterMark presumably makes every write()
    // report backpressure, so the pipe has to wait for 'drain' - confirm.
    highWaterMark: 0,
    objectMode: true,
    // The chunk and its encoding are ignored; only completion is deferred.
    write: (chunk, encoding, cb) => setImmediate(cb)
  });

  rs.on('end', common.mustCall());
  ws.on('finish', common.mustCall());
  rs.pipe(ws);
}

{
  // Scenario 2: a source emitting 8 objects is piped through two PassThrough
  // streams (highWaterMark 2 each). A wrapper Readable pulls from the tail of
  // that chain with read() and must eventually emit 'end'.
  let missing = 8;

  const rs = new Readable({
    objectMode: true,
    read: () => {
      // Push one object per read() until the budget is used up, then end.
      if (missing--) rs.push({});
      else rs.push(null);
    }
  });

  const pt = rs
    .pipe(new PassThrough({ objectMode: true, highWaterMark: 2 }))
    .pipe(new PassThrough({ objectMode: true, highWaterMark: 2 }));

  // `wrapper` is declared below; this handler only runs later, once the
  // PassThrough chain has ended, and then ends the wrapper as well.
  pt.on('end', () => {
    wrapper.push(null);
  });

  const wrapper = new Readable({
    objectMode: true,
    read: () => {
      process.nextTick(() => {
        let data = pt.read();
        if (data === null) {
          // Nothing buffered yet: wait for the next 'readable' and retry once.
          pt.once('readable', () => {
            data = pt.read();
            if (data !== null) wrapper.push(data);
          });
        } else {
          wrapper.push(data);
        }
      });
    }
  });

  wrapper.resume();
  wrapper.on('end', common.mustCall());
}

{
  // Only register drain if there is backpressure.
  const rs = new Readable({ read() {} });

  const pt = rs
    .pipe(new PassThrough({ objectMode: true, highWaterMark: 2 }));
  assert.strictEqual(pt.listenerCount('drain'), 0);
  // Not wrapped in common.mustCall(): this check only runs if 'finish' is
  // actually emitted on the PassThrough.
  pt.on('finish', () => {
    assert.strictEqual(pt.listenerCount('drain'), 0);
  });

  rs.push('asd');
  assert.strictEqual(pt.listenerCount('drain'), 0);

  process.nextTick(() => {
    rs.push('asd');
    assert.strictEqual(pt.listenerCount('drain'), 0);
    rs.push(null);
    assert.strictEqual(pt.listenerCount('drain'), 0);
  });
}