• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// @ts-check
2
3'use strict'
4
5/* global WebAssembly */
6
7const assert = require('assert')
8const net = require('net')
9const http = require('http')
10const { pipeline } = require('stream')
11const util = require('./core/util')
12const timers = require('./timers')
13const Request = require('./core/request')
14const DispatcherBase = require('./dispatcher-base')
15const {
16  RequestContentLengthMismatchError,
17  ResponseContentLengthMismatchError,
18  InvalidArgumentError,
19  RequestAbortedError,
20  HeadersTimeoutError,
21  HeadersOverflowError,
22  SocketError,
23  InformationalError,
24  BodyTimeoutError,
25  HTTPParserError,
26  ResponseExceededMaxSizeError,
27  ClientDestroyedError
28} = require('./core/errors')
29const buildConnector = require('./core/connect')
30const {
31  kUrl,
32  kReset,
33  kServerName,
34  kClient,
35  kBusy,
36  kParser,
37  kConnect,
38  kBlocking,
39  kResuming,
40  kRunning,
41  kPending,
42  kSize,
43  kWriting,
44  kQueue,
45  kConnected,
46  kConnecting,
47  kNeedDrain,
48  kNoRef,
49  kKeepAliveDefaultTimeout,
50  kHostHeader,
51  kPendingIdx,
52  kRunningIdx,
53  kError,
54  kPipelining,
55  kSocket,
56  kKeepAliveTimeoutValue,
57  kMaxHeadersSize,
58  kKeepAliveMaxTimeout,
59  kKeepAliveTimeoutThreshold,
60  kHeadersTimeout,
61  kBodyTimeout,
62  kStrictContentLength,
63  kConnector,
64  kMaxRedirections,
65  kMaxRequests,
66  kCounter,
67  kClose,
68  kDestroy,
69  kDispatch,
70  kInterceptors,
71  kLocalAddress,
72  kMaxResponseSize,
73  kHTTPConnVersion,
74  // HTTP2
75  kHost,
76  kHTTP2Session,
77  kHTTP2SessionState,
78  kHTTP2BuildRequest,
79  kHTTP2CopyHeaders,
80  kHTTP1BuildRequest
81} = require('./core/symbols')
82
83/** @type {import('http2')} */
84let http2
85try {
86  http2 = require('http2')
87} catch {
88  // @ts-ignore
89  http2 = { constants: {} }
90}
91
92const {
93  constants: {
94    HTTP2_HEADER_AUTHORITY,
95    HTTP2_HEADER_METHOD,
96    HTTP2_HEADER_PATH,
97    HTTP2_HEADER_SCHEME,
98    HTTP2_HEADER_CONTENT_LENGTH,
99    HTTP2_HEADER_EXPECT,
100    HTTP2_HEADER_STATUS
101  }
102} = http2
103
104// Experimental
105let h2ExperimentalWarned = false
106
107const FastBuffer = Buffer[Symbol.species]
108
109const kClosedResolve = Symbol('kClosedResolve')
110
// Diagnostics channels used by the client. When `diagnostics_channel`
// cannot be loaded (or a channel cannot be created) every entry falls back
// to an inert stub so that `channels.x.hasSubscribers` checks stay cheap.
const channels = {}

try {
  const dc = require('diagnostics_channel')
  Object.assign(channels, {
    sendHeaders: dc.channel('undici:client:sendHeaders'),
    beforeConnect: dc.channel('undici:client:beforeConnect'),
    connectError: dc.channel('undici:client:connectError'),
    connected: dc.channel('undici:client:connected')
  })
} catch {
  Object.assign(channels, {
    sendHeaders: { hasSubscribers: false },
    beforeConnect: { hasSubscribers: false },
    connectError: { hasSubscribers: false },
    connected: { hasSubscribers: false }
  })
}
125
/**
 * Dispatcher bound to a single origin. Requests are kept in one queue
 * (`kQueue`) that is split into completed / running / pending sections by
 * the `kRunningIdx` and `kPendingIdx` cursors.
 *
 * @type {import('../types/client').default}
 */
class Client extends DispatcherBase {
  /**
   * Validates the options, builds the connector (unless a connect function
   * was supplied) and initialises all per-client state. No connection is
   * opened by the constructor itself.
   *
   * @param {string|URL} url
   * @param {import('../types/client').Client.Options} options
   * @throws {InvalidArgumentError} when an option is unsupported or invalid
   */
  constructor (url, {
    interceptors,
    maxHeaderSize,
    headersTimeout,
    socketTimeout,
    requestTimeout,
    connectTimeout,
    bodyTimeout,
    idleTimeout,
    keepAlive,
    keepAliveTimeout,
    maxKeepAliveTimeout,
    keepAliveMaxTimeout,
    keepAliveTimeoutThreshold,
    socketPath,
    pipelining,
    tls,
    strictContentLength,
    maxCachedSessions,
    maxRedirections,
    connect,
    maxRequestsPerClient,
    localAddress,
    maxResponseSize,
    autoSelectFamily,
    autoSelectFamilyAttemptTimeout,
    // h2
    allowH2,
    maxConcurrentStreams
  } = {}) {
    super()

    // Options that are no longer supported: fail loudly and point at the
    // replacement instead of silently ignoring them.
    if (keepAlive !== undefined) {
      throw new InvalidArgumentError('unsupported keepAlive, use pipelining=0 instead')
    }

    if (socketTimeout !== undefined) {
      throw new InvalidArgumentError('unsupported socketTimeout, use headersTimeout & bodyTimeout instead')
    }

    if (requestTimeout !== undefined) {
      throw new InvalidArgumentError('unsupported requestTimeout, use headersTimeout & bodyTimeout instead')
    }

    if (idleTimeout !== undefined) {
      throw new InvalidArgumentError('unsupported idleTimeout, use keepAliveTimeout instead')
    }

    if (maxKeepAliveTimeout !== undefined) {
      throw new InvalidArgumentError('unsupported maxKeepAliveTimeout, use keepAliveMaxTimeout instead')
    }

    if (maxHeaderSize != null && !Number.isFinite(maxHeaderSize)) {
      throw new InvalidArgumentError('invalid maxHeaderSize')
    }

    if (socketPath != null && typeof socketPath !== 'string') {
      throw new InvalidArgumentError('invalid socketPath')
    }

    if (connectTimeout != null && (!Number.isFinite(connectTimeout) || connectTimeout < 0)) {
      throw new InvalidArgumentError('invalid connectTimeout')
    }

    if (keepAliveTimeout != null && (!Number.isFinite(keepAliveTimeout) || keepAliveTimeout <= 0)) {
      throw new InvalidArgumentError('invalid keepAliveTimeout')
    }

    if (keepAliveMaxTimeout != null && (!Number.isFinite(keepAliveMaxTimeout) || keepAliveMaxTimeout <= 0)) {
      throw new InvalidArgumentError('invalid keepAliveMaxTimeout')
    }

    if (keepAliveTimeoutThreshold != null && !Number.isFinite(keepAliveTimeoutThreshold)) {
      throw new InvalidArgumentError('invalid keepAliveTimeoutThreshold')
    }

    if (headersTimeout != null && (!Number.isInteger(headersTimeout) || headersTimeout < 0)) {
      throw new InvalidArgumentError('headersTimeout must be a positive integer or zero')
    }

    if (bodyTimeout != null && (!Number.isInteger(bodyTimeout) || bodyTimeout < 0)) {
      throw new InvalidArgumentError('bodyTimeout must be a positive integer or zero')
    }

    if (connect != null && typeof connect !== 'function' && typeof connect !== 'object') {
      throw new InvalidArgumentError('connect must be a function or an object')
    }

    if (maxRedirections != null && (!Number.isInteger(maxRedirections) || maxRedirections < 0)) {
      throw new InvalidArgumentError('maxRedirections must be a positive number')
    }

    if (maxRequestsPerClient != null && (!Number.isInteger(maxRequestsPerClient) || maxRequestsPerClient < 0)) {
      throw new InvalidArgumentError('maxRequestsPerClient must be a positive number')
    }

    if (localAddress != null && (typeof localAddress !== 'string' || net.isIP(localAddress) === 0)) {
      throw new InvalidArgumentError('localAddress must be valid string IP address')
    }

    if (maxResponseSize != null && (!Number.isInteger(maxResponseSize) || maxResponseSize < -1)) {
      throw new InvalidArgumentError('maxResponseSize must be a positive number')
    }

    if (
      autoSelectFamilyAttemptTimeout != null &&
      (!Number.isInteger(autoSelectFamilyAttemptTimeout) || autoSelectFamilyAttemptTimeout < -1)
    ) {
      throw new InvalidArgumentError('autoSelectFamilyAttemptTimeout must be a positive number')
    }

    // h2
    if (allowH2 != null && typeof allowH2 !== 'boolean') {
      throw new InvalidArgumentError('allowH2 must be a valid boolean value')
    }

    // `NaN < 1` is false, so NaN has to be rejected explicitly.
    if (
      maxConcurrentStreams != null &&
      (typeof maxConcurrentStreams !== 'number' || Number.isNaN(maxConcurrentStreams) || maxConcurrentStreams < 1)
    ) {
      throw new InvalidArgumentError('maxConcurrentStreams must be a positive integer, greater than 0')
    }

    // A non-function `connect` is treated as extra connector options and is
    // spread last so it overrides the values derived from the other options.
    if (typeof connect !== 'function') {
      connect = buildConnector({
        ...tls,
        maxCachedSessions,
        allowH2,
        socketPath,
        timeout: connectTimeout,
        ...(util.nodeHasAutoSelectFamily && autoSelectFamily ? { autoSelectFamily, autoSelectFamilyAttemptTimeout } : undefined),
        ...connect
      })
    }

    // Caller supplied interceptors replace the default redirect interceptor.
    this[kInterceptors] = interceptors && interceptors.Client && Array.isArray(interceptors.Client)
      ? interceptors.Client
      : [createRedirectInterceptor({ maxRedirections })]
    this[kUrl] = util.parseOrigin(url)
    this[kConnector] = connect
    this[kSocket] = null
    this[kPipelining] = pipelining != null ? pipelining : 1
    this[kMaxHeadersSize] = maxHeaderSize || http.maxHeaderSize
    this[kKeepAliveDefaultTimeout] = keepAliveTimeout == null ? 4e3 : keepAliveTimeout
    this[kKeepAliveMaxTimeout] = keepAliveMaxTimeout == null ? 600e3 : keepAliveMaxTimeout
    this[kKeepAliveTimeoutThreshold] = keepAliveTimeoutThreshold == null ? 1e3 : keepAliveTimeoutThreshold
    this[kKeepAliveTimeoutValue] = this[kKeepAliveDefaultTimeout]
    this[kServerName] = null
    this[kLocalAddress] = localAddress != null ? localAddress : null
    this[kResuming] = 0 // 0, idle, 1, scheduled, 2 resuming
    // Drain state. kDispatch raises it to 2 when the client is busy and
    // reports back-pressure (returns false) while it is 2.
    this[kNeedDrain] = 0
    this[kHostHeader] = `host: ${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}\r\n`
    this[kBodyTimeout] = bodyTimeout != null ? bodyTimeout : 300e3
    this[kHeadersTimeout] = headersTimeout != null ? headersTimeout : 300e3
    this[kStrictContentLength] = strictContentLength == null ? true : strictContentLength
    this[kMaxRedirections] = maxRedirections
    this[kMaxRequests] = maxRequestsPerClient
    this[kClosedResolve] = null
    // -1 means "no limit" (see Parser#onBody).
    this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1
    this[kHTTPConnVersion] = 'h1'

    // HTTP/2
    this[kHTTP2Session] = null
    this[kHTTP2SessionState] = !allowH2
      ? null
      : {
          // streams: null, // Fixed queue of streams - For future support of `push`
          openStreams: 0, // Keep track of them to decide whether or not unref the session
          maxConcurrentStreams: maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server
        }
    this[kHost] = `${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}`

    // kQueue is built up of 3 sections separated by
    // the kRunningIdx and kPendingIdx indices.
    // |   complete   |   running   |   pending   |
    //                ^ kRunningIdx ^ kPendingIdx ^ kQueue.length
    // kRunningIdx points to the first running element.
    // kPendingIdx points to the first pending element.
    // This implements a fast queue with an amortized
    // time of O(1).

    this[kQueue] = []
    this[kRunningIdx] = 0
    this[kPendingIdx] = 0
  }

