• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright Joyent, Inc. and other Node contributors.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the
5// "Software"), to deal in the Software without restriction, including
6// without limitation the rights to use, copy, modify, merge, publish,
7// distribute, sublicense, and/or sell copies of the Software, and to permit
8// persons to whom the Software is furnished to do so, subject to the
9// following conditions:
10//
11// The above copyright notice and this permission notice shall be included
12// in all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20// USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22// A bit simpler than readable streams.
23// Implement an async ._write(chunk, encoding, cb), and it'll handle all
24// the drain event emission and buffering.
25
26'use strict';
27
28const {
29  ArrayPrototypeSlice,
30  Error,
31  FunctionPrototypeSymbolHasInstance,
32  ObjectDefineProperty,
33  ObjectDefineProperties,
34  ObjectSetPrototypeOf,
35  StringPrototypeToLowerCase,
36  Symbol,
37  SymbolHasInstance,
38} = primordials;
39
40module.exports = Writable;
41Writable.WritableState = WritableState;
42
43const EE = require('events');
44const Stream = require('internal/streams/legacy').Stream;
45const { Buffer } = require('buffer');
46const destroyImpl = require('internal/streams/destroy');
47
48const {
49  addAbortSignal,
50} = require('internal/streams/add-abort-signal');
51
52const {
53  getHighWaterMark,
54  getDefaultHighWaterMark,
55} = require('internal/streams/state');
56const {
57  ERR_INVALID_ARG_TYPE,
58  ERR_METHOD_NOT_IMPLEMENTED,
59  ERR_MULTIPLE_CALLBACK,
60  ERR_STREAM_CANNOT_PIPE,
61  ERR_STREAM_DESTROYED,
62  ERR_STREAM_ALREADY_FINISHED,
63  ERR_STREAM_NULL_VALUES,
64  ERR_STREAM_WRITE_AFTER_END,
65  ERR_UNKNOWN_ENCODING,
66} = require('internal/errors').codes;
67
68const { errorOrDestroy } = destroyImpl;
69
// Wire up inheritance from the legacy Stream: statics first, then the
// prototype chain used by instances.
ObjectSetPrototypeOf(Writable, Stream);
ObjectSetPrototypeOf(Writable.prototype, Stream.prototype);
72
/**
 * Shared do-nothing callback. It is substituted when write() is called
 * without a callback, and its identity is compared against buffered
 * callbacks to detect "nobody is listening" batches.
 */
function nop() {
  // Intentionally empty.
}

// Key under which WritableState keeps the callbacks passed to end().
const kOnFinished = Symbol('kOnFinished');
76
/**
 * Bookkeeping record for the writable side of a stream.
 *
 * @param {object} [options] Stream options. Reads `objectMode`,
 *   `writableObjectMode` (duplex only), `decodeStrings`, `defaultEncoding`,
 *   `emitClose` and `autoDestroy`; the high water mark is resolved by
 *   `getHighWaterMark()`.
 * @param {object} stream The stream that owns this state.
 * @param {boolean} [isDuplex] Whether `stream` is a Duplex. When not a
 *   boolean it is detected with `instanceof Stream.Duplex`.
 */
