1var util = require('util'); 2var Stream = require('stream').Stream; 3var DelayedStream = require('delayed-stream'); 4var defer = require('./defer.js'); 5 6module.exports = CombinedStream; 7function CombinedStream() { 8 this.writable = false; 9 this.readable = true; 10 this.dataSize = 0; 11 this.maxDataSize = 2 * 1024 * 1024; 12 this.pauseStreams = true; 13 14 this._released = false; 15 this._streams = []; 16 this._currentStream = null; 17} 18util.inherits(CombinedStream, Stream); 19 20CombinedStream.create = function(options) { 21 var combinedStream = new this(); 22 23 options = options || {}; 24 for (var option in options) { 25 combinedStream[option] = options[option]; 26 } 27 28 return combinedStream; 29}; 30 31CombinedStream.isStreamLike = function(stream) { 32 return (typeof stream !== 'function') 33 && (typeof stream !== 'string') 34 && (typeof stream !== 'boolean') 35 && (typeof stream !== 'number') 36 && (!Buffer.isBuffer(stream)); 37}; 38 39CombinedStream.prototype.append = function(stream) { 40 var isStreamLike = CombinedStream.isStreamLike(stream); 41 42 if (isStreamLike) { 43 if (!(stream instanceof DelayedStream)) { 44 var newStream = DelayedStream.create(stream, { 45 maxDataSize: Infinity, 46 pauseStream: this.pauseStreams, 47 }); 48 stream.on('data', this._checkDataSize.bind(this)); 49 stream = newStream; 50 } 51 52 this._handleErrors(stream); 53 54 if (this.pauseStreams) { 55 stream.pause(); 56 } 57 } 58 59 this._streams.push(stream); 60 return this; 61}; 62 63CombinedStream.prototype.pipe = function(dest, options) { 64 Stream.prototype.pipe.call(this, dest, options); 65 this.resume(); 66 return dest; 67}; 68 69CombinedStream.prototype._getNext = function() { 70 this._currentStream = null; 71 var stream = this._streams.shift(); 72 73 74 if (typeof stream == 'undefined') { 75 this.end(); 76 return; 77 } 78 79 if (typeof stream !== 'function') { 80 this._pipeNext(stream); 81 return; 82 } 83 84 var getStream = stream; 85 getStream(function(stream) { 86 var isStreamLike = CombinedStream.isStreamLike(stream); 87 if (isStreamLike) { 88 stream.on('data', this._checkDataSize.bind(this)); 89 this._handleErrors(stream); 90 } 91 92 defer(this._pipeNext.bind(this, stream)); 93 }.bind(this)); 94}; 95 96CombinedStream.prototype._pipeNext = function(stream) { 97 this._currentStream = stream; 98 99 var isStreamLike = CombinedStream.isStreamLike(stream); 100 if (isStreamLike) { 101 stream.on('end', this._getNext.bind(this)); 102 stream.pipe(this, {end: false}); 103 return; 104 } 105 106 var value = stream; 107 this.write(value); 108 this._getNext(); 109}; 110 111CombinedStream.prototype._handleErrors = function(stream) { 112 var self = this; 113 stream.on('error', function(err) { 114 self._emitError(err); 115 }); 116}; 117 118CombinedStream.prototype.write = function(data) { 119 this.emit('data', data); 120}; 121 122CombinedStream.prototype.pause = function() { 123 if (!this.pauseStreams) { 124 return; 125 } 126 127 if(this.pauseStreams && this._currentStream && typeof(this._currentStream.pause) == 'function') this._currentStream.pause(); 128 this.emit('pause'); 129}; 130 131CombinedStream.prototype.resume = function() { 132 if (!this._released) { 133 this._released = true; 134 this.writable = true; 135 this._getNext(); 136 } 137 138 if(this.pauseStreams && this._currentStream && typeof(this._currentStream.resume) == 'function') this._currentStream.resume(); 139 this.emit('resume'); 140}; 141 142CombinedStream.prototype.end = function() { 143 this._reset(); 144 this.emit('end'); 145}; 146 147CombinedStream.prototype.destroy = function() { 148 this._reset(); 149 this.emit('close'); 150}; 151 152CombinedStream.prototype._reset = function() { 153 this.writable = false; 154 this._streams = []; 155 this._currentStream = null; 156}; 157 158CombinedStream.prototype._checkDataSize = function() { 159 this._updateDataSize(); 160 if (this.dataSize <= this.maxDataSize) { 161 return; 162 } 163 164 var message = 165 'DelayedStream#maxDataSize of ' + this.maxDataSize + ' bytes exceeded.'; 166 this._emitError(new Error(message)); 167}; 168 169CombinedStream.prototype._updateDataSize = function() { 170 this.dataSize = 0; 171 172 var self = this; 173 this._streams.forEach(function(stream) { 174 if (!stream.dataSize) { 175 return; 176 } 177 178 self.dataSize += stream.dataSize; 179 }); 180 181 if (this._currentStream && this._currentStream.dataSize) { 182 this.dataSize += this._currentStream.dataSize; 183 } 184}; 185 186CombinedStream.prototype._emitError = function(err) { 187 this._reset(); 188 this.emit('error', err); 189}; 190