1// Copyright Joyent, Inc. and other Node contributors. 2// 3// Permission is hereby granted, free of charge, to any person obtaining a 4// copy of this software and associated documentation files (the 5// "Software"), to deal in the Software without restriction, including 6// without limitation the rights to use, copy, modify, merge, publish, 7// distribute, sublicense, and/or sell copies of the Software, and to permit 8// persons to whom the Software is furnished to do so, subject to the 9// following conditions: 10// 11// The above copyright notice and this permission notice shall be included 12// in all copies or substantial portions of the Software. 13// 14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN 17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, 18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR 19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 20// USE OR OTHER DEALINGS IN THE SOFTWARE. 21 22'use strict'; 23 24const { 25 ArrayPrototypeIndexOf, 26 NumberIsInteger, 27 NumberIsNaN, 28 NumberParseInt, 29 ObjectDefineProperties, 30 ObjectKeys, 31 ObjectSetPrototypeOf, 32 Promise, 33 SafeSet, 34 SymbolAsyncDispose, 35 SymbolAsyncIterator, 36 Symbol, 37} = primordials; 38 39module.exports = Readable; 40Readable.ReadableState = ReadableState; 41 42const EE = require('events'); 43const { Stream, prependListener } = require('internal/streams/legacy'); 44const { Buffer } = require('buffer'); 45 46const { 47 addAbortSignal, 48} = require('internal/streams/add-abort-signal'); 49const eos = require('internal/streams/end-of-stream'); 50 51let debug = require('internal/util/debuglog').debuglog('stream', (fn) => { 52 debug = fn; 53}); 54const BufferList = require('internal/streams/buffer_list'); 55const destroyImpl = require('internal/streams/destroy'); 56const { 57 getHighWaterMark, 58 getDefaultHighWaterMark, 59} = require('internal/streams/state'); 60 61const { 62 aggregateTwoErrors, 63 codes: { 64 ERR_INVALID_ARG_TYPE, 65 ERR_METHOD_NOT_IMPLEMENTED, 66 ERR_OUT_OF_RANGE, 67 ERR_STREAM_PUSH_AFTER_EOF, 68 ERR_STREAM_UNSHIFT_AFTER_END_EVENT, 69 }, 70 AbortError, 71} = require('internal/errors'); 72const { validateObject } = require('internal/validators'); 73 74const kPaused = Symbol('kPaused'); 75 76const { StringDecoder } = require('string_decoder'); 77const from = require('internal/streams/from'); 78 79ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); 80ObjectSetPrototypeOf(Readable, Stream); 81const nop = () => {}; 82 83const { errorOrDestroy } = destroyImpl; 84 85const kObjectMode = 1 << 0; 86const kEnded = 1 << 1; 87const kEndEmitted = 1 << 2; 88const kReading = 1 << 3; 89const kConstructed = 1 << 4; 90const kSync = 1 << 5; 91const kNeedReadable = 1 << 6; 92const kEmittedReadable = 1 << 7; 93const kReadableListening = 1 << 8; 94const kResumeScheduled = 1 << 9; 95const kErrorEmitted = 1 << 10; 96const kEmitClose = 1 << 11; 97const kAutoDestroy = 1 << 12; 98const kDestroyed = 1 << 13; 99const kClosed = 1 << 14; 100const kCloseEmitted = 1 << 15; 101const kMultiAwaitDrain = 1 << 16; 102const kReadingMore = 1 << 17; 103const kDataEmitted = 1 << 18; 104 105// TODO(benjamingr) it is likely slower to do it this way than with free functions 106function makeBitMapDescriptor(bit) { 107 return { 108 enumerable: false, 109 get() { return (this.state & bit) !== 0; }, 110 set(value) { 111 if (value) this.state |= bit; 112 else this.state &= ~bit; 113 }, 114 }; 115} 116ObjectDefineProperties(ReadableState.prototype, { 117 objectMode: makeBitMapDescriptor(kObjectMode), 118 ended: makeBitMapDescriptor(kEnded), 119 endEmitted: makeBitMapDescriptor(kEndEmitted), 120 reading: makeBitMapDescriptor(kReading), 121 // Stream is still being constructed and cannot be 122 // destroyed until construction finished or failed. 123 // Async construction is opt in, therefore we start as 124 // constructed. 125 constructed: makeBitMapDescriptor(kConstructed), 126 // A flag to be able to tell if the event 'readable'/'data' is emitted 127 // immediately, or on a later tick. We set this to true at first, because 128 // any actions that shouldn't happen until "later" should generally also 129 // not happen before the first read call. 130 sync: makeBitMapDescriptor(kSync), 131 // Whenever we return null, then we set a flag to say 132 // that we're awaiting a 'readable' event emission. 133 needReadable: makeBitMapDescriptor(kNeedReadable), 134 emittedReadable: makeBitMapDescriptor(kEmittedReadable), 135 readableListening: makeBitMapDescriptor(kReadableListening), 136 resumeScheduled: makeBitMapDescriptor(kResumeScheduled), 137 // True if the error was already emitted and should not be thrown again. 138 errorEmitted: makeBitMapDescriptor(kErrorEmitted), 139 emitClose: makeBitMapDescriptor(kEmitClose), 140 autoDestroy: makeBitMapDescriptor(kAutoDestroy), 141 // Has it been destroyed. 142 destroyed: makeBitMapDescriptor(kDestroyed), 143 // Indicates whether the stream has finished destroying. 144 closed: makeBitMapDescriptor(kClosed), 145 // True if close has been emitted or would have been emitted 146 // depending on emitClose. 147 closeEmitted: makeBitMapDescriptor(kCloseEmitted), 148 multiAwaitDrain: makeBitMapDescriptor(kMultiAwaitDrain), 149 // If true, a maybeReadMore has been scheduled. 150 readingMore: makeBitMapDescriptor(kReadingMore), 151 dataEmitted: makeBitMapDescriptor(kDataEmitted), 152}); 153 154function ReadableState(options, stream, isDuplex) { 155 // Duplex streams are both readable and writable, but share 156 // the same options object. 157 // However, some cases require setting options to different 158 // values for the readable and the writable sides of the duplex stream. 159 // These options can be provided separately as readableXXX and writableXXX. 160 if (typeof isDuplex !== 'boolean') 161 isDuplex = stream instanceof Stream.Duplex; 162 163 // Bit map field to store ReadableState more effciently with 1 bit per field 164 // instead of a V8 slot per field. 165 this.state = kEmitClose | kAutoDestroy | kConstructed | kSync; 166 // Object stream flag. Used to make read(n) ignore n and to 167 // make all the buffer merging and length checks go away. 168 if (options && options.objectMode) this.state |= kObjectMode; 169 170 if (isDuplex && options && options.readableObjectMode) 171 this.state |= kObjectMode; 172 173 // The point at which it stops calling _read() to fill the buffer 174 // Note: 0 is a valid value, means "don't call _read preemptively ever" 175 this.highWaterMark = options ? 176 getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex) : 177 getDefaultHighWaterMark(false); 178 179 // A linked list is used to store data chunks instead of an array because the 180 // linked list can remove elements from the beginning faster than 181 // array.shift(). 182 this.buffer = new BufferList(); 183 this.length = 0; 184 this.pipes = []; 185 this.flowing = null; 186 187 this[kPaused] = null; 188 189 // Should close be emitted on destroy. Defaults to true. 190 if (options && options.emitClose === false) this.state &= ~kEmitClose; 191 192 // Should .destroy() be called after 'end' (and potentially 'finish'). 193 if (options && options.autoDestroy === false) this.state &= ~kAutoDestroy; 194 195 196 // Indicates whether the stream has errored. When true no further 197 // _read calls, 'data' or 'readable' events should occur. This is needed 198 // since when autoDestroy is disabled we need a way to tell whether the 199 // stream has failed. 200 this.errored = null; 201 202 203 // Crypto is kind of old and crusty. Historically, its default string 204 // encoding is 'binary' so we have to make this configurable. 205 // Everything else in the universe uses 'utf8', though. 206 this.defaultEncoding = (options && options.defaultEncoding) || 'utf8'; 207 208 // Ref the piped dest which we need a drain event on it 209 // type: null | Writable | Set<Writable>. 210 this.awaitDrainWriters = null; 211 212 this.decoder = null; 213 this.encoding = null; 214 if (options && options.encoding) { 215 this.decoder = new StringDecoder(options.encoding); 216 this.encoding = options.encoding; 217 } 218} 219 220 221function Readable(options) { 222 if (!(this instanceof Readable)) 223 return new Readable(options); 224 225 // Checking for a Stream.Duplex instance is faster here instead of inside 226 // the ReadableState constructor, at least with V8 6.5. 227 const isDuplex = this instanceof Stream.Duplex; 228 229 this._readableState = new ReadableState(options, this, isDuplex); 230 231 if (options) { 232 if (typeof options.read === 'function') 233 this._read = options.read; 234 235 if (typeof options.destroy === 'function') 236 this._destroy = options.destroy; 237 238 if (typeof options.construct === 'function') 239 this._construct = options.construct; 240 241 if (options.signal && !isDuplex) 242 addAbortSignal(options.signal, this); 243 } 244 245 Stream.call(this, options); 246 247 destroyImpl.construct(this, () => { 248 if (this._readableState.needReadable) { 249 maybeReadMore(this, this._readableState); 250 } 251 }); 252} 253 254Readable.prototype.destroy = destroyImpl.destroy; 255Readable.prototype._undestroy = destroyImpl.undestroy; 256Readable.prototype._destroy = function(err, cb) { 257 cb(err); 258}; 259 260Readable.prototype[EE.captureRejectionSymbol] = function(err) { 261 this.destroy(err); 262}; 263 264Readable.prototype[SymbolAsyncDispose] = function() { 265 let error; 266 if (!this.destroyed) { 267 error = this.readableEnded ? null : new AbortError(); 268 this.destroy(error); 269 } 270 return new Promise((resolve, reject) => eos(this, (err) => (err && err !== error ? reject(err) : resolve(null)))); 271}; 272 273// Manually shove something into the read() buffer. 274// This returns true if the highWaterMark has not been hit yet, 275// similar to how Writable.write() returns true if you should 276// write() some more. 277Readable.prototype.push = function(chunk, encoding) { 278 return readableAddChunk(this, chunk, encoding, false); 279}; 280 281// Unshift should *always* be something directly out of read(). 282Readable.prototype.unshift = function(chunk, encoding) { 283 return readableAddChunk(this, chunk, encoding, true); 284}; 285 286function readableAddChunk(stream, chunk, encoding, addToFront) { 287 debug('readableAddChunk', chunk); 288 const state = stream._readableState; 289 290 let err; 291 if ((state.state & kObjectMode) === 0) { 292 if (typeof chunk === 'string') { 293 encoding = encoding || state.defaultEncoding; 294 if (state.encoding !== encoding) { 295 if (addToFront && state.encoding) { 296 // When unshifting, if state.encoding is set, we have to save 297 // the string in the BufferList with the state encoding. 298 chunk = Buffer.from(chunk, encoding).toString(state.encoding); 299 } else { 300 chunk = Buffer.from(chunk, encoding); 301 encoding = ''; 302 } 303 } 304 } else if (chunk instanceof Buffer) { 305 encoding = ''; 306 } else if (Stream._isUint8Array(chunk)) { 307 chunk = Stream._uint8ArrayToBuffer(chunk); 308 encoding = ''; 309 } else if (chunk != null) { 310 err = new ERR_INVALID_ARG_TYPE( 311 'chunk', ['string', 'Buffer', 'Uint8Array'], chunk); 312 } 313 } 314 315 if (err) { 316 errorOrDestroy(stream, err); 317 } else if (chunk === null) { 318 state.state &= ~kReading; 319 onEofChunk(stream, state); 320 } else if (((state.state & kObjectMode) !== 0) || (chunk && chunk.length > 0)) { 321 if (addToFront) { 322 if ((state.state & kEndEmitted) !== 0) 323 errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); 324 else if (state.destroyed || state.errored) 325 return false; 326 else 327 addChunk(stream, state, chunk, true); 328 } else if (state.ended) { 329 errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); 330 } else if (state.destroyed || state.errored) { 331 return false; 332 } else { 333 state.state &= ~kReading; 334 if (state.decoder && !encoding) { 335 chunk = state.decoder.write(chunk); 336 if (state.objectMode || chunk.length !== 0) 337 addChunk(stream, state, chunk, false); 338 else 339 maybeReadMore(stream, state); 340 } else { 341 addChunk(stream, state, chunk, false); 342 } 343 } 344 } else if (!addToFront) { 345 state.state &= ~kReading; 346 maybeReadMore(stream, state); 347 } 348 349 // We can push more data if we are below the highWaterMark. 350 // Also, if we have no data yet, we can stand some more bytes. 351 // This is to work around cases where hwm=0, such as the repl. 352 return !state.ended && 353 (state.length < state.highWaterMark || state.length === 0); 354} 355 356function addChunk(stream, state, chunk, addToFront) { 357 if (state.flowing && state.length === 0 && !state.sync && 358 stream.listenerCount('data') > 0) { 359 // Use the guard to avoid creating `Set()` repeatedly 360 // when we have multiple pipes. 361 if ((state.state & kMultiAwaitDrain) !== 0) { 362 state.awaitDrainWriters.clear(); 363 } else { 364 state.awaitDrainWriters = null; 365 } 366 367 state.dataEmitted = true; 368 stream.emit('data', chunk); 369 } else { 370 // Update the buffer info. 371 state.length += state.objectMode ? 1 : chunk.length; 372 if (addToFront) 373 state.buffer.unshift(chunk); 374 else 375 state.buffer.push(chunk); 376 377 if ((state.state & kNeedReadable) !== 0) 378 emitReadable(stream); 379 } 380 maybeReadMore(stream, state); 381} 382 383Readable.prototype.isPaused = function() { 384 const state = this._readableState; 385 return state[kPaused] === true || state.flowing === false; 386}; 387 388// Backwards compatibility. 389Readable.prototype.setEncoding = function(enc) { 390 const decoder = new StringDecoder(enc); 391 this._readableState.decoder = decoder; 392 // If setEncoding(null), decoder.encoding equals utf8. 393 this._readableState.encoding = this._readableState.decoder.encoding; 394 395 const buffer = this._readableState.buffer; 396 // Iterate over current buffer to convert already stored Buffers: 397 let content = ''; 398 for (const data of buffer) { 399 content += decoder.write(data); 400 } 401 buffer.clear(); 402 if (content !== '') 403 buffer.push(content); 404 this._readableState.length = content.length; 405 return this; 406}; 407 408// Don't raise the hwm > 1GB. 409const MAX_HWM = 0x40000000; 410function computeNewHighWaterMark(n) { 411 if (n > MAX_HWM) { 412 throw new ERR_OUT_OF_RANGE('size', '<= 1GiB', n); 413 } else { 414 // Get the next highest power of 2 to prevent increasing hwm excessively in 415 // tiny amounts. 416 n--; 417 n |= n >>> 1; 418 n |= n >>> 2; 419 n |= n >>> 4; 420 n |= n >>> 8; 421 n |= n >>> 16; 422 n++; 423 } 424 return n; 425} 426 427// This function is designed to be inlinable, so please take care when making 428// changes to the function body. 429function howMuchToRead(n, state) { 430 if (n <= 0 || (state.length === 0 && state.ended)) 431 return 0; 432 if ((state.state & kObjectMode) !== 0) 433 return 1; 434 if (NumberIsNaN(n)) { 435 // Only flow one buffer at a time. 436 if (state.flowing && state.length) 437 return state.buffer.first().length; 438 return state.length; 439 } 440 if (n <= state.length) 441 return n; 442 return state.ended ? state.length : 0; 443} 444 445// You can override either this method, or the async _read(n) below. 446Readable.prototype.read = function(n) { 447 debug('read', n); 448 // Same as parseInt(undefined, 10), however V8 7.3 performance regressed 449 // in this scenario, so we are doing it manually. 450 if (n === undefined) { 451 n = NaN; 452 } else if (!NumberIsInteger(n)) { 453 n = NumberParseInt(n, 10); 454 } 455 const state = this._readableState; 456 const nOrig = n; 457 458 // If we're asking for more than the current hwm, then raise the hwm. 459 if (n > state.highWaterMark) 460 state.highWaterMark = computeNewHighWaterMark(n); 461 462 if (n !== 0) 463 state.state &= ~kEmittedReadable; 464 465 // If we're doing read(0) to trigger a readable event, but we 466 // already have a bunch of data in the buffer, then just trigger 467 // the 'readable' event and move on. 468 if (n === 0 && 469 state.needReadable && 470 ((state.highWaterMark !== 0 ? 471 state.length >= state.highWaterMark : 472 state.length > 0) || 473 state.ended)) { 474 debug('read: emitReadable', state.length, state.ended); 475 if (state.length === 0 && state.ended) 476 endReadable(this); 477 else 478 emitReadable(this); 479 return null; 480 } 481 482 n = howMuchToRead(n, state); 483 484 // If we've ended, and we're now clear, then finish it up. 485 if (n === 0 && state.ended) { 486 if (state.length === 0) 487 endReadable(this); 488 return null; 489 } 490 491 // All the actual chunk generation logic needs to be 492 // *below* the call to _read. The reason is that in certain 493 // synthetic stream cases, such as passthrough streams, _read 494 // may be a completely synchronous operation which may change 495 // the state of the read buffer, providing enough data when 496 // before there was *not* enough. 497 // 498 // So, the steps are: 499 // 1. Figure out what the state of things will be after we do 500 // a read from the buffer. 501 // 502 // 2. If that resulting state will trigger a _read, then call _read. 503 // Note that this may be asynchronous, or synchronous. Yes, it is 504 // deeply ugly to write APIs this way, but that still doesn't mean 505 // that the Readable class should behave improperly, as streams are 506 // designed to be sync/async agnostic. 507 // Take note if the _read call is sync or async (ie, if the read call 508 // has returned yet), so that we know whether or not it's safe to emit 509 // 'readable' etc. 510 // 511 // 3. Actually pull the requested chunks out of the buffer and return. 512 513 // if we need a readable event, then we need to do some reading. 514 let doRead = (state.state & kNeedReadable) !== 0; 515 debug('need readable', doRead); 516 517 // If we currently have less than the highWaterMark, then also read some. 518 if (state.length === 0 || state.length - n < state.highWaterMark) { 519 doRead = true; 520 debug('length less than watermark', doRead); 521 } 522 523 // However, if we've ended, then there's no point, if we're already 524 // reading, then it's unnecessary, if we're constructing we have to wait, 525 // and if we're destroyed or errored, then it's not allowed, 526 if (state.ended || state.reading || state.destroyed || state.errored || 527 !state.constructed) { 528 doRead = false; 529 debug('reading, ended or constructing', doRead); 530 } else if (doRead) { 531 debug('do read'); 532 state.state |= kReading | kSync; 533 // If the length is currently zero, then we *need* a readable event. 534 if (state.length === 0) 535 state.state |= kNeedReadable; 536 537 // Call internal read method 538 try { 539 this._read(state.highWaterMark); 540 } catch (err) { 541 errorOrDestroy(this, err); 542 } 543 state.state &= ~kSync; 544 545 // If _read pushed data synchronously, then `reading` will be false, 546 // and we need to re-evaluate how much data we can return to the user. 547 if (!state.reading) 548 n = howMuchToRead(nOrig, state); 549 } 550 551 let ret; 552 if (n > 0) 553 ret = fromList(n, state); 554 else 555 ret = null; 556 557 if (ret === null) { 558 state.needReadable = state.length <= state.highWaterMark; 559 n = 0; 560 } else { 561 state.length -= n; 562 if (state.multiAwaitDrain) { 563 state.awaitDrainWriters.clear(); 564 } else { 565 state.awaitDrainWriters = null; 566 } 567 } 568 569 if (state.length === 0) { 570 // If we have nothing in the buffer, then we want to know 571 // as soon as we *do* get something into the buffer. 572 if (!state.ended) 573 state.needReadable = true; 574 575 // If we tried to read() past the EOF, then emit end on the next tick. 576 if (nOrig !== n && state.ended) 577 endReadable(this); 578 } 579 580 if (ret !== null && !state.errorEmitted && !state.closeEmitted) { 581 state.dataEmitted = true; 582 this.emit('data', ret); 583 } 584 585 return ret; 586}; 587 588function onEofChunk(stream, state) { 589 debug('onEofChunk'); 590 if (state.ended) return; 591 if (state.decoder) { 592 const chunk = state.decoder.end(); 593 if (chunk && chunk.length) { 594 state.buffer.push(chunk); 595 state.length += state.objectMode ? 1 : chunk.length; 596 } 597 } 598 state.ended = true; 599 600 if (state.sync) { 601 // If we are sync, wait until next tick to emit the data. 602 // Otherwise we risk emitting data in the flow() 603 // the readable code triggers during a read() call. 604 emitReadable(stream); 605 } else { 606 // Emit 'readable' now to make sure it gets picked up. 607 state.needReadable = false; 608 state.emittedReadable = true; 609 // We have to emit readable now that we are EOF. Modules 610 // in the ecosystem (e.g. dicer) rely on this event being sync. 611 emitReadable_(stream); 612 } 613} 614 615// Don't emit readable right away in sync mode, because this can trigger 616// another read() call => stack overflow. This way, it might trigger 617// a nextTick recursion warning, but that's not so bad. 618function emitReadable(stream) { 619 const state = stream._readableState; 620 debug('emitReadable', state.needReadable, state.emittedReadable); 621 state.needReadable = false; 622 if (!state.emittedReadable) { 623 debug('emitReadable', state.flowing); 624 state.emittedReadable = true; 625 process.nextTick(emitReadable_, stream); 626 } 627} 628 629function emitReadable_(stream) { 630 const state = stream._readableState; 631 debug('emitReadable_', state.destroyed, state.length, state.ended); 632 if (!state.destroyed && !state.errored && (state.length || state.ended)) { 633 stream.emit('readable'); 634 state.emittedReadable = false; 635 } 636 637 // The stream needs another readable event if: 638 // 1. It is not flowing, as the flow mechanism will take 639 // care of it. 640 // 2. It is not ended. 641 // 3. It is below the highWaterMark, so we can schedule 642 // another readable later. 643 state.needReadable = 644 !state.flowing && 645 !state.ended && 646 state.length <= state.highWaterMark; 647 flow(stream); 648} 649 650 651// At this point, the user has presumably seen the 'readable' event, 652// and called read() to consume some data. that may have triggered 653// in turn another _read(n) call, in which case reading = true if 654// it's in progress. 655// However, if we're not ended, or reading, and the length < hwm, 656// then go ahead and try to read some more preemptively. 657function maybeReadMore(stream, state) { 658 if (!state.readingMore && state.constructed) { 659 state.readingMore = true; 660 process.nextTick(maybeReadMore_, stream, state); 661 } 662} 663 664function maybeReadMore_(stream, state) { 665 // Attempt to read more data if we should. 666 // 667 // The conditions for reading more data are (one of): 668 // - Not enough data buffered (state.length < state.highWaterMark). The loop 669 // is responsible for filling the buffer with enough data if such data 670 // is available. If highWaterMark is 0 and we are not in the flowing mode 671 // we should _not_ attempt to buffer any extra data. We'll get more data 672 // when the stream consumer calls read() instead. 673 // - No data in the buffer, and the stream is in flowing mode. In this mode 674 // the loop below is responsible for ensuring read() is called. Failing to 675 // call read here would abort the flow and there's no other mechanism for 676 // continuing the flow if the stream consumer has just subscribed to the 677 // 'data' event. 678 // 679 // In addition to the above conditions to keep reading data, the following 680 // conditions prevent the data from being read: 681 // - The stream has ended (state.ended). 682 // - There is already a pending 'read' operation (state.reading). This is a 683 // case where the stream has called the implementation defined _read() 684 // method, but they are processing the call asynchronously and have _not_ 685 // called push() with new data. In this case we skip performing more 686 // read()s. The execution ends in this method again after the _read() ends 687 // up calling push() with more data. 688 while (!state.reading && !state.ended && 689 (state.length < state.highWaterMark || 690 (state.flowing && state.length === 0))) { 691 const len = state.length; 692 debug('maybeReadMore read 0'); 693 stream.read(0); 694 if (len === state.length) 695 // Didn't get any data, stop spinning. 696 break; 697 } 698 state.readingMore = false; 699} 700 701// Abstract method. to be overridden in specific implementation classes. 702// call cb(er, data) where data is <= n in length. 703// for virtual (non-string, non-buffer) streams, "length" is somewhat 704// arbitrary, and perhaps not very meaningful. 705Readable.prototype._read = function(n) { 706 throw new ERR_METHOD_NOT_IMPLEMENTED('_read()'); 707}; 708 709Readable.prototype.pipe = function(dest, pipeOpts) { 710 const src = this; 711 const state = this._readableState; 712 713 if (state.pipes.length === 1) { 714 if (!state.multiAwaitDrain) { 715 state.multiAwaitDrain = true; 716 state.awaitDrainWriters = new SafeSet( 717 state.awaitDrainWriters ? [state.awaitDrainWriters] : [], 718 ); 719 } 720 } 721 722 state.pipes.push(dest); 723 debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts); 724 725 const doEnd = (!pipeOpts || pipeOpts.end !== false) && 726 dest !== process.stdout && 727 dest !== process.stderr; 728 729 const endFn = doEnd ? onend : unpipe; 730 if (state.endEmitted) 731 process.nextTick(endFn); 732 else 733 src.once('end', endFn); 734 735 dest.on('unpipe', onunpipe); 736 function onunpipe(readable, unpipeInfo) { 737 debug('onunpipe'); 738 if (readable === src) { 739 if (unpipeInfo && unpipeInfo.hasUnpiped === false) { 740 unpipeInfo.hasUnpiped = true; 741 cleanup(); 742 } 743 } 744 } 745 746 function onend() { 747 debug('onend'); 748 dest.end(); 749 } 750 751 let ondrain; 752 753 let cleanedUp = false; 754 function cleanup() { 755 debug('cleanup'); 756 // Cleanup event handlers once the pipe is broken. 757 dest.removeListener('close', onclose); 758 dest.removeListener('finish', onfinish); 759 if (ondrain) { 760 dest.removeListener('drain', ondrain); 761 } 762 dest.removeListener('error', onerror); 763 dest.removeListener('unpipe', onunpipe); 764 src.removeListener('end', onend); 765 src.removeListener('end', unpipe); 766 src.removeListener('data', ondata); 767 768 cleanedUp = true; 769 770 // If the reader is waiting for a drain event from this 771 // specific writer, then it would cause it to never start 772 // flowing again. 773 // So, if this is awaiting a drain, then we just call it now. 774 // If we don't know, then assume that we are waiting for one. 775 if (ondrain && state.awaitDrainWriters && 776 (!dest._writableState || dest._writableState.needDrain)) 777 ondrain(); 778 } 779 780 function pause() { 781 // If the user unpiped during `dest.write()`, it is possible 782 // to get stuck in a permanently paused state if that write 783 // also returned false. 784 // => Check whether `dest` is still a piping destination. 785 if (!cleanedUp) { 786 if (state.pipes.length === 1 && state.pipes[0] === dest) { 787 debug('false write response, pause', 0); 788 state.awaitDrainWriters = dest; 789 state.multiAwaitDrain = false; 790 } else if (state.pipes.length > 1 && state.pipes.includes(dest)) { 791 debug('false write response, pause', state.awaitDrainWriters.size); 792 state.awaitDrainWriters.add(dest); 793 } 794 src.pause(); 795 } 796 if (!ondrain) { 797 // When the dest drains, it reduces the awaitDrain counter 798 // on the source. This would be more elegant with a .once() 799 // handler in flow(), but adding and removing repeatedly is 800 // too slow. 801 ondrain = pipeOnDrain(src, dest); 802 dest.on('drain', ondrain); 803 } 804 } 805 806 src.on('data', ondata); 807 function ondata(chunk) { 808 debug('ondata'); 809 const ret = dest.write(chunk); 810 debug('dest.write', ret); 811 if (ret === false) { 812 pause(); 813 } 814 } 815 816 // If the dest has an error, then stop piping into it. 817 // However, don't suppress the throwing behavior for this. 818 function onerror(er) { 819 debug('onerror', er); 820 unpipe(); 821 dest.removeListener('error', onerror); 822 if (dest.listenerCount('error') === 0) { 823 const s = dest._writableState || dest._readableState; 824 if (s && !s.errorEmitted) { 825 // User incorrectly emitted 'error' directly on the stream. 826 errorOrDestroy(dest, er); 827 } else { 828 dest.emit('error', er); 829 } 830 } 831 } 832 833 // Make sure our error handler is attached before userland ones. 834 prependListener(dest, 'error', onerror); 835 836 // Both close and finish should trigger unpipe, but only once. 837 function onclose() { 838 dest.removeListener('finish', onfinish); 839 unpipe(); 840 } 841 dest.once('close', onclose); 842 function onfinish() { 843 debug('onfinish'); 844 dest.removeListener('close', onclose); 845 unpipe(); 846 } 847 dest.once('finish', onfinish); 848 849 function unpipe() { 850 debug('unpipe'); 851 src.unpipe(dest); 852 } 853 854 // Tell the dest that it's being piped to. 855 dest.emit('pipe', src); 856 857 // Start the flow if it hasn't been started already. 858 859 if (dest.writableNeedDrain === true) { 860 pause(); 861 } else if (!state.flowing) { 862 debug('pipe resume'); 863 src.resume(); 864 } 865 866 return dest; 867}; 868 869function pipeOnDrain(src, dest) { 870 return function pipeOnDrainFunctionResult() { 871 const state = src._readableState; 872 873 // `ondrain` will call directly, 874 // `this` maybe not a reference to dest, 875 // so we use the real dest here. 876 if (state.awaitDrainWriters === dest) { 877 debug('pipeOnDrain', 1); 878 state.awaitDrainWriters = null; 879 } else if (state.multiAwaitDrain) { 880 debug('pipeOnDrain', state.awaitDrainWriters.size); 881 state.awaitDrainWriters.delete(dest); 882 } 883 884 if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) && 885 src.listenerCount('data')) { 886 src.resume(); 887 } 888 }; 889} 890 891 892Readable.prototype.unpipe = function(dest) { 893 const state = this._readableState; 894 const unpipeInfo = { hasUnpiped: false }; 895 896 // If we're not piping anywhere, then do nothing. 897 if (state.pipes.length === 0) 898 return this; 899 900 if (!dest) { 901 // remove all. 902 const dests = state.pipes; 903 state.pipes = []; 904 this.pause(); 905 906 for (let i = 0; i < dests.length; i++) 907 dests[i].emit('unpipe', this, { hasUnpiped: false }); 908 return this; 909 } 910 911 // Try to find the right one. 912 const index = ArrayPrototypeIndexOf(state.pipes, dest); 913 if (index === -1) 914 return this; 915 916 state.pipes.splice(index, 1); 917 if (state.pipes.length === 0) 918 this.pause(); 919 920 dest.emit('unpipe', this, unpipeInfo); 921 922 return this; 923}; 924 925// Set up data events if they are asked for 926// Ensure readable listeners eventually get something. 927Readable.prototype.on = function(ev, fn) { 928 const res = Stream.prototype.on.call(this, ev, fn); 929 const state = this._readableState; 930 931 if (ev === 'data') { 932 // Update readableListening so that resume() may be a no-op 933 // a few lines down. This is needed to support once('readable'). 934 state.readableListening = this.listenerCount('readable') > 0; 935 936 // Try start flowing on next tick if stream isn't explicitly paused. 937 if (state.flowing !== false) 938 this.resume(); 939 } else if (ev === 'readable') { 940 if (!state.endEmitted && !state.readableListening) { 941 state.readableListening = state.needReadable = true; 942 state.flowing = false; 943 state.emittedReadable = false; 944 debug('on readable', state.length, state.reading); 945 if (state.length) { 946 emitReadable(this); 947 } else if (!state.reading) { 948 process.nextTick(nReadingNextTick, this); 949 } 950 } 951 } 952 953 return res; 954}; 955Readable.prototype.addListener = Readable.prototype.on; 956 957Readable.prototype.removeListener = function(ev, fn) { 958 const res = Stream.prototype.removeListener.call(this, 959 ev, fn); 960 961 if (ev === 'readable') { 962 // We need to check if there is someone still listening to 963 // readable and reset the state. However this needs to happen 964 // after readable has been emitted but before I/O (nextTick) to 965 // support once('readable', fn) cycles. This means that calling 966 // resume within the same tick will have no 967 // effect. 968 process.nextTick(updateReadableListening, this); 969 } 970 971 return res; 972}; 973Readable.prototype.off = Readable.prototype.removeListener; 974 975Readable.prototype.removeAllListeners = function(ev) { 976 const res = Stream.prototype.removeAllListeners.apply(this, 977 arguments); 978 979 if (ev === 'readable' || ev === undefined) { 980 // We need to check if there is someone still listening to 981 // readable and reset the state. However this needs to happen 982 // after readable has been emitted but before I/O (nextTick) to 983 // support once('readable', fn) cycles. This means that calling 984 // resume within the same tick will have no 985 // effect. 986 process.nextTick(updateReadableListening, this); 987 } 988 989 return res; 990}; 991 992function updateReadableListening(self) { 993 const state = self._readableState; 994 state.readableListening = self.listenerCount('readable') > 0; 995 996 if (state.resumeScheduled && state[kPaused] === false) { 997 // Flowing needs to be set to true now, otherwise 998 // the upcoming resume will not flow. 999 state.flowing = true; 1000 1001 // Crude way to check if we should resume. 1002 } else if (self.listenerCount('data') > 0) { 1003 self.resume(); 1004 } else if (!state.readableListening) { 1005 state.flowing = null; 1006 } 1007} 1008 1009function nReadingNextTick(self) { 1010 debug('readable nexttick read 0'); 1011 self.read(0); 1012} 1013 1014// pause() and resume() are remnants of the legacy readable stream API 1015// If the user uses them, then switch into old mode. 1016Readable.prototype.resume = function() { 1017 const state = this._readableState; 1018 if (!state.flowing) { 1019 debug('resume'); 1020 // We flow only if there is no one listening 1021 // for readable, but we still have to call 1022 // resume(). 1023 state.flowing = !state.readableListening; 1024 resume(this, state); 1025 } 1026 state[kPaused] = false; 1027 return this; 1028}; 1029 1030function resume(stream, state) { 1031 if (!state.resumeScheduled) { 1032 state.resumeScheduled = true; 1033 process.nextTick(resume_, stream, state); 1034 } 1035} 1036 1037function resume_(stream, state) { 1038 debug('resume', state.reading); 1039 if (!state.reading) { 1040 stream.read(0); 1041 } 1042 1043 state.resumeScheduled = false; 1044 stream.emit('resume'); 1045 flow(stream); 1046 if (state.flowing && !state.reading) 1047 stream.read(0); 1048} 1049 1050Readable.prototype.pause = function() { 1051 debug('call pause flowing=%j', this._readableState.flowing); 1052 if (this._readableState.flowing !== false) { 1053 debug('pause'); 1054 this._readableState.flowing = false; 1055 this.emit('pause'); 1056 } 1057 this._readableState[kPaused] = true; 1058 return this; 1059}; 1060 1061function flow(stream) { 1062 const state = stream._readableState; 1063 debug('flow', state.flowing); 1064 while (state.flowing && stream.read() !== null); 1065} 1066 1067// Wrap an old-style stream as the async data source. 1068// This is *not* part of the readable stream interface. 1069// It is an ugly unfortunate mess of history. 1070Readable.prototype.wrap = function(stream) { 1071 let paused = false; 1072 1073 // TODO (ronag): Should this.destroy(err) emit 1074 // 'error' on the wrapped stream? Would require 1075 // a static factory method, e.g. Readable.wrap(stream). 1076 1077 stream.on('data', (chunk) => { 1078 if (!this.push(chunk) && stream.pause) { 1079 paused = true; 1080 stream.pause(); 1081 } 1082 }); 1083 1084 stream.on('end', () => { 1085 this.push(null); 1086 }); 1087 1088 stream.on('error', (err) => { 1089 errorOrDestroy(this, err); 1090 }); 1091 1092 stream.on('close', () => { 1093 this.destroy(); 1094 }); 1095 1096 stream.on('destroy', () => { 1097 this.destroy(); 1098 }); 1099 1100 this._read = () => { 1101 if (paused && stream.resume) { 1102 paused = false; 1103 stream.resume(); 1104 } 1105 }; 1106 1107 // Proxy all the other methods. Important when wrapping filters and duplexes. 1108 const streamKeys = ObjectKeys(stream); 1109 for (let j = 1; j < streamKeys.length; j++) { 1110 const i = streamKeys[j]; 1111 if (this[i] === undefined && typeof stream[i] === 'function') { 1112 this[i] = stream[i].bind(stream); 1113 } 1114 } 1115 1116 return this; 1117}; 1118 1119Readable.prototype[SymbolAsyncIterator] = function() { 1120 return streamToAsyncIterator(this); 1121}; 1122 1123Readable.prototype.iterator = function(options) { 1124 if (options !== undefined) { 1125 validateObject(options, 'options'); 1126 } 1127 return streamToAsyncIterator(this, options); 1128}; 1129 1130function streamToAsyncIterator(stream, options) { 1131 if (typeof stream.read !== 'function') { 1132 stream = Readable.wrap(stream, { objectMode: true }); 1133 } 1134 1135 const iter = createAsyncIterator(stream, options); 1136 iter.stream = stream; 1137 return iter; 1138} 1139 1140async function* createAsyncIterator(stream, options) { 1141 let callback = nop; 1142 1143 function next(resolve) { 1144 if (this === stream) { 1145 callback(); 1146 callback = nop; 1147 } else { 1148 callback = resolve; 1149 } 1150 } 1151 1152 stream.on('readable', next); 1153 1154 let error; 1155 const cleanup = eos(stream, { writable: false }, (err) => { 1156 error = err ? aggregateTwoErrors(error, err) : null; 1157 callback(); 1158 callback = nop; 1159 }); 1160 1161 try { 1162 while (true) { 1163 const chunk = stream.destroyed ? null : stream.read(); 1164 if (chunk !== null) { 1165 yield chunk; 1166 } else if (error) { 1167 throw error; 1168 } else if (error === null) { 1169 return; 1170 } else { 1171 await new Promise(next); 1172 } 1173 } 1174 } catch (err) { 1175 error = aggregateTwoErrors(error, err); 1176 throw error; 1177 } finally { 1178 if ( 1179 (error || options?.destroyOnReturn !== false) && 1180 (error === undefined || stream._readableState.autoDestroy) 1181 ) { 1182 destroyImpl.destroyer(stream, null); 1183 } else { 1184 stream.off('readable', next); 1185 cleanup(); 1186 } 1187 } 1188} 1189 1190// Making it explicit these properties are not enumerable 1191// because otherwise some prototype manipulation in 1192// userland will fail. 1193ObjectDefineProperties(Readable.prototype, { 1194 readable: { 1195 __proto__: null, 1196 get() { 1197 const r = this._readableState; 1198 // r.readable === false means that this is part of a Duplex stream 1199 // where the readable side was disabled upon construction. 1200 // Compat. The user might manually disable readable side through 1201 // deprecated setter. 1202 return !!r && r.readable !== false && !r.destroyed && !r.errorEmitted && 1203 !r.endEmitted; 1204 }, 1205 set(val) { 1206 // Backwards compat. 1207 if (this._readableState) { 1208 this._readableState.readable = !!val; 1209 } 1210 }, 1211 }, 1212 1213 readableDidRead: { 1214 __proto__: null, 1215 enumerable: false, 1216 get: function() { 1217 return this._readableState.dataEmitted; 1218 }, 1219 }, 1220 1221 readableAborted: { 1222 __proto__: null, 1223 enumerable: false, 1224 get: function() { 1225 return !!( 1226 this._readableState.readable !== false && 1227 (this._readableState.destroyed || this._readableState.errored) && 1228 !this._readableState.endEmitted 1229 ); 1230 }, 1231 }, 1232 1233 readableHighWaterMark: { 1234 __proto__: null, 1235 enumerable: false, 1236 get: function() { 1237 return this._readableState.highWaterMark; 1238 }, 1239 }, 1240 1241 readableBuffer: { 1242 __proto__: null, 1243 enumerable: false, 1244 get: function() { 1245 return this._readableState && this._readableState.buffer; 1246 }, 1247 }, 1248 1249 readableFlowing: { 1250 __proto__: null, 1251 enumerable: false, 1252 get: function() { 1253 return this._readableState.flowing; 1254 }, 1255 set: function(state) { 1256 if (this._readableState) { 1257 this._readableState.flowing = state; 1258 } 1259 }, 1260 }, 1261 1262 readableLength: { 1263 __proto__: null, 1264 enumerable: false, 1265 get() { 1266 return this._readableState.length; 1267 }, 1268 }, 1269 1270 readableObjectMode: { 1271 __proto__: null, 1272 enumerable: false, 1273 get() { 1274 return this._readableState ? this._readableState.objectMode : false; 1275 }, 1276 }, 1277 1278 readableEncoding: { 1279 __proto__: null, 1280 enumerable: false, 1281 get() { 1282 return this._readableState ? this._readableState.encoding : null; 1283 }, 1284 }, 1285 1286 errored: { 1287 __proto__: null, 1288 enumerable: false, 1289 get() { 1290 return this._readableState ? this._readableState.errored : null; 1291 }, 1292 }, 1293 1294 closed: { 1295 __proto__: null, 1296 get() { 1297 return this._readableState ? this._readableState.closed : false; 1298 }, 1299 }, 1300 1301 destroyed: { 1302 __proto__: null, 1303 enumerable: false, 1304 get() { 1305 return this._readableState ? this._readableState.destroyed : false; 1306 }, 1307 set(value) { 1308 // We ignore the value if the stream 1309 // has not been initialized yet. 1310 if (!this._readableState) { 1311 return; 1312 } 1313 1314 // Backward compatibility, the user is explicitly 1315 // managing destroyed. 1316 this._readableState.destroyed = value; 1317 }, 1318 }, 1319 1320 readableEnded: { 1321 __proto__: null, 1322 enumerable: false, 1323 get() { 1324 return this._readableState ? this._readableState.endEmitted : false; 1325 }, 1326 }, 1327 1328}); 1329 1330ObjectDefineProperties(ReadableState.prototype, { 1331 // Legacy getter for `pipesCount`. 1332 pipesCount: { 1333 __proto__: null, 1334 get() { 1335 return this.pipes.length; 1336 }, 1337 }, 1338 1339 // Legacy property for `paused`. 1340 paused: { 1341 __proto__: null, 1342 get() { 1343 return this[kPaused] !== false; 1344 }, 1345 set(value) { 1346 this[kPaused] = !!value; 1347 }, 1348 }, 1349}); 1350 1351// Exposed for testing purposes only. 1352Readable._fromList = fromList; 1353 1354// Pluck off n bytes from an array of buffers. 1355// Length is the combined lengths of all the buffers in the list. 1356// This function is designed to be inlinable, so please take care when making 1357// changes to the function body. 1358function fromList(n, state) { 1359 // nothing buffered. 1360 if (state.length === 0) 1361 return null; 1362 1363 let ret; 1364 if (state.objectMode) 1365 ret = state.buffer.shift(); 1366 else if (!n || n >= state.length) { 1367 // Read it all, truncate the list. 1368 if (state.decoder) 1369 ret = state.buffer.join(''); 1370 else if (state.buffer.length === 1) 1371 ret = state.buffer.first(); 1372 else 1373 ret = state.buffer.concat(state.length); 1374 state.buffer.clear(); 1375 } else { 1376 // read part of list. 1377 ret = state.buffer.consume(n, state.decoder); 1378 } 1379 1380 return ret; 1381} 1382 1383function endReadable(stream) { 1384 const state = stream._readableState; 1385 1386 debug('endReadable', state.endEmitted); 1387 if (!state.endEmitted) { 1388 state.ended = true; 1389 process.nextTick(endReadableNT, state, stream); 1390 } 1391} 1392 1393function endReadableNT(state, stream) { 1394 debug('endReadableNT', state.endEmitted, state.length); 1395 1396 // Check that we didn't get one last unshift. 1397 if (!state.errored && !state.closeEmitted && 1398 !state.endEmitted && state.length === 0) { 1399 state.endEmitted = true; 1400 stream.emit('end'); 1401 1402 if (stream.writable && stream.allowHalfOpen === false) { 1403 process.nextTick(endWritableNT, stream); 1404 } else if (state.autoDestroy) { 1405 // In case of duplex streams we need a way to detect 1406 // if the writable side is ready for autoDestroy as well. 1407 const wState = stream._writableState; 1408 const autoDestroy = !wState || ( 1409 wState.autoDestroy && 1410 // We don't expect the writable to ever 'finish' 1411 // if writable is explicitly set to false. 1412 (wState.finished || wState.writable === false) 1413 ); 1414 1415 if (autoDestroy) { 1416 stream.destroy(); 1417 } 1418 } 1419 } 1420} 1421 1422function endWritableNT(stream) { 1423 const writable = stream.writable && !stream.writableEnded && 1424 !stream.destroyed; 1425 if (writable) { 1426 stream.end(); 1427 } 1428} 1429 1430Readable.from = function(iterable, opts) { 1431 return from(Readable, iterable, opts); 1432}; 1433 1434let webStreamsAdapters; 1435 1436// Lazy to avoid circular references 1437function lazyWebStreams() { 1438 if (webStreamsAdapters === undefined) 1439 webStreamsAdapters = require('internal/webstreams/adapters'); 1440 return webStreamsAdapters; 1441} 1442 1443Readable.fromWeb = function(readableStream, options) { 1444 return lazyWebStreams().newStreamReadableFromReadableStream( 1445 readableStream, 1446 options); 1447}; 1448 1449Readable.toWeb = function(streamReadable, options) { 1450 return lazyWebStreams().newReadableStreamFromStreamReadable( 1451 streamReadable, 1452 options); 1453}; 1454 1455Readable.wrap = function(src, options) { 1456 return new Readable({ 1457 objectMode: src.readableObjectMode ?? src.objectMode ?? true, 1458 ...options, 1459 destroy(err, callback) { 1460 destroyImpl.destroyer(src, err); 1461 callback(err); 1462 }, 1463 }).wrap(src); 1464}; 1465