function WritableState(options, stream, isDuplex) {
  // A Duplex passes one options object to both of its sides, yet some
  // settings are side-specific (readableObjectMode vs. writableObjectMode,
  // for instance), so we have to know which kind of stream this is.
  if (typeof isDuplex !== 'boolean') {
    isDuplex = stream instanceof Stream.Duplex;
  }

  const hasOptions = Boolean(options);

  // Whether the stream carries arbitrary objects instead of buffers/strings.
  this.objectMode = hasOptions && Boolean(options.objectMode);

  // The writable side of a duplex may opt into object mode on its own.
  if (isDuplex && !this.objectMode) {
    this.objectMode = hasOptions && Boolean(options.writableObjectMode);
  }

  // Threshold at which write() begins returning false.
  // 0 is legal: write() then returns false unless the whole buffer is
  // flushed immediately.
  this.highWaterMark = hasOptions ?
    getHighWaterMark(this, options, 'writableHighWaterMark', isDuplex) :
    getDefaultHighWaterMark(false);

  // Set once _final has been invoked.
  this.finalCalled = false;

  // Set when a 'drain' event is owed to the producer.
  this.needDrain = false;
  // Set as soon as end() starts running.
  this.ending = false;
  // Set once end() has run and returned.
  this.ended = false;
  // Set when 'finish' is emitted.
  this.finished = false;

  // Set when the stream has been destroyed.
  this.destroyed = false;

  // Strings are converted to buffers before reaching _write unless the
  // caller explicitly passed `decodeStrings: false`; some core streams
  // rely on that to handle strings at a lower level.
  this.decodeStrings = !(hasOptions && options.decodeStrings === false);

  // Encoding assumed for strings written without one. Configurable
  // because crypto historically defaults to 'binary'; everything else
  // uses 'utf8'.
  this.defaultEncoding = (hasOptions && options.defaultEncoding) || 'utf8';

  // Amount of data still waiting to reach the underlying socket or file.
  // This is a counter, not the buffer itself.
  this.length = 0;

  // True while a _write/_writev call is outstanding.
  this.writing = false;

  // Cork depth; while non-zero every write is buffered until uncork().
  this.corked = 0;

  // Tells onwrite whether the _write callback fired in the same tick.
  // Starts out true because anything that must wait until "later" must
  // also not happen before the first write call.
  this.sync = true;

  // True while previously buffered items are being replayed, so a _write
  // callback fired in the same tick cannot cause overlapping onwrite runs.
  this.bufferProcessing = false;

  // Callback handed to _write(chunk, encoding, cb), bound to the stream.
  this.onwrite = onwrite.bind(undefined, stream);

  // User callback of the write that is currently in flight.
  this.writecb = null;

  // Size of the write that is currently in flight.
  this.writelen = 0;

  // Data parked for afterWrite() when _write() completes synchronously.
  this.afterWriteTickInfo = null;

  resetBuffer(this);

  // Count of user write callbacks not yet invoked; has to reach 0 before
  // 'finish' may be emitted.
  this.pendingcb = 0;

  // Asynchronous construction is opt-in, so the state starts out
  // constructed. While false the stream cannot be destroyed.
  this.constructed = true;

  // Set once 'prefinish' was emitted, i.e. only _write callbacks are
  // outstanding. Relevant for synchronous Transform streams.
  this.prefinished = false;

  // Set once the error was emitted so it is not thrown a second time.
  this.errorEmitted = false;

  // Whether destroy should emit 'close'. On unless explicitly disabled.
  this.emitClose = !hasOptions || options.emitClose !== false;

  // Whether destroy() follows 'finish' (and potentially 'end').
  this.autoDestroy = !hasOptions || options.autoDestroy !== false;

  // The error the stream failed with, or null. Needed because with
  // autoDestroy disabled there is no other sign of failure; once set,
  // write() must return false.
  this.errored = null;

  // Set when destruction has completed.
  this.closed = false;

  // Set when 'close' was emitted, or would have been had emitClose
  // allowed it.
  this.closeEmitted = false;

  // Callbacks supplied to end(), waiting for 'finish' or an error.
  this[kOnFinished] = [];
}
201
/**
 * Puts the pending-write queue of `state` back into its empty condition.
 *
 * @param {WritableState} state State whose queue is reset in place.
 */
function resetBuffer(state) {
  // Fresh queue, read from the start.
  state.buffered = [];
  state.bufferedIndex = 0;
  // Both flags are cleared by writeOrBuffer() as soon as an entry
  // contradicts them.
  state.allBuffers = true;
  state.allNoop = true;
}
208
/**
 * Returns a copy of the queued write requests that have not been
 * dispatched yet, i.e. everything from `bufferedIndex` onwards.
 */
WritableState.prototype.getBuffer = function getBuffer() {
  const { buffered, bufferedIndex } = this;
  return ArrayPrototypeSlice(buffered, bufferedIndex);
};
212
// Number of queued write requests that have not been dispatched yet.
ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', {
  __proto__: null,
  get() {
    const { buffered, bufferedIndex } = this;
    return buffered.length - bufferedIndex;
  },
});
219
/**
 * Writable stream constructor. May be called without `new`.
 *
 * @param {object} [options] Stream options. Functions supplied as `write`,
 *   `writev`, `destroy`, `final` and `construct` are installed as the
 *   matching underscore-prefixed methods; `signal` is passed to
 *   `addAbortSignal()`.
 */
