• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
21
// A bit simpler than readable streams.
// Implement an async ._write(chunk, encoding, cb), and it'll handle all
// the drain event emission and buffering.
25
'use strict';

// Built-in helpers used by this module, taken from `primordials`.
const {
  FunctionPrototype,
  ObjectDefineProperty,
  ObjectDefineProperties,
  ObjectSetPrototypeOf,
  Symbol,
  SymbolHasInstance,
} = primordials;

// `Writable` and `WritableState` are function declarations further down in
// this file, so hoisting makes them usable here.
module.exports = Writable;
Writable.WritableState = WritableState;

const EE = require('events');
const { Stream } = require('internal/streams/legacy');
const { Buffer } = require('buffer');
const destroyImpl = require('internal/streams/destroy');
const {
  getHighWaterMark,
  getDefaultHighWaterMark
} = require('internal/streams/state');
const {
  codes: {
    ERR_INVALID_ARG_TYPE,
    ERR_METHOD_NOT_IMPLEMENTED,
    ERR_MULTIPLE_CALLBACK,
    ERR_STREAM_CANNOT_PIPE,
    ERR_STREAM_DESTROYED,
    ERR_STREAM_ALREADY_FINISHED,
    ERR_STREAM_NULL_VALUES,
    ERR_STREAM_WRITE_AFTER_END,
    ERR_UNKNOWN_ENCODING
  }
} = require('internal/errors');

const { errorOrDestroy } = destroyImpl;

