• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 import java.lang.Thread.UncaughtExceptionHandler;
39 import java.lang.invoke.MethodHandles;
40 import java.lang.invoke.VarHandle;
41 import java.security.AccessController;
42 import java.security.AccessControlContext;
43 import java.security.Permission;
44 import java.security.Permissions;
45 import java.security.PrivilegedAction;
46 import java.security.ProtectionDomain;
47 import java.util.ArrayList;
48 import java.util.Collection;
49 import java.util.Collections;
50 import java.util.List;
51 import java.util.function.Predicate;
52 import java.util.concurrent.locks.LockSupport;
53 
54 /**
55  * An {@link ExecutorService} for running {@link ForkJoinTask}s.
56  * A {@code ForkJoinPool} provides the entry point for submissions
57  * from non-{@code ForkJoinTask} clients, as well as management and
58  * monitoring operations.
59  *
60  * <p>A {@code ForkJoinPool} differs from other kinds of {@link
61  * ExecutorService} mainly by virtue of employing
62  * <em>work-stealing</em>: all threads in the pool attempt to find and
63  * execute tasks submitted to the pool and/or created by other active
64  * tasks (eventually blocking waiting for work if none exist). This
65  * enables efficient processing when most tasks spawn other subtasks
66  * (as do most {@code ForkJoinTask}s), as well as when many small
67  * tasks are submitted to the pool from external clients.  Especially
68  * when setting <em>asyncMode</em> to true in constructors, {@code
69  * ForkJoinPool}s may also be appropriate for use with event-style
70  * tasks that are never joined. All worker threads are initialized
71  * with {@link Thread#isDaemon} set {@code true}.
72  *
73  * <p>A static {@link #commonPool()} is available and appropriate for
74  * most applications. The common pool is used by any ForkJoinTask that
75  * is not explicitly submitted to a specified pool. Using the common
76  * pool normally reduces resource usage (its threads are slowly
77  * reclaimed during periods of non-use, and reinstated upon subsequent
78  * use).
79  *
80  * <p>For applications that require separate or custom pools, a {@code
81  * ForkJoinPool} may be constructed with a given target parallelism
82  * level; by default, equal to the number of available processors.
83  * The pool attempts to maintain enough active (or available) threads
84  * by dynamically adding, suspending, or resuming internal worker
85  * threads, even if some tasks are stalled waiting to join others.
86  * However, no such adjustments are guaranteed in the face of blocked
87  * I/O or other unmanaged synchronization. The nested {@link
88  * ManagedBlocker} interface enables extension of the kinds of
89  * synchronization accommodated. The default policies may be
90  * overridden using a constructor with parameters corresponding to
91  * those documented in class {@link ThreadPoolExecutor}.
92  *
93  * <p>In addition to execution and lifecycle control methods, this
94  * class provides status check methods (for example
95  * {@link #getStealCount}) that are intended to aid in developing,
96  * tuning, and monitoring fork/join applications. Also, method
97  * {@link #toString} returns indications of pool state in a
98  * convenient form for informal monitoring.
99  *
100  * <p>As is the case with other ExecutorServices, there are three
101  * main task execution methods summarized in the following table.
102  * These are designed to be used primarily by clients not already
103  * engaged in fork/join computations in the current pool.  The main
104  * forms of these methods accept instances of {@code ForkJoinTask},
105  * but overloaded forms also allow mixed execution of plain {@code
106  * Runnable}- or {@code Callable}- based activities as well.  However,
107  * tasks that are already executing in a pool should normally instead
108  * use the within-computation forms listed in the table unless using
109  * async event-style tasks that are not usually joined, in which case
110  * there is little difference among choice of methods.
111  *
112  * <table class="plain">
113  * <caption>Summary of task execution methods</caption>
114  *  <tr>
115  *    <td></td>
116  *    <th scope="col"> Call from non-fork/join clients</th>
117  *    <th scope="col"> Call from within fork/join computations</th>
118  *  </tr>
119  *  <tr>
120  *    <th scope="row" style="text-align:left"> Arrange async execution</th>
121  *    <td> {@link #execute(ForkJoinTask)}</td>
122  *    <td> {@link ForkJoinTask#fork}</td>
123  *  </tr>
124  *  <tr>
125  *    <th scope="row" style="text-align:left"> Await and obtain result</th>
126  *    <td> {@link #invoke(ForkJoinTask)}</td>
127  *    <td> {@link ForkJoinTask#invoke}</td>
128  *  </tr>
129  *  <tr>
130  *    <th scope="row" style="text-align:left"> Arrange exec and obtain Future</th>
131  *    <td> {@link #submit(ForkJoinTask)}</td>
132  *    <td> {@link ForkJoinTask#fork} (ForkJoinTasks <em>are</em> Futures)</td>
133  *  </tr>
134  * </table>
135  *
136  * <p>The parameters used to construct the common pool may be controlled by
137  * setting the following {@linkplain System#getProperty system properties}:
138  * <ul>
139  * <li>{@code java.util.concurrent.ForkJoinPool.common.parallelism}
140  * - the parallelism level, a non-negative integer
141  * <li>{@code java.util.concurrent.ForkJoinPool.common.threadFactory}
142  * - the class name of a {@link ForkJoinWorkerThreadFactory}.
143  * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
144  * is used to load this class.
145  * <li>{@code java.util.concurrent.ForkJoinPool.common.exceptionHandler}
146  * - the class name of a {@link UncaughtExceptionHandler}.
147  * The {@linkplain ClassLoader#getSystemClassLoader() system class loader}
148  * is used to load this class.
149  * <li>{@code java.util.concurrent.ForkJoinPool.common.maximumSpares}
150  * - the maximum number of allowed extra threads to maintain target
151  * parallelism (default 256).
152  * </ul>
153  * If no thread factory is supplied via a system property, then the
154  * common pool uses a factory that uses the system class loader as the
155  * {@linkplain Thread#getContextClassLoader() thread context class loader}.
156  * In addition, if a {@link SecurityManager} is present, then
157  * the common pool uses a factory supplying threads that have no
158  * {@link Permissions} enabled.
159  *
160  * <p>Upon any error in establishing these settings, default parameters
161  * are used. It is possible to disable or limit the use of threads in
162  * the common pool by setting the parallelism property to zero, and/or
163  * using a factory that may return {@code null}. However doing so may
164  * cause unjoined tasks to never be executed.
165  *
166  * <p><b>Implementation notes</b>: This implementation restricts the
167  * maximum number of running threads to 32767. Attempts to create
168  * pools with greater than the maximum number result in
169  * {@code IllegalArgumentException}.
170  *
171  * <p>This implementation rejects submitted tasks (that is, by throwing
172  * {@link RejectedExecutionException}) only when the pool is shut down
173  * or internal resources have been exhausted.
174  *
175  * @since 1.7
176  * @author Doug Lea
177  */
178 @jdk.internal.vm.annotation.Contended
179 public class ForkJoinPool extends AbstractExecutorService {
180 
181     /*
182      * Implementation Overview
183      *
184      * This class and its nested classes provide the main
185      * functionality and control for a set of worker threads:
186      * Submissions from non-FJ threads enter into submission queues.
187      * Workers take these tasks and typically split them into subtasks
188      * that may be stolen by other workers. Work-stealing based on
189      * randomized scans generally leads to better throughput than
190      * "work dealing" in which producers assign tasks to idle threads,
191      * in part because threads that have finished other tasks before
192      * the signalled thread wakes up (which can be a long time) can
193      * take the task instead.  Preference rules give first priority to
194      * processing tasks from their own queues (LIFO or FIFO, depending
195      * on mode), then to randomized FIFO steals of tasks in other
196      * queues.  This framework began as a vehicle for supporting
197      * tree-structured parallelism using work-stealing.  Over time,
198      * its scalability advantages led to extensions and changes to
199      * better support more diverse usage contexts.  Because most
200      * internal methods and nested classes are interrelated, their
201      * main rationale and descriptions are presented here; individual
202      * methods and nested classes contain only brief comments about
203      * details.
204      *
205      * WorkQueues
206      * ==========
207      *
208      * Most operations occur within work-stealing queues (in nested
209      * class WorkQueue).  These are special forms of Deques that
210      * support only three of the four possible end-operations -- push,
211      * pop, and poll (aka steal), under the further constraints that
212      * push and pop are called only from the owning thread (or, as
213      * extended here, under a lock), while poll may be called from
214      * other threads.  (If you are unfamiliar with them, you probably
215      * want to read Herlihy and Shavit's book "The Art of
216      * Multiprocessor programming", chapter 16 describing these in
217      * more detail before proceeding.)  The main work-stealing queue
218      * design is roughly similar to those in the papers "Dynamic
219      * Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
220      * (http://research.sun.com/scalable/pubs/index.html) and
221      * "Idempotent work stealing" by Michael, Saraswat, and Vechev,
222      * PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
223      * The main differences ultimately stem from GC requirements that
224      * we null out taken slots as soon as we can, to maintain as small
225      * a footprint as possible even in programs generating huge
226      * numbers of tasks. To accomplish this, we shift the CAS
227      * arbitrating pop vs poll (steal) from being on the indices
228      * ("base" and "top") to the slots themselves.
229      *
230      * Adding tasks then takes the form of a classic array push(task)
231      * in a circular buffer:
232      *    q.array[q.top++ % length] = task;
233      *
234      * (The actual code needs to null-check and size-check the array,
235      * uses masking, not mod, for indexing a power-of-two-sized array,
236      * adds a release fence for publication, and possibly signals
237      * waiting workers to start scanning -- see below.)  Both a
238      * successful pop and poll mainly entail a CAS of a slot from
239      * non-null to null.
240      *
241      * The pop operation (always performed by owner) is:
242      *   if ((the task at top slot is not null) and
243      *        (CAS slot to null))
244      *           decrement top and return task;
245      *
246      * And the poll operation (usually by a stealer) is
247      *    if ((the task at base slot is not null) and
248      *        (CAS slot to null))
249      *           increment base and return task;
250      *
251      * There are several variants of each of these. Most uses occur
252      * within operations that also interleave contention or emptiness
253      * tracking or inspection of elements before extracting them, so
254      * must interleave these with the above code. When performed by
255      * owner, getAndSet is used instead of CAS (see for example method
256      * nextLocalTask) which is usually more efficient, and possible
257      * because the top index cannot independently change during the
258      * operation.
259      *
260      * Memory ordering.  See "Correct and Efficient Work-Stealing for
261      * Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
262      * (http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
263      * analysis of memory ordering requirements in work-stealing
264      * algorithms similar to (but different than) the one used here.
265      * Extracting tasks in array slots via (fully fenced) CAS provides
266      * primary synchronization. The base and top indices imprecisely
267      * guide where to extract from. We do not usually require strict
268      * orderings of array and index updates. Many index accesses use
269      * plain mode, with ordering constrained by surrounding context
270      * (usually with respect to element CASes or the two WorkQueue
271      * volatile fields source and phase). When not otherwise already
272      * constrained, reads of "base" by queue owners use acquire-mode,
273      * and some externally callable methods preface accesses with
274      * acquire fences.  Additionally, to ensure that index update
275      * writes are not coalesced or postponed in loops etc, "opaque"
276      * mode is used in a few cases where timely writes are not
277      * otherwise ensured. The "locked" versions of push- and pop-
278      * based methods for shared queues differ from owned versions
279      * because locking already forces some of the ordering.
280      *
281      * Because indices and slot contents cannot always be consistent,
282      * a check that base == top indicates (momentary) emptiness, but
283      * otherwise may err on the side of possibly making the queue
284      * appear nonempty when a push, pop, or poll has not fully
285      * committed, or making it appear empty when an update of top has
286      * not yet been visibly written.  (Method isEmpty() checks the
287      * case of a partially completed removal of the last element.)
288      * Because of this, the poll operation, considered individually,
289      * is not wait-free. One thief cannot successfully continue until
290      * another in-progress one (or, if previously empty, a push)
291      * visibly completes.  This can stall threads when required to
292      * consume from a given queue (see method poll()).  However, in
293      * the aggregate, we ensure at least probabilistic
294      * non-blockingness.  If an attempted steal fails, a scanning
295      * thief chooses a different random victim target to try next. So,
296      * in order for one thief to progress, it suffices for any
297      * in-progress poll or new push on any empty queue to complete.
298      *
299      * This approach also enables support of a user mode in which
300      * local task processing is in FIFO, not LIFO order, simply by
301      * using poll rather than pop.  This can be useful in
302      * message-passing frameworks in which tasks are never joined.
303      *
304      * WorkQueues are also used in a similar way for tasks submitted
305      * to the pool. We cannot mix these tasks in the same queues used
306      * by workers. Instead, we randomly associate submission queues
307      * with submitting threads, using a form of hashing.  The
308      * ThreadLocalRandom probe value serves as a hash code for
309      * choosing existing queues, and may be randomly repositioned upon
310      * contention with other submitters.  In essence, submitters act
311      * like workers except that they are restricted to executing local
312      * tasks that they submitted.  Insertion of tasks in shared mode
313      * requires a lock but we use only a simple spinlock (using field
314      * phase), because submitters encountering a busy queue move to a
315      * different position to use or create other queues -- they block
316      * only when creating and registering new queues. Because it is
317      * used only as a spinlock, unlocking requires only a "releasing"
318      * store (using setRelease) unless otherwise signalling.
319      *
320      * Management
321      * ==========
322      *
323      * The main throughput advantages of work-stealing stem from
324      * decentralized control -- workers mostly take tasks from
325      * themselves or each other, at rates that can exceed a billion
326      * per second.  The pool itself creates, activates (enables
327      * scanning for and running tasks), deactivates, blocks, and
328      * terminates threads, all with minimal central information.
329      * There are only a few properties that we can globally track or
330      * maintain, so we pack them into a small number of variables,
331      * often maintaining atomicity without blocking or locking.
332      * Nearly all essentially atomic control state is held in a few
333      * volatile variables that are by far most often read (not
334      * written) as status and consistency checks. We pack as much
335      * information into them as we can.
336      *
337      * Field "ctl" contains 64 bits holding information needed to
338      * atomically decide to add, enqueue (on an event queue), and
339      * dequeue and release workers.  To enable this packing, we
340      * restrict maximum parallelism to (1<<15)-1 (which is far in
341      * excess of normal operating range) to allow ids, counts, and
342      * their negations (used for thresholding) to fit into 16bit
343      * subfields.
344      *
345      * Field "mode" holds configuration parameters as well as lifetime
346      * status, atomically and monotonically setting SHUTDOWN, STOP,
347      * and finally TERMINATED bits.
348      *
349      * Field "workQueues" holds references to WorkQueues.  It is
350      * updated (only during worker creation and termination) under
351      * lock (using field workerNamePrefix as lock), but is otherwise
352      * concurrently readable, and accessed directly. We also ensure
353      * that uses of the array reference itself never become too stale
354      * in case of resizing, by arranging that (re-)reads are separated
355      * by at least one acquiring read access.  To simplify index-based
356      * operations, the array size is always a power of two, and all
357      * readers must tolerate null slots. Worker queues are at odd
358      * indices. Shared (submission) queues are at even indices, up to
359      * a maximum of 64 slots, to limit growth even if the array needs
360      * to expand to add more workers. Grouping them together in this
361      * way simplifies and speeds up task scanning.
362      *
363      * All worker thread creation is on-demand, triggered by task
364      * submissions, replacement of terminated workers, and/or
365      * compensation for blocked workers. However, all other support
366      * code is set up to work with other policies.  To ensure that we
367      * do not hold on to worker references that would prevent GC, all
368      * accesses to workQueues are via indices into the workQueues
369      * array (which is one source of some of the messy code
370      * constructions here). In essence, the workQueues array serves as
371      * a weak reference mechanism. Thus for example the stack top
372      * subfield of ctl stores indices, not references.
373      *
374      * Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
375      * cannot let workers spin indefinitely scanning for tasks when
376      * none can be found immediately, and we cannot start/resume
377      * workers unless there appear to be tasks available.  On the
378      * other hand, we must quickly prod them into action when new
379      * tasks are submitted or generated. In many usages, ramp-up time
380      * is the main limiting factor in overall performance, which is
381      * compounded at program start-up by JIT compilation and
382      * allocation. So we streamline this as much as possible.
383      *
384      * The "ctl" field atomically maintains total worker and
385      * "released" worker counts, plus the head of the available worker
386      * queue (actually stack, represented by the lower 32bit subfield
387      * of ctl).  Released workers are those known to be scanning for
388      * and/or running tasks. Unreleased ("available") workers are
389      * recorded in the ctl stack. These workers are made available for
390      * signalling by enqueuing in ctl (see method runWorker).  The
391      * "queue" is a form of Treiber stack. This is ideal for
392      * activating threads in most-recently used order, and improves
393      * performance and locality, outweighing the disadvantages of
394      * being prone to contention and inability to release a worker
395      * unless it is topmost on stack.  To avoid missed signal problems
396      * inherent in any wait/signal design, available workers rescan
397      * for (and if found run) tasks after enqueuing.  Normally their
398      * release status will be updated while doing so, but the released
399      * worker ctl count may underestimate the number of active
400      * threads. (However, it is still possible to determine quiescence
401      * via a validation traversal -- see isQuiescent).  After an
402      * unsuccessful rescan, available workers are blocked until
403      * signalled (see signalWork).  The top stack state holds the
404      * value of the "phase" field of the worker: its index and status,
405      * plus a version counter that, in addition to the count subfields
406      * (also serving as version stamps) provides protection against
407      * Treiber stack ABA effects.
408      *
409      * Creating workers. To create a worker, we pre-increment counts
410      * (serving as a reservation), and attempt to construct a
411      * ForkJoinWorkerThread via its factory. Upon construction, the
412      * new thread invokes registerWorker, where it constructs a
413      * WorkQueue and is assigned an index in the workQueues array
414      * (expanding the array if necessary). The thread is then started.
415      * Upon any exception across these steps, or null return from
416      * factory, deregisterWorker adjusts counts and records
417      * accordingly.  If a null return, the pool continues running with
418      * fewer than the target number of workers. If exceptional, the
419      * exception is propagated, generally to some external caller.
420      * Worker index assignment avoids the bias in scanning that would
421      * occur if entries were sequentially packed starting at the front
422      * of the workQueues array. We treat the array as a simple
423      * power-of-two hash table, expanding as needed. The seedIndex
424      * increment ensures no collisions until a resize is needed or a
425      * worker is deregistered and replaced, and thereafter keeps
426      * probability of collision low. We cannot use
427      * ThreadLocalRandom.getProbe() for similar purposes here because
428      * the thread has not started yet, but do so for creating
429      * submission queues for existing external threads (see
430      * externalPush).
431      *
432      * WorkQueue field "phase" is used by both workers and the pool to
433      * manage and track whether a worker is UNSIGNALLED (possibly
434      * blocked waiting for a signal).  When a worker is enqueued its
435      * phase field is set. Note that phase field updates lag queue CAS
436      * releases so usage requires care -- seeing a negative phase does
437      * not guarantee that the worker is available. When queued, the
438      * lower 16 bits of scanState must hold its pool index. So we
439      * place the index there upon initialization and otherwise keep it
440      * there or restore it when necessary.
441      *
442      * The ctl field also serves as the basis for memory
443      * synchronization surrounding activation. This uses a more
444      * efficient version of a Dekker-like rule that task producers and
445      * consumers sync with each other by both writing/CASing ctl (even
446      * if to its current value).  This would be extremely costly. So
447      * we relax it in several ways: (1) Producers only signal when
448      * their queue is possibly empty at some point during a push
449      * operation (which requires conservatively checking size zero or
450      * one to cover races). (2) Other workers propagate this signal
451      * when they find tasks in a queue with size greater than one. (3)
452      * Workers only enqueue after scanning (see below) and not finding
453      * any tasks.  (4) Rather than CASing ctl to its current value in
454      * the common case where no action is required, we reduce write
455      * contention by equivalently prefacing signalWork when called by
456      * an external task producer using a memory access with
457      * full-volatile semantics or a "fullFence".
458      *
459      * Almost always, too many signals are issued, in part because a
460      * task producer cannot tell if some existing worker is in the
461      * midst of finishing one task (or already scanning) and ready to
462      * take another without being signalled. So the producer might
463      * instead activate a different worker that does not find any
464      * work, and then inactivates. This scarcely matters in
465      * steady-state computations involving all workers, but can create
466      * contention and bookkeeping bottlenecks during ramp-up,
467      * ramp-down, and small computations involving only a few workers.
468      *
469      * Scanning. Method scan (from runWorker) performs top-level
470      * scanning for tasks. (Similar scans appear in helpQuiesce and
471      * pollScan.)  Each scan traverses and tries to poll from each
472      * queue starting at a random index. Scans are not performed in
473      * ideal random permutation order, to reduce cacheline
474      * contention. The pseudorandom generator need not have
475      * high-quality statistical properties in the long term, but just
476      * within computations; We use Marsaglia XorShifts (often via
477      * ThreadLocalRandom.nextSecondarySeed), which are cheap and
478      * suffice. Scanning also includes contention reduction: When
479      * scanning workers fail to extract an apparently existing task,
480      * they soon restart at a different pseudorandom index.  This form
481      * of backoff improves throughput when many threads are trying to
482      * take tasks from few queues, which can be common in some usages.
483      * Scans do not otherwise explicitly take into account core
484      * affinities, loads, cache localities, etc, However, they do
485      * exploit temporal locality (which usually approximates these) by
486      * preferring to re-poll from the same queue after a successful
487      * poll before trying others (see method topLevelExec). However
488      * this preference is bounded (see TOP_BOUND_SHIFT) as a safeguard
489      * against infinitely unfair looping under unbounded user task
490      * recursion, and also to reduce long-term contention when many
491      * threads poll few queues holding many small tasks. The bound is
492      * high enough to avoid much impact on locality and scheduling
493      * overhead.
494      *
495      * Trimming workers. To release resources after periods of lack of
496      * use, a worker starting to wait when the pool is quiescent will
497      * time out and terminate (see method runWorker) if the pool has
498      * remained quiescent for the period given by field keepAlive.
499      *
500      * Shutdown and Termination. A call to shutdownNow invokes
501      * tryTerminate to atomically set a runState bit. The calling
502      * thread, as well as every other worker thereafter terminating,
503      * helps terminate others by cancelling their unprocessed tasks,
504      * and waking them up, doing so repeatedly until stable. Calls to
505      * non-abrupt shutdown() preface this by checking whether
506      * termination should commence by sweeping through queues (until
507      * stable) to ensure lack of in-flight submissions and workers
508      * about to process them before triggering the "STOP" phase of
509      * termination.
510      *
511      * Joining Tasks
512      * =============
513      *
514      * Any of several actions may be taken when one worker is waiting
515      * to join a task stolen (or always held) by another.  Because we
516      * are multiplexing many tasks on to a pool of workers, we can't
517      * always just let them block (as in Thread.join).  We also cannot
518      * just reassign the joiner's run-time stack with another and
519      * replace it later, which would be a form of "continuation", that
520      * even if possible is not necessarily a good idea since we may
521      * need both an unblocked task and its continuation to progress.
522      * Instead we combine two tactics:
523      *
524      *   Helping: Arranging for the joiner to execute some task that it
525      *      would be running if the steal had not occurred.
526      *
527      *   Compensating: Unless there are already enough live threads,
528      *      method tryCompensate() may create or re-activate a spare
529      *      thread to compensate for blocked joiners until they unblock.
530      *
531      * A third form (implemented in tryRemoveAndExec) amounts to
532      * helping a hypothetical compensator: If we can readily tell that
533      * a possible action of a compensator is to steal and execute the
534      * task being joined, the joining thread can do so directly,
535      * without the need for a compensation thread.
536      *
537      * The ManagedBlocker extension API can't use helping so relies
538      * only on compensation in method awaitBlocker.
539      *
540      * The algorithm in awaitJoin entails a form of "linear helping".
541      * Each worker records (in field source) the id of the queue from
542      * which it last stole a task.  The scan in method awaitJoin uses
543      * these markers to try to find a worker to help (i.e., steal back
544      * a task from and execute it) that could hasten completion of the
545      * actively joined task.  Thus, the joiner executes a task that
546      * would be on its own local deque if the to-be-joined task had
547      * not been stolen. This is a conservative variant of the approach
548      * described in Wagner & Calder "Leapfrogging: a portable
549      * technique for implementing efficient futures" SIGPLAN Notices,
550      * 1993 (http://portal.acm.org/citation.cfm?id=155354). It differs
551      * mainly in that we only record queue ids, not full dependency
552      * links.  This requires a linear scan of the workQueues array to
553      * locate stealers, but isolates cost to when it is needed, rather
     * than adding to per-task overhead. Searches can fail to locate
     * stealers when GC stalls and the like delay recording sources.
556      * Further, even when accurately identified, stealers might not
557      * ever produce a task that the joiner can in turn help with. So,
558      * compensation is tried upon failure to find tasks to run.
559      *
560      * Compensation does not by default aim to keep exactly the target
561      * parallelism number of unblocked threads running at any given
562      * time. Some previous versions of this class employed immediate
563      * compensations for any blocked join. However, in practice, the
564      * vast majority of blockages are transient byproducts of GC and
565      * other JVM or OS activities that are made worse by replacement
566      * when they cause longer-term oversubscription.  Rather than
567      * impose arbitrary policies, we allow users to override the
568      * default of only adding threads upon apparent starvation.  The
569      * compensation mechanism may also be bounded.  Bounds for the
570      * commonPool (see COMMON_MAX_SPARES) better enable JVMs to cope
571      * with programming errors and abuse before running out of
572      * resources to do so.
573      *
574      * Common Pool
575      * ===========
576      *
577      * The static common pool always exists after static
578      * initialization.  Since it (or any other created pool) need
579      * never be used, we minimize initial construction overhead and
580      * footprint to the setup of about a dozen fields.
581      *
582      * When external threads submit to the common pool, they can
583      * perform subtask processing (see externalHelpComplete and
584      * related methods) upon joins.  This caller-helps policy makes it
585      * sensible to set common pool parallelism level to one (or more)
586      * less than the total number of available cores, or even zero for
587      * pure caller-runs.  We do not need to record whether external
588      * submissions are to the common pool -- if not, external help
589      * methods return quickly. These submitters would otherwise be
590      * blocked waiting for completion, so the extra effort (with
591      * liberally sprinkled task status checks) in inapplicable cases
592      * amounts to an odd form of limited spin-wait before blocking in
593      * ForkJoinTask.join.
594      *
595      * As a more appropriate default in managed environments, unless
596      * overridden by system properties, we use workers of subclass
597      * InnocuousForkJoinWorkerThread when there is a SecurityManager
598      * present. These workers have no permissions set, do not belong
599      * to any user-defined ThreadGroup, and erase all ThreadLocals
     * after executing any top-level task (see
     * ForkJoinWorkerThread.afterTopLevelExec).  The associated
     * mechanics (mainly in ForkJoinWorkerThread) may be JVM-dependent
     * and must access particular Thread class fields to achieve this
     * effect.
604      *
605      * Memory placement
606      * ================
607      *
608      * Performance can be very sensitive to placement of instances of
609      * ForkJoinPool and WorkQueues and their queue arrays. To reduce
610      * false-sharing impact, the @Contended annotation isolates
611      * adjacent WorkQueue instances, as well as the ForkJoinPool.ctl
612      * field. WorkQueue arrays are allocated (by their threads) with
613      * larger initial sizes than most ever need, mostly to reduce
614      * false sharing with current garbage collectors that use cardmark
615      * tables.
616      *
617      * Style notes
618      * ===========
619      *
620      * Memory ordering relies mainly on VarHandles.  This can be
621      * awkward and ugly, but also reflects the need to control
622      * outcomes across the unusual cases that arise in very racy code
623      * with very few invariants. All fields are read into locals
624      * before use, and null-checked if they are references.  Array
625      * accesses using masked indices include checks (that are always
626      * true) that the array length is non-zero to avoid compilers
627      * inserting more expensive traps.  This is usually done in a
628      * "C"-like style of listing declarations at the heads of methods
629      * or blocks, and using inline assignments on first encounter.
630      * Nearly all explicit checks lead to bypass/return, not exception
631      * throws, because they may legitimately arise due to
632      * cancellation/revocation during shutdown.
633      *
634      * There is a lot of representation-level coupling among classes
635      * ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask.  The
636      * fields of WorkQueue maintain data structures managed by
637      * ForkJoinPool, so are directly accessed.  There is little point
638      * trying to reduce this, since any associated future changes in
639      * representations will need to be accompanied by algorithmic
640      * changes anyway. Several methods intrinsically sprawl because
641      * they must accumulate sets of consistent reads of fields held in
642      * local variables. Some others are artificially broken up to
643      * reduce producer/consumer imbalances due to dynamic compilation.
644      * There are also other coding oddities (including several
645      * unnecessary-looking hoisted null checks) that help some methods
646      * perform reasonably even when interpreted (not compiled).
647      *
648      * The order of declarations in this file is (with a few exceptions):
649      * (1) Static utility functions
650      * (2) Nested (static) classes
651      * (3) Static fields
652      * (4) Fields, along with constants used when unpacking some of them
653      * (5) Internal control methods
654      * (6) Callbacks and other support for ForkJoinTask methods
655      * (7) Exported methods
656      * (8) Static block initializing statics in minimally dependent order
657      */
658 
659     // Static utilities
660 
661     /**
662      * If there is a security manager, makes sure caller has
663      * permission to modify threads.
664      */
checkPermission()665     private static void checkPermission() {
666         SecurityManager security = System.getSecurityManager();
667         if (security != null)
668             security.checkPermission(modifyThreadPermission);
669     }
670 
671     // Nested classes
672 
673     /**
674      * Factory for creating new {@link ForkJoinWorkerThread}s.
675      * A {@code ForkJoinWorkerThreadFactory} must be defined and used
676      * for {@code ForkJoinWorkerThread} subclasses that extend base
677      * functionality or initialize threads with different contexts.
678      */
679     public static interface ForkJoinWorkerThreadFactory {
680         /**
681          * Returns a new worker thread operating in the given pool.
682          * Returning null or throwing an exception may result in tasks
683          * never being executed.  If this method throws an exception,
684          * it is relayed to the caller of the method (for example
685          * {@code execute}) causing attempted thread creation. If this
686          * method returns null or throws an exception, it is not
687          * retried until the next attempted creation (for example
688          * another call to {@code execute}).
689          *
690          * @param pool the pool this thread works in
691          * @return the new worker thread, or {@code null} if the request
692          *         to create a thread is rejected
693          * @throws NullPointerException if the pool is null
694          */
newThread(ForkJoinPool pool)695         public ForkJoinWorkerThread newThread(ForkJoinPool pool);
696     }
697 
contextWithPermissions(Permission .... perms)698     static AccessControlContext contextWithPermissions(Permission ... perms) {
699         Permissions permissions = new Permissions();
700         for (Permission perm : perms)
701             permissions.add(perm);
702         return new AccessControlContext(
703             new ProtectionDomain[] { new ProtectionDomain(null, permissions) });
704     }
705 
706     /**
707      * Default ForkJoinWorkerThreadFactory implementation; creates a
708      * new ForkJoinWorkerThread using the system class loader as the
709      * thread context class loader.
710      */
711     private static final class DefaultForkJoinWorkerThreadFactory
712         implements ForkJoinWorkerThreadFactory {
713         private static final AccessControlContext ACC = contextWithPermissions(
714             new RuntimePermission("getClassLoader"),
715             new RuntimePermission("setContextClassLoader"));
716 
newThread(ForkJoinPool pool)717         public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
718             return AccessController.doPrivileged(
719                 new PrivilegedAction<>() {
720                     public ForkJoinWorkerThread run() {
721                         return new ForkJoinWorkerThread(
722                             pool, ClassLoader.getSystemClassLoader()); }},
723                 ACC);
724         }
725     }
726 
    // Constants shared across ForkJoinPool and WorkQueue

    // Bounds
    static final int SWIDTH       = 16;            // width of short
    static final int SMASK        = 0xffff;        // short bits == max index
    static final int MAX_CAP      = 0x7fff;        // max #workers - 1
    // 0x7e == 126: masking with it yields an even value in [0, 126],
    // i.e. one of 64 even slots.
    static final int SQMASK       = 0x007e;        // max 64 (even) slots

    // Masks and units for WorkQueue.phase and ctl sp subfield
    static final int UNSIGNALLED  = 1 << 31;       // must be negative
    static final int SS_SEQ       = 1 << 16;       // version count
    // Same value that WorkQueue.tryLockPhase CASes into phase.
    static final int QLOCK        = 1;             // must be 1

    // Mode bits and sentinels, some also used in WorkQueue id and source fields
    static final int OWNED        = 1;             // queue has owner thread
    static final int FIFO         = 1 << 16;       // fifo queue or access mode
    static final int SHUTDOWN     = 1 << 18;
    static final int TERMINATED   = 1 << 19;
    static final int STOP         = 1 << 31;       // must be negative
    static final int QUIET        = 1 << 30;       // not scanning or working
    // Both QUIET and the sign bit (UNSIGNALLED) set.
    static final int DORMANT      = QUIET | UNSIGNALLED;

    /**
     * Initial capacity of work-stealing queue array (8192 slots).
     * Must be a power of two, at least 2.
     *
     * NOTE(review): WorkQueue.push only grows the array in its
     * {@code else if (d == m)} branch, which is skipped when the
     * pre-push size d is 0 or 1 and the pool is non-null.  With a
     * capacity of 2 (m == 1) the array would therefore never grow, so
     * values below 4 look unsafe -- confirm before lowering this.
     */
    static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

    /**
     * Maximum capacity for queue arrays. Must be a power of two less
     * than or equal to 1 << (31 - width of array entry) to ensure
     * lack of wraparound of index calculations, but defined to a
     * value a bit less than this to help users trap runaway programs
     * before saturating systems.  WorkQueue.growArray throws
     * RejectedExecutionException rather than exceed it.
     */
    static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

    /**
     * The maximum number of top-level polls per worker before
     * checking other queues, expressed as a bit shift to, in effect,
     * multiply by pool size, and then use as random value mask, so
     * average bound is about poolSize*(1<<TOP_BOUND_SHIFT).  See
     * above for rationale.
     */
    static final int TOP_BOUND_SHIFT = 10;
