'use strict'

// Integration tests for the worker farm: each case builds a farm around
// ./child, issues calls, inspects what comes back through the callbacks and
// finally shuts the farm down with workerFarm.end().

const tape          = require('tape')
    , child_process = require('child_process')
    , workerFarm    = require('../')
    , childPath     = require.resolve('./child')
    , fs            = require('fs')
    , os            = require('os')

/**
 * Return the distinct values of `ar`, keeping first-occurrence order.
 *
 * Values are compared with Set semantics (strict, no type coercion), which is
 * what the numeric pids / ids collected by the tests below need.
 *
 * @param {Array} ar values to de-duplicate
 * @returns {Array} new array holding each distinct value once
 */
function uniq (ar) {
  return Array.from(new Set(ar))
}


// a child where module.exports = function ...
// NOTE: the pid range checks assume the child's pid lands within 750 of the
// parent's pid
tape('simple, exports=function test', function (t) {
  t.plan(4)

  let child = workerFarm(childPath)
  child(0, function (err, pid, rnd) {
    t.ok(pid > process.pid, 'pid makes sense')
    t.ok(pid < process.pid + 750, 'pid makes sense')
    t.ok(rnd >= 0 && rnd < 1, 'rnd result makes sense')
  })

  workerFarm.end(child, function () {
    t.ok(true, 'workerFarm ended')
  })
})


// a child where we have module.exports.fn = function ...
tape('simple, exports.fn test', function (t) {
  t.plan(4)

  let child = workerFarm(childPath, [ 'run0' ])
  child.run0(function (err, pid, rnd) {
    t.ok(pid > process.pid, 'pid makes sense')
    t.ok(pid < process.pid + 750, 'pid makes sense')
    t.ok(rnd >= 0 && rnd < 1, 'rnd result makes sense')
  })

  workerFarm.end(child, function () {
    t.ok(true, 'workerFarm ended')
  })
})


// the pid seen by the onChild hook must match the pid the worker reports back
tape('on child', function (t) {
  t.plan(2)

  // childPid is declared before the farm is created so the onChild closure
  // can never run against an uninitialised binding
  let childPid = null
    , child = workerFarm({ onChild: function (subprocess) { childPid = subprocess.pid } }, childPath)

  child(0, function (err, pid) {
    t.equal(childPid, pid)
  })

  workerFarm.end(child, function () {
    t.ok(true, 'workerFarm ended')
  })
})


// use the returned pids to check that we're using a single child process
// when maxConcurrentWorkers = 1
tape('single worker', function (t) {
  t.plan(2)

  let child = workerFarm({ maxConcurrentWorkers: 1 }, childPath)
    , pids  = []
    , i     = 10

  while (i--) {
    child(0, function (err, pid) {
      pids.push(pid)
      if (pids.length === 10) {
        t.equal(1, uniq(pids).length, 'only a single process (by pid)')
      } else if (pids.length > 10)
        t.fail('too many callbacks!')
    })
  }

  workerFarm.end(child, function () {
    t.ok(true, 'workerFarm ended')
  })
})


// use the returned pids to check that we're using two child processes
// when maxConcurrentWorkers = 2
tape('two workers', function (t) {
  t.plan(2)

  let child = workerFarm({ maxConcurrentWorkers: 2 }, childPath)
    , pids  = []
    , i     = 10

  while (i--) {
    child(0, function (err, pid) {
      pids.push(pid)
      if (pids.length === 10) {
        t.equal(2, uniq(pids).length, 'only two child processes (by pid)')
      } else if (pids.length > 10)
        t.fail('too many callbacks!')
    })
  }

  workerFarm.end(child, function () {
    t.ok(true, 'workerFarm ended')
  })
})


// use the returned pids to check that we're using a child process per
// call when maxConcurrentWorkers = 10
tape('many workers', function (t) {
  t.plan(2)

  let child = workerFarm({ maxConcurrentWorkers: 10 }, childPath)
    , pids  = []
    , i     = 10

  while (i--) {
    child(1, function (err, pid) {
      pids.push(pid)
      if (pids.length === 10) {
        // ten calls, ten distinct pids expected
        t.equal(10, uniq(pids).length, 'pids are all different (by pid)')
      } else if (pids.length > 10)
        t.fail('too many callbacks!')
    })
  }

  workerFarm.end(child, function () {
    t.ok(true, 'workerFarm ended')
  })
})


