• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * $HeadURL: http://svn.apache.org/repos/asf/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java $
3  * $Revision: 677240 $
4  * $Date: 2008-07-16 04:25:47 -0700 (Wed, 16 Jul 2008) $
5  *
6  * ====================================================================
7  *
8  *  Licensed to the Apache Software Foundation (ASF) under one or more
9  *  contributor license agreements.  See the NOTICE file distributed with
10  *  this work for additional information regarding copyright ownership.
11  *  The ASF licenses this file to You under the Apache License, Version 2.0
12  *  (the "License"); you may not use this file except in compliance with
13  *  the License.  You may obtain a copy of the License at
14  *
15  *      http://www.apache.org/licenses/LICENSE-2.0
16  *
17  *  Unless required by applicable law or agreed to in writing, software
18  *  distributed under the License is distributed on an "AS IS" BASIS,
19  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20  *  See the License for the specific language governing permissions and
21  *  limitations under the License.
22  * ====================================================================
23  *
24  * This software consists of voluntary contributions made by many
25  * individuals on behalf of the Apache Software Foundation.  For more
26  * information on the Apache Software Foundation, please see
27  * <http://www.apache.org/>.
28  *
29  */
30 
31 package org.apache.http.impl.conn.tsccm;
32 
33 import java.util.Date;
34 import java.util.HashMap;
35 import java.util.Iterator;
36 import java.util.Queue;
37 import java.util.LinkedList;
38 import java.util.Map;
39 import java.util.concurrent.locks.Condition;
40 import java.util.concurrent.TimeUnit;
41 
42 import org.apache.commons.logging.Log;
43 import org.apache.commons.logging.LogFactory;
44 import org.apache.http.conn.routing.HttpRoute;
45 import org.apache.http.conn.ClientConnectionOperator;
46 import org.apache.http.conn.ConnectionPoolTimeoutException;
47 import org.apache.http.conn.params.ConnPerRoute;
48 import org.apache.http.conn.params.ConnManagerParams;
49 import org.apache.http.params.HttpParams;
50 
51 
52 /**
53  * A connection pool that maintains connections by route.
54  * This class is derived from <code>MultiThreadedHttpConnectionManager</code>
55  * in HttpClient 3.x, see there for original authors. It implements the same
56  * algorithm for connection re-use and connection-per-host enforcement:
57  * <ul>
58  * <li>connections are re-used only for the exact same route</li>
59  * <li>connection limits are enforced per route rather than per host</li>
60  * </ul>
61  * Note that access to the pool datastructures is synchronized via the
62  * {@link AbstractConnPool#poolLock poolLock} in the base class,
63  * not via <code>synchronized</code> methods.
64  *
65  * @author <a href="mailto:rolandw at apache.org">Roland Weber</a>
66  * @author <a href="mailto:becke@u.washington.edu">Michael Becke</a>
67  * @author and others
68  *
69  * @deprecated Please use {@link java.net.URL#openConnection} instead.
70  *     Please visit <a href="http://android-developers.blogspot.com/2011/09/androids-http-clients.html">this webpage</a>
71  *     for further details.
72  */
73 @Deprecated
74 public class ConnPoolByRoute extends AbstractConnPool {
75 
76     private final Log log = LogFactory.getLog(getClass());
77 
78     /** Connection operator for this pool */
79     protected final ClientConnectionOperator operator;
80 
81     /** The list of free connections */
82     protected Queue<BasicPoolEntry> freeConnections;
83 
84     /** The list of WaitingThreads waiting for a connection */
85     protected Queue<WaitingThread> waitingThreads;
86 
87     /**
88      * A map of route-specific pools.
89      * Keys are of class {@link HttpRoute},
90      * values of class {@link RouteSpecificPool}.
91      */
92     protected final Map<HttpRoute, RouteSpecificPool> routeToPool;
93 
94     protected final int maxTotalConnections;
95 
96     private final ConnPerRoute connPerRoute;
97 
98     /**
99      * Creates a new connection pool, managed by route.
100      */
ConnPoolByRoute(final ClientConnectionOperator operator, final HttpParams params)101     public ConnPoolByRoute(final ClientConnectionOperator operator, final HttpParams params) {
102         super();
103         if (operator == null) {
104             throw new IllegalArgumentException("Connection operator may not be null");
105         }
106         this.operator = operator;
107 
108         freeConnections = createFreeConnQueue();
109         waitingThreads  = createWaitingThreadQueue();
110         routeToPool     = createRouteToPoolMap();
111         maxTotalConnections = ConnManagerParams
112             .getMaxTotalConnections(params);
113         connPerRoute = ConnManagerParams
114             .getMaxConnectionsPerRoute(params);
115     }
116 
117 
118     /**
119      * Creates the queue for {@link #freeConnections}.
120      * Called once by the constructor.
121      *
122      * @return  a queue
123      */
createFreeConnQueue()124     protected Queue<BasicPoolEntry> createFreeConnQueue() {
125         return new LinkedList<BasicPoolEntry>();
126     }
127 
128     /**
129      * Creates the queue for {@link #waitingThreads}.
130      * Called once by the constructor.
131      *
132      * @return  a queue
133      */
createWaitingThreadQueue()134     protected Queue<WaitingThread> createWaitingThreadQueue() {
135         return new LinkedList<WaitingThread>();
136     }
137 
138     /**
139      * Creates the map for {@link #routeToPool}.
140      * Called once by the constructor.
141      *
142      * @return  a map
143      */
createRouteToPoolMap()144     protected Map<HttpRoute, RouteSpecificPool> createRouteToPoolMap() {
145         return new HashMap<HttpRoute, RouteSpecificPool>();
146     }
147 
148 
149     /**
150      * Creates a new route-specific pool.
151      * Called by {@link #getRoutePool} when necessary.
152      *
153      * @param route     the route
154      *
155      * @return  the new pool
156      */
newRouteSpecificPool(HttpRoute route)157     protected RouteSpecificPool newRouteSpecificPool(HttpRoute route) {
158         return new RouteSpecificPool(route, connPerRoute.getMaxForRoute(route));
159     }
160 
161 
162     /**
163      * Creates a new waiting thread.
164      * Called by {@link #getRoutePool} when necessary.
165      *
166      * @param cond      the condition to wait for
167      * @param rospl     the route specific pool, or <code>null</code>
168      *
169      * @return  a waiting thread representation
170      */
newWaitingThread(Condition cond, RouteSpecificPool rospl)171     protected WaitingThread newWaitingThread(Condition cond,
172                                              RouteSpecificPool rospl) {
173         return new WaitingThread(cond, rospl);
174     }
175 
176 
177     /**
178      * Get a route-specific pool of available connections.
179      *
180      * @param route   the route
181      * @param create    whether to create the pool if it doesn't exist
182      *
183      * @return  the pool for the argument route,
184      *     never <code>null</code> if <code>create</code> is <code>true</code>
185      */
getRoutePool(HttpRoute route, boolean create)186     protected RouteSpecificPool getRoutePool(HttpRoute route,
187                                              boolean create) {
188         RouteSpecificPool rospl = null;
189         poolLock.lock();
190         try {
191 
192             rospl = routeToPool.get(route);
193             if ((rospl == null) && create) {
194                 // no pool for this route yet (or anymore)
195                 rospl = newRouteSpecificPool(route);
196                 routeToPool.put(route, rospl);
197             }
198 
199         } finally {
200             poolLock.unlock();
201         }
202 
203         return rospl;
204     }
205 
206 
207     //@@@ consider alternatives for gathering statistics
getConnectionsInPool(HttpRoute route)208     public int getConnectionsInPool(HttpRoute route) {
209 
210         poolLock.lock();
211         try {
212             // don't allow a pool to be created here!
213             RouteSpecificPool rospl = getRoutePool(route, false);
214             return (rospl != null) ? rospl.getEntryCount() : 0;
215 
216         } finally {
217             poolLock.unlock();
218         }
219     }
220 
221     @Override
requestPoolEntry( final HttpRoute route, final Object state)222     public PoolEntryRequest requestPoolEntry(
223             final HttpRoute route,
224             final Object state) {
225 
226         final WaitingThreadAborter aborter = new WaitingThreadAborter();
227 
228         return new PoolEntryRequest() {
229 
230             public void abortRequest() {
231                 poolLock.lock();
232                 try {
233                     aborter.abort();
234                 } finally {
235                     poolLock.unlock();
236                 }
237             }
238 
239             public BasicPoolEntry getPoolEntry(
240                     long timeout,
241                     TimeUnit tunit)
242                         throws InterruptedException, ConnectionPoolTimeoutException {
243                 return getEntryBlocking(route, state, timeout, tunit, aborter);
244             }
245 
246         };
247     }
248 
249     /**
250      * Obtains a pool entry with a connection within the given timeout.
251      * If a {@link WaitingThread} is used to block, {@link WaitingThreadAborter#setWaitingThread(WaitingThread)}
252      * must be called before blocking, to allow the thread to be interrupted.
253      *
254      * @param route     the route for which to get the connection
255      * @param timeout   the timeout, 0 or negative for no timeout
256      * @param tunit     the unit for the <code>timeout</code>,
257      *                  may be <code>null</code> only if there is no timeout
258      * @param aborter   an object which can abort a {@link WaitingThread}.
259      *
260      * @return  pool entry holding a connection for the route
261      *
262      * @throws ConnectionPoolTimeoutException
263      *         if the timeout expired
264      * @throws InterruptedException
265      *         if the calling thread was interrupted
266      */
267     protected BasicPoolEntry getEntryBlocking(
268                                    HttpRoute route, Object state,
269                                    long timeout, TimeUnit tunit,
270                                    WaitingThreadAborter aborter)
271         throws ConnectionPoolTimeoutException, InterruptedException {
272 
273         Date deadline = null;
274         if (timeout > 0) {
275             deadline = new Date
276                 (System.currentTimeMillis() + tunit.toMillis(timeout));
277         }
278 
279         BasicPoolEntry entry = null;
280         poolLock.lock();
281         try {
282 
283             RouteSpecificPool rospl = getRoutePool(route, true);
284             WaitingThread waitingThread = null;
285 
286             while (entry == null) {
287 
288                 if (isShutDown) {
289                     throw new IllegalStateException
290                         ("Connection pool shut down.");
291                 }
292 
293                 if (log.isDebugEnabled()) {
294                     log.debug("Total connections kept alive: " + freeConnections.size());
295                     log.debug("Total issued connections: " + issuedConnections.size());
296                     log.debug("Total allocated connection: " + numConnections + " out of " + maxTotalConnections);
297                 }
298 
299                 // the cases to check for:
300                 // - have a free connection for that route
301                 // - allowed to create a free connection for that route
302                 // - can delete and replace a free connection for another route
303                 // - need to wait for one of the things above to come true
304 
305                 entry = getFreeEntry(rospl, state);
306                 if (entry != null) {
307                     break;
308                 }
309 
310                 boolean hasCapacity = rospl.getCapacity() > 0;
311 
312                 if (log.isDebugEnabled()) {
313                     log.debug("Available capacity: " + rospl.getCapacity()
314                             + " out of " + rospl.getMaxEntries()
315                             + " [" + route + "][" + state + "]");
316                 }
317 
318                 if (hasCapacity && numConnections < maxTotalConnections) {
319 
320                     entry = createEntry(rospl, operator);
321 
322                 } else if (hasCapacity && !freeConnections.isEmpty()) {
323 
324                     deleteLeastUsedEntry();
325                     entry = createEntry(rospl, operator);
326 
327                 } else {
328 
329                     if (log.isDebugEnabled()) {
330                         log.debug("Need to wait for connection" +
331                                 " [" + route + "][" + state + "]");
332                     }
333 
334                     if (waitingThread == null) {
335                         waitingThread =
336                             newWaitingThread(poolLock.newCondition(), rospl);
337                         aborter.setWaitingThread(waitingThread);
338                     }
339 
340                     boolean success = false;
341                     try {
342                         rospl.queueThread(waitingThread);
343                         waitingThreads.add(waitingThread);
344                         success = waitingThread.await(deadline);
345 
346                     } finally {
347                         // In case of 'success', we were woken up by the
348                         // connection pool and should now have a connection
349                         // waiting for us, or else we're shutting down.
350                         // Just continue in the loop, both cases are checked.
351                         rospl.removeThread(waitingThread);
352                         waitingThreads.remove(waitingThread);
353                     }
354 
355                     // check for spurious wakeup vs. timeout
356                     if (!success && (deadline != null) &&
357                         (deadline.getTime() <= System.currentTimeMillis())) {
358                         throw new ConnectionPoolTimeoutException
359                             ("Timeout waiting for connection");
360                     }
361                 }
362             } // while no entry
363 
364         } finally {
365             poolLock.unlock();
366         }
367 
368         return entry;
369 
370     } // getEntry
371 
372 
373     // non-javadoc, see base class AbstractConnPool
374     @Override
375     public void freeEntry(BasicPoolEntry entry, boolean reusable, long validDuration, TimeUnit timeUnit) {
376 
377         HttpRoute route = entry.getPlannedRoute();
378         if (log.isDebugEnabled()) {
379             log.debug("Freeing connection" +
380                     " [" + route + "][" + entry.getState() + "]");
381         }
382 
383         poolLock.lock();
384         try {
385             if (isShutDown) {
386                 // the pool is shut down, release the
387                 // connection's resources and get out of here
388                 closeConnection(entry.getConnection());
389                 return;
390             }
391 
392             // no longer issued, we keep a hard reference now
393             issuedConnections.remove(entry.getWeakRef());
394 
395             RouteSpecificPool rospl = getRoutePool(route, true);
396 
397             if (reusable) {
398                 rospl.freeEntry(entry);
399                 freeConnections.add(entry);
400                 idleConnHandler.add(entry.getConnection(), validDuration, timeUnit);
401             } else {
402                 rospl.dropEntry();
403                 numConnections--;
404             }
405 
406             notifyWaitingThread(rospl);
407 
408         } finally {
409             poolLock.unlock();
410         }
411 
412     } // freeEntry
413 
414 
415 
416     /**
417      * If available, get a free pool entry for a route.
418      *
419      * @param rospl       the route-specific pool from which to get an entry
420      *
421      * @return  an available pool entry for the given route, or
422      *          <code>null</code> if none is available
423      */
424     protected BasicPoolEntry getFreeEntry(RouteSpecificPool rospl, Object state) {
425 
426         BasicPoolEntry entry = null;
427         poolLock.lock();
428         try {
429             boolean done = false;
430             while(!done) {
431 
432                 entry = rospl.allocEntry(state);
433 
434                 if (entry != null) {
435                     if (log.isDebugEnabled()) {
436                         log.debug("Getting free connection"
437                                 + " [" + rospl.getRoute() + "][" + state + "]");
438 
439                     }
440                     freeConnections.remove(entry);
441                     boolean valid = idleConnHandler.remove(entry.getConnection());
442                     if(!valid) {
443                         // If the free entry isn't valid anymore, get rid of it
444                         // and loop to find another one that might be valid.
445                         if(log.isDebugEnabled())
446                             log.debug("Closing expired free connection"
447                                     + " [" + rospl.getRoute() + "][" + state + "]");
448                         closeConnection(entry.getConnection());
449                         // We use dropEntry instead of deleteEntry because the entry
450                         // is no longer "free" (we just allocated it), and deleteEntry
451                         // can only be used to delete free entries.
452                         rospl.dropEntry();
453                         numConnections--;
454                     } else {
455                         issuedConnections.add(entry.getWeakRef());
456                         done = true;
457                     }
458 
459                 } else {
460                     done = true;
461                     if (log.isDebugEnabled()) {
462                         log.debug("No free connections"
463                                 + " [" + rospl.getRoute() + "][" + state + "]");
464                     }
465                 }
466             }
467         } finally {
468             poolLock.unlock();
469         }
470 
471         return entry;
472     }
473 
474 
475     /**
476      * Creates a new pool entry.
477      * This method assumes that the new connection will be handed
478      * out immediately.
479      *
480      * @param rospl       the route-specific pool for which to create the entry
481      * @param op        the operator for creating a connection
482      *
483      * @return  the new pool entry for a new connection
484      */
485     protected BasicPoolEntry createEntry(RouteSpecificPool rospl,
486                                          ClientConnectionOperator op) {
487 
488         if (log.isDebugEnabled()) {
489             log.debug("Creating new connection [" + rospl.getRoute() + "]");
490         }
491 
492         // the entry will create the connection when needed
493         BasicPoolEntry entry =
494             new BasicPoolEntry(op, rospl.getRoute(), refQueue);
495 
496         poolLock.lock();
497         try {
498 
499             rospl.createdEntry(entry);
500             numConnections++;
501 
502             issuedConnections.add(entry.getWeakRef());
503 
504         } finally {
505             poolLock.unlock();
506         }
507 
508         return entry;
509     }
510 
511 
512     /**
513      * Deletes a given pool entry.
514      * This closes the pooled connection and removes all references,
515      * so that it can be GCed.
516      *
517      * <p><b>Note:</b> Does not remove the entry from the freeConnections list.
518      * It is assumed that the caller has already handled this step.</p>
519      * <!-- @@@ is that a good idea? or rather fix it? -->
520      *
521      * @param entry         the pool entry for the connection to delete
522      */
523     protected void deleteEntry(BasicPoolEntry entry) {
524 
525         HttpRoute route = entry.getPlannedRoute();
526 
527         if (log.isDebugEnabled()) {
528             log.debug("Deleting connection"
529                     + " [" + route + "][" + entry.getState() + "]");
530         }
531 
532         poolLock.lock();
533         try {
534 
535             closeConnection(entry.getConnection());
536 
537             RouteSpecificPool rospl = getRoutePool(route, true);
538             rospl.deleteEntry(entry);
539             numConnections--;
540             if (rospl.isUnused()) {
541                 routeToPool.remove(route);
542             }
543 
544             idleConnHandler.remove(entry.getConnection());// not idle, but dead
545 
546         } finally {
547             poolLock.unlock();
548         }
549     }
550 
551 
552     /**
553      * Delete an old, free pool entry to make room for a new one.
554      * Used to replace pool entries with ones for a different route.
555      */
556     protected void deleteLeastUsedEntry() {
557 
558         try {
559             poolLock.lock();
560 
561             //@@@ with get() instead of remove, we could
562             //@@@ leave the removing to deleteEntry()
563             BasicPoolEntry entry = freeConnections.remove();
564 
565             if (entry != null) {
566                 deleteEntry(entry);
567             } else if (log.isDebugEnabled()) {
568                 log.debug("No free connection to delete.");
569             }
570 
571         } finally {
572             poolLock.unlock();
573         }
574     }
575 
576 
577     // non-javadoc, see base class AbstractConnPool
578     @Override
579     protected void handleLostEntry(HttpRoute route) {
580 
581         poolLock.lock();
582         try {
583 
584             RouteSpecificPool rospl = getRoutePool(route, true);
585             rospl.dropEntry();
586             if (rospl.isUnused()) {
587                 routeToPool.remove(route);
588             }
589 
590             numConnections--;
591             notifyWaitingThread(rospl);
592 
593         } finally {
594             poolLock.unlock();
595         }
596     }
597 
598 
599     /**
600      * Notifies a waiting thread that a connection is available.
601      * This will wake a thread waiting in the specific route pool,
602      * if there is one.
603      * Otherwise, a thread in the connection pool will be notified.
604      *
605      * @param rospl     the pool in which to notify, or <code>null</code>
606      */
607     protected void notifyWaitingThread(RouteSpecificPool rospl) {
608 
609         //@@@ while this strategy provides for best connection re-use,
610         //@@@ is it fair? only do this if the connection is open?
611         // Find the thread we are going to notify. We want to ensure that
612         // each waiting thread is only interrupted once, so we will remove
613         // it from all wait queues before interrupting.
614         WaitingThread waitingThread = null;
615 
616         poolLock.lock();
617         try {
618 
619             if ((rospl != null) && rospl.hasThread()) {
620                 if (log.isDebugEnabled()) {
621                     log.debug("Notifying thread waiting on pool" +
622                             " [" + rospl.getRoute() + "]");
623                 }
624                 waitingThread = rospl.nextThread();
625             } else if (!waitingThreads.isEmpty()) {
626                 if (log.isDebugEnabled()) {
627                     log.debug("Notifying thread waiting on any pool");
628                 }
629                 waitingThread = waitingThreads.remove();
630             } else if (log.isDebugEnabled()) {
631                 log.debug("Notifying no-one, there are no waiting threads");
632             }
633 
634             if (waitingThread != null) {
635                 waitingThread.wakeup();
636             }
637 
638         } finally {
639             poolLock.unlock();
640         }
641     }
642 
643 
644     //@@@ revise this cleanup stuff
645     //@@@ move method to base class when deleteEntry() is fixed
646     // non-javadoc, see base class AbstractConnPool
647     @Override
648     public void deleteClosedConnections() {
649 
650         poolLock.lock();
651         try {
652 
653             Iterator<BasicPoolEntry>  iter = freeConnections.iterator();
654             while (iter.hasNext()) {
655                 BasicPoolEntry entry = iter.next();
656                 if (!entry.getConnection().isOpen()) {
657                     iter.remove();
658                     deleteEntry(entry);
659                 }
660             }
661 
662         } finally {
663             poolLock.unlock();
664         }
665     }
666 
667 
668     // non-javadoc, see base class AbstractConnPool
669     @Override
670     public void shutdown() {
671 
672         poolLock.lock();
673         try {
674 
675             super.shutdown();
676 
677             // close all free connections
678             //@@@ move this to base class?
679             Iterator<BasicPoolEntry> ibpe = freeConnections.iterator();
680             while (ibpe.hasNext()) {
681                 BasicPoolEntry entry = ibpe.next();
682                 ibpe.remove();
683                 closeConnection(entry.getConnection());
684             }
685 
686             // wake up all waiting threads
687             Iterator<WaitingThread> iwth = waitingThreads.iterator();
688             while (iwth.hasNext()) {
689                 WaitingThread waiter = iwth.next();
690                 iwth.remove();
691                 waiter.wakeup();
692             }
693 
694             routeToPool.clear();
695 
696         } finally {
697             poolLock.unlock();
698         }
699     }
700 
701 
702 } // class ConnPoolByRoute
703 
704