• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 import java.io.Serializable;
39 import java.lang.reflect.Constructor;
40 import java.util.Collection;
41 import java.util.List;
42 import java.util.RandomAccess;
43 import java.util.concurrent.locks.LockSupport;
44 import jdk.internal.misc.Unsafe;
45 
46 // BEGIN android-note
47 // removed java 9 code
48 // END android-note
49 
50 /**
51  * Abstract base class for tasks that run within a {@link ForkJoinPool}.
52  * A {@code ForkJoinTask} is a thread-like entity that is much
53  * lighter weight than a normal thread.  Huge numbers of tasks and
54  * subtasks may be hosted by a small number of actual threads in a
55  * ForkJoinPool, at the price of some usage limitations.
56  *
57  * <p>A "main" {@code ForkJoinTask} begins execution when it is
58  * explicitly submitted to a {@link ForkJoinPool}, or, if not already
59  * engaged in a ForkJoin computation, commenced in the {@link
60  * ForkJoinPool#commonPool()} via {@link #fork}, {@link #invoke}, or
61  * related methods.  Once started, it will usually in turn start other
62  * subtasks.  As indicated by the name of this class, many programs
63  * using {@code ForkJoinTask} employ only methods {@link #fork} and
64  * {@link #join}, or derivatives such as {@link
65  * #invokeAll(ForkJoinTask...) invokeAll}.  However, this class also
66  * provides a number of other methods that can come into play in
67  * advanced usages, as well as extension mechanics that allow support
68  * of new forms of fork/join processing.
69  *
70  * <p>A {@code ForkJoinTask} is a lightweight form of {@link Future}.
71  * The efficiency of {@code ForkJoinTask}s stems from a set of
72  * restrictions (that are only partially statically enforceable)
73  * reflecting their main use as computational tasks calculating pure
74  * functions or operating on purely isolated objects.  The primary
75  * coordination mechanisms are {@link #fork}, that arranges
76  * asynchronous execution, and {@link #join}, that doesn't proceed
77  * until the task's result has been computed.  Computations should
78  * ideally avoid {@code synchronized} methods or blocks, and should
79  * minimize other blocking synchronization apart from joining other
80  * tasks or using synchronizers such as Phasers that are advertised to
81  * cooperate with fork/join scheduling. Subdividable tasks should also
82  * not perform blocking I/O, and should ideally access variables that
83  * are completely independent of those accessed by other running
84  * tasks. These guidelines are loosely enforced by not permitting
85  * checked exceptions such as {@code IOExceptions} to be
86  * thrown. However, computations may still encounter unchecked
87  * exceptions, that are rethrown to callers attempting to join
88  * them. These exceptions may additionally include {@link
89  * RejectedExecutionException} stemming from internal resource
90  * exhaustion, such as failure to allocate internal task
91  * queues. Rethrown exceptions behave in the same way as regular
92  * exceptions, but, when possible, contain stack traces (as displayed
93  * for example using {@code ex.printStackTrace()}) of both the thread
94  * that initiated the computation as well as the thread actually
95  * encountering the exception; minimally only the latter.
96  *
97  * <p>It is possible to define and use ForkJoinTasks that may block,
98  * but doing so requires three further considerations: (1) Completion
99  * of few if any <em>other</em> tasks should be dependent on a task
100  * that blocks on external synchronization or I/O. Event-style async
101  * tasks that are never joined (for example, those subclassing {@link
102  * CountedCompleter}) often fall into this category.  (2) To minimize
103  * resource impact, tasks should be small; ideally performing only the
104  * (possibly) blocking action. (3) Unless the {@link
105  * ForkJoinPool.ManagedBlocker} API is used, or the number of possibly
106  * blocked tasks is known to be less than the pool's {@link
107  * ForkJoinPool#getParallelism} level, the pool cannot guarantee that
108  * enough threads will be available to ensure progress or good
109  * performance.
110  *
111  * <p>The primary method for awaiting completion and extracting
112  * results of a task is {@link #join}, but there are several variants:
113  * The {@link Future#get} methods support interruptible and/or timed
114  * waits for completion and report results using {@code Future}
115  * conventions. Method {@link #invoke} is semantically
116  * equivalent to {@code fork(); join()} but always attempts to begin
117  * execution in the current thread. The "<em>quiet</em>" forms of
118  * these methods do not extract results or report exceptions. These
119  * may be useful when a set of tasks are being executed, and you need
120  * to delay processing of results or exceptions until all complete.
121  * Method {@code invokeAll} (available in multiple versions)
122  * performs the most common form of parallel invocation: forking a set
123  * of tasks and joining them all.
124  *
125  * <p>In the most typical usages, a fork-join pair act like a call
126  * (fork) and return (join) from a parallel recursive function. As is
127  * the case with other forms of recursive calls, returns (joins)
128  * should be performed innermost-first. For example, {@code a.fork();
129  * b.fork(); b.join(); a.join();} is likely to be substantially more
130  * efficient than joining {@code a} before {@code b}.
131  *
132  * <p>The execution status of tasks may be queried at several levels
133  * of detail: {@link #isDone} is true if a task completed in any way
134  * (including the case where a task was cancelled without executing);
135  * {@link #isCompletedNormally} is true if a task completed without
136  * cancellation or encountering an exception; {@link #isCancelled} is
137  * true if the task was cancelled (in which case {@link #getException}
138  * returns a {@link CancellationException}); and
139  * {@link #isCompletedAbnormally} is true if a task was either
140  * cancelled or encountered an exception, in which case {@link
141  * #getException} will return either the encountered exception or
142  * {@link CancellationException}.
143  *
144  * <p>The ForkJoinTask class is not usually directly subclassed.
145  * Instead, you subclass one of the abstract classes that support a
146  * particular style of fork/join processing, typically {@link
147  * RecursiveAction} for most computations that do not return results,
148  * {@link RecursiveTask} for those that do, and {@link
149  * CountedCompleter} for those in which completed actions trigger
150  * other actions.  Normally, a concrete ForkJoinTask subclass declares
151  * fields comprising its parameters, established in a constructor, and
152  * then defines a {@code compute} method that somehow uses the control
153  * methods supplied by this base class.
154  *
155  * <p>Method {@link #join} and its variants are appropriate for use
156  * only when completion dependencies are acyclic; that is, the
157  * parallel computation can be described as a directed acyclic graph
158  * (DAG). Otherwise, executions may encounter a form of deadlock as
159  * tasks cyclically wait for each other.  However, this framework
160  * supports other methods and techniques (for example the use of
161  * {@link Phaser}, {@link #helpQuiesce}, and {@link #complete}) that
162  * may be of use in constructing custom subclasses for problems that
163  * are not statically structured as DAGs. To support such usages, a
164  * ForkJoinTask may be atomically <em>tagged</em> with a {@code short}
165  * value using {@link #setForkJoinTaskTag} or {@link
166  * #compareAndSetForkJoinTaskTag} and checked using {@link
167  * #getForkJoinTaskTag}. The ForkJoinTask implementation does not use
168  * these {@code protected} methods or tags for any purpose, but they
169  * may be of use in the construction of specialized subclasses.  For
170  * example, parallel graph traversals can use the supplied methods to
171  * avoid revisiting nodes/tasks that have already been processed.
172  * (Method names for tagging are bulky in part to encourage definition
173  * of methods that reflect their usage patterns.)
174  *
175  * <p>Most base support methods are {@code final}, to prevent
176  * overriding of implementations that are intrinsically tied to the
177  * underlying lightweight task scheduling framework.  Developers
178  * creating new basic styles of fork/join processing should minimally
179  * implement {@code protected} methods {@link #exec}, {@link
180  * #setRawResult}, and {@link #getRawResult}, while also introducing
181  * an abstract computational method that can be implemented in its
182  * subclasses, possibly relying on other {@code protected} methods
183  * provided by this class.
184  *
185  * <p>ForkJoinTasks should perform relatively small amounts of
186  * computation. Large tasks should be split into smaller subtasks,
187  * usually via recursive decomposition. As a very rough rule of thumb,
188  * a task should perform more than 100 and less than 10000 basic
189  * computational steps, and should avoid indefinite looping. If tasks
190  * are too big, then parallelism cannot improve throughput. If too
191  * small, then memory and internal task maintenance overhead may
192  * overwhelm processing.
193  *
194  * <p>This class provides {@code adapt} methods for {@link Runnable}
195  * and {@link Callable}, that may be of use when mixing execution of
196  * {@code ForkJoinTasks} with other kinds of tasks. When all tasks are
197  * of this form, consider using a pool constructed in <em>asyncMode</em>.
198  *
199  * <p>ForkJoinTasks are {@code Serializable}, which enables them to be
200  * used in extensions such as remote execution frameworks. It is
201  * sensible to serialize tasks only before or after, but not during,
202  * execution. Serialization is not relied on during execution itself.
203  *
204  * @param <V> the type of the result of the task
205  *
206  * @since 1.7
207  * @author Doug Lea
208  */
209 public abstract class ForkJoinTask<V> implements Future<V>, Serializable {
210 
211     /*
212      * See the internal documentation of class ForkJoinPool for a
213      * general implementation overview.  ForkJoinTasks are mainly
214      * responsible for maintaining their "status" field amidst relays
215      * to methods in ForkJoinWorkerThread and ForkJoinPool.
216      *
217      * The methods of this class are more-or-less layered into
218      * (1) basic status maintenance
219      * (2) execution and awaiting completion
220      * (3) user-level methods that additionally report results.
221      * This is sometimes hard to see because this file orders exported
222      * methods in a way that flows well in javadocs.
223      *
224      * Revision notes: This class uses jdk-internal Unsafe for atomics
225      * and special memory modes, rather than VarHandles, to avoid
226      * initialization dependencies in other jdk components that
227      * require early parallelism. It also simplifies handling of
228      * pool-submitted tasks, among other minor improvements.
229      */
230 
231     /**
232      * Nodes for threads waiting for completion, or holding a thrown
233      * exception (never both). Waiting threads prepend nodes
234      * Treiber-stack-style.  Signallers detach and unpark
235      * waiters. Cancelled waiters try to unsplice.
236      */
237     static final class Aux {
238         final Thread thread;
239         final Throwable ex;  // null if a waiter
240         Aux next;            // accessed only via memory-acquire chains
Aux(Thread thread, Throwable ex)241         Aux(Thread thread, Throwable ex) {
242             this.thread = thread;
243             this.ex = ex;
244         }
casNext(Aux c, Aux v)245         final boolean casNext(Aux c, Aux v) { // used only in cancellation
246             return U.compareAndSetReference(this, NEXT, c, v);
247         }
248         private static final Unsafe U;
249         private static final long NEXT;
250         static {
251             U = Unsafe.getUnsafe();
252             NEXT = U.objectFieldOffset(Aux.class, "next");
253         }
254     }
255 
256     /*
257      * The status field holds bits packed into a single int to ensure
258      * atomicity.  Status is initially zero, and takes on nonnegative
259      * values until completed, upon which it holds (sign bit) DONE,
260      * possibly with ABNORMAL (cancelled or exceptional) and THROWN
261      * (in which case an exception has been stored). A value of
262      * ABNORMAL without DONE signifies an interrupted wait.  These
263      * control bits occupy only (some of) the upper half (16 bits) of
264      * status field. The lower bits are used for user-defined tags.
265      */
266     static final int DONE         = 1 << 31; // must be negative
267     static final int ABNORMAL     = 1 << 16;
268     static final int THROWN       = 1 << 17;
269     static final int SMASK        = 0xffff;  // short bits for tags
270     static final int UNCOMPENSATE = 1 << 16; // helpJoin return sentinel
271     static final int POOLSUBMIT   = 1 << 18; // for pool.submit vs fork
272 
273     // flags for awaitDone (in addition to above)
274     static final int RAN           = 1;
275     static final int INTERRUPTIBLE = 2;
276     static final int TIMED         = 4;
277 
278     // Fields
279     volatile int status;                // accessed directly by pool and workers
280     private transient volatile Aux aux; // either waiters or thrown Exception
281 
282     // Support for atomic operations
283     private static final Unsafe U;
284     private static final long STATUS;
285     private static final long AUX;
getAndBitwiseOrStatus(int v)286     private int getAndBitwiseOrStatus(int v) {
287         return U.getAndBitwiseOrInt(this, STATUS, v);
288     }
casStatus(int c, int v)289     private boolean casStatus(int c, int v) {
290         return U.compareAndSetInt(this, STATUS, c, v);
291     }
casAux(Aux c, Aux v)292     private boolean casAux(Aux c, Aux v) {
293         return U.compareAndSetReference(this, AUX, c, v);
294     }
295 
296     /**
297      * Marks this task as an external pool submission.
298      */
markPoolSubmission()299     final void markPoolSubmission() {
300         getAndBitwiseOrStatus(POOLSUBMIT);
301     }
302 
303     /** Removes and unparks waiters */
signalWaiters()304     private void signalWaiters() {
305         for (Aux a; (a = aux) != null && a.ex == null; ) {
306             if (casAux(a, null)) {             // detach entire list
307                 for (Thread t; a != null; a = a.next) {
308                     if ((t = a.thread) != Thread.currentThread() && t != null)
309                         LockSupport.unpark(t); // don't self-signal
310                 }
311                 break;
312             }
313         }
314     }
315 
316     /**
317      * Sets DONE status and wakes up threads waiting to join this task.
318      * @return status on exit
319      */
setDone()320     private int setDone() {
321         int s = getAndBitwiseOrStatus(DONE) | DONE;
322         signalWaiters();
323         return s;
324     }
325 
326     /**
327      * Sets ABNORMAL DONE status unless already done, and wakes up threads
328      * waiting to join this task.
329      * @return status on exit
330      */
trySetCancelled()331     private int trySetCancelled() {
332         int s;
333         do {} while ((s = status) >= 0 && !casStatus(s, s |= (DONE | ABNORMAL)));
334         signalWaiters();
335         return s;
336     }
337 
338     /**
339      * Records exception and sets ABNORMAL THROWN DONE status unless
340      * already done, and wakes up threads waiting to join this task.
341      * If losing a race with setDone or trySetCancelled, the exception
342      * may be recorded but not reported.
343      *
344      * @return status on exit
345      */
trySetThrown(Throwable ex)346     final int trySetThrown(Throwable ex) {
347         Aux h = new Aux(Thread.currentThread(), ex), p = null;
348         boolean installed = false;
349         int s;
350         while ((s = status) >= 0) {
351             Aux a;
352             if (!installed && ((a = aux) == null || a.ex == null) &&
353                 (installed = casAux(a, h)))
354                 p = a; // list of waiters replaced by h
355             if (installed && casStatus(s, s |= (DONE | ABNORMAL | THROWN)))
356                 break;
357         }
358         for (; p != null; p = p.next)
359             LockSupport.unpark(p.thread);
360         return s;
361     }
362 
363     /**
364      * Records exception unless already done. Overridable in subclasses.
365      *
366      * @return status on exit
367      */
trySetException(Throwable ex)368     int trySetException(Throwable ex) {
369         return trySetThrown(ex);
370     }
371 
372     /**
373      * Constructor for subclasses to call.
374      */
ForkJoinTask()375     public ForkJoinTask() {}
376 
isExceptionalStatus(int s)377     static boolean isExceptionalStatus(int s) {  // needed by subclasses
378         return (s & THROWN) != 0;
379     }
380 
381     /**
382      * Unless done, calls exec and records status if completed, but
383      * doesn't wait for completion otherwise.
384      *
385      * @return status on exit from this method
386      */
doExec()387     final int doExec() {
388         int s; boolean completed;
389         if ((s = status) >= 0) {
390             try {
391                 completed = exec();
392             } catch (Throwable rex) {
393                 s = trySetException(rex);
394                 completed = false;
395             }
396             if (completed)
397                 s = setDone();
398         }
399         return s;
400     }
401 
402     /**
403      * Helps and/or waits for completion from join, get, or invoke;
404      * called from either internal or external threads.
405      *
406      * @param how flags for POOLSUBMIT, RAN, INTERRUPTIBLE, TIMED
407      * @param deadline if timed, timeout deadline
408      * @return ABNORMAL if interrupted, else status on exit
409      */
awaitDone(int how, long deadline)410     private int awaitDone(int how, long deadline) {
411         int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool p;
412         ForkJoinPool.WorkQueue q = null;
413         boolean timed = (how & TIMED) != 0;
414         boolean owned = false, uncompensate = false;
415         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
416             owned = true;
417             q = (wt = (ForkJoinWorkerThread)t).workQueue;
418             p = wt.pool;
419         }
420         else if ((p = ForkJoinPool.common) != null && (how & POOLSUBMIT) == 0)
421             q = p.externalQueue();
422         if (q != null && p != null) { // try helping
423             if (this instanceof CountedCompleter)
424                 s = p.helpComplete(this, q, owned, timed);
425             else if ((how & RAN) != 0 ||
426                      (s = q.tryRemoveAndExec(this, owned)) >= 0)
427                 s = (owned) ? p.helpJoin(this, q, timed) : 0;
428             if (s < 0)
429                 return s;
430             if (s == UNCOMPENSATE)
431                 uncompensate = true;
432         }
433         Aux node = null;
434         long ns = 0L;
435         boolean interrupted = false, queued = false;
436         for (;;) {                    // install node and await signal
437             Aux a;
438             if ((s = status) < 0)
439                 break;
440             else if (node == null)
441                 node = new Aux(Thread.currentThread(), null);
442             else if (!queued) {
443                 if (((a = aux) == null || a.ex == null) &&
444                     (queued = casAux(node.next = a, node)))
445                     LockSupport.setCurrentBlocker(this);
446             }
447             else if (timed && (ns = deadline - System.nanoTime()) <= 0) {
448                 s = 0;
449                 break;
450             }
451             else if (Thread.interrupted()) {
452                 interrupted = true;
453                 if ((how & POOLSUBMIT) != 0 && p != null && p.runState < 0)
454                     cancelIgnoringExceptions(this); // cancel on shutdown
455                 else if ((how & INTERRUPTIBLE) != 0) {
456                     s = ABNORMAL;
457                     break;
458                 }
459             }
460             else if ((s = status) < 0) // recheck
461                 break;
462             else if (timed)
463                 LockSupport.parkNanos(ns);
464             else
465                 LockSupport.park();
466         }
467         if (uncompensate)
468             p.uncompensate();
469 
470         if (queued) {
471             LockSupport.setCurrentBlocker(null);
472             if (s >= 0) { // cancellation similar to AbstractQueuedSynchronizer
473                 outer: for (Aux a; (a = aux) != null && a.ex == null; ) {
474                     for (Aux trail = null;;) {
475                         Aux next = a.next;
476                         if (a == node) {
477                             if (trail != null)
478                                 trail.casNext(trail, next);
479                             else if (casAux(a, next))
480                                 break outer; // cannot be re-encountered
481                             break;           // restart
482                         } else {
483                             trail = a;
484                             if ((a = next) == null)
485                                 break outer;
486                         }
487                     }
488                 }
489             }
490             else {
491                 signalWaiters();             // help clean or signal
492                 if (interrupted)
493                     Thread.currentThread().interrupt();
494             }
495         }
496         return s;
497     }
498 
499     /**
500      * Cancels, ignoring any exceptions thrown by cancel.  Cancel is
501      * spec'ed not to throw any exceptions, but if it does anyway, we
502      * have no recourse, so guard against this case.
503      */
cancelIgnoringExceptions(Future<?> t)504     static final void cancelIgnoringExceptions(Future<?> t) {
505         if (t != null) {
506             try {
507                 t.cancel(true);
508             } catch (Throwable ignore) {
509             }
510         }
511     }
512 
513     /**
514      * Returns a rethrowable exception for this task, if available.
515      * To provide accurate stack traces, if the exception was not
516      * thrown by the current thread, we try to create a new exception
517      * of the same type as the one thrown, but with the recorded
518      * exception as its cause. If there is no such constructor, we
519      * instead try to use a no-arg constructor, followed by initCause,
520      * to the same effect. If none of these apply, or any fail due to
521      * other exceptions, we return the recorded exception, which is
522      * still correct, although it may contain a misleading stack
523      * trace.
524      *
525      * @return the exception, or null if none
526      */
getThrowableException()527     private Throwable getThrowableException() {
528         Throwable ex; Aux a;
529         if ((a = aux) == null)
530             ex = null;
531         else if ((ex = a.ex) != null && a.thread != Thread.currentThread()) {
532             try {
533                 Constructor<?> noArgCtor = null, oneArgCtor = null;
534                 for (Constructor<?> c : ex.getClass().getConstructors()) {
535                     Class<?>[] ps = c.getParameterTypes();
536                     if (ps.length == 0)
537                         noArgCtor = c;
538                     else if (ps.length == 1 && ps[0] == Throwable.class) {
539                         oneArgCtor = c;
540                         break;
541                     }
542                 }
543                 if (oneArgCtor != null)
544                     ex = (Throwable)oneArgCtor.newInstance(ex);
545                 else if (noArgCtor != null) {
546                     Throwable rx = (Throwable)noArgCtor.newInstance();
547                     rx.initCause(ex);
548                     ex = rx;
549                 }
550             } catch (Exception ignore) {
551             }
552         }
553         return ex;
554     }
555 
556     /**
557      * Returns exception associated with the given status, or null if none.
558      */
getException(int s)559     private Throwable getException(int s) {
560         Throwable ex = null;
561         if ((s & ABNORMAL) != 0 && (ex = getThrowableException()) == null)
562             ex = new CancellationException();
563         return ex;
564     }
565 
566     /**
567      * Throws exception associated with the given status, or
568      * CancellationException if none recorded.
569      */
reportException(int s)570     private void reportException(int s) {
571         ForkJoinTask.<RuntimeException>uncheckedThrow(getThrowableException());
572     }
573 
574     /**
575      * Throws exception for (timed or untimed) get, wrapping if
576      * necessary in an ExecutionException.
577      */
reportExecutionException(int s)578     private void reportExecutionException(int s) {
579         Throwable ex = null, rx;
580         if (s == ABNORMAL)
581             ex = new InterruptedException();
582         else if (s >= 0)
583             ex = new TimeoutException();
584         else if ((rx = getThrowableException()) != null)
585             ex = new ExecutionException(rx);
586         ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
587     }
588 
589     /**
590      * A version of "sneaky throw" to relay exceptions in other
591      * contexts.
592      */
rethrow(Throwable ex)593     static void rethrow(Throwable ex) {
594         ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
595     }
596 
597     /**
598      * The sneaky part of sneaky throw, relying on generics
599      * limitations to evade compiler complaints about rethrowing
600      * unchecked exceptions. If argument null, throws
601      * CancellationException.
602      */
603     @SuppressWarnings("unchecked") static <T extends Throwable>
uncheckedThrow(Throwable t)604     void uncheckedThrow(Throwable t) throws T {
605         if (t == null)
606             t = new CancellationException();
607         throw (T)t; // rely on vacuous cast
608     }
609 
610     // public methods
611 
612     /**
613      * Arranges to asynchronously execute this task in the pool the
614      * current task is running in, if applicable, or using the {@link
615      * ForkJoinPool#commonPool()} if not {@link #inForkJoinPool}.  While
616      * it is not necessarily enforced, it is a usage error to fork a
617      * task more than once unless it has completed and been
618      * reinitialized.  Subsequent modifications to the state of this
619      * task or any data it operates on are not necessarily
620      * consistently observable by any thread other than the one
621      * executing it unless preceded by a call to {@link #join} or
622      * related methods, or a call to {@link #isDone} returning {@code
623      * true}.
624      *
625      * @return {@code this}, to simplify usage
626      */
fork()627     public final ForkJoinTask<V> fork() {
628         Thread t; ForkJoinWorkerThread wt;
629         ForkJoinPool p; ForkJoinPool.WorkQueue q;
630         U.storeStoreFence();  // ensure safely publishable
631         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
632             p = (wt = (ForkJoinWorkerThread)t).pool;
633             q = wt.workQueue;
634         }
635         else
636             q = (p = ForkJoinPool.common).submissionQueue(false);
637         q.push(this, p, true);
638         return this;
639     }
640 
641     /**
642      * Returns the result of the computation when it
643      * {@linkplain #isDone is done}.
644      * This method differs from {@link #get()} in that abnormal
645      * completion results in {@code RuntimeException} or {@code Error},
646      * not {@code ExecutionException}, and that interrupts of the
647      * calling thread do <em>not</em> cause the method to abruptly
648      * return by throwing {@code InterruptedException}.
649      *
650      * @return the computed result
651      */
join()652     public final V join() {
653         int s;
654         if ((s = status) >= 0)
655             s = awaitDone(s & POOLSUBMIT, 0L);
656         if ((s & ABNORMAL) != 0)
657             reportException(s);
658         return getRawResult();
659     }
660 
661     /**
662      * Commences performing this task, awaits its completion if
663      * necessary, and returns its result, or throws an (unchecked)
664      * {@code RuntimeException} or {@code Error} if the underlying
665      * computation did so.
666      *
667      * @return the computed result
668      */
invoke()669     public final V invoke() {
670         int s;
671         if ((s = doExec()) >= 0)
672             s = awaitDone(RAN, 0L);
673         if ((s & ABNORMAL) != 0)
674             reportException(s);
675         return getRawResult();
676     }
677 
678     /**
679      * Forks the given tasks, returning when {@code isDone} holds for
680      * each task or an (unchecked) exception is encountered, in which
681      * case the exception is rethrown. If more than one task
682      * encounters an exception, then this method throws any one of
683      * these exceptions. If any task encounters an exception, the
684      * other may be cancelled. However, the execution status of
685      * individual tasks is not guaranteed upon exceptional return. The
686      * status of each task may be obtained using {@link
687      * #getException()} and related methods to check if they have been
688      * cancelled, completed normally or exceptionally, or left
689      * unprocessed.
690      *
691      * @param t1 the first task
692      * @param t2 the second task
693      * @throws NullPointerException if any task is null
694      */
invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2)695     public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
696         int s1, s2;
697         if (t1 == null || t2 == null)
698             throw new NullPointerException();
699         t2.fork();
700         if ((s1 = t1.doExec()) >= 0)
701             s1 = t1.awaitDone(RAN, 0L);
702         if ((s1 & ABNORMAL) != 0) {
703             cancelIgnoringExceptions(t2);
704             t1.reportException(s1);
705         }
706         else {
707             if ((s2 = t2.status) >= 0)
708                 s2 = t2.awaitDone(0, 0L);
709             if ((s2 & ABNORMAL) != 0)
710                 t2.reportException(s2);
711         }
712     }
713 
714     /**
715      * Forks the given tasks, returning when {@code isDone} holds for
716      * each task or an (unchecked) exception is encountered, in which
717      * case the exception is rethrown. If more than one task
718      * encounters an exception, then this method throws any one of
719      * these exceptions. If any task encounters an exception, others
720      * may be cancelled. However, the execution status of individual
721      * tasks is not guaranteed upon exceptional return. The status of
722      * each task may be obtained using {@link #getException()} and
723      * related methods to check if they have been cancelled, completed
724      * normally or exceptionally, or left unprocessed.
725      *
726      * @param tasks the tasks
727      * @throws NullPointerException if any task is null
728      */
invokeAll(ForkJoinTask<?>.... tasks)729     public static void invokeAll(ForkJoinTask<?>... tasks) {
730         Throwable ex = null;
731         int last = tasks.length - 1;
732         for (int i = last; i >= 0; --i) {
733             ForkJoinTask<?> t;
734             if ((t = tasks[i]) == null) {
735                 ex = new NullPointerException();
736                 break;
737             }
738             if (i == 0) {
739                 int s;
740                 if ((s = t.doExec()) >= 0)
741                     s = t.awaitDone(RAN, 0L);
742                 if ((s & ABNORMAL) != 0)
743                     ex = t.getException(s);
744                 break;
745             }
746             t.fork();
747         }
748         if (ex == null) {
749             for (int i = 1; i <= last; ++i) {
750                 ForkJoinTask<?> t;
751                 if ((t = tasks[i]) != null) {
752                     int s;
753                     if ((s = t.status) >= 0)
754                         s = t.awaitDone(0, 0L);
755                     if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
756                         break;
757                 }
758             }
759         }
760         if (ex != null) {
761             for (int i = 1; i <= last; ++i)
762                 cancelIgnoringExceptions(tasks[i]);
763             rethrow(ex);
764         }
765     }
766 
767     /**
768      * Forks all tasks in the specified collection, returning when
769      * {@code isDone} holds for each task or an (unchecked) exception
770      * is encountered, in which case the exception is rethrown. If
771      * more than one task encounters an exception, then this method
772      * throws any one of these exceptions. If any task encounters an
773      * exception, others may be cancelled. However, the execution
774      * status of individual tasks is not guaranteed upon exceptional
775      * return. The status of each task may be obtained using {@link
776      * #getException()} and related methods to check if they have been
777      * cancelled, completed normally or exceptionally, or left
778      * unprocessed.
779      *
780      * @param tasks the collection of tasks
781      * @param <T> the type of the values returned from the tasks
782      * @return the tasks argument, to simplify usage
783      * @throws NullPointerException if tasks or any element are null
784      */
invokeAll(Collection<T> tasks)785     public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
786         if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
787             invokeAll(tasks.toArray(new ForkJoinTask<?>[0]));
788             return tasks;
789         }
790         @SuppressWarnings("unchecked")
791         List<? extends ForkJoinTask<?>> ts =
792             (List<? extends ForkJoinTask<?>>) tasks;
793         Throwable ex = null;
794         int last = ts.size() - 1;  // nearly same as array version
795         for (int i = last; i >= 0; --i) {
796             ForkJoinTask<?> t;
797             if ((t = ts.get(i)) == null) {
798                 ex = new NullPointerException();
799                 break;
800             }
801             if (i == 0) {
802                 int s;
803                 if ((s = t.doExec()) >= 0)
804                     s = t.awaitDone(RAN, 0L);
805                 if ((s & ABNORMAL) != 0)
806                     ex = t.getException(s);
807                 break;
808             }
809             t.fork();
810         }
811         if (ex == null) {
812             for (int i = 1; i <= last; ++i) {
813                 ForkJoinTask<?> t;
814                 if ((t = ts.get(i)) != null) {
815                     int s;
816                     if ((s = t.status) >= 0)
817                         s = t.awaitDone(0, 0L);
818                     if ((s & ABNORMAL) != 0 && (ex = t.getException(s)) != null)
819                         break;
820                 }
821             }
822         }
823         if (ex != null) {
824             for (int i = 1; i <= last; ++i)
825                 cancelIgnoringExceptions(ts.get(i));
826             rethrow(ex);
827         }
828         return tasks;
829     }
830 
831     /**
832      * Attempts to cancel execution of this task. This attempt will
833      * fail if the task has already completed or could not be
834      * cancelled for some other reason. If successful, and this task
835      * has not started when {@code cancel} is called, execution of
836      * this task is suppressed. After this method returns
837      * successfully, unless there is an intervening call to {@link
838      * #reinitialize}, subsequent calls to {@link #isCancelled},
839      * {@link #isDone}, and {@code cancel} will return {@code true}
840      * and calls to {@link #join} and related methods will result in
841      * {@code CancellationException}.
842      *
843      * <p>This method may be overridden in subclasses, but if so, must
844      * still ensure that these properties hold. In particular, the
845      * {@code cancel} method itself must not throw exceptions.
846      *
847      * <p>This method is designed to be invoked by <em>other</em>
848      * tasks. To terminate the current task, you can just return or
849      * throw an unchecked exception from its computation method, or
850      * invoke {@link #completeExceptionally(Throwable)}.
851      *
852      * @param mayInterruptIfRunning this value has no effect in the
853      * default implementation because interrupts are not used to
854      * control cancellation.
855      *
856      * @return {@code true} if this task is now cancelled
857      */
cancel(boolean mayInterruptIfRunning)858     public boolean cancel(boolean mayInterruptIfRunning) {
859         return (trySetCancelled() & (ABNORMAL | THROWN)) == ABNORMAL;
860     }
861 
isDone()862     public final boolean isDone() {
863         return status < 0;
864     }
865 
isCancelled()866     public final boolean isCancelled() {
867         return (status & (ABNORMAL | THROWN)) == ABNORMAL;
868     }
869 
870     /**
871      * Returns {@code true} if this task threw an exception or was cancelled.
872      *
873      * @return {@code true} if this task threw an exception or was cancelled
874      */
isCompletedAbnormally()875     public final boolean isCompletedAbnormally() {
876         return (status & ABNORMAL) != 0;
877     }
878 
879     /**
880      * Returns {@code true} if this task completed without throwing an
881      * exception and was not cancelled.
882      *
883      * @return {@code true} if this task completed without throwing an
884      * exception and was not cancelled
885      */
isCompletedNormally()886     public final boolean isCompletedNormally() {
887         return (status & (DONE | ABNORMAL)) == DONE;
888     }
889 
890     @Override
state()891     public State state() {
892         int s = status;
893         return (s >= 0) ? State.RUNNING :
894             ((s & (DONE | ABNORMAL)) == DONE) ? State.SUCCESS:
895             ((s & (ABNORMAL | THROWN)) == (ABNORMAL | THROWN)) ? State.FAILED :
896             State.CANCELLED;
897     }
898 
899     @Override
resultNow()900     public V resultNow() {
901         if (!isCompletedNormally())
902             throw new IllegalStateException();
903         return getRawResult();
904     }
905 
906     @Override
exceptionNow()907     public Throwable exceptionNow() {
908         if ((status & (ABNORMAL | THROWN)) != (ABNORMAL | THROWN))
909             throw new IllegalStateException();
910         return getThrowableException();
911     }
912 
913     /**
914      * Returns the exception thrown by the base computation, or a
915      * {@code CancellationException} if cancelled, or {@code null} if
916      * none or if the method has not yet completed.
917      *
918      * @return the exception, or {@code null} if none
919      */
getException()920     public final Throwable getException() {
921         return getException(status);
922     }
923 
924     /**
925      * Completes this task abnormally, and if not already aborted or
926      * cancelled, causes it to throw the given exception upon
927      * {@code join} and related operations. This method may be used
928      * to induce exceptions in asynchronous tasks, or to force
929      * completion of tasks that would not otherwise complete.  Its use
930      * in other situations is discouraged.  This method is
931      * overridable, but overridden versions must invoke {@code super}
932      * implementation to maintain guarantees.
933      *
934      * @param ex the exception to throw. If this exception is not a
935      * {@code RuntimeException} or {@code Error}, the actual exception
936      * thrown will be a {@code RuntimeException} with cause {@code ex}.
937      */
completeExceptionally(Throwable ex)938     public void completeExceptionally(Throwable ex) {
939         trySetException((ex instanceof RuntimeException) ||
940                         (ex instanceof Error) ? ex :
941                         new RuntimeException(ex));
942     }
943 
944     /**
945      * Completes this task, and if not already aborted or cancelled,
946      * returning the given value as the result of subsequent
947      * invocations of {@code join} and related operations. This method
948      * may be used to provide results for asynchronous tasks, or to
949      * provide alternative handling for tasks that would not otherwise
950      * complete normally. Its use in other situations is
951      * discouraged. This method is overridable, but overridden
952      * versions must invoke {@code super} implementation to maintain
953      * guarantees.
954      *
955      * @param value the result value for this task
956      */
complete(V value)957     public void complete(V value) {
958         try {
959             setRawResult(value);
960         } catch (Throwable rex) {
961             trySetException(rex);
962             return;
963         }
964         setDone();
965     }
966 
967     /**
968      * Completes this task normally without setting a value. The most
969      * recent value established by {@link #setRawResult} (or {@code
970      * null} by default) will be returned as the result of subsequent
971      * invocations of {@code join} and related operations.
972      *
973      * @since 1.8
974      */
quietlyComplete()975     public final void quietlyComplete() {
976         setDone();
977     }
978 
979     /**
980      * Waits if necessary for the computation to complete, and then
981      * retrieves its result.
982      *
983      * @return the computed result
984      * @throws CancellationException if the computation was cancelled
985      * @throws ExecutionException if the computation threw an
986      * exception
987      * @throws InterruptedException if the current thread is not a
988      * member of a ForkJoinPool and was interrupted while waiting
989      */
get()990     public final V get() throws InterruptedException, ExecutionException {
991         int s;
992         if (Thread.interrupted())
993             s = ABNORMAL;
994         else if ((s = status) >= 0)
995             s = awaitDone((s & POOLSUBMIT) | INTERRUPTIBLE, 0L);
996         if ((s & ABNORMAL) != 0)
997             reportExecutionException(s);
998         return getRawResult();
999     }
1000 
1001     /**
1002      * Waits if necessary for at most the given time for the computation
1003      * to complete, and then retrieves its result, if available.
1004      *
1005      * @param timeout the maximum time to wait
1006      * @param unit the time unit of the timeout argument
1007      * @return the computed result
1008      * @throws CancellationException if the computation was cancelled
1009      * @throws ExecutionException if the computation threw an
1010      * exception
1011      * @throws InterruptedException if the current thread is not a
1012      * member of a ForkJoinPool and was interrupted while waiting
1013      * @throws TimeoutException if the wait timed out
1014      */
get(long timeout, TimeUnit unit)1015     public final V get(long timeout, TimeUnit unit)
1016         throws InterruptedException, ExecutionException, TimeoutException {
1017         long nanos = unit.toNanos(timeout);
1018         int s;
1019         if (Thread.interrupted())
1020             s = ABNORMAL;
1021         else if ((s = status) >= 0 && nanos > 0L)
1022             s = awaitDone((s & POOLSUBMIT) | INTERRUPTIBLE | TIMED,
1023                           nanos + System.nanoTime());
1024         if (s >= 0 || (s & ABNORMAL) != 0)
1025             reportExecutionException(s);
1026         return getRawResult();
1027     }
1028 
1029     /**
1030      * Joins this task, without returning its result or throwing its
1031      * exception. This method may be useful when processing
1032      * collections of tasks when some have been cancelled or otherwise
1033      * known to have aborted.
1034      */
quietlyJoin()1035     public final void quietlyJoin() {
1036         int s;
1037         if ((s = status) >= 0)
1038             awaitDone(s & POOLSUBMIT, 0L);
1039     }
1040 
1041     /**
1042      * Commences performing this task and awaits its completion if
1043      * necessary, without returning its result or throwing its
1044      * exception.
1045      */
quietlyInvoke()1046     public final void quietlyInvoke() {
1047         int s;
1048         if ((s = doExec()) >= 0)
1049             awaitDone(RAN, 0L);
1050     }
1051 
1052     /**
1053      * Tries to join this task, returning true if it completed
1054      * (possibly exceptionally) before the given timeout and
1055      * the current thread has not been interrupted.
1056      *
1057      * @param timeout the maximum time to wait
1058      * @param unit the time unit of the timeout argument
1059      * @return true if this task completed
1060      * @throws InterruptedException if the current thread was
1061      * interrupted while waiting
1062      * @since 19
1063      */
quietlyJoin(long timeout, TimeUnit unit)1064     public final boolean quietlyJoin(long timeout, TimeUnit unit)
1065         throws InterruptedException {
1066         int s;
1067         long nanos = unit.toNanos(timeout);
1068         if (Thread.interrupted())
1069             s = ABNORMAL;
1070         else if ((s = status) >= 0 && nanos > 0L)
1071             s = awaitDone((s & POOLSUBMIT) | INTERRUPTIBLE | TIMED,
1072                           nanos + System.nanoTime());
1073         if (s == ABNORMAL)
1074             throw new InterruptedException();
1075         else
1076             return (s < 0);
1077     }
1078 
1079     /**
1080      * Tries to join this task, returning true if it completed
1081      * (possibly exceptionally) before the given timeout.
1082      *
1083      * @param timeout the maximum time to wait
1084      * @param unit the time unit of the timeout argument
1085      * @return true if this task completed
1086      * @since 19
1087      */
quietlyJoinUninterruptibly(long timeout, TimeUnit unit)1088     public final boolean quietlyJoinUninterruptibly(long timeout,
1089                                                     TimeUnit unit) {
1090         int s;
1091         long nanos = unit.toNanos(timeout);
1092         if ((s = status) >= 0 && nanos > 0L)
1093             s = awaitDone((s & POOLSUBMIT) | TIMED, nanos + System.nanoTime());
1094         return (s < 0);
1095     }
1096 
1097     /**
1098      * Possibly executes tasks until the pool hosting the current task
1099      * {@linkplain ForkJoinPool#isQuiescent is quiescent}.  This
1100      * method may be of use in designs in which many tasks are forked,
1101      * but none are explicitly joined, instead executing them until
1102      * all are processed.
1103      */
helpQuiesce()1104     public static void helpQuiesce() {
1105         ForkJoinPool.helpQuiescePool(null, Long.MAX_VALUE, false);
1106     }
1107 
1108     /**
1109      * Resets the internal bookkeeping state of this task, allowing a
1110      * subsequent {@code fork}. This method allows repeated reuse of
1111      * this task, but only if reuse occurs when this task has either
1112      * never been forked, or has been forked, then completed and all
1113      * outstanding joins of this task have also completed. Effects
1114      * under any other usage conditions are not guaranteed.
1115      * This method may be useful when executing
1116      * pre-constructed trees of subtasks in loops.
1117      *
1118      * <p>Upon completion of this method, {@code isDone()} reports
1119      * {@code false}, and {@code getException()} reports {@code
1120      * null}. However, the value returned by {@code getRawResult} is
1121      * unaffected. To clear this value, you can invoke {@code
1122      * setRawResult(null)}.
1123      */
reinitialize()1124     public void reinitialize() {
1125         aux = null;
1126         status = 0;
1127     }
1128 
1129     /**
1130      * Returns the pool hosting the current thread, or {@code null}
1131      * if the current thread is executing outside of any ForkJoinPool.
1132      *
1133      * <p>This method returns {@code null} if and only if {@link
1134      * #inForkJoinPool} returns {@code false}.
1135      *
1136      * @return the pool, or {@code null} if none
1137      */
getPool()1138     public static ForkJoinPool getPool() {
1139         Thread t;
1140         return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1141                 ((ForkJoinWorkerThread) t).pool : null);
1142     }
1143 
1144     /**
1145      * Returns {@code true} if the current thread is a {@link
1146      * ForkJoinWorkerThread} executing as a ForkJoinPool computation.
1147      *
1148      * @return {@code true} if the current thread is a {@link
1149      * ForkJoinWorkerThread} executing as a ForkJoinPool computation,
1150      * or {@code false} otherwise
1151      */
inForkJoinPool()1152     public static boolean inForkJoinPool() {
1153         return Thread.currentThread() instanceof ForkJoinWorkerThread;
1154     }
1155 
1156     /**
1157      * Tries to unschedule this task for execution. This method will
1158      * typically (but is not guaranteed to) succeed if this task is
1159      * the most recently forked task by the current thread, and has
1160      * not commenced executing in another thread.  This method may be
1161      * useful when arranging alternative local processing of tasks
1162      * that could have been, but were not, stolen.
1163      *
1164      * @return {@code true} if unforked
1165      */
tryUnfork()1166     public boolean tryUnfork() {
1167         Thread t; ForkJoinPool.WorkQueue q; boolean owned;
1168         if (owned = (t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
1169             q = ((ForkJoinWorkerThread)t).workQueue;
1170         else
1171             q = ForkJoinPool.commonQueue();
1172         return (q != null && q.tryUnpush(this, owned));
1173     }
1174 
1175     /**
1176      * Returns an estimate of the number of tasks that have been
1177      * forked by the current worker thread but not yet executed. This
1178      * value may be useful for heuristic decisions about whether to
1179      * fork other tasks.
1180      *
1181      * @return the number of tasks
1182      */
getQueuedTaskCount()1183     public static int getQueuedTaskCount() {
1184         Thread t; ForkJoinPool.WorkQueue q;
1185         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
1186             q = ((ForkJoinWorkerThread)t).workQueue;
1187         else
1188             q = ForkJoinPool.commonQueue();
1189         return (q == null) ? 0 : q.queueSize();
1190     }
1191 
1192     /**
1193      * Returns an estimate of how many more locally queued tasks are
1194      * held by the current worker thread than there are other worker
1195      * threads that might steal them, or zero if this thread is not
1196      * operating in a ForkJoinPool. This value may be useful for
1197      * heuristic decisions about whether to fork other tasks. In many
1198      * usages of ForkJoinTasks, at steady state, each worker should
1199      * aim to maintain a small constant surplus (for example, 3) of
1200      * tasks, and to process computations locally if this threshold is
1201      * exceeded.
1202      *
1203      * @return the surplus number of tasks, which may be negative
1204      */
getSurplusQueuedTaskCount()1205     public static int getSurplusQueuedTaskCount() {
1206         return ForkJoinPool.getSurplusQueuedTaskCount();
1207     }
1208 
1209     // Extension methods
1210 
1211     /**
1212      * Returns the result that would be returned by {@link #join}, even
1213      * if this task completed abnormally, or {@code null} if this task
1214      * is not known to have been completed.  This method is designed
1215      * to aid debugging, as well as to support extensions. Its use in
1216      * any other context is discouraged.
1217      *
1218      * @return the result, or {@code null} if not completed
1219      */
getRawResult()1220     public abstract V getRawResult();
1221 
1222     /**
1223      * Forces the given value to be returned as a result.  This method
1224      * is designed to support extensions, and should not in general be
1225      * called otherwise.
1226      *
1227      * @param value the value
1228      */
setRawResult(V value)1229     protected abstract void setRawResult(V value);
1230 
1231     /**
1232      * Immediately performs the base action of this task and returns
1233      * true if, upon return from this method, this task is guaranteed
1234      * to have completed. This method may return false otherwise, to
1235      * indicate that this task is not necessarily complete (or is not
1236      * known to be complete), for example in asynchronous actions that
1237      * require explicit invocations of completion methods. This method
1238      * may also throw an (unchecked) exception to indicate abnormal
1239      * exit. This method is designed to support extensions, and should
1240      * not in general be called otherwise.
1241      *
1242      * @return {@code true} if this task is known to have completed normally
1243      */
exec()1244     protected abstract boolean exec();
1245 
1246     /**
1247      * Returns, but does not unschedule or execute, a task queued by
1248      * the current thread but not yet executed, if one is immediately
1249      * available. There is no guarantee that this task will actually
1250      * be polled or executed next. Conversely, this method may return
1251      * null even if a task exists but cannot be accessed without
1252      * contention with other threads.  This method is designed
1253      * primarily to support extensions, and is unlikely to be useful
1254      * otherwise.
1255      *
1256      * @return the next task, or {@code null} if none are available
1257      */
peekNextLocalTask()1258     protected static ForkJoinTask<?> peekNextLocalTask() {
1259         Thread t; ForkJoinPool.WorkQueue q;
1260         if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
1261             q = ((ForkJoinWorkerThread)t).workQueue;
1262         else
1263             q = ForkJoinPool.commonQueue();
1264         return (q == null) ? null : q.peek();
1265     }
1266 
1267     /**
1268      * Unschedules and returns, without executing, the next task
1269      * queued by the current thread but not yet executed, if the
1270      * current thread is operating in a ForkJoinPool.  This method is
1271      * designed primarily to support extensions, and is unlikely to be
1272      * useful otherwise.
1273      *
1274      * @return the next task, or {@code null} if none are available
1275      */
pollNextLocalTask()1276     protected static ForkJoinTask<?> pollNextLocalTask() {
1277         Thread t;
1278         return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1279                 ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() : null);
1280     }
1281 
1282     /**
1283      * If the current thread is operating in a ForkJoinPool,
1284      * unschedules and returns, without executing, the next task
1285      * queued by the current thread but not yet executed, if one is
1286      * available, or if not available, a task that was forked by some
1287      * other thread, if available. Availability may be transient, so a
1288      * {@code null} result does not necessarily imply quiescence of
1289      * the pool this task is operating in.  This method is designed
1290      * primarily to support extensions, and is unlikely to be useful
1291      * otherwise.
1292      *
1293      * @return a task, or {@code null} if none are available
1294      */
pollTask()1295     protected static ForkJoinTask<?> pollTask() {
1296         Thread t; ForkJoinWorkerThread w;
1297         return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1298                 (w = (ForkJoinWorkerThread)t).pool.nextTaskFor(w.workQueue) :
1299                 null);
1300     }
1301 
1302     /**
1303      * If the current thread is operating in a ForkJoinPool,
1304      * unschedules and returns, without executing, a task externally
1305      * submitted to the pool, if one is available. Availability may be
1306      * transient, so a {@code null} result does not necessarily imply
1307      * quiescence of the pool.  This method is designed primarily to
1308      * support extensions, and is unlikely to be useful otherwise.
1309      *
1310      * @return a task, or {@code null} if none are available
1311      * @since 9
1312      * @hide API from OpenJDK 9, not yet exposed on Android.
1313      */
pollSubmission()1314     protected static ForkJoinTask<?> pollSubmission() {
1315         Thread t;
1316         return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
1317                 ((ForkJoinWorkerThread)t).pool.pollSubmission() : null);
1318     }
1319 
1320     // tag operations
1321 
1322     /**
1323      * Returns the tag for this task.
1324      *
1325      * @return the tag for this task
1326      * @since 1.8
1327      */
getForkJoinTaskTag()1328     public final short getForkJoinTaskTag() {
1329         return (short)status;
1330     }
1331 
1332     /**
1333      * Atomically sets the tag value for this task and returns the old value.
1334      *
1335      * @param newValue the new tag value
1336      * @return the previous value of the tag
1337      * @since 1.8
1338      */
setForkJoinTaskTag(short newValue)1339     public final short setForkJoinTaskTag(short newValue) {
1340         for (int s;;) {
1341             if (casStatus(s = status, (s & ~SMASK) | (newValue & SMASK)))
1342                 return (short)s;
1343         }
1344     }
1345 
1346     /**
1347      * Atomically conditionally sets the tag value for this task.
1348      * Among other applications, tags can be used as visit markers
1349      * in tasks operating on graphs, as in methods that check: {@code
1350      * if (task.compareAndSetForkJoinTaskTag((short)0, (short)1))}
1351      * before processing, otherwise exiting because the node has
1352      * already been visited.
1353      *
1354      * @param expect the expected tag value
1355      * @param update the new tag value
1356      * @return {@code true} if successful; i.e., the current value was
1357      * equal to {@code expect} and was changed to {@code update}.
1358      * @since 1.8
1359      */
compareAndSetForkJoinTaskTag(short expect, short update)1360     public final boolean compareAndSetForkJoinTaskTag(short expect, short update) {
1361         for (int s;;) {
1362             if ((short)(s = status) != expect)
1363                 return false;
1364             if (casStatus(s, (s & ~SMASK) | (update & SMASK)))
1365                 return true;
1366         }
1367     }
1368 
1369     /**
1370      * Adapter for Runnables. This implements RunnableFuture
1371      * to be compliant with AbstractExecutorService constraints
1372      * when used in ForkJoinPool.
1373      */
1374     static final class AdaptedRunnable<T> extends ForkJoinTask<T>
1375         implements RunnableFuture<T> {
1376         @SuppressWarnings("serial") // Conditionally serializable
1377         final Runnable runnable;
1378         @SuppressWarnings("serial") // Conditionally serializable
1379         T result;
AdaptedRunnable(Runnable runnable, T result)1380         AdaptedRunnable(Runnable runnable, T result) {
1381             if (runnable == null) throw new NullPointerException();
1382             this.runnable = runnable;
1383             this.result = result; // OK to set this even before completion
1384         }
getRawResult()1385         public final T getRawResult() { return result; }
setRawResult(T v)1386         public final void setRawResult(T v) { result = v; }
exec()1387         public final boolean exec() { runnable.run(); return true; }
run()1388         public final void run() { invoke(); }
toString()1389         public String toString() {
1390             return super.toString() + "[Wrapped task = " + runnable + "]";
1391         }
1392         private static final long serialVersionUID = 5232453952276885070L;
1393     }
1394 
1395     /**
1396      * Adapter for Runnables without results.
1397      */
1398     static final class AdaptedRunnableAction extends ForkJoinTask<Void>
1399         implements RunnableFuture<Void> {
1400         @SuppressWarnings("serial") // Conditionally serializable
1401         final Runnable runnable;
AdaptedRunnableAction(Runnable runnable)1402         AdaptedRunnableAction(Runnable runnable) {
1403             if (runnable == null) throw new NullPointerException();
1404             this.runnable = runnable;
1405         }
getRawResult()1406         public final Void getRawResult() { return null; }
setRawResult(Void v)1407         public final void setRawResult(Void v) { }
exec()1408         public final boolean exec() { runnable.run(); return true; }
run()1409         public final void run() { invoke(); }
toString()1410         public String toString() {
1411             return super.toString() + "[Wrapped task = " + runnable + "]";
1412         }
1413         private static final long serialVersionUID = 5232453952276885070L;
1414     }
1415 
1416     /**
1417      * Adapter for Runnables in which failure forces worker exception.
1418      */
1419     static final class RunnableExecuteAction extends ForkJoinTask<Void> {
1420         @SuppressWarnings("serial") // Conditionally serializable
1421         final Runnable runnable;
RunnableExecuteAction(Runnable runnable)1422         RunnableExecuteAction(Runnable runnable) {
1423             if (runnable == null) throw new NullPointerException();
1424             this.runnable = runnable;
1425         }
getRawResult()1426         public final Void getRawResult() { return null; }
setRawResult(Void v)1427         public final void setRawResult(Void v) { }
exec()1428         public final boolean exec() { runnable.run(); return true; }
trySetException(Throwable ex)1429         int trySetException(Throwable ex) { // if a handler, invoke it
1430             int s; Thread t; java.lang.Thread.UncaughtExceptionHandler h;
1431             if (isExceptionalStatus(s = trySetThrown(ex)) &&
1432                 (h = ((t = Thread.currentThread()).
1433                       getUncaughtExceptionHandler())) != null) {
1434                 try {
1435                     h.uncaughtException(t, ex);
1436                 } catch (Throwable ignore) {
1437                 }
1438             }
1439             return s;
1440         }
1441         private static final long serialVersionUID = 5232453952276885070L;
1442     }
1443 
1444     /**
1445      * Adapter for Callables.
1446      */
1447     static final class AdaptedCallable<T> extends ForkJoinTask<T>
1448         implements RunnableFuture<T> {
1449         @SuppressWarnings("serial") // Conditionally serializable
1450         final Callable<? extends T> callable;
1451         @SuppressWarnings("serial") // Conditionally serializable
1452         T result;
AdaptedCallable(Callable<? extends T> callable)1453         AdaptedCallable(Callable<? extends T> callable) {
1454             if (callable == null) throw new NullPointerException();
1455             this.callable = callable;
1456         }
getRawResult()1457         public final T getRawResult() { return result; }
setRawResult(T v)1458         public final void setRawResult(T v) { result = v; }
exec()1459         public final boolean exec() {
1460             try {
1461                 result = callable.call();
1462                 return true;
1463             } catch (RuntimeException rex) {
1464                 throw rex;
1465             } catch (Exception ex) {
1466                 throw new RuntimeException(ex);
1467             }
1468         }
run()1469         public final void run() { invoke(); }
toString()1470         public String toString() {
1471             return super.toString() + "[Wrapped task = " + callable + "]";
1472         }
1473         private static final long serialVersionUID = 2838392045355241008L;
1474     }
1475 
1476     static final class AdaptedInterruptibleCallable<T> extends ForkJoinTask<T>
1477         implements RunnableFuture<T> {
1478         @SuppressWarnings("serial") // Conditionally serializable
1479         final Callable<? extends T> callable;
1480         transient volatile Thread runner;
1481         @SuppressWarnings("serial") // Conditionally serializable
1482         T result;
AdaptedInterruptibleCallable(Callable<? extends T> callable)1483         AdaptedInterruptibleCallable(Callable<? extends T> callable) {
1484             if (callable == null) throw new NullPointerException();
1485             this.callable = callable;
1486         }
getRawResult()1487         public final T getRawResult() { return result; }
setRawResult(T v)1488         public final void setRawResult(T v) { result = v; }
exec()1489         public final boolean exec() {
1490             Thread.interrupted();
1491             runner = Thread.currentThread();
1492             try {
1493                 if (!isDone()) // recheck
1494                     result = callable.call();
1495                 return true;
1496             } catch (RuntimeException rex) {
1497                 throw rex;
1498             } catch (Exception ex) {
1499                 throw new RuntimeException(ex);
1500             } finally {
1501                 runner = null;
1502                 Thread.interrupted();
1503             }
1504         }
run()1505         public final void run() { invoke(); }
cancel(boolean mayInterruptIfRunning)1506         public final boolean cancel(boolean mayInterruptIfRunning) {
1507             Thread t;
1508             boolean stat = super.cancel(false);
1509             if (mayInterruptIfRunning && (t = runner) != null) {
1510                 try {
1511                     t.interrupt();
1512                 } catch (Throwable ignore) {
1513                 }
1514             }
1515             return stat;
1516         }
toString()1517         public String toString() {
1518             return super.toString() + "[Wrapped task = " + callable + "]";
1519         }
1520         private static final long serialVersionUID = 2838392045355241008L;
1521     }
1522 
1523     /**
1524      * Returns a new {@code ForkJoinTask} that performs the {@code run}
1525      * method of the given {@code Runnable} as its action, and returns
1526      * a null result upon {@link #join}.
1527      *
1528      * @param runnable the runnable action
1529      * @return the task
1530      */
adapt(Runnable runnable)1531     public static ForkJoinTask<?> adapt(Runnable runnable) {
1532         return new AdaptedRunnableAction(runnable);
1533     }
1534 
1535     /**
1536      * Returns a new {@code ForkJoinTask} that performs the {@code run}
1537      * method of the given {@code Runnable} as its action, and returns
1538      * the given result upon {@link #join}.
1539      *
1540      * @param runnable the runnable action
1541      * @param result the result upon completion
1542      * @param <T> the type of the result
1543      * @return the task
1544      */
adapt(Runnable runnable, T result)1545     public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
1546         return new AdaptedRunnable<T>(runnable, result);
1547     }
1548 
1549     /**
1550      * Returns a new {@code ForkJoinTask} that performs the {@code call}
1551      * method of the given {@code Callable} as its action, and returns
1552      * its result upon {@link #join}, translating any checked exceptions
1553      * encountered into {@code RuntimeException}.
1554      *
1555      * @param callable the callable action
1556      * @param <T> the type of the callable's result
1557      * @return the task
1558      */
adapt(Callable<? extends T> callable)1559     public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
1560         return new AdaptedCallable<T>(callable);
1561     }
1562 
1563     /**
1564      * Returns a new {@code ForkJoinTask} that performs the {@code call}
1565      * method of the given {@code Callable} as its action, and returns
1566      * its result upon {@link #join}, translating any checked exceptions
1567      * encountered into {@code RuntimeException}.  Additionally,
1568      * invocations of {@code cancel} with {@code mayInterruptIfRunning
1569      * true} will attempt to interrupt the thread performing the task.
1570      *
1571      * @param callable the callable action
1572      * @param <T> the type of the callable's result
1573      * @return the task
1574      *
1575      * @since 19
1576      */
adaptInterruptible(Callable<? extends T> callable)1577     public static <T> ForkJoinTask<T> adaptInterruptible(Callable<? extends T> callable) {
1578         // https://bugs.openjdk.org/browse/JDK-8246587
1579         return new AdaptedInterruptibleCallable<T>(callable);
1580     }
1581 
1582     // Serialization support
1583 
1584     private static final long serialVersionUID = -7721805057305804111L;
1585 
1586     /**
1587      * Saves this task to a stream (that is, serializes it).
1588      *
1589      * @param s the stream
1590      * @throws java.io.IOException if an I/O error occurs
1591      * @serialData the current run status and the exception thrown
1592      * during execution, or {@code null} if none
1593      */
writeObject(java.io.ObjectOutputStream s)1594     private void writeObject(java.io.ObjectOutputStream s)
1595         throws java.io.IOException {
1596         Aux a;
1597         s.defaultWriteObject();
1598         s.writeObject((a = aux) == null ? null : a.ex);
1599     }
1600 
1601     /**
1602      * Reconstitutes this task from a stream (that is, deserializes it).
1603      * @param s the stream
1604      * @throws ClassNotFoundException if the class of a serialized object
1605      *         could not be found
1606      * @throws java.io.IOException if an I/O error occurs
1607      */
readObject(java.io.ObjectInputStream s)1608     private void readObject(java.io.ObjectInputStream s)
1609         throws java.io.IOException, ClassNotFoundException {
1610         s.defaultReadObject();
1611         Object ex = s.readObject();
1612         if (ex != null)
1613             trySetThrown((Throwable)ex);
1614     }
1615 
1616     static {
1617         U = Unsafe.getUnsafe();
1618         STATUS = U.objectFieldOffset(ForkJoinTask.class, "status");
1619         AUX = U.objectFieldOffset(ForkJoinTask.class, "aux");
1620         Class<?> dep1 = LockSupport.class; // ensure loaded
1621         Class<?> dep2 = Aux.class;
1622     }
1623 
1624 }
1625