• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  ArrayIsArray,
5  ObjectDefineProperty,
6  ObjectSetPrototypeOf,
7  Symbol,
8  Uint8Array,
9} = primordials;
10
11const {
12  errnoException,
13  codes: {
14    ERR_INVALID_ARG_TYPE,
15    ERR_INVALID_HANDLE_TYPE,
16    ERR_INVALID_OPT_VALUE,
17    ERR_INVALID_SYNC_FORK_INPUT,
18    ERR_IPC_CHANNEL_CLOSED,
19    ERR_IPC_DISCONNECTED,
20    ERR_IPC_ONE_PIPE,
21    ERR_IPC_SYNC_FORK,
22    ERR_MISSING_ARGS
23  }
24} = require('internal/errors');
25const { validateString } = require('internal/validators');
26const EventEmitter = require('events');
27const net = require('net');
28const dgram = require('dgram');
29const inspect = require('internal/util/inspect').inspect;
30const assert = require('internal/assert');
31
32const { Process } = internalBinding('process_wrap');
33const {
34  WriteWrap,
35  kReadBytesOrError,
36  kArrayBufferOffset,
37  kLastWriteWasAsync,
38  streamBaseState
39} = internalBinding('stream_wrap');
40const { Pipe, constants: PipeConstants } = internalBinding('pipe_wrap');
41const { TCP } = internalBinding('tcp_wrap');
42const { TTY } = internalBinding('tty_wrap');
43const { UDP } = internalBinding('udp_wrap');
44const SocketList = require('internal/socket_list');
45const { owner_symbol } = require('internal/async_hooks').symbols;
46const { convertToValidSignal } = require('internal/util');
47const { isArrayBufferView } = require('internal/util/types');
48const spawn_sync = internalBinding('spawn_sync');
49const { kStateSymbol } = require('internal/dgram');
50
51const {
52  UV_EACCES,
53  UV_EAGAIN,
54  UV_EINVAL,
55  UV_EMFILE,
56  UV_ENFILE,
57  UV_ENOENT,
58  UV_ENOSYS,
59  UV_ESRCH
60} = internalBinding('uv');
61
62const { SocketListSend, SocketListReceive } = SocketList;
63
64// Lazy loaded for startup performance and to allow monkey patching of
65// internalBinding('http_parser').HTTPParser.
66let freeParser;
67let HTTPParser;
68
69const MAX_HANDLE_RETRANSMISSIONS = 3;
70const kIsUsedAsStdio = Symbol('kIsUsedAsStdio');
71
// Converters between the objects a caller passes to send() and the native
// handle objects that are written to the channel, keyed by the `type` string
// stored in NODE_HANDLE messages. Each entry provides:
//   send(message, obj, options)  - returns the native handle to transmit
//   got(message, handle, emit)   - rebuilds an object on the receiving side
//                                  and passes it to `emit`
//   postSend(...)                - optional bookkeeping after the write
//   simultaneousAccepts          - optional flag consulted on Windows
const handleConversion = {
  'net.Native': {
    simultaneousAccepts: true,

    // Raw handles are transmitted as they are.
    send(message, handle, options) {
      return handle;
    },

    got(message, handle, emit) {
      emit(handle);
    }
  },

  'net.Server': {
    simultaneousAccepts: true,

    // A server is represented by its underlying handle.
    send(message, server, options) {
      return server._handle;
    },

    // Wrap the received handle in a fresh server and hand it over once it
    // is listening.
    got(message, handle, emit) {
      const server = new net.Server();
      server.listen(handle, () => {
        emit(server);
      });
    }
  },

  'net.Socket': {
    send(message, socket, options) {
      // Nothing to transmit when the socket has no handle.
      if (!socket._handle)
        return;

      const owningServer = socket.server;

      // If the socket was created by net.Server
      if (owningServer) {
        // The worker should keep track of the socket
        message.key = owningServer._connectionKey;

        const isFirstForKey = !this.channel.sockets.send[message.key];
        const socketList = getSocketList('send', this, message.key);

        // The server should no longer expose a .connection property
        // and when asked to close it should query the socket status from
        // the workers
        if (isFirstForKey)
          owningServer._setupWorker(socketList);

        // Act like socket is detached
        if (!options.keepOpen)
          owningServer._connections--;
      }

      const nativeHandle = socket._handle;

      // With keepOpen the socket keeps its handle and nothing is released.
      if (options.keepOpen)
        return nativeHandle;

      // Remove handle from socket object, it will be closed when the socket
      // will be sent
      nativeHandle.onread = nop;
      socket._handle = null;
      socket.setTimeout(0);

      if (freeParser === undefined)
        freeParser = require('_http_common').freeParser;
      if (HTTPParser === undefined)
        HTTPParser = require('_http_common').HTTPParser;

      // In case of an HTTP connection socket, release the associated
      // resources
      const { parser } = socket;
      if (parser && parser instanceof HTTPParser) {
        freeParser(parser, null, socket);
        if (socket._httpMessage)
          socket._httpMessage.detachSocket(socket);
      }

      return nativeHandle;
    },

    postSend(message, handle, options, callback, target) {
      // Store the handle after successfully sending it, so it can be closed
      // when the NODE_HANDLE_ACK is received. If the handle could not be sent,
      // just close it.
      if (!handle || options.keepOpen)
        return;

      if (!target) {
        handle.close();
        return;
      }

      // There can only be one _pendingMessage as passing handles are
      // processed one at a time: handles are stored in _handleQueue while
      // waiting for the NODE_HANDLE_ACK of the current passing handle.
      assert(!target._pendingMessage);
      target._pendingMessage =
          { callback, message, handle, options, retransmissions: 0 };
    },

    got(message, handle, emit) {
      const socket = new net.Socket({
        handle,
        readable: true,
        writable: true
      });

      // If the socket was created by net.Server we will track the socket
      if (message.key) {
        // Add socket to connections list
        getSocketList('got', this, message.key).add({ socket });
      }

      emit(socket);
    }
  },

  'dgram.Native': {
    simultaneousAccepts: false,

    // Raw handles are transmitted as they are.
    send(message, handle, options) {
      return handle;
    },

    got(message, handle, emit) {
      emit(handle);
    }
  },

  'dgram.Socket': {
    simultaneousAccepts: false,

    // Record the socket type in the message so the receiver can construct a
    // socket of the same type.
    send(message, socket, options) {
      message.dgramType = socket.type;

      return socket[kStateSymbol].handle;
    },

    got(message, handle, emit) {
      const socket = new dgram.Socket(message.dgramType);

      socket.bind(handle, () => {
        emit(socket);
      });
    }
  }
};
220
// Expands a `stdio` shorthand string into its array form.
//   'ignore' / 'pipe' -> the same string repeated three times
//   'inherit'         -> [0, 1, 2]
// A truthy `channel` is appended as an extra entry. Any other string throws
// ERR_INVALID_OPT_VALUE.
function stdioStringToArray(stdio, channel) {
  let expanded;

  if (stdio === 'ignore' || stdio === 'pipe') {
    expanded = [stdio, stdio, stdio];
  } else if (stdio === 'inherit') {
    expanded = [0, 1, 2];
  } else {
    throw new ERR_INVALID_OPT_VALUE('stdio', stdio);
  }

  if (channel)
    expanded.push(channel);

  return expanded;
}
236
// Constructor for child process objects. Sets up the bookkeeping fields,
// creates the native Process handle and installs the handle's `onexit`
// callback, which records the exit status and emits 'exit' (or 'error' when
// the reported exit code is negative).
function ChildProcess() {
  EventEmitter.call(this);

  // 'close' is emitted by maybeClose() once _closesGot reaches
  // _closesNeeded.
  this._closesNeeded = 1;
  this._closesGot = 0;
  this.connected = false;

  this.signalCode = null;
  this.exitCode = null;
  this.killed = false;
  this.spawnfile = null;

  this._handle = new Process();
  this._handle[owner_symbol] = this;

  this._handle.onexit = (exitCode, signalCode) => {
    // Record either the terminating signal or the exit code, never both.
    if (signalCode)
      this.signalCode = signalCode;
    else
      this.exitCode = exitCode;

    if (this.stdin)
      this.stdin.destroy();

    this._handle.close();
    this._handle = null;

    if (exitCode < 0) {
      // A negative exit code is turned into an errno-style 'error' event
      // that names the file that was being spawned, when known.
      const { spawnfile } = this;
      let syscall = 'spawn';
      if (spawnfile)
        syscall = 'spawn ' + spawnfile;

      const err = errnoException(exitCode, syscall);
      if (spawnfile)
        err.path = spawnfile;

      err.spawnargs = this.spawnargs.slice(1);
      this.emit('error', err);
    } else {
      this.emit('exit', this.exitCode, this.signalCode);
    }

    // If any of the stdio streams have not been touched,
    // then pull all the data through so that it can get the
    // eof and emit a 'close' event.
    // Do it on nextTick so that the user has one last chance
    // to consume the output, if for example they only want to
    // start reading the data once the process exits.
    process.nextTick(flushStdio, this);

    maybeClose(this);
  };
}
ObjectSetPrototypeOf(ChildProcess.prototype, EventEmitter.prototype);
ObjectSetPrototypeOf(ChildProcess, EventEmitter);
292
293
// Calls resume() on every stdio stream of `subprocess` that is still
// readable, has no 'readable' listener registered and is not marked as being
// used as another process's stdio. Does nothing when `subprocess.stdio` is
// null or undefined.
function flushStdio(subprocess) {
  const streams = subprocess.stdio;

  if (streams == null) return;

  for (const stream of streams) {
    // TODO(addaleax): This doesn't necessarily account for all the ways in
    // which data can be read from a stream, e.g. being consumed on the
    // native layer directly as a StreamBase.
    const untouched = stream &&
                      stream.readable &&
                      !stream._readableState.readableListening &&
                      !stream[kIsUsedAsStdio];
    if (untouched)
      stream.resume();
  }
}
312
313
// Wraps `pipe` in a net.Socket that is either readable or writable, but
// never both: `readable` selects the readable side, otherwise the socket is
// writable.
function createSocket(pipe, readable) {
  const socketOptions = {
    handle: pipe,
    readable,
    writable: !readable
  };
  return net.Socket(socketOptions);
}
317
318
// Returns 'pipe', 'tty', 'tcp' or 'udp' when `stream` is an instance of the
// corresponding native wrap class, and false otherwise.
function getHandleWrapType(stream) {
  const wrapTypes = [
    [Pipe, 'pipe'],
    [TTY, 'tty'],
    [TCP, 'tcp'],
    [UDP, 'udp']
  ];

  for (const [WrapClass, typeName] of wrapTypes) {
    if (stream instanceof WrapClass)
      return typeName;
  }

  return false;
}
327
// Closes the handle held by `target._pendingMessage` and then clears the
// pending message. Expects `target._pendingMessage` to be set.
function closePendingHandle(target) {
  const { handle } = target._pendingMessage;
  handle.close();
  target._pendingMessage = null;
}
332
333
// Validates `options`, spawns the process through the native handle and wires
// up the stdio streams (and the IPC channel when one was requested).
//
// `options.stdio` is replaced by its normalized array form and, when an IPC
// channel is requested, NODE_CHANNEL_* entries are appended to
// `options.envPairs`.
//
// Returns the status reported by the native spawn call. The statuses EACCES,
// EAGAIN, EMFILE, ENFILE and ENOENT are reported through the handle's onexit
// callback on the next tick; any other non-zero status is thrown.
ChildProcess.prototype.spawn = function(options) {
  if (typeof options !== 'object' || options === null) {
    throw new ERR_INVALID_ARG_TYPE('options', 'Object', options);
  }

  // If no `stdio` option was given - use default
  const { stdio, ipc, ipcFd } =
    getValidStdio(options.stdio || 'pipe', false);
  options.stdio = stdio;

  const requestedMode = options.serialization;
  const modeIsValid = requestedMode === undefined ||
                      requestedMode === 'json' ||
                      requestedMode === 'advanced';
  if (!modeIsValid) {
    throw new ERR_INVALID_OPT_VALUE('options.serialization', requestedMode);
  }

  const serialization = requestedMode || 'json';

  if (ipc !== undefined) {
    // Let child process know about opened IPC channel
    if (options.envPairs === undefined) {
      options.envPairs = [];
    } else if (!ArrayIsArray(options.envPairs)) {
      throw new ERR_INVALID_ARG_TYPE('options.envPairs',
                                     'Array',
                                     options.envPairs);
    }

    options.envPairs.push(
      `NODE_CHANNEL_FD=${ipcFd}`,
      `NODE_CHANNEL_SERIALIZATION_MODE=${serialization}`
    );
  }

  validateString(options.file, 'options.file');
  this.spawnfile = options.file;

  if (options.args !== undefined && !ArrayIsArray(options.args))
    throw new ERR_INVALID_ARG_TYPE('options.args', 'Array', options.args);
  this.spawnargs = options.args === undefined ? [] : options.args;

  const err = this._handle.spawn(options);

  const outOfDescriptors = err === UV_EMFILE || err === UV_ENFILE;
  const isRuntimeError = outOfDescriptors ||
                         err === UV_EACCES ||
                         err === UV_EAGAIN ||
                         err === UV_ENOENT;

  if (isRuntimeError) {
    // Run-time errors should emit an error, not throw an exception.
    process.nextTick(onErrorNT, this, err);

    // There is no point in continuing when we've hit EMFILE or ENFILE
    // because we won't be able to set up the stdio file descriptors.
    if (outOfDescriptors)
      return err;
  } else if (err) {
    // Close all opened fds on error
    for (const stream of stdio) {
      if (stream.type === 'pipe')
        stream.handle.close();
    }

    this._handle.close();
    this._handle = null;
    throw errnoException(err, 'spawn');
  }

  this.pid = this._handle.pid;

  for (const [fd, stream] of stdio.entries()) {
    if (stream.type === 'ignore') continue;

    // The IPC pipe is counted as one more source that has to close before
    // 'close' is emitted; it does not get a socket here.
    if (stream.ipc) {
      this._closesNeeded++;
      continue;
    }

    // The stream is already cloned and piped, thus stop its readable side,
    // otherwise we might attempt to read from the stream when at the same time
    // the child process does.
    if (stream.type === 'wrap') {
      stream.handle.reading = false;
      stream.handle.readStop();
      stream._stdio.pause();
      stream._stdio.readableFlowing = false;
      stream._stdio._readableState.reading = false;
      stream._stdio[kIsUsedAsStdio] = true;
      continue;
    }

    if (!stream.handle) continue;

    // When fd === 0 - we're dealing with stdin
    // (which is the only one writable pipe).
    const isOutput = fd > 0;
    const hasPid = this.pid !== 0;
    stream.socket = createSocket(hasPid ? stream.handle : null, isOutput);

    if (isOutput && hasPid) {
      this._closesNeeded++;
      stream.socket.on('close', () => {
        maybeClose(this);
      });
    }
  }

  // Socket created for descriptor `fd`, or null when there is none.
  const socketFor = (fd) => {
    if (fd < stdio.length && stdio[fd].socket !== undefined)
      return stdio[fd].socket;
    return null;
  };

  this.stdin = socketFor(0);
  this.stdout = socketFor(1);
  this.stderr = socketFor(2);

  this.stdio = stdio.map(
    (stream) => (stream.socket === undefined ? null : stream.socket)
  );

  // Add .send() method and start listening for IPC data
  if (ipc !== undefined) setupChannel(this, ipc, serialization);

  return err;
};
467
468
// process.nextTick() callback used by spawn(): forwards the spawn error code
// to the handle's onexit callback as if it were an exit code.
function onErrorNT(self, err) {
  const handle = self._handle;
  handle.onexit(err);
}
472
473
// Sends a signal to the child process. `sig` defaults to 'SIGTERM'; the
// value 0 is passed through unchanged and anything else goes through
// convertToValidSignal().
//
// Returns true (and sets `killed`) when the native kill call reports success
// and false otherwise. EINVAL / ENOSYS results are thrown; any other failure
// except ESRCH is emitted as an 'error' event.
ChildProcess.prototype.kill = function(sig) {
  let signal;
  if (sig === 0)
    signal = sig;
  else
    signal = convertToValidSignal(sig === undefined ? 'SIGTERM' : sig);

  // Without a handle there is nothing to signal.
  if (!this._handle)
    return false;

  const err = this._handle.kill(signal);

  if (err === 0) {
    /* Success. */
    this.killed = true;
    return true;
  }

  if (err === UV_EINVAL || err === UV_ENOSYS) {
    /* The underlying platform doesn't support this signal. */
    throw errnoException(err, 'kill');
  }

  if (err !== UV_ESRCH) {
    /* Other error, almost certainly EPERM. */
    this.emit('error', errnoException(err, 'kill'));
  }

  /* Kill didn't succeed (for ESRCH the process is already dead). */
  return false;
};
500
501
// Calls ref() on the native process handle, if there still is one.
ChildProcess.prototype.ref = function() {
  const handle = this._handle;
  if (handle)
    handle.ref();
};
505
506
// Calls unref() on the native process handle, if there still is one.
ChildProcess.prototype.unref = function() {
  const handle = this._handle;
  if (handle)
    handle.unref();
};
510
// Reference counter around a channel. The channel is ref()'d when the count
// goes from 0 to 1, and unref()'d (followed by an 'unref' event) when the
// count drops back to 0.
class Control extends EventEmitter {
  constructor(channel) {
    super();
    this.channel = channel;
    this.refs = 0;
  }

