// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.

// patch from https://github.com/nodejs/node/blob/v7.2.1/lib/_http_agent.js

'use strict';

const net = require('net');
const util = require('util');
const EventEmitter = require('events');
// Section-scoped debug logger: output is only produced when the NODE_DEBUG
// environment variable enables the "http" section.
const debug = util.debuglog('http');

// New Agent code.

// The largest departure from the previous implementation is that
// an Agent instance holds connections for a variable number of host:ports.
// Surprisingly, this is still API compatible as far as third parties are
// concerned. The only code that really notices the difference is the
// request object.

// Another departure is that all code related to HTTP parsing is in
// ClientRequest.onSocket(). The Agent is now *strictly*
// concerned with managing a connection pool.

// Connection-pool agent. Callable with or without `new`.
//
// Per-name state, keyed by getName(options):
//   sockets     - sockets currently assigned to a request
//   freeSockets - idle keep-alive sockets waiting to be reused
//   requests    - requests queued until a socket becomes available
//
// Recognised options (all optional; a falsy value falls back to the default
// shown in parentheses):
//   keepAlive (false), keepAliveMsecs (1000),
//   maxSockets (Agent.defaultMaxSockets), maxFreeSockets (256),
//   freeSocketKeepAliveTimeout (0), timeout (0), socketActiveTTL (null).
function Agent(options) {
  if (!(this instanceof Agent))
    return new Agent(options);

  EventEmitter.call(this);

  var self = this;

  self.defaultPort = 80;
  self.protocol = 'http:';

  // Work on a copy so the caller's options object is never mutated.
  self.options = util._extend({}, options);

  // don't confuse net and make it think that we're connecting to a pipe
  self.options.path = null;
  self.requests = {};
  self.sockets = {};
  self.freeSockets = {};
  self.keepAliveMsecs = self.options.keepAliveMsecs || 1000;
  self.keepAlive = self.options.keepAlive || false;
  self.maxSockets = self.options.maxSockets || Agent.defaultMaxSockets;
  self.maxFreeSockets = self.options.maxFreeSockets || 256;

  // [patch start]
  // free keep-alive socket timeout. By default free socket do not have a timeout.
  self.freeSocketKeepAliveTimeout = self.options.freeSocketKeepAliveTimeout || 0;
  // working socket timeout. By default working socket do not have a timeout.
  self.timeout = self.options.timeout || 0;
  // the socket active time to live, even if it's in use
  self.socketActiveTTL = self.options.socketActiveTTL || null;
  // [patch end]

  // 'free' is emitted (see createSocket) when a socket has finished serving
  // a request. Either hand it to the next queued request, park it in the
  // free pool, or destroy it.
  self.on('free', function(socket, options) {
    var name = self.getName(options);
    debug('agent.on(free)', name);

    if (socket.writable &&
        self.requests[name] && self.requests[name].length) {
      // [patch start]
      debug('continue handle next request');
      // [patch end]
      self.requests[name].shift().onSocket(socket);
      if (self.requests[name].length === 0) {
        // don't leak
        delete self.requests[name];
      }
    } else {
      // If there are no pending requests, then put it in
      // the freeSockets pool, but only if we're allowed to do so.
      var req = socket._httpMessage;
      if (req &&
          req.shouldKeepAlive &&
          socket.writable &&
          self.keepAlive) {
        var freeSockets = self.freeSockets[name];
        var freeLen = freeSockets ? freeSockets.length : 0;
        var count = freeLen;
        if (self.sockets[name])
          count += self.sockets[name].length;

        if (count > self.maxSockets || freeLen >= self.maxFreeSockets) {
          socket.destroy();
        } else {
          freeSockets = freeSockets || [];
          self.freeSockets[name] = freeSockets;
          socket.setKeepAlive(true, self.keepAliveMsecs);
          socket.unref();
          socket._httpMessage = null;
          // Move the socket from the in-use list to the free list.
          self.removeSocket(socket, options);
          freeSockets.push(socket);

          // [patch start]
          // Add a default error handler to avoid Unhandled 'error' event throw on idle socket
          // https://github.com/node-modules/agentkeepalive/issues/25
          // https://github.com/nodejs/node/pull/4482 (fixed in >= 4.4.0 and >= 5.4.0)
          if (socket.listeners('error').length === 0) {
            socket.once('error', freeSocketErrorListener);
          }
          // set free keepalive timer
          // try to use socket custom freeSocketKeepAliveTimeout first
          const freeSocketKeepAliveTimeout = socket.freeSocketKeepAliveTimeout || self.freeSocketKeepAliveTimeout;
          socket.setTimeout(freeSocketKeepAliveTimeout);
          debug(`push to free socket queue and wait for ${freeSocketKeepAliveTimeout}ms`);
          // [patch end]
        }
      } else {
        socket.destroy();
      }
    }
  });
}

util.inherits(Agent, EventEmitter);
exports.Agent = Agent;

