1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.lang.invoke.MethodHandles; 39 import java.lang.invoke.VarHandle; 40 import java.util.concurrent.locks.LockSupport; 41 import java.util.function.BiConsumer; 42 import java.util.function.BiFunction; 43 import java.util.function.Consumer; 44 import java.util.function.Function; 45 import java.util.function.Supplier; 46 47 // Android-note: Class javadoc changed to remove references to hidden OpenJDK 9 methods. 48 49 /** 50 * A {@link Future} that may be explicitly completed (setting its 51 * value and status), and may be used as a {@link CompletionStage}, 52 * supporting dependent functions and actions that trigger upon its 53 * completion. 54 * 55 * <p>When two or more threads attempt to 56 * {@link #complete complete}, 57 * {@link #completeExceptionally completeExceptionally}, or 58 * {@link #cancel cancel} 59 * a CompletableFuture, only one of them succeeds. 60 * 61 * <p>In addition to these and related methods for directly 62 * manipulating status and results, CompletableFuture implements 63 * interface {@link CompletionStage} with the following policies: <ul> 64 * 65 * <li>Actions supplied for dependent completions of 66 * <em>non-async</em> methods may be performed by the thread that 67 * completes the current CompletableFuture, or by any other caller of 68 * a completion method. 69 * 70 * <li>All <em>async</em> methods without an explicit Executor 71 * argument are performed using the {@link ForkJoinPool#commonPool()} 72 * (unless it does not support a parallelism level of at least two, in 73 * which case, a new Thread is created to run each task). 74 * To simplify monitoring, debugging, 75 * and tracking, all generated asynchronous tasks are instances of the 76 * marker interface {@link AsynchronousCompletionTask}. Operations 77 * with time-delays can use adapter methods defined in this class, for 78 * example: {@code supplyAsync(supplier, delayedExecutor(timeout, 79 * timeUnit))}. To support methods with delays and timeouts, this 80 * class maintains at most one daemon thread for triggering and 81 * cancelling actions, not for running them. 82 * 83 * <li>All CompletionStage methods are implemented independently of 84 * other public methods, so the behavior of one method is not impacted 85 * by overrides of others in subclasses. 86 * 87 * </ul> 88 * 89 * <p>CompletableFuture also implements {@link Future} with the following 90 * policies: <ul> 91 * 92 * <li>Since (unlike {@link FutureTask}) this class has no direct 93 * control over the computation that causes it to be completed, 94 * cancellation is treated as just another form of exceptional 95 * completion. Method {@link #cancel cancel} has the same effect as 96 * {@code completeExceptionally(new CancellationException())}. Method 97 * {@link #isCompletedExceptionally} can be used to determine if a 98 * CompletableFuture completed in any exceptional fashion. 99 * 100 * <li>In case of exceptional completion with a CompletionException, 101 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an 102 * {@link ExecutionException} with the same cause as held in the 103 * corresponding CompletionException. To simplify usage in most 104 * contexts, this class also defines methods {@link #join()} and 105 * {@link #getNow} that instead throw the CompletionException directly 106 * in these cases. 107 * </ul> 108 * 109 * <p>Arguments used to pass a completion result (that is, for 110 * parameters of type {@code T}) for methods accepting them may be 111 * null, but passing a null value for any other parameter will result 112 * in a {@link NullPointerException} being thrown. 113 * 114 * @author Doug Lea 115 * @param <T> The result type returned by this future's {@code join} 116 * and {@code get} methods 117 * @since 1.8 118 */ 119 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { 120 121 /* 122 * Overview: 123 * 124 * A CompletableFuture may have dependent completion actions, 125 * collected in a linked stack. It atomically completes by CASing 126 * a result field, and then pops off and runs those actions. This 127 * applies across normal vs exceptional outcomes, sync vs async 128 * actions, binary triggers, and various forms of completions. 129 * 130 * Non-nullness of volatile field "result" indicates done. It may 131 * be set directly if known to be thread-confined, else via CAS. 132 * An AltResult is used to box null as a result, as well as to 133 * hold exceptions. Using a single field makes completion simple 134 * to detect and trigger. Result encoding and decoding is 135 * straightforward but tedious and adds to the sprawl of trapping 136 * and associating exceptions with targets. Minor simplifications 137 * rely on (static) NIL (to box null results) being the only 138 * AltResult with a null exception field, so we don't usually need 139 * explicit comparisons. Even though some of the generics casts 140 * are unchecked (see SuppressWarnings annotations), they are 141 * placed to be appropriate even if checked. 142 * 143 * Dependent actions are represented by Completion objects linked 144 * as Treiber stacks headed by field "stack". There are Completion 145 * classes for each kind of action, grouped into: 146 * - single-input (UniCompletion), 147 * - two-input (BiCompletion), 148 * - projected (BiCompletions using exactly one of two inputs), 149 * - shared (CoCompletion, used by the second of two sources), 150 * - zero-input source actions, 151 * - Signallers that unblock waiters. 152 * Class Completion extends ForkJoinTask to enable async execution 153 * (adding no space overhead because we exploit its "tag" methods 154 * to maintain claims). It is also declared as Runnable to allow 155 * usage with arbitrary executors. 156 * 157 * Support for each kind of CompletionStage relies on a separate 158 * class, along with two CompletableFuture methods: 159 * 160 * * A Completion class with name X corresponding to function, 161 * prefaced with "Uni", "Bi", or "Or". Each class contains 162 * fields for source(s), actions, and dependent. They are 163 * boringly similar, differing from others only with respect to 164 * underlying functional forms. We do this so that users don't 165 * encounter layers of adapters in common usages. 166 * 167 * * Boolean CompletableFuture method x(...) (for example 168 * biApply) takes all of the arguments needed to check that an 169 * action is triggerable, and then either runs the action or 170 * arranges its async execution by executing its Completion 171 * argument, if present. The method returns true if known to be 172 * complete. 173 * 174 * * Completion method tryFire(int mode) invokes the associated x 175 * method with its held arguments, and on success cleans up. 176 * The mode argument allows tryFire to be called twice (SYNC, 177 * then ASYNC); the first to screen and trap exceptions while 178 * arranging to execute, and the second when called from a task. 179 * (A few classes are not used async so take slightly different 180 * forms.) The claim() callback suppresses function invocation 181 * if already claimed by another thread. 182 * 183 * * Some classes (for example UniApply) have separate handling 184 * code for when known to be thread-confined ("now" methods) and 185 * for when shared (in tryFire), for efficiency. 186 * 187 * * CompletableFuture method xStage(...) is called from a public 188 * stage method of CompletableFuture f. It screens user 189 * arguments and invokes and/or creates the stage object. If 190 * not async and already triggerable, the action is run 191 * immediately. Otherwise a Completion c is created, and 192 * submitted to the executor if triggerable, or pushed onto f's 193 * stack if not. Completion actions are started via c.tryFire. 194 * We recheck after pushing to a source future's stack to cover 195 * possible races if the source completes while pushing. 196 * Classes with two inputs (for example BiApply) deal with races 197 * across both while pushing actions. The second completion is 198 * a CoCompletion pointing to the first, shared so that at most 199 * one performs the action. The multiple-arity methods allOf 200 * does this pairwise to form trees of completions. Method 201 * anyOf is handled differently from allOf because completion of 202 * any source should trigger a cleanStack of other sources. 203 * Each AnyOf completion can reach others via a shared array. 204 * 205 * Note that the generic type parameters of methods vary according 206 * to whether "this" is a source, dependent, or completion. 207 * 208 * Method postComplete is called upon completion unless the target 209 * is guaranteed not to be observable (i.e., not yet returned or 210 * linked). Multiple threads can call postComplete, which 211 * atomically pops each dependent action, and tries to trigger it 212 * via method tryFire, in NESTED mode. Triggering can propagate 213 * recursively, so NESTED mode returns its completed dependent (if 214 * one exists) for further processing by its caller (see method 215 * postFire). 216 * 217 * Blocking methods get() and join() rely on Signaller Completions 218 * that wake up waiting threads. The mechanics are similar to 219 * Treiber stack wait-nodes used in FutureTask, Phaser, and 220 * SynchronousQueue. See their internal documentation for 221 * algorithmic details. 222 * 223 * Without precautions, CompletableFutures would be prone to 224 * garbage accumulation as chains of Completions build up, each 225 * pointing back to its sources. So we null out fields as soon as 226 * possible. The screening checks needed anyway harmlessly ignore 227 * null arguments that may have been obtained during races with 228 * threads nulling out fields. We also try to unlink non-isLive 229 * (fired or cancelled) Completions from stacks that might 230 * otherwise never be popped: Method cleanStack always unlinks non 231 * isLive completions from the head of stack; others may 232 * occasionally remain if racing with other cancellations or 233 * removals. 234 * 235 * Completion fields need not be declared as final or volatile 236 * because they are only visible to other threads upon safe 237 * publication. 238 */ 239 240 volatile Object result; // Either the result or boxed AltResult 241 volatile Completion stack; // Top of Treiber stack of dependent actions 242 internalComplete(Object r)243 final boolean internalComplete(Object r) { // CAS from null to r 244 return RESULT.compareAndSet(this, null, r); 245 } 246 247 /** Returns true if successfully pushed c onto stack. */ tryPushStack(Completion c)248 final boolean tryPushStack(Completion c) { 249 Completion h = stack; 250 NEXT.set(c, h); // CAS piggyback 251 return STACK.compareAndSet(this, h, c); 252 } 253 254 /** Unconditionally pushes c onto stack, retrying if necessary. */ pushStack(Completion c)255 final void pushStack(Completion c) { 256 do {} while (!tryPushStack(c)); 257 } 258 259 /* ------------- Encoding and decoding outcomes -------------- */ 260 261 static final class AltResult { // See above 262 final Throwable ex; // null only for NIL AltResult(Throwable x)263 AltResult(Throwable x) { this.ex = x; } 264 } 265 266 /** The encoding of the null value. */ 267 static final AltResult NIL = new AltResult(null); 268 269 /** Completes with the null value, unless already completed. */ completeNull()270 final boolean completeNull() { 271 return RESULT.compareAndSet(this, null, NIL); 272 } 273 274 /** Returns the encoding of the given non-exceptional value. */ encodeValue(T t)275 final Object encodeValue(T t) { 276 return (t == null) ? NIL : t; 277 } 278 279 /** Completes with a non-exceptional result, unless already completed. */ completeValue(T t)280 final boolean completeValue(T t) { 281 return RESULT.compareAndSet(this, null, (t == null) ? NIL : t); 282 } 283 284 /** 285 * Returns the encoding of the given (non-null) exception as a 286 * wrapped CompletionException unless it is one already. 287 */ encodeThrowable(Throwable x)288 static AltResult encodeThrowable(Throwable x) { 289 return new AltResult((x instanceof CompletionException) ? x : 290 new CompletionException(x)); 291 } 292 293 /** Completes with an exceptional result, unless already completed. */ completeThrowable(Throwable x)294 final boolean completeThrowable(Throwable x) { 295 return RESULT.compareAndSet(this, null, encodeThrowable(x)); 296 } 297 298 /** 299 * Returns the encoding of the given (non-null) exception as a 300 * wrapped CompletionException unless it is one already. May 301 * return the given Object r (which must have been the result of a 302 * source future) if it is equivalent, i.e. if this is a simple 303 * relay of an existing CompletionException. 304 */ encodeThrowable(Throwable x, Object r)305 static Object encodeThrowable(Throwable x, Object r) { 306 if (!(x instanceof CompletionException)) 307 x = new CompletionException(x); 308 else if (r instanceof AltResult && x == ((AltResult)r).ex) 309 return r; 310 return new AltResult(x); 311 } 312 313 /** 314 * Completes with the given (non-null) exceptional result as a 315 * wrapped CompletionException unless it is one already, unless 316 * already completed. May complete with the given Object r 317 * (which must have been the result of a source future) if it is 318 * equivalent, i.e. if this is a simple propagation of an 319 * existing CompletionException. 320 */ completeThrowable(Throwable x, Object r)321 final boolean completeThrowable(Throwable x, Object r) { 322 return RESULT.compareAndSet(this, null, encodeThrowable(x, r)); 323 } 324 325 /** 326 * Returns the encoding of the given arguments: if the exception 327 * is non-null, encodes as AltResult. Otherwise uses the given 328 * value, boxed as NIL if null. 329 */ encodeOutcome(T t, Throwable x)330 Object encodeOutcome(T t, Throwable x) { 331 return (x == null) ? (t == null) ? NIL : t : encodeThrowable(x); 332 } 333 334 /** 335 * Returns the encoding of a copied outcome; if exceptional, 336 * rewraps as a CompletionException, else returns argument. 337 */ encodeRelay(Object r)338 static Object encodeRelay(Object r) { 339 Throwable x; 340 if (r instanceof AltResult 341 && (x = ((AltResult)r).ex) != null 342 && !(x instanceof CompletionException)) 343 r = new AltResult(new CompletionException(x)); 344 return r; 345 } 346 347 /** 348 * Completes with r or a copy of r, unless already completed. 349 * If exceptional, r is first coerced to a CompletionException. 350 */ completeRelay(Object r)351 final boolean completeRelay(Object r) { 352 return RESULT.compareAndSet(this, null, encodeRelay(r)); 353 } 354 355 /** 356 * Reports result using Future.get conventions. 357 */ reportGet(Object r)358 private static Object reportGet(Object r) 359 throws InterruptedException, ExecutionException { 360 if (r == null) // by convention below, null means interrupted 361 throw new InterruptedException(); 362 if (r instanceof AltResult) { 363 Throwable x, cause; 364 if ((x = ((AltResult)r).ex) == null) 365 return null; 366 if (x instanceof CancellationException) 367 throw (CancellationException)x; 368 if ((x instanceof CompletionException) && 369 (cause = x.getCause()) != null) 370 x = cause; 371 throw new ExecutionException(x); 372 } 373 return r; 374 } 375 376 /** 377 * Decodes outcome to return result or throw unchecked exception. 378 */ reportJoin(Object r)379 private static Object reportJoin(Object r) { 380 if (r instanceof AltResult) { 381 Throwable x; 382 if ((x = ((AltResult)r).ex) == null) 383 return null; 384 if (x instanceof CancellationException) 385 throw (CancellationException)x; 386 if (x instanceof CompletionException) 387 throw (CompletionException)x; 388 throw new CompletionException(x); 389 } 390 return r; 391 } 392 393 /* ------------- Async task preliminaries -------------- */ 394 395 /** 396 * A marker interface identifying asynchronous tasks produced by 397 * {@code async} methods. This may be useful for monitoring, 398 * debugging, and tracking asynchronous activities. 399 * 400 * @since 1.8 401 */ 402 public static interface AsynchronousCompletionTask { 403 } 404 405 private static final boolean USE_COMMON_POOL = 406 (ForkJoinPool.getCommonPoolParallelism() > 1); 407 408 /** 409 * Default executor -- ForkJoinPool.commonPool() unless it cannot 410 * support parallelism. 411 */ 412 private static final Executor ASYNC_POOL = USE_COMMON_POOL ? 413 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); 414 415 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ 416 static final class ThreadPerTaskExecutor implements Executor { execute(Runnable r)417 public void execute(Runnable r) { new Thread(r).start(); } 418 } 419 420 /** 421 * Null-checks user executor argument, and translates uses of 422 * commonPool to ASYNC_POOL in case parallelism disabled. 423 */ screenExecutor(Executor e)424 static Executor screenExecutor(Executor e) { 425 if (!USE_COMMON_POOL && e == ForkJoinPool.commonPool()) 426 return ASYNC_POOL; 427 if (e == null) throw new NullPointerException(); 428 return e; 429 } 430 431 // Modes for Completion.tryFire. Signedness matters. 432 static final int SYNC = 0; 433 static final int ASYNC = 1; 434 static final int NESTED = -1; 435 436 /* ------------- Base Completion classes and operations -------------- */ 437 438 @SuppressWarnings("serial") 439 abstract static class Completion extends ForkJoinTask<Void> 440 implements Runnable, AsynchronousCompletionTask { 441 volatile Completion next; // Treiber stack link 442 443 /** 444 * Performs completion action if triggered, returning a 445 * dependent that may need propagation, if one exists. 446 * 447 * @param mode SYNC, ASYNC, or NESTED 448 */ tryFire(int mode)449 abstract CompletableFuture<?> tryFire(int mode); 450 451 /** Returns true if possibly still triggerable. Used by cleanStack. */ isLive()452 abstract boolean isLive(); 453 run()454 public final void run() { tryFire(ASYNC); } exec()455 public final boolean exec() { tryFire(ASYNC); return false; } getRawResult()456 public final Void getRawResult() { return null; } setRawResult(Void v)457 public final void setRawResult(Void v) {} 458 } 459 460 /** 461 * Pops and tries to trigger all reachable dependents. Call only 462 * when known to be done. 463 */ postComplete()464 final void postComplete() { 465 /* 466 * On each step, variable f holds current dependents to pop 467 * and run. It is extended along only one path at a time, 468 * pushing others to avoid unbounded recursion. 469 */ 470 CompletableFuture<?> f = this; Completion h; 471 while ((h = f.stack) != null || 472 (f != this && (h = (f = this).stack) != null)) { 473 CompletableFuture<?> d; Completion t; 474 if (STACK.compareAndSet(f, h, t = h.next)) { 475 if (t != null) { 476 if (f != this) { 477 pushStack(h); 478 continue; 479 } 480 NEXT.compareAndSet(h, t, null); // try to detach 481 } 482 f = (d = h.tryFire(NESTED)) == null ? this : d; 483 } 484 } 485 } 486 487 /** Traverses stack and unlinks one or more dead Completions, if found. */ cleanStack()488 final void cleanStack() { 489 Completion p = stack; 490 // ensure head of stack live 491 for (boolean unlinked = false;;) { 492 if (p == null) 493 return; 494 else if (p.isLive()) { 495 if (unlinked) 496 return; 497 else 498 break; 499 } 500 else if (STACK.weakCompareAndSet(this, p, (p = p.next))) 501 unlinked = true; 502 else 503 p = stack; 504 } 505 // try to unlink first non-live 506 for (Completion q = p.next; q != null;) { 507 Completion s = q.next; 508 if (q.isLive()) { 509 p = q; 510 q = s; 511 } else if (NEXT.weakCompareAndSet(p, q, s)) 512 break; 513 else 514 q = p.next; 515 } 516 } 517 518 /* ------------- One-input Completions -------------- */ 519 520 /** A Completion with a source, dependent, and executor. */ 521 @SuppressWarnings("serial") 522 abstract static class UniCompletion<T,V> extends Completion { 523 Executor executor; // executor to use (null if none) 524 CompletableFuture<V> dep; // the dependent to complete 525 CompletableFuture<T> src; // source for action 526 UniCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src)527 UniCompletion(Executor executor, CompletableFuture<V> dep, 528 CompletableFuture<T> src) { 529 this.executor = executor; this.dep = dep; this.src = src; 530 } 531 532 /** 533 * Returns true if action can be run. Call only when known to 534 * be triggerable. Uses FJ tag bit to ensure that only one 535 * thread claims ownership. If async, starts as task -- a 536 * later call to tryFire will run action. 537 */ claim()538 final boolean claim() { 539 Executor e = executor; 540 if (compareAndSetForkJoinTaskTag((short)0, (short)1)) { 541 if (e == null) 542 return true; 543 executor = null; // disable 544 e.execute(this); 545 } 546 return false; 547 } 548 isLive()549 final boolean isLive() { return dep != null; } 550 } 551 552 /** 553 * Pushes the given completion unless it completes while trying. 554 * Caller should first check that result is null. 555 */ unipush(Completion c)556 final void unipush(Completion c) { 557 if (c != null) { 558 while (!tryPushStack(c)) { 559 if (result != null) { 560 NEXT.set(c, null); 561 break; 562 } 563 } 564 if (result != null) 565 c.tryFire(SYNC); 566 } 567 } 568 569 /** 570 * Post-processing by dependent after successful UniCompletion tryFire. 571 * Tries to clean stack of source a, and then either runs postComplete 572 * or returns this to caller, depending on mode. 573 */ postFire(CompletableFuture<?> a, int mode)574 final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) { 575 if (a != null && a.stack != null) { 576 Object r; 577 if ((r = a.result) == null) 578 a.cleanStack(); 579 if (mode >= 0 && (r != null || a.result != null)) 580 a.postComplete(); 581 } 582 if (result != null && stack != null) { 583 if (mode < 0) 584 return this; 585 else 586 postComplete(); 587 } 588 return null; 589 } 590 591 @SuppressWarnings("serial") 592 static final class UniApply<T,V> extends UniCompletion<T,V> { 593 Function<? super T,? extends V> fn; UniApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T,? extends V> fn)594 UniApply(Executor executor, CompletableFuture<V> dep, 595 CompletableFuture<T> src, 596 Function<? super T,? extends V> fn) { 597 super(executor, dep, src); this.fn = fn; 598 } tryFire(int mode)599 final CompletableFuture<V> tryFire(int mode) { 600 CompletableFuture<V> d; CompletableFuture<T> a; 601 Object r; Throwable x; Function<? super T,? extends V> f; 602 if ((d = dep) == null || (f = fn) == null 603 || (a = src) == null || (r = a.result) == null) 604 return null; 605 tryComplete: if (d.result == null) { 606 if (r instanceof AltResult) { 607 if ((x = ((AltResult)r).ex) != null) { 608 d.completeThrowable(x, r); 609 break tryComplete; 610 } 611 r = null; 612 } 613 try { 614 if (mode <= 0 && !claim()) 615 return null; 616 else { 617 @SuppressWarnings("unchecked") T t = (T) r; 618 d.completeValue(f.apply(t)); 619 } 620 } catch (Throwable ex) { 621 d.completeThrowable(ex); 622 } 623 } 624 dep = null; src = null; fn = null; 625 return d.postFire(a, mode); 626 } 627 } 628 uniApplyStage( Executor e, Function<? super T,? extends V> f)629 private <V> CompletableFuture<V> uniApplyStage( 630 Executor e, Function<? super T,? extends V> f) { 631 if (f == null) throw new NullPointerException(); 632 Object r; 633 if ((r = result) != null) 634 return uniApplyNow(r, e, f); 635 CompletableFuture<V> d = newIncompleteFuture(); 636 unipush(new UniApply<T,V>(e, d, this, f)); 637 return d; 638 } 639 uniApplyNow( Object r, Executor e, Function<? super T,? extends V> f)640 private <V> CompletableFuture<V> uniApplyNow( 641 Object r, Executor e, Function<? super T,? extends V> f) { 642 Throwable x; 643 CompletableFuture<V> d = newIncompleteFuture(); 644 if (r instanceof AltResult) { 645 if ((x = ((AltResult)r).ex) != null) { 646 d.result = encodeThrowable(x, r); 647 return d; 648 } 649 r = null; 650 } 651 try { 652 if (e != null) { 653 e.execute(new UniApply<T,V>(null, d, this, f)); 654 } else { 655 @SuppressWarnings("unchecked") T t = (T) r; 656 d.result = d.encodeValue(f.apply(t)); 657 } 658 } catch (Throwable ex) { 659 d.result = encodeThrowable(ex); 660 } 661 return d; 662 } 663 664 @SuppressWarnings("serial") 665 static final class UniAccept<T> extends UniCompletion<T,Void> { 666 Consumer<? super T> fn; UniAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Consumer<? super T> fn)667 UniAccept(Executor executor, CompletableFuture<Void> dep, 668 CompletableFuture<T> src, Consumer<? super T> fn) { 669 super(executor, dep, src); this.fn = fn; 670 } tryFire(int mode)671 final CompletableFuture<Void> tryFire(int mode) { 672 CompletableFuture<Void> d; CompletableFuture<T> a; 673 Object r; Throwable x; Consumer<? super T> f; 674 if ((d = dep) == null || (f = fn) == null 675 || (a = src) == null || (r = a.result) == null) 676 return null; 677 tryComplete: if (d.result == null) { 678 if (r instanceof AltResult) { 679 if ((x = ((AltResult)r).ex) != null) { 680 d.completeThrowable(x, r); 681 break tryComplete; 682 } 683 r = null; 684 } 685 try { 686 if (mode <= 0 && !claim()) 687 return null; 688 else { 689 @SuppressWarnings("unchecked") T t = (T) r; 690 f.accept(t); 691 d.completeNull(); 692 } 693 } catch (Throwable ex) { 694 d.completeThrowable(ex); 695 } 696 } 697 dep = null; src = null; fn = null; 698 return d.postFire(a, mode); 699 } 700 } 701 uniAcceptStage(Executor e, Consumer<? super T> f)702 private CompletableFuture<Void> uniAcceptStage(Executor e, 703 Consumer<? super T> f) { 704 if (f == null) throw new NullPointerException(); 705 Object r; 706 if ((r = result) != null) 707 return uniAcceptNow(r, e, f); 708 CompletableFuture<Void> d = newIncompleteFuture(); 709 unipush(new UniAccept<T>(e, d, this, f)); 710 return d; 711 } 712 uniAcceptNow( Object r, Executor e, Consumer<? super T> f)713 private CompletableFuture<Void> uniAcceptNow( 714 Object r, Executor e, Consumer<? super T> f) { 715 Throwable x; 716 CompletableFuture<Void> d = newIncompleteFuture(); 717 if (r instanceof AltResult) { 718 if ((x = ((AltResult)r).ex) != null) { 719 d.result = encodeThrowable(x, r); 720 return d; 721 } 722 r = null; 723 } 724 try { 725 if (e != null) { 726 e.execute(new UniAccept<T>(null, d, this, f)); 727 } else { 728 @SuppressWarnings("unchecked") T t = (T) r; 729 f.accept(t); 730 d.result = NIL; 731 } 732 } catch (Throwable ex) { 733 d.result = encodeThrowable(ex); 734 } 735 return d; 736 } 737 738 @SuppressWarnings("serial") 739 static final class UniRun<T> extends UniCompletion<T,Void> { 740 Runnable fn; UniRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Runnable fn)741 UniRun(Executor executor, CompletableFuture<Void> dep, 742 CompletableFuture<T> src, Runnable fn) { 743 super(executor, dep, src); this.fn = fn; 744 } tryFire(int mode)745 final CompletableFuture<Void> tryFire(int mode) { 746 CompletableFuture<Void> d; CompletableFuture<T> a; 747 Object r; Throwable x; Runnable f; 748 if ((d = dep) == null || (f = fn) == null 749 || (a = src) == null || (r = a.result) == null) 750 return null; 751 if (d.result == null) { 752 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) 753 d.completeThrowable(x, r); 754 else 755 try { 756 if (mode <= 0 && !claim()) 757 return null; 758 else { 759 f.run(); 760 d.completeNull(); 761 } 762 } catch (Throwable ex) { 763 d.completeThrowable(ex); 764 } 765 } 766 dep = null; src = null; fn = null; 767 return d.postFire(a, mode); 768 } 769 } 770 uniRunStage(Executor e, Runnable f)771 private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) { 772 if (f == null) throw new NullPointerException(); 773 Object r; 774 if ((r = result) != null) 775 return uniRunNow(r, e, f); 776 CompletableFuture<Void> d = newIncompleteFuture(); 777 unipush(new UniRun<T>(e, d, this, f)); 778 return d; 779 } 780 uniRunNow(Object r, Executor e, Runnable f)781 private CompletableFuture<Void> uniRunNow(Object r, Executor e, Runnable f) { 782 Throwable x; 783 CompletableFuture<Void> d = newIncompleteFuture(); 784 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) 785 d.result = encodeThrowable(x, r); 786 else 787 try { 788 if (e != null) { 789 e.execute(new UniRun<T>(null, d, this, f)); 790 } else { 791 f.run(); 792 d.result = NIL; 793 } 794 } catch (Throwable ex) { 795 d.result = encodeThrowable(ex); 796 } 797 return d; 798 } 799 800 @SuppressWarnings("serial") 801 static final class UniWhenComplete<T> extends UniCompletion<T,T> { 802 BiConsumer<? super T, ? super Throwable> fn; UniWhenComplete(Executor executor, CompletableFuture<T> dep, CompletableFuture<T> src, BiConsumer<? super T, ? super Throwable> fn)803 UniWhenComplete(Executor executor, CompletableFuture<T> dep, 804 CompletableFuture<T> src, 805 BiConsumer<? super T, ? super Throwable> fn) { 806 super(executor, dep, src); this.fn = fn; 807 } tryFire(int mode)808 final CompletableFuture<T> tryFire(int mode) { 809 CompletableFuture<T> d; CompletableFuture<T> a; 810 Object r; BiConsumer<? super T, ? super Throwable> f; 811 if ((d = dep) == null || (f = fn) == null 812 || (a = src) == null || (r = a.result) == null 813 || !d.uniWhenComplete(r, f, mode > 0 ? null : this)) 814 return null; 815 dep = null; src = null; fn = null; 816 return d.postFire(a, mode); 817 } 818 } 819 uniWhenComplete(Object r, BiConsumer<? super T,? super Throwable> f, UniWhenComplete<T> c)820 final boolean uniWhenComplete(Object r, 821 BiConsumer<? super T,? super Throwable> f, 822 UniWhenComplete<T> c) { 823 T t; Throwable x = null; 824 if (result == null) { 825 try { 826 if (c != null && !c.claim()) 827 return false; 828 if (r instanceof AltResult) { 829 x = ((AltResult)r).ex; 830 t = null; 831 } else { 832 @SuppressWarnings("unchecked") T tr = (T) r; 833 t = tr; 834 } 835 f.accept(t, x); 836 if (x == null) { 837 internalComplete(r); 838 return true; 839 } 840 } catch (Throwable ex) { 841 if (x == null) 842 x = ex; 843 else if (x != ex) 844 x.addSuppressed(ex); 845 } 846 completeThrowable(x, r); 847 } 848 return true; 849 } 850 uniWhenCompleteStage( Executor e, BiConsumer<? super T, ? super Throwable> f)851 private CompletableFuture<T> uniWhenCompleteStage( 852 Executor e, BiConsumer<? super T, ? super Throwable> f) { 853 if (f == null) throw new NullPointerException(); 854 CompletableFuture<T> d = newIncompleteFuture(); 855 Object r; 856 if ((r = result) == null) 857 unipush(new UniWhenComplete<T>(e, d, this, f)); 858 else if (e == null) 859 d.uniWhenComplete(r, f, null); 860 else { 861 try { 862 e.execute(new UniWhenComplete<T>(null, d, this, f)); 863 } catch (Throwable ex) { 864 d.result = encodeThrowable(ex); 865 } 866 } 867 return d; 868 } 869 870 @SuppressWarnings("serial") 871 static final class UniHandle<T,V> extends UniCompletion<T,V> { 872 BiFunction<? super T, Throwable, ? extends V> fn; UniHandle(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, BiFunction<? super T, Throwable, ? extends V> fn)873 UniHandle(Executor executor, CompletableFuture<V> dep, 874 CompletableFuture<T> src, 875 BiFunction<? super T, Throwable, ? extends V> fn) { 876 super(executor, dep, src); this.fn = fn; 877 } tryFire(int mode)878 final CompletableFuture<V> tryFire(int mode) { 879 CompletableFuture<V> d; CompletableFuture<T> a; 880 Object r; BiFunction<? super T, Throwable, ? extends V> f; 881 if ((d = dep) == null || (f = fn) == null 882 || (a = src) == null || (r = a.result) == null 883 || !d.uniHandle(r, f, mode > 0 ? null : this)) 884 return null; 885 dep = null; src = null; fn = null; 886 return d.postFire(a, mode); 887 } 888 } 889 uniHandle(Object r, BiFunction<? super S, Throwable, ? extends T> f, UniHandle<S,T> c)890 final <S> boolean uniHandle(Object r, 891 BiFunction<? super S, Throwable, ? extends T> f, 892 UniHandle<S,T> c) { 893 S s; Throwable x; 894 if (result == null) { 895 try { 896 if (c != null && !c.claim()) 897 return false; 898 if (r instanceof AltResult) { 899 x = ((AltResult)r).ex; 900 s = null; 901 } else { 902 x = null; 903 @SuppressWarnings("unchecked") S ss = (S) r; 904 s = ss; 905 } 906 completeValue(f.apply(s, x)); 907 } catch (Throwable ex) { 908 completeThrowable(ex); 909 } 910 } 911 return true; 912 } 913 uniHandleStage( Executor e, BiFunction<? super T, Throwable, ? extends V> f)914 private <V> CompletableFuture<V> uniHandleStage( 915 Executor e, BiFunction<? super T, Throwable, ? extends V> f) { 916 if (f == null) throw new NullPointerException(); 917 CompletableFuture<V> d = newIncompleteFuture(); 918 Object r; 919 if ((r = result) == null) 920 unipush(new UniHandle<T,V>(e, d, this, f)); 921 else if (e == null) 922 d.uniHandle(r, f, null); 923 else { 924 try { 925 e.execute(new UniHandle<T,V>(null, d, this, f)); 926 } catch (Throwable ex) { 927 d.result = encodeThrowable(ex); 928 } 929 } 930 return d; 931 } 932 933 @SuppressWarnings("serial") 934 static final class UniExceptionally<T> extends UniCompletion<T,T> { 935 Function<? super Throwable, ? extends T> fn; UniExceptionally(CompletableFuture<T> dep, CompletableFuture<T> src, Function<? super Throwable, ? extends T> fn)936 UniExceptionally(CompletableFuture<T> dep, CompletableFuture<T> src, 937 Function<? super Throwable, ? extends T> fn) { 938 super(null, dep, src); this.fn = fn; 939 } tryFire(int mode)940 final CompletableFuture<T> tryFire(int mode) { // never ASYNC 941 // assert mode != ASYNC; 942 CompletableFuture<T> d; CompletableFuture<T> a; 943 Object r; Function<? super Throwable, ? extends T> f; 944 if ((d = dep) == null || (f = fn) == null 945 || (a = src) == null || (r = a.result) == null 946 || !d.uniExceptionally(r, f, this)) 947 return null; 948 dep = null; src = null; fn = null; 949 return d.postFire(a, mode); 950 } 951 } 952 uniExceptionally(Object r, Function<? super Throwable, ? extends T> f, UniExceptionally<T> c)953 final boolean uniExceptionally(Object r, 954 Function<? super Throwable, ? extends T> f, 955 UniExceptionally<T> c) { 956 Throwable x; 957 if (result == null) { 958 try { 959 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) { 960 if (c != null && !c.claim()) 961 return false; 962 completeValue(f.apply(x)); 963 } else 964 internalComplete(r); 965 } catch (Throwable ex) { 966 completeThrowable(ex); 967 } 968 } 969 return true; 970 } 971 uniExceptionallyStage( Function<Throwable, ? extends T> f)972 private CompletableFuture<T> uniExceptionallyStage( 973 Function<Throwable, ? extends T> f) { 974 if (f == null) throw new NullPointerException(); 975 CompletableFuture<T> d = newIncompleteFuture(); 976 Object r; 977 if ((r = result) == null) 978 unipush(new UniExceptionally<T>(d, this, f)); 979 else 980 d.uniExceptionally(r, f, null); 981 return d; 982 } 983 984 @SuppressWarnings("serial") 985 static final class UniRelay<U, T extends U> extends UniCompletion<T,U> { UniRelay(CompletableFuture<U> dep, CompletableFuture<T> src)986 UniRelay(CompletableFuture<U> dep, CompletableFuture<T> src) { 987 super(null, dep, src); 988 } tryFire(int mode)989 final CompletableFuture<U> tryFire(int mode) { 990 CompletableFuture<U> d; CompletableFuture<T> a; Object r; 991 if ((d = dep) == null 992 || (a = src) == null || (r = a.result) == null) 993 return null; 994 if (d.result == null) 995 d.completeRelay(r); 996 src = null; dep = null; 997 return d.postFire(a, mode); 998 } 999 } 1000 uniCopyStage( CompletableFuture<T> src)1001 private static <U, T extends U> CompletableFuture<U> uniCopyStage( 1002 CompletableFuture<T> src) { 1003 Object r; 1004 CompletableFuture<U> d = src.newIncompleteFuture(); 1005 if ((r = src.result) != null) 1006 d.result = encodeRelay(r); 1007 else 1008 src.unipush(new UniRelay<U,T>(d, src)); 1009 return d; 1010 } 1011 uniAsMinimalStage()1012 private MinimalStage<T> uniAsMinimalStage() { 1013 Object r; 1014 if ((r = result) != null) 1015 return new MinimalStage<T>(encodeRelay(r)); 1016 MinimalStage<T> d = new MinimalStage<T>(); 1017 unipush(new UniRelay<T,T>(d, this)); 1018 return d; 1019 } 1020 1021 @SuppressWarnings("serial") 1022 static final class UniCompose<T,V> extends UniCompletion<T,V> { 1023 Function<? super T, ? extends CompletionStage<V>> fn; UniCompose(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T, ? extends CompletionStage<V>> fn)1024 UniCompose(Executor executor, CompletableFuture<V> dep, 1025 CompletableFuture<T> src, 1026 Function<? super T, ? extends CompletionStage<V>> fn) { 1027 super(executor, dep, src); this.fn = fn; 1028 } tryFire(int mode)1029 final CompletableFuture<V> tryFire(int mode) { 1030 CompletableFuture<V> d; CompletableFuture<T> a; 1031 Function<? super T, ? extends CompletionStage<V>> f; 1032 Object r; Throwable x; 1033 if ((d = dep) == null || (f = fn) == null 1034 || (a = src) == null || (r = a.result) == null) 1035 return null; 1036 tryComplete: if (d.result == null) { 1037 if (r instanceof AltResult) { 1038 if ((x = ((AltResult)r).ex) != null) { 1039 d.completeThrowable(x, r); 1040 break tryComplete; 1041 } 1042 r = null; 1043 } 1044 try { 1045 if (mode <= 0 && !claim()) 1046 return null; 1047 @SuppressWarnings("unchecked") T t = (T) r; 1048 CompletableFuture<V> g = f.apply(t).toCompletableFuture(); 1049 if ((r = g.result) != null) 1050 d.completeRelay(r); 1051 else { 1052 g.unipush(new UniRelay<V,V>(d, g)); 1053 if (d.result == null) 1054 return null; 1055 } 1056 } catch (Throwable ex) { 1057 d.completeThrowable(ex); 1058 } 1059 } 1060 dep = null; src = null; fn = null; 1061 return d.postFire(a, mode); 1062 } 1063 } 1064 uniComposeStage( Executor e, Function<? super T, ? extends CompletionStage<V>> f)1065 private <V> CompletableFuture<V> uniComposeStage( 1066 Executor e, Function<? super T, ? extends CompletionStage<V>> f) { 1067 if (f == null) throw new NullPointerException(); 1068 CompletableFuture<V> d = newIncompleteFuture(); 1069 Object r, s; Throwable x; 1070 if ((r = result) == null) 1071 unipush(new UniCompose<T,V>(e, d, this, f)); 1072 else if (e == null) { 1073 if (r instanceof AltResult) { 1074 if ((x = ((AltResult)r).ex) != null) { 1075 d.result = encodeThrowable(x, r); 1076 return d; 1077 } 1078 r = null; 1079 } 1080 try { 1081 @SuppressWarnings("unchecked") T t = (T) r; 1082 CompletableFuture<V> g = f.apply(t).toCompletableFuture(); 1083 if ((s = g.result) != null) 1084 d.result = encodeRelay(s); 1085 else { 1086 g.unipush(new UniRelay<V,V>(d, g)); 1087 } 1088 } catch (Throwable ex) { 1089 d.result = encodeThrowable(ex); 1090 } 1091 } 1092 else 1093 try { 1094 e.execute(new UniCompose<T,V>(null, d, this, f)); 1095 } catch (Throwable ex) { 1096 d.result = encodeThrowable(ex); 1097 } 1098 return d; 1099 } 1100 1101 /* ------------- Two-input Completions -------------- */ 1102 1103 /** A Completion for an action with two sources */ 1104 @SuppressWarnings("serial") 1105 abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> { 1106 CompletableFuture<U> snd; // second source for action BiCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd)1107 BiCompletion(Executor executor, CompletableFuture<V> dep, 1108 CompletableFuture<T> src, CompletableFuture<U> snd) { 1109 super(executor, dep, src); this.snd = snd; 1110 } 1111 } 1112 1113 /** A Completion delegating to a BiCompletion */ 1114 @SuppressWarnings("serial") 1115 static final class CoCompletion extends Completion { 1116 BiCompletion<?,?,?> base; CoCompletion(BiCompletion<?,?,?> base)1117 CoCompletion(BiCompletion<?,?,?> base) { this.base = base; } tryFire(int mode)1118 final CompletableFuture<?> tryFire(int mode) { 1119 BiCompletion<?,?,?> c; CompletableFuture<?> d; 1120 if ((c = base) == null || (d = c.tryFire(mode)) == null) 1121 return null; 1122 base = null; // detach 1123 return d; 1124 } isLive()1125 final boolean isLive() { 1126 BiCompletion<?,?,?> c; 1127 return (c = base) != null 1128 // && c.isLive() 1129 && c.dep != null; 1130 } 1131 } 1132 1133 /** 1134 * Pushes completion to this and b unless both done. 1135 * Caller should first check that either result or b.result is null. 1136 */ bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c)1137 final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) { 1138 if (c != null) { 1139 while (result == null) { 1140 if (tryPushStack(c)) { 1141 if (b.result == null) 1142 b.unipush(new CoCompletion(c)); 1143 else if (result != null) 1144 c.tryFire(SYNC); 1145 return; 1146 } 1147 } 1148 b.unipush(c); 1149 } 1150 } 1151 1152 /** Post-processing after successful BiCompletion tryFire. */ postFire(CompletableFuture<?> a, CompletableFuture<?> b, int mode)1153 final CompletableFuture<T> postFire(CompletableFuture<?> a, 1154 CompletableFuture<?> b, int mode) { 1155 if (b != null && b.stack != null) { // clean second source 1156 Object r; 1157 if ((r = b.result) == null) 1158 b.cleanStack(); 1159 if (mode >= 0 && (r != null || b.result != null)) 1160 b.postComplete(); 1161 } 1162 return postFire(a, mode); 1163 } 1164 1165 @SuppressWarnings("serial") 1166 static final class BiApply<T,U,V> extends BiCompletion<T,U,V> { 1167 BiFunction<? super T,? super U,? extends V> fn; BiApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd, BiFunction<? super T,? super U,? extends V> fn)1168 BiApply(Executor executor, CompletableFuture<V> dep, 1169 CompletableFuture<T> src, CompletableFuture<U> snd, 1170 BiFunction<? super T,? super U,? extends V> fn) { 1171 super(executor, dep, src, snd); this.fn = fn; 1172 } tryFire(int mode)1173 final CompletableFuture<V> tryFire(int mode) { 1174 CompletableFuture<V> d; 1175 CompletableFuture<T> a; 1176 CompletableFuture<U> b; 1177 Object r, s; BiFunction<? super T,? super U,? extends V> f; 1178 if ((d = dep) == null || (f = fn) == null 1179 || (a = src) == null || (r = a.result) == null 1180 || (b = snd) == null || (s = b.result) == null 1181 || !d.biApply(r, s, f, mode > 0 ? null : this)) 1182 return null; 1183 dep = null; src = null; snd = null; fn = null; 1184 return d.postFire(a, b, mode); 1185 } 1186 } 1187 biApply(Object r, Object s, BiFunction<? super R,? super S,? extends T> f, BiApply<R,S,T> c)1188 final <R,S> boolean biApply(Object r, Object s, 1189 BiFunction<? super R,? super S,? extends T> f, 1190 BiApply<R,S,T> c) { 1191 Throwable x; 1192 tryComplete: if (result == null) { 1193 if (r instanceof AltResult) { 1194 if ((x = ((AltResult)r).ex) != null) { 1195 completeThrowable(x, r); 1196 break tryComplete; 1197 } 1198 r = null; 1199 } 1200 if (s instanceof AltResult) { 1201 if ((x = ((AltResult)s).ex) != null) { 1202 completeThrowable(x, s); 1203 break tryComplete; 1204 } 1205 s = null; 1206 } 1207 try { 1208 if (c != null && !c.claim()) 1209 return false; 1210 @SuppressWarnings("unchecked") R rr = (R) r; 1211 @SuppressWarnings("unchecked") S ss = (S) s; 1212 completeValue(f.apply(rr, ss)); 1213 } catch (Throwable ex) { 1214 completeThrowable(ex); 1215 } 1216 } 1217 return true; 1218 } 1219 biApplyStage( Executor e, CompletionStage<U> o, BiFunction<? super T,? super U,? extends V> f)1220 private <U,V> CompletableFuture<V> biApplyStage( 1221 Executor e, CompletionStage<U> o, 1222 BiFunction<? super T,? super U,? extends V> f) { 1223 CompletableFuture<U> b; Object r, s; 1224 if (f == null || (b = o.toCompletableFuture()) == null) 1225 throw new NullPointerException(); 1226 CompletableFuture<V> d = newIncompleteFuture(); 1227 if ((r = result) == null || (s = b.result) == null) 1228 bipush(b, new BiApply<T,U,V>(e, d, this, b, f)); 1229 else if (e == null) 1230 d.biApply(r, s, f, null); 1231 else 1232 try { 1233 e.execute(new BiApply<T,U,V>(null, d, this, b, f)); 1234 } catch (Throwable ex) { 1235 d.result = encodeThrowable(ex); 1236 } 1237 return d; 1238 } 1239 1240 @SuppressWarnings("serial") 1241 static final class BiAccept<T,U> extends BiCompletion<T,U,Void> { 1242 BiConsumer<? super T,? super U> fn; BiAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, BiConsumer<? super T,? super U> fn)1243 BiAccept(Executor executor, CompletableFuture<Void> dep, 1244 CompletableFuture<T> src, CompletableFuture<U> snd, 1245 BiConsumer<? super T,? super U> fn) { 1246 super(executor, dep, src, snd); this.fn = fn; 1247 } tryFire(int mode)1248 final CompletableFuture<Void> tryFire(int mode) { 1249 CompletableFuture<Void> d; 1250 CompletableFuture<T> a; 1251 CompletableFuture<U> b; 1252 Object r, s; BiConsumer<? super T,? super U> f; 1253 if ((d = dep) == null || (f = fn) == null 1254 || (a = src) == null || (r = a.result) == null 1255 || (b = snd) == null || (s = b.result) == null 1256 || !d.biAccept(r, s, f, mode > 0 ? null : this)) 1257 return null; 1258 dep = null; src = null; snd = null; fn = null; 1259 return d.postFire(a, b, mode); 1260 } 1261 } 1262 biAccept(Object r, Object s, BiConsumer<? super R,? super S> f, BiAccept<R,S> c)1263 final <R,S> boolean biAccept(Object r, Object s, 1264 BiConsumer<? super R,? super S> f, 1265 BiAccept<R,S> c) { 1266 Throwable x; 1267 tryComplete: if (result == null) { 1268 if (r instanceof AltResult) { 1269 if ((x = ((AltResult)r).ex) != null) { 1270 completeThrowable(x, r); 1271 break tryComplete; 1272 } 1273 r = null; 1274 } 1275 if (s instanceof AltResult) { 1276 if ((x = ((AltResult)s).ex) != null) { 1277 completeThrowable(x, s); 1278 break tryComplete; 1279 } 1280 s = null; 1281 } 1282 try { 1283 if (c != null && !c.claim()) 1284 return false; 1285 @SuppressWarnings("unchecked") R rr = (R) r; 1286 @SuppressWarnings("unchecked") S ss = (S) s; 1287 f.accept(rr, ss); 1288 completeNull(); 1289 } catch (Throwable ex) { 1290 completeThrowable(ex); 1291 } 1292 } 1293 return true; 1294 } 1295 biAcceptStage( Executor e, CompletionStage<U> o, BiConsumer<? super T,? super U> f)1296 private <U> CompletableFuture<Void> biAcceptStage( 1297 Executor e, CompletionStage<U> o, 1298 BiConsumer<? super T,? super U> f) { 1299 CompletableFuture<U> b; Object r, s; 1300 if (f == null || (b = o.toCompletableFuture()) == null) 1301 throw new NullPointerException(); 1302 CompletableFuture<Void> d = newIncompleteFuture(); 1303 if ((r = result) == null || (s = b.result) == null) 1304 bipush(b, new BiAccept<T,U>(e, d, this, b, f)); 1305 else if (e == null) 1306 d.biAccept(r, s, f, null); 1307 else 1308 try { 1309 e.execute(new BiAccept<T,U>(null, d, this, b, f)); 1310 } catch (Throwable ex) { 1311 d.result = encodeThrowable(ex); 1312 } 1313 return d; 1314 } 1315 1316 @SuppressWarnings("serial") 1317 static final class BiRun<T,U> extends BiCompletion<T,U,Void> { 1318 Runnable fn; BiRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Runnable fn)1319 BiRun(Executor executor, CompletableFuture<Void> dep, 1320 CompletableFuture<T> src, CompletableFuture<U> snd, 1321 Runnable fn) { 1322 super(executor, dep, src, snd); this.fn = fn; 1323 } tryFire(int mode)1324 final CompletableFuture<Void> tryFire(int mode) { 1325 CompletableFuture<Void> d; 1326 CompletableFuture<T> a; 1327 CompletableFuture<U> b; 1328 Object r, s; Runnable f; 1329 if ((d = dep) == null || (f = fn) == null 1330 || (a = src) == null || (r = a.result) == null 1331 || (b = snd) == null || (s = b.result) == null 1332 || !d.biRun(r, s, f, mode > 0 ? null : this)) 1333 return null; 1334 dep = null; src = null; snd = null; fn = null; 1335 return d.postFire(a, b, mode); 1336 } 1337 } 1338 biRun(Object r, Object s, Runnable f, BiRun<?,?> c)1339 final boolean biRun(Object r, Object s, Runnable f, BiRun<?,?> c) { 1340 Throwable x; Object z; 1341 if (result == null) { 1342 if ((r instanceof AltResult 1343 && (x = ((AltResult)(z = r)).ex) != null) || 1344 (s instanceof AltResult 1345 && (x = ((AltResult)(z = s)).ex) != null)) 1346 completeThrowable(x, z); 1347 else 1348 try { 1349 if (c != null && !c.claim()) 1350 return false; 1351 f.run(); 1352 completeNull(); 1353 } catch (Throwable ex) { 1354 completeThrowable(ex); 1355 } 1356 } 1357 return true; 1358 } 1359 biRunStage(Executor e, CompletionStage<?> o, Runnable f)1360 private CompletableFuture<Void> biRunStage(Executor e, CompletionStage<?> o, 1361 Runnable f) { 1362 CompletableFuture<?> b; Object r, s; 1363 if (f == null || (b = o.toCompletableFuture()) == null) 1364 throw new NullPointerException(); 1365 CompletableFuture<Void> d = newIncompleteFuture(); 1366 if ((r = result) == null || (s = b.result) == null) 1367 bipush(b, new BiRun<>(e, d, this, b, f)); 1368 else if (e == null) 1369 d.biRun(r, s, f, null); 1370 else 1371 try { 1372 e.execute(new BiRun<>(null, d, this, b, f)); 1373 } catch (Throwable ex) { 1374 d.result = encodeThrowable(ex); 1375 } 1376 return d; 1377 } 1378 1379 @SuppressWarnings("serial") 1380 static final class BiRelay<T,U> extends BiCompletion<T,U,Void> { // for And BiRelay(CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd)1381 BiRelay(CompletableFuture<Void> dep, 1382 CompletableFuture<T> src, CompletableFuture<U> snd) { 1383 super(null, dep, src, snd); 1384 } tryFire(int mode)1385 final CompletableFuture<Void> tryFire(int mode) { 1386 CompletableFuture<Void> d; 1387 CompletableFuture<T> a; 1388 CompletableFuture<U> b; 1389 Object r, s, z; Throwable x; 1390 if ((d = dep) == null 1391 || (a = src) == null || (r = a.result) == null 1392 || (b = snd) == null || (s = b.result) == null) 1393 return null; 1394 if (d.result == null) { 1395 if ((r instanceof AltResult 1396 && (x = ((AltResult)(z = r)).ex) != null) || 1397 (s instanceof AltResult 1398 && (x = ((AltResult)(z = s)).ex) != null)) 1399 d.completeThrowable(x, z); 1400 else 1401 d.completeNull(); 1402 } 1403 src = null; snd = null; dep = null; 1404 return d.postFire(a, b, mode); 1405 } 1406 } 1407 1408 /** Recursively constructs a tree of completions. */ andTree(CompletableFuture<?>[] cfs, int lo, int hi)1409 static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs, 1410 int lo, int hi) { 1411 CompletableFuture<Void> d = new CompletableFuture<Void>(); 1412 if (lo > hi) // empty 1413 d.result = NIL; 1414 else { 1415 CompletableFuture<?> a, b; Object r, s, z; Throwable x; 1416 int mid = (lo + hi) >>> 1; 1417 if ((a = (lo == mid ? cfs[lo] : 1418 andTree(cfs, lo, mid))) == null || 1419 (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : 1420 andTree(cfs, mid+1, hi))) == null) 1421 throw new NullPointerException(); 1422 if ((r = a.result) == null || (s = b.result) == null) 1423 a.bipush(b, new BiRelay<>(d, a, b)); 1424 else if ((r instanceof AltResult 1425 && (x = ((AltResult)(z = r)).ex) != null) || 1426 (s instanceof AltResult 1427 && (x = ((AltResult)(z = s)).ex) != null)) 1428 d.result = encodeThrowable(x, z); 1429 else 1430 d.result = NIL; 1431 } 1432 return d; 1433 } 1434 1435 /* ------------- Projected (Ored) BiCompletions -------------- */ 1436 1437 /** 1438 * Pushes completion to this and b unless either done. 1439 * Caller should first check that result and b.result are both null. 1440 */ orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c)1441 final void orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c) { 1442 if (c != null) { 1443 while (!tryPushStack(c)) { 1444 if (result != null) { 1445 NEXT.set(c, null); 1446 break; 1447 } 1448 } 1449 if (result != null) 1450 c.tryFire(SYNC); 1451 else 1452 b.unipush(new CoCompletion(c)); 1453 } 1454 } 1455 1456 @SuppressWarnings("serial") 1457 static final class OrApply<T,U extends T,V> extends BiCompletion<T,U,V> { 1458 Function<? super T,? extends V> fn; OrApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Function<? super T,? extends V> fn)1459 OrApply(Executor executor, CompletableFuture<V> dep, 1460 CompletableFuture<T> src, CompletableFuture<U> snd, 1461 Function<? super T,? extends V> fn) { 1462 super(executor, dep, src, snd); this.fn = fn; 1463 } tryFire(int mode)1464 final CompletableFuture<V> tryFire(int mode) { 1465 CompletableFuture<V> d; 1466 CompletableFuture<T> a; 1467 CompletableFuture<U> b; 1468 Object r; Throwable x; Function<? super T,? extends V> f; 1469 if ((d = dep) == null || (f = fn) == null 1470 || (a = src) == null || (b = snd) == null 1471 || ((r = a.result) == null && (r = b.result) == null)) 1472 return null; 1473 tryComplete: if (d.result == null) { 1474 try { 1475 if (mode <= 0 && !claim()) 1476 return null; 1477 if (r instanceof AltResult) { 1478 if ((x = ((AltResult)r).ex) != null) { 1479 d.completeThrowable(x, r); 1480 break tryComplete; 1481 } 1482 r = null; 1483 } 1484 @SuppressWarnings("unchecked") T t = (T) r; 1485 d.completeValue(f.apply(t)); 1486 } catch (Throwable ex) { 1487 d.completeThrowable(ex); 1488 } 1489 } 1490 dep = null; src = null; snd = null; fn = null; 1491 return d.postFire(a, b, mode); 1492 } 1493 } 1494 orApplyStage( Executor e, CompletionStage<U> o, Function<? super T, ? extends V> f)1495 private <U extends T,V> CompletableFuture<V> orApplyStage( 1496 Executor e, CompletionStage<U> o, Function<? super T, ? extends V> f) { 1497 CompletableFuture<U> b; 1498 if (f == null || (b = o.toCompletableFuture()) == null) 1499 throw new NullPointerException(); 1500 1501 Object r; CompletableFuture<? extends T> z; 1502 if ((r = (z = this).result) != null || 1503 (r = (z = b).result) != null) 1504 return z.uniApplyNow(r, e, f); 1505 1506 CompletableFuture<V> d = newIncompleteFuture(); 1507 orpush(b, new OrApply<T,U,V>(e, d, this, b, f)); 1508 return d; 1509 } 1510 1511 @SuppressWarnings("serial") 1512 static final class OrAccept<T,U extends T> extends BiCompletion<T,U,Void> { 1513 Consumer<? super T> fn; OrAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Consumer<? super T> fn)1514 OrAccept(Executor executor, CompletableFuture<Void> dep, 1515 CompletableFuture<T> src, CompletableFuture<U> snd, 1516 Consumer<? super T> fn) { 1517 super(executor, dep, src, snd); this.fn = fn; 1518 } tryFire(int mode)1519 final CompletableFuture<Void> tryFire(int mode) { 1520 CompletableFuture<Void> d; 1521 CompletableFuture<T> a; 1522 CompletableFuture<U> b; 1523 Object r; Throwable x; Consumer<? super T> f; 1524 if ((d = dep) == null || (f = fn) == null 1525 || (a = src) == null || (b = snd) == null 1526 || ((r = a.result) == null && (r = b.result) == null)) 1527 return null; 1528 tryComplete: if (d.result == null) { 1529 try { 1530 if (mode <= 0 && !claim()) 1531 return null; 1532 if (r instanceof AltResult) { 1533 if ((x = ((AltResult)r).ex) != null) { 1534 d.completeThrowable(x, r); 1535 break tryComplete; 1536 } 1537 r = null; 1538 } 1539 @SuppressWarnings("unchecked") T t = (T) r; 1540 f.accept(t); 1541 d.completeNull(); 1542 } catch (Throwable ex) { 1543 d.completeThrowable(ex); 1544 } 1545 } 1546 dep = null; src = null; snd = null; fn = null; 1547 return d.postFire(a, b, mode); 1548 } 1549 } 1550 orAcceptStage( Executor e, CompletionStage<U> o, Consumer<? super T> f)1551 private <U extends T> CompletableFuture<Void> orAcceptStage( 1552 Executor e, CompletionStage<U> o, Consumer<? super T> f) { 1553 CompletableFuture<U> b; 1554 if (f == null || (b = o.toCompletableFuture()) == null) 1555 throw new NullPointerException(); 1556 1557 Object r; CompletableFuture<? extends T> z; 1558 if ((r = (z = this).result) != null || 1559 (r = (z = b).result) != null) 1560 return z.uniAcceptNow(r, e, f); 1561 1562 CompletableFuture<Void> d = newIncompleteFuture(); 1563 orpush(b, new OrAccept<T,U>(e, d, this, b, f)); 1564 return d; 1565 } 1566 1567 @SuppressWarnings("serial") 1568 static final class OrRun<T,U> extends BiCompletion<T,U,Void> { 1569 Runnable fn; OrRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Runnable fn)1570 OrRun(Executor executor, CompletableFuture<Void> dep, 1571 CompletableFuture<T> src, CompletableFuture<U> snd, 1572 Runnable fn) { 1573 super(executor, dep, src, snd); this.fn = fn; 1574 } tryFire(int mode)1575 final CompletableFuture<Void> tryFire(int mode) { 1576 CompletableFuture<Void> d; 1577 CompletableFuture<T> a; 1578 CompletableFuture<U> b; 1579 Object r; Throwable x; Runnable f; 1580 if ((d = dep) == null || (f = fn) == null 1581 || (a = src) == null || (b = snd) == null 1582 || ((r = a.result) == null && (r = b.result) == null)) 1583 return null; 1584 if (d.result == null) { 1585 try { 1586 if (mode <= 0 && !claim()) 1587 return null; 1588 else if (r instanceof AltResult 1589 && (x = ((AltResult)r).ex) != null) 1590 d.completeThrowable(x, r); 1591 else { 1592 f.run(); 1593 d.completeNull(); 1594 } 1595 } catch (Throwable ex) { 1596 d.completeThrowable(ex); 1597 } 1598 } 1599 dep = null; src = null; snd = null; fn = null; 1600 return d.postFire(a, b, mode); 1601 } 1602 } 1603 orRunStage(Executor e, CompletionStage<?> o, Runnable f)1604 private CompletableFuture<Void> orRunStage(Executor e, CompletionStage<?> o, 1605 Runnable f) { 1606 CompletableFuture<?> b; 1607 if (f == null || (b = o.toCompletableFuture()) == null) 1608 throw new NullPointerException(); 1609 1610 Object r; CompletableFuture<?> z; 1611 if ((r = (z = this).result) != null || 1612 (r = (z = b).result) != null) 1613 return z.uniRunNow(r, e, f); 1614 1615 CompletableFuture<Void> d = newIncompleteFuture(); 1616 orpush(b, new OrRun<>(e, d, this, b, f)); 1617 return d; 1618 } 1619 1620 /** Completion for an anyOf input future. */ 1621 @SuppressWarnings("serial") 1622 static class AnyOf extends Completion { 1623 CompletableFuture<Object> dep; CompletableFuture<?> src; 1624 CompletableFuture<?>[] srcs; AnyOf(CompletableFuture<Object> dep, CompletableFuture<?> src, CompletableFuture<?>[] srcs)1625 AnyOf(CompletableFuture<Object> dep, CompletableFuture<?> src, 1626 CompletableFuture<?>[] srcs) { 1627 this.dep = dep; this.src = src; this.srcs = srcs; 1628 } tryFire(int mode)1629 final CompletableFuture<Object> tryFire(int mode) { 1630 // assert mode != ASYNC; 1631 CompletableFuture<Object> d; CompletableFuture<?> a; 1632 CompletableFuture<?>[] as; 1633 Object r; 1634 if ((d = dep) == null 1635 || (a = src) == null || (r = a.result) == null 1636 || (as = srcs) == null) 1637 return null; 1638 dep = null; src = null; srcs = null; 1639 if (d.completeRelay(r)) { 1640 for (CompletableFuture<?> b : as) 1641 if (b != a) 1642 b.cleanStack(); 1643 if (mode < 0) 1644 return d; 1645 else 1646 d.postComplete(); 1647 } 1648 return null; 1649 } isLive()1650 final boolean isLive() { 1651 CompletableFuture<Object> d; 1652 return (d = dep) != null && d.result == null; 1653 } 1654 } 1655 1656 /* ------------- Zero-input Async forms -------------- */ 1657 1658 @SuppressWarnings("serial") 1659 static final class AsyncSupply<T> extends ForkJoinTask<Void> 1660 implements Runnable, AsynchronousCompletionTask { 1661 CompletableFuture<T> dep; Supplier<? extends T> fn; AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn)1662 AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn) { 1663 this.dep = dep; this.fn = fn; 1664 } 1665 getRawResult()1666 public final Void getRawResult() { return null; } setRawResult(Void v)1667 public final void setRawResult(Void v) {} exec()1668 public final boolean exec() { run(); return false; } 1669 run()1670 public void run() { 1671 CompletableFuture<T> d; Supplier<? extends T> f; 1672 if ((d = dep) != null && (f = fn) != null) { 1673 dep = null; fn = null; 1674 if (d.result == null) { 1675 try { 1676 d.completeValue(f.get()); 1677 } catch (Throwable ex) { 1678 d.completeThrowable(ex); 1679 } 1680 } 1681 d.postComplete(); 1682 } 1683 } 1684 } 1685 asyncSupplyStage(Executor e, Supplier<U> f)1686 static <U> CompletableFuture<U> asyncSupplyStage(Executor e, 1687 Supplier<U> f) { 1688 if (f == null) throw new NullPointerException(); 1689 CompletableFuture<U> d = new CompletableFuture<U>(); 1690 e.execute(new AsyncSupply<U>(d, f)); 1691 return d; 1692 } 1693 1694 @SuppressWarnings("serial") 1695 static final class AsyncRun extends ForkJoinTask<Void> 1696 implements Runnable, AsynchronousCompletionTask { 1697 CompletableFuture<Void> dep; Runnable fn; AsyncRun(CompletableFuture<Void> dep, Runnable fn)1698 AsyncRun(CompletableFuture<Void> dep, Runnable fn) { 1699 this.dep = dep; this.fn = fn; 1700 } 1701 getRawResult()1702 public final Void getRawResult() { return null; } setRawResult(Void v)1703 public final void setRawResult(Void v) {} exec()1704 public final boolean exec() { run(); return false; } 1705 run()1706 public void run() { 1707 CompletableFuture<Void> d; Runnable f; 1708 if ((d = dep) != null && (f = fn) != null) { 1709 dep = null; fn = null; 1710 if (d.result == null) { 1711 try { 1712 f.run(); 1713 d.completeNull(); 1714 } catch (Throwable ex) { 1715 d.completeThrowable(ex); 1716 } 1717 } 1718 d.postComplete(); 1719 } 1720 } 1721 } 1722 asyncRunStage(Executor e, Runnable f)1723 static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) { 1724 if (f == null) throw new NullPointerException(); 1725 CompletableFuture<Void> d = new CompletableFuture<Void>(); 1726 e.execute(new AsyncRun(d, f)); 1727 return d; 1728 } 1729 1730 /* ------------- Signallers -------------- */ 1731 1732 /** 1733 * Completion for recording and releasing a waiting thread. This 1734 * class implements ManagedBlocker to avoid starvation when 1735 * blocking actions pile up in ForkJoinPools. 1736 */ 1737 @SuppressWarnings("serial") 1738 static final class Signaller extends Completion 1739 implements ForkJoinPool.ManagedBlocker { 1740 long nanos; // remaining wait time if timed 1741 final long deadline; // non-zero if timed 1742 final boolean interruptible; 1743 boolean interrupted; 1744 volatile Thread thread; 1745 Signaller(boolean interruptible, long nanos, long deadline)1746 Signaller(boolean interruptible, long nanos, long deadline) { 1747 this.thread = Thread.currentThread(); 1748 this.interruptible = interruptible; 1749 this.nanos = nanos; 1750 this.deadline = deadline; 1751 } tryFire(int ignore)1752 final CompletableFuture<?> tryFire(int ignore) { 1753 Thread w; // no need to atomically claim 1754 if ((w = thread) != null) { 1755 thread = null; 1756 LockSupport.unpark(w); 1757 } 1758 return null; 1759 } isReleasable()1760 public boolean isReleasable() { 1761 if (Thread.interrupted()) 1762 interrupted = true; 1763 return ((interrupted && interruptible) || 1764 (deadline != 0L && 1765 (nanos <= 0L || 1766 (nanos = deadline - System.nanoTime()) <= 0L)) || 1767 thread == null); 1768 } block()1769 public boolean block() { 1770 while (!isReleasable()) { 1771 if (deadline == 0L) 1772 LockSupport.park(this); 1773 else 1774 LockSupport.parkNanos(this, nanos); 1775 } 1776 return true; 1777 } isLive()1778 final boolean isLive() { return thread != null; } 1779 } 1780 1781 /** 1782 * Returns raw result after waiting, or null if interruptible and 1783 * interrupted. 1784 */ waitingGet(boolean interruptible)1785 private Object waitingGet(boolean interruptible) { 1786 Signaller q = null; 1787 boolean queued = false; 1788 Object r; 1789 while ((r = result) == null) { 1790 if (q == null) { 1791 q = new Signaller(interruptible, 0L, 0L); 1792 if (Thread.currentThread() instanceof ForkJoinWorkerThread) 1793 ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q); 1794 } 1795 else if (!queued) 1796 queued = tryPushStack(q); 1797 else { 1798 try { 1799 ForkJoinPool.managedBlock(q); 1800 } catch (InterruptedException ie) { // currently cannot happen 1801 q.interrupted = true; 1802 } 1803 if (q.interrupted && interruptible) 1804 break; 1805 } 1806 } 1807 if (q != null && queued) { 1808 q.thread = null; 1809 if (!interruptible && q.interrupted) 1810 Thread.currentThread().interrupt(); 1811 if (r == null) 1812 cleanStack(); 1813 } 1814 if (r != null || (r = result) != null) 1815 postComplete(); 1816 return r; 1817 } 1818 1819 /** 1820 * Returns raw result after waiting, or null if interrupted, or 1821 * throws TimeoutException on timeout. 1822 */ timedGet(long nanos)1823 private Object timedGet(long nanos) throws TimeoutException { 1824 if (Thread.interrupted()) 1825 return null; 1826 if (nanos > 0L) { 1827 long d = System.nanoTime() + nanos; 1828 long deadline = (d == 0L) ? 1L : d; // avoid 0 1829 Signaller q = null; 1830 boolean queued = false; 1831 Object r; 1832 while ((r = result) == null) { // similar to untimed 1833 if (q == null) { 1834 q = new Signaller(true, nanos, deadline); 1835 if (Thread.currentThread() instanceof ForkJoinWorkerThread) 1836 ForkJoinPool.helpAsyncBlocker(defaultExecutor(), q); 1837 } 1838 else if (!queued) 1839 queued = tryPushStack(q); 1840 else if (q.nanos <= 0L) 1841 break; 1842 else { 1843 try { 1844 ForkJoinPool.managedBlock(q); 1845 } catch (InterruptedException ie) { 1846 q.interrupted = true; 1847 } 1848 if (q.interrupted) 1849 break; 1850 } 1851 } 1852 if (q != null && queued) { 1853 q.thread = null; 1854 if (r == null) 1855 cleanStack(); 1856 } 1857 if (r != null || (r = result) != null) 1858 postComplete(); 1859 if (r != null || (q != null && q.interrupted)) 1860 return r; 1861 } 1862 throw new TimeoutException(); 1863 } 1864 1865 /* ------------- public methods -------------- */ 1866 1867 /** 1868 * Creates a new incomplete CompletableFuture. 1869 */ CompletableFuture()1870 public CompletableFuture() { 1871 } 1872 1873 /** 1874 * Creates a new complete CompletableFuture with given encoded result. 1875 */ CompletableFuture(Object r)1876 CompletableFuture(Object r) { 1877 this.result = r; 1878 } 1879 1880 /** 1881 * Returns a new CompletableFuture that is asynchronously completed 1882 * by a task running in the {@link ForkJoinPool#commonPool()} with 1883 * the value obtained by calling the given Supplier. 1884 * 1885 * @param supplier a function returning the value to be used 1886 * to complete the returned CompletableFuture 1887 * @param <U> the function's return type 1888 * @return the new CompletableFuture 1889 */ supplyAsync(Supplier<U> supplier)1890 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { 1891 return asyncSupplyStage(ASYNC_POOL, supplier); 1892 } 1893 1894 /** 1895 * Returns a new CompletableFuture that is asynchronously completed 1896 * by a task running in the given executor with the value obtained 1897 * by calling the given Supplier. 1898 * 1899 * @param supplier a function returning the value to be used 1900 * to complete the returned CompletableFuture 1901 * @param executor the executor to use for asynchronous execution 1902 * @param <U> the function's return type 1903 * @return the new CompletableFuture 1904 */ supplyAsync(Supplier<U> supplier, Executor executor)1905 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, 1906 Executor executor) { 1907 return asyncSupplyStage(screenExecutor(executor), supplier); 1908 } 1909 1910 /** 1911 * Returns a new CompletableFuture that is asynchronously completed 1912 * by a task running in the {@link ForkJoinPool#commonPool()} after 1913 * it runs the given action. 1914 * 1915 * @param runnable the action to run before completing the 1916 * returned CompletableFuture 1917 * @return the new CompletableFuture 1918 */ runAsync(Runnable runnable)1919 public static CompletableFuture<Void> runAsync(Runnable runnable) { 1920 return asyncRunStage(ASYNC_POOL, runnable); 1921 } 1922 1923 /** 1924 * Returns a new CompletableFuture that is asynchronously completed 1925 * by a task running in the given executor after it runs the given 1926 * action. 1927 * 1928 * @param runnable the action to run before completing the 1929 * returned CompletableFuture 1930 * @param executor the executor to use for asynchronous execution 1931 * @return the new CompletableFuture 1932 */ runAsync(Runnable runnable, Executor executor)1933 public static CompletableFuture<Void> runAsync(Runnable runnable, 1934 Executor executor) { 1935 return asyncRunStage(screenExecutor(executor), runnable); 1936 } 1937 1938 /** 1939 * Returns a new CompletableFuture that is already completed with 1940 * the given value. 1941 * 1942 * @param value the value 1943 * @param <U> the type of the value 1944 * @return the completed CompletableFuture 1945 */ completedFuture(U value)1946 public static <U> CompletableFuture<U> completedFuture(U value) { 1947 return new CompletableFuture<U>((value == null) ? NIL : value); 1948 } 1949 1950 /** 1951 * Returns {@code true} if completed in any fashion: normally, 1952 * exceptionally, or via cancellation. 1953 * 1954 * @return {@code true} if completed 1955 */ isDone()1956 public boolean isDone() { 1957 return result != null; 1958 } 1959 1960 /** 1961 * Waits if necessary for this future to complete, and then 1962 * returns its result. 1963 * 1964 * @return the result value 1965 * @throws CancellationException if this future was cancelled 1966 * @throws ExecutionException if this future completed exceptionally 1967 * @throws InterruptedException if the current thread was interrupted 1968 * while waiting 1969 */ 1970 @SuppressWarnings("unchecked") get()1971 public T get() throws InterruptedException, ExecutionException { 1972 Object r; 1973 if ((r = result) == null) 1974 r = waitingGet(true); 1975 return (T) reportGet(r); 1976 } 1977 1978 /** 1979 * Waits if necessary for at most the given time for this future 1980 * to complete, and then returns its result, if available. 1981 * 1982 * @param timeout the maximum time to wait 1983 * @param unit the time unit of the timeout argument 1984 * @return the result value 1985 * @throws CancellationException if this future was cancelled 1986 * @throws ExecutionException if this future completed exceptionally 1987 * @throws InterruptedException if the current thread was interrupted 1988 * while waiting 1989 * @throws TimeoutException if the wait timed out 1990 */ 1991 @SuppressWarnings("unchecked") get(long timeout, TimeUnit unit)1992 public T get(long timeout, TimeUnit unit) 1993 throws InterruptedException, ExecutionException, TimeoutException { 1994 long nanos = unit.toNanos(timeout); 1995 Object r; 1996 if ((r = result) == null) 1997 r = timedGet(nanos); 1998 return (T) reportGet(r); 1999 } 2000 2001 /** 2002 * Returns the result value when complete, or throws an 2003 * (unchecked) exception if completed exceptionally. To better 2004 * conform with the use of common functional forms, if a 2005 * computation involved in the completion of this 2006 * CompletableFuture threw an exception, this method throws an 2007 * (unchecked) {@link CompletionException} with the underlying 2008 * exception as its cause. 2009 * 2010 * @return the result value 2011 * @throws CancellationException if the computation was cancelled 2012 * @throws CompletionException if this future completed 2013 * exceptionally or a completion computation threw an exception 2014 */ 2015 @SuppressWarnings("unchecked") join()2016 public T join() { 2017 Object r; 2018 if ((r = result) == null) 2019 r = waitingGet(false); 2020 return (T) reportJoin(r); 2021 } 2022 2023 /** 2024 * Returns the result value (or throws any encountered exception) 2025 * if completed, else returns the given valueIfAbsent. 2026 * 2027 * @param valueIfAbsent the value to return if not completed 2028 * @return the result value, if completed, else the given valueIfAbsent 2029 * @throws CancellationException if the computation was cancelled 2030 * @throws CompletionException if this future completed 2031 * exceptionally or a completion computation threw an exception 2032 */ 2033 @SuppressWarnings("unchecked") getNow(T valueIfAbsent)2034 public T getNow(T valueIfAbsent) { 2035 Object r; 2036 return ((r = result) == null) ? valueIfAbsent : (T) reportJoin(r); 2037 } 2038 2039 /** 2040 * If not already completed, sets the value returned by {@link 2041 * #get()} and related methods to the given value. 2042 * 2043 * @param value the result value 2044 * @return {@code true} if this invocation caused this CompletableFuture 2045 * to transition to a completed state, else {@code false} 2046 */ complete(T value)2047 public boolean complete(T value) { 2048 boolean triggered = completeValue(value); 2049 postComplete(); 2050 return triggered; 2051 } 2052 2053 /** 2054 * If not already completed, causes invocations of {@link #get()} 2055 * and related methods to throw the given exception. 2056 * 2057 * @param ex the exception 2058 * @return {@code true} if this invocation caused this CompletableFuture 2059 * to transition to a completed state, else {@code false} 2060 */ completeExceptionally(Throwable ex)2061 public boolean completeExceptionally(Throwable ex) { 2062 if (ex == null) throw new NullPointerException(); 2063 boolean triggered = internalComplete(new AltResult(ex)); 2064 postComplete(); 2065 return triggered; 2066 } 2067 thenApply( Function<? super T,? extends U> fn)2068 public <U> CompletableFuture<U> thenApply( 2069 Function<? super T,? extends U> fn) { 2070 return uniApplyStage(null, fn); 2071 } 2072 thenApplyAsync( Function<? super T,? extends U> fn)2073 public <U> CompletableFuture<U> thenApplyAsync( 2074 Function<? super T,? extends U> fn) { 2075 return uniApplyStage(defaultExecutor(), fn); 2076 } 2077 thenApplyAsync( Function<? super T,? extends U> fn, Executor executor)2078 public <U> CompletableFuture<U> thenApplyAsync( 2079 Function<? super T,? extends U> fn, Executor executor) { 2080 return uniApplyStage(screenExecutor(executor), fn); 2081 } 2082 thenAccept(Consumer<? super T> action)2083 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) { 2084 return uniAcceptStage(null, action); 2085 } 2086 thenAcceptAsync(Consumer<? super T> action)2087 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) { 2088 return uniAcceptStage(defaultExecutor(), action); 2089 } 2090 thenAcceptAsync(Consumer<? super T> action, Executor executor)2091 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, 2092 Executor executor) { 2093 return uniAcceptStage(screenExecutor(executor), action); 2094 } 2095 thenRun(Runnable action)2096 public CompletableFuture<Void> thenRun(Runnable action) { 2097 return uniRunStage(null, action); 2098 } 2099 thenRunAsync(Runnable action)2100 public CompletableFuture<Void> thenRunAsync(Runnable action) { 2101 return uniRunStage(defaultExecutor(), action); 2102 } 2103 thenRunAsync(Runnable action, Executor executor)2104 public CompletableFuture<Void> thenRunAsync(Runnable action, 2105 Executor executor) { 2106 return uniRunStage(screenExecutor(executor), action); 2107 } 2108 thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)2109 public <U,V> CompletableFuture<V> thenCombine( 2110 CompletionStage<? extends U> other, 2111 BiFunction<? super T,? super U,? extends V> fn) { 2112 return biApplyStage(null, other, fn); 2113 } 2114 thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)2115 public <U,V> CompletableFuture<V> thenCombineAsync( 2116 CompletionStage<? extends U> other, 2117 BiFunction<? super T,? super U,? extends V> fn) { 2118 return biApplyStage(defaultExecutor(), other, fn); 2119 } 2120 thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)2121 public <U,V> CompletableFuture<V> thenCombineAsync( 2122 CompletionStage<? extends U> other, 2123 BiFunction<? super T,? super U,? extends V> fn, Executor executor) { 2124 return biApplyStage(screenExecutor(executor), other, fn); 2125 } 2126 thenAcceptBoth( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)2127 public <U> CompletableFuture<Void> thenAcceptBoth( 2128 CompletionStage<? extends U> other, 2129 BiConsumer<? super T, ? super U> action) { 2130 return biAcceptStage(null, other, action); 2131 } 2132 thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)2133 public <U> CompletableFuture<Void> thenAcceptBothAsync( 2134 CompletionStage<? extends U> other, 2135 BiConsumer<? super T, ? super U> action) { 2136 return biAcceptStage(defaultExecutor(), other, action); 2137 } 2138 thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)2139 public <U> CompletableFuture<Void> thenAcceptBothAsync( 2140 CompletionStage<? extends U> other, 2141 BiConsumer<? super T, ? super U> action, Executor executor) { 2142 return biAcceptStage(screenExecutor(executor), other, action); 2143 } 2144 runAfterBoth(CompletionStage<?> other, Runnable action)2145 public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, 2146 Runnable action) { 2147 return biRunStage(null, other, action); 2148 } 2149 runAfterBothAsync(CompletionStage<?> other, Runnable action)2150 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, 2151 Runnable action) { 2152 return biRunStage(defaultExecutor(), other, action); 2153 } 2154 runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)2155 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, 2156 Runnable action, 2157 Executor executor) { 2158 return biRunStage(screenExecutor(executor), other, action); 2159 } 2160 applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn)2161 public <U> CompletableFuture<U> applyToEither( 2162 CompletionStage<? extends T> other, Function<? super T, U> fn) { 2163 return orApplyStage(null, other, fn); 2164 } 2165 applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn)2166 public <U> CompletableFuture<U> applyToEitherAsync( 2167 CompletionStage<? extends T> other, Function<? super T, U> fn) { 2168 return orApplyStage(defaultExecutor(), other, fn); 2169 } 2170 applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)2171 public <U> CompletableFuture<U> applyToEitherAsync( 2172 CompletionStage<? extends T> other, Function<? super T, U> fn, 2173 Executor executor) { 2174 return orApplyStage(screenExecutor(executor), other, fn); 2175 } 2176 acceptEither( CompletionStage<? extends T> other, Consumer<? super T> action)2177 public CompletableFuture<Void> acceptEither( 2178 CompletionStage<? extends T> other, Consumer<? super T> action) { 2179 return orAcceptStage(null, other, action); 2180 } 2181 acceptEitherAsync( CompletionStage<? extends T> other, Consumer<? super T> action)2182 public CompletableFuture<Void> acceptEitherAsync( 2183 CompletionStage<? extends T> other, Consumer<? super T> action) { 2184 return orAcceptStage(defaultExecutor(), other, action); 2185 } 2186 acceptEitherAsync( CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)2187 public CompletableFuture<Void> acceptEitherAsync( 2188 CompletionStage<? extends T> other, Consumer<? super T> action, 2189 Executor executor) { 2190 return orAcceptStage(screenExecutor(executor), other, action); 2191 } 2192 runAfterEither(CompletionStage<?> other, Runnable action)2193 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, 2194 Runnable action) { 2195 return orRunStage(null, other, action); 2196 } 2197 runAfterEitherAsync(CompletionStage<?> other, Runnable action)2198 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, 2199 Runnable action) { 2200 return orRunStage(defaultExecutor(), other, action); 2201 } 2202 runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)2203 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, 2204 Runnable action, 2205 Executor executor) { 2206 return orRunStage(screenExecutor(executor), other, action); 2207 } 2208 thenCompose( Function<? super T, ? extends CompletionStage<U>> fn)2209 public <U> CompletableFuture<U> thenCompose( 2210 Function<? super T, ? extends CompletionStage<U>> fn) { 2211 return uniComposeStage(null, fn); 2212 } 2213 thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn)2214 public <U> CompletableFuture<U> thenComposeAsync( 2215 Function<? super T, ? extends CompletionStage<U>> fn) { 2216 return uniComposeStage(defaultExecutor(), fn); 2217 } 2218 thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)2219 public <U> CompletableFuture<U> thenComposeAsync( 2220 Function<? super T, ? extends CompletionStage<U>> fn, 2221 Executor executor) { 2222 return uniComposeStage(screenExecutor(executor), fn); 2223 } 2224 whenComplete( BiConsumer<? super T, ? super Throwable> action)2225 public CompletableFuture<T> whenComplete( 2226 BiConsumer<? super T, ? super Throwable> action) { 2227 return uniWhenCompleteStage(null, action); 2228 } 2229 whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action)2230 public CompletableFuture<T> whenCompleteAsync( 2231 BiConsumer<? super T, ? super Throwable> action) { 2232 return uniWhenCompleteStage(defaultExecutor(), action); 2233 } 2234 whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action, Executor executor)2235 public CompletableFuture<T> whenCompleteAsync( 2236 BiConsumer<? super T, ? super Throwable> action, Executor executor) { 2237 return uniWhenCompleteStage(screenExecutor(executor), action); 2238 } 2239 handle( BiFunction<? super T, Throwable, ? extends U> fn)2240 public <U> CompletableFuture<U> handle( 2241 BiFunction<? super T, Throwable, ? extends U> fn) { 2242 return uniHandleStage(null, fn); 2243 } 2244 handleAsync( BiFunction<? super T, Throwable, ? extends U> fn)2245 public <U> CompletableFuture<U> handleAsync( 2246 BiFunction<? super T, Throwable, ? extends U> fn) { 2247 return uniHandleStage(defaultExecutor(), fn); 2248 } 2249 handleAsync( BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)2250 public <U> CompletableFuture<U> handleAsync( 2251 BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) { 2252 return uniHandleStage(screenExecutor(executor), fn); 2253 } 2254 2255 /** 2256 * Returns this CompletableFuture. 2257 * 2258 * @return this CompletableFuture 2259 */ toCompletableFuture()2260 public CompletableFuture<T> toCompletableFuture() { 2261 return this; 2262 } 2263 2264 // not in interface CompletionStage 2265 2266 /** 2267 * Returns a new CompletableFuture that is completed when this 2268 * CompletableFuture completes, with the result of the given 2269 * function of the exception triggering this CompletableFuture's 2270 * completion when it completes exceptionally; otherwise, if this 2271 * CompletableFuture completes normally, then the returned 2272 * CompletableFuture also completes normally with the same value. 2273 * Note: More flexible versions of this functionality are 2274 * available using methods {@code whenComplete} and {@code handle}. 2275 * 2276 * @param fn the function to use to compute the value of the 2277 * returned CompletableFuture if this CompletableFuture completed 2278 * exceptionally 2279 * @return the new CompletableFuture 2280 */ exceptionally( Function<Throwable, ? extends T> fn)2281 public CompletableFuture<T> exceptionally( 2282 Function<Throwable, ? extends T> fn) { 2283 return uniExceptionallyStage(fn); 2284 } 2285 2286 2287 /* ------------- Arbitrary-arity constructions -------------- */ 2288 2289 /** 2290 * Returns a new CompletableFuture that is completed when all of 2291 * the given CompletableFutures complete. If any of the given 2292 * CompletableFutures complete exceptionally, then the returned 2293 * CompletableFuture also does so, with a CompletionException 2294 * holding this exception as its cause. Otherwise, the results, 2295 * if any, of the given CompletableFutures are not reflected in 2296 * the returned CompletableFuture, but may be obtained by 2297 * inspecting them individually. If no CompletableFutures are 2298 * provided, returns a CompletableFuture completed with the value 2299 * {@code null}. 2300 * 2301 * <p>Among the applications of this method is to await completion 2302 * of a set of independent CompletableFutures before continuing a 2303 * program, as in: {@code CompletableFuture.allOf(c1, c2, 2304 * c3).join();}. 2305 * 2306 * @param cfs the CompletableFutures 2307 * @return a new CompletableFuture that is completed when all of the 2308 * given CompletableFutures complete 2309 * @throws NullPointerException if the array or any of its elements are 2310 * {@code null} 2311 */ allOf(CompletableFuture<?>.... cfs)2312 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) { 2313 return andTree(cfs, 0, cfs.length - 1); 2314 } 2315 2316 /** 2317 * Returns a new CompletableFuture that is completed when any of 2318 * the given CompletableFutures complete, with the same result. 2319 * Otherwise, if it completed exceptionally, the returned 2320 * CompletableFuture also does so, with a CompletionException 2321 * holding this exception as its cause. If no CompletableFutures 2322 * are provided, returns an incomplete CompletableFuture. 2323 * 2324 * @param cfs the CompletableFutures 2325 * @return a new CompletableFuture that is completed with the 2326 * result or exception of any of the given CompletableFutures when 2327 * one completes 2328 * @throws NullPointerException if the array or any of its elements are 2329 * {@code null} 2330 */ anyOf(CompletableFuture<?>.... cfs)2331 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) { 2332 int n; Object r; 2333 if ((n = cfs.length) <= 1) 2334 return (n == 0) 2335 ? new CompletableFuture<Object>() 2336 : uniCopyStage(cfs[0]); 2337 for (CompletableFuture<?> cf : cfs) 2338 if ((r = cf.result) != null) 2339 return new CompletableFuture<Object>(encodeRelay(r)); 2340 cfs = cfs.clone(); 2341 CompletableFuture<Object> d = new CompletableFuture<>(); 2342 for (CompletableFuture<?> cf : cfs) 2343 cf.unipush(new AnyOf(d, cf, cfs)); 2344 // If d was completed while we were adding completions, we should 2345 // clean the stack of any sources that may have had completions 2346 // pushed on their stack after d was completed. 2347 if (d.result != null) 2348 for (int i = 0, len = cfs.length; i < len; i++) 2349 if (cfs[i].result != null) 2350 for (i++; i < len; i++) 2351 if (cfs[i].result == null) 2352 cfs[i].cleanStack(); 2353 return d; 2354 } 2355 2356 /* ------------- Control and status methods -------------- */ 2357 2358 /** 2359 * If not already completed, completes this CompletableFuture with 2360 * a {@link CancellationException}. Dependent CompletableFutures 2361 * that have not already completed will also complete 2362 * exceptionally, with a {@link CompletionException} caused by 2363 * this {@code CancellationException}. 2364 * 2365 * @param mayInterruptIfRunning this value has no effect in this 2366 * implementation because interrupts are not used to control 2367 * processing. 2368 * 2369 * @return {@code true} if this task is now cancelled 2370 */ cancel(boolean mayInterruptIfRunning)2371 public boolean cancel(boolean mayInterruptIfRunning) { 2372 boolean cancelled = (result == null) && 2373 internalComplete(new AltResult(new CancellationException())); 2374 postComplete(); 2375 return cancelled || isCancelled(); 2376 } 2377 2378 /** 2379 * Returns {@code true} if this CompletableFuture was cancelled 2380 * before it completed normally. 2381 * 2382 * @return {@code true} if this CompletableFuture was cancelled 2383 * before it completed normally 2384 */ isCancelled()2385 public boolean isCancelled() { 2386 Object r; 2387 return ((r = result) instanceof AltResult) && 2388 (((AltResult)r).ex instanceof CancellationException); 2389 } 2390 2391 /** 2392 * Returns {@code true} if this CompletableFuture completed 2393 * exceptionally, in any way. Possible causes include 2394 * cancellation, explicit invocation of {@code 2395 * completeExceptionally}, and abrupt termination of a 2396 * CompletionStage action. 2397 * 2398 * @return {@code true} if this CompletableFuture completed 2399 * exceptionally 2400 */ isCompletedExceptionally()2401 public boolean isCompletedExceptionally() { 2402 Object r; 2403 return ((r = result) instanceof AltResult) && r != NIL; 2404 } 2405 2406 /** 2407 * Forcibly sets or resets the value subsequently returned by 2408 * method {@link #get()} and related methods, whether or not 2409 * already completed. This method is designed for use only in 2410 * error recovery actions, and even in such situations may result 2411 * in ongoing dependent completions using established versus 2412 * overwritten outcomes. 2413 * 2414 * @param value the completion value 2415 */ obtrudeValue(T value)2416 public void obtrudeValue(T value) { 2417 result = (value == null) ? NIL : value; 2418 postComplete(); 2419 } 2420 2421 /** 2422 * Forcibly causes subsequent invocations of method {@link #get()} 2423 * and related methods to throw the given exception, whether or 2424 * not already completed. This method is designed for use only in 2425 * error recovery actions, and even in such situations may result 2426 * in ongoing dependent completions using established versus 2427 * overwritten outcomes. 2428 * 2429 * @param ex the exception 2430 * @throws NullPointerException if the exception is null 2431 */ obtrudeException(Throwable ex)2432 public void obtrudeException(Throwable ex) { 2433 if (ex == null) throw new NullPointerException(); 2434 result = new AltResult(ex); 2435 postComplete(); 2436 } 2437 2438 /** 2439 * Returns the estimated number of CompletableFutures whose 2440 * completions are awaiting completion of this CompletableFuture. 2441 * This method is designed for use in monitoring system state, not 2442 * for synchronization control. 2443 * 2444 * @return the number of dependent CompletableFutures 2445 */ getNumberOfDependents()2446 public int getNumberOfDependents() { 2447 int count = 0; 2448 for (Completion p = stack; p != null; p = p.next) 2449 ++count; 2450 return count; 2451 } 2452 2453 /** 2454 * Returns a string identifying this CompletableFuture, as well as 2455 * its completion state. The state, in brackets, contains the 2456 * String {@code "Completed Normally"} or the String {@code 2457 * "Completed Exceptionally"}, or the String {@code "Not 2458 * completed"} followed by the number of CompletableFutures 2459 * dependent upon its completion, if any. 2460 * 2461 * @return a string identifying this CompletableFuture, as well as its state 2462 */ toString()2463 public String toString() { 2464 Object r = result; 2465 int count = 0; // avoid call to getNumberOfDependents in case disabled 2466 for (Completion p = stack; p != null; p = p.next) 2467 ++count; 2468 return super.toString() + 2469 ((r == null) 2470 ? ((count == 0) 2471 ? "[Not completed]" 2472 : "[Not completed, " + count + " dependents]") 2473 : (((r instanceof AltResult) && ((AltResult)r).ex != null) 2474 ? "[Completed exceptionally: " + ((AltResult)r).ex + "]" 2475 : "[Completed normally]")); 2476 } 2477 2478 // jdk9 additions 2479 2480 /** 2481 * Returns a new incomplete CompletableFuture of the type to be 2482 * returned by a CompletionStage method. Subclasses should 2483 * normally override this method to return an instance of the same 2484 * class as this CompletableFuture. The default implementation 2485 * returns an instance of class CompletableFuture. 2486 * 2487 * @param <U> the type of the value 2488 * @return a new CompletableFuture 2489 * @since 9 2490 */ newIncompleteFuture()2491 public <U> CompletableFuture<U> newIncompleteFuture() { 2492 return new CompletableFuture<U>(); 2493 } 2494 2495 /** 2496 * Returns the default Executor used for async methods that do not 2497 * specify an Executor. This class uses the {@link 2498 * ForkJoinPool#commonPool()} if it supports more than one 2499 * parallel thread, or else an Executor using one thread per async 2500 * task. This method may be overridden in subclasses to return 2501 * an Executor that provides at least one independent thread. 2502 * 2503 * @return the executor 2504 * @since 9 2505 */ defaultExecutor()2506 public Executor defaultExecutor() { 2507 return ASYNC_POOL; 2508 } 2509 2510 /** 2511 * Returns a new CompletableFuture that is completed normally with 2512 * the same value as this CompletableFuture when it completes 2513 * normally. If this CompletableFuture completes exceptionally, 2514 * then the returned CompletableFuture completes exceptionally 2515 * with a CompletionException with this exception as cause. The 2516 * behavior is equivalent to {@code thenApply(x -> x)}. This 2517 * method may be useful as a form of "defensive copying", to 2518 * prevent clients from completing, while still being able to 2519 * arrange dependent actions. 2520 * 2521 * @return the new CompletableFuture 2522 * @since 9 2523 */ copy()2524 public CompletableFuture<T> copy() { 2525 return uniCopyStage(this); 2526 } 2527 2528 /** 2529 * Returns a new CompletionStage that is completed normally with 2530 * the same value as this CompletableFuture when it completes 2531 * normally, and cannot be independently completed or otherwise 2532 * used in ways not defined by the methods of interface {@link 2533 * CompletionStage}. If this CompletableFuture completes 2534 * exceptionally, then the returned CompletionStage completes 2535 * exceptionally with a CompletionException with this exception as 2536 * cause. 2537 * 2538 * <p>Unless overridden by a subclass, a new non-minimal 2539 * CompletableFuture with all methods available can be obtained from 2540 * a minimal CompletionStage via {@link #toCompletableFuture()}. 2541 * For example, completion of a minimal stage can be awaited by 2542 * 2543 * <pre> {@code minimalStage.toCompletableFuture().join(); }</pre> 2544 * 2545 * @return the new CompletionStage 2546 * @since 9 2547 */ minimalCompletionStage()2548 public CompletionStage<T> minimalCompletionStage() { 2549 return uniAsMinimalStage(); 2550 } 2551 2552 /** 2553 * Completes this CompletableFuture with the result of 2554 * the given Supplier function invoked from an asynchronous 2555 * task using the given executor. 2556 * 2557 * @param supplier a function returning the value to be used 2558 * to complete this CompletableFuture 2559 * @param executor the executor to use for asynchronous execution 2560 * @return this CompletableFuture 2561 * @since 9 2562 */ completeAsync(Supplier<? extends T> supplier, Executor executor)2563 public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, 2564 Executor executor) { 2565 if (supplier == null || executor == null) 2566 throw new NullPointerException(); 2567 executor.execute(new AsyncSupply<T>(this, supplier)); 2568 return this; 2569 } 2570 2571 /** 2572 * Completes this CompletableFuture with the result of the given 2573 * Supplier function invoked from an asynchronous task using the 2574 * default executor. 2575 * 2576 * @param supplier a function returning the value to be used 2577 * to complete this CompletableFuture 2578 * @return this CompletableFuture 2579 * @since 9 2580 */ completeAsync(Supplier<? extends T> supplier)2581 public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier) { 2582 return completeAsync(supplier, defaultExecutor()); 2583 } 2584 2585 /** 2586 * Exceptionally completes this CompletableFuture with 2587 * a {@link TimeoutException} if not otherwise completed 2588 * before the given timeout. 2589 * 2590 * @param timeout how long to wait before completing exceptionally 2591 * with a TimeoutException, in units of {@code unit} 2592 * @param unit a {@code TimeUnit} determining how to interpret the 2593 * {@code timeout} parameter 2594 * @return this CompletableFuture 2595 * @since 9 2596 */ orTimeout(long timeout, TimeUnit unit)2597 public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) { 2598 if (unit == null) 2599 throw new NullPointerException(); 2600 if (result == null) 2601 whenComplete(new Canceller(Delayer.delay(new Timeout(this), 2602 timeout, unit))); 2603 return this; 2604 } 2605 2606 /** 2607 * Completes this CompletableFuture with the given value if not 2608 * otherwise completed before the given timeout. 2609 * 2610 * @param value the value to use upon timeout 2611 * @param timeout how long to wait before completing normally 2612 * with the given value, in units of {@code unit} 2613 * @param unit a {@code TimeUnit} determining how to interpret the 2614 * {@code timeout} parameter 2615 * @return this CompletableFuture 2616 * @since 9 2617 */ completeOnTimeout(T value, long timeout, TimeUnit unit)2618 public CompletableFuture<T> completeOnTimeout(T value, long timeout, 2619 TimeUnit unit) { 2620 if (unit == null) 2621 throw new NullPointerException(); 2622 if (result == null) 2623 whenComplete(new Canceller(Delayer.delay( 2624 new DelayedCompleter<T>(this, value), 2625 timeout, unit))); 2626 return this; 2627 } 2628 2629 /** 2630 * Returns a new Executor that submits a task to the given base 2631 * executor after the given delay (or no delay if non-positive). 2632 * Each delay commences upon invocation of the returned executor's 2633 * {@code execute} method. 2634 * 2635 * @param delay how long to delay, in units of {@code unit} 2636 * @param unit a {@code TimeUnit} determining how to interpret the 2637 * {@code delay} parameter 2638 * @param executor the base executor 2639 * @return the new delayed executor 2640 * @since 9 2641 */ delayedExecutor(long delay, TimeUnit unit, Executor executor)2642 public static Executor delayedExecutor(long delay, TimeUnit unit, 2643 Executor executor) { 2644 if (unit == null || executor == null) 2645 throw new NullPointerException(); 2646 return new DelayedExecutor(delay, unit, executor); 2647 } 2648 2649 /** 2650 * Returns a new Executor that submits a task to the default 2651 * executor after the given delay (or no delay if non-positive). 2652 * Each delay commences upon invocation of the returned executor's 2653 * {@code execute} method. 2654 * 2655 * @param delay how long to delay, in units of {@code unit} 2656 * @param unit a {@code TimeUnit} determining how to interpret the 2657 * {@code delay} parameter 2658 * @return the new delayed executor 2659 * @since 9 2660 */ delayedExecutor(long delay, TimeUnit unit)2661 public static Executor delayedExecutor(long delay, TimeUnit unit) { 2662 if (unit == null) 2663 throw new NullPointerException(); 2664 return new DelayedExecutor(delay, unit, ASYNC_POOL); 2665 } 2666 2667 /** 2668 * Returns a new CompletionStage that is already completed with 2669 * the given value and supports only those methods in 2670 * interface {@link CompletionStage}. 2671 * 2672 * @param value the value 2673 * @param <U> the type of the value 2674 * @return the completed CompletionStage 2675 * @since 9 2676 */ completedStage(U value)2677 public static <U> CompletionStage<U> completedStage(U value) { 2678 return new MinimalStage<U>((value == null) ? NIL : value); 2679 } 2680 2681 /** 2682 * Returns a new CompletableFuture that is already completed 2683 * exceptionally with the given exception. 2684 * 2685 * @param ex the exception 2686 * @param <U> the type of the value 2687 * @return the exceptionally completed CompletableFuture 2688 * @since 9 2689 */ failedFuture(Throwable ex)2690 public static <U> CompletableFuture<U> failedFuture(Throwable ex) { 2691 if (ex == null) throw new NullPointerException(); 2692 return new CompletableFuture<U>(new AltResult(ex)); 2693 } 2694 2695 /** 2696 * Returns a new CompletionStage that is already completed 2697 * exceptionally with the given exception and supports only those 2698 * methods in interface {@link CompletionStage}. 2699 * 2700 * @param ex the exception 2701 * @param <U> the type of the value 2702 * @return the exceptionally completed CompletionStage 2703 * @since 9 2704 */ failedStage(Throwable ex)2705 public static <U> CompletionStage<U> failedStage(Throwable ex) { 2706 if (ex == null) throw new NullPointerException(); 2707 return new MinimalStage<U>(new AltResult(ex)); 2708 } 2709 2710 /** 2711 * Singleton delay scheduler, used only for starting and 2712 * cancelling tasks. 2713 */ 2714 static final class Delayer { delay(Runnable command, long delay, TimeUnit unit)2715 static ScheduledFuture<?> delay(Runnable command, long delay, 2716 TimeUnit unit) { 2717 return delayer.schedule(command, delay, unit); 2718 } 2719 2720 static final class DaemonThreadFactory implements ThreadFactory { newThread(Runnable r)2721 public Thread newThread(Runnable r) { 2722 Thread t = new Thread(r); 2723 t.setDaemon(true); 2724 t.setName("CompletableFutureDelayScheduler"); 2725 return t; 2726 } 2727 } 2728 2729 static final ScheduledThreadPoolExecutor delayer; 2730 static { 2731 (delayer = new ScheduledThreadPoolExecutor( 2732 1, new DaemonThreadFactory())). 2733 setRemoveOnCancelPolicy(true); 2734 } 2735 } 2736 2737 // Little class-ified lambdas to better support monitoring 2738 2739 static final class DelayedExecutor implements Executor { 2740 final long delay; 2741 final TimeUnit unit; 2742 final Executor executor; DelayedExecutor(long delay, TimeUnit unit, Executor executor)2743 DelayedExecutor(long delay, TimeUnit unit, Executor executor) { 2744 this.delay = delay; this.unit = unit; this.executor = executor; 2745 } execute(Runnable r)2746 public void execute(Runnable r) { 2747 Delayer.delay(new TaskSubmitter(executor, r), delay, unit); 2748 } 2749 } 2750 2751 /** Action to submit user task */ 2752 static final class TaskSubmitter implements Runnable { 2753 final Executor executor; 2754 final Runnable action; TaskSubmitter(Executor executor, Runnable action)2755 TaskSubmitter(Executor executor, Runnable action) { 2756 this.executor = executor; 2757 this.action = action; 2758 } run()2759 public void run() { executor.execute(action); } 2760 } 2761 2762 /** Action to completeExceptionally on timeout */ 2763 static final class Timeout implements Runnable { 2764 final CompletableFuture<?> f; Timeout(CompletableFuture<?> f)2765 Timeout(CompletableFuture<?> f) { this.f = f; } run()2766 public void run() { 2767 if (f != null && !f.isDone()) 2768 f.completeExceptionally(new TimeoutException()); 2769 } 2770 } 2771 2772 /** Action to complete on timeout */ 2773 static final class DelayedCompleter<U> implements Runnable { 2774 final CompletableFuture<U> f; 2775 final U u; DelayedCompleter(CompletableFuture<U> f, U u)2776 DelayedCompleter(CompletableFuture<U> f, U u) { this.f = f; this.u = u; } run()2777 public void run() { 2778 if (f != null) 2779 f.complete(u); 2780 } 2781 } 2782 2783 /** Action to cancel unneeded timeouts */ 2784 static final class Canceller implements BiConsumer<Object, Throwable> { 2785 final Future<?> f; Canceller(Future<?> f)2786 Canceller(Future<?> f) { this.f = f; } accept(Object ignore, Throwable ex)2787 public void accept(Object ignore, Throwable ex) { 2788 if (ex == null && f != null && !f.isDone()) 2789 f.cancel(false); 2790 } 2791 } 2792 2793 /** 2794 * A subclass that just throws UOE for most non-CompletionStage methods. 2795 */ 2796 static final class MinimalStage<T> extends CompletableFuture<T> { MinimalStage()2797 MinimalStage() { } MinimalStage(Object r)2798 MinimalStage(Object r) { super(r); } newIncompleteFuture()2799 @Override public <U> CompletableFuture<U> newIncompleteFuture() { 2800 return new MinimalStage<U>(); } get()2801 @Override public T get() { 2802 throw new UnsupportedOperationException(); } get(long timeout, TimeUnit unit)2803 @Override public T get(long timeout, TimeUnit unit) { 2804 throw new UnsupportedOperationException(); } getNow(T valueIfAbsent)2805 @Override public T getNow(T valueIfAbsent) { 2806 throw new UnsupportedOperationException(); } join()2807 @Override public T join() { 2808 throw new UnsupportedOperationException(); } complete(T value)2809 @Override public boolean complete(T value) { 2810 throw new UnsupportedOperationException(); } completeExceptionally(Throwable ex)2811 @Override public boolean completeExceptionally(Throwable ex) { 2812 throw new UnsupportedOperationException(); } cancel(boolean mayInterruptIfRunning)2813 @Override public boolean cancel(boolean mayInterruptIfRunning) { 2814 throw new UnsupportedOperationException(); } obtrudeValue(T value)2815 @Override public void obtrudeValue(T value) { 2816 throw new UnsupportedOperationException(); } obtrudeException(Throwable ex)2817 @Override public void obtrudeException(Throwable ex) { 2818 throw new UnsupportedOperationException(); } isDone()2819 @Override public boolean isDone() { 2820 throw new UnsupportedOperationException(); } isCancelled()2821 @Override public boolean isCancelled() { 2822 throw new UnsupportedOperationException(); } isCompletedExceptionally()2823 @Override public boolean isCompletedExceptionally() { 2824 throw new UnsupportedOperationException(); } getNumberOfDependents()2825 @Override public int getNumberOfDependents() { 2826 throw new UnsupportedOperationException(); } completeAsync(Supplier<? extends T> supplier, Executor executor)2827 @Override public CompletableFuture<T> completeAsync 2828 (Supplier<? extends T> supplier, Executor executor) { 2829 throw new UnsupportedOperationException(); } completeAsync(Supplier<? extends T> supplier)2830 @Override public CompletableFuture<T> completeAsync 2831 (Supplier<? extends T> supplier) { 2832 throw new UnsupportedOperationException(); } orTimeout(long timeout, TimeUnit unit)2833 @Override public CompletableFuture<T> orTimeout 2834 (long timeout, TimeUnit unit) { 2835 throw new UnsupportedOperationException(); } completeOnTimeout(T value, long timeout, TimeUnit unit)2836 @Override public CompletableFuture<T> completeOnTimeout 2837 (T value, long timeout, TimeUnit unit) { 2838 throw new UnsupportedOperationException(); } toCompletableFuture()2839 @Override public CompletableFuture<T> toCompletableFuture() { 2840 Object r; 2841 if ((r = result) != null) 2842 return new CompletableFuture<T>(encodeRelay(r)); 2843 else { 2844 CompletableFuture<T> d = new CompletableFuture<>(); 2845 unipush(new UniRelay<T,T>(d, this)); 2846 return d; 2847 } 2848 } 2849 } 2850 2851 // VarHandle mechanics 2852 private static final VarHandle RESULT; 2853 private static final VarHandle STACK; 2854 private static final VarHandle NEXT; 2855 static { 2856 try { 2857 MethodHandles.Lookup l = MethodHandles.lookup(); 2858 RESULT = l.findVarHandle(CompletableFuture.class, "result", Object.class); 2859 STACK = l.findVarHandle(CompletableFuture.class, "stack", Completion.class); 2860 NEXT = l.findVarHandle(Completion.class, "next", Completion.class); 2861 } catch (ReflectiveOperationException e) { 2862 throw new ExceptionInInitializerError(e); 2863 } 2864 2865 // Reduce the risk of rare disastrous classloading in first call to 2866 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 2867 Class<?> ensureLoaded = LockSupport.class; 2868 } 2869 } 2870