var pump = require('pump')
var inherits = require('inherits')
var Duplexify = require('duplexify')

// Normalizes an `arguments` object into a list of streams.
// Returns [] when nothing was passed, the first argument itself when it is
// already an array (no copy is made), otherwise an array copy of `args`.
function toArray(args) {
  if (!args.length) return []
  if (Array.isArray(args[0])) return args[0]
  return Array.prototype.slice.call(args)
}

// Builds and returns a Pumpify constructor. Every instance is initialised
// through Duplexify with the given `opts`.
function define(opts) {
  // Accepts the streams either as separate arguments or as one array and
  // works with or without `new`.
  function Pumpify() {
    var pipeline = toArray(arguments)
    if (!(this instanceof Pumpify)) return new Pumpify(pipeline)

    // Start with no writable/readable side; setPipeline attaches them.
    Duplexify.call(this, null, null, opts)
    if (pipeline.length) this.setPipeline(pipeline)
  }

  inherits(Pumpify, Duplexify)

  // Pumps the given streams together and exposes the first stream as the
  // writable side and the last stream as the readable side of this duplex.
  // Streams may be passed as separate arguments or as a single array.
  Pumpify.prototype.setPipeline = function() {
    var pipeline = toArray(arguments)
    var duplex = this
    var finished = false

    // An end of the pipeline is only exposed when it reports itself as
    // readable / writable; otherwise that side is set to null.
    var tail = pipeline[pipeline.length - 1]
    var readableSide = tail.readable ? tail : null
    var head = pipeline[0]
    var writableSide = head.writable ? head : null

    // Closing the duplex before the pipeline is done raises an error on the
    // first stream, which pump then sees as a pipeline failure.
    function onclose() {
      pipeline[0].emit('error', new Error('stream was destroyed'))
    }

    duplex.on('close', onclose)

    // Keep the duplex corked from 'prefinish' until pump reports completion
    // (presumably so 'finish' is held back until the pipeline has ended).
    duplex.on('prefinish', function() {
      if (!finished) duplex.cork()
    })

    pump(pipeline, function(err) {
      duplex.removeListener('close', onclose)

      if (err) {
        // A 'premature close' is passed on as a destroy without an error.
        var reason = err.message === 'premature close' ? null : err
        return duplex.destroy(reason)
      }

      finished = true
      // pump ends after the last stream is not writable *but* pumpify still
      // forwards the readable part, so errors must still be caught:
      // re-enable autoDestroy in this case
      if (duplex._autoDestroy === false) duplex._autoDestroy = true
      duplex.uncork()
    })

    // Already destroyed: fail the pipeline right away instead of attaching it.
    if (duplex.destroyed) return onclose()

    duplex.setWritable(writableSide)
    duplex.setReadable(readableSide)
  }

  return Pumpify
}

// Default export plus an object-mode variant (`obj`) and the raw
// constructor factory (`ctor`).
var pumpify = define({autoDestroy: false, destroy: false})
pumpify.obj = define({autoDestroy: false, destroy: false, objectMode: true, highWaterMark: 16})
pumpify.ctor = define

module.exports = pumpify