• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  ArrayIsArray,
5  ObjectDefineProperty,
6  ObjectSetPrototypeOf,
7  Symbol,
8  Uint8Array,
9} = primordials;
10
11const {
12  errnoException,
13  codes: {
14    ERR_INVALID_ARG_TYPE,
15    ERR_INVALID_HANDLE_TYPE,
16    ERR_INVALID_OPT_VALUE,
17    ERR_INVALID_SYNC_FORK_INPUT,
18    ERR_IPC_CHANNEL_CLOSED,
19    ERR_IPC_DISCONNECTED,
20    ERR_IPC_ONE_PIPE,
21    ERR_IPC_SYNC_FORK,
22    ERR_MISSING_ARGS
23  }
24} = require('internal/errors');
25const {
26  validateArray,
27  validateOneOf,
28  validateString,
29} = require('internal/validators');
30const EventEmitter = require('events');
31const net = require('net');
32const dgram = require('dgram');
33const inspect = require('internal/util/inspect').inspect;
34const assert = require('internal/assert');
35
36const { Process } = internalBinding('process_wrap');
37const {
38  WriteWrap,
39  kReadBytesOrError,
40  kArrayBufferOffset,
41  kLastWriteWasAsync,
42  streamBaseState
43} = internalBinding('stream_wrap');
44const { Pipe, constants: PipeConstants } = internalBinding('pipe_wrap');
45const { TCP } = internalBinding('tcp_wrap');
46const { TTY } = internalBinding('tty_wrap');
47const { UDP } = internalBinding('udp_wrap');
48const SocketList = require('internal/socket_list');
49const { owner_symbol } = require('internal/async_hooks').symbols;
50const { convertToValidSignal, deprecate } = require('internal/util');
51const { isArrayBufferView } = require('internal/util/types');
52const spawn_sync = internalBinding('spawn_sync');
53const { kStateSymbol } = require('internal/dgram');
54
55const {
56  UV_EACCES,
57  UV_EAGAIN,
58  UV_EINVAL,
59  UV_EMFILE,
60  UV_ENFILE,
61  UV_ENOENT,
62  UV_ENOSYS,
63  UV_ESRCH
64} = internalBinding('uv');
65
66const { SocketListSend, SocketListReceive } = SocketList;
67
// Lazy loaded for startup performance and to allow monkey patching of
// internalBinding('http_parser').HTTPParser.
let freeParser;
let HTTPParser;

