1/* replacement start */ 2 3const process = require('process/') 4 5/* replacement end */ 6// Copyright Joyent, Inc. and other Node contributors. 7// 8// Permission is hereby granted, free of charge, to any person obtaining a 9// copy of this software and associated documentation files (the 10// "Software"), to deal in the Software without restriction, including 11// without limitation the rights to use, copy, modify, merge, publish, 12// distribute, sublicense, and/or sell copies of the Software, and to permit 13// persons to whom the Software is furnished to do so, subject to the 14// following conditions: 15// 16// The above copyright notice and this permission notice shall be included 17// in all copies or substantial portions of the Software. 18// 19// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 20// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 21// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN 22// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, 23// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR 24// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 25// USE OR OTHER DEALINGS IN THE SOFTWARE. 26 27;('use strict') 28const { 29 ArrayPrototypeIndexOf, 30 NumberIsInteger, 31 NumberIsNaN, 32 NumberParseInt, 33 ObjectDefineProperties, 34 ObjectKeys, 35 ObjectSetPrototypeOf, 36 Promise, 37 SafeSet, 38 SymbolAsyncIterator, 39 Symbol 40} = require('../../ours/primordials') 41module.exports = Readable 42Readable.ReadableState = ReadableState 43const { EventEmitter: EE } = require('events') 44const { Stream, prependListener } = require('./legacy') 45const { Buffer } = require('buffer') 46const { addAbortSignal } = require('./add-abort-signal') 47const eos = require('./end-of-stream') 48let debug = require('../../ours/util').debuglog('stream', (fn) => { 49 debug = fn 50}) 51const BufferList = require('./buffer_list') 52const destroyImpl = require('./destroy') 53const { getHighWaterMark, getDefaultHighWaterMark } = require('./state') 54const { 55 aggregateTwoErrors, 56 codes: { 57 ERR_INVALID_ARG_TYPE, 58 ERR_METHOD_NOT_IMPLEMENTED, 59 ERR_OUT_OF_RANGE, 60 ERR_STREAM_PUSH_AFTER_EOF, 61 ERR_STREAM_UNSHIFT_AFTER_END_EVENT 62 } 63} = require('../../ours/errors') 64const { validateObject } = require('../validators') 65const kPaused = Symbol('kPaused') 66const { StringDecoder } = require('string_decoder') 67const from = require('./from') 68ObjectSetPrototypeOf(Readable.prototype, Stream.prototype) 69ObjectSetPrototypeOf(Readable, Stream) 70const nop = () => {} 71const { errorOrDestroy } = destroyImpl 72function ReadableState(options, stream, isDuplex) { 73 // Duplex streams are both readable and writable, but share 74 // the same options object. 75 // However, some cases require setting options to different 76 // values for the readable and the writable sides of the duplex stream. 77 // These options can be provided separately as readableXXX and writableXXX. 78 if (typeof isDuplex !== 'boolean') isDuplex = stream instanceof require('./duplex') 79 80 // Object stream flag. Used to make read(n) ignore n and to 81 // make all the buffer merging and length checks go away. 82 this.objectMode = !!(options && options.objectMode) 83 if (isDuplex) this.objectMode = this.objectMode || !!(options && options.readableObjectMode) 84 85 // The point at which it stops calling _read() to fill the buffer 86 // Note: 0 is a valid value, means "don't call _read preemptively ever" 87 this.highWaterMark = options 88 ? getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex) 89 : getDefaultHighWaterMark(false) 90 91 // A linked list is used to store data chunks instead of an array because the 92 // linked list can remove elements from the beginning faster than 93 // array.shift(). 94 this.buffer = new BufferList() 95 this.length = 0 96 this.pipes = [] 97 this.flowing = null 98 this.ended = false 99 this.endEmitted = false 100 this.reading = false 101 102 // Stream is still being constructed and cannot be 103 // destroyed until construction finished or failed. 104 // Async construction is opt in, therefore we start as 105 // constructed. 106 this.constructed = true 107 108 // A flag to be able to tell if the event 'readable'/'data' is emitted 109 // immediately, or on a later tick. We set this to true at first, because 110 // any actions that shouldn't happen until "later" should generally also 111 // not happen before the first read call. 112 this.sync = true 113 114 // Whenever we return null, then we set a flag to say 115 // that we're awaiting a 'readable' event emission. 116 this.needReadable = false 117 this.emittedReadable = false 118 this.readableListening = false 119 this.resumeScheduled = false 120 this[kPaused] = null 121 122 // True if the error was already emitted and should not be thrown again. 123 this.errorEmitted = false 124 125 // Should close be emitted on destroy. Defaults to true. 126 this.emitClose = !options || options.emitClose !== false 127 128 // Should .destroy() be called after 'end' (and potentially 'finish'). 129 this.autoDestroy = !options || options.autoDestroy !== false 130 131 // Has it been destroyed. 132 this.destroyed = false 133 134 // Indicates whether the stream has errored. When true no further 135 // _read calls, 'data' or 'readable' events should occur. This is needed 136 // since when autoDestroy is disabled we need a way to tell whether the 137 // stream has failed. 138 this.errored = null 139 140 // Indicates whether the stream has finished destroying. 141 this.closed = false 142 143 // True if close has been emitted or would have been emitted 144 // depending on emitClose. 145 this.closeEmitted = false 146 147 // Crypto is kind of old and crusty. Historically, its default string 148 // encoding is 'binary' so we have to make this configurable. 149 // Everything else in the universe uses 'utf8', though. 150 this.defaultEncoding = (options && options.defaultEncoding) || 'utf8' 151 152 // Ref the piped dest which we need a drain event on it 153 // type: null | Writable | Set<Writable>. 154 this.awaitDrainWriters = null 155 this.multiAwaitDrain = false 156 157 // If true, a maybeReadMore has been scheduled. 158 this.readingMore = false 159 this.dataEmitted = false 160 this.decoder = null 161 this.encoding = null 162 if (options && options.encoding) { 163 this.decoder = new StringDecoder(options.encoding) 164 this.encoding = options.encoding 165 } 166} 167function Readable(options) { 168 if (!(this instanceof Readable)) return new Readable(options) 169 170 // Checking for a Stream.Duplex instance is faster here instead of inside 171 // the ReadableState constructor, at least with V8 6.5. 172 const isDuplex = this instanceof require('./duplex') 173 this._readableState = new ReadableState(options, this, isDuplex) 174 if (options) { 175 if (typeof options.read === 'function') this._read = options.read 176 if (typeof options.destroy === 'function') this._destroy = options.destroy 177 if (typeof options.construct === 'function') this._construct = options.construct 178 if (options.signal && !isDuplex) addAbortSignal(options.signal, this) 179 } 180 Stream.call(this, options) 181 destroyImpl.construct(this, () => { 182 if (this._readableState.needReadable) { 183 maybeReadMore(this, this._readableState) 184 } 185 }) 186} 187Readable.prototype.destroy = destroyImpl.destroy 188Readable.prototype._undestroy = destroyImpl.undestroy 189Readable.prototype._destroy = function (err, cb) { 190 cb(err) 191} 192Readable.prototype[EE.captureRejectionSymbol] = function (err) { 193 this.destroy(err) 194} 195 196// Manually shove something into the read() buffer. 197// This returns true if the highWaterMark has not been hit yet, 198// similar to how Writable.write() returns true if you should 199// write() some more. 200Readable.prototype.push = function (chunk, encoding) { 201 return readableAddChunk(this, chunk, encoding, false) 202} 203 204// Unshift should *always* be something directly out of read(). 205Readable.prototype.unshift = function (chunk, encoding) { 206 return readableAddChunk(this, chunk, encoding, true) 207} 208function readableAddChunk(stream, chunk, encoding, addToFront) { 209 debug('readableAddChunk', chunk) 210 const state = stream._readableState 211 let err 212 if (!state.objectMode) { 213 if (typeof chunk === 'string') { 214 encoding = encoding || state.defaultEncoding 215 if (state.encoding !== encoding) { 216 if (addToFront && state.encoding) { 217 // When unshifting, if state.encoding is set, we have to save 218 // the string in the BufferList with the state encoding. 219 chunk = Buffer.from(chunk, encoding).toString(state.encoding) 220 } else { 221 chunk = Buffer.from(chunk, encoding) 222 encoding = '' 223 } 224 } 225 } else if (chunk instanceof Buffer) { 226 encoding = '' 227 } else if (Stream._isUint8Array(chunk)) { 228 chunk = Stream._uint8ArrayToBuffer(chunk) 229 encoding = '' 230 } else if (chunk != null) { 231 err = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer', 'Uint8Array'], chunk) 232 } 233 } 234 if (err) { 235 errorOrDestroy(stream, err) 236 } else if (chunk === null) { 237 state.reading = false 238 onEofChunk(stream, state) 239 } else if (state.objectMode || (chunk && chunk.length > 0)) { 240 if (addToFront) { 241 if (state.endEmitted) errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()) 242 else if (state.destroyed || state.errored) return false 243 else addChunk(stream, state, chunk, true) 244 } else if (state.ended) { 245 errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()) 246 } else if (state.destroyed || state.errored) { 247 return false 248 } else { 249 state.reading = false 250 if (state.decoder && !encoding) { 251 chunk = state.decoder.write(chunk) 252 if (state.objectMode || chunk.length !== 0) addChunk(stream, state, chunk, false) 253 else maybeReadMore(stream, state) 254 } else { 255 addChunk(stream, state, chunk, false) 256 } 257 } 258 } else if (!addToFront) { 259 state.reading = false 260 maybeReadMore(stream, state) 261 } 262 263 // We can push more data if we are below the highWaterMark. 264 // Also, if we have no data yet, we can stand some more bytes. 265 // This is to work around cases where hwm=0, such as the repl. 266 return !state.ended && (state.length < state.highWaterMark || state.length === 0) 267} 268function addChunk(stream, state, chunk, addToFront) { 269 if (state.flowing && state.length === 0 && !state.sync && stream.listenerCount('data') > 0) { 270 // Use the guard to avoid creating `Set()` repeatedly 271 // when we have multiple pipes. 272 if (state.multiAwaitDrain) { 273 state.awaitDrainWriters.clear() 274 } else { 275 state.awaitDrainWriters = null 276 } 277 state.dataEmitted = true 278 stream.emit('data', chunk) 279 } else { 280 // Update the buffer info. 281 state.length += state.objectMode ? 1 : chunk.length 282 if (addToFront) state.buffer.unshift(chunk) 283 else state.buffer.push(chunk) 284 if (state.needReadable) emitReadable(stream) 285 } 286 maybeReadMore(stream, state) 287} 288Readable.prototype.isPaused = function () { 289 const state = this._readableState 290 return state[kPaused] === true || state.flowing === false 291} 292 293// Backwards compatibility. 294Readable.prototype.setEncoding = function (enc) { 295 const decoder = new StringDecoder(enc) 296 this._readableState.decoder = decoder 297 // If setEncoding(null), decoder.encoding equals utf8. 298 this._readableState.encoding = this._readableState.decoder.encoding 299 const buffer = this._readableState.buffer 300 // Iterate over current buffer to convert already stored Buffers: 301 let content = '' 302 for (const data of buffer) { 303 content += decoder.write(data) 304 } 305 buffer.clear() 306 if (content !== '') buffer.push(content) 307 this._readableState.length = content.length 308 return this 309} 310 311// Don't raise the hwm > 1GB. 312const MAX_HWM = 0x40000000 313function computeNewHighWaterMark(n) { 314 if (n > MAX_HWM) { 315 throw new ERR_OUT_OF_RANGE('size', '<= 1GiB', n) 316 } else { 317 // Get the next highest power of 2 to prevent increasing hwm excessively in 318 // tiny amounts. 319 n-- 320 n |= n >>> 1 321 n |= n >>> 2 322 n |= n >>> 4 323 n |= n >>> 8 324 n |= n >>> 16 325 n++ 326 } 327 return n 328} 329 330// This function is designed to be inlinable, so please take care when making 331// changes to the function body. 332function howMuchToRead(n, state) { 333 if (n <= 0 || (state.length === 0 && state.ended)) return 0 334 if (state.objectMode) return 1 335 if (NumberIsNaN(n)) { 336 // Only flow one buffer at a time. 337 if (state.flowing && state.length) return state.buffer.first().length 338 return state.length 339 } 340 if (n <= state.length) return n 341 return state.ended ? state.length : 0 342} 343 344// You can override either this method, or the async _read(n) below. 345Readable.prototype.read = function (n) { 346 debug('read', n) 347 // Same as parseInt(undefined, 10), however V8 7.3 performance regressed 348 // in this scenario, so we are doing it manually. 349 if (n === undefined) { 350 n = NaN 351 } else if (!NumberIsInteger(n)) { 352 n = NumberParseInt(n, 10) 353 } 354 const state = this._readableState 355 const nOrig = n 356 357 // If we're asking for more than the current hwm, then raise the hwm. 358 if (n > state.highWaterMark) state.highWaterMark = computeNewHighWaterMark(n) 359 if (n !== 0) state.emittedReadable = false 360 361 // If we're doing read(0) to trigger a readable event, but we 362 // already have a bunch of data in the buffer, then just trigger 363 // the 'readable' event and move on. 364 if ( 365 n === 0 && 366 state.needReadable && 367 ((state.highWaterMark !== 0 ? state.length >= state.highWaterMark : state.length > 0) || state.ended) 368 ) { 369 debug('read: emitReadable', state.length, state.ended) 370 if (state.length === 0 && state.ended) endReadable(this) 371 else emitReadable(this) 372 return null 373 } 374 n = howMuchToRead(n, state) 375 376 // If we've ended, and we're now clear, then finish it up. 377 if (n === 0 && state.ended) { 378 if (state.length === 0) endReadable(this) 379 return null 380 } 381 382 // All the actual chunk generation logic needs to be 383 // *below* the call to _read. The reason is that in certain 384 // synthetic stream cases, such as passthrough streams, _read 385 // may be a completely synchronous operation which may change 386 // the state of the read buffer, providing enough data when 387 // before there was *not* enough. 388 // 389 // So, the steps are: 390 // 1. Figure out what the state of things will be after we do 391 // a read from the buffer. 392 // 393 // 2. If that resulting state will trigger a _read, then call _read. 394 // Note that this may be asynchronous, or synchronous. Yes, it is 395 // deeply ugly to write APIs this way, but that still doesn't mean 396 // that the Readable class should behave improperly, as streams are 397 // designed to be sync/async agnostic. 398 // Take note if the _read call is sync or async (ie, if the read call 399 // has returned yet), so that we know whether or not it's safe to emit 400 // 'readable' etc. 401 // 402 // 3. Actually pull the requested chunks out of the buffer and return. 403 404 // if we need a readable event, then we need to do some reading. 405 let doRead = state.needReadable 406 debug('need readable', doRead) 407 408 // If we currently have less than the highWaterMark, then also read some. 409 if (state.length === 0 || state.length - n < state.highWaterMark) { 410 doRead = true 411 debug('length less than watermark', doRead) 412 } 413 414 // However, if we've ended, then there's no point, if we're already 415 // reading, then it's unnecessary, if we're constructing we have to wait, 416 // and if we're destroyed or errored, then it's not allowed, 417 if (state.ended || state.reading || state.destroyed || state.errored || !state.constructed) { 418 doRead = false 419 debug('reading, ended or constructing', doRead) 420 } else if (doRead) { 421 debug('do read') 422 state.reading = true 423 state.sync = true 424 // If the length is currently zero, then we *need* a readable event. 425 if (state.length === 0) state.needReadable = true 426 427 // Call internal read method 428 try { 429 this._read(state.highWaterMark) 430 } catch (err) { 431 errorOrDestroy(this, err) 432 } 433 state.sync = false 434 // If _read pushed data synchronously, then `reading` will be false, 435 // and we need to re-evaluate how much data we can return to the user. 436 if (!state.reading) n = howMuchToRead(nOrig, state) 437 } 438 let ret 439 if (n > 0) ret = fromList(n, state) 440 else ret = null 441 if (ret === null) { 442 state.needReadable = state.length <= state.highWaterMark 443 n = 0 444 } else { 445 state.length -= n 446 if (state.multiAwaitDrain) { 447 state.awaitDrainWriters.clear() 448 } else { 449 state.awaitDrainWriters = null 450 } 451 } 452 if (state.length === 0) { 453 // If we have nothing in the buffer, then we want to know 454 // as soon as we *do* get something into the buffer. 455 if (!state.ended) state.needReadable = true 456 457 // If we tried to read() past the EOF, then emit end on the next tick. 458 if (nOrig !== n && state.ended) endReadable(this) 459 } 460 if (ret !== null && !state.errorEmitted && !state.closeEmitted) { 461 state.dataEmitted = true 462 this.emit('data', ret) 463 } 464 return ret 465} 466function onEofChunk(stream, state) { 467 debug('onEofChunk') 468 if (state.ended) return 469 if (state.decoder) { 470 const chunk = state.decoder.end() 471 if (chunk && chunk.length) { 472 state.buffer.push(chunk) 473 state.length += state.objectMode ? 1 : chunk.length 474 } 475 } 476 state.ended = true 477 if (state.sync) { 478 // If we are sync, wait until next tick to emit the data. 479 // Otherwise we risk emitting data in the flow() 480 // the readable code triggers during a read() call. 481 emitReadable(stream) 482 } else { 483 // Emit 'readable' now to make sure it gets picked up. 484 state.needReadable = false 485 state.emittedReadable = true 486 // We have to emit readable now that we are EOF. Modules 487 // in the ecosystem (e.g. dicer) rely on this event being sync. 488 emitReadable_(stream) 489 } 490} 491 492// Don't emit readable right away in sync mode, because this can trigger 493// another read() call => stack overflow. This way, it might trigger 494// a nextTick recursion warning, but that's not so bad. 495function emitReadable(stream) { 496 const state = stream._readableState 497 debug('emitReadable', state.needReadable, state.emittedReadable) 498 state.needReadable = false 499 if (!state.emittedReadable) { 500 debug('emitReadable', state.flowing) 501 state.emittedReadable = true 502 process.nextTick(emitReadable_, stream) 503 } 504} 505function emitReadable_(stream) { 506 const state = stream._readableState 507 debug('emitReadable_', state.destroyed, state.length, state.ended) 508 if (!state.destroyed && !state.errored && (state.length || state.ended)) { 509 stream.emit('readable') 510 state.emittedReadable = false 511 } 512 513 // The stream needs another readable event if: 514 // 1. It is not flowing, as the flow mechanism will take 515 // care of it. 516 // 2. It is not ended. 517 // 3. It is below the highWaterMark, so we can schedule 518 // another readable later. 519 state.needReadable = !state.flowing && !state.ended && state.length <= state.highWaterMark 520 flow(stream) 521} 522 523// At this point, the user has presumably seen the 'readable' event, 524// and called read() to consume some data. that may have triggered 525// in turn another _read(n) call, in which case reading = true if 526// it's in progress. 527// However, if we're not ended, or reading, and the length < hwm, 528// then go ahead and try to read some more preemptively. 529function maybeReadMore(stream, state) { 530 if (!state.readingMore && state.constructed) { 531 state.readingMore = true 532 process.nextTick(maybeReadMore_, stream, state) 533 } 534} 535function maybeReadMore_(stream, state) { 536 // Attempt to read more data if we should. 537 // 538 // The conditions for reading more data are (one of): 539 // - Not enough data buffered (state.length < state.highWaterMark). The loop 540 // is responsible for filling the buffer with enough data if such data 541 // is available. If highWaterMark is 0 and we are not in the flowing mode 542 // we should _not_ attempt to buffer any extra data. We'll get more data 543 // when the stream consumer calls read() instead. 544 // - No data in the buffer, and the stream is in flowing mode. In this mode 545 // the loop below is responsible for ensuring read() is called. Failing to 546 // call read here would abort the flow and there's no other mechanism for 547 // continuing the flow if the stream consumer has just subscribed to the 548 // 'data' event. 549 // 550 // In addition to the above conditions to keep reading data, the following 551 // conditions prevent the data from being read: 552 // - The stream has ended (state.ended). 553 // - There is already a pending 'read' operation (state.reading). This is a 554 // case where the stream has called the implementation defined _read() 555 // method, but they are processing the call asynchronously and have _not_ 556 // called push() with new data. In this case we skip performing more 557 // read()s. The execution ends in this method again after the _read() ends 558 // up calling push() with more data. 559 while ( 560 !state.reading && 561 !state.ended && 562 (state.length < state.highWaterMark || (state.flowing && state.length === 0)) 563 ) { 564 const len = state.length 565 debug('maybeReadMore read 0') 566 stream.read(0) 567 if (len === state.length) 568 // Didn't get any data, stop spinning. 569 break 570 } 571 state.readingMore = false 572} 573 574// Abstract method. to be overridden in specific implementation classes. 575// call cb(er, data) where data is <= n in length. 576// for virtual (non-string, non-buffer) streams, "length" is somewhat 577// arbitrary, and perhaps not very meaningful. 578Readable.prototype._read = function (n) { 579 throw new ERR_METHOD_NOT_IMPLEMENTED('_read()') 580} 581Readable.prototype.pipe = function (dest, pipeOpts) { 582 const src = this 583 const state = this._readableState 584 if (state.pipes.length === 1) { 585 if (!state.multiAwaitDrain) { 586 state.multiAwaitDrain = true 587 state.awaitDrainWriters = new SafeSet(state.awaitDrainWriters ? [state.awaitDrainWriters] : []) 588 } 589 } 590 state.pipes.push(dest) 591 debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts) 592 const doEnd = (!pipeOpts || pipeOpts.end !== false) && dest !== process.stdout && dest !== process.stderr 593 const endFn = doEnd ? onend : unpipe 594 if (state.endEmitted) process.nextTick(endFn) 595 else src.once('end', endFn) 596 dest.on('unpipe', onunpipe) 597 function onunpipe(readable, unpipeInfo) { 598 debug('onunpipe') 599 if (readable === src) { 600 if (unpipeInfo && unpipeInfo.hasUnpiped === false) { 601 unpipeInfo.hasUnpiped = true 602 cleanup() 603 } 604 } 605 } 606 function onend() { 607 debug('onend') 608 dest.end() 609 } 610 let ondrain 611 let cleanedUp = false 612 function cleanup() { 613 debug('cleanup') 614 // Cleanup event handlers once the pipe is broken. 615 dest.removeListener('close', onclose) 616 dest.removeListener('finish', onfinish) 617 if (ondrain) { 618 dest.removeListener('drain', ondrain) 619 } 620 dest.removeListener('error', onerror) 621 dest.removeListener('unpipe', onunpipe) 622 src.removeListener('end', onend) 623 src.removeListener('end', unpipe) 624 src.removeListener('data', ondata) 625 cleanedUp = true 626 627 // If the reader is waiting for a drain event from this 628 // specific writer, then it would cause it to never start 629 // flowing again. 630 // So, if this is awaiting a drain, then we just call it now. 631 // If we don't know, then assume that we are waiting for one. 632 if (ondrain && state.awaitDrainWriters && (!dest._writableState || dest._writableState.needDrain)) ondrain() 633 } 634 function pause() { 635 // If the user unpiped during `dest.write()`, it is possible 636 // to get stuck in a permanently paused state if that write 637 // also returned false. 638 // => Check whether `dest` is still a piping destination. 639 if (!cleanedUp) { 640 if (state.pipes.length === 1 && state.pipes[0] === dest) { 641 debug('false write response, pause', 0) 642 state.awaitDrainWriters = dest 643 state.multiAwaitDrain = false 644 } else if (state.pipes.length > 1 && state.pipes.includes(dest)) { 645 debug('false write response, pause', state.awaitDrainWriters.size) 646 state.awaitDrainWriters.add(dest) 647 } 648 src.pause() 649 } 650 if (!ondrain) { 651 // When the dest drains, it reduces the awaitDrain counter 652 // on the source. This would be more elegant with a .once() 653 // handler in flow(), but adding and removing repeatedly is 654 // too slow. 655 ondrain = pipeOnDrain(src, dest) 656 dest.on('drain', ondrain) 657 } 658 } 659 src.on('data', ondata) 660 function ondata(chunk) { 661 debug('ondata') 662 const ret = dest.write(chunk) 663 debug('dest.write', ret) 664 if (ret === false) { 665 pause() 666 } 667 } 668 669 // If the dest has an error, then stop piping into it. 670 // However, don't suppress the throwing behavior for this. 671 function onerror(er) { 672 debug('onerror', er) 673 unpipe() 674 dest.removeListener('error', onerror) 675 if (dest.listenerCount('error') === 0) { 676 const s = dest._writableState || dest._readableState 677 if (s && !s.errorEmitted) { 678 // User incorrectly emitted 'error' directly on the stream. 679 errorOrDestroy(dest, er) 680 } else { 681 dest.emit('error', er) 682 } 683 } 684 } 685 686 // Make sure our error handler is attached before userland ones. 687 prependListener(dest, 'error', onerror) 688 689 // Both close and finish should trigger unpipe, but only once. 690 function onclose() { 691 dest.removeListener('finish', onfinish) 692 unpipe() 693 } 694 dest.once('close', onclose) 695 function onfinish() { 696 debug('onfinish') 697 dest.removeListener('close', onclose) 698 unpipe() 699 } 700 dest.once('finish', onfinish) 701 function unpipe() { 702 debug('unpipe') 703 src.unpipe(dest) 704 } 705 706 // Tell the dest that it's being piped to. 707 dest.emit('pipe', src) 708 709 // Start the flow if it hasn't been started already. 710 711 if (dest.writableNeedDrain === true) { 712 if (state.flowing) { 713 pause() 714 } 715 } else if (!state.flowing) { 716 debug('pipe resume') 717 src.resume() 718 } 719 return dest 720} 721function pipeOnDrain(src, dest) { 722 return function pipeOnDrainFunctionResult() { 723 const state = src._readableState 724 725 // `ondrain` will call directly, 726 // `this` maybe not a reference to dest, 727 // so we use the real dest here. 728 if (state.awaitDrainWriters === dest) { 729 debug('pipeOnDrain', 1) 730 state.awaitDrainWriters = null 731 } else if (state.multiAwaitDrain) { 732 debug('pipeOnDrain', state.awaitDrainWriters.size) 733 state.awaitDrainWriters.delete(dest) 734 } 735 if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) && src.listenerCount('data')) { 736 src.resume() 737 } 738 } 739} 740Readable.prototype.unpipe = function (dest) { 741 const state = this._readableState 742 const unpipeInfo = { 743 hasUnpiped: false 744 } 745 746 // If we're not piping anywhere, then do nothing. 747 if (state.pipes.length === 0) return this 748 if (!dest) { 749 // remove all. 750 const dests = state.pipes 751 state.pipes = [] 752 this.pause() 753 for (let i = 0; i < dests.length; i++) 754 dests[i].emit('unpipe', this, { 755 hasUnpiped: false 756 }) 757 return this 758 } 759 760 // Try to find the right one. 761 const index = ArrayPrototypeIndexOf(state.pipes, dest) 762 if (index === -1) return this 763 state.pipes.splice(index, 1) 764 if (state.pipes.length === 0) this.pause() 765 dest.emit('unpipe', this, unpipeInfo) 766 return this 767} 768 769// Set up data events if they are asked for 770// Ensure readable listeners eventually get something. 771Readable.prototype.on = function (ev, fn) { 772 const res = Stream.prototype.on.call(this, ev, fn) 773 const state = this._readableState 774 if (ev === 'data') { 775 // Update readableListening so that resume() may be a no-op 776 // a few lines down. This is needed to support once('readable'). 777 state.readableListening = this.listenerCount('readable') > 0 778 779 // Try start flowing on next tick if stream isn't explicitly paused. 780 if (state.flowing !== false) this.resume() 781 } else if (ev === 'readable') { 782 if (!state.endEmitted && !state.readableListening) { 783 state.readableListening = state.needReadable = true 784 state.flowing = false 785 state.emittedReadable = false 786 debug('on readable', state.length, state.reading) 787 if (state.length) { 788 emitReadable(this) 789 } else if (!state.reading) { 790 process.nextTick(nReadingNextTick, this) 791 } 792 } 793 } 794 return res 795} 796Readable.prototype.addListener = Readable.prototype.on 797Readable.prototype.removeListener = function (ev, fn) { 798 const res = Stream.prototype.removeListener.call(this, ev, fn) 799 if (ev === 'readable') { 800 // We need to check if there is someone still listening to 801 // readable and reset the state. However this needs to happen 802 // after readable has been emitted but before I/O (nextTick) to 803 // support once('readable', fn) cycles. This means that calling 804 // resume within the same tick will have no 805 // effect. 806 process.nextTick(updateReadableListening, this) 807 } 808 return res 809} 810Readable.prototype.off = Readable.prototype.removeListener 811Readable.prototype.removeAllListeners = function (ev) { 812 const res = Stream.prototype.removeAllListeners.apply(this, arguments) 813 if (ev === 'readable' || ev === undefined) { 814 // We need to check if there is someone still listening to 815 // readable and reset the state. However this needs to happen 816 // after readable has been emitted but before I/O (nextTick) to 817 // support once('readable', fn) cycles. This means that calling 818 // resume within the same tick will have no 819 // effect. 820 process.nextTick(updateReadableListening, this) 821 } 822 return res 823} 824function updateReadableListening(self) { 825 const state = self._readableState 826 state.readableListening = self.listenerCount('readable') > 0 827 if (state.resumeScheduled && state[kPaused] === false) { 828 // Flowing needs to be set to true now, otherwise 829 // the upcoming resume will not flow. 830 state.flowing = true 831 832 // Crude way to check if we should resume. 833 } else if (self.listenerCount('data') > 0) { 834 self.resume() 835 } else if (!state.readableListening) { 836 state.flowing = null 837 } 838} 839function nReadingNextTick(self) { 840 debug('readable nexttick read 0') 841 self.read(0) 842} 843 844// pause() and resume() are remnants of the legacy readable stream API 845// If the user uses them, then switch into old mode. 846Readable.prototype.resume = function () { 847 const state = this._readableState 848 if (!state.flowing) { 849 debug('resume') 850 // We flow only if there is no one listening 851 // for readable, but we still have to call 852 // resume(). 853 state.flowing = !state.readableListening 854 resume(this, state) 855 } 856 state[kPaused] = false 857 return this 858} 859function resume(stream, state) { 860 if (!state.resumeScheduled) { 861 state.resumeScheduled = true 862 process.nextTick(resume_, stream, state) 863 } 864} 865function resume_(stream, state) { 866 debug('resume', state.reading) 867 if (!state.reading) { 868 stream.read(0) 869 } 870 state.resumeScheduled = false 871 stream.emit('resume') 872 flow(stream) 873 if (state.flowing && !state.reading) stream.read(0) 874} 875Readable.prototype.pause = function () { 876 debug('call pause flowing=%j', this._readableState.flowing) 877 if (this._readableState.flowing !== false) { 878 debug('pause') 879 this._readableState.flowing = false 880 this.emit('pause') 881 } 882 this._readableState[kPaused] = true 883 return this 884} 885function flow(stream) { 886 const state = stream._readableState 887 debug('flow', state.flowing) 888 while (state.flowing && stream.read() !== null); 889} 890 891// Wrap an old-style stream as the async data source. 892// This is *not* part of the readable stream interface. 893// It is an ugly unfortunate mess of history. 894Readable.prototype.wrap = function (stream) { 895 let paused = false 896 897 // TODO (ronag): Should this.destroy(err) emit 898 // 'error' on the wrapped stream? Would require 899 // a static factory method, e.g. Readable.wrap(stream). 900 901 stream.on('data', (chunk) => { 902 if (!this.push(chunk) && stream.pause) { 903 paused = true 904 stream.pause() 905 } 906 }) 907 stream.on('end', () => { 908 this.push(null) 909 }) 910 stream.on('error', (err) => { 911 errorOrDestroy(this, err) 912 }) 913 stream.on('close', () => { 914 this.destroy() 915 }) 916 stream.on('destroy', () => { 917 this.destroy() 918 }) 919 this._read = () => { 920 if (paused && stream.resume) { 921 paused = false 922 stream.resume() 923 } 924 } 925 926 // Proxy all the other methods. Important when wrapping filters and duplexes. 927 const streamKeys = ObjectKeys(stream) 928 for (let j = 1; j < streamKeys.length; j++) { 929 const i = streamKeys[j] 930 if (this[i] === undefined && typeof stream[i] === 'function') { 931 this[i] = stream[i].bind(stream) 932 } 933 } 934 return this 935} 936Readable.prototype[SymbolAsyncIterator] = function () { 937 return streamToAsyncIterator(this) 938} 939Readable.prototype.iterator = function (options) { 940 if (options !== undefined) { 941 validateObject(options, 'options') 942 } 943 return streamToAsyncIterator(this, options) 944} 945function streamToAsyncIterator(stream, options) { 946 if (typeof stream.read !== 'function') { 947 stream = Readable.wrap(stream, { 948 objectMode: true 949 }) 950 } 951 const iter = createAsyncIterator(stream, options) 952 iter.stream = stream 953 return iter 954} 955async function* createAsyncIterator(stream, options) { 956 let callback = nop 957 function next(resolve) { 958 if (this === stream) { 959 callback() 960 callback = nop 961 } else { 962 callback = resolve 963 } 964 } 965 stream.on('readable', next) 966 let error 967 const cleanup = eos( 968 stream, 969 { 970 writable: false 971 }, 972 (err) => { 973 error = err ? aggregateTwoErrors(error, err) : null 974 callback() 975 callback = nop 976 } 977 ) 978 try { 979 while (true) { 980 const chunk = stream.destroyed ? null : stream.read() 981 if (chunk !== null) { 982 yield chunk 983 } else if (error) { 984 throw error 985 } else if (error === null) { 986 return 987 } else { 988 await new Promise(next) 989 } 990 } 991 } catch (err) { 992 error = aggregateTwoErrors(error, err) 993 throw error 994 } finally { 995 if ( 996 (error || (options === null || options === undefined ? undefined : options.destroyOnReturn) !== false) && 997 (error === undefined || stream._readableState.autoDestroy) 998 ) { 999 destroyImpl.destroyer(stream, null) 1000 } else { 1001 stream.off('readable', next) 1002 cleanup() 1003 } 1004 } 1005} 1006 1007// Making it explicit these properties are not enumerable 1008// because otherwise some prototype manipulation in 1009// userland will fail. 1010ObjectDefineProperties(Readable.prototype, { 1011 readable: { 1012 __proto__: null, 1013 get() { 1014 const r = this._readableState 1015 // r.readable === false means that this is part of a Duplex stream 1016 // where the readable side was disabled upon construction. 1017 // Compat. The user might manually disable readable side through 1018 // deprecated setter. 1019 return !!r && r.readable !== false && !r.destroyed && !r.errorEmitted && !r.endEmitted 1020 }, 1021 set(val) { 1022 // Backwards compat. 1023 if (this._readableState) { 1024 this._readableState.readable = !!val 1025 } 1026 } 1027 }, 1028 readableDidRead: { 1029 __proto__: null, 1030 enumerable: false, 1031 get: function () { 1032 return this._readableState.dataEmitted 1033 } 1034 }, 1035 readableAborted: { 1036 __proto__: null, 1037 enumerable: false, 1038 get: function () { 1039 return !!( 1040 this._readableState.readable !== false && 1041 (this._readableState.destroyed || this._readableState.errored) && 1042 !this._readableState.endEmitted 1043 ) 1044 } 1045 }, 1046 readableHighWaterMark: { 1047 __proto__: null, 1048 enumerable: false, 1049 get: function () { 1050 return this._readableState.highWaterMark 1051 } 1052 }, 1053 readableBuffer: { 1054 __proto__: null, 1055 enumerable: false, 1056 get: function () { 1057 return this._readableState && this._readableState.buffer 1058 } 1059 }, 1060 readableFlowing: { 1061 __proto__: null, 1062 enumerable: false, 1063 get: function () { 1064 return this._readableState.flowing 1065 }, 1066 set: function (state) { 1067 if (this._readableState) { 1068 this._readableState.flowing = state 1069 } 1070 } 1071 }, 1072 readableLength: { 1073 __proto__: null, 1074 enumerable: false, 1075 get() { 1076 return this._readableState.length 1077 } 1078 }, 1079 readableObjectMode: { 1080 __proto__: null, 1081 enumerable: false, 1082 get() { 1083 return this._readableState ? this._readableState.objectMode : false 1084 } 1085 }, 1086 readableEncoding: { 1087 __proto__: null, 1088 enumerable: false, 1089 get() { 1090 return this._readableState ? this._readableState.encoding : null 1091 } 1092 }, 1093 errored: { 1094 __proto__: null, 1095 enumerable: false, 1096 get() { 1097 return this._readableState ? this._readableState.errored : null 1098 } 1099 }, 1100 closed: { 1101 __proto__: null, 1102 get() { 1103 return this._readableState ? this._readableState.closed : false 1104 } 1105 }, 1106 destroyed: { 1107 __proto__: null, 1108 enumerable: false, 1109 get() { 1110 return this._readableState ? this._readableState.destroyed : false 1111 }, 1112 set(value) { 1113 // We ignore the value if the stream 1114 // has not been initialized yet. 1115 if (!this._readableState) { 1116 return 1117 } 1118 1119 // Backward compatibility, the user is explicitly 1120 // managing destroyed. 1121 this._readableState.destroyed = value 1122 } 1123 }, 1124 readableEnded: { 1125 __proto__: null, 1126 enumerable: false, 1127 get() { 1128 return this._readableState ? this._readableState.endEmitted : false 1129 } 1130 } 1131}) 1132ObjectDefineProperties(ReadableState.prototype, { 1133 // Legacy getter for `pipesCount`. 1134 pipesCount: { 1135 __proto__: null, 1136 get() { 1137 return this.pipes.length 1138 } 1139 }, 1140 // Legacy property for `paused`. 1141 paused: { 1142 __proto__: null, 1143 get() { 1144 return this[kPaused] !== false 1145 }, 1146 set(value) { 1147 this[kPaused] = !!value 1148 } 1149 } 1150}) 1151 1152// Exposed for testing purposes only. 1153Readable._fromList = fromList 1154 1155// Pluck off n bytes from an array of buffers. 1156// Length is the combined lengths of all the buffers in the list. 1157// This function is designed to be inlinable, so please take care when making 1158// changes to the function body. 1159function fromList(n, state) { 1160 // nothing buffered. 1161 if (state.length === 0) return null 1162 let ret 1163 if (state.objectMode) ret = state.buffer.shift() 1164 else if (!n || n >= state.length) { 1165 // Read it all, truncate the list. 1166 if (state.decoder) ret = state.buffer.join('') 1167 else if (state.buffer.length === 1) ret = state.buffer.first() 1168 else ret = state.buffer.concat(state.length) 1169 state.buffer.clear() 1170 } else { 1171 // read part of list. 1172 ret = state.buffer.consume(n, state.decoder) 1173 } 1174 return ret 1175} 1176function endReadable(stream) { 1177 const state = stream._readableState 1178 debug('endReadable', state.endEmitted) 1179 if (!state.endEmitted) { 1180 state.ended = true 1181 process.nextTick(endReadableNT, state, stream) 1182 } 1183} 1184function endReadableNT(state, stream) { 1185 debug('endReadableNT', state.endEmitted, state.length) 1186 1187 // Check that we didn't get one last unshift. 1188 if (!state.errored && !state.closeEmitted && !state.endEmitted && state.length === 0) { 1189 state.endEmitted = true 1190 stream.emit('end') 1191 if (stream.writable && stream.allowHalfOpen === false) { 1192 process.nextTick(endWritableNT, stream) 1193 } else if (state.autoDestroy) { 1194 // In case of duplex streams we need a way to detect 1195 // if the writable side is ready for autoDestroy as well. 1196 const wState = stream._writableState 1197 const autoDestroy = 1198 !wState || 1199 (wState.autoDestroy && 1200 // We don't expect the writable to ever 'finish' 1201 // if writable is explicitly set to false. 1202 (wState.finished || wState.writable === false)) 1203 if (autoDestroy) { 1204 stream.destroy() 1205 } 1206 } 1207 } 1208} 1209function endWritableNT(stream) { 1210 const writable = stream.writable && !stream.writableEnded && !stream.destroyed 1211 if (writable) { 1212 stream.end() 1213 } 1214} 1215Readable.from = function (iterable, opts) { 1216 return from(Readable, iterable, opts) 1217} 1218let webStreamsAdapters 1219 1220// Lazy to avoid circular references 1221function lazyWebStreams() { 1222 if (webStreamsAdapters === undefined) webStreamsAdapters = {} 1223 return webStreamsAdapters 1224} 1225Readable.fromWeb = function (readableStream, options) { 1226 return lazyWebStreams().newStreamReadableFromReadableStream(readableStream, options) 1227} 1228Readable.toWeb = function (streamReadable, options) { 1229 return lazyWebStreams().newReadableStreamFromStreamReadable(streamReadable, options) 1230} 1231Readable.wrap = function (src, options) { 1232 var _ref, _src$readableObjectMo 1233 return new Readable({ 1234 objectMode: 1235 (_ref = 1236 (_src$readableObjectMo = src.readableObjectMode) !== null && _src$readableObjectMo !== undefined 1237 ? _src$readableObjectMo 1238 : src.objectMode) !== null && _ref !== undefined 1239 ? _ref 1240 : true, 1241 ...options, 1242 destroy(err, callback) { 1243 destroyImpl.destroyer(src, err) 1244 callback(err) 1245 } 1246 }).wrap(src) 1247} 1248