  /** Current pipelining factor. */
  get pipelining () {
    return this[kPipelining]
  }

  /** Updates the pipelining factor and immediately re-evaluates the queue. */
  set pipelining (value) {
    this[kPipelining] = value
    resume(this, true)
  }

  /** Number of queued requests that have not been started yet. */
  get [kPending] () {
    return this[kQueue].length - this[kPendingIdx]
  }

  /** Number of requests between the running and pending cursors. */
  get [kRunning] () {
    return this[kPendingIdx] - this[kRunningIdx]
  }

  /** Running plus pending requests. */
  get [kSize] () {
    return this[kQueue].length - this[kRunningIdx]
  }

  /** True when a socket exists, is not destroyed and no connect is in flight. */
  get [kConnected] () {
    return !!this[kSocket] && !this[kConnecting] && !this[kSocket].destroyed
  }

  /**
   * True when the socket is flagged reset/writing/blocking, when the
   * pipeline is full (a pipelining of 0 counts as 1), or when requests are
   * still pending.
   */
  get [kBusy] () {
    const socket = this[kSocket]
    return (
      (socket && (socket[kReset] || socket[kWriting] || socket[kBlocking])) ||
      (this[kSize] >= (this[kPipelining] || 1)) ||
      this[kPending] > 0
    )
  }

  /* istanbul ignore: only used for test */
  [kConnect] (cb) {
    connect(this)
    this.once('connect', cb)
  }

  /**
   * Builds a request for the current connection version, enqueues it and
   * kicks the queue.
   *
   * @param {object} opts dispatch options; `opts.origin` defaults to the client origin
   * @param {object} handler request handler passed through to the request builder
   * @returns {boolean} false when the caller should wait for drain
   */
  [kDispatch] (opts, handler) {
    const origin = opts.origin || this[kUrl].origin

    const request = this[kHTTPConnVersion] === 'h2'
      ? Request[kHTTP2BuildRequest](origin, opts, handler)
      : Request[kHTTP1BuildRequest](origin, opts, handler)

    this[kQueue].push(request)
    if (this[kResuming]) {
      // Do nothing.
    } else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) {
      // Wait a tick in case stream/iterator is ended in the same tick.
      this[kResuming] = 1
      process.nextTick(resume, this)
    } else {
      resume(this, true)
    }

    if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) {
      this[kNeedDrain] = 2
    }

    return this[kNeedDrain] < 2
  }

  /**
   * Resolves with `null` right away when nothing is running or pending;
   * otherwise parks the resolver in `kClosedResolve` to be called later.
   *
   * @returns {Promise<null|undefined>}
   */
  async [kClose] () {
    // TODO: for H2 we need to gracefully flush the remaining enqueued
    // request and close each stream.
    return new Promise((resolve) => {
      if (!this[kSize]) {
        resolve(null)
      } else {
        this[kClosedResolve] = resolve
      }
    })
  }

  /**
   * Fails every pending request with `err`, tears down the HTTP/2 session
   * and the socket, and resolves once the socket has emitted 'close' (or on
   * the next microtask when there is no socket).
   *
   * @param {Error} [err]
   * @returns {Promise<void>}
   */
  async [kDestroy] (err) {
    return new Promise((resolve) => {
      // Only the pending section is failed here; the splice starts at kPendingIdx.
      const requests = this[kQueue].splice(this[kPendingIdx])
      for (let i = 0; i < requests.length; i++) {
        const request = requests[i]
        errorRequest(this, request, err)
      }

      const callback = () => {
        if (this[kClosedResolve]) {
          // TODO (fix): Should we error here with ClientDestroyedError?
          this[kClosedResolve]()
          this[kClosedResolve] = null
        }
        resolve()
      }

      if (this[kHTTP2Session] != null) {
        util.destroy(this[kHTTP2Session], err)
        this[kHTTP2Session] = null
        this[kHTTP2SessionState] = null
      }

      if (!this[kSocket]) {
        queueMicrotask(callback)
      } else {
        util.destroy(this[kSocket].on('close', callback), err)
      }

      resume(this)
    })
  }
}
428
/**
 * 'error' listener for the HTTP/2 session (`this`). Stores the error on the
 * underlying socket and forwards it to the owning client.
 *
 * @param {Error & { code?: string }} err
 */
function onHttp2SessionError (err) {
  // Hostname mismatches are not expected to surface through this path.
  assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

  const socket = this[kSocket]
  socket[kError] = err

  const client = this[kClient]
  onError(client, err)
}
436
/**
 * 'frameError' listener for the HTTP/2 session (`this`). Only errors with
 * `id === 0` are recorded on the socket and forwarded to the client; any
 * other id is ignored here.
 *
 * @param {number} type
 * @param {number} code
 * @param {number} id
 */
function onHttp2FrameError (type, code, id) {
  const frameError = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)

  if (id !== 0) {
    return
  }

  this[kSocket][kError] = frameError
  onError(this[kClient], frameError)
}
445
/**
 * 'end' listener for the HTTP/2 session (`this`): destroys the session and
 * then its socket, each with its own "other side closed" SocketError.
 */
function onHttp2SessionEnd () {
  const sessionError = new SocketError('other side closed')
  util.destroy(this, sessionError)

  const socket = this[kSocket]
  const socketError = new SocketError('other side closed')
  util.destroy(socket, socketError)
}
450
/**
 * 'goaway' listener for the HTTP/2 session (`this`). Detaches the socket
 * and session from the owning client, fails the affected requests, emits
 * 'disconnect' and resumes the client queue.
 *
 * Queue state lives on the client, not on the session, so every queue
 * access below goes through `client` rather than `this`.
 *
 * @param {number} code error code carried by the GOAWAY frame
 */
function onHTTP2GoAway (code) {
  const client = this[kClient]
  const err = new InformationalError(`HTTP/2: "GOAWAY" frame received with code ${code}`)
  client[kSocket] = null
  client[kHTTP2Session] = null

  if (client.destroyed) {
    assert(client[kPending] === 0)

    // Fail entire queue.
    const requests = client[kQueue].splice(client[kRunningIdx])
    for (let i = 0; i < requests.length; i++) {
      const request = requests[i]
      errorRequest(client, request, err)
    }
  } else if (client[kRunning] > 0) {
    // Fail head of pipeline.
    const request = client[kQueue][client[kRunningIdx]]
    client[kQueue][client[kRunningIdx]++] = null

    errorRequest(client, request, err)
  }

  // Whatever is still queued becomes pending again.
  client[kPendingIdx] = client[kRunningIdx]

  assert(client[kRunning] === 0)

  client.emit('disconnect',
    client[kUrl],
    [client],
    err
  )

  resume(client)
}
486
487const constants = require('./llhttp/constants')
488const createRedirectInterceptor = require('./interceptor/redirectInterceptor')
489const EMPTY_BUF = Buffer.alloc(0)
490
/**
 * Compiles the llhttp WebAssembly module (SIMD build first, plain build as
 * a fallback) and instantiates it with callbacks that forward every parser
 * event to the module-level `currentParser`.
 *
 * @returns {Promise<WebAssembly.Instance>}
 */
