// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.

// A bit simpler than readable streams.
// Implement an async ._write(chunk, encoding, cb), and it'll handle all
// the drain event emission and buffering.
'use strict';

const {
  ArrayPrototypeSlice,
  Error,
  FunctionPrototypeSymbolHasInstance,
  ObjectDefineProperty,
  ObjectDefineProperties,
  ObjectSetPrototypeOf,
  StringPrototypeToLowerCase,
  Symbol,
  SymbolHasInstance,
} = primordials;

// The export is assigned before the require() calls below run.
// NOTE(review): presumably so that modules requiring this file during a
// circular load already see the constructor - confirm before reordering.
module.exports = Writable;
Writable.WritableState = WritableState;

const EE = require('events');
const Stream = require('internal/streams/legacy').Stream;
const { Buffer } = require('buffer');
const destroyImpl = require('internal/streams/destroy');

const {
  addAbortSignal,
} = require('internal/streams/add-abort-signal');

const {
  getHighWaterMark,
  getDefaultHighWaterMark,
} = require('internal/streams/state');
const {
  ERR_INVALID_ARG_TYPE,
  ERR_METHOD_NOT_IMPLEMENTED,
  ERR_MULTIPLE_CALLBACK,
  ERR_STREAM_CANNOT_PIPE,
  ERR_STREAM_DESTROYED,
  ERR_STREAM_ALREADY_FINISHED,
  ERR_STREAM_NULL_VALUES,
  ERR_STREAM_WRITE_AFTER_END,
  ERR_UNKNOWN_ENCODING,
} = require('internal/errors').codes;

const { errorOrDestroy } = destroyImpl;

// Writable inherits from the legacy Stream, both on the instance side and
// on the constructor (static) side.
ObjectSetPrototypeOf(Writable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Writable, Stream);

// Shared no-op callback. It is compared by identity elsewhere in this file
// (`callback !== nop`) to detect writes that carry no user callback.
function nop() {}

// Key under which WritableState stores the callbacks passed to end() that
// are waiting for the stream to finish.
const kOnFinished = Symbol('kOnFinished');

