'use strict';

const {
  ObjectCreate,
  ObjectGetPrototypeOf,
  ObjectSetPrototypeOf,
  Promise,
  PromiseReject,
  PromiseResolve,
  Symbol,
} = primordials;

const finished = require('internal/streams/end-of-stream');

// Symbol-keyed state slots stored on every iterator instance.
// kLastResolve / kLastReject: settle functions of the next() promise that is
//   currently waiting for data (null when nothing is waiting).
// kError: error reported by finished(), or null.
// kEnded: truthy once the stream has ended without an error.
// kLastPromise: the most recent still-pending promise returned by next().
// kHandlePromise: cached promise executor that tries to read one chunk.
// kStream: the stream being iterated.
const kLastResolve = Symbol('lastResolve');
const kLastReject = Symbol('lastReject');
const kError = Symbol('error');
const kEnded = Symbol('ended');
const kLastPromise = Symbol('lastPromise');
const kHandlePromise = Symbol('handlePromise');
const kStream = Symbol('stream');

/**
 * Builds an iterator result object.
 * @param {any} value The chunk to expose, or undefined when done.
 * @param {boolean} done Whether iteration is complete.
 * @returns {{ value: any, done: boolean }}
 */
function createIterResult(value, done) {
  return { value, done };
}

/**
 * If a next() promise is waiting, tries to read one chunk from the stream and
 * resolves that promise with it. Does nothing when no promise is waiting or
 * when read() returns null.
 * @param {object} iter The iterator instance.
 */
function readAndResolve(iter) {
  const resolve = iter[kLastResolve];
  if (resolve !== null) {
    const data = iter[kStream].read();
    // We defer if data is null. We can be expecting either 'end' or 'error'.
    if (data !== null) {
      iter[kLastPromise] = null;
      iter[kLastResolve] = null;
      iter[kLastReject] = null;
      resolve(createIterResult(data, false));
    }
  }
}

/**
 * 'readable' event handler; schedules readAndResolve() for the next tick.
 * @param {object} iter The iterator instance.
 */
function onReadable(iter) {
  // We wait for the next tick, because it might
  // emit an error with `process.nextTick()`.
  process.nextTick(readAndResolve, iter);
}

/**
 * Creates a promise executor that waits for `lastPromise` to fulfill before
 * attempting its own read, so that queued next() calls settle in order.
 * A rejection of `lastPromise` is forwarded to the new promise.
 * @param {Promise} lastPromise The previously returned, still pending promise.
 * @param {object} iter The iterator instance.
 * @returns {Function} An executor of the form (resolve, reject) => void.
 */
function wrapForNext(lastPromise, iter) {
  return (resolve, reject) => {
    lastPromise.then(() => {
      if (iter[kEnded]) {
        resolve(createIterResult(undefined, true));
        return;
      }

      iter[kHandlePromise](resolve, reject);
    }, reject);
  };
}

// %AsyncIteratorPrototype% is not directly exposed; reach it through the
// prototype chain of an async generator function's prototype object.
const AsyncIteratorPrototype = ObjectGetPrototypeOf(
  ObjectGetPrototypeOf(async function* () {}).prototype);

