• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright Joyent, Inc. and other Node contributors.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the
5// "Software"), to deal in the Software without restriction, including
6// without limitation the rights to use, copy, modify, merge, publish,
7// distribute, sublicense, and/or sell copies of the Software, and to permit
8// persons to whom the Software is furnished to do so, subject to the
9// following conditions:
10//
11// The above copyright notice and this permission notice shall be included
12// in all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20// USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22'use strict';
23
24const {
25  NumberIsInteger,
26  NumberIsNaN,
27  NumberParseInt,
28  ObjectDefineProperties,
29  ObjectSetPrototypeOf,
30  Promise,
31  Set,
32  SymbolAsyncIterator,
33  Symbol
34} = primordials;
35
36module.exports = Readable;
37Readable.ReadableState = ReadableState;
38
39const EE = require('events');
40const { Stream, prependListener } = require('internal/streams/legacy');
41const { Buffer } = require('buffer');
42
43let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
44  debug = fn;
45});
46const BufferList = require('internal/streams/buffer_list');
47const destroyImpl = require('internal/streams/destroy');
48const {
49  getHighWaterMark,
50  getDefaultHighWaterMark
51} = require('internal/streams/state');
52const {
53  ERR_INVALID_ARG_TYPE,
54  ERR_STREAM_PUSH_AFTER_EOF,
55  ERR_METHOD_NOT_IMPLEMENTED,
56  ERR_STREAM_UNSHIFT_AFTER_END_EVENT
57} = require('internal/errors').codes;
58
59const kPaused = Symbol('kPaused');
60
61// Lazy loaded to improve the startup performance.
62let StringDecoder;
63let from;
64
65ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
66ObjectSetPrototypeOf(Readable, Stream);
67function nop() {}
68
69const { errorOrDestroy } = destroyImpl;
70
71function ReadableState(options, stream, isDuplex) {
72  // Duplex streams are both readable and writable, but share
73  // the same options object.
74  // However, some cases require setting options to different
75  // values for the readable and the writable sides of the duplex stream.
76  // These options can be provided separately as readableXXX and writableXXX.
77  if (typeof isDuplex !== 'boolean')
78    isDuplex = stream instanceof Stream.Duplex;
79
80  // Object stream flag. Used to make read(n) ignore n and to
81  // make all the buffer merging and length checks go away.
82  this.objectMode = !!(options && options.objectMode);
83
84  if (isDuplex)
85    this.objectMode = this.objectMode ||
86      !!(options && options.readableObjectMode);
87
88  // The point at which it stops calling _read() to fill the buffer
89  // Note: 0 is a valid value, means "don't call _read preemptively ever"
90  this.highWaterMark = options ?
91    getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex) :
92    getDefaultHighWaterMark(false);
93
94  // A linked list is used to store data chunks instead of an array because the
95  // linked list can remove elements from the beginning faster than
96  // array.shift().
97  this.buffer = new BufferList();
98  this.length = 0;
99  this.pipes = [];
100  this.flowing = null;
101  this.ended = false;
102  this.endEmitted = false;
103  this.reading = false;
104
105  // A flag to be able to tell if the event 'readable'/'data' is emitted
106  // immediately, or on a later tick.  We set this to true at first, because
107  // any actions that shouldn't happen until "later" should generally also
108  // not happen before the first read call.
109  this.sync = true;
110
111  // Whenever we return null, then we set a flag to say
112  // that we're awaiting a 'readable' event emission.
113  this.needReadable = false;
114  this.emittedReadable = false;
115  this.readableListening = false;
116  this.resumeScheduled = false;
117  this[kPaused] = null;
118
119  // True if the error was already emitted and should not be thrown again.
120  this.errorEmitted = false;
121
122  // Should close be emitted on destroy. Defaults to true.
123  this.emitClose = !options || options.emitClose !== false;
124
125  // Should .destroy() be called after 'end' (and potentially 'finish').
126  this.autoDestroy = !options || options.autoDestroy !== false;
127
128  // Has it been destroyed.
129  this.destroyed = false;
130
131  // Indicates whether the stream has errored. When true no further
132  // _read calls, 'data' or 'readable' events should occur. This is needed
133  // since when autoDestroy is disabled we need a way to tell whether the
134  // stream has failed.
135  this.errored = null;
136
137  // Indicates whether the stream has finished destroying.
138  this.closed = false;
139
140  // True if close has been emitted or would have been emitted
141  // depending on emitClose.
142  this.closeEmitted = false;
143
144  // Crypto is kind of old and crusty.  Historically, its default string
145  // encoding is 'binary' so we have to make this configurable.
146  // Everything else in the universe uses 'utf8', though.
147  this.defaultEncoding = (options && options.defaultEncoding) || 'utf8';
148
149  // Ref the piped dest which we need a drain event on it
150  // type: null | Writable | Set<Writable>.
151  this.awaitDrainWriters = null;
152  this.multiAwaitDrain = false;
153
154  // If true, a maybeReadMore has been scheduled.
155  this.readingMore = false;
156
157  this.dataEmitted = false;
158
159  this.decoder = null;
160  this.encoding = null;
161  if (options && options.encoding) {
162    if (!StringDecoder)
163      StringDecoder = require('string_decoder').StringDecoder;
164    this.decoder = new StringDecoder(options.encoding);
165    this.encoding = options.encoding;
166  }
167}
168
169
170function Readable(options) {
171  if (!(this instanceof Readable))
172    return new Readable(options);
173
174  // Checking for a Stream.Duplex instance is faster here instead of inside
175  // the ReadableState constructor, at least with V8 6.5.
176  const isDuplex = this instanceof Stream.Duplex;
177
178  this._readableState = new ReadableState(options, this, isDuplex);
179
180  if (options) {
181    if (typeof options.read === 'function')
182      this._read = options.read;
183
184    if (typeof options.destroy === 'function')
185      this._destroy = options.destroy;
186  }
187
188  Stream.call(this, options);
189}
190
191Readable.prototype.destroy = destroyImpl.destroy;
192Readable.prototype._undestroy = destroyImpl.undestroy;
193Readable.prototype._destroy = function(err, cb) {
194  cb(err);
195};
196
197Readable.prototype[EE.captureRejectionSymbol] = function(err) {
198  this.destroy(err);
199};
200
201// Manually shove something into the read() buffer.
202// This returns true if the highWaterMark has not been hit yet,
203// similar to how Writable.write() returns true if you should
204// write() some more.
205Readable.prototype.push = function(chunk, encoding) {
206  return readableAddChunk(this, chunk, encoding, false);
207};
208
209// Unshift should *always* be something directly out of read().
210Readable.prototype.unshift = function(chunk, encoding) {
211  return readableAddChunk(this, chunk, encoding, true);
212};
213
214function readableAddChunk(stream, chunk, encoding, addToFront) {
215  debug('readableAddChunk', chunk);
216  const state = stream._readableState;
217
218  let err;
219  if (!state.objectMode) {
220    if (typeof chunk === 'string') {
221      encoding = encoding || state.defaultEncoding;
222      if (state.encoding !== encoding) {
223        if (addToFront && state.encoding) {
224          // When unshifting, if state.encoding is set, we have to save
225          // the string in the BufferList with the state encoding.
226          chunk = Buffer.from(chunk, encoding).toString(state.encoding);
227        } else {
228          chunk = Buffer.from(chunk, encoding);
229          encoding = '';
230        }
231      }
232    } else if (chunk instanceof Buffer) {
233      encoding = '';
234    } else if (Stream._isUint8Array(chunk)) {
235      chunk = Stream._uint8ArrayToBuffer(chunk);
236      encoding = '';
237    } else if (chunk != null) {
238      err = new ERR_INVALID_ARG_TYPE(
239        'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
240    }
241  }
242
243  if (err) {
244    errorOrDestroy(stream, err);
245  } else if (chunk === null) {
246    state.reading = false;
247    onEofChunk(stream, state);
248  } else if (state.objectMode || (chunk && chunk.length > 0)) {
249    if (addToFront) {
250      if (state.endEmitted)
251        errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
252      else
253        addChunk(stream, state, chunk, true);
254    } else if (state.ended) {
255      errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
256    } else if (state.destroyed) {
257      return false;
258    } else {
259      state.reading = false;
260      if (state.decoder && !encoding) {
261        chunk = state.decoder.write(chunk);
262        if (state.objectMode || chunk.length !== 0)
263          addChunk(stream, state, chunk, false);
264        else
265          maybeReadMore(stream, state);
266      } else {
267        addChunk(stream, state, chunk, false);
268      }
269    }
270  } else if (!addToFront) {
271    state.reading = false;
272    maybeReadMore(stream, state);
273  }
274
275  // We can push more data if we are below the highWaterMark.
276  // Also, if we have no data yet, we can stand some more bytes.
277  // This is to work around cases where hwm=0, such as the repl.
278  return !state.ended &&
279    (state.length < state.highWaterMark || state.length === 0);
280}
281
282function addChunk(stream, state, chunk, addToFront) {
283  if (state.flowing && state.length === 0 && !state.sync &&
284      stream.listenerCount('data') > 0) {
285    // Use the guard to avoid creating `Set()` repeatedly
286    // when we have multiple pipes.
287    if (state.multiAwaitDrain) {
288      state.awaitDrainWriters.clear();
289    } else {
290      state.awaitDrainWriters = null;
291    }
292    state.dataEmitted = true;
293    stream.emit('data', chunk);
294  } else {
295    // Update the buffer info.
296    state.length += state.objectMode ? 1 : chunk.length;
297    if (addToFront)
298      state.buffer.unshift(chunk);
299    else
300      state.buffer.push(chunk);
301
302    if (state.needReadable)
303      emitReadable(stream);
304  }
305  maybeReadMore(stream, state);
306}
307
308Readable.prototype.isPaused = function() {
309  const state = this._readableState;
310  return state[kPaused] === true || state.flowing === false;
311};
312
313// Backwards compatibility.
314Readable.prototype.setEncoding = function(enc) {
315  if (!StringDecoder)
316    StringDecoder = require('string_decoder').StringDecoder;
317  const decoder = new StringDecoder(enc);
318  this._readableState.decoder = decoder;
319  // If setEncoding(null), decoder.encoding equals utf8.
320  this._readableState.encoding = this._readableState.decoder.encoding;
321
322  const buffer = this._readableState.buffer;
323  // Iterate over current buffer to convert already stored Buffers:
324  let content = '';
325  for (const data of buffer) {
326    content += decoder.write(data);
327  }
328  buffer.clear();
329  if (content !== '')
330    buffer.push(content);
331  this._readableState.length = content.length;
332  return this;
333};
334
335// Don't raise the hwm > 1GB.
336const MAX_HWM = 0x40000000;
337function computeNewHighWaterMark(n) {
338  if (n >= MAX_HWM) {
339    // TODO(ronag): Throw ERR_VALUE_OUT_OF_RANGE.
340    n = MAX_HWM;
341  } else {
342    // Get the next highest power of 2 to prevent increasing hwm excessively in
343    // tiny amounts.
344    n--;
345    n |= n >>> 1;
346    n |= n >>> 2;
347    n |= n >>> 4;
348    n |= n >>> 8;
349    n |= n >>> 16;
350    n++;
351  }
352  return n;
353}
354
355// This function is designed to be inlinable, so please take care when making
356// changes to the function body.
357function howMuchToRead(n, state) {
358  if (n <= 0 || (state.length === 0 && state.ended))
359    return 0;
360  if (state.objectMode)
361    return 1;
362  if (NumberIsNaN(n)) {
363    // Only flow one buffer at a time.
364    if (state.flowing && state.length)
365      return state.buffer.first().length;
366    return state.length;
367  }
368  if (n <= state.length)
369    return n;
370  return state.ended ? state.length : 0;
371}
372
373// You can override either this method, or the async _read(n) below.
374Readable.prototype.read = function(n) {
375  debug('read', n);
376  // Same as NumberParseInt(undefined, 10), however V8 7.3 performance regressed
377  // in this scenario, so we are doing it manually.
378  if (n === undefined) {
379    n = NaN;
380  } else if (!NumberIsInteger(n)) {
381    n = NumberParseInt(n, 10);
382  }
383  const state = this._readableState;
384  const nOrig = n;
385
386  // If we're asking for more than the current hwm, then raise the hwm.
387  if (n > state.highWaterMark)
388    state.highWaterMark = computeNewHighWaterMark(n);
389
390  if (n !== 0)
391    state.emittedReadable = false;
392
393  // If we're doing read(0) to trigger a readable event, but we
394  // already have a bunch of data in the buffer, then just trigger
395  // the 'readable' event and move on.
396  if (n === 0 &&
397      state.needReadable &&
398      ((state.highWaterMark !== 0 ?
399        state.length >= state.highWaterMark :
400        state.length > 0) ||
401       state.ended)) {
402    debug('read: emitReadable', state.length, state.ended);
403    if (state.length === 0 && state.ended)
404      endReadable(this);
405    else
406      emitReadable(this);
407    return null;
408  }
409
410  n = howMuchToRead(n, state);
411
412  // If we've ended, and we're now clear, then finish it up.
413  if (n === 0 && state.ended) {
414    if (state.length === 0)
415      endReadable(this);
416    return null;
417  }
418
419  // All the actual chunk generation logic needs to be
420  // *below* the call to _read.  The reason is that in certain
421  // synthetic stream cases, such as passthrough streams, _read
422  // may be a completely synchronous operation which may change
423  // the state of the read buffer, providing enough data when
424  // before there was *not* enough.
425  //
426  // So, the steps are:
427  // 1. Figure out what the state of things will be after we do
428  // a read from the buffer.
429  //
430  // 2. If that resulting state will trigger a _read, then call _read.
431  // Note that this may be asynchronous, or synchronous.  Yes, it is
432  // deeply ugly to write APIs this way, but that still doesn't mean
433  // that the Readable class should behave improperly, as streams are
434  // designed to be sync/async agnostic.
435  // Take note if the _read call is sync or async (ie, if the read call
436  // has returned yet), so that we know whether or not it's safe to emit
437  // 'readable' etc.
438  //
439  // 3. Actually pull the requested chunks out of the buffer and return.
440
441  // if we need a readable event, then we need to do some reading.
442  let doRead = state.needReadable;
443  debug('need readable', doRead);
444
445  // If we currently have less than the highWaterMark, then also read some.
446  if (state.length === 0 || state.length - n < state.highWaterMark) {
447    doRead = true;
448    debug('length less than watermark', doRead);
449  }
450
451  // However, if we've ended, then there's no point, if we're already
452  // reading, then it's unnecessary, and if we're destroyed, then it's
453  // not allowed.
454  if (state.ended || state.reading || state.destroyed) {
455    doRead = false;
456    debug('reading or ended', doRead);
457  } else if (doRead) {
458    debug('do read');
459    state.reading = true;
460    state.sync = true;
461    // If the length is currently zero, then we *need* a readable event.
462    if (state.length === 0)
463      state.needReadable = true;
464    // Call internal read method
465    this._read(state.highWaterMark);
466    state.sync = false;
467    // If _read pushed data synchronously, then `reading` will be false,
468    // and we need to re-evaluate how much data we can return to the user.
469    if (!state.reading)
470      n = howMuchToRead(nOrig, state);
471  }
472
473  let ret;
474  if (n > 0)
475    ret = fromList(n, state);
476  else
477    ret = null;
478
479  if (ret === null) {
480    state.needReadable = state.length <= state.highWaterMark;
481    n = 0;
482  } else {
483    state.length -= n;
484    if (state.multiAwaitDrain) {
485      state.awaitDrainWriters.clear();
486    } else {
487      state.awaitDrainWriters = null;
488    }
489  }
490
491  if (state.length === 0) {
492    // If we have nothing in the buffer, then we want to know
493    // as soon as we *do* get something into the buffer.
494    if (!state.ended)
495      state.needReadable = true;
496
497    // If we tried to read() past the EOF, then emit end on the next tick.
498    if (nOrig !== n && state.ended)
499      endReadable(this);
500  }
501
502  if (ret !== null) {
503    state.dataEmitted = true;
504    this.emit('data', ret);
505  }
506
507  return ret;
508};
509
510function onEofChunk(stream, state) {
511  debug('onEofChunk');
512  if (state.ended) return;
513  if (state.decoder) {
514    const chunk = state.decoder.end();
515    if (chunk && chunk.length) {
516      state.buffer.push(chunk);
517      state.length += state.objectMode ? 1 : chunk.length;
518    }
519  }
520  state.ended = true;
521
522  if (state.sync) {
523    // If we are sync, wait until next tick to emit the data.
524    // Otherwise we risk emitting data in the flow()
525    // the readable code triggers during a read() call.
526    emitReadable(stream);
527  } else {
528    // Emit 'readable' now to make sure it gets picked up.
529    state.needReadable = false;
530    state.emittedReadable = true;
531    // We have to emit readable now that we are EOF. Modules
532    // in the ecosystem (e.g. dicer) rely on this event being sync.
533    emitReadable_(stream);
534  }
535}
536
537// Don't emit readable right away in sync mode, because this can trigger
538// another read() call => stack overflow.  This way, it might trigger
539// a nextTick recursion warning, but that's not so bad.
540function emitReadable(stream) {
541  const state = stream._readableState;
542  debug('emitReadable', state.needReadable, state.emittedReadable);
543  state.needReadable = false;
544  if (!state.emittedReadable) {
545    debug('emitReadable', state.flowing);
546    state.emittedReadable = true;
547    process.nextTick(emitReadable_, stream);
548  }
549}
550
551function emitReadable_(stream) {
552  const state = stream._readableState;
553  debug('emitReadable_', state.destroyed, state.length, state.ended);
554  if (!state.destroyed && (state.length || state.ended)) {
555    stream.emit('readable');
556    state.emittedReadable = false;
557  }
558
559  // The stream needs another readable event if:
560  // 1. It is not flowing, as the flow mechanism will take
561  //    care of it.
562  // 2. It is not ended.
563  // 3. It is below the highWaterMark, so we can schedule
564  //    another readable later.
565  state.needReadable =
566    !state.flowing &&
567    !state.ended &&
568    state.length <= state.highWaterMark;
569  flow(stream);
570}
571
572
573// At this point, the user has presumably seen the 'readable' event,
574// and called read() to consume some data.  that may have triggered
575// in turn another _read(n) call, in which case reading = true if
576// it's in progress.
577// However, if we're not ended, or reading, and the length < hwm,
578// then go ahead and try to read some more preemptively.
579function maybeReadMore(stream, state) {
580  if (!state.readingMore) {
581    state.readingMore = true;
582    process.nextTick(maybeReadMore_, stream, state);
583  }
584}
585
586function maybeReadMore_(stream, state) {
587  // Attempt to read more data if we should.
588  //
589  // The conditions for reading more data are (one of):
590  // - Not enough data buffered (state.length < state.highWaterMark). The loop
591  //   is responsible for filling the buffer with enough data if such data
592  //   is available. If highWaterMark is 0 and we are not in the flowing mode
593  //   we should _not_ attempt to buffer any extra data. We'll get more data
594  //   when the stream consumer calls read() instead.
595  // - No data in the buffer, and the stream is in flowing mode. In this mode
596  //   the loop below is responsible for ensuring read() is called. Failing to
597  //   call read here would abort the flow and there's no other mechanism for
598  //   continuing the flow if the stream consumer has just subscribed to the
599  //   'data' event.
600  //
601  // In addition to the above conditions to keep reading data, the following
602  // conditions prevent the data from being read:
603  // - The stream has ended (state.ended).
604  // - There is already a pending 'read' operation (state.reading). This is a
605  //   case where the stream has called the implementation defined _read()
606  //   method, but they are processing the call asynchronously and have _not_
607  //   called push() with new data. In this case we skip performing more
608  //   read()s. The execution ends in this method again after the _read() ends
609  //   up calling push() with more data.
610  while (!state.reading && !state.ended &&
611         (state.length < state.highWaterMark ||
612          (state.flowing && state.length === 0))) {
613    const len = state.length;
614    debug('maybeReadMore read 0');
615    stream.read(0);
616    if (len === state.length)
617      // Didn't get any data, stop spinning.
618      break;
619  }
620  state.readingMore = false;
621}
622
623// Abstract method.  to be overridden in specific implementation classes.
624// call cb(er, data) where data is <= n in length.
625// for virtual (non-string, non-buffer) streams, "length" is somewhat
626// arbitrary, and perhaps not very meaningful.
627Readable.prototype._read = function(n) {
628  throw new ERR_METHOD_NOT_IMPLEMENTED('_read()');
629};
630
631Readable.prototype.pipe = function(dest, pipeOpts) {
632  const src = this;
633  const state = this._readableState;
634
635  if (state.pipes.length === 1) {
636    if (!state.multiAwaitDrain) {
637      state.multiAwaitDrain = true;
638      state.awaitDrainWriters = new Set(
639        state.awaitDrainWriters ? [state.awaitDrainWriters] : []
640      );
641    }
642  }
643
644  state.pipes.push(dest);
645  debug('pipe count=%d opts=%j', state.pipes.length, pipeOpts);
646
647  const doEnd = (!pipeOpts || pipeOpts.end !== false) &&
648              dest !== process.stdout &&
649              dest !== process.stderr;
650
651  const endFn = doEnd ? onend : unpipe;
652  if (state.endEmitted)
653    process.nextTick(endFn);
654  else
655    src.once('end', endFn);
656
657  dest.on('unpipe', onunpipe);
658  function onunpipe(readable, unpipeInfo) {
659    debug('onunpipe');
660    if (readable === src) {
661      if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
662        unpipeInfo.hasUnpiped = true;
663        cleanup();
664      }
665    }
666  }
667
668  function onend() {
669    debug('onend');
670    dest.end();
671  }
672
673  let ondrain;
674
675  let cleanedUp = false;
676  function cleanup() {
677    debug('cleanup');
678    // Cleanup event handlers once the pipe is broken.
679    dest.removeListener('close', onclose);
680    dest.removeListener('finish', onfinish);
681    if (ondrain) {
682      dest.removeListener('drain', ondrain);
683    }
684    dest.removeListener('error', onerror);
685    dest.removeListener('unpipe', onunpipe);
686    src.removeListener('end', onend);
687    src.removeListener('end', unpipe);
688    src.removeListener('data', ondata);
689
690    cleanedUp = true;
691
692    // If the reader is waiting for a drain event from this
693    // specific writer, then it would cause it to never start
694    // flowing again.
695    // So, if this is awaiting a drain, then we just call it now.
696    // If we don't know, then assume that we are waiting for one.
697    if (ondrain && state.awaitDrainWriters &&
698        (!dest._writableState || dest._writableState.needDrain))
699      ondrain();
700  }
701
702  function pause() {
703    // If the user unpiped during `dest.write()`, it is possible
704    // to get stuck in a permanently paused state if that write
705    // also returned false.
706    // => Check whether `dest` is still a piping destination.
707    if (!cleanedUp) {
708      if (state.pipes.length === 1 && state.pipes[0] === dest) {
709        debug('false write response, pause', 0);
710        state.awaitDrainWriters = dest;
711        state.multiAwaitDrain = false;
712      } else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
713        debug('false write response, pause', state.awaitDrainWriters.size);
714        state.awaitDrainWriters.add(dest);
715      }
716      src.pause();
717    }
718    if (!ondrain) {
719      // When the dest drains, it reduces the awaitDrain counter
720      // on the source.  This would be more elegant with a .once()
721      // handler in flow(), but adding and removing repeatedly is
722      // too slow.
723      ondrain = pipeOnDrain(src, dest);
724      dest.on('drain', ondrain);
725    }
726  }
727
728  src.on('data', ondata);
729  function ondata(chunk) {
730    debug('ondata');
731    const ret = dest.write(chunk);
732    debug('dest.write', ret);
733    if (ret === false) {
734      pause();
735    }
736  }
737
738  // If the dest has an error, then stop piping into it.
739  // However, don't suppress the throwing behavior for this.
740  function onerror(er) {
741    debug('onerror', er);
742    unpipe();
743    dest.removeListener('error', onerror);
744    if (EE.listenerCount(dest, 'error') === 0) {
745      const s = dest._writableState || dest._readableState;
746      if (s && !s.errorEmitted) {
747        // User incorrectly emitted 'error' directly on the stream.
748        errorOrDestroy(dest, er);
749      } else {
750        dest.emit('error', er);
751      }
752    }
753  }
754
755  // Make sure our error handler is attached before userland ones.
756  prependListener(dest, 'error', onerror);
757
758  // Both close and finish should trigger unpipe, but only once.
759  function onclose() {
760    dest.removeListener('finish', onfinish);
761    unpipe();
762  }
763  dest.once('close', onclose);
764  function onfinish() {
765    debug('onfinish');
766    dest.removeListener('close', onclose);
767    unpipe();
768  }
769  dest.once('finish', onfinish);
770
771  function unpipe() {
772    debug('unpipe');
773    src.unpipe(dest);
774  }
775
776  // Tell the dest that it's being piped to.
777  dest.emit('pipe', src);
778
779  // Start the flow if it hasn't been started already.
780
781  if (dest.writableNeedDrain === true) {
782    if (state.flowing) {
783      pause();
784    }
785  } else if (!state.flowing) {
786    debug('pipe resume');
787    src.resume();
788  }
789
790  return dest;
791};
792
793function pipeOnDrain(src, dest) {
794  return function pipeOnDrainFunctionResult() {
795    const state = src._readableState;
796
797    // `ondrain` will call directly,
798    // `this` maybe not a reference to dest,
799    // so we use the real dest here.
800    if (state.awaitDrainWriters === dest) {
801      debug('pipeOnDrain', 1);
802      state.awaitDrainWriters = null;
803    } else if (state.multiAwaitDrain) {
804      debug('pipeOnDrain', state.awaitDrainWriters.size);
805      state.awaitDrainWriters.delete(dest);
806    }
807
808    if ((!state.awaitDrainWriters || state.awaitDrainWriters.size === 0) &&
809      EE.listenerCount(src, 'data')) {
810      src.resume();
811    }
812  };
813}
814
815
816Readable.prototype.unpipe = function(dest) {
817  const state = this._readableState;
818  const unpipeInfo = { hasUnpiped: false };
819
820  // If we're not piping anywhere, then do nothing.
821  if (state.pipes.length === 0)
822    return this;
823
824  if (!dest) {
825    // remove all.
826    const dests = state.pipes;
827    state.pipes = [];
828    this.pause();
829
830    for (let i = 0; i < dests.length; i++)
831      dests[i].emit('unpipe', this, { hasUnpiped: false });
832    return this;
833  }
834
835  // Try to find the right one.
836  const index = state.pipes.indexOf(dest);
837  if (index === -1)
838    return this;
839
840  state.pipes.splice(index, 1);
841  if (state.pipes.length === 0)
842    this.pause();
843
844  dest.emit('unpipe', this, unpipeInfo);
845
846  return this;
847};
848
849// Set up data events if they are asked for
850// Ensure readable listeners eventually get something.
851Readable.prototype.on = function(ev, fn) {
852  const res = Stream.prototype.on.call(this, ev, fn);
853  const state = this._readableState;
854
855  if (ev === 'data') {
856    // Update readableListening so that resume() may be a no-op
857    // a few lines down. This is needed to support once('readable').
858    state.readableListening = this.listenerCount('readable') > 0;
859
860    // Try start flowing on next tick if stream isn't explicitly paused.
861    if (state.flowing !== false)
862      this.resume();
863  } else if (ev === 'readable') {
864    if (!state.endEmitted && !state.readableListening) {
865      state.readableListening = state.needReadable = true;
866      state.flowing = false;
867      state.emittedReadable = false;
868      debug('on readable', state.length, state.reading);
869      if (state.length) {
870        emitReadable(this);
871      } else if (!state.reading) {
872        process.nextTick(nReadingNextTick, this);
873      }
874    }
875  }
876
877  return res;
878};
879Readable.prototype.addListener = Readable.prototype.on;
880
881Readable.prototype.removeListener = function(ev, fn) {
882  const res = Stream.prototype.removeListener.call(this, ev, fn);
883
884  if (ev === 'readable') {
885    // We need to check if there is someone still listening to
886    // readable and reset the state. However this needs to happen
887    // after readable has been emitted but before I/O (nextTick) to
888    // support once('readable', fn) cycles. This means that calling
889    // resume within the same tick will have no
890    // effect.
891    process.nextTick(updateReadableListening, this);
892  }
893
894  return res;
895};
896Readable.prototype.off = Readable.prototype.removeListener;
897
898Readable.prototype.removeAllListeners = function(ev) {
899  const res = Stream.prototype.removeAllListeners.apply(this, arguments);
900
901  if (ev === 'readable' || ev === undefined) {
902    // We need to check if there is someone still listening to
903    // readable and reset the state. However this needs to happen
904    // after readable has been emitted but before I/O (nextTick) to
905    // support once('readable', fn) cycles. This means that calling
906    // resume within the same tick will have no
907    // effect.
908    process.nextTick(updateReadableListening, this);
909  }
910
911  return res;
912};
913
914function updateReadableListening(self) {
915  const state = self._readableState;
916  state.readableListening = self.listenerCount('readable') > 0;
917
918  if (state.resumeScheduled && state[kPaused] === false) {
919    // Flowing needs to be set to true now, otherwise
920    // the upcoming resume will not flow.
921    state.flowing = true;
922
923    // Crude way to check if we should resume.
924  } else if (self.listenerCount('data') > 0) {
925    self.resume();
926  } else if (!state.readableListening) {
927    state.flowing = null;
928  }
929}
930
931function nReadingNextTick(self) {
932  debug('readable nexttick read 0');
933  self.read(0);
934}
935
936// pause() and resume() are remnants of the legacy readable stream API
937// If the user uses them, then switch into old mode.
938Readable.prototype.resume = function() {
939  const state = this._readableState;
940  if (!state.flowing) {
941    debug('resume');
942    // We flow only if there is no one listening
943    // for readable, but we still have to call
944    // resume().
945    state.flowing = !state.readableListening;
946    resume(this, state);
947  }
948  state[kPaused] = false;
949  return this;
950};
951
952function resume(stream, state) {
953  if (!state.resumeScheduled) {
954    state.resumeScheduled = true;
955    process.nextTick(resume_, stream, state);
956  }
957}
958
959function resume_(stream, state) {
960  debug('resume', state.reading);
961  if (!state.reading) {
962    stream.read(0);
963  }
964
965  state.resumeScheduled = false;
966  stream.emit('resume');
967  flow(stream);
968  if (state.flowing && !state.reading)
969    stream.read(0);
970}
971
972Readable.prototype.pause = function() {
973  debug('call pause flowing=%j', this._readableState.flowing);
974  if (this._readableState.flowing !== false) {
975    debug('pause');
976    this._readableState.flowing = false;
977    this.emit('pause');
978  }
979  this._readableState[kPaused] = true;
980  return this;
981};
982
983function flow(stream) {
984  const state = stream._readableState;
985  debug('flow', state.flowing);
986  while (state.flowing && stream.read() !== null);
987}
988
989// Wrap an old-style stream as the async data source.
990// This is *not* part of the readable stream interface.
991// It is an ugly unfortunate mess of history.
992Readable.prototype.wrap = function(stream) {
993  const state = this._readableState;
994  let paused = false;
995
996  stream.on('end', () => {
997    debug('wrapped end');
998    if (state.decoder && !state.ended) {
999      const chunk = state.decoder.end();
1000      if (chunk && chunk.length)
1001        this.push(chunk);
1002    }
1003
1004    this.push(null);
1005  });
1006
1007  stream.on('data', (chunk) => {
1008    debug('wrapped data');
1009    if (state.decoder)
1010      chunk = state.decoder.write(chunk);
1011
1012    // Don't skip over falsy values in objectMode.
1013    if (state.objectMode && (chunk === null || chunk === undefined))
1014      return;
1015    else if (!state.objectMode && (!chunk || !chunk.length))
1016      return;
1017
1018    const ret = this.push(chunk);
1019    if (!ret) {
1020      paused = true;
1021      stream.pause();
1022    }
1023  });
1024
1025  // Proxy all the other methods. Important when wrapping filters and duplexes.
1026  for (const i in stream) {
1027    if (this[i] === undefined && typeof stream[i] === 'function') {
1028      this[i] = function methodWrap(method) {
1029        return function methodWrapReturnFunction() {
1030          return stream[method].apply(stream, arguments);
1031        };
1032      }(i);
1033    }
1034  }
1035
1036  stream.on('error', (err) => {
1037    errorOrDestroy(this, err);
1038  });
1039
1040  stream.on('close', () => {
1041    // TODO(ronag): Update readable state?
1042    this.emit('close');
1043  });
1044
1045  stream.on('destroy', () => {
1046    // TODO(ronag): this.destroy()?
1047    this.emit('destroy');
1048  });
1049
1050  stream.on('pause', () => {
1051    // TODO(ronag): this.pause()?
1052    this.emit('pause');
1053  });
1054
1055  stream.on('resume', () => {
1056    // TODO(ronag): this.resume()?
1057    this.emit('resume');
1058  });
1059
1060  // When we try to consume some more bytes, simply unpause the
1061  // underlying stream.
1062  this._read = (n) => {
1063    debug('wrapped _read', n);
1064    if (paused) {
1065      paused = false;
1066      stream.resume();
1067    }
1068  };
1069
1070  return this;
1071};
1072
1073Readable.prototype[SymbolAsyncIterator] = function() {
1074  let stream = this;
1075
1076  if (typeof stream.read !== 'function') {
1077    // v1 stream
1078    const src = stream;
1079    stream = new Readable({
1080      objectMode: true,
1081      destroy(err, callback) {
1082        destroyImpl.destroyer(src, err);
1083        callback(err);
1084      }
1085    }).wrap(src);
1086  }
1087
1088  const iter = createAsyncIterator(stream);
1089  iter.stream = stream;
1090  return iter;
1091};
1092
1093async function* createAsyncIterator(stream) {
1094  let callback = nop;
1095
1096  function next(resolve) {
1097    if (this === stream) {
1098      callback();
1099      callback = nop;
1100    } else {
1101      callback = resolve;
1102    }
1103  }
1104
1105  const state = stream._readableState;
1106
1107  let error = state.errored;
1108  let errorEmitted = state.errorEmitted;
1109  let endEmitted = state.endEmitted;
1110  let closeEmitted = state.closeEmitted;
1111
1112  stream
1113    .on('readable', next)
1114    .on('error', function(err) {
1115      error = err;
1116      errorEmitted = true;
1117      next.call(this);
1118    })
1119    .on('end', function() {
1120      endEmitted = true;
1121      next.call(this);
1122    })
1123    .on('close', function() {
1124      closeEmitted = true;
1125      next.call(this);
1126    });
1127
1128  try {
1129    while (true) {
1130      const chunk = stream.destroyed ? null : stream.read();
1131      if (chunk !== null) {
1132        yield chunk;
1133      } else if (errorEmitted) {
1134        throw error;
1135      } else if (endEmitted) {
1136        break;
1137      } else if (closeEmitted) {
1138        break;
1139      } else {
1140        await new Promise(next);
1141      }
1142    }
1143  } catch (err) {
1144    destroyImpl.destroyer(stream, err);
1145    throw err;
1146  } finally {
1147    if (state.autoDestroy || !endEmitted) {
1148      // TODO(ronag): ERR_PREMATURE_CLOSE?
1149      destroyImpl.destroyer(stream, null);
1150    }
1151  }
1152}
1153
1154// Making it explicit these properties are not enumerable
1155// because otherwise some prototype manipulation in
1156// userland will fail.
1157ObjectDefineProperties(Readable.prototype, {
1158  readable: {
1159    get() {
1160      const r = this._readableState;
1161      // r.readable === false means that this is part of a Duplex stream
1162      // where the readable side was disabled upon construction.
1163      // Compat. The user might manually disable readable side through
1164      // deprecated setter.
1165      return !!r && r.readable !== false && !r.destroyed && !r.errorEmitted &&
1166        !r.endEmitted;
1167    },
1168    set(val) {
1169      // Backwards compat.
1170      if (this._readableState) {
1171        this._readableState.readable = !!val;
1172      }
1173    }
1174  },
1175
1176  readableDidRead: {
1177    enumerable: false,
1178    get: function() {
1179      return (
1180        this._readableState.dataEmitted ||
1181        this._readableState.endEmitted ||
1182        this._readableState.errorEmitted ||
1183        this._readableState.closeEmitted
1184      );
1185    }
1186  },
1187
1188  readableHighWaterMark: {
1189    enumerable: false,
1190    get: function() {
1191      return this._readableState.highWaterMark;
1192    }
1193  },
1194
1195  readableBuffer: {
1196    enumerable: false,
1197    get: function() {
1198      return this._readableState && this._readableState.buffer;
1199    }
1200  },
1201
1202  readableFlowing: {
1203    enumerable: false,
1204    get: function() {
1205      return this._readableState.flowing;
1206    },
1207    set: function(state) {
1208      if (this._readableState) {
1209        this._readableState.flowing = state;
1210      }
1211    }
1212  },
1213
1214  readableLength: {
1215    enumerable: false,
1216    get() {
1217      return this._readableState.length;
1218    }
1219  },
1220
1221  readableObjectMode: {
1222    enumerable: false,
1223    get() {
1224      return this._readableState ? this._readableState.objectMode : false;
1225    }
1226  },
1227
1228  readableEncoding: {
1229    enumerable: false,
1230    get() {
1231      return this._readableState ? this._readableState.encoding : null;
1232    }
1233  },
1234
1235  destroyed: {
1236    enumerable: false,
1237    get() {
1238      if (this._readableState === undefined) {
1239        return false;
1240      }
1241      return this._readableState.destroyed;
1242    },
1243    set(value) {
1244      // We ignore the value if the stream
1245      // has not been initialized yet.
1246      if (!this._readableState) {
1247        return;
1248      }
1249
1250      // Backward compatibility, the user is explicitly
1251      // managing destroyed.
1252      this._readableState.destroyed = value;
1253    }
1254  },
1255
1256  readableEnded: {
1257    enumerable: false,
1258    get() {
1259      return this._readableState ? this._readableState.endEmitted : false;
1260    }
1261  },
1262
1263});
1264
1265ObjectDefineProperties(ReadableState.prototype, {
1266  // Legacy getter for `pipesCount`.
1267  pipesCount: {
1268    get() {
1269      return this.pipes.length;
1270    }
1271  },
1272
1273  // Legacy property for `paused`.
1274  paused: {
1275    get() {
1276      return this[kPaused] !== false;
1277    },
1278    set(value) {
1279      this[kPaused] = !!value;
1280    }
1281  }
1282});
1283
1284// Exposed for testing purposes only.
1285Readable._fromList = fromList;
1286
1287// Pluck off n bytes from an array of buffers.
1288// Length is the combined lengths of all the buffers in the list.
1289// This function is designed to be inlinable, so please take care when making
1290// changes to the function body.
1291function fromList(n, state) {
1292  // nothing buffered.
1293  if (state.length === 0)
1294    return null;
1295
1296  let ret;
1297  if (state.objectMode)
1298    ret = state.buffer.shift();
1299  else if (!n || n >= state.length) {
1300    // Read it all, truncate the list.
1301    if (state.decoder)
1302      ret = state.buffer.join('');
1303    else if (state.buffer.length === 1)
1304      ret = state.buffer.first();
1305    else
1306      ret = state.buffer.concat(state.length);
1307    state.buffer.clear();
1308  } else {
1309    // read part of list.
1310    ret = state.buffer.consume(n, state.decoder);
1311  }
1312
1313  return ret;
1314}
1315
1316function endReadable(stream) {
1317  const state = stream._readableState;
1318
1319  debug('endReadable', state.endEmitted);
1320  if (!state.endEmitted) {
1321    state.ended = true;
1322    process.nextTick(endReadableNT, state, stream);
1323  }
1324}
1325
1326function endReadableNT(state, stream) {
1327  debug('endReadableNT', state.endEmitted, state.length);
1328
1329  // Check that we didn't get one last unshift.
1330  if (!state.errorEmitted && !state.closeEmitted &&
1331      !state.endEmitted && state.length === 0) {
1332    state.endEmitted = true;
1333    stream.emit('end');
1334
1335    if (stream.writable && stream.allowHalfOpen === false) {
1336      process.nextTick(endWritableNT, stream);
1337    } else if (state.autoDestroy) {
1338      // In case of duplex streams we need a way to detect
1339      // if the writable side is ready for autoDestroy as well.
1340      const wState = stream._writableState;
1341      const autoDestroy = !wState || (
1342        wState.autoDestroy &&
1343        // We don't expect the writable to ever 'finish'
1344        // if writable is explicitly set to false.
1345        (wState.finished || wState.writable === false)
1346      );
1347
1348      if (autoDestroy) {
1349        stream.destroy();
1350      }
1351    }
1352  }
1353}
1354
1355function endWritableNT(stream) {
1356  const writable = stream.writable && !stream.writableEnded &&
1357    !stream.destroyed;
1358  if (writable) {
1359    stream.end();
1360  }
1361}
1362
1363Readable.from = function(iterable, opts) {
1364  if (from === undefined) {
1365    from = require('internal/streams/from');
1366  }
1367  return from(Readable, iterable, opts);
1368};
1369