1'use strict'; 2 3const { 4 ArrayIsArray, 5 ObjectDefineProperty, 6 ObjectSetPrototypeOf, 7 Symbol, 8 Uint8Array, 9} = primordials; 10 11const { 12 errnoException, 13 codes: { 14 ERR_INVALID_ARG_TYPE, 15 ERR_INVALID_HANDLE_TYPE, 16 ERR_INVALID_OPT_VALUE, 17 ERR_INVALID_SYNC_FORK_INPUT, 18 ERR_IPC_CHANNEL_CLOSED, 19 ERR_IPC_DISCONNECTED, 20 ERR_IPC_ONE_PIPE, 21 ERR_IPC_SYNC_FORK, 22 ERR_MISSING_ARGS 23 } 24} = require('internal/errors'); 25const { 26 validateArray, 27 validateOneOf, 28 validateString, 29} = require('internal/validators'); 30const EventEmitter = require('events'); 31const net = require('net'); 32const dgram = require('dgram'); 33const inspect = require('internal/util/inspect').inspect; 34const assert = require('internal/assert'); 35 36const { Process } = internalBinding('process_wrap'); 37const { 38 WriteWrap, 39 kReadBytesOrError, 40 kArrayBufferOffset, 41 kLastWriteWasAsync, 42 streamBaseState 43} = internalBinding('stream_wrap'); 44const { Pipe, constants: PipeConstants } = internalBinding('pipe_wrap'); 45const { TCP } = internalBinding('tcp_wrap'); 46const { TTY } = internalBinding('tty_wrap'); 47const { UDP } = internalBinding('udp_wrap'); 48const SocketList = require('internal/socket_list'); 49const { owner_symbol } = require('internal/async_hooks').symbols; 50const { convertToValidSignal, deprecate } = require('internal/util'); 51const { isArrayBufferView } = require('internal/util/types'); 52const spawn_sync = internalBinding('spawn_sync'); 53const { kStateSymbol } = require('internal/dgram'); 54 55const { 56 UV_EACCES, 57 UV_EAGAIN, 58 UV_EINVAL, 59 UV_EMFILE, 60 UV_ENFILE, 61 UV_ENOENT, 62 UV_ENOSYS, 63 UV_ESRCH 64} = internalBinding('uv'); 65 66const { SocketListSend, SocketListReceive } = SocketList; 67 68// Lazy loaded for startup performance and to allow monkey patching of 69// internalBinding('http_parser').HTTPParser. 70let freeParser; 71let HTTPParser; 72 73const MAX_HANDLE_RETRANSMISSIONS = 3; 74const kChannelHandle = Symbol('kChannelHandle'); 75const kIsUsedAsStdio = Symbol('kIsUsedAsStdio'); 76 77// This object contain function to convert TCP objects to native handle objects 78// and back again. 79const handleConversion = { 80 'net.Native': { 81 simultaneousAccepts: true, 82 83 send(message, handle, options) { 84 return handle; 85 }, 86 87 got(message, handle, emit) { 88 emit(handle); 89 } 90 }, 91 92 'net.Server': { 93 simultaneousAccepts: true, 94 95 send(message, server, options) { 96 return server._handle; 97 }, 98 99 got(message, handle, emit) { 100 const server = new net.Server(); 101 server.listen(handle, () => { 102 emit(server); 103 }); 104 } 105 }, 106 107 'net.Socket': { 108 send(message, socket, options) { 109 if (!socket._handle) 110 return; 111 112 // If the socket was created by net.Server 113 if (socket.server) { 114 // The worker should keep track of the socket 115 message.key = socket.server._connectionKey; 116 117 const firstTime = !this[kChannelHandle].sockets.send[message.key]; 118 const socketList = getSocketList('send', this, message.key); 119 120 // The server should no longer expose a .connection property 121 // and when asked to close it should query the socket status from 122 // the workers 123 if (firstTime) socket.server._setupWorker(socketList); 124 125 // Act like socket is detached 126 if (!options.keepOpen) 127 socket.server._connections--; 128 } 129 130 const handle = socket._handle; 131 132 // Remove handle from socket object, it will be closed when the socket 133 // will be sent 134 if (!options.keepOpen) { 135 handle.onread = nop; 136 socket._handle = null; 137 socket.setTimeout(0); 138 139 if (freeParser === undefined) 140 freeParser = require('_http_common').freeParser; 141 if (HTTPParser === undefined) 142 HTTPParser = require('_http_common').HTTPParser; 143 144 // In case of an HTTP connection socket, release the associated 145 // resources 146 if (socket.parser && socket.parser instanceof HTTPParser) { 147 freeParser(socket.parser, null, socket); 148 if (socket._httpMessage) 149 socket._httpMessage.detachSocket(socket); 150 } 151 } 152 153 return handle; 154 }, 155 156 postSend(message, handle, options, callback, target) { 157 // Store the handle after successfully sending it, so it can be closed 158 // when the NODE_HANDLE_ACK is received. If the handle could not be sent, 159 // just close it. 160 if (handle && !options.keepOpen) { 161 if (target) { 162 // There can only be one _pendingMessage as passing handles are 163 // processed one at a time: handles are stored in _handleQueue while 164 // waiting for the NODE_HANDLE_ACK of the current passing handle. 165 assert(!target._pendingMessage); 166 target._pendingMessage = 167 { callback, message, handle, options, retransmissions: 0 }; 168 } else { 169 handle.close(); 170 } 171 } 172 }, 173 174 got(message, handle, emit) { 175 const socket = new net.Socket({ 176 handle: handle, 177 readable: true, 178 writable: true 179 }); 180 181 // If the socket was created by net.Server we will track the socket 182 if (message.key) { 183 184 // Add socket to connections list 185 const socketList = getSocketList('got', this, message.key); 186 socketList.add({ 187 socket: socket 188 }); 189 } 190 191 emit(socket); 192 } 193 }, 194 195 'dgram.Native': { 196 simultaneousAccepts: false, 197 198 send(message, handle, options) { 199 return handle; 200 }, 201 202 got(message, handle, emit) { 203 emit(handle); 204 } 205 }, 206 207 'dgram.Socket': { 208 simultaneousAccepts: false, 209 210 send(message, socket, options) { 211 message.dgramType = socket.type; 212 213 return socket[kStateSymbol].handle; 214 }, 215 216 got(message, handle, emit) { 217 const socket = new dgram.Socket(message.dgramType); 218 219 socket.bind(handle, () => { 220 emit(socket); 221 }); 222 } 223 } 224}; 225 226function stdioStringToArray(stdio, channel) { 227 const options = []; 228 229 switch (stdio) { 230 case 'ignore': 231 case 'overlapped': 232 case 'pipe': options.push(stdio, stdio, stdio); break; 233 case 'inherit': options.push(0, 1, 2); break; 234 default: 235 throw new ERR_INVALID_OPT_VALUE('stdio', stdio); 236 } 237 238 if (channel) options.push(channel); 239 240 return options; 241} 242 243function ChildProcess() { 244 EventEmitter.call(this); 245 246 this._closesNeeded = 1; 247 this._closesGot = 0; 248 this.connected = false; 249 250 this.signalCode = null; 251 this.exitCode = null; 252 this.killed = false; 253 this.spawnfile = null; 254 255 this._handle = new Process(); 256 this._handle[owner_symbol] = this; 257 258 this._handle.onexit = (exitCode, signalCode) => { 259 if (signalCode) { 260 this.signalCode = signalCode; 261 } else { 262 this.exitCode = exitCode; 263 } 264 265 if (this.stdin) { 266 this.stdin.destroy(); 267 } 268 269 this._handle.close(); 270 this._handle = null; 271 272 if (exitCode < 0) { 273 const syscall = this.spawnfile ? 'spawn ' + this.spawnfile : 'spawn'; 274 const err = errnoException(exitCode, syscall); 275 276 if (this.spawnfile) 277 err.path = this.spawnfile; 278 279 err.spawnargs = this.spawnargs.slice(1); 280 this.emit('error', err); 281 } else { 282 this.emit('exit', this.exitCode, this.signalCode); 283 } 284 285 // If any of the stdio streams have not been touched, 286 // then pull all the data through so that it can get the 287 // eof and emit a 'close' event. 288 // Do it on nextTick so that the user has one last chance 289 // to consume the output, if for example they only want to 290 // start reading the data once the process exits. 291 process.nextTick(flushStdio, this); 292 293 maybeClose(this); 294 }; 295} 296ObjectSetPrototypeOf(ChildProcess.prototype, EventEmitter.prototype); 297ObjectSetPrototypeOf(ChildProcess, EventEmitter); 298 299 300function flushStdio(subprocess) { 301 const stdio = subprocess.stdio; 302 303 if (stdio == null) return; 304 305 for (let i = 0; i < stdio.length; i++) { 306 const stream = stdio[i]; 307 // TODO(addaleax): This doesn't necessarily account for all the ways in 308 // which data can be read from a stream, e.g. being consumed on the 309 // native layer directly as a StreamBase. 310 if (!stream || !stream.readable || stream[kIsUsedAsStdio]) { 311 continue; 312 } 313 stream.resume(); 314 } 315} 316 317 318function createSocket(pipe, readable) { 319 return net.Socket({ handle: pipe, readable, writable: !readable }); 320} 321 322 323function getHandleWrapType(stream) { 324 if (stream instanceof Pipe) return 'pipe'; 325 if (stream instanceof TTY) return 'tty'; 326 if (stream instanceof TCP) return 'tcp'; 327 if (stream instanceof UDP) return 'udp'; 328 329 return false; 330} 331 332function closePendingHandle(target) { 333 target._pendingMessage.handle.close(); 334 target._pendingMessage = null; 335} 336 337 338ChildProcess.prototype.spawn = function(options) { 339 let i = 0; 340 341 if (options === null || typeof options !== 'object') { 342 throw new ERR_INVALID_ARG_TYPE('options', 'Object', options); 343 } 344 345 // If no `stdio` option was given - use default 346 let stdio = options.stdio || 'pipe'; 347 348 stdio = getValidStdio(stdio, false); 349 350 const ipc = stdio.ipc; 351 const ipcFd = stdio.ipcFd; 352 stdio = options.stdio = stdio.stdio; 353 354 355 validateOneOf(options.serialization, 'options.serialization', 356 [undefined, 'json', 'advanced'], true); 357 const serialization = options.serialization || 'json'; 358 359 if (ipc !== undefined) { 360 // Let child process know about opened IPC channel 361 if (options.envPairs === undefined) 362 options.envPairs = []; 363 else 364 validateArray(options.envPairs, 'options.envPairs'); 365 366 options.envPairs.push(`NODE_CHANNEL_FD=${ipcFd}`); 367 options.envPairs.push(`NODE_CHANNEL_SERIALIZATION_MODE=${serialization}`); 368 } 369 370 validateString(options.file, 'options.file'); 371 this.spawnfile = options.file; 372 373 if (ArrayIsArray(options.args)) 374 this.spawnargs = options.args; 375 else if (options.args === undefined) 376 this.spawnargs = []; 377 else 378 throw new ERR_INVALID_ARG_TYPE('options.args', 'Array', options.args); 379 380 const err = this._handle.spawn(options); 381 382 // Run-time errors should emit an error, not throw an exception. 383 if (err === UV_EACCES || 384 err === UV_EAGAIN || 385 err === UV_EMFILE || 386 err === UV_ENFILE || 387 err === UV_ENOENT) { 388 process.nextTick(onErrorNT, this, err); 389 390 // There is no point in continuing when we've hit EMFILE or ENFILE 391 // because we won't be able to set up the stdio file descriptors. 392 if (err === UV_EMFILE || err === UV_ENFILE) 393 return err; 394 } else if (err) { 395 // Close all opened fds on error 396 for (i = 0; i < stdio.length; i++) { 397 const stream = stdio[i]; 398 if (stream.type === 'pipe') { 399 stream.handle.close(); 400 } 401 } 402 403 this._handle.close(); 404 this._handle = null; 405 throw errnoException(err, 'spawn'); 406 } else { 407 process.nextTick(onSpawnNT, this); 408 } 409 410 this.pid = this._handle.pid; 411 412 for (i = 0; i < stdio.length; i++) { 413 const stream = stdio[i]; 414 if (stream.type === 'ignore') continue; 415 416 if (stream.ipc) { 417 this._closesNeeded++; 418 continue; 419 } 420 421 // The stream is already cloned and piped, thus stop its readable side, 422 // otherwise we might attempt to read from the stream when at the same time 423 // the child process does. 424 if (stream.type === 'wrap') { 425 stream.handle.reading = false; 426 stream.handle.readStop(); 427 stream._stdio.pause(); 428 stream._stdio.readableFlowing = false; 429 stream._stdio._readableState.reading = false; 430 stream._stdio[kIsUsedAsStdio] = true; 431 continue; 432 } 433 434 if (stream.handle) { 435 // When i === 0 - we're dealing with stdin 436 // (which is the only one writable pipe). 437 stream.socket = createSocket(this.pid !== 0 ? 438 stream.handle : null, i > 0); 439 440 if (i > 0 && this.pid !== 0) { 441 this._closesNeeded++; 442 stream.socket.on('close', () => { 443 maybeClose(this); 444 }); 445 } 446 } 447 } 448 449 this.stdin = stdio.length >= 1 && stdio[0].socket !== undefined ? 450 stdio[0].socket : null; 451 this.stdout = stdio.length >= 2 && stdio[1].socket !== undefined ? 452 stdio[1].socket : null; 453 this.stderr = stdio.length >= 3 && stdio[2].socket !== undefined ? 454 stdio[2].socket : null; 455 456 this.stdio = []; 457 458 for (i = 0; i < stdio.length; i++) 459 this.stdio.push(stdio[i].socket === undefined ? null : stdio[i].socket); 460 461 // Add .send() method and start listening for IPC data 462 if (ipc !== undefined) setupChannel(this, ipc, serialization); 463 464 return err; 465}; 466 467 468function onErrorNT(self, err) { 469 self._handle.onexit(err); 470} 471 472 473function onSpawnNT(self) { 474 self.emit('spawn'); 475} 476 477 478ChildProcess.prototype.kill = function(sig) { 479 480 const signal = sig === 0 ? sig : 481 convertToValidSignal(sig === undefined ? 'SIGTERM' : sig); 482 483 if (this._handle) { 484 const err = this._handle.kill(signal); 485 if (err === 0) { 486 /* Success. */ 487 this.killed = true; 488 return true; 489 } 490 if (err === UV_ESRCH) { 491 /* Already dead. */ 492 } else if (err === UV_EINVAL || err === UV_ENOSYS) { 493 /* The underlying platform doesn't support this signal. */ 494 throw errnoException(err, 'kill'); 495 } else { 496 /* Other error, almost certainly EPERM. */ 497 this.emit('error', errnoException(err, 'kill')); 498 } 499 } 500 501 /* Kill didn't succeed. */ 502 return false; 503}; 504 505 506ChildProcess.prototype.ref = function() { 507 if (this._handle) this._handle.ref(); 508}; 509 510 511ChildProcess.prototype.unref = function() { 512 if (this._handle) this._handle.unref(); 513}; 514 515class Control extends EventEmitter { 516 #channel = null; 517 #refs = 0; 518 #refExplicitlySet = false; 519 520 constructor(channel) { 521 super(); 522 this.#channel = channel; 523 } 524 525 // The methods keeping track of the counter are being used to track the 526 // listener count on the child process object as well as when writes are 527 // in progress. Once the user has explicitly requested a certain state, these 528 // methods become no-ops in order to not interfere with the user's intentions. 529 refCounted() { 530 if (++this.#refs === 1 && !this.#refExplicitlySet) { 531 this.#channel.ref(); 532 } 533 } 534 535 unrefCounted() { 536 if (--this.#refs === 0 && !this.#refExplicitlySet) { 537 this.#channel.unref(); 538 this.emit('unref'); 539 } 540 } 541 542 ref() { 543 this.#refExplicitlySet = true; 544 this.#channel.ref(); 545 } 546 547 unref() { 548 this.#refExplicitlySet = true; 549 this.#channel.unref(); 550 } 551 552 get fd() { 553 return this.#channel ? this.#channel.fd : undefined; 554 } 555} 556 557const channelDeprecationMsg = '_channel is deprecated. ' + 558 'Use ChildProcess.channel instead.'; 559 560let serialization; 561function setupChannel(target, channel, serializationMode) { 562 const control = new Control(channel); 563 target.channel = control; 564 target[kChannelHandle] = channel; 565 566 ObjectDefineProperty(target, '_channel', { 567 get: deprecate(() => { 568 return target.channel; 569 }, channelDeprecationMsg, 'DEP0129'), 570 set: deprecate((val) => { 571 target.channel = val; 572 }, channelDeprecationMsg, 'DEP0129'), 573 configurable: true, 574 enumerable: false 575 }); 576 577 target._handleQueue = null; 578 target._pendingMessage = null; 579 580 if (serialization === undefined) 581 serialization = require('internal/child_process/serialization'); 582 const { 583 initMessageChannel, 584 parseChannelMessages, 585 writeChannelMessage 586 } = serialization[serializationMode]; 587 588 let pendingHandle = null; 589 initMessageChannel(channel); 590 channel.pendingHandle = null; 591 channel.onread = function(arrayBuffer) { 592 const recvHandle = channel.pendingHandle; 593 channel.pendingHandle = null; 594 if (arrayBuffer) { 595 const nread = streamBaseState[kReadBytesOrError]; 596 const offset = streamBaseState[kArrayBufferOffset]; 597 const pool = new Uint8Array(arrayBuffer, offset, nread); 598 if (recvHandle) 599 pendingHandle = recvHandle; 600 601 for (const message of parseChannelMessages(channel, pool)) { 602 // There will be at most one NODE_HANDLE message in every chunk we 603 // read because SCM_RIGHTS messages don't get coalesced. Make sure 604 // that we deliver the handle with the right message however. 605 if (isInternal(message)) { 606 if (message.cmd === 'NODE_HANDLE') { 607 handleMessage(message, pendingHandle, true); 608 pendingHandle = null; 609 } else { 610 handleMessage(message, undefined, true); 611 } 612 } else { 613 handleMessage(message, undefined, false); 614 } 615 } 616 } else { 617 this.buffering = false; 618 target.disconnect(); 619 channel.onread = nop; 620 channel.close(); 621 target.channel = null; 622 maybeClose(target); 623 } 624 }; 625 626 // Object where socket lists will live 627 channel.sockets = { got: {}, send: {} }; 628 629 // Handlers will go through this 630 target.on('internalMessage', function(message, handle) { 631 // Once acknowledged - continue sending handles. 632 if (message.cmd === 'NODE_HANDLE_ACK' || 633 message.cmd === 'NODE_HANDLE_NACK') { 634 635 if (target._pendingMessage) { 636 if (message.cmd === 'NODE_HANDLE_ACK') { 637 closePendingHandle(target); 638 } else if (target._pendingMessage.retransmissions++ === 639 MAX_HANDLE_RETRANSMISSIONS) { 640 closePendingHandle(target); 641 process.emitWarning('Handle did not reach the receiving process ' + 642 'correctly', 'SentHandleNotReceivedWarning'); 643 } 644 } 645 646 assert(ArrayIsArray(target._handleQueue)); 647 const queue = target._handleQueue; 648 target._handleQueue = null; 649 650 if (target._pendingMessage) { 651 target._send(target._pendingMessage.message, 652 target._pendingMessage.handle, 653 target._pendingMessage.options, 654 target._pendingMessage.callback); 655 } 656 657 for (let i = 0; i < queue.length; i++) { 658 const args = queue[i]; 659 target._send(args.message, args.handle, args.options, args.callback); 660 } 661 662 // Process a pending disconnect (if any). 663 if (!target.connected && target.channel && !target._handleQueue) 664 target._disconnect(); 665 666 return; 667 } 668 669 if (message.cmd !== 'NODE_HANDLE') return; 670 671 // It is possible that the handle is not received because of some error on 672 // ancillary data reception such as MSG_CTRUNC. In this case, report the 673 // sender about it by sending a NODE_HANDLE_NACK message. 674 if (!handle) 675 return target._send({ cmd: 'NODE_HANDLE_NACK' }, null, true); 676 677 // Acknowledge handle receival. Don't emit error events (for example if 678 // the other side has disconnected) because this call to send() is not 679 // initiated by the user and it shouldn't be fatal to be unable to ACK 680 // a message. 681 target._send({ cmd: 'NODE_HANDLE_ACK' }, null, true); 682 683 const obj = handleConversion[message.type]; 684 685 // Update simultaneous accepts on Windows 686 if (process.platform === 'win32') { 687 handle.setSimultaneousAccepts(false); 688 } 689 690 // Convert handle object 691 obj.got.call(this, message, handle, (handle) => { 692 handleMessage(message.msg, handle, isInternal(message.msg)); 693 }); 694 }); 695 696 target.send = function(message, handle, options, callback) { 697 if (typeof handle === 'function') { 698 callback = handle; 699 handle = undefined; 700 options = undefined; 701 } else if (typeof options === 'function') { 702 callback = options; 703 options = undefined; 704 } else if (options !== undefined && 705 (options === null || typeof options !== 'object')) { 706 throw new ERR_INVALID_ARG_TYPE('options', 'Object', options); 707 } 708 709 options = { swallowErrors: false, ...options }; 710 711 if (this.connected) { 712 return this._send(message, handle, options, callback); 713 } 714 const ex = new ERR_IPC_CHANNEL_CLOSED(); 715 if (typeof callback === 'function') { 716 process.nextTick(callback, ex); 717 } else { 718 process.nextTick(() => this.emit('error', ex)); 719 } 720 return false; 721 }; 722 723 target._send = function(message, handle, options, callback) { 724 assert(this.connected || this.channel); 725 726 if (message === undefined) 727 throw new ERR_MISSING_ARGS('message'); 728 729 // Non-serializable messages should not reach the remote 730 // end point; as any failure in the stringification there 731 // will result in error message that is weakly consumable. 732 // So perform a final check on message prior to sending. 733 if (typeof message !== 'string' && 734 typeof message !== 'object' && 735 typeof message !== 'number' && 736 typeof message !== 'boolean') { 737 throw new ERR_INVALID_ARG_TYPE( 738 'message', ['string', 'object', 'number', 'boolean'], message); 739 } 740 741 // Support legacy function signature 742 if (typeof options === 'boolean') { 743 options = { swallowErrors: options }; 744 } 745 746 let obj; 747 748 // Package messages with a handle object 749 if (handle) { 750 // This message will be handled by an internalMessage event handler 751 message = { 752 cmd: 'NODE_HANDLE', 753 type: null, 754 msg: message 755 }; 756 757 if (handle instanceof net.Socket) { 758 message.type = 'net.Socket'; 759 } else if (handle instanceof net.Server) { 760 message.type = 'net.Server'; 761 } else if (handle instanceof TCP || handle instanceof Pipe) { 762 message.type = 'net.Native'; 763 } else if (handle instanceof dgram.Socket) { 764 message.type = 'dgram.Socket'; 765 } else if (handle instanceof UDP) { 766 message.type = 'dgram.Native'; 767 } else { 768 throw new ERR_INVALID_HANDLE_TYPE(); 769 } 770 771 // Queue-up message and handle if we haven't received ACK yet. 772 if (this._handleQueue) { 773 this._handleQueue.push({ 774 callback: callback, 775 handle: handle, 776 options: options, 777 message: message.msg, 778 }); 779 return this._handleQueue.length === 1; 780 } 781 782 obj = handleConversion[message.type]; 783 784 // convert TCP object to native handle object 785 handle = handleConversion[message.type].send.call(target, 786 message, 787 handle, 788 options); 789 790 // If handle was sent twice, or it is impossible to get native handle 791 // out of it - just send a text without the handle. 792 if (!handle) 793 message = message.msg; 794 795 // Update simultaneous accepts on Windows 796 if (obj.simultaneousAccepts && process.platform === 'win32') { 797 handle.setSimultaneousAccepts(true); 798 } 799 } else if (this._handleQueue && 800 !(message && (message.cmd === 'NODE_HANDLE_ACK' || 801 message.cmd === 'NODE_HANDLE_NACK'))) { 802 // Queue request anyway to avoid out-of-order messages. 803 this._handleQueue.push({ 804 callback: callback, 805 handle: null, 806 options: options, 807 message: message, 808 }); 809 return this._handleQueue.length === 1; 810 } 811 812 const req = new WriteWrap(); 813 814 const err = writeChannelMessage(channel, req, message, handle); 815 const wasAsyncWrite = streamBaseState[kLastWriteWasAsync]; 816 817 if (err === 0) { 818 if (handle) { 819 if (!this._handleQueue) 820 this._handleQueue = []; 821 if (obj && obj.postSend) 822 obj.postSend(message, handle, options, callback, target); 823 } 824 825 if (wasAsyncWrite) { 826 req.oncomplete = () => { 827 control.unrefCounted(); 828 if (typeof callback === 'function') 829 callback(null); 830 }; 831 control.refCounted(); 832 } else if (typeof callback === 'function') { 833 process.nextTick(callback, null); 834 } 835 } else { 836 // Cleanup handle on error 837 if (obj && obj.postSend) 838 obj.postSend(message, handle, options, callback); 839 840 if (!options.swallowErrors) { 841 const ex = errnoException(err, 'write'); 842 if (typeof callback === 'function') { 843 process.nextTick(callback, ex); 844 } else { 845 process.nextTick(() => this.emit('error', ex)); 846 } 847 } 848 } 849 850 /* If the master is > 2 read() calls behind, please stop sending. */ 851 return channel.writeQueueSize < (65536 * 2); 852 }; 853 854 // Connected will be set to false immediately when a disconnect() is 855 // requested, even though the channel might still be alive internally to 856 // process queued messages. The three states are distinguished as follows: 857 // - disconnect() never requested: channel is not null and connected 858 // is true 859 // - disconnect() requested, messages in the queue: channel is not null 860 // and connected is false 861 // - disconnect() requested, channel actually disconnected: channel is 862 // null and connected is false 863 target.connected = true; 864 865 target.disconnect = function() { 866 if (!this.connected) { 867 this.emit('error', new ERR_IPC_DISCONNECTED()); 868 return; 869 } 870 871 // Do not allow any new messages to be written. 872 this.connected = false; 873 874 // If there are no queued messages, disconnect immediately. Otherwise, 875 // postpone the disconnect so that it happens internally after the 876 // queue is flushed. 877 if (!this._handleQueue) 878 this._disconnect(); 879 }; 880 881 target._disconnect = function() { 882 assert(this.channel); 883 884 // This marks the fact that the channel is actually disconnected. 885 this.channel = null; 886 this[kChannelHandle] = null; 887 888 if (this._pendingMessage) 889 closePendingHandle(this); 890 891 let fired = false; 892 function finish() { 893 if (fired) return; 894 fired = true; 895 896 channel.close(); 897 target.emit('disconnect'); 898 } 899 900 // If a message is being read, then wait for it to complete. 901 if (channel.buffering) { 902 this.once('message', finish); 903 this.once('internalMessage', finish); 904 905 return; 906 } 907 908 process.nextTick(finish); 909 }; 910 911 function emit(event, message, handle) { 912 target.emit(event, message, handle); 913 } 914 915 function handleMessage(message, handle, internal) { 916 if (!target.channel) 917 return; 918 919 const eventName = (internal ? 'internalMessage' : 'message'); 920 921 process.nextTick(emit, eventName, message, handle); 922 } 923 924 channel.readStart(); 925 return control; 926} 927 928const INTERNAL_PREFIX = 'NODE_'; 929function isInternal(message) { 930 return (message !== null && 931 typeof message === 'object' && 932 typeof message.cmd === 'string' && 933 message.cmd.length > INTERNAL_PREFIX.length && 934 message.cmd.slice(0, INTERNAL_PREFIX.length) === INTERNAL_PREFIX); 935} 936 937function nop() { } 938 939function getValidStdio(stdio, sync) { 940 let ipc; 941 let ipcFd; 942 943 // Replace shortcut with an array 944 if (typeof stdio === 'string') { 945 stdio = stdioStringToArray(stdio); 946 } else if (!ArrayIsArray(stdio)) { 947 throw new ERR_INVALID_OPT_VALUE('stdio', stdio); 948 } 949 950 // At least 3 stdio will be created 951 // Don't concat() a new Array() because it would be sparse, and 952 // stdio.reduce() would skip the sparse elements of stdio. 953 // See https://stackoverflow.com/a/5501711/3561 954 while (stdio.length < 3) stdio.push(undefined); 955 956 // Translate stdio into C++-readable form 957 // (i.e. PipeWraps or fds) 958 stdio = stdio.reduce((acc, stdio, i) => { 959 function cleanup() { 960 for (let i = 0; i < acc.length; i++) { 961 if ((acc[i].type === 'pipe' || acc[i].type === 'ipc') && acc[i].handle) 962 acc[i].handle.close(); 963 } 964 } 965 966 // Defaults 967 if (stdio == null) { 968 stdio = i < 3 ? 'pipe' : 'ignore'; 969 } 970 971 if (stdio === 'ignore') { 972 acc.push({ type: 'ignore' }); 973 } else if (stdio === 'pipe' || stdio === 'overlapped' || 974 (typeof stdio === 'number' && stdio < 0)) { 975 const a = { 976 type: stdio === 'overlapped' ? 'overlapped' : 'pipe', 977 readable: i === 0, 978 writable: i !== 0 979 }; 980 981 if (!sync) 982 a.handle = new Pipe(PipeConstants.SOCKET); 983 984 acc.push(a); 985 } else if (stdio === 'ipc') { 986 if (sync || ipc !== undefined) { 987 // Cleanup previously created pipes 988 cleanup(); 989 if (!sync) 990 throw new ERR_IPC_ONE_PIPE(); 991 else 992 throw new ERR_IPC_SYNC_FORK(); 993 } 994 995 ipc = new Pipe(PipeConstants.IPC); 996 ipcFd = i; 997 998 acc.push({ 999 type: 'pipe', 1000 handle: ipc, 1001 ipc: true 1002 }); 1003 } else if (stdio === 'inherit') { 1004 acc.push({ 1005 type: 'inherit', 1006 fd: i 1007 }); 1008 } else if (typeof stdio === 'number' || typeof stdio.fd === 'number') { 1009 acc.push({ 1010 type: 'fd', 1011 fd: typeof stdio === 'number' ? stdio : stdio.fd 1012 }); 1013 } else if (getHandleWrapType(stdio) || getHandleWrapType(stdio.handle) || 1014 getHandleWrapType(stdio._handle)) { 1015 const handle = getHandleWrapType(stdio) ? 1016 stdio : 1017 getHandleWrapType(stdio.handle) ? stdio.handle : stdio._handle; 1018 1019 acc.push({ 1020 type: 'wrap', 1021 wrapType: getHandleWrapType(handle), 1022 handle: handle, 1023 _stdio: stdio 1024 }); 1025 } else if (isArrayBufferView(stdio) || typeof stdio === 'string') { 1026 if (!sync) { 1027 cleanup(); 1028 throw new ERR_INVALID_SYNC_FORK_INPUT(inspect(stdio)); 1029 } 1030 } else { 1031 // Cleanup 1032 cleanup(); 1033 throw new ERR_INVALID_OPT_VALUE('stdio', stdio); 1034 } 1035 1036 return acc; 1037 }, []); 1038 1039 return { stdio, ipc, ipcFd }; 1040} 1041 1042 1043function getSocketList(type, worker, key) { 1044 const sockets = worker[kChannelHandle].sockets[type]; 1045 let socketList = sockets[key]; 1046 if (!socketList) { 1047 const Construct = type === 'send' ? SocketListSend : SocketListReceive; 1048 socketList = sockets[key] = new Construct(worker, key); 1049 } 1050 return socketList; 1051} 1052 1053 1054function maybeClose(subprocess) { 1055 subprocess._closesGot++; 1056 1057 if (subprocess._closesGot === subprocess._closesNeeded) { 1058 subprocess.emit('close', subprocess.exitCode, subprocess.signalCode); 1059 } 1060} 1061 1062function spawnSync(options) { 1063 const result = spawn_sync.spawn(options); 1064 1065 if (result.output && options.encoding && options.encoding !== 'buffer') { 1066 for (let i = 0; i < result.output.length; i++) { 1067 if (!result.output[i]) 1068 continue; 1069 result.output[i] = result.output[i].toString(options.encoding); 1070 } 1071 } 1072 1073 result.stdout = result.output && result.output[1]; 1074 result.stderr = result.output && result.output[2]; 1075 1076 if (result.error) { 1077 result.error = errnoException(result.error, 'spawnSync ' + options.file); 1078 result.error.path = options.file; 1079 result.error.spawnargs = options.args.slice(1); 1080 } 1081 1082 return result; 1083} 1084 1085module.exports = { 1086 ChildProcess, 1087 kChannelHandle, 1088 setupChannel, 1089 getValidStdio, 1090 stdioStringToArray, 1091 spawnSync 1092}; 1093