772 
    /**
     * Queues supporting work-stealing as well as external task
     * submission. See above for descriptions and algorithms.
     *
     * <p>Representation: tasks are held in {@code array}, whose length
     * is a power of two.  A logical index i maps to slot
     * {@code i & (array.length - 1)}.  {@code top} is the index for the
     * next push and {@code base} the index for the next FIFO take, so
     * {@code top - base} approximates the size.  A queue whose
     * {@code owner} is null is shared.  Mutations of shared queues are
     * guarded by using {@code phase} as a lock (0 = unlocked,
     * 1 = locked; see {@link #tryLockPhase}).  Array slots are accessed
     * through {@code QA}, a VarHandle not declared in this class
     * (presumably in the enclosing class).
     */
    @jdk.internal.vm.annotation.Contended
    static final class WorkQueue {
        volatile int source;       // source queue id, or sentinel
        int id;                    // pool index, mode, tag
        int base;                  // index of next slot for poll
        int top;                   // index of next slot for push
        volatile int phase;        // versioned, negative: queued, 1: locked
        int stackPred;             // pool stack (ctl) predecessor link
        int nsteals;               // number of steals
        ForkJoinTask<?>[] array;   // the queued tasks; power of 2 size
        final ForkJoinPool pool;   // the containing pool (may be null)
        final ForkJoinWorkerThread owner; // owning thread or null if shared

        /**
         * Creates a queue that does not yet have a task array.
         *
         * @param pool the containing pool (may be null)
         * @param owner the owning worker thread, or null for a shared queue
         */
        WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
            this.pool = pool;
            this.owner = owner;
            // Place indices in the center of array (that is not yet allocated)
            base = top = INITIAL_QUEUE_CAPACITY >>> 1;
        }

        /**
         * Tries to lock shared queue by CASing phase field from 0
         * (unlocked) to 1 (locked).  Makes a single attempt; does not
         * spin or block.
         *
         * @return true if this call acquired the lock
         */
        final boolean tryLockPhase() {
            return PHASE.compareAndSet(this, 0, 1);
        }

        /**
         * Unlocks a queue locked by {@link #tryLockPhase} by storing 0
         * to phase with release ordering.
         */
        final void releasePhaseLock() {
            PHASE.setRelease(this, 0);
        }

        /**
         * Returns an exportable index (used by ForkJoinWorkerThread).
         */
        final int getPoolIndex() {
            return (id & 0xffff) >>> 1; // ignore odd/even tag bit
        }

        /**
         * Returns the approximate number of tasks in the queue:
         * {@code top - base}, or 0 if that is not positive.
         */
        final int queueSize() {
            int n = (int)BASE.getAcquire(this) - top;
            return (n >= 0) ? 0 : -n; // ignore transient negative
        }

        /**
         * Provides a more accurate estimate of whether this queue has
         * any tasks than does queueSize, by checking whether a
         * near-empty queue has at least one unclaimed task.
         */
        final boolean isEmpty() {
            ForkJoinTask<?>[] a; int n, cap, b;
            VarHandle.acquireFence(); // needed by external callers
            // n = base - top.  n >= 0: no tasks.  n == -1: one apparent
            // task, reported as empty if there is no usable array or the
            // slot at base is already null (the task was taken).
            return ((n = (b = base) - top) >= 0 || // possibly one task
                    (n == -1 && ((a = array) == null ||
                                 (cap = a.length) == 0 ||
                                 a[(cap - 1) & b] == null)));
        }

        /**
         * Pushes a task. Call only by owner in unshared queues.
         * If the queue held 0 or 1 tasks before this push (and the pool
         * is non-null), signals the pool.  Otherwise, if this push
         * filled the array, doubles it.
         * Does nothing if there is no array.
         *
         * @param task the task. Caller must ensure non-null.
         * @throws RejectedExecutionException if array cannot be resized
         */
        final void push(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] a;
            int s = top, d, cap, m;
            ForkJoinPool p = pool;
            if ((a = array) != null && (cap = a.length) > 0) {
                QA.setRelease(a, (m = cap - 1) & s, task);
                top = s + 1;
                // d = number of tasks before this push
                if (((d = s - (int)BASE.getAcquire(this)) & ~1) == 0 &&
                    p != null) {                 // size 0 or 1
                    VarHandle.fullFence();
                    p.signalWork();
                }
                else if (d == m)                 // array is now full
                    growArray(false);
            }
        }

        /**
         * Version of push for shared queues. Call only with phase lock held.
         * When array is non-null, the lock is released before returning:
         * by growArray(true) if this push filled the array, otherwise by
         * the volatile write {@code phase = 0} here.
         * NOTE(review): if array is null or empty the lock is left held.
         *
         * @return true if should signal work (the queue, re-read after
         *         unlocking, held 0 or 1 tasks before this push); false
         *         if the array was grown
         */
        final boolean lockedPush(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] a;
            boolean signal = false;
            int s = top, b = base, cap, d;
            if ((a = array) != null && (cap = a.length) > 0) {
                a[(cap - 1) & s] = task;
                top = s + 1;
                // s - b == cap - 1: this push filled the array
                if (b - s + cap - 1 == 0)
                    growArray(true);
                else {
                    phase = 0; // full volatile unlock
                    if (((s - base) & ~1) == 0) // size 0 or 1
                        signal = true;
                }
            }
            return signal;
        }

        /**
         * Doubles the capacity of array. Call either by owner or with
         * lock held -- it is OK for base, but not top, to move while
         * resizings are in progress.
         *
         * <p>Tasks are transferred top-down: each old slot is cleared
         * with getAndSet, and the transfer stops at the first empty slot.
         * If {@code locked}, the phase lock is released (phase = 0) in a
         * finally clause, whether or not the resize succeeds.
         *
         * @param locked true if the caller holds the phase lock
         * @throws RejectedExecutionException if there is no array to
         *         grow, if doubling would exceed MAXIMUM_QUEUE_CAPACITY
         *         (or overflow), or if allocation fails with
         *         OutOfMemoryError
         */
        final void growArray(boolean locked) {
            ForkJoinTask<?>[] newA = null;
            try {
                ForkJoinTask<?>[] oldA; int oldSize, newSize;
                if ((oldA = array) != null && (oldSize = oldA.length) > 0 &&
                    (newSize = oldSize << 1) <= MAXIMUM_QUEUE_CAPACITY &&
                    newSize > 0) {
                    try {
                        newA = new ForkJoinTask<?>[newSize];
                    } catch (OutOfMemoryError ex) {
                        // deliberately swallowed: newA stays null, which is
                        // reported as RejectedExecutionException below
                    }
                    if (newA != null) { // poll from old array, push to new
                        int oldMask = oldSize - 1, newMask = newSize - 1;
                        for (int s = top - 1, k = oldMask; k >= 0; --k) {
                            ForkJoinTask<?> x = (ForkJoinTask<?>)
                                QA.getAndSet(oldA, s & oldMask, null);
                            if (x != null)
                                newA[s-- & newMask] = x;
                            else // remaining lower slots empty or already taken
                                break;
                        }
                        array = newA;
                        VarHandle.releaseFence();
                    }
                }
            } finally {
                if (locked)
                    phase = 0;
            }
            if (newA == null)
                throw new RejectedExecutionException("Queue capacity exceeded");
        }

        /**
         * Takes next task, if one exists, in FIFO order.
         * A task is claimed by CASing its slot to null, so this may race
         * with other takers; base is advanced only by the winner.
         *
         * @return the task taken from base, or null if the queue
         *         appears empty
         */
        final ForkJoinTask<?> poll() {
            int b, k, cap; ForkJoinTask<?>[] a;
            while ((a = array) != null && (cap = a.length) > 0 &&
                   top - (b = base) > 0) {
                ForkJoinTask<?> t = (ForkJoinTask<?>)
                    QA.getAcquire(a, k = (cap - 1) & b);
                if (base == b++) {   // base unchanged since read; b is now next base
                    if (t == null)
                        Thread.yield(); // await index advance
                    else if (QA.compareAndSet(a, k, t, null)) {
                        BASE.setOpaque(this, b);
                        return t;
                    }
                }
            }
            return null;
        }

        /**
         * Takes next task, if one exists, in order specified by mode.
         * In LIFO mode (FIFO bit of id clear), or when exactly one task
         * remains, the task is popped from top.  Otherwise it is taken
         * from base, falling back to {@link #poll} if that slot was
         * already empty.
         */
        final ForkJoinTask<?> nextLocalTask() {
            ForkJoinTask<?> t = null;
            int md = id, b, s, d, cap; ForkJoinTask<?>[] a;
            if ((a = array) != null && (cap = a.length) > 0 &&
                (d = (s = top) - (b = base)) > 0) {
                if ((md & FIFO) == 0 || d == 1) {
                    if ((t = (ForkJoinTask<?>)
                         QA.getAndSet(a, (cap - 1) & --s, null)) != null)
                        TOP.setOpaque(this, s);
                }
                else if ((t = (ForkJoinTask<?>)
                          QA.getAndSet(a, (cap - 1) & b++, null)) != null) {
                    BASE.setOpaque(this, b);
                }
                else // on contention in FIFO mode, use regular poll
                    t = poll();
            }
            return t;
        }

        /**
         * Returns, without removing it, the next task in the order
         * specified by mode: the slot at base in FIFO mode, otherwise
         * the slot at top - 1.  Does not check that the queue is
         * non-empty, so the result may be null.
         */
        final ForkJoinTask<?> peek() {
            int cap; ForkJoinTask<?>[] a;
            return ((a = array) != null && (cap = a.length) > 0) ?
                a[(cap - 1) & ((id & FIFO) != 0 ? base : top - 1)] : null;
        }

        /**
         * Pops the given task only if it is at the current top.
         * Takes no lock (unlike tryLockedUnpush); presumably for owner
         * use on unshared queues -- confirm at call sites.
         *
         * @return true if the task was removed
         */
        final boolean tryUnpush(ForkJoinTask<?> task) {
            boolean popped = false;
            int s, cap; ForkJoinTask<?>[] a;
            if ((a = array) != null && (cap = a.length) > 0 &&
                (s = top) != base &&
                (popped = QA.compareAndSet(a, (cap - 1) & --s, task, null)))
                TOP.setOpaque(this, s);
            return popped;
        }

        /**
         * Shared version of tryUnpush.  Checks the top slot before
         * locking, then re-checks top and array under the phase lock
         * before removing the task.  Gives up (returns false) if the
         * lock is not immediately available.
         *
         * @return true if the task was removed
         */
        final boolean tryLockedUnpush(ForkJoinTask<?> task) {
            boolean popped = false;
            int s = top - 1, k, cap; ForkJoinTask<?>[] a;
            if ((a = array) != null && (cap = a.length) > 0 &&
                a[k = (cap - 1) & s] == task && tryLockPhase()) {
                if (top == s + 1 && array == a &&
                    (popped = QA.compareAndSet(a, k, task, null)))
                    top = s;
                releasePhaseLock();
            }
            return popped;
        }

        /**
         * Removes and cancels all known tasks, ignoring any exceptions.
         * Tasks are removed via poll (FIFO order) until it returns null.
         */
        final void cancelAll() {
            for (ForkJoinTask<?> t; (t = poll()) != null; )
                ForkJoinTask.cancelIgnoringExceptions(t);
        }

        // Specialized execution methods

        /**
         * Runs the given (stolen) task if nonnull, as well as
         * remaining local tasks and others available from the given
         * queue, up to bound n (to avoid infinite unfairness).
         * t itself is always run.  Because the bound is checked after
         * each run with {@code n-- < 0}, at most n + 1 further tasks
         * are run (none if n is negative).  Afterwards, adds the number
         * of tasks obtained from q (counting t) to nsteals, clears
         * source, and calls the owner's afterTopLevelExec if there is
         * an owner.
         *
         * @param t the first task to run; nothing is done if null
         * @param q queue to poll once local tasks run out; nothing is
         *          done if null
         * @param n bound on further runs
         */
        final void topLevelExec(ForkJoinTask<?> t, WorkQueue q, int n) {
            if (t != null && q != null) { // hoist checks
                int nstolen = 1;
                for (;;) {
                    t.doExec();
                    if (n-- < 0)
                        break;
                    else if ((t = nextLocalTask()) == null) {
                        if ((t = q.poll()) == null)
                            break;
                        else
                            ++nstolen;
                    }
                }
                ForkJoinWorkerThread thread = owner;
                nsteals += nstolen;
                source = 0;
                if (thread != null)
                    thread.afterTopLevelExec();
            }
        }

        /**
         * If present, removes task from queue and executes it.
         * Scans downward from top - 1 until it finds the task or an
         * empty slot.  After a successful CAS, it shifts the tasks above
         * the removed one down by one slot, decrements top, and runs the
         * task in the calling thread.  Takes no lock.
         * NOTE: termination of the scan relies on the array holding at
         * least one empty slot, which push/lockedPush normally maintain
         * by growing a full array.
         */
        final void tryRemoveAndExec(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] a; int s, cap;
            if ((a = array) != null && (cap = a.length) > 0 &&
                (s = top) - base > 0) { // traverse from top
                for (int m = cap - 1, ns = s - 1, i = ns; ; --i) {
                    int index = i & m;
                    ForkJoinTask<?> t = (ForkJoinTask<?>)QA.get(a, index);
                    if (t == null)
                        break;
                    else if (t == task) {
                        if (QA.compareAndSet(a, index, t, null)) {
                            top = ns;   // safely shift down
                            for (int j = i; j != ns; ++j) {
                                ForkJoinTask<?> f;
                                int pindex = (j + 1) & m;
                                f = (ForkJoinTask<?>)QA.get(a, pindex);
                                QA.setVolatile(a, pindex, null);
                                int jindex = j & m;
                                QA.setRelease(a, jindex, f);
                            }
                            VarHandle.releaseFence();
                            t.doExec();
                        }
                        break;
                    }
                }
            }
        }

        /**
         * Tries to pop and run tasks within the target's computation
         * until done, not found, or limit exceeded.
         * Each iteration examines only the task at top.  It is taken
         * only if it is a CountedCompleter with {@code task} on its
         * completer chain; shared queues take it under the phase lock,
         * giving up if the lock is unavailable.
         *
         * @param task root of CountedCompleter computation
         * @param limit max runs, or zero for no limit
         * @param shared true if must lock to extract task
         * @return task status on exit (0 if task is null)
         */
        final int helpCC(CountedCompleter<?> task, int limit, boolean shared) {
            int status = 0;
            if (task != null && (status = task.status) >= 0) {
                int s, k, cap; ForkJoinTask<?>[] a;
                while ((a = array) != null && (cap = a.length) > 0 &&
                       (s = top) - base > 0) {
                    CountedCompleter<?> v = null;
                    ForkJoinTask<?> o = a[k = (cap - 1) & (s - 1)];
                    if (o instanceof CountedCompleter) {
                        CountedCompleter<?> t = (CountedCompleter<?>)o;
                        // walk t's completer chain looking for task
                        for (CountedCompleter<?> f = t;;) {
                            if (f != task) {
                                if ((f = f.completer) == null)
                                    break;
                            }
                            else if (shared) {
                                if (tryLockPhase()) {
                                    if (top == s && array == a &&
                                        QA.compareAndSet(a, k, t, null)) {
                                        top = s - 1;
                                        v = t;
                                    }
                                    releasePhaseLock();
                                }
                                break;
                            }
                            else {
                                if (QA.compareAndSet(a, k, t, null)) {
                                    top = s - 1;
                                    v = t;
                                }
                                break;
                            }
                        }
                    }
                    if (v != null)
                        v.doExec();
                    if ((status = task.status) < 0 || v == null ||
                        (limit != 0 && --limit == 0))
                        break;
                }
            }
            return status;
        }

        /**
         * Tries to poll and run AsynchronousCompletionTasks until
         * none found or blocker is released.  Stops at the first task
         * found at base that is not a
         * CompletableFuture.AsynchronousCompletionTask.
         *
         * @param blocker the blocker; nothing is done if null
         */
        final void helpAsyncBlocker(ManagedBlocker blocker) {
            if (blocker != null) {
                int b, k, cap; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
                while ((a = array) != null && (cap = a.length) > 0 &&
                       top - (b = base) > 0) {
                    t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b);
                    if (blocker.isReleasable())
                        break;
                    else if (base == b++ && t != null) {
                        if (!(t instanceof CompletableFuture.
                              AsynchronousCompletionTask))
                            break;
                        else if (QA.compareAndSet(a, k, t, null)) {
                            BASE.setOpaque(this, b);
                            t.doExec();
                        }
                    }
                }
            }
        }

        /**
         * Returns true if owned and not known to be blocked, that is,
         * the queue has an owner thread whose state is not BLOCKED,
         * WAITING, or TIMED_WAITING.  The result is only a snapshot of
         * the thread state.
         */
        final boolean isApparentlyUnblocked() {
            Thread wt; Thread.State s;
            return ((wt = owner) != null &&
                    (s = wt.getState()) != Thread.State.BLOCKED &&
                    s != Thread.State.WAITING &&
                    s != Thread.State.TIMED_WAITING);
        }

        // VarHandle mechanics for the phase, base, and top fields.
        static final VarHandle PHASE;
        static final VarHandle BASE;
        static final VarHandle TOP;
        static {
            try {
                MethodHandles.Lookup l = MethodHandles.lookup();
                PHASE = l.findVarHandle(WorkQueue.class, "phase", int.class);
                BASE = l.findVarHandle(WorkQueue.class, "base", int.class);
                TOP = l.findVarHandle(WorkQueue.class, "top", int.class);
            } catch (ReflectiveOperationException e) {
                throw new ExceptionInInitializerError(e);
            }
        }
    }
