• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict'
2
// Defaults for every farm option. The Farm constructor copies these and
// lets the caller-supplied options override them.
const DEFAULT_OPTIONS = {
          // passed as the second argument to fork() when a worker is started
          workerOptions               : {}
          // a worker is handed no further calls once it has been sent this many
        , maxCallsPerWorker           : Infinity
          // os.cpus() may yield an empty array (or nothing at all) when CPU
          // information is unavailable. Never default to zero workers: with a
          // limit of 0 processQueue() would not start any worker and every
          // call would sit in the queue forever.
        , maxConcurrentWorkers        : Math.max(1, (require('os').cpus() || []).length)
          // calls a single worker may have in flight at once
        , maxConcurrentCallsPerWorker : 10
          // active + queued calls allowed before a handle rejects new calls
        , maxConcurrentCalls          : Infinity
        , maxCallTime                 : Infinity // exceed this and the whole worker is terminated
          // times a call is re-queued after its worker exits before it fails
        , maxRetries                  : Infinity
          // ms between asking a worker to die and sending it SIGKILL
        , forcedKillTime              : 100
          // start maxConcurrentWorkers workers during setup() instead of on demand
        , autoStart                   : false
          // invoked with the forked child process each time a worker is started
        , onChild                     : function() {}
      }
15
16const fork                    = require('./fork')
17    , TimeoutError            = require('errno').create('TimeoutError')
18    , ProcessTerminatedError  = require('errno').create('ProcessTerminatedError')
19    , MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError')
20
21
// Farm constructor.
//
// options - caller overrides, layered on top of a copy of DEFAULT_OPTIONS
// path    - handed to fork() whenever a worker is started
//
// Only the in-flight call counter is initialised here; the worker and
// queue bookkeeping is created by setup().
function Farm (options, path) {
  const merged = Object.assign({}, DEFAULT_OPTIONS, options)

  this.activeCalls = 0
  this.path        = path
  this.options     = merged
}
27
28
// Build one of the functions that make up the external API.
//
// method - name of the worker method the handle invokes; left undefined by
//          setup() for the single-function export
//
// The returned function treats its last argument as the callback. When the
// number of active plus queued calls has reached options.maxConcurrentCalls
// it does not queue anything: a MaxConcurrentCallsError is passed to the
// trailing callback on the next tick, or thrown when the last argument is
// not a function.
Farm.prototype.mkhandle = function (method) {
  return (...args) => {
    const active = this.activeCalls
    const queued = this.callQueue.length

    if (active + queued >= this.options.maxConcurrentCalls) {
      const err  = new MaxConcurrentCallsError(`Too many concurrent calls (active: ${active}, queued: ${queued})`)
      const last = args[args.length - 1]
      if (typeof last == 'function')
        return process.nextTick(last.bind(null, err))
      throw err
    }

    // the callback is removed from args, so only the real arguments travel
    // to the worker
    const callback = args.pop()
    this.addCall({ method, callback, args, retries: 0 })
  }
}
47
48
// A constructor of sorts: builds the external API and resets the worker
// bookkeeping.
//
// methods - optional array of method names exported by the worker module
//
// Returns a single handle when `methods` is falsy, otherwise an object with
// one handle per method name.
Farm.prototype.setup = function (methods) {
  const iface = methods
    ? methods.reduce((api, name) => {
        api[name] = this.mkhandle(name) // multiple functions on the export
        return api
      }, {})
    : this.mkhandle()                   // single-function export

  this.callQueue      = []
  this.children       = {}
  this.activeChildren = 0
  this.childId        = -1
  this.searchStart    = -1

  // optionally spin up the full complement of workers right away
  if (this.options.autoStart) {
    while (this.activeChildren < this.options.maxConcurrentWorkers) {
      this.startChild()
    }
  }

  return iface
}
74
75
// when a child exits, check if there are any outstanding jobs and requeue
// them, or fail them with a ProcessTerminatedError once they have already
// been retried options.maxRetries times
//
// childId - id of the worker whose process emitted 'exit'
Farm.prototype.onExit = function (childId) {
  // delay this to give any sends a chance to finish
  setTimeout(() => {
    const child = this.children[childId]

    // Take the dead worker out of the pool *before* settling its calls.
    // While it is still registered, processQueue() considers it a valid
    // target and can hand queued calls to a process that no longer exists.
    this.stopChild(childId)

    if (!child || !child.activeCalls)
      return

    child.calls.forEach((call) => {
      if (!call) return // slot already answered and cleared by receive()

      // send() counted this call in this.activeCalls and may have armed a
      // maxCallTime timer for it. Undo both here: a requeued call is counted
      // and timed again by the next send(), so leaving the old count in
      // place would leak one "active" call per retry.
      clearTimeout(call.timer)
      this.activeCalls--

      if (call.retries >= this.options.maxRetries) {
        const err = new ProcessTerminatedError('cancel after ' + call.retries + ' retries!')
        process.nextTick(() => call.callback(err))
      } else {
        call.retries++
        this.callQueue.unshift(call)
      }
    })

    // dispatch whatever is queued (including the calls requeued above) to
    // the remaining or freshly started workers
    this.processQueue()
  }, 10)
}
101
102
// Start a new worker and register it under the next child id.
//
// fork() supplies the child process and a send function; both are kept on
// the worker record together with its call slots, its in-flight counter
// and its exit code (null until the process has exited).
Farm.prototype.startChild = function () {
  const id = ++this.childId
  const { send, child } = fork(this.path, this.options.workerOptions)
  const worker = {
    send,
    child,
    calls       : [],
    activeCalls : 0,
    exitCode    : null
  }

  // let the user see every process that gets forked
  this.options.onChild(child)

  // only messages tagged as belonging to the farm are treated as replies
  child.on('message', (data) => {
    if (data.owner === 'farm') {
      this.receive(data)
    }
  })

  child.once('exit', (code) => {
    worker.exitCode = code
    this.onExit(id)
  })

  this.activeChildren++
  this.children[id] = worker
}
133
134
// Stop a worker, identified by id. Unknown ids are ignored.
//
// The worker is sent a 'die' event and removed from the pool immediately;
// if its exit code is still null after options.forcedKillTime ms it is
// sent SIGKILL.
Farm.prototype.stopChild = function (childId) {
  const worker = this.children[childId]
  if (!worker) return

  worker.send({ owner: 'farm', event: 'die' })

  const forcedKill = setTimeout(() => {
    if (worker.exitCode === null) {
      worker.child.kill('SIGKILL')
    }
  }, this.options.forcedKillTime)
  // the pending kill must not be the thing keeping the event loop alive
  forcedKill.unref()

  delete this.children[childId]
  this.activeChildren--
}
148
149
// Handle a reply for a call. Invoked for farm-owned messages coming from a
// child process; `data` carries what is needed to find the worker
// (data.child), the call slot (data.idx) and the callback arguments
// (data.args).
//
// A first argument marked with `$error` is turned back into an Error of the
// matching built-in type before the callback is invoked on the next tick.
// Afterwards the slot is released, a worker that has used up
// options.maxCallsPerWorker and has nothing outstanding is stopped, and the
// queue is processed again.
Farm.prototype.receive = function (data) {
  const { idx, child: childId, args } = data
  const worker = this.children[childId]

  if (!worker) {
    return console.error(
      'Worker Farm: Received message for unknown child. ' +
      'This is likely as a result of premature child death, ' +
      'the operation will have been re-queued.'
    )
  }

  const call = worker.calls[idx]
  if (!call) {
    return console.error(
      'Worker Farm: Received message for unknown index for existing child. ' +
      'This should not happen!'
    )
  }

  if (this.options.maxCallTime !== Infinity) {
    clearTimeout(call.timer)
  }

  const wire = args[0]
  if (wire && wire.$error == '$error') {
    // built-in error types that are rebuilt as themselves; anything else
    // becomes a plain Error
    const builtins = new Map([
      [ 'TypeError', TypeError ],
      [ 'RangeError', RangeError ],
      [ 'EvalError', EvalError ],
      [ 'ReferenceError', ReferenceError ],
      [ 'SyntaxError', SyntaxError ],
      [ 'URIError', URIError ]
    ])
    const Revived = builtins.get(wire.type) || Error
    const err = new Revived(wire.message)

    err.type  = wire.type
    err.stack = wire.stack

    // Copy any custom properties to pass it on.
    for (const key of Object.keys(wire)) {
      err[key] = wire[key]
    }

    args[0] = err
  }

  process.nextTick(() => {
    call.callback.apply(null, args)
  })

  // leaves a hole, so calls.length keeps counting every call ever sent
  delete worker.calls[idx]
  worker.activeCalls--
  this.activeCalls--

  const usedUp = worker.calls.length >= this.options.maxCallsPerWorker
  if (usedUp && Object.keys(worker.calls).length === 0) {
    // this child has finished its run, kill it
    this.stopChild(childId)
  }

  // allow any outstanding calls to be processed
  this.processQueue()
}
215
216
// Fired by the timer send() arms when options.maxCallTime is finite: the
// whole worker is terminated and every call it still has outstanding is
// failed with a TimeoutError.
//
// childId - id of the worker that owns the timed-out call; ignored when the
//           worker is no longer registered
Farm.prototype.childTimeout = function (childId) {
  const child = this.children[childId]

  if (!child)
    return

  // Retire the worker *before* settling its calls. Settling them one by one
  // through receive() would run processQueue() while the worker is still
  // registered, letting it pick up fresh calls from the queue that are then
  // either dropped with the worker or failed without ever having run.
  this.stopChild(childId)

  // forEach skips the holes left behind by calls that were already answered
  child.calls.forEach((call) => {
    if (!call) return

    // same bookkeeping receive() performs for a settled call
    clearTimeout(call.timer)
    this.activeCalls--

    const err = new TimeoutError('worker call timed out!')
    process.nextTick(() => call.callback(err))
  })

  // the queue can now only be served by the remaining or new workers
  this.processQueue()
}
233
234
// Send a call to a worker, identified by id.
//
// The call takes the next slot in the worker's `calls` array; that slot
// index travels with the message. Both the worker's and the farm's
// in-flight counters are bumped, and when options.maxCallTime is finite a
// timer is armed that triggers childTimeout() for this worker.
Farm.prototype.send = function (childId, call) {
  const worker = this.children[childId]
  const idx    = worker.calls.push(call) - 1

  worker.activeCalls++
  this.activeCalls++

  worker.send({
    owner  : 'farm',
    idx,
    child  : childId,
    method : call.method,
    args   : call.args
  })

  const maxCallTime = this.options.maxCallTime
  if (maxCallTime !== Infinity) {
    call.timer = setTimeout(() => this.childTimeout(childId), maxCallTime)
  }
}
257
258
// The ids of all registered workers, rotated so that the list starts at a
// different worker on every call. The offset (this.searchStart) advances by
// one per call and wraps to 0 once it reaches the last index, which spreads
// jobs across all workers when they are handed out.
Farm.prototype.childKeys = function () {
  const ids = Object.keys(this.children)

  if (this.searchStart >= ids.length - 1) {
    this.searchStart = 0
  } else {
    this.searchStart++
  }

  // move the first `searchStart` ids to the back
  const front = ids.splice(0, this.searchStart)
  return [ ...ids, ...front ]
}
275
276
// Calls wait in a queue; this drains as much of it as the options allow
// and is invoked whenever there might be room to send more calls.
//
// Each invocation starts at most one new worker (when below
// options.maxConcurrentWorkers) and then makes a single pass over the
// workers, giving one queued call to every worker that is below both
// maxConcurrentCallsPerWorker and maxCallsPerWorker. Whenever the farm is
// ending, end() is re-run so idle workers get stopped.
Farm.prototype.processQueue = function () {
  if (!this.callQueue.length)
    return this.ending && this.end()

  if (this.activeChildren < this.options.maxConcurrentWorkers)
    this.startChild()

  for (const key of this.childKeys()) {
    const childId = +key
    const worker  = this.children[childId]
    const hasRoom = worker.activeCalls < this.options.maxConcurrentCallsPerWorker
                 && worker.calls.length < this.options.maxCallsPerWorker

    if (!hasRoom)
      continue

    this.send(childId, this.callQueue.shift())
    if (!this.callQueue.length)
      return this.ending && this.end()
  }

  if (this.ending)
    this.end()
}
309
310
// Queue a new call and immediately try to dispatch it. Once the farm is
// ending nothing new is accepted: the call is not queued and end() is
// re-run instead.
Farm.prototype.addCall = function (call) {
  if (!this.ending) {
    this.callQueue.push(call)
    this.processQueue()
    return
  }

  return this.end() // don't add anything new to the queue
}
318
319
// Kills child workers when they're all done.
//
// callback - optional function invoked (on a later tick) once no worker
//            has calls outstanding
//
// this.ending tracks the shutdown: unset while running, `true` or the
// callback while ending, and `false` after the callback has been invoked.
// The method is re-run from processQueue()/addCall() while ending, so it
// may be reached many times; every run stops the workers that are idle.
Farm.prototype.end = function (callback) {
  if (this.ending === false)
    return // shutdown already completed and reported

  if (callback)
    this.ending = callback
  else if (this.ending == null)
    this.ending = true

  let complete = true
  Object.keys(this.children).forEach((id) => {
    const child = this.children[id]
    if (!child)
      return
    if (!child.activeCalls)
      this.stopChild(id)
    else
      complete = false
  })

  if (complete && typeof this.ending == 'function') {
    process.nextTick(() => {
      // end() can be reached several times within one tick, each time
      // scheduling this notification. The first one to run resets
      // this.ending to false, so later ones must not try to call it.
      if (typeof this.ending != 'function')
        return
      this.ending()
      this.ending = false
    })
  }
}
345
346
// The module exports the Farm constructor itself, with TimeoutError
// attached to it as a property.
Farm.TimeoutError = TimeoutError
module.exports    = Farm
349