1'use strict'; 2 3const { 4 ArrayIsArray, 5 ArrayPrototypePush, 6 ObjectDefineProperty, 7 ObjectSetPrototypeOf, 8 ReflectApply, 9 Symbol, 10 Uint8Array, 11} = primordials; 12 13const { 14 errnoException, 15 codes: { 16 ERR_INVALID_ARG_TYPE, 17 ERR_INVALID_HANDLE_TYPE, 18 ERR_INVALID_OPT_VALUE, 19 ERR_INVALID_SYNC_FORK_INPUT, 20 ERR_IPC_CHANNEL_CLOSED, 21 ERR_IPC_DISCONNECTED, 22 ERR_IPC_ONE_PIPE, 23 ERR_IPC_SYNC_FORK, 24 ERR_MISSING_ARGS 25 } 26} = require('internal/errors'); 27const { 28 validateArray, 29 validateOneOf, 30 validateString, 31} = require('internal/validators'); 32const EventEmitter = require('events'); 33const net = require('net'); 34const dgram = require('dgram'); 35const inspect = require('internal/util/inspect').inspect; 36const assert = require('internal/assert'); 37 38const { Process } = internalBinding('process_wrap'); 39const { 40 WriteWrap, 41 kReadBytesOrError, 42 kArrayBufferOffset, 43 kLastWriteWasAsync, 44 streamBaseState 45} = internalBinding('stream_wrap'); 46const { Pipe, constants: PipeConstants } = internalBinding('pipe_wrap'); 47const { TCP } = internalBinding('tcp_wrap'); 48const { TTY } = internalBinding('tty_wrap'); 49const { UDP } = internalBinding('udp_wrap'); 50const SocketList = require('internal/socket_list'); 51const { owner_symbol } = require('internal/async_hooks').symbols; 52const { convertToValidSignal, deprecate } = require('internal/util'); 53const { isArrayBufferView } = require('internal/util/types'); 54const spawn_sync = internalBinding('spawn_sync'); 55const { kStateSymbol } = require('internal/dgram'); 56 57const { 58 UV_EACCES, 59 UV_EAGAIN, 60 UV_EINVAL, 61 UV_EMFILE, 62 UV_ENFILE, 63 UV_ENOENT, 64 UV_ENOSYS, 65 UV_ESRCH 66} = internalBinding('uv'); 67 68const { SocketListSend, SocketListReceive } = SocketList; 69 70// Lazy loaded for startup performance and to allow monkey patching of 71// internalBinding('http_parser').HTTPParser. 72let freeParser; 73let HTTPParser; 74 75const MAX_HANDLE_RETRANSMISSIONS = 3; 76const kChannelHandle = Symbol('kChannelHandle'); 77const kIsUsedAsStdio = Symbol('kIsUsedAsStdio'); 78const kPendingMessages = Symbol('kPendingMessages'); 79 80// This object contain function to convert TCP objects to native handle objects 81// and back again. 82const handleConversion = { 83 'net.Native': { 84 simultaneousAccepts: true, 85 86 send(message, handle, options) { 87 return handle; 88 }, 89 90 got(message, handle, emit) { 91 emit(handle); 92 } 93 }, 94 95 'net.Server': { 96 simultaneousAccepts: true, 97 98 send(message, server, options) { 99 return server._handle; 100 }, 101 102 got(message, handle, emit) { 103 const server = new net.Server(); 104 server.listen(handle, () => { 105 emit(server); 106 }); 107 } 108 }, 109 110 'net.Socket': { 111 send(message, socket, options) { 112 if (!socket._handle) 113 return; 114 115 // If the socket was created by net.Server 116 if (socket.server) { 117 // The worker should keep track of the socket 118 message.key = socket.server._connectionKey; 119 120 const firstTime = !this[kChannelHandle].sockets.send[message.key]; 121 const socketList = getSocketList('send', this, message.key); 122 123 // The server should no longer expose a .connection property 124 // and when asked to close it should query the socket status from 125 // the workers 126 if (firstTime) socket.server._setupWorker(socketList); 127 128 // Act like socket is detached 129 if (!options.keepOpen) 130 socket.server._connections--; 131 } 132 133 const handle = socket._handle; 134 135 // Remove handle from socket object, it will be closed when the socket 136 // will be sent 137 if (!options.keepOpen) { 138 handle.onread = nop; 139 socket._handle = null; 140 socket.setTimeout(0); 141 142 if (freeParser === undefined) 143 freeParser = require('_http_common').freeParser; 144 if (HTTPParser === undefined) 145 HTTPParser = require('_http_common').HTTPParser; 146 147 // In case of an HTTP connection socket, release the associated 148 // resources 149 if (socket.parser && socket.parser instanceof HTTPParser) { 150 freeParser(socket.parser, null, socket); 151 if (socket._httpMessage) 152 socket._httpMessage.detachSocket(socket); 153 } 154 } 155 156 return handle; 157 }, 158 159 postSend(message, handle, options, callback, target) { 160 // Store the handle after successfully sending it, so it can be closed 161 // when the NODE_HANDLE_ACK is received. If the handle could not be sent, 162 // just close it. 163 if (handle && !options.keepOpen) { 164 if (target) { 165 // There can only be one _pendingMessage as passing handles are 166 // processed one at a time: handles are stored in _handleQueue while 167 // waiting for the NODE_HANDLE_ACK of the current passing handle. 168 assert(!target._pendingMessage); 169 target._pendingMessage = 170 { callback, message, handle, options, retransmissions: 0 }; 171 } else { 172 handle.close(); 173 } 174 } 175 }, 176 177 got(message, handle, emit) { 178 const socket = new net.Socket({ 179 handle: handle, 180 readable: true, 181 writable: true 182 }); 183 184 // If the socket was created by net.Server we will track the socket 185 if (message.key) { 186 187 // Add socket to connections list 188 const socketList = getSocketList('got', this, message.key); 189 socketList.add({ 190 socket: socket 191 }); 192 } 193 194 emit(socket); 195 } 196 }, 197 198 'dgram.Native': { 199 simultaneousAccepts: false, 200 201 send(message, handle, options) { 202 return handle; 203 }, 204 205 got(message, handle, emit) { 206 emit(handle); 207 } 208 }, 209 210 'dgram.Socket': { 211 simultaneousAccepts: false, 212 213 send(message, socket, options) { 214 message.dgramType = socket.type; 215 216 return socket[kStateSymbol].handle; 217 }, 218 219 got(message, handle, emit) { 220 const socket = new dgram.Socket(message.dgramType); 221 222 socket.bind(handle, () => { 223 emit(socket); 224 }); 225 } 226 } 227}; 228 229function stdioStringToArray(stdio, channel) { 230 const options = []; 231 232 switch (stdio) { 233 case 'ignore': 234 case 'overlapped': 235 case 'pipe': options.push(stdio, stdio, stdio); break; 236 case 'inherit': options.push(0, 1, 2); break; 237 default: 238 throw new ERR_INVALID_OPT_VALUE('stdio', stdio); 239 } 240 241 if (channel) options.push(channel); 242 243 return options; 244} 245 246function ChildProcess() { 247 EventEmitter.call(this); 248 249 this._closesNeeded = 1; 250 this._closesGot = 0; 251 this.connected = false; 252 253 this.signalCode = null; 254 this.exitCode = null; 255 this.killed = false; 256 this.spawnfile = null; 257 258 this._handle = new Process(); 259 this._handle[owner_symbol] = this; 260 261 this._handle.onexit = (exitCode, signalCode) => { 262 if (signalCode) { 263 this.signalCode = signalCode; 264 } else { 265 this.exitCode = exitCode; 266 } 267 268 if (this.stdin) { 269 this.stdin.destroy(); 270 } 271 272 this._handle.close(); 273 this._handle = null; 274 275 if (exitCode < 0) { 276 const syscall = this.spawnfile ? 'spawn ' + this.spawnfile : 'spawn'; 277 const err = errnoException(exitCode, syscall); 278 279 if (this.spawnfile) 280 err.path = this.spawnfile; 281 282 err.spawnargs = this.spawnargs.slice(1); 283 this.emit('error', err); 284 } else { 285 this.emit('exit', this.exitCode, this.signalCode); 286 } 287 288 // If any of the stdio streams have not been touched, 289 // then pull all the data through so that it can get the 290 // eof and emit a 'close' event. 291 // Do it on nextTick so that the user has one last chance 292 // to consume the output, if for example they only want to 293 // start reading the data once the process exits. 294 process.nextTick(flushStdio, this); 295 296 maybeClose(this); 297 }; 298} 299ObjectSetPrototypeOf(ChildProcess.prototype, EventEmitter.prototype); 300ObjectSetPrototypeOf(ChildProcess, EventEmitter); 301 302 303function flushStdio(subprocess) { 304 const stdio = subprocess.stdio; 305 306 if (stdio == null) return; 307 308 for (let i = 0; i < stdio.length; i++) { 309 const stream = stdio[i]; 310 // TODO(addaleax): This doesn't necessarily account for all the ways in 311 // which data can be read from a stream, e.g. being consumed on the 312 // native layer directly as a StreamBase. 313 if (!stream || !stream.readable || stream[kIsUsedAsStdio]) { 314 continue; 315 } 316 stream.resume(); 317 } 318} 319 320 321function createSocket(pipe, readable) { 322 return net.Socket({ handle: pipe, readable, writable: !readable }); 323} 324 325 326function getHandleWrapType(stream) { 327 if (stream instanceof Pipe) return 'pipe'; 328 if (stream instanceof TTY) return 'tty'; 329 if (stream instanceof TCP) return 'tcp'; 330 if (stream instanceof UDP) return 'udp'; 331 332 return false; 333} 334 335function closePendingHandle(target) { 336 target._pendingMessage.handle.close(); 337 target._pendingMessage = null; 338} 339 340 341ChildProcess.prototype.spawn = function(options) { 342 let i = 0; 343 344 if (options === null || typeof options !== 'object') { 345 throw new ERR_INVALID_ARG_TYPE('options', 'Object', options); 346 } 347 348 // If no `stdio` option was given - use default 349 let stdio = options.stdio || 'pipe'; 350 351 stdio = getValidStdio(stdio, false); 352 353 const ipc = stdio.ipc; 354 const ipcFd = stdio.ipcFd; 355 stdio = options.stdio = stdio.stdio; 356 357 358 validateOneOf(options.serialization, 'options.serialization', 359 [undefined, 'json', 'advanced'], true); 360 const serialization = options.serialization || 'json'; 361 362 if (ipc !== undefined) { 363 // Let child process know about opened IPC channel 364 if (options.envPairs === undefined) 365 options.envPairs = []; 366 else 367 validateArray(options.envPairs, 'options.envPairs'); 368 369 options.envPairs.push(`NODE_CHANNEL_FD=${ipcFd}`); 370 options.envPairs.push(`NODE_CHANNEL_SERIALIZATION_MODE=${serialization}`); 371 } 372 373 validateString(options.file, 'options.file'); 374 this.spawnfile = options.file; 375 376 if (ArrayIsArray(options.args)) 377 this.spawnargs = options.args; 378 else if (options.args === undefined) 379 this.spawnargs = []; 380 else 381 throw new ERR_INVALID_ARG_TYPE('options.args', 'Array', options.args); 382 383 const err = this._handle.spawn(options); 384 385 // Run-time errors should emit an error, not throw an exception. 386 if (err === UV_EACCES || 387 err === UV_EAGAIN || 388 err === UV_EMFILE || 389 err === UV_ENFILE || 390 err === UV_ENOENT) { 391 process.nextTick(onErrorNT, this, err); 392 393 // There is no point in continuing when we've hit EMFILE or ENFILE 394 // because we won't be able to set up the stdio file descriptors. 395 if (err === UV_EMFILE || err === UV_ENFILE) 396 return err; 397 } else if (err) { 398 // Close all opened fds on error 399 for (i = 0; i < stdio.length; i++) { 400 const stream = stdio[i]; 401 if (stream.type === 'pipe') { 402 stream.handle.close(); 403 } 404 } 405 406 this._handle.close(); 407 this._handle = null; 408 throw errnoException(err, 'spawn'); 409 } else { 410 process.nextTick(onSpawnNT, this); 411 } 412 413 this.pid = this._handle.pid; 414 415 for (i = 0; i < stdio.length; i++) { 416 const stream = stdio[i]; 417 if (stream.type === 'ignore') continue; 418 419 if (stream.ipc) { 420 this._closesNeeded++; 421 continue; 422 } 423 424 // The stream is already cloned and piped, thus stop its readable side, 425 // otherwise we might attempt to read from the stream when at the same time 426 // the child process does. 427 if (stream.type === 'wrap') { 428 stream.handle.reading = false; 429 stream.handle.readStop(); 430 stream._stdio.pause(); 431 stream._stdio.readableFlowing = false; 432 stream._stdio._readableState.reading = false; 433 stream._stdio[kIsUsedAsStdio] = true; 434 continue; 435 } 436 437 if (stream.handle) { 438 // When i === 0 - we're dealing with stdin 439 // (which is the only one writable pipe). 440 stream.socket = createSocket(this.pid !== 0 ? 441 stream.handle : null, i > 0); 442 443 if (i > 0 && this.pid !== 0) { 444 this._closesNeeded++; 445 stream.socket.on('close', () => { 446 maybeClose(this); 447 }); 448 } 449 } 450 } 451 452 this.stdin = stdio.length >= 1 && stdio[0].socket !== undefined ? 453 stdio[0].socket : null; 454 this.stdout = stdio.length >= 2 && stdio[1].socket !== undefined ? 455 stdio[1].socket : null; 456 this.stderr = stdio.length >= 3 && stdio[2].socket !== undefined ? 457 stdio[2].socket : null; 458 459 this.stdio = []; 460 461 for (i = 0; i < stdio.length; i++) 462 this.stdio.push(stdio[i].socket === undefined ? null : stdio[i].socket); 463 464 // Add .send() method and start listening for IPC data 465 if (ipc !== undefined) setupChannel(this, ipc, serialization); 466 467 return err; 468}; 469 470 471function onErrorNT(self, err) { 472 self._handle.onexit(err); 473} 474 475 476function onSpawnNT(self) { 477 self.emit('spawn'); 478} 479 480 481ChildProcess.prototype.kill = function(sig) { 482 483 const signal = sig === 0 ? sig : 484 convertToValidSignal(sig === undefined ? 'SIGTERM' : sig); 485 486 if (this._handle) { 487 const err = this._handle.kill(signal); 488 if (err === 0) { 489 /* Success. */ 490 this.killed = true; 491 return true; 492 } 493 if (err === UV_ESRCH) { 494 /* Already dead. */ 495 } else if (err === UV_EINVAL || err === UV_ENOSYS) { 496 /* The underlying platform doesn't support this signal. */ 497 throw errnoException(err, 'kill'); 498 } else { 499 /* Other error, almost certainly EPERM. */ 500 this.emit('error', errnoException(err, 'kill')); 501 } 502 } 503 504 /* Kill didn't succeed. */ 505 return false; 506}; 507 508 509ChildProcess.prototype.ref = function() { 510 if (this._handle) this._handle.ref(); 511}; 512 513 514ChildProcess.prototype.unref = function() { 515 if (this._handle) this._handle.unref(); 516}; 517 518class Control extends EventEmitter { 519 #channel = null; 520 #refs = 0; 521 #refExplicitlySet = false; 522 523 constructor(channel) { 524 super(); 525 this.#channel = channel; 526 this[kPendingMessages] = []; 527 } 528 529 // The methods keeping track of the counter are being used to track the 530 // listener count on the child process object as well as when writes are 531 // in progress. Once the user has explicitly requested a certain state, these 532 // methods become no-ops in order to not interfere with the user's intentions. 533 refCounted() { 534 if (++this.#refs === 1 && !this.#refExplicitlySet) { 535 this.#channel.ref(); 536 } 537 } 538 539 unrefCounted() { 540 if (--this.#refs === 0 && !this.#refExplicitlySet) { 541 this.#channel.unref(); 542 this.emit('unref'); 543 } 544 } 545 546 ref() { 547 this.#refExplicitlySet = true; 548 this.#channel.ref(); 549 } 550 551 unref() { 552 this.#refExplicitlySet = true; 553 this.#channel.unref(); 554 } 555 556 get fd() { 557 return this.#channel ? this.#channel.fd : undefined; 558 } 559} 560 561const channelDeprecationMsg = '_channel is deprecated. ' + 562 'Use ChildProcess.channel instead.'; 563 564let serialization; 565function setupChannel(target, channel, serializationMode) { 566 const control = new Control(channel); 567 target.channel = control; 568 target[kChannelHandle] = channel; 569 570 ObjectDefineProperty(target, '_channel', { 571 get: deprecate(() => { 572 return target.channel; 573 }, channelDeprecationMsg, 'DEP0129'), 574 set: deprecate((val) => { 575 target.channel = val; 576 }, channelDeprecationMsg, 'DEP0129'), 577 configurable: true, 578 enumerable: false 579 }); 580 581 target._handleQueue = null; 582 target._pendingMessage = null; 583 584 if (serialization === undefined) 585 serialization = require('internal/child_process/serialization'); 586 const { 587 initMessageChannel, 588 parseChannelMessages, 589 writeChannelMessage 590 } = serialization[serializationMode]; 591 592 let pendingHandle = null; 593 initMessageChannel(channel); 594 channel.pendingHandle = null; 595 channel.onread = function(arrayBuffer) { 596 const recvHandle = channel.pendingHandle; 597 channel.pendingHandle = null; 598 if (arrayBuffer) { 599 const nread = streamBaseState[kReadBytesOrError]; 600 const offset = streamBaseState[kArrayBufferOffset]; 601 const pool = new Uint8Array(arrayBuffer, offset, nread); 602 if (recvHandle) 603 pendingHandle = recvHandle; 604 605 for (const message of parseChannelMessages(channel, pool)) { 606 // There will be at most one NODE_HANDLE message in every chunk we 607 // read because SCM_RIGHTS messages don't get coalesced. Make sure 608 // that we deliver the handle with the right message however. 609 if (isInternal(message)) { 610 if (message.cmd === 'NODE_HANDLE') { 611 handleMessage(message, pendingHandle, true); 612 pendingHandle = null; 613 } else { 614 handleMessage(message, undefined, true); 615 } 616 } else { 617 handleMessage(message, undefined, false); 618 } 619 } 620 } else { 621 this.buffering = false; 622 target.disconnect(); 623 channel.onread = nop; 624 channel.close(); 625 target.channel = null; 626 maybeClose(target); 627 } 628 }; 629 630 // Object where socket lists will live 631 channel.sockets = { got: {}, send: {} }; 632 633 // Handlers will go through this 634 target.on('internalMessage', function(message, handle) { 635 // Once acknowledged - continue sending handles. 636 if (message.cmd === 'NODE_HANDLE_ACK' || 637 message.cmd === 'NODE_HANDLE_NACK') { 638 639 if (target._pendingMessage) { 640 if (message.cmd === 'NODE_HANDLE_ACK') { 641 closePendingHandle(target); 642 } else if (target._pendingMessage.retransmissions++ === 643 MAX_HANDLE_RETRANSMISSIONS) { 644 closePendingHandle(target); 645 process.emitWarning('Handle did not reach the receiving process ' + 646 'correctly', 'SentHandleNotReceivedWarning'); 647 } 648 } 649 650 assert(ArrayIsArray(target._handleQueue)); 651 const queue = target._handleQueue; 652 target._handleQueue = null; 653 654 if (target._pendingMessage) { 655 target._send(target._pendingMessage.message, 656 target._pendingMessage.handle, 657 target._pendingMessage.options, 658 target._pendingMessage.callback); 659 } 660 661 for (let i = 0; i < queue.length; i++) { 662 const args = queue[i]; 663 target._send(args.message, args.handle, args.options, args.callback); 664 } 665 666 // Process a pending disconnect (if any). 667 if (!target.connected && target.channel && !target._handleQueue) 668 target._disconnect(); 669 670 return; 671 } 672 673 if (message.cmd !== 'NODE_HANDLE') return; 674 675 // It is possible that the handle is not received because of some error on 676 // ancillary data reception such as MSG_CTRUNC. In this case, report the 677 // sender about it by sending a NODE_HANDLE_NACK message. 678 if (!handle) 679 return target._send({ cmd: 'NODE_HANDLE_NACK' }, null, true); 680 681 // Acknowledge handle receival. Don't emit error events (for example if 682 // the other side has disconnected) because this call to send() is not 683 // initiated by the user and it shouldn't be fatal to be unable to ACK 684 // a message. 685 target._send({ cmd: 'NODE_HANDLE_ACK' }, null, true); 686 687 const obj = handleConversion[message.type]; 688 689 // Update simultaneous accepts on Windows 690 if (process.platform === 'win32') { 691 handle.setSimultaneousAccepts(false); 692 } 693 694 // Convert handle object 695 obj.got.call(this, message, handle, (handle) => { 696 handleMessage(message.msg, handle, isInternal(message.msg)); 697 }); 698 }); 699 700 target.on('newListener', function() { 701 702 process.nextTick(() => { 703 if (!target.channel || !target.listenerCount('message')) 704 return; 705 706 const messages = target.channel[kPendingMessages]; 707 const { length } = messages; 708 if (!length) return; 709 710 for (let i = 0; i < length; i++) { 711 ReflectApply(target.emit, target, messages[i]); 712 } 713 714 target.channel[kPendingMessages] = []; 715 }); 716 }); 717 718 target.send = function(message, handle, options, callback) { 719 if (typeof handle === 'function') { 720 callback = handle; 721 handle = undefined; 722 options = undefined; 723 } else if (typeof options === 'function') { 724 callback = options; 725 options = undefined; 726 } else if (options !== undefined && 727 (options === null || typeof options !== 'object')) { 728 throw new ERR_INVALID_ARG_TYPE('options', 'Object', options); 729 } 730 731 options = { swallowErrors: false, ...options }; 732 733 if (this.connected) { 734 return this._send(message, handle, options, callback); 735 } 736 const ex = new ERR_IPC_CHANNEL_CLOSED(); 737 if (typeof callback === 'function') { 738 process.nextTick(callback, ex); 739 } else { 740 process.nextTick(() => this.emit('error', ex)); 741 } 742 return false; 743 }; 744 745 target._send = function(message, handle, options, callback) { 746 assert(this.connected || this.channel); 747 748 if (message === undefined) 749 throw new ERR_MISSING_ARGS('message'); 750 751 // Non-serializable messages should not reach the remote 752 // end point; as any failure in the stringification there 753 // will result in error message that is weakly consumable. 754 // So perform a final check on message prior to sending. 755 if (typeof message !== 'string' && 756 typeof message !== 'object' && 757 typeof message !== 'number' && 758 typeof message !== 'boolean') { 759 throw new ERR_INVALID_ARG_TYPE( 760 'message', ['string', 'object', 'number', 'boolean'], message); 761 } 762 763 // Support legacy function signature 764 if (typeof options === 'boolean') { 765 options = { swallowErrors: options }; 766 } 767 768 let obj; 769 770 // Package messages with a handle object 771 if (handle) { 772 // This message will be handled by an internalMessage event handler 773 message = { 774 cmd: 'NODE_HANDLE', 775 type: null, 776 msg: message 777 }; 778 779 if (handle instanceof net.Socket) { 780 message.type = 'net.Socket'; 781 } else if (handle instanceof net.Server) { 782 message.type = 'net.Server'; 783 } else if (handle instanceof TCP || handle instanceof Pipe) { 784 message.type = 'net.Native'; 785 } else if (handle instanceof dgram.Socket) { 786 message.type = 'dgram.Socket'; 787 } else if (handle instanceof UDP) { 788 message.type = 'dgram.Native'; 789 } else { 790 throw new ERR_INVALID_HANDLE_TYPE(); 791 } 792 793 // Queue-up message and handle if we haven't received ACK yet. 794 if (this._handleQueue) { 795 this._handleQueue.push({ 796 callback: callback, 797 handle: handle, 798 options: options, 799 message: message.msg, 800 }); 801 return this._handleQueue.length === 1; 802 } 803 804 obj = handleConversion[message.type]; 805 806 // convert TCP object to native handle object 807 handle = handleConversion[message.type].send.call(target, 808 message, 809 handle, 810 options); 811 812 // If handle was sent twice, or it is impossible to get native handle 813 // out of it - just send a text without the handle. 814 if (!handle) 815 message = message.msg; 816 817 // Update simultaneous accepts on Windows 818 if (obj.simultaneousAccepts && process.platform === 'win32') { 819 handle.setSimultaneousAccepts(true); 820 } 821 } else if (this._handleQueue && 822 !(message && (message.cmd === 'NODE_HANDLE_ACK' || 823 message.cmd === 'NODE_HANDLE_NACK'))) { 824 // Queue request anyway to avoid out-of-order messages. 825 this._handleQueue.push({ 826 callback: callback, 827 handle: null, 828 options: options, 829 message: message, 830 }); 831 return this._handleQueue.length === 1; 832 } 833 834 const req = new WriteWrap(); 835 836 const err = writeChannelMessage(channel, req, message, handle); 837 const wasAsyncWrite = streamBaseState[kLastWriteWasAsync]; 838 839 if (err === 0) { 840 if (handle) { 841 if (!this._handleQueue) 842 this._handleQueue = []; 843 if (obj && obj.postSend) 844 obj.postSend(message, handle, options, callback, target); 845 } 846 847 if (wasAsyncWrite) { 848 req.oncomplete = () => { 849 control.unrefCounted(); 850 if (typeof callback === 'function') 851 callback(null); 852 }; 853 control.refCounted(); 854 } else if (typeof callback === 'function') { 855 process.nextTick(callback, null); 856 } 857 } else { 858 // Cleanup handle on error 859 if (obj && obj.postSend) 860 obj.postSend(message, handle, options, callback); 861 862 if (!options.swallowErrors) { 863 const ex = errnoException(err, 'write'); 864 if (typeof callback === 'function') { 865 process.nextTick(callback, ex); 866 } else { 867 process.nextTick(() => this.emit('error', ex)); 868 } 869 } 870 } 871 872 /* If the master is > 2 read() calls behind, please stop sending. */ 873 return channel.writeQueueSize < (65536 * 2); 874 }; 875 876 // Connected will be set to false immediately when a disconnect() is 877 // requested, even though the channel might still be alive internally to 878 // process queued messages. The three states are distinguished as follows: 879 // - disconnect() never requested: channel is not null and connected 880 // is true 881 // - disconnect() requested, messages in the queue: channel is not null 882 // and connected is false 883 // - disconnect() requested, channel actually disconnected: channel is 884 // null and connected is false 885 target.connected = true; 886 887 target.disconnect = function() { 888 if (!this.connected) { 889 this.emit('error', new ERR_IPC_DISCONNECTED()); 890 return; 891 } 892 893 // Do not allow any new messages to be written. 894 this.connected = false; 895 896 // If there are no queued messages, disconnect immediately. Otherwise, 897 // postpone the disconnect so that it happens internally after the 898 // queue is flushed. 899 if (!this._handleQueue) 900 this._disconnect(); 901 }; 902 903 target._disconnect = function() { 904 assert(this.channel); 905 906 // This marks the fact that the channel is actually disconnected. 907 this.channel = null; 908 this[kChannelHandle] = null; 909 910 if (this._pendingMessage) 911 closePendingHandle(this); 912 913 let fired = false; 914 function finish() { 915 if (fired) return; 916 fired = true; 917 918 channel.close(); 919 target.emit('disconnect'); 920 } 921 922 // If a message is being read, then wait for it to complete. 923 if (channel.buffering) { 924 this.once('message', finish); 925 this.once('internalMessage', finish); 926 927 return; 928 } 929 930 process.nextTick(finish); 931 }; 932 933 function emit(event, message, handle) { 934 if ('internalMessage' === event || target.listenerCount('message')) { 935 target.emit(event, message, handle); 936 return; 937 } 938 939 ArrayPrototypePush( 940 target.channel[kPendingMessages], 941 [event, message, handle] 942 ); 943 } 944 945 function handleMessage(message, handle, internal) { 946 if (!target.channel) 947 return; 948 949 const eventName = (internal ? 'internalMessage' : 'message'); 950 951 process.nextTick(emit, eventName, message, handle); 952 } 953 954 channel.readStart(); 955 return control; 956} 957 958const INTERNAL_PREFIX = 'NODE_'; 959function isInternal(message) { 960 return (message !== null && 961 typeof message === 'object' && 962 typeof message.cmd === 'string' && 963 message.cmd.length > INTERNAL_PREFIX.length && 964 message.cmd.slice(0, INTERNAL_PREFIX.length) === INTERNAL_PREFIX); 965} 966 967function nop() { } 968 969function getValidStdio(stdio, sync) { 970 let ipc; 971 let ipcFd; 972 973 // Replace shortcut with an array 974 if (typeof stdio === 'string') { 975 stdio = stdioStringToArray(stdio); 976 } else if (!ArrayIsArray(stdio)) { 977 throw new ERR_INVALID_OPT_VALUE('stdio', stdio); 978 } 979 980 // At least 3 stdio will be created 981 // Don't concat() a new Array() because it would be sparse, and 982 // stdio.reduce() would skip the sparse elements of stdio. 983 // See https://stackoverflow.com/a/5501711/3561 984 while (stdio.length < 3) stdio.push(undefined); 985 986 // Translate stdio into C++-readable form 987 // (i.e. PipeWraps or fds) 988 stdio = stdio.reduce((acc, stdio, i) => { 989 function cleanup() { 990 for (let i = 0; i < acc.length; i++) { 991 if ((acc[i].type === 'pipe' || acc[i].type === 'ipc') && acc[i].handle) 992 acc[i].handle.close(); 993 } 994 } 995 996 // Defaults 997 if (stdio == null) { 998 stdio = i < 3 ? 'pipe' : 'ignore'; 999 } 1000 1001 if (stdio === 'ignore') { 1002 acc.push({ type: 'ignore' }); 1003 } else if (stdio === 'pipe' || stdio === 'overlapped' || 1004 (typeof stdio === 'number' && stdio < 0)) { 1005 const a = { 1006 type: stdio === 'overlapped' ? 'overlapped' : 'pipe', 1007 readable: i === 0, 1008 writable: i !== 0 1009 }; 1010 1011 if (!sync) 1012 a.handle = new Pipe(PipeConstants.SOCKET); 1013 1014 acc.push(a); 1015 } else if (stdio === 'ipc') { 1016 if (sync || ipc !== undefined) { 1017 // Cleanup previously created pipes 1018 cleanup(); 1019 if (!sync) 1020 throw new ERR_IPC_ONE_PIPE(); 1021 else 1022 throw new ERR_IPC_SYNC_FORK(); 1023 } 1024 1025 ipc = new Pipe(PipeConstants.IPC); 1026 ipcFd = i; 1027 1028 acc.push({ 1029 type: 'pipe', 1030 handle: ipc, 1031 ipc: true 1032 }); 1033 } else if (stdio === 'inherit') { 1034 acc.push({ 1035 type: 'inherit', 1036 fd: i 1037 }); 1038 } else if (typeof stdio === 'number' || typeof stdio.fd === 'number') { 1039 acc.push({ 1040 type: 'fd', 1041 fd: typeof stdio === 'number' ? stdio : stdio.fd 1042 }); 1043 } else if (getHandleWrapType(stdio) || getHandleWrapType(stdio.handle) || 1044 getHandleWrapType(stdio._handle)) { 1045 const handle = getHandleWrapType(stdio) ? 1046 stdio : 1047 getHandleWrapType(stdio.handle) ? stdio.handle : stdio._handle; 1048 1049 acc.push({ 1050 type: 'wrap', 1051 wrapType: getHandleWrapType(handle), 1052 handle: handle, 1053 _stdio: stdio 1054 }); 1055 } else if (isArrayBufferView(stdio) || typeof stdio === 'string') { 1056 if (!sync) { 1057 cleanup(); 1058 throw new ERR_INVALID_SYNC_FORK_INPUT(inspect(stdio)); 1059 } 1060 } else { 1061 // Cleanup 1062 cleanup(); 1063 throw new ERR_INVALID_OPT_VALUE('stdio', stdio); 1064 } 1065 1066 return acc; 1067 }, []); 1068 1069 return { stdio, ipc, ipcFd }; 1070} 1071 1072 1073function getSocketList(type, worker, key) { 1074 const sockets = worker[kChannelHandle].sockets[type]; 1075 let socketList = sockets[key]; 1076 if (!socketList) { 1077 const Construct = type === 'send' ? SocketListSend : SocketListReceive; 1078 socketList = sockets[key] = new Construct(worker, key); 1079 } 1080 return socketList; 1081} 1082 1083 1084function maybeClose(subprocess) { 1085 subprocess._closesGot++; 1086 1087 if (subprocess._closesGot === subprocess._closesNeeded) { 1088 subprocess.emit('close', subprocess.exitCode, subprocess.signalCode); 1089 } 1090} 1091 1092function spawnSync(options) { 1093 const result = spawn_sync.spawn(options); 1094 1095 if (result.output && options.encoding && options.encoding !== 'buffer') { 1096 for (let i = 0; i < result.output.length; i++) { 1097 if (!result.output[i]) 1098 continue; 1099 result.output[i] = result.output[i].toString(options.encoding); 1100 } 1101 } 1102 1103 result.stdout = result.output && result.output[1]; 1104 result.stderr = result.output && result.output[2]; 1105 1106 if (result.error) { 1107 result.error = errnoException(result.error, 'spawnSync ' + options.file); 1108 result.error.path = options.file; 1109 result.error.spawnargs = options.args.slice(1); 1110 } 1111 1112 return result; 1113} 1114 1115module.exports = { 1116 ChildProcess, 1117 kChannelHandle, 1118 setupChannel, 1119 getValidStdio, 1120 stdioStringToArray, 1121 spawnSync 1122}; 1123