  ref() {
    this.refs += 1;
    if (this.refs !== 1)
      return;
    this.channel.ref();
  }

  unref() {
    this.refs -= 1;
    if (this.refs !== 0)
      return;
    this.channel.unref();
    this.emit('unref');
  }
}
529
// Lazily loaded serialization backends, keyed by serialization mode.
let serialization;

// Attaches the IPC `channel` to `target` and starts reading from it.
//
// Installs on `target`:
//   - `channel` / `_channel` (accessor alias), `_handleQueue`,
//     `_pendingMessage` and `connected`
//   - `send()` / `_send()` for writing messages, optionally with a handle
//   - `disconnect()` / `_disconnect()` for tearing the channel down
//   - an 'internalMessage' listener implementing the NODE_HANDLE /
//     NODE_HANDLE_ACK / NODE_HANDLE_NACK handshake
//
// `serializationMode` selects the reader/writer implementation from
// 'internal/child_process/serialization'.
//
// Returns the Control object that ref-counts the channel while asynchronous
// writes are in flight.
function setupChannel(target, channel, serializationMode) {
  target.channel = channel;

  // _channel can be deprecated in version 8
  ObjectDefineProperty(target, '_channel', {
    get() { return target.channel; },
    set(val) { target.channel = val; },
    enumerable: true
  });

  // While a sent handle awaits its ACK/NACK, `_handleQueue` is an array that
  // buffers further messages; it is null when nothing is awaited.
  target._handleQueue = null;
  target._pendingMessage = null;

  const control = new Control(channel);

  if (serialization === undefined)
    serialization = require('internal/child_process/serialization');
  const {
    initMessageChannel,
    parseChannelMessages,
    writeChannelMessage
  } = serialization[serializationMode];

  // Handle received with a chunk, kept until the NODE_HANDLE message that
  // it belongs to has been parsed.
  let pendingHandle = null;
  initMessageChannel(channel);
  channel.pendingHandle = null;
  channel.onread = function(arrayBuffer) {
    const recvHandle = channel.pendingHandle;
    channel.pendingHandle = null;
    if (arrayBuffer) {
      const nread = streamBaseState[kReadBytesOrError];
      const offset = streamBaseState[kArrayBufferOffset];
      const pool = new Uint8Array(arrayBuffer, offset, nread);
      if (recvHandle)
        pendingHandle = recvHandle;

      for (const message of parseChannelMessages(channel, pool)) {
        // There will be at most one NODE_HANDLE message in every chunk we
        // read because SCM_RIGHTS messages don't get coalesced. Make sure
        // that we deliver the handle with the right message however.
        if (isInternal(message)) {
          if (message.cmd === 'NODE_HANDLE') {
            handleMessage(message, pendingHandle, true);
            pendingHandle = null;
          } else {
            handleMessage(message, undefined, true);
          }
        } else {
          handleMessage(message, undefined, false);
        }
      }
    } else {
      // No buffer was delivered: tear the channel down.
      this.buffering = false;
      target.disconnect();
      channel.onread = nop;
      channel.close();
      target.channel = null;
      maybeClose(target);
    }
  };

  // Object where socket lists will live
  channel.sockets = { got: {}, send: {} };

  // Handlers will go through this
  target.on('internalMessage', function(message, handle) {
    // Once acknowledged - continue sending handles.
    if (message.cmd === 'NODE_HANDLE_ACK' ||
        message.cmd === 'NODE_HANDLE_NACK') {

      if (target._pendingMessage) {
        if (message.cmd === 'NODE_HANDLE_ACK') {
          closePendingHandle(target);
        } else if (target._pendingMessage.retransmissions++ ===
                   MAX_HANDLE_RETRANSMISSIONS) {
          // Give up after too many NACKs.
          closePendingHandle(target);
          process.emitWarning('Handle did not reach the receiving process ' +
                              'correctly', 'SentHandleNotReceivedWarning');
        }
      }

      assert(ArrayIsArray(target._handleQueue));
      const queue = target._handleQueue;
      target._handleQueue = null;

      // A pending message that survived the block above was NACKed and is
      // retransmitted before the queued messages are flushed.
      if (target._pendingMessage) {
        target._send(target._pendingMessage.message,
                     target._pendingMessage.handle,
                     target._pendingMessage.options,
                     target._pendingMessage.callback);
      }

      for (let i = 0; i < queue.length; i++) {
        const args = queue[i];
        target._send(args.message, args.handle, args.options, args.callback);
      }

      // Process a pending disconnect (if any).
      if (!target.connected && target.channel && !target._handleQueue)
        target._disconnect();

      return;
    }

    if (message.cmd !== 'NODE_HANDLE') return;

    // It is possible that the handle is not received because of some error on
    // ancillary data reception such as MSG_CTRUNC. In this case, report the
    // sender about it by sending a NODE_HANDLE_NACK message.
    if (!handle)
      return target._send({ cmd: 'NODE_HANDLE_NACK' }, null, true);

    // Acknowledge handle receival. Don't emit error events (for example if
    // the other side has disconnected) because this call to send() is not
    // initiated by the user and it shouldn't be fatal to be unable to ACK
    // a message.
    target._send({ cmd: 'NODE_HANDLE_ACK' }, null, true);

    const obj = handleConversion[message.type];

    // Update simultaneous accepts on Windows
    if (process.platform === 'win32') {
      handle.setSimultaneousAccepts(false);
    }

    // Convert handle object
    obj.got.call(this, message, handle, (handle) => {
      handleMessage(message.msg, handle, isInternal(message.msg));
    });
  });

  // Public entry point. Accepts (message[, handle[, options]][, callback]).
  // When the channel is no longer connected, ERR_IPC_CHANNEL_CLOSED is
  // reported asynchronously through the callback or an 'error' event and
  // false is returned.
  target.send = function(message, handle, options, callback) {
    if (typeof handle === 'function') {
      callback = handle;
      handle = undefined;
      options = undefined;
    } else if (typeof options === 'function') {
      callback = options;
      options = undefined;
    } else if (options !== undefined &&
               (options === null || typeof options !== 'object')) {
      throw new ERR_INVALID_ARG_TYPE('options', 'Object', options);
    }

    options = { swallowErrors: false, ...options };

    if (this.connected) {
      return this._send(message, handle, options, callback);
    }
    const ex = new ERR_IPC_CHANNEL_CLOSED();
    if (typeof callback === 'function') {
      process.nextTick(callback, ex);
    } else {
      process.nextTick(() => this.emit('error', ex));
    }
    return false;
  };

  // Internal send. Unlike send() it also works while a disconnect is pending
  // (channel still set), and accepts a boolean `options` as shorthand for
  // `{ swallowErrors }`.
  target._send = function(message, handle, options, callback) {
    assert(this.connected || this.channel);

    if (message === undefined)
      throw new ERR_MISSING_ARGS('message');

    // Non-serializable messages should not reach the remote
    // end point; as any failure in the stringification there
    // will result in error message that is weakly consumable.
    // So perform a sanity check on message prior to sending.
    if (typeof message !== 'string' &&
        typeof message !== 'object' &&
        typeof message !== 'number' &&
        typeof message !== 'boolean') {
      throw new ERR_INVALID_ARG_TYPE(
        'message', ['string', 'object', 'number', 'boolean'], message);
    }

    // Support legacy function signature
    if (typeof options === 'boolean') {
      options = { swallowErrors: options };
    }

    let obj;

    // Package messages with a handle object
    if (handle) {
      // This message will be handled by an internalMessage event handler
      message = {
        cmd: 'NODE_HANDLE',
        type: null,
        msg: message
      };

      if (handle instanceof net.Socket) {
        message.type = 'net.Socket';
      } else if (handle instanceof net.Server) {
        message.type = 'net.Server';
      } else if (handle instanceof TCP || handle instanceof Pipe) {
        message.type = 'net.Native';
      } else if (handle instanceof dgram.Socket) {
        message.type = 'dgram.Socket';
      } else if (handle instanceof UDP) {
        message.type = 'dgram.Native';
      } else {
        throw new ERR_INVALID_HANDLE_TYPE();
      }

      // Queue-up message and handle if we haven't received ACK yet.
      if (this._handleQueue) {
        this._handleQueue.push({
          callback: callback,
          handle: handle,
          options: options,
          message: message.msg,
        });
        return this._handleQueue.length === 1;
      }

      obj = handleConversion[message.type];

      // convert TCP object to native handle object
      handle = obj.send.call(target, message, handle, options);

      // If handle was sent twice, or it is impossible to get native handle
      // out of it - just send a text without the handle.
      if (!handle)
        message = message.msg;

      // Update simultaneous accepts on Windows. The conversion above may
      // have produced no native handle (for example a server without a
      // handle), in which case there is nothing to update.
      if (handle && obj.simultaneousAccepts && process.platform === 'win32') {
        handle.setSimultaneousAccepts(true);
      }
    } else if (this._handleQueue &&
               !(message && (message.cmd === 'NODE_HANDLE_ACK' ||
                             message.cmd === 'NODE_HANDLE_NACK'))) {
      // Queue request anyway to avoid out-of-order messages.
      this._handleQueue.push({
        callback: callback,
        handle: null,
        options: options,
        message: message,
      });
      return this._handleQueue.length === 1;
    }

    const req = new WriteWrap();

    const err = writeChannelMessage(channel, req, message, handle);
    const wasAsyncWrite = streamBaseState[kLastWriteWasAsync];

    if (err === 0) {
      if (handle) {
        // Start queueing further messages until this handle is ACKed.
        if (!this._handleQueue)
          this._handleQueue = [];
        if (obj && obj.postSend)
          obj.postSend(message, handle, options, callback, target);
      }

      if (wasAsyncWrite) {
        // Keep the channel referenced until the write completes.
        req.oncomplete = () => {
          control.unref();
          if (typeof callback === 'function')
            callback(null);
        };
        control.ref();
      } else if (typeof callback === 'function') {
        process.nextTick(callback, null);
      }
    } else {
      // Cleanup handle on error
      if (obj && obj.postSend)
        obj.postSend(message, handle, options, callback);

      if (!options.swallowErrors) {
        const ex = errnoException(err, 'write');
        if (typeof callback === 'function') {
          process.nextTick(callback, ex);
        } else {
          process.nextTick(() => this.emit('error', ex));
        }
      }
    }

    /* If the master is > 2 read() calls behind, please stop sending. */
    return channel.writeQueueSize < (65536 * 2);
  };

  // Connected will be set to false immediately when a disconnect() is
  // requested, even though the channel might still be alive internally to
  // process queued messages. The three states are distinguished as follows:
  // - disconnect() never requested: channel is not null and connected
  //   is true
  // - disconnect() requested, messages in the queue: channel is not null
  //   and connected is false
  // - disconnect() requested, channel actually disconnected: channel is
  //   null and connected is false
  target.connected = true;

  target.disconnect = function() {
    if (!this.connected) {
      this.emit('error', new ERR_IPC_DISCONNECTED());
      return;
    }

    // Do not allow any new messages to be written.
    this.connected = false;

    // If there are no queued messages, disconnect immediately. Otherwise,
    // postpone the disconnect so that it happens internally after the
    // queue is flushed.
    if (!this._handleQueue)
      this._disconnect();
  };

  target._disconnect = function() {
    assert(this.channel);

    // This marks the fact that the channel is actually disconnected.
    this.channel = null;

    if (this._pendingMessage)
      closePendingHandle(this);

    // finish() may be registered for two events below; make sure it only
    // runs once.
    let fired = false;
    function finish() {
      if (fired) return;
      fired = true;

      channel.close();
      target.emit('disconnect');
    }

    // If a message is being read, then wait for it to complete.
    if (channel.buffering) {
      this.once('message', finish);
      this.once('internalMessage', finish);

      return;
    }

    process.nextTick(finish);
  };

  function emit(event, message, handle) {
    target.emit(event, message, handle);
  }

  // Schedules delivery of a parsed message on the next tick, as
  // 'internalMessage' or 'message'. Messages are dropped once the channel
  // has been cleared from the target.
  function handleMessage(message, handle, internal) {
    if (!target.channel)
      return;

    const eventName = (internal ? 'internalMessage' : 'message');

    process.nextTick(emit, eventName, message, handle);
  }

  channel.readStart();
  return control;
}
892
const INTERNAL_PREFIX = 'NODE_';

