• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright Joyent, Inc. and other Node contributors.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the
5// "Software"), to deal in the Software without restriction, including
6// without limitation the rights to use, copy, modify, merge, publish,
7// distribute, sublicense, and/or sell copies of the Software, and to permit
8// persons to whom the Software is furnished to do so, subject to the
9// following conditions:
10//
11// The above copyright notice and this permission notice shall be included
12// in all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20// USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22'use strict';
23
24const {
25  ArrayIsArray,
26  NumberIsInteger,
27  NumberIsNaN,
28  ObjectDefineProperties,
29  ObjectSetPrototypeOf,
30  Set,
31  SymbolAsyncIterator,
32  Symbol
33} = primordials;
34
// Public entry point of this module. `Readable` and `ReadableState` are
// function declarations further down in this file, so hoisting makes them
// usable here before their definitions.
module.exports = Readable;
// Expose the state constructor as a static property of Readable.
Readable.ReadableState = ReadableState;
37
38const EE = require('events');
39const Stream = require('stream');
40const { Buffer } = require('buffer');
41
42let debug = require('internal/util/debuglog').debuglog('stream', (fn) => {
43  debug = fn;
44});
45const BufferList = require('internal/streams/buffer_list');
46const destroyImpl = require('internal/streams/destroy');
47const {
48  getHighWaterMark,
49  getDefaultHighWaterMark
50} = require('internal/streams/state');
51const {
52  ERR_INVALID_ARG_TYPE,
53  ERR_STREAM_PUSH_AFTER_EOF,
54  ERR_METHOD_NOT_IMPLEMENTED,
55  ERR_STREAM_UNSHIFT_AFTER_END_EVENT
56} = require('internal/errors').codes;
57
// Symbol key for the explicit pause flag kept on ReadableState. It is
// initialised to null by ReadableState, set to false by resume() and
// tested by isPaused().
const kPaused = Symbol('kPaused');

// Lazy loaded to improve the startup performance.
// StringDecoder is required on first use by ReadableState and setEncoding().
let StringDecoder;
let createReadableStreamAsyncIterator;
let from;

// Readable inherits from Stream, both for instances (prototype chain) and
// for the constructors themselves (static members).
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);

