• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1// Copyright Joyent, Inc. and other Node contributors.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the
5// "Software"), to deal in the Software without restriction, including
6// without limitation the rights to use, copy, modify, merge, publish,
7// distribute, sublicense, and/or sell copies of the Software, and to permit
8// persons to whom the Software is furnished to do so, subject to the
9// following conditions:
10//
11// The above copyright notice and this permission notice shall be included
12// in all copies or substantial portions of the Software.
13//
14// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
15// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
17// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
18// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
19// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
20// USE OR OTHER DEALINGS IN THE SOFTWARE.
21
22'use strict';
23
24const {
25  NumberIsNaN,
26  ObjectKeys,
27  ObjectSetPrototypeOf,
28  ObjectValues,
29  Symbol,
30} = primordials;
31
32const net = require('net');
33const EventEmitter = require('events');
34let debug = require('internal/util/debuglog').debuglog('http', (fn) => {
35  debug = fn;
36});
37const { async_id_symbol } = require('internal/async_hooks').symbols;
38const {
39  codes: {
40    ERR_OUT_OF_RANGE,
41    ERR_INVALID_OPT_VALUE,
42  },
43} = require('internal/errors');
44const { validateNumber } = require('internal/validators');
45
46const kOnKeylog = Symbol('onkeylog');
47const kRequestOptions = Symbol('requestOptions');
48// New Agent code.
49
50// The largest departure from the previous implementation is that
51// an Agent instance holds connections for a variable number of host:ports.
52// Surprisingly, this is still API compatible as far as third parties are
53// concerned. The only code that really notices the difference is the
54// request object.
55
56// Another departure is that all code related to HTTP parsing is in
57// ClientRequest.onSocket(). The Agent is now *strictly*
58// concerned with managing a connection pool.
59
60class ReusedHandle {
61  constructor(type, handle) {
62    this.type = type;
63    this.handle = handle;
64  }
65}
66
67function freeSocketErrorListener(err) {
68  const socket = this;
69  debug('SOCKET ERROR on FREE socket:', err.message, err.stack);
70  socket.destroy();
71  socket.emit('agentRemove');
72}
73
74function Agent(options) {
75  if (!(this instanceof Agent))
76    return new Agent(options);
77
78  EventEmitter.call(this);
79
80  this.defaultPort = 80;
81  this.protocol = 'http:';
82
83  this.options = { ...options };
84
85  // Don't confuse net and make it think that we're connecting to a pipe
86  this.options.path = null;
87  this.requests = {};
88  this.sockets = {};
89  this.freeSockets = {};
90  this.keepAliveMsecs = this.options.keepAliveMsecs || 1000;
91  this.keepAlive = this.options.keepAlive || false;
92  this.maxSockets = this.options.maxSockets || Agent.defaultMaxSockets;
93  this.maxFreeSockets = this.options.maxFreeSockets || 256;
94  this.maxTotalSockets = this.options.maxTotalSockets;
95  this.totalSocketCount = 0;
96
97  if (this.maxTotalSockets !== undefined) {
98    validateNumber(this.maxTotalSockets, 'maxTotalSockets');
99    if (this.maxTotalSockets <= 0 || NumberIsNaN(this.maxTotalSockets))
100      throw new ERR_OUT_OF_RANGE('maxTotalSockets', '> 0',
101                                 this.maxTotalSockets);
102  } else {
103    this.maxTotalSockets = Infinity;
104  }
105
106  this.scheduling = this.options.scheduling || 'fifo';
107
108  if (this.scheduling !== 'fifo' && this.scheduling !== 'lifo') {
109    throw new ERR_INVALID_OPT_VALUE('scheduling', this.scheduling);
110  }
111
112  this.on('free', (socket, options) => {
113    const name = this.getName(options);
114    debug('agent.on(free)', name);
115
116    // TODO(ronag): socket.destroy(err) might have been called
117    // before coming here and have an 'error' scheduled. In the
118    // case of socket.destroy() below this 'error' has no handler
119    // and could cause unhandled exception.
120
121    if (socket.writable &&
122        this.requests[name] && this.requests[name].length) {
123      const req = this.requests[name].shift();
124      setRequestSocket(this, req, socket);
125      if (this.requests[name].length === 0) {
126        // don't leak
127        delete this.requests[name];
128      }
129    } else {
130      // If there are no pending requests, then put it in
131      // the freeSockets pool, but only if we're allowed to do so.
132      const req = socket._httpMessage;
133      if (req &&
134          req.shouldKeepAlive &&
135          socket.writable &&
136          this.keepAlive) {
137        let freeSockets = this.freeSockets[name];
138        const freeLen = freeSockets ? freeSockets.length : 0;
139        let count = freeLen;
140        if (this.sockets[name])
141          count += this.sockets[name].length;
142
143        if (this.totalSocketCount > this.maxTotalSockets ||
144            count > this.maxSockets ||
145            freeLen >= this.maxFreeSockets) {
146          socket.destroy();
147        } else if (this.keepSocketAlive(socket)) {
148          freeSockets = freeSockets || [];
149          this.freeSockets[name] = freeSockets;
150          socket[async_id_symbol] = -1;
151          socket._httpMessage = null;
152          this.removeSocket(socket, options);
153
154          const agentTimeout = this.options.timeout || 0;
155          if (socket.timeout !== agentTimeout) {
156            socket.setTimeout(agentTimeout);
157          }
158
159          socket.once('error', freeSocketErrorListener);
160          freeSockets.push(socket);
161        } else {
162          // Implementation doesn't want to keep socket alive
163          socket.destroy();
164        }
165      } else {
166        socket.destroy();
167      }
168    }
169  });
170
171  // Don't emit keylog events unless there is a listener for them.
172  this.on('newListener', maybeEnableKeylog);
173}
174ObjectSetPrototypeOf(Agent.prototype, EventEmitter.prototype);
175ObjectSetPrototypeOf(Agent, EventEmitter);
176
177function maybeEnableKeylog(eventName) {
178  if (eventName === 'keylog') {
179    this.removeListener('newListener', maybeEnableKeylog);
180    // Future sockets will listen on keylog at creation.
181    const agent = this;
182    this[kOnKeylog] = function onkeylog(keylog) {
183      agent.emit('keylog', keylog, this);
184    };
185    // Existing sockets will start listening on keylog now.
186    for (const socket of ObjectValues(this.sockets)) {
187      socket.on('keylog', this[kOnKeylog]);
188    }
189  }
190}
191
192Agent.defaultMaxSockets = Infinity;
193
194Agent.prototype.createConnection = net.createConnection;
195
196// Get the key for a given set of request options
197Agent.prototype.getName = function getName(options) {
198  let name = options.host || 'localhost';
199
200  name += ':';
201  if (options.port)
202    name += options.port;
203
204  name += ':';
205  if (options.localAddress)
206    name += options.localAddress;
207
208  // Pacify parallel/test-http-agent-getname by only appending
209  // the ':' when options.family is set.
210  if (options.family === 4 || options.family === 6)
211    name += `:${options.family}`;
212
213  if (options.socketPath)
214    name += `:${options.socketPath}`;
215
216  return name;
217};
218
219Agent.prototype.addRequest = function addRequest(req, options, port/* legacy */,
220                                                 localAddress/* legacy */) {
221  // Legacy API: addRequest(req, host, port, localAddress)
222  if (typeof options === 'string') {
223    options = {
224      host: options,
225      port,
226      localAddress
227    };
228  }
229
230  options = { ...options, ...this.options };
231  if (options.socketPath)
232    options.path = options.socketPath;
233
234  if (!options.servername && options.servername !== '')
235    options.servername = calculateServerName(options, req);
236
237  const name = this.getName(options);
238  if (!this.sockets[name]) {
239    this.sockets[name] = [];
240  }
241
242  const freeSockets = this.freeSockets[name];
243  let socket;
244  if (freeSockets) {
245    while (freeSockets.length && freeSockets[0].destroyed) {
246      freeSockets.shift();
247    }
248    socket = this.scheduling === 'fifo' ?
249      freeSockets.shift() :
250      freeSockets.pop();
251    if (!freeSockets.length)
252      delete this.freeSockets[name];
253  }
254
255  const freeLen = freeSockets ? freeSockets.length : 0;
256  const sockLen = freeLen + this.sockets[name].length;
257
258  if (socket) {
259    // Guard against an uninitialized or user supplied Socket.
260    const handle = socket._handle;
261    if (handle && typeof handle.asyncReset === 'function') {
262      // Assign the handle a new asyncId and run any destroy()/init() hooks.
263      handle.asyncReset(new ReusedHandle(handle.getProviderType(), handle));
264      socket[async_id_symbol] = handle.getAsyncId();
265    }
266
267    this.reuseSocket(socket, req);
268    setRequestSocket(this, req, socket);
269    this.sockets[name].push(socket);
270    this.totalSocketCount++;
271  } else if (sockLen < this.maxSockets &&
272             this.totalSocketCount < this.maxTotalSockets) {
273    debug('call onSocket', sockLen, freeLen);
274    // If we are under maxSockets create a new one.
275    this.createSocket(req, options, handleSocketCreation(this, req, true));
276  } else {
277    debug('wait for socket');
278    // We are over limit so we'll add it to the queue.
279    if (!this.requests[name]) {
280      this.requests[name] = [];
281    }
282
283    // Used to create sockets for pending requests from different origin
284    req[kRequestOptions] = options;
285
286    this.requests[name].push(req);
287  }
288};
289
290Agent.prototype.createSocket = function createSocket(req, options, cb) {
291  options = { ...options, ...this.options };
292  if (options.socketPath)
293    options.path = options.socketPath;
294
295  if (!options.servername && options.servername !== '')
296    options.servername = calculateServerName(options, req);
297
298  const name = this.getName(options);
299  options._agentKey = name;
300
301  debug('createConnection', name, options);
302  options.encoding = null;
303  let called = false;
304
305  const oncreate = (err, s) => {
306    if (called)
307      return;
308    called = true;
309    if (err)
310      return cb(err);
311    if (!this.sockets[name]) {
312      this.sockets[name] = [];
313    }
314    this.sockets[name].push(s);
315    this.totalSocketCount++;
316    debug('sockets', name, this.sockets[name].length, this.totalSocketCount);
317    installListeners(this, s, options);
318    cb(null, s);
319  };
320
321  const newSocket = this.createConnection(options, oncreate);
322  if (newSocket)
323    oncreate(null, newSocket);
324};
325
326function calculateServerName(options, req) {
327  let servername = options.host;
328  const hostHeader = req.getHeader('host');
329  if (hostHeader) {
330    // abc => abc
331    // abc:123 => abc
332    // [::1] => ::1
333    // [::1]:123 => ::1
334    if (hostHeader.startsWith('[')) {
335      const index = hostHeader.indexOf(']');
336      if (index === -1) {
337        // Leading '[', but no ']'. Need to do something...
338        servername = hostHeader;
339      } else {
340        servername = hostHeader.substr(1, index - 1);
341      }
342    } else {
343      servername = hostHeader.split(':', 1)[0];
344    }
345  }
346  // Don't implicitly set invalid (IP) servernames.
347  if (net.isIP(servername))
348    servername = '';
349  return servername;
350}
351
352function installListeners(agent, s, options) {
353  function onFree() {
354    debug('CLIENT socket onFree');
355    agent.emit('free', s, options);
356  }
357  s.on('free', onFree);
358
359  function onClose(err) {
360    debug('CLIENT socket onClose');
361    // This is the only place where sockets get removed from the Agent.
362    // If you want to remove a socket from the pool, just close it.
363    // All socket errors end in a close event anyway.
364    agent.removeSocket(s, options);
365  }
366  s.on('close', onClose);
367
368  function onTimeout() {
369    debug('CLIENT socket onTimeout');
370
371    // Destroy if in free list.
372    // TODO(ronag): Always destroy, even if not in free list.
373    const sockets = agent.freeSockets;
374    for (const name of ObjectKeys(sockets)) {
375      if (sockets[name].includes(s)) {
376        return s.destroy();
377      }
378    }
379  }
380  s.on('timeout', onTimeout);
381
382  function onRemove() {
383    // We need this function for cases like HTTP 'upgrade'
384    // (defined by WebSockets) where we need to remove a socket from the
385    // pool because it'll be locked up indefinitely
386    debug('CLIENT socket onRemove');
387    agent.removeSocket(s, options);
388    s.removeListener('close', onClose);
389    s.removeListener('free', onFree);
390    s.removeListener('timeout', onTimeout);
391    s.removeListener('agentRemove', onRemove);
392  }
393  s.on('agentRemove', onRemove);
394
395  if (agent[kOnKeylog]) {
396    s.on('keylog', agent[kOnKeylog]);
397  }
398}
399
400Agent.prototype.removeSocket = function removeSocket(s, options) {
401  const name = this.getName(options);
402  debug('removeSocket', name, 'writable:', s.writable);
403  const sets = [this.sockets];
404
405  // If the socket was destroyed, remove it from the free buffers too.
406  if (!s.writable)
407    sets.push(this.freeSockets);
408
409  for (const sockets of sets) {
410    if (sockets[name]) {
411      const index = sockets[name].indexOf(s);
412      if (index !== -1) {
413        sockets[name].splice(index, 1);
414        // Don't leak
415        if (sockets[name].length === 0)
416          delete sockets[name];
417        this.totalSocketCount--;
418      }
419    }
420  }
421
422  let req;
423  if (this.requests[name] && this.requests[name].length) {
424    debug('removeSocket, have a request, make a socket');
425    req = this.requests[name][0];
426  } else {
427    // TODO(rickyes): this logic will not be FIFO across origins.
428    // There might be older requests in a different origin, but
429    // if the origin which releases the socket has pending requests
430    // that will be prioritized.
431    for (const prop of ObjectKeys(this.requests)) {
432      // Check whether this specific origin is already at maxSockets
433      if (this.sockets[prop] && this.sockets[prop].length) break;
434      debug('removeSocket, have a request with different origin,' +
435        ' make a socket');
436      req = this.requests[prop][0];
437      options = req[kRequestOptions];
438      break;
439    }
440  }
441
442  if (req && options) {
443    req[kRequestOptions] = undefined;
444    // If we have pending requests and a socket gets closed make a new one
445    const socketCreationHandler = handleSocketCreation(this, req, false);
446    this.createSocket(req, options, socketCreationHandler);
447  }
448
449};
450
451Agent.prototype.keepSocketAlive = function keepSocketAlive(socket) {
452  socket.setKeepAlive(true, this.keepAliveMsecs);
453  socket.unref();
454
455  return true;
456};
457
458Agent.prototype.reuseSocket = function reuseSocket(socket, req) {
459  debug('have free socket');
460  socket.removeListener('error', freeSocketErrorListener);
461  req.reusedSocket = true;
462  socket.ref();
463};
464
465Agent.prototype.destroy = function destroy() {
466  for (const set of [this.freeSockets, this.sockets]) {
467    for (const key of ObjectKeys(set)) {
468      for (const setName of set[key]) {
469        setName.destroy();
470      }
471    }
472  }
473};
474
475function handleSocketCreation(agent, request, informRequest) {
476  return function handleSocketCreation_Inner(err, socket) {
477    if (err) {
478      process.nextTick(emitErrorNT, request, err);
479      return;
480    }
481    if (informRequest)
482      setRequestSocket(agent, request, socket);
483    else
484      socket.emit('free');
485  };
486}
487
488function setRequestSocket(agent, req, socket) {
489  req.onSocket(socket);
490  const agentTimeout = agent.options.timeout || 0;
491  if (req.timeout === undefined || req.timeout === agentTimeout) {
492    return;
493  }
494  socket.setTimeout(req.timeout);
495}
496
497function emitErrorNT(emitter, err) {
498  emitter.emit('error', err);
499}
500
501module.exports = {
502  Agent,
503  globalAgent: new Agent()
504};
505