async function lazyllhttp () {
  // When running inside a jest worker the non-SIMD build is loaded up front.
  let fallbackWasm
  if (process.env.JEST_WORKER_ID) {
    fallbackWasm = require('./llhttp/llhttp-wasm.js')
  }

  let wasmModule
  try {
    const simdSource = Buffer.from(require('./llhttp/llhttp_simd-wasm.js'), 'base64')
    wasmModule = await WebAssembly.compile(simdSource)
  } catch {
    /* istanbul ignore next */

    // We could check if the error was caused by the simd option not
    // being enabled, but the occurring of this other error
    // * https://github.com/emscripten-core/emscripten/issues/11495
    // got me to remove that check to avoid breaking Node 12.
    const plainSource = Buffer.from(fallbackWasm || require('./llhttp/llhttp-wasm.js'), 'base64')
    wasmModule = await WebAssembly.compile(plainSource)
  }

  // Translates a (pointer, length) pair reported by llhttp into a view over
  // the chunk currently being parsed (`currentBufferRef`), using the offset
  // of `at` from the start of the copied data (`currentBufferPtr`).
  const viewOfCurrentChunk = (at, len) => new FastBuffer(
    currentBufferRef.buffer,
    at - currentBufferPtr + currentBufferRef.byteOffset,
    len
  )

  const instance = await WebAssembly.instantiate(wasmModule, {
    env: {
      /* eslint-disable camelcase */

      wasm_on_url (p, at, len) {
        /* istanbul ignore next */
        return 0
      },
      wasm_on_status (p, at, len) {
        assert.strictEqual(currentParser.ptr, p)
        return currentParser.onStatus(viewOfCurrentChunk(at, len)) || 0
      },
      wasm_on_message_begin (p) {
        assert.strictEqual(currentParser.ptr, p)
        return currentParser.onMessageBegin() || 0
      },
      wasm_on_header_field (p, at, len) {
        assert.strictEqual(currentParser.ptr, p)
        return currentParser.onHeaderField(viewOfCurrentChunk(at, len)) || 0
      },
      wasm_on_header_value (p, at, len) {
        assert.strictEqual(currentParser.ptr, p)
        return currentParser.onHeaderValue(viewOfCurrentChunk(at, len)) || 0
      },
      wasm_on_headers_complete (p, statusCode, upgrade, shouldKeepAlive) {
        assert.strictEqual(currentParser.ptr, p)
        return currentParser.onHeadersComplete(statusCode, Boolean(upgrade), Boolean(shouldKeepAlive)) || 0
      },
      wasm_on_body (p, at, len) {
        assert.strictEqual(currentParser.ptr, p)
        return currentParser.onBody(viewOfCurrentChunk(at, len)) || 0
      },
      wasm_on_message_complete (p) {
        assert.strictEqual(currentParser.ptr, p)
        return currentParser.onMessageComplete() || 0
      }

      /* eslint-enable camelcase */
    }
  })

  return instance
}
552
// Instantiated llhttp module; null until it has been resolved from
// `llhttpPromise` (presumably by the connect path - confirm against the
// rest of the file).
let llhttpInstance = null
// Compilation starts eagerly at module load.
let llhttpPromise = lazyllhttp()
// Attach a real (no-op) handler: a bare `.catch()` registers no handler and
// the promise it returns would still reject unhandled. The rejection stays
// observable to whoever awaits `llhttpPromise` later.
llhttpPromise.catch(() => {})
556
557let currentParser = null
558let currentBufferRef = null
559let currentBufferSize = 0
560let currentBufferPtr = null
561
562const TIMEOUT_HEADERS = 1
563const TIMEOUT_BODY = 2
564const TIMEOUT_IDLE = 3
565
/**
 * Per-socket HTTP/1.1 response parser backed by the llhttp WebAssembly
 * instance. Parsed events are dispatched to the request at the head of the
 * client's running queue (`client[kQueue][client[kRunningIdx]]`).
 *
 * The copy buffer inside wasm memory (`currentBufferPtr` /
 * `currentBufferSize`) and the `currentParser` / `currentBufferRef` globals
 * are shared by all parsers, which is why `execute` asserts that no other
 * parser is active.
 */
class Parser {
  /**
   * @param {object} client owning client; supplies header/response size limits
   * @param {import('net').Socket} socket socket the responses are read from
   * @param {{ exports: object }} llhttp instantiated llhttp module
   */
  constructor (client, socket, { exports }) {
    assert(Number.isFinite(client[kMaxHeadersSize]) && client[kMaxHeadersSize] > 0)

    this.llhttp = exports
    this.ptr = this.llhttp.llhttp_alloc(constants.TYPE.RESPONSE)
    this.client = client
    this.socket = socket
    this.timeout = null
    this.timeoutValue = null
    this.timeoutType = null
    this.statusCode = null
    this.statusText = ''
    this.upgrade = false
    // Flat list: [name0, value0, name1, value1, ...] as Buffers.
    this.headers = []
    this.headersSize = 0
    this.headersMaxSize = client[kMaxHeadersSize]
    this.shouldKeepAlive = false
    this.paused = false
    // Bound once because it is handed to request.onHeaders as a callback.
    this.resume = this.resume.bind(this)

    this.bytesRead = 0

    // Raw values of the headers the parser itself needs to inspect.
    this.keepAlive = ''
    this.contentLength = ''
    this.connection = ''
    this.maxResponseSize = client[kMaxResponseSize]
  }

  /**
   * Arms, re-arms or refreshes the parser timer.
   *
   * @param {number} value timeout in milliseconds; a falsy value disables the timer
   * @param {number} type one of the TIMEOUT_* constants
   */
  setTimeout (value, type) {
    this.timeoutType = type
    if (value !== this.timeoutValue) {
      timers.clearTimeout(this.timeout)
      if (value) {
        this.timeout = timers.setTimeout(onParserTimeout, value, this)
        // istanbul ignore else: only for jest
        if (this.timeout.unref) {
          this.timeout.unref()
        }
      } else {
        this.timeout = null
      }
      this.timeoutValue = value
    } else if (this.timeout) {
      // Same duration as before: restart the existing timer.
      // istanbul ignore else: only for jest
      if (this.timeout.refresh) {
        this.timeout.refresh()
      }
    }
  }

  /**
   * Resumes a paused parser and drains whatever the socket has buffered.
   * Does nothing when the socket is destroyed or the parser is not paused.
   */
  resume () {
    if (this.socket.destroyed || !this.paused) {
      return
    }

    assert(this.ptr != null)
    assert(currentParser == null)

    this.llhttp.llhttp_resume(this.ptr)

    assert(this.timeoutType === TIMEOUT_BODY)
    if (this.timeout) {
      // istanbul ignore else: only for jest
      if (this.timeout.refresh) {
        this.timeout.refresh()
      }
    }

    this.paused = false
    this.execute(this.socket.read() || EMPTY_BUF) // Flush parser.
    this.readMore()
  }

  /**
   * Feeds chunks from the socket into the parser until the socket has no
   * more data, the parser pauses, or the parser has been destroyed.
   */
  readMore () {
    while (!this.paused && this.ptr) {
      const chunk = this.socket.read()
      if (chunk === null) {
        break
      }
      this.execute(chunk)
    }
  }

  /**
   * Copies `data` into wasm memory and runs llhttp over it. Any failure
   * (parser error or exception thrown by a callback) destroys the socket
   * with the corresponding error instead of propagating to the caller.
   *
   * @param {Buffer} data
   */
  execute (data) {
    assert(this.ptr != null)
    assert(currentParser == null)
    assert(!this.paused)

    const { socket, llhttp } = this

    // Grow the shared copy buffer in 4096-byte steps when the chunk does not fit.
    if (data.length > currentBufferSize) {
      if (currentBufferPtr) {
        llhttp.free(currentBufferPtr)
      }
      currentBufferSize = Math.ceil(data.length / 4096) * 4096
      currentBufferPtr = llhttp.malloc(currentBufferSize)
    }

    new Uint8Array(llhttp.memory.buffer, currentBufferPtr, currentBufferSize).set(data)

    // Call `execute` on the wasm parser.
    // We pass the `llhttp_parser` pointer address, the pointer address of buffer view data,
    // and finally the length of bytes to parse.
    // The return value is an error code or `constants.ERROR.OK`.
    try {
      let ret

      try {
        // The wasm callbacks locate the active parser and chunk through
        // these module-level variables.
        currentBufferRef = data
        currentParser = this
        ret = llhttp.llhttp_execute(this.ptr, currentBufferPtr, data.length)
      } finally {
        currentParser = null
        currentBufferRef = null
      }

      // Offset into `data` at which llhttp stopped.
      const offset = llhttp.llhttp_get_error_pos(this.ptr) - currentBufferPtr

      if (ret === constants.ERROR.PAUSED_UPGRADE) {
        this.onUpgrade(data.subarray(offset))
      } else if (ret === constants.ERROR.PAUSED) {
        this.paused = true
        // Give the unparsed remainder back to the socket for the next read.
        socket.unshift(data.subarray(offset))
      } else if (ret !== constants.ERROR.OK) {
        const ptr = llhttp.llhttp_get_error_reason(this.ptr)
        let message = ''
        /* istanbul ignore else: difficult to make a test case for */
        if (ptr) {
          // The reason is a NUL-terminated string inside wasm memory.
          const len = new Uint8Array(llhttp.memory.buffer, ptr).indexOf(0)
          message =
            'Response does not match the HTTP/1.1 protocol (' +
            Buffer.from(llhttp.memory.buffer, ptr, len).toString() +
            ')'
        }
        throw new HTTPParserError(message, constants.ERROR[ret], data.subarray(offset))
      }
    } catch (err) {
      util.destroy(socket, err)
    }
  }

  /** Frees the llhttp parser and clears the timer. Must not run mid-execute. */
  destroy () {
    assert(this.ptr != null)
    assert(currentParser == null)

    this.llhttp.llhttp_free(this.ptr)
    this.ptr = null

    timers.clearTimeout(this.timeout)
    this.timeout = null
    this.timeoutValue = null
    this.timeoutType = null

    this.paused = false
  }

  /**
   * @param {Buffer} buf status text reported by llhttp
   */
  onStatus (buf) {
    this.statusText = buf.toString()
  }

  /**
   * Rejects (returns -1) a response that arrives on a destroyed socket or
   * without a running request to receive it.
   *
   * @returns {number|undefined}
   */
  onMessageBegin () {
    const { socket, client } = this

    /* istanbul ignore next: difficult to make a test case for */
    if (socket.destroyed) {
      return -1
    }

    const request = client[kQueue][client[kRunningIdx]]
    if (!request) {
      return -1
    }
  }

  /**
   * Appends a header-name fragment. An even `headers` length starts a new
   * name; an odd length means the previous name is continued.
   *
   * @param {Buffer} buf
   */
  onHeaderField (buf) {
    const len = this.headers.length

    if ((len & 1) === 0) {
      this.headers.push(buf)
    } else {
      this.headers[len - 1] = Buffer.concat([this.headers[len - 1], buf])
    }

    this.trackHeader(buf.length)
  }