// Holds all bookkeeping for the writable side of a stream.
//   options  - user supplied stream options (may be undefined).
//   stream   - the stream that owns this state.
//   isDuplex - whether `stream` is a Duplex; computed with instanceof when
//              the caller does not pass a boolean.
function WritableState(options, stream, isDuplex) {
  // Duplex streams are both readable and writable, but share
  // the same options object.
  // However, some cases require setting options to different
  // values for the readable and the writable sides of the duplex stream,
  // e.g. options.readableObjectMode vs. options.writableObjectMode, etc.
  if (typeof isDuplex !== 'boolean')
    isDuplex = stream instanceof Stream.Duplex;

  // Object stream flag to indicate whether or not this stream
  // contains buffers or objects.
  this.objectMode = !!(options && options.objectMode);

  if (isDuplex)
    this.objectMode = this.objectMode ||
      !!(options && options.writableObjectMode);

  // The point at which write() starts returning false
  // Note: 0 is a valid value, means that we always return false if
  // the entire buffer is not flushed immediately on write().
  // `this` is handed to getHighWaterMark() after objectMode has been set
  // above, so keep this assignment below the objectMode ones.
  this.highWaterMark = options ?
    getHighWaterMark(this, options, 'writableHighWaterMark', isDuplex) :
    getDefaultHighWaterMark(false);

  // Whether _final() has been called.
  this.finalCalled = false;

  // Drain event flag.
  this.needDrain = false;
  // At the start of calling end()
  this.ending = false;
  // When end() has been called, and returned.
  this.ended = false;
  // When 'finish' is emitted.
  this.finished = false;

  // Has it been destroyed
  this.destroyed = false;

  // Should we decode strings into buffers before passing to _write?
  // this is here so that some node-core streams can optimize string
  // handling at a lower level.
  const noDecode = !!(options && options.decodeStrings === false);
  this.decodeStrings = !noDecode;

  // Crypto is kind of old and crusty.  Historically, its default string
  // encoding is 'binary' so we have to make this configurable.
  // Everything else in the universe uses 'utf8', though.
  this.defaultEncoding = (options && options.defaultEncoding) || 'utf8';

  // Not an actual buffer we keep track of, but a measurement
  // of how much we're waiting to get pushed to some underlying
  // socket or file.
  this.length = 0;

  // A flag to see when we're in the middle of a write.
  this.writing = false;

  // Cork depth counter: cork() increments it and uncork() decrements it.
  // While it is non-zero all writes are buffered.
  this.corked = 0;

  // A flag to be able to tell if the onwrite cb is called immediately,
  // or on a later tick.  We set this to true at first, because any
  // actions that shouldn't happen until "later" should generally also
  // not happen before the first write call.
  this.sync = true;

  // A flag to know if we're processing previously buffered items, which
  // may call the _write() callback in the same tick, so that we don't
  // end up in an overlapped onwrite situation.
  this.bufferProcessing = false;

  // The callback that's passed to _write(chunk, cb).
  // It is pre-bound to the owning stream so it can be passed around as-is.
  this.onwrite = onwrite.bind(undefined, stream);

  // The callback that the user supplies to write(chunk, encoding, cb).
  this.writecb = null;

  // The amount that is being written when _write is called.
  this.writelen = 0;

  // Storage for data passed to the afterWrite() callback in case of
  // synchronous _write() completion.
  this.afterWriteTickInfo = null;

  // Initializes buffered, bufferedIndex, allBuffers and allNoop.
  resetBuffer(this);

  // Number of pending user-supplied write callbacks
  // this must be 0 before 'finish' can be emitted.
  this.pendingcb = 0;

  // Stream is still being constructed and cannot be
  // destroyed until construction finished or failed.
  // Async construction is opt in, therefore we start as
  // constructed.
  this.constructed = true;

  // Emit prefinish if the only thing we're waiting for is _write cbs
  // This is relevant for synchronous Transform streams.
  this.prefinished = false;

  // True if the error was already emitted and should not be thrown again.
  this.errorEmitted = false;

  // Should close be emitted on destroy. Defaults to true.
  this.emitClose = !options || options.emitClose !== false;

  // Should .destroy() be called after 'finish' (and potentially 'end').
  this.autoDestroy = !options || options.autoDestroy !== false;

  // Indicates whether the stream has errored. When true all write() calls
  // should return false. This is needed since when autoDestroy
  // is disabled we need a way to tell whether the stream has failed.
  // Holds the error object itself (or null), not a boolean.
  this.errored = null;

  // Indicates whether the stream has finished destroying.
  this.closed = false;

  // True if close has been emitted or would have been emitted
  // depending on emitClose.
  this.closeEmitted = false;

  // Callbacks passed to end() that are waiting for 'finish' (or an error).
  this[kOnFinished] = [];
}

// Resets the pending-write queue of `state` to its empty condition.
//   buffered      - queued { chunk, encoding, callback } entries.
//   bufferedIndex - index of the first entry that has not been written yet.
//   allBuffers    - cleared once an entry with a non-'buffer' encoding is
//                   queued.
//   allNoop       - cleared once an entry with a real callback is queued.
function resetBuffer(state) {
  state.buffered = [];
  state.bufferedIndex = 0;
  state.allBuffers = true;
  state.allNoop = true;
}

// Returns a copy of the queued entries that have not been written yet.
WritableState.prototype.getBuffer = function getBuffer() {
  return ArrayPrototypeSlice(this.buffered, this.bufferedIndex);
};

// Number of queued entries that have not been written yet.
ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', {
  __proto__: null,
  get() {
    return this.buffered.length - this.bufferedIndex;
  },
});

