/*
 * $HeadURL: http://svn.apache.org/repos/asf/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java $
 * $Revision: 677240 $
 * $Date: 2008-07-16 04:25:47 -0700 (Wed, 16 Jul 2008) $
 *
 * ====================================================================
 *
 *  Licensed to the Apache Software Foundation (ASF) under one or more
 *  contributor license agreements.  See the NOTICE file distributed with
 *  this work for additional information regarding copyright ownership.
 *  The ASF licenses this file to You under the Apache License, Version 2.0
 *  (the "License"); you may not use this file except in compliance with
 *  the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 * ====================================================================
 *
 * This software consists of voluntary contributions made by many
 * individuals on behalf of the Apache Software Foundation.  For more
 * information on the Apache Software Foundation, please see
 * <http://www.apache.org/>.
 *
 */

package org.apache.http.impl.conn.tsccm;

import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Queue;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.conn.ClientConnectionOperator;
import org.apache.http.conn.ConnectionPoolTimeoutException;
import org.apache.http.conn.params.ConnPerRoute;
import org.apache.http.conn.params.ConnManagerParams;
import org.apache.http.params.HttpParams;


/**
 * A connection pool that maintains connections by route.
 * This class is derived from <code>MultiThreadedHttpConnectionManager</code>
 * in HttpClient 3.x, see there for original authors. It implements the same
 * algorithm for connection re-use and connection-per-host enforcement:
 * <ul>
 * <li>connections are re-used only for the exact same route</li>
 * <li>connection limits are enforced per route rather than per host</li>
 * </ul>
 * Note that access to the pool datastructures is synchronized via the
 * {@link AbstractConnPool#poolLock poolLock} in the base class,
 * not via <code>synchronized</code> methods.
 * Methods of this class acquire that lock themselves and call each other
 * while holding it, so the lock has to permit nested acquisition by the
 * owning thread.
 *
 * @author <a href="mailto:rolandw at apache.org">Roland Weber</a>
 * @author <a href="mailto:becke@u.washington.edu">Michael Becke</a>
 * @author and others
 */
public class ConnPoolByRoute extends AbstractConnPool {

    private final Log log = LogFactory.getLog(getClass());

    /** Connection operator for this pool */
    protected final ClientConnectionOperator operator;

    /** The list of free connections */
    protected Queue<BasicPoolEntry> freeConnections;

    /** The list of WaitingThreads waiting for a connection */
    protected Queue<WaitingThread> waitingThreads;

    /**
     * A map of route-specific pools.
     * Keys are of class {@link HttpRoute},
     * values of class {@link RouteSpecificPool}.
     */
    protected final Map<HttpRoute, RouteSpecificPool> routeToPool;

    /**
     * Upper bound for the total number of connections, read once from the
     * parameters given to the constructor. New connections are only
     * created while <code>numConnections</code> is below this value.
     */
    protected final int maxTotalConnections;

    /**
     * Source of the per-route connection limit, read once from the
     * parameters given to the constructor and consulted whenever a new
     * route-specific pool is created.
     */
    private final ConnPerRoute connPerRoute;

    /**
     * Creates a new connection pool, managed by route.
     *
     * @param operator  the connection operator used for new pool entries,
     *                  must not be <code>null</code>
     * @param params    the parameters from which the total connection limit
     *                  and the per-route limits are obtained
     *
     * @throws IllegalArgumentException
     *         if <code>operator</code> is <code>null</code>
     */
    public ConnPoolByRoute(final ClientConnectionOperator operator, final HttpParams params) {
        super();
        if (operator == null) {
            throw new IllegalArgumentException("Connection operator may not be null");
        }
        this.operator = operator;

        freeConnections = createFreeConnQueue();
        waitingThreads  = createWaitingThreadQueue();
        routeToPool     = createRouteToPoolMap();
        maxTotalConnections = ConnManagerParams
            .getMaxTotalConnections(params);
        connPerRoute = ConnManagerParams
            .getMaxConnectionsPerRoute(params);
    }


    /**
     * Creates the queue for {@link #freeConnections}.
     * Called once by the constructor.
     *
     * @return  a queue
     */
    protected Queue<BasicPoolEntry> createFreeConnQueue() {
        return new LinkedList<BasicPoolEntry>();
    }

    /**
     * Creates the queue for {@link #waitingThreads}.
     * Called once by the constructor.
     *
     * @return  a queue
     */
    protected Queue<WaitingThread> createWaitingThreadQueue() {
        return new LinkedList<WaitingThread>();
    }