  /**
   * Appends a header-value fragment and captures the values of
   * `keep-alive`, `connection` and `content-length` for later use.
   *
   * @param {Buffer} buf
   */
  onHeaderValue (buf) {
    let len = this.headers.length

    if ((len & 1) === 1) {
      this.headers.push(buf)
      len += 1
    } else {
      this.headers[len - 1] = Buffer.concat([this.headers[len - 1], buf])
    }

    // Length is checked first so the string conversion only happens for candidates.
    const key = this.headers[len - 2]
    if (key.length === 10 && key.toString().toLowerCase() === 'keep-alive') {
      this.keepAlive += buf.toString()
    } else if (key.length === 10 && key.toString().toLowerCase() === 'connection') {
      this.connection += buf.toString()
    } else if (key.length === 14 && key.toString().toLowerCase() === 'content-length') {
      this.contentLength += buf.toString()
    }

    this.trackHeader(buf.length)
  }

  /**
   * Accumulates header bytes and destroys the socket with
   * HeadersOverflowError once the configured maximum is reached.
   *
   * @param {number} len
   */
  trackHeader (len) {
    this.headersSize += len
    if (this.headersSize >= this.headersMaxSize) {
      util.destroy(this.socket, new HeadersOverflowError())
    }
  }

  /**
   * Hands the socket over to the request after an upgrade / CONNECT
   * response: the parser and all client listeners are detached, the client
   * forgets the socket and the request receives it via `onUpgrade`.
   *
   * @param {Buffer} head bytes received after the response head
   */
  onUpgrade (head) {
    const { upgrade, client, socket, headers, statusCode } = this

    assert(upgrade)

    const request = client[kQueue][client[kRunningIdx]]
    assert(request)

    assert(!socket.destroyed)
    assert(socket === client[kSocket])
    assert(!this.paused)
    assert(request.upgrade || request.method === 'CONNECT')

    this.statusCode = null
    this.statusText = ''
    this.shouldKeepAlive = null

    assert(this.headers.length % 2 === 0)
    this.headers = []
    this.headersSize = 0

    // Make the already-read bytes available to the new owner of the socket.
    socket.unshift(head)

    socket[kParser].destroy()
    socket[kParser] = null

    socket[kClient] = null
    socket[kError] = null
    socket
      .removeListener('error', onSocketError)
      .removeListener('readable', onSocketReadable)
      .removeListener('end', onSocketEnd)
      .removeListener('close', onSocketClose)

    client[kSocket] = null
    client[kQueue][client[kRunningIdx]++] = null
    client.emit('disconnect', client[kUrl], [client], new InformationalError('upgrade'))

    try {
      request.onUpgrade(statusCode, headers, socket)
    } catch (err) {
      util.destroy(socket, err)
    }

    resume(client)
  }

  /**
   * Called when the response head is complete. Validates the response,
   * switches to the body timeout, works out keep-alive handling and
   * delivers the headers to the request.
   *
   * The numeric return values are passed straight back to llhttp.
   * NOTE(review): 1 / 2 are presumably llhttp's "skip body" / "skip body
   * and pause for upgrade" codes - confirm against the llhttp documentation.
   *
   * @param {number} statusCode
   * @param {boolean} upgrade
   * @param {boolean} shouldKeepAlive llhttp's own keep-alive verdict
   * @returns {number}
   */
  onHeadersComplete (statusCode, upgrade, shouldKeepAlive) {
    const { client, socket, headers, statusText } = this

    /* istanbul ignore next: difficult to make a test case for */
    if (socket.destroyed) {
      return -1
    }

    const request = client[kQueue][client[kRunningIdx]]

    /* istanbul ignore next: difficult to make a test case for */
    if (!request) {
      return -1
    }

    assert(!this.upgrade)
    assert(this.statusCode < 200)

    if (statusCode === 100) {
      util.destroy(socket, new SocketError('bad response', util.getSocketInfo(socket)))
      return -1
    }

    /* this can only happen if server is misbehaving */
    if (upgrade && !request.upgrade) {
      util.destroy(socket, new SocketError('bad upgrade', util.getSocketInfo(socket)))
      return -1
    }

    assert.strictEqual(this.timeoutType, TIMEOUT_HEADERS)

    this.statusCode = statusCode
    this.shouldKeepAlive = (
      shouldKeepAlive ||
      // Override llhttp value which does not allow keepAlive for HEAD.
      (request.method === 'HEAD' && !socket[kReset] && this.connection.toLowerCase() === 'keep-alive')
    )

    if (this.statusCode >= 200) {
      // A per-request bodyTimeout takes precedence over the client default.
      const bodyTimeout = request.bodyTimeout != null
        ? request.bodyTimeout
        : client[kBodyTimeout]
      this.setTimeout(bodyTimeout, TIMEOUT_BODY)
    } else if (this.timeout) {
      // istanbul ignore else: only for jest
      if (this.timeout.refresh) {
        this.timeout.refresh()
      }
    }

    if (request.method === 'CONNECT') {
      assert(client[kRunning] === 1)
      this.upgrade = true
      return 2
    }

    if (upgrade) {
      assert(client[kRunning] === 1)
      this.upgrade = true
      return 2
    }

    // `headers` (destructured above) still references the collected list;
    // only the parser's own field is reset for the next message.
    assert(this.headers.length % 2 === 0)
    this.headers = []
    this.headersSize = 0

    if (this.shouldKeepAlive && client[kPipelining]) {
      const keepAliveTimeout = this.keepAlive ? util.parseKeepAliveTimeout(this.keepAlive) : null

      if (keepAliveTimeout != null) {
        // Stay below the server's advertised timeout by the configured
        // threshold, capped at the client's maximum.
        const timeout = Math.min(
          keepAliveTimeout - client[kKeepAliveTimeoutThreshold],
          client[kKeepAliveMaxTimeout]
        )
        if (timeout <= 0) {
          socket[kReset] = true
        } else {
          client[kKeepAliveTimeoutValue] = timeout
        }
      } else {
        client[kKeepAliveTimeoutValue] = client[kKeepAliveDefaultTimeout]
      }
    } else {
      // Stop more requests from being dispatched.
      socket[kReset] = true
    }

    // A `false` return from the handler asks for the parser to be paused.
    const pause = request.onHeaders(statusCode, headers, this.resume, statusText) === false

    if (request.aborted) {
      return -1
    }

    if (request.method === 'HEAD') {
      return 1
    }

    if (statusCode < 200) {
      return 1
    }

    if (socket[kBlocking]) {
      socket[kBlocking] = false
      resume(client)
    }

    return pause ? constants.ERROR.PAUSED : 0
  }

  /**
   * Delivers a body chunk to the running request, enforcing
   * `maxResponseSize` (a value of -1 disables the limit).
   *
   * @param {Buffer} buf
   * @returns {number|undefined} -1 on failure, `constants.ERROR.PAUSED` when the handler asks to pause
   */
  onBody (buf) {
    const { client, socket, statusCode, maxResponseSize } = this

    if (socket.destroyed) {
      return -1
    }

    const request = client[kQueue][client[kRunningIdx]]
    assert(request)

    assert.strictEqual(this.timeoutType, TIMEOUT_BODY)
    if (this.timeout) {
      // istanbul ignore else: only for jest
      if (this.timeout.refresh) {
        this.timeout.refresh()
      }
    }

    assert(statusCode >= 200)

    if (maxResponseSize > -1 && this.bytesRead + buf.length > maxResponseSize) {
      util.destroy(socket, new ResponseExceededMaxSizeError())
      return -1
    }

    this.bytesRead += buf.length

    if (request.onData(buf) === false) {
      return constants.ERROR.PAUSED
    }
  }

  /**
   * Finishes the current message: resets per-message state, verifies the
   * content length, completes the request and decides whether the socket
   * can be reused or has to be reset.
   *
   * @returns {number|undefined}
   */
  onMessageComplete () {
    // Snapshot the per-message state before it is reset below.
    const { client, socket, statusCode, upgrade, headers, contentLength, bytesRead, shouldKeepAlive } = this

    if (socket.destroyed && (!statusCode || shouldKeepAlive)) {
      return -1
    }

    if (upgrade) {
      return
    }

    const request = client[kQueue][client[kRunningIdx]]
    assert(request)

    assert(statusCode >= 100)

    this.statusCode = null
    this.statusText = ''
    this.bytesRead = 0
    this.contentLength = ''
    this.keepAlive = ''
    this.connection = ''

    assert(this.headers.length % 2 === 0)
    this.headers = []
    this.headersSize = 0

    // Informational responses do not complete the request.
    if (statusCode < 200) {
      return
    }

    /* istanbul ignore next: should be handled by llhttp? */
    if (request.method !== 'HEAD' && contentLength && bytesRead !== parseInt(contentLength, 10)) {
      util.destroy(socket, new ResponseContentLengthMismatchError())
      return -1
    }

    // Anything collected after the head (e.g. trailers, presumably) is passed on here.
    request.onComplete(headers)

    client[kQueue][client[kRunningIdx]++] = null

    if (socket[kWriting]) {
      assert.strictEqual(client[kRunning], 0)
      // Response completed before request.
      util.destroy(socket, new InformationalError('reset'))
      return constants.ERROR.PAUSED
    } else if (!shouldKeepAlive) {
      util.destroy(socket, new InformationalError('reset'))
      return constants.ERROR.PAUSED
    } else if (socket[kReset] && client[kRunning] === 0) {
      // Destroy socket once all requests have completed.
      // The request at the tail of the pipeline is the one
      // that requested reset and no further requests should
      // have been queued since then.
      util.destroy(socket, new InformationalError('reset'))
      return constants.ERROR.PAUSED
    } else if (client[kPipelining] === 1) {
      // We must wait a full event loop cycle to reuse this socket to make sure
      // that non-spec compliant servers are not closing the connection even if they
      // said they won't.
      setImmediate(resume, client)
    } else {
      resume(client)
    }
  }
}
1040
/**
 * Handles expiry of the parser's timer and destroys the socket with the error
 * matching the kind of timeout that fired.
 *
 * @param {object} parser Parser exposing `socket`, `client`, `timeoutType` and `paused`.
 */