// Writable stream constructor. May be called without `new`.
// Recognized function-valued options (write, writev, destroy, final,
// construct) are installed as the matching underscore-prefixed methods on
// the instance; options.signal is wired up through addAbortSignal().
function Writable(options) {
  // Writable ctor is applied to Duplexes, too.
  // `realHasInstance` is necessary because using plain `instanceof`
  // would return false, as no `_writableState` property is attached.

  // Trying to use the custom `instanceof` for Writable here will also break the
  // Node.js LazyTransform implementation, which has a non-trivial getter for
  // `_writableState` that would lead to infinite recursion.

  // Checking for a Stream.Duplex instance is faster here instead of inside
  // the WritableState constructor, at least with V8 6.5.
  const isDuplex = (this instanceof Stream.Duplex);

  // Called as a plain function: create and return a proper instance.
  if (!isDuplex && !FunctionPrototypeSymbolHasInstance(Writable, this))
    return new Writable(options);

  this._writableState = new WritableState(options, this, isDuplex);

  if (options) {
    if (typeof options.write === 'function')
      this._write = options.write;

    if (typeof options.writev === 'function')
      this._writev = options.writev;

    if (typeof options.destroy === 'function')
      this._destroy = options.destroy;

    if (typeof options.final === 'function')
      this._final = options.final;

    if (typeof options.construct === 'function')
      this._construct = options.construct;

    if (options.signal)
      addAbortSignal(options.signal, this);
  }

  Stream.call(this, options);

  // The callback handed to destroyImpl.construct() flushes writes that were
  // buffered in the meantime and re-checks whether the stream can finish.
  destroyImpl.construct(this, () => {
    const state = this._writableState;

    if (!state.writing) {
      clearBuffer(this, state);
    }

    finishMaybe(this, state);
  });
}

// Custom `instanceof`: besides the regular prototype-chain check, an object
// carrying a WritableState in `_writableState` counts as a Writable. The
// relaxed check only applies to Writable itself, not to subclasses.
ObjectDefineProperty(Writable, SymbolHasInstance, {
  __proto__: null,
  value: function(object) {
    if (FunctionPrototypeSymbolHasInstance(this, object)) return true;
    if (this !== Writable) return false;

    return object && object._writableState instanceof WritableState;
  },
});

// Otherwise people can pipe Writable streams, which is just wrong.
Writable.prototype.pipe = function() {
  // A Writable cannot be a pipe source; report it through the stream.
  errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE());
};

