1'use strict' 2 3const DEFAULT_OPTIONS = { 4 workerOptions : {} 5 , maxCallsPerWorker : Infinity 6 , maxConcurrentWorkers : (require('os').cpus() || { length: 1 }).length 7 , maxConcurrentCallsPerWorker : 10 8 , maxConcurrentCalls : Infinity 9 , maxCallTime : Infinity // exceed this and the whole worker is terminated 10 , maxRetries : Infinity 11 , forcedKillTime : 100 12 , autoStart : false 13 , onChild : function() {} 14 } 15 16const fork = require('./fork') 17 , TimeoutError = require('errno').create('TimeoutError') 18 , ProcessTerminatedError = require('errno').create('ProcessTerminatedError') 19 , MaxConcurrentCallsError = require('errno').create('MaxConcurrentCallsError') 20 21 22function Farm (options, path) { 23 this.options = Object.assign({}, DEFAULT_OPTIONS, options) 24 this.path = path 25 this.activeCalls = 0 26} 27 28 29// make a handle to pass back in the form of an external API 30Farm.prototype.mkhandle = function (method) { 31 return function () { 32 let args = Array.prototype.slice.call(arguments) 33 if (this.activeCalls + this.callQueue.length >= this.options.maxConcurrentCalls) { 34 let err = new MaxConcurrentCallsError('Too many concurrent calls (active: ' + this.activeCalls + ', queued: ' + this.callQueue.length + ')') 35 if (typeof args[args.length - 1] == 'function') 36 return process.nextTick(args[args.length - 1].bind(null, err)) 37 throw err 38 } 39 this.addCall({ 40 method : method 41 , callback : args.pop() 42 , args : args 43 , retries : 0 44 }) 45 }.bind(this) 46} 47 48 49// a constructor of sorts 50Farm.prototype.setup = function (methods) { 51 let iface 52 if (!methods) { // single-function export 53 iface = this.mkhandle() 54 } else { // multiple functions on the export 55 iface = {} 56 methods.forEach(function (m) { 57 iface[m] = this.mkhandle(m) 58 }.bind(this)) 59 } 60 61 this.searchStart = -1 62 this.childId = -1 63 this.children = {} 64 this.activeChildren = 0 65 this.callQueue = [] 66 67 if (this.options.autoStart) { 68 while (this.activeChildren < this.options.maxConcurrentWorkers) 69 this.startChild() 70 } 71 72 return iface 73} 74 75 76// when a child exits, check if there are any outstanding jobs and requeue them 77Farm.prototype.onExit = function (childId) { 78 // delay this to give any sends a chance to finish 79 setTimeout(function () { 80 let doQueue = false 81 if (this.children[childId] && this.children[childId].activeCalls) { 82 this.children[childId].calls.forEach(function (call, i) { 83 if (!call) return 84 else if (call.retries >= this.options.maxRetries) { 85 this.receive({ 86 idx : i 87 , child : childId 88 , args : [ new ProcessTerminatedError('cancel after ' + call.retries + ' retries!') ] 89 }) 90 } else { 91 call.retries++ 92 this.callQueue.unshift(call) 93 doQueue = true 94 } 95 }.bind(this)) 96 } 97 this.stopChild(childId) 98 doQueue && this.processQueue() 99 }.bind(this), 10) 100} 101 102 103// start a new worker 104Farm.prototype.startChild = function () { 105 this.childId++ 106 107 let forked = fork(this.path, this.options.workerOptions) 108 , id = this.childId 109 , c = { 110 send : forked.send 111 , child : forked.child 112 , calls : [] 113 , activeCalls : 0 114 , exitCode : null 115 } 116 117 this.options.onChild(forked.child); 118 119 forked.child.on('message', function(data) { 120 if (data.owner !== 'farm') { 121 return; 122 } 123 this.receive(data); 124 }.bind(this)) 125 forked.child.once('exit', function (code) { 126 c.exitCode = code 127 this.onExit(id) 128 }.bind(this)) 129 130 this.activeChildren++ 131 this.children[id] = c 132} 133 134 135// stop a worker, identified by id 136Farm.prototype.stopChild = function (childId) { 137 let child = this.children[childId] 138 if (child) { 139 child.send({owner: 'farm', event: 'die'}) 140 setTimeout(function () { 141 if (child.exitCode === null) 142 child.child.kill('SIGKILL') 143 }, this.options.forcedKillTime).unref() 144 ;delete this.children[childId] 145 this.activeChildren-- 146 } 147} 148 149 150// called from a child process, the data contains information needed to 151// look up the child and the original call so we can invoke the callback 152Farm.prototype.receive = function (data) { 153 let idx = data.idx 154 , childId = data.child 155 , args = data.args 156 , child = this.children[childId] 157 , call 158 159 if (!child) { 160 return console.error( 161 'Worker Farm: Received message for unknown child. ' 162 + 'This is likely as a result of premature child death, ' 163 + 'the operation will have been re-queued.' 164 ) 165 } 166 167 call = child.calls[idx] 168 if (!call) { 169 return console.error( 170 'Worker Farm: Received message for unknown index for existing child. ' 171 + 'This should not happen!' 172 ) 173 } 174 175 if (this.options.maxCallTime !== Infinity) 176 clearTimeout(call.timer) 177 178 if (args[0] && args[0].$error == '$error') { 179 let e = args[0] 180 switch (e.type) { 181 case 'TypeError': args[0] = new TypeError(e.message); break 182 case 'RangeError': args[0] = new RangeError(e.message); break 183 case 'EvalError': args[0] = new EvalError(e.message); break 184 case 'ReferenceError': args[0] = new ReferenceError(e.message); break 185 case 'SyntaxError': args[0] = new SyntaxError(e.message); break 186 case 'URIError': args[0] = new URIError(e.message); break 187 default: args[0] = new Error(e.message) 188 } 189 args[0].type = e.type 190 args[0].stack = e.stack 191 192 // Copy any custom properties to pass it on. 193 Object.keys(e).forEach(function(key) { 194 args[0][key] = e[key]; 195 }); 196 } 197 198 process.nextTick(function () { 199 call.callback.apply(null, args) 200 }) 201 202 ;delete child.calls[idx] 203 child.activeCalls-- 204 this.activeCalls-- 205 206 if (child.calls.length >= this.options.maxCallsPerWorker 207 && !Object.keys(child.calls).length) { 208 // this child has finished its run, kill it 209 this.stopChild(childId) 210 } 211 212 // allow any outstanding calls to be processed 213 this.processQueue() 214} 215 216 217Farm.prototype.childTimeout = function (childId) { 218 let child = this.children[childId] 219 , i 220 221 if (!child) 222 return 223 224 for (i in child.calls) { 225 this.receive({ 226 idx : i 227 , child : childId 228 , args : [ new TimeoutError('worker call timed out!') ] 229 }) 230 } 231 this.stopChild(childId) 232} 233 234 235// send a call to a worker, identified by id 236Farm.prototype.send = function (childId, call) { 237 let child = this.children[childId] 238 , idx = child.calls.length 239 240 child.calls.push(call) 241 child.activeCalls++ 242 this.activeCalls++ 243 244 child.send({ 245 owner : 'farm' 246 , idx : idx 247 , child : childId 248 , method : call.method 249 , args : call.args 250 }) 251 252 if (this.options.maxCallTime !== Infinity) { 253 call.timer = 254 setTimeout(this.childTimeout.bind(this, childId), this.options.maxCallTime) 255 } 256} 257 258 259// a list of active worker ids, in order, but the starting offset is 260// shifted each time this method is called, so we work our way through 261// all workers when handing out jobs 262Farm.prototype.childKeys = function () { 263 let cka = Object.keys(this.children) 264 , cks 265 266 if (this.searchStart >= cka.length - 1) 267 this.searchStart = 0 268 else 269 this.searchStart++ 270 271 cks = cka.splice(0, this.searchStart) 272 273 return cka.concat(cks) 274} 275 276 277// Calls are added to a queue, this processes the queue and is called 278// whenever there might be a chance to send more calls to the workers. 279// The various options all impact on when we're able to send calls, 280// they may need to be kept in a queue until a worker is ready. 281Farm.prototype.processQueue = function () { 282 let cka, i = 0, childId 283 284 if (!this.callQueue.length) 285 return this.ending && this.end() 286 287 if (this.activeChildren < this.options.maxConcurrentWorkers) 288 this.startChild() 289 290 for (cka = this.childKeys(); i < cka.length; i++) { 291 childId = +cka[i] 292 if (this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker 293 && this.children[childId].calls.length < this.options.maxCallsPerWorker) { 294 295 this.send(childId, this.callQueue.shift()) 296 if (!this.callQueue.length) 297 return this.ending && this.end() 298 } /*else { 299 console.log( 300 , this.children[childId].activeCalls < this.options.maxConcurrentCallsPerWorker 301 , this.children[childId].calls.length < this.options.maxCallsPerWorker 302 , this.children[childId].calls.length , this.options.maxCallsPerWorker) 303 }*/ 304 } 305 306 if (this.ending) 307 this.end() 308} 309 310 311// add a new call to the call queue, then trigger a process of the queue 312Farm.prototype.addCall = function (call) { 313 if (this.ending) 314 return this.end() // don't add anything new to the queue 315 this.callQueue.push(call) 316 this.processQueue() 317} 318 319 320// kills child workers when they're all done 321Farm.prototype.end = function (callback) { 322 let complete = true 323 if (this.ending === false) 324 return 325 if (callback) 326 this.ending = callback 327 else if (this.ending == null) 328 this.ending = true 329 Object.keys(this.children).forEach(function (child) { 330 if (!this.children[child]) 331 return 332 if (!this.children[child].activeCalls) 333 this.stopChild(child) 334 else 335 complete = false 336 }.bind(this)) 337 338 if (complete && typeof this.ending == 'function') { 339 process.nextTick(function () { 340 this.ending() 341 this.ending = false 342 }.bind(this)) 343 } 344} 345 346 347module.exports = Farm 348module.exports.TimeoutError = TimeoutError 349