// @ts-check

'use strict'

/* global WebAssembly */

const assert = require('assert')
const net = require('net')
const http = require('http')
const { pipeline } = require('stream')
const util = require('./core/util')
const timers = require('./timers')
const Request = require('./core/request')
const DispatcherBase = require('./dispatcher-base')
const {
  RequestContentLengthMismatchError,
  ResponseContentLengthMismatchError,
  InvalidArgumentError,
  RequestAbortedError,
  HeadersTimeoutError,
  HeadersOverflowError,
  SocketError,
  InformationalError,
  BodyTimeoutError,
  HTTPParserError,
  ResponseExceededMaxSizeError,
  ClientDestroyedError
} = require('./core/errors')
const buildConnector = require('./core/connect')
const {
  kUrl,
  kReset,
  kServerName,
  kClient,
  kBusy,
  kParser,
  kConnect,
  kBlocking,
  kResuming,
  kRunning,
  kPending,
  kSize,
  kWriting,
  kQueue,
  kConnected,
  kConnecting,
  kNeedDrain,
  kNoRef,
  kKeepAliveDefaultTimeout,
  kHostHeader,
  kPendingIdx,
  kRunningIdx,
  kError,
  kPipelining,
  kSocket,
  kKeepAliveTimeoutValue,
  kMaxHeadersSize,
  kKeepAliveMaxTimeout,
  kKeepAliveTimeoutThreshold,
  kHeadersTimeout,
  kBodyTimeout,
  kStrictContentLength,
  kConnector,
  kMaxRedirections,
  kMaxRequests,
  kCounter,
  kClose,
  kDestroy,
  kDispatch,
  kInterceptors,
  kLocalAddress,
  kMaxResponseSize,
  kHTTPConnVersion,
  // HTTP2
  kHost,
  kHTTP2Session,
  kHTTP2SessionState,
  kHTTP2BuildRequest,
  kHTTP2CopyHeaders,
  kHTTP1BuildRequest
} = require('./core/symbols')

// `http2` may be unavailable in the running Node.js build; fall back to an
// object with empty `constants` so the destructuring below still works.
/** @type {import('http2')} */
let http2
try {
  http2 = require('http2')
} catch {
  // @ts-ignore
  http2 = { constants: {} }
}

const {
  constants: {
    HTTP2_HEADER_AUTHORITY,
    HTTP2_HEADER_METHOD,
    HTTP2_HEADER_PATH,
    HTTP2_HEADER_SCHEME,
    HTTP2_HEADER_CONTENT_LENGTH,
    HTTP2_HEADER_EXPECT,
    HTTP2_HEADER_STATUS
  }
} = http2

// Experimental
let h2ExperimentalWarned = false

const FastBuffer = Buffer[Symbol.species]

const kClosedResolve = Symbol('kClosedResolve')

// Diagnostics channels. When `diagnostics_channel` cannot be loaded, each
// channel is replaced by a stub that reports no subscribers.
const channels = {}

try {
  const diagnosticsChannel = require('diagnostics_channel')
  channels.sendHeaders = diagnosticsChannel.channel('undici:client:sendHeaders')
  channels.beforeConnect = diagnosticsChannel.channel('undici:client:beforeConnect')
  channels.connectError = diagnosticsChannel.channel('undici:client:connectError')
  channels.connected = diagnosticsChannel.channel('undici:client:connected')
} catch {
  channels.sendHeaders = { hasSubscribers: false }
  channels.beforeConnect = { hasSubscribers: false }
  channels.connectError = { hasSubscribers: false }
  channels.connected = { hasSubscribers: false }
}

/**
 * Dispatcher bound to a single origin. Requests are kept in one queue
 * (`kQueue`) that is split into completed / running / pending sections by the
 * `kRunningIdx` and `kPendingIdx` indices.
 *
 * @type {import('../types/client').default}
 */
class Client extends DispatcherBase {
  /**
   * Validates the options, builds the connector (unless `connect` is already
   * a function) and initialises the client's internal state.
   *
   * @param {string|URL} url
   * @param {import('../types/client').Client.Options} options
   * @throws {InvalidArgumentError} when an option is unsupported or invalid.
   */
  constructor (url, {
    interceptors,
    maxHeaderSize,
    headersTimeout,
    socketTimeout,
    requestTimeout,
    connectTimeout,
    bodyTimeout,
    idleTimeout,
    keepAlive,
    keepAliveTimeout,
    maxKeepAliveTimeout,
    keepAliveMaxTimeout,
    keepAliveTimeoutThreshold,
    socketPath,
    pipelining,
    tls,
    strictContentLength,
    maxCachedSessions,
    maxRedirections,
    connect,
    maxRequestsPerClient,
    localAddress,
    maxResponseSize,
    autoSelectFamily,
    autoSelectFamilyAttemptTimeout,
    // h2
    allowH2,
    maxConcurrentStreams
  } = {}) {
    super()

    // Options that are no longer supported: fail loudly and name the
    // replacement.
    if (keepAlive !== undefined) {
      throw new InvalidArgumentError('unsupported keepAlive, use pipelining=0 instead')
    }

    if (socketTimeout !== undefined) {
      throw new InvalidArgumentError('unsupported socketTimeout, use headersTimeout & bodyTimeout instead')
    }

    if (requestTimeout !== undefined) {
      throw new InvalidArgumentError('unsupported requestTimeout, use headersTimeout & bodyTimeout instead')
    }

    if (idleTimeout !== undefined) {
      throw new InvalidArgumentError('unsupported idleTimeout, use keepAliveTimeout instead')
    }

    if (maxKeepAliveTimeout !== undefined) {
      throw new InvalidArgumentError('unsupported maxKeepAliveTimeout, use keepAliveMaxTimeout instead')
    }

    if (maxHeaderSize != null && !Number.isFinite(maxHeaderSize)) {
      throw new InvalidArgumentError('invalid maxHeaderSize')
    }

    if (socketPath != null && typeof socketPath !== 'string') {
      throw new InvalidArgumentError('invalid socketPath')
    }

    if (connectTimeout != null && (!Number.isFinite(connectTimeout) || connectTimeout < 0)) {
      throw new InvalidArgumentError('invalid connectTimeout')
    }

    if (keepAliveTimeout != null && (!Number.isFinite(keepAliveTimeout) || keepAliveTimeout <= 0)) {
      throw new InvalidArgumentError('invalid keepAliveTimeout')
    }

    if (keepAliveMaxTimeout != null && (!Number.isFinite(keepAliveMaxTimeout) || keepAliveMaxTimeout <= 0)) {
      throw new InvalidArgumentError('invalid keepAliveMaxTimeout')
    }

    if (keepAliveTimeoutThreshold != null && !Number.isFinite(keepAliveTimeoutThreshold)) {
      throw new InvalidArgumentError('invalid keepAliveTimeoutThreshold')
    }

    if (headersTimeout != null && (!Number.isInteger(headersTimeout) || headersTimeout < 0)) {
      throw new InvalidArgumentError('headersTimeout must be a positive integer or zero')
    }

    if (bodyTimeout != null && (!Number.isInteger(bodyTimeout) || bodyTimeout < 0)) {
      throw new InvalidArgumentError('bodyTimeout must be a positive integer or zero')
    }

    if (connect != null && typeof connect !== 'function' && typeof connect !== 'object') {
      throw new InvalidArgumentError('connect must be a function or an object')
    }

    if (maxRedirections != null && (!Number.isInteger(maxRedirections) || maxRedirections < 0)) {
      throw new InvalidArgumentError('maxRedirections must be a positive number')
    }

    if (maxRequestsPerClient != null && (!Number.isInteger(maxRequestsPerClient) || maxRequestsPerClient < 0)) {
      throw new InvalidArgumentError('maxRequestsPerClient must be a positive number')
    }

    if (localAddress != null && (typeof localAddress !== 'string' || net.isIP(localAddress) === 0)) {
      throw new InvalidArgumentError('localAddress must be valid string IP address')
    }

    // -1 is accepted here; it is the value stored below for "no limit".
    if (maxResponseSize != null && (!Number.isInteger(maxResponseSize) || maxResponseSize < -1)) {
      throw new InvalidArgumentError('maxResponseSize must be a positive number')
    }

    if (
      autoSelectFamilyAttemptTimeout != null &&
      (!Number.isInteger(autoSelectFamilyAttemptTimeout) || autoSelectFamilyAttemptTimeout < -1)
    ) {
      throw new InvalidArgumentError('autoSelectFamilyAttemptTimeout must be a positive number')
    }

    // h2
    if (allowH2 != null && typeof allowH2 !== 'boolean') {
      throw new InvalidArgumentError('allowH2 must be a valid boolean value')
    }

    if (maxConcurrentStreams != null && (typeof maxConcurrentStreams !== 'number' || maxConcurrentStreams < 1)) {
      throw new InvalidArgumentError('maxConcurrentStreams must be a possitive integer, greater than 0')
    }

    // A `connect` object is treated as extra connector options; spread last,
    // it overrides the values derived from the other client options.
    if (typeof connect !== 'function') {
      connect = buildConnector({
        ...tls,
        maxCachedSessions,
        allowH2,
        socketPath,
        timeout: connectTimeout,
        ...(util.nodeHasAutoSelectFamily && autoSelectFamily ? { autoSelectFamily, autoSelectFamilyAttemptTimeout } : undefined),
        ...connect
      })
    }

    // Use the caller's Client interceptors when given as an array, otherwise
    // install the redirect interceptor.
    this[kInterceptors] = interceptors && interceptors.Client && Array.isArray(interceptors.Client)
      ? interceptors.Client
      : [createRedirectInterceptor({ maxRedirections })]
    this[kUrl] = util.parseOrigin(url)
    this[kConnector] = connect
    this[kSocket] = null
    this[kPipelining] = pipelining != null ? pipelining : 1
    this[kMaxHeadersSize] = maxHeaderSize || http.maxHeaderSize
    this[kKeepAliveDefaultTimeout] = keepAliveTimeout == null ? 4e3 : keepAliveTimeout
    this[kKeepAliveMaxTimeout] = keepAliveMaxTimeout == null ? 600e3 : keepAliveMaxTimeout
    this[kKeepAliveTimeoutThreshold] = keepAliveTimeoutThreshold == null ? 1e3 : keepAliveTimeoutThreshold
    this[kKeepAliveTimeoutValue] = this[kKeepAliveDefaultTimeout]
    this[kServerName] = null
    this[kLocalAddress] = localAddress != null ? localAddress : null
    this[kResuming] = 0 // 0, idle, 1, scheduled, 2 resuming
    // 0 is the idle state; kDispatch sets 2 when the client is busy and then
    // reports back-pressure. NOTE(review): other states are managed outside
    // this excerpt - confirm their meaning there.
    this[kNeedDrain] = 0
    this[kHostHeader] = `host: ${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}\r\n`
    this[kBodyTimeout] = bodyTimeout != null ? bodyTimeout : 300e3
    this[kHeadersTimeout] = headersTimeout != null ? headersTimeout : 300e3
    this[kStrictContentLength] = strictContentLength == null ? true : strictContentLength
    this[kMaxRedirections] = maxRedirections
    this[kMaxRequests] = maxRequestsPerClient
    this[kClosedResolve] = null
    // `undefined > -1` is false, so an omitted option also maps to -1.
    this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1
    this[kHTTPConnVersion] = 'h1'

    // HTTP/2
    this[kHTTP2Session] = null
    this[kHTTP2SessionState] = !allowH2
      ? null
      : {
        // streams: null, // Fixed queue of streams - For future support of `push`
          openStreams: 0, // Keep track of them to decide whether or not to unref the session
          maxConcurrentStreams: maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server
        }
    this[kHost] = `${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}`

    // kQueue is built up of 3 sections separated by
    // the kRunningIdx and kPendingIdx indices.
    // |   complete   |   running   |   pending   |
    //                ^ kRunningIdx ^ kPendingIdx ^ kQueue.length
    // kRunningIdx points to the first running element.
    // kPendingIdx points to the first pending element.
    // This implements a fast queue with an amortized
    // time of O(1).

    this[kQueue] = []
    this[kRunningIdx] = 0
    this[kPendingIdx] = 0
  }