function Writable(options) {
  // This constructor is also applied to Duplex instances.
  //
  // The built-in instanceof behaviour is used deliberately: the custom
  // Symbol.hasInstance hook would report false because no
  // `_writableState` exists yet, and it would also break the Node.js
  // LazyTransform implementation, whose non-trivial `_writableState`
  // getter would recurse forever.
  //
  // Detecting Duplex here rather than inside WritableState is faster, at
  // least with V8 6.5.
  const isDuplex = (this instanceof Stream.Duplex);

  if (!isDuplex && !FunctionPrototypeSymbolHasInstance(Writable, this)) {
    return new Writable(options);
  }

  this._writableState = new WritableState(options, this, isDuplex);

  if (options) {
    // Let callers provide the implementation hooks as options.
    if (typeof options.write === 'function') {
      this._write = options.write;
    }

    if (typeof options.writev === 'function') {
      this._writev = options.writev;
    }

    if (typeof options.destroy === 'function') {
      this._destroy = options.destroy;
    }

    if (typeof options.final === 'function') {
      this._final = options.final;
    }

    if (typeof options.construct === 'function') {
      this._construct = options.construct;
    }

    if (options.signal) {
      addAbortSignal(options.signal, this);
    }
  }

  Stream.call(this, options);

  // Once construction has completed, replay anything that was buffered in
  // the meantime and check whether the stream can already finish.
  destroyImpl.construct(this, () => {
    const state = this._writableState;

    if (!state.writing) {
      clearBuffer(this, state);
    }

    finishMaybe(this, state);
  });
}
270
// Custom `instanceof`: for Writable itself (not for subclasses) any object
// carrying a WritableState in `_writableState` also counts as an instance.
ObjectDefineProperty(Writable, SymbolHasInstance, {
  __proto__: null,
  value: function(object) {
    // Regular prototype-chain membership always wins.
    if (FunctionPrototypeSymbolHasInstance(this, object)) {
      return true;
    }

    // Subclasses get the ordinary check only.
    if (this !== Writable) {
      return false;
    }

    return object && object._writableState instanceof WritableState;
  },
});
280
// Piping *from* a Writable is just wrong, so the attempt is reported as
// an ERR_STREAM_CANNOT_PIPE error on the stream.
Writable.prototype.pipe = function() {
  const err = new ERR_STREAM_CANNOT_PIPE();
  errorOrDestroy(this, err);
};
285
/**
 * Shared implementation behind write() and end(chunk).
 *
 * Normalizes the arguments, validates the chunk and either reports a
 * write-after-end / destroyed error or forwards to writeOrBuffer().
 *
 * @param {object} stream The Writable being written to.
 * @param {*} chunk Data to write; `null` is rejected.
 * @param {string|Function} [encoding] Encoding for string chunks, or the
 *   callback when no encoding is given.
 * @param {Function} [cb] Called once the chunk was handled or failed.
 * @returns {boolean|Error} The writeOrBuffer() result, or the error that
 *   prevented the write from being accepted.
 * @throws ERR_UNKNOWN_ENCODING, ERR_STREAM_NULL_VALUES or
 *   ERR_INVALID_ARG_TYPE for invalid arguments.
 */
function _write(stream, chunk, encoding, cb) {
  const state = stream._writableState;

  let enc = encoding;
  let callback = cb;

  if (typeof enc === 'function') {
    // write(chunk, cb) form.
    callback = enc;
    enc = state.defaultEncoding;
  } else {
    if (!enc) {
      enc = state.defaultEncoding;
    } else if (enc !== 'buffer' && !Buffer.isEncoding(enc)) {
      throw new ERR_UNKNOWN_ENCODING(enc);
    }

    if (typeof callback !== 'function') {
      callback = nop;
    }
  }

  let data = chunk;

  if (data === null) {
    throw new ERR_STREAM_NULL_VALUES();
  }

  if (!state.objectMode) {
    if (typeof data === 'string') {
      // Strings stay strings only when decodeStrings was turned off.
      if (state.decodeStrings !== false) {
        data = Buffer.from(data, enc);
        enc = 'buffer';
      }
    } else if (data instanceof Buffer) {
      enc = 'buffer';
    } else if (Stream._isUint8Array(data)) {
      data = Stream._uint8ArrayToBuffer(data);
      enc = 'buffer';
    } else {
      throw new ERR_INVALID_ARG_TYPE(
        'chunk', ['string', 'Buffer', 'Uint8Array'], data);
    }
  }

  // Writing after end() takes precedence over writing after destroy().
  let err;
  if (state.ending) {
    err = new ERR_STREAM_WRITE_AFTER_END();
  } else if (state.destroyed) {
    err = new ERR_STREAM_DESTROYED('write');
  }

  if (err) {
    // The callback is scheduled before the error is reported on the stream.
    process.nextTick(callback, err);
    errorOrDestroy(stream, err, true);
    return err;
  }

  state.pendingcb++;
  return writeOrBuffer(stream, state, data, enc, callback);
}
335
// Public write(): true only when _write() accepted the chunk and the
// producer may keep writing; a returned Error maps to false.
Writable.prototype.write = function(chunk, encoding, cb) {
  const outcome = _write(this, chunk, encoding, cb);
  return outcome === true;
};
339
// Each cork() call deepens the cork by one level; writes are buffered
// while the level is non-zero.
Writable.prototype.cork = function() {
  const state = this._writableState;
  state.corked++;
};
343
// Undoes one cork() level and, unless a write is in flight, tries to
// flush what was buffered. Calling it on an uncorked stream does nothing.
Writable.prototype.uncork = function() {
  const state = this._writableState;

  if (!state.corked) {
    return;
  }

  state.corked--;

  if (!state.writing) {
    clearBuffer(this, state);
  }
};
354
// Changes the encoding assumed for string chunks written without one.
// Throws ERR_UNKNOWN_ENCODING for anything Buffer does not recognise and
// returns the stream for chaining.
Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
  // node::ParseEncoding() requires lower case.
  const normalized = typeof encoding === 'string' ?
    StringPrototypeToLowerCase(encoding) :
    encoding;

  if (!Buffer.isEncoding(normalized)) {
    throw new ERR_UNKNOWN_ENCODING(normalized);
  }

  this._writableState.defaultEncoding = normalized;
  return this;
};
364
/**
 * Accounts for a chunk and either queues it or hands it to _write().
 *
 * The chunk is queued while another write is in flight, the stream is
 * corked or errored, or construction has not completed.
 *
 * @param {object} stream The Writable.
 * @param {WritableState} state Its writable state.
 * @param {*} chunk Chunk to write.
 * @param {string} encoding Chunk encoding ('buffer' for binary data).
 * @param {Function} callback Per-write callback.
 * @returns {boolean} false when the producer should wait for 'drain' or
 *   the stream is errored/destroyed.
 */