function onParserTimeout (parser) {
  const { socket, timeoutType, client } = parser

  switch (timeoutType) {
    case TIMEOUT_HEADERS: {
      const shouldTimeOut = !socket[kWriting] || socket.writableNeedDrain || client[kRunning] > 1
      if (shouldTimeOut) {
        assert(!parser.paused, 'cannot be paused while waiting for headers')
        util.destroy(socket, new HeadersTimeoutError())
      }
      break
    }

    case TIMEOUT_BODY:
      // A paused parser is not timed out.
      if (!parser.paused) {
        util.destroy(socket, new BodyTimeoutError())
      }
      break

    case TIMEOUT_IDLE:
      assert(client[kRunning] === 0 && client[kKeepAliveTimeoutValue])
      util.destroy(socket, new InformationalError('socket idle timeout'))
      break

    /* istanbul ignore next */
    default:
      break
  }
}
1059
/**
 * Socket 'readable' listener (`this` is the socket): asks the attached
 * parser, if any, to read more data.
 */
function onSocketReadable () {
  const parser = this[kParser]
  if (!parser) {
    return
  }

  parser.readMore()
}
1066
/**
 * Socket 'error' listener (`this` is the socket). Records the error on the
 * socket and forwards it to `onError`, except for a connection reset on a
 * non-keep-alive HTTP/1 response, which is treated as the end of the message.
 *
 * @param {Error} err The socket error.
 */
function onSocketError (err) {
  const client = this[kClient]
  const parser = this[kParser]

  assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

  // On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded
  // to the user.
  const notH2 = client[kHTTPConnVersion] !== 'h2'
  if (notH2 && err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) {
    // We treat all incoming data so far as a valid response.
    parser.onMessageComplete()
    return
  }

  this[kError] = err

  onError(this[kClient], err)
}
1086
/**
 * Fails every queued request with `err` when no request is running and the
 * error code is neither 'UND_ERR_INFO' nor 'UND_ERR_SOCKET'.
 *
 * @param {object} client The client whose queue should be failed.
 * @param {Error} err The error to report to each queued request.
 */
function onError (client, err) {
  const hasRunning = client[kRunning] !== 0
  if (hasRunning || err.code === 'UND_ERR_INFO' || err.code === 'UND_ERR_SOCKET') {
    return
  }

  // Error is not caused by running request and not a recoverable
  // socket error.
  assert(client[kPendingIdx] === client[kRunningIdx])

  const failed = client[kQueue].splice(client[kRunningIdx])
  for (const request of failed) {
    errorRequest(client, request, err)
  }

  assert(client[kSize] === 0)
}
1106
/**
 * Socket 'end' listener (`this` is the socket). A non-keep-alive HTTP/1
 * response that already has a status code is completed; in every other case
 * the socket is destroyed with an 'other side closed' error.
 */
function onSocketEnd () {
  const parser = this[kParser]
  const client = this[kClient]

  const notH2 = client[kHTTPConnVersion] !== 'h2'
  if (notH2 && parser.statusCode && !parser.shouldKeepAlive) {
    // We treat all incoming data so far as a valid response.
    parser.onMessageComplete()
    return
  }

  const info = util.getSocketInfo(this)
  util.destroy(this, new SocketError('other side closed', info))
}
1120
/**
 * 'close' listener (`this` is the emitter it was registered on). Tears down
 * the HTTP/1 parser if there is one, fails the affected request(s), emits
 * 'disconnect' on the client and resumes it.
 */
function onSocketClose () {
  const client = this[kClient]
  const parser = this[kParser]

  if (client[kHTTPConnVersion] === 'h1' && parser) {
    const completesResponse = !this[kError] && parser.statusCode && !parser.shouldKeepAlive
    if (completesResponse) {
      // We treat all incoming data so far as a valid response.
      parser.onMessageComplete()
    }

    this[kParser].destroy()
    this[kParser] = null
  }

  const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))

  client[kSocket] = null

  if (client.destroyed) {
    assert(client[kPending] === 0)

    // Fail entire queue.
    for (const request of client[kQueue].splice(client[kRunningIdx])) {
      errorRequest(client, request, err)
    }
  } else if (client[kRunning] > 0 && err.code !== 'UND_ERR_INFO') {
    // Fail head of pipeline.
    const head = client[kQueue][client[kRunningIdx]]
    client[kQueue][client[kRunningIdx]++] = null

    errorRequest(client, head, err)
  }

  client[kPendingIdx] = client[kRunningIdx]

  assert(client[kRunning] === 0)

  client.emit('disconnect', client[kUrl], [client], err)

  resume(client)
}
1163
/**
 * Opens a connection for `client` through its connector and wires the
 * resulting socket up as either an HTTP/2 session (when ALPN negotiated 'h2')
 * or an HTTP/1 socket with a parser. Publishes the beforeConnect / connected /
 * connectError diagnostics channels, emits 'connect' or 'connectionError' on
 * the client, and finally resumes it.
 *
 * @param {object} client The client to connect; must not already be
 *   connecting or hold a socket.
 * @returns {Promise<void>}
 */
async function connect (client) {
  assert(!client[kConnecting])
  assert(!client[kSocket])

  const url = client[kUrl]
  const { host, protocol, port } = url
  let hostname = url.hostname

  // Resolve ipv6
  if (hostname[0] === '[') {
    const closingIdx = hostname.indexOf(']')

    assert(closingIdx !== -1)
    const address = hostname.substring(1, closingIdx)

    assert(net.isIP(address))
    hostname = address
  }

  // Builds a fresh parameter object on every call so that each consumer
  // receives its own copy with the current servername / local address.
  const buildConnectParams = () => ({
    host,
    hostname,
    protocol,
    port,
    servername: client[kServerName],
    localAddress: client[kLocalAddress]
  })

  client[kConnecting] = true

  if (channels.beforeConnect.hasSubscribers) {
    channels.beforeConnect.publish({
      connectParams: buildConnectParams(),
      connector: client[kConnector]
    })
  }

  try {
    const socket = await new Promise((resolve, reject) => {
      client[kConnector](buildConnectParams(), (err, connected) => {
        if (err) {
          reject(err)
          return
        }
        resolve(connected)
      })
    })

    if (client.destroyed) {
      // The client went away while connecting: discard the socket quietly.
      util.destroy(socket.on('error', () => {}), new ClientDestroyedError())
      return
    }

    client[kConnecting] = false

    assert(socket)

    if (socket.alpnProtocol === 'h2') {
      if (!h2ExperimentalWarned) {
        h2ExperimentalWarned = true
        process.emitWarning('H2 support is experimental, expect them to change at any time.', {
          code: 'UNDICI-H2'
        })
      }

      const session = http2.connect(client[kUrl], {
        createConnection: () => socket,
        peerMaxConcurrentStreams: client[kHTTP2SessionState].maxConcurrentStreams
      })

      client[kHTTPConnVersion] = 'h2'
      session[kClient] = client
      session[kSocket] = socket
      session.on('error', onHttp2SessionError)
      session.on('frameError', onHttp2FrameError)
      session.on('end', onHttp2SessionEnd)
      session.on('goaway', onHTTP2GoAway)
      session.on('close', onSocketClose)
      session.unref()

      client[kHTTP2Session] = session
      socket[kHTTP2Session] = session
    } else {
      if (!llhttpInstance) {
        llhttpInstance = await llhttpPromise
        llhttpPromise = null
      }

      socket[kNoRef] = false
      socket[kWriting] = false
      socket[kReset] = false
      socket[kBlocking] = false
      socket[kParser] = new Parser(client, socket, llhttpInstance)
    }

    socket[kCounter] = 0
    socket[kMaxRequests] = client[kMaxRequests]
    socket[kClient] = client
    socket[kError] = null

    socket
      .on('error', onSocketError)
      .on('readable', onSocketReadable)
      .on('end', onSocketEnd)
      .on('close', onSocketClose)

    client[kSocket] = socket

    if (channels.connected.hasSubscribers) {
      channels.connected.publish({
        connectParams: buildConnectParams(),
        connector: client[kConnector],
        socket
      })
    }
    client.emit('connect', client[kUrl], [client])
  } catch (err) {
    if (client.destroyed) {
      return
    }

    client[kConnecting] = false

    if (channels.connectError.hasSubscribers) {
      channels.connectError.publish({
        connectParams: buildConnectParams(),
        connector: client[kConnector],
        error: err
      })
    }

    if (err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') {
      onError(client, err)
    } else {
      assert(client[kRunning] === 0)
      // Only fail the pending requests that asked for the current servername.
      while (client[kPending] > 0 && client[kQueue][client[kPendingIdx]].servername === client[kServerName]) {
        const request = client[kQueue][client[kPendingIdx]++]
        errorRequest(client, request, err)
      }
    }

    client.emit('connectionError', client[kUrl], [client], err)
  }

  resume(client)
}
1328
/**
 * Clears the client's need-drain flag and emits 'drain' with the client's
 * URL and a single-element array holding the client.
 *
 * @param {object} client
 */
function emitDrain (client) {
  client[kNeedDrain] = 0

  const origin = client[kUrl]
  const targets = [client]
  client.emit('drain', origin, targets)
}
1333
/**
 * Runs one `_resume` pass unless one is already in progress (flag value 2),
 * then drops completed entries from the front of the queue once more than
 * 256 of them have accumulated.
 *
 * @param {object} client
 * @param {boolean} [sync] Forwarded unchanged to `_resume`.
 */
function resume (client, sync) {
  const alreadyResuming = client[kResuming] === 2
  if (alreadyResuming) {
    return
  }

  client[kResuming] = 2
  _resume(client, sync)
  client[kResuming] = 0

  const completed = client[kRunningIdx]
  if (completed > 256) {
    // Drop the completed prefix and shift both indices back accordingly.
    client[kQueue].splice(0, completed)
    client[kPendingIdx] -= completed
    client[kRunningIdx] = 0
  }
}
1350
/**
 * Core scheduling loop of the client. Each iteration refreshes socket
 * ref/timeout state, handles drain signalling and then tries to dispatch the
 * next pending request, returning as soon as nothing more can be done.
 *
 * @param {object} client
 * @param {boolean} [sync] When truthy, 'drain' is emitted on the next tick
 *   instead of immediately.
 */