// Writable inherits from the legacy Stream, both for instances and for the
// constructor itself.
ObjectSetPrototypeOf(Writable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Writable, Stream);
64
// Shared do-nothing callback. write() substitutes it when the caller passes
// no callback, and the buffering code compares callbacks against it by
// identity, so there must be exactly one instance.
function nop() {
  // Intentionally empty.
}
66
// Bookkeeping for the writable side of a stream.
//
// options  - the user supplied stream options (may be undefined).
// stream   - the stream this state belongs to.
// isDuplex - optional boolean; when it is not a boolean it is derived from
//            `stream instanceof Stream.Duplex`.
function WritableState(options, stream, isDuplex) {
  // A Duplex shares a single options object between its readable and its
  // writable side, yet some settings are side specific
  // (e.g. options.readableObjectMode vs. options.writableObjectMode).
  const duplex = typeof isDuplex === 'boolean' ?
    isDuplex :
    stream instanceof Stream.Duplex;

  // Whether the stream carries arbitrary objects rather than buffers.
  // For a Duplex, `writableObjectMode` can turn it on as well.
  let objectMode = Boolean(options && options.objectMode);
  if (duplex && !objectMode) {
    objectMode = Boolean(options && options.writableObjectMode);
  }
  this.objectMode = objectMode;

  // Threshold at which write() starts returning false.
  // Note: 0 is valid and means write() returns false whenever the whole
  // buffer is not flushed immediately.
  if (options) {
    this.highWaterMark =
      getHighWaterMark(this, options, 'writableHighWaterMark', duplex);
  } else {
    this.highWaterMark = getDefaultHighWaterMark(false);
  }

  // Set once _final has been scheduled.
  this.finalCalled = false;

  // Whether a 'drain' event is owed.
  this.needDrain = false;
  // Set at the start of end().
  this.ending = false;
  // Set once end() has been called and has returned.
  this.ended = false;
  // Set when 'finish' is emitted.
  this.finished = false;

  // Whether the stream has been destroyed.
  this.destroyed = false;

  // Strings are decoded into buffers before reaching _write unless the
  // options explicitly say `decodeStrings: false`; this lets some node-core
  // streams handle strings at a lower level.
  this.decodeStrings = !(options && options.decodeStrings === false);

  // Crypto is kind of old and crusty: historically its default string
  // encoding is 'binary', hence this is configurable. Everything else
  // uses 'utf8'.
  const requestedEncoding = options && options.defaultEncoding;
  this.defaultEncoding = requestedEncoding || 'utf8';

  // Not a real buffer, just a measurement of how much is waiting to be
  // pushed to the underlying socket or file.
  this.length = 0;

  // True while a write is in flight.
  this.writing = false;

  // Cork counter; while non-zero all writes are buffered until uncork().
  this.corked = 0;

  // Tells whether the onwrite callback is being invoked immediately or on a
  // later tick. Starts out true because anything that must wait until
  // "later" should generally not happen before the first write call either.
  this.sync = true;

  // Set while previously buffered items are being processed. Their _write()
  // may call back in the same tick, and this prevents overlapping onwrite
  // handling.
  this.bufferProcessing = false;

  // The callback handed to _write(chunk, cb).
  this.onwrite = onwrite.bind(undefined, stream);

  // The user callback of the write(chunk, encoding, cb) currently in flight.
  this.writecb = null;

  // Amount being written by the _write call currently in flight.
  this.writelen = 0;

  // Data for the afterWrite() callback when _write() completes
  // synchronously.
  this.afterWriteTickInfo = null;

  resetBuffer(this);

  // Number of pending user supplied write callbacks; it has to reach 0
  // before 'finish' may be emitted.
  this.pendingcb = 0;

  // Set once 'prefinish' has been emitted, which may happen when only
  // _write callbacks are outstanding (relevant for synchronous Transform
  // streams).
  this.prefinished = false;

  // True if the error was already emitted and must not be thrown again.
  this.errorEmitted = false;

  // Whether 'close' is emitted on destroy. Defaults to true.
  this.emitClose = !(options && options.emitClose === false);

  // Whether .destroy() is called after 'finish' (and potentially 'end').
  // Defaults to true.
  this.autoDestroy = !(options && options.autoDestroy === false);

  // The error the stream failed with, if any. Once set, write() calls
  // return false. Needed because with autoDestroy disabled there is no
  // other way to tell that the stream has failed.
  this.errored = null;

  // Whether the stream has finished destroying.
  this.closed = false;
}
179
// Puts the buffered-write bookkeeping of `state` back to its empty default.
function resetBuffer(state) {
  // Fresh queue of { chunk, encoding, callback } entries.
  state.buffered = [];
  // Index of the first queue entry that has not been handed to doWrite yet.
  state.bufferedIndex = 0;
  // Stays true while every queued entry has the 'buffer' encoding.
  state.allBuffers = true;
  // Stays true while every queued entry uses the shared `nop` callback.
  state.allNoop = true;
}
186
// Returns a copy of the queue entries that are still waiting to be written,
// i.e. everything from `bufferedIndex` onwards.
WritableState.prototype.getBuffer = function getBuffer() {
  const { buffered, bufferedIndex } = this;
  return buffered.slice(bufferedIndex);
};
190
// Read-only count of the queue entries that are still waiting to be written.
ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', {
  get() {
    const { buffered, bufferedIndex } = this;
    return buffered.length - bufferedIndex;
  }
});
196
// `instanceof Writable` also has to accept Duplex streams, whose prototype
// chain only points to Readable. They are recognized by their
// `_writableState` instead. `realHasInstance` keeps the unmodified check.
let realHasInstance;
if (typeof Symbol !== 'function' || !SymbolHasInstance) {
  // No Symbol.hasInstance support: fall back to the plain operator.
  realHasInstance = function(object) {
    return object instanceof this;
  };
} else {
  realHasInstance = FunctionPrototype[SymbolHasInstance];
  ObjectDefineProperty(Writable, SymbolHasInstance, {
    value: function(object) {
      // Regular prototype chain match.
      if (realHasInstance.call(this, object))
        return true;

      // The `_writableState` test applies to Writable itself only, never to
      // constructors deriving from it.
      return this === Writable ?
        (object && object._writableState instanceof WritableState) :
        false;
    }
  });
}
217
// Writable stream constructor; may be called with or without `new`.
//
// options - optional settings. `write`, `writev`, `destroy` and `final`
//           functions are installed as `_write`, `_writev`, `_destroy` and
//           `_final`. The object is also passed to WritableState and Stream.
function Writable(options) {
  // This constructor is applied to Duplexes as well.
  //
  // The unmodified `realHasInstance` is required below: at this point no
  // `_writableState` is attached yet, so the customized `instanceof` would
  // answer false. The customized check would also break the Node.js
  // LazyTransform implementation, whose non-trivial `_writableState` getter
  // would recurse infinitely.
  //
  // Testing for Stream.Duplex here instead of inside the WritableState
  // constructor is faster, at least with V8 6.5.
  const isDuplex = (this instanceof Stream.Duplex);

  // Invoked without `new` (and not on a Duplex): construct a proper instance.
  if (!(isDuplex || realHasInstance.call(Writable, this))) {
    return new Writable(options);
  }

  this._writableState = new WritableState(options, this, isDuplex);

  // Implementation hooks may be supplied through the options.
  if (options) {
    if (typeof options.write === 'function') {
      this._write = options.write;
    }
    if (typeof options.writev === 'function') {
      this._writev = options.writev;
    }
    if (typeof options.destroy === 'function') {
      this._destroy = options.destroy;
    }
    if (typeof options.final === 'function') {
      this._final = options.final;
    }
  }

  Stream.call(this, options);
}
252
// Piping *from* a Writable makes no sense; without this override people
// could do it anyway. Reports ERR_STREAM_CANNOT_PIPE through errorOrDestroy.
Writable.prototype.pipe = function() {
  const cannotPipe = new ERR_STREAM_CANNOT_PIPE();
  errorOrDestroy(this, cannotPipe);
};
257
// Writes `chunk` to the stream.
//
// chunk    - the data. `null` throws ERR_STREAM_NULL_VALUES. Outside object
//            mode it must be a string, Buffer or Uint8Array, otherwise
//            ERR_INVALID_ARG_TYPE is thrown.
// encoding - optional string encoding; may also be the callback.
// cb       - optional callback for this write.
//
// Returns false when the write was rejected (stream ending or destroyed),
// otherwise whatever writeOrBuffer() returns.
Writable.prototype.write = function(chunk, encoding, cb) {
  const state = this._writableState;

  // Sort out the optional arguments.
  let enc;
  let done;
  if (typeof encoding === 'function') {
    done = encoding;
    enc = state.defaultEncoding;
  } else {
    enc = encoding || state.defaultEncoding;
    done = typeof cb === 'function' ? cb : nop;
  }

  let data = chunk;
  if (data === null) {
    throw new ERR_STREAM_NULL_VALUES();
  }

  // Outside object mode everything ends up as a Buffer, except strings when
  // decodeStrings has been turned off.
  if (!state.objectMode) {
    if (typeof data === 'string') {
      if (state.decodeStrings !== false) {
        data = Buffer.from(data, enc);
        enc = 'buffer';
      }
    } else if (data instanceof Buffer) {
      enc = 'buffer';
    } else if (Stream._isUint8Array(data)) {
      data = Stream._uint8ArrayToBuffer(data);
      enc = 'buffer';
    } else {
      throw new ERR_INVALID_ARG_TYPE(
        'chunk', ['string', 'Buffer', 'Uint8Array'], data);
    }
  }

  // Writing after end() takes precedence over writing after destroy().
  const failure = state.ending ?
    new ERR_STREAM_WRITE_AFTER_END() :
    (state.destroyed ? new ERR_STREAM_DESTROYED('write') : undefined);

  if (failure) {
    // The callback is scheduled before the error is reported on the stream.
    process.nextTick(done, failure);
    errorOrDestroy(this, failure, true);
    return false;
  }

  state.pendingcb++;
  return writeOrBuffer(this, state, data, enc, done);
};
305
// Increments the cork counter; while it is non-zero writes are buffered.
Writable.prototype.cork = function() {
  const state = this._writableState;
  state.corked++;
};
309
// Undoes one cork() call. Unless a write is in flight, the buffered writes
// are handed to clearBuffer(), which itself does nothing while the stream is
// still corked.
Writable.prototype.uncork = function() {
  const state = this._writableState;

  // Not corked: nothing to undo.
  if (!state.corked) {
    return;
  }

  state.corked--;

  if (!state.writing) {
    clearBuffer(this, state);
  }
};
320
// Sets the encoding used for string chunks written without an explicit
// encoding. Throws ERR_UNKNOWN_ENCODING when Buffer.isEncoding() rejects it.
// Returns the stream to allow chaining.
Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
  // node::ParseEncoding() requires lower case.
  const normalized = typeof encoding === 'string' ?
    encoding.toLowerCase() :
    encoding;

  if (!Buffer.isEncoding(normalized)) {
    throw new ERR_UNKNOWN_ENCODING(normalized);
  }

  this._writableState.defaultEncoding = normalized;
  return this;
};
330
// Either hands the chunk to _write right away or, when a write is already
// in flight (or the stream is corked or errored), queues it.
// A false return value means the caller should wait for 'drain', so the
// needDrain flag is raised in that case.
function writeOrBuffer(stream, state, chunk, encoding, callback) {
  const len = state.objectMode ? 1 : chunk.length;

  state.length += len;

  // Evaluated before _write runs, since completing a write lowers
  // state.length again.
  const belowHighWaterMark = state.length < state.highWaterMark;
  // Only ever raise the flag here, so an earlier needDrain is never reset
  // to false.
  if (!belowHighWaterMark) {
    state.needDrain = true;
  }

  const writeNow = !(state.writing || state.corked || state.errored);
  if (writeNow) {
    state.writelen = len;
    state.writecb = callback;
    state.writing = true;
    state.sync = true;
    stream._write(chunk, encoding, state.onwrite);
    state.sync = false;
  } else {
    state.buffered.push({ chunk, encoding, callback });
    // The flags only ever go from true to false while the queue fills up.
    if (state.allBuffers && encoding !== 'buffer') {
      state.allBuffers = false;
    }
    if (state.allNoop && callback !== nop) {
      state.allNoop = false;
    }
  }

  if (!belowHighWaterMark) {
    return belowHighWaterMark;
  }
  // Also return false when errored or destroyed, in order to break any
  // synchronous while(stream.write(data)) loops.
  return !state.errored && !state.destroyed;
}
366
// Starts a single underlying write and records it as the one in flight.
//
// writev - when truthy `chunk` goes to stream._writev (which receives no
//          encoding), otherwise to stream._write.
// len    - amount accounted for this write.
// cb     - stored as state.writecb for this write.
//
// On a destroyed stream no write is attempted; state.onwrite is called
// with ERR_STREAM_DESTROYED instead.
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  state.writelen = len;
  state.writecb = cb;
  state.writing = true;

  // `sync` is true exactly for the duration of the call below.
  state.sync = true;
  if (state.destroyed) {
    state.onwrite(new ERR_STREAM_DESTROYED('write'));
  } else if (writev) {
    stream._writev(chunk, state.onwrite);
  } else {
    stream._write(chunk, encoding, state.onwrite);
  }
  state.sync = false;
}
380
// Completes a failed write: runs its callback with the error, fails the
// writes that are still buffered, then reports the error on the stream.
function onwriteError(stream, state, er, cb) {
  state.pendingcb--;

  cb(er);

  // Buffered callbacks have to run even when autoDestroy is not enabled.
  // They do not receive `er`, because it belongs to one specific write and
  // not to the buffered ones.
  const bufferedWriteError = new ERR_STREAM_DESTROYED('write');
  errorBuffer(state, bufferedWriteError);

  // This may emit 'error', which always has to come after the callback.
  errorOrDestroy(stream, er);
}
393
// Completion handler passed (bound to the stream) to _write and _writev.
//
// er - the error the underlying write reported, if any.
function onwrite(stream, er) {
  const state = stream._writableState;
  const { sync, writecb: cb } = state;

  // writecb is cleared below, so a missing function means this callback was
  // invoked more than once for the same write.
  if (typeof cb !== 'function') {
    errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
    return;
  }

  // The write in flight is over; update the accounting.
  state.writing = false;
  state.writecb = null;
  state.length -= state.writelen;
  state.writelen = 0;

  if (er) {
    // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
    er.stack; // eslint-disable-line no-unused-expressions

    // Only the first error is recorded.
    if (!state.errored) {
      state.errored = er;
    }

    // For duplex streams the readable side has to learn about the error
    // as well.
    const rState = stream._readableState;
    if (rState && !rState.errored) {
      rState.errored = er;
    }

    // A synchronous failure is handled on the next tick.
    if (sync) {
      process.nextTick(onwriteError, stream, state, er, cb);
    } else {
      onwriteError(stream, state, er, cb);
    }
    return;
  }

  // Success: continue with whatever has been queued meanwhile.
  if (state.buffered.length > state.bufferedIndex) {
    clearBuffer(stream, state);
  }

  if (!sync) {
    afterWrite(stream, state, 1, cb);
    return;
  }

  // Commonly the callback passed to .write() is always the same. In that
  // case no new nextTick() is scheduled; a counter on the already scheduled
  // one is increased instead, which improves performance and avoids memory
  // allocations.
  const scheduled = state.afterWriteTickInfo;
  if (scheduled !== null && scheduled.cb === cb) {
    scheduled.count++;
  } else {
    state.afterWriteTickInfo = { count: 1, cb, stream, state };
    process.nextTick(afterWriteTick, state.afterWriteTickInfo);
  }
}
450
// nextTick entry point for synchronously completed writes. `info` is the
// { stream, state, count, cb } record stored in state.afterWriteTickInfo.
function afterWriteTick(info) {
  const { stream, state, count, cb } = info;
  // Clear the record first so that later writes schedule a new tick.
  state.afterWriteTickInfo = null;
  return afterWrite(stream, state, count, cb);
}
455
// Runs after one or more writes succeeded: emits 'drain' if it is owed,
// invokes the write callback `count` times and checks whether the stream
// can finish.
function afterWrite(stream, state, count, cb) {
  // 'drain' is emitted only when it is owed, everything has been written,
  // and the stream is neither ending nor destroyed.
  const shouldDrain = !(state.ending || stream.destroyed) &&
    state.length === 0 &&
    state.needDrain;
  if (shouldDrain) {
    state.needDrain = false;
    stream.emit('drain');
  }

  for (let remaining = count; remaining > 0; remaining--) {
    --state.pendingcb;
    cb();
  }

  // On a destroyed stream the queued writes are failed.
  if (state.destroyed) {
    errorBuffer(state, new ERR_STREAM_DESTROYED('write'));
  }

  finishMaybe(stream, state);
}
475
// Fails every write that is still waiting in the buffer by invoking its
// callback with `err`, then empties the buffer. Does nothing while a write
// is in flight.
function errorBuffer(state, err) {
  if (state.writing) {
    return;
  }

  // The queue is read live on every iteration rather than snapshotted.
  let position = state.bufferedIndex;
  while (position < state.buffered.length) {
    const { chunk, callback } = state.buffered[position];
    // Take the dropped entry out of the pending length accounting.
    state.length -= state.objectMode ? 1 : chunk.length;
    callback(err);
    ++position;
  }

  resetBuffer(state);
}
491
// Processes the buffered writes, if there are any. Nothing happens while
// the stream is corked, destroyed, or already being processed by an outer
// clearBuffer call.
function clearBuffer(stream, state) {
  if (state.corked || state.bufferProcessing || state.destroyed) {
    return;
  }

  const { buffered, bufferedIndex, objectMode } = state;
  const pendingCount = buffered.length - bufferedIndex;

  if (!pendingCount) {
    return;
  }

  let cursor = bufferedIndex;

  state.bufferProcessing = true;
  const batchWithWritev = pendingCount > 1 && stream._writev;
  if (batchWithWritev) {
    // The whole batch completes through a single callback, so all but one
    // of the pending callbacks are taken off the counter.
    state.pendingcb -= pendingCount - 1;

    const notifyAll = state.allNoop ? nop : (err) => {
      for (let n = cursor; n < buffered.length; ++n) {
        buffered[n].callback(err);
      }
    };
    // Make a copy of `buffered` if it's going to be used by `notifyAll`
    // above, since `doWrite` will mutate the array.
    const chunks = state.allNoop && cursor === 0 ?
      buffered :
      buffered.slice(cursor);
    chunks.allBuffers = state.allBuffers;

    doWrite(stream, state, true, state.length, chunks, '', notifyAll);

    resetBuffer(state);
  } else {
    // Write entries one at a time for as long as each write completes
    // synchronously.
    do {
      const entry = buffered[cursor];
      buffered[cursor] = null;
      cursor++;
      const { chunk, encoding, callback } = entry;
      const len = objectMode ? 1 : chunk.length;
      doWrite(stream, state, false, len, chunk, encoding, callback);
    } while (cursor < buffered.length && !state.writing);

    if (cursor !== buffered.length) {
      if (cursor > 256) {
        // Drop the consumed prefix once it has grown beyond 256 slots.
        buffered.splice(0, cursor);
        state.bufferedIndex = 0;
      } else {
        state.bufferedIndex = cursor;
      }
    } else {
      // Everything was consumed.
      resetBuffer(state);
    }
  }
  state.bufferProcessing = false;
}
543
// Default _write: forwards the chunk as a one element batch to _writev when
// that is implemented, otherwise throws ERR_METHOD_NOT_IMPLEMENTED.
Writable.prototype._write = function(chunk, encoding, cb) {
  if (!this._writev) {
    throw new ERR_METHOD_NOT_IMPLEMENTED('_write()');
  }
  this._writev([{ chunk, encoding }], cb);
};

