// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
'use strict';

const {
  NumberIsInteger,
  NumberIsNaN,
  NumberParseInt,
  ObjectDefineProperties,
  ObjectSetPrototypeOf,
  Promise,
  Set,
  SymbolAsyncIterator,
  Symbol
} = primordials;

module.exports = Readable;
Readable.ReadableState = ReadableState;

const EE = require('events');
const { Stream, prependListener } = require('internal/streams/legacy');
const { Buffer } = require('buffer');

let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
  debug = fn;
});
const BufferList = require('internal/streams/buffer_list');
const destroyImpl = require('internal/streams/destroy');
const {
  getHighWaterMark,
  getDefaultHighWaterMark
} = require('internal/streams/state');
const {
  ERR_INVALID_ARG_TYPE,
  ERR_STREAM_PUSH_AFTER_EOF,
  ERR_METHOD_NOT_IMPLEMENTED,
  ERR_STREAM_UNSHIFT_AFTER_END_EVENT
} = require('internal/errors').codes;

// Key of the state flag that records an explicit pause()/resume() request.
const kPaused = Symbol('kPaused');

// Lazy loaded to improve the startup performance.
let StringDecoder;
let from;

ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);
function nop() {}

const { errorOrDestroy } = destroyImpl;

// Per-stream bookkeeping for the readable side of a stream.
//  - options: user supplied options; may be undefined, in which case every
//    field falls back to its default.
//  - stream: the owning stream. Only consulted (instanceof Stream.Duplex)
//    when `isDuplex` was not passed as a boolean.
//  - isDuplex: optional precomputed "owner is a Duplex" flag.
function ReadableState(options, stream, isDuplex) {
  // Duplex streams are both readable and writable, but share
  // the same options object.
  // However, some cases require setting options to different
  // values for the readable and the writable sides of the duplex stream.
  // These options can be provided separately as readableXXX and writableXXX.
  if (typeof isDuplex !== 'boolean')
    isDuplex = stream instanceof Stream.Duplex;

  // Object stream flag. Used to make read(n) ignore n and to
  // make all the buffer merging and length checks go away.
  this.objectMode = !!(options && options.objectMode);

  if (isDuplex)
    this.objectMode = this.objectMode ||
      !!(options && options.readableObjectMode);

  // The point at which it stops calling _read() to fill the buffer
  // Note: 0 is a valid value, means "don't call _read preemptively ever"
  this.highWaterMark = options ?
    getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex) :
    getDefaultHighWaterMark(false);

  // A linked list is used to store data chunks instead of an array because the
  // linked list can remove elements from the beginning faster than
  // array.shift().
  this.buffer = new BufferList();
  this.length = 0;
  this.pipes = [];
  this.flowing = null;
  this.ended = false;
  this.endEmitted = false;
  this.reading = false;

  // A flag to be able to tell if the event 'readable'/'data' is emitted
  // immediately, or on a later tick. We set this to true at first, because
  // any actions that shouldn't happen until "later" should generally also
  // not happen before the first read call.
  this.sync = true;

  // Whenever we return null, then we set a flag to say
  // that we're awaiting a 'readable' event emission.
  this.needReadable = false;
  this.emittedReadable = false;
  this.readableListening = false;
  this.resumeScheduled = false;
  this[kPaused] = null;

  // True if the error was already emitted and should not be thrown again.
  this.errorEmitted = false;

  // Should close be emitted on destroy. Defaults to true.
  this.emitClose = !options || options.emitClose !== false;

  // Should .destroy() be called after 'end' (and potentially 'finish').
  this.autoDestroy = !options || options.autoDestroy !== false;

  // Has it been destroyed.
  this.destroyed = false;

  // Indicates whether the stream has errored. When true no further
  // _read calls, 'data' or 'readable' events should occur. This is needed
  // since when autoDestroy is disabled we need a way to tell whether the
  // stream has failed.
  this.errored = null;

  // Indicates whether the stream has finished destroying.
  this.closed = false;

  // True if close has been emitted or would have been emitted
  // depending on emitClose.
  this.closeEmitted = false;

  // Crypto is kind of old and crusty. Historically, its default string
  // encoding is 'binary' so we have to make this configurable.
  // Everything else in the universe uses 'utf8', though.
  this.defaultEncoding = (options && options.defaultEncoding) || 'utf8';

  // Ref the piped dest which we need a drain event on it
  // type: null | Writable | Set<Writable>.
  this.awaitDrainWriters = null;
  this.multiAwaitDrain = false;

  // If true, a maybeReadMore has been scheduled.
  this.readingMore = false;

  this.dataEmitted = false;

  // A decoder is only created when an encoding was requested; the
  // string_decoder module is loaded on first use.
  this.decoder = null;
  this.encoding = null;
  if (options && options.encoding) {
    if (!StringDecoder)
      StringDecoder = require('string_decoder').StringDecoder;
    this.decoder = new StringDecoder(options.encoding);
    this.encoding = options.encoding;
  }
}


// Readable stream constructor; may be called with or without `new`.
// When `options.read` / `options.destroy` are functions they are installed
// on the instance as its _read() / _destroy() implementations.
function Readable(options) {
  if (!(this instanceof Readable))
    return new Readable(options);

  // Checking for a Stream.Duplex instance is faster here instead of inside
  // the ReadableState constructor, at least with V8 6.5.
  const isDuplex = this instanceof Stream.Duplex;

  this._readableState = new ReadableState(options, this, isDuplex);

  if (options) {
    if (typeof options.read === 'function')
      this._read = options.read;

    if (typeof options.destroy === 'function')
      this._destroy = options.destroy;
  }

  Stream.call(this, options);
}