function _resume (client, sync) {
  for (;;) {
    if (client.destroyed) {
      assert(client[kPending] === 0)
      return
    }

    if (client[kClosedResolve] && !client[kSize]) {
      client[kClosedResolve]()
      client[kClosedResolve] = null
      return
    }

    const socket = client[kSocket]

    if (socket && !socket.destroyed && socket.alpnProtocol !== 'h2') {
      if (client[kSize] === 0) {
        // Nothing queued: unref the socket and arm the idle timeout.
        if (!socket[kNoRef] && socket.unref) {
          socket.unref()
          socket[kNoRef] = true
        }

        const parser = socket[kParser]
        if (parser.timeoutType !== TIMEOUT_IDLE) {
          parser.setTimeout(client[kKeepAliveTimeoutValue], TIMEOUT_IDLE)
        }
      } else {
        if (socket[kNoRef] && socket.ref) {
          socket.ref()
          socket[kNoRef] = false
        }

        if (client[kRunning] > 0 && socket[kParser].statusCode < 200) {
          const parser = socket[kParser]
          if (parser.timeoutType !== TIMEOUT_HEADERS) {
            const running = client[kQueue][client[kRunningIdx]]
            // A per-request headers timeout takes precedence over the client's.
            const headersTimeout = running.headersTimeout != null
              ? running.headersTimeout
              : client[kHeadersTimeout]
            parser.setTimeout(headersTimeout, TIMEOUT_HEADERS)
          }
        }
      }
    }

    if (client[kBusy]) {
      client[kNeedDrain] = 2
    } else if (client[kNeedDrain] === 2) {
      if (sync) {
        client[kNeedDrain] = 1
        process.nextTick(emitDrain, client)
      } else {
        emitDrain(client)
      }
      continue
    }

    if (client[kPending] === 0) {
      return
    }

    const maxInflight = client[kPipelining] || 1
    if (client[kRunning] >= maxInflight) {
      return
    }

    const request = client[kQueue][client[kPendingIdx]]

    if (client[kUrl].protocol === 'https:' && client[kServerName] !== request.servername) {
      // A servername change has to wait for in-flight requests to finish.
      if (client[kRunning] > 0) {
        return
      }

      client[kServerName] = request.servername

      if (socket && socket.servername !== request.servername) {
        util.destroy(socket, new InformationalError('servername changed'))
        return
      }
    }

    if (client[kConnecting]) {
      return
    }

    if (!socket && !client[kHTTP2Session]) {
      connect(client)
      return
    }

    if (socket.destroyed || socket[kWriting] || socket[kReset] || socket[kBlocking]) {
      return
    }

    if (client[kRunning] > 0) {
      if (!request.idempotent) {
        // Non-idempotent request cannot be retried.
        // Ensure that no other requests are inflight and
        // could cause failure.
        return
      }

      if (request.upgrade || request.method === 'CONNECT') {
        // Don't dispatch an upgrade until all preceding requests have completed.
        // A misbehaving server might upgrade the connection before all pipelined
        // request has completed.
        return
      }

      if (util.bodyLength(request.body) !== 0 &&
        (util.isStream(request.body) || util.isAsyncIterable(request.body))) {
        // Request with stream or iterator body can error while other requests
        // are inflight and indirectly error those as well.
        // Ensure this doesn't happen by waiting for inflight
        // to complete before dispatching.

        // Request with stream or iterator body cannot be retried.
        // Ensure that no other requests are inflight and
        // could cause failure.
        return
      }
    }

    const dispatched = !request.aborted && write(client, request)
    if (dispatched) {
      client[kPendingIdx]++
    } else {
      // Not dispatched through the queue: remove the entry.
      client[kQueue].splice(client[kPendingIdx], 1)
    }
  }
}
1474
1475// https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2
/**
 * Tells whether a request using `method` is subject to the content-length
 * mismatch check.
 *
 * @param {string} method HTTP method name, compared case-sensitively.
 * @returns {boolean} false for GET, HEAD, OPTIONS, TRACE and CONNECT; true otherwise.
 */
function shouldSendContentLength (method) {
  switch (method) {
    case 'GET':
    case 'HEAD':
    case 'OPTIONS':
    case 'TRACE':
    case 'CONNECT':
      return false
    default:
      return true
  }
}
1479
/**
 * Writes `request` to the client's current connection. HTTP/2 connections are
 * delegated to `writeH2`; for HTTP/1 the request line and headers are built
 * here and the body is written according to its type.
 *
 * @param {object} client
 * @param {object} request
 * @returns {boolean|undefined} true when the HTTP/1 request was dispatched,
 *   false when it was rejected or aborted, undefined for HTTP/2 connections.
 */
function write (client, request) {
  if (client[kHTTPConnVersion] === 'h2') {
    writeH2(client, client[kHTTP2Session], request)
    return
  }

  const { body, method, path, host, upgrade, headers, blocking, reset } = request

  // https://tools.ietf.org/html/rfc7231#section-4.3.1
  // https://tools.ietf.org/html/rfc7231#section-4.3.2
  // https://tools.ietf.org/html/rfc7231#section-4.3.5

  // Sending a payload body on a request that does not
  // expect it can cause undefined behavior on some
  // servers and corrupt connection state. Do not
  // re-use the connection for further requests.
  const expectsPayload = method === 'PUT' || method === 'POST' || method === 'PATCH'

  if (body && typeof body.read === 'function') {
    // Try to read EOF in order to get length.
    body.read(0)
  }

  const bodyLength = util.bodyLength(body)

  // Fall back to the declared length when the body length is unknown.
  let contentLength = bodyLength === null ? request.contentLength : bodyLength

  if (contentLength === 0 && !expectsPayload) {
    // https://tools.ietf.org/html/rfc7230#section-3.3.2
    // A user agent SHOULD NOT send a Content-Length header field when
    // the request message does not contain a payload body and the method
    // semantics do not anticipate such a body.
    contentLength = null
  }

  // https://github.com/nodejs/undici/issues/2046
  // A user agent may send a Content-Length header with 0 value, this should be allowed.
  const lengthMismatch = shouldSendContentLength(method) &&
    contentLength > 0 &&
    request.contentLength !== null &&
    request.contentLength !== contentLength

  if (lengthMismatch) {
    if (client[kStrictContentLength]) {
      errorRequest(client, request, new RequestContentLengthMismatchError())
      return false
    }

    process.emitWarning(new RequestContentLengthMismatchError())
  }

  const socket = client[kSocket]

  try {
    // The callback handed to onConnect is the request's abort hook.
    request.onConnect((err) => {
      if (request.aborted || request.completed) {
        return
      }

      errorRequest(client, request, err || new RequestAbortedError())

      util.destroy(socket, new InformationalError('aborted'))
    })
  } catch (err) {
    errorRequest(client, request, err)
  }

  if (request.aborted) {
    return false
  }

  // HEAD: https://github.com/mcollina/undici/issues/258
  // Close after a HEAD request to interop with misbehaving servers
  // that may send a body in the response.
  // CONNECT / upgrade: block pipeline from dispatching further
  // requests on this connection.
  if (method === 'HEAD' || upgrade || method === 'CONNECT') {
    socket[kReset] = true
  }

  // An explicit per-request `reset` overrides the defaults above.
  if (reset != null) {
    socket[kReset] = reset
  }

  if (client[kMaxRequests] && socket[kCounter]++ >= client[kMaxRequests]) {
    socket[kReset] = true
  }

  if (blocking) {
    socket[kBlocking] = true
  }

  const hostLine = typeof host === 'string' ? `host: ${host}\r\n` : client[kHostHeader]

  let connectionLine
  if (upgrade) {
    connectionLine = `connection: upgrade\r\nupgrade: ${upgrade}\r\n`
  } else if (client[kPipelining] && !socket[kReset]) {
    connectionLine = 'connection: keep-alive\r\n'
  } else {
    connectionLine = 'connection: close\r\n'
  }

  let header = `${method} ${path} HTTP/1.1\r\n` + hostLine + connectionLine

  if (headers) {
    header += headers
  }

  if (channels.sendHeaders.hasSubscribers) {
    channels.sendHeaders.publish({ request, headers: header, socket })
  }

  if (!body || bodyLength === 0) {
    if (contentLength === 0) {
      socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1')
    } else {
      assert(contentLength === null, 'no body must not have content length')
      socket.write(`${header}\r\n`, 'latin1')
    }
    request.onRequestSent()
    return true
  }

  if (util.isBuffer(body)) {
    assert(contentLength === body.byteLength, 'buffer body must have content length')

    // Cork so the head and the body are flushed together.
    socket.cork()
    socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
    socket.write(body)
    socket.uncork()
    request.onBodySent(body)
    request.onRequestSent()
    if (!expectsPayload) {
      socket[kReset] = true
    }
    return true
  }

  /* istanbul ignore else: assertion */
  if (util.isBlobLike(body)) {
    if (typeof body.stream === 'function') {
      writeIterable({ body: body.stream(), client, request, socket, contentLength, header, expectsPayload })
    } else {
      writeBlob({ body, client, request, socket, contentLength, header, expectsPayload })
    }
  } else if (util.isStream(body)) {
    writeStream({ body, client, request, socket, contentLength, header, expectsPayload })
  } else if (util.isIterable(body)) {
    writeIterable({ body, client, request, socket, contentLength, header, expectsPayload })
  } else {
    assert(false)
  }

  return true
}
1644
/**
 * Dispatches `request` as a new stream on the HTTP/2 `session`.
 *
 * Builds the pseudo-headers, opens the stream (optionally waiting for a
 * '100-continue' before writing the body), and forwards the stream's
 * 'response', 'data' and 'end' events to the request's handlers.
 *
 * @param {object} client
 * @param {import('http2').ClientHttp2Session} session
 * @param {object} request
 * @returns {boolean} true once a stream has been requested; false when the
 *   request asked for an upgrade, was aborted, or failed the strict
 *   content-length check.
 */