function writeOrBuffer(stream, state, chunk, encoding, callback) {
  const len = state.objectMode ? 1 : chunk.length;

  state.length += len;

  // Evaluated up front: stream._write resets state.length.
  const belowHighWaterMark = state.length < state.highWaterMark;
  // Only ever raise the flag here, so an earlier needDrain is not lost.
  if (!belowHighWaterMark) {
    state.needDrain = true;
  }

  const mustQueue =
    state.writing || state.corked || state.errored || !state.constructed;

  if (mustQueue) {
    state.buffered.push({ chunk, encoding, callback });
    // Track whether the queue still holds only buffers / only no-op
    // callbacks.
    if (state.allBuffers && encoding !== 'buffer') {
      state.allBuffers = false;
    }
    if (state.allNoop && callback !== nop) {
      state.allNoop = false;
    }
  } else {
    state.writelen = len;
    state.writecb = callback;
    state.writing = true;
    // `sync` is only true for the duration of the _write() call itself.
    state.sync = true;
    stream._write(chunk, encoding, state.onwrite);
    state.sync = false;
  }

  // Report false once errored or destroyed so that synchronous
  // while (stream.write(data)) loops terminate.
  if (!belowHighWaterMark) {
    return false;
  }
  return !state.errored && !state.destroyed;
}
400
/**
 * Records the in-flight write on `state` and dispatches it.
 *
 * @param {object} stream The Writable.
 * @param {WritableState} state Its writable state.
 * @param {boolean} writev Dispatch through _writev() instead of _write().
 * @param {number} len Size accounted for this write.
 * @param {*} chunk A single chunk, or the chunk list when `writev` is set.
 * @param {string} encoding Chunk encoding (ignored for _writev()).
 * @param {Function} cb Callback stored as `state.writecb`.
 */
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  state.writelen = len;
  state.writecb = cb;
  state.writing = true;
  // `sync` lets onwrite() detect a callback fired during the call below.
  state.sync = true;

  if (state.destroyed) {
    // Never reach the implementation after destroy; fail the write.
    state.onwrite(new ERR_STREAM_DESTROYED('write'));
  } else if (writev) {
    stream._writev(chunk, state.onwrite);
  } else {
    stream._write(chunk, encoding, state.onwrite);
  }

  state.sync = false;
}
414
/**
 * Completes a failed write: runs its callback, fails everything still
 * queued and finally reports the error on the stream.
 *
 * @param {object} stream The Writable.
 * @param {WritableState} state Its writable state.
 * @param {Error} er The write error.
 * @param {Function} cb Callback of the failed write.
 */
function onwriteError(stream, state, er, cb) {
  state.pendingcb--;

  cb(er);
  // Queued callbacks have to run even when autoDestroy is off. `er` is
  // not forwarded: it belongs to this one write, not to the queued ones.
  errorBuffer(state);
  // May emit 'error', which must always come after the callback.
  errorOrDestroy(stream, er);
}
427
/**
 * Completion handler given to _write()/_writev() (bound to the stream as
 * `state.onwrite`).
 *
 * @param {object} stream The Writable whose write completed.
 * @param {Error} [er] Error reported by the implementation, if any.
 */