// Shared implementation behind write() and end(chunk).
// Normalizes the (encoding, cb) arguments, validates/converts the chunk and
// either starts the write or queues it.
// Returns the boolean result of writeOrBuffer(), or an Error instance when
// the stream is already ending or destroyed (the same error is also passed
// to `cb` on the next tick and reported via errorOrDestroy()).
// Throws ERR_UNKNOWN_ENCODING, ERR_STREAM_NULL_VALUES or
// ERR_INVALID_ARG_TYPE synchronously for invalid arguments.
function _write(stream, chunk, encoding, cb) {
  const state = stream._writableState;

  if (typeof encoding === 'function') {
    cb = encoding;
    encoding = state.defaultEncoding;
  } else {
    if (!encoding)
      encoding = state.defaultEncoding;
    else if (encoding !== 'buffer' && !Buffer.isEncoding(encoding))
      throw new ERR_UNKNOWN_ENCODING(encoding);
    if (typeof cb !== 'function')
      cb = nop;
  }

  if (chunk === null) {
    throw new ERR_STREAM_NULL_VALUES();
  } else if (!state.objectMode) {
    if (typeof chunk === 'string') {
      if (state.decodeStrings !== false) {
        chunk = Buffer.from(chunk, encoding);
        encoding = 'buffer';
      }
    } else if (chunk instanceof Buffer) {
      encoding = 'buffer';
    } else if (Stream._isUint8Array(chunk)) {
      chunk = Stream._uint8ArrayToBuffer(chunk);
      encoding = 'buffer';
    } else {
      throw new ERR_INVALID_ARG_TYPE(
        'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
    }
  }

  let err;
  if (state.ending) {
    err = new ERR_STREAM_WRITE_AFTER_END();
  } else if (state.destroyed) {
    err = new ERR_STREAM_DESTROYED('write');
  }

  if (err) {
    process.nextTick(cb, err);
    errorOrDestroy(stream, err, true);
    return err;
  }
  state.pendingcb++;
  return writeOrBuffer(stream, state, chunk, encoding, cb);
}

// Public write(). Always returns a boolean: an Error returned by _write()
// is mapped to false by the strict comparison.
Writable.prototype.write = function(chunk, encoding, cb) {
  return _write(this, chunk, encoding, cb) === true;
};

// Increases the cork depth; writes are buffered while it is non-zero.
Writable.prototype.cork = function() {
  this._writableState.corked++;
};

// Decreases the cork depth (never below zero) and, when no write is in
// flight, tries to flush the buffer. clearBuffer() itself is a no-op while
// the stream is still corked.
Writable.prototype.uncork = function() {
  const state = this._writableState;

  if (state.corked) {
    state.corked--;

    if (!state.writing)
      clearBuffer(this, state);
  }
};

// Sets the encoding used when write() is called without one.
// Throws ERR_UNKNOWN_ENCODING for unsupported values. Returns `this`.
Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
  // node::ParseEncoding() requires lower case.
  if (typeof encoding === 'string')
    encoding = StringPrototypeToLowerCase(encoding);
  if (!Buffer.isEncoding(encoding))
    throw new ERR_UNKNOWN_ENCODING(encoding);
  this._writableState.defaultEncoding = encoding;
  return this;
};

// If we're already writing something, then just put this
// in the queue, and wait our turn.  Otherwise, call _write
// If we return false, then we need a drain event, so set that flag.
function writeOrBuffer(stream, state, chunk, encoding, callback) {
  // In object mode every chunk counts as one unit.
  const len = state.objectMode ? 1 : chunk.length;

  state.length += len;

  // stream._write resets state.length
  const ret = state.length < state.highWaterMark;
  // We must ensure that previous needDrain will not be reset to false.
  if (!ret)
    state.needDrain = true;

  if (state.writing || state.corked || state.errored || !state.constructed) {
    state.buffered.push({ chunk, encoding, callback });
    if (state.allBuffers && encoding !== 'buffer') {
      state.allBuffers = false;
    }
    if (state.allNoop && callback !== nop) {
      state.allNoop = false;
    }
  } else {
    state.writelen = len;
    state.writecb = callback;
    state.writing = true;
    // `sync` lets onwrite() tell whether _write() completed synchronously.
    state.sync = true;
    stream._write(chunk, encoding, state.onwrite);
    state.sync = false;
  }

  // Return false if errored or destroyed in order to break
  // any synchronous while(stream.write(data)) loops.
  return ret && !state.errored && !state.destroyed;
}

// Dispatches one write (or one batched writev) to the implementation.
// If the stream has been destroyed in the meantime, the pending write is
// completed immediately with ERR_STREAM_DESTROYED instead.
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
  state.writelen = len;
  state.writecb = cb;
  state.writing = true;
  state.sync = true;
  if (state.destroyed)
    state.onwrite(new ERR_STREAM_DESTROYED('write'));
  else if (writev)
    stream._writev(chunk, state.onwrite);
  else
    stream._write(chunk, encoding, state.onwrite);
  state.sync = false;
}

// Handles a failed write: settles the write callback with the error, fails
// everything still buffered and finally reports the error on the stream.
function onwriteError(stream, state, er, cb) {
  --state.pendingcb;

  cb(er);
  // Ensure callbacks are invoked even when autoDestroy is
  // not enabled. Passing `er` here doesn't make sense since
  // it's related to one specific write, not to the buffered
  // writes.
  errorBuffer(state);
  // This can emit error, but error must always follow cb.
  errorOrDestroy(stream, er);
}