function writeH2 (client, session, request) {
  const { body, method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request

  let headers
  if (typeof reqHeaders === 'string') headers = Request[kHTTP2CopyHeaders](reqHeaders.trim())
  else headers = reqHeaders

  if (upgrade) {
    errorRequest(client, request, new Error('Upgrade not supported for H2'))
    return false
  }

  try {
    // TODO(HTTP/2): Should we call onConnect immediately or on stream ready event?
    request.onConnect((err) => {
      if (request.aborted || request.completed) {
        return
      }

      errorRequest(client, request, err || new RequestAbortedError())
    })
  } catch (err) {
    errorRequest(client, request, err)
  }

  if (request.aborted) {
    return false
  }

  /** @type {import('node:http2').ClientHttp2Stream} */
  let stream
  const h2State = client[kHTTP2SessionState]

  headers[HTTP2_HEADER_AUTHORITY] = host || client[kHost]
  headers[HTTP2_HEADER_METHOD] = method

  if (method === 'CONNECT') {
    session.ref()
    // we are already connected, streams are pending, first request
    // will create a new stream. We trigger a request to create the stream and wait until
    // `ready` event is triggered
    // We disabled endStream to allow the user to write to the stream
    stream = session.request(headers, { endStream: false, signal })

    if (stream.id && !stream.pending) {
      request.onUpgrade(null, null, stream)
      ++h2State.openStreams
    } else {
      stream.once('ready', () => {
        request.onUpgrade(null, null, stream)
        ++h2State.openStreams
      })
    }

    stream.once('close', () => {
      h2State.openStreams -= 1
      // TODO(HTTP/2): unref only if current streams count is 0
      if (h2State.openStreams === 0) session.unref()
    })

    return true
  }

  // https://tools.ietf.org/html/rfc7540#section-8.3
  // :path and :scheme headers must be omitted when sending CONNECT

  headers[HTTP2_HEADER_PATH] = path
  headers[HTTP2_HEADER_SCHEME] = 'https'

  // https://tools.ietf.org/html/rfc7231#section-4.3.1
  // https://tools.ietf.org/html/rfc7231#section-4.3.2
  // https://tools.ietf.org/html/rfc7231#section-4.3.5

  // Sending a payload body on a request that does not
  // expect it can cause undefined behavior on some
  // servers and corrupt connection state. Do not
  // re-use the connection for further requests.

  const expectsPayload = (
    method === 'PUT' ||
    method === 'POST' ||
    method === 'PATCH'
  )

  if (body && typeof body.read === 'function') {
    // Try to read EOF in order to get length.
    body.read(0)
  }

  let contentLength = util.bodyLength(body)

  if (contentLength == null) {
    contentLength = request.contentLength
  }

  if (contentLength === 0 || !expectsPayload) {
    // https://tools.ietf.org/html/rfc7230#section-3.3.2
    // A user agent SHOULD NOT send a Content-Length header field when
    // the request message does not contain a payload body and the method
    // semantics do not anticipate such a body.

    contentLength = null
  }

  // https://github.com/nodejs/undici/issues/2046
  // A user agent may send a Content-Length header with 0 value, this should be allowed.
  if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength != null && request.contentLength !== contentLength) {
    if (client[kStrictContentLength]) {
      errorRequest(client, request, new RequestContentLengthMismatchError())
      return false
    }

    process.emitWarning(new RequestContentLengthMismatchError())
  }

  if (contentLength != null) {
    assert(body, 'no body must not have content length')
    headers[HTTP2_HEADER_CONTENT_LENGTH] = `${contentLength}`
  }

  session.ref()

  const shouldEndStream = method === 'GET' || method === 'HEAD'
  if (expectContinue) {
    headers[HTTP2_HEADER_EXPECT] = '100-continue'
    stream = session.request(headers, { endStream: shouldEndStream, signal })

    // Hold the body back until the server tells us to continue.
    stream.once('continue', writeBodyH2)
  } else {
    stream = session.request(headers, {
      endStream: shouldEndStream,
      signal
    })
    writeBodyH2()
  }

  // Increment counter as we have new several streams open
  ++h2State.openStreams

  stream.once('response', headers => {
    const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers

    if (request.onHeaders(Number(statusCode), realHeaders, stream.resume.bind(stream), '') === false) {
      stream.pause()
    }
  })

  stream.once('end', () => {
    request.onComplete([])
  })

  stream.on('data', (chunk) => {
    if (request.onData(chunk) === false) {
      stream.pause()
    }
  })

  stream.once('close', () => {
    h2State.openStreams -= 1
    // TODO(HTTP/2): unref only if current streams count is 0
    if (h2State.openStreams === 0) {
      session.unref()
    }
  })

  stream.once('error', destroyStreamIfOpen)

  stream.once('frameError', (type, code) => {
    const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)
    errorRequest(client, request, err)

    destroyStreamIfOpen(err)
  })

  // stream.on('aborted', () => {
  //   // TODO(HTTP/2): Support aborted
  // })

  // stream.on('timeout', () => {
  //   // TODO(HTTP/2): Support timeout
  // })

  // stream.on('push', headers => {
  //   // TODO(HTTP/2): Support push
  // })

  // stream.on('trailers', headers => {
  //   // TODO(HTTP/2): Support trailers
  // })

  return true

  // Destroys the request stream with `err` unless the session or the stream
  // is already closed/destroyed. The stream is referenced explicitly instead
  // of through `this`, so the check also works from arrow-function listeners
  // (where `this` would be undefined).
  function destroyStreamIfOpen (err) {
    const currentSession = client[kHTTP2Session]
    if (currentSession && !currentSession.destroyed && !stream.closed && !stream.destroyed) {
      // NOTE(review): every other counter update in this function uses
      // `openStreams`; `streams` is kept as-is here - confirm which is intended.
      h2State.streams -= 1
      util.destroy(stream, err)
    }
  }

  // Writes the request body to the stream according to the body's type.
  function writeBodyH2 () {
    /* istanbul ignore else: assertion */
    if (!body) {
      request.onRequestSent()
    } else if (util.isBuffer(body)) {
      assert(contentLength === body.byteLength, 'buffer body must have content length')
      stream.cork()
      stream.write(body)
      stream.uncork()
      stream.end()
      request.onBodySent(body)
      request.onRequestSent()
    } else if (util.isBlobLike(body)) {
      if (typeof body.stream === 'function') {
        writeIterable({
          client,
          request,
          contentLength,
          h2stream: stream,
          expectsPayload,
          body: body.stream(),
          socket: client[kSocket],
          header: ''
        })
      } else {
        writeBlob({
          body,
          client,
          request,
          contentLength,
          expectsPayload,
          h2stream: stream,
          header: '',
          socket: client[kSocket]
        })
      }
    } else if (util.isStream(body)) {
      writeStream({
        body,
        client,
        request,
        contentLength,
        expectsPayload,
        socket: client[kSocket],
        h2stream: stream,
        header: ''
      })
    } else if (util.isIterable(body)) {
      writeIterable({
        body,
        client,
        request,
        contentLength,
        expectsPayload,
        header: '',
        h2stream: stream,
        socket: client[kSocket]
      })
    } else {
      assert(false)
    }
  }
}
1908
/**
 * Sends a stream request body. On HTTP/2 the body is piped straight into the
 * request stream; on HTTP/1 chunks are fed through an `AsyncWriter`, pausing
 * the body when the writer reports back-pressure and resuming on socket 'drain'.
 *
 * @param {object} opts
 * @param {object} [opts.h2stream] HTTP/2 request stream (HTTP/2 only).
 * @param {object} opts.body Readable body stream.
 * @param {object} opts.client
 * @param {object} opts.request
 * @param {object} opts.socket
 * @param {number|null} opts.contentLength
 * @param {string} opts.header Serialized HTTP/1 head.
 * @param {boolean} opts.expectsPayload
 */
function writeStream ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
  assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined')

  if (client[kHTTPConnVersion] === 'h2') {
    const reportChunk = (chunk) => {
      request.onBodySent(chunk)
    }

    // For HTTP/2, is enough to pipe the stream
    const pipe = pipeline(body, h2stream, (err) => {
      if (!err) {
        request.onRequestSent()
        return
      }

      util.destroy(body, err)
      util.destroy(h2stream, err)
    })

    pipe.on('data', reportChunk)
    pipe.once('end', () => {
      pipe.removeListener('data', reportChunk)
      util.destroy(pipe)
    })

    return
  }

  // Set once the body has ended, errored or been aborted; every listener
  // below becomes a no-op afterwards.
  let done = false

  const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header })

  body
    .on('data', onData)
    .on('end', onFinished)
    .on('error', onFinished)
    .on('close', onAbort)

  if (body.resume) {
    body.resume()
  }

  socket
    .on('drain', onDrain)
    .on('error', onFinished)

  // `this` is the body stream the chunk came from.
  function onData (chunk) {
    if (done) {
      return
    }

    try {
      const keepGoing = writer.write(chunk)
      if (!keepGoing && this.pause) {
        this.pause()
      }
    } catch (err) {
      util.destroy(this, err)
    }
  }

  function onDrain () {
    if (!done && body.resume) {
      body.resume()
    }
  }

  // 'close' before 'end'/'error' means the body was aborted.
  function onAbort () {
    if (done) {
      return
    }

    const abortError = new RequestAbortedError()
    queueMicrotask(() => onFinished(abortError))
  }

  function onFinished (err) {
    if (done) {
      return
    }

    done = true

    assert(socket.destroyed || (socket[kWriting] && client[kRunning] <= 1))

    socket
      .off('drain', onDrain)
      .off('error', onFinished)

    body
      .removeListener('data', onData)
      .removeListener('end', onFinished)
      .removeListener('error', onFinished)
      .removeListener('close', onAbort)

    let failure = err
    if (!failure) {
      try {
        writer.end()
      } catch (endError) {
        failure = endError
      }
    }

    writer.destroy(failure)

    // An informational 'reset' is not propagated to the body as an error.
    const propagate = failure && (failure.code !== 'UND_ERR_INFO' || failure.message !== 'reset')
    if (propagate) {
      util.destroy(body, failure)
    } else {
      util.destroy(body)
    }
  }
}
2023
/**
 * Sends a Blob-like request body by reading it fully into a Buffer and
 * writing it in one go, either to the HTTP/2 request stream or (preceded by
 * the serialized head and a content-length line) to the HTTP/1 socket.
 *
 * Any failure after the initial assertion destroys the HTTP/2 stream or the
 * socket with the error instead of rejecting the returned promise.
 *
 * @param {object} opts
 * @param {object} [opts.h2stream] HTTP/2 request stream (HTTP/2 only).
 * @param {object} opts.body Blob-like object exposing `size` and `arrayBuffer()`.
 * @param {object} opts.client
 * @param {object} opts.request
 * @param {object} opts.socket
 * @param {number|null} opts.contentLength Must equal `body.size`.
 * @param {string} opts.header Serialized HTTP/1 head.
 * @param {boolean} opts.expectsPayload
 * @returns {Promise<void>}
 */