function onwrite(stream, er) {
  const state = stream._writableState;
  const { sync, writecb: cb } = state;

  // writecb is cleared below, so a missing one means the implementation
  // invoked its callback more than once.
  if (typeof cb !== 'function') {
    errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
    return;
  }

  // The in-flight write is over: release its slot and its accounting.
  state.writing = false;
  state.writecb = null;
  state.length -= state.writelen;
  state.writelen = 0;

  if (er) {
    // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
    er.stack; // eslint-disable-line no-unused-expressions

    // Keep the first error only.
    if (!state.errored) {
      state.errored = er;
    }

    // A duplex stream has to learn about the failure on its readable
    // side as well.
    if (stream._readableState && !stream._readableState.errored) {
      stream._readableState.errored = er;
    }

    // Defer when called synchronously from within _write().
    if (sync) {
      process.nextTick(onwriteError, stream, state, er, cb);
    } else {
      onwriteError(stream, state, er, cb);
    }
    return;
  }

  // Success: keep the queue moving before notifying the caller.
  if (state.buffered.length > state.bufferedIndex) {
    clearBuffer(stream, state);
  }

  if (!sync) {
    afterWrite(stream, state, 1, cb);
    return;
  }

  // Synchronous completion. Callers very commonly pass the same callback
  // to every write(), so instead of scheduling one tick per write we bump
  // a counter on the already scheduled tick, which saves allocations.
  const scheduled = state.afterWriteTickInfo;
  if (scheduled !== null && scheduled.cb === cb) {
    scheduled.count++;
  } else {
    state.afterWriteTickInfo = { count: 1, cb, stream, state };
    process.nextTick(afterWriteTick, state.afterWriteTickInfo);
  }
}
484
/**
 * nextTick trampoline for synchronously completed writes: unregisters the
 * pending tick record and runs afterWrite() with the accumulated count.
 *
 * @param {{stream: object, state: WritableState, count: number,
 *   cb: Function}} info Record created by onwrite().
 */
function afterWriteTick(info) {
  const { stream, state, count, cb } = info;
  // Later writes must schedule a new tick.
  state.afterWriteTickInfo = null;
  return afterWrite(stream, state, count, cb);
}
489
/**
 * Runs after one or more successful writes: emits 'drain' when owed,
 * invokes the write callback once per completed write and checks whether
 * the stream can finish.
 *
 * @param {object} stream The Writable.
 * @param {WritableState} state Its writable state.
 * @param {number} count How many completed writes share `cb`.
 * @param {Function} cb The write callback.
 */
function afterWrite(stream, state, count, cb) {
  // 'drain' is due only for a live, non-ending stream whose backlog is
  // empty and which previously signalled back-pressure.
  const shouldDrain = !state.ending && !stream.destroyed &&
    state.length === 0 && state.needDrain;

  if (shouldDrain) {
    state.needDrain = false;
    stream.emit('drain');
  }

  for (let remaining = count; remaining > 0; remaining--) {
    state.pendingcb--;
    cb();
  }

  // A callback may have destroyed the stream; fail whatever is still
  // queued in that case.
  if (state.destroyed) {
    errorBuffer(state);
  }

  finishMaybe(stream, state);
}
509
/**
 * Fails every queued write and every pending end() callback, then empties
 * the queue. Does nothing while a write is still in flight.
 *
 * Callbacks receive `state.errored` when set, otherwise a fresh
 * ERR_STREAM_DESTROYED ('write' for writes, 'end' for end() callbacks).
 *
 * @param {WritableState} state The writable state to flush.
 */
function errorBuffer(state) {
  if (state.writing) {
    return;
  }

  // The queue is re-read on every pass rather than cached.
  let index = state.bufferedIndex;
  while (index < state.buffered.length) {
    const { chunk, callback } = state.buffered[index];
    const len = state.objectMode ? 1 : chunk.length;
    // The chunk will never be written; drop it from the backlog count.
    state.length -= len;
    callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
    ++index;
  }

  // Detach the end() callbacks before invoking them.
  const onfinishCallbacks = state[kOnFinished].splice(0);
  for (let i = 0; i < onfinishCallbacks.length; i++) {
    onfinishCallbacks[i](state.errored ?? new ERR_STREAM_DESTROYED('end'));
  }

  resetBuffer(state);
}
530
/**
 * Dispatches queued writes, if the stream is currently able to.
 *
 * With more than one queued entry and a `_writev` implementation all
 * entries go out in a single batch; otherwise entries are written one by
 * one for as long as each write completes synchronously.
 *
 * @param {object} stream The Writable.
 * @param {WritableState} state Its writable state.
 */
