1'use strict'; 2 3const { 4 ArrayIsArray, 5 ObjectDefineProperty, 6 ObjectSetPrototypeOf, 7 Symbol, 8 Uint8Array, 9} = primordials; 10 11const { 12 errnoException, 13 codes: { 14 ERR_INVALID_ARG_TYPE, 15 ERR_INVALID_HANDLE_TYPE, 16 ERR_INVALID_OPT_VALUE, 17 ERR_INVALID_SYNC_FORK_INPUT, 18 ERR_IPC_CHANNEL_CLOSED, 19 ERR_IPC_DISCONNECTED, 20 ERR_IPC_ONE_PIPE, 21 ERR_IPC_SYNC_FORK, 22 ERR_MISSING_ARGS 23 } 24} = require('internal/errors'); 25const { validateString } = require('internal/validators'); 26const EventEmitter = require('events'); 27const net = require('net'); 28const dgram = require('dgram'); 29const inspect = require('internal/util/inspect').inspect; 30const assert = require('internal/assert'); 31 32const { Process } = internalBinding('process_wrap'); 33const { 34 WriteWrap, 35 kReadBytesOrError, 36 kArrayBufferOffset, 37 kLastWriteWasAsync, 38 streamBaseState 39} = internalBinding('stream_wrap'); 40const { Pipe, constants: PipeConstants } = internalBinding('pipe_wrap'); 41const { TCP } = internalBinding('tcp_wrap'); 42const { TTY } = internalBinding('tty_wrap'); 43const { UDP } = internalBinding('udp_wrap'); 44const SocketList = require('internal/socket_list'); 45const { owner_symbol } = require('internal/async_hooks').symbols; 46const { convertToValidSignal } = require('internal/util'); 47const { isArrayBufferView } = require('internal/util/types'); 48const spawn_sync = internalBinding('spawn_sync'); 49const { kStateSymbol } = require('internal/dgram'); 50 51const { 52 UV_EACCES, 53 UV_EAGAIN, 54 UV_EINVAL, 55 UV_EMFILE, 56 UV_ENFILE, 57 UV_ENOENT, 58 UV_ENOSYS, 59 UV_ESRCH 60} = internalBinding('uv'); 61 62const { SocketListSend, SocketListReceive } = SocketList; 63 64// Lazy loaded for startup performance and to allow monkey patching of 65// internalBinding('http_parser').HTTPParser. 66let freeParser; 67let HTTPParser; 68 69const MAX_HANDLE_RETRANSMISSIONS = 3; 70const kIsUsedAsStdio = Symbol('kIsUsedAsStdio'); 71 72// This object contain function to convert TCP objects to native handle objects 73// and back again. 74const handleConversion = { 75 'net.Native': { 76 simultaneousAccepts: true, 77 78 send(message, handle, options) { 79 return handle; 80 }, 81 82 got(message, handle, emit) { 83 emit(handle); 84 } 85 }, 86 87 'net.Server': { 88 simultaneousAccepts: true, 89 90 send(message, server, options) { 91 return server._handle; 92 }, 93 94 got(message, handle, emit) { 95 const server = new net.Server(); 96 server.listen(handle, () => { 97 emit(server); 98 }); 99 } 100 }, 101 102 'net.Socket': { 103 send(message, socket, options) { 104 if (!socket._handle) 105 return; 106 107 // If the socket was created by net.Server 108 if (socket.server) { 109 // The worker should keep track of the socket 110 message.key = socket.server._connectionKey; 111 112 const firstTime = !this.channel.sockets.send[message.key]; 113 const socketList = getSocketList('send', this, message.key); 114 115 // The server should no longer expose a .connection property 116 // and when asked to close it should query the socket status from 117 // the workers 118 if (firstTime) socket.server._setupWorker(socketList); 119 120 // Act like socket is detached 121 if (!options.keepOpen) 122 socket.server._connections--; 123 } 124 125 const handle = socket._handle; 126 127 // Remove handle from socket object, it will be closed when the socket 128 // will be sent 129 if (!options.keepOpen) { 130 handle.onread = nop; 131 socket._handle = null; 132 socket.setTimeout(0); 133 134 if (freeParser === undefined) 135 freeParser = require('_http_common').freeParser; 136 if (HTTPParser === undefined) 137 HTTPParser = require('_http_common').HTTPParser; 138 139 // In case of an HTTP connection socket, release the associated 140 // resources 141 if (socket.parser && socket.parser instanceof HTTPParser) { 142 freeParser(socket.parser, null, socket); 143 if (socket._httpMessage) 144 socket._httpMessage.detachSocket(socket); 145 } 146 } 147 148 return handle; 149 }, 150 151 postSend(message, handle, options, callback, target) { 152 // Store the handle after successfully sending it, so it can be closed 153 // when the NODE_HANDLE_ACK is received. If the handle could not be sent, 154 // just close it. 155 if (handle && !options.keepOpen) { 156 if (target) { 157 // There can only be one _pendingMessage as passing handles are 158 // processed one at a time: handles are stored in _handleQueue while 159 // waiting for the NODE_HANDLE_ACK of the current passing handle. 160 assert(!target._pendingMessage); 161 target._pendingMessage = 162 { callback, message, handle, options, retransmissions: 0 }; 163 } else { 164 handle.close(); 165 } 166 } 167 }, 168 169 got(message, handle, emit) { 170 const socket = new net.Socket({ 171 handle: handle, 172 readable: true, 173 writable: true 174 }); 175 176 // If the socket was created by net.Server we will track the socket 177 if (message.key) { 178 179 // Add socket to connections list 180 const socketList = getSocketList('got', this, message.key); 181 socketList.add({ 182 socket: socket 183 }); 184 } 185 186 emit(socket); 187 } 188 }, 189 190 'dgram.Native': { 191 simultaneousAccepts: false, 192 193 send(message, handle, options) { 194 return handle; 195 }, 196 197 got(message, handle, emit) { 198 emit(handle); 199 } 200 }, 201 202 'dgram.Socket': { 203 simultaneousAccepts: false, 204 205 send(message, socket, options) { 206 message.dgramType = socket.type; 207 208 return socket[kStateSymbol].handle; 209 }, 210 211 got(message, handle, emit) { 212 const socket = new dgram.Socket(message.dgramType); 213 214 socket.bind(handle, () => { 215 emit(socket); 216 }); 217 } 218 } 219}; 220 221function stdioStringToArray(stdio, channel) { 222 const options = []; 223 224 switch (stdio) { 225 case 'ignore': 226 case 'pipe': options.push(stdio, stdio, stdio); break; 227 case 'inherit': options.push(0, 1, 2); break; 228 default: 229 throw new ERR_INVALID_OPT_VALUE('stdio', stdio); 230 } 231 232 if (channel) options.push(channel); 233 234 return options; 235} 236 237function ChildProcess() { 238 EventEmitter.call(this); 239 240 this._closesNeeded = 1; 241 this._closesGot = 0; 242 this.connected = false; 243 244 this.signalCode = null; 245 this.exitCode = null; 246 this.killed = false; 247 this.spawnfile = null; 248 249 this._handle = new Process(); 250 this._handle[owner_symbol] = this; 251 252 this._handle.onexit = (exitCode, signalCode) => { 253 if (signalCode) { 254 this.signalCode = signalCode; 255 } else { 256 this.exitCode = exitCode; 257 } 258 259 if (this.stdin) { 260 this.stdin.destroy(); 261 } 262 263 this._handle.close(); 264 this._handle = null; 265 266 if (exitCode < 0) { 267 const syscall = this.spawnfile ? 'spawn ' + this.spawnfile : 'spawn'; 268 const err = errnoException(exitCode, syscall); 269 270 if (this.spawnfile) 271 err.path = this.spawnfile; 272 273 err.spawnargs = this.spawnargs.slice(1); 274 this.emit('error', err); 275 } else { 276 this.emit('exit', this.exitCode, this.signalCode); 277 } 278 279 // If any of the stdio streams have not been touched, 280 // then pull all the data through so that it can get the 281 // eof and emit a 'close' event. 282 // Do it on nextTick so that the user has one last chance 283 // to consume the output, if for example they only want to 284 // start reading the data once the process exits. 285 process.nextTick(flushStdio, this); 286 287 maybeClose(this); 288 }; 289} 290ObjectSetPrototypeOf(ChildProcess.prototype, EventEmitter.prototype); 291ObjectSetPrototypeOf(ChildProcess, EventEmitter); 292 293 294function flushStdio(subprocess) { 295 const stdio = subprocess.stdio; 296 297 if (stdio == null) return; 298 299 for (let i = 0; i < stdio.length; i++) { 300 const stream = stdio[i]; 301 // TODO(addaleax): This doesn't necessarily account for all the ways in 302 // which data can be read from a stream, e.g. being consumed on the 303 // native layer directly as a StreamBase. 304 if (!stream || !stream.readable || 305 stream._readableState.readableListening || 306 stream[kIsUsedAsStdio]) { 307 continue; 308 } 309 stream.resume(); 310 } 311} 312 313 314function createSocket(pipe, readable) { 315 return net.Socket({ handle: pipe, readable, writable: !readable }); 316} 317 318 319function getHandleWrapType(stream) { 320 if (stream instanceof Pipe) return 'pipe'; 321 if (stream instanceof TTY) return 'tty'; 322 if (stream instanceof TCP) return 'tcp'; 323 if (stream instanceof UDP) return 'udp'; 324 325 return false; 326} 327 328function closePendingHandle(target) { 329 target._pendingMessage.handle.close(); 330 target._pendingMessage = null; 331} 332 333 334ChildProcess.prototype.spawn = function(options) { 335 let i = 0; 336 337 if (options === null || typeof options !== 'object') { 338 throw new ERR_INVALID_ARG_TYPE('options', 'Object', options); 339 } 340 341 // If no `stdio` option was given - use default 342 let stdio = options.stdio || 'pipe'; 343 344 stdio = getValidStdio(stdio, false); 345 346 const ipc = stdio.ipc; 347 const ipcFd = stdio.ipcFd; 348 stdio = options.stdio = stdio.stdio; 349 350 if (options.serialization !== undefined && 351 options.serialization !== 'json' && 352 options.serialization !== 'advanced') { 353 throw new ERR_INVALID_OPT_VALUE('options.serialization', 354 options.serialization); 355 } 356 357 const serialization = options.serialization || 'json'; 358 359 if (ipc !== undefined) { 360 // Let child process know about opened IPC channel 361 if (options.envPairs === undefined) 362 options.envPairs = []; 363 else if (!ArrayIsArray(options.envPairs)) { 364 throw new ERR_INVALID_ARG_TYPE('options.envPairs', 365 'Array', 366 options.envPairs); 367 } 368 369 options.envPairs.push(`NODE_CHANNEL_FD=${ipcFd}`); 370 options.envPairs.push(`NODE_CHANNEL_SERIALIZATION_MODE=${serialization}`); 371 } 372 373 validateString(options.file, 'options.file'); 374 this.spawnfile = options.file; 375 376 if (ArrayIsArray(options.args)) 377 this.spawnargs = options.args; 378 else if (options.args === undefined) 379 this.spawnargs = []; 380 else 381 throw new ERR_INVALID_ARG_TYPE('options.args', 'Array', options.args); 382 383 const err = this._handle.spawn(options); 384 385 // Run-time errors should emit an error, not throw an exception. 386 if (err === UV_EACCES || 387 err === UV_EAGAIN || 388 err === UV_EMFILE || 389 err === UV_ENFILE || 390 err === UV_ENOENT) { 391 process.nextTick(onErrorNT, this, err); 392 393 // There is no point in continuing when we've hit EMFILE or ENFILE 394 // because we won't be able to set up the stdio file descriptors. 395 if (err === UV_EMFILE || err === UV_ENFILE) 396 return err; 397 } else if (err) { 398 // Close all opened fds on error 399 for (i = 0; i < stdio.length; i++) { 400 const stream = stdio[i]; 401 if (stream.type === 'pipe') { 402 stream.handle.close(); 403 } 404 } 405 406 this._handle.close(); 407 this._handle = null; 408 throw errnoException(err, 'spawn'); 409 } 410 411 this.pid = this._handle.pid; 412 413 for (i = 0; i < stdio.length; i++) { 414 const stream = stdio[i]; 415 if (stream.type === 'ignore') continue; 416 417 if (stream.ipc) { 418 this._closesNeeded++; 419 continue; 420 } 421 422 // The stream is already cloned and piped, thus stop its readable side, 423 // otherwise we might attempt to read from the stream when at the same time 424 // the child process does. 425 if (stream.type === 'wrap') { 426 stream.handle.reading = false; 427 stream.handle.readStop(); 428 stream._stdio.pause(); 429 stream._stdio.readableFlowing = false; 430 stream._stdio._readableState.reading = false; 431 stream._stdio[kIsUsedAsStdio] = true; 432 continue; 433 } 434 435 if (stream.handle) { 436 // When i === 0 - we're dealing with stdin 437 // (which is the only one writable pipe). 438 stream.socket = createSocket(this.pid !== 0 ? 439 stream.handle : null, i > 0); 440 441 if (i > 0 && this.pid !== 0) { 442 this._closesNeeded++; 443 stream.socket.on('close', () => { 444 maybeClose(this); 445 }); 446 } 447 } 448 } 449 450 this.stdin = stdio.length >= 1 && stdio[0].socket !== undefined ? 451 stdio[0].socket : null; 452 this.stdout = stdio.length >= 2 && stdio[1].socket !== undefined ? 453 stdio[1].socket : null; 454 this.stderr = stdio.length >= 3 && stdio[2].socket !== undefined ? 455 stdio[2].socket : null; 456 457 this.stdio = []; 458 459 for (i = 0; i < stdio.length; i++) 460 this.stdio.push(stdio[i].socket === undefined ? null : stdio[i].socket); 461 462 // Add .send() method and start listening for IPC data 463 if (ipc !== undefined) setupChannel(this, ipc, serialization); 464 465 return err; 466}; 467 468 469function onErrorNT(self, err) { 470 self._handle.onexit(err); 471} 472 473 474ChildProcess.prototype.kill = function(sig) { 475 476 const signal = sig === 0 ? sig : 477 convertToValidSignal(sig === undefined ? 'SIGTERM' : sig); 478 479 if (this._handle) { 480 const err = this._handle.kill(signal); 481 if (err === 0) { 482 /* Success. */ 483 this.killed = true; 484 return true; 485 } 486 if (err === UV_ESRCH) { 487 /* Already dead. */ 488 } else if (err === UV_EINVAL || err === UV_ENOSYS) { 489 /* The underlying platform doesn't support this signal. */ 490 throw errnoException(err, 'kill'); 491 } else { 492 /* Other error, almost certainly EPERM. */ 493 this.emit('error', errnoException(err, 'kill')); 494 } 495 } 496 497 /* Kill didn't succeed. */ 498 return false; 499}; 500 501 502ChildProcess.prototype.ref = function() { 503 if (this._handle) this._handle.ref(); 504}; 505 506 507ChildProcess.prototype.unref = function() { 508 if (this._handle) this._handle.unref(); 509}; 510 511class Control extends EventEmitter { 512 constructor(channel) { 513 super(); 514 this.channel = channel; 515 this.refs = 0; 516 } 517 ref() { 518 if (++this.refs === 1) { 519 this.channel.ref(); 520 } 521 } 522 unref() { 523 if (--this.refs === 0) { 524 this.channel.unref(); 525 this.emit('unref'); 526 } 527 } 528} 529 530let serialization; 531function setupChannel(target, channel, serializationMode) { 532 target.channel = channel; 533 534 // _channel can be deprecated in version 8 535 ObjectDefineProperty(target, '_channel', { 536 get() { return target.channel; }, 537 set(val) { target.channel = val; }, 538 enumerable: true 539 }); 540 541 target._handleQueue = null; 542 target._pendingMessage = null; 543 544 const control = new Control(channel); 545 546 if (serialization === undefined) 547 serialization = require('internal/child_process/serialization'); 548 const { 549 initMessageChannel, 550 parseChannelMessages, 551 writeChannelMessage 552 } = serialization[serializationMode]; 553 554 let pendingHandle = null; 555 initMessageChannel(channel); 556 channel.pendingHandle = null; 557 channel.onread = function(arrayBuffer) { 558 const recvHandle = channel.pendingHandle; 559 channel.pendingHandle = null; 560 if (arrayBuffer) { 561 const nread = streamBaseState[kReadBytesOrError]; 562 const offset = streamBaseState[kArrayBufferOffset]; 563 const pool = new Uint8Array(arrayBuffer, offset, nread); 564 if (recvHandle) 565 pendingHandle = recvHandle; 566 567 for (const message of parseChannelMessages(channel, pool)) { 568 // There will be at most one NODE_HANDLE message in every chunk we 569 // read because SCM_RIGHTS messages don't get coalesced. Make sure 570 // that we deliver the handle with the right message however. 571 if (isInternal(message)) { 572 if (message.cmd === 'NODE_HANDLE') { 573 handleMessage(message, pendingHandle, true); 574 pendingHandle = null; 575 } else { 576 handleMessage(message, undefined, true); 577 } 578 } else { 579 handleMessage(message, undefined, false); 580 } 581 } 582 } else { 583 this.buffering = false; 584 target.disconnect(); 585 channel.onread = nop; 586 channel.close(); 587 target.channel = null; 588 maybeClose(target); 589 } 590 }; 591 592 // Object where socket lists will live 593 channel.sockets = { got: {}, send: {} }; 594 595 // Handlers will go through this 596 target.on('internalMessage', function(message, handle) { 597 // Once acknowledged - continue sending handles. 598 if (message.cmd === 'NODE_HANDLE_ACK' || 599 message.cmd === 'NODE_HANDLE_NACK') { 600 601 if (target._pendingMessage) { 602 if (message.cmd === 'NODE_HANDLE_ACK') { 603 closePendingHandle(target); 604 } else if (target._pendingMessage.retransmissions++ === 605 MAX_HANDLE_RETRANSMISSIONS) { 606 closePendingHandle(target); 607 process.emitWarning('Handle did not reach the receiving process ' + 608 'correctly', 'SentHandleNotReceivedWarning'); 609 } 610 } 611 612 assert(ArrayIsArray(target._handleQueue)); 613 const queue = target._handleQueue; 614 target._handleQueue = null; 615 616 if (target._pendingMessage) { 617 target._send(target._pendingMessage.message, 618 target._pendingMessage.handle, 619 target._pendingMessage.options, 620 target._pendingMessage.callback); 621 } 622 623 for (let i = 0; i < queue.length; i++) { 624 const args = queue[i]; 625 target._send(args.message, args.handle, args.options, args.callback); 626 } 627 628 // Process a pending disconnect (if any). 629 if (!target.connected && target.channel && !target._handleQueue) 630 target._disconnect(); 631 632 return; 633 } 634 635 if (message.cmd !== 'NODE_HANDLE') return; 636 637 // It is possible that the handle is not received because of some error on 638 // ancillary data reception such as MSG_CTRUNC. In this case, report the 639 // sender about it by sending a NODE_HANDLE_NACK message. 640 if (!handle) 641 return target._send({ cmd: 'NODE_HANDLE_NACK' }, null, true); 642 643 // Acknowledge handle receival. Don't emit error events (for example if 644 // the other side has disconnected) because this call to send() is not 645 // initiated by the user and it shouldn't be fatal to be unable to ACK 646 // a message. 647 target._send({ cmd: 'NODE_HANDLE_ACK' }, null, true); 648 649 const obj = handleConversion[message.type]; 650 651 // Update simultaneous accepts on Windows 652 if (process.platform === 'win32') { 653 handle.setSimultaneousAccepts(false); 654 } 655 656 // Convert handle object 657 obj.got.call(this, message, handle, (handle) => { 658 handleMessage(message.msg, handle, isInternal(message.msg)); 659 }); 660 }); 661 662 target.send = function(message, handle, options, callback) { 663 if (typeof handle === 'function') { 664 callback = handle; 665 handle = undefined; 666 options = undefined; 667 } else if (typeof options === 'function') { 668 callback = options; 669 options = undefined; 670 } else if (options !== undefined && 671 (options === null || typeof options !== 'object')) { 672 throw new ERR_INVALID_ARG_TYPE('options', 'Object', options); 673 } 674 675 options = { swallowErrors: false, ...options }; 676 677 if (this.connected) { 678 return this._send(message, handle, options, callback); 679 } 680 const ex = new ERR_IPC_CHANNEL_CLOSED(); 681 if (typeof callback === 'function') { 682 process.nextTick(callback, ex); 683 } else { 684 process.nextTick(() => this.emit('error', ex)); 685 } 686 return false; 687 }; 688 689 target._send = function(message, handle, options, callback) { 690 assert(this.connected || this.channel); 691 692 if (message === undefined) 693 throw new ERR_MISSING_ARGS('message'); 694 695 // Non-serializable messages should not reach the remote 696 // end point; as any failure in the stringification there 697 // will result in error message that is weakly consumable. 698 // So perform a sanity check on message prior to sending. 699 if (typeof message !== 'string' && 700 typeof message !== 'object' && 701 typeof message !== 'number' && 702 typeof message !== 'boolean') { 703 throw new ERR_INVALID_ARG_TYPE( 704 'message', ['string', 'object', 'number', 'boolean'], message); 705 } 706 707 // Support legacy function signature 708 if (typeof options === 'boolean') { 709 options = { swallowErrors: options }; 710 } 711 712 let obj; 713 714 // Package messages with a handle object 715 if (handle) { 716 // This message will be handled by an internalMessage event handler 717 message = { 718 cmd: 'NODE_HANDLE', 719 type: null, 720 msg: message 721 }; 722 723 if (handle instanceof net.Socket) { 724 message.type = 'net.Socket'; 725 } else if (handle instanceof net.Server) { 726 message.type = 'net.Server'; 727 } else if (handle instanceof TCP || handle instanceof Pipe) { 728 message.type = 'net.Native'; 729 } else if (handle instanceof dgram.Socket) { 730 message.type = 'dgram.Socket'; 731 } else if (handle instanceof UDP) { 732 message.type = 'dgram.Native'; 733 } else { 734 throw new ERR_INVALID_HANDLE_TYPE(); 735 } 736 737 // Queue-up message and handle if we haven't received ACK yet. 738 if (this._handleQueue) { 739 this._handleQueue.push({ 740 callback: callback, 741 handle: handle, 742 options: options, 743 message: message.msg, 744 }); 745 return this._handleQueue.length === 1; 746 } 747 748 obj = handleConversion[message.type]; 749 750 // convert TCP object to native handle object 751 handle = handleConversion[message.type].send.call(target, 752 message, 753 handle, 754 options); 755 756 // If handle was sent twice, or it is impossible to get native handle 757 // out of it - just send a text without the handle. 758 if (!handle) 759 message = message.msg; 760 761 // Update simultaneous accepts on Windows 762 if (obj.simultaneousAccepts && process.platform === 'win32') { 763 handle.setSimultaneousAccepts(true); 764 } 765 } else if (this._handleQueue && 766 !(message && (message.cmd === 'NODE_HANDLE_ACK' || 767 message.cmd === 'NODE_HANDLE_NACK'))) { 768 // Queue request anyway to avoid out-of-order messages. 769 this._handleQueue.push({ 770 callback: callback, 771 handle: null, 772 options: options, 773 message: message, 774 }); 775 return this._handleQueue.length === 1; 776 } 777 778 const req = new WriteWrap(); 779 780 const err = writeChannelMessage(channel, req, message, handle); 781 const wasAsyncWrite = streamBaseState[kLastWriteWasAsync]; 782 783 if (err === 0) { 784 if (handle) { 785 if (!this._handleQueue) 786 this._handleQueue = []; 787 if (obj && obj.postSend) 788 obj.postSend(message, handle, options, callback, target); 789 } 790 791 if (wasAsyncWrite) { 792 req.oncomplete = () => { 793 control.unref(); 794 if (typeof callback === 'function') 795 callback(null); 796 }; 797 control.ref(); 798 } else if (typeof callback === 'function') { 799 process.nextTick(callback, null); 800 } 801 } else { 802 // Cleanup handle on error 803 if (obj && obj.postSend) 804 obj.postSend(message, handle, options, callback); 805 806 if (!options.swallowErrors) { 807 const ex = errnoException(err, 'write'); 808 if (typeof callback === 'function') { 809 process.nextTick(callback, ex); 810 } else { 811 process.nextTick(() => this.emit('error', ex)); 812 } 813 } 814 } 815 816 /* If the master is > 2 read() calls behind, please stop sending. */ 817 return channel.writeQueueSize < (65536 * 2); 818 }; 819 820 // Connected will be set to false immediately when a disconnect() is 821 // requested, even though the channel might still be alive internally to 822 // process queued messages. The three states are distinguished as follows: 823 // - disconnect() never requested: channel is not null and connected 824 // is true 825 // - disconnect() requested, messages in the queue: channel is not null 826 // and connected is false 827 // - disconnect() requested, channel actually disconnected: channel is 828 // null and connected is false 829 target.connected = true; 830 831 target.disconnect = function() { 832 if (!this.connected) { 833 this.emit('error', new ERR_IPC_DISCONNECTED()); 834 return; 835 } 836 837 // Do not allow any new messages to be written. 838 this.connected = false; 839 840 // If there are no queued messages, disconnect immediately. Otherwise, 841 // postpone the disconnect so that it happens internally after the 842 // queue is flushed. 843 if (!this._handleQueue) 844 this._disconnect(); 845 }; 846 847 target._disconnect = function() { 848 assert(this.channel); 849 850 // This marks the fact that the channel is actually disconnected. 851 this.channel = null; 852 853 if (this._pendingMessage) 854 closePendingHandle(this); 855 856 let fired = false; 857 function finish() { 858 if (fired) return; 859 fired = true; 860 861 channel.close(); 862 target.emit('disconnect'); 863 } 864 865 // If a message is being read, then wait for it to complete. 866 if (channel.buffering) { 867 this.once('message', finish); 868 this.once('internalMessage', finish); 869 870 return; 871 } 872 873 process.nextTick(finish); 874 }; 875 876 function emit(event, message, handle) { 877 target.emit(event, message, handle); 878 } 879 880 function handleMessage(message, handle, internal) { 881 if (!target.channel) 882 return; 883 884 const eventName = (internal ? 'internalMessage' : 'message'); 885 886 process.nextTick(emit, eventName, message, handle); 887 } 888 889 channel.readStart(); 890 return control; 891} 892 893const INTERNAL_PREFIX = 'NODE_'; 894function isInternal(message) { 895 return (message !== null && 896 typeof message === 'object' && 897 typeof message.cmd === 'string' && 898 message.cmd.length > INTERNAL_PREFIX.length && 899 message.cmd.slice(0, INTERNAL_PREFIX.length) === INTERNAL_PREFIX); 900} 901 902function nop() { } 903 904function getValidStdio(stdio, sync) { 905 let ipc; 906 let ipcFd; 907 908 // Replace shortcut with an array 909 if (typeof stdio === 'string') { 910 stdio = stdioStringToArray(stdio); 911 } else if (!ArrayIsArray(stdio)) { 912 throw new ERR_INVALID_OPT_VALUE('stdio', stdio); 913 } 914 915 // At least 3 stdio will be created 916 // Don't concat() a new Array() because it would be sparse, and 917 // stdio.reduce() would skip the sparse elements of stdio. 918 // See https://stackoverflow.com/a/5501711/3561 919 while (stdio.length < 3) stdio.push(undefined); 920 921 // Translate stdio into C++-readable form 922 // (i.e. PipeWraps or fds) 923 stdio = stdio.reduce((acc, stdio, i) => { 924 function cleanup() { 925 for (let i = 0; i < acc.length; i++) { 926 if ((acc[i].type === 'pipe' || acc[i].type === 'ipc') && acc[i].handle) 927 acc[i].handle.close(); 928 } 929 } 930 931 // Defaults 932 if (stdio == null) { 933 stdio = i < 3 ? 'pipe' : 'ignore'; 934 } 935 936 if (stdio === 'ignore') { 937 acc.push({ type: 'ignore' }); 938 } else if (stdio === 'pipe' || (typeof stdio === 'number' && stdio < 0)) { 939 const a = { 940 type: 'pipe', 941 readable: i === 0, 942 writable: i !== 0 943 }; 944 945 if (!sync) 946 a.handle = new Pipe(PipeConstants.SOCKET); 947 948 acc.push(a); 949 } else if (stdio === 'ipc') { 950 if (sync || ipc !== undefined) { 951 // Cleanup previously created pipes 952 cleanup(); 953 if (!sync) 954 throw new ERR_IPC_ONE_PIPE(); 955 else 956 throw new ERR_IPC_SYNC_FORK(); 957 } 958 959 ipc = new Pipe(PipeConstants.IPC); 960 ipcFd = i; 961 962 acc.push({ 963 type: 'pipe', 964 handle: ipc, 965 ipc: true 966 }); 967 } else if (stdio === 'inherit') { 968 acc.push({ 969 type: 'inherit', 970 fd: i 971 }); 972 } else if (typeof stdio === 'number' || typeof stdio.fd === 'number') { 973 acc.push({ 974 type: 'fd', 975 fd: typeof stdio === 'number' ? stdio : stdio.fd 976 }); 977 } else if (getHandleWrapType(stdio) || getHandleWrapType(stdio.handle) || 978 getHandleWrapType(stdio._handle)) { 979 const handle = getHandleWrapType(stdio) ? 980 stdio : 981 getHandleWrapType(stdio.handle) ? stdio.handle : stdio._handle; 982 983 acc.push({ 984 type: 'wrap', 985 wrapType: getHandleWrapType(handle), 986 handle: handle, 987 _stdio: stdio 988 }); 989 } else if (isArrayBufferView(stdio) || typeof stdio === 'string') { 990 if (!sync) { 991 cleanup(); 992 throw new ERR_INVALID_SYNC_FORK_INPUT(inspect(stdio)); 993 } 994 } else { 995 // Cleanup 996 cleanup(); 997 throw new ERR_INVALID_OPT_VALUE('stdio', stdio); 998 } 999 1000 return acc; 1001 }, []); 1002 1003 return { stdio, ipc, ipcFd }; 1004} 1005 1006 1007function getSocketList(type, worker, key) { 1008 const sockets = worker.channel.sockets[type]; 1009 let socketList = sockets[key]; 1010 if (!socketList) { 1011 const Construct = type === 'send' ? SocketListSend : SocketListReceive; 1012 socketList = sockets[key] = new Construct(worker, key); 1013 } 1014 return socketList; 1015} 1016 1017 1018function maybeClose(subprocess) { 1019 subprocess._closesGot++; 1020 1021 if (subprocess._closesGot === subprocess._closesNeeded) { 1022 subprocess.emit('close', subprocess.exitCode, subprocess.signalCode); 1023 } 1024} 1025 1026function spawnSync(opts) { 1027 const options = opts.options; 1028 const result = spawn_sync.spawn(options); 1029 1030 if (result.output && options.encoding && options.encoding !== 'buffer') { 1031 for (let i = 0; i < result.output.length; i++) { 1032 if (!result.output[i]) 1033 continue; 1034 result.output[i] = result.output[i].toString(options.encoding); 1035 } 1036 } 1037 1038 result.stdout = result.output && result.output[1]; 1039 result.stderr = result.output && result.output[2]; 1040 1041 if (result.error) { 1042 result.error = errnoException(result.error, 'spawnSync ' + opts.file); 1043 result.error.path = opts.file; 1044 result.error.spawnargs = opts.args.slice(1); 1045 } 1046 1047 return result; 1048} 1049 1050module.exports = { 1051 ChildProcess, 1052 setupChannel, 1053 getValidStdio, 1054 stdioStringToArray, 1055 spawnSync 1056}; 1057