1178 
    // static fields (initialized in static initializer below)

    /**
     * The factory used to create new ForkJoinWorkerThreads unless a
     * different one is supplied to a ForkJoinPool constructor.
     */
    public static final ForkJoinWorkerThreadFactory
        defaultForkJoinWorkerThreadFactory;

    /**
     * Permission required for callers of methods that may start or
     * kill threads.  Checked by checkPermission when a security
     * manager is installed.
     */
    static final RuntimePermission modifyThreadPermission;

    /**
     * Common (static) pool. Non-null for public use unless static
     * construction fails with an exception.  Internal usages
     * nevertheless null-check it on use, to paranoically avoid
     * potential initialization circularities as well as to simplify
     * generated code.
     */
    static final ForkJoinPool common;

    /**
     * Common pool parallelism. To allow simpler use and management
     * when common pool threads are disabled, we allow the underlying
     * common.parallelism field to be zero, but in that case still report
     * parallelism as 1 to reflect resulting caller-runs mechanics.
     */
    static final int COMMON_PARALLELISM;

    /**
     * Limit on spare thread construction in tryCompensate (default
     * DEFAULT_COMMON_MAX_SPARES).
     */
    private static final int COMMON_MAX_SPARES;

    /**
     * Sequence number for creating workerNamePrefix.  Incremented by
     * the static synchronized nextPoolId, so updates there are guarded
     * by the ForkJoinPool class monitor.
     */
    private static int poolNumberSequence;
