1 /* 2 * $HeadURL: http://svn.apache.org/repos/asf/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java $ 3 * $Revision: 677240 $ 4 * $Date: 2008-07-16 04:25:47 -0700 (Wed, 16 Jul 2008) $ 5 * 6 * ==================================================================== 7 * 8 * Licensed to the Apache Software Foundation (ASF) under one or more 9 * contributor license agreements. See the NOTICE file distributed with 10 * this work for additional information regarding copyright ownership. 11 * The ASF licenses this file to You under the Apache License, Version 2.0 12 * (the "License"); you may not use this file except in compliance with 13 * the License. You may obtain a copy of the License at 14 * 15 * http://www.apache.org/licenses/LICENSE-2.0 16 * 17 * Unless required by applicable law or agreed to in writing, software 18 * distributed under the License is distributed on an "AS IS" BASIS, 19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 20 * See the License for the specific language governing permissions and 21 * limitations under the License. 22 * ==================================================================== 23 * 24 * This software consists of voluntary contributions made by many 25 * individuals on behalf of the Apache Software Foundation. For more 26 * information on the Apache Software Foundation, please see 27 * <http://www.apache.org/>. 28 * 29 */ 30 31 package org.apache.http.impl.conn.tsccm; 32 33 import java.util.Date; 34 import java.util.HashMap; 35 import java.util.Iterator; 36 import java.util.Queue; 37 import java.util.LinkedList; 38 import java.util.Map; 39 import java.util.concurrent.locks.Condition; 40 import java.util.concurrent.TimeUnit; 41 42 import org.apache.commons.logging.Log; 43 import org.apache.commons.logging.LogFactory; 44 import org.apache.http.conn.routing.HttpRoute; 45 import org.apache.http.conn.ClientConnectionOperator; 46 import org.apache.http.conn.ConnectionPoolTimeoutException; 47 import org.apache.http.conn.params.ConnPerRoute; 48 import org.apache.http.conn.params.ConnManagerParams; 49 import org.apache.http.params.HttpParams; 50 51 52 /** 53 * A connection pool that maintains connections by route. 54 * This class is derived from <code>MultiThreadedHttpConnectionManager</code> 55 * in HttpClient 3.x, see there for original authors. It implements the same 56 * algorithm for connection re-use and connection-per-host enforcement: 57 * <ul> 58 * <li>connections are re-used only for the exact same route</li> 59 * <li>connection limits are enforced per route rather than per host</li> 60 * </ul> 61 * Note that access to the pool datastructures is synchronized via the 62 * {@link AbstractConnPool#poolLock poolLock} in the base class, 63 * not via <code>synchronized</code> methods. 64 * 65 * @author <a href="mailto:rolandw at apache.org">Roland Weber</a> 66 * @author <a href="mailto:becke@u.washington.edu">Michael Becke</a> 67 * @author and others 68 * 69 * @deprecated Please use {@link java.net.URL#openConnection} instead. 70 * Please visit <a href="http://android-developers.blogspot.com/2011/09/androids-http-clients.html">this webpage</a> 71 * for further details. 72 */ 73 @Deprecated 74 public class ConnPoolByRoute extends AbstractConnPool { 75 76 private final Log log = LogFactory.getLog(getClass()); 77 78 /** Connection operator for this pool */ 79 protected final ClientConnectionOperator operator; 80 81 /** The list of free connections */ 82 protected Queue<BasicPoolEntry> freeConnections; 83 84 /** The list of WaitingThreads waiting for a connection */ 85 protected Queue<WaitingThread> waitingThreads; 86 87 /** 88 * A map of route-specific pools. 89 * Keys are of class {@link HttpRoute}, 90 * values of class {@link RouteSpecificPool}. 91 */ 92 protected final Map<HttpRoute, RouteSpecificPool> routeToPool; 93 94 protected final int maxTotalConnections; 95 96 private final ConnPerRoute connPerRoute; 97 98 /** 99 * Creates a new connection pool, managed by route. 100 */ ConnPoolByRoute(final ClientConnectionOperator operator, final HttpParams params)101 public ConnPoolByRoute(final ClientConnectionOperator operator, final HttpParams params) { 102 super(); 103 if (operator == null) { 104 throw new IllegalArgumentException("Connection operator may not be null"); 105 } 106 this.operator = operator; 107 108 freeConnections = createFreeConnQueue(); 109 waitingThreads = createWaitingThreadQueue(); 110 routeToPool = createRouteToPoolMap(); 111 maxTotalConnections = ConnManagerParams 112 .getMaxTotalConnections(params); 113 connPerRoute = ConnManagerParams 114 .getMaxConnectionsPerRoute(params); 115 } 116 117 118 /** 119 * Creates the queue for {@link #freeConnections}. 120 * Called once by the constructor. 121 * 122 * @return a queue 123 */ createFreeConnQueue()124 protected Queue<BasicPoolEntry> createFreeConnQueue() { 125 return new LinkedList<BasicPoolEntry>(); 126 } 127 128 /** 129 * Creates the queue for {@link #waitingThreads}. 130 * Called once by the constructor. 131 * 132 * @return a queue 133 */ createWaitingThreadQueue()134 protected Queue<WaitingThread> createWaitingThreadQueue() { 135 return new LinkedList<WaitingThread>(); 136 } 137 138 /** 139 * Creates the map for {@link #routeToPool}. 140 * Called once by the constructor. 141 * 142 * @return a map 143 */ createRouteToPoolMap()144 protected Map<HttpRoute, RouteSpecificPool> createRouteToPoolMap() { 145 return new HashMap<HttpRoute, RouteSpecificPool>(); 146 } 147 148 149 /** 150 * Creates a new route-specific pool. 151 * Called by {@link #getRoutePool} when necessary. 152 * 153 * @param route the route 154 * 155 * @return the new pool 156 */ newRouteSpecificPool(HttpRoute route)157 protected RouteSpecificPool newRouteSpecificPool(HttpRoute route) { 158 return new RouteSpecificPool(route, connPerRoute.getMaxForRoute(route)); 159 } 160 161 162 /** 163 * Creates a new waiting thread. 164 * Called by {@link #getRoutePool} when necessary. 165 * 166 * @param cond the condition to wait for 167 * @param rospl the route specific pool, or <code>null</code> 168 * 169 * @return a waiting thread representation 170 */ newWaitingThread(Condition cond, RouteSpecificPool rospl)171 protected WaitingThread newWaitingThread(Condition cond, 172 RouteSpecificPool rospl) { 173 return new WaitingThread(cond, rospl); 174 } 175 176 177 /** 178 * Get a route-specific pool of available connections. 179 * 180 * @param route the route 181 * @param create whether to create the pool if it doesn't exist 182 * 183 * @return the pool for the argument route, 184 * never <code>null</code> if <code>create</code> is <code>true</code> 185 */ getRoutePool(HttpRoute route, boolean create)186 protected RouteSpecificPool getRoutePool(HttpRoute route, 187 boolean create) { 188 RouteSpecificPool rospl = null; 189 poolLock.lock(); 190 try { 191 192 rospl = routeToPool.get(route); 193 if ((rospl == null) && create) { 194 // no pool for this route yet (or anymore) 195 rospl = newRouteSpecificPool(route); 196 routeToPool.put(route, rospl); 197 } 198 199 } finally { 200 poolLock.unlock(); 201 } 202 203 return rospl; 204 } 205 206 207 //@@@ consider alternatives for gathering statistics getConnectionsInPool(HttpRoute route)208 public int getConnectionsInPool(HttpRoute route) { 209 210 poolLock.lock(); 211 try { 212 // don't allow a pool to be created here! 213 RouteSpecificPool rospl = getRoutePool(route, false); 214 return (rospl != null) ? rospl.getEntryCount() : 0; 215 216 } finally { 217 poolLock.unlock(); 218 } 219 } 220 221 @Override requestPoolEntry( final HttpRoute route, final Object state)222 public PoolEntryRequest requestPoolEntry( 223 final HttpRoute route, 224 final Object state) { 225 226 final WaitingThreadAborter aborter = new WaitingThreadAborter(); 227 228 return new PoolEntryRequest() { 229 230 public void abortRequest() { 231 poolLock.lock(); 232 try { 233 aborter.abort(); 234 } finally { 235 poolLock.unlock(); 236 } 237 } 238 239 public BasicPoolEntry getPoolEntry( 240 long timeout, 241 TimeUnit tunit) 242 throws InterruptedException, ConnectionPoolTimeoutException { 243 return getEntryBlocking(route, state, timeout, tunit, aborter); 244 } 245 246 }; 247 } 248 249 /** 250 * Obtains a pool entry with a connection within the given timeout. 251 * If a {@link WaitingThread} is used to block, {@link WaitingThreadAborter#setWaitingThread(WaitingThread)} 252 * must be called before blocking, to allow the thread to be interrupted. 253 * 254 * @param route the route for which to get the connection 255 * @param timeout the timeout, 0 or negative for no timeout 256 * @param tunit the unit for the <code>timeout</code>, 257 * may be <code>null</code> only if there is no timeout 258 * @param aborter an object which can abort a {@link WaitingThread}. 259 * 260 * @return pool entry holding a connection for the route 261 * 262 * @throws ConnectionPoolTimeoutException 263 * if the timeout expired 264 * @throws InterruptedException 265 * if the calling thread was interrupted 266 */ 267 protected BasicPoolEntry getEntryBlocking( 268 HttpRoute route, Object state, 269 long timeout, TimeUnit tunit, 270 WaitingThreadAborter aborter) 271 throws ConnectionPoolTimeoutException, InterruptedException { 272 273 Date deadline = null; 274 if (timeout > 0) { 275 deadline = new Date 276 (System.currentTimeMillis() + tunit.toMillis(timeout)); 277 } 278 279 BasicPoolEntry entry = null; 280 poolLock.lock(); 281 try { 282 283 RouteSpecificPool rospl = getRoutePool(route, true); 284 WaitingThread waitingThread = null; 285 286 while (entry == null) { 287 288 if (isShutDown) { 289 throw new IllegalStateException 290 ("Connection pool shut down."); 291 } 292 293 if (log.isDebugEnabled()) { 294 log.debug("Total connections kept alive: " + freeConnections.size()); 295 log.debug("Total issued connections: " + issuedConnections.size()); 296 log.debug("Total allocated connection: " + numConnections + " out of " + maxTotalConnections); 297 } 298 299 // the cases to check for: 300 // - have a free connection for that route 301 // - allowed to create a free connection for that route 302 // - can delete and replace a free connection for another route 303 // - need to wait for one of the things above to come true 304 305 entry = getFreeEntry(rospl, state); 306 if (entry != null) { 307 break; 308 } 309 310 boolean hasCapacity = rospl.getCapacity() > 0; 311 312 if (log.isDebugEnabled()) { 313 log.debug("Available capacity: " + rospl.getCapacity() 314 + " out of " + rospl.getMaxEntries() 315 + " [" + route + "][" + state + "]"); 316 } 317 318 if (hasCapacity && numConnections < maxTotalConnections) { 319 320 entry = createEntry(rospl, operator); 321 322 } else if (hasCapacity && !freeConnections.isEmpty()) { 323 324 deleteLeastUsedEntry(); 325 entry = createEntry(rospl, operator); 326 327 } else { 328 329 if (log.isDebugEnabled()) { 330 log.debug("Need to wait for connection" + 331 " [" + route + "][" + state + "]"); 332 } 333 334 if (waitingThread == null) { 335 waitingThread = 336 newWaitingThread(poolLock.newCondition(), rospl); 337 aborter.setWaitingThread(waitingThread); 338 } 339 340 boolean success = false; 341 try { 342 rospl.queueThread(waitingThread); 343 waitingThreads.add(waitingThread); 344 success = waitingThread.await(deadline); 345 346 } finally { 347 // In case of 'success', we were woken up by the 348 // connection pool and should now have a connection 349 // waiting for us, or else we're shutting down. 350 // Just continue in the loop, both cases are checked. 351 rospl.removeThread(waitingThread); 352 waitingThreads.remove(waitingThread); 353 } 354 355 // check for spurious wakeup vs. timeout 356 if (!success && (deadline != null) && 357 (deadline.getTime() <= System.currentTimeMillis())) { 358 throw new ConnectionPoolTimeoutException 359 ("Timeout waiting for connection"); 360 } 361 } 362 } // while no entry 363 364 } finally { 365 poolLock.unlock(); 366 } 367 368 return entry; 369 370 } // getEntry 371 372 373 // non-javadoc, see base class AbstractConnPool 374 @Override 375 public void freeEntry(BasicPoolEntry entry, boolean reusable, long validDuration, TimeUnit timeUnit) { 376 377 HttpRoute route = entry.getPlannedRoute(); 378 if (log.isDebugEnabled()) { 379 log.debug("Freeing connection" + 380 " [" + route + "][" + entry.getState() + "]"); 381 } 382 383 poolLock.lock(); 384 try { 385 if (isShutDown) { 386 // the pool is shut down, release the 387 // connection's resources and get out of here 388 closeConnection(entry.getConnection()); 389 return; 390 } 391 392 // no longer issued, we keep a hard reference now 393 issuedConnections.remove(entry.getWeakRef()); 394 395 RouteSpecificPool rospl = getRoutePool(route, true); 396 397 if (reusable) { 398 rospl.freeEntry(entry); 399 freeConnections.add(entry); 400 idleConnHandler.add(entry.getConnection(), validDuration, timeUnit); 401 } else { 402 rospl.dropEntry(); 403 numConnections--; 404 } 405 406 notifyWaitingThread(rospl); 407 408 } finally { 409 poolLock.unlock(); 410 } 411 412 } // freeEntry 413 414 415 416 /** 417 * If available, get a free pool entry for a route. 418 * 419 * @param rospl the route-specific pool from which to get an entry 420 * 421 * @return an available pool entry for the given route, or 422 * <code>null</code> if none is available 423 */ 424 protected BasicPoolEntry getFreeEntry(RouteSpecificPool rospl, Object state) { 425 426 BasicPoolEntry entry = null; 427 poolLock.lock(); 428 try { 429 boolean done = false; 430 while(!done) { 431 432 entry = rospl.allocEntry(state); 433 434 if (entry != null) { 435 if (log.isDebugEnabled()) { 436 log.debug("Getting free connection" 437 + " [" + rospl.getRoute() + "][" + state + "]"); 438 439 } 440 freeConnections.remove(entry); 441 boolean valid = idleConnHandler.remove(entry.getConnection()); 442 if(!valid) { 443 // If the free entry isn't valid anymore, get rid of it 444 // and loop to find another one that might be valid. 445 if(log.isDebugEnabled()) 446 log.debug("Closing expired free connection" 447 + " [" + rospl.getRoute() + "][" + state + "]"); 448 closeConnection(entry.getConnection()); 449 // We use dropEntry instead of deleteEntry because the entry 450 // is no longer "free" (we just allocated it), and deleteEntry 451 // can only be used to delete free entries. 452 rospl.dropEntry(); 453 numConnections--; 454 } else { 455 issuedConnections.add(entry.getWeakRef()); 456 done = true; 457 } 458 459 } else { 460 done = true; 461 if (log.isDebugEnabled()) { 462 log.debug("No free connections" 463 + " [" + rospl.getRoute() + "][" + state + "]"); 464 } 465 } 466 } 467 } finally { 468 poolLock.unlock(); 469 } 470 471 return entry; 472 } 473 474 475 /** 476 * Creates a new pool entry. 477 * This method assumes that the new connection will be handed 478 * out immediately. 479 * 480 * @param rospl the route-specific pool for which to create the entry 481 * @param op the operator for creating a connection 482 * 483 * @return the new pool entry for a new connection 484 */ 485 protected BasicPoolEntry createEntry(RouteSpecificPool rospl, 486 ClientConnectionOperator op) { 487 488 if (log.isDebugEnabled()) { 489 log.debug("Creating new connection [" + rospl.getRoute() + "]"); 490 } 491 492 // the entry will create the connection when needed 493 BasicPoolEntry entry = 494 new BasicPoolEntry(op, rospl.getRoute(), refQueue); 495 496 poolLock.lock(); 497 try { 498 499 rospl.createdEntry(entry); 500 numConnections++; 501 502 issuedConnections.add(entry.getWeakRef()); 503 504 } finally { 505 poolLock.unlock(); 506 } 507 508 return entry; 509 } 510 511 512 /** 513 * Deletes a given pool entry. 514 * This closes the pooled connection and removes all references, 515 * so that it can be GCed. 516 * 517 * <p><b>Note:</b> Does not remove the entry from the freeConnections list. 518 * It is assumed that the caller has already handled this step.</p> 519 * <!-- @@@ is that a good idea? or rather fix it? --> 520 * 521 * @param entry the pool entry for the connection to delete 522 */ 523 protected void deleteEntry(BasicPoolEntry entry) { 524 525 HttpRoute route = entry.getPlannedRoute(); 526 527 if (log.isDebugEnabled()) { 528 log.debug("Deleting connection" 529 + " [" + route + "][" + entry.getState() + "]"); 530 } 531 532 poolLock.lock(); 533 try { 534 535 closeConnection(entry.getConnection()); 536 537 RouteSpecificPool rospl = getRoutePool(route, true); 538 rospl.deleteEntry(entry); 539 numConnections--; 540 if (rospl.isUnused()) { 541 routeToPool.remove(route); 542 } 543 544 idleConnHandler.remove(entry.getConnection());// not idle, but dead 545 546 } finally { 547 poolLock.unlock(); 548 } 549 } 550 551 552 /** 553 * Delete an old, free pool entry to make room for a new one. 554 * Used to replace pool entries with ones for a different route. 555 */ 556 protected void deleteLeastUsedEntry() { 557 558 try { 559 poolLock.lock(); 560 561 //@@@ with get() instead of remove, we could 562 //@@@ leave the removing to deleteEntry() 563 BasicPoolEntry entry = freeConnections.remove(); 564 565 if (entry != null) { 566 deleteEntry(entry); 567 } else if (log.isDebugEnabled()) { 568 log.debug("No free connection to delete."); 569 } 570 571 } finally { 572 poolLock.unlock(); 573 } 574 } 575 576 577 // non-javadoc, see base class AbstractConnPool 578 @Override 579 protected void handleLostEntry(HttpRoute route) { 580 581 poolLock.lock(); 582 try { 583 584 RouteSpecificPool rospl = getRoutePool(route, true); 585 rospl.dropEntry(); 586 if (rospl.isUnused()) { 587 routeToPool.remove(route); 588 } 589 590 numConnections--; 591 notifyWaitingThread(rospl); 592 593 } finally { 594 poolLock.unlock(); 595 } 596 } 597 598 599 /** 600 * Notifies a waiting thread that a connection is available. 601 * This will wake a thread waiting in the specific route pool, 602 * if there is one. 603 * Otherwise, a thread in the connection pool will be notified. 604 * 605 * @param rospl the pool in which to notify, or <code>null</code> 606 */ 607 protected void notifyWaitingThread(RouteSpecificPool rospl) { 608 609 //@@@ while this strategy provides for best connection re-use, 610 //@@@ is it fair? only do this if the connection is open? 611 // Find the thread we are going to notify. We want to ensure that 612 // each waiting thread is only interrupted once, so we will remove 613 // it from all wait queues before interrupting. 614 WaitingThread waitingThread = null; 615 616 poolLock.lock(); 617 try { 618 619 if ((rospl != null) && rospl.hasThread()) { 620 if (log.isDebugEnabled()) { 621 log.debug("Notifying thread waiting on pool" + 622 " [" + rospl.getRoute() + "]"); 623 } 624 waitingThread = rospl.nextThread(); 625 } else if (!waitingThreads.isEmpty()) { 626 if (log.isDebugEnabled()) { 627 log.debug("Notifying thread waiting on any pool"); 628 } 629 waitingThread = waitingThreads.remove(); 630 } else if (log.isDebugEnabled()) { 631 log.debug("Notifying no-one, there are no waiting threads"); 632 } 633 634 if (waitingThread != null) { 635 waitingThread.wakeup(); 636 } 637 638 } finally { 639 poolLock.unlock(); 640 } 641 } 642 643 644 //@@@ revise this cleanup stuff 645 //@@@ move method to base class when deleteEntry() is fixed 646 // non-javadoc, see base class AbstractConnPool 647 @Override 648 public void deleteClosedConnections() { 649 650 poolLock.lock(); 651 try { 652 653 Iterator<BasicPoolEntry> iter = freeConnections.iterator(); 654 while (iter.hasNext()) { 655 BasicPoolEntry entry = iter.next(); 656 if (!entry.getConnection().isOpen()) { 657 iter.remove(); 658 deleteEntry(entry); 659 } 660 } 661 662 } finally { 663 poolLock.unlock(); 664 } 665 } 666 667 668 // non-javadoc, see base class AbstractConnPool 669 @Override 670 public void shutdown() { 671 672 poolLock.lock(); 673 try { 674 675 super.shutdown(); 676 677 // close all free connections 678 //@@@ move this to base class? 679 Iterator<BasicPoolEntry> ibpe = freeConnections.iterator(); 680 while (ibpe.hasNext()) { 681 BasicPoolEntry entry = ibpe.next(); 682 ibpe.remove(); 683 closeConnection(entry.getConnection()); 684 } 685 686 // wake up all waiting threads 687 Iterator<WaitingThread> iwth = waitingThreads.iterator(); 688 while (iwth.hasNext()) { 689 WaitingThread waiter = iwth.next(); 690 iwth.remove(); 691 waiter.wakeup(); 692 } 693 694 routeToPool.clear(); 695 696 } finally { 697 poolLock.unlock(); 698 } 699 } 700 701 702 } // class ConnPoolByRoute 703 704