1 /* 2 * Written by Doug Lea with assistance from members of JCP JSR-166 3 * Expert Group and released to the public domain, as explained at 4 * http://creativecommons.org/publicdomain/zero/1.0/ 5 */ 6 7 package java.util.concurrent; 8 9 import java.util.ArrayList; 10 import java.util.ConcurrentModificationException; 11 import java.util.HashSet; 12 import java.util.Iterator; 13 import java.util.List; 14 import java.util.concurrent.atomic.AtomicInteger; 15 import java.util.concurrent.locks.AbstractQueuedSynchronizer; 16 import java.util.concurrent.locks.Condition; 17 import java.util.concurrent.locks.ReentrantLock; 18 19 // BEGIN android-note 20 // removed security manager docs 21 // END android-note 22 23 /** 24 * An {@link ExecutorService} that executes each submitted task using 25 * one of possibly several pooled threads, normally configured 26 * using {@link Executors} factory methods. 27 * 28 * <p>Thread pools address two different problems: they usually 29 * provide improved performance when executing large numbers of 30 * asynchronous tasks, due to reduced per-task invocation overhead, 31 * and they provide a means of bounding and managing the resources, 32 * including threads, consumed when executing a collection of tasks. 33 * Each {@code ThreadPoolExecutor} also maintains some basic 34 * statistics, such as the number of completed tasks. 35 * 36 * <p>To be useful across a wide range of contexts, this class 37 * provides many adjustable parameters and extensibility 38 * hooks. However, programmers are urged to use the more convenient 39 * {@link Executors} factory methods {@link 40 * Executors#newCachedThreadPool} (unbounded thread pool, with 41 * automatic thread reclamation), {@link Executors#newFixedThreadPool} 42 * (fixed size thread pool) and {@link 43 * Executors#newSingleThreadExecutor} (single background thread), that 44 * preconfigure settings for the most common usage 45 * scenarios. 
 * Otherwise, use the following guide when manually
 * configuring and tuning this class:
 *
 * <dl>
 *
 * <dt>Core and maximum pool sizes</dt>
 *
 * <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
 * A {@code ThreadPoolExecutor} will automatically adjust the
 * pool size (see {@link #getPoolSize})
 * according to the bounds set by
 * corePoolSize (see {@link #getCorePoolSize}) and
 * maximumPoolSize (see {@link #getMaximumPoolSize}).
 *
 * When a new task is submitted in method {@link #execute(Runnable)},
 * and fewer than corePoolSize threads are running, a new thread is
 * created to handle the request, even if other worker threads are
 * idle. If there are more than corePoolSize but fewer than
 * maximumPoolSize threads running, a new thread will be created only
 * if the queue is full. By setting corePoolSize and maximumPoolSize
 * the same, you create a fixed-size thread pool. By setting
 * maximumPoolSize to an essentially unbounded value such as {@code
 * Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
 * number of concurrent tasks. Most typically, core and maximum pool
 * sizes are set only upon construction, but they may also be changed
 * dynamically using {@link #setCorePoolSize} and {@link
 * #setMaximumPoolSize}. </dd>
 *
 * <dt>On-demand construction</dt>
 *
 * <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
 * By default, even core threads are initially created and
 * started only when new tasks arrive, but this can be overridden
 * dynamically using method {@link #prestartCoreThread} or {@link
 * #prestartAllCoreThreads}. You probably want to prestart threads if
 * you construct the pool with a non-empty queue. </dd>
 *
 * <dt>Creating new threads</dt>
 *
 * <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
 * New threads are created using a {@link ThreadFactory}.
 * If not otherwise specified, a {@link Executors#defaultThreadFactory} is
 * used, which creates threads that are all in the same {@link
 * ThreadGroup} and with the same {@code NORM_PRIORITY} priority and
 * non-daemon status. By supplying a different ThreadFactory, you can
 * alter the thread's name, thread group, priority, daemon status,
 * etc. If a {@code ThreadFactory} fails to create a thread when asked
 * by returning null from {@code newThread}, the executor will
 * continue, but might not be able to execute any tasks. Threads
 * should possess the "modifyThread" {@code RuntimePermission}. If
 * worker threads or other threads using the pool do not possess this
 * permission, service may be degraded: configuration changes may not
 * take effect in a timely manner, and a shutdown pool may remain in a
 * state in which termination is possible but not completed.</dd>
 *
 * <dt>Keep-alive times</dt>
 *
 * <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
 * If the pool currently has more than corePoolSize threads,
 * excess threads will be terminated if they have been idle for more
 * than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).
 * This provides a means of reducing resource consumption when the
 * pool is not being actively used. If the pool becomes more active
 * later, new threads will be constructed. This parameter can also be
 * changed dynamically using method {@link #setKeepAliveTime(long,
 * TimeUnit)}. Using a value of {@code Long.MAX_VALUE} {@link
 * TimeUnit#NANOSECONDS} effectively disables idle threads from ever
 * terminating prior to shut down. By default, the keep-alive policy
 * applies only when there are more than corePoolSize threads, but
 * method {@link #allowCoreThreadTimeOut(boolean)} can be used to
 * apply this time-out policy to core threads as well, so long as the
 * keepAliveTime value is non-zero.
 * </dd>
 *
 * <dt>Queuing</dt>
 *
 * <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
 * Any {@link BlockingQueue} may be used to transfer and hold
 * submitted tasks. The use of this queue interacts with pool sizing:
 *
 * <ul>
 *
 * <li>If fewer than corePoolSize threads are running, the Executor
 * always prefers adding a new thread
 * rather than queuing.
 *
 * <li>If corePoolSize or more threads are running, the Executor
 * always prefers queuing a request rather than adding a new
 * thread.
 *
 * <li>If a request cannot be queued, a new thread is created unless
 * this would exceed maximumPoolSize, in which case, the task will be
 * rejected.
 *
 * </ul>
 *
 * There are three general strategies for queuing:
 * <ol>
 *
 * <li><em> Direct handoffs.</em> A good default choice for a work
 * queue is a {@link SynchronousQueue} that hands off tasks to threads
 * without otherwise holding them. Here, an attempt to queue a task
 * will fail if no threads are immediately available to run it, so a
 * new thread will be constructed. This policy avoids lockups when
 * handling sets of requests that might have internal dependencies.
 * Direct handoffs generally require unbounded maximumPoolSizes to
 * avoid rejection of newly submitted tasks. This in turn admits the
 * possibility of unbounded thread growth when commands continue to
 * arrive on average faster than they can be processed.
 *
 * <li><em> Unbounded queues.</em> Using an unbounded queue (for
 * example a {@link LinkedBlockingQueue} without a predefined
 * capacity) will cause new tasks to wait in the queue when all
 * corePoolSize threads are busy. Thus, no more than corePoolSize
 * threads will ever be created. (And the value of the maximumPoolSize
 * therefore doesn't have any effect.)
 * This may be appropriate when
 * each task is completely independent of others, so tasks cannot
 * affect each other's execution; for example, in a web page server.
 * While this style of queuing can be useful in smoothing out
 * transient bursts of requests, it admits the possibility of
 * unbounded work queue growth when commands continue to arrive on
 * average faster than they can be processed.
 *
 * <li><em>Bounded queues.</em> A bounded queue (for example, an
 * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
 * used with finite maximumPoolSizes, but can be more difficult to
 * tune and control. Queue sizes and maximum pool sizes may be traded
 * off for each other: Using large queues and small pools minimizes
 * CPU usage, OS resources, and context-switching overhead, but can
 * lead to artificially low throughput. If tasks frequently block (for
 * example if they are I/O bound), a system may be able to schedule
 * time for more threads than you otherwise allow. Use of small queues
 * generally requires larger pool sizes, which keeps CPUs busier but
 * may encounter unacceptable scheduling overhead, which also
 * decreases throughput.
 *
 * </ol>
 *
 * </dd>
 *
 * <dt>Rejected tasks</dt>
 *
 * <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
 * New tasks submitted in method {@link #execute(Runnable)} will be
 * <em>rejected</em> when the Executor has been shut down, and also when
 * the Executor uses finite bounds for both maximum threads and work queue
 * capacity, and is saturated. In either case, the {@code execute} method
 * invokes the {@link
 * RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)}
 * method of its {@link RejectedExecutionHandler}.
 * Four predefined handler policies are provided:
 *
 * <ol>
 *
 * <li>In the default {@link ThreadPoolExecutor.AbortPolicy}, the
 * handler throws a runtime {@link RejectedExecutionException} upon
 * rejection.
 *
 * <li>In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread
 * that invokes {@code execute} itself runs the task. This provides a
 * simple feedback control mechanism that will slow down the rate at
 * which new tasks are submitted.
 *
 * <li>In {@link ThreadPoolExecutor.DiscardPolicy}, a task that
 * cannot be executed is simply dropped.
 *
 * <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the
 * executor is not shut down, the task at the head of the work queue
 * is dropped, and then execution is retried (which can fail again,
 * causing this to be repeated.)
 *
 * </ol>
 *
 * It is possible to define and use other kinds of {@link
 * RejectedExecutionHandler} classes. Doing so requires some care
 * especially when policies are designed to work only under particular
 * capacity or queuing policies. </dd>
 *
 * <dt>Hook methods</dt>
 *
 * <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
 * This class provides {@code protected} overridable
 * {@link #beforeExecute(Thread, Runnable)} and
 * {@link #afterExecute(Runnable, Throwable)} methods that are called
 * before and after execution of each task. These can be used to
 * manipulate the execution environment; for example, reinitializing
 * ThreadLocals, gathering statistics, or adding log entries.
 * Additionally, method {@link #terminated} can be overridden to perform
 * any special processing that needs to be done once the Executor has
 * fully terminated.
 *
 * <p>If hook, callback, or BlockingQueue methods throw exceptions,
 * internal worker threads may in turn fail, abruptly terminate, and
 * possibly be replaced.</dd>
 *
 * <dt>Queue maintenance</dt>
 *
 * <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
 * Method {@link #getQueue()} allows access to the work queue
 * for purposes of monitoring and debugging. Use of this method for
 * any other purpose is strongly discouraged. Two supplied methods,
 * {@link #remove(Runnable)} and {@link #purge} are available to
 * assist in storage reclamation when large numbers of queued tasks
 * become cancelled.</dd>
 *
 * <dt>Finalization</dt>
 *
 * <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
 * A pool that is no longer referenced in a program <em>AND</em>
 * has no remaining threads will be {@code shutdown} automatically. If
 * you would like to ensure that unreferenced pools are reclaimed even
 * if users forget to call {@link #shutdown}, then you must arrange
 * that unused threads eventually die, by setting appropriate
 * keep-alive times, using a lower bound of zero core threads and/or
 * setting {@link #allowCoreThreadTimeOut(boolean)}. </dd>
 *
 * </dl>
 *
 * <p><b>Extension example</b>. Most extensions of this class
 * override one or more of the protected hook methods. For example,
 * here is a subclass that adds a simple pause/resume feature:
 *
 * <pre> {@code
 * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
 *   private boolean isPaused;
 *   private ReentrantLock pauseLock = new ReentrantLock();
 *   private Condition unpaused = pauseLock.newCondition();
 *
 *   public PausableThreadPoolExecutor(...)
{ super(...); } 273 * 274 * protected void beforeExecute(Thread t, Runnable r) { 275 * super.beforeExecute(t, r); 276 * pauseLock.lock(); 277 * try { 278 * while (isPaused) unpaused.await(); 279 * } catch (InterruptedException ie) { 280 * t.interrupt(); 281 * } finally { 282 * pauseLock.unlock(); 283 * } 284 * } 285 * 286 * public void pause() { 287 * pauseLock.lock(); 288 * try { 289 * isPaused = true; 290 * } finally { 291 * pauseLock.unlock(); 292 * } 293 * } 294 * 295 * public void resume() { 296 * pauseLock.lock(); 297 * try { 298 * isPaused = false; 299 * unpaused.signalAll(); 300 * } finally { 301 * pauseLock.unlock(); 302 * } 303 * } 304 * }}</pre> 305 * 306 * @since 1.5 307 * @author Doug Lea 308 */ 309 public class ThreadPoolExecutor extends AbstractExecutorService { 310 /** 311 * The main pool control state, ctl, is an atomic integer packing 312 * two conceptual fields 313 * workerCount, indicating the effective number of threads 314 * runState, indicating whether running, shutting down etc 315 * 316 * In order to pack them into one int, we limit workerCount to 317 * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 318 * billion) otherwise representable. If this is ever an issue in 319 * the future, the variable can be changed to be an AtomicLong, 320 * and the shift/mask constants below adjusted. But until the need 321 * arises, this code is a bit faster and simpler using an int. 322 * 323 * The workerCount is the number of workers that have been 324 * permitted to start and not permitted to stop. The value may be 325 * transiently different from the actual number of live threads, 326 * for example when a ThreadFactory fails to create a thread when 327 * asked, and when exiting threads are still performing 328 * bookkeeping before terminating. The user-visible pool size is 329 * reported as the current size of the workers set. 
     *
     * The runState provides the main lifecycle control, taking on values:
     *
     *   RUNNING:    Accept new tasks and process queued tasks
     *   SHUTDOWN:   Don't accept new tasks, but process queued tasks
     *   STOP:       Don't accept new tasks, don't process queued tasks,
     *               and interrupt in-progress tasks
     *   TIDYING:    All tasks have terminated, workerCount is zero,
     *               the thread transitioning to state TIDYING
     *               will run the terminated() hook method
     *   TERMINATED: terminated() has completed
     *
     * The numerical order among these values matters, to allow
     * ordered comparisons. The runState monotonically increases over
     * time, but need not hit each state. The transitions are:
     *
     * RUNNING -> SHUTDOWN
     *    On invocation of shutdown(), perhaps implicitly in finalize()
     * (RUNNING or SHUTDOWN) -> STOP
     *    On invocation of shutdownNow()
     * SHUTDOWN -> TIDYING
     *    When both queue and pool are empty
     * STOP -> TIDYING
     *    When pool is empty
     * TIDYING -> TERMINATED
     *    When the terminated() hook method has completed
     *
     * Threads waiting in awaitTermination() will return when the
     * state reaches TERMINATED.
     *
     * Detecting the transition from SHUTDOWN to TIDYING is less
     * straightforward than you'd like because the queue may become
     * empty after non-empty and vice versa during SHUTDOWN state, but
     * we can only terminate if, after seeing that it is empty, we see
     * that workerCount is 0 (which sometimes entails a recheck -- see
     * below).
366 */ 367 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 368 private static final int COUNT_BITS = Integer.SIZE - 3; 369 private static final int CAPACITY = (1 << COUNT_BITS) - 1; 370 371 // runState is stored in the high-order bits 372 private static final int RUNNING = -1 << COUNT_BITS; 373 private static final int SHUTDOWN = 0 << COUNT_BITS; 374 private static final int STOP = 1 << COUNT_BITS; 375 private static final int TIDYING = 2 << COUNT_BITS; 376 private static final int TERMINATED = 3 << COUNT_BITS; 377 378 // Packing and unpacking ctl runStateOf(int c)379 private static int runStateOf(int c) { return c & ~CAPACITY; } workerCountOf(int c)380 private static int workerCountOf(int c) { return c & CAPACITY; } ctlOf(int rs, int wc)381 private static int ctlOf(int rs, int wc) { return rs | wc; } 382 383 /* 384 * Bit field accessors that don't require unpacking ctl. 385 * These depend on the bit layout and on workerCount being never negative. 386 */ 387 runStateLessThan(int c, int s)388 private static boolean runStateLessThan(int c, int s) { 389 return c < s; 390 } 391 runStateAtLeast(int c, int s)392 private static boolean runStateAtLeast(int c, int s) { 393 return c >= s; 394 } 395 isRunning(int c)396 private static boolean isRunning(int c) { 397 return c < SHUTDOWN; 398 } 399 400 /** 401 * Attempts to CAS-increment the workerCount field of ctl. 402 */ compareAndIncrementWorkerCount(int expect)403 private boolean compareAndIncrementWorkerCount(int expect) { 404 return ctl.compareAndSet(expect, expect + 1); 405 } 406 407 /** 408 * Attempts to CAS-decrement the workerCount field of ctl. 409 */ compareAndDecrementWorkerCount(int expect)410 private boolean compareAndDecrementWorkerCount(int expect) { 411 return ctl.compareAndSet(expect, expect - 1); 412 } 413 414 /** 415 * Decrements the workerCount field of ctl. This is called only on 416 * abrupt termination of a thread (see processWorkerExit). 
Other 417 * decrements are performed within getTask. 418 */ decrementWorkerCount()419 private void decrementWorkerCount() { 420 do {} while (! compareAndDecrementWorkerCount(ctl.get())); 421 } 422 423 /** 424 * The queue used for holding tasks and handing off to worker 425 * threads. We do not require that workQueue.poll() returning 426 * null necessarily means that workQueue.isEmpty(), so rely 427 * solely on isEmpty to see if the queue is empty (which we must 428 * do for example when deciding whether to transition from 429 * SHUTDOWN to TIDYING). This accommodates special-purpose 430 * queues such as DelayQueues for which poll() is allowed to 431 * return null even if it may later return non-null when delays 432 * expire. 433 */ 434 private final BlockingQueue<Runnable> workQueue; 435 436 /** 437 * Lock held on access to workers set and related bookkeeping. 438 * While we could use a concurrent set of some sort, it turns out 439 * to be generally preferable to use a lock. Among the reasons is 440 * that this serializes interruptIdleWorkers, which avoids 441 * unnecessary interrupt storms, especially during shutdown. 442 * Otherwise exiting threads would concurrently interrupt those 443 * that have not yet interrupted. It also simplifies some of the 444 * associated statistics bookkeeping of largestPoolSize etc. We 445 * also hold mainLock on shutdown and shutdownNow, for the sake of 446 * ensuring workers set is stable while separately checking 447 * permission to interrupt and actually interrupting. 448 */ 449 private final ReentrantLock mainLock = new ReentrantLock(); 450 451 /** 452 * Set containing all worker threads in pool. Accessed only when 453 * holding mainLock. 454 */ 455 private final HashSet<Worker> workers = new HashSet<>(); 456 457 /** 458 * Wait condition to support awaitTermination. 459 */ 460 private final Condition termination = mainLock.newCondition(); 461 462 /** 463 * Tracks largest attained pool size. Accessed only under 464 * mainLock. 
465 */ 466 private int largestPoolSize; 467 468 /** 469 * Counter for completed tasks. Updated only on termination of 470 * worker threads. Accessed only under mainLock. 471 */ 472 private long completedTaskCount; 473 474 /* 475 * All user control parameters are declared as volatiles so that 476 * ongoing actions are based on freshest values, but without need 477 * for locking, since no internal invariants depend on them 478 * changing synchronously with respect to other actions. 479 */ 480 481 /** 482 * Factory for new threads. All threads are created using this 483 * factory (via method addWorker). All callers must be prepared 484 * for addWorker to fail, which may reflect a system or user's 485 * policy limiting the number of threads. Even though it is not 486 * treated as an error, failure to create threads may result in 487 * new tasks being rejected or existing ones remaining stuck in 488 * the queue. 489 * 490 * We go further and preserve pool invariants even in the face of 491 * errors such as OutOfMemoryError, that might be thrown while 492 * trying to create threads. Such errors are rather common due to 493 * the need to allocate a native stack in Thread.start, and users 494 * will want to perform clean pool shutdown to clean up. There 495 * will likely be enough memory available for the cleanup code to 496 * complete without encountering yet another OutOfMemoryError. 497 */ 498 private volatile ThreadFactory threadFactory; 499 500 /** 501 * Handler called when saturated or shutdown in execute. 502 */ 503 private volatile RejectedExecutionHandler handler; 504 505 /** 506 * Timeout in nanoseconds for idle threads waiting for work. 507 * Threads use this timeout when there are more than corePoolSize 508 * present or if allowCoreThreadTimeOut. Otherwise they wait 509 * forever for new work. 510 */ 511 private volatile long keepAliveTime; 512 513 /** 514 * If false (default), core threads stay alive even when idle. 
515 * If true, core threads use keepAliveTime to time out waiting 516 * for work. 517 */ 518 private volatile boolean allowCoreThreadTimeOut; 519 520 /** 521 * Core pool size is the minimum number of workers to keep alive 522 * (and not allow to time out etc) unless allowCoreThreadTimeOut 523 * is set, in which case the minimum is zero. 524 */ 525 private volatile int corePoolSize; 526 527 /** 528 * Maximum pool size. Note that the actual maximum is internally 529 * bounded by CAPACITY. 530 */ 531 private volatile int maximumPoolSize; 532 533 /** 534 * The default rejected execution handler. 535 */ 536 private static final RejectedExecutionHandler defaultHandler = 537 new AbortPolicy(); 538 539 /** 540 * Permission required for callers of shutdown and shutdownNow. 541 * We additionally require (see checkShutdownAccess) that callers 542 * have permission to actually interrupt threads in the worker set 543 * (as governed by Thread.interrupt, which relies on 544 * ThreadGroup.checkAccess, which in turn relies on 545 * SecurityManager.checkAccess). Shutdowns are attempted only if 546 * these checks pass. 547 * 548 * All actual invocations of Thread.interrupt (see 549 * interruptIdleWorkers and interruptWorkers) ignore 550 * SecurityExceptions, meaning that the attempted interrupts 551 * silently fail. In the case of shutdown, they should not fail 552 * unless the SecurityManager has inconsistent policies, sometimes 553 * allowing access to a thread and sometimes not. In such cases, 554 * failure to actually interrupt threads may disable or delay full 555 * termination. Other uses of interruptIdleWorkers are advisory, 556 * and failure to actually interrupt will merely delay response to 557 * configuration changes so is not handled exceptionally. 
558 */ 559 private static final RuntimePermission shutdownPerm = 560 new RuntimePermission("modifyThread"); 561 562 /** 563 * Class Worker mainly maintains interrupt control state for 564 * threads running tasks, along with other minor bookkeeping. 565 * This class opportunistically extends AbstractQueuedSynchronizer 566 * to simplify acquiring and releasing a lock surrounding each 567 * task execution. This protects against interrupts that are 568 * intended to wake up a worker thread waiting for a task from 569 * instead interrupting a task being run. We implement a simple 570 * non-reentrant mutual exclusion lock rather than use 571 * ReentrantLock because we do not want worker tasks to be able to 572 * reacquire the lock when they invoke pool control methods like 573 * setCorePoolSize. Additionally, to suppress interrupts until 574 * the thread actually starts running tasks, we initialize lock 575 * state to a negative value, and clear it upon start (in 576 * runWorker). 577 */ 578 private final class Worker 579 extends AbstractQueuedSynchronizer 580 implements Runnable 581 { 582 /** 583 * This class will never be serialized, but we provide a 584 * serialVersionUID to suppress a javac warning. 585 */ 586 private static final long serialVersionUID = 6138294804551838833L; 587 588 /** Thread this worker is running in. Null if factory fails. */ 589 final Thread thread; 590 /** Initial task to run. Possibly null. */ 591 Runnable firstTask; 592 /** Per-thread task counter */ 593 volatile long completedTasks; 594 595 /** 596 * Creates with given first task and thread from ThreadFactory. 597 * @param firstTask the first task (null if none) 598 */ Worker(Runnable firstTask)599 Worker(Runnable firstTask) { 600 setState(-1); // inhibit interrupts until runWorker 601 this.firstTask = firstTask; 602 this.thread = getThreadFactory().newThread(this); 603 } 604 605 /** Delegates main run loop to outer runWorker. 
*/ run()606 public void run() { 607 runWorker(this); 608 } 609 610 // Lock methods 611 // 612 // The value 0 represents the unlocked state. 613 // The value 1 represents the locked state. 614 isHeldExclusively()615 protected boolean isHeldExclusively() { 616 return getState() != 0; 617 } 618 tryAcquire(int unused)619 protected boolean tryAcquire(int unused) { 620 if (compareAndSetState(0, 1)) { 621 setExclusiveOwnerThread(Thread.currentThread()); 622 return true; 623 } 624 return false; 625 } 626 tryRelease(int unused)627 protected boolean tryRelease(int unused) { 628 setExclusiveOwnerThread(null); 629 setState(0); 630 return true; 631 } 632 lock()633 public void lock() { acquire(1); } tryLock()634 public boolean tryLock() { return tryAcquire(1); } unlock()635 public void unlock() { release(1); } isLocked()636 public boolean isLocked() { return isHeldExclusively(); } 637 interruptIfStarted()638 void interruptIfStarted() { 639 Thread t; 640 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { 641 try { 642 t.interrupt(); 643 } catch (SecurityException ignore) { 644 } 645 } 646 } 647 } 648 649 /* 650 * Methods for setting control state 651 */ 652 653 /** 654 * Transitions runState to given target, or leaves it alone if 655 * already at least the given target. 656 * 657 * @param targetState the desired state, either SHUTDOWN or STOP 658 * (but not TIDYING or TERMINATED -- use tryTerminate for that) 659 */ advanceRunState(int targetState)660 private void advanceRunState(int targetState) { 661 // assert targetState == SHUTDOWN || targetState == STOP; 662 for (;;) { 663 int c = ctl.get(); 664 if (runStateAtLeast(c, targetState) || 665 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) 666 break; 667 } 668 } 669 670 /** 671 * Transitions to TERMINATED state if either (SHUTDOWN and pool 672 * and queue empty) or (STOP and pool empty). 
If otherwise 673 * eligible to terminate but workerCount is nonzero, interrupts an 674 * idle worker to ensure that shutdown signals propagate. This 675 * method must be called following any action that might make 676 * termination possible -- reducing worker count or removing tasks 677 * from the queue during shutdown. The method is non-private to 678 * allow access from ScheduledThreadPoolExecutor. 679 */ tryTerminate()680 final void tryTerminate() { 681 for (;;) { 682 int c = ctl.get(); 683 if (isRunning(c) || 684 runStateAtLeast(c, TIDYING) || 685 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) 686 return; 687 if (workerCountOf(c) != 0) { // Eligible to terminate 688 interruptIdleWorkers(ONLY_ONE); 689 return; 690 } 691 692 final ReentrantLock mainLock = this.mainLock; 693 mainLock.lock(); 694 try { 695 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { 696 try { 697 terminated(); 698 } finally { 699 ctl.set(ctlOf(TERMINATED, 0)); 700 termination.signalAll(); 701 } 702 return; 703 } 704 } finally { 705 mainLock.unlock(); 706 } 707 // else retry on failed CAS 708 } 709 } 710 711 /* 712 * Methods for controlling interrupts to worker threads. 713 */ 714 715 /** 716 * If there is a security manager, makes sure caller has 717 * permission to shut down threads in general (see shutdownPerm). 718 * If this passes, additionally makes sure the caller is allowed 719 * to interrupt each worker thread. This might not be true even if 720 * first check passed, if the SecurityManager treats some threads 721 * specially. 722 */ checkShutdownAccess()723 private void checkShutdownAccess() { 724 SecurityManager security = System.getSecurityManager(); 725 if (security != null) { 726 security.checkPermission(shutdownPerm); 727 final ReentrantLock mainLock = this.mainLock; 728 mainLock.lock(); 729 try { 730 for (Worker w : workers) 731 security.checkAccess(w.thread); 732 } finally { 733 mainLock.unlock(); 734 } 735 } 736 } 737 738 /** 739 * Interrupts all threads, even if active. 
Ignores SecurityExceptions 740 * (in which case some threads may remain uninterrupted). 741 */ interruptWorkers()742 private void interruptWorkers() { 743 final ReentrantLock mainLock = this.mainLock; 744 mainLock.lock(); 745 try { 746 for (Worker w : workers) 747 w.interruptIfStarted(); 748 } finally { 749 mainLock.unlock(); 750 } 751 } 752 753 /** 754 * Interrupts threads that might be waiting for tasks (as 755 * indicated by not being locked) so they can check for 756 * termination or configuration changes. Ignores 757 * SecurityExceptions (in which case some threads may remain 758 * uninterrupted). 759 * 760 * @param onlyOne If true, interrupt at most one worker. This is 761 * called only from tryTerminate when termination is otherwise 762 * enabled but there are still other workers. In this case, at 763 * most one waiting worker is interrupted to propagate shutdown 764 * signals in case all threads are currently waiting. 765 * Interrupting any arbitrary thread ensures that newly arriving 766 * workers since shutdown began will also eventually exit. 767 * To guarantee eventual termination, it suffices to always 768 * interrupt only one idle worker, but shutdown() interrupts all 769 * idle workers so that redundant workers exit promptly, not 770 * waiting for a straggler task to finish. 771 */ interruptIdleWorkers(boolean onlyOne)772 private void interruptIdleWorkers(boolean onlyOne) { 773 final ReentrantLock mainLock = this.mainLock; 774 mainLock.lock(); 775 try { 776 for (Worker w : workers) { 777 Thread t = w.thread; 778 if (!t.isInterrupted() && w.tryLock()) { 779 try { 780 t.interrupt(); 781 } catch (SecurityException ignore) { 782 } finally { 783 w.unlock(); 784 } 785 } 786 if (onlyOne) 787 break; 788 } 789 } finally { 790 mainLock.unlock(); 791 } 792 } 793 794 /** 795 * Common form of interruptIdleWorkers, to avoid having to 796 * remember what the boolean argument means. 
797 */ interruptIdleWorkers()798 private void interruptIdleWorkers() { 799 interruptIdleWorkers(false); 800 } 801 802 private static final boolean ONLY_ONE = true; 803 804 /* 805 * Misc utilities, most of which are also exported to 806 * ScheduledThreadPoolExecutor 807 */ 808 809 /** 810 * Invokes the rejected execution handler for the given command. 811 * Package-protected for use by ScheduledThreadPoolExecutor. 812 */ reject(Runnable command)813 final void reject(Runnable command) { 814 handler.rejectedExecution(command, this); 815 } 816 817 /** 818 * Performs any further cleanup following run state transition on 819 * invocation of shutdown. A no-op here, but used by 820 * ScheduledThreadPoolExecutor to cancel delayed tasks. 821 */ onShutdown()822 void onShutdown() { 823 } 824 825 /** 826 * State check needed by ScheduledThreadPoolExecutor to 827 * enable running tasks during shutdown. 828 * 829 * @param shutdownOK true if should return true if SHUTDOWN 830 */ isRunningOrShutdown(boolean shutdownOK)831 final boolean isRunningOrShutdown(boolean shutdownOK) { 832 int rs = runStateOf(ctl.get()); 833 return rs == RUNNING || (rs == SHUTDOWN && shutdownOK); 834 } 835 836 /** 837 * Drains the task queue into a new list, normally using 838 * drainTo. But if the queue is a DelayQueue or any other kind of 839 * queue for which poll or drainTo may fail to remove some 840 * elements, it deletes them one by one. 841 */ drainQueue()842 private List<Runnable> drainQueue() { 843 BlockingQueue<Runnable> q = workQueue; 844 ArrayList<Runnable> taskList = new ArrayList<>(); 845 q.drainTo(taskList); 846 if (!q.isEmpty()) { 847 for (Runnable r : q.toArray(new Runnable[0])) { 848 if (q.remove(r)) 849 taskList.add(r); 850 } 851 } 852 return taskList; 853 } 854 855 /* 856 * Methods for creating, running and cleaning up after workers 857 */ 858 859 /** 860 * Checks if a new worker can be added with respect to current 861 * pool state and the given bound (either core or maximum). 
If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        // Phase 1: reserve a slot by incrementing the worker count with
        // a CAS, re-validating the run state whenever it changes.
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // Refuse once the state is SHUTDOWN or later, except in the
            // single case of exactly SHUTDOWN with no first task and a
            // non-empty queue.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // The bound is re-read on every attempt.
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry; // slot reserved; leave both loops
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry; // run state changed; redo the state check
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        // Phase 2: create the worker, register it under mainLock, and
        // start its thread.
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        // Track the largest size the workers set has reached.
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // The thread is started only after mainLock is released.
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // Reached with workerStarted false when the thread is null,
            // the recheck failed, or an exception was thrown above.
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

    /**
     * Rolls back the worker thread creation.
     * - removes worker from workers, if present
     * - decrements worker count
     * - rechecks for termination, in case the existence of this
     *   worker was holding up termination
     *
     * @param w the worker whose start failed; may be null, in which
     * case only the count is decremented and termination rechecked
     */
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                workers.remove(w);
            decrementWorkerCount();
            // Called while mainLock is still held by this thread.
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Performs cleanup and bookkeeping for a dying worker. Called
     * only from worker threads. Unless completedAbruptly is set,
     * assumes that workerCount has already been adjusted to account
     * for exit.
This method removes thread from worker set, and 977 * possibly terminates the pool or replaces the worker if either 978 * it exited due to user task exception or if fewer than 979 * corePoolSize workers are running or queue is non-empty but 980 * there are no workers. 981 * 982 * @param w the worker 983 * @param completedAbruptly if the worker died due to user exception 984 */ processWorkerExit(Worker w, boolean completedAbruptly)985 private void processWorkerExit(Worker w, boolean completedAbruptly) { 986 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted 987 decrementWorkerCount(); 988 989 final ReentrantLock mainLock = this.mainLock; 990 mainLock.lock(); 991 try { 992 completedTaskCount += w.completedTasks; 993 workers.remove(w); 994 } finally { 995 mainLock.unlock(); 996 } 997 998 tryTerminate(); 999 1000 int c = ctl.get(); 1001 if (runStateLessThan(c, STOP)) { 1002 if (!completedAbruptly) { 1003 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; 1004 if (min == 0 && ! workQueue.isEmpty()) 1005 min = 1; 1006 if (workerCountOf(c) >= min) 1007 return; // replacement not needed 1008 } 1009 addWorker(null, false); 1010 } 1011 } 1012 1013 /** 1014 * Performs blocking or timed wait for a task, depending on 1015 * current configuration settings, or returns null if this worker 1016 * must exit because of any of: 1017 * 1. There are more than maximumPoolSize workers (due to 1018 * a call to setMaximumPoolSize). 1019 * 2. The pool is stopped. 1020 * 3. The pool is shutdown and the queue is empty. 1021 * 4. This worker timed out waiting for a task, and timed-out 1022 * workers are subject to termination (that is, 1023 * {@code allowCoreThreadTimeOut || workerCount > corePoolSize}) 1024 * both before and after the timed wait, and if the queue is 1025 * non-empty, this worker is not the last thread in the pool. 
*
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // Exit when the state is STOP or later, or when it is
            // SHUTDOWN and no tasks remain queued.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // Retire this worker if the pool is over its maximum or the
            // previous timed poll expired -- but not when it is the only
            // worker and tasks are still queued.
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                // CAS failed because ctl changed; re-evaluate from the top.
                continue;
            }

            try {
                // Timed workers poll with the keep-alive timeout; the
                // others block in take().
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                // A null result is recorded as a timeout for the next pass.
                timedOut = true;
            } catch (InterruptedException retry) {
                // Interrupted while waiting: this does not count as a
                // timeout; loop to recheck pool state.
                timedOut = false;
            }
        }
    }

    /**
     * Main worker run loop.  Repeatedly gets tasks from queue and
     * executes them, while coping with a number of issues:
     *
     * 1. We may start out with an initial task, in which case we
     * don't need to get the first one. Otherwise, as long as pool is
     * running, we get tasks from getTask. If it returns null then the
     * worker exits due to changed pool state or configuration
     * parameters.  Other exits result from exception throws in
     * external code, in which case completedAbruptly holds, which
     * usually leads processWorkerExit to replace this thread.
     *
     * 2. Before running any task, the lock is acquired to prevent
     * other pool interrupts while the task is executing, and then we
     * ensure that unless pool is stopping, this thread does not have
     * its interrupt set.
     *
     * 3.
Each task run is preceded by a call to beforeExecute, which
     * might throw an exception, in which case we cause thread to die
     * (breaking loop with completedAbruptly true) without processing
     * the task.
     *
     * 4. Assuming beforeExecute completes normally, we run the task,
     * gathering any of its thrown exceptions to send to afterExecute.
     * We separately handle RuntimeException, Error (both of which the
     * specs guarantee that we trap) and arbitrary Throwables.
     * Because we cannot rethrow Throwables within Runnable.run, we
     * wrap them within Errors on the way out (to the thread's
     * UncaughtExceptionHandler).  Any thrown exception also
     * conservatively causes thread to die.
     *
     * 5. After task.run completes, we call afterExecute, which may
     * also throw an exception, which will also cause thread to
     * die. According to JLS Sec 14.20, this exception is the one that
     * will be in effect even if task.run throws.
     *
     * The net effect of the exception mechanics is that afterExecute
     * and the thread's UncaughtExceptionHandler have as accurate
     * information as we can provide about any problems encountered by
     * user code.
     *
     * @param w the worker
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // Take the initial task (possibly null) and clear the field.
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        // Stays true unless the loop below exits normally.
        boolean completedAbruptly = true;
        try {
            // Run the initial task first, then keep pulling tasks until
            // getTask() returns null.
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    // Records what task.run() threw, if anything, so it
                    // can be passed to afterExecute.
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        // Any other Throwable is wrapped in an Error.
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    // Per-task cleanup runs whether or not the task (or
                    // a hook) threw: the task is counted as completed
                    // and the worker lock is released.
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

    // Public constructors and methods

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters and default thread factory and rejected execution handler.
     * It may be more convenient to use one of the {@link Executors} factory
     * methods instead of this general purpose constructor.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
1174 * @throws IllegalArgumentException if one of the following holds:<br> 1175 * {@code corePoolSize < 0}<br> 1176 * {@code keepAliveTime < 0}<br> 1177 * {@code maximumPoolSize <= 0}<br> 1178 * {@code maximumPoolSize < corePoolSize} 1179 * @throws NullPointerException if {@code workQueue} is null 1180 */ ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)1181 public ThreadPoolExecutor(int corePoolSize, 1182 int maximumPoolSize, 1183 long keepAliveTime, 1184 TimeUnit unit, 1185 BlockingQueue<Runnable> workQueue) { 1186 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 1187 Executors.defaultThreadFactory(), defaultHandler); 1188 } 1189 1190 /** 1191 * Creates a new {@code ThreadPoolExecutor} with the given initial 1192 * parameters and default rejected execution handler. 1193 * 1194 * @param corePoolSize the number of threads to keep in the pool, even 1195 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 1196 * @param maximumPoolSize the maximum number of threads to allow in the 1197 * pool 1198 * @param keepAliveTime when the number of threads is greater than 1199 * the core, this is the maximum time that excess idle threads 1200 * will wait for new tasks before terminating. 1201 * @param unit the time unit for the {@code keepAliveTime} argument 1202 * @param workQueue the queue to use for holding tasks before they are 1203 * executed. This queue will hold only the {@code Runnable} 1204 * tasks submitted by the {@code execute} method. 
1205 * @param threadFactory the factory to use when the executor 1206 * creates a new thread 1207 * @throws IllegalArgumentException if one of the following holds:<br> 1208 * {@code corePoolSize < 0}<br> 1209 * {@code keepAliveTime < 0}<br> 1210 * {@code maximumPoolSize <= 0}<br> 1211 * {@code maximumPoolSize < corePoolSize} 1212 * @throws NullPointerException if {@code workQueue} 1213 * or {@code threadFactory} is null 1214 */ ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)1215 public ThreadPoolExecutor(int corePoolSize, 1216 int maximumPoolSize, 1217 long keepAliveTime, 1218 TimeUnit unit, 1219 BlockingQueue<Runnable> workQueue, 1220 ThreadFactory threadFactory) { 1221 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 1222 threadFactory, defaultHandler); 1223 } 1224 1225 /** 1226 * Creates a new {@code ThreadPoolExecutor} with the given initial 1227 * parameters and default thread factory. 1228 * 1229 * @param corePoolSize the number of threads to keep in the pool, even 1230 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 1231 * @param maximumPoolSize the maximum number of threads to allow in the 1232 * pool 1233 * @param keepAliveTime when the number of threads is greater than 1234 * the core, this is the maximum time that excess idle threads 1235 * will wait for new tasks before terminating. 1236 * @param unit the time unit for the {@code keepAliveTime} argument 1237 * @param workQueue the queue to use for holding tasks before they are 1238 * executed. This queue will hold only the {@code Runnable} 1239 * tasks submitted by the {@code execute} method. 
1240 * @param handler the handler to use when execution is blocked 1241 * because the thread bounds and queue capacities are reached 1242 * @throws IllegalArgumentException if one of the following holds:<br> 1243 * {@code corePoolSize < 0}<br> 1244 * {@code keepAliveTime < 0}<br> 1245 * {@code maximumPoolSize <= 0}<br> 1246 * {@code maximumPoolSize < corePoolSize} 1247 * @throws NullPointerException if {@code workQueue} 1248 * or {@code handler} is null 1249 */ ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)1250 public ThreadPoolExecutor(int corePoolSize, 1251 int maximumPoolSize, 1252 long keepAliveTime, 1253 TimeUnit unit, 1254 BlockingQueue<Runnable> workQueue, 1255 RejectedExecutionHandler handler) { 1256 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 1257 Executors.defaultThreadFactory(), handler); 1258 } 1259 1260 /** 1261 * Creates a new {@code ThreadPoolExecutor} with the given initial 1262 * parameters. 1263 * 1264 * @param corePoolSize the number of threads to keep in the pool, even 1265 * if they are idle, unless {@code allowCoreThreadTimeOut} is set 1266 * @param maximumPoolSize the maximum number of threads to allow in the 1267 * pool 1268 * @param keepAliveTime when the number of threads is greater than 1269 * the core, this is the maximum time that excess idle threads 1270 * will wait for new tasks before terminating. 1271 * @param unit the time unit for the {@code keepAliveTime} argument 1272 * @param workQueue the queue to use for holding tasks before they are 1273 * executed. This queue will hold only the {@code Runnable} 1274 * tasks submitted by the {@code execute} method. 
1275 * @param threadFactory the factory to use when the executor 1276 * creates a new thread 1277 * @param handler the handler to use when execution is blocked 1278 * because the thread bounds and queue capacities are reached 1279 * @throws IllegalArgumentException if one of the following holds:<br> 1280 * {@code corePoolSize < 0}<br> 1281 * {@code keepAliveTime < 0}<br> 1282 * {@code maximumPoolSize <= 0}<br> 1283 * {@code maximumPoolSize < corePoolSize} 1284 * @throws NullPointerException if {@code workQueue} 1285 * or {@code threadFactory} or {@code handler} is null 1286 */ ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)1287 public ThreadPoolExecutor(int corePoolSize, 1288 int maximumPoolSize, 1289 long keepAliveTime, 1290 TimeUnit unit, 1291 BlockingQueue<Runnable> workQueue, 1292 ThreadFactory threadFactory, 1293 RejectedExecutionHandler handler) { 1294 if (corePoolSize < 0 || 1295 maximumPoolSize <= 0 || 1296 maximumPoolSize < corePoolSize || 1297 keepAliveTime < 0) 1298 throw new IllegalArgumentException(); 1299 if (workQueue == null || threadFactory == null || handler == null) 1300 throw new NullPointerException(); 1301 this.corePoolSize = corePoolSize; 1302 this.maximumPoolSize = maximumPoolSize; 1303 this.workQueue = workQueue; 1304 this.keepAliveTime = unit.toNanos(keepAliveTime); 1305 this.threadFactory = threadFactory; 1306 this.handler = handler; 1307 } 1308 1309 /** 1310 * Executes the given task sometime in the future. The task 1311 * may execute in a new thread or in an existing pooled thread. 1312 * 1313 * If the task cannot be submitted for execution, either because this 1314 * executor has been shutdown or because its capacity has been reached, 1315 * the task is handled by the current {@code RejectedExecutionHandler}. 
1316 * 1317 * @param command the task to execute 1318 * @throws RejectedExecutionException at discretion of 1319 * {@code RejectedExecutionHandler}, if the task 1320 * cannot be accepted for execution 1321 * @throws NullPointerException if {@code command} is null 1322 */ execute(Runnable command)1323 public void execute(Runnable command) { 1324 if (command == null) 1325 throw new NullPointerException(); 1326 /* 1327 * Proceed in 3 steps: 1328 * 1329 * 1. If fewer than corePoolSize threads are running, try to 1330 * start a new thread with the given command as its first 1331 * task. The call to addWorker atomically checks runState and 1332 * workerCount, and so prevents false alarms that would add 1333 * threads when it shouldn't, by returning false. 1334 * 1335 * 2. If a task can be successfully queued, then we still need 1336 * to double-check whether we should have added a thread 1337 * (because existing ones died since last checking) or that 1338 * the pool shut down since entry into this method. So we 1339 * recheck state and if necessary roll back the enqueuing if 1340 * stopped, or start a new thread if there are none. 1341 * 1342 * 3. If we cannot queue task, then we try to add a new 1343 * thread. If it fails, we know we are shut down or saturated 1344 * and so reject the task. 1345 */ 1346 int c = ctl.get(); 1347 if (workerCountOf(c) < corePoolSize) { 1348 if (addWorker(command, true)) 1349 return; 1350 c = ctl.get(); 1351 } 1352 if (isRunning(c) && workQueue.offer(command)) { 1353 int recheck = ctl.get(); 1354 if (! isRunning(recheck) && remove(command)) 1355 reject(command); 1356 else if (workerCountOf(recheck) == 0) 1357 addWorker(null, false); 1358 } 1359 else if (!addWorker(command, false)) 1360 reject(command); 1361 } 1362 1363 /** 1364 * Initiates an orderly shutdown in which previously submitted 1365 * tasks are executed, but no new tasks will be accepted. 1366 * Invocation has no additional effect if already shut down. 
1367 * 1368 * <p>This method does not wait for previously submitted tasks to 1369 * complete execution. Use {@link #awaitTermination awaitTermination} 1370 * to do that. 1371 */ 1372 // android-note: Removed @throws SecurityException shutdown()1373 public void shutdown() { 1374 final ReentrantLock mainLock = this.mainLock; 1375 mainLock.lock(); 1376 try { 1377 checkShutdownAccess(); 1378 advanceRunState(SHUTDOWN); 1379 interruptIdleWorkers(); 1380 onShutdown(); // hook for ScheduledThreadPoolExecutor 1381 } finally { 1382 mainLock.unlock(); 1383 } 1384 tryTerminate(); 1385 } 1386 1387 /** 1388 * Attempts to stop all actively executing tasks, halts the 1389 * processing of waiting tasks, and returns a list of the tasks 1390 * that were awaiting execution. These tasks are drained (removed) 1391 * from the task queue upon return from this method. 1392 * 1393 * <p>This method does not wait for actively executing tasks to 1394 * terminate. Use {@link #awaitTermination awaitTermination} to 1395 * do that. 1396 * 1397 * <p>There are no guarantees beyond best-effort attempts to stop 1398 * processing actively executing tasks. This implementation 1399 * interrupts tasks via {@link Thread#interrupt}; any task that 1400 * fails to respond to interrupts may never terminate. 1401 */ 1402 // android-note: Removed @throws SecurityException shutdownNow()1403 public List<Runnable> shutdownNow() { 1404 List<Runnable> tasks; 1405 final ReentrantLock mainLock = this.mainLock; 1406 mainLock.lock(); 1407 try { 1408 checkShutdownAccess(); 1409 advanceRunState(STOP); 1410 interruptWorkers(); 1411 tasks = drainQueue(); 1412 } finally { 1413 mainLock.unlock(); 1414 } 1415 tryTerminate(); 1416 return tasks; 1417 } 1418 isShutdown()1419 public boolean isShutdown() { 1420 return ! isRunning(ctl.get()); 1421 } 1422 1423 /** 1424 * Returns true if this executor is in the process of terminating 1425 * after {@link #shutdown} or {@link #shutdownNow} but has not 1426 * completely terminated. 
This method may be useful for 1427 * debugging. A return of {@code true} reported a sufficient 1428 * period after shutdown may indicate that submitted tasks have 1429 * ignored or suppressed interruption, causing this executor not 1430 * to properly terminate. 1431 * 1432 * @return {@code true} if terminating but not yet terminated 1433 */ isTerminating()1434 public boolean isTerminating() { 1435 int c = ctl.get(); 1436 return ! isRunning(c) && runStateLessThan(c, TERMINATED); 1437 } 1438 isTerminated()1439 public boolean isTerminated() { 1440 return runStateAtLeast(ctl.get(), TERMINATED); 1441 } 1442 awaitTermination(long timeout, TimeUnit unit)1443 public boolean awaitTermination(long timeout, TimeUnit unit) 1444 throws InterruptedException { 1445 long nanos = unit.toNanos(timeout); 1446 final ReentrantLock mainLock = this.mainLock; 1447 mainLock.lock(); 1448 try { 1449 while (!runStateAtLeast(ctl.get(), TERMINATED)) { 1450 if (nanos <= 0L) 1451 return false; 1452 nanos = termination.awaitNanos(nanos); 1453 } 1454 return true; 1455 } finally { 1456 mainLock.unlock(); 1457 } 1458 } 1459 1460 /** 1461 * Invokes {@code shutdown} when this executor is no longer 1462 * referenced and it has no threads. 1463 */ finalize()1464 protected void finalize() { 1465 shutdown(); 1466 } 1467 1468 /** 1469 * Sets the thread factory used to create new threads. 1470 * 1471 * @param threadFactory the new thread factory 1472 * @throws NullPointerException if threadFactory is null 1473 * @see #getThreadFactory 1474 */ setThreadFactory(ThreadFactory threadFactory)1475 public void setThreadFactory(ThreadFactory threadFactory) { 1476 if (threadFactory == null) 1477 throw new NullPointerException(); 1478 this.threadFactory = threadFactory; 1479 } 1480 1481 /** 1482 * Returns the thread factory used to create new threads. 
1483 * 1484 * @return the current thread factory 1485 * @see #setThreadFactory(ThreadFactory) 1486 */ getThreadFactory()1487 public ThreadFactory getThreadFactory() { 1488 return threadFactory; 1489 } 1490 1491 /** 1492 * Sets a new handler for unexecutable tasks. 1493 * 1494 * @param handler the new handler 1495 * @throws NullPointerException if handler is null 1496 * @see #getRejectedExecutionHandler 1497 */ setRejectedExecutionHandler(RejectedExecutionHandler handler)1498 public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { 1499 if (handler == null) 1500 throw new NullPointerException(); 1501 this.handler = handler; 1502 } 1503 1504 /** 1505 * Returns the current handler for unexecutable tasks. 1506 * 1507 * @return the current handler 1508 * @see #setRejectedExecutionHandler(RejectedExecutionHandler) 1509 */ getRejectedExecutionHandler()1510 public RejectedExecutionHandler getRejectedExecutionHandler() { 1511 return handler; 1512 } 1513 1514 /** 1515 * Sets the core number of threads. This overrides any value set 1516 * in the constructor. If the new value is smaller than the 1517 * current value, excess existing threads will be terminated when 1518 * they next become idle. If larger, new threads will, if needed, 1519 * be started to execute any queued tasks. 1520 * 1521 * @param corePoolSize the new core size 1522 * @throws IllegalArgumentException if {@code corePoolSize < 0} 1523 * @see #getCorePoolSize 1524 */ 1525 // Android-changed: Reverted code that threw an IAE when 1526 // {@code corePoolSize} is greater than the {@linkplain #getMaximumPoolSize() 1527 // maximum pool size}. 
This is due to defective code in a commonly used third 1528 // party library that does something like : 1529 // 1530 // exec.setCorePoolSize(N); 1531 // exec.setMaxPoolSize(N); setCorePoolSize(int corePoolSize)1532 public void setCorePoolSize(int corePoolSize) { 1533 if (corePoolSize < 0) 1534 throw new IllegalArgumentException(); 1535 int delta = corePoolSize - this.corePoolSize; 1536 this.corePoolSize = corePoolSize; 1537 if (workerCountOf(ctl.get()) > corePoolSize) 1538 interruptIdleWorkers(); 1539 else if (delta > 0) { 1540 // We don't really know how many new threads are "needed". 1541 // As a heuristic, prestart enough new workers (up to new 1542 // core size) to handle the current number of tasks in 1543 // queue, but stop if queue becomes empty while doing so. 1544 int k = Math.min(delta, workQueue.size()); 1545 while (k-- > 0 && addWorker(null, true)) { 1546 if (workQueue.isEmpty()) 1547 break; 1548 } 1549 } 1550 } 1551 1552 /** 1553 * Returns the core number of threads. 1554 * 1555 * @return the core number of threads 1556 * @see #setCorePoolSize 1557 */ getCorePoolSize()1558 public int getCorePoolSize() { 1559 return corePoolSize; 1560 } 1561 1562 /** 1563 * Starts a core thread, causing it to idly wait for work. This 1564 * overrides the default policy of starting core threads only when 1565 * new tasks are executed. This method will return {@code false} 1566 * if all core threads have already been started. 1567 * 1568 * @return {@code true} if a thread was started 1569 */ prestartCoreThread()1570 public boolean prestartCoreThread() { 1571 return workerCountOf(ctl.get()) < corePoolSize && 1572 addWorker(null, true); 1573 } 1574 1575 /** 1576 * Same as prestartCoreThread except arranges that at least one 1577 * thread is started even if corePoolSize is 0. 
1578 */ ensurePrestart()1579 void ensurePrestart() { 1580 int wc = workerCountOf(ctl.get()); 1581 if (wc < corePoolSize) 1582 addWorker(null, true); 1583 else if (wc == 0) 1584 addWorker(null, false); 1585 } 1586 1587 /** 1588 * Starts all core threads, causing them to idly wait for work. This 1589 * overrides the default policy of starting core threads only when 1590 * new tasks are executed. 1591 * 1592 * @return the number of threads started 1593 */ prestartAllCoreThreads()1594 public int prestartAllCoreThreads() { 1595 int n = 0; 1596 while (addWorker(null, true)) 1597 ++n; 1598 return n; 1599 } 1600 1601 /** 1602 * Returns true if this pool allows core threads to time out and 1603 * terminate if no tasks arrive within the keepAlive time, being 1604 * replaced if needed when new tasks arrive. When true, the same 1605 * keep-alive policy applying to non-core threads applies also to 1606 * core threads. When false (the default), core threads are never 1607 * terminated due to lack of incoming tasks. 1608 * 1609 * @return {@code true} if core threads are allowed to time out, 1610 * else {@code false} 1611 * 1612 * @since 1.6 1613 */ allowsCoreThreadTimeOut()1614 public boolean allowsCoreThreadTimeOut() { 1615 return allowCoreThreadTimeOut; 1616 } 1617 1618 /** 1619 * Sets the policy governing whether core threads may time out and 1620 * terminate if no tasks arrive within the keep-alive time, being 1621 * replaced if needed when new tasks arrive. When false, core 1622 * threads are never terminated due to lack of incoming 1623 * tasks. When true, the same keep-alive policy applying to 1624 * non-core threads applies also to core threads. To avoid 1625 * continual thread replacement, the keep-alive time must be 1626 * greater than zero when setting {@code true}. This method 1627 * should in general be called before the pool is actively used. 
1628 * 1629 * @param value {@code true} if should time out, else {@code false} 1630 * @throws IllegalArgumentException if value is {@code true} 1631 * and the current keep-alive time is not greater than zero 1632 * 1633 * @since 1.6 1634 */ allowCoreThreadTimeOut(boolean value)1635 public void allowCoreThreadTimeOut(boolean value) { 1636 if (value && keepAliveTime <= 0) 1637 throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); 1638 if (value != allowCoreThreadTimeOut) { 1639 allowCoreThreadTimeOut = value; 1640 if (value) 1641 interruptIdleWorkers(); 1642 } 1643 } 1644 1645 /** 1646 * Sets the maximum allowed number of threads. This overrides any 1647 * value set in the constructor. If the new value is smaller than 1648 * the current value, excess existing threads will be 1649 * terminated when they next become idle. 1650 * 1651 * @param maximumPoolSize the new maximum 1652 * @throws IllegalArgumentException if the new maximum is 1653 * less than or equal to zero, or 1654 * less than the {@linkplain #getCorePoolSize core pool size} 1655 * @see #getMaximumPoolSize 1656 */ setMaximumPoolSize(int maximumPoolSize)1657 public void setMaximumPoolSize(int maximumPoolSize) { 1658 if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) 1659 throw new IllegalArgumentException(); 1660 this.maximumPoolSize = maximumPoolSize; 1661 if (workerCountOf(ctl.get()) > maximumPoolSize) 1662 interruptIdleWorkers(); 1663 } 1664 1665 /** 1666 * Returns the maximum allowed number of threads. 1667 * 1668 * @return the maximum allowed number of threads 1669 * @see #setMaximumPoolSize 1670 */ getMaximumPoolSize()1671 public int getMaximumPoolSize() { 1672 return maximumPoolSize; 1673 } 1674 1675 /** 1676 * Sets the thread keep-alive time, which is the amount of time 1677 * that threads may remain idle before being terminated. 
1678 * Threads that wait this amount of time without processing a 1679 * task will be terminated if there are more than the core 1680 * number of threads currently in the pool, or if this pool 1681 * {@linkplain #allowsCoreThreadTimeOut() allows core thread timeout}. 1682 * This overrides any value set in the constructor. 1683 * 1684 * @param time the time to wait. A time value of zero will cause 1685 * excess threads to terminate immediately after executing tasks. 1686 * @param unit the time unit of the {@code time} argument 1687 * @throws IllegalArgumentException if {@code time} less than zero or 1688 * if {@code time} is zero and {@code allowsCoreThreadTimeOut} 1689 * @see #getKeepAliveTime(TimeUnit) 1690 */ setKeepAliveTime(long time, TimeUnit unit)1691 public void setKeepAliveTime(long time, TimeUnit unit) { 1692 if (time < 0) 1693 throw new IllegalArgumentException(); 1694 if (time == 0 && allowsCoreThreadTimeOut()) 1695 throw new IllegalArgumentException("Core threads must have nonzero keep alive times"); 1696 long keepAliveTime = unit.toNanos(time); 1697 long delta = keepAliveTime - this.keepAliveTime; 1698 this.keepAliveTime = keepAliveTime; 1699 if (delta < 0) 1700 interruptIdleWorkers(); 1701 } 1702 1703 /** 1704 * Returns the thread keep-alive time, which is the amount of time 1705 * that threads may remain idle before being terminated. 1706 * Threads that wait this amount of time without processing a 1707 * task will be terminated if there are more than the core 1708 * number of threads currently in the pool, or if this pool 1709 * {@linkplain #allowsCoreThreadTimeOut() allows core thread timeout}. 
1710 * 1711 * @param unit the desired time unit of the result 1712 * @return the time limit 1713 * @see #setKeepAliveTime(long, TimeUnit) 1714 */ getKeepAliveTime(TimeUnit unit)1715 public long getKeepAliveTime(TimeUnit unit) { 1716 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS); 1717 } 1718 1719 /* User-level queue utilities */ 1720 1721 /** 1722 * Returns the task queue used by this executor. Access to the 1723 * task queue is intended primarily for debugging and monitoring. 1724 * This queue may be in active use. Retrieving the task queue 1725 * does not prevent queued tasks from executing. 1726 * 1727 * @return the task queue 1728 */ getQueue()1729 public BlockingQueue<Runnable> getQueue() { 1730 return workQueue; 1731 } 1732 1733 /** 1734 * Removes this task from the executor's internal queue if it is 1735 * present, thus causing it not to be run if it has not already 1736 * started. 1737 * 1738 * <p>This method may be useful as one part of a cancellation 1739 * scheme. It may fail to remove tasks that have been converted 1740 * into other forms before being placed on the internal queue. 1741 * For example, a task entered using {@code submit} might be 1742 * converted into a form that maintains {@code Future} status. 1743 * However, in such cases, method {@link #purge} may be used to 1744 * remove those Futures that have been cancelled. 1745 * 1746 * @param task the task to remove 1747 * @return {@code true} if the task was removed 1748 */ remove(Runnable task)1749 public boolean remove(Runnable task) { 1750 boolean removed = workQueue.remove(task); 1751 tryTerminate(); // In case SHUTDOWN and now empty 1752 return removed; 1753 } 1754 1755 /** 1756 * Tries to remove from the work queue all {@link Future} 1757 * tasks that have been cancelled. This method can be useful as a 1758 * storage reclamation operation, that has no other impact on 1759 * functionality. 
Cancelled tasks are never executed, but may 1760 * accumulate in work queues until worker threads can actively 1761 * remove them. Invoking this method instead tries to remove them now. 1762 * However, this method may fail to remove tasks in 1763 * the presence of interference by other threads. 1764 */ purge()1765 public void purge() { 1766 final BlockingQueue<Runnable> q = workQueue; 1767 try { 1768 Iterator<Runnable> it = q.iterator(); 1769 while (it.hasNext()) { 1770 Runnable r = it.next(); 1771 if (r instanceof Future<?> && ((Future<?>)r).isCancelled()) 1772 it.remove(); 1773 } 1774 } catch (ConcurrentModificationException fallThrough) { 1775 // Take slow path if we encounter interference during traversal. 1776 // Make copy for traversal and call remove for cancelled entries. 1777 // The slow path is more likely to be O(N*N). 1778 for (Object r : q.toArray()) 1779 if (r instanceof Future<?> && ((Future<?>)r).isCancelled()) 1780 q.remove(r); 1781 } 1782 1783 tryTerminate(); // In case SHUTDOWN and now empty 1784 } 1785 1786 /* Statistics */ 1787 1788 /** 1789 * Returns the current number of threads in the pool. 1790 * 1791 * @return the number of threads 1792 */ getPoolSize()1793 public int getPoolSize() { 1794 final ReentrantLock mainLock = this.mainLock; 1795 mainLock.lock(); 1796 try { 1797 // Remove rare and surprising possibility of 1798 // isTerminated() && getPoolSize() > 0 1799 return runStateAtLeast(ctl.get(), TIDYING) ? 0 1800 : workers.size(); 1801 } finally { 1802 mainLock.unlock(); 1803 } 1804 } 1805 1806 /** 1807 * Returns the approximate number of threads that are actively 1808 * executing tasks. 
1809 * 1810 * @return the number of threads 1811 */ getActiveCount()1812 public int getActiveCount() { 1813 final ReentrantLock mainLock = this.mainLock; 1814 mainLock.lock(); 1815 try { 1816 int n = 0; 1817 for (Worker w : workers) 1818 if (w.isLocked()) 1819 ++n; 1820 return n; 1821 } finally { 1822 mainLock.unlock(); 1823 } 1824 } 1825 1826 /** 1827 * Returns the largest number of threads that have ever 1828 * simultaneously been in the pool. 1829 * 1830 * @return the number of threads 1831 */ getLargestPoolSize()1832 public int getLargestPoolSize() { 1833 final ReentrantLock mainLock = this.mainLock; 1834 mainLock.lock(); 1835 try { 1836 return largestPoolSize; 1837 } finally { 1838 mainLock.unlock(); 1839 } 1840 } 1841 1842 /** 1843 * Returns the approximate total number of tasks that have ever been 1844 * scheduled for execution. Because the states of tasks and 1845 * threads may change dynamically during computation, the returned 1846 * value is only an approximation. 1847 * 1848 * @return the number of tasks 1849 */ getTaskCount()1850 public long getTaskCount() { 1851 final ReentrantLock mainLock = this.mainLock; 1852 mainLock.lock(); 1853 try { 1854 long n = completedTaskCount; 1855 for (Worker w : workers) { 1856 n += w.completedTasks; 1857 if (w.isLocked()) 1858 ++n; 1859 } 1860 return n + workQueue.size(); 1861 } finally { 1862 mainLock.unlock(); 1863 } 1864 } 1865 1866 /** 1867 * Returns the approximate total number of tasks that have 1868 * completed execution. Because the states of tasks and threads 1869 * may change dynamically during computation, the returned value 1870 * is only an approximation, but one that does not ever decrease 1871 * across successive calls. 
1872 * 1873 * @return the number of tasks 1874 */ getCompletedTaskCount()1875 public long getCompletedTaskCount() { 1876 final ReentrantLock mainLock = this.mainLock; 1877 mainLock.lock(); 1878 try { 1879 long n = completedTaskCount; 1880 for (Worker w : workers) 1881 n += w.completedTasks; 1882 return n; 1883 } finally { 1884 mainLock.unlock(); 1885 } 1886 } 1887 1888 /** 1889 * Returns a string identifying this pool, as well as its state, 1890 * including indications of run state and estimated worker and 1891 * task counts. 1892 * 1893 * @return a string identifying this pool, as well as its state 1894 */ toString()1895 public String toString() { 1896 long ncompleted; 1897 int nworkers, nactive; 1898 final ReentrantLock mainLock = this.mainLock; 1899 mainLock.lock(); 1900 try { 1901 ncompleted = completedTaskCount; 1902 nactive = 0; 1903 nworkers = workers.size(); 1904 for (Worker w : workers) { 1905 ncompleted += w.completedTasks; 1906 if (w.isLocked()) 1907 ++nactive; 1908 } 1909 } finally { 1910 mainLock.unlock(); 1911 } 1912 int c = ctl.get(); 1913 String runState = 1914 runStateLessThan(c, SHUTDOWN) ? "Running" : 1915 runStateAtLeast(c, TERMINATED) ? "Terminated" : 1916 "Shutting down"; 1917 return super.toString() + 1918 "[" + runState + 1919 ", pool size = " + nworkers + 1920 ", active threads = " + nactive + 1921 ", queued tasks = " + workQueue.size() + 1922 ", completed tasks = " + ncompleted + 1923 "]"; 1924 } 1925 1926 /* Extension hooks */ 1927 1928 /** 1929 * Method invoked prior to executing the given Runnable in the 1930 * given thread. This method is invoked by thread {@code t} that 1931 * will execute task {@code r}, and may be used to re-initialize 1932 * ThreadLocals, or to perform logging. 1933 * 1934 * <p>This implementation does nothing, but may be customized in 1935 * subclasses. Note: To properly nest multiple overridings, subclasses 1936 * should generally invoke {@code super.beforeExecute} at the end of 1937 * this method. 
*
     * @param t the thread that will run task {@code r}
     * @param r the task that will be executed
     */
    protected void beforeExecute(Thread t, Runnable r) {
        // Intentionally empty: extension hook for subclasses.
    }

    /**
     * Method invoked upon completion of execution of the given Runnable.
     * This method is invoked by the thread that executed the task. If
     * non-null, the Throwable is the uncaught {@code RuntimeException}
     * or {@code Error} that caused execution to terminate abruptly.
     *
     * <p>This implementation does nothing, but may be customized in
     * subclasses. Note: To properly nest multiple overridings, subclasses
     * should generally invoke {@code super.afterExecute} at the
     * beginning of this method.
     *
     * <p><b>Note:</b> When actions are enclosed in tasks (such as
     * {@link FutureTask}) either explicitly or via methods such as
     * {@code submit}, these task objects catch and maintain
     * computational exceptions, and so they do not cause abrupt
     * termination, and the internal exceptions are <em>not</em>
     * passed to this method. If you would like to trap both kinds of
     * failures in this method, you can further probe for such cases,
     * as in this sample subclass that prints either the direct cause
     * or the underlying exception if a task has been aborted:
     *
     * <pre> {@code
     * class ExtendedExecutor extends ThreadPoolExecutor {
     *   // ...
1968 * protected void afterExecute(Runnable r, Throwable t) { 1969 * super.afterExecute(r, t); 1970 * if (t == null 1971 * && r instanceof Future<?> 1972 * && ((Future<?>)r).isDone()) { 1973 * try { 1974 * Object result = ((Future<?>) r).get(); 1975 * } catch (CancellationException ce) { 1976 * t = ce; 1977 * } catch (ExecutionException ee) { 1978 * t = ee.getCause(); 1979 * } catch (InterruptedException ie) { 1980 * // ignore/reset 1981 * Thread.currentThread().interrupt(); 1982 * } 1983 * } 1984 * if (t != null) 1985 * System.out.println(t); 1986 * } 1987 * }}</pre> 1988 * 1989 * @param r the runnable that has completed 1990 * @param t the exception that caused termination, or null if 1991 * execution completed normally 1992 */ afterExecute(Runnable r, Throwable t)1993 protected void afterExecute(Runnable r, Throwable t) { } 1994 1995 /** 1996 * Method invoked when the Executor has terminated. Default 1997 * implementation does nothing. Note: To properly nest multiple 1998 * overridings, subclasses should generally invoke 1999 * {@code super.terminated} within this method. 2000 */ terminated()2001 protected void terminated() { } 2002 2003 /* Predefined RejectedExecutionHandlers */ 2004 2005 /** 2006 * A handler for rejected tasks that runs the rejected task 2007 * directly in the calling thread of the {@code execute} method, 2008 * unless the executor has been shut down, in which case the task 2009 * is discarded. 2010 */ 2011 public static class CallerRunsPolicy implements RejectedExecutionHandler { 2012 /** 2013 * Creates a {@code CallerRunsPolicy}. 2014 */ CallerRunsPolicy()2015 public CallerRunsPolicy() { } 2016 2017 /** 2018 * Executes task r in the caller's thread, unless the executor 2019 * has been shut down, in which case the task is discarded. 
2020 * 2021 * @param r the runnable task requested to be executed 2022 * @param e the executor attempting to execute this task 2023 */ rejectedExecution(Runnable r, ThreadPoolExecutor e)2024 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 2025 if (!e.isShutdown()) { 2026 r.run(); 2027 } 2028 } 2029 } 2030 2031 /** 2032 * A handler for rejected tasks that throws a 2033 * {@code RejectedExecutionException}. 2034 */ 2035 public static class AbortPolicy implements RejectedExecutionHandler { 2036 /** 2037 * Creates an {@code AbortPolicy}. 2038 */ AbortPolicy()2039 public AbortPolicy() { } 2040 2041 /** 2042 * Always throws RejectedExecutionException. 2043 * 2044 * @param r the runnable task requested to be executed 2045 * @param e the executor attempting to execute this task 2046 * @throws RejectedExecutionException always 2047 */ rejectedExecution(Runnable r, ThreadPoolExecutor e)2048 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 2049 throw new RejectedExecutionException("Task " + r.toString() + 2050 " rejected from " + 2051 e.toString()); 2052 } 2053 } 2054 2055 /** 2056 * A handler for rejected tasks that silently discards the 2057 * rejected task. 2058 */ 2059 public static class DiscardPolicy implements RejectedExecutionHandler { 2060 /** 2061 * Creates a {@code DiscardPolicy}. 2062 */ DiscardPolicy()2063 public DiscardPolicy() { } 2064 2065 /** 2066 * Does nothing, which has the effect of discarding task r. 2067 * 2068 * @param r the runnable task requested to be executed 2069 * @param e the executor attempting to execute this task 2070 */ rejectedExecution(Runnable r, ThreadPoolExecutor e)2071 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 2072 } 2073 } 2074 2075 /** 2076 * A handler for rejected tasks that discards the oldest unhandled 2077 * request and then retries {@code execute}, unless the executor 2078 * is shut down, in which case the task is discarded. 
*/
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy}.
         */
        public DiscardOldestPolicy() { }

        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (e.isShutdown()) {
                return; // Executor is shut down: drop task r.
            }
            // Drop the head of the queue (if any), then resubmit r.
            e.getQueue().poll();
            e.execute(r);
        }
    }
}