• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Ported from https://github.com/nodejs/undici/pull/907
2
3'use strict'
4
5const assert = require('assert')
6const { Readable } = require('stream')
7const { RequestAbortedError, NotSupportedError, InvalidArgumentError } = require('../core/errors')
8const util = require('../core/util')
9const { ReadableStreamFrom, toUSVString } = require('../core/util')
10
// Blob constructor; assigned lazily from require('buffer') the first time a
// 'blob' result is produced (see consumeEnd).
let Blob

// Symbol keys used to keep per-stream state on BodyReadable instances.
// State object of an in-progress text()/json()/blob()/arrayBuffer() call, or null.
const kConsume = Symbol('kConsume')
// Set once a 'data' or 'readable' listener is added; recomputed when one is removed.
const kReading = Symbol('kReading')
// Cached web stream handed out by the `body` getter, or null.
const kBody = Symbol('kBody')
// Abort callback supplied to the constructor.
const kAbort = Symbol('abort')
// Content type supplied to the constructor; used as the `type` of produced Blobs.
const kContentType = Symbol('kContentType')
18
19module.exports = class BodyReadable extends Readable {
20  constructor ({
21    resume,
22    abort,
23    contentType = '',
24    highWaterMark = 64 * 1024 // Same as nodejs fs streams.
25  }) {
26    super({
27      autoDestroy: true,
28      read: resume,
29      highWaterMark
30    })
31
32    this._readableState.dataEmitted = false
33
34    this[kAbort] = abort
35    this[kConsume] = null
36    this[kBody] = null
37    this[kContentType] = contentType
38
39    // Is stream being consumed through Readable API?
40    // This is an optimization so that we avoid checking
41    // for 'data' and 'readable' listeners in the hot path
42    // inside push().
43    this[kReading] = false
44  }
45
46  destroy (err) {
47    if (this.destroyed) {
48      // Node < 16
49      return this
50    }
51
52    if (!err && !this._readableState.endEmitted) {
53      err = new RequestAbortedError()
54    }
55
56    if (err) {
57      this[kAbort]()
58    }
59
60    return super.destroy(err)
61  }
62
63  emit (ev, ...args) {
64    if (ev === 'data') {
65      // Node < 16.7
66      this._readableState.dataEmitted = true
67    } else if (ev === 'error') {
68      // Node < 16
69      this._readableState.errorEmitted = true
70    }
71    return super.emit(ev, ...args)
72  }
73
74  on (ev, ...args) {
75    if (ev === 'data' || ev === 'readable') {
76      this[kReading] = true
77    }
78    return super.on(ev, ...args)
79  }
80
81  addListener (ev, ...args) {
82    return this.on(ev, ...args)
83  }
84
85  off (ev, ...args) {
86    const ret = super.off(ev, ...args)
87    if (ev === 'data' || ev === 'readable') {
88      this[kReading] = (
89        this.listenerCount('data') > 0 ||
90        this.listenerCount('readable') > 0
91      )
92    }
93    return ret
94  }
95
96  removeListener (ev, ...args) {
97    return this.off(ev, ...args)
98  }
99
100  push (chunk) {
101    if (this[kConsume] && chunk !== null && this.readableLength === 0) {
102      consumePush(this[kConsume], chunk)
103      return this[kReading] ? super.push(chunk) : true
104    }
105    return super.push(chunk)
106  }
107
108  // https://fetch.spec.whatwg.org/#dom-body-text
109  async text () {
110    return consume(this, 'text')
111  }
112
113  // https://fetch.spec.whatwg.org/#dom-body-json
114  async json () {
115    return consume(this, 'json')
116  }
117
118  // https://fetch.spec.whatwg.org/#dom-body-blob
119  async blob () {
120    return consume(this, 'blob')
121  }
122
123  // https://fetch.spec.whatwg.org/#dom-body-arraybuffer
124  async arrayBuffer () {
125    return consume(this, 'arrayBuffer')
126  }
127
128  // https://fetch.spec.whatwg.org/#dom-body-formdata
129  async formData () {
130    // TODO: Implement.
131    throw new NotSupportedError()
132  }
133
134  // https://fetch.spec.whatwg.org/#dom-body-bodyused
135  get bodyUsed () {
136    return util.isDisturbed(this)
137  }
138
139  // https://fetch.spec.whatwg.org/#dom-body-body
140  get body () {
141    if (!this[kBody]) {
142      this[kBody] = ReadableStreamFrom(this)
143      if (this[kConsume]) {
144        // TODO: Is this the best way to force a lock?
145        this[kBody].getReader() // Ensure stream is locked.
146        assert(this[kBody].locked)
147      }
148    }
149    return this[kBody]
150  }
151
152  async dump (opts) {
153    let limit = opts && Number.isFinite(opts.limit) ? opts.limit : 262144
154    const signal = opts && opts.signal
155    const abortFn = () => {
156      this.destroy()
157    }
158    if (signal) {
159      if (typeof signal !== 'object' || !('aborted' in signal)) {
160        throw new InvalidArgumentError('signal must be an AbortSignal')
161      }
162      util.throwIfAborted(signal)
163      signal.addEventListener('abort', abortFn, { once: true })
164    }
165    try {
166      for await (const chunk of this) {
167        util.throwIfAborted(signal)
168        limit -= Buffer.byteLength(chunk)
169        if (limit < 0) {
170          return
171        }
172      }
173    } catch {
174      util.throwIfAborted(signal)
175    } finally {
176      if (signal) {
177        signal.removeEventListener('abort', abortFn)
178      }
179    }
180  }
181}
182
183// https://streams.spec.whatwg.org/#readablestream-locked
/**
 * Tells whether the body is locked: either its cached web stream reports
 * `locked === true`, or a consume is pending (which acts as an implicit lock).
 *
 * @param {object} self Stream carrying the kBody / kConsume slots.
 * @returns {*} `true` when the web stream is locked, otherwise the raw
 *   kConsume value (truthy while a consume is pending).
 */