function clearBuffer(stream, state) {
  const blocked = state.corked ||
    state.bufferProcessing ||
    state.destroyed ||
    !state.constructed;
  if (blocked) {
    return;
  }

  const { buffered, bufferedIndex, objectMode } = state;
  const pendingCount = buffered.length - bufferedIndex;

  if (!pendingCount) {
    return;
  }

  state.bufferProcessing = true;

  if (pendingCount > 1 && stream._writev) {
    // The whole batch completes through one callback, so only one
    // pending callback remains accounted for.
    state.pendingcb -= pendingCount - 1;

    const nobodyListening = state.allNoop;
    const callback = nobodyListening ? nop : (err) => {
      for (let n = bufferedIndex; n < buffered.length; ++n) {
        buffered[n].callback(err);
      }
    };
    // The queue array itself is handed over only when the callback above
    // does not need it and it has no consumed prefix; otherwise a copy
    // of the pending tail is used.
    const chunks = nobodyListening && bufferedIndex === 0 ?
      buffered : ArrayPrototypeSlice(buffered, bufferedIndex);
    chunks.allBuffers = state.allBuffers;

    doWrite(stream, state, true, state.length, chunks, '', callback);

    resetBuffer(state);
  } else {
    let cursor = bufferedIndex;

    do {
      const { chunk, encoding, callback } = buffered[cursor];
      // Drop the reference to the entry as soon as it is consumed.
      buffered[cursor++] = null;
      const len = objectMode ? 1 : chunk.length;
      doWrite(stream, state, false, len, chunk, encoding, callback);
    } while (cursor < buffered.length && !state.writing);

    if (cursor === buffered.length) {
      // Everything was consumed.
      resetBuffer(state);
    } else if (cursor > 256) {
      // Trim a long consumed prefix instead of letting it grow.
      buffered.splice(0, cursor);
      state.bufferedIndex = 0;
    } else {
      state.bufferedIndex = cursor;
    }
  }

  state.bufferProcessing = false;
}
586
// Default _write(): delegates a single chunk to _writev() when the stream
// provides one; a stream implementing neither cannot be written to.
Writable.prototype._write = function(chunk, encoding, cb) {
  if (!this._writev) {
    throw new ERR_METHOD_NOT_IMPLEMENTED('_write()');
  }

  this._writev([{ chunk, encoding }], cb);
};

// Batched writes are optional; null means "not implemented".
Writable.prototype._writev = null;
596
// end([chunk][, encoding][, cb]): optionally writes a last chunk, fully
// uncorks the stream and starts finishing it. `cb` is invoked on a later
// tick when the outcome is already known, or queued until 'finish' (or a
// failure) otherwise. Returns the stream.
Writable.prototype.end = function(chunk, encoding, cb) {
  const state = this._writableState;

  // Shift the arguments for the end(cb) and end(chunk, cb) forms.
  if (typeof chunk === 'function') {
    cb = chunk;
    chunk = null;
    encoding = null;
  } else if (typeof encoding === 'function') {
    cb = encoding;
    encoding = null;
  }

  let err;

  const hasChunk = chunk !== null && chunk !== undefined;
  if (hasChunk) {
    const ret = _write(this, chunk, encoding);
    if (ret instanceof Error) {
      err = ret;
    }
  }

  // .end() fully uncorks, no matter how deeply the stream was corked.
  if (state.corked) {
    state.corked = 1;
    this.uncork();
  }

  // A failed final write leaves nothing more to do here.
  if (!err) {
    if (!state.errored && !state.ending) {
      // Redundant end() calls are tolerated. That can hide logic errors,
      // but those are usually harmless while a hard error could be
      // disproportionately destructive, and callers cannot always tell
      // easily whether end() still has to be called.
      state.ending = true;
      finishMaybe(this, state, true);
      state.ended = true;
    } else if (state.finished) {
      err = new ERR_STREAM_ALREADY_FINISHED('end');
    } else if (state.destroyed) {
      err = new ERR_STREAM_DESTROYED('end');
    }
  }

  if (typeof cb === 'function') {
    const outcomeKnown = err || state.finished;
    if (outcomeKnown) {
      process.nextTick(cb, err);
    } else {
      state[kOnFinished].push(cb);
    }
  }

  return this;
};
652
/**
 * Tells whether the stream is ready to finish: end() was called, nothing
 * is queued or in flight, construction is complete, and the stream has
 * neither failed, been destroyed, finished nor closed already.
 *
 * @param {WritableState} state The writable state to inspect.
 * @returns {*} Truthy when finishing may proceed. This is the value of
 *   the `&&` chain, not necessarily a boolean.
 */
function needFinish(state) {
  return (
    state.ending &&
    !state.destroyed &&
    state.constructed &&
    state.length === 0 &&
    !state.errored &&
    state.buffered.length === 0 &&
    !state.finished &&
    !state.writing &&
    !state.errorEmitted &&
    !state.closeEmitted
  );
}
665
/**
 * Invokes the stream's `_final()` hook and handles its completion.
 *
 * While the hook runs one extra pending callback is accounted for, so
 * 'finish' cannot be emitted early. On success 'prefinish' is emitted and
 * finish() is scheduled on the next tick; on failure the queued end()
 * callbacks receive the error and the stream is errored/destroyed.
 *
 * @param {object} stream The Writable whose `_final` is called.
 * @param {WritableState} state Its writable state.
 */