    /**
     * Creates the map for {@link #routeToPool}.
     * Called once by the constructor.
     *
     * @return  a map
     */
    protected Map<HttpRoute, RouteSpecificPool> createRouteToPoolMap() {
        return new HashMap<HttpRoute, RouteSpecificPool>();
    }


    /**
     * Creates a new route-specific pool.
     * Called by {@link #getRoutePool} when necessary.
     * The pool is sized with the per-route limit for the given route.
     *
     * @param route     the route
     *
     * @return  the new pool
     */
    protected RouteSpecificPool newRouteSpecificPool(HttpRoute route) {
        return new RouteSpecificPool(route, connPerRoute.getMaxForRoute(route));
    }


    /**
     * Creates a new waiting thread.
     * Called by {@link #getEntryBlocking} when the calling thread
     * has to block until a connection becomes available.
     *
     * @param cond      the condition to wait for
     * @param rospl     the route specific pool, or <code>null</code>
     *
     * @return  a waiting thread representation
     */
    protected WaitingThread newWaitingThread(Condition cond,
                                             RouteSpecificPool rospl) {
        return new WaitingThread(cond, rospl);
    }


    /**
     * Get a route-specific pool of available connections.
     *
     * @param route   the route
     * @param create  whether to create the pool if it doesn't exist
     *
     * @return  the pool for the argument route,
     *     never <code>null</code> if <code>create</code> is <code>true</code>
     */
    protected RouteSpecificPool getRoutePool(HttpRoute route,
                                             boolean create) {
        RouteSpecificPool rospl = null;
        poolLock.lock();
        try {

            rospl = routeToPool.get(route);
            if ((rospl == null) && create) {
                // no pool for this route yet (or anymore)
                rospl = newRouteSpecificPool(route);
                routeToPool.put(route, rospl);
            }

        } finally {
            poolLock.unlock();
        }

        return rospl;
    }


    //@@@ consider alternatives for gathering statistics
    /**
     * Obtains the number of entries currently held for a route.
     * A route without a pool is reported as 0; no pool is created
     * as a side effect of this call.
     *
     * @param route     the route to look up
     *
     * @return  the entry count of the route-specific pool,
     *          or 0 if there is no pool for the route
     */
    public int getConnectionsInPool(HttpRoute route) {

        poolLock.lock();
        try {
            // don't allow a pool to be created here!
            RouteSpecificPool rospl = getRoutePool(route, false);
            return (rospl != null) ? rospl.getEntryCount() : 0;

        } finally {
            poolLock.unlock();
        }
    }

    /**
     * Returns a request object from which a pool entry can be obtained.
     * No pool state is touched until
     * {@link PoolEntryRequest#getPoolEntry getPoolEntry} is called on the
     * result. The returned request shares one {@link WaitingThreadAborter}
     * between its <code>getPoolEntry</code> and <code>abortRequest</code>
     * methods, so an abort can reach the thread blocked in
     * {@link #getEntryBlocking}.
     *
     * @param route     the route for which to get the connection
     * @param state     the state object handed through to
     *                  {@link #getEntryBlocking}
     *
     * @return  the request for a pool entry
     */
    @Override
    public PoolEntryRequest requestPoolEntry(
            final HttpRoute route,
            final Object state) {

        final WaitingThreadAborter aborter = new WaitingThreadAborter();

        return new PoolEntryRequest() {

            public void abortRequest() {
                // the aborter is only touched while holding the pool lock
                poolLock.lock();
                try {
                    aborter.abort();
                } finally {
                    poolLock.unlock();
                }
            }

            public BasicPoolEntry getPoolEntry(
                    long timeout,
                    TimeUnit tunit)
                        throws InterruptedException, ConnectionPoolTimeoutException {
                return getEntryBlocking(route, state, timeout, tunit, aborter);
            }

        };
    }