// [patch start]
// Default 'error' listener for a socket sitting in the free pool; `this` is
// the socket. It logs the error, destroys the socket and emits 'agentRemove'
// so the agent detaches its listeners and forgets the socket.
function freeSocketErrorListener(err) {
  var socket = this;
  debug('SOCKET ERROR on FREE socket:', err.message, err.stack);
  socket.destroy();
  socket.emit('agentRemove');
}
// [patch end]

// Pool size used when options.maxSockets is not given (or is falsy).
Agent.defaultMaxSockets = Infinity;

// Connection factory used by createSocket; replaceable per agent/subclass.
Agent.prototype.createConnection = net.createConnection;

// Get the key for a given set of request options.
// Format: "<host>:<port>:<localAddress>[:<family>]", where host defaults to
// 'localhost', and port / localAddress are left empty when not set.
Agent.prototype.getName = function getName(options) {
  var name = options.host || 'localhost';

  name += ':';
  if (options.port)
    name += options.port;

  name += ':';
  if (options.localAddress)
    name += options.localAddress;

  // Pacify parallel/test-http-agent-getname by only appending
  // the ':' when options.family is set.
  if (options.family === 4 || options.family === 6)
    name += ':' + options.family;

  return name;
};

// [patch start]
// Build the (err, socket) callback passed to createSocket on behalf of `req`.
// On success the socket is handed to the request synchronously; on failure
// the error is emitted on the request from process.nextTick, so it is never
// raised synchronously inside the caller.
function handleSocketCreation(req) {
  return function(err, newSocket) {
    if (err) {
      process.nextTick(function() {
        req.emit('error', err);
      });
      return;
    }
    req.onSocket(newSocket);
  };
}
// [patch end]

// Assign a socket to `req`, or queue the request if the pool is full.
//
// Order of preference:
//   1. reuse an idle socket from freeSockets[name];
//   2. create a new socket when fewer than maxSockets exist for this name;
//   3. otherwise append the request to requests[name].
//
// `options` are merged with the agent's own options (agent options win).
// The legacy positional form addRequest(req, host, port, localAddress) is
// still accepted.
Agent.prototype.addRequest = function addRequest(req, options, port/*legacy*/,
                                                 localAddress/*legacy*/) {
  // Legacy API: addRequest(req, host, port, localAddress)
  if (typeof options === 'string') {
    options = {
      host: options,
      port,
      localAddress
    };
  }

  options = util._extend({}, options);
  options = util._extend(options, this.options);

  if (!options.servername)
    options.servername = calculateServerName(options, req);

  var name = this.getName(options);
  if (!this.sockets[name]) {
    this.sockets[name] = [];
  }

  var freeLen = this.freeSockets[name] ? this.freeSockets[name].length : 0;
  var sockLen = freeLen + this.sockets[name].length;

  if (freeLen) {
    // we have a free socket, so use that.
    var socket = this.freeSockets[name].shift();
    debug('have free socket');

    // don't leak
    // Done before the TTL check so the early return below cannot leave an
    // empty array behind in freeSockets.
    if (!this.freeSockets[name].length)
      delete this.freeSockets[name];

    // [patch start]
    // remove free socket error event handler
    socket.removeListener('error', freeSocketErrorListener);
    // restart the default timer
    socket.setTimeout(this.timeout);

    // A socket older than socketActiveTTL is discarded and replaced by a
    // fresh connection instead of being reused.
    if (this.socketActiveTTL && Date.now() - socket.createdTime > this.socketActiveTTL) {
      debug(`socket ${socket.createdTime} expired`);
      socket.destroy();
      return this.createSocket(req, options, handleSocketCreation(req));
    }
    // [patch end]

    socket.ref();
    req.onSocket(socket);
    this.sockets[name].push(socket);
  } else if (sockLen < this.maxSockets) {
    debug('call onSocket', sockLen, freeLen);
    // If we are under maxSockets create a new one.
    // [patch start]
    this.createSocket(req, options, handleSocketCreation(req));
    // [patch end]
  } else {
    debug('wait for socket');
    // We are over limit so we'll add it to the queue.
    if (!this.requests[name]) {
      this.requests[name] = [];
    }
    this.requests[name].push(req);
  }
};