// Returns true when `message` is a non-null object whose `cmd` is a string
// that starts with INTERNAL_PREFIX and is longer than the prefix itself.
function isInternal(message) {
  if (message === null || typeof message !== 'object')
    return false;

  const { cmd } = message;
  if (typeof cmd !== 'string')
    return false;

  return cmd.length > INTERNAL_PREFIX.length &&
         cmd.startsWith(INTERNAL_PREFIX);
}
901
// Intentionally empty function, used where a callback has to be replaced by
// one that does nothing.
function nop() {}
903
// Normalizes a `stdio` option (shorthand string or array) into an array of
// descriptor objects in the form passed on to the native spawn call.
//
// `sync` is true for the synchronous spawn path: no Pipe handles are created
// for 'pipe' entries, 'ipc' is rejected, and ArrayBufferView / string entries
// are accepted (they produce no descriptor here).
//
// An array argument is padded in place with `undefined` up to three entries.
//
// Returns `{ stdio, ipc, ipcFd }` where `ipc` is the IPC Pipe (if any) and
// `ipcFd` the index it was requested at. Pipes created so far are closed
// before a validation error is thrown from inside the loop.
function getValidStdio(stdio, sync) {
  let ipc;
  let ipcFd;

  // Replace shortcut with an array
  if (typeof stdio === 'string') {
    stdio = stdioStringToArray(stdio);
  } else if (!ArrayIsArray(stdio)) {
    throw new ERR_INVALID_OPT_VALUE('stdio', stdio);
  }

  // At least 3 stdio will be created
  // Don't concat() a new Array() because it would be sparse, and
  // stdio.reduce() would skip the sparse elements of stdio.
  // See https://stackoverflow.com/a/5501711/3561
  while (stdio.length < 3) stdio.push(undefined);

  // Translate stdio into C++-readable form
  // (i.e. PipeWraps or fds)
  const descriptors = stdio.reduce((acc, requested, fd) => {
    // Closes every pipe handle created for the entries processed so far.
    const closeCreatedPipes = () => {
      for (const created of acc) {
        const isPipeLike = created.type === 'pipe' || created.type === 'ipc';
        if (isPipeLike && created.handle)
          created.handle.close();
      }
    };

    // Defaults: the first three descriptors are pipes, the rest are ignored.
    let entry = requested;
    if (entry == null)
      entry = fd < 3 ? 'pipe' : 'ignore';

    if (entry === 'ignore') {
      acc.push({ type: 'ignore' });
      return acc;
    }

    if (entry === 'pipe' || (typeof entry === 'number' && entry < 0)) {
      const pipe = {
        type: 'pipe',
        readable: fd === 0,
        writable: fd !== 0
      };

      if (!sync)
        pipe.handle = new Pipe(PipeConstants.SOCKET);

      acc.push(pipe);
      return acc;
    }

    if (entry === 'ipc') {
      if (sync || ipc !== undefined) {
        // Cleanup previously created pipes
        closeCreatedPipes();
        throw sync ? new ERR_IPC_SYNC_FORK() : new ERR_IPC_ONE_PIPE();
      }

      ipc = new Pipe(PipeConstants.IPC);
      ipcFd = fd;

      acc.push({
        type: 'pipe',
        handle: ipc,
        ipc: true
      });
      return acc;
    }

    if (entry === 'inherit') {
      acc.push({ type: 'inherit', fd });
      return acc;
    }

    if (typeof entry === 'number' || typeof entry.fd === 'number') {
      acc.push({
        type: 'fd',
        fd: typeof entry === 'number' ? entry : entry.fd
      });
      return acc;
    }

    // A native wrap, either passed directly or found on `.handle` /
    // `._handle` of the given object.
    let wrapHandle;
    if (getHandleWrapType(entry))
      wrapHandle = entry;
    else if (getHandleWrapType(entry.handle))
      wrapHandle = entry.handle;
    else if (getHandleWrapType(entry._handle))
      wrapHandle = entry._handle;

    if (wrapHandle !== undefined) {
      acc.push({
        type: 'wrap',
        wrapType: getHandleWrapType(wrapHandle),
        handle: wrapHandle,
        _stdio: entry
      });
      return acc;
    }

    if (isArrayBufferView(entry) || typeof entry === 'string') {
      if (!sync) {
        closeCreatedPipes();
        throw new ERR_INVALID_SYNC_FORK_INPUT(inspect(entry));
      }
      return acc;
    }

    // Cleanup
    closeCreatedPipes();
    throw new ERR_INVALID_OPT_VALUE('stdio', entry);
  }, []);

  return { stdio: descriptors, ipc, ipcFd };
}
1005
1006
// Returns the socket list stored under `key` in the `type` ('send' or 'got')
// table of the worker's channel, creating and storing it first when there is
// none yet.
function getSocketList(type, worker, key) {
  const table = worker.channel.sockets[type];
  const existing = table[key];
  if (existing)
    return existing;

  const Construct = type === 'send' ? SocketListSend : SocketListReceive;
  const created = new Construct(worker, key);
  table[key] = created;
  return created;
}
1016
1017
// Records one more close notification for `subprocess` and emits 'close'
// (with the exit code and signal code) when the number received equals the
// number needed.
function maybeClose(subprocess) {
  subprocess._closesGot += 1;

  if (subprocess._closesGot !== subprocess._closesNeeded)
    return;

  subprocess.emit('close', subprocess.exitCode, subprocess.signalCode);
}
1025
// Runs the native synchronous spawn with `opts.options` and post-processes
// the result:
//   - output chunks are converted to strings when `options.encoding` is set
//     to something other than 'buffer'
//   - `stdout` / `stderr` are taken from output entries 1 and 2
//   - a reported error code is replaced by an errno-style exception carrying
//     `path` and `spawnargs`
function spawnSync(opts) {
  const { options } = opts;
  const result = spawn_sync.spawn(options);
  const { output } = result;

  const shouldDecode = output &&
                       options.encoding &&
                       options.encoding !== 'buffer';
  if (shouldDecode) {
    output.forEach((chunk, fd) => {
      // Empty slots and falsy entries are left as they are.
      if (chunk)
        output[fd] = chunk.toString(options.encoding);
    });
  }

  result.stdout = result.output && result.output[1];
  result.stderr = result.output && result.output[2];

  if (result.error) {
    const err = errnoException(result.error, 'spawnSync ' + opts.file);
    result.error = err;
    err.path = opts.file;
    err.spawnargs = opts.args.slice(1);
  }

  return result;
}
1049
1050module.exports = {
1051  ChildProcess,
1052  setupChannel,
1053  getValidStdio,
1054  stdioStringToArray,
1055  spawnSync
1056};
1057