    /**
     * Obtains a pool entry with a connection within the given timeout.
     * If a {@link WaitingThread} is used to block, {@link WaitingThreadAborter#setWaitingThread(WaitingThread)}
     * must be called before blocking, to allow the thread to be interrupted.
     *
     * @param route     the route for which to get the connection
     * @param state     the state object passed on to {@link #getFreeEntry}
     *                  when looking for a free entry of the route
     * @param timeout   the timeout, 0 or negative for no timeout
     * @param tunit     the unit for the <code>timeout</code>,
     *                  may be <code>null</code> only if there is no timeout
     * @param aborter   an object which can abort a {@link WaitingThread}.
     *
     * @return  pool entry holding a connection for the route
     *
     * @throws ConnectionPoolTimeoutException
     *         if the timeout expired
     * @throws InterruptedException
     *         if the calling thread was interrupted
     * @throws IllegalStateException
     *         if the pool has been shut down
     */
    protected BasicPoolEntry getEntryBlocking(
                                   HttpRoute route, Object state,
                                   long timeout, TimeUnit tunit,
                                   WaitingThreadAborter aborter)
        throws ConnectionPoolTimeoutException, InterruptedException {

        Date deadline = null;
        if (timeout > 0) {
            deadline = new Date
                (System.currentTimeMillis() + tunit.toMillis(timeout));
        }

        BasicPoolEntry entry = null;
        poolLock.lock();
        try {

            RouteSpecificPool rospl = getRoutePool(route, true);
            WaitingThread waitingThread = null;

            while (entry == null) {

                if (isShutDown) {
                    throw new IllegalStateException
                        ("Connection pool shut down.");
                }

                if (log.isDebugEnabled()) {
                    log.debug("Total connections kept alive: " + freeConnections.size());
                    log.debug("Total issued connections: " + issuedConnections.size());
                    log.debug("Total allocated connection: " + numConnections + " out of " + maxTotalConnections);
                }

                // the cases to check for:
                // - have a free connection for that route
                // - allowed to create a free connection for that route
                // - can delete and replace a free connection for another route
                // - need to wait for one of the things above to come true

                entry = getFreeEntry(rospl, state);
                if (entry != null) {
                    break;
                }

                boolean hasCapacity = rospl.getCapacity() > 0;

                if (log.isDebugEnabled()) {
                    log.debug("Available capacity: " + rospl.getCapacity()
                            + " out of " + rospl.getMaxEntries()
                            + " [" + route + "][" + state + "]");
                }

                if (hasCapacity && numConnections < maxTotalConnections) {

                    entry = createEntry(rospl, operator);

                } else if (hasCapacity && !freeConnections.isEmpty()) {

                    deleteLeastUsedEntry();
                    // deleteEntry() drops a route pool from routeToPool once
                    // it becomes unused. Should that have hit the pool for
                    // this route, the local reference would be detached from
                    // the map, so look the pool up again before accounting
                    // the new entry against it.
                    rospl = getRoutePool(route, true);
                    entry = createEntry(rospl, operator);

                } else {

                    if (log.isDebugEnabled()) {
                        log.debug("Need to wait for connection" +
                                " [" + route + "][" + state + "]");
                    }

                    if (waitingThread == null) {
                        waitingThread =
                            newWaitingThread(poolLock.newCondition(), rospl);
                        aborter.setWaitingThread(waitingThread);
                    }

                    boolean success = false;
                    try {
                        rospl.queueThread(waitingThread);
                        waitingThreads.add(waitingThread);
                        success = waitingThread.await(deadline);

                    } finally {
                        // In case of 'success', we were woken up by the
                        // connection pool and should now have a connection
                        // waiting for us, or else we're shutting down.
                        // Just continue in the loop, both cases are checked.
                        rospl.removeThread(waitingThread);
                        waitingThreads.remove(waitingThread);
                    }

                    // check for spurious wakeup vs. timeout
                    if (!success && (deadline != null) &&
                        (deadline.getTime() <= System.currentTimeMillis())) {
                        throw new ConnectionPoolTimeoutException
                            ("Timeout waiting for connection");
                    }
                }
            } // while no entry

        } finally {
            poolLock.unlock();
        }

        return entry;

    } // getEntry


    // non-javadoc, see base class AbstractConnPool
    @Override
    public void freeEntry(BasicPoolEntry entry, boolean reusable, long validDuration, TimeUnit timeUnit) {

        HttpRoute route = entry.getPlannedRoute();
        if (log.isDebugEnabled()) {
            log.debug("Freeing connection" +
                    " [" + route + "][" + entry.getState() + "]");
        }

        poolLock.lock();
        try {
            if (isShutDown) {
                // the pool is shut down, release the
                // connection's resources and get out of here
                closeConnection(entry.getConnection());
                return;
            }

            // no longer issued, we keep a hard reference now
            issuedConnections.remove(entry.getWeakRef());

            RouteSpecificPool rospl = getRoutePool(route, true);

            if (reusable) {
                // keep the entry around and start tracking its idle time
                rospl.freeEntry(entry);
                freeConnections.add(entry);
                idleConnHandler.add(entry.getConnection(), validDuration, timeUnit);
            } else {
                // the entry is gone for good, only fix up the counters
                rospl.dropEntry();
                numConnections--;
            }

            // either way, there is room for a waiting thread to proceed
            notifyWaitingThread(rospl);

        } finally {
            poolLock.unlock();
        }

    } // freeEntry