// Open a new connection for `req` and register it with the pool.
//
// `cb(err, socket)` is invoked at most once: either with the socket returned
// synchronously by createConnection, or with whatever createConnection later
// passes to its callback. Every registered socket is stamped with
// `createdTime` (used by the socketActiveTTL check in addRequest) and gets
// 'free', 'close', 'timeout' and 'agentRemove' listeners attached.
Agent.prototype.createSocket = function createSocket(req, options, cb) {
  var self = this;
  options = util._extend({}, options);
  options = util._extend(options, self.options);

  if (!options.servername)
    options.servername = calculateServerName(options, req);

  var name = self.getName(options);
  options._agentKey = name;

  debug('createConnection', name, options);
  options.encoding = null;
  // Guards against createConnection both returning a socket and invoking
  // the callback.
  var called = false;
  const newSocket = self.createConnection(options, oncreate);
  if (newSocket)
    oncreate(null, newSocket);

  function oncreate(err, s) {
    if (called)
      return;
    called = true;
    if (err)
      return cb(err);

    // [patch start]
    // Stamp the creation time here so that sockets delivered through the
    // callback (not only those returned synchronously) can expire via
    // socketActiveTTL.
    s.createdTime = Date.now();
    // [patch end]

    if (!self.sockets[name]) {
      self.sockets[name] = [];
    }
    self.sockets[name].push(s);
    debug('sockets', name, self.sockets[name].length);

    function onFree() {
      self.emit('free', s, options);
    }
    s.on('free', onFree);

    function onClose(err) {
      debug('CLIENT socket onClose');
      // This is the only place where sockets get removed from the Agent.
      // If you want to remove a socket from the pool, just close it.
      // All socket errors end in a close event anyway.
      self.removeSocket(s, options);

      // [patch start]
      self.emit('close');
      // [patch end]
    }
    s.on('close', onClose);

    // [patch start]
    // start socket timeout handler
    function onTimeout() {
      debug('CLIENT socket onTimeout');
      s.destroy();
      // Remove it from freeSockets immediately to prevent new requests from being sent through this socket.
      self.removeSocket(s, options);
      self.emit('timeout');
    }
    s.on('timeout', onTimeout);
    // set the default timer
    s.setTimeout(self.timeout);
    // [patch end]

    function onRemove() {
      // We need this function for cases like HTTP 'upgrade'
      // (defined by WebSockets) where we need to remove a socket from the
      // pool because it'll be locked up indefinitely
      debug('CLIENT socket onRemove');
      self.removeSocket(s, options);
      s.removeListener('close', onClose);
      s.removeListener('free', onFree);
      s.removeListener('agentRemove', onRemove);

      // [patch start]
      // remove socket timeout handler
      // NOTE(review): relies on setTimeout(0, fn) both disabling the idle
      // timer and detaching fn from 'timeout' - confirm for the Node
      // versions this file targets.
      s.setTimeout(0, onTimeout);
      // [patch end]
    }
    s.on('agentRemove', onRemove);
    cb(null, s);
  }
};

// Derive the server name for a request.
//
// Returns options.host unless the request carries a Host header, in which
// case the host part of that header is used with any port (and, for IPv6
// literals, the surrounding brackets) removed.
function calculateServerName(options, req) {
  let servername = options.host;
  const hostHeader = req.getHeader('host');
  if (hostHeader) {
    // abc => abc
    // abc:123 => abc
    // [::1] => ::1
    // [::1]:123 => ::1
    if (hostHeader.startsWith('[')) {
      const index = hostHeader.indexOf(']');
      if (index === -1) {
        // Leading '[', but no ']'. Need to do something...
        servername = hostHeader;
      } else {
        // Text between '[' and ']'.
        servername = hostHeader.slice(1, index);
      }
    } else {
      servername = hostHeader.split(':', 1)[0];
    }
  }
  return servername;
}

// Drop socket `s` from the pool entry selected by getName(options).
//
// The socket is always removed from `sockets`; it is also removed from
// `freeSockets` when it is no longer writable. Afterwards, if requests are
// still queued for this name and the pool is below maxSockets, a replacement
// socket is created for the request at the head of the queue.
Agent.prototype.removeSocket = function removeSocket(s, options) {
  var name = this.getName(options);
  debug('removeSocket', name, 'writable:', s.writable);
  var sets = [this.sockets];

  // If the socket was destroyed, remove it from the free buffers too.
  if (!s.writable)
    sets.push(this.freeSockets);

  for (var sk = 0; sk < sets.length; sk++) {
    var sockets = sets[sk];

    if (sockets[name]) {
      var index = sockets[name].indexOf(s);
      if (index !== -1) {
        sockets[name].splice(index, 1);
        // Don't leak
        if (sockets[name].length === 0)
          delete sockets[name];
      }
    }
  }

  // [patch start]
  var freeLen = this.freeSockets[name] ? this.freeSockets[name].length : 0;
  var sockLen = freeLen + (this.sockets[name] ? this.sockets[name].length : 0);
  // [patch end]

  if (this.requests[name] && this.requests[name].length && sockLen < this.maxSockets) {
    debug('removeSocket, have a request, make a socket');
    var req = this.requests[name][0];
    // If we have pending requests and a socket gets closed make a new one
    this.createSocket(req, options, function(err, newSocket) {
      if (err) {
        process.nextTick(function() {
          req.emit('error', err);
        });
        return;
      }
      // Emitting 'free' on the socket makes the agent's 'free' handler hand
      // it to the queued request.
      newSocket.emit('free');
    });
  }
};

// Destroy every socket the agent currently tracks, both idle (freeSockets)
// and in use (sockets). The request queue is not touched here.
Agent.prototype.destroy = function destroy() {
  var sets = [this.freeSockets, this.sockets];
  for (var s = 0; s < sets.length; s++) {
    var set = sets[s];
    var keys = Object.keys(set);
    for (var v = 0; v < keys.length; v++) {
      var sockets = set[keys[v]];
      for (var n = 0; n < sockets.length; n++) {
        sockets[n].destroy();
      }
    }
  }
};

// Shared agent instance created with default options.
exports.globalAgent = new Agent();