function callFinal(stream, state) {
  let called = false;

  function onFinish(err) {
    if (called) {
      // _final() invoked its callback more than once. Report the error it
      // passed or, when there is none, a proper ERR_MULTIPLE_CALLBACK
      // instance (the error constructor has to be called with `new`).
      errorOrDestroy(stream, err ?? new ERR_MULTIPLE_CALLBACK());
      return;
    }
    called = true;

    state.pendingcb--;
    if (err) {
      // Fail the callbacks that end() queued before erroring the stream.
      const onfinishCallbacks = state[kOnFinished].splice(0);
      for (let i = 0; i < onfinishCallbacks.length; i++) {
        onfinishCallbacks[i](err);
      }
      errorOrDestroy(stream, err, state.sync);
    } else if (needFinish(state)) {
      state.prefinished = true;
      stream.emit('prefinish');
      // Backwards compat. Don't check state.sync here.
      // Some streams assume 'finish' will be emitted
      // asynchronously relative to _final callback.
      state.pendingcb++;
      process.nextTick(finish, stream, state);
    }
  }

  // `sync` is true only while _final() itself is running.
  state.sync = true;
  state.pendingcb++;

  try {
    stream._final(onFinish);
  } catch (err) {
    // A throwing _final() is treated like one that called back with err.
    onFinish(err);
  }

  state.sync = false;
}
705
/**
 * Performs the pre-finish step at most once: runs `_final()` when the
 * stream defines it and is not destroyed, otherwise emits 'prefinish'
 * right away.
 *
 * @param {object} stream The Writable.
 * @param {WritableState} state Its writable state.
 */
function prefinish(stream, state) {
  // Already done, or _final() is already in progress.
  if (state.prefinished || state.finalCalled) {
    return;
  }

  if (typeof stream._final === 'function' && !state.destroyed) {
    state.finalCalled = true;
    callFinal(stream, state);
    return;
  }

  state.prefinished = true;
  stream.emit('prefinish');
}
717
/**
 * Finishes the stream if it is ready to.
 *
 * @param {object} stream The Writable.
 * @param {WritableState} state Its writable state.
 * @param {boolean} [sync] When truthy, finish() is deferred to the next
 *   tick and readiness is checked again at that point.
 */
function finishMaybe(stream, state, sync) {
  if (!needFinish(state)) {
    return;
  }

  prefinish(stream, state);

  // Callbacks still outstanding (possibly the one for _final()); whoever
  // completes them calls back into this function.
  if (state.pendingcb !== 0) {
    return;
  }

  if (sync) {
    // Hold a pending callback across the tick so that nothing else
    // finishes the stream in between.
    state.pendingcb++;
    process.nextTick((stream, state) => {
      if (needFinish(state)) {
        finish(stream, state);
      } else {
        state.pendingcb--;
      }
    }, stream, state);
  } else if (needFinish(state)) {
    // finish() releases this again.
    state.pendingcb++;
    finish(stream, state);
  }
}
738
/**
 * Marks the stream finished, runs the callbacks queued by end(), emits
 * 'finish' and, when autoDestroy applies, destroys the stream.
 *
 * @param {object} stream The Writable.
 * @param {WritableState} state Its writable state.
 */
