• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1var Stream = require('stream').Stream;
2var util = require('util');
3
4module.exports = DelayedStream;
/**
 * Constructor for a stream wrapper that holds back the events of a source
 * stream until it is released. Instances are normally obtained through
 * `DelayedStream.create(source, options)`, which fills in `source` and may
 * override any of the defaults assigned here.
 *
 * @constructor
 */
function DelayedStream() {
  // The wrapped stream; null until assigned.
  this.source = null;
  // Running total of the lengths of 'data' chunks seen before release.
  this.dataSize = 0;
  // Limit for dataSize (1024 * 1024); going past it emits an 'error'.
  this.maxDataSize = 1 << 20;
  // When truthy, `create` pauses the source right away.
  this.pauseStream = true;

  // Internal state.
  this._maxDataSizeExceeded = false; // limit error is reported only once
  this._released = false; // flips to true in release()
  this._bufferedEvents = []; // argument lists of events captured so far
}
15util.inherits(DelayedStream, Stream);
16
/**
 * Builds a delayed stream around `source`.
 *
 * The source's `emit` is replaced by a wrapper that first hands every event
 * to the delayed stream's `_handleEmit` and then calls the original `emit`,
 * so existing listeners on the source keep working.
 *
 * @param {Object} source - Stream to wrap; must provide `emit`, `on` and
 *   (when `pauseStream` is truthy) `pause`.
 * @param {Object} [options] - Own enumerable properties are copied onto the
 *   new instance, e.g. `maxDataSize` or `pauseStream`.
 * @returns {DelayedStream} The new instance (constructed via `new this()`,
 *   so subclasses calling `create` get their own type).
 */
DelayedStream.create = function(source, options) {
  var delayedStream = new this();

  // Copy only the caller's own settings. A bare for...in loop would also
  // copy enumerable properties inherited through the prototype chain.
  var settings = options || {};
  Object.keys(settings).forEach(function(option) {
    delayedStream[option] = settings[option];
  });

  delayedStream.source = source;

  // Tap into the source: every emitted event is reported to the delayed
  // stream before the source's real emit runs.
  var realEmit = source.emit;
  source.emit = function() {
    delayedStream._handleEmit(arguments);
    return realEmit.apply(source, arguments);
  };

  // No-op 'error' listener; presumably registered so that an 'error' emitted
  // by the source while events are still being buffered is not treated by
  // Node as unhandled.
  source.on('error', function() {});
  if (delayedStream.pauseStream) {
    source.pause();
  }

  return delayedStream;
};
40
// `readable` is not stored on the delayed stream itself: reading it always
// reports the current `readable` value of the wrapped source.
Object.defineProperty(DelayedStream.prototype, 'readable', {
  get: function() {
    var wrapped = this.source;
    return wrapped.readable;
  },
  enumerable: true,
  configurable: true
});
48
/**
 * Forwards `setEncoding` to the wrapped source with the same arguments.
 *
 * @returns {*} Whatever the source's `setEncoding` returns.
 */
DelayedStream.prototype.setEncoding = function() {
  var wrapped = this.source;
  return wrapped.setEncoding.apply(wrapped, arguments);
};
52
/**
 * Releases the buffered events if that has not happened yet, then resumes
 * the wrapped source.
 */
DelayedStream.prototype.resume = function() {
  var needsRelease = !this._released;
  if (needsRelease) {
    // First resume: replay everything captured so far before new data flows.
    this.release();
  }

  this.source.resume();
};
60
/**
 * Pauses the wrapped source. The delayed stream keeps no pause state of its
 * own.
 */
DelayedStream.prototype.pause = function() {
  var wrapped = this.source;
  wrapped.pause();
};
64
/**
 * Marks the stream as released and re-emits, in capture order, every event
 * that was buffered, then clears the buffer.
 *
 * `_released` is set before replaying, so events the source emits from now
 * on are forwarded directly by `_handleEmit` instead of being buffered.
 */
DelayedStream.prototype.release = function() {
  var self = this;

  self._released = true;

  self._bufferedEvents.forEach(function(eventArgs) {
    // Each entry is the argument list of one captured emit() call.
    self.emit.apply(self, eventArgs);
  });
  self._bufferedEvents = [];
};
73
/**
 * Pipes this stream using the base `Stream.prototype.pipe` with the given
 * arguments, then calls `resume()` so buffered events are released and the
 * source starts flowing.
 *
 * @returns {*} The value returned by `Stream.prototype.pipe`.
 */
DelayedStream.prototype.pipe = function() {
  var piped = Stream.prototype.pipe.apply(this, arguments);
  // Resume only after the pipe is set up, so the replayed events reach it.
  this.resume();
  return piped;
};
79
/**
 * Receives the argument list of every event emitted by the source.
 *
 * After release the event is re-emitted on this stream immediately. Before
 * release it is stored in `_bufferedEvents`; 'data' events additionally add
 * their chunk length to `dataSize` and trigger the size-limit check.
 *
 * @param {Arguments|Array} args - Arguments of the source's `emit` call:
 *   event name first, payload after it.
 */
DelayedStream.prototype._handleEmit = function(args) {
  if (this._released) {
    this.emit.apply(this, args);
    return;
  }

  if (args[0] === 'data') {
    var chunk = args[1];
    // Chunks without a usable numeric length (object-mode values, numbers,
    // null, undefined) count as zero. Adding their `.length` directly would
    // throw for null/undefined or turn dataSize into NaN, and NaN fails the
    // `<=` limit comparison and reports a bogus size overflow.
    // For strings, `.length` is the number of UTF-16 code units.
    var hasLength =
      chunk != null && typeof chunk.length === 'number' && chunk.length >= 0;
    this.dataSize += hasLength ? chunk.length : 0;
    this._checkIfMaxDataSizeExceeded();
  }

  this._bufferedEvents.push(args);
};
93
/**
 * Emits a single 'error' on this stream the first time `dataSize` is found
 * not to be within `maxDataSize`. Later calls are no-ops once the error has
 * been reported.
 */
DelayedStream.prototype._checkIfMaxDataSizeExceeded = function() {
  // Nothing to do if the overflow was already reported or the limit holds.
  if (this._maxDataSizeExceeded || this.dataSize <= this.maxDataSize) {
    return;
  }

  this._maxDataSizeExceeded = true;

  var limitError = new Error(
    'DelayedStream#maxDataSize of ' + this.maxDataSize + ' bytes exceeded.'
  );
  this.emit('error', limitError);
};
108