  /** Current pipelining factor. */
  get pipelining () {
    return this[kPipelining]
  }

  /** Updates the pipelining factor and immediately re-evaluates the queue. */
  set pipelining (value) {
    this[kPipelining] = value
    resume(this, true)
  }

  /** Number of queued requests that have not been started yet. */
  get [kPending] () {
    return this[kQueue].length - this[kPendingIdx]
  }

  /** Number of requests between kRunningIdx and kPendingIdx. */
  get [kRunning] () {
    return this[kPendingIdx] - this[kRunningIdx]
  }

  /** Number of requests that are running or pending. */
  get [kSize] () {
    return this[kQueue].length - this[kRunningIdx]
  }

  /** True when a socket exists, is not destroyed and no connect is in flight. */
  get [kConnected] () {
    return !!this[kSocket] && !this[kConnecting] && !this[kSocket].destroyed
  }

  /**
   * Truthy when the socket is flagged as reset / writing / blocking, when the
   * number of running + pending requests has reached the pipelining factor
   * (a factor of 0 counts as 1), or when requests are still pending.
   */
  get [kBusy] () {
    const socket = this[kSocket]
    return (
      (socket && (socket[kReset] || socket[kWriting] || socket[kBlocking])) ||
      (this[kSize] >= (this[kPipelining] || 1)) ||
      this[kPending] > 0
    )
  }

  /* istanbul ignore: only used for test */
  [kConnect] (cb) {
    connect(this)
    this.once('connect', cb)
  }

  /**
   * Builds a request for the current connection version ('h2' or HTTP/1),
   * queues it and triggers processing of the queue.
   *
   * @param {object} opts dispatch options; `opts.origin` defaults to the client origin
   * @param {object} handler dispatch handler passed through to the request builder
   * @returns {boolean} `false` when the caller should wait before dispatching more
   */
  [kDispatch] (opts, handler) {
    const origin = opts.origin || this[kUrl].origin

    const request = this[kHTTPConnVersion] === 'h2'
      ? Request[kHTTP2BuildRequest](origin, opts, handler)
      : Request[kHTTP1BuildRequest](origin, opts, handler)

    this[kQueue].push(request)
    if (this[kResuming]) {
      // Do nothing.
    } else if (util.bodyLength(request.body) == null && util.isIterable(request.body)) {
      // Wait a tick in case stream/iterator is ended in the same tick.
      this[kResuming] = 1
      process.nextTick(resume, this)
    } else {
      resume(this, true)
    }

    if (this[kResuming] && this[kNeedDrain] !== 2 && this[kBusy]) {
      this[kNeedDrain] = 2
    }

    return this[kNeedDrain] < 2
  }

  /**
   * Resolves with `null` right away when nothing is running or pending;
   * otherwise stores the resolver in `kClosedResolve` so it can be called
   * later (kDestroy does so once the socket has closed).
   *
   * @returns {Promise<null>}
   */
  async [kClose] () {
    // TODO: for H2 we need to gracefully flush the remaining enqueued
    // request and close each stream.
    return new Promise((resolve) => {
      if (!this[kSize]) {
        resolve(null)
      } else {
        this[kClosedResolve] = resolve
      }
    })
  }

  /**
   * Fails every pending request with `err`, tears down the HTTP/2 session and
   * the socket, and resolves once the socket has emitted 'close' (or on the
   * next microtask when there is no socket).
   *
   * @param {Error} err error handed to pending requests and to the destroyed resources
   * @returns {Promise<void>}
   */
  async [kDestroy] (err) {
    return new Promise((resolve) => {
      // Only the pending section is removed here.
      const requests = this[kQueue].splice(this[kPendingIdx])
      for (let i = 0; i < requests.length; i++) {
        const request = requests[i]
        errorRequest(this, request, err)
      }

      const callback = () => {
        if (this[kClosedResolve]) {
          // TODO (fix): Should we error here with ClientDestroyedError?
          this[kClosedResolve]()
          this[kClosedResolve] = null
        }
        resolve()
      }

      if (this[kHTTP2Session] != null) {
        util.destroy(this[kHTTP2Session], err)
        this[kHTTP2Session] = null
        this[kHTTP2SessionState] = null
      }

      if (!this[kSocket]) {
        queueMicrotask(callback)
      } else {
        util.destroy(this[kSocket].on('close', callback), err)
      }

      resume(this)
    })
  }
}

/**
 * Error listener; `this` is the emitter it is attached to (presumably the
 * HTTP/2 session - the registration is outside this excerpt). Records the
 * error on the associated socket and forwards it to onError.
 *
 * @param {Error} err
 */
function onHttp2SessionError (err) {
  assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

  this[kSocket][kError] = err

  onError(this[kClient], err)
}

/**
 * 'frameError' listener. Only errors reported with `id === 0` are recorded on
 * the socket and forwarded to onError; others are ignored.
 *
 * @param {number} type
 * @param {number} code
 * @param {number} id
 */
function onHttp2FrameError (type, code, id) {
  const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)

  if (id === 0) {
    this[kSocket][kError] = err
    onError(this[kClient], err)
  }
}

/** End listener: destroys both `this` and its socket with a SocketError. */
function onHttp2SessionEnd () {
  util.destroy(this, new SocketError('other side closed'))
  util.destroy(this[kSocket], new SocketError('other side closed'))
}

/**
 * GOAWAY listener. Detaches the socket and session from the client, fails
 * either the whole queue (client destroyed) or the request at the head of the
 * running section, then emits 'disconnect' and resumes the client.
 *
 * `this` is the emitter carrying `[kClient]`, not the client itself, so all
 * queue bookkeeping must go through `client`.
 *
 * @param {number} code code received with the GOAWAY frame
 */
function onHTTP2GoAway (code) {
  const client = this[kClient]
  const err = new InformationalError(`HTTP/2: "GOAWAY" frame received with code ${code}`)
  client[kSocket] = null
  client[kHTTP2Session] = null

  if (client.destroyed) {
    // kPending is a Client getter; reading it from `this` yields undefined
    // and would make this assertion fail unconditionally.
    assert(client[kPending] === 0)

    // Fail entire queue.
    const requests = client[kQueue].splice(client[kRunningIdx])
    for (let i = 0; i < requests.length; i++) {
      const request = requests[i]
      errorRequest(client, request, err)
    }
  } else if (client[kRunning] > 0) {
    // Fail head of pipeline.
    const request = client[kQueue][client[kRunningIdx]]
    client[kQueue][client[kRunningIdx]++] = null

    errorRequest(client, request, err)
  }

  client[kPendingIdx] = client[kRunningIdx]

  assert(client[kRunning] === 0)

  client.emit('disconnect',
    client[kUrl],
    [client],
    err
  )

  resume(client)
}

