1// Copyright Joyent, Inc. and other Node contributors. 2// 3// Permission is hereby granted, free of charge, to any person obtaining a 4// copy of this software and associated documentation files (the 5// "Software"), to deal in the Software without restriction, including 6// without limitation the rights to use, copy, modify, merge, publish, 7// distribute, sublicense, and/or sell copies of the Software, and to permit 8// persons to whom the Software is furnished to do so, subject to the 9// following conditions: 10// 11// The above copyright notice and this permission notice shall be included 12// in all copies or substantial portions of the Software. 13// 14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS 15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN 17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, 18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR 19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE 20// USE OR OTHER DEALINGS IN THE SOFTWARE. 21 22// patch from https://github.com/nodejs/node/blob/v7.2.1/lib/_http_agent.js 23 24'use strict'; 25 26const net = require('net'); 27const util = require('util'); 28const EventEmitter = require('events'); 29const debug = util.debuglog('http'); 30 31// New Agent code. 32 33// The largest departure from the previous implementation is that 34// an Agent instance holds connections for a variable number of host:ports. 35// Surprisingly, this is still API compatible as far as third parties are 36// concerned. The only code that really notices the difference is the 37// request object. 38 39// Another departure is that all code related to HTTP parsing is in 40// ClientRequest.onSocket(). The Agent is now *strictly* 41// concerned with managing a connection pool. 42 43function Agent(options) { 44 if (!(this instanceof Agent)) 45 return new Agent(options); 46 47 EventEmitter.call(this); 48 49 var self = this; 50 51 self.defaultPort = 80; 52 self.protocol = 'http:'; 53 54 self.options = util._extend({}, options); 55 56 // don't confuse net and make it think that we're connecting to a pipe 57 self.options.path = null; 58 self.requests = {}; 59 self.sockets = {}; 60 self.freeSockets = {}; 61 self.keepAliveMsecs = self.options.keepAliveMsecs || 1000; 62 self.keepAlive = self.options.keepAlive || false; 63 self.maxSockets = self.options.maxSockets || Agent.defaultMaxSockets; 64 self.maxFreeSockets = self.options.maxFreeSockets || 256; 65 66 // [patch start] 67 // free keep-alive socket timeout. By default free socket do not have a timeout. 68 self.freeSocketKeepAliveTimeout = self.options.freeSocketKeepAliveTimeout || 0; 69 // working socket timeout. By default working socket do not have a timeout. 70 self.timeout = self.options.timeout || 0; 71 // the socket active time to live, even if it's in use 72 this.socketActiveTTL = this.options.socketActiveTTL || null; 73 // [patch end] 74 75 self.on('free', function(socket, options) { 76 var name = self.getName(options); 77 debug('agent.on(free)', name); 78 79 if (socket.writable && 80 self.requests[name] && self.requests[name].length) { 81 // [patch start] 82 debug('continue handle next request'); 83 // [patch end] 84 self.requests[name].shift().onSocket(socket); 85 if (self.requests[name].length === 0) { 86 // don't leak 87 delete self.requests[name]; 88 } 89 } else { 90 // If there are no pending requests, then put it in 91 // the freeSockets pool, but only if we're allowed to do so. 92 var req = socket._httpMessage; 93 if (req && 94 req.shouldKeepAlive && 95 socket.writable && 96 self.keepAlive) { 97 var freeSockets = self.freeSockets[name]; 98 var freeLen = freeSockets ? freeSockets.length : 0; 99 var count = freeLen; 100 if (self.sockets[name]) 101 count += self.sockets[name].length; 102 103 if (count > self.maxSockets || freeLen >= self.maxFreeSockets) { 104 socket.destroy(); 105 } else { 106 freeSockets = freeSockets || []; 107 self.freeSockets[name] = freeSockets; 108 socket.setKeepAlive(true, self.keepAliveMsecs); 109 socket.unref(); 110 socket._httpMessage = null; 111 self.removeSocket(socket, options); 112 freeSockets.push(socket); 113 114 // [patch start] 115 // Add a default error handler to avoid Unhandled 'error' event throw on idle socket 116 // https://github.com/node-modules/agentkeepalive/issues/25 117 // https://github.com/nodejs/node/pull/4482 (fixed in >= 4.4.0 and >= 5.4.0) 118 if (socket.listeners('error').length === 0) { 119 socket.once('error', freeSocketErrorListener); 120 } 121 // set free keepalive timer 122 // try to use socket custom freeSocketKeepAliveTimeout first 123 const freeSocketKeepAliveTimeout = socket.freeSocketKeepAliveTimeout || self.freeSocketKeepAliveTimeout; 124 socket.setTimeout(freeSocketKeepAliveTimeout); 125 debug(`push to free socket queue and wait for ${freeSocketKeepAliveTimeout}ms`); 126 // [patch end] 127 } 128 } else { 129 socket.destroy(); 130 } 131 } 132 }); 133} 134 135util.inherits(Agent, EventEmitter); 136exports.Agent = Agent; 137 138// [patch start] 139function freeSocketErrorListener(err) { 140 var socket = this; 141 debug('SOCKET ERROR on FREE socket:', err.message, err.stack); 142 socket.destroy(); 143 socket.emit('agentRemove'); 144} 145// [patch end] 146 147Agent.defaultMaxSockets = Infinity; 148 149Agent.prototype.createConnection = net.createConnection; 150 151// Get the key for a given set of request options 152Agent.prototype.getName = function getName(options) { 153 var name = options.host || 'localhost'; 154 155 name += ':'; 156 if (options.port) 157 name += options.port; 158 159 name += ':'; 160 if (options.localAddress) 161 name += options.localAddress; 162 163 // Pacify parallel/test-http-agent-getname by only appending 164 // the ':' when options.family is set. 165 if (options.family === 4 || options.family === 6) 166 name += ':' + options.family; 167 168 return name; 169}; 170 171// [patch start] 172function handleSocketCreation(req) { 173 return function(err, newSocket) { 174 if (err) { 175 process.nextTick(function() { 176 req.emit('error', err); 177 }); 178 return; 179 } 180 req.onSocket(newSocket); 181 } 182} 183// [patch end] 184 185Agent.prototype.addRequest = function addRequest(req, options, port/*legacy*/, 186 localAddress/*legacy*/) { 187 // Legacy API: addRequest(req, host, port, localAddress) 188 if (typeof options === 'string') { 189 options = { 190 host: options, 191 port, 192 localAddress 193 }; 194 } 195 196 options = util._extend({}, options); 197 options = util._extend(options, this.options); 198 199 if (!options.servername) 200 options.servername = calculateServerName(options, req); 201 202 var name = this.getName(options); 203 if (!this.sockets[name]) { 204 this.sockets[name] = []; 205 } 206 207 var freeLen = this.freeSockets[name] ? this.freeSockets[name].length : 0; 208 var sockLen = freeLen + this.sockets[name].length; 209 210 if (freeLen) { 211 // we have a free socket, so use that. 212 var socket = this.freeSockets[name].shift(); 213 debug('have free socket'); 214 215 // [patch start] 216 // remove free socket error event handler 217 socket.removeListener('error', freeSocketErrorListener); 218 // restart the default timer 219 socket.setTimeout(this.timeout); 220 221 if (this.socketActiveTTL && Date.now() - socket.createdTime > this.socketActiveTTL) { 222 debug(`socket ${socket.createdTime} expired`); 223 socket.destroy(); 224 return this.createSocket(req, options, handleSocketCreation(req)); 225 } 226 // [patch end] 227 228 // don't leak 229 if (!this.freeSockets[name].length) 230 delete this.freeSockets[name]; 231 232 socket.ref(); 233 req.onSocket(socket); 234 this.sockets[name].push(socket); 235 } else if (sockLen < this.maxSockets) { 236 debug('call onSocket', sockLen, freeLen); 237 // If we are under maxSockets create a new one. 238 // [patch start] 239 this.createSocket(req, options, handleSocketCreation(req)); 240 // [patch end] 241 } else { 242 debug('wait for socket'); 243 // We are over limit so we'll add it to the queue. 244 if (!this.requests[name]) { 245 this.requests[name] = []; 246 } 247 this.requests[name].push(req); 248 } 249}; 250 251Agent.prototype.createSocket = function createSocket(req, options, cb) { 252 var self = this; 253 options = util._extend({}, options); 254 options = util._extend(options, self.options); 255 256 if (!options.servername) 257 options.servername = calculateServerName(options, req); 258 259 var name = self.getName(options); 260 options._agentKey = name; 261 262 debug('createConnection', name, options); 263 options.encoding = null; 264 var called = false; 265 const newSocket = self.createConnection(options, oncreate); 266 // [patch start] 267 if (newSocket) { 268 oncreate(null, Object.assign(newSocket, { createdTime: Date.now() })); 269 } 270 // [patch end] 271 function oncreate(err, s) { 272 if (called) 273 return; 274 called = true; 275 if (err) 276 return cb(err); 277 if (!self.sockets[name]) { 278 self.sockets[name] = []; 279 } 280 self.sockets[name].push(s); 281 debug('sockets', name, self.sockets[name].length); 282 283 function onFree() { 284 self.emit('free', s, options); 285 } 286 s.on('free', onFree); 287 288 function onClose(err) { 289 debug('CLIENT socket onClose'); 290 // This is the only place where sockets get removed from the Agent. 291 // If you want to remove a socket from the pool, just close it. 292 // All socket errors end in a close event anyway. 293 self.removeSocket(s, options); 294 295 // [patch start] 296 self.emit('close'); 297 // [patch end] 298 } 299 s.on('close', onClose); 300 301 // [patch start] 302 // start socket timeout handler 303 function onTimeout() { 304 debug('CLIENT socket onTimeout'); 305 s.destroy(); 306 // Remove it from freeSockets immediately to prevent new requests from being sent through this socket. 307 self.removeSocket(s, options); 308 self.emit('timeout'); 309 } 310 s.on('timeout', onTimeout); 311 // set the default timer 312 s.setTimeout(self.timeout); 313 // [patch end] 314 315 function onRemove() { 316 // We need this function for cases like HTTP 'upgrade' 317 // (defined by WebSockets) where we need to remove a socket from the 318 // pool because it'll be locked up indefinitely 319 debug('CLIENT socket onRemove'); 320 self.removeSocket(s, options); 321 s.removeListener('close', onClose); 322 s.removeListener('free', onFree); 323 s.removeListener('agentRemove', onRemove); 324 325 // [patch start] 326 // remove socket timeout handler 327 s.setTimeout(0, onTimeout); 328 // [patch end] 329 } 330 s.on('agentRemove', onRemove); 331 cb(null, s); 332 } 333}; 334 335function calculateServerName(options, req) { 336 let servername = options.host; 337 const hostHeader = req.getHeader('host'); 338 if (hostHeader) { 339 // abc => abc 340 // abc:123 => abc 341 // [::1] => ::1 342 // [::1]:123 => ::1 343 if (hostHeader.startsWith('[')) { 344 const index = hostHeader.indexOf(']'); 345 if (index === -1) { 346 // Leading '[', but no ']'. Need to do something... 347 servername = hostHeader; 348 } else { 349 servername = hostHeader.substr(1, index - 1); 350 } 351 } else { 352 servername = hostHeader.split(':', 1)[0]; 353 } 354 } 355 return servername; 356} 357 358Agent.prototype.removeSocket = function removeSocket(s, options) { 359 var name = this.getName(options); 360 debug('removeSocket', name, 'writable:', s.writable); 361 var sets = [this.sockets]; 362 363 // If the socket was destroyed, remove it from the free buffers too. 364 if (!s.writable) 365 sets.push(this.freeSockets); 366 367 for (var sk = 0; sk < sets.length; sk++) { 368 var sockets = sets[sk]; 369 370 if (sockets[name]) { 371 var index = sockets[name].indexOf(s); 372 if (index !== -1) { 373 sockets[name].splice(index, 1); 374 // Don't leak 375 if (sockets[name].length === 0) 376 delete sockets[name]; 377 } 378 } 379 } 380 381 // [patch start] 382 var freeLen = this.freeSockets[name] ? this.freeSockets[name].length : 0; 383 var sockLen = freeLen + (this.sockets[name] ? this.sockets[name].length : 0); 384 // [patch end] 385 386 if (this.requests[name] && this.requests[name].length && sockLen < this.maxSockets) { 387 debug('removeSocket, have a request, make a socket'); 388 var req = this.requests[name][0]; 389 // If we have pending requests and a socket gets closed make a new one 390 this.createSocket(req, options, function(err, newSocket) { 391 if (err) { 392 process.nextTick(function() { 393 req.emit('error', err); 394 }); 395 return; 396 } 397 newSocket.emit('free'); 398 }); 399 } 400}; 401 402Agent.prototype.destroy = function destroy() { 403 var sets = [this.freeSockets, this.sockets]; 404 for (var s = 0; s < sets.length; s++) { 405 var set = sets[s]; 406 var keys = Object.keys(set); 407 for (var v = 0; v < keys.length; v++) { 408 var setName = set[keys[v]]; 409 for (var n = 0; n < setName.length; n++) { 410 setName[n].destroy(); 411 } 412 } 413 } 414}; 415 416exports.globalAgent = new Agent(); 417