// No batch write implementation by default.
Writable.prototype._writev = null;
553
// Signals that no more data will be written, optionally writing a final
// chunk first.
//
// chunk    - optional last chunk; may also be the callback.
// encoding - optional encoding of `chunk`; may also be the callback.
// cb       - optional callback. It is scheduled on the next tick when the
//            stream has already finished or when end() failed, otherwise it
//            is registered through onFinished().
//
// Returns the stream.
Writable.prototype.end = function(chunk, encoding, cb) {
  const state = this._writableState;

  // Sort out the optional arguments.
  let data = chunk;
  let enc = encoding;
  let done = cb;
  if (typeof data === 'function') {
    done = data;
    data = null;
    enc = null;
  } else if (typeof enc === 'function') {
    done = enc;
    enc = null;
  }

  if (data !== undefined && data !== null) {
    this.write(data, enc);
  }

  // .end() fully uncorks.
  if (state.corked) {
    state.corked = 1;
    this.uncork();
  }

  // Unnecessary calls to end() are tolerated, which can hide logic errors.
  // Such errors are usually harmless though, and a hard error could be
  // disproportionately destructive. It is not always trivial for the user
  // to determine whether end() needs to be called or not.
  let err;
  if (state.errored || state.ending) {
    if (state.finished) {
      err = new ERR_STREAM_ALREADY_FINISHED('end');
    } else if (state.destroyed) {
      err = new ERR_STREAM_DESTROYED('end');
    }
  } else {
    state.ending = true;
    finishMaybe(this, state, true);
    state.ended = true;
  }

  if (typeof done === 'function') {
    if (err || state.finished) {
      process.nextTick(done, err);
    } else {
      onFinished(this, done);
    }
  }

  return this;
};
599
// Tells whether the stream is ready to finish: end() was called, nothing is
// pending or buffered, no write is in flight, and the stream has neither
// errored nor finished already.
function needFinish(state) {
  return (state.ending &&
          state.length === 0 &&
          !(state.errored ||
            state.buffered.length !== 0 ||
            state.finished ||
            state.writing));
}
608
// Invokes the stream's _final hook. When it completes without error,
// 'prefinish' is emitted and finishing is attempted again; otherwise the
// error is reported on the stream.
function callFinal(stream, state) {
  const onFinalDone = (err) => {
    // Releases the pending callback that prefinish() accounted for.
    --state.pendingcb;

    if (err) {
      errorOrDestroy(stream, err);
      return;
    }

    state.prefinished = true;
    stream.emit('prefinish');
    finishMaybe(stream, state);
  };

  stream._final(onFinalDone);
}
621
// Enters the prefinish stage once. A stream with a _final hook that is not
// destroyed gets the hook scheduled on the next tick; any other stream
// emits 'prefinish' right away.
function prefinish(stream, state) {
  // Already done, or _final is already on its way.
  if (state.prefinished || state.finalCalled) {
    return;
  }

  if (typeof stream._final === 'function' && !state.destroyed) {
    // Counted as a pending callback so 'finish' waits for _final.
    state.pendingcb++;
    state.finalCalled = true;
    process.nextTick(callFinal, stream, state);
    return;
  }

  state.prefinished = true;
  stream.emit('prefinish');
}
634
// Finishes the stream if it is ready to.
//
// sync - when truthy, finish() is deferred to the next tick instead of
//        being called directly.
//
// Returns the result of needFinish(state).
function finishMaybe(stream, state, sync) {
  const need = needFinish(state);
  if (!need) {
    return need;
  }

  prefinish(stream, state);

  // Only finish once no callback is outstanding. The increment is undone
  // at the start of finish().
  if (state.pendingcb === 0) {
    state.pendingcb++;
    if (sync) {
      process.nextTick(finish, stream, state);
    } else {
      finish(stream, state);
    }
  }

  return need;
}
650
// Marks the stream as finished and emits 'finish', unless an error has been
// emitted already. With autoDestroy enabled the stream is destroyed
// afterwards, provided a readable side (if any) is ready for that as well.
function finish(stream, state) {
  --state.pendingcb;
  if (state.errorEmitted) {
    return;
  }

  state.finished = true;
  stream.emit('finish');

  if (!state.autoDestroy) {
    return;
  }

  // For duplex streams the readable side has to be ready for autoDestroy
  // too.
  const rState = stream._readableState;
  if (rState) {
    if (!rState.autoDestroy) {
      return;
    }
    // The readable side is not expected to ever 'end' when readable has
    // been explicitly set to false.
    if (!rState.endEmitted && rState.readable !== false) {
      return;
    }
  }

  stream.destroy();
}
674
// TODO(ronag): Avoid using events to implement internal logic.
//
// Calls `cb` exactly once: without arguments on 'finish', or with the error
// on 'error', whichever comes first. In the error case the error is
// re-emitted afterwards if no other 'error' listener is left.
function onFinished(stream, cb) {
  // Removes both listeners registered below.
  const detach = () => {
    stream.removeListener('finish', onfinish);
    stream.removeListener('error', onerror);
  };

  const onfinish = () => {
    detach();
    cb();
  };

  const onerror = (err) => {
    detach();
    cb(err);
    if (stream.listenerCount('error') === 0) {
      stream.emit('error', err);
    }
  };

  stream.on('finish', onfinish);
  // Prepended so that `cb` runs ahead of the other 'error' listeners.
  stream.prependListener('error', onerror);
}
694
// Public accessors on Writable.prototype. All of them read from
// `_writableState` and tolerate it being absent.
ObjectDefineProperties(Writable.prototype, {

  // Whether the stream has been destroyed; false without a writable state.
  destroyed: {
    get() {
      const state = this._writableState;
      return state ? state.destroyed : false;
    },
    set(value) {
      // Backward compatibility, the user is explicitly managing destroyed.
      const state = this._writableState;
      if (state) {
        state.destroyed = value;
      }
    }
  },

  // Whether the stream can still be written to.
  writable: {
    get() {
      const w = this._writableState;
      // w.writable === false means that this is part of a Duplex stream
      // where the writable side was disabled upon construction.
      // Compat. The user might manually disable writable side through
      // deprecated setter.
      if (!w || w.writable === false) {
        return false;
      }
      return !(w.destroyed || w.errored || w.ending || w.ended);
    },
    set(val) {
      // Backwards compatible.
      const w = this._writableState;
      if (w) {
        w.writable = !!val;
      }
    }
  },

  // The state's `finished` flag; false without a writable state.
  writableFinished: {
    get() {
      const state = this._writableState;
      return state ? state.finished : false;
    }
  },

  // The state's `objectMode` flag; false without a writable state.
  writableObjectMode: {
    get() {
      const state = this._writableState;
      return state ? state.objectMode : false;
    }
  },

  // Copy of the buffered writes, as returned by getBuffer().
  writableBuffer: {
    get() {
      const state = this._writableState;
      return state && state.getBuffer();
    }
  },

  // The state's `ending` flag, i.e. set from the start of end() onwards;
  // false without a writable state.
  writableEnded: {
    get() {
      const state = this._writableState;
      return state ? state.ending : false;
    }
  },

  // Whether a 'drain' event is owed; always false once the stream is
  // destroyed or ending.
  writableNeedDrain: {
    get() {
      const wState = this._writableState;
      if (!wState) return false;
      if (wState.destroyed || wState.ending) return false;
      return wState.needDrain;
    }
  },

  // The state's `highWaterMark`.
  writableHighWaterMark: {
    get() {
      const state = this._writableState;
      return state && state.highWaterMark;
    }
  },

  // The state's cork counter; 0 without a writable state.
  writableCorked: {
    get() {
      const state = this._writableState;
      return state ? state.corked : 0;
    }
  },

  // The state's `length`, i.e. the amount waiting to be written.
  writableLength: {
    get() {
      const state = this._writableState;
      return state && state.length;
    }
  }
});
777
const destroy = destroyImpl.destroy;