// Maximum number of times a pending handle is re-sent after the receiver
// answers with NODE_HANDLE_NACK before it is closed and a warning is emitted.
const MAX_HANDLE_RETRANSMISSIONS = 3;
// Key under which setupChannel() stores the raw IPC channel handle on the
// target object.
const kChannelHandle = Symbol('kChannelHandle');
// Set on a stream that was passed to a child as stdio so that flushStdio()
// leaves it alone.
const kIsUsedAsStdio = Symbol('kIsUsedAsStdio');
76
// This object contains functions to convert TCP objects to native handle
// objects and back again.
//
// Each entry is keyed by the `type` carried in a NODE_HANDLE message and may
// provide:
//   - simultaneousAccepts: read by _send() to decide whether simultaneous
//     accepts are enabled on the native handle on Windows before sending.
//   - send(message, obj, options): invoked with `this` set to the IPC target;
//     returns the native handle to transmit (or nothing if there is none).
//   - postSend(message, handle, options, callback, target): invoked after the
//     write attempt; `target` is only supplied when the write succeeded.
//   - got(message, handle, emit): invoked on the receiving side; wraps the
//     native handle and hands the resulting object to `emit`.
const handleConversion = {
  'net.Native': {
    simultaneousAccepts: true,

    send(message, handle, options) {
      return handle;
    },

    got(message, handle, emit) {
      emit(handle);
    }
  },

  'net.Server': {
    simultaneousAccepts: true,

    send(message, server, options) {
      return server._handle;
    },

    got(message, handle, emit) {
      const server = new net.Server();
      // Only hand the server to the user once it is listening on the handle.
      server.listen(handle, () => {
        emit(server);
      });
    }
  },

  'net.Socket': {
    send(message, socket, options) {
      // Nothing to transfer; the caller then sends the bare message.
      if (!socket._handle)
        return;

      // If the socket was created by net.Server
      if (socket.server) {
        // The worker should keep track of the socket
        message.key = socket.server._connectionKey;

        const firstTime = !this[kChannelHandle].sockets.send[message.key];
        const socketList = getSocketList('send', this, message.key);

        // The server should no longer expose a .connection property
        // and when asked to close it should query the socket status from
        // the workers
        if (firstTime) socket.server._setupWorker(socketList);

        // Act like socket is detached
        if (!options.keepOpen)
          socket.server._connections--;
      }

      const handle = socket._handle;

      // Remove handle from socket object, it will be closed when the socket
      // will be sent
      if (!options.keepOpen) {
        handle.onread = nop;
        socket._handle = null;
        socket.setTimeout(0);

        if (freeParser === undefined)
          freeParser = require('_http_common').freeParser;
        if (HTTPParser === undefined)
          HTTPParser = require('_http_common').HTTPParser;

        // In case of an HTTP connection socket, release the associated
        // resources
        if (socket.parser && socket.parser instanceof HTTPParser) {
          freeParser(socket.parser, null, socket);
          if (socket._httpMessage)
            socket._httpMessage.detachSocket(socket);
        }
      }

      return handle;
    },

    postSend(message, handle, options, callback, target) {
      // Store the handle after successfully sending it, so it can be closed
      // when the NODE_HANDLE_ACK is received. If the handle could not be sent,
      // just close it.
      if (handle && !options.keepOpen) {
        if (target) {
          // There can only be one _pendingMessage as passing handles are
          // processed one at a time: handles are stored in _handleQueue while
          // waiting for the NODE_HANDLE_ACK of the current passing handle.
          assert(!target._pendingMessage);
          target._pendingMessage =
              { callback, message, handle, options, retransmissions: 0 };
        } else {
          handle.close();
        }
      }
    },

    got(message, handle, emit) {
      const socket = new net.Socket({
        handle: handle,
        readable: true,
        writable: true
      });

      // If the socket was created by net.Server we will track the socket
      if (message.key) {

        // Add socket to connections list
        const socketList = getSocketList('got', this, message.key);
        socketList.add({
          socket: socket
        });
      }

      emit(socket);
    }
  },

  'dgram.Native': {
    simultaneousAccepts: false,

    send(message, handle, options) {
      return handle;
    },

    got(message, handle, emit) {
      emit(handle);
    }
  },

  'dgram.Socket': {
    simultaneousAccepts: false,

    send(message, socket, options) {
      // The receiver needs the socket type to recreate the dgram.Socket.
      message.dgramType = socket.type;

      return socket[kStateSymbol].handle;
    },

    got(message, handle, emit) {
      const socket = new dgram.Socket(message.dgramType);

      socket.bind(handle, () => {
        emit(socket);
      });
    }
  }
};
225
// Expands a stdio shorthand string into the equivalent per-fd array.
//   'ignore' | 'overlapped' | 'pipe' -> the same value for fds 0, 1 and 2
//   'inherit'                        -> [0, 1, 2]
// A truthy `channel` is appended as an extra entry after the three standard
// fds. Throws ERR_INVALID_OPT_VALUE for any other `stdio` value.
function stdioStringToArray(stdio, channel) {
  const options = [];

  switch (stdio) {
    case 'ignore':
    case 'overlapped':
    case 'pipe': options.push(stdio, stdio, stdio); break;
    case 'inherit': options.push(0, 1, 2); break;
    default:
      throw new ERR_INVALID_OPT_VALUE('stdio', stdio);
  }

  if (channel) options.push(channel);

  return options;
}
242
// Constructor for the object representing a spawned child process.
// Initializes the bookkeeping fields, creates the native Process handle and
// installs its `onexit` callback.
function ChildProcess() {
  EventEmitter.call(this);

  // 'close' is emitted by maybeClose() once _closesGot reaches _closesNeeded.
  // The initial 1 accounts for the process exit itself.
  this._closesNeeded = 1;
  this._closesGot = 0;
  this.connected = false;

  this.signalCode = null;
  this.exitCode = null;
  this.killed = false;
  this.spawnfile = null;

  this._handle = new Process();
  this._handle[owner_symbol] = this;

  this._handle.onexit = (exitCode, signalCode) => {
    // Only one of signalCode / exitCode is recorded.
    if (signalCode) {
      this.signalCode = signalCode;
    } else {
      this.exitCode = exitCode;
    }

    if (this.stdin) {
      this.stdin.destroy();
    }

    this._handle.close();
    this._handle = null;

    // A negative exit code is an error number (see onErrorNT(), which calls
    // this callback with the spawn error) and is reported as 'error'.
    if (exitCode < 0) {
      const syscall = this.spawnfile ? 'spawn ' + this.spawnfile : 'spawn';
      const err = errnoException(exitCode, syscall);

      if (this.spawnfile)
        err.path = this.spawnfile;

      err.spawnargs = this.spawnargs.slice(1);
      this.emit('error', err);
    } else {
      this.emit('exit', this.exitCode, this.signalCode);
    }

    // If any of the stdio streams have not been touched,
    // then pull all the data through so that it can get the
    // eof and emit a 'close' event.
    // Do it on nextTick so that the user has one last chance
    // to consume the output, if for example they only want to
    // start reading the data once the process exits.
    process.nextTick(flushStdio, this);

    maybeClose(this);
  };
}
// Make ChildProcess inherit from EventEmitter, both for instances and for
// static members.
ObjectSetPrototypeOf(ChildProcess.prototype, EventEmitter.prototype);
ObjectSetPrototypeOf(ChildProcess, EventEmitter);
298
299
// Resumes every readable stdio stream of `subprocess`, except streams marked
// with kIsUsedAsStdio. Does nothing when `subprocess.stdio` is null or
// undefined.
function flushStdio(subprocess) {
  const stdio = subprocess.stdio;

  if (stdio == null) return;

  for (let i = 0; i < stdio.length; i++) {
    const stream = stdio[i];
    // TODO(addaleax): This doesn't necessarily account for all the ways in
    // which data can be read from a stream, e.g. being consumed on the
    // native layer directly as a StreamBase.
    if (!stream || !stream.readable || stream[kIsUsedAsStdio]) {
      continue;
    }
    stream.resume();
  }
}
316
317
// Wraps a native pipe handle in a net.Socket that is either readable or
// writable, never both. Constructed with `new` so it does not depend on
// net.Socket being callable as a plain function.
function createSocket(pipe, readable) {
  return new net.Socket({ handle: pipe, readable, writable: !readable });
}
321
322
// Returns 'pipe', 'tty', 'tcp' or 'udp' when `stream` is an instance of the
// corresponding native wrap class, and `false` otherwise.
function getHandleWrapType(stream) {
  if (stream instanceof Pipe) return 'pipe';
  if (stream instanceof TTY) return 'tty';
  if (stream instanceof TCP) return 'tcp';
  if (stream instanceof UDP) return 'udp';

  return false;
}
331
// Closes the handle of the message currently awaiting acknowledgement and
// clears `target._pendingMessage`. The caller must ensure a pending message
// exists.
function closePendingHandle(target) {
  target._pendingMessage.handle.close();
  target._pendingMessage = null;
}
336
337
// Spawns the child described by `options` on this instance's native Process
// handle, then wires up the stdio streams and the optional IPC channel.
//
// `options` must be an object and is mutated: `options.stdio` is replaced by
// the normalized stdio descriptors and, when an IPC channel is requested,
// NODE_CHANNEL_FD and NODE_CHANNEL_SERIALIZATION_MODE are appended to
// `options.envPairs`.
//
// Returns the value returned by the native spawn() call (falsy on success).
// Throws on invalid arguments and on spawn errors other than EACCES, EAGAIN,
// EMFILE, ENFILE and ENOENT; those five are reported asynchronously through
// onErrorNT() instead.
ChildProcess.prototype.spawn = function(options) {
  let i = 0;

  if (options === null || typeof options !== 'object') {
    throw new ERR_INVALID_ARG_TYPE('options', 'Object', options);
  }

  // If no `stdio` option was given - use default
  let stdio = options.stdio || 'pipe';

  stdio = getValidStdio(stdio, false);

  const ipc = stdio.ipc;
  const ipcFd = stdio.ipcFd;
  stdio = options.stdio = stdio.stdio;


  validateOneOf(options.serialization, 'options.serialization',
                [undefined, 'json', 'advanced'], true);
  const serialization = options.serialization || 'json';

  if (ipc !== undefined) {
    // Let child process know about opened IPC channel
    if (options.envPairs === undefined)
      options.envPairs = [];
    else
      validateArray(options.envPairs, 'options.envPairs');

    options.envPairs.push(`NODE_CHANNEL_FD=${ipcFd}`);
    options.envPairs.push(`NODE_CHANNEL_SERIALIZATION_MODE=${serialization}`);
  }

  validateString(options.file, 'options.file');
  this.spawnfile = options.file;

  if (ArrayIsArray(options.args))
    this.spawnargs = options.args;
  else if (options.args === undefined)
    this.spawnargs = [];
  else
    throw new ERR_INVALID_ARG_TYPE('options.args', 'Array', options.args);

  const err = this._handle.spawn(options);

  // Run-time errors should emit an error, not throw an exception.
  if (err === UV_EACCES ||
      err === UV_EAGAIN ||
      err === UV_EMFILE ||
      err === UV_ENFILE ||
      err === UV_ENOENT) {
    process.nextTick(onErrorNT, this, err);

    // There is no point in continuing when we've hit EMFILE or ENFILE
    // because we won't be able to set up the stdio file descriptors.
    if (err === UV_EMFILE || err === UV_ENFILE)
      return err;
  } else if (err) {
    // Close all opened fds on error. getValidStdio() creates a Pipe handle
    // for both 'pipe' and 'overlapped' entries, so both must be released.
    for (i = 0; i < stdio.length; i++) {
      const stream = stdio[i];
      if (stream.handle &&
          (stream.type === 'pipe' || stream.type === 'overlapped')) {
        stream.handle.close();
      }
    }

    this._handle.close();
    this._handle = null;
    throw errnoException(err, 'spawn');
  } else {
    process.nextTick(onSpawnNT, this);
  }

  this.pid = this._handle.pid;

  for (i = 0; i < stdio.length; i++) {
    const stream = stdio[i];
    if (stream.type === 'ignore') continue;

    // The IPC channel counts towards 'close'; it is set up by setupChannel()
    // below rather than being exposed as a socket.
    if (stream.ipc) {
      this._closesNeeded++;
      continue;
    }

    // The stream is already cloned and piped, thus stop its readable side,
    // otherwise we might attempt to read from the stream when at the same time
    // the child process does.
    if (stream.type === 'wrap') {
      stream.handle.reading = false;
      stream.handle.readStop();
      stream._stdio.pause();
      stream._stdio.readableFlowing = false;
      stream._stdio._readableState.reading = false;
      stream._stdio[kIsUsedAsStdio] = true;
      continue;
    }

    if (stream.handle) {
      // When i === 0 - we're dealing with stdin
      // (which is the only one writable pipe).
      stream.socket = createSocket(this.pid !== 0 ?
        stream.handle : null, i > 0);

      if (i > 0 && this.pid !== 0) {
        this._closesNeeded++;
        stream.socket.on('close', () => {
          maybeClose(this);
        });
      }
    }
  }

  this.stdin = stdio.length >= 1 && stdio[0].socket !== undefined ?
    stdio[0].socket : null;
  this.stdout = stdio.length >= 2 && stdio[1].socket !== undefined ?
    stdio[1].socket : null;
  this.stderr = stdio.length >= 3 && stdio[2].socket !== undefined ?
    stdio[2].socket : null;

  this.stdio = [];

  // Entries without a socket (ignored, inherited, fd, wrap, ipc) are exposed
  // as null.
  for (i = 0; i < stdio.length; i++)
    this.stdio.push(stdio[i].socket === undefined ? null : stdio[i].socket);

  // Add .send() method and start listening for IPC data
  if (ipc !== undefined) setupChannel(this, ipc, serialization);

  return err;
};
466
467
// process.nextTick() callback used by spawn(): reports a spawn error by
// invoking the handle's onexit callback with the error code as exit code.
function onErrorNT(self, err) {
  self._handle.onexit(err);
}
471
472
// process.nextTick() callback used by spawn(): emits 'spawn' on the child.
function onSpawnNT(self) {
  self.emit('spawn');
}
476
477
// Sends a signal to the child. `sig` defaults to 'SIGTERM'; 0 is passed
// through unchanged, anything else goes through convertToValidSignal().
// Returns true (and sets `killed`) when the native kill() reports success,
// false otherwise. Throws for EINVAL/ENOSYS; other errors except ESRCH are
// emitted as 'error'.
ChildProcess.prototype.kill = function(sig) {

  const signal = sig === 0 ? sig :
    convertToValidSignal(sig === undefined ? 'SIGTERM' : sig);

  if (this._handle) {
    const err = this._handle.kill(signal);
    if (err === 0) {
      /* Success. */
      this.killed = true;
      return true;
    }
    if (err === UV_ESRCH) {
      /* Already dead. */
    } else if (err === UV_EINVAL || err === UV_ENOSYS) {
      /* The underlying platform doesn't support this signal. */
      throw errnoException(err, 'kill');
    } else {
      /* Other error, almost certainly EPERM. */
      this.emit('error', errnoException(err, 'kill'));
    }
  }

  /* Kill didn't succeed. */
  return false;
};
504
505
// Forwards to ref() on the native process handle, if it still exists.
ChildProcess.prototype.ref = function() {
  if (this._handle) this._handle.ref();
};
509
510
// Forwards to unref() on the native process handle, if it still exists.
ChildProcess.prototype.unref = function() {
  if (this._handle) this._handle.unref();
};
514
// Wrapper around the raw IPC channel handle that is exposed as
// `target.channel`. It combines counter-based ref/unref (used internally)
// with explicit ref()/unref() calls that override the counter.
class Control extends EventEmitter {
  #channel = null;
  #refs = 0;
  #refExplicitlySet = false;

