var Transform = require('readable-stream').Transform;
var inherits = require('inherits');
var cyclist = require('cyclist');
var util = require('util');

/**
 * Transform stream that hands each incoming chunk to an asynchronous
 * `ontransform(chunk, callback)` function, keeping up to `maxParallel`
 * calls in flight at once.
 *
 * Accepted call shapes (with or without `new`):
 *   ParallelTransform(ontransform)                    -> maxParallel = 1
 *   ParallelTransform(maxParallel, ontransform)
 *   ParallelTransform(maxParallel, opts, ontransform)
 *
 * @param {number} maxParallel - upper bound on concurrently running transforms.
 * @param {Object} [opts] - stream options forwarded to `Transform`.
 *   `highWaterMark` defaults to `Math.max(maxParallel, 16)` when falsy,
 *   `objectMode` is forced to `true` unless it is explicitly `false`, and
 *   `ordered` (default `true`) selects whether results are emitted in input
 *   order or in completion order.
 * @param {Function} ontransform - `function(chunk, callback)`; must invoke
 *   `callback(err, data)`. A `null`/`undefined` `data` emits nothing for
 *   that chunk.
 */
var ParallelTransform = function(maxParallel, opts, ontransform) {
  if (!(this instanceof ParallelTransform)) return new ParallelTransform(maxParallel, opts, ontransform);

  if (typeof maxParallel === 'function') {
    ontransform = maxParallel;
    opts = null;
    maxParallel = 1;
  }
  if (typeof opts === 'function') {
    ontransform = opts;
    opts = null;
  }

  // Work on a shallow copy so the defaults below are never written onto the
  // caller's object; a reused options object would otherwise carry the first
  // stream's highWaterMark/objectMode into every later stream.
  var options = {};
  if (opts) {
    for (var key in opts) options[key] = opts[key];
  }
  if (!options.highWaterMark) options.highWaterMark = Math.max(maxParallel, 16);
  if (options.objectMode !== false) options.objectMode = true;

  Transform.call(this, options);

  this._maxParallel = maxParallel;
  this._ontransform = ontransform;
  this._destroyed = false;
  this._flushed = false;
  this._ordered = options.ordered !== false;
  // Ordered mode parks results in a cyclist ring buffer addressed by input
  // position; unordered mode only needs a plain queue.
  this._buffer = this._ordered ? cyclist(maxParallel) : [];
  this._top = 0;      // number of chunks handed to ontransform so far
  this._bottom = 0;   // number of results already drained
  this._ondrain = null; // parked _transform/_flush callback waiting for capacity
};

inherits(ParallelTransform, Transform);

/**
 * Marks the stream as destroyed and emits 'close' exactly once.
 * Results of transforms that finish afterwards are ignored.
 */
ParallelTransform.prototype.destroy = function() {
  if (this._destroyed) return;
  this._destroyed = true;
  this.emit('close');
};

/**
 * Starts the user transform for `chunk` and applies back-pressure once
 * `maxParallel` transforms are in flight.
 *
 * @param {*} chunk - incoming chunk, passed through to `ontransform`.
 * @param {string} enc - chunk encoding (not forwarded to `ontransform`).
 * @param {Function} callback - stream callback; invoked immediately while
 *   there is spare capacity, otherwise parked until `_drain` frees a slot.
 */
ParallelTransform.prototype._transform = function(chunk, enc, callback) {
  var self = this;
  var pos = this._top++;

  this._ontransform(chunk, function(err, data) {
    if (self._destroyed) return;
    if (err) {
      self.emit('error', err);
      self.push(null);
      self.destroy();
      return;
    }

    // null is the "nothing to emit" marker in both modes. Normalising here
    // keeps unordered mode from forwarding undefined to push(), matching what
    // ordered mode has always done.
    var result = (data === undefined || data === null) ? null : data;
    if (self._ordered) {
      self._buffer.put(pos, result);
    } else {
      self._buffer.push(result);
    }
    self._drain();
  });

  // If ontransform completed synchronously, _drain has already run and the
  // in-flight count below reflects that.
  if (this._top - this._bottom < this._maxParallel) return callback();
  this._ondrain = callback;
};

/**
 * Called when the writable side ends: waits until every in-flight transform
 * has been drained before signalling completion through `callback`.
 *
 * @param {Function} callback - stream flush callback.
 */
ParallelTransform.prototype._flush = function(callback) {
  this._flushed = true;
  this._ondrain = callback;
  this._drain();
};

/**
 * Pushes every result that is ready downstream, then releases the parked
 * callback (if any) once `_drained()` reports capacity.
 */
ParallelTransform.prototype._drain = function() {
  var data;

  if (this._ordered) {
    // Emit the contiguous run of finished results starting at _bottom; a gap
    // (undefined slot) means an earlier transform is still pending.
    while (this._buffer.get(this._bottom) !== undefined) {
      data = this._buffer.del(this._bottom++);
      if (data === null) continue;
      this.push(data);
    }
  } else {
    // Emit results in the order they completed.
    while (this._buffer.length > 0) {
      data = this._buffer.shift();
      this._bottom++;
      if (data === null || data === undefined) continue;
      this.push(data);
    }
  }

  if (!this._drained() || !this._ondrain) return;

  // Clear before calling: the callback may synchronously trigger another
  // _transform that parks a new callback.
  var ondrain = this._ondrain;
  this._ondrain = null;
  ondrain();
};

/**
 * @returns {boolean} After flush: true only when nothing is in flight.
 *   Before flush: true while fewer than `maxParallel` transforms are in flight.
 */
ParallelTransform.prototype._drained = function() {
  var diff = this._top - this._bottom;
  return this._flushed ? !diff : diff < this._maxParallel;
};

module.exports = ParallelTransform;