• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  ArrayIsArray,
5  ArrayPrototypePush,
6  ObjectDefineProperty,
7  ObjectSetPrototypeOf,
8  ReflectApply,
9  Symbol,
10  Uint8Array,
11} = primordials;
12
13const {
14  errnoException,
15  codes: {
16    ERR_INVALID_ARG_TYPE,
17    ERR_INVALID_HANDLE_TYPE,
18    ERR_INVALID_OPT_VALUE,
19    ERR_INVALID_SYNC_FORK_INPUT,
20    ERR_IPC_CHANNEL_CLOSED,
21    ERR_IPC_DISCONNECTED,
22    ERR_IPC_ONE_PIPE,
23    ERR_IPC_SYNC_FORK,
24    ERR_MISSING_ARGS
25  }
26} = require('internal/errors');
27const {
28  validateArray,
29  validateOneOf,
30  validateString,
31} = require('internal/validators');
32const EventEmitter = require('events');
33const net = require('net');
34const dgram = require('dgram');
35const inspect = require('internal/util/inspect').inspect;
36const assert = require('internal/assert');
37
38const { Process } = internalBinding('process_wrap');
39const {
40  WriteWrap,
41  kReadBytesOrError,
42  kArrayBufferOffset,
43  kLastWriteWasAsync,
44  streamBaseState
45} = internalBinding('stream_wrap');
46const { Pipe, constants: PipeConstants } = internalBinding('pipe_wrap');
47const { TCP } = internalBinding('tcp_wrap');
48const { TTY } = internalBinding('tty_wrap');
49const { UDP } = internalBinding('udp_wrap');
50const SocketList = require('internal/socket_list');
51const { owner_symbol } = require('internal/async_hooks').symbols;
52const { convertToValidSignal, deprecate } = require('internal/util');
53const { isArrayBufferView } = require('internal/util/types');
54const spawn_sync = internalBinding('spawn_sync');
55const { kStateSymbol } = require('internal/dgram');
56
57const {
58  UV_EACCES,
59  UV_EAGAIN,
60  UV_EINVAL,
61  UV_EMFILE,
62  UV_ENFILE,
63  UV_ENOENT,
64  UV_ENOSYS,
65  UV_ESRCH
66} = internalBinding('uv');
67
68const { SocketListSend, SocketListReceive } = SocketList;
69
70// Lazy loaded for startup performance and to allow monkey patching of
71// internalBinding('http_parser').HTTPParser.
72let freeParser;
73let HTTPParser;
74
75const MAX_HANDLE_RETRANSMISSIONS = 3;
76const kChannelHandle = Symbol('kChannelHandle');
77const kIsUsedAsStdio = Symbol('kIsUsedAsStdio');
78const kPendingMessages = Symbol('kPendingMessages');
79
80// This object contain function to convert TCP objects to native handle objects
81// and back again.
82const handleConversion = {
83  'net.Native': {
84    simultaneousAccepts: true,
85
86    send(message, handle, options) {
87      return handle;
88    },
89
90    got(message, handle, emit) {
91      emit(handle);
92    }
93  },
94
95  'net.Server': {
96    simultaneousAccepts: true,
97
98    send(message, server, options) {
99      return server._handle;
100    },
101
102    got(message, handle, emit) {
103      const server = new net.Server();
104      server.listen(handle, () => {
105        emit(server);
106      });
107    }
108  },
109
110  'net.Socket': {
111    send(message, socket, options) {
112      if (!socket._handle)
113        return;
114
115      // If the socket was created by net.Server
116      if (socket.server) {
117        // The worker should keep track of the socket
118        message.key = socket.server._connectionKey;
119
120        const firstTime = !this[kChannelHandle].sockets.send[message.key];
121        const socketList = getSocketList('send', this, message.key);
122
123        // The server should no longer expose a .connection property
124        // and when asked to close it should query the socket status from
125        // the workers
126        if (firstTime) socket.server._setupWorker(socketList);
127
128        // Act like socket is detached
129        if (!options.keepOpen)
130          socket.server._connections--;
131      }
132
133      const handle = socket._handle;
134
135      // Remove handle from socket object, it will be closed when the socket
136      // will be sent
137      if (!options.keepOpen) {
138        handle.onread = nop;
139        socket._handle = null;
140        socket.setTimeout(0);
141
142        if (freeParser === undefined)
143          freeParser = require('_http_common').freeParser;
144        if (HTTPParser === undefined)
145          HTTPParser = require('_http_common').HTTPParser;
146
147        // In case of an HTTP connection socket, release the associated
148        // resources
149        if (socket.parser && socket.parser instanceof HTTPParser) {
150          freeParser(socket.parser, null, socket);
151          if (socket._httpMessage)
152            socket._httpMessage.detachSocket(socket);
153        }
154      }
155
156      return handle;
157    },
158
159    postSend(message, handle, options, callback, target) {
160      // Store the handle after successfully sending it, so it can be closed
161      // when the NODE_HANDLE_ACK is received. If the handle could not be sent,
162      // just close it.
163      if (handle && !options.keepOpen) {
164        if (target) {
165          // There can only be one _pendingMessage as passing handles are
166          // processed one at a time: handles are stored in _handleQueue while
167          // waiting for the NODE_HANDLE_ACK of the current passing handle.
168          assert(!target._pendingMessage);
169          target._pendingMessage =
170              { callback, message, handle, options, retransmissions: 0 };
171        } else {
172          handle.close();
173        }
174      }
175    },
176
177    got(message, handle, emit) {
178      const socket = new net.Socket({
179        handle: handle,
180        readable: true,
181        writable: true
182      });
183
184      // If the socket was created by net.Server we will track the socket
185      if (message.key) {
186
187        // Add socket to connections list
188        const socketList = getSocketList('got', this, message.key);
189        socketList.add({
190          socket: socket
191        });
192      }
193
194      emit(socket);
195    }
196  },
197
198  'dgram.Native': {
199    simultaneousAccepts: false,
200
201    send(message, handle, options) {
202      return handle;
203    },
204
205    got(message, handle, emit) {
206      emit(handle);
207    }
208  },
209
210  'dgram.Socket': {
211    simultaneousAccepts: false,
212
213    send(message, socket, options) {
214      message.dgramType = socket.type;
215
216      return socket[kStateSymbol].handle;
217    },
218
219    got(message, handle, emit) {
220      const socket = new dgram.Socket(message.dgramType);
221
222      socket.bind(handle, () => {
223        emit(socket);
224      });
225    }
226  }
227};
228
229function stdioStringToArray(stdio, channel) {
230  const options = [];
231
232  switch (stdio) {
233    case 'ignore':
234    case 'overlapped':
235    case 'pipe': options.push(stdio, stdio, stdio); break;
236    case 'inherit': options.push(0, 1, 2); break;
237    default:
238      throw new ERR_INVALID_OPT_VALUE('stdio', stdio);
239  }
240
241  if (channel) options.push(channel);
242
243  return options;
244}
245
246function ChildProcess() {
247  EventEmitter.call(this);
248
249  this._closesNeeded = 1;
250  this._closesGot = 0;
251  this.connected = false;
252
253  this.signalCode = null;
254  this.exitCode = null;
255  this.killed = false;
256  this.spawnfile = null;
257
258  this._handle = new Process();
259  this._handle[owner_symbol] = this;
260
261  this._handle.onexit = (exitCode, signalCode) => {
262    if (signalCode) {
263      this.signalCode = signalCode;
264    } else {
265      this.exitCode = exitCode;
266    }
267
268    if (this.stdin) {
269      this.stdin.destroy();
270    }
271
272    this._handle.close();
273    this._handle = null;
274
275    if (exitCode < 0) {
276      const syscall = this.spawnfile ? 'spawn ' + this.spawnfile : 'spawn';
277      const err = errnoException(exitCode, syscall);
278
279      if (this.spawnfile)
280        err.path = this.spawnfile;
281
282      err.spawnargs = this.spawnargs.slice(1);
283      this.emit('error', err);
284    } else {
285      this.emit('exit', this.exitCode, this.signalCode);
286    }
287
288    // If any of the stdio streams have not been touched,
289    // then pull all the data through so that it can get the
290    // eof and emit a 'close' event.
291    // Do it on nextTick so that the user has one last chance
292    // to consume the output, if for example they only want to
293    // start reading the data once the process exits.
294    process.nextTick(flushStdio, this);
295
296    maybeClose(this);
297  };
298}
299ObjectSetPrototypeOf(ChildProcess.prototype, EventEmitter.prototype);
300ObjectSetPrototypeOf(ChildProcess, EventEmitter);
301
302
303function flushStdio(subprocess) {
304  const stdio = subprocess.stdio;
305
306  if (stdio == null) return;
307
308  for (let i = 0; i < stdio.length; i++) {
309    const stream = stdio[i];
310    // TODO(addaleax): This doesn't necessarily account for all the ways in
311    // which data can be read from a stream, e.g. being consumed on the
312    // native layer directly as a StreamBase.
313    if (!stream || !stream.readable || stream[kIsUsedAsStdio]) {
314      continue;
315    }
316    stream.resume();
317  }
318}
319
320
321function createSocket(pipe, readable) {
322  return net.Socket({ handle: pipe, readable, writable: !readable });
323}
324
325
326function getHandleWrapType(stream) {
327  if (stream instanceof Pipe) return 'pipe';
328  if (stream instanceof TTY) return 'tty';
329  if (stream instanceof TCP) return 'tcp';
330  if (stream instanceof UDP) return 'udp';
331
332  return false;
333}
334
335function closePendingHandle(target) {
336  target._pendingMessage.handle.close();
337  target._pendingMessage = null;
338}
339
340
341ChildProcess.prototype.spawn = function(options) {
342  let i = 0;
343
344  if (options === null || typeof options !== 'object') {
345    throw new ERR_INVALID_ARG_TYPE('options', 'Object', options);
346  }
347
348  // If no `stdio` option was given - use default
349  let stdio = options.stdio || 'pipe';
350
351  stdio = getValidStdio(stdio, false);
352
353  const ipc = stdio.ipc;
354  const ipcFd = stdio.ipcFd;
355  stdio = options.stdio = stdio.stdio;
356
357
358  validateOneOf(options.serialization, 'options.serialization',
359                [undefined, 'json', 'advanced'], true);
360  const serialization = options.serialization || 'json';
361
362  if (ipc !== undefined) {
363    // Let child process know about opened IPC channel
364    if (options.envPairs === undefined)
365      options.envPairs = [];
366    else
367      validateArray(options.envPairs, 'options.envPairs');
368
369    options.envPairs.push(`NODE_CHANNEL_FD=${ipcFd}`);
370    options.envPairs.push(`NODE_CHANNEL_SERIALIZATION_MODE=${serialization}`);
371  }
372
373  validateString(options.file, 'options.file');
374  this.spawnfile = options.file;
375
376  if (ArrayIsArray(options.args))
377    this.spawnargs = options.args;
378  else if (options.args === undefined)
379    this.spawnargs = [];
380  else
381    throw new ERR_INVALID_ARG_TYPE('options.args', 'Array', options.args);
382
383  const err = this._handle.spawn(options);
384
385  // Run-time errors should emit an error, not throw an exception.
386  if (err === UV_EACCES ||
387      err === UV_EAGAIN ||
388      err === UV_EMFILE ||
389      err === UV_ENFILE ||
390      err === UV_ENOENT) {
391    process.nextTick(onErrorNT, this, err);
392
393    // There is no point in continuing when we've hit EMFILE or ENFILE
394    // because we won't be able to set up the stdio file descriptors.
395    if (err === UV_EMFILE || err === UV_ENFILE)
396      return err;
397  } else if (err) {
398    // Close all opened fds on error
399    for (i = 0; i < stdio.length; i++) {
400      const stream = stdio[i];
401      if (stream.type === 'pipe') {
402        stream.handle.close();
403      }
404    }
405
406    this._handle.close();
407    this._handle = null;
408    throw errnoException(err, 'spawn');
409  } else {
410    process.nextTick(onSpawnNT, this);
411  }
412
413  this.pid = this._handle.pid;
414
415  for (i = 0; i < stdio.length; i++) {
416    const stream = stdio[i];
417    if (stream.type === 'ignore') continue;
418
419    if (stream.ipc) {
420      this._closesNeeded++;
421      continue;
422    }
423
424    // The stream is already cloned and piped, thus stop its readable side,
425    // otherwise we might attempt to read from the stream when at the same time
426    // the child process does.
427    if (stream.type === 'wrap') {
428      stream.handle.reading = false;
429      stream.handle.readStop();
430      stream._stdio.pause();
431      stream._stdio.readableFlowing = false;
432      stream._stdio._readableState.reading = false;
433      stream._stdio[kIsUsedAsStdio] = true;
434      continue;
435    }
436
437    if (stream.handle) {
438      // When i === 0 - we're dealing with stdin
439      // (which is the only one writable pipe).
440      stream.socket = createSocket(this.pid !== 0 ?
441        stream.handle : null, i > 0);
442
443      if (i > 0 && this.pid !== 0) {
444        this._closesNeeded++;
445        stream.socket.on('close', () => {
446          maybeClose(this);
447        });
448      }
449    }
450  }
451
452  this.stdin = stdio.length >= 1 && stdio[0].socket !== undefined ?
453    stdio[0].socket : null;
454  this.stdout = stdio.length >= 2 && stdio[1].socket !== undefined ?
455    stdio[1].socket : null;
456  this.stderr = stdio.length >= 3 && stdio[2].socket !== undefined ?
457    stdio[2].socket : null;
458
459  this.stdio = [];
460
461  for (i = 0; i < stdio.length; i++)
462    this.stdio.push(stdio[i].socket === undefined ? null : stdio[i].socket);
463
464  // Add .send() method and start listening for IPC data
465  if (ipc !== undefined) setupChannel(this, ipc, serialization);
466
467  return err;
468};
469
470
471function onErrorNT(self, err) {
472  self._handle.onexit(err);
473}
474
475
476function onSpawnNT(self) {
477  self.emit('spawn');
478}
479
480
481ChildProcess.prototype.kill = function(sig) {
482
483  const signal = sig === 0 ? sig :
484    convertToValidSignal(sig === undefined ? 'SIGTERM' : sig);
485
486  if (this._handle) {
487    const err = this._handle.kill(signal);
488    if (err === 0) {
489      /* Success. */
490      this.killed = true;
491      return true;
492    }
493    if (err === UV_ESRCH) {
494      /* Already dead. */
495    } else if (err === UV_EINVAL || err === UV_ENOSYS) {
496      /* The underlying platform doesn't support this signal. */
497      throw errnoException(err, 'kill');
498    } else {
499      /* Other error, almost certainly EPERM. */
500      this.emit('error', errnoException(err, 'kill'));
501    }
502  }
503
504  /* Kill didn't succeed. */
505  return false;
506};
507
508
509ChildProcess.prototype.ref = function() {
510  if (this._handle) this._handle.ref();
511};
512
513
514ChildProcess.prototype.unref = function() {
515  if (this._handle) this._handle.unref();
516};
517
518class Control extends EventEmitter {
519  #channel = null;
520  #refs = 0;
521  #refExplicitlySet = false;
522
523  constructor(channel) {
524    super();
525    this.#channel = channel;
526    this[kPendingMessages] = [];
527  }
528
529  // The methods keeping track of the counter are being used to track the
530  // listener count on the child process object as well as when writes are
531  // in progress. Once the user has explicitly requested a certain state, these
532  // methods become no-ops in order to not interfere with the user's intentions.
533  refCounted() {
534    if (++this.#refs === 1 && !this.#refExplicitlySet) {
535      this.#channel.ref();
536    }
537  }
538
539  unrefCounted() {
540    if (--this.#refs === 0 && !this.#refExplicitlySet) {
541      this.#channel.unref();
542      this.emit('unref');
543    }
544  }
545
546  ref() {
547    this.#refExplicitlySet = true;
548    this.#channel.ref();
549  }
550
551  unref() {
552    this.#refExplicitlySet = true;
553    this.#channel.unref();
554  }
555
556  get fd() {
557    return this.#channel ? this.#channel.fd : undefined;
558  }
559}
560
561const channelDeprecationMsg = '_channel is deprecated. ' +
562                              'Use ChildProcess.channel instead.';
563
564let serialization;
565function setupChannel(target, channel, serializationMode) {
566  const control = new Control(channel);
567  target.channel = control;
568  target[kChannelHandle] = channel;
569
570  ObjectDefineProperty(target, '_channel', {
571    get: deprecate(() => {
572      return target.channel;
573    }, channelDeprecationMsg, 'DEP0129'),
574    set: deprecate((val) => {
575      target.channel = val;
576    }, channelDeprecationMsg, 'DEP0129'),
577    configurable: true,
578    enumerable: false
579  });
580
581  target._handleQueue = null;
582  target._pendingMessage = null;
583
584  if (serialization === undefined)
585    serialization = require('internal/child_process/serialization');
586  const {
587    initMessageChannel,
588    parseChannelMessages,
589    writeChannelMessage
590  } = serialization[serializationMode];
591
592  let pendingHandle = null;
593  initMessageChannel(channel);
594  channel.pendingHandle = null;
595  channel.onread = function(arrayBuffer) {
596    const recvHandle = channel.pendingHandle;
597    channel.pendingHandle = null;
598    if (arrayBuffer) {
599      const nread = streamBaseState[kReadBytesOrError];
600      const offset = streamBaseState[kArrayBufferOffset];
601      const pool = new Uint8Array(arrayBuffer, offset, nread);
602      if (recvHandle)
603        pendingHandle = recvHandle;
604
605      for (const message of parseChannelMessages(channel, pool)) {
606        // There will be at most one NODE_HANDLE message in every chunk we
607        // read because SCM_RIGHTS messages don't get coalesced. Make sure
608        // that we deliver the handle with the right message however.
609        if (isInternal(message)) {
610          if (message.cmd === 'NODE_HANDLE') {
611            handleMessage(message, pendingHandle, true);
612            pendingHandle = null;
613          } else {
614            handleMessage(message, undefined, true);
615          }
616        } else {
617          handleMessage(message, undefined, false);
618        }
619      }
620    } else {
621      this.buffering = false;
622      target.disconnect();
623      channel.onread = nop;
624      channel.close();
625      target.channel = null;
626      maybeClose(target);
627    }
628  };
629
630  // Object where socket lists will live
631  channel.sockets = { got: {}, send: {} };
632
633  // Handlers will go through this
634  target.on('internalMessage', function(message, handle) {
635    // Once acknowledged - continue sending handles.
636    if (message.cmd === 'NODE_HANDLE_ACK' ||
637        message.cmd === 'NODE_HANDLE_NACK') {
638
639      if (target._pendingMessage) {
640        if (message.cmd === 'NODE_HANDLE_ACK') {
641          closePendingHandle(target);
642        } else if (target._pendingMessage.retransmissions++ ===
643                   MAX_HANDLE_RETRANSMISSIONS) {
644          closePendingHandle(target);
645          process.emitWarning('Handle did not reach the receiving process ' +
646                              'correctly', 'SentHandleNotReceivedWarning');
647        }
648      }
649
650      assert(ArrayIsArray(target._handleQueue));
651      const queue = target._handleQueue;
652      target._handleQueue = null;
653
654      if (target._pendingMessage) {
655        target._send(target._pendingMessage.message,
656                     target._pendingMessage.handle,
657                     target._pendingMessage.options,
658                     target._pendingMessage.callback);
659      }
660
661      for (let i = 0; i < queue.length; i++) {
662        const args = queue[i];
663        target._send(args.message, args.handle, args.options, args.callback);
664      }
665
666      // Process a pending disconnect (if any).
667      if (!target.connected && target.channel && !target._handleQueue)
668        target._disconnect();
669
670      return;
671    }
672
673    if (message.cmd !== 'NODE_HANDLE') return;
674
675    // It is possible that the handle is not received because of some error on
676    // ancillary data reception such as MSG_CTRUNC. In this case, report the
677    // sender about it by sending a NODE_HANDLE_NACK message.
678    if (!handle)
679      return target._send({ cmd: 'NODE_HANDLE_NACK' }, null, true);
680
681    // Acknowledge handle receival. Don't emit error events (for example if
682    // the other side has disconnected) because this call to send() is not
683    // initiated by the user and it shouldn't be fatal to be unable to ACK
684    // a message.
685    target._send({ cmd: 'NODE_HANDLE_ACK' }, null, true);
686
687    const obj = handleConversion[message.type];
688
689    // Update simultaneous accepts on Windows
690    if (process.platform === 'win32') {
691      handle.setSimultaneousAccepts(false);
692    }
693
694    // Convert handle object
695    obj.got.call(this, message, handle, (handle) => {
696      handleMessage(message.msg, handle, isInternal(message.msg));
697    });
698  });
699
700  target.on('newListener', function() {
701
702    process.nextTick(() => {
703      if (!target.channel || !target.listenerCount('message'))
704        return;
705
706      const messages = target.channel[kPendingMessages];
707      const { length } = messages;
708      if (!length) return;
709
710      for (let i = 0; i < length; i++) {
711        ReflectApply(target.emit, target, messages[i]);
712      }
713
714      target.channel[kPendingMessages] = [];
715    });
716  });
717
718  target.send = function(message, handle, options, callback) {
719    if (typeof handle === 'function') {
720      callback = handle;
721      handle = undefined;
722      options = undefined;
723    } else if (typeof options === 'function') {
724      callback = options;
725      options = undefined;
726    } else if (options !== undefined &&
727               (options === null || typeof options !== 'object')) {
728      throw new ERR_INVALID_ARG_TYPE('options', 'Object', options);
729    }
730
731    options = { swallowErrors: false, ...options };
732
733    if (this.connected) {
734      return this._send(message, handle, options, callback);
735    }
736    const ex = new ERR_IPC_CHANNEL_CLOSED();
737    if (typeof callback === 'function') {
738      process.nextTick(callback, ex);
739    } else {
740      process.nextTick(() => this.emit('error', ex));
741    }
742    return false;
743  };
744
745  target._send = function(message, handle, options, callback) {
746    assert(this.connected || this.channel);
747
748    if (message === undefined)
749      throw new ERR_MISSING_ARGS('message');
750
751    // Non-serializable messages should not reach the remote
752    // end point; as any failure in the stringification there
753    // will result in error message that is weakly consumable.
754    // So perform a final check on message prior to sending.
755    if (typeof message !== 'string' &&
756        typeof message !== 'object' &&
757        typeof message !== 'number' &&
758        typeof message !== 'boolean') {
759      throw new ERR_INVALID_ARG_TYPE(
760        'message', ['string', 'object', 'number', 'boolean'], message);
761    }
762
763    // Support legacy function signature
764    if (typeof options === 'boolean') {
765      options = { swallowErrors: options };
766    }
767
768    let obj;
769
770    // Package messages with a handle object
771    if (handle) {
772      // This message will be handled by an internalMessage event handler
773      message = {
774        cmd: 'NODE_HANDLE',
775        type: null,
776        msg: message
777      };
778
779      if (handle instanceof net.Socket) {
780        message.type = 'net.Socket';
781      } else if (handle instanceof net.Server) {
782        message.type = 'net.Server';
783      } else if (handle instanceof TCP || handle instanceof Pipe) {
784        message.type = 'net.Native';
785      } else if (handle instanceof dgram.Socket) {
786        message.type = 'dgram.Socket';
787      } else if (handle instanceof UDP) {
788        message.type = 'dgram.Native';
789      } else {
790        throw new ERR_INVALID_HANDLE_TYPE();
791      }
792
793      // Queue-up message and handle if we haven't received ACK yet.
794      if (this._handleQueue) {
795        this._handleQueue.push({
796          callback: callback,
797          handle: handle,
798          options: options,
799          message: message.msg,
800        });
801        return this._handleQueue.length === 1;
802      }
803
804      obj = handleConversion[message.type];
805
806      // convert TCP object to native handle object
807      handle = handleConversion[message.type].send.call(target,
808                                                        message,
809                                                        handle,
810                                                        options);
811
812      // If handle was sent twice, or it is impossible to get native handle
813      // out of it - just send a text without the handle.
814      if (!handle)
815        message = message.msg;
816
817      // Update simultaneous accepts on Windows
818      if (obj.simultaneousAccepts && process.platform === 'win32') {
819        handle.setSimultaneousAccepts(true);
820      }
821    } else if (this._handleQueue &&
822               !(message && (message.cmd === 'NODE_HANDLE_ACK' ||
823                             message.cmd === 'NODE_HANDLE_NACK'))) {
824      // Queue request anyway to avoid out-of-order messages.
825      this._handleQueue.push({
826        callback: callback,
827        handle: null,
828        options: options,
829        message: message,
830      });
831      return this._handleQueue.length === 1;
832    }
833
834    const req = new WriteWrap();
835
836    const err = writeChannelMessage(channel, req, message, handle);
837    const wasAsyncWrite = streamBaseState[kLastWriteWasAsync];
838
839    if (err === 0) {
840      if (handle) {
841        if (!this._handleQueue)
842          this._handleQueue = [];
843        if (obj && obj.postSend)
844          obj.postSend(message, handle, options, callback, target);
845      }
846
847      if (wasAsyncWrite) {
848        req.oncomplete = () => {
849          control.unrefCounted();
850          if (typeof callback === 'function')
851            callback(null);
852        };
853        control.refCounted();
854      } else if (typeof callback === 'function') {
855        process.nextTick(callback, null);
856      }
857    } else {
858      // Cleanup handle on error
859      if (obj && obj.postSend)
860        obj.postSend(message, handle, options, callback);
861
862      if (!options.swallowErrors) {
863        const ex = errnoException(err, 'write');
864        if (typeof callback === 'function') {
865          process.nextTick(callback, ex);
866        } else {
867          process.nextTick(() => this.emit('error', ex));
868        }
869      }
870    }
871
872    /* If the master is > 2 read() calls behind, please stop sending. */
873    return channel.writeQueueSize < (65536 * 2);
874  };
875
876  // Connected will be set to false immediately when a disconnect() is
877  // requested, even though the channel might still be alive internally to
878  // process queued messages. The three states are distinguished as follows:
879  // - disconnect() never requested: channel is not null and connected
880  //   is true
881  // - disconnect() requested, messages in the queue: channel is not null
882  //   and connected is false
883  // - disconnect() requested, channel actually disconnected: channel is
884  //   null and connected is false
885  target.connected = true;
886
887  target.disconnect = function() {
888    if (!this.connected) {
889      this.emit('error', new ERR_IPC_DISCONNECTED());
890      return;
891    }
892
893    // Do not allow any new messages to be written.
894    this.connected = false;
895
896    // If there are no queued messages, disconnect immediately. Otherwise,
897    // postpone the disconnect so that it happens internally after the
898    // queue is flushed.
899    if (!this._handleQueue)
900      this._disconnect();
901  };
902
903  target._disconnect = function() {
904    assert(this.channel);
905
906    // This marks the fact that the channel is actually disconnected.
907    this.channel = null;
908    this[kChannelHandle] = null;
909
910    if (this._pendingMessage)
911      closePendingHandle(this);
912
913    let fired = false;
914    function finish() {
915      if (fired) return;
916      fired = true;
917
918      channel.close();
919      target.emit('disconnect');
920    }
921
922    // If a message is being read, then wait for it to complete.
923    if (channel.buffering) {
924      this.once('message', finish);
925      this.once('internalMessage', finish);
926
927      return;
928    }
929
930    process.nextTick(finish);
931  };
932
933  function emit(event, message, handle) {
934    if ('internalMessage' === event || target.listenerCount('message')) {
935      target.emit(event, message, handle);
936      return;
937    }
938
939    ArrayPrototypePush(
940      target.channel[kPendingMessages],
941      [event, message, handle]
942    );
943  }
944
945  function handleMessage(message, handle, internal) {
946    if (!target.channel)
947      return;
948
949    const eventName = (internal ? 'internalMessage' : 'message');
950
951    process.nextTick(emit, eventName, message, handle);
952  }
953
954  channel.readStart();
955  return control;
956}
957
958const INTERNAL_PREFIX = 'NODE_';
959function isInternal(message) {
960  return (message !== null &&
961          typeof message === 'object' &&
962          typeof message.cmd === 'string' &&
963          message.cmd.length > INTERNAL_PREFIX.length &&
964          message.cmd.slice(0, INTERNAL_PREFIX.length) === INTERNAL_PREFIX);
965}
966
967function nop() { }
968
969function getValidStdio(stdio, sync) {
970  let ipc;
971  let ipcFd;
972
973  // Replace shortcut with an array
974  if (typeof stdio === 'string') {
975    stdio = stdioStringToArray(stdio);
976  } else if (!ArrayIsArray(stdio)) {
977    throw new ERR_INVALID_OPT_VALUE('stdio', stdio);
978  }
979
980  // At least 3 stdio will be created
981  // Don't concat() a new Array() because it would be sparse, and
982  // stdio.reduce() would skip the sparse elements of stdio.
983  // See https://stackoverflow.com/a/5501711/3561
984  while (stdio.length < 3) stdio.push(undefined);
985
986  // Translate stdio into C++-readable form
987  // (i.e. PipeWraps or fds)
988  stdio = stdio.reduce((acc, stdio, i) => {
989    function cleanup() {
990      for (let i = 0; i < acc.length; i++) {
991        if ((acc[i].type === 'pipe' || acc[i].type === 'ipc') && acc[i].handle)
992          acc[i].handle.close();
993      }
994    }
995
996    // Defaults
997    if (stdio == null) {
998      stdio = i < 3 ? 'pipe' : 'ignore';
999    }
1000
1001    if (stdio === 'ignore') {
1002      acc.push({ type: 'ignore' });
1003    } else if (stdio === 'pipe' || stdio === 'overlapped' ||
1004               (typeof stdio === 'number' && stdio < 0)) {
1005      const a = {
1006        type: stdio === 'overlapped' ? 'overlapped' : 'pipe',
1007        readable: i === 0,
1008        writable: i !== 0
1009      };
1010
1011      if (!sync)
1012        a.handle = new Pipe(PipeConstants.SOCKET);
1013
1014      acc.push(a);
1015    } else if (stdio === 'ipc') {
1016      if (sync || ipc !== undefined) {
1017        // Cleanup previously created pipes
1018        cleanup();
1019        if (!sync)
1020          throw new ERR_IPC_ONE_PIPE();
1021        else
1022          throw new ERR_IPC_SYNC_FORK();
1023      }
1024
1025      ipc = new Pipe(PipeConstants.IPC);
1026      ipcFd = i;
1027
1028      acc.push({
1029        type: 'pipe',
1030        handle: ipc,
1031        ipc: true
1032      });
1033    } else if (stdio === 'inherit') {
1034      acc.push({
1035        type: 'inherit',
1036        fd: i
1037      });
1038    } else if (typeof stdio === 'number' || typeof stdio.fd === 'number') {
1039      acc.push({
1040        type: 'fd',
1041        fd: typeof stdio === 'number' ? stdio : stdio.fd
1042      });
1043    } else if (getHandleWrapType(stdio) || getHandleWrapType(stdio.handle) ||
1044               getHandleWrapType(stdio._handle)) {
1045      const handle = getHandleWrapType(stdio) ?
1046        stdio :
1047        getHandleWrapType(stdio.handle) ? stdio.handle : stdio._handle;
1048
1049      acc.push({
1050        type: 'wrap',
1051        wrapType: getHandleWrapType(handle),
1052        handle: handle,
1053        _stdio: stdio
1054      });
1055    } else if (isArrayBufferView(stdio) || typeof stdio === 'string') {
1056      if (!sync) {
1057        cleanup();
1058        throw new ERR_INVALID_SYNC_FORK_INPUT(inspect(stdio));
1059      }
1060    } else {
1061      // Cleanup
1062      cleanup();
1063      throw new ERR_INVALID_OPT_VALUE('stdio', stdio);
1064    }
1065
1066    return acc;
1067  }, []);
1068
1069  return { stdio, ipc, ipcFd };
1070}
1071
1072
1073function getSocketList(type, worker, key) {
1074  const sockets = worker[kChannelHandle].sockets[type];
1075  let socketList = sockets[key];
1076  if (!socketList) {
1077    const Construct = type === 'send' ? SocketListSend : SocketListReceive;
1078    socketList = sockets[key] = new Construct(worker, key);
1079  }
1080  return socketList;
1081}
1082
1083
1084function maybeClose(subprocess) {
1085  subprocess._closesGot++;
1086
1087  if (subprocess._closesGot === subprocess._closesNeeded) {
1088    subprocess.emit('close', subprocess.exitCode, subprocess.signalCode);
1089  }
1090}
1091
1092function spawnSync(options) {
1093  const result = spawn_sync.spawn(options);
1094
1095  if (result.output && options.encoding && options.encoding !== 'buffer') {
1096    for (let i = 0; i < result.output.length; i++) {
1097      if (!result.output[i])
1098        continue;
1099      result.output[i] = result.output[i].toString(options.encoding);
1100    }
1101  }
1102
1103  result.stdout = result.output && result.output[1];
1104  result.stderr = result.output && result.output[2];
1105
1106  if (result.error) {
1107    result.error = errnoException(result.error, 'spawnSync ' + options.file);
1108    result.error.path = options.file;
1109    result.error.spawnargs = options.args.slice(1);
1110  }
1111
1112  return result;
1113}
1114
1115module.exports = {
1116  ChildProcess,
1117  kChannelHandle,
1118  setupChannel,
1119  getValidStdio,
1120  stdioStringToArray,
1121  spawnSync
1122};
1123