// with autoStart the calls are only issued after `delay`, and every reported
// uptime is expected to be above 10
tape('auto start workers', function (t) {
  let child = workerFarm({ maxConcurrentWorkers: 3, autoStart: true }, childPath, ['uptime'])
    , count = 5
    , i     = count
    , delay = 250

  t.plan(count + 1)

  setTimeout(function () {
    while (i--)
      child.uptime(function (err, uptime) {
        t.ok(uptime > 10, 'child has been up before the request (' + uptime + 'ms)')
      })

    workerFarm.end(child, function () {
      t.ok(true, 'workerFarm ended')
    })
  }, delay)
})


// use the returned pids to check that we're using a child process per
// call when we set maxCallsPerWorker = 1 even when we have maxConcurrentWorkers = 1
tape('single call per worker', function (t) {
  t.plan(2)

  let child = workerFarm({
          maxConcurrentWorkers: 1
        , maxConcurrentCallsPerWorker: Infinity
        , maxCallsPerWorker: 1
        , autoStart: true
      }, childPath)
    , pids  = []
    , count = 25
    , i     = count

  while (i--) {
    child(0, function (err, pid) {
      pids.push(pid)
      if (pids.length === count) {
        t.equal(count, uniq(pids).length, 'one process for each call (by pid)')
        workerFarm.end(child, function () {
          t.ok(true, 'workerFarm ended')
        })
      } else if (pids.length > count)
        t.fail('too many callbacks!')
    })
  }
})


// use the returned pids to check that we're using a child process per
// two-calls when we set maxCallsPerWorker = 2 even when we have maxConcurrentWorkers = 1
tape('two calls per worker', function (t) {
  t.plan(2)

  let child = workerFarm({
          maxConcurrentWorkers: 1
        , maxConcurrentCallsPerWorker: Infinity
        , maxCallsPerWorker: 2
        , autoStart: true
      }, childPath)
    , pids  = []
    , count = 20
    , i     = count

  while (i--) {
    child(0, function (err, pid) {
      pids.push(pid)
      if (pids.length === count) {
        // each worker serves two calls, so half as many pids as calls
        t.equal(count / 2, uniq(pids).length, 'one process for every two calls (by pid)')
        workerFarm.end(child, function () {
          t.ok(true, 'workerFarm ended')
        })
      } else if (pids.length > count)
        t.fail('too many callbacks!')
    })
  }
})


// use timing to confirm that one worker will process calls concurrently
// when maxConcurrentCallsPerWorker = Infinity
tape('many concurrent calls', function (t) {
  t.plan(2)

  let child = workerFarm({
          maxConcurrentWorkers: 1
        , maxConcurrentCallsPerWorker: Infinity
        , maxCallsPerWorker: Infinity
        , autoStart: true
      }, childPath)
    , defer = 200
    , count = 200
    , i     = count
    , cbc   = 0

  setTimeout(function () {
    let start = Date.now()

    while (i--) {
      child(defer, function () {
        if (++cbc === count) {
          let time = Date.now() - start
          // upper-limit not tied to `count` at all
          t.ok(time > defer && time < (defer * 2.5), 'processed tasks concurrently (' + time + 'ms)')
          workerFarm.end(child, function () {
            t.ok(true, 'workerFarm ended')
          })
        } else if (cbc > count)
          t.fail('too many callbacks!')
      })
    }
  }, 250)
})


// use timing to confirm that one child processes calls sequentially with
// maxConcurrentCallsPerWorker = 1
tape('single concurrent call', function (t) {
  t.plan(2)

  let child = workerFarm({
          maxConcurrentWorkers: 1
        , maxConcurrentCallsPerWorker: 1
        , maxCallsPerWorker: Infinity
        , autoStart: true
      }, childPath)
    , defer = 20
    , count = 100
    , i     = count
    , cbc   = 0

  setTimeout(function () {
    let start = Date.now()

    while (i--) {
      child(defer, function () {
        if (++cbc === count) {
          let time = Date.now() - start
          // upper-limit tied closely to `count`, 1.3 is generous but accounts for all the timers
          // coming back at the same time and the IPC overhead
          t.ok(time > (defer * count) && time < (defer * count * 1.3), 'processed tasks sequentially (' + time + ')')
          workerFarm.end(child, function () {
            t.ok(true, 'workerFarm ended')
          })
        } else if (cbc > count)
          t.fail('too many callbacks!')
      })
    }
  }, 250)
})


