1// Copyright Joyent, Inc. and other Node contributors. 2// 3// Permission is hereby granted, free of charge, to any person obtaining a 4// copy of this software and associated documentation files (the 5// "Software"), to deal in the Software without restriction, including 6// without limitation the rights to use, copy, modify, merge, publish, 7// distribute, sublicense, and/or sell copies of the Software, and to permit 8// persons to whom the Software is furnished to do so, subject to the 9// following conditions: 10// 11// The above copyright notice and this permission notice shall be included 12// in all copies or substantial portions of the Software. 13// 14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN 17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, 18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR 19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 20// USE OR OTHER DEALINGS IN THE SOFTWARE. 21 22'use strict'; 23 24const { 25 NumberIsNaN, 26 ObjectKeys, 27 ObjectSetPrototypeOf, 28 ObjectValues, 29 Symbol, 30} = primordials; 31 32const net = require('net'); 33const EventEmitter = require('events'); 34let debug = require('internal/util/debuglog').debuglog('http', (fn) => { 35 debug = fn; 36}); 37const { async_id_symbol } = require('internal/async_hooks').symbols; 38const { 39 codes: { 40 ERR_OUT_OF_RANGE, 41 ERR_INVALID_OPT_VALUE, 42 }, 43} = require('internal/errors'); 44const { validateNumber } = require('internal/validators'); 45 46const kOnKeylog = Symbol('onkeylog'); 47const kRequestOptions = Symbol('requestOptions'); 48// New Agent code. 49 50// The largest departure from the previous implementation is that 51// an Agent instance holds connections for a variable number of host:ports. 52// Surprisingly, this is still API compatible as far as third parties are 53// concerned. The only code that really notices the difference is the 54// request object. 55 56// Another departure is that all code related to HTTP parsing is in 57// ClientRequest.onSocket(). The Agent is now *strictly* 58// concerned with managing a connection pool. 59 60class ReusedHandle { 61 constructor(type, handle) { 62 this.type = type; 63 this.handle = handle; 64 } 65} 66 67function freeSocketErrorListener(err) { 68 const socket = this; 69 debug('SOCKET ERROR on FREE socket:', err.message, err.stack); 70 socket.destroy(); 71 socket.emit('agentRemove'); 72} 73 74function Agent(options) { 75 if (!(this instanceof Agent)) 76 return new Agent(options); 77 78 EventEmitter.call(this); 79 80 this.defaultPort = 80; 81 this.protocol = 'http:'; 82 83 this.options = { ...options }; 84 85 // Don't confuse net and make it think that we're connecting to a pipe 86 this.options.path = null; 87 this.requests = {}; 88 this.sockets = {}; 89 this.freeSockets = {}; 90 this.keepAliveMsecs = this.options.keepAliveMsecs || 1000; 91 this.keepAlive = this.options.keepAlive || false; 92 this.maxSockets = this.options.maxSockets || Agent.defaultMaxSockets; 93 this.maxFreeSockets = this.options.maxFreeSockets || 256; 94 this.maxTotalSockets = this.options.maxTotalSockets; 95 this.totalSocketCount = 0; 96 97 if (this.maxTotalSockets !== undefined) { 98 validateNumber(this.maxTotalSockets, 'maxTotalSockets'); 99 if (this.maxTotalSockets <= 0 || NumberIsNaN(this.maxTotalSockets)) 100 throw new ERR_OUT_OF_RANGE('maxTotalSockets', '> 0', 101 this.maxTotalSockets); 102 } else { 103 this.maxTotalSockets = Infinity; 104 } 105 106 this.scheduling = this.options.scheduling || 'fifo'; 107 108 if (this.scheduling !== 'fifo' && this.scheduling !== 'lifo') { 109 throw new ERR_INVALID_OPT_VALUE('scheduling', this.scheduling); 110 } 111 112 this.on('free', (socket, options) => { 113 const name = this.getName(options); 114 debug('agent.on(free)', name); 115 116 // TODO(ronag): socket.destroy(err) might have been called 117 // before coming here and have an 'error' scheduled. In the 118 // case of socket.destroy() below this 'error' has no handler 119 // and could cause unhandled exception. 120 121 if (socket.writable && 122 this.requests[name] && this.requests[name].length) { 123 const req = this.requests[name].shift(); 124 setRequestSocket(this, req, socket); 125 if (this.requests[name].length === 0) { 126 // don't leak 127 delete this.requests[name]; 128 } 129 } else { 130 // If there are no pending requests, then put it in 131 // the freeSockets pool, but only if we're allowed to do so. 132 const req = socket._httpMessage; 133 if (req && 134 req.shouldKeepAlive && 135 socket.writable && 136 this.keepAlive) { 137 let freeSockets = this.freeSockets[name]; 138 const freeLen = freeSockets ? freeSockets.length : 0; 139 let count = freeLen; 140 if (this.sockets[name]) 141 count += this.sockets[name].length; 142 143 if (this.totalSocketCount > this.maxTotalSockets || 144 count > this.maxSockets || 145 freeLen >= this.maxFreeSockets) { 146 socket.destroy(); 147 } else if (this.keepSocketAlive(socket)) { 148 freeSockets = freeSockets || []; 149 this.freeSockets[name] = freeSockets; 150 socket[async_id_symbol] = -1; 151 socket._httpMessage = null; 152 this.removeSocket(socket, options); 153 154 const agentTimeout = this.options.timeout || 0; 155 if (socket.timeout !== agentTimeout) { 156 socket.setTimeout(agentTimeout); 157 } 158 159 socket.once('error', freeSocketErrorListener); 160 freeSockets.push(socket); 161 } else { 162 // Implementation doesn't want to keep socket alive 163 socket.destroy(); 164 } 165 } else { 166 socket.destroy(); 167 } 168 } 169 }); 170 171 // Don't emit keylog events unless there is a listener for them. 172 this.on('newListener', maybeEnableKeylog); 173} 174ObjectSetPrototypeOf(Agent.prototype, EventEmitter.prototype); 175ObjectSetPrototypeOf(Agent, EventEmitter); 176 177function maybeEnableKeylog(eventName) { 178 if (eventName === 'keylog') { 179 this.removeListener('newListener', maybeEnableKeylog); 180 // Future sockets will listen on keylog at creation. 181 const agent = this; 182 this[kOnKeylog] = function onkeylog(keylog) { 183 agent.emit('keylog', keylog, this); 184 }; 185 // Existing sockets will start listening on keylog now. 186 for (const socket of ObjectValues(this.sockets)) { 187 socket.on('keylog', this[kOnKeylog]); 188 } 189 } 190} 191 192Agent.defaultMaxSockets = Infinity; 193 194Agent.prototype.createConnection = net.createConnection; 195 196// Get the key for a given set of request options 197Agent.prototype.getName = function getName(options) { 198 let name = options.host || 'localhost'; 199 200 name += ':'; 201 if (options.port) 202 name += options.port; 203 204 name += ':'; 205 if (options.localAddress) 206 name += options.localAddress; 207 208 // Pacify parallel/test-http-agent-getname by only appending 209 // the ':' when options.family is set. 210 if (options.family === 4 || options.family === 6) 211 name += `:${options.family}`; 212 213 if (options.socketPath) 214 name += `:${options.socketPath}`; 215 216 return name; 217}; 218 219Agent.prototype.addRequest = function addRequest(req, options, port/* legacy */, 220 localAddress/* legacy */) { 221 // Legacy API: addRequest(req, host, port, localAddress) 222 if (typeof options === 'string') { 223 options = { 224 host: options, 225 port, 226 localAddress 227 }; 228 } 229 230 options = { ...options, ...this.options }; 231 if (options.socketPath) 232 options.path = options.socketPath; 233 234 if (!options.servername && options.servername !== '') 235 options.servername = calculateServerName(options, req); 236 237 const name = this.getName(options); 238 if (!this.sockets[name]) { 239 this.sockets[name] = []; 240 } 241 242 const freeSockets = this.freeSockets[name]; 243 let socket; 244 if (freeSockets) { 245 while (freeSockets.length && freeSockets[0].destroyed) { 246 freeSockets.shift(); 247 } 248 socket = this.scheduling === 'fifo' ? 249 freeSockets.shift() : 250 freeSockets.pop(); 251 if (!freeSockets.length) 252 delete this.freeSockets[name]; 253 } 254 255 const freeLen = freeSockets ? freeSockets.length : 0; 256 const sockLen = freeLen + this.sockets[name].length; 257 258 if (socket) { 259 // Guard against an uninitialized or user supplied Socket. 260 const handle = socket._handle; 261 if (handle && typeof handle.asyncReset === 'function') { 262 // Assign the handle a new asyncId and run any destroy()/init() hooks. 263 handle.asyncReset(new ReusedHandle(handle.getProviderType(), handle)); 264 socket[async_id_symbol] = handle.getAsyncId(); 265 } 266 267 this.reuseSocket(socket, req); 268 setRequestSocket(this, req, socket); 269 this.sockets[name].push(socket); 270 this.totalSocketCount++; 271 } else if (sockLen < this.maxSockets && 272 this.totalSocketCount < this.maxTotalSockets) { 273 debug('call onSocket', sockLen, freeLen); 274 // If we are under maxSockets create a new one. 275 this.createSocket(req, options, handleSocketCreation(this, req, true)); 276 } else { 277 debug('wait for socket'); 278 // We are over limit so we'll add it to the queue. 279 if (!this.requests[name]) { 280 this.requests[name] = []; 281 } 282 283 // Used to create sockets for pending requests from different origin 284 req[kRequestOptions] = options; 285 286 this.requests[name].push(req); 287 } 288}; 289 290Agent.prototype.createSocket = function createSocket(req, options, cb) { 291 options = { ...options, ...this.options }; 292 if (options.socketPath) 293 options.path = options.socketPath; 294 295 if (!options.servername && options.servername !== '') 296 options.servername = calculateServerName(options, req); 297 298 const name = this.getName(options); 299 options._agentKey = name; 300 301 debug('createConnection', name, options); 302 options.encoding = null; 303 let called = false; 304 305 const oncreate = (err, s) => { 306 if (called) 307 return; 308 called = true; 309 if (err) 310 return cb(err); 311 if (!this.sockets[name]) { 312 this.sockets[name] = []; 313 } 314 this.sockets[name].push(s); 315 this.totalSocketCount++; 316 debug('sockets', name, this.sockets[name].length, this.totalSocketCount); 317 installListeners(this, s, options); 318 cb(null, s); 319 }; 320 321 const newSocket = this.createConnection(options, oncreate); 322 if (newSocket) 323 oncreate(null, newSocket); 324}; 325 326function calculateServerName(options, req) { 327 let servername = options.host; 328 const hostHeader = req.getHeader('host'); 329 if (hostHeader) { 330 // abc => abc 331 // abc:123 => abc 332 // [::1] => ::1 333 // [::1]:123 => ::1 334 if (hostHeader.startsWith('[')) { 335 const index = hostHeader.indexOf(']'); 336 if (index === -1) { 337 // Leading '[', but no ']'. Need to do something... 338 servername = hostHeader; 339 } else { 340 servername = hostHeader.substr(1, index - 1); 341 } 342 } else { 343 servername = hostHeader.split(':', 1)[0]; 344 } 345 } 346 // Don't implicitly set invalid (IP) servernames. 347 if (net.isIP(servername)) 348 servername = ''; 349 return servername; 350} 351 352function installListeners(agent, s, options) { 353 function onFree() { 354 debug('CLIENT socket onFree'); 355 agent.emit('free', s, options); 356 } 357 s.on('free', onFree); 358 359 function onClose(err) { 360 debug('CLIENT socket onClose'); 361 // This is the only place where sockets get removed from the Agent. 362 // If you want to remove a socket from the pool, just close it. 363 // All socket errors end in a close event anyway. 364 agent.removeSocket(s, options); 365 } 366 s.on('close', onClose); 367 368 function onTimeout() { 369 debug('CLIENT socket onTimeout'); 370 371 // Destroy if in free list. 372 // TODO(ronag): Always destroy, even if not in free list. 373 const sockets = agent.freeSockets; 374 for (const name of ObjectKeys(sockets)) { 375 if (sockets[name].includes(s)) { 376 return s.destroy(); 377 } 378 } 379 } 380 s.on('timeout', onTimeout); 381 382 function onRemove() { 383 // We need this function for cases like HTTP 'upgrade' 384 // (defined by WebSockets) where we need to remove a socket from the 385 // pool because it'll be locked up indefinitely 386 debug('CLIENT socket onRemove'); 387 agent.removeSocket(s, options); 388 s.removeListener('close', onClose); 389 s.removeListener('free', onFree); 390 s.removeListener('timeout', onTimeout); 391 s.removeListener('agentRemove', onRemove); 392 } 393 s.on('agentRemove', onRemove); 394 395 if (agent[kOnKeylog]) { 396 s.on('keylog', agent[kOnKeylog]); 397 } 398} 399 400Agent.prototype.removeSocket = function removeSocket(s, options) { 401 const name = this.getName(options); 402 debug('removeSocket', name, 'writable:', s.writable); 403 const sets = [this.sockets]; 404 405 // If the socket was destroyed, remove it from the free buffers too. 406 if (!s.writable) 407 sets.push(this.freeSockets); 408 409 for (const sockets of sets) { 410 if (sockets[name]) { 411 const index = sockets[name].indexOf(s); 412 if (index !== -1) { 413 sockets[name].splice(index, 1); 414 // Don't leak 415 if (sockets[name].length === 0) 416 delete sockets[name]; 417 this.totalSocketCount--; 418 } 419 } 420 } 421 422 let req; 423 if (this.requests[name] && this.requests[name].length) { 424 debug('removeSocket, have a request, make a socket'); 425 req = this.requests[name][0]; 426 } else { 427 // TODO(rickyes): this logic will not be FIFO across origins. 428 // There might be older requests in a different origin, but 429 // if the origin which releases the socket has pending requests 430 // that will be prioritized. 431 for (const prop of ObjectKeys(this.requests)) { 432 // Check whether this specific origin is already at maxSockets 433 if (this.sockets[prop] && this.sockets[prop].length) break; 434 debug('removeSocket, have a request with different origin,' + 435 ' make a socket'); 436 req = this.requests[prop][0]; 437 options = req[kRequestOptions]; 438 break; 439 } 440 } 441 442 if (req && options) { 443 req[kRequestOptions] = undefined; 444 // If we have pending requests and a socket gets closed make a new one 445 const socketCreationHandler = handleSocketCreation(this, req, false); 446 this.createSocket(req, options, socketCreationHandler); 447 } 448 449}; 450 451Agent.prototype.keepSocketAlive = function keepSocketAlive(socket) { 452 socket.setKeepAlive(true, this.keepAliveMsecs); 453 socket.unref(); 454 455 return true; 456}; 457 458Agent.prototype.reuseSocket = function reuseSocket(socket, req) { 459 debug('have free socket'); 460 socket.removeListener('error', freeSocketErrorListener); 461 req.reusedSocket = true; 462 socket.ref(); 463}; 464 465Agent.prototype.destroy = function destroy() { 466 for (const set of [this.freeSockets, this.sockets]) { 467 for (const key of ObjectKeys(set)) { 468 for (const setName of set[key]) { 469 setName.destroy(); 470 } 471 } 472 } 473}; 474 475function handleSocketCreation(agent, request, informRequest) { 476 return function handleSocketCreation_Inner(err, socket) { 477 if (err) { 478 process.nextTick(emitErrorNT, request, err); 479 return; 480 } 481 if (informRequest) 482 setRequestSocket(agent, request, socket); 483 else 484 socket.emit('free'); 485 }; 486} 487 488function setRequestSocket(agent, req, socket) { 489 req.onSocket(socket); 490 const agentTimeout = agent.options.timeout || 0; 491 if (req.timeout === undefined || req.timeout === agentTimeout) { 492 return; 493 } 494 socket.setTimeout(req.timeout); 495} 496 497function emitErrorNT(emitter, err) { 498 emitter.emit('error', err); 499} 500 501module.exports = { 502 Agent, 503 globalAgent: new Agent() 504}; 505