const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
  /** The stream this iterator reads from. */
  get stream() {
    return this[kStream];
  },

  /**
   * Returns a promise for the next chunk of the stream.
   * @returns {Promise<{ value: any, done: boolean }>} Rejects with the stored
   *   stream error, if any; otherwise resolves with the next chunk, or with
   *   `{ value: undefined, done: true }` once the stream has ended.
   */
  next() {
    // If we have detected an error in the meanwhile
    // reject straight away.
    const error = this[kError];
    if (error !== null) {
      return PromiseReject(error);
    }

    if (this[kEnded]) {
      return PromiseResolve(createIterResult(undefined, true));
    }

    if (this[kStream].destroyed) {
      // We need to defer via nextTick because if .destroy(err) is
      // called, the error will be emitted via nextTick, and
      // we cannot guarantee that there is no error lingering around
      // waiting to be emitted.
      return new Promise((resolve, reject) => {
        process.nextTick(() => {
          if (this[kError]) {
            reject(this[kError]);
          } else {
            resolve(createIterResult(undefined, true));
          }
        });
      });
    }

    // If we have multiple next() calls we will wait for the previous Promise to
    // finish. This logic is optimized to support for await loops, where next()
    // is only called once at a time.
    const lastPromise = this[kLastPromise];
    let promise;

    if (lastPromise) {
      promise = new Promise(wrapForNext(lastPromise, this));
    } else {
      // Fast path needed to support multiple this.push()
      // without triggering the next() queue.
      const data = this[kStream].read();
      if (data !== null) {
        return PromiseResolve(createIterResult(data, false));
      }

      promise = new Promise(this[kHandlePromise]);
    }

    this[kLastPromise] = promise;

    return promise;
  },

  /**
   * Stops the iteration early by destroying the stream.
   * @returns {Promise<{ value: undefined, done: true }>} Resolves once
   *   finished() reports completion of the stream; rejects if finished()
   *   reports an error other than ERR_STREAM_PREMATURE_CLOSE.
   */
  return() {
    return new Promise((resolve, reject) => {
      const stream = this[kStream];

      // TODO(ronag): Remove this check once finished() handles
      // already ended and/or destroyed streams.
      const ended = stream.destroyed || stream.readableEnded ||
        (stream._readableState && stream._readableState.endEmitted);
      if (ended) {
        resolve(createIterResult(undefined, true));
        return;
      }

      finished(stream, (err) => {
        if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
          reject(err);
        } else {
          resolve(createIterResult(undefined, true));
        }
      });
      stream.destroy();
    });
  },
}, AsyncIteratorPrototype);

/**
 * Creates an async iterator over the chunks of `stream`.
 *
 * The iterator pulls data with stream.read(), listens for 'readable' to wake
 * up a pending next() call, and uses finished() to learn about the end of the
 * stream or an error.
 * @param {object} stream The stream to iterate; must provide read(), on() and
 *   destroy().
 * @returns {object} An object inheriting from
 *   ReadableStreamAsyncIteratorPrototype.
 */
const createReadableStreamAsyncIterator = (stream) => {
  const iterator = ObjectCreate(ReadableStreamAsyncIteratorPrototype, {
    [kStream]: { value: stream, writable: true },
    [kLastResolve]: { value: null, writable: true },
    [kLastReject]: { value: null, writable: true },
    [kError]: { value: null, writable: true },
    [kEnded]: {
      // Guard the _readableState access the same way return() does so that a
      // stream without that property does not throw here.
      value: stream.readableEnded ||
        (stream._readableState && stream._readableState.endEmitted),
      writable: true
    },
    // The function passed to new Promise is cached so we avoid allocating a new
    // closure at every run.
    [kHandlePromise]: {
      value: (resolve, reject) => {
        const data = iterator[kStream].read();
        // Compare against null instead of testing truthiness: a falsy chunk
        // (e.g. 0 or '') is still data and has already been taken out of the
        // stream by read(), so treating it as "no data" would drop it. This
        // matches the checks in next() and readAndResolve().
        if (data !== null) {
          iterator[kLastPromise] = null;
          iterator[kLastResolve] = null;
          iterator[kLastReject] = null;
          resolve(createIterResult(data, false));
        } else {
          // Nothing available yet; park the settle functions until either
          // 'readable' fires or finished() reports end/error.
          iterator[kLastResolve] = resolve;
          iterator[kLastReject] = reject;
        }
      },
      writable: true,
    },
  });
  iterator[kLastPromise] = null;

  finished(stream, { writable: false }, (err) => {
    if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
      const reject = iterator[kLastReject];
      // Reject if we are waiting for data in the Promise returned by next() and
      // store the error.
      if (reject !== null) {
        iterator[kLastPromise] = null;
        iterator[kLastResolve] = null;
        iterator[kLastReject] = null;
        reject(err);
      }
      iterator[kError] = err;
      return;
    }

    // No error (or only a premature close): complete any waiting next() call
    // and remember that the stream has ended.
    const resolve = iterator[kLastResolve];
    if (resolve !== null) {
      iterator[kLastPromise] = null;
      iterator[kLastResolve] = null;
      iterator[kLastReject] = null;
      resolve(createIterResult(undefined, true));
    }
    iterator[kEnded] = true;
  });

  stream.on('readable', onReadable.bind(null, iterator));

  return iterator;
};

module.exports = createReadableStreamAsyncIterator;