// use timing to confirm that one child processes *only* 5 calls concurrently
tape('multiple concurrent calls', function (t) {
  t.plan(2)

  let callsPerWorker = 5
    , child = workerFarm({
          maxConcurrentWorkers: 1
        , maxConcurrentCallsPerWorker: callsPerWorker
        , maxCallsPerWorker: Infinity
        , autoStart: true
      }, childPath)
    , defer = 100
    , count = 100
    , i     = count
    , cbc   = 0

  setTimeout(function () {
    let start = Date.now()

    while (i--) {
      child(defer, function () {
        if (++cbc === count) {
          let time = Date.now() - start
          let min = defer * 1.5
          // (defer * (count / callsPerWorker + 2)) - if precise it'd be count/callsPerWorker
          // but accounting for IPC and other overhead, we need to give it a bit of extra time,
          // hence the +2
          let max = defer * (count / callsPerWorker + 2)
          t.ok(time > min && time < max, 'processed tasks concurrently (' + time + ' > ' + min + ' && ' + time + ' < ' + max + ')')
          workerFarm.end(child, function () {
            t.ok(true, 'workerFarm ended')
          })
        } else if (cbc > count)
          t.fail('too many callbacks!')
      })
    }
  }, 250)
})


// call a method that will die with a probability of 0.5 but expect that
// we'll get results for each of our calls anyway
tape('durability', function (t) {
  t.plan(3)

  let child = workerFarm({ maxConcurrentWorkers: 2 }, childPath, [ 'killable' ])
    , ids   = []
    , pids  = []
    , count = 20
    , i     = count

  while (i--) {
    child.killable(i, function (err, id, pid) {
      ids.push(id)
      pids.push(pid)
      if (ids.length === count) {
        t.ok(uniq(pids).length > 2, 'processed by many (' + uniq(pids).length + ') workers, but got there in the end!')
        t.ok(uniq(ids).length === count, 'received a single result for each unique call')
        workerFarm.end(child, function () {
          t.ok(true, 'workerFarm ended')
        })
      } else if (ids.length > count)
        t.fail('too many callbacks!')
    })
  }
})


// a callback provided to .end() can and will be called (uses "simple, exports=function test" to create a child)
tape('simple, end callback', function (t) {
  t.plan(4)

  let child = workerFarm(childPath)
  child(0, function (err, pid, rnd) {
    t.ok(pid > process.pid, 'pid makes sense ' + pid + ' vs ' + process.pid)
    t.ok(pid < process.pid + 750, 'pid makes sense ' + pid + ' vs ' + process.pid)
    t.ok(rnd >= 0 && rnd < 1, 'rnd result makes sense')
  })

  workerFarm.end(child, function () {
    t.pass('an .end() callback was successfully called')
  })
})