// Helper from the shared destroy implementation; this file calls it whenever
// it needs to report an error on a stream.
const { errorOrDestroy } = destroyImpl;
69
// Registers `fn` for `event` on `emitter` so that it runs before any
// listener that is already attached.
// Returns whatever emitter.prependListener() returns when that method
// exists, and undefined when the fallback path is taken.
function prependListener(emitter, event, fn) {
  // The capability check is repeated on every call: some libraries bundle
  // their own event emitter implementation, so the answer is not cacheable.
  if (typeof emitter.prependListener === 'function')
    return emitter.prependListener(event, fn);

  // Fallback for emitters without prependListener(). This reaches into the
  // private `_events` map and is a hack - NEVER DO THIS elsewhere. It only
  // exists so this code keeps working with older versions of Node.js; the
  // goal is to eventually remove it.
  const registry = emitter._events;
  const current = registry && registry[event];

  if (!current) {
    // Nothing registered yet, so a plain on() already ends up first.
    emitter.on(event, fn);
  } else if (ArrayIsArray(current)) {
    current.unshift(fn);
  } else {
    // A single listener is stored bare; promote it to an array behind ours.
    registry[event] = [fn, current];
  }
}
87
// Constructor for the bookkeeping object stored on every Readable as
// `_readableState`.
//   options  - stream options; may be missing
//   stream   - the owning stream, only inspected to detect a Duplex
//   isDuplex - optional boolean; computed from `stream` when not a boolean
function ReadableState(options, stream, isDuplex) {
  // A Duplex stream hands one options object to both its readable and its
  // writable side. Where the two sides need different values, they are
  // supplied separately as readableXXX and writableXXX.
  if (typeof isDuplex !== 'boolean')
    isDuplex = stream instanceof Stream.Duplex;

  // Stand-in for a missing options object so the lookups below do not each
  // need their own guard.
  const opts = options || {};

  // Object stream flag. Used to make read(n) ignore n and to make all the
  // buffer merging and length checks go away.
  this.objectMode = !!opts.objectMode;

  // On a Duplex, readableObjectMode may enable object mode for this side.
  if (isDuplex)
    this.objectMode = this.objectMode || !!opts.readableObjectMode;

  // The point at which it stops calling _read() to fill the buffer.
  // Note: 0 is a valid value, means "don't call _read preemptively ever".
  if (options) {
    this.highWaterMark =
      getHighWaterMark(this, options, 'readableHighWaterMark', isDuplex);
  } else {
    this.highWaterMark = getDefaultHighWaterMark(false);
  }

  // Chunks live in a linked list rather than an array because removing
  // from the front of the list is faster than array.shift().
  this.buffer = new BufferList();
  this.length = 0;
  this.pipes = null;
  this.pipesCount = 0;
  this.flowing = null;
  this.ended = false;
  this.endEmitted = false;
  this.reading = false;

  // Tells whether 'readable'/'data' would be emitted immediately or on a
  // later tick. It starts out true because anything that must wait until
  // "later" should generally also not happen before the first read call.
  this.sync = true;

  // Set whenever null is returned, to record that a 'readable' event
  // emission is being awaited.
  this.needReadable = false;
  this.emittedReadable = false;
  this.readableListening = false;
  this.resumeScheduled = false;
  this[kPaused] = null;

  // Should close be emitted on destroy. Defaults to true.
  this.emitClose = opts.emitClose !== false;

  // Should .destroy() be called after 'end' (and potentially 'finish').
  this.autoDestroy = !!opts.autoDestroy;

  // Has it been destroyed.
  this.destroyed = false;

  // Crypto is kind of old and crusty. Historically, its default string
  // encoding is 'binary' so we have to make this configurable.
  // Everything else in the universe uses 'utf8', though.
  this.defaultEncoding = opts.defaultEncoding || 'utf8';

  // Piped destination(s) a 'drain' event is still awaited from.
  // type: null | Writable | Set<Writable>
  this.awaitDrainWriters = null;
  this.multiAwaitDrain = false;

  // If true, a maybeReadMore has been scheduled.
  this.readingMore = false;

  // A decoder is only created when an encoding was requested.
  this.decoder = null;
  this.encoding = null;
  if (opts.encoding) {
    if (!StringDecoder)
      StringDecoder = require('string_decoder').StringDecoder;
    this.decoder = new StringDecoder(opts.encoding);
    this.encoding = opts.encoding;
  }
}
168
169
// Readable stream constructor. Works with or without `new`.
//   options - optional; forwarded to ReadableState and to Stream. Function
//             valued `read` / `destroy` entries are installed on the
//             instance as `_read` / `_destroy`.
function Readable(options) {
  // Called as a plain function: build a proper instance instead.
  if (!(this instanceof Readable))
    return new Readable(options);

  // The Stream.Duplex check is done here and handed over, because that is
  // faster than doing it inside the ReadableState constructor, at least
  // with V8 6.5.
  const isDuplex = this instanceof Stream.Duplex;
  this._readableState = new ReadableState(options, this, isDuplex);

  // legacy
  this.readable = true;

  // Implementations passed through options become own properties of this
  // instance; anything that is not a function is ignored.
  if (options) {
    const { read, destroy } = options;
    if (typeof read === 'function')
      this._read = read;
    if (typeof destroy === 'function')
      this._destroy = destroy;
  }

  Stream.call(this, options);
}
193
// destroy() and _undestroy() are taken as-is from the shared destroy
// implementation module.
Readable.prototype.destroy = destroyImpl.destroy;
Readable.prototype._undestroy = destroyImpl.undestroy;
// Default _destroy hook: does no work of its own and hands `err` straight
// to the callback. A `destroy` function passed in the constructor options
// replaces it on that instance.
Readable.prototype._destroy = function(err, cb) {
  cb(err);
};
199
// Handler stored under EE.captureRejectionSymbol: destroys the stream with
// `err`, unless the stream is already flagged as destroyed.
Readable.prototype[EE.captureRejectionSymbol] = function(err) {
  // TODO(mcollina): remove the destroyed if once errorEmitted lands in
  // Readable.
  if (this.destroyed)
    return;

  this.destroy(err);
};
207
// Manually shove something into the read() buffer.
// This returns true if the highWaterMark has not been hit yet,
// similar to how Writable.write() returns true if you should
// write() some more.
//   chunk    - data to append; null is the EOF marker (see readableAddChunk)
//   encoding - encoding of `chunk` when it is a string
// Delegates to readableAddChunk() with addToFront = false.
Readable.prototype.push = function(chunk, encoding) {
  return readableAddChunk(this, chunk, encoding, false);
};
215
// Unshift should *always* be something directly out of read()
// Puts `chunk` back at the front of the buffer by delegating to
// readableAddChunk() with addToFront = true; returns that call's result.
Readable.prototype.unshift = function(chunk, encoding) {
  return readableAddChunk(this, chunk, encoding, true);
};
220
// Shared implementation of push() (addToFront = false) and unshift()
// (addToFront = true).
//   stream     - the Readable being fed
//   chunk      - string, Buffer, Uint8Array, or any value in object mode;
//                null signals EOF
//   encoding   - encoding of `chunk` when it is a string
//   addToFront - true to insert at the head of the buffer
// Returns false immediately when pushing onto a destroyed stream; otherwise
// returns true while the stream has not ended and is either below its
// highWaterMark or empty.
function readableAddChunk(stream, chunk, encoding, addToFront) {
  debug('readableAddChunk', chunk);
  const state = stream._readableState;

  // Step 1 (byte mode only): validate / normalise the chunk. An empty
  // `encoding` afterwards marks the chunk as a Buffer that may still need
  // to go through state.decoder.
  let err;
  if (!state.objectMode) {
    if (typeof chunk === 'string') {
      encoding = encoding || state.defaultEncoding;
      if (addToFront && state.encoding && state.encoding !== encoding) {
        // When unshifting, if state.encoding is set, we have to save
        // the string in the BufferList with the state encoding
        chunk = Buffer.from(chunk, encoding).toString(state.encoding);
      } else if (encoding !== state.encoding) {
        // The string is not in the stream's encoding: store it as a Buffer.
        chunk = Buffer.from(chunk, encoding);
        encoding = '';
      }
    } else if (chunk instanceof Buffer) {
      encoding = '';
    } else if (Stream._isUint8Array(chunk)) {
      chunk = Stream._uint8ArrayToBuffer(chunk);
      encoding = '';
    } else if (chunk != null) {
      // null / undefined are let through; anything else is a type error.
      err = new ERR_INVALID_ARG_TYPE(
        'chunk', ['string', 'Buffer', 'Uint8Array'], chunk);
    }
  }

  // Step 2: dispatch on what we ended up with.
  if (err) {
    errorOrDestroy(stream, err);
  } else if (chunk === null) {
    // EOF marker.
    state.reading = false;
    onEofChunk(stream, state);
  } else if (state.objectMode || (chunk && chunk.length > 0)) {
    // Real data (in object mode every non-null value counts as data).
    if (addToFront) {
      if (state.endEmitted)
        errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());
      else
        addChunk(stream, state, chunk, true);
    } else if (state.ended) {
      errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
    } else if (state.destroyed) {
      // Data pushed after destroy is dropped and the caller is told to stop.
      return false;
    } else {
      state.reading = false;
      if (state.decoder && !encoding) {
        chunk = state.decoder.write(chunk);
        // The decoder may return an empty result; then there is nothing to
        // buffer, but more data may still be requested.
        if (state.objectMode || chunk.length !== 0)
          addChunk(stream, state, chunk, false);
        else
          maybeReadMore(stream, state);
      } else {
        addChunk(stream, state, chunk, false);
      }
    }
  } else if (!addToFront) {
    // Empty (or undefined) chunk pushed in byte mode: nothing to store, the
    // pending read is considered finished and more data may be requested.
    state.reading = false;
    maybeReadMore(stream, state);
  }

  // We can push more data if we are below the highWaterMark.
  // Also, if we have no data yet, we can stand some more bytes.
  // This is to work around cases where hwm=0, such as the repl.
  return !state.ended &&
    (state.length < state.highWaterMark || state.length === 0);
}
286
// Hands an already validated chunk to the consumer: either straight out as
// a 'data' event or into the internal buffer. Always schedules
// maybeReadMore() afterwards.
//   addToFront - when buffering, insert at the head instead of the tail
function addChunk(stream, state, chunk, addToFront) {
  // Direct delivery needs all of: flowing mode, an empty buffer, not being
  // inside a synchronous _read() call, and at least one 'data' listener.
  const deliverDirectly =
    state.flowing && state.length === 0 && !state.sync &&
    stream.listenerCount('data') > 0;

  if (!deliverDirectly) {
    // Update the buffer info. Object mode counts items, byte mode counts
    // the chunk's length.
    state.length += state.objectMode ? 1 : chunk.length;
    if (addToFront)
      state.buffer.unshift(chunk);
    else
      state.buffer.push(chunk);

    if (state.needReadable)
      emitReadable(stream);
  } else {
    // Reset the awaited-drain bookkeeping. With multiple pipes the existing
    // Set is cleared rather than replaced, to avoid creating `Set()`
    // repeatedly.
    if (state.multiAwaitDrain)
      state.awaitDrainWriters.clear();
    else
      state.awaitDrainWriters = null;

    stream.emit('data', chunk);
  }

  maybeReadMore(stream, state);
}
311
// Reports whether the stream counts as paused: either the explicit pause
// flag is exactly true, or flowing is exactly false. The initial null
// values of both do not count as paused.
Readable.prototype.isPaused = function() {
  const { [kPaused]: paused, flowing } = this._readableState;
  return paused === true || flowing === false;
};
316
// Backwards compatibility.
// Installs a StringDecoder for `enc` on the stream and converts whatever is
// already buffered into one decoded string. Returns the stream.
Readable.prototype.setEncoding = function(enc) {
  if (!StringDecoder)
    StringDecoder = require('string_decoder').StringDecoder;

  const state = this._readableState;
  const decoder = new StringDecoder(enc);
  state.decoder = decoder;
  // If setEncoding(null), decoder.encoding equals utf8
  state.encoding = decoder.encoding;

  // Run every chunk already stored through the new decoder and collapse
  // the results into a single string.
  const pending = state.buffer;
  let decoded = '';
  for (const piece of pending)
    decoded += decoder.write(piece);

  pending.clear();
  if (decoded !== '')
    pending.push(decoded);
  state.length = decoded.length;

  return this;
};
338
// Don't raise the hwm > 1GB
const MAX_HWM = 0x40000000;