    /**
     * If available, get a free pool entry for a route.
     * Free entries that the idle connection handler no longer considers
     * valid are closed and dropped, and the search continues with the
     * next free entry of the route.
     *
     * @param rospl     the route-specific pool from which to get an entry
     * @param state     the state object passed to the route-specific pool
     *                  when allocating an entry
     *
     * @return  an available pool entry for the given route, or
     *          <code>null</code> if none is available
     */
    protected BasicPoolEntry getFreeEntry(RouteSpecificPool rospl, Object state) {

        BasicPoolEntry entry = null;
        poolLock.lock();
        try {
            boolean done = false;
            while (!done) {

                entry = rospl.allocEntry(state);

                if (entry != null) {
                    if (log.isDebugEnabled()) {
                        log.debug("Getting free connection"
                                + " [" + rospl.getRoute() + "][" + state + "]");

                    }
                    freeConnections.remove(entry);
                    boolean valid = idleConnHandler.remove(entry.getConnection());
                    if (!valid) {
                        // If the free entry isn't valid anymore, get rid of it
                        // and loop to find another one that might be valid.
                        if (log.isDebugEnabled()) {
                            log.debug("Closing expired free connection"
                                    + " [" + rospl.getRoute() + "][" + state + "]");
                        }
                        closeConnection(entry.getConnection());
                        // We use dropEntry instead of deleteEntry because the entry
                        // is no longer "free" (we just allocated it), and deleteEntry
                        // can only be used to delete free entries.
                        rospl.dropEntry();
                        numConnections--;
                    } else {
                        issuedConnections.add(entry.getWeakRef());
                        done = true;
                    }

                } else {
                    done = true;
                    if (log.isDebugEnabled()) {
                        log.debug("No free connections"
                                + " [" + rospl.getRoute() + "][" + state + "]");
                    }
                }
            }
        } finally {
            poolLock.unlock();
        }

        return entry;
    }


    /**
     * Creates a new pool entry.
     * This method assumes that the new connection will be handed
     * out immediately.
     *
     * @param rospl       the route-specific pool for which to create the entry
     * @param op        the operator for creating a connection
     *
     * @return  the new pool entry for a new connection
     */
    protected BasicPoolEntry createEntry(RouteSpecificPool rospl,
                                         ClientConnectionOperator op) {

        if (log.isDebugEnabled()) {
            log.debug("Creating new connection [" + rospl.getRoute() + "]");
        }

        // the entry will create the connection when needed
        BasicPoolEntry entry =
            new BasicPoolEntry(op, rospl.getRoute(), refQueue);

        poolLock.lock();
        try {

            rospl.createdEntry(entry);
            numConnections++;

            issuedConnections.add(entry.getWeakRef());

        } finally {
            poolLock.unlock();
        }

        return entry;
    }


    /**
     * Deletes a given pool entry.
     * This closes the pooled connection and removes all references,
     * so that it can be GCed.
     * If the route-specific pool is unused afterwards, it is removed
     * from {@link #routeToPool}.
     *
     * <p><b>Note:</b> Does not remove the entry from the freeConnections list.
     * It is assumed that the caller has already handled this step.</p>
     * <!-- @@@ is that a good idea? or rather fix it? -->
     *
     * @param entry         the pool entry for the connection to delete
     */
    protected void deleteEntry(BasicPoolEntry entry) {

        HttpRoute route = entry.getPlannedRoute();

        if (log.isDebugEnabled()) {
            log.debug("Deleting connection"
                    + " [" + route + "][" + entry.getState() + "]");
        }

        poolLock.lock();
        try {

            closeConnection(entry.getConnection());

            RouteSpecificPool rospl = getRoutePool(route, true);
            rospl.deleteEntry(entry);
            numConnections--;
            if (rospl.isUnused()) {
                routeToPool.remove(route);
            }

            idleConnHandler.remove(entry.getConnection());// not idle, but dead

        } finally {
            poolLock.unlock();
        }
    }


