/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/licenses/publicdomain
 */

package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
import java.util.*;

/**
 * An {@link ExecutorService} that executes each submitted task using
 * one of possibly several pooled threads, normally configured
 * using {@link Executors} factory methods.
 *
 * <p>Thread pools address two different problems: they usually
 * provide improved performance when executing large numbers of
 * asynchronous tasks, due to reduced per-task invocation overhead,
 * and they provide a means of bounding and managing the resources,
 * including threads, consumed when executing a collection of tasks.
 * Each {@code ThreadPoolExecutor} also maintains some basic
 * statistics, such as the number of completed tasks.
 *
 * <p>To be useful across a wide range of contexts, this class
 * provides many adjustable parameters and extensibility
 * hooks. However, programmers are urged to use the more convenient
 * {@link Executors} factory methods {@link
 * Executors#newCachedThreadPool} (unbounded thread pool, with
 * automatic thread reclamation), {@link Executors#newFixedThreadPool}
 * (fixed size thread pool) and {@link
 * Executors#newSingleThreadExecutor} (single background thread), that
 * preconfigure settings for the most common usage
 * scenarios. Otherwise, use the following guide when manually
 * configuring and tuning this class:
 *
 * <dl>
 *
 * <dt>Core and maximum pool sizes</dt>
 *
 * <dd>A {@code ThreadPoolExecutor} will automatically adjust the
 * pool size (see {@link #getPoolSize})
 * according to the bounds set by
 * corePoolSize (see {@link #getCorePoolSize}) and
 * maximumPoolSize (see {@link #getMaximumPoolSize}).
 *
 * When a new task is submitted in method {@link #execute}, and fewer
 * than corePoolSize threads are running, a new thread is created to
 * handle the request, even if other worker threads are idle. If
 * there are at least corePoolSize but fewer than maximumPoolSize
 * threads running, a new thread will be created only if the queue is
 * full. By setting corePoolSize and maximumPoolSize the same, you
 * create a fixed-size thread pool. By setting maximumPoolSize to an
 * essentially unbounded value such as {@code Integer.MAX_VALUE}, you
 * allow the pool to accommodate an arbitrary number of concurrent
 * tasks. Most typically, core and maximum pool sizes are set only
 * upon construction, but they may also be changed dynamically using
 * {@link #setCorePoolSize} and {@link #setMaximumPoolSize}. </dd>
 *
 * <dt>On-demand construction</dt>
 *
 * <dd> By default, even core threads are initially created and
 * started only when new tasks arrive, but this can be overridden
 * dynamically using method {@link #prestartCoreThread} or {@link
 * #prestartAllCoreThreads}. You probably want to prestart threads if
 * you construct the pool with a non-empty queue. </dd>
 *
 * <dt>Creating new threads</dt>
 *
 * <dd>New threads are created using a {@link ThreadFactory}. If not
 * otherwise specified, a {@link Executors#defaultThreadFactory} is
 * used, which creates threads that are all in the same {@link
 * ThreadGroup} and have the same {@code NORM_PRIORITY} priority and
 * non-daemon status. By supplying a different ThreadFactory, you can
 * alter the thread's name, thread group, priority, daemon status,
 * etc.
 * If a {@code ThreadFactory} fails to create a thread when asked
 * by returning null from {@code newThread}, the executor will
 * continue, but might not be able to execute any tasks.</dd>
 *
 * <dt>Keep-alive times</dt>
 *
 * <dd>If the pool currently has more than corePoolSize threads,
 * excess threads will be terminated if they have been idle for more
 * than the keepAliveTime (see {@link #getKeepAliveTime}). This
 * provides a means of reducing resource consumption when the pool is
 * not being actively used. If the pool becomes more active later, new
 * threads will be constructed. This parameter can also be changed
 * dynamically using method {@link #setKeepAliveTime}. Using a value
 * of {@code Long.MAX_VALUE} {@link TimeUnit#NANOSECONDS} effectively
 * disables idle threads from ever terminating prior to shut down. By
 * default, the keep-alive policy applies only when there are more
 * than corePoolSize threads. But method {@link
 * #allowCoreThreadTimeOut(boolean)} can be used to apply this
 * time-out policy to core threads as well, so long as the
 * keepAliveTime value is non-zero. </dd>
 *
 * <dt>Queuing</dt>
 *
 * <dd>Any {@link BlockingQueue} may be used to transfer and hold
 * submitted tasks.
 * The use of this queue interacts with pool sizing:
 *
 * <ul>
 *
 * <li> If fewer than corePoolSize threads are running, the Executor
 * always prefers adding a new thread
 * rather than queuing.</li>
 *
 * <li> If corePoolSize or more threads are running, the Executor
 * always prefers queuing a request rather than adding a new
 * thread.</li>
 *
 * <li> If a request cannot be queued, a new thread is created unless
 * this would exceed maximumPoolSize, in which case, the task will be
 * rejected.</li>
 *
 * </ul>
 *
 * There are three general strategies for queuing:
 * <ol>
 *
 * <li> <em> Direct handoffs.</em> A good default choice for a work
 * queue is a {@link SynchronousQueue} that hands off tasks to threads
 * without otherwise holding them. Here, an attempt to queue a task
 * will fail if no threads are immediately available to run it, so a
 * new thread will be constructed. This policy avoids lockups when
 * handling sets of requests that might have internal dependencies.
 * Direct handoffs generally require unbounded maximumPoolSizes to
 * avoid rejection of newly submitted tasks. This in turn admits the
 * possibility of unbounded thread growth when commands continue to
 * arrive on average faster than they can be processed. </li>
 *
 * <li><em> Unbounded queues.</em> Using an unbounded queue (for
 * example a {@link LinkedBlockingQueue} without a predefined
 * capacity) will cause new tasks to wait in the queue when all
 * corePoolSize threads are busy. Thus, no more than corePoolSize
 * threads will ever be created. (And the value of the maximumPoolSize
 * therefore doesn't have any effect.) This may be appropriate when
 * each task is completely independent of others, so tasks cannot
 * affect each other's execution; for example, in a web page server.
 * While this style of queuing can be useful in smoothing out
 * transient bursts of requests, it admits the possibility of
 * unbounded work queue growth when commands continue to arrive on
 * average faster than they can be processed. </li>
 *
 * <li><em>Bounded queues.</em> A bounded queue (for example, an
 * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
 * used with finite maximumPoolSizes, but can be more difficult to
 * tune and control. Queue sizes and maximum pool sizes may be traded
 * off for each other: Using large queues and small pools minimizes
 * CPU usage, OS resources, and context-switching overhead, but can
 * lead to artificially low throughput. If tasks frequently block (for
 * example if they are I/O bound), a system may be able to schedule
 * time for more threads than you otherwise allow. Use of small queues
 * generally requires larger pool sizes, which keeps CPUs busier but
 * may encounter unacceptable scheduling overhead, which also
 * decreases throughput. </li>
 *
 * </ol>
 *
 * </dd>
 *
 * <dt>Rejected tasks</dt>
 *
 * <dd> New tasks submitted in method {@link #execute} will be
 * <em>rejected</em> when the Executor has been shut down, and also
 * when the Executor uses finite bounds for both maximum threads and
 * work queue capacity, and is saturated. In either case, the {@code
 * execute} method invokes the {@link
 * RejectedExecutionHandler#rejectedExecution} method of its {@link
 * RejectedExecutionHandler}. Four predefined handler policies are
 * provided:
 *
 * <ol>
 *
 * <li> In the default {@link ThreadPoolExecutor.AbortPolicy}, the
 * handler throws a runtime {@link RejectedExecutionException} upon
 * rejection. </li>
 *
 * <li> In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread
 * that invokes {@code execute} itself runs the task.
 * This provides a simple feedback control mechanism that will slow
 * down the rate at which new tasks are submitted. </li>
 *
 * <li> In {@link ThreadPoolExecutor.DiscardPolicy}, a task that
 * cannot be executed is simply dropped. </li>
 *
 * <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the
 * executor is not shut down, the task at the head of the work queue
 * is dropped, and then execution is retried (which can fail again,
 * causing this to be repeated). </li>
 *
 * </ol>
 *
 * It is possible to define and use other kinds of {@link
 * RejectedExecutionHandler} classes. Doing so requires some care,
 * especially when policies are designed to work only under particular
 * capacity or queuing policies. </dd>
 *
 * <dt>Hook methods</dt>
 *
 * <dd>This class provides {@code protected} overridable {@link
 * #beforeExecute} and {@link #afterExecute} methods that are called
 * before and after execution of each task. These can be used to
 * manipulate the execution environment; for example, reinitializing
 * ThreadLocals, gathering statistics, or adding log
 * entries. Additionally, method {@link #terminated} can be overridden
 * to perform any special processing that needs to be done once the
 * Executor has fully terminated.
 *
 * <p>If hook or callback methods throw exceptions, internal worker
 * threads may in turn fail and abruptly terminate.</dd>
 *
 * <dt>Queue maintenance</dt>
 *
 * <dd> Method {@link #getQueue} allows access to the work queue for
 * purposes of monitoring and debugging. Use of this method for any
 * other purpose is strongly discouraged.
 * Two supplied methods,
 * {@link #remove} and {@link #purge} are available to assist in
 * storage reclamation when large numbers of queued tasks become
 * cancelled.</dd>
 *
 * <dt>Finalization</dt>
 *
 * <dd> A pool that is no longer referenced in a program <em>AND</em>
 * has no remaining threads will be {@code shutdown} automatically. If
 * you would like to ensure that unreferenced pools are reclaimed even
 * if users forget to call {@link #shutdown}, then you must arrange
 * that unused threads eventually die, by setting appropriate
 * keep-alive times, using a lower bound of zero core threads and/or
 * setting {@link #allowCoreThreadTimeOut(boolean)}. </dd>
 *
 * </dl>
 *
 * <p> <b>Extension example</b>. Most extensions of this class
 * override one or more of the protected hook methods. For example,
 * here is a subclass that adds a simple pause/resume feature:
 *
 * <pre> {@code
 * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
 *   private boolean isPaused;
 *   private ReentrantLock pauseLock = new ReentrantLock();
 *   private Condition unpaused = pauseLock.newCondition();
 *
 *   public PausableThreadPoolExecutor(...)
 *       { super(...); }
 *
 *   protected void beforeExecute(Thread t, Runnable r) {
 *     super.beforeExecute(t, r);
 *     pauseLock.lock();
 *     try {
 *       while (isPaused) unpaused.await();
 *     } catch (InterruptedException ie) {
 *       t.interrupt();
 *     } finally {
 *       pauseLock.unlock();
 *     }
 *   }
 *
 *   public void pause() {
 *     pauseLock.lock();
 *     try {
 *       isPaused = true;
 *     } finally {
 *       pauseLock.unlock();
 *     }
 *   }
 *
 *   public void resume() {
 *     pauseLock.lock();
 *     try {
 *       isPaused = false;
 *       unpaused.signalAll();
 *     } finally {
 *       pauseLock.unlock();
 *     }
 *   }
 * }}</pre>
 *
 * @since 1.5
 * @author Doug Lea
 */
public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * The main pool control state, ctl, is an atomic integer packing
     * two conceptual fields
     *   workerCount, indicating the effective number of threads
     *   runState,    indicating whether running, shutting down etc
     *
     * In order to pack them into one int, we limit workerCount to
     * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
     * billion) otherwise representable. If this is ever an issue in
     * the future, the variable can be changed to be an AtomicLong,
     * and the shift/mask constants below adjusted. But until the need
     * arises, this code is a bit faster and simpler using an int.
     *
     * The workerCount is the number of workers that have been
     * permitted to start and not permitted to stop. The value may be
     * transiently different from the actual number of live threads,
     * for example when a ThreadFactory fails to create a thread when
     * asked, and when exiting threads are still performing
     * bookkeeping before terminating. The user-visible pool size is
     * reported as the current size of the workers set.
     *
     * The runState provides the main lifecycle control, taking on values:
     *
     *   RUNNING:    Accept new tasks and process queued tasks
     *   SHUTDOWN:   Don't accept new tasks, but process queued tasks
     *   STOP:       Don't accept new tasks, don't process queued tasks,
     *               and interrupt in-progress tasks
     *   TIDYING:    All tasks have terminated, workerCount is zero,
     *               the thread transitioning to state TIDYING
     *               will run the terminated() hook method
     *   TERMINATED: terminated() has completed
     *
     * The numerical order among these values matters, to allow
     * ordered comparisons. The runState monotonically increases over
     * time, but need not hit each state. The transitions are:
     *
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
     *
     * Threads waiting in awaitTermination() will return when the
     * state reaches TERMINATED.
     *
     * Detecting the transition from SHUTDOWN to TIDYING is less
     * straightforward than you'd like because the queue may become
     * empty after non-empty and vice versa during SHUTDOWN state, but
     * we can only terminate if, after seeing that it is empty, we see
     * that workerCount is 0 (which sometimes entails a recheck -- see
     * below).
     */
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

    /*
     * Bit field accessors that don't require unpacking ctl.
     * These depend on the bit layout and on workerCount being never negative.
     */

    private static boolean runStateLessThan(int c, int s) {
        return c < s;
    }

    private static boolean runStateAtLeast(int c, int s) {
        return c >= s;
    }

    private static boolean isRunning(int c) {
        return c < SHUTDOWN;
    }

    /**
     * Attempt to CAS-increment the workerCount field of ctl.
     */
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    /**
     * Attempt to CAS-decrement the workerCount field of ctl.
     */
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    /**
     * Decrements the workerCount field of ctl. This is called only on
     * abrupt termination of a thread (see processWorkerExit).
     * Other decrements are performed within getTask.
     */
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

    /**
     * The queue used for holding tasks and handing off to worker
     * threads. We do not require that workQueue.poll() returning
     * null necessarily means that workQueue.isEmpty(), so rely
     * solely on isEmpty to see if the queue is empty (which we must
     * do for example when deciding whether to transition from
     * SHUTDOWN to TIDYING). This accommodates special-purpose
     * queues such as DelayQueues for which poll() is allowed to
     * return null even if it may later return non-null when delays
     * expire.
     */
    private final BlockingQueue<Runnable> workQueue;

    /**
     * Lock held on access to workers set and related bookkeeping.
     * While we could use a concurrent set of some sort, it turns out
     * to be generally preferable to use a lock. Among the reasons is
     * that this serializes interruptIdleWorkers, which avoids
     * unnecessary interrupt storms, especially during shutdown.
     * Otherwise exiting threads would concurrently interrupt those
     * that have not yet been interrupted. It also simplifies some of
     * the associated statistics bookkeeping of largestPoolSize etc. We
     * also hold mainLock on shutdown and shutdownNow, for the sake of
     * ensuring workers set is stable while separately checking
     * permission to interrupt and actually interrupting.
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * Set containing all worker threads in pool. Accessed only when
     * holding mainLock.
     */
    private final HashSet<Worker> workers = new HashSet<Worker>();

    /**
     * Wait condition to support awaitTermination
     */
    private final Condition termination = mainLock.newCondition();

    /**
     * Tracks largest attained pool size.
     * Accessed only under mainLock.
     */
    private int largestPoolSize;

    /**
     * Counter for completed tasks. Updated only on termination of
     * worker threads. Accessed only under mainLock.
     */
    private long completedTaskCount;

    /*
     * All user control parameters are declared as volatiles so that
     * ongoing actions are based on freshest values, but without need
     * for locking, since no internal invariants depend on them
     * changing synchronously with respect to other actions.
     */

    /**
     * Factory for new threads. All threads are created using this
     * factory (via method addWorker). All callers must be prepared
     * for addWorker to fail, which may reflect a system or user's
     * policy limiting the number of threads. Even though it is not
     * treated as an error, failure to create threads may result in
     * new tasks being rejected or existing ones remaining stuck in
     * the queue. On the other hand, no special precautions exist to
     * handle OutOfMemoryErrors that might be thrown while trying to
     * create threads, since there is generally no recourse from
     * within this class.
     */
    private volatile ThreadFactory threadFactory;

    /**
     * Handler called when saturated or shutdown in execute.
     */
    private volatile RejectedExecutionHandler handler;

    /**
     * Timeout in nanoseconds for idle threads waiting for work.
     * Threads use this timeout when there are more than corePoolSize
     * present or if allowCoreThreadTimeOut. Otherwise they wait
     * forever for new work.
     */
    private volatile long keepAliveTime;

    /**
     * If false (default), core threads stay alive even when idle.
     * If true, core threads use keepAliveTime to time out waiting
     * for work.
     */
    private volatile boolean allowCoreThreadTimeOut;

    /**
     * Core pool size is the minimum number of workers to keep alive
     * (and not allow to time out etc) unless allowCoreThreadTimeOut
     * is set, in which case the minimum is zero.
     */
    private volatile int corePoolSize;

    /**
     * Maximum pool size. Note that the actual maximum is internally
     * bounded by CAPACITY.
     */
    private volatile int maximumPoolSize;

    /**
     * The default rejected execution handler
     */
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

    /**
     * Permission required for callers of shutdown and shutdownNow.
     * We additionally require (see checkShutdownAccess) that callers
     * have permission to actually interrupt threads in the worker set
     * (as governed by Thread.interrupt, which relies on
     * ThreadGroup.checkAccess, which in turn relies on
     * SecurityManager.checkAccess). Shutdowns are attempted only if
     * these checks pass.
     *
     * All actual invocations of Thread.interrupt (see
     * interruptIdleWorkers and interruptWorkers) ignore
     * SecurityExceptions, meaning that the attempted interrupts
     * silently fail. In the case of shutdown, they should not fail
     * unless the SecurityManager has inconsistent policies, sometimes
     * allowing access to a thread and sometimes not. In such cases,
     * failure to actually interrupt threads may disable or delay full
     * termination. Other uses of interruptIdleWorkers are advisory,
     * and failure to actually interrupt will merely delay response to
     * configuration changes so is not handled exceptionally.
     */
    private static final RuntimePermission shutdownPerm =
        new RuntimePermission("modifyThread");

    /**
     * Class Worker mainly maintains interrupt control state for
     * threads running tasks, along with other minor bookkeeping.
     * This class opportunistically extends AbstractQueuedSynchronizer
     * to simplify acquiring and releasing a lock surrounding each
     * task execution. This protects against interrupts that are
     * intended to wake up a worker thread waiting for a task from
     * instead interrupting a task being run. We implement a simple
     * non-reentrant mutual exclusion lock rather than use ReentrantLock
     * because we do not want worker tasks to be able to reacquire the
     * lock when they invoke pool control methods like setCorePoolSize.
     */
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in. Null if factory fails. */
        final Thread thread;
        /** Initial task to run. Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
    }

    /*
     * Methods for setting control state
     */

    /**
     * Transitions runState to given target, or leaves it alone if
     * already at least the given target.
     *
     * @param targetState the desired state, either SHUTDOWN or STOP
     *        (but not TIDYING or TERMINATED -- use tryTerminate for that)
     */
    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

    /**
     * Transitions to TERMINATED state if either (SHUTDOWN and pool
     * and queue empty) or (STOP and pool empty). If otherwise
     * eligible to terminate but workerCount is nonzero, interrupts an
     * idle worker to ensure that shutdown signals propagate. This
     * method must be called following any action that might make
     * termination possible -- reducing worker count or removing tasks
     * from the queue during shutdown. The method is non-private to
     * allow access from ScheduledThreadPoolExecutor.
     */
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && !
                 workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

    /*
     * Methods for controlling interrupts to worker threads.
     */

    /**
     * If there is a security manager, makes sure caller has
     * permission to shut down threads in general (see shutdownPerm).
     * If this passes, additionally makes sure the caller is allowed
     * to interrupt each worker thread. This might not be true even if
     * first check passed, if the SecurityManager treats some threads
     * specially.
     */
    private void checkShutdownAccess() {
        SecurityManager security = System.getSecurityManager();
        if (security != null) {
            security.checkPermission(shutdownPerm);
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                for (Worker w : workers)
                    security.checkAccess(w.thread);
            } finally {
                mainLock.unlock();
            }
        }
    }

    /**
     * Interrupts all threads, even if active. Ignores SecurityExceptions
     * (in which case some threads may remain uninterrupted).
     */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                try {
                    w.thread.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Interrupts threads that might be waiting for tasks (as
     * indicated by not being locked) so they can check for
     * termination or configuration changes.
     * Ignores SecurityExceptions (in which case some threads may remain
     * uninterrupted).
     *
     * @param onlyOne If true, interrupt at most one worker. This is
     * called only from tryTerminate when termination is otherwise
     * enabled but there are still other workers. In this case, at
     * most one waiting worker is interrupted to propagate shutdown
     * signals in case all threads are currently waiting.
     * Interrupting any arbitrary thread ensures that newly arriving
     * workers since shutdown began will also eventually exit.
     * To guarantee eventual termination, it suffices to always
     * interrupt only one idle worker, but shutdown() interrupts all
     * idle workers so that redundant workers exit promptly, not
     * waiting for a straggler task to finish.
     */
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Common form of interruptIdleWorkers, to avoid having to
     * remember what the boolean argument means.
     */
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }

    private static final boolean ONLY_ONE = true;

    /**
     * Ensures that unless the pool is stopping, the current thread
     * does not have its interrupt set. This requires a double-check
     * of state in case the interrupt was cleared concurrently with a
     * shutdownNow -- if so, the interrupt is re-enabled.
     */
    private void clearInterruptsForTaskRun() {
        if (runStateLessThan(ctl.get(), STOP) &&
            Thread.interrupted() &&
            runStateAtLeast(ctl.get(), STOP))
            Thread.currentThread().interrupt();
    }

    /*
     * Misc utilities, most of which are also exported to
     * ScheduledThreadPoolExecutor
     */

    /**
     * Invokes the rejected execution handler for the given command.
     * Package-protected for use by ScheduledThreadPoolExecutor.
     */
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }

    /**
     * Performs any further cleanup following run state transition on
     * invocation of shutdown. A no-op here, but used by
     * ScheduledThreadPoolExecutor to cancel delayed tasks.
     */
    void onShutdown() {
    }

    /**
     * State check needed by ScheduledThreadPoolExecutor to
     * enable running tasks during shutdown.
     *
     * @param shutdownOK true if should return true if SHUTDOWN
     */
    final boolean isRunningOrShutdown(boolean shutdownOK) {
        int rs = runStateOf(ctl.get());
        return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
    }

    /**
     * Drains the task queue into a new list, normally using
     * drainTo. But if the queue is a DelayQueue or any other kind of
     * queue for which poll or drainTo may fail to remove some
     * elements, it deletes them one by one.
     */
    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        List<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }

    /*
     * Methods for creating, running and cleaning up after workers
     */

    /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked, which requires a
     * backout of workerCount, and a recheck for termination, in case
     * the existence of this worker was holding up termination.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        Worker w = new Worker(firstTask);
        Thread t = w.thread;

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Recheck while holding lock.
            // Back out on ThreadFactory failure or if
            // shut down before lock acquired.
            int c = ctl.get();
            int rs = runStateOf(c);

            if (t == null ||
                (rs >= SHUTDOWN &&
                 ! (rs == SHUTDOWN &&
                    firstTask == null))) {
                decrementWorkerCount();
                tryTerminate();
                return false;
            }

            workers.add(w);

            int s = workers.size();
            if (s > largestPoolSize)
                largestPoolSize = s;
        } finally {
            mainLock.unlock();
        }

        t.start();
        // It is possible (but unlikely) for a thread to have been
        // added to workers, but not yet started, during transition to
        // STOP, which could result in a rare missed interrupt,
        // because Thread.interrupt is not guaranteed to have any effect
        // on a not-yet-started Thread (see Thread#interrupt).
        if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
            t.interrupt();

        return true;
    }

    /**
     * Performs cleanup and bookkeeping for a dying worker. Called
     * only from worker threads. Unless completedAbruptly is set,
     * assumes that workerCount has already been adjusted to account
     * for exit.
     * This method removes thread from worker set, and
     * possibly terminates the pool or replaces the worker if either
     * it exited due to user task exception or if fewer than
     * corePoolSize workers are running or queue is non-empty but
     * there are no workers.
     *
     * @param w the worker
     * @param completedAbruptly if the worker died due to user exception
     */
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

    /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shut down and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait.
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            boolean timed;      // Are workers subject to culling?

            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;

                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

    /**
     * Main worker run loop. Repeatedly gets tasks from queue and
     * executes them, while coping with a number of issues:
     *
     * 1. We may start out with an initial task, in which case we
     * don't need to get the first one. Otherwise, as long as pool is
     * running, we get tasks from getTask. If it returns null then the
     * worker exits due to changed pool state or configuration
     * parameters. Other exits result from exception throws in
     * external code, in which case completedAbruptly holds, which
     * usually leads processWorkerExit to replace this thread.
     *
     * 2. Before running any task, the lock is acquired to prevent
     * other pool interrupts while the task is executing, and
     * clearInterruptsForTaskRun called to ensure that unless pool is
     * stopping, this thread does not have its interrupt set.
     *
     * 3.
     * Each task run is preceded by a call to beforeExecute, which
     * might throw an exception, in which case we cause thread to die
     * (breaking loop with completedAbruptly true) without processing
     * the task.
     *
     * 4. Assuming beforeExecute completes normally, we run the task,
     * gathering any of its thrown exceptions to send to
     * afterExecute. We separately handle RuntimeException, Error
     * (both of which the specs guarantee that we trap) and arbitrary
     * Throwables. Because we cannot rethrow Throwables within
     * Runnable.run, we wrap them within Errors on the way out (to the
     * thread's UncaughtExceptionHandler). Any thrown exception also
     * conservatively causes thread to die.
     *
     * 5. After task.run completes, we call afterExecute, which may
     * also throw an exception, which will also cause thread to
     * die. According to JLS Sec 14.20, this exception is the one that
     * will be in effect even if task.run throws.
     *
     * The net effect of the exception mechanics is that afterExecute
     * and the thread's UncaughtExceptionHandler have as accurate
     * information as we can provide about any problems encountered by
     * user code.
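     *
     * A minimal sketch of the exception mechanics above, as seen from
     * user code, assuming Java 8 or later for lambda syntax. The class
     * AfterExecuteDemo and its method observeFailure are hypothetical
     * names used only for illustration; they are not part of this
     * class. A task that throws a RuntimeException is run through
     * execute, and an overridden afterExecute records what it is given:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; not part of ThreadPoolExecutor.
class AfterExecuteDemo {
    // Runs one failing task via execute() and returns the Throwable
    // that afterExecute received for it.
    static Throwable observeFailure() {
        final AtomicReference<Throwable> seen = new AtomicReference<>();
        final CountDownLatch done = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                r -> {
                    Thread t = new Thread(r);
                    // The rethrown exception still reaches this handler;
                    // it is ignored here only to keep the demo quiet.
                    t.setUncaughtExceptionHandler((thread, e) -> { });
                    return t;
                }) {
            // Overrides the protected hook of ThreadPoolExecutor.
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                seen.set(t);
                done.countDown();
            }
        };
        try {
            pool.execute(() -> { throw new IllegalStateException("boom"); });
            if (!done.await(5, TimeUnit.SECONDS))
                throw new IllegalStateException("afterExecute not called in time");
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

     * The sketch uses execute rather than submit on purpose: a task
     * wrapped in a FutureTask captures its own exception, so
     * afterExecute would be passed null instead.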
     *
     * @param w the worker
     */
    final void runWorker(Worker w) {
        Runnable task = w.firstTask;
        w.firstTask = null;
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                clearInterruptsForTaskRun();
                try {
                    beforeExecute(w.thread, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

    // Public constructors and methods

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory and rejected execution handler.
     * It may be more convenient to use one of the {@link Executors} factory
     * methods instead of this general purpose constructor.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed. This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default rejected execution handler.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed. This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed. This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed. This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

    /**
     * Executes the given task sometime in the future. The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shut down or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
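     *
     * <p>A minimal sketch of the capacity-reached case, assuming the
     * default handler (which throws {@code RejectedExecutionException})
     * and Java 8 or later for lambda syntax. The names
     * {@code RejectionDemo} and {@code thirdTaskRejected} are
     * hypothetical and not part of this class. With one thread and a
     * queue of capacity one, the first task occupies the thread, the
     * second is queued, and the third can be neither queued nor given
     * a new thread:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of ThreadPoolExecutor.
class RejectionDemo {
    // Returns true if the third task is rejected once the single
    // thread is busy and the single queue slot is taken.
    static boolean thirdTaskRejected() {
        final CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();              // hold the worker until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1));
        try {
            pool.execute(blocker);            // starts the only thread
            pool.execute(blocker);            // fills the only queue slot
            try {
                pool.execute(blocker);        // queue full, thread bound reached
                return false;
            } catch (RejectedExecutionException expected) {
                return true;
            }
        } finally {
            release.countDown();              // let the blocked tasks finish
            pool.shutdown();
        }
    }
}
```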
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
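     *
     * <p>A minimal sketch of an orderly shutdown, assuming the default
     * rejection handler and Java 8 or later for lambda syntax. The
     * names {@code OrderlyShutdownDemo} and {@code runAndShutDown} are
     * hypothetical and not part of this class. Tasks queued before
     * {@code shutdown} still run, a task offered afterwards is
     * rejected, and {@code awaitTermination} is used to wait for the
     * queued work to finish:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of ThreadPoolExecutor.
class OrderlyShutdownDemo {
    // Submits the given number of tasks, shuts down, and returns how many
    // tasks ran; returns -1 if the late task was accepted or the pool did
    // not terminate within the timeout.
    static int runAndShutDown(int tasks) {
        final AtomicInteger ran = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        for (int i = 0; i < tasks; i++)
            pool.execute(() -> ran.incrementAndGet());

        pool.shutdown();                      // previously submitted tasks still run

        boolean rejected;
        try {
            pool.execute(() -> ran.incrementAndGet());
            rejected = false;
        } catch (RejectedExecutionException expected) {
            rejected = true;                  // no new tasks after shutdown
        }

        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS))
                return -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        return rejected ? ran.get() : -1;
    }
}
```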
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

    public boolean isShutdown() {
        return ! isRunning(ctl.get());
    }

    /**
     * Returns true if this executor is in the process of terminating
     * after {@link #shutdown} or {@link #shutdownNow} but has not
     * completely terminated.
     * This method may be useful for
     * debugging. A return of {@code true} reported a sufficient
     * period after shutdown may indicate that submitted tasks have
     * ignored or suppressed interruption, causing this executor not
     * to properly terminate.
     *
     * @return true if terminating but not yet terminated
     */
    public boolean isTerminating() {
        int c = ctl.get();
        return ! isRunning(c) && runStateLessThan(c, TERMINATED);
    }

    public boolean isTerminated() {
        return runStateAtLeast(ctl.get(), TERMINATED);
    }

    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Invokes {@code shutdown} when this executor is no longer
     * referenced and it has no threads.
     */
    protected void finalize() {
        shutdown();
    }

    /**
     * Sets the thread factory used to create new threads.
     *
     * @param threadFactory the new thread factory
     * @throws NullPointerException if threadFactory is null
     * @see #getThreadFactory
     */
    public void setThreadFactory(ThreadFactory threadFactory) {
        if (threadFactory == null)
            throw new NullPointerException();
        this.threadFactory = threadFactory;
    }

    /**
     * Returns the thread factory used to create new threads.
     *
     * @return the current thread factory
     * @see #setThreadFactory
     */
    public ThreadFactory getThreadFactory() {
        return threadFactory;
    }

    /**
     * Sets a new handler for unexecutable tasks.
     *
     * @param handler the new handler
     * @throws NullPointerException if handler is null
     * @see #getRejectedExecutionHandler
     */
    public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
        if (handler == null)
            throw new NullPointerException();
        this.handler = handler;
    }

    /**
     * Returns the current handler for unexecutable tasks.
     *
     * @return the current handler
     * @see #setRejectedExecutionHandler
     */
    public RejectedExecutionHandler getRejectedExecutionHandler() {
        return handler;
    }

    /**
     * Sets the core number of threads.  This overrides any value set
     * in the constructor.  If the new value is smaller than the
     * current value, excess existing threads will be terminated when
     * they next become idle.  If larger, new threads will, if needed,
     * be started to execute any queued tasks.
     *
     * @param corePoolSize the new core size
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     * @see #getCorePoolSize
     */
    public void setCorePoolSize(int corePoolSize) {
        if (corePoolSize < 0)
            throw new IllegalArgumentException();
        int delta = corePoolSize - this.corePoolSize;
        this.corePoolSize = corePoolSize;
        if (workerCountOf(ctl.get()) > corePoolSize)
            interruptIdleWorkers();
        else if (delta > 0) {
            // We don't really know how many new threads are "needed".
            // As a heuristic, prestart enough new workers (up to new
            // core size) to handle the current number of tasks in
            // queue, but stop if queue becomes empty while doing so.
            int k = Math.min(delta, workQueue.size());
            while (k-- > 0 && addWorker(null, true)) {
                if (workQueue.isEmpty())
                    break;
            }
        }
    }

    /**
     * Returns the core number of threads.
     *
     * @return the core number of threads
     * @see #setCorePoolSize
     */
    public int getCorePoolSize() {
        return corePoolSize;
    }

    /**
     * Starts a core thread, causing it to idly wait for work. This
     * overrides the default policy of starting core threads only when
     * new tasks are executed. This method will return {@code false}
     * if all core threads have already been started.
     *
     * @return {@code true} if a thread was started
     */
    public boolean prestartCoreThread() {
        return workerCountOf(ctl.get()) < corePoolSize &&
            addWorker(null, true);
    }

    /**
     * Starts all core threads, causing them to idly wait for work. This
     * overrides the default policy of starting core threads only when
     * new tasks are executed.
     *
     * @return the number of threads started
     */
    public int prestartAllCoreThreads() {
        int n = 0;
        while (addWorker(null, true))
            ++n;
        return n;
    }

    /**
     * Returns true if this pool allows core threads to time out and
     * terminate if no tasks arrive within the keepAlive time, being
     * replaced if needed when new tasks arrive. When true, the same
     * keep-alive policy applying to non-core threads applies also to
     * core threads. When false (the default), core threads are never
     * terminated due to lack of incoming tasks.
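     *
     * <p>A minimal sketch of the two behaviors of the core time-out
     * policy, assuming Java 7 or later. The names
     * {@code CoreTimeoutDemo}, {@code zeroKeepAliveRejected} and
     * {@code coreThreadsRetire} are hypothetical and not part of this
     * class, and the 50 millisecond keep-alive and 5 second polling
     * deadline are arbitrary illustration values:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of ThreadPoolExecutor.
class CoreTimeoutDemo {
    // Enabling core time-out with a zero keep-alive time is refused.
    static boolean zeroKeepAliveRejected() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
        try {
            pool.allowCoreThreadTimeOut(true);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        } finally {
            pool.shutdown();
        }
    }

    // With core time-out enabled, idle core threads eventually exit.
    static boolean coreThreadsRetire() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, 50L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        pool.allowCoreThreadTimeOut(true);
        try {
            pool.prestartAllCoreThreads();    // two idle core threads
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            // Poll until the idle threads have timed out or we give up.
            while (pool.getPoolSize() > 0 && System.nanoTime() < deadline)
                Thread.sleep(10);
            return pool.getPoolSize() == 0;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }
}
```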
     *
     * @return {@code true} if core threads are allowed to time out,
     *         else {@code false}
     *
     * @since 1.6
     */
    public boolean allowsCoreThreadTimeOut() {
        return allowCoreThreadTimeOut;
    }

    /**
     * Sets the policy governing whether core threads may time out and
     * terminate if no tasks arrive within the keep-alive time, being
     * replaced if needed when new tasks arrive. When false, core
     * threads are never terminated due to lack of incoming
     * tasks. When true, the same keep-alive policy applying to
     * non-core threads applies also to core threads. To avoid
     * continual thread replacement, the keep-alive time must be
     * greater than zero when setting {@code true}. This method
     * should in general be called before the pool is actively used.
     *
     * @param value {@code true} if should time out, else {@code false}
     * @throws IllegalArgumentException if value is {@code true}
     *         and the current keep-alive time is not greater than zero
     *
     * @since 1.6
     */
    public void allowCoreThreadTimeOut(boolean value) {
        if (value && keepAliveTime <= 0)
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        if (value != allowCoreThreadTimeOut) {
            allowCoreThreadTimeOut = value;
            if (value)
                interruptIdleWorkers();
        }
    }

    /**
     * Sets the maximum allowed number of threads. This overrides any
     * value set in the constructor. If the new value is smaller than
     * the current value, excess existing threads will be
     * terminated when they next become idle.
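     *
     * <p>A minimal sketch of shrinking a pool at run time, assuming
     * Java 7 or later. The names {@code ResizeDemo},
     * {@code maxBelowCoreRejected} and {@code shrinkTo} are
     * hypothetical and not part of this class. Because the maximum may
     * not be less than the core size, the sketch lowers the core size
     * first and the maximum second:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of ThreadPoolExecutor.
class ResizeDemo {
    private static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2, 4, 1L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    // Lowering the maximum below the current core size is refused.
    static boolean maxBelowCoreRejected() {
        ThreadPoolExecutor pool = newPool();
        try {
            pool.setMaximumPoolSize(1);       // core size is still 2
            return false;
        } catch (IllegalArgumentException expected) {
            return true;
        } finally {
            pool.shutdown();
        }
    }

    // Shrinks both bounds to n and returns {core, maximum} afterwards.
    static int[] shrinkTo(int n) {
        ThreadPoolExecutor pool = newPool();
        try {
            pool.setCorePoolSize(n);          // lower the core size first
            pool.setMaximumPoolSize(n);       // then the maximum
            return new int[] { pool.getCorePoolSize(), pool.getMaximumPoolSize() };
        } finally {
            pool.shutdown();
        }
    }
}
```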
     *
     * @param maximumPoolSize the new maximum
     * @throws IllegalArgumentException if the new maximum is
     *         less than or equal to zero, or
     *         less than the {@linkplain #getCorePoolSize core pool size}
     * @see #getMaximumPoolSize
     */
    public void setMaximumPoolSize(int maximumPoolSize) {
        if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
            throw new IllegalArgumentException();
        this.maximumPoolSize = maximumPoolSize;
        if (workerCountOf(ctl.get()) > maximumPoolSize)
            interruptIdleWorkers();
    }

    /**
     * Returns the maximum allowed number of threads.
     *
     * @return the maximum allowed number of threads
     * @see #setMaximumPoolSize
     */
    public int getMaximumPoolSize() {
        return maximumPoolSize;
    }

    /**
     * Sets the time limit for which threads may remain idle before
     * being terminated.  If there are more than the core number of
     * threads currently in the pool, after waiting this amount of
     * time without processing a task, excess threads will be
     * terminated.  This overrides any value set in the constructor.
     *
     * @param time the time to wait.  A time value of zero will cause
     *        excess threads to terminate immediately after executing tasks.
     * @param unit the time unit of the {@code time} argument
     * @throws IllegalArgumentException if {@code time} less than zero or
     *         if {@code time} is zero and {@code allowsCoreThreadTimeOut}
     * @see #getKeepAliveTime
     */
    public void setKeepAliveTime(long time, TimeUnit unit) {
        if (time < 0)
            throw new IllegalArgumentException();
        if (time == 0 && allowsCoreThreadTimeOut())
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        long keepAliveTime = unit.toNanos(time);
        long delta = keepAliveTime - this.keepAliveTime;
        this.keepAliveTime = keepAliveTime;
        if (delta < 0)
            interruptIdleWorkers();
    }

    /**
     * Returns the thread keep-alive time, which is the amount of time
     * that threads in excess of the core pool size may remain
     * idle before being terminated.
     *
     * @param unit the desired time unit of the result
     * @return the time limit
     * @see #setKeepAliveTime
     */
    public long getKeepAliveTime(TimeUnit unit) {
        return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
    }

    /* User-level queue utilities */

    /**
     * Returns the task queue used by this executor. Access to the
     * task queue is intended primarily for debugging and monitoring.
     * This queue may be in active use.  Retrieving the task queue
     * does not prevent queued tasks from executing.
     *
     * @return the task queue
     */
    public BlockingQueue<Runnable> getQueue() {
        return workQueue;
    }

    /**
     * Removes this task from the executor's internal queue if it is
     * present, thus causing it not to be run if it has not already
     * started.
     *
     * <p> This method may be useful as one part of a cancellation
     * scheme.
     * It may fail to remove tasks that have been converted
     * into other forms before being placed on the internal queue. For
     * example, a task entered using {@code submit} might be
     * converted into a form that maintains {@code Future} status.
     * However, in such cases, method {@link #purge} may be used to
     * remove those Futures that have been cancelled.
     *
     * @param task the task to remove
     * @return true if the task was removed
     */
    public boolean remove(Runnable task) {
        boolean removed = workQueue.remove(task);
        tryTerminate(); // In case SHUTDOWN and now empty
        return removed;
    }

    /**
     * Tries to remove from the work queue all {@link Future}
     * tasks that have been cancelled. This method can be useful as a
     * storage reclamation operation, that has no other impact on
     * functionality. Cancelled tasks are never executed, but may
     * accumulate in work queues until worker threads can actively
     * remove them. Invoking this method instead tries to remove them now.
     * However, this method may fail to remove tasks in
     * the presence of interference by other threads.
     */
    public void purge() {
        final BlockingQueue<Runnable> q = workQueue;
        try {
            Iterator<Runnable> it = q.iterator();
            while (it.hasNext()) {
                Runnable r = it.next();
                if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                    it.remove();
            }
        } catch (ConcurrentModificationException fallThrough) {
            // Take slow path if we encounter interference during traversal.
            // Make copy for traversal and call remove for cancelled entries.
            // The slow path is more likely to be O(N*N).
            for (Object r : q.toArray())
                if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                    q.remove(r);
        }

        tryTerminate(); // In case SHUTDOWN and now empty
    }

    /* Statistics */

    /**
     * Returns the current number of threads in the pool.
     *
     * @return the number of threads
     */
    public int getPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // Remove rare and surprising possibility of
            // isTerminated() && getPoolSize() > 0
            return runStateAtLeast(ctl.get(), TIDYING) ? 0
                : workers.size();
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the approximate number of threads that are actively
     * executing tasks.
     *
     * @return the number of threads
     */
    public int getActiveCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            int n = 0;
            for (Worker w : workers)
                if (w.isLocked())
                    ++n;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the largest number of threads that have ever
     * simultaneously been in the pool.
     *
     * @return the number of threads
     */
    public int getLargestPoolSize() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            return largestPoolSize;
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the approximate total number of tasks that have ever been
     * scheduled for execution. Because the states of tasks and
     * threads may change dynamically during computation, the returned
     * value is only an approximation.
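     *
     * <p>A minimal sketch of how the statistics relate while the pool
     * is held in a known state, assuming Java 8 or later for lambda
     * syntax. The names {@code PoolStatsDemo} and {@code snapshot} are
     * hypothetical and not part of this class. One task is kept
     * running and two more sit in the queue, so the task count is the
     * sum of completed, running and queued tasks:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of ThreadPoolExecutor.
class PoolStatsDemo {
    // Returns {taskCount, activeCount, queuedTasks, completedTaskCount}
    // sampled while one task is running and two are waiting in the queue.
    static long[] snapshot() {
        final CountDownLatch started = new CountDownLatch(1);
        final CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        try {
            pool.execute(() -> {
                started.countDown();          // signal that the task is running
                try {
                    release.await();          // stay active until sampled
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.execute(() -> { });          // queued behind the running task
            pool.execute(() -> { });          // queued as well
            started.await();
            return new long[] {
                pool.getTaskCount(),
                pool.getActiveCount(),
                pool.getQueue().size(),
                pool.getCompletedTaskCount()
            };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```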
     *
     * @return the number of tasks
     */
    public long getTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers) {
                n += w.completedTasks;
                if (w.isLocked())
                    ++n;
            }
            return n + workQueue.size();
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns the approximate total number of tasks that have
     * completed execution. Because the states of tasks and threads
     * may change dynamically during computation, the returned value
     * is only an approximation, but one that does not ever decrease
     * across successive calls.
     *
     * @return the number of tasks
     */
    public long getCompletedTaskCount() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            long n = completedTaskCount;
            for (Worker w : workers)
                n += w.completedTasks;
            return n;
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Returns a string identifying this pool, as well as its state,
     * including indications of run state and estimated worker and
     * task counts.
     *
     * @return a string identifying this pool, as well as its state
     */
    public String toString() {
        long ncompleted;
        int nworkers, nactive;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            ncompleted = completedTaskCount;
            nactive = 0;
            nworkers = workers.size();
            for (Worker w : workers) {
                ncompleted += w.completedTasks;
                if (w.isLocked())
                    ++nactive;
            }
        } finally {
            mainLock.unlock();
        }
        int c = ctl.get();
        String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
                     (runStateAtLeast(c, TERMINATED) ? "Terminated" :
                      "Shutting down"));
        return super.toString() +
            "[" + rs +
            ", pool size = " + nworkers +
            ", active threads = " + nactive +
            ", queued tasks = " + workQueue.size() +
            ", completed tasks = " + ncompleted +
            "]";
    }

    /* Extension hooks */

    /**
     * Method invoked prior to executing the given Runnable in the
     * given thread. This method is invoked by thread {@code t} that
     * will execute task {@code r}, and may be used to re-initialize
     * ThreadLocals, or to perform logging.
     *
     * <p>This implementation does nothing, but may be customized in
     * subclasses. Note: To properly nest multiple overridings, subclasses
     * should generally invoke {@code super.beforeExecute} at the end of
     * this method.
     *
     * @param t the thread that will run task {@code r}
     * @param r the task that will be executed
     */
    protected void beforeExecute(Thread t, Runnable r) { }

    /**
     * Method invoked upon completion of execution of the given Runnable.
     * This method is invoked by the thread that executed the task. If
     * non-null, the Throwable is the uncaught {@code RuntimeException}
     * or {@code Error} that caused execution to terminate abruptly.
     *
     * <p>This implementation does nothing, but may be customized in
     * subclasses. Note: To properly nest multiple overridings, subclasses
     * should generally invoke {@code super.afterExecute} at the
     * beginning of this method.
     *
     * <p><b>Note:</b> When actions are enclosed in tasks (such as
     * {@link FutureTask}) either explicitly or via methods such as
     * {@code submit}, these task objects catch and maintain
     * computational exceptions, and so they do not cause abrupt
     * termination, and the internal exceptions are <em>not</em>
     * passed to this method.
     * If you would like to trap both kinds of
     * failures in this method, you can further probe for such cases,
     * as in this sample subclass that prints either the direct cause
     * or the underlying exception if a task has been aborted:
     *
     * <pre> {@code
     * class ExtendedExecutor extends ThreadPoolExecutor {
     *   // ...
     *   protected void afterExecute(Runnable r, Throwable t) {
     *     super.afterExecute(r, t);
     *     if (t == null && r instanceof Future<?>) {
     *       try {
     *         Object result = ((Future<?>) r).get();
     *       } catch (CancellationException ce) {
     *           t = ce;
     *       } catch (ExecutionException ee) {
     *           t = ee.getCause();
     *       } catch (InterruptedException ie) {
     *           Thread.currentThread().interrupt(); // ignore/reset
     *       }
     *     }
     *     if (t != null)
     *       System.out.println(t);
     *   }
     * }}</pre>
     *
     * @param r the runnable that has completed
     * @param t the exception that caused termination, or null if
     * execution completed normally
     */
    protected void afterExecute(Runnable r, Throwable t) { }

    /**
     * Method invoked when the Executor has terminated. Default
     * implementation does nothing. Note: To properly nest multiple
     * overridings, subclasses should generally invoke
     * {@code super.terminated} within this method.
     */
    protected void terminated() { }

    /* Predefined RejectedExecutionHandlers */

    /**
     * A handler for rejected tasks that runs the rejected task
     * directly in the calling thread of the {@code execute} method,
     * unless the executor has been shut down, in which case the task
     * is discarded.
     */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code CallerRunsPolicy}.
         */
        public CallerRunsPolicy() { }

        /**
         * Executes task r in the caller's thread, unless the executor
         * has been shut down, in which case the task is discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

    /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }

        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always.
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }

    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }

        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

    /**
     * A handler for rejected tasks that discards the oldest unhandled
     * request and then retries {@code execute}, unless the executor
     * is shut down, in which case the task is discarded.
     */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy}.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }
}