  constructor(channel) {
    super();
    this.#channel = channel;
  }

  // The methods keeping track of the counter are being used to track the
  // listener count on the child process object as well as when writes are
  // in progress. Once the user has explicitly requested a certain state, these
  // methods become no-ops in order to not interfere with the user's intentions.
  // (The counter itself is still updated in that case.)
  refCounted() {
    if (++this.#refs === 1 && !this.#refExplicitlySet) {
      this.#channel.ref();
    }
  }

  // Emits 'unref' when the counter drops back to zero and the channel is
  // unref'ed as a result.
  unrefCounted() {
    if (--this.#refs === 0 && !this.#refExplicitlySet) {
      this.#channel.unref();
      this.emit('unref');
    }
  }

  // Explicit request: always applied, and disables the counted variants.
  ref() {
    this.#refExplicitlySet = true;
    this.#channel.ref();
  }

  // Explicit request: always applied, and disables the counted variants.
  unref() {
    this.#refExplicitlySet = true;
    this.#channel.unref();
  }

  // The `fd` of the underlying channel, or undefined without a channel.
  get fd() {
    return this.#channel ? this.#channel.fd : undefined;
  }
}
556
// Message used by the deprecated `_channel` accessor that setupChannel()
// defines on the target.
const channelDeprecationMsg = '_channel is deprecated. ' +
                              'Use ChildProcess.channel instead.';

