// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.

'use strict';

const {
  ArrayPrototypeIndexOf,
  NumberIsInteger,
  NumberIsNaN,
  NumberParseInt,
  ObjectDefineProperties,
  ObjectKeys,
  ObjectSetPrototypeOf,
  Promise,
  SafeSet,
  SymbolAsyncDispose,
  SymbolAsyncIterator,
  Symbol,
} = primordials;

module.exports = Readable;
Readable.ReadableState = ReadableState;

const EE = require('events');
const { Stream, prependListener } = require('internal/streams/legacy');
const { Buffer } = require('buffer');

const {
  addAbortSignal,
} = require('internal/streams/add-abort-signal');
const eos = require('internal/streams/end-of-stream');

let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
  debug = fn;
});
const BufferList = require('internal/streams/buffer_list');
const destroyImpl = require('internal/streams/destroy');
const {
  getHighWaterMark,
  getDefaultHighWaterMark,
} = require('internal/streams/state');

const {
  aggregateTwoErrors,
  codes: {
    ERR_INVALID_ARG_TYPE,
    ERR_METHOD_NOT_IMPLEMENTED,
    ERR_OUT_OF_RANGE,
    ERR_STREAM_PUSH_AFTER_EOF,
    ERR_STREAM_UNSHIFT_AFTER_END_EVENT,
  },
  AbortError,
} = require('internal/errors');
const { validateObject } = require('internal/validators');

const kPaused = Symbol('kPaused');

const { StringDecoder } = require('string_decoder');
const from = require('internal/streams/from');

ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);
const nop = () => {};

const { errorOrDestroy } = destroyImpl;

const kObjectMode = 1 << 0;
const kEnded = 1 << 1;
const kEndEmitted = 1 << 2;
const kReading = 1 << 3;
const kConstructed = 1 << 4;
const kSync = 1 << 5;
const kNeedReadable = 1 << 6;
const kEmittedReadable = 1 << 7;
const kReadableListening = 1 << 8;
const kResumeScheduled = 1 << 9;
const kErrorEmitted = 1 << 10;
const kEmitClose = 1 << 11;
const kAutoDestroy = 1 << 12;
const kDestroyed = 1 << 13;
const kClosed = 1 << 14;
const kCloseEmitted = 1 << 15;
const kMultiAwaitDrain = 1 << 16;
const kReadingMore = 1 << 17;
const kDataEmitted = 1 << 18;

// TODO(benjamingr) it is likely slower to do it this way than with free functions
function makeBitMapDescriptor(bit) {
  return {
    enumerable: false,
    get() { return (this.state & bit) !== 0; },
    set(value) {
      if (value) this.state |= bit;
      else this.state &= ~bit;
    },
  };
}
ObjectDefineProperties(ReadableState.prototype, {
  objectMode: makeBitMapDescriptor(kObjectMode),
  ended: makeBitMapDescriptor(kEnded),
  endEmitted: makeBitMapDescriptor(kEndEmitted),
  reading: makeBitMapDescriptor(kReading),
  // Stream is still being constructed and cannot be
  // destroyed until construction finished or failed.
  // Async construction is opt in, therefore we start as
  // constructed.
  constructed: makeBitMapDescriptor(kConstructed),
  // A flag to be able to tell if the event 'readable'/'data' is emitted
  // immediately, or on a later tick.  We set this to true at first, because
  // any actions that shouldn't happen until "later" should generally also
  // not happen before the first read call.
  sync: makeBitMapDescriptor(kSync),
  // Whenever we return null, then we set a flag to say
  // that we're awaiting a 'readable' event emission.
  needReadable: makeBitMapDescriptor(kNeedReadable),
  emittedReadable: makeBitMapDescriptor(kEmittedReadable),
  readableListening: makeBitMapDescriptor(kReadableListening),
  resumeScheduled: makeBitMapDescriptor(kResumeScheduled),
  // True if the error was already emitted and should not be thrown again.
  errorEmitted: makeBitMapDescriptor(kErrorEmitted),
  emitClose: makeBitMapDescriptor(kEmitClose),
  autoDestroy: makeBitMapDescriptor(kAutoDestroy),
  // Has it been destroyed.
  destroyed: makeBitMapDescriptor(kDestroyed),
  // Indicates whether the stream has finished destroying.
  closed: makeBitMapDescriptor(kClosed),
  // True if close has been emitted or would have been emitted
  // depending on emitClose.
  closeEmitted: makeBitMapDescriptor(kCloseEmitted),
  multiAwaitDrain: makeBitMapDescriptor(kMultiAwaitDrain),
  // If true, a maybeReadMore has been scheduled.
  readingMore: makeBitMapDescriptor(kReadingMore),
  dataEmitted: makeBitMapDescriptor(kDataEmitted),
});
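
// Illustrative sketch (kept in a comment, not executed): each accessor defined
// above maps a boolean onto one bit of the shared `state` integer, so for a
// hypothetical ReadableState instance `rs`:
//
//   rs.reading = true;   // equivalent to: rs.state |= kReading;
//   rs.reading;          // equivalent to: (rs.state & kReading) !== 0;
//   rs.reading = false;  // equivalent to: rs.state &= ~kReading;
//
// This keeps all of the boolean flags in a single V8 slot instead of one slot
// per field.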

function ReadableState(options, stream, isDuplex) {
  // Duplex streams are both readable and writable, but share
  // the same options object.
  // However, some cases require setting options to different
  // values for the readable and the writable sides of the duplex stream.
  // These options can be provided separately as readableXXX and writableXXX.
  if (typeof isDuplex !== 'boolean')
    isDuplex = stream instanceof Stream.Duplex;

  // Bit map field to store ReadableState more efficiently with 1 bit per field
  // instead of a V8 slot per field.
  this.state = kEmitClose | kAutoDestroy | kConstructed | kSync;
  // Object stream flag. Used to make read(n) ignore n and to
  // make all the buffer merging and length checks go away.
  if (options && options.objectMode) this.state |= kObjectMode;

  if (isDuplex && options && options.readableObjectMode)
    this.state |= kObjectMode;

  // The point at which it stops calling _read() to fill the buffer.
  // Note: 0 is a valid value, meaning "don't call _read preemptively ever".
  this.highWaterMark = options ?
    getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex) :
    getDefaultHighWaterMark(false);

  // A linked list is used to store data chunks instead of an array because the
  // linked list can remove elements from the beginning faster than
  // array.shift().
  this.buffer = new BufferList();
  this.length = 0;
  this.pipes = [];
  this.flowing = null;

  this[kPaused] = null;

  // Should close be emitted on destroy. Defaults to true.
  if (options && options.emitClose === false) this.state &= ~kEmitClose;

  // Should .destroy() be called after 'end' (and potentially 'finish').
  if (options && options.autoDestroy === false) this.state &= ~kAutoDestroy;


  // Indicates whether the stream has errored. When true no further
  // _read calls, 'data' or 'readable' events should occur. This is needed
  // since when autoDestroy is disabled we need a way to tell whether the
  // stream has failed.
  this.errored = null;


  // Crypto is kind of old and crusty.  Historically, its default string
  // encoding is 'binary' so we have to make this configurable.
  // Everything else in the universe uses 'utf8', though.
  this.defaultEncoding = (options && options.defaultEncoding) || 'utf8';

  // Ref the piped dest(s) for which we are waiting for a 'drain' event.
  // type: null | Writable | Set<Writable>.
  this.awaitDrainWriters = null;

  this.decoder = null;
  this.encoding = null;
  if (options && options.encoding) {
    this.decoder = new StringDecoder(options.encoding);
    this.encoding = options.encoding;
  }
}


function Readable(options) {
  if (!(this instanceof Readable))
    return new Readable(options);

  // Checking for a Stream.Duplex instance is faster here than inside
  // the ReadableState constructor, at least with V8 6.5.
  const isDuplex = this instanceof Stream.Duplex;

  this._readableState = new ReadableState(options, this, isDuplex);

  if (options) {
    if (typeof options.read === 'function')
      this._read = options.read;

    if (typeof options.destroy === 'function')
      this._destroy = options.destroy;

    if (typeof options.construct === 'function')
      this._construct = options.construct;

    if (options.signal && !isDuplex)
      addAbortSignal(options.signal, this);
  }

  Stream.call(this, options);

  destroyImpl.construct(this, () => {
    if (this._readableState.needReadable) {
      maybeReadMore(this, this._readableState);
    }
  });
}

Readable.prototype.destroy = destroyImpl.destroy;
Readable.prototype._undestroy = destroyImpl.undestroy;
Readable.prototype._destroy = function(err, cb) {
  cb(err);
};

Readable.prototype[EE.captureRejectionSymbol] = function(err) {
  this.destroy(err);
};

Readable.prototype[SymbolAsyncDispose] = function() {
  let error;
  if (!this.destroyed) {
    error = this.readableEnded ? null : new AbortError();
    this.destroy(error);
  }
  return new Promise((resolve, reject) => eos(this, (err) => (err && err !== error ? reject(err) : resolve(null))));
};
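
// Illustrative sketch (kept in a comment, not executed): the async dispose hook
// above lets a readable participate in explicit resource management. The block
// below only shows the shape of the API:
//
//   const { Readable } = require('stream');
//   {
//     await using readable = Readable.from(['a', 'b']);
//     // If the block exits before the stream has ended, the stream is
//     // destroyed with an AbortError; eos() settles the returned promise.
//   }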

// Manually shove something into the read() buffer.
// This returns true if the highWaterMark has not been hit yet,
// similar to how Writable.write() returns true if you should
// write() some more.
Readable.prototype.push = function(chunk, encoding) {
  return readableAddChunk(this, chunk, encoding, false);
};

// Unshift should *always* be something directly out of read().
Readable.prototype.unshift = function(chunk, encoding) {
  return readableAddChunk(this, chunk, encoding, true);
};
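
// Illustrative sketch (kept in a comment, not executed): a minimal custom
// Readable that respects push()'s backpressure signal. `fetchNextChunk` is a
// hypothetical synchronous source that returns null when exhausted:
//
//   const { Readable } = require('stream');
//   const r = new Readable({
//     read() {
//       let chunk;
//       // Keep pushing until the internal buffer is full (push() returns
//       // false) or the source signals EOF (pushing null ends the stream).
//       do {
//         chunk = fetchNextChunk();
//       } while (this.push(chunk) && chunk !== null);
//     },
//   });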

function readableAddChunk(stream, chunk, encoding, addToFront) {
  debug('readableAddChunk', chunk);
  const state = stream._readableState;

  let err;
  if ((state.state & kObjectMode) === 0) {
    if (typeof chunk === 'string') {
      encoding = encoding || state.defaultEncoding;
      if (state.encoding !== encoding) {
        if (addToFront && state.encoding) {
          // When unshifting, if state.encoding is set, we have to save
          // the string in the BufferList with the state encoding.
          chunk = Buffer.from(chunk, encoding).toString(state.encoding);
        } else {
          chunk = Buffer.from(chunk, encoding);
          encoding = '';
        }
      }
    } else if (chunk instanceof Buffer) {
      encoding = '';
    } else if (Stream._isUint8Array(chunk)) {
      chunk = Stream._uint8ArrayToBuffer(chunk);
      encoding = '';
    } else if (chunk != null) {
      err = new ERR_INVALID_ARG_TYPE(
        'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
    }
  }

  if (err) {
    errorOrDestroy(stream, err);
  } else if (chunk === null) {
    state.state &= ~kReading;
    onEofChunk(stream, state);
  } else if (((state.state & kObjectMode) !== 0) || (chunk && chunk.length > 0)) {
    if (addToFront) {
      if ((state.state & kEndEmitted) !== 0)
        errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
      else if (state.destroyed || state.errored)
        return false;
      else
        addChunk(stream, state, chunk, true);
    } else if (state.ended) {
      errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
    } else if (state.destroyed || state.errored) {
      return false;
    } else {
      state.state &= ~kReading;
      if (state.decoder && !encoding) {
        chunk = state.decoder.write(chunk);
        if (state.objectMode || chunk.length !== 0)
          addChunk(stream, state, chunk, false);
        else
          maybeReadMore(stream, state);
      } else {
        addChunk(stream, state, chunk, false);
      }
    }
  } else if (!addToFront) {
    state.state &= ~kReading;
    maybeReadMore(stream, state);
  }

  // We can push more data if we are below the highWaterMark.
  // Also, if we have no data yet, we can stand some more bytes.
  // This is to work around cases where hwm=0, such as the repl.
  return !state.ended &&
    (state.length < state.highWaterMark || state.length === 0);
}

function addChunk(stream, state, chunk, addToFront) {
  if (state.flowing && state.length === 0 && !state.sync &&
      stream.listenerCount('data') > 0) {
    // Use the guard to avoid creating `Set()` repeatedly
    // when we have multiple pipes.
    if ((state.state & kMultiAwaitDrain) !== 0) {
      state.awaitDrainWriters.clear();
    } else {
      state.awaitDrainWriters = null;
    }

    state.dataEmitted = true;
    stream.emit('data', chunk);
  } else {
    // Update the buffer info.
    state.length += state.objectMode ? 1 : chunk.length;
    if (addToFront)
      state.buffer.unshift(chunk);
    else
      state.buffer.push(chunk);

    if ((state.state & kNeedReadable) !== 0)
      emitReadable(stream);
  }
  maybeReadMore(stream, state);
}

Readable.prototype.isPaused = function() {
  const state = this._readableState;
  return state[kPaused] === true || state.flowing === false;
};

// Backwards compatibility.
Readable.prototype.setEncoding = function(enc) {
  const decoder = new StringDecoder(enc);
  this._readableState.decoder = decoder;
  // If setEncoding(null), decoder.encoding equals utf8.
  this._readableState.encoding = this._readableState.decoder.encoding;

  const buffer = this._readableState.buffer;
  // Iterate over current buffer to convert already stored Buffers:
  let content = '';
  for (const data of buffer) {
    content += decoder.write(data);
  }
  buffer.clear();
  if (content !== '')
    buffer.push(content);
  this._readableState.length = content.length;
  return this;
};
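
// Illustrative sketch (kept in a comment, not executed): once an encoding is
// set, 'data' events and read() produce strings instead of Buffers, including
// for chunks that were already buffered when setEncoding() was called:
//
//   const fs = require('fs');
//   const rs = fs.createReadStream('/some/hypothetical/file.txt');
//   rs.setEncoding('utf8');
//   rs.on('data', (chunk) => {
//     // `chunk` is a string decoded with StringDecoder, so multi-byte
//     // characters are never split across chunk boundaries.
//   });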

// Don't raise the hwm > 1GB.
const MAX_HWM = 0x40000000;
function computeNewHighWaterMark(n) {
  if (n > MAX_HWM) {
    throw new ERR_OUT_OF_RANGE('size', '<= 1GiB', n);
  } else {
    // Get the next highest power of 2 to prevent increasing hwm excessively in
    // tiny amounts.
    n--;
    n |= n >>> 1;
    n |= n >>> 2;
    n |= n >>> 4;
    n |= n >>> 8;
    n |= n >>> 16;
    n++;
  }
  return n;
}
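
// Worked example (comment only): the shift-or cascade rounds n up to the next
// power of two. For n = 17:
//
//   n - 1                    = 16  -> 0b10000
//   after the |= >>> cascade = 31  -> 0b11111 (all bits below the top bit set)
//   n + 1                    = 32  -> next power of two
//
// so read(17) on a stream with a smaller highWaterMark raises the hwm to 32.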

// This function is designed to be inlinable, so please take care when making
// changes to the function body.
function howMuchToRead(n, state) {
  if (n <= 0 || (state.length === 0 && state.ended))
    return 0;
  if ((state.state & kObjectMode) !== 0)
    return 1;
  if (NumberIsNaN(n)) {
    // Only flow one buffer at a time.
    if (state.flowing && state.length)
      return state.buffer.first().length;
    return state.length;
  }
  if (n <= state.length)
    return n;
  return state.ended ? state.length : 0;
}

// You can override either this method, or the async _read(n) below.
Readable.prototype.read = function(n) {
  debug('read', n);
  // Same as parseInt(undefined, 10), however V8 7.3 performance regressed
  // in this scenario, so we are doing it manually.
  if (n === undefined) {
    n = NaN;
  } else if (!NumberIsInteger(n)) {
    n = NumberParseInt(n, 10);
  }
  const state = this._readableState;
  const nOrig = n;

  // If we're asking for more than the current hwm, then raise the hwm.
  if (n > state.highWaterMark)
    state.highWaterMark = computeNewHighWaterMark(n);

  if (n !== 0)
    state.state &= ~kEmittedReadable;

  // If we're doing read(0) to trigger a readable event, but we
  // already have a bunch of data in the buffer, then just trigger
  // the 'readable' event and move on.
  if (n === 0 &&
      state.needReadable &&
      ((state.highWaterMark !== 0 ?
        state.length >= state.highWaterMark :
        state.length > 0) ||
       state.ended)) {
    debug('read: emitReadable', state.length, state.ended);
    if (state.length === 0 && state.ended)
      endReadable(this);
    else
      emitReadable(this);
    return null;
  }

  n = howMuchToRead(n, state);

  // If we've ended, and we're now clear, then finish it up.
  if (n === 0 && state.ended) {
    if (state.length === 0)
      endReadable(this);
    return null;
  }

  // All the actual chunk generation logic needs to be
  // *below* the call to _read.  The reason is that in certain
  // synthetic stream cases, such as passthrough streams, _read
  // may be a completely synchronous operation which may change
  // the state of the read buffer, providing enough data when
  // before there was *not* enough.
  //
  // So, the steps are:
  // 1. Figure out what the state of things will be after we do
  // a read from the buffer.
  //
  // 2. If that resulting state will trigger a _read, then call _read.
  // Note that this may be asynchronous, or synchronous.  Yes, it is
  // deeply ugly to write APIs this way, but that still doesn't mean
  // that the Readable class should behave improperly, as streams are
  // designed to be sync/async agnostic.
  // Take note if the _read call is sync or async (ie, if the read call
  // has returned yet), so that we know whether or not it's safe to emit
  // 'readable' etc.
  //
  // 3. Actually pull the requested chunks out of the buffer and return.

  // if we need a readable event, then we need to do some reading.
  let doRead = (state.state & kNeedReadable) !== 0;
  debug('need readable', doRead);

  // If we currently have less than the highWaterMark, then also read some.
  if (state.length === 0 || state.length - n < state.highWaterMark) {
    doRead = true;
    debug('length less than watermark', doRead);
  }

  // However, if we've ended, then there's no point; if we're already
  // reading, then it's unnecessary; if we're constructing, we have to wait;
  // and if we're destroyed or errored, then it's not allowed.
  if (state.ended || state.reading || state.destroyed || state.errored ||
      !state.constructed) {
    doRead = false;
    debug('reading, ended or constructing', doRead);
  } else if (doRead) {
    debug('do read');
    state.state |= kReading | kSync;
    // If the length is currently zero, then we *need* a readable event.
    if (state.length === 0)
      state.state |= kNeedReadable;

    // Call internal read method
    try {
      this._read(state.highWaterMark);
    } catch (err) {
      errorOrDestroy(this, err);
    }
    state.state &= ~kSync;

    // If _read pushed data synchronously, then `reading` will be false,
    // and we need to re-evaluate how much data we can return to the user.
    if (!state.reading)
      n = howMuchToRead(nOrig, state);
  }

  let ret;
  if (n > 0)
    ret = fromList(n, state);
  else
    ret = null;

  if (ret === null) {
    state.needReadable = state.length <= state.highWaterMark;
    n = 0;
  } else {
    state.length -= n;
    if (state.multiAwaitDrain) {
      state.awaitDrainWriters.clear();
    } else {
      state.awaitDrainWriters = null;
    }
  }

  if (state.length === 0) {
    // If we have nothing in the buffer, then we want to know
    // as soon as we *do* get something into the buffer.
    if (!state.ended)
      state.needReadable = true;

    // If we tried to read() past the EOF, then emit end on the next tick.
    if (nOrig !== n && state.ended)
      endReadable(this);
  }

  if (ret !== null && !state.errorEmitted && !state.closeEmitted) {
    state.dataEmitted = true;
    this.emit('data', ret);
  }

  return ret;
};
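
// Illustrative sketch (kept in a comment, not executed): the usual paused-mode
// consumption pattern built on read(). Draining the buffer inside 'readable'
// until read() returns null is what re-arms the next 'readable' event.
// `consume` is a hypothetical consumer function:
//
//   const fs = require('fs');
//   const rs = fs.createReadStream('/some/hypothetical/file.txt');
//   rs.on('readable', () => {
//     let chunk;
//     while ((chunk = rs.read()) !== null) {
//       consume(chunk);
//     }
//   });
//   rs.on('end', () => {
//     // No more data.
//   });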

function onEofChunk(stream, state) {
  debug('onEofChunk');
  if (state.ended) return;
  if (state.decoder) {
    const chunk = state.decoder.end();
    if (chunk && chunk.length) {
      state.buffer.push(chunk);
      state.length += state.objectMode ? 1 : chunk.length;
    }
  }
  state.ended = true;

  if (state.sync) {
    // If we are sync, wait until next tick to emit the data.
    // Otherwise we risk emitting data in the flow()
    // the readable code triggers during a read() call.
    emitReadable(stream);
  } else {
    // Emit 'readable' now to make sure it gets picked up.
    state.needReadable = false;
    state.emittedReadable = true;
    // We have to emit readable now that we are EOF. Modules
    // in the ecosystem (e.g. dicer) rely on this event being sync.
    emitReadable_(stream);
  }
}

// Don't emit readable right away in sync mode, because this can trigger
// another read() call => stack overflow.  This way, it might trigger
// a nextTick recursion warning, but that's not so bad.
function emitReadable(stream) {
  const state = stream._readableState;
  debug('emitReadable', state.needReadable, state.emittedReadable);
  state.needReadable = false;
  if (!state.emittedReadable) {
    debug('emitReadable', state.flowing);
    state.emittedReadable = true;
    process.nextTick(emitReadable_, stream);
  }
}

function emitReadable_(stream) {
  const state = stream._readableState;
  debug('emitReadable_', state.destroyed, state.length, state.ended);
  if (!state.destroyed && !state.errored && (state.length || state.ended)) {
    stream.emit('readable');
    state.emittedReadable = false;
  }

  // The stream needs another readable event if:
  // 1. It is not flowing, as the flow mechanism will take
  //    care of it.
  // 2. It is not ended.
  // 3. It is below the highWaterMark, so we can schedule
  //    another readable later.
  state.needReadable =
    !state.flowing &&
    !state.ended &&
    state.length <= state.highWaterMark;
  flow(stream);
}


// At this point, the user has presumably seen the 'readable' event,
// and called read() to consume some data.  That may have triggered
// in turn another _read(n) call, in which case reading = true if
// it's in progress.
// However, if we're not ended, or reading, and the length < hwm,
// then go ahead and try to read some more preemptively.
function maybeReadMore(stream, state) {
  if (!state.readingMore && state.constructed) {
    state.readingMore = true;
    process.nextTick(maybeReadMore_, stream, state);
  }
}

function maybeReadMore_(stream, state) {
  // Attempt to read more data if we should.
  //
  // The conditions for reading more data are (one of):
  // - Not enough data buffered (state.length < state.highWaterMark). The loop
  //   is responsible for filling the buffer with enough data if such data
  //   is available. If highWaterMark is 0 and we are not in the flowing mode
  //   we should _not_ attempt to buffer any extra data. We'll get more data
  //   when the stream consumer calls read() instead.
  // - No data in the buffer, and the stream is in flowing mode. In this mode
  //   the loop below is responsible for ensuring read() is called. Failing to
  //   call read here would abort the flow and there's no other mechanism for
  //   continuing the flow if the stream consumer has just subscribed to the
  //   'data' event.
  //
  // In addition to the above conditions to keep reading data, the following
  // conditions prevent the data from being read:
  // - The stream has ended (state.ended).
  // - There is already a pending 'read' operation (state.reading). This is a
  //   case where the stream has called the implementation defined _read()
  //   method, but they are processing the call asynchronously and have _not_
  //   called push() with new data. In this case we skip performing more
  //   read()s. The execution ends in this method again after the _read() ends
  //   up calling push() with more data.
  while (!state.reading && !state.ended &&
         (state.length < state.highWaterMark ||
          (state.flowing && state.length === 0))) {
    const len = state.length;
    debug('maybeReadMore read 0');
    stream.read(0);
    if (len === state.length)
      // Didn't get any data, stop spinning.
      break;
  }
  state.readingMore = false;
}

// Abstract method. To be overridden in specific implementation classes.
// Call cb(er, data) where data is <= n in length.
// For virtual (non-string, non-buffer) streams, "length" is somewhat
// arbitrary, and perhaps not very meaningful.
Readable.prototype._read = function(n) {
  throw new ERR_METHOD_NOT_IMPLEMENTED('_read()');
};

Readable.prototype.pipe = function(dest, pipeOpts) {
  const src = this;
  const state = this._readableState;

  if (state.pipes.length === 1) {
    if (!state.multiAwaitDrain) {
      state.multiAwaitDrain = true;
      state.awaitDrainWriters = new SafeSet(
        state.awaitDrainWriters ? [state.awaitDrainWriters] : [],
      );
    }
  }

  state.pipes.push(dest);
  debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts);

  const doEnd = (!pipeOpts || pipeOpts.end !== false) &&
              dest !== process.stdout &&
              dest !== process.stderr;

  const endFn = doEnd ? onend : unpipe;
  if (state.endEmitted)
    process.nextTick(endFn);
  else
    src.once('end', endFn);

  dest.on('unpipe', onunpipe);
  function onunpipe(readable, unpipeInfo) {
    debug('onunpipe');
    if (readable === src) {
      if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
        unpipeInfo.hasUnpiped = true;
        cleanup();
      }
    }
  }

  function onend() {
    debug('onend');
    dest.end();
  }

  let ondrain;

  let cleanedUp = false;
  function cleanup() {
    debug('cleanup');
    // Cleanup event handlers once the pipe is broken.
    dest.removeListener('close', onclose);
    dest.removeListener('finish', onfinish);
    if (ondrain) {
      dest.removeListener('drain', ondrain);
    }
    dest.removeListener('error', onerror);
    dest.removeListener('unpipe', onunpipe);
    src.removeListener('end', onend);
    src.removeListener('end', unpipe);
    src.removeListener('data', ondata);

    cleanedUp = true;

    // If the reader is waiting for a drain event from this
    // specific writer, then it would cause it to never start
    // flowing again.
    // So, if this is awaiting a drain, then we just call it now.
    // If we don't know, then assume that we are waiting for one.
    if (ondrain && state.awaitDrainWriters &&
        (!dest._writableState || dest._writableState.needDrain))
      ondrain();
  }

  function pause() {
    // If the user unpiped during `dest.write()`, it is possible
    // to get stuck in a permanently paused state if that write
    // also returned false.
    // => Check whether `dest` is still a piping destination.
    if (!cleanedUp) {
      if (state.pipes.length === 1 && state.pipes[0] === dest) {
        debug('false write response, pause', 0);
        state.awaitDrainWriters = dest;
        state.multiAwaitDrain = false;
      } else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
        debug('false write response, pause', state.awaitDrainWriters.size);
        state.awaitDrainWriters.add(dest);
      }
      src.pause();
    }
    if (!ondrain) {
      // When the dest drains, it reduces the awaitDrain counter
      // on the source.  This would be more elegant with a .once()
      // handler in flow(), but adding and removing repeatedly is
      // too slow.
      ondrain = pipeOnDrain(src, dest);
      dest.on('drain', ondrain);
    }
  }

  src.on('data', ondata);
  function ondata(chunk) {
    debug('ondata');
    const ret = dest.write(chunk);
    debug('dest.write', ret);
    if (ret === false) {
      pause();
    }
  }

  // If the dest has an error, then stop piping into it.
  // However, don't suppress the throwing behavior for this.
  function onerror(er) {
    debug('onerror', er);
    unpipe();
    dest.removeListener('error', onerror);
    if (dest.listenerCount('error') === 0) {
      const s = dest._writableState || dest._readableState;
      if (s && !s.errorEmitted) {
        // User incorrectly emitted 'error' directly on the stream.
        errorOrDestroy(dest, er);
      } else {
        dest.emit('error', er);
      }
    }
  }

  // Make sure our error handler is attached before userland ones.
  prependListener(dest, 'error', onerror);

  // Both close and finish should trigger unpipe, but only once.
  function onclose() {
    dest.removeListener('finish', onfinish);
    unpipe();
  }
  dest.once('close', onclose);
  function onfinish() {
    debug('onfinish');
    dest.removeListener('close', onclose);
    unpipe();
  }
  dest.once('finish', onfinish);

  function unpipe() {
    debug('unpipe');
    src.unpipe(dest);
  }

  // Tell the dest that it's being piped to.
  dest.emit('pipe', src);

  // Start the flow if it hasn't been started already.

  if (dest.writableNeedDrain === true) {
    pause();
  } else if (!state.flowing) {
    debug('pipe resume');
    src.resume();
  }

  return dest;
};
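
// Illustrative sketch (kept in a comment, not executed): pipe() wires up the
// 'data'/'drain' handlers above so backpressure is handled automatically:
//
//   const fs = require('fs');
//   const zlib = require('zlib');
//   fs.createReadStream('/some/hypothetical/input.txt')
//     .pipe(zlib.createGzip())
//     .pipe(fs.createWriteStream('/some/hypothetical/input.txt.gz'));
//
// Note that pipe() does not forward errors between the piped streams; for
// error-aware composition, stream.pipeline() is generally preferred.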

function pipeOnDrain(src, dest) {
  return function pipeOnDrainFunctionResult() {
    const state = src._readableState;

    // `ondrain` may be called directly, in which case `this` may not be a
    // reference to dest, so we use the real dest here.
    if (state.awaitDrainWriters === dest) {
      debug('pipeOnDrain', 1);
      state.awaitDrainWriters = null;
    } else if (state.multiAwaitDrain) {
      debug('pipeOnDrain', state.awaitDrainWriters.size);
      state.awaitDrainWriters.delete(dest);
    }

    if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) &&
      src.listenerCount('data')) {
      src.resume();
    }
  };
}


Readable.prototype.unpipe = function(dest) {
  const state = this._readableState;
  const unpipeInfo = { hasUnpiped: false };

  // If we're not piping anywhere, then do nothing.
  if (state.pipes.length === 0)
    return this;

  if (!dest) {
    // remove all.
    const dests = state.pipes;
    state.pipes = [];
    this.pause();

    for (let i = 0; i < dests.length; i++)
      dests[i].emit('unpipe', this, { hasUnpiped: false });
    return this;
  }

  // Try to find the right one.
  const index = ArrayPrototypeIndexOf(state.pipes, dest);
  if (index === -1)
    return this;

  state.pipes.splice(index, 1);
  if (state.pipes.length === 0)
    this.pause();

  dest.emit('unpipe', this, unpipeInfo);

  return this;
};

// Set up data events if they are asked for.
// Ensure readable listeners eventually get something.
Readable.prototype.on = function(ev, fn) {
  const res = Stream.prototype.on.call(this, ev, fn);
  const state = this._readableState;

  if (ev === 'data') {
    // Update readableListening so that resume() may be a no-op
    // a few lines down. This is needed to support once('readable').
    state.readableListening = this.listenerCount('readable') > 0;

    // Try to start flowing on next tick if the stream isn't explicitly paused.
    if (state.flowing !== false)
      this.resume();
  } else if (ev === 'readable') {
    if (!state.endEmitted && !state.readableListening) {
      state.readableListening = state.needReadable = true;
      state.flowing = false;
      state.emittedReadable = false;
      debug('on readable', state.length, state.reading);
      if (state.length) {
        emitReadable(this);
      } else if (!state.reading) {
        process.nextTick(nReadingNextTick, this);
      }
    }
  }

  return res;
};
Readable.prototype.addListener = Readable.prototype.on;

Readable.prototype.removeListener = function(ev, fn) {
  const res = Stream.prototype.removeListener.call(this,
                                                   ev, fn);

  if (ev === 'readable') {
    // We need to check if there is someone still listening to
    // readable and reset the state. However this needs to happen
    // after readable has been emitted but before I/O (nextTick) to
    // support once('readable', fn) cycles. This means that calling
    // resume within the same tick will have no
    // effect.
    process.nextTick(updateReadableListening, this);
  }

  return res;
};
Readable.prototype.off = Readable.prototype.removeListener;

Readable.prototype.removeAllListeners = function(ev) {
  const res = Stream.prototype.removeAllListeners.apply(this,
                                                        arguments);

  if (ev === 'readable' || ev === undefined) {
    // We need to check if there is someone still listening to
    // readable and reset the state. However this needs to happen
    // after readable has been emitted but before I/O (nextTick) to
    // support once('readable', fn) cycles. This means that calling
    // resume within the same tick will have no
    // effect.
    process.nextTick(updateReadableListening, this);
  }

  return res;
};

function updateReadableListening(self) {
  const state = self._readableState;
  state.readableListening = self.listenerCount('readable') > 0;

  if (state.resumeScheduled && state[kPaused] === false) {
    // Flowing needs to be set to true now, otherwise
    // the upcoming resume will not flow.
    state.flowing = true;

    // Crude way to check if we should resume.
  } else if (self.listenerCount('data') > 0) {
    self.resume();
  } else if (!state.readableListening) {
    state.flowing = null;
  }
}

function nReadingNextTick(self) {
  debug('readable nexttick read 0');
  self.read(0);
}

// pause() and resume() are remnants of the legacy readable stream API.
// If the user uses them, then switch into old mode.
Readable.prototype.resume = function() {
  const state = this._readableState;
  if (!state.flowing) {
    debug('resume');
    // We flow only if there is no one listening
    // for readable, but we still have to call
    // resume().
    state.flowing = !state.readableListening;
    resume(this, state);
  }
  state[kPaused] = false;
  return this;
};

function resume(stream, state) {
  if (!state.resumeScheduled) {
    state.resumeScheduled = true;
    process.nextTick(resume_, stream, state);
  }
}

function resume_(stream, state) {
  debug('resume', state.reading);
  if (!state.reading) {
    stream.read(0);
  }

  state.resumeScheduled = false;
  stream.emit('resume');
  flow(stream);
  if (state.flowing && !state.reading)
    stream.read(0);
}

Readable.prototype.pause = function() {
  debug('call pause flowing=%j', this._readableState.flowing);
  if (this._readableState.flowing !== false) {
    debug('pause');
    this._readableState.flowing = false;
    this.emit('pause');
  }
  this._readableState[kPaused] = true;
  return this;
};
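
// Illustrative sketch (kept in a comment, not executed): pausing flowing mode
// to throttle a fast producer, then resuming later:
//
//   const fs = require('fs');
//   const rs = fs.createReadStream('/some/hypothetical/file.txt');
//   rs.on('data', (chunk) => {
//     rs.pause();                          // stop 'data' events for a while
//     setTimeout(() => rs.resume(), 100);  // resume after a short delay
//   });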

function flow(stream) {
  const state = stream._readableState;
  debug('flow', state.flowing);
  while (state.flowing && stream.read() !== null);
}

// Wrap an old-style stream as the async data source.
// This is *not* part of the readable stream interface.
// It is an ugly unfortunate mess of history.
Readable.prototype.wrap = function(stream) {
  let paused = false;

  // TODO (ronag): Should this.destroy(err) emit
  // 'error' on the wrapped stream? Would require
  // a static factory method, e.g. Readable.wrap(stream).

  stream.on('data', (chunk) => {
    if (!this.push(chunk) && stream.pause) {
      paused = true;
      stream.pause();
    }
  });

  stream.on('end', () => {
    this.push(null);
  });

  stream.on('error', (err) => {
    errorOrDestroy(this, err);
  });

  stream.on('close', () => {
    this.destroy();
  });

  stream.on('destroy', () => {
    this.destroy();
  });

  this._read = () => {
    if (paused && stream.resume) {
      paused = false;
      stream.resume();
    }
  };

  // Proxy all the other methods. Important when wrapping filters and duplexes.
  const streamKeys = ObjectKeys(stream);
  for (let j = 1; j < streamKeys.length; j++) {
    const i = streamKeys[j];
    if (this[i] === undefined && typeof stream[i] === 'function') {
      this[i] = stream[i].bind(stream);
    }
  }

  return this;
};
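
// Illustrative sketch (kept in a comment, not executed): adapting a legacy
// "old-style" stream (one that only emits 'data'/'end') so it can be consumed
// with the modern Readable API. `OldReader` is a hypothetical legacy source:
//
//   const { Readable } = require('stream');
//   const oldStream = new OldReader();
//   const modern = new Readable({ objectMode: true }).wrap(oldStream);
//   modern.on('readable', () => {
//     // modern.read() now works even though OldReader predates read().
//   });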

Readable.prototype[SymbolAsyncIterator] = function() {
  return streamToAsyncIterator(this);
};

Readable.prototype.iterator = function(options) {
  if (options !== undefined) {
    validateObject(options, 'options');
  }
  return streamToAsyncIterator(this, options);
};
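
// Illustrative sketch (kept in a comment, not executed): both entry points
// above feed createAsyncIterator(), so a readable can be consumed with
// for await...of. Passing { destroyOnReturn: false } via iterator() keeps the
// stream alive when the loop exits early:
//
//   const fs = require('fs');
//   const rs = fs.createReadStream('/some/hypothetical/file.txt');
//   for await (const chunk of rs.iterator({ destroyOnReturn: false })) {
//     if (looksInteresting(chunk))  // hypothetical predicate
//       break;                      // rs is not destroyed by the early exit
//   }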

function streamToAsyncIterator(stream, options) {
  if (typeof stream.read !== 'function') {
    stream = Readable.wrap(stream, { objectMode: true });
  }

  const iter = createAsyncIterator(stream, options);
  iter.stream = stream;
  return iter;
}

async function* createAsyncIterator(stream, options) {
  let callback = nop;

  function next(resolve) {
    if (this === stream) {
      callback();
      callback = nop;
    } else {
      callback = resolve;
    }
  }

  stream.on('readable', next);

  let error;
  const cleanup = eos(stream, { writable: false }, (err) => {
    error = err ? aggregateTwoErrors(error, err) : null;
    callback();
    callback = nop;
  });

  try {
    while (true) {
      const chunk = stream.destroyed ? null : stream.read();
      if (chunk !== null) {
        yield chunk;
      } else if (error) {
        throw error;
      } else if (error === null) {
        return;
      } else {
        await new Promise(next);
      }
    }
  } catch (err) {
    error = aggregateTwoErrors(error, err);
    throw error;
  } finally {
    if (
      (error || options?.destroyOnReturn !== false) &&
      (error === undefined || stream._readableState.autoDestroy)
    ) {
      destroyImpl.destroyer(stream, null);
    } else {
      stream.off('readable', next);
      cleanup();
    }
  }
}

// Making it explicit these properties are not enumerable
// because otherwise some prototype manipulation in
// userland will fail.
ObjectDefineProperties(Readable.prototype, {
  readable: {
    __proto__: null,
    get() {
      const r = this._readableState;
      // r.readable === false means that this is part of a Duplex stream
      // where the readable side was disabled upon construction.
      // Compat. The user might manually disable readable side through
      // deprecated setter.
      return !!r && r.readable !== false && !r.destroyed && !r.errorEmitted &&
        !r.endEmitted;
    },
    set(val) {
      // Backwards compat.
      if (this._readableState) {
        this._readableState.readable = !!val;
      }
    },
  },

  readableDidRead: {
    __proto__: null,
    enumerable: false,
    get: function() {
      return this._readableState.dataEmitted;
    },
  },

  readableAborted: {
    __proto__: null,
    enumerable: false,
    get: function() {
      return !!(
        this._readableState.readable !== false &&
        (this._readableState.destroyed || this._readableState.errored) &&
        !this._readableState.endEmitted
      );
    },
  },

  readableHighWaterMark: {
    __proto__: null,
    enumerable: false,
    get: function() {
      return this._readableState.highWaterMark;
    },
  },

  readableBuffer: {
    __proto__: null,
    enumerable: false,
    get: function() {
      return this._readableState && this._readableState.buffer;
    },
  },

  readableFlowing: {
    __proto__: null,
    enumerable: false,
    get: function() {
      return this._readableState.flowing;
    },
    set: function(state) {
      if (this._readableState) {
        this._readableState.flowing = state;
      }
    },
  },

  readableLength: {
    __proto__: null,
    enumerable: false,
    get() {
      return this._readableState.length;
    },
  },

  readableObjectMode: {
    __proto__: null,
    enumerable: false,
    get() {
      return this._readableState ? this._readableState.objectMode : false;
    },
  },

  readableEncoding: {
    __proto__: null,
    enumerable: false,
    get() {
      return this._readableState ? this._readableState.encoding : null;
    },
  },

  errored: {
    __proto__: null,
    enumerable: false,
    get() {
      return this._readableState ? this._readableState.errored : null;
    },
  },

  closed: {
    __proto__: null,
    get() {
      return this._readableState ? this._readableState.closed : false;
    },
  },

  destroyed: {
    __proto__: null,
    enumerable: false,
    get() {
      return this._readableState ? this._readableState.destroyed : false;
    },
    set(value) {
      // We ignore the value if the stream
      // has not been initialized yet.
      if (!this._readableState) {
        return;
      }

      // Backward compatibility, the user is explicitly
      // managing destroyed.
      this._readableState.destroyed = value;
    },
  },

  readableEnded: {
    __proto__: null,
    enumerable: false,
    get() {
      return this._readableState ? this._readableState.endEmitted : false;
    },
  },

});

ObjectDefineProperties(ReadableState.prototype, {
  // Legacy getter for `pipesCount`.
  pipesCount: {
    __proto__: null,
    get() {
      return this.pipes.length;
    },
  },

  // Legacy property for `paused`.
  paused: {
    __proto__: null,
    get() {
      return this[kPaused] !== false;
    },
    set(value) {
      this[kPaused] = !!value;
    },
  },
});

// Exposed for testing purposes only.
Readable._fromList = fromList;

// Pluck off n bytes from an array of buffers.
// Length is the combined lengths of all the buffers in the list.
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
function fromList(n, state) {
  // nothing buffered.
  if (state.length === 0)
    return null;

  let ret;
  if (state.objectMode)
    ret = state.buffer.shift();
  else if (!n || n >= state.length) {
    // Read it all, truncate the list.
    if (state.decoder)
      ret = state.buffer.join('');
    else if (state.buffer.length === 1)
      ret = state.buffer.first();
    else
      ret = state.buffer.concat(state.length);
    state.buffer.clear();
  } else {
    // read part of list.
    ret = state.buffer.consume(n, state.decoder);
  }

  return ret;
}

function endReadable(stream) {
  const state = stream._readableState;

  debug('endReadable', state.endEmitted);
  if (!state.endEmitted) {
    state.ended = true;
    process.nextTick(endReadableNT, state, stream);
  }
}

function endReadableNT(state, stream) {
  debug('endReadableNT', state.endEmitted, state.length);

  // Check that we didn't get one last unshift.
  if (!state.errored && !state.closeEmitted &&
      !state.endEmitted && state.length === 0) {
    state.endEmitted = true;
    stream.emit('end');

    if (stream.writable && stream.allowHalfOpen === false) {
      process.nextTick(endWritableNT, stream);
    } else if (state.autoDestroy) {
      // In case of duplex streams we need a way to detect
      // if the writable side is ready for autoDestroy as well.
      const wState = stream._writableState;
      const autoDestroy = !wState || (
        wState.autoDestroy &&
        // We don't expect the writable to ever 'finish'
        // if writable is explicitly set to false.
        (wState.finished || wState.writable === false)
      );

      if (autoDestroy) {
        stream.destroy();
      }
    }
  }
}

function endWritableNT(stream) {
  const writable = stream.writable && !stream.writableEnded &&
    !stream.destroyed;
  if (writable) {
    stream.end();
  }
}

Readable.from = function(iterable, opts) {
  return from(Readable, iterable, opts);
};
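
// Illustrative sketch (kept in a comment, not executed): Readable.from() turns
// any iterable or async iterable into a readable stream (object mode by
// default):
//
//   const { Readable } = require('stream');
//   async function* generate() {
//     yield 'hello';
//     yield 'world';
//   }
//   Readable.from(generate()).on('data', (chunk) => {
//     // 'hello', then 'world'
//   });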

let webStreamsAdapters;

// Lazy to avoid circular references
function lazyWebStreams() {
  if (webStreamsAdapters === undefined)
    webStreamsAdapters = require('internal/webstreams/adapters');
  return webStreamsAdapters;
}

Readable.fromWeb = function(readableStream, options) {
  return lazyWebStreams().newStreamReadableFromReadableStream(
    readableStream,
    options);
};

Readable.toWeb = function(streamReadable, options) {
  return lazyWebStreams().newReadableStreamFromStreamReadable(
    streamReadable,
    options);
};
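
// Illustrative sketch (kept in a comment, not executed): converting between a
// Node.js Readable and a WHATWG ReadableStream via the adapters above:
//
//   const fs = require('fs');
//   const { Readable } = require('stream');
//   const webStream =
//     Readable.toWeb(fs.createReadStream('/some/hypothetical/file.txt'));
//   const nodeStream = Readable.fromWeb(webStream);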

Readable.wrap = function(src, options) {
  return new Readable({
    objectMode: src.readableObjectMode ?? src.objectMode ?? true,
    ...options,
    destroy(err, callback) {
      destroyImpl.destroyer(src, err);
      callback(err);
    },
  }).wrap(src);
};
1465