1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.lang.invoke.MethodHandles; 39 import java.lang.invoke.VarHandle; 40 import java.util.concurrent.locks.LockSupport; 41 import java.util.function.BiConsumer; 42 import java.util.function.BiFunction; 43 import java.util.function.Consumer; 44 import java.util.function.Function; 45 import java.util.function.Supplier; 46 import java.util.Objects; 47 48 // Android-note: Class javadoc changed to remove references to hidden OpenJDK 9 methods. 49 50 /** 51 * A {@link Future} that may be explicitly completed (setting its 52 * value and status), and may be used as a {@link CompletionStage}, 53 * supporting dependent functions and actions that trigger upon its 54 * completion. 55 * 56 * <p>When two or more threads attempt to 57 * {@link #complete complete}, 58 * {@link #completeExceptionally completeExceptionally}, or 59 * {@link #cancel cancel} 60 * a CompletableFuture, only one of them succeeds. 61 * 62 * <p>In addition to these and related methods for directly 63 * manipulating status and results, CompletableFuture implements 64 * interface {@link CompletionStage} with the following policies: <ul> 65 * 66 * <li>Actions supplied for dependent completions of 67 * <em>non-async</em> methods may be performed by the thread that 68 * completes the current CompletableFuture, or by any other caller of 69 * a completion method. 70 * 71 * <li>All <em>async</em> methods without an explicit Executor 72 * argument are performed using the {@link ForkJoinPool#commonPool()} 73 * (unless it does not support a parallelism level of at least two, in 74 * which case, a new Thread is created to run each task). 75 * To simplify monitoring, debugging, 76 * and tracking, all generated asynchronous tasks are instances of the 77 * marker interface {@link AsynchronousCompletionTask}. Operations 78 * with time-delays can use adapter methods defined in this class, for 79 * example: {@code supplyAsync(supplier, delayedExecutor(timeout, 80 * timeUnit))}. To support methods with delays and timeouts, this 81 * class maintains at most one daemon thread for triggering and 82 * cancelling actions, not for running them. 83 * 84 * <li>All CompletionStage methods are implemented independently of 85 * other public methods, so the behavior of one method is not impacted 86 * by overrides of others in subclasses. 87 * 88 * </ul> 89 * 90 * <p>CompletableFuture also implements {@link Future} with the following 91 * policies: <ul> 92 * 93 * <li>Since (unlike {@link FutureTask}) this class has no direct 94 * control over the computation that causes it to be completed, 95 * cancellation is treated as just another form of exceptional 96 * completion. Method {@link #cancel cancel} has the same effect as 97 * {@code completeExceptionally(new CancellationException())}. Method 98 * {@link #isCompletedExceptionally} can be used to determine if a 99 * CompletableFuture completed in any exceptional fashion. 100 * 101 * <li>In case of exceptional completion with a CompletionException, 102 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an 103 * {@link ExecutionException} with the same cause as held in the 104 * corresponding CompletionException. To simplify usage in most 105 * contexts, this class also defines methods {@link #join()} and 106 * {@link #getNow} that instead throw the CompletionException directly 107 * in these cases. 108 * </ul> 109 * 110 * <p>Arguments used to pass a completion result (that is, for 111 * parameters of type {@code T}) for methods accepting them may be 112 * null, but passing a null value for any other parameter will result 113 * in a {@link NullPointerException} being thrown. 114 * 115 * @author Doug Lea 116 * @param <T> The result type returned by this future's {@code join} 117 * and {@code get} methods 118 * @since 1.8 119 */ 120 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { 121 122 /* 123 * Overview: 124 * 125 * A CompletableFuture may have dependent completion actions, 126 * collected in a linked stack. It atomically completes by CASing 127 * a result field, and then pops off and runs those actions. This 128 * applies across normal vs exceptional outcomes, sync vs async 129 * actions, binary triggers, and various forms of completions. 130 * 131 * Non-nullness of volatile field "result" indicates done. It may 132 * be set directly if known to be thread-confined, else via CAS. 133 * An AltResult is used to box null as a result, as well as to 134 * hold exceptions. Using a single field makes completion simple 135 * to detect and trigger. Result encoding and decoding is 136 * straightforward but tedious and adds to the sprawl of trapping 137 * and associating exceptions with targets. Minor simplifications 138 * rely on (static) NIL (to box null results) being the only 139 * AltResult with a null exception field, so we don't usually need 140 * explicit comparisons. Even though some of the generics casts 141 * are unchecked (see SuppressWarnings annotations), they are 142 * placed to be appropriate even if checked. 143 * 144 * Dependent actions are represented by Completion objects linked 145 * as Treiber stacks headed by field "stack". There are Completion 146 * classes for each kind of action, grouped into: 147 * - single-input (UniCompletion), 148 * - two-input (BiCompletion), 149 * - projected (BiCompletions using exactly one of two inputs), 150 * - shared (CoCompletion, used by the second of two sources), 151 * - zero-input source actions, 152 * - Signallers that unblock waiters. 153 * Class Completion extends ForkJoinTask to enable async execution 154 * (adding no space overhead because we exploit its "tag" methods 155 * to maintain claims). It is also declared as Runnable to allow 156 * usage with arbitrary executors. 157 * 158 * Support for each kind of CompletionStage relies on a separate 159 * class, along with two CompletableFuture methods: 160 * 161 * * A Completion class with name X corresponding to function, 162 * prefaced with "Uni", "Bi", or "Or". Each class contains 163 * fields for source(s), actions, and dependent. They are 164 * boringly similar, differing from others only with respect to 165 * underlying functional forms. We do this so that users don't 166 * encounter layers of adapters in common usages. 167 * 168 * * Boolean CompletableFuture method x(...) (for example 169 * biApply) takes all of the arguments needed to check that an 170 * action is triggerable, and then either runs the action or 171 * arranges its async execution by executing its Completion 172 * argument, if present. The method returns true if known to be 173 * complete. 174 * 175 * * Completion method tryFire(int mode) invokes the associated x 176 * method with its held arguments, and on success cleans up. 177 * The mode argument allows tryFire to be called twice (SYNC, 178 * then ASYNC); the first to screen and trap exceptions while 179 * arranging to execute, and the second when called from a task. 180 * (A few classes are not used async so take slightly different 181 * forms.) The claim() callback suppresses function invocation 182 * if already claimed by another thread. 183 * 184 * * Some classes (for example UniApply) have separate handling 185 * code for when known to be thread-confined ("now" methods) and 186 * for when shared (in tryFire), for efficiency. 187 * 188 * * CompletableFuture method xStage(...) is called from a public 189 * stage method of CompletableFuture f. It screens user 190 * arguments and invokes and/or creates the stage object. If 191 * not async and already triggerable, the action is run 192 * immediately. Otherwise a Completion c is created, and 193 * submitted to the executor if triggerable, or pushed onto f's 194 * stack if not. Completion actions are started via c.tryFire. 195 * We recheck after pushing to a source future's stack to cover 196 * possible races if the source completes while pushing. 197 * Classes with two inputs (for example BiApply) deal with races 198 * across both while pushing actions. The second completion is 199 * a CoCompletion pointing to the first, shared so that at most 200 * one performs the action. The multiple-arity methods allOf 201 * does this pairwise to form trees of completions. Method 202 * anyOf is handled differently from allOf because completion of 203 * any source should trigger a cleanStack of other sources. 204 * Each AnyOf completion can reach others via a shared array. 205 * 206 * Note that the generic type parameters of methods vary according 207 * to whether "this" is a source, dependent, or completion. 208 * 209 * Method postComplete is called upon completion unless the target 210 * is guaranteed not to be observable (i.e., not yet returned or 211 * linked). Multiple threads can call postComplete, which 212 * atomically pops each dependent action, and tries to trigger it 213 * via method tryFire, in NESTED mode. Triggering can propagate 214 * recursively, so NESTED mode returns its completed dependent (if 215 * one exists) for further processing by its caller (see method 216 * postFire). 217 * 218 * Blocking methods get() and join() rely on Signaller Completions 219 * that wake up waiting threads. The mechanics are similar to 220 * Treiber stack wait-nodes used in FutureTask, Phaser, and 221 * SynchronousQueue. See their internal documentation for 222 * algorithmic details. 223 * 224 * Without precautions, CompletableFutures would be prone to 225 * garbage accumulation as chains of Completions build up, each 226 * pointing back to its sources. So we null out fields as soon as 227 * possible. The screening checks needed anyway harmlessly ignore 228 * null arguments that may have been obtained during races with 229 * threads nulling out fields. We also try to unlink non-isLive 230 * (fired or cancelled) Completions from stacks that might 231 * otherwise never be popped: Method cleanStack always unlinks non 232 * isLive completions from the head of stack; others may 233 * occasionally remain if racing with other cancellations or 234 * removals. 235 * 236 * Completion fields need not be declared as final or volatile 237 * because they are only visible to other threads upon safe 238 * publication. 239 */ 240 241 volatile Object result; // Either the result or boxed AltResult 242 volatile Completion stack; // Top of Treiber stack of dependent actions 243 internalComplete(Object r)244 final boolean internalComplete(Object r) { // CAS from null to r 245 return RESULT.compareAndSet(this, null, r); 246 } 247 248 /** Returns true if successfully pushed c onto stack. */ tryPushStack(Completion c)249 final boolean tryPushStack(Completion c) { 250 Completion h = stack; 251 NEXT.set(c, h); // CAS piggyback 252 return STACK.compareAndSet(this, h, c); 253 } 254 255 /** Unconditionally pushes c onto stack, retrying if necessary. */ pushStack(Completion c)256 final void pushStack(Completion c) { 257 do {} while (!tryPushStack(c)); 258 } 259 260 /* ------------- Encoding and decoding outcomes -------------- */ 261 262 static final class AltResult { // See above 263 final Throwable ex; // null only for NIL AltResult(Throwable x)264 AltResult(Throwable x) { this.ex = x; } 265 } 266 267 /** The encoding of the null value. */ 268 static final AltResult NIL = new AltResult(null); 269 270 /** Completes with the null value, unless already completed. */ completeNull()271 final boolean completeNull() { 272 return RESULT.compareAndSet(this, null, NIL); 273 } 274 275 /** Returns the encoding of the given non-exceptional value. */ encodeValue(T t)276 final Object encodeValue(T t) { 277 return (t == null) ? NIL : t; 278 } 279 280 /** Completes with a non-exceptional result, unless already completed. */ completeValue(T t)281 final boolean completeValue(T t) { 282 return RESULT.compareAndSet(this, null, (t == null) ? NIL : t); 283 } 284 285 /** 286 * Returns the encoding of the given (non-null) exception as a 287 * wrapped CompletionException unless it is one already. 288 */ encodeThrowable(Throwable x)289 static AltResult encodeThrowable(Throwable x) { 290 return new AltResult((x instanceof CompletionException) ? x : 291 new CompletionException(x)); 292 } 293 294 /** Completes with an exceptional result, unless already completed. */ completeThrowable(Throwable x)295 final boolean completeThrowable(Throwable x) { 296 return RESULT.compareAndSet(this, null, encodeThrowable(x)); 297 } 298 299 /** 300 * Returns the encoding of the given (non-null) exception as a 301 * wrapped CompletionException unless it is one already. May 302 * return the given Object r (which must have been the result of a 303 * source future) if it is equivalent, i.e. if this is a simple 304 * relay of an existing CompletionException. 305 */ encodeThrowable(Throwable x, Object r)306 static Object encodeThrowable(Throwable x, Object r) { 307 if (!(x instanceof CompletionException)) 308 x = new CompletionException(x); 309 else if (r instanceof AltResult && x == ((AltResult)r).ex) 310 return r; 311 return new AltResult(x); 312 } 313 314 /** 315 * Completes with the given (non-null) exceptional result as a 316 * wrapped CompletionException unless it is one already, unless 317 * already completed. May complete with the given Object r 318 * (which must have been the result of a source future) if it is 319 * equivalent, i.e. if this is a simple propagation of an 320 * existing CompletionException. 321 */ completeThrowable(Throwable x, Object r)322 final boolean completeThrowable(Throwable x, Object r) { 323 return RESULT.compareAndSet(this, null, encodeThrowable(x, r)); 324 } 325 326 /** 327 * Returns the encoding of the given arguments: if the exception 328 * is non-null, encodes as AltResult. Otherwise uses the given 329 * value, boxed as NIL if null. 330 */ encodeOutcome(T t, Throwable x)331 Object encodeOutcome(T t, Throwable x) { 332 return (x == null) ? (t == null) ? NIL : t : encodeThrowable(x); 333 } 334 335 /** 336 * Returns the encoding of a copied outcome; if exceptional, 337 * rewraps as a CompletionException, else returns argument. 338 */ encodeRelay(Object r)339 static Object encodeRelay(Object r) { 340 Throwable x; 341 if (r instanceof AltResult 342 && (x = ((AltResult)r).ex) != null 343 && !(x instanceof CompletionException)) 344 r = new AltResult(new CompletionException(x)); 345 return r; 346 } 347 348 /** 349 * Completes with r or a copy of r, unless already completed. 350 * If exceptional, r is first coerced to a CompletionException. 351 */ completeRelay(Object r)352 final boolean completeRelay(Object r) { 353 return RESULT.compareAndSet(this, null, encodeRelay(r)); 354 } 355 356 /** 357 * Reports result using Future.get conventions. 358 */ reportGet(Object r)359 private static Object reportGet(Object r) 360 throws InterruptedException, ExecutionException { 361 if (r == null) // by convention below, null means interrupted 362 throw new InterruptedException(); 363 if (r instanceof AltResult) { 364 Throwable x, cause; 365 if ((x = ((AltResult)r).ex) == null) 366 return null; 367 if (x instanceof CancellationException) 368 throw (CancellationException)x; 369 if ((x instanceof CompletionException) && 370 (cause = x.getCause()) != null) 371 x = cause; 372 throw new ExecutionException(x); 373 } 374 return r; 375 } 376 377 /** 378 * Decodes outcome to return result or throw unchecked exception. 379 */ reportJoin(Object r)380 private static Object reportJoin(Object r) { 381 if (r instanceof AltResult) { 382 Throwable x; 383 if ((x = ((AltResult)r).ex) == null) 384 return null; 385 if (x instanceof CancellationException) 386 throw (CancellationException)x; 387 if (x instanceof CompletionException) 388 throw (CompletionException)x; 389 throw new CompletionException(x); 390 } 391 return r; 392 } 393 394 /* ------------- Async task preliminaries -------------- */ 395 396 /** 397 * A marker interface identifying asynchronous tasks produced by 398 * {@code async} methods. This may be useful for monitoring, 399 * debugging, and tracking asynchronous activities. 400 * 401 * @since 1.8 402 */ 403 public static interface AsynchronousCompletionTask { 404 } 405 406 private static final boolean USE_COMMON_POOL = 407 (ForkJoinPool.getCommonPoolParallelism() > 1); 408 409 /** 410 * Default executor -- ForkJoinPool.commonPool() unless it cannot 411 * support parallelism. 412 */ 413 private static final Executor ASYNC_POOL = USE_COMMON_POOL ? 414 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); 415 416 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ 417 private static final class ThreadPerTaskExecutor implements Executor { execute(Runnable r)418 public void execute(Runnable r) { 419 Objects.requireNonNull(r); 420 new Thread(r).start(); 421 } 422 } 423 424 /** 425 * Null-checks user executor argument, and translates uses of 426 * commonPool to ASYNC_POOL in case parallelism disabled. 427 */ screenExecutor(Executor e)428 static Executor screenExecutor(Executor e) { 429 if (!USE_COMMON_POOL && e == ForkJoinPool.commonPool()) 430 return ASYNC_POOL; 431 if (e == null) throw new NullPointerException(); 432 return e; 433 } 434 435 // Modes for Completion.tryFire. Signedness matters. 436 static final int SYNC = 0; 437 static final int ASYNC = 1; 438 static final int NESTED = -1; 439 440 /* ------------- Base Completion classes and operations -------------- */ 441 442 @SuppressWarnings("serial") 443 abstract static class Completion extends ForkJoinTask<Void> 444 implements Runnable, AsynchronousCompletionTask { 445 volatile Completion next; // Treiber stack link 446 447 /** 448 * Performs completion action if triggered, returning a 449 * dependent that may need propagation, if one exists. 450 * 451 * @param mode SYNC, ASYNC, or NESTED 452 */ tryFire(int mode)453 abstract CompletableFuture<?> tryFire(int mode); 454 455 /** Returns true if possibly still triggerable. Used by cleanStack. */ isLive()456 abstract boolean isLive(); 457 run()458 public final void run() { tryFire(ASYNC); } exec()459 public final boolean exec() { tryFire(ASYNC); return false; } getRawResult()460 public final Void getRawResult() { return null; } setRawResult(Void v)461 public final void setRawResult(Void v) {} 462 } 463 464 /** 465 * Pops and tries to trigger all reachable dependents. Call only 466 * when known to be done. 467 */ postComplete()468 final void postComplete() { 469 /* 470 * On each step, variable f holds current dependents to pop 471 * and run. It is extended along only one path at a time, 472 * pushing others to avoid unbounded recursion. 473 */ 474 CompletableFuture<?> f = this; Completion h; 475 while ((h = f.stack) != null || 476 (f != this && (h = (f = this).stack) != null)) { 477 CompletableFuture<?> d; Completion t; 478 if (STACK.compareAndSet(f, h, t = h.next)) { 479 if (t != null) { 480 if (f != this) { 481 pushStack(h); 482 continue; 483 } 484 NEXT.compareAndSet(h, t, null); // try to detach 485 } 486 f = (d = h.tryFire(NESTED)) == null ? this : d; 487 } 488 } 489 } 490 491 /** Traverses stack and unlinks one or more dead Completions, if found. */ cleanStack()492 final void cleanStack() { 493 Completion p = stack; 494 // ensure head of stack live 495 for (boolean unlinked = false;;) { 496 if (p == null) 497 return; 498 else if (p.isLive()) { 499 if (unlinked) 500 return; 501 else 502 break; 503 } 504 else if (STACK.weakCompareAndSet(this, p, (p = p.next))) 505 unlinked = true; 506 else 507 p = stack; 508 } 509 // try to unlink first non-live 510 for (Completion q = p.next; q != null;) { 511 Completion s = q.next; 512 if (q.isLive()) { 513 p = q; 514 q = s; 515 } else if (NEXT.weakCompareAndSet(p, q, s)) 516 break; 517 else 518 q = p.next; 519 } 520 } 521 522 /* ------------- One-input Completions -------------- */ 523 524 /** A Completion with a source, dependent, and executor. */ 525 @SuppressWarnings("serial") 526 abstract static class UniCompletion<T,V> extends Completion { 527 Executor executor; // executor to use (null if none) 528 CompletableFuture<V> dep; // the dependent to complete 529 CompletableFuture<T> src; // source for action 530 UniCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src)531 UniCompletion(Executor executor, CompletableFuture<V> dep, 532 CompletableFuture<T> src) { 533 this.executor = executor; this.dep = dep; this.src = src; 534 } 535 536 /** 537 * Returns true if action can be run. Call only when known to 538 * be triggerable. Uses FJ tag bit to ensure that only one 539 * thread claims ownership. If async, starts as task -- a 540 * later call to tryFire will run action. 541 */ claim()542 final boolean claim() { 543 Executor e = executor; 544 if (compareAndSetForkJoinTaskTag((short)0, (short)1)) { 545 if (e == null) 546 return true; 547 executor = null; // disable 548 e.execute(this); 549 } 550 return false; 551 } 552 isLive()553 final boolean isLive() { return dep != null; } 554 } 555 556 /** 557 * Pushes the given completion unless it completes while trying. 558 * Caller should first check that result is null. 559 */ unipush(Completion c)560 final void unipush(Completion c) { 561 if (c != null) { 562 while (!tryPushStack(c)) { 563 if (result != null) { 564 NEXT.set(c, null); 565 break; 566 } 567 } 568 if (result != null) 569 c.tryFire(SYNC); 570 } 571 } 572 573 /** 574 * Post-processing by dependent after successful UniCompletion tryFire. 575 * Tries to clean stack of source a, and then either runs postComplete 576 * or returns this to caller, depending on mode. 577 */ postFire(CompletableFuture<?> a, int mode)578 final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) { 579 if (a != null && a.stack != null) { 580 Object r; 581 if ((r = a.result) == null) 582 a.cleanStack(); 583 if (mode >= 0 && (r != null || a.result != null)) 584 a.postComplete(); 585 } 586 if (result != null && stack != null) { 587 if (mode < 0) 588 return this; 589 else 590 postComplete(); 591 } 592 return null; 593 } 594 595 @SuppressWarnings("serial") 596 static final class UniApply<T,V> extends UniCompletion<T,V> { 597 Function<? super T,? extends V> fn; UniApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T,? extends V> fn)598 UniApply(Executor executor, CompletableFuture<V> dep, 599 CompletableFuture<T> src, 600 Function<? super T,? extends V> fn) { 601 super(executor, dep, src); this.fn = fn; 602 } tryFire(int mode)603 final CompletableFuture<V> tryFire(int mode) { 604 CompletableFuture<V> d; CompletableFuture<T> a; 605 Object r; Throwable x; Function<? super T,? extends V> f; 606 if ((a = src) == null || (r = a.result) == null 607 || (d = dep) == null || (f = fn) == null) 608 return null; 609 tryComplete: if (d.result == null) { 610 if (r instanceof AltResult) { 611 if ((x = ((AltResult)r).ex) != null) { 612 d.completeThrowable(x, r); 613 break tryComplete; 614 } 615 r = null; 616 } 617 try { 618 if (mode <= 0 && !claim()) 619 return null; 620 else { 621 @SuppressWarnings("unchecked") T t = (T) r; 622 d.completeValue(f.apply(t)); 623 } 624 } catch (Throwable ex) { 625 d.completeThrowable(ex); 626 } 627 } 628 src = null; dep = null; fn = null; 629 return d.postFire(a, mode); 630 } 631 } 632 uniApplyStage( Executor e, Function<? super T,? extends V> f)633 private <V> CompletableFuture<V> uniApplyStage( 634 Executor e, Function<? super T,? extends V> f) { 635 if (f == null) throw new NullPointerException(); 636 Object r; 637 if ((r = result) != null) 638 return uniApplyNow(r, e, f); 639 CompletableFuture<V> d = newIncompleteFuture(); 640 unipush(new UniApply<T,V>(e, d, this, f)); 641 return d; 642 } 643 uniApplyNow( Object r, Executor e, Function<? super T,? extends V> f)644 private <V> CompletableFuture<V> uniApplyNow( 645 Object r, Executor e, Function<? super T,? extends V> f) { 646 Throwable x; 647 CompletableFuture<V> d = newIncompleteFuture(); 648 if (r instanceof AltResult) { 649 if ((x = ((AltResult)r).ex) != null) { 650 d.result = encodeThrowable(x, r); 651 return d; 652 } 653 r = null; 654 } 655 try { 656 if (e != null) { 657 e.execute(new UniApply<T,V>(null, d, this, f)); 658 } else { 659 @SuppressWarnings("unchecked") T t = (T) r; 660 d.result = d.encodeValue(f.apply(t)); 661 } 662 } catch (Throwable ex) { 663 d.result = encodeThrowable(ex); 664 } 665 return d; 666 } 667 668 @SuppressWarnings("serial") 669 static final class UniAccept<T> extends UniCompletion<T,Void> { 670 Consumer<? super T> fn; UniAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Consumer<? super T> fn)671 UniAccept(Executor executor, CompletableFuture<Void> dep, 672 CompletableFuture<T> src, Consumer<? super T> fn) { 673 super(executor, dep, src); this.fn = fn; 674 } tryFire(int mode)675 final CompletableFuture<Void> tryFire(int mode) { 676 CompletableFuture<Void> d; CompletableFuture<T> a; 677 Object r; Throwable x; Consumer<? super T> f; 678 if ((a = src) == null || (r = a.result) == null 679 || (d = dep) == null || (f = fn) == null) 680 return null; 681 tryComplete: if (d.result == null) { 682 if (r instanceof AltResult) { 683 if ((x = ((AltResult)r).ex) != null) { 684 d.completeThrowable(x, r); 685 break tryComplete; 686 } 687 r = null; 688 } 689 try { 690 if (mode <= 0 && !claim()) 691 return null; 692 else { 693 @SuppressWarnings("unchecked") T t = (T) r; 694 f.accept(t); 695 d.completeNull(); 696 } 697 } catch (Throwable ex) { 698 d.completeThrowable(ex); 699 } 700 } 701 src = null; dep = null; fn = null; 702 return d.postFire(a, mode); 703 } 704 } 705 uniAcceptStage(Executor e, Consumer<? super T> f)706 private CompletableFuture<Void> uniAcceptStage(Executor e, 707 Consumer<? super T> f) { 708 if (f == null) throw new NullPointerException(); 709 Object r; 710 if ((r = result) != null) 711 return uniAcceptNow(r, e, f); 712 CompletableFuture<Void> d = newIncompleteFuture(); 713 unipush(new UniAccept<T>(e, d, this, f)); 714 return d; 715 } 716 uniAcceptNow( Object r, Executor e, Consumer<? super T> f)717 private CompletableFuture<Void> uniAcceptNow( 718 Object r, Executor e, Consumer<? super T> f) { 719 Throwable x; 720 CompletableFuture<Void> d = newIncompleteFuture(); 721 if (r instanceof AltResult) { 722 if ((x = ((AltResult)r).ex) != null) { 723 d.result = encodeThrowable(x, r); 724 return d; 725 } 726 r = null; 727 } 728 try { 729 if (e != null) { 730 e.execute(new UniAccept<T>(null, d, this, f)); 731 } else { 732 @SuppressWarnings("unchecked") T t = (T) r; 733 f.accept(t); 734 d.result = NIL; 735 } 736 } catch (Throwable ex) { 737 d.result = encodeThrowable(ex); 738 } 739 return d; 740 } 741 742 @SuppressWarnings("serial") 743 static final class UniRun<T> extends UniCompletion<T,Void> { 744 Runnable fn; UniRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Runnable fn)745 UniRun(Executor executor, CompletableFuture<Void> dep, 746 CompletableFuture<T> src, Runnable fn) { 747 super(executor, dep, src); this.fn = fn; 748 } tryFire(int mode)749 final CompletableFuture<Void> tryFire(int mode) { 750 CompletableFuture<Void> d; CompletableFuture<T> a; 751 Object r; Throwable x; Runnable f; 752 if ((a = src) == null || (r = a.result) == null 753 || (d = dep) == null || (f = fn) == null) 754 return null; 755 if (d.result == null) { 756 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) 757 d.completeThrowable(x, r); 758 else 759 try { 760 if (mode <= 0 && !claim()) 761 return null; 762 else { 763 f.run(); 764 d.completeNull(); 765 } 766 } catch (Throwable ex) { 767 d.completeThrowable(ex); 768 } 769 } 770 src = null; dep = null; fn = null; 771 return d.postFire(a, mode); 772 } 773 } 774 uniRunStage(Executor e, Runnable f)775 private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) { 776 if (f == null) throw new NullPointerException(); 777 Object r; 778 if ((r = result) != null) 779 return uniRunNow(r, e, f); 780 CompletableFuture<Void> d = newIncompleteFuture(); 781 unipush(new UniRun<T>(e, d, this, f)); 782 return d; 783 } 784 uniRunNow(Object r, Executor e, Runnable f)785 private CompletableFuture<Void> uniRunNow(Object r, Executor e, Runnable f) { 786 Throwable x; 787 CompletableFuture<Void> d = newIncompleteFuture(); 788 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) 789 d.result = encodeThrowable(x, r); 790 else 791 try { 792 if (e != null) { 793 e.execute(new UniRun<T>(null, d, this, f)); 794 } else { 795 f.run(); 796 d.result = NIL; 797 } 798 } catch (Throwable ex) { 799 d.result = encodeThrowable(ex); 800 } 801 return d; 802 } 803 804 @SuppressWarnings("serial") 805 static final class UniWhenComplete<T> extends UniCompletion<T,T> { 806 BiConsumer<? super T, ? super Throwable> fn; UniWhenComplete(Executor executor, CompletableFuture<T> dep, CompletableFuture<T> src, BiConsumer<? super T, ? super Throwable> fn)807 UniWhenComplete(Executor executor, CompletableFuture<T> dep, 808 CompletableFuture<T> src, 809 BiConsumer<? super T, ? super Throwable> fn) { 810 super(executor, dep, src); this.fn = fn; 811 } tryFire(int mode)812 final CompletableFuture<T> tryFire(int mode) { 813 CompletableFuture<T> d; CompletableFuture<T> a; 814 Object r; BiConsumer<? super T, ? super Throwable> f; 815 if ((a = src) == null || (r = a.result) == null 816 || (d = dep) == null || (f = fn) == null 817 || !d.uniWhenComplete(r, f, mode > 0 ? null : this)) 818 return null; 819 src = null; dep = null; fn = null; 820 return d.postFire(a, mode); 821 } 822 } 823 uniWhenComplete(Object r, BiConsumer<? super T,? super Throwable> f, UniWhenComplete<T> c)824 final boolean uniWhenComplete(Object r, 825 BiConsumer<? super T,? super Throwable> f, 826 UniWhenComplete<T> c) { 827 T t; Throwable x = null; 828 if (result == null) { 829 try { 830 if (c != null && !c.claim()) 831 return false; 832 if (r instanceof AltResult) { 833 x = ((AltResult)r).ex; 834 t = null; 835 } else { 836 @SuppressWarnings("unchecked") T tr = (T) r; 837 t = tr; 838 } 839 f.accept(t, x); 840 if (x == null) { 841 internalComplete(r); 842 return true; 843 } 844 } catch (Throwable ex) { 845 if (x == null) 846 x = ex; 847 else if (x != ex) 848 x.addSuppressed(ex); 849 } 850 completeThrowable(x, r); 851 } 852 return true; 853 } 854 uniWhenCompleteStage( Executor e, BiConsumer<? super T, ? super Throwable> f)855 private CompletableFuture<T> uniWhenCompleteStage( 856 Executor e, BiConsumer<? super T, ? super Throwable> f) { 857 if (f == null) throw new NullPointerException(); 858 CompletableFuture<T> d = newIncompleteFuture(); 859 Object r; 860 if ((r = result) == null) 861 unipush(new UniWhenComplete<T>(e, d, this, f)); 862 else if (e == null) 863 d.uniWhenComplete(r, f, null); 864 else { 865 try { 866 e.execute(new UniWhenComplete<T>(null, d, this, f)); 867 } catch (Throwable ex) { 868 d.result = encodeThrowable(ex); 869 } 870 } 871 return d; 872 } 873 874 @SuppressWarnings("serial") 875 static final class UniHandle<T,V> extends UniCompletion<T,V> { 876 BiFunction<? super T, Throwable, ? extends V> fn; UniHandle(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, BiFunction<? super T, Throwable, ? extends V> fn)877 UniHandle(Executor executor, CompletableFuture<V> dep, 878 CompletableFuture<T> src, 879 BiFunction<? super T, Throwable, ? extends V> fn) { 880 super(executor, dep, src); this.fn = fn; 881 } tryFire(int mode)882 final CompletableFuture<V> tryFire(int mode) { 883 CompletableFuture<V> d; CompletableFuture<T> a; 884 Object r; BiFunction<? super T, Throwable, ? extends V> f; 885 if ((a = src) == null || (r = a.result) == null 886 || (d = dep) == null || (f = fn) == null 887 || !d.uniHandle(r, f, mode > 0 ? null : this)) 888 return null; 889 src = null; dep = null; fn = null; 890 return d.postFire(a, mode); 891 } 892 } 893 uniHandle(Object r, BiFunction<? super S, Throwable, ? extends T> f, UniHandle<S,T> c)894 final <S> boolean uniHandle(Object r, 895 BiFunction<? super S, Throwable, ? extends T> f, 896 UniHandle<S,T> c) { 897 S s; Throwable x; 898 if (result == null) { 899 try { 900 if (c != null && !c.claim()) 901 return false; 902 if (r instanceof AltResult) { 903 x = ((AltResult)r).ex; 904 s = null; 905 } else { 906 x = null; 907 @SuppressWarnings("unchecked") S ss = (S) r; 908 s = ss; 909 } 910 completeValue(f.apply(s, x)); 911 } catch (Throwable ex) { 912 completeThrowable(ex); 913 } 914 } 915 return true; 916 } 917 uniHandleStage( Executor e, BiFunction<? super T, Throwable, ? extends V> f)918 private <V> CompletableFuture<V> uniHandleStage( 919 Executor e, BiFunction<? super T, Throwable, ? extends V> f) { 920 if (f == null) throw new NullPointerException(); 921 CompletableFuture<V> d = newIncompleteFuture(); 922 Object r; 923 if ((r = result) == null) 924 unipush(new UniHandle<T,V>(e, d, this, f)); 925 else if (e == null) 926 d.uniHandle(r, f, null); 927 else { 928 try { 929 e.execute(new UniHandle<T,V>(null, d, this, f)); 930 } catch (Throwable ex) { 931 d.result = encodeThrowable(ex); 932 } 933 } 934 return d; 935 } 936 937 @SuppressWarnings("serial") 938 static final class UniExceptionally<T> extends UniCompletion<T,T> { 939 Function<? super Throwable, ? extends T> fn; UniExceptionally(Executor executor, CompletableFuture<T> dep, CompletableFuture<T> src, Function<? super Throwable, ? extends T> fn)940 UniExceptionally(Executor executor, 941 CompletableFuture<T> dep, CompletableFuture<T> src, 942 Function<? super Throwable, ? extends T> fn) { 943 super(executor, dep, src); this.fn = fn; 944 } tryFire(int mode)945 final CompletableFuture<T> tryFire(int mode) { 946 CompletableFuture<T> d; CompletableFuture<T> a; 947 Object r; Function<? super Throwable, ? extends T> f; 948 if ((a = src) == null || (r = a.result) == null 949 || (d = dep) == null || (f = fn) == null 950 || !d.uniExceptionally(r, f, mode > 0 ? null : this)) 951 return null; 952 src = null; dep = null; fn = null; 953 return d.postFire(a, mode); 954 } 955 } 956 uniExceptionally(Object r, Function<? super Throwable, ? extends T> f, UniExceptionally<T> c)957 final boolean uniExceptionally(Object r, 958 Function<? super Throwable, ? extends T> f, 959 UniExceptionally<T> c) { 960 Throwable x; 961 if (result == null) { 962 try { 963 if (c != null && !c.claim()) 964 return false; 965 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) 966 completeValue(f.apply(x)); 967 else 968 internalComplete(r); 969 } catch (Throwable ex) { 970 completeThrowable(ex); 971 } 972 } 973 return true; 974 } 975 uniExceptionallyStage( Executor e, Function<Throwable, ? extends T> f)976 private CompletableFuture<T> uniExceptionallyStage( 977 Executor e, Function<Throwable, ? extends T> f) { 978 if (f == null) throw new NullPointerException(); 979 CompletableFuture<T> d = newIncompleteFuture(); 980 Object r; 981 if ((r = result) == null) 982 unipush(new UniExceptionally<T>(e, d, this, f)); 983 else if (e == null) 984 d.uniExceptionally(r, f, null); 985 else { 986 try { 987 e.execute(new UniExceptionally<T>(null, d, this, f)); 988 } catch (Throwable ex) { 989 d.result = encodeThrowable(ex); 990 } 991 } 992 return d; 993 } 994 995 @SuppressWarnings("serial") 996 static final class UniComposeExceptionally<T> extends UniCompletion<T,T> { 997 Function<Throwable, ? extends CompletionStage<T>> fn; UniComposeExceptionally(Executor executor, CompletableFuture<T> dep, CompletableFuture<T> src, Function<Throwable, ? extends CompletionStage<T>> fn)998 UniComposeExceptionally(Executor executor, CompletableFuture<T> dep, 999 CompletableFuture<T> src, 1000 Function<Throwable, ? extends CompletionStage<T>> fn) { 1001 super(executor, dep, src); this.fn = fn; 1002 } tryFire(int mode)1003 final CompletableFuture<T> tryFire(int mode) { 1004 CompletableFuture<T> d; CompletableFuture<T> a; 1005 Function<Throwable, ? extends CompletionStage<T>> f; 1006 Object r; Throwable x; 1007 if ((a = src) == null || (r = a.result) == null 1008 || (d = dep) == null || (f = fn) == null) 1009 return null; 1010 if (d.result == null) { 1011 if ((r instanceof AltResult) && 1012 (x = ((AltResult)r).ex) != null) { 1013 try { 1014 if (mode <= 0 && !claim()) 1015 return null; 1016 CompletableFuture<T> g = f.apply(x).toCompletableFuture(); 1017 if ((r = g.result) != null) 1018 d.completeRelay(r); 1019 else { 1020 g.unipush(new UniRelay<T,T>(d, g)); 1021 if (d.result == null) 1022 return null; 1023 } 1024 } catch (Throwable ex) { 1025 d.completeThrowable(ex); 1026 } 1027 } 1028 else 1029 d.internalComplete(r); 1030 } 1031 src = null; dep = null; fn = null; 1032 return d.postFire(a, mode); 1033 } 1034 } 1035 uniComposeExceptionallyStage( Executor e, Function<Throwable, ? extends CompletionStage<T>> f)1036 private CompletableFuture<T> uniComposeExceptionallyStage( 1037 Executor e, Function<Throwable, ? extends CompletionStage<T>> f) { 1038 if (f == null) throw new NullPointerException(); 1039 CompletableFuture<T> d = newIncompleteFuture(); 1040 Object r, s; Throwable x; 1041 if ((r = result) == null) 1042 unipush(new UniComposeExceptionally<T>(e, d, this, f)); 1043 else if (!(r instanceof AltResult) || (x = ((AltResult)r).ex) == null) 1044 d.internalComplete(r); 1045 else 1046 try { 1047 if (e != null) 1048 e.execute(new UniComposeExceptionally<T>(null, d, this, f)); 1049 else { 1050 CompletableFuture<T> g = f.apply(x).toCompletableFuture(); 1051 if ((s = g.result) != null) 1052 d.result = encodeRelay(s); 1053 else 1054 g.unipush(new UniRelay<T,T>(d, g)); 1055 } 1056 } catch (Throwable ex) { 1057 d.result = encodeThrowable(ex); 1058 } 1059 return d; 1060 } 1061 1062 @SuppressWarnings("serial") 1063 static final class UniRelay<U, T extends U> extends UniCompletion<T,U> { UniRelay(CompletableFuture<U> dep, CompletableFuture<T> src)1064 UniRelay(CompletableFuture<U> dep, CompletableFuture<T> src) { 1065 super(null, dep, src); 1066 } tryFire(int mode)1067 final CompletableFuture<U> tryFire(int mode) { 1068 CompletableFuture<U> d; CompletableFuture<T> a; Object r; 1069 if ((a = src) == null || (r = a.result) == null 1070 || (d = dep) == null) 1071 return null; 1072 if (d.result == null) 1073 d.completeRelay(r); 1074 src = null; dep = null; 1075 return d.postFire(a, mode); 1076 } 1077 } 1078 uniCopyStage( CompletableFuture<T> src)1079 private static <U, T extends U> CompletableFuture<U> uniCopyStage( 1080 CompletableFuture<T> src) { 1081 Object r; 1082 CompletableFuture<U> d = src.newIncompleteFuture(); 1083 if ((r = src.result) != null) 1084 d.result = encodeRelay(r); 1085 else 1086 src.unipush(new UniRelay<U,T>(d, src)); 1087 return d; 1088 } 1089 uniAsMinimalStage()1090 private MinimalStage<T> uniAsMinimalStage() { 1091 Object r; 1092 if ((r = result) != null) 1093 return new MinimalStage<T>(encodeRelay(r)); 1094 MinimalStage<T> d = new MinimalStage<T>(); 1095 unipush(new UniRelay<T,T>(d, this)); 1096 return d; 1097 } 1098 1099 @SuppressWarnings("serial") 1100 static final class UniCompose<T,V> extends UniCompletion<T,V> { 1101 Function<? super T, ? extends CompletionStage<V>> fn; UniCompose(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T, ? extends CompletionStage<V>> fn)1102 UniCompose(Executor executor, CompletableFuture<V> dep, 1103 CompletableFuture<T> src, 1104 Function<? super T, ? extends CompletionStage<V>> fn) { 1105 super(executor, dep, src); this.fn = fn; 1106 } tryFire(int mode)1107 final CompletableFuture<V> tryFire(int mode) { 1108 CompletableFuture<V> d; CompletableFuture<T> a; 1109 Function<? super T, ? extends CompletionStage<V>> f; 1110 Object r; Throwable x; 1111 if ((a = src) == null || (r = a.result) == null 1112 || (d = dep) == null || (f = fn) == null) 1113 return null; 1114 tryComplete: if (d.result == null) { 1115 if (r instanceof AltResult) { 1116 if ((x = ((AltResult)r).ex) != null) { 1117 d.completeThrowable(x, r); 1118 break tryComplete; 1119 } 1120 r = null; 1121 } 1122 try { 1123 if (mode <= 0 && !claim()) 1124 return null; 1125 @SuppressWarnings("unchecked") T t = (T) r; 1126 CompletableFuture<V> g = f.apply(t).toCompletableFuture(); 1127 if ((r = g.result) != null) 1128 d.completeRelay(r); 1129 else { 1130 g.unipush(new UniRelay<V,V>(d, g)); 1131 if (d.result == null) 1132 return null; 1133 } 1134 } catch (Throwable ex) { 1135 d.completeThrowable(ex); 1136 } 1137 } 1138 src = null; dep = null; fn = null; 1139 return d.postFire(a, mode); 1140 } 1141 } 1142 uniComposeStage( Executor e, Function<? super T, ? extends CompletionStage<V>> f)1143 private <V> CompletableFuture<V> uniComposeStage( 1144 Executor e, Function<? super T, ? extends CompletionStage<V>> f) { 1145 if (f == null) throw new NullPointerException(); 1146 CompletableFuture<V> d = newIncompleteFuture(); 1147 Object r, s; Throwable x; 1148 if ((r = result) == null) 1149 unipush(new UniCompose<T,V>(e, d, this, f)); 1150 else { 1151 if (r instanceof AltResult) { 1152 if ((x = ((AltResult)r).ex) != null) { 1153 d.result = encodeThrowable(x, r); 1154 return d; 1155 } 1156 r = null; 1157 } 1158 try { 1159 if (e != null) 1160 e.execute(new UniCompose<T,V>(null, d, this, f)); 1161 else { 1162 @SuppressWarnings("unchecked") T t = (T) r; 1163 CompletableFuture<V> g = f.apply(t).toCompletableFuture(); 1164 if ((s = g.result) != null) 1165 d.result = encodeRelay(s); 1166 else 1167 g.unipush(new UniRelay<V,V>(d, g)); 1168 } 1169 } catch (Throwable ex) { 1170 d.result = encodeThrowable(ex); 1171 } 1172 } 1173 return d; 1174 } 1175 1176 /* ------------- Two-input Completions -------------- */ 1177 1178 /** A Completion for an action with two sources */ 1179 @SuppressWarnings("serial") 1180 abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> { 1181 CompletableFuture<U> snd; // second source for action BiCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd)1182 BiCompletion(Executor executor, CompletableFuture<V> dep, 1183 CompletableFuture<T> src, CompletableFuture<U> snd) { 1184 super(executor, dep, src); this.snd = snd; 1185 } 1186 } 1187 1188 /** A Completion delegating to a BiCompletion */ 1189 @SuppressWarnings("serial") 1190 static final class CoCompletion extends Completion { 1191 BiCompletion<?,?,?> base; CoCompletion(BiCompletion<?,?,?> base)1192 CoCompletion(BiCompletion<?,?,?> base) { this.base = base; } tryFire(int mode)1193 final CompletableFuture<?> tryFire(int mode) { 1194 BiCompletion<?,?,?> c; CompletableFuture<?> d; 1195 if ((c = base) == null || (d = c.tryFire(mode)) == null) 1196 return null; 1197 base = null; // detach 1198 return d; 1199 } isLive()1200 final boolean isLive() { 1201 BiCompletion<?,?,?> c; 1202 return (c = base) != null 1203 // && c.isLive() 1204 && c.dep != null; 1205 } 1206 } 1207 1208 /** 1209 * Pushes completion to this and b unless both done. 1210 * Caller should first check that either result or b.result is null. 1211 */ bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c)1212 final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) { 1213 if (c != null) { 1214 while (result == null) { 1215 if (tryPushStack(c)) { 1216 if (b.result == null) 1217 b.unipush(new CoCompletion(c)); 1218 else if (result != null) 1219 c.tryFire(SYNC); 1220 return; 1221 } 1222 } 1223 b.unipush(c); 1224 } 1225 } 1226 1227 /** Post-processing after successful BiCompletion tryFire. */ postFire(CompletableFuture<?> a, CompletableFuture<?> b, int mode)1228 final CompletableFuture<T> postFire(CompletableFuture<?> a, 1229 CompletableFuture<?> b, int mode) { 1230 if (b != null && b.stack != null) { // clean second source 1231 Object r; 1232 if ((r = b.result) == null) 1233 b.cleanStack(); 1234 if (mode >= 0 && (r != null || b.result != null)) 1235 b.postComplete(); 1236 } 1237 return postFire(a, mode); 1238 } 1239 1240 @SuppressWarnings("serial") 1241 static final class BiApply<T,U,V> extends BiCompletion<T,U,V> { 1242 BiFunction<? super T,? super U,? extends V> fn; BiApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd, BiFunction<? super T,? super U,? extends V> fn)1243 BiApply(Executor executor, CompletableFuture<V> dep, 1244 CompletableFuture<T> src, CompletableFuture<U> snd, 1245 BiFunction<? super T,? super U,? extends V> fn) { 1246 super(executor, dep, src, snd); this.fn = fn; 1247 } tryFire(int mode)1248 final CompletableFuture<V> tryFire(int mode) { 1249 CompletableFuture<V> d; 1250 CompletableFuture<T> a; 1251 CompletableFuture<U> b; 1252 Object r, s; BiFunction<? super T,? super U,? extends V> f; 1253 if ( (a = src) == null || (r = a.result) == null 1254 || (b = snd) == null || (s = b.result) == null 1255 || (d = dep) == null || (f = fn) == null 1256 || !d.biApply(r, s, f, mode > 0 ? null : this)) 1257 return null; 1258 src = null; snd = null; dep = null; fn = null; 1259 return d.postFire(a, b, mode); 1260 } 1261 } 1262 biApply(Object r, Object s, BiFunction<? super R,? super S,? extends T> f, BiApply<R,S,T> c)1263 final <R,S> boolean biApply(Object r, Object s, 1264 BiFunction<? super R,? super S,? extends T> f, 1265 BiApply<R,S,T> c) { 1266 Throwable x; 1267 tryComplete: if (result == null) { 1268 if (r instanceof AltResult) { 1269 if ((x = ((AltResult)r).ex) != null) { 1270 completeThrowable(x, r); 1271 break tryComplete; 1272 } 1273 r = null; 1274 } 1275 if (s instanceof AltResult) { 1276 if ((x = ((AltResult)s).ex) != null) { 1277 completeThrowable(x, s); 1278 break tryComplete; 1279 } 1280 s = null; 1281 } 1282 try { 1283 if (c != null && !c.claim()) 1284 return false; 1285 @SuppressWarnings("unchecked") R rr = (R) r; 1286 @SuppressWarnings("unchecked") S ss = (S) s; 1287 completeValue(f.apply(rr, ss)); 1288 } catch (Throwable ex) { 1289 completeThrowable(ex); 1290 } 1291 } 1292 return true; 1293 } 1294 biApplyStage( Executor e, CompletionStage<U> o, BiFunction<? super T,? super U,? extends V> f)1295 private <U,V> CompletableFuture<V> biApplyStage( 1296 Executor e, CompletionStage<U> o, 1297 BiFunction<? super T,? super U,? extends V> f) { 1298 CompletableFuture<U> b; Object r, s; 1299 if (f == null || (b = o.toCompletableFuture()) == null) 1300 throw new NullPointerException(); 1301 CompletableFuture<V> d = newIncompleteFuture(); 1302 if ((r = result) == null || (s = b.result) == null) 1303 bipush(b, new BiApply<T,U,V>(e, d, this, b, f)); 1304 else if (e == null) 1305 d.biApply(r, s, f, null); 1306 else 1307 try { 1308 e.execute(new BiApply<T,U,V>(null, d, this, b, f)); 1309 } catch (Throwable ex) { 1310 d.result = encodeThrowable(ex); 1311 } 1312 return d; 1313 } 1314 1315 @SuppressWarnings("serial") 1316 static final class BiAccept<T,U> extends BiCompletion<T,U,Void> { 1317 BiConsumer<? super T,? super U> fn; BiAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, BiConsumer<? super T,? super U> fn)1318 BiAccept(Executor executor, CompletableFuture<Void> dep, 1319 CompletableFuture<T> src, CompletableFuture<U> snd, 1320 BiConsumer<? super T,? super U> fn) { 1321 super(executor, dep, src, snd); this.fn = fn; 1322 } tryFire(int mode)1323 final CompletableFuture<Void> tryFire(int mode) { 1324 CompletableFuture<Void> d; 1325 CompletableFuture<T> a; 1326 CompletableFuture<U> b; 1327 Object r, s; BiConsumer<? super T,? super U> f; 1328 if ( (a = src) == null || (r = a.result) == null 1329 || (b = snd) == null || (s = b.result) == null 1330 || (d = dep) == null || (f = fn) == null 1331 || !d.biAccept(r, s, f, mode > 0 ? null : this)) 1332 return null; 1333 src = null; snd = null; dep = null; fn = null; 1334 return d.postFire(a, b, mode); 1335 } 1336 } 1337 biAccept(Object r, Object s, BiConsumer<? super R,? super S> f, BiAccept<R,S> c)1338 final <R,S> boolean biAccept(Object r, Object s, 1339 BiConsumer<? super R,? super S> f, 1340 BiAccept<R,S> c) { 1341 Throwable x; 1342 tryComplete: if (result == null) { 1343 if (r instanceof AltResult) { 1344 if ((x = ((AltResult)r).ex) != null) { 1345 completeThrowable(x, r); 1346 break tryComplete; 1347 } 1348 r = null; 1349 } 1350 if (s instanceof AltResult) { 1351 if ((x = ((AltResult)s).ex) != null) { 1352 completeThrowable(x, s); 1353 break tryComplete; 1354 } 1355 s = null; 1356 } 1357 try { 1358 if (c != null && !c.claim()) 1359 return false; 1360 @SuppressWarnings("unchecked") R rr = (R) r; 1361 @SuppressWarnings("unchecked") S ss = (S) s; 1362 f.accept(rr, ss); 1363 completeNull(); 1364 } catch (Throwable ex) { 1365 completeThrowable(ex); 1366 } 1367 } 1368 return true; 1369 } 1370 biAcceptStage( Executor e, CompletionStage<U> o, BiConsumer<? super T,? super U> f)1371 private <U> CompletableFuture<Void> biAcceptStage( 1372 Executor e, CompletionStage<U> o, 1373 BiConsumer<? super T,? super U> f) { 1374 CompletableFuture<U> b; Object r, s; 1375 if (f == null || (b = o.toCompletableFuture()) == null) 1376 throw new NullPointerException(); 1377 CompletableFuture<Void> d = newIncompleteFuture(); 1378 if ((r = result) == null || (s = b.result) == null) 1379 bipush(b, new BiAccept<T,U>(e, d, this, b, f)); 1380 else if (e == null) 1381 d.biAccept(r, s, f, null); 1382 else 1383 try { 1384 e.execute(new BiAccept<T,U>(null, d, this, b, f)); 1385 } catch (Throwable ex) { 1386 d.result = encodeThrowable(ex); 1387 } 1388 return d; 1389 } 1390 1391 @SuppressWarnings("serial") 1392 static final class BiRun<T,U> extends BiCompletion<T,U,Void> { 1393 Runnable fn; BiRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Runnable fn)1394 BiRun(Executor executor, CompletableFuture<Void> dep, 1395 CompletableFuture<T> src, CompletableFuture<U> snd, 1396 Runnable fn) { 1397 super(executor, dep, src, snd); this.fn = fn; 1398 } tryFire(int mode)1399 final CompletableFuture<Void> tryFire(int mode) { 1400 CompletableFuture<Void> d; 1401 CompletableFuture<T> a; 1402 CompletableFuture<U> b; 1403 Object r, s; Runnable f; 1404 if ( (a = src) == null || (r = a.result) == null 1405 || (b = snd) == null || (s = b.result) == null 1406 || (d = dep) == null || (f = fn) == null 1407 || !d.biRun(r, s, f, mode > 0 ? null : this)) 1408 return null; 1409 src = null; snd = null; dep = null; fn = null; 1410 return d.postFire(a, b, mode); 1411 } 1412 } 1413 biRun(Object r, Object s, Runnable f, BiRun<?,?> c)1414 final boolean biRun(Object r, Object s, Runnable f, BiRun<?,?> c) { 1415 Throwable x; Object z; 1416 if (result == null) { 1417 if ((r instanceof AltResult 1418 && (x = ((AltResult)(z = r)).ex) != null) || 1419 (s instanceof AltResult 1420 && (x = ((AltResult)(z = s)).ex) != null)) 1421 completeThrowable(x, z); 1422 else 1423 try { 1424 if (c != null && !c.claim()) 1425 return false; 1426 f.run(); 1427 completeNull(); 1428 } catch (Throwable ex) { 1429 completeThrowable(ex); 1430 } 1431 } 1432 return true; 1433 } 1434 biRunStage(Executor e, CompletionStage<?> o, Runnable f)1435 private CompletableFuture<Void> biRunStage(Executor e, CompletionStage<?> o, 1436 Runnable f) { 1437 CompletableFuture<?> b; Object r, s; 1438 if (f == null || (b = o.toCompletableFuture()) == null) 1439 throw new NullPointerException(); 1440 CompletableFuture<Void> d = newIncompleteFuture(); 1441 if ((r = result) == null || (s = b.result) == null) 1442 bipush(b, new BiRun<>(e, d, this, b, f)); 1443 else if (e == null) 1444 d.biRun(r, s, f, null); 1445 else 1446 try { 1447 e.execute(new BiRun<>(null, d, this, b, f)); 1448 } catch (Throwable ex) { 1449 d.result = encodeThrowable(ex); 1450 } 1451 return d; 1452 } 1453 1454 @SuppressWarnings("serial") 1455 static final class BiRelay<T,U> extends BiCompletion<T,U,Void> { // for And BiRelay(CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd)1456 BiRelay(CompletableFuture<Void> dep, 1457 CompletableFuture<T> src, CompletableFuture<U> snd) { 1458 super(null, dep, src, snd); 1459 } tryFire(int mode)1460 final CompletableFuture<Void> tryFire(int mode) { 1461 CompletableFuture<Void> d; 1462 CompletableFuture<T> a; 1463 CompletableFuture<U> b; 1464 Object r, s, z; Throwable x; 1465 if ( (a = src) == null || (r = a.result) == null 1466 || (b = snd) == null || (s = b.result) == null 1467 || (d = dep) == null) 1468 return null; 1469 if (d.result == null) { 1470 if ((r instanceof AltResult 1471 && (x = ((AltResult)(z = r)).ex) != null) || 1472 (s instanceof AltResult 1473 && (x = ((AltResult)(z = s)).ex) != null)) 1474 d.completeThrowable(x, z); 1475 else 1476 d.completeNull(); 1477 } 1478 src = null; snd = null; dep = null; 1479 return d.postFire(a, b, mode); 1480 } 1481 } 1482 1483 /** Recursively constructs a tree of completions. */ andTree(CompletableFuture<?>[] cfs, int lo, int hi)1484 static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs, 1485 int lo, int hi) { 1486 CompletableFuture<Void> d = new CompletableFuture<Void>(); 1487 if (lo > hi) // empty 1488 d.result = NIL; 1489 else { 1490 CompletableFuture<?> a, b; Object r, s, z; Throwable x; 1491 int mid = (lo + hi) >>> 1; 1492 if ((a = (lo == mid ? cfs[lo] : 1493 andTree(cfs, lo, mid))) == null || 1494 (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : 1495 andTree(cfs, mid+1, hi))) == null) 1496 throw new NullPointerException(); 1497 if ((r = a.result) == null || (s = b.result) == null) 1498 a.bipush(b, new BiRelay<>(d, a, b)); 1499 else if ((r instanceof AltResult 1500 && (x = ((AltResult)(z = r)).ex) != null) || 1501 (s instanceof AltResult 1502 && (x = ((AltResult)(z = s)).ex) != null)) 1503 d.result = encodeThrowable(x, z); 1504 else 1505 d.result = NIL; 1506 } 1507 return d; 1508 } 1509 1510 /* ------------- Projected (Ored) BiCompletions -------------- */ 1511 1512 /** 1513 * Pushes completion to this and b unless either done. 1514 * Caller should first check that result and b.result are both null. 1515 */ orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c)1516 final void orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c) { 1517 if (c != null) { 1518 while (!tryPushStack(c)) { 1519 if (result != null) { 1520 NEXT.set(c, null); 1521 break; 1522 } 1523 } 1524 if (result != null) 1525 c.tryFire(SYNC); 1526 else 1527 b.unipush(new CoCompletion(c)); 1528 } 1529 } 1530 1531 @SuppressWarnings("serial") 1532 static final class OrApply<T,U extends T,V> extends BiCompletion<T,U,V> { 1533 Function<? super T,? extends V> fn; OrApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Function<? super T,? extends V> fn)1534 OrApply(Executor executor, CompletableFuture<V> dep, 1535 CompletableFuture<T> src, CompletableFuture<U> snd, 1536 Function<? super T,? extends V> fn) { 1537 super(executor, dep, src, snd); this.fn = fn; 1538 } tryFire(int mode)1539 final CompletableFuture<V> tryFire(int mode) { 1540 CompletableFuture<V> d; CompletableFuture<? extends T> a, b; 1541 Object r; Throwable x; Function<? super T,? extends V> f; 1542 if ((a = src) == null || (b = snd) == null 1543 || ((r = a.result) == null && (r = b.result) == null) 1544 || (d = dep) == null || (f = fn) == null) 1545 return null; 1546 tryComplete: if (d.result == null) { 1547 try { 1548 if (mode <= 0 && !claim()) 1549 return null; 1550 if (r instanceof AltResult) { 1551 if ((x = ((AltResult)r).ex) != null) { 1552 d.completeThrowable(x, r); 1553 break tryComplete; 1554 } 1555 r = null; 1556 } 1557 @SuppressWarnings("unchecked") T t = (T) r; 1558 d.completeValue(f.apply(t)); 1559 } catch (Throwable ex) { 1560 d.completeThrowable(ex); 1561 } 1562 } 1563 src = null; snd = null; dep = null; fn = null; 1564 return d.postFire(a, b, mode); 1565 } 1566 } 1567 orApplyStage( Executor e, CompletionStage<U> o, Function<? super T, ? extends V> f)1568 private <U extends T,V> CompletableFuture<V> orApplyStage( 1569 Executor e, CompletionStage<U> o, Function<? super T, ? extends V> f) { 1570 CompletableFuture<U> b; 1571 if (f == null || (b = o.toCompletableFuture()) == null) 1572 throw new NullPointerException(); 1573 1574 Object r; CompletableFuture<? extends T> z; 1575 if ((r = (z = this).result) != null || 1576 (r = (z = b).result) != null) 1577 return z.uniApplyNow(r, e, f); 1578 1579 CompletableFuture<V> d = newIncompleteFuture(); 1580 orpush(b, new OrApply<T,U,V>(e, d, this, b, f)); 1581 return d; 1582 } 1583 1584 @SuppressWarnings("serial") 1585 static final class OrAccept<T,U extends T> extends BiCompletion<T,U,Void> { 1586 Consumer<? super T> fn; OrAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Consumer<? super T> fn)1587 OrAccept(Executor executor, CompletableFuture<Void> dep, 1588 CompletableFuture<T> src, CompletableFuture<U> snd, 1589 Consumer<? super T> fn) { 1590 super(executor, dep, src, snd); this.fn = fn; 1591 } tryFire(int mode)1592 final CompletableFuture<Void> tryFire(int mode) { 1593 CompletableFuture<Void> d; CompletableFuture<? extends T> a, b; 1594 Object r; Throwable x; Consumer<? super T> f; 1595 if ((a = src) == null || (b = snd) == null 1596 || ((r = a.result) == null && (r = b.result) == null) 1597 || (d = dep) == null || (f = fn) == null) 1598 return null; 1599 tryComplete: if (d.result == null) { 1600 try { 1601 if (mode <= 0 && !claim()) 1602 return null; 1603 if (r instanceof AltResult) { 1604 if ((x = ((AltResult)r).ex) != null) { 1605 d.completeThrowable(x, r); 1606 break tryComplete; 1607 } 1608 r = null; 1609 } 1610 @SuppressWarnings("unchecked") T t = (T) r; 1611 f.accept(t); 1612 d.completeNull(); 1613 } catch (Throwable ex) { 1614 d.completeThrowable(ex); 1615 } 1616 } 1617 src = null; snd = null; dep = null; fn = null; 1618 return d.postFire(a, b, mode); 1619 } 1620 } 1621 orAcceptStage( Executor e, CompletionStage<U> o, Consumer<? super T> f)1622 private <U extends T> CompletableFuture<Void> orAcceptStage( 1623 Executor e, CompletionStage<U> o, Consumer<? super T> f) { 1624 CompletableFuture<U> b; 1625 if (f == null || (b = o.toCompletableFuture()) == null) 1626 throw new NullPointerException(); 1627 1628 Object r; CompletableFuture<? extends T> z; 1629 if ((r = (z = this).result) != null || 1630 (r = (z = b).result) != null) 1631 return z.uniAcceptNow(r, e, f); 1632 1633 CompletableFuture<Void> d = newIncompleteFuture(); 1634 orpush(b, new OrAccept<T,U>(e, d, this, b, f)); 1635 return d; 1636 } 1637 1638 @SuppressWarnings("serial") 1639 static final class OrRun<T,U> extends BiCompletion<T,U,Void> { 1640 Runnable fn; OrRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Runnable fn)1641 OrRun(Executor executor, CompletableFuture<Void> dep, 1642 CompletableFuture<T> src, CompletableFuture<U> snd, 1643 Runnable fn) { 1644 super(executor, dep, src, snd); this.fn = fn; 1645 } tryFire(int mode)1646 final CompletableFuture<Void> tryFire(int mode) { 1647 CompletableFuture<Void> d; CompletableFuture<?> a, b; 1648 Object r; Throwable x; Runnable f; 1649 if ((a = src) == null || (b = snd) == null 1650 || ((r = a.result) == null && (r = b.result) == null) 1651 || (d = dep) == null || (f = fn) == null) 1652 return null; 1653 if (d.result == null) { 1654 try { 1655 if (mode <= 0 && !claim()) 1656 return null; 1657 else if (r instanceof AltResult 1658 && (x = ((AltResult)r).ex) != null) 1659 d.completeThrowable(x, r); 1660 else { 1661 f.run(); 1662 d.completeNull(); 1663 } 1664 } catch (Throwable ex) { 1665 d.completeThrowable(ex); 1666 } 1667 } 1668 src = null; snd = null; dep = null; fn = null; 1669 return d.postFire(a, b, mode); 1670 } 1671 } 1672 orRunStage(Executor e, CompletionStage<?> o, Runnable f)1673 private CompletableFuture<Void> orRunStage(Executor e, CompletionStage<?> o, 1674 Runnable f) { 1675 CompletableFuture<?> b; 1676 if (f == null || (b = o.toCompletableFuture()) == null) 1677 throw new NullPointerException(); 1678 1679 Object r; CompletableFuture<?> z; 1680 if ((r = (z = this).result) != null || 1681 (r = (z = b).result) != null) 1682 return z.uniRunNow(r, e, f); 1683 1684 CompletableFuture<Void> d = newIncompleteFuture(); 1685 orpush(b, new OrRun<>(e, d, this, b, f)); 1686 return d; 1687 } 1688 1689 /** Completion for an anyOf input future. */ 1690 @SuppressWarnings("serial") 1691 static class AnyOf extends Completion { 1692 CompletableFuture<Object> dep; CompletableFuture<?> src; 1693 CompletableFuture<?>[] srcs; AnyOf(CompletableFuture<Object> dep, CompletableFuture<?> src, CompletableFuture<?>[] srcs)1694 AnyOf(CompletableFuture<Object> dep, CompletableFuture<?> src, 1695 CompletableFuture<?>[] srcs) { 1696 this.dep = dep; this.src = src; this.srcs = srcs; 1697 } tryFire(int mode)1698 final CompletableFuture<Object> tryFire(int mode) { 1699 // assert mode != ASYNC; 1700 CompletableFuture<Object> d; CompletableFuture<?> a; 1701 CompletableFuture<?>[] as; 1702 Object r; 1703 if ((a = src) == null || (r = a.result) == null 1704 || (d = dep) == null || (as = srcs) == null) 1705 return null; 1706 src = null; dep = null; srcs = null; 1707 if (d.completeRelay(r)) { 1708 for (CompletableFuture<?> b : as) 1709 if (b != a) 1710 b.cleanStack(); 1711 if (mode < 0) 1712 return d; 1713 else 1714 d.postComplete(); 1715 } 1716 return null; 1717 } isLive()1718 final boolean isLive() { 1719 CompletableFuture<Object> d; 1720 return (d = dep) != null && d.result == null; 1721 } 1722 } 1723 1724 /* ------------- Zero-input Async forms -------------- */ 1725 1726 @SuppressWarnings("serial") 1727 static final class AsyncSupply<T> extends ForkJoinTask<Void> 1728 implements Runnable, AsynchronousCompletionTask { 1729 CompletableFuture<T> dep; Supplier<? extends T> fn; AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn)1730 AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn) { 1731 this.dep = dep; this.fn = fn; 1732 } 1733 getRawResult()1734 public final Void getRawResult() { return null; } setRawResult(Void v)1735 public final void setRawResult(Void v) {} exec()1736 public final boolean exec() { run(); return false; } 1737 run()1738 public void run() { 1739 CompletableFuture<T> d; Supplier<? extends T> f; 1740 if ((d = dep) != null && (f = fn) != null) { 1741 dep = null; fn = null; 1742 if (d.result == null) { 1743 try { 1744 d.completeValue(f.get()); 1745 } catch (Throwable ex) { 1746 d.completeThrowable(ex); 1747 } 1748 } 1749 d.postComplete(); 1750 } 1751 } 1752 } 1753 asyncSupplyStage(Executor e, Supplier<U> f)1754 static <U> CompletableFuture<U> asyncSupplyStage(Executor e, 1755 Supplier<U> f) { 1756 if (f == null) throw new NullPointerException(); 1757 CompletableFuture<U> d = new CompletableFuture<U>(); 1758 e.execute(new AsyncSupply<U>(d, f)); 1759 return d; 1760 } 1761 1762 @SuppressWarnings("serial") 1763 static final class AsyncRun extends ForkJoinTask<Void> 1764 implements Runnable, AsynchronousCompletionTask { 1765 CompletableFuture<Void> dep; Runnable fn; AsyncRun(CompletableFuture<Void> dep, Runnable fn)1766 AsyncRun(CompletableFuture<Void> dep, Runnable fn) { 1767 this.dep = dep; this.fn = fn; 1768 } 1769 getRawResult()1770 public final Void getRawResult() { return null; } setRawResult(Void v)1771 public final void setRawResult(Void v) {} exec()1772 public final boolean exec() { run(); return false; } 1773 run()1774 public void run() { 1775 CompletableFuture<Void> d; Runnable f; 1776 if ((d = dep) != null && (f = fn) != null) { 1777 dep = null; fn = null; 1778 if (d.result == null) { 1779 try { 1780 f.run(); 1781 d.completeNull(); 1782 } catch (Throwable ex) { 1783 d.completeThrowable(ex); 1784 } 1785 } 1786 d.postComplete(); 1787 } 1788 } 1789 } 1790 asyncRunStage(Executor e, Runnable f)1791 static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) { 1792 if (f == null) throw new NullPointerException(); 1793 CompletableFuture<Void> d = new CompletableFuture<Void>(); 1794 e.execute(new AsyncRun(d, f)); 1795 return d; 1796 } 1797 1798 /* ------------- Signallers -------------- */ 1799 1800 /** 1801 * Completion for recording and releasing a waiting thread. This 1802 * class implements ManagedBlocker to avoid starvation when 1803 * blocking actions pile up in ForkJoinPools. 1804 */ 1805 @SuppressWarnings("serial") 1806 static final class Signaller extends Completion 1807 implements ForkJoinPool.ManagedBlocker { 1808 long nanos; // remaining wait time if timed 1809 final long deadline; // non-zero if timed 1810 final boolean interruptible; 1811 boolean interrupted; 1812 volatile Thread thread; 1813 Signaller(boolean interruptible, long nanos, long deadline)1814 Signaller(boolean interruptible, long nanos, long deadline) { 1815 this.thread = Thread.currentThread(); 1816 this.interruptible = interruptible; 1817 this.nanos = nanos; 1818 this.deadline = deadline; 1819 } tryFire(int ignore)1820 final CompletableFuture<?> tryFire(int ignore) { 1821 Thread w; // no need to atomically claim 1822 if ((w = thread) != null) { 1823 thread = null; 1824 LockSupport.unpark(w); 1825 } 1826 return null; 1827 } isReleasable()1828 public boolean isReleasable() { 1829 if (Thread.interrupted()) 1830 interrupted = true; 1831 return ((interrupted && interruptible) || 1832 (deadline != 0L && 1833 (nanos <= 0L || 1834 (nanos = deadline - System.nanoTime()) <= 0L)) || 1835 thread == null); 1836 } block()1837 public boolean block() { 1838 while (!isReleasable()) { 1839 if (deadline == 0L) 1840 LockSupport.park(this); 1841 else 1842 LockSupport.parkNanos(this, nanos); 1843 } 1844 return true; 1845 } isLive()1846 final boolean isLive() { return thread != null; } 1847 } 1848 1849 /** 1850 * Returns raw result after waiting, or null if interruptible and 1851 * interrupted. 1852 */ waitingGet(boolean interruptible)1853 private Object waitingGet(boolean interruptible) { 1854 if (interruptible && Thread.interrupted()) 1855 return null; 1856 Signaller q = null; 1857 boolean queued = false; 1858 Object r; 1859 while ((r = result) == null) { 1860 if (q == null) { 1861 q = new Signaller(interruptible, 0L, 0L); 1862 if (Thread.currentThread() instanceof ForkJoinWorkerThread) 1863 ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q); 1864 } 1865 else if (!queued) 1866 queued = tryPushStack(q); 1867 else if (interruptible && q.interrupted) { 1868 q.thread = null; 1869 cleanStack(); 1870 return null; 1871 } 1872 else { 1873 try { 1874 ForkJoinPool.managedBlock(q); 1875 } catch (InterruptedException ie) { // currently cannot happen 1876 q.interrupted = true; 1877 } 1878 } 1879 } 1880 if (q != null) { 1881 q.thread = null; 1882 if (q.interrupted) 1883 Thread.currentThread().interrupt(); 1884 } 1885 postComplete(); 1886 return r; 1887 } 1888 1889 /** 1890 * Returns raw result after waiting, or null if interrupted, or 1891 * throws TimeoutException on timeout. 1892 */ timedGet(long nanos)1893 private Object timedGet(long nanos) throws TimeoutException { 1894 long d = System.nanoTime() + nanos; 1895 long deadline = (d == 0L) ? 1L : d; // avoid 0 1896 boolean interrupted = false, queued = false; 1897 Signaller q = null; 1898 Object r = null; 1899 for (;;) { // order of checking interrupt, result, timeout matters 1900 if (interrupted || (interrupted = Thread.interrupted())) 1901 break; 1902 else if ((r = result) != null) 1903 break; 1904 else if (nanos <= 0L) 1905 break; 1906 else if (q == null) { 1907 q = new Signaller(true, nanos, deadline); 1908 if (Thread.currentThread() instanceof ForkJoinWorkerThread) 1909 ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q); 1910 } 1911 else if (!queued) 1912 queued = tryPushStack(q); 1913 else { 1914 try { 1915 ForkJoinPool.managedBlock(q); 1916 interrupted = q.interrupted; 1917 nanos = q.nanos; 1918 } catch (InterruptedException ie) { 1919 interrupted = true; 1920 } 1921 } 1922 } 1923 if (q != null) { 1924 q.thread = null; 1925 if (r == null) 1926 cleanStack(); 1927 } 1928 if (r != null) { 1929 if (interrupted) 1930 Thread.currentThread().interrupt(); 1931 postComplete(); 1932 return r; 1933 } else if (interrupted) 1934 return null; 1935 else 1936 throw new TimeoutException(); 1937 } 1938 1939 /* ------------- public methods -------------- */ 1940 1941 /** 1942 * Creates a new incomplete CompletableFuture. 1943 */ CompletableFuture()1944 public CompletableFuture() { 1945 } 1946 1947 /** 1948 * Creates a new complete CompletableFuture with given encoded result. 1949 */ CompletableFuture(Object r)1950 CompletableFuture(Object r) { 1951 RESULT.setRelease(this, r); 1952 } 1953 1954 /** 1955 * Returns a new CompletableFuture that is asynchronously completed 1956 * by a task running in the {@link ForkJoinPool#commonPool()} with 1957 * the value obtained by calling the given Supplier. 1958 * 1959 * @param supplier a function returning the value to be used 1960 * to complete the returned CompletableFuture 1961 * @param <U> the function's return type 1962 * @return the new CompletableFuture 1963 */ supplyAsync(Supplier<U> supplier)1964 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { 1965 return asyncSupplyStage(ASYNC_POOL, supplier); 1966 } 1967 1968 /** 1969 * Returns a new CompletableFuture that is asynchronously completed 1970 * by a task running in the given executor with the value obtained 1971 * by calling the given Supplier. 1972 * 1973 * @param supplier a function returning the value to be used 1974 * to complete the returned CompletableFuture 1975 * @param executor the executor to use for asynchronous execution 1976 * @param <U> the function's return type 1977 * @return the new CompletableFuture 1978 */ supplyAsync(Supplier<U> supplier, Executor executor)1979 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, 1980 Executor executor) { 1981 return asyncSupplyStage(screenExecutor(executor), supplier); 1982 } 1983 1984 /** 1985 * Returns a new CompletableFuture that is asynchronously completed 1986 * by a task running in the {@link ForkJoinPool#commonPool()} after 1987 * it runs the given action. 1988 * 1989 * @param runnable the action to run before completing the 1990 * returned CompletableFuture 1991 * @return the new CompletableFuture 1992 */ runAsync(Runnable runnable)1993 public static CompletableFuture<Void> runAsync(Runnable runnable) { 1994 return asyncRunStage(ASYNC_POOL, runnable); 1995 } 1996 1997 /** 1998 * Returns a new CompletableFuture that is asynchronously completed 1999 * by a task running in the given executor after it runs the given 2000 * action. 2001 * 2002 * @param runnable the action to run before completing the 2003 * returned CompletableFuture 2004 * @param executor the executor to use for asynchronous execution 2005 * @return the new CompletableFuture 2006 */ runAsync(Runnable runnable, Executor executor)2007 public static CompletableFuture<Void> runAsync(Runnable runnable, 2008 Executor executor) { 2009 return asyncRunStage(screenExecutor(executor), runnable); 2010 } 2011 2012 /** 2013 * Returns a new CompletableFuture that is already completed with 2014 * the given value. 2015 * 2016 * @param value the value 2017 * @param <U> the type of the value 2018 * @return the completed CompletableFuture 2019 */ completedFuture(U value)2020 public static <U> CompletableFuture<U> completedFuture(U value) { 2021 return new CompletableFuture<U>((value == null) ? NIL : value); 2022 } 2023 2024 /** 2025 * Returns {@code true} if completed in any fashion: normally, 2026 * exceptionally, or via cancellation. 2027 * 2028 * @return {@code true} if completed 2029 */ isDone()2030 public boolean isDone() { 2031 return result != null; 2032 } 2033 2034 /** 2035 * Waits if necessary for this future to complete, and then 2036 * returns its result. 2037 * 2038 * @return the result value 2039 * @throws CancellationException if this future was cancelled 2040 * @throws ExecutionException if this future completed exceptionally 2041 * @throws InterruptedException if the current thread was interrupted 2042 * while waiting 2043 */ 2044 @SuppressWarnings("unchecked") get()2045 public T get() throws InterruptedException, ExecutionException { 2046 Object r; 2047 if ((r = result) == null) 2048 r = waitingGet(true); 2049 return (T) reportGet(r); 2050 } 2051 2052 /** 2053 * Waits if necessary for at most the given time for this future 2054 * to complete, and then returns its result, if available. 2055 * 2056 * @param timeout the maximum time to wait 2057 * @param unit the time unit of the timeout argument 2058 * @return the result value 2059 * @throws CancellationException if this future was cancelled 2060 * @throws ExecutionException if this future completed exceptionally 2061 * @throws InterruptedException if the current thread was interrupted 2062 * while waiting 2063 * @throws TimeoutException if the wait timed out 2064 */ 2065 @SuppressWarnings("unchecked") get(long timeout, TimeUnit unit)2066 public T get(long timeout, TimeUnit unit) 2067 throws InterruptedException, ExecutionException, TimeoutException { 2068 long nanos = unit.toNanos(timeout); 2069 Object r; 2070 if ((r = result) == null) 2071 r = timedGet(nanos); 2072 return (T) reportGet(r); 2073 } 2074 2075 /** 2076 * Returns the result value when complete, or throws an 2077 * (unchecked) exception if completed exceptionally. To better 2078 * conform with the use of common functional forms, if a 2079 * computation involved in the completion of this 2080 * CompletableFuture threw an exception, this method throws an 2081 * (unchecked) {@link CompletionException} with the underlying 2082 * exception as its cause. 2083 * 2084 * @return the result value 2085 * @throws CancellationException if the computation was cancelled 2086 * @throws CompletionException if this future completed 2087 * exceptionally or a completion computation threw an exception 2088 */ 2089 @SuppressWarnings("unchecked") join()2090 public T join() { 2091 Object r; 2092 if ((r = result) == null) 2093 r = waitingGet(false); 2094 return (T) reportJoin(r); 2095 } 2096 2097 /** 2098 * Returns the result value (or throws any encountered exception) 2099 * if completed, else returns the given valueIfAbsent. 2100 * 2101 * @param valueIfAbsent the value to return if not completed 2102 * @return the result value, if completed, else the given valueIfAbsent 2103 * @throws CancellationException if the computation was cancelled 2104 * @throws CompletionException if this future completed 2105 * exceptionally or a completion computation threw an exception 2106 */ 2107 @SuppressWarnings("unchecked") getNow(T valueIfAbsent)2108 public T getNow(T valueIfAbsent) { 2109 Object r; 2110 return ((r = result) == null) ? valueIfAbsent : (T) reportJoin(r); 2111 } 2112 2113 @Override resultNow()2114 public T resultNow() { 2115 Object r = result; 2116 if (r != null) { 2117 if (r instanceof AltResult alt) { 2118 if (alt.ex == null) return null; 2119 } else { 2120 @SuppressWarnings("unchecked") 2121 T t = (T) r; 2122 return t; 2123 } 2124 } 2125 throw new IllegalStateException(); 2126 } 2127 2128 @Override exceptionNow()2129 public Throwable exceptionNow() { 2130 Object r = result; 2131 Throwable x; 2132 if (r instanceof AltResult alt 2133 && ((x = alt.ex) != null) 2134 && !(x instanceof CancellationException)) { 2135 if (x instanceof CompletionException) { 2136 Throwable cause = x.getCause(); 2137 if (cause != null) 2138 x = cause; 2139 } 2140 return x; 2141 } 2142 throw new IllegalStateException(); 2143 } 2144 2145 /** 2146 * If not already completed, sets the value returned by {@link 2147 * #get()} and related methods to the given value. 2148 * 2149 * @param value the result value 2150 * @return {@code true} if this invocation caused this CompletableFuture 2151 * to transition to a completed state, else {@code false} 2152 */ complete(T value)2153 public boolean complete(T value) { 2154 boolean triggered = completeValue(value); 2155 postComplete(); 2156 return triggered; 2157 } 2158 2159 /** 2160 * If not already completed, causes invocations of {@link #get()} 2161 * and related methods to throw the given exception. 2162 * 2163 * @param ex the exception 2164 * @return {@code true} if this invocation caused this CompletableFuture 2165 * to transition to a completed state, else {@code false} 2166 */ completeExceptionally(Throwable ex)2167 public boolean completeExceptionally(Throwable ex) { 2168 if (ex == null) throw new NullPointerException(); 2169 boolean triggered = internalComplete(new AltResult(ex)); 2170 postComplete(); 2171 return triggered; 2172 } 2173 thenApply( Function<? super T,? extends U> fn)2174 public <U> CompletableFuture<U> thenApply( 2175 Function<? super T,? extends U> fn) { 2176 return uniApplyStage(null, fn); 2177 } 2178 thenApplyAsync( Function<? super T,? extends U> fn)2179 public <U> CompletableFuture<U> thenApplyAsync( 2180 Function<? super T,? extends U> fn) { 2181 return uniApplyStage(defaultExecutor(), fn); 2182 } 2183 thenApplyAsync( Function<? super T,? extends U> fn, Executor executor)2184 public <U> CompletableFuture<U> thenApplyAsync( 2185 Function<? super T,? extends U> fn, Executor executor) { 2186 return uniApplyStage(screenExecutor(executor), fn); 2187 } 2188 thenAccept(Consumer<? super T> action)2189 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) { 2190 return uniAcceptStage(null, action); 2191 } 2192 thenAcceptAsync(Consumer<? super T> action)2193 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) { 2194 return uniAcceptStage(defaultExecutor(), action); 2195 } 2196 thenAcceptAsync(Consumer<? super T> action, Executor executor)2197 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, 2198 Executor executor) { 2199 return uniAcceptStage(screenExecutor(executor), action); 2200 } 2201 thenRun(Runnable action)2202 public CompletableFuture<Void> thenRun(Runnable action) { 2203 return uniRunStage(null, action); 2204 } 2205 thenRunAsync(Runnable action)2206 public CompletableFuture<Void> thenRunAsync(Runnable action) { 2207 return uniRunStage(defaultExecutor(), action); 2208 } 2209 thenRunAsync(Runnable action, Executor executor)2210 public CompletableFuture<Void> thenRunAsync(Runnable action, 2211 Executor executor) { 2212 return uniRunStage(screenExecutor(executor), action); 2213 } 2214 thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)2215 public <U,V> CompletableFuture<V> thenCombine( 2216 CompletionStage<? extends U> other, 2217 BiFunction<? super T,? super U,? extends V> fn) { 2218 return biApplyStage(null, other, fn); 2219 } 2220 thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)2221 public <U,V> CompletableFuture<V> thenCombineAsync( 2222 CompletionStage<? extends U> other, 2223 BiFunction<? super T,? super U,? extends V> fn) { 2224 return biApplyStage(defaultExecutor(), other, fn); 2225 } 2226 thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)2227 public <U,V> CompletableFuture<V> thenCombineAsync( 2228 CompletionStage<? extends U> other, 2229 BiFunction<? super T,? super U,? extends V> fn, Executor executor) { 2230 return biApplyStage(screenExecutor(executor), other, fn); 2231 } 2232 thenAcceptBoth( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)2233 public <U> CompletableFuture<Void> thenAcceptBoth( 2234 CompletionStage<? extends U> other, 2235 BiConsumer<? super T, ? super U> action) { 2236 return biAcceptStage(null, other, action); 2237 } 2238 thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)2239 public <U> CompletableFuture<Void> thenAcceptBothAsync( 2240 CompletionStage<? extends U> other, 2241 BiConsumer<? super T, ? super U> action) { 2242 return biAcceptStage(defaultExecutor(), other, action); 2243 } 2244 thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)2245 public <U> CompletableFuture<Void> thenAcceptBothAsync( 2246 CompletionStage<? extends U> other, 2247 BiConsumer<? super T, ? super U> action, Executor executor) { 2248 return biAcceptStage(screenExecutor(executor), other, action); 2249 } 2250 runAfterBoth(CompletionStage<?> other, Runnable action)2251 public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, 2252 Runnable action) { 2253 return biRunStage(null, other, action); 2254 } 2255 runAfterBothAsync(CompletionStage<?> other, Runnable action)2256 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, 2257 Runnable action) { 2258 return biRunStage(defaultExecutor(), other, action); 2259 } 2260 runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)2261 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, 2262 Runnable action, 2263 Executor executor) { 2264 return biRunStage(screenExecutor(executor), other, action); 2265 } 2266 applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn)2267 public <U> CompletableFuture<U> applyToEither( 2268 CompletionStage<? extends T> other, Function<? super T, U> fn) { 2269 return orApplyStage(null, other, fn); 2270 } 2271 applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn)2272 public <U> CompletableFuture<U> applyToEitherAsync( 2273 CompletionStage<? extends T> other, Function<? super T, U> fn) { 2274 return orApplyStage(defaultExecutor(), other, fn); 2275 } 2276 applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)2277 public <U> CompletableFuture<U> applyToEitherAsync( 2278 CompletionStage<? extends T> other, Function<? super T, U> fn, 2279 Executor executor) { 2280 return orApplyStage(screenExecutor(executor), other, fn); 2281 } 2282 acceptEither( CompletionStage<? extends T> other, Consumer<? super T> action)2283 public CompletableFuture<Void> acceptEither( 2284 CompletionStage<? extends T> other, Consumer<? super T> action) { 2285 return orAcceptStage(null, other, action); 2286 } 2287 acceptEitherAsync( CompletionStage<? extends T> other, Consumer<? super T> action)2288 public CompletableFuture<Void> acceptEitherAsync( 2289 CompletionStage<? extends T> other, Consumer<? super T> action) { 2290 return orAcceptStage(defaultExecutor(), other, action); 2291 } 2292 acceptEitherAsync( CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)2293 public CompletableFuture<Void> acceptEitherAsync( 2294 CompletionStage<? extends T> other, Consumer<? super T> action, 2295 Executor executor) { 2296 return orAcceptStage(screenExecutor(executor), other, action); 2297 } 2298 runAfterEither(CompletionStage<?> other, Runnable action)2299 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, 2300 Runnable action) { 2301 return orRunStage(null, other, action); 2302 } 2303 runAfterEitherAsync(CompletionStage<?> other, Runnable action)2304 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, 2305 Runnable action) { 2306 return orRunStage(defaultExecutor(), other, action); 2307 } 2308 runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)2309 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, 2310 Runnable action, 2311 Executor executor) { 2312 return orRunStage(screenExecutor(executor), other, action); 2313 } 2314 thenCompose( Function<? super T, ? extends CompletionStage<U>> fn)2315 public <U> CompletableFuture<U> thenCompose( 2316 Function<? super T, ? extends CompletionStage<U>> fn) { 2317 return uniComposeStage(null, fn); 2318 } 2319 thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn)2320 public <U> CompletableFuture<U> thenComposeAsync( 2321 Function<? super T, ? extends CompletionStage<U>> fn) { 2322 return uniComposeStage(defaultExecutor(), fn); 2323 } 2324 thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)2325 public <U> CompletableFuture<U> thenComposeAsync( 2326 Function<? super T, ? extends CompletionStage<U>> fn, 2327 Executor executor) { 2328 return uniComposeStage(screenExecutor(executor), fn); 2329 } 2330 whenComplete( BiConsumer<? super T, ? super Throwable> action)2331 public CompletableFuture<T> whenComplete( 2332 BiConsumer<? super T, ? super Throwable> action) { 2333 return uniWhenCompleteStage(null, action); 2334 } 2335 whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action)2336 public CompletableFuture<T> whenCompleteAsync( 2337 BiConsumer<? super T, ? super Throwable> action) { 2338 return uniWhenCompleteStage(defaultExecutor(), action); 2339 } 2340 whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action, Executor executor)2341 public CompletableFuture<T> whenCompleteAsync( 2342 BiConsumer<? super T, ? super Throwable> action, Executor executor) { 2343 return uniWhenCompleteStage(screenExecutor(executor), action); 2344 } 2345 handle( BiFunction<? super T, Throwable, ? extends U> fn)2346 public <U> CompletableFuture<U> handle( 2347 BiFunction<? super T, Throwable, ? extends U> fn) { 2348 return uniHandleStage(null, fn); 2349 } 2350 handleAsync( BiFunction<? super T, Throwable, ? extends U> fn)2351 public <U> CompletableFuture<U> handleAsync( 2352 BiFunction<? super T, Throwable, ? extends U> fn) { 2353 return uniHandleStage(defaultExecutor(), fn); 2354 } 2355 handleAsync( BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)2356 public <U> CompletableFuture<U> handleAsync( 2357 BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) { 2358 return uniHandleStage(screenExecutor(executor), fn); 2359 } 2360 2361 /** 2362 * Returns this CompletableFuture. 2363 * 2364 * @return this CompletableFuture 2365 */ toCompletableFuture()2366 public CompletableFuture<T> toCompletableFuture() { 2367 return this; 2368 } 2369 2370 // Android-added: Override annotation to mark this as overriding from CompletionStage. 2371 @Override exceptionally( Function<Throwable, ? extends T> fn)2372 public CompletableFuture<T> exceptionally( 2373 Function<Throwable, ? extends T> fn) { 2374 return uniExceptionallyStage(null, fn); 2375 } 2376 2377 // Android-added: Override annotation to mark this as overriding from CompletionStage. 2378 @Override exceptionallyAsync( Function<Throwable, ? extends T> fn)2379 public CompletableFuture<T> exceptionallyAsync( 2380 Function<Throwable, ? extends T> fn) { 2381 return uniExceptionallyStage(defaultExecutor(), fn); 2382 } 2383 2384 // Android-added: Override annotation to mark this as overriding from CompletionStage. 2385 @Override exceptionallyAsync( Function<Throwable, ? extends T> fn, Executor executor)2386 public CompletableFuture<T> exceptionallyAsync( 2387 Function<Throwable, ? extends T> fn, Executor executor) { 2388 return uniExceptionallyStage(screenExecutor(executor), fn); 2389 } 2390 2391 // Android-added: Override annotation to mark this as overriding from CompletionStage. 2392 @Override exceptionallyCompose( Function<Throwable, ? extends CompletionStage<T>> fn)2393 public CompletableFuture<T> exceptionallyCompose( 2394 Function<Throwable, ? extends CompletionStage<T>> fn) { 2395 return uniComposeExceptionallyStage(null, fn); 2396 } 2397 2398 // Android-added: Override annotation to mark this as overriding from CompletionStage. 2399 @Override exceptionallyComposeAsync( Function<Throwable, ? extends CompletionStage<T>> fn)2400 public CompletableFuture<T> exceptionallyComposeAsync( 2401 Function<Throwable, ? extends CompletionStage<T>> fn) { 2402 return uniComposeExceptionallyStage(defaultExecutor(), fn); 2403 } 2404 2405 // Android-added: Override annotation to mark this as overriding from CompletionStage. 2406 @Override exceptionallyComposeAsync( Function<Throwable, ? extends CompletionStage<T>> fn, Executor executor)2407 public CompletableFuture<T> exceptionallyComposeAsync( 2408 Function<Throwable, ? extends CompletionStage<T>> fn, 2409 Executor executor) { 2410 return uniComposeExceptionallyStage(screenExecutor(executor), fn); 2411 } 2412 2413 /* ------------- Arbitrary-arity constructions -------------- */ 2414 2415 /** 2416 * Returns a new CompletableFuture that is completed when all of 2417 * the given CompletableFutures complete. If any of the given 2418 * CompletableFutures complete exceptionally, then the returned 2419 * CompletableFuture also does so, with a CompletionException 2420 * holding this exception as its cause. Otherwise, the results, 2421 * if any, of the given CompletableFutures are not reflected in 2422 * the returned CompletableFuture, but may be obtained by 2423 * inspecting them individually. If no CompletableFutures are 2424 * provided, returns a CompletableFuture completed with the value 2425 * {@code null}. 2426 * 2427 * <p>Among the applications of this method is to await completion 2428 * of a set of independent CompletableFutures before continuing a 2429 * program, as in: {@code CompletableFuture.allOf(c1, c2, 2430 * c3).join();}. 2431 * 2432 * @param cfs the CompletableFutures 2433 * @return a new CompletableFuture that is completed when all of the 2434 * given CompletableFutures complete 2435 * @throws NullPointerException if the array or any of its elements are 2436 * {@code null} 2437 */ allOf(CompletableFuture<?>.... cfs)2438 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) { 2439 return andTree(cfs, 0, cfs.length - 1); 2440 } 2441 2442 /** 2443 * Returns a new CompletableFuture that is completed when any of 2444 * the given CompletableFutures complete, with the same result. 2445 * Otherwise, if it completed exceptionally, the returned 2446 * CompletableFuture also does so, with a CompletionException 2447 * holding this exception as its cause. If no CompletableFutures 2448 * are provided, returns an incomplete CompletableFuture. 2449 * 2450 * @param cfs the CompletableFutures 2451 * @return a new CompletableFuture that is completed with the 2452 * result or exception of any of the given CompletableFutures when 2453 * one completes 2454 * @throws NullPointerException if the array or any of its elements are 2455 * {@code null} 2456 */ anyOf(CompletableFuture<?>.... cfs)2457 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) { 2458 int n; Object r; 2459 if ((n = cfs.length) <= 1) 2460 return (n == 0) 2461 ? new CompletableFuture<Object>() 2462 : uniCopyStage(cfs[0]); 2463 for (CompletableFuture<?> cf : cfs) 2464 if ((r = cf.result) != null) 2465 return new CompletableFuture<Object>(encodeRelay(r)); 2466 cfs = cfs.clone(); 2467 CompletableFuture<Object> d = new CompletableFuture<>(); 2468 for (CompletableFuture<?> cf : cfs) 2469 cf.unipush(new AnyOf(d, cf, cfs)); 2470 // If d was completed while we were adding completions, we should 2471 // clean the stack of any sources that may have had completions 2472 // pushed on their stack after d was completed. 2473 if (d.result != null) 2474 for (int i = 0, len = cfs.length; i < len; i++) 2475 if (cfs[i].result != null) 2476 for (i++; i < len; i++) 2477 if (cfs[i].result == null) 2478 cfs[i].cleanStack(); 2479 return d; 2480 } 2481 2482 /* ------------- Control and status methods -------------- */ 2483 2484 /** 2485 * If not already completed, completes this CompletableFuture with 2486 * a {@link CancellationException}. Dependent CompletableFutures 2487 * that have not already completed will also complete 2488 * exceptionally, with a {@link CompletionException} caused by 2489 * this {@code CancellationException}. 2490 * 2491 * @param mayInterruptIfRunning this value has no effect in this 2492 * implementation because interrupts are not used to control 2493 * processing. 2494 * 2495 * @return {@code true} if this task is now cancelled 2496 */ cancel(boolean mayInterruptIfRunning)2497 public boolean cancel(boolean mayInterruptIfRunning) { 2498 boolean cancelled = (result == null) && 2499 internalComplete(new AltResult(new CancellationException())); 2500 postComplete(); 2501 return cancelled || isCancelled(); 2502 } 2503 2504 /** 2505 * Returns {@code true} if this CompletableFuture was cancelled 2506 * before it completed normally. 2507 * 2508 * @return {@code true} if this CompletableFuture was cancelled 2509 * before it completed normally 2510 */ isCancelled()2511 public boolean isCancelled() { 2512 Object r; 2513 return ((r = result) instanceof AltResult) && 2514 (((AltResult)r).ex instanceof CancellationException); 2515 } 2516 2517 /** 2518 * Returns {@code true} if this CompletableFuture completed 2519 * exceptionally, in any way. Possible causes include 2520 * cancellation, explicit invocation of {@code 2521 * completeExceptionally}, and abrupt termination of a 2522 * CompletionStage action. 2523 * 2524 * @return {@code true} if this CompletableFuture completed 2525 * exceptionally 2526 */ isCompletedExceptionally()2527 public boolean isCompletedExceptionally() { 2528 Object r; 2529 return ((r = result) instanceof AltResult) && r != NIL; 2530 } 2531 2532 @Override state()2533 public State state() { 2534 Object r = result; 2535 if (r == null) 2536 return State.RUNNING; 2537 if (r != NIL && r instanceof AltResult alt) { 2538 if (alt.ex instanceof CancellationException) 2539 return State.CANCELLED; 2540 else 2541 return State.FAILED; 2542 } 2543 return State.SUCCESS; 2544 } 2545 2546 /** 2547 * Forcibly sets or resets the value subsequently returned by 2548 * method {@link #get()} and related methods, whether or not 2549 * already completed. This method is designed for use only in 2550 * error recovery actions, and even in such situations may result 2551 * in ongoing dependent completions using established versus 2552 * overwritten outcomes. 2553 * 2554 * @param value the completion value 2555 */ obtrudeValue(T value)2556 public void obtrudeValue(T value) { 2557 result = (value == null) ? NIL : value; 2558 postComplete(); 2559 } 2560 2561 /** 2562 * Forcibly causes subsequent invocations of method {@link #get()} 2563 * and related methods to throw the given exception, whether or 2564 * not already completed. This method is designed for use only in 2565 * error recovery actions, and even in such situations may result 2566 * in ongoing dependent completions using established versus 2567 * overwritten outcomes. 2568 * 2569 * @param ex the exception 2570 * @throws NullPointerException if the exception is null 2571 */ obtrudeException(Throwable ex)2572 public void obtrudeException(Throwable ex) { 2573 if (ex == null) throw new NullPointerException(); 2574 result = new AltResult(ex); 2575 postComplete(); 2576 } 2577 2578 /** 2579 * Returns the estimated number of CompletableFutures whose 2580 * completions are awaiting completion of this CompletableFuture. 2581 * This method is designed for use in monitoring system state, not 2582 * for synchronization control. 2583 * 2584 * @return the number of dependent CompletableFutures 2585 */ getNumberOfDependents()2586 public int getNumberOfDependents() { 2587 int count = 0; 2588 for (Completion p = stack; p != null; p = p.next) 2589 ++count; 2590 return count; 2591 } 2592 2593 /** 2594 * Returns a string identifying this CompletableFuture, as well as 2595 * its completion state. The state, in brackets, contains the 2596 * String {@code "Completed Normally"} or the String {@code 2597 * "Completed Exceptionally"}, or the String {@code "Not 2598 * completed"} followed by the number of CompletableFutures 2599 * dependent upon its completion, if any. 2600 * 2601 * @return a string identifying this CompletableFuture, as well as its state 2602 */ toString()2603 public String toString() { 2604 Object r = result; 2605 int count = 0; // avoid call to getNumberOfDependents in case disabled 2606 for (Completion p = stack; p != null; p = p.next) 2607 ++count; 2608 return super.toString() + 2609 ((r == null) 2610 ? ((count == 0) 2611 ? "[Not completed]" 2612 : "[Not completed, " + count + " dependents]") 2613 : (((r instanceof AltResult) && ((AltResult)r).ex != null) 2614 ? "[Completed exceptionally: " + ((AltResult)r).ex + "]" 2615 : "[Completed normally]")); 2616 } 2617 2618 // jdk9 additions 2619 2620 /** 2621 * Returns a new incomplete CompletableFuture of the type to be 2622 * returned by a CompletionStage method. Subclasses should 2623 * normally override this method to return an instance of the same 2624 * class as this CompletableFuture. The default implementation 2625 * returns an instance of class CompletableFuture. 2626 * 2627 * @param <U> the type of the value 2628 * @return a new CompletableFuture 2629 * @since 9 2630 */ newIncompleteFuture()2631 public <U> CompletableFuture<U> newIncompleteFuture() { 2632 return new CompletableFuture<U>(); 2633 } 2634 2635 /** 2636 * Returns the default Executor used for async methods that do not 2637 * specify an Executor. This class uses the {@link 2638 * ForkJoinPool#commonPool()} if it supports more than one 2639 * parallel thread, or else an Executor using one thread per async 2640 * task. This method may be overridden in subclasses to return 2641 * an Executor that provides at least one independent thread. 2642 * 2643 * @return the executor 2644 * @since 9 2645 */ defaultExecutor()2646 public Executor defaultExecutor() { 2647 return ASYNC_POOL; 2648 } 2649 2650 /** 2651 * Returns a new CompletableFuture that is completed normally with 2652 * the same value as this CompletableFuture when it completes 2653 * normally. If this CompletableFuture completes exceptionally, 2654 * then the returned CompletableFuture completes exceptionally 2655 * with a CompletionException with this exception as cause. The 2656 * behavior is equivalent to {@code thenApply(x -> x)}. This 2657 * method may be useful as a form of "defensive copying", to 2658 * prevent clients from completing, while still being able to 2659 * arrange dependent actions. 2660 * 2661 * @return the new CompletableFuture 2662 * @since 9 2663 */ copy()2664 public CompletableFuture<T> copy() { 2665 return uniCopyStage(this); 2666 } 2667 2668 /** 2669 * Returns a new CompletionStage that is completed normally with 2670 * the same value as this CompletableFuture when it completes 2671 * normally, and cannot be independently completed or otherwise 2672 * used in ways not defined by the methods of interface {@link 2673 * CompletionStage}. If this CompletableFuture completes 2674 * exceptionally, then the returned CompletionStage completes 2675 * exceptionally with a CompletionException with this exception as 2676 * cause. 2677 * 2678 * <p>Unless overridden by a subclass, a new non-minimal 2679 * CompletableFuture with all methods available can be obtained from 2680 * a minimal CompletionStage via {@link #toCompletableFuture()}. 2681 * For example, completion of a minimal stage can be awaited by 2682 * 2683 * <pre> {@code minimalStage.toCompletableFuture().join(); }</pre> 2684 * 2685 * @return the new CompletionStage 2686 * @since 9 2687 */ minimalCompletionStage()2688 public CompletionStage<T> minimalCompletionStage() { 2689 return uniAsMinimalStage(); 2690 } 2691 2692 /** 2693 * Completes this CompletableFuture with the result of 2694 * the given Supplier function invoked from an asynchronous 2695 * task using the given executor. 2696 * 2697 * @param supplier a function returning the value to be used 2698 * to complete this CompletableFuture 2699 * @param executor the executor to use for asynchronous execution 2700 * @return this CompletableFuture 2701 * @since 9 2702 */ completeAsync(Supplier<? extends T> supplier, Executor executor)2703 public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, 2704 Executor executor) { 2705 if (supplier == null || executor == null) 2706 throw new NullPointerException(); 2707 executor.execute(new AsyncSupply<T>(this, supplier)); 2708 return this; 2709 } 2710 2711 /** 2712 * Completes this CompletableFuture with the result of the given 2713 * Supplier function invoked from an asynchronous task using the 2714 * default executor. 2715 * 2716 * @param supplier a function returning the value to be used 2717 * to complete this CompletableFuture 2718 * @return this CompletableFuture 2719 * @since 9 2720 */ completeAsync(Supplier<? extends T> supplier)2721 public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier) { 2722 return completeAsync(supplier, defaultExecutor()); 2723 } 2724 2725 /** 2726 * Exceptionally completes this CompletableFuture with 2727 * a {@link TimeoutException} if not otherwise completed 2728 * before the given timeout. 2729 * 2730 * @param timeout how long to wait before completing exceptionally 2731 * with a TimeoutException, in units of {@code unit} 2732 * @param unit a {@code TimeUnit} determining how to interpret the 2733 * {@code timeout} parameter 2734 * @return this CompletableFuture 2735 * @since 9 2736 */ orTimeout(long timeout, TimeUnit unit)2737 public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) { 2738 if (unit == null) 2739 throw new NullPointerException(); 2740 if (result == null) 2741 whenComplete(new Canceller(Delayer.delay(new Timeout(this), 2742 timeout, unit))); 2743 return this; 2744 } 2745 2746 /** 2747 * Completes this CompletableFuture with the given value if not 2748 * otherwise completed before the given timeout. 2749 * 2750 * @param value the value to use upon timeout 2751 * @param timeout how long to wait before completing normally 2752 * with the given value, in units of {@code unit} 2753 * @param unit a {@code TimeUnit} determining how to interpret the 2754 * {@code timeout} parameter 2755 * @return this CompletableFuture 2756 * @since 9 2757 */ completeOnTimeout(T value, long timeout, TimeUnit unit)2758 public CompletableFuture<T> completeOnTimeout(T value, long timeout, 2759 TimeUnit unit) { 2760 if (unit == null) 2761 throw new NullPointerException(); 2762 if (result == null) 2763 whenComplete(new Canceller(Delayer.delay( 2764 new DelayedCompleter<T>(this, value), 2765 timeout, unit))); 2766 return this; 2767 } 2768 2769 /** 2770 * Returns a new Executor that submits a task to the given base 2771 * executor after the given delay (or no delay if non-positive). 2772 * Each delay commences upon invocation of the returned executor's 2773 * {@code execute} method. 2774 * 2775 * @param delay how long to delay, in units of {@code unit} 2776 * @param unit a {@code TimeUnit} determining how to interpret the 2777 * {@code delay} parameter 2778 * @param executor the base executor 2779 * @return the new delayed executor 2780 * @since 9 2781 */ delayedExecutor(long delay, TimeUnit unit, Executor executor)2782 public static Executor delayedExecutor(long delay, TimeUnit unit, 2783 Executor executor) { 2784 if (unit == null || executor == null) 2785 throw new NullPointerException(); 2786 return new DelayedExecutor(delay, unit, executor); 2787 } 2788 2789 /** 2790 * Returns a new Executor that submits a task to the default 2791 * executor after the given delay (or no delay if non-positive). 2792 * Each delay commences upon invocation of the returned executor's 2793 * {@code execute} method. 2794 * 2795 * @param delay how long to delay, in units of {@code unit} 2796 * @param unit a {@code TimeUnit} determining how to interpret the 2797 * {@code delay} parameter 2798 * @return the new delayed executor 2799 * @since 9 2800 */ delayedExecutor(long delay, TimeUnit unit)2801 public static Executor delayedExecutor(long delay, TimeUnit unit) { 2802 if (unit == null) 2803 throw new NullPointerException(); 2804 return new DelayedExecutor(delay, unit, ASYNC_POOL); 2805 } 2806 2807 /** 2808 * Returns a new CompletionStage that is already completed with 2809 * the given value and supports only those methods in 2810 * interface {@link CompletionStage}. 2811 * 2812 * @param value the value 2813 * @param <U> the type of the value 2814 * @return the completed CompletionStage 2815 * @since 9 2816 */ completedStage(U value)2817 public static <U> CompletionStage<U> completedStage(U value) { 2818 return new MinimalStage<U>((value == null) ? NIL : value); 2819 } 2820 2821 /** 2822 * Returns a new CompletableFuture that is already completed 2823 * exceptionally with the given exception. 2824 * 2825 * @param ex the exception 2826 * @param <U> the type of the value 2827 * @return the exceptionally completed CompletableFuture 2828 * @since 9 2829 */ failedFuture(Throwable ex)2830 public static <U> CompletableFuture<U> failedFuture(Throwable ex) { 2831 if (ex == null) throw new NullPointerException(); 2832 return new CompletableFuture<U>(new AltResult(ex)); 2833 } 2834 2835 /** 2836 * Returns a new CompletionStage that is already completed 2837 * exceptionally with the given exception and supports only those 2838 * methods in interface {@link CompletionStage}. 2839 * 2840 * @param ex the exception 2841 * @param <U> the type of the value 2842 * @return the exceptionally completed CompletionStage 2843 * @since 9 2844 */ failedStage(Throwable ex)2845 public static <U> CompletionStage<U> failedStage(Throwable ex) { 2846 if (ex == null) throw new NullPointerException(); 2847 return new MinimalStage<U>(new AltResult(ex)); 2848 } 2849 2850 /** 2851 * Singleton delay scheduler, used only for starting and 2852 * cancelling tasks. 2853 */ 2854 static final class Delayer { delay(Runnable command, long delay, TimeUnit unit)2855 static ScheduledFuture<?> delay(Runnable command, long delay, 2856 TimeUnit unit) { 2857 return delayer.schedule(command, delay, unit); 2858 } 2859 2860 static final class DaemonThreadFactory implements ThreadFactory { newThread(Runnable r)2861 public Thread newThread(Runnable r) { 2862 Thread t = new Thread(r); 2863 t.setDaemon(true); 2864 t.setName("CompletableFutureDelayScheduler"); 2865 return t; 2866 } 2867 } 2868 2869 static final ScheduledThreadPoolExecutor delayer; 2870 static { 2871 (delayer = new ScheduledThreadPoolExecutor( 2872 1, new DaemonThreadFactory())). 2873 setRemoveOnCancelPolicy(true); 2874 } 2875 } 2876 2877 // Little class-ified lambdas to better support monitoring 2878 2879 static final class DelayedExecutor implements Executor { 2880 final long delay; 2881 final TimeUnit unit; 2882 final Executor executor; DelayedExecutor(long delay, TimeUnit unit, Executor executor)2883 DelayedExecutor(long delay, TimeUnit unit, Executor executor) { 2884 this.delay = delay; this.unit = unit; this.executor = executor; 2885 } execute(Runnable r)2886 public void execute(Runnable r) { 2887 Delayer.delay(new TaskSubmitter(executor, r), delay, unit); 2888 } 2889 } 2890 2891 /** Action to submit user task */ 2892 static final class TaskSubmitter implements Runnable { 2893 final Executor executor; 2894 final Runnable action; TaskSubmitter(Executor executor, Runnable action)2895 TaskSubmitter(Executor executor, Runnable action) { 2896 this.executor = executor; 2897 this.action = action; 2898 } run()2899 public void run() { executor.execute(action); } 2900 } 2901 2902 /** Action to completeExceptionally on timeout */ 2903 static final class Timeout implements Runnable { 2904 final CompletableFuture<?> f; Timeout(CompletableFuture<?> f)2905 Timeout(CompletableFuture<?> f) { this.f = f; } run()2906 public void run() { 2907 if (f != null && !f.isDone()) 2908 f.completeExceptionally(new TimeoutException()); 2909 } 2910 } 2911 2912 /** Action to complete on timeout */ 2913 static final class DelayedCompleter<U> implements Runnable { 2914 final CompletableFuture<U> f; 2915 final U u; DelayedCompleter(CompletableFuture<U> f, U u)2916 DelayedCompleter(CompletableFuture<U> f, U u) { this.f = f; this.u = u; } run()2917 public void run() { 2918 if (f != null) 2919 f.complete(u); 2920 } 2921 } 2922 2923 /** Action to cancel unneeded timeouts */ 2924 static final class Canceller implements BiConsumer<Object, Throwable> { 2925 final Future<?> f; Canceller(Future<?> f)2926 Canceller(Future<?> f) { this.f = f; } accept(Object ignore, Throwable ex)2927 public void accept(Object ignore, Throwable ex) { 2928 if (f != null && !f.isDone()) 2929 f.cancel(false); 2930 } 2931 } 2932 2933 /** 2934 * A subclass that just throws UOE for most non-CompletionStage methods. 2935 */ 2936 static final class MinimalStage<T> extends CompletableFuture<T> { MinimalStage()2937 MinimalStage() { } MinimalStage(Object r)2938 MinimalStage(Object r) { super(r); } newIncompleteFuture()2939 @Override public <U> CompletableFuture<U> newIncompleteFuture() { 2940 return new MinimalStage<U>(); } get()2941 @Override public T get() { 2942 throw new UnsupportedOperationException(); } get(long timeout, TimeUnit unit)2943 @Override public T get(long timeout, TimeUnit unit) { 2944 throw new UnsupportedOperationException(); } getNow(T valueIfAbsent)2945 @Override public T getNow(T valueIfAbsent) { 2946 throw new UnsupportedOperationException(); } join()2947 @Override public T join() { 2948 throw new UnsupportedOperationException(); } resultNow()2949 @Override public T resultNow() { 2950 throw new UnsupportedOperationException(); } exceptionNow()2951 @Override public Throwable exceptionNow() { 2952 throw new UnsupportedOperationException(); } complete(T value)2953 @Override public boolean complete(T value) { 2954 throw new UnsupportedOperationException(); } completeExceptionally(Throwable ex)2955 @Override public boolean completeExceptionally(Throwable ex) { 2956 throw new UnsupportedOperationException(); } cancel(boolean mayInterruptIfRunning)2957 @Override public boolean cancel(boolean mayInterruptIfRunning) { 2958 throw new UnsupportedOperationException(); } obtrudeValue(T value)2959 @Override public void obtrudeValue(T value) { 2960 throw new UnsupportedOperationException(); } obtrudeException(Throwable ex)2961 @Override public void obtrudeException(Throwable ex) { 2962 throw new UnsupportedOperationException(); } isDone()2963 @Override public boolean isDone() { 2964 throw new UnsupportedOperationException(); } isCancelled()2965 @Override public boolean isCancelled() { 2966 throw new UnsupportedOperationException(); } isCompletedExceptionally()2967 @Override public boolean isCompletedExceptionally() { 2968 throw new UnsupportedOperationException(); } state()2969 @Override public State state() { 2970 throw new UnsupportedOperationException(); } getNumberOfDependents()2971 @Override public int getNumberOfDependents() { 2972 throw new UnsupportedOperationException(); } completeAsync(Supplier<? extends T> supplier, Executor executor)2973 @Override public CompletableFuture<T> completeAsync 2974 (Supplier<? extends T> supplier, Executor executor) { 2975 throw new UnsupportedOperationException(); } completeAsync(Supplier<? extends T> supplier)2976 @Override public CompletableFuture<T> completeAsync 2977 (Supplier<? extends T> supplier) { 2978 throw new UnsupportedOperationException(); } orTimeout(long timeout, TimeUnit unit)2979 @Override public CompletableFuture<T> orTimeout 2980 (long timeout, TimeUnit unit) { 2981 throw new UnsupportedOperationException(); } completeOnTimeout(T value, long timeout, TimeUnit unit)2982 @Override public CompletableFuture<T> completeOnTimeout 2983 (T value, long timeout, TimeUnit unit) { 2984 throw new UnsupportedOperationException(); } toCompletableFuture()2985 @Override public CompletableFuture<T> toCompletableFuture() { 2986 Object r; 2987 if ((r = result) != null) 2988 return new CompletableFuture<T>(encodeRelay(r)); 2989 else { 2990 CompletableFuture<T> d = new CompletableFuture<>(); 2991 unipush(new UniRelay<T,T>(d, this)); 2992 return d; 2993 } 2994 } 2995 } 2996 2997 // VarHandle mechanics 2998 private static final VarHandle RESULT; 2999 private static final VarHandle STACK; 3000 private static final VarHandle NEXT; 3001 static { 3002 try { 3003 MethodHandles.Lookup l = MethodHandles.lookup(); 3004 RESULT = l.findVarHandle(CompletableFuture.class, "result", Object.class); 3005 STACK = l.findVarHandle(CompletableFuture.class, "stack", Completion.class); 3006 NEXT = l.findVarHandle(Completion.class, "next", Completion.class); 3007 } catch (ReflectiveOperationException e) { 3008 throw new ExceptionInInitializerError(e); 3009 } 3010 3011 // Reduce the risk of rare disastrous classloading in first call to 3012 // LockSupport.park: https://bugs.openjdk.org/browse/JDK-8074773 3013 Class<?> ensureLoaded = LockSupport.class; 3014 } 3015 } 3016