• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 import java.util.ArrayList;
39 import java.util.ConcurrentModificationException;
40 import java.util.HashSet;
41 import java.util.Iterator;
42 import java.util.List;
43 import java.util.concurrent.atomic.AtomicInteger;
44 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
45 import java.util.concurrent.locks.Condition;
46 import java.util.concurrent.locks.ReentrantLock;
47 
48 // BEGIN android-note
49 // removed security manager docs
50 // END android-note
51 
52 /**
53  * An {@link ExecutorService} that executes each submitted task using
54  * one of possibly several pooled threads, normally configured
55  * using {@link Executors} factory methods.
56  *
57  * <p>Thread pools address two different problems: they usually
58  * provide improved performance when executing large numbers of
59  * asynchronous tasks, due to reduced per-task invocation overhead,
60  * and they provide a means of bounding and managing the resources,
61  * including threads, consumed when executing a collection of tasks.
62  * Each {@code ThreadPoolExecutor} also maintains some basic
63  * statistics, such as the number of completed tasks.
64  *
65  * <p>To be useful across a wide range of contexts, this class
66  * provides many adjustable parameters and extensibility
67  * hooks. However, programmers are urged to use the more convenient
68  * {@link Executors} factory methods {@link
69  * Executors#newCachedThreadPool} (unbounded thread pool, with
70  * automatic thread reclamation), {@link Executors#newFixedThreadPool}
71  * (fixed size thread pool) and {@link
72  * Executors#newSingleThreadExecutor} (single background thread), that
73  * preconfigure settings for the most common usage
74  * scenarios. Otherwise, use the following guide when manually
75  * configuring and tuning this class:
76  *
77  * <dl>
78  *
79  * <dt>Core and maximum pool sizes</dt>
80  *
81  * <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
82  * A {@code ThreadPoolExecutor} will automatically adjust the
83  * pool size (see {@link #getPoolSize})
84  * according to the bounds set by
85  * corePoolSize (see {@link #getCorePoolSize}) and
86  * maximumPoolSize (see {@link #getMaximumPoolSize}).
87  *
88  * When a new task is submitted in method {@link #execute(Runnable)},
89  * and fewer than corePoolSize threads are running, a new thread is
90  * created to handle the request, even if other worker threads are
91  * idle.  If there are more than corePoolSize but less than
92  * maximumPoolSize threads running, a new thread will be created only
93  * if the queue is full.  By setting corePoolSize and maximumPoolSize
94  * the same, you create a fixed-size thread pool. By setting
95  * maximumPoolSize to an essentially unbounded value such as {@code
96  * Integer.MAX_VALUE}, you allow the pool to accommodate an arbitrary
97  * number of concurrent tasks. Most typically, core and maximum pool
98  * sizes are set only upon construction, but they may also be changed
99  * dynamically using {@link #setCorePoolSize} and {@link
100  * #setMaximumPoolSize}. </dd>
101  *
102  * <dt>On-demand construction</dt>
103  *
104  * <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
105  * By default, even core threads are initially created and
106  * started only when new tasks arrive, but this can be overridden
107  * dynamically using method {@link #prestartCoreThread} or {@link
108  * #prestartAllCoreThreads}.  You probably want to prestart threads if
109  * you construct the pool with a non-empty queue. </dd>
110  *
111  * <dt>Creating new threads</dt>
112  *
113  * <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
114  * New threads are created using a {@link ThreadFactory}.  If not
115  * otherwise specified, a {@link Executors#defaultThreadFactory} is
116  * used, that creates threads to all be in the same {@link
117  * ThreadGroup} and with the same {@code NORM_PRIORITY} priority and
118  * non-daemon status. By supplying a different ThreadFactory, you can
119  * alter the thread's name, thread group, priority, daemon status,
120  * etc. If a {@code ThreadFactory} fails to create a thread when asked
121  * by returning null from {@code newThread}, the executor will
122  * continue, but might not be able to execute any tasks. Threads
123  * should possess the "modifyThread" {@code RuntimePermission}. If
124  * worker threads or other threads using the pool do not possess this
125  * permission, service may be degraded: configuration changes may not
126  * take effect in a timely manner, and a shutdown pool may remain in a
127  * state in which termination is possible but not completed.</dd>
128  *
129  * <dt>Keep-alive times</dt>
130  *
131  * <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
132  * If the pool currently has more than corePoolSize threads,
133  * excess threads will be terminated if they have been idle for more
134  * than the keepAliveTime (see {@link #getKeepAliveTime(TimeUnit)}).
135  * This provides a means of reducing resource consumption when the
136  * pool is not being actively used. If the pool becomes more active
137  * later, new threads will be constructed. This parameter can also be
138  * changed dynamically using method {@link #setKeepAliveTime(long,
139  * TimeUnit)}.  Using a value of {@code Long.MAX_VALUE} {@link
140  * TimeUnit#NANOSECONDS} effectively disables idle threads from ever
141  * terminating prior to shut down. By default, the keep-alive policy
142  * applies only when there are more than corePoolSize threads, but
143  * method {@link #allowCoreThreadTimeOut(boolean)} can be used to
144  * apply this time-out policy to core threads as well, so long as the
145  * keepAliveTime value is non-zero. </dd>
146  *
147  * <dt>Queuing</dt>
148  *
149  * <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
150  * Any {@link BlockingQueue} may be used to transfer and hold
151  * submitted tasks.  The use of this queue interacts with pool sizing:
152  *
153  * <ul>
154  *
155  * <li>If fewer than corePoolSize threads are running, the Executor
156  * always prefers adding a new thread
157  * rather than queuing.
158  *
159  * <li>If corePoolSize or more threads are running, the Executor
160  * always prefers queuing a request rather than adding a new
161  * thread.
162  *
163  * <li>If a request cannot be queued, a new thread is created unless
164  * this would exceed maximumPoolSize, in which case, the task will be
165  * rejected.
166  *
167  * </ul>
168  *
169  * There are three general strategies for queuing:
170  * <ol>
171  *
172  * <li><em> Direct handoffs.</em> A good default choice for a work
173  * queue is a {@link SynchronousQueue} that hands off tasks to threads
174  * without otherwise holding them. Here, an attempt to queue a task
175  * will fail if no threads are immediately available to run it, so a
176  * new thread will be constructed. This policy avoids lockups when
177  * handling sets of requests that might have internal dependencies.
178  * Direct handoffs generally require unbounded maximumPoolSizes to
179  * avoid rejection of new submitted tasks. This in turn admits the
180  * possibility of unbounded thread growth when commands continue to
181  * arrive on average faster than they can be processed.
182  *
183  * <li><em> Unbounded queues.</em> Using an unbounded queue (for
184  * example a {@link LinkedBlockingQueue} without a predefined
185  * capacity) will cause new tasks to wait in the queue when all
186  * corePoolSize threads are busy. Thus, no more than corePoolSize
187  * threads will ever be created. (And the value of the maximumPoolSize
188  * therefore doesn't have any effect.)  This may be appropriate when
189  * each task is completely independent of others, so tasks cannot
190  * affect each others execution; for example, in a web page server.
191  * While this style of queuing can be useful in smoothing out
192  * transient bursts of requests, it admits the possibility of
193  * unbounded work queue growth when commands continue to arrive on
194  * average faster than they can be processed.
195  *
196  * <li><em>Bounded queues.</em> A bounded queue (for example, an
197  * {@link ArrayBlockingQueue}) helps prevent resource exhaustion when
198  * used with finite maximumPoolSizes, but can be more difficult to
199  * tune and control.  Queue sizes and maximum pool sizes may be traded
200  * off for each other: Using large queues and small pools minimizes
201  * CPU usage, OS resources, and context-switching overhead, but can
202  * lead to artificially low throughput.  If tasks frequently block (for
203  * example if they are I/O bound), a system may be able to schedule
204  * time for more threads than you otherwise allow. Use of small queues
205  * generally requires larger pool sizes, which keeps CPUs busier but
206  * may encounter unacceptable scheduling overhead, which also
207  * decreases throughput.
208  *
209  * </ol>
210  *
211  * </dd>
212  *
213  * <dt>Rejected tasks</dt>
214  *
215  * <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
216  * New tasks submitted in method {@link #execute(Runnable)} will be
217  * <em>rejected</em> when the Executor has been shut down, and also when
218  * the Executor uses finite bounds for both maximum threads and work queue
219  * capacity, and is saturated.  In either case, the {@code execute} method
220  * invokes the {@link
221  * RejectedExecutionHandler#rejectedExecution(Runnable, ThreadPoolExecutor)}
222  * method of its {@link RejectedExecutionHandler}.  Four predefined handler
223  * policies are provided:
224  *
225  * <ol>
226  *
227  * <li>In the default {@link ThreadPoolExecutor.AbortPolicy}, the
228  * handler throws a runtime {@link RejectedExecutionException} upon
229  * rejection.
230  *
231  * <li>In {@link ThreadPoolExecutor.CallerRunsPolicy}, the thread
232  * that invokes {@code execute} itself runs the task. This provides a
233  * simple feedback control mechanism that will slow down the rate that
234  * new tasks are submitted.
235  *
236  * <li>In {@link ThreadPoolExecutor.DiscardPolicy}, a task that
237  * cannot be executed is simply dropped.
238  *
239  * <li>In {@link ThreadPoolExecutor.DiscardOldestPolicy}, if the
240  * executor is not shut down, the task at the head of the work queue
241  * is dropped, and then execution is retried (which can fail again,
242  * causing this to be repeated.)
243  *
244  * </ol>
245  *
246  * It is possible to define and use other kinds of {@link
247  * RejectedExecutionHandler} classes. Doing so requires some care
248  * especially when policies are designed to work only under particular
249  * capacity or queuing policies. </dd>
250  *
251  * <dt>Hook methods</dt>
252  *
253  * <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
254  * This class provides {@code protected} overridable
255  * {@link #beforeExecute(Thread, Runnable)} and
256  * {@link #afterExecute(Runnable, Throwable)} methods that are called
257  * before and after execution of each task.  These can be used to
258  * manipulate the execution environment; for example, reinitializing
259  * ThreadLocals, gathering statistics, or adding log entries.
260  * Additionally, method {@link #terminated} can be overridden to perform
261  * any special processing that needs to be done once the Executor has
262  * fully terminated.
263  *
264  * <p>If hook, callback, or BlockingQueue methods throw exceptions,
265  * internal worker threads may in turn fail, abruptly terminate, and
266  * possibly be replaced.</dd>
267  *
268  * <dt>Queue maintenance</dt>
269  *
270  * <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
271  * Method {@link #getQueue()} allows access to the work queue
272  * for purposes of monitoring and debugging.  Use of this method for
273  * any other purpose is strongly discouraged.  Two supplied methods,
274  * {@link #remove(Runnable)} and {@link #purge} are available to
275  * assist in storage reclamation when large numbers of queued tasks
276  * become cancelled.</dd>
277  *
278  * <dt>Finalization</dt>
279  *
280  * <dd style="font-family:'DejaVu Sans', Arial, Helvetica, sans-serif">
281  * A pool that is no longer referenced in a program <em>AND</em>
282  * has no remaining threads will be {@code shutdown} automatically. If
283  * you would like to ensure that unreferenced pools are reclaimed even
284  * if users forget to call {@link #shutdown}, then you must arrange
285  * that unused threads eventually die, by setting appropriate
286  * keep-alive times, using a lower bound of zero core threads and/or
287  * setting {@link #allowCoreThreadTimeOut(boolean)}.  </dd>
288  *
289  * </dl>
290  *
291  * <p><b>Extension example</b>. Most extensions of this class
292  * override one or more of the protected hook methods. For example,
293  * here is a subclass that adds a simple pause/resume feature:
294  *
295  * <pre> {@code
296  * class PausableThreadPoolExecutor extends ThreadPoolExecutor {
297  *   private boolean isPaused;
298  *   private ReentrantLock pauseLock = new ReentrantLock();
299  *   private Condition unpaused = pauseLock.newCondition();
300  *
301  *   public PausableThreadPoolExecutor(...) { super(...); }
302  *
303  *   protected void beforeExecute(Thread t, Runnable r) {
304  *     super.beforeExecute(t, r);
305  *     pauseLock.lock();
306  *     try {
307  *       while (isPaused) unpaused.await();
308  *     } catch (InterruptedException ie) {
309  *       t.interrupt();
310  *     } finally {
311  *       pauseLock.unlock();
312  *     }
313  *   }
314  *
315  *   public void pause() {
316  *     pauseLock.lock();
317  *     try {
318  *       isPaused = true;
319  *     } finally {
320  *       pauseLock.unlock();
321  *     }
322  *   }
323  *
324  *   public void resume() {
325  *     pauseLock.lock();
326  *     try {
327  *       isPaused = false;
328  *       unpaused.signalAll();
329  *     } finally {
330  *       pauseLock.unlock();
331  *     }
332  *   }
333  * }}</pre>
334  *
335  * @since 1.5
336  * @author Doug Lea
337  */
338 public class ThreadPoolExecutor extends AbstractExecutorService {
339     /**
340      * The main pool control state, ctl, is an atomic integer packing
341      * two conceptual fields
342      *   workerCount, indicating the effective number of threads
343      *   runState,    indicating whether running, shutting down etc
344      *
345      * In order to pack them into one int, we limit workerCount to
346      * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2
347      * billion) otherwise representable. If this is ever an issue in
348      * the future, the variable can be changed to be an AtomicLong,
349      * and the shift/mask constants below adjusted. But until the need
350      * arises, this code is a bit faster and simpler using an int.
351      *
352      * The workerCount is the number of workers that have been
353      * permitted to start and not permitted to stop.  The value may be
354      * transiently different from the actual number of live threads,
355      * for example when a ThreadFactory fails to create a thread when
356      * asked, and when exiting threads are still performing
357      * bookkeeping before terminating. The user-visible pool size is
358      * reported as the current size of the workers set.
359      *
360      * The runState provides the main lifecycle control, taking on values:
361      *
362      *   RUNNING:  Accept new tasks and process queued tasks
363      *   SHUTDOWN: Don't accept new tasks, but process queued tasks
364      *   STOP:     Don't accept new tasks, don't process queued tasks,
365      *             and interrupt in-progress tasks
366      *   TIDYING:  All tasks have terminated, workerCount is zero,
367      *             the thread transitioning to state TIDYING
368      *             will run the terminated() hook method
369      *   TERMINATED: terminated() has completed
370      *
371      * The numerical order among these values matters, to allow
372      * ordered comparisons. The runState monotonically increases over
373      * time, but need not hit each state. The transitions are:
374      *
375      * RUNNING -> SHUTDOWN
376      *    On invocation of shutdown(), perhaps implicitly in finalize()
377      * (RUNNING or SHUTDOWN) -> STOP
378      *    On invocation of shutdownNow()
379      * SHUTDOWN -> TIDYING
380      *    When both queue and pool are empty
381      * STOP -> TIDYING
382      *    When pool is empty
383      * TIDYING -> TERMINATED
384      *    When the terminated() hook method has completed
385      *
386      * Threads waiting in awaitTermination() will return when the
387      * state reaches TERMINATED.
388      *
389      * Detecting the transition from SHUTDOWN to TIDYING is less
390      * straightforward than you'd like because the queue may become
391      * empty after non-empty and vice versa during SHUTDOWN state, but
392      * we can only terminate if, after seeing that it is empty, we see
393      * that workerCount is 0 (which sometimes entails a recheck -- see
394      * below).
395      */
396     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
397     private static final int COUNT_BITS = Integer.SIZE - 3;
398     private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
399 
400     // runState is stored in the high-order bits
401     private static final int RUNNING    = -1 << COUNT_BITS;
402     private static final int SHUTDOWN   =  0 << COUNT_BITS;
403     private static final int STOP       =  1 << COUNT_BITS;
404     private static final int TIDYING    =  2 << COUNT_BITS;
405     private static final int TERMINATED =  3 << COUNT_BITS;
406 
407     // Packing and unpacking ctl
runStateOf(int c)408     private static int runStateOf(int c)     { return c & ~CAPACITY; }
workerCountOf(int c)409     private static int workerCountOf(int c)  { return c & CAPACITY; }
ctlOf(int rs, int wc)410     private static int ctlOf(int rs, int wc) { return rs | wc; }
411 
412     /*
413      * Bit field accessors that don't require unpacking ctl.
414      * These depend on the bit layout and on workerCount being never negative.
415      */
416 
runStateLessThan(int c, int s)417     private static boolean runStateLessThan(int c, int s) {
418         return c < s;
419     }
420 
runStateAtLeast(int c, int s)421     private static boolean runStateAtLeast(int c, int s) {
422         return c >= s;
423     }
424 
isRunning(int c)425     private static boolean isRunning(int c) {
426         return c < SHUTDOWN;
427     }
428 
429     /**
430      * Attempts to CAS-increment the workerCount field of ctl.
431      */
compareAndIncrementWorkerCount(int expect)432     private boolean compareAndIncrementWorkerCount(int expect) {
433         return ctl.compareAndSet(expect, expect + 1);
434     }
435 
436     /**
437      * Attempts to CAS-decrement the workerCount field of ctl.
438      */
compareAndDecrementWorkerCount(int expect)439     private boolean compareAndDecrementWorkerCount(int expect) {
440         return ctl.compareAndSet(expect, expect - 1);
441     }
442 
443     /**
444      * Decrements the workerCount field of ctl. This is called only on
445      * abrupt termination of a thread (see processWorkerExit). Other
446      * decrements are performed within getTask.
447      */
decrementWorkerCount()448     private void decrementWorkerCount() {
449         do {} while (! compareAndDecrementWorkerCount(ctl.get()));
450     }
451 
452     /**
453      * The queue used for holding tasks and handing off to worker
454      * threads.  We do not require that workQueue.poll() returning
455      * null necessarily means that workQueue.isEmpty(), so rely
456      * solely on isEmpty to see if the queue is empty (which we must
457      * do for example when deciding whether to transition from
458      * SHUTDOWN to TIDYING).  This accommodates special-purpose
459      * queues such as DelayQueues for which poll() is allowed to
460      * return null even if it may later return non-null when delays
461      * expire.
462      */
463     private final BlockingQueue<Runnable> workQueue;
464 
465     /**
466      * Lock held on access to workers set and related bookkeeping.
467      * While we could use a concurrent set of some sort, it turns out
468      * to be generally preferable to use a lock. Among the reasons is
469      * that this serializes interruptIdleWorkers, which avoids
470      * unnecessary interrupt storms, especially during shutdown.
471      * Otherwise exiting threads would concurrently interrupt those
472      * that have not yet interrupted. It also simplifies some of the
473      * associated statistics bookkeeping of largestPoolSize etc. We
474      * also hold mainLock on shutdown and shutdownNow, for the sake of
475      * ensuring workers set is stable while separately checking
476      * permission to interrupt and actually interrupting.
477      */
478     private final ReentrantLock mainLock = new ReentrantLock();
479 
480     /**
481      * Set containing all worker threads in pool. Accessed only when
482      * holding mainLock.
483      */
484     private final HashSet<Worker> workers = new HashSet<>();
485 
486     /**
487      * Wait condition to support awaitTermination.
488      */
489     private final Condition termination = mainLock.newCondition();
490 
491     /**
492      * Tracks largest attained pool size. Accessed only under
493      * mainLock.
494      */
495     private int largestPoolSize;
496 
497     /**
498      * Counter for completed tasks. Updated only on termination of
499      * worker threads. Accessed only under mainLock.
500      */
501     private long completedTaskCount;
502 
503     /*
504      * All user control parameters are declared as volatiles so that
505      * ongoing actions are based on freshest values, but without need
506      * for locking, since no internal invariants depend on them
507      * changing synchronously with respect to other actions.
508      */
509 
510     /**
511      * Factory for new threads. All threads are created using this
512      * factory (via method addWorker).  All callers must be prepared
513      * for addWorker to fail, which may reflect a system or user's
514      * policy limiting the number of threads.  Even though it is not
515      * treated as an error, failure to create threads may result in
516      * new tasks being rejected or existing ones remaining stuck in
517      * the queue.
518      *
519      * We go further and preserve pool invariants even in the face of
520      * errors such as OutOfMemoryError, that might be thrown while
521      * trying to create threads.  Such errors are rather common due to
522      * the need to allocate a native stack in Thread.start, and users
523      * will want to perform clean pool shutdown to clean up.  There
524      * will likely be enough memory available for the cleanup code to
525      * complete without encountering yet another OutOfMemoryError.
526      */
527     private volatile ThreadFactory threadFactory;
528 
529     /**
530      * Handler called when saturated or shutdown in execute.
531      */
532     private volatile RejectedExecutionHandler handler;
533 
534     /**
535      * Timeout in nanoseconds for idle threads waiting for work.
536      * Threads use this timeout when there are more than corePoolSize
537      * present or if allowCoreThreadTimeOut. Otherwise they wait
538      * forever for new work.
539      */
540     private volatile long keepAliveTime;
541 
542     /**
543      * If false (default), core threads stay alive even when idle.
544      * If true, core threads use keepAliveTime to time out waiting
545      * for work.
546      */
547     private volatile boolean allowCoreThreadTimeOut;
548 
549     /**
550      * Core pool size is the minimum number of workers to keep alive
551      * (and not allow to time out etc) unless allowCoreThreadTimeOut
552      * is set, in which case the minimum is zero.
553      */
554     private volatile int corePoolSize;
555 
556     /**
557      * Maximum pool size. Note that the actual maximum is internally
558      * bounded by CAPACITY.
559      */
560     private volatile int maximumPoolSize;
561 
562     /**
563      * The default rejected execution handler.
564      */
565     private static final RejectedExecutionHandler defaultHandler =
566         new AbortPolicy();
567 
568     /**
569      * Permission required for callers of shutdown and shutdownNow.
570      * We additionally require (see checkShutdownAccess) that callers
571      * have permission to actually interrupt threads in the worker set
572      * (as governed by Thread.interrupt, which relies on
573      * ThreadGroup.checkAccess, which in turn relies on
574      * SecurityManager.checkAccess). Shutdowns are attempted only if
575      * these checks pass.
576      *
577      * All actual invocations of Thread.interrupt (see
578      * interruptIdleWorkers and interruptWorkers) ignore
579      * SecurityExceptions, meaning that the attempted interrupts
580      * silently fail. In the case of shutdown, they should not fail
581      * unless the SecurityManager has inconsistent policies, sometimes
582      * allowing access to a thread and sometimes not. In such cases,
583      * failure to actually interrupt threads may disable or delay full
584      * termination. Other uses of interruptIdleWorkers are advisory,
585      * and failure to actually interrupt will merely delay response to
586      * configuration changes so is not handled exceptionally.
587      */
588     private static final RuntimePermission shutdownPerm =
589         new RuntimePermission("modifyThread");
590 
591     /**
592      * Class Worker mainly maintains interrupt control state for
593      * threads running tasks, along with other minor bookkeeping.
594      * This class opportunistically extends AbstractQueuedSynchronizer
595      * to simplify acquiring and releasing a lock surrounding each
596      * task execution.  This protects against interrupts that are
597      * intended to wake up a worker thread waiting for a task from
598      * instead interrupting a task being run.  We implement a simple
599      * non-reentrant mutual exclusion lock rather than use
600      * ReentrantLock because we do not want worker tasks to be able to
601      * reacquire the lock when they invoke pool control methods like
602      * setCorePoolSize.  Additionally, to suppress interrupts until
603      * the thread actually starts running tasks, we initialize lock
604      * state to a negative value, and clear it upon start (in
605      * runWorker).
606      */
607     private final class Worker
608         extends AbstractQueuedSynchronizer
609         implements Runnable
610     {
611         /**
612          * This class will never be serialized, but we provide a
613          * serialVersionUID to suppress a javac warning.
614          */
615         private static final long serialVersionUID = 6138294804551838833L;
616 
617         /** Thread this worker is running in.  Null if factory fails. */
618         final Thread thread;
619         /** Initial task to run.  Possibly null. */
620         Runnable firstTask;
621         /** Per-thread task counter */
622         volatile long completedTasks;
623 
624         /**
625          * Creates with given first task and thread from ThreadFactory.
626          * @param firstTask the first task (null if none)
627          */
Worker(Runnable firstTask)628         Worker(Runnable firstTask) {
629             setState(-1); // inhibit interrupts until runWorker
630             this.firstTask = firstTask;
631             this.thread = getThreadFactory().newThread(this);
632         }
633 
634         /** Delegates main run loop to outer runWorker. */
run()635         public void run() {
636             runWorker(this);
637         }
638 
639         // Lock methods
640         //
641         // The value 0 represents the unlocked state.
642         // The value 1 represents the locked state.
643 
isHeldExclusively()644         protected boolean isHeldExclusively() {
645             return getState() != 0;
646         }
647 
tryAcquire(int unused)648         protected boolean tryAcquire(int unused) {
649             if (compareAndSetState(0, 1)) {
650                 setExclusiveOwnerThread(Thread.currentThread());
651                 return true;
652             }
653             return false;
654         }
655 
tryRelease(int unused)656         protected boolean tryRelease(int unused) {
657             setExclusiveOwnerThread(null);
658             setState(0);
659             return true;
660         }
661 
lock()662         public void lock()        { acquire(1); }
tryLock()663         public boolean tryLock()  { return tryAcquire(1); }
unlock()664         public void unlock()      { release(1); }
isLocked()665         public boolean isLocked() { return isHeldExclusively(); }
666 
interruptIfStarted()667         void interruptIfStarted() {
668             Thread t;
669             if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
670                 try {
671                     t.interrupt();
672                 } catch (SecurityException ignore) {
673                 }
674             }
675         }
676     }
677 
678     /*
679      * Methods for setting control state
680      */
681 
682     /**
683      * Transitions runState to given target, or leaves it alone if
684      * already at least the given target.
685      *
686      * @param targetState the desired state, either SHUTDOWN or STOP
687      *        (but not TIDYING or TERMINATED -- use tryTerminate for that)
688      */
advanceRunState(int targetState)689     private void advanceRunState(int targetState) {
690         // assert targetState == SHUTDOWN || targetState == STOP;
691         for (;;) {
692             int c = ctl.get();
693             if (runStateAtLeast(c, targetState) ||
694                 ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
695                 break;
696         }
697     }
698 
699     /**
700      * Transitions to TERMINATED state if either (SHUTDOWN and pool
701      * and queue empty) or (STOP and pool empty).  If otherwise
702      * eligible to terminate but workerCount is nonzero, interrupts an
703      * idle worker to ensure that shutdown signals propagate. This
704      * method must be called following any action that might make
705      * termination possible -- reducing worker count or removing tasks
706      * from the queue during shutdown. The method is non-private to
707      * allow access from ScheduledThreadPoolExecutor.
708      */
tryTerminate()709     final void tryTerminate() {
710         for (;;) {
711             int c = ctl.get();
712             if (isRunning(c) ||
713                 runStateAtLeast(c, TIDYING) ||
714                 (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
715                 return;
716             if (workerCountOf(c) != 0) { // Eligible to terminate
717                 interruptIdleWorkers(ONLY_ONE);
718                 return;
719             }
720 
721             final ReentrantLock mainLock = this.mainLock;
722             mainLock.lock();
723             try {
724                 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
725                     try {
726                         terminated();
727                     } finally {
728                         ctl.set(ctlOf(TERMINATED, 0));
729                         termination.signalAll();
730                     }
731                     return;
732                 }
733             } finally {
734                 mainLock.unlock();
735             }
736             // else retry on failed CAS
737         }
738     }
739 
740     /*
741      * Methods for controlling interrupts to worker threads.
742      */
743 
744     /**
745      * If there is a security manager, makes sure caller has
746      * permission to shut down threads in general (see shutdownPerm).
747      * If this passes, additionally makes sure the caller is allowed
748      * to interrupt each worker thread. This might not be true even if
749      * first check passed, if the SecurityManager treats some threads
750      * specially.
751      */
checkShutdownAccess()752     private void checkShutdownAccess() {
753         SecurityManager security = System.getSecurityManager();
754         if (security != null) {
755             security.checkPermission(shutdownPerm);
756             final ReentrantLock mainLock = this.mainLock;
757             mainLock.lock();
758             try {
759                 for (Worker w : workers)
760                     security.checkAccess(w.thread);
761             } finally {
762                 mainLock.unlock();
763             }
764         }
765     }
766 
767     /**
768      * Interrupts all threads, even if active. Ignores SecurityExceptions
769      * (in which case some threads may remain uninterrupted).
770      */
interruptWorkers()771     private void interruptWorkers() {
772         final ReentrantLock mainLock = this.mainLock;
773         mainLock.lock();
774         try {
775             for (Worker w : workers)
776                 w.interruptIfStarted();
777         } finally {
778             mainLock.unlock();
779         }
780     }
781 
782     /**
783      * Interrupts threads that might be waiting for tasks (as
784      * indicated by not being locked) so they can check for
785      * termination or configuration changes. Ignores
786      * SecurityExceptions (in which case some threads may remain
787      * uninterrupted).
788      *
789      * @param onlyOne If true, interrupt at most one worker. This is
790      * called only from tryTerminate when termination is otherwise
791      * enabled but there are still other workers.  In this case, at
792      * most one waiting worker is interrupted to propagate shutdown
793      * signals in case all threads are currently waiting.
794      * Interrupting any arbitrary thread ensures that newly arriving
795      * workers since shutdown began will also eventually exit.
796      * To guarantee eventual termination, it suffices to always
797      * interrupt only one idle worker, but shutdown() interrupts all
798      * idle workers so that redundant workers exit promptly, not
799      * waiting for a straggler task to finish.
800      */
interruptIdleWorkers(boolean onlyOne)801     private void interruptIdleWorkers(boolean onlyOne) {
802         final ReentrantLock mainLock = this.mainLock;
803         mainLock.lock();
804         try {
805             for (Worker w : workers) {
806                 Thread t = w.thread;
807                 if (!t.isInterrupted() && w.tryLock()) {
808                     try {
809                         t.interrupt();
810                     } catch (SecurityException ignore) {
811                     } finally {
812                         w.unlock();
813                     }
814                 }
815                 if (onlyOne)
816                     break;
817             }
818         } finally {
819             mainLock.unlock();
820         }
821     }
822 
823     /**
824      * Common form of interruptIdleWorkers, to avoid having to
825      * remember what the boolean argument means.
826      */
interruptIdleWorkers()827     private void interruptIdleWorkers() {
828         interruptIdleWorkers(false);
829     }
830 
831     private static final boolean ONLY_ONE = true;
832 
833     /*
834      * Misc utilities, most of which are also exported to
835      * ScheduledThreadPoolExecutor
836      */
837 
838     /**
839      * Invokes the rejected execution handler for the given command.
840      * Package-protected for use by ScheduledThreadPoolExecutor.
841      */
reject(Runnable command)842     final void reject(Runnable command) {
843         handler.rejectedExecution(command, this);
844     }
845 
846     /**
847      * Performs any further cleanup following run state transition on
848      * invocation of shutdown.  A no-op here, but used by
849      * ScheduledThreadPoolExecutor to cancel delayed tasks.
850      */
onShutdown()851     void onShutdown() {
852     }
853 
854     /**
855      * State check needed by ScheduledThreadPoolExecutor to
856      * enable running tasks during shutdown.
857      *
858      * @param shutdownOK true if should return true if SHUTDOWN
859      */
isRunningOrShutdown(boolean shutdownOK)860     final boolean isRunningOrShutdown(boolean shutdownOK) {
861         int rs = runStateOf(ctl.get());
862         return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
863     }
864 
865     /**
866      * Drains the task queue into a new list, normally using
867      * drainTo. But if the queue is a DelayQueue or any other kind of
868      * queue for which poll or drainTo may fail to remove some
869      * elements, it deletes them one by one.
870      */
drainQueue()871     private List<Runnable> drainQueue() {
872         BlockingQueue<Runnable> q = workQueue;
873         ArrayList<Runnable> taskList = new ArrayList<>();
874         q.drainTo(taskList);
875         if (!q.isEmpty()) {
876             for (Runnable r : q.toArray(new Runnable[0])) {
877                 if (q.remove(r))
878                     taskList.add(r);
879             }
880         }
881         return taskList;
882     }
883 
884     /*
885      * Methods for creating, running and cleaning up after workers
886      */
887 
888     /**
889      * Checks if a new worker can be added with respect to current
890      * pool state and the given bound (either core or maximum). If so,
891      * the worker count is adjusted accordingly, and, if possible, a
892      * new worker is created and started, running firstTask as its
893      * first task. This method returns false if the pool is stopped or
894      * eligible to shut down. It also returns false if the thread
895      * factory fails to create a thread when asked.  If the thread
896      * creation fails, either due to the thread factory returning
897      * null, or due to an exception (typically OutOfMemoryError in
898      * Thread.start()), we roll back cleanly.
899      *
900      * @param firstTask the task the new thread should run first (or
901      * null if none). Workers are created with an initial first task
902      * (in method execute()) to bypass queuing when there are fewer
903      * than corePoolSize threads (in which case we always start one),
904      * or when the queue is full (in which case we must bypass queue).
905      * Initially idle threads are usually created via
906      * prestartCoreThread or to replace other dying workers.
907      *
908      * @param core if true use corePoolSize as bound, else
909      * maximumPoolSize. (A boolean indicator is used here rather than a
910      * value to ensure reads of fresh values after checking other pool
911      * state).
912      * @return true if successful
913      */
addWorker(Runnable firstTask, boolean core)914     private boolean addWorker(Runnable firstTask, boolean core) {
915         retry:
916         for (;;) {
917             int c = ctl.get();
918             int rs = runStateOf(c);
919 
920             // Check if queue empty only if necessary.
921             if (rs >= SHUTDOWN &&
922                 ! (rs == SHUTDOWN &&
923                    firstTask == null &&
924                    ! workQueue.isEmpty()))
925                 return false;
926 
927             for (;;) {
928                 int wc = workerCountOf(c);
929                 if (wc >= CAPACITY ||
930                     wc >= (core ? corePoolSize : maximumPoolSize))
931                     return false;
932                 if (compareAndIncrementWorkerCount(c))
933                     break retry;
934                 c = ctl.get();  // Re-read ctl
935                 if (runStateOf(c) != rs)
936                     continue retry;
937                 // else CAS failed due to workerCount change; retry inner loop
938             }
939         }
940 
941         boolean workerStarted = false;
942         boolean workerAdded = false;
943         Worker w = null;
944         try {
945             w = new Worker(firstTask);
946             final Thread t = w.thread;
947             if (t != null) {
948                 final ReentrantLock mainLock = this.mainLock;
949                 mainLock.lock();
950                 try {
951                     // Recheck while holding lock.
952                     // Back out on ThreadFactory failure or if
953                     // shut down before lock acquired.
954                     int rs = runStateOf(ctl.get());
955 
956                     if (rs < SHUTDOWN ||
957                         (rs == SHUTDOWN && firstTask == null)) {
958                         if (t.isAlive()) // precheck that t is startable
959                             throw new IllegalThreadStateException();
960                         workers.add(w);
961                         int s = workers.size();
962                         if (s > largestPoolSize)
963                             largestPoolSize = s;
964                         workerAdded = true;
965                     }
966                 } finally {
967                     mainLock.unlock();
968                 }
969                 if (workerAdded) {
970                     t.start();
971                     workerStarted = true;
972                 }
973             }
974         } finally {
975             if (! workerStarted)
976                 addWorkerFailed(w);
977         }
978         return workerStarted;
979     }
980 
981     /**
982      * Rolls back the worker thread creation.
983      * - removes worker from workers, if present
984      * - decrements worker count
985      * - rechecks for termination, in case the existence of this
986      *   worker was holding up termination
987      */
addWorkerFailed(Worker w)988     private void addWorkerFailed(Worker w) {
989         final ReentrantLock mainLock = this.mainLock;
990         mainLock.lock();
991         try {
992             if (w != null)
993                 workers.remove(w);
994             decrementWorkerCount();
995             tryTerminate();
996         } finally {
997             mainLock.unlock();
998         }
999     }
1000 
1001     /**
1002      * Performs cleanup and bookkeeping for a dying worker. Called
1003      * only from worker threads. Unless completedAbruptly is set,
1004      * assumes that workerCount has already been adjusted to account
1005      * for exit.  This method removes thread from worker set, and
1006      * possibly terminates the pool or replaces the worker if either
1007      * it exited due to user task exception or if fewer than
1008      * corePoolSize workers are running or queue is non-empty but
1009      * there are no workers.
1010      *
1011      * @param w the worker
1012      * @param completedAbruptly if the worker died due to user exception
1013      */
processWorkerExit(Worker w, boolean completedAbruptly)1014     private void processWorkerExit(Worker w, boolean completedAbruptly) {
1015         if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
1016             decrementWorkerCount();
1017 
1018         final ReentrantLock mainLock = this.mainLock;
1019         mainLock.lock();
1020         try {
1021             completedTaskCount += w.completedTasks;
1022             workers.remove(w);
1023         } finally {
1024             mainLock.unlock();
1025         }
1026 
1027         tryTerminate();
1028 
1029         int c = ctl.get();
1030         if (runStateLessThan(c, STOP)) {
1031             if (!completedAbruptly) {
1032                 int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
1033                 if (min == 0 && ! workQueue.isEmpty())
1034                     min = 1;
1035                 if (workerCountOf(c) >= min)
1036                     return; // replacement not needed
1037             }
1038             addWorker(null, false);
1039         }
1040     }
1041 
1042     /**
1043      * Performs blocking or timed wait for a task, depending on
1044      * current configuration settings, or returns null if this worker
1045      * must exit because of any of:
1046      * 1. There are more than maximumPoolSize workers (due to
1047      *    a call to setMaximumPoolSize).
1048      * 2. The pool is stopped.
1049      * 3. The pool is shutdown and the queue is empty.
1050      * 4. This worker timed out waiting for a task, and timed-out
1051      *    workers are subject to termination (that is,
1052      *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
1053      *    both before and after the timed wait, and if the queue is
1054      *    non-empty, this worker is not the last thread in the pool.
1055      *
1056      * @return task, or null if the worker must exit, in which case
1057      *         workerCount is decremented
1058      */
getTask()1059     private Runnable getTask() {
1060         boolean timedOut = false; // Did the last poll() time out?
1061 
1062         for (;;) {
1063             int c = ctl.get();
1064             int rs = runStateOf(c);
1065 
1066             // Check if queue empty only if necessary.
1067             if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
1068                 decrementWorkerCount();
1069                 return null;
1070             }
1071 
1072             int wc = workerCountOf(c);
1073 
1074             // Are workers subject to culling?
1075             boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
1076 
1077             if ((wc > maximumPoolSize || (timed && timedOut))
1078                 && (wc > 1 || workQueue.isEmpty())) {
1079                 if (compareAndDecrementWorkerCount(c))
1080                     return null;
1081                 continue;
1082             }
1083 
1084             try {
1085                 Runnable r = timed ?
1086                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
1087                     workQueue.take();
1088                 if (r != null)
1089                     return r;
1090                 timedOut = true;
1091             } catch (InterruptedException retry) {
1092                 timedOut = false;
1093             }
1094         }
1095     }
1096 
1097     /**
1098      * Main worker run loop.  Repeatedly gets tasks from queue and
1099      * executes them, while coping with a number of issues:
1100      *
1101      * 1. We may start out with an initial task, in which case we
1102      * don't need to get the first one. Otherwise, as long as pool is
1103      * running, we get tasks from getTask. If it returns null then the
1104      * worker exits due to changed pool state or configuration
1105      * parameters.  Other exits result from exception throws in
1106      * external code, in which case completedAbruptly holds, which
1107      * usually leads processWorkerExit to replace this thread.
1108      *
1109      * 2. Before running any task, the lock is acquired to prevent
1110      * other pool interrupts while the task is executing, and then we
1111      * ensure that unless pool is stopping, this thread does not have
1112      * its interrupt set.
1113      *
1114      * 3. Each task run is preceded by a call to beforeExecute, which
1115      * might throw an exception, in which case we cause thread to die
1116      * (breaking loop with completedAbruptly true) without processing
1117      * the task.
1118      *
1119      * 4. Assuming beforeExecute completes normally, we run the task,
1120      * gathering any of its thrown exceptions to send to afterExecute.
1121      * We separately handle RuntimeException, Error (both of which the
1122      * specs guarantee that we trap) and arbitrary Throwables.
1123      * Because we cannot rethrow Throwables within Runnable.run, we
1124      * wrap them within Errors on the way out (to the thread's
1125      * UncaughtExceptionHandler).  Any thrown exception also
1126      * conservatively causes thread to die.
1127      *
1128      * 5. After task.run completes, we call afterExecute, which may
1129      * also throw an exception, which will also cause thread to
1130      * die. According to JLS Sec 14.20, this exception is the one that
1131      * will be in effect even if task.run throws.
1132      *
1133      * The net effect of the exception mechanics is that afterExecute
1134      * and the thread's UncaughtExceptionHandler have as accurate
1135      * information as we can provide about any problems encountered by
1136      * user code.
1137      *
1138      * @param w the worker
1139      */
runWorker(Worker w)1140     final void runWorker(Worker w) {
1141         Thread wt = Thread.currentThread();
1142         Runnable task = w.firstTask;
1143         w.firstTask = null;
1144         w.unlock(); // allow interrupts
1145         boolean completedAbruptly = true;
1146         try {
1147             while (task != null || (task = getTask()) != null) {
1148                 w.lock();
1149                 // If pool is stopping, ensure thread is interrupted;
1150                 // if not, ensure thread is not interrupted.  This
1151                 // requires a recheck in second case to deal with
1152                 // shutdownNow race while clearing interrupt
1153                 if ((runStateAtLeast(ctl.get(), STOP) ||
1154                      (Thread.interrupted() &&
1155                       runStateAtLeast(ctl.get(), STOP))) &&
1156                     !wt.isInterrupted())
1157                     wt.interrupt();
1158                 try {
1159                     beforeExecute(wt, task);
1160                     Throwable thrown = null;
1161                     try {
1162                         task.run();
1163                     } catch (RuntimeException x) {
1164                         thrown = x; throw x;
1165                     } catch (Error x) {
1166                         thrown = x; throw x;
1167                     } catch (Throwable x) {
1168                         thrown = x; throw new Error(x);
1169                     } finally {
1170                         afterExecute(task, thrown);
1171                     }
1172                 } finally {
1173                     task = null;
1174                     w.completedTasks++;
1175                     w.unlock();
1176                 }
1177             }
1178             completedAbruptly = false;
1179         } finally {
1180             processWorkerExit(w, completedAbruptly);
1181         }
1182     }
1183 
1184     // Public constructors and methods
1185 
1186     /**
1187      * Creates a new {@code ThreadPoolExecutor} with the given initial
1188      * parameters and default thread factory and rejected execution handler.
1189      * It may be more convenient to use one of the {@link Executors} factory
1190      * methods instead of this general purpose constructor.
1191      *
1192      * @param corePoolSize the number of threads to keep in the pool, even
1193      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
1194      * @param maximumPoolSize the maximum number of threads to allow in the
1195      *        pool
1196      * @param keepAliveTime when the number of threads is greater than
1197      *        the core, this is the maximum time that excess idle threads
1198      *        will wait for new tasks before terminating.
1199      * @param unit the time unit for the {@code keepAliveTime} argument
1200      * @param workQueue the queue to use for holding tasks before they are
1201      *        executed.  This queue will hold only the {@code Runnable}
1202      *        tasks submitted by the {@code execute} method.
1203      * @throws IllegalArgumentException if one of the following holds:<br>
1204      *         {@code corePoolSize < 0}<br>
1205      *         {@code keepAliveTime < 0}<br>
1206      *         {@code maximumPoolSize <= 0}<br>
1207      *         {@code maximumPoolSize < corePoolSize}
1208      * @throws NullPointerException if {@code workQueue} is null
1209      */
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)1210     public ThreadPoolExecutor(int corePoolSize,
1211                               int maximumPoolSize,
1212                               long keepAliveTime,
1213                               TimeUnit unit,
1214                               BlockingQueue<Runnable> workQueue) {
1215         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
1216              Executors.defaultThreadFactory(), defaultHandler);
1217     }
1218 
1219     /**
1220      * Creates a new {@code ThreadPoolExecutor} with the given initial
1221      * parameters and default rejected execution handler.
1222      *
1223      * @param corePoolSize the number of threads to keep in the pool, even
1224      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
1225      * @param maximumPoolSize the maximum number of threads to allow in the
1226      *        pool
1227      * @param keepAliveTime when the number of threads is greater than
1228      *        the core, this is the maximum time that excess idle threads
1229      *        will wait for new tasks before terminating.
1230      * @param unit the time unit for the {@code keepAliveTime} argument
1231      * @param workQueue the queue to use for holding tasks before they are
1232      *        executed.  This queue will hold only the {@code Runnable}
1233      *        tasks submitted by the {@code execute} method.
1234      * @param threadFactory the factory to use when the executor
1235      *        creates a new thread
1236      * @throws IllegalArgumentException if one of the following holds:<br>
1237      *         {@code corePoolSize < 0}<br>
1238      *         {@code keepAliveTime < 0}<br>
1239      *         {@code maximumPoolSize <= 0}<br>
1240      *         {@code maximumPoolSize < corePoolSize}
1241      * @throws NullPointerException if {@code workQueue}
1242      *         or {@code threadFactory} is null
1243      */
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)1244     public ThreadPoolExecutor(int corePoolSize,
1245                               int maximumPoolSize,
1246                               long keepAliveTime,
1247                               TimeUnit unit,
1248                               BlockingQueue<Runnable> workQueue,
1249                               ThreadFactory threadFactory) {
1250         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
1251              threadFactory, defaultHandler);
1252     }
1253 
1254     /**
1255      * Creates a new {@code ThreadPoolExecutor} with the given initial
1256      * parameters and default thread factory.
1257      *
1258      * @param corePoolSize the number of threads to keep in the pool, even
1259      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
1260      * @param maximumPoolSize the maximum number of threads to allow in the
1261      *        pool
1262      * @param keepAliveTime when the number of threads is greater than
1263      *        the core, this is the maximum time that excess idle threads
1264      *        will wait for new tasks before terminating.
1265      * @param unit the time unit for the {@code keepAliveTime} argument
1266      * @param workQueue the queue to use for holding tasks before they are
1267      *        executed.  This queue will hold only the {@code Runnable}
1268      *        tasks submitted by the {@code execute} method.
1269      * @param handler the handler to use when execution is blocked
1270      *        because the thread bounds and queue capacities are reached
1271      * @throws IllegalArgumentException if one of the following holds:<br>
1272      *         {@code corePoolSize < 0}<br>
1273      *         {@code keepAliveTime < 0}<br>
1274      *         {@code maximumPoolSize <= 0}<br>
1275      *         {@code maximumPoolSize < corePoolSize}
1276      * @throws NullPointerException if {@code workQueue}
1277      *         or {@code handler} is null
1278      */
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)1279     public ThreadPoolExecutor(int corePoolSize,
1280                               int maximumPoolSize,
1281                               long keepAliveTime,
1282                               TimeUnit unit,
1283                               BlockingQueue<Runnable> workQueue,
1284                               RejectedExecutionHandler handler) {
1285         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
1286              Executors.defaultThreadFactory(), handler);
1287     }
1288 
1289     /**
1290      * Creates a new {@code ThreadPoolExecutor} with the given initial
1291      * parameters.
1292      *
1293      * @param corePoolSize the number of threads to keep in the pool, even
1294      *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
1295      * @param maximumPoolSize the maximum number of threads to allow in the
1296      *        pool
1297      * @param keepAliveTime when the number of threads is greater than
1298      *        the core, this is the maximum time that excess idle threads
1299      *        will wait for new tasks before terminating.
1300      * @param unit the time unit for the {@code keepAliveTime} argument
1301      * @param workQueue the queue to use for holding tasks before they are
1302      *        executed.  This queue will hold only the {@code Runnable}
1303      *        tasks submitted by the {@code execute} method.
1304      * @param threadFactory the factory to use when the executor
1305      *        creates a new thread
1306      * @param handler the handler to use when execution is blocked
1307      *        because the thread bounds and queue capacities are reached
1308      * @throws IllegalArgumentException if one of the following holds:<br>
1309      *         {@code corePoolSize < 0}<br>
1310      *         {@code keepAliveTime < 0}<br>
1311      *         {@code maximumPoolSize <= 0}<br>
1312      *         {@code maximumPoolSize < corePoolSize}
1313      * @throws NullPointerException if {@code workQueue}
1314      *         or {@code threadFactory} or {@code handler} is null
1315      */
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)1316     public ThreadPoolExecutor(int corePoolSize,
1317                               int maximumPoolSize,
1318                               long keepAliveTime,
1319                               TimeUnit unit,
1320                               BlockingQueue<Runnable> workQueue,
1321                               ThreadFactory threadFactory,
1322                               RejectedExecutionHandler handler) {
1323         if (corePoolSize < 0 ||
1324             maximumPoolSize <= 0 ||
1325             maximumPoolSize < corePoolSize ||
1326             keepAliveTime < 0)
1327             throw new IllegalArgumentException();
1328         if (workQueue == null || threadFactory == null || handler == null)
1329             throw new NullPointerException();
1330         this.corePoolSize = corePoolSize;
1331         this.maximumPoolSize = maximumPoolSize;
1332         this.workQueue = workQueue;
1333         this.keepAliveTime = unit.toNanos(keepAliveTime);
1334         this.threadFactory = threadFactory;
1335         this.handler = handler;
1336     }
1337 
1338     /**
1339      * Executes the given task sometime in the future.  The task
1340      * may execute in a new thread or in an existing pooled thread.
1341      *
1342      * If the task cannot be submitted for execution, either because this
1343      * executor has been shutdown or because its capacity has been reached,
1344      * the task is handled by the current {@code RejectedExecutionHandler}.
1345      *
1346      * @param command the task to execute
1347      * @throws RejectedExecutionException at discretion of
1348      *         {@code RejectedExecutionHandler}, if the task
1349      *         cannot be accepted for execution
1350      * @throws NullPointerException if {@code command} is null
1351      */
execute(Runnable command)1352     public void execute(Runnable command) {
1353         if (command == null)
1354             throw new NullPointerException();
1355         /*
1356          * Proceed in 3 steps:
1357          *
1358          * 1. If fewer than corePoolSize threads are running, try to
1359          * start a new thread with the given command as its first
1360          * task.  The call to addWorker atomically checks runState and
1361          * workerCount, and so prevents false alarms that would add
1362          * threads when it shouldn't, by returning false.
1363          *
1364          * 2. If a task can be successfully queued, then we still need
1365          * to double-check whether we should have added a thread
1366          * (because existing ones died since last checking) or that
1367          * the pool shut down since entry into this method. So we
1368          * recheck state and if necessary roll back the enqueuing if
1369          * stopped, or start a new thread if there are none.
1370          *
1371          * 3. If we cannot queue task, then we try to add a new
1372          * thread.  If it fails, we know we are shut down or saturated
1373          * and so reject the task.
1374          */
1375         int c = ctl.get();
1376         if (workerCountOf(c) < corePoolSize) {
1377             if (addWorker(command, true))
1378                 return;
1379             c = ctl.get();
1380         }
1381         if (isRunning(c) && workQueue.offer(command)) {
1382             int recheck = ctl.get();
1383             if (! isRunning(recheck) && remove(command))
1384                 reject(command);
1385             else if (workerCountOf(recheck) == 0)
1386                 addWorker(null, false);
1387         }
1388         else if (!addWorker(command, false))
1389             reject(command);
1390     }
1391 
1392     /**
1393      * Initiates an orderly shutdown in which previously submitted
1394      * tasks are executed, but no new tasks will be accepted.
1395      * Invocation has no additional effect if already shut down.
1396      *
1397      * <p>This method does not wait for previously submitted tasks to
1398      * complete execution.  Use {@link #awaitTermination awaitTermination}
1399      * to do that.
1400      */
1401     // android-note: Removed @throws SecurityException
shutdown()1402     public void shutdown() {
1403         final ReentrantLock mainLock = this.mainLock;
1404         mainLock.lock();
1405         try {
1406             checkShutdownAccess();
1407             advanceRunState(SHUTDOWN);
1408             interruptIdleWorkers();
1409             onShutdown(); // hook for ScheduledThreadPoolExecutor
1410         } finally {
1411             mainLock.unlock();
1412         }
1413         tryTerminate();
1414     }
1415 
1416     /**
1417      * Attempts to stop all actively executing tasks, halts the
1418      * processing of waiting tasks, and returns a list of the tasks
1419      * that were awaiting execution. These tasks are drained (removed)
1420      * from the task queue upon return from this method.
1421      *
1422      * <p>This method does not wait for actively executing tasks to
1423      * terminate.  Use {@link #awaitTermination awaitTermination} to
1424      * do that.
1425      *
1426      * <p>There are no guarantees beyond best-effort attempts to stop
1427      * processing actively executing tasks.  This implementation
1428      * interrupts tasks via {@link Thread#interrupt}; any task that
1429      * fails to respond to interrupts may never terminate.
1430      */
1431     // android-note: Removed @throws SecurityException
shutdownNow()1432     public List<Runnable> shutdownNow() {
1433         List<Runnable> tasks;
1434         final ReentrantLock mainLock = this.mainLock;
1435         mainLock.lock();
1436         try {
1437             checkShutdownAccess();
1438             advanceRunState(STOP);
1439             interruptWorkers();
1440             tasks = drainQueue();
1441         } finally {
1442             mainLock.unlock();
1443         }
1444         tryTerminate();
1445         return tasks;
1446     }
1447 
isShutdown()1448     public boolean isShutdown() {
1449         return ! isRunning(ctl.get());
1450     }
1451 
1452     /**
1453      * Returns true if this executor is in the process of terminating
1454      * after {@link #shutdown} or {@link #shutdownNow} but has not
1455      * completely terminated.  This method may be useful for
1456      * debugging. A return of {@code true} reported a sufficient
1457      * period after shutdown may indicate that submitted tasks have
1458      * ignored or suppressed interruption, causing this executor not
1459      * to properly terminate.
1460      *
1461      * @return {@code true} if terminating but not yet terminated
1462      */
isTerminating()1463     public boolean isTerminating() {
1464         int c = ctl.get();
1465         return ! isRunning(c) && runStateLessThan(c, TERMINATED);
1466     }
1467 
isTerminated()1468     public boolean isTerminated() {
1469         return runStateAtLeast(ctl.get(), TERMINATED);
1470     }
1471 
awaitTermination(long timeout, TimeUnit unit)1472     public boolean awaitTermination(long timeout, TimeUnit unit)
1473         throws InterruptedException {
1474         long nanos = unit.toNanos(timeout);
1475         final ReentrantLock mainLock = this.mainLock;
1476         mainLock.lock();
1477         try {
1478             while (!runStateAtLeast(ctl.get(), TERMINATED)) {
1479                 if (nanos <= 0L)
1480                     return false;
1481                 nanos = termination.awaitNanos(nanos);
1482             }
1483             return true;
1484         } finally {
1485             mainLock.unlock();
1486         }
1487     }
1488 
1489     /**
1490      * Invokes {@code shutdown} when this executor is no longer
1491      * referenced and it has no threads.
1492      */
finalize()1493     protected void finalize() {
1494         shutdown();
1495     }
1496 
1497     /**
1498      * Sets the thread factory used to create new threads.
1499      *
1500      * @param threadFactory the new thread factory
1501      * @throws NullPointerException if threadFactory is null
1502      * @see #getThreadFactory
1503      */
setThreadFactory(ThreadFactory threadFactory)1504     public void setThreadFactory(ThreadFactory threadFactory) {
1505         if (threadFactory == null)
1506             throw new NullPointerException();
1507         this.threadFactory = threadFactory;
1508     }
1509 
1510     /**
1511      * Returns the thread factory used to create new threads.
1512      *
1513      * @return the current thread factory
1514      * @see #setThreadFactory(ThreadFactory)
1515      */
getThreadFactory()1516     public ThreadFactory getThreadFactory() {
1517         return threadFactory;
1518     }
1519 
1520     /**
1521      * Sets a new handler for unexecutable tasks.
1522      *
1523      * @param handler the new handler
1524      * @throws NullPointerException if handler is null
1525      * @see #getRejectedExecutionHandler
1526      */
setRejectedExecutionHandler(RejectedExecutionHandler handler)1527     public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
1528         if (handler == null)
1529             throw new NullPointerException();
1530         this.handler = handler;
1531     }
1532 
1533     /**
1534      * Returns the current handler for unexecutable tasks.
1535      *
1536      * @return the current handler
1537      * @see #setRejectedExecutionHandler(RejectedExecutionHandler)
1538      */
getRejectedExecutionHandler()1539     public RejectedExecutionHandler getRejectedExecutionHandler() {
1540         return handler;
1541     }
1542 
1543     /**
1544      * Sets the core number of threads.  This overrides any value set
1545      * in the constructor.  If the new value is smaller than the
1546      * current value, excess existing threads will be terminated when
1547      * they next become idle.  If larger, new threads will, if needed,
1548      * be started to execute any queued tasks.
1549      *
1550      * @param corePoolSize the new core size
1551      * @throws IllegalArgumentException if {@code corePoolSize < 0}
1552      * @see #getCorePoolSize
1553      */
1554      // Android-changed: Reverted code that threw an IAE when
1555      // {@code corePoolSize} is greater than the {@linkplain #getMaximumPoolSize()
1556      // maximum pool size}. This is due to defective code in a commonly used third
1557      // party library that does something like :
1558      //
1559      // exec.setCorePoolSize(N);
1560      // exec.setMaxPoolSize(N);
setCorePoolSize(int corePoolSize)1561     public void setCorePoolSize(int corePoolSize) {
1562         if (corePoolSize < 0)
1563             throw new IllegalArgumentException();
1564         int delta = corePoolSize - this.corePoolSize;
1565         this.corePoolSize = corePoolSize;
1566         if (workerCountOf(ctl.get()) > corePoolSize)
1567             interruptIdleWorkers();
1568         else if (delta > 0) {
1569             // We don't really know how many new threads are "needed".
1570             // As a heuristic, prestart enough new workers (up to new
1571             // core size) to handle the current number of tasks in
1572             // queue, but stop if queue becomes empty while doing so.
1573             int k = Math.min(delta, workQueue.size());
1574             while (k-- > 0 && addWorker(null, true)) {
1575                 if (workQueue.isEmpty())
1576                     break;
1577             }
1578         }
1579     }
1580 
1581     /**
1582      * Returns the core number of threads.
1583      *
1584      * @return the core number of threads
1585      * @see #setCorePoolSize
1586      */
getCorePoolSize()1587     public int getCorePoolSize() {
1588         return corePoolSize;
1589     }
1590 
1591     /**
1592      * Starts a core thread, causing it to idly wait for work. This
1593      * overrides the default policy of starting core threads only when
1594      * new tasks are executed. This method will return {@code false}
1595      * if all core threads have already been started.
1596      *
1597      * @return {@code true} if a thread was started
1598      */
prestartCoreThread()1599     public boolean prestartCoreThread() {
1600         return workerCountOf(ctl.get()) < corePoolSize &&
1601             addWorker(null, true);
1602     }
1603 
1604     /**
1605      * Same as prestartCoreThread except arranges that at least one
1606      * thread is started even if corePoolSize is 0.
1607      */
ensurePrestart()1608     void ensurePrestart() {
1609         int wc = workerCountOf(ctl.get());
1610         if (wc < corePoolSize)
1611             addWorker(null, true);
1612         else if (wc == 0)
1613             addWorker(null, false);
1614     }
1615 
1616     /**
1617      * Starts all core threads, causing them to idly wait for work. This
1618      * overrides the default policy of starting core threads only when
1619      * new tasks are executed.
1620      *
1621      * @return the number of threads started
1622      */
prestartAllCoreThreads()1623     public int prestartAllCoreThreads() {
1624         int n = 0;
1625         while (addWorker(null, true))
1626             ++n;
1627         return n;
1628     }
1629 
1630     /**
1631      * Returns true if this pool allows core threads to time out and
1632      * terminate if no tasks arrive within the keepAlive time, being
1633      * replaced if needed when new tasks arrive. When true, the same
1634      * keep-alive policy applying to non-core threads applies also to
1635      * core threads. When false (the default), core threads are never
1636      * terminated due to lack of incoming tasks.
1637      *
1638      * @return {@code true} if core threads are allowed to time out,
1639      *         else {@code false}
1640      *
1641      * @since 1.6
1642      */
allowsCoreThreadTimeOut()1643     public boolean allowsCoreThreadTimeOut() {
1644         return allowCoreThreadTimeOut;
1645     }
1646 
1647     /**
1648      * Sets the policy governing whether core threads may time out and
1649      * terminate if no tasks arrive within the keep-alive time, being
1650      * replaced if needed when new tasks arrive. When false, core
1651      * threads are never terminated due to lack of incoming
1652      * tasks. When true, the same keep-alive policy applying to
1653      * non-core threads applies also to core threads. To avoid
1654      * continual thread replacement, the keep-alive time must be
1655      * greater than zero when setting {@code true}. This method
1656      * should in general be called before the pool is actively used.
1657      *
1658      * @param value {@code true} if should time out, else {@code false}
1659      * @throws IllegalArgumentException if value is {@code true}
1660      *         and the current keep-alive time is not greater than zero
1661      *
1662      * @since 1.6
1663      */
allowCoreThreadTimeOut(boolean value)1664     public void allowCoreThreadTimeOut(boolean value) {
1665         if (value && keepAliveTime <= 0)
1666             throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
1667         if (value != allowCoreThreadTimeOut) {
1668             allowCoreThreadTimeOut = value;
1669             if (value)
1670                 interruptIdleWorkers();
1671         }
1672     }
1673 
1674     /**
1675      * Sets the maximum allowed number of threads. This overrides any
1676      * value set in the constructor. If the new value is smaller than
1677      * the current value, excess existing threads will be
1678      * terminated when they next become idle.
1679      *
1680      * @param maximumPoolSize the new maximum
1681      * @throws IllegalArgumentException if the new maximum is
1682      *         less than or equal to zero, or
1683      *         less than the {@linkplain #getCorePoolSize core pool size}
1684      * @see #getMaximumPoolSize
1685      */
setMaximumPoolSize(int maximumPoolSize)1686     public void setMaximumPoolSize(int maximumPoolSize) {
1687         if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
1688             throw new IllegalArgumentException();
1689         this.maximumPoolSize = maximumPoolSize;
1690         if (workerCountOf(ctl.get()) > maximumPoolSize)
1691             interruptIdleWorkers();
1692     }
1693 
1694     /**
1695      * Returns the maximum allowed number of threads.
1696      *
1697      * @return the maximum allowed number of threads
1698      * @see #setMaximumPoolSize
1699      */
getMaximumPoolSize()1700     public int getMaximumPoolSize() {
1701         return maximumPoolSize;
1702     }
1703 
1704     /**
1705      * Sets the thread keep-alive time, which is the amount of time
1706      * that threads may remain idle before being terminated.
1707      * Threads that wait this amount of time without processing a
1708      * task will be terminated if there are more than the core
1709      * number of threads currently in the pool, or if this pool
1710      * {@linkplain #allowsCoreThreadTimeOut() allows core thread timeout}.
1711      * This overrides any value set in the constructor.
1712      *
1713      * @param time the time to wait.  A time value of zero will cause
1714      *        excess threads to terminate immediately after executing tasks.
1715      * @param unit the time unit of the {@code time} argument
1716      * @throws IllegalArgumentException if {@code time} less than zero or
1717      *         if {@code time} is zero and {@code allowsCoreThreadTimeOut}
1718      * @see #getKeepAliveTime(TimeUnit)
1719      */
setKeepAliveTime(long time, TimeUnit unit)1720     public void setKeepAliveTime(long time, TimeUnit unit) {
1721         if (time < 0)
1722             throw new IllegalArgumentException();
1723         if (time == 0 && allowsCoreThreadTimeOut())
1724             throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
1725         long keepAliveTime = unit.toNanos(time);
1726         long delta = keepAliveTime - this.keepAliveTime;
1727         this.keepAliveTime = keepAliveTime;
1728         if (delta < 0)
1729             interruptIdleWorkers();
1730     }
1731 
1732     /**
1733      * Returns the thread keep-alive time, which is the amount of time
1734      * that threads may remain idle before being terminated.
1735      * Threads that wait this amount of time without processing a
1736      * task will be terminated if there are more than the core
1737      * number of threads currently in the pool, or if this pool
1738      * {@linkplain #allowsCoreThreadTimeOut() allows core thread timeout}.
1739      *
1740      * @param unit the desired time unit of the result
1741      * @return the time limit
1742      * @see #setKeepAliveTime(long, TimeUnit)
1743      */
getKeepAliveTime(TimeUnit unit)1744     public long getKeepAliveTime(TimeUnit unit) {
1745         return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
1746     }
1747 
1748     /* User-level queue utilities */
1749 
1750     /**
1751      * Returns the task queue used by this executor. Access to the
1752      * task queue is intended primarily for debugging and monitoring.
1753      * This queue may be in active use.  Retrieving the task queue
1754      * does not prevent queued tasks from executing.
1755      *
1756      * @return the task queue
1757      */
getQueue()1758     public BlockingQueue<Runnable> getQueue() {
1759         return workQueue;
1760     }
1761 
1762     /**
1763      * Removes this task from the executor's internal queue if it is
1764      * present, thus causing it not to be run if it has not already
1765      * started.
1766      *
1767      * <p>This method may be useful as one part of a cancellation
1768      * scheme.  It may fail to remove tasks that have been converted
1769      * into other forms before being placed on the internal queue.
1770      * For example, a task entered using {@code submit} might be
1771      * converted into a form that maintains {@code Future} status.
1772      * However, in such cases, method {@link #purge} may be used to
1773      * remove those Futures that have been cancelled.
1774      *
1775      * @param task the task to remove
1776      * @return {@code true} if the task was removed
1777      */
remove(Runnable task)1778     public boolean remove(Runnable task) {
1779         boolean removed = workQueue.remove(task);
1780         tryTerminate(); // In case SHUTDOWN and now empty
1781         return removed;
1782     }
1783 
1784     /**
1785      * Tries to remove from the work queue all {@link Future}
1786      * tasks that have been cancelled. This method can be useful as a
1787      * storage reclamation operation, that has no other impact on
1788      * functionality. Cancelled tasks are never executed, but may
1789      * accumulate in work queues until worker threads can actively
1790      * remove them. Invoking this method instead tries to remove them now.
1791      * However, this method may fail to remove tasks in
1792      * the presence of interference by other threads.
1793      */
purge()1794     public void purge() {
1795         final BlockingQueue<Runnable> q = workQueue;
1796         try {
1797             Iterator<Runnable> it = q.iterator();
1798             while (it.hasNext()) {
1799                 Runnable r = it.next();
1800                 if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
1801                     it.remove();
1802             }
1803         } catch (ConcurrentModificationException fallThrough) {
1804             // Take slow path if we encounter interference during traversal.
1805             // Make copy for traversal and call remove for cancelled entries.
1806             // The slow path is more likely to be O(N*N).
1807             for (Object r : q.toArray())
1808                 if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
1809                     q.remove(r);
1810         }
1811 
1812         tryTerminate(); // In case SHUTDOWN and now empty
1813     }
1814 
1815     /* Statistics */
1816 
1817     /**
1818      * Returns the current number of threads in the pool.
1819      *
1820      * @return the number of threads
1821      */
getPoolSize()1822     public int getPoolSize() {
1823         final ReentrantLock mainLock = this.mainLock;
1824         mainLock.lock();
1825         try {
1826             // Remove rare and surprising possibility of
1827             // isTerminated() && getPoolSize() > 0
1828             return runStateAtLeast(ctl.get(), TIDYING) ? 0
1829                 : workers.size();
1830         } finally {
1831             mainLock.unlock();
1832         }
1833     }
1834 
1835     /**
1836      * Returns the approximate number of threads that are actively
1837      * executing tasks.
1838      *
1839      * @return the number of threads
1840      */
getActiveCount()1841     public int getActiveCount() {
1842         final ReentrantLock mainLock = this.mainLock;
1843         mainLock.lock();
1844         try {
1845             int n = 0;
1846             for (Worker w : workers)
1847                 if (w.isLocked())
1848                     ++n;
1849             return n;
1850         } finally {
1851             mainLock.unlock();
1852         }
1853     }
1854 
1855     /**
1856      * Returns the largest number of threads that have ever
1857      * simultaneously been in the pool.
1858      *
1859      * @return the number of threads
1860      */
getLargestPoolSize()1861     public int getLargestPoolSize() {
1862         final ReentrantLock mainLock = this.mainLock;
1863         mainLock.lock();
1864         try {
1865             return largestPoolSize;
1866         } finally {
1867             mainLock.unlock();
1868         }
1869     }
1870 
1871     /**
1872      * Returns the approximate total number of tasks that have ever been
1873      * scheduled for execution. Because the states of tasks and
1874      * threads may change dynamically during computation, the returned
1875      * value is only an approximation.
1876      *
1877      * @return the number of tasks
1878      */
getTaskCount()1879     public long getTaskCount() {
1880         final ReentrantLock mainLock = this.mainLock;
1881         mainLock.lock();
1882         try {
1883             long n = completedTaskCount;
1884             for (Worker w : workers) {
1885                 n += w.completedTasks;
1886                 if (w.isLocked())
1887                     ++n;
1888             }
1889             return n + workQueue.size();
1890         } finally {
1891             mainLock.unlock();
1892         }
1893     }
1894 
1895     /**
1896      * Returns the approximate total number of tasks that have
1897      * completed execution. Because the states of tasks and threads
1898      * may change dynamically during computation, the returned value
1899      * is only an approximation, but one that does not ever decrease
1900      * across successive calls.
1901      *
1902      * @return the number of tasks
1903      */
getCompletedTaskCount()1904     public long getCompletedTaskCount() {
1905         final ReentrantLock mainLock = this.mainLock;
1906         mainLock.lock();
1907         try {
1908             long n = completedTaskCount;
1909             for (Worker w : workers)
1910                 n += w.completedTasks;
1911             return n;
1912         } finally {
1913             mainLock.unlock();
1914         }
1915     }
1916 
1917     /**
1918      * Returns a string identifying this pool, as well as its state,
1919      * including indications of run state and estimated worker and
1920      * task counts.
1921      *
1922      * @return a string identifying this pool, as well as its state
1923      */
toString()1924     public String toString() {
1925         long ncompleted;
1926         int nworkers, nactive;
1927         final ReentrantLock mainLock = this.mainLock;
1928         mainLock.lock();
1929         try {
1930             ncompleted = completedTaskCount;
1931             nactive = 0;
1932             nworkers = workers.size();
1933             for (Worker w : workers) {
1934                 ncompleted += w.completedTasks;
1935                 if (w.isLocked())
1936                     ++nactive;
1937             }
1938         } finally {
1939             mainLock.unlock();
1940         }
1941         int c = ctl.get();
1942         String runState =
1943             runStateLessThan(c, SHUTDOWN) ? "Running" :
1944             runStateAtLeast(c, TERMINATED) ? "Terminated" :
1945             "Shutting down";
1946         return super.toString() +
1947             "[" + runState +
1948             ", pool size = " + nworkers +
1949             ", active threads = " + nactive +
1950             ", queued tasks = " + workQueue.size() +
1951             ", completed tasks = " + ncompleted +
1952             "]";
1953     }
1954 
1955     /* Extension hooks */
1956 
1957     /**
1958      * Method invoked prior to executing the given Runnable in the
1959      * given thread.  This method is invoked by thread {@code t} that
1960      * will execute task {@code r}, and may be used to re-initialize
1961      * ThreadLocals, or to perform logging.
1962      *
1963      * <p>This implementation does nothing, but may be customized in
1964      * subclasses. Note: To properly nest multiple overridings, subclasses
1965      * should generally invoke {@code super.beforeExecute} at the end of
1966      * this method.
1967      *
1968      * @param t the thread that will run task {@code r}
1969      * @param r the task that will be executed
1970      */
beforeExecute(Thread t, Runnable r)1971     protected void beforeExecute(Thread t, Runnable r) { }
1972 
1973     /**
1974      * Method invoked upon completion of execution of the given Runnable.
1975      * This method is invoked by the thread that executed the task. If
1976      * non-null, the Throwable is the uncaught {@code RuntimeException}
1977      * or {@code Error} that caused execution to terminate abruptly.
1978      *
1979      * <p>This implementation does nothing, but may be customized in
1980      * subclasses. Note: To properly nest multiple overridings, subclasses
1981      * should generally invoke {@code super.afterExecute} at the
1982      * beginning of this method.
1983      *
1984      * <p><b>Note:</b> When actions are enclosed in tasks (such as
1985      * {@link FutureTask}) either explicitly or via methods such as
1986      * {@code submit}, these task objects catch and maintain
1987      * computational exceptions, and so they do not cause abrupt
1988      * termination, and the internal exceptions are <em>not</em>
1989      * passed to this method. If you would like to trap both kinds of
1990      * failures in this method, you can further probe for such cases,
1991      * as in this sample subclass that prints either the direct cause
1992      * or the underlying exception if a task has been aborted:
1993      *
1994      * <pre> {@code
1995      * class ExtendedExecutor extends ThreadPoolExecutor {
1996      *   // ...
1997      *   protected void afterExecute(Runnable r, Throwable t) {
1998      *     super.afterExecute(r, t);
1999      *     if (t == null
2000      *         && r instanceof Future<?>
2001      *         && ((Future<?>)r).isDone()) {
2002      *       try {
2003      *         Object result = ((Future<?>) r).get();
2004      *       } catch (CancellationException ce) {
2005      *         t = ce;
2006      *       } catch (ExecutionException ee) {
2007      *         t = ee.getCause();
2008      *       } catch (InterruptedException ie) {
2009      *         // ignore/reset
2010      *         Thread.currentThread().interrupt();
2011      *       }
2012      *     }
2013      *     if (t != null)
2014      *       System.out.println(t);
2015      *   }
2016      * }}</pre>
2017      *
2018      * @param r the runnable that has completed
2019      * @param t the exception that caused termination, or null if
2020      * execution completed normally
2021      */
afterExecute(Runnable r, Throwable t)2022     protected void afterExecute(Runnable r, Throwable t) { }
2023 
2024     /**
2025      * Method invoked when the Executor has terminated.  Default
2026      * implementation does nothing. Note: To properly nest multiple
2027      * overridings, subclasses should generally invoke
2028      * {@code super.terminated} within this method.
2029      */
terminated()2030     protected void terminated() { }
2031 
2032     /* Predefined RejectedExecutionHandlers */
2033 
2034     /**
2035      * A handler for rejected tasks that runs the rejected task
2036      * directly in the calling thread of the {@code execute} method,
2037      * unless the executor has been shut down, in which case the task
2038      * is discarded.
2039      */
2040     public static class CallerRunsPolicy implements RejectedExecutionHandler {
2041         /**
2042          * Creates a {@code CallerRunsPolicy}.
2043          */
CallerRunsPolicy()2044         public CallerRunsPolicy() { }
2045 
2046         /**
2047          * Executes task r in the caller's thread, unless the executor
2048          * has been shut down, in which case the task is discarded.
2049          *
2050          * @param r the runnable task requested to be executed
2051          * @param e the executor attempting to execute this task
2052          */
rejectedExecution(Runnable r, ThreadPoolExecutor e)2053         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2054             if (!e.isShutdown()) {
2055                 r.run();
2056             }
2057         }
2058     }
2059 
2060     /**
2061      * A handler for rejected tasks that throws a
2062      * {@code RejectedExecutionException}.
2063      */
2064     public static class AbortPolicy implements RejectedExecutionHandler {
2065         /**
2066          * Creates an {@code AbortPolicy}.
2067          */
AbortPolicy()2068         public AbortPolicy() { }
2069 
2070         /**
2071          * Always throws RejectedExecutionException.
2072          *
2073          * @param r the runnable task requested to be executed
2074          * @param e the executor attempting to execute this task
2075          * @throws RejectedExecutionException always
2076          */
rejectedExecution(Runnable r, ThreadPoolExecutor e)2077         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2078             throw new RejectedExecutionException("Task " + r.toString() +
2079                                                  " rejected from " +
2080                                                  e.toString());
2081         }
2082     }
2083 
2084     /**
2085      * A handler for rejected tasks that silently discards the
2086      * rejected task.
2087      */
2088     public static class DiscardPolicy implements RejectedExecutionHandler {
2089         /**
2090          * Creates a {@code DiscardPolicy}.
2091          */
DiscardPolicy()2092         public DiscardPolicy() { }
2093 
2094         /**
2095          * Does nothing, which has the effect of discarding task r.
2096          *
2097          * @param r the runnable task requested to be executed
2098          * @param e the executor attempting to execute this task
2099          */
rejectedExecution(Runnable r, ThreadPoolExecutor e)2100         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2101         }
2102     }
2103 
2104     /**
2105      * A handler for rejected tasks that discards the oldest unhandled
2106      * request and then retries {@code execute}, unless the executor
2107      * is shut down, in which case the task is discarded.
2108      */
2109     public static class DiscardOldestPolicy implements RejectedExecutionHandler {
2110         /**
2111          * Creates a {@code DiscardOldestPolicy} for the given executor.
2112          */
DiscardOldestPolicy()2113         public DiscardOldestPolicy() { }
2114 
2115         /**
2116          * Obtains and ignores the next task that the executor
2117          * would otherwise execute, if one is immediately available,
2118          * and then retries execution of task r, unless the executor
2119          * is shut down, in which case task r is instead discarded.
2120          *
2121          * @param r the runnable task requested to be executed
2122          * @param e the executor attempting to execute this task
2123          */
rejectedExecution(Runnable r, ThreadPoolExecutor e)2124         public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
2125             if (!e.isShutdown()) {
2126                 e.getQueue().poll();
2127                 e.execute(r);
2128             }
2129         }
2130     }
2131 }
2132