async function writeBlob ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
  assert(contentLength === body.size, 'blob body must have content length')

  const isH2 = client[kHTTPConnVersion] === 'h2'
  try {
    if (contentLength != null && contentLength !== body.size) {
      throw new RequestContentLengthMismatchError()
    }

    const buffer = Buffer.from(await body.arrayBuffer())

    if (isH2) {
      h2stream.cork()
      h2stream.write(buffer)
      h2stream.uncork()
      // The whole body has been written: half-close the stream so the peer
      // knows the request is complete.
      h2stream.end()
    } else {
      socket.cork()
      socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
      socket.write(buffer)
      socket.uncork()
    }

    request.onBodySent(buffer)
    request.onRequestSent()

    if (!expectsPayload) {
      socket[kReset] = true
    }

    resume(client)
  } catch (err) {
    util.destroy(isH2 ? h2stream : socket, err)
  }
}
2058
/**
 * Sends an (async) iterable request body chunk by chunk, waiting for a
 * 'drain' (or 'close') event whenever a write reports back-pressure.
 *
 * On HTTP/2 chunks go directly to the request stream; on HTTP/1 they are fed
 * through an `AsyncWriter`. Errors raised by the iterable or recorded on the
 * socket destroy the stream / writer instead of rejecting the returned promise.
 *
 * @param {object} opts
 * @param {object} [opts.h2stream] HTTP/2 request stream (HTTP/2 only).
 * @param {object} opts.body Iterable or async iterable of chunks.
 * @param {object} opts.client
 * @param {object} opts.request
 * @param {object} opts.socket
 * @param {number|null} opts.contentLength
 * @param {string} opts.header Serialized HTTP/1 head.
 * @param {boolean} opts.expectsPayload
 * @returns {Promise<void>}
 */
async function writeIterable ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) {
  assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined')

  // Resolver of the promise currently awaited by the write loop, if any.
  let callback = null
  function onDrain () {
    if (callback) {
      const cb = callback
      callback = null
      cb()
    }
  }

  const waitForDrain = () => new Promise((resolve, reject) => {
    assert(callback === null)

    if (socket[kError]) {
      reject(socket[kError])
    } else {
      callback = resolve
    }
  })

  if (client[kHTTPConnVersion] === 'h2') {
    h2stream
      .on('close', onDrain)
      .on('drain', onDrain)

    try {
      // It's up to the user to somehow abort the async iterable.
      for await (const chunk of body) {
        if (socket[kError]) {
          throw socket[kError]
        }

        const res = h2stream.write(chunk)
        request.onBodySent(chunk)
        if (!res) {
          await waitForDrain()
        }
      }

      // Only a body that was written in full counts as a sent request; a
      // failed body must not be reported as sent nor end the stream cleanly.
      request.onRequestSent()
      h2stream.end()
    } catch (err) {
      h2stream.destroy(err)
    } finally {
      h2stream
        .off('close', onDrain)
        .off('drain', onDrain)
    }

    return
  }

  socket
    .on('close', onDrain)
    .on('drain', onDrain)

  const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header })
  try {
    // It's up to the user to somehow abort the async iterable.
    for await (const chunk of body) {
      if (socket[kError]) {
        throw socket[kError]
      }

      if (!writer.write(chunk)) {
        await waitForDrain()
      }
    }

    writer.end()
  } catch (err) {
    writer.destroy(err)
  } finally {
    socket
      .off('close', onDrain)
      .off('drain', onDrain)
  }
}
2138
/**
 * Incrementally serialises a request body onto a socket.
 *
 * The request head is emitted lazily together with the first non-empty chunk
 * (or from `end()` when nothing was written). With a `null` content length the
 * body is framed as chunked transfer-encoding: every chunk is prefixed with
 * `\r\n<hex size>\r\n` and `end()` writes the terminating `\r\n0\r\n\r\n`.
 * While an instance is active the socket is flagged with `kWriting`.
 */
class AsyncWriter {
  /**
   * @param {object} opts
   * @param {object} opts.socket destination socket
   * @param {object} opts.request receives `onBodySent` / `onRequestSent`
   * @param {number|null} opts.contentLength declared length, `null` for chunked
   * @param {object} opts.client owning client
   * @param {boolean} opts.expectsPayload whether the method defines a payload
   * @param {string} opts.header serialised request head
   */
  constructor ({ socket, request, contentLength, client, expectsPayload, header }) {
    this.socket = socket
    this.request = request
    this.contentLength = contentLength
    this.client = client
    this.bytesWritten = 0
    this.expectsPayload = expectsPayload
    this.header = header

    socket[kWriting] = true
  }

  /**
   * Writes one body chunk, emitting the request head first if needed.
   *
   * @param {string|Buffer|Uint8Array} chunk
   * @returns {boolean} result of the socket write; `true` for empty chunks and
   *   `false` when the socket is already destroyed
   * @throws the socket's stored error, or RequestContentLengthMismatchError
   *   when the chunk overflows the declared length in strict mode
   */
  write (chunk) {
    const sock = this.socket
    const declared = this.contentLength
    const sentSoFar = this.bytesWritten
    const chunked = declared === null

    if (sock[kError]) {
      throw sock[kError]
    }

    if (sock.destroyed) {
      return false
    }

    const size = Buffer.byteLength(chunk)
    if (!size) {
      return true
    }

    // We should defer writing chunks.
    if (!chunked && sentSoFar + size > declared) {
      if (this.client[kStrictContentLength]) {
        throw new RequestContentLengthMismatchError()
      }

      process.emitWarning(new RequestContentLengthMismatchError())
    }

    sock.cork()

    if (sentSoFar === 0) {
      if (!this.expectsPayload) {
        sock[kReset] = true
      }

      // For chunked bodies the head is left open; the first chunk prefix
      // below supplies the blank line that closes it.
      sock.write(
        chunked
          ? `${this.header}transfer-encoding: chunked\r\n`
          : `${this.header}content-length: ${declared}\r\n\r\n`,
        'latin1'
      )
    }

    if (chunked) {
      sock.write(`\r\n${size.toString(16)}\r\n`, 'latin1')
    }

    this.bytesWritten += size

    const flushed = sock.write(chunk)

    sock.uncork()

    this.request.onBodySent(chunk)

    if (!flushed) {
      // Backpressure: keep a pending headers timeout from firing mid-upload.
      const parser = sock[kParser]
      if (parser.timeout && parser.timeoutType === TIMEOUT_HEADERS) {
        // istanbul ignore else: only for jest
        if (parser.timeout.refresh) {
          parser.timeout.refresh()
        }
      }
    }

    return flushed
  }

  /**
   * Finishes the body: notifies the request, clears `kWriting`, writes the
   * head (if no chunk was written) or the chunked terminator, validates the
   * declared length and resumes the client.
   *
   * @throws the socket's stored error, or RequestContentLengthMismatchError
   *   when the byte count differs from the declared length in strict mode
   */
  end () {
    const sock = this.socket
    const declared = this.contentLength
    const sent = this.bytesWritten

    this.request.onRequestSent()

    sock[kWriting] = false

    if (sock[kError]) {
      throw sock[kError]
    }

    if (sock.destroyed) {
      return
    }

    if (sent === 0) {
      // https://tools.ietf.org/html/rfc7230#section-3.3.2
      // A user agent SHOULD send a Content-Length in a request message when
      // no Transfer-Encoding is sent and the request method defines a meaning
      // for an enclosed payload body.
      sock.write(
        this.expectsPayload
          ? `${this.header}content-length: 0\r\n\r\n`
          : `${this.header}\r\n`,
        'latin1'
      )
    } else if (declared === null) {
      sock.write('\r\n0\r\n\r\n', 'latin1')
    }

    if (declared !== null && sent !== declared) {
      if (this.client[kStrictContentLength]) {
        throw new RequestContentLengthMismatchError()
      }

      process.emitWarning(new RequestContentLengthMismatchError())
    }

    const parser = sock[kParser]
    if (parser.timeout && parser.timeoutType === TIMEOUT_HEADERS) {
      // istanbul ignore else: only for jest
      if (parser.timeout.refresh) {
        parser.timeout.refresh()
      }
    }

    resume(this.client)
  }

  /**
   * Stops writing. When an error is supplied the socket is destroyed with it.
   *
   * @param {Error} [err]
   */
  destroy (err) {
    const sock = this.socket
    const owner = this.client

    sock[kWriting] = false

    if (!err) {
      return
    }

    assert(owner[kRunning] <= 1, 'pipeline should only contain this request')
    util.destroy(sock, err)
  }
}
2273
/**
 * Fails a request with `err` by invoking its `onError` handler.
 *
 * The request must report itself as aborted afterwards. If the handler
 * throws, or that expectation does not hold, the resulting error is emitted
 * as an 'error' event on the client instead of propagating to the caller.
 *
 * @param {object} client emitter that receives secondary failures
 * @param {object} request request being failed
 * @param {Error} err reason passed to `request.onError`
 */
function errorRequest (client, request, err) {
  try {
    request.onError(err)
    assert(request.aborted)
  } catch (handlerFailure) {
    // Named distinctly so the original `err` is not shadowed.
    client.emit('error', handlerFailure)
  }
}
2282
2283module.exports = Client
2284