1var Stream = require('stream').Stream; 2var util = require('util'); 3 4module.exports = DelayedStream; 5function DelayedStream() { 6 this.source = null; 7 this.dataSize = 0; 8 this.maxDataSize = 1024 * 1024; 9 this.pauseStream = true; 10 11 this._maxDataSizeExceeded = false; 12 this._released = false; 13 this._bufferedEvents = []; 14} 15util.inherits(DelayedStream, Stream); 16 17DelayedStream.create = function(source, options) { 18 var delayedStream = new this(); 19 20 options = options || {}; 21 for (var option in options) { 22 delayedStream[option] = options[option]; 23 } 24 25 delayedStream.source = source; 26 27 var realEmit = source.emit; 28 source.emit = function() { 29 delayedStream._handleEmit(arguments); 30 return realEmit.apply(source, arguments); 31 }; 32 33 source.on('error', function() {}); 34 if (delayedStream.pauseStream) { 35 source.pause(); 36 } 37 38 return delayedStream; 39}; 40 41Object.defineProperty(DelayedStream.prototype, 'readable', { 42 configurable: true, 43 enumerable: true, 44 get: function() { 45 return this.source.readable; 46 } 47}); 48 49DelayedStream.prototype.setEncoding = function() { 50 return this.source.setEncoding.apply(this.source, arguments); 51}; 52 53DelayedStream.prototype.resume = function() { 54 if (!this._released) { 55 this.release(); 56 } 57 58 this.source.resume(); 59}; 60 61DelayedStream.prototype.pause = function() { 62 this.source.pause(); 63}; 64 65DelayedStream.prototype.release = function() { 66 this._released = true; 67 68 this._bufferedEvents.forEach(function(args) { 69 this.emit.apply(this, args); 70 }.bind(this)); 71 this._bufferedEvents = []; 72}; 73 74DelayedStream.prototype.pipe = function() { 75 var r = Stream.prototype.pipe.apply(this, arguments); 76 this.resume(); 77 return r; 78}; 79 80DelayedStream.prototype._handleEmit = function(args) { 81 if (this._released) { 82 this.emit.apply(this, args); 83 return; 84 } 85 86 if (args[0] === 'data') { 87 this.dataSize += args[1].length; 88 this._checkIfMaxDataSizeExceeded(); 89 } 90 91 this._bufferedEvents.push(args); 92}; 93 94DelayedStream.prototype._checkIfMaxDataSizeExceeded = function() { 95 if (this._maxDataSizeExceeded) { 96 return; 97 } 98 99 if (this.dataSize <= this.maxDataSize) { 100 return; 101 } 102 103 this._maxDataSizeExceeded = true; 104 var message = 105 'DelayedStream#maxDataSize of ' + this.maxDataSize + ' bytes exceeded.' 106 this.emit('error', new Error(message)); 107}; 108