1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import jdk.internal.misc.Unsafe; 39 40 /** 41 * A {@link ForkJoinTask} with a completion action performed when 42 * triggered and there are no remaining pending actions. 43 * CountedCompleters are in general more robust in the 44 * presence of subtask stalls and blockage than are other forms of 45 * ForkJoinTasks, but are less intuitive to program. Uses of 46 * CountedCompleter are similar to those of other completion based 47 * components (such as {@link java.nio.channels.CompletionHandler}) 48 * except that multiple <em>pending</em> completions may be necessary 49 * to trigger the completion action {@link #onCompletion(CountedCompleter)}, 50 * not just one. 51 * Unless initialized otherwise, the {@linkplain #getPendingCount pending 52 * count} starts at zero, but may be (atomically) changed using 53 * methods {@link #setPendingCount}, {@link #addToPendingCount}, and 54 * {@link #compareAndSetPendingCount}. Upon invocation of {@link 55 * #tryComplete}, if the pending action count is nonzero, it is 56 * decremented; otherwise, the completion action is performed, and if 57 * this completer itself has a completer, the process is continued 58 * with its completer. As is the case with related synchronization 59 * components such as {@link Phaser} and {@link Semaphore}, these methods 60 * affect only internal counts; they do not establish any further 61 * internal bookkeeping. In particular, the identities of pending 62 * tasks are not maintained. As illustrated below, you can create 63 * subclasses that do record some or all pending tasks or their 64 * results when needed. As illustrated below, utility methods 65 * supporting customization of completion traversals are also 66 * provided. However, because CountedCompleters provide only basic 67 * synchronization mechanisms, it may be useful to create further 68 * abstract subclasses that maintain linkages, fields, and additional 69 * support methods appropriate for a set of related usages. 70 * 71 * <p>A concrete CountedCompleter class must define method {@link 72 * #compute}, that should in most cases (as illustrated below), invoke 73 * {@code tryComplete()} once before returning. The class may also 74 * optionally override method {@link #onCompletion(CountedCompleter)} 75 * to perform an action upon normal completion, and method 76 * {@link #onExceptionalCompletion(Throwable, CountedCompleter)} to 77 * perform an action upon any exception. 78 * 79 * <p>CountedCompleters most often do not bear results, in which case 80 * they are normally declared as {@code CountedCompleter<Void>}, and 81 * will always return {@code null} as a result value. In other cases, 82 * you should override method {@link #getRawResult} to provide a 83 * result from {@code join(), invoke()}, and related methods. In 84 * general, this method should return the value of a field (or a 85 * function of one or more fields) of the CountedCompleter object that 86 * holds the result upon completion. Method {@link #setRawResult} by 87 * default plays no role in CountedCompleters. It is possible, but 88 * rarely applicable, to override this method to maintain other 89 * objects or fields holding result data. 90 * 91 * <p>A CountedCompleter that does not itself have a completer (i.e., 92 * one for which {@link #getCompleter} returns {@code null}) can be 93 * used as a regular ForkJoinTask with this added functionality. 94 * However, any completer that in turn has another completer serves 95 * only as an internal helper for other computations, so its own task 96 * status (as reported in methods such as {@link ForkJoinTask#isDone}) 97 * is arbitrary; this status changes only upon explicit invocations of 98 * {@link #complete}, {@link ForkJoinTask#cancel}, 99 * {@link ForkJoinTask#completeExceptionally(Throwable)} or upon 100 * exceptional completion of method {@code compute}. Upon any 101 * exceptional completion, the exception may be relayed to a task's 102 * completer (and its completer, and so on), if one exists and it has 103 * not otherwise already completed. Similarly, cancelling an internal 104 * CountedCompleter has only a local effect on that completer, so is 105 * not often useful. 106 * 107 * <p><b>Sample Usages.</b> 108 * 109 * <p><b>Parallel recursive decomposition.</b> CountedCompleters may 110 * be arranged in trees similar to those often used with {@link 111 * RecursiveAction}s, although the constructions involved in setting 112 * them up typically vary. Here, the completer of each task is its 113 * parent in the computation tree. Even though they entail a bit more 114 * bookkeeping, CountedCompleters may be better choices when applying 115 * a possibly time-consuming operation (that cannot be further 116 * subdivided) to each element of an array or collection; especially 117 * when the operation takes a significantly different amount of time 118 * to complete for some elements than others, either because of 119 * intrinsic variation (for example I/O) or auxiliary effects such as 120 * garbage collection. Because CountedCompleters provide their own 121 * continuations, other tasks need not block waiting to perform them. 122 * 123 * <p>For example, here is an initial version of a utility method that 124 * uses divide-by-two recursive decomposition to divide work into 125 * single pieces (leaf tasks). Even when work is split into individual 126 * calls, tree-based techniques are usually preferable to directly 127 * forking leaf tasks, because they reduce inter-thread communication 128 * and improve load balancing. In the recursive case, the second of 129 * each pair of subtasks to finish triggers completion of their parent 130 * (because no result combination is performed, the default no-op 131 * implementation of method {@code onCompletion} is not overridden). 132 * The utility method sets up the root task and invokes it (here, 133 * implicitly using the {@link ForkJoinPool#commonPool()}). It is 134 * straightforward and reliable (but not optimal) to always set the 135 * pending count to the number of child tasks and call {@code 136 * tryComplete()} immediately before returning. 137 * 138 * <pre> {@code 139 * public static <E> void forEach(E[] array, Consumer<E> action) { 140 * class Task extends CountedCompleter<Void> { 141 * final int lo, hi; 142 * Task(Task parent, int lo, int hi) { 143 * super(parent); this.lo = lo; this.hi = hi; 144 * } 145 * 146 * public void compute() { 147 * if (hi - lo >= 2) { 148 * int mid = (lo + hi) >>> 1; 149 * // must set pending count before fork 150 * setPendingCount(2); 151 * new Task(this, mid, hi).fork(); // right child 152 * new Task(this, lo, mid).fork(); // left child 153 * } 154 * else if (hi > lo) 155 * action.accept(array[lo]); 156 * tryComplete(); 157 * } 158 * } 159 * new Task(null, 0, array.length).invoke(); 160 * }}</pre> 161 * 162 * This design can be improved by noticing that in the recursive case, 163 * the task has nothing to do after forking its right task, so can 164 * directly invoke its left task before returning. (This is an analog 165 * of tail recursion removal.) Also, when the last action in a task 166 * is to fork or invoke a subtask (a "tail call"), the call to {@code 167 * tryComplete()} can be optimized away, at the cost of making the 168 * pending count look "off by one". 169 * 170 * <pre> {@code 171 * public void compute() { 172 * if (hi - lo >= 2) { 173 * int mid = (lo + hi) >>> 1; 174 * setPendingCount(1); // looks off by one, but correct! 175 * new Task(this, mid, hi).fork(); // right child 176 * new Task(this, lo, mid).compute(); // direct invoke 177 * } else { 178 * if (hi > lo) 179 * action.accept(array[lo]); 180 * tryComplete(); 181 * } 182 * }}</pre> 183 * 184 * As a further optimization, notice that the left task need not even exist. 185 * Instead of creating a new one, we can continue using the original task, 186 * and add a pending count for each fork. Additionally, because no task 187 * in this tree implements an {@link #onCompletion(CountedCompleter)} method, 188 * {@code tryComplete} can be replaced with {@link #propagateCompletion}. 189 * 190 * <pre> {@code 191 * public void compute() { 192 * int n = hi - lo; 193 * for (; n >= 2; n /= 2) { 194 * addToPendingCount(1); 195 * new Task(this, lo + n/2, lo + n).fork(); 196 * } 197 * if (n > 0) 198 * action.accept(array[lo]); 199 * propagateCompletion(); 200 * }}</pre> 201 * 202 * When pending counts can be precomputed, they can be established in 203 * the constructor: 204 * 205 * <pre> {@code 206 * public static <E> void forEach(E[] array, Consumer<E> action) { 207 * class Task extends CountedCompleter<Void> { 208 * final int lo, hi; 209 * Task(Task parent, int lo, int hi) { 210 * super(parent, 31 - Integer.numberOfLeadingZeros(hi - lo)); 211 * this.lo = lo; this.hi = hi; 212 * } 213 * 214 * public void compute() { 215 * for (int n = hi - lo; n >= 2; n /= 2) 216 * new Task(this, lo + n/2, lo + n).fork(); 217 * action.accept(array[lo]); 218 * propagateCompletion(); 219 * } 220 * } 221 * if (array.length > 0) 222 * new Task(null, 0, array.length).invoke(); 223 * }}</pre> 224 * 225 * Additional optimizations of such classes might entail specializing 226 * classes for leaf steps, subdividing by say, four, instead of two 227 * per iteration, and using an adaptive threshold instead of always 228 * subdividing down to single elements. 229 * 230 * <p><b>Searching.</b> A tree of CountedCompleters can search for a 231 * value or property in different parts of a data structure, and 232 * report a result in an {@link 233 * java.util.concurrent.atomic.AtomicReference AtomicReference} as 234 * soon as one is found. The others can poll the result to avoid 235 * unnecessary work. (You could additionally {@linkplain #cancel 236 * cancel} other tasks, but it is usually simpler and more efficient 237 * to just let them notice that the result is set and if so skip 238 * further processing.) Illustrating again with an array using full 239 * partitioning (again, in practice, leaf tasks will almost always 240 * process more than one element): 241 * 242 * <pre> {@code 243 * class Searcher<E> extends CountedCompleter<E> { 244 * final E[] array; final AtomicReference<E> result; final int lo, hi; 245 * Searcher(CountedCompleter<?> p, E[] array, AtomicReference<E> result, int lo, int hi) { 246 * super(p); 247 * this.array = array; this.result = result; this.lo = lo; this.hi = hi; 248 * } 249 * public E getRawResult() { return result.get(); } 250 * public void compute() { // similar to ForEach version 3 251 * int l = lo, h = hi; 252 * while (result.get() == null && h >= l) { 253 * if (h - l >= 2) { 254 * int mid = (l + h) >>> 1; 255 * addToPendingCount(1); 256 * new Searcher(this, array, result, mid, h).fork(); 257 * h = mid; 258 * } 259 * else { 260 * E x = array[l]; 261 * if (matches(x) && result.compareAndSet(null, x)) 262 * quietlyCompleteRoot(); // root task is now joinable 263 * break; 264 * } 265 * } 266 * tryComplete(); // normally complete whether or not found 267 * } 268 * boolean matches(E e) { ... } // return true if found 269 * 270 * public static <E> E search(E[] array) { 271 * return new Searcher<E>(null, array, new AtomicReference<E>(), 0, array.length).invoke(); 272 * } 273 * }}</pre> 274 * 275 * In this example, as well as others in which tasks have no other 276 * effects except to {@code compareAndSet} a common result, the 277 * trailing unconditional invocation of {@code tryComplete} could be 278 * made conditional ({@code if (result.get() == null) tryComplete();}) 279 * because no further bookkeeping is required to manage completions 280 * once the root task completes. 281 * 282 * <p><b>Recording subtasks.</b> CountedCompleter tasks that combine 283 * results of multiple subtasks usually need to access these results 284 * in method {@link #onCompletion(CountedCompleter)}. As illustrated in the following 285 * class (that performs a simplified form of map-reduce where mappings 286 * and reductions are all of type {@code E}), one way to do this in 287 * divide and conquer designs is to have each subtask record its 288 * sibling, so that it can be accessed in method {@code onCompletion}. 289 * This technique applies to reductions in which the order of 290 * combining left and right results does not matter; ordered 291 * reductions require explicit left/right designations. Variants of 292 * other streamlinings seen in the above examples may also apply. 293 * 294 * <pre> {@code 295 * class MyMapper<E> { E apply(E v) { ... } } 296 * class MyReducer<E> { E apply(E x, E y) { ... } } 297 * class MapReducer<E> extends CountedCompleter<E> { 298 * final E[] array; final MyMapper<E> mapper; 299 * final MyReducer<E> reducer; final int lo, hi; 300 * MapReducer<E> sibling; 301 * E result; 302 * MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper, 303 * MyReducer<E> reducer, int lo, int hi) { 304 * super(p); 305 * this.array = array; this.mapper = mapper; 306 * this.reducer = reducer; this.lo = lo; this.hi = hi; 307 * } 308 * public void compute() { 309 * if (hi - lo >= 2) { 310 * int mid = (lo + hi) >>> 1; 311 * MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid); 312 * MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi); 313 * left.sibling = right; 314 * right.sibling = left; 315 * setPendingCount(1); // only right is pending 316 * right.fork(); 317 * left.compute(); // directly execute left 318 * } 319 * else { 320 * if (hi > lo) 321 * result = mapper.apply(array[lo]); 322 * tryComplete(); 323 * } 324 * } 325 * public void onCompletion(CountedCompleter<?> caller) { 326 * if (caller != this) { 327 * MapReducer<E> child = (MapReducer<E>)caller; 328 * MapReducer<E> sib = child.sibling; 329 * if (sib == null || sib.result == null) 330 * result = child.result; 331 * else 332 * result = reducer.apply(child.result, sib.result); 333 * } 334 * } 335 * public E getRawResult() { return result; } 336 * 337 * public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) { 338 * return new MapReducer<E>(null, array, mapper, reducer, 339 * 0, array.length).invoke(); 340 * } 341 * }}</pre> 342 * 343 * Here, method {@code onCompletion} takes a form common to many 344 * completion designs that combine results. This callback-style method 345 * is triggered once per task, in either of the two different contexts 346 * in which the pending count is, or becomes, zero: (1) by a task 347 * itself, if its pending count is zero upon invocation of {@code 348 * tryComplete}, or (2) by any of its subtasks when they complete and 349 * decrement the pending count to zero. The {@code caller} argument 350 * distinguishes cases. Most often, when the caller is {@code this}, 351 * no action is necessary. Otherwise the caller argument can be used 352 * (usually via a cast) to supply a value (and/or links to other 353 * values) to be combined. Assuming proper use of pending counts, the 354 * actions inside {@code onCompletion} occur (once) upon completion of 355 * a task and its subtasks. No additional synchronization is required 356 * within this method to ensure thread safety of accesses to fields of 357 * this task or other completed tasks. 358 * 359 * <p><b>Completion Traversals.</b> If using {@code onCompletion} to 360 * process completions is inapplicable or inconvenient, you can use 361 * methods {@link #firstComplete} and {@link #nextComplete} to create 362 * custom traversals. For example, to define a MapReducer that only 363 * splits out right-hand tasks in the form of the third ForEach 364 * example, the completions must cooperatively reduce along 365 * unexhausted subtask links, which can be done as follows: 366 * 367 * <pre> {@code 368 * class MapReducer<E> extends CountedCompleter<E> { // version 2 369 * final E[] array; final MyMapper<E> mapper; 370 * final MyReducer<E> reducer; final int lo, hi; 371 * MapReducer<E> forks, next; // record subtask forks in list 372 * E result; 373 * MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper, 374 * MyReducer<E> reducer, int lo, int hi, MapReducer<E> next) { 375 * super(p); 376 * this.array = array; this.mapper = mapper; 377 * this.reducer = reducer; this.lo = lo; this.hi = hi; 378 * this.next = next; 379 * } 380 * public void compute() { 381 * int l = lo, h = hi; 382 * while (h - l >= 2) { 383 * int mid = (l + h) >>> 1; 384 * addToPendingCount(1); 385 * (forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork(); 386 * h = mid; 387 * } 388 * if (h > l) 389 * result = mapper.apply(array[l]); 390 * // process completions by reducing along and advancing subtask links 391 * for (CountedCompleter<?> c = firstComplete(); c != null; c = c.nextComplete()) { 392 * for (MapReducer t = (MapReducer)c, s = t.forks; s != null; s = t.forks = s.next) 393 * t.result = reducer.apply(t.result, s.result); 394 * } 395 * } 396 * public E getRawResult() { return result; } 397 * 398 * public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) { 399 * return new MapReducer<E>(null, array, mapper, reducer, 400 * 0, array.length, null).invoke(); 401 * } 402 * }}</pre> 403 * 404 * <p><b>Triggers.</b> Some CountedCompleters are themselves never 405 * forked, but instead serve as bits of plumbing in other designs; 406 * including those in which the completion of one or more async tasks 407 * triggers another async task. For example: 408 * 409 * <pre> {@code 410 * class HeaderBuilder extends CountedCompleter<...> { ... } 411 * class BodyBuilder extends CountedCompleter<...> { ... } 412 * class PacketSender extends CountedCompleter<...> { 413 * PacketSender(...) { super(null, 1); ... } // trigger on second completion 414 * public void compute() { } // never called 415 * public void onCompletion(CountedCompleter<?> caller) { sendPacket(); } 416 * } 417 * // sample use: 418 * PacketSender p = new PacketSender(); 419 * new HeaderBuilder(p, ...).fork(); 420 * new BodyBuilder(p, ...).fork();}</pre> 421 * 422 * @param <T> the type of the result of the completer 423 * 424 * @since 1.8 425 * @author Doug Lea 426 */ 427 public abstract class CountedCompleter<T> extends ForkJoinTask<T> { 428 private static final long serialVersionUID = 5232453752276485070L; 429 430 /** This task's completer, or null if none */ 431 final CountedCompleter<?> completer; 432 /** The number of pending tasks until completion */ 433 volatile int pending; 434 435 /** 436 * Creates a new CountedCompleter with the given completer 437 * and initial pending count. 438 * 439 * @param completer this task's completer, or {@code null} if none 440 * @param initialPendingCount the initial pending count 441 */ CountedCompleter(CountedCompleter<?> completer, int initialPendingCount)442 protected CountedCompleter(CountedCompleter<?> completer, 443 int initialPendingCount) { 444 this.completer = completer; 445 this.pending = initialPendingCount; 446 } 447 448 /** 449 * Creates a new CountedCompleter with the given completer 450 * and an initial pending count of zero. 451 * 452 * @param completer this task's completer, or {@code null} if none 453 */ CountedCompleter(CountedCompleter<?> completer)454 protected CountedCompleter(CountedCompleter<?> completer) { 455 this.completer = completer; 456 } 457 458 /** 459 * Creates a new CountedCompleter with no completer 460 * and an initial pending count of zero. 461 */ CountedCompleter()462 protected CountedCompleter() { 463 this.completer = null; 464 } 465 466 /** 467 * The main computation performed by this task. 468 */ compute()469 public abstract void compute(); 470 471 /** 472 * Performs an action when method {@link #tryComplete} is invoked 473 * and the pending count is zero, or when the unconditional 474 * method {@link #complete} is invoked. By default, this method 475 * does nothing. You can distinguish cases by checking the 476 * identity of the given caller argument. If not equal to {@code 477 * this}, then it is typically a subtask that may contain results 478 * (and/or links to other results) to combine. 479 * 480 * @param caller the task invoking this method (which may 481 * be this task itself) 482 */ onCompletion(CountedCompleter<?> caller)483 public void onCompletion(CountedCompleter<?> caller) { 484 } 485 486 /** 487 * Performs an action when method {@link 488 * #completeExceptionally(Throwable)} is invoked or method {@link 489 * #compute} throws an exception, and this task has not already 490 * otherwise completed normally. On entry to this method, this task 491 * {@link ForkJoinTask#isCompletedAbnormally}. The return value 492 * of this method controls further propagation: If {@code true} 493 * and this task has a completer that has not completed, then that 494 * completer is also completed exceptionally, with the same 495 * exception as this completer. The default implementation of 496 * this method does nothing except return {@code true}. 497 * 498 * @param ex the exception 499 * @param caller the task invoking this method (which may 500 * be this task itself) 501 * @return {@code true} if this exception should be propagated to this 502 * task's completer, if one exists 503 */ onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller)504 public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller) { 505 return true; 506 } 507 508 /** 509 * Returns the completer established in this task's constructor, 510 * or {@code null} if none. 511 * 512 * @return the completer 513 */ getCompleter()514 public final CountedCompleter<?> getCompleter() { 515 return completer; 516 } 517 518 /** 519 * Returns the current pending count. 520 * 521 * @return the current pending count 522 */ getPendingCount()523 public final int getPendingCount() { 524 return pending; 525 } 526 527 /** 528 * Sets the pending count to the given value. 529 * 530 * @param count the count 531 */ setPendingCount(int count)532 public final void setPendingCount(int count) { 533 pending = count; 534 } 535 536 /** 537 * Adds (atomically) the given value to the pending count. 538 * 539 * @param delta the value to add 540 */ addToPendingCount(int delta)541 public final void addToPendingCount(int delta) { 542 U.getAndAddInt(this, PENDING, delta); 543 } 544 545 /** 546 * Sets (atomically) the pending count to the given count only if 547 * it currently holds the given expected value. 548 * 549 * @param expected the expected value 550 * @param count the new value 551 * @return {@code true} if successful 552 */ compareAndSetPendingCount(int expected, int count)553 public final boolean compareAndSetPendingCount(int expected, int count) { 554 return U.compareAndSetInt(this, PENDING, expected, count); 555 } 556 557 // internal-only weak version weakCompareAndSetPendingCount(int expected, int count)558 final boolean weakCompareAndSetPendingCount(int expected, int count) { 559 return U.weakCompareAndSetInt(this, PENDING, expected, count); 560 } 561 562 /** 563 * If the pending count is nonzero, (atomically) decrements it. 564 * 565 * @return the initial (undecremented) pending count holding on entry 566 * to this method 567 */ decrementPendingCountUnlessZero()568 public final int decrementPendingCountUnlessZero() { 569 int c; 570 do {} while ((c = pending) != 0 && 571 !weakCompareAndSetPendingCount(c, c - 1)); 572 return c; 573 } 574 575 /** 576 * Returns the root of the current computation; i.e., this 577 * task if it has no completer, else its completer's root. 578 * 579 * @return the root of the current computation 580 */ getRoot()581 public final CountedCompleter<?> getRoot() { 582 CountedCompleter<?> a = this, p; 583 while ((p = a.completer) != null) 584 a = p; 585 return a; 586 } 587 588 /** 589 * If the pending count is nonzero, decrements the count; 590 * otherwise invokes {@link #onCompletion(CountedCompleter)} 591 * and then similarly tries to complete this task's completer, 592 * if one exists, else marks this task as complete. 593 */ tryComplete()594 public final void tryComplete() { 595 CountedCompleter<?> a = this, s = a; 596 for (int c;;) { 597 if ((c = a.pending) == 0) { 598 a.onCompletion(s); 599 if ((a = (s = a).completer) == null) { 600 s.quietlyComplete(); 601 return; 602 } 603 } 604 else if (a.weakCompareAndSetPendingCount(c, c - 1)) 605 return; 606 } 607 } 608 609 /** 610 * Equivalent to {@link #tryComplete} but does not invoke {@link 611 * #onCompletion(CountedCompleter)} along the completion path: 612 * If the pending count is nonzero, decrements the count; 613 * otherwise, similarly tries to complete this task's completer, if 614 * one exists, else marks this task as complete. This method may be 615 * useful in cases where {@code onCompletion} should not, or need 616 * not, be invoked for each completer in a computation. 617 */ propagateCompletion()618 public final void propagateCompletion() { 619 CountedCompleter<?> a = this, s; 620 for (int c;;) { 621 if ((c = a.pending) == 0) { 622 if ((a = (s = a).completer) == null) { 623 s.quietlyComplete(); 624 return; 625 } 626 } 627 else if (a.weakCompareAndSetPendingCount(c, c - 1)) 628 return; 629 } 630 } 631 632 /** 633 * Regardless of pending count, invokes 634 * {@link #onCompletion(CountedCompleter)}, marks this task as 635 * complete and further triggers {@link #tryComplete} on this 636 * task's completer, if one exists. The given rawResult is 637 * used as an argument to {@link #setRawResult} before invoking 638 * {@link #onCompletion(CountedCompleter)} or marking this task 639 * as complete; its value is meaningful only for classes 640 * overriding {@code setRawResult}. This method does not modify 641 * the pending count. 642 * 643 * <p>This method may be useful when forcing completion as soon as 644 * any one (versus all) of several subtask results are obtained. 645 * However, in the common (and recommended) case in which {@code 646 * setRawResult} is not overridden, this effect can be obtained 647 * more simply using {@link #quietlyCompleteRoot()}. 648 * 649 * @param rawResult the raw result 650 */ complete(T rawResult)651 public void complete(T rawResult) { 652 CountedCompleter<?> p; 653 setRawResult(rawResult); 654 onCompletion(this); 655 quietlyComplete(); 656 if ((p = completer) != null) 657 p.tryComplete(); 658 } 659 660 /** 661 * If this task's pending count is zero, returns this task; 662 * otherwise decrements its pending count and returns {@code null}. 663 * This method is designed to be used with {@link #nextComplete} in 664 * completion traversal loops. 665 * 666 * @return this task, if pending count was zero, else {@code null} 667 */ firstComplete()668 public final CountedCompleter<?> firstComplete() { 669 for (int c;;) { 670 if ((c = pending) == 0) 671 return this; 672 else if (weakCompareAndSetPendingCount(c, c - 1)) 673 return null; 674 } 675 } 676 677 /** 678 * If this task does not have a completer, invokes {@link 679 * ForkJoinTask#quietlyComplete} and returns {@code null}. Or, if 680 * the completer's pending count is non-zero, decrements that 681 * pending count and returns {@code null}. Otherwise, returns the 682 * completer. This method can be used as part of a completion 683 * traversal loop for homogeneous task hierarchies: 684 * 685 * <pre> {@code 686 * for (CountedCompleter<?> c = firstComplete(); 687 * c != null; 688 * c = c.nextComplete()) { 689 * // ... process c ... 690 * }}</pre> 691 * 692 * @return the completer, or {@code null} if none 693 */ nextComplete()694 public final CountedCompleter<?> nextComplete() { 695 CountedCompleter<?> p; 696 if ((p = completer) != null) 697 return p.firstComplete(); 698 else { 699 quietlyComplete(); 700 return null; 701 } 702 } 703 704 /** 705 * Equivalent to {@code getRoot().quietlyComplete()}. 706 */ quietlyCompleteRoot()707 public final void quietlyCompleteRoot() { 708 for (CountedCompleter<?> a = this, p;;) { 709 if ((p = a.completer) == null) { 710 a.quietlyComplete(); 711 return; 712 } 713 a = p; 714 } 715 } 716 717 /** 718 * If this task has not completed, attempts to process at most the 719 * given number of other unprocessed tasks for which this task is 720 * on the completion path, if any are known to exist. 721 * 722 * @param maxTasks the maximum number of tasks to process. If 723 * less than or equal to zero, then no tasks are 724 * processed. 725 */ helpComplete(int maxTasks)726 public final void helpComplete(int maxTasks) { 727 ForkJoinPool.WorkQueue q; Thread t; boolean owned; 728 if (owned = (t = Thread.currentThread()) instanceof ForkJoinWorkerThread) 729 q = ((ForkJoinWorkerThread)t).workQueue; 730 else 731 q = ForkJoinPool.commonQueue(); 732 if (q != null && maxTasks > 0) 733 q.helpComplete(this, owned, maxTasks); 734 } 735 // ForkJoinTask overrides 736 737 /** 738 * Supports ForkJoinTask exception propagation. 739 */ 740 @Override trySetException(Throwable ex)741 final int trySetException(Throwable ex) { 742 CountedCompleter<?> a = this, p = a; 743 do {} while (isExceptionalStatus(a.trySetThrown(ex)) && 744 a.onExceptionalCompletion(ex, p) && 745 (a = (p = a).completer) != null && a.status >= 0); 746 return status; 747 } 748 749 /** 750 * Implements execution conventions for CountedCompleters. 751 */ 752 @Override exec()753 protected final boolean exec() { 754 compute(); 755 return false; 756 } 757 758 /** 759 * Returns the result of the computation. By default, 760 * returns {@code null}, which is appropriate for {@code Void} 761 * actions, but in other cases should be overridden, almost 762 * always to return a field or function of a field that 763 * holds the result upon completion. 764 * 765 * @return the result of the computation 766 */ 767 @Override getRawResult()768 public T getRawResult() { return null; } 769 770 /** 771 * A method that result-bearing CountedCompleters may optionally 772 * use to help maintain result data. By default, does nothing. 773 * Overrides are not recommended. However, if this method is 774 * overridden to update existing objects or fields, then it must 775 * in general be defined to be thread-safe. 776 */ 777 @Override setRawResult(T t)778 protected void setRawResult(T t) { } 779 780 /* 781 * This class uses jdk-internal Unsafe for atomics and special 782 * memory modes, rather than VarHandles, to avoid initialization 783 * dependencies in other jdk components that require early 784 * parallelism. 785 */ 786 private static final Unsafe U; 787 private static final long PENDING; 788 static { 789 U = Unsafe.getUnsafe(); 790 PENDING = U.objectFieldOffset(CountedCompleter.class, "pending"); 791 } 792 } 793