1// Copyright Joyent, Inc. and other Node contributors. 2// 3// Permission is hereby granted, free of charge, to any person obtaining a 4// copy of this software and associated documentation files (the 5// "Software"), to deal in the Software without restriction, including 6// without limitation the rights to use, copy, modify, merge, publish, 7// distribute, sublicense, and/or sell copies of the Software, and to permit 8// persons to whom the Software is furnished to do so, subject to the 9// following conditions: 10// 11// The above copyright notice and this permission notice shall be included 12// in all copies or substantial portions of the Software. 13// 14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN 17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, 18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR 19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 20// USE OR OTHER DEALINGS IN THE SOFTWARE. 21 22// A bit simpler than readable streams. 23// Implement an async ._write(chunk, encoding, cb), and it'll handle all 24// the drain event emission and buffering. 25 26'use strict'; 27 28const { 29 FunctionPrototype, 30 ObjectDefineProperty, 31 ObjectDefineProperties, 32 ObjectSetPrototypeOf, 33 Symbol, 34 SymbolHasInstance, 35} = primordials; 36 37module.exports = Writable; 38Writable.WritableState = WritableState; 39 40const EE = require('events'); 41const Stream = require('internal/streams/legacy').Stream; 42const { Buffer } = require('buffer'); 43const destroyImpl = require('internal/streams/destroy'); 44const { 45 getHighWaterMark, 46 getDefaultHighWaterMark 47} = require('internal/streams/state'); 48const { 49 ERR_INVALID_ARG_TYPE, 50 ERR_METHOD_NOT_IMPLEMENTED, 51 ERR_MULTIPLE_CALLBACK, 52 ERR_STREAM_CANNOT_PIPE, 53 ERR_STREAM_DESTROYED, 54 ERR_STREAM_ALREADY_FINISHED, 55 ERR_STREAM_NULL_VALUES, 56 ERR_STREAM_WRITE_AFTER_END, 57 ERR_UNKNOWN_ENCODING 58} = require('internal/errors').codes; 59 60const { errorOrDestroy } = destroyImpl; 61 62ObjectSetPrototypeOf(Writable.prototype, Stream.prototype); 63ObjectSetPrototypeOf(Writable, Stream); 64 65function nop() {} 66 67function WritableState(options, stream, isDuplex) { 68 // Duplex streams are both readable and writable, but share 69 // the same options object. 70 // However, some cases require setting options to different 71 // values for the readable and the writable sides of the duplex stream, 72 // e.g. options.readableObjectMode vs. options.writableObjectMode, etc. 73 if (typeof isDuplex !== 'boolean') 74 isDuplex = stream instanceof Stream.Duplex; 75 76 // Object stream flag to indicate whether or not this stream 77 // contains buffers or objects. 78 this.objectMode = !!(options && options.objectMode); 79 80 if (isDuplex) 81 this.objectMode = this.objectMode || 82 !!(options && options.writableObjectMode); 83 84 // The point at which write() starts returning false 85 // Note: 0 is a valid value, means that we always return false if 86 // the entire buffer is not flushed immediately on write(). 87 this.highWaterMark = options ? 88 getHighWaterMark(this, options, 'writableHighWaterMark', isDuplex) : 89 getDefaultHighWaterMark(false); 90 91 // if _final has been called. 92 this.finalCalled = false; 93 94 // drain event flag. 95 this.needDrain = false; 96 // At the start of calling end() 97 this.ending = false; 98 // When end() has been called, and returned. 99 this.ended = false; 100 // When 'finish' is emitted. 101 this.finished = false; 102 103 // Has it been destroyed 104 this.destroyed = false; 105 106 // Should we decode strings into buffers before passing to _write? 107 // this is here so that some node-core streams can optimize string 108 // handling at a lower level. 109 const noDecode = !!(options && options.decodeStrings === false); 110 this.decodeStrings = !noDecode; 111 112 // Crypto is kind of old and crusty. Historically, its default string 113 // encoding is 'binary' so we have to make this configurable. 114 // Everything else in the universe uses 'utf8', though. 115 this.defaultEncoding = (options && options.defaultEncoding) || 'utf8'; 116 117 // Not an actual buffer we keep track of, but a measurement 118 // of how much we're waiting to get pushed to some underlying 119 // socket or file. 120 this.length = 0; 121 122 // A flag to see when we're in the middle of a write. 123 this.writing = false; 124 125 // When true all writes will be buffered until .uncork() call. 126 this.corked = 0; 127 128 // A flag to be able to tell if the onwrite cb is called immediately, 129 // or on a later tick. We set this to true at first, because any 130 // actions that shouldn't happen until "later" should generally also 131 // not happen before the first write call. 132 this.sync = true; 133 134 // A flag to know if we're processing previously buffered items, which 135 // may call the _write() callback in the same tick, so that we don't 136 // end up in an overlapped onwrite situation. 137 this.bufferProcessing = false; 138 139 // The callback that's passed to _write(chunk, cb). 140 this.onwrite = onwrite.bind(undefined, stream); 141 142 // The callback that the user supplies to write(chunk, encoding, cb). 143 this.writecb = null; 144 145 // The amount that is being written when _write is called. 146 this.writelen = 0; 147 148 // Storage for data passed to the afterWrite() callback in case of 149 // synchronous _write() completion. 150 this.afterWriteTickInfo = null; 151 152 resetBuffer(this); 153 154 // Number of pending user-supplied write callbacks 155 // this must be 0 before 'finish' can be emitted. 156 this.pendingcb = 0; 157 158 // Emit prefinish if the only thing we're waiting for is _write cbs 159 // This is relevant for synchronous Transform streams. 160 this.prefinished = false; 161 162 // True if the error was already emitted and should not be thrown again. 163 this.errorEmitted = false; 164 165 // Should close be emitted on destroy. Defaults to true. 166 this.emitClose = !options || options.emitClose !== false; 167 168 // Should .destroy() be called after 'finish' (and potentially 'end'). 169 this.autoDestroy = !options || options.autoDestroy !== false; 170 171 // Indicates whether the stream has errored. When true all write() calls 172 // should return false. This is needed since when autoDestroy 173 // is disabled we need a way to tell whether the stream has failed. 174 this.errored = null; 175 176 // Indicates whether the stream has finished destroying. 177 this.closed = false; 178} 179 180function resetBuffer(state) { 181 state.buffered = []; 182 state.bufferedIndex = 0; 183 state.allBuffers = true; 184 state.allNoop = true; 185} 186 187WritableState.prototype.getBuffer = function getBuffer() { 188 return this.buffered.slice(this.bufferedIndex); 189}; 190 191ObjectDefineProperty(WritableState.prototype, 'bufferedRequestCount', { 192 get() { 193 return this.buffered.length - this.bufferedIndex; 194 } 195}); 196 197// Test _writableState for inheritance to account for Duplex streams, 198// whose prototype chain only points to Readable. 199let realHasInstance; 200if (typeof Symbol === 'function' && SymbolHasInstance) { 201 realHasInstance = FunctionPrototype[SymbolHasInstance]; 202 ObjectDefineProperty(Writable, SymbolHasInstance, { 203 value: function(object) { 204 if (realHasInstance.call(this, object)) 205 return true; 206 if (this !== Writable) 207 return false; 208 209 return object && object._writableState instanceof WritableState; 210 } 211 }); 212} else { 213 realHasInstance = function(object) { 214 return object instanceof this; 215 }; 216} 217 218function Writable(options) { 219 // Writable ctor is applied to Duplexes, too. 220 // `realHasInstance` is necessary because using plain `instanceof` 221 // would return false, as no `_writableState` property is attached. 222 223 // Trying to use the custom `instanceof` for Writable here will also break the 224 // Node.js LazyTransform implementation, which has a non-trivial getter for 225 // `_writableState` that would lead to infinite recursion. 226 227 // Checking for a Stream.Duplex instance is faster here instead of inside 228 // the WritableState constructor, at least with V8 6.5. 229 const isDuplex = (this instanceof Stream.Duplex); 230 231 if (!isDuplex && !realHasInstance.call(Writable, this)) 232 return new Writable(options); 233 234 this._writableState = new WritableState(options, this, isDuplex); 235 236 if (options) { 237 if (typeof options.write === 'function') 238 this._write = options.write; 239 240 if (typeof options.writev === 'function') 241 this._writev = options.writev; 242 243 if (typeof options.destroy === 'function') 244 this._destroy = options.destroy; 245 246 if (typeof options.final === 'function') 247 this._final = options.final; 248 } 249 250 Stream.call(this, options); 251} 252 253// Otherwise people can pipe Writable streams, which is just wrong. 254Writable.prototype.pipe = function() { 255 errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE()); 256}; 257 258Writable.prototype.write = function(chunk, encoding, cb) { 259 const state = this._writableState; 260 261 if (typeof encoding === 'function') { 262 cb = encoding; 263 encoding = state.defaultEncoding; 264 } else { 265 if (!encoding) 266 encoding = state.defaultEncoding; 267 if (typeof cb !== 'function') 268 cb = nop; 269 } 270 271 if (chunk === null) { 272 throw new ERR_STREAM_NULL_VALUES(); 273 } else if (!state.objectMode) { 274 if (typeof chunk === 'string') { 275 if (state.decodeStrings !== false) { 276 chunk = Buffer.from(chunk, encoding); 277 encoding = 'buffer'; 278 } 279 } else if (chunk instanceof Buffer) { 280 encoding = 'buffer'; 281 } else if (Stream._isUint8Array(chunk)) { 282 chunk = Stream._uint8ArrayToBuffer(chunk); 283 encoding = 'buffer'; 284 } else { 285 throw new ERR_INVALID_ARG_TYPE( 286 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk); 287 } 288 } 289 290 let err; 291 if (state.ending) { 292 err = new ERR_STREAM_WRITE_AFTER_END(); 293 } else if (state.destroyed) { 294 err = new ERR_STREAM_DESTROYED('write'); 295 } 296 297 if (err) { 298 process.nextTick(cb, err); 299 errorOrDestroy(this, err, true); 300 return false; 301 } 302 state.pendingcb++; 303 return writeOrBuffer(this, state, chunk, encoding, cb); 304}; 305 306Writable.prototype.cork = function() { 307 this._writableState.corked++; 308}; 309 310Writable.prototype.uncork = function() { 311 const state = this._writableState; 312 313 if (state.corked) { 314 state.corked--; 315 316 if (!state.writing) 317 clearBuffer(this, state); 318 } 319}; 320 321Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) { 322 // node::ParseEncoding() requires lower case. 323 if (typeof encoding === 'string') 324 encoding = encoding.toLowerCase(); 325 if (!Buffer.isEncoding(encoding)) 326 throw new ERR_UNKNOWN_ENCODING(encoding); 327 this._writableState.defaultEncoding = encoding; 328 return this; 329}; 330 331// If we're already writing something, then just put this 332// in the queue, and wait our turn. Otherwise, call _write 333// If we return false, then we need a drain event, so set that flag. 334function writeOrBuffer(stream, state, chunk, encoding, callback) { 335 const len = state.objectMode ? 1 : chunk.length; 336 337 state.length += len; 338 339 // stream._write resets state.length 340 const ret = state.length < state.highWaterMark; 341 // We must ensure that previous needDrain will not be reset to false. 342 if (!ret) 343 state.needDrain = true; 344 345 if (state.writing || state.corked || state.errored) { 346 state.buffered.push({ chunk, encoding, callback }); 347 if (state.allBuffers && encoding !== 'buffer') { 348 state.allBuffers = false; 349 } 350 if (state.allNoop && callback !== nop) { 351 state.allNoop = false; 352 } 353 } else { 354 state.writelen = len; 355 state.writecb = callback; 356 state.writing = true; 357 state.sync = true; 358 stream._write(chunk, encoding, state.onwrite); 359 state.sync = false; 360 } 361 362 // Return false if errored or destroyed in order to break 363 // any synchronous while(stream.write(data)) loops. 364 return ret && !state.errored && !state.destroyed; 365} 366 367function doWrite(stream, state, writev, len, chunk, encoding, cb) { 368 state.writelen = len; 369 state.writecb = cb; 370 state.writing = true; 371 state.sync = true; 372 if (state.destroyed) 373 state.onwrite(new ERR_STREAM_DESTROYED('write')); 374 else if (writev) 375 stream._writev(chunk, state.onwrite); 376 else 377 stream._write(chunk, encoding, state.onwrite); 378 state.sync = false; 379} 380 381function onwriteError(stream, state, er, cb) { 382 --state.pendingcb; 383 384 cb(er); 385 // Ensure callbacks are invoked even when autoDestroy is 386 // not enabled. Passing `er` here doesn't make sense since 387 // it's related to one specific write, not to the buffered 388 // writes. 389 errorBuffer(state, new ERR_STREAM_DESTROYED('write')); 390 // This can emit error, but error must always follow cb. 391 errorOrDestroy(stream, er); 392} 393 394function onwrite(stream, er) { 395 const state = stream._writableState; 396 const sync = state.sync; 397 const cb = state.writecb; 398 399 if (typeof cb !== 'function') { 400 errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK()); 401 return; 402 } 403 404 state.writing = false; 405 state.writecb = null; 406 state.length -= state.writelen; 407 state.writelen = 0; 408 409 if (er) { 410 // Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364 411 er.stack; // eslint-disable-line no-unused-expressions 412 413 if (!state.errored) { 414 state.errored = er; 415 } 416 417 // In case of duplex streams we need to notify the readable side of the 418 // error. 419 if (stream._readableState && !stream._readableState.errored) { 420 stream._readableState.errored = er; 421 } 422 423 if (sync) { 424 process.nextTick(onwriteError, stream, state, er, cb); 425 } else { 426 onwriteError(stream, state, er, cb); 427 } 428 } else { 429 if (state.buffered.length > state.bufferedIndex) { 430 clearBuffer(stream, state); 431 } 432 433 if (sync) { 434 // It is a common case that the callback passed to .write() is always 435 // the same. In that case, we do not schedule a new nextTick(), but 436 // rather just increase a counter, to improve performance and avoid 437 // memory allocations. 438 if (state.afterWriteTickInfo !== null && 439 state.afterWriteTickInfo.cb === cb) { 440 state.afterWriteTickInfo.count++; 441 } else { 442 state.afterWriteTickInfo = { count: 1, cb, stream, state }; 443 process.nextTick(afterWriteTick, state.afterWriteTickInfo); 444 } 445 } else { 446 afterWrite(stream, state, 1, cb); 447 } 448 } 449} 450 451function afterWriteTick({ stream, state, count, cb }) { 452 state.afterWriteTickInfo = null; 453 return afterWrite(stream, state, count, cb); 454} 455 456function afterWrite(stream, state, count, cb) { 457 const needDrain = !state.ending && !stream.destroyed && state.length === 0 && 458 state.needDrain; 459 if (needDrain) { 460 state.needDrain = false; 461 stream.emit('drain'); 462 } 463 464 while (count-- > 0) { 465 state.pendingcb--; 466 cb(); 467 } 468 469 if (state.destroyed) { 470 errorBuffer(state, new ERR_STREAM_DESTROYED('write')); 471 } 472 473 finishMaybe(stream, state); 474} 475 476// If there's something in the buffer waiting, then invoke callbacks. 477function errorBuffer(state, err) { 478 if (state.writing) { 479 return; 480 } 481 482 for (let n = state.bufferedIndex; n < state.buffered.length; ++n) { 483 const { chunk, callback } = state.buffered[n]; 484 const len = state.objectMode ? 1 : chunk.length; 485 state.length -= len; 486 callback(err); 487 } 488 489 resetBuffer(state); 490} 491 492// If there's something in the buffer waiting, then process it. 493function clearBuffer(stream, state) { 494 if (state.corked || state.bufferProcessing || state.destroyed) { 495 return; 496 } 497 498 const { buffered, bufferedIndex, objectMode } = state; 499 const bufferedLength = buffered.length - bufferedIndex; 500 501 if (!bufferedLength) { 502 return; 503 } 504 505 let i = bufferedIndex; 506 507 state.bufferProcessing = true; 508 if (bufferedLength > 1 && stream._writev) { 509 state.pendingcb -= bufferedLength - 1; 510 511 const callback = state.allNoop ? nop : (err) => { 512 for (let n = i; n < buffered.length; ++n) { 513 buffered[n].callback(err); 514 } 515 }; 516 // Make a copy of `buffered` if it's going to be used by `callback` above, 517 // since `doWrite` will mutate the array. 518 const chunks = state.allNoop && i === 0 ? buffered : buffered.slice(i); 519 chunks.allBuffers = state.allBuffers; 520 521 doWrite(stream, state, true, state.length, chunks, '', callback); 522 523 resetBuffer(state); 524 } else { 525 do { 526 const { chunk, encoding, callback } = buffered[i]; 527 buffered[i++] = null; 528 const len = objectMode ? 1 : chunk.length; 529 doWrite(stream, state, false, len, chunk, encoding, callback); 530 } while (i < buffered.length && !state.writing); 531 532 if (i === buffered.length) { 533 resetBuffer(state); 534 } else if (i > 256) { 535 buffered.splice(0, i); 536 state.bufferedIndex = 0; 537 } else { 538 state.bufferedIndex = i; 539 } 540 } 541 state.bufferProcessing = false; 542} 543 544Writable.prototype._write = function(chunk, encoding, cb) { 545 if (this._writev) { 546 this._writev([{ chunk, encoding }], cb); 547 } else { 548 throw new ERR_METHOD_NOT_IMPLEMENTED('_write()'); 549 } 550}; 551 552Writable.prototype._writev = null; 553 554Writable.prototype.end = function(chunk, encoding, cb) { 555 const state = this._writableState; 556 557 if (typeof chunk === 'function') { 558 cb = chunk; 559 chunk = null; 560 encoding = null; 561 } else if (typeof encoding === 'function') { 562 cb = encoding; 563 encoding = null; 564 } 565 566 if (chunk !== null && chunk !== undefined) 567 this.write(chunk, encoding); 568 569 // .end() fully uncorks. 570 if (state.corked) { 571 state.corked = 1; 572 this.uncork(); 573 } 574 575 // This is forgiving in terms of unnecessary calls to end() and can hide 576 // logic errors. However, usually such errors are harmless and causing a 577 // hard error can be disproportionately destructive. It is not always 578 // trivial for the user to determine whether end() needs to be called or not. 579 let err; 580 if (!state.errored && !state.ending) { 581 state.ending = true; 582 finishMaybe(this, state, true); 583 state.ended = true; 584 } else if (state.finished) { 585 err = new ERR_STREAM_ALREADY_FINISHED('end'); 586 } else if (state.destroyed) { 587 err = new ERR_STREAM_DESTROYED('end'); 588 } 589 590 if (typeof cb === 'function') { 591 if (err || state.finished) 592 process.nextTick(cb, err); 593 else 594 onFinished(this, cb); 595 } 596 597 return this; 598}; 599 600function needFinish(state) { 601 return (state.ending && 602 state.length === 0 && 603 !state.errored && 604 state.buffered.length === 0 && 605 !state.finished && 606 !state.writing); 607} 608 609function callFinal(stream, state) { 610 stream._final((err) => { 611 state.pendingcb--; 612 if (err) { 613 errorOrDestroy(stream, err); 614 } else { 615 state.prefinished = true; 616 stream.emit('prefinish'); 617 finishMaybe(stream, state); 618 } 619 }); 620} 621 622function prefinish(stream, state) { 623 if (!state.prefinished && !state.finalCalled) { 624 if (typeof stream._final === 'function' && !state.destroyed) { 625 state.pendingcb++; 626 state.finalCalled = true; 627 process.nextTick(callFinal, stream, state); 628 } else { 629 state.prefinished = true; 630 stream.emit('prefinish'); 631 } 632 } 633} 634 635function finishMaybe(stream, state, sync) { 636 const need = needFinish(state); 637 if (need) { 638 prefinish(stream, state); 639 if (state.pendingcb === 0) { 640 state.pendingcb++; 641 if (sync) { 642 process.nextTick(finish, stream, state); 643 } else { 644 finish(stream, state); 645 } 646 } 647 } 648 return need; 649} 650 651function finish(stream, state) { 652 state.pendingcb--; 653 if (state.errorEmitted) 654 return; 655 656 state.finished = true; 657 stream.emit('finish'); 658 659 if (state.autoDestroy) { 660 // In case of duplex streams we need a way to detect 661 // if the readable side is ready for autoDestroy as well. 662 const rState = stream._readableState; 663 const autoDestroy = !rState || ( 664 rState.autoDestroy && 665 // We don't expect the readable to ever 'end' 666 // if readable is explicitly set to false. 667 (rState.endEmitted || rState.readable === false) 668 ); 669 if (autoDestroy) { 670 stream.destroy(); 671 } 672 } 673} 674 675// TODO(ronag): Avoid using events to implement internal logic. 676function onFinished(stream, cb) { 677 function onerror(err) { 678 stream.removeListener('finish', onfinish); 679 stream.removeListener('error', onerror); 680 cb(err); 681 if (stream.listenerCount('error') === 0) { 682 stream.emit('error', err); 683 } 684 } 685 686 function onfinish() { 687 stream.removeListener('finish', onfinish); 688 stream.removeListener('error', onerror); 689 cb(); 690 } 691 stream.on('finish', onfinish); 692 stream.prependListener('error', onerror); 693} 694 695ObjectDefineProperties(Writable.prototype, { 696 697 destroyed: { 698 get() { 699 return this._writableState ? this._writableState.destroyed : false; 700 }, 701 set(value) { 702 // Backward compatibility, the user is explicitly managing destroyed. 703 if (this._writableState) { 704 this._writableState.destroyed = value; 705 } 706 } 707 }, 708 709 writable: { 710 get() { 711 const w = this._writableState; 712 // w.writable === false means that this is part of a Duplex stream 713 // where the writable side was disabled upon construction. 714 // Compat. The user might manually disable writable side through 715 // deprecated setter. 716 return !!w && w.writable !== false && !w.destroyed && !w.errored && 717 !w.ending && !w.ended; 718 }, 719 set(val) { 720 // Backwards compatible. 721 if (this._writableState) { 722 this._writableState.writable = !!val; 723 } 724 } 725 }, 726 727 writableFinished: { 728 get() { 729 return this._writableState ? this._writableState.finished : false; 730 } 731 }, 732 733 writableObjectMode: { 734 get() { 735 return this._writableState ? this._writableState.objectMode : false; 736 } 737 }, 738 739 writableBuffer: { 740 get() { 741 return this._writableState && this._writableState.getBuffer(); 742 } 743 }, 744 745 writableEnded: { 746 get() { 747 return this._writableState ? this._writableState.ending : false; 748 } 749 }, 750 751 writableNeedDrain: { 752 get() { 753 const wState = this._writableState; 754 if (!wState) return false; 755 return !wState.destroyed && !wState.ending && wState.needDrain; 756 } 757 }, 758 759 writableHighWaterMark: { 760 get() { 761 return this._writableState && this._writableState.highWaterMark; 762 } 763 }, 764 765 writableCorked: { 766 get() { 767 return this._writableState ? this._writableState.corked : 0; 768 } 769 }, 770 771 writableLength: { 772 get() { 773 return this._writableState && this._writableState.length; 774 } 775 } 776}); 777 778const destroy = destroyImpl.destroy; 779Writable.prototype.destroy = function(err, cb) { 780 const state = this._writableState; 781 782 if (!state.destroyed) { 783 process.nextTick(errorBuffer, state, new ERR_STREAM_DESTROYED('write')); 784 } 785 destroy.call(this, err, cb); 786 return this; 787}; 788 789Writable.prototype._undestroy = destroyImpl.undestroy; 790Writable.prototype._destroy = function(err, cb) { 791 cb(err); 792}; 793 794Writable.prototype[EE.captureRejectionSymbol] = function(err) { 795 this.destroy(err); 796}; 797