1 /* 2 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 3 * 4 * This code is free software; you can redistribute it and/or modify it 5 * under the terms of the GNU General Public License version 2 only, as 6 * published by the Free Software Foundation. Oracle designates this 7 * particular file as subject to the "Classpath" exception as provided 8 * by Oracle in the LICENSE file that accompanied this code. 9 * 10 * This code is distributed in the hope that it will be useful, but WITHOUT 11 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 12 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 13 * version 2 for more details (a copy is included in the LICENSE file that 14 * accompanied this code). 15 * 16 * You should have received a copy of the GNU General Public License version 17 * 2 along with this work; if not, write to the Free Software Foundation, 18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 19 * 20 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 21 * or visit www.oracle.com if you need additional information or have any 22 * questions. 23 */ 24 25 /* 26 * This file is available under and governed by the GNU General Public 27 * License version 2 only, as published by the Free Software Foundation. 28 * However, the following notice accompanied the original version of this 29 * file: 30 * 31 * Written by Doug Lea with assistance from members of JCP JSR-166 32 * Expert Group and released to the public domain, as explained at 33 * http://creativecommons.org/publicdomain/zero/1.0/ 34 */ 35 36 package java.util.concurrent; 37 38 import java.util.concurrent.locks.LockSupport; 39 import java.util.function.BiConsumer; 40 import java.util.function.BiFunction; 41 import java.util.function.Consumer; 42 import java.util.function.Function; 43 import java.util.function.Supplier; 44 45 // Android-note: Class javadoc changed to remove references to hidden OpenJDK 9 methods. 46 47 /** 48 * A {@link Future} that may be explicitly completed (setting its 49 * value and status), and may be used as a {@link CompletionStage}, 50 * supporting dependent functions and actions that trigger upon its 51 * completion. 52 * 53 * <p>When two or more threads attempt to 54 * {@link #complete complete}, 55 * {@link #completeExceptionally completeExceptionally}, or 56 * {@link #cancel cancel} 57 * a CompletableFuture, only one of them succeeds. 58 * 59 * <p>In addition to these and related methods for directly 60 * manipulating status and results, CompletableFuture implements 61 * interface {@link CompletionStage} with the following policies: <ul> 62 * 63 * <li>Actions supplied for dependent completions of 64 * <em>non-async</em> methods may be performed by the thread that 65 * completes the current CompletableFuture, or by any other caller of 66 * a completion method. 67 * 68 * <li>All <em>async</em> methods without an explicit Executor 69 * argument are performed using the {@link ForkJoinPool#commonPool()} 70 * (unless it does not support a parallelism level of at least two, in 71 * which case, a new Thread is created to run each task). 72 * To simplify monitoring, debugging, 73 * and tracking, all generated asynchronous tasks are instances of the 74 * marker interface {@link AsynchronousCompletionTask}. Operations 75 * with time-delays can use adapter methods defined in this class, for 76 * example: {@code supplyAsync(supplier, delayedExecutor(timeout, 77 * timeUnit))}. To support methods with delays and timeouts, this 78 * class maintains at most one daemon thread for triggering and 79 * cancelling actions, not for running them. 80 * 81 * <li>All CompletionStage methods are implemented independently of 82 * other public methods, so the behavior of one method is not impacted 83 * by overrides of others in subclasses. 84 * 85 * </ul> 86 * 87 * <p>CompletableFuture also implements {@link Future} with the following 88 * policies: <ul> 89 * 90 * <li>Since (unlike {@link FutureTask}) this class has no direct 91 * control over the computation that causes it to be completed, 92 * cancellation is treated as just another form of exceptional 93 * completion. Method {@link #cancel cancel} has the same effect as 94 * {@code completeExceptionally(new CancellationException())}. Method 95 * {@link #isCompletedExceptionally} can be used to determine if a 96 * CompletableFuture completed in any exceptional fashion. 97 * 98 * <li>In case of exceptional completion with a CompletionException, 99 * methods {@link #get()} and {@link #get(long, TimeUnit)} throw an 100 * {@link ExecutionException} with the same cause as held in the 101 * corresponding CompletionException. To simplify usage in most 102 * contexts, this class also defines methods {@link #join()} and 103 * {@link #getNow} that instead throw the CompletionException directly 104 * in these cases. 105 * </ul> 106 * 107 * <p>Arguments used to pass a completion result (that is, for 108 * parameters of type {@code T}) for methods accepting them may be 109 * null, but passing a null value for any other parameter will result 110 * in a {@link NullPointerException} being thrown. 111 * 112 * @author Doug Lea 113 * @since 1.8 114 * @param <T> The result type returned by this future's {@code join} 115 * and {@code get} methods 116 */ 117 public class CompletableFuture<T> implements Future<T>, CompletionStage<T> { 118 119 /* 120 * Overview: 121 * 122 * A CompletableFuture may have dependent completion actions, 123 * collected in a linked stack. It atomically completes by CASing 124 * a result field, and then pops off and runs those actions. This 125 * applies across normal vs exceptional outcomes, sync vs async 126 * actions, binary triggers, and various forms of completions. 127 * 128 * Non-nullness of field result (set via CAS) indicates done. An 129 * AltResult is used to box null as a result, as well as to hold 130 * exceptions. Using a single field makes completion simple to 131 * detect and trigger. Encoding and decoding is straightforward 132 * but adds to the sprawl of trapping and associating exceptions 133 * with targets. Minor simplifications rely on (static) NIL (to 134 * box null results) being the only AltResult with a null 135 * exception field, so we don't usually need explicit comparisons. 136 * Even though some of the generics casts are unchecked (see 137 * SuppressWarnings annotations), they are placed to be 138 * appropriate even if checked. 139 * 140 * Dependent actions are represented by Completion objects linked 141 * as Treiber stacks headed by field "stack". There are Completion 142 * classes for each kind of action, grouped into single-input 143 * (UniCompletion), two-input (BiCompletion), projected 144 * (BiCompletions using either (not both) of two inputs), shared 145 * (CoCompletion, used by the second of two sources), zero-input 146 * source actions, and Signallers that unblock waiters. Class 147 * Completion extends ForkJoinTask to enable async execution 148 * (adding no space overhead because we exploit its "tag" methods 149 * to maintain claims). It is also declared as Runnable to allow 150 * usage with arbitrary executors. 151 * 152 * Support for each kind of CompletionStage relies on a separate 153 * class, along with two CompletableFuture methods: 154 * 155 * * A Completion class with name X corresponding to function, 156 * prefaced with "Uni", "Bi", or "Or". Each class contains 157 * fields for source(s), actions, and dependent. They are 158 * boringly similar, differing from others only with respect to 159 * underlying functional forms. We do this so that users don't 160 * encounter layers of adapters in common usages. 161 * 162 * * Boolean CompletableFuture method x(...) (for example 163 * uniApply) takes all of the arguments needed to check that an 164 * action is triggerable, and then either runs the action or 165 * arranges its async execution by executing its Completion 166 * argument, if present. The method returns true if known to be 167 * complete. 168 * 169 * * Completion method tryFire(int mode) invokes the associated x 170 * method with its held arguments, and on success cleans up. 171 * The mode argument allows tryFire to be called twice (SYNC, 172 * then ASYNC); the first to screen and trap exceptions while 173 * arranging to execute, and the second when called from a 174 * task. (A few classes are not used async so take slightly 175 * different forms.) The claim() callback suppresses function 176 * invocation if already claimed by another thread. 177 * 178 * * CompletableFuture method xStage(...) is called from a public 179 * stage method of CompletableFuture x. It screens user 180 * arguments and invokes and/or creates the stage object. If 181 * not async and x is already complete, the action is run 182 * immediately. Otherwise a Completion c is created, pushed to 183 * x's stack (unless done), and started or triggered via 184 * c.tryFire. This also covers races possible if x completes 185 * while pushing. Classes with two inputs (for example BiApply) 186 * deal with races across both while pushing actions. The 187 * second completion is a CoCompletion pointing to the first, 188 * shared so that at most one performs the action. The 189 * multiple-arity methods allOf and anyOf do this pairwise to 190 * form trees of completions. 191 * 192 * Note that the generic type parameters of methods vary according 193 * to whether "this" is a source, dependent, or completion. 194 * 195 * Method postComplete is called upon completion unless the target 196 * is guaranteed not to be observable (i.e., not yet returned or 197 * linked). Multiple threads can call postComplete, which 198 * atomically pops each dependent action, and tries to trigger it 199 * via method tryFire, in NESTED mode. Triggering can propagate 200 * recursively, so NESTED mode returns its completed dependent (if 201 * one exists) for further processing by its caller (see method 202 * postFire). 203 * 204 * Blocking methods get() and join() rely on Signaller Completions 205 * that wake up waiting threads. The mechanics are similar to 206 * Treiber stack wait-nodes used in FutureTask, Phaser, and 207 * SynchronousQueue. See their internal documentation for 208 * algorithmic details. 209 * 210 * Without precautions, CompletableFutures would be prone to 211 * garbage accumulation as chains of Completions build up, each 212 * pointing back to its sources. So we null out fields as soon as 213 * possible. The screening checks needed anyway harmlessly ignore 214 * null arguments that may have been obtained during races with 215 * threads nulling out fields. We also try to unlink fired 216 * Completions from stacks that might never be popped (see method 217 * postFire). Completion fields need not be declared as final or 218 * volatile because they are only visible to other threads upon 219 * safe publication. 220 */ 221 222 volatile Object result; // Either the result or boxed AltResult 223 volatile Completion stack; // Top of Treiber stack of dependent actions 224 internalComplete(Object r)225 final boolean internalComplete(Object r) { // CAS from null to r 226 return U.compareAndSwapObject(this, RESULT, null, r); 227 } 228 casStack(Completion cmp, Completion val)229 final boolean casStack(Completion cmp, Completion val) { 230 return U.compareAndSwapObject(this, STACK, cmp, val); 231 } 232 233 /** Returns true if successfully pushed c onto stack. */ tryPushStack(Completion c)234 final boolean tryPushStack(Completion c) { 235 Completion h = stack; 236 lazySetNext(c, h); 237 return U.compareAndSwapObject(this, STACK, h, c); 238 } 239 240 /** Unconditionally pushes c onto stack, retrying if necessary. */ pushStack(Completion c)241 final void pushStack(Completion c) { 242 do {} while (!tryPushStack(c)); 243 } 244 245 /* ------------- Encoding and decoding outcomes -------------- */ 246 247 static final class AltResult { // See above 248 final Throwable ex; // null only for NIL AltResult(Throwable x)249 AltResult(Throwable x) { this.ex = x; } 250 } 251 252 /** The encoding of the null value. */ 253 static final AltResult NIL = new AltResult(null); 254 255 /** Completes with the null value, unless already completed. */ completeNull()256 final boolean completeNull() { 257 return U.compareAndSwapObject(this, RESULT, null, 258 NIL); 259 } 260 261 /** Returns the encoding of the given non-exceptional value. */ encodeValue(T t)262 final Object encodeValue(T t) { 263 return (t == null) ? NIL : t; 264 } 265 266 /** Completes with a non-exceptional result, unless already completed. */ completeValue(T t)267 final boolean completeValue(T t) { 268 return U.compareAndSwapObject(this, RESULT, null, 269 (t == null) ? NIL : t); 270 } 271 272 /** 273 * Returns the encoding of the given (non-null) exception as a 274 * wrapped CompletionException unless it is one already. 275 */ encodeThrowable(Throwable x)276 static AltResult encodeThrowable(Throwable x) { 277 return new AltResult((x instanceof CompletionException) ? x : 278 new CompletionException(x)); 279 } 280 281 /** Completes with an exceptional result, unless already completed. */ completeThrowable(Throwable x)282 final boolean completeThrowable(Throwable x) { 283 return U.compareAndSwapObject(this, RESULT, null, 284 encodeThrowable(x)); 285 } 286 287 /** 288 * Returns the encoding of the given (non-null) exception as a 289 * wrapped CompletionException unless it is one already. May 290 * return the given Object r (which must have been the result of a 291 * source future) if it is equivalent, i.e. if this is a simple 292 * relay of an existing CompletionException. 293 */ encodeThrowable(Throwable x, Object r)294 static Object encodeThrowable(Throwable x, Object r) { 295 if (!(x instanceof CompletionException)) 296 x = new CompletionException(x); 297 else if (r instanceof AltResult && x == ((AltResult)r).ex) 298 return r; 299 return new AltResult(x); 300 } 301 302 /** 303 * Completes with the given (non-null) exceptional result as a 304 * wrapped CompletionException unless it is one already, unless 305 * already completed. May complete with the given Object r 306 * (which must have been the result of a source future) if it is 307 * equivalent, i.e. if this is a simple propagation of an 308 * existing CompletionException. 309 */ completeThrowable(Throwable x, Object r)310 final boolean completeThrowable(Throwable x, Object r) { 311 return U.compareAndSwapObject(this, RESULT, null, 312 encodeThrowable(x, r)); 313 } 314 315 /** 316 * Returns the encoding of the given arguments: if the exception 317 * is non-null, encodes as AltResult. Otherwise uses the given 318 * value, boxed as NIL if null. 319 */ encodeOutcome(T t, Throwable x)320 Object encodeOutcome(T t, Throwable x) { 321 return (x == null) ? (t == null) ? NIL : t : encodeThrowable(x); 322 } 323 324 /** 325 * Returns the encoding of a copied outcome; if exceptional, 326 * rewraps as a CompletionException, else returns argument. 327 */ encodeRelay(Object r)328 static Object encodeRelay(Object r) { 329 Throwable x; 330 return (((r instanceof AltResult) && 331 (x = ((AltResult)r).ex) != null && 332 !(x instanceof CompletionException)) ? 333 new AltResult(new CompletionException(x)) : r); 334 } 335 336 /** 337 * Completes with r or a copy of r, unless already completed. 338 * If exceptional, r is first coerced to a CompletionException. 339 */ completeRelay(Object r)340 final boolean completeRelay(Object r) { 341 return U.compareAndSwapObject(this, RESULT, null, 342 encodeRelay(r)); 343 } 344 345 /** 346 * Reports result using Future.get conventions. 347 */ reportGet(Object r)348 private static <T> T reportGet(Object r) 349 throws InterruptedException, ExecutionException { 350 if (r == null) // by convention below, null means interrupted 351 throw new InterruptedException(); 352 if (r instanceof AltResult) { 353 Throwable x, cause; 354 if ((x = ((AltResult)r).ex) == null) 355 return null; 356 if (x instanceof CancellationException) 357 throw (CancellationException)x; 358 if ((x instanceof CompletionException) && 359 (cause = x.getCause()) != null) 360 x = cause; 361 throw new ExecutionException(x); 362 } 363 @SuppressWarnings("unchecked") T t = (T) r; 364 return t; 365 } 366 367 /** 368 * Decodes outcome to return result or throw unchecked exception. 369 */ reportJoin(Object r)370 private static <T> T reportJoin(Object r) { 371 if (r instanceof AltResult) { 372 Throwable x; 373 if ((x = ((AltResult)r).ex) == null) 374 return null; 375 if (x instanceof CancellationException) 376 throw (CancellationException)x; 377 if (x instanceof CompletionException) 378 throw (CompletionException)x; 379 throw new CompletionException(x); 380 } 381 @SuppressWarnings("unchecked") T t = (T) r; 382 return t; 383 } 384 385 /* ------------- Async task preliminaries -------------- */ 386 387 /** 388 * A marker interface identifying asynchronous tasks produced by 389 * {@code async} methods. This may be useful for monitoring, 390 * debugging, and tracking asynchronous activities. 391 * 392 * @since 1.8 393 */ 394 public static interface AsynchronousCompletionTask { 395 } 396 397 private static final boolean USE_COMMON_POOL = 398 (ForkJoinPool.getCommonPoolParallelism() > 1); 399 400 /** 401 * Default executor -- ForkJoinPool.commonPool() unless it cannot 402 * support parallelism. 403 */ 404 private static final Executor ASYNC_POOL = USE_COMMON_POOL ? 405 ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); 406 407 /** Fallback if ForkJoinPool.commonPool() cannot support parallelism */ 408 static final class ThreadPerTaskExecutor implements Executor { execute(Runnable r)409 public void execute(Runnable r) { new Thread(r).start(); } 410 } 411 412 /** 413 * Null-checks user executor argument, and translates uses of 414 * commonPool to ASYNC_POOL in case parallelism disabled. 415 */ screenExecutor(Executor e)416 static Executor screenExecutor(Executor e) { 417 if (!USE_COMMON_POOL && e == ForkJoinPool.commonPool()) 418 return ASYNC_POOL; 419 if (e == null) throw new NullPointerException(); 420 return e; 421 } 422 423 // Modes for Completion.tryFire. Signedness matters. 424 static final int SYNC = 0; 425 static final int ASYNC = 1; 426 static final int NESTED = -1; 427 428 /** 429 * Spins before blocking in waitingGet 430 */ 431 static final int SPINS = (Runtime.getRuntime().availableProcessors() > 1 ? 432 1 << 8 : 0); 433 434 /* ------------- Base Completion classes and operations -------------- */ 435 436 @SuppressWarnings("serial") 437 abstract static class Completion extends ForkJoinTask<Void> 438 implements Runnable, AsynchronousCompletionTask { 439 volatile Completion next; // Treiber stack link 440 441 /** 442 * Performs completion action if triggered, returning a 443 * dependent that may need propagation, if one exists. 444 * 445 * @param mode SYNC, ASYNC, or NESTED 446 */ tryFire(int mode)447 abstract CompletableFuture<?> tryFire(int mode); 448 449 /** Returns true if possibly still triggerable. Used by cleanStack. */ isLive()450 abstract boolean isLive(); 451 run()452 public final void run() { tryFire(ASYNC); } exec()453 public final boolean exec() { tryFire(ASYNC); return false; } getRawResult()454 public final Void getRawResult() { return null; } setRawResult(Void v)455 public final void setRawResult(Void v) {} 456 } 457 lazySetNext(Completion c, Completion next)458 static void lazySetNext(Completion c, Completion next) { 459 U.putOrderedObject(c, NEXT, next); 460 } 461 462 /** 463 * Pops and tries to trigger all reachable dependents. Call only 464 * when known to be done. 465 */ postComplete()466 final void postComplete() { 467 /* 468 * On each step, variable f holds current dependents to pop 469 * and run. It is extended along only one path at a time, 470 * pushing others to avoid unbounded recursion. 471 */ 472 CompletableFuture<?> f = this; Completion h; 473 while ((h = f.stack) != null || 474 (f != this && (h = (f = this).stack) != null)) { 475 CompletableFuture<?> d; Completion t; 476 if (f.casStack(h, t = h.next)) { 477 if (t != null) { 478 if (f != this) { 479 pushStack(h); 480 continue; 481 } 482 h.next = null; // detach 483 } 484 f = (d = h.tryFire(NESTED)) == null ? this : d; 485 } 486 } 487 } 488 489 /** Traverses stack and unlinks dead Completions. */ cleanStack()490 final void cleanStack() { 491 for (Completion p = null, q = stack; q != null;) { 492 Completion s = q.next; 493 if (q.isLive()) { 494 p = q; 495 q = s; 496 } 497 else if (p == null) { 498 casStack(q, s); 499 q = stack; 500 } 501 else { 502 p.next = s; 503 if (p.isLive()) 504 q = s; 505 else { 506 p = null; // restart 507 q = stack; 508 } 509 } 510 } 511 } 512 513 /* ------------- One-input Completions -------------- */ 514 515 /** A Completion with a source, dependent, and executor. */ 516 @SuppressWarnings("serial") 517 abstract static class UniCompletion<T,V> extends Completion { 518 Executor executor; // executor to use (null if none) 519 CompletableFuture<V> dep; // the dependent to complete 520 CompletableFuture<T> src; // source for action 521 UniCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src)522 UniCompletion(Executor executor, CompletableFuture<V> dep, 523 CompletableFuture<T> src) { 524 this.executor = executor; this.dep = dep; this.src = src; 525 } 526 527 /** 528 * Returns true if action can be run. Call only when known to 529 * be triggerable. Uses FJ tag bit to ensure that only one 530 * thread claims ownership. If async, starts as task -- a 531 * later call to tryFire will run action. 532 */ claim()533 final boolean claim() { 534 Executor e = executor; 535 if (compareAndSetForkJoinTaskTag((short)0, (short)1)) { 536 if (e == null) 537 return true; 538 executor = null; // disable 539 e.execute(this); 540 } 541 return false; 542 } 543 isLive()544 final boolean isLive() { return dep != null; } 545 } 546 547 /** Pushes the given completion (if it exists) unless done. */ push(UniCompletion<?,?> c)548 final void push(UniCompletion<?,?> c) { 549 if (c != null) { 550 while (result == null && !tryPushStack(c)) 551 lazySetNext(c, null); // clear on failure 552 } 553 } 554 555 /** 556 * Post-processing by dependent after successful UniCompletion 557 * tryFire. Tries to clean stack of source a, and then either runs 558 * postComplete or returns this to caller, depending on mode. 559 */ postFire(CompletableFuture<?> a, int mode)560 final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) { 561 if (a != null && a.stack != null) { 562 if (mode < 0 || a.result == null) 563 a.cleanStack(); 564 else 565 a.postComplete(); 566 } 567 if (result != null && stack != null) { 568 if (mode < 0) 569 return this; 570 else 571 postComplete(); 572 } 573 return null; 574 } 575 576 @SuppressWarnings("serial") 577 static final class UniApply<T,V> extends UniCompletion<T,V> { 578 Function<? super T,? extends V> fn; UniApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T,? extends V> fn)579 UniApply(Executor executor, CompletableFuture<V> dep, 580 CompletableFuture<T> src, 581 Function<? super T,? extends V> fn) { 582 super(executor, dep, src); this.fn = fn; 583 } tryFire(int mode)584 final CompletableFuture<V> tryFire(int mode) { 585 CompletableFuture<V> d; CompletableFuture<T> a; 586 if ((d = dep) == null || 587 !d.uniApply(a = src, fn, mode > 0 ? null : this)) 588 return null; 589 dep = null; src = null; fn = null; 590 return d.postFire(a, mode); 591 } 592 } 593 uniApply(CompletableFuture<S> a, Function<? super S,? extends T> f, UniApply<S,T> c)594 final <S> boolean uniApply(CompletableFuture<S> a, 595 Function<? super S,? extends T> f, 596 UniApply<S,T> c) { 597 Object r; Throwable x; 598 if (a == null || (r = a.result) == null || f == null) 599 return false; 600 tryComplete: if (result == null) { 601 if (r instanceof AltResult) { 602 if ((x = ((AltResult)r).ex) != null) { 603 completeThrowable(x, r); 604 break tryComplete; 605 } 606 r = null; 607 } 608 try { 609 if (c != null && !c.claim()) 610 return false; 611 @SuppressWarnings("unchecked") S s = (S) r; 612 completeValue(f.apply(s)); 613 } catch (Throwable ex) { 614 completeThrowable(ex); 615 } 616 } 617 return true; 618 } 619 uniApplyStage( Executor e, Function<? super T,? extends V> f)620 private <V> CompletableFuture<V> uniApplyStage( 621 Executor e, Function<? super T,? extends V> f) { 622 if (f == null) throw new NullPointerException(); 623 CompletableFuture<V> d = newIncompleteFuture(); 624 if (e != null || !d.uniApply(this, f, null)) { 625 UniApply<T,V> c = new UniApply<T,V>(e, d, this, f); 626 push(c); 627 c.tryFire(SYNC); 628 } 629 return d; 630 } 631 632 @SuppressWarnings("serial") 633 static final class UniAccept<T> extends UniCompletion<T,Void> { 634 Consumer<? super T> fn; UniAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Consumer<? super T> fn)635 UniAccept(Executor executor, CompletableFuture<Void> dep, 636 CompletableFuture<T> src, Consumer<? super T> fn) { 637 super(executor, dep, src); this.fn = fn; 638 } tryFire(int mode)639 final CompletableFuture<Void> tryFire(int mode) { 640 CompletableFuture<Void> d; CompletableFuture<T> a; 641 if ((d = dep) == null || 642 !d.uniAccept(a = src, fn, mode > 0 ? null : this)) 643 return null; 644 dep = null; src = null; fn = null; 645 return d.postFire(a, mode); 646 } 647 } 648 uniAccept(CompletableFuture<S> a, Consumer<? super S> f, UniAccept<S> c)649 final <S> boolean uniAccept(CompletableFuture<S> a, 650 Consumer<? super S> f, UniAccept<S> c) { 651 Object r; Throwable x; 652 if (a == null || (r = a.result) == null || f == null) 653 return false; 654 tryComplete: if (result == null) { 655 if (r instanceof AltResult) { 656 if ((x = ((AltResult)r).ex) != null) { 657 completeThrowable(x, r); 658 break tryComplete; 659 } 660 r = null; 661 } 662 try { 663 if (c != null && !c.claim()) 664 return false; 665 @SuppressWarnings("unchecked") S s = (S) r; 666 f.accept(s); 667 completeNull(); 668 } catch (Throwable ex) { 669 completeThrowable(ex); 670 } 671 } 672 return true; 673 } 674 uniAcceptStage(Executor e, Consumer<? super T> f)675 private CompletableFuture<Void> uniAcceptStage(Executor e, 676 Consumer<? super T> f) { 677 if (f == null) throw new NullPointerException(); 678 CompletableFuture<Void> d = newIncompleteFuture(); 679 if (e != null || !d.uniAccept(this, f, null)) { 680 UniAccept<T> c = new UniAccept<T>(e, d, this, f); 681 push(c); 682 c.tryFire(SYNC); 683 } 684 return d; 685 } 686 687 @SuppressWarnings("serial") 688 static final class UniRun<T> extends UniCompletion<T,Void> { 689 Runnable fn; UniRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, Runnable fn)690 UniRun(Executor executor, CompletableFuture<Void> dep, 691 CompletableFuture<T> src, Runnable fn) { 692 super(executor, dep, src); this.fn = fn; 693 } tryFire(int mode)694 final CompletableFuture<Void> tryFire(int mode) { 695 CompletableFuture<Void> d; CompletableFuture<T> a; 696 if ((d = dep) == null || 697 !d.uniRun(a = src, fn, mode > 0 ? null : this)) 698 return null; 699 dep = null; src = null; fn = null; 700 return d.postFire(a, mode); 701 } 702 } 703 uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c)704 final boolean uniRun(CompletableFuture<?> a, Runnable f, UniRun<?> c) { 705 Object r; Throwable x; 706 if (a == null || (r = a.result) == null || f == null) 707 return false; 708 if (result == null) { 709 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) 710 completeThrowable(x, r); 711 else 712 try { 713 if (c != null && !c.claim()) 714 return false; 715 f.run(); 716 completeNull(); 717 } catch (Throwable ex) { 718 completeThrowable(ex); 719 } 720 } 721 return true; 722 } 723 uniRunStage(Executor e, Runnable f)724 private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) { 725 if (f == null) throw new NullPointerException(); 726 CompletableFuture<Void> d = newIncompleteFuture(); 727 if (e != null || !d.uniRun(this, f, null)) { 728 UniRun<T> c = new UniRun<T>(e, d, this, f); 729 push(c); 730 c.tryFire(SYNC); 731 } 732 return d; 733 } 734 735 @SuppressWarnings("serial") 736 static final class UniWhenComplete<T> extends UniCompletion<T,T> { 737 BiConsumer<? super T, ? super Throwable> fn; UniWhenComplete(Executor executor, CompletableFuture<T> dep, CompletableFuture<T> src, BiConsumer<? super T, ? super Throwable> fn)738 UniWhenComplete(Executor executor, CompletableFuture<T> dep, 739 CompletableFuture<T> src, 740 BiConsumer<? super T, ? super Throwable> fn) { 741 super(executor, dep, src); this.fn = fn; 742 } tryFire(int mode)743 final CompletableFuture<T> tryFire(int mode) { 744 CompletableFuture<T> d; CompletableFuture<T> a; 745 if ((d = dep) == null || 746 !d.uniWhenComplete(a = src, fn, mode > 0 ? null : this)) 747 return null; 748 dep = null; src = null; fn = null; 749 return d.postFire(a, mode); 750 } 751 } 752 uniWhenComplete(CompletableFuture<T> a, BiConsumer<? super T,? super Throwable> f, UniWhenComplete<T> c)753 final boolean uniWhenComplete(CompletableFuture<T> a, 754 BiConsumer<? super T,? super Throwable> f, 755 UniWhenComplete<T> c) { 756 Object r; T t; Throwable x = null; 757 if (a == null || (r = a.result) == null || f == null) 758 return false; 759 if (result == null) { 760 try { 761 if (c != null && !c.claim()) 762 return false; 763 if (r instanceof AltResult) { 764 x = ((AltResult)r).ex; 765 t = null; 766 } else { 767 @SuppressWarnings("unchecked") T tr = (T) r; 768 t = tr; 769 } 770 f.accept(t, x); 771 if (x == null) { 772 internalComplete(r); 773 return true; 774 } 775 } catch (Throwable ex) { 776 if (x == null) 777 x = ex; 778 else if (x != ex) 779 x.addSuppressed(ex); 780 } 781 completeThrowable(x, r); 782 } 783 return true; 784 } 785 uniWhenCompleteStage( Executor e, BiConsumer<? super T, ? super Throwable> f)786 private CompletableFuture<T> uniWhenCompleteStage( 787 Executor e, BiConsumer<? super T, ? super Throwable> f) { 788 if (f == null) throw new NullPointerException(); 789 CompletableFuture<T> d = newIncompleteFuture(); 790 if (e != null || !d.uniWhenComplete(this, f, null)) { 791 UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f); 792 push(c); 793 c.tryFire(SYNC); 794 } 795 return d; 796 } 797 798 @SuppressWarnings("serial") 799 static final class UniHandle<T,V> extends UniCompletion<T,V> { 800 BiFunction<? super T, Throwable, ? extends V> fn; UniHandle(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, BiFunction<? super T, Throwable, ? extends V> fn)801 UniHandle(Executor executor, CompletableFuture<V> dep, 802 CompletableFuture<T> src, 803 BiFunction<? super T, Throwable, ? extends V> fn) { 804 super(executor, dep, src); this.fn = fn; 805 } tryFire(int mode)806 final CompletableFuture<V> tryFire(int mode) { 807 CompletableFuture<V> d; CompletableFuture<T> a; 808 if ((d = dep) == null || 809 !d.uniHandle(a = src, fn, mode > 0 ? null : this)) 810 return null; 811 dep = null; src = null; fn = null; 812 return d.postFire(a, mode); 813 } 814 } 815 uniHandle(CompletableFuture<S> a, BiFunction<? super S, Throwable, ? extends T> f, UniHandle<S,T> c)816 final <S> boolean uniHandle(CompletableFuture<S> a, 817 BiFunction<? super S, Throwable, ? extends T> f, 818 UniHandle<S,T> c) { 819 Object r; S s; Throwable x; 820 if (a == null || (r = a.result) == null || f == null) 821 return false; 822 if (result == null) { 823 try { 824 if (c != null && !c.claim()) 825 return false; 826 if (r instanceof AltResult) { 827 x = ((AltResult)r).ex; 828 s = null; 829 } else { 830 x = null; 831 @SuppressWarnings("unchecked") S ss = (S) r; 832 s = ss; 833 } 834 completeValue(f.apply(s, x)); 835 } catch (Throwable ex) { 836 completeThrowable(ex); 837 } 838 } 839 return true; 840 } 841 uniHandleStage( Executor e, BiFunction<? super T, Throwable, ? extends V> f)842 private <V> CompletableFuture<V> uniHandleStage( 843 Executor e, BiFunction<? super T, Throwable, ? extends V> f) { 844 if (f == null) throw new NullPointerException(); 845 CompletableFuture<V> d = newIncompleteFuture(); 846 if (e != null || !d.uniHandle(this, f, null)) { 847 UniHandle<T,V> c = new UniHandle<T,V>(e, d, this, f); 848 push(c); 849 c.tryFire(SYNC); 850 } 851 return d; 852 } 853 854 @SuppressWarnings("serial") 855 static final class UniExceptionally<T> extends UniCompletion<T,T> { 856 Function<? super Throwable, ? extends T> fn; UniExceptionally(CompletableFuture<T> dep, CompletableFuture<T> src, Function<? super Throwable, ? extends T> fn)857 UniExceptionally(CompletableFuture<T> dep, CompletableFuture<T> src, 858 Function<? super Throwable, ? extends T> fn) { 859 super(null, dep, src); this.fn = fn; 860 } tryFire(int mode)861 final CompletableFuture<T> tryFire(int mode) { // never ASYNC 862 // assert mode != ASYNC; 863 CompletableFuture<T> d; CompletableFuture<T> a; 864 if ((d = dep) == null || !d.uniExceptionally(a = src, fn, this)) 865 return null; 866 dep = null; src = null; fn = null; 867 return d.postFire(a, mode); 868 } 869 } 870 uniExceptionally(CompletableFuture<T> a, Function<? super Throwable, ? extends T> f, UniExceptionally<T> c)871 final boolean uniExceptionally(CompletableFuture<T> a, 872 Function<? super Throwable, ? extends T> f, 873 UniExceptionally<T> c) { 874 Object r; Throwable x; 875 if (a == null || (r = a.result) == null || f == null) 876 return false; 877 if (result == null) { 878 try { 879 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) { 880 if (c != null && !c.claim()) 881 return false; 882 completeValue(f.apply(x)); 883 } else 884 internalComplete(r); 885 } catch (Throwable ex) { 886 completeThrowable(ex); 887 } 888 } 889 return true; 890 } 891 uniExceptionallyStage( Function<Throwable, ? extends T> f)892 private CompletableFuture<T> uniExceptionallyStage( 893 Function<Throwable, ? extends T> f) { 894 if (f == null) throw new NullPointerException(); 895 CompletableFuture<T> d = newIncompleteFuture(); 896 if (!d.uniExceptionally(this, f, null)) { 897 UniExceptionally<T> c = new UniExceptionally<T>(d, this, f); 898 push(c); 899 c.tryFire(SYNC); 900 } 901 return d; 902 } 903 904 @SuppressWarnings("serial") 905 static final class UniRelay<T> extends UniCompletion<T,T> { // for Compose UniRelay(CompletableFuture<T> dep, CompletableFuture<T> src)906 UniRelay(CompletableFuture<T> dep, CompletableFuture<T> src) { 907 super(null, dep, src); 908 } tryFire(int mode)909 final CompletableFuture<T> tryFire(int mode) { 910 CompletableFuture<T> d; CompletableFuture<T> a; 911 if ((d = dep) == null || !d.uniRelay(a = src)) 912 return null; 913 src = null; dep = null; 914 return d.postFire(a, mode); 915 } 916 } 917 uniRelay(CompletableFuture<T> a)918 final boolean uniRelay(CompletableFuture<T> a) { 919 Object r; 920 if (a == null || (r = a.result) == null) 921 return false; 922 if (result == null) // no need to claim 923 completeRelay(r); 924 return true; 925 } 926 uniCopyStage()927 private CompletableFuture<T> uniCopyStage() { 928 Object r; 929 CompletableFuture<T> d = newIncompleteFuture(); 930 if ((r = result) != null) 931 d.completeRelay(r); 932 else { 933 UniRelay<T> c = new UniRelay<T>(d, this); 934 push(c); 935 c.tryFire(SYNC); 936 } 937 return d; 938 } 939 uniAsMinimalStage()940 private MinimalStage<T> uniAsMinimalStage() { 941 Object r; 942 if ((r = result) != null) 943 return new MinimalStage<T>(encodeRelay(r)); 944 MinimalStage<T> d = new MinimalStage<T>(); 945 UniRelay<T> c = new UniRelay<T>(d, this); 946 push(c); 947 c.tryFire(SYNC); 948 return d; 949 } 950 951 @SuppressWarnings("serial") 952 static final class UniCompose<T,V> extends UniCompletion<T,V> { 953 Function<? super T, ? extends CompletionStage<V>> fn; UniCompose(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, Function<? super T, ? extends CompletionStage<V>> fn)954 UniCompose(Executor executor, CompletableFuture<V> dep, 955 CompletableFuture<T> src, 956 Function<? super T, ? extends CompletionStage<V>> fn) { 957 super(executor, dep, src); this.fn = fn; 958 } tryFire(int mode)959 final CompletableFuture<V> tryFire(int mode) { 960 CompletableFuture<V> d; CompletableFuture<T> a; 961 if ((d = dep) == null || 962 !d.uniCompose(a = src, fn, mode > 0 ? null : this)) 963 return null; 964 dep = null; src = null; fn = null; 965 return d.postFire(a, mode); 966 } 967 } 968 uniCompose( CompletableFuture<S> a, Function<? super S, ? extends CompletionStage<T>> f, UniCompose<S,T> c)969 final <S> boolean uniCompose( 970 CompletableFuture<S> a, 971 Function<? super S, ? extends CompletionStage<T>> f, 972 UniCompose<S,T> c) { 973 Object r; Throwable x; 974 if (a == null || (r = a.result) == null || f == null) 975 return false; 976 tryComplete: if (result == null) { 977 if (r instanceof AltResult) { 978 if ((x = ((AltResult)r).ex) != null) { 979 completeThrowable(x, r); 980 break tryComplete; 981 } 982 r = null; 983 } 984 try { 985 if (c != null && !c.claim()) 986 return false; 987 @SuppressWarnings("unchecked") S s = (S) r; 988 CompletableFuture<T> g = f.apply(s).toCompletableFuture(); 989 if (g.result == null || !uniRelay(g)) { 990 UniRelay<T> copy = new UniRelay<T>(this, g); 991 g.push(copy); 992 copy.tryFire(SYNC); 993 if (result == null) 994 return false; 995 } 996 } catch (Throwable ex) { 997 completeThrowable(ex); 998 } 999 } 1000 return true; 1001 } 1002 uniComposeStage( Executor e, Function<? super T, ? extends CompletionStage<V>> f)1003 private <V> CompletableFuture<V> uniComposeStage( 1004 Executor e, Function<? super T, ? extends CompletionStage<V>> f) { 1005 if (f == null) throw new NullPointerException(); 1006 Object r, s; Throwable x; 1007 CompletableFuture<V> d = newIncompleteFuture(); 1008 if (e == null && (r = result) != null) { 1009 if (r instanceof AltResult) { 1010 if ((x = ((AltResult)r).ex) != null) { 1011 d.result = encodeThrowable(x, r); 1012 return d; 1013 } 1014 r = null; 1015 } 1016 try { 1017 @SuppressWarnings("unchecked") T t = (T) r; 1018 CompletableFuture<V> g = f.apply(t).toCompletableFuture(); 1019 if ((s = g.result) != null) 1020 d.completeRelay(s); 1021 else { 1022 UniRelay<V> c = new UniRelay<V>(d, g); 1023 g.push(c); 1024 c.tryFire(SYNC); 1025 } 1026 return d; 1027 } catch (Throwable ex) { 1028 d.result = encodeThrowable(ex); 1029 return d; 1030 } 1031 } 1032 UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f); 1033 push(c); 1034 c.tryFire(SYNC); 1035 return d; 1036 } 1037 1038 /* ------------- Two-input Completions -------------- */ 1039 1040 /** A Completion for an action with two sources */ 1041 @SuppressWarnings("serial") 1042 abstract static class BiCompletion<T,U,V> extends UniCompletion<T,V> { 1043 CompletableFuture<U> snd; // second source for action BiCompletion(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd)1044 BiCompletion(Executor executor, CompletableFuture<V> dep, 1045 CompletableFuture<T> src, CompletableFuture<U> snd) { 1046 super(executor, dep, src); this.snd = snd; 1047 } 1048 } 1049 1050 /** A Completion delegating to a BiCompletion */ 1051 @SuppressWarnings("serial") 1052 static final class CoCompletion extends Completion { 1053 BiCompletion<?,?,?> base; CoCompletion(BiCompletion<?,?,?> base)1054 CoCompletion(BiCompletion<?,?,?> base) { this.base = base; } tryFire(int mode)1055 final CompletableFuture<?> tryFire(int mode) { 1056 BiCompletion<?,?,?> c; CompletableFuture<?> d; 1057 if ((c = base) == null || (d = c.tryFire(mode)) == null) 1058 return null; 1059 base = null; // detach 1060 return d; 1061 } isLive()1062 final boolean isLive() { 1063 BiCompletion<?,?,?> c; 1064 return (c = base) != null && c.dep != null; 1065 } 1066 } 1067 1068 /** Pushes completion to this and b unless both done. */ bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c)1069 final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) { 1070 if (c != null) { 1071 Object r; 1072 while ((r = result) == null && !tryPushStack(c)) 1073 lazySetNext(c, null); // clear on failure 1074 if (b != null && b != this && b.result == null) { 1075 Completion q = (r != null) ? c : new CoCompletion(c); 1076 while (b.result == null && !b.tryPushStack(q)) 1077 lazySetNext(q, null); // clear on failure 1078 } 1079 } 1080 } 1081 1082 /** Post-processing after successful BiCompletion tryFire. */ postFire(CompletableFuture<?> a, CompletableFuture<?> b, int mode)1083 final CompletableFuture<T> postFire(CompletableFuture<?> a, 1084 CompletableFuture<?> b, int mode) { 1085 if (b != null && b.stack != null) { // clean second source 1086 if (mode < 0 || b.result == null) 1087 b.cleanStack(); 1088 else 1089 b.postComplete(); 1090 } 1091 return postFire(a, mode); 1092 } 1093 1094 @SuppressWarnings("serial") 1095 static final class BiApply<T,U,V> extends BiCompletion<T,U,V> { 1096 BiFunction<? super T,? super U,? extends V> fn; BiApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd, BiFunction<? super T,? super U,? extends V> fn)1097 BiApply(Executor executor, CompletableFuture<V> dep, 1098 CompletableFuture<T> src, CompletableFuture<U> snd, 1099 BiFunction<? super T,? super U,? extends V> fn) { 1100 super(executor, dep, src, snd); this.fn = fn; 1101 } tryFire(int mode)1102 final CompletableFuture<V> tryFire(int mode) { 1103 CompletableFuture<V> d; 1104 CompletableFuture<T> a; 1105 CompletableFuture<U> b; 1106 if ((d = dep) == null || 1107 !d.biApply(a = src, b = snd, fn, mode > 0 ? null : this)) 1108 return null; 1109 dep = null; src = null; snd = null; fn = null; 1110 return d.postFire(a, b, mode); 1111 } 1112 } 1113 biApply(CompletableFuture<R> a, CompletableFuture<S> b, BiFunction<? super R,? super S,? extends T> f, BiApply<R,S,T> c)1114 final <R,S> boolean biApply(CompletableFuture<R> a, 1115 CompletableFuture<S> b, 1116 BiFunction<? super R,? super S,? extends T> f, 1117 BiApply<R,S,T> c) { 1118 Object r, s; Throwable x; 1119 if (a == null || (r = a.result) == null || 1120 b == null || (s = b.result) == null || f == null) 1121 return false; 1122 tryComplete: if (result == null) { 1123 if (r instanceof AltResult) { 1124 if ((x = ((AltResult)r).ex) != null) { 1125 completeThrowable(x, r); 1126 break tryComplete; 1127 } 1128 r = null; 1129 } 1130 if (s instanceof AltResult) { 1131 if ((x = ((AltResult)s).ex) != null) { 1132 completeThrowable(x, s); 1133 break tryComplete; 1134 } 1135 s = null; 1136 } 1137 try { 1138 if (c != null && !c.claim()) 1139 return false; 1140 @SuppressWarnings("unchecked") R rr = (R) r; 1141 @SuppressWarnings("unchecked") S ss = (S) s; 1142 completeValue(f.apply(rr, ss)); 1143 } catch (Throwable ex) { 1144 completeThrowable(ex); 1145 } 1146 } 1147 return true; 1148 } 1149 biApplyStage( Executor e, CompletionStage<U> o, BiFunction<? super T,? super U,? extends V> f)1150 private <U,V> CompletableFuture<V> biApplyStage( 1151 Executor e, CompletionStage<U> o, 1152 BiFunction<? super T,? super U,? extends V> f) { 1153 CompletableFuture<U> b; 1154 if (f == null || (b = o.toCompletableFuture()) == null) 1155 throw new NullPointerException(); 1156 CompletableFuture<V> d = newIncompleteFuture(); 1157 if (e != null || !d.biApply(this, b, f, null)) { 1158 BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f); 1159 bipush(b, c); 1160 c.tryFire(SYNC); 1161 } 1162 return d; 1163 } 1164 1165 @SuppressWarnings("serial") 1166 static final class BiAccept<T,U> extends BiCompletion<T,U,Void> { 1167 BiConsumer<? super T,? super U> fn; BiAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, BiConsumer<? super T,? super U> fn)1168 BiAccept(Executor executor, CompletableFuture<Void> dep, 1169 CompletableFuture<T> src, CompletableFuture<U> snd, 1170 BiConsumer<? super T,? super U> fn) { 1171 super(executor, dep, src, snd); this.fn = fn; 1172 } tryFire(int mode)1173 final CompletableFuture<Void> tryFire(int mode) { 1174 CompletableFuture<Void> d; 1175 CompletableFuture<T> a; 1176 CompletableFuture<U> b; 1177 if ((d = dep) == null || 1178 !d.biAccept(a = src, b = snd, fn, mode > 0 ? null : this)) 1179 return null; 1180 dep = null; src = null; snd = null; fn = null; 1181 return d.postFire(a, b, mode); 1182 } 1183 } 1184 biAccept(CompletableFuture<R> a, CompletableFuture<S> b, BiConsumer<? super R,? super S> f, BiAccept<R,S> c)1185 final <R,S> boolean biAccept(CompletableFuture<R> a, 1186 CompletableFuture<S> b, 1187 BiConsumer<? super R,? super S> f, 1188 BiAccept<R,S> c) { 1189 Object r, s; Throwable x; 1190 if (a == null || (r = a.result) == null || 1191 b == null || (s = b.result) == null || f == null) 1192 return false; 1193 tryComplete: if (result == null) { 1194 if (r instanceof AltResult) { 1195 if ((x = ((AltResult)r).ex) != null) { 1196 completeThrowable(x, r); 1197 break tryComplete; 1198 } 1199 r = null; 1200 } 1201 if (s instanceof AltResult) { 1202 if ((x = ((AltResult)s).ex) != null) { 1203 completeThrowable(x, s); 1204 break tryComplete; 1205 } 1206 s = null; 1207 } 1208 try { 1209 if (c != null && !c.claim()) 1210 return false; 1211 @SuppressWarnings("unchecked") R rr = (R) r; 1212 @SuppressWarnings("unchecked") S ss = (S) s; 1213 f.accept(rr, ss); 1214 completeNull(); 1215 } catch (Throwable ex) { 1216 completeThrowable(ex); 1217 } 1218 } 1219 return true; 1220 } 1221 biAcceptStage( Executor e, CompletionStage<U> o, BiConsumer<? super T,? super U> f)1222 private <U> CompletableFuture<Void> biAcceptStage( 1223 Executor e, CompletionStage<U> o, 1224 BiConsumer<? super T,? super U> f) { 1225 CompletableFuture<U> b; 1226 if (f == null || (b = o.toCompletableFuture()) == null) 1227 throw new NullPointerException(); 1228 CompletableFuture<Void> d = newIncompleteFuture(); 1229 if (e != null || !d.biAccept(this, b, f, null)) { 1230 BiAccept<T,U> c = new BiAccept<T,U>(e, d, this, b, f); 1231 bipush(b, c); 1232 c.tryFire(SYNC); 1233 } 1234 return d; 1235 } 1236 1237 @SuppressWarnings("serial") 1238 static final class BiRun<T,U> extends BiCompletion<T,U,Void> { 1239 Runnable fn; BiRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Runnable fn)1240 BiRun(Executor executor, CompletableFuture<Void> dep, 1241 CompletableFuture<T> src, 1242 CompletableFuture<U> snd, 1243 Runnable fn) { 1244 super(executor, dep, src, snd); this.fn = fn; 1245 } tryFire(int mode)1246 final CompletableFuture<Void> tryFire(int mode) { 1247 CompletableFuture<Void> d; 1248 CompletableFuture<T> a; 1249 CompletableFuture<U> b; 1250 if ((d = dep) == null || 1251 !d.biRun(a = src, b = snd, fn, mode > 0 ? null : this)) 1252 return null; 1253 dep = null; src = null; snd = null; fn = null; 1254 return d.postFire(a, b, mode); 1255 } 1256 } 1257 biRun(CompletableFuture<?> a, CompletableFuture<?> b, Runnable f, BiRun<?,?> c)1258 final boolean biRun(CompletableFuture<?> a, CompletableFuture<?> b, 1259 Runnable f, BiRun<?,?> c) { 1260 Object r, s; Throwable x; 1261 if (a == null || (r = a.result) == null || 1262 b == null || (s = b.result) == null || f == null) 1263 return false; 1264 if (result == null) { 1265 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) 1266 completeThrowable(x, r); 1267 else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null) 1268 completeThrowable(x, s); 1269 else 1270 try { 1271 if (c != null && !c.claim()) 1272 return false; 1273 f.run(); 1274 completeNull(); 1275 } catch (Throwable ex) { 1276 completeThrowable(ex); 1277 } 1278 } 1279 return true; 1280 } 1281 biRunStage(Executor e, CompletionStage<?> o, Runnable f)1282 private CompletableFuture<Void> biRunStage(Executor e, CompletionStage<?> o, 1283 Runnable f) { 1284 CompletableFuture<?> b; 1285 if (f == null || (b = o.toCompletableFuture()) == null) 1286 throw new NullPointerException(); 1287 CompletableFuture<Void> d = newIncompleteFuture(); 1288 if (e != null || !d.biRun(this, b, f, null)) { 1289 BiRun<T,?> c = new BiRun<>(e, d, this, b, f); 1290 bipush(b, c); 1291 c.tryFire(SYNC); 1292 } 1293 return d; 1294 } 1295 1296 @SuppressWarnings("serial") 1297 static final class BiRelay<T,U> extends BiCompletion<T,U,Void> { // for And BiRelay(CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd)1298 BiRelay(CompletableFuture<Void> dep, 1299 CompletableFuture<T> src, 1300 CompletableFuture<U> snd) { 1301 super(null, dep, src, snd); 1302 } tryFire(int mode)1303 final CompletableFuture<Void> tryFire(int mode) { 1304 CompletableFuture<Void> d; 1305 CompletableFuture<T> a; 1306 CompletableFuture<U> b; 1307 if ((d = dep) == null || !d.biRelay(a = src, b = snd)) 1308 return null; 1309 src = null; snd = null; dep = null; 1310 return d.postFire(a, b, mode); 1311 } 1312 } 1313 biRelay(CompletableFuture<?> a, CompletableFuture<?> b)1314 boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) { 1315 Object r, s; Throwable x; 1316 if (a == null || (r = a.result) == null || 1317 b == null || (s = b.result) == null) 1318 return false; 1319 if (result == null) { 1320 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) 1321 completeThrowable(x, r); 1322 else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null) 1323 completeThrowable(x, s); 1324 else 1325 completeNull(); 1326 } 1327 return true; 1328 } 1329 1330 /** Recursively constructs a tree of completions. */ andTree(CompletableFuture<?>[] cfs, int lo, int hi)1331 static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs, 1332 int lo, int hi) { 1333 CompletableFuture<Void> d = new CompletableFuture<Void>(); 1334 if (lo > hi) // empty 1335 d.result = NIL; 1336 else { 1337 CompletableFuture<?> a, b; 1338 int mid = (lo + hi) >>> 1; 1339 if ((a = (lo == mid ? cfs[lo] : 1340 andTree(cfs, lo, mid))) == null || 1341 (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : 1342 andTree(cfs, mid+1, hi))) == null) 1343 throw new NullPointerException(); 1344 if (!d.biRelay(a, b)) { 1345 BiRelay<?,?> c = new BiRelay<>(d, a, b); 1346 a.bipush(b, c); 1347 c.tryFire(SYNC); 1348 } 1349 } 1350 return d; 1351 } 1352 1353 /* ------------- Projected (Ored) BiCompletions -------------- */ 1354 1355 /** Pushes completion to this and b unless either done. */ orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c)1356 final void orpush(CompletableFuture<?> b, BiCompletion<?,?,?> c) { 1357 if (c != null) { 1358 while ((b == null || b.result == null) && result == null) { 1359 if (tryPushStack(c)) { 1360 if (b != null && b != this && b.result == null) { 1361 Completion q = new CoCompletion(c); 1362 while (result == null && b.result == null && 1363 !b.tryPushStack(q)) 1364 lazySetNext(q, null); // clear on failure 1365 } 1366 break; 1367 } 1368 lazySetNext(c, null); // clear on failure 1369 } 1370 } 1371 } 1372 1373 @SuppressWarnings("serial") 1374 static final class OrApply<T,U extends T,V> extends BiCompletion<T,U,V> { 1375 Function<? super T,? extends V> fn; OrApply(Executor executor, CompletableFuture<V> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Function<? super T,? extends V> fn)1376 OrApply(Executor executor, CompletableFuture<V> dep, 1377 CompletableFuture<T> src, 1378 CompletableFuture<U> snd, 1379 Function<? super T,? extends V> fn) { 1380 super(executor, dep, src, snd); this.fn = fn; 1381 } tryFire(int mode)1382 final CompletableFuture<V> tryFire(int mode) { 1383 CompletableFuture<V> d; 1384 CompletableFuture<T> a; 1385 CompletableFuture<U> b; 1386 if ((d = dep) == null || 1387 !d.orApply(a = src, b = snd, fn, mode > 0 ? null : this)) 1388 return null; 1389 dep = null; src = null; snd = null; fn = null; 1390 return d.postFire(a, b, mode); 1391 } 1392 } 1393 orApply(CompletableFuture<R> a, CompletableFuture<S> b, Function<? super R, ? extends T> f, OrApply<R,S,T> c)1394 final <R,S extends R> boolean orApply(CompletableFuture<R> a, 1395 CompletableFuture<S> b, 1396 Function<? super R, ? extends T> f, 1397 OrApply<R,S,T> c) { 1398 Object r; Throwable x; 1399 if (a == null || b == null || 1400 ((r = a.result) == null && (r = b.result) == null) || f == null) 1401 return false; 1402 tryComplete: if (result == null) { 1403 try { 1404 if (c != null && !c.claim()) 1405 return false; 1406 if (r instanceof AltResult) { 1407 if ((x = ((AltResult)r).ex) != null) { 1408 completeThrowable(x, r); 1409 break tryComplete; 1410 } 1411 r = null; 1412 } 1413 @SuppressWarnings("unchecked") R rr = (R) r; 1414 completeValue(f.apply(rr)); 1415 } catch (Throwable ex) { 1416 completeThrowable(ex); 1417 } 1418 } 1419 return true; 1420 } 1421 orApplyStage( Executor e, CompletionStage<U> o, Function<? super T, ? extends V> f)1422 private <U extends T,V> CompletableFuture<V> orApplyStage( 1423 Executor e, CompletionStage<U> o, 1424 Function<? super T, ? extends V> f) { 1425 CompletableFuture<U> b; 1426 if (f == null || (b = o.toCompletableFuture()) == null) 1427 throw new NullPointerException(); 1428 CompletableFuture<V> d = newIncompleteFuture(); 1429 if (e != null || !d.orApply(this, b, f, null)) { 1430 OrApply<T,U,V> c = new OrApply<T,U,V>(e, d, this, b, f); 1431 orpush(b, c); 1432 c.tryFire(SYNC); 1433 } 1434 return d; 1435 } 1436 1437 @SuppressWarnings("serial") 1438 static final class OrAccept<T,U extends T> extends BiCompletion<T,U,Void> { 1439 Consumer<? super T> fn; OrAccept(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Consumer<? super T> fn)1440 OrAccept(Executor executor, CompletableFuture<Void> dep, 1441 CompletableFuture<T> src, 1442 CompletableFuture<U> snd, 1443 Consumer<? super T> fn) { 1444 super(executor, dep, src, snd); this.fn = fn; 1445 } tryFire(int mode)1446 final CompletableFuture<Void> tryFire(int mode) { 1447 CompletableFuture<Void> d; 1448 CompletableFuture<T> a; 1449 CompletableFuture<U> b; 1450 if ((d = dep) == null || 1451 !d.orAccept(a = src, b = snd, fn, mode > 0 ? null : this)) 1452 return null; 1453 dep = null; src = null; snd = null; fn = null; 1454 return d.postFire(a, b, mode); 1455 } 1456 } 1457 orAccept(CompletableFuture<R> a, CompletableFuture<S> b, Consumer<? super R> f, OrAccept<R,S> c)1458 final <R,S extends R> boolean orAccept(CompletableFuture<R> a, 1459 CompletableFuture<S> b, 1460 Consumer<? super R> f, 1461 OrAccept<R,S> c) { 1462 Object r; Throwable x; 1463 if (a == null || b == null || 1464 ((r = a.result) == null && (r = b.result) == null) || f == null) 1465 return false; 1466 tryComplete: if (result == null) { 1467 try { 1468 if (c != null && !c.claim()) 1469 return false; 1470 if (r instanceof AltResult) { 1471 if ((x = ((AltResult)r).ex) != null) { 1472 completeThrowable(x, r); 1473 break tryComplete; 1474 } 1475 r = null; 1476 } 1477 @SuppressWarnings("unchecked") R rr = (R) r; 1478 f.accept(rr); 1479 completeNull(); 1480 } catch (Throwable ex) { 1481 completeThrowable(ex); 1482 } 1483 } 1484 return true; 1485 } 1486 orAcceptStage( Executor e, CompletionStage<U> o, Consumer<? super T> f)1487 private <U extends T> CompletableFuture<Void> orAcceptStage( 1488 Executor e, CompletionStage<U> o, Consumer<? super T> f) { 1489 CompletableFuture<U> b; 1490 if (f == null || (b = o.toCompletableFuture()) == null) 1491 throw new NullPointerException(); 1492 CompletableFuture<Void> d = newIncompleteFuture(); 1493 if (e != null || !d.orAccept(this, b, f, null)) { 1494 OrAccept<T,U> c = new OrAccept<T,U>(e, d, this, b, f); 1495 orpush(b, c); 1496 c.tryFire(SYNC); 1497 } 1498 return d; 1499 } 1500 1501 @SuppressWarnings("serial") 1502 static final class OrRun<T,U> extends BiCompletion<T,U,Void> { 1503 Runnable fn; OrRun(Executor executor, CompletableFuture<Void> dep, CompletableFuture<T> src, CompletableFuture<U> snd, Runnable fn)1504 OrRun(Executor executor, CompletableFuture<Void> dep, 1505 CompletableFuture<T> src, 1506 CompletableFuture<U> snd, 1507 Runnable fn) { 1508 super(executor, dep, src, snd); this.fn = fn; 1509 } tryFire(int mode)1510 final CompletableFuture<Void> tryFire(int mode) { 1511 CompletableFuture<Void> d; 1512 CompletableFuture<T> a; 1513 CompletableFuture<U> b; 1514 if ((d = dep) == null || 1515 !d.orRun(a = src, b = snd, fn, mode > 0 ? null : this)) 1516 return null; 1517 dep = null; src = null; snd = null; fn = null; 1518 return d.postFire(a, b, mode); 1519 } 1520 } 1521 orRun(CompletableFuture<?> a, CompletableFuture<?> b, Runnable f, OrRun<?,?> c)1522 final boolean orRun(CompletableFuture<?> a, CompletableFuture<?> b, 1523 Runnable f, OrRun<?,?> c) { 1524 Object r; Throwable x; 1525 if (a == null || b == null || 1526 ((r = a.result) == null && (r = b.result) == null) || f == null) 1527 return false; 1528 if (result == null) { 1529 try { 1530 if (c != null && !c.claim()) 1531 return false; 1532 if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) 1533 completeThrowable(x, r); 1534 else { 1535 f.run(); 1536 completeNull(); 1537 } 1538 } catch (Throwable ex) { 1539 completeThrowable(ex); 1540 } 1541 } 1542 return true; 1543 } 1544 orRunStage(Executor e, CompletionStage<?> o, Runnable f)1545 private CompletableFuture<Void> orRunStage(Executor e, CompletionStage<?> o, 1546 Runnable f) { 1547 CompletableFuture<?> b; 1548 if (f == null || (b = o.toCompletableFuture()) == null) 1549 throw new NullPointerException(); 1550 CompletableFuture<Void> d = newIncompleteFuture(); 1551 if (e != null || !d.orRun(this, b, f, null)) { 1552 OrRun<T,?> c = new OrRun<>(e, d, this, b, f); 1553 orpush(b, c); 1554 c.tryFire(SYNC); 1555 } 1556 return d; 1557 } 1558 1559 @SuppressWarnings("serial") 1560 static final class OrRelay<T,U> extends BiCompletion<T,U,Object> { // for Or OrRelay(CompletableFuture<Object> dep, CompletableFuture<T> src, CompletableFuture<U> snd)1561 OrRelay(CompletableFuture<Object> dep, CompletableFuture<T> src, 1562 CompletableFuture<U> snd) { 1563 super(null, dep, src, snd); 1564 } tryFire(int mode)1565 final CompletableFuture<Object> tryFire(int mode) { 1566 CompletableFuture<Object> d; 1567 CompletableFuture<T> a; 1568 CompletableFuture<U> b; 1569 if ((d = dep) == null || !d.orRelay(a = src, b = snd)) 1570 return null; 1571 src = null; snd = null; dep = null; 1572 return d.postFire(a, b, mode); 1573 } 1574 } 1575 orRelay(CompletableFuture<?> a, CompletableFuture<?> b)1576 final boolean orRelay(CompletableFuture<?> a, CompletableFuture<?> b) { 1577 Object r; 1578 if (a == null || b == null || 1579 ((r = a.result) == null && (r = b.result) == null)) 1580 return false; 1581 if (result == null) 1582 completeRelay(r); 1583 return true; 1584 } 1585 1586 /** Recursively constructs a tree of completions. */ orTree(CompletableFuture<?>[] cfs, int lo, int hi)1587 static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs, 1588 int lo, int hi) { 1589 CompletableFuture<Object> d = new CompletableFuture<Object>(); 1590 if (lo <= hi) { 1591 CompletableFuture<?> a, b; 1592 int mid = (lo + hi) >>> 1; 1593 if ((a = (lo == mid ? cfs[lo] : 1594 orTree(cfs, lo, mid))) == null || 1595 (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : 1596 orTree(cfs, mid+1, hi))) == null) 1597 throw new NullPointerException(); 1598 if (!d.orRelay(a, b)) { 1599 OrRelay<?,?> c = new OrRelay<>(d, a, b); 1600 a.orpush(b, c); 1601 c.tryFire(SYNC); 1602 } 1603 } 1604 return d; 1605 } 1606 1607 /* ------------- Zero-input Async forms -------------- */ 1608 1609 @SuppressWarnings("serial") 1610 static final class AsyncSupply<T> extends ForkJoinTask<Void> 1611 implements Runnable, AsynchronousCompletionTask { 1612 CompletableFuture<T> dep; Supplier<? extends T> fn; AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn)1613 AsyncSupply(CompletableFuture<T> dep, Supplier<? extends T> fn) { 1614 this.dep = dep; this.fn = fn; 1615 } 1616 getRawResult()1617 public final Void getRawResult() { return null; } setRawResult(Void v)1618 public final void setRawResult(Void v) {} exec()1619 public final boolean exec() { run(); return true; } 1620 run()1621 public void run() { 1622 CompletableFuture<T> d; Supplier<? extends T> f; 1623 if ((d = dep) != null && (f = fn) != null) { 1624 dep = null; fn = null; 1625 if (d.result == null) { 1626 try { 1627 d.completeValue(f.get()); 1628 } catch (Throwable ex) { 1629 d.completeThrowable(ex); 1630 } 1631 } 1632 d.postComplete(); 1633 } 1634 } 1635 } 1636 asyncSupplyStage(Executor e, Supplier<U> f)1637 static <U> CompletableFuture<U> asyncSupplyStage(Executor e, 1638 Supplier<U> f) { 1639 if (f == null) throw new NullPointerException(); 1640 CompletableFuture<U> d = new CompletableFuture<U>(); 1641 e.execute(new AsyncSupply<U>(d, f)); 1642 return d; 1643 } 1644 1645 @SuppressWarnings("serial") 1646 static final class AsyncRun extends ForkJoinTask<Void> 1647 implements Runnable, AsynchronousCompletionTask { 1648 CompletableFuture<Void> dep; Runnable fn; AsyncRun(CompletableFuture<Void> dep, Runnable fn)1649 AsyncRun(CompletableFuture<Void> dep, Runnable fn) { 1650 this.dep = dep; this.fn = fn; 1651 } 1652 getRawResult()1653 public final Void getRawResult() { return null; } setRawResult(Void v)1654 public final void setRawResult(Void v) {} exec()1655 public final boolean exec() { run(); return true; } 1656 run()1657 public void run() { 1658 CompletableFuture<Void> d; Runnable f; 1659 if ((d = dep) != null && (f = fn) != null) { 1660 dep = null; fn = null; 1661 if (d.result == null) { 1662 try { 1663 f.run(); 1664 d.completeNull(); 1665 } catch (Throwable ex) { 1666 d.completeThrowable(ex); 1667 } 1668 } 1669 d.postComplete(); 1670 } 1671 } 1672 } 1673 asyncRunStage(Executor e, Runnable f)1674 static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) { 1675 if (f == null) throw new NullPointerException(); 1676 CompletableFuture<Void> d = new CompletableFuture<Void>(); 1677 e.execute(new AsyncRun(d, f)); 1678 return d; 1679 } 1680 1681 /* ------------- Signallers -------------- */ 1682 1683 /** 1684 * Completion for recording and releasing a waiting thread. This 1685 * class implements ManagedBlocker to avoid starvation when 1686 * blocking actions pile up in ForkJoinPools. 1687 */ 1688 @SuppressWarnings("serial") 1689 static final class Signaller extends Completion 1690 implements ForkJoinPool.ManagedBlocker { 1691 long nanos; // remaining wait time if timed 1692 final long deadline; // non-zero if timed 1693 final boolean interruptible; 1694 boolean interrupted; 1695 volatile Thread thread; 1696 Signaller(boolean interruptible, long nanos, long deadline)1697 Signaller(boolean interruptible, long nanos, long deadline) { 1698 this.thread = Thread.currentThread(); 1699 this.interruptible = interruptible; 1700 this.nanos = nanos; 1701 this.deadline = deadline; 1702 } tryFire(int ignore)1703 final CompletableFuture<?> tryFire(int ignore) { 1704 Thread w; // no need to atomically claim 1705 if ((w = thread) != null) { 1706 thread = null; 1707 LockSupport.unpark(w); 1708 } 1709 return null; 1710 } isReleasable()1711 public boolean isReleasable() { 1712 if (Thread.interrupted()) 1713 interrupted = true; 1714 return ((interrupted && interruptible) || 1715 (deadline != 0L && 1716 (nanos <= 0L || 1717 (nanos = deadline - System.nanoTime()) <= 0L)) || 1718 thread == null); 1719 } block()1720 public boolean block() { 1721 while (!isReleasable()) { 1722 if (deadline == 0L) 1723 LockSupport.park(this); 1724 else 1725 LockSupport.parkNanos(this, nanos); 1726 } 1727 return true; 1728 } isLive()1729 final boolean isLive() { return thread != null; } 1730 } 1731 1732 /** 1733 * Returns raw result after waiting, or null if interruptible and 1734 * interrupted. 1735 */ waitingGet(boolean interruptible)1736 private Object waitingGet(boolean interruptible) { 1737 Signaller q = null; 1738 boolean queued = false; 1739 int spins = SPINS; 1740 Object r; 1741 while ((r = result) == null) { 1742 if (spins > 0) { 1743 if (ThreadLocalRandom.nextSecondarySeed() >= 0) 1744 --spins; 1745 } 1746 else if (q == null) 1747 q = new Signaller(interruptible, 0L, 0L); 1748 else if (!queued) 1749 queued = tryPushStack(q); 1750 else { 1751 try { 1752 ForkJoinPool.managedBlock(q); 1753 } catch (InterruptedException ie) { // currently cannot happen 1754 q.interrupted = true; 1755 } 1756 if (q.interrupted && interruptible) 1757 break; 1758 } 1759 } 1760 if (q != null) { 1761 q.thread = null; 1762 if (q.interrupted) { 1763 if (interruptible) 1764 cleanStack(); 1765 else 1766 Thread.currentThread().interrupt(); 1767 } 1768 } 1769 if (r != null) 1770 postComplete(); 1771 return r; 1772 } 1773 1774 /** 1775 * Returns raw result after waiting, or null if interrupted, or 1776 * throws TimeoutException on timeout. 1777 */ timedGet(long nanos)1778 private Object timedGet(long nanos) throws TimeoutException { 1779 if (Thread.interrupted()) 1780 return null; 1781 if (nanos > 0L) { 1782 long d = System.nanoTime() + nanos; 1783 long deadline = (d == 0L) ? 1L : d; // avoid 0 1784 Signaller q = null; 1785 boolean queued = false; 1786 Object r; 1787 while ((r = result) == null) { // similar to untimed, without spins 1788 if (q == null) 1789 q = new Signaller(true, nanos, deadline); 1790 else if (!queued) 1791 queued = tryPushStack(q); 1792 else if (q.nanos <= 0L) 1793 break; 1794 else { 1795 try { 1796 ForkJoinPool.managedBlock(q); 1797 } catch (InterruptedException ie) { 1798 q.interrupted = true; 1799 } 1800 if (q.interrupted) 1801 break; 1802 } 1803 } 1804 if (q != null) 1805 q.thread = null; 1806 if (r != null) 1807 postComplete(); 1808 else 1809 cleanStack(); 1810 if (r != null || (q != null && q.interrupted)) 1811 return r; 1812 } 1813 throw new TimeoutException(); 1814 } 1815 1816 /* ------------- public methods -------------- */ 1817 1818 /** 1819 * Creates a new incomplete CompletableFuture. 1820 */ CompletableFuture()1821 public CompletableFuture() { 1822 } 1823 1824 /** 1825 * Creates a new complete CompletableFuture with given encoded result. 1826 */ CompletableFuture(Object r)1827 CompletableFuture(Object r) { 1828 this.result = r; 1829 } 1830 1831 /** 1832 * Returns a new CompletableFuture that is asynchronously completed 1833 * by a task running in the {@link ForkJoinPool#commonPool()} with 1834 * the value obtained by calling the given Supplier. 1835 * 1836 * @param supplier a function returning the value to be used 1837 * to complete the returned CompletableFuture 1838 * @param <U> the function's return type 1839 * @return the new CompletableFuture 1840 */ supplyAsync(Supplier<U> supplier)1841 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) { 1842 return asyncSupplyStage(ASYNC_POOL, supplier); 1843 } 1844 1845 /** 1846 * Returns a new CompletableFuture that is asynchronously completed 1847 * by a task running in the given executor with the value obtained 1848 * by calling the given Supplier. 1849 * 1850 * @param supplier a function returning the value to be used 1851 * to complete the returned CompletableFuture 1852 * @param executor the executor to use for asynchronous execution 1853 * @param <U> the function's return type 1854 * @return the new CompletableFuture 1855 */ supplyAsync(Supplier<U> supplier, Executor executor)1856 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, 1857 Executor executor) { 1858 return asyncSupplyStage(screenExecutor(executor), supplier); 1859 } 1860 1861 /** 1862 * Returns a new CompletableFuture that is asynchronously completed 1863 * by a task running in the {@link ForkJoinPool#commonPool()} after 1864 * it runs the given action. 1865 * 1866 * @param runnable the action to run before completing the 1867 * returned CompletableFuture 1868 * @return the new CompletableFuture 1869 */ runAsync(Runnable runnable)1870 public static CompletableFuture<Void> runAsync(Runnable runnable) { 1871 return asyncRunStage(ASYNC_POOL, runnable); 1872 } 1873 1874 /** 1875 * Returns a new CompletableFuture that is asynchronously completed 1876 * by a task running in the given executor after it runs the given 1877 * action. 1878 * 1879 * @param runnable the action to run before completing the 1880 * returned CompletableFuture 1881 * @param executor the executor to use for asynchronous execution 1882 * @return the new CompletableFuture 1883 */ runAsync(Runnable runnable, Executor executor)1884 public static CompletableFuture<Void> runAsync(Runnable runnable, 1885 Executor executor) { 1886 return asyncRunStage(screenExecutor(executor), runnable); 1887 } 1888 1889 /** 1890 * Returns a new CompletableFuture that is already completed with 1891 * the given value. 1892 * 1893 * @param value the value 1894 * @param <U> the type of the value 1895 * @return the completed CompletableFuture 1896 */ completedFuture(U value)1897 public static <U> CompletableFuture<U> completedFuture(U value) { 1898 return new CompletableFuture<U>((value == null) ? NIL : value); 1899 } 1900 1901 /** 1902 * Returns {@code true} if completed in any fashion: normally, 1903 * exceptionally, or via cancellation. 1904 * 1905 * @return {@code true} if completed 1906 */ isDone()1907 public boolean isDone() { 1908 return result != null; 1909 } 1910 1911 /** 1912 * Waits if necessary for this future to complete, and then 1913 * returns its result. 1914 * 1915 * @return the result value 1916 * @throws CancellationException if this future was cancelled 1917 * @throws ExecutionException if this future completed exceptionally 1918 * @throws InterruptedException if the current thread was interrupted 1919 * while waiting 1920 */ get()1921 public T get() throws InterruptedException, ExecutionException { 1922 Object r; 1923 return reportGet((r = result) == null ? waitingGet(true) : r); 1924 } 1925 1926 /** 1927 * Waits if necessary for at most the given time for this future 1928 * to complete, and then returns its result, if available. 1929 * 1930 * @param timeout the maximum time to wait 1931 * @param unit the time unit of the timeout argument 1932 * @return the result value 1933 * @throws CancellationException if this future was cancelled 1934 * @throws ExecutionException if this future completed exceptionally 1935 * @throws InterruptedException if the current thread was interrupted 1936 * while waiting 1937 * @throws TimeoutException if the wait timed out 1938 */ get(long timeout, TimeUnit unit)1939 public T get(long timeout, TimeUnit unit) 1940 throws InterruptedException, ExecutionException, TimeoutException { 1941 Object r; 1942 long nanos = unit.toNanos(timeout); 1943 return reportGet((r = result) == null ? timedGet(nanos) : r); 1944 } 1945 1946 /** 1947 * Returns the result value when complete, or throws an 1948 * (unchecked) exception if completed exceptionally. To better 1949 * conform with the use of common functional forms, if a 1950 * computation involved in the completion of this 1951 * CompletableFuture threw an exception, this method throws an 1952 * (unchecked) {@link CompletionException} with the underlying 1953 * exception as its cause. 1954 * 1955 * @return the result value 1956 * @throws CancellationException if the computation was cancelled 1957 * @throws CompletionException if this future completed 1958 * exceptionally or a completion computation threw an exception 1959 */ join()1960 public T join() { 1961 Object r; 1962 return reportJoin((r = result) == null ? waitingGet(false) : r); 1963 } 1964 1965 /** 1966 * Returns the result value (or throws any encountered exception) 1967 * if completed, else returns the given valueIfAbsent. 1968 * 1969 * @param valueIfAbsent the value to return if not completed 1970 * @return the result value, if completed, else the given valueIfAbsent 1971 * @throws CancellationException if the computation was cancelled 1972 * @throws CompletionException if this future completed 1973 * exceptionally or a completion computation threw an exception 1974 */ getNow(T valueIfAbsent)1975 public T getNow(T valueIfAbsent) { 1976 Object r; 1977 return ((r = result) == null) ? valueIfAbsent : reportJoin(r); 1978 } 1979 1980 /** 1981 * If not already completed, sets the value returned by {@link 1982 * #get()} and related methods to the given value. 1983 * 1984 * @param value the result value 1985 * @return {@code true} if this invocation caused this CompletableFuture 1986 * to transition to a completed state, else {@code false} 1987 */ complete(T value)1988 public boolean complete(T value) { 1989 boolean triggered = completeValue(value); 1990 postComplete(); 1991 return triggered; 1992 } 1993 1994 /** 1995 * If not already completed, causes invocations of {@link #get()} 1996 * and related methods to throw the given exception. 1997 * 1998 * @param ex the exception 1999 * @return {@code true} if this invocation caused this CompletableFuture 2000 * to transition to a completed state, else {@code false} 2001 */ completeExceptionally(Throwable ex)2002 public boolean completeExceptionally(Throwable ex) { 2003 if (ex == null) throw new NullPointerException(); 2004 boolean triggered = internalComplete(new AltResult(ex)); 2005 postComplete(); 2006 return triggered; 2007 } 2008 thenApply( Function<? super T,? extends U> fn)2009 public <U> CompletableFuture<U> thenApply( 2010 Function<? super T,? extends U> fn) { 2011 return uniApplyStage(null, fn); 2012 } 2013 thenApplyAsync( Function<? super T,? extends U> fn)2014 public <U> CompletableFuture<U> thenApplyAsync( 2015 Function<? super T,? extends U> fn) { 2016 return uniApplyStage(defaultExecutor(), fn); 2017 } 2018 thenApplyAsync( Function<? super T,? extends U> fn, Executor executor)2019 public <U> CompletableFuture<U> thenApplyAsync( 2020 Function<? super T,? extends U> fn, Executor executor) { 2021 return uniApplyStage(screenExecutor(executor), fn); 2022 } 2023 thenAccept(Consumer<? super T> action)2024 public CompletableFuture<Void> thenAccept(Consumer<? super T> action) { 2025 return uniAcceptStage(null, action); 2026 } 2027 thenAcceptAsync(Consumer<? super T> action)2028 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) { 2029 return uniAcceptStage(defaultExecutor(), action); 2030 } 2031 thenAcceptAsync(Consumer<? super T> action, Executor executor)2032 public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, 2033 Executor executor) { 2034 return uniAcceptStage(screenExecutor(executor), action); 2035 } 2036 thenRun(Runnable action)2037 public CompletableFuture<Void> thenRun(Runnable action) { 2038 return uniRunStage(null, action); 2039 } 2040 thenRunAsync(Runnable action)2041 public CompletableFuture<Void> thenRunAsync(Runnable action) { 2042 return uniRunStage(defaultExecutor(), action); 2043 } 2044 thenRunAsync(Runnable action, Executor executor)2045 public CompletableFuture<Void> thenRunAsync(Runnable action, 2046 Executor executor) { 2047 return uniRunStage(screenExecutor(executor), action); 2048 } 2049 thenCombine( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)2050 public <U,V> CompletableFuture<V> thenCombine( 2051 CompletionStage<? extends U> other, 2052 BiFunction<? super T,? super U,? extends V> fn) { 2053 return biApplyStage(null, other, fn); 2054 } 2055 thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)2056 public <U,V> CompletableFuture<V> thenCombineAsync( 2057 CompletionStage<? extends U> other, 2058 BiFunction<? super T,? super U,? extends V> fn) { 2059 return biApplyStage(defaultExecutor(), other, fn); 2060 } 2061 thenCombineAsync( CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)2062 public <U,V> CompletableFuture<V> thenCombineAsync( 2063 CompletionStage<? extends U> other, 2064 BiFunction<? super T,? super U,? extends V> fn, Executor executor) { 2065 return biApplyStage(screenExecutor(executor), other, fn); 2066 } 2067 thenAcceptBoth( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)2068 public <U> CompletableFuture<Void> thenAcceptBoth( 2069 CompletionStage<? extends U> other, 2070 BiConsumer<? super T, ? super U> action) { 2071 return biAcceptStage(null, other, action); 2072 } 2073 thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)2074 public <U> CompletableFuture<Void> thenAcceptBothAsync( 2075 CompletionStage<? extends U> other, 2076 BiConsumer<? super T, ? super U> action) { 2077 return biAcceptStage(defaultExecutor(), other, action); 2078 } 2079 thenAcceptBothAsync( CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)2080 public <U> CompletableFuture<Void> thenAcceptBothAsync( 2081 CompletionStage<? extends U> other, 2082 BiConsumer<? super T, ? super U> action, Executor executor) { 2083 return biAcceptStage(screenExecutor(executor), other, action); 2084 } 2085 runAfterBoth(CompletionStage<?> other, Runnable action)2086 public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, 2087 Runnable action) { 2088 return biRunStage(null, other, action); 2089 } 2090 runAfterBothAsync(CompletionStage<?> other, Runnable action)2091 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, 2092 Runnable action) { 2093 return biRunStage(defaultExecutor(), other, action); 2094 } 2095 runAfterBothAsync(CompletionStage<?> other, Runnable action, Executor executor)2096 public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other, 2097 Runnable action, 2098 Executor executor) { 2099 return biRunStage(screenExecutor(executor), other, action); 2100 } 2101 applyToEither( CompletionStage<? extends T> other, Function<? super T, U> fn)2102 public <U> CompletableFuture<U> applyToEither( 2103 CompletionStage<? extends T> other, Function<? super T, U> fn) { 2104 return orApplyStage(null, other, fn); 2105 } 2106 applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn)2107 public <U> CompletableFuture<U> applyToEitherAsync( 2108 CompletionStage<? extends T> other, Function<? super T, U> fn) { 2109 return orApplyStage(defaultExecutor(), other, fn); 2110 } 2111 applyToEitherAsync( CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor)2112 public <U> CompletableFuture<U> applyToEitherAsync( 2113 CompletionStage<? extends T> other, Function<? super T, U> fn, 2114 Executor executor) { 2115 return orApplyStage(screenExecutor(executor), other, fn); 2116 } 2117 acceptEither( CompletionStage<? extends T> other, Consumer<? super T> action)2118 public CompletableFuture<Void> acceptEither( 2119 CompletionStage<? extends T> other, Consumer<? super T> action) { 2120 return orAcceptStage(null, other, action); 2121 } 2122 acceptEitherAsync( CompletionStage<? extends T> other, Consumer<? super T> action)2123 public CompletableFuture<Void> acceptEitherAsync( 2124 CompletionStage<? extends T> other, Consumer<? super T> action) { 2125 return orAcceptStage(defaultExecutor(), other, action); 2126 } 2127 acceptEitherAsync( CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)2128 public CompletableFuture<Void> acceptEitherAsync( 2129 CompletionStage<? extends T> other, Consumer<? super T> action, 2130 Executor executor) { 2131 return orAcceptStage(screenExecutor(executor), other, action); 2132 } 2133 runAfterEither(CompletionStage<?> other, Runnable action)2134 public CompletableFuture<Void> runAfterEither(CompletionStage<?> other, 2135 Runnable action) { 2136 return orRunStage(null, other, action); 2137 } 2138 runAfterEitherAsync(CompletionStage<?> other, Runnable action)2139 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, 2140 Runnable action) { 2141 return orRunStage(defaultExecutor(), other, action); 2142 } 2143 runAfterEitherAsync(CompletionStage<?> other, Runnable action, Executor executor)2144 public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, 2145 Runnable action, 2146 Executor executor) { 2147 return orRunStage(screenExecutor(executor), other, action); 2148 } 2149 thenCompose( Function<? super T, ? extends CompletionStage<U>> fn)2150 public <U> CompletableFuture<U> thenCompose( 2151 Function<? super T, ? extends CompletionStage<U>> fn) { 2152 return uniComposeStage(null, fn); 2153 } 2154 thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn)2155 public <U> CompletableFuture<U> thenComposeAsync( 2156 Function<? super T, ? extends CompletionStage<U>> fn) { 2157 return uniComposeStage(defaultExecutor(), fn); 2158 } 2159 thenComposeAsync( Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)2160 public <U> CompletableFuture<U> thenComposeAsync( 2161 Function<? super T, ? extends CompletionStage<U>> fn, 2162 Executor executor) { 2163 return uniComposeStage(screenExecutor(executor), fn); 2164 } 2165 whenComplete( BiConsumer<? super T, ? super Throwable> action)2166 public CompletableFuture<T> whenComplete( 2167 BiConsumer<? super T, ? super Throwable> action) { 2168 return uniWhenCompleteStage(null, action); 2169 } 2170 whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action)2171 public CompletableFuture<T> whenCompleteAsync( 2172 BiConsumer<? super T, ? super Throwable> action) { 2173 return uniWhenCompleteStage(defaultExecutor(), action); 2174 } 2175 whenCompleteAsync( BiConsumer<? super T, ? super Throwable> action, Executor executor)2176 public CompletableFuture<T> whenCompleteAsync( 2177 BiConsumer<? super T, ? super Throwable> action, Executor executor) { 2178 return uniWhenCompleteStage(screenExecutor(executor), action); 2179 } 2180 handle( BiFunction<? super T, Throwable, ? extends U> fn)2181 public <U> CompletableFuture<U> handle( 2182 BiFunction<? super T, Throwable, ? extends U> fn) { 2183 return uniHandleStage(null, fn); 2184 } 2185 handleAsync( BiFunction<? super T, Throwable, ? extends U> fn)2186 public <U> CompletableFuture<U> handleAsync( 2187 BiFunction<? super T, Throwable, ? extends U> fn) { 2188 return uniHandleStage(defaultExecutor(), fn); 2189 } 2190 handleAsync( BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)2191 public <U> CompletableFuture<U> handleAsync( 2192 BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) { 2193 return uniHandleStage(screenExecutor(executor), fn); 2194 } 2195 2196 /** 2197 * Returns this CompletableFuture. 2198 * 2199 * @return this CompletableFuture 2200 */ toCompletableFuture()2201 public CompletableFuture<T> toCompletableFuture() { 2202 return this; 2203 } 2204 2205 // not in interface CompletionStage 2206 2207 /** 2208 * Returns a new CompletableFuture that is completed when this 2209 * CompletableFuture completes, with the result of the given 2210 * function of the exception triggering this CompletableFuture's 2211 * completion when it completes exceptionally; otherwise, if this 2212 * CompletableFuture completes normally, then the returned 2213 * CompletableFuture also completes normally with the same value. 2214 * Note: More flexible versions of this functionality are 2215 * available using methods {@code whenComplete} and {@code handle}. 2216 * 2217 * @param fn the function to use to compute the value of the 2218 * returned CompletableFuture if this CompletableFuture completed 2219 * exceptionally 2220 * @return the new CompletableFuture 2221 */ exceptionally( Function<Throwable, ? extends T> fn)2222 public CompletableFuture<T> exceptionally( 2223 Function<Throwable, ? extends T> fn) { 2224 return uniExceptionallyStage(fn); 2225 } 2226 2227 2228 /* ------------- Arbitrary-arity constructions -------------- */ 2229 2230 /** 2231 * Returns a new CompletableFuture that is completed when all of 2232 * the given CompletableFutures complete. If any of the given 2233 * CompletableFutures complete exceptionally, then the returned 2234 * CompletableFuture also does so, with a CompletionException 2235 * holding this exception as its cause. Otherwise, the results, 2236 * if any, of the given CompletableFutures are not reflected in 2237 * the returned CompletableFuture, but may be obtained by 2238 * inspecting them individually. If no CompletableFutures are 2239 * provided, returns a CompletableFuture completed with the value 2240 * {@code null}. 2241 * 2242 * <p>Among the applications of this method is to await completion 2243 * of a set of independent CompletableFutures before continuing a 2244 * program, as in: {@code CompletableFuture.allOf(c1, c2, 2245 * c3).join();}. 2246 * 2247 * @param cfs the CompletableFutures 2248 * @return a new CompletableFuture that is completed when all of the 2249 * given CompletableFutures complete 2250 * @throws NullPointerException if the array or any of its elements are 2251 * {@code null} 2252 */ allOf(CompletableFuture<?>.... cfs)2253 public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) { 2254 return andTree(cfs, 0, cfs.length - 1); 2255 } 2256 2257 /** 2258 * Returns a new CompletableFuture that is completed when any of 2259 * the given CompletableFutures complete, with the same result. 2260 * Otherwise, if it completed exceptionally, the returned 2261 * CompletableFuture also does so, with a CompletionException 2262 * holding this exception as its cause. If no CompletableFutures 2263 * are provided, returns an incomplete CompletableFuture. 2264 * 2265 * @param cfs the CompletableFutures 2266 * @return a new CompletableFuture that is completed with the 2267 * result or exception of any of the given CompletableFutures when 2268 * one completes 2269 * @throws NullPointerException if the array or any of its elements are 2270 * {@code null} 2271 */ anyOf(CompletableFuture<?>.... cfs)2272 public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) { 2273 return orTree(cfs, 0, cfs.length - 1); 2274 } 2275 2276 /* ------------- Control and status methods -------------- */ 2277 2278 /** 2279 * If not already completed, completes this CompletableFuture with 2280 * a {@link CancellationException}. Dependent CompletableFutures 2281 * that have not already completed will also complete 2282 * exceptionally, with a {@link CompletionException} caused by 2283 * this {@code CancellationException}. 2284 * 2285 * @param mayInterruptIfRunning this value has no effect in this 2286 * implementation because interrupts are not used to control 2287 * processing. 2288 * 2289 * @return {@code true} if this task is now cancelled 2290 */ cancel(boolean mayInterruptIfRunning)2291 public boolean cancel(boolean mayInterruptIfRunning) { 2292 boolean cancelled = (result == null) && 2293 internalComplete(new AltResult(new CancellationException())); 2294 postComplete(); 2295 return cancelled || isCancelled(); 2296 } 2297 2298 /** 2299 * Returns {@code true} if this CompletableFuture was cancelled 2300 * before it completed normally. 2301 * 2302 * @return {@code true} if this CompletableFuture was cancelled 2303 * before it completed normally 2304 */ isCancelled()2305 public boolean isCancelled() { 2306 Object r; 2307 return ((r = result) instanceof AltResult) && 2308 (((AltResult)r).ex instanceof CancellationException); 2309 } 2310 2311 /** 2312 * Returns {@code true} if this CompletableFuture completed 2313 * exceptionally, in any way. Possible causes include 2314 * cancellation, explicit invocation of {@code 2315 * completeExceptionally}, and abrupt termination of a 2316 * CompletionStage action. 2317 * 2318 * @return {@code true} if this CompletableFuture completed 2319 * exceptionally 2320 */ isCompletedExceptionally()2321 public boolean isCompletedExceptionally() { 2322 Object r; 2323 return ((r = result) instanceof AltResult) && r != NIL; 2324 } 2325 2326 /** 2327 * Forcibly sets or resets the value subsequently returned by 2328 * method {@link #get()} and related methods, whether or not 2329 * already completed. This method is designed for use only in 2330 * error recovery actions, and even in such situations may result 2331 * in ongoing dependent completions using established versus 2332 * overwritten outcomes. 2333 * 2334 * @param value the completion value 2335 */ obtrudeValue(T value)2336 public void obtrudeValue(T value) { 2337 result = (value == null) ? NIL : value; 2338 postComplete(); 2339 } 2340 2341 /** 2342 * Forcibly causes subsequent invocations of method {@link #get()} 2343 * and related methods to throw the given exception, whether or 2344 * not already completed. This method is designed for use only in 2345 * error recovery actions, and even in such situations may result 2346 * in ongoing dependent completions using established versus 2347 * overwritten outcomes. 2348 * 2349 * @param ex the exception 2350 * @throws NullPointerException if the exception is null 2351 */ obtrudeException(Throwable ex)2352 public void obtrudeException(Throwable ex) { 2353 if (ex == null) throw new NullPointerException(); 2354 result = new AltResult(ex); 2355 postComplete(); 2356 } 2357 2358 /** 2359 * Returns the estimated number of CompletableFutures whose 2360 * completions are awaiting completion of this CompletableFuture. 2361 * This method is designed for use in monitoring system state, not 2362 * for synchronization control. 2363 * 2364 * @return the number of dependent CompletableFutures 2365 */ getNumberOfDependents()2366 public int getNumberOfDependents() { 2367 int count = 0; 2368 for (Completion p = stack; p != null; p = p.next) 2369 ++count; 2370 return count; 2371 } 2372 2373 /** 2374 * Returns a string identifying this CompletableFuture, as well as 2375 * its completion state. The state, in brackets, contains the 2376 * String {@code "Completed Normally"} or the String {@code 2377 * "Completed Exceptionally"}, or the String {@code "Not 2378 * completed"} followed by the number of CompletableFutures 2379 * dependent upon its completion, if any. 2380 * 2381 * @return a string identifying this CompletableFuture, as well as its state 2382 */ toString()2383 public String toString() { 2384 Object r = result; 2385 int count = 0; // avoid call to getNumberOfDependents in case disabled 2386 for (Completion p = stack; p != null; p = p.next) 2387 ++count; 2388 return super.toString() + 2389 ((r == null) ? 2390 ((count == 0) ? 2391 "[Not completed]" : 2392 "[Not completed, " + count + " dependents]") : 2393 (((r instanceof AltResult) && ((AltResult)r).ex != null) ? 2394 "[Completed exceptionally]" : 2395 "[Completed normally]")); 2396 } 2397 2398 // jdk9 additions 2399 2400 /** 2401 * Returns a new incomplete CompletableFuture of the type to be 2402 * returned by a CompletionStage method. Subclasses should 2403 * normally override this method to return an instance of the same 2404 * class as this CompletableFuture. The default implementation 2405 * returns an instance of class CompletableFuture. 2406 * 2407 * @param <U> the type of the value 2408 * @return a new CompletableFuture 2409 * @since 9 2410 * @hide API from OpenJDK 9, not yet exposed on Android. 2411 */ newIncompleteFuture()2412 public <U> CompletableFuture<U> newIncompleteFuture() { 2413 return new CompletableFuture<U>(); 2414 } 2415 2416 /** 2417 * Returns the default Executor used for async methods that do not 2418 * specify an Executor. This class uses the {@link 2419 * ForkJoinPool#commonPool()} if it supports more than one 2420 * parallel thread, or else an Executor using one thread per async 2421 * task. This method may be overridden in subclasses to return 2422 * an Executor that provides at least one independent thread. 2423 * 2424 * @return the executor 2425 * @since 9 2426 * @hide API from OpenJDK 9, not yet exposed on Android. 2427 */ defaultExecutor()2428 public Executor defaultExecutor() { 2429 return ASYNC_POOL; 2430 } 2431 2432 /** 2433 * Returns a new CompletableFuture that is completed normally with 2434 * the same value as this CompletableFuture when it completes 2435 * normally. If this CompletableFuture completes exceptionally, 2436 * then the returned CompletableFuture completes exceptionally 2437 * with a CompletionException with this exception as cause. The 2438 * behavior is equivalent to {@code thenApply(x -> x)}. This 2439 * method may be useful as a form of "defensive copying", to 2440 * prevent clients from completing, while still being able to 2441 * arrange dependent actions. 2442 * 2443 * @return the new CompletableFuture 2444 * @since 9 2445 * @hide API from OpenJDK 9, not yet exposed on Android. 2446 */ copy()2447 public CompletableFuture<T> copy() { 2448 return uniCopyStage(); 2449 } 2450 2451 /** 2452 * Returns a new CompletionStage that is completed normally with 2453 * the same value as this CompletableFuture when it completes 2454 * normally, and cannot be independently completed or otherwise 2455 * used in ways not defined by the methods of interface {@link 2456 * CompletionStage}. If this CompletableFuture completes 2457 * exceptionally, then the returned CompletionStage completes 2458 * exceptionally with a CompletionException with this exception as 2459 * cause. 2460 * 2461 * @return the new CompletionStage 2462 * @since 9 2463 * @hide API from OpenJDK 9, not yet exposed on Android. 2464 */ minimalCompletionStage()2465 public CompletionStage<T> minimalCompletionStage() { 2466 return uniAsMinimalStage(); 2467 } 2468 2469 /** 2470 * Completes this CompletableFuture with the result of 2471 * the given Supplier function invoked from an asynchronous 2472 * task using the given executor. 2473 * 2474 * @param supplier a function returning the value to be used 2475 * to complete this CompletableFuture 2476 * @param executor the executor to use for asynchronous execution 2477 * @return this CompletableFuture 2478 * @since 9 2479 * @hide API from OpenJDK 9, not yet exposed on Android. 2480 */ completeAsync(Supplier<? extends T> supplier, Executor executor)2481 public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier, 2482 Executor executor) { 2483 if (supplier == null || executor == null) 2484 throw new NullPointerException(); 2485 executor.execute(new AsyncSupply<T>(this, supplier)); 2486 return this; 2487 } 2488 2489 /** 2490 * Completes this CompletableFuture with the result of the given 2491 * Supplier function invoked from an asynchronous task using the 2492 * default executor. 2493 * 2494 * @param supplier a function returning the value to be used 2495 * to complete this CompletableFuture 2496 * @return this CompletableFuture 2497 * @since 9 2498 * @hide API from OpenJDK 9, not yet exposed on Android. 2499 */ completeAsync(Supplier<? extends T> supplier)2500 public CompletableFuture<T> completeAsync(Supplier<? extends T> supplier) { 2501 return completeAsync(supplier, defaultExecutor()); 2502 } 2503 2504 /** 2505 * Exceptionally completes this CompletableFuture with 2506 * a {@link TimeoutException} if not otherwise completed 2507 * before the given timeout. 2508 * 2509 * @param timeout how long to wait before completing exceptionally 2510 * with a TimeoutException, in units of {@code unit} 2511 * @param unit a {@code TimeUnit} determining how to interpret the 2512 * {@code timeout} parameter 2513 * @return this CompletableFuture 2514 * @since 9 2515 * @hide API from OpenJDK 9, not yet exposed on Android. 2516 */ orTimeout(long timeout, TimeUnit unit)2517 public CompletableFuture<T> orTimeout(long timeout, TimeUnit unit) { 2518 if (unit == null) 2519 throw new NullPointerException(); 2520 if (result == null) 2521 whenComplete(new Canceller(Delayer.delay(new Timeout(this), 2522 timeout, unit))); 2523 return this; 2524 } 2525 2526 /** 2527 * Completes this CompletableFuture with the given value if not 2528 * otherwise completed before the given timeout. 2529 * 2530 * @param value the value to use upon timeout 2531 * @param timeout how long to wait before completing normally 2532 * with the given value, in units of {@code unit} 2533 * @param unit a {@code TimeUnit} determining how to interpret the 2534 * {@code timeout} parameter 2535 * @return this CompletableFuture 2536 * @since 9 2537 * @hide API from OpenJDK 9, not yet exposed on Android. 2538 */ completeOnTimeout(T value, long timeout, TimeUnit unit)2539 public CompletableFuture<T> completeOnTimeout(T value, long timeout, 2540 TimeUnit unit) { 2541 if (unit == null) 2542 throw new NullPointerException(); 2543 if (result == null) 2544 whenComplete(new Canceller(Delayer.delay( 2545 new DelayedCompleter<T>(this, value), 2546 timeout, unit))); 2547 return this; 2548 } 2549 2550 /** 2551 * Returns a new Executor that submits a task to the given base 2552 * executor after the given delay (or no delay if non-positive). 2553 * Each delay commences upon invocation of the returned executor's 2554 * {@code execute} method. 2555 * 2556 * @param delay how long to delay, in units of {@code unit} 2557 * @param unit a {@code TimeUnit} determining how to interpret the 2558 * {@code delay} parameter 2559 * @param executor the base executor 2560 * @return the new delayed executor 2561 * @since 9 2562 * @hide API from OpenJDK 9, not yet exposed on Android. 2563 */ delayedExecutor(long delay, TimeUnit unit, Executor executor)2564 public static Executor delayedExecutor(long delay, TimeUnit unit, 2565 Executor executor) { 2566 if (unit == null || executor == null) 2567 throw new NullPointerException(); 2568 return new DelayedExecutor(delay, unit, executor); 2569 } 2570 2571 /** 2572 * Returns a new Executor that submits a task to the default 2573 * executor after the given delay (or no delay if non-positive). 2574 * Each delay commences upon invocation of the returned executor's 2575 * {@code execute} method. 2576 * 2577 * @param delay how long to delay, in units of {@code unit} 2578 * @param unit a {@code TimeUnit} determining how to interpret the 2579 * {@code delay} parameter 2580 * @return the new delayed executor 2581 * @since 9 2582 * @hide API from OpenJDK 9, not yet exposed on Android. 2583 */ delayedExecutor(long delay, TimeUnit unit)2584 public static Executor delayedExecutor(long delay, TimeUnit unit) { 2585 if (unit == null) 2586 throw new NullPointerException(); 2587 return new DelayedExecutor(delay, unit, ASYNC_POOL); 2588 } 2589 2590 /** 2591 * Returns a new CompletionStage that is already completed with 2592 * the given value and supports only those methods in 2593 * interface {@link CompletionStage}. 2594 * 2595 * @param value the value 2596 * @param <U> the type of the value 2597 * @return the completed CompletionStage 2598 * @since 9 2599 * @hide API from OpenJDK 9, not yet exposed on Android. 2600 */ completedStage(U value)2601 public static <U> CompletionStage<U> completedStage(U value) { 2602 return new MinimalStage<U>((value == null) ? NIL : value); 2603 } 2604 2605 /** 2606 * Returns a new CompletableFuture that is already completed 2607 * exceptionally with the given exception. 2608 * 2609 * @param ex the exception 2610 * @param <U> the type of the value 2611 * @return the exceptionally completed CompletableFuture 2612 * @since 9 2613 * @hide API from OpenJDK 9, not yet exposed on Android. 2614 */ failedFuture(Throwable ex)2615 public static <U> CompletableFuture<U> failedFuture(Throwable ex) { 2616 if (ex == null) throw new NullPointerException(); 2617 return new CompletableFuture<U>(new AltResult(ex)); 2618 } 2619 2620 /** 2621 * Returns a new CompletionStage that is already completed 2622 * exceptionally with the given exception and supports only those 2623 * methods in interface {@link CompletionStage}. 2624 * 2625 * @param ex the exception 2626 * @param <U> the type of the value 2627 * @return the exceptionally completed CompletionStage 2628 * @since 9 2629 * @hide API from OpenJDK 9, not yet exposed on Android. 2630 */ failedStage(Throwable ex)2631 public static <U> CompletionStage<U> failedStage(Throwable ex) { 2632 if (ex == null) throw new NullPointerException(); 2633 return new MinimalStage<U>(new AltResult(ex)); 2634 } 2635 2636 /** 2637 * Singleton delay scheduler, used only for starting and 2638 * cancelling tasks. 2639 */ 2640 static final class Delayer { delay(Runnable command, long delay, TimeUnit unit)2641 static ScheduledFuture<?> delay(Runnable command, long delay, 2642 TimeUnit unit) { 2643 return delayer.schedule(command, delay, unit); 2644 } 2645 2646 static final class DaemonThreadFactory implements ThreadFactory { newThread(Runnable r)2647 public Thread newThread(Runnable r) { 2648 Thread t = new Thread(r); 2649 t.setDaemon(true); 2650 t.setName("CompletableFutureDelayScheduler"); 2651 return t; 2652 } 2653 } 2654 2655 static final ScheduledThreadPoolExecutor delayer; 2656 static { 2657 (delayer = new ScheduledThreadPoolExecutor( 2658 1, new DaemonThreadFactory())). 2659 setRemoveOnCancelPolicy(true); 2660 } 2661 } 2662 2663 // Little class-ified lambdas to better support monitoring 2664 2665 static final class DelayedExecutor implements Executor { 2666 final long delay; 2667 final TimeUnit unit; 2668 final Executor executor; DelayedExecutor(long delay, TimeUnit unit, Executor executor)2669 DelayedExecutor(long delay, TimeUnit unit, Executor executor) { 2670 this.delay = delay; this.unit = unit; this.executor = executor; 2671 } execute(Runnable r)2672 public void execute(Runnable r) { 2673 Delayer.delay(new TaskSubmitter(executor, r), delay, unit); 2674 } 2675 } 2676 2677 /** Action to submit user task */ 2678 static final class TaskSubmitter implements Runnable { 2679 final Executor executor; 2680 final Runnable action; TaskSubmitter(Executor executor, Runnable action)2681 TaskSubmitter(Executor executor, Runnable action) { 2682 this.executor = executor; 2683 this.action = action; 2684 } run()2685 public void run() { executor.execute(action); } 2686 } 2687 2688 /** Action to completeExceptionally on timeout */ 2689 static final class Timeout implements Runnable { 2690 final CompletableFuture<?> f; Timeout(CompletableFuture<?> f)2691 Timeout(CompletableFuture<?> f) { this.f = f; } run()2692 public void run() { 2693 if (f != null && !f.isDone()) 2694 f.completeExceptionally(new TimeoutException()); 2695 } 2696 } 2697 2698 /** Action to complete on timeout */ 2699 static final class DelayedCompleter<U> implements Runnable { 2700 final CompletableFuture<U> f; 2701 final U u; DelayedCompleter(CompletableFuture<U> f, U u)2702 DelayedCompleter(CompletableFuture<U> f, U u) { this.f = f; this.u = u; } run()2703 public void run() { 2704 if (f != null) 2705 f.complete(u); 2706 } 2707 } 2708 2709 /** Action to cancel unneeded timeouts */ 2710 static final class Canceller implements BiConsumer<Object, Throwable> { 2711 final Future<?> f; Canceller(Future<?> f)2712 Canceller(Future<?> f) { this.f = f; } accept(Object ignore, Throwable ex)2713 public void accept(Object ignore, Throwable ex) { 2714 if (ex == null && f != null && !f.isDone()) 2715 f.cancel(false); 2716 } 2717 } 2718 2719 /** 2720 * A subclass that just throws UOE for most non-CompletionStage methods. 2721 */ 2722 static final class MinimalStage<T> extends CompletableFuture<T> { MinimalStage()2723 MinimalStage() { } MinimalStage(Object r)2724 MinimalStage(Object r) { super(r); } newIncompleteFuture()2725 @Override public <U> CompletableFuture<U> newIncompleteFuture() { 2726 return new MinimalStage<U>(); } get()2727 @Override public T get() { 2728 throw new UnsupportedOperationException(); } get(long timeout, TimeUnit unit)2729 @Override public T get(long timeout, TimeUnit unit) { 2730 throw new UnsupportedOperationException(); } getNow(T valueIfAbsent)2731 @Override public T getNow(T valueIfAbsent) { 2732 throw new UnsupportedOperationException(); } join()2733 @Override public T join() { 2734 throw new UnsupportedOperationException(); } complete(T value)2735 @Override public boolean complete(T value) { 2736 throw new UnsupportedOperationException(); } completeExceptionally(Throwable ex)2737 @Override public boolean completeExceptionally(Throwable ex) { 2738 throw new UnsupportedOperationException(); } cancel(boolean mayInterruptIfRunning)2739 @Override public boolean cancel(boolean mayInterruptIfRunning) { 2740 throw new UnsupportedOperationException(); } obtrudeValue(T value)2741 @Override public void obtrudeValue(T value) { 2742 throw new UnsupportedOperationException(); } obtrudeException(Throwable ex)2743 @Override public void obtrudeException(Throwable ex) { 2744 throw new UnsupportedOperationException(); } isDone()2745 @Override public boolean isDone() { 2746 throw new UnsupportedOperationException(); } isCancelled()2747 @Override public boolean isCancelled() { 2748 throw new UnsupportedOperationException(); } isCompletedExceptionally()2749 @Override public boolean isCompletedExceptionally() { 2750 throw new UnsupportedOperationException(); } getNumberOfDependents()2751 @Override public int getNumberOfDependents() { 2752 throw new UnsupportedOperationException(); } completeAsync(Supplier<? extends T> supplier, Executor executor)2753 @Override public CompletableFuture<T> completeAsync 2754 (Supplier<? extends T> supplier, Executor executor) { 2755 throw new UnsupportedOperationException(); } completeAsync(Supplier<? extends T> supplier)2756 @Override public CompletableFuture<T> completeAsync 2757 (Supplier<? extends T> supplier) { 2758 throw new UnsupportedOperationException(); } orTimeout(long timeout, TimeUnit unit)2759 @Override public CompletableFuture<T> orTimeout 2760 (long timeout, TimeUnit unit) { 2761 throw new UnsupportedOperationException(); } completeOnTimeout(T value, long timeout, TimeUnit unit)2762 @Override public CompletableFuture<T> completeOnTimeout 2763 (T value, long timeout, TimeUnit unit) { 2764 throw new UnsupportedOperationException(); } 2765 } 2766 2767 // Unsafe mechanics 2768 private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); 2769 private static final long RESULT; 2770 private static final long STACK; 2771 private static final long NEXT; 2772 static { 2773 try { 2774 RESULT = U.objectFieldOffset 2775 (CompletableFuture.class.getDeclaredField("result")); 2776 STACK = U.objectFieldOffset 2777 (CompletableFuture.class.getDeclaredField("stack")); 2778 NEXT = U.objectFieldOffset 2779 (Completion.class.getDeclaredField("next")); 2780 } catch (ReflectiveOperationException e) { 2781 throw new Error(e); 2782 } 2783 2784 // Reduce the risk of rare disastrous classloading in first call to 2785 // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 2786 Class<?> ensureLoaded = LockSupport.class; 2787 } 2788 } 2789