// Completion callback handed to _write()/_writev() (bound to the stream in
// the WritableState constructor).
function onwrite(stream, er) {
  const state = stream._writableState;
  const sync = state.sync;
  const cb = state.writecb;

  // writecb is cleared below, so a second invocation lands here.
  if (typeof cb !== 'function') {
    errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
    return;
  }

  state.writing = false;
  state.writecb = null;
  state.length -= state.writelen;
  state.writelen = 0;

  if (er) {
    // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
    er.stack; // eslint-disable-line no-unused-expressions

    if (!state.errored) {
      state.errored = er;
    }

    // In case of duplex streams we need to notify the readable side of the
    // error.
    if (stream._readableState && !stream._readableState.errored) {
      stream._readableState.errored = er;
    }

    // Synchronous failures are deferred to the next tick.
    if (sync) {
      process.nextTick(onwriteError, stream, state, er, cb);
    } else {
      onwriteError(stream, state, er, cb);
    }
  } else {
    if (state.buffered.length > state.bufferedIndex) {
      clearBuffer(stream, state);
    }

    if (sync) {
      // It is a common case that the callback passed to .write() is always
      // the same. In that case, we do not schedule a new nextTick(), but
      // rather just increase a counter, to improve performance and avoid
      // memory allocations.
      if (state.afterWriteTickInfo !== null &&
          state.afterWriteTickInfo.cb === cb) {
        state.afterWriteTickInfo.count++;
      } else {
        state.afterWriteTickInfo = { count: 1, cb, stream, state };
        process.nextTick(afterWriteTick, state.afterWriteTickInfo);
      }
    } else {
      afterWrite(stream, state, 1, cb);
    }
  }
}

// nextTick trampoline for coalesced synchronous write completions.
function afterWriteTick({ stream, state, count, cb }) {
  state.afterWriteTickInfo = null;
  return afterWrite(stream, state, count, cb);
}

// Runs after `count` successful writes that share the callback `cb`:
// emits 'drain' when due, invokes `cb` once per write, fails the remaining
// buffer if the stream got destroyed and checks whether it can finish.
function afterWrite(stream, state, count, cb) {
  const needDrain = !state.ending && !stream.destroyed && state.length === 0 &&
    state.needDrain;
  if (needDrain) {
    state.needDrain = false;
    stream.emit('drain');
  }

  while (count-- > 0) {
    state.pendingcb--;
    cb();
  }

  if (state.destroyed) {
    errorBuffer(state);
  }

  finishMaybe(stream, state);
}

// If there's something in the buffer waiting, then invoke callbacks.
// Each callback receives state.errored, or an ERR_STREAM_DESTROYED when no
// error has been recorded. Does nothing while a write is in flight.
function errorBuffer(state) {
  if (state.writing) {
    return;
  }

  for (let n = state.bufferedIndex; n < state.buffered.length; ++n) {
    const { chunk, callback } = state.buffered[n];
    const len = state.objectMode ? 1 : chunk.length;
    state.length -= len;
    callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
  }

  // splice(0) empties the list in place and hands back its former content.
  const onfinishCallbacks = state[kOnFinished].splice(0);
  for (let i = 0; i < onfinishCallbacks.length; i++) {
    onfinishCallbacks[i](state.errored ?? new ERR_STREAM_DESTROYED('end'));
  }

  resetBuffer(state);
}

