• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict'
2module.exports = RunQueue
3
4var validate = require('aproba')
5
6function RunQueue (opts) {
7  validate('Z|O', [opts])
8  if (!opts) opts = {}
9  this.finished = false
10  this.inflight = 0
11  this.maxConcurrency = opts.maxConcurrency || 1
12  this.queued = 0
13  this.queue = []
14  this.currentPrio = null
15  this.currentQueue = null
16  this.Promise = opts.Promise || global.Promise
17  this.deferred = {}
18}
19
20RunQueue.prototype = {}
21
22RunQueue.prototype.run = function () {
23  if (arguments.length !== 0) throw new Error('RunQueue.run takes no arguments')
24  var self = this
25  var deferred = this.deferred
26  if (!deferred.promise) {
27    deferred.promise = new this.Promise(function (resolve, reject) {
28      deferred.resolve = resolve
29      deferred.reject = reject
30      self._runQueue()
31    })
32  }
33  return deferred.promise
34}
35
36RunQueue.prototype._runQueue = function () {
37  var self = this
38
39  while ((this.inflight < this.maxConcurrency) && this.queued) {
40    if (!this.currentQueue || this.currentQueue.length === 0) {
41      // wait till the current priority is entirely processed before
42      // starting a new one
43      if (this.inflight) return
44      var prios = Object.keys(this.queue)
45      for (var ii = 0; ii < prios.length; ++ii) {
46        var prioQueue = this.queue[prios[ii]]
47        if (prioQueue.length) {
48          this.currentQueue = prioQueue
49          this.currentPrio = prios[ii]
50          break
51        }
52      }
53    }
54
55    --this.queued
56    ++this.inflight
57    var next = this.currentQueue.shift()
58    var args = next.args || []
59
60    // we explicitly construct a promise here so that queue items can throw
61    // or immediately return to resolve
62    var queueEntry = new this.Promise(function (resolve) {
63      resolve(next.cmd.apply(null, args))
64    })
65
66    queueEntry.then(function () {
67      --self.inflight
68      if (self.finished) return
69      if (self.queued <= 0 && self.inflight <= 0) {
70        self.finished = true
71        self.deferred.resolve()
72      }
73      self._runQueue()
74    }, function (err) {
75      self.finished = true
76      self.deferred.reject(err)
77    })
78  }
79}
80
81RunQueue.prototype.add = function (prio, cmd, args) {
82  if (this.finished) throw new Error("Can't add to a finished queue. Create a new queue.")
83  if (Math.abs(Math.floor(prio)) !== prio) throw new Error('Priorities must be a positive integer value.')
84  validate('NFA|NFZ', [prio, cmd, args])
85  prio = Number(prio)
86  if (!this.queue[prio]) this.queue[prio] = []
87  ++this.queued
88  this.queue[prio].push({cmd: cmd, args: args})
89  // if this priority is higher than the one we're currently processing,
90  // switch back to processing its queue.
91  if (this.currentPrio > prio) {
92    this.currentQueue = this.queue[prio]
93    this.currentPrio = prio
94  }
95}
96