var Stream = require('stream')

// through
//
// A stream that does nothing but re-emit its input.
// Useful for aggregating a series of changing (but not ending) streams
// into one stream.

exports = module.exports = through
through.through = through

/**
 * Create a stream that is both readable and writable.
 *
 * Chunks passed to `stream.write()` are handed to `write`, which decides what
 * to emit by calling `this.queue(chunk)` (alias `this.push`). Queuing `null`
 * marks end-of-stream: once it is drained, 'end' is emitted and anything
 * queued afterwards is ignored. Queued chunks are held in an internal buffer
 * while the stream is paused and flushed on `resume()`.
 *
 * @param {function(*)} [write] Invoked via `write.call(this, data)` for every
 *   written chunk. Defaults to re-queuing the chunk unchanged.
 * @param {function()} [end] Invoked with `this` set to the stream when
 *   `stream.end()` is called. Defaults to queuing `null`.
 * @param {Object} [opts]
 * @param {boolean} [opts.autoDestroy] Pass `false` to stop the stream from
 *   destroying itself once it is neither readable nor writable. Any other
 *   value (or no `opts`) leaves auto-destroy enabled.
 * @returns {Stream} The new stream. `pause`, `resume`, `end`, `destroy` and
 *   `queue`/`push` all return the stream so calls can be chained.
 */
function through (write, end, opts) {
  write = write || function (data) { this.queue(data) }
  end = end || function () { this.queue(null) }

  // ended     - end() has been called, or the stream was destroyed
  // _ended    - the end marker (null) has been queued, or the stream was
  //             destroyed; queue() ignores everything from then on
  // destroyed - destroy() has already run
  var ended = false, destroyed = false, buffer = [], _ended = false
  var stream = new Stream()
  stream.readable = stream.writable = true
  stream.paused = false

  // NOTE: an `autoPause` option was stubbed out here and is not implemented:
  // stream.autoPause = !(opts && opts.autoPause === false)
  stream.autoDestroy = !(opts && opts.autoDestroy === false)

  // Hand the chunk to the user's write function. The return value tells the
  // writer whether it may keep writing (false while paused).
  stream.write = function (data) {
    write.call(this, data)
    return !stream.paused
  }

  // Flush buffered chunks until the buffer is empty or a listener pauses the
  // stream. A buffered `null` is the end marker and is turned into 'end'.
  function drain () {
    while (buffer.length && !stream.paused) {
      var data = buffer.shift()
      if (null === data)
        return stream.emit('end')
      else
        stream.emit('data', data)
    }
  }

  // Queue a chunk for emission; `null` signals end-of-stream.
  stream.queue = stream.push = function (data) {
    if (_ended) return stream
    if (data === null) _ended = true
    buffer.push(data)
    drain()
    return stream
  }

  // This will be registered as the first 'end' listener, so destroy must be
  // deferred to the next tick to make sure it runs after the 'end' handlers
  // of any stream piped from here. This only matters if 'end' is not emitted
  // synchronously from end(). A nicer way to do this would be to make sure
  // this is the last listener for 'end'.
  stream.on('end', function () {
    stream.readable = false
    if (!stream.writable && stream.autoDestroy)
      process.nextTick(function () {
        stream.destroy()
      })
  })

  // Close the writable side and run the user's end function. If 'end' was
  // emitted synchronously (readable is already false) destroy right away.
  function _end () {
    stream.writable = false
    end.call(stream)
    if (!stream.readable && stream.autoDestroy)
      stream.destroy()
  }

  // End the writable side, optionally writing one last chunk first.
  // Calling it again (or after destroy) is a no-op that still returns the
  // stream, so chained calls never hit `undefined`.
  stream.end = function (data) {
    if (ended) return stream
    ended = true
    if (arguments.length) stream.write(data)
    _end() // will emit or queue
    return stream
  }

  // Drop any buffered chunks, mark the stream unusable and emit 'close'.
  // Safe to call more than once.
  stream.destroy = function () {
    if (destroyed) return stream
    destroyed = true
    ended = true
    // Also shut the queue: without this a write()/queue() after destroy
    // would still emit 'data' (or pile up in the buffer) after 'close'.
    _ended = true
    buffer.length = 0
    stream.writable = stream.readable = false
    stream.emit('close')
    return stream
  }

  // Stop emitting; chunks queued from now on are buffered.
  stream.pause = function () {
    if (stream.paused) return stream
    stream.paused = true
    return stream
  }

  // Resume emitting: flush the buffer, then signal 'drain' to writers.
  stream.resume = function () {
    if (stream.paused) {
      stream.paused = false
      stream.emit('resume')
    }
    drain()
    // may have become paused again,
    // as drain emits 'data'.
    if (!stream.paused)
      stream.emit('drain')
    return stream
  }
  return stream
}