function isLocked (self) {
  const webStream = self[kBody]
  if (webStream && webStream.locked === true) {
    return true
  }
  // Consume is an implicit lock.
  return self[kConsume]
}
188
189// https://fetch.spec.whatwg.org/#body-unusable
/**
 * A body is unusable when it has been disturbed or is locked.
 *
 * @param {object} self Stream to inspect.
 * @returns {*} The truthy result of util.isDisturbed, otherwise whatever
 *   isLocked returns.
 */
function isUnusable (self) {
  const disturbed = util.isDisturbed(self)
  if (disturbed) {
    return disturbed
  }
  return isLocked(self)
}
193
/**
 * Starts collecting the whole body of `stream` and resolves with the value
 * for the requested representation once the stream ends.
 *
 * @param {object} stream Body stream to consume.
 * @param {string} type Requested representation ('text', 'json', 'blob' or 'arrayBuffer').
 * @returns {Promise<*>} Rejects with the stream error, or with
 *   RequestAbortedError when the stream closes before the consume finished.
 * @throws {TypeError} (as a rejection) when the body is unusable.
 */
async function consume (stream, type) {
  if (isUnusable(stream)) {
    throw new TypeError('unusable')
  }

  assert(!stream[kConsume])

  return new Promise((resolve, reject) => {
    const state = {
      type,
      stream,
      resolve,
      reject,
      length: 0,
      body: []
    }
    stream[kConsume] = state

    stream.on('error', (err) => {
      consumeFinish(state, err)
    })

    stream.on('close', () => {
      // A non-null body means nothing has settled the consume yet.
      if (state.body !== null) {
        consumeFinish(state, new RequestAbortedError())
      }
    })

    // Collection starts on the next tick.
    process.nextTick(consumeStart, state)
  })
}
224
/**
 * Begins draining the stream into a pending consume state. It is scheduled
 * with process.nextTick, so it runs without a `this` binding.
 *
 * Chunks already sitting in the readable buffer are copied into the state
 * first. If the stream has already ended the result is finalized right away;
 * otherwise finalization is deferred to the 'end' event and the stream is
 * switched on and drained.
 *
 * @param {object} consume Consume state created by consume().
 */
function consumeStart (consume) {
  if (consume.body === null) {
    // Already settled (e.g. by an error or close before this tick).
    return
  }

  // Hold on to the stream locally: finishing the consume clears
  // `consume.stream`.
  const { stream } = consume
  const { _readableState: state } = stream

  for (const chunk of state.buffer) {
    consumePush(consume, chunk)
  }

  if (state.endEmitted) {
    // There is no `this` here (strict mode, called via nextTick), so the
    // state must be passed directly. Nothing is left to drain afterwards.
    consumeEnd(consume)
    return
  }

  stream.on('end', function () {
    consumeEnd(this[kConsume])
  })

  stream.resume()

  while (stream.read() != null) {
    // Loop
  }
}
250
/**
 * Finalizes a consume once the stream has ended: converts the collected
 * chunks to the requested representation, resolves the promise with it and
 * clears the state. A conversion failure (e.g. invalid JSON) destroys the
 * stream with that error instead.
 *
 * @param {object} consume Consume state created by consume().
 */
function consumeEnd (consume) {
  const { type, body, resolve, stream, length } = consume

  try {
    if (type === 'text') {
      resolve(toUSVString(Buffer.concat(body)))
    } else if (type === 'json') {
      resolve(JSON.parse(Buffer.concat(body)))
    } else if (type === 'arrayBuffer') {
      const dst = new Uint8Array(length)

      let pos = 0
      for (const buf of body) {
        dst.set(buf, pos)
        pos += buf.byteLength
      }

      // arrayBuffer() must yield an ArrayBuffer, not the typed-array view.
      // `dst` was allocated here with exactly `length` bytes, so its backing
      // buffer contains the body and nothing else.
      resolve(dst.buffer)
    } else if (type === 'blob') {
      if (!Blob) {
        // Loaded lazily, only when a blob is actually requested.
        Blob = require('buffer').Blob
      }
      resolve(new Blob(body, { type: stream[kContentType] }))
    }

    consumeFinish(consume)
  } catch (err) {
    stream.destroy(err)
  }
}
281
/**
 * Appends one chunk to a consume state and adds the chunk's `length` to the
 * running total.
 *
 * @param {object} consume Consume state with `length` and `body` fields.
 * @param {*} chunk Chunk to record; must expose a `length`.
 */
function consumePush (consume, chunk) {
  const { length: chunkSize } = chunk
  consume.length = consume.length + chunkSize
  consume.body.push(chunk)
}
286
/**
 * Settles a consume state exactly once and then clears every field so the
 * collected data can be released. A state whose `body` is already null has
 * been finished before and is left untouched.
 *
 * @param {object} consume Consume state created by consume().
 * @param {Error} [err] When given, the promise is rejected with it;
 *   otherwise resolve() is called with no value.
 */
function consumeFinish (consume, err) {
  if (consume.body !== null) {
    if (!err) {
      consume.resolve()
    } else {
      consume.reject(err)
    }

    Object.assign(consume, {
      type: null,
      stream: null,
      resolve: null,
      reject: null,
      length: 0,
      body: null
    })
  }
}
305