// If there's something in the buffer waiting, then process it.
function clearBuffer(stream, state) {
  if (state.corked ||
      state.bufferProcessing ||
      state.destroyed ||
      !state.constructed) {
    return;
  }

  const { buffered, bufferedIndex, objectMode } = state;
  const bufferedLength = buffered.length - bufferedIndex;

  if (!bufferedLength) {
    return;
  }

  let i = bufferedIndex;

  state.bufferProcessing = true;
  if (bufferedLength > 1 && stream._writev) {
    // The whole batch is completed through a single callback, so only one
    // pending callback remains for it.
    state.pendingcb -= bufferedLength - 1;

    const callback = state.allNoop ? nop : (err) => {
      for (let n = i; n < buffered.length; ++n) {
        buffered[n].callback(err);
      }
    };
    // Make a copy of `buffered` if it's going to be used by `callback` above,
    // since `doWrite` will mutate the array.
    const chunks = state.allNoop && i === 0 ?
      buffered : ArrayPrototypeSlice(buffered, i);
    chunks.allBuffers = state.allBuffers;

    doWrite(stream, state, true, state.length, chunks, '', callback);

    resetBuffer(state);
  } else {
    // Write entries one at a time until one of them is still pending after
    // doWrite() returns.
    do {
      const { chunk, encoding, callback } = buffered[i];
      // Drop the reference to the consumed entry.
      buffered[i++] = null;
      const len = objectMode ? 1 : chunk.length;
      doWrite(stream, state, false, len, chunk, encoding, callback);
    } while (i < buffered.length && !state.writing);

    if (i === buffered.length) {
      resetBuffer(state);
    } else if (i > 256) {
      // Compact the array once enough consumed slots have accumulated.
      buffered.splice(0, i);
      state.bufferedIndex = 0;
    } else {
      state.bufferedIndex = i;
    }
  }
  state.bufferProcessing = false;
}

// Default _write(): delegates to _writev() when one is provided, otherwise
// throws ERR_METHOD_NOT_IMPLEMENTED.
Writable.prototype._write = function(chunk, encoding, cb) {
  if (this._writev) {
    this._writev([{ chunk, encoding }], cb);
  } else {
    throw new ERR_METHOD_NOT_IMPLEMENTED('_write()');
  }
};

Writable.prototype._writev = null;

// end([chunk][, encoding][, cb]): optionally writes a final chunk, fully
// uncorks the stream and starts finishing it. `cb` is called on the next
// tick when the stream is already finished or the call failed; otherwise it
// is queued until the stream finishes or errors. Returns `this`.
Writable.prototype.end = function(chunk, encoding, cb) {
  const state = this._writableState;

  if (typeof chunk === 'function') {
    cb = chunk;
    chunk = null;
    encoding = null;
  } else if (typeof encoding === 'function') {
    cb = encoding;
    encoding = null;
  }

  let err;

  if (chunk !== null && chunk !== undefined) {
    const ret = _write(this, chunk, encoding);
    if (ret instanceof Error) {
      err = ret;
    }
  }

  // .end() fully uncorks.
  if (state.corked) {
    state.corked = 1;
    this.uncork();
  }

  if (err) {
    // Do nothing...
  } else if (!state.errored && !state.ending) {
    // This is forgiving in terms of unnecessary calls to end() and can hide
    // logic errors. However, usually such errors are harmless and causing a
    // hard error can be disproportionately destructive. It is not always
    // trivial for the user to determine whether end() needs to be called
    // or not.

    state.ending = true;
    finishMaybe(this, state, true);
    state.ended = true;
  } else if (state.finished) {
    err = new ERR_STREAM_ALREADY_FINISHED('end');
  } else if (state.destroyed) {
    err = new ERR_STREAM_DESTROYED('end');
  }

  if (typeof cb === 'function') {
    if (err || state.finished) {
      process.nextTick(cb, err);
    } else {
      state[kOnFinished].push(cb);
    }
  }

  return this;
};

// True when end() has been requested and nothing is left that would
// prevent the stream from finishing.
function needFinish(state) {
  return (state.ending &&
          !state.destroyed &&
          state.constructed &&
          state.length === 0 &&
          !state.errored &&
          state.buffered.length === 0 &&
          !state.finished &&
          !state.writing &&
          !state.errorEmitted &&
          !state.closeEmitted);
}