1219 
1220     /**
1221      * Returns the next sequence number. We don't expect this to
1222      * ever contend, so use simple builtin sync.
1223      */
1224     private static final synchronized int nextPoolId() {
1225         return ++poolNumberSequence;
1226     }
1227 
    // static configuration constants

    /**
     * Default idle timeout value (in milliseconds) for the thread
     * triggering quiescence to park waiting for new work.
     */
    private static final long DEFAULT_KEEPALIVE = 60_000L;

    /**
     * Undershoot tolerance for idle timeouts (presumably in
     * milliseconds, like DEFAULT_KEEPALIVE -- confirm at use sites).
     */
    private static final long TIMEOUT_SLOP = 20L;

    /**
     * The default value for COMMON_MAX_SPARES.  Overridable using the
     * "java.util.concurrent.ForkJoinPool.common.maximumSpares" system
     * property.  The default value is far in excess of normal
     * requirements, but also far short of MAX_CAP and typical OS
     * thread limits, so allows JVMs to catch misuse/abuse before
     * running out of resources needed to do so.
     */
    private static final int DEFAULT_COMMON_MAX_SPARES = 256;

    /**
     * Increment for seed generators. See class ThreadLocal for
     * explanation.  Note: as an int this literal is negative
     * (-1640531527); only its bit pattern matters for the increment.
     */
    private static final int SEED_INCREMENT = 0x9e3779b9;