const constants = require('./llhttp/constants')
const createRedirectInterceptor = require('./interceptor/redirectInterceptor')
const EMPTY_BUF = Buffer.alloc(0)

/**
 * Compiles and instantiates the llhttp WebAssembly module. The SIMD build is
 * tried first; if compiling it throws, the non-SIMD build is used instead.
 * The `env` callbacks forward parser events to `currentParser`.
 *
 * @returns {Promise<WebAssembly.Instance>}
 */
async function lazyllhttp () {
  const llhttpWasmData = process.env.JEST_WORKER_ID ? require('./llhttp/llhttp-wasm.js') : undefined

  let mod
  try {
    mod = await WebAssembly.compile(Buffer.from(require('./llhttp/llhttp_simd-wasm.js'), 'base64'))
  } catch (e) {
    /* istanbul ignore next */

    // We could check if the error was caused by the simd option not
    // being enabled, but the occurring of this other error
    // * https://github.com/emscripten-core/emscripten/issues/11495
    // got me to remove that check to avoid breaking Node 12.
    mod = await WebAssembly.compile(Buffer.from(llhttpWasmData || require('./llhttp/llhttp-wasm.js'), 'base64'))
  }

  // Maps a span reported by the wasm parser (`at` is an address inside the
  // wasm copy that starts at currentBufferPtr) onto the same bytes of the
  // Buffer currently being parsed, without copying.
  const viewOf = (at, len) => {
    const start = at - currentBufferPtr + currentBufferRef.byteOffset
    return new FastBuffer(currentBufferRef.buffer, start, len)
  }

  return await WebAssembly.instantiate(mod, {
    env: {
      /* eslint-disable camelcase */

      wasm_on_url: (p, at, len) => {
        /* istanbul ignore next */
        return 0
      },
      wasm_on_status: (p, at, len) => {
        assert.strictEqual(currentParser.ptr, p)
        return currentParser.onStatus(viewOf(at, len)) || 0
      },
      wasm_on_message_begin: (p) => {
        assert.strictEqual(currentParser.ptr, p)
        return currentParser.onMessageBegin() || 0
      },
      wasm_on_header_field: (p, at, len) => {
        assert.strictEqual(currentParser.ptr, p)
        return currentParser.onHeaderField(viewOf(at, len)) || 0
      },
      wasm_on_header_value: (p, at, len) => {
        assert.strictEqual(currentParser.ptr, p)
        return currentParser.onHeaderValue(viewOf(at, len)) || 0
      },
      wasm_on_headers_complete: (p, statusCode, upgrade, shouldKeepAlive) => {
        assert.strictEqual(currentParser.ptr, p)
        return currentParser.onHeadersComplete(statusCode, Boolean(upgrade), Boolean(shouldKeepAlive)) || 0
      },
      wasm_on_body: (p, at, len) => {
        assert.strictEqual(currentParser.ptr, p)
        return currentParser.onBody(viewOf(at, len)) || 0
      },
      wasm_on_message_complete: (p) => {
        assert.strictEqual(currentParser.ptr, p)
        return currentParser.onMessageComplete() || 0
      }

      /* eslint-enable camelcase */
    }
  })
}