// Invokes stream._final() and handles its completion callback. An exception
// thrown synchronously by _final() is treated like an error passed to the
// callback.
function callFinal(stream, state) {
  let called = false;

  function onFinish(err) {
    if (called) {
      // The _final() callback was invoked more than once.
      // The error is constructed with `new`, like every other error in
      // this file.
      errorOrDestroy(stream, err ?? new ERR_MULTIPLE_CALLBACK());
      return;
    }
    called = true;

    state.pendingcb--;
    if (err) {
      const onfinishCallbacks = state[kOnFinished].splice(0);
      for (let i = 0; i < onfinishCallbacks.length; i++) {
        onfinishCallbacks[i](err);
      }
      errorOrDestroy(stream, err, state.sync);
    } else if (needFinish(state)) {
      state.prefinished = true;
      stream.emit('prefinish');
      // Backwards compat. Don't check state.sync here.
      // Some streams assume 'finish' will be emitted
      // asynchronously relative to _final callback.
      state.pendingcb++;
      process.nextTick(finish, stream, state);
    }
  }

  state.sync = true;
  // Keeps the stream from finishing while _final() is outstanding.
  state.pendingcb++;

  try {
    stream._final(onFinish);
  } catch (err) {
    onFinish(err);
  }

  state.sync = false;
}

// Runs the pre-finish step once: calls _final() when the stream implements
// it and is not destroyed, otherwise emits 'prefinish' right away.
function prefinish(stream, state) {
  if (!state.prefinished && !state.finalCalled) {
    if (typeof stream._final === 'function' && !state.destroyed) {
      state.finalCalled = true;
      callFinal(stream, state);
    } else {
      state.prefinished = true;
      stream.emit('prefinish');
    }
  }
}

// Finishes the stream if it is ready to. With `sync` set, the actual
// finish() is deferred to the next tick and re-validated there.
function finishMaybe(stream, state, sync) {
  if (needFinish(state)) {
    prefinish(stream, state);
    if (state.pendingcb === 0) {
      if (sync) {
        state.pendingcb++;
        process.nextTick((stream, state) => {
          if (needFinish(state)) {
            finish(stream, state);
          } else {
            state.pendingcb--;
          }
        }, stream, state);
      } else if (needFinish(state)) {
        // finish() decrements pendingcb, so balance it beforehand.
        state.pendingcb++;
        finish(stream, state);
      }
    }
  }
}

// Marks the stream finished, settles the queued end() callbacks, emits
// 'finish' and, when autoDestroy applies, destroys the stream.
function finish(stream, state) {
  state.pendingcb--;
  state.finished = true;

  const onfinishCallbacks = state[kOnFinished].splice(0);
  for (let i = 0; i < onfinishCallbacks.length; i++) {
    onfinishCallbacks[i]();
  }

  stream.emit('finish');

  if (state.autoDestroy) {
    // In case of duplex streams we need a way to detect
    // if the readable side is ready for autoDestroy as well.
    const rState = stream._readableState;
    const autoDestroy = !rState || (
      rState.autoDestroy &&
      // We don't expect the readable to ever 'end'
      // if readable is explicitly set to false.
      (rState.endEmitted || rState.readable === false)
    );
    if (autoDestroy) {
      stream.destroy();
    }
  }
}

