1 /*
2  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
3  *
4  * This code is free software; you can redistribute it and/or modify it
5  * under the terms of the GNU General Public License version 2 only, as
6  * published by the Free Software Foundation.  Oracle designates this
7  * particular file as subject to the "Classpath" exception as provided
8  * by Oracle in the LICENSE file that accompanied this code.
9  *
10  * This code is distributed in the hope that it will be useful, but WITHOUT
11  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
12  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
13  * version 2 for more details (a copy is included in the LICENSE file that
14  * accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License version
17  * 2 along with this work; if not, write to the Free Software Foundation,
18  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
19  *
20  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
21  * or visit www.oracle.com if you need additional information or have any
22  * questions.
23  */
24 
25 /*
26  * This file is available under and governed by the GNU General Public
27  * License version 2 only, as published by the Free Software Foundation.
28  * However, the following notice accompanied the original version of this
29  * file:
30  *
31  * Written by Doug Lea with assistance from members of JCP JSR-166
32  * Expert Group and released to the public domain, as explained at
33  * http://creativecommons.org/publicdomain/zero/1.0/
34  */
35 
36 package java.util.concurrent;
37 
38 /**
39  * A {@link ForkJoinTask} with a completion action performed when
40  * triggered and there are no remaining pending actions.
41  * CountedCompleters are in general more robust in the
42  * presence of subtask stalls and blockage than are other forms of
43  * ForkJoinTasks, but are less intuitive to program.  Uses of
44  * CountedCompleter are similar to those of other completion based
45  * components
46  * except that multiple <em>pending</em> completions may be necessary
47  * to trigger the completion action {@link #onCompletion(CountedCompleter)},
48  * not just one.
49  * Unless initialized otherwise, the {@linkplain #getPendingCount pending
50  * count} starts at zero, but may be (atomically) changed using
51  * methods {@link #setPendingCount}, {@link #addToPendingCount}, and
52  * {@link #compareAndSetPendingCount}. Upon invocation of {@link
53  * #tryComplete}, if the pending action count is nonzero, it is
54  * decremented; otherwise, the completion action is performed, and if
55  * this completer itself has a completer, the process is continued
56  * with its completer.  As is the case with related synchronization
57  * components such as {@link java.util.concurrent.Phaser Phaser} and
58  * {@link java.util.concurrent.Semaphore Semaphore}, these methods
59  * affect only internal counts; they do not establish any further
60  * internal bookkeeping. In particular, the identities of pending
61  * tasks are not maintained. As illustrated below, you can create
62  * subclasses that do record some or all pending tasks or their
63  * results when needed.  As illustrated below, utility methods
64  * supporting customization of completion traversals are also
65  * provided. However, because CountedCompleters provide only basic
66  * synchronization mechanisms, it may be useful to create further
67  * abstract subclasses that maintain linkages, fields, and additional
68  * support methods appropriate for a set of related usages.
69  *
70  * <p>A concrete CountedCompleter class must define method {@link
71  * #compute}, which should in most cases (as illustrated below) invoke
72  * {@code tryComplete()} once before returning. The class may also
73  * optionally override method {@link #onCompletion(CountedCompleter)}
74  * to perform an action upon normal completion, and method
75  * {@link #onExceptionalCompletion(Throwable, CountedCompleter)} to
76  * perform an action upon any exception.
77  *
78  * <p>CountedCompleters most often do not bear results, in which case
79  * they are normally declared as {@code CountedCompleter<Void>}, and
80  * will always return {@code null} as a result value.  In other cases,
81  * you should override method {@link #getRawResult} to provide a
82  * result from {@code join()}, {@code invoke()}, and related methods.  In
83  * general, this method should return the value of a field (or a
84  * function of one or more fields) of the CountedCompleter object that
85  * holds the result upon completion. Method {@link #setRawResult} by
86  * default plays no role in CountedCompleters.  It is possible, but
87  * rarely applicable, to override this method to maintain other
88  * objects or fields holding result data.
89  *
90  * <p>A CountedCompleter that does not itself have a completer (i.e.,
91  * one for which {@link #getCompleter} returns {@code null}) can be
92  * used as a regular ForkJoinTask with this added functionality.
93  * However, any completer that in turn has another completer serves
94  * only as an internal helper for other computations, so its own task
95  * status (as reported in methods such as {@link ForkJoinTask#isDone})
96  * is arbitrary; this status changes only upon explicit invocations of
97  * {@link #complete}, {@link ForkJoinTask#cancel},
98  * {@link ForkJoinTask#completeExceptionally(Throwable)} or upon
99  * exceptional completion of method {@code compute}. Upon any
100  * exceptional completion, the exception may be relayed to a task's
101  * completer (and its completer, and so on), if one exists and it has
102  * not otherwise already completed. Similarly, cancelling an internal
103  * CountedCompleter has only a local effect on that completer, so is
104  * not often useful.
105  *
106  * <p><b>Sample Usages.</b>
107  *
108  * <p><b>Parallel recursive decomposition.</b> CountedCompleters may
109  * be arranged in trees similar to those often used with {@link
110  * RecursiveAction}s, although the constructions involved in setting
111  * them up typically vary. Here, the completer of each task is its
112  * parent in the computation tree. Even though they entail a bit more
113  * bookkeeping, CountedCompleters may be better choices when applying
114  * a possibly time-consuming operation (that cannot be further
115  * subdivided) to each element of an array or collection; especially
116  * when the operation takes a significantly different amount of time
117  * to complete for some elements than others, either because of
118  * intrinsic variation (for example I/O) or auxiliary effects such as
119  * garbage collection.  Because CountedCompleters provide their own
120  * continuations, other threads need not block waiting to perform
121  * them.
122  *
123  * <p>For example, here is an initial version of a class that uses
124  * divide-by-two recursive decomposition to divide work into single
125  * pieces (leaf tasks). Even when work is split into individual calls,
126  * tree-based techniques are usually preferable to directly forking
127  * leaf tasks, because they reduce inter-thread communication and
128  * improve load balancing. In the recursive case, the second of each
129  * pair of subtasks to finish triggers completion of its parent
130  * (because no result combination is performed, the default no-op
131  * implementation of method {@code onCompletion} is not overridden).
132  * A static utility method sets up the base task and invokes it
133  * (here, implicitly using the {@link ForkJoinPool#commonPool()}).
134  *
135  * <pre> {@code
136  * class MyOperation<E> { void apply(E e) { ... }  }
137  *
138  * class ForEach<E> extends CountedCompleter<Void> {
139  *
140  *   public static <E> void forEach(E[] array, MyOperation<E> op) {
141  *     new ForEach<E>(null, array, op, 0, array.length).invoke();
142  *   }
143  *
144  *   final E[] array; final MyOperation<E> op; final int lo, hi;
145  *   ForEach(CountedCompleter<?> p, E[] array, MyOperation<E> op, int lo, int hi) {
146  *     super(p);
147  *     this.array = array; this.op = op; this.lo = lo; this.hi = hi;
148  *   }
149  *
150  *   public void compute() { // version 1
151  *     if (hi - lo >= 2) {
152  *       int mid = (lo + hi) >>> 1;
153  *       setPendingCount(2); // must set pending count before fork
154  *       new ForEach<E>(this, array, op, mid, hi).fork(); // right child
155  *       new ForEach<E>(this, array, op, lo, mid).fork(); // left child
156  *     }
157  *     else if (hi > lo)
158  *       op.apply(array[lo]);
159  *     tryComplete();
160  *   }
161  * }}</pre>
162  *
163  * This design can be improved by noticing that in the recursive case,
164  * the task has nothing to do after forking its right task, so can
165  * directly invoke its left task before returning. (This is an analog
166  * of tail recursion removal.)  Also, because the task returns upon
167  * executing its left task (rather than falling through to invoke
168  * {@code tryComplete}) the pending count is set to one:
169  *
170  * <pre> {@code
171  * class ForEach<E> ... {
172  *   ...
173  *   public void compute() { // version 2
174  *     if (hi - lo >= 2) {
175  *       int mid = (lo + hi) >>> 1;
176  *       setPendingCount(1); // only one pending
177  *       new ForEach<E>(this, array, op, mid, hi).fork(); // right child
178  *       new ForEach<E>(this, array, op, lo, mid).compute(); // direct invoke
179  *     }
180  *     else {
181  *       if (hi > lo)
182  *         op.apply(array[lo]);
183  *       tryComplete();
184  *     }
185  *   }
186  * }}</pre>
187  *
188  * As a further optimization, notice that the left task need not even exist.
189  * Instead of creating a new one, we can iterate using the original task,
190  * and add a pending count for each fork.  Additionally, because no task
191  * in this tree implements an {@link #onCompletion(CountedCompleter)} method,
192  * {@code tryComplete()} can be replaced with {@link #propagateCompletion}.
193  *
194  * <pre> {@code
195  * class ForEach<E> ... {
196  *   ...
197  *   public void compute() { // version 3
198  *     int l = lo, h = hi;
199  *     while (h - l >= 2) {
200  *       int mid = (l + h) >>> 1;
201  *       addToPendingCount(1);
202  *       new ForEach<E>(this, array, op, mid, h).fork(); // right child
203  *       h = mid;
204  *     }
205  *     if (h > l)
206  *       op.apply(array[l]);
207  *     propagateCompletion();
208  *   }
209  * }}</pre>
210  *
211  * Additional optimizations of such classes might entail precomputing
212  * pending counts so that they can be established in constructors,
213  * specializing classes for leaf steps, subdividing by say, four,
214  * instead of two per iteration, and using an adaptive threshold
215  * instead of always subdividing down to single elements.
216  *
217  * <p><b>Searching.</b> A tree of CountedCompleters can search for a
218  * value or property in different parts of a data structure, and
219  * report a result in an {@link
220  * java.util.concurrent.atomic.AtomicReference AtomicReference} as
221  * soon as one is found. The others can poll the result to avoid
222  * unnecessary work. (You could additionally {@linkplain #cancel
223  * cancel} other tasks, but it is usually simpler and more efficient
224  * to just let them notice that the result is set and if so skip
225  * further processing.)  Illustrating again with an array using full
226  * partitioning (again, in practice, leaf tasks will almost always
227  * process more than one element):
228  *
229  * <pre> {@code
230  * class Searcher<E> extends CountedCompleter<E> {
231  *   final E[] array; final AtomicReference<E> result; final int lo, hi;
232  *   Searcher(CountedCompleter<?> p, E[] array, AtomicReference<E> result, int lo, int hi) {
233  *     super(p);
234  *     this.array = array; this.result = result; this.lo = lo; this.hi = hi;
235  *   }
236  *   public E getRawResult() { return result.get(); }
237  *   public void compute() { // similar to ForEach version 3
238  *     int l = lo, h = hi;
239  *     while (result.get() == null && h >= l) {
240  *       if (h - l >= 2) {
241  *         int mid = (l + h) >>> 1;
242  *         addToPendingCount(1);
243  *         new Searcher<E>(this, array, result, mid, h).fork();
244  *         h = mid;
245  *       }
246  *       else {
247  *         E x = array[l];
248  *         if (matches(x) && result.compareAndSet(null, x))
249  *           quietlyCompleteRoot(); // root task is now joinable
250  *         break;
251  *       }
252  *     }
253  *     tryComplete(); // normally complete whether or not found
254  *   }
255  *   boolean matches(E e) { ... } // return true if found
256  *
257  *   public static <E> E search(E[] array) {
258  *       return new Searcher<E>(null, array, new AtomicReference<E>(), 0, array.length).invoke();
259  *   }
260  * }}</pre>
261  *
262  * In this example, as well as others in which tasks have no other
263  * effects except to {@code compareAndSet} a common result, the
264  * trailing unconditional invocation of {@code tryComplete} could be
265  * made conditional ({@code if (result.get() == null) tryComplete();})
266  * because no further bookkeeping is required to manage completions
267  * once the root task completes.
268  *
269  * <p><b>Recording subtasks.</b> CountedCompleter tasks that combine
270  * results of multiple subtasks usually need to access these results
271  * in method {@link #onCompletion(CountedCompleter)}. As illustrated in the following
272  * class (that performs a simplified form of map-reduce where mappings
273  * and reductions are all of type {@code E}), one way to do this in
274  * divide and conquer designs is to have each subtask record its
275  * sibling, so that it can be accessed in method {@code onCompletion}.
276  * This technique applies to reductions in which the order of
277  * combining left and right results does not matter; ordered
278  * reductions require explicit left/right designations.  Variants of
279  * other streamlinings seen in the above examples may also apply.
280  *
281  * <pre> {@code
282  * class MyMapper<E> { E apply(E v) {  ...  } }
283  * class MyReducer<E> { E apply(E x, E y) {  ...  } }
284  * class MapReducer<E> extends CountedCompleter<E> {
285  *   final E[] array; final MyMapper<E> mapper;
286  *   final MyReducer<E> reducer; final int lo, hi;
287  *   MapReducer<E> sibling;
288  *   E result;
289  *   MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
290  *              MyReducer<E> reducer, int lo, int hi) {
291  *     super(p);
292  *     this.array = array; this.mapper = mapper;
293  *     this.reducer = reducer; this.lo = lo; this.hi = hi;
294  *   }
295  *   public void compute() {
296  *     if (hi - lo >= 2) {
297  *       int mid = (lo + hi) >>> 1;
298  *       MapReducer<E> left = new MapReducer<E>(this, array, mapper, reducer, lo, mid);
299  *       MapReducer<E> right = new MapReducer<E>(this, array, mapper, reducer, mid, hi);
300  *       left.sibling = right;
301  *       right.sibling = left;
302  *       setPendingCount(1); // only right is pending
303  *       right.fork();
304  *       left.compute();     // directly execute left
305  *     }
306  *     else {
307  *       if (hi > lo)
308  *           result = mapper.apply(array[lo]);
309  *       tryComplete();
310  *     }
311  *   }
312  *   public void onCompletion(CountedCompleter<?> caller) {
313  *     if (caller != this) {
314  *       MapReducer<E> child = (MapReducer<E>)caller;
315  *       MapReducer<E> sib = child.sibling;
316  *       if (sib == null || sib.result == null)
317  *         result = child.result;
318  *       else
319  *         result = reducer.apply(child.result, sib.result);
320  *     }
321  *   }
322  *   public E getRawResult() { return result; }
323  *
324  *   public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
325  *     return new MapReducer<E>(null, array, mapper, reducer,
326  *                              0, array.length).invoke();
327  *   }
328  * }}</pre>
329  *
330  * Here, method {@code onCompletion} takes a form common to many
331  * completion designs that combine results. This callback-style method
332  * is triggered once per task, in either of the two different contexts
333  * in which the pending count is, or becomes, zero: (1) by a task
334  * itself, if its pending count is zero upon invocation of {@code
335  * tryComplete}, or (2) by any of its subtasks when they complete and
336  * decrement the pending count to zero. The {@code caller} argument
337  * distinguishes cases.  Most often, when the caller is {@code this},
338  * no action is necessary. Otherwise the caller argument can be used
339  * (usually via a cast) to supply a value (and/or links to other
340  * values) to be combined.  Assuming proper use of pending counts, the
341  * actions inside {@code onCompletion} occur (once) upon completion of
342  * a task and its subtasks. No additional synchronization is required
343  * within this method to ensure thread safety of accesses to fields of
344  * this task or other completed tasks.
345  *
346  * <p><b>Completion Traversals</b>. If using {@code onCompletion} to
347  * process completions is inapplicable or inconvenient, you can use
348  * methods {@link #firstComplete} and {@link #nextComplete} to create
349  * custom traversals.  For example, to define a MapReducer that only
350  * splits out right-hand tasks in the form of the third ForEach
351  * example, the completions must cooperatively reduce along
352  * unexhausted subtask links, which can be done as follows:
353  *
354  * <pre> {@code
355  * class MapReducer<E> extends CountedCompleter<E> { // version 2
356  *   final E[] array; final MyMapper<E> mapper;
357  *   final MyReducer<E> reducer; final int lo, hi;
358  *   MapReducer<E> forks, next; // record subtask forks in list
359  *   E result;
360  *   MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
361  *              MyReducer<E> reducer, int lo, int hi, MapReducer<E> next) {
362  *     super(p);
363  *     this.array = array; this.mapper = mapper;
364  *     this.reducer = reducer; this.lo = lo; this.hi = hi;
365  *     this.next = next;
366  *   }
367  *   public void compute() {
368  *     int l = lo, h = hi;
369  *     while (h - l >= 2) {
370  *       int mid = (l + h) >>> 1;
371  *       addToPendingCount(1);
372  *       (forks = new MapReducer<E>(this, array, mapper, reducer, mid, h, forks)).fork();
373  *       h = mid;
374  *     }
375  *     if (h > l)
376  *       result = mapper.apply(array[l]);
377  *     // process completions by reducing along and advancing subtask links
378  *     for (CountedCompleter<?> c = firstComplete(); c != null; c = c.nextComplete()) {
379  *       for (MapReducer<E> t = (MapReducer<E>)c, s = t.forks; s != null; s = t.forks = s.next)
380  *         t.result = reducer.apply(t.result, s.result);
381  *     }
382  *   }
383  *   public E getRawResult() { return result; }
384  *
385  *   public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
386  *     return new MapReducer<E>(null, array, mapper, reducer,
387  *                              0, array.length, null).invoke();
388  *   }
389  * }}</pre>
390  *
391  * <p><b>Triggers.</b> Some CountedCompleters are themselves never
392  * forked, but instead serve as bits of plumbing in other designs;
393  * including those in which the completion of one or more async tasks
394  * triggers another async task. For example:
395  *
396  * <pre> {@code
397  * class HeaderBuilder extends CountedCompleter<...> { ... }
398  * class BodyBuilder extends CountedCompleter<...> { ... }
399  * class PacketSender extends CountedCompleter<...> {
400  *   PacketSender(...) { super(null, 1); ... } // trigger on second completion
401  *   public void compute() { } // never called
402  *   public void onCompletion(CountedCompleter<?> caller) { sendPacket(); }
403  * }
404  * // sample use:
405  * PacketSender p = new PacketSender();
406  * new HeaderBuilder(p, ...).fork();
407  * new BodyBuilder(p, ...).fork();}</pre>
408  *
409  * @since 1.8
410  * @author Doug Lea
411  */
412 public abstract class CountedCompleter<T> extends ForkJoinTask<T> {
413     private static final long serialVersionUID = 5232453752276485070L;
414 
415     /** This task's completer, or null if none */
416     final CountedCompleter<?> completer;
417     /** The number of pending tasks until completion */
418     volatile int pending;
419 
420     /**
421      * Creates a new CountedCompleter with the given completer
422      * and initial pending count.
423      *
424      * @param completer this task's completer, or {@code null} if none
425      * @param initialPendingCount the initial pending count
426      */
427     protected CountedCompleter(CountedCompleter<?> completer,
428                                int initialPendingCount) {
429         this.completer = completer;
430         this.pending = initialPendingCount;
431     }
432 
433     /**
434      * Creates a new CountedCompleter with the given completer
435      * and an initial pending count of zero.
436      *
437      * @param completer this task's completer, or {@code null} if none
438      */
439     protected CountedCompleter(CountedCompleter<?> completer) {
440         this.completer = completer;
441     }
442 
443     /**
444      * Creates a new CountedCompleter with no completer
445      * and an initial pending count of zero.
446      */
447     protected CountedCompleter() {
448         this.completer = null;
449     }
450 
451     /**
452      * The main computation performed by this task.
453      */
454     public abstract void compute();
455 
456     /**
457      * Performs an action when method {@link #tryComplete} is invoked
458      * and the pending count is zero, or when the unconditional
459      * method {@link #complete} is invoked.  By default, this method
460      * does nothing. You can distinguish cases by checking the
461      * identity of the given caller argument. If not equal to {@code
462      * this}, then it is typically a subtask that may contain results
463      * (and/or links to other results) to combine.
464      *
465      * @param caller the task invoking this method (which may
466      * be this task itself)
467      */
468     public void onCompletion(CountedCompleter<?> caller) {
469     }
470 
471     /**
472      * Performs an action when method {@link
473      * #completeExceptionally(Throwable)} is invoked or method {@link
474      * #compute} throws an exception, and this task has not already
475      * otherwise completed normally. On entry to this method, this task
476      * {@link ForkJoinTask#isCompletedAbnormally}.  The return value
477      * of this method controls further propagation: If {@code true}
478      * and this task has a completer that has not completed, then that
479      * completer is also completed exceptionally, with the same
480      * exception as this completer.  The default implementation of
481      * this method does nothing except return {@code true}.
482      *
483      * @param ex the exception
484      * @param caller the task invoking this method (which may
485      * be this task itself)
486      * @return {@code true} if this exception should be propagated to this
487      * task's completer, if one exists
488      */
489     public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller) {
490         return true;
491     }
492 
493     /**
494      * Returns the completer established in this task's constructor,
495      * or {@code null} if none.
496      *
497      * @return the completer
498      */
499     public final CountedCompleter<?> getCompleter() {
500         return completer;
501     }
502 
503     /**
504      * Returns the current pending count.
505      *
506      * @return the current pending count
507      */
508     public final int getPendingCount() {
509         return pending;
510     }
511 
512     /**
513      * Sets the pending count to the given value.
514      *
515      * @param count the count
516      */
517     public final void setPendingCount(int count) {
518         pending = count;
519     }
520 
521     /**
522      * Adds (atomically) the given value to the pending count.
523      *
524      * @param delta the value to add
525      */
526     public final void addToPendingCount(int delta) {
527         U.getAndAddInt(this, PENDING, delta);
528     }
529 
530     /**
531      * Sets (atomically) the pending count to the given count only if
532      * it currently holds the given expected value.
533      *
534      * @param expected the expected value
535      * @param count the new value
536      * @return {@code true} if successful
537      */
538     public final boolean compareAndSetPendingCount(int expected, int count) {
539         return U.compareAndSwapInt(this, PENDING, expected, count);
540     }
541 
542     /**
543      * If the pending count is nonzero, (atomically) decrements it.
544      *
545      * @return the initial (undecremented) pending count holding on entry
546      * to this method
547      */
548     public final int decrementPendingCountUnlessZero() {
549         int c;
550         do {} while ((c = pending) != 0 &&
551                      !U.compareAndSwapInt(this, PENDING, c, c - 1));
552         return c;
553     }
554 
555     /**
556      * Returns the root of the current computation; i.e., this
557      * task if it has no completer, else its completer's root.
558      *
559      * @return the root of the current computation
560      */
561     public final CountedCompleter<?> getRoot() {
562         CountedCompleter<?> a = this, p;
563         while ((p = a.completer) != null)
564             a = p;
565         return a;
566     }
567 
568     /**
569      * If the pending count is nonzero, decrements the count;
570      * otherwise invokes {@link #onCompletion(CountedCompleter)}
571      * and then similarly tries to complete this task's completer,
572      * if one exists, else marks this task as complete.
573      */
574     public final void tryComplete() {
575         CountedCompleter<?> a = this, s = a;
576         for (int c;;) {
577             if ((c = a.pending) == 0) {
578                 a.onCompletion(s);
579                 if ((a = (s = a).completer) == null) {
580                     s.quietlyComplete();
581                     return;
582                 }
583             }
584             else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
585                 return;
586         }
587     }
588 
589     /**
590      * Equivalent to {@link #tryComplete} but does not invoke {@link
591      * #onCompletion(CountedCompleter)} along the completion path:
592      * If the pending count is nonzero, decrements the count;
593      * otherwise, similarly tries to complete this task's completer, if
594      * one exists, else marks this task as complete. This method may be
595      * useful in cases where {@code onCompletion} should not, or need
596      * not, be invoked for each completer in a computation.
597      */
598     public final void propagateCompletion() {
599         CountedCompleter<?> a = this, s = a;
600         for (int c;;) {
601             if ((c = a.pending) == 0) {
602                 if ((a = (s = a).completer) == null) {
603                     s.quietlyComplete();
604                     return;
605                 }
606             }
607             else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
608                 return;
609         }
610     }
611 
612     /**
613      * Regardless of pending count, invokes
614      * {@link #onCompletion(CountedCompleter)}, marks this task as
615      * complete and further triggers {@link #tryComplete} on this
616      * task's completer, if one exists.  The given rawResult is
617      * used as an argument to {@link #setRawResult} before invoking
618      * {@link #onCompletion(CountedCompleter)} or marking this task
619      * as complete; its value is meaningful only for classes
620      * overriding {@code setRawResult}.  This method does not modify
621      * the pending count.
622      *
623      * <p>This method may be useful when forcing completion as soon as
624      * any one (versus all) of several subtask results are obtained.
625      * However, in the common (and recommended) case in which {@code
626      * setRawResult} is not overridden, this effect can be obtained
627      * more simply using {@link #quietlyCompleteRoot()}.
628      *
629      * @param rawResult the raw result
630      */
631     public void complete(T rawResult) {
632         CountedCompleter<?> p;
633         setRawResult(rawResult);
634         onCompletion(this);
635         quietlyComplete();
636         if ((p = completer) != null)
637             p.tryComplete();
638     }
639 
640     /**
641      * If this task's pending count is zero, returns this task;
642      * otherwise decrements its pending count and returns {@code null}.
643      * This method is designed to be used with {@link #nextComplete} in
644      * completion traversal loops.
645      *
646      * @return this task, if pending count was zero, else {@code null}
647      */
648     public final CountedCompleter<?> firstComplete() {
649         for (int c;;) {
650             if ((c = pending) == 0)
651                 return this;
652             else if (U.compareAndSwapInt(this, PENDING, c, c - 1))
653                 return null;
654         }
655     }
656 
657     /**
658      * If this task does not have a completer, invokes {@link
659      * ForkJoinTask#quietlyComplete} and returns {@code null}.  Or, if
660      * the completer's pending count is non-zero, decrements that
661      * pending count and returns {@code null}.  Otherwise, returns the
662      * completer.  This method can be used as part of a completion
663      * traversal loop for homogeneous task hierarchies:
664      *
665      * <pre> {@code
666      * for (CountedCompleter<?> c = firstComplete();
667      *      c != null;
668      *      c = c.nextComplete()) {
669      *   // ... process c ...
670      * }}</pre>
671      *
672      * @return the completer, or {@code null} if none
673      */
674     public final CountedCompleter<?> nextComplete() {
675         CountedCompleter<?> p;
676         if ((p = completer) != null)
677             return p.firstComplete();
678         else {
679             quietlyComplete();
680             return null;
681         }
682     }
683 
684     /**
685      * Equivalent to {@code getRoot().quietlyComplete()}.
686      */
687     public final void quietlyCompleteRoot() {
688         for (CountedCompleter<?> a = this, p;;) {
689             if ((p = a.completer) == null) {
690                 a.quietlyComplete();
691                 return;
692             }
693             a = p;
694         }
695     }
696 
697     /**
698      * If this task has not completed, attempts to process at most the
699      * given number of other unprocessed tasks for which this task is
700      * on the completion path, if any are known to exist.
701      *
702      * @param maxTasks the maximum number of tasks to process.  If
703      *                 less than or equal to zero, then no tasks are
704      *                 processed.
705      */
706     public final void helpComplete(int maxTasks) {
707         Thread t; ForkJoinWorkerThread wt;
708         if (maxTasks > 0 && status >= 0) {
709             if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
710                 (wt = (ForkJoinWorkerThread)t).pool.
711                     helpComplete(wt.workQueue, this, maxTasks);
712             else
713                 ForkJoinPool.common.externalHelpComplete(this, maxTasks);
714         }
715     }
716 
717     /**
718      * Supports ForkJoinTask exception propagation.
719      */
720     void internalPropagateException(Throwable ex) {
721         CountedCompleter<?> a = this, s = a;
722         while (a.onExceptionalCompletion(ex, s) &&
723                (a = (s = a).completer) != null && a.status >= 0 &&
724                a.recordExceptionalCompletion(ex) == EXCEPTIONAL)
725             ;
726     }
727 
728     /**
729      * Implements execution conventions for CountedCompleters.
730      */
731     protected final boolean exec() {
732         compute();
733         return false;
734     }
735 
736     /**
737      * Returns the result of the computation.  By default,
738      * returns {@code null}, which is appropriate for {@code Void}
739      * actions, but in other cases should be overridden, almost
740      * always to return a field or function of a field that
741      * holds the result upon completion.
742      *
743      * @return the result of the computation
744      */
745     public T getRawResult() { return null; }
746 
747     /**
748      * A method that result-bearing CountedCompleters may optionally
749      * use to help maintain result data.  By default, does nothing.
750      * Overrides are not recommended. However, if this method is
751      * overridden to update existing objects or fields, then it must
752      * in general be defined to be thread-safe.
753      */
754     protected void setRawResult(T t) { }
755 
756     // Unsafe mechanics
757     private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
758     private static final long PENDING;
759     static {
760         try {
761             PENDING = U.objectFieldOffset
762                 (CountedCompleter.class.getDeclaredField("pending"));
763         } catch (ReflectiveOperationException e) {
764             throw new Error(e);
765         }
766     }
767 }
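
Usage sketch (not part of the original source file). The "Triggers" sample in the class documentation elides the builder classes, so the following is one possible runnable reading of it, under stated assumptions. The names TriggerDemo, PartBuilder and slots, and the strings "HDR" and "BODY", are hypothetical stand-ins for HeaderBuilder, BodyBuilder and their output. The sender starts with a pending count of 1, so the first builder to finish only decrements the count and the second one triggers onCompletion.

```java
import java.util.concurrent.CountedCompleter;

class TriggerDemo {
    // Hypothetical stand-in for HeaderBuilder/BodyBuilder: fills one slot,
    // then signals its completer.
    static final class PartBuilder extends CountedCompleter<Void> {
        final String[] slots; final int index; final String text;
        PartBuilder(CountedCompleter<?> trigger, String[] slots, int index, String text) {
            super(trigger);
            this.slots = slots; this.index = index; this.text = text;
        }
        @Override public void compute() {
            slots[index] = text;
            tryComplete(); // own pending count is 0, so this proceeds to the completer
        }
    }

    static final class PacketSender extends CountedCompleter<String> {
        final String[] slots = new String[2];
        String packet;
        PacketSender() { super(null, 1); } // pending count 1: trigger on second completion
        @Override public void compute() { } // never called; this task is not forked
        @Override public void onCompletion(CountedCompleter<?> caller) {
            packet = slots[0] + "|" + slots[1]; // both slots are filled by now
        }
        @Override public String getRawResult() { return packet; }
    }

    static String run() {
        PacketSender sender = new PacketSender();
        new PartBuilder(sender, sender.slots, 0, "HDR").fork();  // runs in the common pool
        new PartBuilder(sender, sender.slots, 1, "BODY").fork();
        return sender.join(); // returns once the second builder has completed the sender
    }
}
```

Because the sender has no completer, the second builder's tryComplete() marks it as complete after onCompletion runs, which is what allows join() to return.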
768
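
Usage sketch (not part of the original source file). The third ForEach version in the class documentation leaves out the class scaffolding, so here is a small self-contained variant of the same pattern, assuming an int[] input and an AtomicLong accumulator. The class SumSquares and the method sumOfSquares are hypothetical names chosen for illustration. Each loop iteration adds one to the pending count before forking the right half, and propagateCompletion() is used because no task in the tree overrides onCompletion.

```java
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.atomic.AtomicLong;

class SumSquares extends CountedCompleter<Void> {
    final int[] array; final AtomicLong total; final int lo, hi;

    SumSquares(CountedCompleter<?> parent, int[] array, AtomicLong total, int lo, int hi) {
        super(parent);
        this.array = array; this.total = total; this.lo = lo; this.hi = hi;
    }

    @Override public void compute() {
        int l = lo, h = hi;
        while (h - l >= 2) {
            int mid = (l + h) >>> 1;
            addToPendingCount(1);                              // one more forked child to wait for
            new SumSquares(this, array, total, mid, h).fork(); // right half runs asynchronously
            h = mid;                                           // this task keeps the left half
        }
        if (h > l)
            total.addAndGet((long) array[l] * array[l]);       // leaf work on a single element
        propagateCompletion(); // decrement or walk up; onCompletion is never invoked
    }

    static long sumOfSquares(int[] array) {
        AtomicLong total = new AtomicLong();
        // invoke() returns only after the root has been completed by its last subtask
        new SumSquares(null, array, total, 0, array.length).invoke();
        return total.get();
    }
}
```

A call such as SumSquares.sumOfSquares(new int[] {1, 2, 3, 4}) runs in the common pool and returns 30. Splitting down to single elements mirrors the documentation sample; real code would normally stop at a larger threshold.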