• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1'use strict'
2
3const {
4  Readable,
5  Duplex,
6  PassThrough
7} = require('stream')
8const {
9  InvalidArgumentError,
10  InvalidReturnValueError,
11  RequestAbortedError
12} = require('../core/errors')
13const util = require('../core/util')
14const { AsyncResource } = require('async_hooks')
15const { addSignal, removeSignal } = require('./abort-signal')
16const assert = require('assert')
17
18const kResume = Symbol('resume')
19
20class PipelineRequest extends Readable {
21  constructor () {
22    super({ autoDestroy: true })
23
24    this[kResume] = null
25  }
26
27  _read () {
28    const { [kResume]: resume } = this
29
30    if (resume) {
31      this[kResume] = null
32      resume()
33    }
34  }
35
36  _destroy (err, callback) {
37    this._read()
38
39    callback(err)
40  }
41}
42
43class PipelineResponse extends Readable {
44  constructor (resume) {
45    super({ autoDestroy: true })
46    this[kResume] = resume
47  }
48
49  _read () {
50    this[kResume]()
51  }
52
53  _destroy (err, callback) {
54    if (!err && !this._readableState.endEmitted) {
55      err = new RequestAbortedError()
56    }
57
58    callback(err)
59  }
60}
61
62class PipelineHandler extends AsyncResource {
63  constructor (opts, handler) {
64    if (!opts || typeof opts !== 'object') {
65      throw new InvalidArgumentError('invalid opts')
66    }
67
68    if (typeof handler !== 'function') {
69      throw new InvalidArgumentError('invalid handler')
70    }
71
72    const { signal, method, opaque, onInfo, responseHeaders } = opts
73
74    if (signal && typeof signal.on !== 'function' && typeof signal.addEventListener !== 'function') {
75      throw new InvalidArgumentError('signal must be an EventEmitter or EventTarget')
76    }
77
78    if (method === 'CONNECT') {
79      throw new InvalidArgumentError('invalid method')
80    }
81
82    if (onInfo && typeof onInfo !== 'function') {
83      throw new InvalidArgumentError('invalid onInfo callback')
84    }
85
86    super('UNDICI_PIPELINE')
87
88    this.opaque = opaque || null
89    this.responseHeaders = responseHeaders || null
90    this.handler = handler
91    this.abort = null
92    this.context = null
93    this.onInfo = onInfo || null
94
95    this.req = new PipelineRequest().on('error', util.nop)
96
97    this.ret = new Duplex({
98      readableObjectMode: opts.objectMode,
99      autoDestroy: true,
100      read: () => {
101        const { body } = this
102
103        if (body && body.resume) {
104          body.resume()
105        }
106      },
107      write: (chunk, encoding, callback) => {
108        const { req } = this
109
110        if (req.push(chunk, encoding) || req._readableState.destroyed) {
111          callback()
112        } else {
113          req[kResume] = callback
114        }
115      },
116      destroy: (err, callback) => {
117        const { body, req, res, ret, abort } = this
118
119        if (!err && !ret._readableState.endEmitted) {
120          err = new RequestAbortedError()
121        }
122
123        if (abort && err) {
124          abort()
125        }
126
127        util.destroy(body, err)
128        util.destroy(req, err)
129        util.destroy(res, err)
130
131        removeSignal(this)
132
133        callback(err)
134      }
135    }).on('prefinish', () => {
136      const { req } = this
137
138      // Node < 15 does not call _final in same tick.
139      req.push(null)
140    })
141
142    this.res = null
143
144    addSignal(this, signal)
145  }
146
147  onConnect (abort, context) {
148    const { ret, res } = this
149
150    assert(!res, 'pipeline cannot be retried')
151
152    if (ret.destroyed) {
153      throw new RequestAbortedError()
154    }
155
156    this.abort = abort
157    this.context = context
158  }
159
160  onHeaders (statusCode, rawHeaders, resume) {
161    const { opaque, handler, context } = this
162
163    if (statusCode < 200) {
164      if (this.onInfo) {
165        const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
166        this.onInfo({ statusCode, headers })
167      }
168      return
169    }
170
171    this.res = new PipelineResponse(resume)
172
173    let body
174    try {
175      this.handler = null
176      const headers = this.responseHeaders === 'raw' ? util.parseRawHeaders(rawHeaders) : util.parseHeaders(rawHeaders)
177      body = this.runInAsyncScope(handler, null, {
178        statusCode,
179        headers,
180        opaque,
181        body: this.res,
182        context
183      })
184    } catch (err) {
185      this.res.on('error', util.nop)
186      throw err
187    }
188
189    if (!body || typeof body.on !== 'function') {
190      throw new InvalidReturnValueError('expected Readable')
191    }
192
193    body
194      .on('data', (chunk) => {
195        const { ret, body } = this
196
197        if (!ret.push(chunk) && body.pause) {
198          body.pause()
199        }
200      })
201      .on('error', (err) => {
202        const { ret } = this
203
204        util.destroy(ret, err)
205      })
206      .on('end', () => {
207        const { ret } = this
208
209        ret.push(null)
210      })
211      .on('close', () => {
212        const { ret } = this
213
214        if (!ret._readableState.ended) {
215          util.destroy(ret, new RequestAbortedError())
216        }
217      })
218
219    this.body = body
220  }
221
222  onData (chunk) {
223    const { res } = this
224    return res.push(chunk)
225  }
226
227  onComplete (trailers) {
228    const { res } = this
229    res.push(null)
230  }
231
232  onError (err) {
233    const { ret } = this
234    this.handler = null
235    util.destroy(ret, err)
236  }
237}
238
239function pipeline (opts, handler) {
240  try {
241    const pipelineHandler = new PipelineHandler(opts, handler)
242    this.dispatch({ ...opts, body: pipelineHandler.req }, pipelineHandler)
243    return pipelineHandler.ret
244  } catch (err) {
245    return new PassThrough().destroy(err)
246  }
247}
248
249module.exports = pipeline
250