// Public accessors. All of them tolerate a missing `_writableState`.
ObjectDefineProperties(Writable.prototype, {

  closed: {
    __proto__: null,
    get() {
      return this._writableState ? this._writableState.closed : false;
    },
  },

  destroyed: {
    __proto__: null,
    get() {
      return this._writableState ? this._writableState.destroyed : false;
    },
    set(value) {
      // Backward compatibility, the user is explicitly managing destroyed.
      if (this._writableState) {
        this._writableState.destroyed = value;
      }
    },
  },

  writable: {
    __proto__: null,
    get() {
      const w = this._writableState;
      // w.writable === false means that this is part of a Duplex stream
      // where the writable side was disabled upon construction.
      // Compat. The user might manually disable writable side through
      // deprecated setter.
      return !!w && w.writable !== false && !w.destroyed && !w.errored &&
        !w.ending && !w.ended;
    },
    set(val) {
      // Backwards compatible.
      if (this._writableState) {
        this._writableState.writable = !!val;
      }
    },
  },

  writableFinished: {
    __proto__: null,
    get() {
      return this._writableState ? this._writableState.finished : false;
    },
  },

  writableObjectMode: {
    __proto__: null,
    get() {
      return this._writableState ? this._writableState.objectMode : false;
    },
  },

  writableBuffer: {
    __proto__: null,
    get() {
      return this._writableState && this._writableState.getBuffer();
    },
  },

  writableEnded: {
    __proto__: null,
    get() {
      // Reflects `ending`, i.e. it is true as soon as end() was called.
      return this._writableState ? this._writableState.ending : false;
    },
  },

  writableNeedDrain: {
    __proto__: null,
    get() {
      const wState = this._writableState;
      if (!wState) return false;
      return !wState.destroyed && !wState.ending && wState.needDrain;
    },
  },

  writableHighWaterMark: {
    __proto__: null,
    get() {
      return this._writableState && this._writableState.highWaterMark;
    },
  },

  writableCorked: {
    __proto__: null,
    get() {
      return this._writableState ? this._writableState.corked : 0;
    },
  },

  writableLength: {
    __proto__: null,
    get() {
      return this._writableState && this._writableState.length;
    },
  },

  errored: {
    __proto__: null,
    enumerable: false,
    get() {
      return this._writableState ? this._writableState.errored : null;
    },
  },

  writableAborted: {
    __proto__: null,
    enumerable: false,
    get: function() {
      // Guard against a missing state like the sibling accessors do,
      // instead of throwing a TypeError.
      const wState = this._writableState;
      if (!wState) return false;
      // Aborted: destroyed or errored before 'finish', on a stream whose
      // writable side was not disabled.
      return !!(
        wState.writable !== false &&
        (wState.destroyed || wState.errored) &&
        !wState.finished
      );
    },
  },
});

const destroy = destroyImpl.destroy;
// destroy([err][, cb]): on the first destroy with queued writes or queued
// end() callbacks, schedules errorBuffer() for the next tick, then delegates
// to the shared destroy implementation. Returns `this`.
Writable.prototype.destroy = function(err, cb) {
  const state = this._writableState;

  // Invoke pending callbacks.
  if (!state.destroyed &&
    (state.bufferedIndex < state.buffered.length ||
      state[kOnFinished].length)) {
    process.nextTick(errorBuffer, state);
  }

  destroy.call(this, err, cb);
  return this;
};

Writable.prototype._undestroy = destroyImpl.undestroy;
// Default _destroy(): nothing to release, just forward the error.
Writable.prototype._destroy = function(err, cb) {
  cb(err);
};

// Handler installed under EE.captureRejectionSymbol: destroys the stream
// with the error it is given.
Writable.prototype[EE.captureRejectionSymbol] = function(err) {
  this.destroy(err);
};

let webStreamsAdapters;

// Lazy to avoid circular references
function lazyWebStreams() {
  if (webStreamsAdapters === undefined)
    webStreamsAdapters = require('internal/webstreams/adapters');
  return webStreamsAdapters;
}

// Creates a stream.Writable from a web WritableStream via the adapters
// module.
Writable.fromWeb = function(writableStream, options) {
  return lazyWebStreams().newStreamWritableFromWritableStream(
    writableStream,
    options);
};

// Creates a web WritableStream from a stream.Writable via the adapters
// module.
Writable.toWeb = function(streamWritable) {
  return lazyWebStreams().newWritableStreamFromStreamWritable(streamWritable);
};