Readable.prototype.destroy = destroyImpl.destroy;
Readable.prototype._undestroy = destroyImpl.undestroy;
// Default _destroy(): there is nothing to tear down, so simply hand `err`
// to the callback.
Readable.prototype._destroy = function(err, cb) {
  cb(err);
};

// Handler stored under EE.captureRejectionSymbol: destroys the stream with
// the error it receives.
Readable.prototype[EE.captureRejectionSymbol] = function(err) {
  this.destroy(err);
};

// Manually shove something into the read() buffer.
// This returns true if the highWaterMark has not been hit yet,
// similar to how Writable.write() returns true if you should
// write() some more.
Readable.prototype.push = function(chunk, encoding) {
  return readableAddChunk(this, chunk, encoding, false);
};

// Unshift should *always* be something directly out of read().
Readable.prototype.unshift = function(chunk, encoding) {
  return readableAddChunk(this, chunk, encoding, true);
};

// Shared implementation of push() (addToFront === false) and unshift()
// (addToFront === true).
// In byte mode the chunk is first normalized: strings are converted according
// to `encoding` / the state's encoding, Uint8Arrays become Buffers, and any
// other non-null value is rejected with ERR_INVALID_ARG_TYPE.
// Afterwards the chunk is treated as an error, as EOF (`chunk === null`), as
// data to store/emit, or as an empty chunk.
// Returns false right away for a push() on a destroyed stream; otherwise
// returns true while the stream has not ended and the buffer is either below
// the highWaterMark or empty.
function readableAddChunk(stream, chunk, encoding, addToFront) {
  debug('readableAddChunk', chunk);
  const state = stream._readableState;

  let err;
  if (!state.objectMode) {
    if (typeof chunk === 'string') {
      encoding = encoding || state.defaultEncoding;
      if (state.encoding !== encoding) {
        if (addToFront && state.encoding) {
          // When unshifting, if state.encoding is set, we have to save
          // the string in the BufferList with the state encoding.
          chunk = Buffer.from(chunk, encoding).toString(state.encoding);
        } else {
          chunk = Buffer.from(chunk, encoding);
          encoding = '';
        }
      }
    } else if (chunk instanceof Buffer) {
      encoding = '';
    } else if (Stream._isUint8Array(chunk)) {
      chunk = Stream._uint8ArrayToBuffer(chunk);
      encoding = '';
    } else if (chunk != null) {
      err = new ERR_INVALID_ARG_TYPE(
        'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
    }
  }

  if (err) {
    errorOrDestroy(stream, err);
  } else if (chunk === null) {
    // null is the EOF marker.
    state.reading = false;
    onEofChunk(stream, state);
  } else if (state.objectMode || (chunk && chunk.length > 0)) {
    if (addToFront) {
      if (state.endEmitted)
        errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
      else
        addChunk(stream, state, chunk, true);
    } else if (state.ended) {
      errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
    } else if (state.destroyed) {
      return false;
    } else {
      state.reading = false;
      if (state.decoder && !encoding) {
        // The decoder may hold back bytes of an incomplete character, so the
        // decoded result can be empty even for a non-empty input chunk.
        chunk = state.decoder.write(chunk);
        if (state.objectMode || chunk.length !== 0)
          addChunk(stream, state, chunk, false);
        else
          maybeReadMore(stream, state);
      } else {
        addChunk(stream, state, chunk, false);
      }
    }
  } else if (!addToFront) {
    // Empty chunk pushed in byte mode: nothing to store, but the pending
    // read is over and more data may be requested.
    state.reading = false;
    maybeReadMore(stream, state);
  }

  // We can push more data if we are below the highWaterMark.
  // Also, if we have no data yet, we can stand some more bytes.
  // This is to work around cases where hwm=0, such as the repl.
  return !state.ended &&
    (state.length < state.highWaterMark || state.length === 0);
}

// Deliver one chunk to the consumer.
// When the stream is flowing, nothing is buffered, we are not inside a
// synchronous _read() call and there is at least one 'data' listener, the
// chunk is emitted directly. Otherwise it is stored in the buffer (front or
// back depending on `addToFront`) and a 'readable' event is scheduled if one
// is awaited. In both cases maybeReadMore() is scheduled afterwards.
function addChunk(stream, state, chunk, addToFront) {
  if (state.flowing && state.length === 0 && !state.sync &&
      stream.listenerCount('data') > 0) {
    // Use the guard to avoid creating `Set()` repeatedly
    // when we have multiple pipes.
    if (state.multiAwaitDrain) {
      state.awaitDrainWriters.clear();
    } else {
      state.awaitDrainWriters = null;
    }
    state.dataEmitted = true;
    stream.emit('data', chunk);
  } else {
    // Update the buffer info.
    state.length += state.objectMode ? 1 : chunk.length;
    if (addToFront)
      state.buffer.unshift(chunk);
    else
      state.buffer.push(chunk);

    if (state.needReadable)
      emitReadable(stream);
  }
  maybeReadMore(stream, state);
}

// True when the stream was explicitly paused (kPaused flag) or when
// `flowing` is exactly false.
Readable.prototype.isPaused = function() {
  const state = this._readableState;
  return state[kPaused] === true || state.flowing === false;
};

// Backwards compatibility.
// Installs a StringDecoder for `enc`, runs everything that is already
// buffered through it and stores the result back as a single string chunk.
// Returns `this`.
Readable.prototype.setEncoding = function(enc) {
  if (!StringDecoder)
    StringDecoder = require('string_decoder').StringDecoder;
  const decoder = new StringDecoder(enc);
  this._readableState.decoder = decoder;
  // If setEncoding(null), decoder.encoding equals utf8.
  this._readableState.encoding = this._readableState.decoder.encoding;

  const buffer = this._readableState.buffer;
  // Iterate over current buffer to convert already stored Buffers:
  let content = '';
  for (const data of buffer) {
    content += decoder.write(data);
  }
  buffer.clear();
  if (content !== '')
    buffer.push(content);
  // The buffered length is now the length of the decoded string.
  this._readableState.length = content.length;
  return this;
};

// Don't raise the hwm > 1GB.
const MAX_HWM = 0x40000000;
// Returns the highWaterMark to use for a read(n) larger than the current
// one: MAX_HWM when n reaches or exceeds it, otherwise n rounded up to the
// next power of two.
function computeNewHighWaterMark(n) {
  if (n >= MAX_HWM) {
    // TODO(ronag): Throw ERR_VALUE_OUT_OF_RANGE.
    n = MAX_HWM;
  } else {
    // Get the next highest power of 2 to prevent increasing hwm excessively in
    // tiny amounts.
    n--;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    n++;
  }
  return n;
}

// This function is designed to be inlinable, so please take care when making
// changes to the function body.
// Decide how many bytes (or objects) read(n) may hand out right now.
//  - 0 when n <= 0, or when the buffer is empty and the stream has ended.
//  - 1 in object mode.
//  - For n = NaN (read() without an argument): the length of the first
//    buffered chunk while flowing with data buffered, otherwise the whole
//    buffered length.
//  - n when at least that much is buffered; otherwise whatever is left once
//    the stream has ended, or 0 to wait for more data.
function howMuchToRead(n, state) {
  if (n <= 0 || (state.length === 0 && state.ended))
    return 0;
  if (state.objectMode)
    return 1;
  if (NumberIsNaN(n)) {
    // Only flow one buffer at a time.
    if (state.flowing && state.length)
      return state.buffer.first().length;
    return state.length;
  }
  if (n <= state.length)
    return n;
  return state.ended ? state.length : 0;
}

// You can override either this method, or the async _read(n) below.
//
// Takes up to `n` bytes (one object in object mode) out of the internal
// buffer and returns them, or returns null when nothing can be returned yet.
// May call _read() to refill the buffer, raises the highWaterMark when `n`
// exceeds it, and emits 'data' for every non-null chunk it returns.
// read(0) never returns data; it is used to trigger a refill or a
// 'readable' / 'end' notification.
Readable.prototype.read = function(n) {
  debug('read', n);
  // Same as NumberParseInt(undefined, 10), however V8 7.3 performance regressed
  // in this scenario, so we are doing it manually.
  if (n === undefined) {
    n = NaN;
  } else if (!NumberIsInteger(n)) {
    n = NumberParseInt(n, 10);
  }
  const state = this._readableState;
  const nOrig = n;

  // If we're asking for more than the current hwm, then raise the hwm.
  if (n > state.highWaterMark)
    state.highWaterMark = computeNewHighWaterMark(n);

  if (n !== 0)
    state.emittedReadable = false;

  // If we're doing read(0) to trigger a readable event, but we
  // already have a bunch of data in the buffer, then just trigger
  // the 'readable' event and move on.
  if (n === 0 &&
      state.needReadable &&
      ((state.highWaterMark !== 0 ?
        state.length >= state.highWaterMark :
        state.length > 0) ||
       state.ended)) {
    debug('read: emitReadable', state.length, state.ended);
    if (state.length === 0 && state.ended)
      endReadable(this);
    else
      emitReadable(this);
    return null;
  }

  n = howMuchToRead(n, state);

  // If we've ended, and we're now clear, then finish it up.
  if (n === 0 && state.ended) {
    if (state.length === 0)
      endReadable(this);
    return null;
  }

  // All the actual chunk generation logic needs to be
  // *below* the call to _read. The reason is that in certain
  // synthetic stream cases, such as passthrough streams, _read
  // may be a completely synchronous operation which may change
  // the state of the read buffer, providing enough data when
  // before there was *not* enough.
  //
  // So, the steps are:
  // 1. Figure out what the state of things will be after we do
  // a read from the buffer.
  //
  // 2. If that resulting state will trigger a _read, then call _read.
  // Note that this may be asynchronous, or synchronous. Yes, it is
  // deeply ugly to write APIs this way, but that still doesn't mean
  // that the Readable class should behave improperly, as streams are
  // designed to be sync/async agnostic.
  // Take note if the _read call is sync or async (ie, if the read call
  // has returned yet), so that we know whether or not it's safe to emit
  // 'readable' etc.
  //
  // 3. Actually pull the requested chunks out of the buffer and return.

  // if we need a readable event, then we need to do some reading.
  let doRead = state.needReadable;
  debug('need readable', doRead);

  // If we currently have less than the highWaterMark, then also read some.
  if (state.length === 0 || state.length - n < state.highWaterMark) {
    doRead = true;
    debug('length less than watermark', doRead);
  }

  // However, if we've ended, then there's no point, if we're already
  // reading, then it's unnecessary, and if we're destroyed, then it's
  // not allowed.
  if (state.ended || state.reading || state.destroyed) {
    doRead = false;
    debug('reading or ended', doRead);
  } else if (doRead) {
    debug('do read');
    state.reading = true;
    state.sync = true;
    // If the length is currently zero, then we *need* a readable event.
    if (state.length === 0)
      state.needReadable = true;
    // Call internal read method
    this._read(state.highWaterMark);
    state.sync = false;
    // If _read pushed data synchronously, then `reading` will be false,
    // and we need to re-evaluate how much data we can return to the user.
    if (!state.reading)
      n = howMuchToRead(nOrig, state);
  }

  let ret;
  if (n > 0)
    ret = fromList(n, state);
  else
    ret = null;

  if (ret === null) {
    state.needReadable = state.length <= state.highWaterMark;
    n = 0;
  } else {
    state.length -= n;
    // Data was consumed, so no destination is considered to be awaiting
    // a 'drain' any more.
    if (state.multiAwaitDrain) {
      state.awaitDrainWriters.clear();
    } else {
      state.awaitDrainWriters = null;
    }
  }

  if (state.length === 0) {
    // If we have nothing in the buffer, then we want to know
    // as soon as we *do* get something into the buffer.
    if (!state.ended)
      state.needReadable = true;

    // If we tried to read() past the EOF, then emit end on the next tick.
    if (nOrig !== n && state.ended)
      endReadable(this);
  }

  if (ret !== null) {
    state.dataEmitted = true;
    this.emit('data', ret);
  }

  return ret;
};

// Handle the EOF marker (a pushed null). Flushes whatever the decoder still
// holds into the buffer, marks the stream as ended and makes sure a
// 'readable' event is emitted: deferred to the next tick when called during
// a synchronous _read(), immediately otherwise. Does nothing if the stream
// has already ended.
function onEofChunk(stream, state) {
  debug('onEofChunk');
  if (state.ended) return;
  if (state.decoder) {
    const chunk = state.decoder.end();
    if (chunk && chunk.length) {
      state.buffer.push(chunk);
      state.length += state.objectMode ? 1 : chunk.length;
    }
  }
  state.ended = true;

  if (state.sync) {
    // If we are sync, wait until next tick to emit the data.
    // Otherwise we risk emitting data in the flow()
    // the readable code triggers during a read() call.
    emitReadable(stream);
  } else {
    // Emit 'readable' now to make sure it gets picked up.
    state.needReadable = false;
    state.emittedReadable = true;
    // We have to emit readable now that we are EOF. Modules
    // in the ecosystem (e.g. dicer) rely on this event being sync.
    emitReadable_(stream);
  }
}

// Don't emit readable right away in sync mode, because this can trigger
// another read() call => stack overflow. This way, it might trigger
// a nextTick recursion warning, but that's not so bad.
//
// Schedules emitReadable_() on the next tick unless one is already pending
// (state.emittedReadable), and clears needReadable.
function emitReadable(stream) {
  const state = stream._readableState;
  debug('emitReadable', state.needReadable, state.emittedReadable);
  state.needReadable = false;
  if (!state.emittedReadable) {
    debug('emitReadable', state.flowing);
    state.emittedReadable = true;
    process.nextTick(emitReadable_, stream);
  }
}

// Emits 'readable' if the stream is not destroyed and either has buffered
// data or has ended, then recomputes needReadable and runs flow().
function emitReadable_(stream) {
  const state = stream._readableState;
  debug('emitReadable_', state.destroyed, state.length, state.ended);
  if (!state.destroyed && (state.length || state.ended)) {
    stream.emit('readable');
    state.emittedReadable = false;
  }

  // The stream needs another readable event if:
  // 1. It is not flowing, as the flow mechanism will take
  //    care of it.
  // 2. It is not ended.
  // 3. It is below the highWaterMark, so we can schedule
  //    another readable later.
  state.needReadable =
    !state.flowing &&
    !state.ended &&
    state.length <= state.highWaterMark;
  flow(stream);
}


// At this point, the user has presumably seen the 'readable' event,
// and called read() to consume some data. that may have triggered
// in turn another _read(n) call, in which case reading = true if
// it's in progress.
// However, if we're not ended, or reading, and the length < hwm,
// then go ahead and try to read some more preemptively.
//
// Only one maybeReadMore_() is scheduled at a time (state.readingMore).
function maybeReadMore(stream, state) {
  if (!state.readingMore) {
    state.readingMore = true;
    process.nextTick(maybeReadMore_, stream, state);
  }
}

function maybeReadMore_(stream, state) {
  // Attempt to read more data if we should.
  //
  // The conditions for reading more data are (one of):
  // - Not enough data buffered (state.length < state.highWaterMark). The loop
  //   is responsible for filling the buffer with enough data if such data
  //   is available. If highWaterMark is 0 and we are not in the flowing mode
  //   we should _not_ attempt to buffer any extra data. We'll get more data
  //   when the stream consumer calls read() instead.
  // - No data in the buffer, and the stream is in flowing mode. In this mode
  //   the loop below is responsible for ensuring read() is called. Failing to
  //   call read here would abort the flow and there's no other mechanism for
  //   continuing the flow if the stream consumer has just subscribed to the
  //   'data' event.
  //
  // In addition to the above conditions to keep reading data, the following
  // conditions prevent the data from being read:
  // - The stream has ended (state.ended).
  // - There is already a pending 'read' operation (state.reading). This is a
  //   case where the stream has called the implementation defined _read()
  //   method, but they are processing the call asynchronously and have _not_
  //   called push() with new data. In this case we skip performing more
  //   read()s. The execution ends in this method again after the _read() ends
  //   up calling push() with more data.
  while (!state.reading && !state.ended &&
         (state.length < state.highWaterMark ||
          (state.flowing && state.length === 0))) {
    const len = state.length;
    debug('maybeReadMore read 0');
    stream.read(0);
    if (len === state.length)
      // Didn't get any data, stop spinning.
      break;
  }
  state.readingMore = false;
}

// Abstract method. to be overridden in specific implementation classes.
// call cb(er, data) where data is <= n in length.
// for virtual (non-string, non-buffer) streams, "length" is somewhat
// arbitrary, and perhaps not very meaningful.
627Readable.prototype._read = function(n) { 628 throw new ERR_METHOD_NOT_IMPLEMENTED('_read()'); 629}; 630 631Readable.prototype.pipe = function(dest, pipeOpts) { 632 const src = this; 633 const state = this._readableState; 634 635 if (state.pipes.length === 1) { 636 if (!state.multiAwaitDrain) { 637 state.multiAwaitDrain = true; 638 state.awaitDrainWriters = new Set( 639 state.awaitDrainWriters ? [state.awaitDrainWriters] : [] 640 ); 641 } 642 } 643 644 state.pipes.push(dest); 645 debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts); 646 647 const doEnd = (!pipeOpts || pipeOpts.end !== false) && 648 dest !== process.stdout && 649 dest !== process.stderr; 650 651 const endFn = doEnd ? onend : unpipe; 652 if (state.endEmitted) 653 process.nextTick(endFn); 654 else 655 src.once('end', endFn); 656 657 dest.on('unpipe', onunpipe); 658 function onunpipe(readable, unpipeInfo) { 659 debug('onunpipe'); 660 if (readable === src) { 661 if (unpipeInfo && unpipeInfo.hasUnpiped === false) { 662 unpipeInfo.hasUnpiped = true; 663 cleanup(); 664 } 665 } 666 } 667 668 function onend() { 669 debug('onend'); 670 dest.end(); 671 } 672 673 let ondrain; 674 675 let cleanedUp = false; 676 function cleanup() { 677 debug('cleanup'); 678 // Cleanup event handlers once the pipe is broken. 679 dest.removeListener('close', onclose); 680 dest.removeListener('finish', onfinish); 681 if (ondrain) { 682 dest.removeListener('drain', ondrain); 683 } 684 dest.removeListener('error', onerror); 685 dest.removeListener('unpipe', onunpipe); 686 src.removeListener('end', onend); 687 src.removeListener('end', unpipe); 688 src.removeListener('data', ondata); 689 690 cleanedUp = true; 691 692 // If the reader is waiting for a drain event from this 693 // specific writer, then it would cause it to never start 694 // flowing again. 695 // So, if this is awaiting a drain, then we just call it now. 696 // If we don't know, then assume that we are waiting for one. 
697 if (ondrain && state.awaitDrainWriters && 698 (!dest._writableState || dest._writableState.needDrain)) 699 ondrain(); 700 } 701 702 function pause() { 703 // If the user unpiped during `dest.write()`, it is possible 704 // to get stuck in a permanently paused state if that write 705 // also returned false. 706 // => Check whether `dest` is still a piping destination. 707 if (!cleanedUp) { 708 if (state.pipes.length === 1 && state.pipes[0] === dest) { 709 debug('false write response, pause', 0); 710 state.awaitDrainWriters = dest; 711 state.multiAwaitDrain = false; 712 } else if (state.pipes.length > 1 && state.pipes.includes(dest)) { 713 debug('false write response, pause', state.awaitDrainWriters.size); 714 state.awaitDrainWriters.add(dest); 715 } 716 src.pause(); 717 } 718 if (!ondrain) { 719 // When the dest drains, it reduces the awaitDrain counter 720 // on the source. This would be more elegant with a .once() 721 // handler in flow(), but adding and removing repeatedly is 722 // too slow. 723 ondrain = pipeOnDrain(src, dest); 724 dest.on('drain', ondrain); 725 } 726 } 727 728 src.on('data', ondata); 729 function ondata(chunk) { 730 debug('ondata'); 731 const ret = dest.write(chunk); 732 debug('dest.write', ret); 733 if (ret === false) { 734 pause(); 735 } 736 } 737 738 // If the dest has an error, then stop piping into it. 739 // However, don't suppress the throwing behavior for this. 740 function onerror(er) { 741 debug('onerror', er); 742 unpipe(); 743 dest.removeListener('error', onerror); 744 if (EE.listenerCount(dest, 'error') === 0) { 745 const s = dest._writableState || dest._readableState; 746 if (s && !s.errorEmitted) { 747 // User incorrectly emitted 'error' directly on the stream. 748 errorOrDestroy(dest, er); 749 } else { 750 dest.emit('error', er); 751 } 752 } 753 } 754 755 // Make sure our error handler is attached before userland ones. 
756 prependListener(dest, 'error', onerror); 757 758 // Both close and finish should trigger unpipe, but only once. 759 function onclose() { 760 dest.removeListener('finish', onfinish); 761 unpipe(); 762 } 763 dest.once('close', onclose); 764 function onfinish() { 765 debug('onfinish'); 766 dest.removeListener('close', onclose); 767 unpipe(); 768 } 769 dest.once('finish', onfinish); 770 771 function unpipe() { 772 debug('unpipe'); 773 src.unpipe(dest); 774 } 775 776 // Tell the dest that it's being piped to. 777 dest.emit('pipe', src); 778 779 // Start the flow if it hasn't been started already. 780 781 if (dest.writableNeedDrain === true) { 782 if (state.flowing) { 783 pause(); 784 } 785 } else if (!state.flowing) { 786 debug('pipe resume'); 787 src.resume(); 788 } 789 790 return dest; 791}; 792 793function pipeOnDrain(src, dest) { 794 return function pipeOnDrainFunctionResult() { 795 const state = src._readableState; 796 797 // `ondrain` will call directly, 798 // `this` maybe not a reference to dest, 799 // so we use the real dest here. 800 if (state.awaitDrainWriters === dest) { 801 debug('pipeOnDrain', 1); 802 state.awaitDrainWriters = null; 803 } else if (state.multiAwaitDrain) { 804 debug('pipeOnDrain', state.awaitDrainWriters.size); 805 state.awaitDrainWriters.delete(dest); 806 } 807 808 if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) && 809 EE.listenerCount(src, 'data')) { 810 src.resume(); 811 } 812 }; 813} 814 815 816Readable.prototype.unpipe = function(dest) { 817 const state = this._readableState; 818 const unpipeInfo = { hasUnpiped: false }; 819 820 // If we're not piping anywhere, then do nothing. 821 if (state.pipes.length === 0) 822 return this; 823 824 if (!dest) { 825 // remove all. 826 const dests = state.pipes; 827 state.pipes = []; 828 this.pause(); 829 830 for (let i = 0; i < dests.length; i++) 831 dests[i].emit('unpipe', this, { hasUnpiped: false }); 832 return this; 833 } 834 835 // Try to find the right one. 
836 const index = state.pipes.indexOf(dest); 837 if (index === -1) 838 return this; 839 840 state.pipes.splice(index, 1); 841 if (state.pipes.length === 0) 842 this.pause(); 843 844 dest.emit('unpipe', this, unpipeInfo); 845 846 return this; 847}; 848 849// Set up data events if they are asked for 850// Ensure readable listeners eventually get something. 851Readable.prototype.on = function(ev, fn) { 852 const res = Stream.prototype.on.call(this, ev, fn); 853 const state = this._readableState; 854 855 if (ev === 'data') { 856 // Update readableListening so that resume() may be a no-op 857 // a few lines down. This is needed to support once('readable'). 858 state.readableListening = this.listenerCount('readable') > 0; 859 860 // Try start flowing on next tick if stream isn't explicitly paused. 861 if (state.flowing !== false) 862 this.resume(); 863 } else if (ev === 'readable') { 864 if (!state.endEmitted && !state.readableListening) { 865 state.readableListening = state.needReadable = true; 866 state.flowing = false; 867 state.emittedReadable = false; 868 debug('on readable', state.length, state.reading); 869 if (state.length) { 870 emitReadable(this); 871 } else if (!state.reading) { 872 process.nextTick(nReadingNextTick, this); 873 } 874 } 875 } 876 877 return res; 878}; 879Readable.prototype.addListener = Readable.prototype.on; 880 881Readable.prototype.removeListener = function(ev, fn) { 882 const res = Stream.prototype.removeListener.call(this, ev, fn); 883 884 if (ev === 'readable') { 885 // We need to check if there is someone still listening to 886 // readable and reset the state. However this needs to happen 887 // after readable has been emitted but before I/O (nextTick) to 888 // support once('readable', fn) cycles. This means that calling 889 // resume within the same tick will have no 890 // effect. 
891 process.nextTick(updateReadableListening, this); 892 } 893 894 return res; 895}; 896Readable.prototype.off = Readable.prototype.removeListener; 897 898Readable.prototype.removeAllListeners = function(ev) { 899 const res = Stream.prototype.removeAllListeners.apply(this, arguments); 900 901 if (ev === 'readable' || ev === undefined) { 902 // We need to check if there is someone still listening to 903 // readable and reset the state. However this needs to happen 904 // after readable has been emitted but before I/O (nextTick) to 905 // support once('readable', fn) cycles. This means that calling 906 // resume within the same tick will have no 907 // effect. 908 process.nextTick(updateReadableListening, this); 909 } 910 911 return res; 912}; 913 914function updateReadableListening(self) { 915 const state = self._readableState; 916 state.readableListening = self.listenerCount('readable') > 0; 917 918 if (state.resumeScheduled && state[kPaused] === false) { 919 // Flowing needs to be set to true now, otherwise 920 // the upcoming resume will not flow. 921 state.flowing = true; 922 923 // Crude way to check if we should resume. 924 } else if (self.listenerCount('data') > 0) { 925 self.resume(); 926 } else if (!state.readableListening) { 927 state.flowing = null; 928 } 929} 930 931function nReadingNextTick(self) { 932 debug('readable nexttick read 0'); 933 self.read(0); 934} 935 936// pause() and resume() are remnants of the legacy readable stream API 937// If the user uses them, then switch into old mode. 938Readable.prototype.resume = function() { 939 const state = this._readableState; 940 if (!state.flowing) { 941 debug('resume'); 942 // We flow only if there is no one listening 943 // for readable, but we still have to call 944 // resume(). 
945 state.flowing = !state.readableListening; 946 resume(this, state); 947 } 948 state[kPaused] = false; 949 return this; 950}; 951 952function resume(stream, state) { 953 if (!state.resumeScheduled) { 954 state.resumeScheduled = true; 955 process.nextTick(resume_, stream, state); 956 } 957} 958 959function resume_(stream, state) { 960 debug('resume', state.reading); 961 if (!state.reading) { 962 stream.read(0); 963 } 964 965 state.resumeScheduled = false; 966 stream.emit('resume'); 967 flow(stream); 968 if (state.flowing && !state.reading) 969 stream.read(0); 970} 971 972Readable.prototype.pause = function() { 973 debug('call pause flowing=%j', this._readableState.flowing); 974 if (this._readableState.flowing !== false) { 975 debug('pause'); 976 this._readableState.flowing = false; 977 this.emit('pause'); 978 } 979 this._readableState[kPaused] = true; 980 return this; 981}; 982 983function flow(stream) { 984 const state = stream._readableState; 985 debug('flow', state.flowing); 986 while (state.flowing && stream.read() !== null); 987} 988 989// Wrap an old-style stream as the async data source. 990// This is *not* part of the readable stream interface. 991// It is an ugly unfortunate mess of history. 992Readable.prototype.wrap = function(stream) { 993 const state = this._readableState; 994 let paused = false; 995 996 stream.on('end', () => { 997 debug('wrapped end'); 998 if (state.decoder && !state.ended) { 999 const chunk = state.decoder.end(); 1000 if (chunk && chunk.length) 1001 this.push(chunk); 1002 } 1003 1004 this.push(null); 1005 }); 1006 1007 stream.on('data', (chunk) => { 1008 debug('wrapped data'); 1009 if (state.decoder) 1010 chunk = state.decoder.write(chunk); 1011 1012 // Don't skip over falsy values in objectMode. 
1013 if (state.objectMode && (chunk === null || chunk === undefined)) 1014 return; 1015 else if (!state.objectMode && (!chunk || !chunk.length)) 1016 return; 1017 1018 const ret = this.push(chunk); 1019 if (!ret) { 1020 paused = true; 1021 stream.pause(); 1022 } 1023 }); 1024 1025 // Proxy all the other methods. Important when wrapping filters and duplexes. 1026 for (const i in stream) { 1027 if (this[i] === undefined && typeof stream[i] === 'function') { 1028 this[i] = function methodWrap(method) { 1029 return function methodWrapReturnFunction() { 1030 return stream[method].apply(stream, arguments); 1031 }; 1032 }(i); 1033 } 1034 } 1035 1036 stream.on('error', (err) => { 1037 errorOrDestroy(this, err); 1038 }); 1039 1040 stream.on('close', () => { 1041 // TODO(ronag): Update readable state? 1042 this.emit('close'); 1043 }); 1044 1045 stream.on('destroy', () => { 1046 // TODO(ronag): this.destroy()? 1047 this.emit('destroy'); 1048 }); 1049 1050 stream.on('pause', () => { 1051 // TODO(ronag): this.pause()? 1052 this.emit('pause'); 1053 }); 1054 1055 stream.on('resume', () => { 1056 // TODO(ronag): this.resume()? 1057 this.emit('resume'); 1058 }); 1059 1060 // When we try to consume some more bytes, simply unpause the 1061 // underlying stream. 
1062 this._read = (n) => { 1063 debug('wrapped _read', n); 1064 if (paused) { 1065 paused = false; 1066 stream.resume(); 1067 } 1068 }; 1069 1070 return this; 1071}; 1072 1073Readable.prototype[SymbolAsyncIterator] = function() { 1074 let stream = this; 1075 1076 if (typeof stream.read !== 'function') { 1077 // v1 stream 1078 const src = stream; 1079 stream = new Readable({ 1080 objectMode: true, 1081 destroy(err, callback) { 1082 destroyImpl.destroyer(src, err); 1083 callback(err); 1084 } 1085 }).wrap(src); 1086 } 1087 1088 const iter = createAsyncIterator(stream); 1089 iter.stream = stream; 1090 return iter; 1091}; 1092 1093async function* createAsyncIterator(stream) { 1094 let callback = nop; 1095 1096 function next(resolve) { 1097 if (this === stream) { 1098 callback(); 1099 callback = nop; 1100 } else { 1101 callback = resolve; 1102 } 1103 } 1104 1105 const state = stream._readableState; 1106 1107 let error = state.errored; 1108 let errorEmitted = state.errorEmitted; 1109 let endEmitted = state.endEmitted; 1110 let closeEmitted = state.closeEmitted; 1111 1112 stream 1113 .on('readable', next) 1114 .on('error', function(err) { 1115 error = err; 1116 errorEmitted = true; 1117 next.call(this); 1118 }) 1119 .on('end', function() { 1120 endEmitted = true; 1121 next.call(this); 1122 }) 1123 .on('close', function() { 1124 closeEmitted = true; 1125 next.call(this); 1126 }); 1127 1128 try { 1129 while (true) { 1130 const chunk = stream.destroyed ? null : stream.read(); 1131 if (chunk !== null) { 1132 yield chunk; 1133 } else if (errorEmitted) { 1134 throw error; 1135 } else if (endEmitted) { 1136 break; 1137 } else if (closeEmitted) { 1138 break; 1139 } else { 1140 await new Promise(next); 1141 } 1142 } 1143 } catch (err) { 1144 destroyImpl.destroyer(stream, err); 1145 throw err; 1146 } finally { 1147 if (state.autoDestroy || !endEmitted) { 1148 // TODO(ronag): ERR_PREMATURE_CLOSE? 
1149 destroyImpl.destroyer(stream, null); 1150 } 1151 } 1152} 1153 1154// Making it explicit these properties are not enumerable 1155// because otherwise some prototype manipulation in 1156// userland will fail. 1157ObjectDefineProperties(Readable.prototype, { 1158 readable: { 1159 get() { 1160 const r = this._readableState; 1161 // r.readable === false means that this is part of a Duplex stream 1162 // where the readable side was disabled upon construction. 1163 // Compat. The user might manually disable readable side through 1164 // deprecated setter. 1165 return !!r && r.readable !== false && !r.destroyed && !r.errorEmitted && 1166 !r.endEmitted; 1167 }, 1168 set(val) { 1169 // Backwards compat. 1170 if (this._readableState) { 1171 this._readableState.readable = !!val; 1172 } 1173 } 1174 }, 1175 1176 readableDidRead: { 1177 enumerable: false, 1178 get: function() { 1179 return ( 1180 this._readableState.dataEmitted || 1181 this._readableState.endEmitted || 1182 this._readableState.errorEmitted || 1183 this._readableState.closeEmitted 1184 ); 1185 } 1186 }, 1187 1188 readableHighWaterMark: { 1189 enumerable: false, 1190 get: function() { 1191 return this._readableState.highWaterMark; 1192 } 1193 }, 1194 1195 readableBuffer: { 1196 enumerable: false, 1197 get: function() { 1198 return this._readableState && this._readableState.buffer; 1199 } 1200 }, 1201 1202 readableFlowing: { 1203 enumerable: false, 1204 get: function() { 1205 return this._readableState.flowing; 1206 }, 1207 set: function(state) { 1208 if (this._readableState) { 1209 this._readableState.flowing = state; 1210 } 1211 } 1212 }, 1213 1214 readableLength: { 1215 enumerable: false, 1216 get() { 1217 return this._readableState.length; 1218 } 1219 }, 1220 1221 readableObjectMode: { 1222 enumerable: false, 1223 get() { 1224 return this._readableState ? 
this._readableState.objectMode : false; 1225 } 1226 }, 1227 1228 readableEncoding: { 1229 enumerable: false, 1230 get() { 1231 return this._readableState ? this._readableState.encoding : null; 1232 } 1233 }, 1234 1235 destroyed: { 1236 enumerable: false, 1237 get() { 1238 if (this._readableState === undefined) { 1239 return false; 1240 } 1241 return this._readableState.destroyed; 1242 }, 1243 set(value) { 1244 // We ignore the value if the stream 1245 // has not been initialized yet. 1246 if (!this._readableState) { 1247 return; 1248 } 1249 1250 // Backward compatibility, the user is explicitly 1251 // managing destroyed. 1252 this._readableState.destroyed = value; 1253 } 1254 }, 1255 1256 readableEnded: { 1257 enumerable: false, 1258 get() { 1259 return this._readableState ? this._readableState.endEmitted : false; 1260 } 1261 }, 1262 1263}); 1264 1265ObjectDefineProperties(ReadableState.prototype, { 1266 // Legacy getter for `pipesCount`. 1267 pipesCount: { 1268 get() { 1269 return this.pipes.length; 1270 } 1271 }, 1272 1273 // Legacy property for `paused`. 1274 paused: { 1275 get() { 1276 return this[kPaused] !== false; 1277 }, 1278 set(value) { 1279 this[kPaused] = !!value; 1280 } 1281 } 1282}); 1283 1284// Exposed for testing purposes only. 1285Readable._fromList = fromList; 1286 1287// Pluck off n bytes from an array of buffers. 1288// Length is the combined lengths of all the buffers in the list. 1289// This function is designed to be inlinable, so please take care when making 1290// changes to the function body. 1291function fromList(n, state) { 1292 // nothing buffered. 1293 if (state.length === 0) 1294 return null; 1295 1296 let ret; 1297 if (state.objectMode) 1298 ret = state.buffer.shift(); 1299 else if (!n || n >= state.length) { 1300 // Read it all, truncate the list. 
1301 if (state.decoder) 1302 ret = state.buffer.join(''); 1303 else if (state.buffer.length === 1) 1304 ret = state.buffer.first(); 1305 else 1306 ret = state.buffer.concat(state.length); 1307 state.buffer.clear(); 1308 } else { 1309 // read part of list. 1310 ret = state.buffer.consume(n, state.decoder); 1311 } 1312 1313 return ret; 1314} 1315 1316function endReadable(stream) { 1317 const state = stream._readableState; 1318 1319 debug('endReadable', state.endEmitted); 1320 if (!state.endEmitted) { 1321 state.ended = true; 1322 process.nextTick(endReadableNT, state, stream); 1323 } 1324} 1325 1326function endReadableNT(state, stream) { 1327 debug('endReadableNT', state.endEmitted, state.length); 1328 1329 // Check that we didn't get one last unshift. 1330 if (!state.errorEmitted && !state.closeEmitted && 1331 !state.endEmitted && state.length === 0) { 1332 state.endEmitted = true; 1333 stream.emit('end'); 1334 1335 if (stream.writable && stream.allowHalfOpen === false) { 1336 process.nextTick(endWritableNT, stream); 1337 } else if (state.autoDestroy) { 1338 // In case of duplex streams we need a way to detect 1339 // if the writable side is ready for autoDestroy as well. 1340 const wState = stream._writableState; 1341 const autoDestroy = !wState || ( 1342 wState.autoDestroy && 1343 // We don't expect the writable to ever 'finish' 1344 // if writable is explicitly set to false. 1345 (wState.finished || wState.writable === false) 1346 ); 1347 1348 if (autoDestroy) { 1349 stream.destroy(); 1350 } 1351 } 1352 } 1353} 1354 1355function endWritableNT(stream) { 1356 const writable = stream.writable && !stream.writableEnded && 1357 !stream.destroyed; 1358 if (writable) { 1359 stream.end(); 1360 } 1361} 1362 1363Readable.from = function(iterable, opts) { 1364 if (from === undefined) { 1365 from = require('internal/streams/from'); 1366 } 1367 return from(Readable, iterable, opts); 1368}; 1369