• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1var util = require('util');
2var Stream = require('stream').Stream;
3var DelayedStream = require('delayed-stream');
4var defer = require('./defer.js');
5
6module.exports = CombinedStream;
7function CombinedStream() {
8  this.writable = false;
9  this.readable = true;
10  this.dataSize = 0;
11  this.maxDataSize = 2 * 1024 * 1024;
12  this.pauseStreams = true;
13
14  this._released = false;
15  this._streams = [];
16  this._currentStream = null;
17}
18util.inherits(CombinedStream, Stream);
19
20CombinedStream.create = function(options) {
21  var combinedStream = new this();
22
23  options = options || {};
24  for (var option in options) {
25    combinedStream[option] = options[option];
26  }
27
28  return combinedStream;
29};
30
31CombinedStream.isStreamLike = function(stream) {
32  return (typeof stream !== 'function')
33    && (typeof stream !== 'string')
34    && (typeof stream !== 'boolean')
35    && (typeof stream !== 'number')
36    && (!Buffer.isBuffer(stream));
37};
38
39CombinedStream.prototype.append = function(stream) {
40  var isStreamLike = CombinedStream.isStreamLike(stream);
41
42  if (isStreamLike) {
43    if (!(stream instanceof DelayedStream)) {
44      var newStream = DelayedStream.create(stream, {
45        maxDataSize: Infinity,
46        pauseStream: this.pauseStreams,
47      });
48      stream.on('data', this._checkDataSize.bind(this));
49      stream = newStream;
50    }
51
52    this._handleErrors(stream);
53
54    if (this.pauseStreams) {
55      stream.pause();
56    }
57  }
58
59  this._streams.push(stream);
60  return this;
61};
62
63CombinedStream.prototype.pipe = function(dest, options) {
64  Stream.prototype.pipe.call(this, dest, options);
65  this.resume();
66  return dest;
67};
68
69CombinedStream.prototype._getNext = function() {
70  this._currentStream = null;
71  var stream = this._streams.shift();
72
73
74  if (typeof stream == 'undefined') {
75    this.end();
76    return;
77  }
78
79  if (typeof stream !== 'function') {
80    this._pipeNext(stream);
81    return;
82  }
83
84  var getStream = stream;
85  getStream(function(stream) {
86    var isStreamLike = CombinedStream.isStreamLike(stream);
87    if (isStreamLike) {
88      stream.on('data', this._checkDataSize.bind(this));
89      this._handleErrors(stream);
90    }
91
92    defer(this._pipeNext.bind(this, stream));
93  }.bind(this));
94};
95
96CombinedStream.prototype._pipeNext = function(stream) {
97  this._currentStream = stream;
98
99  var isStreamLike = CombinedStream.isStreamLike(stream);
100  if (isStreamLike) {
101    stream.on('end', this._getNext.bind(this));
102    stream.pipe(this, {end: false});
103    return;
104  }
105
106  var value = stream;
107  this.write(value);
108  this._getNext();
109};
110
111CombinedStream.prototype._handleErrors = function(stream) {
112  var self = this;
113  stream.on('error', function(err) {
114    self._emitError(err);
115  });
116};
117
118CombinedStream.prototype.write = function(data) {
119  this.emit('data', data);
120};
121
122CombinedStream.prototype.pause = function() {
123  if (!this.pauseStreams) {
124    return;
125  }
126
127  if(this.pauseStreams && this._currentStream && typeof(this._currentStream.pause) == 'function') this._currentStream.pause();
128  this.emit('pause');
129};
130
131CombinedStream.prototype.resume = function() {
132  if (!this._released) {
133    this._released = true;
134    this.writable = true;
135    this._getNext();
136  }
137
138  if(this.pauseStreams && this._currentStream && typeof(this._currentStream.resume) == 'function') this._currentStream.resume();
139  this.emit('resume');
140};
141
142CombinedStream.prototype.end = function() {
143  this._reset();
144  this.emit('end');
145};
146
147CombinedStream.prototype.destroy = function() {
148  this._reset();
149  this.emit('close');
150};
151
152CombinedStream.prototype._reset = function() {
153  this.writable = false;
154  this._streams = [];
155  this._currentStream = null;
156};
157
158CombinedStream.prototype._checkDataSize = function() {
159  this._updateDataSize();
160  if (this.dataSize <= this.maxDataSize) {
161    return;
162  }
163
164  var message =
165    'DelayedStream#maxDataSize of ' + this.maxDataSize + ' bytes exceeded.';
166  this._emitError(new Error(message));
167};
168
169CombinedStream.prototype._updateDataSize = function() {
170  this.dataSize = 0;
171
172  var self = this;
173  this._streams.forEach(function(stream) {
174    if (!stream.dataSize) {
175      return;
176    }
177
178    self.dataSize += stream.dataSize;
179  });
180
181  if (this._currentStream && this._currentStream.dataSize) {
182    this.dataSize += this._currentStream.dataSize;
183  }
184};
185
186CombinedStream.prototype._emitError = function(err) {
187  this._reset();
188  this.emit('error', err);
189};
190