    /**
     * Delete an old, free pool entry to make room for a new one.
     * Used to replace pool entries with ones for a different route.
     * The entry at the head of {@link #freeConnections} is taken.
     * If there is no free connection, nothing is deleted.
     */
    protected void deleteLeastUsedEntry() {

        // acquire the lock before entering the try block, so that the
        // finally clause never unlocks a lock that was not obtained
        poolLock.lock();
        try {

            //@@@ with peek() instead of poll(), we could
            //@@@ leave the removing to deleteEntry()
            // poll() yields null on an empty queue, whereas remove()
            // would throw and never reach the check below
            BasicPoolEntry entry = freeConnections.poll();

            if (entry != null) {
                deleteEntry(entry);
            } else if (log.isDebugEnabled()) {
                log.debug("No free connection to delete.");
            }

        } finally {
            poolLock.unlock();
        }
    }


    // non-javadoc, see base class AbstractConnPool
    @Override
    protected void handleLostEntry(HttpRoute route) {

        poolLock.lock();
        try {

            RouteSpecificPool rospl = getRoutePool(route, true);
            rospl.dropEntry();
            if (rospl.isUnused()) {
                routeToPool.remove(route);
            }

            numConnections--;
            notifyWaitingThread(rospl);

        } finally {
            poolLock.unlock();
        }
    }


    /**
     * Notifies a waiting thread that a connection is available.
     * This will wake a thread waiting in the specific route pool,
     * if there is one.
     * Otherwise, a thread in the connection pool will be notified.
     *
     * @param rospl     the pool in which to notify, or <code>null</code>
     */
    protected void notifyWaitingThread(RouteSpecificPool rospl) {

        //@@@ while this strategy provides for best connection re-use,
        //@@@ is it fair? only do this if the connection is open?
        // Find the thread we are going to notify, preferring one that
        // waits on the given route. A woken thread takes itself off both
        // the route-specific and the global wait queue in the finally
        // block of getEntryBlocking().
        WaitingThread waitingThread = null;

        poolLock.lock();
        try {

            if ((rospl != null) && rospl.hasThread()) {
                if (log.isDebugEnabled()) {
                    log.debug("Notifying thread waiting on pool" +
                            " [" + rospl.getRoute() + "]");
                }
                waitingThread = rospl.nextThread();
            } else if (!waitingThreads.isEmpty()) {
                if (log.isDebugEnabled()) {
                    log.debug("Notifying thread waiting on any pool");
                }
                waitingThread = waitingThreads.remove();
            } else if (log.isDebugEnabled()) {
                log.debug("Notifying no-one, there are no waiting threads");
            }

            if (waitingThread != null) {
                waitingThread.wakeup();
            }

        } finally {
            poolLock.unlock();
        }
    }


    //@@@ revise this cleanup stuff
    //@@@ move method to base class when deleteEntry() is fixed
    // non-javadoc, see base class AbstractConnPool
    @Override
    public void deleteClosedConnections() {

        poolLock.lock();
        try {

            Iterator<BasicPoolEntry>  iter = freeConnections.iterator();
            while (iter.hasNext()) {
                BasicPoolEntry entry = iter.next();
                if (!entry.getConnection().isOpen()) {
                    // deleteEntry() leaves freeConnections alone,
                    // so take the entry off the list here
                    iter.remove();
                    deleteEntry(entry);
                }
            }

        } finally {
            poolLock.unlock();
        }
    }


    // non-javadoc, see base class AbstractConnPool
    @Override
    public void shutdown() {

        poolLock.lock();
        try {

            super.shutdown();

            // close all free connections
            //@@@ move this to base class?
            Iterator<BasicPoolEntry> ibpe = freeConnections.iterator();
            while (ibpe.hasNext()) {
                BasicPoolEntry entry = ibpe.next();
                ibpe.remove();
                closeConnection(entry.getConnection());
            }

            // wake up all waiting threads
            Iterator<WaitingThread> iwth = waitingThreads.iterator();
            while (iwth.hasNext()) {
                WaitingThread waiter = iwth.next();
                iwth.remove();
                waiter.wakeup();
            }

            routeToPool.clear();

        } finally {
            poolLock.unlock();
        }
    }


} // class ConnPoolByRoute