// Serialization module, required on first use by setupChannel().
let serialization;
// Turns `target` into an IPC endpoint on top of the native `channel` handle.
//
// Installs `channel` (a Control wrapper), the deprecated `_channel` accessor,
// `send()`, `_send()`, `disconnect()` and `_disconnect()` on `target`, hooks
// `channel.onread` to parse and dispatch incoming messages, starts reading
// and returns the Control instance. `serializationMode` selects which entry
// of the serialization module is used to read and write messages.
function setupChannel(target, channel, serializationMode) {
  const control = new Control(channel);
  target.channel = control;
  target[kChannelHandle] = channel;

  ObjectDefineProperty(target, '_channel', {
    get: deprecate(() => {
      return target.channel;
    }, channelDeprecationMsg, 'DEP0129'),
    set: deprecate((val) => {
      target.channel = val;
    }, channelDeprecationMsg, 'DEP0129'),
    configurable: true,
    enumerable: false
  });

  // _handleQueue is non-null while a sent handle awaits its ACK/NACK; every
  // message sent in the meantime is queued there to preserve ordering.
  target._handleQueue = null;
  target._pendingMessage = null;

  if (serialization === undefined)
    serialization = require('internal/child_process/serialization');
  const {
    initMessageChannel,
    parseChannelMessages,
    writeChannelMessage
  } = serialization[serializationMode];

  let pendingHandle = null;
  initMessageChannel(channel);
  channel.pendingHandle = null;
  channel.onread = function(arrayBuffer) {
    const recvHandle = channel.pendingHandle;
    channel.pendingHandle = null;
    if (arrayBuffer) {
      const nread = streamBaseState[kReadBytesOrError];
      const offset = streamBaseState[kArrayBufferOffset];
      const pool = new Uint8Array(arrayBuffer, offset, nread);
      if (recvHandle)
        pendingHandle = recvHandle;

      for (const message of parseChannelMessages(channel, pool)) {
        // There will be at most one NODE_HANDLE message in every chunk we
        // read because SCM_RIGHTS messages don't get coalesced. Make sure
        // that we deliver the handle with the right message however.
        if (isInternal(message)) {
          if (message.cmd === 'NODE_HANDLE') {
            handleMessage(message, pendingHandle, true);
            pendingHandle = null;
          } else {
            handleMessage(message, undefined, true);
          }
        } else {
          handleMessage(message, undefined, false);
        }
      }
    } else {
      // No buffer was delivered: tear the channel down.
      this.buffering = false;
      target.disconnect();
      channel.onread = nop;
      channel.close();
      target.channel = null;
      maybeClose(target);
    }
  };

  // Object where socket lists will live
  channel.sockets = { got: {}, send: {} };

  // Handlers will go through this
  target.on('internalMessage', function(message, handle) {
    // Once acknowledged - continue sending handles.
    if (message.cmd === 'NODE_HANDLE_ACK' ||
        message.cmd === 'NODE_HANDLE_NACK') {

      if (target._pendingMessage) {
        if (message.cmd === 'NODE_HANDLE_ACK') {
          closePendingHandle(target);
        } else if (target._pendingMessage.retransmissions++ ===
                   MAX_HANDLE_RETRANSMISSIONS) {
          closePendingHandle(target);
          process.emitWarning('Handle did not reach the receiving process ' +
                              'correctly', 'SentHandleNotReceivedWarning');
        }
      }

      assert(ArrayIsArray(target._handleQueue));
      const queue = target._handleQueue;
      target._handleQueue = null;

      // Still pending here means a NACK below the retry limit: send again.
      if (target._pendingMessage) {
        target._send(target._pendingMessage.message,
                     target._pendingMessage.handle,
                     target._pendingMessage.options,
                     target._pendingMessage.callback);
      }

      for (let i = 0; i < queue.length; i++) {
        const args = queue[i];
        target._send(args.message, args.handle, args.options, args.callback);
      }

      // Process a pending disconnect (if any).
      if (!target.connected && target.channel && !target._handleQueue)
        target._disconnect();

      return;
    }

    if (message.cmd !== 'NODE_HANDLE') return;

    // It is possible that the handle is not received because of some error on
    // ancillary data reception such as MSG_CTRUNC. In this case, report the
    // sender about it by sending a NODE_HANDLE_NACK message.
    if (!handle)
      return target._send({ cmd: 'NODE_HANDLE_NACK' }, null, true);

    // Acknowledge handle receival. Don't emit error events (for example if
    // the other side has disconnected) because this call to send() is not
    // initiated by the user and it shouldn't be fatal to be unable to ACK
    // a message.
    target._send({ cmd: 'NODE_HANDLE_ACK' }, null, true);

    const obj = handleConversion[message.type];

    // Update simultaneous accepts on Windows
    if (process.platform === 'win32') {
      handle.setSimultaneousAccepts(false);
    }

    // Convert handle object
    obj.got.call(this, message, handle, (handle) => {
      handleMessage(message.msg, handle, isInternal(message.msg));
    });
  });

  // Public entry point: normalizes the optional arguments, then delegates to
  // _send() while connected; otherwise reports ERR_IPC_CHANNEL_CLOSED
  // asynchronously and returns false.
  target.send = function(message, handle, options, callback) {
    if (typeof handle === 'function') {
      callback = handle;
      handle = undefined;
      options = undefined;
    } else if (typeof options === 'function') {
      callback = options;
      options = undefined;
    } else if (options !== undefined &&
               (options === null || typeof options !== 'object')) {
      throw new ERR_INVALID_ARG_TYPE('options', 'Object', options);
    }

    options = { swallowErrors: false, ...options };

    if (this.connected) {
      return this._send(message, handle, options, callback);
    }
    const ex = new ERR_IPC_CHANNEL_CLOSED();
    if (typeof callback === 'function') {
      process.nextTick(callback, ex);
    } else {
      process.nextTick(() => this.emit('error', ex));
    }
    return false;
  };

  // Internal send: validates the message, packages an optional handle into a
  // NODE_HANDLE message, queues while a handle is awaiting acknowledgement,
  // and otherwise writes to the channel. The boolean return value tells the
  // caller whether it may keep sending.
  target._send = function(message, handle, options, callback) {
    assert(this.connected || this.channel);

    if (message === undefined)
      throw new ERR_MISSING_ARGS('message');

    // Non-serializable messages should not reach the remote
    // end point; as any failure in the stringification there
    // will result in error message that is weakly consumable.
    // So perform a final check on message prior to sending.
    if (typeof message !== 'string' &&
        typeof message !== 'object' &&
        typeof message !== 'number' &&
        typeof message !== 'boolean') {
      throw new ERR_INVALID_ARG_TYPE(
        'message', ['string', 'object', 'number', 'boolean'], message);
    }

    // Support legacy function signature
    if (typeof options === 'boolean') {
      options = { swallowErrors: options };
    }

    let obj;

    // Package messages with a handle object
    if (handle) {
      // This message will be handled by an internalMessage event handler
      message = {
        cmd: 'NODE_HANDLE',
        type: null,
        msg: message
      };

      if (handle instanceof net.Socket) {
        message.type = 'net.Socket';
      } else if (handle instanceof net.Server) {
        message.type = 'net.Server';
      } else if (handle instanceof TCP || handle instanceof Pipe) {
        message.type = 'net.Native';
      } else if (handle instanceof dgram.Socket) {
        message.type = 'dgram.Socket';
      } else if (handle instanceof UDP) {
        message.type = 'dgram.Native';
      } else {
        throw new ERR_INVALID_HANDLE_TYPE();
      }

      // Queue-up message and handle if we haven't received ACK yet.
      if (this._handleQueue) {
        this._handleQueue.push({
          callback: callback,
          handle: handle,
          options: options,
          message: message.msg,
        });
        return this._handleQueue.length === 1;
      }

      obj = handleConversion[message.type];

      // convert TCP object to native handle object
      handle = handleConversion[message.type].send.call(target,
                                                        message,
                                                        handle,
                                                        options);

      // If handle was sent twice, or it is impossible to get native handle
      // out of it - just send a text without the handle.
      if (!handle)
        message = message.msg;

      // Update simultaneous accepts on Windows. The conversion above may
      // have produced no native handle (e.g. a server without a `_handle`),
      // in which case there is nothing to configure.
      if (handle && obj.simultaneousAccepts && process.platform === 'win32') {
        handle.setSimultaneousAccepts(true);
      }
    } else if (this._handleQueue &&
               !(message && (message.cmd === 'NODE_HANDLE_ACK' ||
                             message.cmd === 'NODE_HANDLE_NACK'))) {
      // Queue request anyway to avoid out-of-order messages.
      this._handleQueue.push({
        callback: callback,
        handle: null,
        options: options,
        message: message,
      });
      return this._handleQueue.length === 1;
    }

    const req = new WriteWrap();

    const err = writeChannelMessage(channel, req, message, handle);
    const wasAsyncWrite = streamBaseState[kLastWriteWasAsync];

    if (err === 0) {
      if (handle) {
        // Start queueing: nothing else is written until the ACK/NACK arrives.
        if (!this._handleQueue)
          this._handleQueue = [];
        if (obj && obj.postSend)
          obj.postSend(message, handle, options, callback, target);
      }

      if (wasAsyncWrite) {
        // Hold a counted ref on the channel until the write completes.
        req.oncomplete = () => {
          control.unrefCounted();
          if (typeof callback === 'function')
            callback(null);
        };
        control.refCounted();
      } else if (typeof callback === 'function') {
        process.nextTick(callback, null);
      }
    } else {
      // Cleanup handle on error
      if (obj && obj.postSend)
        obj.postSend(message, handle, options, callback);

      if (!options.swallowErrors) {
        const ex = errnoException(err, 'write');
        if (typeof callback === 'function') {
          process.nextTick(callback, ex);
        } else {
          process.nextTick(() => this.emit('error', ex));
        }
      }
    }

    /* If the master is > 2 read() calls behind, please stop sending. */
    return channel.writeQueueSize < (65536 * 2);
  };

  // Connected will be set to false immediately when a disconnect() is
  // requested, even though the channel might still be alive internally to
  // process queued messages. The three states are distinguished as follows:
  // - disconnect() never requested: channel is not null and connected
  //   is true
  // - disconnect() requested, messages in the queue: channel is not null
  //   and connected is false
  // - disconnect() requested, channel actually disconnected: channel is
  //   null and connected is false
  target.connected = true;

  target.disconnect = function() {
    if (!this.connected) {
      this.emit('error', new ERR_IPC_DISCONNECTED());
      return;
    }

    // Do not allow any new messages to be written.
    this.connected = false;

    // If there are no queued messages, disconnect immediately. Otherwise,
    // postpone the disconnect so that it happens internally after the
    // queue is flushed.
    if (!this._handleQueue)
      this._disconnect();
  };

  target._disconnect = function() {
    assert(this.channel);

    // This marks the fact that the channel is actually disconnected.
    this.channel = null;
    this[kChannelHandle] = null;

    if (this._pendingMessage)
      closePendingHandle(this);

    // finish() may be registered on two events below; run it only once.
    let fired = false;
    function finish() {
      if (fired) return;
      fired = true;

      channel.close();
      target.emit('disconnect');
    }

    // If a message is being read, then wait for it to complete.
    if (channel.buffering) {
      this.once('message', finish);
      this.once('internalMessage', finish);

      return;
    }

    process.nextTick(finish);
  };

  function emit(event, message, handle) {
    target.emit(event, message, handle);
  }

  // Emits 'message' or 'internalMessage' on the next tick; messages arriving
  // after the channel has been disconnected are dropped.
  function handleMessage(message, handle, internal) {
    if (!target.channel)
      return;

    const eventName = (internal ? 'internalMessage' : 'message');

    process.nextTick(emit, eventName, message, handle);
  }

  channel.readStart();
  return control;
}
927
// Messages whose `cmd` starts with this prefix (and has at least one more
// character) are treated as internal.
const INTERNAL_PREFIX = 'NODE_';
// Returns true when `message` is a non-null object whose string `cmd` is
// longer than INTERNAL_PREFIX and begins with it.
function isInternal(message) {
  return (message !== null &&
          typeof message === 'object' &&
          typeof message.cmd === 'string' &&
          message.cmd.length > INTERNAL_PREFIX.length &&
          message.cmd.slice(0, INTERNAL_PREFIX.length) === INTERNAL_PREFIX);
}
936
// Does nothing; used to replace `onread` callbacks on handles.
function nop() { }
938
// Normalizes the user-facing `stdio` option into the descriptor objects
// consumed by the native layer.
//
// `stdio` is either a shorthand string (see stdioStringToArray()) or an
// array; an array is padded in place with `undefined` up to three entries.
// `sync` is true for synchronous spawns: no Pipe handles are created and
// 'ipc' is rejected.
//
// Returns `{ stdio, ipc, ipcFd }`, where `stdio` is the descriptor array,
// `ipc` is the IPC Pipe (if any) and `ipcFd` its index. Throws
// ERR_INVALID_OPT_VALUE, ERR_IPC_ONE_PIPE, ERR_IPC_SYNC_FORK or
// ERR_INVALID_SYNC_FORK_INPUT on invalid input, after closing every Pipe
// that was created here.
function getValidStdio(stdio, sync) {
  let ipc;
  let ipcFd;

  // Replace shortcut with an array
  if (typeof stdio === 'string') {
    stdio = stdioStringToArray(stdio);
  } else if (!ArrayIsArray(stdio)) {
    throw new ERR_INVALID_OPT_VALUE('stdio', stdio);
  }

  // At least 3 stdio will be created
  // Don't concat() a new Array() because it would be sparse, and
  // stdio.reduce() would skip the sparse elements of stdio.
  // See https://stackoverflow.com/a/5501711/3561
  while (stdio.length < 3) stdio.push(undefined);

  // Translate stdio into C++-readable form
  // (i.e. PipeWraps or fds)
  stdio = stdio.reduce((acc, entry, i) => {
    // Closes the Pipe handles created so far by this function. Both 'pipe'
    // (including the IPC pipe) and 'overlapped' descriptors own such a
    // handle; 'wrap' descriptors carry a caller-provided handle and are
    // deliberately left untouched.
    function cleanup() {
      for (let j = 0; j < acc.length; j++) {
        const created = acc[j];
        if ((created.type === 'pipe' || created.type === 'overlapped') &&
            created.handle)
          created.handle.close();
      }
    }

    // Defaults
    if (entry == null) {
      entry = i < 3 ? 'pipe' : 'ignore';
    }

    if (entry === 'ignore') {
      acc.push({ type: 'ignore' });
    } else if (entry === 'pipe' || entry === 'overlapped' ||
               (typeof entry === 'number' && entry < 0)) {
      const a = {
        type: entry === 'overlapped' ? 'overlapped' : 'pipe',
        readable: i === 0,
        writable: i !== 0
      };

      if (!sync)
        a.handle = new Pipe(PipeConstants.SOCKET);

      acc.push(a);
    } else if (entry === 'ipc') {
      if (sync || ipc !== undefined) {
        // Cleanup previously created pipes
        cleanup();
        if (!sync)
          throw new ERR_IPC_ONE_PIPE();
        else
          throw new ERR_IPC_SYNC_FORK();
      }

      ipc = new Pipe(PipeConstants.IPC);
      ipcFd = i;

      acc.push({
        type: 'pipe',
        handle: ipc,
        ipc: true
      });
    } else if (entry === 'inherit') {
      acc.push({
        type: 'inherit',
        fd: i
      });
    } else if (typeof entry === 'number' || typeof entry.fd === 'number') {
      acc.push({
        type: 'fd',
        fd: typeof entry === 'number' ? entry : entry.fd
      });
    } else if (getHandleWrapType(entry) || getHandleWrapType(entry.handle) ||
               getHandleWrapType(entry._handle)) {
      // Accept a native wrap directly, or an object carrying one as
      // `.handle` or `._handle`.
      const handle = getHandleWrapType(entry) ?
        entry :
        getHandleWrapType(entry.handle) ? entry.handle : entry._handle;

      acc.push({
        type: 'wrap',
        wrapType: getHandleWrapType(handle),
        handle: handle,
        _stdio: entry
      });
    } else if (isArrayBufferView(entry) || typeof entry === 'string') {
      // Only accepted for synchronous spawns; no descriptor is added here.
      if (!sync) {
        cleanup();
        throw new ERR_INVALID_SYNC_FORK_INPUT(inspect(entry));
      }
    } else {
      // Cleanup
      cleanup();
      throw new ERR_INVALID_OPT_VALUE('stdio', entry);
    }

    return acc;
  }, []);

  return { stdio, ipc, ipcFd };
}
1041
1042
// Returns the socket list stored on the worker's channel for `key`, creating
// it on first use. `type` is 'send' or 'got' and selects both the store and
// the class (SocketListSend for 'send', SocketListReceive otherwise).
function getSocketList(type, worker, key) {
  const sockets = worker[kChannelHandle].sockets[type];
  let socketList = sockets[key];
  if (!socketList) {
    const Construct = type === 'send' ? SocketListSend : SocketListReceive;
    socketList = sockets[key] = new Construct(worker, key);
  }
  return socketList;
}
1052
1053
// Records one more completed close and emits 'close' (with the exit code and
// signal code) once the count reaches `_closesNeeded`.
function maybeClose(subprocess) {
  subprocess._closesGot++;

  if (subprocess._closesGot === subprocess._closesNeeded) {
    subprocess.emit('close', subprocess.exitCode, subprocess.signalCode);
  }
}
1061
// Runs the native synchronous spawn and post-processes its result:
//   - decodes every non-empty output entry when `options.encoding` is set to
//     something other than 'buffer';
//   - exposes output[1] / output[2] as `stdout` / `stderr`;
//   - replaces a numeric `error` with an exception carrying `path` and
//     `spawnargs`.
function spawnSync(options) {
  const result = spawn_sync.spawn(options);

  if (result.output && options.encoding && options.encoding !== 'buffer') {
    for (let i = 0; i < result.output.length; i++) {
      if (!result.output[i])
        continue;
      result.output[i] = result.output[i].toString(options.encoding);
    }
  }

  result.stdout = result.output && result.output[1];
  result.stderr = result.output && result.output[2];

  if (result.error) {
    result.error = errnoException(result.error, 'spawnSync ' + options.file);
    result.error.path = options.file;
    result.error.spawnargs = options.args.slice(1);
  }

  return result;
}
1084
1085module.exports = {
1086  ChildProcess,
1087  kChannelHandle,
1088  setupChannel,
1089  getValidStdio,
1090  stdioStringToArray,
1091  spawnSync
1092};
1093