1256 
1257     /*
1258      * Bits and masks for field ctl, packed with 4 16 bit subfields:
1259      * RC: Number of released (unqueued) workers minus target parallelism
1260      * TC: Number of total workers minus target parallelism
1261      * SS: version count and status of top waiting thread
1262      * ID: poolIndex of top of Treiber stack of waiters
1263      *
1264      * When convenient, we can extract the lower 32 stack top bits
1265      * (including version bits) as sp=(int)ctl.  The offsets of counts
1266      * by the target parallelism and the positionings of fields makes
1267      * it possible to perform the most common checks via sign tests of
1268      * fields: When ac is negative, there are not enough unqueued
1269      * workers, when tc is negative, there are not enough total
1270      * workers.  When sp is non-zero, there are waiting workers.  To
1271      * deal with possibly negative fields, we use casts in and out of
1272      * "short" and/or signed shifts to maintain signedness.
1273      *
1274      * Because it occupies uppermost bits, we can add one release count
1275      * using getAndAddLong of RC_UNIT, rather than CAS, when returning
1276      * from a blocked join.  Other updates entail multiple subfields
1277      * and masking, requiring CAS.
1278      *
1279      * The limits packed in field "bounds" are also offset by the
1280      * parallelism level to make them comparable to the ctl rc and tc
1281      * fields.
1282      */
1283 
1284     // Lower and upper word masks
1285     private static final long SP_MASK    = 0xffffffffL;
1286     private static final long UC_MASK    = ~SP_MASK;
1287 
1288     // Release counts
1289     private static final int  RC_SHIFT   = 48;
1290     private static final long RC_UNIT    = 0x0001L << RC_SHIFT;
1291     private static final long RC_MASK    = 0xffffL << RC_SHIFT;
1292 
1293     // Total counts
1294     private static final int  TC_SHIFT   = 32;
1295     private static final long TC_UNIT    = 0x0001L << TC_SHIFT;
1296     private static final long TC_MASK    = 0xffffL << TC_SHIFT;
1297     private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign
1298 
1299     // Instance fields
1300 
1301     volatile long stealCount;            // collects worker nsteals
1302     final long keepAlive;                // milliseconds before dropping if idle
1303     int indexSeed;                       // next worker index
1304     final int bounds;                    // min, max threads packed as shorts
1305     volatile int mode;                   // parallelism, runstate, queue mode
1306     WorkQueue[] workQueues;              // main registry
1307     final String workerNamePrefix;       // for worker thread string; sync lock
1308     final ForkJoinWorkerThreadFactory factory;
1309     final UncaughtExceptionHandler ueh;  // per-worker UEH
1310     final Predicate<? super ForkJoinPool> saturate;
1311 
1312     @jdk.internal.vm.annotation.Contended("fjpctl") // segregate
1313     volatile long ctl;                   // main pool control
1314 
1315     // Creating, registering and deregistering workers
1316 
1317     /**
1318      * Tries to construct and start one worker. Assumes that total
1319      * count has already been incremented as a reservation.  Invokes
1320      * deregisterWorker on any failure.
1321      *
1322      * @return true if successful
1323      */
1324     private boolean createWorker() {
1325         ForkJoinWorkerThreadFactory fac = factory;
1326         Throwable ex = null;
1327         ForkJoinWorkerThread wt = null;
1328         try {
1329             if (fac != null && (wt = fac.newThread(this)) != null) {
1330                 wt.start();
1331                 return true;
1332             }
1333         } catch (Throwable rex) {
1334             ex = rex;
1335         }
1336         deregisterWorker(wt, ex);
1337         return false;
1338     }
1339 
1340     /**
1341      * Tries to add one worker, incrementing ctl counts before doing
1342      * so, relying on createWorker to back out on failure.
1343      *
1344      * @param c incoming ctl value, with total count negative and no
1345      * idle workers.  On CAS failure, c is refreshed and retried if
1346      * this holds (otherwise, a new worker is not needed).
1347      */
1348     private void tryAddWorker(long c) {
1349         do {
1350             long nc = ((RC_MASK & (c + RC_UNIT)) |
1351                        (TC_MASK & (c + TC_UNIT)));
1352             if (ctl == c && CTL.compareAndSet(this, c, nc)) {
1353                 createWorker();
1354                 break;
1355             }
1356         } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
1357     }
1358 
1359     /**
1360      * Callback from ForkJoinWorkerThread constructor to establish and
1361      * record its WorkQueue.
1362      *
1363      * @param wt the worker thread
1364      * @return the worker's queue
1365      */
1366     final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
1367         UncaughtExceptionHandler handler;
1368         wt.setDaemon(true);                             // configure thread
1369         if ((handler = ueh) != null)
1370             wt.setUncaughtExceptionHandler(handler);
1371         int tid = 0;                                    // for thread name
1372         int idbits = mode & FIFO;
1373         String prefix = workerNamePrefix;
1374         WorkQueue w = new WorkQueue(this, wt);
1375         if (prefix != null) {
1376             synchronized (prefix) {
1377                 WorkQueue[] ws = workQueues; int n;
1378                 int s = indexSeed += SEED_INCREMENT;
1379                 idbits |= (s & ~(SMASK | FIFO | DORMANT));
1380                 if (ws != null && (n = ws.length) > 1) {
1381                     int m = n - 1;
1382                     tid = m & ((s << 1) | 1);           // odd-numbered indices
1383                     for (int probes = n >>> 1;;) {      // find empty slot
1384                         WorkQueue q;
1385                         if ((q = ws[tid]) == null || q.phase == QUIET)
1386                             break;
1387                         else if (--probes == 0) {
1388                             tid = n | 1;                // resize below
1389                             break;
1390                         }
1391                         else
1392                             tid = (tid + 2) & m;
1393                     }
1394                     w.phase = w.id = tid | idbits;      // now publishable
1395 
1396                     if (tid < n)
1397                         ws[tid] = w;
1398                     else {                              // expand array
1399                         int an = n << 1;
1400                         WorkQueue[] as = new WorkQueue[an];
1401                         as[tid] = w;
1402                         int am = an - 1;
1403                         for (int j = 0; j < n; ++j) {
1404                             WorkQueue v;                // copy external queue
1405                             if ((v = ws[j]) != null)    // position may change
1406                                 as[v.id & am & SQMASK] = v;
1407                             if (++j >= n)
1408                                 break;
1409                             as[j] = ws[j];              // copy worker
1410                         }
1411                         workQueues = as;
1412                     }
1413                 }
1414             }
1415             wt.setName(prefix.concat(Integer.toString(tid)));
1416         }
1417         return w;
1418     }
1419 
1420     /**
1421      * Final callback from terminating worker, as well as upon failure
1422      * to construct or start a worker.  Removes record of worker from
1423      * array, and adjusts counts. If pool is shutting down, tries to
1424      * complete termination.
1425      *
1426      * @param wt the worker thread, or null if construction failed
1427      * @param ex the exception causing failure, or null if none
1428      */
1429     final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
1430         WorkQueue w = null;
1431         int phase = 0;
1432         if (wt != null && (w = wt.workQueue) != null) {
1433             Object lock = workerNamePrefix;
1434             int wid = w.id;
1435             long ns = (long)w.nsteals & 0xffffffffL;
1436             if (lock != null) {
1437                 synchronized (lock) {
1438                     WorkQueue[] ws; int n, i;         // remove index from array
1439                     if ((ws = workQueues) != null && (n = ws.length) > 0 &&
1440                         ws[i = wid & (n - 1)] == w)
1441                         ws[i] = null;
1442                     stealCount += ns;
1443                 }
1444             }
1445             phase = w.phase;
1446         }
1447         if (phase != QUIET) {                         // else pre-adjusted
1448             long c;                                   // decrement counts
1449             do {} while (!CTL.weakCompareAndSet
1450                          (this, c = ctl, ((RC_MASK & (c - RC_UNIT)) |
1451                                           (TC_MASK & (c - TC_UNIT)) |
1452                                           (SP_MASK & c))));
1453         }
1454         if (w != null)
1455             w.cancelAll();                            // cancel remaining tasks
1456 
1457         if (!tryTerminate(false, false) &&            // possibly replace worker
1458             w != null && w.array != null)             // avoid repeated failures
1459             signalWork();
1460 
1461         if (ex == null)                               // help clean on way out
1462             ForkJoinTask.helpExpungeStaleExceptions();
1463         else                                          // rethrow
1464             ForkJoinTask.rethrow(ex);
1465     }
1466 
1467     /**
1468      * Tries to create or release a worker if too few are running.
1469      */
1470     final void signalWork() {
1471         for (;;) {
1472             long c; int sp; WorkQueue[] ws; int i; WorkQueue v;
1473             if ((c = ctl) >= 0L)                      // enough workers
1474                 break;
1475             else if ((sp = (int)c) == 0) {            // no idle workers
1476                 if ((c & ADD_WORKER) != 0L)           // too few workers
1477                     tryAddWorker(c);
1478                 break;
1479             }
1480             else if ((ws = workQueues) == null)
1481                 break;                                // unstarted/terminated
1482             else if (ws.length <= (i = sp & SMASK))
1483                 break;                                // terminated
1484             else if ((v = ws[i]) == null)
1485                 break;                                // terminating
1486             else {
1487                 int np = sp & ~UNSIGNALLED;
1488                 int vp = v.phase;
1489                 long nc = (v.stackPred & SP_MASK) | (UC_MASK & (c + RC_UNIT));
1490                 Thread vt = v.owner;
1491                 if (sp == vp && CTL.compareAndSet(this, c, nc)) {
1492                     v.phase = np;
1493                     if (vt != null && v.source < 0)
1494                         LockSupport.unpark(vt);
1495                     break;
1496                 }
1497             }
1498         }
1499     }
1500 
1501     /**
1502      * Tries to decrement counts (sometimes implicitly) and possibly
1503      * arrange for a compensating worker in preparation for blocking:
1504      * If not all core workers yet exist, creates one, else if any are
1505      * unreleased (possibly including caller) releases one, else if
1506      * fewer than the minimum allowed number of workers running,
1507      * checks to see that they are all active, and if so creates an
1508      * extra worker unless over maximum limit and policy is to
1509      * saturate.  Most of these steps can fail due to interference, in
1510      * which case 0 is returned so caller will retry. A negative
1511      * return value indicates that the caller doesn't need to
1512      * re-adjust counts when later unblocked.
1513      *
1514      * @return 1: block then adjust, -1: block without adjust, 0 : retry
1515      */
1516     private int tryCompensate(WorkQueue w) {
1517         int t, n, sp;
1518         long c = ctl;
1519         WorkQueue[] ws = workQueues;
1520         if ((t = (short)(c >>> TC_SHIFT)) >= 0) {
1521             if (ws == null || (n = ws.length) <= 0 || w == null)
1522                 return 0;                        // disabled
1523             else if ((sp = (int)c) != 0) {       // replace or release
1524                 WorkQueue v = ws[sp & (n - 1)];
1525                 int wp = w.phase;
1526                 long uc = UC_MASK & ((wp < 0) ? c + RC_UNIT : c);
1527                 int np = sp & ~UNSIGNALLED;
1528                 if (v != null) {
1529                     int vp = v.phase;
1530                     Thread vt = v.owner;
1531                     long nc = ((long)v.stackPred & SP_MASK) | uc;
1532                     if (vp == sp && CTL.compareAndSet(this, c, nc)) {
1533                         v.phase = np;
1534                         if (vt != null && v.source < 0)
1535                             LockSupport.unpark(vt);
1536                         return (wp < 0) ? -1 : 1;
1537                     }
1538                 }
1539                 return 0;
1540             }
1541             else if ((int)(c >> RC_SHIFT) -      // reduce parallelism
1542                      (short)(bounds & SMASK) > 0) {
1543                 long nc = ((RC_MASK & (c - RC_UNIT)) | (~RC_MASK & c));
1544                 return CTL.compareAndSet(this, c, nc) ? 1 : 0;
1545             }
1546             else {                               // validate
1547                 int md = mode, pc = md & SMASK, tc = pc + t, bc = 0;
1548                 boolean unstable = false;
1549                 for (int i = 1; i < n; i += 2) {
1550                     WorkQueue q; Thread wt; Thread.State ts;
1551                     if ((q = ws[i]) != null) {
1552                         if (q.source == 0) {
1553                             unstable = true;
1554                             break;
1555                         }
1556                         else {
1557                             --tc;
1558                             if ((wt = q.owner) != null &&
1559                                 ((ts = wt.getState()) == Thread.State.BLOCKED ||
1560                                  ts == Thread.State.WAITING))
1561                                 ++bc;            // worker is blocking
1562                         }
1563                     }
1564                 }
1565                 if (unstable || tc != 0 || ctl != c)
1566                     return 0;                    // inconsistent
1567                 else if (t + pc >= MAX_CAP || t >= (bounds >>> SWIDTH)) {
1568                     Predicate<? super ForkJoinPool> sat;
1569                     if ((sat = saturate) != null && sat.test(this))
1570                         return -1;
1571                     else if (bc < pc) {          // lagging
1572                         Thread.yield();          // for retry spins
1573                         return 0;
1574                     }
1575                     else
1576                         throw new RejectedExecutionException(
1577                             "Thread limit exceeded replacing blocked worker");
1578                 }
1579             }
1580         }
1581 
1582         long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK); // expand pool
1583         return CTL.compareAndSet(this, c, nc) && createWorker() ? 1 : 0;
1584     }
1585 
1586     /**
1587      * Top-level runloop for workers, called by ForkJoinWorkerThread.run.
1588      * See above for explanation.
1589      */
1590     final void runWorker(WorkQueue w) {
1591         int r = (w.id ^ ThreadLocalRandom.nextSecondarySeed()) | FIFO; // rng
1592         w.array = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; // initialize
1593         for (;;) {
1594             int phase;
1595             if (scan(w, r)) {                     // scan until apparently empty
1596                 r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // move (xorshift)
1597             }
1598             else if ((phase = w.phase) >= 0) {    // enqueue, then rescan
1599                 long np = (w.phase = (phase + SS_SEQ) | UNSIGNALLED) & SP_MASK;
1600                 long c, nc;
1601                 do {
1602                     w.stackPred = (int)(c = ctl);
1603                     nc = ((c - RC_UNIT) & UC_MASK) | np;
1604                 } while (!CTL.weakCompareAndSet(this, c, nc));
1605             }
1606             else {                                // already queued
1607                 int pred = w.stackPred;
1608                 Thread.interrupted();             // clear before park
1609                 w.source = DORMANT;               // enable signal
1610                 long c = ctl;
1611                 int md = mode, rc = (md & SMASK) + (int)(c >> RC_SHIFT);
1612                 if (md < 0)                       // terminating
1613                     break;
1614                 else if (rc <= 0 && (md & SHUTDOWN) != 0 &&
1615                          tryTerminate(false, false))
1616                     break;                        // quiescent shutdown
1617                 else if (rc <= 0 && pred != 0 && phase == (int)c) {
1618                     long nc = (UC_MASK & (c - TC_UNIT)) | (SP_MASK & pred);
1619                     long d = keepAlive + System.currentTimeMillis();
1620                     LockSupport.parkUntil(this, d);
1621                     if (ctl == c &&               // drop on timeout if all idle
1622                         d - System.currentTimeMillis() <= TIMEOUT_SLOP &&
1623                         CTL.compareAndSet(this, c, nc)) {
1624                         w.phase = QUIET;
1625                         break;
1626                     }
1627                 }
1628                 else if (w.phase < 0)
1629                     LockSupport.park(this);       // OK if spuriously woken
1630                 w.source = 0;                     // disable signal
1631             }
1632         }
1633     }
1634 
1635     /**
1636      * Scans for and if found executes one or more top-level tasks from a queue.
1637      *
1638      * @return true if found an apparently non-empty queue, and
1639      * possibly ran task(s).
1640      */
1641     private boolean scan(WorkQueue w, int r) {
1642         WorkQueue[] ws; int n;
1643         if ((ws = workQueues) != null && (n = ws.length) > 0 && w != null) {
1644             for (int m = n - 1, j = r & m;;) {
1645                 WorkQueue q; int b;
1646                 if ((q = ws[j]) != null && q.top != (b = q.base)) {
1647                     int qid = q.id;
1648                     ForkJoinTask<?>[] a; int cap, k; ForkJoinTask<?> t;
1649                     if ((a = q.array) != null && (cap = a.length) > 0) {
1650                         t = (ForkJoinTask<?>)QA.getAcquire(a, k = (cap - 1) & b);
1651                         if (q.base == b++ && t != null &&
1652                             QA.compareAndSet(a, k, t, null)) {
1653                             q.base = b;
1654                             w.source = qid;
1655                             if (q.top - b > 0)
1656                                 signalWork();
1657                             w.topLevelExec(t, q,  // random fairness bound
1658                                            r & ((n << TOP_BOUND_SHIFT) - 1));
1659                         }
1660                     }
1661                     return true;
1662                 }
1663                 else if (--n > 0)
1664                     j = (j + 1) & m;
1665                 else
1666                     break;
1667             }
1668         }
1669         return false;
1670     }
1671 
1672     /**
1673      * Helps and/or blocks until the given task is done or timeout.
1674      * First tries locally helping, then scans other queues for a task
1675      * produced by one of w's stealers; compensating and blocking if
1676      * none are found (rescanning if tryCompensate fails).
1677      *
1678      * @param w caller
1679      * @param task the task
1680      * @param deadline for timed waits, if nonzero
1681      * @return task status on exit
1682      */
1683     final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
1684         int s = 0;
1685         int seed = ThreadLocalRandom.nextSecondarySeed();
1686         if (w != null && task != null &&
1687             (!(task instanceof CountedCompleter) ||
1688              (s = w.helpCC((CountedCompleter<?>)task, 0, false)) >= 0)) {
1689             w.tryRemoveAndExec(task);
1690             int src = w.source, id = w.id;
1691             int r = (seed >>> 16) | 1, step = (seed & ~1) | 2;
1692             s = task.status;
1693             while (s >= 0) {
1694                 WorkQueue[] ws;
1695                 int n = (ws = workQueues) == null ? 0 : ws.length, m = n - 1;
1696                 while (n > 0) {
1697                     WorkQueue q; int b;
1698                     if ((q = ws[r & m]) != null && q.source == id &&
1699                         q.top != (b = q.base)) {
1700                         ForkJoinTask<?>[] a; int cap, k;
1701                         int qid = q.id;
1702                         if ((a = q.array) != null && (cap = a.length) > 0) {
1703                             ForkJoinTask<?> t = (ForkJoinTask<?>)
1704                                 QA.getAcquire(a, k = (cap - 1) & b);
1705                             if (q.source == id && q.base == b++ &&
1706                                 t != null && QA.compareAndSet(a, k, t, null)) {
1707                                 q.base = b;
1708                                 w.source = qid;
1709                                 t.doExec();
1710                                 w.source = src;
1711                             }
1712                         }
1713                         break;
1714                     }
1715                     else {
1716                         r += step;
1717                         --n;
1718                     }
1719                 }
1720                 if ((s = task.status) < 0)
1721                     break;
1722                 else if (n == 0) { // empty scan
1723                     long ms, ns; int block;
1724                     if (deadline == 0L)
1725                         ms = 0L;                       // untimed
1726                     else if ((ns = deadline - System.nanoTime()) <= 0L)
1727                         break;                         // timeout
1728                     else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
1729                         ms = 1L;                       // avoid 0 for timed wait
1730                     if ((block = tryCompensate(w)) != 0) {
1731                         task.internalWait(ms);
1732                         CTL.getAndAdd(this, (block > 0) ? RC_UNIT : 0L);
1733                     }
1734                     s = task.status;
1735                 }
1736             }
1737         }
1738         return s;
1739     }
1740 
1741     /**
1742      * Runs tasks until {@code isQuiescent()}. Rather than blocking
1743      * when tasks cannot be found, rescans until all others cannot
1744      * find tasks either.
1745      */
1746     final void helpQuiescePool(WorkQueue w) {
1747         int prevSrc = w.source;
1748         int seed = ThreadLocalRandom.nextSecondarySeed();
1749         int r = seed >>> 16, step = r | 1;
1750         for (int source = prevSrc, released = -1;;) { // -1 until known
1751             ForkJoinTask<?> localTask; WorkQueue[] ws;
1752             while ((localTask = w.nextLocalTask()) != null)
1753                 localTask.doExec();
1754             if (w.phase >= 0 && released == -1)
1755                 released = 1;
1756             boolean quiet = true, empty = true;
1757             int n = (ws = workQueues) == null ? 0 : ws.length;
1758             for (int m = n - 1; n > 0; r += step, --n) {
1759                 WorkQueue q; int b;
1760                 if ((q = ws[r & m]) != null) {
1761                     int qs = q.source;
1762                     if (q.top != (b = q.base)) {
1763                         quiet = empty = false;
1764                         ForkJoinTask<?>[] a; int cap, k;
1765                         int qid = q.id;
1766                         if ((a = q.array) != null && (cap = a.length) > 0) {
1767                             if (released == 0) {    // increment
1768                                 released = 1;
1769                                 CTL.getAndAdd(this, RC_UNIT);
1770                             }
1771                             ForkJoinTask<?> t = (ForkJoinTask<?>)
1772                                 QA.getAcquire(a, k = (cap - 1) & b);
1773                             if (q.base == b++ && t != null &&
1774                                 QA.compareAndSet(a, k, t, null)) {
1775                                 q.base = b;
1776                                 w.source = qid;
1777                                 t.doExec();
1778                                 w.source = source = prevSrc;
1779                             }
1780                         }
1781                         break;
1782                     }
1783                     else if ((qs & QUIET) == 0)
1784                         quiet = false;
1785                 }
1786             }
1787             if (quiet) {
1788                 if (released == 0)
1789                     CTL.getAndAdd(this, RC_UNIT);
1790                 w.source = prevSrc;
1791                 break;
1792             }
1793             else if (empty) {
1794                 if (source != QUIET)
1795                     w.source = source = QUIET;
1796                 if (released == 1) {                 // decrement
1797                     released = 0;
1798                     CTL.getAndAdd(this, RC_MASK & -RC_UNIT);
1799                 }
1800             }
1801         }
1802     }
1803 
1804     /**
1805      * Scans for and returns a polled task, if available.
1806      * Used only for untracked polls.
1807      *
1808      * @param submissionsOnly if true, only scan submission queues
1809      */
1810     private ForkJoinTask<?> pollScan(boolean submissionsOnly) {
1811         WorkQueue[] ws; int n;
1812         rescan: while ((mode & STOP) == 0 && (ws = workQueues) != null &&
1813                       (n = ws.length) > 0) {
1814             int m = n - 1;
1815             int r = ThreadLocalRandom.nextSecondarySeed();
1816             int h = r >>> 16;
1817             int origin, step;
1818             if (submissionsOnly) {
1819                 origin = (r & ~1) & m;         // even indices and steps
1820                 step = (h & ~1) | 2;
1821             }
1822             else {
1823                 origin = r & m;
1824                 step = h | 1;
1825             }
1826             boolean nonempty = false;
1827             for (int i = origin, oldSum = 0, checkSum = 0;;) {
1828                 WorkQueue q;
1829                 if ((q = ws[i]) != null) {
1830                     int b; ForkJoinTask<?> t;
1831                     if (q.top - (b = q.base) > 0) {
1832                         nonempty = true;
1833                         if ((t = q.poll()) != null)
1834                             return t;
1835                     }
1836                     else
1837                         checkSum += b + q.id;
1838                 }
1839                 if ((i = (i + step) & m) == origin) {
1840                     if (!nonempty && oldSum == (oldSum = checkSum))
1841                         break rescan;
1842                     checkSum = 0;
1843                     nonempty = false;
1844                 }
1845             }
1846         }
1847         return null;
1848     }
1849 
1850     /**
1851      * Gets and removes a local or stolen task for the given worker.
1852      *
1853      * @return a task, if available
1854      */
1855     final ForkJoinTask<?> nextTaskFor(WorkQueue w) {
1856         ForkJoinTask<?> t;
1857         if (w == null || (t = w.nextLocalTask()) == null)
1858             t = pollScan(false);
1859         return t;
1860     }
1861 
1862     // External operations
1863 
1864     /**
1865      * Adds the given task to a submission queue at submitter's
1866      * current queue, creating one if null or contended.
1867      *
1868      * @param task the task. Caller must ensure non-null.
1869      */
1870     final void externalPush(ForkJoinTask<?> task) {
1871         int r;                                // initialize caller's probe
1872         if ((r = ThreadLocalRandom.getProbe()) == 0) {
1873             ThreadLocalRandom.localInit();
1874             r = ThreadLocalRandom.getProbe();
1875         }
1876         for (;;) {
1877             WorkQueue q;
1878             int md = mode, n;
1879             WorkQueue[] ws = workQueues;
1880             if ((md & SHUTDOWN) != 0 || ws == null || (n = ws.length) <= 0)
1881                 throw new RejectedExecutionException();
1882             else if ((q = ws[(n - 1) & r & SQMASK]) == null) { // add queue
1883                 int qid = (r | QUIET) & ~(FIFO | OWNED);
1884                 Object lock = workerNamePrefix;
1885                 ForkJoinTask<?>[] qa =
1886                     new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
1887                 q = new WorkQueue(this, null);
1888                 q.array = qa;
1889                 q.id = qid;
1890                 q.source = QUIET;
1891                 if (lock != null) {     // unless disabled, lock pool to install
1892                     synchronized (lock) {
1893                         WorkQueue[] vs; int i, vn;
1894                         if ((vs = workQueues) != null && (vn = vs.length) > 0 &&
1895                             vs[i = qid & (vn - 1) & SQMASK] == null)
1896                             vs[i] = q;  // else another thread already installed
1897                     }
1898                 }
1899             }
1900             else if (!q.tryLockPhase()) // move if busy
1901                 r = ThreadLocalRandom.advanceProbe(r);
1902             else {
1903                 if (q.lockedPush(task))
1904                     signalWork();
1905                 return;
1906             }
1907         }
1908     }
1909 
1910     /**
1911      * Pushes a possibly-external submission.
1912      */
1913     private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) {
1914         Thread t; ForkJoinWorkerThread w; WorkQueue q;
1915         if (task == null)
1916             throw new NullPointerException();
1917         if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
1918             (w = (ForkJoinWorkerThread)t).pool == this &&
1919             (q = w.workQueue) != null)
1920             q.push(task);
1921         else
1922             externalPush(task);
1923         return task;
1924     }
1925 
1926     /**
1927      * Returns common pool queue for an external thread.
1928      */
1929     static WorkQueue commonSubmitterQueue() {
1930         ForkJoinPool p = common;
1931         int r = ThreadLocalRandom.getProbe();
1932         WorkQueue[] ws; int n;
1933         return (p != null && (ws = p.workQueues) != null &&
1934                 (n = ws.length) > 0) ?
1935             ws[(n - 1) & r & SQMASK] : null;
1936     }
1937 
1938     /**
1939      * Performs tryUnpush for an external submitter.
1940      */
1941     final boolean tryExternalUnpush(ForkJoinTask<?> task) {
1942         int r = ThreadLocalRandom.getProbe();
1943         WorkQueue[] ws; WorkQueue w; int n;
1944         return ((ws = workQueues) != null &&
1945                 (n = ws.length) > 0 &&
1946                 (w = ws[(n - 1) & r & SQMASK]) != null &&
1947                 w.tryLockedUnpush(task));
1948     }
1949 
1950     /**
1951      * Performs helpComplete for an external submitter.
1952      */
1953     final int externalHelpComplete(CountedCompleter<?> task, int maxTasks) {
1954         int r = ThreadLocalRandom.getProbe();
1955         WorkQueue[] ws; WorkQueue w; int n;
1956         return ((ws = workQueues) != null && (n = ws.length) > 0 &&
1957                 (w = ws[(n - 1) & r & SQMASK]) != null) ?
1958             w.helpCC(task, maxTasks, true) : 0;
1959     }
1960 
1961     /**
1962      * Tries to steal and run tasks within the target's computation.
1963      * The maxTasks argument supports external usages; internal calls
1964      * use zero, allowing unbounded steps (external calls trap
1965      * non-positive values).
1966      *
1967      * @param w caller
1968      * @param maxTasks if non-zero, the maximum number of other tasks to run
1969      * @return task status on exit
1970      */
1971     final int helpComplete(WorkQueue w, CountedCompleter<?> task,
1972                            int maxTasks) {
1973         return (w == null) ? 0 : w.helpCC(task, maxTasks, false);
1974     }
1975 
    /**
     * Returns a cheap heuristic guide for task partitioning when
     * programmers, frameworks, tools, or languages have little or no
     * idea about task granularity.  In essence, by offering this
     * method, we ask users only about tradeoffs in overhead vs
     * expected throughput and its variance, rather than how finely to
     * partition tasks.
     *
     * In a steady state strict (tree-structured) computation, each
     * thread makes available for stealing enough tasks for other
     * threads to remain active. Inductively, if all threads play by
     * the same rules, each thread should make available only a
     * constant number of tasks.
     *
     * The minimum useful constant is just 1. But using a value of 1
     * would require immediate replenishment upon each steal to
     * maintain enough tasks, which is infeasible.  Further,
     * partitionings/granularities of offered tasks should minimize
     * steal rates, which in general means that threads nearer the top
     * of computation tree should generate more than those nearer the
     * bottom. In perfect steady state, each thread is at
     * approximately the same level of computation tree. However,
     * producing extra tasks amortizes the uncertainty of progress and
     * diffusion assumptions.
     *
     * So, users will want to use values larger (but not much larger)
     * than 1 to both smooth over transient shortages and hedge
     * against uneven progress; as traded off against the cost of
     * extra task overhead. We leave the user to pick a threshold
     * value to compare with the results of this call to guide
     * decisions, but recommend values such as 3.
     *
     * When all threads are active, it is on average OK to estimate
     * surplus strictly locally. In steady-state, if one thread is
     * maintaining say 2 surplus tasks, then so are others. So we can
     * just use estimated queue length.  However, this strategy alone
     * leads to serious mis-estimates in some non-steady-state
     * conditions (ramp-up, ramp-down, other stalls). We can detect
     * many of these by further considering the number of "idle"
     * threads that are known to have zero queued tasks, and
     * compensate by a factor of (#idle/#active) threads.
     *
     * Implementation: the caller's estimated local queue length is
     * reduced by 0, 1, 2, 4 or 8 as the number of active workers falls
     * to or below 1/2, 1/4, 1/8 and 1/16 of the parallelism level. The
     * result may be negative.
     *
     * @return the estimated surplus, or 0 if the caller is not a worker
     *         thread of some pool with a work queue
     */
    static int getSurplusQueuedTaskCount() {
        Thread t; ForkJoinWorkerThread wt; ForkJoinPool pool; WorkQueue q;
        // only worker threads that have a queue get a non-zero estimate
        if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) &&
            (pool = (wt = (ForkJoinWorkerThread)t).pool) != null &&
            (q = wt.workQueue) != null) {
            int p = pool.mode & SMASK;                // parallelism level
            int a = p + (int)(pool.ctl >> RC_SHIFT);  // active worker count
            int n = q.top - q.base;                   // estimated local queue size
            // each test halves p first, so the thresholds are p/2, p/4, p/8, p/16
            return n - (a > (p >>>= 1) ? 0 :
                        a > (p >>>= 1) ? 1 :
                        a > (p >>>= 1) ? 2 :
                        a > (p >>>= 1) ? 4 :
                        8);
        }
        return 0;
    }