// Returns the highWaterMark to adopt when `n` is requested: MAX_HWM when
// `n` is at or above that cap, otherwise `n` rounded up to a power of two.
function computeNewHighWaterMark(n) {
  if (n >= MAX_HWM) {
    // TODO(ronag): Throw ERR_VALUE_OUT_OF_RANGE.
    return MAX_HWM;
  }

  // Get the next highest power of 2 to prevent increasing hwm excessively
  // in tiny amounts: smear the top set bit of n - 1 into every lower bit
  // (shifts of 1, 2, 4, 8 and 16), then add one.
  n--;
  for (let shift = 1; shift <= 16; shift <<= 1)
    n |= n >>> shift;
  n++;
  return n;
}
358
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
// Works out how much a read(n) call may take from the buffer right now:
//   - 0 for n <= 0, or when the buffer is empty and the stream has ended
//   - 1 in object mode
//   - for NaN (no size requested): the first buffered chunk's length while
//     flowing with data available, otherwise everything buffered
//   - n when that much is buffered
//   - otherwise everything buffered once ended, or 0 while more may come
function howMuchToRead(n, state) {
  const { length, ended } = state;

  if (n <= 0 || (length === 0 && ended))
    return 0;
  if (state.objectMode)
    return 1;

  if (NumberIsNaN(n)) {
    // Only flow one buffer at a time
    return (state.flowing && length) ? state.buffer.first().length : length;
  }

  if (n <= length)
    return n;
  return ended ? length : 0;
}
376
// You can override either this method, or the async _read(n) below.
//
// Takes up to `n` bytes (or one item in object mode) out of the internal
// buffer, calling _read() first when the buffer should be refilled.
//   n - requested size. undefined becomes NaN, which howMuchToRead() treats
//       as "no size given"; any other non-integer goes through
//       parseInt(n, 10). read(0) never returns data and is used to trigger
//       a refill or a 'readable' event.
// Returns what fromList(n, state) produced, or null when nothing is taken.
// A non-null result is additionally emitted as a 'data' event just before
// it is returned.
Readable.prototype.read = function(n) {
  debug('read', n);
  // Same as parseInt(undefined, 10), however V8 7.3 performance regressed
  // in this scenario, so we are doing it manually.
  if (n === undefined) {
    n = NaN;
  } else if (!NumberIsInteger(n)) {
    n = parseInt(n, 10);
  }
  const state = this._readableState;
  // Keep the caller's request: `n` is overwritten below with the amount
  // that can actually be returned.
  const nOrig = n;

  // If we're asking for more than the current hwm, then raise the hwm.
  if (n > state.highWaterMark)
    state.highWaterMark = computeNewHighWaterMark(n);

  if (n !== 0)
    state.emittedReadable = false;

  // If we're doing read(0) to trigger a readable event, but we
  // already have a bunch of data in the buffer, then just trigger
  // the 'readable' event and move on.
  if (n === 0 &&
      state.needReadable &&
      ((state.highWaterMark !== 0 ?
        state.length >= state.highWaterMark :
        state.length > 0) ||
       state.ended)) {
    debug('read: emitReadable', state.length, state.ended);
    if (state.length === 0 && state.ended)
      endReadable(this);
    else
      emitReadable(this);
    return null;
  }

  n = howMuchToRead(n, state);

  // If we've ended, and we're now clear, then finish it up.
  if (n === 0 && state.ended) {
    if (state.length === 0)
      endReadable(this);
    return null;
  }

  // All the actual chunk generation logic needs to be
  // *below* the call to _read.  The reason is that in certain
  // synthetic stream cases, such as passthrough streams, _read
  // may be a completely synchronous operation which may change
  // the state of the read buffer, providing enough data when
  // before there was *not* enough.
  //
  // So, the steps are:
  // 1. Figure out what the state of things will be after we do
  // a read from the buffer.
  //
  // 2. If that resulting state will trigger a _read, then call _read.
  // Note that this may be asynchronous, or synchronous.  Yes, it is
  // deeply ugly to write APIs this way, but that still doesn't mean
  // that the Readable class should behave improperly, as streams are
  // designed to be sync/async agnostic.
  // Take note if the _read call is sync or async (ie, if the read call
  // has returned yet), so that we know whether or not it's safe to emit
  // 'readable' etc.
  //
  // 3. Actually pull the requested chunks out of the buffer and return.

  // if we need a readable event, then we need to do some reading.
  var doRead = state.needReadable;
  debug('need readable', doRead);

  // If we currently have less than the highWaterMark, then also read some
  if (state.length === 0 || state.length - n < state.highWaterMark) {
    doRead = true;
    debug('length less than watermark', doRead);
  }

  // However, if we've ended, then there's no point, if we're already
  // reading, then it's unnecessary, and if we're destroyed, then it's
  // not allowed.
  if (state.ended || state.reading || state.destroyed) {
    doRead = false;
    debug('reading or ended', doRead);
  } else if (doRead) {
    debug('do read');
    state.reading = true;
    // `sync` is true only for the duration of the _read() call, so that
    // push() can tell whether it is being called from inside it.
    state.sync = true;
    // If the length is currently zero, then we *need* a readable event.
    if (state.length === 0)
      state.needReadable = true;
    // Call internal read method
    this._read(state.highWaterMark);
    state.sync = false;
    // If _read pushed data synchronously, then `reading` will be false,
    // and we need to re-evaluate how much data we can return to the user.
    if (!state.reading)
      n = howMuchToRead(nOrig, state);
  }

  var ret;
  if (n > 0)
    ret = fromList(n, state);
  else
    ret = null;

  if (ret === null) {
    // Nothing handed out: ask for a 'readable' event unless the buffer is
    // already above the highWaterMark.
    state.needReadable = state.length <= state.highWaterMark;
    n = 0;
  } else {
    state.length -= n;
    // Data was consumed, so reset the awaited-drain bookkeeping (the Set is
    // cleared in place when several pipes are attached).
    if (state.multiAwaitDrain) {
      state.awaitDrainWriters.clear();
    } else {
      state.awaitDrainWriters = null;
    }
  }

  if (state.length === 0) {
    // If we have nothing in the buffer, then we want to know
    // as soon as we *do* get something into the buffer.
    if (!state.ended)
      state.needReadable = true;

    // If we tried to read() past the EOF, then emit end on the next tick.
    if (nOrig !== n && state.ended)
      endReadable(this);
  }

  if (ret !== null)
    this.emit('data', ret);

  return ret;
};
511
// Handles the EOF marker (a pushed null): flushes whatever the decoder
// still holds into the buffer, marks the stream as ended and makes sure a
// final 'readable' gets emitted. Does nothing if the stream already ended.
function onEofChunk(stream, state) {
  debug('onEofChunk');
  if (state.ended)
    return;

  const { decoder } = state;
  if (decoder) {
    // The decoder may still be holding an incomplete trailing sequence.
    const tail = decoder.end();
    if (tail && tail.length) {
      state.buffer.push(tail);
      state.length += state.objectMode ? 1 : tail.length;
    }
  }
  state.ended = true;

  if (!state.sync) {
    // Emit 'readable' now to make sure it gets picked up.
    state.needReadable = false;
    state.emittedReadable = true;
    // We have to emit readable now that we are EOF. Modules
    // in the ecosystem (e.g. dicer) rely on this event being sync.
    emitReadable_(stream);
    return;
  }

  // If we are sync, wait until next tick to emit the data.
  // Otherwise we risk emitting data in the flow()
  // the readable code triggers during a read() call
  emitReadable(stream);
}
538
// Don't emit readable right away in sync mode, because this can trigger
// another read() call => stack overflow.  This way, it might trigger
// a nextTick recursion warning, but that's not so bad.
//
// Clears needReadable and, unless an emission is already pending
// (emittedReadable), schedules emitReadable_() for the next tick.
function emitReadable(stream) {
  const state = stream._readableState;
  debug('emitReadable', state.needReadable, state.emittedReadable);
  state.needReadable = false;

  // An emission is already pending; do not queue a second one.
  if (state.emittedReadable)
    return;

  debug('emitReadable', state.flowing);
  state.emittedReadable = true;
  process.nextTick(emitReadable_, stream);
}
552
// Does the actual 'readable' emission: only when the stream is not
// destroyed and there is either buffered data or an EOF to report.
// Afterwards it recomputes needReadable and calls flow().
function emitReadable_(stream) {
  const state = stream._readableState;
  const { destroyed, length, ended } = state;
  debug('emitReadable_', destroyed, length, ended);

  if (!destroyed && (length || ended)) {
    stream.emit('readable');
    state.emittedReadable = false;
  }

  // The state is read afresh here because 'readable' listeners may have
  // changed it.
  // The stream needs another readable event if
  // 1. It is not flowing, as the flow mechanism will take
  //    care of it.
  // 2. It is not ended.
  // 3. It is below the highWaterMark, so we can schedule
  //    another readable later.
  state.needReadable =
    !(state.flowing || state.ended) &&
    state.length <= state.highWaterMark;
  flow(stream);
}
573
574
// At this point, the user has presumably seen the 'readable' event,
// and called read() to consume some data.  that may have triggered
// in turn another _read(n) call, in which case reading = true if
// it's in progress.
// However, if we're not ended, or reading, and the length < hwm,
// then go ahead and try to read some more preemptively.
//
// Schedules maybeReadMore_() on the next tick; the readingMore flag keeps
// it to one pending run at a time.
function maybeReadMore(stream, state) {
  if (state.readingMore)
    return;

  state.readingMore = true;
  process.nextTick(maybeReadMore_, stream, state);
}
587
// Keeps issuing read(0) for as long as that makes progress, then clears
// the readingMore flag.
function maybeReadMore_(stream, state) {
  // Attempt to read more data if we should.
  //
  // The conditions for reading more data are (one of):
  // - Not enough data buffered (state.length < state.highWaterMark). The loop
  //   is responsible for filling the buffer with enough data if such data
  //   is available. If highWaterMark is 0 and we are not in the flowing mode
  //   we should _not_ attempt to buffer any extra data. We'll get more data
  //   when the stream consumer calls read() instead.
  // - No data in the buffer, and the stream is in flowing mode. In this mode
  //   the loop below is responsible for ensuring read() is called. Failing to
  //   call read here would abort the flow and there's no other mechanism for
  //   continuing the flow if the stream consumer has just subscribed to the
  //   'data' event.
  //
  // In addition to the above conditions to keep reading data, the following
  // conditions prevent the data from being read:
  // - The stream has ended (state.ended).
  // - There is already a pending 'read' operation (state.reading). This is a
  //   case where the stream has called the implementation defined _read()
  //   method, but they are processing the call asynchronously and have _not_
  //   called push() with new data. In this case we skip performing more
  //   read()s. The execution ends in this method again after the _read() ends
  //   up calling push() with more data.
  for (;;) {
    if (state.reading || state.ended)
      break;

    const belowMark = state.length < state.highWaterMark;
    if (!belowMark && !(state.flowing && state.length === 0))
      break;

    const before = state.length;
    debug('maybeReadMore read 0');
    stream.read(0);
    // Didn't get any data, stop spinning.
    if (state.length === before)
      break;
  }
  state.readingMore = false;
}
624
// Abstract method.  to be overridden in specific implementation classes.
// call cb(er, data) where data is <= n in length.
// for virtual (non-string, non-buffer) streams, "length" is somewhat
// arbitrary, and perhaps not very meaningful.
//
// This default implementation ignores `n` and reports
// ERR_METHOD_NOT_IMPLEMENTED('_read()') through errorOrDestroy(). read()
// calls this method with the current highWaterMark.
Readable.prototype._read = function(n) {
  errorOrDestroy(this, new ERR_METHOD_NOT_IMPLEMENTED('_read()'));
};
632
// Connects this stream to the writable `dest`: every 'data' chunk is
// written to `dest`, and the source is paused whenever dest.write() returns
// false until `dest` emits 'drain'.
//   dest     - the destination to write into
//   pipeOpts - optional; `{ end: false }` keeps dest open when the source
//              ends
// Returns `dest`.
Readable.prototype.pipe = function(dest, pipeOpts) {
  const src = this;
  const state = this._readableState;

  // Going from one destination to two: switch the awaited-drain bookkeeping
  // from a single writer reference to a Set, carrying over the writer (if
  // any) that is currently awaited.
  if (state.pipesCount === 1) {
    if (!state.multiAwaitDrain) {
      state.multiAwaitDrain = true;
      state.awaitDrainWriters = new Set(
        state.awaitDrainWriters ? [state.awaitDrainWriters] : []
      );
    }
  }

  // state.pipes is null, a single destination, or an array of destinations,
  // depending on how many are attached.
  switch (state.pipesCount) {
    case 0:
      state.pipes = dest;
      break;
    case 1:
      state.pipes = [state.pipes, dest];
      break;
    default:
      state.pipes.push(dest);
      break;
  }
  state.pipesCount += 1;
  debug('pipe count=%d opts=%j', state.pipesCount, pipeOpts);

  // process.stdout and process.stderr are never ended by a pipe, and neither
  // is a destination piped with `end: false`.
  const doEnd = (!pipeOpts || pipeOpts.end !== false) &&
              dest !== process.stdout &&
              dest !== process.stderr;

  // When the source ends: either end the destination or merely unpipe it.
  // If 'end' was already emitted, run that on the next tick instead.
  const endFn = doEnd ? onend : unpipe;
  if (state.endEmitted)
    process.nextTick(endFn);
  else
    src.once('end', endFn);

  dest.on('unpipe', onunpipe);
  // Runs when dest emits 'unpipe'. Cleans up only for this source, and only
  // once per unpipe() call (tracked through unpipeInfo.hasUnpiped).
  function onunpipe(readable, unpipeInfo) {
    debug('onunpipe');
    if (readable === src) {
      if (unpipeInfo && unpipeInfo.hasUnpiped === false) {
        unpipeInfo.hasUnpiped = true;
        cleanup();
      }
    }
  }

  function onend() {
    debug('onend');
    dest.end();
  }

  // Created lazily, the first time dest.write() returns false (see ondata).
  let ondrain;

  var cleanedUp = false;
  // Detaches every listener this pipe() call installed on src and dest.
  function cleanup() {
    debug('cleanup');
    // Cleanup event handlers once the pipe is broken
    dest.removeListener('close', onclose);
    dest.removeListener('finish', onfinish);
    if (ondrain) {
      dest.removeListener('drain', ondrain);
    }
    dest.removeListener('error', onerror);
    dest.removeListener('unpipe', onunpipe);
    src.removeListener('end', onend);
    src.removeListener('end', unpipe);
    src.removeListener('data', ondata);

    cleanedUp = true;

    // If the reader is waiting for a drain event from this
    // specific writer, then it would cause it to never start
    // flowing again.
    // So, if this is awaiting a drain, then we just call it now.
    // If we don't know, then assume that we are waiting for one.
    if (ondrain && state.awaitDrainWriters &&
        (!dest._writableState || dest._writableState.needDrain))
      ondrain();
  }

  src.on('data', ondata);
  // Forwards one chunk to dest and applies backpressure when asked to.
  function ondata(chunk) {
    debug('ondata');
    const ret = dest.write(chunk);
    debug('dest.write', ret);
    if (ret === false) {
      // If the user unpiped during `dest.write()`, it is possible
      // to get stuck in a permanently paused state if that write
      // also returned false.
      // => Check whether `dest` is still a piping destination.
      if (!cleanedUp) {
        if (state.pipesCount === 1 && state.pipes === dest) {
          // Sole destination: remember it directly, no Set needed.
          debug('false write response, pause', 0);
          state.awaitDrainWriters = dest;
          state.multiAwaitDrain = false;
        } else if (state.pipesCount > 1 && state.pipes.includes(dest)) {
          debug('false write response, pause', state.awaitDrainWriters.size);
          state.awaitDrainWriters.add(dest);
        }
        src.pause();
      }
      if (!ondrain) {
        // When the dest drains, it reduces the awaitDrain counter
        // on the source.  This would be more elegant with a .once()
        // handler in flow(), but adding and removing repeatedly is
        // too slow.
        ondrain = pipeOnDrain(src, dest);
        dest.on('drain', ondrain);
      }
    }
  }

  // If the dest has an error, then stop piping into it.
  // However, don't suppress the throwing behavior for this.
  function onerror(er) {
    debug('onerror', er);
    unpipe();
    dest.removeListener('error', onerror);
    // With no other 'error' listener left on dest, report the error through
    // errorOrDestroy() rather than dropping it.
    if (EE.listenerCount(dest, 'error') === 0)
      errorOrDestroy(dest, er);
  }

  // Make sure our error handler is attached before userland ones.
  prependListener(dest, 'error', onerror);

  // Both close and finish should trigger unpipe, but only once.
  function onclose() {
    dest.removeListener('finish', onfinish);
    unpipe();
  }
  dest.once('close', onclose);
  function onfinish() {
    debug('onfinish');
    dest.removeListener('close', onclose);
    unpipe();
  }
  dest.once('finish', onfinish);

  function unpipe() {
    debug('unpipe');
    src.unpipe(dest);
  }

  // Tell the dest that it's being piped to
  dest.emit('pipe', src);

  // Start the flow if it hasn't been started already.
  if (!state.flowing) {
    debug('pipe resume');
    src.resume();
  }

  return dest;
};
789
// Builds the 'drain' listener that pipe() attaches to `dest`. When invoked
// it removes `dest` from the writers `src` is waiting on and, once none are
// left and `src` still has 'data' listeners, puts `src` back into flowing
// mode and calls flow().
function pipeOnDrain(src, dest) {
  return function pipeOnDrainFunctionResult() {
    const state = src._readableState;

    // `ondrain` will call directly,
    // `this` maybe not a reference to dest,
    // so we use the real dest here.
    if (state.awaitDrainWriters === dest) {
      debug('pipeOnDrain', 1);
      state.awaitDrainWriters = null;
    } else if (state.multiAwaitDrain) {
      debug('pipeOnDrain', state.awaitDrainWriters.size);
      state.awaitDrainWriters.delete(dest);
    }

    // Some writer is still awaited (a non-empty Set, or a single writer
    // other than `dest`): keep the source paused.
    const awaited = state.awaitDrainWriters;
    if (awaited && awaited.size !== 0)
      return;

    // Nobody is consuming 'data' any more, so there is nothing to restart.
    if (!EE.listenerCount(src, 'data'))
      return;

    state.flowing = true;
    flow(src);
  };
}
812
813
// Detaches `dest` from this stream, or every destination when `dest` is
// omitted. Each detached destination gets an 'unpipe' event carrying this
// stream and a `{ hasUnpiped: false }` info object. Always returns `this`;
// an unknown `dest` is silently ignored.
Readable.prototype.unpipe = function(dest) {
  const state = this._readableState;
  const unpipeInfo = { hasUnpiped: false };
  const count = state.pipesCount;

  // If we're not piping anywhere, then do nothing.
  if (count === 0)
    return this;

  // Just one destination.  most common case.
  if (count === 1) {
    const only = state.pipes;

    // Passed in one, but it's not the right one.
    if (dest && dest !== only)
      return this;

    // got a match (or no dest given, meaning "whatever is piped").
    state.pipes = null;
    state.pipesCount = 0;
    state.flowing = false;

    const target = dest || only;
    if (target)
      target.emit('unpipe', this, unpipeInfo);
    return this;
  }

  // Slow case with multiple pipe destinations.

  if (!dest) {
    // remove all. The state is reset first; each destination then gets
    // its own fresh info object.
    const all = state.pipes;
    state.pipes = null;
    state.pipesCount = 0;
    state.flowing = false;

    for (let i = 0; i < count; i++)
      all[i].emit('unpipe', this, { hasUnpiped: false });
    return this;
  }

  // Try to find the right one.
  const position = state.pipes.indexOf(dest);
  if (position === -1)
    return this;

  state.pipes.splice(position, 1);
  state.pipesCount -= 1;
  // Back to a single destination: store it bare instead of in an array.
  if (state.pipesCount === 1)
    state.pipes = state.pipes[0];

  dest.emit('unpipe', this, unpipeInfo);

  return this;
};
869
// Set up data events if they are asked for
// Ensure readable listeners eventually get something
//
// Registers the listener through Stream.prototype.on and then adjusts the
// stream state for the two events that influence reading: 'data' and
// 'readable'. Returns whatever Stream.prototype.on returned.
Readable.prototype.on = function(ev, fn) {
  const res = Stream.prototype.on.call(this, ev, fn);
  const state = this._readableState;

  switch (ev) {
    case 'data':
      // Update readableListening so that resume() may be a no-op
      // a few lines down. This is needed to support once('readable').
      state.readableListening = this.listenerCount('readable') > 0;

      // Try start flowing on next tick if stream isn't explicitly paused
      if (state.flowing !== false)
        this.resume();
      break;

    case 'readable':
      // Only the first 'readable' listener on a stream that has not emitted
      // 'end' yet changes anything.
      if (state.endEmitted || state.readableListening)
        break;

      state.readableListening = state.needReadable = true;
      state.flowing = false;
      state.emittedReadable = false;
      debug('on readable', state.length, state.reading);
      if (state.length) {
        // Data is already buffered: announce it.
        emitReadable(this);
      } else if (!state.reading) {
        // Nothing buffered and no read in progress: kick one off.
        process.nextTick(nReadingNextTick, this);
      }
      break;
  }

  return res;
};
Readable.prototype.addListener = Readable.prototype.on;
901
// Removes the listener through Stream.prototype.removeListener and, for
// 'readable', schedules a refresh of the readable-listening state.
// Returns whatever Stream.prototype.removeListener returned.
Readable.prototype.removeListener = function(ev, fn) {
  const res = Stream.prototype.removeListener.call(this, ev, fn);
  if (ev !== 'readable')
    return res;

  // We need to check if there is someone still listening to
  // readable and reset the state. However this needs to happen
  // after readable has been emitted but before I/O (nextTick) to
  // support once('readable', fn) cycles. This means that calling
  // resume within the same tick will have no
  // effect.
  process.nextTick(updateReadableListening, this);
  return res;
};
Readable.prototype.off = Readable.prototype.removeListener;
918
// Forwards all arguments to Stream.prototype.removeAllListeners and, when
// 'readable' listeners are affected (named explicitly, or no event given),
// schedules a refresh of the readable-listening state.
// Returns whatever Stream.prototype.removeAllListeners returned.
Readable.prototype.removeAllListeners = function(ev) {
  const res = Stream.prototype.removeAllListeners.apply(this, arguments);

  const touchesReadable = ev === undefined || ev === 'readable';
  if (touchesReadable) {
    // We need to check if there is someone still listening to
    // readable and reset the state. However this needs to happen
    // after readable has been emitted but before I/O (nextTick) to
    // support once('readable', fn) cycles. This means that calling
    // resume within the same tick will have no
    // effect.
    process.nextTick(updateReadableListening, this);
  }

  return res;
};
934
// Recompute state.readableListening from the current 'readable' listener
// count and adjust the flowing state to match.
function updateReadableListening(self) {
  const state = self._readableState;
  state.readableListening = self.listenerCount('readable') > 0;

  if (state.resumeScheduled && state[kPaused] === false) {
    // Flowing needs to be set to true now, otherwise
    // the upcoming resume will not flow.
    state.flowing = true;
    return;
  }

  // Crude way to check if we should resume
  if (self.listenerCount('data') > 0) {
    self.resume();
    return;
  }

  // Neither 'data' nor 'readable' listeners are left: flowing goes back
  // to null.
  if (!state.readableListening)
    state.flowing = null;
}
951
// nextTick callback: issues a zero-length read on the given stream.
function nReadingNextTick(stream) {
  debug('readable nexttick read 0');
  stream.read(0);
}
956
// pause() and resume() are remnants of the legacy readable stream API.
// If the user uses them, then switch into old mode.
//
// resume() clears the paused flag and, when the stream is not already
// flowing, schedules the actual resume. Returns the stream for chaining.
Readable.prototype.resume = function() {
  const state = this._readableState;
  const needsResume = !state.flowing;

  if (needsResume) {
    debug('resume');
    // We only really flow when nobody is listening for 'readable',
    // but the resume still has to be scheduled either way.
    state.flowing = !state.readableListening;
    resume(this, state);
  }

  state[kPaused] = false;
  return this;
};
972
// Schedule resume_() for the next tick, at most once per pending resume.
function resume(stream, state) {
  if (state.resumeScheduled)
    return;

  state.resumeScheduled = true;
  process.nextTick(resume_, stream, state);
}
979
// nextTick half of resume(): kicks off a read, clears the scheduling flag,
// emits 'resume' and then drains via flow().
function resume_(stream, state) {
  debug('resume', state.reading);

  // Issue a zero-length read unless a read is already in progress.
  const readIfIdle = () => {
    if (!state.reading)
      stream.read(0);
  };

  readIfIdle();

  state.resumeScheduled = false;
  stream.emit('resume');
  flow(stream);

  // Still flowing after the drain: make sure another read gets started.
  if (state.flowing)
    readIfIdle();
}
992
// Stop flowing (emitting 'pause' if the stream was not already explicitly
// stopped) and record that the stream is paused. Returns the stream for
// chaining.
Readable.prototype.pause = function() {
  const state = this._readableState;
  debug('call pause flowing=%j', state.flowing);

  const wasStopped = state.flowing === false;
  if (!wasStopped) {
    debug('pause');
    state.flowing = false;
    this.emit('pause');
  }

  state[kPaused] = true;
  return this;
};
1003
// Keep calling read() for as long as the stream is flowing and read()
// returns something other than null.
function flow(stream) {
  const state = stream._readableState;
  debug('flow', state.flowing);

  while (state.flowing) {
    if (stream.read() === null)
      break;
  }
}
1009
// Wrap an old-style stream as the async data source.
// This is *not* part of the readable stream interface.
// It is an ugly unfortunate mess of history.
//
// Data and end-of-stream from `stream` are pushed into this Readable,
// selected events are re-emitted, function-valued properties this Readable
// lacks are proxied, and _read() is replaced so that it resumes `stream`
// after it was paused for back-pressure. Returns this Readable.
Readable.prototype.wrap = function(stream) {
  const state = this._readableState;
  // True while the wrapped stream is paused because push() returned false.
  let paused = false;

  stream.on('end', () => {
    debug('wrapped end');
    // Flush whatever the decoder still holds before signalling EOF.
    if (state.decoder && !state.ended) {
      const remainder = state.decoder.end();
      if (remainder && remainder.length)
        this.push(remainder);
    }

    this.push(null);
  });

  stream.on('data', (chunk) => {
    debug('wrapped data');
    if (state.decoder)
      chunk = state.decoder.write(chunk);

    // Don't skip over falsy values in objectMode: there only null and
    // undefined are dropped. Otherwise anything falsy or empty is dropped.
    const skip = state.objectMode ?
      (chunk === null || chunk === undefined) :
      (!chunk || !chunk.length);
    if (skip)
      return;

    if (!this.push(chunk)) {
      paused = true;
      stream.pause();
    }
  });

  // Proxy all the other methods. Important when wrapping filters and duplexes.
  // The loop variable is a per-iteration binding, so every proxy closes
  // over its own property name.
  for (const key in stream) {
    if (this[key] !== undefined || typeof stream[key] !== 'function')
      continue;

    this[key] = function methodWrapReturnFunction() {
      return stream[key].apply(stream, arguments);
    };
  }

  stream.on('error', (err) => {
    errorOrDestroy(this, err);
  });

  // TODO(ronag): Update readable state?
  stream.on('close', () => {
    this.emit('close');
  });

  // TODO(ronag): this.destroy()?
  stream.on('destroy', () => {
    this.emit('destroy');
  });

  // TODO(ronag): this.pause()?
  stream.on('pause', () => {
    this.emit('pause');
  });

  // TODO(ronag): this.resume()?
  stream.on('resume', () => {
    this.emit('resume');
  });

  // When we try to consume some more bytes, simply unpause the
  // underlying stream.
  this._read = (n) => {
    debug('wrapped _read', n);
    if (!paused)
      return;

    paused = false;
    stream.resume();
  };

  return this;
};
1093
// Async-iterator entry point. The iterator factory module is loaded on
// first use and cached in createReadableStreamAsyncIterator.
Readable.prototype[SymbolAsyncIterator] = function() {
  if (createReadableStreamAsyncIterator === undefined) {
    const factory = require('internal/streams/async_iterator');
    createReadableStreamAsyncIterator = factory;
  }

  return createReadableStreamAsyncIterator(this);
};
1101
// Making it explicit these properties are not enumerable
// because otherwise some prototype manipulation in
// userland will fail
ObjectDefineProperties(Readable.prototype, {

  // The highWaterMark stored in the readable state.
  readableHighWaterMark: {
    enumerable: false,
    get: function() {
      return this._readableState.highWaterMark;
    }
  },

  // The internal buffer of the readable state; falsy when the state has
  // not been set up.
  readableBuffer: {
    enumerable: false,
    get: function() {
      return this._readableState && this._readableState.buffer;
    }
  },

  // The `flowing` flag of the readable state. Assignments are ignored
  // when the state has not been set up.
  readableFlowing: {
    enumerable: false,
    get: function() {
      return this._readableState.flowing;
    },
    set: function(state) {
      if (this._readableState) {
        this._readableState.flowing = state;
      }
    }
  },

  // The `length` recorded in the readable state.
  readableLength: {
    enumerable: false,
    get() {
      return this._readableState.length;
    }
  },

  // The objectMode flag; false when the state has not been set up.
  readableObjectMode: {
    enumerable: false,
    get() {
      return this._readableState ? this._readableState.objectMode : false;
    }
  },

  // The configured encoding; null when the state has not been set up.
  readableEncoding: {
    enumerable: false,
    get() {
      return this._readableState ? this._readableState.encoding : null;
    }
  },

  destroyed: {
    enumerable: false,
    get() {
      if (this._readableState === undefined) {
        return false;
      }
      return this._readableState.destroyed;
    },
    set(value) {
      // We ignore the value if the stream
      // has not been initialized yet
      if (!this._readableState) {
        return;
      }

      // Backward compatibility, the user is explicitly
      // managing destroyed
      this._readableState.destroyed = value;
    }
  },

  // True once 'end' has been emitted; false when the state has not been
  // set up.
  readableEnded: {
    enumerable: false,
    get() {
      return this._readableState ? this._readableState.endEmitted : false;
    }
  },

  // Whether the stream counts as paused. pause() and resume() keep this
  // flag on the readable state (state[kPaused]), so the accessor has to
  // read and write it there rather than on the stream object itself.
  // Anything other than an explicit `false` is reported as paused.
  paused: {
    enumerable: false,
    get() {
      const state = this._readableState;
      // Without state nothing can have been resumed yet.
      return state ? state[kPaused] !== false : true;
    },
    set(value) {
      // We ignore the value if the stream
      // has not been initialized yet
      const state = this._readableState;
      if (state) {
        state[kPaused] = !!value;
      }
    }
  }
});
1191
// Exposed for testing purposes only: makes the fromList() helper reachable
// as a property of the Readable constructor.
Readable._fromList = fromList;
1194
// Pluck off n bytes from an array of buffers.
// Length is the combined lengths of all the buffers in the list.
// This function is designed to be inlinable, so please take care when making
// changes to the function body.
//
// Returns null when nothing is buffered. In objectMode a single entry is
// shifted off regardless of n. Otherwise a falsy n, or an n of at least
// state.length, takes everything and clears the list; any other n consumes
// just that much.
function fromList(n, state) {
  // nothing buffered
  if (state.length === 0)
    return null;

  const list = state.buffer;

  if (state.objectMode)
    return list.shift();

  const takeAll = !n || n >= state.length;
  if (!takeAll) {
    // read part of list
    return list.consume(n, state.decoder);
  }

  // Read it all, truncate the list
  let everything;
  if (state.decoder) {
    everything = list.join('');
  } else if (list.length === 1) {
    everything = list.first();
  } else {
    everything = list.concat(state.length);
  }
  list.clear();

  return everything;
}
1223
// Mark the readable state as ended and schedule endReadableNT for the next
// tick. Does nothing once 'end' has already been emitted.
function endReadable(stream) {
  const state = stream._readableState;

  debug('endReadable', state.endEmitted);
  if (state.endEmitted)
    return;

  state.ended = true;
  process.nextTick(endReadableNT, state, stream);
}
1233
// nextTick half of endReadable(): emits 'end' once the buffer is empty and,
// when autoDestroy is set, destroys the stream afterwards.
function endReadableNT(state, stream) {
  debug('endReadableNT', state.endEmitted, state.length);

  // Check that we didn't get one last unshift.
  if (state.endEmitted || state.length !== 0)
    return;

  state.endEmitted = true;
  stream.readable = false;
  stream.emit('end');

  if (!state.autoDestroy)
    return;

  // In case of duplex streams we need a way to detect
  // if the writable side is ready for autoDestroy as well
  const wState = stream._writableState;
  const writableReady = !wState || (wState.autoDestroy && wState.finished);
  if (writableReady)
    stream.destroy();
}
1253
// Build a Readable from an iterable. The implementation module is loaded on
// first use and cached in `from`.
Readable.from = function(iterable, opts) {
  if (from === undefined) {
    const impl = require('internal/streams/from');
    from = impl;
  }

  return from(Readable, iterable, opts);
};
1260