1// Ported from https://github.com/nodejs/undici/pull/907 2 3'use strict' 4 5const assert = require('assert') 6const { Readable } = require('stream') 7const { RequestAbortedError, NotSupportedError, InvalidArgumentError } = require('../core/errors') 8const util = require('../core/util') 9const { ReadableStreamFrom, toUSVString } = require('../core/util') 10 11let Blob 12 13const kConsume = Symbol('kConsume') 14const kReading = Symbol('kReading') 15const kBody = Symbol('kBody') 16const kAbort = Symbol('abort') 17const kContentType = Symbol('kContentType') 18 19module.exports = class BodyReadable extends Readable { 20 constructor ({ 21 resume, 22 abort, 23 contentType = '', 24 highWaterMark = 64 * 1024 // Same as nodejs fs streams. 25 }) { 26 super({ 27 autoDestroy: true, 28 read: resume, 29 highWaterMark 30 }) 31 32 this._readableState.dataEmitted = false 33 34 this[kAbort] = abort 35 this[kConsume] = null 36 this[kBody] = null 37 this[kContentType] = contentType 38 39 // Is stream being consumed through Readable API? 40 // This is an optimization so that we avoid checking 41 // for 'data' and 'readable' listeners in the hot path 42 // inside push(). 43 this[kReading] = false 44 } 45 46 destroy (err) { 47 if (this.destroyed) { 48 // Node < 16 49 return this 50 } 51 52 if (!err && !this._readableState.endEmitted) { 53 err = new RequestAbortedError() 54 } 55 56 if (err) { 57 this[kAbort]() 58 } 59 60 return super.destroy(err) 61 } 62 63 emit (ev, ...args) { 64 if (ev === 'data') { 65 // Node < 16.7 66 this._readableState.dataEmitted = true 67 } else if (ev === 'error') { 68 // Node < 16 69 this._readableState.errorEmitted = true 70 } 71 return super.emit(ev, ...args) 72 } 73 74 on (ev, ...args) { 75 if (ev === 'data' || ev === 'readable') { 76 this[kReading] = true 77 } 78 return super.on(ev, ...args) 79 } 80 81 addListener (ev, ...args) { 82 return this.on(ev, ...args) 83 } 84 85 off (ev, ...args) { 86 const ret = super.off(ev, ...args) 87 if (ev === 'data' || ev === 'readable') { 88 this[kReading] = ( 89 this.listenerCount('data') > 0 || 90 this.listenerCount('readable') > 0 91 ) 92 } 93 return ret 94 } 95 96 removeListener (ev, ...args) { 97 return this.off(ev, ...args) 98 } 99 100 push (chunk) { 101 if (this[kConsume] && chunk !== null && this.readableLength === 0) { 102 consumePush(this[kConsume], chunk) 103 return this[kReading] ? super.push(chunk) : true 104 } 105 return super.push(chunk) 106 } 107 108 // https://fetch.spec.whatwg.org/#dom-body-text 109 async text () { 110 return consume(this, 'text') 111 } 112 113 // https://fetch.spec.whatwg.org/#dom-body-json 114 async json () { 115 return consume(this, 'json') 116 } 117 118 // https://fetch.spec.whatwg.org/#dom-body-blob 119 async blob () { 120 return consume(this, 'blob') 121 } 122 123 // https://fetch.spec.whatwg.org/#dom-body-arraybuffer 124 async arrayBuffer () { 125 return consume(this, 'arrayBuffer') 126 } 127 128 // https://fetch.spec.whatwg.org/#dom-body-formdata 129 async formData () { 130 // TODO: Implement. 131 throw new NotSupportedError() 132 } 133 134 // https://fetch.spec.whatwg.org/#dom-body-bodyused 135 get bodyUsed () { 136 return util.isDisturbed(this) 137 } 138 139 // https://fetch.spec.whatwg.org/#dom-body-body 140 get body () { 141 if (!this[kBody]) { 142 this[kBody] = ReadableStreamFrom(this) 143 if (this[kConsume]) { 144 // TODO: Is this the best way to force a lock? 145 this[kBody].getReader() // Ensure stream is locked. 146 assert(this[kBody].locked) 147 } 148 } 149 return this[kBody] 150 } 151 152 async dump (opts) { 153 let limit = opts && Number.isFinite(opts.limit) ? opts.limit : 262144 154 const signal = opts && opts.signal 155 const abortFn = () => { 156 this.destroy() 157 } 158 if (signal) { 159 if (typeof signal !== 'object' || !('aborted' in signal)) { 160 throw new InvalidArgumentError('signal must be an AbortSignal') 161 } 162 util.throwIfAborted(signal) 163 signal.addEventListener('abort', abortFn, { once: true }) 164 } 165 try { 166 for await (const chunk of this) { 167 util.throwIfAborted(signal) 168 limit -= Buffer.byteLength(chunk) 169 if (limit < 0) { 170 return 171 } 172 } 173 } catch { 174 util.throwIfAborted(signal) 175 } finally { 176 if (signal) { 177 signal.removeEventListener('abort', abortFn) 178 } 179 } 180 } 181} 182 183// https://streams.spec.whatwg.org/#readablestream-locked 184function isLocked (self) { 185 // Consume is an implicit lock. 186 return (self[kBody] && self[kBody].locked === true) || self[kConsume] 187} 188 189// https://fetch.spec.whatwg.org/#body-unusable 190function isUnusable (self) { 191 return util.isDisturbed(self) || isLocked(self) 192} 193 194async function consume (stream, type) { 195 if (isUnusable(stream)) { 196 throw new TypeError('unusable') 197 } 198 199 assert(!stream[kConsume]) 200 201 return new Promise((resolve, reject) => { 202 stream[kConsume] = { 203 type, 204 stream, 205 resolve, 206 reject, 207 length: 0, 208 body: [] 209 } 210 211 stream 212 .on('error', function (err) { 213 consumeFinish(this[kConsume], err) 214 }) 215 .on('close', function () { 216 if (this[kConsume].body !== null) { 217 consumeFinish(this[kConsume], new RequestAbortedError()) 218 } 219 }) 220 221 process.nextTick(consumeStart, stream[kConsume]) 222 }) 223} 224 225function consumeStart (consume) { 226 if (consume.body === null) { 227 return 228 } 229 230 const { _readableState: state } = consume.stream 231 232 for (const chunk of state.buffer) { 233 consumePush(consume, chunk) 234 } 235 236 if (state.endEmitted) { 237 consumeEnd(this[kConsume]) 238 } else { 239 consume.stream.on('end', function () { 240 consumeEnd(this[kConsume]) 241 }) 242 } 243 244 consume.stream.resume() 245 246 while (consume.stream.read() != null) { 247 // Loop 248 } 249} 250 251function consumeEnd (consume) { 252 const { type, body, resolve, stream, length } = consume 253 254 try { 255 if (type === 'text') { 256 resolve(toUSVString(Buffer.concat(body))) 257 } else if (type === 'json') { 258 resolve(JSON.parse(Buffer.concat(body))) 259 } else if (type === 'arrayBuffer') { 260 const dst = new Uint8Array(length) 261 262 let pos = 0 263 for (const buf of body) { 264 dst.set(buf, pos) 265 pos += buf.byteLength 266 } 267 268 resolve(dst) 269 } else if (type === 'blob') { 270 if (!Blob) { 271 Blob = require('buffer').Blob 272 } 273 resolve(new Blob(body, { type: stream[kContentType] })) 274 } 275 276 consumeFinish(consume) 277 } catch (err) { 278 stream.destroy(err) 279 } 280} 281 282function consumePush (consume, chunk) { 283 consume.length += chunk.length 284 consume.body.push(chunk) 285} 286 287function consumeFinish (consume, err) { 288 if (consume.body === null) { 289 return 290 } 291 292 if (err) { 293 consume.reject(err) 294 } else { 295 consume.resolve() 296 } 297 298 consume.type = null 299 consume.stream = null 300 consume.resolve = null 301 consume.reject = null 302 consume.length = 0 303 consume.body = null 304} 305