'use strict'
module.exports = RunQueue

var validate = require('aproba')

/**
 * A queue of commands grouped by numeric priority and executed with bounded
 * concurrency. Lower priority numbers are processed first.
 *
 * @constructor
 * @param {Object} [opts] Options; must be an object or omitted (checked with
 *   the aproba schema 'Z|O').
 * @param {number} [opts.maxConcurrency=1] Maximum number of commands allowed
 *   in flight at the same time. Falsy values fall back to 1.
 * @param {Function} [opts.Promise=global.Promise] Promise constructor used
 *   for the queue's own promise and for wrapping each command.
 */
function RunQueue (opts) {
  validate('Z|O', [opts])
  if (!opts) opts = {}
  this.finished = false
  this.inflight = 0
  this.maxConcurrency = opts.maxConcurrency || 1
  this.queued = 0
  // Sparse array indexed by priority; each slot holds an array of
  // {cmd, args} entries.
  this.queue = []
  this.currentPrio = null
  this.currentQueue = null
  this.Promise = opts.Promise || global.Promise
  // Holds {promise, resolve, reject} once run() has been called.
  this.deferred = {}
}

RunQueue.prototype = {}

/**
 * Start processing the queue.
 *
 * Calling run() more than once returns the same promise; processing is only
 * started by the first call.
 *
 * If nothing is queued when processing starts, the promise resolves
 * immediately and the queue is marked finished (previously the promise never
 * settled, because nothing would ever trigger completion).
 *
 * @returns {Promise} Resolves (with no value) once every queued command has
 *   completed; rejects with the error of the first command that throws or
 *   returns a rejected promise.
 * @throws {Error} If any arguments are passed.
 */
RunQueue.prototype.run = function () {
  if (arguments.length !== 0) throw new Error('RunQueue.run takes no arguments')
  var self = this
  var deferred = this.deferred
  if (!deferred.promise) {
    deferred.promise = new this.Promise(function (resolve, reject) {
      deferred.resolve = resolve
      deferred.reject = reject
      // Completion is normally detected when an in-flight command settles.
      // With an empty queue no command ever settles, so finish right here
      // instead of leaving the caller with a promise that hangs forever.
      if (self.queued <= 0 && self.inflight <= 0) {
        self.finished = true
        resolve()
        return
      }
      self._runQueue()
    })
  }
  return deferred.promise
}

/**
 * Internal scheduler: starts queued commands until either maxConcurrency
 * commands are in flight or nothing is left to start. It is re-invoked each
 * time a command completes successfully.
 *
 * Expects run() to have populated this.deferred with resolve/reject.
 */
RunQueue.prototype._runQueue = function () {
  var self = this

  while ((this.inflight < this.maxConcurrency) && this.queued) {
    if (!this.currentQueue || this.currentQueue.length === 0) {
      // wait till the current priority is entirely processed before
      // starting a new one
      if (this.inflight) return
      // Object.keys on an array yields its integer-like keys in ascending
      // order (as strings), so the first non-empty queue found is the one
      // with the lowest priority number.
      var prios = Object.keys(this.queue)
      for (var ii = 0; ii < prios.length; ++ii) {
        var prioQueue = this.queue[prios[ii]]
        if (prioQueue.length) {
          this.currentQueue = prioQueue
          this.currentPrio = prios[ii]
          break
        }
      }
    }

    --this.queued
    ++this.inflight
    var next = this.currentQueue.shift()
    var args = next.args || []

    // we explicitly construct a promise here so that queue items can throw
    // or immediately return to resolve
    var queueEntry = new this.Promise(function (resolve) {
      resolve(next.cmd.apply(null, args))
    })

    queueEntry.then(function () {
      --self.inflight
      // A sibling command already failed (or the queue already completed);
      // do not start any further work.
      if (self.finished) return
      if (self.queued <= 0 && self.inflight <= 0) {
        self.finished = true
        self.deferred.resolve()
        return
      }
      self._runQueue()
    }, function (err) {
      // Keep the in-flight counter accurate on the failure path as well.
      --self.inflight
      self.finished = true
      self.deferred.reject(err)
    })
  }
}

/**
 * Queue a command at the given priority.
 *
 * @param {number} prio Non-negative integer priority; lower numbers run
 *   first.
 * @param {Function} cmd Function to invoke. It may return a value, return a
 *   promise, or throw.
 * @param {Array} [args] Arguments passed to cmd; treated as an empty list
 *   when omitted.
 * @throws {Error} If the queue has already finished, if prio is not a
 *   non-negative integer, or if the argument types fail the aproba schema
 *   'NFA|NFZ'.
 */
RunQueue.prototype.add = function (prio, cmd, args) {
  if (this.finished) throw new Error("Can't add to a finished queue. Create a new queue.")
  // Strict comparison: anything that is not already a non-negative integer
  // number (strings, NaN, negatives, fractions) is rejected here.
  if (Math.abs(Math.floor(prio)) !== prio) throw new Error('Priorities must be a positive integer value.')
  validate('NFA|NFZ', [prio, cmd, args])
  prio = Number(prio)
  if (!this.queue[prio]) this.queue[prio] = []
  ++this.queued
  this.queue[prio].push({cmd: cmd, args: args})
  // if this priority is higher than the one we're currently processing,
  // switch back to processing its queue.
  // (currentPrio may be a string taken from Object.keys; the comparison
  // coerces it to a number, and a null currentPrio never compares greater.)
  if (this.currentPrio > prio) {
    this.currentQueue = this.queue[prio]
    this.currentPrio = prio
  }
}