// Destroys the stream.
//
// err - optional error, forwarded to the shared destroy implementation.
// cb  - optional callback, forwarded to the shared destroy implementation.
//
// When the stream is not destroyed yet and writes are still waiting in the
// buffer, their callbacks are failed with ERR_STREAM_DESTROYED on the next
// tick. Returns the stream.
Writable.prototype.destroy = function(err, cb) {
  const state = this._writableState;

  // Only schedule the flush when there is something to flush. With an empty
  // buffer errorBuffer() would merely reset the already empty buffer, so the
  // extra tick and the error allocation are skipped.
  if (!state.destroyed && state.bufferedIndex < state.buffered.length) {
    process.nextTick(errorBuffer, state, new ERR_STREAM_DESTROYED('write'));
  }

  destroy.call(this, err, cb);
  return this;
};
788
// Resetting a destroyed stream is delegated to the shared implementation.
Writable.prototype._undestroy = destroyImpl.undestroy;

// Default _destroy hook: nothing to clean up, so the callback is invoked
// right away with the error it was given.
Writable.prototype._destroy = function(err, cb) {
  cb(err);
};

// Handler installed under EE.captureRejectionSymbol: destroys the stream
// with the error it receives.
Writable.prototype[EE.captureRejectionSymbol] = function(err) {
  this.destroy(err);
};
797