2034 
2035     // Termination
2036 
    /**
     * Possibly initiates and/or completes termination.
     *
     * Proceeds through three phases, each recorded as a bit in
     * {@code mode}:
     * (1) set SHUTDOWN, which is refused for the common pool or when
     * {@code enable} is false;
     * (2) set STOP. Unless {@code now} is true, this first requires a
     * scan, repeated until two consecutive checksums agree, showing no
     * active workers and no queued tasks;
     * (3) cancel all queued tasks and interrupt queue owners until a
     * scan is stable. Then set TERMINATED once the total worker count
     * has reached zero, and notify threads waiting on this pool's
     * monitor.
     *
     * If workers still exist in phase 3, this returns true without
     * setting TERMINATED. Presumably a later call completes it (for
     * example from an exiting worker); TODO confirm against the callers.
     *
     * @param now if true, unconditionally terminate, else only
     * if no work and no active workers
     * @param enable if true, terminate when next possible
     * @return true if terminating or terminated; false if shutdown is not
     * permitted, or if ({@code !now}) work or active workers remain
     */
    private boolean tryTerminate(boolean now, boolean enable) {
        int md; // 3 phases: try to set SHUTDOWN, then STOP, then TERMINATED

        // Phase 1: CAS in SHUTDOWN, retrying on contention
        while (((md = mode) & SHUTDOWN) == 0) {
            if (!enable || this == common)        // cannot shutdown
                return false;
            else
                MODE.compareAndSet(this, md, md | SHUTDOWN);
        }

        // Phase 2: CAS in STOP, after a quiescence check unless 'now'
        while (((md = mode) & STOP) == 0) {       // try to initiate termination
            if (!now) {                           // check if quiescent & empty
                for (long oldSum = 0L;;) {        // repeat until stable
                    boolean running = false;
                    long checkSum = ctl;
                    WorkQueue[] ws = workQueues;
                    // parallelism + RC field > 0 means some worker is active
                    if ((md & SMASK) + (int)(checkSum >> RC_SHIFT) > 0)
                        running = true;
                    else if (ws != null) {
                        WorkQueue w;
                        for (int i = 0; i < ws.length; ++i) {
                            if ((w = ws[i]) != null) {
                                int s = w.source, p = w.phase;
                                int d = w.id, b = w.base;
                                // non-empty queue, or an odd-id queue (presumably
                                // worker-owned; verify) whose source or phase is
                                // non-negative
                                if (b != w.top ||
                                    ((d & 1) == 1 && (s >= 0 || p >= 0))) {
                                    running = true;
                                    break;     // working, scanning, or have work
                                }
                                // fold queue state into the stability checksum
                                checkSum += (((long)s << 48) + ((long)p << 32) +
                                             ((long)b << 16) + (long)d);
                            }
                        }
                    }
                    if (((md = mode) & STOP) != 0)
                        break;                 // already triggered
                    else if (running)
                        return false;
                    // stable when the array is unchanged and the checksum matches
                    // the previous pass; the comparison reads the old oldSum
                    // before the assignment stores the new one
                    else if (workQueues == ws && oldSum == (oldSum = checkSum))
                        break;
                }
            }
            if ((md & STOP) == 0)
                MODE.compareAndSet(this, md, md | STOP);
        }

        // Phase 3: drain and wake everything, then CAS in TERMINATED
        while (((md = mode) & TERMINATED) == 0) { // help terminate others
            for (long oldSum = 0L;;) {            // repeat until stable
                WorkQueue[] ws; WorkQueue w;
                long checkSum = ctl;
                if ((ws = workQueues) != null) {
                    for (int i = 0; i < ws.length; ++i) {
                        if ((w = ws[i]) != null) {
                            ForkJoinWorkerThread wt = w.owner;
                            w.cancelAll();        // clear queues
                            if (wt != null) {
                                try {             // unblock join or park
                                    wt.interrupt();
                                } catch (Throwable ignore) {
                                    // best effort: a failed interrupt must not
                                    // stop termination of the other queues
                                }
                            }
                            checkSum += ((long)w.phase << 32) + w.base;
                        }
                    }
                }
                if (((md = mode) & TERMINATED) != 0 ||
                    (workQueues == ws && oldSum == (oldSum = checkSum)))
                    break;
            }
            if ((md & TERMINATED) != 0)
                break;
            // parallelism + TC field is the total worker count; wait for zero
            else if ((md & SMASK) + (short)(ctl >>> TC_SHIFT) > 0)
                break;
            else if (MODE.compareAndSet(this, md, md | TERMINATED)) {
                synchronized (this) {
                    notifyAll();                  // for awaitTermination
                }
                break;
            }
        }
        return true;
    }