let llhttpInstance = null
let llhttpPromise = lazyllhttp()
// Compilation starts eagerly at module load. Attach a real (no-op) handler so
// a failure is not reported as an unhandled rejection: `.catch()` without an
// argument registers nothing. Anyone awaiting `llhttpPromise` itself still
// observes the rejection.
llhttpPromise.catch(() => {})
557let currentParser = null 558let currentBufferRef = null 559let currentBufferSize = 0 560let currentBufferPtr = null 561 562const TIMEOUT_HEADERS = 1 563const TIMEOUT_BODY = 2 564const TIMEOUT_IDLE = 3 565 566class Parser { 567 constructor (client, socket, { exports }) { 568 assert(Number.isFinite(client[kMaxHeadersSize]) && client[kMaxHeadersSize] > 0) 569 570 this.llhttp = exports 571 this.ptr = this.llhttp.llhttp_alloc(constants.TYPE.RESPONSE) 572 this.client = client 573 this.socket = socket 574 this.timeout = null 575 this.timeoutValue = null 576 this.timeoutType = null 577 this.statusCode = null 578 this.statusText = '' 579 this.upgrade = false 580 this.headers = [] 581 this.headersSize = 0 582 this.headersMaxSize = client[kMaxHeadersSize] 583 this.shouldKeepAlive = false 584 this.paused = false 585 this.resume = this.resume.bind(this) 586 587 this.bytesRead = 0 588 589 this.keepAlive = '' 590 this.contentLength = '' 591 this.connection = '' 592 this.maxResponseSize = client[kMaxResponseSize] 593 } 594 595 setTimeout (value, type) { 596 this.timeoutType = type 597 if (value !== this.timeoutValue) { 598 timers.clearTimeout(this.timeout) 599 if (value) { 600 this.timeout = timers.setTimeout(onParserTimeout, value, this) 601 // istanbul ignore else: only for jest 602 if (this.timeout.unref) { 603 this.timeout.unref() 604 } 605 } else { 606 this.timeout = null 607 } 608 this.timeoutValue = value 609 } else if (this.timeout) { 610 // istanbul ignore else: only for jest 611 if (this.timeout.refresh) { 612 this.timeout.refresh() 613 } 614 } 615 } 616 617 resume () { 618 if (this.socket.destroyed || !this.paused) { 619 return 620 } 621 622 assert(this.ptr != null) 623 assert(currentParser == null) 624 625 this.llhttp.llhttp_resume(this.ptr) 626 627 assert(this.timeoutType === TIMEOUT_BODY) 628 if (this.timeout) { 629 // istanbul ignore else: only for jest 630 if (this.timeout.refresh) { 631 this.timeout.refresh() 632 } 633 } 634 635 this.paused = false 636 
this.execute(this.socket.read() || EMPTY_BUF) // Flush parser. 637 this.readMore() 638 } 639 640 readMore () { 641 while (!this.paused && this.ptr) { 642 const chunk = this.socket.read() 643 if (chunk === null) { 644 break 645 } 646 this.execute(chunk) 647 } 648 } 649 650 execute (data) { 651 assert(this.ptr != null) 652 assert(currentParser == null) 653 assert(!this.paused) 654 655 const { socket, llhttp } = this 656 657 if (data.length > currentBufferSize) { 658 if (currentBufferPtr) { 659 llhttp.free(currentBufferPtr) 660 } 661 currentBufferSize = Math.ceil(data.length / 4096) * 4096 662 currentBufferPtr = llhttp.malloc(currentBufferSize) 663 } 664 665 new Uint8Array(llhttp.memory.buffer, currentBufferPtr, currentBufferSize).set(data) 666 667 // Call `execute` on the wasm parser. 668 // We pass the `llhttp_parser` pointer address, the pointer address of buffer view data, 669 // and finally the length of bytes to parse. 670 // The return value is an error code or `constants.ERROR.OK`. 
671 try { 672 let ret 673 674 try { 675 currentBufferRef = data 676 currentParser = this 677 ret = llhttp.llhttp_execute(this.ptr, currentBufferPtr, data.length) 678 /* eslint-disable-next-line no-useless-catch */ 679 } catch (err) { 680 /* istanbul ignore next: difficult to make a test case for */ 681 throw err 682 } finally { 683 currentParser = null 684 currentBufferRef = null 685 } 686 687 const offset = llhttp.llhttp_get_error_pos(this.ptr) - currentBufferPtr 688 689 if (ret === constants.ERROR.PAUSED_UPGRADE) { 690 this.onUpgrade(data.slice(offset)) 691 } else if (ret === constants.ERROR.PAUSED) { 692 this.paused = true 693 socket.unshift(data.slice(offset)) 694 } else if (ret !== constants.ERROR.OK) { 695 const ptr = llhttp.llhttp_get_error_reason(this.ptr) 696 let message = '' 697 /* istanbul ignore else: difficult to make a test case for */ 698 if (ptr) { 699 const len = new Uint8Array(llhttp.memory.buffer, ptr).indexOf(0) 700 message = 701 'Response does not match the HTTP/1.1 protocol (' + 702 Buffer.from(llhttp.memory.buffer, ptr, len).toString() + 703 ')' 704 } 705 throw new HTTPParserError(message, constants.ERROR[ret], data.slice(offset)) 706 } 707 } catch (err) { 708 util.destroy(socket, err) 709 } 710 } 711 712 destroy () { 713 assert(this.ptr != null) 714 assert(currentParser == null) 715 716 this.llhttp.llhttp_free(this.ptr) 717 this.ptr = null 718 719 timers.clearTimeout(this.timeout) 720 this.timeout = null 721 this.timeoutValue = null 722 this.timeoutType = null 723 724 this.paused = false 725 } 726 727 onStatus (buf) { 728 this.statusText = buf.toString() 729 } 730 731 onMessageBegin () { 732 const { socket, client } = this 733 734 /* istanbul ignore next: difficult to make a test case for */ 735 if (socket.destroyed) { 736 return -1 737 } 738 739 const request = client[kQueue][client[kRunningIdx]] 740 if (!request) { 741 return -1 742 } 743 } 744 745 onHeaderField (buf) { 746 const len = this.headers.length 747 748 if ((len & 1) === 0) { 749 
this.headers.push(buf) 750 } else { 751 this.headers[len - 1] = Buffer.concat([this.headers[len - 1], buf]) 752 } 753 754 this.trackHeader(buf.length) 755 } 756 757 onHeaderValue (buf) { 758 let len = this.headers.length 759 760 if ((len & 1) === 1) { 761 this.headers.push(buf) 762 len += 1 763 } else { 764 this.headers[len - 1] = Buffer.concat([this.headers[len - 1], buf]) 765 } 766 767 const key = this.headers[len - 2] 768 if (key.length === 10 && key.toString().toLowerCase() === 'keep-alive') { 769 this.keepAlive += buf.toString() 770 } else if (key.length === 10 && key.toString().toLowerCase() === 'connection') { 771 this.connection += buf.toString() 772 } else if (key.length === 14 && key.toString().toLowerCase() === 'content-length') { 773 this.contentLength += buf.toString() 774 } 775 776 this.trackHeader(buf.length) 777 } 778 779 trackHeader (len) { 780 this.headersSize += len 781 if (this.headersSize >= this.headersMaxSize) { 782 util.destroy(this.socket, new HeadersOverflowError()) 783 } 784 } 785 786 onUpgrade (head) { 787 const { upgrade, client, socket, headers, statusCode } = this 788 789 assert(upgrade) 790 791 const request = client[kQueue][client[kRunningIdx]] 792 assert(request) 793 794 assert(!socket.destroyed) 795 assert(socket === client[kSocket]) 796 assert(!this.paused) 797 assert(request.upgrade || request.method === 'CONNECT') 798 799 this.statusCode = null 800 this.statusText = '' 801 this.shouldKeepAlive = null 802 803 assert(this.headers.length % 2 === 0) 804 this.headers = [] 805 this.headersSize = 0 806 807 socket.unshift(head) 808 809 socket[kParser].destroy() 810 socket[kParser] = null 811 812 socket[kClient] = null 813 socket[kError] = null 814 socket 815 .removeListener('error', onSocketError) 816 .removeListener('readable', onSocketReadable) 817 .removeListener('end', onSocketEnd) 818 .removeListener('close', onSocketClose) 819 820 client[kSocket] = null 821 client[kQueue][client[kRunningIdx]++] = null 822 
client.emit('disconnect', client[kUrl], [client], new InformationalError('upgrade')) 823 824 try { 825 request.onUpgrade(statusCode, headers, socket) 826 } catch (err) { 827 util.destroy(socket, err) 828 } 829 830 resume(client) 831 } 832 833 onHeadersComplete (statusCode, upgrade, shouldKeepAlive) { 834 const { client, socket, headers, statusText } = this 835 836 /* istanbul ignore next: difficult to make a test case for */ 837 if (socket.destroyed) { 838 return -1 839 } 840 841 const request = client[kQueue][client[kRunningIdx]] 842 843 /* istanbul ignore next: difficult to make a test case for */ 844 if (!request) { 845 return -1 846 } 847 848 assert(!this.upgrade) 849 assert(this.statusCode < 200) 850 851 if (statusCode === 100) { 852 util.destroy(socket, new SocketError('bad response', util.getSocketInfo(socket))) 853 return -1 854 } 855 856 /* this can only happen if server is misbehaving */ 857 if (upgrade && !request.upgrade) { 858 util.destroy(socket, new SocketError('bad upgrade', util.getSocketInfo(socket))) 859 return -1 860 } 861 862 assert.strictEqual(this.timeoutType, TIMEOUT_HEADERS) 863 864 this.statusCode = statusCode 865 this.shouldKeepAlive = ( 866 shouldKeepAlive || 867 // Override llhttp value which does not allow keepAlive for HEAD. 868 (request.method === 'HEAD' && !socket[kReset] && this.connection.toLowerCase() === 'keep-alive') 869 ) 870 871 if (this.statusCode >= 200) { 872 const bodyTimeout = request.bodyTimeout != null 873 ? 
request.bodyTimeout 874 : client[kBodyTimeout] 875 this.setTimeout(bodyTimeout, TIMEOUT_BODY) 876 } else if (this.timeout) { 877 // istanbul ignore else: only for jest 878 if (this.timeout.refresh) { 879 this.timeout.refresh() 880 } 881 } 882 883 if (request.method === 'CONNECT') { 884 assert(client[kRunning] === 1) 885 this.upgrade = true 886 return 2 887 } 888 889 if (upgrade) { 890 assert(client[kRunning] === 1) 891 this.upgrade = true 892 return 2 893 } 894 895 assert(this.headers.length % 2 === 0) 896 this.headers = [] 897 this.headersSize = 0 898 899 if (this.shouldKeepAlive && client[kPipelining]) { 900 const keepAliveTimeout = this.keepAlive ? util.parseKeepAliveTimeout(this.keepAlive) : null 901 902 if (keepAliveTimeout != null) { 903 const timeout = Math.min( 904 keepAliveTimeout - client[kKeepAliveTimeoutThreshold], 905 client[kKeepAliveMaxTimeout] 906 ) 907 if (timeout <= 0) { 908 socket[kReset] = true 909 } else { 910 client[kKeepAliveTimeoutValue] = timeout 911 } 912 } else { 913 client[kKeepAliveTimeoutValue] = client[kKeepAliveDefaultTimeout] 914 } 915 } else { 916 // Stop more requests from being dispatched. 917 socket[kReset] = true 918 } 919 920 const pause = request.onHeaders(statusCode, headers, this.resume, statusText) === false 921 922 if (request.aborted) { 923 return -1 924 } 925 926 if (request.method === 'HEAD') { 927 return 1 928 } 929 930 if (statusCode < 200) { 931 return 1 932 } 933 934 if (socket[kBlocking]) { 935 socket[kBlocking] = false 936 resume(client) 937 } 938 939 return pause ? 
constants.ERROR.PAUSED : 0 940 } 941 942 onBody (buf) { 943 const { client, socket, statusCode, maxResponseSize } = this 944 945 if (socket.destroyed) { 946 return -1 947 } 948 949 const request = client[kQueue][client[kRunningIdx]] 950 assert(request) 951 952 assert.strictEqual(this.timeoutType, TIMEOUT_BODY) 953 if (this.timeout) { 954 // istanbul ignore else: only for jest 955 if (this.timeout.refresh) { 956 this.timeout.refresh() 957 } 958 } 959 960 assert(statusCode >= 200) 961 962 if (maxResponseSize > -1 && this.bytesRead + buf.length > maxResponseSize) { 963 util.destroy(socket, new ResponseExceededMaxSizeError()) 964 return -1 965 } 966 967 this.bytesRead += buf.length 968 969 if (request.onData(buf) === false) { 970 return constants.ERROR.PAUSED 971 } 972 } 973 974 onMessageComplete () { 975 const { client, socket, statusCode, upgrade, headers, contentLength, bytesRead, shouldKeepAlive } = this 976 977 if (socket.destroyed && (!statusCode || shouldKeepAlive)) { 978 return -1 979 } 980 981 if (upgrade) { 982 return 983 } 984 985 const request = client[kQueue][client[kRunningIdx]] 986 assert(request) 987 988 assert(statusCode >= 100) 989 990 this.statusCode = null 991 this.statusText = '' 992 this.bytesRead = 0 993 this.contentLength = '' 994 this.keepAlive = '' 995 this.connection = '' 996 997 assert(this.headers.length % 2 === 0) 998 this.headers = [] 999 this.headersSize = 0 1000 1001 if (statusCode < 200) { 1002 return 1003 } 1004 1005 /* istanbul ignore next: should be handled by llhttp? */ 1006 if (request.method !== 'HEAD' && contentLength && bytesRead !== parseInt(contentLength, 10)) { 1007 util.destroy(socket, new ResponseContentLengthMismatchError()) 1008 return -1 1009 } 1010 1011 request.onComplete(headers) 1012 1013 client[kQueue][client[kRunningIdx]++] = null 1014 1015 if (socket[kWriting]) { 1016 assert.strictEqual(client[kRunning], 0) 1017 // Response completed before request. 
1018 util.destroy(socket, new InformationalError('reset')) 1019 return constants.ERROR.PAUSED 1020 } else if (!shouldKeepAlive) { 1021 util.destroy(socket, new InformationalError('reset')) 1022 return constants.ERROR.PAUSED 1023 } else if (socket[kReset] && client[kRunning] === 0) { 1024 // Destroy socket once all requests have completed. 1025 // The request at the tail of the pipeline is the one 1026 // that requested reset and no further requests should 1027 // have been queued since then. 1028 util.destroy(socket, new InformationalError('reset')) 1029 return constants.ERROR.PAUSED 1030 } else if (client[kPipelining] === 1) { 1031 // We must wait a full event loop cycle to reuse this socket to make sure 1032 // that non-spec compliant servers are not closing the connection even if they 1033 // said they won't. 1034 setImmediate(resume, client) 1035 } else { 1036 resume(client) 1037 } 1038 } 1039} 1040 1041function onParserTimeout (parser) { 1042 const { socket, timeoutType, client } = parser 1043 1044 /* istanbul ignore else */ 1045 if (timeoutType === TIMEOUT_HEADERS) { 1046 if (!socket[kWriting] || socket.writableNeedDrain || client[kRunning] > 1) { 1047 assert(!parser.paused, 'cannot be paused while waiting for headers') 1048 util.destroy(socket, new HeadersTimeoutError()) 1049 } 1050 } else if (timeoutType === TIMEOUT_BODY) { 1051 if (!parser.paused) { 1052 util.destroy(socket, new BodyTimeoutError()) 1053 } 1054 } else if (timeoutType === TIMEOUT_IDLE) { 1055 assert(client[kRunning] === 0 && client[kKeepAliveTimeoutValue]) 1056 util.destroy(socket, new InformationalError('socket idle timeout')) 1057 } 1058} 1059 1060function onSocketReadable () { 1061 const { [kParser]: parser } = this 1062 if (parser) { 1063 parser.readMore() 1064 } 1065} 1066 1067function onSocketError (err) { 1068 const { [kClient]: client, [kParser]: parser } = this 1069 1070 assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') 1071 1072 if (client[kHTTPConnVersion] !== 'h2') { 1073 // On 
Mac OS, we get an ECONNRESET even if there is a full body to be forwarded 1074 // to the user. 1075 if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) { 1076 // We treat all incoming data so for as a valid response. 1077 parser.onMessageComplete() 1078 return 1079 } 1080 } 1081 1082 this[kError] = err 1083 1084 onError(this[kClient], err) 1085} 1086 1087function onError (client, err) { 1088 if ( 1089 client[kRunning] === 0 && 1090 err.code !== 'UND_ERR_INFO' && 1091 err.code !== 'UND_ERR_SOCKET' 1092 ) { 1093 // Error is not caused by running request and not a recoverable 1094 // socket error. 1095 1096 assert(client[kPendingIdx] === client[kRunningIdx]) 1097 1098 const requests = client[kQueue].splice(client[kRunningIdx]) 1099 for (let i = 0; i < requests.length; i++) { 1100 const request = requests[i] 1101 errorRequest(client, request, err) 1102 } 1103 assert(client[kSize] === 0) 1104 } 1105} 1106 1107function onSocketEnd () { 1108 const { [kParser]: parser, [kClient]: client } = this 1109 1110 if (client[kHTTPConnVersion] !== 'h2') { 1111 if (parser.statusCode && !parser.shouldKeepAlive) { 1112 // We treat all incoming data so far as a valid response. 1113 parser.onMessageComplete() 1114 return 1115 } 1116 } 1117 1118 util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this))) 1119} 1120 1121function onSocketClose () { 1122 const { [kClient]: client, [kParser]: parser } = this 1123 1124 if (client[kHTTPConnVersion] === 'h1' && parser) { 1125 if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) { 1126 // We treat all incoming data so far as a valid response. 1127 parser.onMessageComplete() 1128 } 1129 1130 this[kParser].destroy() 1131 this[kParser] = null 1132 } 1133 1134 const err = this[kError] || new SocketError('closed', util.getSocketInfo(this)) 1135 1136 client[kSocket] = null 1137 1138 if (client.destroyed) { 1139 assert(client[kPending] === 0) 1140 1141 // Fail entire queue. 
1142 const requests = client[kQueue].splice(client[kRunningIdx]) 1143 for (let i = 0; i < requests.length; i++) { 1144 const request = requests[i] 1145 errorRequest(client, request, err) 1146 } 1147 } else if (client[kRunning] > 0 && err.code !== 'UND_ERR_INFO') { 1148 // Fail head of pipeline. 1149 const request = client[kQueue][client[kRunningIdx]] 1150 client[kQueue][client[kRunningIdx]++] = null 1151 1152 errorRequest(client, request, err) 1153 } 1154 1155 client[kPendingIdx] = client[kRunningIdx] 1156 1157 assert(client[kRunning] === 0) 1158 1159 client.emit('disconnect', client[kUrl], [client], err) 1160 1161 resume(client) 1162} 1163 1164async function connect (client) { 1165 assert(!client[kConnecting]) 1166 assert(!client[kSocket]) 1167 1168 let { host, hostname, protocol, port } = client[kUrl] 1169 1170 // Resolve ipv6 1171 if (hostname[0] === '[') { 1172 const idx = hostname.indexOf(']') 1173 1174 assert(idx !== -1) 1175 const ip = hostname.substring(1, idx) 1176 1177 assert(net.isIP(ip)) 1178 hostname = ip 1179 } 1180 1181 client[kConnecting] = true 1182 1183 if (channels.beforeConnect.hasSubscribers) { 1184 channels.beforeConnect.publish({ 1185 connectParams: { 1186 host, 1187 hostname, 1188 protocol, 1189 port, 1190 servername: client[kServerName], 1191 localAddress: client[kLocalAddress] 1192 }, 1193 connector: client[kConnector] 1194 }) 1195 } 1196 1197 try { 1198 const socket = await new Promise((resolve, reject) => { 1199 client[kConnector]({ 1200 host, 1201 hostname, 1202 protocol, 1203 port, 1204 servername: client[kServerName], 1205 localAddress: client[kLocalAddress] 1206 }, (err, socket) => { 1207 if (err) { 1208 reject(err) 1209 } else { 1210 resolve(socket) 1211 } 1212 }) 1213 }) 1214 1215 if (client.destroyed) { 1216 util.destroy(socket.on('error', () => {}), new ClientDestroyedError()) 1217 return 1218 } 1219 1220 client[kConnecting] = false 1221 1222 assert(socket) 1223 1224 const isH2 = socket.alpnProtocol === 'h2' 1225 if (isH2) { 1226 
if (!h2ExperimentalWarned) { 1227 h2ExperimentalWarned = true 1228 process.emitWarning('H2 support is experimental, expect them to change at any time.', { 1229 code: 'UNDICI-H2' 1230 }) 1231 } 1232 1233 const session = http2.connect(client[kUrl], { 1234 createConnection: () => socket, 1235 peerMaxConcurrentStreams: client[kHTTP2SessionState].maxConcurrentStreams 1236 }) 1237 1238 client[kHTTPConnVersion] = 'h2' 1239 session[kClient] = client 1240 session[kSocket] = socket 1241 session.on('error', onHttp2SessionError) 1242 session.on('frameError', onHttp2FrameError) 1243 session.on('end', onHttp2SessionEnd) 1244 session.on('goaway', onHTTP2GoAway) 1245 session.on('close', onSocketClose) 1246 session.unref() 1247 1248 client[kHTTP2Session] = session 1249 socket[kHTTP2Session] = session 1250 } else { 1251 if (!llhttpInstance) { 1252 llhttpInstance = await llhttpPromise 1253 llhttpPromise = null 1254 } 1255 1256 socket[kNoRef] = false 1257 socket[kWriting] = false 1258 socket[kReset] = false 1259 socket[kBlocking] = false 1260 socket[kParser] = new Parser(client, socket, llhttpInstance) 1261 } 1262 1263 socket[kCounter] = 0 1264 socket[kMaxRequests] = client[kMaxRequests] 1265 socket[kClient] = client 1266 socket[kError] = null 1267 1268 socket 1269 .on('error', onSocketError) 1270 .on('readable', onSocketReadable) 1271 .on('end', onSocketEnd) 1272 .on('close', onSocketClose) 1273 1274 client[kSocket] = socket 1275 1276 if (channels.connected.hasSubscribers) { 1277 channels.connected.publish({ 1278 connectParams: { 1279 host, 1280 hostname, 1281 protocol, 1282 port, 1283 servername: client[kServerName], 1284 localAddress: client[kLocalAddress] 1285 }, 1286 connector: client[kConnector], 1287 socket 1288 }) 1289 } 1290 client.emit('connect', client[kUrl], [client]) 1291 } catch (err) { 1292 if (client.destroyed) { 1293 return 1294 } 1295 1296 client[kConnecting] = false 1297 1298 if (channels.connectError.hasSubscribers) { 1299 channels.connectError.publish({ 1300 
connectParams: { 1301 host, 1302 hostname, 1303 protocol, 1304 port, 1305 servername: client[kServerName], 1306 localAddress: client[kLocalAddress] 1307 }, 1308 connector: client[kConnector], 1309 error: err 1310 }) 1311 } 1312 1313 if (err.code === 'ERR_TLS_CERT_ALTNAME_INVALID') { 1314 assert(client[kRunning] === 0) 1315 while (client[kPending] > 0 && client[kQueue][client[kPendingIdx]].servername === client[kServerName]) { 1316 const request = client[kQueue][client[kPendingIdx]++] 1317 errorRequest(client, request, err) 1318 } 1319 } else { 1320 onError(client, err) 1321 } 1322 1323 client.emit('connectionError', client[kUrl], [client], err) 1324 } 1325 1326 resume(client) 1327} 1328 1329function emitDrain (client) { 1330 client[kNeedDrain] = 0 1331 client.emit('drain', client[kUrl], [client]) 1332} 1333 1334function resume (client, sync) { 1335 if (client[kResuming] === 2) { 1336 return 1337 } 1338 1339 client[kResuming] = 2 1340 1341 _resume(client, sync) 1342 client[kResuming] = 0 1343 1344 if (client[kRunningIdx] > 256) { 1345 client[kQueue].splice(0, client[kRunningIdx]) 1346 client[kPendingIdx] -= client[kRunningIdx] 1347 client[kRunningIdx] = 0 1348 } 1349} 1350 1351function _resume (client, sync) { 1352 while (true) { 1353 if (client.destroyed) { 1354 assert(client[kPending] === 0) 1355 return 1356 } 1357 1358 if (client[kClosedResolve] && !client[kSize]) { 1359 client[kClosedResolve]() 1360 client[kClosedResolve] = null 1361 return 1362 } 1363 1364 const socket = client[kSocket] 1365 1366 if (socket && !socket.destroyed && socket.alpnProtocol !== 'h2') { 1367 if (client[kSize] === 0) { 1368 if (!socket[kNoRef] && socket.unref) { 1369 socket.unref() 1370 socket[kNoRef] = true 1371 } 1372 } else if (socket[kNoRef] && socket.ref) { 1373 socket.ref() 1374 socket[kNoRef] = false 1375 } 1376 1377 if (client[kSize] === 0) { 1378 if (socket[kParser].timeoutType !== TIMEOUT_IDLE) { 1379 socket[kParser].setTimeout(client[kKeepAliveTimeoutValue], TIMEOUT_IDLE) 
1380 } 1381 } else if (client[kRunning] > 0 && socket[kParser].statusCode < 200) { 1382 if (socket[kParser].timeoutType !== TIMEOUT_HEADERS) { 1383 const request = client[kQueue][client[kRunningIdx]] 1384 const headersTimeout = request.headersTimeout != null 1385 ? request.headersTimeout 1386 : client[kHeadersTimeout] 1387 socket[kParser].setTimeout(headersTimeout, TIMEOUT_HEADERS) 1388 } 1389 } 1390 } 1391 1392 if (client[kBusy]) { 1393 client[kNeedDrain] = 2 1394 } else if (client[kNeedDrain] === 2) { 1395 if (sync) { 1396 client[kNeedDrain] = 1 1397 process.nextTick(emitDrain, client) 1398 } else { 1399 emitDrain(client) 1400 } 1401 continue 1402 } 1403 1404 if (client[kPending] === 0) { 1405 return 1406 } 1407 1408 if (client[kRunning] >= (client[kPipelining] || 1)) { 1409 return 1410 } 1411 1412 const request = client[kQueue][client[kPendingIdx]] 1413 1414 if (client[kUrl].protocol === 'https:' && client[kServerName] !== request.servername) { 1415 if (client[kRunning] > 0) { 1416 return 1417 } 1418 1419 client[kServerName] = request.servername 1420 1421 if (socket && socket.servername !== request.servername) { 1422 util.destroy(socket, new InformationalError('servername changed')) 1423 return 1424 } 1425 } 1426 1427 if (client[kConnecting]) { 1428 return 1429 } 1430 1431 if (!socket && !client[kHTTP2Session]) { 1432 connect(client) 1433 return 1434 } 1435 1436 if (socket.destroyed || socket[kWriting] || socket[kReset] || socket[kBlocking]) { 1437 return 1438 } 1439 1440 if (client[kRunning] > 0 && !request.idempotent) { 1441 // Non-idempotent request cannot be retried. 1442 // Ensure that no other requests are inflight and 1443 // could cause failure. 1444 return 1445 } 1446 1447 if (client[kRunning] > 0 && (request.upgrade || request.method === 'CONNECT')) { 1448 // Don't dispatch an upgrade until all preceding requests have completed. 1449 // A misbehaving server might upgrade the connection before all pipelined 1450 // request has completed. 
1451 return 1452 } 1453 1454 if (client[kRunning] > 0 && util.bodyLength(request.body) !== 0 && 1455 (util.isStream(request.body) || util.isAsyncIterable(request.body))) { 1456 // Request with stream or iterator body can error while other requests 1457 // are inflight and indirectly error those as well. 1458 // Ensure this doesn't happen by waiting for inflight 1459 // to complete before dispatching. 1460 1461 // Request with stream or iterator body cannot be retried. 1462 // Ensure that no other requests are inflight and 1463 // could cause failure. 1464 return 1465 } 1466 1467 if (!request.aborted && write(client, request)) { 1468 client[kPendingIdx]++ 1469 } else { 1470 client[kQueue].splice(client[kPendingIdx], 1) 1471 } 1472 } 1473} 1474 1475// https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2 1476function shouldSendContentLength (method) { 1477 return method !== 'GET' && method !== 'HEAD' && method !== 'OPTIONS' && method !== 'TRACE' && method !== 'CONNECT' 1478} 1479 1480function write (client, request) { 1481 if (client[kHTTPConnVersion] === 'h2') { 1482 writeH2(client, client[kHTTP2Session], request) 1483 return 1484 } 1485 1486 const { body, method, path, host, upgrade, headers, blocking, reset } = request 1487 1488 // https://tools.ietf.org/html/rfc7231#section-4.3.1 1489 // https://tools.ietf.org/html/rfc7231#section-4.3.2 1490 // https://tools.ietf.org/html/rfc7231#section-4.3.5 1491 1492 // Sending a payload body on a request that does not 1493 // expect it can cause undefined behavior on some 1494 // servers and corrupt connection state. Do not 1495 // re-use the connection for further requests. 1496 1497 const expectsPayload = ( 1498 method === 'PUT' || 1499 method === 'POST' || 1500 method === 'PATCH' 1501 ) 1502 1503 if (body && typeof body.read === 'function') { 1504 // Try to read EOF in order to get length. 
1505 body.read(0) 1506 } 1507 1508 const bodyLength = util.bodyLength(body) 1509 1510 let contentLength = bodyLength 1511 1512 if (contentLength === null) { 1513 contentLength = request.contentLength 1514 } 1515 1516 if (contentLength === 0 && !expectsPayload) { 1517 // https://tools.ietf.org/html/rfc7230#section-3.3.2 1518 // A user agent SHOULD NOT send a Content-Length header field when 1519 // the request message does not contain a payload body and the method 1520 // semantics do not anticipate such a body. 1521 1522 contentLength = null 1523 } 1524 1525 // https://github.com/nodejs/undici/issues/2046 1526 // A user agent may send a Content-Length header with 0 value, this should be allowed. 1527 if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength !== null && request.contentLength !== contentLength) { 1528 if (client[kStrictContentLength]) { 1529 errorRequest(client, request, new RequestContentLengthMismatchError()) 1530 return false 1531 } 1532 1533 process.emitWarning(new RequestContentLengthMismatchError()) 1534 } 1535 1536 const socket = client[kSocket] 1537 1538 try { 1539 request.onConnect((err) => { 1540 if (request.aborted || request.completed) { 1541 return 1542 } 1543 1544 errorRequest(client, request, err || new RequestAbortedError()) 1545 1546 util.destroy(socket, new InformationalError('aborted')) 1547 }) 1548 } catch (err) { 1549 errorRequest(client, request, err) 1550 } 1551 1552 if (request.aborted) { 1553 return false 1554 } 1555 1556 if (method === 'HEAD') { 1557 // https://github.com/mcollina/undici/issues/258 1558 // Close after a HEAD request to interop with misbehaving servers 1559 // that may send a body in the response. 1560 1561 socket[kReset] = true 1562 } 1563 1564 if (upgrade || method === 'CONNECT') { 1565 // On CONNECT or upgrade, block pipeline from dispatching further 1566 // requests on this connection. 
1567 1568 socket[kReset] = true 1569 } 1570 1571 if (reset != null) { 1572 socket[kReset] = reset 1573 } 1574 1575 if (client[kMaxRequests] && socket[kCounter]++ >= client[kMaxRequests]) { 1576 socket[kReset] = true 1577 } 1578 1579 if (blocking) { 1580 socket[kBlocking] = true 1581 } 1582 1583 let header = `${method} ${path} HTTP/1.1\r\n` 1584 1585 if (typeof host === 'string') { 1586 header += `host: ${host}\r\n` 1587 } else { 1588 header += client[kHostHeader] 1589 } 1590 1591 if (upgrade) { 1592 header += `connection: upgrade\r\nupgrade: ${upgrade}\r\n` 1593 } else if (client[kPipelining] && !socket[kReset]) { 1594 header += 'connection: keep-alive\r\n' 1595 } else { 1596 header += 'connection: close\r\n' 1597 } 1598 1599 if (headers) { 1600 header += headers 1601 } 1602 1603 if (channels.sendHeaders.hasSubscribers) { 1604 channels.sendHeaders.publish({ request, headers: header, socket }) 1605 } 1606 1607 /* istanbul ignore else: assertion */ 1608 if (!body || bodyLength === 0) { 1609 if (contentLength === 0) { 1610 socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1') 1611 } else { 1612 assert(contentLength === null, 'no body must not have content length') 1613 socket.write(`${header}\r\n`, 'latin1') 1614 } 1615 request.onRequestSent() 1616 } else if (util.isBuffer(body)) { 1617 assert(contentLength === body.byteLength, 'buffer body must have content length') 1618 1619 socket.cork() 1620 socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1') 1621 socket.write(body) 1622 socket.uncork() 1623 request.onBodySent(body) 1624 request.onRequestSent() 1625 if (!expectsPayload) { 1626 socket[kReset] = true 1627 } 1628 } else if (util.isBlobLike(body)) { 1629 if (typeof body.stream === 'function') { 1630 writeIterable({ body: body.stream(), client, request, socket, contentLength, header, expectsPayload }) 1631 } else { 1632 writeBlob({ body, client, request, socket, contentLength, header, expectsPayload }) 1633 } 1634 } else if 
(util.isStream(body)) { 1635 writeStream({ body, client, request, socket, contentLength, header, expectsPayload }) 1636 } else if (util.isIterable(body)) { 1637 writeIterable({ body, client, request, socket, contentLength, header, expectsPayload }) 1638 } else { 1639 assert(false) 1640 } 1641 1642 return true 1643} 1644 1645function writeH2 (client, session, request) { 1646 const { body, method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request 1647 1648 let headers 1649 if (typeof reqHeaders === 'string') headers = Request[kHTTP2CopyHeaders](reqHeaders.trim()) 1650 else headers = reqHeaders 1651 1652 if (upgrade) { 1653 errorRequest(client, request, new Error('Upgrade not supported for H2')) 1654 return false 1655 } 1656 1657 try { 1658 // TODO(HTTP/2): Should we call onConnect immediately or on stream ready event? 1659 request.onConnect((err) => { 1660 if (request.aborted || request.completed) { 1661 return 1662 } 1663 1664 errorRequest(client, request, err || new RequestAbortedError()) 1665 }) 1666 } catch (err) { 1667 errorRequest(client, request, err) 1668 } 1669 1670 if (request.aborted) { 1671 return false 1672 } 1673 1674 /** @type {import('node:http2').ClientHttp2Stream} */ 1675 let stream 1676 const h2State = client[kHTTP2SessionState] 1677 1678 headers[HTTP2_HEADER_AUTHORITY] = host || client[kHost] 1679 headers[HTTP2_HEADER_METHOD] = method 1680 1681 if (method === 'CONNECT') { 1682 session.ref() 1683 // we are already connected, streams are pending, first request 1684 // will create a new stream. 
We trigger a request to create the stream and wait until 1685 // `ready` event is triggered 1686 // We disabled endStream to allow the user to write to the stream 1687 stream = session.request(headers, { endStream: false, signal }) 1688 1689 if (stream.id && !stream.pending) { 1690 request.onUpgrade(null, null, stream) 1691 ++h2State.openStreams 1692 } else { 1693 stream.once('ready', () => { 1694 request.onUpgrade(null, null, stream) 1695 ++h2State.openStreams 1696 }) 1697 } 1698 1699 stream.once('close', () => { 1700 h2State.openStreams -= 1 1701 // TODO(HTTP/2): unref only if current streams count is 0 1702 if (h2State.openStreams === 0) session.unref() 1703 }) 1704 1705 return true 1706 } 1707 1708 // https://tools.ietf.org/html/rfc7540#section-8.3 1709 // :path and :scheme headers must be omited when sending CONNECT 1710 1711 headers[HTTP2_HEADER_PATH] = path 1712 headers[HTTP2_HEADER_SCHEME] = 'https' 1713 1714 // https://tools.ietf.org/html/rfc7231#section-4.3.1 1715 // https://tools.ietf.org/html/rfc7231#section-4.3.2 1716 // https://tools.ietf.org/html/rfc7231#section-4.3.5 1717 1718 // Sending a payload body on a request that does not 1719 // expect it can cause undefined behavior on some 1720 // servers and corrupt connection state. Do not 1721 // re-use the connection for further requests. 1722 1723 const expectsPayload = ( 1724 method === 'PUT' || 1725 method === 'POST' || 1726 method === 'PATCH' 1727 ) 1728 1729 if (body && typeof body.read === 'function') { 1730 // Try to read EOF in order to get length. 
1731 body.read(0) 1732 } 1733 1734 let contentLength = util.bodyLength(body) 1735 1736 if (contentLength == null) { 1737 contentLength = request.contentLength 1738 } 1739 1740 if (contentLength === 0 || !expectsPayload) { 1741 // https://tools.ietf.org/html/rfc7230#section-3.3.2 1742 // A user agent SHOULD NOT send a Content-Length header field when 1743 // the request message does not contain a payload body and the method 1744 // semantics do not anticipate such a body. 1745 1746 contentLength = null 1747 } 1748 1749 // https://github.com/nodejs/undici/issues/2046 1750 // A user agent may send a Content-Length header with 0 value, this should be allowed. 1751 if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength != null && request.contentLength !== contentLength) { 1752 if (client[kStrictContentLength]) { 1753 errorRequest(client, request, new RequestContentLengthMismatchError()) 1754 return false 1755 } 1756 1757 process.emitWarning(new RequestContentLengthMismatchError()) 1758 } 1759 1760 if (contentLength != null) { 1761 assert(body, 'no body must not have content length') 1762 headers[HTTP2_HEADER_CONTENT_LENGTH] = `${contentLength}` 1763 } 1764 1765 session.ref() 1766 1767 const shouldEndStream = method === 'GET' || method === 'HEAD' 1768 if (expectContinue) { 1769 headers[HTTP2_HEADER_EXPECT] = '100-continue' 1770 stream = session.request(headers, { endStream: shouldEndStream, signal }) 1771 1772 stream.once('continue', writeBodyH2) 1773 } else { 1774 stream = session.request(headers, { 1775 endStream: shouldEndStream, 1776 signal 1777 }) 1778 writeBodyH2() 1779 } 1780 1781 // Increment counter as we have new several streams open 1782 ++h2State.openStreams 1783 1784 stream.once('response', headers => { 1785 const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers 1786 1787 if (request.onHeaders(Number(statusCode), realHeaders, stream.resume.bind(stream), '') === false) { 1788 stream.pause() 1789 } 1790 }) 1791 1792 
stream.once('end', () => { 1793 request.onComplete([]) 1794 }) 1795 1796 stream.on('data', (chunk) => { 1797 if (request.onData(chunk) === false) { 1798 stream.pause() 1799 } 1800 }) 1801 1802 stream.once('close', () => { 1803 h2State.openStreams -= 1 1804 // TODO(HTTP/2): unref only if current streams count is 0 1805 if (h2State.openStreams === 0) { 1806 session.unref() 1807 } 1808 }) 1809 1810 stream.once('error', function (err) { 1811 if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) { 1812 h2State.streams -= 1 1813 util.destroy(stream, err) 1814 } 1815 }) 1816 1817 stream.once('frameError', (type, code) => { 1818 const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`) 1819 errorRequest(client, request, err) 1820 1821 if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) { 1822 h2State.streams -= 1 1823 util.destroy(stream, err) 1824 } 1825 }) 1826 1827 // stream.on('aborted', () => { 1828 // // TODO(HTTP/2): Support aborted 1829 // }) 1830 1831 // stream.on('timeout', () => { 1832 // // TODO(HTTP/2): Support timeout 1833 // }) 1834 1835 // stream.on('push', headers => { 1836 // // TODO(HTTP/2): Suppor push 1837 // }) 1838 1839 // stream.on('trailers', headers => { 1840 // // TODO(HTTP/2): Support trailers 1841 // }) 1842 1843 return true 1844 1845 function writeBodyH2 () { 1846 /* istanbul ignore else: assertion */ 1847 if (!body) { 1848 request.onRequestSent() 1849 } else if (util.isBuffer(body)) { 1850 assert(contentLength === body.byteLength, 'buffer body must have content length') 1851 stream.cork() 1852 stream.write(body) 1853 stream.uncork() 1854 stream.end() 1855 request.onBodySent(body) 1856 request.onRequestSent() 1857 } else if (util.isBlobLike(body)) { 1858 if (typeof body.stream === 'function') { 1859 writeIterable({ 1860 client, 1861 request, 1862 contentLength, 1863 h2stream: stream, 1864 expectsPayload, 1865 body: 
body.stream(), 1866 socket: client[kSocket], 1867 header: '' 1868 }) 1869 } else { 1870 writeBlob({ 1871 body, 1872 client, 1873 request, 1874 contentLength, 1875 expectsPayload, 1876 h2stream: stream, 1877 header: '', 1878 socket: client[kSocket] 1879 }) 1880 } 1881 } else if (util.isStream(body)) { 1882 writeStream({ 1883 body, 1884 client, 1885 request, 1886 contentLength, 1887 expectsPayload, 1888 socket: client[kSocket], 1889 h2stream: stream, 1890 header: '' 1891 }) 1892 } else if (util.isIterable(body)) { 1893 writeIterable({ 1894 body, 1895 client, 1896 request, 1897 contentLength, 1898 expectsPayload, 1899 header: '', 1900 h2stream: stream, 1901 socket: client[kSocket] 1902 }) 1903 } else { 1904 assert(false) 1905 } 1906 } 1907} 1908 1909function writeStream ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) { 1910 assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined') 1911 1912 if (client[kHTTPConnVersion] === 'h2') { 1913 // For HTTP/2, is enough to pipe the stream 1914 const pipe = pipeline( 1915 body, 1916 h2stream, 1917 (err) => { 1918 if (err) { 1919 util.destroy(body, err) 1920 util.destroy(h2stream, err) 1921 } else { 1922 request.onRequestSent() 1923 } 1924 } 1925 ) 1926 1927 pipe.on('data', onPipeData) 1928 pipe.once('end', () => { 1929 pipe.removeListener('data', onPipeData) 1930 util.destroy(pipe) 1931 }) 1932 1933 function onPipeData (chunk) { 1934 request.onBodySent(chunk) 1935 } 1936 1937 return 1938 } 1939 1940 let finished = false 1941 1942 const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header }) 1943 1944 const onData = function (chunk) { 1945 if (finished) { 1946 return 1947 } 1948 1949 try { 1950 if (!writer.write(chunk) && this.pause) { 1951 this.pause() 1952 } 1953 } catch (err) { 1954 util.destroy(this, err) 1955 } 1956 } 1957 const onDrain = function () { 1958 if (finished) { 1959 return 1960 } 1961 1962 if (body.resume) { 
1963 body.resume() 1964 } 1965 } 1966 const onAbort = function () { 1967 if (finished) { 1968 return 1969 } 1970 const err = new RequestAbortedError() 1971 queueMicrotask(() => onFinished(err)) 1972 } 1973 const onFinished = function (err) { 1974 if (finished) { 1975 return 1976 } 1977 1978 finished = true 1979 1980 assert(socket.destroyed || (socket[kWriting] && client[kRunning] <= 1)) 1981 1982 socket 1983 .off('drain', onDrain) 1984 .off('error', onFinished) 1985 1986 body 1987 .removeListener('data', onData) 1988 .removeListener('end', onFinished) 1989 .removeListener('error', onFinished) 1990 .removeListener('close', onAbort) 1991 1992 if (!err) { 1993 try { 1994 writer.end() 1995 } catch (er) { 1996 err = er 1997 } 1998 } 1999 2000 writer.destroy(err) 2001 2002 if (err && (err.code !== 'UND_ERR_INFO' || err.message !== 'reset')) { 2003 util.destroy(body, err) 2004 } else { 2005 util.destroy(body) 2006 } 2007 } 2008 2009 body 2010 .on('data', onData) 2011 .on('end', onFinished) 2012 .on('error', onFinished) 2013 .on('close', onAbort) 2014 2015 if (body.resume) { 2016 body.resume() 2017 } 2018 2019 socket 2020 .on('drain', onDrain) 2021 .on('error', onFinished) 2022} 2023 2024async function writeBlob ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) { 2025 assert(contentLength === body.size, 'blob body must have content length') 2026 2027 const isH2 = client[kHTTPConnVersion] === 'h2' 2028 try { 2029 if (contentLength != null && contentLength !== body.size) { 2030 throw new RequestContentLengthMismatchError() 2031 } 2032 2033 const buffer = Buffer.from(await body.arrayBuffer()) 2034 2035 if (isH2) { 2036 h2stream.cork() 2037 h2stream.write(buffer) 2038 h2stream.uncork() 2039 } else { 2040 socket.cork() 2041 socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1') 2042 socket.write(buffer) 2043 socket.uncork() 2044 } 2045 2046 request.onBodySent(buffer) 2047 request.onRequestSent() 2048 2049 if 
(!expectsPayload) { 2050 socket[kReset] = true 2051 } 2052 2053 resume(client) 2054 } catch (err) { 2055 util.destroy(isH2 ? h2stream : socket, err) 2056 } 2057} 2058 2059async function writeIterable ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) { 2060 assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined') 2061 2062 let callback = null 2063 function onDrain () { 2064 if (callback) { 2065 const cb = callback 2066 callback = null 2067 cb() 2068 } 2069 } 2070 2071 const waitForDrain = () => new Promise((resolve, reject) => { 2072 assert(callback === null) 2073 2074 if (socket[kError]) { 2075 reject(socket[kError]) 2076 } else { 2077 callback = resolve 2078 } 2079 }) 2080 2081 if (client[kHTTPConnVersion] === 'h2') { 2082 h2stream 2083 .on('close', onDrain) 2084 .on('drain', onDrain) 2085 2086 try { 2087 // It's up to the user to somehow abort the async iterable. 2088 for await (const chunk of body) { 2089 if (socket[kError]) { 2090 throw socket[kError] 2091 } 2092 2093 const res = h2stream.write(chunk) 2094 request.onBodySent(chunk) 2095 if (!res) { 2096 await waitForDrain() 2097 } 2098 } 2099 } catch (err) { 2100 h2stream.destroy(err) 2101 } finally { 2102 request.onRequestSent() 2103 h2stream.end() 2104 h2stream 2105 .off('close', onDrain) 2106 .off('drain', onDrain) 2107 } 2108 2109 return 2110 } 2111 2112 socket 2113 .on('close', onDrain) 2114 .on('drain', onDrain) 2115 2116 const writer = new AsyncWriter({ socket, request, contentLength, client, expectsPayload, header }) 2117 try { 2118 // It's up to the user to somehow abort the async iterable. 
2119 for await (const chunk of body) { 2120 if (socket[kError]) { 2121 throw socket[kError] 2122 } 2123 2124 if (!writer.write(chunk)) { 2125 await waitForDrain() 2126 } 2127 } 2128 2129 writer.end() 2130 } catch (err) { 2131 writer.destroy(err) 2132 } finally { 2133 socket 2134 .off('close', onDrain) 2135 .off('drain', onDrain) 2136 } 2137} 2138 2139class AsyncWriter { 2140 constructor ({ socket, request, contentLength, client, expectsPayload, header }) { 2141 this.socket = socket 2142 this.request = request 2143 this.contentLength = contentLength 2144 this.client = client 2145 this.bytesWritten = 0 2146 this.expectsPayload = expectsPayload 2147 this.header = header 2148 2149 socket[kWriting] = true 2150 } 2151 2152 write (chunk) { 2153 const { socket, request, contentLength, client, bytesWritten, expectsPayload, header } = this 2154 2155 if (socket[kError]) { 2156 throw socket[kError] 2157 } 2158 2159 if (socket.destroyed) { 2160 return false 2161 } 2162 2163 const len = Buffer.byteLength(chunk) 2164 if (!len) { 2165 return true 2166 } 2167 2168 // We should defer writing chunks. 
2169 if (contentLength !== null && bytesWritten + len > contentLength) { 2170 if (client[kStrictContentLength]) { 2171 throw new RequestContentLengthMismatchError() 2172 } 2173 2174 process.emitWarning(new RequestContentLengthMismatchError()) 2175 } 2176 2177 socket.cork() 2178 2179 if (bytesWritten === 0) { 2180 if (!expectsPayload) { 2181 socket[kReset] = true 2182 } 2183 2184 if (contentLength === null) { 2185 socket.write(`${header}transfer-encoding: chunked\r\n`, 'latin1') 2186 } else { 2187 socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1') 2188 } 2189 } 2190 2191 if (contentLength === null) { 2192 socket.write(`\r\n${len.toString(16)}\r\n`, 'latin1') 2193 } 2194 2195 this.bytesWritten += len 2196 2197 const ret = socket.write(chunk) 2198 2199 socket.uncork() 2200 2201 request.onBodySent(chunk) 2202 2203 if (!ret) { 2204 if (socket[kParser].timeout && socket[kParser].timeoutType === TIMEOUT_HEADERS) { 2205 // istanbul ignore else: only for jest 2206 if (socket[kParser].timeout.refresh) { 2207 socket[kParser].timeout.refresh() 2208 } 2209 } 2210 } 2211 2212 return ret 2213 } 2214 2215 end () { 2216 const { socket, contentLength, client, bytesWritten, expectsPayload, header, request } = this 2217 request.onRequestSent() 2218 2219 socket[kWriting] = false 2220 2221 if (socket[kError]) { 2222 throw socket[kError] 2223 } 2224 2225 if (socket.destroyed) { 2226 return 2227 } 2228 2229 if (bytesWritten === 0) { 2230 if (expectsPayload) { 2231 // https://tools.ietf.org/html/rfc7230#section-3.3.2 2232 // A user agent SHOULD send a Content-Length in a request message when 2233 // no Transfer-Encoding is sent and the request method defines a meaning 2234 // for an enclosed payload body. 
2235 2236 socket.write(`${header}content-length: 0\r\n\r\n`, 'latin1') 2237 } else { 2238 socket.write(`${header}\r\n`, 'latin1') 2239 } 2240 } else if (contentLength === null) { 2241 socket.write('\r\n0\r\n\r\n', 'latin1') 2242 } 2243 2244 if (contentLength !== null && bytesWritten !== contentLength) { 2245 if (client[kStrictContentLength]) { 2246 throw new RequestContentLengthMismatchError() 2247 } else { 2248 process.emitWarning(new RequestContentLengthMismatchError()) 2249 } 2250 } 2251 2252 if (socket[kParser].timeout && socket[kParser].timeoutType === TIMEOUT_HEADERS) { 2253 // istanbul ignore else: only for jest 2254 if (socket[kParser].timeout.refresh) { 2255 socket[kParser].timeout.refresh() 2256 } 2257 } 2258 2259 resume(client) 2260 } 2261 2262 destroy (err) { 2263 const { socket, client } = this 2264 2265 socket[kWriting] = false 2266 2267 if (err) { 2268 assert(client[kRunning] <= 1, 'pipeline should only contain this request') 2269 util.destroy(socket, err) 2270 } 2271 } 2272} 2273 2274function errorRequest (client, request, err) { 2275 try { 2276 request.onError(err) 2277 assert(request.aborted) 2278 } catch (err) { 2279 client.emit('error', err) 2280 } 2281} 2282 2283module.exports = Client 2284