• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict';
2
3const {
4  ObjectCreate,
5  ObjectGetPrototypeOf,
6  ObjectSetPrototypeOf,
7  Promise,
8  PromiseReject,
9  PromiseResolve,
10  Symbol,
11} = primordials;
12
13const finished = require('internal/streams/end-of-stream');
14
// Private, symbol-keyed slots stored on every iterator instance.

// The readable stream being iterated.
const kStream = Symbol('stream');

// Terminal state: the error reported for the stream (if any) and whether the
// stream has ended.
const kError = Symbol('error');
const kEnded = Symbol('ended');

// Bookkeeping for the promise most recently handed out by next(): the promise
// itself plus its resolve/reject functions while it is waiting for data.
const kLastPromise = Symbol('lastPromise');
const kLastResolve = Symbol('lastResolve');
const kLastReject = Symbol('lastReject');

// The cached executor function passed to `new Promise()` by next().
const kHandlePromise = Symbol('handlePromise');
22
/**
 * Builds an iterator result object.
 *
 * @param {*} value The value carried by this iteration step.
 * @param {boolean} done Whether the iteration has finished.
 * @returns {{ value: *, done: boolean }}
 */
function createIterResult(value, done) {
  const result = { value, done };
  return result;
}
26
/**
 * If a next() promise is waiting for data, tries to read one chunk from the
 * stream and settles that promise with it.
 *
 * @param {object} iter The stream async iterator.
 */
function readAndResolve(iter) {
  const pendingResolve = iter[kLastResolve];
  if (pendingResolve === null) {
    // Nobody is waiting for data; leave the stream untouched.
    return;
  }

  const chunk = iter[kStream].read();
  if (chunk === null) {
    // Nothing to hand out yet. Keep the promise pending: either 'end' or
    // 'error' may still be on its way.
    return;
  }

  iter[kLastPromise] = null;
  iter[kLastResolve] = null;
  iter[kLastReject] = null;
  pendingResolve(createIterResult(chunk, false));
}
40
/**
 * 'readable' listener: schedules an attempt to satisfy the pending next()
 * promise of the given iterator.
 *
 * @param {object} iter The stream async iterator.
 */
function onReadable(iter) {
  // The read is postponed by one tick on purpose: an error may be emitted
  // through `process.nextTick()` and has to get its chance first.
  process.nextTick(readAndResolve, iter);
}
46
/**
 * Creates a Promise executor that waits for `lastPromise` before serving the
 * next read, so that overlapping next() calls are answered in order.
 *
 * @param {Promise} lastPromise The promise returned by the previous next().
 * @param {object} iter The stream async iterator.
 * @returns {Function} An executor `(resolve, reject)` for `new Promise()`.
 */