2127 
2128     // Exported methods
2129 
2130     // Constructors
2131 
2132     /**
2133      * Creates a {@code ForkJoinPool} with parallelism equal to {@link
2134      * java.lang.Runtime#availableProcessors}, using defaults for all
2135      * other parameters (see {@link #ForkJoinPool(int,
2136      * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,
2137      * int, int, int, Predicate, long, TimeUnit)}).
2138      *
2139      * @throws SecurityException if a security manager exists and
2140      *         the caller is not permitted to modify threads
2141      *         because it does not hold {@link
2142      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2143      */
2144     public ForkJoinPool() {
2145         this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
2146              defaultForkJoinWorkerThreadFactory, null, false,
2147              0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
2148     }
2149 
2150     /**
2151      * Creates a {@code ForkJoinPool} with the indicated parallelism
2152      * level, using defaults for all other parameters (see {@link
2153      * #ForkJoinPool(int, ForkJoinWorkerThreadFactory,
2154      * UncaughtExceptionHandler, boolean, int, int, int, Predicate,
2155      * long, TimeUnit)}).
2156      *
2157      * @param parallelism the parallelism level
2158      * @throws IllegalArgumentException if parallelism less than or
2159      *         equal to zero, or greater than implementation limit
2160      * @throws SecurityException if a security manager exists and
2161      *         the caller is not permitted to modify threads
2162      *         because it does not hold {@link
2163      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2164      */
2165     public ForkJoinPool(int parallelism) {
2166         this(parallelism, defaultForkJoinWorkerThreadFactory, null, false,
2167              0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
2168     }
2169 
2170     /**
2171      * Creates a {@code ForkJoinPool} with the given parameters (using
2172      * defaults for others -- see {@link #ForkJoinPool(int,
2173      * ForkJoinWorkerThreadFactory, UncaughtExceptionHandler, boolean,
2174      * int, int, int, Predicate, long, TimeUnit)}).
2175      *
2176      * @param parallelism the parallelism level. For default value,
2177      * use {@link java.lang.Runtime#availableProcessors}.
2178      * @param factory the factory for creating new threads. For default value,
2179      * use {@link #defaultForkJoinWorkerThreadFactory}.
2180      * @param handler the handler for internal worker threads that
2181      * terminate due to unrecoverable errors encountered while executing
2182      * tasks. For default value, use {@code null}.
2183      * @param asyncMode if true,
2184      * establishes local first-in-first-out scheduling mode for forked
2185      * tasks that are never joined. This mode may be more appropriate
2186      * than default locally stack-based mode in applications in which
2187      * worker threads only process event-style asynchronous tasks.
2188      * For default value, use {@code false}.
2189      * @throws IllegalArgumentException if parallelism less than or
2190      *         equal to zero, or greater than implementation limit
2191      * @throws NullPointerException if the factory is null
2192      * @throws SecurityException if a security manager exists and
2193      *         the caller is not permitted to modify threads
2194      *         because it does not hold {@link
2195      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2196      */
2197     public ForkJoinPool(int parallelism,
2198                         ForkJoinWorkerThreadFactory factory,
2199                         UncaughtExceptionHandler handler,
2200                         boolean asyncMode) {
2201         this(parallelism, factory, handler, asyncMode,
2202              0, MAX_CAP, 1, null, DEFAULT_KEEPALIVE, TimeUnit.MILLISECONDS);
2203     }
2204 
2205     /**
2206      * Creates a {@code ForkJoinPool} with the given parameters.
2207      *
2208      * @param parallelism the parallelism level. For default value,
2209      * use {@link java.lang.Runtime#availableProcessors}.
2210      *
2211      * @param factory the factory for creating new threads. For
2212      * default value, use {@link #defaultForkJoinWorkerThreadFactory}.
2213      *
2214      * @param handler the handler for internal worker threads that
2215      * terminate due to unrecoverable errors encountered while
2216      * executing tasks. For default value, use {@code null}.
2217      *
2218      * @param asyncMode if true, establishes local first-in-first-out
2219      * scheduling mode for forked tasks that are never joined. This
2220      * mode may be more appropriate than default locally stack-based
2221      * mode in applications in which worker threads only process
2222      * event-style asynchronous tasks.  For default value, use {@code
2223      * false}.
2224      *
2225      * @param corePoolSize the number of threads to keep in the pool
2226      * (unless timed out after an elapsed keep-alive). Normally (and
2227      * by default) this is the same value as the parallelism level,
2228      * but may be set to a larger value to reduce dynamic overhead if
2229      * tasks regularly block. Using a smaller value (for example
2230      * {@code 0}) has the same effect as the default.
2231      *
2232      * @param maximumPoolSize the maximum number of threads allowed.
2233      * When the maximum is reached, attempts to replace blocked
2234      * threads fail.  (However, because creation and termination of
2235      * different threads may overlap, and may be managed by the given
2236      * thread factory, this value may be transiently exceeded.)  To
2237      * arrange the same value as is used by default for the common
2238      * pool, use {@code 256} plus the {@code parallelism} level. (By
2239      * default, the common pool allows a maximum of 256 spare
2240      * threads.)  Using a value (for example {@code
2241      * Integer.MAX_VALUE}) larger than the implementation's total
2242      * thread limit has the same effect as using this limit (which is
2243      * the default).
2244      *
2245      * @param minimumRunnable the minimum allowed number of core
2246      * threads not blocked by a join or {@link ManagedBlocker}.  To
2247      * ensure progress, when too few unblocked threads exist and
2248      * unexecuted tasks may exist, new threads are constructed, up to
2249      * the given maximumPoolSize.  For the default value, use {@code
2250      * 1}, that ensures liveness.  A larger value might improve
2251      * throughput in the presence of blocked activities, but might
2252      * not, due to increased overhead.  A value of zero may be
2253      * acceptable when submitted tasks cannot have dependencies
2254      * requiring additional threads.
2255      *
2256      * @param saturate if non-null, a predicate invoked upon attempts
2257      * to create more than the maximum total allowed threads.  By
2258      * default, when a thread is about to block on a join or {@link
2259      * ManagedBlocker}, but cannot be replaced because the
2260      * maximumPoolSize would be exceeded, a {@link
2261      * RejectedExecutionException} is thrown.  But if this predicate
2262      * returns {@code true}, then no exception is thrown, so the pool
2263      * continues to operate with fewer than the target number of
2264      * runnable threads, which might not ensure progress.
2265      *
2266      * @param keepAliveTime the elapsed time since last use before
2267      * a thread is terminated (and then later replaced if needed).
2268      * For the default value, use {@code 60, TimeUnit.SECONDS}.
2269      *
2270      * @param unit the time unit for the {@code keepAliveTime} argument
2271      *
2272      * @throws IllegalArgumentException if parallelism is less than or
2273      *         equal to zero, or is greater than implementation limit,
2274      *         or if maximumPoolSize is less than parallelism,
2275      *         of if the keepAliveTime is less than or equal to zero.
2276      * @throws NullPointerException if the factory is null
2277      * @throws SecurityException if a security manager exists and
2278      *         the caller is not permitted to modify threads
2279      *         because it does not hold {@link
2280      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2281      * @since 9
2282      */
2283     public ForkJoinPool(int parallelism,
2284                         ForkJoinWorkerThreadFactory factory,
2285                         UncaughtExceptionHandler handler,
2286                         boolean asyncMode,
2287                         int corePoolSize,
2288                         int maximumPoolSize,
2289                         int minimumRunnable,
2290                         Predicate<? super ForkJoinPool> saturate,
2291                         long keepAliveTime,
2292                         TimeUnit unit) {
2293         // check, encode, pack parameters
2294         if (parallelism <= 0 || parallelism > MAX_CAP ||
2295             maximumPoolSize < parallelism || keepAliveTime <= 0L)
2296             throw new IllegalArgumentException();
2297         if (factory == null)
2298             throw new NullPointerException();
2299         long ms = Math.max(unit.toMillis(keepAliveTime), TIMEOUT_SLOP);
2300 
2301         int corep = Math.min(Math.max(corePoolSize, parallelism), MAX_CAP);
2302         long c = ((((long)(-corep)       << TC_SHIFT) & TC_MASK) |
2303                   (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
2304         int m = parallelism | (asyncMode ? FIFO : 0);
2305         int maxSpares = Math.min(maximumPoolSize, MAX_CAP) - parallelism;
2306         int minAvail = Math.min(Math.max(minimumRunnable, 0), MAX_CAP);
2307         int b = ((minAvail - parallelism) & SMASK) | (maxSpares << SWIDTH);
2308         int n = (parallelism > 1) ? parallelism - 1 : 1; // at least 2 slots
2309         n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
2310         n = (n + 1) << 1; // power of two, including space for submission queues
2311 
2312         this.workerNamePrefix = "ForkJoinPool-" + nextPoolId() + "-worker-";
2313         this.workQueues = new WorkQueue[n];
2314         this.factory = factory;
2315         this.ueh = handler;
2316         this.saturate = saturate;
2317         this.keepAlive = ms;
2318         this.bounds = b;
2319         this.mode = m;
2320         this.ctl = c;
2321         checkPermission();
2322     }
2323 
2324     private static Object newInstanceFromSystemProperty(String property)
2325         throws ReflectiveOperationException {
2326         String className = System.getProperty(property);
2327         return (className == null)
2328             ? null
2329             : ClassLoader.getSystemClassLoader().loadClass(className)
2330             .getConstructor().newInstance();
2331     }
2332 
    /**
     * Constructor for the common pool. Its parameters may be overridden
     * by the system properties
     * {@code java.util.concurrent.ForkJoinPool.common.parallelism},
     * {@code .threadFactory} and {@code .exceptionHandler}.
     *
     * Any exception while reading or parsing these properties is ignored.
     * Properties not yet processed when it occurs keep their defaults.
     * A negative (or absent) parallelism becomes one less than the number
     * of available processors, but at least 1. An explicit 0 is kept as
     * is. Values above MAX_CAP are clamped rather than rejected. Unlike
     * the public constructors, this one does not call checkPermission.
     *
     * @param forCommonPoolOnly unused; distinguishes this constructor's
     *        signature
     */
    private ForkJoinPool(byte forCommonPoolOnly) {
        int parallelism = -1;
        ForkJoinWorkerThreadFactory fac = null;
        UncaughtExceptionHandler handler = null;
        try {  // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
            if (pp != null)
                parallelism = Integer.parseInt(pp);
            fac = (ForkJoinWorkerThreadFactory) newInstanceFromSystemProperty(
                "java.util.concurrent.ForkJoinPool.common.threadFactory");
            handler = (UncaughtExceptionHandler) newInstanceFromSystemProperty(
                "java.util.concurrent.ForkJoinPool.common.exceptionHandler");
        } catch (Exception ignore) {
            // deliberate: a bad property falls back to defaults below
        }

        if (fac == null) {
            if (System.getSecurityManager() == null)
                fac = defaultForkJoinWorkerThreadFactory;
            else // use security-managed default
                fac = new InnocuousForkJoinWorkerThreadFactory();
        }
        if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;

        // same encoding as the public constructor, with corePoolSize equal
        // to parallelism, minimumRunnable of 1 and COMMON_MAX_SPARES spares
        long c = ((((long)(-parallelism) << TC_SHIFT) & TC_MASK) |
                  (((long)(-parallelism) << RC_SHIFT) & RC_MASK));
        int b = ((1 - parallelism) & SMASK) | (COMMON_MAX_SPARES << SWIDTH);
        // queue table size: power of two, including submission-queue slots
        int n = (parallelism > 1) ? parallelism - 1 : 1;
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16;
        n = (n + 1) << 1;

        this.workerNamePrefix = "ForkJoinPool.commonPool-worker-";
        this.workQueues = new WorkQueue[n];
        this.factory = fac;
        this.ueh = handler;
        this.saturate = null;
        this.keepAlive = DEFAULT_KEEPALIVE;
        this.bounds = b;
        this.mode = parallelism;
        this.ctl = c;
    }
2382 
2383     /**
2384      * Returns the common pool instance. This pool is statically
2385      * constructed; its run state is unaffected by attempts to {@link
2386      * #shutdown} or {@link #shutdownNow}. However this pool and any
2387      * ongoing processing are automatically terminated upon program
2388      * {@link System#exit}.  Any program that relies on asynchronous
2389      * task processing to complete before program termination should
2390      * invoke {@code commonPool().}{@link #awaitQuiescence awaitQuiescence},
2391      * before exit.
2392      *
2393      * @return the common pool instance
2394      * @since 1.8
2395      */
2396     public static ForkJoinPool commonPool() {
2397         // assert common != null : "static init error";
2398         return common;
2399     }
2400 
2401     // Execution methods
2402 
2403     /**
2404      * Performs the given task, returning its result upon completion.
2405      * If the computation encounters an unchecked Exception or Error,
2406      * it is rethrown as the outcome of this invocation.  Rethrown
2407      * exceptions behave in the same way as regular exceptions, but,
2408      * when possible, contain stack traces (as displayed for example
2409      * using {@code ex.printStackTrace()}) of both the current thread
2410      * as well as the thread actually encountering the exception;
2411      * minimally only the latter.
2412      *
2413      * @param task the task
2414      * @param <T> the type of the task's result
2415      * @return the task's result
2416      * @throws NullPointerException if the task is null
2417      * @throws RejectedExecutionException if the task cannot be
2418      *         scheduled for execution
2419      */
2420     public <T> T invoke(ForkJoinTask<T> task) {
2421         if (task == null)
2422             throw new NullPointerException();
2423         externalSubmit(task);
2424         return task.join();
2425     }
2426 
2427     /**
2428      * Arranges for (asynchronous) execution of the given task.
2429      *
2430      * @param task the task
2431      * @throws NullPointerException if the task is null
2432      * @throws RejectedExecutionException if the task cannot be
2433      *         scheduled for execution
2434      */
2435     public void execute(ForkJoinTask<?> task) {
2436         externalSubmit(task);
2437     }
2438 
2439     // AbstractExecutorService methods
2440 
2441     /**
2442      * @throws NullPointerException if the task is null
2443      * @throws RejectedExecutionException if the task cannot be
2444      *         scheduled for execution
2445      */
2446     public void execute(Runnable task) {
2447         if (task == null)
2448             throw new NullPointerException();
2449         ForkJoinTask<?> job;
2450         if (task instanceof ForkJoinTask<?>) // avoid re-wrap
2451             job = (ForkJoinTask<?>) task;
2452         else
2453             job = new ForkJoinTask.RunnableExecuteAction(task);
2454         externalSubmit(job);
2455     }
2456 
2457     /**
2458      * Submits a ForkJoinTask for execution.
2459      *
2460      * @param task the task to submit
2461      * @param <T> the type of the task's result
2462      * @return the task
2463      * @throws NullPointerException if the task is null
2464      * @throws RejectedExecutionException if the task cannot be
2465      *         scheduled for execution
2466      */
2467     public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
2468         return externalSubmit(task);
2469     }
2470 
2471     /**
2472      * @throws NullPointerException if the task is null
2473      * @throws RejectedExecutionException if the task cannot be
2474      *         scheduled for execution
2475      */
2476     public <T> ForkJoinTask<T> submit(Callable<T> task) {
2477         return externalSubmit(new ForkJoinTask.AdaptedCallable<T>(task));
2478     }
2479 
2480     /**
2481      * @throws NullPointerException if the task is null
2482      * @throws RejectedExecutionException if the task cannot be
2483      *         scheduled for execution
2484      */
2485     public <T> ForkJoinTask<T> submit(Runnable task, T result) {
2486         return externalSubmit(new ForkJoinTask.AdaptedRunnable<T>(task, result));
2487     }
2488 
2489     /**
2490      * @throws NullPointerException if the task is null
2491      * @throws RejectedExecutionException if the task cannot be
2492      *         scheduled for execution
2493      */
2494     @SuppressWarnings("unchecked")
2495     public ForkJoinTask<?> submit(Runnable task) {
2496         if (task == null)
2497             throw new NullPointerException();
2498         return externalSubmit((task instanceof ForkJoinTask<?>)
2499             ? (ForkJoinTask<Void>) task // avoid re-wrap
2500             : new ForkJoinTask.AdaptedRunnableAction(task));
2501     }
2502 
2503     /**
2504      * @throws NullPointerException       {@inheritDoc}
2505      * @throws RejectedExecutionException {@inheritDoc}
2506      */
2507     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
2508         // In previous versions of this class, this method constructed
2509         // a task to run ForkJoinTask.invokeAll, but now external
2510         // invocation of multiple tasks is at least as efficient.
2511         ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
2512 
2513         try {
2514             for (Callable<T> t : tasks) {
2515                 ForkJoinTask<T> f = new ForkJoinTask.AdaptedCallable<T>(t);
2516                 futures.add(f);
2517                 externalSubmit(f);
2518             }
2519             for (int i = 0, size = futures.size(); i < size; i++)
2520                 ((ForkJoinTask<?>)futures.get(i)).quietlyJoin();
2521             return futures;
2522         } catch (Throwable t) {
2523             for (int i = 0, size = futures.size(); i < size; i++)
2524                 futures.get(i).cancel(false);
2525             throw t;
2526         }
2527     }
2528 
2529     /**
2530      * Returns the factory used for constructing new workers.
2531      *
2532      * @return the factory used for constructing new workers
2533      */
2534     public ForkJoinWorkerThreadFactory getFactory() {
2535         return factory;
2536     }
2537 
2538     /**
2539      * Returns the handler for internal worker threads that terminate
2540      * due to unrecoverable errors encountered while executing tasks.
2541      *
2542      * @return the handler, or {@code null} if none
2543      */
2544     public UncaughtExceptionHandler getUncaughtExceptionHandler() {
2545         return ueh;
2546     }
2547 
2548     /**
2549      * Returns the targeted parallelism level of this pool.
2550      *
2551      * @return the targeted parallelism level of this pool
2552      */
2553     public int getParallelism() {
2554         int par = mode & SMASK;
2555         return (par > 0) ? par : 1;
2556     }
2557 
2558     /**
2559      * Returns the targeted parallelism level of the common pool.
2560      *
2561      * @return the targeted parallelism level of the common pool
2562      * @since 1.8
2563      */
2564     public static int getCommonPoolParallelism() {
2565         return COMMON_PARALLELISM;
2566     }
2567 
2568     /**
2569      * Returns the number of worker threads that have started but not
2570      * yet terminated.  The result returned by this method may differ
2571      * from {@link #getParallelism} when threads are created to
2572      * maintain parallelism when others are cooperatively blocked.
2573      *
2574      * @return the number of worker threads
2575      */
2576     public int getPoolSize() {
2577         return ((mode & SMASK) + (short)(ctl >>> TC_SHIFT));
2578     }
2579 
2580     /**
2581      * Returns {@code true} if this pool uses local first-in-first-out
2582      * scheduling mode for forked tasks that are never joined.
2583      *
2584      * @return {@code true} if this pool uses async mode
2585      */
2586     public boolean getAsyncMode() {
2587         return (mode & FIFO) != 0;
2588     }
2589 
2590     /**
2591      * Returns an estimate of the number of worker threads that are
2592      * not blocked waiting to join tasks or for other managed
2593      * synchronization. This method may overestimate the
2594      * number of running threads.
2595      *
2596      * @return the number of worker threads
2597      */
2598     public int getRunningThreadCount() {
2599         WorkQueue[] ws; WorkQueue w;
2600         VarHandle.acquireFence();
2601         int rc = 0;
2602         if ((ws = workQueues) != null) {
2603             for (int i = 1; i < ws.length; i += 2) {
2604                 if ((w = ws[i]) != null && w.isApparentlyUnblocked())
2605                     ++rc;
2606             }
2607         }
2608         return rc;
2609     }
2610 
2611     /**
2612      * Returns an estimate of the number of threads that are currently
2613      * stealing or executing tasks. This method may overestimate the
2614      * number of active threads.
2615      *
2616      * @return the number of active threads
2617      */
2618     public int getActiveThreadCount() {
2619         int r = (mode & SMASK) + (int)(ctl >> RC_SHIFT);
2620         return (r <= 0) ? 0 : r; // suppress momentarily negative values
2621     }
2622 
    /**
     * Returns {@code true} if all worker threads are currently idle.
     * An idle worker is one that cannot obtain a task to execute
     * because none are available to steal from other threads, and
     * there are no pending submissions to the pool. This method is
     * conservative; it might not return {@code true} immediately upon
     * idleness of all threads, but will eventually become true if
     * threads remain inactive.
     *
     * @return {@code true} if all threads are currently idle
     */
    public boolean isQuiescent() {
        for (;;) {
            // Snapshot ctl; a "true" answer from the scan below is only
            // trusted if ctl is still equal to this snapshot afterwards.
            long c = ctl;
            int md = mode, pc = md & SMASK;
            // Total and active estimates, derived from ctl the same way
            // getPoolSize() and getActiveThreadCount() derive them.
            int tc = pc + (short)(c >>> TC_SHIFT);
            int rc = pc + (int)(c >> RC_SHIFT);
            if ((md & (STOP | TERMINATED)) != 0)
                return true;          // stopping/terminated pools report quiescent
            else if (rc > 0)
                return false;         // some worker is still counted as active
            else {
                WorkQueue[] ws; WorkQueue v;
                if ((ws = workQueues) != null) {
                    // Examine the odd-indexed slots only.
                    for (int i = 1; i < ws.length; i += 2) {
                        if ((v = ws[i]) != null) {
                            // NOTE(review): a positive source presumably means
                            // this worker is still engaged with a task; confirm
                            // against WorkQueue.source.
                            if (v.source > 0)
                                return false;
                            --tc;     // one counted worker accounted for
                        }
                    }
                }
                // Quiescent only if every counted worker was seen and ctl did
                // not change during the scan; otherwise retry from scratch.
                if (tc == 0 && ctl == c)
                    return true;
            }
        }
    }
2660 
2661     /**
2662      * Returns an estimate of the total number of tasks stolen from
2663      * one thread's work queue by another. The reported value
2664      * underestimates the actual total number of steals when the pool
2665      * is not quiescent. This value may be useful for monitoring and
2666      * tuning fork/join programs: in general, steal counts should be
2667      * high enough to keep threads busy, but low enough to avoid
2668      * overhead and contention across threads.
2669      *
2670      * @return the number of steals
2671      */
2672     public long getStealCount() {
2673         long count = stealCount;
2674         WorkQueue[] ws; WorkQueue w;
2675         if ((ws = workQueues) != null) {
2676             for (int i = 1; i < ws.length; i += 2) {
2677                 if ((w = ws[i]) != null)
2678                     count += (long)w.nsteals & 0xffffffffL;
2679             }
2680         }
2681         return count;
2682     }
2683 
2684     /**
2685      * Returns an estimate of the total number of tasks currently held
2686      * in queues by worker threads (but not including tasks submitted
2687      * to the pool that have not begun executing). This value is only
2688      * an approximation, obtained by iterating across all threads in
2689      * the pool. This method may be useful for tuning task
2690      * granularities.
2691      *
2692      * @return the number of queued tasks
2693      */
2694     public long getQueuedTaskCount() {
2695         WorkQueue[] ws; WorkQueue w;
2696         VarHandle.acquireFence();
2697         int count = 0;
2698         if ((ws = workQueues) != null) {
2699             for (int i = 1; i < ws.length; i += 2) {
2700                 if ((w = ws[i]) != null)
2701                     count += w.queueSize();
2702             }
2703         }
2704         return count;
2705     }
2706 
2707     /**
2708      * Returns an estimate of the number of tasks submitted to this
2709      * pool that have not yet begun executing.  This method may take
2710      * time proportional to the number of submissions.
2711      *
2712      * @return the number of queued submissions
2713      */
2714     public int getQueuedSubmissionCount() {
2715         WorkQueue[] ws; WorkQueue w;
2716         VarHandle.acquireFence();
2717         int count = 0;
2718         if ((ws = workQueues) != null) {
2719             for (int i = 0; i < ws.length; i += 2) {
2720                 if ((w = ws[i]) != null)
2721                     count += w.queueSize();
2722             }
2723         }
2724         return count;
2725     }
2726 
2727     /**
2728      * Returns {@code true} if there are any tasks submitted to this
2729      * pool that have not yet begun executing.
2730      *
2731      * @return {@code true} if there are any queued submissions
2732      */
2733     public boolean hasQueuedSubmissions() {
2734         WorkQueue[] ws; WorkQueue w;
2735         VarHandle.acquireFence();
2736         if ((ws = workQueues) != null) {
2737             for (int i = 0; i < ws.length; i += 2) {
2738                 if ((w = ws[i]) != null && !w.isEmpty())
2739                     return true;
2740             }
2741         }
2742         return false;
2743     }
2744 
2745     /**
2746      * Removes and returns the next unexecuted submission if one is
2747      * available.  This method may be useful in extensions to this
2748      * class that re-assign work in systems with multiple pools.
2749      *
2750      * @return the next submission, or {@code null} if none
2751      */
2752     protected ForkJoinTask<?> pollSubmission() {
2753         return pollScan(true);
2754     }
2755 
2756     /**
2757      * Removes all available unexecuted submitted and forked tasks
2758      * from scheduling queues and adds them to the given collection,
2759      * without altering their execution status. These may include
2760      * artificially generated or wrapped tasks. This method is
2761      * designed to be invoked only when the pool is known to be
2762      * quiescent. Invocations at other times may not remove all
2763      * tasks. A failure encountered while attempting to add elements
2764      * to collection {@code c} may result in elements being in
2765      * neither, either or both collections when the associated
2766      * exception is thrown.  The behavior of this operation is
2767      * undefined if the specified collection is modified while the
2768      * operation is in progress.
2769      *
2770      * @param c the collection to transfer elements into
2771      * @return the number of elements transferred
2772      */
2773     protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
2774         WorkQueue[] ws; WorkQueue w; ForkJoinTask<?> t;
2775         VarHandle.acquireFence();
2776         int count = 0;
2777         if ((ws = workQueues) != null) {
2778             for (int i = 0; i < ws.length; ++i) {
2779                 if ((w = ws[i]) != null) {
2780                     while ((t = w.poll()) != null) {
2781                         c.add(t);
2782                         ++count;
2783                     }
2784                 }
2785             }
2786         }
2787         return count;
2788     }
2789 
2790     /**
2791      * Returns a string identifying this pool, as well as its state,
2792      * including indications of run state, parallelism level, and
2793      * worker and task counts.
2794      *
2795      * @return a string identifying this pool, as well as its state
2796      */
2797     public String toString() {
2798         // Use a single pass through workQueues to collect counts
2799         int md = mode; // read volatile fields first
2800         long c = ctl;
2801         long st = stealCount;
2802         long qt = 0L, qs = 0L; int rc = 0;
2803         WorkQueue[] ws; WorkQueue w;
2804         if ((ws = workQueues) != null) {
2805             for (int i = 0; i < ws.length; ++i) {
2806                 if ((w = ws[i]) != null) {
2807                     int size = w.queueSize();
2808                     if ((i & 1) == 0)
2809                         qs += size;
2810                     else {
2811                         qt += size;
2812                         st += (long)w.nsteals & 0xffffffffL;
2813                         if (w.isApparentlyUnblocked())
2814                             ++rc;
2815                     }
2816                 }
2817             }
2818         }
2819 
2820         int pc = (md & SMASK);
2821         int tc = pc + (short)(c >>> TC_SHIFT);
2822         int ac = pc + (int)(c >> RC_SHIFT);
2823         if (ac < 0) // ignore transient negative
2824             ac = 0;
2825         String level = ((md & TERMINATED) != 0 ? "Terminated" :
2826                         (md & STOP)       != 0 ? "Terminating" :
2827                         (md & SHUTDOWN)   != 0 ? "Shutting down" :
2828                         "Running");
2829         return super.toString() +
2830             "[" + level +
2831             ", parallelism = " + pc +
2832             ", size = " + tc +
2833             ", active = " + ac +
2834             ", running = " + rc +
2835             ", steals = " + st +
2836             ", tasks = " + qt +
2837             ", submissions = " + qs +
2838             "]";
2839     }
2840 
2841     /**
2842      * Possibly initiates an orderly shutdown in which previously
2843      * submitted tasks are executed, but no new tasks will be
2844      * accepted. Invocation has no effect on execution state if this
2845      * is the {@link #commonPool()}, and no additional effect if
2846      * already shut down.  Tasks that are in the process of being
2847      * submitted concurrently during the course of this method may or
2848      * may not be rejected.
2849      *
2850      * @throws SecurityException if a security manager exists and
2851      *         the caller is not permitted to modify threads
2852      *         because it does not hold {@link
2853      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2854      */
2855     public void shutdown() {
2856         checkPermission();
2857         tryTerminate(false, true);
2858     }
2859 
2860     /**
2861      * Possibly attempts to cancel and/or stop all tasks, and reject
2862      * all subsequently submitted tasks.  Invocation has no effect on
2863      * execution state if this is the {@link #commonPool()}, and no
2864      * additional effect if already shut down. Otherwise, tasks that
2865      * are in the process of being submitted or executed concurrently
2866      * during the course of this method may or may not be
2867      * rejected. This method cancels both existing and unexecuted
2868      * tasks, in order to permit termination in the presence of task
2869      * dependencies. So the method always returns an empty list
2870      * (unlike the case for some other Executors).
2871      *
2872      * @return an empty list
2873      * @throws SecurityException if a security manager exists and
2874      *         the caller is not permitted to modify threads
2875      *         because it does not hold {@link
2876      *         java.lang.RuntimePermission}{@code ("modifyThread")}
2877      */
2878     public List<Runnable> shutdownNow() {
2879         checkPermission();
2880         tryTerminate(true, true);
2881         return Collections.emptyList();
2882     }
2883 
2884     /**
2885      * Returns {@code true} if all tasks have completed following shut down.
2886      *
2887      * @return {@code true} if all tasks have completed following shut down
2888      */
2889     public boolean isTerminated() {
2890         return (mode & TERMINATED) != 0;
2891     }
2892 
2893     /**
2894      * Returns {@code true} if the process of termination has
2895      * commenced but not yet completed.  This method may be useful for
2896      * debugging. A return of {@code true} reported a sufficient
2897      * period after shutdown may indicate that submitted tasks have
2898      * ignored or suppressed interruption, or are waiting for I/O,
2899      * causing this executor not to properly terminate. (See the
2900      * advisory notes for class {@link ForkJoinTask} stating that
2901      * tasks should not normally entail blocking operations.  But if
2902      * they do, they must abort them on interrupt.)
2903      *
2904      * @return {@code true} if terminating but not yet terminated
2905      */
2906     public boolean isTerminating() {
2907         int md = mode;
2908         return (md & STOP) != 0 && (md & TERMINATED) == 0;
2909     }
2910 
2911     /**
2912      * Returns {@code true} if this pool has been shut down.
2913      *
2914      * @return {@code true} if this pool has been shut down
2915      */
2916     public boolean isShutdown() {
2917         return (mode & SHUTDOWN) != 0;
2918     }
2919 
2920     /**
2921      * Blocks until all tasks have completed execution after a
2922      * shutdown request, or the timeout occurs, or the current thread
2923      * is interrupted, whichever happens first. Because the {@link
2924      * #commonPool()} never terminates until program shutdown, when
2925      * applied to the common pool, this method is equivalent to {@link
2926      * #awaitQuiescence(long, TimeUnit)} but always returns {@code false}.
2927      *
2928      * @param timeout the maximum time to wait
2929      * @param unit the time unit of the timeout argument
2930      * @return {@code true} if this executor terminated and
2931      *         {@code false} if the timeout elapsed before termination
2932      * @throws InterruptedException if interrupted while waiting
2933      */
2934     public boolean awaitTermination(long timeout, TimeUnit unit)
2935         throws InterruptedException {
2936         if (Thread.interrupted())
2937             throw new InterruptedException();
2938         if (this == common) {
2939             awaitQuiescence(timeout, unit);
2940             return false;
2941         }
2942         long nanos = unit.toNanos(timeout);
2943         if (isTerminated())
2944             return true;
2945         if (nanos <= 0L)
2946             return false;
2947         long deadline = System.nanoTime() + nanos;
2948         synchronized (this) {
2949             for (;;) {
2950                 if (isTerminated())
2951                     return true;
2952                 if (nanos <= 0L)
2953                     return false;
2954                 long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
2955                 wait(millis > 0L ? millis : 1L);
2956                 nanos = deadline - System.nanoTime();
2957             }
2958         }
2959     }
2960 
2961     /**
2962      * If called by a ForkJoinTask operating in this pool, equivalent
2963      * in effect to {@link ForkJoinTask#helpQuiesce}. Otherwise,
2964      * waits and/or attempts to assist performing tasks until this
2965      * pool {@link #isQuiescent} or the indicated timeout elapses.
2966      *
2967      * @param timeout the maximum time to wait
2968      * @param unit the time unit of the timeout argument
2969      * @return {@code true} if quiescent; {@code false} if the
2970      * timeout elapsed.
2971      */
2972     public boolean awaitQuiescence(long timeout, TimeUnit unit) {
2973         long nanos = unit.toNanos(timeout);
2974         ForkJoinWorkerThread wt;
2975         Thread thread = Thread.currentThread();
2976         if ((thread instanceof ForkJoinWorkerThread) &&
2977             (wt = (ForkJoinWorkerThread)thread).pool == this) {
2978             helpQuiescePool(wt.workQueue);
2979             return true;
2980         }
2981         else {
2982             for (long startTime = System.nanoTime();;) {
2983                 ForkJoinTask<?> t;
2984                 if ((t = pollScan(false)) != null)
2985                     t.doExec();
2986                 else if (isQuiescent())
2987                     return true;
2988                 else if ((System.nanoTime() - startTime) > nanos)
2989                     return false;
2990                 else
2991                     Thread.yield(); // cannot block
2992             }
2993         }
2994     }
2995 
2996     /**
2997      * Waits and/or attempts to assist performing tasks indefinitely
2998      * until the {@link #commonPool()} {@link #isQuiescent}.
2999      */
3000     static void quiesceCommonPool() {
3001         common.awaitQuiescence(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
3002     }
3003 
3004     /**
3005      * Interface for extending managed parallelism for tasks running
3006      * in {@link ForkJoinPool}s.
3007      *
3008      * <p>A {@code ManagedBlocker} provides two methods.  Method
3009      * {@link #isReleasable} must return {@code true} if blocking is
3010      * not necessary. Method {@link #block} blocks the current thread
3011      * if necessary (perhaps internally invoking {@code isReleasable}
3012      * before actually blocking). These actions are performed by any
3013      * thread invoking {@link ForkJoinPool#managedBlock(ManagedBlocker)}.
3014      * The unusual methods in this API accommodate synchronizers that
3015      * may, but don't usually, block for long periods. Similarly, they
3016      * allow more efficient internal handling of cases in which
3017      * additional workers may be, but usually are not, needed to
3018      * ensure sufficient parallelism.  Toward this end,
3019      * implementations of method {@code isReleasable} must be amenable
3020      * to repeated invocation.
3021      *
3022      * <p>For example, here is a ManagedBlocker based on a
3023      * ReentrantLock:
3024      * <pre> {@code
3025      * class ManagedLocker implements ManagedBlocker {
3026      *   final ReentrantLock lock;
3027      *   boolean hasLock = false;
3028      *   ManagedLocker(ReentrantLock lock) { this.lock = lock; }
3029      *   public boolean block() {
3030      *     if (!hasLock)
3031      *       lock.lock();
3032      *     return true;
3033      *   }
3034      *   public boolean isReleasable() {
3035      *     return hasLock || (hasLock = lock.tryLock());
3036      *   }
3037      * }}</pre>
3038      *
3039      * <p>Here is a class that possibly blocks waiting for an
3040      * item on a given queue:
3041      * <pre> {@code
3042      * class QueueTaker<E> implements ManagedBlocker {
3043      *   final BlockingQueue<E> queue;
3044      *   volatile E item = null;
3045      *   QueueTaker(BlockingQueue<E> q) { this.queue = q; }
3046      *   public boolean block() throws InterruptedException {
3047      *     if (item == null)
3048      *       item = queue.take();
3049      *     return true;
3050      *   }
3051      *   public boolean isReleasable() {
3052      *     return item != null || (item = queue.poll()) != null;
3053      *   }
3054      *   public E getItem() { // call after pool.managedBlock completes
3055      *     return item;
3056      *   }
3057      * }}</pre>
3058      */
3059     public static interface ManagedBlocker {
3060         /**
3061          * Possibly blocks the current thread, for example waiting for
3062          * a lock or condition.
3063          *
3064          * @return {@code true} if no additional blocking is necessary
3065          * (i.e., if isReleasable would return true)
3066          * @throws InterruptedException if interrupted while waiting
3067          * (the method is not required to do so, but is allowed to)
3068          */
3069         boolean block() throws InterruptedException;
3070 
3071         /**
3072          * Returns {@code true} if blocking is unnecessary.
3073          * @return {@code true} if blocking is unnecessary
3074          */
3075         boolean isReleasable();
3076     }
3077 
    /**
     * Runs the given possibly blocking task.  When {@linkplain
     * ForkJoinTask#inForkJoinPool() running in a ForkJoinPool}, this
     * method possibly arranges for a spare thread to be activated if
     * necessary to ensure sufficient parallelism while the current
     * thread is blocked in {@link ManagedBlocker#block blocker.block()}.
     *
     * <p>This method repeatedly calls {@code blocker.isReleasable()} and
     * {@code blocker.block()} until either method returns {@code true}.
     * Every call to {@code blocker.block()} is preceded by a call to
     * {@code blocker.isReleasable()} that returned {@code false}.
     *
     * <p>If not running in a ForkJoinPool, this method is
     * behaviorally equivalent to
     * <pre> {@code
     * while (!blocker.isReleasable())
     *   if (blocker.block())
     *     break;}</pre>
     *
     * If running in a ForkJoinPool, the pool may first be expanded to
     * ensure sufficient parallelism available during the call to
     * {@code blocker.block()}.
     *
     * @param blocker the blocker task
     * @throws InterruptedException if {@code blocker.block()} did so
     */
    public static void managedBlock(ManagedBlocker blocker)
        throws InterruptedException {
        if (blocker == null) throw new NullPointerException();
        ForkJoinPool p;
        ForkJoinWorkerThread wt;
        WorkQueue w;
        Thread t = Thread.currentThread();
        // Only a worker thread with both an owning pool and a work queue
        // takes the compensating path; all other threads block directly.
        if ((t instanceof ForkJoinWorkerThread) &&
            (p = (wt = (ForkJoinWorkerThread)t).pool) != null &&
            (w = wt.workQueue) != null) {
            int block;
            // Retry until the blocker is releasable or tryCompensate
            // returns nonzero (permission to block).
            while (!blocker.isReleasable()) {
                if ((block = p.tryCompensate(w)) != 0) {
                    try {
                        // Same isReleasable()/block() loop as the path below.
                        do {} while (!blocker.isReleasable() &&
                                     !blocker.block());
                    } finally {
                        // Runs even if block() throws. A positive result adds
                        // RC_UNIT back to ctl; otherwise the delta is zero.
                        // NOTE(review): presumably undoes an active-count
                        // decrement made by tryCompensate; confirm there.
                        CTL.getAndAdd(p, (block > 0) ? RC_UNIT : 0L);
                    }
                    break;
                }
            }
        }
        else {
            // Not a usable worker: the plain loop shown in the Javadoc.
            do {} while (!blocker.isReleasable() &&
                         !blocker.block());
        }
    }
3132 
3133     /**
3134      * If the given executor is a ForkJoinPool, poll and execute
3135      * AsynchronousCompletionTasks from worker's queue until none are
3136      * available or blocker is released.
3137      */
3138     static void helpAsyncBlocker(Executor e, ManagedBlocker blocker) {
3139         if (e instanceof ForkJoinPool) {
3140             WorkQueue w; ForkJoinWorkerThread wt; WorkQueue[] ws; int r, n;
3141             ForkJoinPool p = (ForkJoinPool)e;
3142             Thread thread = Thread.currentThread();
3143             if (thread instanceof ForkJoinWorkerThread &&
3144                 (wt = (ForkJoinWorkerThread)thread).pool == p)
3145                 w = wt.workQueue;
3146             else if ((r = ThreadLocalRandom.getProbe()) != 0 &&
3147                      (ws = p.workQueues) != null && (n = ws.length) > 0)
3148                 w = ws[(n - 1) & r & SQMASK];
3149             else
3150                 w = null;
3151             if (w != null)
3152                 w.helpAsyncBlocker(blocker);
3153         }
3154     }
3155 
3156     // AbstractExecutorService overrides.  These rely on undocumented
3157     // fact that ForkJoinTask.adapt returns ForkJoinTasks that also
3158     // implement RunnableFuture.
3159 
3160     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
3161         return new ForkJoinTask.AdaptedRunnable<T>(runnable, value);
3162     }
3163 
3164     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
3165         return new ForkJoinTask.AdaptedCallable<T>(callable);
3166     }
3167 
    // VarHandle mechanics
    private static final VarHandle CTL;   // handle on the long field "ctl"
    private static final VarHandle MODE;  // handle on the int field "mode"
    static final VarHandle QA;            // element handle for ForkJoinTask[] arrays

    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            CTL = l.findVarHandle(ForkJoinPool.class, "ctl", long.class);
            MODE = l.findVarHandle(ForkJoinPool.class, "mode", int.class);
            QA = MethodHandles.arrayElementVarHandle(ForkJoinTask[].class);
        } catch (ReflectiveOperationException e) {
            // Missing fields here are a build error; fail class init loudly.
            throw new ExceptionInInitializerError(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;

        // Optional system-property override for the common pool's
        // maximum spares. Any failure to read or parse it (e.g. a
        // SecurityException or NumberFormatException) is deliberately
        // ignored, and the default is kept.
        int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
        try {
            String p = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.maximumSpares");
            if (p != null)
                commonMaxSpares = Integer.parseInt(p);
        } catch (Exception ignore) {}
        COMMON_MAX_SPARES = commonMaxSpares;

        // NOTE(review): these assignments precede creation of the common
        // pool below; presumably that constructor depends on them, so keep
        // this order.
        defaultForkJoinWorkerThreadFactory =
            new DefaultForkJoinWorkerThreadFactory();
        modifyThreadPermission = new RuntimePermission("modifyThread");

        // Create the common pool inside doPrivileged using the constructor
        // that takes a single byte argument.
        common = AccessController.doPrivileged(new PrivilegedAction<>() {
            public ForkJoinPool run() {
                return new ForkJoinPool((byte)0); }});

        // Never report a common-pool parallelism below 1.
        COMMON_PARALLELISM = Math.max(common.mode & SMASK, 1);
    }
3206 
3207     /**
3208      * Factory for innocuous worker threads.
3209      */
3210     private static final class InnocuousForkJoinWorkerThreadFactory
3211         implements ForkJoinWorkerThreadFactory {
3212 
3213         /**
3214          * An ACC to restrict permissions for the factory itself.
3215          * The constructed workers have no permissions set.
3216          */
3217         private static final AccessControlContext ACC = contextWithPermissions(
3218             modifyThreadPermission,
3219             new RuntimePermission("enableContextClassLoaderOverride"),
3220             new RuntimePermission("modifyThreadGroup"),
3221             new RuntimePermission("getClassLoader"),
3222             new RuntimePermission("setContextClassLoader"));
3223 
3224         public final ForkJoinWorkerThread newThread(ForkJoinPool pool) {
3225             return AccessController.doPrivileged(
3226                 new PrivilegedAction<>() {
3227                     public ForkJoinWorkerThread run() {
3228                         return new ForkJoinWorkerThread.
3229                             InnocuousForkJoinWorkerThread(pool); }},
3230                 ACC);
3231         }
3232     }
3233 }
3234