1// Copyright Joyent, Inc. and other Node contributors. 2// 3// Permission is hereby granted, free of charge, to any person obtaining a 4// copy of this software and associated documentation files (the 5// "Software"), to deal in the Software without restriction, including 6// without limitation the rights to use, copy, modify, merge, publish, 7// distribute, sublicense, and/or sell copies of the Software, and to permit 8// persons to whom the Software is furnished to do so, subject to the 9// following conditions: 10// 11// The above copyright notice and this permission notice shall be included 12// in all copies or substantial portions of the Software. 13// 14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN 17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, 18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR 19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 20// USE OR OTHER DEALINGS IN THE SOFTWARE. 21 22'use strict'; 23 24const { 25 ArrayIsArray, 26 NumberIsInteger, 27 NumberIsNaN, 28 ObjectDefineProperties, 29 ObjectSetPrototypeOf, 30 Set, 31 SymbolAsyncIterator, 32 Symbol 33} = primordials; 34 35module.exports = Readable; 36Readable.ReadableState = ReadableState; 37 38const EE = require('events'); 39const Stream = require('stream'); 40const { Buffer } = require('buffer'); 41 42let debug = require('internal/util/debuglog').debuglog('stream', (fn) => { 43 debug = fn; 44}); 45const BufferList = require('internal/streams/buffer_list'); 46const destroyImpl = require('internal/streams/destroy'); 47const { 48 getHighWaterMark, 49 getDefaultHighWaterMark 50} = require('internal/streams/state'); 51const { 52 ERR_INVALID_ARG_TYPE, 53 ERR_STREAM_PUSH_AFTER_EOF, 54 ERR_METHOD_NOT_IMPLEMENTED, 55 ERR_STREAM_UNSHIFT_AFTER_END_EVENT 56} = require('internal/errors').codes; 57 58const kPaused = Symbol('kPaused'); 59 60// Lazy loaded to improve the startup performance. 61let StringDecoder; 62let createReadableStreamAsyncIterator; 63let from; 64 65ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); 66ObjectSetPrototypeOf(Readable, Stream); 67 68const { errorOrDestroy } = destroyImpl; 69 70function prependListener(emitter, event, fn) { 71 // Sadly this is not cacheable as some libraries bundle their own 72 // event emitter implementation with them. 73 if (typeof emitter.prependListener === 'function') 74 return emitter.prependListener(event, fn); 75 76 // This is a hack to make sure that our error handler is attached before any 77 // userland ones. NEVER DO THIS. This is here only because this code needs 78 // to continue to work with older versions of Node.js that do not include 79 // the prependListener() method. The goal is to eventually remove this hack. 80 if (!emitter._events || !emitter._events[event]) 81 emitter.on(event, fn); 82 else if (ArrayIsArray(emitter._events[event])) 83 emitter._events[event].unshift(fn); 84 else 85 emitter._events[event] = [fn, emitter._events[event]]; 86} 87 88function ReadableState(options, stream, isDuplex) { 89 // Duplex streams are both readable and writable, but share 90 // the same options object. 91 // However, some cases require setting options to different 92 // values for the readable and the writable sides of the duplex stream. 93 // These options can be provided separately as readableXXX and writableXXX. 94 if (typeof isDuplex !== 'boolean') 95 isDuplex = stream instanceof Stream.Duplex; 96 97 // Object stream flag. Used to make read(n) ignore n and to 98 // make all the buffer merging and length checks go away 99 this.objectMode = !!(options && options.objectMode); 100 101 if (isDuplex) 102 this.objectMode = this.objectMode || 103 !!(options && options.readableObjectMode); 104 105 // The point at which it stops calling _read() to fill the buffer 106 // Note: 0 is a valid value, means "don't call _read preemptively ever" 107 this.highWaterMark = options ? 108 getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex) : 109 getDefaultHighWaterMark(false); 110 111 // A linked list is used to store data chunks instead of an array because the 112 // linked list can remove elements from the beginning faster than 113 // array.shift() 114 this.buffer = new BufferList(); 115 this.length = 0; 116 this.pipes = null; 117 this.pipesCount = 0; 118 this.flowing = null; 119 this.ended = false; 120 this.endEmitted = false; 121 this.reading = false; 122 123 // A flag to be able to tell if the event 'readable'/'data' is emitted 124 // immediately, or on a later tick. We set this to true at first, because 125 // any actions that shouldn't happen until "later" should generally also 126 // not happen before the first read call. 127 this.sync = true; 128 129 // Whenever we return null, then we set a flag to say 130 // that we're awaiting a 'readable' event emission. 131 this.needReadable = false; 132 this.emittedReadable = false; 133 this.readableListening = false; 134 this.resumeScheduled = false; 135 this[kPaused] = null; 136 137 // Should close be emitted on destroy. Defaults to true. 138 this.emitClose = !options || options.emitClose !== false; 139 140 // Should .destroy() be called after 'end' (and potentially 'finish') 141 this.autoDestroy = !!(options && options.autoDestroy); 142 143 // Has it been destroyed 144 this.destroyed = false; 145 146 // Crypto is kind of old and crusty. Historically, its default string 147 // encoding is 'binary' so we have to make this configurable. 148 // Everything else in the universe uses 'utf8', though. 149 this.defaultEncoding = (options && options.defaultEncoding) || 'utf8'; 150 151 // Ref the piped dest which we need a drain event on it 152 // type: null | Writable | Set<Writable> 153 this.awaitDrainWriters = null; 154 this.multiAwaitDrain = false; 155 156 // If true, a maybeReadMore has been scheduled 157 this.readingMore = false; 158 159 this.decoder = null; 160 this.encoding = null; 161 if (options && options.encoding) { 162 if (!StringDecoder) 163 StringDecoder = require('string_decoder').StringDecoder; 164 this.decoder = new StringDecoder(options.encoding); 165 this.encoding = options.encoding; 166 } 167} 168 169 170function Readable(options) { 171 if (!(this instanceof Readable)) 172 return new Readable(options); 173 174 // Checking for a Stream.Duplex instance is faster here instead of inside 175 // the ReadableState constructor, at least with V8 6.5 176 const isDuplex = this instanceof Stream.Duplex; 177 178 this._readableState = new ReadableState(options, this, isDuplex); 179 180 // legacy 181 this.readable = true; 182 183 if (options) { 184 if (typeof options.read === 'function') 185 this._read = options.read; 186 187 if (typeof options.destroy === 'function') 188 this._destroy = options.destroy; 189 } 190 191 Stream.call(this, options); 192} 193 194Readable.prototype.destroy = destroyImpl.destroy; 195Readable.prototype._undestroy = destroyImpl.undestroy; 196Readable.prototype._destroy = function(err, cb) { 197 cb(err); 198}; 199 200Readable.prototype[EE.captureRejectionSymbol] = function(err) { 201 // TODO(mcollina): remove the destroyed if once errorEmitted lands in 202 // Readable. 203 if (!this.destroyed) { 204 this.destroy(err); 205 } 206}; 207 208// Manually shove something into the read() buffer. 209// This returns true if the highWaterMark has not been hit yet, 210// similar to how Writable.write() returns true if you should 211// write() some more. 212Readable.prototype.push = function(chunk, encoding) { 213 return readableAddChunk(this, chunk, encoding, false); 214}; 215 216// Unshift should *always* be something directly out of read() 217Readable.prototype.unshift = function(chunk, encoding) { 218 return readableAddChunk(this, chunk, encoding, true); 219}; 220 221function readableAddChunk(stream, chunk, encoding, addToFront) { 222 debug('readableAddChunk', chunk); 223 const state = stream._readableState; 224 225 let err; 226 if (!state.objectMode) { 227 if (typeof chunk === 'string') { 228 encoding = encoding || state.defaultEncoding; 229 if (addToFront && state.encoding && state.encoding !== encoding) { 230 // When unshifting, if state.encoding is set, we have to save 231 // the string in the BufferList with the state encoding 232 chunk = Buffer.from(chunk, encoding).toString(state.encoding); 233 } else if (encoding !== state.encoding) { 234 chunk = Buffer.from(chunk, encoding); 235 encoding = ''; 236 } 237 } else if (chunk instanceof Buffer) { 238 encoding = ''; 239 } else if (Stream._isUint8Array(chunk)) { 240 chunk = Stream._uint8ArrayToBuffer(chunk); 241 encoding = ''; 242 } else if (chunk != null) { 243 err = new ERR_INVALID_ARG_TYPE( 244 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk); 245 } 246 } 247 248 if (err) { 249 errorOrDestroy(stream, err); 250 } else if (chunk === null) { 251 state.reading = false; 252 onEofChunk(stream, state); 253 } else if (state.objectMode || (chunk && chunk.length > 0)) { 254 if (addToFront) { 255 if (state.endEmitted) 256 errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); 257 else 258 addChunk(stream, state, chunk, true); 259 } else if (state.ended) { 260 errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); 261 } else if (state.destroyed) { 262 return false; 263 } else { 264 state.reading = false; 265 if (state.decoder && !encoding) { 266 chunk = state.decoder.write(chunk); 267 if (state.objectMode || chunk.length !== 0) 268 addChunk(stream, state, chunk, false); 269 else 270 maybeReadMore(stream, state); 271 } else { 272 addChunk(stream, state, chunk, false); 273 } 274 } 275 } else if (!addToFront) { 276 state.reading = false; 277 maybeReadMore(stream, state); 278 } 279 280 // We can push more data if we are below the highWaterMark. 281 // Also, if we have no data yet, we can stand some more bytes. 282 // This is to work around cases where hwm=0, such as the repl. 283 return !state.ended && 284 (state.length < state.highWaterMark || state.length === 0); 285} 286 287function addChunk(stream, state, chunk, addToFront) { 288 if (state.flowing && state.length === 0 && !state.sync && 289 stream.listenerCount('data') > 0) { 290 // Use the guard to avoid creating `Set()` repeatedly 291 // when we have multiple pipes. 292 if (state.multiAwaitDrain) { 293 state.awaitDrainWriters.clear(); 294 } else { 295 state.awaitDrainWriters = null; 296 } 297 stream.emit('data', chunk); 298 } else { 299 // Update the buffer info. 300 state.length += state.objectMode ? 1 : chunk.length; 301 if (addToFront) 302 state.buffer.unshift(chunk); 303 else 304 state.buffer.push(chunk); 305 306 if (state.needReadable) 307 emitReadable(stream); 308 } 309 maybeReadMore(stream, state); 310} 311 312Readable.prototype.isPaused = function() { 313 const state = this._readableState; 314 return state[kPaused] === true || state.flowing === false; 315}; 316 317// Backwards compatibility. 318Readable.prototype.setEncoding = function(enc) { 319 if (!StringDecoder) 320 StringDecoder = require('string_decoder').StringDecoder; 321 const decoder = new StringDecoder(enc); 322 this._readableState.decoder = decoder; 323 // If setEncoding(null), decoder.encoding equals utf8 324 this._readableState.encoding = this._readableState.decoder.encoding; 325 326 const buffer = this._readableState.buffer; 327 // Iterate over current buffer to convert already stored Buffers: 328 let content = ''; 329 for (const data of buffer) { 330 content += decoder.write(data); 331 } 332 buffer.clear(); 333 if (content !== '') 334 buffer.push(content); 335 this._readableState.length = content.length; 336 return this; 337}; 338 339// Don't raise the hwm > 1GB 340const MAX_HWM = 0x40000000; 341function computeNewHighWaterMark(n) { 342 if (n >= MAX_HWM) { 343 // TODO(ronag): Throw ERR_VALUE_OUT_OF_RANGE. 344 n = MAX_HWM; 345 } else { 346 // Get the next highest power of 2 to prevent increasing hwm excessively in 347 // tiny amounts 348 n--; 349 n |= n >>> 1; 350 n |= n >>> 2; 351 n |= n >>> 4; 352 n |= n >>> 8; 353 n |= n >>> 16; 354 n++; 355 } 356 return n; 357} 358 359// This function is designed to be inlinable, so please take care when making 360// changes to the function body. 361function howMuchToRead(n, state) { 362 if (n <= 0 || (state.length === 0 && state.ended)) 363 return 0; 364 if (state.objectMode) 365 return 1; 366 if (NumberIsNaN(n)) { 367 // Only flow one buffer at a time 368 if (state.flowing && state.length) 369 return state.buffer.first().length; 370 return state.length; 371 } 372 if (n <= state.length) 373 return n; 374 return state.ended ? state.length : 0; 375} 376 377// You can override either this method, or the async _read(n) below. 378Readable.prototype.read = function(n) { 379 debug('read', n); 380 // Same as parseInt(undefined, 10), however V8 7.3 performance regressed 381 // in this scenario, so we are doing it manually. 382 if (n === undefined) { 383 n = NaN; 384 } else if (!NumberIsInteger(n)) { 385 n = parseInt(n, 10); 386 } 387 const state = this._readableState; 388 const nOrig = n; 389 390 // If we're asking for more than the current hwm, then raise the hwm. 391 if (n > state.highWaterMark) 392 state.highWaterMark = computeNewHighWaterMark(n); 393 394 if (n !== 0) 395 state.emittedReadable = false; 396 397 // If we're doing read(0) to trigger a readable event, but we 398 // already have a bunch of data in the buffer, then just trigger 399 // the 'readable' event and move on. 400 if (n === 0 && 401 state.needReadable && 402 ((state.highWaterMark !== 0 ? 403 state.length >= state.highWaterMark : 404 state.length > 0) || 405 state.ended)) { 406 debug('read: emitReadable', state.length, state.ended); 407 if (state.length === 0 && state.ended) 408 endReadable(this); 409 else 410 emitReadable(this); 411 return null; 412 } 413 414 n = howMuchToRead(n, state); 415 416 // If we've ended, and we're now clear, then finish it up. 417 if (n === 0 && state.ended) { 418 if (state.length === 0) 419 endReadable(this); 420 return null; 421 } 422 423 // All the actual chunk generation logic needs to be 424 // *below* the call to _read. The reason is that in certain 425 // synthetic stream cases, such as passthrough streams, _read 426 // may be a completely synchronous operation which may change 427 // the state of the read buffer, providing enough data when 428 // before there was *not* enough. 429 // 430 // So, the steps are: 431 // 1. Figure out what the state of things will be after we do 432 // a read from the buffer. 433 // 434 // 2. If that resulting state will trigger a _read, then call _read. 435 // Note that this may be asynchronous, or synchronous. Yes, it is 436 // deeply ugly to write APIs this way, but that still doesn't mean 437 // that the Readable class should behave improperly, as streams are 438 // designed to be sync/async agnostic. 439 // Take note if the _read call is sync or async (ie, if the read call 440 // has returned yet), so that we know whether or not it's safe to emit 441 // 'readable' etc. 442 // 443 // 3. Actually pull the requested chunks out of the buffer and return. 444 445 // if we need a readable event, then we need to do some reading. 446 var doRead = state.needReadable; 447 debug('need readable', doRead); 448 449 // If we currently have less than the highWaterMark, then also read some 450 if (state.length === 0 || state.length - n < state.highWaterMark) { 451 doRead = true; 452 debug('length less than watermark', doRead); 453 } 454 455 // However, if we've ended, then there's no point, if we're already 456 // reading, then it's unnecessary, and if we're destroyed, then it's 457 // not allowed. 458 if (state.ended || state.reading || state.destroyed) { 459 doRead = false; 460 debug('reading or ended', doRead); 461 } else if (doRead) { 462 debug('do read'); 463 state.reading = true; 464 state.sync = true; 465 // If the length is currently zero, then we *need* a readable event. 466 if (state.length === 0) 467 state.needReadable = true; 468 // Call internal read method 469 this._read(state.highWaterMark); 470 state.sync = false; 471 // If _read pushed data synchronously, then `reading` will be false, 472 // and we need to re-evaluate how much data we can return to the user. 473 if (!state.reading) 474 n = howMuchToRead(nOrig, state); 475 } 476 477 var ret; 478 if (n > 0) 479 ret = fromList(n, state); 480 else 481 ret = null; 482 483 if (ret === null) { 484 state.needReadable = state.length <= state.highWaterMark; 485 n = 0; 486 } else { 487 state.length -= n; 488 if (state.multiAwaitDrain) { 489 state.awaitDrainWriters.clear(); 490 } else { 491 state.awaitDrainWriters = null; 492 } 493 } 494 495 if (state.length === 0) { 496 // If we have nothing in the buffer, then we want to know 497 // as soon as we *do* get something into the buffer. 498 if (!state.ended) 499 state.needReadable = true; 500 501 // If we tried to read() past the EOF, then emit end on the next tick. 502 if (nOrig !== n && state.ended) 503 endReadable(this); 504 } 505 506 if (ret !== null) 507 this.emit('data', ret); 508 509 return ret; 510}; 511 512function onEofChunk(stream, state) { 513 debug('onEofChunk'); 514 if (state.ended) return; 515 if (state.decoder) { 516 var chunk = state.decoder.end(); 517 if (chunk && chunk.length) { 518 state.buffer.push(chunk); 519 state.length += state.objectMode ? 1 : chunk.length; 520 } 521 } 522 state.ended = true; 523 524 if (state.sync) { 525 // If we are sync, wait until next tick to emit the data. 526 // Otherwise we risk emitting data in the flow() 527 // the readable code triggers during a read() call 528 emitReadable(stream); 529 } else { 530 // Emit 'readable' now to make sure it gets picked up. 531 state.needReadable = false; 532 state.emittedReadable = true; 533 // We have to emit readable now that we are EOF. Modules 534 // in the ecosystem (e.g. dicer) rely on this event being sync. 535 emitReadable_(stream); 536 } 537} 538 539// Don't emit readable right away in sync mode, because this can trigger 540// another read() call => stack overflow. This way, it might trigger 541// a nextTick recursion warning, but that's not so bad. 542function emitReadable(stream) { 543 const state = stream._readableState; 544 debug('emitReadable', state.needReadable, state.emittedReadable); 545 state.needReadable = false; 546 if (!state.emittedReadable) { 547 debug('emitReadable', state.flowing); 548 state.emittedReadable = true; 549 process.nextTick(emitReadable_, stream); 550 } 551} 552 553function emitReadable_(stream) { 554 const state = stream._readableState; 555 debug('emitReadable_', state.destroyed, state.length, state.ended); 556 if (!state.destroyed && (state.length || state.ended)) { 557 stream.emit('readable'); 558 state.emittedReadable = false; 559 } 560 561 // The stream needs another readable event if 562 // 1. It is not flowing, as the flow mechanism will take 563 // care of it. 564 // 2. It is not ended. 565 // 3. It is below the highWaterMark, so we can schedule 566 // another readable later. 567 state.needReadable = 568 !state.flowing && 569 !state.ended && 570 state.length <= state.highWaterMark; 571 flow(stream); 572} 573 574 575// At this point, the user has presumably seen the 'readable' event, 576// and called read() to consume some data. that may have triggered 577// in turn another _read(n) call, in which case reading = true if 578// it's in progress. 579// However, if we're not ended, or reading, and the length < hwm, 580// then go ahead and try to read some more preemptively. 581function maybeReadMore(stream, state) { 582 if (!state.readingMore) { 583 state.readingMore = true; 584 process.nextTick(maybeReadMore_, stream, state); 585 } 586} 587 588function maybeReadMore_(stream, state) { 589 // Attempt to read more data if we should. 590 // 591 // The conditions for reading more data are (one of): 592 // - Not enough data buffered (state.length < state.highWaterMark). The loop 593 // is responsible for filling the buffer with enough data if such data 594 // is available. If highWaterMark is 0 and we are not in the flowing mode 595 // we should _not_ attempt to buffer any extra data. We'll get more data 596 // when the stream consumer calls read() instead. 597 // - No data in the buffer, and the stream is in flowing mode. In this mode 598 // the loop below is responsible for ensuring read() is called. Failing to 599 // call read here would abort the flow and there's no other mechanism for 600 // continuing the flow if the stream consumer has just subscribed to the 601 // 'data' event. 602 // 603 // In addition to the above conditions to keep reading data, the following 604 // conditions prevent the data from being read: 605 // - The stream has ended (state.ended). 606 // - There is already a pending 'read' operation (state.reading). This is a 607 // case where the the stream has called the implementation defined _read() 608 // method, but they are processing the call asynchronously and have _not_ 609 // called push() with new data. In this case we skip performing more 610 // read()s. The execution ends in this method again after the _read() ends 611 // up calling push() with more data. 612 while (!state.reading && !state.ended && 613 (state.length < state.highWaterMark || 614 (state.flowing && state.length === 0))) { 615 const len = state.length; 616 debug('maybeReadMore read 0'); 617 stream.read(0); 618 if (len === state.length) 619 // Didn't get any data, stop spinning. 620 break; 621 } 622 state.readingMore = false; 623} 624 625// Abstract method. to be overridden in specific implementation classes. 626// call cb(er, data) where data is <= n in length. 627// for virtual (non-string, non-buffer) streams, "length" is somewhat 628// arbitrary, and perhaps not very meaningful. 629Readable.prototype._read = function(n) { 630 errorOrDestroy(this, new ERR_METHOD_NOT_IMPLEMENTED('_read()')); 631}; 632 633Readable.prototype.pipe = function(dest, pipeOpts) { 634 const src = this; 635 const state = this._readableState; 636 637 if (state.pipesCount === 1) { 638 if (!state.multiAwaitDrain) { 639 state.multiAwaitDrain = true; 640 state.awaitDrainWriters = new Set( 641 state.awaitDrainWriters ? [state.awaitDrainWriters] : [] 642 ); 643 } 644 } 645 646 switch (state.pipesCount) { 647 case 0: 648 state.pipes = dest; 649 break; 650 case 1: 651 state.pipes = [state.pipes, dest]; 652 break; 653 default: 654 state.pipes.push(dest); 655 break; 656 } 657 state.pipesCount += 1; 658 debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts); 659 660 const doEnd = (!pipeOpts || pipeOpts.end !== false) && 661 dest !== process.stdout && 662 dest !== process.stderr; 663 664 const endFn = doEnd ? onend : unpipe; 665 if (state.endEmitted) 666 process.nextTick(endFn); 667 else 668 src.once('end', endFn); 669 670 dest.on('unpipe', onunpipe); 671 function onunpipe(readable, unpipeInfo) { 672 debug('onunpipe'); 673 if (readable === src) { 674 if (unpipeInfo && unpipeInfo.hasUnpiped === false) { 675 unpipeInfo.hasUnpiped = true; 676 cleanup(); 677 } 678 } 679 } 680 681 function onend() { 682 debug('onend'); 683 dest.end(); 684 } 685 686 let ondrain; 687 688 var cleanedUp = false; 689 function cleanup() { 690 debug('cleanup'); 691 // Cleanup event handlers once the pipe is broken 692 dest.removeListener('close', onclose); 693 dest.removeListener('finish', onfinish); 694 if (ondrain) { 695 dest.removeListener('drain', ondrain); 696 } 697 dest.removeListener('error', onerror); 698 dest.removeListener('unpipe', onunpipe); 699 src.removeListener('end', onend); 700 src.removeListener('end', unpipe); 701 src.removeListener('data', ondata); 702 703 cleanedUp = true; 704 705 // If the reader is waiting for a drain event from this 706 // specific writer, then it would cause it to never start 707 // flowing again. 708 // So, if this is awaiting a drain, then we just call it now. 709 // If we don't know, then assume that we are waiting for one. 710 if (ondrain && state.awaitDrainWriters && 711 (!dest._writableState || dest._writableState.needDrain)) 712 ondrain(); 713 } 714 715 src.on('data', ondata); 716 function ondata(chunk) { 717 debug('ondata'); 718 const ret = dest.write(chunk); 719 debug('dest.write', ret); 720 if (ret === false) { 721 // If the user unpiped during `dest.write()`, it is possible 722 // to get stuck in a permanently paused state if that write 723 // also returned false. 724 // => Check whether `dest` is still a piping destination. 725 if (!cleanedUp) { 726 if (state.pipesCount === 1 && state.pipes === dest) { 727 debug('false write response, pause', 0); 728 state.awaitDrainWriters = dest; 729 state.multiAwaitDrain = false; 730 } else if (state.pipesCount > 1 && state.pipes.includes(dest)) { 731 debug('false write response, pause', state.awaitDrainWriters.size); 732 state.awaitDrainWriters.add(dest); 733 } 734 src.pause(); 735 } 736 if (!ondrain) { 737 // When the dest drains, it reduces the awaitDrain counter 738 // on the source. This would be more elegant with a .once() 739 // handler in flow(), but adding and removing repeatedly is 740 // too slow. 741 ondrain = pipeOnDrain(src, dest); 742 dest.on('drain', ondrain); 743 } 744 } 745 } 746 747 // If the dest has an error, then stop piping into it. 748 // However, don't suppress the throwing behavior for this. 749 function onerror(er) { 750 debug('onerror', er); 751 unpipe(); 752 dest.removeListener('error', onerror); 753 if (EE.listenerCount(dest, 'error') === 0) 754 errorOrDestroy(dest, er); 755 } 756 757 // Make sure our error handler is attached before userland ones. 758 prependListener(dest, 'error', onerror); 759 760 // Both close and finish should trigger unpipe, but only once. 761 function onclose() { 762 dest.removeListener('finish', onfinish); 763 unpipe(); 764 } 765 dest.once('close', onclose); 766 function onfinish() { 767 debug('onfinish'); 768 dest.removeListener('close', onclose); 769 unpipe(); 770 } 771 dest.once('finish', onfinish); 772 773 function unpipe() { 774 debug('unpipe'); 775 src.unpipe(dest); 776 } 777 778 // Tell the dest that it's being piped to 779 dest.emit('pipe', src); 780 781 // Start the flow if it hasn't been started already. 782 if (!state.flowing) { 783 debug('pipe resume'); 784 src.resume(); 785 } 786 787 return dest; 788}; 789 790function pipeOnDrain(src, dest) { 791 return function pipeOnDrainFunctionResult() { 792 const state = src._readableState; 793 794 // `ondrain` will call directly, 795 // `this` maybe not a reference to dest, 796 // so we use the real dest here. 797 if (state.awaitDrainWriters === dest) { 798 debug('pipeOnDrain', 1); 799 state.awaitDrainWriters = null; 800 } else if (state.multiAwaitDrain) { 801 debug('pipeOnDrain', state.awaitDrainWriters.size); 802 state.awaitDrainWriters.delete(dest); 803 } 804 805 if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) && 806 EE.listenerCount(src, 'data')) { 807 state.flowing = true; 808 flow(src); 809 } 810 }; 811} 812 813 814Readable.prototype.unpipe = function(dest) { 815 const state = this._readableState; 816 const unpipeInfo = { hasUnpiped: false }; 817 818 // If we're not piping anywhere, then do nothing. 819 if (state.pipesCount === 0) 820 return this; 821 822 // Just one destination. most common case. 823 if (state.pipesCount === 1) { 824 // Passed in one, but it's not the right one. 825 if (dest && dest !== state.pipes) 826 return this; 827 828 if (!dest) 829 dest = state.pipes; 830 831 // got a match. 832 state.pipes = null; 833 state.pipesCount = 0; 834 state.flowing = false; 835 if (dest) 836 dest.emit('unpipe', this, unpipeInfo); 837 return this; 838 } 839 840 // Slow case with multiple pipe destinations. 841 842 if (!dest) { 843 // remove all. 844 var dests = state.pipes; 845 var len = state.pipesCount; 846 state.pipes = null; 847 state.pipesCount = 0; 848 state.flowing = false; 849 850 for (var i = 0; i < len; i++) 851 dests[i].emit('unpipe', this, { hasUnpiped: false }); 852 return this; 853 } 854 855 // Try to find the right one. 856 const index = state.pipes.indexOf(dest); 857 if (index === -1) 858 return this; 859 860 state.pipes.splice(index, 1); 861 state.pipesCount -= 1; 862 if (state.pipesCount === 1) 863 state.pipes = state.pipes[0]; 864 865 dest.emit('unpipe', this, unpipeInfo); 866 867 return this; 868}; 869 870// Set up data events if they are asked for 871// Ensure readable listeners eventually get something 872Readable.prototype.on = function(ev, fn) { 873 const res = Stream.prototype.on.call(this, ev, fn); 874 const state = this._readableState; 875 876 if (ev === 'data') { 877 // Update readableListening so that resume() may be a no-op 878 // a few lines down. This is needed to support once('readable'). 879 state.readableListening = this.listenerCount('readable') > 0; 880 881 // Try start flowing on next tick if stream isn't explicitly paused 882 if (state.flowing !== false) 883 this.resume(); 884 } else if (ev === 'readable') { 885 if (!state.endEmitted && !state.readableListening) { 886 state.readableListening = state.needReadable = true; 887 state.flowing = false; 888 state.emittedReadable = false; 889 debug('on readable', state.length, state.reading); 890 if (state.length) { 891 emitReadable(this); 892 } else if (!state.reading) { 893 process.nextTick(nReadingNextTick, this); 894 } 895 } 896 } 897 898 return res; 899}; 900Readable.prototype.addListener = Readable.prototype.on; 901 902Readable.prototype.removeListener = function(ev, fn) { 903 const res = Stream.prototype.removeListener.call(this, ev, fn); 904 905 if (ev === 'readable') { 906 // We need to check if there is someone still listening to 907 // readable and reset the state. However this needs to happen 908 // after readable has been emitted but before I/O (nextTick) to 909 // support once('readable', fn) cycles. This means that calling 910 // resume within the same tick will have no 911 // effect. 912 process.nextTick(updateReadableListening, this); 913 } 914 915 return res; 916}; 917Readable.prototype.off = Readable.prototype.removeListener; 918 919Readable.prototype.removeAllListeners = function(ev) { 920 const res = Stream.prototype.removeAllListeners.apply(this, arguments); 921 922 if (ev === 'readable' || ev === undefined) { 923 // We need to check if there is someone still listening to 924 // readable and reset the state. However this needs to happen 925 // after readable has been emitted but before I/O (nextTick) to 926 // support once('readable', fn) cycles. This means that calling 927 // resume within the same tick will have no 928 // effect. 929 process.nextTick(updateReadableListening, this); 930 } 931 932 return res; 933}; 934 935function updateReadableListening(self) { 936 const state = self._readableState; 937 state.readableListening = self.listenerCount('readable') > 0; 938 939 if (state.resumeScheduled && state[kPaused] === false) { 940 // Flowing needs to be set to true now, otherwise 941 // the upcoming resume will not flow. 942 state.flowing = true; 943 944 // Crude way to check if we should resume 945 } else if (self.listenerCount('data') > 0) { 946 self.resume(); 947 } else if (!state.readableListening) { 948 state.flowing = null; 949 } 950} 951 952function nReadingNextTick(self) { 953 debug('readable nexttick read 0'); 954 self.read(0); 955} 956 957// pause() and resume() are remnants of the legacy readable stream API 958// If the user uses them, then switch into old mode. 959Readable.prototype.resume = function() { 960 const state = this._readableState; 961 if (!state.flowing) { 962 debug('resume'); 963 // We flow only if there is no one listening 964 // for readable, but we still have to call 965 // resume() 966 state.flowing = !state.readableListening; 967 resume(this, state); 968 } 969 state[kPaused] = false; 970 return this; 971}; 972 973function resume(stream, state) { 974 if (!state.resumeScheduled) { 975 state.resumeScheduled = true; 976 process.nextTick(resume_, stream, state); 977 } 978} 979 980function resume_(stream, state) { 981 debug('resume', state.reading); 982 if (!state.reading) { 983 stream.read(0); 984 } 985 986 state.resumeScheduled = false; 987 stream.emit('resume'); 988 flow(stream); 989 if (state.flowing && !state.reading) 990 stream.read(0); 991} 992 993Readable.prototype.pause = function() { 994 debug('call pause flowing=%j', this._readableState.flowing); 995 if (this._readableState.flowing !== false) { 996 debug('pause'); 997 this._readableState.flowing = false; 998 this.emit('pause'); 999 } 1000 this._readableState[kPaused] = true; 1001 return this; 1002}; 1003 1004function flow(stream) { 1005 const state = stream._readableState; 1006 debug('flow', state.flowing); 1007 while (state.flowing && stream.read() !== null); 1008} 1009 1010// Wrap an old-style stream as the async data source. 1011// This is *not* part of the readable stream interface. 1012// It is an ugly unfortunate mess of history. 1013Readable.prototype.wrap = function(stream) { 1014 const state = this._readableState; 1015 var paused = false; 1016 1017 stream.on('end', () => { 1018 debug('wrapped end'); 1019 if (state.decoder && !state.ended) { 1020 var chunk = state.decoder.end(); 1021 if (chunk && chunk.length) 1022 this.push(chunk); 1023 } 1024 1025 this.push(null); 1026 }); 1027 1028 stream.on('data', (chunk) => { 1029 debug('wrapped data'); 1030 if (state.decoder) 1031 chunk = state.decoder.write(chunk); 1032 1033 // Don't skip over falsy values in objectMode 1034 if (state.objectMode && (chunk === null || chunk === undefined)) 1035 return; 1036 else if (!state.objectMode && (!chunk || !chunk.length)) 1037 return; 1038 1039 const ret = this.push(chunk); 1040 if (!ret) { 1041 paused = true; 1042 stream.pause(); 1043 } 1044 }); 1045 1046 // Proxy all the other methods. Important when wrapping filters and duplexes. 1047 for (const i in stream) { 1048 if (this[i] === undefined && typeof stream[i] === 'function') { 1049 this[i] = function methodWrap(method) { 1050 return function methodWrapReturnFunction() { 1051 return stream[method].apply(stream, arguments); 1052 }; 1053 }(i); 1054 } 1055 } 1056 1057 stream.on('error', (err) => { 1058 errorOrDestroy(this, err); 1059 }); 1060 1061 stream.on('close', () => { 1062 // TODO(ronag): Update readable state? 1063 this.emit('close'); 1064 }); 1065 1066 stream.on('destroy', () => { 1067 // TODO(ronag): this.destroy()? 1068 this.emit('destroy'); 1069 }); 1070 1071 stream.on('pause', () => { 1072 // TODO(ronag): this.pause()? 1073 this.emit('pause'); 1074 }); 1075 1076 stream.on('resume', () => { 1077 // TODO(ronag): this.resume()? 1078 this.emit('resume'); 1079 }); 1080 1081 // When we try to consume some more bytes, simply unpause the 1082 // underlying stream. 1083 this._read = (n) => { 1084 debug('wrapped _read', n); 1085 if (paused) { 1086 paused = false; 1087 stream.resume(); 1088 } 1089 }; 1090 1091 return this; 1092}; 1093 1094Readable.prototype[SymbolAsyncIterator] = function() { 1095 if (createReadableStreamAsyncIterator === undefined) { 1096 createReadableStreamAsyncIterator = 1097 require('internal/streams/async_iterator'); 1098 } 1099 return createReadableStreamAsyncIterator(this); 1100}; 1101 1102// Making it explicit these properties are not enumerable 1103// because otherwise some prototype manipulation in 1104// userland will fail 1105ObjectDefineProperties(Readable.prototype, { 1106 1107 readableHighWaterMark: { 1108 enumerable: false, 1109 get: function() { 1110 return this._readableState.highWaterMark; 1111 } 1112 }, 1113 1114 readableBuffer: { 1115 enumerable: false, 1116 get: function() { 1117 return this._readableState && this._readableState.buffer; 1118 } 1119 }, 1120 1121 readableFlowing: { 1122 enumerable: false, 1123 get: function() { 1124 return this._readableState.flowing; 1125 }, 1126 set: function(state) { 1127 if (this._readableState) { 1128 this._readableState.flowing = state; 1129 } 1130 } 1131 }, 1132 1133 readableLength: { 1134 enumerable: false, 1135 get() { 1136 return this._readableState.length; 1137 } 1138 }, 1139 1140 readableObjectMode: { 1141 enumerable: false, 1142 get() { 1143 return this._readableState ? this._readableState.objectMode : false; 1144 } 1145 }, 1146 1147 readableEncoding: { 1148 enumerable: false, 1149 get() { 1150 return this._readableState ? this._readableState.encoding : null; 1151 } 1152 }, 1153 1154 destroyed: { 1155 enumerable: false, 1156 get() { 1157 if (this._readableState === undefined) { 1158 return false; 1159 } 1160 return this._readableState.destroyed; 1161 }, 1162 set(value) { 1163 // We ignore the value if the stream 1164 // has not been initialized yet 1165 if (!this._readableState) { 1166 return; 1167 } 1168 1169 // Backward compatibility, the user is explicitly 1170 // managing destroyed 1171 this._readableState.destroyed = value; 1172 } 1173 }, 1174 1175 readableEnded: { 1176 enumerable: false, 1177 get() { 1178 return this._readableState ? this._readableState.endEmitted : false; 1179 } 1180 }, 1181 1182 paused: { 1183 get() { 1184 return this[kPaused] !== false; 1185 }, 1186 set(value) { 1187 this[kPaused] = !!value; 1188 } 1189 } 1190}); 1191 1192// Exposed for testing purposes only. 1193Readable._fromList = fromList; 1194 1195// Pluck off n bytes from an array of buffers. 1196// Length is the combined lengths of all the buffers in the list. 1197// This function is designed to be inlinable, so please take care when making 1198// changes to the function body. 1199function fromList(n, state) { 1200 // nothing buffered 1201 if (state.length === 0) 1202 return null; 1203 1204 var ret; 1205 if (state.objectMode) 1206 ret = state.buffer.shift(); 1207 else if (!n || n >= state.length) { 1208 // Read it all, truncate the list 1209 if (state.decoder) 1210 ret = state.buffer.join(''); 1211 else if (state.buffer.length === 1) 1212 ret = state.buffer.first(); 1213 else 1214 ret = state.buffer.concat(state.length); 1215 state.buffer.clear(); 1216 } else { 1217 // read part of list 1218 ret = state.buffer.consume(n, state.decoder); 1219 } 1220 1221 return ret; 1222} 1223 1224function endReadable(stream) { 1225 const state = stream._readableState; 1226 1227 debug('endReadable', state.endEmitted); 1228 if (!state.endEmitted) { 1229 state.ended = true; 1230 process.nextTick(endReadableNT, state, stream); 1231 } 1232} 1233 1234function endReadableNT(state, stream) { 1235 debug('endReadableNT', state.endEmitted, state.length); 1236 1237 // Check that we didn't get one last unshift. 1238 if (!state.endEmitted && state.length === 0) { 1239 state.endEmitted = true; 1240 stream.readable = false; 1241 stream.emit('end'); 1242 1243 if (state.autoDestroy) { 1244 // In case of duplex streams we need a way to detect 1245 // if the writable side is ready for autoDestroy as well 1246 const wState = stream._writableState; 1247 if (!wState || (wState.autoDestroy && wState.finished)) { 1248 stream.destroy(); 1249 } 1250 } 1251 } 1252} 1253 1254Readable.from = function(iterable, opts) { 1255 if (from === undefined) { 1256 from = require('internal/streams/from'); 1257 } 1258 return from(Readable, iterable, opts); 1259}; 1260