function finish(stream, state) {
  // Release the pending callback taken by whoever scheduled this call.
  state.pendingcb--;
  state.finished = true;

  const onfinishCallbacks = state[kOnFinished].splice(0);
  for (let i = 0; i < onfinishCallbacks.length; i++) {
    onfinishCallbacks[i]();
  }

  stream.emit('finish');

  if (!state.autoDestroy) {
    return;
  }

  // For a duplex stream the readable side has to be ready for
  // autoDestroy too: it must allow it and either have emitted 'end' or
  // never be going to (readable explicitly set to false).
  const rState = stream._readableState;
  const autoDestroy = !rState || (
    rState.autoDestroy &&
    (rState.endEmitted || rState.readable === false)
  );
  if (autoDestroy) {
    stream.destroy();
  }
}
765
// Public accessors over `_writableState`. They fall back to a default
// (or to the falsy `_writableState` value itself for the `&&` getters)
// when the state object is missing.
ObjectDefineProperties(Writable.prototype, {

  // Whether the stream has finished destroying.
  closed: {
    __proto__: null,
    get() {
      return this._writableState ? this._writableState.closed : false;
    },
  },

  // Whether the stream has been destroyed.
  destroyed: {
    __proto__: null,
    get() {
      return this._writableState ? this._writableState.destroyed : false;
    },
    set(value) {
      // Backward compatibility, the user is explicitly managing destroyed.
      if (this._writableState) {
        this._writableState.destroyed = value;
      }
    },
  },

  // Whether write() may still be called.
  writable: {
    __proto__: null,
    get() {
      const w = this._writableState;
      // w.writable === false means that this is part of a Duplex stream
      // where the writable side was disabled upon construction.
      // Compat. The user might manually disable writable side through
      // deprecated setter.
      return !!w && w.writable !== false && !w.destroyed && !w.errored &&
        !w.ending && !w.ended;
    },
    set(val) {
      // Backwards compatible.
      if (this._writableState) {
        this._writableState.writable = !!val;
      }
    },
  },

  // Whether 'finish' has been emitted.
  writableFinished: {
    __proto__: null,
    get() {
      return this._writableState ? this._writableState.finished : false;
    },
  },

  // Whether the stream carries objects rather than buffers/strings.
  writableObjectMode: {
    __proto__: null,
    get() {
      return this._writableState ? this._writableState.objectMode : false;
    },
  },

  // Copy of the queued, not yet dispatched write requests.
  writableBuffer: {
    __proto__: null,
    get() {
      return this._writableState && this._writableState.getBuffer();
    },
  },

  // Whether end() has been called (reflects `ending`, which is set at the
  // start of end()).
  writableEnded: {
    __proto__: null,
    get() {
      return this._writableState ? this._writableState.ending : false;
    },
  },

  // Whether a live, non-ending stream still owes a 'drain' event.
  writableNeedDrain: {
    __proto__: null,
    get() {
      const wState = this._writableState;
      if (!wState) return false;
      return !wState.destroyed && !wState.ending && wState.needDrain;
    },
  },

  // Threshold at which write() starts returning false.
  writableHighWaterMark: {
    __proto__: null,
    get() {
      return this._writableState && this._writableState.highWaterMark;
    },
  },

  // Current cork depth.
  writableCorked: {
    __proto__: null,
    get() {
      return this._writableState ? this._writableState.corked : 0;
    },
  },

  // Amount of data queued or in flight.
  writableLength: {
    __proto__: null,
    get() {
      return this._writableState && this._writableState.length;
    },
  },

  // The error the stream failed with, or null.
  errored: {
    __proto__: null,
    enumerable: false,
    get() {
      return this._writableState ? this._writableState.errored : null;
    },
  },

  // Whether an enabled writable side was destroyed or errored before it
  // finished.
  writableAborted: {
    __proto__: null,
    enumerable: false,
    get() {
      const w = this._writableState;
      // Like the other getters, tolerate a missing state object instead
      // of throwing on the property access.
      if (!w) return false;
      return !!(
        w.writable !== false &&
        (w.destroyed || w.errored) &&
        !w.finished
      );
    },
  },
});
885
const destroy = destroyImpl.destroy;

// destroy([err][, cb]): when the stream is not destroyed yet and still
// has queued writes or pending end() callbacks, schedules those callbacks
// to be failed on the next tick, then runs the shared destroy routine.
// Returns the stream.
Writable.prototype.destroy = function(err, cb) {
  const state = this._writableState;

  const hasPendingCallbacks = !state.destroyed &&
    (state.bufferedIndex < state.buffered.length ||
      state[kOnFinished].length);

  if (hasPendingCallbacks) {
    process.nextTick(errorBuffer, state);
  }

  destroy.call(this, err, cb);
  return this;
};
900
Writable.prototype._undestroy = destroyImpl.undestroy;

// Default _destroy(): nothing to release, so the error (if any) is handed
// straight to the callback.
Writable.prototype._destroy = function(err, cb) {
  cb(err);
};
905
// Handler registered under the events module's captureRejectionSymbol:
// the stream is destroyed with the error it is given.
Writable.prototype[EE.captureRejectionSymbol] = function(err) {
  this.destroy(err);
};
909
910let webStreamsAdapters;
911
912// Lazy to avoid circular references
913function lazyWebStreams() {
914  if (webStreamsAdapters === undefined)
915    webStreamsAdapters = require('internal/webstreams/adapters');
916  return webStreamsAdapters;
917}
918
// Wraps a web WritableStream in a stream.Writable by delegating to the
// lazily loaded adapters module.
Writable.fromWeb = function(writableStream, options) {
  const adapters = lazyWebStreams();
  return adapters.newStreamWritableFromWritableStream(writableStream, options);
};
924
// Wraps a stream.Writable in a web WritableStream by delegating to the
// lazily loaded adapters module.
Writable.toWeb = function(streamWritable) {
  const adapters = lazyWebStreams();
  return adapters.newWritableStreamFromStreamWritable(streamWritable);
};
928