// with maxCallTime = 250, short calls should succeed while calls that run
// longer (or get caught in the dying worker) should fail with a TimeoutError
tape('call timeout test', function (t) {
  // one term per call below, in order, plus 1 for the .end() callback
  t.plan(3 + 3 + 4 + 4 + 4 + 3 + 1)

  let child = workerFarm({ maxCallTime: 250, maxConcurrentWorkers: 1 }, childPath)

  // should come back ok
  child(50, function (err, pid, rnd) {
    t.ok(pid > process.pid, 'pid makes sense ' + pid + ' vs ' + process.pid)
    t.ok(pid < process.pid + 750, 'pid makes sense ' + pid + ' vs ' + process.pid)
    t.ok(rnd > 0 && rnd < 1, 'rnd result makes sense ' + rnd)
  })

  // should come back ok
  child(50, function (err, pid, rnd) {
    t.ok(pid > process.pid, 'pid makes sense ' + pid + ' vs ' + process.pid)
    t.ok(pid < process.pid + 750, 'pid makes sense ' + pid + ' vs ' + process.pid)
    t.ok(rnd > 0 && rnd < 1, 'rnd result makes sense ' + rnd)
  })

  // should die
  child(500, function (err, pid, rnd) {
    t.ok(err, 'got an error')
    t.equal(err.type, 'TimeoutError', 'correct error type')
    t.ok(pid === undefined, 'no pid')
    t.ok(rnd === undefined, 'no rnd')
  })

  // should die
  child(1000, function (err, pid, rnd) {
    t.ok(err, 'got an error')
    t.equal(err.type, 'TimeoutError', 'correct error type')
    t.ok(pid === undefined, 'no pid')
    t.ok(rnd === undefined, 'no rnd')
  })

  // should die even though it is only a 100ms task, it'll get caught up
  // in a dying worker
  setTimeout(function () {
    child(100, function (err, pid, rnd) {
      t.ok(err, 'got an error')
      t.equal(err.type, 'TimeoutError', 'correct error type')
      t.ok(pid === undefined, 'no pid')
      t.ok(rnd === undefined, 'no rnd')
    })
  }, 200)

  // should be ok, new worker
  setTimeout(function () {
    child(50, function (err, pid, rnd) {
      t.ok(pid > process.pid, 'pid makes sense ' + pid + ' vs ' + process.pid)
      t.ok(pid < process.pid + 750, 'pid makes sense ' + pid + ' vs ' + process.pid)
      t.ok(rnd > 0 && rnd < 1, 'rnd result makes sense ' + rnd)
    })
    workerFarm.end(child, function () {
      t.ok(true, 'workerFarm ended')
    })
  }, 400)
})


// errors raised in the child should arrive as Error instances carrying the
// original type, message and any custom properties
tape('test error passing', function (t) {
  t.plan(10)

  let child = workerFarm(childPath, [ 'err' ])
  child.err('Error', 'this is an Error', function (err) {
    t.ok(err instanceof Error, 'is an Error object')
    t.equal('Error', err.type, 'correct type')
    t.equal('this is an Error', err.message, 'correct message')
  })
  child.err('TypeError', 'this is a TypeError', function (err) {
    t.ok(err instanceof Error, 'is a TypeError object')
    t.equal('TypeError', err.type, 'correct type')
    t.equal('this is a TypeError', err.message, 'correct message')
  })
  child.err('Error', 'this is an Error with custom props', {foo: 'bar', 'baz': 1}, function (err) {
    t.ok(err instanceof Error, 'is an Error object')
    t.equal(err.foo, 'bar', 'passes data')
    t.equal(err.baz, 1, 'passes data')
  })

  workerFarm.end(child, function () {
    t.ok(true, 'workerFarm ended')
  })
})


// with maxConcurrentCalls = 5 the first five calls should succeed and the
// sixth and seventh should fail with a MaxConcurrentCallsError
tape('test maxConcurrentCalls', function (t) {
  t.plan(10)

  let child = workerFarm({ maxConcurrentCalls: 5 }, childPath)

  child(50, function (err) { t.notOk(err, 'no error') })
  child(50, function (err) { t.notOk(err, 'no error') })
  child(50, function (err) { t.notOk(err, 'no error') })
  child(50, function (err) { t.notOk(err, 'no error') })
  child(50, function (err) { t.notOk(err, 'no error') })
  child(50, function (err) {
    t.ok(err)
    t.equal(err.type, 'MaxConcurrentCallsError', 'correct error type')
  })
  child(50, function (err) {
    t.ok(err)
    t.equal(err.type, 'MaxConcurrentCallsError', 'correct error type')
  })

  workerFarm.end(child, function () {
    t.ok(true, 'workerFarm ended')
  })
})


// same limit check with maxConcurrentCalls = 4: once the two short calls have
// finished, two more calls should be accepted and a third rejected
tape('test maxConcurrentCalls + queue', function (t) {
  t.plan(13)

  let child = workerFarm({ maxConcurrentCalls: 4, maxConcurrentWorkers: 2, maxConcurrentCallsPerWorker: 1 }, childPath)

  child(20, function (err) { console.log('ended short1'); t.notOk(err, 'no error, short call 1') })
  child(20, function (err) { console.log('ended short2'); t.notOk(err, 'no error, short call 2') })
  child(300, function (err) { t.notOk(err, 'no error, long call 1') })
  child(300, function (err) { t.notOk(err, 'no error, long call 2') })
  child(20, function (err) {
    t.ok(err, 'short call 3 should error')
    t.equal(err.type, 'MaxConcurrentCallsError', 'correct error type')
  })
  child(20, function (err) {
    t.ok(err, 'short call 4 should error')
    t.equal(err.type, 'MaxConcurrentCallsError', 'correct error type')
  })

  // cross fingers and hope the two short jobs have ended
  setTimeout(function () {
    child(20, function (err) { t.notOk(err, 'no error, delayed short call 1') })
    child(20, function (err) { t.notOk(err, 'no error, delayed short call 2') })
    child(20, function (err) {
      t.ok(err, 'delayed short call 3 should error')
      t.equal(err.type, 'MaxConcurrentCallsError', 'correct error type')
    })

    workerFarm.end(child, function () {
      t.ok(true, 'workerFarm ended')
    })
  }, 250)
})


