• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * $HeadURL: http://svn.apache.org/repos/asf/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java $
3  * $Revision: 677240 $
4  * $Date: 2008-07-16 04:25:47 -0700 (Wed, 16 Jul 2008) $
5  *
6  * ====================================================================
7  *
8  *  Licensed to the Apache Software Foundation (ASF) under one or more
9  *  contributor license agreements.  See the NOTICE file distributed with
10  *  this work for additional information regarding copyright ownership.
11  *  The ASF licenses this file to You under the Apache License, Version 2.0
12  *  (the "License"); you may not use this file except in compliance with
13  *  the License.  You may obtain a copy of the License at
14  *
15  *      http://www.apache.org/licenses/LICENSE-2.0
16  *
17  *  Unless required by applicable law or agreed to in writing, software
18  *  distributed under the License is distributed on an "AS IS" BASIS,
19  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20  *  See the License for the specific language governing permissions and
21  *  limitations under the License.
22  * ====================================================================
23  *
24  * This software consists of voluntary contributions made by many
25  * individuals on behalf of the Apache Software Foundation.  For more
26  * information on the Apache Software Foundation, please see
27  * <http://www.apache.org/>.
28  *
29  */
30 
31 package org.apache.http.impl.conn.tsccm;
32 
33 import java.util.Date;
34 import java.util.HashMap;
35 import java.util.Iterator;
36 import java.util.Queue;
37 import java.util.LinkedList;
38 import java.util.Map;
39 import java.util.concurrent.locks.Condition;
40 import java.util.concurrent.TimeUnit;
41 
42 import org.apache.commons.logging.Log;
43 import org.apache.commons.logging.LogFactory;
44 import org.apache.http.conn.routing.HttpRoute;
45 import org.apache.http.conn.ClientConnectionOperator;
46 import org.apache.http.conn.ConnectionPoolTimeoutException;
47 import org.apache.http.conn.params.ConnPerRoute;
48 import org.apache.http.conn.params.ConnManagerParams;
49 import org.apache.http.params.HttpParams;
50 
51 
52 /**
53  * A connection pool that maintains connections by route.
54  * This class is derived from <code>MultiThreadedHttpConnectionManager</code>
55  * in HttpClient 3.x, see there for original authors. It implements the same
56  * algorithm for connection re-use and connection-per-host enforcement:
57  * <ul>
58  * <li>connections are re-used only for the exact same route</li>
59  * <li>connection limits are enforced per route rather than per host</li>
60  * </ul>
61  * Note that access to the pool datastructures is synchronized via the
62  * {@link AbstractConnPool#poolLock poolLock} in the base class,
63  * not via <code>synchronized</code> methods.
64  *
65  * @author <a href="mailto:rolandw at apache.org">Roland Weber</a>
66  * @author <a href="mailto:becke@u.washington.edu">Michael Becke</a>
67  * @author and others
68  */
69 public class ConnPoolByRoute extends AbstractConnPool {
70 
71     private final Log log = LogFactory.getLog(getClass());
72 
73     /** Connection operator for this pool */
74     protected final ClientConnectionOperator operator;
75 
76     /** The list of free connections */
77     protected Queue<BasicPoolEntry> freeConnections;
78 
79     /** The list of WaitingThreads waiting for a connection */
80     protected Queue<WaitingThread> waitingThreads;
81 
82     /**
83      * A map of route-specific pools.
84      * Keys are of class {@link HttpRoute},
85      * values of class {@link RouteSpecificPool}.
86      */
87     protected final Map<HttpRoute, RouteSpecificPool> routeToPool;
88 
89     protected final int maxTotalConnections;
90 
91     private final ConnPerRoute connPerRoute;
92 
93     /**
94      * Creates a new connection pool, managed by route.
95      */
ConnPoolByRoute(final ClientConnectionOperator operator, final HttpParams params)96     public ConnPoolByRoute(final ClientConnectionOperator operator, final HttpParams params) {
97         super();
98         if (operator == null) {
99             throw new IllegalArgumentException("Connection operator may not be null");
100         }
101         this.operator = operator;
102 
103         freeConnections = createFreeConnQueue();
104         waitingThreads  = createWaitingThreadQueue();
105         routeToPool     = createRouteToPoolMap();
106         maxTotalConnections = ConnManagerParams
107             .getMaxTotalConnections(params);
108         connPerRoute = ConnManagerParams
109             .getMaxConnectionsPerRoute(params);
110     }
111 
112 
113     /**
114      * Creates the queue for {@link #freeConnections}.
115      * Called once by the constructor.
116      *
117      * @return  a queue
118      */
createFreeConnQueue()119     protected Queue<BasicPoolEntry> createFreeConnQueue() {
120         return new LinkedList<BasicPoolEntry>();
121     }
122 
123     /**
124      * Creates the queue for {@link #waitingThreads}.
125      * Called once by the constructor.
126      *
127      * @return  a queue
128      */
createWaitingThreadQueue()129     protected Queue<WaitingThread> createWaitingThreadQueue() {
130         return new LinkedList<WaitingThread>();
131     }
132 
133     /**
134      * Creates the map for {@link #routeToPool}.
135      * Called once by the constructor.
136      *
137      * @return  a map
138      */
createRouteToPoolMap()139     protected Map<HttpRoute, RouteSpecificPool> createRouteToPoolMap() {
140         return new HashMap<HttpRoute, RouteSpecificPool>();
141     }
142 
143 
144     /**
145      * Creates a new route-specific pool.
146      * Called by {@link #getRoutePool} when necessary.
147      *
148      * @param route     the route
149      *
150      * @return  the new pool
151      */
newRouteSpecificPool(HttpRoute route)152     protected RouteSpecificPool newRouteSpecificPool(HttpRoute route) {
153         return new RouteSpecificPool(route, connPerRoute.getMaxForRoute(route));
154     }
155 
156 
157     /**
158      * Creates a new waiting thread.
159      * Called by {@link #getRoutePool} when necessary.
160      *
161      * @param cond      the condition to wait for
162      * @param rospl     the route specific pool, or <code>null</code>
163      *
164      * @return  a waiting thread representation
165      */
newWaitingThread(Condition cond, RouteSpecificPool rospl)166     protected WaitingThread newWaitingThread(Condition cond,
167                                              RouteSpecificPool rospl) {
168         return new WaitingThread(cond, rospl);
169     }
170 
171 
172     /**
173      * Get a route-specific pool of available connections.
174      *
175      * @param route   the route
176      * @param create    whether to create the pool if it doesn't exist
177      *
178      * @return  the pool for the argument route,
179      *     never <code>null</code> if <code>create</code> is <code>true</code>
180      */
getRoutePool(HttpRoute route, boolean create)181     protected RouteSpecificPool getRoutePool(HttpRoute route,
182                                              boolean create) {
183         RouteSpecificPool rospl = null;
184         poolLock.lock();
185         try {
186 
187             rospl = routeToPool.get(route);
188             if ((rospl == null) && create) {
189                 // no pool for this route yet (or anymore)
190                 rospl = newRouteSpecificPool(route);
191                 routeToPool.put(route, rospl);
192             }
193 
194         } finally {
195             poolLock.unlock();
196         }
197 
198         return rospl;
199     }
200 
201 
202     //@@@ consider alternatives for gathering statistics
getConnectionsInPool(HttpRoute route)203     public int getConnectionsInPool(HttpRoute route) {
204 
205         poolLock.lock();
206         try {
207             // don't allow a pool to be created here!
208             RouteSpecificPool rospl = getRoutePool(route, false);
209             return (rospl != null) ? rospl.getEntryCount() : 0;
210 
211         } finally {
212             poolLock.unlock();
213         }
214     }
215 
216     @Override
requestPoolEntry( final HttpRoute route, final Object state)217     public PoolEntryRequest requestPoolEntry(
218             final HttpRoute route,
219             final Object state) {
220 
221         final WaitingThreadAborter aborter = new WaitingThreadAborter();
222 
223         return new PoolEntryRequest() {
224 
225             public void abortRequest() {
226                 poolLock.lock();
227                 try {
228                     aborter.abort();
229                 } finally {
230                     poolLock.unlock();
231                 }
232             }
233 
234             public BasicPoolEntry getPoolEntry(
235                     long timeout,
236                     TimeUnit tunit)
237                         throws InterruptedException, ConnectionPoolTimeoutException {
238                 return getEntryBlocking(route, state, timeout, tunit, aborter);
239             }
240 
241         };
242     }
243 
244     /**
245      * Obtains a pool entry with a connection within the given timeout.
246      * If a {@link WaitingThread} is used to block, {@link WaitingThreadAborter#setWaitingThread(WaitingThread)}
247      * must be called before blocking, to allow the thread to be interrupted.
248      *
249      * @param route     the route for which to get the connection
250      * @param timeout   the timeout, 0 or negative for no timeout
251      * @param tunit     the unit for the <code>timeout</code>,
252      *                  may be <code>null</code> only if there is no timeout
253      * @param aborter   an object which can abort a {@link WaitingThread}.
254      *
255      * @return  pool entry holding a connection for the route
256      *
257      * @throws ConnectionPoolTimeoutException
258      *         if the timeout expired
259      * @throws InterruptedException
260      *         if the calling thread was interrupted
261      */
262     protected BasicPoolEntry getEntryBlocking(
263                                    HttpRoute route, Object state,
264                                    long timeout, TimeUnit tunit,
265                                    WaitingThreadAborter aborter)
266         throws ConnectionPoolTimeoutException, InterruptedException {
267 
268         Date deadline = null;
269         if (timeout > 0) {
270             deadline = new Date
271                 (System.currentTimeMillis() + tunit.toMillis(timeout));
272         }
273 
274         BasicPoolEntry entry = null;
275         poolLock.lock();
276         try {
277 
278             RouteSpecificPool rospl = getRoutePool(route, true);
279             WaitingThread waitingThread = null;
280 
281             while (entry == null) {
282 
283                 if (isShutDown) {
284                     throw new IllegalStateException
285                         ("Connection pool shut down.");
286                 }
287 
288                 if (log.isDebugEnabled()) {
289                     log.debug("Total connections kept alive: " + freeConnections.size());
290                     log.debug("Total issued connections: " + issuedConnections.size());
291                     log.debug("Total allocated connection: " + numConnections + " out of " + maxTotalConnections);
292                 }
293 
294                 // the cases to check for:
295                 // - have a free connection for that route
296                 // - allowed to create a free connection for that route
297                 // - can delete and replace a free connection for another route
298                 // - need to wait for one of the things above to come true
299 
300                 entry = getFreeEntry(rospl, state);
301                 if (entry != null) {
302                     break;
303                 }
304 
305                 boolean hasCapacity = rospl.getCapacity() > 0;
306 
307                 if (log.isDebugEnabled()) {
308                     log.debug("Available capacity: " + rospl.getCapacity()
309                             + " out of " + rospl.getMaxEntries()
310                             + " [" + route + "][" + state + "]");
311                 }
312 
313                 if (hasCapacity && numConnections < maxTotalConnections) {
314 
315                     entry = createEntry(rospl, operator);
316 
317                 } else if (hasCapacity && !freeConnections.isEmpty()) {
318 
319                     deleteLeastUsedEntry();
320                     entry = createEntry(rospl, operator);
321 
322                 } else {
323 
324                     if (log.isDebugEnabled()) {
325                         log.debug("Need to wait for connection" +
326                                 " [" + route + "][" + state + "]");
327                     }
328 
329                     if (waitingThread == null) {
330                         waitingThread =
331                             newWaitingThread(poolLock.newCondition(), rospl);
332                         aborter.setWaitingThread(waitingThread);
333                     }
334 
335                     boolean success = false;
336                     try {
337                         rospl.queueThread(waitingThread);
338                         waitingThreads.add(waitingThread);
339                         success = waitingThread.await(deadline);
340 
341                     } finally {
342                         // In case of 'success', we were woken up by the
343                         // connection pool and should now have a connection
344                         // waiting for us, or else we're shutting down.
345                         // Just continue in the loop, both cases are checked.
346                         rospl.removeThread(waitingThread);
347                         waitingThreads.remove(waitingThread);
348                     }
349 
350                     // check for spurious wakeup vs. timeout
351                     if (!success && (deadline != null) &&
352                         (deadline.getTime() <= System.currentTimeMillis())) {
353                         throw new ConnectionPoolTimeoutException
354                             ("Timeout waiting for connection");
355                     }
356                 }
357             } // while no entry
358 
359         } finally {
360             poolLock.unlock();
361         }
362 
363         return entry;
364 
365     } // getEntry
366 
367 
368     // non-javadoc, see base class AbstractConnPool
369     @Override
370     public void freeEntry(BasicPoolEntry entry, boolean reusable, long validDuration, TimeUnit timeUnit) {
371 
372         HttpRoute route = entry.getPlannedRoute();
373         if (log.isDebugEnabled()) {
374             log.debug("Freeing connection" +
375                     " [" + route + "][" + entry.getState() + "]");
376         }
377 
378         poolLock.lock();
379         try {
380             if (isShutDown) {
381                 // the pool is shut down, release the
382                 // connection's resources and get out of here
383                 closeConnection(entry.getConnection());
384                 return;
385             }
386 
387             // no longer issued, we keep a hard reference now
388             issuedConnections.remove(entry.getWeakRef());
389 
390             RouteSpecificPool rospl = getRoutePool(route, true);
391 
392             if (reusable) {
393                 rospl.freeEntry(entry);
394                 freeConnections.add(entry);
395                 idleConnHandler.add(entry.getConnection(), validDuration, timeUnit);
396             } else {
397                 rospl.dropEntry();
398                 numConnections--;
399             }
400 
401             notifyWaitingThread(rospl);
402 
403         } finally {
404             poolLock.unlock();
405         }
406 
407     } // freeEntry
408 
409 
410 
411     /**
412      * If available, get a free pool entry for a route.
413      *
414      * @param rospl       the route-specific pool from which to get an entry
415      *
416      * @return  an available pool entry for the given route, or
417      *          <code>null</code> if none is available
418      */
419     protected BasicPoolEntry getFreeEntry(RouteSpecificPool rospl, Object state) {
420 
421         BasicPoolEntry entry = null;
422         poolLock.lock();
423         try {
424             boolean done = false;
425             while(!done) {
426 
427                 entry = rospl.allocEntry(state);
428 
429                 if (entry != null) {
430                     if (log.isDebugEnabled()) {
431                         log.debug("Getting free connection"
432                                 + " [" + rospl.getRoute() + "][" + state + "]");
433 
434                     }
435                     freeConnections.remove(entry);
436                     boolean valid = idleConnHandler.remove(entry.getConnection());
437                     if(!valid) {
438                         // If the free entry isn't valid anymore, get rid of it
439                         // and loop to find another one that might be valid.
440                         if(log.isDebugEnabled())
441                             log.debug("Closing expired free connection"
442                                     + " [" + rospl.getRoute() + "][" + state + "]");
443                         closeConnection(entry.getConnection());
444                         // We use dropEntry instead of deleteEntry because the entry
445                         // is no longer "free" (we just allocated it), and deleteEntry
446                         // can only be used to delete free entries.
447                         rospl.dropEntry();
448                         numConnections--;
449                     } else {
450                         issuedConnections.add(entry.getWeakRef());
451                         done = true;
452                     }
453 
454                 } else {
455                     done = true;
456                     if (log.isDebugEnabled()) {
457                         log.debug("No free connections"
458                                 + " [" + rospl.getRoute() + "][" + state + "]");
459                     }
460                 }
461             }
462         } finally {
463             poolLock.unlock();
464         }
465 
466         return entry;
467     }
468 
469 
470     /**
471      * Creates a new pool entry.
472      * This method assumes that the new connection will be handed
473      * out immediately.
474      *
475      * @param rospl       the route-specific pool for which to create the entry
476      * @param op        the operator for creating a connection
477      *
478      * @return  the new pool entry for a new connection
479      */
480     protected BasicPoolEntry createEntry(RouteSpecificPool rospl,
481                                          ClientConnectionOperator op) {
482 
483         if (log.isDebugEnabled()) {
484             log.debug("Creating new connection [" + rospl.getRoute() + "]");
485         }
486 
487         // the entry will create the connection when needed
488         BasicPoolEntry entry =
489             new BasicPoolEntry(op, rospl.getRoute(), refQueue);
490 
491         poolLock.lock();
492         try {
493 
494             rospl.createdEntry(entry);
495             numConnections++;
496 
497             issuedConnections.add(entry.getWeakRef());
498 
499         } finally {
500             poolLock.unlock();
501         }
502 
503         return entry;
504     }
505 
506 
507     /**
508      * Deletes a given pool entry.
509      * This closes the pooled connection and removes all references,
510      * so that it can be GCed.
511      *
512      * <p><b>Note:</b> Does not remove the entry from the freeConnections list.
513      * It is assumed that the caller has already handled this step.</p>
514      * <!-- @@@ is that a good idea? or rather fix it? -->
515      *
516      * @param entry         the pool entry for the connection to delete
517      */
518     protected void deleteEntry(BasicPoolEntry entry) {
519 
520         HttpRoute route = entry.getPlannedRoute();
521 
522         if (log.isDebugEnabled()) {
523             log.debug("Deleting connection"
524                     + " [" + route + "][" + entry.getState() + "]");
525         }
526 
527         poolLock.lock();
528         try {
529 
530             closeConnection(entry.getConnection());
531 
532             RouteSpecificPool rospl = getRoutePool(route, true);
533             rospl.deleteEntry(entry);
534             numConnections--;
535             if (rospl.isUnused()) {
536                 routeToPool.remove(route);
537             }
538 
539             idleConnHandler.remove(entry.getConnection());// not idle, but dead
540 
541         } finally {
542             poolLock.unlock();
543         }
544     }
545 
546 
547     /**
548      * Delete an old, free pool entry to make room for a new one.
549      * Used to replace pool entries with ones for a different route.
550      */
551     protected void deleteLeastUsedEntry() {
552 
553         try {
554             poolLock.lock();
555 
556             //@@@ with get() instead of remove, we could
557             //@@@ leave the removing to deleteEntry()
558             BasicPoolEntry entry = freeConnections.remove();
559 
560             if (entry != null) {
561                 deleteEntry(entry);
562             } else if (log.isDebugEnabled()) {
563                 log.debug("No free connection to delete.");
564             }
565 
566         } finally {
567             poolLock.unlock();
568         }
569     }
570 
571 
572     // non-javadoc, see base class AbstractConnPool
573     @Override
574     protected void handleLostEntry(HttpRoute route) {
575 
576         poolLock.lock();
577         try {
578 
579             RouteSpecificPool rospl = getRoutePool(route, true);
580             rospl.dropEntry();
581             if (rospl.isUnused()) {
582                 routeToPool.remove(route);
583             }
584 
585             numConnections--;
586             notifyWaitingThread(rospl);
587 
588         } finally {
589             poolLock.unlock();
590         }
591     }
592 
593 
594     /**
595      * Notifies a waiting thread that a connection is available.
596      * This will wake a thread waiting in the specific route pool,
597      * if there is one.
598      * Otherwise, a thread in the connection pool will be notified.
599      *
600      * @param rospl     the pool in which to notify, or <code>null</code>
601      */
602     protected void notifyWaitingThread(RouteSpecificPool rospl) {
603 
604         //@@@ while this strategy provides for best connection re-use,
605         //@@@ is it fair? only do this if the connection is open?
606         // Find the thread we are going to notify. We want to ensure that
607         // each waiting thread is only interrupted once, so we will remove
608         // it from all wait queues before interrupting.
609         WaitingThread waitingThread = null;
610 
611         poolLock.lock();
612         try {
613 
614             if ((rospl != null) && rospl.hasThread()) {
615                 if (log.isDebugEnabled()) {
616                     log.debug("Notifying thread waiting on pool" +
617                             " [" + rospl.getRoute() + "]");
618                 }
619                 waitingThread = rospl.nextThread();
620             } else if (!waitingThreads.isEmpty()) {
621                 if (log.isDebugEnabled()) {
622                     log.debug("Notifying thread waiting on any pool");
623                 }
624                 waitingThread = waitingThreads.remove();
625             } else if (log.isDebugEnabled()) {
626                 log.debug("Notifying no-one, there are no waiting threads");
627             }
628 
629             if (waitingThread != null) {
630                 waitingThread.wakeup();
631             }
632 
633         } finally {
634             poolLock.unlock();
635         }
636     }
637 
638 
639     //@@@ revise this cleanup stuff
640     //@@@ move method to base class when deleteEntry() is fixed
641     // non-javadoc, see base class AbstractConnPool
642     @Override
643     public void deleteClosedConnections() {
644 
645         poolLock.lock();
646         try {
647 
648             Iterator<BasicPoolEntry>  iter = freeConnections.iterator();
649             while (iter.hasNext()) {
650                 BasicPoolEntry entry = iter.next();
651                 if (!entry.getConnection().isOpen()) {
652                     iter.remove();
653                     deleteEntry(entry);
654                 }
655             }
656 
657         } finally {
658             poolLock.unlock();
659         }
660     }
661 
662 
663     // non-javadoc, see base class AbstractConnPool
664     @Override
665     public void shutdown() {
666 
667         poolLock.lock();
668         try {
669 
670             super.shutdown();
671 
672             // close all free connections
673             //@@@ move this to base class?
674             Iterator<BasicPoolEntry> ibpe = freeConnections.iterator();
675             while (ibpe.hasNext()) {
676                 BasicPoolEntry entry = ibpe.next();
677                 ibpe.remove();
678                 closeConnection(entry.getConnection());
679             }
680 
681             // wake up all waiting threads
682             Iterator<WaitingThread> iwth = waitingThreads.iterator();
683             while (iwth.hasNext()) {
684                 WaitingThread waiter = iwth.next();
685                 iwth.remove();
686                 waiter.wakeup();
687             }
688 
689             routeToPool.clear();
690 
691         } finally {
692             poolLock.unlock();
693         }
694     }
695 
696 
697 } // class ConnPoolByRoute
698 
699