• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1var Transform = require('readable-stream').Transform;
2var inherits = require('inherits');
3var cyclist = require('cyclist');
4var util = require('util');
5
/**
 * Transform stream constructor that tracks up to `maxParallel` outstanding
 * `ontransform` calls. May be called with or without `new`.
 *
 * Accepted call shapes:
 *   ParallelTransform(ontransform)                    - maxParallel defaults to 1
 *   ParallelTransform(maxParallel, ontransform)
 *   ParallelTransform(maxParallel, opts, ontransform)
 *
 * @param {number} maxParallel - Limit on outstanding transforms; also sizes
 *   the ordered buffer and the default highWaterMark.
 * @param {Object} [opts] - Options forwarded to `Transform`. Defaults applied
 *   here: `highWaterMark` = max(maxParallel, 16) when falsy, `objectMode` =
 *   true unless explicitly `false`. `ordered` (anything but `false` means
 *   ordered) selects a `cyclist` buffer instead of a plain array.
 * @param {Function} ontransform - Stored as `_ontransform` for later use by
 *   the stream hooks.
 */
var ParallelTransform = function(maxParallel, opts, ontransform) {
	if (!(this instanceof ParallelTransform)) return new ParallelTransform(maxParallel, opts, ontransform);

	// Shift arguments when the leading ones are omitted.
	if (typeof maxParallel === 'function') {
		ontransform = maxParallel;
		opts = null;
		maxParallel = 1;
	}
	if (typeof opts === 'function') {
		ontransform = opts;
		opts = null;
	}

	// Apply defaults on a copy so the caller's options object is never modified
	// (for-in over null/undefined simply iterates nothing).
	var settings = {};
	for (var key in opts) settings[key] = opts[key];
	if (!settings.highWaterMark) settings.highWaterMark = Math.max(maxParallel, 16);
	if (settings.objectMode !== false) settings.objectMode = true;

	Transform.call(this, settings);

	this._maxParallel = maxParallel;
	this._ontransform = ontransform;
	this._destroyed = false;
	this._flushed = false;
	this._ordered = settings.ordered !== false;
	this._buffer = this._ordered ? cyclist(maxParallel) : [];
	// _top counts chunks handed to _ontransform, _bottom counts results drained.
	this._top = 0;
	this._bottom = 0;
	// Stream callback parked until enough outstanding work has drained.
	this._ondrain = null;
};

inherits(ParallelTransform, Transform);
37
/**
 * Marks the stream as destroyed and emits 'close'.
 * Only the first call has any effect; later calls return immediately.
 */
ParallelTransform.prototype.destroy = function() {
	if (this._destroyed) return;
	this._destroyed = true;
	this.emit('close');
};
43
/**
 * Transform hook: hands `chunk` to `_ontransform` and buffers its result.
 *
 * The write is acknowledged right away while fewer than `_maxParallel`
 * results are outstanding; otherwise `callback` is parked in `_ondrain`
 * so `_drain` can release it later.
 *
 * @param {*} chunk - Incoming chunk, passed through to `_ontransform`.
 * @param {string} enc - Encoding supplied by the stream machinery (unused).
 * @param {Function} callback - Signals readiness for the next chunk.
 */
ParallelTransform.prototype._transform = function(chunk, enc, callback) {
	var self = this;
	// Sequence number of this chunk; used as the slot index in ordered mode.
	var pos = this._top++;

	this._ontransform(chunk, function(err, data) {
		if (self._destroyed) return;
		if (err) {
			self.emit('error', err);
			self.push(null);
			self.destroy();
			return;
		}
		// Normalize "no output" (undefined or null) to null in BOTH modes: the
		// entry must still be buffered so _bottom advances, but _drain skips
		// null instead of pushing it downstream.
		var result = (data === undefined || data === null) ? null : data;
		if (self._ordered) {
			self._buffer.put(pos, result);
		}
		else {
			self._buffer.push(result);
		}
		self._drain();
	});

	if (this._top - this._bottom < this._maxParallel) return callback();
	this._ondrain = callback;
};
68
/**
 * Flush hook: records that input has ended, parks `callback` in `_ondrain`
 * and runs `_drain`, which invokes it once `_drained()` reports true.
 *
 * @param {Function} callback - Completion callback supplied by the stream.
 */
ParallelTransform.prototype._flush = function(callback) {
	this._flushed = true;
	this._ondrain = callback;
	this._drain();
};
74
/**
 * Pushes every result that is ready downstream, then releases the parked
 * `_ondrain` callback if `_drained()` allows it.
 *
 * Ordered mode: consumes consecutive slots starting at `_bottom` and stops at
 * the first slot that is still `undefined` (result not stored yet).
 * Unordered mode: empties the array, oldest entry first.
 *
 * In both modes an entry without output (null, or undefined in unordered
 * mode) still advances `_bottom` but is not pushed.
 */
ParallelTransform.prototype._drain = function() {
	var data;

	if (this._ordered) {
		while (this._buffer.get(this._bottom) !== undefined) {
			data = this._buffer.del(this._bottom++);
			if (data === null) continue;
			this.push(data);
		}
	}
	else {
		while (this._buffer.length > 0) {
			data = this._buffer.shift();
			this._bottom++;
			// Skip "no output" markers; undefined is treated like null so both
			// modes drop empty results the same way.
			if (data === null || data === undefined) continue;
			this.push(data);
		}
	}

	if (!this._drained() || !this._ondrain) return;

	// Clear before calling: the callback may park a new one re-entrantly.
	var ondrain = this._ondrain;
	this._ondrain = null;
	ondrain();
};
99
/**
 * Reports whether the parked callback may be released.
 *
 * @returns {boolean} After `_flush`: true only when no results are
 *   outstanding. Before that: true while fewer than `_maxParallel` results
 *   are outstanding.
 */
ParallelTransform.prototype._drained = function() {
	// Chunks handed to _ontransform whose results have not been drained yet.
	var outstanding = this._top - this._bottom;
	if (this._flushed) return !outstanding;
	return outstanding < this._maxParallel;
};
104
// The constructor is the module's sole export.
module.exports = ParallelTransform;
106