// this test should not keep the process running! if the test process
// doesn't die then the problem is here
tape('test timeout kill', function (t) {
  t.plan(3)

  let child = workerFarm({ maxCallTime: 250, maxConcurrentWorkers: 1 }, childPath, [ 'block' ])
  child.block(function (err) {
    t.ok(err, 'got an error')
    t.equal(err.type, 'TimeoutError', 'correct error type')
  })

  workerFarm.end(child, function () {
    t.ok(true, 'workerFarm ended')
  })
})


// the same 'stubborn' call is expected to succeed with maxRetries = 5 and to
// be cancelled with a ProcessTerminatedError with maxRetries = 3
tape('test max retries after process terminate', function (t) {
  t.plan(7)

  // temporary file is used to store the number of retries among terminating workers
  let filepath1 = '.retries1'
  let child1 = workerFarm({ maxConcurrentWorkers: 1, maxRetries: 5}, childPath, [ 'stubborn' ])
  child1.stubborn(filepath1, function (err, result) {
    t.notOk(err, 'no error')
    t.equal(result, 12, 'correct result')
  })

  workerFarm.end(child1, function () {
    fs.unlinkSync(filepath1)
    t.ok(true, 'workerFarm ended')
  })

  let filepath2 = '.retries2'
  let child2 = workerFarm({ maxConcurrentWorkers: 1, maxRetries: 3}, childPath, [ 'stubborn' ])
  child2.stubborn(filepath2, function (err, result) {
    t.ok(err, 'got an error')
    t.equal(err.type, 'ProcessTerminatedError', 'correct error type')
    t.equal(err.message, 'cancel after 3 retries!', 'correct message and number of retries')
  })

  workerFarm.end(child2, function () {
    fs.unlinkSync(filepath2)
    t.ok(true, 'workerFarm ended')
  })
})


// the cwd and execArgv given in workerOptions should be what the child reports
tape('custom arguments can be passed to "fork"', function (t) {
  t.plan(3)

  // allocate a real, valid path, in any OS
  let cwd = fs.realpathSync(os.tmpdir())
    , workerOptions = {
          cwd      : cwd
        , execArgv : ['--expose-gc']
      }
    , child = workerFarm({ maxConcurrentWorkers: 1, maxRetries: 5, workerOptions: workerOptions}, childPath, ['args'])

  child.args(function (err, result) {
    t.equal(result.execArgv[0], '--expose-gc', 'flags passed (overridden default)')
    t.equal(result.cwd, cwd, 'correct cwd folder')
  })

  workerFarm.end(child, function () {
    t.ok(true, 'workerFarm ended')
  })
})


// run ./debug.js under a debug/inspect flag and check from its stdout that it
// finished and that no '--debug' flag shows up in the output
tape('ensure --debug/--inspect not propagated to children', function (t) {
  t.plan(3)

  // major version of the running node, e.g. 'v10.4.1' -> 10
  let nodeMajor = Number(process.version.replace(/^v(\d+)\..*$/, '$1'))
    , script    = __dirname + '/debug.js'
    , debugArg  = nodeMajor >= 8 ? '--inspect' : '--debug=8881'
    , child     = child_process.spawn(process.execPath, [ debugArg, script ])
    , stdout    = ''

  child.stdout.on('data', function (data) {
    stdout += data.toString()
  })

  child.on('close', function (code) {
    t.equal(code, 0, 'exited without error (' + code + ')')
    t.ok(stdout.indexOf('FINISHED') > -1, 'process finished')
    t.ok(stdout.indexOf('--debug') === -1, 'child does not receive debug flag')
  })
})