function wrapForNext(lastPromise, iter) {
  return (resolve, reject) => {
    const afterPrevious = () => {
      if (!iter[kEnded]) {
        // Still live: let the iterator's own executor perform the read.
        iter[kHandlePromise](resolve, reject);
        return;
      }

      // The stream ended while we were queued behind the previous promise.
      resolve(createIterResult(undefined, true));
    };

    // A rejection of the previous promise is forwarded unchanged.
    lastPromise.then(afterPrevious, reject);
  };
}
59
// The shared async iterator prototype is obtained here by walking up from an
// async generator function: the function's prototype object exposes a
// `.prototype`, and that object's own prototype is the one we want.
const AsyncIteratorPrototype = (() => {
  const asyncGeneratorFunctionProto = ObjectGetPrototypeOf(
    async function* () {});
  return ObjectGetPrototypeOf(asyncGeneratorFunctionProto.prototype);
})();
62
// Prototype shared by all stream async iterators. Per-iterator state lives in
// symbol-keyed properties on each instance.
const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
  // The stream this iterator reads from.
  get stream() {
    return this[kStream];
  },

  // Returns a promise for the next chunk, for `{ done: true }` once the
  // stream has ended, or a rejection if the stream errored.
  next() {
    // An error recorded earlier wins over everything else.
    const storedError = this[kError];
    if (storedError !== null) {
      return PromiseReject(storedError);
    }

    if (this[kEnded]) {
      return PromiseResolve(createIterResult(undefined, true));
    }

    const stream = this[kStream];

    if (stream.destroyed) {
      // After .destroy(err) the error is emitted via nextTick, so one may
      // still be in flight. Wait a tick before deciding whether to reject or
      // to report completion.
      return new Promise((resolve, reject) => {
        process.nextTick(() => {
          const lateError = this[kError];
          if (lateError) {
            reject(lateError);
          } else {
            resolve(createIterResult(undefined, true));
          }
        });
      });
    }

    // Overlapping next() calls are chained behind the previous promise. The
    // common case is a for await loop, which only ever has one call in
    // flight, so that case is kept cheap.
    const previous = this[kLastPromise];
    let upcoming;

    if (!previous) {
      // Fast path: lets several this.push() calls be drained without going
      // through the next() queue.
      const chunk = stream.read();
      if (chunk !== null) {
        return PromiseResolve(createIterResult(chunk, false));
      }

      upcoming = new Promise(this[kHandlePromise]);
    } else {
      upcoming = new Promise(wrapForNext(previous, this));
    }

    this[kLastPromise] = upcoming;

    return upcoming;
  },

  // Destroys the stream and resolves with `{ done: true }` once it has
  // finished; rejects if it finished with an error other than a premature
  // close.
  return() {
    return new Promise((resolve, reject) => {
      const stream = this[kStream];

      // TODO(ronag): Remove this check once finished() handles
      // already ended and/or destroyed streams.
      const alreadyDone = stream.destroyed || stream.readableEnded ||
        (stream._readableState && stream._readableState.endEmitted);
      if (alreadyDone) {
        resolve(createIterResult(undefined, true));
        return;
      }

      finished(stream, (err) => {
        // A premature close is the expected outcome of the destroy() below.
        if (!err || err.code === 'ERR_STREAM_PREMATURE_CLOSE') {
          resolve(createIterResult(undefined, true));
        } else {
          reject(err);
        }
      });
      stream.destroy();
    });
  },
}, AsyncIteratorPrototype);
144
/**
 * Creates an async iterator for a readable stream.
 *
 * The iterator keeps its state in symbol-keyed properties: the stream, the
 * resolve/reject pair of the next() promise that is waiting for data, the
 * last promise handed out, a stored error and an "ended" flag.
 *
 * @param {object} stream The readable stream to iterate. It must provide
 *   `read()`, `on()`, `readableEnded` and `_readableState`.
 * @returns {object} An iterator whose prototype is
 *   `ReadableStreamAsyncIteratorPrototype`.
 */
const createReadableStreamAsyncIterator = (stream) => {
  const iterator = ObjectCreate(ReadableStreamAsyncIteratorPrototype, {
    [kStream]: { value: stream, writable: true },
    [kLastResolve]: { value: null, writable: true },
    [kLastReject]: { value: null, writable: true },
    [kError]: { value: null, writable: true },
    [kEnded]: {
      // The stream may already have ended before iteration starts.
      value: stream.readableEnded || stream._readableState.endEmitted,
      writable: true
    },
    // The function passed to new Promise is cached so we avoid allocating a new
    // closure at every run.
    [kHandlePromise]: {
      value: (resolve, reject) => {
        const data = iterator[kStream].read();
        // Compare against null explicitly, like readAndResolve() and the
        // fast path in next() do. A truthiness test would treat falsy chunks
        // (0, '', false) as "no data"; since read() has already taken the
        // chunk out of the stream, it would be lost.
        if (data !== null) {
          iterator[kLastPromise] = null;
          iterator[kLastResolve] = null;
          iterator[kLastReject] = null;
          resolve(createIterResult(data, false));
        } else {
          // No data yet: park the settle functions until data is read, or
          // until the finished() callback below fires.
          iterator[kLastResolve] = resolve;
          iterator[kLastReject] = reject;
        }
      },
      writable: true,
    },
  });
  iterator[kLastPromise] = null;

  finished(stream, { writable: false }, (err) => {
    if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
      const reject = iterator[kLastReject];
      // Reject if we are waiting for data in the Promise returned by next() and
      // store the error.
      if (reject !== null) {
        iterator[kLastPromise] = null;
        iterator[kLastResolve] = null;
        iterator[kLastReject] = null;
        reject(err);
      }
      iterator[kError] = err;
      return;
    }

    // No error, or only a premature close: complete a waiting next() promise
    // and mark the iterator as ended.
    const resolve = iterator[kLastResolve];
    if (resolve !== null) {
      iterator[kLastPromise] = null;
      iterator[kLastResolve] = null;
      iterator[kLastReject] = null;
      resolve(createIterResult(undefined, true));
    }
    iterator[kEnded] = true;
  });

  // Each 'readable' event schedules an attempt to satisfy a waiting promise.
  stream.on('readable', onReadable.bind(null, iterator));

  return iterator;
};
204
205module.exports = createReadableStreamAsyncIterator;
206