1 /* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/licenses/publicdomain 5 */ 6 7 package java.util.concurrent; 8 9 import java.util.ArrayList; 10 import java.util.ConcurrentModificationException; 11 import java.util.HashSet; 12 import java.util.Iterator; 13 import java.util.List; 14 import java.util.concurrent.atomic.AtomicInteger; 15 import java.util.concurrent.locks.AbstractQueuedSynchronizer; 16 import java.util.concurrent.locks.Condition; 17 import java.util.concurrent.locks.ReentrantLock; 18 19 /** 20 * An {@link ExecutorService} that executes each submitted task using 21 * one of possibly several pooled threads, normally configured 22 * using {@link Executors} factory methods. 23 * 24 * <p>Thread pools address two different problems: they usually 25 * provide improved performance when executing large numbers of 26 * asynchronous tasks, due to reduced per-task invocation overhead, 27 * and they provide a means of bounding and managing the resources, 28 * including threads, consumed when executing a collection of tasks. 29 * Each {@code ThreadPoolExecutor} also maintains some basic 30 * statistics, such as the number of completed tasks. 31 * 32 * <p>To be useful across a wide range of contexts, this class 33 * provides many adjustable parameters and extensibility 34 * hooks. However, programmers are urged to use the more convenient 35 * {@link Executors} factory methods {@link 36 * Executors#newCachedThreadPool} (unbounded thread pool, with 37 * automatic thread reclamation), {@link Executors#newFixedThreadPool} 38 * (fixed size thread pool) and {@link 39 * Executors#newSingleThreadExecutor} (single background thread), that 40 * preconfigure settings for the most common usage 41 * scenarios. Otherwise, use the following guide when manually 42 * configuring and tuning this class: 43 * 44 * <dl> 45 * 46 * <dt>Core and maximum pool sizes</dt> 47 * 48 * <dd>A {@code ThreadPoolExecutor} will automatically adjust the 49 * pool size (see {@link #getPoolSize}) 50 * according to the bounds set by 51 * corePoolSize (see {@link #getCorePoolSize}) and 52 * maximumPoolSize (see {@link #getMaximumPoolSize}). 53 * 54 * When a new task is submitted in method {@link #execute}, and fewer 55 * than corePoolSize threads are running, a new thread is created to 56 * handle the request, even if other worker threads are idle. If 57 * there are more than corePoolSize but less than maximumPoolSize 58 * threads running, a new thread will be created only if the queue is 59 * full. By setting corePoolSize and maximumPoolSize the same, you 60 * create a fixed-size thread pool. By setting maximumPoolSize to an 61 * essentially unbounded value such as {@code Integer.MAX_VALUE}, you 62 * allow the pool to accommodate an arbitrary number of concurrent 63 * tasks. Most typically, core and maximum pool sizes are set only 64 * upon construction, but they may also be changed dynamically using 65 * {@link #setCorePoolSize} and {@link #setMaximumPoolSize}. </dd> 66 * 67 * <dt>On-demand construction</dt> 68 * 69 * <dd> By default, even core threads are initially created and 70 * started only when new tasks arrive, but this can be overridden 71 * dynamically using method {@link #prestartCoreThread} or {@link 72 * #prestartAllCoreThreads}. You probably want to prestart threads if 73 * you construct the pool with a non-empty queue. </dd> 74 * 75 * <dt>Creating new threads</dt> 76 * 77 * <dd>New threads are created using a {@link ThreadFactory}. If not 78 * otherwise specified, a {@link Executors#defaultThreadFactory} is 79 * used, that creates threads to all be in the same {@link 80 * ThreadGroup} and with the same {@code NORM_PRIORITY} priority and 81 * non-daemon status. By supplying a different ThreadFactory, you can 82 * alter the thread's name, thread group, priority, daemon status, 83 * etc. If a {@code ThreadFactory} fails to create a thread when asked 84 * by returning null from {@code newThread}, the executor will 85 * continue, but might not be able to execute any tasks. Threads 86 * should possess the "modifyThread" {@code RuntimePermission}. If 87 * worker threads or other threads using the pool do not possess this 88 * permission, service may be degraded: configuration changes may not 89 * take effect in a timely manner, and a shutdown pool may remain in a 90 * state in which termination is possible but not completed.</dd> 91 * 92 * <dt>Keep-alive times</dt> 93 * 94 * <dd>If the pool currently has more than corePoolSize threads, 95 * excess threads will be terminated if they have been idle for more 96 * than the keepAliveTime (see {@link #getKeepAliveTime}). This 97 * provides a means of reducing resource consumption when the pool is 98 * not being actively used. If the pool becomes more active later, new 99 * threads will be constructed. This parameter can also be changed 100 * dynamically using method {@link #setKeepAliveTime}. Using a value 101 * of {@code Long.MAX_VALUE} {@link TimeUnit#NANOSECONDS} effectively 102 * disables idle threads from ever terminating prior to shut down. By 103 * default, the keep-alive policy applies only when there are more 104 * than corePoolSizeThreads. But method {@link 105 * #allowCoreThreadTimeOut(boolean)} can be used to apply this 106 * time-out policy to core threads as well, so long as the 107 * keepAliveTime value is non-zero. </dd> 108 * 109 * <dt>Queuing</dt> 110 * 111 * <dd>Any {@link BlockingQueue} may be used to transfer and hold 112 * submitted tasks. The use of this queue interacts with pool sizing: 113 * 114 * <ul> 115 * 116 * <li> If fewer than corePoolSize threads are running, the Executor 117 * always prefers adding a new thread 118 * rather than queuing.</li> 119 * 120 * <li> If corePoolSize or more threads are running, the Executor 121 * always prefers queuing a request rather than adding a new 122 * thread.</li> 123 * 124 * <li> If a request cannot be queued, a new thread is created unless 125 * this would exceed maximumPoolSize, in which case, the task will be 126 * rejected.</li> 127 * 128 * </ul> 129 * 130 * There are three general strategies for queuing: 131 * <ol> 132 * 133 * <li> <em> Direct handoffs.</em> A good default choice for a work 134 * queue is a {@link SynchronousQueue} that hands off tasks to threads 135 * without otherwise holding them. Here, an attempt to queue a task 136 * will fail if no threads are immediately available to run it, so a 137 * new thread will be constructed. This policy avoids lockups when 138 * handling sets of requests that might have internal dependencies. 139 * Direct handoffs generally require unbounded maximumPoolSizes to 140 * avoid rejection of new submitted tasks. This in turn admits the 141 * possibility of unbounded thread growth when commands continue to 142 * arrive on average faster than they can be processed. </li> 143 * 144 * <li><em> Unbounded queues.</em> Using an unbounded queue (for 145 * example a {@link LinkedBlockingQueue} without a predefined 146 * capacity) will cause new tasks to wait in the queue when all 147 * corePoolSize threads are busy. Thus, no more than corePoolSize 148 * threads will ever be created. (And the value of the maximumPoolSize 149 * therefore doesn't have any effect.) This may be appropriate when 150 * each task is completely independent of others, so tasks cannot 151 * affect each others execution; for example, in a web page server. 152 * While this style of queuing can be useful in smoothing out 153 * transient bursts of requests, it admits the possibility of 154 * unbounded work queue growth when commands continue to arrive on 155 * average faster than they can be processed. </li> 156 * 157 * <li><em>Bounded queues.</em> A bounded queue (for example, an 158 * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when 159 * used with finite maximumPoolSizes, but can be more difficult to 160 * tune and control. Queue sizes and maximum pool sizes may be traded 161 * off for each other: Using large queues and small pools minimizes 162 * CPU usage, OS resources, and context-switching overhead, but can 163 * lead to artificially low throughput. If tasks frequently block (for 164 * example if they are I/O bound), a system may be able to schedule 165 * time for more threads than you otherwise allow. Use of small queues 166 * generally requires larger pool sizes, which keeps CPUs busier but 167 * may encounter unacceptable scheduling overhead, which also 168 * decreases throughput. </li> 169 * 170 * </ol> 171 * 172 * </dd> 173 * 174 * <dt>Rejected tasks</dt> 175 * 176 * <dd> New tasks submitted in method {@link #execute} will be 177 * <em>rejected</em> when the Executor has been shut down, and also 178 * when the Executor uses finite bounds for both maximum threads and 179 * work queue capacity, and is saturated. In either case, the {@code 180 * execute} method invokes the {@link 181 * RejectedExecutionHandler#rejectedExecution} method of its {@link 182 * RejectedExecutionHandler}. Four predefined handler policies are 183 * provided: 184 * 185 * <ol> 186 * 187 * <li> In the default {@link ThreadPoolExecutor.AbortPolicy}, the 188 * handler throws a runtime {@link RejectedExecutionException} upon 189 * rejection. </li> 190 * 191 * <li> In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread 192 * that invokes {@code execute} itself runs the task. This provides a 193 * simple feedback control mechanism that will slow down the rate that 194 * new tasks are submitted. </li> 195 * 196 * <li> In {@link ThreadPoolExecutor.DiscardPolicy}, a task that 197 * cannot be executed is simply dropped. </li> 198 * 199 * <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the 200 * executor is not shut down, the task at the head of the work queue 201 * is dropped, and then execution is retried (which can fail again, 202 * causing this to be repeated.) </li> 203 * 204 * </ol> 205 * 206 * It is possible to define and use other kinds of {@link 207 * RejectedExecutionHandler} classes. Doing so requires some care 208 * especially when policies are designed to work only under particular 209 * capacity or queuing policies. </dd> 210 * 211 * <dt>Hook methods</dt> 212 * 213 * <dd>This class provides {@code protected} overridable {@link 214 * #beforeExecute} and {@link #afterExecute} methods that are called 215 * before and after execution of each task. These can be used to 216 * manipulate the execution environment; for example, reinitializing 217 * ThreadLocals, gathering statistics, or adding log 218 * entries. Additionally, method {@link #terminated} can be overridden 219 * to perform any special processing that needs to be done once the 220 * Executor has fully terminated. 221 * 222 * <p>If hook or callback methods throw exceptions, internal worker 223 * threads may in turn fail and abruptly terminate.</dd> 224 * 225 * <dt>Queue maintenance</dt> 226 * 227 * <dd> Method {@link #getQueue} allows access to the work queue for 228 * purposes of monitoring and debugging. Use of this method for any 229 * other purpose is strongly discouraged. Two supplied methods, 230 * {@link #remove} and {@link #purge} are available to assist in 231 * storage reclamation when large numbers of queued tasks become 232 * cancelled.</dd> 233 * 234 * <dt>Finalization</dt> 235 * 236 * <dd> A pool that is no longer referenced in a program <em>AND</em> 237 * has no remaining threads will be {@code shutdown} automatically. If 238 * you would like to ensure that unreferenced pools are reclaimed even 239 * if users forget to call {@link #shutdown}, then you must arrange 240 * that unused threads eventually die, by setting appropriate 241 * keep-alive times, using a lower bound of zero core threads and/or 242 * setting {@link #allowCoreThreadTimeOut(boolean)}. </dd> 243 * 244 * </dl> 245 * 246 * <p> <b>Extension example</b>. Most extensions of this class 247 * override one or more of the protected hook methods. For example, 248 * here is a subclass that adds a simple pause/resume feature: 249 * 250 * <pre> {@code 251 * class PausableThreadPoolExecutor extends ThreadPoolExecutor { 252 * private boolean isPaused; 253 * private ReentrantLock pauseLock = new ReentrantLock(); 254 * private Condition unpaused = pauseLock.newCondition(); 255 * 256 * public PausableThreadPoolExecutor(...) { super(...); } 257 * 258 * protected void beforeExecute(Thread t, Runnable r) { 259 * super.beforeExecute(t, r); 260 * pauseLock.lock(); 261 * try { 262 * while (isPaused) unpaused.await(); 263 * } catch (InterruptedException ie) { 264 * t.interrupt(); 265 * } finally { 266 * pauseLock.unlock(); 267 * } 268 * } 269 * 270 * public void pause() { 271 * pauseLock.lock(); 272 * try { 273 * isPaused = true; 274 * } finally { 275 * pauseLock.unlock(); 276 * } 277 * } 278 * 279 * public void resume() { 280 * pauseLock.lock(); 281 * try { 282 * isPaused = false; 283 * unpaused.signalAll(); 284 * } finally { 285 * pauseLock.unlock(); 286 * } 287 * } 288 * }}</pre> 289 * 290 * @since 1.5 291 * @author Doug Lea 292 */ 293 public class ThreadPoolExecutor extends AbstractExecutorService { 294 /** 295 * The main pool control state, ctl, is an atomic integer packing 296 * two conceptual fields 297 * workerCount, indicating the effective number of threads 298 * runState, indicating whether running, shutting down etc 299 * 300 * In order to pack them into one int, we limit workerCount to 301 * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 302 * billion) otherwise representable. If this is ever an issue in 303 * the future, the variable can be changed to be an AtomicLong, 304 * and the shift/mask constants below adjusted. But until the need 305 * arises, this code is a bit faster and simpler using an int. 306 * 307 * The workerCount is the number of workers that have been 308 * permitted to start and not permitted to stop. The value may be 309 * transiently different from the actual number of live threads, 310 * for example when a ThreadFactory fails to create a thread when 311 * asked, and when exiting threads are still performing 312 * bookkeeping before terminating. The user-visible pool size is 313 * reported as the current size of the workers set. 314 * 315 * The runState provides the main lifecyle control, taking on values: 316 * 317 * RUNNING: Accept new tasks and process queued tasks 318 * SHUTDOWN: Don't accept new tasks, but process queued tasks 319 * STOP: Don't accept new tasks, don't process queued tasks, 320 * and interrupt in-progress tasks 321 * TIDYING: All tasks have terminated, workerCount is zero, 322 * the thread transitioning to state TIDYING 323 * will run the terminated() hook method 324 * TERMINATED: terminated() has completed 325 * 326 * The numerical order among these values matters, to allow 327 * ordered comparisons. The runState monotonically increases over 328 * time, but need not hit each state. The transitions are: 329 * 330 * RUNNING -> SHUTDOWN 331 * On invocation of shutdown(), perhaps implicitly in finalize() 332 * (RUNNING or SHUTDOWN) -> STOP 333 * On invocation of shutdownNow() 334 * SHUTDOWN -> TIDYING 335 * When both queue and pool are empty 336 * STOP -> TIDYING 337 * When pool is empty 338 * TIDYING -> TERMINATED 339 * When the terminated() hook method has completed 340 * 341 * Threads waiting in awaitTermination() will return when the 342 * state reaches TERMINATED. 343 * 344 * Detecting the transition from SHUTDOWN to TIDYING is less 345 * straightforward than you'd like because the queue may become 346 * empty after non-empty and vice versa during SHUTDOWN state, but 347 * we can only terminate if, after seeing that it is empty, we see 348 * that workerCount is 0 (which sometimes entails a recheck -- see 349 * below). 350 */ 351 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 352 private static final int COUNT_BITS = Integer.SIZE - 3; 353 private static final int CAPACITY = (1 << COUNT_BITS) - 1; 354 355 // runState is stored in the high-order bits 356 private static final int RUNNING = -1 << COUNT_BITS; 357 private static final int SHUTDOWN = 0 << COUNT_BITS; 358 private static final int STOP = 1 << COUNT_BITS; 359 private static final int TIDYING = 2 << COUNT_BITS; 360 private static final int TERMINATED = 3 << COUNT_BITS; 361 362 // Packing and unpacking ctl runStateOf(int c)363 private static int runStateOf(int c) { return c & ~CAPACITY; } workerCountOf(int c)364 private static int workerCountOf(int c) { return c & CAPACITY; } ctlOf(int rs, int wc)365 private static int ctlOf(int rs, int wc) { return rs | wc; } 366 367 /* 368 * Bit field accessors that don't require unpacking ctl. 369 * These depend on the bit layout and on workerCount being never negative. 370 */ 371 runStateLessThan(int c, int s)372 private static boolean runStateLessThan(int c, int s) { 373 return c < s; 374 } 375 runStateAtLeast(int c, int s)376 private static boolean runStateAtLeast(int c, int s) { 377 return c >= s; 378 } 379 isRunning(int c)380 private static boolean isRunning(int c) { 381 return c < SHUTDOWN; 382 } 383 384 /** 385 * Attempt to CAS-increment the workerCount field of ctl. 386 */ compareAndIncrementWorkerCount(int expect)387 private boolean compareAndIncrementWorkerCount(int expect) { 388 return ctl.compareAndSet(expect, expect + 1); 389 } 390 391 /** 392 * Attempt to CAS-decrement the workerCount field of ctl. 393 */ compareAndDecrementWorkerCount(int expect)394 private boolean compareAndDecrementWorkerCount(int expect) { 395 return ctl.compareAndSet(expect, expect - 1); 396 } 397 398 /** 399 * Decrements the workerCount field of ctl. This is called only on 400 * abrupt termination of a thread (see processWorkerExit). Other 401 * decrements are performed within getTask. 402 */ decrementWorkerCount()403 private void decrementWorkerCount() { 404 do {} while (! compareAndDecrementWorkerCount(ctl.get())); 405 } 406 407 /** 408 * The queue used for holding tasks and handing off to worker 409 * threads. We do not require that workQueue.poll() returning 410 * null necessarily means that workQueue.isEmpty(), so rely 411 * solely on isEmpty to see if the queue is empty (which we must 412 * do for example when deciding whether to transition from 413 * SHUTDOWN to TIDYING). This accommodates special-purpose 414 * queues such as DelayQueues for which poll() is allowed to 415 * return null even if it may later return non-null when delays 416 * expire. 417 */ 418 private final BlockingQueue<Runnable> workQueue; 419 420 /** 421 * Lock held on access to workers set and related bookkeeping. 422 * While we could use a concurrent set of some sort, it turns out 423 * to be generally preferable to use a lock. Among the reasons is 424 * that this serializes interruptIdleWorkers, which avoids 425 * unnecessary interrupt storms, especially during shutdown. 426 * Otherwise exiting threads would concurrently interrupt those 427 * that have not yet interrupted. It also simplifies some of the 428 * associated statistics bookkeeping of largestPoolSize etc. We 429 * also hold mainLock on shutdown and shutdownNow, for the sake of 430 * ensuring workers set is stable while separately checking 431 * permission to interrupt and actually interrupting. 432 */ 433 private final ReentrantLock mainLock = new ReentrantLock(); 434 435 /** 436 * Set containing all worker threads in pool. Accessed only when 437 * holding mainLock. 438 */ 439 private final HashSet<Worker> workers = new HashSet<Worker>(); 440 441 /** 442 * Wait condition to support awaitTermination 443 */ 444 private final Condition termination = mainLock.newCondition(); 445 446 /** 447 * Tracks largest attained pool size. Accessed only under 448 * mainLock. 449 */ 450 private int largestPoolSize; 451 452 /** 453 * Counter for completed tasks. Updated only on termination of 454 * worker threads. Accessed only under mainLock. 455 */ 456 private long completedTaskCount; 457 458 /* 459 * All user control parameters are declared as volatiles so that 460 * ongoing actions are based on freshest values, but without need 461 * for locking, since no internal invariants depend on them 462 * changing synchronously with respect to other actions. 463 */ 464 465 /** 466 * Factory for new threads. All threads are created using this 467 * factory (via method addWorker). All callers must be prepared 468 * for addWorker to fail, which may reflect a system or user's 469 * policy limiting the number of threads. Even though it is not 470 * treated as an error, failure to create threads may result in 471 * new tasks being rejected or existing ones remaining stuck in 472 * the queue. On the other hand, no special precautions exist to 473 * handle OutOfMemoryErrors that might be thrown while trying to 474 * create threads, since there is generally no recourse from 475 * within this class. 476 */ 477 private volatile ThreadFactory threadFactory; 478 479 /** 480 * Handler called when saturated or shutdown in execute. 481 */ 482 private volatile RejectedExecutionHandler handler; 483 484 /** 485 * Timeout in nanoseconds for idle threads waiting for work. 486 * Threads use this timeout when there are more than corePoolSize 487 * present or if allowCoreThreadTimeOut. Otherwise they wait 488 * forever for new work. 489 */ 490 private volatile long keepAliveTime; 491 492 /** 493 * If false (default), core threads stay alive even when idle. 494 * If true, core threads use keepAliveTime to time out waiting 495 * for work. 496 */ 497 private volatile boolean allowCoreThreadTimeOut; 498 499 /** 500 * Core pool size is the minimum number of workers to keep alive 501 * (and not allow to time out etc) unless allowCoreThreadTimeOut 502 * is set, in which case the minimum is zero. 503 */ 504 private volatile int corePoolSize; 505 506 /** 507 * Maximum pool size. Note that the actual maximum is internally 508 * bounded by CAPACITY. 509 */ 510 private volatile int maximumPoolSize; 511 512 /** 513 * The default rejected execution handler 514 */ 515 private static final RejectedExecutionHandler defaultHandler = 516 new AbortPolicy(); 517 518 /** 519 * Permission required for callers of shutdown and shutdownNow. 520 * We additionally require (see checkShutdownAccess) that callers 521 * have permission to actually interrupt threads in the worker set 522 * (as governed by Thread.interrupt, which relies on 523 * ThreadGroup.checkAccess, which in turn relies on 524 * SecurityManager.checkAccess). Shutdowns are attempted only if 525 * these checks pass. 526 * 527 * All actual invocations of Thread.interrupt (see 528 * interruptIdleWorkers and interruptWorkers) ignore 529 * SecurityExceptions, meaning that the attempted interrupts 530 * silently fail. In the case of shutdown, they should not fail 531 * unless the SecurityManager has inconsistent policies, sometimes 532 * allowing access to a thread and sometimes not. In such cases, 533 * failure to actually interrupt threads may disable or delay full 534 * termination. Other uses of interruptIdleWorkers are advisory, 535 * and failure to actually interrupt will merely delay response to 536 * configuration changes so is not handled exceptionally. 537 */ 538 private static final RuntimePermission shutdownPerm = 539 new RuntimePermission("modifyThread"); 540 541 /** 542 * Class Worker mainly maintains interrupt control state for 543 * threads running tasks, along with other minor bookkeeping. 544 * This class opportunistically extends AbstractQueuedSynchronizer 545 * to simplify acquiring and releasing a lock surrounding each 546 * task execution. This protects against interrupts that are 547 * intended to wake up a worker thread waiting for a task from 548 * instead interrupting a task being run. We implement a simple 549 * non-reentrant mutual exclusion lock rather than use ReentrantLock 550 * because we do not want worker tasks to be able to reacquire the 551 * lock when they invoke pool control methods like setCorePoolSize. 552 */ 553 private final class Worker 554 extends AbstractQueuedSynchronizer 555 implements Runnable 556 { 557 /** 558 * This class will never be serialized, but we provide a 559 * serialVersionUID to suppress a javac warning. 560 */ 561 private static final long serialVersionUID = 6138294804551838833L; 562 563 /** Thread this worker is running in. Null if factory fails. */ 564 final Thread thread; 565 /** Initial task to run. Possibly null. */ 566 Runnable firstTask; 567 /** Per-thread task counter */ 568 volatile long completedTasks; 569 570 /** 571 * Creates with given first task and thread from ThreadFactory. 572 * @param firstTask the first task (null if none) 573 */ Worker(Runnable firstTask)574 Worker(Runnable firstTask) { 575 this.firstTask = firstTask; 576 this.thread = getThreadFactory().newThread(this); 577 } 578 579 /** Delegates main run loop to outer runWorker */ run()580 public void run() { 581 runWorker(this); 582 } 583 584 // Lock methods 585 // 586 // The value 0 represents the unlocked state. 587 // The value 1 represents the locked state. 588 isHeldExclusively()589 protected boolean isHeldExclusively() { 590 return getState() == 1; 591 } 592 tryAcquire(int unused)593 protected boolean tryAcquire(int unused) { 594 if (compareAndSetState(0, 1)) { 595 setExclusiveOwnerThread(Thread.currentThread()); 596 return true; 597 } 598 return false; 599 } 600 tryRelease(int unused)601 protected boolean tryRelease(int unused) { 602 setExclusiveOwnerThread(null); 603 setState(0); 604 return true; 605 } 606 lock()607 public void lock() { acquire(1); } tryLock()608 public boolean tryLock() { return tryAcquire(1); } unlock()609 public void unlock() { release(1); } isLocked()610 public boolean isLocked() { return isHeldExclusively(); } 611 } 612 613 /* 614 * Methods for setting control state 615 */ 616 617 /** 618 * Transitions runState to given target, or leaves it alone if 619 * already at least the given target. 620 * 621 * @param targetState the desired state, either SHUTDOWN or STOP 622 * (but not TIDYING or TERMINATED -- use tryTerminate for that) 623 */ advanceRunState(int targetState)624 private void advanceRunState(int targetState) { 625 for (;;) { 626 int c = ctl.get(); 627 if (runStateAtLeast(c, targetState) || 628 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) 629 break; 630 } 631 } 632 633 /** 634 * Transitions to TERMINATED state if either (SHUTDOWN and pool 635 * and queue empty) or (STOP and pool empty). If otherwise 636 * eligible to terminate but workerCount is nonzero, interrupts an 637 * idle worker to ensure that shutdown signals propagate. This 638 * method must be called following any action that might make 639 * termination possible -- reducing worker count or removing tasks 640 * from the queue during shutdown. The method is non-private to 641 * allow access from ScheduledThreadPoolExecutor. 642 */ tryTerminate()643 final void tryTerminate() { 644 for (;;) { 645 int c = ctl.get(); 646 if (isRunning(c) || 647 runStateAtLeast(c, TIDYING) || 648 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) 649 return; 650 if (workerCountOf(c) != 0) { // Eligible to terminate 651 interruptIdleWorkers(ONLY_ONE); 652 return; 653 } 654 655 final ReentrantLock mainLock = this.mainLock; 656 mainLock.lock(); 657 try { 658 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { 659 try { 660 terminated(); 661 } finally { 662 ctl.set(ctlOf(TERMINATED, 0)); 663 termination.signalAll(); 664 } 665 return; 666 } 667 } finally { 668 mainLock.unlock(); 669 } 670 // else retry on failed CAS 671 } 672 } 673 674 /* 675 * Methods for controlling interrupts to worker threads. 676 */ 677 678 /** 679 * If there is a security manager, makes sure caller has 680 * permission to shut down threads in general (see shutdownPerm). 681 * If this passes, additionally makes sure the caller is allowed 682 * to interrupt each worker thread. This might not be true even if 683 * first check passed, if the SecurityManager treats some threads 684 * specially. 685 */ checkShutdownAccess()686 private void checkShutdownAccess() { 687 SecurityManager security = System.getSecurityManager(); 688 if (security != null) { 689 security.checkPermission(shutdownPerm); 690 final ReentrantLock mainLock = this.mainLock; 691 mainLock.lock(); 692 try { 693 for (Worker w : workers) 694 security.checkAccess(w.thread); 695 } finally { 696 mainLock.unlock(); 697 } 698 } 699 } 700 701 /** 702 * Interrupts all threads, even if active. Ignores SecurityExceptions 703 * (in which case some threads may remain uninterrupted). 704 */ interruptWorkers()705 private void interruptWorkers() { 706 final ReentrantLock mainLock = this.mainLock; 707 mainLock.lock(); 708 try { 709 for (Worker w : workers) { 710 try { 711 w.thread.interrupt(); 712 } catch (SecurityException ignore) { 713 } 714 } 715 } finally { 716 mainLock.unlock(); 717 } 718 } 719 720 /** 721 * Interrupts threads that might be waiting for tasks (as 722 * indicated by not being locked) so they can check for 723 * termination or configuration changes. Ignores 724 * SecurityExceptions (in which case some threads may remain 725 * uninterrupted). 726 * 727 * @param onlyOne If true, interrupt at most one worker. This is 728 * called only from tryTerminate when termination is otherwise 729 * enabled but there are still other workers. In this case, at 730 * most one waiting worker is interrupted to propagate shutdown 731 * signals in case all threads are currently waiting. 732 * Interrupting any arbitrary thread ensures that newly arriving 733 * workers since shutdown began will also eventually exit. 734 * To guarantee eventual termination, it suffices to always 735 * interrupt only one idle worker, but shutdown() interrupts all 736 * idle workers so that redundant workers exit promptly, not 737 * waiting for a straggler task to finish. 738 */ interruptIdleWorkers(boolean onlyOne)739 private void interruptIdleWorkers(boolean onlyOne) { 740 final ReentrantLock mainLock = this.mainLock; 741 mainLock.lock(); 742 try { 743 for (Worker w : workers) { 744 Thread t = w.thread; 745 if (!t.isInterrupted() && w.tryLock()) { 746 try { 747 t.interrupt(); 748 } catch (SecurityException ignore) { 749 } finally { 750 w.unlock(); 751 } 752 } 753 if (onlyOne) 754 break; 755 } 756 } finally { 757 mainLock.unlock(); 758 } 759 } 760 761 /** 762 * Common form of interruptIdleWorkers, to avoid having to 763 * remember what the boolean argument means. 764 */ interruptIdleWorkers()765 private void interruptIdleWorkers() { 766 interruptIdleWorkers(false); 767 } 768 769 private static final boolean ONLY_ONE = true; 770 771 /** 772 * Ensures that unless the pool is stopping, the current thread 773 * does not have its interrupt set. This requires a double-check 774 * of state in case the interrupt was cleared concurrently with a 775 * shutdownNow -- if so, the interrupt is re-enabled. 776 */ clearInterruptsForTaskRun()777 private void clearInterruptsForTaskRun() { 778 if (runStateLessThan(ctl.get(), STOP) && 779 Thread.interrupted() && 780 runStateAtLeast(ctl.get(), STOP)) 781 Thread.currentThread().interrupt(); 782 } 783 784 /* 785 * Misc utilities, most of which are also exported to 786 * ScheduledThreadPoolExecutor 787 */ 788 789 /** 790 * Invokes the rejected execution handler for the given command. 791 * Package-protected for use by ScheduledThreadPoolExecutor. 792 */ reject(Runnable command)793 final void reject(Runnable command) { 794 handler.rejectedExecution(command, this); 795 } 796 797 /** 798 * Performs any further cleanup following run state transition on 799 * invocation of shutdown. A no-op here, but used by 800 * ScheduledThreadPoolExecutor to cancel delayed tasks. 801 */ onShutdown()802 void onShutdown() { 803 } 804 805 /** 806 * State check needed by ScheduledThreadPoolExecutor to 807 * enable running tasks during shutdown. 808 * 809 * @param shutdownOK true if should return true if SHUTDOWN 810 */ isRunningOrShutdown(boolean shutdownOK)811 final boolean isRunningOrShutdown(boolean shutdownOK) { 812 int rs = runStateOf(ctl.get()); 813 return rs == RUNNING || (rs == SHUTDOWN && shutdownOK); 814 } 815 816 /** 817 * Drains the task queue into a new list, normally using 818 * drainTo. But if the queue is a DelayQueue or any other kind of 819 * queue for which poll or drainTo may fail to remove some 820 * elements, it deletes them one by one. 821 */ drainQueue()822 private List<Runnable> drainQueue() { 823 BlockingQueue<Runnable> q = workQueue; 824 List<Runnable> taskList = new ArrayList<Runnable>(); 825 q.drainTo(taskList); 826 if (!q.isEmpty()) { 827 for (Runnable r : q.toArray(new Runnable[0])) { 828 if (q.remove(r)) 829 taskList.add(r); 830 } 831 } 832 return taskList; 833 } 834 835 /* 836 * Methods for creating, running and cleaning up after workers 837 */ 838 839 /** 840 * Checks if a new worker can be added with respect to current 841 * pool state and the given bound (either core or maximum). If so, 842 * the worker count is adjusted accordingly, and, if possible, a 843 * new worker is created and started running firstTask as its 844 * first task. This method returns false if the pool is stopped or 845 * eligible to shut down. It also returns false if the thread 846 * factory fails to create a thread when asked, which requires a 847 * backout of workerCount, and a recheck for termination, in case 848 * the existence of this worker was holding up termination. 849 * 850 * @param firstTask the task the new thread should run first (or 851 * null if none). Workers are created with an initial first task 852 * (in method execute()) to bypass queuing when there are fewer 853 * than corePoolSize threads (in which case we always start one), 854 * or when the queue is full (in which case we must bypass queue). 855 * Initially idle threads are usually created via 856 * prestartCoreThread or to replace other dying workers. 857 * 858 * @param core if true use corePoolSize as bound, else 859 * maximumPoolSize. (A boolean indicator is used here rather than a 860 * value to ensure reads of fresh values after checking other pool 861 * state). 862 * @return true if successful 863 */ addWorker(Runnable firstTask, boolean core)864 private boolean addWorker(Runnable firstTask, boolean core) { 865 retry: 866 for (;;) { 867 int c = ctl.get(); 868 int rs = runStateOf(c); 869 870 // Check if queue empty only if necessary. 871 if (rs >= SHUTDOWN && 872 ! (rs == SHUTDOWN && 873 firstTask == null && 874 ! workQueue.isEmpty())) 875 return false; 876 877 for (;;) { 878 int wc = workerCountOf(c); 879 if (wc >= CAPACITY || 880 wc >= (core ? corePoolSize : maximumPoolSize)) 881 return false; 882 if (compareAndIncrementWorkerCount(c)) 883 break retry; 884 c = ctl.get(); // Re-read ctl 885 if (runStateOf(c) != rs) 886 continue retry; 887 // else CAS failed due to workerCount change; retry inner loop 888 } 889 } 890 891 Worker w = new Worker(firstTask); 892 Thread t = w.thread; 893 894 final ReentrantLock mainLock = this.mainLock; 895 mainLock.lock(); 896 try { 897 // Recheck while holding lock. 898 // Back out on ThreadFactory failure or if 899 // shut down before lock acquired. 900 int c = ctl.get(); 901 int rs = runStateOf(c); 902 903 if (t == null || 904 (rs >= SHUTDOWN && 905 ! (rs == SHUTDOWN && 906 firstTask == null))) { 907 decrementWorkerCount(); 908 tryTerminate(); 909 return false; 910 } 911 912 workers.add(w); 913 914 int s = workers.size(); 915 if (s > largestPoolSize) 916 largestPoolSize = s; 917 } finally { 918 mainLock.unlock(); 919 } 920 921 t.start(); 922 // It is possible (but unlikely) for a thread to have been 923 // added to workers, but not yet started, during transition to 924 // STOP, which could result in a rare missed interrupt, 925 // because Thread.interrupt is not guaranteed to have any effect 926 // on a non-yet-started Thread (see Thread#interrupt). 927 if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted()) 928 t.interrupt(); 929 930 return true; 931 } 932 933 /** 934 * Performs cleanup and bookkeeping for a dying worker. Called 935 * only from worker threads. Unless completedAbruptly is set, 936 * assumes that workerCount has already been adjusted to account 937 * for exit. This method removes thread from worker set, and 938 * possibly terminates the pool or replaces the worker if either 939 * it exited due to user task exception or if fewer than 940 * corePoolSize workers are running or queue is non-empty but 941 * there are no workers. 942 * 943 * @param w the worker 944 * @param completedAbruptly if the worker died due to user exception 945 */ processWorkerExit(Worker w, boolean completedAbruptly)946 private void processWorkerExit(Worker w, boolean completedAbruptly) { 947 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted 948 decrementWorkerCount(); 949 950 final ReentrantLock mainLock = this.mainLock; 951 mainLock.lock(); 952 try { 953 completedTaskCount += w.completedTasks; 954 workers.remove(w); 955 } finally { 956 mainLock.unlock(); 957 } 958 959 tryTerminate(); 960 961 int c = ctl.get(); 962 if (runStateLessThan(c, STOP)) { 963 if (!completedAbruptly) { 964 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; 965 if (min == 0 && ! workQueue.isEmpty()) 966 min = 1; 967 if (workerCountOf(c) >= min) 968 return; // replacement not needed 969 } 970 addWorker(null, false); 971 } 972 } 973 974 /** 975 * Performs blocking or timed wait for a task, depending on 976 * current configuration settings, or returns null if this worker 977 * must exit because of any of: 978 * 1. There are more than maximumPoolSize workers (due to 979 * a call to setMaximumPoolSize). 980 * 2. The pool is stopped. 981 * 3. The pool is shutdown and the queue is empty. 982 * 4. This worker timed out waiting for a task, and timed-out 983 * workers are subject to termination (that is, 984 * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) 985 * both before and after the timed wait. 986 * 987 * @return task, or null if the worker must exit, in which case 988 * workerCount is decremented 989 */ getTask()990 private Runnable getTask() { 991 boolean timedOut = false; // Did the last poll() time out? 992 993 retry: 994 for (;;) { 995 int c = ctl.get(); 996 int rs = runStateOf(c); 997 998 // Check if queue empty only if necessary. 999 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 1000 decrementWorkerCount(); 1001 return null; 1002 } 1003 1004 boolean timed; // Are workers subject to culling? 1005 1006 for (;;) { 1007 int wc = workerCountOf(c); 1008 timed = allowCoreThreadTimeOut || wc > corePoolSize; 1009 1010 if (wc <= maximumPoolSize && ! (timedOut && timed)) 1011 break; 1012 if (compareAndDecrementWorkerCount(c)) 1013 return null; 1014 c = ctl.get(); // Re-read ctl 1015 if (runStateOf(c) != rs) 1016 continue retry; 1017 // else CAS failed due to workerCount change; retry inner loop 1018 } 1019 1020 try { 1021 Runnable r = timed ? 1022 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 1023 workQueue.take(); 1024 if (r != null) 1025 return r; 1026 timedOut = true; 1027 } catch (InterruptedException retry) { 1028 timedOut = false; 1029 } 1030 } 1031 } 1032 1033 /** 1034 * Main worker run loop. Repeatedly gets tasks from queue and 1035 * executes them, while coping with a number of issues: 1036 * 1037 * 1. We may start out with an initial task, in which case we 1038 * don't need to get the first one. Otherwise, as long as pool is 1039 * running, we get tasks from getTask. If it returns null then the 1040 * worker exits due to changed pool state or configuration 1041 * parameters. Other exits result from exception throws in 1042 * external code, in which case completedAbruptly holds, which 1043 * usually leads processWorkerExit to replace this thread. 1044 * 1045 * 2. Before running any task, the lock is acquired to prevent 1046 * other pool interrupts while the task is executing, and 1047 * clearInterruptsForTaskRun called to ensure that unless pool is 1048 * stopping, this thread does not have its interrupt set. 1049 * 1050 * 3. Each task run is preceded by a call to beforeExecute, which 1051 * might throw an exception, in which case we cause thread to die 1052 * (breaking loop with completedAbruptly true) without processing 1053 * the task. 1054 * 1055 * 4. Assuming beforeExecute completes normally, we run the task, 1056 * gathering any of its thrown exceptions to send to 1057 * afterExecute. We separately handle RuntimeException, Error 1058 * (both of which the specs guarantee that we trap) and arbitrary 1059 * Throwables. Because we cannot rethrow Throwables within 1060 * Runnable.run, we wrap them within Errors on the way out (to the 1061 * thread's UncaughtExceptionHandler). Any thrown exception also 1062 * conservatively causes thread to die. 1063 * 1064 * 5. After task.run completes, we call afterExecute, which may 1065 * also throw an exception, which will also cause thread to 1066 * die. According to JLS Sec 14.20, this exception is the one that 1067 * will be in effect even if task.run throws. 1068 * 1069 * The net effect of the exception mechanics is that afterExecute 1070 * and the thread's UncaughtExceptionHandler have as accurate 1071 * information as we can provide about any problems encountered by 1072 * user code. 1073 * 1074 * @param w the worker 1075 */ runWorker(Worker w)1076 final void runWorker(Worker w) { 1077 Runnable task = w.firstTask; 1078 w.firstTask = null; 1079 boolean completedAbruptly = true; 1080 try { 1081 while (task != null || (task = getTask()) != null) { 1082 w.lock(); 1083 clearInterruptsForTaskRun(); 1084 try { 1085 beforeExecute(w.thread, task); 1086 Throwable thrown = null; 1087 try { 1088 task.run(); 1089 } catch (RuntimeException x) { 1090 thrown = x; throw x; 1091 } catch (Error x) { 1092 thrown = x; throw x; 1093 } catch (Throwable x) { 1094 thrown = x; throw new Error(x); 1095 } finally { 1096 afterExecute(task, thrown); 1097 } 1098 } finally { 1099 task = null; 1100 w.completedTasks++; 1101 w.unlock(); 1102 } 1103 } 1104 completedAbruptly = false; 1105 } finally { 1106 processWorkerExit(w, completedAbruptly); 1107 } 1108 } 1109 1110 // Public constructors and methods 1111 1112 /** 1113 * Creates a new {@code ThreadPoolExecutor} with the given initial 1114 * parameters and default thread factory and rejected execution handler. 1115 * It may be more convenient to use one of the {@link Executors} factory 1116 * methods instead of this general purpose constructor. 1117 * 1118 * @param corePoolSize the number of threads to keep in the pool, even 1119 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 1120 * @param maximumPoolSize the maximum number of threads to allow in the 1121 * pool 1122 * @param keepAliveTime when the number of threads is greater than 1123 * the core, this is the maximum time that excess idle threads 1124 * will wait for new tasks before terminating. 1125 * @param unit the time unit for the {@code keepAliveTime} argument 1126 * @param workQueue the queue to use for holding tasks before they are 1127 * executed. This queue will hold only the {@code Runnable} 1128 * tasks submitted by the {@code execute} method. 1129 * @throws IllegalArgumentException if one of the following holds:<br> 1130 * {@code corePoolSize < 0}<br> 1131 * {@code keepAliveTime < 0}<br> 1132 * {@code maximumPoolSize <= 0}<br> 1133 * {@code maximumPoolSize < corePoolSize} 1134 * @throws NullPointerException if {@code workQueue} is null 1135 */ ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)1136 public ThreadPoolExecutor(int corePoolSize, 1137 int maximumPoolSize, 1138 long keepAliveTime, 1139 TimeUnit unit, 1140 BlockingQueue<Runnable> workQueue) { 1141 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 1142 Executors.defaultThreadFactory(), defaultHandler); 1143 } 1144 1145 /** 1146 * Creates a new {@code ThreadPoolExecutor} with the given initial 1147 * parameters and default rejected execution handler. 1148 * 1149 * @param corePoolSize the number of threads to keep in the pool, even 1150 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 1151 * @param maximumPoolSize the maximum number of threads to allow in the 1152 * pool 1153 * @param keepAliveTime when the number of threads is greater than 1154 * the core, this is the maximum time that excess idle threads 1155 * will wait for new tasks before terminating. 1156 * @param unit the time unit for the {@code keepAliveTime} argument 1157 * @param workQueue the queue to use for holding tasks before they are 1158 * executed. This queue will hold only the {@code Runnable} 1159 * tasks submitted by the {@code execute} method. 1160 * @param threadFactory the factory to use when the executor 1161 * creates a new thread 1162 * @throws IllegalArgumentException if one of the following holds:<br> 1163 * {@code corePoolSize < 0}<br> 1164 * {@code keepAliveTime < 0}<br> 1165 * {@code maximumPoolSize <= 0}<br> 1166 * {@code maximumPoolSize < corePoolSize} 1167 * @throws NullPointerException if {@code workQueue} 1168 * or {@code threadFactory} is null 1169 */ ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)1170 public ThreadPoolExecutor(int corePoolSize, 1171 int maximumPoolSize, 1172 long keepAliveTime, 1173 TimeUnit unit, 1174 BlockingQueue<Runnable> workQueue, 1175 ThreadFactory threadFactory) { 1176 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 1177 threadFactory, defaultHandler); 1178 } 1179 1180 /** 1181 * Creates a new {@code ThreadPoolExecutor} with the given initial 1182 * parameters and default thread factory. 1183 * 1184 * @param corePoolSize the number of threads to keep in the pool, even 1185 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 1186 * @param maximumPoolSize the maximum number of threads to allow in the 1187 * pool 1188 * @param keepAliveTime when the number of threads is greater than 1189 * the core, this is the maximum time that excess idle threads 1190 * will wait for new tasks before terminating. 1191 * @param unit the time unit for the {@code keepAliveTime} argument 1192 * @param workQueue the queue to use for holding tasks before they are 1193 * executed. This queue will hold only the {@code Runnable} 1194 * tasks submitted by the {@code execute} method. 1195 * @param handler the handler to use when execution is blocked 1196 * because the thread bounds and queue capacities are reached 1197 * @throws IllegalArgumentException if one of the following holds:<br> 1198 * {@code corePoolSize < 0}<br> 1199 * {@code keepAliveTime < 0}<br> 1200 * {@code maximumPoolSize <= 0}<br> 1201 * {@code maximumPoolSize < corePoolSize} 1202 * @throws NullPointerException if {@code workQueue} 1203 * or {@code handler} is null 1204 */ ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)1205 public ThreadPoolExecutor(int corePoolSize, 1206 int maximumPoolSize, 1207 long keepAliveTime, 1208 TimeUnit unit, 1209 BlockingQueue<Runnable> workQueue, 1210 RejectedExecutionHandler handler) { 1211 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 1212 Executors.defaultThreadFactory(), handler); 1213 } 1214 1215 /** 1216 * Creates a new {@code ThreadPoolExecutor} with the given initial 1217 * parameters. 1218 * 1219 * @param corePoolSize the number of threads to keep in the pool, even 1220 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 1221 * @param maximumPoolSize the maximum number of threads to allow in the 1222 * pool 1223 * @param keepAliveTime when the number of threads is greater than 1224 * the core, this is the maximum time that excess idle threads 1225 * will wait for new tasks before terminating. 1226 * @param unit the time unit for the {@code keepAliveTime} argument 1227 * @param workQueue the queue to use for holding tasks before they are 1228 * executed. This queue will hold only the {@code Runnable} 1229 * tasks submitted by the {@code execute} method. 1230 * @param threadFactory the factory to use when the executor 1231 * creates a new thread 1232 * @param handler the handler to use when execution is blocked 1233 * because the thread bounds and queue capacities are reached 1234 * @throws IllegalArgumentException if one of the following holds:<br> 1235 * {@code corePoolSize < 0}<br> 1236 * {@code keepAliveTime < 0}<br> 1237 * {@code maximumPoolSize <= 0}<br> 1238 * {@code maximumPoolSize < corePoolSize} 1239 * @throws NullPointerException if {@code workQueue} 1240 * or {@code threadFactory} or {@code handler} is null 1241 */ ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)1242 public ThreadPoolExecutor(int corePoolSize, 1243 int maximumPoolSize, 1244 long keepAliveTime, 1245 TimeUnit unit, 1246 BlockingQueue<Runnable> workQueue, 1247 ThreadFactory threadFactory, 1248 RejectedExecutionHandler handler) { 1249 if (corePoolSize < 0 || 1250 maximumPoolSize <= 0 || 1251 maximumPoolSize < corePoolSize || 1252 keepAliveTime < 0) 1253 throw new IllegalArgumentException(); 1254 if (workQueue == null || threadFactory == null || handler == null) 1255 throw new NullPointerException(); 1256 this.corePoolSize = corePoolSize; 1257 this.maximumPoolSize = maximumPoolSize; 1258 this.workQueue = workQueue; 1259 this.keepAliveTime = unit.toNanos(keepAliveTime); 1260 this.threadFactory = threadFactory; 1261 this.handler = handler; 1262 } 1263 1264 /** 1265 * Executes the given task sometime in the future. The task 1266 * may execute in a new thread or in an existing pooled thread. 1267 * 1268 * If the task cannot be submitted for execution, either because this 1269 * executor has been shutdown or because its capacity has been reached, 1270 * the task is handled by the current {@code RejectedExecutionHandler}. 1271 * 1272 * @param command the task to execute 1273 * @throws RejectedExecutionException at discretion of 1274 * {@code RejectedExecutionHandler}, if the task 1275 * cannot be accepted for execution 1276 * @throws NullPointerException if {@code command} is null 1277 */ execute(Runnable command)1278 public void execute(Runnable command) { 1279 if (command == null) 1280 throw new NullPointerException(); 1281 /* 1282 * Proceed in 3 steps: 1283 * 1284 * 1. If fewer than corePoolSize threads are running, try to 1285 * start a new thread with the given command as its first 1286 * task. The call to addWorker atomically checks runState and 1287 * workerCount, and so prevents false alarms that would add 1288 * threads when it shouldn't, by returning false. 1289 * 1290 * 2. If a task can be successfully queued, then we still need 1291 * to double-check whether we should have added a thread 1292 * (because existing ones died since last checking) or that 1293 * the pool shut down since entry into this method. So we 1294 * recheck state and if necessary roll back the enqueuing if 1295 * stopped, or start a new thread if there are none. 1296 * 1297 * 3. If we cannot queue task, then we try to add a new 1298 * thread. If it fails, we know we are shut down or saturated 1299 * and so reject the task. 1300 */ 1301 int c = ctl.get(); 1302 if (workerCountOf(c) < corePoolSize) { 1303 if (addWorker(command, true)) 1304 return; 1305 c = ctl.get(); 1306 } 1307 if (isRunning(c) && workQueue.offer(command)) { 1308 int recheck = ctl.get(); 1309 if (! isRunning(recheck) && remove(command)) 1310 reject(command); 1311 else if (workerCountOf(recheck) == 0) 1312 addWorker(null, false); 1313 } 1314 else if (!addWorker(command, false)) 1315 reject(command); 1316 } 1317 1318 /** 1319 * Initiates an orderly shutdown in which previously submitted 1320 * tasks are executed, but no new tasks will be accepted. 1321 * Invocation has no additional effect if already shut down. 1322 * 1323 * <p>This method does not wait for previously submitted tasks to 1324 * complete execution. Use {@link #awaitTermination awaitTermination} 1325 * to do that. 1326 * 1327 * @throws SecurityException {@inheritDoc} 1328 */ shutdown()1329 public void shutdown() { 1330 final ReentrantLock mainLock = this.mainLock; 1331 mainLock.lock(); 1332 try { 1333 checkShutdownAccess(); 1334 advanceRunState(SHUTDOWN); 1335 interruptIdleWorkers(); 1336 onShutdown(); // hook for ScheduledThreadPoolExecutor 1337 } finally { 1338 mainLock.unlock(); 1339 } 1340 tryTerminate(); 1341 } 1342 1343 /** 1344 * Attempts to stop all actively executing tasks, halts the 1345 * processing of waiting tasks, and returns a list of the tasks 1346 * that were awaiting execution. These tasks are drained (removed) 1347 * from the task queue upon return from this method. 1348 * 1349 * <p>This method does not wait for actively executing tasks to 1350 * terminate. Use {@link #awaitTermination awaitTermination} to 1351 * do that. 1352 * 1353 * <p>There are no guarantees beyond best-effort attempts to stop 1354 * processing actively executing tasks. This implementation 1355 * cancels tasks via {@link Thread#interrupt}, so any task that 1356 * fails to respond to interrupts may never terminate. 1357 * 1358 * @throws SecurityException {@inheritDoc} 1359 */ shutdownNow()1360 public List<Runnable> shutdownNow() { 1361 List<Runnable> tasks; 1362 final ReentrantLock mainLock = this.mainLock; 1363 mainLock.lock(); 1364 try { 1365 checkShutdownAccess(); 1366 advanceRunState(STOP); 1367 interruptWorkers(); 1368 tasks = drainQueue(); 1369 } finally { 1370 mainLock.unlock(); 1371 } 1372 tryTerminate(); 1373 return tasks; 1374 } 1375 isShutdown()1376 public boolean isShutdown() { 1377 return ! isRunning(ctl.get()); 1378 } 1379 1380 /** 1381 * Returns true if this executor is in the process of terminating 1382 * after {@link #shutdown} or {@link #shutdownNow} but has not 1383 * completely terminated. This method may be useful for 1384 * debugging. A return of {@code true} reported a sufficient 1385 * period after shutdown may indicate that submitted tasks have 1386 * ignored or suppressed interruption, causing this executor not 1387 * to properly terminate. 1388 * 1389 * @return true if terminating but not yet terminated 1390 */ isTerminating()1391 public boolean isTerminating() { 1392 int c = ctl.get(); 1393 return ! isRunning(c) && runStateLessThan(c, TERMINATED); 1394 } 1395 isTerminated()1396 public boolean isTerminated() { 1397 return runStateAtLeast(ctl.get(), TERMINATED); 1398 } 1399 awaitTermination(long timeout, TimeUnit unit)1400 public boolean awaitTermination(long timeout, TimeUnit unit) 1401 throws InterruptedException { 1402 long nanos = unit.toNanos(timeout); 1403 final ReentrantLock mainLock = this.mainLock; 1404 mainLock.lock(); 1405 try { 1406 for (;;) { 1407 if (runStateAtLeast(ctl.get(), TERMINATED)) 1408 return true; 1409 if (nanos <= 0) 1410 return false; 1411 nanos = termination.awaitNanos(nanos); 1412 } 1413 } finally { 1414 mainLock.unlock(); 1415 } 1416 } 1417 1418 /** 1419 * Invokes {@code shutdown} when this executor is no longer 1420 * referenced and it has no threads. 1421 */ finalize()1422 @Override protected void finalize() { 1423 try { 1424 shutdown(); 1425 } finally { 1426 try { 1427 super.finalize(); 1428 } catch (Throwable t) { 1429 throw new AssertionError(t); 1430 } 1431 } 1432 } 1433 1434 /** 1435 * Sets the thread factory used to create new threads. 1436 * 1437 * @param threadFactory the new thread factory 1438 * @throws NullPointerException if threadFactory is null 1439 * @see #getThreadFactory 1440 */ setThreadFactory(ThreadFactory threadFactory)1441 public void setThreadFactory(ThreadFactory threadFactory) { 1442 if (threadFactory == null) 1443 throw new NullPointerException(); 1444 this.threadFactory = threadFactory; 1445 } 1446 1447 /** 1448 * Returns the thread factory used to create new threads. 1449 * 1450 * @return the current thread factory 1451 * @see #setThreadFactory 1452 */ getThreadFactory()1453 public ThreadFactory getThreadFactory() { 1454 return threadFactory; 1455 } 1456 1457 /** 1458 * Sets a new handler for unexecutable tasks. 1459 * 1460 * @param handler the new handler 1461 * @throws NullPointerException if handler is null 1462 * @see #getRejectedExecutionHandler 1463 */ setRejectedExecutionHandler(RejectedExecutionHandler handler)1464 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { 1465 if (handler == null) 1466 throw new NullPointerException(); 1467 this.handler = handler; 1468 } 1469 1470 /** 1471 * Returns the current handler for unexecutable tasks. 1472 * 1473 * @return the current handler 1474 * @see #setRejectedExecutionHandler 1475 */ getRejectedExecutionHandler()1476 public RejectedExecutionHandler getRejectedExecutionHandler() { 1477 return handler; 1478 } 1479 1480 /** 1481 * Sets the core number of threads. This overrides any value set 1482 * in the constructor. If the new value is smaller than the 1483 * current value, excess existing threads will be terminated when 1484 * they next become idle. If larger, new threads will, if needed, 1485 * be started to execute any queued tasks. 1486 * 1487 * @param corePoolSize the new core size 1488 * @throws IllegalArgumentException if {@code corePoolSize < 0} 1489 * @see #getCorePoolSize 1490 */ setCorePoolSize(int corePoolSize)1491 public void setCorePoolSize(int corePoolSize) { 1492 if (corePoolSize < 0) 1493 throw new IllegalArgumentException(); 1494 int delta = corePoolSize - this.corePoolSize; 1495 this.corePoolSize = corePoolSize; 1496 if (workerCountOf(ctl.get()) > corePoolSize) 1497 interruptIdleWorkers(); 1498 else if (delta > 0) { 1499 // We don't really know how many new threads are "needed". 1500 // As a heuristic, prestart enough new workers (up to new 1501 // core size) to handle the current number of tasks in 1502 // queue, but stop if queue becomes empty while doing so. 1503 int k = Math.min(delta, workQueue.size()); 1504 while (k-- > 0 && addWorker(null, true)) { 1505 if (workQueue.isEmpty()) 1506 break; 1507 } 1508 } 1509 } 1510 1511 /** 1512 * Returns the core number of threads. 1513 * 1514 * @return the core number of threads 1515 * @see #setCorePoolSize 1516 */ getCorePoolSize()1517 public int getCorePoolSize() { 1518 return corePoolSize; 1519 } 1520 1521 /** 1522 * Starts a core thread, causing it to idly wait for work. This 1523 * overrides the default policy of starting core threads only when 1524 * new tasks are executed. This method will return {@code false} 1525 * if all core threads have already been started. 1526 * 1527 * @return {@code true} if a thread was started 1528 */ prestartCoreThread()1529 public boolean prestartCoreThread() { 1530 return workerCountOf(ctl.get()) < corePoolSize && 1531 addWorker(null, true); 1532 } 1533 1534 /** 1535 * Starts all core threads, causing them to idly wait for work. This 1536 * overrides the default policy of starting core threads only when 1537 * new tasks are executed. 1538 * 1539 * @return the number of threads started 1540 */ prestartAllCoreThreads()1541 public int prestartAllCoreThreads() { 1542 int n = 0; 1543 while (addWorker(null, true)) 1544 ++n; 1545 return n; 1546 } 1547 1548 /** 1549 * Returns true if this pool allows core threads to time out and 1550 * terminate if no tasks arrive within the keepAlive time, being 1551 * replaced if needed when new tasks arrive. When true, the same 1552 * keep-alive policy applying to non-core threads applies also to 1553 * core threads. When false (the default), core threads are never 1554 * terminated due to lack of incoming tasks. 1555 * 1556 * @return {@code true} if core threads are allowed to time out, 1557 * else {@code false} 1558 * 1559 * @since 1.6 1560 */ allowsCoreThreadTimeOut()1561 public boolean allowsCoreThreadTimeOut() { 1562 return allowCoreThreadTimeOut; 1563 } 1564 1565 /** 1566 * Sets the policy governing whether core threads may time out and 1567 * terminate if no tasks arrive within the keep-alive time, being 1568 * replaced if needed when new tasks arrive. When false, core 1569 * threads are never terminated due to lack of incoming 1570 * tasks. When true, the same keep-alive policy applying to 1571 * non-core threads applies also to core threads. To avoid 1572 * continual thread replacement, the keep-alive time must be 1573 * greater than zero when setting {@code true}. This method 1574 * should in general be called before the pool is actively used. 1575 * 1576 * @param value {@code true} if should time out, else {@code false} 1577 * @throws IllegalArgumentException if value is {@code true} 1578 * and the current keep-alive time is not greater than zero 1579 * 1580 * @since 1.6 1581 */ allowCoreThreadTimeOut(boolean value)1582 public void allowCoreThreadTimeOut(boolean value) { 1583 if (value && keepAliveTime <= 0) 1584 throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); 1585 if (value != allowCoreThreadTimeOut) { 1586 allowCoreThreadTimeOut = value; 1587 if (value) 1588 interruptIdleWorkers(); 1589 } 1590 } 1591 1592 /** 1593 * Sets the maximum allowed number of threads. This overrides any 1594 * value set in the constructor. If the new value is smaller than 1595 * the current value, excess existing threads will be 1596 * terminated when they next become idle. 1597 * 1598 * @param maximumPoolSize the new maximum 1599 * @throws IllegalArgumentException if the new maximum is 1600 * less than or equal to zero, or 1601 * less than the {@linkplain #getCorePoolSize core pool size} 1602 * @see #getMaximumPoolSize 1603 */ setMaximumPoolSize(int maximumPoolSize)1604 public void setMaximumPoolSize(int maximumPoolSize) { 1605 if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) 1606 throw new IllegalArgumentException(); 1607 this.maximumPoolSize = maximumPoolSize; 1608 if (workerCountOf(ctl.get()) > maximumPoolSize) 1609 interruptIdleWorkers(); 1610 } 1611 1612 /** 1613 * Returns the maximum allowed number of threads. 1614 * 1615 * @return the maximum allowed number of threads 1616 * @see #setMaximumPoolSize 1617 */ getMaximumPoolSize()1618 public int getMaximumPoolSize() { 1619 return maximumPoolSize; 1620 } 1621 1622 /** 1623 * Sets the time limit for which threads may remain idle before 1624 * being terminated. If there are more than the core number of 1625 * threads currently in the pool, after waiting this amount of 1626 * time without processing a task, excess threads will be 1627 * terminated. This overrides any value set in the constructor. 1628 * 1629 * @param time the time to wait. A time value of zero will cause 1630 * excess threads to terminate immediately after executing tasks. 1631 * @param unit the time unit of the {@code time} argument 1632 * @throws IllegalArgumentException if {@code time} less than zero or 1633 * if {@code time} is zero and {@code allowsCoreThreadTimeOut} 1634 * @see #getKeepAliveTime 1635 */ setKeepAliveTime(long time, TimeUnit unit)1636 public void setKeepAliveTime(long time, TimeUnit unit) { 1637 if (time < 0) 1638 throw new IllegalArgumentException(); 1639 if (time == 0 && allowsCoreThreadTimeOut()) 1640 throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); 1641 long keepAliveTime = unit.toNanos(time); 1642 long delta = keepAliveTime - this.keepAliveTime; 1643 this.keepAliveTime = keepAliveTime; 1644 if (delta < 0) 1645 interruptIdleWorkers(); 1646 } 1647 1648 /** 1649 * Returns the thread keep-alive time, which is the amount of time 1650 * that threads in excess of the core pool size may remain 1651 * idle before being terminated. 1652 * 1653 * @param unit the desired time unit of the result 1654 * @return the time limit 1655 * @see #setKeepAliveTime 1656 */ getKeepAliveTime(TimeUnit unit)1657 public long getKeepAliveTime(TimeUnit unit) { 1658 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS); 1659 } 1660 1661 /* User-level queue utilities */ 1662 1663 /** 1664 * Returns the task queue used by this executor. Access to the 1665 * task queue is intended primarily for debugging and monitoring. 1666 * This queue may be in active use. Retrieving the task queue 1667 * does not prevent queued tasks from executing. 1668 * 1669 * @return the task queue 1670 */ getQueue()1671 public BlockingQueue<Runnable> getQueue() { 1672 return workQueue; 1673 } 1674 1675 /** 1676 * Removes this task from the executor's internal queue if it is 1677 * present, thus causing it not to be run if it has not already 1678 * started. 1679 * 1680 * <p> This method may be useful as one part of a cancellation 1681 * scheme. It may fail to remove tasks that have been converted 1682 * into other forms before being placed on the internal queue. For 1683 * example, a task entered using {@code submit} might be 1684 * converted into a form that maintains {@code Future} status. 1685 * However, in such cases, method {@link #purge} may be used to 1686 * remove those Futures that have been cancelled. 1687 * 1688 * @param task the task to remove 1689 * @return true if the task was removed 1690 */ remove(Runnable task)1691 public boolean remove(Runnable task) { 1692 boolean removed = workQueue.remove(task); 1693 tryTerminate(); // In case SHUTDOWN and now empty 1694 return removed; 1695 } 1696 1697 /** 1698 * Tries to remove from the work queue all {@link Future} 1699 * tasks that have been cancelled. This method can be useful as a 1700 * storage reclamation operation, that has no other impact on 1701 * functionality. Cancelled tasks are never executed, but may 1702 * accumulate in work queues until worker threads can actively 1703 * remove them. Invoking this method instead tries to remove them now. 1704 * However, this method may fail to remove tasks in 1705 * the presence of interference by other threads. 1706 */ purge()1707 public void purge() { 1708 final BlockingQueue<Runnable> q = workQueue; 1709 try { 1710 Iterator<Runnable> it = q.iterator(); 1711 while (it.hasNext()) { 1712 Runnable r = it.next(); 1713 if (r instanceof Future<?> && ((Future<?>)r).isCancelled()) 1714 it.remove(); 1715 } 1716 } catch (ConcurrentModificationException fallThrough) { 1717 // Take slow path if we encounter interference during traversal. 1718 // Make copy for traversal and call remove for cancelled entries. 1719 // The slow path is more likely to be O(N*N). 1720 for (Object r : q.toArray()) 1721 if (r instanceof Future<?> && ((Future<?>)r).isCancelled()) 1722 q.remove(r); 1723 } 1724 1725 tryTerminate(); // In case SHUTDOWN and now empty 1726 } 1727 1728 /* Statistics */ 1729 1730 /** 1731 * Returns the current number of threads in the pool. 1732 * 1733 * @return the number of threads 1734 */ getPoolSize()1735 public int getPoolSize() { 1736 final ReentrantLock mainLock = this.mainLock; 1737 mainLock.lock(); 1738 try { 1739 // Remove rare and surprising possibility of 1740 // isTerminated() && getPoolSize() > 0 1741 return runStateAtLeast(ctl.get(), TIDYING) ? 0 1742 : workers.size(); 1743 } finally { 1744 mainLock.unlock(); 1745 } 1746 } 1747 1748 /** 1749 * Returns the approximate number of threads that are actively 1750 * executing tasks. 1751 * 1752 * @return the number of threads 1753 */ getActiveCount()1754 public int getActiveCount() { 1755 final ReentrantLock mainLock = this.mainLock; 1756 mainLock.lock(); 1757 try { 1758 int n = 0; 1759 for (Worker w : workers) 1760 if (w.isLocked()) 1761 ++n; 1762 return n; 1763 } finally { 1764 mainLock.unlock(); 1765 } 1766 } 1767 1768 /** 1769 * Returns the largest number of threads that have ever 1770 * simultaneously been in the pool. 1771 * 1772 * @return the number of threads 1773 */ getLargestPoolSize()1774 public int getLargestPoolSize() { 1775 final ReentrantLock mainLock = this.mainLock; 1776 mainLock.lock(); 1777 try { 1778 return largestPoolSize; 1779 } finally { 1780 mainLock.unlock(); 1781 } 1782 } 1783 1784 /** 1785 * Returns the approximate total number of tasks that have ever been 1786 * scheduled for execution. Because the states of tasks and 1787 * threads may change dynamically during computation, the returned 1788 * value is only an approximation. 1789 * 1790 * @return the number of tasks 1791 */ getTaskCount()1792 public long getTaskCount() { 1793 final ReentrantLock mainLock = this.mainLock; 1794 mainLock.lock(); 1795 try { 1796 long n = completedTaskCount; 1797 for (Worker w : workers) { 1798 n += w.completedTasks; 1799 if (w.isLocked()) 1800 ++n; 1801 } 1802 return n + workQueue.size(); 1803 } finally { 1804 mainLock.unlock(); 1805 } 1806 } 1807 1808 /** 1809 * Returns the approximate total number of tasks that have 1810 * completed execution. Because the states of tasks and threads 1811 * may change dynamically during computation, the returned value 1812 * is only an approximation, but one that does not ever decrease 1813 * across successive calls. 1814 * 1815 * @return the number of tasks 1816 */ getCompletedTaskCount()1817 public long getCompletedTaskCount() { 1818 final ReentrantLock mainLock = this.mainLock; 1819 mainLock.lock(); 1820 try { 1821 long n = completedTaskCount; 1822 for (Worker w : workers) 1823 n += w.completedTasks; 1824 return n; 1825 } finally { 1826 mainLock.unlock(); 1827 } 1828 } 1829 1830 /* Extension hooks */ 1831 1832 /** 1833 * Method invoked prior to executing the given Runnable in the 1834 * given thread. This method is invoked by thread {@code t} that 1835 * will execute task {@code r}, and may be used to re-initialize 1836 * ThreadLocals, or to perform logging. 1837 * 1838 * <p>This implementation does nothing, but may be customized in 1839 * subclasses. Note: To properly nest multiple overridings, subclasses 1840 * should generally invoke {@code super.beforeExecute} at the end of 1841 * this method. 1842 * 1843 * @param t the thread that will run task {@code r} 1844 * @param r the task that will be executed 1845 */ beforeExecute(Thread t, Runnable r)1846 protected void beforeExecute(Thread t, Runnable r) { } 1847 1848 /** 1849 * Method invoked upon completion of execution of the given Runnable. 1850 * This method is invoked by the thread that executed the task. If 1851 * non-null, the Throwable is the uncaught {@code RuntimeException} 1852 * or {@code Error} that caused execution to terminate abruptly. 1853 * 1854 * <p>This implementation does nothing, but may be customized in 1855 * subclasses. Note: To properly nest multiple overridings, subclasses 1856 * should generally invoke {@code super.afterExecute} at the 1857 * beginning of this method. 1858 * 1859 * <p><b>Note:</b> When actions are enclosed in tasks (such as 1860 * {@link FutureTask}) either explicitly or via methods such as 1861 * {@code submit}, these task objects catch and maintain 1862 * computational exceptions, and so they do not cause abrupt 1863 * termination, and the internal exceptions are <em>not</em> 1864 * passed to this method. If you would like to trap both kinds of 1865 * failures in this method, you can further probe for such cases, 1866 * as in this sample subclass that prints either the direct cause 1867 * or the underlying exception if a task has been aborted: 1868 * 1869 * <pre> {@code 1870 * class ExtendedExecutor extends ThreadPoolExecutor { 1871 * // ... 1872 * protected void afterExecute(Runnable r, Throwable t) { 1873 * super.afterExecute(r, t); 1874 * if (t == null && r instanceof Future<?>) { 1875 * try { 1876 * Object result = ((Future<?>) r).get(); 1877 * } catch (CancellationException ce) { 1878 * t = ce; 1879 * } catch (ExecutionException ee) { 1880 * t = ee.getCause(); 1881 * } catch (InterruptedException ie) { 1882 * Thread.currentThread().interrupt(); // ignore/reset 1883 * } 1884 * } 1885 * if (t != null) 1886 * System.out.println(t); 1887 * } 1888 * }}</pre> 1889 * 1890 * @param r the runnable that has completed 1891 * @param t the exception that caused termination, or null if 1892 * execution completed normally 1893 */ afterExecute(Runnable r, Throwable t)1894 protected void afterExecute(Runnable r, Throwable t) { } 1895 1896 /** 1897 * Method invoked when the Executor has terminated. Default 1898 * implementation does nothing. Note: To properly nest multiple 1899 * overridings, subclasses should generally invoke 1900 * {@code super.terminated} within this method. 1901 */ terminated()1902 protected void terminated() { } 1903 1904 /* Predefined RejectedExecutionHandlers */ 1905 1906 /** 1907 * A handler for rejected tasks that runs the rejected task 1908 * directly in the calling thread of the {@code execute} method, 1909 * unless the executor has been shut down, in which case the task 1910 * is discarded. 1911 */ 1912 public static class CallerRunsPolicy implements RejectedExecutionHandler { 1913 /** 1914 * Creates a {@code CallerRunsPolicy}. 1915 */ CallerRunsPolicy()1916 public CallerRunsPolicy() { } 1917 1918 /** 1919 * Executes task r in the caller's thread, unless the executor 1920 * has been shut down, in which case the task is discarded. 1921 * 1922 * @param r the runnable task requested to be executed 1923 * @param e the executor attempting to execute this task 1924 */ rejectedExecution(Runnable r, ThreadPoolExecutor e)1925 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 1926 if (!e.isShutdown()) { 1927 r.run(); 1928 } 1929 } 1930 } 1931 1932 /** 1933 * A handler for rejected tasks that throws a 1934 * {@code RejectedExecutionException}. 1935 */ 1936 public static class AbortPolicy implements RejectedExecutionHandler { 1937 /** 1938 * Creates an {@code AbortPolicy}. 1939 */ AbortPolicy()1940 public AbortPolicy() { } 1941 1942 /** 1943 * Always throws RejectedExecutionException. 1944 * 1945 * @param r the runnable task requested to be executed 1946 * @param e the executor attempting to execute this task 1947 * @throws RejectedExecutionException always. 1948 */ rejectedExecution(Runnable r, ThreadPoolExecutor e)1949 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 1950 // BEGIN android-changed 1951 // provide diagnostic messaging for a common exception 1952 1953 // a message is helpful even if it isn't created atomically 1954 int queueSize = e.getQueue().size(); 1955 int remainingCapacity = e.getQueue().remainingCapacity(); 1956 String message = "pool=" + e.getPoolSize() + "/" + e.maximumPoolSize 1957 + ", queue=" + queueSize; 1958 if (remainingCapacity != Integer.MAX_VALUE) { 1959 message += "/" + (queueSize + remainingCapacity); 1960 } 1961 throw new RejectedExecutionException(message); 1962 1963 // END android-changed 1964 } 1965 } 1966 1967 /** 1968 * A handler for rejected tasks that silently discards the 1969 * rejected task. 1970 */ 1971 public static class DiscardPolicy implements RejectedExecutionHandler { 1972 /** 1973 * Creates a {@code DiscardPolicy}. 1974 */ DiscardPolicy()1975 public DiscardPolicy() { } 1976 1977 /** 1978 * Does nothing, which has the effect of discarding task r. 1979 * 1980 * @param r the runnable task requested to be executed 1981 * @param e the executor attempting to execute this task 1982 */ rejectedExecution(Runnable r, ThreadPoolExecutor e)1983 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 1984 } 1985 } 1986 1987 /** 1988 * A handler for rejected tasks that discards the oldest unhandled 1989 * request and then retries {@code execute}, unless the executor 1990 * is shut down, in which case the task is discarded. 1991 */ 1992 public static class DiscardOldestPolicy implements RejectedExecutionHandler { 1993 /** 1994 * Creates a {@code DiscardOldestPolicy} for the given executor. 1995 */ DiscardOldestPolicy()1996 public DiscardOldestPolicy() { } 1997 1998 /** 1999 * Obtains and ignores the next task that the executor 2000 * would otherwise execute, if one is immediately available, 2001 * and then retries execution of task r, unless the executor 2002 * is shut down, in which case task r is instead discarded. 2003 * 2004 * @param r the runnable task requested to be executed 2005 * @param e the executor attempting to execute this task 2006 */ rejectedExecution(Runnable r, ThreadPoolExecutor e)2007 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 2008 if (!e.isShutdown()) { 2009 e.getQueue().poll(); 2010 e.execute(r); 2011 } 2012 } 2013 } 2014 } 2015